Files
pegasusAPI/SERVICE_ARCHITECTURE.md
2026-03-25 02:04:52 -05:00

19 KiB

Service Architecture — Gateway Checker API

Living document. Updated as we discuss and build. Read this before starting any work.

Overview

REST API that wraps gateway checker scripts behind authenticated endpoints. Clients get an API key and hit endpoints — they never see source code. This is a black-box service — clients only know the API contract, not how it works internally.

How the service works (the big picture)

┌─────────────────────────────────────────────────────────────────┐
│                        YOUR SERVER                               │
│                                                                  │
│  Client A ──►┐                                                   │
│  Client B ──►┤  FastAPI (api.py)                                 │
│  Client C ──►┤    │                                              │
│  Tg Bot   ──►┘    ├── Auth middleware (validate API key)         │
│                    ├── Rate limiter (per-key, per-minute)        │
│                    ├── Task queue (asyncio.Queue)                │
│                    │     │                                       │
│                    │     ▼                                       │
│                    ├── Gateway Registry                          │
│                    │     ├── comwave_forbot.py  [max_concurrent=1]│
│                    │     ├── comwave_forbot.py  [max_concurrent=5]│
│                    │     └── future_forbot.py   [max_concurrent=N]│
│                    │                                             │
│                    ├── Dedup cache (same card+gateway = cached)  │
│                    ├── Request logger (SQLite)                   │
│                    └── DB (api.db) — keys, usage, logs           │
│                                                                  │
│  Admin (you) ──► /admin/* endpoints (master key)                 │
└─────────────────────────────────────────────────────────────────┘

The flow (step by step)

  1. Client sends request: POST /api/check/comwave with API key + card
  2. Auth middleware: Validates key exists, is active, not expired, not over limit
  3. Gateway access check: Is this key allowed to use comwave?
  4. Rate limit check: Has this key exceeded N requests/minute?
  5. Dedup check: Same card + same gateway in last 60s? Return cached result
  6. Task created: Card is queued, client gets back task_id immediately
  7. Worker picks it up: Respects gateway's max_concurrent semaphore
  8. Gateway runs: comwave_forbot.py does the actual check (5-15s)
  9. Result stored: In-memory results dict + logged to SQLite
  10. Client polls: GET /api/result/{task_id} → gets result when ready

Stack

Component Choice Why
Framework FastAPI Async native, auto-docs, fast
DB SQLite (via aiosqlite) Zero setup, file-based, good for this scale
Auth API key in X-API-Key header Simple, standard
Server uvicorn ASGI server for FastAPI
Config python-dotenv Already installed, clean .env loading
Keys secrets.token_urlsafe(32) 43-char random, unguessable
Task queue asyncio.Queue No Redis needed, in-process
Concurrency asyncio.Semaphore per gateway Controls parallel load

Packages to install: fastapi, uvicorn, aiosqlite


Files

File Purpose Status
api.py FastAPI server, gateway registry, auth, task queue, workers Not started
db.py SQLite key management + request logging Not started
comwave_forbot.py Comwave checker engine (existing, unchanged) Done
comwave_bot.py Telegram bot (existing, will update to call API later) Done
api.db SQLite database (auto-created at runtime) Auto
.env MASTER_KEY, BOT_TOKEN, API_HOST, API_PORT, credentials Exists (needs MASTER_KEY)
SERVICE_ARCHITECTURE.md This file

API Endpoints

Public (no auth)

Method Endpoint Description
GET /health Health check — {"status": "ok", "uptime": 3600}

Client Endpoints (require API key)

Method Endpoint Body Description
POST /api/check/{gateway} {"card": "cc|mm|yyyy|cvv"} Submit card for checking, returns task_id
GET /api/result/{task_id} Poll for result (queued/processing/completed)
GET /api/usage Check own usage/limits/expiry
GET /api/gateways List available gateways + status
GET /api/cooldown Check own cooldown timers per gateway

Admin Endpoints (require master key)

Method Endpoint Body Description
POST /admin/keys {"owner", "gateways", "request_limit", "expires_days", "rate_per_minute"} Create new API key
GET /admin/keys List all keys with usage
GET /admin/keys/{key} Get single key details
PATCH /admin/keys/{key} fields to update Update key limits/expiry/gateways
DELETE /admin/keys/{key} Revoke a key
GET /admin/stats Live server stats (queues, 24h counts, gateway health)

Gateway Registry

Each gateway declares its function, concurrency limit, timeout, and optional lifecycle hooks:

GATEWAYS = {
    "comwave": {
        "fn": checker.check_card,           # the async check function
        "type": "$0 auth",                  # display label
        "status": "online",                 # online/maintenance/offline
        "max_concurrent": 1,                # uses shared session, serialize
        "timeout": 30,                      # seconds before giving up
        "cooldown": 20,                     # per-user cooldown in seconds
        "max_queue": 50,                     # max tasks waiting in queue
        "init": checker.ensure_logged_in,   # called on API startup
        "shutdown": checker.shutdown,        # called on API shutdown
    },
    "comwave3": {
        "fn": check_card_3,
        "type": "$3.33 charge",
        "status": "online",
        "max_concurrent": 5,                # stateless, can run in parallel
        "timeout": 60,
        "cooldown": 20,                     # per-user cooldown in seconds
        "max_queue": 50,                     # max tasks waiting in queue
        "init": None,
        "shutdown": None,
    },
    # Adding a new gateway:
    # "duke": {
    #     "fn": duke_check,
    #     "type": "$1.00 charge",
    #     "status": "online",
    #     "max_concurrent": 3,
    #     "timeout": 45,
    #     "cooldown": 20,
    #     "max_queue": 50,
    #     "init": None,
    #     "shutdown": None,
    # },
}

Adding a new gateway:

  1. Code new_site_forbot.py with async def check_card(card: str) -> dict function
  2. Import and register in GATEWAYS dict in api.py
  3. Create/update client keys to include the new gateway name
  4. Done — client uses POST /api/check/newsite

Gateway function contract:

async def check_card(card: str) -> dict:
    # card = "cc|mm|yyyy|cvv"
    # Must return: {"status": "approved"|"declined"|"error", "message": "...", "time": float}

Cooldown:

  • Each gateway has a cooldown value (seconds) — per-user, not global
  • After a user submits a card to a gateway, they must wait cooldown seconds before submitting another to the same gateway
  • Different users are independent — User A's cooldown doesn't block User B
  • Different gateways are independent — cooldown on comwave doesn't affect comwave3
  • Tracked via in-memory dict: last_request[api_key][gateway] → timestamp
  • If a user submits too soon: 429 {"detail": "Cooldown active", "retry_after": 12.5} (remaining seconds)
  • Trade-off accepted: site load scales with number of active users (10 users = up to 10 concurrent requests), but users never wait for each other

Concurrency control:

  • Each gateway gets its own asyncio.Semaphore(max_concurrent)
  • comwave max_concurrent=1 → only 1 check at a time (shared session with lock)
  • comwave3 max_concurrent=5 → up to 5 parallel checks (stateless)
  • Excess requests wait in queue, not rejected

Lifecycle hooks:

  • init() — called once when API server starts (e.g. comwave login)
  • shutdown() — called once when API server stops (e.g. comwave logout)
  • Both are optional (None for stateless gateways)

Task Queue System

Instead of making clients wait 5-15s for a response, we use async tasks:

Submit flow

POST /api/check/comwave {"card": "4111...|12|2025|123"}
     ↓
Returns immediately:
{"task_id": "abc123", "status": "queued", "gateway": "comwave", "position": 2, "estimated_wait": 15}

Poll flow

GET /api/result/abc123
     ↓
While queued:      {"task_id": "abc123", "status": "queued", "position": 2, "expires_in": 290}
While processing:  {"task_id": "abc123", "status": "processing", "expires_in": 270}
When done:         {"task_id": "abc123", "status": "completed", "result": {...}, "remaining": 999}

Implementation

  • asyncio.Queue per gateway — tasks are queued and processed by workers
  • Workers respect max_concurrent semaphore
  • Results stored in dict[task_id] → result (in-memory, TTL 5 minutes)
  • task_id = uuid4() short string

Why this is better than synchronous

  • No HTTP timeout issues (client gets response in <100ms)
  • Client can submit multiple cards and poll all results
  • Server controls processing speed via semaphores
  • Batch support becomes trivial (submit N tasks, poll N results)

Task ownership

  • Each task is tied to the API key that created it
  • Only the creating key can poll GET /api/result/{task_id}
  • Other keys get 404 "Task not found" — prevents cross-client result snooping
  • Stored as tasks[task_id] = {"api_key": key, "result": ..., ...}

Max queue depth

  • Each gateway has a max queue size (default: 50 tasks)
  • If queue is full: 503 {"detail": "Gateway overloaded", "queue_depth": 50}
  • Prevents a single client from flooding the queue during cooldown edge cases
  • Configurable per gateway via max_queue field in registry

Throttling Strategy

Model: Per-user cooldown (20s default)

Each user (API key) has an independent cooldown timer per gateway. After submitting a card, that user must wait cooldown seconds before submitting another card to the same gateway.

How it works

User A submits card → comwave processes → User A must wait 20s
User B submits card → comwave processes → User B must wait 20s (independent of A)
User A submits to comwave3 → allowed immediately (different gateway)

Implementation

# In-memory tracker
last_request: dict[str, dict[str, float]] = {}  # {api_key: {gateway: timestamp}}

# Before queuing a task:
now = time.time()
last = last_request.get(api_key, {}).get(gateway, 0)
remaining = cooldown - (now - last)
if remaining > 0:
    raise HTTPException(429, {"detail": "Cooldown active", "retry_after": round(remaining, 1)})
last_request.setdefault(api_key, {})[gateway] = now

Behavior with multiple users

Time User A User B User C Site load
0s submits card submits card 2 concurrent
5s waiting (15s left) waiting (15s left) submits card 1 concurrent
20s submits next submits next waiting (5s left) 2 concurrent
25s waiting waiting submits next 1 concurrent

Max site load = number of active users (each can send 1 request per 20s)


Request Deduplication

If a client sends the same card to the same gateway within 60 seconds, return the cached result instead of hitting the gateway again.

  • Key: hash(card + gateway) (SHA256, not stored in plain text)
  • Cache: In-memory dict with TTL
  • Why: Prevents accidental double-charges on comwave3, saves gateway resources

Database Schema (SQLite)

api_keys table

Column Type Description
api_key TEXT PK The key string (sk_live_...)
owner TEXT Client name/identifier
created_at DATETIME When key was created
expires_at DATETIME NULL NULL = never expires
request_limit INTEGER NULL NULL = unlimited
requests_used INTEGER Counter, starts at 0
rate_per_minute INTEGER Max requests per minute (default 10)
allowed_gateways TEXT JSON list ["comwave","comwave3"] or "*" for all
is_active BOOLEAN Can be deactivated without deleting
is_paused BOOLEAN Temporarily disabled (client sees "API key paused")

request_log table

Column Type Description
id INTEGER PK Auto-increment
api_key TEXT Which key was used
gateway TEXT Which gateway
card TEXT Full card (`cc
status TEXT approved/declined/error
response_time REAL Seconds
created_at DATETIME Timestamp
ip_address TEXT Client IP

admin_log table

Column Type Description
id INTEGER PK Auto-increment
action TEXT create_key / revoke_key / update_key / pause_key / unpause_key
target_key TEXT Which API key was affected
details TEXT JSON of what changed (e.g. {"request_limit": [1000, 2000]})
ip_address TEXT Admin's IP
created_at DATETIME Timestamp

Security

Layer Implementation
Auth X-API-Key header required on all client/admin endpoints
Key format sk_live_ + secrets.token_urlsafe(32)
No docs in prod FastAPI(docs_url=None, redoc_url=None)
HTTPS Behind nginx with SSL or Cloudflare tunnel
Rate limit Per-minute limit per API key (in-memory sliding window)
IP block Block IP after 10 failed auth attempts (in-memory, resets after 15min)
Full card logging Full card stored in request_log (keep api.db protected)
Dedup hashing Card+gateway hashed with SHA256, never stored raw
Master key From .env, used for /admin/* only
Health endpoint /health is the only unauthenticated endpoint, reveals nothing sensitive
Graceful shutdown Finish in-progress checks before exit, call gateway shutdown hooks

Response Format

Submit check

{
    "task_id": "a1b2c3d4",
    "status": "queued",
    "gateway": "comwave",
    "position": 2,
    "estimated_wait": 15
}

Poll result (queued)

{
    "task_id": "a1b2c3d4",
    "status": "queued",
    "position": 2,
    "expires_in": 290
}

Poll result (processing)

{
    "task_id": "a1b2c3d4",
    "status": "processing",
    "expires_in": 270
}

Poll result (completed)

{
    "task_id": "a1b2c3d4",
    "status": "completed",
    "result": {
        "status": "declined",
        "gateway": "comwave",
        "message": "DECLINED [VISA] | Last4: 1111 | Auth: 000000 | Ref: 662321650018600120",
        "time": 5.28
    },
    "remaining": 999
}

Usage

{
    "owner": "client_john",
    "requests_used": 347,
    "requests_limit": 1000,
    "expires_at": "2026-04-24T00:00:00",
    "gateways": ["comwave", "comwave3"],
    "rate_per_minute": 10
}

Gateways

{
    "gateways": {
        "comwave":  {"status": "online", "type": "$0 auth"},
        "comwave3": {"status": "online", "type": "$3.33 charge"}
    }
}

Errors

{"detail": "Missing API key"}                     // 401
{"detail": "Invalid API key"}                      // 401
{"detail": "API key expired"}                      // 401
{"detail": "API key deactivated"}                  // 401
{"detail": "Request limit exceeded", "used": 1000, "limit": 1000}  // 429
{"detail": "Rate limit exceeded", "retry_after": 6}               // 429
{"detail": "Cooldown active", "retry_after": 12.5}                // 429
{"detail": "Access denied for gateway: duke"}      // 403
{"detail": "Gateway not found: xyz"}               // 404
{"detail": "Gateway offline: duke"}                // 503
{"detail": "Gateway overloaded", "queue_depth": 50}  // 503
{"detail": "API key paused"}                          // 401
{"detail": "Invalid card format"}                  // 400
{"detail": "Task not found"}                       // 404
{"detail": "IP blocked"}                           // 403

Key Tiers (pricing model)

Tier Limit Expiry Gateways Rate
Trial 50 requests 7 days 1 gateway 5/min
Standard 1000/month 30 days specific 10/min
Unlimited no limit no expiry all 50/min

Telegram Bot Integration (future)

Option A (recommended): Bot calls own API with its own key — cleanest separation, bot is just another API consumer. The bot gets an "unlimited" key with all gateways.

Option B: Bot imports checkers directly (current setup) — faster, no network hop but tightly coupled.

Decision: TBD — build API first, connect bot later.


.env Structure

# API Server
MASTER_KEY=your_admin_secret_here
API_HOST=0.0.0.0
API_PORT=8000

# Telegram Bot
BOT_TOKEN=your_bot_token
ALLOWED_USERS=123456,789012

# Gateway Credentials
COMWAVE_USERNAME=tu76502@gmail.com
COMWAVE_PASSWORD=root13579

Build Order

Phase 1: Core API (v1)

  1. Install fastapi, uvicorn, aiosqlite
  2. Build db.py — SQLite init, key CRUD, usage tracking, request logging
  3. Build api.py — FastAPI app, auth middleware, gateway registry, task queue + workers, all endpoints
  4. Test with comwave gateway — submit, poll, verify result
  5. Test admin endpoints — create key, list, revoke
  6. Test security — expired key, over-limit, wrong gateway, IP blocking
  7. Test concurrency — multiple clients hitting same gateway

Phase 2: Bot + Polish

  1. Connect Telegram bot via API (Option A)
  2. Add more gateways as clients request them

Phase 3: Future (v2)

  1. Webhook/callback support for async results
  2. Batch endpoint (POST /api/check/batch/{gateway})
  3. IP whitelisting per API key
  4. Key rotation
  5. Read-only key scoping (usage-only vs full-access)
  6. Client dashboard (simple HTML)
  7. Gateway health monitoring (auto-disable on failure)
  8. Cloudflare Tunnel for easy HTTPS

Existing Gateways Reference

Comwave $0 auth (comwave)

  • File: comwave_forbot.pyComwaveChecker.check_card()
  • Type: Account portal card update, Moneris Checkout v1
  • Amount: $0 (auth only)
  • Session: Single persistent login, asyncio.Lock for ticket serialization
  • Concurrency: max_concurrent=1 (shared session)
  • Lifecycle: init = ensure_logged_in(), shutdown = shutdown()
  • Note: Server enforces one login at a time

Comwave $3.33 charge (comwave3)

  • File: comwave_forbot.pycheck_card_3()
  • Type: WooCommerce guest checkout, Moneris Checkout v1
  • Amount: $3.33 CAD ($2.95 + 12% tax)
  • Session: Stateless, each call gets own session
  • Concurrency: max_concurrent=5 (fully parallel-safe)
  • Lifecycle: no init/shutdown needed

Last updated: 2026-03-24