arbitrage-engine/backend/migrate_v54b_gates.py
2026-03-31 08:56:11 +00:00

156 lines
6.6 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
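"""
V5.4b Gate Schema Migration

Changes the gate fields of the strategies table from 4 gates to 5 gates so
that they correspond exactly to the logic signal_engine.py actually executes.

Changes:
1. Rename atr_percentile_min → vol_atr_pct_min (ATR% price threshold, e.g. 0.002 = 0.2%)
2. Rename whale_cvd_threshold → whale_flow_pct (whale CVD flow threshold, BTC only, 0-1)
3. Add gate_cvd_enabled BOOLEAN DEFAULT TRUE (gate 2: CVD resonance switch)
4. Add whale_usd_threshold FLOAT DEFAULT 50000 (gate 3: large-order USD amount threshold)
5. Backfill the three existing strategies with the per-symbol defaults from v53.json

The five gates:
Gate 1 Volatility: gate_vol_enabled + vol_atr_pct_min
Gate 2 CVD resonance: gate_cvd_enabled (no parameter; checks that fast and slow CVD point the same way)
Gate 3 Whale veto: gate_whale_enabled + whale_usd_threshold (ALT: large-order USD) + whale_flow_pct (BTC: CVD flow)
Gate 4 OBI veto: gate_obi_enabled + obi_threshold
Gate 5 Spot-perp divergence: gate_spot_perp_enabled + spot_perp_threshold
"""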
import os, sys
import psycopg2
from psycopg2.extras import RealDictCursor
# Database connection settings; each one can be overridden via the environment.
PG_HOST = os.environ.get("PG_HOST", "127.0.0.1")
PG_USER = os.environ.get("PG_USER", "arb")
PG_PASS = os.environ.get("PG_PASS", "arb_engine_2026")
PG_DB = os.environ.get("PG_DB", "arb_engine")

# Per-symbol gate defaults (taken from symbol_gates in v53.json).
# Each row of _GATE_VALUES lists its values in _GATE_FIELDS order; the ``None``
# row is the fallback used for symbols that have no entry of their own.
_GATE_FIELDS = (
    "vol_atr_pct_min",
    "whale_usd_threshold",
    "whale_flow_pct",
    "obi_threshold",
    "spot_perp_threshold",
)
_GATE_VALUES = {
    "BTCUSDT": (0.002, 100000, 0.5, 0.30, 0.003),
    "ETHUSDT": (0.003, 50000, 0.5, 0.35, 0.005),
    "SOLUSDT": (0.004, 20000, 0.5, 0.45, 0.008),
    "XRPUSDT": (0.0025, 30000, 0.5, 0.40, 0.006),
    None: (0.002, 50000, 0.5, 0.35, 0.005),
}
SYMBOL_DEFAULTS = {
    symbol: dict(zip(_GATE_FIELDS, values))
    for symbol, values in _GATE_VALUES.items()
}

# True when --dry-run appears anywhere on the command line.
DRY_RUN = any(arg == "--dry-run" for arg in sys.argv)
def get_conn():
    """Open a new psycopg2 connection to the migration's target database.

    Host, user, password and database name come from the module-level
    ``PG_HOST`` / ``PG_USER`` / ``PG_PASS`` / ``PG_DB`` settings.  The port is
    read from the ``PG_PORT`` environment variable and falls back to 5432
    (the value that used to be hard-coded), so existing setups are unaffected.

    Returns:
        A new psycopg2 connection.  The caller is responsible for closing it.

    Raises:
        ValueError: if ``PG_PORT`` is set but is not a valid integer.
    """
    # Every other connection setting is overridable through the environment;
    # the port is treated the same way instead of being fixed at 5432.
    port = int(os.getenv("PG_PORT", "5432"))
    return psycopg2.connect(
        host=PG_HOST,
        port=port,
        user=PG_USER,
        password=PG_PASS,
        dbname=PG_DB,
    )
def _plan_schema_changes(existing):
    """Decide which DDL statements are still required, printing each decision.

    Args:
        existing: set of the relevant column names currently present on the
            ``strategies`` table.

    Returns:
        list[str]: the ALTER TABLE statements to execute, in order.
    """
    sqls = []

    # Step 2: rename atr_percentile_min -> vol_atr_pct_min
    if "atr_percentile_min" in existing and "vol_atr_pct_min" not in existing:
        sqls.append("ALTER TABLE strategies RENAME COLUMN atr_percentile_min TO vol_atr_pct_min")
        print("✅ RENAME atr_percentile_min → vol_atr_pct_min")
    elif "vol_atr_pct_min" in existing:
        print("⏭️ vol_atr_pct_min 已存在,跳过改名")

    # Step 3: rename whale_cvd_threshold -> whale_flow_pct
    if "whale_cvd_threshold" in existing and "whale_flow_pct" not in existing:
        sqls.append("ALTER TABLE strategies RENAME COLUMN whale_cvd_threshold TO whale_flow_pct")
        print("✅ RENAME whale_cvd_threshold → whale_flow_pct")
    elif "whale_flow_pct" in existing:
        print("⏭️ whale_flow_pct 已存在,跳过改名")

    # Step 4: add gate_cvd_enabled
    if "gate_cvd_enabled" not in existing:
        sqls.append("ALTER TABLE strategies ADD COLUMN gate_cvd_enabled BOOLEAN NOT NULL DEFAULT TRUE")
        print("✅ ADD gate_cvd_enabled BOOLEAN DEFAULT TRUE")
    else:
        print("⏭️ gate_cvd_enabled 已存在,跳过")

    # Step 5: add whale_usd_threshold
    if "whale_usd_threshold" not in existing:
        sqls.append("ALTER TABLE strategies ADD COLUMN whale_usd_threshold FLOAT NOT NULL DEFAULT 50000")
        print("✅ ADD whale_usd_threshold FLOAT DEFAULT 50000")
    else:
        print("⏭️ whale_usd_threshold 已存在,跳过")

    return sqls


def _backfill_symbol_defaults(cur):
    """Overwrite every strategy's gate thresholds with its per-symbol defaults.

    Symbols without an entry in ``SYMBOL_DEFAULTS`` use the ``None`` entry.
    Does not commit; the caller owns the transaction.
    """
    cur.execute("SELECT strategy_id, symbol FROM strategies")
    rows = cur.fetchall()
    print(f"回填 {len(rows)} 条策略的 per-symbol 默认值...")
    for row in rows:
        sid = row["strategy_id"]
        sym = row["symbol"]
        defaults = SYMBOL_DEFAULTS.get(sym, SYMBOL_DEFAULTS[None])
        cur.execute(
            """
            UPDATE strategies SET
                vol_atr_pct_min = %s,
                whale_flow_pct = %s,
                whale_usd_threshold = %s,
                obi_threshold = %s,
                spot_perp_threshold = %s
            WHERE strategy_id = %s
            """,
            (
                defaults["vol_atr_pct_min"],
                defaults["whale_flow_pct"],
                defaults["whale_usd_threshold"],
                defaults["obi_threshold"],
                defaults["spot_perp_threshold"],
                sid,
            ),
        )
        print(f" {sid} ({sym}): vol_atr_pct={defaults['vol_atr_pct_min']} whale_usd={defaults['whale_usd_threshold']} obi={defaults['obi_threshold']}")


def _print_verification(cur):
    """Print one row per strategy showing all five gates after the migration."""
    cur.execute("SELECT strategy_id, display_name, gate_cvd_enabled, gate_vol_enabled, vol_atr_pct_min, gate_whale_enabled, whale_usd_threshold, whale_flow_pct, gate_obi_enabled, obi_threshold, gate_spot_perp_enabled, spot_perp_threshold FROM strategies ORDER BY created_at")
    print("\n验证结果:")
    print(f"{'display_name':<15} {'cvd':>4} {'vol':>4} {'vol_pct':>8} {'whale':>6} {'whale_usd':>10} {'flow_pct':>9} {'obi':>4} {'obi_thr':>8} {'spd':>4} {'spd_thr':>8}")
    for r in cur.fetchall():
        # str() keeps the report from crashing on a NULL display_name; for
        # ordinary string names the output is unchanged.
        print(f"{str(r['display_name']):<15} {str(r['gate_cvd_enabled']):>4} {str(r['gate_vol_enabled']):>4} {r['vol_atr_pct_min']:>8.4f} {str(r['gate_whale_enabled']):>6} {r['whale_usd_threshold']:>10.0f} {r['whale_flow_pct']:>9.3f} {str(r['gate_obi_enabled']):>4} {r['obi_threshold']:>8.3f} {str(r['gate_spot_perp_enabled']):>4} {r['spot_perp_threshold']:>8.4f}")


def run():
    """Run the V5.4b gate schema migration against the ``strategies`` table.

    Steps:
      1. Inspect ``information_schema.columns`` to see which of the old/new
         gate columns exist.
      2. Plan the required RENAME / ADD COLUMN statements (already-migrated
         columns are skipped).
      3. If nothing is planned, report that and return.  If ``DRY_RUN`` is
         set, print the planned SQL and return without executing it.
      4. Otherwise execute the DDL and backfill every strategy with its
         per-symbol defaults, then print a verification table.

    The DDL and the backfill are committed together in a single transaction.
    If any statement fails, everything is rolled back and the exception is
    re-raised, so a failed run never leaves the renamed/added columns in place
    without their backfilled values (which a re-run would otherwise skip,
    because it would find the schema already migrated).

    The connection is always closed, including on error.

    Returns:
        None
    """
    conn = get_conn()
    try:
        cur = conn.cursor(cursor_factory=RealDictCursor)
        print("=== V5.4b Gate Schema Migration ===")
        print(f"DRY_RUN={DRY_RUN}")
        print()

        # Step 1: check which columns have already been migrated.
        # NOTE(review): the lookup is not restricted by table_schema, so a
        # `strategies` table in another schema would also be counted — confirm
        # only one such table exists in this database.
        cur.execute(
            """
            SELECT column_name FROM information_schema.columns
            WHERE table_name='strategies' AND column_name IN
            ('vol_atr_pct_min','whale_flow_pct','gate_cvd_enabled','whale_usd_threshold',
            'atr_percentile_min','whale_cvd_threshold')
            """
        )
        existing = {r["column_name"] for r in cur.fetchall()}
        print(f"现有相关字段: {existing}")

        sqls = _plan_schema_changes(existing)
        print()

        if not sqls:
            print("无需迁移,所有字段已是最新状态。")
            return

        if DRY_RUN:
            print("=== DRY RUN - 以下SQL不会执行 ===")
            for sql in sqls:
                print(f" {sql};")
            return

        try:
            # Execute the DDL.  PostgreSQL DDL is transactional, so it is left
            # uncommitted until the backfill below has also succeeded.
            for sql in sqls:
                print(f"执行: {sql}")
                cur.execute(sql)
            print()

            # Step 6: backfill the per-symbol defaults.
            _backfill_symbol_defaults(cur)
            conn.commit()
        except Exception:
            # Undo both the schema changes and any partial backfill.
            conn.rollback()
            raise

        print()
        print("=== 迁移完成 ===")

        # Verification report.
        _print_verification(cur)
    finally:
        conn.close()
# Script entry point: run the migration only when executed directly.
# run() returns None, so the process exit status is still 0 on success.
if __name__ == "__main__":
    sys.exit(run())