arbitrage-engine/backend/risk_guard.py
dev-worker 27a51b4d19 fix: P0第二轮修复 — JWT安全/DB密码/SL紧急平仓reduceOnly/TP1状态守卫/超时精度/跨策略去重 + 硬编码消除
P0-1: JWT_SECRET生产环境强制配置,测试环境保留默认
P0-2: DB密码生产环境强制从env读,测试环境保留fallback
P0-3: SL三次失败→查真实持仓→reduceOnly平仓→校验结果→写event
P0-4: TP1后SL重挂失败则不推进tp1_hit状态,continue等下轮重试
P0-5: 超时自动平仓用SYMBOL_QTY_PRECISION格式化+校验结果
P0-6: 同币种去重改为不区分策略(币安单向模式共享净仓位)
P1-1: 手续费窗口entry_ts-200→+200(避免纳入开仓前成交)
额外: 模拟盘*200和实盘*2硬编码→从配置动态读取
2026-03-02 16:11:43 +00:00

585 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Risk Guard - 风控熔断模块
实时监控风险指标,触发熔断时自动执行保护动作
熔断规则:
1. 单日亏损超限(-5R) → 全平+停机
2. 连续亏损(5连亏) → 暂停开仓1小时
3. API连接异常(>30秒) → 暂停开仓
4. 余额不足(< 风险×2) → 拒绝开仓
5. 数据新鲜度超时 → 禁止新开仓
"""
import os
import sys
import json
import time
from pathlib import Path
try:
from dotenv import load_dotenv; load_dotenv(Path(__file__).parent / ".env")
except ImportError:
pass
import logging
import asyncio
import hashlib
import hmac
from urllib.parse import urlencode
from datetime import datetime, timezone
import psycopg2
import aiohttp
# ============ 配置 ============
# Trading environment; selects which Binance futures endpoint is used.
TRADE_ENV = os.getenv("TRADE_ENV", "testnet")
BINANCE_ENDPOINTS = {
    "testnet": "https://testnet.binancefuture.com",
    "production": "https://fapi.binance.com",
}
# Fail fast with a readable message (same style as the DB_PASSWORD check below)
# instead of a bare KeyError traceback when TRADE_ENV has no endpoint.
if TRADE_ENV not in BINANCE_ENDPOINTS:
    print(
        f"FATAL: TRADE_ENV={TRADE_ENV!r} 无效(可选: {', '.join(BINANCE_ENDPOINTS)})",
        file=sys.stderr,
    )
    sys.exit(1)
BASE_URL = BINANCE_ENDPOINTS[TRADE_ENV]

# Only the testnet environment gets a fallback password; every other
# environment must provide DB_PASSWORD through the process environment.
_DB_PASSWORD = os.getenv("DB_PASSWORD", "arb_engine_2026" if TRADE_ENV == "testnet" else "")
if not _DB_PASSWORD:
    print("FATAL: DB_PASSWORD 未设置(生产环境必须配置)", file=sys.stderr)
    sys.exit(1)

DB_CONFIG = {
    "host": os.getenv("DB_HOST", "10.106.0.3"),
    "port": int(os.getenv("DB_PORT", "5432")),
    "dbname": os.getenv("DB_NAME", "arb_engine"),
    "user": os.getenv("DB_USER", "arb"),
    "password": _DB_PASSWORD,
}

# Risk-control parameters (each overridable through the environment).
DAILY_LOSS_LIMIT_R = float(os.getenv("DAILY_LOSS_LIMIT_R", "-5"))
CONSECUTIVE_LOSS_LIMIT = int(os.getenv("CONSECUTIVE_LOSS_LIMIT", "5"))
CONSECUTIVE_LOSS_COOLDOWN_MIN = int(os.getenv("CONSECUTIVE_LOSS_COOLDOWN_MIN", "60"))
API_DISCONNECT_THRESHOLD_SEC = int(os.getenv("API_DISCONNECT_THRESHOLD_SEC", "30"))
MIN_BALANCE_MULTIPLE = float(os.getenv("MIN_BALANCE_MULTIPLE", "2"))
RISK_PER_TRADE_USD = float(os.getenv("RISK_PER_TRADE_USD", "2"))

# Holding-time handling (minutes since entry).
HOLD_TIMEOUT_YELLOW_MIN = 45
HOLD_TIMEOUT_RED_MIN = 60
HOLD_TIMEOUT_GRACE_MIN = 10  # manual-intervention window after the red alert
HOLD_TIMEOUT_AUTO_CLOSE_MIN = HOLD_TIMEOUT_RED_MIN + HOLD_TIMEOUT_GRACE_MIN  # 70 minutes

# Data-freshness thresholds.
MARKET_DATA_STALE_SEC = 10
ACCOUNT_UPDATE_STALE_SEC = 20

CHECK_INTERVAL = 5  # seconds between risk-check cycles
from trade_config import SYMBOLS, SYMBOL_QTY_PRECISION
from logging.handlers import RotatingFileHandler
# Log through the root handler configured by basicConfig and additionally to a
# size-rotated file (10 MiB per file, 5 backups).
_log_fmt = "%(asctime)s [%(levelname)s] %(name)s: %(message)s"
logging.basicConfig(level=logging.INFO, format=_log_fmt)
logger = logging.getLogger("risk-guard")
# RotatingFileHandler opens its file immediately, so a missing "logs" directory
# would abort the import with FileNotFoundError; create it up front.
os.makedirs("logs", exist_ok=True)
_fh = RotatingFileHandler("logs/risk_guard.log", maxBytes=10*1024*1024, backupCount=5)
_fh.setFormatter(logging.Formatter(_log_fmt))
logger.addHandler(_fh)
# ============ 状态 ============
class RiskState:
    """Mutable in-memory record of the guard's flags, timestamps and counters."""

    def __init__(self):
        # One of: normal / warning / circuit_break
        self.status = "normal"

        # Trading restrictions currently in force.
        self.block_new_entries = False
        self.reduce_only = False
        self.manual_override = False

        # Details of the breaker that is (or was last) active.
        self.circuit_break_reason = None
        self.circuit_break_time = None
        self.auto_resume_time = None

        # Wall-clock time of the last observed activity per data source.
        self.last_api_success = time.time()
        self.last_market_data = time.time()
        self.last_account_update = time.time()

        # Loss statistics, expressed in R.
        self.consecutive_losses = 0
        self.today_realized_r = 0.0
        self.today_unrealized_r = 0.0

        self.breaker_history = []
        # Timeout handling queue: {trade_id: {"entered_queue_ts": ..., "notified": bool}}
        self.timeout_queue = {}

    def to_dict(self):
        """Return the subset of the state that is serialised for other processes."""
        realized = self.today_realized_r
        unrealized = self.today_unrealized_r
        # Floating profit is not allowed to offset realised losses in the total.
        combined = realized + min(unrealized, 0)

        snapshot = {}
        snapshot["status"] = self.status
        snapshot["block_new_entries"] = self.block_new_entries
        snapshot["reduce_only"] = self.reduce_only
        snapshot["circuit_break_reason"] = self.circuit_break_reason
        snapshot["circuit_break_time"] = self.circuit_break_time
        snapshot["auto_resume_time"] = self.auto_resume_time
        snapshot["consecutive_losses"] = self.consecutive_losses
        snapshot["today_realized_r"] = round(realized, 2)
        snapshot["today_unrealized_r"] = round(unrealized, 2)
        snapshot["today_total_r"] = round(combined, 2)
        return snapshot


