查看全集:💎Quantopia量化分析56讲
def rolling_coint(pair, window=90, p_threshold=0.05):
    """Run a cointegration test over a trailing window at every step.

    For each position ``i`` from ``window`` to the end of ``pair``, the
    ``window`` rows immediately before ``i`` are passed to ``coint``. When
    the resulting p-value is below ``p_threshold``, an OLS regression of the
    second column on the first column (plus a constant) is fitted and kept.

    Args:
        pair: DataFrame holding two price series, one per column.
        window: Number of rows in each trailing test window.
        p_threshold: p-value below which a window's regression is stored.

    Returns:
        A tuple ``(pvalues, models)``: a Series of p-values indexed by
        ``pair.index[window:]`` and a list of the same length that holds the
        fitted OLS result for each window, or ``None`` where the p-value was
        not below the threshold.
    """
    p_history = []
    fitted = []
    for end in range(window, len(pair)):
        chunk = pair.iloc[end - window:end]
        first_leg = chunk.iloc[:, 0]
        second_leg = chunk.iloc[:, 1]

        _, pvalue, _ = coint(first_leg, second_leg)
        p_history.append(pvalue)

        # Keep a regression only for windows that pass the test.
        if pvalue < p_threshold:
            design = sm.add_constant(first_leg)
            fitted.append(sm.OLS(second_leg, design).fit())
        else:
            fitted.append(None)

    return pd.Series(p_history, index=pair.index[window:]), fitted
# Usage example: rolling cointegration p-values for AAPL vs MSFT
prices = data[['AAPL', 'MSFT']].dropna()
pvalues, models = rolling_coint(prices)
# Open a dedicated figure so the chart is not drawn onto a figure that
# happens to be current from other plotting code.
plt.figure()
plt.plot(pvalues, label='P-value')
# Reference line at 0.05, the default p_threshold of rolling_coint.
plt.axhline(0.05, color='r', linestyle='--')
plt.title('滚动协整检验')
# The label passed to plot() is only displayed once a legend is requested.
plt.legend()
from hmmlearn import hmm
# Build a hidden Markov model
class CointegratedHMM:
    """Thin wrapper around ``hmm.GaussianHMM`` for a one-dimensional spread."""

    def __init__(self, n_states=2):
        """Create the underlying Gaussian HMM with ``n_states`` components."""
        self.model = hmm.GaussianHMM(n_components=n_states)

    @staticmethod
    def _as_column(spread):
        """Return the values of ``spread`` reshaped into a single column."""
        return spread.values.reshape(-1, 1)

    def fit(self, spread):
        """Fit the wrapped model on the column-shaped values of ``spread``."""
        observations = self._as_column(spread)
        self.model.fit(observations)

    def predict_state(self, spread):
        """Return the wrapped model's ``predict`` output for ``spread``."""
        observations = self._as_column(spread)
        return self.model.predict(observations)
# Usage example: label regimes on a fixed-ratio (0.8) MSFT/AAPL spread
spread = prices['MSFT'] - 0.8*prices['AAPL']
hmm_model = CointegratedHMM()
hmm_model.fit(spread)
states = hmm_model.predict_state(spread)
plt.figure(figsize=(12,4))
plt.plot(spread, label='Spread')
# Colour each observation by its predicted state.
plt.scatter(spread.index, spread, c=states, cmap='coolwarm')
plt.title('HMM状态识别')
# Without a legend the 'Spread' label set above is never displayed.
plt.legend()
from pykalman import KalmanFilter
def kalman_filter_hedge(y, x):
    """Estimate a time-varying hedge ratio with a Kalman filter.

    The filter has a two-component state and a scalar observation. The
    observation row at each step is ``[x_t, 1]``, the state transition is the
    identity, and the transition covariance is ``delta / (1 - delta)`` times
    the identity with ``delta = 1e-5``.

    Args:
        y: Observed series; its ``.values`` are passed to ``kf.filter``.
        x: Series used to build the per-step observation matrices.

    Returns:
        The first component of the filtered state means, one value per step
        (the coefficient that multiplies ``x``).
    """
    delta = 1e-5

    # Stack x with a row of ones, then reshape so each step has its own
    # 1 x 2 observation matrix (for one-dimensional x of length n: n x 1 x 2).
    design = np.vstack([[x], [np.ones(len(x))]]).T
    per_step_obs = np.expand_dims(design, axis=1)

    kf = KalmanFilter(
        n_dim_obs=1,
        n_dim_state=2,
        initial_state_mean=np.zeros(2),
        initial_state_covariance=np.ones((2, 2)),
        transition_matrices=np.eye(2),
        observation_matrices=per_step_obs,
        observation_covariance=1.0,
        transition_covariance=delta / (1 - delta) * np.eye(2),
    )

    filtered_means = kf.filter(y.values)[0]
    return filtered_means[:, 0]
