#!/usr/bin/env python3
"""
Risk Guard - risk-control circuit breaker module.

Monitors risk metrics in a polling loop and executes protective actions
when a breaker trips.

Breaker rules:
1. Daily loss limit exceeded (-5R)      -> close everything and halt
2. Consecutive losses (5 in a row)      -> pause new entries for 1 hour
3. API connectivity failure (> 30 s)    -> pause new entries
4. Insufficient balance (< risk x 2)    -> refuse new entries
5. Data freshness timeout               -> forbid new entries
   (only the API gap is evaluated in this file, see check_data_freshness)
"""

import os
import sys
import json
import time
import logging
import asyncio
import hashlib
import hmac
from urllib.parse import urlencode
from datetime import datetime, timezone

import psycopg2
import aiohttp

# ============ Configuration ============
TRADE_ENV = os.getenv("TRADE_ENV", "testnet")

BINANCE_ENDPOINTS = {
    "testnet": "https://testnet.binancefuture.com",
    "production": "https://fapi.binance.com",
}
BASE_URL = BINANCE_ENDPOINTS[TRADE_ENV]

# NOTE(review): the password default below is a credential committed to source;
# prefer requiring DB_PASSWORD from the environment. Left unchanged so existing
# deployments keep working.
DB_CONFIG = {
    "host": os.getenv("DB_HOST", "10.106.0.3"),
    "port": int(os.getenv("DB_PORT", "5432")),
    "dbname": os.getenv("DB_NAME", "arb_engine"),
    "user": os.getenv("DB_USER", "arb"),
    "password": os.getenv("DB_PASSWORD", "arb_engine_2026"),
}

# Risk parameters
DAILY_LOSS_LIMIT_R = float(os.getenv("DAILY_LOSS_LIMIT_R", "-5"))
CONSECUTIVE_LOSS_LIMIT = int(os.getenv("CONSECUTIVE_LOSS_LIMIT", "5"))
CONSECUTIVE_LOSS_COOLDOWN_MIN = int(os.getenv("CONSECUTIVE_LOSS_COOLDOWN_MIN", "60"))
API_DISCONNECT_THRESHOLD_SEC = int(os.getenv("API_DISCONNECT_THRESHOLD_SEC", "30"))
MIN_BALANCE_MULTIPLE = float(os.getenv("MIN_BALANCE_MULTIPLE", "2"))
RISK_PER_TRADE_USD = float(os.getenv("RISK_PER_TRADE_USD", "2"))

# Hold-timeout handling
HOLD_TIMEOUT_YELLOW_MIN = 45
HOLD_TIMEOUT_RED_MIN = 60
HOLD_TIMEOUT_GRACE_MIN = 10  # manual-intervention window after the red alert
HOLD_TIMEOUT_AUTO_CLOSE_MIN = HOLD_TIMEOUT_RED_MIN + HOLD_TIMEOUT_GRACE_MIN  # 70 minutes

# Data freshness
MARKET_DATA_STALE_SEC = 10
ACCOUNT_UPDATE_STALE_SEC = 20

CHECK_INTERVAL = 5  # seconds between risk-check cycles
STATUS_LOG_INTERVAL_SEC = 60  # seconds between periodic status log lines

# Upper bound for one exchange request. Must stay below
# API_DISCONNECT_THRESHOLD_SEC, otherwise a hung request stalls the loop for
# longer than the disconnect breaker is supposed to tolerate.
REQUEST_TIMEOUT_SEC = float(os.getenv("REQUEST_TIMEOUT_SEC", "10"))

SYMBOLS = ["BTCUSDT", "ETHUSDT", "XRPUSDT", "SOLUSDT"]

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
logger = logging.getLogger("risk-guard")


