Motore di Backtesting con Python – Parte IX (Connessione con IB)

È passato un po ‘di tempo da quando abbiamo la serie di articoli realtivi ad un ambiente di backtesting basato sugli eventi, che abbiamo iniziato a discutere in questo articolo. Nella Parte VI è stato descritto come implementare un modello di ExecutionHandler funzionante per una simulazione storica di backtesting. In questo si vuole implementare il gestore dell’API di Interactive Brokers in modo da poter utilizzare l’ExecutionHandler per il live trading.

In precedenza abbiamo visto come come scaricare Trader Workstation e creare un account demo di Interactive Brokers e su come creare un’interfaccia di base verso l’API IB usando IbPy. Questo articolo descrivere come collegare l’interfaccia IbPy all’interno di un sistema event-driven, in modo tale che, quando accoppiato con un feed di dati di mercato real-time, costituirà la base per un sistema di esecuzione automatizzato.

L’idea alla base della classe IBExecutionHandler consiste nel ricevere istanze OrderEvent dalla coda degli eventi ed eseguirli direttamente verso l’API di ordine di Interactive Brokers utilizzando la libreria IbPy. La classe gestirà anche i messaggi “Server Response” inviati in rispota dalla stessa API. In questa fase, l’unica azione intrapresa sarà creare le corrispondenti istanze FillEvent corrispondenti che verranno quindi ritrasferite nella coda degli eventi.

La stessa classe può essere facilmente resa più complessa, includendo una logica di ottimizzazione dell’esecuzione e una sofisticata gestione degli errori. Tuttavia, in questa fase è opportuno mantenerla relativamente semplice in modo che si possa capire le principali funzionalità ed estenderle nella direzione che si adatta al tuo particolare stile di trading.

Implementazione in Python

Come sempre, il primo passo è creare il file Python e importare le librerie necessarie. Il file si chiama ib_execution.py e risiede nella stessa directory degli altri file event-driven. Importiamo le necessarie librerie per la gestione della data / ora, gli oggetti IbPy e i specifici oggetti Event gestiti da IBExecutionHandler:
            # ib_execution.py

import datetime
import time

from ib.ext.Contract import Contract
from ib.ext.Order import Order
from ib.opt import ibConnection, message

from event import FillEvent, OrderEvent
from execution import ExecutionHandler
        

A questo punto è necessario definire la classe IBExecutionHandler. Innanzitutto il costruttore __init__ richiede in input la coda degli eventi. Prevende inoltre la specifica di order_routing, che viene impostata a “SMART” come default. Nel caso l’exchange abbia specifici requisiti, questi possono essere specificati in questo costruttore. Inoltre la currency predefinita è stata impostata sui Dollari USA.

All’interno del metodo si crea un dizionario fill_dict, necessario per l’utilizzo nella generazione delle istanze di FillEvent. Si prevede anche un oggetto di connessione tws_conn per archiviare le informazioni di connessione verso l’API di Interactive Brokers. Inoltre si crea un order_id iniziale, che tiene traccia di tutti gli ordini successivi per evitare duplicati. Infine si registra il gestore dei messaggi (che sarà definito dettagliatamente più avanti):

            # ib_execution.py

class IBExecutionHandler(ExecutionHandler):
    """
    Gestisce l'esecuzione degli ordini tramite l'API di Interactive 
    Brokers, da utilizzare direttamente sui conti reali durante il 
    live trading.
    """

    def __init__(self, events,
                 order_routing="SMART",
                 currency="USD"):
        """
        Inizializza l'instanza di IBExecutionHandler.
        """
        self.events = events
        self.order_routing = order_routing
        self.currency = currency
        self.fill_dict = {}

        self.tws_conn = self.create_tws_connection()
        self.order_id = self.create_initial_order_id()
        self.register_handlers()
        

L’API di IB utilizza un sistema di eventi basato sui messaggi che consente alla nostra classe di rispondere in modo specifico a determinati messaggi, in analogia allo stesso ambiente di backtesing event-driven stesso. Non si include nessuna gestione degli errori reali (a fini di brevità), ad eccezione dell’output al terminale tramite il metodo _error_handler.

Il metodo _reply_handler, d’altra parte, viene utilizzato per determinare se è necessario creare un’istanza FillEvent. Il metodo verifica se è stato ricevuto un messaggio “openOrder” e controlla se è presente una voce fill_dict relativa a questo particolare orderId. In caso contrario, ne viene creata una.

Inoltre, se il metodo verifica la presenta di un messaggio “orderStatus” e nel caso quel particolare messaggio indichi che un ordine è stato eseguito, allora richiama la funzione create_fill per creare un FillEvent. Si invia anche un messaggio al terminale per scopi di logging / debug:

            # ib_execution.py 
   
    def _error_handler(self, msg):
        """
        Gestore per la cattura dei messagi di errori
        """
        # Al momento non c'è gestione degli errori.
        print
        "Server Error: %s" % msg


    def _reply_handler(self, msg):
        """
        Gestione delle risposte dal server
        """
        # Gestisce il processo degli orderId degli ordini aperti
        if msg.typeName == "openOrder" and \
                msg.orderId == self.order_id and \
                not self.fill_dict.has_key(msg.orderId):
            self.create_fill_dict_entry(msg)
        # Gestione dell'esecuzione degli ordini (Fills)
        if msg.typeName == "orderStatus" and \
                msg.status == "Filled" and \
                self.fill_dict[msg.orderId]["filled"] == False:
            self.create_fill(msg)
        print
        "Server Response: %s, %s\n" % (msg.typeName, msg)
        
Il seguente metodo, create_tws_connection, crea una connessione all’API di IB usando l’oggetto ibConnection di IbPy. Utilizza la porta predefinita 7496 e un clientId predefinito a 10. Una volta creato l’oggetto, viene richiamato il metodo di connessione per eseguire la connessione:
            # ib_execution.py
    
    def create_tws_connection(self):
        """
        Collegamento alla Trader Workstation (TWS) in esecuzione 
        sulla porta standard 7496, con un clientId di 10.
        Il clientId è scelto da noi e avremo bisogno ID separati 
        sia per la connessione di esecuzione che per la connessione
        ai dati di mercato, se quest'ultima è utilizzata altrove.
        """
        tws_conn = ibConnection()
        tws_conn.connect()
        return tws_conn
        
Per tenere traccia degli ordini separati (ai fini del tracciamento degli eseguiti) viene utilizzato il metodo create_initial_order_id. E’ stato impostato su “1”, ma un approccio più sofisticato prevede la gestione della query IB per conoscere ed utilizzare l’ultimo ID disponibile. Si può sempre reimpostare l’ID dell’ordine corrente dell’API tramite il pannello Trader Workstation –> Configurazione globale –> Impostazioni API:
            # ib_execution.py
   
    def create_initial_order_id(self):
        """
        Crea l'iniziale ID dell'ordine utilizzato da Interactive
        Broker per tenere traccia degli ordini inviati.
        """
        # Qui c'è spazio per una maggiore logica, ma 
        # per ora useremo "1" come predefinito.
        return 1
        
Il seguente metodo, register_handlers, registra semplicemente i metodi per la gestione degli errori e delle risposte, definiti in precedenza con la connessione TWS:
            # ib_execution.py
    
    def register_handlers(self):
        """
        Registra le funzioni di gestione di errori e dei 
        messaggi di risposta dal server.
        """
        # Assegna la funzione di gestione degli errori definita
        # sopra alla connessione TWS 
        self.tws_conn.register(self._error_handler, 'Error')

        # Assegna tutti i messaggi di risposta del server alla
        # funzione reply_handler definita sopra
        self.tws_conn.registerAll(self._reply_handler)
        
Come descritto nel precedente tutorial relativo all’uso di IbPy, si deve creare un’istanza di Contract ed associarla a un’istanza di Order, che verrà inviata all’API di IB. Il seguente metodo, create_contract, genera la prima componente di questa coppia. Si aspetta in input un simbolo ticker, un tipo di sicurezza (ad esempio, azioni o futures), un exchange primario e una valuta. Restituisce l’istanza di Contract:
            # ib_execution.py
    
    def create_contract(self, symbol, sec_type, exch, prim_exch, curr):
        """
        Crea un oggetto Contract definendo cosa sarà
        acquistato, in quale exchange e in quale valuta.

        symbol - Il simbolo del ticker per il contratto
        sec_type - Il tipo di asset per il contratto ("STK" è "stock")
        exch - La borsa su cui eseguire il contratto
        prim_exch - Lo scambio principale su cui eseguire il contratto
        curr - La valuta in cui acquistare il contratto
        """
        contract = Contract()
        contract.m_symbol = symbol
        contract.m_secType = sec_type
        contract.m_exchange = exch
        contract.m_primaryExch = prim_exch
        contract.m_currency = curr
        return contract
        
Il metodo create_order genera la seconda componente della coppia, ovvero l’istanza di Order. Questo metodo prevede in input un tipo di ordine (ad es. market o limit), una quantità del bene da scambiare e una “posizione” (acquisto o vendita). Restituisce l’istanza di Order:
            # ib_execution.py
    
    def create_order(self, order_type, quantity, action):
        """
        Crea un oggetto Ordine (Market/Limit) per andare long/short.

        order_type - "MKT", "LMT" per ordini a mercato o limite
        quantity - Numero intero di asset dell'ordine
        action - 'BUY' o 'SELL'
        """
        order = Order()
        order.m_orderType = order_type
        order.m_totalQuantity = quantity
        order.m_action = action
        return order
        
Per evitare la duplicazione delle istanze di FillEvent per un particolare ID ordine, si utilizza un dizionario chiamato fill_dict per memorizzare le chiavi che corrispondono a particolari ID ordine. Quando è stato generato un eseguito, la chiave “fill” di una voce per un particolare ID ordine è impostata su True. Nel caso si riceva un successivo messaggio “Server Response” da IB che dichiara che un ordine è stato eseguito (ed è un messaggio duplicato) non si creerà un nuovo eseguito. Il seguente metodo create_fill_dict_entry implementa questa logica:
            # ib_execution.py
    
    def create_fill_dict_entry(self, msg):
        """
        Crea una voce nel dizionario Fill che elenca gli orderIds
        e fornisce informazioni sull'asset. Ciò è necessario
        per il comportamento guidato dagli eventi del gestore
        dei messaggi del server IB.
        """
        self.fill_dict[msg.orderId] = {
            "symbol": msg.contract.m_symbol,
            "exchange": msg.contract.m_exchange,
            "direction": msg.order.m_action,
            "filled": False
        }
        

 

Il metodo create_fill si occupa di creare effettivamente l’istanza di FillEvent e la inserisce all’interno della coda degli eventi:

            # ib_execution.py
    
    def create_fill(self, msg):
        """
        Gestisce la creazione del FillEvent che saranno
        inseriti nella coda degli eventi successivamente
        alla completa esecuzione di un ordine.
        """
        fd = self.fill_dict[msg.orderId]

        # Preparazione dei dati di esecuzione
        symbol = fd["symbol"]
        exchange = fd["exchange"]
        filled = msg.filled
        direction = fd["direction"]
        fill_cost = msg.avgFillPrice

        # Crea un oggetto di Fill Event
        fill_event = FillEvent(
            datetime.datetime.utcnow(), symbol,
            exchange, filled, direction, fill_cost
        )

        # Controllo per evitare che messaggi multipli non
        # creino dati addizionali.
        self.fill_dict[msg.orderId]["filled"] = True

        # Inserisce il fill event nella coda di eventi
        self.events.put(fill_event)
        

Dopo aver implementato tutti i metodi precedenti, resta solamente da sviluppare il metodo execute_order della classe base astratta ExecutionHandler. Questo metodo esegue effettivamente il posizionamento dell’ordine tramite l’API di IB.

Si verifica innanzitutto che l’evento ricevuto con questo metodo sia realmente un OrderEvent e quindi prepara gli oggetti Contract e Order con i rispettivi parametri. Una volta che sono stati creati entrambi, il metodo placeOrder dell’oggetto di connessione viene richiamato con associato a order_ID.

È estremamente importante chiamare il metodo time.sleep(1) per garantire che l’ordine sia effettivamente trasmesso ad IB. La rimozione di questa linea può causare comportamenti incoerenti dell’API, e perfino malfunzionamenti!

Infine, si incrementa l’ID ordine al fine di evitare la duplicazione degli ordini:

            # ib_execution.py
    
    def execute_order(self, event):
        """
        Crea il necessario oggetto ordine InteractiveBrokers
        e lo invia a IB tramite la loro API.

        I risultati vengono quindi interrogati per generare il 
        corrispondente oggetto Fill, che viene nuovamente posizionato
        nella coda degli eventi.

        Parametri:
        event - Contiene un oggetto Event con informazioni sull'ordine.
        """
        if event.type == 'ORDER':
            # Prepara i parametri per l'ordine dell'asset
            asset = event.symbol
            asset_type = "STK"
            order_type = event.order_type
            quantity = event.quantity
            direction = event.direction

            # Crea un contratto per Interactive Brokers tramite
            # l'evento Order in inuput
            ib_contract = self.create_contract(
                asset, asset_type, self.order_routing,
                self.order_routing, self.currency
            )

            # Crea un ordine per Interactive Brokers tramite
            # l'evento Order in inuput
            ib_order = self.create_order(
                order_type, quantity, direction
            )

            # Usa la connessione per inviare l'ordine a IB
            self.tws_conn.placeOrder(
                self.order_id, ib_contract, ib_order
            )

            # NOTE: questa linea è cruciale
            # Questo assicura che l'ordina sia effettivamente trasmesso!
            time.sleep(1)

            # Incrementa l'ordene ID per questa sessione
            self.order_id += 1
        

Questa classe costituisce la base per gestione dell’esecuzione verso Interactive Brokers e può essere utilizzata al posto del gestore dell’esecuzione simulata, che è adatto solo per il backtesting. Prima che il gestore di IB possa essere utilizzato è necessario creare un gestore del feed dei dati di mercato in tempo reale che deve sostituire il gestore del feed dei dati storici utilizzato nel backtesting.

Con questo approccio è possibile riutilizzare la maggior parte delle componenti di un sistema di backtesting per un sistema live, in modo da garantire che il codice “swap out” sia ridotto al minimo e quindi assicurare un comportamento simile, se non identico, tra i due sistemi.

 

Per il codice completo riportato in questo articolo, utilizzando il modulo di backtesting event-driven DataBacktest si può consultare il seguente repository di github:
https://github.com/datatrading-info/DataBacktest

Motore di Backtesting con Python – Parte VIII (Backtest)

Backtesting-Event-Driven-Python-Trading-Algoritmico - Parte VIII

Siamo ora in grado di creare la classe gerarchica Backtest. L’oggetto Backtest incapsula la logica di gestione degli eventi ed essenzialmente lega insieme tutte le altre classi che abbiamo descritto negli articoli precedenti.

L’oggetto Backtest è progettato per eseguire un sistema guidato da eventi annidati in un ciclo while per gestire gli eventi inseriti nell’oggetto EventQueue. Il ciclo while esterno è noto come il “heartbeat loop” e decide la risoluzione temporale del sistema di backtesting. In un live ambiente questo valore sarà un numero positivo, ad esempio 600 secondi (ogni dieci minuti). Così i dati di mercato e le posizioni verranno aggiornati solo in questo lasso di tempo.
Per il backtester qui descritto il “heartbeat” può essere impostato a zero, indipendentemente dalla frequenza della strategia, poiché i dati sono già disponibili in virtù del fatto che sono storici!

