#!/usr/bin/env python3
"""@비하이브투자자문 신규 '종목분석' 영상 감지 + 자막 수집 + watchlist 저장 + 이메일/텔레그램 발송 + 조회.

Subcommands:
  fetch                — 신규 매칭 영상 메타데이터 JSON 출력(자막 제외) + 자막은 fetch 캐시에만 저장
  transcript VIDEO_ID  — fetch 캐시에서 특정 영상의 자막만 꺼내 출력 (토큰 절감용)
  save VIDEO_ID        — stdin JSON 분석 데이터를 watchlist에 저장하고 seen 기록
  add STOCK ...        — 수동 watchlist 추가 (--code, --buy, --target, --stop, --note)
  email VIDEO_IDS...   — watchlist에서 해당 영상들을 읽어 단일 이메일 발송
  notify VIDEO_IDS...  — 레이 텔레그램으로 "N개 보고서 제출 (종목명들)" 요약 발송
  list                 — watchlist 전체 종목 요약 테이블 출력
  show STOCK           — 특정 종목의 상세 분석 내용 출력
  remove STOCK         — watchlist에서 특정 종목 제거

현재가 조회는 키움 ka10001(`kiwoom_client.get_stock_quote`) 사용.
"""
# NOTE: the module docstring above is printed verbatim by main() as the CLI
# usage text, so it is runtime output and intentionally left untranslated.

from __future__ import annotations

import fcntl as _fcntl
import json
import subprocess
import sys
import time
import urllib.error
import urllib.parse
import urllib.request
import xml.etree.ElementTree as ET
from contextlib import contextmanager as _contextmanager
from datetime import datetime, timedelta, timezone
from pathlib import Path

KST = timezone(timedelta(hours=9))

# Browser-like User-Agent sent with the feed and search requests.
FEED_USER_AGENT = (
    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) '
    'AppleWebKit/537.36 (KHTML, like Gecko) '
    'Chrome/122.0.0.0 Safari/537.36'
)
FEED_RETRY_ATTEMPTS = 4
FEED_RETRY_BACKOFF_SEC = 3.0
FEED_RETRY_STATUS = {408, 429, 500, 502, 503, 504}

CHANNEL_ID = 'UCHTRF5r154igU2gXjudUMzg'
CHANNEL_NAME = '비하이브 투자자문'
FEED_URL = f'https://www.youtube.com/feeds/videos.xml?channel_id={CHANNEL_ID}'
SEARCH_URL = 'https://www.youtube.com/results?search_query={query}&sp=CAISAhAB'
TITLE_FILTER = '종목분석'
TRANSCRIPT_LANGS = ['ko', 'ko-KR', 'en']
TRANSCRIPT_CHAR_LIMIT = 8000
FETCH_LIMIT = 10
SEEN_CAP = 200

WORKSPACE = Path('/Users/snowoyh/.openclaw/agents/stock/workspace')
STATE_DIR = WORKSPACE / 'state'
# The state directory is created lazily by save_json()/watchlist_lock() so
# that importing this module has no filesystem side effects.
SEEN_STATE = STATE_DIR / 'behive_youtube_seen.json'
FETCH_CACHE = STATE_DIR / 'behive_last_fetch.json'
WATCHLIST = STATE_DIR / 'behive_watchlist.json'
CONFIG_PATH = Path('/Users/snowoyh/.openclaw/openclaw.json')
TELEGRAM_ACCOUNT = 'stock'
EMAIL_RECIPIENT = 'mini.snowoyh@gmail.com'

# Placeholder text used wherever a price field has no usable value.
NOT_MENTIONED = '언급 없음'

# Inline CSS snippets for the HTML email (mail clients ignore <style> blocks).
HTML_STYLES = {
    'wrap': 'font-family:-apple-system,BlinkMacSystemFont,"Apple SD Gothic Neo",sans-serif;font-size:14px;color:#222;max-width:680px;line-height:1.55;',
    'greet': 'color:#444;margin-bottom:18px;',
    'card': 'border:1px solid #e5e5e5;border-radius:8px;padding:16px 18px;margin-bottom:18px;background:#fafafa;',
    'card_title': 'font-size:16px;font-weight:700;color:#111;margin-bottom:4px;',
    'card_meta': 'color:#666;font-size:12px;font-weight:400;margin-left:6px;',
    'price_table': 'width:100%;border-collapse:collapse;background:#fff;border:1px solid #eee;border-radius:6px;margin:10px 0 14px;',
    'price_label': 'color:#666;padding:6px 10px;width:80px;font-size:13px;border-bottom:1px solid #f0f0f0;',
    'price_value': 'color:#111;padding:6px 10px;font-size:13px;font-weight:500;border-bottom:1px solid #f0f0f0;',
    'section': 'font-size:13px;font-weight:600;color:#333;margin:10px 0 6px;border-left:3px solid #333;padding-left:8px;',
    'bullet_list': 'margin:4px 0 8px 0;padding-left:20px;color:#333;font-size:13px;',
    'bullet_item': 'margin:3px 0;line-height:1.5;',
    'footer': 'color:#888;font-size:12px;margin-top:10px;border-top:1px dashed #ddd;padding-top:8px;',
    'link': 'color:#0066cc;text-decoration:none;',
    'pos': 'color:#d24f4f;font-weight:600;',
    'neg': 'color:#1565c0;font-weight:600;',
}


