python 量化,获取量化模型之一的技术方法,17种技术指标——量化 06

17 种技术指标代码

import pandas as pd
import numpy as np
from scipy import stats
from datetime import datetime, timedelta
import akshare as ak
import datetime

# 获取日线历史行情
def get_stock_data(symbol, table_name=None):

    today = datetime.date.today()
    end_date_str = today.strftime("%Y%m%d")
    start_date = (today - datetime.timedelta(days=7 * 365)).strftime("%Y%m%d")

    if table_name == "美股中国企业价值":
        symbol = "106." + symbol
        stock_data = ak.stock_us_hist(
            symbol=symbol, period="daily", start_date=start_date, end_date=end_date_str, adjust="qfq"
        )
    elif table_name == "纳指100价值":
        symbol = "105." + symbol
        stock_data = ak.stock_us_hist(
            symbol=symbol, period="daily", start_date=start_date, end_date=end_date_str, adjust="qfq"
        )
    elif table_name == "港股价值":
        stock_data = ak.stock_hk_hist(
            symbol=symbol, period="daily", start_date=start_date, end_date=end_date_str, adjust="qfq"
        )
    elif table_name == "场内基金价值":
        stock_data = ak.index_zh_a_hist(
            symbol=symbol, period="daily", start_date=start_date, end_date=end_date_str
        )
    else:
        if symbol.startswith(("1", "5")):
            stock_data = ak.fund_etf_hist_em(
                symbol=symbol, period="daily", start_date=start_date, end_date=end_date_str, adjust="qfq"
            )
        else:
            stock_data = ak.stock_zh_a_hist(
                symbol=symbol, period="daily", start_date=start_date, end_date=end_date_str, adjust="qfq"
            )

    return stock_data
# 获取简单移动平均线,参数有2个,一个是数据源,一个是日期
def MA(data, n):
    MA = pd.Series(data['收盘'].rolling(n).mean(), name='MA_' + str(n)).iloc[-1]
    close = data['收盘'].iloc[-1]
    signal = np.where(MA < close, 1, np.where(MA > close, -1, 0))
    return pd.DataFrame(signal.reshape(1, -1), columns=['MA_' + str(n)]) 

# 获取指数移动平均线,参数有2个,一个是数据源,一个是日期
def EMA(data, n):
    EMA = pd.Series(data['收盘'].ewm(span=n, min_periods=n).mean(), name='EMA_' + str(n)).iloc[-1]
    close = data['收盘'].iloc[-1]
    signal = np.where(EMA < close, 1, np.where(EMA > close, -1, 0))
    return pd.DataFrame(signal.reshape(1, -1), columns=['EMA_' + str(n)])

# 获取一目均衡表基准线 (data, conversion_periods, base_periods, lagging_span2_periods, displacement)
# 参数有5个,第一个是数据源,其他4个分别是一目均衡表基准线 (9, 26, 52, 26),即ichimoku_cloud(data,9, 26, 52, 26)
def ichimoku_cloud(data, conversion_periods, base_periods, lagging_span2_periods, displacement):
    def donchian(length):
        return (data['high'].rolling(length).max() + data['low'].rolling(length).min()) / 2
    
    conversion_line = donchian(conversion_periods)
    base_line = donchian(base_periods)
    lead_line1 = (conversion_line + base_line) / 2
    lead_line2 = donchian(lagging_span2_periods)
    
    # lagging_span = data['close'].shift(-displacement + 1).shift(25).ffill()
    # leading_span_a = lead_line1.shift(displacement - 1)
    # leading_span_b = lead_line2.shift(displacement - 1)
    
    # ichimoku_data = pd.concat([conversion_line, base_line, lagging_span, lead_line1, lead_line2], axis=1)
    # ichimoku_data.columns = ['Conversion Line', 'Base Line', 'Lagging Span', 'lead_line1', 'lead_line2']
    # price = data['close'].iloc[-1]
    base = base_line.iloc[-1]
    conversion = conversion_line.iloc[-1]
    lead1 = lead_line1.iloc[-1]
    lead2 = lead_line2.iloc[-1]
    conversion_prev = conversion_line.shift(1).iloc[-1]
    
    close = data['close'].iloc[-1]
    signal = np.where((base < close) & (conversion_prev < close) & (conversion > close) & (lead1 > close) & (lead1 > lead2), 1,
                      np.where((base > close) & (conversion_prev > close) & (conversion < close) & (lead1 < close) & (lead1 < lead2), -1, 0))
    
    return pd.DataFrame(signal.reshape(1, -1), columns=['Ichimoku Cloud'])

