#!/usr/bin/env python3
"""Detect new '종목분석' videos from @비하이브투자자문, collect their transcripts,
store the analysis in a watchlist, send email/Telegram notifications, and
answer watchlist queries.

Subcommands:
  fetch                — print metadata JSON for new matching videos
                         (transcripts excluded); transcripts are stored only
                         in the fetch cache
  transcript VIDEO_ID  — print just the transcript of one video from the
                         fetch cache (saves tokens)
  save VIDEO_ID        — store the analysis JSON read from stdin in the
                         watchlist and record the video as seen
  add STOCK ...        — add a watchlist entry by hand
                         (--code, --buy, --target, --stop, --note)
  email VIDEO_IDS...   — read those videos from the watchlist and send them
                         as a single email
  notify VIDEO_IDS...  — send an "N개 보고서 제출 (종목명들)" summary through
                         the 레이 Telegram
  list                 — print a summary table of every watchlist stock
  show STOCK           — print the detailed analysis of one stock
  remove STOCK         — remove one stock from the watchlist

Current prices are looked up with Kiwoom ka10001
(`kiwoom_client.get_stock_quote`).
"""
from __future__ import annotations

import fcntl as _fcntl
import json
import subprocess
import sys
import time
import urllib.error
import urllib.parse
import urllib.request
import xml.etree.ElementTree as ET
from contextlib import contextmanager as _contextmanager
from datetime import datetime, timezone, timedelta
from pathlib import Path

KST = timezone(timedelta(hours=9))

FEED_USER_AGENT = (
    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) '
    'AppleWebKit/537.36 (KHTML, like Gecko) '
    'Chrome/122.0.0.0 Safari/537.36'
)
FEED_RETRY_ATTEMPTS = 4
FEED_RETRY_BACKOFF_SEC = 3.0
FEED_RETRY_STATUS = {408, 429, 500, 502, 503, 504}

CHANNEL_ID = 'UCHTRF5r154igU2gXjudUMzg'
CHANNEL_NAME = '비하이브 투자자문'
FEED_URL = f'https://www.youtube.com/feeds/videos.xml?channel_id={CHANNEL_ID}'
SEARCH_URL = 'https://www.youtube.com/results?search_query={query}&sp=CAISAhAB'
TITLE_FILTER = '종목분석'
TRANSCRIPT_LANGS = ['ko', 'ko-KR', 'en']
TRANSCRIPT_CHAR_LIMIT = 8000
FETCH_LIMIT = 10
SEEN_CAP = 200

WORKSPACE = Path('/Users/snowoyh/.openclaw/agents/stock/workspace')
STATE_DIR = WORKSPACE / 'state'
try:
    STATE_DIR.mkdir(parents=True, exist_ok=True)
except OSError as _e:
    # Importing must not crash on a host where the state directory cannot be
    # created; save_json() creates the parent directory again before writing.
    print(f'[warn] cannot create state dir {STATE_DIR}: {_e}', file=sys.stderr)
SEEN_STATE = STATE_DIR / 'behive_youtube_seen.json'
FETCH_CACHE = STATE_DIR / 'behive_last_fetch.json'
WATCHLIST = STATE_DIR / 'behive_watchlist.json'
CONFIG_PATH = Path('/Users/snowoyh/.openclaw/openclaw.json')
TELEGRAM_ACCOUNT = 'stock'
EMAIL_RECIPIENT = 'mini.snowoyh@gmail.com'

# Inline CSS fragments, keyed by the role of the element they style.
HTML_STYLES = {
    'wrap': 'font-family:-apple-system,BlinkMacSystemFont,"Apple SD Gothic Neo",sans-serif;font-size:14px;color:#222;max-width:680px;line-height:1.55;',
    'greet': 'color:#444;margin-bottom:18px;',
    'card': 'border:1px solid #e5e5e5;border-radius:8px;padding:16px 18px;margin-bottom:18px;background:#fafafa;',
    'card_title': 'font-size:16px;font-weight:700;color:#111;margin-bottom:4px;',
    'card_meta': 'color:#666;font-size:12px;font-weight:400;margin-left:6px;',
    'price_table': 'width:100%;border-collapse:collapse;background:#fff;border:1px solid #eee;border-radius:6px;margin:10px 0 14px;',
    'price_label': 'color:#666;padding:6px 10px;width:80px;font-size:13px;border-bottom:1px solid #f0f0f0;',
    'price_value': 'color:#111;padding:6px 10px;font-size:13px;font-weight:500;border-bottom:1px solid #f0f0f0;',
    'section': 'font-size:13px;font-weight:600;color:#333;margin:10px 0 6px;border-left:3px solid #333;padding-left:8px;',
    'bullet_list': 'margin:4px 0 8px 0;padding-left:20px;color:#333;font-size:13px;',
    'bullet_item': 'margin:3px 0;line-height:1.5;',
    'footer': 'color:#888;font-size:12px;margin-top:10px;border-top:1px dashed #ddd;padding-top:8px;',
    'link': 'color:#0066cc;text-decoration:none;',
    'pos': 'color:#d24f4f;font-weight:600;',
    'neg': 'color:#1565c0;font-weight:600;',
}


