549545bde6
설정·스크립트·스킬·문서·큐레이션 메모리 추적. 시크릿(credentials/identity)·런타임 상태(state/logs/sessions/sqlite)·백업(clobbered/bak)·dream 캐시는 .gitignore로 제외. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
394 lines
15 KiB
Python
394 lines
15 KiB
Python
#!/usr/bin/env python3
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import re
|
|
import subprocess
|
|
import urllib.request
|
|
from dataclasses import dataclass
|
|
from urllib.parse import urljoin
|
|
from datetime import date, datetime, timedelta
|
|
from html import unescape
|
|
from pathlib import Path
|
|
from zoneinfo import ZoneInfo
|
|
|
|
# Timezone used to compute "today" and the calendar query window.
KST = ZoneInfo('Asia/Seoul')
# Calendar targeted by every `gog calendar` command in this script.
CALENDAR_ID = 'mini.snowoyh@gmail.com'

# On-disk locations. The state directory is created at import time so the
# state file can be written later without further checks.
WORKSPACE = Path('/Users/snowoyh/.openclaw/agents/stock/workspace')
STATE_DIR = WORKSPACE.joinpath('state')
STATE_DIR.mkdir(parents=True, exist_ok=True)
STATE_FILE = STATE_DIR.joinpath('ipo_calendar_sync.json')

# Data sources. SUBSCRIPTION_URL is not referenced by the code visible in
# this file; SOURCE_LABEL is embedded in every event description.
SUBSCRIPTION_URL = 'https://www.38.co.kr/html/fund/index.htm?o=k'
NAVER_IPO_URL = 'https://finance.naver.com/sise/ipo.naver'
SOURCE_LABEL = '네이버 금융 IPO'
|
|
|
|
|
|
@dataclass
class EventSpec:
    """One all-day calendar entry derived from an IPO page.

    ``kind`` equal to ``'subscription'`` selects the subscription wording;
    every other value is rendered with the new-listing wording.
    """

    kind: str
    name: str
    start_date: date
    end_date: date  # inclusive last day of the event
    brokers: str
    source_url: str

    @property
    def summary(self) -> str:
        """Calendar title: a kind-specific bracketed tag followed by the name."""
        if self.kind == 'subscription':
            prefix = '[공모청약]'
        else:
            prefix = '[신규상장]'
        return f'{prefix} {self.name}'

    @property
    def description(self) -> str:
        """Multi-line event body: name, brokers, date(s), source label, URL."""
        if self.kind == 'subscription':
            label = '청약일'
        else:
            label = '상장일'
        # A one-day event shows a single date, otherwise "start ~ end".
        if self.start_date != self.end_date:
            date_str = f'{self.start_date.isoformat()} ~ {self.end_date.isoformat()}'
        else:
            date_str = self.start_date.isoformat()
        parts = [
            f'종목명: {self.name}\n',
            f'증권사: {self.brokers or "미확인"}\n',
            f'{label}: {date_str}\n',
            f'기준: {SOURCE_LABEL}\n',
            f'출처: {self.source_url}',
        ]
        return ''.join(parts)

    @property
    def state_key(self) -> str:
        """Identity used to match this spec against existing calendar events."""
        key = f'{self.kind}|{self.name}'
        return key
|
|
|
|
|
|
def run(cmd: list[str]) -> str:
    """Execute *cmd* (no shell) and return its captured standard output.

    Raises:
        RuntimeError: the process exited non-zero. The message is the
            stripped stderr, else the stripped stdout, else
            ``'command failed'``.
    """
    proc = subprocess.run(cmd, capture_output=True, text=True)
    if proc.returncode == 0:
        return proc.stdout
    message = proc.stderr.strip() or proc.stdout.strip() or 'command failed'
    raise RuntimeError(message)
|
|
|
|
|
|
def fetch(url: str, encoding: str = 'euc-kr') -> str:
    """Download *url* with a browser-like User-Agent and return it as text.

    Args:
        url: Address to request (30 second timeout).
        encoding: Codec used to decode the body, dropping undecodable bytes.
            The special value ``'auto'`` tries strict UTF-8, then strict
            CP949, and finally lossy UTF-8.
    """
    request = urllib.request.Request(url, headers={'User-Agent': 'Mozilla/5.0'})
    with urllib.request.urlopen(request, timeout=30) as response:
        payload = response.read()

    if encoding != 'auto':
        return payload.decode(encoding, 'ignore')

    # Korean pages are either UTF-8 or CP949; use whichever decodes strictly,
    # because the charset a page declares in its meta tag cannot be trusted.
    for candidate in ('utf-8', 'cp949'):
        try:
            return payload.decode(candidate)
        except UnicodeDecodeError:
            pass
    return payload.decode('utf-8', errors='ignore')