Possiamo eseguire il backtest a qualsiasi velocità desideriamo, poiché il sistema guidato dagli eventi è agnostico dalla disponibilità temporale dei dati storici, a condizione che abbiano un timestamp associato. Quindi ho solo incluso per dimostrare come funzionerebbe un motore di trading live. Il ciclo esterno così termina una volta che DataHandler lo comunica all’oggetto Backtest, utilizzando un attributo booleano continue_backtest.
Il ciclo while interno elabora effettivamente i segnali e li invia al componente corretto a seconda del tipo di evento. Pertanto, la coda degli eventi viene continuamente popolata e spopolata da eventi. Questo è ciò che significa avere un sistema guidato dagli eventi.

Il primo compito è importare le librerie necessarie. Importiamo pprint (“pretty-print”), perché vogliamo visualizzare le statistiche in modo semplice per l’output:

            # backtest.py

import datetime
import pprint
import queue
import time
        
L’inizializzazione dell’oggetto Backtest richiede la directory CSV, l’elenco completo dei simboli da analizzare, il capitale iniziale, il periodo di “heartbreat” in millisecondi, la data e ora di inizio del backtest nonché degli oggetti DataHandler, ExecutionHandler, Portfolio e Strategy. Una coda viene utilizzata per contenere gli eventi. Vengono conteggiati i segnali, gli ordini e le esecuzioni:
            # backtest.py

class Backtest(object):
    """
    Racchiude le impostazioni e i componenti per l'esecuzione
    un backtest basato sugli eventi.
    """
    def __init__(self, csv_dir, symbol_list, initial_capital,
                 heartbeat, start_date, data_handler,
                 execution_handler, portfolio, strategy ):
        """
        Inizializza il backtest.

        Parametri:
        csv_dir - Il percorso della directory dei dati CSV.
        symbol_list - L'elenco dei simboli.
        intial_capital - Il capitale iniziale del portafoglio.
        heartbeat - il "battito cardiaco" del backtest in secondi
        data_inizio - La data e ora di inizio della strategia.
        data_handler - (Classe) Gestisce il feed di dati di mercato.
        execution_handler - (Classe) Gestisce gli ordini / esecuzioni per i trade.
        portfolio - (Classe) Tiene traccia del portafoglio attuale e delle posizioni precedenti.
        strategy - (Classe) Genera segnali basati sui dati di mercato.
        """
        
        self.csv_dir = csv_dir
        self.symbol_list = symbol_list
        self.initial_capital = initial_capital
        self.heartbeat = heartbeat
        self.start_date = start_date
        self.data_handler_cls = data_handler
        self.execution_handler_cls = execution_handler
        self.portfolio_cls = portfolio
        self.strategy_cls = strategy
        self.events = queue.Queue()
        self.signals = 0
        self.orders = 0
        self.fills = 0
        self.num_strats = 1
        self._generate_trading_instances()
        
Il primo metodo, _generate_trading_instances, collega tutti gli oggetti di trading (Data- Handler, Strategy, Portfolio and ExecutionHandler) a vari componenti interni:
            # backtest.py

    def _generate_trading_instances(self):
        """
        Genera le istanze degli componenti del backtest a partire dalle loro classi.
        """

        print("Creating DataHandler, Strategy, Portfolio and ExecutionHandler")
        self.data_handler = self.data_handler_cls(self.events, 
                                                  self.csv_dir,
                                                  self.symbol_list)
        self.strategy = self.strategy_cls(self.data_handler, 
                                          self.events)
        self.portfolio = self.portfolio_cls(self.data_handler, 
                                            self.events,
                                            self.start_date,
                                            self.initial_capital)
        self.execution_handler = self.execution_handler_cls(self.events)
        

Il metodo _run_backtest è dove viene effettuata la gestione di segnali all’interno del motore di backtest.

Come descritto negli articoli precedenti, ci sono due cicli while, uno annidato all’interno dell’altro.
Il ciclo esterno tiene traccia del “battito” del sistema, mentre il ciclo interno controlla se c’è un evento in coda e agisce su di esso chiamando il metodo appropriato sull’oggetto necessario.

Per un MarketEvent, si chiede all’oggetto Strategy di ricalcolare nuovi segnali, mentre all’ oggetto Portfolio
si chiede di reindicizzare l’ora.
Se si riceve un SignalEvent, si comunica al Portfolio di gestire il nuovo segnale e convertirlo in un insieme di OrderEvents, se necessario.
Nel caso di ricezione di un  OrderEvent, si invia l’ordine all’ExecutionHandler in modo che venga trasmesso al broker (se è attivo il live trading).
Infine, se viene ricevuto un “FillEvent”, si aggiorna il Portfolio in modo da essere a allineato con le nuove posizioni:

            # backtest.py

    def _run_backtest(self):
        """
        Esecuzione del backtest.
        """
        i = 0
        while True:
            i += 1
            print(i)
            # Aggiornamento dei dati di mercato
            if self.data_handler.continue_backtest == True:
                self.data_handler.update_bars()
            else:
               break
            # Gestione degli eventi
            while True:
                try:
                    event = self.events.get(False)
                except queue.Empty:
                    break
                else:
                    if event is not None:
                        if event.type == 'MARKET':
                            self.strategy.calculate_signals(event)
                            self.portfolio.update_timeindex(event)
                        elif event.type == 'SIGNAL':
                            self.signals += 1
                            self.portfolio.update_signal(event)
                        elif event.type == 'ORDER':
                            self.orders += 1
                            self.execution_handler.execute_order(event)
                        elif event.type == 'FILL':
                            self.fills += 1
                            self.portfolio.update_fill(event)
            time.sleep(self.heartbeat)
        
Una volta completata la simulazione del backtest, è possibile visualizzare le prestazioni della strategia nel terminale / console python.
Viene creata la curva di equity dal Dataframe pandas e vengono visualizzate le statistiche di riepilogo, così come il conteggio di Segnali, Ordini ed Eseguiti:
            # backtest.py

    def _output_performance(self):
        """
        Stampa delle performance della strategia dai risultati del backtest.
        """
        self.portfolio.create_equity_curve_dataframe()
        print("Creating summary stats...")
        stats = self.portfolio.output_summary_stats()
        print("Creating equity curve...")
        print(self.portfolio.equity_curve.tail(10))
        pprint.pprint(stats)
        print("Signals: %s" % self.signals)
        print("Orders: %s" % self.orders)
        print("Fills: %s" % self.fills)
        
L’ultimo metodo da implementare è il simulate_trading. Esso richiama semplicemente in ordine i 2 metodi descritti precedentemente:
            # backtest.py

    def simulate_trading(self):
        """
        Simula il backtest e stampa le performance del portafoglio.
        """
        self._run_backtest()
        self._output_performance()
        

 

Per il codice completo riportato in questo articolo, utilizzando il modulo di backtesting event-driven DataBacktest si può consultare il seguente repository di github:
https://github.com/datatrading-info/DataBacktest

Motore di Backtesting con Python – Parte VII (Performance)

Nel precedente articolo della serie “Ambiente di Backtesting Event-Driven” è stato descritta la gerarchia della classe ExecutionHandler. In questo articolo si introduce l’implementazione delle metriche per misurare le prestazioni di una strategia usando la curva equity DataFrame precedentemente costruita nell’oggetto Portfolio.

Misurare le Performance

Abbiamo già descritto il Sharpe Ratio in un precedente articolo. In quell’articolo il Sharpe Ratio (annualizzato) viene calcolato tramite:

\(\begin{eqnarray*} S_A = \sqrt{N} \frac{\mathbb{E}(R_a – R_b)}{\sqrt{\text{Var} (R_a – R_b)}} \end{eqnarray*}\)

Dove \(R_a\) è il flusso dei rendimenti della curva equity e \(R_b\) è un indice di riferimento, come uno specifico tasso di interesse o un indice azionario. Il massimo drawdown e la durata del drawdown sono due ulteriori misure che gli investitori utilizzano per valutare il rischio in un portafoglio. Il primo rappresenta è più grande discesa, la correzione, da un precedente massimo relativo o massimo assoluto, della curva equity, mentre il secondo è definito come il numero di periodi di trading in cui si verifica. In questo articolo si implementa il Sharpe Ratio, il drawdown massimo e la durata del drawdown come misure delle prestazioni del portafoglio da utilizzare nella suite di Backtesting Event-Driven sviluppato in Python.

Implementazione

Il primo passo è creare un nuovo file performance.py, che memorizzi le funzioni per calcolare il Sharpe Ratio e le informazioni sul drawdown. Come per la maggior parte delle classi che prevedono elevati carichi computanzionali, abbiamo bisogno di importare NumPy e Pandas:
            # performance.py

import numpy as np
import pandas as pd
        

Il Sharpe Ratio è una misura del rischio/rendimento (in realtà è una delle tante!) e prevede un singolo parametro, cioè il numero di periodi da considerare per il ridimensionamento al valore annualizzato.

Di solito questo valore è impostato su 252, ovvero il numero di giorni di negoziazione (mercati aperti) negli Stati Uniti in un anno. Tuttavia, ad esempio, se la strategia apre e chiude posizioni all’interno di un’ora, si deve regolare lo Sharpe per annualizzarlo correttamente. Pertanto, è necessario impostare il periods come 252 * 6.5 = 1638, ovvero il numero di ore di trading statunitensi in un anno. Se si effettua trading sul minuto, questo fattore deve essere impostato come 252 * 6.5 * 60 = 98280.

La funzione create_sharpe_ratio opera su un oggetto Serie di Pandas che abbiamo chiamato returns e calcola semplicemente il rapporto tra la media dei rendimenti percentuali del periodo e le deviazioni standard dei rendimenti percentuali ridimensionato in base al fattore periods:

            # performance.py

def create_sharpe_ratio(returns, periods=252):
    """
    Crea il Sharpe ratio per la strategia, basato su a benchmark 
    pari a zero (ovvero nessuna informazione sui tassi privi di rischio).

    Parametri:
    returns - Una serie panda che rappresenta i rendimenti percentuali nel periodo.
    periods - Giornaliero (252), orario (252 * 6,5), minuto (252 * 6,5 * 60) ecc.
    """
    return np.sqrt(periods) * (np.mean(returns)) / np.std(returns)
        

Mentre il Sharpe Ratio indica il livello di rischio (definito dalla deviazione standard del patrimonio) per unità di rendimento, il “drawdown” è definito come la distanza tra un massimo relativo e un minimo relativo lungo una curva equity.

La funzione create_drawdowns calcola sia il drawdown massimo che la durata massima di drawdown. Il primo è la discesa più elevata tra un massimo e minimo relativi, mentre il secondo è definito come il numero di periodi in cui questa discesa si verifica.

E’ necessario prestare molta attenzione nell’interpretazione della durata del drawdown in quanto questo fattore identifica i periodi di trading e quindi non è direttamente traducibile in un’unità temporale come “giorni”.

La funzione inizia creando due oggetti Serie di Pandas che rappresentano il drawdown e la durata di ogni “barra” di trading. Quindi viene stabilito l’attuale high water mark (HWM) determinando se la curva di equty supera tutti i picchi precedenti.

Il drawdown è quindi semplicemente la differenza tra l’attuale HWM e la curva di equity. Se questo valore è negativo, la durata viene aumentata per ogni barra che si verifica fino al raggiungimento del prossimo HWM. La funzione restituisce quindi semplicemente il massimo di ciascuna delle due serie:

            # performance.py

def create_drawdowns(pnl):
    """
    Calcola il massimo drawdown tra il picco e il minimo della curva PnL
    così come la durata del drawdown. Richiede che il pnl_returns
    sia una serie di pandas.

    Parametri:
    pnl - Una serie pandas che rappresenta i rendimenti percentuali del periodo.

    Restituisce:
    Drawdown, duration - Massimo drawdown picco-minimo e relativa durata.
    """

    # Calcola la curva cumulativa dei rendimenti
    # e imposta un "High Water Mark"
    # Quindi crea le serie dei drawdown e relative durate
    hwm = [0]
    idx = pnl.index
    drawdown = pd.Series(index = idx)
    duration = pd.Series(index = idx)

    # Ciclo sul range dell'indice
    for t in range(1, len(idx)):
        cur_hwm = max(hwm[t-1], pnl[t])
        hwm.append(cur_hwm)
        dd = (hwm[t] - pnl[t])
        drawdown[t]= dd
        duration[t]= (0 if drawdown[t] == 0 else duration[t-1] + 1)
    return drawdown, drawdown.max(), duration.max()
        
Al fine di utilizzare queste misure di performance, si ha bisogno di un metodo per calcolarle dopo che è stato effettuato un backtest, cioè quando è disponibile un’adeguata curva di equity! E’ necessario inoltre associare tale metodo a una particolare gerarchia di oggetti. Dato che le misure di rendimento sono calcolate a partire dal portafoglio, ha senso inserire i calcoli delle prestazioni all’interno di un metodo nella gerarchia della classe Portfolio, che è stata descritta in questo articolo. Il primo compito è aprire portfolio.py e importare le funzioni di performance:
            # portfolio.py

..  # Other imports

from performance import create_sharpe_ratio, create_drawdowns
        
Poiché Portfolio è una classe base astratta, si deve associare un metodo a una delle sue classi derivate, che in questo caso corrisponde a NaivePortfolio. Quindi si crea un metodo chiamato output_summary_stats che elabora la curva equity del portafoglio per generare le informazioni relative allo Sharpe e drawdown. Il metodo è semplice. Utilizza semplicemente le due misure di performance e le applica direttamente al DataFrame Pandas relativo alla curva equity, restituendo le statistiche come una lista di tuple in un formato “user-friendly”:
            # portfolio.py

..
..

class NaivePortfolio(object):

    ..
    ..

    def output_summary_stats(self):
        """
        Crea un elenco di statistiche di riepilogo per il portafoglio 
        come lo Sharpe Ratio e le informazioni sul drowdown.
        """
        total_return = self.equity_curve['equity_curve'][-1]
        returns = self.equity_curve['returns']
        pnl = self.equity_curve['equity_curve']

        sharpe_ratio = create_sharpe_ratio(returns)
        drawdown, max_dd, dd_duration = create_drawdowns(pnl)
        self.equity_curve['drawdown'] = drawdown
        stats = [("Total Return", "%0.2f%%" % \
                  ((total_return - 1.0) * 100.0)),
                 ("Sharpe Ratio", "%0.2f" % sharpe_ratio),
                 ("Max Drawdown", "%0.2f%%" % (max_dd * 100.0)),
                 ("Drawdown Duration", "%d" % dd_duration)]
        self.equity_curve.to_csv('equity.csv')
        return stats
        
Chiaramente questa è un’analisi molto semplice delle prestazioni per un portfolio. Non prende in considerazione l’analisi a livello di singolo trade o altre misure del rapporto rischio / rendimento. Tuttavia è molto semplice da estendere, aggiungendo più metodi in performance.py e quindi incorporandoli in output_summary_stats come richiesto.

 

Per il codice completo riportato in questo articolo, utilizzando il modulo di backtesting event-driven DataBacktest si può consultare il seguente repository di github:
https://github.com/datatrading-info/DataBacktest

Motore di Backtesting con Python – Parte VI (Esecuzione degli Ordini)

In questo articolo continua lo sviluppo di un ambiente di backtesting basato sugli eventi, utilizzando Python. Nel precedente articolo è stata approfondita la gerarchia della classe Portfolio che permette di gestire le posizioni correnti, generare ordini di trading e tenere traccia dei profitti e delle perdite (PnL).

Il porossimo passo è implementare l’esecuzione di questi ordini, creando una gerarchia di classi che rappresenta un meccanismo per la simulazione della gestione degli ordini e, infine, collegarsi ad un broker o ad altri intermediari di mercato.