# ---------------------------------------------------------------------------
# Small utilities
# ---------------------------------------------------------------------------

def html_escape(s) -> str:
    """Return ``str(s)`` with the five HTML-special characters escaped.

    ``&`` is replaced first so the entities produced by the later
    replacements are not escaped a second time.
    """
    return (
        str(s)
        .replace('&', '&amp;')
        .replace('<', '&lt;')
        .replace('>', '&gt;')
        .replace('"', '&quot;')
        .replace("'", '&#39;')
    )


def load_json(path: Path, default):
    """Read and parse a JSON file, returning *default* on any failure.

    A missing file, an unreadable file and malformed JSON all yield
    *default*; this is deliberate best-effort behaviour for state files.
    """
    try:
        return json.loads(path.read_text(encoding='utf-8'))
    except (OSError, ValueError):
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        return default


def save_json(path: Path, data) -> None:
    """Write *data* as JSON via a temp file + rename to avoid partial writes.

    The parent directory is created on demand.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    tmp = path.with_suffix(path.suffix + '.tmp')
    tmp.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding='utf-8')
    tmp.replace(path)


@_contextmanager
def watchlist_lock():
    """Serialize watchlist read-modify-write cycles across processes.

    Introduced after the 2026-04-27 incident in which the cron job ran seven
    ``save`` calls in parallel and eight stocks were lost. Holds an exclusive
    ``fcntl.flock`` on a sidecar ``.lock`` file for the duration of the
    ``with`` body.
    """
    lock_path = WATCHLIST.with_suffix(WATCHLIST.suffix + '.lock')
    lock_path.parent.mkdir(parents=True, exist_ok=True)
    with open(lock_path, 'a') as lock_file:
        _fcntl.flock(lock_file.fileno(), _fcntl.LOCK_EX)
        try:
            yield
        finally:
            _fcntl.flock(lock_file.fileno(), _fcntl.LOCK_UN)


# ---------------------------------------------------------------------------
# Feed / search / transcript retrieval
# ---------------------------------------------------------------------------

def _is_recent_published_text(text: str) -> bool:
    """Decide from a relative-time label whether a search hit is recent.

    Empty text counts as recent. Labels containing minutes/hours/days ago
    are recent; "N weeks ago" is recent only when N <= 1; anything else
    (months, years, unknown wording) is not.
    """
    if not text:
        return True
    compact = text.replace(' ', '')
    if any(unit in compact for unit in ('분전', '시간전', '일전')):
        return True
    if '주전' in compact:
        digits = ''.join(ch for ch in compact if ch.isdigit())
        return bool(digits) and int(digits) <= 1
    return False


def _search_results_fallback() -> list[dict]:
    """Scrape the YouTube search page for recent matching channel videos.

    Used when the Atom feed returns 404. Returns at most ``FETCH_LIMIT``
    entries shaped like the ones from :func:`fetch_feed`.

    Raises:
        RuntimeError: if the embedded ``ytInitialData`` JSON cannot be
            located or parsed.
    """
    query = urllib.parse.quote(f'{CHANNEL_NAME} {TITLE_FILTER}')
    req = urllib.request.Request(
        SEARCH_URL.format(query=query),
        headers={'User-Agent': FEED_USER_AGENT},
    )
    with urllib.request.urlopen(req, timeout=30) as r:
        html = r.read().decode('utf-8', 'ignore')

    marker = 'var ytInitialData = '
    start = html.find(marker)
    if start < 0:
        raise RuntimeError('ytInitialData not found in YouTube search HTML')
    start += len(marker)
    while start < len(html) and html[start].isspace():
        start += 1
    # Parse exactly one JSON value starting here. Searching for the first
    # ';' instead would truncate the payload at any semicolon that appears
    # inside a JSON string.
    try:
        data, _ = json.JSONDecoder().raw_decode(html, start)
    except ValueError as e:
        raise RuntimeError('ytInitialData JSON could not be parsed from YouTube search HTML') from e

    renderers: list[dict] = []

    def walk(node) -> None:
        """Collect every ``videoRenderer`` dict anywhere in the tree."""
        if isinstance(node, dict):
            vr = node.get('videoRenderer')
            if isinstance(vr, dict) and vr:
                renderers.append(vr)
            for value in node.values():
                walk(value)
        elif isinstance(node, list):
            for value in node:
                walk(value)

    walk(data)

    def runs_text(field) -> str:
        """Concatenate the text of a YouTube 'runs' structure."""
        return ''.join(run.get('text', '') for run in (field or {}).get('runs', []))

    entries: list[dict] = []
    seen_ids: set[str] = set()
    for vr in renderers:
        owner = runs_text(vr.get('ownerText'))
        title = runs_text(vr.get('title'))
        vid = vr.get('videoId', '')
        published = (vr.get('publishedTimeText') or {}).get('simpleText', '')
        if owner != CHANNEL_NAME or TITLE_FILTER not in title:
            continue
        if not vid or vid in seen_ids:
            continue
        if not _is_recent_published_text(published):
            continue
        seen_ids.add(vid)
        entries.append({
            'video_id': vid,
            'title': title.strip(),
            'published': published,
            'url': f'https://www.youtube.com/watch?v={vid}',
        })
        if len(entries) >= FETCH_LIMIT:
            break
    return entries


def fetch_feed() -> list[dict]:
    """Download the channel Atom feed and return its entries.

    Retries up to ``FEED_RETRY_ATTEMPTS`` times with linear backoff on
    network errors and on the HTTP statuses in ``FEED_RETRY_STATUS``.
    An HTTP 404 switches immediately to :func:`_search_results_fallback`.

    Returns:
        A list of dicts with ``video_id``, ``title``, ``published`` and
        ``url`` keys, in feed order.

    Raises:
        urllib.error.URLError / TimeoutError: when the final attempt fails
            or a non-retryable HTTP status is returned.
    """
    req = urllib.request.Request(FEED_URL, headers={'User-Agent': FEED_USER_AGENT})
    xml_text = None
    last_err: Exception | None = None
    for attempt in range(1, FEED_RETRY_ATTEMPTS + 1):
        final_attempt = attempt == FEED_RETRY_ATTEMPTS
        try:
            with urllib.request.urlopen(req, timeout=30) as r:
                xml_text = r.read().decode('utf-8', 'ignore')
            break
        except urllib.error.HTTPError as e:
            # HTTPError must be caught before URLError (it is a subclass).
            last_err = e
            if e.code == 404:
                return _search_results_fallback()
            if e.code not in FEED_RETRY_STATUS or final_attempt:
                raise
        except (urllib.error.URLError, TimeoutError) as e:
            last_err = e
            if final_attempt:
                raise
        time.sleep(FEED_RETRY_BACKOFF_SEC * attempt)
    if xml_text is None:
        raise last_err or RuntimeError('feed fetch failed without exception')

    root = ET.fromstring(xml_text)
    ns = {
        'a': 'http://www.w3.org/2005/Atom',
        'yt': 'http://www.youtube.com/xml/schemas/2015',
    }
    entries: list[dict] = []
    for entry in root.findall('a:entry', ns):
        vid = entry.findtext('yt:videoId', default='', namespaces=ns)
        if not vid:
            continue
        title = entry.findtext('a:title', default='', namespaces=ns)
        published = entry.findtext('a:published', default='', namespaces=ns)
        link = ''
        for link_el in entry.findall('a:link', ns):
            if link_el.attrib.get('rel') == 'alternate':
                link = link_el.attrib.get('href', '')
        entries.append({
            'video_id': vid,
            'title': title.strip(),
            'published': published,
            'url': link or f'https://www.youtube.com/watch?v={vid}',
        })
    return entries


def fetch_transcript(video_id: str) -> tuple[str, str]:
    """Fetch a video's transcript as one string plus a status label.

    Returns:
        ``(text, status)``. ``status`` is ``'ok'`` on success, otherwise
        ``'unavailable'``, ``'unavailable: <ExceptionName>'`` or
        ``'error: ...'`` (import failure), with ``text`` empty. Text longer
        than ``TRANSCRIPT_CHAR_LIMIT`` is truncated with a marker appended.
    """
    try:
        from youtube_transcript_api import YouTubeTranscriptApi
    except Exception as e:
        return '', f'error: youtube_transcript_api import 실패 ({e})'
    try:
        fetched = YouTubeTranscriptApi().fetch(video_id, languages=TRANSCRIPT_LANGS)
        parts = []
        for snippet in fetched:
            # Snippets may be objects with .text or plain dicts.
            text = getattr(snippet, 'text', None)
            if text is None and isinstance(snippet, dict):
                text = snippet.get('text', '')
            if text:
                parts.append(text.strip())
        full = ' '.join(parts).strip()
        if not full:
            return '', 'unavailable'
        if len(full) > TRANSCRIPT_CHAR_LIMIT:
            full = full[:TRANSCRIPT_CHAR_LIMIT] + '... [자막 일부 생략]'
        return full, 'ok'
    except Exception as e:
        # Best-effort: any transcript failure is reported as a status only.
        return '', f'unavailable: {type(e).__name__}'


# ---------------------------------------------------------------------------
# fetch / transcript / seen-state
# ---------------------------------------------------------------------------

def cmd_fetch() -> int:
    """Print metadata of new matching videos and cache their transcripts.

    Transcripts are written only to ``FETCH_CACHE``; the JSON printed to
    stdout omits them.
    """
    seen = set(load_json(SEEN_STATE, {'seen': []}).get('seen', []))
    candidates = [
        e for e in fetch_feed()
        if TITLE_FILTER in e['title'] and e['video_id'] not in seen
    ]
    # Reverse the feed order before applying the cap.
    matched = candidates[::-1][:FETCH_LIMIT]
    for item in matched:
        item['transcript'], item['transcript_status'] = fetch_transcript(item['video_id'])
    save_json(FETCH_CACHE, {'fetched_at': datetime.now(KST).isoformat(), 'items': matched})
    lean = [
        {k: v for k, v in item.items() if k != 'transcript'}
        for item in matched
    ]
    print(json.dumps(lean, ensure_ascii=False, indent=2))
    return 0


def _cached_item(video_id: str) -> dict | None:
    """Return the fetch-cache record for *video_id*, or None if absent."""
    cache = load_json(FETCH_CACHE, {})
    for item in cache.get('items', []):
        if item.get('video_id') == video_id:
            return item
    return None


def cmd_transcript(video_id: str) -> int:
    """Print the cached transcript (and its status) for one video."""
    item = _cached_item(video_id)
    if item is None:
        print(f'video_id "{video_id}" not found in fetch cache', file=sys.stderr)
        return 1
    print(f"transcript_status: {item.get('transcript_status', 'unknown')}")
    print('---')
    print(item.get('transcript', '') or '')
    return 0


def load_cached_video(video_id: str) -> dict:
    """Return id/title/url/published for a cached video, or just the id."""
    item = _cached_item(video_id)
    if item is None:
        return {'id': video_id}
    return {
        'id': item.get('video_id'),
        'title': item.get('title'),
        'url': item.get('url'),
        'published': item.get('published'),
    }


def mark_seen(video_id: str) -> None:
    """Record *video_id* at the front of the seen list (capped at SEEN_CAP)."""
    state = load_json(SEEN_STATE, {'seen': []})
    seen = state.get('seen', [])
    if video_id not in seen:
        seen.insert(0, video_id)
    state['seen'] = seen[:SEEN_CAP]
    save_json(SEEN_STATE, state)


# ---------------------------------------------------------------------------
# Watchlist writes
# ---------------------------------------------------------------------------

def _reclassify_stop_above_buy(entry: dict, stock: str) -> None:
    """Drop a stop price that is not below the buy price (long-only guard).

    When ``stop.value >= buy.primary`` the original stop text is moved to
    the front of ``entry['notes']`` and ``entry['stop']`` is set to None.
    Entries with missing or non-numeric prices are left untouched.
    """
    buy = entry.get('buy') or {}
    stop = entry.get('stop') or {}
    if not isinstance(buy, dict) or not isinstance(stop, dict):
        return
    buy_primary = buy.get('primary')
    stop_value = stop.get('value')
    if not isinstance(buy_primary, (int, float)) or not isinstance(stop_value, (int, float)):
        return
    if stop_value < buy_primary:
        return
    stop_raw = stop.get('raw') or f'{stop_value}'
    notes = list(entry.get('notes') or [])
    notes.insert(0, f"(원문: '{stop_raw}' — 매입가 {buy_primary}원보다 높아 손절가 대신 지지선으로 재분류됨)")
    entry['notes'] = notes
    entry['stop'] = None
    print(
        f"[guard] {stock}: stop({stop_value}) >= buy.primary({buy_primary}) — 재분류 후 stop=null로 저장",
        file=sys.stderr,
    )


def cmd_save(video_id: str) -> int:
    """Store the analysis JSON read from stdin in the watchlist.

    Returns 0 on success (including the skip for ``pending_delete``
    entries) and 2 for empty, invalid or incomplete input. The video is
    marked as seen in both success paths.
    """
    raw = sys.stdin.read().strip()
    if not raw:
        print('empty stdin', file=sys.stderr)
        return 2
    try:
        analysis = json.loads(raw)
    except Exception as e:
        print(f'invalid JSON: {e}', file=sys.stderr)
        return 2
    if not isinstance(analysis, dict):
        print('invalid JSON: top-level value must be an object', file=sys.stderr)
        return 2
    stock = str(analysis.get('stock') or '').strip()
    if not stock:
        print('missing "stock" field', file=sys.stderr)
        return 2

    # Auto-fill the stock code (used for Kiwoom-based monitoring). On failure
    # it stays empty; show/email still work from the stock name alone.
    code = str(analysis.get('code') or '').strip()
    if not code:
        try:
            code = _get_kiwoom_client().resolve_stock_code(stock).get('code', '')
        except Exception as e:
            print(f'[warn] {stock} 종목코드 매핑 실패: {e}', file=sys.stderr)

    entry = {
        'stock': stock,
        'code': code,
        'target': analysis.get('target'),
        'buy': analysis.get('buy'),
        'stop': analysis.get('stop'),
        'upside_pct': analysis.get('upside_pct'),
        'summary': analysis.get('summary') or [],
        'notes': analysis.get('notes') or [],
        'video': analysis.get('video') or load_cached_video(video_id),
        'saved_at': datetime.now(KST).isoformat(),
    }
    _reclassify_stop_above_buy(entry, stock)

    with watchlist_lock():
        watchlist = load_json(WATCHLIST, {})
        existing = watchlist.get(stock)
        if isinstance(existing, dict) and existing.get('status') == 'pending_delete':
            # Do not overwrite an entry flagged for deletion.
            mark_seen(video_id)
            print(f'skipped {stock} — pending_delete (video={video_id})', file=sys.stderr)
            return 0
        watchlist[stock] = entry
        save_json(WATCHLIST, watchlist)
    mark_seen(video_id)
    print(f'saved {stock} (video={video_id})')
    return 0


# ---------------------------------------------------------------------------
# Formatting helpers
# ---------------------------------------------------------------------------

def _price_text(field) -> str:
    """Return a price field's stripped ``raw`` text, or the placeholder."""
    if not isinstance(field, dict):
        return NOT_MENTIONED
    raw = str(field.get('raw') or '').strip()
    return raw or NOT_MENTIONED


