可转债轮动策略:5年25倍,年化收益率91.35%
该策略是我目前运行的另一个可转债策略,经过 6 万多次调试之后,收益相对较好的策略。当时回测的三年年化收益率结果是 119.42%,最大回撤率为 11.82%。
而近一年的表现相对较差,年化收益率收益率在 21.66%,最大回撤率也因 1 月份市场狂跌,扩大到 15.70%。

不过,从最近 5 年的回测结果来看,年化收益率高达 91.35%,相较于转债等权基准,还有 73.62%的超额收益率。

交易代码
#coding=utf-8
from xtquant.xttrader import XtQuantTrader, XtQuantTraderCallback
from xtquant.xttype import StockAccount
from xtquant import xtconstant
from xtquant import xtdata
from typing import List
from pandas import Series, DataFrame
from datetime import datetime, timedelta
import sqlite3
import pandas as pd
import random
import os
import numpy as np
import schedule
import time
import send_email as se
import convertible_bond_rotation_strategy as cbrs
# 时间的转换和格式化函数
def convert_time(unix_timestamp):
traded_time_utc = datetime.utcfromtimestamp(unix_timestamp) # 注意这里改为 datetime.utcfromtimestamp
traded_time_beijing = traded_time_utc + datetime.timedelta(hours=8) # 注意这里改为 datetime.timedelta
return traded_time_beijing.strftime('%Y-%m-%d %H:%M:%S')
# 证券资产查询并保存到数据库manage_assets
def save_stock_asset(asset, db_path):
try:
# 连接到SQLite数据库
conn = sqlite3.connect(db_path)
# 创建DataFrame
data = {
'账户类型': [asset.account_type],
'资金账号': [asset.account_id],
'现金': [asset.cash],
'冻结现金': [asset.frozen_cash],
'市值': [asset.market_value],
'总资产': [asset.total_asset]
}
df = pd.DataFrame(data)
# 将DataFrame写入数据库,替换现有数据
df.to_sql('manage_assets', conn, if_exists='replace', index=False)
conn.close()
# 打印资金变动推送
# print("资金变动推送")
# print(asset.account_type, asset.account_id, asset.cash, asset.frozen_cash, asset.market_value, asset.total_asset)
except Exception as e:
print("An error occurred:", e)
# 获取持仓数据并保存到数据库account_holdings
def save_positions(positions, db_path):
try:
# 连接到SQLite数据库
conn = sqlite3.connect(db_path)
# 创建DataFrame
data = {
'账户类型': [position.account_type for position in positions],
'资金账号': [position.account_id for position in positions],
'证券代码': [position.stock_code for position in positions],
'持仓数量': [position.volume for position in positions],
'可用数量': [position.can_use_volume for position in positions],
'平均建仓成本': [position.open_price for position in positions],
'市值': [position.market_value for position in positions]
}
df = pd.DataFrame(data)
# 将DataFrame写入数据库,替换现有数据
df.to_sql('account_holdings', conn, if_exists='replace', index=False)
conn.close()
except Exception as e:
print("An error occurred:", e)
# 查询当日委托并保存到数据库daily_orders
def save_daily_orders(orders, db_path):
try:
# 连接到SQLite数据库
conn = sqlite3.connect(db_path)
# 创建DataFrame
data = {
'账户类型': [order.account_type for order in orders],
'资金账号': [order.account_id for order in orders],
'证券代码': [order.stock_code for order in orders],
'委托类型': [order.order_type for order in orders],
'订单编号': [order.order_id for order in orders],
'报单时间': [convert_time(order.order_time) for order in orders],
'委托价格': [order.price for order in orders],
'委托数量': [order.order_volume for order in orders],
'报价类型': [order.price_type for order in orders],
'委托状态': [order.order_status for order in orders],
'柜台合同编号': [order.order_sysid for order in orders],
'策略名称': [order.strategy_name for order in orders],
'委托备注': [order.order_remark for order in orders]
}
df = pd.DataFrame(data)
# 将DataFrame写入数据库,替换现有数据
df.to_sql('daily_orders', conn, if_exists='replace', index=False)
conn.close()
except Exception as e:
print("An error occurred:", e)
# 假设 orders 是一个包含所有订单信息的列表
# save_daily_orders(orders, db_path)
# 查询当日成交并保存到数据库daily_trades
def save_daily_trades(trades, db_path):
try:
# 连接到SQLite数据库
conn = sqlite3.connect(db_path)
# 创建DataFrame
data = {
'账户类型': [trade.account_type for trade in trades],
'资金账号': [trade.account_id for trade in trades],
'证券代码': [trade.stock_code for trade in trades],
'委托类型': [trade.order_type for trade in trades],
'成交编号': [trade.traded_id for trade in trades],
'成交时间': [convert_time(trade.traded_time) for trade in trades],
'成交均价': [trade.traded_price for trade in trades],
'成交数量': [trade.traded_volume for trade in trades],
'成交金额': [trade.traded_amount for trade in trades],
'订单编号': [trade.order_id for trade in trades],
'柜台合同编号': [trade.order_sysid for trade in trades],
'策略名称': [trade.strategy_name for trade in trades],
'委托备注': [trade.order_remark for trade in trades]
}
df = pd.DataFrame(data)
# 将DataFrame写入数据库,替换现有数据
df.to_sql('daily_trades', conn, if_exists='replace', index=False)
conn.close()
except Exception as e:
print("An error occurred:", e)
# 假设 trades 是一个包含所有交易信息的列表
# save_daily_trades(trades, db_path)
# 查询除策略外的其他持仓,并将它保存到数据表other_positions
def calculate_remaining_holdings(db_path, strategy_tables):
try:
# 连接到SQLite数据库
conn = sqlite3.connect(db_path)
# 从account_holdings获取所有的持仓,并确保数据类型正确
account_holdings = pd.read_sql('SELECT * FROM account_holdings', conn).set_index('证券代码')
account_holdings['持仓数量'] = account_holdings['持仓数量'].astype(int)
# 对于每个策略的成交数量表,计算每种证券的总成交数量(考虑买卖方向)并从account_holdings中减去
for table in strategy_tables:
strategy_data = pd.read_sql(f'SELECT 证券代码, SUM(成交数量 * 买卖) as total FROM {table} GROUP BY 证券代码', conn).set_index('证券代码')
strategy_data['total'] = strategy_data['total'].astype(int)
account_holdings['持仓数量'] -= account_holdings.join(strategy_data, how='left')['total'].fillna(0).astype(int)
# 只保留持仓数量不等于0的记录
account_holdings = account_holdings[account_holdings['持仓数量'] != 0].reset_index()
# 将结果存储到新的数据表other_positions
account_holdings.to_sql('other_positions', conn, if_exists='replace', index=False)
except Exception as e:
print(f"出现错误: {e}")
finally:
# 无论是否出现异常,都关闭数据库连接
conn.close()
# 证券委托,参数分别是:数据库路径、委托数据表名称、成交数据表名称、判断处函数前缀(不改动)、账号(不改动)
# 注:报价类型主要有xtconstant.FIX_PRICE(限价)、xtconstant.LATEST_PRICE(最新介)、xtconstant.MARKET_PEER_PRICE_FIRST(对手方最优)、xtconstant.MARKET_MINE_PRICE_FIRST(本方最优)
def place_orders(db_path, table_name, trade_table_name, xt_trader, acc):
try:
# 连接到SQLite数据库
conn = sqlite3.connect(db_path)
# 从数据库读取交易数据
frame = pd.read_sql(f'SELECT * FROM {table_name}', conn)
# 如果frame为空,跳过后续所有操作
if frame.empty:
print("没有需要处理的交易数据")
return # 退出函数
# 检查连接结果
connect_result = xt_trader.connect()
if connect_result == 0:
print('连接成功')
# 查询可撤单的订单,连接成功后执行
orders = xt_trader.query_stock_orders(acc, True)
# 遍历交易数据,执行交易操作
for i, row in frame.iterrows():
stock_code = row['证券代码']
order_type = xtconstant.STOCK_BUY if row['买卖'] == 1 else xtconstant.STOCK_SELL
order_volume = row['委托数量']
price = row['委托价格']
# 根据'委托价格'设置price_type,当委托价格为0时用最新价交易,非0以限价交易
if price == 0:
price_type = xtconstant.LATEST_PRICE
else:
price_type = xtconstant.FIX_PRICE
strategy_name = row['策略名称']
order_remark = row['委托备注']
# 检查是否需要撤单
for order in orders:
if order.stock_code == stock_code:
# 撤单
cancel_result = xt_trader.cancel_order_stock(acc, order.order_id)
if cancel_result == 0:
print(f"撤单成功:{order.order_id}")
else:
print(f"撤单失败:{order.order_id}")
# 买入操作
if order_type == xtconstant.STOCK_BUY:
asset = xt_trader.query_stock_asset(acc)
if asset.cash < order_volume * price:
print(f"跳过买入 {stock_code},现金不足")
continue
# 卖出操作
if order_type == xtconstant.STOCK_SELL:
# 从指定的trade_table_name表中获取该股票的可用数量,并求和
query = f"SELECT SUM(成交数量 * 买卖) FROM {trade_table_name} WHERE 证券代码 = '{stock_code}'"
available_volume = pd.read_sql_query(query, conn).iloc[0, 0]
if available_volume is None or available_volume <= 0:
print(f"跳过卖出 {stock_code},没有持仓或可用数量不足")
continue
if available_volume < order_volume:
print(f"{stock_code} 可用数量不足,将卖出剩余 {available_volume} 股")
order_volume = available_volume
elif available_volume >= order_volume:
print(f"{stock_code} 可用数量充足,将卖出 {order_volume} 股")
# 尝试同步报单,异步报单需要改成xt_trader.order_stock_async
try:
order_id = xt_trader.order_stock(acc, stock_code, order_type, order_volume, price_type, price, strategy_name, order_remark)
print(order_id, stock_code, order_type, order_volume, price)
except Exception as e:
error_message = f"报单操作失败,原因:{str(e)}"
se.send_other_email(error_message)
print(f"报单操作失败,原因:{e}")
continue
else:
print('连接失败')
except Exception as e:
print(f"执行过程中出现错误:{e}")
finally:
# 无论是否出现异常,都关闭数据库连接
conn.close()
# 使用示例,参数分别是:数据库路径、委托数据表名称、成交数据表名称、判断处函数前缀(不改动)、账号(不改动)
# place_orders(db_path, 'your_table_name', 'your_trade_table_name', xt_trader, acc)
# 定义qmt推送的类
class MyXtQuantTraderCallback(XtQuantTraderCallback):
gj_db_path = r'E:\smart\data\guojin_account.db'
# 资金变动推送 注意,该回调函数目前不生效
def on_stock_asset(self, asset):
try:
# 使用上下文管理器连接到SQLite数据库
with sqlite3.connect(self.gj_db_path) as conn:
# 创建DataFrame
data = {
'账户类型': [asset.account_type],
'资金账号': [asset.account_id],
'现金': [asset.cash],
'冻结现金': [asset.frozen_cash],
'市值': [asset.market_value],
'总资产': [asset.total_asset]
}
df = pd.DataFrame(data)
# 将DataFrame写入数据库,替换现有数据
df.to_sql('manage_assets', conn, if_exists='replace', index=False)
# 打印资金变动推送
print("资金变动推送")
print(asset.account_type, asset.account_id, asset.cash, asset.frozen_cash, asset.market_value, asset.total_asset)
except Exception as e:
print("出现错误:", e)
# 成交变动推送,每增加一个策略都要往里增加保存数据表的名称
def on_stock_trade(self, trade):
try:
# 使用上下文管理器连接到SQLite数据库
with sqlite3.connect(self.gj_db_path) as conn:
# 成交变动推送
buy_sell = 1 if trade.order_type == 23 else -1 if trade.order_type == 24 else 0
values = (
trade.account_id, # 资金账号
trade.order_id, # 订单编号
trade.strategy_name, # 策略名称
trade.order_remark, # 委托备注
convert_time(trade.traded_time), # 成交时间
trade.order_type, # 委托类型
trade.stock_code, # 证券代码
trade.traded_price, # 成交均价
trade.traded_volume, # 成交数量
trade.traded_amount, # 成交金额
buy_sell # 买卖
)
# 使用策略名称与表名的映射字典简化选择插入的表的代码
strategy_to_table = {
"卡玛比率策略": "execute_calmar_ratio_trade"
}
table_name = strategy_to_table.get(trade.strategy_name, "unallocated_transaction")
if table_name == "unallocated_transaction":
return
insert_query = f"""
INSERT INTO {table_name} (资金账号, 订单编号, 策略名称, 委托备注, 成交时间, 委托类型, 证券代码, 成交均价, 成交数量, 成交金额, 买卖)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
"""
conn.execute(insert_query, values)
conn.commit()
except Exception as e:
print("出现错误:", e)
# 持仓变动推送 注意,该回调函数目前不生效
def on_stock_position(self, positions):
try:
# 使用上下文管理器连接到SQLite数据库
with sqlite3.connect(self.gj_db_path) as conn:
# 准备插入的数据
values = [(position.account_type, # 账户类型
position.account_id, # 资金账号
position.stock_code, # 证券代码
position.volume, # 持仓数量
position.can_use_volume, # 可用数量
position.open_price, # 平均建仓成本
position.market_value # 市值
) for position in positions]
# 插入到account_holdings表
insert_query = """
INSERT INTO account_holdings (账户类型, 资金账号, 证券代码, 持仓数量, 可用数量, 平均建仓成本, 市值)
VALUES (?, ?, ?, ?, ?, ?, ?)
"""
conn.executemany(insert_query, values)
conn.commit()
except Exception as e:
print("出现错误:", e)
def on_disconnected(self):
"""
连接断开
:return:
"""
print("connection lost, 交易接口断开,即将重连")
global xt_trader
xt_trader = None
def on_stock_order(self, order):
"""
委托回报推送
:param order: XtOrder对象
:return:
"""
print("委托回报推送")
print(order.stock_code, order.order_status, order.order_sysid)
def on_order_error(self, order_error):
"""
委托失败推送
:param order_error:XtOrderError 对象
:return:
"""
print("委托失败推送")
print(order_error.order_id, order_error.error_id, order_error.error_msg)
def on_cancel_error(self, cancel_error):
"""
撤单失败推送
:param cancel_error: XtCancelError 对象
:return:
"""
print("撤单失败推送")
print(cancel_error.order_id, cancel_error.error_id, cancel_error.error_msg)
def on_order_stock_async_response(self, response):
"""
异步下单回报推送
:param response: XtOrderResponse 对象
:return:
"""
print("异步下单回报推送")
print(response.account_id, response.order_id, response.seq)
# 账户状态
def on_account_status(self, status):
"""
:param response: XtAccountStatus 对象
:return:
"""
print("账户状态,类型2为“证券账户”,状态:-1-无效;0-正常;4-初始化") # http://docs.thinktrader.net/vip/pages/198696/#%E8%B4%A6%E5%8F%B7%E7%8A%B6%E6%80%81-account-status
print(status.account_id, status.account_type, status.status)
# 保存证券资产、委托、成交和持仓数据到数据库
def save_daily_data(xt_trader, acc, db_path):
"""
保存当日的持仓、委托和成交数据到数据库
:param xt_trader: 交易对象
:param acc: 账户信息
:param db_path: 数据库路径
"""
try:
# 查询证券资产
asset = xt_trader.query_stock_asset(acc)
if asset:
save_stock_asset(asset, db_path)
print("证券资产查询保存成功,可用资金:", asset.cash)
except Exception as e:
print("保存证券资产时出现错误:", e)
try:
# 查询当日所有的持仓
positions = xt_trader.query_stock_positions(acc)
if len(positions) != 0:
save_positions(positions, db_path)
print(f"持仓保存成功, 持仓数据: {len(positions)}")
except Exception as e:
print("保存持仓数据时出现错误:", e)
try:
# 查询当日所有的委托
orders = xt_trader.query_stock_orders(acc)
if len(orders) != 0:
save_daily_orders(orders, db_path)
print(f"当日委托保存成功, 委托数据: {len(orders)}")
except Exception as e:
print("保存委托数据时出现错误:", e)
try:
# 查询当日所有的成交
trades = xt_trader.query_stock_trades(acc)
if len(trades) != 0:
save_daily_trades(trades, db_path)
print(f"当日成交保存成功, 成交数据: {len(trades)}")
except Exception as e:
print("保存成交数据时出现错误:", e)
# 对委托没有成交的进行撤单操作
def cancel_all_orders(xt_trader, acc):
try:
orders = xt_trader.query_stock_orders(acc, True)
# 提取订单ID
order_ids_strings = [order.order_id for order in orders]
# 如果有订单,则尝试撤销
if len(orders) != 0:
print(f"查询到的订单ID: {order_ids_strings}")
# 遍历订单ID并撤销
for order in order_ids_strings:
xt_trader.cancel_order_stock(acc, int(order))
print(f"订单ID {order} 已成功撤销。")
print("所有订单已成功撤销。")
else:
print("没有订单需要撤销。")
except Exception as e:
print("撤销订单时出现错误:", e)
# 开盘后必要运行,获取每天只需获取一次的数据,需修改为自己数据库的路径
def daily_mandatory_execution():
db_path = r'E:\smart\data\guojin_account.db'
# 获取满足强赎的可转债,用于剔除,每天运行一次。
cbrs.filter_bond_cb_redeem_data_and_save_to_db()
# 查询除策略外的其他持仓,并将它保存到数据表other_positions
strategy_tables = ['execute_sortino_ratio_trade', 'execute_calmar_ratio_trade']
calculate_remaining_holdings(db_path, strategy_tables)
print("其他持仓数据俽数据成功")
# 卡玛比率策略
def calmar_strategy():
cbrs.run_calmar_strategy(db_path)
cbrs.process_fund_data(db_path, 40, "calmar_ratio_top_3", "place_calmar_ratio_order", "execute_calmar_ratio_trade", "卡玛比率策略")
print("提取委托卡玛策略完成")
place_orders(db_path, 'place_calmar_ratio_order', 'execute_calmar_ratio_trade', xt_trader, acc)
print("卡玛策略委托完成")
# 周一到周五运行函数
def run_weekdays_at(time_str, function):
schedule.every().monday.at(time_str).do(function)
schedule.every().tuesday.at(time_str).do(function)
schedule.every().wednesday.at(time_str).do(function)
schedule.every().thursday.at(time_str).do(function)
schedule.every().friday.at(time_str).do(function)
# 主函数,用于设定所有计划任务
def schedule_jobs():
# 在"10:15", "14:35"执行calmar_strategy任务,运行可转债策略
for time_str in ["10:15", "14:35"]:
run_weekdays_at(time_str, calmar_strategy)
while True:
schedule.run_pending()
current_time = time.strftime("%H:%M", time.localtime())
# 检查是否到达或超过15:30,如果是,则退出循环
if current_time >= "15:30":
print("已经15:30,退出委托程序。")
break
# 每3秒检查一次
time.sleep(3)
if __name__ == "__main__":
xt_trader = None
while True:
try:
# 获取当前时间
current_time = time.strftime("%H:%M", time.localtime())
# 如果在9:00至16:00之间
if "09:00" <= current_time <= "16:00":
db_path = r'E:\smart\data\guojin_account.db'
path = r'E:\投资理财\国金QMT\国金证券QMT交易端\userdata_mini'
session_id = int(random.randint(100000, 999999))
xt_trader = XtQuantTrader(path, session_id)
acc = StockAccount('888133225')
callback = MyXtQuantTraderCallback()
xt_trader.register_callback(callback)
xt_trader.start()
connect_result = xt_trader.connect()
if connect_result != 0:
print('连接失败,程序即将重试')
else:
subscribe_result = xt_trader.subscribe(acc)
if subscribe_result != 0:
print('账号订阅失败 %d' % subscribe_result)
asset = xt_trader.query_stock_asset(acc)
if asset:
save_stock_asset(asset, db_path)
print("证券资产查询保存成功,可用资金:", asset.cash)
schedule_jobs()
# 内部循环,检查连接状态并尝试重新连接
start_time = datetime.now()
while xt_trader is not None:
# 检查是否超过了某个时间限制,例如30分钟
if (datetime.now() - start_time).total_seconds() > 1800:
break
# 每30秒检查一次时间
time.sleep(30)
current_time = datetime.now().strftime("%H:%M")
if current_time > "16:00":
break
else:
print("当前时间不在运行时段内")
if xt_trader is not None:
xt_trader = None # 设置为None以便于后续重新连接
time.sleep(3600) # 睡眠一小时后再次检查
except Exception as e:
print(f"运行过程中发生错误: {e}")
if xt_trader is not None:
xt_trader = None
time.sleep(1800) # 如果遇到异常,休息半小时再试
if current_time > "16:00":
break
代码目的和功能
提供的代码是一个量化交易策略,用于在 QMT 平台上执行可转债投资策略。具体来说,它包含了一系列函数和任务,用于:
- 从数据库保存和检索证券资产、委托、成交和持仓数据
- 处理可转债数据并识别可交易机会
- 生成交易委托,包括买入和卖出策略
- 撤销未成交的委托
- 安排定时任务以自动执行交易策略
代码结构和组织方式
代码被组织成以下主要模块:
- **数据库操作函数:**这些函数用于保存和检索数据,例如证券资产、委托、成交和持仓。
- **策略函数:**这些函数包含可转债策略的逻辑,包括识别可交易机会和生成交易委托。
- **交易函数:**这些函数用于执行交易操作,例如下委托和撤销委托。
- **定时任务函数:**这些函数用于安排定时任务,以自动执行策略在工作日特定时间的行为。
算法和数据结构
代码使用以下算法和数据结构:
- **可转债筛选算法:**该算法用于筛选满足强赎条件的可转债,并将其剔除交易机会。
- **卡玛比率排序算法:**该算法用于对可转债进行排序,以便于优先考虑具有较高收益风险比的债券。
- **委托队列:**该数据结构用于存储待执行的交易委托。
- **数据表:**代码使用 SQLite 数据库来存储数据,例如策略委托、成交和持仓。
复杂或不寻常的方面
代码中一个复杂且不寻常的方面是它对多个策略的灵活支持能力。代码允许用户配置不同的策略函数,以便于在不同的市场条件下执行不同的交易策略。
下载或阅读完整内容为付费内容,金额为:888.8
该内容与微信公众号的付费阅读和本站点的“付费阅读”绑定:
- 公众号的付费阅读可以直接获得下载或阅读内容,关注微信公众号:余汉波-文章视频-付费阅读,找到对应的内容,或跳转至:可转债轮动策略:5年25倍,年化收益率91.35%,
- 扫描打赏二维码,打赏指定金额,截图+标题发送至邮箱(yuhanbo@sanrenjz.com),或发送到微信(yuhanbo758),等待回复的付费阅读密码:5年25倍,年化收益率91.35%的可转债轮动策略 | 余汉波 文档

版权声明:
作者:余汉波
链接:https://www.sanrenjz.com/2024/11/27/%e5%8f%af%e8%bd%ac%e5%80%ba%e8%bd%ae%e5%8a%a8%e7%ad%96%e7%95%a5%ef%bc%9a5%e5%b9%b425%e5%80%8d%ef%bc%8c%e5%b9%b4%e5%8c%96%e6%94%b6%e7%9b%8a%e7%8e%8791-35/
文章版权归作者所有,未经允许请勿转载。
THE END