# Usage example: time-varying hedge ratio with MSFT as y and AAPL as x
hedge_ratio_kf = kalman_filter_hedge(prices['MSFT'], prices['AAPL'])
# Start a new figure; otherwise the line is drawn onto whichever figure is
# still current from an earlier chart and overwrites its title.
plt.figure()
plt.plot(hedge_ratio_kf)
plt.title('卡尔曼滤波对冲比率')
class AdaptiveSpread:
    """Rolling spread builder that only uses windows passing a cointegration test."""

    def __init__(self, lookback=30, coint_threshold=0.05):
        """Store the window length and the p-value cut-off.

        Args:
            lookback: Number of rows in each trailing window.
            coint_threshold: p-value below which a window counts as valid.
        """
        self.lookback = lookback
        self.coint_threshold = coint_threshold

    def calculate_spread(self, price_A, price_B):
        """Compute a spread, hedge ratio and validity flag per step.

        For each position ``i`` from ``lookback`` onward, the ``lookback``
        rows before ``i`` of both series are tested with ``coint``. If the
        p-value is below ``coint_threshold``, ``price_B`` is regressed on
        ``price_A`` (with a constant) and the slope is used as hedge ratio;
        otherwise the hedge ratio and spread are NaN.

        Args:
            price_A: pandas Series of prices for leg A (the regressor).
            price_B: pandas Series of prices for leg B (the regressand).

        Returns:
            A tuple ``(spreads, hedge_ratios, valid)`` of three Series, all
            indexed by ``price_A.index[lookback:]``. ``spreads`` and
            ``hedge_ratios`` are float Series; ``valid`` is a bool Series.
            When the input is not longer than ``lookback`` all three are
            empty but keep those dtypes.
        """
        spreads = []
        hedge_ratios = []
        valid = []
        for i in range(self.lookback, len(price_A)):
            window_A = price_A.iloc[i - self.lookback:i]
            window_B = price_B.iloc[i - self.lookback:i]

            # Cointegration test on the trailing window.
            _, pvalue, _ = coint(window_A, window_B)
            is_valid = bool(pvalue < self.coint_threshold)

            if is_valid:
                X = sm.add_constant(window_A)
                model = sm.OLS(window_B, X).fit()
                # params is a label-indexed Series (constant first, slope
                # second); positional access must go through .iloc because
                # integer keys on a labelled Series are deprecated.
                hr = model.params.iloc[1]
                # NOTE(review): the value stored under label index[i] is
                # built from the last row of the window (position i - 1);
                # confirm this one-step lag is intended.
                spread = window_B.iloc[-1] - hr * window_A.iloc[-1]
            else:
                hr = np.nan
                spread = np.nan

            valid.append(is_valid)
            hedge_ratios.append(hr)
            spreads.append(spread)

        out_index = price_A.index[self.lookback:]
        return (
            pd.Series(spreads, index=out_index, dtype=float),
            pd.Series(hedge_ratios, index=out_index, dtype=float),
            pd.Series(valid, index=out_index, dtype=bool),
        )
# Usage example: 60-row lookback with AAPL as leg A and MSFT as leg B
adapter = AdaptiveSpread(lookback=60)
spread, hr, valid = adapter.calculate_spread(
    price_A=prices['AAPL'],
    price_B=prices['MSFT'],
)
def adaptive_zscore(spread, vol_window=30, z_window=90):
    """Return a rolling z-score of a volatility-scaled spread.

    The spread is divided by the rolling standard deviation of its own
    first differences, and that ratio is then standardised with its rolling
    mean and rolling standard deviation.

    Args:
        spread: pandas Series of spread values.
        vol_window: Window length for the rolling std of the differences.
        z_window: Window length for the rolling mean/std of the scaled spread.

    Returns:
        Series of z-scores; entries without enough history are NaN.
    """
    step_changes = spread.diff().dropna()
    scaled = spread / step_changes.rolling(vol_window).std()
    roller = scaled.rolling(z_window)
    return (scaled - roller.mean()) / roller.std()