# ============ State ============
class RiskState:
    """In-memory risk-control state shared by every check in this module."""

    def __init__(self):
        self.status = "normal"  # normal / warning / circuit_break
        self.block_new_entries = False
        self.reduce_only = False
        self.manual_override = False
        self.circuit_break_reason = None
        self.circuit_break_time = None
        self.auto_resume_time = None
        self.last_api_success = time.time()
        self.last_market_data = time.time()
        self.last_account_update = time.time()
        self.consecutive_losses = 0
        self.today_realized_r = 0.0
        self.today_unrealized_r = 0.0
        self.breaker_history = []
        # Hold-timeout queue:
        # {trade_id: {"entered_queue_ts", "notified", "symbol", "closed"}}
        self.timeout_queue = {}
        # Entry block caused by low balance. Kept apart from block_new_entries
        # so it can be lifted when the balance recovers without touching a
        # breaker-imposed block (and vice versa).
        self.low_balance_block = False
        # Set once a "close_all" breaker has fired; never auto-resumed.
        self.hard_halt = False
        # exit_ts of the most recently closed trade seen by check_daily_loss.
        self.last_closed_exit_ts = None
        # Closed trades with exit_ts <= this value are ignored when counting
        # the losing streak (they were already "paid for" by a cooldown).
        self.streak_baseline_exit_ts = None

    def to_dict(self):
        """Return the JSON-serialisable snapshot written to the state file."""
        return {
            "status": self.status,
            # Readers only see one flag: blocked by a breaker OR by low balance.
            "block_new_entries": self.block_new_entries or self.low_balance_block,
            "reduce_only": self.reduce_only,
            "circuit_break_reason": self.circuit_break_reason,
            "circuit_break_time": self.circuit_break_time,
            "auto_resume_time": self.auto_resume_time,
            "consecutive_losses": self.consecutive_losses,
            "today_realized_r": round(self.today_realized_r, 2),
            "today_unrealized_r": round(self.today_unrealized_r, 2),
            # Unrealized profit is not credited; only unrealized loss counts.
            "today_total_r": round(self.today_realized_r + min(self.today_unrealized_r, 0), 2),
        }


risk_state = RiskState()

# ============ API keys ============
_api_key = None
_secret_key = None


def load_api_keys():
    """Load Binance credentials into the module globals.

    Environment variables BINANCE_API_KEY / BINANCE_SECRET_KEY are used when
    both are set; otherwise the keys are fetched from Google Secret Manager.
    Exits the process with status 1 if the Secret Manager lookup fails.
    """
    global _api_key, _secret_key
    _api_key = os.getenv("BINANCE_API_KEY")
    _secret_key = os.getenv("BINANCE_SECRET_KEY")
    if _api_key and _secret_key:
        return
    try:
        from google.cloud import secretmanager

        client = secretmanager.SecretManagerServiceClient()
        project = os.getenv("GCP_PROJECT", "gen-lang-client-0835616737")
        prefix = "binance-testnet" if TRADE_ENV == "testnet" else "binance-live"
        _api_key = client.access_secret_version(
            name=f"projects/{project}/secrets/{prefix}-api-key/versions/latest"
        ).payload.data.decode()
        _secret_key = client.access_secret_version(
            name=f"projects/{project}/secrets/{prefix}-secret-key/versions/latest"
        ).payload.data.decode()
    except Exception as e:
        logger.error(f"Failed to load API keys: {e}")
        sys.exit(1)


def sign_params(params):
    """Add ``timestamp`` and an HMAC-SHA256 ``signature`` to ``params``.

    The dict is modified in place and also returned. A ``signature`` left over
    from a previous call is removed first so that it is not included in the
    payload being signed.
    """
    params.pop("signature", None)
    params["timestamp"] = int(time.time() * 1000)
    query_string = urlencode(params)
    params["signature"] = hmac.new(
        _secret_key.encode(), query_string.encode(), hashlib.sha256
    ).hexdigest()
    return params


async def binance_request(session, method, path, params=None):
    """Send a signed request and return ``(json_body, http_status)``.

    ``method`` must be GET, POST or DELETE. Any failure (unsupported method,
    network error, timeout, non-JSON body) yields ``({"error": ...}, 500)``
    instead of raising, so callers can always unpack the result. A response
    that parses as JSON refreshes ``risk_state.last_api_success``.
    """
    if method not in ("GET", "POST", "DELETE"):
        logger.error(f"API request failed: unsupported method {method}")
        return {"error": f"unsupported method {method}"}, 500

    url = f"{BASE_URL}{path}"
    headers = {"X-MBX-APIKEY": _api_key}
    signed = sign_params({} if params is None else params)
    try:
        async with session.request(
            method,
            url,
            params=signed,
            headers=headers,
            timeout=aiohttp.ClientTimeout(total=REQUEST_TIMEOUT_SEC),
        ) as resp:
            data = await resp.json()
            risk_state.last_api_success = time.time()
            return data, resp.status
    except Exception as e:
        logger.error(f"API request failed: {e}")
        return {"error": str(e)}, 500