risk_state = RiskState()
# ============ API Key ============
_api_key = None
_secret_key = None


def load_api_keys():
    """Populate the module-level Binance credentials.

    The BINANCE_API_KEY / BINANCE_SECRET_KEY environment variables are used
    when both are set. Otherwise both values are fetched from Google Secret
    Manager; any failure on that path is logged and terminates the process
    with exit status 1.
    """
    global _api_key, _secret_key

    _api_key = os.getenv("BINANCE_API_KEY")
    _secret_key = os.getenv("BINANCE_SECRET_KEY")
    if _api_key and _secret_key:
        return

    try:
        # Imported lazily so the dependency is only needed on this fallback path.
        from google.cloud import secretmanager

        client = secretmanager.SecretManagerServiceClient()
        project = os.getenv("GCP_PROJECT", "gen-lang-client-0835616737")
        if TRADE_ENV == "testnet":
            prefix = "binance-testnet"
        else:
            prefix = "binance-live"

        def _latest(secret_suffix):
            # Read the latest version of one secret and decode its payload.
            resource = f"projects/{project}/secrets/{prefix}-{secret_suffix}/versions/latest"
            return client.access_secret_version(name=resource).payload.data.decode()

        _api_key = _latest("api-key")
        _secret_key = _latest("secret-key")
    except Exception as e:
        logger.error(f"Failed to load API keys: {e}")
        sys.exit(1)
