🔴
入学要求
💯
能力测试
🛣️
课程安排
🕹️
研究资源

项目7:配对量化交易策略

1. 项目介绍

配对交易是一种市场中性策略,通过同时买入和卖出具有相关性的资产对,利用价格偏离均衡状态时的统计套利机会获利。本项目将引导学习者构建完整的配对交易系统,从资产选择到策略执行与调整。

1.1. 学习目标

1.2. 理论基础

配对交易核心概念

配对交易的核心思想是利用两个相关资产之间的价格关系,当价差偏离其长期均衡水平时进行交易。这种策略通常被归类为统计套利,其基础是价格均值回归的统计性质。

协整性理论

协整(Cointegration)是配对交易的理论基础,两个非平稳时间序列的线性组合可能是平稳的,这种关系被称为协整。检验协整性的主要方法包括:

  1. Engle-Granger两步法
  1. Johansen检验
  1. 增广Dickey-Fuller(ADF)检验

价差建模方法

  1. 简单价差模型: 直接计算两个资产价格之差
  1. 比率模型: 计算价格比率并观察其变化
  1. 回归模型: 通过线性回归确定最优对冲比率
  1. 卡尔曼滤波: 动态调整对冲比率
  1. 机器学习模型: 非线性模型捕捉复杂关系

2. 项目框架

project/
├── data/                  # 数据存储目录
├── models/                # 模型定义与训练
├── strategies/            # 交易策略实现
├── backtesting/           # 回测框架
├── monitoring/            # 策略监控与调整
├── utils/                 # 工具函数
├── config.py              # 配置文件
├── main.py                # 主程序入口
└── requirements.txt       # 依赖包列表

2.1. 配对选择框架

# utils/pair_selection.py
import numpy as np
import pandas as pd
from statsmodels.tsa.stattools import coint
import itertools

class PairSelector:
    def __init__(self, price_data, lookback_period=252):
        """
        初始化配对选择器

        参数:
        price_data (pd.DataFrame): 多股票的价格数据,索引为日期,列为股票代码
        lookback_period (int): 回看期,用于计算协整性
        """
        self.price_data = price_data
        self.lookback_period = lookback_period

    def calculate_correlation_matrix(self):
        """计算相关性矩阵"""
        return self.price_data.pct_change().dropna().corr()

    def find_cointegrated_pairs(self, pvalue_threshold=0.05):
        """
        寻找协整的股票对

        参数:
        pvalue_threshold (float): 协整性检验的p值阈值

        返回:
        list: 协整的股票对列表及其p值
        """
        n = len(self.price_data.columns)
        pvalue_matrix = np.ones((n, n))
        pairs = []

        # 获取最近的数据用于协整性检验
        recent_data = self.price_data.iloc[-self.lookback_period:]

        for i, j in itertools.combinations(range(n), 2):
            stock1 = self.price_data.columns[i]
            stock2 = self.price_data.columns[j]

            # 执行协整性检验
            result = coint(recent_data[stock1], recent_data[stock2])
            pvalue = result[1]
            pvalue_matrix[i, j] = pvalue

            if pvalue < pvalue_threshold:
                pairs.append((stock1, stock2, pvalue))

        return sorted(pairs, key=lambda x: x[2])

    def filter_pairs_by_correlation(self, pairs, corr_threshold=0.7):
        """使用相关性进一步筛选配对"""
        corr_matrix = self.calculate_correlation_matrix()
        filtered_pairs = []

        for stock1, stock2, pvalue in pairs:
            correlation = corr_matrix.loc[stock1, stock2]
            if abs(correlation) > corr_threshold:
                filtered_pairs.append((stock1, stock2, pvalue, correlation))

        return sorted(filtered_pairs, key=lambda x: x[2])

# 待实现:配对筛选算法
# TODO: 实现基于行业分类、基本面相似性等的更高级筛选方法

2.2. 协整性测试功能

# utils/cointegration.py
import numpy as np
import pandas as pd
import statsmodels.api as sm
from statsmodels.tsa.stattools import adfuller, coint
import matplotlib.pyplot as plt

class CointegrationTest:
    def __init__(self, series1, series2):
        """
        初始化协整性测试

        参数:
        series1 (pd.Series): 第一个价格序列
        series2 (pd.Series): 第二个价格序列
        """
        self.series1 = series1
        self.series2 = series2

    def engle_granger_test(self):
        """
        执行Engle-Granger两步法

        返回:
        dict: 包含测试结果、p值和临界值
        """
        # 步骤1: 估计协整方程
        X = sm.add_constant(self.series1)
        model = sm.OLS(self.series2, X).fit()
        beta = model.params[1]

        # 步骤2: 检验残差的平稳性
        spread = self.series2 - beta * self.series1
        adf_result = adfuller(spread)

        return {
            'hedge_ratio': beta,
            'adf_statistic': adf_result[0],
            'pvalue': adf_result[1],
            'critical_values': adf_result[4],
            'spread': spread
        }

    def johansen_test(self):
        """
        执行Johansen检验

        返回:
        dict: 检验结果
        """
        # 组合数据
        data = pd.concat([self.series1, self.series2], axis=1)

        # 执行Johansen检验
        # 注意:对于配对交易,通常我们关注是否有至少一个协整向量
        johansen_result = sm.tsa.vector_ar.vecm.coint_johansen(data, det_order=0, k_ar_diff=1)

        return {
            'trace_statistic': johansen_result.lr1,
            'critical_values': johansen_result.cvt
        }

    def plot_spread(self, result):
        """
        绘制价差图

        参数:
        result (dict): engle_granger_test的返回值
        """
        plt.figure(figsize=(12, 6))
        plt.plot(result['spread'])
        plt.axhline(y=result['spread'].mean(), color='r', linestyle='-')
        plt.axhline(y=result['spread'].mean() + 2*result['spread'].std(), color='g', linestyle='--')
        plt.axhline(y=result['spread'].mean() - 2*result['spread'].std(), color='g', linestyle='--')
        plt.title('Price Spread with Mean and 2-sigma Bands')
        plt.ylabel('Spread')
        plt.grid(True)
        plt.tight_layout()
        return plt

