arbitrage-engine/backend/position_sync.py
root fab3a3d909 feat: position_sync.py - 仓位对账+SL自动补挂+TP1监控+平仓检测
- 每30秒对账: 本地DB vs 币安持仓/挂单
- SL缺失自动补挂: 立即补→3s复检→再补→2次失败告警
- TP1触发监控: 自动移SL到保本+重挂TP2
- 平仓检测: 币安无仓→查成交记录→计算pnl_r→更新DB
- 清算距离三档告警: 20%黄/12%红/8%紧急
- 幽灵仓位检测: 币安有仓但本地无记录
2026-03-02 09:04:27 +00:00

495 lines
17 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Position Sync - 仓位同步与对账模块
每30秒对账本地DB vs 币安实际持仓/挂单SL缺失自动补挂
架构:
live_executor.py 开仓 → position_sync.py 持续对账 + 监控TP/SL状态
"""
import os
import sys
import json
import time
import logging
import asyncio
import hashlib
import hmac
from urllib.parse import urlencode
import psycopg2
import aiohttp
# ============ Configuration ============
# Selects which Binance futures endpoint is used; defaults to the testnet.
TRADE_ENV = os.getenv("TRADE_ENV", "testnet")
BINANCE_ENDPOINTS = {
    "testnet": "https://testnet.binancefuture.com",
    "production": "https://fapi.binance.com",
}
# NOTE: a TRADE_ENV value that is not a key of BINANCE_ENDPOINTS raises
# KeyError here, at import time.
BASE_URL = BINANCE_ENDPOINTS[TRADE_ENV]
# PostgreSQL connection settings; every field can be overridden through the
# environment.
# NOTE(review): the fallback password is hard-coded in source - consider
# requiring DB_PASSWORD to be set instead.
DB_CONFIG = {
    "host": os.getenv("DB_HOST", "10.106.0.3"),
    "port": int(os.getenv("DB_PORT", "5432")),
    "dbname": os.getenv("DB_NAME", "arb_engine"),
    "user": os.getenv("DB_USER", "arb"),
    "password": os.getenv("DB_PASSWORD", "arb_engine_2026"),
}
CHECK_INTERVAL = 30  # Reconciliation interval (seconds)
SL_REHANG_DELAYS = [0, 3]  # Delay before each SL re-placement attempt (seconds)
MAX_REHANG_RETRIES = 2
MISMATCH_ESCALATION_SEC = 60  # Escalate the alert when a mismatch lasts longer than this many seconds
SYMBOLS = ["BTCUSDT", "ETHUSDT", "XRPUSDT", "SOLUSDT"]
# Number of decimal places used when formatting order quantity and price,
# per symbol.
SYMBOL_PRECISION = {
    "BTCUSDT": {"qty": 3, "price": 1},
    "ETHUSDT": {"qty": 3, "price": 2},
    "XRPUSDT": {"qty": 0, "price": 4},
    "SOLUSDT": {"qty": 2, "price": 2},
}
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
logger = logging.getLogger("position-sync")
# ============ API Key ============
# Binance credentials; both stay None until load_api_keys() fills them in.
_api_key = None
_secret_key = None
def load_api_keys():
    """Populate the module-level Binance credentials.

    The BINANCE_API_KEY / BINANCE_SECRET_KEY environment variables are read
    first. If either one is missing, both secrets are fetched from GCP Secret
    Manager, using the "binance-testnet" or "binance-live" secret prefix
    depending on TRADE_ENV. Any failure on the Secret Manager path is logged
    and terminates the process with exit code 1.
    """
    global _api_key, _secret_key

    _api_key = os.getenv("BINANCE_API_KEY")
    _secret_key = os.getenv("BINANCE_SECRET_KEY")
    if not (_api_key and _secret_key):
        try:
            # Imported lazily so the dependency is only needed on this path.
            from google.cloud import secretmanager

            sm_client = secretmanager.SecretManagerServiceClient()
            gcp_project = os.getenv("GCP_PROJECT", "gen-lang-client-0835616737")
            secret_prefix = "binance-live" if TRADE_ENV != "testnet" else "binance-testnet"

            def read_latest(suffix):
                resource = f"projects/{gcp_project}/secrets/{secret_prefix}-{suffix}/versions/latest"
                return sm_client.access_secret_version(name=resource).payload.data.decode()

            _api_key = read_latest("api-key")
            _secret_key = read_latest("secret-key")
            logger.info("API Key loaded from GCP Secret Manager")
        except Exception as exc:
            logger.error(f"Failed to load API keys: {exc}")
            sys.exit(1)
        return

    logger.info("API Key loaded from env")
def sign_params(params):
    """Return a signed copy of ``params`` for a Binance signed endpoint.

    A millisecond ``timestamp`` is added (or refreshed) and an HMAC-SHA256
    ``signature`` of the URL-encoded parameters, keyed with the loaded secret
    key, is appended as the last entry.

    The input mapping is not modified. Any ``signature`` already present in
    it is discarded before signing; otherwise re-signing a previously signed
    dict would include the stale signature in the signed payload.

    Args:
        params: Mapping of request parameters.

    Returns:
        A new dict containing the parameters plus ``timestamp`` and
        ``signature``.

    Raises:
        RuntimeError: If the secret key has not been loaded yet.
    """
    if not _secret_key:
        raise RuntimeError("Binance secret key is not loaded; call load_api_keys() first")

    signed = {key: value for key, value in params.items() if key != "signature"}
    signed["timestamp"] = int(time.time() * 1000)
    query_string = urlencode(signed)
    # The signature must be computed over everything except itself, so it is
    # added only after the query string has been built.
    signed["signature"] = hmac.new(
        _secret_key.encode(), query_string.encode(), hashlib.sha256
    ).hexdigest()
    return signed
async def binance_request(session, method, path, params=None):
    """Send a signed request to the Binance futures REST API.

    Args:
        session: aiohttp client session used to send the request.
        method: HTTP verb; "GET", "POST" and "DELETE" are supported.
        path: API path appended to BASE_URL.
        params: Optional request parameters. They are copied before signing,
            so the caller's dict is never modified.

    Returns:
        A ``(payload, status)`` tuple: the decoded JSON body and the HTTP
        status code. If the request or JSON decoding raises, or ``method`` is
        not supported, ``({"error": message}, 500)`` is returned instead, so
        callers can always unpack the result.
    """
    senders = {
        "GET": session.get,
        "POST": session.post,
        "DELETE": session.delete,
    }
    send = senders.get(method)
    if send is None:
        # Previously this fell through and returned None, which crashed the
        # caller while unpacking the result.
        message = f"unsupported HTTP method {method!r}"
        logger.error(f"Request failed: {message}")
        return {"error": message}, 500

    url = f"{BASE_URL}{path}"
    headers = {"X-MBX-APIKEY": _api_key}
    signed = sign_params(dict(params or {}))
    try:
        async with send(url, params=signed, headers=headers) as resp:
            return await resp.json(), resp.status
    except Exception as e:
        logger.error(f"Request failed: {e}")
        return {"error": str(e)}, 500
# ============ Reconciliation core ============
async def get_binance_positions(session):
    """Fetch all non-zero Binance positions, keyed by symbol.

    Args:
        session: aiohttp client session.

    Returns:
        Dict mapping symbol to a dict with ``amount`` (signed), ``direction``
        ("LONG" for a positive amount, otherwise "SHORT"), ``entry_price``,
        ``unrealized_pnl``, ``liquidation_price`` and ``mark_price``.

    Raises:
        RuntimeError: If the positionRisk query does not return HTTP 200 with
            a JSON list. A failed query must not look like "no positions":
            callers treat a missing position as a closed trade and would
            otherwise write that to the DB.
    """
    data, status = await binance_request(session, "GET", "/fapi/v2/positionRisk")
    if status != 200 or not isinstance(data, list):
        raise RuntimeError(f"positionRisk query failed (status={status}): {data}")

    positions = {}
    for pos in data:
        amt = float(pos.get("positionAmt", 0))
        if amt == 0:
            continue  # Flat entries are not positions.
        positions[pos["symbol"]] = {
            "amount": amt,
            "direction": "LONG" if amt > 0 else "SHORT",
            "entry_price": float(pos.get("entryPrice", 0)),
            "unrealized_pnl": float(pos.get("unRealizedProfit", 0)),
            "liquidation_price": float(pos.get("liquidationPrice", 0)),
            "mark_price": float(pos.get("markPrice", 0)),
        }
    return positions
async def get_binance_open_orders(session, symbol=None):
    """Fetch the open orders on Binance.

    Args:
        session: aiohttp client session.
        symbol: Optional symbol filter; when falsy no ``symbol`` parameter is
            sent.

    Returns:
        The list of open-order dicts returned by the API.

    Raises:
        RuntimeError: If the openOrders query does not return HTTP 200 with a
            JSON list. A failed query must not look like "no open orders":
            callers interpret a missing SL/TP order as a reason to place new
            orders or to move the stop.
    """
    params = {"symbol": symbol} if symbol else {}
    data, status = await binance_request(session, "GET", "/fapi/v1/openOrders", params)
    if status != 200 or not isinstance(data, list):
        raise RuntimeError(
            f"openOrders query failed (symbol={symbol}, status={status}): {data}"
        )
    return data
def get_local_positions(conn):
    """Load the live trades recorded in the local DB.

    Selects the rows of ``live_trades`` whose status is 'active' or
    'tp1_hit', newest entry first.

    Args:
        conn: DB-API connection (psycopg2 in this module).

    Returns:
        A list of dicts, one per row, keyed by column name: id, symbol,
        strategy, direction, entry_price, sl_price, tp1_price, tp2_price,
        tp1_hit, status, risk_distance, binance_order_id.
    """
    columns = (
        "id", "symbol", "strategy", "direction", "entry_price", "sl_price",
        "tp1_price", "tp2_price", "tp1_hit", "status", "risk_distance",
        "binance_order_id",
    )
    cur = conn.cursor()
    try:
        cur.execute("""
            SELECT id, symbol, strategy, direction, entry_price, sl_price, tp1_price, tp2_price,
                   tp1_hit, status, risk_distance, binance_order_id
            FROM live_trades
            WHERE status IN ('active', 'tp1_hit')
            ORDER BY entry_ts DESC
        """)
        return [dict(zip(columns, row)) for row in cur.fetchall()]
    finally:
        # This runs every cycle; release the cursor instead of leaking it.
        cur.close()
async def check_sl_exists(session, symbol, direction):
    """Look for an open stop-loss order protecting a position.

    An order counts as the stop-loss when its type is STOP_MARKET and its
    side closes the position (SELL for a LONG, BUY otherwise).

    Returns:
        ``(True, order)`` with the first matching open order, or
        ``(False, None)`` when there is none.
    """
    open_orders = await get_binance_open_orders(session, symbol)
    expected_side = "BUY" if direction != "LONG" else "SELL"
    match = next(
        (
            candidate
            for candidate in open_orders
            if candidate.get("type") == "STOP_MARKET"
            and candidate.get("side") == expected_side
        ),
        None,
    )
    if match is None:
        return False, None
    return True, match
async def rehang_sl(session, symbol, direction, sl_price, quantity):
    """Place a reduce-only STOP_MARKET order that protects a position.

    Quantity (absolute value) and stop price are formatted with the decimal
    places configured in SYMBOL_PRECISION; symbols without an entry use 3
    decimals for quantity and 2 for price.

    Returns:
        ``(ok, payload)`` where ``ok`` is True when the API answered HTTP 200
        and ``payload`` is the decoded response body.
    """
    precision = SYMBOL_PRECISION.get(symbol, {"qty": 3, "price": 2})
    formatted_qty = format(abs(quantity), f".{precision['qty']}f")
    formatted_stop = format(sl_price, f".{precision['price']}f")
    order = {
        "symbol": symbol,
        "side": "SELL" if direction == "LONG" else "BUY",
        "type": "STOP_MARKET",
        "stopPrice": formatted_stop,
        "quantity": formatted_qty,
        "reduceOnly": "true",
    }
    response, http_status = await binance_request(session, "POST", "/fapi/v1/order", order)
    return http_status == 200, response
def _liquidation_issue(symbol, direction, bp):
    """Build the liquidation-distance issue for one position, if any.

    The distance is the gap between mark price and liquidation price as a
    percentage of the mark price. Below 8% it is an emergency, below 12% high
    severity, below 20% medium. Returns None when the distance is 20% or
    more, or when either price is missing or not positive.
    """
    liq = bp.get("liquidation_price")
    mark = bp.get("mark_price")
    if not liq or not mark or liq <= 0 or mark <= 0:
        return None

    if direction == "LONG":
        dist_pct = (mark - liq) / mark * 100
    else:
        dist_pct = (liq - mark) / mark * 100

    if dist_pct < 8:
        return {
            "type": "liquidation_near",
            "severity": "emergency",
            "symbol": symbol,
            "detail": f"距清算仅{dist_pct:.1f}%! 建议立即减仓!",
        }
    if dist_pct < 12:
        return {
            "type": "liquidation_warning",
            "severity": "high",
            "symbol": symbol,
            "detail": f"距清算{dist_pct:.1f}%",
        }
    if dist_pct < 20:
        return {
            "type": "liquidation_caution",
            "severity": "medium",
            "symbol": symbol,
            "detail": f"距清算{dist_pct:.1f}%",
        }
    return None


async def _restore_missing_sl(session, symbol, lp, bp):
    """Re-place a missing SL order, one attempt per entry of SL_REHANG_DELAYS.

    Each attempt is preceded by its configured delay. Returns True as soon as
    one attempt succeeds, False when all of them fail.
    """
    for attempt, delay in enumerate(SL_REHANG_DELAYS, start=1):
        if delay > 0:
            await asyncio.sleep(delay)
        ok, data = await rehang_sl(session, symbol, lp["direction"], lp["sl_price"], bp["amount"])
        if ok:
            logger.info(f"[{symbol}] ✅ SL补挂成功 (attempt={attempt})")
            return True
        logger.error(f"[{symbol}] ❌ SL补挂失败 (attempt={attempt}): {data}")
    return False


async def reconcile(session, conn):
    """Run one full reconciliation pass between the local DB and Binance.

    For every local live trade this checks that Binance holds a position in
    the same direction, that an SL order exists (re-placing it when missing)
    and how close the position is to liquidation. Positions that exist on
    Binance for a symbol in SYMBOLS but have no local record are reported
    too. Every finding is logged.

    Args:
        session: aiohttp client session.
        conn: DB connection used to read the local trades.

    Returns:
        Dict with ``timestamp_ms``, ``local_count``, ``exchange_count``,
        ``issues_count``, ``issues`` (list of dicts with type, severity,
        symbol, detail) and ``status`` ("ok" or "mismatch").
    """
    local_positions = get_local_positions(conn)
    binance_positions = await get_binance_positions(session)
    issues = []

    # 1. Local trade without an exchange position (missed order / liquidated).
    for lp in local_positions:
        symbol = lp["symbol"]
        bp = binance_positions.get(symbol)
        if not bp:
            issues.append({
                "type": "local_only",
                "severity": "critical",
                "symbol": symbol,
                "detail": f"本地有{lp['direction']}仓位(id={lp['id']})但币安无持仓",
            })
            continue

        # Direction mismatch: SL and liquidation checks would be meaningless.
        if bp["direction"] != lp["direction"]:
            issues.append({
                "type": "direction_mismatch",
                "severity": "critical",
                "symbol": symbol,
                "detail": f"本地={lp['direction']} vs 币安={bp['direction']}",
            })
            continue

        # 2. The protective SL order must exist; re-place it when missing.
        sl_exists, _ = await check_sl_exists(session, symbol, lp["direction"])
        if not sl_exists:
            issues.append({
                "type": "sl_missing",
                "severity": "critical",
                "symbol": symbol,
                "detail": "SL保护单缺失! 仓位裸奔中!",
            })
            logger.warning(f"[{symbol}] ⚠️ SL缺失开始自动补挂...")
            if not await _restore_missing_sl(session, symbol, lp, bp):
                issues.append({
                    "type": "sl_rehang_failed",
                    "severity": "emergency",
                    "symbol": symbol,
                    "detail": f"SL补挂{MAX_REHANG_RETRIES}次全部失败! 建议进入只减仓模式!",
                })

        # 3. Distance to liquidation.
        liquidation_issue = _liquidation_issue(symbol, lp["direction"], bp)
        if liquidation_issue is not None:
            issues.append(liquidation_issue)

    # 4. Exchange position without a local record (ghost position).
    local_symbols = {lp["symbol"] for lp in local_positions}
    for symbol, bp in binance_positions.items():
        if symbol not in local_symbols and symbol in SYMBOLS:
            issues.append({
                "type": "exchange_only",
                "severity": "high",
                "symbol": symbol,
                "detail": f"币安有{bp['direction']}仓位但本地无记录",
            })

    # Summarize the pass. The result is only returned and logged here;
    # persisting it (e.g. to a reconciliation_log table) is not implemented.
    result = {
        "timestamp_ms": int(time.time() * 1000),
        "local_count": len(local_positions),
        "exchange_count": len(binance_positions),
        "issues_count": len(issues),
        "issues": issues,
        "status": "ok" if len(issues) == 0 else "mismatch",
    }
    if issues:
        for issue in issues:
            level = "🔴" if issue["severity"] in ("critical", "emergency") else "🟡"
            logger.warning(f"{level} [{issue['symbol']}] {issue['type']}: {issue['detail']}")
    else:
        logger.info(f"✅ 对账正常 | 本地={len(local_positions)}仓 | 币安={len(binance_positions)}")
    return result
# ============ TP1 trigger monitoring ============
async def check_tp1_triggers(session, conn):
    """Detect a filled TP1 and move the stop-loss to break-even.

    For every local trade not yet flagged ``tp1_hit``: when no open
    TAKE_PROFIT_MARKET order on the closing side sits within 0.1% of
    ``tp1_price`` and the trade status is 'active', TP1 is assumed to have
    triggered. All open orders of the symbol are then cancelled, an SL at
    entry price +/-0.05% and a TP2 order are placed for the remaining
    position, and the DB row is updated (tp1_hit, sl_price, status).

    Existing orders are left alone while the mark price is not beyond the
    break-even level: a stop on the wrong side of the market is rejected by
    the exchange, and cancelling first would leave the position unprotected.

    Args:
        session: aiohttp client session.
        conn: DB connection; the update is committed per trade and rolled
            back if it fails.
    """
    for lp in get_local_positions(conn):
        if lp["tp1_hit"]:
            continue  # Already handled.

        symbol = lp["symbol"]
        close_side = "SELL" if lp["direction"] == "LONG" else "BUY"

        # TP1 is considered filled once its order left the open-order list.
        orders = await get_binance_open_orders(session, symbol)
        tp1_still_open = any(
            order.get("type") == "TAKE_PROFIT_MARKET"
            and order.get("side") == close_side
            and abs(float(order.get("stopPrice", 0)) - lp["tp1_price"]) < lp["tp1_price"] * 0.001
            for order in orders
        )
        if tp1_still_open or lp["status"] != "active":
            continue

        bp = (await get_binance_positions(session)).get(symbol)
        if not bp:
            continue

        # FIXME(review): this is meant to verify that the position shrank,
        # but it compares the position size against binance_order_id, which
        # by its name is an order id rather than a quantity - so the check
        # passes for any real id. The original order quantity is not part of
        # the query in get_local_positions; fix once the right column is
        # confirmed. Kept unchanged here.
        reference_qty = abs(float(lp.get("binance_order_id", "0") or "0"))
        if not abs(bp["amount"]) < reference_qty:
            continue

        # Break-even stop: entry price plus/minus 0.05%.
        if lp["direction"] == "LONG":
            new_sl = lp["entry_price"] * 1.0005
            stop_is_placeable = bp["mark_price"] > new_sl
        else:
            new_sl = lp["entry_price"] * 0.9995
            stop_is_placeable = bp["mark_price"] < new_sl
        if bp["mark_price"] > 0 and not stop_is_placeable:
            logger.warning(
                f"[{symbol}] TP1 order is gone but mark price {bp['mark_price']:.4f} "
                f"is not beyond break-even {new_sl:.4f}; keeping existing orders"
            )
            continue

        logger.info(f"[{symbol}] ✅ TP1触发! 移SL到保本价")

        # Cancel the old SL (this cancels every open order of the symbol).
        cancel_data, cancel_status = await binance_request(
            session, "DELETE", "/fapi/v1/allOpenOrders", {"symbol": symbol}
        )
        if cancel_status != 200:
            logger.error(f"[{symbol}] Failed to cancel open orders before moving SL: {cancel_data}")

        # New SL for the remaining position.
        sl_ok, sl_data = await rehang_sl(session, symbol, lp["direction"], new_sl, bp["amount"])
        if not sl_ok:
            logger.error(f"[{symbol}] Failed to place break-even SL at {new_sl:.4f}: {sl_data}")

        # Re-place TP2 for the remaining position.
        prec = SYMBOL_PRECISION.get(symbol, {"qty": 3, "price": 2})
        tp2_price = lp["tp2_price"]
        qty_str = f"{abs(bp['amount']):.{prec['qty']}f}"
        price_str = f"{tp2_price:.{prec['price']}f}"
        tp2_data, tp2_status = await binance_request(session, "POST", "/fapi/v1/order", {
            "symbol": symbol, "side": close_side, "type": "TAKE_PROFIT_MARKET",
            "stopPrice": price_str, "quantity": qty_str, "reduceOnly": "true",
        })
        if tp2_status != 200:
            logger.error(f"[{symbol}] Failed to place TP2 at {price_str}: {tp2_data}")

        # Persist the new state.
        cur = conn.cursor()
        try:
            cur.execute("""
                UPDATE live_trades SET tp1_hit=TRUE, sl_price=%s, status='tp1_hit'
                WHERE id=%s
            """, (new_sl, lp["id"]))
            conn.commit()
        except Exception:
            # Leave the connection usable for the next cycle.
            conn.rollback()
            raise
        finally:
            cur.close()
        logger.info(f"[{symbol}] 🛡 SL移至保本 {new_sl:.4f}, TP2={tp2_price:.4f}")
# ============ Close detection ============
def _classify_closed_trade(lp, exit_price):
    """Compute the result of a closed trade in R multiples and label it.

    The raw result is the price move in the trade's direction divided by
    ``risk_distance`` (1 when that is falsy). When TP1 was hit, half of the
    result comes from the TP1 distance and half from the final exit. A fee of
    0.1% of the entry price (in R) is subtracted.

    Returns:
        ``(status, pnl_r)`` where status is "tp" (pnl_r > 0.5), "sl"
        (pnl_r < -0.5), "sl_be" (TP1 hit and pnl_r >= -0.1) or "closed".
    """
    rd = lp["risk_distance"] or 1
    if lp["direction"] == "LONG":
        raw_pnl_r = (exit_price - lp["entry_price"]) / rd
    else:
        raw_pnl_r = (lp["entry_price"] - exit_price) / rd

    if lp["tp1_hit"]:
        tp1_r = abs(lp["tp1_price"] - lp["entry_price"]) / rd
        pnl_r = 0.5 * tp1_r + 0.5 * raw_pnl_r
    else:
        pnl_r = raw_pnl_r

    # Deduct fees.
    pnl_r -= 0.001 * lp["entry_price"] / rd

    if pnl_r > 0.5:
        status = "tp"
    elif pnl_r < -0.5:
        status = "sl"
    elif lp["tp1_hit"] and pnl_r >= -0.1:
        status = "sl_be"
    else:
        status = "closed"
    return status, pnl_r


async def check_closed_positions(session, conn):
    """Detect trades that were closed on Binance and record the result.

    A local live trade whose symbol has no Binance position is treated as
    closed. The exit price is the price of the last entry returned by the
    userTrades query (limit 5); when that query yields nothing usable the
    entry price is used as an approximation and a warning is logged. The row
    is updated with status, exit_price, exit_ts and pnl_r.

    Args:
        session: aiohttp client session.
        conn: DB connection; each update is committed per trade and rolled
            back if it fails.
    """
    local_positions = get_local_positions(conn)
    binance_positions = await get_binance_positions(session)

    for lp in local_positions:
        symbol = lp["symbol"]
        if binance_positions.get(symbol):
            continue  # Still open on the exchange.

        # No exchange position while the local trade is still live -> closed.
        logger.info(f"[{symbol}] 📝 检测到平仓,查询成交记录...")
        now_ms = int(time.time() * 1000)

        exit_price = lp["entry_price"]  # Fallback when no fill is available.
        trades_data, trades_status = await binance_request(session, "GET", "/fapi/v1/userTrades", {
            "symbol": symbol, "limit": 5
        })
        if trades_status == 200 and isinstance(trades_data, list) and trades_data:
            exit_price = float(trades_data[-1].get("price", exit_price))
        else:
            logger.warning(
                f"[{symbol}] No fill data available (status={trades_status}); "
                f"using entry price {exit_price} as exit price"
            )

        status, pnl_r = _classify_closed_trade(lp, exit_price)

        cur = conn.cursor()
        try:
            cur.execute("""
                UPDATE live_trades SET status=%s, exit_price=%s, exit_ts=%s, pnl_r=%s
                WHERE id=%s
            """, (status, exit_price, now_ms, round(pnl_r, 4), lp["id"]))
            conn.commit()
        except Exception:
            # Leave the connection usable for the next cycle.
            conn.rollback()
            raise
        finally:
            cur.close()
        logger.info(f"[{symbol}] 📝 平仓记录: {status} | exit={exit_price:.4f} | pnl={pnl_r:+.2f}R")
# ============ Main loop ============
def _reset_db_connection(conn):
    """Return a connection that is usable after a failed cycle.

    An open connection is rolled back, which clears an aborted transaction.
    If the connection is closed, or the rollback itself fails, it is closed
    and replaced by a fresh connection built from DB_CONFIG.

    Raises:
        psycopg2.Error: If a new connection cannot be established.
    """
    if not conn.closed:
        try:
            conn.rollback()
            return conn
        except psycopg2.Error as exc:
            logger.warning(f"DB rollback failed, reconnecting: {exc}")
    try:
        conn.close()
    except psycopg2.Error:
        pass  # Already unusable; it is being replaced anyway.
    return psycopg2.connect(**DB_CONFIG)


async def main():
    """Run the sync loop until interrupted.

    Loads the API keys, opens the DB connection and an HTTP session, then
    repeats: reconcile, check TP1 triggers, check closed positions, sleep
    CHECK_INTERVAL seconds. An exception in a cycle is logged, the DB
    connection is reset and the loop resumes after 10 seconds. The DB
    connection is closed when the loop ends for any reason.
    """
    logger.info("=" * 60)
    logger.info(f"🔄 Position Sync 启动 | 环境={TRADE_ENV} | 间隔={CHECK_INTERVAL}")
    logger.info("=" * 60)
    load_api_keys()
    conn = psycopg2.connect(**DB_CONFIG)
    try:
        async with aiohttp.ClientSession() as session:
            while True:
                try:
                    # 1. Reconcile
                    await reconcile(session, conn)
                    # 2. Check TP1 triggers
                    await check_tp1_triggers(session, conn)
                    # 3. Check closed positions
                    await check_closed_positions(session, conn)
                    await asyncio.sleep(CHECK_INTERVAL)
                except KeyboardInterrupt:
                    break
                except Exception as e:
                    logger.error(f"❌ 对账异常: {e}", exc_info=True)
                    # Without this, one failed SQL statement leaves the
                    # connection in an aborted transaction and every later
                    # cycle fails too.
                    try:
                        conn = _reset_db_connection(conn)
                    except psycopg2.Error as db_err:
                        logger.error(f"DB reconnect failed: {db_err}")
                    await asyncio.sleep(10)
    finally:
        conn.close()
        logger.info("Position Sync 已停止")


if __name__ == "__main__":
    asyncio.run(main())