arbitrage-engine/backend/position_sync.py
2026-03-31 08:56:11 +00:00

687 lines
26 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Position Sync - 仓位同步与对账模块
每30秒对账本地DB vs 币安实际持仓/挂单SL缺失自动补挂
架构:
live_executor.py 开仓 → position_sync.py 持续对账 + 监控TP/SL状态
"""
import os
import sys
import json
from pathlib import Path
try:
from dotenv import load_dotenv; load_dotenv(Path(__file__).parent / ".env")
except ImportError:
pass
import time
import logging
import asyncio
import hashlib
import hmac
from urllib.parse import urlencode
import psycopg2
import aiohttp
# ============ Configuration ============
# Deployment target; selects the Binance REST endpoint and the secret-name prefix.
TRADE_ENV = os.getenv("TRADE_ENV", "testnet")
BINANCE_ENDPOINTS = {
    "testnet": "https://testnet.binancefuture.com",
    "production": "https://fapi.binance.com",
}
if TRADE_ENV not in BINANCE_ENDPOINTS:
    # Fail with a readable message instead of a bare KeyError traceback.
    print(
        f"FATAL: unknown TRADE_ENV={TRADE_ENV!r}; expected one of {sorted(BINANCE_ENDPOINTS)}",
        file=sys.stderr,
    )
    sys.exit(1)
BASE_URL = BINANCE_ENDPOINTS[TRADE_ENV]

# Only testnet falls back to a built-in password; every other environment
# must provide DB_PASSWORD explicitly.
# NOTE(review): the testnet default password is committed in source.
_DB_PASSWORD = os.getenv("DB_PASSWORD", "arb_engine_2026" if TRADE_ENV == "testnet" else "")
if not _DB_PASSWORD:
    print("FATAL: DB_PASSWORD 未设置(生产环境必须配置)", file=sys.stderr)
    sys.exit(1)

# Keyword arguments passed straight to psycopg2.connect().
DB_CONFIG = {
    "host": os.getenv("DB_HOST", "127.0.0.1"),
    "port": int(os.getenv("DB_PORT", "5432")),
    "dbname": os.getenv("DB_NAME", "arb_engine"),
    "user": os.getenv("DB_USER", "arb"),
    "password": _DB_PASSWORD,
}

CHECK_INTERVAL = 30  # seconds between reconciliation cycles
SL_REHANG_DELAYS = [0, 3]  # seconds to wait before each stop-loss re-place attempt
# Derived from the delay list so the attempt count cannot drift from it.
MAX_REHANG_RETRIES = len(SL_REHANG_DELAYS)
MISMATCH_ESCALATION_SEC = 60  # escalate once a mismatch persists longer than this
RISK_PER_TRADE_USD = float(os.getenv("RISK_PER_TRADE_USD", "2"))  # $2 = 1R
from trade_config import SYMBOLS, SYMBOL_PRECISION
from logging.handlers import RotatingFileHandler
# Console logging via basicConfig plus a size-rotated file for the sync logger.
_log_fmt = "%(asctime)s [%(levelname)s] %(name)s: %(message)s"
logging.basicConfig(level=logging.INFO, format=_log_fmt)
logger = logging.getLogger("position-sync")
# RotatingFileHandler opens its file on construction and does not create the
# parent directory, so make sure it exists first.
os.makedirs("logs", exist_ok=True)
_fh = RotatingFileHandler(
    "logs/position_sync.log",
    maxBytes=10 * 1024 * 1024,
    backupCount=5,
    # Messages contain CJK text and emoji; do not depend on the locale default.
    encoding="utf-8",
)
_fh.setFormatter(logging.Formatter(_log_fmt))
logger.addHandler(_fh)
# ============ API keys ============
# Binance credentials; populated by load_api_keys() at startup.
_api_key = None
_secret_key = None


def load_api_keys():
    """Populate the module-level Binance credentials or terminate the process.

    The BINANCE_API_KEY / BINANCE_SECRET_KEY environment variables are used
    when both are set. Otherwise both secrets are read from GCP Secret
    Manager, using a name prefix chosen by TRADE_ENV. Any failure on the
    Secret Manager path is logged and the process exits with status 1.
    """
    global _api_key, _secret_key

    _api_key = os.getenv("BINANCE_API_KEY")
    _secret_key = os.getenv("BINANCE_SECRET_KEY")
    if _api_key and _secret_key:
        logger.info("API Key loaded from env")
        return

    try:
        # Imported lazily so the dependency is only needed on this path.
        from google.cloud import secretmanager

        sm_client = secretmanager.SecretManagerServiceClient()
        project = os.getenv("GCP_PROJECT", "gen-lang-client-0835616737")
        prefix = "binance-live" if TRADE_ENV != "testnet" else "binance-testnet"
        secrets_root = f"projects/{project}/secrets/{prefix}"

        api_key_version = sm_client.access_secret_version(
            name=f"{secrets_root}-api-key/versions/latest"
        )
        _api_key = api_key_version.payload.data.decode()

        secret_key_version = sm_client.access_secret_version(
            name=f"{secrets_root}-secret-key/versions/latest"
        )
        _secret_key = secret_key_version.payload.data.decode()

        logger.info("API Key loaded from GCP Secret Manager")
    except Exception as e:
        logger.error(f"Failed to load API keys: {e}")
        sys.exit(1)
def sign_params(params):
    """Add ``timestamp`` and an HMAC-SHA256 ``signature`` to *params*.

    The dict is updated in place and also returned. The signature is computed
    with the module-level secret key over the URL-encoded parameters.

    Args:
        params: request parameters (dict); mutated.

    Returns:
        The same dict, with ``timestamp`` refreshed and ``signature`` set.
    """
    # A dict that was signed before must not feed its old signature into the
    # new signed payload, otherwise the resulting signature is invalid.
    params.pop("signature", None)
    params["timestamp"] = int(time.time() * 1000)
    query_string = urlencode(params)
    params["signature"] = hmac.new(
        _secret_key.encode(), query_string.encode(), hashlib.sha256
    ).hexdigest()
    return params
# Upper bound (seconds) for one REST round-trip, so a stalled connection
# cannot freeze the whole sync loop.
_REQUEST_TIMEOUT_SEC = 10
_SUPPORTED_METHODS = ("GET", "POST", "DELETE")


async def binance_request(session, method, path, params=None):
    """Send a signed request to the Binance futures REST API.

    Args:
        session: aiohttp client session used for the call.
        method: "GET", "POST" or "DELETE".
        path: endpoint path appended to BASE_URL.
        params: optional dict of request parameters; it is signed in place.

    Returns:
        ``(payload, http_status)``. On a transport or decoding error the
        payload is ``{"error": <text>}`` with status 500; an unsupported
        method yields status 400. The function never returns ``None``.
    """
    if method not in _SUPPORTED_METHODS:
        # Previously this fell through and returned None, which crashed the
        # caller's tuple-unpacking.
        logger.error(f"Unsupported HTTP method: {method}")
        return {"error": f"unsupported method {method}"}, 400

    url = f"{BASE_URL}{path}"
    headers = {"X-MBX-APIKEY": _api_key}
    signed = sign_params({} if params is None else params)
    try:
        async with session.request(
            method,
            url,
            params=signed,
            headers=headers,
            timeout=aiohttp.ClientTimeout(total=_REQUEST_TIMEOUT_SEC),
        ) as resp:
            return await resp.json(), resp.status
    except Exception as e:
        # repr() keeps timeouts visible; their str() is empty.
        logger.error(f"Request failed: {method} {path}: {e!r}")
        return {"error": str(e) or repr(e)}, 500
# ============ Reconciliation core ============
async def get_binance_positions(session):
    """Fetch all non-zero futures positions from Binance, keyed by symbol.

    Returns:
        dict mapping symbol -> {"amount", "direction", "entry_price",
        "unrealized_pnl", "liquidation_price", "mark_price"}. ``amount``
        keeps the exchange sign (positive = LONG, negative = SHORT).

    NOTE(review): an empty dict is returned both when the account is flat
    and when the request fails, so callers cannot tell the two apart. The
    failure is now at least logged here.
    """
    data, status = await binance_request(session, "GET", "/fapi/v2/positionRisk")
    if status != 200 or not isinstance(data, list):
        logger.warning(f"positionRisk query failed (status={status}): {data}")
        return {}

    positions = {}
    for pos in data:
        amt = float(pos.get("positionAmt", 0))
        if amt == 0:
            continue  # the endpoint also lists symbols with no open position
        positions[pos["symbol"]] = {
            "amount": amt,
            "direction": "LONG" if amt > 0 else "SHORT",
            "entry_price": float(pos.get("entryPrice", 0)),
            "unrealized_pnl": float(pos.get("unRealizedProfit", 0)),
            "liquidation_price": float(pos.get("liquidationPrice", 0)),
            "mark_price": float(pos.get("markPrice", 0)),
        }
    return positions
async def get_binance_open_orders(session, symbol=None):
    """Fetch open orders from Binance, optionally restricted to one symbol.

    Args:
        session: aiohttp client session.
        symbol: when given, only orders for this symbol are requested.

    Returns:
        The list of order dicts returned by the exchange, or an empty list
        when the request fails (the failure is logged).
    """
    params = {"symbol": symbol} if symbol else {}
    data, status = await binance_request(session, "GET", "/fapi/v1/openOrders", params)
    if status == 200 and isinstance(data, list):
        return data
    # Without this log a failed query is indistinguishable from "no orders".
    logger.warning(f"openOrders query failed (symbol={symbol}, status={status}): {data}")
    return []
def get_local_positions(conn):
    """Load the locally tracked open trades from ``live_trades``.

    Rows whose status is 'active' or 'tp1_hit' are returned, newest
    ``entry_ts`` first, each as a dict keyed by column name.
    """
    columns = (
        "id", "symbol", "strategy", "direction", "entry_price", "sl_price",
        "tp1_price", "tp2_price", "tp1_hit", "status", "risk_distance",
        "binance_order_id", "entry_ts", "qty",
    )
    cursor = conn.cursor()
    cursor.execute("""
        SELECT id, symbol, strategy, direction, entry_price, sl_price, tp1_price, tp2_price,
               tp1_hit, status, risk_distance, binance_order_id, entry_ts, qty
        FROM live_trades
        WHERE status IN ('active', 'tp1_hit')
        ORDER BY entry_ts DESC
    """)
    # The column tuple mirrors the SELECT list order.
    return [dict(zip(columns, record)) for record in cursor.fetchall()]
async def check_sl_exists(session, symbol, direction):
    """Look for an open STOP_MARKET order on the closing side of a position.

    Returns:
        ``(True, order)`` for the first matching open order, otherwise
        ``(False, None)``.
    """
    open_orders = await get_binance_open_orders(session, symbol)
    # A LONG is closed by selling, anything else by buying.
    exit_side = "BUY" if direction != "LONG" else "SELL"
    match = next(
        (
            candidate
            for candidate in open_orders
            if candidate.get("type") == "STOP_MARKET" and candidate.get("side") == exit_side
        ),
        None,
    )
    if match is None:
        return False, None
    return True, match
async def rehang_sl(session, symbol, direction, sl_price, quantity):
    """Place a reduce-only STOP_MARKET order protecting an open position.

    Args:
        session: aiohttp client session.
        symbol: trading pair.
        direction: "LONG" or "SHORT" side of the position being protected.
        sl_price: stop trigger price.
        quantity: position size; only its absolute value is used.

    Returns:
        ``(ok, payload)`` where ``ok`` is True when the exchange answered with
        HTTP 200. Missing inputs give ``(False, {"error": ...})`` without
        contacting the exchange.
    """
    # A missing stop price or zero size used to raise while formatting, which
    # aborted the caller's whole cycle; report it as an ordinary failure.
    if sl_price is None or not quantity:
        return False, {
            "error": f"invalid SL parameters: sl_price={sl_price!r}, quantity={quantity!r}"
        }

    prec = SYMBOL_PRECISION.get(symbol, {"qty": 3, "price": 2})
    close_side = "SELL" if direction == "LONG" else "BUY"
    qty_str = f"{abs(quantity):.{prec['qty']}f}"
    price_str = f"{sl_price:.{prec['price']}f}"
    params = {
        "symbol": symbol,
        "side": close_side,
        "type": "STOP_MARKET",
        "stopPrice": price_str,
        "quantity": qty_str,
        "reduceOnly": "true",
    }
    # NOTE(review): Binance has announced moving conditional futures orders
    # (STOP_MARKET / TAKE_PROFIT_MARKET) to dedicated algo-order endpoints;
    # confirm /fapi/v1/order still accepts this type for the target environment.
    data, status = await binance_request(session, "POST", "/fapi/v1/order", params)
    return status == 200, data
def _liquidation_issue(symbol, direction, bp):
    """Return an issue dict when the mark price is close to liquidation, else None."""
    liq = bp.get("liquidation_price")
    mark = bp.get("mark_price")
    if not liq or not mark or liq <= 0 or mark <= 0:
        return None

    # Distance from mark to liquidation as a percentage of the mark price.
    if direction == "LONG":
        dist_pct = (mark - liq) / mark * 100
    else:
        dist_pct = (liq - mark) / mark * 100

    if dist_pct < 8:
        return {
            "type": "liquidation_near",
            "severity": "emergency",
            "symbol": symbol,
            "detail": f"距清算仅{dist_pct:.1f}%! 建议立即减仓!",
        }
    if dist_pct < 12:
        return {
            "type": "liquidation_warning",
            "severity": "high",
            "symbol": symbol,
            "detail": f"距清算{dist_pct:.1f}%",
        }
    if dist_pct < 20:
        return {
            "type": "liquidation_caution",
            "severity": "medium",
            "symbol": symbol,
            "detail": f"距清算{dist_pct:.1f}%",
        }
    return None


async def _ensure_stop_loss(session, lp, bp, issues):
    """Check the position's SL order and re-place it if missing, recording issues."""
    symbol = lp["symbol"]
    sl_exists, _ = await check_sl_exists(session, symbol, lp["direction"])
    if sl_exists:
        return

    issues.append({
        "type": "sl_missing",
        "severity": "critical",
        "symbol": symbol,
        "detail": "SL保护单缺失! 仓位裸奔中!",
    })
    logger.warning(f"[{symbol}] ⚠️ SL缺失开始自动补挂...")

    for attempt, delay in enumerate(SL_REHANG_DELAYS, start=1):
        if delay > 0:
            await asyncio.sleep(delay)
        ok, data = await rehang_sl(session, symbol, lp["direction"], lp["sl_price"], bp["amount"])
        if ok:
            logger.info(f"[{symbol}] ✅ SL补挂成功 (attempt={attempt})")
            return
        logger.error(f"[{symbol}] ❌ SL补挂失败 (attempt={attempt}): {data}")

    # Report the number of attempts actually made, i.e. one per configured delay.
    issues.append({
        "type": "sl_rehang_failed",
        "severity": "emergency",
        "symbol": symbol,
        "detail": f"SL补挂{len(SL_REHANG_DELAYS)}次全部失败! 建议进入只减仓模式!",
    })


