[Python] 纯文本查看 复制代码
```python
import pandas as pd
import numpy as np
import logging
# Set the log level: configure the root logger to emit INFO and above.
logging.basicConfig(level=logging.INFO)
def flexible_calculate_macd(data, short_window_range=(10, 15), long_window_range=(20, 30), signal_window_range=(5, 10)):
    """Add MACD, signal-line and histogram columns to ``data``.

    The three EMA spans are drawn at random on every call with
    ``np.random.randint(low, high)``; each ``*_range`` is therefore a
    half-open ``[low, high)`` interval, and results are only reproducible
    when the caller seeds NumPy's global random state.

    Args:
        data: DataFrame holding a ``'close'`` column. Modified in place.
        short_window_range: ``(low, high)`` bounds for the fast EMA span.
        long_window_range: ``(low, high)`` bounds for the slow EMA span.
        signal_window_range: ``(low, high)`` bounds for the signal EMA span.

    Returns:
        The same ``data`` object with ``'macd'``, ``'signal_line'`` and
        ``'macd_diff'`` added. When ``'close'`` is missing a warning is
        logged and ``data`` is returned untouched.
    """
    if 'close' not in data.columns:
        logging.warning("数据中缺少 'close' 列,无法计算 MACD。")
        return data

    # Draw order (short, long, signal) is kept so seeded runs stay stable.
    short_window = np.random.randint(*short_window_range)
    long_window = np.random.randint(*long_window_range)
    signal_window = np.random.randint(*signal_window_range)

    close = data['close']
    fast_ema = close.ewm(span=short_window, adjust=False).mean()
    slow_ema = close.ewm(span=long_window, adjust=False).mean()

    data['macd'] = fast_ema - slow_ema
    data['signal_line'] = data['macd'].ewm(span=signal_window, adjust=False).mean()
    data['macd_diff'] = data['macd'] - data['signal_line']
    return data
def flexible_calculate_rsi(data, period_range=(10, 20)):
    """Add a Relative Strength Index (``'rsi'``) column to ``data``.

    The smoothing period is drawn with ``np.random.randint(*period_range)``
    (upper bound exclusive). Gains and losses are smoothed with an
    exponential average using ``alpha = 1 / period``.

    Args:
        data: DataFrame holding a ``'close'`` column. Modified in place.
        period_range: ``(low, high)`` bounds for the smoothing period.

    Returns:
        The same ``data`` object with ``'rsi'`` added. The first row is NaN
        (no previous close to diff against). When ``'close'`` is missing a
        warning is logged and ``data`` is returned untouched.
    """
    if 'close' not in data.columns:
        logging.warning("数据中缺少 'close' 列,无法计算 RSI。")
        return data

    period = np.random.randint(*period_range)
    alpha = 1 / period

    delta = data['close'].diff()
    # `where` (rather than clip) turns the leading NaN diff into 0 so the
    # exponential averages start from the very first row.
    gain = delta.where(delta > 0, 0)
    loss = -delta.where(delta < 0, 0)

    avg_gain = gain.ewm(alpha=alpha, adjust=False).mean()
    avg_loss = loss.ewm(alpha=alpha, adjust=False).mean()

    # A zero average loss yields an infinite ratio, which maps to RSI == 100.
    rs = avg_gain / avg_loss
    data['rsi'] = 100 - (100 / (1 + rs))
    return data
def flexible_calculate_bias(data, n_range=(5, 8)):
    """Add a BIAS (deviation-from-average, in percent) column to ``data``.

    The averaging length ``n`` is drawn with ``np.random.randint(*n_range)``
    (upper bound exclusive). The reference average is an exponential mean
    of ``'close'`` with ``alpha = 1 / n``.

    Args:
        data: DataFrame holding a ``'close'`` column. Modified in place.
        n_range: ``(low, high)`` bounds for the averaging length.

    Returns:
        The same ``data`` object with ``'bias'`` added. When ``'close'`` is
        missing a warning is logged and ``data`` is returned untouched.
    """
    if 'close' not in data.columns:
        logging.warning("数据中缺少 'close' 列,无法计算 BIAS。")
        return data

    n = np.random.randint(*n_range)
    close = data['close']
    average = close.ewm(alpha=1 / n, adjust=False).mean()
    data['bias'] = (close - average) / average * 100
    return data
def flexible_calculate_williams_r(data, period_range=(10, 15)):
    """Add a Williams %R (``'williams_r'``) column to ``data``.

    The look-back period is drawn with ``np.random.randint(*period_range)``
    (upper bound exclusive). Values fall between -100 and 0 when the close
    lies inside the rolling high/low range.

    Args:
        data: DataFrame holding ``'high'``, ``'low'`` and ``'close'``
            columns. Modified in place.
        period_range: ``(low, high)`` bounds for the look-back period.

    Returns:
        The same ``data`` object with ``'williams_r'`` added; the first
        ``period - 1`` rows are NaN. When a required column is missing a
        warning is logged and ``data`` is returned untouched.
    """
    if any(column not in data.columns for column in ('high', 'low', 'close')):
        logging.warning("数据缺少 'high'、'low' 或 'close' 列,无法计算威廉指标。")
        return data

    period = np.random.randint(*period_range)
    highest_high = data['high'].rolling(window=period).max()
    lowest_low = data['low'].rolling(window=period).min()

    price_range = highest_high - lowest_low
    data['williams_r'] = (highest_high - data['close']) / price_range * -100
    return data
def flexible_calculate_kdj(data, n_range=(7, 12), m1_range=(2, 4), m2_range=(2, 4)):
    """Add KDJ stochastic columns (``'k'``, ``'d'``, ``'j'``) to ``data``.

    ``n`` (RSV look-back), ``m1`` (K smoothing span) and ``m2`` (D smoothing
    span) are each drawn with ``np.random.randint`` from their range (upper
    bound exclusive). K is an EMA of the raw stochastic value, D an EMA of
    K, and ``J = 3K - 2D``.

    Args:
        data: DataFrame holding ``'high'``, ``'low'`` and ``'close'``
            columns. Modified in place.
        n_range: ``(low, high)`` bounds for the RSV look-back window.
        m1_range: ``(low, high)`` bounds for the K smoothing span.
        m2_range: ``(low, high)`` bounds for the D smoothing span.

    Returns:
        The same ``data`` object with ``'k'``, ``'d'`` and ``'j'`` added.
        When a required column is missing a warning is logged and ``data``
        is returned untouched.
    """
    if any(column not in data.columns for column in ('high', 'low', 'close')):
        logging.warning("数据缺少 'high'、'low' 或 'close' 列,无法计算 KDJ。")
        return data

    # Draw order (n, m1, m2) is kept so seeded runs stay stable.
    n = np.random.randint(*n_range)
    m1 = np.random.randint(*m1_range)
    m2 = np.random.randint(*m2_range)

    lowest_low = data['low'].rolling(window=n).min()
    highest_high = data['high'].rolling(window=n).max()

    # Raw stochastic value: where the close sits in the n-bar range (0-100).
    rsv = ((data['close'] - lowest_low) / (highest_high - lowest_low)) * 100

    k = rsv.ewm(span=m1, adjust=False).mean()
    d = k.ewm(span=m2, adjust=False).mean()

    data['k'] = k
    data['d'] = d
    data['j'] = 3 * k - 2 * d
    return data
def flexible_calculate_cci(data, period_range=(15, 25)):
    """Add a Commodity Channel Index (``'cci'``) column to ``data``.

    The look-back period is drawn with ``np.random.randint(*period_range)``
    (upper bound exclusive). CCI is the typical price's distance from its
    rolling mean, divided by ``0.015`` times the rolling mean absolute
    deviation.

    Args:
        data: DataFrame holding ``'high'``, ``'low'`` and ``'close'``
            columns. Modified in place.
        period_range: ``(low, high)`` bounds for the look-back period.

    Returns:
        The same ``data`` object with ``'cci'`` added; the first
        ``period - 1`` rows are NaN. When a required column is missing a
        warning is logged and ``data`` is returned untouched.
    """
    if any(column not in data.columns for column in ('high', 'low', 'close')):
        logging.warning("数据缺少 'high'、'low' 或 'close' 列,无法计算 CCI。")
        return data

    period = np.random.randint(*period_range)
    typical_price = (data['high'] + data['low'] + data['close']) / 3
    windows = typical_price.rolling(window=period)

    rolling_mean = windows.mean()
    # raw=True hands each window over as a plain ndarray, avoiding the cost
    # of building a Series per window.
    mean_deviation = windows.apply(
        lambda values: np.abs(values - values.mean()).mean(), raw=True
    )

    data['cci'] = (typical_price - rolling_mean) / (0.015 * mean_deviation)
    return data
def flexible_calculate_adx(data, period_range=(10, 15)):
    """Add an Average Directional Index (``'adx'``) column to ``data``.

    The smoothing period is drawn with ``np.random.randint(*period_range)``
    (upper bound exclusive); every smoothing step is an exponential average
    with ``alpha = 1 / period``.

    NOTE: directional movement here is simplified. +DM is the positive part
    of the high-to-high change and -DM the negative part of the low-to-low
    change, each taken independently; the two are not compared against one
    another.

    Args:
        data: DataFrame holding ``'high'``, ``'low'`` and ``'close'``
            columns. Modified in place.
        period_range: ``(low, high)`` bounds for the smoothing period.

    Returns:
        The same ``data`` object with ``'adx'`` added. When a required
        column is missing a warning is logged and ``data`` is returned
        untouched.
    """
    if any(column not in data.columns for column in ('high', 'low', 'close')):
        logging.warning("数据缺少 'high'、'low' 或 'close' 列,无法计算 ADX。")
        return data

    period = np.random.randint(*period_range)
    alpha = 1 / period

    def smooth(series):
        # Exponential smoothing shared by DM, TR and DX.
        return series.ewm(alpha=alpha, adjust=False).mean()

    high_change = data['high'].diff()
    low_change = data['low'].diff()
    # mask() leaves the leading NaN in place, exactly like boolean-index
    # assignment would; only real negative/positive moves are zeroed.
    plus_dm = high_change.mask(high_change < 0, 0)
    minus_dm = low_change.mask(low_change > 0, 0)

    previous_close = data['close'].shift()
    true_range = pd.DataFrame({
        'high_low': data['high'] - data['low'],
        'high_close': np.abs(data['high'] - previous_close),
        'low_close': np.abs(data['low'] - previous_close),
    }).max(axis=1)
    smoothed_tr = smooth(true_range)

    plus_di = 100 * (smooth(plus_dm) / smoothed_tr)
    # minus_dm is <= 0, so negate the smoothed value to get a positive -DI.
    minus_di = 100 * (-smooth(minus_dm) / smoothed_tr)

    dx = 100 * np.abs((plus_di - minus_di) / (plus_di + minus_di))
    data['adx'] = smooth(dx)
    return data
def flexible_calculate_roc(data, period_range=(8, 12)):
    """Add a Rate of Change (``'roc'``) column to ``data``.

    The look-back period is drawn with ``np.random.randint(*period_range)``
    (upper bound exclusive). The value is the fractional change of
    ``'close'`` over that many rows (``0.1`` means +10%), not a percentage.

    Args:
        data: DataFrame holding a ``'close'`` column. Modified in place.
        period_range: ``(low, high)`` bounds for the look-back period.

    Returns:
        The same ``data`` object with ``'roc'`` added; the first ``period``
        rows are NaN. When ``'close'`` is missing a warning is logged and
        ``data`` is returned untouched.
    """
    if 'close' not in data.columns:
        logging.warning("数据中缺少 'close' 列,无法计算 ROC。")
        return data

    period = np.random.randint(*period_range)
    data['roc'] = data['close'].pct_change(periods=period)
    return data
def flexible_calculate_mfi(data, period_range=(10, 15)):
    """Add a Money Flow Index (``'mfi'``) column to ``data``.

    The look-back period is drawn with ``np.random.randint(*period_range)``
    (upper bound exclusive). Raw money flow is typical price times volume;
    a bar's flow counts as positive when its typical price is above the
    previous bar's typical price and as negative when it is below.

    Args:
        data: DataFrame holding ``'high'``, ``'low'``, ``'close'`` and
            ``'volume'`` columns. Modified in place.
        period_range: ``(low, high)`` bounds for the look-back period.

    Returns:
        The same ``data`` object with ``'mfi'`` added; the first
        ``period - 1`` rows are NaN. When a required column is missing a
        warning is logged and ``data`` is returned untouched.
    """
    required = ('high', 'low', 'close', 'volume')
    if any(column not in data.columns for column in required):
        logging.warning("数据缺少必要列,无法计算 MFI。")
        return data

    period = np.random.randint(*period_range)
    typical_price = (data['high'] + data['low'] + data['close']) / 3
    money_flow = typical_price * data['volume']

    # MFI classifies each bar by the move in *typical price*, not in close;
    # the two can disagree when the high/low shift more than the close.
    previous_typical = typical_price.shift(1)
    positive_flow = money_flow.where(typical_price > previous_typical, 0).rolling(window=period).sum()
    negative_flow = money_flow.where(typical_price < previous_typical, 0).rolling(window=period).sum()

    # No negative flow in the window gives an infinite ratio, i.e. MFI == 100.
    money_ratio = positive_flow / negative_flow
    data['mfi'] = 100 - (100 / (1 + money_ratio))
    return data
def flexible_calculate_aroon(data, period_range=(20, 30)):
    """Add Aroon Up / Aroon Down columns to ``data``.

    The look-back period is drawn with ``np.random.randint(*period_range)``
    (upper bound exclusive). Each value is computed over the current bar
    plus the ``period`` bars before it:

        aroon_up   = 100 * (period - bars since the highest high) / period
        aroon_down = 100 * (period - bars since the lowest low)  / period

    so a fresh extreme on the current bar scores 100 and an extreme that is
    ``period`` bars old scores 0. When the extreme occurs more than once in
    the window, the most recent occurrence is used.

    Args:
        data: DataFrame holding ``'high'`` and ``'low'`` columns. Modified
            in place.
        period_range: ``(low, high)`` bounds for the look-back period.

    Returns:
        The same ``data`` object with ``'aroon_up'`` and ``'aroon_down'``
        added; the first ``period`` rows are NaN. When a required column is
        missing a warning is logged and ``data`` is returned untouched.
    """
    if 'high' not in data.columns or 'low' not in data.columns:
        logging.warning("数据缺少必要列,无法计算 Aroon。")
        return data

    period = np.random.randint(*period_range)
    window = period + 1  # current bar plus `period` bars of history

    def aroon_calc(col, locate_extreme):
        # Reversing the window puts the current bar at position 0, so the
        # located index is directly "bars since the most recent extreme".
        bars_since = col.rolling(window=window).apply(
            lambda values: locate_extreme(values[::-1]), raw=True
        )
        return 100 * (period - bars_since) / period

    data['aroon_up'] = aroon_calc(data['high'], np.argmax)
    data['aroon_down'] = aroon_calc(data['low'], np.argmin)
    return data
def flexible_calculate_stochastic_oscillator(data, k_period_range=(10, 15), d_period_range=(3, 5)):
    """Add Stochastic Oscillator columns (``'%K'``, ``'%D'``) to ``data``.

    Both periods are drawn with ``np.random.randint`` from their range
    (upper bound exclusive). ``%K`` is the close's position inside the
    rolling high/low range on a 0-100 scale; ``%D`` is the simple moving
    average of ``%K``.

    Args:
        data: DataFrame holding ``'high'``, ``'low'`` and ``'close'``
            columns. Modified in place.
        k_period_range: ``(low, high)`` bounds for the %K look-back window.
        d_period_range: ``(low, high)`` bounds for the %D averaging window.

    Returns:
        The same ``data`` object with ``'%K'`` and ``'%D'`` added. When a
        required column is missing a warning is logged and ``data`` is
        returned untouched.
    """
    if any(column not in data.columns for column in ('high', 'low', 'close')):
        logging.warning("数据缺少必要列,无法计算 Stochastic Oscillator。")
        return data

    # Draw order (k, d) is kept so seeded runs stay stable.
    k_period = np.random.randint(*k_period_range)
    d_period = np.random.randint(*d_period_range)

    lowest_low = data['low'].rolling(window=k_period).min()
    highest_high = data['high'].rolling(window=k_period).max()

    data['%K'] = 100 * ((data['close'] - lowest_low) / (highest_high - lowest_low))
    data['%D'] = data['%K'].rolling(window=d_period).mean()
    return data
def run_strategy(data, ma_window_range=(20, 40), stop_loss_pct_range=(0.03, 0.07), take_profit_pct_range=(0.1, 0.2), transaction_cost=0.001):
if 'close' not in data.columns:
logging.warning("数据中缺少 'close' 列,无法运行策略。")
return data
# 考虑中国股市涨跌停限制为 10%
data['daily_limit'] = data['close'].shift(1) * 1.1
data['lower_limit'] = data['close'].shift(1) * 0.9
# 随机选择参数,但记录下来以便后续分析和优化
ma_window = np.random.randint(*ma_window_range)
stop_loss_pct = np.random.uniform(*stop_loss_pct_range)
take_profit_pct = np.random.uniform(*take_profit_pct_range)
# 计算布林线上下轨
if len(data) < ma_window:
logging.warning(f"数据长度不足 {ma_window},无法计算布林线。")
return data
data['std'] = data['close'].rolling(window=ma_window).std()
data['upper_band'] = data['close'].rolling(window=ma_window).mean() + 2 * data['std']
data['lower_band'] = data['close'].rolling(window=ma_window).mean() - 2 * data['std']
# 计算各种指标
data = flexible_calculate_macd(data)
data = flexible_calculate_rsi(data)
data = flexible_calculate_bias(data)
data = flexible_calculate_williams_r(data)
data = flexible_calculate_kdj(data)
data = flexible_calculate_cci(data)
data = flexible_calculate_adx(data)
data = flexible_calculate_roc(data)
data = flexible_calculate_mfi(data)
data = flexible_calculate_aroon(data)
data = flexible_calculate_stochastic_oscillator(data)
# 处理缺失值
data = data.dropna()
# 计算简单移动平均线
data['sma'] = data['close'].rolling(window=ma_window).mean()
# 初始化交易信号和持仓状态
data['signal'] = 0
data['position'] = 0
data['entry_price'] = np.nan
data['strategy_params'] = f"MA_Window={ma_window}, StopLoss={stop_loss_pct:.2f}, TakeProfit={take_profit_pct:.2f}"
# 生成交易信号
for i in range(ma_window, len(data)):
# 判断上升趋势且多个指标同时满足,同时考虑价格未触及涨停
if data.at[i, 'adx'] > 25 and data.at[i, 'close'] > data.at[i - 1, 'sma'] and data.at[i - 1, 'position'] <= 0:
macd_condition = data.at[i, 'macd_diff'] > 0 and data.at[i, 'macd_diff'] > data.at[i - 1, 'macd_diff']
rsi_condition = data.at[i, 'rsi'] < 70 and data.at[i, 'rsi'] > data.at[i - 1, 'rsi']
bias_condition = data.at[i, 'bias'] > -5 and data.at[i, 'bias'] > data.at[i - 1, 'bias']
williams_r_condition = data.at[i, 'williams_r'] > -20 and data.at[i, 'williams_r'] > data.at[i - 1, 'williams_r']
kdj_condition = data.at[i, 'k'] > data.at[i, 'd'] and data.at[i, 'k'] > data.at[i - 1, 'k'] and data.at[i, 'd'] > data.at[i - 1, 'd']
cci_condition = data.at[i, 'cci'] > -100 and data.at[i, 'cci'] > data.at[i - 1, 'cci']
roc_condition = data.at[i, 'roc'] > 0 and data.at[i, 'roc'] > data.at[i - 1, 'roc']
mfi_condition = data.at[i, 'mfi'] > 50 and data.at[i, 'mfi'] > data.at[i - 1, 'mfi']
aroon_condition = data.at[i, 'aroon_up'] > data.at[i, 'aroon_down'] and data.at[i, 'aroon_up'] > data.at[i - 1, 'aroon_up'] and data.at[i, 'aroon_down'] < data.at[i - 1, 'aroon_down']
stochastic_condition = data.at[i, '%K'] < data.at[i, '%D'] and data.at[i, '%K'] > data.at[i - 1, '%K'] and data.at[i, '%D'] > data.at[i - 1, '%D']
if (macd_condition and rsi_condition and bias_condition and williams_r_condition and kdj_condition and cci_condition and roc_condition and mfi_condition and aroon_condition and stochastic_condition and data.at[i, 'close'] < data.at[i, 'daily_limit']):
data.at[i, 'signal'] = 1
data.at[i, 'position'] += 1
data.at[i, 'entry_price'] = data.at[i, 'close']
logging.info(f"发出买入提示:在 {data.at[i, 'date']} 可能是买入时机。参数:{data.at[i, 'strategy_params']}")
# 判断下降趋势且多个指标同时满足,同时考虑价格未触及跌停
elif data.at[i, 'adx'] > 25 and data.at[i, 'close'] < data.at[i - 1, 'sma'] and data.at[i - 1, 'position'] >= 0:
macd_condition = data.at[i, 'macd_diff'] < 0 and data.at[i, 'macd_diff'] < data.at[i - 1, 'macd_diff']
rsi_condition = data.at[i, 'rsi'] > 30 and data.at[i, 'rsi'] < data.at[i - 1, 'rsi']
bias_condition = data.at[i, 'bias'] < 5 and data.at[i, 'bias'] < data.at[i - 1, 'bias']
williams_r_condition = data.at[i, 'williams_r'] < -80 and data.at[i, 'williams_r'] < data.at[i - 1, 'williams_r']
kdj_condition = data.at[i, 'k'] < data.at[i, 'd'] and data.at[i, 'k'] < data.at[i - 1, 'k'] and data.at[i, 'd'] < data.at[i - 1, 'd']
cci_condition = data.at[i, 'cci'] < 100 and data.at[i, 'cci'] < data.at[i - 1, 'cci']
roc_condition = data.at[i, 'roc'] < 0 and data.at[i, 'roc'] < data.at[i - 1, 'roc']
mfi_condition = data.at[i, 'mfi'] < 50 and data.at[i, 'mfi'] < data.at[i - 1