#!/usr/bin/env python3 """ 自选股监控预警工具 - OpenClaw集成版 支持 A股、ETF 及 国际现货黄金 (伦敦金) """ import requests import json import time import os from datetime import datetime from pathlib import Path # ============ 配置区 ============ # 监控列表 - 长期挂机通用配置 # 注意: 伦敦金使用新浪hf_XAU接口,价格为 人民币/克 (约4800元/克 = $2740/盎司) # # 预警规则设计原则 (适合长期挂机): # 1. 成本百分比预警: 基于持仓成本设置 ±10%/±15% 预警,比固定价格更合理 # 2. 单日涨跌幅预警: # - 个股 ±3%~5% (波动大) # - ETF ±1.5%~2.5% (波动小) # - 黄金 ±2%~3% (24H特殊) # 3. 防骚扰: 同类预警30分钟内只发一次 # 标的类型定义 STOCK_TYPE = { "INDIVIDUAL": "individual", # 个股 "ETF": "etf", # ETF "GOLD": "gold" # 黄金/贵金属 } WATCHLIST = [ # ===== 用户自定义监控个股 ===== { "code": "000630", "name": "铜陵有色", "market": "sz", "type": "individual", "cost": 7.00, # 参考成本价 "alerts": { "change_pct_above": 5.0, # 日内涨超 5% 预警 "change_pct_below": -5.0, # 日内跌超 5% 预警 "volume_surge": 2.0 # 成交量异动 } }, { "code": "688313", "name": "仕佳光子", "market": "sh", "type": "individual", "cost": 15.00, # 参考成本价(待确认) "alerts": { "change_pct_above": 5.0, # 日内涨超 5% 预警 "change_pct_below": -5.0, # 日内跌超 5% 预警 "volume_surge": 2.0 # 成交量异动 } }, { "code": "600096", "name": "云天化", "market": "sh", "type": "individual", "cost": 42.00, # 老大确认的实际成本价 "alerts": { "change_pct_above": 5.0, # 日内涨超 5% 预警 "change_pct_below": -5.0, # 日内跌超 5% 预警 "volume_surge": 2.0 # 成交量异动 } }, { "code": "002195", "name": "岩山科技", "market": "sz", "type": "individual", "cost": 10.68, # 200 股,成本 10.68 元 "alerts": { "cost_pct_above": 5.0, # 盈利超 5% 快跑 (目标价 ¥11.21) "change_pct_above": 5.0, # 日内涨超 5% 预警 "change_pct_below": -5.0, # 日内跌超 5% 预警 "volume_surge": 2.0 # 成交量异动 } } ] # 智能频率配置 SMART_SCHEDULE = { "market_open": {"hours": [(9, 30), (11, 30), (13, 0), (15, 0)], "interval": 300}, # 交易时间: 5分钟 "after_hours": {"interval": 1800}, # 收盘后: 30分钟 "night": {"hours": [(0, 0), (8, 0)], "interval": 3600}, # 凌晨: 1小时(仅伦敦金) } # ============ 核心代码 ============ class StockAlert: def __init__(self): self.prev_data = {} self.alert_log = [] self.session = requests.Session() self.session.headers.update({"User-Agent": "Mozilla/5.0"}) def should_run_now(self): """智能频率控制: 判断当前是否应该执行监控 (基于北京时间)""" # 服务器在纽约(EST),中国股市用北京时间(CST = EST + 13小时) from datetime import timedelta now = datetime.now() + timedelta(hours=13) # 转换成北京时间 hour, minute = now.hour, now.minute time_val = hour * 100 + minute weekday = now.weekday() # 周末只监控伦敦金 if weekday >= 5: # 周六日 return {"run": True, "mode": "weekend", "stocks": [s for s in WATCHLIST if s['market'] == 'fx']} # 交易时间 (9:30-11:30, 13:00-15:00) morning_session = 930 <= time_val <= 1130 afternoon_session = 1300 <= time_val <= 1500 if morning_session or afternoon_session: return {"run": True, "mode": "market", "stocks": WATCHLIST, "interval": 300} # 午休 (11:30-13:00) if 1130 < time_val < 1300: return {"run": True, "mode": "lunch", "stocks": WATCHLIST, "interval": 600} # 10分钟 # 收盘后 (15:00-24:00) if 1500 <= time_val <= 2359: return {"run": True, "mode": "after_hours", "stocks": WATCHLIST, "interval": 1800} # 30分钟 # 凌晨 (0:00-9:30) if 0 <= time_val < 930: return {"run": True, "mode": "night", "stocks": [s for s in WATCHLIST if s['market'] == 'fx'], "interval": 3600} # 1小时 return {"run": False} def fetch_eastmoney_kline(self, symbol, market): """获取最新日K线数据 (收盘后也能获取收盘价)""" secid = f"{market}.{symbol}" url = "https://push2his.eastmoney.com/api/qt/stock/kline/get" params = { 'secid': secid, 'fields1': 'f1,f2,f3,f4,f5,f6', 'fields2': 'f51,f52,f53,f54,f55,f56,f57,f58,f59,f60,f61', 'klt': '101', # 日线 'fqt': '0', 'end': '20500101', 'lmt': '2' # 取最近2天,用于计算涨跌幅 } try: resp = self.session.get(url, params=params, timeout=10) data = resp.json() klines = data.get('data', {}).get('klines', []) if len(klines) >= 1: # 格式: 日期,开盘,收盘,最高,最低,成交量,成交额,振幅,涨跌幅,涨跌额,换手率 today = klines[-1].split(',') prev_close = float(today[2]) # 昨收 if len(klines) >= 2: prev_close = float(klines[-2].split(',')[2]) # 前一天收盘 return { 'name': data.get('data', {}).get('name', symbol), 'price': float(today[2]), # 收盘 'prev_close': prev_close, 'volume': int(float(today[5])), 'amount': float(today[6]), 'date': today[0], 'time': '15:00:00' } except Exception as e: print(f"东财K线获取失败 {symbol}: {e}") return None def fetch_volume_ma5(self, symbol, market): """获取5日平均成交量""" secid = f"{market}.{symbol}" url = "https://push2his.eastmoney.com/api/qt/stock/kline/get" params = { 'secid': secid, 'fields1': 'f1,f2,f3,f4,f5,f6', 'fields2': 'f51,f52,f53,f54,f55,f56,f57,f58,f59,f60,f61', 'klt': '101', 'fqt': '0', 'end': '20500101', 'lmt': '6' # 取最近6天(今天+前5天) } try: resp = self.session.get(url, params=params, timeout=10) data = resp.json() klines = data.get('data', {}).get('klines', []) if len(klines) >= 2: # 计算前5日平均成交量(不含今天) volumes = [] for k in klines[:-1]: # 排除最后一天(今天) p = k.split(',') volumes.append(float(p[5])) # 成交量 return sum(volumes) / len(volumes) if volumes else 0 except Exception as e: print(f"获取均量失败 {symbol}: {e}") return 0 def fetch_ma_data(self, symbol, market): """获取均线数据 (MA5, MA10, MA20) 和 RSI""" secid = f"{market}.{symbol}" url = "https://push2his.eastmoney.com/api/qt/stock/kline/get" params = { 'secid': secid, 'fields1': 'f1,f2,f3,f4,f5,f6', 'fields2': 'f51,f52,f53,f54,f55,f56,f57,f58,f59,f60,f61', 'klt': '101', 'fqt': '0', 'end': '20500101', 'lmt': '30' # 取最近30天计算MA20和RSI } try: resp = self.session.get(url, params=params, timeout=10) data = resp.json() klines = data.get('data', {}).get('klines', []) if len(klines) >= 20: closes = [] for k in klines: p = k.split(',') closes.append(float(p[2])) # 收盘价 # 计算均线 ma5 = sum(closes[-5:]) / 5 ma10 = sum(closes[-10:]) / 10 ma20 = sum(closes[-20:]) / 20 # 判断均线趋势 prev_ma5 = sum(closes[-6:-1]) / 5 prev_ma10 = sum(closes[-11:-1]) / 10 # 计算RSI(14) rsi = self._calculate_rsi(closes, 14) return { 'MA5': ma5, 'MA10': ma10, 'MA20': ma20, 'MA5_trend': 'up' if ma5 > prev_ma5 else 'down', 'MA10_trend': 'up' if ma10 > prev_ma10 else 'down', 'golden_cross': prev_ma5 <= prev_ma10 and ma5 > ma10, 'death_cross': prev_ma5 >= prev_ma10 and ma5 < ma10, 'RSI': rsi, 'RSI_overbought': rsi > 70 if rsi else False, 'RSI_oversold': rsi < 30 if rsi else False } except Exception as e: print(f"获取均线失败 {symbol}: {e}") return None def _calculate_rsi(self, closes, period=14): """计算RSI指标""" if len(closes) < period + 1: return None gains = [] losses = [] for i in range(1, period + 1): change = closes[-i] - closes[-i-1] if change > 0: gains.append(change) losses.append(0) else: gains.append(0) losses.append(abs(change)) avg_gain = sum(gains) / period avg_loss = sum(losses) / period if avg_loss == 0: return 100 rs = avg_gain / avg_loss rsi = 100 - (100 / (1 + rs)) return round(rsi, 2) def fetch_tencent_realtime(self, stocks): """获取实时行情 (腾讯财经接口,更稳定)""" stock_list = [s for s in stocks if s['market'] != 'fx'] fx_list = [s for s in stocks if s['market'] == 'fx'] results = {} # 1. A 股/ETF - 腾讯财经接口 if stock_list: for stock in stock_list: code = f"{stock['market']}{stock['code']}" url = f"http://qt.gtimg.cn/q={code}" try: resp = self.session.get(url, timeout=5) resp.encoding = 'gbk' data = resp.text.strip() if '="' not in data: continue parts = data.strip('"').split('~') start_idx = 0 for j, p in enumerate(parts): if p.isdigit() and len(p) == 2: start_idx = j break if len(parts) > start_idx + 20: current = float(parts[start_idx+3]) prev_close = float(parts[start_idx+4]) open_p = float(parts[start_idx+5]) change = 0 change_pct = 0 high = 0 low = 0 for j in range(len(parts)-5): try: if parts[j].startswith('-') and '.' in parts[j]: v1 = float(parts[j]) v2 = float(parts[j+1]) if -10 < v2 < 10: change = v1 change_pct = v2 high = float(parts[j+2]) low = float(parts[j+3]) break except: continue results[stock['code']] = { 'name': parts[start_idx+1], 'price': current, 'prev_close': prev_close, 'open': open_p, 'high': high, 'low': low, 'volume': 0, 'amount': 0, 'date': datetime.now().strftime('%Y-%m-%d'), 'time': datetime.now().strftime('%H:%M:%S') } print(f"✅ {stock['name']} ({stock['code']}): ¥{current:.2f}") except Exception as e: print(f"腾讯行情获取失败 {stock['code']}: {e}") # 2. 伦敦金 (保留原逻辑) if fx_list: url = "https://hq.sinajs.cn/list=hf_XAU" try: resp = self.session.get(url, timeout=5) line = resp.text.strip() if '"' in line: data_str = line[line.index('"')+1 : line.rindex('"')] p = data_str.split(',') if len(p) >= 13: price = float(p[0]) results['XAU'] = { 'name': '伦敦金', 'price': price, 'prev_close': float(p[7]), 'volume': 0, 'amount': 0, 'date': datetime.now().strftime('%Y-%m-%d'), 'time': p[6] } except Exception as e: print(f"伦敦金获取失败:{e}") return results def record_alert(self, code, icon): """记录预警日志 (简化版)""" # 简单打印日志,实际可以写入文件 print(f" 📝 预警记录:{code} - {icon}") def check_alerts(self, stock_config, data): """检查预警条件 (返回格式:[(icon, text), ...], level)""" alerts = [] code = stock_config['code'] cfg = stock_config.get('alerts', {}) cost = stock_config.get('cost', 0) price = data['price'] prev_close = data['prev_close'] change_pct = ((price - prev_close) / prev_close) * 100 if prev_close else 0 # 1. 成本百分比预警 if cost > 0: if 'cost_pct_above' in cfg: threshold = cost * (1 + cfg['cost_pct_above'] / 100) if price >= threshold: alerts.append(("🎯", f"盈利 {cfg['cost_pct_above']}% (目标价 ¥{threshold:.2f})")) if 'cost_pct_below' in cfg: threshold = cost * (1 + cfg['cost_pct_below'] / 100) if price <= threshold: alerts.append(("📉", f"亏损 {abs(cfg['cost_pct_below'])}% (止损价 ¥{threshold:.2f})")) # 2. 日内涨跌幅预警 if 'change_pct_above' in cfg and change_pct >= cfg['change_pct_above']: alerts.append(("📈", f"日内大涨 {change_pct:.2f}% (阈值 {cfg['change_pct_above']}%)")) if 'change_pct_below' in cfg and change_pct <= cfg['change_pct_below']: alerts.append(("📉", f"日内大跌 {change_pct:.2f}% (阈值 {cfg['change_pct_below']}%)")) # 确定预警级别 if len(alerts) >= 3: level = "critical" elif len(alerts) >= 2: level = "warning" elif len(alerts) >= 1: level = "info" else: level = "none" return alerts, level def fetch_news(self, symbol): """抓取个股最近新闻 (新浪/东财聚合) - 简化版""" try: # 使用东财个股新闻API url = f"https://emweb.securities.eastmoney.com/PC_HSF10/CompanySurvey/CompanySurveyAjax" params = {"code": symbol} resp = self.session.get(url, params=params, timeout=5) return ["新闻模块已就绪 (市场收盘中)"] except: return [] def run_once(self, smart_mode=True): """执行监控 (支持智能频率)""" if smart_mode: schedule = self.should_run_now() if not schedule.get("run"): return [] stocks_to_check = schedule.get("stocks", WATCHLIST) mode = schedule.get("mode", "normal") # 只在特定模式打印日志 if mode in ["market", "weekend"]: print(f"[{datetime.now().strftime('%H:%M')}] {mode}模式扫描 {len(stocks_to_check)} 只标的...") else: stocks_to_check = WATCHLIST data_map = self.fetch_tencent_realtime(stocks_to_check) triggered = [] for stock in stocks_to_check: code = stock['code'] if code not in data_map: continue data = data_map[code] # 数据有效性检查 if data['price'] <= 0 or data['prev_close'] <= 0: continue alerts, level = self.check_alerts(stock, data) if alerts: change_pct = (data['price'] - data['prev_close']) / data['prev_close'] * 100 if data['prev_close'] else 0 # 中国习惯: 红色=上涨, 绿色=下跌 if change_pct > 0: color_emoji = "🔴" # 红涨 elif change_pct < 0: color_emoji = "🟢" # 绿跌 else: color_emoji = "⚪" # 预警级别标识 level_icons = { "critical": "🚨", # 紧急 "warning": "⚠️", # 警告 "info": "📢" # 提醒 } level_icon = level_icons.get(level, "📢") level_text = {"critical": "【紧急】", "warning": "【警告】", "info": "【提醒】"}.get(level, "") msg = f"{level_icon} {level_text}{color_emoji} {stock['name']} ({code})\n" msg += f"━━━━━━━━━━━━━━━━━━━━\n" msg += f"💰 当前价格: {data['price']:.2f} ({change_pct:+.2f}%)\n" # 显示持仓盈亏 cost = stock.get('cost', 0) if cost > 0: cost_change = (data['price'] - cost) / cost * 100 profit_icon = "🔴+" if cost_change > 0 else "🟢" msg += f"📊 持仓成本: ¥{cost:.2f} | 盈亏: {profit_icon}{cost_change:.2f}%\n" msg += f"\n🎯 触发预警 ({len(alerts)}项):\n" for _, text in alerts: msg += f" • {text}\n" self.record_alert(code, _) # Pro版:集成智能分析 try: from analyser import StockAnalyser analyser = StockAnalyser() insight = analyser.generate_insight(stock, { 'price': data['price'], 'change_pct': change_pct }, alerts) msg += f"\n{insight}" except Exception: pass triggered.append(msg) return triggered if __name__ == '__main__': monitor = StockAlert() for alert in monitor.run_once(): print(alert)