arbitrage-engine/backend/main.py

455 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
import datetime as _dt
import logging
import os
import time
from datetime import datetime, timedelta

import httpx
from fastapi import FastAPI, HTTPException, Depends
from fastapi.middleware.cors import CORSMiddleware

from auth import router as auth_router, get_current_user, ensure_tables as ensure_auth_tables
from db import (
    init_schema, ensure_partitions, get_async_pool, async_fetch, async_fetchrow, async_execute,
    close_async_pool,
)
# FastAPI application object; every route in this file is registered on it.
app = FastAPI(title="Arbitrage Engine API")
# CORS is fully open: any origin, method and header is accepted.
# NOTE(review): confirm a wildcard origin is intended given that most routes
# below require an authenticated user.
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_methods=["*"],
allow_headers=["*"],
)
# Mount the routes exported by the local `auth` module.
app.include_router(auth_router)
# Base URL for every Binance call made in this file (premiumIndex, fundingRate).
BINANCE_FAPI = "https://fapi.binance.com/fapi/v1"
# Symbols queried on Binance; handlers strip the "USDT" suffix to build response keys.
SYMBOLS = ["BTCUSDT", "ETHUSDT"]
# Headers sent with every outbound httpx request (browser-like User-Agent).
HEADERS = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"}
# Simple in-memory cache. The TTL is chosen by the reader on each lookup
# (history/stats use 60 s, rates use 3 s).
#
# Upper bound on the number of cached keys. Some keys are built from request
# parameters (e.g. "trades_latest_{symbol}_{limit}"), so without a cap the
# dict would keep growing for as long as the process lives.
CACHE_MAX_ENTRIES = 1024
_cache: dict = {}


def get_cache(key: str, ttl: int):
    """Return the value cached under *key* if it is younger than *ttl* seconds.

    Args:
        key: Cache key previously passed to ``set_cache``.
        ttl: Maximum accepted age of the entry, in seconds.

    Returns:
        The cached data, or ``None`` when the key is absent or the entry is
        ``ttl`` seconds old or older.
    """
    entry = _cache.get(key)
    # Ages are measured on the monotonic clock so that wall-clock adjustments
    # (NTP steps, manual changes) can neither expire nor immortalise entries.
    if entry is not None and time.monotonic() - entry["ts"] < ttl:
        return entry["data"]
    return None


def set_cache(key: str, data):
    """Store *data* under *key*, stamping it with the current monotonic time.

    When the cache already holds ``CACHE_MAX_ENTRIES`` keys, the least
    recently written entries are evicted first.
    """
    # Re-inserting moves the key to the end of the dict's insertion order, so
    # the first key is always the least recently written one.
    _cache.pop(key, None)
    while len(_cache) >= CACHE_MAX_ENTRIES:
        _cache.pop(next(iter(_cache)))
    _cache[key] = {"ts": time.monotonic(), "data": data}
async def save_snapshot(rates: dict):
    """Insert one row into ``rate_snapshots`` from a per-symbol rates mapping.

    Args:
        rates: Mapping with optional ``"BTC"`` and ``"ETH"`` entries, each a
            dict that may carry ``lastFundingRate``, ``markPrice`` and
            ``indexPrice``. Missing symbols or fields are stored as ``0.0``.

    The write is best effort: any failure is logged and swallowed so that a
    database problem never breaks the caller (API response or polling loop).
    """
    try:
        btc = rates.get("BTC", {})
        eth = rates.get("ETH", {})
        await async_execute(
            "INSERT INTO rate_snapshots (ts, btc_rate, eth_rate, btc_price, eth_price, btc_index_price, eth_index_price) "
            "VALUES ($1,$2,$3,$4,$5,$6,$7)",
            int(time.time()),
            float(btc.get("lastFundingRate", 0)),
            float(eth.get("lastFundingRate", 0)),
            float(btc.get("markPrice", 0)),
            float(eth.get("markPrice", 0)),
            float(btc.get("indexPrice", 0)),
            float(eth.get("indexPrice", 0)),
        )
    except Exception:
        # A failed insert must not affect the API response, but it should be
        # visible to operators instead of vanishing silently.
        logging.getLogger(__name__).exception("Failed to persist rate snapshot")
