P0-1: JWT_SECRET生产环境强制配置,测试环境保留默认 P0-2: DB密码生产环境强制从env读,测试环境保留fallback P0-3: SL三次失败→查真实持仓→reduceOnly平仓→校验结果→写event P0-4: TP1后SL重挂失败则不推进tp1_hit状态,continue等下轮重试 P0-5: 超时自动平仓用SYMBOL_QTY_PRECISION格式化+校验结果 P0-6: 同币种去重改为不区分策略(币安单向模式共享净仓位) P1-1: 手续费窗口entry_ts-200→+200(避免纳入开仓前成交) 额外: 模拟盘*200和实盘*2硬编码→从配置动态读取
1937 lines
70 KiB
Python
1937 lines
70 KiB
Python
import asyncio, time, os, json
import datetime as _dt
import logging
from datetime import datetime, timedelta

import httpx
from fastapi import FastAPI, HTTPException, Depends, Request
from fastapi.middleware.cors import CORSMiddleware

from auth import router as auth_router, get_current_user, ensure_tables as ensure_auth_tables
from db import (
    init_schema, ensure_partitions, get_async_pool, async_fetch, async_fetchrow, async_execute,
    close_async_pool,
)
|
||
|
||
# FastAPI application object; every route below is registered on it.
app = FastAPI(title="Arbitrage Engine API")

# CORS: browsers may call the API only from the listed origins; any HTTP
# method and any request header is accepted from those origins.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["https://arb.zhouyangclaw.com", "http://localhost:3000", "http://localhost:3001"],
    allow_methods=["*"],
    allow_headers=["*"],
)

# Mount the routes exported by the auth module.
app.include_router(auth_router)
|
||
|
||
# Base URL used for all Binance futures REST calls in this file.
BINANCE_FAPI = "https://fapi.binance.com/fapi/v1"
# Symbols polled by the rate, stats and signal endpoints.
SYMBOLS = ["BTCUSDT", "ETHUSDT", "XRPUSDT", "SOLUSDT"]
# Browser-like User-Agent sent with outbound requests.
HEADERS = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"}

# Simple in-memory cache (history/stats: 60 s, rates: 3 s).
# Maps cache key -> {"ts": time of insertion, "data": cached payload}.
_cache: dict = {}
|
||
|
||
def get_cache(key: str, ttl: int):
    """Return the payload cached under *key*, or None when it is absent or
    older than *ttl* seconds."""
    cached_item = _cache.get(key)
    if not cached_item:
        return None
    age = time.time() - cached_item["ts"]
    return cached_item["data"] if age < ttl else None
|
||
|
||
def set_cache(key: str, data):
    """Store *data* under *key*, stamped with the current wall-clock time."""
    stamped = {"ts": time.time(), "data": data}
    _cache[key] = stamped
|
||
|
||
|
||
async def save_snapshot(rates: dict):
    """Insert one BTC/ETH funding-rate and price snapshot into rate_snapshots.

    *rates* maps short symbols ("BTC", "ETH") to dicts carrying
    "lastFundingRate", "markPrice" and "indexPrice"; missing entries or
    fields are stored as 0. The write is best-effort: a failure is logged
    and never propagated, so it cannot break the API response or the
    background polling loop that triggered it.
    """
    try:
        btc = rates.get("BTC", {})
        eth = rates.get("ETH", {})
        await async_execute(
            "INSERT INTO rate_snapshots (ts, btc_rate, eth_rate, btc_price, eth_price, btc_index_price, eth_index_price) "
            "VALUES ($1,$2,$3,$4,$5,$6,$7)",
            int(time.time()),
            float(btc.get("lastFundingRate", 0)),
            float(eth.get("lastFundingRate", 0)),
            float(btc.get("markPrice", 0)),
            float(eth.get("markPrice", 0)),
            float(btc.get("indexPrice", 0)),
            float(eth.get("indexPrice", 0)),
        )
    except Exception:
        # Persisting must not affect callers, but a silent failure hides
        # database outages, so record it.
        logging.getLogger(__name__).warning("rate snapshot insert failed", exc_info=True)
|
||
|
||
|
||
async def background_snapshot_loop():
    """Poll Binance premiumIndex for every symbol every 2 seconds and persist
    the result through save_snapshot.

    Runs forever. Failed or non-200 responses are skipped per symbol, and a
    malformed payload for one symbol no longer discards the others. Any
    other error in an iteration is logged and the loop carries on.
    """
    log = logging.getLogger(__name__)
    while True:
        try:
            async with httpx.AsyncClient(timeout=5, headers=HEADERS) as client:
                tasks = [client.get(f"{BINANCE_FAPI}/premiumIndex", params={"symbol": s}) for s in SYMBOLS]
                # return_exceptions=True: one failing request must not cancel the rest.
                responses = await asyncio.gather(*tasks, return_exceptions=True)
            result = {}
            for sym, resp in zip(SYMBOLS, responses):
                if isinstance(resp, BaseException) or resp.status_code != 200:
                    continue
                try:
                    data = resp.json()
                    result[sym.replace("USDT", "")] = {
                        "lastFundingRate": float(data["lastFundingRate"]),
                        "markPrice": float(data["markPrice"]),
                        "indexPrice": float(data["indexPrice"]),
                    }
                except (KeyError, TypeError, ValueError):
                    log.warning("malformed premiumIndex payload for %s", sym, exc_info=True)
            if result:
                await save_snapshot(result)
        except Exception:
            log.warning("snapshot poll iteration failed", exc_info=True)
        await asyncio.sleep(2)
|
||
|
||
|
||
@app.on_event("startup")
async def startup():
    """Prepare the database and start the background snapshot poller."""
    # Initialise the PG schema and the auth tables (synchronous helpers).
    init_schema()
    ensure_auth_tables()
    # Initialise the async connection pool.
    await get_async_pool()
    # The event loop keeps only a weak reference to tasks, so hold a strong
    # one on the app state; otherwise the poller may be garbage-collected
    # while it is still running.
    app.state.snapshot_task = asyncio.create_task(background_snapshot_loop())
|
||
|
||
|
||
@app.on_event("shutdown")
async def shutdown():
    """Close the async database pool when the application stops."""
    await close_async_pool()
|
||
|
||
|
||
@app.get("/api/health")
async def health():
    """Liveness probe: report "ok" together with the current UTC time (ISO 8601)."""
    now_iso = datetime.utcnow().isoformat()
    return {"status": "ok", "timestamp": now_iso}
|
||
|
||
|
||
# Strong references to in-flight snapshot writes spawned by get_rates; the
# event loop itself only keeps weak references to tasks.
_pending_snapshot_tasks: set = set()


@app.get("/api/rates")
async def get_rates():
    """Return current mark/index price and funding rate for every symbol.

    Results are cached for 3 seconds. Each fresh result is also written to
    rate_snapshots in the background.

    Raises:
        HTTPException(502): Binance could not be reached or answered with a
            non-200 status for any symbol.
    """
    cached = get_cache("rates", 3)
    if cached:
        return cached
    try:
        async with httpx.AsyncClient(timeout=10, headers=HEADERS) as client:
            tasks = [client.get(f"{BINANCE_FAPI}/premiumIndex", params={"symbol": s}) for s in SYMBOLS]
            responses = await asyncio.gather(*tasks)
    except httpx.HTTPError as exc:
        # Report transport failures as an upstream error instead of an unhandled 500.
        raise HTTPException(status_code=502, detail=f"Binance request failed: {exc}") from exc
    result = {}
    for sym, resp in zip(SYMBOLS, responses):
        if resp.status_code != 200:
            raise HTTPException(status_code=502, detail=f"Binance error for {sym}")
        data = resp.json()
        key = sym.replace("USDT", "")
        result[key] = {
            "symbol": sym,
            "markPrice": float(data["markPrice"]),
            "indexPrice": float(data["indexPrice"]),
            "lastFundingRate": float(data["lastFundingRate"]),
            "nextFundingTime": data["nextFundingTime"],
            "timestamp": data["time"],
        }
    set_cache("rates", result)
    # Fire-and-forget persistence; keep a reference until the task finishes.
    task = asyncio.create_task(save_snapshot(result))
    _pending_snapshot_tasks.add(task)
    task.add_done_callback(_pending_snapshot_tasks.discard)
    return result
|
||
|
||
|
||
@app.get("/api/snapshots")
async def get_snapshots(hours: int = 24, limit: int = 5000, user: dict = Depends(get_current_user)):
    """Return up to *limit* stored rate snapshots from the last *hours* hours,
    oldest first."""
    window_secs = hours * 3600
    cutoff = int(time.time()) - window_secs
    query = (
        "SELECT ts, btc_rate, eth_rate, btc_price, eth_price FROM rate_snapshots "
        "WHERE ts >= $1 ORDER BY ts ASC LIMIT $2"
    )
    snapshots = await async_fetch(query, cutoff, limit)
    return {"count": len(snapshots), "hours": hours, "data": snapshots}
