#!/usr/bin/env python3
"""
Risk Guard - risk-control circuit breaker.

Reconstructed from the git patch "feat: risk_guard.py" (commit b08ea8f),
which adds backend/risk_guard.py.

Monitors risk metrics in a loop and takes protective action when one trips.

Circuit-breaker rules:
1. Daily loss limit (-5R; realized + unrealized losses) -> close all + halt
2. Consecutive losses (5 in a row) -> block new entries for 1 hour, then
   resume automatically
3. API unreachable (> 30 s) -> block new entries until the API answers again
4. Insufficient balance (< risk x 2) -> refuse new entries
5. Stale data -> block new entries (only API staleness is wired up so far)

Hold timeout: yellow at 45 min, red at 60 min followed by a 10 minute manual
window, automatic market close at 70 min.

The state is written to /tmp/risk_guard_state.json for live_executor.
Checks run every 5 seconds; a status line is logged every 60 seconds.
"""

import os
import sys
import json
import time
import logging
import asyncio
import hashlib
import hmac
from decimal import Decimal, InvalidOperation
from urllib.parse import urlencode
from datetime import datetime, timezone

import psycopg2
import aiohttp

# ============ Configuration ============

TRADE_ENV = os.getenv("TRADE_ENV", "testnet")
BINANCE_ENDPOINTS = {
    "testnet": "https://testnet.binancefuture.com",
    "production": "https://fapi.binance.com",
}
BASE_URL = BINANCE_ENDPOINTS[TRADE_ENV]

# NOTE(review): the fallback host/password are hard-coded credentials; prefer
# requiring DB_PASSWORD from the environment. Kept for backward compatibility.
DB_CONFIG = {
    "host": os.getenv("DB_HOST", "10.106.0.3"),
    "port": int(os.getenv("DB_PORT", "5432")),
    "dbname": os.getenv("DB_NAME", "arb_engine"),
    "user": os.getenv("DB_USER", "arb"),
    "password": os.getenv("DB_PASSWORD", "arb_engine_2026"),
}

# Risk parameters
DAILY_LOSS_LIMIT_R = float(os.getenv("DAILY_LOSS_LIMIT_R", "-5"))
CONSECUTIVE_LOSS_LIMIT = int(os.getenv("CONSECUTIVE_LOSS_LIMIT", "5"))
CONSECUTIVE_LOSS_COOLDOWN_MIN = int(os.getenv("CONSECUTIVE_LOSS_COOLDOWN_MIN", "60"))
API_DISCONNECT_THRESHOLD_SEC = int(os.getenv("API_DISCONNECT_THRESHOLD_SEC", "30"))
MIN_BALANCE_MULTIPLE = float(os.getenv("MIN_BALANCE_MULTIPLE", "2"))
RISK_PER_TRADE_USD = float(os.getenv("RISK_PER_TRADE_USD", "2"))

# Per-request HTTP timeout. Must stay well below API_DISCONNECT_THRESHOLD_SEC,
# otherwise a single hung request stalls the whole check loop.
API_REQUEST_TIMEOUT_SEC = float(os.getenv("API_REQUEST_TIMEOUT_SEC", "10"))
# The API breaker is lifted once a request succeeded within this many seconds.
API_RECOVERY_WINDOW_SEC = 10

# Hold-timeout handling
HOLD_TIMEOUT_YELLOW_MIN = 45
HOLD_TIMEOUT_RED_MIN = 60
HOLD_TIMEOUT_GRACE_MIN = 10  # manual window after the red alert
HOLD_TIMEOUT_AUTO_CLOSE_MIN = HOLD_TIMEOUT_RED_MIN + HOLD_TIMEOUT_GRACE_MIN  # 70 min

# Data freshness (not consumed yet: nothing in this module updates
# last_market_data / last_account_update)
MARKET_DATA_STALE_SEC = 10
ACCOUNT_UPDATE_STALE_SEC = 20