async def background_snapshot_loop():
    """Fetch funding rate and prices in the background and persist them.

    Runs forever: each pass requests ``premiumIndex`` for every symbol in
    ``SYMBOLS``, hands the successful ones to ``save_snapshot`` and then
    sleeps for 2 seconds. Symbols whose request failed or returned a non-200
    status are skipped for that pass. Errors are logged and never end the loop.
    """
    log = logging.getLogger(__name__)
    # One client for the lifetime of the loop so connections are reused
    # instead of being re-established on every pass.
    async with httpx.AsyncClient(timeout=5, headers=HEADERS) as client:
        while True:
            try:
                responses = await asyncio.gather(
                    *(client.get(f"{BINANCE_FAPI}/premiumIndex", params={"symbol": s}) for s in SYMBOLS),
                    return_exceptions=True,
                )
                result = {}
                for sym, resp in zip(SYMBOLS, responses):
                    # gather(return_exceptions=True) can hand back any
                    # BaseException (e.g. CancelledError), not just Exception.
                    if isinstance(resp, BaseException) or resp.status_code != 200:
                        continue
                    data = resp.json()
                    key = sym.replace("USDT", "")
                    result[key] = {
                        "lastFundingRate": float(data["lastFundingRate"]),
                        "markPrice": float(data["markPrice"]),
                        "indexPrice": float(data["indexPrice"]),
                    }
                if result:
                    await save_snapshot(result)
            except Exception as exc:
                # Keep polling, but make the failure visible. Logged without a
                # traceback because this can repeat every 2 seconds.
                log.warning("Snapshot polling pass failed: %r", exc)
            await asyncio.sleep(2)
@app.on_event("startup")
async def startup():
    """Prepare the database and start the background snapshot loop."""
    # Initialise the PostgreSQL schema and the auth tables.
    init_schema()
    ensure_auth_tables()
    # Initialise the asyncpg pool.
    await get_async_pool()
    # The event loop only holds a weak reference to tasks; keep a strong one
    # on app.state so the loop cannot be garbage-collected mid-execution and
    # so shutdown() can cancel it.
    app.state.snapshot_task = asyncio.create_task(background_snapshot_loop())


@app.on_event("shutdown")
async def shutdown():
    """Stop the background snapshot loop, then close the async DB pool."""
    task = getattr(app.state, "snapshot_task", None)
    if task is not None:
        task.cancel()
        # Wait for the cancellation to complete; return_exceptions=True keeps
        # the resulting CancelledError from propagating out of shutdown.
        await asyncio.gather(task, return_exceptions=True)
    await close_async_pool()
@app.get("/api/health")
async def health():
    """Liveness probe: report status "ok" and the current UTC time (ISO 8601, naive)."""
    now_iso = datetime.utcnow().isoformat()
    return dict(status="ok", timestamp=now_iso)
# Strong references to the fire-and-forget snapshot writes started by
# get_rates. The event loop keeps only weak references to tasks, so an
# unreferenced task may be garbage-collected before it finishes.
_rates_snapshot_tasks: set = set()