def format_price_line(label: str, field) -> str:
    """Return ``'<label>: <raw>'``, using the placeholder when raw is absent."""
    return f'{label}: {_price_text(field)}'


def _format_upside(upside) -> str | None:
    """Format an upside percentage as ``'+12.5%'`` / ``'-5%'``.

    Returns None when *upside* is missing or not a number, so callers can
    simply skip it.
    """
    if isinstance(upside, bool) or not isinstance(upside, (int, float)):
        return None
    sign = '+' if upside >= 0 else ''
    return f'{sign}{upside:g}%'


def _target_line(entry: dict) -> str:
    """Return the target-price line, with the upside appended when known."""
    line = format_price_line('목표가', entry.get('target'))
    upside = _format_upside(entry.get('upside_pct'))
    if upside and NOT_MENTIONED not in line:
        line = f'{line} ({upside})'
    return line


def _get_kiwoom_client():
    """Import ``kiwoom_client`` lazily from the workspace scripts directory.

    Lazy so that the other subcommands keep working in environments without
    Kiwoom credentials.
    """
    scripts_dir = str(WORKSPACE / 'scripts')
    if scripts_dir not in sys.path:
        # Guarded so repeated calls do not keep growing sys.path.
        sys.path.insert(0, scripts_dir)
    import kiwoom_client  # type: ignore
    return kiwoom_client