def sign_params(params):
    """Add a millisecond timestamp and an HMAC-SHA256 signature to *params*.

    The dict is modified in place and also returned. The signature is computed
    over the URL-encoded parameters (timestamp included) with the module-level
    secret key.
    """
    params["timestamp"] = int(time.time() * 1000)
    payload = urlencode(params).encode()
    digest = hmac.new(_secret_key.encode(), payload, hashlib.sha256)
    params["signature"] = digest.hexdigest()
    return params
async def binance_request(session, method, path, params=None, timeout_sec=10.0):
    """Send a signed request to the Binance futures API.

    Args:
        session: aiohttp client session used for the call.
        method: "GET", "POST" or "DELETE".
        path: API path appended to BASE_URL.
        params: Query parameters; signed via sign_params (which adds
            "timestamp" and "signature" to the dict).
        timeout_sec: Total time budget for the request in seconds.

    Returns:
        (data, status): the decoded JSON body and HTTP status on any response,
        or ({"error": <reason>}, 500) when the request could not be completed
        or the method is not supported.
    """
    if method not in ("GET", "POST", "DELETE"):
        # Previously an unknown method fell through and returned None, which
        # made callers' tuple unpacking raise TypeError.
        reason = f"unsupported method {method}"
        logger.error(f"API request failed: {reason}")
        return {"error": reason}, 500

    url = f"{BASE_URL}{path}"
    headers = {"X-MBX-APIKEY": _api_key}
    if params is None:
        params = {}
    params = sign_params(params)
    # Without an explicit timeout aiohttp waits up to its 5-minute default,
    # which would stall the whole check loop on a hung connection.
    timeout = aiohttp.ClientTimeout(total=timeout_sec)
    try:
        async with session.request(
            method, url, params=params, headers=headers, timeout=timeout
        ) as resp:
            data = await resp.json()
            # Any decoded response counts as proof that the API is reachable.
            risk_state.last_api_success = time.time()
            return data, resp.status
    except Exception as e:
        # Some exceptions (e.g. timeouts) have an empty message; fall back to
        # the class name so the log line is never blank.
        reason = str(e) or type(e).__name__
        logger.error(f"API request failed: {reason}")
        return {"error": reason}, 500
# ============ 风控检查 ============
def check_daily_loss(conn):
    """Compute today's realised PnL (in R) and the current losing streak.

    Both values are stored on risk_state and returned.

    The losing streak only considers trades that closed after the most recent
    consecutive-loss breaker recorded in risk_state.breaker_history. Without
    that cutoff the very trades that tripped the breaker are counted again as
    soon as the cooldown ends, so the breaker re-fires in the same cycle and
    the pause never actually lifts.

    Args:
        conn: Open psycopg2 connection.

    Returns:
        (realized_r, consecutive): today's summed pnl_r of closed trades and
        the number of most-recent consecutive losing trades (capped at
        CONSECUTIVE_LOSS_LIMIT).
    """
    # Start of the current UTC day, in epoch milliseconds like exit_ts.
    today_start = datetime.now(timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0)
    today_start_ms = int(today_start.timestamp() * 1000)

    # Time of the latest consecutive-loss breaker (0 when there was none).
    streak_cutoff_ms = 0
    for record in reversed(risk_state.breaker_history):
        if "连续亏损" in (record.get("reason") or ""):
            streak_cutoff_ms = int(record["time"] * 1000)
            break

    # The context manager closes the cursor even if a query raises.
    with conn.cursor() as cur:
        cur.execute("""
            SELECT COALESCE(SUM(pnl_r), 0) as total_r,
                   COUNT(*) FILTER (WHERE pnl_r < 0) as loss_count
            FROM live_trades
            WHERE exit_ts >= %s AND status NOT IN ('active', 'tp1_hit')
        """, (today_start_ms,))
        row = cur.fetchone()

        cur.execute("""
            SELECT pnl_r FROM live_trades
            WHERE status NOT IN ('active', 'tp1_hit') AND exit_ts > %s
            ORDER BY exit_ts DESC LIMIT %s
        """, (streak_cutoff_ms, CONSECUTIVE_LOSS_LIMIT))
        recent = [r[0] for r in cur.fetchall()]

    realized_r = float(row[0]) if row and row[0] else 0.0
    risk_state.today_realized_r = realized_r

    # Count losses from the newest trade backwards until the first non-loss.
    consecutive = 0
    for pnl in recent:
        if pnl and pnl < 0:
            consecutive += 1
        else:
            break
    risk_state.consecutive_losses = consecutive
    return realized_r, consecutive
