"""Arbitrage Engine API.

A small FastAPI service that proxies the Binance futures REST API
(``BINANCE_FAPI``) for the symbols in ``SYMBOLS`` and exposes current
premium-index data, 7-day funding-rate history, and summary statistics.
Responses are cached in process memory for a few seconds.
"""

import asyncio
import time
from datetime import datetime, timedelta, timezone

import httpx
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware

app = FastAPI(title="Arbitrage Engine API")

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

BINANCE_FAPI = "https://fapi.binance.com/fapi/v1"
SYMBOLS = ["BTCUSDT", "ETHUSDT"]
HEADERS = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"}

# Simple in-memory cache (history/stats: 60 seconds, rates: 3 seconds).
# Maps cache key -> {"ts": insertion time from time.time(), "data": payload}.
_cache: dict = {}


def get_cache(key: str, ttl: int):
    """Return the cached payload for *key* if it is younger than *ttl* seconds.

    Returns ``None`` when the key is absent or the entry has expired.
    """
    entry = _cache.get(key)
    if entry is not None and time.time() - entry["ts"] < ttl:
        return entry["data"]
    return None


def set_cache(key: str, data):
    """Store *data* under *key*, stamped with the current time."""
    _cache[key] = {"ts": time.time(), "data": data}


def _utc_now_iso() -> str:
    """Return the current UTC time as a naive ISO-8601 string (no offset)."""
    # Dropping tzinfo keeps the exact output format of the former
    # ``datetime.utcnow().isoformat()`` without the deprecated call.
    return datetime.now(timezone.utc).replace(tzinfo=None).isoformat()


def _ms_to_utc_iso(epoch_ms) -> str:
    """Convert epoch milliseconds to a naive UTC ISO-8601 string."""
    moment = datetime.fromtimestamp(epoch_ms / 1000, tz=timezone.utc)
    return moment.replace(tzinfo=None).isoformat()


async def _fetch_for_symbols(path, *, timeout, error_prefix, extra_params=None):
    """GET ``BINANCE_FAPI/<path>`` concurrently for every symbol in ``SYMBOLS``.

    Args:
        path: Endpoint name appended to ``BINANCE_FAPI``.
        timeout: Client timeout in seconds passed to ``httpx.AsyncClient``.
        error_prefix: Leading text for the 502 error detail.
        extra_params: Query parameters sent in addition to ``symbol``.

    Returns:
        A list of ``(symbol, parsed_json)`` tuples in ``SYMBOLS`` order.

    Raises:
        HTTPException: 502 when a request fails at the transport level, an
            upstream response is not 200, or its body is not valid JSON.
    """
    extra = extra_params or {}
    try:
        async with httpx.AsyncClient(timeout=timeout, headers=HEADERS) as client:
            responses = await asyncio.gather(
                *(
                    client.get(f"{BINANCE_FAPI}/{path}", params={"symbol": sym, **extra})
                    for sym in SYMBOLS
                )
            )
    except httpx.HTTPError as exc:
        # Timeouts / connection failures would otherwise surface as an
        # unhandled 500; report them as a bad-gateway error instead.
        raise HTTPException(
            status_code=502, detail=f"{error_prefix}: upstream request failed"
        ) from exc

    payloads = []
    for sym, resp in zip(SYMBOLS, responses):
        if resp.status_code != 200:
            raise HTTPException(status_code=502, detail=f"{error_prefix} for {sym}")
        try:
            payloads.append((sym, resp.json()))
        except ValueError as exc:  # body was not valid JSON
            raise HTTPException(
                status_code=502, detail=f"{error_prefix} for {sym}"
            ) from exc
    return payloads


