arbitrage-engine/backend/migrate_v54.py
2026-03-31 08:56:11 +00:00

328 lines
12 KiB
Python

#!/usr/bin/env python3
"""
V5.4 Strategy Factory DB Migration Script
- Creates `strategies` table
- Adds strategy_id + strategy_name_snapshot to paper_trades, signal_indicators
- Inserts existing 3 strategies with fixed UUIDs
- Backfills strategy_id + strategy_name_snapshot for all existing records
"""
import os
import sys
import psycopg2
from psycopg2.extras import execute_values
# Connection settings for the target PostgreSQL database.  Each value can be
# overridden through an environment variable of the same name; the fallbacks
# are the values this script has always used.
PG_HOST = os.environ.get("PG_HOST", "127.0.0.1")
# NOTE(review): the fallback below is a credential committed to source control.
# Prefer supplying PG_PASS through the environment and consider rotating it.
PG_PASS = os.environ.get("PG_PASS", "arb_engine_2026")
PG_USER = os.environ.get("PG_USER", "arb")
PG_DB = os.environ.get("PG_DB", "arb_engine")
# Legacy strategy key -> (fixed strategy UUID, display name).
# The UUIDs are hand-picked constants rather than generated values, so every
# run of the migration uses the same ids and they are easy to recognize.
LEGACY_STRATEGY_MAP = dict(
    v53=("00000000-0000-0000-0000-000000000053", "V5.3 Standard"),
    v53_middle=("00000000-0000-0000-0000-000000000054", "V5.3 Middle"),
    v53_fast=("00000000-0000-0000-0000-000000000055", "V5.3 Fast"),
)
def _legacy_config(cvd_fast_window, cvd_slow_window):
    """Build the config dict for one legacy strategy.

    The three legacy strategies share every setting except their two CVD
    windows, so only those are parameters.  A fresh dict is returned on each
    call so the entries of LEGACY_CONFIGS never alias each other.
    """
    return {
        "symbol": "BTCUSDT",  # multi-symbol, use BTC as representative
        "cvd_fast_window": cvd_fast_window,
        "cvd_slow_window": cvd_slow_window,
        "weight_direction": 55,
        "weight_env": 25,
        "weight_aux": 15,
        "weight_momentum": 5,
        "entry_score": 75,
        "sl_atr_multiplier": 1.0,
        "tp1_ratio": 0.75,
        "tp2_ratio": 1.5,
        "timeout_minutes": 60,
        "flip_threshold": 75,
        "status": "running",
        "initial_balance": 10000.0,
    }


# Default config values per strategy (from strategy JSON files)
LEGACY_CONFIGS = {
    "v53": _legacy_config("30m", "4h"),
    "v53_middle": _legacy_config("15m", "1h"),
    "v53_fast": _legacy_config("5m", "30m"),
}
def get_conn():
    """Open and return a new psycopg2 connection built from the PG_* settings."""
    connect_kwargs = {
        "host": PG_HOST,
        "user": PG_USER,
        "password": PG_PASS,
        "dbname": PG_DB,
    }
    return psycopg2.connect(**connect_kwargs)