|
||
|
||
|
||
def _aggregate_kline_bars(rows, bar_secs: int) -> dict:
    """Fold ts-ascending snapshot rows into OHLC buckets keyed by bar start time."""
    bars: dict = {}
    for r in rows:
        ts, rate, price = r["ts"], r["rate"], r["price"]
        bar_ts = (ts // bar_secs) * bar_secs
        bar = bars.get(bar_ts)
        if bar is None:
            bars[bar_ts] = {
                "time": bar_ts,
                "open": rate, "high": rate, "low": rate, "close": rate,
                "price_open": price, "price_high": price, "price_low": price, "price_close": price,
            }
            continue
        bar["high"] = max(bar["high"], rate)
        bar["low"] = min(bar["low"], rate)
        bar["close"] = rate
        bar["price_high"] = max(bar["price_high"], price)
        bar["price_low"] = min(bar["price_low"], price)
        bar["price_close"] = price
    return bars


@app.get("/api/kline")
async def get_kline(symbol: str = "BTC", interval: str = "5m", limit: int = 500, user: dict = Depends(get_current_user)):
    """Build funding-rate / price candles from stored snapshots.

    Args:
        symbol: "BTC" (case-insensitive) selects the BTC columns; any other
            value selects the ETH columns, the only other pair stored in
            rate_snapshots.
        interval: bar size; unknown values fall back to 5 minutes.
        limit: maximum number of most recent bars returned. A value <= 0
            yields an empty result.

    Returns:
        Dict with symbol, interval, count and data. Rate OHLC values are
        scaled by 10000 and rounded to 4 decimals. When there is nothing
        to return, "count" is omitted.
    """
    interval_secs = {
        "1m": 60, "5m": 300, "30m": 1800,
        "1h": 3600, "4h": 14400, "8h": 28800,
        "1d": 86400, "1w": 604800, "1M": 2592000,
    }
    if limit <= 0:
        # Without this guard `[-limit:]` with limit == 0 slices the WHOLE list.
        return {"symbol": symbol, "interval": interval, "data": []}

    bar_secs = interval_secs.get(interval, 300)
    is_btc = symbol.upper() == "BTC"
    # Column names come from this fixed choice, never from user input.
    rate_col = "btc_rate" if is_btc else "eth_rate"
    price_col = "btc_price" if is_btc else "eth_price"
    since = int(time.time()) - bar_secs * limit

    rows = await async_fetch(
        f"SELECT ts, {rate_col} as rate, {price_col} as price FROM rate_snapshots "
        f"WHERE ts >= $1 ORDER BY ts ASC",
        since
    )

    if not rows:
        return {"symbol": symbol, "interval": interval, "data": []}

    bars = _aggregate_kline_bars(rows, bar_secs)
    data = sorted(bars.values(), key=lambda x: x["time"])[-limit:]
    for b in data:
        for k in ("open", "high", "low", "close"):
            b[k] = round(b[k] * 10000, 4)
    return {"symbol": symbol, "interval": interval, "count": len(data), "data": data}
|
||
|
||
|
||
@app.get("/api/stats/ytd")
async def get_stats_ytd(user: dict = Depends(get_current_user)):
    """Return the year-to-date annualized funding rate per symbol.

    For each symbol the mean funding rate since 1 January (UTC) is
    annualized as mean * 3 * 365 * 100. A symbol whose request fails, or
    that has no data, reports {"annualized": 0, "count": 0}. The result is
    cached for one hour, but only when every symbol was fetched
    successfully, so a transient Binance error is not served for an hour.
    """
    cached = get_cache("stats_ytd", 3600)
    if cached:
        return cached
    # Build the year start as an aware UTC datetime: a naive datetime's
    # .timestamp() is interpreted in the server's local timezone.
    utc = _dt.timezone.utc
    current_year = _dt.datetime.now(utc).year
    year_start = int(_dt.datetime(current_year, 1, 1, tzinfo=utc).timestamp() * 1000)
    end_time = int(time.time() * 1000)
    async with httpx.AsyncClient(timeout=20, headers=HEADERS) as client:
        tasks = [
            client.get(f"{BINANCE_FAPI}/fundingRate",
                       params={"symbol": s, "startTime": year_start, "endTime": end_time, "limit": 1000})
            for s in SYMBOLS
        ]
        responses = await asyncio.gather(*tasks)
    result = {}
    all_ok = True
    for sym, resp in zip(SYMBOLS, responses):
        key = sym.replace("USDT", "")
        if resp.status_code != 200:
            all_ok = False
            result[key] = {"annualized": 0, "count": 0}
            continue
        rates = [float(item["fundingRate"]) for item in resp.json()]
        if not rates:
            result[key] = {"annualized": 0, "count": 0}
            continue
        mean = sum(rates) / len(rates)
        annualized = round(mean * 3 * 365 * 100, 2)
        result[key] = {"annualized": annualized, "count": len(rates)}
    if all_ok:
        set_cache("stats_ytd", result)
    return result
|
||
|
||
|
||
@app.get("/api/signals/history")
async def get_signals_history(limit: int = 100, user: dict = Depends(get_current_user)):
    """Return the most recent *limit* rows of signal_logs, newest first.

    On any failure the response is an empty item list plus the error text.
    """
    query = "SELECT id, symbol, rate, annualized, sent_at, message FROM signal_logs ORDER BY sent_at DESC LIMIT $1"
    try:
        items = await async_fetch(query, limit)
    except Exception as exc:
        return {"items": [], "error": str(exc)}
    return {"items": items}
|
||
|
||
|
||
@app.get("/api/history")
async def get_history(user: dict = Depends(get_current_user)):
    """Return the last 7 days of funding-rate history per symbol (cached 60 s).

    Each item carries the raw fundingTime (ms), the rate as a float and an
    ISO 8601 UTC timestamp.

    Raises:
        HTTPException(502): Binance answered with a non-200 status.
    """
    cached = get_cache("history", 60)
    if cached:
        return cached
    # time.time() is a true epoch value. datetime.utcnow().timestamp() treats
    # the naive UTC value as local time and is off by the server's UTC offset.
    end_time = int(time.time() * 1000)
    start_time = end_time - 7 * 24 * 3600 * 1000
    async with httpx.AsyncClient(timeout=15, headers=HEADERS) as client:
        tasks = [
            client.get(f"{BINANCE_FAPI}/fundingRate",
                       params={"symbol": s, "startTime": start_time, "endTime": end_time, "limit": 1000})
            for s in SYMBOLS
        ]
        responses = await asyncio.gather(*tasks)
    result = {}
    for sym, resp in zip(SYMBOLS, responses):
        if resp.status_code != 200:
            raise HTTPException(status_code=502, detail=f"Binance history error for {sym}")
        key = sym.replace("USDT", "")
        result[key] = [
            {"fundingTime": item["fundingTime"], "fundingRate": float(item["fundingRate"]),
             "timestamp": datetime.utcfromtimestamp(item["fundingTime"] / 1000).isoformat()}
            for item in resp.json()
        ]
    set_cache("history", result)
    return result
|
||
|
||
|
||
@app.get("/api/stats")
async def get_stats(user: dict = Depends(get_current_user)):
    """Return 7-day funding-rate statistics per symbol plus a BTC/ETH combo.

    Per symbol: mean7d (percent, 6 decimals), annualized (mean * 3 * 365 *
    100, 2 decimals) and count. "combo" is the plain average of the BTC and
    ETH figures. Cached for 60 seconds.

    Raises:
        HTTPException(502): Binance answered with a non-200 status.
    """
    cached = get_cache("stats", 60)
    if cached:
        return cached
    # time.time() is a true epoch value. datetime.utcnow().timestamp() treats
    # the naive UTC value as local time and is off by the server's UTC offset.
    end_time = int(time.time() * 1000)
    start_time = end_time - 7 * 24 * 3600 * 1000
    async with httpx.AsyncClient(timeout=15, headers=HEADERS) as client:
        tasks = [
            client.get(f"{BINANCE_FAPI}/fundingRate",
                       params={"symbol": s, "startTime": start_time, "endTime": end_time, "limit": 1000})
            for s in SYMBOLS
        ]
        responses = await asyncio.gather(*tasks)
    stats = {}
    for sym, resp in zip(SYMBOLS, responses):
        if resp.status_code != 200:
            raise HTTPException(status_code=502, detail=f"Binance stats error for {sym}")
        key = sym.replace("USDT", "")
        rates = [float(item["fundingRate"]) for item in resp.json()]
        if not rates:
            stats[key] = {"mean7d": 0, "annualized": 0, "count": 0}
            continue
        mean = sum(rates) / len(rates)
        annualized = mean * 3 * 365 * 100
        stats[key] = {
            "mean7d": round(mean * 100, 6),
            "annualized": round(annualized, 2),
            "count": len(rates),
        }
    btc = stats.get("BTC", {})
    eth = stats.get("ETH", {})
    stats["combo"] = {
        "mean7d": round((btc.get("mean7d", 0) + eth.get("mean7d", 0)) / 2, 6),
        "annualized": round((btc.get("annualized", 0) + eth.get("annualized", 0)) / 2, 2),
    }
    set_cache("stats", stats)
    return stats
|
||
|
||
|
||
# ─── aggTrades 查询接口(PG版)───────────────────────────────────
|
||
|
||
@app.get("/api/trades/meta")
async def get_trades_meta(user: dict = Depends(get_current_user)):
    """Return the agg_trades_meta bookkeeping rows keyed by short symbol
    (the "USDT" part removed)."""
    meta_rows = await async_fetch("SELECT symbol, last_agg_id, last_time_ms, updated_at FROM agg_trades_meta")
    return {
        row["symbol"].replace("USDT", ""): {
            "last_agg_id": row["last_agg_id"],
            "last_time_ms": row["last_time_ms"],
            "updated_at": row["updated_at"],
        }
        for row in meta_rows
    }
|
||
|
||
|
||
@app.get("/api/trades/summary")
async def get_trades_summary(
    symbol: str = "BTC",
    start_ms: int = 0,
    end_ms: int = 0,
    interval: str = "1m",
    user: dict = Depends(get_current_user),
):
    """Aggregate agg_trades into fixed-width bars with buy/sell volume, delta,
    trade count, VWAP and largest single quantity.

    end_ms defaults to now and start_ms to one hour before end_ms. Unknown
    intervals fall back to one minute.
    """
    if end_ms == 0:
        end_ms = int(time.time() * 1000)
    if start_ms == 0:
        start_ms = end_ms - 3600 * 1000

    bar_widths = {"1m": 60000, "5m": 300000, "15m": 900000, "1h": 3600000}
    interval_ms = bar_widths.get(interval, 60000)
    sym_full = symbol.upper() + "USDT"

    # The aggregation runs inside PostgreSQL rather than in a Python loop.
    bar_rows = await async_fetch(
        """
        SELECT
            (time_ms / $4) * $4 AS bar_ms,
            ROUND(SUM(CASE WHEN is_buyer_maker = 0 THEN qty ELSE 0 END)::numeric, 4) AS buy_vol,
            ROUND(SUM(CASE WHEN is_buyer_maker = 1 THEN qty ELSE 0 END)::numeric, 4) AS sell_vol,
            COUNT(*) AS trade_count,
            ROUND((SUM(price * qty) / NULLIF(SUM(qty), 0))::numeric, 2) AS vwap,
            ROUND(MAX(qty)::numeric, 4) AS max_qty
        FROM agg_trades
        WHERE symbol = $1 AND time_ms >= $2 AND time_ms < $3
        GROUP BY bar_ms
        ORDER BY bar_ms ASC
        """,
        sym_full, start_ms, end_ms, interval_ms
    )

    def to_bar(row):
        buy_vol = float(row["buy_vol"])
        sell_vol = float(row["sell_vol"])
        return {
            "time_ms": row["bar_ms"],
            "buy_vol": buy_vol,
            "sell_vol": sell_vol,
            "delta": round(buy_vol - sell_vol, 4),
            "total_vol": round(buy_vol + sell_vol, 4),
            "trade_count": row["trade_count"],
            "vwap": float(row["vwap"]) if row["vwap"] else 0,
            "max_qty": float(row["max_qty"]),
        }

    bars = [to_bar(row) for row in bar_rows]
    return {"symbol": symbol, "interval": interval, "count": len(bars), "data": bars}
|
||
|
||
|
||
@app.get("/api/trades/latest")
async def get_trades_latest(
    symbol: str = "BTC",
    limit: int = 30,
    user: dict = Depends(get_current_user),
):
    """Return the newest *limit* aggregated trades for *symbol* (cached 2 s
    per symbol/limit pair)."""
    cache_key = f"trades_latest_{symbol}_{limit}"
    hit = get_cache(cache_key, 2)
    if hit:
        return hit
    query = (
        "SELECT agg_id, price, qty, time_ms, is_buyer_maker FROM agg_trades "
        "WHERE symbol = $1 ORDER BY time_ms DESC, agg_id DESC LIMIT $2"
    )
    trades = await async_fetch(query, symbol.upper() + "USDT", limit)
    payload = {"symbol": symbol, "count": len(trades), "data": trades}
    set_cache(cache_key, payload)
    return payload
|
||
|
||
|
||
@app.get("/api/collector/health")
async def collector_health(user: dict = Depends(get_current_user)):
    """Report per-symbol collector lag, computed from agg_trades_meta.

    A symbol counts as healthy when its last trade time is less than 30
    seconds behind the current time.
    """
    now_ms = int(time.time() * 1000)
    meta_rows = await async_fetch("SELECT symbol, last_agg_id, last_time_ms FROM agg_trades_meta")

    def describe(row):
        # A missing last_time_ms is treated as 0, giving a very large lag.
        lag_seconds = (now_ms - (row["last_time_ms"] or 0)) / 1000
        return {
            "last_agg_id": row["last_agg_id"],
            "lag_seconds": round(lag_seconds, 1),
            "healthy": lag_seconds < 30,
        }

    collector = {row["symbol"].replace("USDT", ""): describe(row) for row in meta_rows}
    return {"collector": collector, "timestamp": now_ms}
|
||
|
||
|
||
# ─── V5 Signal Engine API ────────────────────────────────────────
|
||
|
||
@app.get("/api/signals/indicators")
async def get_signal_indicators(
    symbol: str = "BTC",
    minutes: int = 60,
    user: dict = Depends(get_current_user),
):
    """Return the 1-minute indicator rows for *symbol* from the last
    *minutes* minutes, oldest first."""
    window_ms = minutes * 60 * 1000
    cutoff_ms = int(time.time() * 1000) - window_ms
    query = (
        "SELECT ts, cvd_fast, cvd_mid, cvd_day, atr_5m, vwap_30m, price, score, signal "
        "FROM signal_indicators_1m WHERE symbol = $1 AND ts >= $2 ORDER BY ts ASC"
    )
    indicator_rows = await async_fetch(query, symbol.upper() + "USDT", cutoff_ms)
    return {"symbol": symbol, "count": len(indicator_rows), "data": indicator_rows}
|
||
|
||
|
||
@app.get("/api/signals/latest")
async def get_signal_latest(user: dict = Depends(get_current_user), strategy: str = "v52_8signals"):
    """Return the newest signal_indicators row per symbol for *strategy*.

    Symbols without a row are omitted. A string "factors" value is decoded
    from JSON when possible and left unchanged otherwise.
    """
    query = (
        "SELECT ts, cvd_fast, cvd_mid, cvd_day, cvd_fast_slope, atr_5m, atr_percentile, "
        "vwap_30m, price, p95_qty, p99_qty, score, signal, factors "
        "FROM signal_indicators WHERE symbol = $1 AND strategy = $2 ORDER BY ts DESC LIMIT 1"
    )
    latest = {}
    for sym in SYMBOLS:
        row = await async_fetchrow(query, sym, strategy)
        if not row:
            continue
        entry = dict(row)
        raw_factors = entry.get("factors")
        if isinstance(raw_factors, str):
            try:
                entry["factors"] = json.loads(raw_factors)
            except Exception:
                pass
        latest[sym.replace("USDT", "")] = entry
    return latest
|
||
|
||
|
||
def _primary_signal_strategy() -> str:
|
||
strategy_dir = os.path.join(os.path.dirname(__file__), "strategies")
|
||
try:
|
||
names = []
|
||
for fn in os.listdir(strategy_dir):
|
||
if not fn.endswith(".json"):
|
||
continue
|
||
with open(os.path.join(strategy_dir, fn), "r", encoding="utf-8") as f:
|
||
cfg = json.load(f)
|
||
if cfg.get("name"):
|
||
names.append(cfg["name"])
|
||
if "v52_8signals" in names:
|
||
return "v52_8signals"
|
||
if "v51_baseline" in names:
|
||
return "v51_baseline"
|
||
except Exception:
|
||
pass
|
||
return "v51_baseline"
|
||
|
||
|
||
def _normalize_factors(raw):
|
||
if not raw:
|
||
return {}
|
||
if isinstance(raw, str):
|
||
try:
|
||
return json.loads(raw)
|
||
except Exception:
|
||
return {}
|
||
if isinstance(raw, dict):
|
||
return raw
|
||
return {}
|
||
|
||
|
||
def _factor_score(factors, name):
    """Return factors[name]["score"], or None when that path does not exist."""
    entry = factors.get(name) if isinstance(factors, dict) else None
    return entry.get("score") if isinstance(entry, dict) else None


def _strategy_signal_payload(strategy_name, trade_row, base_row, primary_strategy):
    """Build the per-strategy payload: prefer the latest paper trade, fall back
    to the latest indicator row for the primary strategy, else "unavailable"."""
    if trade_row:
        payload = {
            "score": trade_row.get("score"),
            "signal": trade_row.get("direction"),
            "ts": trade_row.get("entry_ts"),
            "source": "paper_trade",
        }
    elif base_row and primary_strategy == strategy_name:
        payload = {
            "score": base_row.get("score"),
            "signal": base_row.get("signal"),
            "ts": base_row.get("ts"),
            "source": "signal_indicators",
        }
    else:
        payload = {
            "score": None,
            "signal": None,
            "ts": None,
            "source": "unavailable",
        }

    factors = _normalize_factors(trade_row.get("score_factors") if trade_row else None)
    payload["funding_rate_score"] = _factor_score(factors, "funding_rate")
    payload["liquidation_score"] = _factor_score(factors, "liquidation")
    return payload


@app.get("/api/signals/latest-v52")
async def get_signal_latest_v52(user: dict = Depends(get_current_user)):
    """Return the latest signal data needed to show V5.1 and V5.2 side by side.

    Per symbol: the primary strategy name, the newest indicator signal and
    timestamp, and one payload each for v51_baseline and v52_8signals.
    """
    primary_strategy = _primary_signal_strategy()
    result = {}
    for sym in SYMBOLS:
        base_row = await async_fetchrow(
            "SELECT ts, score, signal FROM signal_indicators WHERE symbol = $1 ORDER BY ts DESC LIMIT 1",
            sym,
        )
        # DISTINCT ON returns only the newest trade per strategy (at most two
        # rows) instead of loading the symbol's whole trade history.
        strategy_rows = await async_fetch(
            "SELECT DISTINCT ON (strategy) strategy, score, direction, entry_ts, score_factors "
            "FROM paper_trades WHERE symbol = $1 AND strategy IN ('v51_baseline','v52_8signals') "
            "ORDER BY strategy, entry_ts DESC",
            sym,
        )
        latest_by_strategy: dict[str, dict] = {row["strategy"]: row for row in strategy_rows}

        result[sym.replace("USDT", "")] = {
            "primary_strategy": primary_strategy,
            "latest_signal": base_row.get("signal") if base_row else None,
            "latest_ts": base_row.get("ts") if base_row else None,
            "v51": _strategy_signal_payload(
                "v51_baseline", latest_by_strategy.get("v51_baseline"), base_row, primary_strategy
            ),
            "v52": _strategy_signal_payload(
                "v52_8signals", latest_by_strategy.get("v52_8signals"), base_row, primary_strategy
            ),
        }
    return result
|
||
|
||
|
||
@app.get("/api/signals/market-indicators")
async def get_market_indicators(user: dict = Depends(get_current_user)):
    """Return the newest market_indicators value of each indicator type per symbol.

    String values are decoded from JSON when possible and returned
    unchanged otherwise. Indicator types without a row are omitted.
    """
    indicator_types = ("long_short_ratio", "top_trader_position", "open_interest_hist", "coinbase_premium", "funding_rate")
    query = "SELECT value, timestamp_ms FROM market_indicators WHERE symbol = $1 AND indicator_type = $2 ORDER BY timestamp_ms DESC LIMIT 1"
    by_symbol = {}
    for sym in SYMBOLS:
        latest = {}
        for ind_type in indicator_types:
            row = await async_fetchrow(query, sym, ind_type)
            if not row:
                continue
            value = row["value"]
            if isinstance(value, str):
                try:
                    value = json.loads(value)
                except Exception:
                    pass
            latest[ind_type] = {"value": value, "ts": row["timestamp_ms"]}
        by_symbol[sym.replace("USDT", "")] = latest
    return by_symbol
|
||
|
||
|
||
@app.get("/api/signals/signal-history")
async def get_signal_history(
    symbol: str = "BTC",
    limit: int = 50,
    strategy: str = "v52_8signals",
    user: dict = Depends(get_current_user),
):
    """Return recent signal history (only rows that carry a signal),
    including the per-layer factor scores."""
    query = (
        "SELECT ts, score, signal, factors FROM signal_indicators "
        "WHERE symbol = $1 AND strategy = $2 AND signal IS NOT NULL "
        "ORDER BY ts DESC LIMIT $3"
    )
    history = await async_fetch(query, symbol.upper() + "USDT", strategy, limit)
    # "factors" may arrive as a JSON string; decode it in place when possible.
    for entry in history:
        raw_factors = entry.get("factors")
        if not isinstance(raw_factors, str):
            continue
        try:
            entry["factors"] = json.loads(raw_factors)
        except Exception:
            pass
    return {"symbol": symbol, "count": len(history), "data": history}
|
||
|
||
|
||
@app.get("/api/signals/trades")
async def get_signal_trades(
    status: str = "all",
    limit: int = 50,
    user: dict = Depends(get_current_user),
):
    """Return the newest signal_trades rows, optionally restricted to one
    status ("all" disables the filter)."""
    if status != "all":
        trades = await async_fetch(
            "SELECT * FROM signal_trades WHERE status = $1 ORDER BY ts_open DESC LIMIT $2",
            status, limit
        )
    else:
        trades = await async_fetch(
            "SELECT * FROM signal_trades ORDER BY ts_open DESC LIMIT $1", limit
        )
    return {"count": len(trades), "data": trades}
|
||
|
||
|
||
# ─── 模拟盘 API ──────────────────────────────────────────────────
|
||
|
||
# Paper-trading configuration (runtime state shared with signal_engine
# through paper_config.json).
paper_config = {
    "enabled": False,
    "enabled_strategies": [],  # per-strategy switch, e.g. ["v51_baseline", "v52_8signals"]
    "initial_balance": 10000,
    "risk_per_trade": 0.02,
    "max_positions": 4,
    "tier_multiplier": {"light": 0.5, "standard": 1.0, "heavy": 1.5},
}

# Load a previously saved configuration at import time, overriding the
# defaults above. Any error while reading or parsing the file is ignored and
# the defaults stay in effect.
_config_path = os.path.join(os.path.dirname(__file__), "paper_config.json")
if os.path.exists(_config_path):
    try:
        with open(_config_path, "r") as _f:
            _saved = json.load(_f)
            paper_config.update(_saved)
    except Exception:
        pass
|
||
|
||
|
||
@app.get("/api/paper/config")
async def paper_get_config(user: dict = Depends(get_current_user)):
    """Return the current paper-trading configuration."""
    return paper_config
|
||
|
||
|
||
def _paper_config_value_ok(key: str, value) -> bool:
    """Check that *value* has a type the paper-trading code can compute with."""
    if key in ("initial_balance", "risk_per_trade", "max_positions"):
        # bool is a subclass of int but is never a meaningful amount.
        return isinstance(value, (int, float)) and not isinstance(value, bool) and value >= 0
    if key == "enabled_strategies":
        return isinstance(value, list) and all(isinstance(s, str) for s in value)
    return True


@app.post("/api/paper/config")
async def paper_set_config(request: Request, user: dict = Depends(get_current_user)):
    """Update the paper-trading configuration (admin only).

    Only the keys enabled, enabled_strategies, initial_balance,
    risk_per_trade and max_positions are accepted; other keys in the body
    are ignored. All values are validated before anything is changed, and
    the config file is replaced atomically so signal_engine never reads a
    half-written file.

    Raises:
        HTTPException(403): the caller is not an admin.
        HTTPException(400): the body is not a JSON object or a value has an
            unusable type.
    """
    if user.get("role") != "admin":
        raise HTTPException(status_code=403, detail="仅管理员可修改")
    body = await request.json()
    if not isinstance(body, dict):
        raise HTTPException(status_code=400, detail="request body must be a JSON object")

    updates = {}
    for k in ["enabled", "enabled_strategies", "initial_balance", "risk_per_trade", "max_positions"]:
        if k not in body:
            continue
        if not _paper_config_value_ok(k, body[k]):
            raise HTTPException(status_code=400, detail=f"invalid value for {k}")
        updates[k] = body[k]
    paper_config.update(updates)

    # Write the config file so signal_engine can read it too: write to a
    # temporary sibling first, then swap it in with an atomic rename.
    config_path = os.path.join(os.path.dirname(__file__), "paper_config.json")
    tmp_path = config_path + ".tmp"
    with open(tmp_path, "w") as f:
        json.dump(paper_config, f, indent=2)
    os.replace(tmp_path, config_path)
    return {"ok": True, "config": paper_config}
|
||
|
||
|
||
@app.get("/api/paper/summary")
async def paper_summary(
    strategy: str = "all",
    user: dict = Depends(get_current_user),
):
    """Paper-trading overview: trade count, win rate, PnL in R and USDT,
    balance, open positions, profit factor and first trade time.

    *strategy* "all" covers every strategy; any other value filters on it.
    """
    filtered = strategy != "all"
    args = (strategy,) if filtered else ()
    and_strategy = " AND strategy = $1" if filtered else ""
    where_strategy = " WHERE strategy = $1" if filtered else ""

    closed = await async_fetch(
        "SELECT pnl_r, direction FROM paper_trades "
        "WHERE status NOT IN ('active','tp1_hit')" + and_strategy,
        *args,
    )
    active = await async_fetch(
        "SELECT id FROM paper_trades WHERE status IN ('active','tp1_hit')" + and_strategy,
        *args,
    )
    first = await async_fetchrow(
        "SELECT MIN(created_at) as start FROM paper_trades" + where_strategy,
        *args,
    )

    pnls = [row["pnl_r"] for row in closed]
    total = len(pnls)
    winning = [p for p in pnls if p > 0]
    losing = [p for p in pnls if p <= 0]

    total_pnl = sum(pnls)
    # One R in USDT is derived from the configured balance and risk per trade.
    paper_1r_usd = paper_config["initial_balance"] * paper_config["risk_per_trade"]
    total_pnl_usdt = total_pnl * paper_1r_usd
    balance = paper_config["initial_balance"] + total_pnl_usdt

    win_rate = (len(winning) / total * 100) if total > 0 else 0
    gross_profit = sum(winning)
    gross_loss = abs(sum(losing))
    profit_factor = (gross_profit / gross_loss) if gross_loss > 0 else 0

    start_time = str(first["start"]) if first and first["start"] else None
    return {
        "total_trades": total,
        "win_rate": round(win_rate, 1),
        "total_pnl": round(total_pnl, 2),
        "total_pnl_usdt": round(total_pnl_usdt, 2),
        "balance": round(balance, 2),
        "active_positions": len(active),
        "profit_factor": round(profit_factor, 2),
        "start_time": start_time,
    }
|
||
|
||
|
||
@app.get("/api/paper/positions")
async def paper_positions(
    strategy: str = "all",
    user: dict = Depends(get_current_user),
):
    """Return the open paper positions with live price and unrealized PnL.

    Prices come from the Binance ticker endpoint, falling back to the
    newest signal_indicators price per symbol. When no usable price is
    available, current_price is 0 and the unrealized PnL is reported as 0
    rather than computed from a zero price.
    """
    columns = (
        "SELECT id, symbol, direction, score, tier, strategy, entry_price, entry_ts, "
        "tp1_price, tp2_price, sl_price, tp1_hit, status, atr_at_entry, score_factors, risk_distance "
    )
    if strategy == "all":
        rows = await async_fetch(
            columns + "FROM paper_trades WHERE status IN ('active','tp1_hit') ORDER BY entry_ts DESC"
        )
    else:
        rows = await async_fetch(
            columns + "FROM paper_trades WHERE status IN ('active','tp1_hit') AND strategy = $1 ORDER BY entry_ts DESC",
            strategy,
        )

    # Live prices from the Binance API; the ticker returns every symbol, so
    # keep only those with an open position.
    prices = {}
    symbols_needed = {r["symbol"] for r in rows}
    if symbols_needed:
        try:
            async with httpx.AsyncClient(timeout=5) as client:
                resp = await client.get(f"{BINANCE_FAPI}/ticker/price")
            if resp.status_code == 200:
                for item in resp.json():
                    if item["symbol"] in symbols_needed:
                        prices[item["symbol"]] = float(item["price"])
        except Exception:
            pass

    # Fallback: use signal_indicators when the Binance API gave no price.
    for sym in symbols_needed:
        if sym in prices:
            continue
        try:
            latest = await async_fetchrow(
                "SELECT price FROM signal_indicators WHERE symbol=$1 ORDER BY ts DESC LIMIT 1", sym
            )
            prices[sym] = (latest["price"] if latest else 0) or 0
        except Exception:
            prices[sym] = 0

    # Unrealized PnL in USDT assumes 1R = initial_balance * risk_per_trade.
    paper_1r = paper_config["initial_balance"] * paper_config["risk_per_trade"]
    result = []
    for r in rows:
        d = dict(r)
        current_price = prices.get(r["symbol"], 0)
        d["current_price"] = current_price
        entry = r["entry_price"]
        rd = r.get("risk_distance") or abs(entry - r["sl_price"]) or 1
        # Require a real price: a price of 0 would otherwise show up as a
        # huge fictitious loss (LONG) or gain (SHORT).
        if rd > 0 and entry > 0 and current_price > 0:
            if r["direction"] == "LONG":
                d["unrealized_pnl_r"] = round((current_price - entry) / rd, 2)
            else:
                d["unrealized_pnl_r"] = round((entry - current_price) / rd, 2)
            d["unrealized_pnl_usdt"] = round(d["unrealized_pnl_r"] * paper_1r, 2)
        else:
            d["unrealized_pnl_r"] = 0
            d["unrealized_pnl_usdt"] = 0
        result.append(d)
    return {"data": result}
|
||
|
||
|
||
@app.get("/api/paper/trades")
async def paper_trades(
    symbol: str = "all",
    result: str = "all",
    strategy: str = "all",
    limit: int = 100,
    user: dict = Depends(get_current_user),
):
    """List closed paper trades, newest exit first.

    Optional filters: symbol (short form, e.g. "BTC"), result ("win" or
    "loss") and strategy; "all" disables a filter.
    """
    filters = ["status NOT IN ('active','tp1_hit')"]
    args = []

    # Each bound value takes the next positional placeholder ($1, $2, ...).
    if symbol != "all":
        args.append(symbol.upper() + "USDT")
        filters.append(f"symbol = ${len(args)}")

    if result == "win":
        filters.append("pnl_r > 0")
    elif result == "loss":
        filters.append("pnl_r <= 0")

    if strategy != "all":
        args.append(strategy)
        filters.append(f"strategy = ${len(args)}")

    args.append(limit)
    where = " AND ".join(filters)
    query = (
        f"SELECT id, symbol, direction, score, tier, strategy, entry_price, exit_price, "
        f"entry_ts, exit_ts, pnl_r, status, tp1_hit, score_factors "
        f"FROM paper_trades WHERE {where} ORDER BY exit_ts DESC LIMIT ${len(args)}"
    )
    trades = await async_fetch(query, *args)
    return {"count": len(trades), "data": trades}
|
||
|
||
|
||
@app.get("/api/paper/equity-curve")
async def paper_equity_curve(
    strategy: str = "all",
    user: dict = Depends(get_current_user),
):
    """Equity curve: running sum of pnl_r over closed trades ordered by exit
    time, optionally for a single strategy."""
    base_sql = (
        "SELECT exit_ts, pnl_r FROM paper_trades "
        "WHERE status NOT IN ('active','tp1_hit')"
    )
    if strategy != "all":
        closed = await async_fetch(base_sql + " AND strategy = $1 ORDER BY exit_ts ASC", strategy)
    else:
        closed = await async_fetch(base_sql + " ORDER BY exit_ts ASC")

    points = []
    running_total = 0.0
    for trade in closed:
        running_total += trade["pnl_r"]
        points.append({"ts": trade["exit_ts"], "pnl": round(running_total, 2)})
    return {"data": points}
|
||
|
||
|
||
@app.get("/api/paper/stats")
|
||
async def paper_stats(
|
||
strategy: str = "all",
|
||
user: dict = Depends(get_current_user),
|
||
):
|
||
"""详细统计"""
|
||
if strategy == "all":
|
||
rows = await async_fetch(
|
||
"SELECT symbol, direction, pnl_r, tier, entry_ts, exit_ts "
|
||
"FROM paper_trades WHERE status NOT IN ('active','tp1_hit')"
|
||
)
|
||
else:
|
||
rows = await async_fetch(
|
||
"SELECT symbol, direction, pnl_r, tier, entry_ts, exit_ts "
|
||
"FROM paper_trades WHERE status NOT IN ('active','tp1_hit') AND strategy = $1",
|
||
strategy,
|
||
)
|
||
if not rows:
|
||
return {"error": "暂无数据"}
|
||
|
||
total = len(rows)
|
||
wins = [r for r in rows if r["pnl_r"] > 0]
|
||
losses = [r for r in rows if r["pnl_r"] <= 0]
|
||
|
||
# 基础统计
|
||
win_rate = len(wins) / total * 100
|
||
avg_win = sum(r["pnl_r"] for r in wins) / len(wins) if wins else 0
|
||
avg_loss = abs(sum(r["pnl_r"] for r in losses)) / len(losses) if losses else 0
|
||
win_loss_ratio = avg_win / avg_loss if avg_loss > 0 else 0
|
||
|
||
# MDD
|
||
peak = 0.0
|
||
mdd = 0.0
|
||
running = 0.0
|
||
for r in sorted(rows, key=lambda x: x["exit_ts"] or 0):
|
||
running += r["pnl_r"]
|
||
peak = max(peak, running)
|
||
mdd = max(mdd, peak - running)
|
||
|
||
# 夏普
|
||
returns = [r["pnl_r"] for r in rows]
|
||
if len(returns) > 1:
|
||
import statistics
|
||
avg_ret = statistics.mean(returns)
|
||
std_ret = statistics.stdev(returns)
|
||
sharpe = (avg_ret / std_ret) * (252 ** 0.5) if std_ret > 0 else 0
|
||
else:
|
||
sharpe = 0
|
||
|
||
# 按币种 — 完整统计
|
||
by_symbol = {}
|
||
for r in rows:
|
||
s = r["symbol"].replace("USDT", "")
|
||
if s not in by_symbol:
|
||
by_symbol[s] = []
|
||
by_symbol[s].append(r)
|
||
|
||
def calc_stats(trade_list):
|
||
t = len(trade_list)
|
||
w = [r for r in trade_list if r["pnl_r"] > 0]
|
||
l = [r for r in trade_list if r["pnl_r"] <= 0]
|
||
aw = sum(r["pnl_r"] for r in w) / len(w) if w else 0
|
||
al = abs(sum(r["pnl_r"] for r in l)) / len(l) if l else 0
|
||
wlr = aw / al if al > 0 else 0
|
||
# MDD
|
||
pk, dd, rn = 0.0, 0.0, 0.0
|
||
for r in sorted(trade_list, key=lambda x: x["exit_ts"] or 0):
|
||
rn += r["pnl_r"]
|
||
pk = max(pk, rn)
|
||
dd = max(dd, pk - rn)
|
||
# 夏普
|
||
rets = [r["pnl_r"] for r in trade_list]
|
||
if len(rets) > 1:
|
||
import statistics
|
||
avg_r = statistics.mean(rets)
|
||
std_r = statistics.stdev(rets)
|
||
sp = (avg_r / std_r) * (252 ** 0.5) if std_r > 0 else 0
|
||
else:
|
||
sp = 0
|
||
# 方向
|
||
lg = [r for r in trade_list if r["direction"] == "LONG"]
|
||
sh = [r for r in trade_list if r["direction"] == "SHORT"]
|
||
lwr = len([r for r in lg if r["pnl_r"] > 0]) / len(lg) * 100 if lg else 0
|
||
swr = len([r for r in sh if r["pnl_r"] > 0]) / len(sh) * 100 if sh else 0
|
||
total_pnl = sum(r["pnl_r"] for r in trade_list)
|
||
return {
|
||
"total": t, "win_rate": round(len(w)/t*100, 1) if t else 0,
|
||
"avg_win": round(aw, 2), "avg_loss": round(al, 2),
|
||
"win_loss_ratio": round(wlr, 2), "mdd": round(dd, 2),
|
||
"sharpe": round(sp, 2), "total_pnl": round(total_pnl, 2),
|
||
"long_win_rate": round(lwr, 1), "long_count": len(lg),
|
||
"short_win_rate": round(swr, 1), "short_count": len(sh),
|
||
}
|
||
|
||
symbol_stats = {s: calc_stats(tl) for s, tl in by_symbol.items()}
|
||
|
||
# 按方向
|
||
longs = [r for r in rows if r["direction"] == "LONG"]
|
||
shorts = [r for r in rows if r["direction"] == "SHORT"]
|
||
long_wr = len([r for r in longs if r["pnl_r"] > 0]) / len(longs) * 100 if longs else 0
|
||
short_wr = len([r for r in shorts if r["pnl_r"] > 0]) / len(shorts) * 100 if shorts else 0
|
||
|
||
# 按档位
|
||
by_tier = {}
|
||
for r in rows:
|
||
t = r["tier"]
|
||
if t not in by_tier:
|
||
by_tier[t] = {"total": 0, "wins": 0}
|
||
by_tier[t]["total"] += 1
|
||
if r["pnl_r"] > 0:
|
||
by_tier[t]["wins"] += 1
|
||
tier_stats = {t: {"total": v["total"], "win_rate": round(v["wins"]/v["total"]*100, 1)} for t, v in by_tier.items()}
|
||
|
||
return {
|
||
"total": total,
|
||
"win_rate": round(win_rate, 1),
|
||
"avg_win": round(avg_win, 2),
|
||
"avg_loss": round(avg_loss, 2),
|
||
"win_loss_ratio": round(win_loss_ratio, 2),
|
||
"mdd": round(mdd, 2),
|
||
"sharpe": round(sharpe, 2),
|
||
"total_pnl": round(sum(r["pnl_r"] for r in rows), 2),
|
||
"long_win_rate": round(long_wr, 1),
|
||
"long_count": len(longs),
|
||
"short_win_rate": round(short_wr, 1),
|
||
"short_count": len(shorts),
|
||
"by_symbol": symbol_stats,
|
||
"by_tier": tier_stats,
|
||
}
|
||
|
||
|
||
@app.get("/api/paper/stats-by-strategy")
async def paper_stats_by_strategy(user: dict = Depends(get_current_user)):
    """Aggregate paper-trading performance per strategy.

    Closed trades (status other than 'active'/'tp1_hit') feed the win-rate and
    PnL figures; open trades are only counted. Rows whose ``strategy`` is NULL
    or empty are reported under ``"v51_baseline"``.

    Returns:
        ``{"data": [...]}`` with one entry per strategy, sorted by strategy
        name. Strategies that only have open positions get an all-zero entry.
    """
    closed_rows = await async_fetch(
        "SELECT strategy, pnl_r FROM paper_trades WHERE status NOT IN ('active','tp1_hit')"
    )
    active_rows = await async_fetch(
        "SELECT strategy, COUNT(*) AS active_count FROM paper_trades "
        "WHERE status IN ('active','tp1_hit') GROUP BY strategy"
    )
    if not closed_rows and not active_rows:
        return {"data": []}

    # GROUP BY yields separate rows for NULL and for 'v51_baseline'; both map
    # to the same key here, so the counts must be added rather than overwritten.
    active_map: dict = {}
    for row in active_rows:
        name = row["strategy"] or "v51_baseline"
        active_map[name] = active_map.get(name, 0) + int(row["active_count"])

    by_strategy: dict = {}
    for row in closed_rows:
        name = row["strategy"] or "v51_baseline"
        by_strategy.setdefault(name, []).append(float(row["pnl_r"]))

    stats = []
    for name, pnls in by_strategy.items():
        total = len(pnls)
        wins = [p for p in pnls if p > 0]
        losses = [p for p in pnls if p <= 0]
        avg_win = sum(wins) / len(wins) if wins else 0
        avg_loss = abs(sum(losses) / len(losses)) if losses else 0
        stats.append(
            {
                "strategy": name,
                "total": total,
                "win_rate": round((len(wins) / total) * 100, 1) if total else 0,
                "total_pnl": round(sum(pnls), 2),
                "avg_win": round(avg_win, 2),
                "avg_loss": round(avg_loss, 2),
                "active_positions": active_map.get(name, 0),
            }
        )

    # Strategies with open positions but no closed trade yet.
    for name, active_count in active_map.items():
        if name not in by_strategy:
            stats.append(
                {
                    "strategy": name,
                    "total": 0,
                    "win_rate": 0,
                    "total_pnl": 0,
                    "avg_win": 0,
                    "avg_loss": 0,
                    "active_positions": active_count,
                }
            )

    stats.sort(key=lambda entry: entry["strategy"])
    return {"data": stats}
|
||
|
||
# ─── Server status monitoring ─────────────────────────────────────

import shutil
import subprocess
import psutil

# Cache for the server-status payload so repeated requests do not re-run the
# slow probes. "ts" is the time.time() value at which "data" was stored.
_server_cache: dict = {"data": None, "ts": 0}
# Memoized path of the pm2 executable; filled in lazily by _find_pm2_bin().
_PM2_BIN = None
|
||
def _find_pm2_bin():
    """Return the pm2 executable to invoke, memoizing a successful lookup.

    Checks a few fixed install locations first, then ``PATH``. A hit is stored
    in the module-level ``_PM2_BIN`` so later calls return immediately. When
    nothing is found the string ``"npx pm2"`` is returned (and not memoized);
    callers must split that form into separate argv entries.
    """
    global _PM2_BIN
    if _PM2_BIN:
        return _PM2_BIN
    import shutil

    candidates = ("/home/fzq1228/.local/bin/pm2", "/usr/local/bin/pm2", "/usr/bin/pm2")
    found = next((path for path in candidates if os.path.exists(path)), None)
    if not found:
        found = shutil.which("pm2")
    if found:
        _PM2_BIN = found
        return found
    return "npx pm2"
|
||
# Prime psutil's CPU sampler at import time: this call returns immediately
# (interval=None), and later interval=None calls then have an earlier sample
# to measure against.
psutil.cpu_percent(interval=None)
|
||
def _pm2_process_summary():
    """Run ``pm2 jlist`` and return one summary dict per PM2 process.

    This is blocking (it spawns a subprocess with a 5 s timeout), so async
    callers should run it in an executor. Best-effort: any failure to run pm2
    or to interpret its output yields an empty list.
    """
    pm2_bin = _find_pm2_bin()
    # _find_pm2_bin() may return the fallback string "npx pm2", which has to
    # be passed as separate argv entries.
    cmd = ["npx", "pm2", "jlist"] if pm2_bin.startswith("npx") else [pm2_bin, "jlist"]
    try:
        completed = subprocess.run(cmd, capture_output=True, text=True, timeout=5)
        summary = []
        for proc in json.loads(completed.stdout):
            env = proc.get("pm2_env", {})
            monit = proc.get("monit", {})
            summary.append({
                "name": proc.get("name", ""),
                "status": env.get("status", "unknown"),
                "cpu": monit.get("cpu", 0),
                "memory_mb": round(monit.get("memory", 0) / 1024 / 1024, 1),
                "restarts": env.get("restart_time", 0),
                "uptime_ms": env.get("pm_uptime", 0),
                "pid": proc.get("pid", 0),
            })
        return summary
    except Exception:
        # Monitoring must not fail the endpoint; report "no processes" instead.
        return []


@app.get("/api/server/status")
async def get_server_status(user: dict = Depends(get_current_user)):
    """Report host and service status.

    Covers CPU, memory, disk, load average, uptime, network counters, PM2
    processes, PostgreSQL size/row estimates and whether a backfill process is
    running. The assembled payload is cached in ``_server_cache`` for 5
    seconds.
    """
    now = time.time()
    if _server_cache["data"] and (now - _server_cache["ts"]) < 5:
        return _server_cache["data"]

    # CPU: non-blocking read, relative to the previous cpu_percent() call.
    cpu_percent = psutil.cpu_percent(interval=None)
    cpu_count = psutil.cpu_count()

    mem = psutil.virtual_memory()
    swap = psutil.swap_memory()
    disk = shutil.disk_usage("/")
    load1, load5, load15 = os.getloadavg()
    uptime_s = time.time() - psutil.boot_time()
    net = psutil.net_io_counters()

    # PM2: the subprocess call can take up to 5 s, so keep it off the event
    # loop to avoid stalling every other request in the meantime.
    pm2_procs = await asyncio.get_running_loop().run_in_executor(None, _pm2_process_summary)

    # PostgreSQL: database size plus row estimates from the statistics views
    # (cheap compared with COUNT(*) over the tables).
    pg_info = {}
    try:
        row = await async_fetchrow(
            "SELECT pg_database_size(current_database()) as db_size"
        )
        pg_info["db_size_mb"] = round(row["db_size"] / 1024 / 1024, 1) if row else 0

        row2 = await async_fetchrow(
            "SELECT SUM(n_live_tup)::bigint as cnt FROM pg_stat_user_tables WHERE relname LIKE 'agg_trades%'"
        )
        pg_info["agg_trades_count"] = row2["cnt"] if row2 and row2["cnt"] else 0

        row3 = await async_fetchrow(
            "SELECT n_live_tup::bigint as cnt FROM pg_stat_user_tables WHERE relname = 'rate_snapshots'"
        )
        pg_info["rate_snapshots_count"] = row3["cnt"] if row3 else 0

        # Latest/earliest data time per symbol.
        meta_rows = await async_fetch("SELECT symbol, last_time_ms, earliest_time_ms FROM agg_trades_meta")
        pg_info["symbols"] = {}
        for m in meta_rows:
            latest_ms = m["last_time_ms"]
            earliest_ms = m["earliest_time_ms"]
            # A NULL bound must not abort the remaining symbols.
            if latest_ms is not None and earliest_ms is not None:
                span_hours = round((latest_ms - earliest_ms) / 3600000, 1)
            else:
                span_hours = None
            pg_info["symbols"][m["symbol"].replace("USDT", "")] = {
                "latest_ms": latest_ms,
                "earliest_ms": earliest_ms,
                "span_hours": span_hours,
            }
    except Exception:
        # Best-effort: return whatever was collected before the failure.
        pass

    # Backfill: is any process command line mentioning backfill_agg_trades?
    backfill_running = False
    try:
        for proc in psutil.process_iter(["pid", "cmdline"]):
            cmdline = " ".join(proc.info.get("cmdline") or [])
            if "backfill_agg_trades" in cmdline:
                backfill_running = True
                break
    except Exception:
        pass

    payload = {
        "timestamp": int(time.time() * 1000),
        "cpu": {
            "percent": cpu_percent,
            "cores": cpu_count,
        },
        "memory": {
            "total_gb": round(mem.total / 1024**3, 1),
            "used_gb": round(mem.used / 1024**3, 1),
            "percent": mem.percent,
            "swap_percent": swap.percent,
        },
        "disk": {
            "total_gb": round(disk.total / 1024**3, 1),
            "used_gb": round(disk.used / 1024**3, 1),
            "free_gb": round(disk.free / 1024**3, 1),
            "percent": round(disk.used / disk.total * 100, 1),
        },
        "load": {
            "load1": round(load1, 2),
            "load5": round(load5, 2),
            "load15": round(load15, 2),
        },
        "uptime_hours": round(uptime_s / 3600, 1),
        "network": {
            "bytes_sent_gb": round(net.bytes_sent / 1024**3, 2),
            "bytes_recv_gb": round(net.bytes_recv / 1024**3, 2),
        },
        "pm2": pm2_procs,
        "postgres": pg_info,
        "backfill_running": backfill_running,
    }
    _server_cache["data"] = payload
    _server_cache["ts"] = now
    return payload
|
||
|
||
# ============================================================
# Live trading API (/api/live/...)
# ============================================================
|
||
@app.get("/api/live/summary")
async def live_summary(
    strategy: str = "v52_8signals",
    user: dict = Depends(get_current_user),
):
    """Return the live-trading overview for one strategy.

    Combines closed-trade totals (count, win rate, PnL in R and USDT, profit
    factor, fees, funding), the number of open positions, the time of the
    first recorded trade and the risk-guard state file contents.
    """
    closed = await async_fetch(
        "SELECT pnl_r, direction, fee_usdt, funding_fee_usdt, slippage_bps "
        "FROM live_trades WHERE status NOT IN ('active','tp1_hit') AND strategy = $1",
        strategy,
    )
    active = await async_fetch(
        "SELECT id FROM live_trades WHERE status IN ('active','tp1_hit') AND strategy = $1",
        strategy,
    )
    first = await async_fetchrow(
        "SELECT MIN(created_at) as start FROM live_trades WHERE strategy = $1",
        strategy,
    )

    # NULL pnl_r counts towards the trade total but contributes 0 PnL.
    pnls = [r["pnl_r"] or 0 for r in closed]
    total = len(closed)
    wins = sum(1 for p in pnls if p > 0)
    total_pnl = sum(pnls)
    total_fee = sum(r["fee_usdt"] or 0 for r in closed)
    total_funding = sum(r["funding_fee_usdt"] or 0 for r in closed)
    win_rate = (wins / total * 100) if total > 0 else 0
    gross_profit = sum(p for p in pnls if p > 0)
    gross_loss = abs(sum(p for p in pnls if p < 0))
    profit_factor = (gross_profit / gross_loss) if gross_loss > 0 else 0

    # Risk-guard state is published through a JSON file; a missing or
    # unparseable file is reported as "unknown".
    try:
        with open("/tmp/risk_guard_state.json") as f:
            risk_status = json.load(f)
    except (OSError, ValueError):
        risk_status = {"status": "unknown"}

    return {
        "total_trades": total,
        "win_rate": round(win_rate, 1),
        "total_pnl_r": round(total_pnl, 2),
        "total_pnl_usdt": round(total_pnl * (await _get_risk_usd()), 2),
        "active_positions": len(active),
        "profit_factor": round(profit_factor, 2),
        "total_fee_usdt": round(total_fee, 2),
        "total_funding_usdt": round(total_funding, 2),
        "start_time": str(first["start"]) if first and first["start"] else None,
        "risk_status": risk_status,
    }
|
||
|
||
@app.get("/api/live/positions")
async def live_positions(
    strategy: str = "v52_8signals",
    user: dict = Depends(get_current_user),
):
    """Return the open live positions of a strategy with unrealized PnL.

    Each row from ``live_trades`` is returned as a dict extended with
    ``current_price``, ``unrealized_pnl_r``, ``unrealized_pnl_usdt`` and, when
    ``entry_ts`` is set, ``hold_time_min``. If the price lookup fails the
    price and both PnL fields are 0.
    """
    rows = await async_fetch(
        "SELECT id, symbol, direction, score, tier, strategy, entry_price, entry_ts, "
        "tp1_price, tp2_price, sl_price, tp1_hit, status, risk_distance, "
        "binance_order_id, fill_price, slippage_bps, protection_gap_ms, "
        "signal_to_order_ms, order_to_fill_ms, score_factors "
        "FROM live_trades WHERE status IN ('active','tp1_hit') AND strategy = $1 "
        "ORDER BY entry_ts DESC",
        strategy,
    )

    # Latest prices for the symbols we hold (best-effort).
    prices = {}
    symbols_needed = {r["symbol"] for r in rows}
    if symbols_needed:
        try:
            async with httpx.AsyncClient(timeout=5) as client:
                resp = await client.get("https://fapi.binance.com/fapi/v1/ticker/price")
                if resp.status_code == 200:
                    for item in resp.json():
                        if item["symbol"] in symbols_needed:
                            prices[item["symbol"]] = float(item["price"])
        except Exception:
            # No prices means PnL is reported as 0 below.
            pass

    # One lookup for the whole response instead of one per position.
    risk_usd = await _get_risk_usd() if rows else 0.0
    now_ms = time.time() * 1000

    result = []
    for r in rows:
        d = dict(r)
        current_price = prices.get(d["symbol"], 0)
        d["current_price"] = current_price
        entry = d["entry_price"] or 0
        rd = d["risk_distance"] or 1
        if rd > 0 and entry > 0 and current_price > 0:
            move = current_price - entry if d["direction"] == "LONG" else entry - current_price
            d["unrealized_pnl_r"] = round(move / rd, 4)
            d["unrealized_pnl_usdt"] = round(d["unrealized_pnl_r"] * risk_usd, 2)
        else:
            d["unrealized_pnl_r"] = 0
            d["unrealized_pnl_usdt"] = 0
        # Holding time in minutes; entry_ts is compared against epoch ms.
        if d["entry_ts"]:
            d["hold_time_min"] = round((now_ms - d["entry_ts"]) / 60000, 1)
        result.append(d)
    return {"data": result}
|
||
|
||
@app.get("/api/live/trades")
async def live_trades(
    symbol: str = "all",
    result: str = "all",
    strategy: str = "v52_8signals",
    limit: int = 100,
    user: dict = Depends(get_current_user),
):
    """Return closed live trades with a per-trade PnL breakdown.

    Args:
        symbol: ``"all"`` or a coin name; ``"btc"`` and ``"BTCUSDT"`` both
            select ``BTCUSDT``.
        result: ``"win"`` (pnl_r > 0), ``"loss"`` (pnl_r <= 0) or anything
            else for no filter.
        strategy: strategy name to filter on.
        limit: maximum number of rows; negative values are treated as 0.

    Each returned row is extended with ``gross_pnl_r``, ``fee_r``,
    ``funding_r``, ``slippage_r`` and ``net_pnl_r``.
    """
    conditions = ["status NOT IN ('active','tp1_hit')", "strategy = $1"]
    params = [strategy]

    if symbol != "all":
        sym = symbol.upper()
        if not sym.endswith("USDT"):
            sym += "USDT"
        params.append(sym)
        conditions.append(f"symbol = ${len(params)}")
    if result == "win":
        conditions.append("pnl_r > 0")
    elif result == "loss":
        conditions.append("pnl_r <= 0")

    where = " AND ".join(conditions)
    params.append(max(limit, 0))  # a negative LIMIT is a SQL error
    rows = await async_fetch(
        "SELECT id, symbol, direction, score, tier, strategy, entry_price, exit_price, "
        "entry_ts, exit_ts, pnl_r, status, tp1_hit, score_factors, "
        "binance_order_id, fill_price, slippage_bps, fee_usdt, funding_fee_usdt, "
        "protection_gap_ms, signal_to_order_ms, order_to_fill_ms, risk_distance, "
        "tp1_price, tp2_price, sl_price "
        f"FROM live_trades WHERE {where} ORDER BY exit_ts DESC LIMIT ${len(params)}",
        *params
    )

    # One lookup for the whole response instead of one per trade.
    risk_usd = await _get_risk_usd() if rows else 0.0

    result_data = []
    for r in rows:
        d = dict(r)
        entry = r["entry_price"] or 0
        exit_p = r["exit_price"] or 0
        rd = r["risk_distance"] or 1
        tp1_price = r["tp1_price"] or 0

        # Gross R, before any costs.
        if r["direction"] == "LONG":
            raw_r = (exit_p - entry) / rd if rd > 0 else 0
        else:
            raw_r = (entry - exit_p) / rd if rd > 0 else 0
        if r["tp1_hit"] and tp1_price:
            # Weighted half at TP1 and half at the final exit.
            tp1_r = abs(tp1_price - entry) / rd if rd > 0 else 0
            gross_r = 0.5 * tp1_r + 0.5 * raw_r
        else:
            gross_r = raw_r

        fee_usdt = r["fee_usdt"] or 0
        funding_usdt = r["funding_fee_usdt"] or 0
        fee_r = fee_usdt / risk_usd if risk_usd > 0 else 0
        # Only negative funding is reported, as a positive R amount.
        funding_r = abs(funding_usdt) / risk_usd if funding_usdt < 0 and risk_usd > 0 else 0
        # R lost to slippage.
        slippage_bps = r["slippage_bps"] or 0
        slippage_usdt = abs(slippage_bps) / 10000 * entry * (risk_usd / rd) if rd > 0 else 0
        slippage_r = slippage_usdt / risk_usd if risk_usd > 0 else 0

        d["gross_pnl_r"] = round(gross_r, 4)
        d["fee_r"] = round(fee_r, 4)
        d["funding_r"] = round(funding_r, 4)
        d["slippage_r"] = round(slippage_r, 4)
        d["net_pnl_r"] = r["pnl_r"]  # stored pnl_r is already net
        result_data.append(d)

    return {"count": len(result_data), "data": result_data}
|
||
|
||
@app.get("/api/live/equity-curve")
async def live_equity_curve(
    strategy: str = "v52_8signals",
    user: dict = Depends(get_current_user),
):
    """Return the cumulative PnL (in R) of closed live trades, oldest first.

    Each point is ``{"ts": exit_ts, "pnl": running_total}`` with the running
    total rounded to 2 decimals; a NULL ``pnl_r`` adds 0.
    """
    rows = await async_fetch(
        "SELECT exit_ts, pnl_r FROM live_trades "
        "WHERE status NOT IN ('active','tp1_hit') AND strategy = $1 ORDER BY exit_ts ASC",
        strategy,
    )
    cumulative = 0.0
    curve = []
    for r in rows:
        cumulative += r["pnl_r"] or 0
        curve.append({"ts": r["exit_ts"], "pnl": round(cumulative, 2)})
    return {"data": curve}
|
||
|
||
@app.get("/api/live/stats")
async def live_stats(
    strategy: str = "v52_8signals",
    user: dict = Depends(get_current_user),
):
    """Return detailed statistics over the closed live trades of a strategy.

    Includes win rate, average win/loss, total PnL, max drawdown (all in R),
    slippage average/p50/p95 in bps and a per-symbol breakdown. Returns
    ``{"error": "no data"}`` when there are no closed trades.

    Trades with a NULL or zero ``pnl_r`` count towards ``total`` but are
    neither wins nor losses.
    """
    rows = await async_fetch(
        "SELECT symbol, direction, pnl_r, tier, entry_ts, exit_ts, slippage_bps "
        "FROM live_trades WHERE status NOT IN ('active','tp1_hit') AND strategy = $1",
        strategy,
    )
    if not rows:
        return {"error": "no data"}

    total = len(rows)
    pnls = [r["pnl_r"] or 0 for r in rows]
    wins = [p for p in pnls if p > 0]
    losses = [p for p in pnls if p < 0]
    win_rate = len(wins) / total * 100 if total > 0 else 0
    avg_win = sum(wins) / len(wins) if wins else 0
    avg_loss = sum(abs(p) for p in losses) / len(losses) if losses else 0
    win_loss_ratio = avg_win / avg_loss if avg_loss > 0 else 0
    total_pnl = sum(pnls)

    # Slippage: percentiles are taken by index into the sorted values.
    slippages = [r["slippage_bps"] for r in rows if r["slippage_bps"] is not None]
    avg_slippage = sum(slippages) / len(slippages) if slippages else 0
    slippages_sorted = sorted(slippages) if slippages else [0]
    p50_slip = slippages_sorted[len(slippages_sorted) // 2]
    p95_idx = min(int(len(slippages_sorted) * 0.95), len(slippages_sorted) - 1)
    p95_slip = slippages_sorted[p95_idx]

    # Max drawdown of the cumulative R curve, in exit order.
    cum = 0
    peak = 0
    mdd = 0
    for r in sorted(rows, key=lambda x: x["exit_ts"] or 0):
        cum += r["pnl_r"] or 0
        peak = max(peak, cum)
        mdd = max(mdd, peak - cum)

    # Per-symbol breakdown.
    by_symbol = {}
    for r in rows:
        bucket = by_symbol.setdefault(r["symbol"], {"wins": 0, "total": 0, "pnl": 0})
        bucket["total"] += 1
        bucket["pnl"] += r["pnl_r"] or 0
        if r["pnl_r"] and r["pnl_r"] > 0:
            bucket["wins"] += 1
    for bucket in by_symbol.values():
        bucket["win_rate"] = round(bucket["wins"] / bucket["total"] * 100, 1) if bucket["total"] > 0 else 0
        bucket["total_pnl"] = round(bucket["pnl"], 2)

    return {
        "total": total,
        "win_rate": round(win_rate, 1),
        "avg_win": round(avg_win, 3),
        "avg_loss": round(avg_loss, 3),
        "win_loss_ratio": round(win_loss_ratio, 2),
        "total_pnl": round(total_pnl, 2),
        "mdd": round(mdd, 2),
        "avg_slippage_bps": round(avg_slippage, 2),
        "p50_slippage_bps": round(p50_slip, 2),
        "p95_slippage_bps": round(p95_slip, 2),
        "by_symbol": by_symbol,
    }
|
||
|
||
@app.get("/api/live/risk-status")
async def live_risk_status(user: dict = Depends(get_current_user)):
    """Return the risk-guard state read from ``/tmp/risk_guard_state.json``.

    If the file is missing or cannot be read/parsed, returns
    ``{"status": "unknown", "error": ...}`` instead of failing the request.
    """
    try:
        with open("/tmp/risk_guard_state.json") as f:
            return json.load(f)
    except FileNotFoundError:
        return {"status": "unknown", "error": "risk_guard_state.json not found"}
    except (OSError, ValueError) as e:
        # The file exists but is unreadable or not valid JSON.
        return {"status": "unknown", "error": f"risk_guard_state.json unreadable: {e}"}
|
||
|
||
async def _get_risk_usd() -> float:
    """Return the USDT amount of 1R from ``live_config``, cached for 60 s.

    Reads the ``risk_per_trade_usd`` key. Falls back to 2.0 when the key is
    missing or the lookup fails; a failed lookup is not cached, so the next
    call retries.
    """
    cache_key = "live_risk_per_trade_usd"
    cached = get_cache(cache_key, 60)
    if cached is not None:
        return cached
    try:
        row = await async_fetchrow("SELECT value FROM live_config WHERE key = $1", "risk_per_trade_usd")
        risk_usd = float(row["value"]) if row else 2.0
    except Exception:
        return 2.0
    set_cache(cache_key, risk_usd)
    return risk_usd
|
||
|
||
def _require_admin(user: dict):
    """Raise HTTP 403 unless ``user["role"]`` is ``"admin"``."""
    if user.get("role") != "admin":
        raise HTTPException(status_code=403, detail="仅管理员可执行此操作")
|
||
|
||
@app.post("/api/live/emergency-close")
async def live_emergency_close(user: dict = Depends(get_current_user)):
    """Request an emergency close of all positions (admin only).

    Writes a ``close_all`` command to ``/tmp/risk_guard_emergency.json``; the
    command is meant to be picked up by the risk-guard process. Returns
    ``{"ok": False, "error": ...}`` if the file cannot be written.
    """
    _require_admin(user)
    command = {"action": "close_all", "time": time.time(), "user": user.get("email", "unknown")}
    try:
        with open("/tmp/risk_guard_emergency.json", "w") as f:
            json.dump(command, f)
        return {"ok": True, "message": "紧急平仓指令已发送"}
    except Exception as e:
        return {"ok": False, "error": str(e)}
|
||
|
||
@app.post("/api/live/block-new")
async def live_block_new(user: dict = Depends(get_current_user)):
    """Request that no new positions be opened (admin only).

    Writes a ``block_new`` command to ``/tmp/risk_guard_emergency.json``.
    Returns ``{"ok": False, "error": ...}`` if the file cannot be written.
    """
    _require_admin(user)
    command = {"action": "block_new", "time": time.time(), "user": user.get("email", "unknown")}
    try:
        with open("/tmp/risk_guard_emergency.json", "w") as f:
            json.dump(command, f)
        return {"ok": True, "message": "已禁止新开仓"}
    except Exception as e:
        return {"ok": False, "error": str(e)}
|
||
|
||
@app.post("/api/live/resume")
async def live_resume(user: dict = Depends(get_current_user)):
    """Request that trading be resumed (admin only).

    Writes a ``resume`` command to ``/tmp/risk_guard_emergency.json``.
    Returns ``{"ok": False, "error": ...}`` if the file cannot be written.
    """
    _require_admin(user)
    command = {"action": "resume", "time": time.time(), "user": user.get("email", "unknown")}
    try:
        with open("/tmp/risk_guard_emergency.json", "w") as f:
            json.dump(command, f)
        return {"ok": True, "message": "已恢复交易"}
    except Exception as e:
        return {"ok": False, "error": str(e)}
|
||
|
||
# ============================================================
# Live trading API, continued (L0-L11)
# ============================================================
|
||
@app.get("/api/live/account")
async def live_account(user: dict = Depends(get_current_user)):
    """L2: account overview — equity, margin, leverage and today's realized PnL.

    Account figures come from the Binance futures ``/fapi/v2/account``
    endpoint (testnet unless ``TRADE_ENV`` is set to something other than
    ``"testnet"``); today's realized R and fees come from ``live_trades``
    rows closed since 00:00 UTC. Failures are returned as ``{"error": ...}``.
    """
    import hashlib
    import hmac
    from datetime import datetime, timezone
    from urllib.parse import urlencode

    api_key = os.environ.get("BINANCE_API_KEY", "")
    secret_key = os.environ.get("BINANCE_SECRET_KEY", "")
    trade_env = os.environ.get("TRADE_ENV", "testnet")
    base = "https://testnet.binancefuture.com" if trade_env == "testnet" else "https://fapi.binance.com"

    if not api_key or not secret_key:
        return {"error": "API keys not configured", "equity": 0, "available_margin": 0, "used_margin": 0, "effective_leverage": 0}

    def sign(params):
        """Add ``timestamp`` and the HMAC-SHA256 ``signature`` to ``params``."""
        params["timestamp"] = int(time.time() * 1000)
        qs = urlencode(params)
        params["signature"] = hmac.new(secret_key.encode(), qs.encode(), hashlib.sha256).hexdigest()
        return params

    try:
        async with httpx.AsyncClient(timeout=5) as client:
            resp = await client.get(f"{base}/fapi/v2/account", params=sign({}), headers={"X-MBX-APIKEY": api_key})
            if resp.status_code != 200:
                return {"error": f"API {resp.status_code}"}
            acc = resp.json()

        equity = float(acc.get("totalWalletBalance", 0))
        available = float(acc.get("availableBalance", 0))
        used_margin = float(acc.get("totalInitialMargin", 0))
        unrealized = float(acc.get("totalUnrealizedProfit", 0))
        effective_leverage = round(used_margin / equity, 2) if equity > 0 else 0

        # Today's realized PnL and fees (best-effort; zeros if the query fails).
        today_realized_r = 0
        today_fee = 0
        today_volume = 0  # not computed by this endpoint; always reported as 0
        try:
            day_start = datetime.now(timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0)
            rows = await async_fetch(
                "SELECT pnl_r, fee_usdt FROM live_trades WHERE exit_ts >= $1 AND status NOT IN ('active','tp1_hit')",
                int(day_start.timestamp() * 1000)
            )
            today_realized_r = sum(r["pnl_r"] or 0 for r in rows)
            today_fee = sum(r["fee_usdt"] or 0 for r in rows)
        except Exception:
            pass

        return {
            "equity": round(equity, 2),
            "available_margin": round(available, 2),
            "used_margin": round(used_margin, 2),
            "unrealized_pnl": round(unrealized, 2),
            "effective_leverage": effective_leverage,
            "today_realized_r": round(today_realized_r, 2),
            "today_realized_usdt": round(today_realized_r * (await _get_risk_usd()), 2),
            "today_fee": round(today_fee, 2),
            "today_volume": round(today_volume, 2),
        }
    except Exception as e:
        return {"error": str(e)}
|
||
|
||
@app.get("/api/live/health")
async def live_system_health(user: dict = Depends(get_current_user)):
    """L11: system health — PM2 process status, data freshness, risk-guard state.

    Every section is best-effort: a failing probe leaves its section empty
    rather than failing the request. ``api_status`` is always ``"unknown"``
    because this endpoint does not probe the exchange API.
    """
    import subprocess

    watched = (
        "live-executor", "position-sync", "risk-guard", "signal-engine",
        "market-collector", "paper-monitor", "liq-collector",
    )
    health = {
        "ts": int(time.time() * 1000),
        "processes": {},
        "data_freshness": {},
        "api_status": "unknown",
    }

    def read_pm2_list():
        """Blocking: run ``pm2 jlist`` and return the parsed process list."""
        pm2_bin = _find_pm2_bin()
        cmd = ["npx", "pm2", "jlist"] if pm2_bin.startswith("npx") else [pm2_bin, "jlist"]
        completed = subprocess.run(cmd, capture_output=True, text=True, timeout=5)
        return json.loads(completed.stdout) if completed.stdout else []

    # PM2 process status. The subprocess call runs in an executor so a slow
    # pm2 cannot stall the event loop.
    try:
        procs = await asyncio.get_running_loop().run_in_executor(None, read_pm2_list)
        for p in procs:
            name = p.get("name", "")
            if name in watched:
                env = p.get("pm2_env", {})
                monit = p.get("monit", {})
                health["processes"][name] = {
                    "status": env.get("status", "unknown"),
                    "uptime_ms": env.get("pm_uptime", 0),
                    "restarts": env.get("restart_time", 0),
                    "memory_mb": round(monit.get("memory", 0) / 1024 / 1024, 1),
                    "cpu": monit.get("cpu", 0),
                }
    except Exception:
        pass

    # Data freshness: age of the newest signal_indicators row.
    try:
        now_ms = int(time.time() * 1000)
        latest_market = await async_fetchrow("SELECT MAX(ts) as ts FROM signal_indicators")
        if latest_market and latest_market["ts"]:
            age_sec = (now_ms - latest_market["ts"]) / 1000
            health["data_freshness"]["market_data"] = {
                "last_ts": latest_market["ts"],
                "age_sec": round(age_sec, 1),
                "status": "red" if age_sec > 10 else "yellow" if age_sec > 5 else "green",
            }
    except Exception:
        pass

    # Risk-guard state, read independently so a database failure above does
    # not hide it.
    risk_state = {}
    try:
        with open("/tmp/risk_guard_state.json") as f:
            risk_state = json.load(f)
    except (OSError, ValueError):
        pass
    health["risk_guard"] = risk_state

    return health
|
||
|
||
@app.get("/api/live/reconciliation")
async def live_reconciliation(user: dict = Depends(get_current_user)):
    """L5: reconciliation — local open positions versus Binance positions.

    Reports positions present on only one side, direction mismatches, and
    positions whose mark price is within 12% (high) or 8% (critical) of the
    liquidation price. ``status`` is ``"ok"``, ``"mismatch"`` when any diff
    was found, or ``"error"`` when the comparison could not be carried out
    (in which case ``error`` holds the reason and no diffs are reported).
    """
    import hashlib
    import hmac
    from urllib.parse import urlencode

    api_key = os.environ.get("BINANCE_API_KEY", "")
    secret_key = os.environ.get("BINANCE_SECRET_KEY", "")
    trade_env = os.environ.get("TRADE_ENV", "testnet")
    base = "https://testnet.binancefuture.com" if trade_env == "testnet" else "https://fapi.binance.com"

    if not api_key or not secret_key:
        return {"error": "API keys not configured"}

    def sign(params):
        """Add ``timestamp`` and the HMAC-SHA256 ``signature`` to ``params``."""
        params["timestamp"] = int(time.time() * 1000)
        qs = urlencode(params)
        params["signature"] = hmac.new(secret_key.encode(), qs.encode(), hashlib.sha256).hexdigest()
        return params

    result = {"local_positions": [], "exchange_positions": [], "local_orders": 0, "exchange_orders": 0, "diffs": [], "status": "ok"}
    auth_headers = {"X-MBX-APIKEY": api_key}

    try:
        async with httpx.AsyncClient(timeout=5) as client:
            # Exchange positions.
            resp = await client.get(f"{base}/fapi/v2/positionRisk", params=sign({}), headers=auth_headers)
            if resp.status_code != 200:
                # Without exchange data every local position would look like
                # a critical "local_only" mismatch, so stop here instead.
                result["error"] = f"positionRisk API {resp.status_code}"
                result["status"] = "error"
                return result
            exchange_positions = []
            for p in resp.json():
                amt = float(p.get("positionAmt", 0))
                if amt != 0:
                    exchange_positions.append({
                        "symbol": p["symbol"],
                        "direction": "LONG" if amt > 0 else "SHORT",
                        "amount": abs(amt),
                        "entry_price": float(p.get("entryPrice", 0)),
                        "mark_price": float(p.get("markPrice", 0)),
                        "liquidation_price": float(p.get("liquidationPrice", 0)),
                        "unrealized_pnl": float(p.get("unRealizedProfit", 0)),
                    })
            result["exchange_positions"] = exchange_positions

            # Exchange open orders (count only).
            resp2 = await client.get(f"{base}/fapi/v1/openOrders", params=sign({}), headers=auth_headers)
            exchange_orders = resp2.json() if resp2.status_code == 200 else []
            result["exchange_orders"] = len(exchange_orders)

        # Local open positions.
        local = await async_fetch(
            "SELECT id, symbol, direction, entry_price, sl_price, tp1_price, tp2_price, status, tp1_hit "
            "FROM live_trades WHERE status IN ('active','tp1_hit')"
        )
        result["local_positions"] = [dict(r) for r in local]
        result["local_orders"] = len(local) * 3  # expected 3 orders per position (SL+TP1+TP2)

        # Compare, keyed by symbol.
        local_syms = {r["symbol"]: r for r in local}
        exchange_syms = {p["symbol"]: p for p in exchange_positions}
        diffs = result["diffs"]

        for sym, lp in local_syms.items():
            ep = exchange_syms.get(sym)
            if ep is None:
                diffs.append({"symbol": sym, "type": "local_only", "severity": "critical", "detail": f"本地有{lp['direction']}仓位但币安无持仓"})
                continue
            if lp["direction"] != ep["direction"]:
                diffs.append({"symbol": sym, "type": "direction_mismatch", "severity": "critical", "detail": f"本地={lp['direction']} 币安={ep['direction']}"})
            # Distance to liquidation, as a percentage of the mark price.
            if ep["liquidation_price"] > 0 and ep["mark_price"] > 0:
                if ep["direction"] == "LONG":
                    dist = (ep["mark_price"] - ep["liquidation_price"]) / ep["mark_price"] * 100
                else:
                    dist = (ep["liquidation_price"] - ep["mark_price"]) / ep["mark_price"] * 100
                if dist < 8:
                    diffs.append({"symbol": sym, "type": "liquidation_critical", "severity": "critical", "detail": f"距清算仅{dist:.1f}%"})
                elif dist < 12:
                    diffs.append({"symbol": sym, "type": "liquidation_warning", "severity": "high", "detail": f"距清算{dist:.1f}%"})

        for sym, ep in exchange_syms.items():
            if sym not in local_syms:
                diffs.append({"symbol": sym, "type": "exchange_only", "severity": "high", "detail": f"币安有{ep['direction']}仓位但本地无记录"})

        if diffs:
            result["status"] = "mismatch"

    except Exception as e:
        result["error"] = str(e)
        result["status"] = "error"

    return result
|
||
|
||
@app.get("/api/live/execution-quality")
async def live_execution_quality(user: dict = Depends(get_current_user)):
    """L4: execution-quality panel over the 200 most recent live trades.

    For slippage (bps) and the three latency columns (ms) it reports
    avg/p50/p95/min/max, both overall and per symbol. Only rows with a
    non-NULL ``signal_to_order_ms`` are considered. Returns
    ``{"error": "no data"}`` when there are none.
    """
    rows = await async_fetch(
        "SELECT symbol, slippage_bps, signal_to_order_ms, order_to_fill_ms, protection_gap_ms "
        "FROM live_trades WHERE signal_to_order_ms IS NOT NULL ORDER BY created_at DESC LIMIT 200"
    )
    if not rows:
        return {"error": "no data"}

    # (column in the row, key in the per-symbol bucket, key in the response)
    metrics = (
        ("slippage_bps", "slippages", "slippage_bps"),
        ("signal_to_order_ms", "s2o", "signal_to_order_ms"),
        ("order_to_fill_ms", "o2f", "order_to_fill_ms"),
        ("protection_gap_ms", "protection", "protection_gap_ms"),
    )

    by_coin = {}
    overall = {bucket_key: [] for _, bucket_key, _ in metrics}
    for r in rows:
        bucket = by_coin.setdefault(
            r["symbol"], {"slippages": [], "s2o": [], "o2f": [], "protection": [], "count": 0}
        )
        bucket["count"] += 1
        for column, bucket_key, _ in metrics:
            value = r[column]
            if value is not None:
                bucket[bucket_key].append(value)
                overall[bucket_key].append(value)

    def percentile(arr, p):
        """Return the value at index ``int(len * p / 100)`` of sorted ``arr``."""
        if not arr:
            return 0
        ordered = sorted(arr)
        return ordered[min(int(len(ordered) * p / 100), len(ordered) - 1)]

    def stats(arr):
        """Summarize ``arr`` as avg/p50/p95/min/max (all 0 when empty)."""
        if not arr:
            return {"avg": 0, "p50": 0, "p95": 0, "min": 0, "max": 0}
        return {
            "avg": round(sum(arr) / len(arr), 2),
            "p50": round(percentile(arr, 50), 2),
            "p95": round(percentile(arr, 95), 2),
            "min": round(min(arr), 2),
            "max": round(max(arr), 2),
        }

    result = {
        "total_trades": len(rows),
        "overall": {out_key: stats(overall[bucket_key]) for _, bucket_key, out_key in metrics},
        "by_symbol": {},
    }
    for sym, bucket in by_coin.items():
        entry = {"count": bucket["count"]}
        for _, bucket_key, out_key in metrics:
            entry[out_key] = stats(bucket[bucket_key])
        result["by_symbol"][sym] = entry

    return result
|
||
|
||
@app.get("/api/live/paper-comparison")
async def live_paper_comparison(
    limit: int = 50,
    user: dict = Depends(get_current_user),
):
    """L8: compare closed live trades with their paper-trading counterparts.

    Live and paper trades are matched on ``signal_id`` and ``strategy``. Rows
    with both entry prices get ``entry_diff`` and ``entry_diff_bps``; rows
    with both PnL values get ``pnl_diff_r``. ``avg_pnl_diff_r`` averages
    ``pnl_diff_r`` over the rows that have it.
    """
    rows = await async_fetch("""
        SELECT lt.symbol, lt.direction, lt.entry_price as live_entry, lt.exit_price as live_exit,
               lt.pnl_r as live_pnl, lt.slippage_bps as live_slip, lt.entry_ts as live_entry_ts,
               lt.signal_id,
               pt.entry_price as paper_entry, pt.exit_price as paper_exit, pt.pnl_r as paper_pnl
        FROM live_trades lt
        LEFT JOIN paper_trades pt ON lt.signal_id = pt.signal_id AND lt.strategy = pt.strategy
        WHERE lt.status NOT IN ('active','tp1_hit')
        ORDER BY lt.exit_ts DESC
        LIMIT $1
    """, limit)

    comparisons = []
    total_pnl_diff = 0
    matched = 0

    for r in rows:
        d = dict(r)
        if r["paper_entry"] and r["live_entry"]:
            d["entry_diff"] = round(r["live_entry"] - r["paper_entry"], 6)
            d["entry_diff_bps"] = round(d["entry_diff"] / r["paper_entry"] * 10000, 2)
        if r["paper_pnl"] is not None and r["live_pnl"] is not None:
            d["pnl_diff_r"] = round(r["live_pnl"] - r["paper_pnl"], 4)
            total_pnl_diff += d["pnl_diff_r"]
            matched += 1
        comparisons.append(d)

    return {
        "count": len(comparisons),
        "avg_pnl_diff_r": round(total_pnl_diff / matched, 4) if matched > 0 else 0,
        "data": comparisons,
    }
|
||
|
||
# ============ Live Events (L7通知流) ============
|
||
|
||
async def log_live_event(level: str, category: str, message: str, symbol: str = None, detail: dict = None):
    """Write one row to the live_events table (called by other modules).

    Best-effort: any failure is logged and swallowed so that event logging can
    never break the caller.

    Args:
        level: Event level stored in the ``level`` column.
        category: Event category stored in the ``category`` column.
        message: Human-readable event text.
        symbol: Optional trading symbol the event relates to.
        detail: Optional dict, stored JSON-encoded; an empty or missing dict
            is stored as NULL.
    """
    import json as _json
    import logging

    try:
        await async_execute(
            "INSERT INTO live_events (level, category, symbol, message, detail) VALUES ($1, $2, $3, $4, $5)",
            level, category, symbol, message, _json.dumps(detail) if detail else None
        )
    except Exception:
        # Still deliberately non-fatal, but no longer silent: a lost event
        # (DB outage, non-serialisable detail) leaves a trace in the app log.
        logging.getLogger(__name__).exception(
            "log_live_event failed: [%s/%s] %s", level, category, message
        )
|
||
|
||
|
||
@app.get("/api/live/events")
async def live_events(
    limit: int = 50,
    level: str = "all",
    category: str = "all",
    user: dict = Depends(get_current_user),
):
    """Live event stream: newest events first, optionally filtered.

    ``level`` and ``category`` each add an equality filter unless left at
    "all". All values are passed as positional query parameters, never
    interpolated into the SQL text.
    """
    filters = ["1=1"]
    args = []

    for column, wanted in (("level", level), ("category", category)):
        if wanted == "all":
            continue
        args.append(wanted)
        # The placeholder number is the 1-based position of the value just added.
        filters.append(f"{column} = ${len(args)}")

    args.append(limit)
    sql = (
        "SELECT id, ts, level, category, symbol, message, detail "
        f"FROM live_events WHERE {' AND '.join(filters)} ORDER BY ts DESC LIMIT ${len(args)}"
    )
    rows = await async_fetch(sql, *args)
    return {"count": len(rows), "data": rows}
|
||
|
||
|
||
# ============ Live Config (实盘配置) ============
|
||
|
||
@app.get("/api/live/config")
async def live_config_get(user: dict = Depends(get_current_user)):
    """Return every live_config row, keyed by its config key.

    Each entry carries ``value``, ``label`` and ``updated_at`` (stringified).
    """
    records = await async_fetch("SELECT key, value, label, updated_at FROM live_config ORDER BY key")
    return {
        rec["key"]: {
            "value": rec["value"],
            "label": rec["label"],
            "updated_at": str(rec["updated_at"]),
        }
        for rec in records
    }
|
||
|
||
|
||
@app.put("/api/live/config")
async def live_config_update(request: Request, user: dict = Depends(get_current_user)):
    """Update live-trading config values.

    The request body must be a JSON object mapping config keys to new values.
    Each value is stored as its ``str()`` form and ``updated_at`` is refreshed.

    Returns:
        ``{"updated": [...]}`` listing every key that was submitted. A key is
        listed even if no live_config row matched it, because the statement
        is a plain UPDATE and its row count is not inspected here.

    Raises:
        HTTPException: 400 when the body is not valid JSON or not a JSON
            object. ``_require_admin`` may raise as well (presumably for
            non-admin users; defined elsewhere in this file).
    """
    _require_admin(user)

    try:
        body = await request.json()
    except ValueError:
        # json.JSONDecodeError subclasses ValueError; report a client error
        # instead of letting it surface as a 500.
        raise HTTPException(status_code=400, detail="request body must be valid JSON") from None
    if not isinstance(body, dict):
        # A JSON list or scalar has no .items(); reject it explicitly.
        raise HTTPException(status_code=400, detail="request body must be a JSON object")

    updated = []
    for key, value in body.items():
        await async_execute(
            "UPDATE live_config SET value = $1, updated_at = NOW() WHERE key = $2",
            str(value), key
        )
        updated.append(key)
    return {"updated": updated}
|