async def _fetch_funding_window(error_prefix):
    """Fetch the last 7 days of ``fundingRate`` records for every symbol.

    Returns the ``(symbol, records)`` list produced by ``_fetch_for_symbols``.
    """
    # Use an aware UTC datetime: calling .timestamp() on a naive utcnow()
    # value interprets it as local time and shifts the window on hosts
    # whose timezone is not UTC.
    now = datetime.now(timezone.utc)
    window = {
        "startTime": int((now - timedelta(days=7)).timestamp() * 1000),
        "endTime": int(now.timestamp() * 1000),
        "limit": 1000,
    }
    return await _fetch_for_symbols(
        "fundingRate", timeout=15, error_prefix=error_prefix, extra_params=window
    )


@app.get("/api/health")
async def health():
    """Liveness probe: report status and the current UTC timestamp."""
    return {"status": "ok", "timestamp": _utc_now_iso()}


@app.get("/api/rates")
async def get_rates():
    """Return current premium-index data per symbol (cached for 3 seconds).

    The result is keyed by the symbol with ``"USDT"`` removed.

    Raises:
        HTTPException: 502 when Binance cannot be reached or answers non-200.
    """
    cached = get_cache("rates", 3)
    if cached is not None:
        return cached

    payloads = await _fetch_for_symbols(
        "premiumIndex", timeout=10, error_prefix="Binance error"
    )
    result = {}
    for sym, data in payloads:
        result[sym.replace("USDT", "")] = {
            "symbol": sym,
            "markPrice": float(data["markPrice"]),
            "indexPrice": float(data["indexPrice"]),
            "lastFundingRate": float(data["lastFundingRate"]),
            "nextFundingTime": data["nextFundingTime"],
            "timestamp": data["time"],
        }

    set_cache("rates", result)
    return result


@app.get("/api/history")
async def get_history():
    """Return 7 days of funding-rate records per symbol (cached for 60 seconds).

    Each record carries the raw ``fundingTime``, the rate as a float, and an
    ISO-8601 UTC rendering of the funding time.

    Raises:
        HTTPException: 502 when Binance cannot be reached or answers non-200.
    """
    cached = get_cache("history", 60)
    if cached is not None:
        return cached

    payloads = await _fetch_funding_window("Binance history error")
    result = {
        sym.replace("USDT", ""): [
            {
                "fundingTime": item["fundingTime"],
                "fundingRate": float(item["fundingRate"]),
                "timestamp": _ms_to_utc_iso(item["fundingTime"]),
            }
            for item in records
        ]
        for sym, records in payloads
    }

    set_cache("history", result)
    return result


@app.get("/api/stats")
async def get_stats():
    """Return 7-day funding-rate statistics per symbol (cached for 60 seconds).

    For every symbol the response holds the mean rate in percent
    (``mean7d``), an annualized percentage, and the sample ``count``. A
    ``"combo"`` entry averages the BTC and ETH figures.

    Raises:
        HTTPException: 502 when Binance cannot be reached or answers non-200.
    """
    cached = get_cache("stats", 60)
    if cached is not None:
        return cached

    payloads = await _fetch_funding_window("Binance stats error")
    stats = {}
    for sym, records in payloads:
        key = sym.replace("USDT", "")
        rates = [float(item["fundingRate"]) for item in records]
        if not rates:
            stats[key] = {"mean7d": 0, "annualized": 0, "count": 0}
            continue
        mean = sum(rates) / len(rates)
        # NOTE(review): the factor 3 * 365 presumably assumes three funding
        # settlements per day -- confirm this holds for every symbol.
        annualized = mean * 3 * 365 * 100
        stats[key] = {
            "mean7d": round(mean * 100, 6),
            "annualized": round(annualized, 2),
            "count": len(rates),
        }

    btc = stats.get("BTC", {})
    eth = stats.get("ETH", {})
    stats["combo"] = {
        "mean7d": round((btc.get("mean7d", 0) + eth.get("mean7d", 0)) / 2, 6),
        "annualized": round(
            (btc.get("annualized", 0) + eth.get("annualized", 0)) / 2, 2
        ),
    }

    set_cache("stats", stats)
    return stats