async def reconcile(session, conn):
    """Run one reconciliation pass between the local DB and Binance.

    For every local open trade: flag it when the exchange has no position or
    the direction differs, make sure a stop-loss order exists (re-placing it
    when missing), and grade the distance to liquidation. Exchange positions
    on tracked symbols without a local record are flagged as well.

    Args:
        session: aiohttp client session.
        conn: database connection used to read local trades.

    Returns:
        dict with ``timestamp_ms``, ``local_count``, ``exchange_count``,
        ``issues_count``, ``issues`` and ``status`` ("ok" or "mismatch").
    """
    local_positions = get_local_positions(conn)
    binance_positions = await get_binance_positions(session)
    issues = []

    # 1. Local trades versus exchange positions.
    for lp in local_positions:
        symbol = lp["symbol"]
        bp = binance_positions.get(symbol)
        if not bp:
            # Missed order, or the position was closed/liquidated on the exchange.
            issues.append({
                "type": "local_only",
                "severity": "critical",
                "symbol": symbol,
                "detail": f"本地有{lp['direction']}仓位(id={lp['id']})但币安无持仓",
            })
            continue
        if bp["direction"] != lp["direction"]:
            issues.append({
                "type": "direction_mismatch",
                "severity": "critical",
                "symbol": symbol,
                "detail": f"本地={lp['direction']} vs 币安={bp['direction']}",
            })
            continue

        # 2. Stop-loss protection must exist.
        await _ensure_stop_loss(session, lp, bp, issues)

        # 3. Distance to liquidation.
        liq_issue = _liquidation_issue(symbol, lp["direction"], bp)
        if liq_issue is not None:
            issues.append(liq_issue)

    # 4. Exchange positions on tracked symbols that have no local record.
    local_symbols = {lp["symbol"] for lp in local_positions}
    tracked_symbols = set(SYMBOLS)
    for symbol, bp in binance_positions.items():
        if symbol not in local_symbols and symbol in tracked_symbols:
            issues.append({
                "type": "exchange_only",
                "severity": "high",
                "symbol": symbol,
                "detail": f"币安有{bp['direction']}仓位但本地无记录",
            })

    result = {
        "timestamp_ms": int(time.time() * 1000),
        "local_count": len(local_positions),
        "exchange_count": len(binance_positions),
        "issues_count": len(issues),
        "issues": issues,
        "status": "mismatch" if issues else "ok",
    }

    # The result is only logged; it is not persisted to the database here.
    if issues:
        for issue in issues:
            level = "🔴" if issue["severity"] in ("critical", "emergency") else "🟡"
            logger.warning(f"{level} [{issue['symbol']}] {issue['type']}: {issue['detail']}")
    else:
        logger.info(f"✅ 对账正常 | 本地={len(local_positions)}仓 | 币安={len(binance_positions)}")
    return result