async def check_unrealized_loss(session):
    """Fetch position risk, convert the summed unrealised PnL to R and store it.

    When the request does not return status 200 with a list body, the total
    is treated as 0. The result is written to risk_state.today_unrealized_r
    and returned.
    """
    payload, http_status = await binance_request(session, "GET", "/fapi/v2/positionRisk")

    pnl_sum = 0
    if http_status == 200 and isinstance(payload, list):
        pnl_sum = sum(float(position.get("unRealizedProfit", 0)) for position in payload)

    # Express the amount in units of the per-trade risk (R).
    if RISK_PER_TRADE_USD > 0:
        unrealized_r = pnl_sum / RISK_PER_TRADE_USD
    else:
        unrealized_r = 0

    risk_state.today_unrealized_r = unrealized_r
    return unrealized_r
async def check_balance(session):
    """Return the available USDT balance, or 0 when it cannot be determined.

    0 is returned when the request does not yield status 200 with a list body
    or when the list holds no USDT entry.
    """
    payload, http_status = await binance_request(session, "GET", "/fapi/v2/balance")
    if http_status != 200 or not isinstance(payload, list):
        return 0

    usdt_entry = next((item for item in payload if item.get("asset") == "USDT"), None)
    if usdt_entry is None:
        return 0
    return float(usdt_entry.get("availableBalance", 0))
def check_data_freshness():
    """Return a list of freshness problems (empty when there are none).

    Only the time since the last successful API response is examined; it is
    reported once it exceeds API_DISCONNECT_THRESHOLD_SEC.
    """
    silent_for = time.time() - risk_state.last_api_success
    if silent_for > API_DISCONNECT_THRESHOLD_SEC:
        return [f"API无响应{silent_for:.0f}"]
    return []
async def _auto_close_position(session, symbol, direction):
    """Cancel open orders for *symbol* and market-close its position.

    Returns True when every close order was accepted or there was nothing to
    close, and False when the position query or a close order failed, so the
    caller can retry on the next cycle.
    """
    close_side = "SELL" if direction == "LONG" else "BUY"
    # Cancel all resting orders on the symbol first.
    await binance_request(session, "DELETE", "/fapi/v1/allOpenOrders", {"symbol": symbol})
    # Look up the real position size.
    pos_data, _ = await binance_request(session, "GET", "/fapi/v2/positionRisk", {"symbol": symbol})
    if not isinstance(pos_data, list):
        logger.error(f"[{symbol}] ❌ 超时自动平仓失败: 持仓查询异常 {pos_data}")
        return False

    qty_prec = SYMBOL_QTY_PRECISION.get(symbol, 3)
    all_closed = True
    for p in pos_data:
        amt = abs(float(p.get("positionAmt", 0)))
        if amt <= 0 or p.get("symbol") != symbol:
            continue
        qty_str = f"{amt:.{qty_prec}f}"
        close_data, close_status = await binance_request(session, "POST", "/fapi/v1/order", {
            "symbol": symbol, "side": close_side, "type": "MARKET",
            "quantity": qty_str, "reduceOnly": "true",
        })
        if close_status != 200:
            logger.error(f"[{symbol}] ❌ 超时自动平仓失败: {close_data}")
            all_closed = False
        else:
            logger.info(f"[{symbol}] 🔴 超时自动平仓完成 qty={qty_str}")
    return all_closed