CHECK_INTERVAL = 5  # seconds between risk checks
STATUS_LOG_INTERVAL_SEC = 60  # seconds between status log lines

STATE_PATH = "/tmp/risk_guard_state.json"

SYMBOLS = ["BTCUSDT", "ETHUSDT", "XRPUSDT", "SOLUSDT"]

_SUPPORTED_METHODS = ("GET", "POST", "DELETE")

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
logger = logging.getLogger("risk-guard")

# ============ State ============


class RiskState:
    """Mutable, process-wide risk-control state."""

    def __init__(self):
        self.status = "normal"  # normal / warning / circuit_break
        self.block_new_entries = False
        self.reduce_only = False
        self.manual_override = False
        self.circuit_break_reason = None
        self.circuit_break_time = None
        # Action of the active breaker (block_new / reduce_only / close_all),
        # None while no breaker is active. Lets the daily-loss breaker
        # escalate over a weaker breaker that is already active.
        self.circuit_break_action = None
        self.auto_resume_time = None
        self.last_api_success = time.time()
        self.last_market_data = time.time()
        self.last_account_update = time.time()
        self.consecutive_losses = 0
        self.today_realized_r = 0.0
        self.today_unrealized_r = 0.0
        self.breaker_history = []
        # True while the balance rule (not a breaker) is blocking entries.
        self.balance_insufficient = False
        # True while a close_all still has positions it failed to close.
        self.close_all_pending = False
        # exit_ts of the most recently closed trade seen by check_daily_loss.
        self.last_closed_exit_ts = None
        # exit_ts of the newest trade in the loss streak that already tripped
        # the breaker; -1 means "no streak acknowledged yet".
        self.acked_loss_streak_exit_ts = -1
        # Timeout queue: {trade_id: {"entered_queue_ts", "notified", "symbol",
        #                            "auto_close_at_min", "closed"}}
        self.timeout_queue = {}

    def to_dict(self):
        """Return the JSON-serializable snapshot written to the state file."""
        return {
            "status": self.status,
            "block_new_entries": self.block_new_entries,
            "reduce_only": self.reduce_only,
            "circuit_break_reason": self.circuit_break_reason,
            "circuit_break_time": self.circuit_break_time,
            "auto_resume_time": self.auto_resume_time,
            "consecutive_losses": self.consecutive_losses,
            "today_realized_r": round(self.today_realized_r, 2),
            "today_unrealized_r": round(self.today_unrealized_r, 2),
            # Only unrealized losses count; open profit does not offset.
            "today_total_r": round(self.today_realized_r + min(self.today_unrealized_r, 0), 2),
        }


risk_state = RiskState()

# ============ API keys ============
_api_key = None
_secret_key = None


def load_api_keys():
    """Load the Binance key pair into module globals.

    Reads BINANCE_API_KEY / BINANCE_SECRET_KEY from the environment first and
    falls back to Google Secret Manager. Exits the process with status 1 when
    the fallback fails.
    """
    global _api_key, _secret_key
    _api_key = os.getenv("BINANCE_API_KEY")
    _secret_key = os.getenv("BINANCE_SECRET_KEY")
    if _api_key and _secret_key:
        return
    try:
        from google.cloud import secretmanager
        client = secretmanager.SecretManagerServiceClient()
        project = os.getenv("GCP_PROJECT", "gen-lang-client-0835616737")
        prefix = "binance-testnet" if TRADE_ENV == "testnet" else "binance-live"
        _api_key = client.access_secret_version(
            name=f"projects/{project}/secrets/{prefix}-api-key/versions/latest"
        ).payload.data.decode()
        _secret_key = client.access_secret_version(
            name=f"projects/{project}/secrets/{prefix}-secret-key/versions/latest"
        ).payload.data.decode()
    except Exception as e:
        logger.error(f"Failed to load API keys: {e}")
        sys.exit(1)