# ============ TP1 trigger monitoring ============
async def check_tp1_triggers(session, conn):
    """Detect a filled TP1 and move the stop-loss to break-even.

    For each local trade with ``tp1_hit`` unset and status 'active': if no
    TAKE_PROFIT_MARKET order near ``tp1_price`` is open any more and the
    exchange position is below 75% of the recorded entry quantity, TP1 is
    treated as filled. All open orders for the symbol are cancelled, a new
    stop at entry +/-0.05% and a TP2 order are placed for the remaining size,
    and the trade row is updated to ``status='tp1_hit'``.

    The DB state only advances after the new stop-loss is confirmed.
    """
    local_positions = get_local_positions(conn)
    cur = conn.cursor()
    for lp in local_positions:
        if lp["tp1_hit"]:
            continue  # already handled
        symbol = lp["symbol"]

        # TP1 counts as filled once its order is no longer among the open orders.
        orders = await get_binance_open_orders(session, symbol)
        close_side = "SELL" if lp["direction"] == "LONG" else "BUY"
        tp1_found = any(
            order.get("type") == "TAKE_PROFIT_MARKET"
            and order.get("side") == close_side
            and abs(float(order.get("stopPrice", 0)) - lp["tp1_price"]) < lp["tp1_price"] * 0.001
            for order in orders
        )
        if tp1_found or lp["status"] != "active":
            continue

        # Confirm with the live position size before acting on a missing order.
        bp = (await get_binance_positions(session)).get(symbol)
        entry_qty = lp.get("qty") or 0
        if not (bp and entry_qty > 0 and abs(bp["amount"]) < entry_qty * 0.75):
            continue

        logger.info(f"[{symbol}] ✅ TP1触发! 移SL到保本价")

        # Cancel the old orders first. If that fails the old stop is presumably
        # still live, so leave everything untouched and retry next cycle
        # instead of stacking a second stop on top of it.
        cancel_resp, cancel_status = await binance_request(
            session, "DELETE", "/fapi/v1/allOpenOrders", {"symbol": symbol}
        )
        if cancel_status != 200:
            logger.error(f"[{symbol}] ❌ cancel-all before SL move failed: {cancel_resp}")
            continue

        # Break-even stop, shifted 0.05% past the entry price.
        if lp["direction"] == "LONG":
            new_sl = lp["entry_price"] * 1.0005
        else:
            new_sl = lp["entry_price"] * 0.9995

        ok, sl_resp = await rehang_sl(session, symbol, lp["direction"], new_sl, bp["amount"])
        if not ok:
            logger.error(f"[{symbol}] ❌ TP1后重挂SL失败: {sl_resp}不推进tp1_hit状态")
            _log_event(conn, "critical", "trade",
                       "TP1后重挂SL失败仓位可能裸奔需人工确认", symbol,
                       {"trade_id": lp["id"], "sl_resp": str(sl_resp)})
            continue

        # Re-place TP2 for the remaining size.
        prec = SYMBOL_PRECISION.get(symbol, {"qty": 3, "price": 2})
        tp2_price = lp["tp2_price"]
        qty_str = f"{abs(bp['amount']):.{prec['qty']}f}"
        price_str = f"{tp2_price:.{prec['price']}f}"
        tp2_data, tp2_status = await binance_request(session, "POST", "/fapi/v1/order", {
            "symbol": symbol, "side": close_side, "type": "TAKE_PROFIT_MARKET",
            "stopPrice": price_str, "quantity": qty_str, "reduceOnly": "true",
        })
        if tp2_status != 200:
            logger.error(f"[{symbol}] ❌ TP2重挂失败: {tp2_data}SL已挂但TP2缺失")
            # This trade is skipped on later cycles once tp1_hit is set, so
            # record the missing TP2 where an operator will see it.
            _log_event(conn, "critical", "trade",
                       "TP2 re-place failed after TP1; SL is in place but TP2 is missing",
                       symbol, {"trade_id": lp["id"], "tp2_resp": str(tp2_data)})

        # Only reached when the new stop-loss is confirmed.
        cur.execute("""
            UPDATE live_trades SET tp1_hit=TRUE, sl_price=%s, status='tp1_hit'
            WHERE id=%s
        """, (new_sl, lp["id"]))
        conn.commit()
        logger.info(f"[{symbol}] 🛡 SL移至保本 {new_sl:.4f}, TP2={tp2_price:.4f}")