# ============ Risk checks ============
def check_daily_loss(conn):
    """Refresh today's realized R and the current losing streak from the DB.

    Updates ``risk_state.today_realized_r``, ``risk_state.consecutive_losses``
    and ``risk_state.last_closed_exit_ts``.

    Returns:
        (realized_r, consecutive): realized R since 00:00 UTC and the number
        of most recent closed trades in a row with negative ``pnl_r``, capped
        at CONSECUTIVE_LOSS_LIMIT by the query and not counting trades at or
        before ``risk_state.streak_baseline_exit_ts``.
    """
    # Start of the current UTC day, in epoch milliseconds.
    today_start = datetime.now(timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0)
    today_start_ms = int(today_start.timestamp() * 1000)

    with conn.cursor() as cur:
        cur.execute(
            """
            SELECT COALESCE(SUM(pnl_r), 0) as total_r,
                   COUNT(*) FILTER (WHERE pnl_r < 0) as loss_count
            FROM live_trades
            WHERE exit_ts >= %s AND status NOT IN ('active', 'tp1_hit')
            """,
            (today_start_ms,),
        )
        row = cur.fetchone()

        # Most recent closed trades, newest first, for the streak count.
        cur.execute(
            """
            SELECT pnl_r, exit_ts FROM live_trades
            WHERE status NOT IN ('active', 'tp1_hit')
            ORDER BY exit_ts DESC LIMIT %s
            """,
            (CONSECUTIVE_LOSS_LIMIT,),
        )
        recent = cur.fetchall()

    realized_r = float(row[0]) if row and row[0] else 0.0
    risk_state.today_realized_r = realized_r

    baseline = risk_state.streak_baseline_exit_ts
    consecutive = 0
    for pnl_r, exit_ts in recent:
        # Trades that already triggered (and served) a cooldown do not count
        # again; without this the breaker would re-trip right after resuming.
        if baseline is not None and exit_ts is not None and exit_ts <= baseline:
            break
        if pnl_r and pnl_r < 0:
            consecutive += 1
        else:
            break
    risk_state.consecutive_losses = consecutive
    risk_state.last_closed_exit_ts = next(
        (exit_ts for _, exit_ts in recent if exit_ts is not None), None
    )
    return realized_r, consecutive


async def check_unrealized_loss(session):
    """Return total unrealized PnL across all positions, expressed in R.

    A failed API call is treated as zero unrealized PnL. The result is also
    stored in ``risk_state.today_unrealized_r``.
    """
    data, status = await binance_request(session, "GET", "/fapi/v2/positionRisk")
    total_unrealized = 0
    if status == 200 and isinstance(data, list):
        total_unrealized = sum(float(pos.get("unRealizedProfit", 0)) for pos in data)
    # Convert USD to R (multiples of the per-trade risk).
    unrealized_r = total_unrealized / RISK_PER_TRADE_USD if RISK_PER_TRADE_USD > 0 else 0
    risk_state.today_unrealized_r = unrealized_r
    return unrealized_r


async def check_balance(session):
    """Return the available USDT balance, or 0 if it cannot be determined."""
    data, status = await binance_request(session, "GET", "/fapi/v2/balance")
    if status == 200 and isinstance(data, list):
        for asset in data:
            if asset.get("asset") == "USDT":
                return float(asset.get("availableBalance", 0))
    return 0


def check_data_freshness():
    """Return a list of human-readable freshness problems (empty when healthy).

    Only the time since the last successful API response is evaluated.
    MARKET_DATA_STALE_SEC / ACCOUNT_UPDATE_STALE_SEC are not applied because
    nothing in this file refreshes ``last_market_data`` / ``last_account_update``.
    """
    issues = []
    api_gap = time.time() - risk_state.last_api_success
    if api_gap > API_DISCONNECT_THRESHOLD_SEC:
        issues.append(f"API无响应{api_gap:.0f}秒")
    return issues


