Files
docker-configs/backtest/strategy/emotional-damage/backtest_emotional_damage.py
2025-07-18 00:00:01 -05:00

348 lines
14 KiB
Python
Executable File

import sqlite3
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import matplotlib.pyplot as plt
import seaborn as sns
from matplotlib.backends.backend_pdf import PdfPages
import warnings
warnings.filterwarnings('ignore')
class EmotionalDamageStrategy:
def __init__(self, initial_capital=100000):
self.initial_capital = initial_capital
self.cash = initial_capital
self.positions = {} # ticker: shares
self.portfolio_value = []
self.trades = []
self.state = 'QQQ_HOLD' # QQQ_HOLD, CASH_WAIT, VOLATILE_STOCKS
self.fear_threshold = 25
self.greed_threshold = 75
def get_data(self):
"""Load Fear & Greed Index and QQQ/SPY data"""
conn = sqlite3.connect('data/stock_data.db')
# Get Fear & Greed Index
fg_data = pd.read_sql_query('''
SELECT date, fear_greed_index
FROM fear_greed_index
ORDER BY date
''', conn)
fg_data['date'] = pd.to_datetime(fg_data['date'])
fg_data.set_index('date', inplace=True)
# Get QQQ price data (using SPY as proxy since QQQ data might be limited)
spy_data = pd.read_sql_query('''
SELECT date, spy_close
FROM fear_greed_data
ORDER BY date
''', conn)
spy_data['date'] = pd.to_datetime(spy_data['date'])
spy_data.set_index('date', inplace=True)
# Get available tickers for high volatility selection
cursor = conn.cursor()
cursor.execute('SELECT ticker FROM ticker_list WHERE records > 1000')
self.available_tickers = [row[0] for row in cursor.fetchall()]
conn.close()
# Merge data
self.data = pd.merge(fg_data, spy_data, left_index=True, right_index=True, how='inner')
self.data.sort_index(inplace=True)
print(f"Loaded data from {self.data.index.min().strftime('%Y-%m-%d')} to {self.data.index.max().strftime('%Y-%m-%d')}")
print(f"Available tickers for high volatility selection: {len(self.available_tickers)}")
def get_stock_price(self, ticker, date):
"""Get stock price for a specific ticker and date"""
conn = sqlite3.connect('data/stock_data.db')
query = f'''
SELECT close FROM {ticker.lower()}
WHERE date <= ?
ORDER BY date DESC
LIMIT 1
'''
cursor = conn.cursor()
cursor.execute(query, (date.strftime('%Y-%m-%d'),))
result = cursor.fetchone()
conn.close()
return result[0] if result else None
def calculate_volatility(self, tickers, start_date, end_date):
"""Calculate historical volatility for tickers during fear period"""
volatilities = {}
conn = sqlite3.connect('data/stock_data.db')
for ticker in tickers:
try:
query = f'''
SELECT date, close FROM {ticker.lower()}
WHERE date >= ? AND date <= ?
ORDER BY date
'''
df = pd.read_sql_query(query, conn, params=(
start_date.strftime('%Y-%m-%d'),
end_date.strftime('%Y-%m-%d')
))
if len(df) > 10: # Ensure sufficient data
df['returns'] = df['close'].pct_change()
volatility = df['returns'].std() * np.sqrt(252) # Annualized volatility
volatilities[ticker] = volatility
except Exception as e:
continue
conn.close()
return volatilities
def select_top_volatile_stocks(self, fear_start_date, fear_end_date, top_n=10):
"""Select top N most volatile stocks during fear period"""
volatilities = self.calculate_volatility(self.available_tickers, fear_start_date, fear_end_date)
# Sort by volatility and select top N
sorted_vol = sorted(volatilities.items(), key=lambda x: x[1], reverse=True)
top_stocks = [ticker for ticker, vol in sorted_vol[:top_n]]
print(f"Top {top_n} volatile stocks during fear period {fear_start_date.strftime('%Y-%m-%d')} to {fear_end_date.strftime('%Y-%m-%d')}: {top_stocks}")
return top_stocks
def execute_trade(self, date, action, ticker=None, amount=None):
"""Execute a trade and record it"""
if action == 'BUY_QQQ':
# Buy QQQ with all cash
price = self.data.loc[date, 'spy_close'] # Using SPY as QQQ proxy
shares = self.cash / price
self.positions['QQQ'] = shares
self.cash = 0
self.trades.append({
'date': date,
'action': 'BUY_QQQ',
'ticker': 'QQQ',
'shares': shares,
'price': price,
'value': shares * price
})
elif action == 'SELL_QQQ':
# Sell all QQQ
if 'QQQ' in self.positions:
shares = self.positions['QQQ']
price = self.data.loc[date, 'spy_close']
self.cash = shares * price
del self.positions['QQQ']
self.trades.append({
'date': date,
'action': 'SELL_QQQ',
'ticker': 'QQQ',
'shares': shares,
'price': price,
'value': shares * price
})
elif action == 'BUY_VOLATILE':
# Buy volatile stocks with equal weight
if ticker and amount:
price = self.get_stock_price(ticker, date)
if price:
shares = amount / price
self.positions[ticker] = shares
self.trades.append({
'date': date,
'action': 'BUY_VOLATILE',
'ticker': ticker,
'shares': shares,
'price': price,
'value': amount
})
elif action == 'SELL_ALL_VOLATILE':
# Sell all volatile stocks
total_value = 0
for ticker in list(self.positions.keys()):
if ticker != 'QQQ':
shares = self.positions[ticker]
price = self.get_stock_price(ticker, date)
if price:
value = shares * price
total_value += value
self.trades.append({
'date': date,
'action': 'SELL_VOLATILE',
'ticker': ticker,
'shares': shares,
'price': price,
'value': value
})
del self.positions[ticker]
self.cash = total_value
def calculate_portfolio_value(self, date):
"""Calculate total portfolio value at given date"""
total_value = self.cash
for ticker, shares in self.positions.items():
if ticker == 'QQQ':
price = self.data.loc[date, 'spy_close']
else:
price = self.get_stock_price(ticker, date)
if price:
total_value += shares * price
return total_value
def run_backtest(self):
"""Run the emotional damage strategy backtest"""
print("Running Emotional Damage Strategy Backtest...")
self.get_data()
# Start with QQQ
first_date = self.data.index[0]
self.execute_trade(first_date, 'BUY_QQQ')
self.state = 'QQQ_HOLD'
fear_start_date = None
for i, (date, row) in enumerate(self.data.iterrows()):
fg_index = row['fear_greed_index']
if self.state == 'QQQ_HOLD':
# Check if Fear & Greed drops below 25
if fg_index < self.fear_threshold:
self.execute_trade(date, 'SELL_QQQ')
self.state = 'CASH_WAIT'
fear_start_date = date
print(f"{date.strftime('%Y-%m-%d')}: Fear & Greed {fg_index:.1f} < 25, selling QQQ, holding cash")
elif self.state == 'CASH_WAIT':
# Check if Fear & Greed recovers above 25
if fg_index >= self.fear_threshold and fear_start_date:
# Select top volatile stocks during fear period
fear_end_date = date
top_volatile = self.select_top_volatile_stocks(fear_start_date, fear_end_date)
# Buy top volatile stocks with equal weight
if top_volatile:
amount_per_stock = self.cash / len(top_volatile)
for ticker in top_volatile:
self.execute_trade(date, 'BUY_VOLATILE', ticker, amount_per_stock)
self.cash = 0 # All cash invested
self.state = 'VOLATILE_STOCKS'
print(f"{date.strftime('%Y-%m-%d')}: Fear & Greed recovered to {fg_index:.1f}, buying volatile stocks: {top_volatile}")
elif self.state == 'VOLATILE_STOCKS':
# Check if Fear & Greed exceeds 75 (extreme greed)
if fg_index > self.greed_threshold:
self.execute_trade(date, 'SELL_ALL_VOLATILE')
self.execute_trade(date, 'BUY_QQQ')
self.state = 'QQQ_HOLD'
print(f"{date.strftime('%Y-%m-%d')}: Fear & Greed {fg_index:.1f} > 75, selling volatile stocks, buying QQQ")
# Record portfolio value
portfolio_value = self.calculate_portfolio_value(date)
self.portfolio_value.append({
'date': date,
'value': portfolio_value,
'state': self.state,
'fg_index': fg_index
})
print(f"Backtest completed! Total trades: {len(self.trades)}")
def calculate_performance_metrics(self, returns):
"""Calculate performance metrics"""
total_return = (returns.iloc[-1] / returns.iloc[0] - 1) * 100
annual_return = ((returns.iloc[-1] / returns.iloc[0]) ** (252 / len(returns)) - 1) * 100
# Calculate max drawdown
peak = returns.expanding().max()
drawdown = (returns - peak) / peak
max_drawdown = drawdown.min() * 100
# Find max drawdown period
max_dd_date = drawdown.idxmin()
# Calculate Sharpe ratio
daily_returns = returns.pct_change().dropna()
sharpe_ratio = np.sqrt(252) * daily_returns.mean() / daily_returns.std()
# Annual returns by year
annual_rets = {}
for year in returns.index.year.unique():
year_data = returns[returns.index.year == year]
if len(year_data) > 1:
year_return = (year_data.iloc[-1] / year_data.iloc[0] - 1) * 100
annual_rets[year] = year_return
return {
'total_return': total_return,
'annual_return': annual_return,
'max_drawdown': max_drawdown,
'max_drawdown_date': max_dd_date,
'sharpe_ratio': sharpe_ratio,
'annual_returns': annual_rets
}
def run_emotional_damage_backtest():
"""Run the emotional damage strategy and generate results"""
# Run strategy
strategy = EmotionalDamageStrategy(initial_capital=100000)
strategy.run_backtest()
# Convert results to DataFrame
portfolio_df = pd.DataFrame(strategy.portfolio_value)
portfolio_df.set_index('date', inplace=True)
# Get benchmark data (QQQ and SPY)
conn = sqlite3.connect('data/stock_data.db')
benchmark_data = pd.read_sql_query('''
SELECT date, spy_close
FROM fear_greed_data
ORDER BY date
''', conn)
benchmark_data['date'] = pd.to_datetime(benchmark_data['date'])
benchmark_data.set_index('date', inplace=True)
conn.close()
# Align dates
common_dates = portfolio_df.index.intersection(benchmark_data.index)
portfolio_df = portfolio_df.loc[common_dates]
benchmark_data = benchmark_data.loc[common_dates]
# Normalize to starting value for comparison
start_value = 100000
portfolio_df['normalized'] = portfolio_df['value']
# Create QQQ and SPY buy-and-hold benchmarks
benchmark_data['qqq_value'] = start_value * (benchmark_data['spy_close'] / benchmark_data['spy_close'].iloc[0])
benchmark_data['spy_value'] = start_value * (benchmark_data['spy_close'] / benchmark_data['spy_close'].iloc[0])
# Calculate performance metrics
strategy_metrics = strategy.calculate_performance_metrics(portfolio_df['value'])
qqq_metrics = strategy.calculate_performance_metrics(benchmark_data['qqq_value'])
spy_metrics = strategy.calculate_performance_metrics(benchmark_data['spy_value'])
return {
'strategy': strategy,
'portfolio_df': portfolio_df,
'benchmark_data': benchmark_data,
'strategy_metrics': strategy_metrics,
'qqq_metrics': qqq_metrics,
'spy_metrics': spy_metrics
}
if __name__ == "__main__":
results = run_emotional_damage_backtest()
print("Backtest completed! Results ready for PDF generation.")