2.3. 基础交易执行模板

# strategies/pair_trading.py
import numpy as np
import pandas as pd

class PairTradingStrategy:
    def __init__(self, stock1, stock2, hedge_ratio, entry_z_score=2.0, exit_z_score=0.0,
                 stop_loss_z_score=3.5, lookback_period=20):
        """
        初始化配对交易策略

        参数:
        stock1 (str): 第一个股票代码
        stock2 (str): 第二个股票代码
        hedge_ratio (float): 对冲比率,用于计算价差
        entry_z_score (float): 入场的z值阈值
        exit_z_score (float): 出场的z值阈值
        stop_loss_z_score (float): 止损的z值阈值
        lookback_period (int): 计算均值和标准差的回看期
        """
        self.stock1 = stock1
        self.stock2 = stock2
        self.hedge_ratio = hedge_ratio
        self.entry_z_score = entry_z_score
        self.exit_z_score = exit_z_score
        self.stop_loss_z_score = stop_loss_z_score
        self.lookback_period = lookback_period

        # 状态变量
        self.position = 0  # 1: 做多stock1,做空stock2; -1: 做空stock1,做多stock2; 0: 空仓
        self.entry_price_spread = 0
        self.trade_history = []

    def calculate_spread(self, price1, price2):
        """计算价差"""
        return price2 - self.hedge_ratio * price1

    def calculate_zscore(self, spread, historical_spreads):
        """计算当前价差的z值"""
        mean = np.mean(historical_spreads)
        std = np.std(historical_spreads)
        if std == 0:
            return 0
        return (spread - mean) / std

    def generate_signals(self, price_data):
        """
        生成交易信号

        参数:
        price_data (pd.DataFrame): 包含两只股票价格的数据框,索引为日期

        返回:
        pd.DataFrame: 包含信号和仓位的数据框
        """
        # 确保数据框包含两只股票的数据
        if self.stock1 not in price_data.columns or self.stock2 not in price_data.columns:
            raise ValueError(f"价格数据中缺少股票: {self.stock1} 或 {self.stock2}")

        # 创建结果数据框
        results = pd.DataFrame(index=price_data.index)
        results['spread'] = self.calculate_spread(
            price_data[self.stock1], price_data[self.stock2]
        )

        # 计算移动平均和标准差
        results['moving_avg'] = results['spread'].rolling(window=self.lookback_period).mean()
        results['moving_std'] = results['spread'].rolling(window=self.lookback_period).std()

        # 计算z值
        results['zscore'] = (results['spread'] - results['moving_avg']) / results['moving_std']

        # 初始化信号和仓位列
        results['signal'] = 0
        results['position'] = 0

        # 生成交易信号
        for i in range(self.lookback_period, len(results)):
            current_zscore = results['zscore'].iloc[i]
            previous_position = results['position'].iloc[i-1] if i > 0 else 0

            # 默认保持前一个仓位
            results.loc[results.index[i], 'position'] = previous_position

            # 空仓时的入场信号
            if previous_position == 0:
                if current_zscore > self.entry_z_score:
                    # 做空stock1,做多stock2
                    results.loc[results.index[i], 'signal'] = -1
                    results.loc[results.index[i], 'position'] = -1
                    self.entry_price_spread = results['spread'].iloc[i]

                elif current_zscore < -self.entry_z_score:
                    # 做多stock1,做空stock2
                    results.loc[results.index[i], 'signal'] = 1
                    results.loc[results.index[i], 'position'] = 1
                    self.entry_price_spread = results['spread'].iloc[i]

            # 持仓时的出场或止损信号
            elif previous_position == 1:  # 做多stock1,做空stock2
                if current_zscore >= -self.exit_z_score or current_zscore <= -self.stop_loss_z_score:
                    # 平仓
                    results.loc[results.index[i], 'signal'] = -1
                    results.loc[results.index[i], 'position'] = 0

            elif previous_position == -1:  # 做空stock1,做多stock2
                if current_zscore <= self.exit_z_score or current_zscore >= self.stop_loss_z_score:
                    # 平仓
                    results.loc[results.index[i], 'signal'] = 1
                    results.loc[results.index[i], 'position'] = 0

        return results

# 待实现:价差建模与预测模块、交易信号触发机制、动态止损策略
# TODO: 实现价差动态建模和预测
# TODO: 设计更复杂的交易信号生成机制
# TODO: 实现动态止损和策略参数自适应

2.4. 回测环境配置

# backtesting/backtest_engine.py
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from datetime import datetime