def step1_create_strategies_table(cur):
    """Create the `strategies` table and its single-column indexes.

    The table and the three indexes (status, symbol, last_run_at) are all
    created with IF NOT EXISTS, so running this step against a database that
    already has them does not fail.

    Args:
        cur: Open database cursor; only ``cur.execute`` is used.
    """
    print("[Step 1] Creating strategies table...")
    create_table_sql = """
        CREATE TABLE IF NOT EXISTS strategies (
            strategy_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
            display_name TEXT NOT NULL,
            schema_version INT NOT NULL DEFAULT 1,
            status TEXT NOT NULL DEFAULT 'running'
                CHECK (status IN ('running', 'paused', 'deprecated')),
            status_changed_at TIMESTAMP,
            last_run_at TIMESTAMP,
            deprecated_at TIMESTAMP,
            symbol TEXT NOT NULL
                CHECK (symbol IN ('BTCUSDT', 'ETHUSDT', 'SOLUSDT', 'XRPUSDT')),
            direction TEXT NOT NULL DEFAULT 'both'
                CHECK (direction IN ('long_only', 'short_only', 'both')),
            cvd_fast_window TEXT NOT NULL DEFAULT '30m'
                CHECK (cvd_fast_window IN ('5m', '15m', '30m')),
            cvd_slow_window TEXT NOT NULL DEFAULT '4h'
                CHECK (cvd_slow_window IN ('30m', '1h', '4h')),
            weight_direction INT NOT NULL DEFAULT 55,
            weight_env INT NOT NULL DEFAULT 25,
            weight_aux INT NOT NULL DEFAULT 15,
            weight_momentum INT NOT NULL DEFAULT 5,
            entry_score INT NOT NULL DEFAULT 75,
            gate_obi_enabled BOOL NOT NULL DEFAULT TRUE,
            obi_threshold FLOAT NOT NULL DEFAULT 0.3,
            gate_whale_enabled BOOL NOT NULL DEFAULT TRUE,
            whale_cvd_threshold FLOAT NOT NULL DEFAULT 0.0,
            gate_vol_enabled BOOL NOT NULL DEFAULT TRUE,
            atr_percentile_min INT NOT NULL DEFAULT 20,
            gate_spot_perp_enabled BOOL NOT NULL DEFAULT FALSE,
            spot_perp_threshold FLOAT NOT NULL DEFAULT 0.002,
            sl_atr_multiplier FLOAT NOT NULL DEFAULT 1.5,
            tp1_ratio FLOAT NOT NULL DEFAULT 0.75,
            tp2_ratio FLOAT NOT NULL DEFAULT 1.5,
            timeout_minutes INT NOT NULL DEFAULT 240,
            flip_threshold INT NOT NULL DEFAULT 80,
            initial_balance FLOAT NOT NULL DEFAULT 10000.0,
            current_balance FLOAT NOT NULL DEFAULT 10000.0,
            description TEXT,
            tags TEXT[],
            created_at TIMESTAMP NOT NULL DEFAULT NOW(),
            updated_at TIMESTAMP NOT NULL DEFAULT NOW()
        )
    """
    cur.execute(create_table_sql)

    # (index name, indexed column) -- one CREATE INDEX statement per entry.
    strategy_indexes = (
        ("idx_strategies_status", "status"),
        ("idx_strategies_symbol", "symbol"),
        ("idx_strategies_last_run", "last_run_at"),
    )
    for index_name, column in strategy_indexes:
        cur.execute(f"CREATE INDEX IF NOT EXISTS {index_name} ON strategies({column})")
    print("[Step 1] Done.")
def step2_add_columns(cur):
    """Add the strategy link columns to `paper_trades` and `signal_indicators`.

    Each table gets a `strategy_id` column (UUID, foreign key to
    strategies.strategy_id) and a `strategy_name_snapshot` TEXT column, then an
    index on `strategy_id`.  Every statement uses IF NOT EXISTS.

    Args:
        cur: Open database cursor; only ``cur.execute`` is used.
    """
    print("[Step 2] Adding strategy_id + strategy_name_snapshot columns...")
    new_columns = (
        ("strategy_id", "UUID REFERENCES strategies(strategy_id)"),
        ("strategy_name_snapshot", "TEXT"),
    )
    # paper_trades first, then signal_indicators; both get the same columns.
    for table in ("paper_trades", "signal_indicators"):
        for column, column_type in new_columns:
            cur.execute(f"""
                ALTER TABLE {table}
                ADD COLUMN IF NOT EXISTS {column} {column_type}
            """)

    # Indexes
    index_statements = (
        "CREATE INDEX IF NOT EXISTS idx_paper_trades_strategy_id ON paper_trades(strategy_id)",
        "CREATE INDEX IF NOT EXISTS idx_si_strategy_id ON signal_indicators(strategy_id)",
    )
    for statement in index_statements:
        cur.execute(statement)
    print("[Step 2] Done.")
