85 lines
2.5 KiB
Python
Executable File
85 lines
2.5 KiB
Python
Executable File
#!/usr/bin/env python3
|
||
"""
|
||
清理 signal_indicators 中 strategy_id 对应 symbol 不一致的历史脏数据。
|
||
|
||
默认 dry-run;加 --apply 才会真正删除。
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import argparse
|
||
import os
|
||
|
||
try:
|
||
import psycopg2
|
||
except Exception:
|
||
psycopg2 = None
|
||
|
||
|
||
def get_conn():
|
||
if psycopg2 is None:
|
||
raise RuntimeError("缺少 psycopg2 依赖,请先安装:pip install psycopg2-binary")
|
||
host = os.getenv("PG_HOST") or os.getenv("DB_HOST") or "127.0.0.1"
|
||
port = int(os.getenv("PG_PORT") or os.getenv("DB_PORT") or 5432)
|
||
dbname = os.getenv("PG_DB") or os.getenv("DB_NAME") or "arb_engine"
|
||
user = os.getenv("PG_USER") or os.getenv("DB_USER") or "arb"
|
||
password = os.getenv("PG_PASS") or os.getenv("DB_PASS") or "arb_engine_2026"
|
||
return psycopg2.connect(host=host, port=port, dbname=dbname, user=user, password=password)
|
||
|
||
|
||
def main() -> int:
|
||
parser = argparse.ArgumentParser()
|
||
parser.add_argument("--apply", action="store_true", help="执行删除")
|
||
args = parser.parse_args()
|
||
|
||
conn = get_conn()
|
||
conn.autocommit = False
|
||
|
||
try:
|
||
with conn.cursor() as cur:
|
||
cur.execute(
|
||
"""
|
||
SELECT
|
||
COUNT(*) AS mismatch_rows,
|
||
to_timestamp(min(si.ts)/1000.0) AS first_ts_utc,
|
||
to_timestamp(max(si.ts)/1000.0) AS last_ts_utc
|
||
FROM signal_indicators si
|
||
JOIN strategies s ON s.strategy_id = si.strategy_id
|
||
WHERE si.symbol <> s.symbol
|
||
"""
|
||
)
|
||
mismatch_rows, first_ts, last_ts = cur.fetchone()
|
||
print(f"mismatch_rows={mismatch_rows}, range={first_ts} -> {last_ts}")
|
||
|
||
if mismatch_rows == 0:
|
||
print("无需清理")
|
||
conn.rollback()
|
||
return 0
|
||
|
||
if not args.apply:
|
||
print("dry-run 模式,仅输出统计;加 --apply 执行删除")
|
||
conn.rollback()
|
||
return 0
|
||
|
||
cur.execute(
|
||
"""
|
||
DELETE FROM signal_indicators si
|
||
USING strategies s
|
||
WHERE si.strategy_id = s.strategy_id
|
||
AND si.symbol <> s.symbol
|
||
"""
|
||
)
|
||
print(f"deleted_rows={cur.rowcount}")
|
||
|
||
conn.commit()
|
||
return 0
|
||
except Exception:
|
||
conn.rollback()
|
||
raise
|
||
finally:
|
||
conn.close()
|
||
|
||
|
||
if __name__ == "__main__":
|
||
raise SystemExit(main())
|