- auth.py: rewrite to use PG via db.py (was sqlite3) - admin_cli.py: rewrite to use PG - migrate_auth_sqlite_to_pg.py: one-time migration script - SQLite arb.db no longer needed after migration
124 lines
4.1 KiB
Python
124 lines
4.1 KiB
Python
#!/usr/bin/env python3
|
|
"""Admin CLI for Arbitrage Engine (PostgreSQL version)"""
|
|
import sys, os, secrets
|
|
|
|
sys.path.insert(0, os.path.dirname(__file__))
|
|
from db import get_sync_conn
|
|
|
|
|
|
def gen_invite(count=1, max_uses=1):
|
|
with get_sync_conn() as conn:
|
|
with conn.cursor() as cur:
|
|
codes = []
|
|
for _ in range(count):
|
|
code = secrets.token_urlsafe(6)[:8].upper()
|
|
cur.execute(
|
|
"INSERT INTO invite_codes (code, created_by, max_uses) VALUES (%s, 1, %s)",
|
|
(code, max_uses)
|
|
)
|
|
codes.append(code)
|
|
conn.commit()
|
|
for c in codes:
|
|
print(f" {c}")
|
|
print(f"\nGenerated {len(codes)} invite code(s)")
|
|
|
|
|
|
def list_invites():
|
|
with get_sync_conn() as conn:
|
|
with conn.cursor() as cur:
|
|
cur.execute("SELECT id, code, max_uses, used_count, status, created_at FROM invite_codes ORDER BY id DESC")
|
|
cols = [desc[0] for desc in cur.description]
|
|
rows = [dict(zip(cols, row)) for row in cur.fetchall()]
|
|
if not rows:
|
|
print("No invite codes found")
|
|
return
|
|
print(f"{'ID':>4} {'CODE':>10} {'MAX':>4} {'USED':>5} {'STATUS':>10} {'CREATED':>20}")
|
|
print("-" * 60)
|
|
for r in rows:
|
|
print(f"{r['id']:>4} {r['code']:>10} {r['max_uses']:>4} {r['used_count']:>5} {r['status']:>10} {str(r['created_at']):>20}")
|
|
|
|
|
|
def disable_invite(code):
|
|
with get_sync_conn() as conn:
|
|
with conn.cursor() as cur:
|
|
cur.execute("UPDATE invite_codes SET status = 'disabled' WHERE code = %s", (code,))
|
|
conn.commit()
|
|
print(f"Disabled invite code: {code}")
|
|
|
|
|
|
def list_users():
|
|
with get_sync_conn() as conn:
|
|
with conn.cursor() as cur:
|
|
cur.execute("SELECT id, email, role, banned, created_at FROM users ORDER BY id DESC")
|
|
cols = [desc[0] for desc in cur.description]
|
|
rows = [dict(zip(cols, row)) for row in cur.fetchall()]
|
|
if not rows:
|
|
print("No users found")
|
|
return
|
|
print(f"{'ID':>4} {'EMAIL':>30} {'ROLE':>6} {'BANNED':>7} {'CREATED':>20}")
|
|
print("-" * 72)
|
|
for r in rows:
|
|
print(f"{r['id']:>4} {r['email']:>30} {r['role']:>6} {r['banned']:>7} {str(r['created_at']):>20}")
|
|
|
|
|
|
def ban_user(user_id):
|
|
with get_sync_conn() as conn:
|
|
with conn.cursor() as cur:
|
|
cur.execute("UPDATE users SET banned = 1 WHERE id = %s", (user_id,))
|
|
conn.commit()
|
|
print(f"Banned user ID: {user_id}")
|
|
|
|
|
|
def unban_user(user_id):
|
|
with get_sync_conn() as conn:
|
|
with conn.cursor() as cur:
|
|
cur.execute("UPDATE users SET banned = 0 WHERE id = %s", (user_id,))
|
|
conn.commit()
|
|
print(f"Unbanned user ID: {user_id}")
|
|
|
|
|
|
def set_admin(user_id):
|
|
with get_sync_conn() as conn:
|
|
with conn.cursor() as cur:
|
|
cur.execute("UPDATE users SET role = 'admin' WHERE id = %s", (user_id,))
|
|
conn.commit()
|
|
print(f"Set user {user_id} as admin")
|
|
|
|
|
|
def usage():
|
|
print("""Usage: python3 admin_cli.py <command> [args]
|
|
|
|
Commands:
|
|
gen-invite [count] [max_uses] Generate invite codes (default: 1 code, 1 use)
|
|
list-invites List all invite codes
|
|
disable-invite <CODE> Disable an invite code
|
|
list-users List all users
|
|
ban-user <USER_ID> Ban a user
|
|
unban-user <USER_ID> Unban a user
|
|
set-admin <USER_ID> Set user as admin
|
|
""")
|
|
|
|
if __name__ == "__main__":
|
|
if len(sys.argv) < 2:
|
|
usage()
|
|
sys.exit(1)
|
|
cmd = sys.argv[1]
|
|
if cmd == "gen-invite":
|
|
count = int(sys.argv[2]) if len(sys.argv) > 2 else 1
|
|
max_uses = int(sys.argv[3]) if len(sys.argv) > 3 else 1
|
|
gen_invite(count, max_uses)
|
|
elif cmd == "list-invites":
|
|
list_invites()
|
|
elif cmd == "disable-invite":
|
|
disable_invite(sys.argv[2])
|
|
elif cmd == "list-users":
|
|
list_users()
|
|
elif cmd == "ban-user":
|
|
ban_user(int(sys.argv[2]))
|
|
elif cmd == "unban-user":
|
|
unban_user(int(sys.argv[2]))
|
|
elif cmd == "set-admin":
|
|
set_admin(int(sys.argv[2]))
|
|
else:
|
|
usage()
|