Files
hyowons fed3526b20 Initial commit: OpenClaw 워크스페이스 버전관리 시작
설정·스크립트·스킬·문서·큐레이션 메모리 추적.
시크릿(credentials/identity)·런타임 상태(state/logs/sessions/sqlite)·
백업(clobbered/bak)·dream 캐시는 .gitignore로 제외.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-04 15:39:41 +09:00

394 lines
15 KiB
Python

#!/usr/bin/env python3
from __future__ import annotations
import argparse
import json
import re
import subprocess
import urllib.request
from dataclasses import dataclass
from urllib.parse import urljoin
from datetime import date, datetime, timedelta
from html import unescape
from pathlib import Path
from zoneinfo import ZoneInfo
# Timezone used for "today" and for the calendar query window.
KST = ZoneInfo('Asia/Seoul')
# Calendar that every `gog calendar` command in this script targets.
CALENDAR_ID = 'mini.snowoyh@gmail.com'
WORKSPACE = Path('/Users/snowoyh/.openclaw/agents/stock/workspace')
STATE_DIR = WORKSPACE / 'state'
# Import-time side effect: the state directory is created if it is missing.
STATE_DIR.mkdir(parents=True, exist_ok=True)
# JSON file read by load_state() and written by save_state().
STATE_FILE = STATE_DIR / 'ipo_calendar_sync.json'
# NOTE(review): not referenced by any code visible in this file — possibly a
# leftover from an earlier data source; confirm before removing.
SUBSCRIPTION_URL = 'https://www.38.co.kr/html/fund/index.htm?o=k'
# Listing page scraped by extract_naver_ipo_entries().
NAVER_IPO_URL = 'https://finance.naver.com/sise/ipo.naver'
# Written into every event description as the data-source label.
SOURCE_LABEL = '네이버 금융 IPO'
@dataclass
class EventSpec:
    """One desired all-day calendar entry: an IPO subscription window or a listing day."""

    kind: str  # 'subscription'; any other value is rendered as a listing
    name: str
    start_date: date
    end_date: date  # inclusive
    brokers: str
    source_url: str

    @property
    def summary(self) -> str:
        """Event title: a kind-specific bracket tag followed by the company name."""
        tag = '[신규상장]' if self.kind != 'subscription' else '[공모청약]'
        return f'{tag} {self.name}'

    @property
    def description(self) -> str:
        """Multi-line event body: name, brokers, date (or date range), source label and URL."""
        if self.kind == 'subscription':
            label = '청약일'
        else:
            label = '상장일'
        # A one-day event shows a single date; otherwise "start ~ end".
        date_str = self.start_date.isoformat()
        if self.start_date != self.end_date:
            date_str = f'{self.start_date.isoformat()} ~ {self.end_date.isoformat()}'
        pieces = (
            f'종목명: {self.name}\n',
            f'증권사: {self.brokers or "미확인"}\n',
            f'{label}: {date_str}\n',
            f'기준: {SOURCE_LABEL}\n',
            f'출처: {self.source_url}',
        )
        return ''.join(pieces)

    @property
    def state_key(self) -> str:
        """Identity used to match this spec against existing calendar events: 'kind|name'."""
        return f'{self.kind}|{self.name}'
def run(cmd: list[str]) -> str:
    """Execute *cmd* (argument list, no shell) and return its captured stdout.

    Raises:
        RuntimeError: when the command exits non-zero. The message is the
            stripped stderr, else the stripped stdout, else 'command failed'.
    """
    completed = subprocess.run(cmd, capture_output=True, text=True)
    if completed.returncode == 0:
        return completed.stdout
    message = completed.stderr.strip() or completed.stdout.strip() or 'command failed'
    raise RuntimeError(message)
def fetch(url: str, encoding: str = 'euc-kr') -> str:
    """Download *url* (30 s timeout, browser-like User-Agent) and return it as text.

    Args:
        url: Address to request.
        encoding: Codec used to decode the body, with undecodable bytes
            dropped. The special value 'auto' tries strict UTF-8, then strict
            CP949, and finally falls back to UTF-8 with bad bytes dropped.
    """
    request = urllib.request.Request(url, headers={'User-Agent': 'Mozilla/5.0'})
    with urllib.request.urlopen(request, timeout=30) as response:
        payload = response.read()

    if encoding != 'auto':
        return payload.decode(encoding, 'ignore')

    # Korean pages are either UTF-8 or CP949. Use whichever decodes strictly,
    # because the charset a page declares in its meta tag cannot be trusted.
    for candidate in ('utf-8', 'cp949'):
        try:
            return payload.decode(candidate)
        except UnicodeDecodeError:
            pass
    return payload.decode('utf-8', errors='ignore')
