feat: L7事件流 — live_events表+API+前端+三模块事件写入

DB:
- live_events表(id/ts/level/category/symbol/message/detail)

后端:
- GET /api/live/events?limit=30&level=all|critical|warn|info&category=all|trade|risk
- log_live_event() 异步辅助函数

事件源:
- live_executor: 开仓成功写event(trade/info, 含滑点/score)
- position_sync: 平仓写event(trade, 含PnL拆解)
- risk_guard: 熔断/禁仓/恢复写event(risk/critical|warn|info)

前端L7:
- 5秒轮询, 最近30条事件
- 按级别筛选(全部/严重/警告/信息)
- 彩色分类标签(trade绿/risk红/system灰/reconciliation紫)
- 时间线布局, 最大高度56带滚动
This commit is contained in:
root 2026-03-02 10:22:40 +00:00
parent fb0c3806b5
commit d7788d3766
5 changed files with 169 additions and 0 deletions

View File

@ -404,9 +404,29 @@ async def execute_entry(session: aiohttp.ClientSession, signal: dict, db_conn):
db_conn.commit()
logger.info(f"[{symbol}] ✅ 实盘开仓完成 | trade_id={trade_id} | {direction} @ {fill_price} | score={score} | 策略={strategy}")
# 写event
_log_event(db_conn, "info", "trade", f"{direction} {symbol} @ {fill_price} | score={score} | 滑点={slippage_bps:.1f}bps", symbol,
{"trade_id": trade_id, "score": score, "fill_price": fill_price, "slippage_bps": round(slippage_bps, 2)})
return trade_id
# ============ 事件日志 ============
def _log_event(conn, level, category, message, symbol=None, detail=None):
"""写live_events表"""
try:
cur = conn.cursor()
cur.execute(
"INSERT INTO live_events (level, category, symbol, message, detail) VALUES (%s, %s, %s, %s, %s)",
(level, category, symbol, message, json.dumps(detail) if detail else None)
)
conn.commit()
except Exception:
pass
# ============ 信号监听 ============
def get_db_connection():

View File

@ -1842,3 +1842,48 @@ async def live_paper_comparison(
"avg_pnl_diff_r": round(total_pnl_diff / count, 4) if count > 0 else 0,
"data": comparisons,
}
# ============ Live Events (L7通知流) ============
async def log_live_event(level: str, category: str, message: str, symbol: str = None, detail: dict = None):
"""写入实盘事件日志(供各模块调用)"""
try:
import json as _json
await async_execute(
"INSERT INTO live_events (level, category, symbol, message, detail) VALUES ($1, $2, $3, $4, $5)",
level, category, symbol, message, _json.dumps(detail) if detail else None
)
except Exception:
pass
@app.get("/api/live/events")
async def live_events(
limit: int = 50,
level: str = "all",
category: str = "all",
user: dict = Depends(get_current_user),
):
"""实盘事件流"""
conditions = ["1=1"]
params = []
idx = 1
if level != "all":
conditions.append(f"level = ${idx}")
params.append(level)
idx += 1
if category != "all":
conditions.append(f"category = ${idx}")
params.append(category)
idx += 1
params.append(limit)
where = " AND ".join(conditions)
rows = await async_fetch(
f"SELECT id, ts, level, category, symbol, message, detail "
f"FROM live_events WHERE {where} ORDER BY ts DESC LIMIT ${idx}",
*params
)
return {"count": len(rows), "data": rows}

View File

@ -481,6 +481,12 @@ async def check_closed_positions(session, conn):
f"funding={funding_usdt:+.4f}$ | net={pnl_r:+.3f}R"
)
# 写event
evt_level = "info" if pnl_r >= 0 else "warn"
_log_event(conn, evt_level, "trade",
f"平仓 {lp['direction']} {symbol} | {status} | net={pnl_r:+.3f}R (gross={gross_pnl_r:+.3f} fee=-{fee_r:.3f} fr={funding_usdt:+.4f}$)",
symbol, {"trade_id": lp["id"], "status": status, "pnl_r": round(pnl_r, 4), "exit_price": exit_price})
# ============ 资金费率结算追踪 ============
@ -569,6 +575,19 @@ async def track_funding_fees(session, conn):
logger.info(f"💰 Funding更新完成: {funding_by_symbol}")
def _log_event(conn, level, category, message, symbol=None, detail=None):
"""写live_events表"""
try:
cur = conn.cursor()
cur.execute(
"INSERT INTO live_events (level, category, symbol, message, detail) VALUES (%s, %s, %s, %s, %s)",
(level, category, symbol, message, json.dumps(detail) if detail else None)
)
conn.commit()
except Exception:
pass
# ============ 主循环 ============
async def main():

View File