def sign_params(params):
    """Add ``timestamp`` and an HMAC-SHA256 ``signature`` to ``params``.

    The dict is modified in place and returned. load_api_keys() must have
    run first, otherwise the secret key is still None.
    """
    params["timestamp"] = int(time.time() * 1000)
    query_string = urlencode(params)
    signature = hmac.new(_secret_key.encode(), query_string.encode(), hashlib.sha256).hexdigest()
    params["signature"] = signature
    return params


async def binance_request(session, method, path, params=None):
    """Send a signed request to the Binance futures API.

    Args:
        session: aiohttp.ClientSession used for the call.
        method: "GET", "POST" or "DELETE".
        path: API path appended to BASE_URL.
        params: Optional query parameters; the caller's dict is not modified.

    Returns:
        ``(data, status)``: the decoded JSON body and the HTTP status. On a
        transport or decoding error returns ``({"error": message}, 500)``.

    Raises:
        ValueError: ``method`` is not supported. The original code silently
            returned None here, which crashed callers on tuple unpacking.
    """
    if method not in _SUPPORTED_METHODS:
        raise ValueError(f"Unsupported HTTP method: {method}")

    url = f"{BASE_URL}{path}"
    headers = {"X-MBX-APIKEY": _api_key}
    signed = sign_params(dict(params or {}))
    try:
        async with session.request(method, url, params=signed, headers=headers) as resp:
            data = await resp.json()
            # Any decoded response counts as proof of connectivity,
            # regardless of the HTTP status.
            risk_state.last_api_success = time.time()
            return data, resp.status
    except Exception as exc:
        # Timeouts stringify to "", so fall back to the exception type name.
        e = str(exc) or type(exc).__name__
        logger.error(f"API request failed: {e}")
        return {"error": e}, 500


# ============ Risk checks ============

def check_daily_loss(conn):
    """Read today's realized R and the current losing streak from the DB.

    Updates ``today_realized_r``, ``consecutive_losses`` and
    ``last_closed_exit_ts`` on the global state.

    Returns:
        ``(realized_r, consecutive)``.
    """
    # Start of the current UTC day, in epoch milliseconds.
    today_start = datetime.now(timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0)
    today_start_ms = int(today_start.timestamp() * 1000)

    with conn.cursor() as cur:
        cur.execute("""
            SELECT COALESCE(SUM(pnl_r), 0) as total_r
            FROM live_trades
            WHERE exit_ts >= %s AND status NOT IN ('active', 'tp1_hit')
        """, (today_start_ms,))
        row = cur.fetchone()

        # Most recent closed trades, newest first.
        cur.execute("""
            SELECT pnl_r, exit_ts FROM live_trades
            WHERE status NOT IN ('active', 'tp1_hit')
            ORDER BY exit_ts DESC LIMIT %s
        """, (CONSECUTIVE_LOSS_LIMIT,))
        recent = cur.fetchall()

    realized_r = float(row[0]) if row and row[0] else 0.0
    risk_state.today_realized_r = realized_r

    # Count losses from the newest trade back to the first non-loss.
    consecutive = 0
    for pnl_r, _exit_ts in recent:
        if pnl_r and pnl_r < 0:
            consecutive += 1
        else:
            break
    risk_state.consecutive_losses = consecutive
    risk_state.last_closed_exit_ts = recent[0][1] if recent else None

    return realized_r, consecutive


async def check_unrealized_loss(session):
    """Return the account's unrealized PnL expressed in R.

    When the API call fails, the last known value is kept and returned.
    Resetting it to 0 (the original behavior) could hide an open loss from
    the daily-loss breaker exactly when the exchange is unreachable.
    """
    data, status = await binance_request(session, "GET", "/fapi/v2/positionRisk")
    if status != 200 or not isinstance(data, list):
        logger.warning(f"未实现盈亏查询失败(status={status}),沿用上次数值")
        return risk_state.today_unrealized_r

    total_unrealized = sum(float(pos.get("unRealizedProfit", 0)) for pos in data)
    # Convert USD to R.
    unrealized_r = total_unrealized / RISK_PER_TRADE_USD if RISK_PER_TRADE_USD > 0 else 0
    risk_state.today_unrealized_r = unrealized_r
    return unrealized_r


