arbitrage-engine/backend/live_executor.py
dev-worker 18506f2a44 fix: P1/P2/P3剩余6项全部修复
P1-3: 前端持仓USDT从config读riskUsd(不再硬编码*2)
P1-4: 平仓兜底不盲目取最后成交,无明确平仓记录则延后结算
P2-1: LISTEN连接断线自动重建+重新LISTEN
P2-2: 余额风控LOW_BALANCE自动恢复(余额回升则解除暂停)
P2-3: fetch_pending_signals改用asyncio.to_thread避免阻塞事件循环
P3-1: dashboard页面改用新auth体系(authFetch+useAuth+/api/auth/me)
2026-03-02 16:19:03 +00:00

701 lines
26 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
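"""
Live Executor - live-trading execution module.
Listens on PG NOTIFY for new signals, then calls the Binance API to execute trades.
Architecture:
    signal_engine.py → NOTIFY new_signal → live_executor.py → Binance API
Does not affect any paper-trading (simulation) process.
"""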
import os
import sys
from pathlib import Path
try:
from dotenv import load_dotenv; load_dotenv(Path(__file__).parent / ".env")
except ImportError:
pass
import json
import time
import logging
import asyncio
import hashlib
import hmac
from urllib.parse import urlencode
import psycopg2
import psycopg2.extensions
import aiohttp
# ============ Configuration ============
# Environment: testnet / production
TRADE_ENV = os.getenv("TRADE_ENV", "testnet")
# Binance API endpoints, keyed by TRADE_ENV
BINANCE_ENDPOINTS = {
    "testnet": "https://testnet.binancefuture.com",
    "production": "https://fapi.binance.com",
}
# NOTE: a TRADE_ENV value that is not a key above raises KeyError at import time.
BASE_URL = BINANCE_ENDPOINTS[TRADE_ENV]
# Database
# The built-in default password applies only when TRADE_ENV is "testnet";
# in any other environment DB_PASSWORD must be set or the process exits.
_DB_PASSWORD = os.getenv("DB_PASSWORD", "arb_engine_2026" if TRADE_ENV == "testnet" else "")
if not _DB_PASSWORD:
    print("FATAL: DB_PASSWORD 未设置(生产环境必须配置)", file=sys.stderr)
    sys.exit(1)
DB_CONFIG = {
    "host": os.getenv("DB_HOST", "10.106.0.3"),
    "port": int(os.getenv("DB_PORT", "5432")),
    "dbname": os.getenv("DB_NAME", "arb_engine"),
    "user": os.getenv("DB_USER", "arb"),
    "password": _DB_PASSWORD,
}
# Strategies: LIVE_STRATEGIES is parsed as JSON (default is a one-element list)
ENABLED_STRATEGIES = json.loads(os.getenv("LIVE_STRATEGIES", '["v52_8signals"]'))
# Risk parameters (these defaults are overwritten by reload_live_config from the live_config table)
RISK_PER_TRADE_USD = float(os.getenv("RISK_PER_TRADE_USD", "2"))
MAX_POSITIONS = int(os.getenv("MAX_POSITIONS", "4"))
FEE_RATE = 0.0005 # Taker 0.05%
# Cache of live_config rows and the time.time() of the last successful refresh
_config_cache = {}
_config_cache_ts = 0
def reload_live_config(conn):
    """Refresh the risk parameters from the ``live_config`` table.

    The table is read at most once every 60 seconds; calls inside that window
    return immediately. On success the module globals ``RISK_PER_TRADE_USD``
    and ``MAX_POSITIONS`` are replaced and the refresh timestamp is advanced.
    On any failure the previous values stay in effect, a warning is logged and
    the refresh is retried on the next call.

    Args:
        conn: an open psycopg2 connection.
    """
    global RISK_PER_TRADE_USD, MAX_POSITIONS, _config_cache, _config_cache_ts
    now = time.time()
    if now - _config_cache_ts < 60:
        return
    try:
        cur = conn.cursor()
        try:
            cur.execute("SELECT key, value FROM live_config")
            rows = cur.fetchall()
        finally:
            cur.close()
        for row in rows:
            _config_cache[row[0]] = row[1]
        # Parse both values before publishing either one, so a malformed row
        # can never leave the two globals half-updated.
        risk_usd = float(_config_cache.get("risk_per_trade_usd", "2"))
        max_positions = int(_config_cache.get("max_positions", "4"))
    except psycopg2.Error as e:
        logger.warning(f"读取live_config失败: {e}")
        # A failed statement leaves a non-autocommit connection in an aborted
        # transaction; roll back so later queries on this connection still work.
        try:
            conn.rollback()
        except Exception:
            pass  # connection is unusable; the caller's reconnect logic takes over
        return
    except Exception as e:
        # e.g. a non-numeric value stored in live_config
        logger.warning(f"读取live_config失败: {e}")
        return
    RISK_PER_TRADE_USD = risk_usd
    MAX_POSITIONS = max_positions
    _config_cache_ts = now
    logger.info(f"📋 配置刷新: 1R=${RISK_PER_TRADE_USD} | 最大持仓={MAX_POSITIONS}")
# Per-symbol precision (imported from the shared config module)
from trade_config import SYMBOL_PRECISION
# Logging: root logger configured via basicConfig, plus a rotating file
# handler on the "live-executor" logger (10 MiB per file, 5 backups).
from logging.handlers import RotatingFileHandler
_log_fmt = "%(asctime)s [%(levelname)s] %(name)s: %(message)s"
logging.basicConfig(level=logging.INFO, format=_log_fmt)
logger = logging.getLogger("live-executor")
# NOTE(review): relative path — presumably the process is started from a
# directory that already contains logs/; confirm against the deployment.
_fh = RotatingFileHandler("logs/live_executor.log", maxBytes=10*1024*1024, backupCount=5)
_fh.setFormatter(logging.Formatter(_log_fmt))
logger.addHandler(_fh)
# ============ API key management ============
# Both values are populated by load_api_keys().
_api_key = None
_secret_key = None
def load_api_keys():
    """Populate the module-level Binance API key and secret.

    Environment variables BINANCE_API_KEY / BINANCE_SECRET_KEY are used when
    both are present. Otherwise the pair is read from GCP Secret Manager,
    using the ``binance-testnet`` or ``binance-live`` secret prefix depending
    on TRADE_ENV. Any failure while reading from Secret Manager is logged and
    terminates the process with exit code 1.
    """
    global _api_key, _secret_key
    # Environment variables take priority (local development / testing).
    _api_key, _secret_key = os.getenv("BINANCE_API_KEY"), os.getenv("BINANCE_SECRET_KEY")
    if not (_api_key and _secret_key):
        # Fall back to GCP Secret Manager.
        try:
            from google.cloud import secretmanager
            sm_client = secretmanager.SecretManagerServiceClient()
            project = os.getenv("GCP_PROJECT", "gen-lang-client-0835616737")
            prefix = "binance-live" if TRADE_ENV != "testnet" else "binance-testnet"

            def _read_latest(suffix):
                # Fetch and decode the latest version of "<prefix>-<suffix>".
                resource = f"projects/{project}/secrets/{prefix}-{suffix}/versions/latest"
                return sm_client.access_secret_version(name=resource).payload.data.decode()

            _api_key = _read_latest("api-key")
            _secret_key = _read_latest("secret-key")
            logger.info(f"API Key loaded from GCP Secret Manager ({prefix}, first 10: {_api_key[:10]}...)")
        except Exception as e:
            logger.error(f"Failed to load API keys: {e}")
            sys.exit(1)
        return
    logger.info(f"API Key loaded from env (first 10: {_api_key[:10]}...)")
# ============ 币安API工具 ============
def sign_params(params: dict) -> dict:
    """Add a millisecond timestamp and an HMAC-SHA256 signature to *params*.

    The signature is computed over the URL-encoded parameters (timestamp
    included) with the module-level secret key. The dict is modified in place
    and also returned.
    """
    params["timestamp"] = int(time.time() * 1000)
    payload = urlencode(params)
    mac = hmac.new(_secret_key.encode(), payload.encode(), hashlib.sha256)
    params["signature"] = mac.hexdigest()
    return params
async def binance_request(session: aiohttp.ClientSession, method: str, path: str, params: dict = None, signed: bool = True):
    """Send one request to the Binance API.

    Args:
        session: aiohttp client session used for the call.
        method: "GET", "POST" or "DELETE".
        path: endpoint path appended to BASE_URL.
        params: query parameters; the caller's dict is copied, never mutated.
        signed: when True, a timestamp and HMAC signature are added.

    Returns:
        A ``(data, status)`` tuple. ``data`` is the decoded JSON body and
        ``status`` the HTTP status code. If the request cannot be performed
        (transport error, undecodable body, unsupported method) the result is
        ``({"error": <message>}, 500)`` so callers can always unpack two values.
    """
    if method not in ("GET", "POST", "DELETE"):
        # Previously this case fell through and returned None, which made
        # every caller's `data, status = ...` unpacking raise TypeError.
        logger.error(f"Binance request failed: unsupported HTTP method {method}")
        return {"error": f"unsupported HTTP method {method}"}, 500
    url = f"{BASE_URL}{path}"
    headers = {"X-MBX-APIKEY": _api_key}
    # Work on a copy: signing adds timestamp/signature keys, and re-signing a
    # dict that already carries an old signature would produce an invalid one.
    request_params = dict(params) if params else {}
    if signed:
        request_params = sign_params(request_params)
    try:
        async with session.request(method, url, params=request_params, headers=headers) as resp:
            data = await resp.json()
            if resp.status != 200:
                logger.error(f"Binance API error: {resp.status} {data}")
            return data, resp.status
    except Exception as e:
        logger.error(f"Binance request failed: {e}")
        return {"error": str(e)}, 500
# ============ 交易执行 ============
async def set_leverage_and_margin(session: aiohttp.ClientSession, symbol: str, leverage: int = 20):
    """Switch *symbol* to isolated margin and then set its leverage.

    Both requests are sent in order; their responses are not inspected.
    """
    setup_calls = (
        # margin type first, then leverage
        ("/fapi/v1/marginType", {"symbol": symbol, "marginType": "ISOLATED"}),
        ("/fapi/v1/leverage", {"symbol": symbol, "leverage": leverage}),
    )
    for endpoint, payload in setup_calls:
        await binance_request(session, "POST", endpoint, payload)
    logger.info(f"[{symbol}] 设置逐仓 + {leverage}x杠杆")
async def place_market_order(session: aiohttp.ClientSession, symbol: str, side: str, quantity: float):
    """Open a position with a market order.

    Args:
        session: aiohttp client session.
        symbol: trading pair.
        side: "BUY" or "SELL".
        quantity: order size; formatted to the symbol's quantity precision
            (3 decimals when the symbol has no entry in SYMBOL_PRECISION).

    Returns:
        ``(data, status, t_submit, t_ack)`` where the two timestamps are
        wall-clock milliseconds taken just before and just after the request.
    """
    prec = SYMBOL_PRECISION.get(symbol, {"qty": 3})
    qty_str = f"{quantity:.{prec['qty']}f}"
    params = {
        "symbol": symbol,
        "side": side,
        "type": "MARKET",
        "quantity": qty_str,
        # Ask for the final order result. With the default ACK response the
        # avgPrice field is "0", so the caller would record a fill price of 0.
        "newOrderRespType": "RESULT",
    }
    t_submit = time.time() * 1000
    data, status = await binance_request(session, "POST", "/fapi/v1/order", params)
    t_ack = time.time() * 1000
    if status == 200:
        logger.info(f"[{symbol}] ✅ 市价{side} {qty_str}个 | orderId={data.get('orderId')} | 延迟={t_ack-t_submit:.0f}ms")
    return data, status, t_submit, t_ack
async def place_stop_order(session: aiohttp.ClientSession, symbol: str, side: str, stop_price: float, quantity: float, order_type: str = "STOP_MARKET"):
    """Place a reduce-only stop-loss / take-profit order.

    The order is first sent to the regular order endpoint. If that answers
    HTTP 400 with error code -4120, the same order is re-sent to the Algo
    Order endpoint. Returns the ``(data, status)`` of the last request made.
    """
    prec = SYMBOL_PRECISION.get(symbol, {"qty": 3, "price": 2})
    qty_str = f"{quantity:.{prec['qty']}f}"
    price_str = f"{stop_price:.{prec['price']}f}"

    def _fresh_payload():
        # A new dict per request: the request layer adds signing fields to it.
        return {
            "symbol": symbol,
            "side": side,
            "type": order_type,
            "stopPrice": price_str,
            "quantity": qty_str,
            "reduceOnly": "true",
        }

    # Regular endpoint first.
    data, status = await binance_request(session, "POST", "/fapi/v1/order", _fresh_payload())
    needs_algo_endpoint = status == 400 and isinstance(data, dict) and data.get("code") == -4120
    if needs_algo_endpoint:
        # Not supported there (-4120): fall back to the Algo Order API.
        logger.info(f"[{symbol}] 传统endpoint不支持{order_type}切换Algo Order API")
        data, status = await binance_request(session, "POST", "/fapi/v1/algo/order", _fresh_payload())
    if status == 200:
        logger.info(f"[{symbol}] 📌 挂{order_type} {side} @ {price_str} qty={qty_str} | orderId={data.get('orderId')}")
    return data, status
async def cancel_all_orders(session: aiohttp.ClientSession, symbol: str):
    """Cancel every open order for *symbol*; returns the request's ``(data, status)``."""
    response = await binance_request(session, "DELETE", "/fapi/v1/allOpenOrders", {"symbol": symbol})
    data, status = response
    if status != 200:
        return data, status
    logger.info(f"[{symbol}] 🗑 已取消所有挂单")
    return data, status
async def get_position(session: aiohttp.ClientSession, symbol: str):
    """Return the first position entry for *symbol* with a non-zero amount.

    Returns None when the request fails, the response is not a list, or no
    entry for the symbol has a non-zero ``positionAmt``.
    """
    data, status = await binance_request(session, "GET", "/fapi/v2/positionRisk", {"symbol": symbol})
    if status != 200 or not isinstance(data, list):
        return None
    open_positions = (
        entry
        for entry in data
        if entry.get("symbol") == symbol and float(entry.get("positionAmt", 0)) != 0
    )
    return next(open_positions, None)
async def get_account_balance(session: aiohttp.ClientSession):
    """Return the USDT ``availableBalance`` as a float.

    Returns 0 when the request fails, the response is not a list, or the
    list has no USDT entry.
    """
    data, status = await binance_request(session, "GET", "/fapi/v2/balance")
    if status != 200 or not isinstance(data, list):
        return 0
    usdt_entry = next((entry for entry in data if entry.get("asset") == "USDT"), None)
    if usdt_entry is None:
        return 0
    return float(usdt_entry.get("availableBalance", 0))
# ============ 核心:开仓流程 ============
async def execute_entry(session: aiohttp.ClientSession, signal: dict, db_conn):
    """
    Run the complete entry flow for one signal.

    Steps:
    0. Reject stale signals and honour the risk-guard / emergency state files
    1. Check balance
    2. Check the number of active positions and same-symbol positions
    3. Compute the position size
    4. Market entry
    5. Place SL + TP1 + TP2 protection orders
    6. Insert the trade into live_trades together with latency metrics

    Args:
        session: aiohttp client session for Binance calls.
        signal: signal dict; must contain symbol, direction, score, strategy,
            risk_distance, sl_price, tp1_price, tp2_price and signal_ts (ms).
        db_conn: open psycopg2 connection used for reads and the final insert.

    Returns:
        The new live_trades id, or None when the entry was rejected, the
        market order failed, or the stop-loss could not be placed.
    """
    symbol = signal["symbol"]
    direction = signal["direction"]
    score = signal["score"]
    tier = signal.get("tier", "standard")
    strategy = signal["strategy"]
    risk_distance = signal["risk_distance"]
    sl_price = signal["sl_price"]
    tp1_price = signal["tp1_price"]
    tp2_price = signal["tp2_price"]
    atr = signal.get("atr", 0)
    signal_ts = signal["signal_ts"]
    signal_id = signal.get("signal_id")
    factors = signal.get("factors")
    t_signal = signal_ts  # signal timestamp (ms)
    # Refresh config (reads the DB at most once every 60 seconds)
    reload_live_config(db_conn)
    # 0. Freshness check: drop signals older than 2 seconds
    SIGNAL_MAX_AGE_MS = 2000
    signal_age_ms = time.time() * 1000 - t_signal
    if signal_age_ms > SIGNAL_MAX_AGE_MS:
        logger.warning(f"[{symbol}] ❌ 信号过期: {signal_age_ms:.0f}ms > {SIGNAL_MAX_AGE_MS}ms弃仓")
        _log_event(db_conn, "warn", "trade",
                   f"信号过期弃仓 {direction} {symbol} | age={signal_age_ms:.0f}ms | score={score}",
                   symbol, {"signal_age_ms": round(signal_age_ms), "score": score})
        return None
    # 0.5 Risk-guard state (state file written by risk_guard)
    try:
        with open("/tmp/risk_guard_state.json") as f:
            risk_state = json.load(f)
        if risk_state.get("block_new_entries"):
            logger.warning(f"[{symbol}] ❌ 风控禁止新开仓: {risk_state.get('circuit_break_reason', '未知原因')}")
            return None
        if risk_state.get("reduce_only"):
            logger.warning(f"[{symbol}] ❌ 只减仓模式,拒绝开仓")
            return None
    except FileNotFoundError:
        pass  # risk_guard has not started yet: trading is allowed
    except Exception as e:
        logger.warning(f"[{symbol}] ⚠ 读取风控状态失败: {e},继续交易")
    # Emergency instruction written by the frontend
    try:
        with open("/tmp/risk_guard_emergency.json") as f:
            emergency = json.load(f)
        action = emergency.get("action")
        if action in ("close_all", "block_new"):
            logger.warning(f"[{symbol}] ❌ 紧急指令生效: {action},拒绝开仓")
            return None
    except FileNotFoundError:
        pass
    except Exception as e:
        # Same policy as the state file above: keep trading, but make an
        # unreadable emergency file visible instead of swallowing it.
        logger.warning(f"[{symbol}] ⚠ failed to read emergency instruction file: {e}; continuing")
    # 1. Balance check
    balance = await get_account_balance(session)
    if balance < RISK_PER_TRADE_USD * 2:
        logger.warning(f"[{symbol}] ❌ 余额不足: ${balance:.2f} < ${RISK_PER_TRADE_USD * 2}")
        return None
    # 2. Position-count check
    # NOTE(review): only status='active' is counted here while the same-symbol
    # check below also treats 'tp1_hit' as open — confirm this is intended.
    cur = db_conn.cursor()
    cur.execute("SELECT count(*) FROM live_trades WHERE strategy=%s AND status='active'", (strategy,))
    active_count = cur.fetchone()[0]
    if active_count >= MAX_POSITIONS:
        logger.warning(f"[{symbol}] ❌ 已达最大持仓数 {MAX_POSITIONS}")
        return None
    # 3. Same-symbol check (across strategies: in Binance one-way mode all
    #    strategies share one net position per symbol)
    cur.execute("SELECT id, strategy FROM live_trades WHERE symbol=%s AND status IN ('active', 'tp1_hit')", (symbol,))
    existing = cur.fetchone()
    if existing:
        logger.info(f"[{symbol}] ⏭ 已有活跃持仓(id={existing[0]}, strategy={existing[1]}),跳过")
        return None
    # 4. Position size: quantity = risk_usd / risk_distance
    if not risk_distance or risk_distance <= 0:
        # Guards the division below (previously a ZeroDivisionError).
        logger.warning(f"[{symbol}] ❌ invalid risk_distance={risk_distance}, entry skipped")
        return None
    prec = SYMBOL_PRECISION.get(symbol, {"qty": 3})
    qty = round(RISK_PER_TRADE_USD / risk_distance, prec["qty"])
    if qty <= 0:
        # A quantity that rounds to zero can only produce a rejected order.
        logger.warning(f"[{symbol}] ❌ quantity rounds to 0 at precision {prec['qty']}, entry skipped")
        return None
    # 5. Leverage and isolated margin
    await set_leverage_and_margin(session, symbol)
    entry_side = "BUY" if direction == "LONG" else "SELL"
    # 6. Market entry
    order_data, order_status, t_submit, _t_ack = await place_market_order(session, symbol, entry_side, qty)
    if order_status != 200:
        logger.error(f"[{symbol}] ❌ 开仓失败: {order_data}")
        return None
    # Parse the fill
    order_id = str(order_data.get("orderId", ""))
    fill_price = float(order_data.get("avgPrice", 0))
    if fill_price == 0:
        fill_price = float(order_data.get("price", 0))
    t_fill = time.time() * 1000
    signal_to_order_ms = int(t_submit - t_signal)
    order_to_fill_ms = int(t_fill - t_submit)
    # Slippage in basis points, positive = worse than the signal price.
    # Left at 0 when either price is unknown (0) so no bogus value is stored.
    signal_price = signal.get("signal_price", fill_price)
    if signal_price > 0 and fill_price > 0:
        if direction == "LONG":
            slippage_bps = (fill_price - signal_price) / signal_price * 10000
        else:
            slippage_bps = (signal_price - fill_price) / signal_price * 10000
    else:
        slippage_bps = 0
    logger.info(f"[{symbol}] 📊 成交价={fill_price} | 信号价={signal_price} | 滑点={slippage_bps:.1f}bps | 信号→下单={signal_to_order_ms}ms | 下单→成交={order_to_fill_ms}ms")
    # 7. Protection orders: SL (full size) + TP1 (half) + TP2 (remaining half)
    close_side = "SELL" if direction == "LONG" else "BUY"
    half_qty = round(qty / 2, prec["qty"])
    other_half = round(qty - half_qty, prec["qty"])
    # SL for the full size: up to 2 retries; after 3 failures close the position
    sl_data, sl_status = await place_stop_order(session, symbol, close_side, sl_price, qty, "STOP_MARKET")
    if sl_status != 200:
        for retry in range(2):
            logger.warning(f"[{symbol}] SL挂单重试 {retry+1}/2...")
            await asyncio.sleep(0.3)
            sl_data, sl_status = await place_stop_order(session, symbol, close_side, sl_price, qty, "STOP_MARKET")
            if sl_status == 200:
                break
        if sl_status != 200:
            logger.error(f"[{symbol}] ❌ SL 3次全部失败紧急reduceOnly平仓! data={sl_data}")
            # Use the real exchange position size and a reduceOnly market
            # order so the close can never open a position the other way.
            emergency_pos = await get_position(session, symbol)
            if emergency_pos:
                emergency_amt = abs(float(emergency_pos.get("positionAmt", 0)))
                emergency_prec = SYMBOL_PRECISION.get(symbol, {"qty": 3})
                emergency_qty_str = f"{emergency_amt:.{emergency_prec['qty']}f}"
                close_data, close_status = await binance_request(session, "POST", "/fapi/v1/order", {
                    "symbol": symbol, "side": close_side, "type": "MARKET",
                    "quantity": emergency_qty_str, "reduceOnly": "true",
                })
                if close_status != 200:
                    logger.error(f"[{symbol}] ❌ 紧急平仓也失败! close_data={close_data}")
                    _log_event(db_conn, "critical", "trade",
                               f"SL失败后紧急平仓也失败需人工介入", symbol,
                               {"sl_data": str(sl_data), "close_data": str(close_data)})
                else:
                    logger.info(f"[{symbol}] ✅ 紧急reduceOnly平仓成功 qty={emergency_qty_str}")
                    _log_event(db_conn, "critical", "trade", f"SL挂单3次失败已紧急reduceOnly平仓", symbol, {"sl_data": str(sl_data)})
            else:
                logger.warning(f"[{symbol}] SL失败但币安已无持仓无需平仓")
                _log_event(db_conn, "critical", "trade", f"SL挂单3次失败但币安无持仓", symbol, {"sl_data": str(sl_data)})
            return None
    t_after_sl = time.time() * 1000
    protection_gap_ms = int(t_after_sl - t_fill)  # time the position was unprotected
    # TP1 and TP2, half size each. A failed take-profit does not abort the
    # entry (the SL is already in place) but is reported instead of ignored.
    tp1_type = "TAKE_PROFIT_MARKET"
    for tp_label, tp_price, tp_qty in (("TP1", tp1_price, half_qty), ("TP2", tp2_price, other_half)):
        tp_data, tp_status = await place_stop_order(session, symbol, close_side, tp_price, tp_qty, tp1_type)
        if tp_status != 200:
            logger.error(f"[{symbol}] ❌ {tp_label} order failed (status={tp_status}): {tp_data}")
    logger.info(f"[{symbol}] 🛡 保护单已挂 | SL={sl_price} TP1={tp1_price}(半仓) TP2={tp2_price}(半仓) | 裸奔={protection_gap_ms}ms")
    # 8. Persist the trade
    cur.execute("""
        INSERT INTO live_trades (
            symbol, strategy, direction, entry_price, entry_ts,
            sl_price, tp1_price, tp2_price, score, tier, status,
            risk_distance, atr_at_entry, score_factors, signal_id,
            binance_order_id, fill_price, slippage_bps,
            protection_gap_ms, signal_to_order_ms, order_to_fill_ms,
            qty
        ) VALUES (
            %s, %s, %s, %s, %s,
            %s, %s, %s, %s, %s, 'active',
            %s, %s, %s, %s,
            %s, %s, %s,
            %s, %s, %s,
            %s
        ) RETURNING id
    """, (
        symbol, strategy, direction, fill_price, int(t_fill),
        sl_price, tp1_price, tp2_price, score, tier,
        risk_distance, atr, json.dumps(factors) if factors else None, signal_id,
        order_id, fill_price, round(slippage_bps, 2),
        protection_gap_ms, signal_to_order_ms, order_to_fill_ms,
        qty,
    ))
    trade_id = cur.fetchone()[0]
    db_conn.commit()
    logger.info(f"[{symbol}] ✅ 实盘开仓完成 | trade_id={trade_id} | {direction} @ {fill_price} | score={score} | 策略={strategy}")
    # Event log entry
    _log_event(db_conn, "info", "trade", f"{direction} {symbol} @ {fill_price} | score={score} | 滑点={slippage_bps:.1f}bps", symbol,
               {"trade_id": trade_id, "score": score, "fill_price": fill_price, "slippage_bps": round(slippage_bps, 2)})
    return trade_id
# ============ 事件日志 ============
def _log_event(conn, level, category, message, symbol=None, detail=None):
"""写live_events表"""
try:
cur = conn.cursor()
cur.execute(
"INSERT INTO live_events (level, category, symbol, message, detail) VALUES (%s, %s, %s, %s, %s)",
(level, category, symbol, message, json.dumps(detail) if detail else None)
)
conn.commit()
except Exception:
pass
# ============ 信号监听 ============
def get_db_connection():
    """Open a new database connection from DB_CONFIG, switched to autocommit."""
    autocommit = psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT
    connection = psycopg2.connect(**DB_CONFIG)
    connection.set_isolation_level(autocommit)
    return connection
def fetch_pending_signals(conn):
    """Return the recent signals that have not been turned into a live trade.

    Selects rows of ``signal_indicators`` from the last 60 seconds that have a
    non-empty signal, belong to one of ENABLED_STRATEGIES and have no
    ``live_trades`` row with the same signal id and strategy. Newest first.

    Args:
        conn: open psycopg2 connection.

    Returns:
        A list of dicts with keys signal_id, symbol, direction, score,
        signal_ts, factors, strategy and signal_price.
    """
    # NOTE(review): now() is the transaction start time; on a connection that
    # is not in autocommit mode the 60-second window is therefore anchored to
    # the start of the current transaction — confirm this is acceptable.
    with conn.cursor() as cur:
        # The strategy list is passed as a bound array parameter instead of
        # being spliced into the SQL text.
        cur.execute("""
            SELECT si.id, si.symbol, si.signal, si.score, si.ts, si.factors, si.strategy,
                   si.price
            FROM signal_indicators si
            WHERE si.signal IS NOT NULL
              AND si.signal != ''
              AND si.strategy = ANY(%s)
              AND si.ts > extract(epoch from now()) * 1000 - 60000
              AND NOT EXISTS (
                  SELECT 1 FROM live_trades lt
                  WHERE lt.signal_id = si.id AND lt.strategy = si.strategy
              )
            ORDER BY si.ts DESC
        """, (list(ENABLED_STRATEGIES),))
        rows = cur.fetchall()
    signals = []
    for row in rows:
        factors = row[5]
        if isinstance(factors, str):
            # factors may arrive as JSON text; unparseable text becomes None
            try:
                factors = json.loads(factors)
            except (TypeError, ValueError):
                factors = None
        signals.append({
            "signal_id": row[0],
            "symbol": row[1],
            "direction": row[2],
            "score": row[3],
            "signal_ts": row[4],
            "factors": factors,
            "strategy": row[6],
            "signal_price": float(row[7]) if row[7] else 0,
        })
    return signals
def enrich_signal_with_trade_params(signal: dict, conn):
    """Add ATR-based SL/TP prices to *signal* in place.

    The per-strategy ATR multipliers are hard-coded below. ATR and price are
    read from the ``signal_indicators`` row identified by ``signal["signal_id"]``.
    On success the keys atr, signal_price, risk_distance, sl_price, tp1_price
    and tp2_price are set on *signal*.

    Args:
        signal: dict with at least symbol, strategy, direction and signal_id.
        conn: open psycopg2 connection.

    Returns:
        True when the signal was enriched; False (with a warning logged) when
        the strategy is unknown or the row has no usable ATR or price.
    """
    # Same named logger as the module-level `logger`.
    log = logging.getLogger("live-executor")
    symbol = signal["symbol"]
    strategy = signal["strategy"]
    direction = signal["direction"]
    # Multipliers of ATR for stop-loss and the two take-profit levels
    config_map = {
        "v51_baseline": {"sl_multiplier": 1.4, "tp1_multiplier": 1.05, "tp2_multiplier": 2.1},
        "v52_8signals": {"sl_multiplier": 2.1, "tp1_multiplier": 1.4, "tp2_multiplier": 3.15},
    }
    cfg = config_map.get(strategy)
    if not cfg:
        log.warning(f"[{symbol}] 未知策略: {strategy}")
        return False
    # ATR and price come from the signal's own indicator row
    cur = conn.cursor()
    cur.execute("""
        SELECT atr_5m, price FROM signal_indicators
        WHERE id = %s
    """, (signal["signal_id"],))
    row = cur.fetchone()
    if not row or not row[0]:
        log.warning(f"[{symbol}] signal_id={signal['signal_id']} 无ATR数据")
        return False
    if row[1] is None or float(row[1]) <= 0:
        # A NULL price used to raise TypeError here and abort the whole batch.
        log.warning(f"[{symbol}] signal_id={signal['signal_id']} has no usable price")
        return False
    atr = float(row[0])
    price = float(row[1])
    signal["atr"] = atr
    signal["signal_price"] = price
    # SL/TP distances use the ATR directly (no 0.7 scaling)
    risk_distance = cfg["sl_multiplier"] * atr
    signal["risk_distance"] = risk_distance
    if direction == "LONG":
        signal["sl_price"] = price - risk_distance
        signal["tp1_price"] = price + cfg["tp1_multiplier"] * atr
        signal["tp2_price"] = price + cfg["tp2_multiplier"] * atr
    else:  # SHORT
        signal["sl_price"] = price + risk_distance
        signal["tp1_price"] = price - cfg["tp1_multiplier"] * atr
        signal["tp2_price"] = price - cfg["tp2_multiplier"] * atr
    return True
# ============ 主循环 ============
async def main():
    """Main loop: PG NOTIFY listener with a polling fallback.

    Loads the API keys, verifies API connectivity through a balance query,
    registers ``LISTEN new_signal`` and then loops: drain notifications, fetch
    pending signals, enrich each with SL/TP parameters and execute the entry.
    Pending signals are fetched on every iteration whether or not a
    notification arrived; the loop sleeps 1 second only when none did.

    A broken LISTEN connection is rebuilt and re-registered automatically.
    Both database connections are closed when the loop ends for any reason.
    """
    logger.info("=" * 60)
    logger.info(f"🚀 Live Executor 启动 | 环境={TRADE_ENV} | 策略={ENABLED_STRATEGIES}")
    logger.info(f" 风险/笔=${RISK_PER_TRADE_USD} | 最大持仓={MAX_POSITIONS}")
    logger.info(f" API端点={BASE_URL}")
    logger.info("=" * 60)
    load_api_keys()
    # API connectivity test
    async with aiohttp.ClientSession() as http_session:
        balance = await get_account_balance(http_session)
        logger.info(f"💰 账户余额: ${balance:.2f} USDT")
        if balance <= 0:
            logger.error("❌ 无法获取余额或余额为0请检查API Key")
            return

    def open_listen_conn():
        # Dedicated autocommit connection registered for the new_signal channel.
        conn = get_db_connection()
        conn.cursor().execute("LISTEN new_signal;")
        return conn

    def close_quietly(conn):
        # Closing an already-broken connection may raise; that is irrelevant here.
        try:
            conn.close()
        except Exception:
            pass

    def ensure_db_conn(conn):
        # Return conn if it still answers a trivial query, else a fresh connection.
        try:
            conn.cursor().execute("SELECT 1")
            return conn
        except Exception:
            logger.warning("⚠️ DB连接断开重连中...")
            close_quietly(conn)
            return psycopg2.connect(**DB_CONFIG)

    # DB connection used for LISTEN
    listen_conn = open_listen_conn()
    logger.info("👂 已注册PG LISTEN new_signal")
    # Working DB connection
    work_conn = psycopg2.connect(**DB_CONFIG)
    try:
        async with aiohttp.ClientSession() as http_session:
            while True:
                try:
                    # Check PG NOTIFY (non-blocking)
                    got_notify = False
                    try:
                        poll_state = listen_conn.poll()
                    except (psycopg2.OperationalError, psycopg2.InterfaceError) as e:
                        # LISTEN connection dropped: rebuild it and LISTEN again.
                        # If the rebuild itself fails, the outer handler waits
                        # 5 s and this path is retried on the next iteration.
                        logger.warning(f"LISTEN connection lost ({e}); reconnecting and re-registering LISTEN")
                        close_quietly(listen_conn)
                        listen_conn = open_listen_conn()
                        poll_state = None
                    if poll_state == psycopg2.extensions.POLL_OK:
                        while listen_conn.notifies:
                            notify = listen_conn.notifies.pop(0)
                            logger.info(f"📡 收到NOTIFY: {notify.payload}")
                            got_notify = True
                    work_conn = ensure_db_conn(work_conn)
                    # Fetch pending signals (NOTIFY and polling act as double insurance)
                    signals = await asyncio.to_thread(fetch_pending_signals, work_conn)
                    for sig in signals:
                        # Add TP/SL parameters
                        if not enrich_signal_with_trade_params(sig, work_conn):
                            continue
                        logger.info(f"[{sig['symbol']}] 🎯 新信号: {sig['direction']} score={sig['score']} strategy={sig['strategy']}")
                        # Execute the entry
                        trade_id = await execute_entry(http_session, sig, work_conn)
                        if trade_id:
                            logger.info(f"[{sig['symbol']}] ✅ trade_id={trade_id} 开仓成功")
                    if not got_notify:
                        # Sleep only when idle; after a NOTIFY go straight to the next round
                        await asyncio.sleep(1)
                except KeyboardInterrupt:
                    logger.info("🛑 收到退出信号")
                    break
                except Exception as e:
                    logger.error(f"❌ 主循环异常: {e}", exc_info=True)
                    await asyncio.sleep(5)
    finally:
        # Runs on normal exit, cancellation and unexpected errors alike.
        close_quietly(listen_conn)
        close_quietly(work_conn)
    logger.info("Live Executor 已停止")
# Script entry point: run the executor's async main loop.
if __name__ == "__main__":
    asyncio.run(main())