nuovi Analyzer con backtrader

Creare nuovi Analyzer con Backtrader

In un articolo precedente abbiamo descritto come usare gli analyzer di Backtrader in un post precedente, dove abbiamo esaminato gli analyzer built-in TradeAnalyzer e SQN per avere un feedback significativo sull’andamento della strategia. In questo articolo facciamo un ulteriore passo avanti e vediamo come creare nuovi analyzer con backtrader per misurare le prestazione delle strategie di trading algoritmico

Backtrader potrebbe non essere il primo strumento che viene in mente quando si pensa all’analisi statistica. Tuttavia, ci sono alcuni aspetti positivi che lo rendono una buona scelta. In particolare, possiamo usare una piattaforma che già conosciamo e con cui abbiamo familiarità. Non c’è bisogno di passare ore per imparare altri framework “più adatti” all’analisi statistica. Possiamo fruttare il nostro background con Backtrader per ottenere lo stesso risultato in meno tempo. Un altro vantaggio degli analyzser di Backtrader è la possibilità di avere un feedback immediato in tempo reale invece di aspettare la conclusione del backtest. Non è necessario esportare i dati finali in un altro framework per ulteriori analisi.

Nuovi analyzer con backtrader

Per iniziare a creare nuovi analyzer con backtrader, vediamo come creare un semplice analyzer per conteggiare il numero di volte per cui il prezzo colpisce le linee degli indicatori pivot p, s1, s2, r1 e r2. Conoscere la frequenza  con la quale il prezzo “colpisce” il pivot potrebbe essere utile per trovare un edge. Inoltre, può essere utile per determinare il posizionamento dello stop se, ad esempio, il livello s2 è raggiunto solo il x% delle volte. Questi sono solo esempi e il codice proposto ha solamente lo scopo di stimolare nuove idee. Possiamo espandere l’analisi nel codice o guardare un’altra area che può interessante.

Analyzer dei punti pivot

				
					import backtrader as bt
from backtrader import Analyzer
from backtrader.utils import AutoOrderedDict
from backtrader.indicators import PivotPoint

