feat: position_sync.py - 仓位对账+SL自动补挂+TP1监控+平仓检测

- 每30秒对账: 本地DB vs 币安持仓/挂单
- SL缺失自动补挂: 立即补→3s复检→再补→2次失败告警
- TP1触发监控: 自动移SL到保本+重挂TP2
- 平仓检测: 币安无仓→查成交记录→计算pnl_r→更新DB
- 清算距离三档告警: 20%黄/12%红/8%紧急
- 幽灵仓位检测: 币安有仓但本地无记录
This commit is contained in:
root 2026-03-02 09:04:27 +00:00
parent 21970038df
commit fab3a3d909

494
backend/position_sync.py Normal file
View File

@ -0,0 +1,494 @@
#!/usr/bin/env python3
"""
Position Sync - 仓位同步与对账模块
每30秒对账本地DB vs 币安实际持仓/挂单SL缺失自动补挂
架构
live_executor.py 开仓 position_sync.py 持续对账 + 监控TP/SL状态
"""
import os
import sys
import json
import time
import logging
import asyncio
import hashlib
import hmac
from urllib.parse import urlencode
import psycopg2
import aiohttp
# ============ 配置 ============
TRADE_ENV = os.getenv("TRADE_ENV", "testnet")
BINANCE_ENDPOINTS = {
"testnet": "https://testnet.binancefuture.com",
"production": "https://fapi.binance.com",
}
BASE_URL = BINANCE_ENDPOINTS[TRADE_ENV]
DB_CONFIG = {
"host": os.getenv("DB_HOST", "10.106.0.3"),
"port": int(os.getenv("DB_PORT", "5432")),
"dbname": os.getenv("DB_NAME", "arb_engine"),
"user": os.getenv("DB_USER", "arb"),
"password": os.getenv("DB_PASSWORD", "arb_engine_2026"),
}
CHECK_INTERVAL = 30 # 对账间隔(秒)
SL_REHANG_DELAYS = [0, 3] # SL补挂重试延迟(秒)
MAX_REHANG_RETRIES = 2
MISMATCH_ESCALATION_SEC = 60 # 差异持续超过此秒数升级告警
SYMBOLS = ["BTCUSDT", "ETHUSDT", "XRPUSDT", "SOLUSDT"]
SYMBOL_PRECISION = {
"BTCUSDT": {"qty": 3, "price": 1},
"ETHUSDT": {"qty": 3, "price": 2},
"XRPUSDT": {"qty": 0, "price": 4},
"SOLUSDT": {"qty": 2, "price": 2},
}
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
logger = logging.getLogger("position-sync")
# ============ API Key ============
_api_key = None
_secret_key = None
def load_api_keys():
global _api_key, _secret_key
_api_key = os.getenv("BINANCE_API_KEY")
_secret_key = os.getenv("BINANCE_SECRET_KEY")
if _api_key and _secret_key:
logger.info(f"API Key loaded from env")
return
try:
from google.cloud import secretmanager
client = secretmanager.SecretManagerServiceClient()
project = os.getenv("GCP_PROJECT", "gen-lang-client-0835616737")
prefix = "binance-testnet" if TRADE_ENV == "testnet" else "binance-live"
_api_key = client.access_secret_version(
name=f"projects/{project}/secrets/{prefix}-api-key/versions/latest"
).payload.data.decode()
_secret_key = client.access_secret_version(
name=f"projects/{project}/secrets/{prefix}-secret-key/versions/latest"
).payload.data.decode()
logger.info(f"API Key loaded from GCP Secret Manager")
except Exception as e:
logger.error(f"Failed to load API keys: {e}")
sys.exit(1)
def sign_params(params):
params["timestamp"] = int(time.time() * 1000)
query_string = urlencode(params)
signature = hmac.new(_secret_key.encode(), query_string.encode(), hashlib.sha256).hexdigest()
params["signature"] = signature
return params
async def binance_request(session, method, path, params=None):
url = f"{BASE_URL}{path}"
headers = {"X-MBX-APIKEY": _api_key}
if params is None:
params = {}
params = sign_params(params)
try:
if method == "GET":
async with session.get(url, params=params, headers=headers) as resp:
return await resp.json(), resp.status
elif method == "POST":
async with session.post(url, params=params, headers=headers) as resp:
return await resp.json(), resp.status
elif method == "DELETE":
async with session.delete(url, params=params, headers=headers) as resp:
return await resp.json(), resp.status
except Exception as e:
logger.error(f"Request failed: {e}")
return {"error": str(e)}, 500
# ============ 对账核心 ============
async def get_binance_positions(session):
"""获取币安所有持仓"""
data, status = await binance_request(session, "GET", "/fapi/v2/positionRisk")
positions = {}
if status == 200 and isinstance(data, list):
for pos in data:
amt = float(pos.get("positionAmt", 0))
if amt != 0:
positions[pos["symbol"]] = {
"amount": amt,
"direction": "LONG" if amt > 0 else "SHORT",
"entry_price": float(pos.get("entryPrice", 0)),
"unrealized_pnl": float(pos.get("unRealizedProfit", 0)),
"liquidation_price": float(pos.get("liquidationPrice", 0)),
"mark_price": float(pos.get("markPrice", 0)),
}
return positions
async def get_binance_open_orders(session, symbol=None):
"""获取币安挂单"""
params = {}
if symbol:
params["symbol"] = symbol
data, status = await binance_request(session, "GET", "/fapi/v1/openOrders", params)
if status == 200 and isinstance(data, list):
return data
return []
def get_local_positions(conn):
"""获取本地DB活跃持仓"""
cur = conn.cursor()
cur.execute("""
SELECT id, symbol, strategy, direction, entry_price, sl_price, tp1_price, tp2_price,
tp1_hit, status, risk_distance, binance_order_id
FROM live_trades
WHERE status IN ('active', 'tp1_hit')
ORDER BY entry_ts DESC
""")
positions = []
for row in cur.fetchall():
positions.append({
"id": row[0], "symbol": row[1], "strategy": row[2], "direction": row[3],
"entry_price": row[4], "sl_price": row[5], "tp1_price": row[6], "tp2_price": row[7],
"tp1_hit": row[8], "status": row[9], "risk_distance": row[10],
"binance_order_id": row[11],
})
return positions
async def check_sl_exists(session, symbol, direction):
"""检查是否有SL挂单"""
orders = await get_binance_open_orders(session, symbol)
close_side = "SELL" if direction == "LONG" else "BUY"
for order in orders:
if order.get("type") == "STOP_MARKET" and order.get("side") == close_side:
return True, order
return False, None
async def rehang_sl(session, symbol, direction, sl_price, quantity):
"""补挂SL保护单"""
prec = SYMBOL_PRECISION.get(symbol, {"qty": 3, "price": 2})
close_side = "SELL" if direction == "LONG" else "BUY"
qty_str = f"{abs(quantity):.{prec['qty']}f}"
price_str = f"{sl_price:.{prec['price']}f}"
params = {
"symbol": symbol,
"side": close_side,
"type": "STOP_MARKET",
"stopPrice": price_str,
"quantity": qty_str,
"reduceOnly": "true",
}
data, status = await binance_request(session, "POST", "/fapi/v1/order", params)
return status == 200, data
async def reconcile(session, conn):
"""执行一次完整对账"""
local_positions = get_local_positions(conn)
binance_positions = await get_binance_positions(session)
issues = []
cur = conn.cursor()
# 1. 检查本地有仓但币安没有(漏单/已被清算)
for lp in local_positions:
symbol = lp["symbol"]
bp = binance_positions.get(symbol)
if not bp:
issues.append({
"type": "local_only",
"severity": "critical",
"symbol": symbol,
"detail": f"本地有{lp['direction']}仓位(id={lp['id']})但币安无持仓",
})
continue
# 方向不一致
if bp["direction"] != lp["direction"]:
issues.append({
"type": "direction_mismatch",
"severity": "critical",
"symbol": symbol,
"detail": f"本地={lp['direction']} vs 币安={bp['direction']}",
})
continue
# 2. 检查SL保护单是否存在
sl_exists, sl_order = await check_sl_exists(session, symbol, lp["direction"])
if not sl_exists:
issues.append({
"type": "sl_missing",
"severity": "critical",
"symbol": symbol,
"detail": f"SL保护单缺失! 仓位裸奔中!",
})
# 自动补挂SL
logger.warning(f"[{symbol}] ⚠️ SL缺失开始自动补挂...")
success = False
for attempt, delay in enumerate(SL_REHANG_DELAYS):
if delay > 0:
await asyncio.sleep(delay)
ok, data = await rehang_sl(session, symbol, lp["direction"], lp["sl_price"], bp["amount"])
if ok:
logger.info(f"[{symbol}] ✅ SL补挂成功 (attempt={attempt+1})")
success = True
break
else:
logger.error(f"[{symbol}] ❌ SL补挂失败 (attempt={attempt+1}): {data}")
if not success:
issues.append({
"type": "sl_rehang_failed",
"severity": "emergency",
"symbol": symbol,
"detail": f"SL补挂{MAX_REHANG_RETRIES}次全部失败! 建议进入只减仓模式!",
})
# 3. 检查清算距离
if bp.get("liquidation_price") and bp.get("mark_price"):
liq = bp["liquidation_price"]
mark = bp["mark_price"]
if liq > 0 and mark > 0:
if lp["direction"] == "LONG":
dist_pct = (mark - liq) / mark * 100
else:
dist_pct = (liq - mark) / mark * 100
if dist_pct < 8:
issues.append({
"type": "liquidation_near",
"severity": "emergency",
"symbol": symbol,
"detail": f"距清算仅{dist_pct:.1f}%! 建议立即减仓!",
})
elif dist_pct < 12:
issues.append({
"type": "liquidation_warning",
"severity": "high",
"symbol": symbol,
"detail": f"距清算{dist_pct:.1f}%",
})
elif dist_pct < 20:
issues.append({
"type": "liquidation_caution",
"severity": "medium",
"symbol": symbol,
"detail": f"距清算{dist_pct:.1f}%",
})
# 4. 检查币安有仓但本地没有(幽灵仓位)
local_symbols = {lp["symbol"] for lp in local_positions}
for symbol, bp in binance_positions.items():
if symbol not in local_symbols and symbol in [s for s in SYMBOLS]:
issues.append({
"type": "exchange_only",
"severity": "high",
"symbol": symbol,
"detail": f"币安有{bp['direction']}仓位但本地无记录",
})
# 记录对账结果
now_ms = int(time.time() * 1000)
result = {
"timestamp_ms": now_ms,
"local_count": len(local_positions),
"exchange_count": len(binance_positions),
"issues_count": len(issues),
"issues": issues,
"status": "ok" if len(issues) == 0 else "mismatch",
}
# 写对账结果到DB可选创建reconciliation_log表
if issues:
for issue in issues:
level = "🔴" if issue["severity"] in ("critical", "emergency") else "🟡"
logger.warning(f"{level} [{issue['symbol']}] {issue['type']}: {issue['detail']}")
else:
logger.info(f"✅ 对账正常 | 本地={len(local_positions)}仓 | 币安={len(binance_positions)}")
return result
# ============ TP1触发监控 ============
async def check_tp1_triggers(session, conn):
"""检查TP1是否触发触发后移SL到保本价"""
local_positions = get_local_positions(conn)
cur = conn.cursor()
for lp in local_positions:
if lp["tp1_hit"]:
continue # 已处理
symbol = lp["symbol"]
# 查币安挂单看TP1是否已成交不在挂单列表里了
orders = await get_binance_open_orders(session, symbol)
close_side = "SELL" if lp["direction"] == "LONG" else "BUY"
tp1_found = False
for order in orders:
if (order.get("type") == "TAKE_PROFIT_MARKET"
and order.get("side") == close_side
and abs(float(order.get("stopPrice", 0)) - lp["tp1_price"]) < lp["tp1_price"] * 0.001):
tp1_found = True
break
if not tp1_found and lp["status"] == "active":
# TP1可能已触发验证仓位是否减半
bp = (await get_binance_positions(session)).get(symbol)
if bp and abs(bp["amount"]) < abs(float(lp.get("binance_order_id", "0") or "0")):
# 确认TP1触发
logger.info(f"[{symbol}] ✅ TP1触发! 移SL到保本价")
# 取消旧SL
await binance_request(session, "DELETE", "/fapi/v1/allOpenOrders", {"symbol": symbol})
# 计算保本SL
if lp["direction"] == "LONG":
new_sl = lp["entry_price"] * 1.0005
else:
new_sl = lp["entry_price"] * 0.9995
# 挂新SL半仓
prec = SYMBOL_PRECISION.get(symbol, {"qty": 3, "price": 2})
ok, _ = await rehang_sl(session, symbol, lp["direction"], new_sl, bp["amount"])
# 重新挂TP2半仓
tp2_price = lp["tp2_price"]
qty_str = f"{abs(bp['amount']):.{prec['qty']}f}"
price_str = f"{tp2_price:.{prec['price']}f}"
await binance_request(session, "POST", "/fapi/v1/order", {
"symbol": symbol, "side": close_side, "type": "TAKE_PROFIT_MARKET",
"stopPrice": price_str, "quantity": qty_str, "reduceOnly": "true",
})
# 更新DB
cur.execute("""
UPDATE live_trades SET tp1_hit=TRUE, sl_price=%s, status='tp1_hit'
WHERE id=%s
""", (new_sl, lp["id"]))
conn.commit()
logger.info(f"[{symbol}] 🛡 SL移至保本 {new_sl:.4f}, TP2={tp2_price:.4f}")
# ============ 平仓检测 ============
async def check_closed_positions(session, conn):
"""检测已平仓的交易更新DB"""
local_positions = get_local_positions(conn)
binance_positions = await get_binance_positions(session)
cur = conn.cursor()
for lp in local_positions:
symbol = lp["symbol"]
bp = binance_positions.get(symbol)
# 币安无持仓但本地还active → 已平仓
if not bp:
logger.info(f"[{symbol}] 📝 检测到平仓,查询成交记录...")
# 查最近成交确定平仓价
# 简化:用当前标记价做近似
now_ms = int(time.time() * 1000)
rd = lp["risk_distance"] or 1
# 判断平仓类型(通过挂单是否还在来推断)
# 如果SL/TP都不在了说明触发了其中一个
status = "unknown"
exit_price = lp["entry_price"] # fallback
# 尝试从最近交易记录获取
trades_data, trades_status = await binance_request(session, "GET", "/fapi/v1/userTrades", {
"symbol": symbol, "limit": 5
})
if trades_status == 200 and isinstance(trades_data, list) and trades_data:
last_trade = trades_data[-1]
exit_price = float(last_trade.get("price", exit_price))
# 计算pnl
if lp["direction"] == "LONG":
raw_pnl_r = (exit_price - lp["entry_price"]) / rd
else:
raw_pnl_r = (lp["entry_price"] - exit_price) / rd
if lp["tp1_hit"]:
tp1_r = abs(lp["tp1_price"] - lp["entry_price"]) / rd
pnl_r = 0.5 * tp1_r + 0.5 * raw_pnl_r
else:
pnl_r = raw_pnl_r
# 扣手续费
fee_r = 0.001 * lp["entry_price"] / rd
pnl_r -= fee_r
# 判断状态
if pnl_r > 0.5:
status = "tp"
elif pnl_r < -0.5:
status = "sl"
elif lp["tp1_hit"] and pnl_r >= -0.1:
status = "sl_be"
else:
status = "closed"
cur.execute("""
UPDATE live_trades SET status=%s, exit_price=%s, exit_ts=%s, pnl_r=%s
WHERE id=%s
""", (status, exit_price, now_ms, round(pnl_r, 4), lp["id"]))
conn.commit()
logger.info(f"[{symbol}] 📝 平仓记录: {status} | exit={exit_price:.4f} | pnl={pnl_r:+.2f}R")
# ============ 主循环 ============
async def main():
logger.info("=" * 60)
logger.info(f"🔄 Position Sync 启动 | 环境={TRADE_ENV} | 间隔={CHECK_INTERVAL}")
logger.info("=" * 60)
load_api_keys()
conn = psycopg2.connect(**DB_CONFIG)
async with aiohttp.ClientSession() as session:
while True:
try:
# 1. 对账
result = await reconcile(session, conn)
# 2. 检查TP1触发
await check_tp1_triggers(session, conn)
# 3. 检查已平仓
await check_closed_positions(session, conn)
await asyncio.sleep(CHECK_INTERVAL)
except KeyboardInterrupt:
break
except Exception as e:
logger.error(f"❌ 对账异常: {e}", exc_info=True)
await asyncio.sleep(10)
conn.close()
logger.info("Position Sync 已停止")
if __name__ == "__main__":
asyncio.run(main())