def clean_text(html_fragment: str) -> str:
    """Reduce an HTML fragment to plain text on a single line.

    `<br>` variants and every other tag become a space, HTML entities are
    decoded, non-breaking spaces become ordinary spaces, and runs of
    whitespace collapse to one space with the ends trimmed.
    """
    without_breaks = re.sub(r'<br\s*/?>', ' ', html_fragment, flags=re.I)
    without_tags = re.sub(r'<[^>]+>', ' ', without_breaks)
    decoded = unescape(without_tags).replace('\xa0', ' ')
    return re.sub(r'\s+', ' ', decoded).strip()
def parse_html_rows(table_html: str) -> list[list[str]]:
    """Split table markup into rows of cleaned cell texts.

    Every `<tr>` contributes one list holding the clean_text() of each of its
    `<td>`/`<th>` cells, in document order. Rows without any cell are dropped.
    """
    row_pattern = r'<tr[^>]*>(.*?)</tr>'
    cell_pattern = r'<t[dh][^>]*>(.*?)</t[dh]>'
    per_row = (
        [clean_text(cell) for cell in re.findall(cell_pattern, row_html, re.S | re.I)]
        for row_html in re.findall(row_pattern, table_html, re.S | re.I)
    )
    return [cells for cells in per_row if cells]
def parse_date_range(text: str) -> tuple[date, date] | None:
    """Parse 'YYYY.MM.DD~MM.DD' into an inclusive (start, end) pair of dates.

    The end part carries no year. It is read in the start's year unless that
    would place it before the start (e.g. '2025.12.30~01.02'), in which case
    the range is taken to run into the following year. Whitespace around the
    '~' is tolerated.

    Returns:
        (start, end), or None when the text has a different shape or names a
        date that does not exist on the calendar.
    """
    match = re.fullmatch(
        r'(\d{4})\.(\d{2})\.(\d{2})\s*~\s*(\d{2})\.(\d{2})',
        text.strip(),
    )
    if not match:
        return None
    year, start_month, start_day, end_month, end_day = map(int, match.groups())
    # An end that sorts before the start within one year means the range
    # crosses New Year.
    wraps_year = (end_month, end_day) < (start_month, start_day)
    try:
        start = date(year, start_month, start_day)
        end = date(year + 1 if wraps_year else year, end_month, end_day)
    except ValueError:
        return None
    return start, end
def parse_single_date(text: str) -> date | None:
    """Parse 'YYYY.MM.DD' into a date.

    Returns:
        The date, or None when the text has a different shape or names a date
        that does not exist on the calendar (e.g. '2026.02.30').
    """
    match = re.fullmatch(r'(\d{4})\.(\d{2})\.(\d{2})', text.strip())
    if not match:
        return None
    year, month, day = map(int, match.groups())
    try:
        return date(year, month, day)
    except ValueError:
        # Right shape, impossible date: treat it like any other unparseable text.
        return None
def extract_next_data_json(html: str) -> dict:
    """Return the JSON object embedded in the page's `__NEXT_DATA__` script tag.

    The tag is located by its `id`; any other attributes on it are ignored.

    Returns:
        The decoded object, or {} when the tag is absent, its content is not
        valid JSON, or the JSON is not an object. Callers immediately chain
        `.get(...)` on the result, so a non-dict is never handed back.
    """
    match = re.search(
        r'<script\b[^>]*\bid="__NEXT_DATA__"[^>]*>(.*?)</script>',
        html,
        re.S,
    )
    if not match:
        return {}
    try:
        payload = json.loads(match.group(1))
    except (ValueError, RecursionError):
        # json.JSONDecodeError is a ValueError; pathological nesting raises
        # RecursionError. Either way the page is unusable.
        return {}
    return payload if isinstance(payload, dict) else {}