class PairTradingBacktester:
    def __init__(self, price_data, strategy, initial_capital=100000.0,
                 commission=0.001, slippage=0.001):
        """
        初始化回测引擎

        参数:
        price_data (pd.DataFrame): 价格数据,索引为日期,列为股票代码
        strategy (PairTradingStrategy): 配对交易策略实例
        initial_capital (float): 初始资金
        commission (float): 交易佣金率
        slippage (float): 滑点率
        """
        self.price_data = price_data
        self.strategy = strategy
        self.initial_capital = initial_capital
        self.current_capital = initial_capital
        self.commission = commission
        self.slippage = slippage

        # 回测结果
        self.portfolio_value = []
        self.trades = []
        self.positions = {
            strategy.stock1: 0,
            strategy.stock2: 0
        }

    def run_backtest(self):
        """
        运行回测

        返回:
        pd.DataFrame: 回测结果
        """
        # 生成交易信号
        signals = self.strategy.generate_signals(self.price_data)

        # 初始化回测结果
        results = pd.DataFrame(index=signals.index)
        results['capital'] = self.initial_capital
        results[f'position_{self.strategy.stock1}'] = 0
        results[f'position_{self.strategy.stock2}'] = 0
        results['portfolio_value'] = self.initial_capital

        # 运行回测
        for i in range(1, len(signals)):
            current_date = signals.index[i]
            previous_date = signals.index[i-1]

            # 获取当前价格
            price1 = self.price_data[self.strategy.stock1].loc[current_date]
            price2 = self.price_data[self.strategy.stock2].loc[current_date]

            # 获取前一日持仓
            position1 = results[f'position_{self.strategy.stock1}'].loc[previous_date]
            position2 = results[f'position_{self.strategy.stock2}'].loc[previous_date]

            # 更新持仓价值
            position_value1 = position1 * price1
            position_value2 = position2 * price2
            capital = results['capital'].loc[previous_date]

            # 执行交易
            if signals['signal'].iloc[i] != 0:
                # 平仓原有持仓
                if position1 != 0 or position2 != 0:
                    # 计算交易费用
                    close_commission = (abs(position_value1) + abs(position_value2)) * self.commission
                    capital += position_value1 + position_value2 - close_commission

                    # 记录交易
                    self.trades.append({
                        'date': current_date,
                        'action': 'CLOSE',
                        f'{self.strategy.stock1}': -position1,
                        f'{self.strategy.stock2}': -position2,
                        'price1': price1,
                        'price2': price2,
                        'commission': close_commission
                    })

                    position1 = 0
                    position2 = 0

                # 开新仓
                if signals['position'].iloc[i] != 0:
                    signal = signals['position'].iloc[i]

                    # 确定分配资金
                    allocation = capital * 0.9  # 使用90%的资金开仓
                    allocation_per_stock = allocation / 2

                    # 计算股数,确保对冲比例正确
                    qty1 = int(allocation_per_stock / price1)
                    qty2 = int(allocation_per_stock / price2)

                    # 调整为对冲比例
                    if signal == 1:  # 做多stock1,做空stock2
                        position1 = qty1
                        position2 = -int(qty1 * self.strategy.hedge_ratio * price1 / price2)
                    else:  # 做空stock1,做多stock2
                        position1 = -qty1
                        position2 = int(qty1 * self.strategy.hedge_ratio * price1 / price2)

                    # 计算交易费用
                    open_commission = (abs(position1 * price1) + abs(position2 * price2)) * self.commission
                    capital -= open_commission

                    # 记录交易
                    self.trades.append({
                        'date': current_date,
                        'action': 'OPEN',
                        f'{self.strategy.stock1}': position1,
                        f'{self.strategy.stock2}': position2,
                        'price1': price1,
                        'price2': price2,
                        'commission': open_commission
                    })

            # 更新结果
            results[f'position_{self.strategy.stock1}'].loc[current_date] = position1
            results[f'position_{self.strategy.stock2}'].loc[current_date] = position2
            results['capital'].loc[current_date] = capital
            results['portfolio_value'].loc[current_date] = capital + position1 * price1 + position2 * price2

        # 计算额外的指标
        results['returns'] = results['portfolio_value'].pct_change()
        results['cumulative_returns'] = (1 + results['returns']).cumprod() - 1

        return results

    def calculate_performance_metrics(self, results):
        """
        计算绩效指标

        参数:
        results (pd.DataFrame): 回测结果

        返回:
        dict: 绩效指标
        """
        returns = results['returns'].dropna()

        # 计算年化收益率
        total_days = (results.index[-1] - results.index[0]).days
        annual_return = (1 + results['cumulative_returns'].iloc[-1]) ** (365 / total_days) - 1

        # 计算最大回撤
        cumulative_returns = results['cumulative_returns']
        running_max = cumulative_returns.cummax()
        drawdown = (cumulative_returns - running_max) / running_max
        max_drawdown = drawdown.min()

        # 计算夏普比率
        risk_free_rate = 0.0  # 假设无风险利率为0
        sharpe_ratio = (returns.mean() * 252 - risk_free_rate) / (returns.std() * np.sqrt(252))

        # 获取交易统计
        total_trades = len(self.trades)
        profit_trades = sum(1 for t in self.trades if t['action'] == 'CLOSE' and
                           (t[f'{self.strategy.stock1}'] * t['price1'] +
                            t[f'{self.strategy.stock2}'] * t['price2']) > 0)
        win_rate = profit_trades / total_trades if total_trades > 0 else 0

        return {
            'initial_capital': self.initial_capital,
            'final_portfolio_value': results['portfolio_value'].iloc[-1],
            'total_return': results['cumulative_returns'].iloc[-1],
            'annual_return': annual_return,
            'sharpe_ratio': sharpe_ratio,
            'max_drawdown': max_drawdown,
            'total_trades': total_trades,
            'win_rate': win_rate
        }

    def plot_results(self, results):
        """
        绘制回测结果

        参数:
        results (pd.DataFrame): 回测结果
        """
        fig, axes = plt.subplots(3, 1, figsize=(12, 15), sharex=True)

        # 绘制资产组合价值
        axes[0].plot(results['portfolio_value'])
        axes[0].set_title('Portfolio Value')
        axes[0].set_ylabel('Value')
        axes[0].grid(True)

        # 绘制配对价差和z-score
        ax_twin = axes[1].twinx()
        axes[1].plot(self.strategy.generate_signals(self.price_data)['spread'], 'b-', label='Spread')
        ax_twin.plot(self.strategy.generate_signals(self.price_data)['zscore'], 'r-', label='Z-Score')
        axes[1].set_title('Spread and Z-Score')
        axes[1].set_ylabel('Spread', color='b')
        ax_twin.set_ylabel('Z-Score', color='r')
        axes[1].grid(True)

        # 绘制仓位
        axes[2].plot(results[f'position_{self.strategy.stock1}'], 'g-', label=self.strategy.stock1)
        axes[2].plot(results[f'position_{self.strategy.stock2}'], 'm-', label=self.strategy.stock2)
        axes[2].set_title('Positions')
        axes[2].set_ylabel('Quantity')
        axes[2].set_xlabel('Date')
        axes[2].legend()
        axes[2].grid(True)

        plt.tight_layout()
        return fig