L’ExecutionHandler descritto in questo articolo è estremamente semplice, poiché esegue tutti gli ordini al prezzo corrente di mercato. Questo è altamente irrealistico, ma serve come una buona base di partenza da perfezionare successivamente.

Come per le precedenti gerarchie di classi astratte di base, bisogna importare le proprietà e i decoratori necessari dalla libreria abc. Inoltre, è necessario importare FillEvent e OrderEvent:

            # execution.py

import datetime
import queue

from abc import ABCMeta, abstractmethod

from event.event import FillEvent, OrderEvent
        

 

La classe ExecutionHandler è simile alle precedenti classi astratte di base e ha solamente un metodo virtuale, execute_order:

            # execution.py

class ExecutionHandler(object):
    """
    La classe astratta ExecutionHandler gestisce l'interazione
    tra un insieme di oggetti "ordini" generati da un portafoglio e
    l'ultimo set di oggetti Fill che effettivamente si verificano
    nel mercato.

    Gli handles possono essere utilizzati per creare sottoclassi
    con interfacce identiche per broker simulati o broker live.
    Questo permette di sottoporre strategie a backtesting in modo
    molto simile al motore di live trading.
    """

    __metaclass__ = ABCMeta

    @abstractmethod
    def execute_order(self, event):
        """
        Accetta un evento Order e lo esegue, producendo
        un evento Fill che viene inserito nella coda degli eventi.

        Parametri:
        event - Contiene un oggetto Event con informazioni sull'ordine.
        """
        raise NotImplementedError("Should implement execute_order()")
        

Per testare le strategie, è necessario simulare il modo in cui un trade verrà eseguito. L’implementazione più semplice possibile consiste nell’ipotizzare che tutti gli ordini siano stati eseguiti al prezzo corrente di mercato per qualsiasi quantità. Questo è chiaramente estremamente irrealistico e gran parte del lavoro per aumentare il grado di realismo del backtesting consiste nel progettare dei modelli avanzati per simulare lo slippage e il market-impact.

Da notare che all’interno del metodo FillEvent viene passato un valore pari a None per l’attributo fill_cost (vedere la penultima riga in execute_order) come abbiamo descritto per il costo di esecuzione nell’oggetto NaivePortfolio descritto nell’articolo precedente. In un’implementazione più realistica, si utilizza il valore di dati di mercato “attuali” per ottenere un costo di esecuzione più realistico.

Ho inoltre utilizzato ARCA come exchange, anche se per i scopi di backtesting questo è puramente un segnaposto. In un ambiente di esecuzione dal vivo questo attributo diventa molto più importante:

            # execution.py

class SimulatedExecutionHandler(ExecutionHandler):
    """
    Il gestore di esecuzione simulato converte semplicemente tutti gli
    oggetti Ordine automaticamente negli equivalenti oggetti Fill
    senza considerare i problemi di latenza, slittamento e rapporto di
    esecuzione (fill-ratio).

    Ciò consente un semplice test "first go" di qualsiasi strategia,
    prima dell'implementazione con un gestiore di esecuzione più sofisticato.
    """

    def __init__(self, events):
        """
        Inizializza il gestore, impostando internamente le code degli eventi.

        Parametri
        events - L'oggetto di coda degli eventi.
        """
        self.events = events

    def execute_order(self, event):
        """
        Converte semplicemente gli oggetti Order in oggetti Fill base,
        cioè senza considerare latenza, slittamento o rapporto di esecuzione.

        Parametri:
        event - Contiene un oggetto Event con informazioni sull'ordine.
        """
        if event.type == 'ORDER':
            fill_event = FillEvent(datetime.datetime.utcnow(), event.symbol,
                                   'ARCA', event.quantity, event.direction, None)
            self.events.put(fill_event)
        

Questo conclude le gerarchie di classi necessarie per implementare un ambiente di backtesting basato sugli eventi.

Nel prossimo articolo si descriverà come calcolare un insieme di metriche sul rendimento per la strategia oggetto del backtesting.

 

Per il codice completo riportato in questo articolo, utilizzando il modulo di backtesting event-driven DataBacktest si può consultare il seguente repository di github:
https://github.com/datatrading-info/DataBacktest

Motore di Backtesting con Python – Parte V (Portafoglio)

Nel precedente articolo relativo al backtesting basato sugli eventi abbiamo descritto come costruire la gerarchia della classe Strategy.

Le strategie, per come sono state definite, sono utilizzate per generare signals, che sono l’input di un oggetto portfolio al fine di decidere se inviare o meno gli orders. Inizialmente, è naturale creare una classe astratta di base (ABC) del Portfolio da cui si ereditano tutte le sottoclassi successive.

Questo articolo descrive un oggetto NaivePortfolio che tiene traccia delle posizioni all’interno di un portafoglio e genera ordini di una quantità fissa di azioni in base ai segnali. Oggetti di portfolio avanzati includono strumenti di gestione del rischio più sofisticati e saranno oggetto di articoli successivi.

Monitoraggio della Posizione e Gestione degli Ordini

Il sistema di gestione degli ordini del portafoglio è probabilmente la componente più complessa di un ambiente backtesting basato sugli eventi. Questa componente ha il compito di tenere traccia di tutte le attuali posizioni aperte sul mercato e del valore di mercato di queste posizioni (note come “holdings”). Questa è semplicemente una stima del valore di liquidazione della posizione ed è derivata in parte dalla funzione di gestione dei dati del backtester.

Oltre alle posizioni e alla gestione degli holdings, il portafoglio deve essere a conoscenza dei fattori di rischio e delle tecniche di dimensionamento delle posizioni al fine di ottimizzare gli ordini inviati ad un broker o verso altre forme di accesso al mercato.

In analogia alla gerarchia della classe Event, un oggetto Portfolio deve essere in grado di gestire oggetti SignalEvent, generare oggetti OrderEvent e interpretare oggetti FillEvent per aggiornare le posizioni. Pertanto non sorprende che gli oggetti portfolio siano spesso il componente più importante dei sistemi event-driven, in termini di righe di codice (LOC).


Implementazione

Si crea un nuovo file portfolio.py e si importa le librerie necessarie. Queste sono le stesse della maggior parte delle altre implementazioni delle classe astratte di base. In particolare si importa la funzione floor dalla libreria math per generare dimensioni di ordine con valori interi, ed inoltre si importano gli oggetti FillEvent e OrderEvent poiché il Portfolio gestisce entrambi.
            # portfolio.py

import datetime
import numpy as np
import pandas as pd
import queue

from abc import ABCMeta, abstractmethod
from math import floor

from event import FillEvent, OrderEvent
        

 

A questo punto si crea una classe ABC per il Portfolio e si implementano due metodi virtuali update_signal e update_fill. Il primo elabora i nuovi segnali di trading che vengono prelevati dalla coda degli eventi, mentre il secondo gestisce gli ordini eseguiti e ricevuti dall’oggetto di gestione dell’esecuzione.

            # portfolio.py

class Portfolio(object):
    """
    La classe Portfolio gestisce le posizioni e il valore di
    mercato di tutti gli strumenti alla risoluzione di una "barra",
    cioè ogni secondo, ogni minuto, 5 minuti, 30 minuti, 60 minuti o EOD.
    """

    __metaclass__ = ABCMeta

    @abstractmethod
    def update_signal(self, event):
        """
        Azioni su un SignalEvent per generare nuovi ordini
        basati sulla logica di portafoglio
        """
        raise NotImplementedError("Should implement update_signal()")

    @abstractmethod
    def update_fill(self, event):
        """
        Aggiorna le posizioni e il patrimonio del portafoglio 
        da un FillEvent.
        """
        raise NotImplementedError("Should implement update_fill()")
        

L’argomento principale di questo articolo è la classe NaivePortfolio. Questa classe è progettata per gestire il dimensionamento delle posizioni e gli holdings correnti, ma esegue gli ordini di compravendita in modo “stupido”, semplicemente inviandoli direttamente al broker con una dimensione fissa e predeterminata, indipendentemente dalla liquidità detenuta. Queste sono tutte ipotesi irrealistiche, ma aiutano a delineare come funziona un sistema di gestione degli ordini di portafoglio (OMS) basato sugli eventi.

La NaivePortfolio richiede un valore del capitale iniziale, che ho impostato sul valore predefinito di 100.000 USD. Richiede anche una data di inizio.

Il portfolio contiene gli attributi all_positions e current_positions. Il primo memorizza un elenco di tutte le precedenti posizioni registrate ad uno specifico timestamp di un evento di dati di mercato. Una posizione è semplicemente la quantità dell’asset. Le posizioni negative indicano che l’asset è stato ridotto. Il secondo attributo memorizza un dizionario contenente le posizioni correnti per l’ultimo aggiornamento dei dati di mercato.

Oltre agli attributi delle posizioni, il portafoglio memorizza gli holdings, che descrivono il valore corrente di mercato delle posizioni detenute. Il “Valore corrente di mercato” indica, in questo caso, il prezzo di chiusura ottenuto dalla barra OLHCV corrente, che è chiaramente un’approssimazione, ma è abbastanza accettabile in questo momento. L’attributo all_holdings memorizza la lista storica di tutte gli holding dei simboli, mentre current_holdings memorizza il dizionario aggiornato di tutti i valori di holdings dei simboli.

            # portfolio.py

class NaivePortfolio(Portfolio):
    """
    L'oggetto NaivePortfolio è progettato per inviare ordini a
    un oggetto di intermediazione con una dimensione di quantità costante,
    cioè senza alcuna gestione del rischio o dimensionamento della posizione. È
    utilizzato per testare strategie più semplici come BuyAndHoldStrategy.
    """

    def __init__(self, bars, events, start_date, initial_capital=100000.0):
        """
        Inizializza il portfolio con la coda delle barre e degli eventi.
        Include anche un indice datetime iniziale e un capitale iniziale
        (USD se non diversamente specificato).

        Parametri:
        bars - L'oggetto DataHandler con i dati di mercato correnti.
        events: l'oggetto Event Queue (coda di eventi).
        start_date - La data di inizio (barra) del portfolio.
        initial_capital - Il capitale iniziale in USD.
        """
        self.bars = bars
        self.events = events
        self.symbol_list = self.bars.symbol_list
        self.start_date = start_date
        self.initial_capital = initial_capital

        self.all_positions = self.construct_all_positions()
        self.current_positions = dict((k, v) for k, v in [(s, 0) for s in self.symbol_list])

        self.all_holdings = self.construct_all_holdings()
        self.current_holdings = self.construct_current_holdings()
        

 

Il seguente metodo, construct_all_positions, crea semplicemente un dizionario per ogni simbolo, e per ciascuno imposta il valore a zero e quindi aggiunge una chiave datetime, inserendo infine questo oggetto in un elenco. Usa una comprensione del dizionario, che è simile alla comprensione di una lista:

            # portfolio.py

    def construct_all_positions(self):
        """
        Costruisce l'elenco delle posizioni utilizzando start_date
        per determinare quando inizierà l'indice temporale.
        """
        d = dict( (k,v) for k, v in [(s, 0) for s in self.symbol_list] )
        d['datetime'] = self.start_date
        return [d]
        
Il metodo construct_all_holdings è simile al precedente, ma aggiunge delle chiavi extra per memorizzare i contanti, le commissioni e il totale, che rappresentano rispettivamente la riserva di denaro nel conto dopo eventuali acquisti, la commissione cumulativa maturata e il totale del conto azionario inclusi i contanti e le posizioni aperte. Le posizioni short sono considerate negative. I contanti (cash) e il totale (total) sono entrambi inizializzati con il capitale iniziale:
            # portfolio.py

    def construct_all_holdings(self):
        """
        Costruisce l'elenco delle partecipazioni utilizzando start_date
        per determinare quando inizierà l'indice temporale.
        """
        d = dict( (k,v) for k, v in [(s, 0.0) for s in self.symbol_list] )
        d['datetime'] = self.start_date
        d['cash'] = self.initial_capital
        d['commission'] = 0.0
        d['total'] = self.initial_capital
        return [d]
        

 

Il metodo seguente, construct_current_holdings è quasi identico al metodo precedente, tranne per il fatto che non racchiude il dizionario in un elenco:

            # portfolio.py

    def construct_current_holdings(self):
        """
        Questo costruisce il dizionario che conterrà l'istantaneo
        valore del portafoglio attraverso tutti i simboli.
        """
        d = dict( (k,v) for k, v in [(s, 0.0) for s in self.symbol_list] )
        d['cash'] = self.initial_capital
        d['commission'] = 0.0
        d['total'] = self.initial_capital
        return d
        

Ad ogni “battito” o impulso del sistema, cioè ogni volta che vengono richiesti nuovi dati di mercato dall’oggetto DataHandler, il portfolio deve aggiornare il valore corrente di mercato di tutte le posizioni detenute. In uno scenario di trading live queste informazioni possono essere scaricate e analizzate direttamente dal broker, ma per un’implementazione di backtesting è necessario calcolare manualmente questi valori.

Sfortunatamente non esiste una cosa come il “valore corrente di mercato” a causa degli spread bid / ask e delle problematiche di liquidità. Quindi è necessario stimarlo moltiplicando la quantità del bene detenuta per un determinato “prezzo”. L’approccio utilizzato in questo esempio prevede di utilizzare il prezzo di chiusura dell’ultima barra ricevuta. Per una strategia intraday questo è relativamente realistico. Per una strategia quotidiana questo è meno realistico in quanto il prezzo di apertura può differire molto dal prezzo di chiusura.

Il metodo update_timeindex gestisce il monitoraggio dei nuovi holdings. In particolare ricava i prezzi più recenti dal gestore dei dati di mercato e crea un nuovo dizionario di simboli per rappresentare le posizioni correnti, impostando le posizioni “nuove” uguali alle posizioni “correnti”. Questi vengono modificati solo quando si riceva un FillEvent, che viene successivamente gestito dal portfolio. Il metodo quindi aggiunge questo insieme di posizioni correnti alla lista all_positions. Successivamente, le posizioni vengono aggiornate in modo simile, con l’eccezione che il valore di mercato viene ricalcolato moltiplicando il conteggio delle posizioni correnti con il prezzo di chiusura dell’ultima barra (self.current_positions [s] * bars [s] [0] [ 5]). Infine, i nuovi holdings sono agggiunti a all_holdings:

            # portfolio.py

    def update_timeindex(self, event):
        """
        Aggiunge un nuovo record alla matrice delle posizioni per la barra corrente
        dei dati di mercato. Questo riflette la barra PRECEDENTE, cioè in questa fase
        tutti gli attuali dati di mercato sono noti (OLHCVI).

        Utilizza un MarketEvent dalla coda degli eventi.
        """
        latest_datetime = self.bars.get_latest_bar_datetime(
                                self.symbol_list[0]
                            )

        # Update positions
        dp = dict( (k,v) for k, v in [(s, 0) for s in self.symbol_list] )
        dp['datetime'] = latest_datetime

        for s in self.symbol_list:
            dp[s] = self.current_positions[s]

        # Aggiunge le posizioni correnti
        self.all_positions.append(dp)

        # Aggiorno delle holdings
        dh = dict( (k,v) for k, v in [(s, 0) for s in self.symbol_list] )
        dh['datetime'] = latest_datetime
        dh['cash'] = self.current_holdings['cash']
        dh['commission'] = self.current_holdings['commission']
        dh['total'] = self.current_holdings['cash']

        for s in self.symbol_list:
            # Approossimazione ad un valore reale
            market_value =  market_value = self.current_positions[s] * \
                            self.bars.get_latest_bar_value(s, "adj_close")"adj_close")
            dh[s] = market_value
            dh['total'] += market_value

        # Aggiunta alle holdings correnti
        self.all_holdings.append(dh)
        

 

