打地鼠策略:一种动态止盈或买入策略
下面代码描述了一个名为“打地鼠”的量化交易策略,主要用于股票市场的自动交易。它通过设定特定的止盈和买入条件,动态管理股票交易,以达到保护收益和寻找买入机会的目的。这里将详细解释代码的功能,并进一步阐述其在金融量化编程中的应用。
db_path = r'D:\wenjian\python\smart\data\guojin_account.db'
# 读取需要打地鼠的数据库中的持仓数据,并保存到数据表——与策略一起运行,与打地鼠策略无直接关系。
def get_filtered_data(db_path: str, table_names: list, output_table_name: str):
"""
从SQLite数据库中读取多个表的数据,并筛选出对于每个'证券代码',
所有'成交数量'乘以'买卖'的和大于0的行,并将合并结果保存到指定的数据表中。
:param db_path: 数据库文件的路径。
:param table_names: 数据表名称的列表。
:param output_table_name: 输出数据表的名称。
"""
try:
# 创建到SQLite数据库的连接
conn = sqlite3.connect(db_path)
# 用于存储每个表筛选结果的DataFrame列表
dfs = []
for table_name in table_names:
# 读取每个表的数据
query = f"SELECT * FROM {table_name}"
df = pd.read_sql_query(query, conn)
# 计算每个'证券代码'的'成交数量'乘以'买卖'的总和
df['product'] = df['成交数量'] * df['买卖']
grouped = df.groupby('证券代码')['product'].sum().reset_index(name='持仓数量')
# 筛选出总和大于0的'证券代码'
filtered_codes = grouped[grouped['持仓数量'] > 0]['证券代码']
# 从原始DataFrame中筛选出具有这些代码的行,并合并计算的总和
filtered_df = df[df['证券代码'].isin(filtered_codes)]
# 只保留需要的列
final_df = filtered_df[['策略名称', '证券代码']].drop_duplicates().merge(grouped, on='证券代码')
# 将筛选后的DataFrame添加到列表中
dfs.append(final_df)
# 合并所有表的结果
merged_df = pd.concat(dfs, ignore_index=True)
# 将合并后的结果保存到指定的数据表中
merged_df.to_sql(output_table_name, conn, if_exists='replace', index=False)
except Exception as e:
print(f"An error occurred: {e}")
finally:
# 关闭数据库连接
if conn:
conn.close()
# 获取行情数据,并进行筛选,筛选出符合条件的标的
def get_snapshot(code_list: List[str]):
# 获取标的快照数据
df = xtdata.get_full_tick(code_list)
df = DataFrame.from_dict(df).T.reset_index().rename(columns={'index': '证券代码'})
# 计算标的均价
bidPrice_columns = ['bid1', 'bid2', 'bid3', 'bid4', 'bid5']
askPrice_columns = ['ask1', 'ask2', 'ask3', 'ask4', 'ask5']
df[bidPrice_columns] = df['bidPrice'].apply(Series, index=bidPrice_columns)
df[askPrice_columns] = df['askPrice'].apply(Series, index=askPrice_columns)
# 对可能需要转换为float的列进行转换
float_columns = ['bid1', 'bid2', 'bid3', 'bid4', 'bid5', 'ask1', 'ask2', 'ask3', 'ask4', 'ask5', 'amount', 'lastClose', 'volume']
for col in float_columns:
df[col] = pd.to_numeric(df[col], errors='coerce')
df['averagePrice'] = (df['bid1'] + df['ask1']) / 2 # 求买1和卖1的平均价
df.loc[(df['bid1'] == 0) | (df['ask1'] == 0), 'averagePrice'] = df['bid1'] + df['ask1'] # 涨跌停修正
df.rename(columns={'averagePrice': 'close', 'lastClose': 'pre_close', 'volume': 'vol'}, inplace=True)
df['amount'] = df['amount'] / 1e4
df = df[(df.close != 0) & (df.high != 0)] # 现价大于1的标的
# 计算衍生指标
df['pct_chg'] = ((df.close / df.pre_close - 1) * 100) # 今日涨跌幅(%)
df['max_pct_chg'] = ((df.high / df.pre_close - 1) * 100) # 最大涨跌幅(%)
# 展示列,分别表示:代码、买1和卖1平均价、今日涨跌幅(%)、最大涨跌幅(%)、最高价、最低价、昨收价、成交量、成交额(万元)
display_columns = ['证券代码', 'close', 'pct_chg', 'max_pct_chg', 'high', 'low', 'pre_close', 'vol', 'amount']
df = df[display_columns]
return df
# 读取指定数据库表中的'证券代码'列,获取对应的行情数据和持仓量,最后打印合并后的数据。
def process_and_merge_data(db_path: str, table_name: str, acc: str):
"""
读取指定数据库表中的'证券代码'列,获取对应的行情数据和持仓量,
并将行情数据与原始表数据以及持仓数据合并,最后打印合并后的数据。
:param db_path: 数据库文件的路径。
:param table_name: 读取证券代码的数据表名称。
:param acc: 账户标识符。
"""
try:
# 连接到数据库
conn = sqlite3.connect(db_path)
# 从数据库读取证券代码
query = f"SELECT 证券代码 FROM {table_name}"
strategy_data = pd.read_sql_query(query, conn)
# 重新读取完整的数据表
complete_strategy_data = pd.read_sql_query(f"SELECT * FROM {table_name}", conn)
# 获取行情数据
codes = strategy_data['证券代码'].tolist()
market_data = get_snapshot(codes) # 确保这个函数返回一个包含'证券代码'列的DataFrame
# 获取所有持仓信息
positions = xt_trader.query_stock_positions(acc) # 获取所有持仓
positions_data = pd.DataFrame([{'证券代码': pos.stock_code, '持仓量': pos.volume} for pos in positions])
# 合并策略数据和行情数据
merged_data = pd.merge(complete_strategy_data, market_data, on='证券代码', how='left')
# 再将持仓数据合并到已有的合并数据中
final_merged_data = pd.merge(merged_data, positions_data, on='证券代码', how='left')
# 打印合并后的数据
# print(final_merged_data)
return final_merged_data
except Exception as e:
print(f"An error occurred: {e}")
finally:
# 关闭数据库连接
conn.close()
# 条件判断,并进行委托,参数为:数据库路径、数据库表名称、账户标识符、止盈条件、买入条件
def mole_hunting_delegation(db_path: str, table_name: str, acc: str, drawdown: float, active_thres: float, deal_thres: float):
"""
读取指定数据库表中的'证券代码'列,获取对应的行情数据,
并根据止盈条件和买入条件处理数据,最后返回处理后的数据。
:param db_path: 数据库文件的路径。
:param table_name: 读取证券代码的数据表名称。
:param acc: 账户标识符。
:param drawdown: 触发止盈的最大回撤跌幅。
:param active_thres: 激活止盈的最大涨幅阈值。
:param deal_thres: 执行止盈的最小涨幅阈值。
:return: 处理后的DataFrame。
"""
try:
# 执行函数并获得DataFrame
df = process_and_merge_data(db_path, table_name, acc)
# 确保df不是None
if df is None:
raise ValueError("Returned DataFrame is None")
# 判断是否触发止盈条件
sell_cond = (df['max_pct_chg'] - df['pct_chg']) > drawdown # 最大涨幅减去今日涨幅大于最大回撤跌幅,即触发上涨止盈后回撤多少止盈
sell_cond &= df['max_pct_chg'] > active_thres # 最大涨幅>多少点后,触发移动止盈
sell_cond &= df['pct_chg'] > deal_thres # 今日涨幅>多少点后,才可以止盈
sell_cond &= df['持仓数量'] == df['持仓量'] # 假设只有满仓时才考虑止盈,持仓数量是计划持仓数量,持仓量是实际持仓数量
sell_df = df[sell_cond]
# 判断是否触发买入条件(这里需要您定义买入条件)
buy_cond = (df['max_pct_chg'] - df['pct_chg']) > (drawdown+1.0) # 最大涨幅减去今日涨幅大于最大回撤跌幅再加1%,以此来确保卖出后买入有1%的收益率
buy_cond &= df['max_pct_chg'] > active_thres # 最大涨幅>多少点后,触发移动止盈,只有触发移动止盈,即有了打地鼠之后再会有买回的考虑
buy_cond &= (df['high'] + df['low'] + df['pre_close'])/3 > df['close'] # (最高价+最低价+昨收价)的平均价大于现价,即买入的价格在平均价上下
buy_cond &= df['持仓数量'] > df['持仓量'] # 假设无持仓时才考虑买入
buy_df = df[buy_cond]
# 执行卖出操作
if not sell_df.empty:
sell_df['target_price'] = sell_df['close'] * 0.98
sell_df['order_volume'] = sell_df['持仓数量']/2 # 假设卖出一半
sell_df['remark'] = '动态止盈单'
for index, row in sell_df.iterrows():
xt_trader.order_stock(acc, row['证券代码'], xtconstant.STOCK_SELL, row['order_volume'], xtconstant.FIX_PRICE, row['target_price'], row['策略名称'], row['remark'])
# 执行买入操作
if not buy_df.empty:
buy_df['target_price'] = buy_df['close'] * 1.02 # 假设以当前价格的102%买入
buy_df['order_volume'] = buy_df['持仓数量'] - buy_df['持仓量'] # 假设卖出一半
buy_df['remark'] = '动态买入单'
for index, row in buy_df.iterrows():
xt_trader.order_stock(acc, row['证券代码'], xtconstant.STOCK_BUY, row['order_volume'], xtconstant.FIX_PRICE, row['target_price'], row['策略名称'], row['remark'])
print('打地鼠策略执行完毕')
except Exception as e:
print(f"发生错误: {e}")
1. 策略概述
“打地鼠”策略是一种基于动态止盈和买入条件的交易策略。它的核心在于监控股票的价格变动,并在满足特定条件时执行买卖操作。该策略通过设置止盈条件来保护投资者的利润,并在价格合适时买入股票,从而在控制风险的同时寻求收益。
2. 策略实现
策略的实现依赖于Python编程。首先,策略从指定的数据库中读取股票代码,并获取这些股票的实时行情数据。然后,根据预设的止盈和买入条件,对数据进行处理,最后返回处理结果。
3. 止盈条件
止盈条件是指当股票价格达到一定涨幅后,为了保护收益而执行的卖出操作。在本策略中,止盈条件包括三个部分:最大涨幅减去当日涨幅大于预设的最大回撤跌幅、最大涨幅超过激活止盈的阈值、当日涨幅超过执行止盈的最小涨幅阈值。
4. 买入条件
买入条件是指在股价下跌或合适的价格时买入股票。在这个策略中,买入条件包括:最大涨幅减去当日涨幅大于最大回撤跌幅加1%、股票的平均价格(最高价、最低价和前一天收盘价的平均)大于当前价格、股票的持仓量小于预设的持仓数量。
5. 代码实现细节
代码中,首先从数据库中读取股票代码和相关数据,然后根据设定的条件判断是否满足止盈或买入条件。满足止盈条件时,会执行卖出操作,将股票以略低于当前价格的价格卖出;满足买入条件时,则会执行买入操作,以略高于当前价格的价格买入股票。
6. 策略的优势与挑战
这种策略的优势在于能够根据市场情况动态调整交易决策,有助于保护投资者的利润,并在合适的时机进行买入。然而,它也面临着市场波动性大、预测不准确等挑战。
版权声明:
作者:余汉波
链接:https://www.sanrenjz.com/2024/01/29/%e6%89%93%e5%9c%b0%e9%bc%a0%e7%ad%96%e7%95%a5%ef%bc%9a%e4%b8%80%e7%a7%8d%e5%8a%a8%e6%80%81%e6%ad%a2%e7%9b%88%e6%88%96%e4%b9%b0%e5%85%a5%e7%ad%96%e7%95%a5/
文章版权归作者所有,未经允许请勿转载。
THE END