3. 学习者需完成的任务

3.1. 实现配对筛选算法

utils/pair_selection.py 中添加更高级的配对选择方法,包括:

参考框架:

import numpy as np
import pandas as pd
import statsmodels.api as sm
from statsmodels.tsa.stattools import coint, adfuller
from itertools import combinations

class PairSelector:
    def __init__(self, price_data):
        """
        初始化配对选择器
        
        参数:
            price_data (DataFrame): 包含股票价格的数据框,索引为日期,列为股票代码
        """
        self.price_data = price_data
        self.pairs_results = {}
    
    def select_pairs_by_industry(self, industry_data, min_pairs_per_industry=5, p_value_threshold=0.05):
        """
        基于行业分类筛选配对
        
        参数:
            industry_data (dict): 股票代码到行业的映射字典
            min_pairs_per_industry (int): 每个行业至少保留的配对数量
            p_value_threshold (float): 协整检验的p值阈值
            
        返回:
            dict: 行业到配对列表的映射
        """
        # 按行业分组
        industry_groups = {}
        for stock, industry in industry_data.items():
            if industry not in industry_groups:
                industry_groups[industry] = []
            industry_groups[industry].append(stock)

        # 在每个行业内寻找协整对
        industry_pairs = {}
        for industry, stocks in industry_groups.items():
            if len(stocks) < 2:
                continue

            # 只考虑同行业内的股票
            stock_prices = self.price_data[stocks].dropna(axis=1)
            if len(stock_prices.columns) < 2:
                continue

            # 调用协整检验函数
            pairs = self._find_cointegrated_pairs_in_subset(stock_prices, p_value_threshold)
            
            # 如果找到了足够的配对,则添加到结果中
            if len(pairs) >= min_pairs_per_industry:
                industry_pairs[industry] = pairs

        return industry_pairs

    def _find_cointegrated_pairs_in_subset(self, price_subset, p_value_threshold=0.05):
        """
        在股票子集中寻找协整对
        
        参数:
            price_subset (DataFrame): 股票价格子集
            p_value_threshold (float): 协整检验的p值阈值
            
        返回:
            list: 协整对列表,每个元素为(股票1, 股票2, p值, 协整系数)
        """
        # TODO: 实现协整检验逻辑
        # 1. 遍历所有可能的股票对
        # 2. 对每对股票进行协整检验
        # 3. 保存通过检验的配对及其协整系数
        pass

    def filter_by_fundamentals(self, pairs, fundamental_data, similarity_threshold=0.2):
        """
        基于基本面数据筛选配对
        
        参数:
            pairs (list): 配对列表,每个元素为(股票1, 股票2, p值, 协整系数)
            fundamental_data (DataFrame): 基本面数据,索引为股票代码,列为各基本面指标
            similarity_threshold (float): 基本面相似性阈值
            
        返回:
            list: 经过基本面筛选后的配对列表
        """
        # TODO: 实现基于基本面相似性的筛选
        # 1. 对每个配对计算基本面指标的相似度
        # 2. 筛选相似度满足条件的配对
        # 3. 可考虑市值、市盈率、市净率等指标的相对差异
        pass

    def filter_by_liquidity(self, pairs, volume_data, min_volume=100000, min_days_active=0.9):
        """
        基于交易量筛选配对
        
        参数:
            pairs (list): 配对列表
            volume_data (DataFrame): 交易量数据,索引为日期,列为股票代码
            min_volume (float): 最低平均日交易量
            min_days_active (float): 最低交易活跃度(有交易的天数比例)
            
        返回:
            list: 经过流动性筛选后的配对列表
        """
        # TODO: 实现基于流动性的筛选
        # 1. 计算每支股票的平均交易量
        # 2. 计算每支股票的交易活跃度(有交易的天数比例)
        # 3. 筛选满足条件的配对
        pass

    def multi_period_cointegration(self, pairs, periods=[30, 60, 120, 252], consistency_threshold=0.7):
        """
        多周期协整性验证
        
        参数:
            pairs (list): 配对列表
            periods (list): 要检验的时间窗口长度列表(交易日)
            consistency_threshold (float): 多周期一致性阈值
            
        返回:
            list: 经过多周期验证的配对列表,每个元素为(股票1, 股票2, 稳定性得分, 平均协整系数)
        """
        # TODO: 在多个时间窗口进行协整检验,评估稳定性
        # 1. 对每个配对,在不同的时间窗口进行协整检验
        # 2. 计算配对在各个时间窗口的稳定性得分
        # 3. 筛选多周期都稳定的配对
        pass
    
    def calculate_spread(self, stock1, stock2, window=252):
        """
        计算两支股票的价差序列
        
        参数:
            stock1 (str): 第一支股票代码
            stock2 (str): 第二支股票代码
            window (int): 用于计算的历史窗口长度
            
        返回:
            Series: 标准化后的价差序列
        """
        # TODO: 实现价差计算逻辑
        # 1. 获取两支股票的历史价格
        # 2. 计算线性回归系数
        # 3. 计算并标准化价差序列
        pass
    
    def rank_pairs(self, pairs, metrics=None):
        """
        对筛选出的配对进行排序
        
        参数:
            pairs (list): 配对列表
            metrics (dict): 排序指标权重
            
        返回:
            list: 排序后的配对列表
        """
        # TODO: 实现配对排序逻辑
        # 1. 为每个配对计算多个评分指标(如协整性强度、价差均值回归速度等)
        # 2. 根据指标权重计算综合得分
        # 3. 按得分排序返回配对
        pass
    
    def visualize_pair(self, stock1, stock2, window=252):
        """
        可视化配对的价格和价差
        
        参数:
            stock1 (str): 第一支股票代码
            stock2 (str): 第二支股票代码
            window (int): 用于可视化的历史窗口长度
        """
        # TODO: 实现配对可视化逻辑
        # 1. 绘制两支股票的价格走势
        # 2. 绘制标准化价差及其上下界
        # 3. 标记潜在交易信号
        pass