# ============ Closed-position detection ============
def _summarize_fills(trades, direction, entry_ts):
    """Return (has_closing_fills, closing VWAP or None, total commission) for a trade."""
    # A LONG is closed by SELL fills (buyer=False), a SHORT by BUY fills (buyer=True).
    closes_with_buy = direction == "SHORT"
    close_fills = [
        t for t in trades
        if bool(t.get("buyer")) == closes_with_buy
        and int(t.get("time", 0)) > entry_ts + 1000
    ]
    closed_qty = sum(float(t["qty"]) for t in close_fills)
    vwap = None
    if closed_qty > 0:
        vwap = sum(float(t["price"]) * float(t["qty"]) for t in close_fills) / closed_qty
    # Commission is summed from 200 ms after entry to skip fills before the entry.
    fees = sum(
        abs(float(t.get("commission", 0)))
        for t in trades
        if int(t.get("time", 0)) >= entry_ts + 200
    )
    return bool(close_fills), vwap, fees


async def check_closed_positions(session, conn):
    """Settle local trades whose position no longer exists on Binance.

    For each local open trade without an exchange position, the account trade
    history since ``entry_ts`` is used to derive the exit price (VWAP of the
    closing fills) and the commission paid. Net PnL in R is
    ``gross - fee + funding`` and the row is updated with a status of
    'tp', 'sl', 'sl_be' or 'closed'.

    Settlement is deferred to a later cycle when the trade-history request
    fails or when it returns fills but none of them closes the position.
    """
    local_positions = get_local_positions(conn)
    binance_positions = await get_binance_positions(session)
    cur = conn.cursor()
    for lp in local_positions:
        symbol = lp["symbol"]
        if binance_positions.get(symbol):
            continue  # still open on the exchange

        logger.info(f"[{symbol}] 📝 检测到平仓,查询成交记录...")
        now_ms = int(time.time() * 1000)
        rd = lp["risk_distance"] or 1
        exit_price = lp["entry_price"]  # fallback when no fill price is available
        exit_from_fills = False

        entry_ts = lp.get("entry_ts", 0)
        trades_data, trades_status = await binance_request(session, "GET", "/fapi/v1/userTrades", {
            "symbol": symbol, "startTime": entry_ts, "limit": 100
        })
        if trades_status != 200 or not isinstance(trades_data, list):
            # An API outage also makes the position lookup above come back
            # empty, so never settle a trade at its entry price on a failure.
            logger.warning(
                f"[{symbol}] userTrades query failed (status={trades_status}); settlement deferred"
            )
            continue

        actual_fee_usdt = 0
        if trades_data:
            has_close, vwap, actual_fee_usdt = _summarize_fills(
                trades_data, lp["direction"], entry_ts
            )
            if not has_close:
                logger.warning(f"[{symbol}] 未找到明确平仓成交,延后结算")
                continue
            if vwap is not None:
                exit_price = vwap
                exit_from_fills = True

        # Gross PnL in R, before fees.
        if lp["direction"] == "LONG":
            gross_pnl_r = (exit_price - lp["entry_price"]) / rd
        else:
            gross_pnl_r = (lp["entry_price"] - exit_price) / rd
        if lp["tp1_hit"] and not exit_from_fills:
            # Blend in the TP1 half only for the fallback price. A VWAP built
            # from the fills already contains the TP1 fills, so blending again
            # would count that profit twice.
            tp1_r = abs(lp["tp1_price"] - lp["entry_price"]) / rd
            gross_pnl_r = 0.5 * tp1_r + 0.5 * gross_pnl_r

        # Fees in R, using the configured dollar value of 1R.
        _risk_usd = load_live_risk_usd(conn)
        if actual_fee_usdt > 0:
            fee_r = actual_fee_usdt / (_risk_usd if _risk_usd > 0 else rd)
        else:
            # Estimate 0.1% round trip (0.05% per side) when no commission was found.
            fee_r = 0.001 * lp["entry_price"] / rd

        # Funding in R; positive funding is income, negative is cost.
        funding_usdt = 0
        cur.execute("SELECT COALESCE(funding_fee_usdt, 0) FROM live_trades WHERE id = %s", (lp["id"],))
        fr_row = cur.fetchone()
        if fr_row:
            funding_usdt = fr_row[0]
        funding_r = funding_usdt / (_risk_usd if _risk_usd > 0 else rd)

        pnl_r = gross_pnl_r - fee_r + funding_r

        # Classify the exit from the net result.
        if pnl_r > 0.5:
            status = "tp"
        elif pnl_r < -0.5:
            status = "sl"
        elif lp["tp1_hit"] and pnl_r >= -0.1:
            status = "sl_be"
        else:
            status = "closed"

        cur.execute("""
            UPDATE live_trades SET status=%s, exit_price=%s, exit_ts=%s, pnl_r=%s, fee_usdt=%s
            WHERE id=%s
        """, (status, exit_price, now_ms, round(pnl_r, 4), round(actual_fee_usdt, 4), lp["id"]))
        conn.commit()
        logger.info(
            f"[{symbol}] 📝 平仓: {status} | exit={exit_price:.4f} | "
            f"gross={gross_pnl_r:+.3f}R fee={fee_r:.3f}R({actual_fee_usdt:.4f}$) "
            f"funding={funding_usdt:+.4f}$ | net={pnl_r:+.3f}R"
        )

        evt_level = "info" if pnl_r >= 0 else "warn"
        _log_event(conn, evt_level, "trade",
                   f"平仓 {lp['direction']} {symbol} | {status} | net={pnl_r:+.3f}R (gross={gross_pnl_r:+.3f} fee=-{fee_r:.3f} fr={funding_usdt:+.4f}$)",
                   symbol, {"trade_id": lp["id"], "status": status, "pnl_r": round(pnl_r, 4), "exit_price": exit_price})