def fetch_current_price(stock_name_or_code: str, buy_price: float | int | None = None) -> str | None:
    """Return the current price as display text, or None on any failure.

    Uses the Kiwoom ka10001 quote. When *buy_price* is a positive number
    the difference and percentage versus the buy price are appended.
    """
    if not stock_name_or_code:
        return None
    try:
        kc = _get_kiwoom_client()
        info = kc.resolve_stock_code(stock_name_or_code)
        price = kc.get_stock_quote(info['code']).get('price', 0)
        if not price:
            return None
        if isinstance(buy_price, (int, float)) and buy_price > 0:
            diff = price - buy_price
            pct = (diff / buy_price) * 100
            return f'{price:,}원 ({diff:+,.0f}원, {pct:+.2f}% vs 매입가)'
        return f'{price:,}원'
    except Exception:
        # Deliberate best-effort: callers render "unavailable" for None.
        return None


def _buy_primary(entry: dict) -> float | None:
    """Return ``entry['buy']['primary']`` when it is numeric, else None."""
    buy = entry.get('buy')
    if isinstance(buy, dict):
        value = buy.get('primary')
        if isinstance(value, (int, float)):
            return value
    return None


def format_entry_block(entry: dict) -> str:
    """Render one watchlist entry as a plain-text report block."""
    stock = entry.get('stock', '')
    current = fetch_current_price(entry.get('code') or stock, _buy_primary(entry))
    lines = [
        f'[종목분석] #{stock}',
        '━━━━━━━━━━',
        _target_line(entry),
        format_price_line('매입가', entry.get('buy')),
        format_price_line('손절가', entry.get('stop')),
        f'현재가: {current}' if current else '현재가: 조회 불가',
        '',
        '주요내용',
    ]
    lines.extend(f'- {b}' for b in entry.get('summary') or [])
    notes = entry.get('notes') or []
    if notes:
        lines.append('')
        lines.append('기타')
        lines.extend(f'- {b}' for b in notes)
    video = entry.get('video') or {}
    if video.get('url'):
        lines.append('')
        lines.append(f"출처: {video.get('title') or ''} {video['url']}")
    return '\n'.join(lines)


