""" PIN 발급·검증·만료. 본인 계좌(일반·ISA): 숫자 4자리. 가희 계좌(가희_일반·가희_ISA): 영숫자 8자리, 혼동 글자(0/O/o/1/l/I) 제외 55자. 120초 만료, 1회용, 1회 시도. 동시 활성 카드는 1개로 제한. 상태는 파일(state/active_card.json)에 저장 — CLI 가 매번 새 프로세스로 호출돼도 같은 활성 카드를 본다. 동시성은 fcntl flock 으로 직렬화. """ from __future__ import annotations import fcntl import json import os import secrets import time from dataclasses import asdict, dataclass, field from pathlib import Path from typing import Optional LIMITS_FILE = Path(__file__).resolve().parent / 'limits.json' WORKSPACE_ROOT = Path(__file__).resolve().parent.parent.parent DEFAULT_STATE_FILE = WORKSPACE_ROOT / 'state' / 'active_card.json' def _limits() -> dict: return json.loads(LIMITS_FILE.read_text(encoding='utf-8')) def _account_groups() -> tuple[set[str], set[str]]: d = _limits() return set(d['owner_accounts']), set(d['spouse_accounts']) def issue_pin(account_label: str) -> str: cfg = _limits()['pin'] owners, spouses = _account_groups() if account_label in spouses: chars = cfg['alnum_no_confusing_chars'] length = cfg['spouse_length'] elif account_label in owners: chars = '0123456789' length = cfg['owner_length'] else: raise ValueError(f'unknown account label: {account_label}') return ''.join(secrets.choice(chars) for _ in range(length)) def issue_card_id() -> str: return ''.join(secrets.choice('ABCDEFGHJKLMNPQRSTUVWXYZ23456789') for _ in range(4)) @dataclass class PendingCard: card_id: str pin: str account_label: str issued_at: float expiry_seconds: int payload: dict = field(default_factory=dict) attempts: int = 0 consumed: bool = False def is_expired(self) -> bool: return (time.time() - self.issued_at) >= self.expiry_seconds class _FileLock: def __init__(self, path: Path): self.path = path self._fp = None def __enter__(self): self.path.parent.mkdir(parents=True, exist_ok=True) self._fp = open(self.path, 'w') fcntl.flock(self._fp, fcntl.LOCK_EX) return self def __exit__(self, *args): try: fcntl.flock(self._fp, fcntl.LOCK_UN) finally: self._fp.close() class PinStore: """파일 기반 PinStore. 모든 프로세스가 같은 활성 카드를 본다.""" def __init__(self, state_file: Optional[Path] = None): self._state_file = Path(state_file) if state_file else DEFAULT_STATE_FILE self._lock_file = self._state_file.with_suffix(self._state_file.suffix + '.lock') def _read_locked(self) -> Optional[PendingCard]: if not self._state_file.exists(): return None try: data = json.loads(self._state_file.read_text(encoding='utf-8')) except (OSError, ValueError): return None try: return PendingCard(**data) except TypeError: return None def _write_locked(self, card: PendingCard) -> None: self._state_file.parent.mkdir(parents=True, exist_ok=True) tmp = self._state_file.with_suffix(self._state_file.suffix + '.tmp') tmp.write_text(json.dumps(asdict(card), ensure_ascii=False), encoding='utf-8') try: os.chmod(tmp, 0o600) except OSError: pass os.replace(tmp, self._state_file) def _clear_locked(self) -> None: if self._state_file.exists(): try: self._state_file.unlink() except OSError: pass def issue(self, account_label: str, payload: Optional[dict] = None) -> PendingCard: cfg = _limits()['pin'] with _FileLock(self._lock_file): existing = self._read_locked() if existing is not None and not existing.is_expired() and not existing.consumed: raise RuntimeError('이전 카드가 아직 활성. 처리 후 다시 시도.') card = PendingCard( card_id=issue_card_id(), pin=issue_pin(account_label), account_label=account_label, issued_at=time.time(), expiry_seconds=cfg['expiry_seconds'], payload=payload or {}, ) self._write_locked(card) return card def verify(self, pin_input: str) -> tuple[bool, str, Optional[PendingCard]]: cfg = _limits()['pin'] with _FileLock(self._lock_file): card = self._read_locked() if card is None: return False, '활성 카드 없음', None if card.is_expired(): self._clear_locked() return False, '만료', card if card.consumed: self._clear_locked() return False, '이미 사용됨', card card.attempts += 1 if card.pin != (pin_input or '').strip(): if card.attempts >= cfg['max_attempts']: self._clear_locked() return False, 'PIN 불일치, 카드 무효', card self._write_locked(card) return False, 'PIN 불일치', card card.consumed = True self._clear_locked() return True, 'OK', card def cancel(self) -> Optional[PendingCard]: with _FileLock(self._lock_file): card = self._read_locked() if card is None: return None self._clear_locked() return card def amend(self, payload: dict, account_label: Optional[str] = None) -> Optional[PendingCard]: """활성 카드의 페이로드 갱신 + PIN 재발급 + 만료 리셋. card_id 는 유지. account_label 명시 시 새 계좌 기준으로 PIN 재발급(본인 4자리 / 가희 8자리). 활성 카드가 없거나 만료/소비 상태면 None 반환 (호출자가 NO_ACTIVE_CARD 거부). """ cfg = _limits()['pin'] with _FileLock(self._lock_file): existing = self._read_locked() if existing is None or existing.is_expired() or existing.consumed: if existing is not None and (existing.is_expired() or existing.consumed): self._clear_locked() return None new_account = account_label or existing.account_label new_card = PendingCard( card_id=existing.card_id, pin=issue_pin(new_account), account_label=new_account, issued_at=time.time(), expiry_seconds=cfg['expiry_seconds'], payload=payload, attempts=0, ) self._write_locked(new_card) return new_card def peek(self) -> Optional[PendingCard]: with _FileLock(self._lock_file): return self._read_locked() def sweep_expired(self) -> Optional[PendingCard]: """만료된 카드를 정리하고 정리된 카드를 반환 (만료 알림 송신용).""" with _FileLock(self._lock_file): card = self._read_locked() if card is None: return None if card.is_expired() and not card.consumed: self._clear_locked() return card return None