async def check_hold_timeout(session, conn):
    """Track how long open trades have been held and manage the timeout queue.

    Stages, by minutes since entry:
      * >= HOLD_TIMEOUT_YELLOW_MIN: informational log.
      * >= HOLD_TIMEOUT_RED_MIN: the trade enters risk_state.timeout_queue.
      * >= HOLD_TIMEOUT_AUTO_CLOSE_MIN: a queued trade is market-closed.

    Differences from a naive single-shot close:
      * A failed close (or failed position query) keeps the trade in the queue
        so the close is retried on the next cycle instead of being dropped.
      * A trade first seen when it is already past the auto-close age (for
        example after a process restart, since the queue lives in memory) is
        queued and closed once a full grace window has elapsed from that
        moment, rather than being ignored forever.
      * Queue entries for trades that are no longer active are purged.

    Args:
        session: aiohttp client session for Binance requests.
        conn: Open psycopg2 connection.
    """
    with conn.cursor() as cur:
        cur.execute("""
            SELECT id, symbol, direction, entry_ts, entry_price, risk_distance
            FROM live_trades
            WHERE status IN ('active', 'tp1_hit')
        """)
        rows = cur.fetchall()

    now_ms = int(time.time() * 1000)
    queue = risk_state.timeout_queue
    active_ids = set()

    for trade_id, symbol, direction, entry_ts, _entry_price, _risk_distance in rows:
        active_ids.add(trade_id)
        hold_min = (now_ms - entry_ts) / 60000

        if hold_min < HOLD_TIMEOUT_RED_MIN:
            if hold_min >= HOLD_TIMEOUT_YELLOW_MIN:
                logger.info(f"[{symbol}] 🟡 持仓{hold_min:.0f}分钟,接近超时")
            continue

        entry = queue.get(trade_id)
        if entry is None:
            # First time this trade is seen past the red threshold.
            queue[trade_id] = {
                "entered_queue_ts": time.time(),
                "notified": False,
                "symbol": symbol,
                # True when the red phase was missed entirely; such trades get
                # a fresh grace window counted from now.
                "late": hold_min >= HOLD_TIMEOUT_AUTO_CLOSE_MIN,
                "closed": False,
            }
            remaining = HOLD_TIMEOUT_GRACE_MIN
            logger.warning(f"[{symbol}] 🔴 持仓{hold_min:.0f}分钟超时! 进入处置队列, {remaining}分钟后自动平仓")
            # TODO: Discord notification to the operator
            continue

        if entry.get("closed"):
            # Already flattened; wait for the trade row to leave the active set.
            continue

        if hold_min < HOLD_TIMEOUT_AUTO_CLOSE_MIN:
            if not entry["notified"]:
                entry["notified"] = True
                # TODO: urgent Discord notification
            continue

        if entry.get("late"):
            waited_sec = time.time() - entry["entered_queue_ts"]
            if waited_sec < HOLD_TIMEOUT_GRACE_MIN * 60:
                continue

        # Manual window expired: close at market.
        logger.warning(f"[{symbol}] ⏰ 持仓{hold_min:.0f}分钟,人工窗口到期,自动市价平仓!")
        if await _auto_close_position(session, symbol, direction):
            # Keep the entry (marked closed) so the close is not repeated while
            # the trade row is still reported as active.
            entry["closed"] = True

    # Drop bookkeeping for trades that are no longer active.
    for stale_id in [tid for tid in queue if tid not in active_ids]:
        del queue[stale_id]
# ============ 熔断动作 ============
async def _close_all_positions(session):
    """Cancel open orders and market-close every position in SYMBOLS.

    Each close order's status is checked; failures and failed position
    queries are logged as errors instead of being reported as successful.
    """
    for symbol in SYMBOLS:
        await binance_request(session, "DELETE", "/fapi/v1/allOpenOrders", {"symbol": symbol})
        pos_data, _ = await binance_request(session, "GET", "/fapi/v2/positionRisk", {"symbol": symbol})
        if not isinstance(pos_data, list):
            logger.error(f"[{symbol}] ❌ 紧急平仓失败: 持仓查询异常 {pos_data}")
            continue
        qty_prec = SYMBOL_QTY_PRECISION.get(symbol, 3)
        for p in pos_data:
            amt = float(p.get("positionAmt", 0))
            if amt == 0:
                continue
            # The sign of the position decides which side flattens it.
            close_side = "SELL" if amt > 0 else "BUY"
            qty_str = f"{abs(amt):.{qty_prec}f}"
            close_data, close_status = await binance_request(session, "POST", "/fapi/v1/order", {
                "symbol": symbol, "side": close_side, "type": "MARKET",
                "quantity": qty_str, "reduceOnly": "true",
            })
            if close_status != 200:
                logger.error(f"[{symbol}] ❌ 紧急平仓失败: {close_data}")
            else:
                logger.info(f"[{symbol}] 🔴 紧急平仓 {close_side} qty={qty_str}")


