fed3526b20
설정·스크립트·스킬·문서·큐레이션 메모리 추적. 시크릿(credentials/identity)·런타임 상태(state/logs/sessions/sqlite)· 백업(clobbered/bak)·dream 캐시는 .gitignore로 제외. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
214 lines
7.3 KiB
Python
214 lines
7.3 KiB
Python
"""
|
|
PIN 발급·검증·만료.
|
|
|
|
본인 계좌(일반·ISA): 숫자 4자리.
|
|
가희 계좌(가희_일반·가희_ISA): 영숫자 8자리, 혼동 글자(0/O/o/1/l/I) 제외 55자.
|
|
120초 만료, 1회용, 1회 시도.
|
|
|
|
동시 활성 카드는 1개로 제한.
|
|
|
|
상태는 파일(state/active_card.json)에 저장 — CLI 가 매번 새 프로세스로 호출돼도
|
|
같은 활성 카드를 본다. 동시성은 fcntl flock 으로 직렬화.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import fcntl
|
|
import json
|
|
import os
|
|
import secrets
|
|
import time
|
|
from dataclasses import asdict, dataclass, field
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
# Directory that contains this module; every path below is derived from it.
_MODULE_DIR = Path(__file__).resolve().parent

# JSON configuration file that sits next to this module.
LIMITS_FILE = _MODULE_DIR / 'limits.json'
# Two directory levels above the module's own directory.
WORKSPACE_ROOT = _MODULE_DIR.parent.parent
# Default location of the persisted active-card state.
DEFAULT_STATE_FILE = WORKSPACE_ROOT / 'state' / 'active_card.json'
|
|
|
|
|
|
def _limits() -> dict:
    """Load and return the parsed contents of ``LIMITS_FILE``.

    The file is re-read on every call, so edits to it take effect without
    restarting anything.
    """
    raw_text = LIMITS_FILE.read_text(encoding='utf-8')
    parsed = json.loads(raw_text)
    return parsed
|
|
|
|
|
|
def _account_groups() -> tuple[set[str], set[str]]:
    """Return ``(owner_labels, spouse_labels)`` as sets, read from the limits file."""
    limits = _limits()
    owner_labels = set(limits['owner_accounts'])
    spouse_labels = set(limits['spouse_accounts'])
    return owner_labels, spouse_labels
|
|
|
|
|
|
def issue_pin(account_label: str) -> str:
    """Generate a fresh random PIN for the given account label.

    Labels listed under ``spouse_accounts`` get a PIN drawn from the configured
    ``pin.alnum_no_confusing_chars`` alphabet with ``pin.spouse_length``
    characters. Labels listed under ``owner_accounts`` get a decimal PIN with
    ``pin.owner_length`` digits. The spouse group is checked first.

    Raises:
        ValueError: if ``account_label`` belongs to neither group.
    """
    # Read the limits file exactly once so the PIN settings and the account
    # groups come from the same snapshot (and the file is not parsed twice).
    limits = _limits()
    pin_cfg = limits['pin']
    owners = set(limits['owner_accounts'])
    spouses = set(limits['spouse_accounts'])

    if account_label in spouses:
        alphabet = pin_cfg['alnum_no_confusing_chars']
        length = pin_cfg['spouse_length']
    elif account_label in owners:
        alphabet = '0123456789'
        length = pin_cfg['owner_length']
    else:
        raise ValueError(f'unknown account label: {account_label}')

    # ``secrets`` (not ``random``) because the PIN is a security credential.
    return ''.join(secrets.choice(alphabet) for _ in range(length))
|
|
|
|
|
|
def issue_card_id() -> str:
    """Return a random 4-character card identifier.

    Characters are drawn with ``secrets.choice`` from uppercase letters and
    digits, leaving out ``I``, ``O``, ``0`` and ``1``.
    """
    alphabet = 'ABCDEFGHJKLMNPQRSTUVWXYZ23456789'
    picked = [secrets.choice(alphabet) for _ in range(4)]
    return ''.join(picked)
|
|
|
|
|
|
@dataclass
class PendingCard:
    """A confirmation card that is waiting for its PIN to be entered."""

    # Short identifier of the card.
    card_id: str
    # PIN that must be supplied to confirm the card.
    pin: str
    # Account label the PIN was issued for.
    account_label: str
    # ``time.time()`` timestamp taken when the card was issued or amended.
    issued_at: float
    # Lifetime of the card in seconds, counted from ``issued_at``.
    expiry_seconds: int
    # Arbitrary caller data carried along with the card.
    payload: dict = field(default_factory=dict)
    # Number of verification attempts made so far.
    attempts: int = 0
    # Set once the card has been verified successfully.
    consumed: bool = False

    def is_expired(self) -> bool:
        """Return True once at least ``expiry_seconds`` have passed since issue."""
        age = time.time() - self.issued_at
        return age >= self.expiry_seconds
|
|
|
|
|
|
class _FileLock:
    """Context manager holding an exclusive ``fcntl.flock`` on a lock file.

    Entering creates the lock file's parent directory if needed, opens the
    lock file and blocks until the exclusive lock is granted. Leaving releases
    the lock and closes the file. Exceptions from the ``with`` body are not
    suppressed.
    """

    def __init__(self, path: Path):
        self.path = path
        self._fp = None

    def __enter__(self):
        self.path.parent.mkdir(parents=True, exist_ok=True)
        fp = open(self.path, 'w')
        try:
            fcntl.flock(fp, fcntl.LOCK_EX)
        except BaseException:
            # Do not leak the descriptor when the lock cannot be taken
            # (e.g. the wait is interrupted).
            fp.close()
            raise
        self._fp = fp
        return self

    def __exit__(self, *args):
        # Detach the handle first so a repeated or unmatched __exit__ is a no-op
        # instead of failing on a None / already-closed file.
        fp, self._fp = self._fp, None
        if fp is None:
            return
        try:
            fcntl.flock(fp, fcntl.LOCK_UN)
        finally:
            fp.close()
|
|
|
|
|
|
class PinStore:
    """File-backed PIN store. Every process sees the same active card.

    At most one card is stored, as JSON, in ``state_file``. Every public
    method takes an exclusive ``_FileLock`` on ``<state_file>.lock`` for the
    duration of its read-modify-write, so separate processes are serialized.
    Helpers whose names end in ``_locked`` expect that lock to be held already.
    """

    def __init__(self, state_file: Optional[Path] = None):
        """Use ``state_file`` if given, otherwise ``DEFAULT_STATE_FILE``."""
        self._state_file = Path(state_file) if state_file else DEFAULT_STATE_FILE
        self._lock_file = self._state_file.with_suffix(self._state_file.suffix + '.lock')

    @staticmethod
    def _pin_matches(expected, candidate: str) -> bool:
        """Compare PINs without leaking the mismatch position through timing."""
        if not isinstance(expected, str):
            # A tampered/corrupt state file must never match anything.
            return False
        # compare_digest only accepts non-ASCII text as bytes; 'surrogatepass'
        # keeps the encoding lossless so equality is exactly string equality.
        return secrets.compare_digest(
            expected.encode('utf-8', 'surrogatepass'),
            candidate.encode('utf-8', 'surrogatepass'),
        )

    def _read_locked(self) -> Optional[PendingCard]:
        """Return the stored card, or None if it is missing or unreadable."""
        # Read directly instead of checking exists() first: a missing file
        # surfaces as OSError and is handled the same way as a corrupt one.
        try:
            data = json.loads(self._state_file.read_text(encoding='utf-8'))
        except (OSError, ValueError):
            return None
        if not isinstance(data, dict):
            return None
        try:
            return PendingCard(**data)
        except TypeError:
            # Unknown or missing fields: treat as no card.
            return None

    def _write_locked(self, card: PendingCard) -> None:
        """Atomically replace the state file with ``card`` serialized as JSON."""
        self._state_file.parent.mkdir(parents=True, exist_ok=True)
        tmp = self._state_file.with_suffix(self._state_file.suffix + '.tmp')
        body = json.dumps(asdict(card), ensure_ascii=False)
        # The file holds the PIN, so create it owner-only from the start
        # rather than writing it with umask-default permissions and
        # tightening afterwards.
        fd = os.open(tmp, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
        with os.fdopen(fd, 'w', encoding='utf-8') as fh:
            try:
                # The creation mode is ignored when a stale tmp file already
                # exists, so tighten explicitly before any secret is written.
                os.fchmod(fd, 0o600)
            except OSError:
                pass
            fh.write(body)
        os.replace(tmp, self._state_file)

    def _clear_locked(self) -> None:
        """Delete the state file; best-effort, a failure is ignored."""
        try:
            self._state_file.unlink()
        except OSError:
            pass

    def issue(self, account_label: str, payload: Optional[dict] = None) -> PendingCard:
        """Create, persist and return a new card for ``account_label``.

        Raises:
            RuntimeError: if a stored card is still active (neither expired
                nor consumed).
            ValueError: propagated from ``issue_pin`` for an unknown label.
        """
        cfg = _limits()['pin']
        with _FileLock(self._lock_file):
            existing = self._read_locked()
            if existing is not None and not existing.is_expired() and not existing.consumed:
                raise RuntimeError('이전 카드가 아직 활성. 처리 후 다시 시도.')
            card = PendingCard(
                card_id=issue_card_id(),
                pin=issue_pin(account_label),
                account_label=account_label,
                issued_at=time.time(),
                expiry_seconds=cfg['expiry_seconds'],
                payload=payload or {},
            )
            self._write_locked(card)
            return card

    def verify(self, pin_input: str) -> tuple[bool, str, Optional[PendingCard]]:
        """Check ``pin_input`` against the active card.

        Returns ``(ok, message, card)``. ``card`` is None only when no card is
        stored. Expired or already-consumed cards are removed. A wrong PIN
        increments ``attempts``; once ``attempts`` reaches the configured
        ``pin.max_attempts`` the card is removed, otherwise the updated
        counter is persisted. A correct PIN marks the card consumed and
        removes it from the store.
        """
        cfg = _limits()['pin']
        candidate = (pin_input or '').strip()
        with _FileLock(self._lock_file):
            card = self._read_locked()
            if card is None:
                return False, '활성 카드 없음', None
            if card.is_expired():
                self._clear_locked()
                return False, '만료', card
            if card.consumed:
                self._clear_locked()
                return False, '이미 사용됨', card
            card.attempts += 1
            if not self._pin_matches(card.pin, candidate):
                if card.attempts >= cfg['max_attempts']:
                    self._clear_locked()
                    return False, 'PIN 불일치, 카드 무효', card
                self._write_locked(card)
                return False, 'PIN 불일치', card
            card.consumed = True
            self._clear_locked()
            return True, 'OK', card

    def cancel(self) -> Optional[PendingCard]:
        """Remove the stored card and return it, or return None if none exists."""
        with _FileLock(self._lock_file):
            card = self._read_locked()
            if card is None:
                return None
            self._clear_locked()
            return card

    def amend(self, payload: dict, account_label: Optional[str] = None) -> Optional[PendingCard]:
        """Replace the active card's payload, re-issue its PIN and reset expiry.

        ``card_id`` is kept and ``attempts`` restarts at 0. When
        ``account_label`` is given, the new PIN is issued for that account;
        otherwise the existing label is reused. Returns None when there is no
        active card; an expired or consumed card is removed first (the caller
        is expected to reject the request).
        """
        cfg = _limits()['pin']
        with _FileLock(self._lock_file):
            existing = self._read_locked()
            if existing is None:
                return None
            if existing.is_expired() or existing.consumed:
                self._clear_locked()
                return None
            new_account = account_label or existing.account_label
            new_card = PendingCard(
                card_id=existing.card_id,
                pin=issue_pin(new_account),
                account_label=new_account,
                issued_at=time.time(),
                expiry_seconds=cfg['expiry_seconds'],
                # Normalise a missing payload to {} the same way issue() does.
                payload=payload if payload is not None else {},
                attempts=0,
            )
            self._write_locked(new_card)
            return new_card

    def peek(self) -> Optional[PendingCard]:
        """Return the stored card without modifying it (None if absent)."""
        with _FileLock(self._lock_file):
            return self._read_locked()

    def sweep_expired(self) -> Optional[PendingCard]:
        """Remove an expired, unconsumed card and return it.

        Returns None when nothing was removed. The returned card lets the
        caller send an expiry notification.
        """
        with _FileLock(self._lock_file):
            card = self._read_locked()
            if card is None:
                return None
            if card.is_expired() and not card.consumed:
                self._clear_locked()
                return card
            return None
|