def extract_naver_ipo_detail(detail_url: str) -> dict:
    """Fetch a Naver IPO detail page and return its first non-empty `ipoInfo` object.

    The page's `__NEXT_DATA__` payload is searched under
    props.pageProps.dehydratedState.queries[*].state.data.result.ipoInfo.

    Returns:
        The `ipoInfo` dict of the first query that carries one, or {} when the
        page has none. Any level that is missing, null, or of an unexpected
        type is skipped rather than raising.
    """
    html = fetch(detail_url, encoding='utf-8')
    node = extract_next_data_json(html)
    # `.get(key, {})` only substitutes the default when the key is absent; a
    # key that is present with a null/list value would break a plain chain,
    # so every level is type-checked instead.
    for key in ('props', 'pageProps', 'dehydratedState', 'queries'):
        node = node.get(key) if isinstance(node, dict) else None
    queries = node if isinstance(node, list) else []

    for query in queries:
        result = query
        for key in ('state', 'data', 'result'):
            result = result.get(key) if isinstance(result, dict) else None
        ipo_info = result.get('ipoInfo') if isinstance(result, dict) else None
        if isinstance(ipo_info, dict) and ipo_info:
            return ipo_info
    return {}
def extract_naver_ipo_entries() -> list[dict]:
    """Scrape the Naver IPO listing page for its entries.

    Each `<div class="item_area" id="...">` is paired with the first
    m.stock.naver.com/ipo link that follows it.

    Returns:
        Dicts with 'code' (the div id), 'name' (cleaned link text) and
        'detail_url', in page order. Entries with an empty code or name are
        skipped, and only the first entry per code is kept.
    """
    # NOTE(review): `.*?` with re.S can run past an item that has no link and
    # pick up the next item's link — confirm against the live markup.
    item_pattern = r'<div class="item_area" id="([^"]+)">.*?<a href="(https://m\.stock\.naver\.com/ipo/[^"]+)"[^>]*>(.*?)</a>'
    page = fetch(NAVER_IPO_URL, encoding='auto')

    by_code = {}
    for match in re.finditer(item_pattern, page, re.S):
        code = match.group(1).strip()
        detail_url = match.group(2).strip()
        name = clean_text(match.group(3))
        if code and name and code not in by_code:
            by_code[code] = {'code': code, 'name': name, 'detail_url': detail_url}
    # Dicts keep insertion order, so this is page order.
    return list(by_code.values())
def extract_brokers_from_naver_detail(detail_url: str) -> tuple[dict, str]:
    """Fetch a Naver IPO detail page and return its `ipoInfo` plus the broker names.

    The page's `__NEXT_DATA__` payload is searched under
    props.pageProps.dehydratedState.queries[*].state.data.result. The first
    result carrying a non-empty `ipoInfo` object is used; broker names come
    from the `orgNm` of each item in that same result's `joinManagers`.

    Returns:
        (ipo_info, brokers): the `ipoInfo` dict and the distinct broker names
        joined with ',' in first-seen order. ({}, '') when no query carries
        `ipoInfo`. Missing, null or unexpectedly typed levels are skipped
        rather than raising.
    """

    def dig(node, *keys):
        """Follow *keys* through nested dicts; None once a level is not a dict."""
        for key in keys:
            if not isinstance(node, dict):
                return None
            node = node.get(key)
        return node

    html = fetch(detail_url, encoding='utf-8')
    data = extract_next_data_json(html)
    queries = dig(data, 'props', 'pageProps', 'dehydratedState', 'queries')
    if not isinstance(queries, list):
        return {}, ''

    for query in queries:
        # `.get(key, {})` would not protect against keys present with a
        # null/list value, so the path is walked with a type check per level.
        result = dig(query, 'state', 'data', 'result')
        if not isinstance(result, dict):
            continue
        ipo_info = result.get('ipoInfo')
        if not isinstance(ipo_info, dict) or not ipo_info:
            continue

        brokers: list[str] = []
        managers = result.get('joinManagers')
        for item in managers if isinstance(managers, list) else []:
            if not isinstance(item, dict):
                continue
            raw_name = item.get('orgNm')
            name = raw_name.strip() if isinstance(raw_name, str) else ''
            if name and name not in brokers:
                brokers.append(name)
        # Only the first query that carries ipoInfo is considered.
        return ipo_info, ','.join(brokers)

    return {}, ''
def parse_subscription_events_from(cutoff: date) -> list[EventSpec]:
    """Build subscription events for every listed IPO whose window ends after *cutoff*.

    Each entry's detail page supplies `poStartDate` / `poEndDate` (ISO dates).
    Entries are skipped when either date is missing, contains '미정', or is
    not a valid ISO date, and when the end date is on or before *cutoff*.
    """
    collected = []
    for entry in extract_naver_ipo_entries():
        info, brokers = extract_brokers_from_naver_detail(entry['detail_url'])
        raw_start = (info.get('poStartDate') or '').strip()
        raw_end = (info.get('poEndDate') or '').strip()

        both_present = bool(raw_start) and bool(raw_end)
        undecided = '미정' in raw_start or '미정' in raw_end
        if not both_present or undecided:
            continue

        try:
            start_date = date.fromisoformat(raw_start)
            end_date = date.fromisoformat(raw_end)
        except ValueError:
            continue

        if end_date > cutoff:
            # Prefer the company name from the detail page over the listing text.
            collected.append(
                EventSpec(
                    'subscription',
                    info.get('compName') or entry['name'],
                    start_date,
                    end_date,
                    brokers,
                    entry['detail_url'],
                )
            )
    return collected
