P0-1: JWT_SECRET生产环境强制配置,测试环境保留默认 P0-2: DB密码生产环境强制从env读,测试环境保留fallback P0-3: SL三次失败→查真实持仓→reduceOnly平仓→校验结果→写event P0-4: TP1后SL重挂失败则不推进tp1_hit状态,continue等下轮重试 P0-5: 超时自动平仓用SYMBOL_QTY_PRECISION格式化+校验结果 P0-6: 同币种去重改为不区分策略(币安单向模式共享净仓位) P1-1: 手续费窗口entry_ts-200→+200(避免纳入开仓前成交) 额外: 模拟盘*200和实盘*2硬编码→从配置动态读取
585 lines
22 KiB
Python
585 lines
22 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
Risk Guard - 风控熔断模块
|
||
实时监控风险指标,触发熔断时自动执行保护动作
|
||
|
||
熔断规则:
|
||
1. 单日亏损超限(-5R) → 全平+停机
|
||
2. 连续亏损(5连亏) → 暂停开仓1小时
|
||
3. API连接异常(>30秒) → 暂停开仓
|
||
4. 余额不足(< 风险×2) → 拒绝开仓
|
||
5. 数据新鲜度超时 → 禁止新开仓
|
||
"""
|
||
|
||
import os
|
||
import sys
|
||
import json
|
||
import time
|
||
from pathlib import Path
|
||
try:
|
||
from dotenv import load_dotenv; load_dotenv(Path(__file__).parent / ".env")
|
||
except ImportError:
|
||
pass
|
||
import logging
|
||
import asyncio
|
||
import hashlib
|
||
import hmac
|
||
from urllib.parse import urlencode
|
||
from datetime import datetime, timezone
|
||
|
||
import psycopg2
|
||
import aiohttp
|
||
|
||
# ============ 配置 ============
|
||
|
||
TRADE_ENV = os.getenv("TRADE_ENV", "testnet")
|
||
BINANCE_ENDPOINTS = {
|
||
"testnet": "https://testnet.binancefuture.com",
|
||
"production": "https://fapi.binance.com",
|
||
}
|
||
BASE_URL = BINANCE_ENDPOINTS[TRADE_ENV]
|
||
|
||
_DB_PASSWORD = os.getenv("DB_PASSWORD", "arb_engine_2026" if TRADE_ENV == "testnet" else "")
|
||
if not _DB_PASSWORD:
|
||
print("FATAL: DB_PASSWORD 未设置(生产环境必须配置)", file=sys.stderr)
|
||
sys.exit(1)
|
||
|
||
DB_CONFIG = {
|
||
"host": os.getenv("DB_HOST", "10.106.0.3"),
|
||
"port": int(os.getenv("DB_PORT", "5432")),
|
||
"dbname": os.getenv("DB_NAME", "arb_engine"),
|
||
"user": os.getenv("DB_USER", "arb"),
|
||
"password": _DB_PASSWORD,
|
||
}
|
||
|
||
# 风控参数
|
||
DAILY_LOSS_LIMIT_R = float(os.getenv("DAILY_LOSS_LIMIT_R", "-5"))
|
||
CONSECUTIVE_LOSS_LIMIT = int(os.getenv("CONSECUTIVE_LOSS_LIMIT", "5"))
|
||
CONSECUTIVE_LOSS_COOLDOWN_MIN = int(os.getenv("CONSECUTIVE_LOSS_COOLDOWN_MIN", "60"))
|
||
API_DISCONNECT_THRESHOLD_SEC = int(os.getenv("API_DISCONNECT_THRESHOLD_SEC", "30"))
|
||
MIN_BALANCE_MULTIPLE = float(os.getenv("MIN_BALANCE_MULTIPLE", "2"))
|
||
RISK_PER_TRADE_USD = float(os.getenv("RISK_PER_TRADE_USD", "2"))
|
||
|
||
# 超时处置
|
||
HOLD_TIMEOUT_YELLOW_MIN = 45
|
||
HOLD_TIMEOUT_RED_MIN = 60
|
||
HOLD_TIMEOUT_GRACE_MIN = 10 # 红灯后10分钟人工窗口
|
||
HOLD_TIMEOUT_AUTO_CLOSE_MIN = HOLD_TIMEOUT_RED_MIN + HOLD_TIMEOUT_GRACE_MIN # 70分钟
|
||
|
||
# 数据新鲜度
|
||
MARKET_DATA_STALE_SEC = 10
|
||
ACCOUNT_UPDATE_STALE_SEC = 20
|
||
|
||
CHECK_INTERVAL = 5 # 风控检查间隔(秒)
|
||
|
||
from trade_config import SYMBOLS, SYMBOL_QTY_PRECISION
|
||
|
||
from logging.handlers import RotatingFileHandler
|
||
_log_fmt = "%(asctime)s [%(levelname)s] %(name)s: %(message)s"
|
||
logging.basicConfig(level=logging.INFO, format=_log_fmt)
|
||
logger = logging.getLogger("risk-guard")
|
||
_fh = RotatingFileHandler("logs/risk_guard.log", maxBytes=10*1024*1024, backupCount=5)
|
||
_fh.setFormatter(logging.Formatter(_log_fmt))
|
||
logger.addHandler(_fh)
|
||
|
||
# ============ 状态 ============
|
||
|
||
class RiskState:
|
||
"""风控状态管理"""
|
||
def __init__(self):
|
||
self.status = "normal" # normal / warning / circuit_break
|
||
self.block_new_entries = False
|
||
self.reduce_only = False
|
||
self.manual_override = False
|
||
self.circuit_break_reason = None
|
||
self.circuit_break_time = None
|
||
self.auto_resume_time = None
|
||
self.last_api_success = time.time()
|
||
self.last_market_data = time.time()
|
||
self.last_account_update = time.time()
|
||
self.consecutive_losses = 0
|
||
self.today_realized_r = 0.0
|
||
self.today_unrealized_r = 0.0
|
||
self.breaker_history = []
|
||
# 超时处置队列: {trade_id: {"entered_queue_ts": ..., "notified": bool}}
|
||
self.timeout_queue = {}
|
||
|
||
def to_dict(self):
|
||
return {
|
||
"status": self.status,
|
||
"block_new_entries": self.block_new_entries,
|
||
"reduce_only": self.reduce_only,
|
||
"circuit_break_reason": self.circuit_break_reason,
|
||
"circuit_break_time": self.circuit_break_time,
|
||
"auto_resume_time": self.auto_resume_time,
|
||
"consecutive_losses": self.consecutive_losses,
|
||
"today_realized_r": round(self.today_realized_r, 2),
|
||
"today_unrealized_r": round(self.today_unrealized_r, 2),
|
||
"today_total_r": round(self.today_realized_r + min(self.today_unrealized_r, 0), 2),
|
||
}
|
||
|
||
|
||
risk_state = RiskState()
|
||
|
||
# ============ API Key ============
|
||
_api_key = None
|
||
_secret_key = None
|
||
|
||
|
||
def load_api_keys():
|
||
global _api_key, _secret_key
|
||
_api_key = os.getenv("BINANCE_API_KEY")
|
||
_secret_key = os.getenv("BINANCE_SECRET_KEY")
|
||
if _api_key and _secret_key:
|
||
return
|
||
try:
|
||
from google.cloud import secretmanager
|
||
client = secretmanager.SecretManagerServiceClient()
|
||
project = os.getenv("GCP_PROJECT", "gen-lang-client-0835616737")
|
||
prefix = "binance-testnet" if TRADE_ENV == "testnet" else "binance-live"
|
||
_api_key = client.access_secret_version(
|
||
name=f"projects/{project}/secrets/{prefix}-api-key/versions/latest"
|
||
).payload.data.decode()
|
||
_secret_key = client.access_secret_version(
|
||
name=f"projects/{project}/secrets/{prefix}-secret-key/versions/latest"
|
||
).payload.data.decode()
|
||
except Exception as e:
|
||
logger.error(f"Failed to load API keys: {e}")
|
||
sys.exit(1)
|
||
|
||
|
||
def sign_params(params):
|
||
params["timestamp"] = int(time.time() * 1000)
|
||
query_string = urlencode(params)
|
||
signature = hmac.new(_secret_key.encode(), query_string.encode(), hashlib.sha256).hexdigest()
|
||
params["signature"] = signature
|
||
return params
|
||
|
||
|
||
async def binance_request(session, method, path, params=None):
|
||
url = f"{BASE_URL}{path}"
|
||
headers = {"X-MBX-APIKEY": _api_key}
|
||
if params is None:
|
||
params = {}
|
||
params = sign_params(params)
|
||
try:
|
||
if method == "GET":
|
||
async with session.get(url, params=params, headers=headers) as resp:
|
||
data = await resp.json()
|
||
risk_state.last_api_success = time.time()
|
||
return data, resp.status
|
||
elif method == "POST":
|
||
async with session.post(url, params=params, headers=headers) as resp:
|
||
data = await resp.json()
|
||
risk_state.last_api_success = time.time()
|
||
return data, resp.status
|
||
elif method == "DELETE":
|
||
async with session.delete(url, params=params, headers=headers) as resp:
|
||
data = await resp.json()
|
||
risk_state.last_api_success = time.time()
|
||
return data, resp.status
|
||
except Exception as e:
|
||
logger.error(f"API request failed: {e}")
|
||
return {"error": str(e)}, 500
|
||
|
||
|
||
# ============ 风控检查 ============
|
||
|
||
def check_daily_loss(conn):
|
||
"""检查今日已实现亏损"""
|
||
cur = conn.cursor()
|
||
# 今日UTC起始
|
||
today_start = datetime.now(timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0)
|
||
today_start_ms = int(today_start.timestamp() * 1000)
|
||
|
||
cur.execute("""
|
||
SELECT COALESCE(SUM(pnl_r), 0) as total_r,
|
||
COUNT(*) FILTER (WHERE pnl_r < 0) as loss_count
|
||
FROM live_trades
|
||
WHERE exit_ts >= %s AND status NOT IN ('active', 'tp1_hit')
|
||
""", (today_start_ms,))
|
||
row = cur.fetchone()
|
||
realized_r = float(row[0]) if row[0] else 0
|
||
risk_state.today_realized_r = realized_r
|
||
|
||
# 检查连续亏损
|
||
cur.execute("""
|
||
SELECT pnl_r FROM live_trades
|
||
WHERE status NOT IN ('active', 'tp1_hit')
|
||
ORDER BY exit_ts DESC LIMIT %s
|
||
""", (CONSECUTIVE_LOSS_LIMIT,))
|
||
recent = [r[0] for r in cur.fetchall()]
|
||
consecutive = 0
|
||
for r in recent:
|
||
if r and r < 0:
|
||
consecutive += 1
|
||
else:
|
||
break
|
||
risk_state.consecutive_losses = consecutive
|
||
|
||
return realized_r, consecutive
|
||
|
||
|
||
async def check_unrealized_loss(session):
|
||
"""检查未实现亏损"""
|
||
data, status = await binance_request(session, "GET", "/fapi/v2/positionRisk")
|
||
total_unrealized = 0
|
||
if status == 200 and isinstance(data, list):
|
||
for pos in data:
|
||
pnl = float(pos.get("unRealizedProfit", 0))
|
||
total_unrealized += pnl
|
||
# 转为R
|
||
unrealized_r = total_unrealized / RISK_PER_TRADE_USD if RISK_PER_TRADE_USD > 0 else 0
|
||
risk_state.today_unrealized_r = unrealized_r
|
||
return unrealized_r
|
||
|
||
|
||
async def check_balance(session):
|
||
"""检查余额是否足够"""
|
||
data, status = await binance_request(session, "GET", "/fapi/v2/balance")
|
||
if status == 200 and isinstance(data, list):
|
||
for asset in data:
|
||
if asset.get("asset") == "USDT":
|
||
available = float(asset.get("availableBalance", 0))
|
||
return available
|
||
return 0
|
||
|
||
|
||
def check_data_freshness():
|
||
"""检查数据新鲜度"""
|
||
now = time.time()
|
||
issues = []
|
||
|
||
api_gap = now - risk_state.last_api_success
|
||
if api_gap > API_DISCONNECT_THRESHOLD_SEC:
|
||
issues.append(f"API无响应{api_gap:.0f}秒")
|
||
|
||
return issues
|
||
|
||
|
||
async def check_hold_timeout(session, conn):
|
||
"""检查持仓超时,管理处置队列"""
|
||
cur = conn.cursor()
|
||
cur.execute("""
|
||
SELECT id, symbol, direction, entry_ts, entry_price, risk_distance
|
||
FROM live_trades
|
||
WHERE status IN ('active', 'tp1_hit')
|
||
""")
|
||
now_ms = int(time.time() * 1000)
|
||
|
||
for row in cur.fetchall():
|
||
trade_id, symbol, direction, entry_ts, entry_price, rd = row
|
||
hold_min = (now_ms - entry_ts) / 60000
|
||
|
||
if hold_min >= HOLD_TIMEOUT_AUTO_CLOSE_MIN:
|
||
# 70分钟:人工窗口到期,自动平仓
|
||
if trade_id in risk_state.timeout_queue:
|
||
logger.warning(f"[{symbol}] ⏰ 持仓{hold_min:.0f}分钟,人工窗口到期,自动市价平仓!")
|
||
close_side = "SELL" if direction == "LONG" else "BUY"
|
||
# 取消所有挂单
|
||
await binance_request(session, "DELETE", "/fapi/v1/allOpenOrders", {"symbol": symbol})
|
||
# 查仓位大小
|
||
pos_data, _ = await binance_request(session, "GET", "/fapi/v2/positionRisk", {"symbol": symbol})
|
||
if isinstance(pos_data, list):
|
||
for p in pos_data:
|
||
amt = abs(float(p.get("positionAmt", 0)))
|
||
if amt > 0 and p["symbol"] == symbol:
|
||
qty_prec = SYMBOL_QTY_PRECISION.get(symbol, 3)
|
||
qty_str = f"{amt:.{qty_prec}f}"
|
||
close_data, close_status = await binance_request(session, "POST", "/fapi/v1/order", {
|
||
"symbol": symbol, "side": close_side, "type": "MARKET",
|
||
"quantity": qty_str, "reduceOnly": "true",
|
||
})
|
||
if close_status != 200:
|
||
logger.error(f"[{symbol}] ❌ 超时自动平仓失败: {close_data}")
|
||
else:
|
||
logger.info(f"[{symbol}] 🔴 超时自动平仓完成 qty={qty_str}")
|
||
del risk_state.timeout_queue[trade_id]
|
||
|
||
elif hold_min >= HOLD_TIMEOUT_RED_MIN:
|
||
# 60分钟:进入处置队列+10分钟倒计时
|
||
if trade_id not in risk_state.timeout_queue:
|
||
risk_state.timeout_queue[trade_id] = {
|
||
"entered_queue_ts": time.time(),
|
||
"notified": False,
|
||
"symbol": symbol,
|
||
}
|
||
remaining = HOLD_TIMEOUT_GRACE_MIN
|
||
logger.warning(f"[{symbol}] 🔴 持仓{hold_min:.0f}分钟超时! 进入处置队列, {remaining}分钟后自动平仓")
|
||
# TODO: Discord通知范总
|
||
|
||
elif not risk_state.timeout_queue[trade_id]["notified"]:
|
||
risk_state.timeout_queue[trade_id]["notified"] = True
|
||
# TODO: Discord紧急通知
|
||
|
||
elif hold_min >= HOLD_TIMEOUT_YELLOW_MIN:
|
||
logger.info(f"[{symbol}] 🟡 持仓{hold_min:.0f}分钟,接近超时")
|
||
|
||
|
||
# ============ 熔断动作 ============
|
||
|
||
async def trigger_circuit_break(session, conn, reason: str, action: str = "block_new"):
|
||
"""触发熔断"""
|
||
now = time.time()
|
||
risk_state.status = "circuit_break"
|
||
risk_state.circuit_break_reason = reason
|
||
risk_state.circuit_break_time = now
|
||
|
||
if action == "block_new":
|
||
risk_state.block_new_entries = True
|
||
logger.error(f"🔴 熔断触发: {reason} | 动作: 禁止新开仓")
|
||
|
||
elif action == "reduce_only":
|
||
risk_state.block_new_entries = True
|
||
risk_state.reduce_only = True
|
||
logger.error(f"🔴 熔断触发: {reason} | 动作: 只减仓模式")
|
||
|
||
elif action == "close_all":
|
||
risk_state.block_new_entries = True
|
||
risk_state.reduce_only = True
|
||
logger.error(f"🔴🔴 熔断触发: {reason} | 动作: 全部平仓!")
|
||
|
||
# 执行全平
|
||
for symbol in SYMBOLS:
|
||
await binance_request(session, "DELETE", "/fapi/v1/allOpenOrders", {"symbol": symbol})
|
||
pos_data, _ = await binance_request(session, "GET", "/fapi/v2/positionRisk", {"symbol": symbol})
|
||
if isinstance(pos_data, list):
|
||
for p in pos_data:
|
||
amt = float(p.get("positionAmt", 0))
|
||
if amt != 0:
|
||
close_side = "SELL" if amt > 0 else "BUY"
|
||
qty_prec = SYMBOL_QTY_PRECISION.get(symbol, 3)
|
||
qty_str = f"{abs(amt):.{qty_prec}f}"
|
||
await binance_request(session, "POST", "/fapi/v1/order", {
|
||
"symbol": symbol, "side": close_side, "type": "MARKET",
|
||
"quantity": qty_str, "reduceOnly": "true",
|
||
})
|
||
logger.info(f"[{symbol}] 🔴 紧急平仓 {close_side} qty={qty_str}")
|
||
|
||
# 记录历史
|
||
risk_state.breaker_history.append({
|
||
"time": now,
|
||
"reason": reason,
|
||
"action": action,
|
||
})
|
||
|
||
# 写状态文件(供其他进程读取)
|
||
write_risk_state()
|
||
|
||
# 写event到DB
|
||
_log_event(conn, "critical", "risk", f"🔴 熔断: {reason} | 动作: {action}", detail={"reason": reason, "action": action})
|
||
|
||
|
||
def write_risk_state():
|
||
"""写风控状态到文件(供live_executor读取判断是否可开仓)"""
|
||
state_path = "/tmp/risk_guard_state.json"
|
||
try:
|
||
with open(state_path, "w") as f:
|
||
json.dump(risk_state.to_dict(), f)
|
||
except Exception as e:
|
||
logger.error(f"写风控状态失败: {e}")
|
||
|
||
|
||
def _log_event(conn, level, category, message, symbol=None, detail=None):
|
||
"""写live_events表"""
|
||
try:
|
||
cur = conn.cursor()
|
||
cur.execute(
|
||
"INSERT INTO live_events (level, category, symbol, message, detail) VALUES (%s, %s, %s, %s, %s)",
|
||
(level, category, symbol, message, json.dumps(detail) if detail else None)
|
||
)
|
||
conn.commit()
|
||
except Exception:
|
||
pass
|
||
|
||
|
||
def check_auto_resume():
|
||
"""检查是否可以自动恢复"""
|
||
if risk_state.status != "circuit_break":
|
||
return
|
||
|
||
now = time.time()
|
||
|
||
# 连续亏损冷却期到了
|
||
if (risk_state.circuit_break_reason
|
||
and "连续亏损" in risk_state.circuit_break_reason
|
||
and risk_state.circuit_break_time):
|
||
elapsed_min = (now - risk_state.circuit_break_time) / 60
|
||
if elapsed_min >= CONSECUTIVE_LOSS_COOLDOWN_MIN:
|
||
logger.info(f"✅ 连续亏损冷却期结束({CONSECUTIVE_LOSS_COOLDOWN_MIN}分钟),自动恢复交易")
|
||
risk_state.status = "normal"
|
||
risk_state.block_new_entries = False
|
||
risk_state.reduce_only = False
|
||
risk_state.circuit_break_reason = None
|
||
write_risk_state()
|
||
|
||
# API恢复(仅限API断连导致的熔断,日限亏损等不自动恢复)
|
||
if (risk_state.circuit_break_reason
|
||
and risk_state.circuit_break_reason.startswith("API")
|
||
and "日限" not in risk_state.circuit_break_reason
|
||
and "人工" not in risk_state.circuit_break_reason):
|
||
api_gap = now - risk_state.last_api_success
|
||
if api_gap < 10: # API恢复正常10秒
|
||
logger.info("✅ API连接恢复,自动恢复交易")
|
||
risk_state.status = "normal"
|
||
risk_state.block_new_entries = False
|
||
risk_state.circuit_break_reason = None
|
||
write_risk_state()
|
||
|
||
|
||
# ============ 前端紧急指令处理 ============
|
||
|
||
EMERGENCY_FILE = "/tmp/risk_guard_emergency.json"
|
||
|
||
async def check_emergency_commands(session, conn):
|
||
"""读取前端发的紧急操作指令并执行"""
|
||
try:
|
||
with open(EMERGENCY_FILE) as f:
|
||
cmd = json.load(f)
|
||
except FileNotFoundError:
|
||
return
|
||
except Exception:
|
||
return
|
||
|
||
action = cmd.get("action")
|
||
user = cmd.get("user", "unknown")
|
||
logger.info(f"📩 收到紧急指令: {action} (操作人: {user})")
|
||
|
||
if action == "close_all":
|
||
await trigger_circuit_break(session, conn, f"人工紧急全平 (操作人: {user})", "close_all")
|
||
|
||
elif action == "block_new":
|
||
risk_state.block_new_entries = True
|
||
risk_state.status = "warning"
|
||
risk_state.circuit_break_reason = f"人工禁止新开仓 (操作人: {user})"
|
||
write_risk_state()
|
||
logger.warning(f"🟡 人工禁止新开仓 (操作人: {user})")
|
||
_log_event(conn, "warn", "risk", f"🟡 人工禁止新开仓 (操作人: {user})")
|
||
|
||
elif action == "resume":
|
||
risk_state.status = "normal"
|
||
risk_state.block_new_entries = False
|
||
risk_state.reduce_only = False
|
||
risk_state.circuit_break_reason = None
|
||
risk_state.circuit_break_time = None
|
||
write_risk_state()
|
||
logger.info(f"✅ 人工恢复交易 (操作人: {user})")
|
||
_log_event(conn, "info", "risk", f"✅ 人工恢复交易 (操作人: {user})")
|
||
|
||
else:
|
||
logger.warning(f"⚠ 未知紧急指令: {action}")
|
||
|
||
# 操作完成+state已写入后,才删除emergency文件(消除TOCTOU竞争)
|
||
try:
|
||
os.remove(EMERGENCY_FILE)
|
||
except Exception:
|
||
pass
|
||
|
||
|
||
# ============ 主循环 ============
|
||
|
||
def ensure_db_conn(conn):
|
||
"""检查DB连接,断线则重连"""
|
||
try:
|
||
conn.cursor().execute("SELECT 1")
|
||
return conn
|
||
except Exception:
|
||
logger.warning("⚠️ DB连接断开,重连中...")
|
||
try:
|
||
conn.close()
|
||
except Exception:
|
||
pass
|
||
return psycopg2.connect(**DB_CONFIG)
|
||
|
||
|
||
async def main():
|
||
logger.info("=" * 60)
|
||
logger.info(f"🛡 Risk Guard 启动 | 环境={TRADE_ENV}")
|
||
logger.info(f" 日限={DAILY_LOSS_LIMIT_R}R | 连亏限={CONSECUTIVE_LOSS_LIMIT}次 | 冷却={CONSECUTIVE_LOSS_COOLDOWN_MIN}分钟")
|
||
logger.info(f" 超时: {HOLD_TIMEOUT_YELLOW_MIN}min黄/{HOLD_TIMEOUT_RED_MIN}min红/{HOLD_TIMEOUT_AUTO_CLOSE_MIN}min自动平")
|
||
logger.info("=" * 60)
|
||
|
||
load_api_keys()
|
||
conn = psycopg2.connect(**DB_CONFIG)
|
||
|
||
# 初始状态写入
|
||
write_risk_state()
|
||
|
||
async with aiohttp.ClientSession() as session:
|
||
while True:
|
||
try:
|
||
conn = ensure_db_conn(conn)
|
||
# 0. 检查自动恢复
|
||
check_auto_resume()
|
||
|
||
# 0.5 检查前端紧急操作指令
|
||
await check_emergency_commands(session, conn)
|
||
|
||
# 1. 今日亏损检查
|
||
realized_r, consecutive = check_daily_loss(conn)
|
||
unrealized_r = await check_unrealized_loss(session)
|
||
total_r = realized_r + min(unrealized_r, 0) # 已实现 + 未实现亏损
|
||
|
||
if total_r <= DAILY_LOSS_LIMIT_R and risk_state.status != "circuit_break":
|
||
await trigger_circuit_break(
|
||
session, conn,
|
||
f"今日亏损{total_r:.2f}R,超过日限{DAILY_LOSS_LIMIT_R}R",
|
||
"close_all"
|
||
)
|
||
|
||
# 2. 连续亏损检查
|
||
if (consecutive >= CONSECUTIVE_LOSS_LIMIT
|
||
and risk_state.status != "circuit_break"):
|
||
await trigger_circuit_break(
|
||
session, conn,
|
||
f"连续亏损{consecutive}次,超过限制{CONSECUTIVE_LOSS_LIMIT}次",
|
||
"block_new"
|
||
)
|
||
risk_state.auto_resume_time = time.time() + CONSECUTIVE_LOSS_COOLDOWN_MIN * 60
|
||
|
||
# 3. API连接检查
|
||
freshness_issues = check_data_freshness()
|
||
if freshness_issues and risk_state.status != "circuit_break":
|
||
await trigger_circuit_break(
|
||
session, conn,
|
||
f"数据异常: {'; '.join(freshness_issues)}",
|
||
"block_new"
|
||
)
|
||
|
||
# 4. 余额检查
|
||
balance = await check_balance(session)
|
||
if balance < RISK_PER_TRADE_USD * MIN_BALANCE_MULTIPLE:
|
||
if not risk_state.block_new_entries:
|
||
risk_state.block_new_entries = True
|
||
logger.warning(f"🟡 余额不足: ${balance:.2f} < ${RISK_PER_TRADE_USD * MIN_BALANCE_MULTIPLE:.2f},暂停开仓")
|
||
|
||
# 5. 持仓超时检查
|
||
await check_hold_timeout(session, conn)
|
||
|
||
# 6. 写状态
|
||
write_risk_state()
|
||
|
||
# 日志(每60秒输出一次状态)
|
||
if int(time.time()) % 60 < CHECK_INTERVAL:
|
||
logger.info(
|
||
f"📊 风控状态: {risk_state.status} | "
|
||
f"已实现={realized_r:+.2f}R | 未实现={unrealized_r:+.2f}R | "
|
||
f"合计={total_r:+.2f}R | 连亏={consecutive} | "
|
||
f"余额=${balance:.2f}"
|
||
)
|
||
|
||
await asyncio.sleep(CHECK_INTERVAL)
|
||
|
||
except KeyboardInterrupt:
|
||
break
|
||
except Exception as e:
|
||
logger.error(f"❌ 风控检查异常: {e}", exc_info=True)
|
||
await asyncio.sleep(10)
|
||
|
||
conn.close()
|
||
logger.info("Risk Guard 已停止")
|
||
|
||
|
||
if __name__ == "__main__":
|
||
asyncio.run(main())
|