class pivotPointAnalyzer(Analyzer):
    '''
    Analyzer per restituire alcune statistiche di base che mostrano:
        - Totale giorni/periodi analizzati
        - La percentuale con cui p è stata toccata
        - La percentuale con cui s1 è stata toccata
        - La percentuale con cui s2 è stata toccata
        - La percentuale con cui r1 è stata toccata
        - La percentuale con cui r2 è stata toccata
        - Le percentuali con cui s1 e r1 sono state toccate nello stesso giorno/periodo
        - Le percentuali con cui s1 e r2 sono state toccate nello stesso giorno/periodo
        - La percentuale con cui s2 e r1 sono state toccate nello stesso giorno/periodo
        - La percentuale con cui s2 e r1 sono state toccate nello stesso giorno/periodo
        - La percentuale con cui p è stata toccata senza toccare s1
        - La percentuale con cui p è stata toccata senza toccare r1
        - La percentuale con cui s1 è stata toccata senza toccare r1
        - La percentuale con cui r1 è stata toccata senza toccare s1
    '''

    def __init__(self):
        #data1 è il resample di data
        self.pivots = PivotPoint()
        # Non vogliamo stampare l'analyzer
        self.pivots.autoplot = False


    def create_analysis(self):
        hit_desc = ('Mesures the frequency in percent that each pivot\n      '
            'was hit over the course of the time period analyzed\n')

        db_desc = ('Mesures the frequency in percent that a pair of pivots\n      '
            'were hit on the same day over the course of the time period analyzed\n')

        xy_desc = ('Mesures the frequency in percent that one pivot (x)\n      '
            'was hit without hitting another pivot (y) on the same day\n      '
            'over the course of the time period analyzed\n')

        self.rets = AutoOrderedDict()
        self.counts = AutoOrderedDict()
        self.counts.total = 0

        self.counts['Hit']['R1'] = 0
        self.counts['Hit']['R2'] = 0
        self.counts['Hit']['P'] = 0
        self.counts['Hit']['S1'] = 0
        self.counts['Hit']['S2'] = 0

        self.counts['DB']['S1 & R1'] = 0
        self.counts['DB']['S1 & R2'] = 0
        self.counts['DB']['S2 & R1'] = 0
        self.counts['DB']['S2 & R2'] = 0

        self.counts['XY']['x=P y=R2'] = 0
        self.counts['XY']['x=P y=R1'] = 0
        self.counts['XY']['x=P y=S2'] = 0
        self.counts['XY']['x=P y=S1'] = 0
        self.counts['XY']['x=R1 y=S2'] = 0
        self.counts['XY']['x=R1 y=S1'] = 0
        self.counts['XY']['x=R1 y=P'] = 0
        self.counts['XY']['x=R2 y=S2'] = 0
        self.counts['XY']['x=R2 y=S1'] = 0
        self.counts['XY']['x=R2 y=P'] = 0
        self.counts['XY']['x=S1 y=R2'] = 0
        self.counts['XY']['x=S1 y=R1'] = 0
        self.counts['XY']['x=S1 y=P'] = 0
        self.counts['XY']['x=S2 y=R2'] = 0
        self.counts['XY']['x=S2 y=R1'] = 0
        self.counts['XY']['x=S2 y=P'] = 0

        self.rets['Hit']['Description'] = hit_desc
        self.rets['Hit']['R1'] = 0
        self.rets['Hit']['R2'] = 0
        self.rets['Hit']['P'] = 0
        self.rets['Hit']['S1'] = 0
        self.rets['Hit']['S2'] = 0

        self.rets['DB']['Description'] = db_desc
        self.rets['DB']['S1 & R1'] = 0
        self.rets['DB']['S1 & R2'] = 0
        self.rets['DB']['S2 & R1'] = 0
        self.rets['DB']['S2 & R2'] = 0

        self.rets['XY']['Description'] = xy_desc
        self.rets['XY']['x=P y=R2'] = 0
        self.rets['XY']['x=P y=R1'] = 0
        self.rets['XY']['x=P y=S2'] = 0
        self.rets['XY']['x=P y=S1'] = 0
        self.rets['XY']['x=R1 y=S2'] = 0
        self.rets['XY']['x=R1 y=S1'] = 0
        self.rets['XY']['x=R1 y=P'] = 0
        self.rets['XY']['x=R2 y=S2'] = 0
        self.rets['XY']['x=R2 y=S1'] = 0
        self.rets['XY']['x=R2 y=P'] = 0
        self.rets['XY']['x=S1 y=R2'] = 0
        self.rets['XY']['x=S1 y=R1'] = 0
        self.rets['XY']['x=S1 y=P'] = 0
        self.rets['XY']['x=S2 y=R2'] = 0
        self.rets['XY']['x=S2 y=R1'] = 0
        self.rets['XY']['x=S2 y=P'] = 0

    def next(self):
        r2 = self.pivots.lines.r2[-1]
        r1 = self.pivots.lines.r1[-1]
        p = self.pivots.lines.p[-1]
        s1 = self.pivots.lines.s1[-1]
        s2 = self.pivots.lines.s2[-1]

        o = self.data.open[0]
        h = self.data.high[0]
        l = self.data.low[0]
        c = self.data.close[0]

        pivots = [
            ['R2', r2],
            ['R1', r1],
            ['P', p],
            ['S1', s1],
            ['S2', s2]]

        for piv in pivots:
            #if piv[0] == 'r2':
                #print('h: {} L {}, piv {}'.format(h,l, piv[1]))
            if h > piv[1] and l < piv[1]:
                #print('h: {} L {}, piv {}'.format(h,l, piv[1]))
                # Pivot ttoccato
                self.counts['Hit'][piv[0]] +=1

        db_pivots = [
            ['S1 & R1', s1, r1],
            ['S1 & R2', s1, r2],
            ['S2 & R1', s2, r1],
            ['S2 & R2', s2, r2]
        ]
        # DB
        for piv in db_pivots:
            db_conditions = [
                h > piv[1],
                h > piv[2],
                l < piv[1],
                l < piv[2],
            ]
            if all(db_conditions):
                self.counts['DB'][piv[0]] +=1

        # X senza toccare Y
        xy_pivots = [
        ['x=P y=R2', p, r2, 'r'],
        ['x=P y=R1', p, r1, 'r'],
        ['x=P y=S2', p, s2, 's'],
        ['x=P y=S1', p, s1,'s'],
        ['x=R1 y=S2', r1, s2, 's'],
        ['x=R1 y=S1', r1, s1, 's'],
        ['x=R1 y=P', r1, p, 'p'],
        ['x=R2 y=S2', r2, s2, 's'],
        ['x=R2 y=S1', r2, s1, 's'],
        ['x=R2 y=P', r2, p, 'p'],
        ['x=S1 y=R2', s1, r2, 'r'],
        ['x=S1 y=R1', s1, r1, 'r'],
        ['x=S1 y=P', s1, p, 'p'],
        ['x=S2 y=R2', s2, r2, 'r'],
        ['x=S2 y=R1', s2, r1, 'r'],
        ['x=S2 y=P', s2, p, 'p']
        ]
        for piv in xy_pivots:
            if piv[3] == 'r':
                db_conditions = [
                h > piv[1],
                l < piv[1],
                h < piv[2]
                ]
            elif piv[3] == 's':
                db_conditions = [
                h > piv[1],
                l < piv[1],
                l > piv[2]
                ]
            elif piv[3] == 'p':
                db_conditions = [
                h > piv[1],
                l < piv[1],
                (h > piv[2] and l > piv[2]) or (h < piv[2] and l < piv[2])
                ]

            if all(db_conditions):
                self.counts['XY'][piv[0]] +=1


    def stop(self):
        self.counts.total = len(self.data)
        # ITERARE SU COUNTS COSI' LA DESCRIZIONE NON CAUSA ERRORI
        for key, value in self.counts['Hit'].items():
            try:
                perc = round((value / self.counts.total) * 100,2)
                self.rets['Hit'][key] = str(perc) + '%'
            except ZeroDivisionError:
                self.rets['Hit'][key] = '0%'

        # ITERARE SU COUNTS COSI' LA DESCRIZIONE NON CAUSA ERRORI
        for key, value in self.counts['DB'].items():
            try:
                perc = round((value / self.counts.total) * 100,2)
                self.rets['DB'][key] = str(perc) + '%'
            except ZeroDivisionError:
                self.rets['DB'][key] = '0%'

        # ITERARE SU COUNTS COSI' LA DESCRIZIONE NON CAUSA ERRORI
        for key, value in self.counts['XY'].items():
            try:

                perc = round((value / self.counts.total) * 100,2)
                self.rets['XY'][key] = str(perc) + '%'
            except ZeroDivisionError:
                self.rets['XY'][key] = '0%'

        self.rets._close() 
				
			