Il metodo update_positions_from_fill determina se FillEvent è un Buy o un Sell e quindi aggiorna di conseguenza il dizionario current_positions aggiungendo / sottraendo la corretta quantità di asset:

            # portfolio.py

    def update_positions_from_fill(self, fill):
        """
        Prende un oggetto FilltEvent e aggiorna la matrice delle posizioni
        per riflettere le nuove posizioni.

        Parametri:
        fill - L'oggetto FillEvent da aggiornare con le posizioni.
        """
        # Check whether the fill is a buy or sell
        fill_dir = 0
        if fill.direction == 'BUY':
            fill_dir = 1
        if fill.direction == 'SELL':
            fill_dir = -1

        # Aggiorna le posizioni con le nuove quantità
        self.current_positions[fill.symbol] += fill_dir * fill.quantity
        

 

Il corrispondente update_holdings_from_fill è simile al metodo precedente ma aggiorna i valori di holdings. Per simulare il costo di riempimento, il metodo seguente non utilizza il costo associato a FillEvent. Perchè questo approccio? In parole povere, in un ambiente di backtesting il costo di riempimento è in realtà sconosciuto e quindi deve essere stimato. Quindi il costo di riempimento è impostato sul “prezzo corrente di mercato” (il prezzo di chiusura dell’ultima barra). Le posizioni per un particolare simbolo vengono quindi impostate per essere uguali al costo di riempimento moltiplicato per la quantità del trade.

Una volta che il costo di riempimento è noto, gli holdings correnti, i contanti e i valori totali possono essere aggiornati. Anche la commissione cumulativa viene aggiornata:

            # portfolio.py

    def update_holdings_from_fill(self, fill):
        """
        Prende un oggetto FillEvent e aggiorna la matrice delle holdings
        per riflettere il valore delle holdings.

        Parametri:
        fill - L'oggetto FillEvent da aggiornare con le holdings.
        """
        # Controllo se l'oggetto fill è un buy o sell
        fill_dir = 0
        if fill.direction == 'BUY':
            fill_dir = 1
        if fill.direction == 'SELL':
            fill_dir = -1

        # Aggiorna la lista di holdings con le nuove quantità
        fill_cost = self.bars.get_latest_bar_value(fill.symbol, "adj_close")
        cost = fill_dir * fill_cost * fill.quantity
        self.current_holdings[fill.symbol] += cost
        self.current_holdings['commission'] += fill.commission
        self.current_holdings['cash'] -= (cost + fill.commission)
        self.current_holdings['total'] -= (cost + fill.commission)
        

 

Qui viene implementato il metodo virtuale update_fill della classe ABC Portfolio . Esegue semplicemente i due metodi precedenti, update_positions_from_fill update_holdings_from_fill, che sono già stati discussi sopra:

             # portfolio.py

def update_fill(self, event):
        """
        Aggiorna le attuali posizioni e holdings del portafoglio da un FillEvent.
        """
        if event.type == 'FILL':
            self.update_positions_from_fill(event)
            self.update_holdings_from_fill(event)
        

L’oggetto Portfolio, oltre a gestire i FillEvents, deve anche occuparsi della generazione degli OrderEvents al ricevimento di uno o più SignalEvents. Il metodo generate_naive_order prende un segnale di long o short di un asset e invia un ordine per aprire una posizione per 100 shares di tale asset. Chiaramente 100 è un valore arbitrario. In un’implementazione realistica questo valore sarà determinato da una gestione del rischio o da un overlay di ridimensionamento della posizione. Tuttavia, questo è un NaivePortfolio e quindi “ingenuamente” invia tutti gli ordini direttamente dai segnali, senza un sistema di dimensionamento della posizione.

Il metodo gestisce il long, lo short e l’uscita di una posizione, in base alla quantità corrente e allo specifico simbolo. Infine vengono generati i corrispondenti oggetti OrderEvent:

            # portfolio.py

    def generate_naive_order(self, signal):
        """
        Trasmette semplicemente un oggetto OrderEvent con una quantità costante
        che dipendente dell'oggetto segnale, senza gestione del rischio o
        considerazioni sul dimensionamento della posizione.

        Parametri:
        signal - L'oggetto SignalEvent.
        """     
        order = None

        symbol = signal.symbol
        direction = signal.signal_type
        strength = signal.strength

        mkt_quantity = floor(100 * strength)
        cur_quantity = self.current_positions[symbol]
        order_type = 'MKT'

        if direction == 'LONG' and cur_quantity == 0:
            order = OrderEvent(symbol, order_type, mkt_quantity, 'BUY')
        if direction == 'SHORT' and cur_quantity == 0:
            order = OrderEvent(symbol, order_type, mkt_quantity, 'SELL')   
    
        if direction == 'EXIT' and cur_quantity > 0:
            order = OrderEvent(symbol, order_type, abs(cur_quantity), 'SELL')
        if direction == 'EXIT' and cur_quantity < 0:
            order = OrderEvent(symbol, order_type, abs(cur_quantity), 'BUY')
        return order
        

 

Il metodo update_signal richiama semplicemente il metodo precedente e aggiunge l’ordine generato alla coda degli eventi:

            # portfolio.py

    def update_signal(self, event):
        """
        Azioni a seguito di un SignalEvent per generare nuovi ordini
        basati sulla logica del portafoglio 
        """
        if event.type == 'SIGNAL':
            order_event = self.generate_naive_order(event)
            self.events.put(order_event)
        
Il penultimo metodo di NaivePortfolio prevede la generazione di una curva equity. Crea semplicemente un flusso dei rendimenti, utilizzato per i calcoli delle prestazioni e quindi normalizza la curva equity in base alla percentuale. La dimensione iniziale dell’account è pari a 1,0:
            # portfolio.py

    def create_equity_curve_dataframe(self):
        """
        Crea un DataFrame pandas dalla lista di dizionari "all_holdings"
        """
        curve = pd.DataFrame(self.all_holdings)
        curve.set_index('datetime', inplace=True)
        curve['returns'] = curve['total'].pct_change()
        curve['equity_curve'] = (1.0+curve['returns']).cumprod()
        self.equity_curve = curve
        
Il metodo finale nel NaivePortfolio è l’output della curva azionaria e di varie statistiche sulle performance della strategia. L’ultima riga genera un file, equity.csv, nella stessa directory del codice, che può essere caricato in uno script Matplotlib Python (o un foglio di calcolo come MS Excel o LibreOffice Calc) per un’analisi successiva. Si noti che la Durata del Drawdown è data in termini di numero assoluto di “barre” per le quali si è svolto il Drawdown, al contrario di un determinato periodo di tempo.
            # portfolio.py

    def output_summary_stats(self):
        """
        Crea un elenco di statistiche di riepilogo per il portafoglio
        come lo Sharpe Ratio e le informazioni sul drowdown.
        """
        total_return = self.equity_curve['equity_curve'][-1]
        returns = self.equity_curve['returns']
        pnl = self.equity_curve['equity_curve']
        sharpe_ratio = create_sharpe_ratio(returns, periods=252 * 60 * 6.5)
        drawdown, max_dd, dd_duration = create_drawdowns(pnl)
        self.equity_curve['drawdown'] = drawdown
        stats = [("Total Return", "%0.2f%%" % \
                  ((total_return - 1.0) * 100.0)),
                 ("Sharpe Ratio", "%0.2f" % sharpe_ratio),
                 ("Max Drawdown", "%0.2f%%" % (max_dd * 100.0)),
                 ("Drawdown Duration", "%d" % dd_duration)]
        self.equity_curve.to_csv('equity.csv')
        return stats
        

L’oggetto NaivePortfolio è la componente più complessa dell’intero sistema di backtesting basato sugli eventi. L’implementazione è complessa, quindi in questo articolo abbiamo semplificato alcuni aspetti tra cui la gestione delle posizioni. Le versioni successive prenderanno in considerazione la gestione del rischio e il dimensionamento delle posizioni, che porterà a un’idea molto più realistica delle prestazioni della strategia.

Nel prossimo articolo considereremo l’ultimo modulo di un sistema di backtesting event-driven, ovvero l’oggetto ExecutionHandler, che viene utilizzato per prelevare oggetti OrderEvent e creare oggetti FillEvent.

 

Per il codice completo riportato in questo articolo, utilizzando il modulo di backtesting event-driven DataBacktest si può consultare il seguente repository di github:
https://github.com/datatrading-info/DataBacktest

Motore di Backtesting con Python – Parte IV (Gestione della Strategia)

In questa serie di articoli relativa all’implementazione di un ambiente di backtesting basato sugli eventi abbiamo già descritto la struttura degli event-loop, la gerarchia della classe Event e la componente per la gestione dei dati. In questo articolo si introduce la gerarchia della classe Strategy. Gli oggetti “strategia” prendono i dati di mercato come input e producono eventi di tipo Signal Trading come output.

Un oggetto Strategy include tutti i calcoli sui dati di mercato che generano segnali advisory per l’oggetto Portfolio. In questa fase di sviluppo dell’ambiente di backtesting event-driven non introduciamo i concetto di indicatore o filtro, come quelli che sono usati nell’analisi tecnica classica. Questi sono tuttavia buoni candidati per la creazione di una gerarchia di classi, ma vanno oltre lo scopo di questo articolo.

La gerarchia della classe Strategy è relativamente semplice poiché consiste in una classe base astratta con un singolo metodo puro virtuale per generare oggetti SignalEvent. Per creare la gerarchia della strategia è necessario importare NumPy, Pandas, l’oggetto Queue, i strumenti della classe base astratta e SignalEvent:

            # strategy.py

import datetime
import numpy as np
import pandas as pd
import queue

from abc import ABCMeta, abstractmethod

from event import SignalEvent
        

 

La classe base astratta Strategy definisce semplicemente il metodo virtuale calculate_signals. Questo metodo sarà usato nelle classi derivate per gestire la creazione di oggetti SignalEvent a seconda degli aggiornamenti dei dati di mercato:

            # strategy.py


class Strategy(object):
    """
    Strategy è una classe base astratta che fornisce un'interfaccia per
    tutti i successivi oggetti (ereditati) di gestione della strategia.

    L'obiettivo di un oggetto (derivato da) Strategy è generare un oggetto
    Signal per specifici simboli basati sugli input di Bars
    (OLHCVI) generati da un oggetto DataHandler.

    Questo è progettato per funzionare sia con dati storici che in tempo reale
    quindi l'oggetto Strategy è agnostico rispetto all'origine dati,
    poiché ricava le tuple di barre da un oggetto Queue (coda).
    """

    __metaclass__ = ABCMeta

    @abstractmethod
    def calculate_signals(self):
        """
        Fornisce il meccanismo per calcolare la lista di segnali.
        """
        raise NotImplementedError("Should implement calculate_signals()")
        

 

Come mostrato nel codice precedente, la definizione della classe astratta Strategy è semplice.
Un primo esempio di sottoclasse dell’oggetto Strategy è la creazione della classe BuyAndHoldStrategy, che implementa la classica strategia buy and hold. Questa strategia compra un asset ad una certo istante e lo conserva all’interno del portafoglio. Quindi viene generato un solo segnale per ogni asset.

Il costruttore (__init__) prevede, come input, il gestore dei dati di mercato e l’oggetto della coda degli eventi Events:

            # strategy.py

class BuyAndHoldStrategy(Strategy):
    """
    Questa è una strategia estremamente semplice che va LONG su tutti
    i simboli non appena viene ricevuta una barra. Non uscirà mai da una posizione.

    Viene utilizzato principalmente come meccanismo di test per la classe Strategy
    nonché un benchmark con cui confrontare le altre strategie.
    """

    def __init__(self, bars, events):
        """
        Inizializza la strategia "buy and hold".

        Parametri:
        bars - L'oggetto DataHandler che fornisce le informazioni sui prezzi
        events - L'oggetto Event Queue (coda di eventi).
        """
        self.bars = bars
        self.symbol_list = self.bars.symbol_list
        self.events = events

        # Quando il segnale "buy & hold" viene inviato, questi sono impostati a True
        self.bought = self._calculate_initial_bought()
        

Nell’inizializzazione di BuyAndHoldStrategy, l’attributo bought viene instanziato con un dictionary (una struttura dati nativa di Python) di chiavi per ogni simbolo, tutte impostate con False. Una volta che un asset è andato “long”, la relativa chiave viene impostata su True. In sostanza ciò consente alla Strategia di sapere su quali asset è “sul mercato” o meno:

            # strategy.py

     def _calculate_initial_bought(self):
        """
        Aggiunge le chiavi di tutti i simboli al dizionario "bought"
        e li imposta su False.
        """
        bought = {}
        for s in self.symbol_list:
            bought[s] = False
        return bought
        

Il metodo virtuale calculate_signals viene concretamente implementato in questa classe. Il metodo scorre su tutti i simboli nell’elenco dei simboli e recupera la barra OLHCV più recente dal gestore dei dati di mercato. Quindi controlla se quel simbolo è stato “comprato” (cioè se abbiamo una posizione aperta a mercato per questo simbolo o no) e, in caso negativo, crea un singolo oggetto SignalEvent. Quest’ultimo viene poi inserito nella coda degli eventi e il dizionario bought viene correttamente aggiornato con True per questo specifico simbolo:

            # strategy.py

    def calculate_signals(self, event):
        """
        For "Buy and Hold" generiamo un singolo segnale per simbolo
        e quindi nessun segnale aggiuntivo. Ciò significa che siamo
        costantemente LONG sul mercato a partire dalla data di 
        inizializzazione della strategia.

        Parametri
        event - Un oggetto MarketEvent.
        """
        if event.type == 'MARKET':
            for s in self.symbol_list:
                bars = self.bars.get_latest_bars(s, N=1)
                if bars is not None and bars != []:
                    if self.bought[s] == False:
                        # (Symbol, Datetime, Type = LONG, SHORT or EXIT)
                        signal = SignalEvent(bars[0][0], bars[0][1], 'LONG')
                        self.events.put(signal)
                        self.bought[s] = True
        

Questa semplice strategia è sufficiente per dimostrare la natura di una gerarchia basata su eventi. Negli articoli successivi considereremo strategie più sofisticate come il pairs trading.

Infine nel prossimo articolo considereremo come creare la gerarchia della classe Portfolio che tenga traccia delle nostre posizioni con un profitto e una perdita (“PnL”)

 

Per il codice completo riportato in questo articolo, utilizzando il modulo di backtesting event-driven DataBacktest si può consultare il seguente repository di github:
https://github.com/datatrading-info/DataBacktest

Motore di Backtesting con Python – Parte III (Dati di Mercato)

Nei due articoli precedenti della serie abbiamo introdotto i concetti base di un sistema di backtesting basato sugli eventi e la gerarchia di classi per l’oggetto Event. In questo articolo vediamo come vengono utilizzati i dati di mercato, sia in un contesto storico di backtesting sia per l’esecuzione del live trading.

Uno dei nostri obiettivi con un sistema di trading basato sugli eventi è di minimizzare la duplicazione del codice tra l’elemento di backtesting e l’elemento di esecuzione live. Idealmente, è ottimale utilizzare la stessa metodologia di generazione del segnale e le stesse componenti di gestione del portafoglio sia per i test storici che per trading reale. Affinché questo funzioni, l’oggetto Strategy, che genera i segnali, e l’oggetto Portfolio, che fornisce gli ordini basati su di essi, devono utilizzare un’identica interfaccia verso un feed di dati finanziari, sia per la versione di backtesting che per quella live.

