python 量化,获取量化模型之一的技术方法,17种技术指标——量化 06
17 种技术指标代码
import pandas as pd
import numpy as np
from scipy import stats
from datetime import datetime, timedelta
import akshare as ak
import datetime
# 获取日线历史行情
def get_stock_data(symbol, table_name=None):
    """Download roughly seven years of daily history for ``symbol`` through akshare.

    The window ends today and starts ``7 * 365`` days earlier; both bounds are
    passed to akshare as ``YYYYMMDD`` strings.

    Args:
        symbol: Security code as a string.
        table_name: Optional source label that selects the akshare endpoint:
            "美股中国企业价值" -> ``ak.stock_us_hist`` with ``"106."`` prepended,
            "纳指100价值" -> ``ak.stock_us_hist`` with ``"105."`` prepended,
            "港股价值" -> ``ak.stock_hk_hist``,
            "场内基金价值" -> ``ak.index_zh_a_hist`` (called without ``adjust``).
            Any other value: codes starting with "1" or "5" go to
            ``ak.fund_etf_hist_em``, everything else to ``ak.stock_zh_a_hist``.

    Returns:
        Whatever the selected akshare function returns.
    """
    # NOTE: `datetime` must be bound to the datetime *module* here (the file's
    # later `import datetime` provides that), not to the `datetime` class.
    end_day = datetime.date.today()
    begin_day = end_day - datetime.timedelta(days=7 * 365)
    window = {
        "period": "daily",
        "start_date": begin_day.strftime("%Y%m%d"),
        "end_date": end_day.strftime("%Y%m%d"),
    }

    if table_name == "美股中国企业价值":
        return ak.stock_us_hist(symbol="106." + symbol, adjust="qfq", **window)
    if table_name == "纳指100价值":
        return ak.stock_us_hist(symbol="105." + symbol, adjust="qfq", **window)
    if table_name == "港股价值":
        return ak.stock_hk_hist(symbol=symbol, adjust="qfq", **window)
    if table_name == "场内基金价值":
        return ak.index_zh_a_hist(symbol=symbol, **window)

    # Fallback for every other label (including None).
    if symbol.startswith(("1", "5")):
        return ak.fund_etf_hist_em(symbol=symbol, adjust="qfq", **window)
    return ak.stock_zh_a_hist(symbol=symbol, adjust="qfq", **window)
# 获取简单移动平均线信号,参数有2个,一个是数据源,一个是周期(天数)
def MA(data, n):
    """Compare the latest close with its ``n``-row simple moving average.

    Args:
        data: DataFrame with a '收盘' (close) column.
        n: Rolling window length in rows.

    Returns:
        A 1x1 DataFrame whose single column is named ``MA_<n>``. The value is
        1 when the average is below the last close, -1 when it is above, and
        0 otherwise (equal, or the average is NaN).
    """
    closes = data['收盘']
    average = closes.rolling(n).mean().iloc[-1]
    last_close = closes.iloc[-1]

    if average < last_close:
        verdict = 1
    elif average > last_close:
        verdict = -1
    else:
        verdict = 0
    return pd.DataFrame(np.array([[verdict]]), columns=[f'MA_{n}'])
# 获取指数移动平均线信号,参数有2个,一个是数据源,一个是周期(天数)
def EMA(data, n):
    """Compare the latest close with its exponential moving average.

    The average is ``ewm(span=n, min_periods=n).mean()`` of the '收盘' column.

    Args:
        data: DataFrame with a '收盘' (close) column.
        n: EMA span; also the minimum number of rows before a value exists.

    Returns:
        A 1x1 DataFrame whose single column is named ``EMA_<n>``. The value is
        1 when the EMA is below the last close, -1 when it is above, and
        0 otherwise (equal, or the EMA is NaN).
    """
    closes = data['收盘']
    smoothed = closes.ewm(span=n, min_periods=n).mean().iloc[-1]
    last_close = closes.iloc[-1]

    if smoothed < last_close:
        verdict = 1
    elif smoothed > last_close:
        verdict = -1
    else:
        verdict = 0
    return pd.DataFrame(np.array([[verdict]]), columns=[f'EMA_{n}'])
