#!/usr/bin/env python3
"""Sync upcoming IPO subscription and listing dates into a Google Calendar.

Entries are scraped from the Naver Finance IPO page, converted into all-day
``EventSpec`` objects and written to the calendar through the ``gog`` CLI.
"""
from __future__ import annotations

import argparse
import functools
import json
import re
import subprocess
import urllib.request
from dataclasses import dataclass
from datetime import date, datetime, timedelta, timezone
from html import unescape
from pathlib import Path
from urllib.parse import urljoin
from zoneinfo import ZoneInfo, ZoneInfoNotFoundError

try:
    KST = ZoneInfo('Asia/Seoul')
except ZoneInfoNotFoundError:
    # No tz database on this machine. South Korea currently observes no DST,
    # so a fixed +09:00 offset is an adequate stand-in.
    KST = timezone(timedelta(hours=9), 'KST')

CALENDAR_ID = 'mini.snowoyh@gmail.com'
WORKSPACE = Path('/Users/snowoyh/.openclaw/agents/stock/workspace')
STATE_DIR = WORKSPACE / 'state'
STATE_FILE = STATE_DIR / 'ipo_calendar_sync.json'
SUBSCRIPTION_URL = 'https://www.38.co.kr/html/fund/index.htm?o=k'
NAVER_IPO_URL = 'https://finance.naver.com/sise/ipo.naver'
SOURCE_LABEL = '네이버 금융 IPO'

# Marker the source uses for a date that has not been announced yet.
_UNDECIDED = '미정'

_BR_TAG_RE = re.compile(r'<br\s*/?>', re.I)
_ANY_TAG_RE = re.compile(r'<[^>]+>')
_WHITESPACE_RE = re.compile(r'\s+')
_ROW_RE = re.compile(r'<tr[^>]*>(.*?)</tr>', re.S | re.I)
_CELL_RE = re.compile(r'<t[dh][^>]*>(.*?)</t[dh]>', re.S | re.I)
_NEXT_DATA_RE = re.compile(
    r'<script[^>]*\bid="__NEXT_DATA__"[^>]*>(.*?)</script>', re.S
)
# NOTE(review): the pattern that originally lived in extract_naver_ipo_entries
# was unreadable in the reviewed copy (its HTML tag text had been stripped out).
# This is a reconstruction based only on how its three groups are consumed
# (1 = stock code, 2 = detail-page URL, 3 = company name) and on the usual
# "code=" query parameter of Naver Finance links. Verify it against the live
# listing markup before relying on it.
_IPO_ENTRY_RE = re.compile(
    r'<a\s[^>]*href="[^"]*[?&]code=([0-9A-Za-z]+)[^"]*"[^>]*>'
    r'.*?<a\s[^>]*href="([^"]+)"[^>]*>(.*?)</a>',
    re.S,
)


@dataclass
class EventSpec:
    """One all-day calendar entry for an IPO subscription window or a listing day."""

    kind: str  # 'subscription'; any other value is rendered as a listing
    name: str
    start_date: date
    end_date: date  # inclusive
    brokers: str  # broker names as a single string; may be empty
    source_url: str

    @property
    def summary(self) -> str:
        """Calendar title: a kind-specific prefix followed by the company name."""
        prefix = '[공모청약]' if self.kind == 'subscription' else '[신규상장]'
        return f'{prefix} {self.name}'

    @property
    def description(self) -> str:
        """Multi-line calendar description (name, brokers, date(s), source)."""
        label = '청약일' if self.kind == 'subscription' else '상장일'
        if self.start_date == self.end_date:
            date_str = self.start_date.isoformat()
        else:
            date_str = f'{self.start_date.isoformat()} ~ {self.end_date.isoformat()}'
        return (
            f'종목명: {self.name}\n'
            f'증권사: {self.brokers or "미확인"}\n'
            f'{label}: {date_str}\n'
            f'기준: {SOURCE_LABEL}\n'
            f'출처: {self.source_url}'
        )

    @property
    def state_key(self) -> str:
        """Identity used to match this spec against existing calendar events."""
        return f'{self.kind}|{self.name}'


def run(cmd: list[str]) -> str:
    """Run *cmd* and return its stdout.

    Raises:
        RuntimeError: if the command exits non-zero. The message is the
            command's stderr, falling back to its stdout, then a generic text.
    """
    proc = subprocess.run(cmd, capture_output=True, text=True)
    if proc.returncode != 0:
        raise RuntimeError(proc.stderr.strip() or proc.stdout.strip() or 'command failed')
    return proc.stdout