def step3_insert_legacy_strategies(cur, strategy_map=None, configs=None):
    """Insert one `strategies` row per legacy strategy, skipping existing ids.

    For each strategy the current balance is derived from the closed paper
    trades: the query sums `pnl_r` over rows whose status is not 'active',
    multiplies by 200 and labels the result `total_pnl_usdt`; that amount is
    added to the configured `initial_balance` and rounded to 2 decimals.
    NOTE(review): the 200 factor is presumably the USDT value of one R --
    confirm against the paper-trading engine.

    The INSERT uses ON CONFLICT (strategy_id) DO NOTHING, so a row that already
    exists is left untouched (its balance is NOT refreshed).

    Args:
        cur: Open database cursor (``execute``, ``fetchone``, ``rowcount``).
        strategy_map: Mapping of legacy strategy key to
            ``(strategy_id, display_name)``.  Defaults to LEGACY_STRATEGY_MAP.
        configs: Mapping of legacy strategy key to its config dict.  Defaults
            to LEGACY_CONFIGS.

    Raises:
        KeyError: If a key of ``strategy_map`` is missing from ``configs`` or a
            config lacks one of the fields written to the table.
    """
    if strategy_map is None:
        strategy_map = LEGACY_STRATEGY_MAP
    if configs is None:
        configs = LEGACY_CONFIGS

    print("[Step 3] Inserting legacy strategies into strategies table...")
    for strategy_name, (strategy_id, display_name) in strategy_map.items():
        cfg = configs[strategy_name]

        # Compute current_balance from actual paper trades
        cur.execute("""
            SELECT
                COALESCE(SUM(pnl_r) * 200, 0) as total_pnl_usdt
            FROM paper_trades
            WHERE strategy = %s AND status != 'active'
        """, (strategy_name,))
        row = cur.fetchone()
        # psycopg2 returns NUMERIC aggregates as decimal.Decimal, which cannot
        # be added to a float; normalise to float before doing arithmetic.
        pnl_usdt = float(row[0]) if row and row[0] is not None else 0.0
        current_balance = round(cfg["initial_balance"] + pnl_usdt, 2)

        cur.execute("""
            INSERT INTO strategies (
                strategy_id, display_name, schema_version, status,
                symbol, direction,
                cvd_fast_window, cvd_slow_window,
                weight_direction, weight_env, weight_aux, weight_momentum,
                entry_score,
                gate_obi_enabled, obi_threshold,
                gate_whale_enabled, whale_cvd_threshold,
                gate_vol_enabled, atr_percentile_min,
                gate_spot_perp_enabled, spot_perp_threshold,
                sl_atr_multiplier, tp1_ratio, tp2_ratio,
                timeout_minutes, flip_threshold,
                initial_balance, current_balance,
                description
            ) VALUES (
                %s, %s, 1, %s,
                %s, 'both',
                %s, %s,
                %s, %s, %s, %s,
                %s,
                TRUE, 0.3,
                TRUE, 0.0,
                TRUE, 20,
                FALSE, 0.002,
                %s, %s, %s,
                %s, %s,
                %s, %s,
                %s
            )
            ON CONFLICT (strategy_id) DO NOTHING
        """, (
            strategy_id, display_name, cfg["status"],
            cfg["symbol"], cfg["cvd_fast_window"], cfg["cvd_slow_window"],
            cfg["weight_direction"], cfg["weight_env"], cfg["weight_aux"], cfg["weight_momentum"],
            cfg["entry_score"],
            cfg["sl_atr_multiplier"], cfg["tp1_ratio"], cfg["tp2_ratio"],
            cfg["timeout_minutes"], cfg["flip_threshold"],
            cfg["initial_balance"], current_balance,
            f"Migrated from V5.3 legacy strategy: {strategy_name}",
        ))

        # rowcount is 0 when ON CONFLICT suppressed the insert; report that
        # honestly instead of always claiming a row was written.
        if cur.rowcount == 0:
            print(f" Skipped {strategy_name} -> {strategy_id} (already present)")
        else:
            print(f" Inserted {strategy_name} -> {strategy_id} (balance: {current_balance})")
    print("[Step 3] Done.")