async def _fetch_available_balance(session):
    """Return the available USDT balance, or None when the API call failed."""
    data, status = await binance_request(session, "GET", "/fapi/v2/balance")
    if status != 200 or not isinstance(data, list):
        return None
    for asset in data:
        if asset.get("asset") == "USDT":
            return float(asset.get("availableBalance", 0))
    return 0.0


async def check_balance(session):
    """Return the available USDT balance; 0 when it cannot be determined."""
    balance = await _fetch_available_balance(session)
    return 0 if balance is None else balance


def _apply_balance_check(balance):
    """Set or clear the balance-based entry block.

    An unknown balance (None) leaves the previous decision untouched. When
    the balance recovers, the block is lifted unless a breaker is active.
    """
    if balance is None:
        return

    min_balance = RISK_PER_TRADE_USD * MIN_BALANCE_MULTIPLE
    if balance < min_balance:
        if not risk_state.balance_insufficient:
            logger.warning(f"🟡 余额不足: ${balance:.2f} < ${RISK_PER_TRADE_USD * MIN_BALANCE_MULTIPLE:.2f},暂停开仓")
        risk_state.balance_insufficient = True
        risk_state.block_new_entries = True
    elif risk_state.balance_insufficient:
        risk_state.balance_insufficient = False
        if risk_state.status != "circuit_break":
            risk_state.block_new_entries = False
        logger.info(f"✅ 余额恢复: ${balance:.2f},解除余额限制")


def check_data_freshness():
    """Return a list of freshness problems (currently only API silence)."""
    now = time.time()
    issues = []

    api_gap = now - risk_state.last_api_success
    if api_gap > API_DISCONNECT_THRESHOLD_SEC:
        issues.append(f"API无响应{api_gap:.0f}秒")

    return issues


async def _close_symbol_position(session, symbol):
    """Cancel open orders on ``symbol`` and market-close its position.

    The close side comes from the sign of ``positionAmt``, and the quantity
    is formatted via Decimal so tiny sizes never render in scientific
    notation.
    NOTE(review): uses reduceOnly, so this assumes one-way position mode -
    confirm against the account settings.

    Returns:
        ``(ok, closed)``: ``ok`` is False when the position could not be
        read or a close order was rejected; ``closed`` lists the
        ``(side, quantity)`` pairs that were accepted.
    """
    await binance_request(session, "DELETE", "/fapi/v1/allOpenOrders", {"symbol": symbol})
    pos_data, pos_status = await binance_request(
        session, "GET", "/fapi/v2/positionRisk", {"symbol": symbol}
    )
    if pos_status != 200 or not isinstance(pos_data, list):
        logger.error(f"[{symbol}] 查询仓位失败(status={pos_status}),稍后重试")
        return False, []

    ok = True
    closed = []
    for pos in pos_data:
        if pos.get("symbol") != symbol:
            continue
        try:
            amt = Decimal(str(pos.get("positionAmt", "0")))
        except InvalidOperation:
            logger.error(f"[{symbol}] 无法解析仓位数量: {pos.get('positionAmt')!r}")
            ok = False
            continue
        if amt == 0:
            continue

        close_side = "SELL" if amt > 0 else "BUY"
        quantity = format(abs(amt), "f")
        resp, status = await binance_request(session, "POST", "/fapi/v1/order", {
            "symbol": symbol, "side": close_side, "type": "MARKET",
            "quantity": quantity, "reduceOnly": "true",
        })
        if status == 200:
            closed.append((close_side, quantity))
        else:
            ok = False
            logger.error(f"[{symbol}] 平仓下单失败(status={status}): {resp}")
    return ok, closed