@app.get("/api/rates")
async def get_rates():
    """Return the current mark price, index price and funding rate per symbol.

    Results are cached for 3 seconds. On a cache miss the ``premiumIndex``
    endpoint is queried for every symbol in ``SYMBOLS``; the fresh result is
    cached and also written to the database in the background.

    Returns:
        Dict keyed by the symbol without its "USDT" suffix.

    Raises:
        HTTPException: 502 when Binance cannot be reached, answers with a
            non-200 status, or returns a payload missing the expected fields.
    """
    cached = get_cache("rates", 3)
    if cached:
        return cached
    try:
        async with httpx.AsyncClient(timeout=10, headers=HEADERS) as client:
            responses = await asyncio.gather(
                *(client.get(f"{BINANCE_FAPI}/premiumIndex", params={"symbol": s}) for s in SYMBOLS)
            )
    except httpx.HTTPError as exc:
        # Timeouts / connection errors are an upstream problem, not a server bug.
        raise HTTPException(status_code=502, detail="Binance request failed") from exc
    result = {}
    for sym, resp in zip(SYMBOLS, responses):
        if resp.status_code != 200:
            raise HTTPException(status_code=502, detail=f"Binance error for {sym}")
        try:
            data = resp.json()
            result[sym.replace("USDT", "")] = {
                "symbol": sym,
                "markPrice": float(data["markPrice"]),
                "indexPrice": float(data["indexPrice"]),
                "lastFundingRate": float(data["lastFundingRate"]),
                "nextFundingTime": data["nextFundingTime"],
                "timestamp": data["time"],
            }
        except (KeyError, TypeError, ValueError) as exc:
            # Unparseable or incomplete upstream payload.
            raise HTTPException(status_code=502, detail=f"Binance error for {sym}") from exc
    set_cache("rates", result)
    # Persist without delaying the response; hold a reference until it is done.
    task = asyncio.create_task(save_snapshot(result))
    _rates_snapshot_tasks.add(task)
    task.add_done_callback(_rates_snapshot_tasks.discard)
    return result
@app.get("/api/snapshots")
async def get_snapshots(hours: int = 24, limit: int = 5000, user: dict = Depends(get_current_user)):
    """Return stored rate/price snapshots from the last *hours* hours.

    Args:
        hours: Size of the look-back window, in hours.
        limit: Maximum number of rows returned (oldest first).
        user: Authenticated user, injected by ``get_current_user``.

    Returns:
        ``{"count", "hours", "data"}`` where ``data`` is the row list.

    Raises:
        HTTPException: 422 when ``hours`` or ``limit`` is negative.
    """
    # A negative LIMIT is rejected by the database and would surface as a 500;
    # report it as a client error instead.
    if hours < 0 or limit < 0:
        raise HTTPException(status_code=422, detail="hours and limit must be non-negative")
    since = int(time.time()) - hours * 3600
    rows = await async_fetch(
        "SELECT ts, btc_rate, eth_rate, btc_price, eth_price FROM rate_snapshots "
        "WHERE ts >= $1 ORDER BY ts ASC LIMIT $2",
        since, limit
    )
    return {"count": len(rows), "hours": hours, "data": rows}