async def _close_symbol_position(session, symbol):
    """Cancel open orders on ``symbol`` and market-close any open position.

    Returns:
        ``None`` when the position lookup failed, otherwise a list of
        ``(close_side, quantity, ok)`` tuples, one per close order sent
        (empty when there was nothing to close).
    """
    await binance_request(session, "DELETE", "/fapi/v1/allOpenOrders", {"symbol": symbol})
    pos_data, pos_status = await binance_request(
        session, "GET", "/fapi/v2/positionRisk", {"symbol": symbol}
    )
    if not isinstance(pos_data, list):
        logger.error(f"[{symbol}] position lookup failed (status={pos_status}): {pos_data}")
        return None

    results = []
    for pos in pos_data:
        if pos.get("symbol", symbol) != symbol:
            continue
        raw_amt = str(pos.get("positionAmt", "0"))
        amt = float(raw_amt)
        if amt == 0:
            continue
        # The side is derived from the actual position sign, so the
        # reduce-only order always points in the closing direction.
        close_side = "SELL" if amt > 0 else "BUY"
        resp, order_status = await binance_request(session, "POST", "/fapi/v1/order", {
            "symbol": symbol,
            "side": close_side,
            "type": "MARKET",
            # Reuse the exchange's own decimal string; str(float) may produce
            # scientific notation for small quantities.
            "quantity": raw_amt.lstrip("+-"),
            "reduceOnly": "true",
        })
        ok = order_status == 200
        if not ok:
            logger.error(f"[{symbol}] close order rejected (status={order_status}): {resp}")
        results.append((close_side, abs(amt), ok))
    return results


async def check_hold_timeout(session, conn):
    """Warn about long-held trades and auto-close them after the grace window.

    * >= HOLD_TIMEOUT_YELLOW_MIN: informational log.
    * >= HOLD_TIMEOUT_RED_MIN: the trade enters ``risk_state.timeout_queue``;
      HOLD_TIMEOUT_GRACE_MIN minutes after it entered the queue its symbol's
      position is market-closed. Timing the window from queue entry means a
      trade first seen when it is already past the red threshold (e.g. after a
      restart of this process) still gets a full manual window and is still
      closed afterwards.

    A failed close is retried on the next cycle.
    """
    with conn.cursor() as cur:
        cur.execute(
            """
            SELECT id, symbol, direction, entry_ts, entry_price, risk_distance
            FROM live_trades
            WHERE status IN ('active', 'tp1_hit')
            """
        )
        rows = cur.fetchall()

    now_ms = int(time.time() * 1000)
    active_ids = set()
    for trade_id, symbol, _direction, entry_ts, _entry_price, _risk_distance in rows:
        active_ids.add(trade_id)
        if entry_ts is None:
            # Cannot compute a hold time; skip instead of aborting the scan.
            continue
        hold_min = (now_ms - entry_ts) / 60000

        if hold_min >= HOLD_TIMEOUT_RED_MIN:
            entry = risk_state.timeout_queue.get(trade_id)
            if entry is None:
                # Red threshold reached: queue the trade and start the countdown.
                risk_state.timeout_queue[trade_id] = {
                    "entered_queue_ts": time.time(),
                    "notified": False,
                    "symbol": symbol,
                    "closed": False,
                }
                remaining = HOLD_TIMEOUT_GRACE_MIN
                logger.warning(f"[{symbol}] 🔴 持仓{hold_min:.0f}分钟超时! 进入处置队列, {remaining}分钟后自动平仓")
                # TODO: Discord notification
                continue

            if not entry["notified"]:
                entry["notified"] = True
                # TODO: Discord urgent notification

            if entry.get("closed"):
                # Already flattened; waiting for the DB row to leave the active set.
                continue

            waited_min = (time.time() - entry["entered_queue_ts"]) / 60
            if waited_min < HOLD_TIMEOUT_GRACE_MIN:
                continue

            # Manual window expired: flatten at market.
            logger.warning(f"[{symbol}] ⏰ 持仓{hold_min:.0f}分钟,人工窗口到期,自动市价平仓!")
            results = await _close_symbol_position(session, symbol)
            if results is None:
                continue  # lookup failed, retry next cycle
            for _side, qty, ok in results:
                if ok:
                    logger.info(f"[{symbol}] 🔴 自动平仓完成 qty={qty}")
            if all(ok for _side, _qty, ok in results):
                entry["closed"] = True

        elif hold_min >= HOLD_TIMEOUT_YELLOW_MIN:
            logger.info(f"[{symbol}] 🟡 持仓{hold_min:.0f}分钟,接近超时")

    # Drop queue entries whose trade is no longer active so the queue cannot
    # grow without bound.
    for stale_id in [tid for tid in risk_state.timeout_queue if tid not in active_ids]:
        del risk_state.timeout_queue[stale_id]


