当前位置: 首页 > article >正文

使用WebSocket实时获取印度股票数据源(无调用次数限制)实战


使用WebSocket实时获取印度股票数据源(无调用次数限制)实战


一、前置准备

1. 获取API密钥

登录 StockTV开发者平台 → 联系客服获取测试Key(格式MY4b781f618e3f43c4b055f25fa61941ad),该密钥无调用次数限制且支持实时数据持续订阅。

2. 安装Python依赖

pip install websocket-client json pandas

二、核心代码实现

1. 建立WebSocket连接

import websocket
import json
import timeAPI_KEY = "YOUR_API_KEY"
WS_URL = f"wss://ws-api.stocktv.top/connect?key={API_KEY}"def on_message(ws, message):"""处理实时行情推送"""data = json.loads(message)if data.get('type') == 'stock':print(f"[{data['symbol']}] 价格: {data['last']} 涨跌幅: {data['pcp']}%")def on_error(ws, error):print(f"连接异常: {error}")def on_close(ws, close_status_code, close_msg):print(f"连接关闭: {close_msg}")def on_open(ws):"""连接成功后订阅股票"""subscribe_msg = json.dumps({"action": "subscribe","symbols": ["RELIANCE", "NSEI"]  # 印度信实工业/Nifty50指数})ws.send(subscribe_msg)print("订阅成功,开始接收实时数据...")

2. 启动实时监听(含自动重连)

def start_websocket():while True:try:ws = websocket.WebSocketApp(WS_URL,on_message=on_message,on_error=on_error,on_close=on_close)ws.on_open = on_openws.run_forever()except Exception as e:print(f"连接异常,5秒后重连: {str(e)}")time.sleep(5)# 启动线程持续运行
import threading
threading.Thread(target=start_websocket, daemon=True).start()

3. 添加心跳机制(保持长连接)

def send_heartbeat(ws):"""每30秒发送心跳包"""while True:try:ws.send(json.dumps({"action": "ping"}))time.sleep(30)except Exception as e:break# 在on_open函数中启动心跳线程
def on_open(ws):# ...原有订阅代码...threading.Thread(target=send_heartbeat, args=(ws,), daemon=True).start()

三、实时数据示例输出

订阅成功,开始接收实时数据...
[RELIANCE] 价格: 2856.15 涨跌幅: +1.23%
[NSEI] 价格: 22985.40 涨跌幅: +0.75%
[RELIANCE] 价格: 2857.80 涨跌幅: +1.35% 

四、关键参数说明

字段说明示例值
symbol股票/指数代码RELIANCE, NSEI
last最新成交价2856.15
pcp涨跌幅百分比(自动带±号)+1.23%
volume成交量(股)1254875
timestamp数据时间戳(Unix毫秒级)1725002394123

五、注意事项

  1. 连接稳定性
    通过自动重连机制+心跳包保障7×24小时持续运行

  2. 数据时效性
    印度市场交易时段为IST 9:15-15:30(北京时间11:45-18:00),非交易时段无实时数据推送

  3. 性能优化
    建议使用异步处理框架(如asyncio)避免数据堆积,实测单连接可承载100+标的实时推送


http://www.lryc.cn/news/2403496.html

相关文章:

  • 阿里140 补环境日志
  • uniapp map组件的基础与实践
  • 在 Kali 上打造渗透测试专用的 VSCode 环境
  • 《前端面试题:CSS3新特性》
  • 极速互联·智控未来——SG-Can(FD)Hub-600 六通道CANFD集线器
  • OpenVINO环境配置--OpenVINO安装
  • Linux top 命令 的使用总结
  • ajax学习手册
  • Python爬虫实战:研究urlunparse函数相关技术
  • [蓝桥杯]采油
  • OpenLayers 地图定位
  • 黑龙江云前沿服务器租用:便捷高效的灵活之选​
  • PyTorch中matmul函数使用详解和示例代码
  • 论文解读:Locating and Editing Factual Associations in GPT(ROME)
  • NoSQl之Redis部署
  • 学习设计模式《十二》——命令模式
  • 十三、【核心功能篇】测试计划管理:组织和编排测试用例
  • 手撕 K-Means
  • SmolVLA: 让机器人更懂 “看听说做” 的轻量化解决方案
  • day45python打卡
  • AIGC赋能前端开发
  • Web 3D协作平台开发案例:构建制造业远程设计与可视化协作
  • AI Agent开发第78课-大模型结合Flink构建政务类长公文、长文件、OA应用Agent
  • 极空间z4pro配置gitea mysql,内网穿透
  • 第三方测试机构进行科技成果鉴定测试有什么价值
  • 华为云Flexus+DeepSeek征文|基于华为云Flexus X和DeepSeek-R1打造个人知识库问答系统
  • 【数据结构】_排序
  • 《前端面试题:JS数据类型》
  • PPT转图片拼贴工具 v4.3
  • Chrome安装代理插件ZeroOmega(保姆级别)