arbitrage-engine/backend/live_executor.py
root fe754cf628 feat: live_executor开仓前检查risk_guard状态+紧急指令
- 读/tmp/risk_guard_state.json: block_new_entries=true → 拒绝开仓
- 读/tmp/risk_guard_state.json: reduce_only=true → 拒绝开仓
- 读/tmp/risk_guard_emergency.json: close_all/block_new → 拒绝开仓
- risk_guard未启动(文件不存在) → 允许交易(不阻塞)
2026-03-02 10:05:45 +00:00

579 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Live Executor - 实盘交易执行模块
监听PG NOTIFY接收新信号调币安API执行交易
架构:
signal_engine.py → NOTIFY new_signal → live_executor.py → Binance API
不影响模拟盘任何进程。
"""
import os
import sys
import json
import time
import logging
import asyncio
import hashlib
import hmac
from urllib.parse import urlencode
import psycopg2
import psycopg2.extensions
import aiohttp
# ============ Configuration ============
# Trading environment: "testnet" or "production" (defaults to "testnet").
TRADE_ENV = os.getenv("TRADE_ENV", "testnet")
# Binance futures REST base URL for each supported environment.
BINANCE_ENDPOINTS = {
    "testnet": "https://testnet.binancefuture.com",
    "production": "https://fapi.binance.com",
}
# NOTE(review): a TRADE_ENV value that is not a key above raises KeyError
# right here, at import time.
BASE_URL = BINANCE_ENDPOINTS[TRADE_ENV]
# Database connection parameters, passed as keyword arguments to
# psycopg2.connect().
# NOTE(review): the fallback password is hard-coded in source; consider
# requiring DB_PASSWORD to be provided by the environment.
DB_CONFIG = {
    "host": os.getenv("DB_HOST", "10.106.0.3"),
    "port": int(os.getenv("DB_PORT", "5432")),
    "dbname": os.getenv("DB_NAME", "arb_engine"),
    "user": os.getenv("DB_USER", "arb"),
    "password": os.getenv("DB_PASSWORD", "arb_engine_2026"),
}
# Strategies whose signals are traded; LIVE_STRATEGIES must hold a JSON array.
ENABLED_STRATEGIES = json.loads(os.getenv("LIVE_STRATEGIES", '["v52_8signals"]'))
# Risk parameters.
RISK_PER_TRADE_USD = float(os.getenv("RISK_PER_TRADE_USD", "2"))  # testnet: $2 = 1R
MAX_POSITIONS = int(os.getenv("MAX_POSITIONS", "4"))
FEE_RATE = 0.0005  # Taker 0.05%
# Per-symbol precision (required by Binance): "qty" and "price" are the number
# of decimal places used when formatting order quantity and price.
# NOTE(review): "min_notional" is not referenced anywhere in the visible code.
SYMBOL_PRECISION = {
    "BTCUSDT": {"qty": 3, "price": 1, "min_notional": 100},
    "ETHUSDT": {"qty": 3, "price": 2, "min_notional": 20},
    "XRPUSDT": {"qty": 0, "price": 4, "min_notional": 5},
    "SOLUSDT": {"qty": 2, "price": 2, "min_notional": 5},
}
# Logging.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
logger = logging.getLogger("live-executor")
# ============ API key management ============
# Binance credentials; both stay None until load_api_keys() fills them in.
_api_key = None
_secret_key = None
def load_api_keys():
    """Fill the module-level Binance credentials.

    The BINANCE_API_KEY / BINANCE_SECRET_KEY environment variables are used
    when both are set (local development / testing). Otherwise both values
    are read from GCP Secret Manager, from secrets prefixed
    ``binance-testnet`` or ``binance-live`` depending on TRADE_ENV. Any
    failure on the Secret Manager path is logged and terminates the process
    with exit status 1.
    """
    global _api_key, _secret_key

    # Environment variables take priority.
    _api_key = os.getenv("BINANCE_API_KEY")
    _secret_key = os.getenv("BINANCE_SECRET_KEY")
    if _api_key and _secret_key:
        logger.info(f"API Key loaded from env (first 10: {_api_key[:10]}...)")
        return

    # Fall back to GCP Secret Manager.
    try:
        from google.cloud import secretmanager

        sm_client = secretmanager.SecretManagerServiceClient()
        project = os.getenv("GCP_PROJECT", "gen-lang-client-0835616737")
        prefix = "binance-live" if TRADE_ENV != "testnet" else "binance-testnet"

        def _latest(secret_suffix):
            # Read and decode the latest version of one secret.
            resource = f"projects/{project}/secrets/{prefix}-{secret_suffix}/versions/latest"
            return sm_client.access_secret_version(name=resource).payload.data.decode()

        _api_key = _latest("api-key")
        _secret_key = _latest("secret-key")
        logger.info(f"API Key loaded from GCP Secret Manager ({prefix}, first 10: {_api_key[:10]}...)")
    except Exception as e:
        logger.error(f"Failed to load API keys: {e}")
        sys.exit(1)
# ============ 币安API工具 ============
def sign_params(params: dict) -> dict:
    """Return a signed copy of *params* (HMAC-SHA256 over the query string).

    A fresh millisecond ``timestamp`` is added, the url-encoded parameters are
    signed with the module-level secret key, and the hex digest is appended
    as the last entry, ``signature``.

    The input dict is not modified, and any ``signature`` entry it already
    carries is dropped before signing, so a dict that was signed earlier can
    safely be signed again (e.g. on a retry) without the stale signature
    leaking into the signed payload.

    Raises:
        RuntimeError: if the secret key has not been loaded yet.
    """
    if not _secret_key:
        raise RuntimeError("Binance secret key is not loaded; call load_api_keys() first")
    signed = {key: value for key, value in params.items() if key != "signature"}
    signed["timestamp"] = int(time.time() * 1000)
    query_string = urlencode(signed)
    # The signature must be the last parameter and must not be part of the
    # signed payload itself.
    signed["signature"] = hmac.new(
        _secret_key.encode(), query_string.encode(), hashlib.sha256
    ).hexdigest()
    return signed
async def binance_request(session: aiohttp.ClientSession, method: str, path: str, params: dict = None, signed: bool = True):
    """Send one REST request to the Binance API and decode the JSON reply.

    Args:
        session: aiohttp session used to perform the request.
        method: "GET", "POST" or "DELETE" (case-insensitive).
        path: endpoint path appended to BASE_URL.
        params: query parameters; the caller's dict is never modified.
        signed: when true, a timestamp and signature are added via
            sign_params().

    Returns:
        A ``(data, status)`` tuple. ``data`` is the decoded JSON body and
        ``status`` the HTTP status code. When the request cannot be sent or
        its reply cannot be decoded, ``({"error": <message>}, 500)`` is
        returned instead; the same shape is used for an unsupported method,
        so callers can always unpack the result.
    """
    verb = method.upper()
    if verb not in ("GET", "POST", "DELETE"):
        logger.error(f"Binance request failed: unsupported HTTP method {method!r}")
        return {"error": f"unsupported HTTP method {method!r}"}, 500

    url = f"{BASE_URL}{path}"
    headers = {"X-MBX-APIKEY": _api_key}
    # Work on a copy so signing never leaks timestamp/signature into the
    # caller's dict.
    query = dict(params) if params else {}
    if signed:
        query = sign_params(query)
    try:
        # All parameters travel in the query string, for every verb.
        async with session.request(verb, url, params=query, headers=headers) as resp:
            data = await resp.json()
            if resp.status != 200:
                logger.error(f"Binance API error: {resp.status} {data}")
            return data, resp.status
    except Exception as e:
        logger.error(f"Binance request failed: {e}")
        return {"error": str(e)}, 500
# ============ 交易执行 ============
async def set_leverage_and_margin(session: aiohttp.ClientSession, symbol: str, leverage: int = 20):
    """Switch *symbol* to isolated margin and set its leverage.

    Both calls are best-effort: nothing is raised and nothing is returned.
    The success line is logged only when both settings are actually in
    effect; otherwise a warning with both outcomes is logged so a position
    is not silently opened under an unexpected margin mode or leverage.
    """
    # Isolated margin.
    margin_data, margin_status = await binance_request(session, "POST", "/fapi/v1/marginType", {
        "symbol": symbol, "marginType": "ISOLATED"
    })
    # Binance rejects the call with code -4046 ("No need to change margin
    # type") when the symbol is already isolated; that is the desired state.
    margin_ok = margin_status == 200 or (
        isinstance(margin_data, dict) and margin_data.get("code") == -4046
    )
    # Leverage.
    leverage_data, leverage_status = await binance_request(session, "POST", "/fapi/v1/leverage", {
        "symbol": symbol, "leverage": leverage
    })
    if margin_ok and leverage_status == 200:
        logger.info(f"[{symbol}] 设置逐仓 + {leverage}x杠杆")
    else:
        logger.warning(
            f"[{symbol}] ⚠ 设置逐仓/杠杆未完全成功 | marginType={margin_status} {margin_data} | leverage={leverage_status} {leverage_data}"
        )
async def place_market_order(session: aiohttp.ClientSession, symbol: str, side: str, quantity: float):
    """Open a position with a MARKET order.

    The quantity is formatted with the symbol's "qty" decimal places from
    SYMBOL_PRECISION (3 when the symbol is not listed).

    Returns:
        ``(data, status, t_submit, t_ack)``: the decoded API response, the
        HTTP status, and the wall-clock times in milliseconds taken just
        before the request was sent and just after the reply arrived.
    """
    prec = SYMBOL_PRECISION.get(symbol, {"qty": 3})
    qty_str = f"{quantity:.{prec['qty']}f}"
    params = {
        "symbol": symbol,
        "side": side,
        "type": "MARKET",
        "quantity": qty_str,
        # The default response type (ACK) only acknowledges the order and
        # carries no fill price. RESULT makes a MARKET order return its final
        # state, so callers reading "avgPrice" get the real fill price.
        "newOrderRespType": "RESULT",
    }
    t_submit = time.time() * 1000
    data, status = await binance_request(session, "POST", "/fapi/v1/order", params)
    t_ack = time.time() * 1000
    if status == 200:
        logger.info(f"[{symbol}] ✅ 市价{side} {qty_str}个 | orderId={data.get('orderId')} | 延迟={t_ack-t_submit:.0f}ms")
    return data, status, t_submit, t_ack
async def place_stop_order(session: aiohttp.ClientSession, symbol: str, side: str, stop_price: float, quantity: float, order_type: str = "STOP_MARKET"):
    """Place a reduce-only stop-loss / take-profit order.

    Quantity and trigger price are rendered with the symbol's decimal places
    from SYMBOL_PRECISION (3 and 2 respectively when the symbol is unknown).

    Returns:
        ``(data, status)`` exactly as produced by binance_request().
    """
    precision = SYMBOL_PRECISION.get(symbol, {"qty": 3, "price": 2})
    qty_str = format(quantity, f".{precision['qty']}f")
    price_str = format(stop_price, f".{precision['price']}f")
    order = dict(
        symbol=symbol,
        side=side,
        type=order_type,
        stopPrice=price_str,
        quantity=qty_str,
        reduceOnly="true",
    )
    data, status = await binance_request(session, "POST", "/fapi/v1/order", order)
    if status == 200:
        logger.info(f"[{symbol}] 📌 挂{order_type} {side} @ {price_str} qty={qty_str} | orderId={data.get('orderId')}")
    return data, status
async def cancel_all_orders(session: aiohttp.ClientSession, symbol: str):
    """Cancel every open order for *symbol*; returns ``(data, status)``."""
    query = {"symbol": symbol}
    data, status = await binance_request(session, "DELETE", "/fapi/v1/allOpenOrders", query)
    cancelled = status == 200
    if cancelled:
        logger.info(f"[{symbol}] 🗑 已取消所有挂单")
    return data, status
async def get_position(session: aiohttp.ClientSession, symbol: str):
    """Return the open position entry for *symbol*, or None.

    An entry counts as open when its "positionAmt" is non-zero. None is also
    returned when the request fails or the reply is not a list.
    """
    data, status = await binance_request(session, "GET", "/fapi/v2/positionRisk", {"symbol": symbol})
    if status != 200 or not isinstance(data, list):
        return None
    open_entries = (
        entry
        for entry in data
        if entry.get("symbol") == symbol and float(entry.get("positionAmt", 0)) != 0
    )
    return next(open_entries, None)
async def get_account_balance(session: aiohttp.ClientSession):
    """Return the available USDT balance of the futures account.

    0 is returned when the request fails, the reply is not a list, or no
    USDT entry is present.
    """
    data, status = await binance_request(session, "GET", "/fapi/v2/balance")
    if status == 200 and isinstance(data, list):
        usdt_entry = next((entry for entry in data if entry.get("asset") == "USDT"), None)
        if usdt_entry is not None:
            return float(usdt_entry.get("availableBalance", 0))
    return 0
# ============ 核心:开仓流程 ============
def _risk_guard_blocks_entry(symbol: str) -> bool:
    """Return True when risk_guard's state or emergency file forbids new entries.

    A missing file means risk_guard is not running and does not block
    trading. An unreadable or malformed file is logged and, as a deliberate
    fail-open choice, does not block trading either.
    """
    # State file written by risk_guard.
    try:
        with open("/tmp/risk_guard_state.json") as f:
            risk_state = json.load(f)
        if risk_state.get("block_new_entries"):
            logger.warning(f"[{symbol}] ❌ 风控禁止新开仓: {risk_state.get('circuit_break_reason', '未知原因')}")
            return True
        if risk_state.get("reduce_only"):
            logger.warning(f"[{symbol}] ❌ 只减仓模式,拒绝开仓")
            return True
    except FileNotFoundError:
        pass  # risk_guard has not started yet: trading is allowed
    except Exception as e:
        logger.warning(f"[{symbol}] ⚠ 读取风控状态失败: {e},继续交易")
    # Emergency instruction file written by the front end.
    try:
        with open("/tmp/risk_guard_emergency.json") as f:
            emergency = json.load(f)
        action = emergency.get("action")
        if action in ("close_all", "block_new"):
            logger.warning(f"[{symbol}] ❌ 紧急指令生效: {action},拒绝开仓")
            return True
    except FileNotFoundError:
        pass
    except Exception as e:
        # Still fail-open, but no longer silent: a corrupt emergency file
        # must be visible in the logs.
        logger.warning(f"[{symbol}] ⚠ 读取紧急指令失败: {e},继续交易")
    return False


async def execute_entry(session: aiohttp.ClientSession, signal: dict, db_conn):
    """Run the complete entry flow for one signal.

    Steps:
        0. Refuse when risk_guard blocks new entries.
        1. Validate the position size, then check balance and position limits.
        2. Set isolated margin and leverage.
        3. Open the position with a market order.
        4. Place the protective orders: SL (full size), TP1 and TP2 (half each).
        5. Insert the trade into ``live_trades`` and commit.

    Args:
        session: aiohttp session used for all Binance requests.
        signal: dict with the required keys "symbol", "direction", "score",
            "strategy", "risk_distance", "sl_price", "tp1_price",
            "tp2_price" and "signal_ts"; optional keys are "tier", "atr",
            "signal_id", "factors" and "signal_price".
        db_conn: database connection used for the checks and the insert.

    Returns:
        The id of the new ``live_trades`` row, or None when the entry was
        refused or the market order failed.

    Raises:
        KeyError: when a required signal key is missing.
        Exception: a failure of the final insert is re-raised after the
            transaction has been rolled back.
    """
    symbol = signal["symbol"]
    direction = signal["direction"]
    score = signal["score"]
    tier = signal.get("tier", "standard")
    strategy = signal["strategy"]
    risk_distance = signal["risk_distance"]
    sl_price = signal["sl_price"]
    tp1_price = signal["tp1_price"]
    tp2_price = signal["tp2_price"]
    atr = signal.get("atr", 0)
    signal_ts = signal["signal_ts"]
    signal_id = signal.get("signal_id")
    factors = signal.get("factors")
    t_signal = signal_ts  # signal timestamp (ms)

    # 0. Risk guard state and emergency instructions.
    if _risk_guard_blocks_entry(symbol):
        return None

    # 1. Position size: quantity = risk_usd / risk_distance. Validated before
    #    anything touches the exchange, so a bad signal can neither divide by
    #    zero nor produce an order Binance is guaranteed to reject.
    prec = SYMBOL_PRECISION.get(symbol, {"qty": 3})
    if not risk_distance or risk_distance <= 0:
        logger.warning(f"[{symbol}] ❌ 无效的risk_distance={risk_distance},拒绝开仓")
        return None
    qty = round(RISK_PER_TRADE_USD / risk_distance, prec["qty"])
    if qty <= 0:
        logger.warning(f"[{symbol}] ❌ 计算仓位为0 (risk_distance={risk_distance}),拒绝开仓")
        return None
    # NOTE(review): the minimum-notional check mentioned by the original
    # author is still not implemented; it needs a current price.

    # 2. Balance check.
    balance = await get_account_balance(session)
    if balance < RISK_PER_TRADE_USD * 2:
        logger.warning(f"[{symbol}] ❌ 余额不足: ${balance:.2f} < ${RISK_PER_TRADE_USD * 2}")
        return None

    with db_conn.cursor() as cur:
        # 3. Maximum number of active positions for this strategy.
        cur.execute("SELECT count(*) FROM live_trades WHERE strategy=%s AND status='active'", (strategy,))
        active_count = cur.fetchone()[0]
        if active_count >= MAX_POSITIONS:
            logger.warning(f"[{symbol}] ❌ 已达最大持仓数 {MAX_POSITIONS}")
            return None
        # 4. Skip when this symbol already has an active trade for the strategy.
        cur.execute("SELECT id FROM live_trades WHERE symbol=%s AND strategy=%s AND status='active'", (symbol, strategy))
        if cur.fetchone():
            logger.info(f"[{symbol}] ⏭ 已有活跃持仓,跳过")
            return None

    # 5. Isolated margin and leverage.
    await set_leverage_and_margin(session, symbol)

    # 6. Market entry.
    entry_side = "BUY" if direction == "LONG" else "SELL"
    order_data, order_status, t_submit, t_ack = await place_market_order(session, symbol, entry_side, qty)
    if order_status != 200:
        logger.error(f"[{symbol}] ❌ 开仓失败: {order_data}")
        return None

    # Fill details.
    order_id = str(order_data.get("orderId", ""))
    fill_price = float(order_data.get("avgPrice", 0))
    if fill_price == 0:
        fill_price = float(order_data.get("price", 0))
    if fill_price == 0:
        # The response carried no fill price; the recorded entry price and
        # slippage below are then not meaningful.
        logger.warning(f"[{symbol}] ⚠ 下单响应未包含成交价 | orderId={order_id}")
    t_fill = time.time() * 1000
    signal_to_order_ms = int(t_submit - t_signal)
    order_to_fill_ms = int(t_fill - t_submit)

    # Slippage in basis points; positive means a worse fill than the signal price.
    signal_price = signal.get("signal_price", fill_price)
    if signal_price > 0:
        if direction == "LONG":
            slippage_bps = (fill_price - signal_price) / signal_price * 10000
        else:
            slippage_bps = (signal_price - fill_price) / signal_price * 10000
    else:
        slippage_bps = 0
    logger.info(f"[{symbol}] 📊 成交价={fill_price} | 信号价={signal_price} | 滑点={slippage_bps:.1f}bps | 信号→下单={signal_to_order_ms}ms | 下单→成交={order_to_fill_ms}ms")

    # 7. Protective orders: SL for the full size, TP1 and TP2 for half each.
    close_side = "SELL" if direction == "LONG" else "BUY"
    half_qty = round(qty / 2, prec["qty"])
    other_half = round(qty - half_qty, prec["qty"])
    # SL - full size.
    sl_data, sl_status = await place_stop_order(session, symbol, close_side, sl_price, qty, "STOP_MARKET")
    t_after_sl = time.time() * 1000
    # Time the position spent without a stop-loss.
    protection_gap_ms = int(t_after_sl - t_fill)
    if sl_status != 200:
        logger.error(f"[{symbol}] ⚠️ SL挂单失败! 裸奔中! data={sl_data}")
        # TODO: automatically re-place the stop-loss
    tp_type = "TAKE_PROFIT_MARKET"
    # For the smallest position sizes one half rounds down to zero; a
    # zero-quantity order is always rejected, so that leg is skipped.
    if half_qty > 0:
        await place_stop_order(session, symbol, close_side, tp1_price, half_qty, tp_type)
    if other_half > 0:
        await place_stop_order(session, symbol, close_side, tp2_price, other_half, tp_type)
    logger.info(f"[{symbol}] 🛡 保护单已挂 | SL={sl_price} TP1={tp1_price}(半仓) TP2={tp2_price}(半仓) | 裸奔={protection_gap_ms}ms")

    # 8. Persist the trade.
    try:
        with db_conn.cursor() as cur:
            cur.execute("""
                INSERT INTO live_trades (
                    symbol, strategy, direction, entry_price, entry_ts,
                    sl_price, tp1_price, tp2_price, score, tier, status,
                    risk_distance, atr_at_entry, score_factors, signal_id,
                    binance_order_id, fill_price, slippage_bps,
                    protection_gap_ms, signal_to_order_ms, order_to_fill_ms
                ) VALUES (
                    %s, %s, %s, %s, %s,
                    %s, %s, %s, %s, %s, 'active',
                    %s, %s, %s, %s,
                    %s, %s, %s,
                    %s, %s, %s
                ) RETURNING id
            """, (
                symbol, strategy, direction, fill_price, int(t_fill),
                sl_price, tp1_price, tp2_price, score, tier,
                risk_distance, atr, json.dumps(factors) if factors else None, signal_id,
                order_id, fill_price, round(slippage_bps, 2),
                protection_gap_ms, signal_to_order_ms, order_to_fill_ms,
            ))
            trade_id = cur.fetchone()[0]
        db_conn.commit()
    except Exception:
        # The position exists on the exchange but could not be recorded.
        # Roll back so the connection stays usable, make the situation
        # loud, and let the caller see the failure.
        db_conn.rollback()
        logger.critical(f"[{symbol}] ❌ 已开仓但写入live_trades失败 | orderId={order_id} | {direction} @ {fill_price}")
        raise
    logger.info(f"[{symbol}] ✅ 实盘开仓完成 | trade_id={trade_id} | {direction} @ {fill_price} | score={score} | 策略={strategy}")
    return trade_id
# ============ 信号监听 ============
def get_db_connection():
    """Open a new database connection from DB_CONFIG, switched to autocommit."""
    connection = psycopg2.connect(**DB_CONFIG)
    autocommit_level = psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT
    connection.set_isolation_level(autocommit_level)
    return connection
def fetch_pending_signals(conn):
    """Return the signals of the last 60 seconds that have not been traded yet.

    A row of ``signal_indicators`` qualifies when its signal is non-empty,
    its strategy is one of ENABLED_STRATEGIES, its ``ts`` lies within the
    last 60 seconds, and no ``live_trades`` row references it for the same
    strategy. Rows are returned newest first.

    Returns:
        A list of dicts with the keys "signal_id", "symbol", "direction",
        "score", "signal_ts", "factors", "strategy" and "signal_price".
        "factors" is parsed from JSON when stored as text (None when it does
        not parse); "signal_price" is 0 when the stored price is empty.
    """
    cur = conn.cursor()
    try:
        # statement_timestamp() instead of now(): now() is the start time of
        # the current *transaction*, so on a connection that is not in
        # autocommit mode the 60-second window would silently stretch for as
        # long as the transaction stays open and stale signals would match.
        # The strategy list is passed as a bound array parameter, which also
        # keeps the statement valid when the list is empty.
        cur.execute("""
            SELECT si.id, si.symbol, si.signal, si.score, si.ts, si.factors, si.strategy,
                   si.price
            FROM signal_indicators si
            WHERE si.signal IS NOT NULL
              AND si.signal != ''
              AND si.strategy = ANY(%s)
              AND si.ts > extract(epoch from statement_timestamp()) * 1000 - 60000
              AND NOT EXISTS (
                  SELECT 1 FROM live_trades lt
                  WHERE lt.signal_id = si.id AND lt.strategy = si.strategy
              )
            ORDER BY si.ts DESC
        """, (list(ENABLED_STRATEGIES),))
        rows = cur.fetchall()
    finally:
        cur.close()

    signals = []
    for row in rows:
        factors = row[5]
        if isinstance(factors, str):
            try:
                factors = json.loads(factors)
            except ValueError:
                # Malformed JSON: keep the signal, drop the factors.
                factors = None
        signals.append({
            "signal_id": row[0],
            "symbol": row[1],
            "direction": row[2],
            "score": row[3],
            "signal_ts": row[4],
            "factors": factors,
            "strategy": row[6],
            "signal_price": float(row[7]) if row[7] else 0,
        })
    return signals
def enrich_signal_with_trade_params(signal: dict, conn):
    """Compute SL/TP levels for *signal* from its strategy's ATR multipliers.

    The ATR (``atr_5m``) and price of the signal are read from
    ``signal_indicators`` by ``signal["signal_id"]``. On success the keys
    "atr", "signal_price", "risk_distance", "sl_price", "tp1_price" and
    "tp2_price" are written into *signal* in place.

    Returns:
        True when the signal was enriched; False when the strategy is
        unknown or the stored ATR / price is missing or not positive.
    """
    symbol = signal["symbol"]
    strategy = signal["strategy"]
    direction = signal["direction"]
    # ATR multipliers per strategy.
    config_map = {
        "v51_baseline": {"sl_multiplier": 1.4, "tp1_multiplier": 1.05, "tp2_multiplier": 2.1},
        "v52_8signals": {"sl_multiplier": 2.1, "tp1_multiplier": 1.4, "tp2_multiplier": 3.15},
    }
    cfg = config_map.get(strategy)
    if not cfg:
        logger.warning(f"[{symbol}] 未知策略: {strategy}")
        return False

    # Read ATR and price of the signal row.
    cur = conn.cursor()
    try:
        cur.execute("""
            SELECT atr_5m, price FROM signal_indicators
            WHERE id = %s
        """, (signal["signal_id"],))
        row = cur.fetchone()
    finally:
        cur.close()
    if not row or not row[0] or float(row[0]) <= 0:
        logger.warning(f"[{symbol}] signal_id={signal['signal_id']} 无ATR数据")
        return False
    # A NULL price used to crash in float(None); a non-positive price cannot
    # yield meaningful SL/TP levels either, so both are rejected.
    if row[1] is None or float(row[1]) <= 0:
        logger.warning(f"[{symbol}] signal_id={signal['signal_id']} 无价格数据")
        return False
    atr = float(row[0])
    price = float(row[1])
    signal["atr"] = atr
    signal["signal_price"] = price

    # SL/TP distances use the ATR directly (no 0.7 factor).
    sl_distance = cfg["sl_multiplier"] * atr
    tp1_distance = cfg["tp1_multiplier"] * atr
    tp2_distance = cfg["tp2_multiplier"] * atr
    signal["risk_distance"] = sl_distance
    # NOTE(review): every direction other than "LONG" is treated as SHORT.
    if direction == "LONG":
        signal["sl_price"] = price - sl_distance
        signal["tp1_price"] = price + tp1_distance
        signal["tp2_price"] = price + tp2_distance
    else:  # SHORT
        signal["sl_price"] = price + sl_distance
        signal["tp1_price"] = price - tp1_distance
        signal["tp2_price"] = price - tp2_distance
    return True
# ============ 主循环 ============
async def main():
    """Main loop: listen for PG NOTIFY, with a 1-second polling fallback.

    After loading the API keys and verifying that the account balance can be
    read, the loop drains pending ``new_signal`` notifications, fetches the
    signals still pending in the database, enriches each with SL/TP
    parameters and executes the entry. Errors inside one iteration are
    logged and the loop resumes after 5 seconds.
    """
    logger.info("=" * 60)
    logger.info(f"🚀 Live Executor 启动 | 环境={TRADE_ENV} | 策略={ENABLED_STRATEGIES}")
    logger.info(f" 风险/笔=${RISK_PER_TRADE_USD} | 最大持仓={MAX_POSITIONS}")
    logger.info(f" API端点={BASE_URL}")
    logger.info("=" * 60)
    load_api_keys()

    # API connectivity check.
    async with aiohttp.ClientSession() as http_session:
        balance = await get_account_balance(http_session)
    logger.info(f"💰 账户余额: ${balance:.2f} USDT")
    if balance <= 0:
        logger.error("❌ 无法获取余额或余额为0请检查API Key")
        return

    # Dedicated connection for LISTEN.
    listen_conn = get_db_connection()
    work_conn = None
    try:
        cur = listen_conn.cursor()
        cur.execute("LISTEN new_signal;")
        logger.info("👂 已注册PG LISTEN new_signal")
        # Working connection for queries and inserts.
        work_conn = psycopg2.connect(**DB_CONFIG)
        async with aiohttp.ClientSession() as http_session:
            while True:
                try:
                    # Drain pending notifications.
                    if listen_conn.poll() == psycopg2.extensions.POLL_OK:
                        while listen_conn.notifies:
                            notify = listen_conn.notifies.pop(0)
                            logger.info(f"📡 收到NOTIFY: {notify.payload}")
                    # Pending signals are always fetched, whether or not a
                    # notification arrived (NOTIFY + polling as a double safety net).
                    signals = fetch_pending_signals(work_conn)
                    for sig in signals:
                        # Add TP/SL parameters.
                        if not enrich_signal_with_trade_params(sig, work_conn):
                            continue
                        logger.info(f"[{sig['symbol']}] 🎯 新信号: {sig['direction']} score={sig['score']} strategy={sig['strategy']}")
                        # Execute the entry.
                        trade_id = await execute_entry(http_session, sig, work_conn)
                        if trade_id:
                            logger.info(f"[{sig['symbol']}] ✅ trade_id={trade_id} 开仓成功")
                    # End the read-only transaction opened by this iteration.
                    # Leaving it open keeps the session idle in transaction
                    # and freezes now() for the next poll. Successful entries
                    # were already committed by execute_entry().
                    work_conn.rollback()
                    await asyncio.sleep(1)  # 1-second polling fallback
                except KeyboardInterrupt:
                    logger.info("🛑 收到退出信号")
                    break
                except Exception as e:
                    logger.error(f"❌ 主循环异常: {e}", exc_info=True)
                    # After a database error the transaction is aborted and
                    # every further statement would fail until a rollback.
                    try:
                        work_conn.rollback()
                    except psycopg2.Error as rollback_error:
                        logger.error(f"❌ 回滚失败: {rollback_error}")
                    await asyncio.sleep(5)
    finally:
        # Also reached when the loop is left through an exception or task
        # cancellation, so the connections are always released.
        listen_conn.close()
        if work_conn is not None:
            work_conn.close()
        logger.info("Live Executor 已停止")
if __name__ == "__main__":
asyncio.run(main())