def _format_current_html(current: str | None) -> str:
    """Convert fetch_current_price() output to HTML with a coloured delta.

    The parenthesised "vs buy price" part is wrapped in a span styled as
    positive or negative depending on its leading sign.
    """
    if not current:
        return '조회 불가'
    if 'vs 매입가' in current and '(' in current:
        head, tail = current.split('(', 1)
        inner = tail.partition(')')[0]
        color = HTML_STYLES['pos'] if inner.strip().startswith('+') else HTML_STYLES['neg']
        return f'{html_escape(head.strip())} <span style="{color}">({html_escape(inner)})</span>'
    return html_escape(current)


def _html_bullets(items) -> str:
    """Render *items* as an inline-styled ``<ul>`` with escaped contents."""
    S = HTML_STYLES
    lis = ''.join(
        f'<li style="{S["bullet_item"]}">{html_escape(b)}</li>' for b in items
    )
    return f'<ul style="{S["bullet_list"]}">{lis}</ul>'


def format_entry_html_block(entry: dict) -> str:
    """Render one watchlist entry as an HTML card for the email body.

    All entry-derived text is HTML-escaped; only the markup generated here
    is emitted raw.
    """
    S = HTML_STYLES
    stock = entry.get('stock', '')
    current = fetch_current_price(entry.get('code') or stock, _buy_primary(entry))

    parts = [f'<div style="{S["card"]}">']
    parts.append(
        f'<div style="{S["card_title"]}">[종목분석] #{html_escape(stock)} '
        f'<span style="{S["card_meta"]}">현재가: {_format_current_html(current)}</span></div>'
    )

    target_text = _price_text(entry.get('target'))
    target_value = html_escape(target_text)
    upside = _format_upside(entry.get('upside_pct'))
    if upside and NOT_MENTIONED not in target_text:
        color = S['neg'] if upside.startswith('-') else S['pos']
        target_value = f'{target_value} <span style="{color}">({upside})</span>'

    rows = [
        ('목표가', target_value),
        ('매입가', html_escape(_price_text(entry.get('buy')))),
        ('손절가', html_escape(_price_text(entry.get('stop')))),
    ]
    parts.append(f'<table style="{S["price_table"]}">')
    for label, value in rows:
        parts.append(
            f'<tr><td style="{S["price_label"]}">{html_escape(label)}</td>'
            f'<td style="{S["price_value"]}">{value}</td></tr>'
        )
    parts.append('</table>')

    summary = entry.get('summary') or []
    if summary:
        parts.append(f'<div style="{S["section"]}">주요내용</div>')
        parts.append(_html_bullets(summary))

    notes = entry.get('notes') or []
    if notes:
        parts.append(f'<div style="{S["section"]}">기타</div>')
        parts.append(_html_bullets(notes))

    video = entry.get('video') or {}
    if video.get('url'):
        title = html_escape(video.get('title') or '영상')
        parts.append(
            f'<div style="{S["footer"]}">출처: '
            f'<a href="{html_escape(video["url"])}" style="{S["link"]}">{title} ↗</a>'
            f'</div>'
        )
    parts.append('</div>')
    return '\n'.join(parts)