# 获取一目均衡表基准线 (data, conversion_periods, base_periods, lagging_span2_periods, displacement)
# 参数有5个,第一个是数据源,其他4个分别是一目均衡表基准线 (9, 26, 52, 26),即ichimoku_cloud(data,9, 26, 52, 26)
def ichimoku_cloud(data, conversion_periods, base_periods, lagging_span2_periods, displacement):
    """Derive a one-cell Ichimoku-style signal from the latest bar.

    Each line is the midpoint of the rolling highest 'high' and lowest 'low'
    over its window. Lead line 1 is the mean of the conversion and base lines.

    Args:
        data: DataFrame with 'high', 'low' and 'close' columns.
        conversion_periods: Window of the conversion line.
        base_periods: Window of the base line.
        lagging_span2_periods: Window of lead line 2.
        displacement: Accepted for signature compatibility; not used.

    Returns:
        A 1x1 DataFrame with column 'Ichimoku Cloud':
        1 when base < close, previous conversion < close, conversion > close,
        lead1 > close and lead1 > lead2; -1 when all five comparisons are
        reversed; 0 otherwise.
    """
    highs, lows = data['high'], data['low']

    def midpoint(window):
        # Middle of the rolling high/low channel.
        return (highs.rolling(window).max() + lows.rolling(window).min()) / 2

    conversion_series = midpoint(conversion_periods)
    base_series = midpoint(base_periods)
    lead1_series = (conversion_series + base_series) / 2
    lead2_series = midpoint(lagging_span2_periods)

    base_now = base_series.iloc[-1]
    conversion_now = conversion_series.iloc[-1]
    conversion_before = conversion_series.shift(1).iloc[-1]
    lead1_now = lead1_series.iloc[-1]
    lead2_now = lead2_series.iloc[-1]
    price = data['close'].iloc[-1]

    bullish = (
        base_now < price
        and conversion_before < price
        and conversion_now > price
        and lead1_now > price
        and lead1_now > lead2_now
    )
    bearish = (
        base_now > price
        and conversion_before > price
        and conversion_now < price
        and lead1_now < price
        and lead1_now < lead2_now
    )
    verdict = 1 if bullish else (-1 if bearish else 0)
    return pd.DataFrame(np.array([[verdict]]), columns=['Ichimoku Cloud'])
# 成交量加权移动平均线 VWMA (data, 20),参数有2个,1个是数据源,另一个是周期(天数),通常为20
def VWMA(data, n):
    """Compare the latest close with its volume-weighted moving average.

    The average is the rolling sum of ``close * volume`` divided by the
    rolling sum of ``volume`` over ``n`` rows.

    Args:
        data: DataFrame with 'close' and 'volume' columns.
        n: Rolling window length in rows.

    Returns:
        A 1x1 DataFrame whose single column is named ``VWMA_<n>``. The value is
        1 when the VWMA is below the last close, -1 when it is above, and
        0 otherwise (equal or NaN).
    """
    price, volume = data['close'], data['volume']
    turnover = (price * volume).rolling(n).sum()
    traded = volume.rolling(n).sum()
    weighted_avg = (turnover / traded).iloc[-1]
    last_close = price.iloc[-1]

    if weighted_avg < last_close:
        verdict = 1
    elif weighted_avg > last_close:
        verdict = -1
    else:
        verdict = 0
    return pd.DataFrame(np.array([[verdict]]), columns=[f'VWMA_{n}'])
