Files
openclaw/agents/stock/workspace/scripts/orders/guards.py
T
hyowons fed3526b20 Initial commit: OpenClaw 워크스페이스 버전관리 시작
설정·스크립트·스킬·문서·큐레이션 메모리 추적.
시크릿(credentials/identity)·런타임 상태(state/logs/sessions/sqlite)·
백업(clobbered/bak)·dream 캐시는 .gitignore로 제외.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-04 15:39:41 +09:00

550 lines
21 KiB
Python

"""
주문 검증 가드 (순수 함수).
키움 API 호출은 datasource 가 담당. guards 는 (request, market_data) 두 dict 만 받아
결정한다. 단위테스트가 모든 분기를 mock 데이터로 검증한다.
검증 순서 (validate_request):
1. 계좌 화이트리스트
2. side 유효성 (BUY/SELL)
3. 거래시간 + 휴장일 + NXT 매트릭스
4. 거래정지 / VI
5. 전체 거래 60초 딜레이 (ledger 조회)
6. 동일 종목 3분 딜레이 (ledger 조회)
7. 동일 종목 3분 딜레이 (키움 진실 소스 — kt00007). NETWORK 사각·키움앱 직접 매매까지 포함
8. ±30% 가격 가드 (지정가만)
9. 잔고(매수) / 보유수량(매도) 사전조회
라우팅 결정 (determine_routing) 은 검증 통과 후 호출자가 별도로 부른다.
시장가 슬리피지·평균체결가 추정은 estimate_market_fill 로 카드에 표시.
"""
from __future__ import annotations
import json
from dataclasses import dataclass
from datetime import datetime, time as dtime, timezone, timedelta
from pathlib import Path
from typing import Optional
from . import ledger
LIMITS_FILE = Path(__file__).resolve().parent / 'limits.json'
WORKSPACE_ROOT = Path(__file__).resolve().parent.parent.parent
KST = timezone(timedelta(hours=9))
def _limits() -> dict:
return json.loads(LIMITS_FILE.read_text(encoding='utf-8'))
@dataclass
class Result:
ok: bool
code: str
message: str
@classmethod
def OK(cls, code: str = 'OK', message: str = '') -> 'Result':
return cls(True, code, message)
@classmethod
def REJECT(cls, code: str, message: str) -> 'Result':
return cls(False, code, message)
# ---- 계좌 ----
def validate_account(account: str) -> Result:
if account not in _limits()['accounts_whitelist']:
return Result.REJECT('ACCOUNT_NOT_WHITELISTED', f'허용되지 않은 계좌: {account}')
return Result.OK()
def is_spouse_account(account: str) -> bool:
return account in _limits()['spouse_accounts']
# ---- 거래시간 + NXT 매트릭스 ----
def _parse_hms(s: str) -> dtime:
parts = [int(x) for x in s.split(':')]
while len(parts) < 3:
parts.append(0)
return dtime(parts[0], parts[1], parts[2])
def session_at(now: datetime) -> str:
th = _limits()['trading_hours']
t = now.replace(tzinfo=None).time()
if _parse_hms(th['nxt_pre_start']) <= t < _parse_hms(th['nxt_pre_end']):
return 'NXT_PRE'
if _parse_hms(th['krx_regular_start']) <= t < _parse_hms(th['krx_regular_end']):
return 'KRX_NXT'
if _parse_hms(th['krx_closing_auction_start']) <= t < _parse_hms(th['krx_closing_auction_end']):
return 'KRX_CLOSE'
if _parse_hms(th['nxt_after_start']) <= t < _parse_hms(th['nxt_after_end']):
return 'NXT_AFTER'
return 'CLOSED'
def is_today_holiday(now: datetime) -> bool:
th = _limits()['trading_hours']
rel = th.get('holiday_state_file')
if not rel:
return False
p = WORKSPACE_ROOT / rel
if not p.exists():
return False
try:
data = json.loads(p.read_text(encoding='utf-8'))
except (OSError, ValueError):
return False
today = now.strftime('%Y-%m-%d')
holidays = data.get('holidays') if isinstance(data, dict) else data
if not isinstance(holidays, list):
return False
for h in holidays:
if isinstance(h, str) and h == today:
return True
if isinstance(h, dict) and h.get('date') == today:
return True
return False
def validate_trading_hours(now: datetime, is_holiday: bool, nxt_eligible: bool) -> Result:
if is_holiday:
return Result.REJECT('HOLIDAY', '휴장일에는 매매 불가')
sess = session_at(now)
if sess == 'CLOSED':
return Result.REJECT('OUTSIDE_HOURS', f'거래시간 외 ({now.strftime("%H:%M:%S")})')
if sess in ('NXT_PRE', 'NXT_AFTER') and not nxt_eligible:
return Result.REJECT('NXT_NOT_ELIGIBLE', '지금은 NXT 시간대인데 이 종목은 NXT 거래 불가')
return Result.OK(code=sess)
def determine_routing(now: datetime, nxt_eligible: bool, force: Optional[str]) -> str:
routing = _limits()['routing']
suffix_map = {
'AL': routing['suffix_AL'],
'NX': routing['suffix_NX'],
'KRX': routing['suffix_KRX'],
}
if force:
if force not in routing['force_options']:
raise ValueError(f'unknown routing force: {force}')
return suffix_map[force]
sess = session_at(now)
if sess in ('NXT_PRE', 'NXT_AFTER'):
return routing['suffix_NX']
if sess == 'KRX_CLOSE':
return routing['suffix_KRX']
if sess == 'KRX_NXT':
return routing['suffix_AL'] if nxt_eligible else routing['suffix_KRX']
raise ValueError('CLOSED session has no valid routing')
# ---- 가격 가드 (±30% 상한가/하한가) ----
def validate_price_band(side: str, price: int, upper_limit: int, lower_limit: int) -> Result:
if price > upper_limit:
return Result.REJECT('PRICE_ABOVE_UPPER', f'지정가 {price:,}원 > 상한가 {upper_limit:,}')
if price < lower_limit:
return Result.REJECT('PRICE_BELOW_LOWER', f'지정가 {price:,}원 < 하한가 {lower_limit:,}')
return Result.OK()
# ---- 잔고 / 보유 ----
def validate_balance_for_buy(qty: int, price_estimate: int, balance_d2: int,
basis: Optional[str] = None) -> Result:
needed = qty * price_estimate
if balance_d2 < needed:
basis_label = f' ({basis} 기준)' if basis else ''
return Result.REJECT('INSUFFICIENT_BALANCE',
f'예수금 부족: 필요 {needed:,}{basis_label} / 가용 {balance_d2:,}')
return Result.OK()
def validate_position_for_sell(qty: int, position_qty: int) -> Result:
if position_qty < qty:
return Result.REJECT('INSUFFICIENT_POSITION',
f'보유 부족: 매도 {qty}주 / 보유 {position_qty}')
return Result.OK()
# ---- 시장가 세션 제한 (NXT 단독·KRX 단일가에선 시장가 불가) ----
def validate_market_order_session(now: datetime, order_type: str) -> Result:
if order_type != 'MARKET':
return Result.OK()
sess = session_at(now)
if sess in ('NXT_PRE', 'NXT_AFTER'):
return Result.REJECT('MARKET_NOT_ALLOWED_IN_NXT',
'NXT 시간대에는 시장가 불가 — 지정가만 가능. "지금 바로" 또는 가격 명시로 다시 시도.')
if sess == 'KRX_CLOSE':
return Result.REJECT('MARKET_NOT_ALLOWED_IN_AUCTION',
'단일가 동시호가 시간대에는 시장가 불가 — 지정가만 가능.')
return Result.OK()
# ---- 정지 / VI ----
def validate_halt_vi(halt: bool, vi: bool) -> Result:
if halt:
return Result.REJECT('TRADING_HALT', '거래정지 종목')
if vi:
return Result.REJECT('VI', 'VI 발동 종목')
return Result.OK()
# ---- 종목 상태 (ka10099 캐시 기반) ----
# orderWarning 코드 (ka10099 명세):
# 0: 해당없음, 1: ETF투자주의요망, 2: 정리매매, 3: 단기과열, 4: 투자위험, 5: 투자경과
_ORDER_WARNING_REJECT = {'2', '4'} # 정리매매·투자위험 → 사전 거부
_ORDER_WARNING_LABELS = {
'1': 'ETF투자주의요망',
'2': '정리매매',
'3': '단기과열',
'4': '투자위험',
'5': '투자경과',
}
_STATE_REJECT_KEYWORDS = ('거래정지', '정리매매') # state 텍스트 부분 매치 → 거부
_STATE_WARN_KEYWORDS = ('관리종목',) # state 텍스트 부분 매치 → 경고
def evaluate_stock_state(stock_meta: dict | None) -> dict:
"""ka10099 캐시 메타로 종목 상태 평가.
정책 (등급별 차등 — 2026-05-07 결정):
- 거부(STOCK_STATE_BLOCKED): orderWarning ∈ {2 정리매매, 4 투자위험} 또는
state 에 '거래정지'·'정리매매' 키워드 포함
- 경고(warning): orderWarning ∈ {1 ETF주의, 3 단기과열, 5 투자경과} 또는
state 에 '관리종목' 포함
Returns: {'result': Result, 'warning': str | None, 'state': str, 'order_warning': str}
"""
state = (stock_meta or {}).get('state', '') or ''
ow = str((stock_meta or {}).get('order_warning', '0') or '0').strip()
# 거부 우선
if ow in _ORDER_WARNING_REJECT:
label = _ORDER_WARNING_LABELS.get(ow, f'코드 {ow}')
return {
'result': Result.REJECT('STOCK_STATE_BLOCKED', f'{label} 종목 — 매매 차단'),
'warning': None, 'state': state, 'order_warning': ow,
}
for kw in _STATE_REJECT_KEYWORDS:
if kw in state:
return {
'result': Result.REJECT('STOCK_STATE_BLOCKED', f'{kw} 종목 — 매매 차단'),
'warning': None, 'state': state, 'order_warning': ow,
}
# 경고
warning = None
if ow in _ORDER_WARNING_LABELS: # 1, 3, 5 (2/4는 위에서 이미 거부됨)
warning = f'⚠️ 키움 경고: {_ORDER_WARNING_LABELS[ow]} (orderWarning={ow})'
else:
for kw in _STATE_WARN_KEYWORDS:
if kw in state:
warning = f'⚠️ 키움 경고: {kw} (state="{state}")'
break
return {'result': Result.OK(), 'warning': warning, 'state': state, 'order_warning': ow}
# ---- 딜레이 (ledger 조회) ----
def validate_delay_between_orders(now: datetime) -> Result:
# 접수(submitted) 시점만 카운트 — filled/partial은 키움 처리 결과 (사용자 명령 시점 아님)
last = ledger.last_terminal_event(events=('submitted',))
if not last:
return Result.OK()
last_ts = datetime.fromisoformat(last['ts'])
elapsed = (now - last_ts).total_seconds()
cooldown = _limits()['delays']['between_orders_seconds']
if elapsed < cooldown:
return Result.REJECT('COOLDOWN_GLOBAL', f'마지막 거래 후 {int(cooldown - elapsed)}초 남음')
return Result.OK()
def validate_delay_same_symbol(now: datetime, account: str, symbol: str) -> Result:
# 접수(submitted) 시점만 카운트 — filled/partial은 키움 처리 결과라 사용자 명령 시점 아님,
# 한 번 접수 후 N초간 같은 종목 재시도만 차단하면 충분.
last = ledger.last_event_for_symbol(account, symbol, events=('submitted',))
if not last:
return Result.OK()
last_ts = datetime.fromisoformat(last['ts'])
elapsed = (now - last_ts).total_seconds()
cooldown = _limits()['delays']['same_symbol_seconds']
if elapsed < cooldown:
remaining = int(cooldown - elapsed)
m, s = divmod(remaining, 60)
return Result.REJECT('COOLDOWN_SAME_SYMBOL',
f'[{symbol}] 마지막 체결 후 {m}{s}초 남음')
return Result.OK()
def validate_delay_same_symbol_via_broker(now: datetime, symbol: str,
broker_executions: Optional[list],
broker_query_error: Optional[str]) -> Result:
"""키움 진실 소스(kt00007) 기준 동일 종목 딜레이.
NETWORK 사각(우리 ledger 'failed' 인데 실은 키움에 들어감) 과
사용자 키움앱 직접 매매까지 포함한 검증. ledger 가드 통과 후 추가로 호출.
조회 자체 실패 시 보수적 차단 (BROKER_QUERY_FAILED) — 정확도 우선.
"""
if broker_query_error is not None:
return Result.REJECT('BROKER_QUERY_FAILED',
f'키움 체결조회 실패로 안전 차단: {broker_query_error}')
if not broker_executions:
return Result.OK()
cooldown = _limits()['delays']['same_symbol_seconds']
latest_ts: Optional[datetime] = None
latest_src = ''
for ex in broker_executions:
if ex.get('code') != symbol:
continue
# 접수(ord_tm) 시점 카운트 — cntr_qty 무관 (체결 여부와 별개로 접수 자체가 사용자 명령 시점)
ord_tm = (ex.get('ord_tm') or '').strip()
if len(ord_tm) < 5:
continue
try:
t = datetime.strptime(ord_tm, '%H:%M:%S').time()
except ValueError:
continue
ts = datetime.combine(now.date(), t).replace(tzinfo=KST)
if latest_ts is None or ts > latest_ts:
latest_ts = ts
latest_src = ex.get('comm_src', '') or ''
if latest_ts is None:
return Result.OK()
elapsed = (now - latest_ts).total_seconds()
if elapsed < cooldown:
remaining = int(cooldown - elapsed)
m, s = divmod(remaining, 60)
src_hint = f' [출처: {latest_src}]' if latest_src else ''
return Result.REJECT('COOLDOWN_SAME_SYMBOL_BROKER',
f'[{symbol}] 키움 기준 마지막 매매({latest_ts.strftime("%H:%M:%S")}) 후 '
f'{m}{s}초 남음{src_hint}')
return Result.OK()
# ---- 자연어 시장가 분류 + 호가단위 ----
def classify_order_intent(text: str) -> str:
"""레이 파싱 보조. 'MARKET' / 'AGGRESSIVE_LIMIT' / 'LIMIT'.
명시 키워드만 체크. 모호하면 안전한 LIMIT.
"""
cfg = _limits()['market_order']
lower = text.lower() if text else ''
for kw in cfg['natural_language_market']:
if kw.lower() in lower:
return 'MARKET'
for kw in cfg['natural_language_aggressive_limit']:
if kw.lower() in lower:
return 'AGGRESSIVE_LIMIT'
return 'LIMIT'
def tick_size(price: int) -> int:
"""KRX 표준 호가단위 (2023 개편 후, NXT 도 동일 적용)."""
if price < 2000:
return 1
if price < 5000:
return 5
if price < 20000:
return 10
if price < 50000:
return 50
if price < 200000:
return 100
if price < 500000:
return 500
return 1000
def aggressive_limit_price(side: str, orderbook: Optional[dict],
ticks: Optional[int] = None,
fallback_price: Optional[int] = None) -> dict:
"""공격적 지정가 산정.
호가창 우선, 없으면 fallback_price (ka10001 현재가) 로 대체.
Returns: {'ok': bool, 'price': int, 'source': 'orderbook'|'fallback', 'ref_price': int}
또는 {'ok': False, 'code': str, 'message': str}
"""
if ticks is None:
ticks = _limits()['market_order']['aggressive_limit_ticks']
if orderbook:
levels = orderbook.get('asks' if side == 'BUY' else 'bids') or []
if levels:
ref = int(levels[0]['price'])
if ref > 0:
if side == 'BUY':
price = ref + ticks * tick_size(ref)
else:
price = ref - ticks * tick_size(ref)
return {'ok': True, 'price': price, 'source': 'orderbook', 'ref_price': ref}
# fallback — ka10001 현재가
if fallback_price and fallback_price > 0:
ref = int(fallback_price)
if side == 'BUY':
price = ref + ticks * tick_size(ref)
else:
price = ref - ticks * tick_size(ref)
return {'ok': True, 'price': price, 'source': 'fallback', 'ref_price': ref}
return {'ok': False, 'code': 'NO_ORDERBOOK',
'message': '호가창·현재가 모두 조회 실패 — 공격적 지정가 산정 불가'}
BUDGET_BUMP_MAX_REF_PRICE = 300_000
def convert_budget_to_qty(side: str, budget: int, orderbook: Optional[dict],
fallback_price: Optional[int] = None) -> dict:
"""예산(원) → 정수 주식 수량 환산.
BUY: 매도1호가 기준 (시장가 매수 시 실제 체결 가능성 가장 높은 가격).
SELL: 매수1호가 기준 (대칭 — 매도 회수 추정).
호가창 비어있으면 fallback_price (ka10001 현재가) 로 대체.
버림 (floor). 슬리피지 마진 0%.
BUY +1 정책: 1주 가격 ≤ 300,000원 이고 floor 잔액이 0보다 크면 qty+=1.
예산을 살짝 초과해 1주 더 매수. SELL 은 항상 floor (보유수량 초과 매도 방지).
Returns:
ok=True 시 {'ok': True, 'qty': int, 'ref_price': int, 'remainder': int,
'source': 'orderbook'|'fallback', 'bumped': bool}
remainder: 음수면 예산 초과액 (bumped=True 일 때만 음수 가능).
ok=False 시 {'ok': False, 'code': str, 'message': str}
"""
if not isinstance(budget, int) or budget <= 0:
return {'ok': False, 'code': 'BUDGET_INVALID',
'message': f'예산은 양의 정수여야 합니다 (입력: {budget!r})'}
ref_price = 0
source = 'orderbook'
if orderbook:
levels = orderbook.get('asks' if side == 'BUY' else 'bids') or []
if levels:
cand = int(levels[0]['price'])
if cand > 0:
ref_price = cand
if ref_price <= 0:
if fallback_price and fallback_price > 0:
ref_price = int(fallback_price)
source = 'fallback'
else:
return {'ok': False, 'code': 'NO_ORDERBOOK',
'message': '호가창·현재가 모두 조회 실패 — 금액 환산 불가'}
qty = budget // ref_price
remainder = budget - qty * ref_price
bumped = False
if (side == 'BUY' and qty > 0 and remainder > 0
and ref_price <= BUDGET_BUMP_MAX_REF_PRICE):
qty += 1
remainder = budget - qty * ref_price # 음수 — 예산 초과액
bumped = True
if qty <= 0:
ref_label_map = {
('BUY', 'orderbook'): '매도1호가',
('SELL', 'orderbook'): '매수1호가',
('BUY', 'fallback'): '현재가',
('SELL', 'fallback'): '현재가',
}
ref_label = ref_label_map[(side, source)]
return {'ok': False, 'code': 'BUDGET_TOO_SMALL',
'message': f'1주 가격({ref_price:,}원, {ref_label})이 예산({budget:,}원)보다 큽니다'}
return {'ok': True, 'qty': qty, 'ref_price': ref_price,
'remainder': remainder, 'source': source, 'bumped': bumped}
def estimate_market_fill(side: str, qty: int, orderbook: dict, depth: Optional[int] = None) -> dict:
"""시장가 평균체결가·슬리피지 추정. 호가창 부족분은 마지막 호가가 추정."""
if depth is None:
depth = _limits()['market_order']['show_orderbook_depth']
levels = (orderbook['asks'] if side == 'BUY' else orderbook['bids'])[:depth]
if not levels:
return {'avg_fill': 0, 'total_won': 0, 'reference_price': 0, 'slippage_pct': 0.0}
remaining = qty
total_won = 0
last_price = levels[0]['price']
for lvl in levels:
if remaining <= 0:
break
take = min(remaining, lvl['qty'])
total_won += take * lvl['price']
remaining -= take
last_price = lvl['price']
if remaining > 0:
total_won += remaining * last_price
avg = total_won // qty
ref = levels[0]['price']
slip = ((avg - ref) / ref * 100) if ref else 0.0
if side == 'SELL':
slip = -slip
return {
'avg_fill': avg,
'total_won': total_won,
'reference_price': ref,
'slippage_pct': round(slip, 3),
}
# ---- 통합 ----
def validate_request(request: dict, market_data: dict) -> Result:
r = validate_account(request['account'])
if not r.ok:
return r
if request['side'] not in ('BUY', 'SELL'):
return Result.REJECT('INVALID_SIDE', f'잘못된 방향: {request["side"]}')
if request['order_type'] not in ('LIMIT', 'MARKET', 'AGGRESSIVE_LIMIT'):
return Result.REJECT('INVALID_ORDER_TYPE', f'잘못된 주문방식: {request["order_type"]}')
r = validate_trading_hours(market_data['now'], market_data.get('is_holiday', False),
market_data.get('nxt_eligible', False))
if not r.ok:
return r
r = validate_halt_vi(market_data.get('halt', False), market_data.get('vi', False))
if not r.ok:
return r
r = validate_market_order_session(market_data['now'], request['order_type'])
if not r.ok:
return r
r = validate_delay_between_orders(market_data['now'])
if not r.ok:
return r
# same_symbol 가드는 between_orders_seconds 글로벌 가드로 통합됨 — 같은 종목 별도 필터 X.
# 함수 자체는 보존 (limits.json 분리 설정 시 부활 가능).
if request['order_type'] == 'LIMIT':
r = validate_price_band(request['side'], request['price'],
market_data['upper_limit'], market_data['lower_limit'])
if not r.ok:
return r
if request['side'] == 'BUY':
basis = None
if request['order_type'] == 'MARKET':
# 키움 시장가 매수 증거금은 상한가 × qty 기준. 호가창 평균이 아닌 상한가로 사전 차단.
price_est = market_data.get('upper_limit', 0)
basis = '상한가'
elif request['order_type'] == 'AGGRESSIVE_LIMIT':
agg_res = aggressive_limit_price('BUY', market_data.get('orderbook'),
fallback_price=market_data.get('current_price'))
if not agg_res['ok']:
return Result.REJECT(agg_res['code'], agg_res['message'])
price_est = agg_res['price']
else:
price_est = request['price']
r = validate_balance_for_buy(request['qty'], price_est, market_data['balance_d2'], basis=basis)
if not r.ok:
return r
else:
r = validate_position_for_sell(request['qty'], market_data['position_qty'])
if not r.ok:
return r
return Result.OK()