配对交易是一种市场中性策略,通过同时买入和卖出具有相关性的资产对,利用价格偏离均衡状态时的统计套利机会获利。本项目将引导学习者构建完整的配对交易系统,从资产选择到策略执行与调整。
配对交易核心概念
配对交易的核心思想是利用两个相关资产之间的价格关系,当价差偏离其长期均衡水平时进行交易。这种策略通常被归类为统计套利,其基础是价格均值回归的统计性质。
协整性理论
协整(Cointegration)是配对交易的理论基础,两个非平稳时间序列的线性组合可能是平稳的,这种关系被称为协整。检验协整性的主要方法包括:
价差建模方法
project/
├── data/ # 数据存储目录
├── models/ # 模型定义与训练
├── strategies/ # 交易策略实现
├── backtesting/ # 回测框架
├── monitoring/ # 策略监控与调整
├── utils/ # 工具函数
├── config.py # 配置文件
├── main.py # 主程序入口
└── requirements.txt # 依赖包列表
# utils/pair_selection.py
import numpy as np
import pandas as pd
from statsmodels.tsa.stattools import coint
import itertools
class PairSelector:
def __init__(self, price_data, lookback_period=252):
"""
初始化配对选择器
参数:
price_data (pd.DataFrame): 多股票的价格数据,索引为日期,列为股票代码
lookback_period (int): 回看期,用于计算协整性
"""
self.price_data = price_data
self.lookback_period = lookback_period
def calculate_correlation_matrix(self):
"""计算相关性矩阵"""
return self.price_data.pct_change().dropna().corr()
def find_cointegrated_pairs(self, pvalue_threshold=0.05):
"""
寻找协整的股票对
参数:
pvalue_threshold (float): 协整性检验的p值阈值
返回:
list: 协整的股票对列表及其p值
"""
n = len(self.price_data.columns)
pvalue_matrix = np.ones((n, n))
pairs = []
# 获取最近的数据用于协整性检验
recent_data = self.price_data.iloc[-self.lookback_period:]
for i, j in itertools.combinations(range(n), 2):
stock1 = self.price_data.columns[i]
stock2 = self.price_data.columns[j]
# 执行协整性检验
result = coint(recent_data[stock1], recent_data[stock2])
pvalue = result[1]
pvalue_matrix[i, j] = pvalue
if pvalue < pvalue_threshold:
pairs.append((stock1, stock2, pvalue))
return sorted(pairs, key=lambda x: x[2])
def filter_pairs_by_correlation(self, pairs, corr_threshold=0.7):
"""使用相关性进一步筛选配对"""
corr_matrix = self.calculate_correlation_matrix()
filtered_pairs = []
for stock1, stock2, pvalue in pairs:
correlation = corr_matrix.loc[stock1, stock2]
if abs(correlation) > corr_threshold:
filtered_pairs.append((stock1, stock2, pvalue, correlation))
return sorted(filtered_pairs, key=lambda x: x[2])
# 待实现:配对筛选算法
# TODO: 实现基于行业分类、基本面相似性等的更高级筛选方法
# utils/cointegration.py
import numpy as np
import pandas as pd
import statsmodels.api as sm
from statsmodels.tsa.stattools import adfuller, coint
import matplotlib.pyplot as plt
class CointegrationTest:
def __init__(self, series1, series2):
"""
初始化协整性测试
参数:
series1 (pd.Series): 第一个价格序列
series2 (pd.Series): 第二个价格序列
"""
self.series1 = series1
self.series2 = series2
def engle_granger_test(self):
"""
执行Engle-Granger两步法
返回:
dict: 包含测试结果、p值和临界值
"""
# 步骤1: 估计协整方程
X = sm.add_constant(self.series1)
model = sm.OLS(self.series2, X).fit()
beta = model.params[1]
# 步骤2: 检验残差的平稳性
spread = self.series2 - beta * self.series1
adf_result = adfuller(spread)
return {
'hedge_ratio': beta,
'adf_statistic': adf_result[0],
'pvalue': adf_result[1],
'critical_values': adf_result[4],
'spread': spread
}
def johansen_test(self):
"""
执行Johansen检验
返回:
dict: 检验结果
"""
# 组合数据
data = pd.concat([self.series1, self.series2], axis=1)
# 执行Johansen检验
# 注意:对于配对交易,通常我们关注是否有至少一个协整向量
johansen_result = sm.tsa.vector_ar.vecm.coint_johansen(data, det_order=0, k_ar_diff=1)
return {
'trace_statistic': johansen_result.lr1,
'critical_values': johansen_result.cvt
}
def plot_spread(self, result):
"""
绘制价差图
参数:
result (dict): engle_granger_test的返回值
"""
plt.figure(figsize=(12, 6))
plt.plot(result['spread'])
plt.axhline(y=result['spread'].mean(), color='r', linestyle='-')
plt.axhline(y=result['spread'].mean() + 2*result['spread'].std(), color='g', linestyle='--')
plt.axhline(y=result['spread'].mean() - 2*result['spread'].std(), color='g', linestyle='--')
plt.title('Price Spread with Mean and 2-sigma Bands')
plt.ylabel('Spread')
plt.grid(True)
plt.tight_layout()
return plt
# strategies/pair_trading.py
import numpy as np
import pandas as pd
class PairTradingStrategy:
def __init__(self, stock1, stock2, hedge_ratio, entry_z_score=2.0, exit_z_score=0.0,
stop_loss_z_score=3.5, lookback_period=20):
"""
初始化配对交易策略
参数:
stock1 (str): 第一个股票代码
stock2 (str): 第二个股票代码
hedge_ratio (float): 对冲比率,用于计算价差
entry_z_score (float): 入场的z值阈值
exit_z_score (float): 出场的z值阈值
stop_loss_z_score (float): 止损的z值阈值
lookback_period (int): 计算均值和标准差的回看期
"""
self.stock1 = stock1
self.stock2 = stock2
self.hedge_ratio = hedge_ratio
self.entry_z_score = entry_z_score
self.exit_z_score = exit_z_score
self.stop_loss_z_score = stop_loss_z_score
self.lookback_period = lookback_period
# 状态变量
self.position = 0 # 1: 做多stock1,做空stock2; -1: 做空stock1,做多stock2; 0: 空仓
self.entry_price_spread = 0
self.trade_history = []
def calculate_spread(self, price1, price2):
"""计算价差"""
return price2 - self.hedge_ratio * price1
def calculate_zscore(self, spread, historical_spreads):
"""计算当前价差的z值"""
mean = np.mean(historical_spreads)
std = np.std(historical_spreads)
if std == 0:
return 0
return (spread - mean) / std
def generate_signals(self, price_data):
"""
生成交易信号
参数:
price_data (pd.DataFrame): 包含两只股票价格的数据框,索引为日期
返回:
pd.DataFrame: 包含信号和仓位的数据框
"""
# 确保数据框包含两只股票的数据
if self.stock1 not in price_data.columns or self.stock2 not in price_data.columns:
raise ValueError(f"价格数据中缺少股票: {self.stock1} 或 {self.stock2}")
# 创建结果数据框
results = pd.DataFrame(index=price_data.index)
results['spread'] = self.calculate_spread(
price_data[self.stock1], price_data[self.stock2]
)
# 计算移动平均和标准差
results['moving_avg'] = results['spread'].rolling(window=self.lookback_period).mean()
results['moving_std'] = results['spread'].rolling(window=self.lookback_period).std()
# 计算z值
results['zscore'] = (results['spread'] - results['moving_avg']) / results['moving_std']
# 初始化信号和仓位列
results['signal'] = 0
results['position'] = 0
# 生成交易信号
for i in range(self.lookback_period, len(results)):
current_zscore = results['zscore'].iloc[i]
previous_position = results['position'].iloc[i-1] if i > 0 else 0
# 默认保持前一个仓位
results.loc[results.index[i], 'position'] = previous_position
# 空仓时的入场信号
if previous_position == 0:
if current_zscore > self.entry_z_score:
# 做空stock1,做多stock2
results.loc[results.index[i], 'signal'] = -1
results.loc[results.index[i], 'position'] = -1
self.entry_price_spread = results['spread'].iloc[i]
elif current_zscore < -self.entry_z_score:
# 做多stock1,做空stock2
results.loc[results.index[i], 'signal'] = 1
results.loc[results.index[i], 'position'] = 1
self.entry_price_spread = results['spread'].iloc[i]
# 持仓时的出场或止损信号
elif previous_position == 1: # 做多stock1,做空stock2
if current_zscore >= -self.exit_z_score or current_zscore <= -self.stop_loss_z_score:
# 平仓
results.loc[results.index[i], 'signal'] = -1
results.loc[results.index[i], 'position'] = 0
elif previous_position == -1: # 做空stock1,做多stock2
if current_zscore <= self.exit_z_score or current_zscore >= self.stop_loss_z_score:
# 平仓
results.loc[results.index[i], 'signal'] = 1
results.loc[results.index[i], 'position'] = 0
return results
# 待实现:价差建模与预测模块、交易信号触发机制、动态止损策略
# TODO: 实现价差动态建模和预测
# TODO: 设计更复杂的交易信号生成机制
# TODO: 实现动态止损和策略参数自适应
# backtesting/backtest_engine.py
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from datetime import datetime
class PairTradingBacktester:
def __init__(self, price_data, strategy, initial_capital=100000.0,
commission=0.001, slippage=0.001):
"""
初始化回测引擎
参数:
price_data (pd.DataFrame): 价格数据,索引为日期,列为股票代码
strategy (PairTradingStrategy): 配对交易策略实例
initial_capital (float): 初始资金
commission (float): 交易佣金率
slippage (float): 滑点率
"""
self.price_data = price_data
self.strategy = strategy
self.initial_capital = initial_capital
self.current_capital = initial_capital
self.commission = commission
self.slippage = slippage
# 回测结果
self.portfolio_value = []
self.trades = []
self.positions = {
strategy.stock1: 0,
strategy.stock2: 0
}
def run_backtest(self):
"""
运行回测
返回:
pd.DataFrame: 回测结果
"""
# 生成交易信号
signals = self.strategy.generate_signals(self.price_data)
# 初始化回测结果
results = pd.DataFrame(index=signals.index)
results['capital'] = self.initial_capital
results[f'position_{self.strategy.stock1}'] = 0
results[f'position_{self.strategy.stock2}'] = 0
results['portfolio_value'] = self.initial_capital
# 运行回测
for i in range(1, len(signals)):
current_date = signals.index[i]
previous_date = signals.index[i-1]
# 获取当前价格
price1 = self.price_data[self.strategy.stock1].loc[current_date]
price2 = self.price_data[self.strategy.stock2].loc[current_date]
# 获取前一日持仓
position1 = results[f'position_{self.strategy.stock1}'].loc[previous_date]
position2 = results[f'position_{self.strategy.stock2}'].loc[previous_date]
# 更新持仓价值
position_value1 = position1 * price1
position_value2 = position2 * price2
capital = results['capital'].loc[previous_date]
# 执行交易
if signals['signal'].iloc[i] != 0:
# 平仓原有持仓
if position1 != 0 or position2 != 0:
# 计算交易费用
close_commission = (abs(position_value1) + abs(position_value2)) * self.commission
capital += position_value1 + position_value2 - close_commission
# 记录交易
self.trades.append({
'date': current_date,
'action': 'CLOSE',
f'{self.strategy.stock1}': -position1,
f'{self.strategy.stock2}': -position2,
'price1': price1,
'price2': price2,
'commission': close_commission
})
position1 = 0
position2 = 0
# 开新仓
if signals['position'].iloc[i] != 0:
signal = signals['position'].iloc[i]
# 确定分配资金
allocation = capital * 0.9 # 使用90%的资金开仓
allocation_per_stock = allocation / 2
# 计算股数,确保对冲比例正确
qty1 = int(allocation_per_stock / price1)
qty2 = int(allocation_per_stock / price2)
# 调整为对冲比例
if signal == 1: # 做多stock1,做空stock2
position1 = qty1
position2 = -int(qty1 * self.strategy.hedge_ratio * price1 / price2)
else: # 做空stock1,做多stock2
position1 = -qty1
position2 = int(qty1 * self.strategy.hedge_ratio * price1 / price2)
# 计算交易费用
open_commission = (abs(position1 * price1) + abs(position2 * price2)) * self.commission
capital -= open_commission
# 记录交易
self.trades.append({
'date': current_date,
'action': 'OPEN',
f'{self.strategy.stock1}': position1,
f'{self.strategy.stock2}': position2,
'price1': price1,
'price2': price2,
'commission': open_commission
})
# 更新结果
results[f'position_{self.strategy.stock1}'].loc[current_date] = position1
results[f'position_{self.strategy.stock2}'].loc[current_date] = position2
results['capital'].loc[current_date] = capital
results['portfolio_value'].loc[current_date] = capital + position1 * price1 + position2 * price2
# 计算额外的指标
results['returns'] = results['portfolio_value'].pct_change()
results['cumulative_returns'] = (1 + results['returns']).cumprod() - 1
return results
def calculate_performance_metrics(self, results):
"""
计算绩效指标
参数:
results (pd.DataFrame): 回测结果
返回:
dict: 绩效指标
"""
returns = results['returns'].dropna()
# 计算年化收益率
total_days = (results.index[-1] - results.index[0]).days
annual_return = (1 + results['cumulative_returns'].iloc[-1]) ** (365 / total_days) - 1
# 计算最大回撤
cumulative_returns = results['cumulative_returns']
running_max = cumulative_returns.cummax()
drawdown = (cumulative_returns - running_max) / running_max
max_drawdown = drawdown.min()
# 计算夏普比率
risk_free_rate = 0.0 # 假设无风险利率为0
sharpe_ratio = (returns.mean() * 252 - risk_free_rate) / (returns.std() * np.sqrt(252))
# 获取交易统计
total_trades = len(self.trades)
profit_trades = sum(1 for t in self.trades if t['action'] == 'CLOSE' and
(t[f'{self.strategy.stock1}'] * t['price1'] +
t[f'{self.strategy.stock2}'] * t['price2']) > 0)
win_rate = profit_trades / total_trades if total_trades > 0 else 0
return {
'initial_capital': self.initial_capital,
'final_portfolio_value': results['portfolio_value'].iloc[-1],
'total_return': results['cumulative_returns'].iloc[-1],
'annual_return': annual_return,
'sharpe_ratio': sharpe_ratio,
'max_drawdown': max_drawdown,
'total_trades': total_trades,
'win_rate': win_rate
}
def plot_results(self, results):
"""
绘制回测结果
参数:
results (pd.DataFrame): 回测结果
"""
fig, axes = plt.subplots(3, 1, figsize=(12, 15), sharex=True)
# 绘制资产组合价值
axes[0].plot(results['portfolio_value'])
axes[0].set_title('Portfolio Value')
axes[0].set_ylabel('Value')
axes[0].grid(True)
# 绘制配对价差和z-score
ax_twin = axes[1].twinx()
axes[1].plot(self.strategy.generate_signals(self.price_data)['spread'], 'b-', label='Spread')
ax_twin.plot(self.strategy.generate_signals(self.price_data)['zscore'], 'r-', label='Z-Score')
axes[1].set_title('Spread and Z-Score')
axes[1].set_ylabel('Spread', color='b')
ax_twin.set_ylabel('Z-Score', color='r')
axes[1].grid(True)
# 绘制仓位
axes[2].plot(results[f'position_{self.strategy.stock1}'], 'g-', label=self.strategy.stock1)
axes[2].plot(results[f'position_{self.strategy.stock2}'], 'm-', label=self.strategy.stock2)
axes[2].set_title('Positions')
axes[2].set_ylabel('Quantity')
axes[2].set_xlabel('Date')
axes[2].legend()
axes[2].grid(True)
plt.tight_layout()
return fig
在 utils/pair_selection.py
中添加更高级的配对选择方法,包括:
参考框架:
import numpy as np
import pandas as pd
import statsmodels.api as sm
from statsmodels.tsa.stattools import coint, adfuller
from itertools import combinations
class PairSelector:
def __init__(self, price_data):
"""
初始化配对选择器
参数:
price_data (DataFrame): 包含股票价格的数据框,索引为日期,列为股票代码
"""
self.price_data = price_data
self.pairs_results = {}
def select_pairs_by_industry(self, industry_data, min_pairs_per_industry=5, p_value_threshold=0.05):
"""
基于行业分类筛选配对
参数:
industry_data (dict): 股票代码到行业的映射字典
min_pairs_per_industry (int): 每个行业至少保留的配对数量
p_value_threshold (float): 协整检验的p值阈值
返回:
dict: 行业到配对列表的映射
"""
# 按行业分组
industry_groups = {}
for stock, industry in industry_data.items():
if industry not in industry_groups:
industry_groups[industry] = []
industry_groups[industry].append(stock)
# 在每个行业内寻找协整对
industry_pairs = {}
for industry, stocks in industry_groups.items():
if len(stocks) < 2:
continue
# 只考虑同行业内的股票
stock_prices = self.price_data[stocks].dropna(axis=1)
if len(stock_prices.columns) < 2:
continue
# 调用协整检验函数
pairs = self._find_cointegrated_pairs_in_subset(stock_prices, p_value_threshold)
# 如果找到了足够的配对,则添加到结果中
if len(pairs) >= min_pairs_per_industry:
industry_pairs[industry] = pairs
return industry_pairs
def _find_cointegrated_pairs_in_subset(self, price_subset, p_value_threshold=0.05):
"""
在股票子集中寻找协整对
参数:
price_subset (DataFrame): 股票价格子集
p_value_threshold (float): 协整检验的p值阈值
返回:
list: 协整对列表,每个元素为(股票1, 股票2, p值, 协整系数)
"""
# TODO: 实现协整检验逻辑
# 1. 遍历所有可能的股票对
# 2. 对每对股票进行协整检验
# 3. 保存通过检验的配对及其协整系数
pass
def filter_by_fundamentals(self, pairs, fundamental_data, similarity_threshold=0.2):
"""
基于基本面数据筛选配对
参数:
pairs (list): 配对列表,每个元素为(股票1, 股票2, p值, 协整系数)
fundamental_data (DataFrame): 基本面数据,索引为股票代码,列为各基本面指标
similarity_threshold (float): 基本面相似性阈值
返回:
list: 经过基本面筛选后的配对列表
"""
# TODO: 实现基于基本面相似性的筛选
# 1. 对每个配对计算基本面指标的相似度
# 2. 筛选相似度满足条件的配对
# 3. 可考虑市值、市盈率、市净率等指标的相对差异
pass
def filter_by_liquidity(self, pairs, volume_data, min_volume=100000, min_days_active=0.9):
"""
基于交易量筛选配对
参数:
pairs (list): 配对列表
volume_data (DataFrame): 交易量数据,索引为日期,列为股票代码
min_volume (float): 最低平均日交易量
min_days_active (float): 最低交易活跃度(有交易的天数比例)
返回:
list: 经过流动性筛选后的配对列表
"""
# TODO: 实现基于流动性的筛选
# 1. 计算每支股票的平均交易量
# 2. 计算每支股票的交易活跃度(有交易的天数比例)
# 3. 筛选满足条件的配对
pass
def multi_period_cointegration(self, pairs, periods=[30, 60, 120, 252], consistency_threshold=0.7):
"""
多周期协整性验证
参数:
pairs (list): 配对列表
periods (list): 要检验的时间窗口长度列表(交易日)
consistency_threshold (float): 多周期一致性阈值
返回:
list: 经过多周期验证的配对列表,每个元素为(股票1, 股票2, 稳定性得分, 平均协整系数)
"""
# TODO: 在多个时间窗口进行协整检验,评估稳定性
# 1. 对每个配对,在不同的时间窗口进行协整检验
# 2. 计算配对在各个时间窗口的稳定性得分
# 3. 筛选多周期都稳定的配对
pass
def calculate_spread(self, stock1, stock2, window=252):
"""
计算两支股票的价差序列
参数:
stock1 (str): 第一支股票代码
stock2 (str): 第二支股票代码
window (int): 用于计算的历史窗口长度
返回:
Series: 标准化后的价差序列
"""
# TODO: 实现价差计算逻辑
# 1. 获取两支股票的历史价格
# 2. 计算线性回归系数
# 3. 计算并标准化价差序列
pass
def rank_pairs(self, pairs, metrics=None):
"""
对筛选出的配对进行排序
参数:
pairs (list): 配对列表
metrics (dict): 排序指标权重
返回:
list: 排序后的配对列表
"""
# TODO: 实现配对排序逻辑
# 1. 为每个配对计算多个评分指标(如协整性强度、价差均值回归速度等)
# 2. 根据指标权重计算综合得分
# 3. 按得分排序返回配对
pass
def visualize_pair(self, stock1, stock2, window=252):
"""
可视化配对的价格和价差
参数:
stock1 (str): 第一支股票代码
stock2 (str): 第二支股票代码
window (int): 用于可视化的历史窗口长度
"""
# TODO: 实现配对可视化逻辑
# 1. 绘制两支股票的价格走势
# 2. 绘制标准化价差及其上下界
# 3. 标记潜在交易信号
pass
def test_pair_stability(price_data, stock1, stock2, test_windows=[30, 60, 90, 120]):
"""
测试配对的稳定性
参数:
price_data (DataFrame): 股票价格数据
stock1 (str): 第一支股票代码
stock2 (str): 第二支股票代码
test_windows (list): 测试窗口列表
返回:
dict: 稳定性测试结果
"""
# TODO: 实现配对稳定性测试逻辑
# 1. 在不同窗口下检验配对的协整性
# 2. 计算半衰期和平稳性指标
# 3. 返回综合评估结果
pass
在 strategies/spread_modeling.py
中实现价差建模和预测功能:
# strategies/spread_modeling.py
import numpy as np
import pandas as pd
from statsmodels.tsa.arima.model import ARIMA
from pykalman import KalmanFilter
from sklearn.linear_model import LinearRegression
class SpreadModel:
def __init__(self, model_type='kalman'):
"""
初始化价差模型
参数:
model_type (str): 模型类型,'kalman'、'arima'或'regression'
"""
self.model_type = model_type
self.model = None
self.hedge_ratio = None
def fit(self, series1, series2, **kwargs):
"""
拟合价差模型
参数:
series1 (pd.Series): 第一个资产价格序列
series2 (pd.Series): 第二个资产价格序列
**kwargs: 模型特定参数
返回:
self: 模型实例
"""
if self.model_type == 'kalman':
return self._fit_kalman(series1, series2, **kwargs)
elif self.model_type == 'arima':
return self._fit_arima(series1, series2, **kwargs)
elif self.model_type == 'regression':
return self._fit_regression(series1, series2, **kwargs)
else:
raise ValueError(f"不支持的模型类型: {self.model_type}")
def _fit_kalman(self, series1, series2, **kwargs):
"""使用卡尔曼滤波拟合动态对冲比率"""
# TODO: 实现卡尔曼滤波模型
pass
def _fit_arima(self, series1, series2, **kwargs):
"""先计算对冲比率,然后使用ARIMA模型拟合价差"""
# TODO: 实现ARIMA建模
pass
def _fit_regression(self, series1, series2, **kwargs):
"""使用滚动窗口线性回归拟合动态对冲比率"""
# TODO: 实现滚动回归模型
pass
def predict(self, series1, series2, horizon=1):
"""
预测未来价差
参数:
series1 (pd.Series): 第一个资产最新价格序列
series2 (pd.Series): 第二个资产最新价格序列
horizon (int): 预测期数
返回:
pd.Series: 预测的价差序列
"""
# TODO: 实现预测逻辑
pass
def calculate_spread(self, series1, series2):
"""
计算当前价差
参数:
series1 (pd.Series): 第一个资产价格序列
series2 (pd.Series): 第二个资产价格序列
返回:
pd.Series: 价差序列
"""
# TODO: 实现价差计算
pass
def get_hedge_ratio(self):
"""获取当前对冲比率"""
return self.hedge_ratio
扩展 strategies/pair_trading.py
中的信号生成方法:
import numpy as np
import pandas as pd
from scipy import stats
import statsmodels.api as sm
import talib as ta
from sklearn.linear_model import LinearRegression
class PairTradingSignals:
def __init__(self, z_entry=2.0, z_exit=0.5, max_active_positions=3, stop_loss_z=4.0):
"""
初始化配对交易信号生成器
参数:
z_entry (float): 入场z分数阈值
z_exit (float): 出场z分数阈值
max_active_positions (int): 最大活跃仓位数
stop_loss_z (float): 止损z分数阈值
"""
self.z_entry = z_entry
self.z_exit = z_exit
self.max_active_positions = max_active_positions
self.stop_loss_z = stop_loss_z
self.active_pairs = {}
def generate_basic_signals(self, spread, stock1, stock2):
"""
生成基本的配对交易信号
参数:
spread (pd.Series): 标准化价差序列
stock1 (str): 第一支股票代码
stock2 (str): 第二支股票代码
返回:
pd.DataFrame: 包含信号的数据框
"""
signals = pd.DataFrame(index=spread.index)
signals['zscore'] = spread
signals['signal'] = 0
# 基本信号逻辑
signals.loc[spread > self.z_entry, 'signal'] = -1 # 做空价差
signals.loc[spread < -self.z_entry, 'signal'] = 1 # 做多价差
signals.loc[(spread < self.z_exit) & (spread > -self.z_exit), 'signal'] = 0 # 平仓
# 止损逻辑
signals.loc[spread > self.stop_loss_z, 'signal'] = 0 # 止损平仓
signals.loc[spread < -self.stop_loss_z, 'signal'] = 0 # 止损平仓
# 转换信号为持仓
signals['position'] = signals['signal'].shift(1).fillna(0)
return signals
def generate_advanced_signals(self, price_data, spread_model, lookback=60, risk_factor=0.02):
"""
生成基于高级模型的交易信号
参数:
price_data (pd.DataFrame): 包含两只股票价格的数据框
spread_model (SpreadModel): 价差模型
lookback (int): 技术指标计算的回看期
risk_factor (float): 风险因子
返回:
pd.DataFrame: 包含信号和仓位的数据框
"""
# TODO: 实现基于预测模型的信号生成
# 1. 使用spread_model预测价差
# 2. 基于预测价差生成信号
# 3. 结合技术指标如RSI、MACD等优化信号
# 获取股票代码
stock1, stock2 = price_data.columns
# 计算实际价差和预测价差
# 计算技术指标
# 结合预测和技术指标生成信号
# 应用自适应阈值
# 更新持仓
# 添加波动率调整的仓位大小
pass
def adaptive_threshold(self, spread_data, lookback=120, quantile=0.9, min_threshold=1.5, max_threshold=3.0):
"""
计算自适应入场阈值
参数:
spread_data (pd.Series): 价差数据
lookback (int): 回看期
quantile (float): 分位数
min_threshold (float): 最小阈值
max_threshold (float): 最大阈值
返回:
float: 自适应阈值
"""
# TODO: 实现基于历史波动率的自适应阈值
# 计算滚动波动率
# 确定适当的分位数阈值
# 在最小和最大阈值范围内限制结果
pass
def volatility_adjusted_position_sizing(self, zscore, price1, price2, capital,
vol_window=30, max_risk_per_trade=0.02):
"""
基于波动率的仓位调整
参数:
zscore (float): 当前价差的z分数
price1 (float): 第一个资产价格
price2 (float): 第二个资产价格
capital (float): 可用资金
vol_window (int): 波动率计算窗口
max_risk_per_trade (float): 每笔交易最大风险比例
返回:
tuple: (仓位1, 仓位2)
"""
# TODO: 实现基于波动率的仓位大小计算
# 计算相对波动率
# 确定对冲比例
# 计算仓位大小,考虑波动率和风险限制
# 确保仓位不超过可用资金
pass
def add_stop_loss_signals(self, signals, spread, trailing_pct=0.3, time_stop_days=10):
"""
添加止损信号
参数:
signals (pd.DataFrame): 原始信号数据框
spread (pd.Series): 标准化价差序列
trailing_pct (float): 追踪止损百分比
time_stop_days (int): 时间止损天数
返回:
pd.DataFrame: 包含止损信号的数据框
"""
# TODO: 实现多种止损策略
# 1. 基于Z分数的固定止损
# 2. 基于追踪止损
# 3. 基于时间止损
# 4. 基于利润保护止损
pass
def calculate_trade_quality(self, spread, entry_point, current_point, position):
"""
计算交易质量得分
参数:
spread (pd.Series): 价差序列
entry_point (int): 入场点索引
current_point (int): 当前点索引
position (int): 仓位方向 (1:多, -1:空)
返回:
float: 交易质量得分
"""
# TODO: 实现交易质量评估
# 计算价差变动
# 计算均值回归速度
# 考虑价差波动
# 返回综合得分
pass
def integrate_market_regime(self, signals, market_data, vix_threshold=25):
"""
根据市场环境调整交易信号
参数:
signals (pd.DataFrame): 信号数据框
market_data (pd.DataFrame): 市场数据,包含指数和VIX
vix_threshold (float): VIX阈值
返回:
pd.DataFrame: 调整后的信号数据框
"""
# TODO: 实现市场环境感知的信号调整
# 识别市场状态(正常/波动/危机)
# 根据市场状态调整入场阈值
# 根据市场状态调整仓位大小
pass
def filter_trending_pairs(self, spread, trend_window=20, trend_threshold=0.6):
"""
过滤有明显趋势的价差
参数:
spread (pd.Series): 价差序列
trend_window (int): 趋势判断窗口
trend_threshold (float): 趋势强度阈值
返回:
bool: 是否应该过滤该配对
"""
# TODO: 实现趋势检测和过滤
# 计算趋势强度指标
# 判断是否存在显著趋势
pass
def calculate_expected_holding_period(self, spread, z_score):
"""
估计预期持仓时间
参数:
spread (pd.Series): 价差序列
z_score (float): 当前z分数
返回:
int: 预期持仓天数
"""
# TODO: 实现持仓时间预测
# 计算半衰期
# 基于当前偏离程度预测回归时间
pass
def generate_signal_confidence(self, price_data, spread, window=60):
"""
计算信号置信度
参数:
price_data (pd.DataFrame): 价格数据
spread (pd.Series): 价差序列
window (int): 计算窗口
返回:
pd.Series: 信号置信度序列
"""
# TODO: 实现信号置信度计算
# 计算价差统计特性
# 评估协整稳定性
# 考虑技术指标一致性
# 综合评分
pass
在现有策略基础上增加动态风险管理功能:
class DynamicStopLossManager:
def __init__(self, initial_stop_loss=3.0, sensitivity=0.5):
"""
初始化动态止损管理器
参数:
initial_stop_loss (float): 初始止损z分数阈值
sensitivity (float): 止损调整敏感度
"""
self.current_stop_loss = initial_stop_loss
self.sensitivity = sensitivity
self.trade_outcomes = []
def update_stop_loss(self, trade_result):
"""
基于交易结果更新止损水平
参数:
trade_result (dict): 包含交易结果信息的字典
返回:
float: 更新后的止损水平
"""
# TODO: 实现基于交易结果的动态止损策略
pass
def calculate_time_based_stop_loss(self, entry_time, current_time, max_holding_period=10):
"""
基于持仓时间的止损策略
参数:
entry_time (datetime): 入场时间
current_time (datetime): 当前时间
max_holding_period (int): 最大持仓天数
返回:
bool: 是否应该止损
"""
# TODO: 实现基于时间的止损
pass
def calculate_profit_target(self, entry_spread, spread_volatility):
"""
计算利润目标
参数:
entry_spread (float): 入场时的价差
spread_volatility (float): 价差波动率
返回:
float: 利润目标
"""
# TODO: 实现利润目标计算
pass
创建策略监控和动态调整模块:
# monitoring/strategy_monitor.py
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import streamlit as st
class PairStrategyMonitor:
def __init__(self, strategy, backtest_results=None):
"""
初始化策略监控器
参数:
strategy (PairTradingStrategy): 配对交易策略
backtest_results (pd.DataFrame): 回测结果
"""
self.strategy = strategy
self.backtest_results = backtest_results
self.live_performance = []
self.parameter_history = []
def record_trade(self, trade_info):
"""记录交易"""
self.live_performance.append(trade_info)
def update_parameters(self, params):
"""记录参数更新"""
self.parameter_history.append({
'timestamp': pd.Timestamp.now(),
'params': params
})
def analyze_performance(self):
"""分析策略表现"""
# TODO: 实现性能分析
pass
def detect_regime_change(self, price_data, lookback=60):
"""
检测市场状态变化
参数:
price_data (pd.DataFrame): 价格数据
lookback (int): 回看期
返回:
bool: 是否检测到状态变化
"""
# TODO: 实现市场状态变化检测
pass
def optimize_parameters(self, price_data):
"""
基于最新市场数据优化策略参数
参数:
price_data (pd.DataFrame): 最新价格数据
返回:
dict: 优化后的参数
"""
# TODO: 实现参数优化
pass
def visualize_dashboard(self):
"""创建Streamlit可视化仪表板"""
st.title("配对交易策略监控仪表板")
# 显示策略概况
st.header("策略概况")
st.write(f"资产对: {self.strategy.stock1} - {self.strategy.stock2}")
st.write(f"对冲比率: {self.strategy.hedge_ratio:.4f}")
# 回测结果
if self.backtest_results is not None:
st.header("回测绩效")
# TODO: 添加回测结果可视化
# 实时性能监控
st.header("实时性能")
# TODO: 添加实时性能可视化
# 参数历史
st.header("参数调整历史")
# TODO: 显示参数调整历史
# 市场状态分析
st.header("市场状态分析")
# TODO: 添加市场状态可视化
创建主程序文件 main.py
,整合所有组件:
# main.py
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import yfinance as yf
from datetime import datetime, timedelta
from utils.pair_selection import PairSelector
from utils.cointegration import CointegrationTest
from strategies.pair_trading import PairTradingStrategy
from backtesting.backtest_engine import PairTradingBacktester
def fetch_data(symbols, start_date, end_date):
"""获取股票数据"""
data = yf.download(symbols, start=start_date, end=end_date)['Adj Close']
return data
def main():
# 设置时间范围
end_date = datetime.now().strftime('%Y-%m-%d')
start_date = (datetime.now() - timedelta(days=365*3)).strftime('%Y-%m-%d')
# 数据配置
tech_symbols = ['AAPL', 'MSFT', 'GOOGL', 'META', 'AMZN', 'NVDA', 'INTC', 'CSCO', 'ADBE', 'ORCL']
# 获取数据
print("正在获取数据...")
price_data = fetch_data(tech_symbols, start_date, end_date)
# 选择配对
print("正在选择配对...")
selector = PairSelector(price_data)
pairs = selector.find_cointegrated_pairs()
filtered_pairs = selector.filter_pairs_by_correlation(pairs)
if not filtered_pairs:
print("未找到合适的配对")
return
# 选择第一个配对
stock1, stock2, pvalue, correlation = filtered_pairs[0]
print(f"选择的配对: {stock1} - {stock2} (p值: {pvalue:.4f}, 相关性: {correlation:.4f})")
# 进行协整性测试
print("正在进行协整性测试...")
coint_test = CointegrationTest(price_data[stock1], price_data[stock2])
result = coint_test.engle_granger_test()
hedge_ratio = result['hedge_ratio']
print(f"对冲比率: {hedge_ratio:.4f}")
print(f"ADF检验统计量: {result['adf_statistic']:.4f}")
print(f"p值: {result['pvalue']:.4f}")
# 绘制价差图
spread_plot = coint_test.plot_spread(result)
spread_plot.savefig("spread.png")
print("价差图已保存为spread.png")
# 设置策略
print("正在配置策略...")
strategy = PairTradingStrategy(
stock1=stock1,
stock2=stock2,
hedge_ratio=hedge_ratio,
entry_z_score=2.0,
exit_z_score=0.0,
stop_loss_z_score=3.5,
lookback_period=20
)
# 回测策略
print("正在进行回测...")
backtester = PairTradingBacktester(
price_data=price_data[[stock1, stock2]],
strategy=strategy,
initial_capital=100000.0
)
results = backtester.run_backtest()
# 计算绩效指标
metrics = backtester.calculate_performance_metrics(results)
# 打印结果
print("\n==== 回测结果 ====")
print(f"初始资金: ${metrics['initial_capital']:.2f}")
print(f"最终资产: ${metrics['final_portfolio_value']:.2f}")
print(f"总收益率: {metrics['total_return']*100:.2f}%")
print(f"年化收益率: {metrics['annual_return']*100:.2f}%")
print(f"夏普比率: {metrics['sharpe_ratio']:.4f}")
print(f"最大回撤: {metrics['max_drawdown']*100:.2f}%")
print(f"总交易次数: {metrics['total_trades']}")
print(f"胜率: {metrics['win_rate']*100:.2f}%")
# 绘制回测结果
result_plot = backtester.plot_results(results)
result_plot.savefig("backtest_results.png")
print("回测结果图表已保存为backtest_results.png")
if __name__ == "__main__":
main()