def build_email_html(entries: list[dict]) -> str:
    """Build the complete HTML email body: greeting plus one card per entry."""
    S = HTML_STYLES
    parts = [
        f'<div style="{S["wrap"]}">',
        f'<div style="{S["greet"]}">관리자님, 비하이브투자자문 신규 종목분석 {len(entries)}건을 전달드립니다.</div>',
    ]
    parts.extend(format_entry_html_block(entry) for entry in entries)
    parts.append('</div>')
    return '\n'.join(parts)


# ---------------------------------------------------------------------------
# email / telegram
# ---------------------------------------------------------------------------

def _entries_for_video_ids(video_ids: list[str]) -> tuple[list[dict], list[str]]:
    """Look up watchlist entries by source video id.

    Returns:
        ``(entries, missing)``: entries in the order of *video_ids* without
        duplicates, and the ids that matched no watchlist entry.
    """
    watchlist = load_json(WATCHLIST, {})
    candidates = [e for e in watchlist.values() if isinstance(e, dict)]
    entries: list[dict] = []
    missing: list[str] = []
    seen_keys: set[tuple] = set()
    for vid in video_ids:
        matches = [e for e in candidates if (e.get('video') or {}).get('id') == vid]
        if not matches:
            missing.append(vid)
            continue
        for e in matches:
            key = (e.get('stock', ''), vid)
            if key not in seen_keys:
                seen_keys.add(key)
                entries.append(e)
    return entries, missing


def cmd_email(video_ids: list[str]) -> int:
    """Send a single HTML email covering the given videos' entries.

    Nothing is sent (return code 1) if any video id is missing from the
    watchlist. The mail goes out through the external ``gog`` CLI.
    """
    if not video_ids:
        print('no video_ids provided', file=sys.stderr)
        return 2
    entries, missing = _entries_for_video_ids(video_ids)
    if missing:
        print(f'watchlist missing video_ids: {missing}', file=sys.stderr)
        return 1
    if not entries:
        print('no entries to send', file=sys.stderr)
        return 1

    date_str = datetime.now(KST).strftime('%Y-%m-%d %H:%M')
    stock_names = ', '.join(e.get('stock', '') for e in entries)
    subject = f'[비하이브 종목분석] {len(entries)}개 보고서 — {date_str} ({stock_names})'
    cmd = [
        'gog', 'gmail', 'send',
        '--to', EMAIL_RECIPIENT,
        '--subject', subject,
        '--body-html', build_email_html(entries),
    ]
    # Argument list (no shell) so subject/body need no shell quoting.
    p = subprocess.run(cmd, text=True, capture_output=True)
    if p.returncode != 0:
        print(p.stderr.strip() or p.stdout.strip(), file=sys.stderr)
        return 1
    print(f'emailed {len(entries)} entries to {EMAIL_RECIPIENT}')
    return 0


def load_telegram_config() -> tuple[str, list[str]]:
    """Read the bot token and allowed chat ids for ``TELEGRAM_ACCOUNT``.

    Raises:
        OSError, ValueError, KeyError: if the config file is unreadable,
            not valid JSON, or lacks the expected keys.
    """
    cfg = json.loads(CONFIG_PATH.read_text(encoding='utf-8'))
    acct = cfg['channels']['telegram']['accounts'][TELEGRAM_ACCOUNT]
    return acct['botToken'], acct.get('allowFrom') or []