|
|
|
|
|
|
def clean_text(html_fragment: str) -> str:
    """Reduce an HTML fragment to single-spaced plain text.

    ``<br>`` variants and all other tags become spaces, character references
    are decoded, non-breaking spaces become ordinary spaces, and runs of
    whitespace collapse to one space with the ends trimmed.
    """
    without_breaks = re.sub(r'<br\s*/?>', ' ', html_fragment, flags=re.I)
    without_tags = re.sub(r'<[^>]+>', ' ', without_breaks)
    decoded = unescape(without_tags).replace('\xa0', ' ')
    collapsed = re.sub(r'\s+', ' ', decoded)
    return collapsed.strip()
|
|
|
|
|
|
def parse_html_rows(table_html: str) -> list[list[str]]:
    """Split table markup into rows of cleaned cell texts.

    Each ``<tr>`` yields one list holding the text of its ``<td>``/``<th>``
    cells; rows in which no cell is found are omitted.
    """
    row_pattern = r'<tr[^>]*>(.*?)</tr>'
    cell_pattern = r'<t[dh][^>]*>(.*?)</t[dh]>'
    flags = re.S | re.I

    parsed = []
    for row_html in re.findall(row_pattern, table_html, flags):
        cells = []
        for cell_html in re.findall(cell_pattern, row_html, flags):
            cells.append(clean_text(cell_html))
        if not cells:
            continue
        parsed.append(cells)
    return parsed
|
|
|
|
|
|
def parse_date_range(text: str) -> tuple[date, date] | None:
    """Parse ``YYYY.MM.DD~MM.DD`` into an inclusive ``(start, end)`` pair.

    The end of the range carries no year in this format, so it inherits the
    start year. When that would place the end before the start (a range that
    begins in December and ends in January), the end rolls over into the
    following year.

    Returns:
        The two dates, or ``None`` when *text* does not match the format or
        names a calendar date that does not exist.
    """
    m = re.match(r'(\d{4})\.(\d{2})\.(\d{2})~(\d{2})\.(\d{2})$', text.strip())
    if not m:
        return None
    year, start_month, start_day, end_month, end_day = map(int, m.groups())
    # An end that sorts before the start within the same year can only mean
    # the range crosses New Year.
    crosses_year = (end_month, end_day) < (start_month, start_day)
    try:
        start = date(year, start_month, start_day)
        end = date(year + 1 if crosses_year else year, end_month, end_day)
    except ValueError:
        # Digits matched the pattern but do not form a real date (e.g. 02.30).
        return None
    return start, end
|
|
|
|
|
|
def parse_single_date(text: str) -> date | None:
    """Parse a ``YYYY.MM.DD`` string into a date.

    Returns:
        The date, or ``None`` when *text* does not match the format or names
        a calendar date that does not exist.
    """
    m = re.match(r'(\d{4})\.(\d{2})\.(\d{2})$', text.strip())
    if not m:
        return None
    year, month, day = map(int, m.groups())
    try:
        return date(year, month, day)
    except ValueError:
        # Digits matched the pattern but do not form a real date (e.g. 02.30).
        return None
|
|
|
|
|
|
def extract_next_data_json(html: str) -> dict:
    """Return the JSON object embedded in the page's ``__NEXT_DATA__`` script.

    Returns:
        The decoded object, or an empty dict when the script tag is absent,
        its content is not valid JSON, or the JSON root is not an object.
    """
    m = re.search(r'<script id="__NEXT_DATA__" type="application/json">(.*?)</script>', html, re.S)
    if not m:
        return {}
    try:
        payload = json.loads(m.group(1))
    except (ValueError, RecursionError):
        # Malformed or pathologically nested JSON: treat as "no data".
        return {}
    # Callers chain ``.get`` on the result, so only a JSON object is usable.
    return payload if isinstance(payload, dict) else {}
