From e1290cd9b5cad6c4eef7f097b6251c14cd0b569d Mon Sep 17 00:00:00 2001 From: fanziqi <403508380@qq.com> Date: Tue, 31 Mar 2026 21:44:44 +0800 Subject: [PATCH] feat(auto-evolve): add daily optimizer and pre-cleanup tooling --- README.md | 4 +- automation/auto_evolve/README.md | 30 + automation/auto_evolve/config.example.json | 8 + automation/auto_evolve/run_daily.py | 890 +++++++++++++++++++++ backend/requirements.txt | 2 + backend/signal_engine.py | 27 +- backend/strategy_loader.py | 2 +- backend/strategy_scoring.py | 18 +- docs/AI_HANDBOOK.md | 2 +- docs/AUTO_EVOLVE_RUNBOOK.md | 144 ++++ docs/BACKEND_RUNTIME.md | 2 +- docs/CODEX_DAILY_REVIEW_PROMPT.md | 30 + docs/OPS_CONNECTIONS.md | 177 ++-- docs/STRATEGY_FACTORY_VALIDATION.md | 4 +- docs/arbitrage-engine-full-spec.md | 4 +- reports/auto-evolve/.gitkeep | 0 scripts/fix_signal_symbol_mismatch.py | 84 ++ 17 files changed, 1319 insertions(+), 109 deletions(-) create mode 100644 automation/auto_evolve/README.md create mode 100644 automation/auto_evolve/config.example.json create mode 100755 automation/auto_evolve/run_daily.py create mode 100644 docs/AUTO_EVOLVE_RUNBOOK.md create mode 100644 docs/CODEX_DAILY_REVIEW_PROMPT.md create mode 100644 reports/auto-evolve/.gitkeep create mode 100755 scripts/fix_signal_symbol_mismatch.py diff --git a/README.md b/README.md index bbe27f6..1a6cb82 100644 --- a/README.md +++ b/README.md @@ -4,4 +4,6 @@ ## Docs -- 运维连接手册(Cloud SQL + GCE): [docs/OPS_CONNECTIONS.md](docs/OPS_CONNECTIONS.md) +- 运维连接手册(本地 PostgreSQL + GCE): [docs/OPS_CONNECTIONS.md](docs/OPS_CONNECTIONS.md) +- Auto-Evolve 运行手册: [docs/AUTO_EVOLVE_RUNBOOK.md](docs/AUTO_EVOLVE_RUNBOOK.md) +- Codex 每日复盘 Prompt: [docs/CODEX_DAILY_REVIEW_PROMPT.md](docs/CODEX_DAILY_REVIEW_PROMPT.md) diff --git a/automation/auto_evolve/README.md b/automation/auto_evolve/README.md new file mode 100644 index 0000000..da35d4c --- /dev/null +++ b/automation/auto_evolve/README.md @@ -0,0 +1,30 @@ +# Auto-Evolve + +## 文件 + +- `run_daily.py`: 每日自动分析 + 自动调参 + 自动上新/下线 + 报告输出 +- `config.example.json`: 配置模板 + +## 快速开始 + +```bash +# dry-run(不写库) +python3 automation/auto_evolve/run_daily.py + +# apply(写库) +python3 automation/auto_evolve/run_daily.py --apply + +# 带配置 +python3 automation/auto_evolve/run_daily.py --config automation/auto_evolve/config.example.json --apply +``` + +## 输出 + +- Markdown 报告:`reports/auto-evolve/YYYY-MM-DD/HHMMSS_auto_evolve.md` +- JSON 报告:`reports/auto-evolve/YYYY-MM-DD/HHMMSS_auto_evolve.json` + +## 默认安全策略 + +- 默认 dry-run; +- `--apply` 才会写入策略; +- 写入失败自动回滚事务。 diff --git a/automation/auto_evolve/config.example.json b/automation/auto_evolve/config.example.json new file mode 100644 index 0000000..035ed67 --- /dev/null +++ b/automation/auto_evolve/config.example.json @@ -0,0 +1,8 @@ +{ + "lookback_days": 7, + "min_closed_trades": 12, + "max_new_per_symbol": 1, + "max_codex_running_per_symbol": 3, + "min_codex_age_hours_to_deprecate": 24, + "report_dir": "reports/auto-evolve" +} diff --git a/automation/auto_evolve/run_daily.py b/automation/auto_evolve/run_daily.py new file mode 100755 index 0000000..4c2318d --- /dev/null +++ b/automation/auto_evolve/run_daily.py @@ -0,0 +1,890 @@ +#!/usr/bin/env python3 +""" +Auto Evolve Daily Runner + +目标: +1) 基于最近数据自动复盘 running 策略; +2) 每个币种生成 1 个 codex 优化候选策略; +3) 自动下线超配且表现最差的 codex 策略; +4) 产出可审计报告(Markdown + JSON)。 + +注意: +- 默认 dry-run,仅输出建议不写库; +- 传 --apply 才会真正写入 DB。 +""" + +from __future__ import annotations + +import argparse +import json +import os +import uuid +from dataclasses import dataclass +from decimal import Decimal +from datetime import datetime, timedelta, timezone +from pathlib import Path +from typing import Any + +try: + import psycopg2 + from psycopg2.extras import RealDictCursor +except Exception: # pragma: no cover - runtime dependency guard + psycopg2 = None + RealDictCursor = None + + +BJ = timezone(timedelta(hours=8)) +SYMBOLS = ("BTCUSDT", "ETHUSDT", "SOLUSDT", "XRPUSDT") +WINDOW_MINUTES = {"5m": 5, "15m": 15, "30m": 30, "1h": 60, "4h": 240} +WINDOW_PAIRS = [ + ("5m", "30m"), + ("5m", "1h"), + ("5m", "4h"), + ("15m", "30m"), + ("15m", "1h"), + ("15m", "4h"), + ("30m", "1h"), + ("30m", "4h"), +] + + +@dataclass +class Config: + lookback_days: int = 7 + min_closed_trades: int = 12 + max_new_per_symbol: int = 1 + max_codex_running_per_symbol: int = 3 + min_codex_age_hours_to_deprecate: int = 24 + report_dir: str = "reports/auto-evolve" + + +def load_config(path: str | None) -> Config: + cfg = Config() + if not path: + return cfg + p = Path(path) + if not p.exists(): + return cfg + raw = json.loads(p.read_text(encoding="utf-8")) + for k in cfg.__dataclass_fields__: + if k in raw: + setattr(cfg, k, raw[k]) + return cfg + + +def get_db_conn(): + if psycopg2 is None: + raise RuntimeError( + "缺少 psycopg2 依赖,请先安装:pip install psycopg2-binary" + ) + host = os.getenv("PG_HOST") or os.getenv("DB_HOST") or "127.0.0.1" + port = int(os.getenv("PG_PORT") or os.getenv("DB_PORT") or 5432) + dbname = os.getenv("PG_DB") or os.getenv("DB_NAME") or "arb_engine" + user = os.getenv("PG_USER") or os.getenv("DB_USER") or "arb" + password = os.getenv("PG_PASS") or os.getenv("DB_PASS") or "arb_engine_2026" + return psycopg2.connect( + host=host, + port=port, + dbname=dbname, + user=user, + password=password, + ) + + +def as_float(v: Any, default: float = 0.0) -> float: + if v is None: + return default + try: + return float(v) + except Exception: + return default + + +def as_int(v: Any, default: int = 0) -> int: + if v is None: + return default + try: + return int(v) + except Exception: + return default + + +def detect_regime(avg_atr_pct: float, avg_abs_slope: float, signal_rate: float) -> str: + if avg_atr_pct >= 85: + return "crash" + if avg_atr_pct >= 70: + return "high_vol" + if avg_abs_slope >= 30 and signal_rate >= 0.02: + return "trend" + return "range" + + +def compute_fitness(row: dict, lookback_days: int, min_closed: int) -> float: + closed = as_int(row.get("closed_trades")) + net_r = as_float(row.get("net_r")) + win_rate = as_float(row.get("win_rate")) + gross_profit = as_float(row.get("gross_profit")) + gross_loss = as_float(row.get("gross_loss")) + + r_per_day = net_r / max(lookback_days, 1) + profit_factor = gross_profit / gross_loss if gross_loss > 0 else (2.0 if gross_profit > 0 else 0.0) + sample_penalty = max(min_closed - closed, 0) * 0.35 + consistency_bonus = (win_rate - 0.5) * 2.5 + + score = ( + net_r + + 0.9 * r_per_day + + 0.6 * (profit_factor - 1.0) + + 0.5 * consistency_bonus + - sample_penalty + ) + return round(score, 4) + + +def fetch_running_strategies(cur) -> list[dict]: + cur.execute( + """ + SELECT + strategy_id::text AS strategy_id, + display_name, + symbol, + direction, + cvd_fast_window, + cvd_slow_window, + weight_direction, + weight_env, + weight_aux, + weight_momentum, + entry_score, + gate_vol_enabled, + vol_atr_pct_min, + gate_cvd_enabled, + gate_whale_enabled, + whale_usd_threshold, + whale_flow_pct, + gate_obi_enabled, + obi_threshold, + gate_spot_perp_enabled, + spot_perp_threshold, + sl_atr_multiplier, + tp1_ratio, + tp2_ratio, + timeout_minutes, + flip_threshold, + initial_balance, + current_balance, + created_at, + updated_at, + description + FROM strategies + WHERE status='running' + ORDER BY symbol, created_at + """ + ) + return [dict(r) for r in cur.fetchall()] + + +def fetch_metrics(cur, lookback_days: int) -> dict[str, dict]: + cur.execute( + """ + WITH trade_agg AS ( + SELECT + s.strategy_id::text AS strategy_id, + COUNT(*) FILTER ( + WHERE p.status <> 'active' + AND p.symbol = s.symbol + AND p.created_at >= NOW() - (%s || ' days')::interval + ) AS closed_trades, + COALESCE(SUM(CASE + WHEN p.status <> 'active' + AND p.symbol = s.symbol + AND p.created_at >= NOW() - (%s || ' days')::interval + THEN p.pnl_r ELSE 0 END), 0) AS net_r, + COALESCE(AVG(CASE + WHEN p.status <> 'active' + AND p.symbol = s.symbol + AND p.created_at >= NOW() - (%s || ' days')::interval + THEN p.pnl_r END), 0) AS avg_r, + COALESCE(AVG(CASE + WHEN p.status <> 'active' + AND p.symbol = s.symbol + AND p.created_at >= NOW() - (%s || ' days')::interval + THEN CASE WHEN p.pnl_r > 0 THEN 1 ELSE 0 END END), 0) AS win_rate, + COALESCE(SUM(CASE + WHEN p.status <> 'active' + AND p.symbol = s.symbol + AND p.created_at >= NOW() - (%s || ' days')::interval + AND p.pnl_r > 0 + THEN p.pnl_r ELSE 0 END), 0) AS gross_profit, + COALESCE(ABS(SUM(CASE + WHEN p.status <> 'active' + AND p.symbol = s.symbol + AND p.created_at >= NOW() - (%s || ' days')::interval + AND p.pnl_r < 0 + THEN p.pnl_r ELSE 0 END)), 0) AS gross_loss + FROM strategies s + LEFT JOIN paper_trades p ON p.strategy_id = s.strategy_id + WHERE s.status='running' + GROUP BY s.strategy_id + ), + signal_agg AS ( + SELECT + s.strategy_id::text AS strategy_id, + COUNT(*) FILTER ( + WHERE si.ts >= (extract(epoch from NOW() - interval '24 hours') * 1000)::bigint + AND si.symbol = s.symbol + ) AS ticks_24h, + COUNT(*) FILTER ( + WHERE si.ts >= (extract(epoch from NOW() - interval '24 hours') * 1000)::bigint + AND si.symbol = s.symbol + AND COALESCE(si.signal, '') <> '' + ) AS entry_signals_24h, + COALESCE(AVG(si.score) FILTER ( + WHERE si.ts >= (extract(epoch from NOW() - interval '24 hours') * 1000)::bigint + AND si.symbol = s.symbol + AND COALESCE(si.signal, '') <> '' + ), 0) AS avg_signal_score_24h + FROM strategies s + LEFT JOIN signal_indicators si ON si.strategy_id = s.strategy_id + WHERE s.status='running' + GROUP BY s.strategy_id + ) + SELECT + s.strategy_id::text AS strategy_id, + s.symbol, + COALESCE(t.closed_trades, 0) AS closed_trades, + COALESCE(t.net_r, 0) AS net_r, + COALESCE(t.avg_r, 0) AS avg_r, + COALESCE(t.win_rate, 0) AS win_rate, + COALESCE(t.gross_profit, 0) AS gross_profit, + COALESCE(t.gross_loss, 0) AS gross_loss, + COALESCE(sa.ticks_24h, 0) AS ticks_24h, + COALESCE(sa.entry_signals_24h, 0) AS entry_signals_24h, + COALESCE(sa.avg_signal_score_24h, 0) AS avg_signal_score_24h + FROM strategies s + LEFT JOIN trade_agg t ON t.strategy_id = s.strategy_id::text + LEFT JOIN signal_agg sa ON sa.strategy_id = s.strategy_id::text + WHERE s.status='running' + """, + (lookback_days, lookback_days, lookback_days, lookback_days, lookback_days, lookback_days), + ) + data: dict[str, dict] = {} + for r in cur.fetchall(): + data[r["strategy_id"]] = dict(r) + return data + + +def fetch_symbol_stats(cur) -> dict[str, dict]: + cur.execute( + """ + SELECT + si.symbol, + COALESCE(AVG(si.atr_percentile), 0) AS avg_atr_percentile, + COALESCE(AVG(ABS(si.cvd_fast_slope)), 0) AS avg_abs_cvd_slope, + COALESCE(AVG(CASE WHEN COALESCE(si.signal,'') <> '' THEN 1 ELSE 0 END), 0) AS signal_rate, + COUNT(*) AS rows_24h + FROM signal_indicators si + JOIN strategies s ON s.strategy_id = si.strategy_id + WHERE s.status='running' + AND si.symbol = s.symbol + AND si.ts >= (extract(epoch from NOW() - interval '24 hours') * 1000)::bigint + GROUP BY si.symbol + """ + ) + stats: dict[str, dict] = {} + for r in cur.fetchall(): + avg_atr = as_float(r["avg_atr_percentile"]) + avg_slope = as_float(r["avg_abs_cvd_slope"]) + signal_rate = as_float(r["signal_rate"]) + regime = detect_regime(avg_atr, avg_slope, signal_rate) + stats[r["symbol"]] = { + "avg_atr_percentile": round(avg_atr, 2), + "avg_abs_cvd_slope": round(avg_slope, 2), + "signal_rate": round(signal_rate, 5), + "rows_24h": as_int(r["rows_24h"]), + "regime": regime, + } + return stats + + +def normalize_weights(wd: int, we: int, wa: int, wm: int) -> tuple[int, int, int, int]: + vals = [max(0, wd), max(0, we), max(0, wa), max(0, wm)] + s = sum(vals) + if s <= 0: + return (55, 25, 15, 5) + scaled = [round(v * 100 / s) for v in vals] + diff = 100 - sum(scaled) + scaled[0] += diff + return tuple(int(max(0, v)) for v in scaled) + + +def mutate_profile(parent: dict, regime: str) -> list[dict]: + base_entry = as_int(parent["entry_score"], 75) + base_sl = as_float(parent["sl_atr_multiplier"], 1.8) + base_timeout = as_int(parent["timeout_minutes"], 240) + + if regime in ("trend", "high_vol"): + tp_profiles = [ + ("激进", 1.25, 2.7, max(1.5, base_sl)), + ("平衡", 1.0, 2.1, max(1.6, base_sl)), + ("保守", 0.8, 1.6, max(1.5, base_sl - 0.1)), + ] + entry_steps = [0, 2, 4] + timeout_choices = [max(120, base_timeout - 60), base_timeout] + elif regime == "crash": + tp_profiles = [ + ("防守", 0.7, 1.4, max(1.3, base_sl - 0.3)), + ("平衡", 0.9, 1.8, max(1.4, base_sl - 0.2)), + ] + entry_steps = [3, 5] + timeout_choices = [max(90, base_timeout - 120), max(120, base_timeout - 60)] + else: + tp_profiles = [ + ("保守", 0.75, 1.5, max(1.4, base_sl - 0.2)), + ("平衡", 1.0, 2.0, max(1.5, base_sl - 0.1)), + ("激进", 1.3, 2.4, max(1.6, base_sl)), + ] + entry_steps = [-2, 0, 2] + timeout_choices = [base_timeout, min(360, base_timeout + 60)] + + candidates = [] + for fast, slow in WINDOW_PAIRS: + for profile_name, tp1, tp2, sl in tp_profiles: + for step in entry_steps: + for timeout_min in timeout_choices: + entry = min(95, max(60, base_entry + step)) + wd, we, wa, wm = ( + as_int(parent["weight_direction"], 55), + as_int(parent["weight_env"], 25), + as_int(parent["weight_aux"], 15), + as_int(parent["weight_momentum"], 5), + ) + if regime in ("trend", "high_vol"): + wd, we, wa, wm = normalize_weights(wd + 4, we + 1, wa - 3, wm - 2) + elif regime == "crash": + wd, we, wa, wm = normalize_weights(wd + 2, we + 4, wa - 4, wm - 2) + else: + wd, we, wa, wm = normalize_weights(wd - 1, we + 3, wa + 1, wm - 3) + + c = { + "cvd_fast_window": fast, + "cvd_slow_window": slow, + "entry_score": entry, + "sl_atr_multiplier": round(sl, 2), + "tp1_ratio": round(tp1, 2), + "tp2_ratio": round(tp2, 2), + "timeout_minutes": int(timeout_min), + "weight_direction": wd, + "weight_env": we, + "weight_aux": wa, + "weight_momentum": wm, + "profile_name": profile_name, + } + candidates.append(c) + return candidates + + +def candidate_signature(c: dict) -> tuple: + return ( + c["cvd_fast_window"], + c["cvd_slow_window"], + c["entry_score"], + c["sl_atr_multiplier"], + c["tp1_ratio"], + c["tp2_ratio"], + c["timeout_minutes"], + c["weight_direction"], + c["weight_env"], + c["weight_aux"], + c["weight_momentum"], + ) + + +def estimate_candidate_score(parent_metric: dict, candidate: dict, regime: str) -> float: + base = as_float(parent_metric.get("fitness"), 0) + signal_density = as_float(parent_metric.get("entry_signals_24h"), 0) + closed_trades = as_int(parent_metric.get("closed_trades"), 0) + + bonus = 0.0 + if regime in ("trend", "high_vol") and candidate["tp2_ratio"] >= 2.2: + bonus += 0.45 + if regime == "range" and candidate["entry_score"] <= as_int(parent_metric.get("entry_score"), 75): + bonus += 0.25 + if signal_density < 20 and candidate["entry_score"] < as_int(parent_metric.get("entry_score"), 75): + bonus += 0.35 + if signal_density > 120 and candidate["entry_score"] > as_int(parent_metric.get("entry_score"), 75): + bonus += 0.2 + if closed_trades < 10: + bonus -= 0.25 + if candidate["sl_atr_multiplier"] < 1.3 and regime in ("high_vol", "crash"): + bonus -= 0.4 + if candidate["cvd_fast_window"] == candidate["cvd_slow_window"]: + bonus -= 0.2 + + return round(base + bonus, 4) + + +def choose_new_candidates( + strategies: list[dict], + metrics: dict[str, dict], + symbol_stats: dict[str, dict], + cfg: Config, +) -> tuple[list[dict], list[str]]: + by_symbol: dict[str, list[dict]] = {sym: [] for sym in SYMBOLS} + for s in strategies: + if s["symbol"] in by_symbol: + row = dict(s) + row.update(metrics.get(s["strategy_id"], {})) + row["fitness"] = compute_fitness(row, cfg.lookback_days, cfg.min_closed_trades) + by_symbol[s["symbol"]].append(row) + + created_plan: list[dict] = [] + logs: list[str] = [] + + for sym in SYMBOLS: + symbol_rows = by_symbol.get(sym, []) + if not symbol_rows: + logs.append(f"[{sym}] 无 running 策略,跳过") + continue + + symbol_rows.sort(key=lambda x: x["fitness"], reverse=True) + regime = symbol_stats.get(sym, {}).get("regime", "range") + existing_sigs = { + candidate_signature( + { + "cvd_fast_window": r["cvd_fast_window"], + "cvd_slow_window": r["cvd_slow_window"], + "entry_score": as_int(r["entry_score"], 75), + "sl_atr_multiplier": round(as_float(r["sl_atr_multiplier"], 1.8), 2), + "tp1_ratio": round(as_float(r["tp1_ratio"], 1.0), 2), + "tp2_ratio": round(as_float(r["tp2_ratio"], 2.0), 2), + "timeout_minutes": as_int(r["timeout_minutes"], 240), + "weight_direction": as_int(r["weight_direction"], 55), + "weight_env": as_int(r["weight_env"], 25), + "weight_aux": as_int(r["weight_aux"], 15), + "weight_momentum": as_int(r["weight_momentum"], 5), + } + ) + for r in symbol_rows + } + + parent = symbol_rows[0] + parent_for_est = dict(parent) + parent_for_est["entry_signals_24h"] = as_int(parent.get("entry_signals_24h"), 0) + + pool = mutate_profile(parent, regime) + scored_pool = [] + for c in pool: + sig = candidate_signature(c) + if sig in existing_sigs: + continue + score = estimate_candidate_score(parent_for_est, c, regime) + c2 = dict(c) + c2["estimated_fitness"] = score + c2["source_strategy_id"] = parent["strategy_id"] + c2["source_display_name"] = parent["display_name"] + c2["symbol"] = sym + c2["direction"] = parent["direction"] + c2["gate_vol_enabled"] = parent["gate_vol_enabled"] + c2["vol_atr_pct_min"] = as_float(parent["vol_atr_pct_min"], 0.002) + c2["gate_cvd_enabled"] = parent["gate_cvd_enabled"] + c2["gate_whale_enabled"] = parent["gate_whale_enabled"] + c2["whale_usd_threshold"] = as_float(parent["whale_usd_threshold"], 50000) + c2["whale_flow_pct"] = as_float(parent["whale_flow_pct"], 0.5) + c2["gate_obi_enabled"] = parent["gate_obi_enabled"] + c2["obi_threshold"] = as_float(parent["obi_threshold"], 0.35) + c2["gate_spot_perp_enabled"] = parent["gate_spot_perp_enabled"] + c2["spot_perp_threshold"] = as_float(parent["spot_perp_threshold"], 0.005) + c2["flip_threshold"] = as_int(parent["flip_threshold"], max(c2["entry_score"], 75)) + c2["initial_balance"] = as_float(parent["initial_balance"], 10000) + scored_pool.append(c2) + + scored_pool.sort(key=lambda x: x["estimated_fitness"], reverse=True) + if not scored_pool: + logs.append(f"[{sym}] 无可生成的新候选(参数已覆盖)") + continue + + top_n = max(1, cfg.max_new_per_symbol) + picks = scored_pool[:top_n] + created_plan.extend(picks) + logs.append( + f"[{sym}] regime={regime} parent={parent['display_name']} -> candidate={len(picks)}" + ) + + return created_plan, logs + + +def build_display_name(candidate: dict, now_bj: datetime) -> str: + sym = candidate["symbol"].replace("USDT", "") + fw = candidate["cvd_fast_window"] + sw = candidate["cvd_slow_window"] + profile = candidate["profile_name"] + stamp = now_bj.strftime("%m%d") + return f"codex优化-{sym}_CVD{fw}x{sw}_TP{profile}_{stamp}" + + +def insert_strategy(cur, c: dict, now_bj: datetime) -> str: + sid = str(uuid.uuid4()) + display_name = build_display_name(c, now_bj) + + cur.execute("SELECT 1 FROM strategies WHERE display_name=%s", (display_name,)) + if cur.fetchone(): + display_name = f"{display_name}_{sid[:4]}" + + description = ( + "AutoEvolve generated by Codex; " + f"source={c['source_display_name']}({c['source_strategy_id'][:8]}), " + f"estimated_fitness={c['estimated_fitness']:.3f}, " + f"profile={c['profile_name']}" + ) + + cur.execute( + """ + INSERT INTO strategies ( + strategy_id, display_name, schema_version, status, + symbol, direction, + cvd_fast_window, cvd_slow_window, + weight_direction, weight_env, weight_aux, weight_momentum, + entry_score, + gate_vol_enabled, vol_atr_pct_min, + gate_cvd_enabled, + gate_whale_enabled, whale_usd_threshold, whale_flow_pct, + gate_obi_enabled, obi_threshold, + gate_spot_perp_enabled, spot_perp_threshold, + sl_atr_multiplier, tp1_ratio, tp2_ratio, + timeout_minutes, flip_threshold, + initial_balance, current_balance, + description, tags, + created_at, updated_at, status_changed_at + ) VALUES ( + %s, %s, 1, 'running', + %s, %s, + %s, %s, + %s, %s, %s, %s, + %s, + %s, %s, + %s, + %s, %s, %s, + %s, %s, + %s, %s, + %s, %s, %s, + %s, %s, + %s, %s, + %s, %s, + NOW(), NOW(), NOW() + ) + """, + ( + sid, + display_name, + c["symbol"], + c["direction"], + c["cvd_fast_window"], + c["cvd_slow_window"], + c["weight_direction"], + c["weight_env"], + c["weight_aux"], + c["weight_momentum"], + c["entry_score"], + c["gate_vol_enabled"], + c["vol_atr_pct_min"], + c["gate_cvd_enabled"], + c["gate_whale_enabled"], + c["whale_usd_threshold"], + c["whale_flow_pct"], + c["gate_obi_enabled"], + c["obi_threshold"], + c["gate_spot_perp_enabled"], + c["spot_perp_threshold"], + c["sl_atr_multiplier"], + c["tp1_ratio"], + c["tp2_ratio"], + c["timeout_minutes"], + c["flip_threshold"], + c["initial_balance"], + c["initial_balance"], + description, + ["codex", "auto-evolve", f"source:{c['source_strategy_id'][:8]}"], + ), + ) + + return sid + + +def deprecate_overflow_codex(cur, symbol: str, metrics: dict[str, dict], cfg: Config, new_ids: set[str], dry_run: bool) -> list[dict]: + cur.execute( + """ + SELECT + strategy_id::text AS strategy_id, + display_name, + created_at, + symbol + FROM strategies + WHERE status='running' + AND symbol=%s + AND display_name LIKE 'codex优化-%%' + ORDER BY created_at ASC + """, + (symbol,), + ) + rows = [dict(r) for r in cur.fetchall()] + if len(rows) <= cfg.max_codex_running_per_symbol: + return [] + + now = datetime.now(timezone.utc) + min_age = timedelta(hours=cfg.min_codex_age_hours_to_deprecate) + candidates = [] + for r in rows: + sid = r["strategy_id"] + if sid in new_ids: + continue + created_at = r.get("created_at") + if created_at and (now - created_at) < min_age: + continue + m = metrics.get(sid, {}) + fitness = as_float(m.get("fitness"), -999) + candidates.append((fitness, created_at, r)) + + if not candidates: + return [] + + candidates.sort(key=lambda x: (x[0], x[1] or datetime(1970, 1, 1, tzinfo=timezone.utc))) + need_drop = len(rows) - cfg.max_codex_running_per_symbol + drops = [c[2] for c in candidates[:need_drop]] + + if not dry_run: + for d in drops: + cur.execute( + """ + UPDATE strategies + SET status='deprecated', deprecated_at=NOW(), status_changed_at=NOW(), updated_at=NOW() + WHERE strategy_id=%s + """, + (d["strategy_id"],), + ) + + return drops + + +def top_bottom(strategies: list[dict], top_n: int = 3) -> tuple[list[dict], list[dict]]: + s = sorted(strategies, key=lambda x: x.get("fitness", -999), reverse=True) + return s[:top_n], list(reversed(s[-top_n:])) + + +def to_jsonable(value: Any): + if isinstance(value, datetime): + return value.isoformat() + if isinstance(value, Decimal): + return float(value) + if isinstance(value, dict): + return {k: to_jsonable(v) for k, v in value.items()} + if isinstance(value, list): + return [to_jsonable(v) for v in value] + return value + + +def write_report( + cfg: Config, + now_bj: datetime, + strategies: list[dict], + symbol_stats: dict[str, dict], + created: list[dict], + deprecated: list[dict], + plan_logs: list[str], + dry_run: bool, +) -> tuple[Path, Path]: + report_root = Path(cfg.report_dir) + report_root.mkdir(parents=True, exist_ok=True) + day_dir = report_root / now_bj.strftime("%Y-%m-%d") + day_dir.mkdir(parents=True, exist_ok=True) + + stamp = now_bj.strftime("%H%M%S") + md_path = day_dir / f"{stamp}_auto_evolve.md" + json_path = day_dir / f"{stamp}_auto_evolve.json" + + top3, bottom3 = top_bottom(strategies, 3) + + lines = [] + lines.append(f"# Auto Evolve Report ({now_bj.strftime('%Y-%m-%d %H:%M:%S %Z')})") + lines.append("") + lines.append(f"- mode: {'DRY-RUN' if dry_run else 'APPLY'}") + lines.append(f"- lookback_days: {cfg.lookback_days}") + lines.append(f"- running_strategies: {len(strategies)}") + lines.append("") + + lines.append("## Symbol Regime") + for sym in SYMBOLS: + st = symbol_stats.get(sym, {}) + lines.append( + f"- {sym}: regime={st.get('regime','unknown')}, avg_atr_pct={st.get('avg_atr_percentile',0)}, " + f"avg_abs_slope={st.get('avg_abs_cvd_slope',0)}, signal_rate={st.get('signal_rate',0)}" + ) + lines.append("") + + lines.append("## Top 3 (Fitness)") + for r in top3: + lines.append( + f"- {r['display_name']} ({r['symbol']}): fitness={r['fitness']:.3f}, " + f"net_r={as_float(r.get('net_r')):.2f}, closed={as_int(r.get('closed_trades'))}, " + f"win_rate={as_float(r.get('win_rate'))*100:.1f}%" + ) + lines.append("") + + lines.append("## Bottom 3 (Fitness)") + for r in bottom3: + lines.append( + f"- {r['display_name']} ({r['symbol']}): fitness={r['fitness']:.3f}, " + f"net_r={as_float(r.get('net_r')):.2f}, closed={as_int(r.get('closed_trades'))}, " + f"win_rate={as_float(r.get('win_rate'))*100:.1f}%" + ) + lines.append("") + + lines.append("## Candidate Plan") + if plan_logs: + lines.extend([f"- {x}" for x in plan_logs]) + else: + lines.append("- no candidate plan") + lines.append("") + + lines.append("## Created Strategies") + if created: + for c in created: + lines.append( + f"- {c['display_name']} ({c['symbol']}): id={c['strategy_id']}, " + f"source={c['source_display_name']}, est_fitness={c['estimated_fitness']:.3f}" + ) + else: + lines.append("- none") + lines.append("") + + lines.append("## Deprecated Strategies") + if deprecated: + for d in deprecated: + lines.append(f"- {d['display_name']} ({d['symbol']}): id={d['strategy_id']}") + else: + lines.append("- none") + lines.append("") + + md_path.write_text("\n".join(lines), encoding="utf-8") + + payload = { + "generated_at": now_bj.isoformat(), + "mode": "dry-run" if dry_run else "apply", + "lookback_days": cfg.lookback_days, + "running_strategies": len(strategies), + "symbol_stats": symbol_stats, + "top3": top3, + "bottom3": bottom3, + "created": created, + "deprecated": deprecated, + "plan_logs": plan_logs, + } + json_path.write_text( + json.dumps(to_jsonable(payload), ensure_ascii=False, indent=2), + encoding="utf-8", + ) + + return md_path, json_path + + +def main() -> int: + parser = argparse.ArgumentParser(description="Auto evolve daily runner") + parser.add_argument("--config", default=None, help="Path to JSON config") + parser.add_argument("--apply", action="store_true", help="Apply DB mutations (default dry-run)") + args = parser.parse_args() + + cfg = load_config(args.config) + dry_run = not args.apply + now_bj = datetime.now(BJ) + + conn = get_db_conn() + conn.autocommit = False + + created_rows: list[dict] = [] + deprecated_rows: list[dict] = [] + + try: + with conn.cursor(cursor_factory=RealDictCursor) as cur: + strategies = fetch_running_strategies(cur) + metrics = fetch_metrics(cur, cfg.lookback_days) + symbol_stats = fetch_symbol_stats(cur) + + for s in strategies: + m = metrics.get(s["strategy_id"], {}) + s.update(m) + s["fitness"] = compute_fitness(s, cfg.lookback_days, cfg.min_closed_trades) + metrics[s["strategy_id"]] = { + **m, + "fitness": s["fitness"], + } + + plan, plan_logs = choose_new_candidates(strategies, metrics, symbol_stats, cfg) + + new_ids: set[str] = set() + if not dry_run: + for c in plan: + sid = insert_strategy(cur, c, now_bj) + new_ids.add(sid) + created_rows.append( + { + **c, + "strategy_id": sid, + "display_name": build_display_name(c, now_bj), + } + ) + + # 重新拉一次 metrics,让 deprecate 基于最新 running 集合 + metrics = fetch_metrics(cur, cfg.lookback_days) + for sid, m in metrics.items(): + m["fitness"] = compute_fitness(m, cfg.lookback_days, cfg.min_closed_trades) + + for sym in SYMBOLS: + drops = deprecate_overflow_codex(cur, sym, metrics, cfg, new_ids, dry_run=False) + deprecated_rows.extend(drops) + + conn.commit() + else: + for c in plan: + created_rows.append( + { + **c, + "strategy_id": "DRY-RUN", + "display_name": build_display_name(c, now_bj), + } + ) + + # 报告按提交后的状态生成(dry-run 就用当前状态) + if not dry_run: + strategies = fetch_running_strategies(cur) + metrics = fetch_metrics(cur, cfg.lookback_days) + for s in strategies: + m = metrics.get(s["strategy_id"], {}) + s.update(m) + s["fitness"] = compute_fitness(s, cfg.lookback_days, cfg.min_closed_trades) + + md_path, json_path = write_report( + cfg=cfg, + now_bj=now_bj, + strategies=strategies, + symbol_stats=symbol_stats, + created=created_rows, + deprecated=deprecated_rows, + plan_logs=plan_logs, + dry_run=dry_run, + ) + + print(f"[auto-evolve] done mode={'dry-run' if dry_run else 'apply'}") + print(f"[auto-evolve] report_md={md_path}") + print(f"[auto-evolve] report_json={json_path}") + return 0 + + except Exception as e: + conn.rollback() + print(f"[auto-evolve] failed: {e}") + raise + finally: + conn.close() + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/backend/requirements.txt b/backend/requirements.txt index 2acb9a0..820f675 100644 --- a/backend/requirements.txt +++ b/backend/requirements.txt @@ -3,3 +3,5 @@ uvicorn httpx python-dotenv psutil +asyncpg +psycopg2-binary diff --git a/backend/signal_engine.py b/backend/signal_engine.py index 848aed7..7af7bd1 100644 --- a/backend/signal_engine.py +++ b/backend/signal_engine.py @@ -315,6 +315,20 @@ def save_indicator_1m(ts: int, symbol: str, result: dict): # ─── 模拟盘 ────────────────────────────────────────────────────── +def strategy_applies_to_symbol(strategy_cfg: dict, symbol: str) -> bool: + """ + 判断策略是否适用于当前 symbol。 + - V5.4 单币策略:strategy_cfg.symbol 必须匹配; + - 兼容旧版多币策略:若配置了 symbols,则需在列表内。 + """ + strategy_symbol = strategy_cfg.get("symbol") + if strategy_symbol and strategy_symbol != symbol: + return False + allowed_symbols = strategy_cfg.get("symbols", []) + if allowed_symbols and symbol not in allowed_symbols: + return False + return True + # ─── 实时 WebSocket 数据(OBI + 期现背离)──────────────────────── @@ -468,11 +482,17 @@ def main(): snapshot = state.build_evaluation_snapshot(now_ms) strategy_results: list[tuple[dict, dict]] = [] for strategy_cfg in strategy_configs: + if not strategy_applies_to_symbol(strategy_cfg, sym): + continue strategy_result = score_strategy( state, now_ms, strategy_cfg=strategy_cfg, snapshot=snapshot ) strategy_results.append((strategy_cfg, strategy_result)) + if not strategy_results: + logger.warning(f"[{sym}] 当前无可用策略配置,跳过本轮") + continue + # 每个策略独立存储indicator for strategy_cfg, strategy_result in strategy_results: sname = strategy_cfg.get("name") or "v53" @@ -501,9 +521,6 @@ def main(): if not is_strategy_enabled(strategy_name): continue # V5.4: custom策略只处理自己配置的symbol - strategy_symbol = strategy_cfg.get("symbol", "") - if strategy_symbol and strategy_symbol != sym: - continue # per-strategy 方向约束:只接受与策略方向一致的反向信号 dir_cfg_raw = (strategy_cfg.get("direction") or "both").lower() if dir_cfg_raw not in ("long", "short", "both"): @@ -533,10 +550,6 @@ def main(): ) # 模拟盘开仓(需该策略启用 + 跳过冷启动) if is_strategy_enabled(strategy_name) and warmup_cycles <= 0: - # V5.4: custom策略只在自己配置的symbol上开仓 - strategy_symbol = strategy_cfg.get("symbol", "") - if strategy_symbol and strategy_symbol != sym: - continue # 跳过不属于该策略的币种 if not paper_has_active_position(sym, strategy_name): active_count = paper_active_count(strategy_name) if active_count < PAPER_MAX_POSITIONS: diff --git a/backend/strategy_loader.py b/backend/strategy_loader.py index e5a1810..a752ba8 100644 --- a/backend/strategy_loader.py +++ b/backend/strategy_loader.py @@ -53,7 +53,7 @@ def load_strategy_configs() -> list[dict]: "tp2_multiplier": 3.0, }, # 默认支持四个主交易对,其他细节(gates/symbol_gates) - # 在 evaluate_v53 内部有安全的默认值。 + # 在 evaluate_factory_strategy 内部有安全的默认值。 "symbols": ["BTCUSDT", "ETHUSDT", "XRPUSDT", "SOLUSDT"], } ) diff --git a/backend/strategy_scoring.py b/backend/strategy_scoring.py index f319f75..619b1bf 100644 --- a/backend/strategy_scoring.py +++ b/backend/strategy_scoring.py @@ -169,6 +169,18 @@ def evaluate_factory_strategy( # 门1:波动率下限(可关闭) atr_pct_price = atr / price if price > 0 else 0 + + # 市场状态(供复盘/优化使用,不直接改变默认策略行为) + regime = "range" + if atr_pct_price >= 0.012: + regime = "crash" + elif atr_pct_price >= 0.008: + regime = "high_vol" + elif abs(cvd_fast_accel) > 0 and abs(cvd_fast) > 0 and abs(cvd_mid) > 0: + same_dir = (cvd_fast > 0 and cvd_mid > 0) or (cvd_fast < 0 and cvd_mid < 0) + if same_dir and abs(cvd_fast_accel) > 10: + regime = "trend" + if gate_vol_enabled and atr_pct_price < min_vol: gate_block = f"low_vol({atr_pct_price:.4f}<{min_vol})" @@ -434,6 +446,7 @@ def evaluate_factory_strategy( "cvd_fast_5m": cvd_fast if is_fast else None, "factors": { "track": "BTC" if state.symbol == "BTCUSDT" else "ALT", + "regime": regime, "gate_passed": gate_passed, "gate_block": gate_block, "atr_pct_price": round(atr_pct_price, 5), @@ -516,7 +529,7 @@ def evaluate_signal( ) -> dict: """ 统一评分入口: - - v53*/custom_* → evaluate_v53 (V5.3/V5.4 策略工厂) + - v53*/custom_* → evaluate_factory_strategy (V5.3/V5.4 策略工厂) - 其他策略(v51_baseline/v52_8signals 等)→ 视为已下线,返回空结果并记录 warning """ strategy_cfg = strategy_cfg or {} @@ -549,6 +562,3 @@ def evaluate_signal( ) return _empty_result(strategy_name, snap) - -# 向后兼容别名(文档/历史代码中的名字) -evaluate_v53 = evaluate_factory_strategy diff --git a/docs/AI_HANDBOOK.md b/docs/AI_HANDBOOK.md index f3964ac..8069cb0 100644 --- a/docs/AI_HANDBOOK.md +++ b/docs/AI_HANDBOOK.md @@ -125,7 +125,7 @@ - 说明对历史数据/统计的影响; - 更新 full-spec 中对应章节。 3. **信号引擎核心打分/开仓/平仓逻辑** - - `evaluate_factory_strategy`(内部别名 `evaluate_v53`,原 `_evaluate_v53`)/ V5.4 策略工厂中的核心决策逻辑,包括: + - `evaluate_factory_strategy`(V5.4 策略工厂核心)中的核心决策逻辑,包括: - 门控系统(5 个 Gate)的通过/否决条件; - 各层得分的具体计算公式; - `entry_score` / `flip_threshold` 等核心阈值的语义; diff --git a/docs/AUTO_EVOLVE_RUNBOOK.md b/docs/AUTO_EVOLVE_RUNBOOK.md new file mode 100644 index 0000000..1d5968e --- /dev/null +++ b/docs/AUTO_EVOLVE_RUNBOOK.md @@ -0,0 +1,144 @@ +# Auto-Evolve 运行手册 + +## 1. 目标 + +本手册用于把策略工厂升级为“自动进化”流程: + +1. 自动分析(每天) +2. 自动调整(每天每币最多新增 1 条 `codex优化-*`) +3. 自动复盘(输出报告) +4. 自动清理(超配时下线表现最差的 codex 策略) + +> 注意:自动化默认仅在模拟盘执行。任何实盘动作必须人工确认。 + +--- + +## 2. 代码入口 + +- 自动进化脚本:`automation/auto_evolve/run_daily.py` +- 示例配置:`automation/auto_evolve/config.example.json` +- 报告输出目录:`reports/auto-evolve/YYYY-MM-DD/` + +--- + +## 3. 连接信息(快速) + +完整信息见:`docs/OPS_CONNECTIONS.md` + +### 3.1 服务器 + +```bash +gcloud compute ssh instance-20260221-064508 \ + --zone asia-northeast1-b \ + --project gen-lang-client-0835616737 \ + --tunnel-through-iap +``` + +### 3.2 数据库(服务器本地) + +```bash +sudo -u fzq1228 bash -lc "export PGPASSWORD=arb_engine_2026; psql -h 127.0.0.1 -U arb -d arb_engine -c 'SELECT now();'" +``` + +--- + +## 4. 手工运行(建议先 dry-run) + +### 4.0 前置修复:symbol 不一致脏数据清理(一次性) + +```bash +# 先看统计(不删除) +python3 scripts/fix_signal_symbol_mismatch.py + +# 确认后执行删除 +python3 scripts/fix_signal_symbol_mismatch.py --apply +``` + +### 4.1 本地或服务器 dry-run + +```bash +cd /home/fzq1228/Projects/arbitrage-engine +python3 automation/auto_evolve/run_daily.py +``` + +### 4.2 实际执行(写入策略变更) + +```bash +cd /home/fzq1228/Projects/arbitrage-engine +python3 automation/auto_evolve/run_daily.py --apply +``` + +### 4.3 指定配置 + +```bash +python3 automation/auto_evolve/run_daily.py --config automation/auto_evolve/config.example.json --apply +``` + +--- + +## 5. 自动策略逻辑(当前版本) + +### 5.1 分析输入 + +- `strategies`(running 策略) +- `paper_trades`(仅统计 `paper_trades.symbol = strategies.symbol`) +- `signal_indicators`(仅统计 `signal_indicators.symbol = strategies.symbol`) + +### 5.2 评分 + +- `fitness = 净R + 速度因子 + 稳定性因子 + PF因子 - 样本惩罚` +- 样本不足会被惩罚(避免小样本幻觉) + +### 5.3 自动调整 + +- 按币种识别简化 regime:`trend/high_vol/crash/range` +- 每币种最多产出 1 条新策略(可配置) +- 复制父策略门控参数,重点微调: + - CVD 窗口 + - 四层权重 + - entry score + - TP/SL/timeout + +### 5.4 自动下线 + +- 若某币种 `codex优化-*` 运行数超过阈值(默认 3) +- 自动下线最差且达到最小存活时间的策略(默认 24h) + +--- + +## 6. 每日报告内容 + +脚本每次执行都会输出: + +- Top3 / Bottom3(fitness) +- 每币种 regime +- 本次新建策略列表 +- 本次下线策略列表 +- 计划日志(候选生成来源) + +文件: + +- Markdown:`reports/auto-evolve/YYYY-MM-DD/HHMMSS_auto_evolve.md` +- JSON:`reports/auto-evolve/YYYY-MM-DD/HHMMSS_auto_evolve.json` + +--- + +## 7. Codex 每日二次复盘(深度搜索) + +建议每日在脚本执行后,由 Codex 自动执行第二轮复盘: + +1. 读取当天 auto-evolve 报告 +2. 联网做深度搜索(最新策略研究、订单流门控、风险控制) +3. 对照本项目现状判断可落地改进项 +4. 若有明确提升收益的改动,触发 `work-to-down` 模式落地到代码 +5. 生成复盘结论(改了什么、预期收益、风险) + +> 这一步建议通过 Codex Automation 调度,不建议手工每天执行。 + +--- + +## 8. 安全边界 + +- 默认不修改历史交易,不回填旧数据。 +- 自动化只操作策略参数与策略状态,不触碰用户认证/账密。 +- 自动化失败时会回滚事务,不会留下半写入状态。 diff --git a/docs/BACKEND_RUNTIME.md b/docs/BACKEND_RUNTIME.md index 5bfdb3d..4beb627 100644 --- a/docs/BACKEND_RUNTIME.md +++ b/docs/BACKEND_RUNTIME.md @@ -74,7 +74,7 @@ PM2 名称:`signal-engine` - 定时循环(约每 15 秒): 1. 从 `agg_trades` 滑动窗口计算 CVD、ATR、VWAP 等指标; 2. 从 `market_indicators` / `liquidations` 读取宏观指标和爆仓数据; - 3. 对每个策略(`strategies` 表中 `status=running` 的记录)调用评估函数(V5.3 之前 `_evaluate_v53`,V5.4 起为 `evaluate_factory_strategy`,内部别名 `evaluate_v53` 的策略工厂流程); + 3. 对每个策略(`strategies` 表中 `status=running` 的记录)调用评估函数(V5.3 之前 `_evaluate_v53`,V5.4 起为 `evaluate_factory_strategy`); 4. 生成评分与信号(`score`、`signal`),写入 `signal_indicators`; 5. 在满足开仓条件时写入 `paper_trades`,通过 `paper_open_trade()` 等逻辑开模拟仓。 diff --git a/docs/CODEX_DAILY_REVIEW_PROMPT.md b/docs/CODEX_DAILY_REVIEW_PROMPT.md new file mode 100644 index 0000000..e50268f --- /dev/null +++ b/docs/CODEX_DAILY_REVIEW_PROMPT.md @@ -0,0 +1,30 @@ +# Codex 每日自动复盘 Prompt(模板) + +> 供 Codex Automation 使用。目标:每天自动执行脚本 + 深度复盘 + 自动优化。 + +``` +先执行 [$work-to-down](/Users/fanziqi/.codex/skills/work-to-down/SKILL.md)。 + +你在项目 /Users/fanziqi/codex/arbitrage-engine 工作。 + +按顺序完成: +1) 运行自动进化脚本(写库执行): + python3 automation/auto_evolve/run_daily.py --apply +2) 打开最新报告(reports/auto-evolve/YYYY-MM-DD/*_auto_evolve.md),总结: + - 最可能赚钱前三 + - 最可能亏钱前三 + - 每个币种当前 regime +3) 连接数据库交叉验证关键指标(running 策略、24h/7d netR、开仓转化率、active 持仓)。 +4) 进行深度联网搜索(优先策略研究/风控/订单流相关的高质量一手资料),输出“可落地改进清单”。 +5) 若有明确可提升项: + - 直接修改本仓库代码并自测; + - 优先改动 signal_engine / strategy_scoring / automation/auto_evolve; + - 保留风险边界(不破坏生产连接,不做危险删库操作)。 +6) 生成当日复盘结论到 reports/auto-evolve/YYYY-MM-DD/HHMMSS_codex_review.md,内容包括: + - 今日问题 + - 今日改进 + - 预期收益提升点 + - 风险与回滚建议 + +执行原则:做到为止,不要只给建议,能落地就直接落地。 +``` diff --git a/docs/OPS_CONNECTIONS.md b/docs/OPS_CONNECTIONS.md index edf5f7a..0958b6c 100644 --- a/docs/OPS_CONNECTIONS.md +++ b/docs/OPS_CONNECTIONS.md @@ -1,82 +1,39 @@ -# 运维连接手册(Cloud SQL + GCE) +# 运维连接手册(本地 PostgreSQL + GCE) -本手册记录本项目当前线上环境的关键连接信息。 -目标:后续只看这一个文档,就能快速连上数据库和服务器。 +本手册记录项目线上关键连接信息: +- 怎么连线上服务器; +- 怎么连服务器本地数据库(当前生产数据源); +- 线上项目目录和常用排查命令。 ## 0. 固定信息 - GCP Project ID: `gen-lang-client-0835616737` -- Cloud SQL instance connection name: `gen-lang-client-0835616737:asia-northeast1:arb-db` -- Cloud SQL DB: `arb_engine` -- DB user: `arb` -- DB password (当前): `arb_engine_2026` - 线上 VM: `instance-20260221-064508` - Zone: `asia-northeast1-b` - 线上项目目录: `/home/fzq1228/Projects/arbitrage-engine` +- 线上数据库(服务器本地 PostgreSQL) + - Host: `127.0.0.1` + - Port: `5432` + - DB: `arb_engine` + - User: `arb` + - Password: `arb_engine_2026` -## 1. 本机连接 Cloud SQL +## 1. 连接线上服务器(GCE) -### 1.1 启动 Proxy(本机) - -开一个终端窗口常驻运行: - -```bash -cloud-sql-proxy gen-lang-client-0835616737:asia-northeast1:arb-db --port 9470 -``` - -如果系统找不到命令,用完整路径: - -```bash -/opt/homebrew/share/google-cloud-sdk/bin/cloud-sql-proxy \ - gen-lang-client-0835616737:asia-northeast1:arb-db \ - --port 9470 -``` - -### 1.2 测试 DB 连通性 - -```bash -PGPASSWORD=arb_engine_2026 psql -h 127.0.0.1 -p 9470 -U arb -d arb_engine -c "SELECT 1;" -``` - -进入交互式 SQL: - -```bash -PGPASSWORD=arb_engine_2026 psql -h 127.0.0.1 -p 9470 -U arb -d arb_engine -``` - -### 1.3 Proxy 常见修复 - -如果报错类似 `server closed the connection unexpectedly` 或 `connection refused`: - -```bash -pkill -f "cloud-sql-proxy gen-lang-client-0835616737:asia-northeast1:arb-db --port 9470" || true -cloud-sql-proxy gen-lang-client-0835616737:asia-northeast1:arb-db --port 9470 -``` - -## 2. 连接线上服务器(GCE) - -### 2.1 先确认账号和项目 +### 1.1 先确认账号和项目 ```bash gcloud auth list --filter=status:ACTIVE --format='value(account)' gcloud config get-value project ``` -如需切换项目: +如需切项目: ```bash gcloud config set project gen-lang-client-0835616737 ``` -### 2.2 查看实例 - -```bash -gcloud compute instances list --format='table(name,zone,status,networkInterfaces[0].networkIP,networkInterfaces[0].accessConfigs[0].natIP)' -``` - -### 2.3 SSH(推荐 IAP) - -注意:该实例直连 22 端口可能失败,推荐固定用 IAP: +### 1.2 SSH(推荐 IAP) ```bash gcloud compute ssh instance-20260221-064508 \ @@ -95,53 +52,93 @@ gcloud compute ssh instance-20260221-064508 \ --command "hostname; whoami; uptime; pwd" ``` -## 3. 线上项目目录与运行状态 +## 2. 连接服务器本地数据库(当前生产) -### 3.1 项目目录 +### 2.1 在服务器内直接 psql + +```bash +sudo -u fzq1228 bash -lc "export PGPASSWORD=arb_engine_2026; psql -h 127.0.0.1 -U arb -d arb_engine -c 'SELECT now();'" +``` + +进入交互式: + +```bash +sudo -u fzq1228 bash -lc "export PGPASSWORD=arb_engine_2026; psql -h 127.0.0.1 -U arb -d arb_engine" +``` + +### 2.2 从本机临时连(SSH Tunnel) + +如果需要在本机 SQL 客户端查看: + +```bash +# 本机开隧道 + gcloud compute ssh instance-20260221-064508 \ + --zone asia-northeast1-b \ + --project gen-lang-client-0835616737 \ + --tunnel-through-iap \ + -- -N -L 9432:127.0.0.1:5432 +``` + +新开终端连本地映射端口: + +```bash +PGPASSWORD=arb_engine_2026 psql -h 127.0.0.1 -p 9432 -U arb -d arb_engine -c "SELECT 1;" +``` + +## 3. 线上项目目录与运行状态 - Repo root: `/home/fzq1228/Projects/arbitrage-engine` - Backend: `/home/fzq1228/Projects/arbitrage-engine/backend` - Frontend: `/home/fzq1228/Projects/arbitrage-engine/frontend` - -### 3.2 当前运行形态(关键) - - 运行用户:`fzq1228` -- 进程管理:`pm2`(PM2 God Daemon 在跑) -- 关键进程: - - `python3 signal_engine.py` - - `python3 paper_monitor.py` - - `uvicorn main:app --host 0.0.0.0 --port 4332` - - `next start -p 4333` +- 进程管理:`pm2` -进程核对命令: - -```bash -ps -ef | egrep -i 'signal_engine|paper_monitor|uvicorn|next start|PM2' | grep -v egrep -``` - -## 4. 常用线上操作命令 - -进入项目目录: +常用检查: ```bash cd /home/fzq1228/Projects/arbitrage-engine +pm2 list +pm2 logs --lines 100 ``` -查看分支和最近提交(避免 safe.directory 报错): +重点进程: +- `signal-engine` +- `paper-monitor` +- `arb-api` +- `agg-collector` +- `market-collector` +- `liq-collector` +- `position-sync` +- `risk-guard` + +## 4. 版本同步 + +查看服务器分支和提交: ```bash -sudo git -c safe.directory=/home/fzq1228/Projects/arbitrage-engine -C /home/fzq1228/Projects/arbitrage-engine rev-parse --abbrev-ref HEAD -sudo git -c safe.directory=/home/fzq1228/Projects/arbitrage-engine -C /home/fzq1228/Projects/arbitrage-engine log --oneline -1 +sudo git -c safe.directory=/home/fzq1228/Projects/arbitrage-engine \ + -C /home/fzq1228/Projects/arbitrage-engine rev-parse --abbrev-ref HEAD +sudo git -c safe.directory=/home/fzq1228/Projects/arbitrage-engine \ + -C /home/fzq1228/Projects/arbitrage-engine log --oneline -1 ``` -## 5. 快速故障排查 +服务器拉代码: -- DB 报 `connection refused` - - Proxy 没开或者端口不是 `9470`。 -- DB 报 `server closed the connection unexpectedly` - - 通常是旧 Proxy 假活着,重启 Proxy。 -- SSH 报 `Connection closed by ... port 22` - - 改用 `--tunnel-through-iap`。 -- 能 SSH 但找不到项目目录 - - 当前在线项目目录固定是 `/home/fzq1228/Projects/arbitrage-engine`,不要只在 `~/` 下面找。 +```bash +cd /home/fzq1228/Projects/arbitrage-engine +git pull origin codex/codex_dev +``` +## 5. 常见故障排查 + +- API 正常但策略不出信号: + - 看 `signal-engine` 日志; + - 检查 `strategies.status='running'`; + - 检查 `signal_indicators` 最近 5 分钟是否持续写入。 +- 有信号但不开仓: + - 查 `paper_trades` 是否达到 `max_positions`; + - 查 `flip_threshold/entry_score` 是否过高; + - 查 `paper_monitor` 和 `signal_engine` 日志是否有拒绝原因。 +- 数据断流: + - 检查 `agg_trades` 最新 `time_ms`; + - 检查 `agg-collector`、`market-collector`、`liq-collector` 进程。 diff --git a/docs/STRATEGY_FACTORY_VALIDATION.md b/docs/STRATEGY_FACTORY_VALIDATION.md index dfd1e2b..c8c53b1 100644 --- a/docs/STRATEGY_FACTORY_VALIDATION.md +++ b/docs/STRATEGY_FACTORY_VALIDATION.md @@ -49,7 +49,7 @@ - `load_strategy_configs_from_db()`:把这些行映射成内部配置 dict。 - **决策 / 落库** - - `evaluate_factory_strategy()`(别名 `evaluate_v53`,原 `_evaluate_v53`):统一的评分与 Gates 逻辑; + - `evaluate_factory_strategy()`(原 `_evaluate_v53`):统一的评分与 Gates 逻辑; - `save_indicator()`:写 `signal_indicators`; - `paper_open_trade()`:写 `paper_trades`; - `NOTIFY new_signal`:推送给 live_executor 等其他进程。 @@ -81,7 +81,7 @@ - ✅ 其他窗口(5m/15m/1h)的 CVD 是否通过对已有窗口(win_fast/win_mid)的列表按时间切片重算,而不是重新查库。 - 实现:通过 `_window_ms()` 把窗口字符串转毫秒,从 `win_fast`/`win_mid` 的 `trades` 列表按 `t_ms >= now_ms - window_ms` 切片重算,未重复查库。 - ✅ 所有窗口的截止时间点是否都使用当前 `now_ms`,不会出现“5m 用旧 now_ms,30m 用新 now_ms”的错位。 - - 实现:`build_evaluation_snapshot(now_ms)` 与 `evaluate_factory_strategy(now_ms, ...)`(内部别名 `evaluate_v53`)使用同一个 `now_ms`,动态切片和快照在同一时间基准上计算。 + - 实现:`build_evaluation_snapshot(now_ms)` 与 `evaluate_factory_strategy(now_ms, ...)` 使用同一个 `now_ms`,动态切片和快照在同一时间基准上计算。 - ⚠️ 斜率与加速度: - ✅ 定义:`cvd_fast_slope = cvd_fast(now) - prev_cvd_fast`,`cvd_fast_accel = cvd_fast_slope(now) - prev_cvd_fast_slope`,在持续运行时语义正确; - ⚠️ 冷启动第一帧使用 `prev_* = 0` 计算,未做专门回退为 0 的处理,可能使第一帧 slope/accel 数值偏大;目前视为需要后续评估/是否调整的注意点。 diff --git a/docs/arbitrage-engine-full-spec.md b/docs/arbitrage-engine-full-spec.md index 8d2c28a..0b90fd8 100644 --- a/docs/arbitrage-engine-full-spec.md +++ b/docs/arbitrage-engine-full-spec.md @@ -328,7 +328,7 @@ Admin 账号:`fzq1228@gmail.com`,role=admin。 - fast 周期 > 30m → 从 `win_mid.trades` 切片 - slow 周期 → 始终从 `win_mid.trades` 切片 -### 5.3 评分模型(`evaluate_factory_strategy`,别名 `evaluate_v53`,原 `_evaluate_v53`) +### 5.3 评分模型(`evaluate_factory_strategy`,原 `_evaluate_v53`) **四层线性评分**,总分 = 各层得分之和,满分 = 各层权重之和。 @@ -555,7 +555,7 @@ agg_trades 表(PostgreSQL 分区表) ↓ signal_engine.py(每15秒读取) 滑动窗口(win_fast 30m / win_mid 4h / win_day 24h) ↓ 计算CVD/ATR/VWAP -evaluate_factory_strategy() (内部别名 evaluate_v53,原 _evaluate_v53) +evaluate_factory_strategy() (原 _evaluate_v53) ↓ 5门检查 → 四层评分 signal_indicators 写入 ↓ signal IS NOT NULL 且 score >= entry_score diff --git a/reports/auto-evolve/.gitkeep b/reports/auto-evolve/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/scripts/fix_signal_symbol_mismatch.py b/scripts/fix_signal_symbol_mismatch.py new file mode 100755 index 0000000..c6a0860 --- /dev/null +++ b/scripts/fix_signal_symbol_mismatch.py @@ -0,0 +1,84 @@ +#!/usr/bin/env python3 +""" +清理 signal_indicators 中 strategy_id 对应 symbol 不一致的历史脏数据。 + +默认 dry-run;加 --apply 才会真正删除。 +""" + +from __future__ import annotations + +import argparse +import os + +try: + import psycopg2 +except Exception: + psycopg2 = None + + +def get_conn(): + if psycopg2 is None: + raise RuntimeError("缺少 psycopg2 依赖,请先安装:pip install psycopg2-binary") + host = os.getenv("PG_HOST") or os.getenv("DB_HOST") or "127.0.0.1" + port = int(os.getenv("PG_PORT") or os.getenv("DB_PORT") or 5432) + dbname = os.getenv("PG_DB") or os.getenv("DB_NAME") or "arb_engine" + user = os.getenv("PG_USER") or os.getenv("DB_USER") or "arb" + password = os.getenv("PG_PASS") or os.getenv("DB_PASS") or "arb_engine_2026" + return psycopg2.connect(host=host, port=port, dbname=dbname, user=user, password=password) + + +def main() -> int: + parser = argparse.ArgumentParser() + parser.add_argument("--apply", action="store_true", help="执行删除") + args = parser.parse_args() + + conn = get_conn() + conn.autocommit = False + + try: + with conn.cursor() as cur: + cur.execute( + """ + SELECT + COUNT(*) AS mismatch_rows, + to_timestamp(min(si.ts)/1000.0) AS first_ts_utc, + to_timestamp(max(si.ts)/1000.0) AS last_ts_utc + FROM signal_indicators si + JOIN strategies s ON s.strategy_id = si.strategy_id + WHERE si.symbol <> s.symbol + """ + ) + mismatch_rows, first_ts, last_ts = cur.fetchone() + print(f"mismatch_rows={mismatch_rows}, range={first_ts} -> {last_ts}") + + if mismatch_rows == 0: + print("无需清理") + conn.rollback() + return 0 + + if not args.apply: + print("dry-run 模式,仅输出统计;加 --apply 执行删除") + conn.rollback() + return 0 + + cur.execute( + """ + DELETE FROM signal_indicators si + USING strategies s + WHERE si.strategy_id = s.strategy_id + AND si.symbol <> s.symbol + """ + ) + print(f"deleted_rows={cur.rowcount}") + + conn.commit() + return 0 + except Exception: + conn.rollback() + raise + finally: + conn.close() + + +if __name__ == "__main__": + raise SystemExit(main())