def test_pair_stability(price_data, stock1, stock2, test_windows=[30, 60, 90, 120]):
    """
    测试配对的稳定性
    
    参数:
        price_data (DataFrame): 股票价格数据
        stock1 (str): 第一支股票代码
        stock2 (str): 第二支股票代码
        test_windows (list): 测试窗口列表
        
    返回:
        dict: 稳定性测试结果
    """
    # TODO: 实现配对稳定性测试逻辑
    # 1. 在不同窗口下检验配对的协整性
    # 2. 计算半衰期和平稳性指标
    # 3. 返回综合评估结果
    pass

3.2. 开发价差建模与预测模块

strategies/spread_modeling.py 中实现价差建模和预测功能:

# strategies/spread_modeling.py
import numpy as np
import pandas as pd
from statsmodels.tsa.arima.model import ARIMA
from pykalman import KalmanFilter
from sklearn.linear_model import LinearRegression

class SpreadModel:
    def __init__(self, model_type='kalman'):
        """
        初始化价差模型

        参数:
        model_type (str): 模型类型,'kalman'、'arima'或'regression'
        """
        self.model_type = model_type
        self.model = None
        self.hedge_ratio = None

    def fit(self, series1, series2, **kwargs):
        """
        拟合价差模型

        参数:
        series1 (pd.Series): 第一个资产价格序列
        series2 (pd.Series): 第二个资产价格序列
        **kwargs: 模型特定参数

        返回:
        self: 模型实例
        """
        if self.model_type == 'kalman':
            return self._fit_kalman(series1, series2, **kwargs)
        elif self.model_type == 'arima':
            return self._fit_arima(series1, series2, **kwargs)
        elif self.model_type == 'regression':
            return self._fit_regression(series1, series2, **kwargs)
        else:
            raise ValueError(f"不支持的模型类型: {self.model_type}")

    def _fit_kalman(self, series1, series2, **kwargs):
        """使用卡尔曼滤波拟合动态对冲比率"""
        # TODO: 实现卡尔曼滤波模型
        pass

    def _fit_arima(self, series1, series2, **kwargs):
        """先计算对冲比率,然后使用ARIMA模型拟合价差"""
        # TODO: 实现ARIMA建模
        pass

    def _fit_regression(self, series1, series2, **kwargs):
        """使用滚动窗口线性回归拟合动态对冲比率"""
        # TODO: 实现滚动回归模型
        pass

    def predict(self, series1, series2, horizon=1):
        """
        预测未来价差

        参数:
        series1 (pd.Series): 第一个资产最新价格序列
        series2 (pd.Series): 第二个资产最新价格序列
        horizon (int): 预测期数

        返回:
        pd.Series: 预测的价差序列
        """
        # TODO: 实现预测逻辑
        pass

    def calculate_spread(self, series1, series2):
        """
        计算当前价差

        参数:
        series1 (pd.Series): 第一个资产价格序列
        series2 (pd.Series): 第二个资产价格序列

        返回:
        pd.Series: 价差序列
        """
        # TODO: 实现价差计算
        pass

    def get_hedge_ratio(self):
        """获取当前对冲比率"""
        return self.hedge_ratio