Commento del codice

Cominciamo dall’inizio. Per creare un analizzatore dobbiamo ereditare la  classe Analyzer di Backtraders, di conseguenza il metodo __init_() è abbastanza semplice. Importiamo semplicemente  l”indicatore PivotPoint di Backtrader e disattiviamo il plottaggio. Successivamente passiamo al  più interessante metodo create_analysis(). Qui impostiamo le metriche che devono essere monitorate dall’analizzatore. Backtrader usa un AutoOrderedDict() per memorizzare le metriche che desideriamo monitorare. Un dizionario ordinato consente al metodo ereditato print()  dall’analyzer di stampare le metriche in un ordine fisso e definito. 

Se hai mai provato a stampare valori da un normale dizionario, potresti aver notato che l’ordine in cui vengono stampate le chiavi può sembrare un po’ casuale. (Probabilmente non è casuale, ma di certo non conosco la logica). Per il metodo print(), di seguito descriviamo un esempio su come usarlo. Gli analyzer integrati in Backtrader usano una convenzione per denominare il dizionario usato per memorizzare le metriche da stampare. Si chiama self.rets. Nel codice di esempio abbiamo un dizionario che segue questa convenzione e un dizionario che non la segue. (countsrets) perché siamo principalmente interessati alla percentuale/frequenza con cui i pivot vengono colpiti  invece che al numero assoluto di volte. Per questo motivo non vogliamo che i conteggi siano inclusi nel dizionario rets e contemporaneamente stampati .

