#!/usr/bin/env python3 """ Auto Evolve Daily Runner 目标: 1) 基于最近数据自动复盘 running 策略; 2) 每个币种生成 1 个 codex 优化候选策略; 3) 自动下线超配且表现最差的 codex 策略; 4) 产出可审计报告(Markdown + JSON)。 注意: - 默认 dry-run,仅输出建议不写库; - 传 --apply 才会真正写入 DB。 """ from __future__ import annotations import argparse import json import os import uuid from dataclasses import dataclass from decimal import Decimal from datetime import datetime, timedelta, timezone from pathlib import Path from typing import Any try: import psycopg2 from psycopg2.extras import RealDictCursor except Exception: # pragma: no cover - runtime dependency guard psycopg2 = None RealDictCursor = None BJ = timezone(timedelta(hours=8)) SYMBOLS = ("BTCUSDT", "ETHUSDT", "SOLUSDT", "XRPUSDT") WINDOW_MINUTES = {"5m": 5, "15m": 15, "30m": 30, "1h": 60, "4h": 240} WINDOW_PAIRS = [ ("5m", "30m"), ("5m", "1h"), ("5m", "4h"), ("15m", "30m"), ("15m", "1h"), ("15m", "4h"), ("30m", "1h"), ("30m", "4h"), ] @dataclass class Config: lookback_days: int = 7 min_closed_trades: int = 12 max_new_per_symbol: int = 1 max_codex_running_per_symbol: int = 3 min_codex_age_hours_to_deprecate: int = 24 report_dir: str = "reports/auto-evolve" def load_config(path: str | None) -> Config: cfg = Config() if not path: return cfg p = Path(path) if not p.exists(): return cfg raw = json.loads(p.read_text(encoding="utf-8")) for k in cfg.__dataclass_fields__: if k in raw: setattr(cfg, k, raw[k]) return cfg def get_db_conn(): if psycopg2 is None: raise RuntimeError( "缺少 psycopg2 依赖,请先安装:pip install psycopg2-binary" ) host = os.getenv("PG_HOST") or os.getenv("DB_HOST") or "127.0.0.1" port = int(os.getenv("PG_PORT") or os.getenv("DB_PORT") or 5432) dbname = os.getenv("PG_DB") or os.getenv("DB_NAME") or "arb_engine" user = os.getenv("PG_USER") or os.getenv("DB_USER") or "arb" password = os.getenv("PG_PASS") or os.getenv("DB_PASS") or "arb_engine_2026" return psycopg2.connect( host=host, port=port, dbname=dbname, user=user, password=password, ) def as_float(v: Any, default: float = 0.0) -> float: if v is None: return default try: return float(v) except Exception: return default def as_int(v: Any, default: int = 0) -> int: if v is None: return default try: return int(v) except Exception: return default def detect_regime(avg_atr_pct: float, avg_abs_slope: float, signal_rate: float) -> str: if avg_atr_pct >= 85: return "crash" if avg_atr_pct >= 70: return "high_vol" if avg_abs_slope >= 30 and signal_rate >= 0.02: return "trend" return "range" def compute_fitness(row: dict, lookback_days: int, min_closed: int) -> float: closed = as_int(row.get("closed_trades")) net_r = as_float(row.get("net_r")) win_rate = as_float(row.get("win_rate")) gross_profit = as_float(row.get("gross_profit")) gross_loss = as_float(row.get("gross_loss")) r_per_day = net_r / max(lookback_days, 1) profit_factor = gross_profit / gross_loss if gross_loss > 0 else (2.0 if gross_profit > 0 else 0.0) sample_penalty = max(min_closed - closed, 0) * 0.35 consistency_bonus = (win_rate - 0.5) * 2.5 score = ( net_r + 0.9 * r_per_day + 0.6 * (profit_factor - 1.0) + 0.5 * consistency_bonus - sample_penalty ) return round(score, 4) def fetch_running_strategies(cur) -> list[dict]: cur.execute( """ SELECT strategy_id::text AS strategy_id, display_name, symbol, direction, cvd_fast_window, cvd_slow_window, weight_direction, weight_env, weight_aux, weight_momentum, entry_score, gate_vol_enabled, vol_atr_pct_min, gate_cvd_enabled, gate_whale_enabled, whale_usd_threshold, whale_flow_pct, gate_obi_enabled, obi_threshold, gate_spot_perp_enabled, spot_perp_threshold, sl_atr_multiplier, tp1_ratio, tp2_ratio, timeout_minutes, flip_threshold, initial_balance, current_balance, created_at, updated_at, description FROM strategies WHERE status='running' ORDER BY symbol, created_at """ ) return [dict(r) for r in cur.fetchall()] def fetch_metrics(cur, lookback_days: int) -> dict[str, dict]: cur.execute( """ WITH trade_agg AS ( SELECT s.strategy_id::text AS strategy_id, COUNT(*) FILTER ( WHERE p.status <> 'active' AND p.symbol = s.symbol AND p.created_at >= NOW() - (%s || ' days')::interval ) AS closed_trades, COALESCE(SUM(CASE WHEN p.status <> 'active' AND p.symbol = s.symbol AND p.created_at >= NOW() - (%s || ' days')::interval THEN p.pnl_r ELSE 0 END), 0) AS net_r, COALESCE(AVG(CASE WHEN p.status <> 'active' AND p.symbol = s.symbol AND p.created_at >= NOW() - (%s || ' days')::interval THEN p.pnl_r END), 0) AS avg_r, COALESCE(AVG(CASE WHEN p.status <> 'active' AND p.symbol = s.symbol AND p.created_at >= NOW() - (%s || ' days')::interval THEN CASE WHEN p.pnl_r > 0 THEN 1 ELSE 0 END END), 0) AS win_rate, COALESCE(SUM(CASE WHEN p.status <> 'active' AND p.symbol = s.symbol AND p.created_at >= NOW() - (%s || ' days')::interval AND p.pnl_r > 0 THEN p.pnl_r ELSE 0 END), 0) AS gross_profit, COALESCE(ABS(SUM(CASE WHEN p.status <> 'active' AND p.symbol = s.symbol AND p.created_at >= NOW() - (%s || ' days')::interval AND p.pnl_r < 0 THEN p.pnl_r ELSE 0 END)), 0) AS gross_loss FROM strategies s LEFT JOIN paper_trades p ON p.strategy_id = s.strategy_id WHERE s.status='running' GROUP BY s.strategy_id ), signal_agg AS ( SELECT s.strategy_id::text AS strategy_id, COUNT(*) FILTER ( WHERE si.ts >= (extract(epoch from NOW() - interval '24 hours') * 1000)::bigint AND si.symbol = s.symbol ) AS ticks_24h, COUNT(*) FILTER ( WHERE si.ts >= (extract(epoch from NOW() - interval '24 hours') * 1000)::bigint AND si.symbol = s.symbol AND COALESCE(si.signal, '') <> '' ) AS entry_signals_24h, COALESCE(AVG(si.score) FILTER ( WHERE si.ts >= (extract(epoch from NOW() - interval '24 hours') * 1000)::bigint AND si.symbol = s.symbol AND COALESCE(si.signal, '') <> '' ), 0) AS avg_signal_score_24h FROM strategies s LEFT JOIN signal_indicators si ON si.strategy_id = s.strategy_id WHERE s.status='running' GROUP BY s.strategy_id ) SELECT s.strategy_id::text AS strategy_id, s.symbol, COALESCE(t.closed_trades, 0) AS closed_trades, COALESCE(t.net_r, 0) AS net_r, COALESCE(t.avg_r, 0) AS avg_r, COALESCE(t.win_rate, 0) AS win_rate, COALESCE(t.gross_profit, 0) AS gross_profit, COALESCE(t.gross_loss, 0) AS gross_loss, COALESCE(sa.ticks_24h, 0) AS ticks_24h, COALESCE(sa.entry_signals_24h, 0) AS entry_signals_24h, COALESCE(sa.avg_signal_score_24h, 0) AS avg_signal_score_24h FROM strategies s LEFT JOIN trade_agg t ON t.strategy_id = s.strategy_id::text LEFT JOIN signal_agg sa ON sa.strategy_id = s.strategy_id::text WHERE s.status='running' """, (lookback_days, lookback_days, lookback_days, lookback_days, lookback_days, lookback_days), ) data: dict[str, dict] = {} for r in cur.fetchall(): data[r["strategy_id"]] = dict(r) return data def fetch_symbol_stats(cur) -> dict[str, dict]: cur.execute( """ SELECT si.symbol, COALESCE(AVG(si.atr_percentile), 0) AS avg_atr_percentile, COALESCE(AVG(ABS(si.cvd_fast_slope)), 0) AS avg_abs_cvd_slope, COALESCE(AVG(CASE WHEN COALESCE(si.signal,'') <> '' THEN 1 ELSE 0 END), 0) AS signal_rate, COUNT(*) AS rows_24h FROM signal_indicators si JOIN strategies s ON s.strategy_id = si.strategy_id WHERE s.status='running' AND si.symbol = s.symbol AND si.ts >= (extract(epoch from NOW() - interval '24 hours') * 1000)::bigint GROUP BY si.symbol """ ) stats: dict[str, dict] = {} for r in cur.fetchall(): avg_atr = as_float(r["avg_atr_percentile"]) avg_slope = as_float(r["avg_abs_cvd_slope"]) signal_rate = as_float(r["signal_rate"]) regime = detect_regime(avg_atr, avg_slope, signal_rate) stats[r["symbol"]] = { "avg_atr_percentile": round(avg_atr, 2), "avg_abs_cvd_slope": round(avg_slope, 2), "signal_rate": round(signal_rate, 5), "rows_24h": as_int(r["rows_24h"]), "regime": regime, } return stats def normalize_weights(wd: int, we: int, wa: int, wm: int) -> tuple[int, int, int, int]: vals = [max(0, wd), max(0, we), max(0, wa), max(0, wm)] s = sum(vals) if s <= 0: return (55, 25, 15, 5) scaled = [round(v * 100 / s) for v in vals] diff = 100 - sum(scaled) scaled[0] += diff return tuple(int(max(0, v)) for v in scaled) def mutate_profile(parent: dict, regime: str) -> list[dict]: base_entry = as_int(parent["entry_score"], 75) base_sl = as_float(parent["sl_atr_multiplier"], 1.8) base_timeout = as_int(parent["timeout_minutes"], 240) if regime in ("trend", "high_vol"): tp_profiles = [ ("激进", 1.25, 2.7, max(1.5, base_sl)), ("平衡", 1.0, 2.1, max(1.6, base_sl)), ("保守", 0.8, 1.6, max(1.5, base_sl - 0.1)), ] entry_steps = [0, 2, 4] timeout_choices = [max(120, base_timeout - 60), base_timeout] elif regime == "crash": tp_profiles = [ ("防守", 0.7, 1.4, max(1.3, base_sl - 0.3)), ("平衡", 0.9, 1.8, max(1.4, base_sl - 0.2)), ] entry_steps = [3, 5] timeout_choices = [max(90, base_timeout - 120), max(120, base_timeout - 60)] else: tp_profiles = [ ("保守", 0.75, 1.5, max(1.4, base_sl - 0.2)), ("平衡", 1.0, 2.0, max(1.5, base_sl - 0.1)), ("激进", 1.3, 2.4, max(1.6, base_sl)), ] entry_steps = [-2, 0, 2] timeout_choices = [base_timeout, min(360, base_timeout + 60)] candidates = [] for fast, slow in WINDOW_PAIRS: for profile_name, tp1, tp2, sl in tp_profiles: for step in entry_steps: for timeout_min in timeout_choices: entry = min(95, max(60, base_entry + step)) wd, we, wa, wm = ( as_int(parent["weight_direction"], 55), as_int(parent["weight_env"], 25), as_int(parent["weight_aux"], 15), as_int(parent["weight_momentum"], 5), ) if regime in ("trend", "high_vol"): wd, we, wa, wm = normalize_weights(wd + 4, we + 1, wa - 3, wm - 2) elif regime == "crash": wd, we, wa, wm = normalize_weights(wd + 2, we + 4, wa - 4, wm - 2) else: wd, we, wa, wm = normalize_weights(wd - 1, we + 3, wa + 1, wm - 3) c = { "cvd_fast_window": fast, "cvd_slow_window": slow, "entry_score": entry, "sl_atr_multiplier": round(sl, 2), "tp1_ratio": round(tp1, 2), "tp2_ratio": round(tp2, 2), "timeout_minutes": int(timeout_min), "weight_direction": wd, "weight_env": we, "weight_aux": wa, "weight_momentum": wm, "profile_name": profile_name, } candidates.append(c) return candidates def candidate_signature(c: dict) -> tuple: return ( c["cvd_fast_window"], c["cvd_slow_window"], c["entry_score"], c["sl_atr_multiplier"], c["tp1_ratio"], c["tp2_ratio"], c["timeout_minutes"], c["weight_direction"], c["weight_env"], c["weight_aux"], c["weight_momentum"], ) def estimate_candidate_score(parent_metric: dict, candidate: dict, regime: str) -> float: base = as_float(parent_metric.get("fitness"), 0) signal_density = as_float(parent_metric.get("entry_signals_24h"), 0) closed_trades = as_int(parent_metric.get("closed_trades"), 0) bonus = 0.0 if regime in ("trend", "high_vol") and candidate["tp2_ratio"] >= 2.2: bonus += 0.45 if regime == "range" and candidate["entry_score"] <= as_int(parent_metric.get("entry_score"), 75): bonus += 0.25 if signal_density < 20 and candidate["entry_score"] < as_int(parent_metric.get("entry_score"), 75): bonus += 0.35 if signal_density > 120 and candidate["entry_score"] > as_int(parent_metric.get("entry_score"), 75): bonus += 0.2 if closed_trades < 10: bonus -= 0.25 if candidate["sl_atr_multiplier"] < 1.3 and regime in ("high_vol", "crash"): bonus -= 0.4 if candidate["cvd_fast_window"] == candidate["cvd_slow_window"]: bonus -= 0.2 return round(base + bonus, 4) def choose_new_candidates( strategies: list[dict], metrics: dict[str, dict], symbol_stats: dict[str, dict], cfg: Config, ) -> tuple[list[dict], list[str]]: by_symbol: dict[str, list[dict]] = {sym: [] for sym in SYMBOLS} for s in strategies: if s["symbol"] in by_symbol: row = dict(s) row.update(metrics.get(s["strategy_id"], {})) row["fitness"] = compute_fitness(row, cfg.lookback_days, cfg.min_closed_trades) by_symbol[s["symbol"]].append(row) created_plan: list[dict] = [] logs: list[str] = [] for sym in SYMBOLS: symbol_rows = by_symbol.get(sym, []) if not symbol_rows: logs.append(f"[{sym}] 无 running 策略,跳过") continue symbol_rows.sort(key=lambda x: x["fitness"], reverse=True) regime = symbol_stats.get(sym, {}).get("regime", "range") existing_sigs = { candidate_signature( { "cvd_fast_window": r["cvd_fast_window"], "cvd_slow_window": r["cvd_slow_window"], "entry_score": as_int(r["entry_score"], 75), "sl_atr_multiplier": round(as_float(r["sl_atr_multiplier"], 1.8), 2), "tp1_ratio": round(as_float(r["tp1_ratio"], 1.0), 2), "tp2_ratio": round(as_float(r["tp2_ratio"], 2.0), 2), "timeout_minutes": as_int(r["timeout_minutes"], 240), "weight_direction": as_int(r["weight_direction"], 55), "weight_env": as_int(r["weight_env"], 25), "weight_aux": as_int(r["weight_aux"], 15), "weight_momentum": as_int(r["weight_momentum"], 5), } ) for r in symbol_rows } parent = symbol_rows[0] parent_for_est = dict(parent) parent_for_est["entry_signals_24h"] = as_int(parent.get("entry_signals_24h"), 0) pool = mutate_profile(parent, regime) scored_pool = [] for c in pool: sig = candidate_signature(c) if sig in existing_sigs: continue score = estimate_candidate_score(parent_for_est, c, regime) c2 = dict(c) c2["estimated_fitness"] = score c2["source_strategy_id"] = parent["strategy_id"] c2["source_display_name"] = parent["display_name"] c2["symbol"] = sym c2["direction"] = parent["direction"] c2["gate_vol_enabled"] = parent["gate_vol_enabled"] c2["vol_atr_pct_min"] = as_float(parent["vol_atr_pct_min"], 0.002) c2["gate_cvd_enabled"] = parent["gate_cvd_enabled"] c2["gate_whale_enabled"] = parent["gate_whale_enabled"] c2["whale_usd_threshold"] = as_float(parent["whale_usd_threshold"], 50000) c2["whale_flow_pct"] = as_float(parent["whale_flow_pct"], 0.5) c2["gate_obi_enabled"] = parent["gate_obi_enabled"] c2["obi_threshold"] = as_float(parent["obi_threshold"], 0.35) c2["gate_spot_perp_enabled"] = parent["gate_spot_perp_enabled"] c2["spot_perp_threshold"] = as_float(parent["spot_perp_threshold"], 0.005) c2["flip_threshold"] = as_int(parent["flip_threshold"], max(c2["entry_score"], 75)) c2["initial_balance"] = as_float(parent["initial_balance"], 10000) scored_pool.append(c2) scored_pool.sort(key=lambda x: x["estimated_fitness"], reverse=True) if not scored_pool: logs.append(f"[{sym}] 无可生成的新候选(参数已覆盖)") continue top_n = max(1, cfg.max_new_per_symbol) picks = scored_pool[:top_n] created_plan.extend(picks) logs.append( f"[{sym}] regime={regime} parent={parent['display_name']} -> candidate={len(picks)}" ) return created_plan, logs def build_display_name(candidate: dict, now_bj: datetime) -> str: sym = candidate["symbol"].replace("USDT", "") fw = candidate["cvd_fast_window"] sw = candidate["cvd_slow_window"] profile = candidate["profile_name"] stamp = now_bj.strftime("%m%d") return f"codex优化-{sym}_CVD{fw}x{sw}_TP{profile}_{stamp}" def insert_strategy(cur, c: dict, now_bj: datetime) -> str: sid = str(uuid.uuid4()) display_name = build_display_name(c, now_bj) cur.execute("SELECT 1 FROM strategies WHERE display_name=%s", (display_name,)) if cur.fetchone(): display_name = f"{display_name}_{sid[:4]}" description = ( "AutoEvolve generated by Codex; " f"source={c['source_display_name']}({c['source_strategy_id'][:8]}), " f"estimated_fitness={c['estimated_fitness']:.3f}, " f"profile={c['profile_name']}" ) cur.execute( """ INSERT INTO strategies ( strategy_id, display_name, schema_version, status, symbol, direction, cvd_fast_window, cvd_slow_window, weight_direction, weight_env, weight_aux, weight_momentum, entry_score, gate_vol_enabled, vol_atr_pct_min, gate_cvd_enabled, gate_whale_enabled, whale_usd_threshold, whale_flow_pct, gate_obi_enabled, obi_threshold, gate_spot_perp_enabled, spot_perp_threshold, sl_atr_multiplier, tp1_ratio, tp2_ratio, timeout_minutes, flip_threshold, initial_balance, current_balance, description, tags, created_at, updated_at, status_changed_at ) VALUES ( %s, %s, 1, 'running', %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW(), NOW(), NOW() ) """, ( sid, display_name, c["symbol"], c["direction"], c["cvd_fast_window"], c["cvd_slow_window"], c["weight_direction"], c["weight_env"], c["weight_aux"], c["weight_momentum"], c["entry_score"], c["gate_vol_enabled"], c["vol_atr_pct_min"], c["gate_cvd_enabled"], c["gate_whale_enabled"], c["whale_usd_threshold"], c["whale_flow_pct"], c["gate_obi_enabled"], c["obi_threshold"], c["gate_spot_perp_enabled"], c["spot_perp_threshold"], c["sl_atr_multiplier"], c["tp1_ratio"], c["tp2_ratio"], c["timeout_minutes"], c["flip_threshold"], c["initial_balance"], c["initial_balance"], description, ["codex", "auto-evolve", f"source:{c['source_strategy_id'][:8]}"], ), ) return sid def deprecate_overflow_codex(cur, symbol: str, metrics: dict[str, dict], cfg: Config, new_ids: set[str], dry_run: bool) -> list[dict]: cur.execute( """ SELECT strategy_id::text AS strategy_id, display_name, created_at, symbol FROM strategies WHERE status='running' AND symbol=%s AND display_name LIKE 'codex优化-%%' ORDER BY created_at ASC """, (symbol,), ) rows = [dict(r) for r in cur.fetchall()] if len(rows) <= cfg.max_codex_running_per_symbol: return [] now = datetime.now(timezone.utc) min_age = timedelta(hours=cfg.min_codex_age_hours_to_deprecate) candidates = [] for r in rows: sid = r["strategy_id"] if sid in new_ids: continue created_at = r.get("created_at") if created_at and (now - created_at) < min_age: continue m = metrics.get(sid, {}) fitness = as_float(m.get("fitness"), -999) candidates.append((fitness, created_at, r)) if not candidates: return [] candidates.sort(key=lambda x: (x[0], x[1] or datetime(1970, 1, 1, tzinfo=timezone.utc))) need_drop = len(rows) - cfg.max_codex_running_per_symbol drops = [c[2] for c in candidates[:need_drop]] if not dry_run: for d in drops: cur.execute( """ UPDATE strategies SET status='deprecated', deprecated_at=NOW(), status_changed_at=NOW(), updated_at=NOW() WHERE strategy_id=%s """, (d["strategy_id"],), ) return drops def top_bottom(strategies: list[dict], top_n: int = 3) -> tuple[list[dict], list[dict]]: s = sorted(strategies, key=lambda x: x.get("fitness", -999), reverse=True) return s[:top_n], list(reversed(s[-top_n:])) def to_jsonable(value: Any): if isinstance(value, datetime): return value.isoformat() if isinstance(value, Decimal): return float(value) if isinstance(value, dict): return {k: to_jsonable(v) for k, v in value.items()} if isinstance(value, list): return [to_jsonable(v) for v in value] return value def write_report( cfg: Config, now_bj: datetime, strategies: list[dict], symbol_stats: dict[str, dict], created: list[dict], deprecated: list[dict], plan_logs: list[str], dry_run: bool, ) -> tuple[Path, Path]: report_root = Path(cfg.report_dir) report_root.mkdir(parents=True, exist_ok=True) day_dir = report_root / now_bj.strftime("%Y-%m-%d") day_dir.mkdir(parents=True, exist_ok=True) stamp = now_bj.strftime("%H%M%S") md_path = day_dir / f"{stamp}_auto_evolve.md" json_path = day_dir / f"{stamp}_auto_evolve.json" top3, bottom3 = top_bottom(strategies, 3) lines = [] lines.append(f"# Auto Evolve Report ({now_bj.strftime('%Y-%m-%d %H:%M:%S %Z')})") lines.append("") lines.append(f"- mode: {'DRY-RUN' if dry_run else 'APPLY'}") lines.append(f"- lookback_days: {cfg.lookback_days}") lines.append(f"- running_strategies: {len(strategies)}") lines.append("") lines.append("## Symbol Regime") for sym in SYMBOLS: st = symbol_stats.get(sym, {}) lines.append( f"- {sym}: regime={st.get('regime','unknown')}, avg_atr_pct={st.get('avg_atr_percentile',0)}, " f"avg_abs_slope={st.get('avg_abs_cvd_slope',0)}, signal_rate={st.get('signal_rate',0)}" ) lines.append("") lines.append("## Top 3 (Fitness)") for r in top3: lines.append( f"- {r['display_name']} ({r['symbol']}): fitness={r['fitness']:.3f}, " f"net_r={as_float(r.get('net_r')):.2f}, closed={as_int(r.get('closed_trades'))}, " f"win_rate={as_float(r.get('win_rate'))*100:.1f}%" ) lines.append("") lines.append("## Bottom 3 (Fitness)") for r in bottom3: lines.append( f"- {r['display_name']} ({r['symbol']}): fitness={r['fitness']:.3f}, " f"net_r={as_float(r.get('net_r')):.2f}, closed={as_int(r.get('closed_trades'))}, " f"win_rate={as_float(r.get('win_rate'))*100:.1f}%" ) lines.append("") lines.append("## Candidate Plan") if plan_logs: lines.extend([f"- {x}" for x in plan_logs]) else: lines.append("- no candidate plan") lines.append("") lines.append("## Created Strategies") if created: for c in created: lines.append( f"- {c['display_name']} ({c['symbol']}): id={c['strategy_id']}, " f"source={c['source_display_name']}, est_fitness={c['estimated_fitness']:.3f}" ) else: lines.append("- none") lines.append("") lines.append("## Deprecated Strategies") if deprecated: for d in deprecated: lines.append(f"- {d['display_name']} ({d['symbol']}): id={d['strategy_id']}") else: lines.append("- none") lines.append("") md_path.write_text("\n".join(lines), encoding="utf-8") payload = { "generated_at": now_bj.isoformat(), "mode": "dry-run" if dry_run else "apply", "lookback_days": cfg.lookback_days, "running_strategies": len(strategies), "symbol_stats": symbol_stats, "top3": top3, "bottom3": bottom3, "created": created, "deprecated": deprecated, "plan_logs": plan_logs, } json_path.write_text( json.dumps(to_jsonable(payload), ensure_ascii=False, indent=2), encoding="utf-8", ) return md_path, json_path def main() -> int: parser = argparse.ArgumentParser(description="Auto evolve daily runner") parser.add_argument("--config", default=None, help="Path to JSON config") parser.add_argument("--apply", action="store_true", help="Apply DB mutations (default dry-run)") args = parser.parse_args() cfg = load_config(args.config) dry_run = not args.apply now_bj = datetime.now(BJ) conn = get_db_conn() conn.autocommit = False created_rows: list[dict] = [] deprecated_rows: list[dict] = [] try: with conn.cursor(cursor_factory=RealDictCursor) as cur: strategies = fetch_running_strategies(cur) metrics = fetch_metrics(cur, cfg.lookback_days) symbol_stats = fetch_symbol_stats(cur) for s in strategies: m = metrics.get(s["strategy_id"], {}) s.update(m) s["fitness"] = compute_fitness(s, cfg.lookback_days, cfg.min_closed_trades) metrics[s["strategy_id"]] = { **m, "fitness": s["fitness"], } plan, plan_logs = choose_new_candidates(strategies, metrics, symbol_stats, cfg) new_ids: set[str] = set() if not dry_run: for c in plan: sid = insert_strategy(cur, c, now_bj) new_ids.add(sid) created_rows.append( { **c, "strategy_id": sid, "display_name": build_display_name(c, now_bj), } ) # 重新拉一次 metrics,让 deprecate 基于最新 running 集合 metrics = fetch_metrics(cur, cfg.lookback_days) for sid, m in metrics.items(): m["fitness"] = compute_fitness(m, cfg.lookback_days, cfg.min_closed_trades) for sym in SYMBOLS: drops = deprecate_overflow_codex(cur, sym, metrics, cfg, new_ids, dry_run=False) deprecated_rows.extend(drops) conn.commit() else: for c in plan: created_rows.append( { **c, "strategy_id": "DRY-RUN", "display_name": build_display_name(c, now_bj), } ) # 报告按提交后的状态生成(dry-run 就用当前状态) if not dry_run: strategies = fetch_running_strategies(cur) metrics = fetch_metrics(cur, cfg.lookback_days) for s in strategies: m = metrics.get(s["strategy_id"], {}) s.update(m) s["fitness"] = compute_fitness(s, cfg.lookback_days, cfg.min_closed_trades) md_path, json_path = write_report( cfg=cfg, now_bj=now_bj, strategies=strategies, symbol_stats=symbol_stats, created=created_rows, deprecated=deprecated_rows, plan_logs=plan_logs, dry_run=dry_run, ) print(f"[auto-evolve] done mode={'dry-run' if dry_run else 'apply'}") print(f"[auto-evolve] report_md={md_path}") print(f"[auto-evolve] report_json={json_path}") return 0 except Exception as e: conn.rollback() print(f"[auto-evolve] failed: {e}") raise finally: conn.close() if __name__ == "__main__": raise SystemExit(main())