|
|
|
|
|
|
def extract_naver_ipo_detail(detail_url: str) -> dict:
    """Fetch an IPO detail page and return its first non-empty ``ipoInfo``.

    The value is looked up under
    ``props.pageProps.dehydratedState.queries[*].state.data.result.ipoInfo``
    in the page's embedded ``__NEXT_DATA__`` JSON. Every level is type-checked
    because ``dict.get(key, {})`` only substitutes the default for a missing
    key: an explicit JSON ``null`` (or any non-object) would otherwise raise
    ``AttributeError`` on the next ``.get``.

    Returns:
        The ``ipoInfo`` object of the first query that has one, else ``{}``.
    """
    html = fetch(detail_url, encoding='utf-8')
    node = extract_next_data_json(html)
    for key in ('props', 'pageProps', 'dehydratedState', 'queries'):
        node = node.get(key) if isinstance(node, dict) else None
    queries = node if isinstance(node, list) else []

    for q in queries:
        result = q
        for key in ('state', 'data', 'result'):
            result = result.get(key) if isinstance(result, dict) else None
        ipo_info = result.get('ipoInfo') if isinstance(result, dict) else None
        if ipo_info and isinstance(ipo_info, dict):
            return ipo_info
    return {}
|
|
|
|
|
|
def extract_naver_ipo_entries() -> list[dict]:
    """Scrape the IPO list page into ``{'code', 'name', 'detail_url'}`` dicts.

    Only the first occurrence of each item id is kept, in page order; matches
    with an empty id or an empty cleaned name are ignored.
    """
    html = fetch(NAVER_IPO_URL, encoding='auto')
    item_pattern = re.compile(
        r'<div class="item_area" id="([^"]+)">.*?<a href="(https://m\.stock\.naver\.com/ipo/[^"]+)"[^>]*>(.*?)</a>',
        re.S,
    )

    # Insertion-ordered dict keyed by item id doubles as the "seen" set.
    by_code = {}
    for match in item_pattern.finditer(html):
        code = match.group(1).strip()
        detail_url = match.group(2).strip()
        name = clean_text(match.group(3))
        if code and name and code not in by_code:
            by_code[code] = {'code': code, 'name': name, 'detail_url': detail_url}
    return list(by_code.values())
|
|
|
|
|
|
def extract_brokers_from_naver_detail(detail_url: str) -> tuple[dict, str]:
    """Fetch an IPO detail page and return ``(ipoInfo, brokers)``.

    Looks through ``props.pageProps.dehydratedState.queries`` in the page's
    embedded ``__NEXT_DATA__`` JSON for the first query whose
    ``state.data.result`` holds a non-empty ``ipoInfo`` object. The broker
    string is the comma-joined, de-duplicated ``orgNm`` values of that same
    result's ``joinManagers`` list, in their original order.

    Every level is type-checked because ``dict.get(key, {})`` only substitutes
    the default for a missing key: an explicit JSON ``null`` (or any
    non-object) would otherwise raise ``AttributeError`` on the next ``.get``.

    Returns:
        ``(ipo_info, brokers)``; ``({}, '')`` when no query carries ``ipoInfo``.
    """
    html = fetch(detail_url, encoding='utf-8')
    node = extract_next_data_json(html)
    for key in ('props', 'pageProps', 'dehydratedState', 'queries'):
        node = node.get(key) if isinstance(node, dict) else None
    queries = node if isinstance(node, list) else []

    for q in queries:
        result = q
        for key in ('state', 'data', 'result'):
            result = result.get(key) if isinstance(result, dict) else None
        if not isinstance(result, dict):
            continue
        ipo_info = result.get('ipoInfo')
        if not ipo_info or not isinstance(ipo_info, dict):
            continue

        join_managers = result.get('joinManagers')
        if not isinstance(join_managers, list):
            join_managers = []
        brokers = []
        for item in join_managers:
            if not isinstance(item, dict):
                continue
            raw_name = item.get('orgNm')
            name = raw_name.strip() if isinstance(raw_name, str) else ''
            if name and name not in brokers:
                brokers.append(name)
        # Only the first result carrying ipoInfo is used.
        return ipo_info, ','.join(brokers)
    return {}, ''