# ============ Breaker actions ============
async def trigger_circuit_break(session, conn, reason: str, action: str = "block_new"):
    """Trip the circuit breaker and apply ``action``.

    Args:
        session: aiohttp session used for exchange calls.
        conn: database connection (not used by this function).
        reason: text stored in the state; ``check_auto_resume`` matches on it.
        action: ``"block_new"`` (forbid new entries), ``"reduce_only"``, or
            ``"close_all"`` (cancel orders and market-close every symbol in
            SYMBOLS, then stay halted). Any other value is treated like
            ``"block_new"``.

    The event is appended to ``risk_state.breaker_history`` and the state file
    is rewritten.
    """
    now = time.time()
    risk_state.status = "circuit_break"
    risk_state.circuit_break_reason = reason
    risk_state.circuit_break_time = now

    if action == "block_new":
        risk_state.block_new_entries = True
        logger.error(f"🔴 熔断触发: {reason} | 动作: 禁止新开仓")
    elif action == "reduce_only":
        risk_state.block_new_entries = True
        risk_state.reduce_only = True
        logger.error(f"🔴 熔断触发: {reason} | 动作: 只减仓模式")
    elif action == "close_all":
        risk_state.block_new_entries = True
        risk_state.reduce_only = True
        # A hard stop is not subject to auto-resume and is not re-triggered.
        risk_state.hard_halt = True
        logger.error(f"🔴🔴 熔断触发: {reason} | 动作: 全部平仓!")
        # Flatten every configured symbol.
        for symbol in SYMBOLS:
            results = await _close_symbol_position(session, symbol)
            for close_side, qty, ok in results or []:
                if ok:
                    logger.info(f"[{symbol}] 🔴 紧急平仓 {close_side} qty={qty}")
    else:
        # Fail safe: an unrecognised action must not leave entries open.
        risk_state.block_new_entries = True
        logger.error(f"Unknown circuit-break action {action!r}; blocking new entries")

    # Record history
    risk_state.breaker_history.append({
        "time": now,
        "reason": reason,
        "action": action,
    })

    # Publish the state for other processes.
    write_risk_state()


def write_risk_state():
    """Write the risk state to /tmp/risk_guard_state.json for other processes.

    The file is written to a temporary path and moved into place with
    ``os.replace`` so a reader never sees a partially written document.
    Failures are logged, not raised.
    """
    state_path = "/tmp/risk_guard_state.json"
    tmp_path = f"{state_path}.{os.getpid()}.tmp"
    try:
        with open(tmp_path, "w") as f:
            json.dump(risk_state.to_dict(), f)
        os.replace(tmp_path, state_path)
    except Exception as e:
        logger.error(f"写风控状态失败: {e}")


def _resume_trading():
    """Clear breaker state, return to normal and publish the state file."""
    risk_state.status = "normal"
    risk_state.block_new_entries = False
    risk_state.reduce_only = False
    risk_state.circuit_break_reason = None
    risk_state.auto_resume_time = None
    write_risk_state()


def check_auto_resume():
    """Lift the breaker when its cause has cleared.

    * Consecutive-loss breaker: resumes after CONSECUTIVE_LOSS_COOLDOWN_MIN
      minutes and moves the streak baseline forward so the same losing trades
      do not trip the breaker again.
    * API breaker: resumes once an API call succeeded within the last 10 s.

    A "close_all" halt is never resumed here.
    """
    if risk_state.status != "circuit_break" or risk_state.hard_halt:
        return
    reason = risk_state.circuit_break_reason
    if not reason:
        return

    now = time.time()

    # Consecutive-loss cooldown elapsed.
    if "连续亏损" in reason and risk_state.circuit_break_time:
        elapsed_min = (now - risk_state.circuit_break_time) / 60
        if elapsed_min >= CONSECUTIVE_LOSS_COOLDOWN_MIN:
            logger.info(f"✅ 连续亏损冷却期结束({CONSECUTIVE_LOSS_COOLDOWN_MIN}分钟),自动恢复交易")
            risk_state.streak_baseline_exit_ts = risk_state.last_closed_exit_ts
            risk_state.consecutive_losses = 0
            _resume_trading()
            return

    # API connectivity restored.
    if "API" in reason:
        api_gap = now - risk_state.last_api_success
        if api_gap < 10:
            logger.info("✅ API连接恢复,自动恢复交易")
            _resume_trading()


def _connect_db():
    """Open a DB connection in autocommit mode.

    Autocommit keeps the polling queries from leaving the session idle inside
    a transaction and from poisoning later queries after one failure.
    """
    conn = psycopg2.connect(**DB_CONFIG)
    conn.autocommit = True
    return conn


