arbitrage-engine/backend/signal_engine.py

488 lines
19 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
signal_engine.py — V5 短线交易信号引擎PostgreSQL版
架构:
- 独立PM2进程每5秒循环
- 内存滚动窗口计算指标CVD/ATR/VWAP/大单阈值)
- 启动时回灌历史数据冷启动warmup
- 信号评估核心3条件+加分3条件
- 输出signal_indicators表 + signal_trades表 + Discord推送
指标:
- CVD_fast (30m滚动) / CVD_mid (4h滚动) / CVD_day (UTC日内)
- ATR (5m, 14周期)
- VWAP_30m
- 大单阈值 P95/P99 (24h滚动)
"""
import logging
import os
import time
from collections import deque
from datetime import datetime, timezone
from typing import Any, Optional
from db import get_sync_conn, init_schema
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
handlers=[
logging.StreamHandler(),
logging.FileHandler(os.path.join(os.path.dirname(__file__), "..", "signal-engine.log")),
],
)
logger = logging.getLogger("signal-engine")
SYMBOLS = ["BTCUSDT", "ETHUSDT"]
LOOP_INTERVAL = 15 # 秒从5改15CPU降60%,信号质量无影响)
# 窗口大小(毫秒)
WINDOW_FAST = 30 * 60 * 1000 # 30分钟
WINDOW_MID = 4 * 3600 * 1000 # 4小时
WINDOW_DAY = 24 * 3600 * 1000 # 24小时
WINDOW_VWAP = 30 * 60 * 1000 # 30分钟
# ATR参数
ATR_PERIOD_MS = 5 * 60 * 1000
ATR_LENGTH = 14
# 信号冷却
COOLDOWN_MS = 10 * 60 * 1000
def fetch_market_indicators(symbol: str) -> dict:
"""从PG读取最新的market_indicators数据"""
with get_sync_conn() as conn:
with conn.cursor() as cur:
indicators = {}
for ind_type in ["long_short_ratio", "top_trader_position", "open_interest_hist", "coinbase_premium"]:
cur.execute(
"SELECT value FROM market_indicators WHERE symbol=%s AND indicator_type=%s ORDER BY timestamp_ms DESC LIMIT 1",
(symbol, ind_type),
)
row = cur.fetchone()
indicators[ind_type] = row[0] if row else None
return indicators
def to_float(value: Any) -> Optional[float]:
try:
return float(value) if value is not None else None
except (TypeError, ValueError):
return None
# ─── 滚动窗口 ───────────────────────────────────────────────────
class TradeWindow:
def __init__(self, window_ms: int):
self.window_ms = window_ms
self.trades: deque = deque()
self.buy_vol = 0.0
self.sell_vol = 0.0
self.pq_sum = 0.0
self.q_sum = 0.0
def add(self, time_ms: int, qty: float, price: float, is_buyer_maker: int):
self.trades.append((time_ms, qty, price, is_buyer_maker))
pq = price * qty
self.pq_sum += pq
self.q_sum += qty
if is_buyer_maker == 0:
self.buy_vol += qty
else:
self.sell_vol += qty
def trim(self, now_ms: int):
cutoff = now_ms - self.window_ms
while self.trades and self.trades[0][0] < cutoff:
t_ms, qty, price, ibm = self.trades.popleft()
self.pq_sum -= price * qty
self.q_sum -= qty
if ibm == 0:
self.buy_vol -= qty
else:
self.sell_vol -= qty
@property
def cvd(self) -> float:
return self.buy_vol - self.sell_vol
@property
def vwap(self) -> float:
return self.pq_sum / self.q_sum if self.q_sum > 0 else 0.0
class ATRCalculator:
def __init__(self, period_ms: int = ATR_PERIOD_MS, length: int = ATR_LENGTH):
self.period_ms = period_ms
self.length = length
self.candles: deque = deque(maxlen=length + 1)
self.current_candle: Optional[dict] = None
self.atr_history: deque = deque(maxlen=288)
def update(self, time_ms: int, price: float):
bar_ms = (time_ms // self.period_ms) * self.period_ms
if self.current_candle is None or self.current_candle["bar"] != bar_ms:
if self.current_candle is not None:
self.candles.append(self.current_candle)
self.current_candle = {"bar": bar_ms, "open": price, "high": price, "low": price, "close": price}
else:
c = self.current_candle
c["high"] = max(c["high"], price)
c["low"] = min(c["low"], price)
c["close"] = price
@property
def atr(self) -> float:
if len(self.candles) < 2:
return 0.0
trs = []
candles_list = list(self.candles)
for i in range(1, len(candles_list)):
prev_close = candles_list[i-1]["close"]
c = candles_list[i]
tr = max(c["high"] - c["low"], abs(c["high"] - prev_close), abs(c["low"] - prev_close))
trs.append(tr)
if not trs:
return 0.0
atr_val = trs[0]
for tr in trs[1:]:
atr_val = (atr_val * (self.length - 1) + tr) / self.length
return atr_val
@property
def atr_percentile(self) -> float:
current = self.atr
if current == 0:
return 50.0
self.atr_history.append(current)
if len(self.atr_history) < 10:
return 50.0
sorted_hist = sorted(self.atr_history)
rank = sum(1 for x in sorted_hist if x <= current)
return (rank / len(sorted_hist)) * 100
class SymbolState:
def __init__(self, symbol: str):
self.symbol = symbol
self.win_fast = TradeWindow(WINDOW_FAST)
self.win_mid = TradeWindow(WINDOW_MID)
self.win_day = TradeWindow(WINDOW_DAY)
self.win_vwap = TradeWindow(WINDOW_VWAP)
self.atr_calc = ATRCalculator()
self.last_processed_id = 0
self.warmup = True
self.prev_cvd_fast = 0.0
self.prev_cvd_fast_slope = 0.0
self.market_indicators = fetch_market_indicators(symbol)
self.last_signal_ts = 0
self.last_signal_dir = ""
self.recent_large_trades: deque = deque()
def process_trade(self, agg_id: int, time_ms: int, price: float, qty: float, is_buyer_maker: int):
now_ms = time_ms
self.win_fast.add(time_ms, qty, price, is_buyer_maker)
self.win_mid.add(time_ms, qty, price, is_buyer_maker)
self.win_day.add(time_ms, qty, price, is_buyer_maker)
self.win_vwap.add(time_ms, qty, price, is_buyer_maker)
self.atr_calc.update(time_ms, price)
self.win_fast.trim(now_ms)
self.win_mid.trim(now_ms)
self.win_day.trim(now_ms)
self.win_vwap.trim(now_ms)
self.last_processed_id = agg_id
def compute_p95_p99(self) -> tuple:
if len(self.win_day.trades) < 100:
return 5.0, 10.0
qtys = sorted([t[1] for t in self.win_day.trades])
n = len(qtys)
p95 = qtys[int(n * 0.95)]
p99 = qtys[int(n * 0.99)]
if "BTC" in self.symbol:
p95 = max(p95, 5.0); p99 = max(p99, 10.0)
else:
p95 = max(p95, 50.0); p99 = max(p99, 100.0)
return p95, p99
def update_large_trades(self, now_ms: int, p99: float):
cutoff = now_ms - 15 * 60 * 1000
while self.recent_large_trades and self.recent_large_trades[0][0] < cutoff:
self.recent_large_trades.popleft()
for t in self.win_fast.trades:
if t[1] >= p99 and t[0] > cutoff:
self.recent_large_trades.append((t[0], t[1], t[3]))
def evaluate_signal(self, now_ms: int) -> dict:
cvd_fast = self.win_fast.cvd
cvd_mid = self.win_mid.cvd
vwap = self.win_vwap.vwap
atr = self.atr_calc.atr
atr_pct = self.atr_calc.atr_percentile
p95, p99 = self.compute_p95_p99()
self.update_large_trades(now_ms, p99)
price = vwap if vwap > 0 else 0
cvd_fast_slope = cvd_fast - self.prev_cvd_fast
cvd_fast_accel = cvd_fast_slope - self.prev_cvd_fast_slope
self.prev_cvd_fast = cvd_fast
self.prev_cvd_fast_slope = cvd_fast_slope
result = {
"cvd_fast": cvd_fast, "cvd_mid": cvd_mid, "cvd_day": self.win_day.cvd,
"cvd_fast_slope": cvd_fast_slope,
"atr": atr, "atr_pct": atr_pct, "vwap": vwap, "price": price,
"p95": p95, "p99": p99, "signal": None, "direction": None, "score": 0,
"tier": None,
"factors": {},
}
if self.warmup or price == 0 or atr == 0:
return result
if now_ms - self.last_signal_ts < COOLDOWN_MS:
return result
if cvd_fast > 0 and cvd_mid > 0:
direction = "LONG"
elif cvd_fast < 0 and cvd_mid < 0:
direction = "SHORT"
else:
return result
# V5.1 五层评分体系总分100方向层可因加速额外+5
# 1) 方向层45分 + 5加速分
direction_score = 0
if (direction == "LONG" and cvd_fast > 0) or (direction == "SHORT" and cvd_fast < 0):
direction_score += 15
if (direction == "LONG" and cvd_mid > 0) or (direction == "SHORT" and cvd_mid < 0):
direction_score += 15
has_adverse_p99 = any(
(direction == "LONG" and lt[2] == 1) or (direction == "SHORT" and lt[2] == 0)
for lt in self.recent_large_trades
)
has_aligned_p99 = any(
(direction == "LONG" and lt[2] == 0) or (direction == "SHORT" and lt[2] == 1)
for lt in self.recent_large_trades
)
if has_aligned_p99:
direction_score += 15
elif not has_adverse_p99:
direction_score += 10
accel_bonus = 0
if (direction == "LONG" and cvd_fast_accel > 0) or (direction == "SHORT" and cvd_fast_accel < 0):
accel_bonus = 5
# 2) 拥挤层20分- market_indicators缺失时给中间分
long_short_ratio = to_float(self.market_indicators.get("long_short_ratio"))
if long_short_ratio is None:
ls_score = 5
elif (direction == "SHORT" and long_short_ratio > 2.0) or (direction == "LONG" and long_short_ratio < 0.5):
ls_score = 10
else:
ls_score = 5
top_trader_position = to_float(self.market_indicators.get("top_trader_position"))
if top_trader_position is None:
top_trader_score = 5
else:
if direction == "LONG":
if top_trader_position >= 0.55:
top_trader_score = 10
elif top_trader_position <= 0.45:
top_trader_score = 0
else:
top_trader_score = 5
else:
if top_trader_position <= 0.45:
top_trader_score = 10
elif top_trader_position >= 0.55:
top_trader_score = 0
else:
top_trader_score = 5
crowding_score = ls_score + top_trader_score
# 3) 环境层15分
oi_change = to_float(self.market_indicators.get("open_interest_hist"))
if oi_change is None:
environment_score = 10
elif oi_change >= 0.03:
environment_score = 15
elif oi_change > 0:
environment_score = 10
else:
environment_score = 5
# 4) 确认层15分
confirmation_score = 15 if ((direction == "LONG" and cvd_fast > 0 and cvd_mid > 0) or (direction == "SHORT" and cvd_fast < 0 and cvd_mid < 0)) else 0
# 5) 辅助层5分
coinbase_premium = to_float(self.market_indicators.get("coinbase_premium"))
if coinbase_premium is None:
aux_score = 2
elif (direction == "LONG" and coinbase_premium > 0.0005) or (direction == "SHORT" and coinbase_premium < -0.0005):
aux_score = 5
elif abs(coinbase_premium) <= 0.0005:
aux_score = 2
else:
aux_score = 0
total_score = direction_score + accel_bonus + crowding_score + environment_score + confirmation_score + aux_score
result["score"] = total_score
result["direction"] = direction
result["factors"] = {
"direction": {
"score": direction_score,
"cvd_fast": 15 if ((direction == "LONG" and cvd_fast > 0) or (direction == "SHORT" and cvd_fast < 0)) else 0,
"cvd_mid": 15 if ((direction == "LONG" and cvd_mid > 0) or (direction == "SHORT" and cvd_mid < 0)) else 0,
"p99_flow": 15 if has_aligned_p99 else (10 if not has_adverse_p99 else 0),
"accel_bonus": accel_bonus,
},
"crowding": {"score": crowding_score, "long_short_ratio": ls_score, "top_trader_position": top_trader_score},
"environment": {"score": environment_score, "open_interest_hist": oi_change},
"confirmation": {"score": confirmation_score},
"auxiliary": {"score": aux_score, "coinbase_premium": coinbase_premium},
}
if total_score >= 85:
result["signal"] = direction
result["tier"] = "heavy"
elif total_score >= 75:
result["signal"] = direction
result["tier"] = "standard"
elif total_score >= 60:
result["signal"] = direction
result["tier"] = "light"
else:
result["signal"] = None
result["tier"] = None
if result["signal"]:
self.last_signal_ts = now_ms
self.last_signal_dir = direction
return result
# ─── PG DB操作 ───────────────────────────────────────────────────
def load_historical(state: SymbolState, window_ms: int):
now_ms = int(time.time() * 1000)
start_ms = now_ms - window_ms
count = 0
with get_sync_conn() as conn:
with conn.cursor() as cur:
cur.execute(
"SELECT agg_id, price, qty, time_ms, is_buyer_maker FROM agg_trades "
"WHERE symbol = %s AND time_ms >= %s ORDER BY agg_id ASC",
(state.symbol, start_ms)
)
while True:
rows = cur.fetchmany(5000)
if not rows:
break
for r in rows:
state.process_trade(r[0], r[3], r[1], r[2], r[4])
count += 1
logger.info(f"[{state.symbol}] 冷启动完成: 加载{count:,}条历史数据 (窗口={window_ms//3600000}h)")
state.warmup = False
def fetch_new_trades(symbol: str, last_id: int) -> list:
with get_sync_conn() as conn:
with conn.cursor() as cur:
cur.execute(
"SELECT agg_id, price, qty, time_ms, is_buyer_maker FROM agg_trades "
"WHERE symbol = %s AND agg_id > %s ORDER BY agg_id ASC LIMIT 10000",
(symbol, last_id)
)
return [{"agg_id": r[0], "price": r[1], "qty": r[2], "time_ms": r[3], "is_buyer_maker": r[4]}
for r in cur.fetchall()]
def save_indicator(ts: int, symbol: str, result: dict):
with get_sync_conn() as conn:
with conn.cursor() as cur:
cur.execute(
"INSERT INTO signal_indicators "
"(ts,symbol,cvd_fast,cvd_mid,cvd_day,cvd_fast_slope,atr_5m,atr_percentile,vwap_30m,price,p95_qty,p99_qty,score,signal) "
"VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)",
(ts, symbol, result["cvd_fast"], result["cvd_mid"], result["cvd_day"], result["cvd_fast_slope"],
result["atr"], result["atr_pct"], result["vwap"], result["price"],
result["p95"], result["p99"], result["score"], result.get("signal"))
)
conn.commit()
def save_indicator_1m(ts: int, symbol: str, result: dict):
bar_ts = (ts // 60000) * 60000
with get_sync_conn() as conn:
with conn.cursor() as cur:
cur.execute("SELECT id FROM signal_indicators_1m WHERE ts=%s AND symbol=%s", (bar_ts, symbol))
if cur.fetchone():
cur.execute(
"UPDATE signal_indicators_1m SET cvd_fast=%s,cvd_mid=%s,cvd_day=%s,atr_5m=%s,vwap_30m=%s,price=%s,score=%s,signal=%s WHERE ts=%s AND symbol=%s",
(result["cvd_fast"], result["cvd_mid"], result["cvd_day"], result["atr"], result["vwap"],
result["price"], result["score"], result.get("signal"), bar_ts, symbol)
)
else:
cur.execute(
"INSERT INTO signal_indicators_1m (ts,symbol,cvd_fast,cvd_mid,cvd_day,atr_5m,vwap_30m,price,score,signal) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)",
(bar_ts, symbol, result["cvd_fast"], result["cvd_mid"], result["cvd_day"], result["atr"],
result["vwap"], result["price"], result["score"], result.get("signal"))
)
conn.commit()
# ─── 主循环 ──────────────────────────────────────────────────────
def main():
init_schema()
states = {sym: SymbolState(sym) for sym in SYMBOLS}
for sym, state in states.items():
load_historical(state, WINDOW_MID)
logger.info("=== Signal Engine (PG) 启动完成 ===")
last_1m_save = {}
cycle = 0
while True:
try:
now_ms = int(time.time() * 1000)
for sym, state in states.items():
new_trades = fetch_new_trades(sym, state.last_processed_id)
for t in new_trades:
state.process_trade(t["agg_id"], t["time_ms"], t["price"], t["qty"], t["is_buyer_maker"])
state.market_indicators = fetch_market_indicators(sym)
result = state.evaluate_signal(now_ms)
save_indicator(now_ms, sym, result)
bar_1m = (now_ms // 60000) * 60000
if last_1m_save.get(sym) != bar_1m:
save_indicator_1m(now_ms, sym, result)
last_1m_save[sym] = bar_1m
if result.get("signal"):
logger.info(f"[{sym}] 🚨 信号: {result['signal']} score={result['score']} price={result['price']:.1f}")
cycle += 1
if cycle % 60 == 0:
for sym, state in states.items():
logger.info(
f"[{sym}] 状态: CVD_fast={state.win_fast.cvd:.1f} "
f"CVD_mid={state.win_mid.cvd:.1f} ATR={state.atr_calc.atr:.2f} "
f"({state.atr_calc.atr_percentile:.0f}%) VWAP={state.win_vwap.vwap:.1f}"
)
except Exception as e:
logger.error(f"循环异常: {e}", exc_info=True)
time.sleep(LOOP_INTERVAL)
if __name__ == "__main__":
main()