async def check_hold_timeout(session, conn):
    """Warn about long-held trades and auto-close them after the grace window.

    A trade enters the timeout queue once it has been held for
    HOLD_TIMEOUT_RED_MIN. It is market-closed at HOLD_TIMEOUT_AUTO_CLOSE_MIN.
    A trade first seen when it is already past that age (e.g. after a
    restart) gets a fresh HOLD_TIMEOUT_GRACE_MIN window instead of being
    ignored forever, which is what the original code did.
    """
    with conn.cursor() as cur:
        cur.execute("""
            SELECT id, symbol, direction, entry_ts, entry_price, risk_distance
            FROM live_trades
            WHERE status IN ('active', 'tp1_hit')
        """)
        rows = cur.fetchall()

    now_ms = int(time.time() * 1000)
    queue = risk_state.timeout_queue
    active_ids = set()

    for trade_id, symbol, _direction, entry_ts, _entry_price, _rd in rows:
        active_ids.add(trade_id)
        if entry_ts is None:
            continue
        hold_min = (now_ms - entry_ts) / 60000

        if hold_min >= HOLD_TIMEOUT_RED_MIN:
            entry = queue.get(trade_id)
            if entry is None:
                # Red alert: enter the queue and start the manual window.
                if hold_min < HOLD_TIMEOUT_AUTO_CLOSE_MIN:
                    auto_close_at = HOLD_TIMEOUT_AUTO_CLOSE_MIN
                else:
                    auto_close_at = hold_min + HOLD_TIMEOUT_GRACE_MIN
                queue[trade_id] = {
                    "entered_queue_ts": time.time(),
                    "notified": False,
                    "symbol": symbol,
                    "auto_close_at_min": auto_close_at,
                    "closed": False,
                }
                remaining = HOLD_TIMEOUT_GRACE_MIN
                logger.warning(f"[{symbol}] 🔴 持仓{hold_min:.0f}分钟超时! 进入处置队列, {remaining}分钟后自动平仓")
                # TODO: Discord notification to the operator
                continue

            if entry["closed"]:
                # Close once per trade: a later position on the same symbol
                # must not be flattened on behalf of this trade.
                continue

            if not entry["notified"]:
                entry["notified"] = True
                # TODO: urgent Discord notification

            if hold_min >= entry["auto_close_at_min"]:
                logger.warning(f"[{symbol}] ⏰ 持仓{hold_min:.0f}分钟,人工窗口到期,自动市价平仓!")
                ok, closed = await _close_symbol_position(session, symbol)
                for _side, qty in closed:
                    logger.info(f"[{symbol}] 🔴 自动平仓完成 qty={qty}")
                # On failure the entry stays open and is retried next tick.
                entry["closed"] = ok

        elif hold_min >= HOLD_TIMEOUT_YELLOW_MIN:
            logger.info(f"[{symbol}] 🟡 持仓{hold_min:.0f}分钟,接近超时")

    # Drop queue entries whose trade is no longer open.
    for stale_id in [tid for tid in queue if tid not in active_ids]:
        del queue[stale_id]


# ============ Breaker actions ============

async def _close_all_positions(session):
    """Flatten every symbol in SYMBOLS; return True when nothing failed."""
    all_ok = True
    for symbol in SYMBOLS:
        ok, closed = await _close_symbol_position(session, symbol)
        for close_side, qty in closed:
            logger.info(f"[{symbol}] 🔴 紧急平仓 {close_side} qty={qty}")
        all_ok = all_ok and ok
    return all_ok