def step4_backfill(cur):
    """Fill strategy_id / strategy_name_snapshot on existing rows.

    For every entry of LEGACY_STRATEGY_MAP, rows in `paper_trades` and then
    `signal_indicators` whose `strategy` column equals the legacy key and whose
    `strategy_id` is still NULL receive the fixed UUID and display name.  Rows
    that already carry a strategy_id are not touched.

    Args:
        cur: Open database cursor (``execute`` and ``rowcount`` are used).
    """
    print("[Step 4] Backfilling strategy_id + strategy_name_snapshot...")
    target_tables = ("paper_trades", "signal_indicators")
    for legacy_key, (strategy_uuid, snapshot_name) in LEGACY_STRATEGY_MAP.items():
        update_params = (strategy_uuid, snapshot_name, legacy_key)
        for table in target_tables:
            cur.execute(f"""
                UPDATE {table}
                SET strategy_id = %s::uuid,
                    strategy_name_snapshot = %s
                WHERE strategy = %s AND strategy_id IS NULL
            """, update_params)
            print(f" {table} [{legacy_key}]: {cur.rowcount} rows updated")
    print("[Step 4] Done.")
def step5_verify(cur):
    """Print a post-migration report; never raises on incomplete backfill.

    Reports the row count of `strategies`, then for `paper_trades` and
    `signal_indicators` lists, per legacy strategy ('v53', 'v53_middle',
    'v53_fast'), how many rows still have a NULL strategy_id.  Findings are
    only printed as warnings; the function returns None either way.

    Args:
        cur: Open database cursor (``execute``, ``fetchone``, ``fetchall``).
    """
    print("[Step 5] Verifying migration completeness...")

    # Check strategies table
    cur.execute("SELECT COUNT(*) FROM strategies")
    strategy_count = cur.fetchone()[0]
    print(f" strategies table: {strategy_count} rows")

    # Check NULL strategy_id (for known strategies) in each migrated table
    for table in ("paper_trades", "signal_indicators"):
        cur.execute(f"""
            SELECT strategy, COUNT(*) as cnt
            FROM {table}
            WHERE strategy IN ('v53', 'v53_middle', 'v53_fast')
              AND strategy_id IS NULL
            GROUP BY strategy
        """)
        unfilled = cur.fetchall()
        if not unfilled:
            print(f" {table}: all known strategies backfilled ✅")
            continue
        print(f" WARNING: NULL strategy_id found in {table}:")
        for record in unfilled:
            print(f" {record[0]}: {record[1]} rows")
    print("[Step 5] Done.")
def main(argv=None):
    """Run all migration steps inside one transaction.

    Steps 1-5 run on a single connection with autocommit disabled.  On success
    the transaction is committed, or rolled back when ``--dry-run`` was given.
    Any exception rolls the transaction back, prints the error and re-raises.
    The cursor and connection are always closed.

    Args:
        argv: Command-line arguments without the program name.  ``None``
            (the default) means ``sys.argv[1:]``.

    Raises:
        SystemExit: On unrecognised command-line arguments (exit status 2), so
            that a mistyped ``--dry-run`` can never trigger a live migration.
        Exception: Whatever a migration step raised, after rollback.
    """
    import argparse

    parser = argparse.ArgumentParser(
        description="V5.4 Strategy Factory DB migration",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="run every step, then roll back instead of committing",
    )
    # Parse before connecting: bad arguments must abort before the DB is touched.
    dry_run = parser.parse_args(argv).dry_run
    if dry_run:
        print("=== DRY RUN MODE (no changes will be committed) ===")

    conn = get_conn()
    try:
        conn.autocommit = False
        cur = conn.cursor()
        try:
            step1_create_strategies_table(cur)
            step2_add_columns(cur)
            step3_insert_legacy_strategies(cur)
            step4_backfill(cur)
            step5_verify(cur)
            if dry_run:
                conn.rollback()
                print("\n=== DRY RUN: rolled back all changes ===")
            else:
                conn.commit()
                print("\n=== Migration completed successfully ✅ ===")
        except Exception as e:
            conn.rollback()
            print(f"\n=== ERROR: {e} ===")
            raise
        finally:
            cur.close()
    finally:
        # Runs even if cursor creation itself failed, so the connection
        # is never leaked.
        conn.close()


if __name__ == "__main__":
    main()