Nel precedente articolo della serie sul trading algoritmico per il forex abbiamo descritto alcune importanti modifiche al software DTForex. Questi aggiornamento hanno aumentato in modo signicativo le funzionalità del sistema, al punto che è quasi pronto per il backtesting con dati storici di tick su una gamma di coppie di valute.
In questo articolo descriviamo le seguenti modifiche apportate al sistema:
- Ulteriori modifiche agli oggetti
Position
ePortfolio
per consentire lo scambio di più coppie di valute e valute non denominate nella valuta del conto, cioè con un conto nominato in EUR si può ora negoziare anche GBP/USD, ad esempio. - Revisione completa delle modalità con cui
Position
ePortfolio
calcolano di apertura, chiusura, aggiunta e rimozione di unità. L’oggettoPosition
esegue ora la maggior parte della logica, lasciando all’oggettoPortfolio
la gestione ad alto livello. - Aggiunta della prima strategia non banale, ovvero la ben nota strategia Moving Average Crossover con una coppia di medie mobili semplici (SMA).
- Modifica di
backtest.py
per renderlo single-threaded e deterministico. Nonostante il mio ottimismo sul fatto che un approccio multi-thread non sarebbe troppo dannoso per l’accuratezza della simulazione, ho trovato difficile ottenere risultati soddisfacenti di backtest con un approccio multi-thread. - Introduzione di uno script molto semplice di output basato su Matplotlib per visualizzare la curva di equity.
Gestione di coppie di valute multiple
Una caratteristica del sistema di trading che abbiamo discusso molte volte negli articoli di questa serie è la possibilità capacità di gestire più coppie di valute.
In questa articolo vediamo come modificare il software per consentire la gestione di conti nominati in valute diverse da EUR, che era la sola valuta codificata in precedenza. Descriviamo anche come poter negoziare altre coppie di valute, ad eccezione di quelle che consistono in una base o quotazione in Yen giapponese (JPY). La limitazione sullo Yen è dovuta alle modalità di calcolo delle dimensioni dei tick nelle coppie di valute con JPY.
Per ottenere questo è necessario modificare la logica di calcolo del profitto quando le unità vengono rimosse o la posizione viene chiusa. Di seguito vediamo il nuovo codice per calcolo dei pips, nel file position.py
:
def calculate_pips(self):
mult = Decimal("1")
if self.position_type == "long":
mult = Decimal("1")
elif self.position_type == "short":
mult = Decimal("-1")
pips = (mult * (self.cur_price - self.avg_price)).quantize(
Decimal("0.00001"), ROUND_HALF_DOWN
)
return pips
Se chiudiamo la posizione per realizzare un guadagno o una perdita, dobbiamo utilizzare il seguente codice per close_position
, da inserire nel file position.py
:
def close_position(self):
ticker_cp = self.ticker.prices[self.currency_pair]
ticker_qh = self.ticker.prices[self.quote_home_currency_pair]
if self.position_type == "long":
remove_price = ticker_cp["ask"]
qh_close = ticker_qh["bid"]
else:
remove_price = ticker_cp["bid"]
qh_close = ticker_qh["ask"]
self.update_position_price()
# Calcolo dele PnL
pnl = self.calculate_pips() * qh_close * self.units
return pnl.quantize(Decimal("0.01", ROUND_HALF_DOWN))
In primo luogo otteniamo i prezzi denaro e lettera sia per la coppia di valute negoziata che per la coppia di valute di base (“quote/home”). Ad esempio, per un conto denominato in EUR, dove stiamo negoziando GBP/USD, dobbiamo ottenere i prezzi per “USD/EUR”, poiché GBP è la valuta di base e USD è la quotazione.
In questa fase controlliamo se la posizione stessa è una posizione long o short e quindi calcoliamo il “prezzo di rimozione” per la coppia negoziata e il “prezzo di rimozione” per la coppia quote/home, che calcolati rispettivamente da remove_price
e qh_close
.
Quindi aggiorniamo i prezzi correnti e medi all’interno della posizione e infine calcoliamo il P&L moltiplicando i pip, il prezzo di rimozione per quote/home e quindi il numero di unità che stiamo chiudendo.
Abbiamo completamente eliminato la necessità di valutare la “esposizione”, che era una variabile ridondante. Questa formula fornisce quindi correttamente il P&L rispetto a qualsiasi scambio di coppie di valute (non denominate in JPY).
Revisione della posizione e gestione del portafoglio
Oltre alla possibilità di negoziare più coppie di valute, vediamo come perfezionare la logica in cui Position
e Portfolio
“condividono” la responsabilità di aprire e chiudere le posizioni, nonché di aggiungere e sottrarre unità. In particolare, dobbiamo spostare molto del codice di gestione della posizione da portfolio.py
a position.py
.
Questo è più naturale poiché la posizione dovrebbe prendersi cura di se stessa e non delegarla al portafoglio!
In particolare, dobbiamo creare o migrare i metodi add_units
, remove_units
e close_position
:
def add_units(self, units):
cp = self.ticker.prices[self.currency_pair]
if self.position_type == "long":
add_price = cp["ask"]
else:
add_price = cp["bid"]
new_total_units = self.units + units
new_total_cost = self.avg_price * self.units + add_price * units
self.avg_price = new_total_cost / new_total_units
self.units = new_total_units
self.update_position_price()
def remove_units(self, units):
dec_units = Decimal(str(units))
ticker_cp = self.ticker.prices[self.currency_pair]
ticker_qh = self.ticker.prices[self.quote_home_currency_pair]
if self.position_type == "long":
remove_price = ticker_cp["ask"]
qh_close = ticker_qh["bid"]
else:
remove_price = ticker_cp["bid"]
qh_close = ticker_qh["ask"]
self.units -= dec_units
self.update_position_price()
# Calcolo dele PnL
pnl = self.calculate_pips() * qh_close * dec_units
return pnl.quantize(Decimal("0.01", ROUND_HALF_DOWN))
def close_position(self):
ticker_cp = self.ticker.prices[self.currency_pair]
ticker_qh = self.ticker.prices[self.quote_home_currency_pair]
if self.position_type == "long":
remove_price = ticker_cp["ask"]
qh_close = ticker_qh["bid"]
else:
remove_price = ticker_cp["bid"]
qh_close = ticker_qh["ask"]
self.update_position_price()
# Calcolo dele PnL
pnl = self.calculate_pips() * qh_close * self.units
return pnl.quantize(Decimal("0.01", ROUND_HALF_DOWN))
Portfolio
sono state ridotte. In particolare, i metodi add_new_position
, add_position_units
, remove_position_units
e close_position
sono stati modificati a seguito dello spostamento del calcolo all’interno dell’oggetto Position
:
def add_new_position(self, position_type, currency_pair, units, ticker):
ps = Position(
self.home_currency, position_type,
currency_pair, units, ticker
)
self.positions[currency_pair] = ps
def add_position_units(self, currency_pair, units):
if currency_pair not in self.positions:
return False
else:
ps = self.positions[currency_pair]
ps.add_units(units)
return True
def remove_position_units(self, currency_pair, units):
if currency_pair not in self.positions:
return False
else:
ps = self.positions[currency_pair]
pnl = ps.remove_units(units)
self.balance += pnl
return True
def close_position(self, currency_pair):
if currency_pair not in self.positions:
return False
else:
ps = self.positions[currency_pair]
pnl = ps.close_position()
self.balance += pnl
del[self.positions[currency_pair]]
return True
In sostanza, tutti i metodi (a parte add_new_position
) controllano semplicemente se la posizione esiste per quella coppia di valute e quindi chiamano il corrispondente metodo in Position
, tenendo conto del profitto se necessario.
Strategia di crossover della media mobile
In DataTrading abbiamo già descritto una strategia Moving Average Crossover, nel contesto del mercato azionario. È una strategia utile come banco di prova del sistema perché i calcoli sono facile da replicare, anche a mano (almeno a frequenze più basse!), al fine di verificare che il backtester si stia comportando come dovrebbe.
L’idea di base della strategia è la seguente:
- Vengono creati due filtri separati di media mobile semplici, con periodi variabili della finestra, di una particolare serie temporale.
- I segnali di acquisto dell’asset si verificano quando la media mobile più breve supera la media mobile più lunga.
- Se la media più lunga successivamente supera la media più breve, l’asset viene venduto.
La strategia funziona bene quando una serie temporale entra in un periodo di forte tendenza e poi lentamente inverte la tendenza.
L’implementazione è semplice. In primo luogo, implementiamo un metodo calc_rolling_sma
che ci consente di utilizzare in modo più efficiente il calcolo SMA del periodo di tempo precedente per generare quello nuovo, senza dover ricalcolare completamente l’SMA in ogni fase.
In secondo luogo, generiamo segnali in due casi. Nel primo caso generiamo un segnale se la SMA breve supera la SMA lunga e non siamo long nella coppia di valute. Nel secondo caso generiamo un segnale se la SMA lunga supera la SMA breve e siamo già long nello strumento.
In questo esempio abbiamo impostato il periodo della finestra a 500 tick per la SMA breve e 2.000 tick per la SMA lunga. Ovviamente in un ambiente di produzione questi parametri devono essere ottimizzati, ma funzionano bene per i nostri scopi di test.
class MovingAverageCrossStrategy(object):
"""
Una strategia base di Moving Average Crossover che genera
due medie mobili semplici (SMA), con finestre predefinite
di 500 tick per la SMA breve e 2.000 tick per la SMA
lunga.
La strategia è "solo long" nel senso che aprirà solo una
posizione long una volta che la SMA breve supera la SMA
lunga. Chiuderà la posizione (prendendo un corrispondente
ordine di vendita) quando la SMA lunga incrocia nuovamente
la SMA breve.
La strategia utilizza un calcolo SMA a rotazione per
aumentare l'efficienza eliminando la necessità di chiamare due
calcoli della media mobile completa su ogni tick.
"""
def __init__(
self, pairs, events,
short_window=500, long_window=2000
):
self.pairs = pairs
self.events = events
self.ticks = 0
self.invested = False
self.short_window = short_window
self.long_window = long_window
self.short_sma = None
self.long_sma = None
def calc_rolling_sma(self, sma_m_1, window, price):
return ((sma_m_1 * (window - 1)) + price) / window
def calculate_signals(self, event):
if event.type == 'TICK':
price = event.bid
if self.ticks == 0:
self.short_sma = price
self.long_sma = price
else:
self.short_sma = self.calc_rolling_sma(
self.short_sma, self.short_window, price
)
self.long_sma = self.calc_rolling_sma(
self.long_sma, self.long_window, price
)
# Si avvia la strategia solamente dopo aver creato una accurata
# finestra di breve periodo
if self.ticks > self.short_window:
if self.short_sma > self.long_sma and not self.invested:
signal = SignalEvent(self.pairs[0], "market", "buy", event.time)
self.events.put(signal)
self.invested = True
if self.short_sma < self.long_sma and self.invested:
signal = SignalEvent(self.pairs[0], "market", "sell", event.time)
self.events.put(signal)
self.invested = False
self.ticks += 1
Backtester a thread singolo
Un altro cambiamento importante è modificare il componente del backtest in modo da essere a singolo thread, anziché multi-thread.
Dobbiamo prevede questa modifica perché è molto complesso sincronizzare i thread da eseguire in un modo simile a quello che si avrebbe nel trading live senza introdurre errori e bias che comprometto i risultati del backtest. In particolare, con un backtester multi-thread si hanno i prezzi di entrata e di uscita molto irrealistici, perchè si verificano tipicamente dopo alcune ore (virtuali) l’effettiva ricezione del tick.
Per evitare questa criticità è sufficiente incorporare lo streaming dell’oggetto TickEvent
nel ciclo di backtest, come implementato nel seguente frammento di backtest.py
:
def backtest(events, ticker, strategy, portfolio,
execution, heartbeat, max_iters=200000
):
"""
Esegue un ciclo while infinito che esegue il polling
della coda degli eventi e indirizza ogni evento al
componente della strategia del gestore di esecuzione.
Il ciclo si fermerà quindi per "heartbeat" secondi
e continuerà fino a quando si supera il numero massimo
di iterazioni.
"""
iters = 0
while True and iters < max_iters:
ticker.stream_next_tick()
try:
event = events.get(False)
except queue.Empty:
pass
else:
if event is not None:
if event.type == 'TICK':
strategy.calculate_signals(event)
elif event.type == 'SIGNAL':
portfolio.execute_signal(event)
elif event.type == 'ORDER':
execution.execute_order(event)
time.sleep(heartbeat)
iters += 1
portfolio.output_results()
Da notare la linea ticker.stream_next_tick()
. Questo metodo viene chiamato prima di un polling della coda degli eventi e quindi ci assicuriamo che un nuovo evento tick venga elaborato prima che la coda venga nuovamente interrogata.
In questo modo un segnale è eseguito all’arrivo di nuovi dati di mercato, anche se c’è un certo ritardo nel processo di esecuzione degli ordini a causa dello slippage.
Abbiamo anche impostato un valore max_iters
che controlla per quanto tempo continua il ciclo di backtest. In pratica questo dovrà essere abbastanza grande quando si tratta di più valute in più giorni. In questo caso è stato impostato su un valore predefinito che consente di elaborare i dati di un singolo giorno di una coppia di valute.
Il metodo stream_next_tick
della classe del price handler è simile a, stream_to_queue
tranne per il fatto che chiama manualmente il metodo iterativo next()
, invece di eseguire il tick streaming in un ciclo for:
def stream_next_tick(self):
"""
Il Backtester è ora passato ad un modello a un thread singolo
in modo da riprodurre completamente i risultati su ogni esecuzione.
Ciò significa che il metodo stream_to_queue non può essere usato
ed è sostituito dal metodo stream_next_tick.
Questo metodo viene chiamato dalla funzione di backtesting, esterna
a questa classe e inserisce un solo tick nella coda, ed inoltre
aggiornare l'attuale bid / ask e l'inverso bid / ask.
"""
try:
index, row = self.all_pairs.next()
except StopIteration:
return
else:
self.prices[row["Pair"]]["bid"] = Decimal(str(row["Bid"])).quantize(
Decimal("0.00001", ROUND_HALF_DOWN)
)
self.prices[row["Pair"]]["ask"] = Decimal(str(row["Ask"])).quantize(
Decimal("0.00001", ROUND_HALF_DOWN)
)
self.prices[row["Pair"]]["time"] = index
inv_pair, inv_bid, inv_ask = self.invert_prices(row)
self.prices[inv_pair]["bid"] = inv_bid
self.prices[inv_pair]["ask"] = inv_ask
self.prices[inv_pair]["time"] = index
tev = TickEvent(row["Pair"], index, row["Bid"], row["Ask"])
self.events_queue.put(tev)
Da notare che si interrompe al ricevimento di un’eccezione StopIteration
. Ciò consente al codice di riprendere l’esecuzione anziché bloccarsi.
Visualizzazione risultati con Matplotlib
Dobbiamo anche creare uno script di output, utilizzando Matplotlib in modo molto semplice per visualizzare la curva di equity. Il file output.py
è inserito all’interno della directory backtest
di DTForex ed il codice è riportato di seguito:.
import os, os.path
import pandas as pd
import matplotlib.pyplot as plt
from settings import OUTPUT_RESULTS_DIR
if __name__ == "__main__":
"""
Un semplice script per visualizzare il grafico del bilancio del portfolio, o
"curva di equity", in funzione del tempo.
Richiede l'impostazione di OUTPUT_RESULTS_DIR nel settings del progetto.
"""
equity_file = os.path.join(OUTPUT_RESULTS_DIR, "equity.csv")
equity = pd.io.parsers.read_csv(
equity_file, header=True,
names=["time", "balance"],
parse_dates=True, index_col=0
)
equity["balance"].plot()
plt.show()
Da notare che settings.py
deve ora prevedere la nuova variabile OUTPUT_RESULTS_DIR
, che deve essere presente e valorizzata nelle impostazioni. In questo esempio abbiamo impostato a una directory temporanea fuori dalla struttura del progetto in modo da non aggiungere accidentalmente nessun risultato di backtest al codice base del progetto!
La curva di equity è costruita aggiungendo un valore di portafoglio (“balance”) a una lista di dizionari, con un dizionario corrispondente a una marca temporale.
Una volta completato il backtest, l’elenco dei dizionari viene convertito in un DataFrame di pandas e il metodo to_csv
viene utilizzato per l’output equity.csv
.
Questo script di output legge semplicemente il file e visualizza il grafico della colonna balance
del DataFrame.
Di seguito il codice per i metodi append_equity_row
e output_results
della classe Portfolio
:
def append_equity_row(self, time, balance):
d = {"time": time, "balance": balance}
self.equity.append(d)
def output_results(self):
filename = "equity.csv"
out_file = os.path.join(OUTPUT_RESULTS_DIR, filename)
df_equity = pd.DataFrame.from_records(self.equity, index='time')
df_equity.to_csv(out_file)
print
"Simulation complete and results exported to %s" % filename
Ogni volta che viene chiamato execute_signal
, si richiamata il metodo precedente e si aggiunge il valore di timestamp / saldo al membro equity
.
Alla fine del backtest viene chiamato output_results
che semplicemente converte l’elenco dei dizionari in un DataFrame e quindi l’output nella directory specificata in OUTPUT_RESULTS_DIR
.
Sfortunatamente, questo non è un modo particolarmente appropriato per creare una curva di equity poiché si verifica solo quando viene generato un segnale. Ciò significa che non tiene conto del P&L non realizzato .
Anche se questo è il modo in cui avviene realmente il trading (non si fa effettivamente un profitto/perdita fino a quando non si chiude una posizione!), Significa che la curva dell’equità rimarrà completamente piatta tra gli aggiornamenti del saldo del portafoglio. Peggio ancora, Matplotlib per impostazione predefinita esegue l’interpolazione lineare tra questi punti, fornendo così la falsa impressione del P&L non realizzato.
La soluzione a questo problema è creare un tracker P&L non realizzato per la classe Position
che si aggiorna correttamente ad ogni tick. Questo è un po ‘più costoso dal punto di vista computazionale, ma consente una curva di equity più utile e realistica. Descriveremo questa funzione in un prossimo articolo!
Prossimi Passi
La prossima funzionalità da prevedere per DTForex è la possibilità di effettuare il backtesting con dati relativi a molti giorni. Attualmente l’oggetto HistoricCSVPriceHandler
carica solo il valore di un singolo giorno di dati tick DukasCopy per qualsiasi coppia di valute specificata.
Per consentire backtest per periodi che coprono più giorni, sarà necessario caricare e trasmettere sequenzialmente un singolo giorno in modo da evitare di riempire la RAM con l’intera cronologia dei dati dei tick. Ciò richiederà una modifica al funzionamento del metodo stream_next_tick
. Una volta completato, consentirà il backtesting della strategia a lungo termine su più coppie.
Un altro compito è migliorare l’output della curva di equity Per calcolare una qualsiasi delle normali metriche di performance (come lo Sharpe Ratio ), avremo bisogno di calcolare i rendimenti percentuali in un determinato periodo di tempo. Tuttavia, ciò richiede di raggruppare i dati del tick in barre per calcolare un rendimento di periodo di tempo.
Tale binning deve avvenire su una frequenza di campionamento che è simile alla frequenza di negoziazione o lo Sharpe Ratio non rifletterà il vero rischio / rendimento della strategia. Questo raggruppamento non è un esercizio banale in quanto ci sono molti presupposti che contribuiscono a generare un “prezzo” per ogni campione.
Una volta completate queste due attività e acquisiti dati sufficienti, saremo in grado di eseguire il backtest di un’ampia gamma di strategie forex basate sui dati tick e di produrre curve azionarie al netto della maggior parte dei costi di transazione. Inoltre, sarà estremamente semplice testare queste strategie sul conto di paper trading fornito da OANDA.
Ciò dovrebbe consentire di prendere decisioni più precise sull’opportunità di eseguire una strategia, rispetto ai test effettuati con un sistema di backtesting più “orientato alla ricerca”.
Per il codice completo riportato in questo articolo, utilizzando il modulo di backtesting event-driven per il forex (DTForex) si può consultare il seguente repository di github:
https://github.com/datatrading-info/DTForex