def fetch(url: str, encoding: str = 'euc-kr') -> str:
    """Download *url* and return the body decoded as text.

    Args:
        url: Address to fetch (30 second timeout, browser-like User-Agent).
        encoding: Codec used to decode the body, ignoring undecodable bytes.
            The special value ``'auto'`` tries strict UTF-8, then strict CP949,
            and finally lenient UTF-8.
    """
    request = urllib.request.Request(url, headers={'User-Agent': 'Mozilla/5.0'})
    with urllib.request.urlopen(request, timeout=30) as response:
        raw = response.read()
    if encoding != 'auto':
        return raw.decode(encoding, 'ignore')
    # Korean pages are either UTF-8 or CP949; use whichever decodes strictly,
    # because the charset declared in the page's meta tag cannot be trusted.
    for candidate in ('utf-8', 'cp949'):
        try:
            return raw.decode(candidate)
        except UnicodeDecodeError:
            continue
    return raw.decode('utf-8', errors='ignore')


def clean_text(html_fragment: str) -> str:
    """Strip markup from an HTML fragment and collapse whitespace.

    ``<br>`` and every other tag become a space, entities are unescaped, and
    non-breaking spaces are treated as ordinary spaces.
    """
    text = _BR_TAG_RE.sub(' ', html_fragment)
    text = _ANY_TAG_RE.sub(' ', text)
    text = unescape(text).replace('\xa0', ' ')
    return _WHITESPACE_RE.sub(' ', text).strip()


def parse_html_rows(table_html: str) -> list[list[str]]:
    """Return the cleaned cell texts of every ``<tr>`` that has at least one cell."""
    rows = []
    for row_html in _ROW_RE.findall(table_html):
        cells = [clean_text(cell) for cell in _CELL_RE.findall(row_html)]
        if cells:
            rows.append(cells)
    return rows


def parse_date_range(text: str) -> tuple[date, date] | None:
    """Parse ``YYYY.MM.DD~MM.DD`` into an inclusive ``(start, end)`` pair.

    Returns None when the text does not have that shape or names an
    impossible calendar date.
    """
    m = re.fullmatch(r'(\d{4})\.(\d{2})\.(\d{2})~(\d{2})\.(\d{2})', text.strip())
    if not m:
        return None
    year, m1, d1, m2, d2 = map(int, m.groups())
    # The end part carries no year: a range such as 12.30~01.02 ends in the
    # year after it starts.
    end_year = year + 1 if (m2, d2) < (m1, d1) else year
    try:
        return date(year, m1, d1), date(end_year, m2, d2)
    except ValueError:
        return None


def parse_single_date(text: str) -> date | None:
    """Parse ``YYYY.MM.DD``; return None if malformed or not a real date."""
    m = re.fullmatch(r'(\d{4})\.(\d{2})\.(\d{2})', text.strip())
    if not m:
        return None
    year, month, day = map(int, m.groups())
    try:
        return date(year, month, day)
    except ValueError:
        return None


def extract_next_data_json(html: str) -> dict:
    """Return the JSON object embedded in the page's ``__NEXT_DATA__`` script tag.

    Returns an empty dict when the tag is missing, its content is not valid
    JSON, or the JSON is not an object.
    """
    m = _NEXT_DATA_RE.search(html)
    if not m:
        return {}
    try:
        parsed = json.loads(m.group(1))
    except ValueError:
        return {}
    return parsed if isinstance(parsed, dict) else {}


def _iter_query_results(data: dict):
    """Yield each ``props.pageProps.dehydratedState.queries[*].state.data.result`` dict.

    Any level that is missing, null or not a mapping is skipped instead of
    raising, since JSON ``null`` defeats a plain ``.get(key, {})`` chain.
    """
    node = data
    for key in ('props', 'pageProps', 'dehydratedState', 'queries'):
        node = node.get(key) if isinstance(node, dict) else None
    if not isinstance(node, list):
        return
    for query in node:
        result = query
        for key in ('state', 'data', 'result'):
            result = result.get(key) if isinstance(result, dict) else None
        if isinstance(result, dict):
            yield result


def extract_naver_ipo_detail(detail_url: str) -> dict:
    """Fetch a detail page and return its first non-empty ``ipoInfo``, else ``{}``."""
    html = fetch(detail_url, encoding='utf-8')
    for result in _iter_query_results(extract_next_data_json(html)):
        ipo_info = result.get('ipoInfo')
        if ipo_info:
            return ipo_info
    return {}