async def trigger_circuit_break(session, conn, reason: str, action: str = "block_new"):
    """Trip the circuit breaker and apply the requested protective action.

    Args:
        session: aiohttp client session for Binance requests.
        conn: Open psycopg2 connection used to record the event.
        reason: Human-readable cause; stored on risk_state and in the event.
        action: "block_new" (forbid new entries), "reduce_only" (also set
            reduce-only mode) or "close_all" (also flatten every position in
            SYMBOLS). Any other value is handled like "block_new" so that a
            tripped breaker never leaves entries unblocked.

    The breaker is appended to risk_state.breaker_history, the state file is
    rewritten and an event row is written.
    """
    now = time.time()
    risk_state.status = "circuit_break"
    risk_state.circuit_break_reason = reason
    risk_state.circuit_break_time = now

    if action == "block_new":
        risk_state.block_new_entries = True
        logger.error(f"🔴 熔断触发: {reason} | 动作: 禁止新开仓")
    elif action == "reduce_only":
        risk_state.block_new_entries = True
        risk_state.reduce_only = True
        logger.error(f"🔴 熔断触发: {reason} | 动作: 只减仓模式")
    elif action == "close_all":
        risk_state.block_new_entries = True
        risk_state.reduce_only = True
        logger.error(f"🔴🔴 熔断触发: {reason} | 动作: 全部平仓!")
        await _close_all_positions(session)
    else:
        risk_state.block_new_entries = True
        logger.error(f"🔴 熔断触发: {reason} | 未知动作 {action}, 按禁止新开仓处理")

    # Record history.
    risk_state.breaker_history.append({
        "time": now,
        "reason": reason,
        "action": action,
    })
    # Persist the state file (read by other processes).
    write_risk_state()
    # Write the event to the database.
    _log_event(conn, "critical", "risk", f"🔴 熔断: {reason} | 动作: {action}", detail={"reason": reason, "action": action})
def write_risk_state():
    """Write the current risk state to /tmp/risk_guard_state.json.

    The JSON is written to a temporary file in the same directory and then
    moved over the target with os.replace, so a process reading the state
    file never observes a truncated or half-written document. Failures are
    logged and otherwise ignored.
    """
    state_path = "/tmp/risk_guard_state.json"
    tmp_path = f"{state_path}.{os.getpid()}.tmp"
    try:
        with open(tmp_path, "w") as f:
            json.dump(risk_state.to_dict(), f)
        os.replace(tmp_path, state_path)
    except Exception as e:
        logger.error(f"写风控状态失败: {e}")
def _log_event(conn, level, category, message, symbol=None, detail=None):
    """Insert one row into live_events (best effort).

    Args:
        conn: Open psycopg2 connection.
        level: Event level string.
        category: Event category string.
        message: Event text.
        symbol: Optional symbol the event refers to.
        detail: Optional JSON-serialisable payload; stored as JSON text, or
            NULL when empty.

    A failure never propagates to the caller. It is logged, and the
    transaction is rolled back so the failed INSERT does not leave the
    connection in an aborted state for later queries.
    """
    try:
        with conn.cursor() as cur:
            cur.execute(
                "INSERT INTO live_events (level, category, symbol, message, detail) VALUES (%s, %s, %s, %s, %s)",
                (level, category, symbol, message, json.dumps(detail) if detail else None)
            )
        conn.commit()
    except Exception as e:
        try:
            conn.rollback()
        except Exception as rollback_err:
            # The connection itself is unusable; only report it here.
            logger.warning(f"写live_events回滚失败: {rollback_err}")
        logger.warning(f"写live_events失败: {e}")
def check_auto_resume():
    """Lift the circuit breaker automatically when its cause has cleared.

    Two kinds of breaker resume on their own:
      * Consecutive-loss breakers, once CONSECUTIVE_LOSS_COOLDOWN_MIN minutes
        have passed since they tripped.
      * API-disconnect breakers, once an API call succeeded within the last
        10 seconds. The reason recorded for these breakers has the form
        "数据异常: API无响应…", so it is matched by substring; requiring the
        reason to start with "API" never matched and left the breaker stuck.

    Daily-limit and manual breakers are never resumed here.
    """
    if risk_state.status != "circuit_break":
        return
    now = time.time()
    reason = risk_state.circuit_break_reason or ""

    # Consecutive-loss cooldown has elapsed.
    if "连续亏损" in reason and risk_state.circuit_break_time:
        elapsed_min = (now - risk_state.circuit_break_time) / 60
        if elapsed_min >= CONSECUTIVE_LOSS_COOLDOWN_MIN:
            logger.info(f"✅ 连续亏损冷却期结束({CONSECUTIVE_LOSS_COOLDOWN_MIN}分钟),自动恢复交易")
            risk_state.status = "normal"
            risk_state.block_new_entries = False
            risk_state.reduce_only = False
            risk_state.circuit_break_reason = None
            # The scheduled resume has happened; do not keep advertising it.
            risk_state.auto_resume_time = None
            write_risk_state()
        return

    # API recovery: only for breakers caused by an API disconnect.
    is_api_breaker = reason.startswith("API") or "API无响应" in reason
    if is_api_breaker and "日限" not in reason and "人工" not in reason:
        api_gap = now - risk_state.last_api_success
        if api_gap < 10:  # a successful API call within the last 10 seconds
            logger.info("✅ API连接恢复自动恢复交易")
            risk_state.status = "normal"
            risk_state.block_new_entries = False
            risk_state.circuit_break_reason = None
            write_risk_state()
