Files
openclaw/agents/stock/workspace/scripts/orders/pin.py
T
hyowons 549545bde6 Initial commit: OpenClaw 워크스페이스 버전관리 시작
설정·스크립트·스킬·문서·큐레이션 메모리 추적.
시크릿(credentials/identity)·런타임 상태(state/logs/sessions/sqlite)·
백업(clobbered/bak)·dream 캐시는 .gitignore로 제외.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-04 15:10:57 +09:00

214 lines
7.3 KiB
Python

"""
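PIN issuance, verification, and expiry.
Owner accounts (일반, ISA): 4 numeric digits.
Spouse accounts (가희_일반, 가희_ISA): 8 alphanumeric characters from a 55-character
alphabet that excludes the easily confused glyphs (0/O/o/1/l/I).
NOTE(review): 62 alphanumerics minus the six glyphs listed leaves 56, not 55 —
confirm the count against `alnum_no_confusing_chars` in limits.json.
PINs expire after 120 seconds, are single-use, and allow a single attempt.
At most one card may be active at any time.
State is persisted to a file (state/active_card.json) so that every CLI invocation,
each of which runs as a fresh process, sees the same active card.
Concurrent access is serialized with an fcntl flock.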
"""
from __future__ import annotations
import fcntl
import json
import os
import secrets
import time
from dataclasses import asdict, dataclass, field
from pathlib import Path
from typing import Optional
# Directory that contains this script; limits.json lives right next to it.
_SCRIPT_DIR = Path(__file__).resolve().parent
# JSON configuration read by the PIN helpers in this module.
LIMITS_FILE = _SCRIPT_DIR / 'limits.json'
# Two directory levels above the script directory.
WORKSPACE_ROOT = _SCRIPT_DIR.parent.parent
# Default location of the persisted active card.
DEFAULT_STATE_FILE = WORKSPACE_ROOT.joinpath('state', 'active_card.json')
def _limits() -> dict:
    """Parse ``LIMITS_FILE`` as UTF-8 JSON and return the result.

    The file is read again on every call; nothing is cached.
    """
    with LIMITS_FILE.open(encoding='utf-8') as fp:
        return json.load(fp)
def _account_groups() -> tuple[set[str], set[str]]:
    """Return ``(owner_accounts, spouse_accounts)`` from the limits file as two sets."""
    limits = _limits()
    owner_labels = set(limits['owner_accounts'])
    spouse_labels = set(limits['spouse_accounts'])
    return owner_labels, spouse_labels
def issue_pin(account_label: str) -> str:
    """Generate a random PIN in the format configured for ``account_label``.

    Spouse accounts draw ``spouse_length`` characters from the configured
    ``alnum_no_confusing_chars`` alphabet; owner accounts draw ``owner_length``
    decimal digits. Characters are chosen with ``secrets.choice``.

    Raises:
        ValueError: if ``account_label`` belongs to neither account group.
    """
    pin_cfg = _limits()['pin']
    owner_labels, spouse_labels = _account_groups()

    is_spouse = account_label in spouse_labels
    if not is_spouse and account_label not in owner_labels:
        raise ValueError(f'unknown account label: {account_label}')

    # Spouse membership wins if a label were ever listed in both groups.
    if is_spouse:
        alphabet = pin_cfg['alnum_no_confusing_chars']
        size = pin_cfg['spouse_length']
    else:
        alphabet = '0123456789'
        size = pin_cfg['owner_length']

    picked = [secrets.choice(alphabet) for _ in range(size)]
    return ''.join(picked)
def issue_card_id() -> str:
    """Return a random 4-character card id.

    Characters are uppercase letters and digits with I, O, 0 and 1 left out,
    each picked with ``secrets.choice``.
    """
    alphabet = 'ABCDEFGHJKLMNPQRSTUVWXYZ23456789'
    picked = [secrets.choice(alphabet) for _ in range(4)]
    return ''.join(picked)
@dataclass
class PendingCard:
    """A card that has been issued and is waiting for its PIN to be entered."""

    card_id: str
    pin: str
    account_label: str
    # Issue timestamp; is_expired() compares it against time.time().
    issued_at: float
    # Lifetime of the card in seconds, counted from issued_at.
    expiry_seconds: int
    # Each instance gets its own dict (default_factory), never a shared one.
    payload: dict = field(default_factory=dict)
    attempts: int = 0
    consumed: bool = False

    def is_expired(self) -> bool:
        """Return True once at least ``expiry_seconds`` have passed since ``issued_at``."""
        elapsed = time.time() - self.issued_at
        return elapsed >= self.expiry_seconds
class _FileLock:
    """Context manager holding an exclusive ``fcntl.flock`` on ``path``.

    Entering creates the parent directory if needed, opens ``path`` for
    writing (creating or truncating it) and blocks until the exclusive lock is
    granted. Exiting unlocks and closes the file. Exceptions raised inside the
    ``with`` body are never suppressed.
    """

    def __init__(self, path: Path):
        self.path = path
        # Open handle while the lock is held; None otherwise.
        self._fp = None

    def __enter__(self):
        self.path.parent.mkdir(parents=True, exist_ok=True)
        fp = open(self.path, 'w')
        try:
            fcntl.flock(fp, fcntl.LOCK_EX)
        except BaseException:
            # Do not leak the descriptor when the lock cannot be acquired
            # (including interruption by a signal handler that raises).
            fp.close()
            raise
        self._fp = fp
        return self

    def __exit__(self, *args):
        # Detach the handle first so the object never keeps a closed file
        # around and a second __exit__ is a harmless no-op.
        fp, self._fp = self._fp, None
        if fp is None:
            return
        try:
            fcntl.flock(fp, fcntl.LOCK_UN)
        finally:
            fp.close()
class PinStore:
    """File-backed store for the single active card.

    The card is persisted as JSON in ``state_file`` so that separate processes
    observe the same card. Every public method takes an exclusive ``_FileLock``
    on a sibling ``<state_file>.lock`` file around its read/modify/write
    sequence. The ``*_locked`` helpers assume the caller already holds it.
    """

    def __init__(self, state_file: Optional[Path] = None):
        """Use ``state_file`` if given (truthy), else ``DEFAULT_STATE_FILE``."""
        self._state_file = Path(state_file) if state_file else DEFAULT_STATE_FILE
        self._lock_file = self._state_file.with_suffix(self._state_file.suffix + '.lock')

    def _read_locked(self) -> Optional['PendingCard']:
        """Load the stored card; return None if it is missing or unusable.

        A missing/unreadable file, invalid JSON, or JSON whose shape does not
        match the ``PendingCard`` fields all yield None.
        """
        try:
            raw = self._state_file.read_text(encoding='utf-8')
            data = json.loads(raw)
        except (OSError, ValueError):
            # OSError covers a missing file; ValueError covers bad JSON/encoding.
            return None
        try:
            return PendingCard(**data)
        except TypeError:
            # Non-mapping JSON, or missing/unexpected field names.
            return None

    def _write_locked(self, card: 'PendingCard') -> None:
        """Persist ``card`` by writing a temp file and renaming it into place.

        The temp file holds the PIN, so it is created with mode 0o600 instead
        of being created with umask-default permissions and tightened later.
        """
        self._state_file.parent.mkdir(parents=True, exist_ok=True)
        tmp = self._state_file.with_suffix(self._state_file.suffix + '.tmp')
        # Serialize first so a serialization error never truncates anything.
        body = json.dumps(asdict(card), ensure_ascii=False)
        fd = os.open(tmp, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
        with os.fdopen(fd, 'w', encoding='utf-8') as fp:
            try:
                # The creation mode is ignored when a stale temp file already
                # exists, so tighten it explicitly before writing (best effort).
                os.fchmod(fd, 0o600)
            except OSError:
                pass
            fp.write(body)
        os.replace(tmp, self._state_file)

    def _clear_locked(self) -> None:
        """Delete the state file; a missing file or unlink failure is ignored."""
        try:
            self._state_file.unlink()
        except OSError:
            pass

    def issue(self, account_label: str, payload: Optional[dict] = None) -> 'PendingCard':
        """Create, persist and return a new card for ``account_label``.

        Raises:
            RuntimeError: if a stored card exists that is neither expired nor
                consumed.
            ValueError: propagated from ``issue_pin`` for an unknown label.
        """
        cfg = _limits()['pin']
        with _FileLock(self._lock_file):
            existing = self._read_locked()
            if existing is not None and not existing.is_expired() and not existing.consumed:
                raise RuntimeError('이전 카드가 아직 활성. 처리 후 다시 시도.')
            card = PendingCard(
                card_id=issue_card_id(),
                pin=issue_pin(account_label),
                account_label=account_label,
                issued_at=time.time(),
                expiry_seconds=cfg['expiry_seconds'],
                payload=payload or {},
            )
            self._write_locked(card)
            return card

    def verify(self, pin_input: str) -> tuple[bool, str, Optional['PendingCard']]:
        """Check ``pin_input`` against the stored card.

        Returns ``(ok, message, card)``. ``card`` is None only when nothing is
        stored. An expired or already-consumed card is removed and rejected.
        Each call on a live card counts as one attempt: a mismatch removes the
        card once ``max_attempts`` is reached, otherwise the incremented
        counter is persisted. A match marks the card consumed and removes it.
        """
        cfg = _limits()['pin']
        with _FileLock(self._lock_file):
            card = self._read_locked()
            if card is None:
                return False, '활성 카드 없음', None
            if card.is_expired():
                self._clear_locked()
                return False, '만료', card
            if card.consumed:
                self._clear_locked()
                return False, '이미 사용됨', card

            card.attempts += 1
            supplied = (pin_input or '').strip()
            # Constant-time comparison of the secret. Bytes are used because
            # compare_digest rejects non-ASCII str; 'surrogatepass' keeps the
            # encoding total (and injective) for any str input. A non-str
            # stored pin can never equal a str, exactly as with `!=`.
            matches = isinstance(card.pin, str) and secrets.compare_digest(
                card.pin.encode('utf-8', 'surrogatepass'),
                supplied.encode('utf-8', 'surrogatepass'),
            )
            if not matches:
                if card.attempts >= cfg['max_attempts']:
                    self._clear_locked()
                    return False, 'PIN 불일치, 카드 무효', card
                self._write_locked(card)
                return False, 'PIN 불일치', card

            card.consumed = True
            self._clear_locked()
            return True, 'OK', card

    def cancel(self) -> Optional['PendingCard']:
        """Remove the stored card and return it, or None if nothing is stored."""
        with _FileLock(self._lock_file):
            card = self._read_locked()
            if card is None:
                return None
            self._clear_locked()
            return card

    def amend(self, payload: dict, account_label: Optional[str] = None) -> Optional['PendingCard']:
        """Replace the active card's payload, reissue its PIN and reset expiry.

        ``card_id`` is kept and ``attempts`` goes back to 0. When
        ``account_label`` is given, the new PIN is issued for that account;
        otherwise the card's existing account is reused.

        Returns None when there is no stored card, or when the stored card is
        expired or consumed (in which case it is also removed), so the caller
        can reject the request.
        """
        cfg = _limits()['pin']
        with _FileLock(self._lock_file):
            existing = self._read_locked()
            if existing is None:
                return None
            if existing.is_expired() or existing.consumed:
                self._clear_locked()
                return None
            new_account = account_label or existing.account_label
            new_card = PendingCard(
                card_id=existing.card_id,
                pin=issue_pin(new_account),
                account_label=new_account,
                issued_at=time.time(),
                expiry_seconds=cfg['expiry_seconds'],
                payload=payload,
                attempts=0,
            )
            self._write_locked(new_card)
            return new_card

    def peek(self) -> Optional['PendingCard']:
        """Return the stored card without modifying it, or None."""
        with _FileLock(self._lock_file):
            return self._read_locked()

    def sweep_expired(self) -> Optional['PendingCard']:
        """Remove an expired, unconsumed card and return it; otherwise None.

        The returned card lets the caller send an expiry notification.
        """
        with _FileLock(self._lock_file):
            card = self._read_locked()
            if card is None:
                return None
            if card.is_expired() and not card.consumed:
                self._clear_locked()
                return card
            return None