def send_telegram(text: str) -> bool:
    """Send *text* to every configured chat; return True only if all succeed.

    The message is cut to 4000 characters. Configuration and network
    errors are reported on stderr and turn the result False instead of
    raising.
    """
    try:
        token, chat_ids = load_telegram_config()
    except (OSError, KeyError, TypeError, ValueError) as e:
        print(f'telegram config load failed: {e}', file=sys.stderr)
        return False
    if not chat_ids:
        print('no chat_ids configured', file=sys.stderr)
        return False

    url = f'https://api.telegram.org/bot{token}/sendMessage'
    ok = True
    for chat_id in chat_ids:
        payload = {
            'chat_id': chat_id,
            'text': text[:4000],
            'disable_web_page_preview': 'true',
        }
        data = urllib.parse.urlencode(payload).encode()
        req = urllib.request.Request(url, data=data, method='POST')
        try:
            with urllib.request.urlopen(req, timeout=15) as r:
                if r.status != 200:
                    print(f'telegram HTTP {r.status}', file=sys.stderr)
                    ok = False
        except Exception as e:
            # Keep going so one failing chat does not block the others.
            print(f'telegram send failed: {e}', file=sys.stderr)
            ok = False
    return ok


def format_telegram_block(entry: dict) -> str:
    """Render one entry as a compact bullet block for Telegram."""
    stock = entry.get('stock', '?')
    current = fetch_current_price(entry.get('code') or stock, _buy_primary(entry))
    current_line = f'현재가: {current}' if current else '현재가: 조회 불가'
    buy_line = format_price_line('매입가', entry.get('buy'))
    stop_line = format_price_line('손절가', entry.get('stop'))
    return f'#{stock}\n• {current_line}\n• {buy_line}\n• {_target_line(entry)}\n• {stop_line}'


def cmd_notify(video_ids: list[str]) -> int:
    """Send a Telegram summary of the entries for the given videos."""
    if not video_ids:
        print('no video_ids provided', file=sys.stderr)
        return 2
    entries, _missing = _entries_for_video_ids(video_ids)
    if not entries:
        print('no matching entries in watchlist', file=sys.stderr)
        return 1
    header = f'비하이브 종목분석 {len(entries)}건 제출'
    text = header + '\n\n' + '\n\n'.join(format_telegram_block(e) for e in entries)
    if not send_telegram(text):
        return 1
    print(f'notified {len(entries)} stocks')
    return 0


# ---------------------------------------------------------------------------
# list / show / add / remove
# ---------------------------------------------------------------------------

def cmd_list() -> int:
    """Print a summary table of the watchlist, most recently saved first."""
    w = load_json(WATCHLIST, {})
    if not w:
        print('watchlist 비어있음')
        return 0
    entries = sorted(w.values(), key=lambda e: e.get('saved_at') or '', reverse=True)
    rule = '-' * 90
    print(f'총 {len(entries)}개 종목')
    print(rule)
    print(f'{"종목명":<12} {"목표가":<28} {"상승률":<8} {"매입가":<24} {"저장일":<12}')
    print(rule)
    for e in entries:
        stock = e.get('stock', '?')
        target_raw = _price_text(e.get('target'))
        buy_raw = _price_text(e.get('buy'))
        up_s = _format_upside(e.get('upside_pct')) or 'n/a'
        saved = (e.get('saved_at') or '')[:10]
        print(f'{stock:<12} {target_raw[:26]:<28} {up_s:<8} {buy_raw[:22]:<24} {saved:<12}')
    return 0


def cmd_show(stock: str) -> int:
    """Print the detailed block for one stock.

    Falls back to a substring match on stock names when there is no exact
    key; an ambiguous or empty match is an error (return code 1).
    """
    w = load_json(WATCHLIST, {})
    entry = w.get(stock)
    if not entry:
        matches = [k for k in w if stock in k]
        if len(matches) > 1:
            print(f'여러 종목 매칭: {", ".join(matches)}. 정확한 종목명 지정 필요.', file=sys.stderr)
            return 1
        if not matches:
            print(f'watchlist에 "{stock}" 없음. 현재 등록: {", ".join(w.keys()) or "(없음)"}', file=sys.stderr)
            return 1
        entry = w[matches[0]]
    print(format_entry_block(entry))
    saved = entry.get('saved_at', '')
    if saved:
        print(f'\n(저장일시: {saved})')
    return 0


def _parse_price(text: str) -> int:
    """Parse an integer price, ignoring thousands separators and spaces."""
    return int(text.replace(',', '').strip())


