本项目旨在帮助学习者掌握量化交易中常用的统计模型应用及简单交易策略开发流程。通过设计、实现和优化基于统计模型的交易策略,学习者将深入理解量化交易的核心概念、技术实现及绩效评估方法。
统计套利与统计模型是量化交易中的经典方法,它们基于市场价格的统计特性,捕捉短期市场异常并从中获利。这类策略通常具有较高的胜率和稳定性,是量化投资组合中的重要组成部分。
# backtester.py
import pandas as pd
import numpy as np
from datetime import datetime
import matplotlib.pyplot as plt
class Backtester:
def __init__(self, data, initial_capital=100000.0, commission=0.0003):
"""
初始化回测引擎
参数:
data: DataFrame, 包含日期索引和OHLCV数据
initial_capital: float, 初始资金
commission: float, 交易佣金比例
"""
self.data = data.copy()
self.initial_capital = initial_capital
self.commission = commission
self.positions = pd.Series(0, index=data.index)
self.capital = pd.Series(initial_capital, index=data.index)
self.trades = []
def run_strategy(self, signals):
"""
运行回测策略
参数:
signals: Series, 交易信号 (1: 买入, -1: 卖出, 0: 持仓不变)
返回:
DataFrame, 回测结果
"""
# 确保信号与数据长度一致
if len(signals) != len(self.data):
raise ValueError("信号长度与数据长度不匹配")
# 初始化结果容器
positions = pd.Series(0, index=self.data.index)
holdings = pd.Series(0.0, index=self.data.index)
cash = pd.Series(self.initial_capital, index=self.data.index)
equity = pd.Series(0.0, index=self.data.index)
# 遍历每个交易日
for i in range(len(self.data)):
# 当日价格
close_price = self.data['close'].iloc[i]
# 如果不是第一个交易日,继承前一日持仓
if i > 0:
positions.iloc[i] = positions.iloc[i-1]
cash.iloc[i] = cash.iloc[i-1]
# 交易信号处理
if signals.iloc[i] == 1 and positions.iloc[i] <= 0: # 买入信号
# 计算可买入数量
affordable_shares = int(cash.iloc[i] / (close_price * (1 + self.commission)))
# 更新持仓
new_position = affordable_shares
# 记录交易
if affordable_shares > 0:
self.trades.append({
'date': self.data.index[i],
'action': 'BUY',
'price': close_price,
'quantity': new_position - positions.iloc[i],
'commission': close_price * (new_position - positions.iloc[i]) * self.commission
})
# 更新现金和持仓
trade_cost = close_price * (new_position - positions.iloc[i])
commission_cost = trade_cost * self.commission
cash.iloc[i] -= (trade_cost + commission_cost)
positions.iloc[i] = new_position
elif signals.iloc[i] == -1 and positions.iloc[i] >= 0: # 卖出信号
if positions.iloc[i] > 0:
# 记录交易
self.trades.append({
'date': self.data.index[i],
'action': 'SELL',
'price': close_price,
'quantity': positions.iloc[i],
'commission': close_price * positions.iloc[i] * self.commission
})
# 更新现金和持仓
trade_value = close_price * positions.iloc[i]
commission_cost = trade_value * self.commission
cash.iloc[i] += (trade_value - commission_cost)
positions.iloc[i] = 0
# 计算当日持仓市值和总资产
holdings.iloc[i] = positions.iloc[i] * close_price
equity.iloc[i] = cash.iloc[i] + holdings.iloc[i]
# 整合结果
results = pd.DataFrame({
'positions': positions,
'holdings': holdings,
'cash': cash,
'equity': equity,
'returns': equity.pct_change()
})
self.results = results
return results
def plot_results(self):
"""绘制回测结果图表"""
if not hasattr(self, 'results'):
raise ValueError("请先运行策略回测")
plt.figure(figsize=(12, 8))
# 绘制资产曲线
plt.subplot(2, 1, 1)
plt.plot(self.results.index, self.results['equity'], label='Portfolio Value')
plt.plot(self.results.index, self.results['cash'], label='Cash', alpha=0.5)
plt.title('Backtest Results')
plt.xlabel('Date')
plt.ylabel('Value')
plt.legend()
plt.grid(True)
# 绘制持仓和价格
plt.subplot(2, 1, 2)
plt.plot(self.data.index, self.data['close'], label='Close Price')
plt.fill_between(self.results.index, 0, self.results['positions'],
alpha=0.3, label='Position')
plt.xlabel('Date')
plt.ylabel('Price / Position')
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.show()
def get_performance_stats(self):
"""计算策略绩效统计指标"""
if not hasattr(self, 'results'):
raise ValueError("请先运行策略回测")
daily_returns = self.results['returns'].dropna()
# 计算年化收益率
annual_return = (daily_returns.mean() * 252) * 100
# 计算波动率
volatility = (daily_returns.std() * np.sqrt(252)) * 100
# 计算夏普比率
risk_free_rate = 0.02 # 假设的无风险利率
sharpe_ratio = (annual_return / 100 - risk_free_rate) / (volatility / 100)
# 计算最大回撤
equity = self.results['equity']
rolling_max = equity.cummax()
drawdown = (equity - rolling_max) / rolling_max
max_drawdown = drawdown.min() * 100
# 计算交易统计
num_trades = len(self.trades)
if num_trades > 0:
profit_trades = sum(1 for t in self.trades if t['action'] == 'SELL' and
t['price'] * t['quantity'] - t.get('buy_price', 0) * t['quantity'] > 0)
win_rate = profit_trades / num_trades if num_trades > 0 else 0
else:
win_rate = 0
# 组织结果
stats = {
'Initial Capital': self.initial_capital,
'Final Equity': self.results['equity'].iloc[-1],
'Total Return (%)': ((self.results['equity'].iloc[-1] / self.initial_capital) - 1) * 100,
'Annual Return (%)': annual_return,
'Volatility (%)': volatility,
'Sharpe Ratio': sharpe_ratio,
'Max Drawdown (%)': max_drawdown,
'Number of Trades': num_trades,
'Win Rate (%)': win_rate * 100 if num_trades > 0 else 0
}
return stats
# market_simulator.py
import pandas as pd
import numpy as np
import yfinance as yf
from datetime import datetime, timedelta
class MarketSimulator:
def __init__(self):
"""初始化市场模拟器"""
pass
def fetch_data(self, ticker, start_date, end_date, interval='1d'):
"""
从Yahoo Finance获取历史数据
参数:
ticker: str, 股票代码
start_date: str, 开始日期,格式 'YYYY-MM-DD'
end_date: str, 结束日期,格式 'YYYY-MM-DD'
interval: str, 时间间隔,如 '1d', '1h', '1m'
返回:
DataFrame, 历史价格数据
"""
data = yf.download(ticker, start=start_date, end=end_date, interval=interval)
return data
def generate_synthetic_data(self, days=252, volatility=0.01, trend=0.0001,
start_price=100, seed=None):
"""
生成合成的价格数据
参数:
days: int, 数据天数
volatility: float, 日波动率
trend: float, 日趋势因子
start_price: float, 起始价格
seed: int, 随机数种子
返回:
DataFrame, 合成的价格数据
"""
if seed is not None:
np.random.seed(seed)
# 生成日期序列
end_date = datetime.now()
start_date = end_date - timedelta(days=days)
date_range = pd.date_range(start=start_date, end=end_date, freq='B')
# 生成随机价格
returns = np.random.normal(trend, volatility, len(date_range))
price_series = start_price * (1 + returns).cumprod()
# 创建OHLCV数据
high = price_series * (1 + np.random.uniform(0, 0.005, len(date_range)))
low = price_series * (1 - np.random.uniform(0, 0.005, len(date_range)))
open_price = price_series * (1 + np.random.normal(0, 0.003, len(date_range)))
volume = np.random.lognormal(10, 1, len(date_range)).astype(int)
# 创建DataFrame
data = pd.DataFrame({
'open': open_price,
'high': high,
'low': low,
'close': price_series,
'volume': volume
}, index=date_range)
return data
def add_noise(self, data, noise_level=0.005):
"""
向价格数据添加噪声
参数:
data: DataFrame, 价格数据
noise_level: float, 噪声级别
返回:
DataFrame, 添加噪声后的数据
"""
noisy_data = data.copy()
# 为OHLC添加噪声
for col in ['open', 'high', 'low', 'close']:
if col in noisy_data.columns:
noise = np.random.normal(0, noise_level, len(noisy_data))
noisy_data[col] = noisy_data[col] * (1 + noise)
return noisy_data
def create_mean_reverting_series(self, days=252, mean=100, reversion_strength=0.05,
volatility=1.0, seed=None):
"""
创建均值回归价格序列
参数:
days: int, 数据天数
mean: float, 均值
reversion_strength: float, 回归强度
volatility: float, 波动率
seed: int, 随机数种子
返回:
DataFrame, 均值回归价格数据
"""
if seed is not None:
np.random.seed(seed)
# 生成日期序列
end_date = datetime.now()
start_date = end_date - timedelta(days=days)
date_range = pd.date_range(start=start_date, end=end_date, freq='B')
# 初始化价格
prices = np.zeros(len(date_range))
prices[0] = mean
# 生成均值回归序列
for i in range(1, len(date_range)):
# 均值回归项 + 随机波动
prices[i] = prices[i-1] + reversion_strength * (mean - prices[i-1]) + np.random.normal(0, volatility)
# 创建OHLCV数据
high = prices + np.random.uniform(0, 0.5, len(date_range))
low = prices - np.random.uniform(0, 0.5, len(date_range))
open_price = prices + np.random.normal(0, 0.3, len(date_range))
volume = np.random.lognormal(10, 1, len(date_range)).astype(int)
# 创建DataFrame
data = pd.DataFrame({
'open': open_price,
'high': high,
'low': low,
'close': prices,
'volume': volume
}, index=date_range)
return data
# performance_metrics.py
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
class ReturnMetrics:
"""计算收益相关指标的类"""
@staticmethod
def calculate_returns(equity_curve):
"""计算每日收益率"""
return equity_curve.pct_change().dropna()
@staticmethod
def calculate_cumulative_returns(returns):
"""计算累积收益率"""
return (1 + returns).cumprod() - 1
@staticmethod
def calculate_annual_return(returns):
"""计算年化收益率"""
trading_days = 252
total_days = len(returns)
total_return = (1 + returns).prod() - 1
return (1 + total_return) ** (trading_days / total_days) - 1
class RiskMetrics:
"""计算风险相关指标的类"""
@staticmethod
def calculate_volatility(returns):
"""计算波动率"""
return returns.std() * np.sqrt(252)
@staticmethod
def calculate_sharpe_ratio(returns, risk_free_rate=0.0):
"""计算夏普比率"""
excess_returns = returns - risk_free_rate / 252
return excess_returns.mean() / excess_returns.std() * np.sqrt(252)
@staticmethod
def calculate_sortino_ratio(returns, risk_free_rate=0.0):
"""计算索提诺比率"""
excess_returns = returns - risk_free_rate / 252
downside_returns = excess_returns[excess_returns < 0]
downside_deviation = downside_returns.std() * np.sqrt(252)
return (excess_returns.mean() * 252) / downside_deviation if downside_deviation != 0 else np.nan
@staticmethod
def calculate_max_drawdown(equity_curve):
"""计算最大回撤"""
rolling_max = equity_curve.cummax()
drawdown = (equity_curve - rolling_max) / rolling_max
max_drawdown = drawdown.min()
max_drawdown_idx = drawdown.idxmin()
peak_idx = equity_curve[:max_drawdown_idx].idxmax()
if max_drawdown_idx == equity_curve.index[-1]:
recovery_idx = None
recovery_time = None
else:
recovery_mask = equity_curve[max_drawdown_idx:] >= equity_curve[peak_idx]
recovery_idx = recovery_mask[recovery_mask].first_valid_index() if recovery_mask.any() else None
recovery_time = (recovery_idx - max_drawdown_idx).days if recovery_idx else None
drawdown_duration = (max_drawdown_idx - peak_idx).days
drawdown_info = {
'max_drawdown': max_drawdown,
'peak_date': peak_idx,
'trough_date': max_drawdown_idx,
'recovery_date': recovery_idx,
'drawdown_duration_days': drawdown_duration,
'recovery_duration_days': recovery_time
}
return max_drawdown, drawdown_info
@staticmethod
def calculate_calmar_ratio(annual_return, max_drawdown):
"""计算卡玛比率"""
return -annual_return / max_drawdown if max_drawdown != 0 else np.inf
@staticmethod
def calculate_omega_ratio(returns, threshold=0.0):
"""计算欧米茄比率"""
threshold_daily = threshold / 252
gain = returns[returns > threshold_daily].sum()
loss = -returns[returns < threshold_daily].sum()
return gain / loss if loss != 0 else np.inf
class TradeMetrics:
"""计算交易相关指标的类"""
@staticmethod
def calculate_win_rate(trades):
"""计算胜率"""
if not trades:
return 0.0
winning_trades = sum(1 for t in trades if t['profit'] > 0)
return winning_trades / len(trades)
@staticmethod
def calculate_profit_factor(trades):
"""计算盈利因子"""
total_profit = sum(t['profit'] for t in trades if t['profit'] > 0)
total_loss = -sum(t['profit'] for t in trades if t['profit'] < 0)
return total_profit / total_loss if total_loss != 0 else np.inf
@staticmethod
def calculate_average_profit(trades):
"""计算平均盈利"""
return np.mean([t['profit'] for t in trades]) if trades else 0
@staticmethod
def calculate_average_loss(trades):
"""计算平均亏损"""
losses = [t['profit'] for t in trades if t['profit'] < 0]
return np.mean(losses) if losses else 0
@staticmethod
def calculate_average_holding_period(trades):
"""计算平均持仓周期"""
if not trades or not any('holding_period' in t for t in trades):
return None
return np.mean([t['holding_period'] for t in trades if 'holding_period' in t])
class BenchmarkMetrics:
"""计算基准比较相关指标的类"""
@staticmethod
def calculate_beta(returns, benchmark_returns):
"""计算贝塔值"""
return returns.cov(benchmark_returns) / benchmark_returns.var()
@staticmethod
def calculate_alpha(annual_return, beta, benchmark_annual_return, risk_free_rate=0.0):
"""计算阿尔法值"""
return annual_return - risk_free_rate - beta * (benchmark_annual_return - risk_free_rate)
@staticmethod
def calculate_information_ratio(annual_return, benchmark_annual_return, returns, benchmark_returns):
"""计算信息比率"""
tracking_error = (returns - benchmark_returns).std() * np.sqrt(252)
return (annual_return - benchmark_annual_return) / tracking_error if tracking_error != 0 else np.nan
class VisualizationTools:
"""绘制各种分析图表的类"""
@staticmethod
def plot_equity_curve(equity_curve, benchmark=None):
"""绘制权益曲线"""
plt.figure(figsize=(12, 6))
plt.plot(equity_curve, label='Strategy', linewidth=2)
if benchmark is not None:
plt.plot(benchmark, label='Benchmark', linewidth=2, alpha=0.7)
plt.title('Equity Curve')
plt.xlabel('Date')
plt.ylabel('Value')
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.show()
@staticmethod
def plot_drawdown_underwater(equity_curve):
"""绘制回撤水下图"""
rolling_max = equity_curve.cummax()
drawdown = (equity_curve - rolling_max) / rolling_max
plt.figure(figsize=(12, 6))
plt.plot(drawdown, color='red', linewidth=2)
plt.fill_between(drawdown.index, drawdown, 0, color='red', alpha=0.3)
plt.title('Drawdown Underwater Chart')
plt.xlabel('Date')
plt.ylabel('Drawdown')
plt.grid(True)
plt.tight_layout()
plt.show()
@staticmethod
def plot_monthly_returns_heatmap(returns):
"""绘制月度收益热图"""
monthly_returns = returns.resample('M').apply(lambda x: (1 + x).prod() - 1)
monthly_returns_matrix = monthly_returns.groupby([
monthly_returns.index.year.rename('Year'),
monthly_returns.index.month.rename('Month')
]).first().unstack('Month')
plt.figure(figsize=(12, 6))
heatmap = plt.pcolor(monthly_returns_matrix, cmap='RdYlGn', edgecolors='white', linewidths=1)
plt.colorbar(heatmap)
plt.yticks(np.arange(0.5, len(monthly_returns_matrix.index)), monthly_returns_matrix.index)
plt.xticks(np.arange(0.5, 13), ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec'])
for i in range(len(monthly_returns_matrix.index)):
for j in range(len(monthly_returns_matrix.columns)):
value = monthly_returns_matrix.iloc[i, j]
if not np.isnan(value):
text_color = 'white' if abs(value) > 0.05 else 'black'
plt.text(j + 0.5, i + 0.5, f'{value:.1%}',
ha='center', va='center', color=text_color)
plt.title('Monthly Returns (%)')
plt.tight_layout()
plt.show()
class PerformanceReport:
"""生成综合绩效报告的类"""
@staticmethod
def generate_report(equity_curve, trades=None, benchmark=None, risk_free_rate=0.0):
"""生成绩效报告"""
# 计算基础指标
returns = ReturnMetrics.calculate_returns(equity_curve)
cumulative_returns = ReturnMetrics.calculate_cumulative_returns(returns)
annual_return = ReturnMetrics.calculate_annual_return(returns)
# 计算风险指标
volatility = RiskMetrics.calculate_volatility(returns)
sharpe = RiskMetrics.calculate_sharpe_ratio(returns, risk_free_rate)
sortino = RiskMetrics.calculate_sortino_ratio(returns, risk_free_rate)
max_drawdown, drawdown_info = RiskMetrics.calculate_max_drawdown(equity_curve)
calmar = RiskMetrics.calculate_calmar_ratio(annual_return, max_drawdown)
omega = RiskMetrics.calculate_omega_ratio(returns, risk_free_rate)
# 构建报告
report = {
'General': {
'Start Date': equity_curve.index[0],
'End Date': equity_curve.index[-1],
'Duration': f"{(equity_curve.index[-1] - equity_curve.index[0]).days} days",
'Initial Equity': equity_curve.iloc[0],
'Final Equity': equity_curve.iloc[-1],
'Total Return': f"{cumulative_returns.iloc[-1]:.2%}",
'Annual Return': f"{annual_return:.2%}",
'Annual Volatility': f"{volatility:.2%}"
},
'Risk Metrics': {
'Sharpe Ratio': f"{sharpe:.2f}",
'Sortino Ratio': f"{sortino:.2f}",
'Calmar Ratio': f"{calmar:.2f}",
'Omega Ratio': f"{omega:.2f}",
'Maximum Drawdown': f"{max_drawdown:.2%}",
'Drawdown Start': drawdown_info['peak_date'],
'Drawdown End': drawdown_info['trough_date'],
'Drawdown Duration': f"{drawdown_info['drawdown_duration_days']} days",
'Recovery Duration': f"{drawdown_info['recovery_duration_days']} days" if drawdown_info['recovery_duration_days'] else "Not Recovered"
}
}
# 添加基准比较指标
if benchmark is not None:
benchmark_returns = ReturnMetrics.calculate_returns(benchmark)
benchmark_cumulative_returns = ReturnMetrics.calculate_cumulative_returns(benchmark_returns)
benchmark_annual_return = ReturnMetrics.calculate_annual_return(benchmark_returns)
beta = BenchmarkMetrics.calculate_beta(returns, benchmark_returns)
alpha = BenchmarkMetrics.calculate_alpha(annual_return, beta, benchmark_annual_return, risk_free_rate)
information_ratio = BenchmarkMetrics.calculate_information_ratio(
annual_return, benchmark_annual_return, returns, benchmark_returns)
report['Benchmark Comparison'] = {
'Beta': f"{beta:.2f}",
'Alpha (annualized)': f"{alpha:.2%}",
'Information Ratio': f"{information_ratio:.2f}",
'Benchmark Return': f"{benchmark_cumulative_returns.iloc[-1]:.2%}",
'Excess Return': f"{cumulative_returns.iloc[-1] - benchmark_cumulative_returns.iloc[-1]:.2%}"
}
# 添加交易统计
if trades:
win_rate = TradeMetrics.calculate_win_rate(trades)
avg_profit = TradeMetrics.calculate_average_profit(trades)
avg_loss = TradeMetrics.calculate_average_loss(trades)
profit_factor = TradeMetrics.calculate_profit_factor(trades)
avg_holding_period = TradeMetrics.calculate_average_holding_period(trades)
report['Trade Statistics'] = {
'Number of Trades': len(trades),
'Win Rate': f"{win_rate:.2%}",
'Average Profit': f"{avg_profit:.2f}",
'Average Loss': f"{avg_loss:.2f}",
'Profit Factor': f"{profit_factor:.2f}",
'Average Holding Period': f"{avg_holding_period:.1f} days" if avg_holding_period else "N/A"
}
return report
作为本项目的核心部分,您需要实现至少3种不同的统计模型用于市场分析和交易信号生成:
a. 移动平均交叉策略
这是一个经典的趋势跟踪策略。您需要实现:
参考实现框架:
# moving_average_strategy.py
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
class MovingAverageStrategy:
def __init__(self, data):
"""
初始化移动平均策略
参数:
data: DataFrame, 包含OHLCV数据
"""
self.data = data.copy()
self.signals = pd.Series(0, index=data.index)
def calculate_sma(self, window):
"""
计算简单移动平均
参数:
window: int, 窗口大小
返回:
Series, 移动平均序列
"""
# TODO: 实现简单移动平均计算
pass
def calculate_ema(self, window):
"""
计算指数移动平均
参数:
window: int, 窗口大小
返回:
Series, 移动平均序列
"""
# TODO: 实现指数移动平均计算
pass
def generate_signals(self, short_window, long_window, ma_type='sma'):
"""
基于移动平均交叉生成交易信号
参数:
short_window: int, 短期窗口大小
long_window: int, 长期窗口大小
ma_type: str, 移动平均类型 ('sma' 或 'ema')
返回:
Series, 交易信号 (1: 买入, -1: 卖出, 0: 持仓不变)
"""
# TODO: 实现双均线交叉信号生成
pass
def optimize_parameters(self, short_window_range, long_window_range, ma_type='sma'):
"""
优化移动平均窗口参数
参数:
short_window_range: range, 短期窗口范围
long_window_range: range, 长期窗口范围
ma_type: str, 移动平均类型 ('sma' 或 'ema')
返回:
tuple, 最优参数 (short_window, long_window)
"""
# TODO: 实现参数优化
pass
def plot_strategy(self, short_window, long_window, ma_type='sma'):
"""
绘制策略图表
参数:
short_window: int, 短期窗口大小
long_window: int, 长期窗口大小
ma_type: str, 移动平均类型 ('sma' 或 'ema')
"""
# TODO: 实现策略可视化
pass
b. 均值回归策略
此策略基于价格会回归到均值的假设。您需要实现:
参考实现框架:
# mean_reversion_strategy.py
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from statsmodels.regression.linear_model import OLS
import statsmodels.api as sm
class MeanReversionStrategy:
def __init__(self, data):
"""
初始化均值回归策略
参数:
data: DataFrame, 包含OHLCV数据
"""
self.data = data.copy()
self.signals = pd.Series(0, index=data.index)
def calculate_bollinger_bands(self, window=20, num_std=2):
"""
计算布林带指标
参数:
window: int, 窗口大小
num_std: int, 标准差倍数
返回:
DataFrame, 包含中轨、上轨和下轨
"""
# TODO: 实现布林带计算
pass
def calculate_zscore(self, window=20):
"""
计算z-score
参数:
window: int, 窗口大小
返回:
Series, z-score序列
"""
# TODO: 实现z-score计算
pass
def estimate_half_life(self, series):
"""
估计半衰期
参数:
series: Series, 价格序列
返回:
float, 半衰期
"""
# TODO: 实现半衰期估计
pass
def generate_bb_signals(self, window=20, num_std=2, threshold=0.9):
"""
基于布林带生成交易信号
参数:
window: int, 窗口大小
num_std: int, 标准差倍数
threshold: float, 触发阈值
返回:
Series, 交易信号 (1: 买入, -1: 卖出, 0: 持仓不变)
"""
# TODO: 实现布林带信号生成
pass
def generate_zscore_signals(self, window=20, entry_threshold=1.5, exit_threshold=0.5):
"""
基于z-score生成交易信号
参数:
window: int, 窗口大小
entry_threshold: float, 入场阈值
exit_threshold: float, 出场阈值
返回:
Series, 交易信号 (1: 买入, -1: 卖出, 0: 持仓不变)
"""
# TODO: 实现z-score信号生成
pass
def optimize_parameters(self, window_range, threshold_range, signal_type='bollinger'):
"""
优化策略参数
参数:
window_range: range, 窗口范围
threshold_range: range/list, 阈值范围
signal_type: str, 信号类型 ('bollinger' 或 'zscore')
返回:
tuple, 最优参数
"""
# TODO: 实现参数优化
pass
def plot_strategy(self, window=20, signal_type='bollinger', **kwargs):
"""
绘制策略图表
参数:
window: int, 窗口大小
signal_type: str, 信号类型 ('bollinger' 或 'zscore')
**kwargs: 其他参数
"""
# TODO: 实现策略可视化
pass
c. 动量策略
动量策略基于价格趋势延续的假设。您需要实现:
参考实现框架:
# momentum_strategy.py
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
class MomentumStrategy:
def __init__(self, data):
"""
初始化动量策略
参数:
data: DataFrame, 包含OHLCV数据
"""
self.data = data.copy()
self.signals = pd.Series(0, index=data.index)
def calculate_rsi(self, window=14):
"""
计算相对强弱指标(RSI)
参数:
window: int, 窗口大小
返回:
Series, RSI指标
"""
# TODO: 实现RSI计算
pass
def calculate_macd(self, fast_window=12, slow_window=26, signal_window=9):
"""
计算MACD指标
参数:
fast_window: int, 快线窗口
slow_window: int, 慢线窗口
signal_window: int, 信号线窗口
返回:
DataFrame, 包含MACD线、信号线和柱状图
"""
# TODO: 实现MACD计算
pass
def calculate_momentum(self, window=10):
"""
计算动量指标
参数:
window: int, 窗口大小
返回:
Series, 动量指标
"""
# TODO: 实现动量计算
pass
def generate_rsi_signals(self, window=14, oversold=30, overbought=70):
"""
基于RSI生成交易信号
参数:
window: int, 窗口大小
oversold: int, 超卖阈值
overbought: int, 超买阈值
返回:
Series, 交易信号 (1: 买入, -1: 卖出, 0: 持仓不变)
"""
# TODO: 实现RSI信号生成
pass
def generate_macd_signals(self, fast_window=12, slow_window=26, signal_window=9):
"""
基于MACD生成交易信号
参数:
fast_window: int, 快线窗口
slow_window: int, 慢线窗口
signal_window: int, 信号线窗口
返回:
Series, 交易信号 (1: 买入, -1: 卖出, 0: 持仓不变)
"""
# TODO: 实现MACD信号生成
pass
def generate_momentum_signals(self, window=10, threshold=0):
"""
基于动量生成交易信号
参数:
window: int, 窗口大小
threshold: float, 阈值
返回:
Series, 交易信号 (1: 买入, -1: 卖出, 0: 持仓不变)
"""
# TODO: 实现动量信号生成
pass
def optimize_parameters(self, indicator_type, **kwargs):
"""
优化策略参数
参数:
indicator_type: str, 指标类型 ('rsi', 'macd', 'momentum')
**kwargs: 参数范围
返回:
dict, 最优参数
"""
# TODO: 实现参数优化
pass
def plot_strategy(self, indicator_type, **kwargs):
"""
绘制策略图表
参数:
indicator_type: str, 指标类型 ('rsi', 'macd', 'momentum')
**kwargs: 其他参数
"""
# TODO: 实现策略可视化
pass
交易信号是策略决策的核心。您需要设计并实现:
# signal_generator.py
import pandas as pd
import numpy as np
class SignalGenerator:
def __init__(self, data):
"""
初始化信号生成器
参数:
data: DataFrame, 包含OHLCV数据及技术指标
"""
self.data = data.copy()
self.signals = pd.Series(0, index=data.index)
def combine_signals(self, signal_list, method='majority'):
"""
组合多个信号源
参数:
signal_list: list, 信号序列列表
method: str, 组合方法 ('majority', 'consensus', 'weighted')
返回:
Series, 组合后的信号
"""
# TODO: 实现信号组合逻辑
pass
def add_filters(self, signals, volatility_threshold=None, volume_threshold=None):
"""
添加过滤条件
参数:
signals: Series, 初始信号
volatility_threshold: float, 波动率阈值
volume_threshold: float, 成交量阈值
返回:
Series, 过滤后的信号
"""
# TODO: 实现信号过滤
pass
def add_confirmation(self, signals, confirmation_window=2):
"""
添加确认机制
参数:
signals: Series, 初始信号
confirmation_window: int, 确认窗口
返回:
Series, 确认后的信号
"""
# TODO: 实现信号确认
pass
def apply_time_filters(self, signals, time_of_day=None, day_of_week=None):
"""
应用时间过滤
参数:
signals: Series, 初始信号
time_of_day: tuple, 交易时间范围
day_of_week: list, 交易日列表
返回:
Series, 过滤后的信号
"""
# TODO: 实现时间过滤
pass
有效的仓位管理是策略风险控制的关键:
# position_manager.py
import pandas as pd
import numpy as np
class PositionManager:
def __init__(self, initial_capital=100000.0):
"""
初始化仓位管理器
参数:
initial_capital: float, 初始资金
"""
self.initial_capital = initial_capital
self.current_capital = initial_capital
self.current_position = 0
def fixed_size(self, signals, size):
"""
固定数量仓位管理
参数:
signals: Series, 交易信号
size: int, 固定仓位大小
返回:
Series, 目标仓位数量
"""
# TODO: 实现固定数量仓位管理
pass
def percentage_risk(self, signals, price, volatility, risk_pct=0.01):
"""
百分比风险仓位管理
参数:
signals: Series, 交易信号
price: Series, 价格序列
volatility: Series, 波动率序列
risk_pct: float, 风险比例
返回:
Series, 目标仓位数量
"""
# TODO: 实现百分比风险仓位管理
pass
def kelly_criterion(self, signals, win_rate, win_loss_ratio):
"""
凯利公式仓位管理
参数:
signals: Series, 交易信号
win_rate: float, 胜率
win_loss_ratio: float, 盈亏比
返回:
Series, 目标仓位数量
"""
# TODO: 实现凯利公式仓位管理
pass
def volatility_sizing(self, signals, price, historical_volatility, target_volatility=0.01):
"""
波动率调整仓位管理
参数:
signals: Series, 交易信号
price: Series, 价格序列
historical_volatility: Series, 历史波动率
target_volatility: float, 目标波动率
返回:
Series, 目标仓位数量
"""
# TODO: 实现波动率调整仓位管理
pass
风险控制是量化交易成功的保障:
# risk_manager.py
import pandas as pd
import numpy as np
class RiskManager:
def __init__(self, initial_capital=100000.0):
"""
初始化风险管理器
参数:
initial_capital: float, 初始资金
"""
self.initial_capital = initial_capital
self.current_capital = initial_capital
def apply_stop_loss(self, signals, prices, position_sizes, stop_loss_pct=0.02):
"""
应用止损
参数:
signals: Series, 交易信号
prices: Series, 价格序列
position_sizes: Series, 仓位大小
stop_loss_pct: float, 止损比例
返回:
Series, 调整后的信号
"""
# TODO: 实现止损机制
pass
def apply_take_profit(self, signals, prices, position_sizes, take_profit_pct=0.03):
"""
应用止盈
参数:
signals: Series, 交易信号
prices: Series, 价格序列
position_sizes: Series, 仓位大小
take_profit_pct: float, 止盈比例
返回:
Series, 调整后的信号
"""
# TODO: 实现止盈机制
pass
def apply_trailing_stop(self, signals, prices, position_sizes, trail_pct=0.015):
"""
应用追踪止损
参数:
signals: Series, 交易信号
prices: Series, 价格序列
position_sizes: Series, 仓位大小
trail_pct: float, 追踪比例
返回:
Series, 调整后的信号
"""
# TODO: 实现追踪止损
pass
def apply_max_drawdown_control(self, equity_curve, max_drawdown_limit=0.1):
"""
最大回撤控制
参数:
equity_curve: Series, 权益曲线
max_drawdown_limit: float, 最大回撤限制
返回:
bool, 是否继续交易
"""
# TODO: 实现最大回撤控制
pass
def apply_volatility_control(self, returns, target_volatility=0.01, lookback=20):
"""
波动率控制
参数:
returns: Series, 收益率序列
target_volatility: float, 目标波动率
lookback: int, 回溯期
返回:
float, 调整系数
"""
# TODO: 实现波动率控制
pass
参数优化可以提升策略性能:
# parameter_optimizer.py
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from itertools import product
from tqdm.notebook import tqdm
class ParameterOptimizer:
def __init__(self, data, backtester):
"""
初始化参数优化器
参数:
data: DataFrame, 包含OHLCV数据
backtester: Backtester, 回测引擎对象
"""
self.data = data.copy()
self.backtester = backtester
def grid_search(self, strategy_class, param_grid, metric='sharpe_ratio'):
"""
网格搜索优化
参数:
strategy_class: class, 策略类
param_grid: dict, 参数网格
metric: str, 优化指标
返回:
dict, 最优参数和性能指标
"""
# TODO: 实现网格搜索
pass
def walk_forward_validation(self, strategy_class, param_grid, train_ratio=0.7,
window_size=126, step_size=21, metric='sharpe_ratio'):
"""
向前推进验证
参数:
strategy_class: class, 策略类
param_grid: dict, 参数网格
train_ratio: float, 训练集比例
window_size: int, 窗口大小
step_size: int, 步长
metric: str, 优化指标
返回:
dict, 各窗口最优参数和性能指标
"""
# TODO: 实现向前推进验证
pass
def monte_carlo_optimization(self, strategy_class, param_ranges, num_samples=100, metric='sharpe_ratio'):
"""
蒙特卡洛优化
参数:
strategy_class: class, 策略类
param_ranges: dict, 参数范围
num_samples: int, 采样数量
metric: str, 优化指标
返回:
dict, 最优参数和性能指标
"""
# TODO: 实现蒙特卡洛优化
pass
def genetic_algorithm(self, strategy_class, param_ranges,
population_size=50, generations=10, metric='sharpe_ratio'):
"""
遗传算法优化
参数:
strategy_class: class, 策略类
param_ranges: dict, 参数范围
population_size: int, 种群大小
generations: int, 代数
metric: str, 优化指标
返回:
dict, 最优参数和性能指标
"""
# TODO: 实现遗传算法优化
pass
def plot_parameter_sensitivity(self, results, param_name):
"""
绘制参数敏感性分析图
参数:
results: list, 优化结果列表
param_name: str, 参数名称
"""
# TODO: 实现参数敏感性分析
pass
完成本项目后,您需要提交以下内容:
项目将从以下几个方面进行评估:
常见统计策略原理解释:
参数调优方法指南:
策略评估标准说明: