小QMT数据获取下载,持仓和全推连接测试
bili 视频:
github 开源地址:yuhanbo758/qmt_connect_test: mini QMT连接测试,与下载行情数据
程序一键下载: 下载
本文档旨在为具有一定技术背景但非编程专家的读者,详细解释 qmt_download_and_connect_test.py 文件的功能、结构、算法和潜在问题。该文件是一个集成了 QMT 交易接口连接、行情数据下载、实时数据获取等功能的 GUI 应用程序。
1. 代码概述
该 Python 脚本使用 tkinter 库创建图形用户界面,xtquant 库连接 QMT 交易接口,并使用多线程、队列等技术实现异步操作和数据处理。其主要功能包括:
- QMT 连接管理: 连接 QMT 交易接口,查询账户资产和持仓。
- 历史数据下载: 下载股票的历史行情数据,包括 Tick 数据、分钟线、日线等。
- 实时行情监控: 获取股票的实时行情数据,并进行涨跌幅监控和预警。
- 配置文件管理: 使用 JSON 格式的配置文件保存用户设置,并支持自动保存和重置。
2. 代码结构与组织方式
该脚本主要由以下几个类和函数组成:
QMTDataDownloadGUI 类包含了大量的函数,用于实现 GUI 界面的各个功能,例如:
- QMTTraderCallback: QMT 交易回调类,用于处理 QMT 交易接口的事件回调,如连接断开、账户状态更新等。
- QMTDataDownloadGUI: GUI 主类,负责创建和管理 GUI 界面,处理用户交互,并调用 QMT 接口进行数据操作。
- main(): 主函数,用于创建 GUI 应用程序并启动主循环。
- GUI 初始化: __init__() 函数初始化 GUI 界面,包括设置窗口标题、大小、图标,创建各种 GUI 组件,以及加载配置文件。
- GUI 组件创建: create_widgets() 函数创建 GUI 组件,包括标签页、按钮、文本框、列表框等。
- QMT 连接管理: connect_qmt(), disconnect_qmt(), query_assets(), query_positions() 函数用于连接和断开 QMT 交易接口,以及查询账户资产和持仓。
- 历史数据下载: download_single_stock(), save_data(), check_existing_data() 函数用于下载和保存股票的历史行情数据。
- 实时行情监控: get_latest_price(), subscribe_realtime(), unsubscribe_realtime(), start_fullpush_monitor(), stop_fullpush_monitor(), process_fullpush_data() 函数用于获取股票的实时行情数据,并进行涨跌幅监控和预警。
- 配置文件管理: load_config(), save_config(), reset_config(), auto_save_config(), open_config_file() 函数用于加载、保存、重置和打开配置文件。
- 日志管理: log(), process_log_queue() 函数用于记录和显示程序运行日志。

3. 核心算法与数据结构
3.1 多线程与队列
该脚本使用了多线程和队列技术来实现异步操作和数据处理。threading 模块用于创建和管理线程,queue 模块用于创建线程间通信的队列。
- 异步操作: QMT 交易接口的连接、数据下载、实时行情获取等操作都是在单独的线程中进行的,以避免阻塞 GUI 主线程,保证 GUI 界面的响应性。
- 线程间通信: GUI 主线程和数据处理线程之间通过队列进行通信。例如,log() 函数将日志消息放入 log_queue 队列中,process_log_queue() 函数从队列中取出消息并显示在 GUI 界面上。
3.2 历史数据增量下载
check_existing_data() 函数用于检查本地是否已存在历史数据,并根据 incremental_var 变量的值,决定是否进行增量下载。
- CSV 文件: 如果保存格式为 CSV 文件,该函数读取 CSV 文件的最后一行,获取最新日期,并计算下一个需要下载的日期。
- SQLite 数据库: 如果保存格式为 SQLite 数据库,该函数查询数据库中的最新日期,并计算下一个需要下载的日期。
3.3 全推数据监控
start_fullpush_monitor() 函数用于启动全推数据监控,process_fullpush_data() 函数用于处理全推数据。
- 全推数据: QMT 交易接口可以推送全市场的行情数据,包括股票代码、最新价格、涨跌幅等。
- 涨跌幅监控: process_fullpush_data() 函数根据用户设置的涨跌幅阈值,对全推数据进行监控,并发出预警。
- 声音预警: 如果启用声音预警,当股票的涨跌幅超过阈值时,play_alert_sound() 函数会播放系统提示音或自定义音效。