|
|
|
|
|
|
def parse_subscription_events_from(cutoff: date) -> list[EventSpec]:
    """Build subscription events whose last day falls after *cutoff*.

    Entries are skipped when either subscription date is missing, contains
    the "undetermined" marker, or is not an ISO date.
    """
    upcoming = []
    for entry in extract_naver_ipo_entries():
        info, brokers = extract_brokers_from_naver_detail(entry['detail_url'])
        raw_start = (info.get('poStartDate') or '').strip()
        raw_end = (info.get('poEndDate') or '').strip()
        if not (raw_start and raw_end) or '미정' in raw_start or '미정' in raw_end:
            continue
        try:
            first_day = date.fromisoformat(raw_start)
            last_day = date.fromisoformat(raw_end)
        except ValueError:
            continue
        if last_day > cutoff:
            upcoming.append(
                EventSpec(
                    kind='subscription',
                    name=info.get('compName') or entry['name'],
                    start_date=first_day,
                    end_date=last_day,
                    brokers=brokers,
                    source_url=entry['detail_url'],
                )
            )
    return upcoming
|
|
|
|
|
|
def parse_listed_events_from(cutoff: date) -> list[EventSpec]:
    """Build one-day listing events dated after *cutoff*.

    The listing date is taken from the first non-empty of ``lcalDate``,
    ``listingDate`` and ``listDate``. Entries are skipped when it is missing,
    contains the "undetermined" marker, or is not an ISO date.
    """
    upcoming = []
    for entry in extract_naver_ipo_entries():
        info, brokers = extract_brokers_from_naver_detail(entry['detail_url'])
        raw_listed = info.get('lcalDate') or info.get('listingDate') or info.get('listDate') or ''
        raw_listed = raw_listed.strip()
        if raw_listed == '' or '미정' in raw_listed:
            continue
        try:
            listed_on = date.fromisoformat(raw_listed)
        except ValueError:
            continue
        if listed_on > cutoff:
            upcoming.append(
                EventSpec(
                    kind='listing',
                    name=info.get('compName') or entry['name'],
                    start_date=listed_on,
                    end_date=listed_on,
                    brokers=brokers,
                    source_url=entry['detail_url'],
                )
            )
    return upcoming
|
|
|
|
|
|
def load_state() -> dict:
    """Read the persisted sync state from ``STATE_FILE``.

    The file is decoded as UTF-8 explicitly, since it may contain non-ASCII
    text and the locale default encoding is not guaranteed to handle it.

    Returns:
        The stored JSON object, or an empty dict when the file is missing,
        unreadable, not valid JSON, or its root is not an object.
    """
    try:
        loaded = json.loads(STATE_FILE.read_text(encoding='utf-8'))
    except (OSError, ValueError):
        # OSError covers a missing/unreadable file; ValueError covers both
        # JSON syntax errors and undecodable bytes.
        return {}
    # Callers assign keys on the result, so only a JSON object is usable.
    return loaded if isinstance(loaded, dict) else {}
|
|
|
|
|
|
def save_state(state: dict) -> None:
    """Write *state* to ``STATE_FILE`` as indented JSON.

    ``ensure_ascii=False`` keeps non-ASCII text readable in the file, so the
    file is written as UTF-8 explicitly rather than in the locale's default
    encoding, which might be unable to represent it.
    """
    STATE_FILE.write_text(json.dumps(state, ensure_ascii=False, indent=2), encoding='utf-8')
|
|
|
|
|
|
def event_key_from_summary(summary: str) -> tuple[str, str] | None:
    """Recover ``(kind, name)`` from a calendar event title.

    Titles starting with the subscription tag map to ``'subscription'`` and
    titles starting with the listing tag map to ``'listing'``; the remainder
    of the title, trimmed, is the name. Any other title (including an empty
    or missing one) yields ``None``.
    """
    title = (summary or '').strip()
    known_prefixes = (
        ('[공모청약] ', 'subscription'),
        ('[신규상장] ', 'listing'),
    )
    for prefix, kind in known_prefixes:
        if title.startswith(prefix):
            # Drop only the leading tag; later repeats belong to the name.
            return (kind, title[len(prefix):].strip())
    return None
|
|
|
|
|
|
def fetch_existing_events(start_date: date, end_date: date) -> dict[str, list[dict]]:
    """List this script's calendar events in a date window, grouped by key.

    Queries ``gog calendar events`` from midnight KST of *start_date* to
    midnight KST of *end_date* and keeps only events whose title is
    recognised by ``event_key_from_summary``.

    Returns:
        A mapping of ``'<kind>|<name>'`` to every matching event dict (several
        when duplicates exist).

    Raises:
        RuntimeError: the ``gog`` command exited non-zero.
    """
    start_dt = datetime(start_date.year, start_date.month, start_date.day, 0, 0, tzinfo=KST)
    end_dt = datetime(end_date.year, end_date.month, end_date.day, 0, 0, tzinfo=KST)
    out = run([
        'gog', 'calendar', 'events', CALENDAR_ID,
        '--from', start_dt.isoformat(), '--to', end_dt.isoformat(),
        '--all-pages', '--max', '250', '--json'
    ])
    data = json.loads(out)
    existing: dict[str, list[dict]] = {}
    # ``or []`` also covers an explicit null "events" value in the response.
    for ev in data.get('events') or []:
        parsed = event_key_from_summary(ev.get('summary', ''))
        if not parsed:
            continue
        kind, name = parsed
        existing.setdefault(f'{kind}|{name}', []).append(ev)
    return existing