3.3. 设计交易信号触发机制

扩展 strategies/pair_trading.py 中的信号生成方法:

import numpy as np
import pandas as pd
from scipy import stats
import statsmodels.api as sm
import talib as ta
from sklearn.linear_model import LinearRegression


class PairTradingSignals:
    def __init__(self, z_entry=2.0, z_exit=0.5, max_active_positions=3, stop_loss_z=4.0):
        """
        初始化配对交易信号生成器
        
        参数:
            z_entry (float): 入场z分数阈值
            z_exit (float): 出场z分数阈值
            max_active_positions (int): 最大活跃仓位数
            stop_loss_z (float): 止损z分数阈值
        """
        self.z_entry = z_entry
        self.z_exit = z_exit
        self.max_active_positions = max_active_positions
        self.stop_loss_z = stop_loss_z
        self.active_pairs = {}
        
    def generate_basic_signals(self, spread, stock1, stock2):
        """
        生成基本的配对交易信号
        
        参数:
            spread (pd.Series): 标准化价差序列
            stock1 (str): 第一支股票代码
            stock2 (str): 第二支股票代码
            
        返回:
            pd.DataFrame: 包含信号的数据框
        """
        signals = pd.DataFrame(index=spread.index)
        signals['zscore'] = spread
        signals['signal'] = 0
        
        # 基本信号逻辑
        signals.loc[spread > self.z_entry, 'signal'] = -1  # 做空价差
        signals.loc[spread < -self.z_entry, 'signal'] = 1  # 做多价差
        signals.loc[(spread < self.z_exit) & (spread > -self.z_exit), 'signal'] = 0  # 平仓
        
        # 止损逻辑
        signals.loc[spread > self.stop_loss_z, 'signal'] = 0  # 止损平仓
        signals.loc[spread < -self.stop_loss_z, 'signal'] = 0  # 止损平仓
        
        # 转换信号为持仓
        signals['position'] = signals['signal'].shift(1).fillna(0)
        
        return signals
    
    def generate_advanced_signals(self, price_data, spread_model, lookback=60, risk_factor=0.02):
        """
        生成基于高级模型的交易信号

        参数:
            price_data (pd.DataFrame): 包含两只股票价格的数据框
            spread_model (SpreadModel): 价差模型
            lookback (int): 技术指标计算的回看期
            risk_factor (float): 风险因子

        返回:
            pd.DataFrame: 包含信号和仓位的数据框
        """
        # TODO: 实现基于预测模型的信号生成
        # 1. 使用spread_model预测价差
        # 2. 基于预测价差生成信号
        # 3. 结合技术指标如RSI、MACD等优化信号
        
        # 获取股票代码
        stock1, stock2 = price_data.columns
        
        # 计算实际价差和预测价差
        
        # 计算技术指标
        
        # 结合预测和技术指标生成信号
        
        # 应用自适应阈值
        
        # 更新持仓
        
        # 添加波动率调整的仓位大小
        
        pass

    def adaptive_threshold(self, spread_data, lookback=120, quantile=0.9, min_threshold=1.5, max_threshold=3.0):
        """
        计算自适应入场阈值

        参数:
            spread_data (pd.Series): 价差数据
            lookback (int): 回看期
            quantile (float): 分位数
            min_threshold (float): 最小阈值
            max_threshold (float): 最大阈值

        返回:
            float: 自适应阈值
        """
        # TODO: 实现基于历史波动率的自适应阈值
        
        # 计算滚动波动率
        
        # 确定适当的分位数阈值
        
        # 在最小和最大阈值范围内限制结果
        
        pass

    def volatility_adjusted_position_sizing(self, zscore, price1, price2, capital, 
                                            vol_window=30, max_risk_per_trade=0.02):
        """
        基于波动率的仓位调整

        参数:
            zscore (float): 当前价差的z分数
            price1 (float): 第一个资产价格
            price2 (float): 第二个资产价格
            capital (float): 可用资金
            vol_window (int): 波动率计算窗口
            max_risk_per_trade (float): 每笔交易最大风险比例

        返回:
            tuple: (仓位1, 仓位2)
        """
        # TODO: 实现基于波动率的仓位大小计算
        
        # 计算相对波动率
        
        # 确定对冲比例
        
        # 计算仓位大小,考虑波动率和风险限制
        
        # 确保仓位不超过可用资金
        
        pass
    
    def add_stop_loss_signals(self, signals, spread, trailing_pct=0.3, time_stop_days=10):
        """
        添加止损信号
        
        参数:
            signals (pd.DataFrame): 原始信号数据框
            spread (pd.Series): 标准化价差序列
            trailing_pct (float): 追踪止损百分比
            time_stop_days (int): 时间止损天数
            
        返回:
            pd.DataFrame: 包含止损信号的数据框
        """
        # TODO: 实现多种止损策略
        
        # 1. 基于Z分数的固定止损
        
        # 2. 基于追踪止损
        
        # 3. 基于时间止损
        
        # 4. 基于利润保护止损
        
        pass
    
    def calculate_trade_quality(self, spread, entry_point, current_point, position):
        """
        计算交易质量得分
        
        参数:
            spread (pd.Series): 价差序列
            entry_point (int): 入场点索引
            current_point (int): 当前点索引
            position (int): 仓位方向 (1:多, -1:空)
            
        返回:
            float: 交易质量得分
        """
        # TODO: 实现交易质量评估
        
        # 计算价差变动
        
        # 计算均值回归速度
        
        # 考虑价差波动
        
        # 返回综合得分
        
        pass
    
    def integrate_market_regime(self, signals, market_data, vix_threshold=25):
        """
        根据市场环境调整交易信号
        
        参数:
            signals (pd.DataFrame): 信号数据框
            market_data (pd.DataFrame): 市场数据,包含指数和VIX
            vix_threshold (float): VIX阈值
            
        返回:
            pd.DataFrame: 调整后的信号数据框
        """
        # TODO: 实现市场环境感知的信号调整
        
        # 识别市场状态(正常/波动/危机)
        
        # 根据市场状态调整入场阈值
        
        # 根据市场状态调整仓位大小
        
        pass

    def filter_trending_pairs(self, spread, trend_window=20, trend_threshold=0.6):
        """
        过滤有明显趋势的价差
        
        参数:
            spread (pd.Series): 价差序列
            trend_window (int): 趋势判断窗口
            trend_threshold (float): 趋势强度阈值
            
        返回:
            bool: 是否应该过滤该配对
        """
        # TODO: 实现趋势检测和过滤
        
        # 计算趋势强度指标
        
        # 判断是否存在显著趋势
        
        pass
    
    def calculate_expected_holding_period(self, spread, z_score):
        """
        估计预期持仓时间
        
        参数:
            spread (pd.Series): 价差序列
            z_score (float): 当前z分数
            
        返回:
            int: 预期持仓天数
        """
        # TODO: 实现持仓时间预测
        
        # 计算半衰期
        
        # 基于当前偏离程度预测回归时间
        
        pass
    
    def generate_signal_confidence(self, price_data, spread, window=60):
        """
        计算信号置信度
        
        参数:
            price_data (pd.DataFrame): 价格数据
            spread (pd.Series): 价差序列
            window (int): 计算窗口
            
        返回:
            pd.Series: 信号置信度序列
        """
        # TODO: 实现信号置信度计算
        
        # 计算价差统计特性
        
        # 评估协整稳定性
        
        # 考虑技术指标一致性
        
        # 综合评分
        
        pass