def parse_listed_events_from(cutoff: date) -> list[EventSpec]:
    """Build one-day listing events for every listed IPO that lists after *cutoff*.

    The listing date is the first non-empty value among `lcalDate`,
    `listingDate` and `listDate` on the detail page. Entries are skipped when
    that value is missing, contains '미정', is not a valid ISO date, or falls
    on or before *cutoff*.
    """
    listings = []
    for entry in extract_naver_ipo_entries():
        info, brokers = extract_brokers_from_naver_detail(entry['detail_url'])
        raw_listed = info.get('lcalDate') or info.get('listingDate') or info.get('listDate') or ''
        raw_listed = raw_listed.strip()
        if not raw_listed or '미정' in raw_listed:
            continue

        try:
            listed_on = date.fromisoformat(raw_listed)
        except ValueError:
            continue

        if listed_on > cutoff:
            # Prefer the company name from the detail page over the listing text.
            listings.append(
                EventSpec(
                    'listing',
                    info.get('compName') or entry['name'],
                    listed_on,
                    listed_on,
                    brokers,
                    entry['detail_url'],
                )
            )
    return listings
def load_state() -> dict:
    """Read the persisted sync state from STATE_FILE.

    Returns:
        The stored JSON object, or {} when the file is missing, unreadable,
        not valid UTF-8, not valid JSON, or holds something other than an
        object. main() assigns keys on the result, so a non-dict is never
        handed back.
    """
    try:
        # Explicit UTF-8: the state contains Korean text and must not depend
        # on the platform's default encoding.
        loaded = json.loads(STATE_FILE.read_text(encoding='utf-8'))
    except (OSError, ValueError):
        # OSError covers a missing/unreadable file; ValueError covers both
        # UnicodeDecodeError and json.JSONDecodeError.
        return {}
    return loaded if isinstance(loaded, dict) else {}
def save_state(state: dict) -> None:
    """Write *state* to STATE_FILE as indented JSON, replacing previous content."""
    # ensure_ascii=False emits Korean text verbatim, so the file encoding is
    # pinned to UTF-8 instead of the platform default.
    STATE_FILE.write_text(
        json.dumps(state, ensure_ascii=False, indent=2),
        encoding='utf-8',
    )
def event_key_from_summary(summary: str) -> tuple[str, str] | None:
    """Recover (kind, name) from a calendar title produced by EventSpec.summary.

    Returns:
        ('subscription', name) or ('listing', name) when the trimmed title
        starts with the matching bracket tag followed by a space; None for any
        other title (including empty or None).
    """
    title = (summary or '').strip()
    tagged_kinds = (
        ('[공모청약] ', 'subscription'),
        ('[신규상장] ', 'listing'),
    )
    for prefix, kind in tagged_kinds:
        if title.startswith(prefix):
            return (kind, title[len(prefix):].strip())
    return None
def fetch_existing_events(start_date: date, end_date: date) -> dict[str, list[dict]]:
    """List this script's events already on the calendar within a date window.

    Queries `gog calendar events` from midnight KST of *start_date* to
    midnight KST of *end_date* and keeps only events whose title
    event_key_from_summary() recognises.

    Returns:
        Mapping of 'kind|name' to the list of raw event dicts with that key
        (more than one when the calendar holds duplicates).

    Raises:
        RuntimeError: if the `gog` command fails.
        ValueError: if its output is not valid JSON. This is deliberately not
            swallowed: treating a bad response as "no events" would make the
            caller re-create every event.
    """
    start_dt = datetime(start_date.year, start_date.month, start_date.day, 0, 0, tzinfo=KST)
    end_dt = datetime(end_date.year, end_date.month, end_date.day, 0, 0, tzinfo=KST)
    out = run([
        'gog', 'calendar', 'events', CALENDAR_ID,
        '--from', start_dt.isoformat(), '--to', end_dt.isoformat(),
        '--all-pages', '--max', '250', '--json',
    ])
    data = json.loads(out)

    existing: dict[str, list[dict]] = {}
    # `or []` also covers an explicit `"events": null` in the output.
    for ev in data.get('events') or []:
        parsed = event_key_from_summary(ev.get('summary', ''))
        if not parsed:
            continue
        kind, name = parsed
        existing.setdefault(f'{kind}|{name}', []).append(ev)
    return existing
