feat: live_executor.py + signal_engine NOTIFY + live_trades table

- live_executor.py: PG LISTEN/NOTIFY接收信号, 币安API开仓+挂保护单
- signal_engine: 有信号时发NOTIFY new_signal
- live_trades表: 含binance_order_id, slippage_bps, protection_gap_ms等实盘字段
- 支持GCP Secret Manager + env fallback读API Key
- 支持testnet/production环境切换
This commit is contained in:
root 2026-03-02 08:55:36 +00:00
parent a7dec3fe14
commit 21970038df
2 changed files with 553 additions and 0 deletions

550
backend/live_executor.py Normal file
View File

@ -0,0 +1,550 @@
#!/usr/bin/env python3
"""
Live Executor - 实盘交易执行模块
监听PG NOTIFY接收新信号调币安API执行交易
架构
signal_engine.py NOTIFY new_signal live_executor.py Binance API
不影响模拟盘任何进程
"""
import os
import sys
import json
import time
import logging
import asyncio
import hashlib
import hmac
from urllib.parse import urlencode
import psycopg2
import psycopg2.extensions
import aiohttp
# ============ 配置 ============
# 环境testnet / production
TRADE_ENV = os.getenv("TRADE_ENV", "testnet")
# 币安API端点
BINANCE_ENDPOINTS = {
"testnet": "https://testnet.binancefuture.com",
"production": "https://fapi.binance.com",
}
BASE_URL = BINANCE_ENDPOINTS[TRADE_ENV]
# 数据库
DB_CONFIG = {
"host": os.getenv("DB_HOST", "10.106.0.3"),
"port": int(os.getenv("DB_PORT", "5432")),
"dbname": os.getenv("DB_NAME", "arb_engine"),
"user": os.getenv("DB_USER", "arb"),
"password": os.getenv("DB_PASSWORD", "arb_engine_2026"),
}
# 策略
ENABLED_STRATEGIES = json.loads(os.getenv("LIVE_STRATEGIES", '["v52_8signals"]'))
# 风险参数
RISK_PER_TRADE_USD = float(os.getenv("RISK_PER_TRADE_USD", "2")) # 测试网$2=1R
MAX_POSITIONS = int(os.getenv("MAX_POSITIONS", "4"))
FEE_RATE = 0.0005 # Taker 0.05%
# 币种精度(币安要求)
SYMBOL_PRECISION = {
"BTCUSDT": {"qty": 3, "price": 1, "min_notional": 100},
"ETHUSDT": {"qty": 3, "price": 2, "min_notional": 20},
"XRPUSDT": {"qty": 0, "price": 4, "min_notional": 5},
"SOLUSDT": {"qty": 2, "price": 2, "min_notional": 5},
}
# 日志
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
logger = logging.getLogger("live-executor")
# ============ API Key管理 ============
_api_key = None
_secret_key = None
def load_api_keys():
"""从GCP Secret Manager或环境变量加载API Key"""
global _api_key, _secret_key
# 优先环境变量(本地开发/测试用)
_api_key = os.getenv("BINANCE_API_KEY")
_secret_key = os.getenv("BINANCE_SECRET_KEY")
if _api_key and _secret_key:
logger.info(f"API Key loaded from env (first 10: {_api_key[:10]}...)")
return
# 从GCP Secret Manager读取
try:
from google.cloud import secretmanager
client = secretmanager.SecretManagerServiceClient()
project = os.getenv("GCP_PROJECT", "gen-lang-client-0835616737")
prefix = "binance-testnet" if TRADE_ENV == "testnet" else "binance-live"
api_key_name = f"projects/{project}/secrets/{prefix}-api-key/versions/latest"
secret_key_name = f"projects/{project}/secrets/{prefix}-secret-key/versions/latest"
_api_key = client.access_secret_version(name=api_key_name).payload.data.decode()
_secret_key = client.access_secret_version(name=secret_key_name).payload.data.decode()
logger.info(f"API Key loaded from GCP Secret Manager ({prefix}, first 10: {_api_key[:10]}...)")
except Exception as e:
logger.error(f"Failed to load API keys: {e}")
sys.exit(1)
# ============ 币安API工具 ============
def sign_params(params: dict) -> dict:
"""HMAC SHA256签名"""
params["timestamp"] = int(time.time() * 1000)
query_string = urlencode(params)
signature = hmac.new(
_secret_key.encode(), query_string.encode(), hashlib.sha256
).hexdigest()
params["signature"] = signature
return params
async def binance_request(session: aiohttp.ClientSession, method: str, path: str, params: dict = None, signed: bool = True):
"""发送币安API请求"""
url = f"{BASE_URL}{path}"
headers = {"X-MBX-APIKEY": _api_key}
if params is None:
params = {}
if signed:
params = sign_params(params)
try:
if method == "GET":
async with session.get(url, params=params, headers=headers) as resp:
data = await resp.json()
if resp.status != 200:
logger.error(f"Binance API error: {resp.status} {data}")
return data, resp.status
elif method == "POST":
async with session.post(url, params=params, headers=headers) as resp:
data = await resp.json()
if resp.status != 200:
logger.error(f"Binance API error: {resp.status} {data}")
return data, resp.status
elif method == "DELETE":
async with session.delete(url, params=params, headers=headers) as resp:
data = await resp.json()
if resp.status != 200:
logger.error(f"Binance API error: {resp.status} {data}")
return data, resp.status
except Exception as e:
logger.error(f"Binance request failed: {e}")
return {"error": str(e)}, 500
# ============ 交易执行 ============
async def set_leverage_and_margin(session: aiohttp.ClientSession, symbol: str, leverage: int = 20):
"""设置杠杆和逐仓模式"""
# 设置逐仓
await binance_request(session, "POST", "/fapi/v1/marginType", {
"symbol": symbol, "marginType": "ISOLATED"
})
# 设置杠杆
await binance_request(session, "POST", "/fapi/v1/leverage", {
"symbol": symbol, "leverage": leverage
})
logger.info(f"[{symbol}] 设置逐仓 + {leverage}x杠杆")
async def place_market_order(session: aiohttp.ClientSession, symbol: str, side: str, quantity: float):
"""市价开仓"""
prec = SYMBOL_PRECISION.get(symbol, {"qty": 3})
qty_str = f"{quantity:.{prec['qty']}f}"
params = {
"symbol": symbol,
"side": side,
"type": "MARKET",
"quantity": qty_str,
}
t_submit = time.time() * 1000
data, status = await binance_request(session, "POST", "/fapi/v1/order", params)
t_ack = time.time() * 1000
if status == 200:
logger.info(f"[{symbol}] ✅ 市价{side} {qty_str}个 | orderId={data.get('orderId')} | 延迟={t_ack-t_submit:.0f}ms")
return data, status, t_submit, t_ack
async def place_stop_order(session: aiohttp.ClientSession, symbol: str, side: str, stop_price: float, quantity: float, order_type: str = "STOP_MARKET"):
"""挂止损/止盈单"""
prec = SYMBOL_PRECISION.get(symbol, {"qty": 3, "price": 2})
qty_str = f"{quantity:.{prec['qty']}f}"
price_str = f"{stop_price:.{prec['price']}f}"
params = {
"symbol": symbol,
"side": side,
"type": order_type,
"stopPrice": price_str,
"quantity": qty_str,
"reduceOnly": "true",
}
data, status = await binance_request(session, "POST", "/fapi/v1/order", params)
if status == 200:
logger.info(f"[{symbol}] 📌 挂{order_type} {side} @ {price_str} qty={qty_str} | orderId={data.get('orderId')}")
return data, status
async def cancel_all_orders(session: aiohttp.ClientSession, symbol: str):
"""取消某币种所有挂单"""
data, status = await binance_request(session, "DELETE", "/fapi/v1/allOpenOrders", {"symbol": symbol})
if status == 200:
logger.info(f"[{symbol}] 🗑 已取消所有挂单")
return data, status
async def get_position(session: aiohttp.ClientSession, symbol: str):
"""查询当前持仓"""
data, status = await binance_request(session, "GET", "/fapi/v2/positionRisk", {"symbol": symbol})
if status == 200 and isinstance(data, list):
for pos in data:
if pos.get("symbol") == symbol and float(pos.get("positionAmt", 0)) != 0:
return pos
return None
async def get_account_balance(session: aiohttp.ClientSession):
"""查询账户余额"""
data, status = await binance_request(session, "GET", "/fapi/v2/balance")
if status == 200 and isinstance(data, list):
for asset in data:
if asset.get("asset") == "USDT":
return float(asset.get("availableBalance", 0))
return 0
# ============ 核心:开仓流程 ============
async def execute_entry(session: aiohttp.ClientSession, signal: dict, db_conn):
"""
完整开仓流程
1. 检查余额和持仓数
2. 计算仓位大小
3. 市价开仓
4. 挂SL + TP1 + TP2保护单
5. 写live_trades表
6. 记录延迟指标
"""
symbol = signal["symbol"]
direction = signal["direction"]
score = signal["score"]
tier = signal.get("tier", "standard")
strategy = signal["strategy"]
risk_distance = signal["risk_distance"]
sl_price = signal["sl_price"]
tp1_price = signal["tp1_price"]
tp2_price = signal["tp2_price"]
atr = signal.get("atr", 0)
signal_ts = signal["signal_ts"]
signal_id = signal.get("signal_id")
factors = signal.get("factors")
t_signal = signal_ts # 信号时间戳(ms)
# 1. 检查余额
balance = await get_account_balance(session)
if balance < RISK_PER_TRADE_USD * 2:
logger.warning(f"[{symbol}] ❌ 余额不足: ${balance:.2f} < ${RISK_PER_TRADE_USD * 2}")
return None
# 2. 检查持仓数
cur = db_conn.cursor()
cur.execute("SELECT count(*) FROM live_trades WHERE strategy=%s AND status='active'", (strategy,))
active_count = cur.fetchone()[0]
if active_count >= MAX_POSITIONS:
logger.warning(f"[{symbol}] ❌ 已达最大持仓数 {MAX_POSITIONS}")
return None
# 3. 检查是否已有同币种同方向持仓
cur.execute("SELECT id FROM live_trades WHERE symbol=%s AND strategy=%s AND status='active'", (symbol, strategy))
if cur.fetchone():
logger.info(f"[{symbol}] ⏭ 已有活跃持仓,跳过")
return None
# 4. 设置杠杆和逐仓
await set_leverage_and_margin(session, symbol)
# 5. 计算仓位
# position_size(个) = risk_usd / risk_distance
qty = RISK_PER_TRADE_USD / risk_distance
prec = SYMBOL_PRECISION.get(symbol, {"qty": 3})
qty = round(qty, prec["qty"])
# 检查最小名义值
# 需要当前价来估算
entry_side = "BUY" if direction == "LONG" else "SELL"
# 6. 市价开仓
t_before_order = time.time() * 1000
order_data, order_status, t_submit, t_ack = await place_market_order(session, symbol, entry_side, qty)
if order_status != 200:
logger.error(f"[{symbol}] ❌ 开仓失败: {order_data}")
return None
# 解析成交信息
order_id = str(order_data.get("orderId", ""))
fill_price = float(order_data.get("avgPrice", 0))
if fill_price == 0:
fill_price = float(order_data.get("price", 0))
t_fill = time.time() * 1000
signal_to_order_ms = int(t_submit - t_signal)
order_to_fill_ms = int(t_fill - t_submit)
# 计算滑点
signal_price = signal.get("signal_price", fill_price)
if signal_price > 0:
if direction == "LONG":
slippage_bps = (fill_price - signal_price) / signal_price * 10000
else:
slippage_bps = (signal_price - fill_price) / signal_price * 10000
else:
slippage_bps = 0
logger.info(f"[{symbol}] 📊 成交价={fill_price} | 信号价={signal_price} | 滑点={slippage_bps:.1f}bps | 信号→下单={signal_to_order_ms}ms | 下单→成交={order_to_fill_ms}ms")
# 7. 挂保护单SL + TP1半仓 + TP2半仓
close_side = "SELL" if direction == "LONG" else "BUY"
half_qty = round(qty / 2, prec["qty"])
other_half = round(qty - half_qty, prec["qty"])
# SL - 全仓
t_before_sl = time.time() * 1000
sl_data, sl_status = await place_stop_order(session, symbol, close_side, sl_price, qty, "STOP_MARKET")
t_after_sl = time.time() * 1000
protection_gap_ms = int(t_after_sl - t_fill)
if sl_status != 200:
logger.error(f"[{symbol}] ⚠️ SL挂单失败! 裸奔中! data={sl_data}")
# TODO: 自动补挂逻辑
# TP1 - 半仓
tp1_type = "TAKE_PROFIT_MARKET"
await place_stop_order(session, symbol, close_side, tp1_price, half_qty, tp1_type)
# TP2 - 半仓
await place_stop_order(session, symbol, close_side, tp2_price, other_half, tp1_type)
logger.info(f"[{symbol}] 🛡 保护单已挂 | SL={sl_price} TP1={tp1_price}(半仓) TP2={tp2_price}(半仓) | 裸奔={protection_gap_ms}ms")
# 8. 写DB
cur.execute("""
INSERT INTO live_trades (
symbol, strategy, direction, entry_price, entry_ts,
sl_price, tp1_price, tp2_price, score, tier, status,
risk_distance, atr_at_entry, score_factors, signal_id,
binance_order_id, fill_price, slippage_bps,
protection_gap_ms, signal_to_order_ms, order_to_fill_ms
) VALUES (
%s, %s, %s, %s, %s,
%s, %s, %s, %s, %s, 'active',
%s, %s, %s, %s,
%s, %s, %s,
%s, %s, %s
) RETURNING id
""", (
symbol, strategy, direction, fill_price, int(t_fill),
sl_price, tp1_price, tp2_price, score, tier,
risk_distance, atr, json.dumps(factors) if factors else None, signal_id,
order_id, fill_price, round(slippage_bps, 2),
protection_gap_ms, signal_to_order_ms, order_to_fill_ms,
))
trade_id = cur.fetchone()[0]
db_conn.commit()
logger.info(f"[{symbol}] ✅ 实盘开仓完成 | trade_id={trade_id} | {direction} @ {fill_price} | score={score} | 策略={strategy}")
return trade_id
# ============ 信号监听 ============
def get_db_connection():
"""获取DB连接"""
conn = psycopg2.connect(**DB_CONFIG)
conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
return conn
def fetch_pending_signals(conn):
"""查询未处理的新信号"""
cur = conn.cursor()
# 查最近60秒内的新信号score>=阈值、策略匹配、未被live_executor处理过
strategies_str = ",".join(f"'{s}'" for s in ENABLED_STRATEGIES)
cur.execute(f"""
SELECT si.id, si.symbol, si.signal, si.score, si.ts, si.factors, si.strategy,
si.price
FROM signal_indicators si
WHERE si.signal IS NOT NULL
AND si.signal != ''
AND si.strategy IN ({strategies_str})
AND si.ts > extract(epoch from now()) * 1000 - 60000
AND NOT EXISTS (
SELECT 1 FROM live_trades lt
WHERE lt.signal_id = si.id AND lt.strategy = si.strategy
)
ORDER BY si.ts DESC
""")
rows = cur.fetchall()
signals = []
for row in rows:
factors = row[5]
if isinstance(factors, str):
try:
factors = json.loads(factors)
except:
factors = None
signals.append({
"signal_id": row[0],
"symbol": row[1],
"direction": row[2],
"score": row[3],
"signal_ts": row[4],
"factors": factors,
"strategy": row[6],
"signal_price": float(row[7]) if row[7] else 0,
})
return signals
def enrich_signal_with_trade_params(signal: dict, conn):
"""从策略配置计算TP/SL参数"""
symbol = signal["symbol"]
strategy = signal["strategy"]
direction = signal["direction"]
# 读策略配置JSON
config_map = {
"v51_baseline": {"sl_multiplier": 1.4, "tp1_multiplier": 1.05, "tp2_multiplier": 2.1},
"v52_8signals": {"sl_multiplier": 2.1, "tp1_multiplier": 1.4, "tp2_multiplier": 3.15},
}
cfg = config_map.get(strategy)
if not cfg:
logger.warning(f"[{symbol}] 未知策略: {strategy}")
return False
# 从signal_indicators读ATR和价格
cur = conn.cursor()
cur.execute("""
SELECT atr_5m, price FROM signal_indicators
WHERE id = %s
""", (signal["signal_id"],))
row = cur.fetchone()
if not row or not row[0]:
logger.warning(f"[{symbol}] signal_id={signal['signal_id']} 无ATR数据")
return False
atr = float(row[0])
price = float(row[1])
signal["atr"] = atr
signal["signal_price"] = price
# 计算SL/TP直接用ATR不乘0.7
risk_distance = cfg["sl_multiplier"] * atr
signal["risk_distance"] = risk_distance
if direction == "LONG":
signal["sl_price"] = price - cfg["sl_multiplier"] * atr
signal["tp1_price"] = price + cfg["tp1_multiplier"] * atr
signal["tp2_price"] = price + cfg["tp2_multiplier"] * atr
else: # SHORT
signal["sl_price"] = price + cfg["sl_multiplier"] * atr
signal["tp1_price"] = price - cfg["tp1_multiplier"] * atr
signal["tp2_price"] = price - cfg["tp2_multiplier"] * atr
return True
# ============ 主循环 ============
async def main():
"""主循环监听PG NOTIFY + 定时轮询fallback"""
logger.info("=" * 60)
logger.info(f"🚀 Live Executor 启动 | 环境={TRADE_ENV} | 策略={ENABLED_STRATEGIES}")
logger.info(f" 风险/笔=${RISK_PER_TRADE_USD} | 最大持仓={MAX_POSITIONS}")
logger.info(f" API端点={BASE_URL}")
logger.info("=" * 60)
load_api_keys()
# 测试API连通性
async with aiohttp.ClientSession() as http_session:
balance = await get_account_balance(http_session)
logger.info(f"💰 账户余额: ${balance:.2f} USDT")
if balance <= 0:
logger.error("❌ 无法获取余额或余额为0请检查API Key")
return
# DB连接用于LISTEN
listen_conn = get_db_connection()
cur = listen_conn.cursor()
cur.execute("LISTEN new_signal;")
logger.info("👂 已注册PG LISTEN new_signal")
# 工作DB连接
work_conn = psycopg2.connect(**DB_CONFIG)
async with aiohttp.ClientSession() as http_session:
while True:
try:
# 检查PG NOTIFY非阻塞超时1秒
if listen_conn.poll() == psycopg2.extensions.POLL_OK:
while listen_conn.notifies:
notify = listen_conn.notifies.pop(0)
logger.info(f"📡 收到NOTIFY: {notify.payload}")
# 获取待处理信号NOTIFY + 轮询双保险)
signals = fetch_pending_signals(work_conn)
for sig in signals:
# 补充TP/SL参数
if not enrich_signal_with_trade_params(sig, work_conn):
continue
logger.info(f"[{sig['symbol']}] 🎯 新信号: {sig['direction']} score={sig['score']} strategy={sig['strategy']}")
# 执行开仓
trade_id = await execute_entry(http_session, sig, work_conn)
if trade_id:
logger.info(f"[{sig['symbol']}] ✅ trade_id={trade_id} 开仓成功")
await asyncio.sleep(1) # 1秒轮询作为fallback
except KeyboardInterrupt:
logger.info("🛑 收到退出信号")
break
except Exception as e:
logger.error(f"❌ 主循环异常: {e}", exc_info=True)
await asyncio.sleep(5)
listen_conn.close()
work_conn.close()
logger.info("Live Executor 已停止")
if __name__ == "__main__":
asyncio.run(main())

View File

@ -697,6 +697,9 @@ def save_indicator(ts: int, symbol: str, result: dict, strategy: str = "v52_8sig
result["atr"], result["atr_pct"], result["vwap"], result["price"],
result["p95"], result["p99"], result["score"], result.get("signal"), factors_json)
)
# 有信号时通知live_executor
if result.get("signal"):
cur.execute("NOTIFY new_signal, %s", (f"{symbol}:{strategy}:{result['signal']}:{result['score']}",))
conn.commit()