打地鼠策略:一种动态止盈或买入策略

下面代码描述了一个名为“打地鼠”的量化交易策略,主要用于股票市场的自动交易。它通过设定特定的止盈和买入条件,动态管理股票交易,以达到保护收益和寻找买入机会的目的。这里将详细解释代码的功能,并进一步阐述其在金融量化编程中的应用。

db_path = r'D:\wenjian\python\smart\data\guojin_account.db'

# 读取需要打地鼠的数据库中的持仓数据,并保存到数据表——与策略一起运行,与打地鼠策略无直接关系。
def get_filtered_data(db_path: str, table_names: list, output_table_name: str):
    """
    从SQLite数据库中读取多个表的数据,并筛选出对于每个'证券代码',
    所有'成交数量'乘以'买卖'的和大于0的行,并将合并结果保存到指定的数据表中。

    :param db_path: 数据库文件的路径。
    :param table_names: 数据表名称的列表。
    :param output_table_name: 输出数据表的名称。
    """
    try:
        # 创建到SQLite数据库的连接
        conn = sqlite3.connect(db_path)
        
        # 用于存储每个表筛选结果的DataFrame列表
        dfs = []

        for table_name in table_names:
            # 读取每个表的数据
            query = f"SELECT * FROM {table_name}"
            df = pd.read_sql_query(query, conn)

            # 计算每个'证券代码'的'成交数量'乘以'买卖'的总和
            df['product'] = df['成交数量'] * df['买卖']
            grouped = df.groupby('证券代码')['product'].sum().reset_index(name='持仓数量')

            # 筛选出总和大于0的'证券代码'
            filtered_codes = grouped[grouped['持仓数量'] > 0]['证券代码']

            # 从原始DataFrame中筛选出具有这些代码的行,并合并计算的总和
            filtered_df = df[df['证券代码'].isin(filtered_codes)]

            # 只保留需要的列
            final_df = filtered_df[['策略名称', '证券代码']].drop_duplicates().merge(grouped, on='证券代码')
            
            # 将筛选后的DataFrame添加到列表中
            dfs.append(final_df)
        
        # 合并所有表的结果
        merged_df = pd.concat(dfs, ignore_index=True)

        # 将合并后的结果保存到指定的数据表中
        merged_df.to_sql(output_table_name, conn, if_exists='replace', index=False)

    except Exception as e:
        print(f"An error occurred: {e}")
    finally:
        # 关闭数据库连接
        if conn:
            conn.close()

# 获取行情数据,并进行筛选,筛选出符合条件的标的
def get_snapshot(code_list: List[str]):
    # 获取标的快照数据
    df = xtdata.get_full_tick(code_list)
    df = DataFrame.from_dict(df).T.reset_index().rename(columns={'index': '证券代码'})

    # 计算标的均价
    bidPrice_columns = ['bid1', 'bid2', 'bid3', 'bid4', 'bid5']
    askPrice_columns = ['ask1', 'ask2', 'ask3', 'ask4', 'ask5']
    df[bidPrice_columns] = df['bidPrice'].apply(Series, index=bidPrice_columns)
    df[askPrice_columns] = df['askPrice'].apply(Series, index=askPrice_columns)

    # 对可能需要转换为float的列进行转换
    float_columns = ['bid1', 'bid2', 'bid3', 'bid4', 'bid5', 'ask1', 'ask2', 'ask3', 'ask4', 'ask5', 'amount', 'lastClose', 'volume']
    for col in float_columns:
        df[col] = pd.to_numeric(df[col], errors='coerce')

    df['averagePrice'] = (df['bid1'] + df['ask1']) / 2              # 求买1和卖1的平均价
    df.loc[(df['bid1'] == 0) | (df['ask1'] == 0), 'averagePrice'] = df['bid1'] + df['ask1'] # 涨跌停修正

    df.rename(columns={'averagePrice': 'close', 'lastClose': 'pre_close', 'volume': 'vol'}, inplace=True)
    df['amount'] = df['amount'] / 1e4
    df = df[(df.close != 0) & (df.high != 0)] # 现价大于1的标的

    # 计算衍生指标
    df['pct_chg'] = ((df.close / df.pre_close - 1) * 100)   # 今日涨跌幅(%)
    df['max_pct_chg'] = ((df.high / df.pre_close - 1) * 100)    # 最大涨跌幅(%)

    # 展示列,分别表示:代码、买1和卖1平均价、今日涨跌幅(%)、最大涨跌幅(%)、最高价、最低价、昨收价、成交量、成交额(万元)
    display_columns = ['证券代码', 'close', 'pct_chg', 'max_pct_chg', 'high', 'low', 'pre_close', 'vol', 'amount']
    df = df[display_columns]
    return df