|
|
|
|
|
|
def create_event(ev: EventSpec, dry_run: bool = False):
    """Create an all-day calendar event for *ev* via ``gog calendar create``.

    With *dry_run* the intended event is printed as one JSON line and nothing
    is created.
    """
    first_day = ev.start_date.isoformat()
    # Google Calendar all-day events use an exclusive end date.
    day_after_last = (ev.end_date + timedelta(days=1)).isoformat()

    if dry_run:
        preview = {'summary': ev.summary, 'date': first_day, 'description': ev.description, 'all_day': True}
        print(json.dumps(preview, ensure_ascii=False))
        return

    command = [
        'gog', 'calendar', 'create', CALENDAR_ID,
        '--summary', ev.summary,
        '--description', ev.description,
        '--from', first_day,
        '--to', day_after_last,
        '--all-day',
        '--event-color', '5' if ev.kind == 'subscription' else '10',
    ]
    run(command)
|
|
|
|
|
|
def update_event(event_id: str, ev: EventSpec, dry_run: bool = False):
    """Bring calendar event *event_id* in line with *ev*.

    Tries ``gog calendar update`` first; if that fails for any reason the
    event is deleted and created again.

    Returns:
        ``'updated'`` (also for a dry run, which only prints the intended
        change) or ``'recreated'`` when the delete-and-create fallback ran.
    """
    first_day = ev.start_date.isoformat()
    # All-day events take an exclusive end date.
    day_after_last = (ev.end_date + timedelta(days=1)).isoformat()

    if dry_run:
        preview = {
            'update_event_id': event_id,
            'summary': ev.summary,
            'date': first_day,
            'description': ev.description,
            'all_day': True,
        }
        print(json.dumps(preview, ensure_ascii=False))
        return 'updated'

    try:
        run([
            'gog', 'calendar', 'update', CALENDAR_ID, event_id,
            '--summary', ev.summary,
            '--description', ev.description,
            '--from', first_day,
            '--to', day_after_last,
            '--all-day',
            '--event-color', '5' if ev.kind == 'subscription' else '10',
        ])
    except Exception:
        # In-place update failed: replace the event instead.
        run(['gog', 'calendar', 'delete', CALENDAR_ID, event_id, '--force', '--no-input'])
        create_event(ev, dry_run=False)
        return 'recreated'
    return 'updated'
|
|
|
|
|
|
def delete_event(event_id: str, dry_run: bool = False):
    """Delete calendar event *event_id* via ``gog calendar delete``.

    With *dry_run* the id is printed as one JSON line and nothing is deleted.
    """
    if not dry_run:
        run(['gog', 'calendar', 'delete', CALENDAR_ID, event_id, '--force', '--no-input'])
        return
    print(json.dumps({'delete_event_id': event_id}, ensure_ascii=False))
|
|
|
|
|
|
def event_date_range(ev: dict) -> tuple[str, str]:
    """Return the inclusive ``(start, end)`` ISO dates of a calendar event.

    All-day events carry ``start.date`` / ``end.date`` with an exclusive end,
    so one day is subtracted to obtain the last day. Timed events carry
    ``dateTime`` values; their date part is used as-is, because subtracting a
    day there would put the end before the start for a same-day event.

    Returns:
        ``(start, end)`` as ``YYYY-MM-DD`` strings; either is ``''`` when the
        event lacks that information.
    """
    # ``or {}`` / ``or ''`` also cover keys present with a null value.
    start_info = ev.get('start') or {}
    end_info = ev.get('end') or {}
    start = start_info.get('date') or (start_info.get('dateTime') or '')[:10]

    all_day_end = end_info.get('date')
    if all_day_end:
        end = (date.fromisoformat(all_day_end) - timedelta(days=1)).isoformat()
    else:
        end = (end_info.get('dateTime') or '')[:10]
    return start, end