def _ensure_db_connection(conn):
    """Return a usable connection, reconnecting if ``conn`` is closed or broken."""
    if not conn.closed:
        return conn
    try:
        logger.warning("Database connection lost, reconnecting")
        return _connect_db()
    except psycopg2.Error as e:
        logger.error(f"Database reconnect failed: {e}")
        return conn


# ============ Main loop ============
async def main():
    """Run the risk-check loop until interrupted."""
    logger.info("=" * 60)
    logger.info(f"🛡 Risk Guard 启动 | 环境={TRADE_ENV}")
    logger.info(f" 日限={DAILY_LOSS_LIMIT_R}R | 连亏限={CONSECUTIVE_LOSS_LIMIT}次 | 冷却={CONSECUTIVE_LOSS_COOLDOWN_MIN}分钟")
    logger.info(f" 超时: {HOLD_TIMEOUT_YELLOW_MIN}min黄/{HOLD_TIMEOUT_RED_MIN}min红/{HOLD_TIMEOUT_AUTO_CLOSE_MIN}min自动平")
    logger.info("=" * 60)

    load_api_keys()
    conn = _connect_db()

    # Publish the initial state.
    write_risk_state()

    last_status_log = 0.0
    try:
        async with aiohttp.ClientSession() as session:
            while True:
                try:
                    # 0. Auto-resume
                    check_auto_resume()

                    # 1. Daily loss (realized + unrealized loss only)
                    realized_r, consecutive = check_daily_loss(conn)
                    unrealized_r = await check_unrealized_loss(session)
                    total_r = realized_r + min(unrealized_r, 0)

                    # Guarded by hard_halt rather than by status, so a softer
                    # breaker that is already active cannot mask the hard stop.
                    if total_r <= DAILY_LOSS_LIMIT_R and not risk_state.hard_halt:
                        await trigger_circuit_break(
                            session, conn,
                            f"今日亏损{total_r:.2f}R,超过日限{DAILY_LOSS_LIMIT_R}R",
                            "close_all"
                        )

                    # 2. Consecutive losses
                    if (consecutive >= CONSECUTIVE_LOSS_LIMIT
                            and risk_state.status != "circuit_break"):
                        await trigger_circuit_break(
                            session, conn,
                            f"连续亏损{consecutive}次,超过限制{CONSECUTIVE_LOSS_LIMIT}次",
                            "block_new"
                        )
                        risk_state.auto_resume_time = time.time() + CONSECUTIVE_LOSS_COOLDOWN_MIN * 60

                    # 3. API connectivity
                    freshness_issues = check_data_freshness()
                    if freshness_issues and risk_state.status != "circuit_break":
                        await trigger_circuit_break(
                            session, conn,
                            f"数据异常: {'; '.join(freshness_issues)}",
                            "block_new"
                        )

                    # 4. Balance. Re-evaluated every cycle so the block is
                    # lifted again once the balance is sufficient.
                    balance = await check_balance(session)
                    if balance < RISK_PER_TRADE_USD * MIN_BALANCE_MULTIPLE:
                        if not risk_state.low_balance_block:
                            risk_state.low_balance_block = True
                            logger.warning(f"🟡 余额不足: ${balance:.2f} < ${RISK_PER_TRADE_USD * MIN_BALANCE_MULTIPLE:.2f},暂停开仓")
                    elif risk_state.low_balance_block:
                        risk_state.low_balance_block = False
                        logger.info(f"Balance recovered (${balance:.2f}); low-balance entry block lifted")

                    # 5. Hold timeout
                    await check_hold_timeout(session, conn)

                    # 6. Publish state
                    write_risk_state()

                    # Periodic status line
                    now = time.time()
                    if now - last_status_log >= STATUS_LOG_INTERVAL_SEC:
                        last_status_log = now
                        logger.info(
                            f"📊 风控状态: {risk_state.status} | "
                            f"已实现={realized_r:+.2f}R | 未实现={unrealized_r:+.2f}R | "
                            f"合计={total_r:+.2f}R | 连亏={consecutive} | "
                            f"余额=${balance:.2f}"
                        )

                    await asyncio.sleep(CHECK_INTERVAL)

                except KeyboardInterrupt:
                    break
                except Exception as e:
                    logger.error(f"❌ 风控检查异常: {e}", exc_info=True)
                    conn = _ensure_db_connection(conn)
                    await asyncio.sleep(10)
    finally:
        # Runs on normal exit, on errors and on task cancellation.
        if not conn.closed:
            conn.close()
        logger.info("Risk Guard 已停止")


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        pass