# 读取指定数据库表中的'证券代码'列,获取对应的行情数据和持仓量,最后打印合并后的数据。
def process_and_merge_data(db_path: str, table_name: str, acc: str):
    """
    读取指定数据库表中的'证券代码'列,获取对应的行情数据和持仓量,
    并将行情数据与原始表数据以及持仓数据合并,最后打印合并后的数据。

    :param db_path: 数据库文件的路径。
    :param table_name: 读取证券代码的数据表名称。
    :param acc: 账户标识符。
    """
    try:
        # 连接到数据库
        conn = sqlite3.connect(db_path)

        # 从数据库读取证券代码
        query = f"SELECT 证券代码 FROM {table_name}"
        strategy_data = pd.read_sql_query(query, conn)

        # 重新读取完整的数据表
        complete_strategy_data = pd.read_sql_query(f"SELECT * FROM {table_name}", conn)

        # 获取行情数据
        codes = strategy_data['证券代码'].tolist()
        market_data = get_snapshot(codes)  # 确保这个函数返回一个包含'证券代码'列的DataFrame

        # 获取所有持仓信息
        positions = xt_trader.query_stock_positions(acc)  # 获取所有持仓
        positions_data = pd.DataFrame([{'证券代码': pos.stock_code, '持仓量': pos.volume} for pos in positions])

        # 合并策略数据和行情数据
        merged_data = pd.merge(complete_strategy_data, market_data, on='证券代码', how='left')
        # 再将持仓数据合并到已有的合并数据中
        final_merged_data = pd.merge(merged_data, positions_data, on='证券代码', how='left')

        # 打印合并后的数据
        # print(final_merged_data)
        return final_merged_data

    except Exception as e:
        print(f"An error occurred: {e}")
    finally:
        # 关闭数据库连接
        conn.close()



