arbitrage-engine/backend/risk_guard.py
root 638589852b fix: P0审阅修复 + P1/P2增强
P0-1: SL挂单失败→重试2次→3次失败紧急市价平仓+写event
P0-2: TP1检测改用DB qty字段(新增)比对仓位减少,不再用orderId
P0-3: emergency-close/block-new/resume/config PUT加admin权限验证
P0-5: risk_guard全平qty按币种精度格式化(BTC:3/ETH:3/XRP:0/SOL:2)

P1-3: NOTIFY收到后立即处理跳过sleep,减少信号延迟
P2-1: 三个进程加DB连接断线重连(ensure_db_conn)

DB: live_trades新增qty字段
2026-03-02 13:56:36 +00:00

570 lines
21 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Risk Guard - 风控熔断模块
实时监控风险指标,触发熔断时自动执行保护动作
熔断规则:
1. 单日亏损超限(-5R) → 全平+停机
2. 连续亏损(5连亏) → 暂停开仓1小时
3. API连接异常(>30秒) → 暂停开仓
4. 余额不足(< 风险×2) → 拒绝开仓
5. 数据新鲜度超时 → 禁止新开仓
"""
import os
import sys
import json
import time
from pathlib import Path
try:
from dotenv import load_dotenv; load_dotenv(Path(__file__).parent / ".env")
except ImportError:
pass
import logging
import asyncio
import hashlib
import hmac
from urllib.parse import urlencode
from datetime import datetime, timezone
import psycopg2
import aiohttp
# ============ 配置 ============
# Exchange environment: "testnet" (default) or "production". Any other value
# makes the endpoint lookup below raise KeyError at import time.
TRADE_ENV = os.getenv("TRADE_ENV", "testnet")
BINANCE_ENDPOINTS = dict(
    testnet="https://testnet.binancefuture.com",
    production="https://fapi.binance.com",
)
BASE_URL = BINANCE_ENDPOINTS[TRADE_ENV]

# PostgreSQL connection settings, each overridable via the environment.
# NOTE(review): a default password is committed in source here; consider
# requiring DB_PASSWORD to come from the environment / secret store.
DB_CONFIG = dict(
    host=os.getenv("DB_HOST", "10.106.0.3"),
    port=int(os.getenv("DB_PORT", "5432")),
    dbname=os.getenv("DB_NAME", "arb_engine"),
    user=os.getenv("DB_USER", "arb"),
    password=os.getenv("DB_PASSWORD", "arb_engine_2026"),
)

# Risk parameters (R = RISK_PER_TRADE_USD of PnL).
DAILY_LOSS_LIMIT_R = float(os.getenv("DAILY_LOSS_LIMIT_R", "-5"))
CONSECUTIVE_LOSS_LIMIT = int(os.getenv("CONSECUTIVE_LOSS_LIMIT", "5"))
CONSECUTIVE_LOSS_COOLDOWN_MIN = int(os.getenv("CONSECUTIVE_LOSS_COOLDOWN_MIN", "60"))
API_DISCONNECT_THRESHOLD_SEC = int(os.getenv("API_DISCONNECT_THRESHOLD_SEC", "30"))
MIN_BALANCE_MULTIPLE = float(os.getenv("MIN_BALANCE_MULTIPLE", "2"))
RISK_PER_TRADE_USD = float(os.getenv("RISK_PER_TRADE_USD", "2"))

# Hold-timeout ladder, in minutes since entry.
HOLD_TIMEOUT_YELLOW_MIN = 45
HOLD_TIMEOUT_RED_MIN = 60
HOLD_TIMEOUT_GRACE_MIN = 10  # manual-intervention window after the red mark
HOLD_TIMEOUT_AUTO_CLOSE_MIN = HOLD_TIMEOUT_RED_MIN + HOLD_TIMEOUT_GRACE_MIN  # 70

# Data-freshness thresholds, in seconds.
MARKET_DATA_STALE_SEC = 10
ACCOUNT_UPDATE_STALE_SEC = 20

CHECK_INTERVAL = 5  # seconds between risk-check passes

# Quantity decimals per traded symbol; the key order also defines SYMBOLS.
SYMBOL_QTY_PRECISION = {
    "BTCUSDT": 3,
    "ETHUSDT": 3,
    "XRPUSDT": 0,
    "SOLUSDT": 2,
}
SYMBOLS = list(SYMBOL_QTY_PRECISION)

logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger("risk-guard")
# ============ 状态 ============
class RiskState:
    """In-memory risk-control state; ``to_dict`` produces the published snapshot."""

    def __init__(self):
        # Overall status: "normal" / "warning" / "circuit_break".
        self.status = "normal"
        # Trading restrictions currently in force.
        self.block_new_entries = False
        self.reduce_only = False
        self.manual_override = False
        # Details of the active circuit break (times are epoch seconds).
        self.circuit_break_reason = None
        self.circuit_break_time = None
        self.auto_resume_time = None
        # Heartbeat timestamps (epoch seconds).
        self.last_api_success = time.time()
        self.last_market_data = time.time()
        self.last_account_update = time.time()
        # Loss metrics and breaker log.
        self.consecutive_losses = 0
        self.today_realized_r = 0.0
        self.today_unrealized_r = 0.0
        self.breaker_history = []
        # Hold-timeout disposal queue:
        # {trade_id: {"entered_queue_ts": ..., "notified": bool, ...}}
        self.timeout_queue = {}

    def to_dict(self):
        """Return a JSON-serialisable snapshot of the state.

        ``today_total_r`` adds unrealized PnL only when it is negative.
        """
        realized = self.today_realized_r
        unrealized = self.today_unrealized_r
        snapshot = {
            "status": self.status,
            "block_new_entries": self.block_new_entries,
            "reduce_only": self.reduce_only,
            "circuit_break_reason": self.circuit_break_reason,
            "circuit_break_time": self.circuit_break_time,
            "auto_resume_time": self.auto_resume_time,
            "consecutive_losses": self.consecutive_losses,
            "today_realized_r": round(realized, 2),
            "today_unrealized_r": round(unrealized, 2),
            "today_total_r": round(realized + min(unrealized, 0), 2),
        }
        return snapshot


risk_state = RiskState()
# ============ API Key ============
# Binance credentials; populated by load_api_keys() before signed requests.
_api_key = None
_secret_key = None


def load_api_keys():
    """Populate the module-level Binance API key and secret.

    BINANCE_API_KEY / BINANCE_SECRET_KEY from the environment are used when
    both are set. Otherwise both values are read from Google Secret Manager,
    using a secret-name prefix chosen from TRADE_ENV. Any failure in that
    fallback is logged and the process exits with status 1.
    """
    global _api_key, _secret_key
    _api_key, _secret_key = os.getenv("BINANCE_API_KEY"), os.getenv("BINANCE_SECRET_KEY")
    if not (_api_key and _secret_key):
        try:
            from google.cloud import secretmanager

            sm_client = secretmanager.SecretManagerServiceClient()
            gcp_project = os.getenv("GCP_PROJECT", "gen-lang-client-0835616737")
            secret_prefix = "binance-live" if TRADE_ENV != "testnet" else "binance-testnet"

            def _latest(suffix):
                name = f"projects/{gcp_project}/secrets/{secret_prefix}-{suffix}/versions/latest"
                return sm_client.access_secret_version(name=name).payload.data.decode()

            _api_key = _latest("api-key")
            _secret_key = _latest("secret-key")
        except Exception as e:
            logger.error(f"Failed to load API keys: {e}")
            sys.exit(1)
def sign_params(params):
    """Add a millisecond ``timestamp`` and an HMAC-SHA256 ``signature`` to *params*.

    The dict is modified in place and also returned. The signature covers
    the URL-encoded parameters, including the timestamp, and is keyed with
    the module-level secret.
    """
    params["timestamp"] = int(time.time() * 1000)
    payload = urlencode(params).encode()
    digest = hmac.new(_secret_key.encode(), payload, hashlib.sha256)
    params["signature"] = digest.hexdigest()
    return params
async def binance_request(session, method, path, params=None):
    """Send a signed request to the Binance futures REST API.

    Args:
        session: aiohttp client session used for the call.
        method: HTTP method name ("GET", "POST", "DELETE", ...).
        path: API path appended to BASE_URL.
        params: query parameters. They are copied before signing, so the
            caller's dict is left untouched.

    Returns:
        ``(data, status)``: the decoded JSON body and HTTP status code, or
        ``({"error": message}, 500)`` if the request or JSON decoding raised.

    Side effect: ``risk_state.last_api_success`` is refreshed whenever a JSON
    body is received, whatever its HTTP status.
    """
    url = f"{BASE_URL}{path}"
    headers = {"X-MBX-APIKEY": _api_key}
    # sign_params() adds timestamp/signature in place; signing a copy keeps a
    # reused caller dict from carrying a stale signature into the next request.
    signed = sign_params(dict(params or {}))
    try:
        # One code path for every method (the old GET/POST/DELETE chain
        # returned None for anything else, breaking callers' tuple unpacking).
        async with session.request(method, url, params=signed, headers=headers) as resp:
            data = await resp.json()
            risk_state.last_api_success = time.time()
            return data, resp.status
    except Exception as e:
        logger.error(f"API request failed: {e}")
        return {"error": str(e)}, 500
# ============ 风控检查 ============
def check_daily_loss(conn):
    """Compute today's realized R and the current consecutive-loss streak.

    Both results are also stored on ``risk_state`` (``today_realized_r`` and
    ``consecutive_losses``).

    Args:
        conn: psycopg2 connection with read access to ``live_trades``.

    Returns:
        ``(realized_r, consecutive)``.
        ``realized_r`` is the sum of ``pnl_r`` over trades not in an open status
        ('active', 'tp1_hit') whose ``exit_ts`` is at or after 00:00 UTC today.
        ``consecutive`` is the number of most-recent closed trades that lost,
        counted back until the first non-losing one and capped at
        CONSECUTIVE_LOSS_LIMIT.
    """
    today_start = datetime.now(timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0)
    today_start_ms = int(today_start.timestamp() * 1000)
    with conn.cursor() as cur:
        cur.execute(
            """
            SELECT COALESCE(SUM(pnl_r), 0)
            FROM live_trades
            WHERE exit_ts >= %s AND status NOT IN ('active', 'tp1_hit')
            """,
            (today_start_ms,),
        )
        row = cur.fetchone()
        # NULLS LAST: PostgreSQL sorts NULLs first under DESC. A closed row
        # without exit_ts would otherwise always count as the newest trade and
        # cut the streak short.
        cur.execute(
            """
            SELECT pnl_r FROM live_trades
            WHERE status NOT IN ('active', 'tp1_hit')
            ORDER BY exit_ts DESC NULLS LAST
            LIMIT %s
            """,
            (CONSECUTIVE_LOSS_LIMIT,),
        )
        recent = [r[0] for r in cur.fetchall()]
    realized_r = float(row[0]) if row[0] else 0
    risk_state.today_realized_r = realized_r

    consecutive = 0
    for pnl in recent:
        # NULL or non-negative pnl ends the streak.
        if pnl is None or pnl >= 0:
            break
        consecutive += 1
    risk_state.consecutive_losses = consecutive
    return realized_r, consecutive
async def check_unrealized_loss(session):
    """Sum unrealized PnL across all futures positions, expressed in R.

    One R equals RISK_PER_TRADE_USD; a non-positive RISK_PER_TRADE_USD yields
    0. The value is also stored on ``risk_state.today_unrealized_r``. If the
    positionRisk call fails or returns a non-list, the total is taken as 0
    (fail-open: the previous value is not kept).
    """
    positions, http_status = await binance_request(session, "GET", "/fapi/v2/positionRisk")
    usd_pnl = 0
    if http_status == 200 and isinstance(positions, list):
        usd_pnl = sum(float(pos.get("unRealizedProfit", 0)) for pos in positions)
    if RISK_PER_TRADE_USD > 0:
        pnl_in_r = usd_pnl / RISK_PER_TRADE_USD
    else:
        pnl_in_r = 0
    risk_state.today_unrealized_r = pnl_in_r
    return pnl_in_r
async def check_balance(session):
    """Return the available USDT balance of the futures account.

    Returns 0 when the request fails, the payload is not a list, or no USDT
    entry is present. main() treats that as insufficient balance.
    """
    assets, http_status = await binance_request(session, "GET", "/fapi/v2/balance")
    if http_status != 200 or not isinstance(assets, list):
        return 0
    usdt = next((a for a in assets if a.get("asset") == "USDT"), None)
    if usdt is None:
        return 0
    return float(usdt.get("availableBalance", 0))
def check_data_freshness():
    """Return a list of data-freshness problems (empty when all is fresh).

    Only the API heartbeat is evaluated. An issue is reported when more than
    API_DISCONNECT_THRESHOLD_SEC seconds have passed since the last
    successful API response.
    """
    silence = time.time() - risk_state.last_api_success
    return [f"API无响应{silence:.0f}"] if silence > API_DISCONNECT_THRESHOLD_SEC else []
async def _flatten_symbol_at_market(session, symbol):
    """Cancel open orders on *symbol* and close its position with a reduce-only market order.

    The close side follows the sign of ``positionAmt`` reported by the
    exchange. The quantity is formatted with SYMBOL_QTY_PRECISION, as in the
    circuit-breaker close-all path.

    Returns:
        True if every non-zero position was closed successfully (or none
        existed), False if the position query or a close order failed.
    """
    await binance_request(session, "DELETE", "/fapi/v1/allOpenOrders", {"symbol": symbol})
    pos_data, _ = await binance_request(session, "GET", "/fapi/v2/positionRisk", {"symbol": symbol})
    if not isinstance(pos_data, list):
        logger.error(f"[{symbol}] ❌ 查询仓位失败,无法自动平仓: {pos_data}")
        return False
    qty_prec = SYMBOL_QTY_PRECISION.get(symbol, 3)
    ok = True
    for p in pos_data:
        if p.get("symbol") != symbol:
            continue
        amt = float(p.get("positionAmt", 0))
        if amt == 0:
            continue
        close_side = "SELL" if amt > 0 else "BUY"
        qty_str = f"{abs(amt):.{qty_prec}f}"
        resp, status = await binance_request(session, "POST", "/fapi/v1/order", {
            "symbol": symbol, "side": close_side, "type": "MARKET",
            "quantity": qty_str, "reduceOnly": "true",
        })
        if status == 200:
            logger.info(f"[{symbol}] 🔴 自动平仓完成 qty={qty_str}")
        else:
            ok = False
            logger.error(f"[{symbol}] ❌ 自动平仓失败 {close_side} qty={qty_str}: {resp}")
    return ok


async def check_hold_timeout(session, conn):
    """Escalate open trades by hold time and manage the disposal queue.

    Ladder, in minutes since ``entry_ts`` (ms):
      * >= HOLD_TIMEOUT_YELLOW_MIN: info log only.
      * >= HOLD_TIMEOUT_RED_MIN: the trade enters ``risk_state.timeout_queue``,
        which opens the manual-intervention window.
      * >= HOLD_TIMEOUT_AUTO_CLOSE_MIN: open orders on the symbol are
        cancelled and the symbol's whole position is market-closed once. A
        failed close is retried on the next pass.

    Queue entries for trades that are no longer open are dropped.

    Args:
        session: aiohttp session passed to ``binance_request``.
        conn: psycopg2 connection used to read open trades from ``live_trades``.
    """
    with conn.cursor() as cur:
        cur.execute("""
            SELECT id, symbol, direction, entry_ts, entry_price, risk_distance
            FROM live_trades
            WHERE status IN ('active', 'tp1_hit')
        """)
        rows = cur.fetchall()
    now_ms = int(time.time() * 1000)
    queue = risk_state.timeout_queue
    open_ids = set()
    for trade_id, symbol, _direction, entry_ts, _entry_price, _rd in rows:
        open_ids.add(trade_id)
        if entry_ts is None:
            continue  # hold time cannot be computed
        hold_min = (now_ms - entry_ts) / 60000
        if hold_min >= HOLD_TIMEOUT_AUTO_CLOSE_MIN:
            entry = queue.get(trade_id)
            if entry is None:
                # Never queued at the red mark (e.g. the process restarted
                # after minute 70): still enforce the auto-close rule instead
                # of leaving the trade open forever.
                entry = queue[trade_id] = {
                    "entered_queue_ts": time.time(),
                    "notified": False,
                    "symbol": symbol,
                }
            if entry.get("auto_closed"):
                continue  # already flattened; wait for the trade row to close
            logger.warning(f"[{symbol}] ⏰ 持仓{hold_min:.0f}分钟,人工窗口到期,自动市价平仓!")
            if await _flatten_symbol_at_market(session, symbol):
                entry["auto_closed"] = True
        elif hold_min >= HOLD_TIMEOUT_RED_MIN:
            # Red mark: enqueue and start the HOLD_TIMEOUT_GRACE_MIN countdown.
            if trade_id not in queue:
                queue[trade_id] = {
                    "entered_queue_ts": time.time(),
                    "notified": False,
                    "symbol": symbol,
                }
                logger.warning(f"[{symbol}] 🔴 持仓{hold_min:.0f}分钟超时! 进入处置队列, {HOLD_TIMEOUT_GRACE_MIN}分钟后自动平仓")
                # TODO: Discord notification to the owner
            elif not queue[trade_id]["notified"]:
                queue[trade_id]["notified"] = True
                # TODO: urgent Discord notification
        elif hold_min >= HOLD_TIMEOUT_YELLOW_MIN:
            logger.info(f"[{symbol}] 🟡 持仓{hold_min:.0f}分钟,接近超时")
    # Trades closed before auto-close would otherwise stay queued forever.
    for stale_id in [tid for tid in queue if tid not in open_ids]:
        del queue[stale_id]
# ============ 熔断动作 ============
async def _emergency_close_all(session):
    """Cancel open orders and market-close every position in SYMBOLS.

    Each symbol is handled independently, so an error on one does not stop
    the others from being flattened.

    Returns:
        Symbols whose close could not be confirmed: the position query
        failed, a close order was rejected, or an exception was raised.
    """
    failed = []
    for symbol in SYMBOLS:
        try:
            await binance_request(session, "DELETE", "/fapi/v1/allOpenOrders", {"symbol": symbol})
            pos_data, _ = await binance_request(session, "GET", "/fapi/v2/positionRisk", {"symbol": symbol})
            if not isinstance(pos_data, list):
                logger.error(f"[{symbol}] ❌ 查询仓位失败: {pos_data}")
                failed.append(symbol)
                continue
            qty_prec = SYMBOL_QTY_PRECISION.get(symbol, 3)
            for p in pos_data:
                amt = float(p.get("positionAmt", 0))
                if amt == 0:
                    continue
                close_side = "SELL" if amt > 0 else "BUY"
                qty_str = f"{abs(amt):.{qty_prec}f}"
                resp, status = await binance_request(session, "POST", "/fapi/v1/order", {
                    "symbol": symbol, "side": close_side, "type": "MARKET",
                    "quantity": qty_str, "reduceOnly": "true",
                })
                if status == 200:
                    logger.info(f"[{symbol}] 🔴 紧急平仓 {close_side} qty={qty_str}")
                else:
                    logger.error(f"[{symbol}] ❌ 紧急平仓失败 {close_side} qty={qty_str}: {resp}")
                    failed.append(symbol)
        except Exception as e:
            logger.error(f"[{symbol}] ❌ 紧急平仓异常: {e}", exc_info=True)
            failed.append(symbol)
    return failed


async def trigger_circuit_break(session, conn, reason: str, action: str = "block_new"):
    """Enter circuit-break mode and apply the requested protective action.

    Args:
        session: aiohttp session for exchange calls (used by "close_all").
        conn: DB connection used to write ``live_events`` rows.
        reason: human-readable cause, stored on ``risk_state``.
        action: one of:
            "block_new": no new entries.
            "reduce_only": block new entries and set the reduce-only flag.
            "close_all": reduce-only, then cancel orders and market-close
            every SYMBOLS position.
            Any other value only sets the circuit-break status.

    Failed closes are logged and recorded as a critical ``live_events`` row;
    they are not retried here.
    """
    now = time.time()
    risk_state.status = "circuit_break"
    risk_state.circuit_break_reason = reason
    risk_state.circuit_break_time = now
    failed = []
    if action == "block_new":
        risk_state.block_new_entries = True
        logger.error(f"🔴 熔断触发: {reason} | 动作: 禁止新开仓")
    elif action == "reduce_only":
        risk_state.block_new_entries = True
        risk_state.reduce_only = True
        logger.error(f"🔴 熔断触发: {reason} | 动作: 只减仓模式")
    elif action == "close_all":
        risk_state.block_new_entries = True
        risk_state.reduce_only = True
        logger.error(f"🔴🔴 熔断触发: {reason} | 动作: 全部平仓!")
        # Publish the block before the slow, sequential close loop so other
        # processes stop opening positions immediately.
        write_risk_state()
        failed = await _emergency_close_all(session)
    risk_state.breaker_history.append({
        "time": now,
        "reason": reason,
        "action": action,
    })
    # State file for other processes to read.
    write_risk_state()
    _log_event(conn, "critical", "risk", f"🔴 熔断: {reason} | 动作: {action}", detail={"reason": reason, "action": action})
    if failed:
        _log_event(
            conn, "critical", "risk",
            f"❌ 紧急平仓未完成: {', '.join(failed)}",
            detail={"reason": reason, "failed_symbols": failed},
        )
def write_risk_state(state_path="/tmp/risk_guard_state.json"):
    """Publish ``risk_state.to_dict()`` as JSON for other processes (e.g. live_executor).

    The snapshot is written to a temporary file next to *state_path* and then
    moved into place with ``os.replace``. Readers therefore never observe a
    truncated or half-written file, which a plain ``open(path, "w")`` would
    expose between truncation and write. Errors are logged, not raised.

    Args:
        state_path: destination file; defaults to the path other processes read.
    """
    tmp_path = f"{state_path}.{os.getpid()}.tmp"
    try:
        with open(tmp_path, "w") as f:
            json.dump(risk_state.to_dict(), f)
        os.replace(tmp_path, state_path)
    except Exception as e:
        logger.error(f"写风控状态失败: {e}")
        try:
            os.remove(tmp_path)
        except OSError:
            pass  # temp file may never have been created
def _log_event(conn, level, category, message, symbol=None, detail=None):
    """Insert one row into ``live_events`` (best effort).

    Args:
        conn: psycopg2 connection.
        level, category, message, symbol: column values.
        detail: optional dict stored as JSON text; falsy values store NULL.

    On failure, a warning is logged and the transaction is rolled back, so a
    failed INSERT cannot leave the shared connection in an aborted-transaction
    state for later queries. The exception is not propagated.
    """
    try:
        with conn.cursor() as cur:
            cur.execute(
                "INSERT INTO live_events (level, category, symbol, message, detail) VALUES (%s, %s, %s, %s, %s)",
                (level, category, symbol, message, json.dumps(detail) if detail else None),
            )
        conn.commit()
    except Exception as e:
        logger.warning(f"写live_events失败: {e}")
        try:
            conn.rollback()
        except Exception:
            pass  # connection is likely dead; ensure_db_conn() will reconnect
def _clear_circuit_break():
    """Reset all breaker fields to normal trading and publish the state file."""
    risk_state.status = "normal"
    risk_state.block_new_entries = False
    risk_state.reduce_only = False
    risk_state.circuit_break_reason = None
    risk_state.circuit_break_time = None
    risk_state.auto_resume_time = None
    write_risk_state()


def check_auto_resume():
    """Lift an active circuit break whose automatic-recovery condition is met.

    Only status "circuit_break" is considered, and only breakers raised by
    main() itself:
      * the consecutive-loss breaker (reason starts with "连续亏损") once
        CONSECUTIVE_LOSS_COOLDOWN_MIN minutes have passed since
        circuit_break_time;
      * the data-freshness breaker (reason starts with "数据异常" and mentions
        "API") once the last successful API call is under 10 s old.

    Prefix matching matters: manual reasons embed the operator name from the
    front end, and a plain substring test let e.g. user "API-ops" auto-resume
    a manual close-all halt. Any other breaker (daily-loss close_all, manual
    commands) needs a manual resume.

    NOTE(review): check_daily_loss() recomputes the loss streak from
    live_trades each pass, so after a cooldown resume main() re-triggers the
    consecutive-loss breaker on its next pass unless a non-losing trade has
    closed meanwhile. Confirm this is intended.
    """
    if risk_state.status != "circuit_break":
        return
    reason = risk_state.circuit_break_reason or ""
    now = time.time()

    if reason.startswith("连续亏损") and risk_state.circuit_break_time:
        elapsed_min = (now - risk_state.circuit_break_time) / 60
        if elapsed_min >= CONSECUTIVE_LOSS_COOLDOWN_MIN:
            logger.info(f"✅ 连续亏损冷却期结束({CONSECUTIVE_LOSS_COOLDOWN_MIN}分钟),自动恢复交易")
            _clear_circuit_break()
        return

    if reason.startswith("数据异常") and "API" in reason:
        api_gap = now - risk_state.last_api_success
        if api_gap < 10:  # API has answered within the last 10 s
            logger.info("✅ API连接恢复自动恢复交易")
            _clear_circuit_break()
# ============ 前端紧急指令处理 ============
EMERGENCY_FILE = "/tmp/risk_guard_emergency.json"


async def check_emergency_commands(session, conn):
    """Consume and execute a one-shot command dropped by the front end.

    The command file holds a JSON object such as
    ``{"action": "close_all" | "block_new" | "resume", "user": ...}``. It is
    deleted before the command runs, so each command executes at most once.
    Read or parse errors leave the file in place to be retried next pass (it
    may still be being written) and are logged rather than silently ignored.
    """
    try:
        with open(EMERGENCY_FILE) as f:
            cmd = json.load(f)
    except FileNotFoundError:
        return  # no pending command
    except (OSError, ValueError) as e:
        logger.warning(f"⚠ 紧急指令文件读取失败,下轮重试: {e}")
        return
    try:
        os.remove(EMERGENCY_FILE)
    except OSError as e:
        # An undeletable file would re-run the same command every pass.
        logger.error(f"❌ 紧急指令文件删除失败,本次不执行: {e}")
        return
    if not isinstance(cmd, dict):
        logger.warning(f"⚠ 紧急指令格式错误: {cmd!r}")
        return

    action = cmd.get("action")
    user = cmd.get("user", "unknown")
    logger.info(f"📩 收到紧急指令: {action} (操作人: {user})")
    if action == "close_all":
        await trigger_circuit_break(session, conn, f"人工紧急全平 (操作人: {user})", "close_all")
    elif action == "block_new":
        risk_state.block_new_entries = True
        risk_state.status = "warning"
        risk_state.circuit_break_reason = f"人工禁止新开仓 (操作人: {user})"
        write_risk_state()
        logger.warning(f"🟡 人工禁止新开仓 (操作人: {user})")
        _log_event(conn, "warn", "risk", f"🟡 人工禁止新开仓 (操作人: {user})")
    elif action == "resume":
        risk_state.status = "normal"
        risk_state.block_new_entries = False
        risk_state.reduce_only = False
        risk_state.circuit_break_reason = None
        risk_state.circuit_break_time = None
        write_risk_state()
        logger.info(f"✅ 人工恢复交易 (操作人: {user})")
        _log_event(conn, "info", "risk", f"✅ 人工恢复交易 (操作人: {user})")
    else:
        logger.warning(f"⚠ 未知紧急指令: {action}")
# ============ 主循环 ============
def ensure_db_conn(conn):
    """Return a healthy autocommit connection, reconnecting if *conn* is unusable.

    The connection is switched to autocommit. Every statement in this module
    is a standalone read or a single INSERT. Without autocommit, psycopg2
    leaves the session "idle in transaction" between loop passes. That holds
    locks on ``live_trades`` (blocking e.g. ALTER TABLE), and one failed
    statement turns every later query into "current transaction is aborted"
    until reconnect.

    Raises:
        psycopg2 errors from ``psycopg2.connect`` if reconnecting fails.
    """
    try:
        if not conn.closed:
            if not conn.autocommit:
                conn.rollback()  # autocommit cannot be toggled inside a transaction
                conn.autocommit = True
            with conn.cursor() as cur:
                cur.execute("SELECT 1")
            return conn
    except Exception:
        pass
    logger.warning("⚠️ DB连接断开重连中...")
    try:
        conn.close()
    except Exception:
        pass
    new_conn = psycopg2.connect(**DB_CONFIG)
    new_conn.autocommit = True
    return new_conn
async def main():
    """Run the risk-guard loop, evaluating breaker rules every CHECK_INTERVAL seconds.

    Each pass does the following:
      * (re)connect the DB;
      * apply auto-resume rules and front-end emergency commands;
      * check daily loss, consecutive losses, API freshness, balance and hold
        timeouts;
      * publish the state file.

    Errors inside a pass are logged and the loop retries after 10 s. The DB
    connection is closed when the loop exits, including on cancellation.
    """
    logger.info("=" * 60)
    logger.info(f"🛡 Risk Guard 启动 | 环境={TRADE_ENV}")
    logger.info(f" 日限={DAILY_LOSS_LIMIT_R}R | 连亏限={CONSECUTIVE_LOSS_LIMIT}次 | 冷却={CONSECUTIVE_LOSS_COOLDOWN_MIN}分钟")
    logger.info(f" 超时: {HOLD_TIMEOUT_YELLOW_MIN}min黄/{HOLD_TIMEOUT_RED_MIN}min红/{HOLD_TIMEOUT_AUTO_CLOSE_MIN}min自动平")
    logger.info("=" * 60)
    load_api_keys()
    conn = psycopg2.connect(**DB_CONFIG)
    # Publish an initial state file.
    write_risk_state()

    min_balance = RISK_PER_TRADE_USD * MIN_BALANCE_MULTIPLE
    # True while block_new_entries was set by the balance rule, so the block
    # can be lifted again once the balance recovers (a transient API failure
    # makes check_balance() return 0).
    balance_blocked = False
    # aiohttp's default total timeout is 5 minutes; a hung request would stall
    # every risk check for that long.
    http_timeout = aiohttp.ClientTimeout(total=10)
    try:
        async with aiohttp.ClientSession(timeout=http_timeout) as session:
            while True:
                try:
                    conn = ensure_db_conn(conn)
                    # 0. Automatic recovery
                    check_auto_resume()
                    # 0.5 Front-end emergency commands
                    await check_emergency_commands(session, conn)

                    # 1. Daily loss (realized + negative unrealized)
                    realized_r, consecutive = check_daily_loss(conn)
                    unrealized_r = await check_unrealized_loss(session)
                    total_r = realized_r + min(unrealized_r, 0)
                    # Escalate to close_all even while a softer breaker
                    # (block_new) is active; skip only if a close_all /
                    # reduce_only breaker is already in force.
                    already_flattening = risk_state.status == "circuit_break" and risk_state.reduce_only
                    if total_r <= DAILY_LOSS_LIMIT_R and not already_flattening:
                        await trigger_circuit_break(
                            session, conn,
                            f"今日亏损{total_r:.2f}R超过日限{DAILY_LOSS_LIMIT_R}R",
                            "close_all",
                        )

                    # 2. Consecutive losses
                    if (consecutive >= CONSECUTIVE_LOSS_LIMIT
                            and risk_state.status != "circuit_break"):
                        await trigger_circuit_break(
                            session, conn,
                            f"连续亏损{consecutive}次,超过限制{CONSECUTIVE_LOSS_LIMIT}",
                            "block_new",
                        )
                        risk_state.auto_resume_time = time.time() + CONSECUTIVE_LOSS_COOLDOWN_MIN * 60

                    # 3. API connectivity
                    freshness_issues = check_data_freshness()
                    if freshness_issues and risk_state.status != "circuit_break":
                        await trigger_circuit_break(
                            session, conn,
                            f"数据异常: {'; '.join(freshness_issues)}",
                            "block_new",
                        )

                    # 4. Balance
                    balance = await check_balance(session)
                    if balance < min_balance:
                        if not risk_state.block_new_entries:
                            risk_state.block_new_entries = True
                            balance_blocked = True
                            logger.warning(f"🟡 余额不足: ${balance:.2f} < ${min_balance:.2f},暂停开仓")
                    elif balance_blocked:
                        balance_blocked = False
                        # Only undo our own block: a breaker or manual block
                        # changes status away from "normal".
                        if risk_state.status == "normal" and risk_state.block_new_entries:
                            risk_state.block_new_entries = False
                            logger.info(f"✅ 余额恢复: ${balance:.2f} >= ${min_balance:.2f},恢复开仓")

                    # 5. Hold timeouts
                    await check_hold_timeout(session, conn)

                    # 6. Publish state
                    write_risk_state()

                    # Status line roughly once a minute
                    if int(time.time()) % 60 < CHECK_INTERVAL:
                        logger.info(
                            f"📊 风控状态: {risk_state.status} | "
                            f"已实现={realized_r:+.2f}R | 未实现={unrealized_r:+.2f}R | "
                            f"合计={total_r:+.2f}R | 连亏={consecutive} | "
                            f"余额=${balance:.2f}"
                        )
                    await asyncio.sleep(CHECK_INTERVAL)
                except KeyboardInterrupt:
                    break
                except Exception as e:
                    logger.error(f"❌ 风控检查异常: {e}", exc_info=True)
                    await asyncio.sleep(10)
    finally:
        # Also runs when asyncio.run() cancels this task on Ctrl-C.
        conn.close()
        logger.info("Risk Guard 已停止")


if __name__ == "__main__":
    asyncio.run(main())