""" Append-only 주문 ledger. state/order_log.jsonl 에 매매 모든 라이프사이클(card_issued, pin_issued, approved, rejected, expired, canceled, submitted, filled, partial, failed, dryrun)을 KST 타임스탬프와 함께 한 줄씩 기록한다. 토큰·시크릿 평문 기록을 차단한다. """ from __future__ import annotations import hashlib import json import os from datetime import datetime, timezone, timedelta from pathlib import Path from typing import Optional WORKSPACE_ROOT = Path(__file__).resolve().parent.parent.parent LIMITS_FILE = Path(__file__).resolve().parent / 'limits.json' KST = timezone(timedelta(hours=9)) def _load_limits() -> dict: return json.loads(LIMITS_FILE.read_text(encoding='utf-8')) def _ledger_path() -> Path: return WORKSPACE_ROOT / _load_limits()['ledger']['log_file'] def _now_iso() -> str: return datetime.now(KST).isoformat() def _mask(value: str, keep: int = 4) -> str: if not value: return '***' s = str(value) if len(s) <= keep: return '***' return s[:keep] + '*' * (len(s) - keep) def _scrub_secrets(payload: dict) -> dict: out = {} for k, v in payload.items(): kl = k.lower() if any(s in kl for s in ('token', 'secret', 'pin', 'password', 'appkey', 'appsecret')): out[k] = _mask(v, 2) if v is not None else None elif isinstance(v, dict): out[k] = _scrub_secrets(v) elif isinstance(v, list): out[k] = [_scrub_secrets(x) if isinstance(x, dict) else x for x in v] else: out[k] = v return out def append(event: str, payload: dict) -> None: limits = _load_limits() allowed = set(limits['ledger']['events']) if event not in allowed: raise ValueError(f'unknown ledger event: {event}') safe_payload = _scrub_secrets(payload) if limits['ledger']['mask_token_in_logs'] else payload record = {'ts': _now_iso(), 'event': event, 'payload': safe_payload} path = _ledger_path() path.parent.mkdir(parents=True, exist_ok=True) with path.open('a', encoding='utf-8') as f: f.write(json.dumps(record, ensure_ascii=False) + '\n') try: os.chmod(path, 0o600) except OSError: pass def idempotency_hash(account: str, symbol: str, side: str, qty: int, price: int | str) -> str: """동일 (계좌·종목·방향·수량·가격) 60초 윈도우 중복 주문 차단용 해시.""" minute = int(datetime.now(KST).timestamp() // 60) key = f'{account}|{symbol}|{side}|{qty}|{price}|{minute}' return hashlib.sha256(key.encode('utf-8')).hexdigest()[:16] def find_recent_idempotency(h: str, within_seconds: int = 90) -> Optional[dict]: path = _ledger_path() if not path.exists(): return None cutoff = datetime.now(KST) - timedelta(seconds=within_seconds) with path.open('r', encoding='utf-8') as f: lines = f.readlines() for line in reversed(lines): line = line.strip() if not line: continue try: rec = json.loads(line) ts = datetime.fromisoformat(rec['ts']) except (ValueError, KeyError): continue if ts < cutoff: return None if rec.get('payload', {}).get('idem_hash') == h: return rec return None def last_event_for_symbol(account: str, symbol: str, events: tuple[str, ...] = ('submitted', 'filled', 'partial')) -> Optional[dict]: """동일 종목 딜레이 계산을 위해 가장 최근 '실주문 접수 이후' 매칭 이벤트를 반환. rejected/canceled/expired/failed 는 키움 접수 전 실패이므로 쿨다운 대상 아님. """ path = _ledger_path() if not path.exists(): return None with path.open('r', encoding='utf-8') as f: lines = f.readlines() for line in reversed(lines): line = line.strip() if not line: continue try: rec = json.loads(line) except ValueError: continue if rec.get('event') not in events: continue p = rec.get('payload', {}) if p.get('account') == account and p.get('symbol') == symbol: return rec return None def last_terminal_event(events: tuple[str, ...] = ('submitted', 'filled', 'partial')) -> Optional[dict]: """전체 거래 간 60초 딜레이 계산을 위해 가장 최근 '실주문 접수 이후' 이벤트를 반환. rejected/canceled/expired/failed 는 키움 접수 전 실패이므로 쿨다운 대상 아님. """ path = _ledger_path() if not path.exists(): return None with path.open('r', encoding='utf-8') as f: lines = f.readlines() for line in reversed(lines): line = line.strip() if not line: continue try: rec = json.loads(line) except ValueError: continue if rec.get('event') in events: return rec return None