|
|
|
|
|
|
def _date_label(start: str, end: str) -> str:
    """Render an inclusive date range: one date if single-day, else 'start~end'."""
    if not end or start == end:
        return start
    return f'{start}~{end}'


def main():
    """Synchronise upcoming IPO events with the calendar and report changes.

    Steps:
      1. Scrape subscription and listing events dated after today (KST).
      2. Load the calendar's existing tagged events from tomorrow up to the
         day after the latest scraped event.
      3. For each scraped event: remove duplicate calendar entries, then
         create, update, or leave the remaining entry untouched.
      4. Delete tagged calendar events that were not scraped this run.
      5. Persist the change log (skipped with ``--dry-run``) and print a
         one-line JSON summary.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--dry-run', action='store_true')
    args = parser.parse_args()

    cutoff = datetime.now(KST).date()
    events = parse_subscription_events_from(cutoff) + parse_listed_events_from(cutoff)
    events.sort(key=lambda e: (e.start_date, e.kind, e.name))

    state = load_state()
    if not isinstance(state, dict):
        # Keys are assigned on the state below, so it must be a dict.
        state = {}
    end_date = max((ev.end_date for ev in events), default=cutoff) + timedelta(days=1)
    # Guard against a failed scrape wiping the calendar: with no scraped
    # events, no existing events are loaded and so no cleanup happens.
    existing = {} if not events else fetch_existing_events(cutoff + timedelta(days=1), end_date)

    created = []
    updated = []
    recreated = []
    deleted = []
    duplicates_removed = []
    unchanged = 0
    changes = []

    event_keys = {ev.state_key for ev in events}

    for ev in events:
        existing_list = existing.get(ev.state_key, [])
        # Several calendar entries for one key: keep the most recently
        # created and delete the others.
        if len(existing_list) > 1:
            existing_list.sort(key=lambda e: e.get('created') or '', reverse=True)
            for dup in existing_list[1:]:
                delete_event(dup['id'], dry_run=args.dry_run)
                duplicates_removed.append(ev.state_key)
                changes.append({
                    'kind': ev.kind,
                    'name': ev.name,
                    'action': 'duplicate_removed',
                    'event_id': dup['id'],
                    'old_date': _date_label(*event_date_range(dup)),
                })

        existing_ev = existing_list[0] if existing_list else None
        desired_start = ev.start_date.isoformat()
        desired_end = ev.end_date.isoformat()

        if not existing_ev:
            create_event(ev, dry_run=args.dry_run)
            created.append(ev.state_key)
            changes.append({
                'kind': ev.kind,
                'name': ev.name,
                'action': 'created',
                'new_date': _date_label(desired_start, desired_end),
            })
            continue

        existing_start, existing_end = event_date_range(existing_ev)
        existing_description = (existing_ev.get('description') or '').strip()
        desired_description = ev.description.strip()

        if (
            existing_start == desired_start
            and existing_end == desired_end
            and existing_description == desired_description
        ):
            unchanged += 1
            continue

        result = update_event(existing_ev['id'], ev, dry_run=args.dry_run)
        if result == 'recreated':
            recreated.append(ev.state_key)
        else:
            updated.append(ev.state_key)
        changes.append({
            'kind': ev.kind,
            'name': ev.name,
            'action': result,
            'old_date': _date_label(existing_start, existing_end),
            'new_date': _date_label(desired_start, desired_end),
        })

    # Events that are no longer scraped are removed from the calendar too.
    # When nothing was scraped, ``existing`` is empty and this loop is a no-op.
    for key, existing_list in existing.items():
        if key in event_keys:
            continue
        kind, name = key.split('|', 1)
        for stale in existing_list:
            delete_event(stale['id'], dry_run=args.dry_run)
            deleted.append(key)
            changes.append({
                'kind': kind,
                'name': name,
                'action': 'deleted',
                'event_id': stale['id'],
                'old_date': _date_label(*event_date_range(stale)),
            })

    state['last_changes'] = changes
    state['last_run_at'] = datetime.now(KST).isoformat()
    if not args.dry_run:
        save_state(state)

    print(json.dumps({'cutoff_after': cutoff.isoformat(), 'total_found': len(events), 'newly_created': len(created), 'updated': len(updated), 'recreated': len(recreated), 'deleted': len(deleted), 'duplicates_removed': len(duplicates_removed), 'unchanged': unchanged, 'dry_run': args.dry_run, 'changes': changes}, ensure_ascii=False))


if __name__ == '__main__':
    main()
|