# 条件判断,并进行委托,参数为:数据库路径、数据库表名称、账户标识符、止盈条件、买入条件
def mole_hunting_delegation(db_path: str, table_name: str, acc: str, drawdown: float, active_thres: float, deal_thres: float):
    """
    读取指定数据库表中的'证券代码'列,获取对应的行情数据,
    并根据止盈条件和买入条件处理数据,最后返回处理后的数据。

    :param db_path: 数据库文件的路径。
    :param table_name: 读取证券代码的数据表名称。
    :param acc: 账户标识符。
    :param drawdown: 触发止盈的最大回撤跌幅。
    :param active_thres: 激活止盈的最大涨幅阈值。
    :param deal_thres: 执行止盈的最小涨幅阈值。
    :return: 处理后的DataFrame。
    """
    try:
        # 执行函数并获得DataFrame
        df = process_and_merge_data(db_path, table_name, acc)

        # 确保df不是None
        if df is None:
            raise ValueError("Returned DataFrame is None")

        # 判断是否触发止盈条件
        sell_cond = (df['max_pct_chg'] - df['pct_chg']) > drawdown   # 最大涨幅减去今日涨幅大于最大回撤跌幅,即触发上涨止盈后回撤多少止盈
        sell_cond &= df['max_pct_chg'] > active_thres   # 最大涨幅>多少点后,触发移动止盈
        sell_cond &= df['pct_chg'] > deal_thres  # 今日涨幅>多少点后,才可以止盈
        sell_cond &= df['持仓数量'] == df['持仓量']   # 假设只有满仓时才考虑止盈,持仓数量是计划持仓数量,持仓量是实际持仓数量
        sell_df = df[sell_cond]

        # 判断是否触发买入条件(这里需要您定义买入条件)
        buy_cond = (df['max_pct_chg'] - df['pct_chg']) > (drawdown+1.0)   # 最大涨幅减去今日涨幅大于最大回撤跌幅再加1%,以此来确保卖出后买入有1%的收益率
        buy_cond &= df['max_pct_chg'] > active_thres   # 最大涨幅>多少点后,触发移动止盈,只有触发移动止盈,即有了打地鼠之后再会有买回的考虑
        buy_cond &= (df['high'] + df['low'] + df['pre_close'])/3 > df['close']    # (最高价+最低价+昨收价)的平均价大于现价,即买入的价格在平均价上下
        buy_cond &= df['持仓数量'] > df['持仓量']   # 假设无持仓时才考虑买入
        buy_df = df[buy_cond]

        # 执行卖出操作
        if not sell_df.empty:
            sell_df['target_price'] = sell_df['close'] * 0.98
            sell_df['order_volume'] = sell_df['持仓数量']/2   # 假设卖出一半
            sell_df['remark'] = '动态止盈单'
            for index, row in sell_df.iterrows():
                xt_trader.order_stock(acc, row['证券代码'], xtconstant.STOCK_SELL, row['order_volume'], xtconstant.FIX_PRICE, row['target_price'], row['策略名称'], row['remark'])

        # 执行买入操作
        if not buy_df.empty:
            buy_df['target_price'] = buy_df['close'] * 1.02  # 假设以当前价格的102%买入
            buy_df['order_volume'] = buy_df['持仓数量'] - buy_df['持仓量']   # 假设卖出一半
            buy_df['remark'] = '动态买入单'
            for index, row in buy_df.iterrows():
                xt_trader.order_stock(acc, row['证券代码'], xtconstant.STOCK_BUY, row['order_volume'], xtconstant.FIX_PRICE, row['target_price'], row['策略名称'], row['remark'])
        print('打地鼠策略执行完毕')
    except Exception as e:
        print(f"发生错误: {e}")

1. 策略概述

“打地鼠”策略是一种基于动态止盈和买入条件的交易策略。它的核心在于监控股票的价格变动,并在满足特定条件时执行买卖操作。该策略通过设置止盈条件来保护投资者的利润,并在价格合适时买入股票,从而在控制风险的同时寻求收益。

2. 策略实现

策略的实现依赖于Python编程。首先,策略从指定的数据库中读取股票代码,并获取这些股票的实时行情数据。然后,根据预设的止盈和买入条件,对数据进行处理,最后返回处理结果。

3. 止盈条件

止盈条件是指当股票价格达到一定涨幅后,为了保护收益而执行的卖出操作。在本策略中,止盈条件包括三个部分:最大涨幅减去当日涨幅大于预设的最大回撤跌幅、最大涨幅超过激活止盈的阈值、当日涨幅超过执行止盈的最小涨幅阈值。

4. 买入条件

买入条件是指在股价下跌或合适的价格时买入股票。在这个策略中,买入条件包括:最大涨幅减去当日涨幅大于最大回撤跌幅加1%、股票的平均价格(最高价、最低价和前一天收盘价的平均)大于当前价格、股票的持仓量小于预设的持仓数量。

5. 代码实现细节

代码中,首先从数据库中读取股票代码和相关数据,然后根据设定的条件判断是否满足止盈或买入条件。满足止盈条件时,会执行卖出操作,将股票以略低于当前价格的价格卖出;满足买入条件时,则会执行买入操作,以略高于当前价格的价格买入股票。

6. 策略的优势与挑战

这种策略的优势在于能够根据市场情况动态调整交易决策,有助于保护投资者的利润,并在合适的时机进行买入。然而,它也面临着市场波动性大、预测不准确等挑战。

THE END