Questo requisito motiva la necessità di una gerarchia di classi basata sull’oggetto DataHandler, che l’implementa un’interfaccia, disponibile a tutte le sottoclassi, per fornire i dati di mercato alle rimanenti componenti del sistema. In questo modo, si può intercambiare qualsiasi sottoclasse di “fornitura” di dati finanziari senza influenzare la strategia o il calcolo del portafoglio.

Esempi di sottoclassi specifiche possono includere HistoricCSVDataHandler, QuandlDataHandler, SecuritiesMasterDataHandler, InteractiveBrokersMarketFeedDataHandler ecc. In questo tutorial descriviamo solamente la creazione di un gestore CSV di dati storici, che caricherà da un CSV i dati intraday per le azioni nel formato Open-Low-High-Close- Volume-OpenInterest. Questo può quindi essere usato per alimentare con i dati “candela-per-candela” le classi Strategy e Portfolio per ogni heartbeat (o impulso) del sistema, evitando così i bias di look-ahead.

Il primo compito è importare le librerie necessarie. Nello specifico, si includono Pandas e gli strumenti astratti della classe base. Dato che DataHandler genera MarketEvents, si importa anche event.py come descritto nel tutorial precedente:

            # data.py

import datetime
import os, os.path
import pandas as pd

from abc import ABCMeta, abstractmethod

from event import MarketEvent
        

La classe DataHandler è una classe base astratta (ABC), cioè è impossibile istanziare direttamente un’istanza. Possono essere istanziate solamente le sottoclassi. Con questo approccio la classe ABC fornisce un’interfaccia che tutte le successive sottoclassi di DataHandler devono rispettare, garantendo in tal modo la compatibilità con altre classi che comunicano con esse.

Facciamo uso della proprietà __metaclass__ per far sapere a Python che questa è una classe ABC. Inoltre usiamo il decoratore @abstractmethod per far sapere a Python che il metodo verrà sovrascritto dalle sottoclassi (questo è identico a un metodo virtuale puro di C++).

I due metodi fondamentali sono get_latest_bars e update_bars. Il primo restituisce le ultime barre N a partire dal timestamp dall’attuale “impulso”, necessarie per far eseguire le elaborazioni previste nelle classi Strategy. Il secondo metodo fornisce un meccanismo di “alimentazione a goccia” per posizionare le informazioni OLHCV su una nuova struttura dati in modo da evitare la distorsione lookahead. Si noti che verranno sollevate eccezioni se si verifica un tentativo di istanziazione della classe:

            # data.py

class DataHandler(object):
    """
    DataHandler è una classe base astratta che fornisce un'interfaccia per
    tutti i successivi  gestori di dati (ereditati) (sia live che storici).

    L'obiettivo di un oggetto (derivato da) DataHandler è generare un 
    set di barre (OLHCVI) per ogni simbolo richiesto.

    Questo replicherà il modo in cui una strategia live funzionerebbe quando nuovi 
    i dati di mercato sarebbero inviati "giù per il tubo". Questo permette a sistemi 
    live e a sistemi con dati storici di essere trattati allo stesso modo dal resto
    della suite di backtest.
    """

    __metaclass__ = ABCMeta

    @abstractmethod
    def get_latest_bar(self, symbol):
        """
        Restituisce l'ultima barra dalla lista latest_symbol.
        """
        raise NotImplementedError("Should implement get_latest_bar()")

    @abstractmethod
    def get_latest_bars(self, symbol, N=1):
        """
        Restituisce le ultime N barre dalla lista di barre
        per il simbolo, o meno se sono disponibili poche barre
        """
        raise NotImplementedError("Should implement get_latest_bars()")

    def get_latest_bar(self, symbol):
        """
        Restituisce l'ultima barra dalla lista latest_symbol.
        """
        raise NotImplementedError("Should implement get_latest_bar()")

    @abstractmethod
    def get_latest_bar_datetime(self, symbol):
        """
        Restituisce un oggetto datetime di Python per l'ultima barra.
        """
        raise NotImplementedError("Should implement get_latest_bar_datetime()")\


    @abstractmethod
    def get_latest_bar_value(self, symbol, val_type):
        """
        Restituisce un elemento tra Open, High, Low, Close, Volume o Adj_Close
        from the last bar.
        """
        raise NotImplementedError("Should implement get_latest_bar_value()")


    @abstractmethod
    def get_latest_bars_values(self, symbol, val_type, N=1):
        """
        Restituisce i valori delle ultime N barre dalla lista
        latest_symbol, o N-k se non meno disponibili.
        """
        raise NotImplementedError("Should implement get_latest_bars_values()")

    @abstractmethod
    def update_bars(self):
        """
        Inserisce la barra più recente nella struttura delle barre per
        tutti i simboli della lista di simboli.
        """
        raise NotImplementedError("Should implement update_bars()")
        

 

Dopo aver definito la classe DataHandler, il passo successivo è creare un gestore per i file CSV di dati storici. In particolare, HistoricCSVDataHandler prenderà più file CSV, uno per ciascun simbolo, e li convertirà in un DataFrame di Panda.

Il gestore dati richiede alcuni parametri, ovvero una coda di eventi su cui inviare informazioni di MarketEvent, il percorso assoluto dei file CSV e un elenco di simboli.
Di seguito l’inizializzazione della classe:

            # data.py

class HistoricCSVDataHandler(DataHandler):
    """
    HistoricCSVDataHandler è progettato per leggere dal disco
    fisso un file CSV per ogni simbolo richiesto e fornire
    un'interfaccia per ottenere la barra "più recente" in un
    modo identico a un'interfaccia di live trading.
    """

    def __init__(self, events, csv_dir, symbol_list):
        """
        Inizializza il gestore dei dati storici richiedendo
        la posizione dei file CSV e un elenco di simboli.

        Si presume che tutti i file abbiano la forma
        "symbol.csv", dove symbol è una stringa dell'elenco.

        Parametri:
        events - la coda degli eventi.
        csv_dir - percorso assoluto della directory dei file CSV.
        symbol_list - Un elenco di stringhe di simboli.
        """
        
        self.events = events
        self.csv_dir = csv_dir
        self.symbol_list = symbol_list

        self.symbol_data = {}
        self.latest_symbol_data = {}
        self.continue_backtest = True

        self._open_convert_csv_files()
        

 

Questa funzione prevede quindi di aprire i file nel formato “SYMBOL.csv” dove il SYMBOL è il simbolo del ticker. Il formato dei file corrisponde a quello fornito da DTN IQFeed, ma si può facilmente modificare per gestire formati di dati aggiuntivi. L’apertura dei file è gestita dal seguente metodo _open_convert_csv_files.

Uno dei vantaggi dell’uso della libreria Pandas come archivio all’interno di HistoricCSVDataHandler è la possibilità di unire gli indici di tutti i simboli tracciati. Ciò consente di correggere i punti di dati mancanti in avanti, indietro o interpolati all’interno di questi spazi, in modo tale che i ticker possano essere confrontati “candela-per-candela”. Questo è necessario, ad esempio, per strategie di mean-reverting. Si noti l’uso dei metodi union e reindex quando si combina gli indici di tutti i simboli:

            # data.py

    def _open_convert_csv_files(self):
        """
        Apre i file CSV dalla directory dei dati, convertendoli
        in DataFrame pandas all'interno di un dizionario di simboli.

        Per questo gestore si assumerà che i dati siano
        tratto da DTN IQFeed. Così il suo formato sarà rispettato.
        """
        comb_index = None
        for s in self.symbol_list:
            # Carica il file CSV senza nomi delle colonne, indicizzati per data
            self.symbol_data[s] = pd.io.parsers.read_csv(
                                      os.path.join(self.csv_dir, '%s.csv' % s),
                                      header=0, index_col=0,
                                      names=['datetime','open','low','high',
                                             'close','volume','adj_close']
                                  )

            # Combina l'indice per riempire i valori successivi
            if comb_index is None:
                comb_index = self.symbol_data[s].index
            else:
                comb_index.union(self.symbol_data[s].index)

            # Imposta il più recente symbol_data a None
            self.latest_symbol_data[s] = []

        # Indicizza nuovamente i dataframes
        for s in self.symbol_list:
            self.symbol_data[s] = self.symbol_data[s].reindex(index=comb_index, method='pad').iterrows()
            
        

 

Il metodo _get_new_bar crea un generatore python per fornire una versione formattata dei dati OLCHV. Questo significa che le successive chiamate al metodo genereranno una nuova barra fino al raggiungimento della fine dei dati del simbolo:

            # data.py

    def _get_new_bar(self, symbol):
        """
        Restituisce l'ultima barra dal feed di dati come una tupla di
        (sybmbol, datetime, open, low, high, close, volume).
        """
        for b in self.symbol_data[symbol]:
            yield b
        

Di seguito l’implementazione dei metodi astratti di DataHandler. Questi metodi forniscono varie forme di accesso alle barre acquisite. Dipende dalla fonte di acquisizione dati e dalla struttura dati in cui viene acquisita:

            # data.py

    def get_latest_bar(self, symbol):
        """
        Restituisce l'ultima barra dalla lista latest_symbol.
        """
        try:
            bars_list = self.latest_symbol_data[symbol]
        except KeyError:
            print("That symbol is not available in the historical data set.")
            raise
        else:
            return bars_list[-1]


    def get_latest_bars(self, symbol, N=1):
        """
        Restituisce le ultime N barre dall'elenco latest_symbol
        o N-k se non sono tutte disponibili.
        """
        try:
            bars_list = self.latest_symbol_data[symbol]
        except KeyError:
            print("That symbol is not available in the historical data set.")
        else:
            return bars_list[-N:]


    def get_latest_bar_datetime(self, symbol):
        """
        Restituisce un oggetto datetime di Python per l'ultima barra.
        """
        try:
            bars_list = self.latest_symbol_data[symbol]
        except KeyError:
            print("That symbol is not available in the historical data set.")
            raise
        else:
            return bars_list[-1][0]

    def get_latest_bar_value(self, symbol, val_type):
        """
        Restituisce un elemento tra Open, High, Low, Close, Volume o Adj_Close
        from the last bar.
        """
        try:
            bars_list = self.latest_symbol_data[symbol]
        except KeyError:
            print("That symbol is not available in the historical data set.")
            raise
        else:
            return getattr(bars_list[-1][1], val_type)


    def get_latest_bars_values(self, symbol, val_type, N=1):
        """
        Restituisce i valori delle ultime N barre dalla lista
        latest_symbol, o N-k se non meno disponibili.
        """
        try:
            bars_list = self.get_latest_bars(symbol, N)
        except KeyError:
            print("That symbol is not available in the historical data set.")
            raise
        else:
            return np.array([getattr(b[1], val_type) for b in bars_list])

        

 

L’ultimo metodo astratto, update_bars, genera semplicemente un MarketEvent che viene aggiunto alla coda, e aggiunge le ultime barre a latest_symbol_data:

            # data.py

    def update_bars(self):
        """
        Inserisce l'ultima barra nella struttura latest_symbol_data
        per tutti i simboli nell'elenco dei simboli.
        """
        for s in self.symbol_list:
            try:
                bar = self._get_new_bar(s).next()
            except StopIteration:
                self.continue_backtest = False
            else:
                if bar is not None:
                    self.latest_symbol_data[s].append(bar)
        self.events.put(MarketEvent())
        

A questo punto abbiamo implementato un oggetto derivato da DataHandler, che viene utilizzato dai restanti componenti per tenere traccia dei dati di mercato. Gli oggetti Strategy, Portfolio ed ExecutionHandler richiedono i dati di mercato aggiornati, quindi ha senso centralizzare questa gestione al fine di evitare la duplicazione del codice e di possibili bug.

Nel prossimo articolo vedremo la gerarchia della classe Strategy e descriviamo come una strategia può essere progettata per gestire più simboli, generando così più SignalEvents per l’oggetto Portfolio.

 

Per il codice completo riportato in questo articolo, utilizzando il modulo di backtesting event-driven DataBacktest si può consultare il seguente repository di github:
https://github.com/datatrading-info/DataBacktest

Motore di Backtesting con Python – Parte II (Gli Eventi)

Nel precedente articolo abbiamo introdotto la struttura base di un ambiente di backtesting event-driven. Il resto di questa serie di articoli si concentrerà su ciascuna delle gerarchie di classi  che costituiscono il sistema generale. In questo articolo i descrivono gli Eventi e come questi possono essere usati per scambiare informazioni tra gli oggetti.

Come discusso nel precedente articolo, il sistema di trading utilizza due loop: uno esterno e uno interno. Il loop interno gestisce l’acquisizione degli eventi da una coda in memoria, e il loro smistamento verso gli specific componenti che gestiscono la conseguente azione.

In questo sistema ci sono quattro tipi di eventi:

  • MarketEvent: viene attivato quando il loop esterno inizia un nuovo “impulso”. Si verifica quando l’oggetto DataHandler riceve un nuovo aggiornamento dei dati di mercato per tutti i simboli che sono attualmente monitorati. Viene utilizzato per attivare l’oggetto Strategy che genera nuovi segnali di trading. L’oggetto Event contiene semplicemente un’identificazione che si tratta di un evento di mercato, senza altre strutture.
  • SignalEvent: l’oggetto Strategy utilizza i dati di mercato per creare nuovi SignalEvent. SignalEvents contiene un simbolo ticker, il timestamp di quando è stato generato e una direzione (long o short). I SignalEvents sono utilizzati dall’oggetto Portfolio come consigli su come effettuare i trade.
  • OrderEvent: quando un oggetto Portfolio riceve i SignalEvents, questi sono valutati nel contesto generale del portfolio, in termini di rischio e dimensionamento della posizione. Questo processo genera un OrderEvents che verrà inviato a un ExecutionHandler.
  • FillEvent: dopo la ricezione di un OrderEvent, l’ExecutionHandler deve eseguire l’ordine. Una volta che un ordine è stato eseguito, genera un FillEvent, che descrive il costo di acquisto o vendita, nonché i costi di transazione, come le commissioni o lo slippage.

La classe genitore è chiamata Event. È una classe base e non fornisce alcuna funzionalità o interfaccia specifica. Nelle implementazioni successive gli oggetti Event svilupperanno una maggiore complessità e quindi stiamo definendo la progettazione di tali sistemi creando una gerarchia di classi.

            # event.py

class Event(object):
    """
    Event è la classe base che fornisce un'interfaccia per tutti
    i tipi di sottoeventi (ereditati), che attiverà ulteriori 
    eventi nell'infrastruttura di trading.
    """
    pass
        

 

La classe MarketEvent eredita da Event e prevede semplicemente di autoidentificare l’evento come di tipo “MARKET”.

            # event.py

class MarketEvent(Event):
    """
    Gestisce l'evento di ricezione di un nuovo aggiornamento dei 
    dati di mercato con le corrispondenti barre.
    """

    def __init__(self):
        """
        Inizializzazione del MarketEvent.
        """
        self.type = 'MARKET'
        

 

La classe SignalEvent richiede un simbolo ticker, un timestamp della generazione e una direzione per “avvisare” un oggetto Portfolio.

            # event.py

class SignalEvent(Event):
    """
    Gestisce l'evento di invio di un Segnale da un oggetto Strategia.
    Questo viene ricevuto da un oggetto Portfolio e si agisce su di esso.
    """

    def __init__(self, symbol, datetime, signal_type):
        """
        Inizializzazione del SignalEvent.

        Parametri:
        symbol - Il simbolo del ticker, es. 'GOOG'.
        datetime - Il timestamp al quale il segnale è stato generato.
        signal_type - 'LONG' o 'SHORT'.
        """

        self.type = 'SIGNAL'
        self.symbol = symbol
        self.datetime = datetime
        self.signal_type = signal_type
        

 