# Usage example: z-score the non-missing spread values and mark +/-1.5 bands
spread_z_adj = adaptive_zscore(spread.dropna())
# Start a new figure; otherwise the z-score is drawn onto whichever figure
# is still current from an earlier chart.
plt.figure()
plt.plot(spread_z_adj)
plt.axhline(1.5, c='r', ls='--')
plt.axhline(-1.5, c='g', ls='--')
class RiskControlSystem:
    """Tracks open pair positions and applies stop-loss / trailing-stop rules.

    ``self.positions`` maps a pair key to a dict with the entry prices
    (``'price_A'``, ``'price_B'``) and the current ``'trailing_stop'`` level.
    """

    def __init__(self, max_loss=0.05, trail_stop=0.03):
        """Create an empty book.

        Args:
            max_loss: Loss (as a fraction) beyond which a pair is closed.
            trail_stop: Profit (as a fraction) above which the trailing stop
                starts to be raised, and the distance kept below the profit.
        """
        self.positions = {}
        self.max_loss = max_loss
        self.trail_stop = trail_stop

    def update(self, date, price_A, price_B, positions):
        """Re-evaluate every pair in ``positions`` at the given prices.

        A pair whose combined P&L is below ``-max_loss`` is removed from
        ``self.positions``. A pair whose P&L exceeds ``trail_stop`` has its
        stored ``'trailing_stop'`` raised to ``pnl - trail_stop`` when that
        is higher than the stored level. The trailing level is only
        recorded here; this method does not close a pair because of it.

        Args:
            date: Value shown in the stop-loss message.
            price_A: Current price of leg A.
            price_B: Current price of leg B.
            positions: Iterable of pair keys to check. Keys not tracked in
                ``self.positions`` are skipped.
        """
        # Iterate over a snapshot: callers may pass self.positions itself,
        # and deleting from a dict while iterating it raises RuntimeError.
        for pair in list(positions):
            entry = self.positions.get(pair)
            if entry is None:
                # Already closed or never opened; nothing to evaluate.
                continue

            entry_price_A = entry['price_A']
            entry_price_B = entry['price_B']

            # Floating P&L of each leg as a fraction of its entry price.
            pnl_A = (price_A - entry_price_A) / entry_price_A
            pnl_B = (price_B - entry_price_B) / entry_price_B
            total_pnl = pnl_B - pnl_A  # assumes a 1:1 hedge

            # Stop-loss check.
            if total_pnl < -self.max_loss:
                print(f"{date} 触发止损,平仓{pair}")
                del self.positions[pair]
            # Trailing take-profit: only ever ratchet the level upwards.
            elif total_pnl > self.trail_stop:
                new_stop = total_pnl - self.trail_stop
                if new_stop > entry.get('trailing_stop', float('-inf')):
                    entry['trailing_stop'] = new_stop
from itertools import product
def grid_search(prices, entry_range, exit_range, windows):
    """Evaluate every (entry, exit, window) combination and tabulate the stats.

    For each combination the module-level ``spread`` is z-scored with
    ``zscore``, signals come from ``generate_signals``, returns from
    ``calculate_returns`` and summary numbers from ``performance_stats``.

    Args:
        prices: Price data forwarded to ``calculate_returns``.
        entry_range: Iterable of entry thresholds.
        exit_range: Iterable of exit thresholds.
        windows: Iterable of z-score window lengths.

    Returns:
        DataFrame with one row per combination and the columns ``entry``,
        ``exit``, ``window``, ``sharpe`` and ``max_dd``.
    """

    def evaluate(entry_level, exit_level, lookback):
        # Score a single parameter combination.
        spread_z = zscore(spread, lookback)
        signals = generate_signals(spread_z, entry_level, exit_level)
        stats = performance_stats(calculate_returns(prices, signals))
        return {
            'entry': entry_level,
            'exit': exit_level,
            'window': lookback,
            'sharpe': stats['sharpe'],
            'max_dd': stats['max_dd'],
        }

    rows = [
        evaluate(*combo)
        for combo in product(entry_range, exit_range, windows)
    ]
    return pd.DataFrame(rows)
# Example parameter ranges for the grid search
entry_range = [1.0, 1.5, 2.0]
exit_range = [0.5, 0.7, 1.0]
windows = [20, 30, 60]
optim_results = grid_search(
    prices,
    entry_range=entry_range,
    exit_range=exit_range,
    windows=windows,
)
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
def ml_signal_enhancement(spread, features, target):
    """Train a random forest on the early rows and score the latest rows.

    Args:
        spread: Not used by this function; kept so existing callers that
            pass it positionally keep working.
        features: DataFrame of technical-indicator features.
        target: Labels for the future spread direction, one per feature row.

    Returns:
        Array with the second column of ``predict_proba`` for the held-out
        rows, i.e. the final 20% of the rows in their original order.
    """
    # shuffle=False keeps the split chronological: the model is fitted on
    # the first 80% of rows and evaluated on the last 20%. The default
    # random shuffle would mix later observations into the training set.
    X_train, X_test, y_train, _ = train_test_split(
        features, target, test_size=0.2, shuffle=False
    )
    model = RandomForestClassifier(n_estimators=100)
    model.fit(X_train, y_train)
    return model.predict_proba(X_test)[:, 1]
