Pair trading con Python

Backtest ETF: Pair trading con Python

Sommario

In questo articolo descriviamo come implementare il backtest di una strategia di pair trading con Python per il mercato degli ETF. L’articolo fa parte della miniserie “Backtest di una strategia di mean reverting con gli ETF”.

Gli articoli che fanno parte di questa serie sono:

  1. Backtest ETF: Web scraping e Database Sqlite3
  2. Backtest ETF: Creazione di coppie di Ticker
  3. Backtest ETF: Mean-reverting con Python
  4. Backtest ETF: Strategia di Trading con lo Z-score
  5. Backtest ETF: Pair trading con Python 

Concludiamo questa serie di  articoli fornendo il codice completo ed eseguibile che include l’uso di query SQL per estrarre i simboli ticker dal database SQLite che abbiamo creato in un precedente articolo.

Backtest di coppie di ETF

In questo esempio abbiamo eseguito una serie di backtest sui simboli ticker del database, basati su una “nicchia” di “minatori d’oro”. In teoria, questa selezione di ticker basata su alcuni criteri di raggruppamento significativi e non arbitrari ci consente di concentrarci su coppie di simboli che hanno maggiori probabilità di avere una cointegrazione delle serie di prezzi statisticamente significativa 

Per testare altri gruppi di simboli, dobbiamo semplicemente aggiornare la query SQL secondo un qualsiasi criterio ci interessa. Ad esempio, possiamo modificare la query per selezionare i simboli in base a una “Asset Class” di “Fixed Income”. Le possibili combinazioni sono ovviamente abbondanti e potremmo persino incorporare più criteri contemporaneamente per affinare ulteriormente la nostra ricerca.

Il codice

Quindi, dopo aver seguito le istruzioni per creare il nostro database, possiamo copiare e incollare l’intero codice sottostante in uno script python in modo da eseguire tutti i backtest in una volta sola:

				
					from datetime import datetime
import pandas as pd
import numpy as np
from numpy import log, polyfit, sqrt, std, subtract
import statsmodels.tsa.stattools as ts
import statsmodels.api as sm
import matplotlib.pyplot as plt
import seaborn as sns
sns.set_style('whitegrid')
import pprint
import sqlite3 as db
import yfinance as yf


# imposta il percorso del file del database a cui desideriamo connetterci
# questo è univoco e dipende dove si trova il database SQLite nel sistema locale
database = "C:\Users\Datatrading\sqlite_databases\etfs.db"

# questa è l'istruzione SQL contenente le informazioni riguardo
# a quali ticker vogliamo estrarre dal database
# Ad esempio, ho scelto di estrarre tutti i ticker che hanno
# il loro "Focus" corrispondente a "Silver"
sql = 'SELECT Ticker FROM etftable WHERE Focus = "Silver";'

# crea una connessione al database
cnx = db.connect(database)
cur = cnx.cursor()

# eseque l'istruzione SQL e salva i risultati in una variabile chiamata "tickers"
tickers = pd.read_sql(sql, con=cnx)

# crea una lista vuota
symbList = []

# iterazione sul DataFrame e inserimento dei ticker nella lista
for i in range(len(tickers)):
    symbList.append(tickers.ix[i][0])


def get_symb_pairs(symbList):
    """
    symbList è una lista di simboli ETF
    Questa funzione ha una lista di simboli come paramentro
    e restituisce una lista di coppie univoche di simboli
    """

    symbPairs = []
    i = 0

    # scorre la lista e crea tutte le possibili combinazioni di coppie
    # di ticker e aggiunge le coppie alla lista "symbPairs"
    while i < len(symbList) - 1:
        j = i + 1
        while j < len(symbList):
            symbPairs.append([symbList[i], symbList[j]])
            j += 1
        i += 1

    # scorre la lista di coppie appena creato e rimuove
    # tutte le coppie composte da due ticker identici
    for i in symbPairs:
        if i[0] == i[1]:
            symbPairs.remove(i)

    # crea una nuova lista vuota per memorizzare solo coppie univoche
    symbPairs2 = []

    # scorre la lista originale e aggiunge alla nuova lista solo coppie univoche
    for i in symbPairs:
        if i not in symbPairs2:
            symbPairs2.append(i)
    return symbPairs2

