fix: 审阅P1/P2/P3全部修复

P1-1: exit_price按reduceOnly过滤平仓成交+加权平均价
P1-2: 资金费率窗口对齐8h结算周期(0/8/16 UTC),防双累加
P2-2: emergency文件先执行操作+写state再删除,消除TOCTOU
P2-3: API恢复auto_resume排除日限亏损/人工熔断
P2-4: fee汇总窗口从entry_ts-200ms起算(与P1-1一起修)
P3-1: SYMBOL_PRECISION提取到trade_config.py共用模块
P3-2: main.py 4处hardcoded risk_usd=2改为从live_config读
P3-3: CORS限制为arb.zhouyangclaw.com+localhost
P3-4: 三进程加RotatingFileHandler(10MB×5轮转)
This commit is contained in:
root 2026-03-02 14:04:33 +00:00
parent 638589852b
commit 8694e5cf3a
7 changed files with 86 additions and 52 deletions

1
.gitignore vendored
View File

@ -1 +1,2 @@
__pycache__/ __pycache__/
logs/*.log

View File

@ -77,20 +77,17 @@ def reload_live_config(conn):
except Exception as e: except Exception as e:
logger.warning(f"读取live_config失败: {e}") logger.warning(f"读取live_config失败: {e}")
# 币种精度(币安要求) # 币种精度(从共用配置导入)
SYMBOL_PRECISION = { from trade_config import SYMBOL_PRECISION
"BTCUSDT": {"qty": 3, "price": 1, "min_notional": 100},
"ETHUSDT": {"qty": 3, "price": 2, "min_notional": 20},
"XRPUSDT": {"qty": 0, "price": 4, "min_notional": 5},
"SOLUSDT": {"qty": 2, "price": 2, "min_notional": 5},
}
# 日志 # 日志
logging.basicConfig( from logging.handlers import RotatingFileHandler
level=logging.INFO, _log_fmt = "%(asctime)s [%(levelname)s] %(name)s: %(message)s"
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", logging.basicConfig(level=logging.INFO, format=_log_fmt)
)
logger = logging.getLogger("live-executor") logger = logging.getLogger("live-executor")
_fh = RotatingFileHandler("logs/live_executor.log", maxBytes=10*1024*1024, backupCount=5)
_fh.setFormatter(logging.Formatter(_log_fmt))
logger.addHandler(_fh)
# ============ API Key管理 ============ # ============ API Key管理 ============

0
backend/logs/.gitkeep Normal file
View File

View File

@ -15,7 +15,7 @@ app = FastAPI(title="Arbitrage Engine API")
app.add_middleware( app.add_middleware(
CORSMiddleware, CORSMiddleware,
allow_origins=["*"], allow_origins=["https://arb.zhouyangclaw.com", "http://localhost:3000", "http://localhost:3001"],
allow_methods=["*"], allow_methods=["*"],
allow_headers=["*"], allow_headers=["*"],
) )
@ -1227,7 +1227,7 @@ async def live_summary(
"total_trades": total, "total_trades": total,
"win_rate": round(win_rate, 1), "win_rate": round(win_rate, 1),
"total_pnl_r": round(total_pnl, 2), "total_pnl_r": round(total_pnl, 2),
"total_pnl_usdt": round(total_pnl * 2, 2), # $2=1R "total_pnl_usdt": round(total_pnl * (await _get_risk_usd()), 2),
"active_positions": len(active), "active_positions": len(active),
"profit_factor": round(profit_factor, 2), "profit_factor": round(profit_factor, 2),
"total_fee_usdt": round(total_fee, 2), "total_fee_usdt": round(total_fee, 2),
@ -1351,7 +1351,7 @@ async def live_trades(
fee_usdt = r["fee_usdt"] or 0 fee_usdt = r["fee_usdt"] or 0
funding_usdt = r["funding_fee_usdt"] or 0 funding_usdt = r["funding_fee_usdt"] or 0
risk_usd = 2 # $2=1R risk_usd = await _get_risk_usd()
fee_r = fee_usdt / risk_usd if risk_usd > 0 else 0 fee_r = fee_usdt / risk_usd if risk_usd > 0 else 0
funding_r = abs(funding_usdt) / risk_usd if funding_usdt < 0 else 0 funding_r = abs(funding_usdt) / risk_usd if funding_usdt < 0 else 0
# slippage_r: 滑点造成的R损失 # slippage_r: 滑点造成的R损失
@ -1471,6 +1471,15 @@ async def live_risk_status(user: dict = Depends(get_current_user)):
return {"status": "unknown", "error": "risk_guard_state.json not found"} return {"status": "unknown", "error": "risk_guard_state.json not found"}
async def _get_risk_usd() -> float:
"""从live_config读取1R金额缓存60秒"""
try:
row = await async_fetchrow("SELECT value FROM live_config WHERE key = $1", "risk_per_trade_usd")
return float(row["value"]) if row else 2.0
except Exception:
return 2.0
def _require_admin(user: dict): def _require_admin(user: dict):
"""检查管理员权限""" """检查管理员权限"""
if user.get("role") != "admin": if user.get("role") != "admin":
@ -1578,7 +1587,7 @@ async def live_account(user: dict = Depends(get_current_user)):
"unrealized_pnl": round(unrealized, 2), "unrealized_pnl": round(unrealized, 2),
"effective_leverage": effective_leverage, "effective_leverage": effective_leverage,
"today_realized_r": round(today_realized_r, 2), "today_realized_r": round(today_realized_r, 2),
"today_realized_usdt": round(today_realized_r * 2, 2), "today_realized_usdt": round(today_realized_r * (await _get_risk_usd()), 2),
"today_fee": round(today_fee, 2), "today_fee": round(today_fee, 2),
"today_volume": round(today_volume, 2), "today_volume": round(today_volume, 2),
} }

View File

@ -48,20 +48,15 @@ MAX_REHANG_RETRIES = 2
MISMATCH_ESCALATION_SEC = 60 # 差异持续超过此秒数升级告警 MISMATCH_ESCALATION_SEC = 60 # 差异持续超过此秒数升级告警
RISK_PER_TRADE_USD = float(os.getenv("RISK_PER_TRADE_USD", "2")) # $2=1R RISK_PER_TRADE_USD = float(os.getenv("RISK_PER_TRADE_USD", "2")) # $2=1R
SYMBOLS = ["BTCUSDT", "ETHUSDT", "XRPUSDT", "SOLUSDT"] from trade_config import SYMBOLS, SYMBOL_PRECISION
SYMBOL_PRECISION = { from logging.handlers import RotatingFileHandler
"BTCUSDT": {"qty": 3, "price": 1}, _log_fmt = "%(asctime)s [%(levelname)s] %(name)s: %(message)s"
"ETHUSDT": {"qty": 3, "price": 2}, logging.basicConfig(level=logging.INFO, format=_log_fmt)
"XRPUSDT": {"qty": 0, "price": 4},
"SOLUSDT": {"qty": 2, "price": 2},
}
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
logger = logging.getLogger("position-sync") logger = logging.getLogger("position-sync")
_fh = RotatingFileHandler("logs/position_sync.log", maxBytes=10*1024*1024, backupCount=5)
_fh.setFormatter(logging.Formatter(_log_fmt))
logger.addHandler(_fh)
# ============ API Key ============ # ============ API Key ============
_api_key = None _api_key = None
@ -422,19 +417,26 @@ async def check_closed_positions(session, conn):
exit_price = lp["entry_price"] # fallback exit_price = lp["entry_price"] # fallback
# 尝试从最近交易记录获取成交价和手续费 # 尝试从最近交易记录获取成交价和手续费
entry_ts = lp.get("entry_ts", 0)
trades_data, trades_status = await binance_request(session, "GET", "/fapi/v1/userTrades", { trades_data, trades_status = await binance_request(session, "GET", "/fapi/v1/userTrades", {
"symbol": symbol, "limit": 20 "symbol": symbol, "startTime": entry_ts, "limit": 100
}) })
actual_fee_usdt = 0 actual_fee_usdt = 0
if trades_status == 200 and isinstance(trades_data, list) and trades_data: if trades_status == 200 and isinstance(trades_data, list) and trades_data:
# 取最近的平仓成交reduceOnly或最后几笔 # 过滤平仓成交LONG平仓是SELL(buyer=false), SHORT平仓是BUY(buyer=true)
last_trade = trades_data[-1] is_close_buyer = lp["direction"] == "SHORT"
exit_price = float(last_trade.get("price", exit_price)) close_trades = [t for t in trades_data if bool(t.get("buyer")) == is_close_buyer and int(t.get("time", 0)) > entry_ts + 1000]
# 汇总最近相关成交的手续费(开仓+平仓) if close_trades:
entry_ts = lp.get("entry_ts", 0) total_qty = sum(float(t["qty"]) for t in close_trades)
if total_qty > 0:
exit_price = sum(float(t["price"]) * float(t["qty"]) for t in close_trades) / total_qty
elif trades_data:
exit_price = float(trades_data[-1].get("price", exit_price))
# 汇总手续费开仓后200ms起算避免含其他策略成交
for t in trades_data: for t in trades_data:
t_time = int(t.get("time", 0)) t_time = int(t.get("time", 0))
if t_time >= entry_ts - 5000: # 开仓前5秒到现在的所有成交 if t_time >= entry_ts - 200:
actual_fee_usdt += abs(float(t.get("commission", 0))) actual_fee_usdt += abs(float(t.get("commission", 0)))
# 计算pnl — gross(不含费) # 计算pnl — gross(不含费)
@ -543,11 +545,18 @@ async def track_funding_fees(session, conn):
return return
# 查币安最近的funding收入记录 # 查币安最近的funding收入记录
# GET /fapi/v1/income?incomeType=FUNDING_FEE&limit=100 # 对齐到本次结算周期00:00/08:00/16:00 UTC
start_ts = int((now_ts - 3600) * 1000) # 最近1小时 from datetime import datetime, timezone
now_utc = datetime.fromtimestamp(now_ts, tz=timezone.utc)
hour = now_utc.hour
# 找到最近的结算时间点(0/8/16)
settlement_hour = (hour // 8) * 8
settlement_time = now_utc.replace(hour=settlement_hour, minute=0, second=0, microsecond=0)
settlement_start_ms = int(settlement_time.timestamp() * 1000)
data, status = await binance_request(session, "GET", "/fapi/v1/income", { data, status = await binance_request(session, "GET", "/fapi/v1/income", {
"incomeType": "FUNDING_FEE", "incomeType": "FUNDING_FEE",
"startTime": start_ts, "startTime": settlement_start_ms,
"limit": 100, "limit": 100,
}) })
@ -555,14 +564,13 @@ async def track_funding_fees(session, conn):
logger.warning(f"💰 查询funding income失败: {status}") logger.warning(f"💰 查询funding income失败: {status}")
return return
# 按symbol汇总本次结算的funding # 按symbol汇总本次结算的funding(只取本结算周期内的)
funding_by_symbol = {} funding_by_symbol = {}
for item in data: for item in data:
sym = item.get("symbol", "") sym = item.get("symbol", "")
income = float(item.get("income", 0)) income = float(item.get("income", 0))
ts = int(item.get("time", 0)) ts = int(item.get("time", 0))
# 只取最近30分钟内的本次结算的 if ts >= settlement_start_ms:
if now_ts * 1000 - ts < 1800000:
funding_by_symbol[sym] = funding_by_symbol.get(sym, 0) + income funding_by_symbol[sym] = funding_by_symbol.get(sym, 0) + income
if not funding_by_symbol: if not funding_by_symbol:

View File

@ -67,16 +67,15 @@ ACCOUNT_UPDATE_STALE_SEC = 20
CHECK_INTERVAL = 5 # 风控检查间隔(秒) CHECK_INTERVAL = 5 # 风控检查间隔(秒)
SYMBOLS = ["BTCUSDT", "ETHUSDT", "XRPUSDT", "SOLUSDT"] from trade_config import SYMBOLS, SYMBOL_QTY_PRECISION
SYMBOL_QTY_PRECISION = {
"BTCUSDT": 3, "ETHUSDT": 3, "XRPUSDT": 0, "SOLUSDT": 2,
}
logging.basicConfig( from logging.handlers import RotatingFileHandler
level=logging.INFO, _log_fmt = "%(asctime)s [%(levelname)s] %(name)s: %(message)s"
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", logging.basicConfig(level=logging.INFO, format=_log_fmt)
)
logger = logging.getLogger("risk-guard") logger = logging.getLogger("risk-guard")
_fh = RotatingFileHandler("logs/risk_guard.log", maxBytes=10*1024*1024, backupCount=5)
_fh.setFormatter(logging.Formatter(_log_fmt))
logger.addHandler(_fh)
# ============ 状态 ============ # ============ 状态 ============
@ -404,9 +403,11 @@ def check_auto_resume():
risk_state.circuit_break_reason = None risk_state.circuit_break_reason = None
write_risk_state() write_risk_state()
# API恢复 # API恢复仅限API断连导致的熔断日限亏损等不自动恢复
if (risk_state.circuit_break_reason if (risk_state.circuit_break_reason
and "API" in risk_state.circuit_break_reason): and risk_state.circuit_break_reason.startswith("API")
and "日限" not in risk_state.circuit_break_reason
and "人工" not in risk_state.circuit_break_reason):
api_gap = now - risk_state.last_api_success api_gap = now - risk_state.last_api_success
if api_gap < 10: # API恢复正常10秒 if api_gap < 10: # API恢复正常10秒
logger.info("✅ API连接恢复自动恢复交易") logger.info("✅ API连接恢复自动恢复交易")
@ -425,8 +426,6 @@ async def check_emergency_commands(session, conn):
try: try:
with open(EMERGENCY_FILE) as f: with open(EMERGENCY_FILE) as f:
cmd = json.load(f) cmd = json.load(f)
# 读完立即删除,防止重复执行
os.remove(EMERGENCY_FILE)
except FileNotFoundError: except FileNotFoundError:
return return
except Exception: except Exception:
@ -460,6 +459,12 @@ async def check_emergency_commands(session, conn):
else: else:
logger.warning(f"⚠ 未知紧急指令: {action}") logger.warning(f"⚠ 未知紧急指令: {action}")
# 操作完成+state已写入后才删除emergency文件消除TOCTOU竞争
try:
os.remove(EMERGENCY_FILE)
except Exception:
pass
# ============ 主循环 ============ # ============ 主循环 ============

14
backend/trade_config.py Normal file
View File

@ -0,0 +1,14 @@
"""交易配置常量 — 所有实盘模块共用"""
SYMBOLS = ["BTCUSDT", "ETHUSDT", "XRPUSDT", "SOLUSDT"]
# 币安合约精度qty=数量小数位, price=价格小数位, min_notional=最小名义价值)
SYMBOL_PRECISION = {
"BTCUSDT": {"qty": 3, "price": 1, "min_notional": 100},
"ETHUSDT": {"qty": 3, "price": 2, "min_notional": 20},
"XRPUSDT": {"qty": 0, "price": 4, "min_notional": 5},
"SOLUSDT": {"qty": 2, "price": 4, "min_notional": 5},
}
# qty精度快捷查询
SYMBOL_QTY_PRECISION = {sym: p["qty"] for sym, p in SYMBOL_PRECISION.items()}