async def trigger_circuit_break(session, conn, reason: str, action: str = "block_new"):
    """Trip the circuit breaker and perform ``action``.

    Args:
        session: aiohttp.ClientSession used for close orders.
        conn: Database connection (currently unused; kept for the interface).
        reason: Human-readable reason; check_auto_resume() matches on it.
        action: "block_new", "reduce_only" or "close_all".
    """
    now = time.time()
    risk_state.status = "circuit_break"
    risk_state.circuit_break_reason = reason
    risk_state.circuit_break_time = now
    risk_state.circuit_break_action = action
    # A new breaker starts without a resume deadline; callers set one if needed.
    risk_state.auto_resume_time = None
    # Every breaker blocks new entries, including an unrecognized action.
    risk_state.block_new_entries = True

    if action == "block_new":
        logger.error(f"🔴 熔断触发: {reason} | 动作: 禁止新开仓")

    elif action == "reduce_only":
        risk_state.reduce_only = True
        logger.error(f"🔴 熔断触发: {reason} | 动作: 只减仓模式")

    elif action == "close_all":
        risk_state.reduce_only = True
        logger.error(f"🔴🔴 熔断触发: {reason} | 动作: 全部平仓!")
        # Remember failures so the main loop retries until the book is flat.
        risk_state.close_all_pending = not await _close_all_positions(session)

    else:
        logger.error(f"🔴 熔断触发: {reason} | 未知动作: {action}")

    # Record history
    risk_state.breaker_history.append({
        "time": now,
        "reason": reason,
        "action": action,
    })

    # Publish the state for other processes.
    write_risk_state()


def write_risk_state():
    """Write the state file read by live_executor.

    Writes a temporary file and renames it over the target, so a reader
    never sees a half-written JSON document.
    """
    state_path = STATE_PATH
    tmp_path = f"{state_path}.tmp"
    try:
        with open(tmp_path, "w") as f:
            json.dump(risk_state.to_dict(), f)
        os.replace(tmp_path, state_path)
    except (OSError, TypeError, ValueError) as e:
        logger.error(f"写风控状态失败: {e}")


def _resume_trading():
    """Return to normal; entries stay blocked while the balance is too low."""
    risk_state.status = "normal"
    risk_state.block_new_entries = risk_state.balance_insufficient
    risk_state.reduce_only = False
    risk_state.circuit_break_reason = None
    risk_state.circuit_break_action = None
    risk_state.auto_resume_time = None
    write_risk_state()


def check_auto_resume():
    """Lift the breaker when its auto-resume condition is met.

    Only the consecutive-loss breaker (after the cooldown) and the API
    breaker (once a request succeeded recently) resume automatically. The
    daily-loss breaker matches neither branch and stays active.
    """
    if risk_state.status != "circuit_break":
        return

    now = time.time()
    reason = risk_state.circuit_break_reason
    if not reason:
        return

    if "连续亏损" in reason and risk_state.circuit_break_time:
        elapsed_min = (now - risk_state.circuit_break_time) / 60
        if elapsed_min >= CONSECUTIVE_LOSS_COOLDOWN_MIN:
            logger.info(f"✅ 连续亏损冷却期结束({CONSECUTIVE_LOSS_COOLDOWN_MIN}分钟),自动恢复交易")
            _resume_trading()
            return

    if "API" in reason:
        api_gap = now - risk_state.last_api_success
        if api_gap < API_RECOVERY_WINDOW_SEC:
            logger.info("✅ API连接恢复,自动恢复交易")
            _resume_trading()


# ============ Main loop ============

def _connect_db():
    """Open a DB connection in autocommit mode.

    Autocommit keeps the read-only polling queries from holding a
    transaction open, and stops one failed query from aborting every
    query that follows it.
    """
    conn = psycopg2.connect(**DB_CONFIG)
    conn.autocommit = True
    return conn


def _ensure_db_connection(conn):
    """Return ``conn``, or a fresh connection if psycopg2 marked it closed."""
    if conn.closed:
        logger.warning("数据库连接已断开,正在重连")
        return _connect_db()
    return conn