La classe OrderEvent è leggermente più complessa rispetto alla SignalEvent poiché contiene un campo quantità, oltre alle già citate proprietà di SignalEvent. La quantità è determinata dai vincoli del portafoglio. Inoltre, OrderEvent ha un metodo print_order(), utilizzato per inviare le informazioni alla console, se necessario.

            # event.py

class OrderEvent(Event):
    """
    Gestisce l'evento di invio di un ordine al sistema di esecuzione.
    L'ordine contiene un simbolo (ad esempio GOOG), un tipo di ordine
    (a mercato o limite), una quantità e una direzione.
    """

    def __init__(self, symbol, order_type, quantity, direction):
        """
        Inizializza il tipo di ordine, impostando se è un ordine a mercato
        ('MKT') o un ordine limite ('LMT'), la quantità (integral) 
        e la sua direzione ('BUY' or 'SELL').

        Parametri:
        symbol - Lo strumento da tradare.
        order_type - 'MKT' o 'LMT' per ordine Market or Limit.
        quantity - Intero non negativo per la quantità.
        direction - 'BUY' o 'SELL' per long o short.
        """

        self.type = 'ORDER'
        self.symbol = symbol
        self.order_type = order_type
        self.quantity = quantity
        self.direction = direction

    def print_order(self):
        """
        Stampa dei valori che compongono l'ordine.
        """
        print
        "Order: Symbol=%s, Type=%s, Quantity=%s, Direction=%s" % \
        (self.symbol, self.order_type, self.quantity, self.direction)
        

 

La classe FillEvent è l’evento con la maggiore complessità. Contiene un timestamp di quando è stato effettuato un ordine, il simbolo dell’ordine e l’exchange su cui è stato eseguito, la quantità di azioni negoziate, il prezzo effettivo dell’acquisto e la commissione sostenuta.

La commissione viene calcolata utilizzando le commissioni di Interactive Brokers. Per l’azionario statunitenze questa commissione è pari ad un minimo di 1 USD per ordine, con una tariffa fissa di 0,005 USD per azione.

            # event.py

class FillEvent(Event):
    """
    Incorpora il concetto di un ordine eseguito, come restituito
    da un broker. Memorizza l'effettiva quantità scambiata di
    uno strumento e a quale prezzo. Inoltre, memorizza
    la commissione del trade applicata dal broker.
    """

    def __init__(self, timeindex, symbol, exchange, quantity,
                 direction, fill_cost, commission=None):
        """
        Inizializza l'oggetto FillEvent. Imposta il simbolo, il broker,
        la quantità, la direzione, il costo di esecuzione e una
        commissione opzionale.

        Se la commissione non viene fornita, l'oggetto Fill la calcola
        in base alla dimensione del trade e alle commissioni di
        Interactive Brokers.

        Parametri:
        timeindex - La risoluzione delle barre quando l'ordine è stato eseguito.
        symbol - Lo strumento che è stato eseguito.
        exchange - Il broker/exchange dove l'ordine è stato eseguito.
        quantity - La quantità effettivamente scambiata.
        direction - La direzione dell'esecuzione ('BUY' o 'SELL')
        fill_cost - Il valore nominale in dollari.
        commission - La commissione opzionale inviata da IB.
        """

        self.type = 'FILL'
        self.timeindex = timeindex
        self.symbol = symbol
        self.exchange = exchange
        self.quantity = quantity
        self.direction = direction
        self.fill_cost = fill_cost

        # Calcolo della commissione
        if commission is None:
            self.commission = self.calculate_ib_commission()
        else:
            self.commission = commission

    def calculate_ib_commission(self):
        """
        Calcolo delle commisioni di trading basate sulla struttura
        delle fee per la API di Interactive Brokers, in USD.

        Non sono incluse le fee di exchange o ECN.

        Basata sulla "US API Directed Orders":
        https://www.interactivebrokers.com/en/index.php?f=commission&p=stocks2
        """
        full_cost = 1.3
        if self.quantity <= 300:
            full_cost = max(0.35, 0.0035 * self.quantity)
        elif self.quantity <= 3000:
            full_cost = max(0.35, 0.002 * self.quantity)
        elif self.quantity <= 20000:
            full_cost = max(0.35, 0.0015 * self.quantity)
        elif self.quantity <= 100000:
            full_cost = max(0.35, 0.001 * self.quantity)
        else:  # Maggiore di 100 mila azioni
            full_cost = max(0.35, 0.0005 * self.quantity)
   #     full_cost = min(full_cost, 1.0 / 100.0 * self.quantity * self.fill_cost)
        return full_cost
        
Nel prossimo articolo della serie vedremo come sviluppare la gerarchia della classe DataHandler che permetta sia backtesting storico che il live trading, tramite la stessa classe di interfaccia.

 

Per il codice completo riportato in questo articolo, utilizzando il modulo di backtesting event-driven DataBacktest si può consultare il seguente repository di github:
https://github.com/datatrading-info/DataBacktest

Motore di Backtesting con Python – Parte I (Struttura Base)

Negli ultimi mesi abbiamo descritto su DataTrading come testare le varie strategie di trading utilizzando Python e Pandas. La natura vettoriale di Pandas permette elaborazioni estremamente rapide su set di dati di grandi dimensioni siano. Tuttavia, gli approcci di backtesting vettorializzato che abbiamo studiato finora presentano alcune criticità nelle modalità di simulazione dell’esecuzione dei trade. In questa serie di articoli discuteremo un approccio più realistico alla simulazione della strategia, usando Python per costruire un ambiente di backtesting basato sugli eventi.

Software basati sugli Eventi

Prima di approfondire lo sviluppo di questo ambiente di backtesting, è necessario introdurre i concetti base dei sistemi basati sugli eventi. I videogiochi forniscono un classico caso d’uso di tale tipologia di software e sono un semplice esempio da studiare. Un videogioco ha più componenti che interagiscono tra loro in un ambiente real-time con elevati frame-rate. Questo viene gestito grazie all’esecuzione di una serie di calcoli all’interno di un ciclo “infinito” noto come event-loop o game-loop.

Ad ogni tick del game-loop viene chiamata una funzione che si occupa di acquisire l’ultimo evento, quest’ultimo è stato generato da una corrispondente azione precedente all’interno del loop. A seconda della natura dell’evento, come ad esempio la pressione di un tasto o il clic del mouse, sono eseguite specifiche azioni che interromperanno il ciclo o genereranno alcuni eventi aggiuntivi.

Di seguito un esempio del pseudo-codice dell’event-loop:

            while True:  # Esecuzione infinita del loop f
    new_event = get_new_event()   # ottengo l'ultimo evento

    # A seconda del tipo di evento si esegue una azione
    if new_event.type == "LEFT_MOUSE_CLICK":
        open_menu()
    elif new_event.type == "ESCAPE_KEY_PRESS":
        quit_game()
    elif new_event.type == "UP_KEY_PRESS":
        move_player_north()
    # ... e molti altri eventi

    redraw_screen()   # Update dell'output per fornire un'animazione
    tick(50)   # Pausa di 50 millisecondi
        

Il codice verifica continuamente la presenza di nuovi eventi e, in caso affermativo, esegue azioni a seconda del tipo di eventi. In particolare, permette l’illusione di un sistema con risposta in tempo reale dato che il codice viene continuamente ripetuto e quindi si ha una verifica continua degli eventi. Ovviamente, questo è esattamente quello di cui abbiamo bisogno per effettuare simulazioni di trading ad alta frequenza.

Perchè abbiamo bisogno di un Backtesting Event-Driven

I sistemi basati sugli eventi offrono molti vantaggi rispetto a un approccio vettorializzato:

  • Riutilizzo del codice – un backtester basato sugli eventi, in base alla progettazione, può essere utilizzato sia per il backtesting storico sia per il live trading con una minima modifica dei componenti. Questo non è possibile per i backtesting vettorizzati dove tutti i dati devono essere disponibili contemporaneamente per poter effettuare analisi statistiche.
  • Bias di Look-Ahead – con un backtesting basato sugli eventi non vi è alcun bias di previsione perché l’acquisizione dei dati finanziari è gestita come un “evento” su cui si deve effettuare specifiche azioni. In questo modo è possibile un ambiente di backtestering event-driven è alimentato  “instante dopo instante” con i dati di mercato, replicando il comportamento di un sistema di gestione degli ordini e del portafoglio.
  • Realismo – I backtesting basati sugli eventi consentono una significativa personalizzazione del modalità di esecuzione degli ordini e dei costi di transazione sostenuti. È semplice gestire il market-order e il limit-order, oltre al market-on-open (MOO) e al market-on-close (MOC), poiché è possibile costruire un gestore di exchange personalizzato.

 

Sebbene i sistemi basati sugli eventi siano dotati di numerosi vantaggi, essi presentano due importanti svantaggi rispetto ai più semplici sistemi vettorizzati. Innanzitutto sono molto più complessi da implementare e testare. Ci sono più “parti mobili” che causano una maggiore probabilità di introdurre bug. Per mitigare questa criticità è possibile implementare una  metodologia di testing del software, come il test-driven development.

In secondo luogo, hanno tempi di esecuzione più lenti da eseguire rispetto a un sistema vettorializzato. Le operazioni vettoriali ottimizzate non possono essere utilizzate quando si eseguono calcoli matematici. Discuteremo dei modi per superare queste limitazioni negli articoli successivi.

Struttura di un sistema di Backtesting Event-Driven

Per applicare un approccio event-driven a un sistema di backtesting è necessario definire i componenti base (o oggetti) che gestiscono compiti specifici:

  • Event: l’Event è la classe fondamentale di un sistema event-driven. Contiene un attributto “tipo” (ad esempo, “MARKET”, “SIGNAL”, “ORDER” o “FILL”) che determina come viene gestito uno specifico evento all’interno dell’event-loop.
  • Event Queue: la Coda degli Eventi è un oggetto Python Queue che memorizza tutti gli oggetti della sotto-classe Event generati dal resto del software.
  • DataHandler: il DataHandler è una classe base astratta (ABC) che presenta un’interfaccia per la gestione di dati storici o del mercato in tempo reale. Fornisce una significativa flessibilità in quanto i moduli della strategia e del portfolio possono essere riutilizzati da entrambi gli approcci. Il DataHandler genera un nuovo MarketEvent ad loop del sistema (vedi sotto).
  • Strategy: anche la Strategy è una classe ABC e presenta un’interfaccia per elaborare i dati di mercato e generare i corrispondenti SignalEvents, che vengono infine utilizzati dall’oggetto Portfolio. Un SignalEvent contiene un simbolo ticker, una direzione (LONG or SHORT) e un timestamp.
  • Portfolio: si tratta di una classe ABC che implementa la gestione degli ordini associata alle posizioni attuali e future di una strategia. Svolge anche la gestione del rischio in tutto il portafoglio, compresa l’esposizione settoriale e il dimensionamento delle posizioni. In un’implementazione più sofisticata, questo potrebbe essere delegato a una classe RiskManagement. La classe Portfolio prende un SignalEvents dalla coda e genera uno o più OrderEvents che vengono aggiunti alla coda.
  • ExecutionHandler: l’ExecutionHandler simula una connessione a una società di intermediazione o broker. Il suo compito consiste nel prelevare gli OrderEvents dalla coda ed eseguirli, tramite un approccio simulato o una connessione reale verso il broker. Una volta eseguiti gli ordini, il gestore crea i FillEvents, che descrivono ciò che è stato effettivamente scambiato, comprese le commissioni, lo spread e lo slippage (se modellato).
  • Loop – Tutti questi componenti sono racchiusi in un event-loop che gestisce correttamente tutti i tipi di eventi, indirizzandoli al componente appropriato.

Questo è il modello base di un motore di trading. Vi è un significativo margine di espansione, in particolare per quanto riguarda l’utilizzo del portafoglio. Inoltre, i diversi modelli di costo delle transazioni possono essere implementati utilizzando una propria gerarchia di classi. In questa fase però introdurrebbe una complessità inutile all’interno di questa serie di articoli, quindi al momento non viene approfondita ulteriormente. Nei tutorial successivi si potrà pensare di  espandere il sistema per includere ulteriori gradi di realismo.

Di seguito potete trovare il codice Python che mostra  come il backtester funziona in pratica. Ci sono due loop nidificati all’interno del codice. Il loop esterno è usato per dare al backtester un impulso, o ritmo. Nel live trading questa è la frequenza con cui vengono acquisiti i nuovi dati di mercato. Per le strategie di backtesting questo non è strettamente necessario poiché il backtester utilizza i dati di mercato forniti in forma di drip-feed (vedi la riga bars.update_bars ()).

Il ciclo interno gestisce effettivamente gli eventi dall’oggetto Queue. Gli Eventi specifici sono delegati al rispettivo componente e successivamente vengono aggiunti nuovi eventi alla coda. Quando la coda degli eventi è vuota, si riprende il ciclo esterno:

            # Dichiarazione dei componenti e rispettive classi 
bars = DataHandler(..)
strategy = Strategy(..)
port = Portfolio(..)
broker = ExecutionHandler(..)

while True:
    # Update delle barre dei prezzi (codice specifico per il backtesting, opposto al live trading)
    if bars.continue_backtest == True:
        bars.update_bars()
    else:
        break

    # Gestione degli eventi
    while True:
        try:
            event = events.get(False)
        except Queue.Empty:
            break
        else:
            if event is not None:
                if event.type == 'MARKET':
                    strategy.calculate_signals(event)
                    port.update_timeindex(event)

                elif event.type == 'SIGNAL':
                    port.update_signal(event)

                elif event.type == 'ORDER':
                    broker.execute_order(event)

                elif event.type == 'FILL':
                    port.update_fill(event)

    # pausa di 10 minuti
    time.sleep(10 * 60)
        
Questo è lo schema di base di come è progettato un ambiente di backtesting basato sugli eventi. Nel prossimo articolo si descrive la gerarchia della classe Events.

Utilizzo della Cross-Validation per ottimizzare un metodo di Machine Learning: configurazione della Regressione

cross-validation-machine-learning-trading-algoritmico

Una delle aree più problematiche del trading quantitativo è l’ottimizzazione di una strategia di previsione per migliorarne le prestazioni.

I trader quantistici esperti sono ben consapevoli che è fin troppo facile generare una strategia con capacità predittive stellari durante un backtest. Tuttavia, alcuni backtest possono mascherare il pericolo di un modello overfit , che può portare a una drastica sottoperformance quando viene implementata una strategia.

In questo articolo descriviamo un approccio per ridurre il problema dell’overfitting di un modello di machine learning, utilizzando una tecnica nota come cross-validation.

Per prima cosa introduciamo la definizione della cross-validation e poi descriviamo il funzionamento. In secondo luogo, costruiamo un modello di previsione utilizzando un indice azionario e quindi applichiamo due metodi di validazione incrociata a questo esempio: il validation set approach e k-fold cross-validation. Infine discuteremo il codice per le simulazioni utilizzando Python, Pandas , Matplotlib e Scikit-Learn .

Questo articolo è il “successore spirituale” di un precedente articolo scritto di recente sul compromesso tra bias e varianza . In quell’articolo abbiamo menzionato la convalida incrociata come un mezzo per risolvere alcuni dei problemi causati dal compromesso bias-varianza.

Il nostro obiettivo è infine creare una serie di strumenti statistici che possono essere utilizzati all’interno di un motore di backtesting per aiutarci a ridurre al minimo il problema dell’overfitting di un modello e quindi limitare le perdite future a causa di una strategia scarsamente performante.

Panoramica della Cross-Validation