def create_event(ev: EventSpec, dry_run: bool = False):
    """Create an all-day calendar event for *ev* via `gog calendar create`.

    With *dry_run* set, nothing is created; a one-line JSON preview (summary,
    start date, description) is printed instead. Subscription events get
    colour '5', everything else colour '10'.
    """
    first_day = ev.start_date.isoformat()
    # Google Calendar all-day events use an exclusive end date.
    day_after_last = (ev.end_date + timedelta(days=1)).isoformat()

    if dry_run:
        preview = {
            'summary': ev.summary,
            'date': first_day,
            'description': ev.description,
            'all_day': True,
        }
        print(json.dumps(preview, ensure_ascii=False))
        return

    command = ['gog', 'calendar', 'create', CALENDAR_ID]
    command += ['--summary', ev.summary]
    command += ['--description', ev.description]
    command += ['--from', first_day, '--to', day_after_last, '--all-day']
    command += ['--event-color', '5' if ev.kind == 'subscription' else '10']
    run(command)
def update_event(event_id: str, ev: EventSpec, dry_run: bool = False):
    """Bring an existing calendar event in line with *ev*.

    With *dry_run* set, only a one-line JSON preview is printed. Otherwise
    `gog calendar update` is attempted; if that fails for any reason the event
    is deleted and created afresh.

    Returns:
        'updated' (also for a dry run) or 'recreated' when the fallback ran.
    """
    first_day = ev.start_date.isoformat()
    # All-day events take an exclusive end date.
    day_after_last = (ev.end_date + timedelta(days=1)).isoformat()

    if dry_run:
        preview = {
            'update_event_id': event_id,
            'summary': ev.summary,
            'date': first_day,
            'description': ev.description,
            'all_day': True,
        }
        print(json.dumps(preview, ensure_ascii=False))
        return 'updated'

    try:
        run([
            'gog', 'calendar', 'update', CALENDAR_ID, event_id,
            '--summary', ev.summary,
            '--description', ev.description,
            '--from', first_day,
            '--to', day_after_last,
            '--all-day',
            '--event-color', '5' if ev.kind == 'subscription' else '10',
        ])
    except Exception:
        # The in-place update failed: replace the event instead.
        run(['gog', 'calendar', 'delete', CALENDAR_ID, event_id, '--force', '--no-input'])
        create_event(ev, dry_run=False)
        return 'recreated'
    return 'updated'
def delete_event(event_id: str, dry_run: bool = False):
    """Delete a calendar event via `gog calendar delete`, without prompting.

    With *dry_run* set, nothing is deleted; a one-line JSON note naming the
    event id is printed instead.
    """
    if not dry_run:
        run(['gog', 'calendar', 'delete', CALENDAR_ID, event_id, '--force', '--no-input'])
        return
    print(json.dumps({'delete_event_id': event_id}, ensure_ascii=False))
def event_date_range(ev: dict) -> tuple[str, str]:
    """Return the inclusive (start, end) ISO dates covered by a calendar event.

    All-day events carry `date` fields whose end is exclusive, so one day is
    subtracted. Timed events carry `dateTime` fields; their end date is used
    as-is, except when the end falls exactly on midnight of a later day, which
    is likewise treated as exclusive.

    Returns:
        (start, end) as 'YYYY-MM-DD' strings; a part that the event does not
        provide is returned as ''.
    """
    start_info = ev.get('start') or {}
    end_info = ev.get('end') or {}
    start = start_info.get('date') or (start_info.get('dateTime') or '')[:10]

    all_day_end = end_info.get('date')
    if all_day_end:
        last_day = date.fromisoformat(all_day_end) - timedelta(days=1)
        return start, last_day.isoformat()

    end_stamp = end_info.get('dateTime') or ''
    end = end_stamp[:10]
    # Characters 11-18 of an ISO timestamp are HH:MM:SS. Only the literal
    # wall-clock time is inspected; the UTC offset is not normalised.
    if end and end_stamp[11:19] == '00:00:00' and end > start:
        end = (date.fromisoformat(end) - timedelta(days=1)).isoformat()
    return start, end