def cmd_add(argv: list[str]) -> int:
    """Manually add a stock to the watchlist.

    usage: add STOCK [--code 005930] [--buy 13000] [--target 16000-17000]
               [--stop 12000] [--note "..."] [--note "..."]

    When ``--code`` is omitted the code is resolved through the Kiwoom
    stock-code lookup, and the official name it returns replaces the typed
    name. ``--target`` is a single value or a ``low-high`` range.

    Returns 0 on success, 1 if the code cannot be resolved, 2 on bad
    arguments.
    """
    import argparse
    p = argparse.ArgumentParser(prog='behive_youtube_digest.py add', add_help=True)
    p.add_argument('stock')
    p.add_argument('--code')
    p.add_argument('--buy', type=int, help='매입가 (단일 정수)')
    p.add_argument('--target', help='목표가 — "16000" 또는 "16000-17000"')
    p.add_argument('--stop', type=int, help='손절가')
    p.add_argument('--note', action='append', default=[])
    try:
        args = p.parse_args(argv)
    except SystemExit as e:
        # argparse exits with 0 for --help and 2 for usage errors.
        return e.code if isinstance(e.code, int) else 2

    stock = args.stock.strip()

    # Validate --target before doing any network lookup.
    target = None
    if args.target:
        raw = args.target.strip()
        try:
            if '-' in raw:
                lo_s, hi_s = raw.split('-', 1)
                lo, hi = _parse_price(lo_s), _parse_price(hi_s)
            else:
                lo = hi = _parse_price(raw)
        except ValueError:
            print(f'--target 형식 오류: "{raw}" — "16000" 또는 "16000-17000" 형식이어야 합니다.', file=sys.stderr)
            return 2
        target = {'raw': raw, 'low': lo, 'high': hi}

    # Obtain the stock code.
    code = (args.code or '').strip()
    resolved_name = stock
    if not code:
        try:
            info = _get_kiwoom_client().resolve_stock_code(stock)
            code = info.get('code', '')
            # Tell the user when the official name differs from the input.
            if info.get('name') and info['name'] != stock:
                print(f'[info] "{stock}" → 공식 종목명 "{info["name"]}"로 저장', file=sys.stderr)
                resolved_name = info['name']
        except Exception as e:
            print(f'종목코드 자동 매핑 실패: {e}. --code 6자리로 지정해주세요.', file=sys.stderr)
            return 1

    buy = None
    if args.buy is not None:
        buy = {'raw': f'{args.buy:,}원', 'primary': args.buy, 'levels': [args.buy]}

    stop = None
    if args.stop is not None:
        stop = {'raw': f'{args.stop:,}원', 'value': args.stop}

    upside = None
    if target and buy and buy.get('primary'):
        # Upside is measured from the buy price to the low end of the target.
        upside = round((target['low'] / buy['primary'] - 1) * 100, 1)

    entry = {
        'stock': resolved_name,
        'code': code,
        'target': target,
        'buy': buy,
        'stop': stop,
        'upside_pct': upside,
        'summary': [],
        'notes': list(args.note or []),
        'video': {'source': 'manual'},
        'saved_at': datetime.now(KST).isoformat(),
    }
    _reclassify_stop_above_buy(entry, resolved_name)
    # The guard above may have cleared the stop; report what was stored.
    stop = entry['stop']

    with watchlist_lock():
        watchlist = load_json(WATCHLIST, {})
        watchlist[resolved_name] = entry
        save_json(WATCHLIST, watchlist)

    parts = [f'added {resolved_name} ({code})']
    if target:
        parts.append(f"target={target['raw']}")
    if buy:
        parts.append(f"buy={buy['raw']}")
    if stop:
        parts.append(f"stop={stop['raw']}")
    print(' · '.join(parts))
    return 0


def cmd_remove(stock: str) -> int:
    """Remove *stock* (exact key) from the watchlist; 1 if it is absent."""
    with watchlist_lock():
        w = load_json(WATCHLIST, {})
        if stock not in w:
            print(f'watchlist에 "{stock}" 없음', file=sys.stderr)
            return 1
        del w[stock]
        save_json(WATCHLIST, w)
    print(f'removed {stock}')
    return 0


# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------

def main():
    """Dispatch ``sys.argv`` to the matching subcommand; return its exit code."""
    if len(sys.argv) < 2:
        print(__doc__, file=sys.stderr)
        return 2
    cmd, args = sys.argv[1], sys.argv[2:]

    # Subcommands that need at least one argument, with their usage text.
    usage = {
        'transcript': 'usage: transcript VIDEO_ID',
        'save': 'usage: save VIDEO_ID (stdin = JSON)',
        'add': 'usage: add STOCK [--code 6자리] [--buy 13000] [--target 16000-17000] [--stop 12000] [--note "..."]',
        'show': 'usage: show STOCK',
        'remove': 'usage: remove STOCK',
    }
    if cmd in usage and not args:
        print(usage[cmd], file=sys.stderr)
        return 2

    if cmd == 'fetch':
        return cmd_fetch()
    if cmd == 'transcript':
        return cmd_transcript(args[0])
    if cmd == 'save':
        return cmd_save(args[0])
    if cmd == 'add':
        return cmd_add(args)
    if cmd == 'email':
        return cmd_email(args)
    if cmd == 'notify':
        return cmd_notify(args)
    if cmd == 'list':
        return cmd_list()
    if cmd == 'show':
        return cmd_show(args[0])
    if cmd == 'remove':
        return cmd_remove(args[0])
    print(f'unknown command: {cmd}', file=sys.stderr)
    return 2


if __name__ == '__main__':
    sys.exit(main())