3.4. 构建动态止损策略

在现有策略基础上增加动态风险管理功能:

class DynamicStopLossManager:
    def __init__(self, initial_stop_loss=3.0, sensitivity=0.5):
        """
        初始化动态止损管理器

        参数:
        initial_stop_loss (float): 初始止损z分数阈值
        sensitivity (float): 止损调整敏感度
        """
        self.current_stop_loss = initial_stop_loss
        self.sensitivity = sensitivity
        self.trade_outcomes = []

    def update_stop_loss(self, trade_result):
        """
        基于交易结果更新止损水平

        参数:
        trade_result (dict): 包含交易结果信息的字典

        返回:
        float: 更新后的止损水平
        """
        # TODO: 实现基于交易结果的动态止损策略
        pass

    def calculate_time_based_stop_loss(self, entry_time, current_time, max_holding_period=10):
        """
        基于持仓时间的止损策略

        参数:
        entry_time (datetime): 入场时间
        current_time (datetime): 当前时间
        max_holding_period (int): 最大持仓天数

        返回:
        bool: 是否应该止损
        """
        # TODO: 实现基于时间的止损
        pass

    def calculate_profit_target(self, entry_spread, spread_volatility):
        """
        计算利润目标

        参数:
        entry_spread (float): 入场时的价差
        spread_volatility (float): 价差波动率

        返回:
        float: 利润目标
        """
        # TODO: 实现利润目标计算
        pass

3.5. 开发策略监控与调整功能

创建策略监控和动态调整模块:

# monitoring/strategy_monitor.py
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import streamlit as st

class PairStrategyMonitor:
    def __init__(self, strategy, backtest_results=None):
        """
        初始化策略监控器

        参数:
        strategy (PairTradingStrategy): 配对交易策略
        backtest_results (pd.DataFrame): 回测结果
        """
        self.strategy = strategy
        self.backtest_results = backtest_results
        self.live_performance = []
        self.parameter_history = []

    def record_trade(self, trade_info):
        """记录交易"""
        self.live_performance.append(trade_info)

    def update_parameters(self, params):
        """记录参数更新"""
        self.parameter_history.append({
            'timestamp': pd.Timestamp.now(),
            'params': params
        })

    def analyze_performance(self):
        """分析策略表现"""
        # TODO: 实现性能分析
        pass

    def detect_regime_change(self, price_data, lookback=60):
        """
        检测市场状态变化

        参数:
        price_data (pd.DataFrame): 价格数据
        lookback (int): 回看期

        返回:
        bool: 是否检测到状态变化
        """
        # TODO: 实现市场状态变化检测
        pass

    def optimize_parameters(self, price_data):
        """
        基于最新市场数据优化策略参数

        参数:
        price_data (pd.DataFrame): 最新价格数据

        返回:
        dict: 优化后的参数
        """
        # TODO: 实现参数优化
        pass

    def visualize_dashboard(self):
        """创建Streamlit可视化仪表板"""
        st.title("配对交易策略监控仪表板")

        # 显示策略概况
        st.header("策略概况")
        st.write(f"资产对: {self.strategy.stock1} - {self.strategy.stock2}")
        st.write(f"对冲比率: {self.strategy.hedge_ratio:.4f}")

        # 回测结果
        if self.backtest_results is not None:
            st.header("回测绩效")
            # TODO: 添加回测结果可视化

        # 实时性能监控
        st.header("实时性能")
        # TODO: 添加实时性能可视化

        # 参数历史
        st.header("参数调整历史")
        # TODO: 显示参数调整历史

        # 市场状态分析
        st.header("市场状态分析")
        # TODO: 添加市场状态可视化