# 成交量加权移动平均线 VWMA (data, 20),参数有2个,1个是数据源,另一个是日期,通过为20
def VWMA(data, n):
    VWMA = pd.Series((data['close'] * data['volume']).rolling(n).sum() / data['volume'].rolling(n).sum(), name='VWMA_' + str(n)).iloc[-1]
    close = data['close'].iloc[-1]
    signal = np.where(VWMA < close, 1, np.where(VWMA > close, -1, 0))
    return pd.DataFrame(signal.reshape(1, -1), columns=['VWMA_' + str(n)])

# 计算Hull MA船体移动平均线 Hull MA (data,9),参数有2,一个是数据源,另一个是日期,一般为9
def HullMA(data, n=9):
    def wma(series, period):
        weights = np.arange(1, period + 1)
        return series.rolling(period).apply(lambda x: np.dot(x, weights) / weights.sum(), raw=True)
    
    source = data['close']
    wma1 = wma(source, n // 2) * 2
    wma2 = wma(source, n)
    hullma = wma(wma1 - wma2, int(math.floor(math.sqrt(n)))).iloc[-1]
    close = data['close'].iloc[-1]
    signal = np.where(hullma < close, 1, np.where(hullma > close, -1, 0))
    return pd.DataFrame(signal.reshape(1, -1), columns=['HullMA_' + str(n)])

# 计算RSI指标,参数有2,一个为数据源,另一个为日期,一般为14,即RSI(data, 14)
def RSI(data, n):
    lc = data['close'].shift(1)
    diff = data['close'] - lc
    up = diff.where(diff > 0, 0)
    down = -diff.where(diff < 0, 0)
    ema_up = up.ewm(alpha=1/n, adjust=False).mean()
    ema_down = down.ewm(alpha=1/n, adjust=False).mean()
    rs = ema_up / ema_down
    rsi = 100 - 100 / (1 + rs)
    rsi_plus = rsi.iloc[-1]
    rsi_prev = rsi.iloc[-2]
    signal = np.where((rsi_plus < 30) & (rsi_plus > rsi_prev), 1, np.where((rsi_plus > 70) & (rsi_plus < rsi_prev), -1, 0))
    return pd.DataFrame(signal.reshape(1, -1), columns=['RSI_' + str(n)])

# 计算Stochastic,k是主线,d_signal是信号线,参数有4,一个是数据源,另外三个为日期,一般为STOK(data, 14, 3, 3)
def STOK(data, n, m, t):
    high = data['high'].rolling(n).max()
    low = data['low'].rolling(n).min()
    k = 100 * (data['close'] - low) / (high - low)
    d = k.rolling(m).mean()
    d_signal = d.rolling(t).mean()
    data['%K'] = k
    data['%D'] = d
    data['%D_signal'] = d_signal
    main_line = data['%K'].iloc[-1]
    signal_line = data['%D_signal'].iloc[-1]
    main_line_prev = data['%K'].iloc[-2]
    signal_line_prev = data['%D_signal'].iloc[-2]
    signal = np.where((main_line < 20) & (main_line_prev < signal_line_prev) & (main_line > signal_line), 1, np.where((main_line > 80) & (main_line_prev > signal_line_prev) & (main_line < signal_line), -1, 0))
    return pd.DataFrame(signal.reshape(1, -1), columns=['STOK1'])

# 计算 CCI 指标,参数有 2,一个是数据源,另一个是日期,一般为 20,即 CCI(data, 20)
def CCI(data, n):
    TP = (data['high'] + data['low'] + data['close']) / 3
    MA = TP.rolling(n).mean()
    MD = TP.rolling(n).apply(lambda x: np.abs(x - x.mean()).mean())
    CCI = (TP - MA) / (0.015 * MD)
    CCI_plus = CCI.iloc[-1]
    CCI_prev = CCI.iloc[-2]
    signal = np.where((CCI_plus < -100) & (CCI_plus > CCI_prev), 1, np.where((CCI_plus > 100) & (CCI_plus < CCI_prev), -1, 0))
    return pd.DataFrame(signal.reshape(1, -1), columns=['CCI_' + str(n)])


# 平均趋向指数ADX(14),参数有2,一个是数据源,另一个是日期,一般为14,即ADX(data,14)
def ADX(data, n):
    up = data['high'] - data['high'].shift(1)
    down = data['low'].shift(1) - data['low']
    plusDM = pd.Series(np.where((up > down) & (up > 0), up, 0))
    minusDM = pd.Series(np.where((down > up) & (down > 0), down, 0))
    truerange = np.maximum(data['high'] - data['low'], np.maximum(np.abs(data['high'] - data['close'].shift()), np.abs(data['low'] - data['close'].shift())))
    plus = 100 * plusDM.ewm(alpha=1/n, min_periods=n).mean() / truerange.ewm(alpha=1/n, min_periods=n).mean()
    minus = 100 * minusDM.ewm(alpha=1/n, min_periods=n).mean() / truerange.ewm(alpha=1/n, min_periods=n).mean()
    sum = plus + minus
    adx = 100 * (np.abs(plus - minus) / np.where(sum == 0, 1, sum)).ewm(alpha=1/n, min_periods=n).mean()
    adx_plusDI = adx.iloc[-1] > 20 and plus.iloc[-1] > minus.iloc[-1] and plus.iloc[-2] < minus.iloc[-2]
    adx_minusDI = adx.iloc[-1] > 20 and plus.iloc[-1] < minus.iloc[-1] and plus.iloc[-2] > minus.iloc[-2]
    signal = np.where(adx_plusDI, 1, np.where(adx_minusDI, -1, 0))
    return pd.DataFrame(signal.reshape(1, -1), columns=['ADX'])

# 计算动量震荡指标(AO),参数只有一个,即数据源
def AO(data):
    data['AO'] = (data['high'].rolling(5).mean() + data['low'].rolling(5).mean()) / 2 - (data['high'].rolling(34).mean() + data['low'].rolling(34).mean()) / 2
    AO = data['AO'].iloc[-3:]
    AO_plus = (AO.iloc[0] < AO.iloc[1]) and (AO.iloc[1] < AO.iloc[2]) and (AO.iloc[2] > 0)
    AO_minus = (AO.iloc[0] > AO.iloc[1]) and (AO.iloc[1] > AO.iloc[2]) and (AO.iloc[2] < 0)
    signal = np.where(AO_plus, 1, np.where(AO_minus, -1, 0))
    return pd.DataFrame(signal.reshape(1, -1), columns=['AO'])

# 计算动量指标(10),参数只有一个,即数据源
def MTM(data):
    data['MTM'] = data['close'] - data['close'].shift(10)
    MTM = data['MTM'].iloc[-2:]
    signal = np.where(MTM.iloc[0] < MTM.iloc[1], 1, np.where(MTM.iloc[0] > MTM.iloc[1], -1, 0))
    return pd.DataFrame(signal.reshape(1, -1), columns=['MTM'])

# MACD_1是以金叉和死叉进行判断,参数有3个,第一个是数据源,其余两个为日期,一般取12和26,即MACD(data, 12,26)
def MACD_1(data, n_fast, n_slow):
    EMAfast = data['close'].ewm(span=n_fast, min_periods=n_slow).mean()
    EMAslow = data['close'].ewm(span=n_slow, min_periods=n_slow).mean()
    data['MACD'] = EMAfast - EMAslow
    data['MACDsignal'] = data['MACD'].ewm(span=9, min_periods=9).mean()
    data['MACDhist'] = data['MACD'] - data['MACDsignal']
    signal = np.where(data['MACDhist'].iloc[-2] < 0 and data['MACDhist'].iloc[-1] > 0, 1, np.where(data['MACDhist'].iloc[-2] > 0 and data['MACDhist'].iloc[-1] < 0, -1, 0))
    return pd.DataFrame(signal.reshape(1, -1), columns=['MACD_1'])

# MACD_2是以MACD柱是否大于0进行判断,tradingview的判断依据
def MACD_2(data, n_fast, n_slow):
    EMAfast = data['close'].ewm(span=n_fast, min_periods=n_slow).mean()
    EMAslow = data['close'].ewm(span=n_slow, min_periods=n_slow).mean()
    data['MACD'] = EMAfast - EMAslow
    data['MACDsignal'] = data['MACD'].ewm(span=9, min_periods=9).mean()
    data['MACDhist'] = data['MACD'] - data['MACDsignal']
    signal = np.where(data['MACDhist'].iloc[-1] > 0, 1, np.where(data['MACDhist'].iloc[-1] < 0, -1, 0))
    return pd.DataFrame(signal.reshape(1, -1), columns=['MACD_2'])

# 计算Stoch_RSI(data,3, 3, 14, 14),有5个参数,第1个为数据源
def Stoch_RSI(data, smoothK=3, smoothD=3, lengthRSI=14, lengthStoch=14):
    # 计算RSI
    lc = data['close'].shift(1)
    diff = data['close'] - lc
    up = diff.where(diff > 0, 0)
    down = -diff.where(diff < 0, 0)
    ema_up = up.ewm(alpha=1/lengthRSI, adjust=False).mean()
    ema_down = down.ewm(alpha=1/lengthRSI, adjust=False).mean()
    rs = ema_up / ema_down
    rsi = 100 - 100 / (1 + rs)
    # 计算Stochastic
    stoch = (rsi - rsi.rolling(window=lengthStoch).min()) / (rsi.rolling(window=lengthStoch).max() - rsi.rolling(window=lengthStoch).min())
    k = stoch.rolling(window=smoothK).mean()
    d = k.rolling(window=smoothD).mean()
    # 添加到data中
    data['Stoch_RSI_K'] = k * 100
    data['Stoch_RSI_D'] = d * 100
    trend = np.where(data['close'].iloc[-1] > data['close'].iloc[-10], 1, -1)
    signal = np.where(trend == 1 and data['Stoch_RSI_K'].iloc[-1] < 20 and data['Stoch_RSI_D'].iloc[-1] < 20 and data['Stoch_RSI_K'].iloc[-1] > data['Stoch_RSI_D'].iloc[-1] and data['Stoch_RSI_K'].iloc[-2] < data['Stoch_RSI_D'].iloc[-2], 1, np.where(trend == -1 and data['Stoch_RSI_K'].iloc[-1] > 80 and data['Stoch_RSI_D'].iloc[-1] > 80 and data['Stoch_RSI_K'].iloc[-1] < data['Stoch_RSI_D'].iloc[-1] and data['Stoch_RSI_K'].iloc[-2] > data['Stoch_RSI_D'].iloc[-2], -1, 0))
    return pd.DataFrame(signal.reshape(1, -1), columns=['Stoch_RSI'])

# 计算威廉百分比变动,参数有2,第1是数据源,第二是日期,一般为14,即WPR(data, 14)
def WPR(data, n):
    WPR = pd.Series((data['high'].rolling(n).max() - data['close']) / (data['high'].rolling(n).max() - data['low'].rolling(n).min()) * -100, name='WPR_' + str(n))
    lower_band = -80
    upper_band = -20
    signal = np.where((WPR.iloc[-1] < lower_band) & (WPR.iloc[-1] > WPR.iloc[-2]), 1, np.where((WPR.iloc[-1] > upper_band) & (WPR.iloc[-1] < WPR.iloc[-2]), -1, 0))
    return pd.DataFrame(signal.reshape(1, -1), columns=['WPR_' + str(n)])

# 计算Bull Bear Power牛熊力量(BBP),参数有2,一个是数据源,另一个是日期,一般为20,但在tradingview取13,即BBP(data, 13)
def BBP(data, n):
    bullPower = data['high'] - data['close'].ewm(span=n).mean()
    bearPower = data['low'] - data['close'].ewm(span=n).mean()
    BBP = bullPower + bearPower
    data['BBP'] = BBP
    trend = np.where(data['close'].iloc[-1] > data['close'].iloc[-n], 1, np.where(data['close'].iloc[-1] < data['close'].iloc[-n], -1, 0))
    signal = np.where((trend == 1) & (bearPower.iloc[-1] < 0) & (bearPower.iloc[-1] > bearPower.iloc[-2]), 1, np.where((trend == -1) & (bullPower.iloc[-1] > 0) & (bullPower.iloc[-1] < bullPower.iloc[-2]), -1, 0))
    return pd.DataFrame(signal.reshape(1, -1), columns=['BBP'])

# 计算Ultimate Oscillator终极震荡指标UO (data,7, 14, 28),有4个参数,第1个是数据源,其他的是日期
def UO(data, n1, n2, n3):
    min_low_or_close = pd.concat([data['low'], data['close'].shift(1)], axis=1).min(axis=1)
    max_high_or_close = pd.concat([data['high'], data['close'].shift(1)], axis=1).max(axis=1)
    bp = data['close'] - min_low_or_close
    tr_ = max_high_or_close - min_low_or_close
    avg7 = bp.rolling(n1).sum() / tr_.rolling(n1).sum()
    avg14 = bp.rolling(n2).sum() / tr_.rolling(n2).sum()
    avg28 = bp.rolling(n3).sum() / tr_.rolling(n3).sum()
    UO = 100 * ((4 * avg7) + (2 * avg14) + avg28) / 7
    signal = np.where(UO.iloc[-1] > 70, 1, np.where(UO.iloc[-1] < 30, -1, 0))
    return pd.DataFrame(signal.reshape(1, -1), columns=['UO'])
    

if __name__ == '__main__':
    # MA(get_stock_data(symbol, table_name=None), n)
    data = get_stock_data('000001', table_name=None)
    data1 = MA(data, 10)
    data2 = EMA(data, 10)
    print(data)
    print(data1)
    print(data2)

代码说明

上面代码共有 18 个自定义函数:

  1. def get_stock_data(symbol, table_name=None):获取日线历史行情数据,可以是指数、股票和基金。参数有 2 个,分别是代码和数据来源对代码增加市场,第 2 个为非必要参数。
  2. 其他 17 个自定义函数都是技术指标函数,包括 MA、IC、VWMA、HullMA、RSI、STOK、CCI、ADX、AO、MTM、MACD_1、MACD_2、Stoch_RSI、WPR、BBP 和 UO。其中技术指标的参数 n 都是技术的日期,比如 MA(data, 10)为 10 日均线指标。

需要修改的代码

根据自己的需要求不同日期的技术指标,比如需要判断 10 日均线是否符合买入,可以用 MA(data, 10)获取 10 日均线是否符合。若返回结果为 1,则说明现价>10日均线。

#金融 #投资 #股票 #数据分析 #基金 #python量化

THE END