# ============ 前端紧急指令处理 ============
EMERGENCY_FILE = "/tmp/risk_guard_emergency.json"


def _discard_emergency_file():
    """Remove the emergency command file, reporting anything but "already gone"."""
    try:
        os.remove(EMERGENCY_FILE)
    except FileNotFoundError:
        pass
    except OSError as e:
        # If the file stays, the same command is executed again next cycle.
        logger.error(f"❌ 删除紧急指令文件失败,指令可能被重复执行: {e}")


async def check_emergency_commands(session, conn):
    """Read and execute an emergency command dropped by the frontend.

    The command file is a JSON object with an "action" ("close_all",
    "block_new" or "resume") and an optional "user". The file is removed only
    after the action has been applied and the state written.

    An unreadable file is left in place and retried on the next cycle (it may
    still be in the middle of being written). A payload that parses but is
    not a JSON object is discarded: it can never become valid, and leaving it
    would raise on every cycle before any risk check runs.

    Args:
        session: aiohttp client session for Binance requests.
        conn: Open psycopg2 connection used to record events.
    """
    try:
        with open(EMERGENCY_FILE) as f:
            cmd = json.load(f)
    except FileNotFoundError:
        return
    except Exception as e:
        logger.warning(f"⚠ 紧急指令文件读取失败,下轮重试: {e}")
        return

    if not isinstance(cmd, dict):
        logger.warning(f"⚠ 紧急指令格式无效,已丢弃: {cmd!r}")
        _discard_emergency_file()
        return

    action = cmd.get("action")
    user = cmd.get("user", "unknown")
    logger.info(f"📩 收到紧急指令: {action} (操作人: {user})")

    if action == "close_all":
        await trigger_circuit_break(session, conn, f"人工紧急全平 (操作人: {user})", "close_all")
    elif action == "block_new":
        risk_state.block_new_entries = True
        risk_state.status = "warning"
        risk_state.circuit_break_reason = f"人工禁止新开仓 (操作人: {user})"
        write_risk_state()
        logger.warning(f"🟡 人工禁止新开仓 (操作人: {user})")
        _log_event(conn, "warn", "risk", f"🟡 人工禁止新开仓 (操作人: {user})")
    elif action == "resume":
        risk_state.status = "normal"
        risk_state.block_new_entries = False
        risk_state.reduce_only = False
        risk_state.circuit_break_reason = None
        risk_state.circuit_break_time = None
        write_risk_state()
        logger.info(f"✅ 人工恢复交易 (操作人: {user})")
        _log_event(conn, "info", "risk", f"✅ 人工恢复交易 (操作人: {user})")
    else:
        logger.warning(f"⚠ 未知紧急指令: {action}")

    # Delete the command file only after the action completed and the state
    # was written, so a crash in between does not lose the command.
    _discard_emergency_file()
# ============ 主循环 ============
def ensure_db_conn(conn):
    """Return a usable database connection, reconnecting when the ping fails.

    The connection is probed with "SELECT 1". A rollback before the probe
    clears a transaction left aborted by an earlier failed statement, so that
    case no longer forces a reconnect; a rollback after it ends the implicit
    transaction the probe opened instead of leaving it idle. The probe's
    cursor is closed by its context manager.

    Args:
        conn: The connection currently in use.

    Returns:
        *conn* when it is healthy, otherwise a new connection built from
        DB_CONFIG (the old one is closed on a best-effort basis).
    """
    try:
        conn.rollback()
        with conn.cursor() as cur:
            cur.execute("SELECT 1")
        conn.rollback()
        return conn
    except Exception:
        logger.warning("⚠️ DB连接断开重连中...")
        try:
            conn.close()
        except Exception:
            # The old connection is being discarded anyway.
            pass
        return psycopg2.connect(**DB_CONFIG)
