Negli ultimi giorno sono stato impegnato a lavorare sul progetto open-source DTForex. Ho apportato alcuni utili miglioramenti e ho pensato di condividerli con questo nuovo articolo della serie dedicata al trading algoritmico sul mercato forex.
In particolare, ho apportato le seguenti modifiche, che descriveremo a lungo in questo articolo:
- Modifica dell’oggetto
Position
per correggere un errore nella gestione delle aperture e delle chiusure di una posizione - Aggiunta della funzionalità di gestione dei dati storici tramite il download di file di dati tick da DukasCopy
- Implementazione della prima versione di un backtester basato su eventi sulla base dei dati di tick giornalieri
Correzione degli errori di gestione della posizione
La prima modifica che introduciamo è una nuova logica per gestire gli ordini acquisto/vendita nell’oggetto Position
.
Inizialmente l’oggetto Position
è stato progettato in modo molto snello, delegando all’oggetto Portfolio
la maggior parte del lavoro per il calcolo dei prezzi di posizione
Tuttavia, questo ha aumentato inutilmente la complessità della classe Portfolio
, che rende il codice difficile da leggere e capirne la logica. Inoltre diventa particolarmente problematico quando si vuole implementare una gestione personalizzata del portafoglio senza doversi preoccupare della gestione delle posizioni “standard”.
Inoltre abbiamo verificato la presenza di un errore concettuale nella logica implementata: abbiamo mescolato l’acquisto e la vendita di ordini con essere in una posizione long o short. Questo significava il calcolo non corretto del P&L alla chiusura di una posizione il calcolo.
Abbiamo quindi modificato l’oggetto Position
per accettare i prezzi bid e ask, invece di “aggiungere” e “rimuovere” i prezzi, che erano originariamente determinati a monte dell’oggetto Position
tramite il Portfolio
. In questo modo l’oggetto Position
tiene traccia se siamo long o short e utilizza il corretto prezzo di bid/ask come valore di acquisto o di chiusura.
Abbiamo anche modificato gli unit test per riflettere la nuova interfaccia. Nonostante il fatto che queste modifiche richiedano del tempo per essere completate, fornisce una maggiore fiducia nei risultati. Ciò è particolarmente vero se consideriamo strategie più sofisticate.
Di seguito vediamo il codice del nuovo file position.py:
from decimal import Decimal, getcontext, ROUND_HALF_DOWN
class Position(object):
def __init__(
self, position_type, market,
units, exposure, bid, ask
):
self.position_type = position_type # Long or short
self.market = market
self.units = units
self.exposure = Decimal(str(exposure))
# Long or short
if self.position_type == "long":
self.avg_price = Decimal(str(ask))
self.cur_price = Decimal(str(bid))
else:
self.avg_price = Decimal(str(bid))
self.cur_price = Decimal(str(ask))
self.profit_base = self.calculate_profit_base(self.exposure)
self.profit_perc = self.calculate_profit_perc(self.exposure)
def calculate_pips(self):
getcontext.prec = 6
mult = Decimal("1")
if self.position_type == "long":
mult = Decimal("1")
elif self.position_type == "short":
mult = Decimal("-1")
return (mult * (self.cur_price - self.avg_price)).quantize(
Decimal("0.00001"), ROUND_HALF_DOWN
)
def calculate_profit_base(self, exposure):
pips = self.calculate_pips()
return (pips * exposure / self.cur_price).quantize(
Decimal("0.00001"), ROUND_HALF_DOWN
)
def calculate_profit_perc(self, exposure):
return (self.profit_base / exposure * Decimal("100.00")).quantize(
Decimal("0.00001"), ROUND_HALF_DOWN
)
def update_position_price(self, bid, ask, exposure):
if self.position_type == "long":
self.cur_price = Decimal(str(bid))
else:
self.cur_price = Decimal(str(ask))
self.profit_base = self.calculate_profit_base(exposure)
self.profit_perc = self.calculate_profit_perc(exposure)
Gestione dei dati storici dei tick
La successiva importante funzionalità da prevedere all’interno di un completo sistema di trading è l’abilità di effettuare un backtesting ad alta frequenza .
Un prerequisito essenziale consiste nella creazione di un archivio per i dati di tick delle coppie di valute. Tali dati possono diventare piuttosto grandi. Ad esempio, i dati di tick di un giorno per una singola coppia di valute da DukasCopy in formato CSV ha una dimensione di circa 3,3 Mb.
Si può quindi facilmente intuire come il backtest intraday di oltre 20 coppie di valute, su più anni, con significative variazioni dei parametri, può portare rapidamente a gigabyte di dati che devono essere elaborati.
Tali dati necessitano di una gestione speciale, compresa la creazione di un database di titoli, al alte prestazioni e completamente automatizzato. Discuteremo di questo sistema in futuro, ma per ora i file CSV saranno sufficienti per i nostri scopi.
Per mettere sullo stesso piano i dati storici di backtest e di live streaming, dobbiamo creare una classe atratta di gestione dei prezzi chiamata PriceHandler
.
PriceHandler
è un esempio di una classe base astratta, dove si prevede che qualsiasi classe ereditata deve sovrascrivere i metodi “puramente virtuali”. L’unico metodo obbligatorio è stream_to_queue
, che viene chiamato tramite il thread dei prezzi quando il sistema viene attivato (live trading o backtest). La funzione stream_to_queue
recuepra i dati sui prezzi da una sorgente che dipende dalla particolare implementazione della classe, quindi utilizza il metodo .put()
della libreria queue per aggiungere un oggetto TickEvent
.
In questo modo tutte le sottoclassi di PriceHandler
possono interfacciarsi con il resto del sistema di trading senza che i componenti rimanenti sappiano (o si preoccupino!) di come vengono generati i dati sui prezzi.
Questo ci offre una notevole flessibilità per collegare file flat, archivi di file come HDF5, database relazionali come PostgreSQL o anche risorse esterne come siti Web, al motore di backtesting o di trading live.
Di seguito il codice dell’oggetto PriceHandler
:
from abc import ABCMeta, abstractmethod
..
..
class PriceHandler(object):
"""
PriceHandler è una classe base astratta che fornisce un'interfaccia per
tutti i successivi gestori di dati (ereditati) (sia live che storici).
L'obiettivo di un oggetto PriceHandler (derivato) è produrre un insieme di
bid / ask / timestamp "tick" per ogni coppia di valute e inserirli
una coda di eventi.
Questo replicherà il modo in cui una strategia live funzionerebbe con i dati
dei tick che sarebbero trasmessi in streaming tramite un broker.
"""
__metaclass__ = ABCMeta
@abstractmethod
def stream_to_queue(self):
"""
Trasmette una sequenza di eventi di dati tick (timestamp, bid, ask)
come tuple alla coda degli eventi.
"""
raise NotImplementedError("Should implement stream_to_queue()")
Abbiamo bisogno inoltre di una sottoclasse chiamata HistoricCSVPriceHandler
, che preveda due metodi.
Il primo è chiamato _open_convert_csv_files
e utilizza Pandas per caricare un file CSV in un DataFrame e formare le colonne Bid e Ask. Il secondo metodo, stream_to_queue
scorre attraverso questo DataFrame e ad ogni iterazione aggiunge un oggetto TickEvent
alla coda degli eventi.
Inoltre, i prezzi correnti di bid/ask correnti impostati a livello di classe, e vengono successivamente interrogati tramite l’oggetto Portfolio
.
Di seguito il codice di HistoricCSVPriceHandler
:
class HistoricCSVPriceHandler(PriceHandler):
"""
HistoricCSVPriceHandler è progettato per leggere un file CSV di
dati tick per ciascuna coppia di valute richiesta e trasmetterli in streaming
alla coda degli eventi.
"""
def __init__(self, pairs, events_queue, csv_dir):
"""
Inizializza il gestore dati storici richiedendo
la posizione dei file CSV e un elenco di simboli.
Si presume che tutti i file siano nella forma
'pair.csv', dove " pair " è la coppia di valute. Per
EUR/USD il nome del file è EURUSD.csv.
Parametri:
pairs - L'elenco delle coppie di valute da ottenere.
events_queue - La coda degli eventi a cui inviare i tick.
csv_dir: percorso di directory assoluto per i file CSV.
"""
self.pairs = pairs
self.events_queue = events_queue
self.csv_dir = csv_dir
self.cur_bid = None
self.cur_ask = None
def _open_convert_csv_files(self):
"""
Apre i file CSV dalla directory su disco, converte i dati
in un DataFrame di pandas con un dizionario di coppie.
"""
pair_path = os.path.join(self.csv_dir, '%s.csv' % self.pairs[0])
self.pair = pd.io.parsers.read_csv(
pair_path, header=True, index_col=0, parse_dates=True,
names=("Time", "Ask", "Bid", "AskVolume", "BidVolume")
).iterrows()
def stream_to_queue(self):
self._open_convert_csv_files()
for index, row in self.pair:
self.cur_bid = Decimal(str(row["Bid"])).quantize(
Decimal("0.00001", ROUND_HALF_DOWN)
)
self.cur_ask = Decimal(str(row["Ask"])).quantize(
Decimal("0.00001", ROUND_HALF_DOWN)
)
tev = TickEvent(self.pairs[0], index, row["Bid"], row["Ask"])
self.events_queue.put(tev)
Ora che abbiamo una funzionalità per gestire i dati storici di base, siamo in grado di creare un backtester completamente guidato dagli eventi.
Funzionalità di BackTesting Event-Driven
Nel trading algoritmico è fondamentale utilizzare un motore di backtesting che si avvicina il più possibile ad un motore di trading live. Ciò è dovuto al fatto che una sofisticata gestione dei costi di transazione, soprattutto ad alta frequenza, è spesso il fattore determinante per stabilire se una strategia sarà redditizia o meno.
Tale gestione dei costi di transazione ad alta frequenza può essere realmente simulata solo con l’uso di un motore di esecuzione basato su eventi multi-thread. Sebbene un tale sistema sia significativamente più complicato di un basilare backtester vettorializzato di “ricerca” di P&L, potrà simulare più fedelmente il comportamento reale e ci consentirà di prendere decisioni migliori nella scelta delle strategie.
Inoltre, possiamo iterare più rapidamente col passare del tempo, perché non dovremo passare continuamente dalla strategia di “livello di ricerca” alla strategia di “livello di implementazione” poiché sono la stessa cosa. Gli unici due componenti che cambiano sono la classe di streaming dei prezzi e la classe di esecuzione. Tutto il resto sarà identico tra i sistemi di backtesting e live trading.
In effetti, questo significa che il nuovo codice backtest.py
è quasi identico al codice trading.py
che gestisce il trading real o il trading practice con OANDA. Abbiamo solo bisogno di prevedere l’importazione delle classi HistoricPriceCSVHandler
e SimulatedExecution
al posto delle classi StreamingPriceHandler
e OANDAExecutionHandler
. Tutto il resto rimane lo stesso.
Di seguito il codice di backtest.py
:
import copy, sys
import queue
import threading
import time
from decimal import Decimal, getcontext
from execution import SimulatedExecution
from portfolio import Portfolio
from settings import settings
from strategy import TestStrategy
from data.price import HistoricCSVPriceHandler
def trade(events, strategy, portfolio, execution, heartbeat):
"""
Esegue un ciclo while infinito che esegue il polling
della coda degli eventi e indirizza ogni evento al
componente della strategia del gestore di esecuzione.
Il ciclo si fermerà quindi per "heartbeat" secondi
e continuerà.
"""
while True:
try:
event = events.get(False)
except queue.Empty:
pass
else:
if event is not None:
if event.type == 'TICK':
strategy.calculate_signals(event)
elif event.type == 'SIGNAL':
portfolio.execute_signal(event)
elif event.type == 'ORDER':
execution.execute_order(event)
time.sleep(heartbeat)
if __name__ == "__main__":
# Imposta il numero di decimali a 2
getcontext().prec = 2
heartbeat = 0.0 # mezzo secondo tra ogni polling
events = queue.Queue()
equity = settings.EQUITY
# Carica il file CSV dei dati storici
pairs = ["EURUSD"]
csv_dir = settings.CSV_DATA_DIR
if csv_dir is None:
print("No historic data directory provided - backtest terminating.")
sys.exit()
# Crea la classe di streaming dei dati storici di tick
prices = HistoricCSVPriceHandler(pairs, events, csv_dir)
# Crea il generatore della strategia/signale, passando lo
# strumento e la coda degli eventi
strategy = TestStrategy(pairs[0], events)
# Crea l'oggetto portfolio per tracciare i trade
portfolio = Portfolio(prices, events, equity=equity)
# Crea il gestore di esecuzione simulato
execution = SimulatedExecution()
# Crea due thread separati: uno per il ciclo di trading
# e un'altro per la classe di streaming dei prezzi di mercato
trade_thread = threading.Thread(
target=trade, args=(
events, strategy, portfolio, execution, heartbeat
)
)
price_thread = threading.Thread(target=prices.stream_to_queue, args=[])
# Avvia entrambi i thread
trade_thread.start()
price_thread.start()
L’utilizzo di un sistema di esecuzione multi-thread per il backtest ha il principale svantaggio di non essere deterministico. Ciò significa che eseguendo più volte il backtest degli stessi dati si avranno risultati differenti, anche se piccole.
Questo accade perché non è possiamo garantire lo stesso ordine delle istruzioni eseguite dai thread, per esecuzioni differenti della stessa simulazione. Ad esempio, quando si inseriscono elementi nella coda, si potrebbero ottenere nove oggetti TickEvent
inseriti nella coda nel backtest n.1, ma potremmo ottenerne undici nel backtest n.2.
Poiché l’oggetto Strategy
esegue il polling della coda degli oggetti TickEvent
, vedrà prezzi bid/ask diversi nelle due serie e quindi aprirà una posizione a prezzi bid/ask diversi. Ciò porterà a (piccole) differenze nei rendimenti.
Questo è un grosso problema? Non credo proprio. Non solo è così che funzionerà il sistema live, ma ci consente anche di sapere quanto sia sensibile la nostra strategia alla velocità di ricezione dei dati. Ad esempio, se calcoliamo la varianza dei rendimenti in tutte i backtest eseguiti con gli stessi dati, avremo un’idea di quanto la strategia sia sensibile alla latenza dei dati.
Idealmente, vogliamo una strategia che abbia una piccola varianza in ciascuna delle nostre serie. Tuttavia, se si ha una varianza elevata, significa che dovremmo fare molta attenzioni a mettere live questa strategia.
Potremmo persino eliminare completamente il problema del determinismo semplicemente utilizzando un thread singolo nel nostro codice di backtest (come per il backtester event-driven per le azioni di DataTrading). Tuttavia, questo ha lo svantaggio di ridurre il realismo con il sistema live. Questi sono i dilemmi di simulazione di trading ad alta frequenza!
Prossimi Passi
Un altro problema del sistema che bisogna risolvere è la gestione di solo una valuta di base di EUR e una singola coppia di valute, EUR/USD.
Ora che la gestione Position
è stata sostanzialmente modificata, sarà molto più semplice estenderla per gestire più coppie di valute. Questo è il passaggio successivo.
A quel punto saremo in grado di provare strategie multi-coppia di valute ed eventualmente introdurre Matplotlib per rappresentare graficamente i risultati.
Per il codice completo riportato in questo articolo, utilizzando il modulo di backtesting event-driven per il forex (DTForex) si può consultare il seguente repository di github:
https://github.com/datatrading-info/DTForex