4. 潜在限制与改进建议
4.1 错误处理
该脚本的错误处理机制相对简单,主要通过 try...except 块捕获异常,并将错误信息显示在 GUI 界面的日志区域。
- 改进建议: 可以考虑增加更详细的错误处理机制,例如:
4.2 性能优化
该脚本在处理大量数据时可能会出现性能问题,例如:
- 全推数据监控: 全推数据量非常大,如果监控的股票数量过多,可能会导致 CPU 占用率过高,GUI 界面卡顿。
- 批量下载: 批量下载股票数据时,如果股票数量过多,可能会导致下载速度过慢。
- 改进建议: 可以考虑以下性能优化措施:
4.3 用户体验
该脚本的 GUI 界面相对简单,用户体验还有提升空间。
- 改进建议: 可以考虑以下用户体验改进措施:
5. 编程语言与库
该脚本使用 Python 编程语言,主要使用了以下库:
- tkinter: 用于创建 GUI 界面。
- xtquant: 用于连接 QMT 交易接口。
- threading: 用于创建和管理线程。
- queue: 用于创建线程间通信的队列。
- os: 用于进行文件和目录操作。
- json: 用于读写 JSON 格式的配置文件。
- sqlite3: 用于读写 SQLite 数据库。
- pandas: 用于处理数据,例如读取 CSV 文件。
- datetime: 用于处理日期和时间。
- time: 用于进行时间操作,例如睡眠。
- winsound: 用于播放系统提示音。
- markdown: 用于渲染 Markdown 文件。
- html2text: 用于将 HTML 转换为纯文本。
- subprocess: 用于执行系统命令,例如打开配置文件。
6. 代码流程图
graph TD
A[开始] --> B{检查QMT库是否可用};
B -- 可用 --> C[初始化GUI界面];
B -- 不可用 --> D[输出警告信息];
C --> E{创建GUI组件};
E --> F[加载配置文件];
F --> G[绑定变量变化事件];
G --> H[启动日志处理];
H --> I[绑定窗口关闭事件];
I --> J[进入主循环];
J --> K{用户交互};
K --> L{连接QMT};
K --> M{下载数据};
K --> N{实时监控};
L --> O{连接成功};
L --> P{连接失败};
M --> Q{下载完成};
M --> R{下载失败};
N --> S{监控启动};
N --> T{监控停止};
S --> U{预警触发};
U --> V[播放声音];
V --> S;
T --> N;
Q --> K;
R --> K;
O --> K;
P --> K;
D --> C;
style D fill:#f9f,stroke:#333,stroke-width:2px
7.测试代码
QMT账号连接代码:
#coding=utf-8
from xtquant.xttrader import XtQuantTrader, XtQuantTraderCallback
from xtquant.xttype import StockAccount
import random
import time
# 定义回调类
class MyXtQuantTraderCallback(XtQuantTraderCallback):
# 连接断开时的回调
def on_disconnected(self):
"""
连接断开
:return:
"""
print("连接断开,交易接口断开,即将重连")
global xt_trader
xt_trader = None
# 账户状态回调
def on_account_status(self, status):
"""
:param status: XtAccountStatus 对象
:return:
"""
print(f"账户状态: 账号={status.account_id}, 类型={status.account_type}, 状态={status.status}")
if __name__ == '__main__':
# 全局变量
xt_trader = None
print("开始连接交易接口...")
while True:
try:
# 基本配置
path = r'D:\jiaoyi\江海证券QMT模拟交易端\userdata_mini'
acc = StockAccount('18014745')
# 初始化交易接口
session_id = int(random.randint(100000, 999999))
if xt_trader is None:
xt_trader = XtQuantTrader(path, session_id)
xt_trader.start()
# 连接交易接口
connect_result = xt_trader.connect()
if connect_result != 0:
print('连接失败,程序将重试')
time.sleep(5) # 等待5秒后重试
continue
# 连接成功,注册回调并订阅账户
print('连接成功,正在订阅账户...')
callback = MyXtQuantTraderCallback()
xt_trader.register_callback(callback)
subscribe_result = xt_trader.subscribe(acc)
if subscribe_result != 0:
print(f'账号订阅失败: {subscribe_result}')
time.sleep(5) # 等待5秒后重试
continue
# 账户订阅成功,查询资产
print('账户订阅成功!')
# 查询资产信息
asset = xt_trader.query_stock_asset(acc)
if asset:
print(f"账户资产信息: 现金={asset.cash}, 冻结资金={asset.frozen_cash}, 市值={asset.market_value}, 总资产={asset.total_asset}")
# 查询并打印持仓信息
positions = xt_trader.query_stock_positions(acc)
if positions:
print("\n持仓信息:")
for position in positions:
print(f"证券代码: {position.stock_code}, 持仓数量: {position.volume}, 可用数量: {position.can_use_volume}, 成本价: {position.open_price:.2f}, 市值: {position.market_value:.2f}")
else:
print("当前没有持仓")
# 保持连接
print("连接已建立并保持中...")
while xt_trader:
time.sleep(10) # 每10秒检查一次连接状态
except Exception as e:
print(f"运行过程中发生错误: {e}")
# 遇到错误等待5秒后重试
time.sleep(5)
# 如果交易对象存在,尝试重新连接
if xt_trader:
try:
xt_trader = None
except:
pass
数据获取测试代码:
from xtquant import xtdata
xtdata.enable_hello = False
import pandas as pd
from datetime import datetime, timedelta
# 获取股票历史行情数据,比如日线,分钟线等
def get_stock_history_data(stock_code, period='1d', start_time=None, end_time=None):
"""
获取股票历史行情数据
Args:
stock_code: 股票代码,如'000001.SZ'
period: 周期,'1d'为日线,'1m'为1分钟线
start_time: 开始时间,格式'20240301'
end_time: 结束时间,格式'20240314'
Returns:
DataFrame格式的行情数据
"""
# 如果没有指定时间,默认获取近10个交易日的数据
if not start_time or not end_time:
end_time = datetime.now().strftime('%Y%m%d')
start_time = (datetime.now() - timedelta(days=7*365)).strftime('%Y%m%d')
# 下载历史数据
xtdata.download_history_data(stock_code, period=period, start_time=start_time)
# 获取历史行情数据
fields = ['time', 'open', 'high', 'low', 'close', 'volume', 'amount']
market_data = xtdata.get_market_data(field_list=fields,
stock_list=[stock_code],
period=period,
start_time=start_time,
end_time=end_time)
if not market_data:
print(f"未获取到{stock_code}的数据")
return None
# 转换成DataFrame格式
df = pd.DataFrame()
for field in fields:
if field in market_data:
df[field] = market_data[field].values[0] # 使用values[0]获取数据
# 将time列转换为日期时间格式
if 'time' in df.columns:
df['time'] = pd.to_datetime(df['time'], unit='ms')
return df
# 获取股票实时行情tick数据和最新价格
def get_stock_data(codes):
"""
获取股票实时行情数据和最新价格
Args:
codes: 股票代码列表,如 ['000001.SZ']
Returns:
market_data: 行情数据字典
df: 行情数据DataFrame格式
last_price: 第一个股票代码的最新价格
"""
market_data = xtdata.get_full_tick(codes)
print(market_data)
df = pd.DataFrame.from_dict(market_data, orient='index').reset_index().rename(columns={'index': '证券代码'})
last_price = market_data[codes[0]]['lastPrice']
print(market_data)
print(f"{codes[0]}的最新价格是: {last_price}")
return market_data, df, last_price
if __name__ == "__main__":
stock_code = '000001.SZ'
# 获取日线数据
# print("获取近10日日线行情:")
daily_data = get_stock_history_data(stock_code, period='1d')
if daily_data is not None:
print(daily_data)
print("\n")
# 获取1分钟线数据
start_time = '20241101' # 指定具体的开始时间
end_time = '20241108' # 指定具体的结束时间
print(f"获取{start_time}至{end_time}的1分钟行情:")
min_data = get_stock_history_data(stock_code, period='1m', start_time=start_time, end_time=end_time)
if min_data is not None:
print(f"获取到的1分钟行情数据条数: {len(min_data)}")
print(min_data)
# 示例使用
codes = ['000001.SZ']
market_data, df, last_price = get_stock_data(codes)
print(market_data)
版权声明:
作者:余汉波
链接:https://www.sanrenjz.com/2025/09/23/%e5%b0%8fqmt%e6%95%b0%e6%8d%ae%e8%8e%b7%e5%8f%96%e4%b8%8b%e8%bd%bd%ef%bc%8c%e6%8c%81%e4%bb%93%e5%92%8c%e5%85%a8%e6%8e%a8%e8%bf%9e%e6%8e%a5%e6%b5%8b%e8%af%95/
文章版权归作者所有,未经允许请勿转载。
THE END