549545bde6
설정·스크립트·스킬·문서·큐레이션 메모리 추적. 시크릿(credentials/identity)·런타임 상태(state/logs/sessions/sqlite)· 백업(clobbered/bak)·dream 캐시는 .gitignore로 제외. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
847 lines
31 KiB
Python
Executable File
847 lines
31 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""@비하이브투자자문 신규 '종목분석' 영상 감지 + 자막 수집 + watchlist 저장 + 이메일/텔레그램 발송 + 조회.
|
|
|
|
Subcommands:
|
|
fetch — 신규 매칭 영상 메타데이터 JSON 출력(자막 제외) + 자막은 fetch 캐시에만 저장
|
|
transcript VIDEO_ID — fetch 캐시에서 특정 영상의 자막만 꺼내 출력 (토큰 절감용)
|
|
save VIDEO_ID — stdin JSON 분석 데이터를 watchlist에 저장하고 seen 기록
|
|
add STOCK ... — 수동 watchlist 추가 (--code, --buy, --target, --stop, --note)
|
|
email VIDEO_IDS... — watchlist에서 해당 영상들을 읽어 단일 이메일 발송
|
|
notify VIDEO_IDS... — 레이 텔레그램으로 "N개 보고서 제출 (종목명들)" 요약 발송
|
|
list — watchlist 전체 종목 요약 테이블 출력
|
|
show STOCK — 특정 종목의 상세 분석 내용 출력
|
|
remove STOCK — watchlist에서 특정 종목 제거
|
|
|
|
현재가 조회는 키움 ka10001(`kiwoom_client.get_stock_quote`) 사용.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
import urllib.error
|
|
import urllib.parse
|
|
import urllib.request
|
|
import xml.etree.ElementTree as ET
|
|
from datetime import datetime, timezone, timedelta
|
|
from pathlib import Path
|
|
|
|
KST = timezone(timedelta(hours=9))
|
|
|
|
FEED_USER_AGENT = (
|
|
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) '
|
|
'AppleWebKit/537.36 (KHTML, like Gecko) '
|
|
'Chrome/122.0.0.0 Safari/537.36'
|
|
)
|
|
FEED_RETRY_ATTEMPTS = 4
|
|
FEED_RETRY_BACKOFF_SEC = 3.0
|
|
FEED_RETRY_STATUS = {408, 429, 500, 502, 503, 504}
|
|
|
|
CHANNEL_ID = 'UCHTRF5r154igU2gXjudUMzg'
|
|
CHANNEL_NAME = '비하이브 투자자문'
|
|
FEED_URL = f'https://www.youtube.com/feeds/videos.xml?channel_id={CHANNEL_ID}'
|
|
SEARCH_URL = 'https://www.youtube.com/results?search_query={query}&sp=CAISAhAB'
|
|
TITLE_FILTER = '종목분석'
|
|
TRANSCRIPT_LANGS = ['ko', 'ko-KR', 'en']
|
|
TRANSCRIPT_CHAR_LIMIT = 8000
|
|
FETCH_LIMIT = 10
|
|
SEEN_CAP = 200
|
|
|
|
WORKSPACE = Path('/Users/snowoyh/.openclaw/agents/stock/workspace')
|
|
STATE_DIR = WORKSPACE / 'state'
|
|
STATE_DIR.mkdir(parents=True, exist_ok=True)
|
|
SEEN_STATE = STATE_DIR / 'behive_youtube_seen.json'
|
|
FETCH_CACHE = STATE_DIR / 'behive_last_fetch.json'
|
|
WATCHLIST = STATE_DIR / 'behive_watchlist.json'
|
|
|
|
CONFIG_PATH = Path('/Users/snowoyh/.openclaw/openclaw.json')
|
|
TELEGRAM_ACCOUNT = 'stock'
|
|
EMAIL_RECIPIENT = 'mini.snowoyh@gmail.com'
|
|
|
|
HTML_STYLES = {
|
|
'wrap': 'font-family:-apple-system,BlinkMacSystemFont,"Apple SD Gothic Neo",sans-serif;font-size:14px;color:#222;max-width:680px;line-height:1.55;',
|
|
'greet': 'color:#444;margin-bottom:18px;',
|
|
'card': 'border:1px solid #e5e5e5;border-radius:8px;padding:16px 18px;margin-bottom:18px;background:#fafafa;',
|
|
'card_title': 'font-size:16px;font-weight:700;color:#111;margin-bottom:4px;',
|
|
'card_meta': 'color:#666;font-size:12px;font-weight:400;margin-left:6px;',
|
|
'price_table': 'width:100%;border-collapse:collapse;background:#fff;border:1px solid #eee;border-radius:6px;margin:10px 0 14px;',
|
|
'price_label': 'color:#666;padding:6px 10px;width:80px;font-size:13px;border-bottom:1px solid #f0f0f0;',
|
|
'price_value': 'color:#111;padding:6px 10px;font-size:13px;font-weight:500;border-bottom:1px solid #f0f0f0;',
|
|
'section': 'font-size:13px;font-weight:600;color:#333;margin:10px 0 6px;border-left:3px solid #333;padding-left:8px;',
|
|
'bullet_list': 'margin:4px 0 8px 0;padding-left:20px;color:#333;font-size:13px;',
|
|
'bullet_item': 'margin:3px 0;line-height:1.5;',
|
|
'footer': 'color:#888;font-size:12px;margin-top:10px;border-top:1px dashed #ddd;padding-top:8px;',
|
|
'link': 'color:#0066cc;text-decoration:none;',
|
|
'pos': 'color:#d24f4f;font-weight:600;',
|
|
'neg': 'color:#1565c0;font-weight:600;',
|
|
}
|
|
|
|
|
|
def html_escape(s) -> str:
|
|
return (str(s)
|
|
.replace('&', '&')
|
|
.replace('<', '<')
|
|
.replace('>', '>')
|
|
.replace('"', '"')
|
|
.replace("'", '''))
|
|
|
|
|
|
def load_json(path: Path, default):
|
|
if path.exists():
|
|
try:
|
|
return json.loads(path.read_text())
|
|
except Exception:
|
|
return default
|
|
return default
|
|
|
|
|
|
def save_json(path: Path, data):
|
|
"""tmp + rename으로 원자적 저장. 부분 쓰기 방지."""
|
|
tmp = path.with_suffix(path.suffix + '.tmp')
|
|
tmp.write_text(json.dumps(data, ensure_ascii=False, indent=2))
|
|
tmp.replace(path)
|
|
|
|
|
|
import fcntl as _fcntl
|
|
from contextlib import contextmanager as _contextmanager
|
|
|
|
|
|
@_contextmanager
|
|
def watchlist_lock():
|
|
"""프로세스간 watchlist read-modify-write 직렬화.
|
|
2026-04-27 비하이브 cron이 7개 save를 병렬 호출해 8종목 유실 사고 후 도입.
|
|
fcntl.flock(LOCK_EX)로 같은 머신 내 모든 프로세스가 큐잉됨.
|
|
"""
|
|
lock_path = WATCHLIST.with_suffix(WATCHLIST.suffix + '.lock')
|
|
lock_path.parent.mkdir(parents=True, exist_ok=True)
|
|
f = open(lock_path, 'a')
|
|
try:
|
|
_fcntl.flock(f.fileno(), _fcntl.LOCK_EX)
|
|
yield
|
|
finally:
|
|
try:
|
|
_fcntl.flock(f.fileno(), _fcntl.LOCK_UN)
|
|
finally:
|
|
f.close()
|
|
|
|
|
|
def _is_recent_published_text(text: str) -> bool:
|
|
if not text:
|
|
return True
|
|
compact = text.replace(' ', '')
|
|
digits = ''.join(ch for ch in compact if ch.isdigit())
|
|
value = int(digits) if digits else None
|
|
if '분전' in compact or '시간전' in compact or '일전' in compact:
|
|
return True
|
|
if '주전' in compact:
|
|
return value is not None and value <= 1
|
|
return False
|
|
|
|
|
|
def _search_results_fallback() -> list[dict]:
|
|
query = urllib.parse.quote(f'{CHANNEL_NAME} {TITLE_FILTER}')
|
|
url = SEARCH_URL.format(query=query)
|
|
req = urllib.request.Request(url, headers={'User-Agent': FEED_USER_AGENT})
|
|
with urllib.request.urlopen(req, timeout=30) as r:
|
|
html = r.read().decode('utf-8', 'ignore')
|
|
|
|
marker = 'var ytInitialData = '
|
|
start = html.find(marker)
|
|
if start < 0:
|
|
raise RuntimeError('ytInitialData not found in YouTube search HTML')
|
|
start += len(marker)
|
|
end = html.find(';</script>', start)
|
|
if end < 0:
|
|
raise RuntimeError('ytInitialData terminator not found in YouTube search HTML')
|
|
data = json.loads(html[start:end])
|
|
|
|
renderers = []
|
|
|
|
def walk(node):
|
|
if isinstance(node, dict):
|
|
vr = node.get('videoRenderer')
|
|
if vr:
|
|
renderers.append(vr)
|
|
for value in node.values():
|
|
walk(value)
|
|
elif isinstance(node, list):
|
|
for value in node:
|
|
walk(value)
|
|
|
|
walk(data)
|
|
|
|
entries = []
|
|
seen_ids = set()
|
|
for vr in renderers:
|
|
owner = ''.join(run.get('text', '') for run in vr.get('ownerText', {}).get('runs', []))
|
|
title = ''.join(run.get('text', '') for run in vr.get('title', {}).get('runs', []))
|
|
vid = vr.get('videoId', '')
|
|
published = vr.get('publishedTimeText', {}).get('simpleText', '')
|
|
if owner != CHANNEL_NAME or TITLE_FILTER not in title or not vid or vid in seen_ids:
|
|
continue
|
|
if not _is_recent_published_text(published):
|
|
continue
|
|
seen_ids.add(vid)
|
|
entries.append({
|
|
'video_id': vid,
|
|
'title': title.strip(),
|
|
'published': published,
|
|
'url': f'https://www.youtube.com/watch?v={vid}',
|
|
})
|
|
if len(entries) >= FETCH_LIMIT:
|
|
break
|
|
return entries
|
|
|
|
|
|
def fetch_feed() -> list[dict]:
|
|
req = urllib.request.Request(FEED_URL, headers={'User-Agent': FEED_USER_AGENT})
|
|
xml_text = None
|
|
last_err: Exception | None = None
|
|
for attempt in range(1, FEED_RETRY_ATTEMPTS + 1):
|
|
try:
|
|
with urllib.request.urlopen(req, timeout=30) as r:
|
|
xml_text = r.read().decode('utf-8', 'ignore')
|
|
break
|
|
except urllib.error.HTTPError as e:
|
|
last_err = e
|
|
if e.code == 404:
|
|
return _search_results_fallback()
|
|
if e.code not in FEED_RETRY_STATUS or attempt == FEED_RETRY_ATTEMPTS:
|
|
raise
|
|
except (urllib.error.URLError, TimeoutError) as e:
|
|
last_err = e
|
|
if attempt == FEED_RETRY_ATTEMPTS:
|
|
raise
|
|
time.sleep(FEED_RETRY_BACKOFF_SEC * attempt)
|
|
if xml_text is None:
|
|
raise last_err or RuntimeError('feed fetch failed without exception')
|
|
root = ET.fromstring(xml_text)
|
|
ns = {'a': 'http://www.w3.org/2005/Atom', 'yt': 'http://www.youtube.com/xml/schemas/2015'}
|
|
entries = []
|
|
for entry in root.findall('a:entry', ns):
|
|
vid = entry.findtext('yt:videoId', default='', namespaces=ns)
|
|
title = entry.findtext('a:title', default='', namespaces=ns)
|
|
published = entry.findtext('a:published', default='', namespaces=ns)
|
|
link = ''
|
|
for l in entry.findall('a:link', ns):
|
|
if l.attrib.get('rel') == 'alternate':
|
|
link = l.attrib.get('href', '')
|
|
if vid:
|
|
entries.append({
|
|
'video_id': vid,
|
|
'title': title.strip(),
|
|
'published': published,
|
|
'url': link or f'https://www.youtube.com/watch?v={vid}',
|
|
})
|
|
return entries
|
|
|
|
|
|
def fetch_transcript(video_id: str) -> tuple[str, str]:
|
|
try:
|
|
from youtube_transcript_api import YouTubeTranscriptApi
|
|
except Exception as e:
|
|
return '', f'error: youtube_transcript_api import 실패 ({e})'
|
|
try:
|
|
api = YouTubeTranscriptApi()
|
|
fetched = api.fetch(video_id, languages=TRANSCRIPT_LANGS)
|
|
parts = []
|
|
for snippet in fetched:
|
|
text = getattr(snippet, 'text', None)
|
|
if text is None and isinstance(snippet, dict):
|
|
text = snippet.get('text', '')
|
|
if text:
|
|
parts.append(text.strip())
|
|
full = ' '.join(parts).strip()
|
|
if not full:
|
|
return '', 'unavailable'
|
|
if len(full) > TRANSCRIPT_CHAR_LIMIT:
|
|
full = full[:TRANSCRIPT_CHAR_LIMIT] + '... [자막 일부 생략]'
|
|
return full, 'ok'
|
|
except Exception as e:
|
|
return '', f'unavailable: {type(e).__name__}'
|
|
|
|
|
|
def cmd_fetch() -> int:
|
|
seen = set(load_json(SEEN_STATE, {'seen': []}).get('seen', []))
|
|
entries = fetch_feed()
|
|
matched = [
|
|
e for e in entries
|
|
if TITLE_FILTER in e['title'] and e['video_id'] not in seen
|
|
]
|
|
matched = list(reversed(matched))[:FETCH_LIMIT]
|
|
for item in matched:
|
|
text, status = fetch_transcript(item['video_id'])
|
|
item['transcript'] = text
|
|
item['transcript_status'] = status
|
|
save_json(FETCH_CACHE, {'fetched_at': datetime.now(KST).isoformat(), 'items': matched})
|
|
lean = [
|
|
{k: v for k, v in item.items() if k != 'transcript'}
|
|
for item in matched
|
|
]
|
|
print(json.dumps(lean, ensure_ascii=False, indent=2))
|
|
return 0
|
|
|
|
|
|
def cmd_transcript(video_id: str) -> int:
|
|
cache = load_json(FETCH_CACHE, {})
|
|
for item in cache.get('items', []):
|
|
if item.get('video_id') == video_id:
|
|
status = item.get('transcript_status', 'unknown')
|
|
text = item.get('transcript', '') or ''
|
|
print(f'transcript_status: {status}')
|
|
print('---')
|
|
print(text)
|
|
return 0
|
|
print(f'video_id "{video_id}" not found in fetch cache', file=sys.stderr)
|
|
return 1
|
|
|
|
|
|
def load_cached_video(video_id: str) -> dict:
|
|
cache = load_json(FETCH_CACHE, {})
|
|
for item in cache.get('items', []):
|
|
if item.get('video_id') == video_id:
|
|
return {
|
|
'id': item.get('video_id'),
|
|
'title': item.get('title'),
|
|
'url': item.get('url'),
|
|
'published': item.get('published'),
|
|
}
|
|
return {'id': video_id}
|
|
|
|
|
|
def mark_seen(video_id: str):
|
|
state = load_json(SEEN_STATE, {'seen': []})
|
|
seen = state.get('seen', [])
|
|
if video_id not in seen:
|
|
seen.insert(0, video_id)
|
|
state['seen'] = seen[:SEEN_CAP]
|
|
save_json(SEEN_STATE, state)
|
|
|
|
|
|
def _reclassify_stop_above_buy(entry: dict, stock: str) -> None:
|
|
"""Long-only 가정: stop은 buy보다 낮아야 함. 어기면 notes로 이동시키고 stop을 null화."""
|
|
buy = entry.get('buy') or {}
|
|
stop = entry.get('stop') or {}
|
|
if not isinstance(buy, dict) or not isinstance(stop, dict):
|
|
return
|
|
buy_primary = buy.get('primary')
|
|
stop_value = stop.get('value')
|
|
if not isinstance(buy_primary, (int, float)) or not isinstance(stop_value, (int, float)):
|
|
return
|
|
if stop_value < buy_primary:
|
|
return
|
|
stop_raw = stop.get('raw') or f'{stop_value}'
|
|
notes = list(entry.get('notes') or [])
|
|
notes.insert(0, f"(원문: '{stop_raw}' — 매입가 {buy_primary}원보다 높아 손절가 대신 지지선으로 재분류됨)")
|
|
entry['notes'] = notes
|
|
entry['stop'] = None
|
|
print(
|
|
f"[guard] {stock}: stop({stop_value}) >= buy.primary({buy_primary}) — 재분류 후 stop=null로 저장",
|
|
file=sys.stderr,
|
|
)
|
|
|
|
|
|
def cmd_save(video_id: str) -> int:
|
|
raw = sys.stdin.read().strip()
|
|
if not raw:
|
|
print('empty stdin', file=sys.stderr)
|
|
return 2
|
|
try:
|
|
analysis = json.loads(raw)
|
|
except Exception as e:
|
|
print(f'invalid JSON: {e}', file=sys.stderr)
|
|
return 2
|
|
stock = analysis.get('stock', '').strip()
|
|
if not stock:
|
|
print('missing "stock" field', file=sys.stderr)
|
|
return 2
|
|
# 종목코드 자동 주입 (키움 기반 모니터링용). 실패 시 빈 문자열 — show/email은 여전히 종목명만으로 동작.
|
|
code = (analysis.get('code') or '').strip()
|
|
if not code:
|
|
try:
|
|
code = _get_kiwoom_client().resolve_stock_code(stock).get('code', '')
|
|
except Exception as e:
|
|
print(f'[warn] {stock} 종목코드 매핑 실패: {e}', file=sys.stderr)
|
|
entry = {
|
|
'stock': stock,
|
|
'code': code,
|
|
'target': analysis.get('target'),
|
|
'buy': analysis.get('buy'),
|
|
'stop': analysis.get('stop'),
|
|
'upside_pct': analysis.get('upside_pct'),
|
|
'summary': analysis.get('summary', []),
|
|
'notes': analysis.get('notes', []),
|
|
'video': analysis.get('video') or load_cached_video(video_id),
|
|
'saved_at': datetime.now(KST).isoformat(),
|
|
}
|
|
_reclassify_stop_above_buy(entry, stock)
|
|
with watchlist_lock():
|
|
watchlist = load_json(WATCHLIST, {})
|
|
existing = watchlist.get(stock)
|
|
if isinstance(existing, dict) and existing.get('status') == 'pending_delete':
|
|
mark_seen(video_id)
|
|
print(f'skipped {stock} — pending_delete (video={video_id})', file=sys.stderr)
|
|
return 0
|
|
watchlist[stock] = entry
|
|
save_json(WATCHLIST, watchlist)
|
|
mark_seen(video_id)
|
|
print(f'saved {stock} (video={video_id})')
|
|
return 0
|
|
|
|
|
|
def format_price_line(label: str, field) -> str:
|
|
if not field or not isinstance(field, dict):
|
|
return f'{label}: 언급 없음'
|
|
raw = field.get('raw', '').strip()
|
|
return f'{label}: {raw}' if raw else f'{label}: 언급 없음'
|
|
|
|
|
|
def _get_kiwoom_client():
|
|
"""Lazy import — 테스트 환경에서 kiwoom 자격증명 없을 때 다른 서브커맨드는 살아있도록."""
|
|
sys.path.insert(0, str(WORKSPACE / 'scripts'))
|
|
import kiwoom_client # type: ignore
|
|
return kiwoom_client
|
|
|
|
|
|
def fetch_current_price(stock_name_or_code: str, buy_price: float | int | None = None) -> str | None:
|
|
"""키움 ka10001 현재가. buy_price 주어지면 매입가 대비 등락 표시. 실패 시 None."""
|
|
if not stock_name_or_code:
|
|
return None
|
|
try:
|
|
kc = _get_kiwoom_client()
|
|
info = kc.resolve_stock_code(stock_name_or_code)
|
|
q = kc.get_stock_quote(info['code'])
|
|
price = q.get('price', 0)
|
|
if not price:
|
|
return None
|
|
if isinstance(buy_price, (int, float)) and buy_price > 0:
|
|
diff = price - buy_price
|
|
pct = (diff / buy_price) * 100
|
|
return f'{price:,}원 ({diff:+,.0f}원, {pct:+.2f}% vs 매입가)'
|
|
return f'{price:,}원'
|
|
except Exception:
|
|
return None
|
|
|
|
|
|
def _buy_primary(entry: dict) -> float | None:
|
|
buy = entry.get('buy')
|
|
if isinstance(buy, dict):
|
|
v = buy.get('primary')
|
|
if isinstance(v, (int, float)):
|
|
return v
|
|
return None
|
|
|
|
|
|
def format_entry_block(entry: dict) -> str:
|
|
stock = entry.get('stock', '')
|
|
lines = [f'[종목분석] #{stock}', '━━━━━━━━━━']
|
|
target_line = format_price_line('목표가', entry.get('target'))
|
|
upside = entry.get('upside_pct')
|
|
if upside is not None and '언급 없음' not in target_line:
|
|
sign = '+' if upside >= 0 else ''
|
|
target_line = f'{target_line} ({sign}{upside:g}%)'
|
|
lines.append(target_line)
|
|
lines.append(format_price_line('매입가', entry.get('buy')))
|
|
lines.append(format_price_line('손절가', entry.get('stop')))
|
|
current = fetch_current_price(entry.get('code') or stock, _buy_primary(entry))
|
|
lines.append(f'현재가: {current}' if current else '현재가: 조회 불가')
|
|
lines.append('')
|
|
lines.append('주요내용')
|
|
for b in entry.get('summary') or []:
|
|
lines.append(f'- {b}')
|
|
if entry.get('notes'):
|
|
lines.append('')
|
|
lines.append('기타')
|
|
for b in entry['notes']:
|
|
lines.append(f'- {b}')
|
|
video = entry.get('video') or {}
|
|
if video.get('url'):
|
|
lines.append('')
|
|
lines.append(f"출처: {video.get('title','')} {video['url']}")
|
|
return '\n'.join(lines)
|
|
|
|
|
|
def _format_current_html(current: str | None) -> str:
|
|
"""fetch_current_price 반환값을 색상 강조 HTML로 변환."""
|
|
if not current:
|
|
return '<span style="color:#888;">조회 불가</span>'
|
|
if 'vs 매입가' in current and '(' in current:
|
|
try:
|
|
head, tail = current.split('(', 1)
|
|
inner, _, _ = tail.partition(')')
|
|
color = HTML_STYLES['pos'] if inner.strip().startswith('+') else HTML_STYLES['neg']
|
|
return f'{html_escape(head.strip())} <span style="{color}">({html_escape(inner)})</span>'
|
|
except Exception:
|
|
pass
|
|
return html_escape(current)
|
|
|
|
|
|
def format_entry_html_block(entry: dict) -> str:
|
|
S = HTML_STYLES
|
|
stock = entry.get('stock', '')
|
|
current = fetch_current_price(entry.get('code') or stock, _buy_primary(entry))
|
|
parts = [f'<div style="{S["card"]}">']
|
|
parts.append(
|
|
f'<div style="{S["card_title"]}">[종목분석] #{html_escape(stock)} '
|
|
f'<span style="{S["card_meta"]}">현재가: {_format_current_html(current)}</span></div>'
|
|
)
|
|
|
|
target_line = format_price_line('목표가', entry.get('target'))
|
|
upside = entry.get('upside_pct')
|
|
target_value = target_line.split(':', 1)[1].strip()
|
|
if upside is not None and '언급 없음' not in target_line:
|
|
sign = '+' if upside >= 0 else ''
|
|
target_value = f'{html_escape(target_value)} <span style="{S["pos"] if upside >= 0 else S["neg"]}">({sign}{upside:g}%)</span>'
|
|
else:
|
|
target_value = html_escape(target_value)
|
|
|
|
rows = [
|
|
('목표가', target_value),
|
|
('매입가', html_escape(format_price_line('매입가', entry.get('buy')).split(':', 1)[1].strip())),
|
|
('손절가', html_escape(format_price_line('손절가', entry.get('stop')).split(':', 1)[1].strip())),
|
|
]
|
|
parts.append(f'<table style="{S["price_table"]}">')
|
|
for label, value in rows:
|
|
parts.append(
|
|
f'<tr><td style="{S["price_label"]}">{html_escape(label)}</td>'
|
|
f'<td style="{S["price_value"]}">{value}</td></tr>'
|
|
)
|
|
parts.append('</table>')
|
|
|
|
summary = entry.get('summary') or []
|
|
if summary:
|
|
parts.append(f'<div style="{S["section"]}">주요내용</div>')
|
|
parts.append(f'<ul style="{S["bullet_list"]}">')
|
|
for b in summary:
|
|
parts.append(f'<li style="{S["bullet_item"]}">{html_escape(b)}</li>')
|
|
parts.append('</ul>')
|
|
|
|
notes = entry.get('notes') or []
|
|
if notes:
|
|
parts.append(f'<div style="{S["section"]}">기타</div>')
|
|
parts.append(f'<ul style="{S["bullet_list"]}">')
|
|
for b in notes:
|
|
parts.append(f'<li style="{S["bullet_item"]}">{html_escape(b)}</li>')
|
|
parts.append('</ul>')
|
|
|
|
video = entry.get('video') or {}
|
|
if video.get('url'):
|
|
title = html_escape(video.get('title', '영상'))
|
|
parts.append(
|
|
f'<div style="{S["footer"]}">출처: '
|
|
f'<a href="{html_escape(video["url"])}" style="{S["link"]}" target="_blank">{title} ↗</a>'
|
|
f'</div>'
|
|
)
|
|
parts.append('</div>')
|
|
return '\n'.join(parts)
|
|
|
|
|
|
def build_email_html(entries: list[dict]) -> str:
|
|
S = HTML_STYLES
|
|
parts = [
|
|
f'<div style="{S["wrap"]}">',
|
|
f'<div style="{S["greet"]}">관리자님, 비하이브투자자문 신규 종목분석 {len(entries)}건을 전달드립니다.</div>',
|
|
]
|
|
for entry in entries:
|
|
parts.append(format_entry_html_block(entry))
|
|
parts.append('</div>')
|
|
return '\n'.join(parts)
|
|
|
|
|
|
def _entries_for_video_ids(video_ids: list[str]) -> tuple[list[dict], list[str]]:
|
|
watchlist = load_json(WATCHLIST, {})
|
|
entries = []
|
|
missing = []
|
|
seen_keys = set()
|
|
for vid in video_ids:
|
|
matches = [
|
|
e for e in watchlist.values()
|
|
if (e.get('video') or {}).get('id') == vid
|
|
]
|
|
if not matches:
|
|
missing.append(vid)
|
|
continue
|
|
for e in matches:
|
|
key = (e.get('stock', ''), (e.get('video') or {}).get('id', ''))
|
|
if key in seen_keys:
|
|
continue
|
|
seen_keys.add(key)
|
|
entries.append(e)
|
|
return entries, missing
|
|
|
|
|
|
|
|
def cmd_email(video_ids: list[str]) -> int:
|
|
if not video_ids:
|
|
print('no video_ids provided', file=sys.stderr)
|
|
return 2
|
|
entries, missing = _entries_for_video_ids(video_ids)
|
|
if missing:
|
|
print(f'watchlist missing video_ids: {missing}', file=sys.stderr)
|
|
return 1
|
|
if not entries:
|
|
print('no entries to send', file=sys.stderr)
|
|
return 1
|
|
date_str = datetime.now(KST).strftime('%Y-%m-%d %H:%M')
|
|
stock_names = ', '.join(e.get('stock', '') for e in entries)
|
|
subject = f'[비하이브 종목분석] {len(entries)}개 보고서 — {date_str} ({stock_names})'
|
|
html_body = build_email_html(entries)
|
|
cmd = ['gog', 'gmail', 'send', '--to', EMAIL_RECIPIENT, '--subject', subject, '--body-html', html_body]
|
|
p = subprocess.run(cmd, text=True, capture_output=True)
|
|
if p.returncode != 0:
|
|
print(p.stderr.strip() or p.stdout.strip(), file=sys.stderr)
|
|
return 1
|
|
print(f'emailed {len(entries)} entries to {EMAIL_RECIPIENT}')
|
|
return 0
|
|
|
|
|
|
def load_telegram_config() -> tuple[str, list[str]]:
|
|
cfg = json.loads(CONFIG_PATH.read_text())
|
|
acct = cfg['channels']['telegram']['accounts'][TELEGRAM_ACCOUNT]
|
|
token = acct['botToken']
|
|
chat_ids = acct.get('allowFrom') or []
|
|
return token, chat_ids
|
|
|
|
|
|
def send_telegram(text: str) -> bool:
|
|
token, chat_ids = load_telegram_config()
|
|
if not chat_ids:
|
|
print('no chat_ids configured', file=sys.stderr)
|
|
return False
|
|
url = f'https://api.telegram.org/bot{token}/sendMessage'
|
|
ok = True
|
|
for chat_id in chat_ids:
|
|
payload = {
|
|
'chat_id': chat_id,
|
|
'text': text[:4000],
|
|
'disable_web_page_preview': 'true',
|
|
}
|
|
data = urllib.parse.urlencode(payload).encode()
|
|
req = urllib.request.Request(url, data=data, method='POST')
|
|
try:
|
|
with urllib.request.urlopen(req, timeout=15) as r:
|
|
if r.status != 200:
|
|
print(f'telegram HTTP {r.status}', file=sys.stderr)
|
|
ok = False
|
|
except Exception as e:
|
|
print(f'telegram send failed: {e}', file=sys.stderr)
|
|
ok = False
|
|
return ok
|
|
|
|
|
|
def format_telegram_block(entry: dict) -> str:
|
|
stock = entry.get('stock', '?')
|
|
current = fetch_current_price(entry.get('code') or stock, _buy_primary(entry))
|
|
current_line = f'현재가: {current}' if current else '현재가: 조회 불가'
|
|
buy_line = format_price_line('매입가', entry.get('buy'))
|
|
target_line = format_price_line('목표가', entry.get('target'))
|
|
upside = entry.get('upside_pct')
|
|
if upside is not None and '언급 없음' not in target_line:
|
|
sign = '+' if upside >= 0 else ''
|
|
target_line = f'{target_line} ({sign}{upside:g}%)'
|
|
stop_line = format_price_line('손절가', entry.get('stop'))
|
|
return f'#{stock}\n• {current_line}\n• {buy_line}\n• {target_line}\n• {stop_line}'
|
|
|
|
|
|
def cmd_notify(video_ids: list[str]) -> int:
|
|
if not video_ids:
|
|
print('no video_ids provided', file=sys.stderr)
|
|
return 2
|
|
entries, _missing = _entries_for_video_ids(video_ids)
|
|
if not entries:
|
|
print('no matching entries in watchlist', file=sys.stderr)
|
|
return 1
|
|
header = f'비하이브 종목분석 {len(entries)}건 제출'
|
|
blocks = [format_telegram_block(e) for e in entries]
|
|
text = header + '\n\n' + '\n\n'.join(blocks)
|
|
if send_telegram(text):
|
|
print(f'notified {len(entries)} stocks')
|
|
return 0
|
|
return 1
|
|
|
|
|
|
def cmd_list() -> int:
|
|
w = load_json(WATCHLIST, {})
|
|
if not w:
|
|
print('watchlist 비어있음')
|
|
return 0
|
|
entries = sorted(w.values(), key=lambda e: e.get('saved_at', ''), reverse=True)
|
|
print(f'총 {len(entries)}개 종목')
|
|
print('-' * 90)
|
|
print(f'{"종목명":<12} {"목표가":<28} {"상승률":<8} {"매입가":<24} {"저장일":<12}')
|
|
print('-' * 90)
|
|
for e in entries:
|
|
stock = e.get('stock', '?')
|
|
target_raw = (e.get('target') or {}).get('raw', '언급 없음')
|
|
buy_raw = (e.get('buy') or {}).get('raw', '언급 없음')
|
|
up = e.get('upside_pct')
|
|
up_s = f'+{up}%' if up is not None else 'n/a'
|
|
saved = (e.get('saved_at') or '')[:10]
|
|
print(f'{stock:<12} {target_raw[:26]:<28} {up_s:<8} {buy_raw[:22]:<24} {saved:<12}')
|
|
return 0
|
|
|
|
|
|
def cmd_show(stock: str) -> int:
|
|
w = load_json(WATCHLIST, {})
|
|
entry = w.get(stock)
|
|
if not entry:
|
|
matches = [k for k in w if stock in k]
|
|
if len(matches) == 1:
|
|
entry = w[matches[0]]
|
|
elif len(matches) > 1:
|
|
print(f'여러 종목 매칭: {", ".join(matches)}. 정확한 종목명 지정 필요.', file=sys.stderr)
|
|
return 1
|
|
else:
|
|
print(f'watchlist에 "{stock}" 없음. 현재 등록: {", ".join(w.keys()) or "(없음)"}', file=sys.stderr)
|
|
return 1
|
|
print(format_entry_block(entry))
|
|
saved = entry.get('saved_at', '')
|
|
if saved:
|
|
print(f'\n(저장일시: {saved})')
|
|
return 0
|
|
|
|
|
|
def cmd_add(argv: list[str]) -> int:
|
|
"""수동 워치리스트 추가.
|
|
|
|
usage: add <stock_name> [--code 005930] [--buy 13000] [--target 16000-17000]
|
|
[--stop 12000] [--note "..."] [--note "..."]
|
|
|
|
--code 생략 시 키움 종목코드 캐시로 자동 매핑.
|
|
--target 은 단일값 또는 'low-high' 형식.
|
|
"""
|
|
import argparse
|
|
p = argparse.ArgumentParser(prog='behive_youtube_digest.py add', add_help=True)
|
|
p.add_argument('stock')
|
|
p.add_argument('--code')
|
|
p.add_argument('--buy', type=int, help='매입가 (단일 정수)')
|
|
p.add_argument('--target', help='목표가 — "16000" 또는 "16000-17000"')
|
|
p.add_argument('--stop', type=int, help='손절가')
|
|
p.add_argument('--note', action='append', default=[])
|
|
try:
|
|
args = p.parse_args(argv)
|
|
except SystemExit as e:
|
|
return int(e.code or 2)
|
|
|
|
stock = args.stock.strip()
|
|
# 종목코드 확보
|
|
code = (args.code or '').strip()
|
|
resolved_name = stock
|
|
if not code:
|
|
try:
|
|
info = _get_kiwoom_client().resolve_stock_code(stock)
|
|
code = info.get('code', '')
|
|
# 캐시 공식 종목명과 다르면 교정 알림
|
|
if info.get('name') and info['name'] != stock:
|
|
print(f'[info] "{stock}" → 공식 종목명 "{info["name"]}"로 저장', file=sys.stderr)
|
|
resolved_name = info['name']
|
|
except Exception as e:
|
|
print(f'종목코드 자동 매핑 실패: {e}. --code 6자리로 지정해주세요.', file=sys.stderr)
|
|
return 1
|
|
|
|
target = None
|
|
if args.target:
|
|
raw = args.target.strip()
|
|
if '-' in raw:
|
|
lo_s, hi_s = raw.split('-', 1)
|
|
lo, hi = int(lo_s.replace(',', '').strip()), int(hi_s.replace(',', '').strip())
|
|
target = {'raw': raw, 'low': lo, 'high': hi}
|
|
else:
|
|
v = int(raw.replace(',', '').strip())
|
|
target = {'raw': raw, 'low': v, 'high': v}
|
|
|
|
buy = None
|
|
if args.buy is not None:
|
|
buy = {'raw': f'{args.buy:,}원', 'primary': args.buy, 'levels': [args.buy]}
|
|
|
|
stop = None
|
|
if args.stop is not None:
|
|
stop = {'raw': f'{args.stop:,}원', 'value': args.stop}
|
|
|
|
upside = None
|
|
if target and buy and buy.get('primary'):
|
|
upside = round((target['low'] / buy['primary'] - 1) * 100, 1)
|
|
|
|
entry = {
|
|
'stock': resolved_name,
|
|
'code': code,
|
|
'target': target,
|
|
'buy': buy,
|
|
'stop': stop,
|
|
'upside_pct': upside,
|
|
'summary': [],
|
|
'notes': list(args.note or []),
|
|
'video': {'source': 'manual'},
|
|
'saved_at': datetime.now(KST).isoformat(),
|
|
}
|
|
_reclassify_stop_above_buy(entry, resolved_name)
|
|
with watchlist_lock():
|
|
watchlist = load_json(WATCHLIST, {})
|
|
watchlist[resolved_name] = entry
|
|
save_json(WATCHLIST, watchlist)
|
|
parts = [f'added {resolved_name} ({code})']
|
|
if target: parts.append(f"target={target['raw']}")
|
|
if buy: parts.append(f"buy={buy['raw']}")
|
|
if stop: parts.append(f"stop={stop['raw']}")
|
|
print(' · '.join(parts))
|
|
return 0
|
|
|
|
|
|
def cmd_remove(stock: str) -> int:
|
|
with watchlist_lock():
|
|
w = load_json(WATCHLIST, {})
|
|
if stock not in w:
|
|
print(f'watchlist에 "{stock}" 없음', file=sys.stderr)
|
|
return 1
|
|
w.pop(stock)
|
|
save_json(WATCHLIST, w)
|
|
print(f'removed {stock}')
|
|
return 0
|
|
|
|
|
|
def main():
|
|
if len(sys.argv) < 2:
|
|
print(__doc__, file=sys.stderr)
|
|
return 2
|
|
cmd = sys.argv[1]
|
|
if cmd == 'fetch':
|
|
return cmd_fetch()
|
|
if cmd == 'transcript':
|
|
if len(sys.argv) < 3:
|
|
print('usage: transcript VIDEO_ID', file=sys.stderr)
|
|
return 2
|
|
return cmd_transcript(sys.argv[2])
|
|
if cmd == 'save':
|
|
if len(sys.argv) < 3:
|
|
print('usage: save VIDEO_ID (stdin = JSON)', file=sys.stderr)
|
|
return 2
|
|
return cmd_save(sys.argv[2])
|
|
if cmd == 'add':
|
|
if len(sys.argv) < 3:
|
|
print('usage: add <stock_name> [--code 6자리] [--buy 13000] [--target 16000-17000] [--stop 12000] [--note "..."]', file=sys.stderr)
|
|
return 2
|
|
return cmd_add(sys.argv[2:])
|
|
if cmd == 'email':
|
|
return cmd_email(sys.argv[2:])
|
|
if cmd == 'notify':
|
|
return cmd_notify(sys.argv[2:])
|
|
if cmd == 'list':
|
|
return cmd_list()
|
|
if cmd == 'show':
|
|
if len(sys.argv) < 3:
|
|
print('usage: show STOCK', file=sys.stderr)
|
|
return 2
|
|
return cmd_show(sys.argv[2])
|
|
if cmd == 'remove':
|
|
if len(sys.argv) < 3:
|
|
print('usage: remove STOCK', file=sys.stderr)
|
|
return 2
|
|
return cmd_remove(sys.argv[2])
|
|
print(f'unknown command: {cmd}', file=sys.stderr)
|
|
return 2
|
|
|
|
|
|
if __name__ == '__main__':
|
|
sys.exit(main())
|