Next()

Gli analyzer hanno un  metodo next() proprio come gli indicatori e le strategie. In questo modo possiamo facilmente usare  le logiche già descritte per implementare gli indicatori ed applicarle a un analyzer. Il metodo next() di questo analyzer verifica se il  prezzo HIGH è al di sopra del pivot E e il LOW è al di sotto di esso. Questo significa che il pivot è stato rotto durante la barra.

Stop()

Il metodo stop() è chiamato alla fine del backtesting. Questo è disponibile anche nelle strategie e negli indicatori. Un esempio è stato descritto nell’articolo fermare il trading live con Backtrader. Nel codice precedente usiamo  il metodo stop() per costruire le percentuali finali dal dizionario dei conteggi. Se desideriamo accedere all’analyzer durante l’esecuzione live o durante un metodo next(), dobbiamo modificare il codice per calcolare le percentuali ad ogni barra.

Usare un analyzer

Nel codice dell’esempio non abbiamo descritto le istruzioni per impostare  cerebro o definire una strategia. Questo perché vogliamo mantenere modulare il codice dell’analyzer in una posizione separata per poterlo riutilizzarlo. Una panoramica su come creare moduli Python per i progetti  con Backtrader è  descritta nell’articolo creare codice modulare con Backtrader. L’analyzer è importato in una strategia quando richiesto. Supponiamo di aver  memorizzato l’analyzer in un modulo, quindi per poterlo usare è necessario creare uno script come segue:

				
					'''
Script di supporto per testare gli analizzatori sviluppati.
'''

import backtrader as bt
from datetime import datetime
from extensions.analyzers import pivotPointAnalyzer
from extensions.misc.datafeeds import OandaCSVMidData

# Crea un'istanza di cerebro
cerebro = bt.Cerebro()


data = OandaCSVMidData(dataname='data/fx/GBP_USD-2005-2017-D1.csv',
                            timeframe=bt.TimeFrame.Days,
                            compression=1)

cerebro.adddata(data)

cerebro.addanalyzer(pivotPointAnalyzer)

# Esecuzione del backtest e restituzione di un elenco di oggetti strategy
stratList = cerebro.run()

# Ottenere il primo oggetto dell'elenco
strat = stratList[0]

for x in strat.analyzers:
    x.print()

cerebro.plot(style='candlestick')
				
			

Il codice precedente non è eseguito immediatamente. È solo a scopo esemplificativo perché:

  • Il codice presuppone che l’analizzatore sia  memorizzato in un sottomodulo analyzers che fa parte del modulo extensions. Possiamo emularlo, creando la tua struttura del modulo o semplicemente copiando ed incollando l’analyzer nello script principale.
  • Il codice usa i dati salvati da Oanda. Dobbiamo sostituire questa parte dove sono memorizzati i dati e lo scrypt del modulo che li  configura.

Aggiungere poi l’analyzer e ottenere i risultati è piuttosto semplice. Una volta importato, lo aggiungiamo con una singola riga cerebro.addanalyzer(pivotPointAnalyzer). Per accedere ai risultati e stamparli dopo la fine dell’esecuzione dobbiamo usare le seguenti righe:

				
					
cerebro.addanalyzer(pivotPointAnalyzer)

# Esecuzione del backtest e restituzione di un elenco di oggetti strategy
stratList = cerebro.run()

# Ottenere il primo oggetto dell'elenco
strat = stratList[0]

for x in strat.analyzers:
    x.print()

				
			

E questo è tutto quello che dobbiamo fare. Una volta configurato ed eseguito, otteniamo un output simile a questo:

creare nuovi analyzer con backtrader

Codice completo

In questo articolo abbiamo descritto come creare nuovi analyzer con backtrader per misurare le prestazione delle strategie di trading algoritmico. Per il codice completo riportato in questo articolo, si può consultare il seguente repository di github:
https://github.com/datatrading-info/BackTrader

Scroll to Top