arbitrage-engine/scripts/fix_signal_symbol_mismatch.py

85 lines
2.5 KiB
Python
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
清理 signal_indicators 中 strategy_id 对应 symbol 不一致的历史脏数据。
默认 dry-run加 --apply 才会真正删除。
"""
from __future__ import annotations
import argparse
import os
try:
import psycopg2
except Exception:
psycopg2 = None
def get_conn():
if psycopg2 is None:
raise RuntimeError("缺少 psycopg2 依赖请先安装pip install psycopg2-binary")
host = os.getenv("PG_HOST") or os.getenv("DB_HOST") or "127.0.0.1"
port = int(os.getenv("PG_PORT") or os.getenv("DB_PORT") or 5432)
dbname = os.getenv("PG_DB") or os.getenv("DB_NAME") or "arb_engine"
user = os.getenv("PG_USER") or os.getenv("DB_USER") or "arb"
password = os.getenv("PG_PASS") or os.getenv("DB_PASS") or "arb_engine_2026"
return psycopg2.connect(host=host, port=port, dbname=dbname, user=user, password=password)
def main() -> int:
parser = argparse.ArgumentParser()
parser.add_argument("--apply", action="store_true", help="执行删除")
args = parser.parse_args()
conn = get_conn()
conn.autocommit = False
try:
with conn.cursor() as cur:
cur.execute(
"""
SELECT
COUNT(*) AS mismatch_rows,
to_timestamp(min(si.ts)/1000.0) AS first_ts_utc,
to_timestamp(max(si.ts)/1000.0) AS last_ts_utc
FROM signal_indicators si
JOIN strategies s ON s.strategy_id = si.strategy_id
WHERE si.symbol <> s.symbol
"""
)
mismatch_rows, first_ts, last_ts = cur.fetchone()
print(f"mismatch_rows={mismatch_rows}, range={first_ts} -> {last_ts}")
if mismatch_rows == 0:
print("无需清理")
conn.rollback()
return 0
if not args.apply:
print("dry-run 模式,仅输出统计;加 --apply 执行删除")
conn.rollback()
return 0
cur.execute(
"""
DELETE FROM signal_indicators si
USING strategies s
WHERE si.strategy_id = s.strategy_id
AND si.symbol <> s.symbol
"""
)
print(f"deleted_rows={cur.rowcount}")
conn.commit()
return 0
except Exception:
conn.rollback()
raise
finally:
conn.close()
if __name__ == "__main__":
raise SystemExit(main())