@ -347,6 +347,9 @@ async def trigger_circuit_break(session, conn, reason: str, action: str = "block
# 写状态文件(供其他进程读取)
write_risk_state()
# 写event到DB
_log_event(conn, "critical", "risk", f"🔴 熔断: {reason} | 动作: {action}", detail={"reason": reason, "action": action})
def write_risk_state():
"""写风控状态到文件供live_executor读取判断是否可开仓"""
@ -358,6 +361,19 @@ def write_risk_state():
logger.error(f"写风控状态失败: {e}")
def _log_event(conn, level, category, message, symbol=None, detail=None):
"""写live_events表"""
try:
cur = conn.cursor()
cur.execute(
"INSERT INTO live_events (level, category, symbol, message, detail) VALUES (%s, %s, %s, %s, %s)",
(level, category, symbol, message, json.dumps(detail) if detail else None)
)
conn.commit()
except Exception:
pass
def check_auto_resume():
"""检查是否可以自动恢复"""
if risk_state.status != "circuit_break":
@ -419,6 +435,7 @@ async def check_emergency_commands(session, conn):
risk_state.circuit_break_reason = f"人工禁止新开仓 (操作人: {user})"
write_risk_state()
logger.warning(f"🟡 人工禁止新开仓 (操作人: {user})")
_log_event(conn, "warn", "risk", f"🟡 人工禁止新开仓 (操作人: {user})")
elif action == "resume":
risk_state.status = "normal"
@ -428,6 +445,7 @@ async def check_emergency_commands(session, conn):
risk_state.circuit_break_time = None
write_risk_state()
logger.info(f"✅ 人工恢复交易 (操作人: {user})")
_log_event(conn, "info", "risk", f"✅ 人工恢复交易 (操作人: {user})")
else:
logger.warning(f"⚠ 未知紧急指令: {action}")

View File

@ -381,6 +381,69 @@ function L6_RiskStatus() {
);
}
// ═══════════════════════════════════════════════════════════════
// L7: 实时事件流
// ═══════════════════════════════════════════════════════════════
function L7_EventStream() {
const [events, setEvents] = useState<any[]>([]);
const [filter, setFilter] = useState("all");
useEffect(() => {
const f = async () => {
try {
const r = await authFetch(`/api/live/events?limit=30&level=${filter}`);
if (r.ok) { const d = await r.json(); setEvents(d.data || []); }
} catch {}
};
f(); const iv = setInterval(f, 5000); return () => clearInterval(iv);
}, [filter]);
const levelIcon: Record<string, string> = { info: "", warn: "⚠️", error: "❌", critical: "🔴" };
const levelColor: Record<string, string> = {
info: "text-blue-600 bg-blue-50", warn: "text-amber-600 bg-amber-50",
error: "text-red-600 bg-red-50", critical: "text-red-700 bg-red-100 font-bold"
};
const catColor: Record<string, string> = {
trade: "bg-emerald-100 text-emerald-700", risk: "bg-red-100 text-red-700",
system: "bg-slate-100 text-slate-600", reconciliation: "bg-violet-100 text-violet-700"
};
return (
<div className="rounded-xl border border-slate-200 bg-white overflow-hidden">
<div className="px-3 py-2 border-b border-slate-100 flex items-center justify-between">
<h3 className="font-semibold text-slate-800 text-xs">L7 </h3>
<div className="flex gap-1">
{["all","critical","warn","info"].map(l => (
<button key={l} onClick={() => setFilter(l)}
className={`px-1.5 py-0.5 rounded text-[9px] ${filter===l?"bg-blue-600 text-white":"bg-slate-100 text-slate-500 hover:bg-slate-200"}`}>
{l==="all"?"全部":l==="critical"?"严重":l==="warn"?"警告":"信息"}
</button>
))}
</div>
</div>
<div className="max-h-56 overflow-y-auto">
{events.length === 0 ? <div className="text-center text-slate-400 text-sm py-6"></div> : (
<div className="divide-y divide-slate-50">
{events.map((e: any) => {
const t = new Date(typeof e.ts === "number" ? (e.ts > 1e12 ? e.ts : e.ts*1000) : e.ts);
const timeStr = t.toLocaleTimeString("zh-CN", {hour:"2-digit",minute:"2-digit",second:"2-digit",hour12:false});
const dateStr = t.toLocaleDateString("zh-CN", {month:"2-digit",day:"2-digit"});
return (
<div key={e.id} className={`px-3 py-1.5 flex items-start gap-2 text-[10px] ${levelColor[e.level] || ""}`}>
<span className="shrink-0">{levelIcon[e.level] || "📝"}</span>
<span className="shrink-0 text-slate-400 font-mono">{dateStr} {timeStr}</span>
{e.category && <span className={`shrink-0 px-1 py-0.5 rounded text-[8px] ${catColor[e.category] || "bg-slate-100 text-slate-500"}`}>{e.category}</span>}
{e.symbol && <span className="shrink-0 font-mono text-slate-500">{e.symbol.replace("USDT","")}</span>}
<span className="flex-1">{e.message}</span>
</div>
);
})}
</div>
)}
</div>
</div>
);
}
// ═══════════════════════════════════════════════════════════════
// L8: 实盘 vs 模拟盘对照
// ═══════════════════════════════════════════════════════════════
@ -597,6 +660,10 @@ export default function LiveTradingPage() {
<L4_ExecutionQuality />
<L5_Reconciliation />
<L6_RiskStatus />
{/* L7: 实时事件流 */}
<L7_EventStream />
<L8_PaperComparison />
<L9_EquityCurve />
<L10_TradeHistory />