def extract_naver_ipo_entries() -> list[dict]:
    """Scrape the IPO listing page into ``{'code', 'name', 'detail_url'}`` dicts.

    Entries with an empty code or name are dropped and only the first entry
    per code is kept.
    """
    html = fetch(NAVER_IPO_URL, encoding='auto')
    entries = []
    seen = set()
    for match in _IPO_ENTRY_RE.finditer(html):
        code = match.group(1).strip()
        name = clean_text(match.group(3))
        if not code or code in seen or not name:
            continue
        seen.add(code)
        # hrefs are HTML-escaped and may be relative to the listing page.
        detail_url = urljoin(NAVER_IPO_URL, unescape(match.group(2).strip()))
        entries.append({'code': code, 'name': name, 'detail_url': detail_url})
    return entries


@functools.lru_cache(maxsize=None)
def _load_naver_detail(detail_url: str) -> tuple[dict, str]:
    """Fetch and parse one detail page; cached so each URL is downloaded once per run."""
    html = fetch(detail_url, encoding='utf-8')
    for result in _iter_query_results(extract_next_data_json(html)):
        ipo_info = result.get('ipoInfo')
        if not ipo_info or not isinstance(ipo_info, dict):
            continue
        names = []
        for item in result.get('joinManagers') or []:
            if isinstance(item, dict):
                names.append((item.get('orgNm') or '').strip())
        # dict.fromkeys de-duplicates while keeping first-seen order.
        brokers = ','.join(dict.fromkeys(n for n in names if n))
        return ipo_info, brokers
    return {}, ''


def extract_brokers_from_naver_detail(detail_url: str) -> tuple[dict, str]:
    """Return ``(ipo_info, brokers)`` for a detail page.

    ``ipo_info`` is the first non-empty ``ipoInfo`` mapping found (``{}`` if
    none) and ``brokers`` is the comma-joined, de-duplicated ``orgNm`` values
    of that same result's ``joinManagers``. Network errors propagate.
    """
    ipo_info, brokers = _load_naver_detail(detail_url)
    # Hand out a copy so callers cannot mutate the cached mapping.
    return dict(ipo_info), brokers


def _parse_announced_date(raw) -> date | None:
    """Parse an ISO date field; None if empty, still undecided, or malformed."""
    text = (raw or '').strip()
    if not text or _UNDECIDED in text:
        return None
    try:
        return date.fromisoformat(text)
    except ValueError:
        return None


def parse_subscription_events_from(cutoff: date) -> list[EventSpec]:
    """Build subscription events whose last day is strictly after *cutoff*.

    Entries whose start or end date is missing, undecided or malformed are
    skipped. A failed page download is not swallowed: an incomplete event list
    would make the caller delete calendar events that are still valid.
    """
    events = []
    for entry in extract_naver_ipo_entries():
        info, brokers = extract_brokers_from_naver_detail(entry['detail_url'])
        start_date = _parse_announced_date(info.get('poStartDate'))
        end_date = _parse_announced_date(info.get('poEndDate'))
        if start_date is None or end_date is None or end_date <= cutoff:
            continue
        events.append(EventSpec(
            'subscription',
            info.get('compName') or entry['name'],
            start_date,
            end_date,
            brokers,
            entry['detail_url'],
        ))
    return events


def parse_listed_events_from(cutoff: date) -> list[EventSpec]:
    """Build single-day listing events dated strictly after *cutoff*.

    The listing date is read from ``lcalDate``, ``listingDate`` or
    ``listDate`` (first non-empty one); entries without a usable date are
    skipped. Download errors propagate, as for subscription events.
    """
    events = []
    for entry in extract_naver_ipo_entries():
        info, brokers = extract_brokers_from_naver_detail(entry['detail_url'])
        listed = _parse_announced_date(
            info.get('lcalDate') or info.get('listingDate') or info.get('listDate')
        )
        if listed is None or listed <= cutoff:
            continue
        events.append(EventSpec(
            'listing',
            info.get('compName') or entry['name'],
            listed,
            listed,
            brokers,
            entry['detail_url'],
        ))
    return events


def load_state() -> dict:
    """Load the persisted sync state; ``{}`` if absent, unreadable or not an object."""
    try:
        state = json.loads(STATE_FILE.read_text(encoding='utf-8'))
    except (OSError, ValueError):
        # Missing file, I/O error, bad encoding or invalid JSON.
        return {}
    return state if isinstance(state, dict) else {}


def save_state(state: dict) -> None:
    """Write *state* to the state file as UTF-8 JSON, creating its directory."""
    STATE_DIR.mkdir(parents=True, exist_ok=True)
    STATE_FILE.write_text(
        json.dumps(state, ensure_ascii=False, indent=2), encoding='utf-8'
    )


