From d7788d3766854e12c9599f4622dd4ee3ae18c1b6 Mon Sep 17 00:00:00 2001 From: root Date: Mon, 2 Mar 2026 10:22:40 +0000 Subject: [PATCH] =?UTF-8?q?feat:=20L7=E4=BA=8B=E4=BB=B6=E6=B5=81=20?= =?UTF-8?q?=E2=80=94=20live=5Fevents=E8=A1=A8+API+=E5=89=8D=E7=AB=AF+?= =?UTF-8?q?=E4=B8=89=E6=A8=A1=E5=9D=97=E4=BA=8B=E4=BB=B6=E5=86=99=E5=85=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit DB: - live_events表(id/ts/level/category/symbol/message/detail) 后端: - GET /api/live/events?limit=30&level=all|critical|warn|info&category=all|trade|risk - log_live_event() 异步辅助函数 事件源: - live_executor: 开仓成功写event(trade/info, 含滑点/score) - position_sync: 平仓写event(trade, 含PnL拆解) - risk_guard: 熔断/禁仓/恢复写event(risk/critical|warn|info) 前端L7: - 5秒轮询, 最近30条事件 - 按级别筛选(全部/严重/警告/信息) - 彩色分类标签(trade绿/risk红/system灰/reconciliation紫) - 时间线布局, 最大高度56带滚动 --- backend/live_executor.py | 20 ++++++++++++ backend/main.py | 45 +++++++++++++++++++++++++ backend/position_sync.py | 19 +++++++++++ backend/risk_guard.py | 18 ++++++++++ frontend/app/live/page.tsx | 67 ++++++++++++++++++++++++++++++++++++++ 5 files changed, 169 insertions(+) diff --git a/backend/live_executor.py b/backend/live_executor.py index 43f3f1d..cb614ff 100644 --- a/backend/live_executor.py +++ b/backend/live_executor.py @@ -404,9 +404,29 @@ async def execute_entry(session: aiohttp.ClientSession, signal: dict, db_conn): db_conn.commit() logger.info(f"[{symbol}] ✅ 实盘开仓完成 | trade_id={trade_id} | {direction} @ {fill_price} | score={score} | 策略={strategy}") + + # 写event + _log_event(db_conn, "info", "trade", f"{direction} {symbol} @ {fill_price} | score={score} | 滑点={slippage_bps:.1f}bps", symbol, + {"trade_id": trade_id, "score": score, "fill_price": fill_price, "slippage_bps": round(slippage_bps, 2)}) + return trade_id +# ============ 事件日志 ============ + +def _log_event(conn, level, category, message, symbol=None, detail=None): + """写live_events表""" + try: + cur = conn.cursor() + cur.execute( + "INSERT INTO live_events (level, category, symbol, message, detail) VALUES (%s, %s, %s, %s, %s)", + (level, category, symbol, message, json.dumps(detail) if detail else None) + ) + conn.commit() + except Exception: + pass + + # ============ 信号监听 ============ def get_db_connection(): diff --git a/backend/main.py b/backend/main.py index 1bd06d4..91240b5 100644 --- a/backend/main.py +++ b/backend/main.py @@ -1842,3 +1842,48 @@ async def live_paper_comparison( "avg_pnl_diff_r": round(total_pnl_diff / count, 4) if count > 0 else 0, "data": comparisons, } + + +# ============ Live Events (L7通知流) ============ + +async def log_live_event(level: str, category: str, message: str, symbol: str = None, detail: dict = None): + """写入实盘事件日志(供各模块调用)""" + try: + import json as _json + await async_execute( + "INSERT INTO live_events (level, category, symbol, message, detail) VALUES ($1, $2, $3, $4, $5)", + level, category, symbol, message, _json.dumps(detail) if detail else None + ) + except Exception: + pass + + +@app.get("/api/live/events") +async def live_events( + limit: int = 50, + level: str = "all", + category: str = "all", + user: dict = Depends(get_current_user), +): + """实盘事件流""" + conditions = ["1=1"] + params = [] + idx = 1 + + if level != "all": + conditions.append(f"level = ${idx}") + params.append(level) + idx += 1 + if category != "all": + conditions.append(f"category = ${idx}") + params.append(category) + idx += 1 + + params.append(limit) + where = " AND ".join(conditions) + rows = await async_fetch( + f"SELECT id, ts, level, category, symbol, message, detail " + f"FROM live_events WHERE {where} ORDER BY ts DESC LIMIT ${idx}", + *params + ) + return {"count": len(rows), "data": rows} diff --git a/backend/position_sync.py b/backend/position_sync.py index b96bcb9..675a942 100644 --- a/backend/position_sync.py +++ b/backend/position_sync.py @@ -481,6 +481,12 @@ async def check_closed_positions(session, conn): f"funding={funding_usdt:+.4f}$ | net={pnl_r:+.3f}R" ) + # 写event + evt_level = "info" if pnl_r >= 0 else "warn" + _log_event(conn, evt_level, "trade", + f"平仓 {lp['direction']} {symbol} | {status} | net={pnl_r:+.3f}R (gross={gross_pnl_r:+.3f} fee=-{fee_r:.3f} fr={funding_usdt:+.4f}$)", + symbol, {"trade_id": lp["id"], "status": status, "pnl_r": round(pnl_r, 4), "exit_price": exit_price}) + # ============ 资金费率结算追踪 ============ @@ -569,6 +575,19 @@ async def track_funding_fees(session, conn): logger.info(f"💰 Funding更新完成: {funding_by_symbol}") +def _log_event(conn, level, category, message, symbol=None, detail=None): + """写live_events表""" + try: + cur = conn.cursor() + cur.execute( + "INSERT INTO live_events (level, category, symbol, message, detail) VALUES (%s, %s, %s, %s, %s)", + (level, category, symbol, message, json.dumps(detail) if detail else None) + ) + conn.commit() + except Exception: + pass + + # ============ 主循环 ============ async def main(): diff --git a/backend/risk_guard.py b/backend/risk_guard.py index 0841646..8263f2b 100644 --- a/backend/risk_guard.py +++ b/backend/risk_guard.py @@ -347,6 +347,9 @@ async def trigger_circuit_break(session, conn, reason: str, action: str = "block # 写状态文件(供其他进程读取) write_risk_state() + # 写event到DB + _log_event(conn, "critical", "risk", f"🔴 熔断: {reason} | 动作: {action}", detail={"reason": reason, "action": action}) + def write_risk_state(): """写风控状态到文件(供live_executor读取判断是否可开仓)""" @@ -358,6 +361,19 @@ def write_risk_state(): logger.error(f"写风控状态失败: {e}") +def _log_event(conn, level, category, message, symbol=None, detail=None): + """写live_events表""" + try: + cur = conn.cursor() + cur.execute( + "INSERT INTO live_events (level, category, symbol, message, detail) VALUES (%s, %s, %s, %s, %s)", + (level, category, symbol, message, json.dumps(detail) if detail else None) + ) + conn.commit() + except Exception: + pass + + def check_auto_resume(): """检查是否可以自动恢复""" if risk_state.status != "circuit_break": @@ -419,6 +435,7 @@ async def check_emergency_commands(session, conn): risk_state.circuit_break_reason = f"人工禁止新开仓 (操作人: {user})" write_risk_state() logger.warning(f"🟡 人工禁止新开仓 (操作人: {user})") + _log_event(conn, "warn", "risk", f"🟡 人工禁止新开仓 (操作人: {user})") elif action == "resume": risk_state.status = "normal" @@ -428,6 +445,7 @@ async def check_emergency_commands(session, conn): risk_state.circuit_break_time = None write_risk_state() logger.info(f"✅ 人工恢复交易 (操作人: {user})") + _log_event(conn, "info", "risk", f"✅ 人工恢复交易 (操作人: {user})") else: logger.warning(f"⚠ 未知紧急指令: {action}") diff --git a/frontend/app/live/page.tsx b/frontend/app/live/page.tsx index b7036a0..8e8061f 100644 --- a/frontend/app/live/page.tsx +++ b/frontend/app/live/page.tsx @@ -381,6 +381,69 @@ function L6_RiskStatus() { ); } +// ═══════════════════════════════════════════════════════════════ +// L7: 实时事件流 +// ═══════════════════════════════════════════════════════════════ +function L7_EventStream() { + const [events, setEvents] = useState([]); + const [filter, setFilter] = useState("all"); + useEffect(() => { + const f = async () => { + try { + const r = await authFetch(`/api/live/events?limit=30&level=${filter}`); + if (r.ok) { const d = await r.json(); setEvents(d.data || []); } + } catch {} + }; + f(); const iv = setInterval(f, 5000); return () => clearInterval(iv); + }, [filter]); + + const levelIcon: Record = { info: "ℹ️", warn: "⚠️", error: "❌", critical: "🔴" }; + const levelColor: Record = { + info: "text-blue-600 bg-blue-50", warn: "text-amber-600 bg-amber-50", + error: "text-red-600 bg-red-50", critical: "text-red-700 bg-red-100 font-bold" + }; + const catColor: Record = { + trade: "bg-emerald-100 text-emerald-700", risk: "bg-red-100 text-red-700", + system: "bg-slate-100 text-slate-600", reconciliation: "bg-violet-100 text-violet-700" + }; + + return ( +
+
+

L7 事件流

+
+ {["all","critical","warn","info"].map(l => ( + + ))} +
+
+
+ {events.length === 0 ?
暂无事件
: ( +
+ {events.map((e: any) => { + const t = new Date(typeof e.ts === "number" ? (e.ts > 1e12 ? e.ts : e.ts*1000) : e.ts); + const timeStr = t.toLocaleTimeString("zh-CN", {hour:"2-digit",minute:"2-digit",second:"2-digit",hour12:false}); + const dateStr = t.toLocaleDateString("zh-CN", {month:"2-digit",day:"2-digit"}); + return ( +
+ {levelIcon[e.level] || "📝"} + {dateStr} {timeStr} + {e.category && {e.category}} + {e.symbol && {e.symbol.replace("USDT","")}} + {e.message} +
+ ); + })} +
+ )} +
+
+ ); +} + // ═══════════════════════════════════════════════════════════════ // L8: 实盘 vs 模拟盘对照 // ═══════════════════════════════════════════════════════════════ @@ -597,6 +660,10 @@ export default function LiveTradingPage() { + +{/* L7: 实时事件流 */} + +