# ============ Funding-fee settlement tracking ============
# Settlement hours (UTC) after which funding income is queried.
FUNDING_SETTLEMENT_HOURS = [0, 8, 16]
# Length of the query window that opens at each settlement time (seconds).
_FUNDING_WINDOW_SEC = 300
# time.time() of the last check that completed for a settlement.
_last_funding_check_ts = 0


def _active_settlement_time(now):
    """Return the settlement datetime whose query window contains *now*, else None."""
    for hour in FUNDING_SETTLEMENT_HOURS:
        settlement = now.replace(hour=hour, minute=0, second=0, microsecond=0)
        if 0 <= (now - settlement).total_seconds() <= _FUNDING_WINDOW_SEC:
            return settlement
    return None


async def track_funding_fees(session, conn):
    """Add the latest funding income to ``funding_fee_usdt`` of open trades.

    Work is only done during the window that follows a settlement hour. The
    FUNDING_FEE income since that settlement is summed per symbol and added
    to every open trade on that symbol.

    A settlement is marked as processed only after income was applied, or
    when there is nothing to apply it to. A failed request or an empty
    result is therefore retried on the next cycle inside the window.

    NOTE(review): the processed marker lives in memory only, so a restart
    inside the window can apply the same income twice. Two open trades on the
    same symbol each receive the symbol's full amount.
    """
    global _last_funding_check_ts
    import datetime as _dt

    now = _dt.datetime.now(_dt.timezone.utc)
    now_ts = time.time()

    settlement_time = _active_settlement_time(now)
    if settlement_time is None:
        return
    # Already handled: the last completed check happened after this settlement.
    if _last_funding_check_ts >= settlement_time.timestamp():
        return

    logger.info("💰 资金费率结算窗口查询funding收支...")

    cur = conn.cursor()
    cur.execute("""
        SELECT id, symbol, direction, entry_ts, COALESCE(funding_fee_usdt, 0) as current_funding
        FROM live_trades WHERE status IN ('active', 'tp1_hit')
    """)
    active = cur.fetchall()
    if not active:
        _last_funding_check_ts = now_ts
        logger.info("💰 无活跃持仓跳过funding查询")
        return

    settlement_start_ms = int(settlement_time.timestamp() * 1000)
    data, status = await binance_request(session, "GET", "/fapi/v1/income", {
        "incomeType": "FUNDING_FEE",
        "startTime": settlement_start_ms,
        "limit": 100,
    })
    if status != 200 or not isinstance(data, list):
        # Not marked as processed: try again on the next cycle.
        logger.warning(f"💰 查询funding income失败: {status}")
        return

    # Sum this settlement's income per symbol.
    funding_by_symbol = {}
    for item in data:
        if int(item.get("time", 0)) < settlement_start_ms:
            continue
        sym = item.get("symbol", "")
        funding_by_symbol[sym] = funding_by_symbol.get(sym, 0) + float(item.get("income", 0))

    if not funding_by_symbol:
        # The income record may not be available yet; retry on the next cycle.
        logger.info("💰 本次结算无funding记录")
        return

    for trade_id, symbol, _direction, _entry_ts, current_funding in active:
        fr_income = funding_by_symbol.get(symbol, 0)
        if fr_income != 0:
            new_total = current_funding + fr_income
            cur.execute("UPDATE live_trades SET funding_fee_usdt = %s WHERE id = %s", (new_total, trade_id))
            logger.info(f"[{symbol}] 💰 Funding: {fr_income:+.4f} USDT (累计: {new_total:+.4f})")
    conn.commit()
    # Mark the settlement as processed only once the update is committed.
    _last_funding_check_ts = now_ts
    logger.info(f"💰 Funding更新完成: {funding_by_symbol}")