项目整合与提交

项目整合

创建主程序文件 main.py,整合所有组件:

# main.py
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import yfinance as yf
from datetime import datetime, timedelta

from utils.pair_selection import PairSelector
from utils.cointegration import CointegrationTest
from strategies.pair_trading import PairTradingStrategy
from backtesting.backtest_engine import PairTradingBacktester

def fetch_data(symbols, start_date, end_date):
    """获取股票数据"""
    data = yf.download(symbols, start=start_date, end=end_date)['Adj Close']
    return data

def main():
    # 设置时间范围
    end_date = datetime.now().strftime('%Y-%m-%d')
    start_date = (datetime.now() - timedelta(days=365*3)).strftime('%Y-%m-%d')

    # 数据配置
    tech_symbols = ['AAPL', 'MSFT', 'GOOGL', 'META', 'AMZN', 'NVDA', 'INTC', 'CSCO', 'ADBE', 'ORCL']

    # 获取数据
    print("正在获取数据...")
    price_data = fetch_data(tech_symbols, start_date, end_date)

    # 选择配对
    print("正在选择配对...")
    selector = PairSelector(price_data)
    pairs = selector.find_cointegrated_pairs()
    filtered_pairs = selector.filter_pairs_by_correlation(pairs)

    if not filtered_pairs:
        print("未找到合适的配对")
        return

    # 选择第一个配对
    stock1, stock2, pvalue, correlation = filtered_pairs[0]
    print(f"选择的配对: {stock1} - {stock2} (p值: {pvalue:.4f}, 相关性: {correlation:.4f})")

    # 进行协整性测试
    print("正在进行协整性测试...")
    coint_test = CointegrationTest(price_data[stock1], price_data[stock2])
    result = coint_test.engle_granger_test()
    hedge_ratio = result['hedge_ratio']

    print(f"对冲比率: {hedge_ratio:.4f}")
    print(f"ADF检验统计量: {result['adf_statistic']:.4f}")
    print(f"p值: {result['pvalue']:.4f}")

    # 绘制价差图
    spread_plot = coint_test.plot_spread(result)
    spread_plot.savefig("spread.png")
    print("价差图已保存为spread.png")

    # 设置策略
    print("正在配置策略...")
    strategy = PairTradingStrategy(
        stock1=stock1,
        stock2=stock2,
        hedge_ratio=hedge_ratio,
        entry_z_score=2.0,
        exit_z_score=0.0,
        stop_loss_z_score=3.5,
        lookback_period=20
    )

    # 回测策略
    print("正在进行回测...")
    backtester = PairTradingBacktester(
        price_data=price_data[[stock1, stock2]],
        strategy=strategy,
        initial_capital=100000.0
    )

    results = backtester.run_backtest()

    # 计算绩效指标
    metrics = backtester.calculate_performance_metrics(results)

    # 打印结果
    print("\n==== 回测结果 ====")
    print(f"初始资金: ${metrics['initial_capital']:.2f}")
    print(f"最终资产: ${metrics['final_portfolio_value']:.2f}")
    print(f"总收益率: {metrics['total_return']*100:.2f}%")
    print(f"年化收益率: {metrics['annual_return']*100:.2f}%")
    print(f"夏普比率: {metrics['sharpe_ratio']:.4f}")
    print(f"最大回撤: {metrics['max_drawdown']*100:.2f}%")
    print(f"总交易次数: {metrics['total_trades']}")
    print(f"胜率: {metrics['win_rate']*100:.2f}%")

    # 绘制回测结果
    result_plot = backtester.plot_results(results)
    result_plot.savefig("backtest_results.png")
    print("回测结果图表已保存为backtest_results.png")

if __name__ == "__main__":
    main()

评估标准

  1. 策略稳健性:策略在不同市场条件下的表现
  1. 代码质量:模块化设计、代码可读性和效率
  1. 创新程度:算法和模型的创新应用
  1. 回测完整性:考虑交易成本、滑点等真实因素
  1. 文档完整性:代码注释和说明文档

参考资料

  1. Pairs Trading: Quantitative Methods and Analysis - Ganapathy Vidyamurthy
  1. Quantconnect 配对交易教程
  1. Python for Finance: Mastering Data-Driven Finance - Yves Hilpisch
  1. Cointegration in Time Series Analysis - StatsModels 文档
  1. Kalman Filter for Pairs Trading - Python for Finance
  1. Advanced Pairs Trading Strategies - Hudson & Thames
  1. Dynamic Time Series Models - Forecasting: Principles and Practice
  1. Streamlit Documentation - 用于创建交互式应用
  1. Backtesting.py - 回测框架参考实现
  1. GitHub: PyPortfolioOpt - 投资组合优化
  1. QuantStart: Statistical Arbitrage - 统计套利入门
  1. TowardsDataScience: Pair Trading - 配对交易算法实现
  1. Finance Train: Cointegration and Pairs Trading - Python实现配对交易
  1. Machine Learning for Trading - 机器学习在交易中的应用