diff --git a/backend/db.py b/backend/db.py index 041155b..844d25b 100644 --- a/backend/db.py +++ b/backend/db.py @@ -303,6 +303,57 @@ CREATE TABLE IF NOT EXISTS paper_trades ( score_factors JSONB, created_at TIMESTAMP DEFAULT NOW() ); + +-- Live trading tables +CREATE TABLE IF NOT EXISTS live_config ( + key TEXT PRIMARY KEY, + value TEXT NOT NULL, + label TEXT, + updated_at TIMESTAMP DEFAULT NOW() +); + +CREATE TABLE IF NOT EXISTS live_events ( + id BIGSERIAL PRIMARY KEY, + ts BIGINT DEFAULT (EXTRACT(EPOCH FROM NOW()) * 1000)::BIGINT, + level TEXT, + category TEXT, + symbol TEXT, + message TEXT, + detail JSONB +); + +CREATE TABLE IF NOT EXISTS live_trades ( + id BIGSERIAL PRIMARY KEY, + symbol TEXT NOT NULL, + strategy TEXT NOT NULL, + direction TEXT NOT NULL, + status TEXT NOT NULL DEFAULT 'active', + entry_price DOUBLE PRECISION, + exit_price DOUBLE PRECISION, + entry_ts BIGINT, + exit_ts BIGINT, + sl_price DOUBLE PRECISION, + tp1_price DOUBLE PRECISION, + tp2_price DOUBLE PRECISION, + tp1_hit BOOLEAN DEFAULT FALSE, + score DOUBLE PRECISION, + tier TEXT, + pnl_r DOUBLE PRECISION, + fee_usdt DOUBLE PRECISION DEFAULT 0, + funding_fee_usdt DOUBLE PRECISION DEFAULT 0, + risk_distance DOUBLE PRECISION, + atr_at_entry DOUBLE PRECISION, + score_factors JSONB, + signal_id BIGINT, + binance_order_id TEXT, + fill_price DOUBLE PRECISION, + slippage_bps DOUBLE PRECISION, + protection_gap_ms BIGINT, + signal_to_order_ms BIGINT, + order_to_fill_ms BIGINT, + qty DOUBLE PRECISION, + created_at TIMESTAMP DEFAULT NOW() +); """ diff --git a/backend/live_executor.py b/backend/live_executor.py index c25a50f..a121374 100644 --- a/backend/live_executor.py +++ b/backend/live_executor.py @@ -317,9 +317,15 @@ async def execute_entry(session: aiohttp.ClientSession, signal: dict, db_conn): symbol, {"signal_age_ms": round(signal_age_ms), "score": score}) return None - # 0.5 检查风控状态(读risk_guard写的状态文件) + # 0.5 检查风控状态(Fail-Closed: 状态异常时拒绝开仓) + state_path = "/tmp/risk_guard_state.json" try: - with open("/tmp/risk_guard_state.json") as f: + st = os.stat(state_path) + if time.time() - st.st_mtime > 15: + logger.error(f"[{symbol}] ❌ 风控状态文件超过15秒未更新,risk_guard可能失联,拒绝开仓") + _log_event(db_conn, "critical", "risk", "风控失联(状态文件过期),拒绝开仓", symbol) + return None + with open(state_path) as f: risk_state = json.load(f) if risk_state.get("block_new_entries"): logger.warning(f"[{symbol}] ❌ 风控禁止新开仓: {risk_state.get('circuit_break_reason', '未知原因')}") @@ -328,9 +334,11 @@ async def execute_entry(session: aiohttp.ClientSession, signal: dict, db_conn): logger.warning(f"[{symbol}] ❌ 只减仓模式,拒绝开仓") return None except FileNotFoundError: - pass # risk_guard还没启动,允许交易 + logger.error(f"[{symbol}] ❌ 风控状态文件不存在,risk_guard未运行,拒绝开仓") + return None except Exception as e: - logger.warning(f"[{symbol}] ⚠ 读取风控状态失败: {e},继续交易") + logger.error(f"[{symbol}] ❌ 读取风控状态失败: {e},拒绝开仓") + return None # 检查前端紧急操作指令 try: @@ -525,21 +533,20 @@ def fetch_pending_signals(conn): """查询未处理的新信号""" cur = conn.cursor() # 查最近60秒内的新信号(score>=阈值、策略匹配、未被live_executor处理过) - strategies_str = ",".join(f"'{s}'" for s in ENABLED_STRATEGIES) - cur.execute(f""" + cur.execute(""" SELECT si.id, si.symbol, si.signal, si.score, si.ts, si.factors, si.strategy, si.price FROM signal_indicators si WHERE si.signal IS NOT NULL AND si.signal != '' - AND si.strategy IN ({strategies_str}) + AND si.strategy = ANY(%s) AND si.ts > extract(epoch from now()) * 1000 - 60000 AND NOT EXISTS ( SELECT 1 FROM live_trades lt WHERE lt.signal_id = si.id AND lt.strategy = si.strategy ) ORDER BY si.ts DESC - """) + """, (ENABLED_STRATEGIES,)) rows = cur.fetchall() signals = [] for row in rows: diff --git a/backend/main.py b/backend/main.py index 46392c7..42082a2 100644 --- a/backend/main.py +++ b/backend/main.py @@ -1473,13 +1473,20 @@ async def live_risk_status(user: dict = Depends(get_current_user)): return {"status": "unknown", "error": "risk_guard_state.json not found"} +_risk_usd_cache = {"v": 2.0, "ts": 0.0} + async def _get_risk_usd() -> float: """从live_config读取1R金额,缓存60秒""" + now = time.time() + if now - _risk_usd_cache["ts"] < 60: + return _risk_usd_cache["v"] try: row = await async_fetchrow("SELECT value FROM live_config WHERE key = $1", "risk_per_trade_usd") - return float(row["value"]) if row else 2.0 + v = float(row["value"]) if row else 2.0 except Exception: - return 2.0 + v = 2.0 + _risk_usd_cache.update({"v": v, "ts": now}) + return v def _require_admin(user: dict): diff --git a/backend/market_data_collector.py b/backend/market_data_collector.py index 33a3a43..ebee07c 100644 --- a/backend/market_data_collector.py +++ b/backend/market_data_collector.py @@ -113,7 +113,9 @@ class MarketDataCollector: "BTCUSDT": "BTC-USD", "ETHUSDT": "ETH-USD", } - coinbase_pair = pair_map[symbol] + coinbase_pair = pair_map.get(symbol) + if not coinbase_pair: + return # XRP/SOL无Coinbase数据,跳过 binance_url = "https://api.binance.com/api/v3/ticker/price" coinbase_url = f"https://api.coinbase.com/v2/prices/{coinbase_pair}/spot" diff --git a/backend/position_sync.py b/backend/position_sync.py index 484e50e..340c705 100644 --- a/backend/position_sync.py +++ b/backend/position_sync.py @@ -444,8 +444,8 @@ async def check_closed_positions(session, conn): if total_qty > 0: exit_price = sum(float(t["price"]) * float(t["qty"]) for t in close_trades) / total_qty elif trades_data: - # fallback: 不盲目取最后一条(可能是开仓成交),延后本轮结算 - logger.warning(f"[{symbol}] 未找到明确平仓成交,延后结算") + # fallback: 未找到明确平仓成交,延后本轮结算 + logger.warning(f"[{symbol}] 未找到明确平仓成交,延后结算") continue # 汇总手续费(开仓后200ms起算,避免含其他策略成交) @@ -464,9 +464,10 @@ async def check_closed_positions(session, conn): tp1_r = abs(lp["tp1_price"] - lp["entry_price"]) / rd gross_pnl_r = 0.5 * tp1_r + 0.5 * gross_pnl_r - # 手续费(R) — 用实际成交手续费 + # 手续费(R) — 用实际成交手续费(动态1R) + _risk_usd = load_live_risk_usd(conn) if actual_fee_usdt > 0: - fee_r = actual_fee_usdt / (RISK_PER_TRADE_USD if RISK_PER_TRADE_USD > 0 else rd) + fee_r = actual_fee_usdt / (_risk_usd if _risk_usd > 0 else rd) else: # fallback: 按0.1%估算(开+平各0.05%) fee_r = 0.001 * lp["entry_price"] / rd @@ -477,10 +478,11 @@ async def check_closed_positions(session, conn): fr_row = cur.fetchone() if fr_row: funding_usdt = fr_row[0] - funding_r = abs(funding_usdt) / (RISK_PER_TRADE_USD if RISK_PER_TRADE_USD > 0 else rd) if funding_usdt < 0 else 0 + # P1-3: 正向funding也计入净PnL(正值增加收益,负值减少收益) + funding_r = funding_usdt / (_risk_usd if _risk_usd > 0 else rd) - # 净PnL = gross - fee - funding_cost - pnl_r = gross_pnl_r - fee_r - funding_r + # 净PnL = gross - fee + funding (funding正值=收入,负值=支出) + pnl_r = gross_pnl_r - fee_r + funding_r # 判断状态 if pnl_r > 0.5: @@ -619,6 +621,17 @@ def _log_event(conn, level, category, message, symbol=None, detail=None): # ============ 主循环 ============ +def load_live_risk_usd(conn, default=2.0): + """从live_config动态读取1R金额,与live_executor保持一致""" + try: + cur = conn.cursor() + cur.execute("SELECT value FROM live_config WHERE key='risk_per_trade_usd'") + row = cur.fetchone() + return float(row[0]) if row and row[0] else default + except Exception: + return default + + def ensure_db_conn(conn): """检查DB连接,断线则重连""" try: diff --git a/backend/risk_guard.py b/backend/risk_guard.py index fa00a6e..e582431 100644 --- a/backend/risk_guard.py +++ b/backend/risk_guard.py @@ -220,7 +220,7 @@ def check_daily_loss(conn): return realized_r, consecutive -async def check_unrealized_loss(session): +async def check_unrealized_loss(session, risk_usd_dynamic=2.0): """检查未实现亏损""" data, status = await binance_request(session, "GET", "/fapi/v2/positionRisk") total_unrealized = 0 @@ -229,7 +229,7 @@ async def check_unrealized_loss(session): pnl = float(pos.get("unRealizedProfit", 0)) total_unrealized += pnl # 转为R - unrealized_r = total_unrealized / RISK_PER_TRADE_USD if RISK_PER_TRADE_USD > 0 else 0 + unrealized_r = total_unrealized / risk_usd_dynamic if risk_usd_dynamic > 0 else 0 risk_state.today_unrealized_r = unrealized_r return unrealized_r @@ -245,7 +245,7 @@ async def check_balance(session): return 0 -def check_data_freshness(): +def check_data_freshness(conn=None): """检查数据新鲜度""" now = time.time() issues = [] @@ -254,6 +254,19 @@ def check_data_freshness(): if api_gap > API_DISCONNECT_THRESHOLD_SEC: issues.append(f"API无响应{api_gap:.0f}秒") + # 检查行情数据新鲜度(signal_indicators最新ts) + if conn: + try: + cur = conn.cursor() + cur.execute("SELECT MAX(ts) FROM signal_indicators") + row = cur.fetchone() + if row and row[0]: + market_age = now - (row[0] / 1000) + if market_age > MARKET_DATA_STALE_SEC: + issues.append(f"行情数据延迟{market_age:.1f}秒") + except Exception: + pass + return issues @@ -350,11 +363,23 @@ async def trigger_circuit_break(session, conn, reason: str, action: str = "block close_side = "SELL" if amt > 0 else "BUY" qty_prec = SYMBOL_QTY_PRECISION.get(symbol, 3) qty_str = f"{abs(amt):.{qty_prec}f}" - await binance_request(session, "POST", "/fapi/v1/order", { + close_resp, close_status = await binance_request(session, "POST", "/fapi/v1/order", { "symbol": symbol, "side": close_side, "type": "MARKET", "quantity": qty_str, "reduceOnly": "true", }) - logger.info(f"[{symbol}] 🔴 紧急平仓 {close_side} qty={qty_str}") + if close_status != 200: + logger.error(f"[{symbol}] ❌ 紧急平仓失败: {close_resp}") + _log_event(conn, "critical", "risk", f"紧急平仓失败 {symbol}", symbol, + {"response": str(close_resp)}) + else: + logger.info(f"[{symbol}] 🔴 紧急平仓 {close_side} qty={qty_str}") + # 二次验仓 + verify, v_status = await binance_request(session, "GET", "/fapi/v2/positionRisk", {"symbol": symbol}) + if v_status == 200 and isinstance(verify, list): + still_open = any(abs(float(p.get("positionAmt", 0))) > 0 and p.get("symbol") == symbol for p in verify) + if still_open: + logger.error(f"[{symbol}] ❌ 紧急平仓后仍有仓位! 需人工介入") + _log_event(conn, "critical", "risk", f"紧急平仓后仍有仓位 {symbol}", symbol) # 记录历史 risk_state.breaker_history.append({ @@ -478,6 +503,17 @@ async def check_emergency_commands(session, conn): # ============ 主循环 ============ +def load_live_risk_usd(conn, default=2.0): + """从live_config动态读取1R金额,与live_executor保持一致""" + try: + cur = conn.cursor() + cur.execute("SELECT value FROM live_config WHERE key='risk_per_trade_usd'") + row = cur.fetchone() + return float(row[0]) if row and row[0] else default + except Exception: + return default + + def ensure_db_conn(conn): """检查DB连接,断线则重连""" try: @@ -517,7 +553,8 @@ async def main(): # 1. 今日亏损检查 realized_r, consecutive = check_daily_loss(conn) - unrealized_r = await check_unrealized_loss(session) + _live_risk_usd = load_live_risk_usd(conn) + unrealized_r = await check_unrealized_loss(session, _live_risk_usd) total_r = realized_r + min(unrealized_r, 0) # 已实现 + 未实现亏损 if total_r <= DAILY_LOSS_LIMIT_R and risk_state.status != "circuit_break": @@ -538,7 +575,7 @@ async def main(): risk_state.auto_resume_time = time.time() + CONSECUTIVE_LOSS_COOLDOWN_MIN * 60 # 3. API连接检查 - freshness_issues = check_data_freshness() + freshness_issues = check_data_freshness(conn) if freshness_issues and risk_state.status != "circuit_break": await trigger_circuit_break( session, conn, diff --git a/backend/signal_engine.py b/backend/signal_engine.py index 920b61f..126775f 100644 --- a/backend/signal_engine.py +++ b/backend/signal_engine.py @@ -151,7 +151,8 @@ def fetch_market_indicators(symbol: str) -> dict: elif ind_type == "open_interest_hist": indicators[ind_type] = float(val.get("sumOpenInterestValue", 0)) elif ind_type == "coinbase_premium": - indicators[ind_type] = float(val.get("premium_pct", 0)) + # premium_pct存的是百分比(如0.05=0.05%),转成比例(0.0005) + indicators[ind_type] = float(val.get("premium_pct", 0)) / 100.0 elif ind_type == "funding_rate": indicators[ind_type] = float(val.get("fundingRate", val.get("lastFundingRate", 0))) return indicators diff --git a/frontend/app/paper-v52/page.tsx b/frontend/app/paper-v52/page.tsx index b60907a..dc64419 100644 --- a/frontend/app/paper-v52/page.tsx +++ b/frontend/app/paper-v52/page.tsx @@ -294,7 +294,8 @@ function ActivePositions({ strategy }: { strategy: StrategyFilter }) { const fullR = riskDist > 0 ? (p.direction === "LONG" ? (currentPrice - entry) / riskDist : (entry - currentPrice) / riskDist) : 0; const tp1R = riskDist > 0 ? (p.direction === "LONG" ? ((p.tp1_price || 0) - entry) / riskDist : (entry - (p.tp1_price || 0)) / riskDist) : 0; const unrealR = p.tp1_hit ? 0.5 * tp1R + 0.5 * fullR : fullR; - const unrealUsdt = unrealR * 200; + const paper1R = (config?.initial_balance || 10000) * (config?.risk_per_trade || 0.02); + const unrealUsdt = unrealR * paper1R; return (
diff --git a/frontend/app/paper/page.tsx b/frontend/app/paper/page.tsx index 9a73c05..3f93591 100644 --- a/frontend/app/paper/page.tsx +++ b/frontend/app/paper/page.tsx @@ -231,7 +231,8 @@ function ActivePositions() { const fullR = riskDist > 0 ? (p.direction === "LONG" ? (currentPrice - entry) / riskDist : (entry - currentPrice) / riskDist) : 0; const tp1R = riskDist > 0 ? (p.direction === "LONG" ? ((p.tp1_price || 0) - entry) / riskDist : (entry - (p.tp1_price || 0)) / riskDist) : 0; const unrealR = p.tp1_hit ? 0.5 * tp1R + 0.5 * fullR : fullR; - const unrealUsdt = unrealR * 200; + const paper1R = (config?.initial_balance || 10000) * (config?.risk_per_trade || 0.02); + const unrealUsdt = unrealR * paper1R; return (