def html_escape(s) -> str:
    """Return ``str(s)`` with the five HTML-significant characters escaped.

    ``&`` is replaced first so the ampersands introduced by the later
    replacements are not escaped a second time.
    """
    return (str(s)
            .replace('&', '&amp;')
            .replace('<', '&lt;')
            .replace('>', '&gt;')
            .replace('"', '&quot;')
            .replace("'", '&#39;'))


def load_json(path: Path, default):
    """Return the JSON document stored at *path*, or *default*.

    *default* is returned when the file is missing, unreadable, or not valid
    JSON. The last two cases are reported on stderr so that a corrupted state
    file does not go unnoticed.
    """
    try:
        return json.loads(path.read_text(encoding='utf-8'))
    except FileNotFoundError:
        return default
    except (OSError, ValueError) as e:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        print(f'[warn] failed to read {path}: {e}; using default', file=sys.stderr)
        return default


def save_json(path: Path, data):
    """Write *data* to *path* as JSON via a temp file plus rename.

    Writing to ``<name>.tmp`` and renaming over the target prevents a
    partially written file from ever being visible at *path*.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    tmp = path.with_suffix(path.suffix + '.tmp')
    tmp.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding='utf-8')
    tmp.replace(path)


@_contextmanager
def watchlist_lock():
    """Serialize watchlist read-modify-write cycles across processes.

    Introduced after the 2026-04-27 incident in which the behive cron ran
    seven `save` calls in parallel and eight stocks were lost.
    ``fcntl.flock(LOCK_EX)`` makes every process on the same machine queue up.
    """
    lock_path = WATCHLIST.with_suffix(WATCHLIST.suffix + '.lock')
    lock_path.parent.mkdir(parents=True, exist_ok=True)
    with open(lock_path, 'a') as f:
        _fcntl.flock(f.fileno(), _fcntl.LOCK_EX)
        try:
            yield
        finally:
            _fcntl.flock(f.fileno(), _fcntl.LOCK_UN)


def _is_recent_published_text(text: str) -> bool:
    """Decide from a relative "published" label whether a video is recent.

    Empty text counts as recent. Labels expressed in minutes, hours or days
    ('분전', '시간전', '일전') are recent; labels in weeks ('주전') are recent
    only when the number in the label is at most 1. Anything else is not.
    """
    if not text:
        return True
    compact = text.replace(' ', '')
    if '분전' in compact or '시간전' in compact or '일전' in compact:
        return True
    if '주전' in compact:
        # isdecimal() matches exactly the characters int() can parse.
        digits = ''.join(ch for ch in compact if ch.isdecimal())
        return bool(digits) and int(digits) <= 1
    return False


def _extract_yt_initial_data(html: str):
    """Parse the ``ytInitialData`` JSON object embedded in a YouTube page.

    Raises:
        RuntimeError: the ``var ytInitialData = `` marker is absent.
        json.JSONDecodeError: the text after the marker is not valid JSON.
    """
    marker = 'var ytInitialData = '
    start = html.find(marker)
    if start < 0:
        raise RuntimeError('ytInitialData not found in YouTube search HTML')
    start += len(marker)
    while start < len(html) and html[start].isspace():
        start += 1
    # raw_decode parses exactly one JSON value and ignores what follows, so a
    # ';' inside a string value cannot truncate the document the way a search
    # for the first ';' would.
    data, _ = json.JSONDecoder().raw_decode(html, start)
    return data


def _collect_video_renderers(node, found: list) -> None:
    """Append every truthy ``videoRenderer`` value under *node* to *found*.

    Traversal is depth-first in document order and also descends into the
    renderers themselves.
    """
    if isinstance(node, dict):
        renderer = node.get('videoRenderer')
        if renderer:
            found.append(renderer)
        for value in node.values():
            _collect_video_renderers(value, found)
    elif isinstance(node, list):
        for value in node:
            _collect_video_renderers(value, found)


def _entries_from_initial_data(data) -> list[dict]:
    """Turn parsed ``ytInitialData`` into at most FETCH_LIMIT feed entries.

    Keeps only videos owned by CHANNEL_NAME whose title contains
    TITLE_FILTER, that have a video id not already emitted, and whose
    published label passes `_is_recent_published_text`.
    """
    renderers: list = []
    _collect_video_renderers(data, renderers)

    entries = []
    seen_ids = set()
    for vr in renderers:
        if not isinstance(vr, dict):
            continue
        owner = ''.join(run.get('text', '') for run in vr.get('ownerText', {}).get('runs', []))
        title = ''.join(run.get('text', '') for run in vr.get('title', {}).get('runs', []))
        vid = vr.get('videoId', '')
        published = vr.get('publishedTimeText', {}).get('simpleText', '')
        if owner != CHANNEL_NAME or TITLE_FILTER not in title or not vid or vid in seen_ids:
            continue
        if not _is_recent_published_text(published):
            continue
        seen_ids.add(vid)
        entries.append({
            'video_id': vid,
            'title': title.strip(),
            'published': published,
            'url': f'https://www.youtube.com/watch?v={vid}',
        })
        if len(entries) >= FETCH_LIMIT:
            break
    return entries


def _search_results_fallback() -> list[dict]:
    """Scrape the YouTube search page for the channel's matching videos.

    Called by `fetch_feed` when the feed URL answers 404. Returns entries
    shaped like those of `fetch_feed`; here 'published' is the relative
    label shown on the search page.
    """
    query = urllib.parse.quote(f'{CHANNEL_NAME} {TITLE_FILTER}')
    url = SEARCH_URL.format(query=query)
    req = urllib.request.Request(url, headers={'User-Agent': FEED_USER_AGENT})
    with urllib.request.urlopen(req, timeout=30) as r:
        html = r.read().decode('utf-8', 'ignore')
    return _entries_from_initial_data(_extract_yt_initial_data(html))


def _parse_feed_xml(xml_text: str) -> list[dict]:
    """Convert the channel's Atom feed XML into a list of entry dicts.

    Entries without a ``yt:videoId`` are skipped. The URL is the href of the
    last ``rel="alternate"`` link, or the standard watch URL when absent.
    """
    root = ET.fromstring(xml_text)
    ns = {'a': 'http://www.w3.org/2005/Atom', 'yt': 'http://www.youtube.com/xml/schemas/2015'}
    entries = []
    for entry in root.findall('a:entry', ns):
        vid = entry.findtext('yt:videoId', default='', namespaces=ns)
        title = entry.findtext('a:title', default='', namespaces=ns)
        published = entry.findtext('a:published', default='', namespaces=ns)
        link = ''
        for link_el in entry.findall('a:link', ns):
            if link_el.attrib.get('rel') == 'alternate':
                link = link_el.attrib.get('href', '')
        if vid:
            entries.append({
                'video_id': vid,
                'title': title.strip(),
                'published': published,
                'url': link or f'https://www.youtube.com/watch?v={vid}',
            })
    return entries


def fetch_feed() -> list[dict]:
    """Download and parse the channel feed, retrying transient failures.

    HTTP 404 switches to `_search_results_fallback`. HTTP statuses listed in
    FEED_RETRY_STATUS, URL errors and timeouts are retried up to
    FEED_RETRY_ATTEMPTS times with a linearly growing sleep; any other HTTP
    error, or the last failed attempt, is re-raised.
    """
    req = urllib.request.Request(FEED_URL, headers={'User-Agent': FEED_USER_AGENT})
    xml_text = None
    last_err: Exception | None = None
    for attempt in range(1, FEED_RETRY_ATTEMPTS + 1):
        try:
            with urllib.request.urlopen(req, timeout=30) as r:
                xml_text = r.read().decode('utf-8', 'ignore')
            break
        except urllib.error.HTTPError as e:
            last_err = e
            if e.code == 404:
                return _search_results_fallback()
            if e.code not in FEED_RETRY_STATUS or attempt == FEED_RETRY_ATTEMPTS:
                raise
        except (urllib.error.URLError, TimeoutError) as e:
            last_err = e
            if attempt == FEED_RETRY_ATTEMPTS:
                raise
        # Only reached after a retryable failure.
        time.sleep(FEED_RETRY_BACKOFF_SEC * attempt)

    if xml_text is None:
        raise last_err or RuntimeError('feed fetch failed without exception')
    return _parse_feed_xml(xml_text)


def fetch_transcript(video_id: str) -> tuple[str, str]:
    """Fetch a video's transcript as ``(text, status)``.

    The text is capped at TRANSCRIPT_CHAR_LIMIT characters plus a truncation
    marker. Status is 'ok', 'unavailable', 'unavailable: <ExceptionName>', or
    an 'error: ...' message when youtube_transcript_api cannot be imported.
    This function never raises.
    """
    try:
        from youtube_transcript_api import YouTubeTranscriptApi
    except Exception as e:
        return '', f'error: youtube_transcript_api import 실패 ({e})'
    try:
        api = YouTubeTranscriptApi()
        fetched = api.fetch(video_id, languages=TRANSCRIPT_LANGS)
        parts = []
        for snippet in fetched:
            # Snippets are read as objects with .text, falling back to dicts.
            text = getattr(snippet, 'text', None)
            if text is None and isinstance(snippet, dict):
                text = snippet.get('text', '')
            if text:
                parts.append(text.strip())
        full = ' '.join(parts).strip()
        if not full:
            return '', 'unavailable'
        if len(full) > TRANSCRIPT_CHAR_LIMIT:
            full = full[:TRANSCRIPT_CHAR_LIMIT] + '... [자막 일부 생략]'
        return full, 'ok'
    except Exception as e:
        return '', f'unavailable: {type(e).__name__}'


def cmd_fetch() -> int:
    """`fetch`: find unseen matching videos, cache them, print lean metadata.

    The feed order is reversed before at most FETCH_LIMIT items are taken.
    Transcripts are written to FETCH_CACHE but left out of the printed JSON.
    """
    seen = set(load_json(SEEN_STATE, {'seen': []}).get('seen', []))
    entries = fetch_feed()
    matched = [
        e for e in entries
        if TITLE_FILTER in e['title'] and e['video_id'] not in seen
    ]
    matched = list(reversed(matched))[:FETCH_LIMIT]
    for item in matched:
        text, status = fetch_transcript(item['video_id'])
        item['transcript'] = text
        item['transcript_status'] = status

    save_json(FETCH_CACHE, {'fetched_at': datetime.now(KST).isoformat(), 'items': matched})

    lean = [
        {k: v for k, v in item.items() if k != 'transcript'}
        for item in matched
    ]
    print(json.dumps(lean, ensure_ascii=False, indent=2))
    return 0


def _find_cached_item(video_id: str) -> dict | None:
    """Return the fetch-cache item whose 'video_id' matches, or None."""
    cache = load_json(FETCH_CACHE, {})
    for item in cache.get('items', []):
        if item.get('video_id') == video_id:
            return item
    return None


def cmd_transcript(video_id: str) -> int:
    """`transcript`: print one cached transcript; return 1 when not cached."""
    item = _find_cached_item(video_id)
    if item is None:
        print(f'video_id "{video_id}" not found in fetch cache', file=sys.stderr)
        return 1
    status = item.get('transcript_status', 'unknown')
    text = item.get('transcript', '') or ''
    print(f'transcript_status: {status}')
    print('---')
    print(text)
    return 0


def load_cached_video(video_id: str) -> dict:
    """Return id/title/url/published for a cached video, or just ``{'id': ...}``."""
    item = _find_cached_item(video_id)
    if item is None:
        return {'id': video_id}
    return {
        'id': item.get('video_id'),
        'title': item.get('title'),
        'url': item.get('url'),
        'published': item.get('published'),
    }


def mark_seen(video_id: str):
    """Record *video_id* at the front of the seen list, capped at SEEN_CAP."""
    state = load_json(SEEN_STATE, {'seen': []})
    seen = state.get('seen', [])
    if video_id not in seen:
        seen.insert(0, video_id)
        state['seen'] = seen[:SEEN_CAP]
        save_json(SEEN_STATE, state)


def _reclassify_stop_above_buy(entry: dict, stock: str) -> None:
    """Enforce the long-only assumption that stop must be below buy.

    When ``stop.value >= buy.primary`` the stop is moved into the entry's
    notes (as the first note) and ``entry['stop']`` is set to None. *entry*
    is modified in place; nothing happens unless both values are numeric.
    """
    buy = entry.get('buy') or {}
    stop = entry.get('stop') or {}
    if not isinstance(buy, dict) or not isinstance(stop, dict):
        return
    buy_primary = buy.get('primary')
    stop_value = stop.get('value')
    if not isinstance(buy_primary, (int, float)) or not isinstance(stop_value, (int, float)):
        return
    if stop_value < buy_primary:
        return
    stop_raw = stop.get('raw') or f'{stop_value}'
    notes = list(entry.get('notes') or [])
    notes.insert(0, f"(원문: '{stop_raw}' — 매입가 {buy_primary}원보다 높아 손절가 대신 지지선으로 재분류됨)")
    entry['notes'] = notes
    entry['stop'] = None
    print(
        f"[guard] {stock}: stop({stop_value}) >= buy.primary({buy_primary}) — 재분류 후 stop=null로 저장",
        file=sys.stderr,
    )


def cmd_save(video_id: str) -> int:
    """`save`: store the analysis JSON from stdin under its stock name.

    Returns 2 for empty input, invalid JSON, a non-object JSON root, or a
    missing "stock" field; otherwise 0. An existing entry whose status is
    'pending_delete' is left untouched, but the video is still marked seen.
    """
    raw = sys.stdin.read().strip()
    if not raw:
        print('empty stdin', file=sys.stderr)
        return 2
    try:
        analysis = json.loads(raw)
    except Exception as e:
        print(f'invalid JSON: {e}', file=sys.stderr)
        return 2
    if not isinstance(analysis, dict):
        print('JSON root must be an object', file=sys.stderr)
        return 2

    stock = str(analysis.get('stock') or '').strip()
    if not stock:
        print('missing "stock" field', file=sys.stderr)
        return 2

    # Inject the stock code automatically (used by Kiwoom-based monitoring).
    # On failure the code stays empty; show/email still work by name alone.
    code = str(analysis.get('code') or '').strip()
    if not code:
        try:
            code = _get_kiwoom_client().resolve_stock_code(stock).get('code', '')
        except Exception as e:
            print(f'[warn] {stock} 종목코드 매핑 실패: {e}', file=sys.stderr)

    entry = {
        'stock': stock,
        'code': code,
        'target': analysis.get('target'),
        'buy': analysis.get('buy'),
        'stop': analysis.get('stop'),
        'upside_pct': analysis.get('upside_pct'),
        'summary': analysis.get('summary', []),
        'notes': analysis.get('notes', []),
        'video': analysis.get('video') or load_cached_video(video_id),
        'saved_at': datetime.now(KST).isoformat(),
    }
    _reclassify_stop_above_buy(entry, stock)

    # Both state files are updated under the lock so that parallel `save`
    # processes cannot interleave their read-modify-write cycles.
    with watchlist_lock():
        watchlist = load_json(WATCHLIST, {})
        existing = watchlist.get(stock)
        if isinstance(existing, dict) and existing.get('status') == 'pending_delete':
            mark_seen(video_id)
            print(f'skipped {stock} — pending_delete (video={video_id})', file=sys.stderr)
            return 0
        watchlist[stock] = entry
        save_json(WATCHLIST, watchlist)
        mark_seen(video_id)
    print(f'saved {stock} (video={video_id})')
    return 0


def format_price_line(label: str, field) -> str:
    """Return '<label>: <raw>' or '<label>: 언급 없음' when no raw text exists.

    *field* is expected to be a dict with a 'raw' entry; anything else,
    including a missing, null or blank 'raw', yields the "not mentioned" form.
    """
    if not field or not isinstance(field, dict):
        return f'{label}: 언급 없음'
    raw = str(field.get('raw') or '').strip()
    return f'{label}: {raw}' if raw else f'{label}: 언급 없음'


def _get_kiwoom_client():
    """Import and return the `kiwoom_client` module lazily.

    Deferred so the other subcommands keep working in test environments
    that have no Kiwoom credentials.
    """
    scripts_dir = str(WORKSPACE / 'scripts')
    if scripts_dir not in sys.path:
        sys.path.insert(0, scripts_dir)
    import kiwoom_client  # type: ignore
    return kiwoom_client


def fetch_current_price(stock_name_or_code: str, buy_price: float | int | None = None) -> str | None:
    """Return the current price from Kiwoom ka10001 as display text.

    When *buy_price* is a positive number the text also shows the absolute
    and percentage change against it. Returns None on any failure, or when
    the quote carries no price.
    """
    if not stock_name_or_code:
        return None
    try:
        kc = _get_kiwoom_client()
        info = kc.resolve_stock_code(stock_name_or_code)
        q = kc.get_stock_quote(info['code'])
        price = q.get('price', 0)
        if not price:
            return None
        if isinstance(buy_price, (int, float)) and buy_price > 0:
            diff = price - buy_price
            pct = (diff / buy_price) * 100
            return f'{price:,}원 ({diff:+,.0f}원, {pct:+.2f}% vs 매입가)'
        return f'{price:,}원'
    except Exception:
        # Deliberate best effort: callers render a "lookup failed" placeholder.
        return None


def _buy_primary(entry: dict) -> float | None:
    """Return ``entry['buy']['primary']`` when it is numeric, else None."""
    buy = entry.get('buy')
    if isinstance(buy, dict):
        v = buy.get('primary')
        if isinstance(v, (int, float)):
            return v
    return None


def format_entry_block(entry: dict) -> str:
    """Render one watchlist entry as a plain-text report block.

    Sections in order: header, target/buy/stop lines, current price, summary
    bullets, optional notes, and the source video when it has a URL.
    """
    stock = entry.get('stock', '')
    lines = [f'[종목분석] #{stock}', '━━━━━━━━━━']

    target_line = format_price_line('목표가', entry.get('target'))
    upside = entry.get('upside_pct')
    # Only a numeric upside can be compared and formatted with ':g'.
    if isinstance(upside, (int, float)) and '언급 없음' not in target_line:
        sign = '+' if upside >= 0 else ''
        target_line = f'{target_line} ({sign}{upside:g}%)'
    lines.append(target_line)
    lines.append(format_price_line('매입가', entry.get('buy')))
    lines.append(format_price_line('손절가', entry.get('stop')))

    current = fetch_current_price(entry.get('code') or stock, _buy_primary(entry))
    lines.append(f'현재가: {current}' if current else '현재가: 조회 불가')

    lines.append('')
    lines.append('주요내용')
    for b in entry.get('summary') or []:
        lines.append(f'- {b}')

    if entry.get('notes'):
        lines.append('')
        lines.append('기타')
        for b in entry['notes']:
            lines.append(f'- {b}')

    video = entry.get('video') or {}
    if video.get('url'):
        lines.append('')
        # A cached video may carry title=None; print nothing rather than "None".
        lines.append(f"출처: {video.get('title') or ''} {video['url']}")
    return '\n'.join(lines)


def _format_current_html(current: str | None) -> str:
    """Convert `fetch_current_price` output into colour-highlighted HTML.

    The parenthesised change against the buy price is wrapped in a span
    styled with HTML_STYLES['pos'] when it starts with '+', otherwise with
    HTML_STYLES['neg']. Text without that part is only escaped.
    """
    if not current:
        return '조회 불가'
    if 'vs 매입가' in current and '(' in current:
        head, _, tail = current.partition('(')
        inner, _, _ = tail.partition(')')
        color = HTML_STYLES['pos'] if inner.strip().startswith('+') else HTML_STYLES['neg']
        return f'{html_escape(head.strip())} <span style="{color}">({html_escape(inner)})</span>'
    return html_escape(current)


# NOTE(review): `format_entry_html_block(entry: dict) -> str` starts at this
# point of the file, but everything after its first statements is garbled in
# the reviewed text (the HTML inside its f-strings is missing), so it is not
# rewritten here and must be restored from version control. Its visible
# opening statements were:
#     S = HTML_STYLES
#     stock = entry.get('stock', '')
#     current = fetch_current_price(entry.get('code') or stock, _buy_primary(entry))
#     parts = [f'...
| {html_escape(label)} | ' f'{value} |