# Feature engineering example
features = pd.DataFrame(dict(
    zscore=spread_z,
    rsi=talib.RSI(spread),
    macd=talib.MACD(spread)[0],
    volatility=spread.rolling(20).std(),
))
features = features.dropna()
# Label is 1 where the spread five rows ahead is above the current value;
# the last five rows have no look-ahead value and are trimmed.
# NOTE(review): features and target are filtered differently (dropna vs.
# trimming the tail) - confirm they are aligned before fitting a model.
target = (spread.shift(-5) > spread).astype(int)
target = target.iloc[:-5]
symbols = ['AAPL', 'MSFT', 'GOOG', 'AMZN', 'META']
data = yf.download(symbols, start='2018-01-01')['Close']
# Only the upper triangle is filled: each unordered pair is tested once.
coint_matrix = pd.DataFrame(index=symbols, columns=symbols)
for i, sym_a in enumerate(symbols):
    for sym_b in symbols[i + 1:]:
        # Drop rows where either leg is missing so the test runs on
        # complete, aligned observations for this pair.
        pair_prices = data[[sym_a, sym_b]].dropna()
        _, pvalue, _ = coint(pair_prices[sym_a], pair_prices[sym_b])
        coint_matrix.loc[sym_a, sym_b] = pvalue
plt.figure(figsize=(10,8))
sns.heatmap(coint_matrix.astype(float), annot=True, cmap='coolwarm')
plt.title('协整关系矩阵')
def risk_parity_allocation(returns, target_vol=0.15):
    """Find long-only weights whose portfolio volatility is near a target.

    The covariance of ``returns`` is scaled by 252, and SLSQP minimises the
    squared gap between the portfolio volatility and ``target_vol``, starting
    from equal weights, with each weight in [0, 1] and weights summing to 1.

    Args:
        returns: DataFrame of asset returns, one column per asset.
        target_vol: Volatility level the optimiser aims for.

    Returns:
        Array of optimised weights (``res.x`` of the optimiser), one per
        column of ``returns``.
    """
    annual_cov = returns.cov() * 252
    n_assets = annual_cov.shape[0]

    def objective(w):
        # Squared distance between portfolio volatility and the target.
        vol = np.sqrt(w.T @ annual_cov @ w)
        return (vol - target_vol) ** 2

    result = minimize(
        objective,
        np.ones(n_assets) / n_assets,
        method='SLSQP',
        bounds=[(0, 1)] * n_assets,
        constraints={'type': 'eq', 'fun': lambda w: np.sum(w) - 1},
    )
    return result.x
# Usage example: weights from period-over-period percentage changes
returns = data.pct_change().dropna()
weights = risk_parity_allocation(returns, target_vol=0.15)
# Combined strategy backtest example
class AdvancedPairsTrading:
    """Pairs-trading state machine driven by a Kalman hedge ratio and z-score.

    ``position`` is 1, -1 or 0; ``trade_log`` collects one tuple per entry.
    """

    def __init__(self):
        """Start flat, with no hedge ratio and an empty trade log."""
        self.hedge_ratio = None
        self.position = 0
        self.trade_log = []

    def update(self, price_A, price_B):
        """Recompute the signal from the price history and update the position.

        Args:
            price_A: pandas Series with the history of leg A up to now.
            price_B: pandas Series with the history of leg B up to now.

        The latest hedge ratio is stored in ``self.hedge_ratio``. Entries are
        appended to ``self.trade_log`` as ``(side, last_price_A,
        last_price_B)``.
        """
        # Latest dynamic hedge ratio from the Kalman filter.
        hr = kalman_filter_hedge(price_B, price_A)[-1]
        self.hedge_ratio = hr

        # adaptive_zscore returns a whole Series; a Series cannot be used
        # directly in an `if`, so the decision uses its most recent value.
        spread = price_B - hr * price_A
        z = adaptive_zscore(spread).iloc[-1]

        # Entry signals. While z is still NaN (not enough history) every
        # comparison below is False and the position is left untouched.
        if z > 1.5 and self.position <= 0:
            self.position = 1
            self.trade_log.append(('buy', price_A.iloc[-1], price_B.iloc[-1]))
        elif z < -1.5 and self.position >= 0:
            self.position = -1
            self.trade_log.append(('sell', price_A.iloc[-1], price_B.iloc[-1]))

        # Risk control: go flat once the z-score is back inside +/-0.5.
        if abs(z) < 0.5:
            self.position = 0
本案例展示了配对交易策略的高级演进方向,实际应用中需结合具体市场环境进行参数优化和风控设计。建议通过压力测试验证策略鲁棒性,并持续监控协整关系稳定性。