symbPairs = get_symb_pairs(symbList)


def backtest(symbList):

    start_date = '2012-01-01'
    end_date = '2020-01-01'

    # Download i dati da Yahoo Finance
    y = yf.download(symbList[0], start=start_date, end=end_date)
    x = yf.download(symbList[1], start=start_date, end=end_date)

    # Rinomina le colonne
    y.rename(columns={'Adj Close': 'price'}, inplace=True)
    x.rename(columns={'Adj Close': 'price'}, inplace=True)

    # Verificare che i dataframe sono della stessa lunghezza
    min_date = max(df.dropna().index[0] for df in [y, x])
    max_date = min(df.dropna().index[-1] for df in [y, x])
    y = y[(y.index >= min_date) & (y.index <= max_date)]
    x = x[(x.index >= min_date) & (x.index <= max_date)]

    ############################################################

    plt.plot(y.price, label=symbList[0])
    plt.plot(x.price, label=symbList[1])
    plt.ylabel('Price')
    plt.xlabel('Time')
    plt.legend(bbox_to_anchor=(1.05, 1), loc=2, borderaxespad=0.)
    plt.show()

    #############################################################

    sns.jointplot(y=y.price, x=x.price, color='b')
    plt.show()

    #############################################################

    # esegue la regressione dei minimi quadrati ordinari per trovare
    # il rapporto di copertura e quindi creare serie di spread
    df1 = pd.DataFrame({'y': y['price'], 'x': x['price']})
    est = sm.OLS(df1.y, df1.x)
    est = est.fit()
    df1['hr'] = -est.params[0]
    df1['spread'] = df1.y + (df1.x * df1.hr)

    ##############################################################

    plt.plot(df1.spread)
    plt.show()

    ##############################################################

    cadf = ts.adfuller(df1.spread)
    print('Augmented Dickey Fuller test statistic =', cadf[0])
    print('Augmented Dickey Fuller p-value =', cadf[1])
    print('Augmented Dickey Fuller 1%, 5% and 10% test statistics =', cadf[4])

    ##############################################################

    def hurst(ts):
        """
        Restituisce l'Esponente Hurst Exponent del vettore della serie temporale ts
        """
        # Crea il range dei valori ritardati
        lags = range(2, 100)

        # Calcola l'array delle variance delle differenze dei ritardi
        tau = [sqrt(std(subtract(ts[lag:], ts[:-lag]))) for lag in lags]

        # Usa una regressione lineare per stimare l'Esponente di Hurst
        poly = polyfit(log(lags), log(tau), 1)

        # Restituisce l'Esponente di Hurst dall'output di polyfit
        return poly[0] * 2.0

    ##############################################################

    print("Hurst Exponent =", round(hurst(df1.spread.to_list()), 2))

    ##############################################################

    # Calcolo della regressione OLS per la serie degli spread e la sua versione ritardata
    spread_lag = df1.spread.shift(1)
    spread_lag.iloc[0] = spread_lag.iloc[1]
    spread_ret = df1.spread - spread_lag
    spread_ret.iloc[0] = spread_ret.iloc[1]
    spread_lag2 = sm.add_constant(spread_lag)
    model = sm.OLS(spread_ret, spread_lag2)
    res = model.fit()

    halflife = int(round(-np.log(2) / res.params[1], 0))

    if halflife <= 0:
        halflife = 1

    print('Halflife = ', halflife)

    ##############################################################

    meanSpread = df1.spread.rolling(window=halflife).mean()
    stdSpread = df1.spread.rolling(window=halflife).std()

    df1['zScore'] = (df1.spread - meanSpread) / stdSpread

    df1['zScore'].plot()
    plt.show()

    ##############################################################

    entryZscore = 2
    exitZscore = 0

    # calcolo num units long
    df1['long entry'] = ((df1.zScore < - entryZscore) & (df1.zScore.shift(1) > - entryZscore))
    df1['long exit'] = ((df1.zScore > - exitZscore) & (df1.zScore.shift(1) < - exitZscore))
    df1['num units long'] = np.nan
    df1.loc[df1['long entry'], 'num units long'] = 1
    df1.loc[df1['long exit'], 'num units long'] = 0
    df1['num units long'][0] = 0
    df1['num units long'] = df1['num units long'].fillna(method='pad')

    # calcolo num units short
    df1['short entry'] = ((df1.zScore >  entryZscore) & ( df1.zScore.shift(1) < entryZscore))
    df1['short exit'] = ((df1.zScore < exitZscore) & (df1.zScore.shift(1) > exitZscore))
    df1.loc[df1['short entry'], 'num units short'] = -1
    df1.loc[df1['short exit'], 'num units short'] = 0
    df1['num units short'][0] = 0
    df1['num units short'] = df1['num units short'].fillna(method='pad')

    df1['numUnits'] = df1['num units long'] + df1['num units short']
    df1['spread pct ch'] = (df1['spread'] - df1['spread'].shift(1)) / ((df1['x'] * abs(df1['hr'])) + df1['y'])
    df1['port rets'] = df1['spread pct ch'] * df1['numUnits'].shift(1)

    df1['cum rets'] = df1['port rets'].cumsum()
    df1['cum rets'] = df1['cum rets'] + 1

    ##############################################################

    try:
        sharpe = ((df1['port rets'].mean() / df1['port rets'].std()) * sqrt(252))
    except ZeroDivisionError:
        sharpe = 0.0

    plt.plot(df1['cum rets'])
    plt.xlabel(symbList[1])
    plt.ylabel(symbList[0])
    plt.show()

    ##############################################################

    start_val = 1
    end_val = df1['cum rets'].iat[-1]

    start_date = df1.iloc[0].name
    end_date = df1.iloc[-1].name
    days = (end_date - start_date).days

    CAGR = round(((float(end_val) / float(start_val)) ** (252.0 / days)) - 1, 4)

    print("CAGR = {}%".format(CAGR * 100))