async def main():
    """Run the risk-guard loop until interrupted.

    Every cycle (CHECK_INTERVAL seconds apart) the loop:
      0. applies automatic resume rules and frontend emergency commands,
      1. checks the daily loss limit (realised plus unrealised losses),
      2. checks the consecutive-loss limit,
      3. checks API freshness,
      4. checks the available balance,
      5. handles positions held too long,
      6. writes the state file.

    The daily-loss breaker (close all) may escalate over a weaker breaker
    that is already active; it is skipped only when the active breaker
    already closed all positions. A block caused solely by low balance is
    lifted again once the balance recovers.
    """
    logger.info("=" * 60)
    logger.info(f"🛡 Risk Guard 启动 | 环境={TRADE_ENV}")
    logger.info(f" 日限={DAILY_LOSS_LIMIT_R}R | 连亏限={CONSECUTIVE_LOSS_LIMIT}次 | 冷却={CONSECUTIVE_LOSS_COOLDOWN_MIN}分钟")
    logger.info(f" 超时: {HOLD_TIMEOUT_YELLOW_MIN}min黄/{HOLD_TIMEOUT_RED_MIN}min红/{HOLD_TIMEOUT_AUTO_CLOSE_MIN}min自动平")
    logger.info("=" * 60)
    load_api_keys()
    conn = psycopg2.connect(**DB_CONFIG)
    # True while new entries are blocked only because of insufficient balance.
    balance_blocked = False
    try:
        # Write the initial state.
        write_risk_state()
        async with aiohttp.ClientSession() as session:
            while True:
                try:
                    conn = ensure_db_conn(conn)
                    # 0. Automatic resume.
                    check_auto_resume()
                    # 0.5 Emergency commands from the frontend.
                    await check_emergency_commands(session, conn)

                    # 1. Daily loss: realised plus unrealised losses only.
                    realized_r, consecutive = check_daily_loss(conn)
                    unrealized_r = await check_unrealized_loss(session)
                    total_r = realized_r + min(unrealized_r, 0)
                    history = risk_state.breaker_history
                    already_flattened = (
                        risk_state.status == "circuit_break"
                        and bool(history)
                        and history[-1]["action"] == "close_all"
                    )
                    if total_r <= DAILY_LOSS_LIMIT_R and not already_flattened:
                        await trigger_circuit_break(
                            session, conn,
                            f"今日亏损{total_r:.2f}R超过日限{DAILY_LOSS_LIMIT_R}R",
                            "close_all"
                        )

                    # 2. Consecutive losses.
                    if (consecutive >= CONSECUTIVE_LOSS_LIMIT
                            and risk_state.status != "circuit_break"):
                        await trigger_circuit_break(
                            session, conn,
                            f"连续亏损{consecutive}次,超过限制{CONSECUTIVE_LOSS_LIMIT}",
                            "block_new"
                        )
                        risk_state.auto_resume_time = time.time() + CONSECUTIVE_LOSS_COOLDOWN_MIN * 60

                    # 3. API connectivity.
                    freshness_issues = check_data_freshness()
                    if freshness_issues and risk_state.status != "circuit_break":
                        await trigger_circuit_break(
                            session, conn,
                            f"数据异常: {'; '.join(freshness_issues)}",
                            "block_new"
                        )

                    # 4. Balance.
                    balance = await check_balance(session)
                    min_balance = RISK_PER_TRADE_USD * MIN_BALANCE_MULTIPLE
                    if balance < min_balance:
                        if not risk_state.block_new_entries:
                            risk_state.block_new_entries = True
                            balance_blocked = True
                            logger.warning(f"🟡 余额不足: ${balance:.2f} < ${min_balance:.2f},暂停开仓")
                    elif balance_blocked:
                        balance_blocked = False
                        # Only undo the block when nothing else (a breaker or a
                        # manual command) has taken over in the meantime.
                        if risk_state.status == "normal" and risk_state.block_new_entries:
                            risk_state.block_new_entries = False
                            logger.info(f"✅ 余额恢复: ${balance:.2f},恢复开仓")

                    # 5. Holding-time checks.
                    await check_hold_timeout(session, conn)

                    # 6. Persist state.
                    write_risk_state()

                    # Status line roughly once per minute.
                    if int(time.time()) % 60 < CHECK_INTERVAL:
                        logger.info(
                            f"📊 风控状态: {risk_state.status} | "
                            f"已实现={realized_r:+.2f}R | 未实现={unrealized_r:+.2f}R | "
                            f"合计={total_r:+.2f}R | 连亏={consecutive} | "
                            f"余额=${balance:.2f}"
                        )
                    await asyncio.sleep(CHECK_INTERVAL)
                except KeyboardInterrupt:
                    break
                except Exception as e:
                    logger.error(f"❌ 风控检查异常: {e}", exc_info=True)
                    await asyncio.sleep(10)
    finally:
        # Runs on normal exit and on task cancellation alike.
        conn.close()
        logger.info("Risk Guard 已停止")


if __name__ == "__main__":
    asyncio.run(main())