async def main():
    """Run the risk-check loop until the process is stopped."""
    logger.info("=" * 60)
    logger.info(f"🛡 Risk Guard 启动 | 环境={TRADE_ENV}")
    logger.info(f"  日限={DAILY_LOSS_LIMIT_R}R | 连亏限={CONSECUTIVE_LOSS_LIMIT}次 | 冷却={CONSECUTIVE_LOSS_COOLDOWN_MIN}分钟")
    logger.info(f"  超时: {HOLD_TIMEOUT_YELLOW_MIN}min黄/{HOLD_TIMEOUT_RED_MIN}min红/{HOLD_TIMEOUT_AUTO_CLOSE_MIN}min自动平")
    logger.info("=" * 60)

    load_api_keys()
    conn = _connect_db()

    # Publish the initial state.
    write_risk_state()

    timeout = aiohttp.ClientTimeout(total=API_REQUEST_TIMEOUT_SEC)
    last_status_log = float("-inf")

    try:
        async with aiohttp.ClientSession(timeout=timeout) as session:
            while True:
                try:
                    conn = _ensure_db_connection(conn)

                    # 0. Auto-resume
                    check_auto_resume()

                    # 1. Daily loss: realized plus unrealized losses only.
                    realized_r, consecutive = check_daily_loss(conn)
                    unrealized_r = await check_unrealized_loss(session)
                    total_r = realized_r + min(unrealized_r, 0)

                    # May escalate over a weaker breaker; fires once per
                    # breach because the action is then already close_all.
                    if (total_r <= DAILY_LOSS_LIMIT_R
                            and risk_state.circuit_break_action != "close_all"):
                        await trigger_circuit_break(
                            session, conn,
                            f"今日亏损{total_r:.2f}R,超过日限{DAILY_LOSS_LIMIT_R}R",
                            "close_all"
                        )
                    elif risk_state.close_all_pending:
                        # A previous close_all left positions open; retry.
                        risk_state.close_all_pending = not await _close_all_positions(session)

                    # 2. Consecutive losses. A streak trips the breaker once;
                    # it re-arms only after another trade has closed, so the
                    # cooldown resume is not undone on the very next tick.
                    if (consecutive >= CONSECUTIVE_LOSS_LIMIT
                            and risk_state.status != "circuit_break"
                            and risk_state.last_closed_exit_ts != risk_state.acked_loss_streak_exit_ts):
                        await trigger_circuit_break(
                            session, conn,
                            f"连续亏损{consecutive}次,超过限制{CONSECUTIVE_LOSS_LIMIT}次",
                            "block_new"
                        )
                        risk_state.auto_resume_time = time.time() + CONSECUTIVE_LOSS_COOLDOWN_MIN * 60
                        risk_state.acked_loss_streak_exit_ts = risk_state.last_closed_exit_ts

                    # 3. API connectivity
                    freshness_issues = check_data_freshness()
                    if freshness_issues and risk_state.status != "circuit_break":
                        await trigger_circuit_break(
                            session, conn,
                            f"数据异常: {'; '.join(freshness_issues)}",
                            "block_new"
                        )

                    # 4. Balance (None means the lookup failed, not $0)
                    balance = await _fetch_available_balance(session)
                    _apply_balance_check(balance)

                    # 5. Hold timeout
                    await check_hold_timeout(session, conn)

                    # 6. Publish state
                    write_risk_state()

                    # Status line once per STATUS_LOG_INTERVAL_SEC.
                    now_mono = time.monotonic()
                    if now_mono - last_status_log >= STATUS_LOG_INTERVAL_SEC:
                        last_status_log = now_mono
                        balance_text = f"${balance:.2f}" if balance is not None else "未知"
                        logger.info(
                            f"📊 风控状态: {risk_state.status} | "
                            f"已实现={realized_r:+.2f}R | 未实现={unrealized_r:+.2f}R | "
                            f"合计={total_r:+.2f}R | 连亏={consecutive} | "
                            f"余额={balance_text}"
                        )

                    await asyncio.sleep(CHECK_INTERVAL)

                except KeyboardInterrupt:
                    break
                except Exception as e:
                    logger.error(f"❌ 风控检查异常: {e}", exc_info=True)
                    await asyncio.sleep(10)
    finally:
        conn.close()
        logger.info("Risk Guard 已停止")


if __name__ == "__main__":
    asyncio.run(main())