for i in symbPairs:
    backtest(i)
				
			

Speriamo che questo codice possa essere semplice da copiare e incollare per essere eseguito su una macchina locale da chiunque legga questo articolo.

I risultati

Otteniamo risultati del backtest tutt’altro che sorprendenti e moltissime coppie hanno una equity negativa. Tuttavia ci sono un paio di coppie che generano una performance abbastanza regolare con un trend ascendente (e questo è solo per l’attributo “Niche” = “Gold Miners”), che restituisce un CAGR di circa il 4-5%  e uno Sharpe Ratio di 1,9 – 2.5. Ad esempio,  abbiamo la seguente ecquity per la coppia [GDX, PSAU].
BackTest-ETF-pair-trading-equity-portfolio

Naturalmente, non abbiamo tenuto conto dei costi di transazione, quindi anche piccoli guadagni potrebbero finire in territorio negativo, a seconda di quante operazioni devono essere effettuate.

Con questa serie abbiamo descritto praticamente tutto quello necessario per implementare uno script eseguibile completamente funzionante in modo da eseguire i test iniziali sulle coppie ETF per identificare eventuali possibili candidati per una strategia di ritorno alla media.

Codice completo

In questo articolo abbiamo descritto implementare il backtest di una strategia di pair trading con Python per il mercato degli ETF. Per il codice completo riportato in questo articolo, si può consultare il seguente repository di github: https://github.com/datatrading-info/Backtest_Strategie

Benvenuto su DataTrading!

Sono Gianluca, ingegnere software e data scientist. Sono appassionato di coding, finanza e trading. Leggi la mia storia.

Ho creato DataTrading per aiutare le altre persone ad utilizzare nuovi approcci e nuovi strumenti, ed applicarli correttamente al mondo del trading.

DataTrading vuole essere un punto di ritrovo per scambiare esperienze, opinioni ed idee.

SCRIVIMI SU TELEGRAM

Per informazioni, suggerimenti, collaborazioni...

Scroll to Top