def _log_event(conn, level, category, message, symbol=None, detail=None):
    """Insert one row into ``live_events`` (best effort, never raises).

    Args:
        conn: database connection; it is committed on success.
        level: event level string (callers use e.g. "info", "warn", "critical").
        category: event category string.
        message: human-readable message.
        symbol: optional trading pair.
        detail: optional JSON-serialisable payload, stored as a JSON string.

    On failure the error is logged and the connection is rolled back, so the
    failed statement does not leave the transaction in an aborted state for
    the caller's next query. Callers in this file have no uncommitted writes
    pending when they call this.
    """
    try:
        cur = conn.cursor()
        cur.execute(
            "INSERT INTO live_events (level, category, symbol, message, detail) VALUES (%s, %s, %s, %s, %s)",
            (level, category, symbol, message, json.dumps(detail) if detail else None)
        )
        conn.commit()
    except Exception as e:
        logger.warning(f"live_events write failed: {e}")
        try:
            conn.rollback()
        except Exception:
            # The connection itself is unusable; nothing more to do here.
            pass
# ============ Main loop ============
def load_live_risk_usd(conn, default=2.0):
    """Read the dollar value of 1R from ``live_config``.

    Args:
        conn: database connection.
        default: value returned when the key is absent, empty or unreadable.

    Returns:
        The ``risk_per_trade_usd`` value as a float, or *default*.

    A failed lookup is logged and the connection is rolled back, so the
    caller's next statement does not run inside an aborted transaction.
    """
    try:
        cur = conn.cursor()
        cur.execute("SELECT value FROM live_config WHERE key='risk_per_trade_usd'")
        row = cur.fetchone()
        return float(row[0]) if row and row[0] else default
    except Exception as e:
        logger.warning(f"risk_per_trade_usd lookup failed, using default {default}: {e}")
        try:
            conn.rollback()
        except Exception:
            # The connection itself is unusable; nothing more to do here.
            pass
        return default
