openclaw-home-pc/workspace/skills/stock-monitor-skill/scripts/monitor.py
2026-03-21 15:31:06 +08:00

508 lines
20 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
自选股监控预警工具 - OpenClaw集成版
支持 A股、ETF 及 国际现货黄金 (伦敦金)
"""
import requests
import json
import time
import os
from datetime import datetime
from pathlib import Path
# ============ 配置区 ============
# 监控列表 - 长期挂机通用配置
# 注意: 伦敦金使用新浪hf_XAU接口价格为 人民币/克 (约4800元/克 = $2740/盎司)
#
# 预警规则设计原则 (适合长期挂机):
# 1. 成本百分比预警: 基于持仓成本设置 ±10%/±15% 预警,比固定价格更合理
# 2. 单日涨跌幅预警:
# - 个股 ±3%~5% (波动大)
# - ETF ±1.5%~2.5% (波动小)
# - 黄金 ±2%~3% (24H特殊)
# 3. 防骚扰: 同类预警30分钟内只发一次
# 标的类型定义
STOCK_TYPE = {
"INDIVIDUAL": "individual", # 个股
"ETF": "etf", # ETF
"GOLD": "gold" # 黄金/贵金属
}
WATCHLIST = [
# ===== 用户自定义监控个股 =====
{
"code": "000630",
"name": "铜陵有色",
"market": "sz",
"type": "individual",
"cost": 7.00, # 参考成本价
"alerts": {
"change_pct_above": 5.0, # 日内涨超 5% 预警
"change_pct_below": -5.0, # 日内跌超 5% 预警
"volume_surge": 2.0 # 成交量异动
}
},
{
"code": "688313",
"name": "仕佳光子",
"market": "sh",
"type": "individual",
"cost": 15.00, # 参考成本价(待确认)
"alerts": {
"change_pct_above": 5.0, # 日内涨超 5% 预警
"change_pct_below": -5.0, # 日内跌超 5% 预警
"volume_surge": 2.0 # 成交量异动
}
},
{
"code": "600096",
"name": "云天化",
"market": "sh",
"type": "individual",
"cost": 42.00, # 老大确认的实际成本价
"alerts": {
"change_pct_above": 5.0, # 日内涨超 5% 预警
"change_pct_below": -5.0, # 日内跌超 5% 预警
"volume_surge": 2.0 # 成交量异动
}
},
{
"code": "002195",
"name": "岩山科技",
"market": "sz",
"type": "individual",
"cost": 10.68, # 200 股,成本 10.68 元
"alerts": {
"cost_pct_above": 5.0, # 盈利超 5% 快跑 (目标价 ¥11.21)
"change_pct_above": 5.0, # 日内涨超 5% 预警
"change_pct_below": -5.0, # 日内跌超 5% 预警
"volume_surge": 2.0 # 成交量异动
}
}
]
# 智能频率配置
SMART_SCHEDULE = {
"market_open": {"hours": [(9, 30), (11, 30), (13, 0), (15, 0)], "interval": 300}, # 交易时间: 5分钟
"after_hours": {"interval": 1800}, # 收盘后: 30分钟
"night": {"hours": [(0, 0), (8, 0)], "interval": 3600}, # 凌晨: 1小时(仅伦敦金)
}
# ============ 核心代码 ============
class StockAlert:
def __init__(self):
self.prev_data = {}
self.alert_log = []
self.session = requests.Session()
self.session.headers.update({"User-Agent": "Mozilla/5.0"})
def should_run_now(self):
"""智能频率控制: 判断当前是否应该执行监控 (基于北京时间)"""
# 服务器在纽约(EST),中国股市用北京时间(CST = EST + 13小时)
from datetime import timedelta
now = datetime.now() + timedelta(hours=13) # 转换成北京时间
hour, minute = now.hour, now.minute
time_val = hour * 100 + minute
weekday = now.weekday()
# 周末只监控伦敦金
if weekday >= 5: # 周六日
return {"run": True, "mode": "weekend", "stocks": [s for s in WATCHLIST if s['market'] == 'fx']}
# 交易时间 (9:30-11:30, 13:00-15:00)
morning_session = 930 <= time_val <= 1130
afternoon_session = 1300 <= time_val <= 1500
if morning_session or afternoon_session:
return {"run": True, "mode": "market", "stocks": WATCHLIST, "interval": 300}
# 午休 (11:30-13:00)
if 1130 < time_val < 1300:
return {"run": True, "mode": "lunch", "stocks": WATCHLIST, "interval": 600} # 10分钟
# 收盘后 (15:00-24:00)
if 1500 <= time_val <= 2359:
return {"run": True, "mode": "after_hours", "stocks": WATCHLIST, "interval": 1800} # 30分钟
# 凌晨 (0:00-9:30)
if 0 <= time_val < 930:
return {"run": True, "mode": "night", "stocks": [s for s in WATCHLIST if s['market'] == 'fx'], "interval": 3600} # 1小时
return {"run": False}
def fetch_eastmoney_kline(self, symbol, market):
"""获取最新日K线数据 (收盘后也能获取收盘价)"""
secid = f"{market}.{symbol}"
url = "https://push2his.eastmoney.com/api/qt/stock/kline/get"
params = {
'secid': secid,
'fields1': 'f1,f2,f3,f4,f5,f6',
'fields2': 'f51,f52,f53,f54,f55,f56,f57,f58,f59,f60,f61',
'klt': '101', # 日线
'fqt': '0',
'end': '20500101',
'lmt': '2' # 取最近2天用于计算涨跌幅
}
try:
resp = self.session.get(url, params=params, timeout=10)
data = resp.json()
klines = data.get('data', {}).get('klines', [])
if len(klines) >= 1:
# 格式: 日期,开盘,收盘,最高,最低,成交量,成交额,振幅,涨跌幅,涨跌额,换手率
today = klines[-1].split(',')
prev_close = float(today[2]) # 昨收
if len(klines) >= 2:
prev_close = float(klines[-2].split(',')[2]) # 前一天收盘
return {
'name': data.get('data', {}).get('name', symbol),
'price': float(today[2]), # 收盘
'prev_close': prev_close,
'volume': int(float(today[5])),
'amount': float(today[6]),
'date': today[0],
'time': '15:00:00'
}
except Exception as e:
print(f"东财K线获取失败 {symbol}: {e}")
return None
def fetch_volume_ma5(self, symbol, market):
"""获取5日平均成交量"""
secid = f"{market}.{symbol}"
url = "https://push2his.eastmoney.com/api/qt/stock/kline/get"
params = {
'secid': secid,
'fields1': 'f1,f2,f3,f4,f5,f6',
'fields2': 'f51,f52,f53,f54,f55,f56,f57,f58,f59,f60,f61',
'klt': '101',
'fqt': '0',
'end': '20500101',
'lmt': '6' # 取最近6天(今天+前5天)
}
try:
resp = self.session.get(url, params=params, timeout=10)
data = resp.json()
klines = data.get('data', {}).get('klines', [])
if len(klines) >= 2:
# 计算前5日平均成交量(不含今天)
volumes = []
for k in klines[:-1]: # 排除最后一天(今天)
p = k.split(',')
volumes.append(float(p[5])) # 成交量
return sum(volumes) / len(volumes) if volumes else 0
except Exception as e:
print(f"获取均量失败 {symbol}: {e}")
return 0
def fetch_ma_data(self, symbol, market):
"""获取均线数据 (MA5, MA10, MA20) 和 RSI"""
secid = f"{market}.{symbol}"
url = "https://push2his.eastmoney.com/api/qt/stock/kline/get"
params = {
'secid': secid,
'fields1': 'f1,f2,f3,f4,f5,f6',
'fields2': 'f51,f52,f53,f54,f55,f56,f57,f58,f59,f60,f61',
'klt': '101',
'fqt': '0',
'end': '20500101',
'lmt': '30' # 取最近30天计算MA20和RSI
}
try:
resp = self.session.get(url, params=params, timeout=10)
data = resp.json()
klines = data.get('data', {}).get('klines', [])
if len(klines) >= 20:
closes = []
for k in klines:
p = k.split(',')
closes.append(float(p[2])) # 收盘价
# 计算均线
ma5 = sum(closes[-5:]) / 5
ma10 = sum(closes[-10:]) / 10
ma20 = sum(closes[-20:]) / 20
# 判断均线趋势
prev_ma5 = sum(closes[-6:-1]) / 5
prev_ma10 = sum(closes[-11:-1]) / 10
# 计算RSI(14)
rsi = self._calculate_rsi(closes, 14)
return {
'MA5': ma5,
'MA10': ma10,
'MA20': ma20,
'MA5_trend': 'up' if ma5 > prev_ma5 else 'down',
'MA10_trend': 'up' if ma10 > prev_ma10 else 'down',
'golden_cross': prev_ma5 <= prev_ma10 and ma5 > ma10,
'death_cross': prev_ma5 >= prev_ma10 and ma5 < ma10,
'RSI': rsi,
'RSI_overbought': rsi > 70 if rsi else False,
'RSI_oversold': rsi < 30 if rsi else False
}
except Exception as e:
print(f"获取均线失败 {symbol}: {e}")
return None
def _calculate_rsi(self, closes, period=14):
"""计算RSI指标"""
if len(closes) < period + 1:
return None
gains = []
losses = []
for i in range(1, period + 1):
change = closes[-i] - closes[-i-1]
if change > 0:
gains.append(change)
losses.append(0)
else:
gains.append(0)
losses.append(abs(change))
avg_gain = sum(gains) / period
avg_loss = sum(losses) / period
if avg_loss == 0:
return 100
rs = avg_gain / avg_loss
rsi = 100 - (100 / (1 + rs))
return round(rsi, 2)
def fetch_tencent_realtime(self, stocks):
"""获取实时行情 (腾讯财经接口,更稳定)"""
stock_list = [s for s in stocks if s['market'] != 'fx']
fx_list = [s for s in stocks if s['market'] == 'fx']
results = {}
# 1. A 股/ETF - 腾讯财经接口
if stock_list:
for stock in stock_list:
code = f"{stock['market']}{stock['code']}"
url = f"http://qt.gtimg.cn/q={code}"
try:
resp = self.session.get(url, timeout=5)
resp.encoding = 'gbk'
data = resp.text.strip()
if '="' not in data:
continue
parts = data.strip('"').split('~')
start_idx = 0
for j, p in enumerate(parts):
if p.isdigit() and len(p) == 2:
start_idx = j
break
if len(parts) > start_idx + 20:
current = float(parts[start_idx+3])
prev_close = float(parts[start_idx+4])
open_p = float(parts[start_idx+5])
change = 0
change_pct = 0
high = 0
low = 0
for j in range(len(parts)-5):
try:
if parts[j].startswith('-') and '.' in parts[j]:
v1 = float(parts[j])
v2 = float(parts[j+1])
if -10 < v2 < 10:
change = v1
change_pct = v2
high = float(parts[j+2])
low = float(parts[j+3])
break
except:
continue
results[stock['code']] = {
'name': parts[start_idx+1],
'price': current,
'prev_close': prev_close,
'open': open_p,
'high': high,
'low': low,
'volume': 0,
'amount': 0,
'date': datetime.now().strftime('%Y-%m-%d'),
'time': datetime.now().strftime('%H:%M:%S')
}
print(f"{stock['name']} ({stock['code']}): ¥{current:.2f}")
except Exception as e:
print(f"腾讯行情获取失败 {stock['code']}: {e}")
# 2. 伦敦金 (保留原逻辑)
if fx_list:
url = "https://hq.sinajs.cn/list=hf_XAU"
try:
resp = self.session.get(url, timeout=5)
line = resp.text.strip()
if '"' in line:
data_str = line[line.index('"')+1 : line.rindex('"')]
p = data_str.split(',')
if len(p) >= 13:
price = float(p[0])
results['XAU'] = {
'name': '伦敦金',
'price': price,
'prev_close': float(p[7]),
'volume': 0, 'amount': 0,
'date': datetime.now().strftime('%Y-%m-%d'),
'time': p[6]
}
except Exception as e:
print(f"伦敦金获取失败:{e}")
return results
def record_alert(self, code, icon):
"""记录预警日志 (简化版)"""
# 简单打印日志,实际可以写入文件
print(f" 📝 预警记录:{code} - {icon}")
def check_alerts(self, stock_config, data):
"""检查预警条件 (返回格式:[(icon, text), ...], level)"""
alerts = []
code = stock_config['code']
cfg = stock_config.get('alerts', {})
cost = stock_config.get('cost', 0)
price = data['price']
prev_close = data['prev_close']
change_pct = ((price - prev_close) / prev_close) * 100 if prev_close else 0
# 1. 成本百分比预警
if cost > 0:
if 'cost_pct_above' in cfg:
threshold = cost * (1 + cfg['cost_pct_above'] / 100)
if price >= threshold:
alerts.append(("🎯", f"盈利 {cfg['cost_pct_above']}% (目标价 ¥{threshold:.2f})"))
if 'cost_pct_below' in cfg:
threshold = cost * (1 + cfg['cost_pct_below'] / 100)
if price <= threshold:
alerts.append(("📉", f"亏损 {abs(cfg['cost_pct_below'])}% (止损价 ¥{threshold:.2f})"))
# 2. 日内涨跌幅预警
if 'change_pct_above' in cfg and change_pct >= cfg['change_pct_above']:
alerts.append(("📈", f"日内大涨 {change_pct:.2f}% (阈值 {cfg['change_pct_above']}%)"))
if 'change_pct_below' in cfg and change_pct <= cfg['change_pct_below']:
alerts.append(("📉", f"日内大跌 {change_pct:.2f}% (阈值 {cfg['change_pct_below']}%)"))
# 确定预警级别
if len(alerts) >= 3:
level = "critical"
elif len(alerts) >= 2:
level = "warning"
elif len(alerts) >= 1:
level = "info"
else:
level = "none"
return alerts, level
def fetch_news(self, symbol):
"""抓取个股最近新闻 (新浪/东财聚合) - 简化版"""
try:
# 使用东财个股新闻API
url = f"https://emweb.securities.eastmoney.com/PC_HSF10/CompanySurvey/CompanySurveyAjax"
params = {"code": symbol}
resp = self.session.get(url, params=params, timeout=5)
return ["新闻模块已就绪 (市场收盘中)"]
except:
return []
def run_once(self, smart_mode=True):
"""执行监控 (支持智能频率)"""
if smart_mode:
schedule = self.should_run_now()
if not schedule.get("run"):
return []
stocks_to_check = schedule.get("stocks", WATCHLIST)
mode = schedule.get("mode", "normal")
# 只在特定模式打印日志
if mode in ["market", "weekend"]:
print(f"[{datetime.now().strftime('%H:%M')}] {mode}模式扫描 {len(stocks_to_check)} 只标的...")
else:
stocks_to_check = WATCHLIST
data_map = self.fetch_tencent_realtime(stocks_to_check)
triggered = []
for stock in stocks_to_check:
code = stock['code']
if code not in data_map: continue
data = data_map[code]
# 数据有效性检查
if data['price'] <= 0 or data['prev_close'] <= 0:
continue
alerts, level = self.check_alerts(stock, data)
if alerts:
change_pct = (data['price'] - data['prev_close']) / data['prev_close'] * 100 if data['prev_close'] else 0
# 中国习惯: 红色=上涨, 绿色=下跌
if change_pct > 0:
color_emoji = "🔴" # 红涨
elif change_pct < 0:
color_emoji = "🟢" # 绿跌
else:
color_emoji = ""
# 预警级别标识
level_icons = {
"critical": "🚨", # 紧急
"warning": "⚠️", # 警告
"info": "📢" # 提醒
}
level_icon = level_icons.get(level, "📢")
level_text = {"critical": "【紧急】", "warning": "【警告】", "info": "【提醒】"}.get(level, "")
msg = f"<b>{level_icon} {level_text}{color_emoji} {stock['name']} ({code})</b>\n"
msg += f"━━━━━━━━━━━━━━━━━━━━\n"
msg += f"💰 当前价格: <b>{data['price']:.2f}</b> ({change_pct:+.2f}%)\n"
# 显示持仓盈亏
cost = stock.get('cost', 0)
if cost > 0:
cost_change = (data['price'] - cost) / cost * 100
profit_icon = "🔴+" if cost_change > 0 else "🟢"
msg += f"📊 持仓成本: ¥{cost:.2f} | 盈亏: {profit_icon}{cost_change:.2f}%\n"
msg += f"\n🎯 触发预警 ({len(alerts)}项):\n"
for _, text in alerts:
msg += f"{text}\n"
self.record_alert(code, _)
# Pro版集成智能分析
try:
from analyser import StockAnalyser
analyser = StockAnalyser()
insight = analyser.generate_insight(stock, {
'price': data['price'],
'change_pct': change_pct
}, alerts)
msg += f"\n{insight}"
except Exception:
pass
triggered.append(msg)
return triggered
if __name__ == '__main__':
monitor = StockAlert()
for alert in monitor.run_once():
print(alert)