Nel precedente articolo sul compromesso bias-varianza sono state introdotte le definizioni di errore di test e flessibilità:

  • Errore di test: l’errore medio, dove la media è calcolata tra molte osservazioni, associato alle prestazioni predittive di un particolare modello statistico quando è valutato su nuove osservazioni che non sono state utilizzate per addestrare il modello .
  • Flessibilità : i gradi di libertà a disposizione del modello per “adattarsi” ai dati di addestramento. Una regressione lineare è molto rigida (ha solo due gradi di libertà) mentre un polinomio di alto grado è molto flessibile (e come tale può avere molti gradi di libertà).

Con questi concetti in mente possiamo ora definire la cross-validation:

L’obiettivo della cross-validation è stimare l’errore di test associato a un modello statistico o selezionare il livello di flessibilità appropriato per un particolare metodo statistico.

Ancora una volta, possiamo ricordare dall’articolo sul compromesso bias-varianza che l’ errore di addestramento associato a un modello può sottovalutare notevolmente l’errore di test del modello. La convalida incrociata ci fornisce la capacità di stimare più accuratamente l’errore di test, che non conosceremo mai nella pratica.

La convalida incrociata consiste nell’escludere specifici sottoinsiemi dai dati di addestramento al fine di usarli come osservazioni di test. In questo articolo discuteremo i vari modi in cui tali sottoinsiemi vengono distribuiti e implementeremo i metodi usando Python su un modello di previsione di esempio basato su dati storici precedenti.

Esempio di previsione

Per rendere concreta la seguente discussione teorica prenderemo in considerazione lo sviluppo di una nuova strategia di trading basata sulla previsione dei livelli di prezzo di un indice azionario. Prenderemo in considerazione l’indicie S&P500, che contiene un raggruppamento ponderato delle cinquecento aziende più grandi società quotate (per capitalizzazione di mercato) a Wall Street. Allo stesso modo potremmo scegliere l’Euro Stoxx 50  o il DAX .

Per questa strategia considereremo semplicemente il prezzo di chiusura delle barre giornaliere storiche Open-High-Low-Close (OHLC) come predittori e il prezzo di chiusura del giorno successivo come risposta. Quindi stiamo tentando di prevedere il prezzo di domani utilizzando i prezzi storici giornalieri.

Un’osservazione sarà costituito da una coppia di vettori , \(X\) e \(y\), che contengono rispettivamente i valori predittori e il valore di risposta. Se consideriamo un ritardo giornaliero di \(p\) giorni, \(X\) ha \(p\) componenti. Ciascuno di questi componenti rappresenta il prezzo di chiusura di un giorno precedente. \(X_p\) rappresenta il prezzo di chiusura di oggi (noto), mentre \(X_{p-1}\) rappresenta il prezzo di chiusura di ieri, mentre \(X_1\) rappresenta il prezzo di \(p-1\) giorni fa.

\(Y\) contiene un solo valore, vale a dire il prezzo di chiusura di domani, ed è quindi uno scalare. Ogni osservazione è una tupla \((X, y)\). Considereremo una serie di \(n\) osservazioni corrispondenti a \(n\) giorni di informazioni storiche sui prezzi del SP500.

Il nostro obiettivo è trovare un modello statistico che tenti di prevedere il livello di prezzo del SP500 in base ai prezzi dei giorni precedenti. Se dovessimo ottenere una previsione accurata, potremmo usarla per generare segnali di trading di base. Questo articolo è principalmente interessato alla parte precedente del modello, quella della componente predittiva.

Useremo la convalida incrociata in due modi: in primo luogo per stimare l’errore di test di particolari metodi di apprendimento statistico (cioè le loro separate prestazioni predittive), e in secondo luogo per selezionare la flessibilità ottimale del metodo scelto al fine di minimizzare gli errori associati a bias e varianza.

Descriveremo ora i diversi modi di eseguire la convalida incrociata, iniziando con l’ approccio dell’insieme di convalida e poi infine con la convalida incrociata k-fold . In ogni caso useremo Pandas e Scikit-Learn per implementare questi metodi.

Validation Set Approach

L’approccio del set di convalida alla cross-validation è molto semplice da eseguire. Essenzialmente prendiamo l’insieme delle osservazioni (\(n\) giorni di dati) e le dividiamo casualmente in due metà. Una metà è nota come set di addestramento mentre la seconda metà è nota come set di convalida . Il modello si adatta utilizzando solo i dati nel set di addestramento, mentre il suo errore di test viene stimato utilizzando solo il set di convalida.

Questo è facilmente riconoscibile come una tecnica spesso utilizzata nel trading quantitativo come meccanismo per valutare le prestazioni predittive. Tuttavia, è più comune trovare due terzi dei dati utilizzati per il set di addestramento, mentre il terzo rimanente viene utilizzato per la convalida. Inoltre è più comune mantenere l’ordine delle serie temporali in modo tale che i primi due terzi rappresentino cronologicamente i primi due terzi dei dati storici.

Meno frequente è l’applicazione di questo metodo per la randomizzazione delle osservazioni in ciascuno dei due set. Ancora meno frequente è una discussione su quali sottili problemi possono sorgere quando si esegue questa randomizzazione.

In primo luogo, e soprattutto in situazioni con dati limitati, la procedura può portare ad un’elevata varianza per la stima dell’errore di test dovuto alla randomizzazione dei campioni. Questo è un tipico “trucco” quando si esegue l’approccio del set di convalida alla cross-validation. È fin troppo facile ottenere un basso errore di test semplicemente tramite un caso fortunato nel dividere appropriatamente attraverso la fortuna cieca nel ricevere una divisione del campione casuale appropriata. Quindi il vero errore di test (cioè il potere predittivo) può essere notevolmente sottostimato .

In secondo luogo, si noti che nella divisione 50-50 dei dati  addestramento/testing tralasciamo la metà di tutte le osservazioni. Quindi stiamo riducendo le informazioni che altrimenti sarebbero utilizzate per addestrare il modello. Quindi è probabile che abbia prestazioni peggiori rispetto a se avessimo usato tutte le osservazioni, comprese quelle nel set di convalida. Ciò porta a una situazione in cui potremmo effettivamente sovrastimare l’errore di test per l’intero set di dati.

Al fine di ridurre l’impatto di questi problemi, prenderemo in considerazione una suddivisione più sofisticata dei dati nota come convalida incrociata k-fold.

k-Fold Cross Validation

La convalida incrociata K-fold migliora validation set approach dividendo le \(n\) osservazioni in  \(k\) sottoinsiemi che si escludono a vicenda e di dimensioni approssimativamente uguali note come “fold”.

Il primo fold diventa un set di convalida, mentre i restanti \(k-1\) fold (aggregati insieme) diventano il set di addestramento. Il modello si adatta al set di addestramento e il suo errore di test viene stimato sul set di convalida. Questa procedura viene ripetuta \(k\) volte, con ciascuna ripetizione che offre un fold come set di convalida, mentre i restanti \(k-1\) vengono utilizzati per l’addestramento.

Ciò consente di calcolare una stima complessiva del test, \(\text{CV}_k\), che è una media di tutti i singoli errori quadratici medi, \(\text{MSE}_i\), per ogni fold:

\(\begin{eqnarray} \text {CV} _k = \frac {1} {k} \sum ^ {k} _ {i = 1} \text {MSE} _i \end{eqnarray}\)

La domanda ovvia che ci si pone in questa fase è come scegliere il valore di \(k\)? La risposta semplice (basata su studi empirici) è scegliere \(k = 5\) o \( k = 10 \). La risposta completa a questa domanda si riferisce sia alla spesa computazionale sia , ancora una volta, al compromesso bias-varianza.

Leave-One-Out Cross Validation

Possiamo effettivamente scegliere \(k = n\), cioè adattiamo il modello \(n\) volte, con una sola osservazione tralasciata per ogni adattamento. Questo è noto come validazione incrociata leave-one-out (LOOCV). Può essere molto costoso in termini di calcolo, soprattutto se \(n\) è grande e il modello ha una procedura di adattamento costosa.

Sebbene LOOCV sia vantaggioso per ridurre il bias , poiché quasi tutti i campioni vengono utilizzati per l’adattamento, in realtà soffre del problema dell’elevata varianza. Questo perché stiamo calcolando l’errore di test ogni volta su una singola risposta per ogni osservazione nel set di dati.

La convalida incrociata k-fold riduce la varianza a scapito dell’introduzione di qualche bias in più, dato che alcune delle osservazioni non sono utilizzate per l’addestramento. Con \(k = 5\) o \(k = 10\) il compromesso bias-varianza è generalmente ottimizzato.

Implementazione in Python

Siamo abbastanza fortunati quando lavoriamo con Python e il suo ecosistema di librerie, poiché gran parte del “lavoro pesante” è già stato implementato e così risparmiamo molto tempo e mal di testa! Utilizzando Pandas, Scikit-Learn e Matplotlib, possiamo creare rapidamente alcuni esempi per mostrare l’utilizzo e le problematiche relative alla convalida incrociata.

Se non hai ancora configurato un ambiente di ricerca Python, ti consiglio vivamente di scaricare il pacchetto Anaconda di Continuum Analytics che fornisce tutte le librerie che utilizzeremo in questo articolo e un IDE pronto per l’uso chiamato Spyder.

Ottenere i dati

Il primo compito è ottenere i dati e metterli in un formato che possiamo usare. In realtà abbiamo già eseguito questa procedura in un articolo precedente , ma vale la pena provare ad avere questi articoli il più autonomi possibile! Quindi, puoi utilizzare il seguente codice per ottenere dati storici di qualsiasi serie temporale finanziaria disponibile su Yahoo Finanza, nonché i valori di ritardo predittivi giornalieri associati:

            import datetime
import numpy as np
import pandas as pd
import sklearn
import pandas_datareader as pdr


def create_lagged_series(symbol, start_date, end_date, lags=5):
    """
    Si crea un DataFrame pandas che memorizza i rendimenti percentuali dei
    prezzi di chiusura rettificata di un titolo azionario ottenuta da Yahoo
    Finance, insieme a una serie di rendimenti ritardati dai giorni di negoziazione
    precedenti (i valori predefiniti ritardano di 5 giorni).
    Sono inclusi anche il volume degli scambi, così come la direzione del giorno
    precedente.
    """

    # Ottieni informazioni sulle azioni da Yahoo Finance
    ts = pdr.get_data_yahoo(symbol, start_date-datetime.timedelta(days=365), end_date)

    # Crea un nuovo Dataframe per i ritardi
    tslag = pd.DataFrame(index=ts.index)
    tslag["Today"] = ts["Adj Close"]
    tslag["Volume"] = ts["Volume"]

    # Crea la serie traslata dei ritardi dei prezzi di chiusura del 
    # periodo (giorno) precedente
    for i in range(0,lags):
        tslag["Lag%s" % str(i+1)] = ts["Adj Close"].shift(i+1)

    # Crea il DataFrame dei ritorni
    tsret = pd.DataFrame(index=tslag.index)
    tsret["Volume"] = tslag["Volume"]
    tsret["Today"] = tslag["Today"].pct_change()*100.0

    # Se qualsiasi valore dei ritorni percentuali è uguale a zero, si imposta 
    # a un numero piccolo (per non avere problemi con il QDA in scikit-learn)
    for i,x in enumerate(tsret["Today"]):
        if (abs(x) < 0.0001):
            tsret["Today"][i] = 0.0001

    # Crea la serie dei ritorni precedenti percentuali
    for i in range(0,lags):
        tsret["Lag%s" % str(i+1)] = tslag["Lag%s" % str(i+1)].pct_change()*100.0

    # Crea la serie "Direction" (+1 o -1) che indica un giorno up/down
    tsret["Direction"] = np.sign(tsret["Today"])
    tsret = tsret[tsret.index >= start_date]

    return tsret
        

Da notare che non memorizziamo i valori del prezzo di chiusura direttamente nelle colonne “Today” o “Lags”. Invece, stiamo memorizzando il rendimento percentuale tra i prezzi di chiusura di un giorno e del giorno precedente.

Dobbiamo ottenere i dati per i prezzi giornalieri del SP500 per un periodo di tempo adeguato. Si considera dal 1 ° gennaio 2004 al 31 dicembre 2004. Tuttavia questa è una scelta arbitraria. Si può regolare l’intervallo di tempo come meglio si crede. Per ottenere i dati e inserirli in un Pandas DataFrame chiamato sp500_lags possiamo utilizzare il seguente codice:

            if __name__ == "__main__":
    symbol = "^GSPC"
    start_date = datetime.datetime(2004, 1, 1)
    end_date = datetime.datetime(2004, 12, 31)
    sp500_lags = create_lagged_series(symbol, start_date, end_date, lags=5)
        

A questo punto abbiamo i dati necessari per iniziare a creare una serie di modelli statistici di machine learning..

Validation Set Approach

Ora che abbiamo i dati finanziari necessari per creare una serie di modelli di regressione predittiva, possiamo utilizzare i metodi di convalida incrociata sopra riportati per ottenere stime per l’errore di test.

Il primo compito è importare i modelli da Scikit-Learn. Scegliamo un modello di regressione lineare con caratteristiche polinomiali. Questo ci fornisce la possibilità di scegliere diversi gradi di flessibilità semplicemente aumentando il grado dell’ordine polinomiale delle features. Inizialmente si considera l’approccio del set di convalida per la cross-validation.

Scikit-Learn fornisce un approccio a set di convalida tramite il metodo train_test_split trovato nel modulo cross_validation. Successivamente dobbiamo importare il metodo KFold per la convalida incrociata k-fold, così come il modello di regressione lineare stesso. Dobbiamo importare il calcolo MSE così come Pipeline PolynomialFeatures. Gli ultimi due metodi ci consentono di creare facilmente un insieme di modelli di regressione lineare di feature polinomiali con una minima codifica aggiuntiva:

            ..
from sklearn.model_selection import train_test_split, KFold
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import PolynomialFeatures
..
        

Una volta importati i moduli, possiamo creare un DataFrame SP500 che utilizza i rendimenti in ritardo dei cinque giorni precedenti come predittori. Possiamo quindi creare dieci separate suddivisioni casuali dei dati in un set di addestramento e convalida.

Infine, per gradi multipli delle features polinomiali della regressione lineare, possiamo calcolare l’errore di test. Questo ci fornisce dieci separate curve di errore di test, ogni valore delle quali mostra il test MSE per un grado diverso del kernel polinomiale:

            ..
..

def validation_set_poly(random_seeds, degrees, X, y):
    """
    Utilizza il metodo train_test_split per creare un set
    di addestramento e un set di convalida (50% per ciascuno)
    utilizzando separati campionamenti casuali "random_seeds"
    per modelli di regressione lineare di varia flessibilità
    """
    sample_dict = dict([("seed_%s" % i,[]) for i in range(1, random_seeds+1)])
    # Esegui un ciclo su ogni suddivisione casuale in una suddivisione train-test
    for i in range(1, random_seeds+1):
        print("Random: %s" % i)
        # Aumenta il grado di ordine polinomiale della regressione lineare
        for d in range(1, degrees+1):
            print("Degree: %s" % d)
            # Crea il modello, divide gli insiemi e li addestra
            polynomial_features = PolynomialFeatures(
                degree=d, include_bias=False
            )
            linear_regression = LinearRegression()
            model = Pipeline([
                ("polynomial_features", polynomial_features),
                ("linear_regression", linear_regression)
            ])
            X_train, X_test, y_train, y_test = train_test_split(
                X, y, test_size=0.5, random_state=i
            )
            model.fit(X_train, y_train)
            # Calcola il test MSE e lo aggiunge al
            # dizionario di tutte le curve di test
            y_pred = model.predict(X_test)
            test_mse = mean_squared_error(y_test, y_pred)
            sample_dict["seed_%s" % i].append(test_mse)
        # Converte queste liste in array numpy per calcolare la media
        sample_dict["seed_%s" % i] = np.array(sample_dict["seed_%s" % i])
    # Crea la serie delle "medie dei test MSE" colcolando la media
    # del test MSE per ogni grado dei modelli di regressione lineare,
    # attraverso tutti i campionamenti casuali
    sample_dict["avg"] = np.zeros(degrees)
    for i in range(1, random_seeds+1):
        sample_dict["avg"] += sample_dict["seed_%s" % i]
    sample_dict["avg"] /= float(random_seeds)
    return sample_dict