def event_key_from_summary(summary: str) -> tuple[str, str] | None:
    """Map a calendar title back to ``(kind, name)``; None for foreign events."""
    summary = (summary or '').strip()
    for kind, prefix in (('subscription', '[공모청약] '), ('listing', '[신규상장] ')):
        if summary.startswith(prefix):
            return kind, summary.removeprefix(prefix).strip()
    return None


def fetch_existing_events(start_date: date, end_date: date) -> dict[str, list[dict]]:
    """List this script's calendar events between two dates, grouped by key.

    Both bounds are taken at 00:00 KST. Events whose title does not carry one
    of the script's prefixes are ignored. Keys have the same ``kind|name``
    form as ``EventSpec.state_key``.
    """
    start_dt = datetime(start_date.year, start_date.month, start_date.day, 0, 0, tzinfo=KST)
    end_dt = datetime(end_date.year, end_date.month, end_date.day, 0, 0, tzinfo=KST)
    out = run([
        'gog', 'calendar', 'events', CALENDAR_ID,
        '--from', start_dt.isoformat(),
        '--to', end_dt.isoformat(),
        '--all-pages',
        '--max', '250',
        '--json',
    ])
    data = json.loads(out)
    existing: dict[str, list[dict]] = {}
    # "events" may be absent or null when nothing matches.
    for ev in data.get('events') or []:
        parsed = event_key_from_summary(ev.get('summary', ''))
        if not parsed:
            continue
        kind, name = parsed
        existing.setdefault(f'{kind}|{name}', []).append(ev)
    return existing


def create_event(ev: EventSpec, dry_run: bool = False):
    """Create an all-day calendar event for *ev*.

    With ``dry_run`` the event is only printed as JSON and nothing is created.
    """
    start_date = ev.start_date.isoformat()
    # Google Calendar all-day events use an exclusive end date.
    end_date = (ev.end_date + timedelta(days=1)).isoformat()
    if dry_run:
        print(json.dumps(
            {
                'summary': ev.summary,
                'date': start_date,
                'description': ev.description,
                'all_day': True,
            },
            ensure_ascii=False,
        ))
        return
    run([
        'gog', 'calendar', 'create', CALENDAR_ID,
        '--summary', ev.summary,
        '--description', ev.description,
        '--from', start_date,
        '--to', end_date,
        '--all-day',
        '--event-color', '5' if ev.kind == 'subscription' else '10',
    ])
def update_event(event_id: str, ev: "EventSpec", dry_run: bool = False) -> str:
    """Bring an existing calendar event in line with *ev*.

    Returns:
        ``'updated'`` when the in-place update succeeded (or in dry-run mode),
        ``'recreated'`` when the update command failed and the event was
        deleted and created again instead.
    """
    start_date = ev.start_date.isoformat()
    # All-day events take an exclusive end date.
    end_date = (ev.end_date + timedelta(days=1)).isoformat()
    if dry_run:
        print(json.dumps(
            {
                'update_event_id': event_id,
                'summary': ev.summary,
                'date': start_date,
                'description': ev.description,
                'all_day': True,
            },
            ensure_ascii=False,
        ))
        return 'updated'
    try:
        run([
            'gog', 'calendar', 'update', CALENDAR_ID, event_id,
            '--summary', ev.summary,
            '--description', ev.description,
            '--from', start_date,
            '--to', end_date,
            '--all-day',
            '--event-color', '5' if ev.kind == 'subscription' else '10',
        ])
    except RuntimeError:
        # Only a failed update command triggers the destructive fallback;
        # unrelated errors (e.g. a programming bug) must not delete events.
        run(['gog', 'calendar', 'delete', CALENDAR_ID, event_id, '--force', '--no-input'])
        create_event(ev, dry_run=False)
        return 'recreated'
    return 'updated'


def delete_event(event_id: str, dry_run: bool = False) -> None:
    """Delete a calendar event, or just print its id in dry-run mode."""
    if dry_run:
        print(json.dumps({'delete_event_id': event_id}, ensure_ascii=False))
        return
    run(['gog', 'calendar', 'delete', CALENDAR_ID, event_id, '--force', '--no-input'])


def event_date_range(ev: dict) -> tuple[str, str]:
    """Return the inclusive ``(start, end)`` ISO dates of a calendar event dict.

    All-day events (``date`` fields) store an exclusive end, so one day is
    subtracted. Timed events (``dateTime`` fields) already end on their last
    day, so the date part is used unchanged. Missing values yield ``''``.
    """
    start_info = ev.get('start') or {}
    end_info = ev.get('end') or {}
    start = start_info.get('date') or (start_info.get('dateTime') or '')[:10]
    all_day_end = end_info.get('date')
    if all_day_end:
        end = (date.fromisoformat(all_day_end) - timedelta(days=1)).isoformat()
    else:
        end = (end_info.get('dateTime') or '')[:10]
    return start, end


