From 638589852bbdec8402ec2dec9e73a3317c0e5830 Mon Sep 17 00:00:00 2001 From: root Date: Mon, 2 Mar 2026 13:56:36 +0000 Subject: [PATCH] =?UTF-8?q?fix:=20P0=E5=AE=A1=E9=98=85=E4=BF=AE=E5=A4=8D?= =?UTF-8?q?=20+=20P1/P2=E5=A2=9E=E5=BC=BA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit P0-1: SL挂单失败→重试2次→3次失败紧急市价平仓+写event P0-2: TP1检测改用DB qty字段(新增)比对仓位减少,不再用orderId P0-3: emergency-close/block-new/resume/config PUT加admin权限验证 P0-5: risk_guard全平qty按币种精度格式化(BTC:3/ETH:3/XRP:0/SOL:2) P1-3: NOTIFY收到后立即处理跳过sleep,减少信号延迟 P2-1: 三个进程加DB连接断线重连(ensure_db_conn) DB: live_trades新增qty字段 --- backend/live_executor.py | 46 ++++++++++++++++++++++++++++++++-------- backend/main.py | 10 +++++++++ backend/position_sync.py | 22 ++++++++++++++++--- backend/risk_guard.py | 24 +++++++++++++++++++-- 4 files changed, 88 insertions(+), 14 deletions(-) diff --git a/backend/live_executor.py b/backend/live_executor.py index a7dfb30..143f83b 100644 --- a/backend/live_executor.py +++ b/backend/live_executor.py @@ -411,16 +411,24 @@ async def execute_entry(session: aiohttp.ClientSession, signal: dict, db_conn): half_qty = round(qty / 2, prec["qty"]) other_half = round(qty - half_qty, prec["qty"]) - # SL - 全仓 + # SL - 全仓(失败重试2次,3次都失败则紧急平仓) t_before_sl = time.time() * 1000 sl_data, sl_status = await place_stop_order(session, symbol, close_side, sl_price, qty, "STOP_MARKET") + if sl_status != 200: + for retry in range(2): + logger.warning(f"[{symbol}] SL挂单重试 {retry+1}/2...") + await asyncio.sleep(0.3) + sl_data, sl_status = await place_stop_order(session, symbol, close_side, sl_price, qty, "STOP_MARKET") + if sl_status == 200: + break + if sl_status != 200: + logger.error(f"[{symbol}] ❌ SL 3次全部失败,紧急市价平仓! data={sl_data}") + await place_market_order(session, symbol, close_side, qty) + _log_event(db_conn, "critical", "trade", f"SL挂单3次失败,已紧急平仓", symbol, {"sl_data": str(sl_data)}) + return None t_after_sl = time.time() * 1000 protection_gap_ms = int(t_after_sl - t_fill) - if sl_status != 200: - logger.error(f"[{symbol}] ⚠️ SL挂单失败! 裸奔中! data={sl_data}") - # TODO: 自动补挂逻辑 - # TP1 - 半仓 tp1_type = "TAKE_PROFIT_MARKET" await place_stop_order(session, symbol, close_side, tp1_price, half_qty, tp1_type) @@ -437,13 +445,15 @@ async def execute_entry(session: aiohttp.ClientSession, signal: dict, db_conn): sl_price, tp1_price, tp2_price, score, tier, status, risk_distance, atr_at_entry, score_factors, signal_id, binance_order_id, fill_price, slippage_bps, - protection_gap_ms, signal_to_order_ms, order_to_fill_ms + protection_gap_ms, signal_to_order_ms, order_to_fill_ms, + qty ) VALUES ( %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, 'active', %s, %s, %s, %s, %s, %s, %s, - %s, %s, %s + %s, %s, %s, + %s ) RETURNING id """, ( symbol, strategy, direction, fill_price, int(t_fill), @@ -451,6 +461,7 @@ async def execute_entry(session: aiohttp.ClientSession, signal: dict, db_conn): risk_distance, atr, json.dumps(factors) if factors else None, signal_id, order_id, fill_price, round(slippage_bps, 2), protection_gap_ms, signal_to_order_ms, order_to_fill_ms, + qty, )) trade_id = cur.fetchone()[0] db_conn.commit() @@ -608,14 +619,30 @@ async def main(): # 工作DB连接 work_conn = psycopg2.connect(**DB_CONFIG) + def ensure_db_conn(conn): + try: + conn.cursor().execute("SELECT 1") + return conn + except Exception: + logger.warning("⚠️ DB连接断开,重连中...") + try: + conn.close() + except Exception: + pass + return psycopg2.connect(**DB_CONFIG) + async with aiohttp.ClientSession() as http_session: while True: try: - # 检查PG NOTIFY(非阻塞,超时1秒) + # 检查PG NOTIFY(非阻塞) + got_notify = False if listen_conn.poll() == psycopg2.extensions.POLL_OK: while listen_conn.notifies: notify = listen_conn.notifies.pop(0) logger.info(f"📡 收到NOTIFY: {notify.payload}") + got_notify = True + + work_conn = ensure_db_conn(work_conn) # 获取待处理信号(NOTIFY + 轮询双保险) signals = fetch_pending_signals(work_conn) @@ -632,7 +659,8 @@ async def main(): if trade_id: logger.info(f"[{sig['symbol']}] ✅ trade_id={trade_id} 开仓成功") - await asyncio.sleep(1) # 1秒轮询作为fallback + if not got_notify: + await asyncio.sleep(1) # 无NOTIFY时才sleep,有NOTIFY立即处理下一轮 except KeyboardInterrupt: logger.info("🛑 收到退出信号") diff --git a/backend/main.py b/backend/main.py index 0cad1ac..9a08a63 100644 --- a/backend/main.py +++ b/backend/main.py @@ -1471,9 +1471,16 @@ async def live_risk_status(user: dict = Depends(get_current_user)): return {"status": "unknown", "error": "risk_guard_state.json not found"} +def _require_admin(user: dict): + """检查管理员权限""" + if user.get("role") != "admin": + raise HTTPException(status_code=403, detail="仅管理员可执行此操作") + + @app.post("/api/live/emergency-close") async def live_emergency_close(user: dict = Depends(get_current_user)): """紧急全平(写标记文件,由risk_guard执行)""" + _require_admin(user) try: import json as _json with open("/tmp/risk_guard_emergency.json", "w") as f: @@ -1486,6 +1493,7 @@ async def live_emergency_close(user: dict = Depends(get_current_user)): @app.post("/api/live/block-new") async def live_block_new(user: dict = Depends(get_current_user)): """禁止新开仓""" + _require_admin(user) try: import json as _json with open("/tmp/risk_guard_emergency.json", "w") as f: @@ -1498,6 +1506,7 @@ async def live_block_new(user: dict = Depends(get_current_user)): @app.post("/api/live/resume") async def live_resume(user: dict = Depends(get_current_user)): """恢复交易""" + _require_admin(user) try: import json as _json with open("/tmp/risk_guard_emergency.json", "w") as f: @@ -1904,6 +1913,7 @@ async def live_config_get(user: dict = Depends(get_current_user)): @app.put("/api/live/config") async def live_config_update(request: Request, user: dict = Depends(get_current_user)): """更新实盘配置""" + _require_admin(user) body = await request.json() updated = [] for key, value in body.items(): diff --git a/backend/position_sync.py b/backend/position_sync.py index aa68dc3..a311a76 100644 --- a/backend/position_sync.py +++ b/backend/position_sync.py @@ -158,7 +158,7 @@ def get_local_positions(conn): cur = conn.cursor() cur.execute(""" SELECT id, symbol, strategy, direction, entry_price, sl_price, tp1_price, tp2_price, - tp1_hit, status, risk_distance, binance_order_id, entry_ts + tp1_hit, status, risk_distance, binance_order_id, entry_ts, qty FROM live_trades WHERE status IN ('active', 'tp1_hit') ORDER BY entry_ts DESC @@ -170,7 +170,7 @@ def get_local_positions(conn): "entry_price": row[4], "sl_price": row[5], "tp1_price": row[6], "tp2_price": row[7], "tp1_hit": row[8], "status": row[9], "risk_distance": row[10], "binance_order_id": row[11], "entry_ts": row[12], - "binance_order_id": row[11], + "qty": row[13], }) return positions @@ -359,7 +359,8 @@ async def check_tp1_triggers(session, conn): if not tp1_found and lp["status"] == "active": # TP1可能已触发,验证仓位是否减半 bp = (await get_binance_positions(session)).get(symbol) - if bp and abs(bp["amount"]) < abs(float(lp.get("binance_order_id", "0") or "0")): + entry_qty = lp.get("qty") or 0 + if bp and entry_qty > 0 and abs(bp["amount"]) < entry_qty * 0.75: # 确认TP1触发 logger.info(f"[{symbol}] ✅ TP1触发! 移SL到保本价") @@ -595,6 +596,20 @@ def _log_event(conn, level, category, message, symbol=None, detail=None): # ============ 主循环 ============ +def ensure_db_conn(conn): + """检查DB连接,断线则重连""" + try: + conn.cursor().execute("SELECT 1") + return conn + except Exception: + logger.warning("⚠️ DB连接断开,重连中...") + try: + conn.close() + except Exception: + pass + return psycopg2.connect(**DB_CONFIG) + + async def main(): logger.info("=" * 60) logger.info(f"🔄 Position Sync 启动 | 环境={TRADE_ENV} | 间隔={CHECK_INTERVAL}秒") @@ -606,6 +621,7 @@ async def main(): async with aiohttp.ClientSession() as session: while True: try: + conn = ensure_db_conn(conn) # 1. 对账 result = await reconcile(session, conn) diff --git a/backend/risk_guard.py b/backend/risk_guard.py index 9d21a79..03042f1 100644 --- a/backend/risk_guard.py +++ b/backend/risk_guard.py @@ -68,6 +68,9 @@ ACCOUNT_UPDATE_STALE_SEC = 20 CHECK_INTERVAL = 5 # 风控检查间隔(秒) SYMBOLS = ["BTCUSDT", "ETHUSDT", "XRPUSDT", "SOLUSDT"] +SYMBOL_QTY_PRECISION = { + "BTCUSDT": 3, "ETHUSDT": 3, "XRPUSDT": 0, "SOLUSDT": 2, +} logging.basicConfig( level=logging.INFO, @@ -336,11 +339,13 @@ async def trigger_circuit_break(session, conn, reason: str, action: str = "block amt = float(p.get("positionAmt", 0)) if amt != 0: close_side = "SELL" if amt > 0 else "BUY" + qty_prec = SYMBOL_QTY_PRECISION.get(symbol, 3) + qty_str = f"{abs(amt):.{qty_prec}f}" await binance_request(session, "POST", "/fapi/v1/order", { "symbol": symbol, "side": close_side, "type": "MARKET", - "quantity": str(abs(amt)), "reduceOnly": "true", + "quantity": qty_str, "reduceOnly": "true", }) - logger.info(f"[{symbol}] 🔴 紧急平仓 {close_side} qty={abs(amt)}") + logger.info(f"[{symbol}] 🔴 紧急平仓 {close_side} qty={qty_str}") # 记录历史 risk_state.breaker_history.append({ @@ -458,6 +463,20 @@ async def check_emergency_commands(session, conn): # ============ 主循环 ============ +def ensure_db_conn(conn): + """检查DB连接,断线则重连""" + try: + conn.cursor().execute("SELECT 1") + return conn + except Exception: + logger.warning("⚠️ DB连接断开,重连中...") + try: + conn.close() + except Exception: + pass + return psycopg2.connect(**DB_CONFIG) + + async def main(): logger.info("=" * 60) logger.info(f"🛡 Risk Guard 启动 | 环境={TRADE_ENV}") @@ -474,6 +493,7 @@ async def main(): async with aiohttp.ClientSession() as session: while True: try: + conn = ensure_db_conn(conn) # 0. 检查自动恢复 check_auto_resume()