..
..

        

Possiamo usare Matplotlib per tracciare il grafico di questi dati. Dobbiamo importare pylab e quindi creare una funzione per tracciare le curve di errore di test:

            ..
import pylab as plt
..
..
def plot_test_error_curves_vs(sample_dict, random_seeds, degrees):
    fig, ax = plt.subplots()
    ds = range(1, degrees+1)
    for i in range(1, random_seeds+1):
        ax.plot(ds, sample_dict["seed_%s" % i], lw=2,
                label='Test MSE - Sample %s' % i)

    ax.plot(ds, sample_dict["avg"], linestyle='--', color="black",
                    lw=3, label='Avg Test MSE')
    ax.legend(loc=0)
    ax.set_xlabel('Degree of Polynomial Fit')
    ax.set_ylabel('Mean Squared Error')
    ax.set_ylim([0.0, 4.0])
    fig.set_facecolor('white')
    plt.show()
..
..
        

Abbiamo selezionato il grado delle nostre features polinomiali al variare tra \(d = 1\) e \(d = 3\), prevedendo così un ordine cubico nelle nostre features. La seguente Figura 1 mostra le dieci diverse suddivisioni casuali dei dati di addestramento e test, insieme alla media del test MSE (la linea tratteggiata nera):

trading-algoritmico-cross-val-fig1
Figura 1 - Le curve di test MSE per più divisioni di convalida dell'addestramento per una regressione lineare con features polinomiali di grado crescente.

È immediatamente evidente quanta variazione ci sia tra diverse suddivisioni casuali in un set di addestramento e convalida. Poiché non c’è una grande quantità di segnale predittivo nell’utilizzo dei prezzi di chiusura storici dei giorni precedenti del SP500, vediamo che all’aumentare del grado delle features polinomiali, effettivamente il test MSE aumenta.

Inoltre è chiaro che il set di convalida soffre di una elevata varianza. Il test MSE medio per l’approccio del set di validazione sul modello di grado \(d = 3\) è di circa 1,9.

Per ridurre al minimo questo problema, implementeremo ora la convalida incrociata k-fold sullo stesso set di dati dell’SP500.

k-Fold Cross Validation

Poiché ci siamo già occupati delle importazioni di cui sopra, ci limitiamo a delineare le nuove funzioni per eseguire la convalida incrociata k-fold. Sono quasi identiche alle funzioni utilizzate per la divisione del test di addestramento. Tuttavia, dobbiamo usare l’oggetto  KFold per iterare su \(k \) “fold”.

In particolare l’oggetto KFold fornisce un iteratore che ci permette di indicizzare correttamente i campioni nel data set e creare fold separati di training/test. In questo esempio abbiamo scelto \(k = 10\).

Come con l’approccio dell’insieme di convalida, creiamo una pipeline di trasformazione delle features polinomiali e applichiamo un modello di regressione lineare. Quindi calcoliamo il test MSE e costruiamo curve di test MSE separate per ogni fold. Infine, creiamo una curva MSE media tra le fold:

            ..
..
def k_fold_cross_val_poly(folds, degrees, X, y):
    n = len(X)
    kf = KFold(n, n_folds=folds)
    kf_dict = dict([("fold_%s" % i,[]) for i in range(1, folds+1)])
    fold = 0
    for train_index, test_index in kf:
        fold += 1
        print("Fold: %s" % fold)
        X_train, X_test = X.ix[train_index], X.ix[test_index]
        y_train, y_test = y.ix[train_index], y.ix[test_index]
        # Aumenta il grado di ordine polinomiale della regressione lineare
        for d in range(1, degrees+1):
            print("Degree: %s" % d)
            # Crea il modello e lo addestra
            polynomial_features = PolynomialFeatures(
                degree=d, include_bias=False
            )
            linear_regression = LinearRegression()
            model = Pipeline([
                ("polynomial_features", polynomial_features),
                ("linear_regression", linear_regression)
            ])
            model.fit(X_train, y_train)
            # Calcola il test MSE e lo aggiunge al 
            # dizionario di tutte le curve di test
            y_pred = model.predict(X_test)
            test_mse = mean_squared_error(y_test, y_pred)
            kf_dict["fold_%s" % fold].append(test_mse)
        # Converte queste liste in array numpy per calcolare la media
        kf_dict["fold_%s" % fold] = np.array(kf_dict["fold_%s" % fold])
    # Crea la serie dei "test MSE medi" calcolando la media dei 
    # test MSE per ogni grado del modello di regressione lineare,
    # tramite ogni k folds.
    kf_dict["avg"] = np.zeros(degrees)
    for i in range(1, folds+1):
        kf_dict["avg"] += kf_dict["fold_%s" % i]
    kf_dict["avg"] /= float(folds)
    return kf_dict
..
..
        

Possiamo tracciare queste curve con la seguente funzione:

            ..
..
def plot_test_error_curves_kf(kf_dict, folds, degrees):
    fig, ax = plt.subplots()
    ds = range(1, degrees+1)
    for i in range(1, folds+1):
        ax.plot(ds, kf_dict["fold_%s" % i], lw=2, label='Test MSE - Fold %s' % i)

    ax.plot(ds, kf_dict["avg"], linestyle='--', color="black", 
                                lw=3, label='Avg Test MSE')
    ax.legend(loc=0)
    ax.set_xlabel('Degree of Polynomial Fit')
    ax.set_ylabel('Mean Squared Error')
    ax.set_ylim([0.0, 4.0])
    fig.set_facecolor('white')
    plt.show()
..
..
        

L’output è riportato nella seguente Figura 2:

trading-algoritmico-cross-val-fig2
Figura 2 - Curve dei test MSE per più fold di convalida incrociata k-fold per una regressione lineare con features polinomiali di grado crescente.

Si noti che la variazione tra le curve di errore è molto inferiore rispetto al validation set approch. Questo è l’effetto desiderato dell’esecuzione della convalida incrociata. In particolare, per \(d = 3\) abbiamo un errore di test medio ridotto di circa 0,8.

La convalida incrociata fornisce generalmente una stima molto migliore del vero test MSE, a scapito di qualche lieve bias. Questo di solito è un compromesso accettabile nelle applicazioni di machine learning.

Negli articoli futuri prenderemo in considerazione approcci di ricampionamento alternativi , inclusi Bootstrap, Bootstrap Aggregation (“Bagging”) e Boosting. Si tratta di tecniche più sofisticate che ci aiuteranno a selezionare meglio i nostri modelli e (si spera) a ridurre ulteriormente i nostri errori.

Codice Python Completo

Di seguito il codice Python completo per il file cross_validation.py:

            import datetime
import numpy as np
import pandas as pd
import sklearn
import pandas_datareader as pdr
import pylab as plt

from sklearn.model_selection import train_test_split, KFold
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import PolynomialFeatures



def create_lagged_series(symbol, start_date, end_date, lags=5):
    """
    Si crea un DataFrame pandas che memorizza i rendimenti percentuali dei
    prezzi di chiusura rettificata di un titolo azionario ottenuta da Yahoo
    Finance, insieme a una serie di rendimenti ritardati dai giorni di negoziazione
    precedenti (i valori predefiniti ritardano di 5 giorni).
    Sono inclusi anche il volume degli scambi, così come la direzione del giorno precedente.
    """

    # Ottieni informazioni sulle azioni da Yahoo Finance
    ts = pdr.get_data_yahoo(symbol, start_date-datetime.timedelta(days=365), end_date)

    # Crea un nuovo Dataframe per i ritardi
    tslag = pd.DataFrame(index=ts.index)
    tslag["Today"] = ts["Adj Close"]
    tslag["Volume"] = ts["Volume"]

    # Crea la serie traslata dei ritardi dei prezzi di chiusura del periodo (giorno) precedente
    for i in range(0,lags):
        tslag["Lag%s" % str(i+1)] = ts["Adj Close"].shift(i+1)

    # Crea il DataFrame dei ritorni
    tsret = pd.DataFrame(index=tslag.index)
    tsret["Volume"] = tslag["Volume"]
    tsret["Today"] = tslag["Today"].pct_change()*100.0

    # Se uno qualsiasi dei valori dei ritorni percentuali è uguale a zero, si impostano
    # a un numero piccolo (per non avere problemi con il modello QDA in scikit-learn)
    for i,x in enumerate(tsret["Today"]):
        if (abs(x) < 0.0001):
            tsret["Today"][i] = 0.0001

    # Crea la serie dei ritorni precedenti percentuali
    for i in range(0,lags):
        tsret["Lag%s" % str(i+1)] = tslag["Lag%s" % str(i+1)].pct_change()*100.0

    # Crea la serie "Direction" (+1 o -1) che indica un giorno up/down
    tsret["Direction"] = np.sign(tsret["Today"])
    tsret = tsret[tsret.index >= start_date]

    return tsret


def validation_set_poly(random_seeds, degrees, X, y):
    """
    Utilizza il metodo train_test_split per creare un set
    di addestramento e un set di convalida (50% per ciascuno)
    utilizzando separati campionamenti casuali "random_seeds"
    per modelli di regressione lineare di varia flessibilità
    """
    sample_dict = dict([("seed_%s" % i,[]) for i in range(1, random_seeds+1)])
    # Esegui un ciclo su ogni suddivisione casuale in una suddivisione train-test
    for i in range(1, random_seeds+1):
        print("Random: %s" % i)
        # Aumenta il grado di ordine polinomiale della regressione lineare
        for d in range(1, degrees+1):
            print("Degree: %s" % d)
            # Crea il modello, divide gli insiemi e li addestra
            polynomial_features = PolynomialFeatures(
                degree=d, include_bias=False
            )
            linear_regression = LinearRegression()
            model = Pipeline([
                ("polynomial_features", polynomial_features),
                ("linear_regression", linear_regression)
            ])
            X_train, X_test, y_train, y_test = train_test_split(
                X, y, test_size=0.5, random_state=i
            )
            model.fit(X_train, y_train)
            # Calcola il test MSE e lo aggiunge al
            # dizionario di tutte le curve di test
            y_pred = model.predict(X_test)
            test_mse = mean_squared_error(y_test, y_pred)
            sample_dict["seed_%s" % i].append(test_mse)
        # Converte queste liste in array numpy per calcolare la media
        sample_dict["seed_%s" % i] = np.array(sample_dict["seed_%s" % i])
    # Crea la serie delle "medie dei test MSE" colcolando la media
    # del test MSE per ogni grado dei modelli di regressione lineare,
    # attraverso tutti i campionamenti casuali
    sample_dict["avg"] = np.zeros(degrees)
    for i in range(1, random_seeds+1):
        sample_dict["avg"] += sample_dict["seed_%s" % i]
    sample_dict["avg"] /= float(random_seeds)
    return sample_dict


def plot_test_error_curves_vs(sample_dict, random_seeds, degrees):
    fig, ax = plt.subplots()
    ds = range(1, degrees+1)
    for i in range(1, random_seeds+1):
        ax.plot(ds, sample_dict["seed_%s" % i], lw=2, label='Test MSE - Sample %s' % i)
    ax.plot(ds, sample_dict["avg"], linestyle='--', color="black", lw=3, label='Avg Test MSE')
    ax.legend(loc=0)
    ax.set_xlabel('Degree of Polynomial Fit')
    ax.set_ylabel('Mean Squared Error')
    ax.set_ylim([0.0, 4.0])
    fig.set_facecolor('white')
    plt.show()


def k_fold_cross_val_poly(folds, degrees, X, y):
    n = len(X)
    kf = KFold(n, n_folds=folds)
    kf_dict = dict([("fold_%s" % i,[]) for i in range(1, folds+1)])
    fold = 0
    for train_index, test_index in kf:
        fold += 1
        print("Fold: %s" % fold)
        X_train, X_test = X.ix[train_index], X.ix[test_index]
        y_train, y_test = y.ix[train_index], y.ix[test_index]
        # Aumenta il grado di ordine polinomiale della regressione lineare
        for d in range(1, degrees+1):
            print("Degree: %s" % d)
            # Crea il modello e lo addestra
            polynomial_features = PolynomialFeatures(
                degree=d, include_bias=False
            )
            linear_regression = LinearRegression()
            model = Pipeline([
                ("polynomial_features", polynomial_features),
                ("linear_regression", linear_regression)
            ])
            model.fit(X_train, y_train)
            # Calcola il test MSE e lo aggiunge al
            # dizionario di tutte le curve di test
            y_pred = model.predict(X_test)
            test_mse = mean_squared_error(y_test, y_pred)
            kf_dict["fold_%s" % fold].append(test_mse)
        # Converte queste liste in array numpy per calcolare la media
        kf_dict["fold_%s" % fold] = np.array(kf_dict["fold_%s" % fold])
    # Crea la serie dei "test MSE medi" calcolando la media dei
    # test MSE per ogni grado del modello di regressione lineare,
    # tramite ogni k folds.
    kf_dict["avg"] = np.zeros(degrees)
    for i in range(1, folds+1):
        kf_dict["avg"] += kf_dict["fold_%s" % i]
    kf_dict["avg"] /= float(folds)
    return kf_dict


def plot_test_error_curves_kf(kf_dict, folds, degrees):
    fig, ax = plt.subplots()
    ds = range(1, degrees+1)
    for i in range(1, folds+1):
        ax.plot(ds, kf_dict["fold_%s" % i], lw=2, label='Test MSE - Fold %s' % i)
    ax.plot(ds, kf_dict["avg"], linestyle='--', color="black", lw=3, label='Avg Test MSE')
    ax.legend(loc=0)
    ax.set_xlabel('Degree of Polynomial Fit')
    ax.set_ylabel('Mean Squared Error')
    ax.set_ylim([0.0, 4.0])
    fig.set_facecolor('white')
    plt.show()



if __name__ == "__main__":
    symbol = "^FTSE"
    symbol = "^GSPC"
    start_date = datetime.datetime(2004, 1, 1)
    end_date = datetime.datetime(2004, 12, 31)
    sp500_lags = create_lagged_series(symbol, start_date, end_date, lags=5)

    # Uso tutti e venti i ritorni di 2 giorni precedenti come  
    # valori di predizione, con "Today" come risposta
    X = sp500_lags[[
        "Lag1", "Lag2", "Lag3", "Lag4", "Lag5",
        # "Lag6", "Lag7", "Lag8", "Lag9", "Lag10",
        # "Lag11", "Lag12", "Lag13", "Lag14", "Lag15",
        # "Lag16", "Lag17", "Lag18", "Lag19", "Lag20"
    ]]
    y = sp500_lags["Today"]
    degrees = 3

    # Visualizza le curve dell'errore di test per il set di validazione
    random_seeds = 10
    sample_dict_val = validation_set_poly(random_seeds, degrees, X, y)
    plot_test_error_curves_vs(sample_dict_val, random_seeds, degrees)

    # Visualizza le curve dell'errore di test per il set di k-fold CV
    folds = 10
    kf_dict = k_fold_cross_val_poly(folds, degrees, X, y)
    plot_test_error_curves_kf(kf_dict, folds, degrees)