Files
pegasusAPI/db.py
2026-03-25 02:04:52 -05:00

180 lines
7.2 KiB
Python

"""
SQLite database layer for Gateway Checker API.
Tables: api_keys, request_log, admin_log
"""
import aiosqlite
import secrets
import json
from datetime import datetime, timedelta, timezone
DB_PATH = "api.db"
def _now() -> str:
return datetime.now(timezone.utc).isoformat()
async def init_db():
async with aiosqlite.connect(DB_PATH) as db:
await db.execute("""
CREATE TABLE IF NOT EXISTS api_keys (
api_key TEXT PRIMARY KEY,
owner TEXT NOT NULL,
created_at TEXT NOT NULL,
expires_at TEXT,
request_limit INTEGER,
requests_used INTEGER NOT NULL DEFAULT 0,
rate_per_minute INTEGER NOT NULL DEFAULT 10,
allowed_gateways TEXT NOT NULL DEFAULT '["*"]',
is_active INTEGER NOT NULL DEFAULT 1,
is_paused INTEGER NOT NULL DEFAULT 0
)
""")
await db.execute("""
CREATE TABLE IF NOT EXISTS request_log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
api_key TEXT NOT NULL,
gateway TEXT NOT NULL,
card TEXT NOT NULL,
status TEXT NOT NULL,
response_time REAL,
created_at TEXT NOT NULL,
ip_address TEXT
)
""")
await db.execute("""
CREATE TABLE IF NOT EXISTS admin_log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
action TEXT NOT NULL,
target_key TEXT,
details TEXT,
ip_address TEXT,
created_at TEXT NOT NULL
)
""")
await db.execute("CREATE INDEX IF NOT EXISTS idx_log_key ON request_log(api_key)")
await db.execute("CREATE INDEX IF NOT EXISTS idx_log_created ON request_log(created_at)")
await db.commit()
# ── Key Management ──
async def create_key(owner: str, gateways: list | str = "*", request_limit: int | None = None,
expires_days: int | None = None, rate_per_minute: int = 10) -> dict:
key = "sk_live_" + secrets.token_urlsafe(32)
now = _now()
expires_at = None
if expires_days:
expires_at = (datetime.now(timezone.utc) + timedelta(days=expires_days)).isoformat()
gw_json = json.dumps(gateways) if isinstance(gateways, list) else json.dumps(gateways)
async with aiosqlite.connect(DB_PATH) as db:
await db.execute(
"INSERT INTO api_keys (api_key, owner, created_at, expires_at, request_limit, rate_per_minute, allowed_gateways) VALUES (?,?,?,?,?,?,?)",
(key, owner, now, expires_at, request_limit, rate_per_minute, gw_json)
)
await db.commit()
return {"api_key": key, "owner": owner, "created_at": now, "expires_at": expires_at,
"request_limit": request_limit, "requests_used": 0, "rate_per_minute": rate_per_minute,
"allowed_gateways": gateways, "is_active": True, "is_paused": False}
async def get_key(api_key: str) -> dict | None:
async with aiosqlite.connect(DB_PATH) as db:
db.row_factory = aiosqlite.Row
async with db.execute("SELECT * FROM api_keys WHERE api_key = ?", (api_key,)) as cur:
row = await cur.fetchone()
if not row:
return None
d = dict(row)
d["allowed_gateways"] = json.loads(d["allowed_gateways"])
d["is_active"] = bool(d["is_active"])
d["is_paused"] = bool(d["is_paused"])
return d
async def list_keys() -> list[dict]:
async with aiosqlite.connect(DB_PATH) as db:
db.row_factory = aiosqlite.Row
async with db.execute("SELECT * FROM api_keys ORDER BY created_at DESC") as cur:
rows = await cur.fetchall()
result = []
for row in rows:
d = dict(row)
d["allowed_gateways"] = json.loads(d["allowed_gateways"])
d["is_active"] = bool(d["is_active"])
d["is_paused"] = bool(d["is_paused"])
result.append(d)
return result
async def update_key(api_key: str, **fields) -> bool:
allowed = {"owner", "expires_at", "request_limit", "requests_used",
"rate_per_minute", "allowed_gateways", "is_active", "is_paused"}
updates = {k: v for k, v in fields.items() if k in allowed}
if not updates:
return False
if "allowed_gateways" in updates and isinstance(updates["allowed_gateways"], list):
updates["allowed_gateways"] = json.dumps(updates["allowed_gateways"])
elif "allowed_gateways" in updates and isinstance(updates["allowed_gateways"], str):
updates["allowed_gateways"] = json.dumps(updates["allowed_gateways"])
set_clause = ", ".join(f"{k} = ?" for k in updates)
values = list(updates.values()) + [api_key]
async with aiosqlite.connect(DB_PATH) as db:
cur = await db.execute(f"UPDATE api_keys SET {set_clause} WHERE api_key = ?", values)
await db.commit()
return cur.rowcount > 0
async def delete_key(api_key: str) -> bool:
async with aiosqlite.connect(DB_PATH) as db:
cur = await db.execute("DELETE FROM api_keys WHERE api_key = ?", (api_key,))
await db.commit()
return cur.rowcount > 0
async def increment_usage(api_key: str):
async with aiosqlite.connect(DB_PATH) as db:
await db.execute("UPDATE api_keys SET requests_used = requests_used + 1 WHERE api_key = ?", (api_key,))
await db.commit()
# ── Request Logging ──
async def log_request(api_key: str, gateway: str, card: str, status: str,
response_time: float, ip_address: str):
async with aiosqlite.connect(DB_PATH) as db:
await db.execute(
"INSERT INTO request_log (api_key, gateway, card, status, response_time, created_at, ip_address) VALUES (?,?,?,?,?,?,?)",
(api_key, gateway, card, status, response_time, _now(), ip_address)
)
await db.commit()
# ── Admin Logging ──
async def log_admin(action: str, target_key: str | None, details: dict | None, ip_address: str):
async with aiosqlite.connect(DB_PATH) as db:
await db.execute(
"INSERT INTO admin_log (action, target_key, details, ip_address, created_at) VALUES (?,?,?,?,?)",
(action, target_key, json.dumps(details) if details else None, ip_address, _now())
)
await db.commit()
# ── Stats ──
async def get_stats_24h() -> dict:
cutoff = (datetime.now(timezone.utc) - timedelta(hours=24)).isoformat()
async with aiosqlite.connect(DB_PATH) as db:
async with db.execute("SELECT COUNT(*) FROM request_log WHERE created_at > ?", (cutoff,)) as cur:
total = (await cur.fetchone())[0]
async with db.execute("SELECT COUNT(*) FROM request_log WHERE created_at > ? AND status = 'approved'", (cutoff,)) as cur:
approvals = (await cur.fetchone())[0]
async with db.execute("SELECT COUNT(DISTINCT api_key) FROM request_log WHERE created_at > ?", (cutoff,)) as cur:
unique_users = (await cur.fetchone())[0]
return {"total_checks_24h": total, "approvals_24h": approvals, "unique_users_24h": unique_users}