def ensure_db_conn(conn):
    """Return a usable DB connection, reconnecting when the probe query fails.

    The given connection is returned unchanged when ``SELECT 1`` succeeds.
    Otherwise it is closed (errors ignored) and a new connection built from
    DB_CONFIG is returned.
    """
    try:
        probe = conn.cursor()
        probe.execute("SELECT 1")
    except Exception:
        logger.warning("⚠️ DB连接断开重连中...")
    else:
        return conn

    try:
        conn.close()
    except Exception:
        # The old handle may already be closed or broken.
        pass
    return psycopg2.connect(**DB_CONFIG)
async def main():
    """Run the sync loop until the process is interrupted.

    Each cycle makes sure the DB connection is alive, then runs
    reconciliation, the TP1 check, closed-position settlement and funding
    tracking, and sleeps CHECK_INTERVAL seconds. An exception in a cycle is
    logged and the loop resumes after 10 seconds.
    """
    logger.info("=" * 60)
    logger.info(f"🔄 Position Sync 启动 | 环境={TRADE_ENV} | 间隔={CHECK_INTERVAL}")
    logger.info("=" * 60)
    load_api_keys()
    conn = psycopg2.connect(**DB_CONFIG)
    try:
        async with aiohttp.ClientSession() as session:
            while True:
                try:
                    conn = ensure_db_conn(conn)
                    # 1. Reconcile local trades against the exchange
                    await reconcile(session, conn)
                    # 2. Detect TP1 fills
                    await check_tp1_triggers(session, conn)
                    # 3. Settle closed positions
                    await check_closed_positions(session, conn)
                    # 4. Track funding-fee settlements
                    await track_funding_fees(session, conn)
                    await asyncio.sleep(CHECK_INTERVAL)
                except KeyboardInterrupt:
                    break
                except Exception as e:
                    logger.error(f"❌ 对账异常: {e}", exc_info=True)
                    await asyncio.sleep(10)
    finally:
        # Cleanup sits in finally so it also runs when the task is cancelled
        # or interrupted, not only after a clean break out of the loop.
        try:
            conn.close()
        except Exception:
            pass
        logger.info("Position Sync 已停止")


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop; exit without a traceback.
        pass