def main():
    """Reconcile the calendar with the currently scraped IPO schedule and print a JSON report.

    Steps, in order:
      1. Scrape subscription and listing events dated after today (KST).
      2. Load the calendar events from tomorrow up to the day after the last
         scraped end date.
      3. For every scraped event: remove duplicate calendar entries (keeping
         the most recently created), then create, update or leave it alone.
      4. Delete calendar entries whose key was not scraped this run.
      5. Record the change list and run time in the state file (skipped with
         --dry-run) and print a one-line JSON summary.
    """

    def span(first: str, last: str) -> str:
        """Render an inclusive date range; one day (or a missing end) shows only the first date."""
        return first if first == last or not last else f'{first}~{last}'

    parser = argparse.ArgumentParser()
    parser.add_argument('--dry-run', action='store_true')
    args = parser.parse_args()
    dry_run = args.dry_run

    cutoff = datetime.now(KST).date()
    events = parse_subscription_events_from(cutoff) + parse_listed_events_from(cutoff)
    events.sort(key=lambda e: (e.start_date, e.kind, e.name))
    state = load_state()
    end_date = max((ev.end_date for ev in events), default=cutoff) + timedelta(days=1)

    # Safety net: if scraping produced nothing, do not load existing events at
    # all, so a scraping failure cannot lead to every calendar entry being
    # deleted as stale.
    if events:
        existing = fetch_existing_events(cutoff + timedelta(days=1), end_date)
    else:
        existing = {}

    counts = {
        'created': 0,
        'updated': 0,
        'recreated': 0,
        'deleted': 0,
        'duplicates_removed': 0,
        'unchanged': 0,
    }
    changes = []
    wanted_keys = {ev.state_key for ev in events}

    for ev in events:
        matches = existing.get(ev.state_key, [])
        desired_start = ev.start_date.isoformat()
        desired_end = ev.end_date.isoformat()

        # Several calendar entries share this key: keep the most recently
        # created one and delete the rest.
        if len(matches) > 1:
            matches.sort(key=lambda e: e.get('created', ''), reverse=True)
            for dup in matches[1:]:
                delete_event(dup['id'], dry_run=dry_run)
                counts['duplicates_removed'] += 1
                changes.append({
                    'kind': ev.kind,
                    'name': ev.name,
                    'action': 'duplicate_removed',
                    'event_id': dup['id'],
                    'old_date': span(*event_date_range(dup)),
                })

        current = matches[0] if matches else None
        if not current:
            create_event(ev, dry_run=dry_run)
            counts['created'] += 1
            changes.append({
                'kind': ev.kind,
                'name': ev.name,
                'action': 'created',
                'new_date': span(desired_start, desired_end),
            })
            continue

        current_start, current_end = event_date_range(current)
        same_dates = current_start == desired_start and current_end == desired_end
        same_text = (current.get('description') or '').strip() == ev.description.strip()
        if same_dates and same_text:
            counts['unchanged'] += 1
            continue

        result = update_event(current['id'], ev, dry_run=dry_run)
        counts['recreated' if result == 'recreated' else 'updated'] += 1
        changes.append({
            'kind': ev.kind,
            'name': ev.name,
            'action': result,
            'old_date': span(current_start, current_end),
            'new_date': span(desired_start, desired_end),
        })

    # Entries that vanished from the site are removed from the calendar too.
    # When nothing was scraped, `existing` is empty and this loop is a no-op.
    for key, leftovers in existing.items():
        if key in wanted_keys:
            continue
        kind, name = key.split('|', 1)
        for stale in leftovers:
            delete_event(stale['id'], dry_run=dry_run)
            counts['deleted'] += 1
            changes.append({
                'kind': kind,
                'name': name,
                'action': 'deleted',
                'event_id': stale['id'],
                'old_date': span(*event_date_range(stale)),
            })

    state['last_changes'] = changes
    state['last_run_at'] = datetime.now(KST).isoformat()
    if not dry_run:
        save_state(state)

    report = {
        'cutoff_after': cutoff.isoformat(),
        'total_found': len(events),
        'newly_created': counts['created'],
        'updated': counts['updated'],
        'recreated': counts['recreated'],
        'deleted': counts['deleted'],
        'duplicates_removed': counts['duplicates_removed'],
        'unchanged': counts['unchanged'],
        'dry_run': dry_run,
        'changes': changes,
    }
    print(json.dumps(report, ensure_ascii=False))


if __name__ == '__main__':
    main()