@app.get("/api/kline")
async def get_kline(symbol: str = "BTC", interval: str = "5m", limit: int = 500, user: dict = Depends(get_current_user)):
    """Aggregate stored snapshots into OHLC bars for funding rate and price.

    Args:
        symbol: "BTC" (case-insensitive) selects the BTC columns; any other
            value selects the ETH columns.
        interval: Bar width; unknown values fall back to 5 minutes.
        limit: Maximum number of bars returned (the most recent ones).
        user: Authenticated user, injected by ``get_current_user``.

    Returns:
        ``{"symbol", "interval", "count", "data"}``. Each bar carries
        ``open/high/low/close`` for the rate (scaled by 10,000 and rounded to
        4 decimals) and ``price_open/high/low/close`` for the price.
    """
    interval_secs = {
        "1m": 60, "5m": 300, "30m": 1800,
        "1h": 3600, "4h": 14400, "8h": 28800,
        "1d": 86400, "1w": 604800, "1M": 2592000,
    }
    bar_secs = interval_secs.get(interval, 300)
    # Column names come from this fixed pair only, so interpolating them into
    # the SQL text below is safe.
    is_btc = symbol.upper() == "BTC"
    rate_col = "btc_rate" if is_btc else "eth_rate"
    price_col = "btc_price" if is_btc else "eth_price"
    if limit <= 0:
        # Without this guard, limit == 0 would make the [-limit:] slice below
        # return every bar instead of none.
        return {"symbol": symbol, "interval": interval, "count": 0, "data": []}
    since = int(time.time()) - bar_secs * limit
    rows = await async_fetch(
        f"SELECT ts, {rate_col} as rate, {price_col} as price FROM rate_snapshots "
        f"WHERE ts >= $1 ORDER BY ts ASC",
        since
    )
    bars: dict = {}
    for r in rows:
        ts, rate, price = r["ts"], r["rate"], r["price"]
        if rate is None or price is None:
            # NULL columns cannot take part in min/max/round; skip the sample.
            continue
        bar_ts = (ts // bar_secs) * bar_secs
        b = bars.get(bar_ts)
        if b is None:
            bars[bar_ts] = {
                "time": bar_ts,
                "open": rate, "high": rate, "low": rate, "close": rate,
                "price_open": price, "price_high": price, "price_low": price, "price_close": price,
            }
            continue
        b["high"] = max(b["high"], rate)
        b["low"] = min(b["low"], rate)
        b["close"] = rate
        b["price_high"] = max(b["price_high"], price)
        b["price_low"] = min(b["price_low"], price)
        b["price_close"] = price
    # The window start is not aligned to a bar boundary, so it can span one
    # bar more than requested; keep only the newest `limit` bars.
    data = sorted(bars.values(), key=lambda x: x["time"])[-limit:]
    for b in data:
        for k in ("open", "high", "low", "close"):
            b[k] = round(b[k] * 10000, 4)
    # Same shape whether or not any rows were found ("count" is always present).
    return {"symbol": symbol, "interval": interval, "count": len(data), "data": data}
@app.get("/api/stats/ytd")
async def get_stats_ytd(user: dict = Depends(get_current_user)):
    """Return the year-to-date annualized funding rate per symbol.

    Funding-rate history from 1 January (UTC) of the current year up to now
    is fetched for every symbol in ``SYMBOLS``. The annualized figure is the
    mean rate times 3 settlements per day times 365, expressed in percent.

    Returns:
        Dict keyed by the symbol without "USDT", each value being
        ``{"annualized", "count"}``. A symbol whose data could not be fetched
        is reported as zeros. Only complete results are cached (one hour).
    """
    cached = get_cache("stats_ytd", 3600)
    if cached:
        return cached
    utc = _dt.timezone.utc
    # Build the year start as an aware UTC datetime: calling .timestamp() on a
    # naive one would interpret it in the server's local timezone.
    this_year = _dt.datetime.now(utc).year
    year_start = int(_dt.datetime(this_year, 1, 1, tzinfo=utc).timestamp() * 1000)
    end_time = int(time.time() * 1000)
    # NOTE(review): a single request returns at most 1000 records; at three
    # settlements per day that covers roughly 333 days, so late in the year the
    # newest records may be missing. Confirm whether pagination is needed.
    async with httpx.AsyncClient(timeout=20, headers=HEADERS) as client:
        responses = await asyncio.gather(
            *(
                client.get(f"{BINANCE_FAPI}/fundingRate",
                           params={"symbol": s, "startTime": year_start, "endTime": end_time, "limit": 1000})
                for s in SYMBOLS
            ),
            return_exceptions=True,
        )
    result = {}
    complete = True
    for sym, resp in zip(SYMBOLS, responses):
        key = sym.replace("USDT", "")
        rates = None
        if not isinstance(resp, BaseException) and resp.status_code == 200:
            try:
                rates = [float(item["fundingRate"]) for item in resp.json()]
            except (KeyError, TypeError, ValueError):
                rates = None
        if rates is None:
            # Fetch failed: degrade to zeros for this symbol, and remember not
            # to cache so a transient error is not served for a whole hour.
            complete = False
            result[key] = {"annualized": 0, "count": 0}
            continue
        if not rates:
            result[key] = {"annualized": 0, "count": 0}
            continue
        mean = sum(rates) / len(rates)
        annualized = round(mean * 3 * 365 * 100, 2)
        result[key] = {"annualized": annualized, "count": len(rates)}
    if complete:
        set_cache("stats_ytd", result)
    return result
@app.get("/api/signals/history")
async def get_signals_history(limit: int = 100, user: dict = Depends(get_current_user)):
    """Return the most recent rows of ``signal_logs``, newest first.

    Any failure is reported in-band: the response then carries an empty
    ``items`` list plus an ``error`` string instead of raising.
    """
    query = "SELECT id, symbol, rate, annualized, sent_at, message FROM signal_logs ORDER BY sent_at DESC LIMIT $1"
    try:
        items = await async_fetch(query, limit)
    except Exception as exc:
        return {"items": [], "error": str(exc)}
    return {"items": items}
@app.get("/api/history")
async def get_history(user: dict = Depends(get_current_user)):
    """Return the last 7 days of funding-rate history per symbol.

    Results are cached for 60 seconds.

    Returns:
        Dict keyed by the symbol without "USDT"; each value is a list of
        ``{"fundingTime", "fundingRate", "timestamp"}`` where ``timestamp`` is
        the funding time as a naive UTC ISO 8601 string.

    Raises:
        HTTPException: 502 when Binance cannot be reached or answers with a
            non-200 status.
    """
    cached = get_cache("history", 60)
    if cached:
        return cached
    # time.time() is a true epoch value. datetime.utcnow().timestamp() is not:
    # it treats the naive UTC datetime as local time and is therefore off by
    # the server's UTC offset on any non-UTC host.
    end_time = int(time.time() * 1000)
    start_time = end_time - 7 * 24 * 3600 * 1000
    try:
        async with httpx.AsyncClient(timeout=15, headers=HEADERS) as client:
            responses = await asyncio.gather(
                *(
                    client.get(f"{BINANCE_FAPI}/fundingRate",
                               params={"symbol": s, "startTime": start_time, "endTime": end_time, "limit": 1000})
                    for s in SYMBOLS
                )
            )
    except httpx.HTTPError as exc:
        raise HTTPException(status_code=502, detail="Binance history request failed") from exc
    utc = _dt.timezone.utc
    result = {}
    for sym, resp in zip(SYMBOLS, responses):
        if resp.status_code != 200:
            raise HTTPException(status_code=502, detail=f"Binance history error for {sym}")
        key = sym.replace("USDT", "")
        result[key] = [
            {
                "fundingTime": item["fundingTime"],
                "fundingRate": float(item["fundingRate"]),
                # Same naive-UTC text as the deprecated utcfromtimestamp().
                "timestamp": datetime.fromtimestamp(item["fundingTime"] / 1000, utc)
                .replace(tzinfo=None)
                .isoformat(),
            }
            for item in resp.json()
        ]
    set_cache("history", result)
    return result
@app.get("/api/stats")
async def get_stats(user: dict = Depends(get_current_user)):
    """Return 7-day funding-rate statistics per symbol plus a combined entry.

    For each symbol: ``mean7d`` is the mean funding rate in percent,
    ``annualized`` is that mean times 3 settlements per day times 365, and
    ``count`` is the number of records. ``combo`` is the plain average of the
    BTC and ETH figures. Results are cached for 60 seconds.

    Raises:
        HTTPException: 502 when Binance cannot be reached or answers with a
            non-200 status.
    """
    cached = get_cache("stats", 60)
    if cached:
        return cached
    # time.time() is a true epoch value; datetime.utcnow().timestamp() would
    # be shifted by the server's UTC offset on any non-UTC host.
    end_time = int(time.time() * 1000)
    start_time = end_time - 7 * 24 * 3600 * 1000
    try:
        async with httpx.AsyncClient(timeout=15, headers=HEADERS) as client:
            responses = await asyncio.gather(
                *(
                    client.get(f"{BINANCE_FAPI}/fundingRate",
                               params={"symbol": s, "startTime": start_time, "endTime": end_time, "limit": 1000})
                    for s in SYMBOLS
                )
            )
    except httpx.HTTPError as exc:
        raise HTTPException(status_code=502, detail="Binance stats request failed") from exc
    stats = {}
    for sym, resp in zip(SYMBOLS, responses):
        if resp.status_code != 200:
            raise HTTPException(status_code=502, detail=f"Binance stats error for {sym}")
        key = sym.replace("USDT", "")
        rates = [float(item["fundingRate"]) for item in resp.json()]
        if not rates:
            stats[key] = {"mean7d": 0, "annualized": 0, "count": 0}
            continue
        mean = sum(rates) / len(rates)
        annualized = mean * 3 * 365 * 100
        stats[key] = {
            "mean7d": round(mean * 100, 6),
            "annualized": round(annualized, 2),
            "count": len(rates),
        }
    btc = stats.get("BTC", {})
    eth = stats.get("ETH", {})
    stats["combo"] = {
        "mean7d": round((btc.get("mean7d", 0) + eth.get("mean7d", 0)) / 2, 6),
        "annualized": round((btc.get("annualized", 0) + eth.get("annualized", 0)) / 2, 2),
    }
    set_cache("stats", stats)
    return stats
# ─── aggTrades query endpoints (PostgreSQL version) ──────────────────
@app.get("/api/trades/meta")
async def get_trades_meta(user: dict = Depends(get_current_user)):
    """Return every row of ``agg_trades_meta`` keyed by symbol without "USDT"."""
    meta_rows = await async_fetch("SELECT symbol, last_agg_id, last_time_ms, updated_at FROM agg_trades_meta")
    return {
        row["symbol"].replace("USDT", ""): {
            "last_agg_id": row["last_agg_id"],
            "last_time_ms": row["last_time_ms"],
            "updated_at": row["updated_at"],
        }
        for row in meta_rows
    }
@app.get("/api/trades/summary")
async def get_trades_summary(
    symbol: str = "BTC",
    start_ms: int = 0,
    end_ms: int = 0,
    interval: str = "1m",
    user: dict = Depends(get_current_user),
):
    """Summarise aggregated trades into fixed-width time buckets.

    Args:
        symbol: Base symbol; it is upper-cased and suffixed with "USDT".
        start_ms: Window start in epoch milliseconds; 0 means one hour
            before the window end.
        end_ms: Window end (exclusive) in epoch milliseconds; 0 means now.
        interval: Bucket width ("1m", "5m", "15m", "1h"); anything else
            falls back to one minute.
        user: Authenticated user, injected by ``get_current_user``.

    Returns:
        ``{"symbol", "interval", "count", "data"}`` with one entry per bucket
        holding buy/sell volume, their delta and total, trade count, VWAP and
        the largest single quantity.
    """
    bucket_widths = {"1m": 60000, "5m": 300000, "15m": 900000, "1h": 3600000}
    window_end = end_ms or int(time.time() * 1000)
    window_start = start_ms or window_end - 3600 * 1000
    bucket_ms = bucket_widths.get(interval, 60000)
    sym_full = symbol.upper() + "USDT"

    def to_bar(row):
        # Convert one aggregated DB row into the response entry.
        buy_vol = float(row["buy_vol"])
        sell_vol = float(row["sell_vol"])
        return {
            "time_ms": row["bar_ms"],
            "buy_vol": buy_vol,
            "sell_vol": sell_vol,
            "delta": round(buy_vol - sell_vol, 4),
            "total_vol": round(buy_vol + sell_vol, 4),
            "trade_count": row["trade_count"],
            "vwap": float(row["vwap"]) if row["vwap"] else 0,
            "max_qty": float(row["max_qty"]),
        }

    # The aggregation is done by PostgreSQL (the original note calls this
    # about 100x faster than looping in Python).
    rows = await async_fetch(
        """
SELECT
(time_ms / $4) * $4 AS bar_ms,
ROUND(SUM(CASE WHEN is_buyer_maker = 0 THEN qty ELSE 0 END)::numeric, 4) AS buy_vol,
ROUND(SUM(CASE WHEN is_buyer_maker = 1 THEN qty ELSE 0 END)::numeric, 4) AS sell_vol,
COUNT(*) AS trade_count,
ROUND((SUM(price * qty) / NULLIF(SUM(qty), 0))::numeric, 2) AS vwap,
ROUND(MAX(qty)::numeric, 4) AS max_qty
FROM agg_trades
WHERE symbol = $1 AND time_ms >= $2 AND time_ms < $3
GROUP BY bar_ms
ORDER BY bar_ms ASC
""",
        sym_full, window_start, window_end, bucket_ms
    )
    bars = [to_bar(row) for row in rows]
    return {"symbol": symbol, "interval": interval, "count": len(bars), "data": bars}
@app.get("/api/trades/latest")
async def get_trades_latest(
    symbol: str = "BTC",
    limit: int = 30,
    user: dict = Depends(get_current_user),
):
    """Return the newest aggregated trades for a symbol, cached for 2 seconds.

    The cache key is built from the raw ``symbol`` and ``limit`` values.
    """
    cache_key = f"trades_latest_{symbol}_{limit}"
    hit = get_cache(cache_key, 2)
    if hit:
        return hit
    query = (
        "SELECT agg_id, price, qty, time_ms, is_buyer_maker FROM agg_trades "
        "WHERE symbol = $1 ORDER BY time_ms DESC, agg_id DESC LIMIT $2"
    )
    trades = await async_fetch(query, symbol.upper() + "USDT", limit)
    payload = {"symbol": symbol, "count": len(trades), "data": trades}
    set_cache(cache_key, payload)
    return payload
@app.get("/api/collector/health")
async def collector_health(user: dict = Depends(get_current_user)):
    """Report per symbol how far ``agg_trades_meta`` lags behind the current time.

    A symbol counts as healthy when its last recorded trade time is less than
    30 seconds old; a missing ``last_time_ms`` is treated as 0.
    """
    now_ms = int(time.time() * 1000)
    meta_rows = await async_fetch("SELECT symbol, last_agg_id, last_time_ms FROM agg_trades_meta")

    def describe(row):
        # Lag in seconds between now and the last recorded trade time.
        lag_s = (now_ms - (row["last_time_ms"] or 0)) / 1000
        return {
            "last_agg_id": row["last_agg_id"],
            "lag_seconds": round(lag_s, 1),
            "healthy": lag_s < 30,
        }

    status = {row["symbol"].replace("USDT", ""): describe(row) for row in meta_rows}
    return {"collector": status, "timestamp": now_ms}
# ─── V5 Signal Engine API ────────────────────────────────────────
@app.get("/api/signals/indicators")
async def get_signal_indicators(
    symbol: str = "BTC",
    minutes: int = 60,
    user: dict = Depends(get_current_user),
):
    """Return ``signal_indicators_1m`` rows for the last *minutes* minutes, oldest first."""
    window_ms = minutes * 60 * 1000
    cutoff_ms = int(time.time() * 1000) - window_ms
    query = (
        "SELECT ts, cvd_fast, cvd_mid, cvd_day, atr_5m, vwap_30m, price, score, signal "
        "FROM signal_indicators_1m WHERE symbol = $1 AND ts >= $2 ORDER BY ts ASC"
    )
    indicators = await async_fetch(query, symbol.upper() + "USDT", cutoff_ms)
    return {"symbol": symbol, "count": len(indicators), "data": indicators}
@app.get("/api/signals/latest")
async def get_signal_latest(user: dict = Depends(get_current_user)):
    """Return the newest ``signal_indicators`` row for every tracked symbol.

    Returns:
        Dict keyed by the symbol without "USDT"; symbols with no row are
        omitted.
    """
    query = (
        "SELECT ts, cvd_fast, cvd_mid, cvd_day, cvd_fast_slope, atr_5m, atr_percentile, "
        "vwap_30m, price, p95_qty, p99_qty, score, signal "
        "FROM signal_indicators WHERE symbol = $1 ORDER BY ts DESC LIMIT 1"
    )
    # Iterate the shared SYMBOLS list (no duplicated literal) and run the
    # independent lookups concurrently instead of one after another.
    rows = await asyncio.gather(*(async_fetchrow(query, sym) for sym in SYMBOLS))
    return {sym.replace("USDT", ""): row for sym, row in zip(SYMBOLS, rows) if row}
@app.get("/api/signals/trades")
async def get_signal_trades(
    status: str = "all",
    limit: int = 50,
    user: dict = Depends(get_current_user),
):
    """Return rows of ``signal_trades``, newest first.

    ``status="all"`` returns trades of every status; any other value keeps
    only the trades whose ``status`` column equals it.
    """
    if status == "all":
        query = "SELECT * FROM signal_trades ORDER BY ts_open DESC LIMIT $1"
        args = (limit,)
    else:
        query = "SELECT * FROM signal_trades WHERE status = $1 ORDER BY ts_open DESC LIMIT $2"
        args = (status, limit)
    trades = await async_fetch(query, *args)
    return {"count": len(trades), "data": trades}