def _format_date_range(start: str, end: str) -> str:
    """Render a range as ``start`` or ``start~end`` for the change report."""
    if not end or start == end:
        return start
    return f'{start}~{end}'


def main() -> None:
    """Reconcile the calendar with the currently published IPO schedule.

    Creates missing events, updates changed ones, removes duplicates and
    deletes events that are no longer published, then prints a JSON summary.
    With ``--dry-run`` nothing is written to the calendar or the state file.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--dry-run', action='store_true')
    args = parser.parse_args()

    cutoff = datetime.now(KST).date()
    events = parse_subscription_events_from(cutoff) + parse_listed_events_from(cutoff)
    events.sort(key=lambda e: (e.start_date, e.kind, e.name))

    state = load_state()
    if not isinstance(state, dict):
        # A state file holding valid non-object JSON must not break the run.
        state = {}

    end_date = max((ev.end_date for ev in events), default=cutoff) + timedelta(days=1)
    # Safety net: if scraping returned nothing, skip the calendar lookup so
    # the cleanup below cannot delete every existing event.
    existing = fetch_existing_events(cutoff + timedelta(days=1), end_date) if events else {}

    created = []
    updated = []
    recreated = []
    deleted = []
    duplicates_removed = []
    unchanged = 0
    changes = []
    event_keys = {ev.state_key for ev in events}

    for ev in events:
        existing_list = existing.get(ev.state_key, [])
        # Several events under one key: keep the most recently created one
        # and delete the rest.
        if len(existing_list) > 1:
            existing_list.sort(key=lambda e: e.get('created', ''), reverse=True)
            for dup in existing_list[1:]:
                delete_event(dup['id'], dry_run=args.dry_run)
                duplicates_removed.append(ev.state_key)
                changes.append({
                    'kind': ev.kind,
                    'name': ev.name,
                    'action': 'duplicate_removed',
                    'event_id': dup['id'],
                    'old_date': _format_date_range(*event_date_range(dup)),
                })

        desired_start = ev.start_date.isoformat()
        desired_end = ev.end_date.isoformat()
        new_date = _format_date_range(desired_start, desired_end)

        existing_ev = existing_list[0] if existing_list else None
        if not existing_ev:
            create_event(ev, dry_run=args.dry_run)
            created.append(ev.state_key)
            changes.append({
                'kind': ev.kind,
                'name': ev.name,
                'action': 'created',
                'new_date': new_date,
            })
            continue

        existing_start, existing_end = event_date_range(existing_ev)
        existing_description = (existing_ev.get('description') or '').strip()
        # This script only writes all-day events; a timed event under the
        # same title is rewritten as all-day.
        is_all_day = bool((existing_ev.get('start') or {}).get('date'))
        needs_update = (
            not is_all_day
            or existing_start != desired_start
            or existing_end != desired_end
            or existing_description != ev.description.strip()
        )
        if not needs_update:
            unchanged += 1
            continue

        result = update_event(existing_ev['id'], ev, dry_run=args.dry_run)
        if result == 'recreated':
            recreated.append(ev.state_key)
        else:
            updated.append(ev.state_key)
        changes.append({
            'kind': ev.kind,
            'name': ev.name,
            'action': result,
            'old_date': _format_date_range(existing_start, existing_end),
            'new_date': new_date,
        })

    # Events no longer published by the source are removed from the calendar.
    # When scraping yielded nothing, `existing` is empty and this is a no-op.
    for key, existing_list in existing.items():
        if key in event_keys:
            continue
        kind, name = key.split('|', 1)
        for stale in existing_list:
            delete_event(stale['id'], dry_run=args.dry_run)
            deleted.append(key)
            changes.append({
                'kind': kind,
                'name': name,
                'action': 'deleted',
                'event_id': stale['id'],
                'old_date': _format_date_range(*event_date_range(stale)),
            })

    state['last_changes'] = changes
    state['last_run_at'] = datetime.now(KST).isoformat()
    if not args.dry_run:
        save_state(state)

    print(json.dumps(
        {
            'cutoff_after': cutoff.isoformat(),
            'total_found': len(events),
            'newly_created': len(created),
            'updated': len(updated),
            'recreated': len(recreated),
            'deleted': len(deleted),
            'duplicates_removed': len(duplicates_removed),
            'unchanged': unchanged,
            'dry_run': args.dry_run,
            'changes': changes,
        },
        ensure_ascii=False,
    ))


if __name__ == '__main__':
    main()