# 计算Hull MA船体移动平均线 Hull MA (data, 9),参数有2个,一个是数据源,另一个是周期(天数),一般为9
def HullMA(data, n=9):
    """Compare the latest close with the Hull moving average.

    Computed as ``WMA(2 * WMA(close, n // 2) - WMA(close, n), floor(sqrt(n)))``
    where WMA is a linearly weighted rolling mean (weights 1..period, the
    newest row weighted most).

    Fix over the previous version: the square root came from the ``math``
    module, which this file never imports, so every call raised NameError.
    NumPy (already imported as ``np``) is used instead.

    Args:
        data: DataFrame with a 'close' column.
        n: Hull period; must be at least 2 so that ``n // 2`` is a valid window.

    Returns:
        A 1x1 DataFrame whose single column is named ``HullMA_<n>``. The value
        is 1 when the Hull MA is below the last close, -1 when it is above,
        and 0 otherwise (equal or NaN).

    Raises:
        ValueError: If ``n`` is smaller than 2.
    """
    if n < 2:
        raise ValueError("n must be at least 2 for HullMA")

    def wma(series, period):
        # Linearly weighted mean over `period` rows.
        weights = np.arange(1, period + 1)
        total = weights.sum()
        return series.rolling(period).apply(lambda x: np.dot(x, weights) / total, raw=True)

    source = data['close']
    half_wma_doubled = wma(source, n // 2) * 2
    full_wma = wma(source, n)
    smoothing_period = int(np.floor(np.sqrt(n)))
    hullma = wma(half_wma_doubled - full_wma, smoothing_period).iloc[-1]

    close = source.iloc[-1]
    signal = np.where(hullma < close, 1, np.where(hullma > close, -1, 0))
    return pd.DataFrame(signal.reshape(1, -1), columns=['HullMA_' + str(n)])
# 计算RSI指标,参数有2,一个为数据源,另一个为日期,一般为14,即RSI(data, 14)
def RSI(data, n):
    """Signal RSI reversals out of the oversold / overbought zones.

    Gains and losses are smoothed with ``ewm(alpha=1/n, adjust=False)``.

    Args:
        data: DataFrame with a 'close' column (at least two rows).
        n: Smoothing period.

    Returns:
        A 1x1 DataFrame whose single column is named ``RSI_<n>``. The value is
        1 when the latest RSI is below 30 and higher than the previous one,
        -1 when it is above 70 and lower than the previous one, 0 otherwise.
    """
    close = data['close']
    change = close - close.shift(1)
    # The first change is NaN; `where` turns it into 0 for both legs.
    gains = change.where(change > 0, 0)
    losses = -change.where(change < 0, 0)

    avg_gain = gains.ewm(alpha=1/n, adjust=False).mean()
    avg_loss = losses.ewm(alpha=1/n, adjust=False).mean()
    strength = avg_gain / avg_loss
    rsi = 100 - 100 / (1 + strength)

    latest, previous = rsi.iloc[-1], rsi.iloc[-2]
    if latest < 30 and latest > previous:
        verdict = 1
    elif latest > 70 and latest < previous:
        verdict = -1
    else:
        verdict = 0
    return pd.DataFrame(np.array([[verdict]]), columns=[f'RSI_{n}'])
# 计算Stochastic,k是主线,d_signal是信号线,参数有4,一个是数据源,另外三个为日期,一般为STOK(data, 14, 3, 3)
def STOK(data, n, m, t):
    """Signal a stochastic crossover inside the oversold / overbought zones.

    Side effect: writes the columns '%K', '%D' and '%D_signal' into ``data``.

    Args:
        data: DataFrame with 'high', 'low' and 'close' columns (at least two rows).
        n: Look-back window for the highest high / lowest low.
        m: Rolling-mean window applied to %K to obtain %D.
        t: Rolling-mean window applied to %D to obtain the signal line.

    Returns:
        A 1x1 DataFrame with column 'STOK1'. The value is 1 when %K is below 20
        and has just crossed above the signal line, -1 when %K is above 80 and
        has just crossed below it, 0 otherwise.
    """
    window_high = data['high'].rolling(n).max()
    window_low = data['low'].rolling(n).min()
    percent_k = 100 * (data['close'] - window_low) / (window_high - window_low)
    percent_d = percent_k.rolling(m).mean()
    percent_d_signal = percent_d.rolling(t).mean()

    data['%K'] = percent_k
    data['%D'] = percent_d
    data['%D_signal'] = percent_d_signal

    k_now, k_before = data['%K'].iloc[-1], data['%K'].iloc[-2]
    sig_now, sig_before = data['%D_signal'].iloc[-1], data['%D_signal'].iloc[-2]

    crossed_up = k_now < 20 and k_before < sig_before and k_now > sig_now
    crossed_down = k_now > 80 and k_before > sig_before and k_now < sig_now
    verdict = 1 if crossed_up else (-1 if crossed_down else 0)
    return pd.DataFrame(np.array([[verdict]]), columns=['STOK1'])
# 计算 CCI 指标,参数有 2,一个是数据源,另一个是日期,一般为 20,即 CCI(data, 20)
def CCI(data, n):
    """Signal CCI turning back from beyond the -100 / +100 bands.

    The typical price is the mean of high, low and close. CCI is its distance
    from the ``n``-row rolling mean, divided by 0.015 times the rolling mean
    absolute deviation.

    Args:
        data: DataFrame with 'high', 'low' and 'close' columns (at least two rows).
        n: Rolling window length in rows.

    Returns:
        A 1x1 DataFrame whose single column is named ``CCI_<n>``. The value is
        1 when the latest CCI is below -100 and higher than the previous one,
        -1 when it is above 100 and lower than the previous one, 0 otherwise.
    """
    typical = (data['high'] + data['low'] + data['close']) / 3
    rolling_mean = typical.rolling(n).mean()
    mean_deviation = typical.rolling(n).apply(lambda w: np.abs(w - w.mean()).mean())
    cci = (typical - rolling_mean) / (0.015 * mean_deviation)

    latest, previous = cci.iloc[-1], cci.iloc[-2]
    if latest < -100 and latest > previous:
        verdict = 1
    elif latest > 100 and latest < previous:
        verdict = -1
    else:
        verdict = 0
    return pd.DataFrame(np.array([[verdict]]), columns=[f'CCI_{n}'])
# 平均趋向指数ADX(14),参数有2,一个是数据源,另一个是日期,一般为14,即ADX(data,14)
def ADX(data, n):
    """Signal a +DI / -DI crossover confirmed by ADX above 20.

    Directional movement, true range and DX are all smoothed with
    ``ewm(alpha=1/n, min_periods=n)``.

    Fixes over the previous version:
      * +DM / -DM were wrapped in Series with a fresh 0..N-1 index. Whenever
        ``data`` carried any other index (for example dates), dividing them by
        the true range misaligned every row, produced all-NaN DI lines and
        silently returned 0. They now reuse ``data.index``.
      * The local name ``sum`` no longer shadows the builtin.

    Args:
        data: DataFrame with 'high', 'low' and 'close' columns (at least two rows).
        n: Smoothing period.

    Returns:
        A 1x1 DataFrame with column 'ADX'. The value is 1 when ADX > 20 and +DI
        has just crossed above -DI, -1 when ADX > 20 and +DI has just crossed
        below -DI, 0 otherwise.
    """
    high, low, prev_close = data['high'], data['low'], data['close'].shift()

    up_move = high - high.shift(1)
    down_move = low.shift(1) - low
    # Keep the caller's index so later arithmetic aligns row by row.
    plus_dm = pd.Series(
        np.where((up_move > down_move) & (up_move > 0), up_move, 0), index=data.index
    )
    minus_dm = pd.Series(
        np.where((down_move > up_move) & (down_move > 0), down_move, 0), index=data.index
    )

    true_range = np.maximum(
        high - low,
        np.maximum(np.abs(high - prev_close), np.abs(low - prev_close)),
    )
    smoothed_tr = true_range.ewm(alpha=1/n, min_periods=n).mean()
    plus = 100 * plus_dm.ewm(alpha=1/n, min_periods=n).mean() / smoothed_tr
    minus = 100 * minus_dm.ewm(alpha=1/n, min_periods=n).mean() / smoothed_tr

    di_total = plus + minus
    # Avoid 0/0 when both DI lines are zero; NaN rows stay NaN.
    safe_total = di_total.where(di_total != 0, 1)
    adx = 100 * (np.abs(plus - minus) / safe_total).ewm(alpha=1/n, min_periods=n).mean()

    trending = adx.iloc[-1] > 20
    crossed_up = trending and plus.iloc[-1] > minus.iloc[-1] and plus.iloc[-2] < minus.iloc[-2]
    crossed_down = trending and plus.iloc[-1] < minus.iloc[-1] and plus.iloc[-2] > minus.iloc[-2]
    signal = np.where(crossed_up, 1, np.where(crossed_down, -1, 0))
    return pd.DataFrame(signal.reshape(1, -1), columns=['ADX'])
# 计算动量震荡指标(AO),参数只有一个,即数据源
def AO(data):
    """Signal three-bar momentum from the Awesome Oscillator.

    AO is the 5-row mean of the high/low midpoint minus its 34-row mean.
    Side effect: writes the 'AO' column into ``data``.

    Args:
        data: DataFrame with 'high' and 'low' columns (at least three rows).

    Returns:
        A 1x1 DataFrame with column 'AO'. The value is 1 when the last three
        AO values are strictly rising and the latest is positive, -1 when they
        are strictly falling and the latest is negative, 0 otherwise.
    """
    highs, lows = data['high'], data['low']
    fast_mid = (highs.rolling(5).mean() + lows.rolling(5).mean()) / 2
    slow_mid = (highs.rolling(34).mean() + lows.rolling(34).mean()) / 2
    data['AO'] = fast_mid - slow_mid

    oldest, middle, newest = data['AO'].iloc[-3:]
    rising = oldest < middle and middle < newest and newest > 0
    falling = oldest > middle and middle > newest and newest < 0
    verdict = 1 if rising else (-1 if falling else 0)
    return pd.DataFrame(np.array([[verdict]]), columns=['AO'])
# 计算动量指标(10),参数只有一个,即数据源
def MTM(data):
    """Signal whether 10-row momentum is rising or falling.

    Momentum is ``close - close.shift(10)``.
    Side effect: writes the 'MTM' column into ``data``.

    Args:
        data: DataFrame with a 'close' column (at least two rows).

    Returns:
        A 1x1 DataFrame with column 'MTM'. The value is 1 when the latest
        momentum is above the previous one, -1 when it is below, 0 otherwise
        (equal or NaN).
    """
    closes = data['close']
    data['MTM'] = closes - closes.shift(10)

    previous, latest = data['MTM'].iloc[-2:]
    if previous < latest:
        verdict = 1
    elif previous > latest:
        verdict = -1
    else:
        verdict = 0
    return pd.DataFrame(np.array([[verdict]]), columns=['MTM'])
# MACD_1是以金叉和死叉进行判断,参数有3个,第一个是数据源,其余两个为快线和慢线的周期,一般取12和26,即MACD_1(data, 12, 26)
def MACD_1(data, n_fast, n_slow):
    """Signal a MACD histogram zero-cross on the latest bar.

    MACD is the fast EMA minus the slow EMA of 'close'; both EMAs use
    ``min_periods=n_slow``. The signal line is a span-9 EMA of MACD.
    Side effect: writes 'MACD', 'MACDsignal' and 'MACDhist' into ``data``.

    Args:
        data: DataFrame with a 'close' column (at least two rows).
        n_fast: Span of the fast EMA.
        n_slow: Span of the slow EMA.

    Returns:
        A 1x1 DataFrame with column 'MACD_1'. The value is 1 when the histogram
        went from negative to positive, -1 when it went from positive to
        negative, 0 otherwise.
    """
    closes = data['close']
    fast_ema = closes.ewm(span=n_fast, min_periods=n_slow).mean()
    slow_ema = closes.ewm(span=n_slow, min_periods=n_slow).mean()

    data['MACD'] = fast_ema - slow_ema
    data['MACDsignal'] = data['MACD'].ewm(span=9, min_periods=9).mean()
    data['MACDhist'] = data['MACD'] - data['MACDsignal']

    hist_before, hist_now = data['MACDhist'].iloc[-2], data['MACDhist'].iloc[-1]
    if hist_before < 0 and hist_now > 0:
        verdict = 1
    elif hist_before > 0 and hist_now < 0:
        verdict = -1
    else:
        verdict = 0
    return pd.DataFrame(np.array([[verdict]]), columns=['MACD_1'])
# MACD_2是以MACD柱是否大于0进行判断,tradingview的判断依据
def MACD_2(data, n_fast, n_slow):
    """Signal the sign of the latest MACD histogram value.

    MACD is the fast EMA minus the slow EMA of 'close'; both EMAs use
    ``min_periods=n_slow``. The signal line is a span-9 EMA of MACD.
    Side effect: writes 'MACD', 'MACDsignal' and 'MACDhist' into ``data``.

    Args:
        data: DataFrame with a 'close' column.
        n_fast: Span of the fast EMA.
        n_slow: Span of the slow EMA.

    Returns:
        A 1x1 DataFrame with column 'MACD_2'. The value is 1 when the latest
        histogram is positive, -1 when it is negative, 0 otherwise (zero or NaN).
    """
    closes = data['close']
    fast_ema = closes.ewm(span=n_fast, min_periods=n_slow).mean()
    slow_ema = closes.ewm(span=n_slow, min_periods=n_slow).mean()

    data['MACD'] = fast_ema - slow_ema
    data['MACDsignal'] = data['MACD'].ewm(span=9, min_periods=9).mean()
    data['MACDhist'] = data['MACD'] - data['MACDsignal']

    hist_now = data['MACDhist'].iloc[-1]
    if hist_now > 0:
        verdict = 1
    elif hist_now < 0:
        verdict = -1
    else:
        verdict = 0
    return pd.DataFrame(np.array([[verdict]]), columns=['MACD_2'])
# 计算Stoch_RSI(data,3, 3, 14, 14),有5个参数,第1个为数据源
def Stoch_RSI(data, smoothK=3, smoothD=3, lengthRSI=14, lengthStoch=14):
    """Signal a Stochastic-RSI crossover that agrees with the recent trend.

    RSI uses ``ewm(alpha=1/lengthRSI, adjust=False)`` smoothing, is rescaled
    by its rolling min/max over ``lengthStoch`` rows, then smoothed into K and
    D with rolling means. The trend is "up" when the last close is above the
    close ten rows from the end, otherwise "down".
    Side effect: writes 'Stoch_RSI_K' and 'Stoch_RSI_D' (0-100 scale) into ``data``.

    Args:
        data: DataFrame with a 'close' column (at least ten rows).
        smoothK: Rolling-mean window producing K.
        smoothD: Rolling-mean window producing D from K.
        lengthRSI: RSI smoothing period.
        lengthStoch: Window of the rolling min/max applied to RSI.

    Returns:
        A 1x1 DataFrame with column 'Stoch_RSI'. The value is 1 in an up-trend
        when K and D are both below 20 and K has just crossed above D, -1 in a
        down-trend when both are above 80 and K has just crossed below D,
        0 otherwise.
    """
    # RSI leg
    closes = data['close']
    change = closes - closes.shift(1)
    gains = change.where(change > 0, 0)
    losses = -change.where(change < 0, 0)
    avg_gain = gains.ewm(alpha=1/lengthRSI, adjust=False).mean()
    avg_loss = losses.ewm(alpha=1/lengthRSI, adjust=False).mean()
    rsi = 100 - 100 / (1 + avg_gain / avg_loss)

    # Stochastic of the RSI
    rsi_low = rsi.rolling(window=lengthStoch).min()
    rsi_high = rsi.rolling(window=lengthStoch).max()
    stoch = (rsi - rsi_low) / (rsi_high - rsi_low)
    smooth_k = stoch.rolling(window=smoothK).mean()
    smooth_d = smooth_k.rolling(window=smoothD).mean()

    data['Stoch_RSI_K'] = smooth_k * 100
    data['Stoch_RSI_D'] = smooth_d * 100

    uptrend = data['close'].iloc[-1] > data['close'].iloc[-10]
    k_now, k_before = data['Stoch_RSI_K'].iloc[-1], data['Stoch_RSI_K'].iloc[-2]
    d_now, d_before = data['Stoch_RSI_D'].iloc[-1], data['Stoch_RSI_D'].iloc[-2]

    if uptrend and k_now < 20 and d_now < 20 and k_now > d_now and k_before < d_before:
        verdict = 1
    elif (not uptrend) and k_now > 80 and d_now > 80 and k_now < d_now and k_before > d_before:
        verdict = -1
    else:
        verdict = 0
    return pd.DataFrame(np.array([[verdict]]), columns=['Stoch_RSI'])
# 计算威廉百分比变动,参数有2,第1是数据源,第二是日期,一般为14,即WPR(data, 14)
def WPR(data, n):
    """Signal Williams %R turning back from beyond the -80 / -20 bands.

    %R is ``(highest high - close) / (highest high - lowest low) * -100`` over
    an ``n``-row window.

    Args:
        data: DataFrame with 'high', 'low' and 'close' columns (at least two rows).
        n: Look-back window in rows.

    Returns:
        A 1x1 DataFrame whose single column is named ``WPR_<n>``. The value is
        1 when the latest %R is below -80 and higher than the previous one,
        -1 when it is above -20 and lower than the previous one, 0 otherwise.
    """
    oversold, overbought = -80, -20

    top = data['high'].rolling(n).max()
    bottom = data['low'].rolling(n).min()
    percent_r = (top - data['close']) / (top - bottom) * -100

    latest, previous = percent_r.iloc[-1], percent_r.iloc[-2]
    if latest < oversold and latest > previous:
        verdict = 1
    elif latest > overbought and latest < previous:
        verdict = -1
    else:
        verdict = 0
    return pd.DataFrame(np.array([[verdict]]), columns=[f'WPR_{n}'])
# 计算Bull Bear Power牛熊力量(BBP),参数有2,一个是数据源,另一个是日期,一般为20,但在tradingview取13,即BBP(data, 13)
def BBP(data, n):
    """Signal from bull / bear power combined with the recent trend.

    Bull power is ``high - EMA(close, span=n)`` and bear power is
    ``low - EMA(close, span=n)``. The trend compares the last close with the
    close ``n`` rows from the end.
    Side effect: writes 'BBP' (bull power plus bear power) into ``data``.

    Args:
        data: DataFrame with 'high', 'low' and 'close' columns (at least
            ``max(n, 2)`` rows).
        n: EMA span and trend look-back.

    Returns:
        A 1x1 DataFrame with column 'BBP'. The value is 1 in an up-trend when
        bear power is negative and rising, -1 in a down-trend when bull power
        is positive and falling, 0 otherwise.
    """
    closes = data['close']
    bull = data['high'] - closes.ewm(span=n).mean()
    bear = data['low'] - closes.ewm(span=n).mean()
    data['BBP'] = bull + bear

    last_close, reference_close = data['close'].iloc[-1], data['close'].iloc[-n]
    if last_close > reference_close:
        trend = 1
    elif last_close < reference_close:
        trend = -1
    else:
        trend = 0

    bear_now, bear_before = bear.iloc[-1], bear.iloc[-2]
    bull_now, bull_before = bull.iloc[-1], bull.iloc[-2]
    if trend == 1 and bear_now < 0 and bear_now > bear_before:
        verdict = 1
    elif trend == -1 and bull_now > 0 and bull_now < bull_before:
        verdict = -1
    else:
        verdict = 0
    return pd.DataFrame(np.array([[verdict]]), columns=['BBP'])
# 计算Ultimate Oscillator终极震荡指标UO (data,7, 14, 28),有4个参数,第1个是数据源,其他的是日期
def UO(data, n1, n2, n3):
    """Signal from the Ultimate Oscillator level.

    Buying pressure is ``close - min(low, previous close)`` and the true range
    is ``max(high, previous close) - min(low, previous close)``. The three
    rolling pressure/range ratios are combined with weights 4, 2 and 1.

    Args:
        data: DataFrame with 'high', 'low' and 'close' columns.
        n1: Shortest window (weight 4).
        n2: Middle window (weight 2).
        n3: Longest window (weight 1).

    Returns:
        A 1x1 DataFrame with column 'UO'. The value is 1 when the latest
        oscillator is above 70, -1 when it is below 30, 0 otherwise.
    """
    prev_close = data['close'].shift(1)
    # Row-wise min/max skip the NaN previous close on the first row.
    floor = pd.DataFrame({'low': data['low'], 'prev': prev_close}).min(axis=1)
    ceiling = pd.DataFrame({'high': data['high'], 'prev': prev_close}).max(axis=1)

    pressure = data['close'] - floor
    true_range = ceiling - floor

    def ratio(window):
        return pressure.rolling(window).sum() / true_range.rolling(window).sum()

    short_avg, mid_avg, long_avg = ratio(n1), ratio(n2), ratio(n3)
    oscillator = 100 * ((4 * short_avg) + (2 * mid_avg) + long_avg) / 7

    latest = oscillator.iloc[-1]
    if latest > 70:
        verdict = 1
    elif latest < 30:
        verdict = -1
    else:
        verdict = 0
    return pd.DataFrame(np.array([[verdict]]), columns=['UO'])
if __name__ == '__main__':
    # Demo: download the daily history of '000001' through the default branch
    # of get_stock_data, then print the raw data followed by the 10-period
    # MA and EMA signals.
    quotes = get_stock_data('000001', table_name=None)
    ma_signal = MA(quotes, 10)
    ema_signal = EMA(quotes, 10)
    for frame in (quotes, ma_signal, ema_signal):
        print(frame)
代码说明
上面代码共有 18 个自定义函数:
- def get_stock_data(symbol, table_name=None):获取日线历史行情数据,可以是指数、股票和基金。参数有 2 个,分别是证券代码和数据来源(数据来源用于选择行情接口,并在需要时给代码加上市场前缀),第 2 个为非必要参数。
- 其他 17 个自定义函数都是技术指标函数,包括 MA、EMA、ichimoku_cloud、VWMA、HullMA、RSI、STOK、CCI、ADX、AO、MTM、MACD_1、MACD_2、Stoch_RSI、WPR、BBP 和 UO。其中技术指标的参数 n 都是指标的计算周期(天数),比如 MA(data, 10)为 10 日均线指标。
需要修改的代码
根据自己的需要计算不同周期的技术指标,比如需要判断 10 日均线是否符合买入条件,可以用 MA(data, 10)获取判断结果。若返回结果为 1,则说明现价>10日均线;若为 -1,则说明现价<10日均线;若为 0,则说明两者相等或数据不足以计算均线。
#金融 #投资 #股票 #数据分析 #基金 #python量化
版权声明:
作者:余汉波
链接:https://www.sanrenjz.com/2023/06/26/python-%e9%87%8f%e5%8c%96%ef%bc%8c%e8%8e%b7%e5%8f%96%e9%87%8f%e5%8c%96%e6%a8%a1%e5%9e%8b%e4%b9%8b%e4%b8%80%e7%9a%84%e6%8a%80%e6%9c%af%e6%96%b9%e6%b3%95%ef%bc%8c17%e7%a7%8d%e6%8a%80%e6%9c%af%e6%8c%87/
文章版权归作者所有,未经允许请勿转载。
THE END