每日备份 2026-03-27

This commit is contained in:
OpenClaw Backup
2026-03-27 23:38:45 +08:00
parent 4f11cd7b03
commit d09281e48c
827 changed files with 6991 additions and 148648 deletions
@@ -0,0 +1,162 @@
#!/usr/bin/env python3
"""Shared helpers for daily-stock-analysis scripts."""
from __future__ import annotations
import os
import re
from dataclasses import dataclass
from datetime import date
from typing import Dict, List, Optional
FILENAME_RE = re.compile(
r"^(?P<run_date>\d{4}-\d{2}-\d{2})-(?P<ticker>[A-Za-z0-9._-]+)-analysis(?:-v(?P<version>\d+))?\.md$",
re.IGNORECASE,
)
@dataclass(frozen=True)
class ReportFile:
path: str
run_date: str
ticker: str
version: int
in_canonical_dir: bool
def canonical_reports_dir(workdir: str) -> str:
return os.path.join(os.path.abspath(workdir), "daily-stock-analysis", "reports")
def compatible_dirs(workdir: str) -> List[str]:
root = os.path.abspath(workdir)
return [
canonical_reports_dir(root),
os.path.join(root, "daily-stock-analysis"),
root,
]
def is_within_workdir(path: str, workdir: str) -> bool:
root = os.path.realpath(os.path.abspath(workdir))
target = os.path.realpath(os.path.abspath(path))
return target == root or target.startswith(root + os.sep)
def parse_filename(name: str) -> Optional[Dict[str, str]]:
match = FILENAME_RE.match(name)
if not match:
return None
return {
"run_date": match.group("run_date"),
"ticker": match.group("ticker").upper(),
"version": str(int(match.group("version") or "1")),
}
def discover_reports(workdir: str, ticker: str) -> List[ReportFile]:
root = os.path.abspath(workdir)
ticker_upper = ticker.upper()
canonical_dir = canonical_reports_dir(root)
seen = set()
records: List[ReportFile] = []
for directory in compatible_dirs(root):
if not is_within_workdir(directory, root):
continue
if not os.path.isdir(directory):
continue
for entry in os.scandir(directory):
# Never follow symlinks for safety/privacy.
if not entry.is_file(follow_symlinks=False):
continue
parsed = parse_filename(entry.name)
if not parsed:
continue
if parsed["ticker"] != ticker_upper:
continue
abs_path = os.path.abspath(entry.path)
real_path = os.path.realpath(abs_path)
if real_path in seen:
continue
seen.add(real_path)
records.append(
ReportFile(
path=abs_path,
run_date=parsed["run_date"],
ticker=parsed["ticker"],
version=int(parsed["version"]),
in_canonical_dir=os.path.dirname(abs_path) == canonical_dir,
)
)
def sort_key(record: ReportFile):
try:
d = date.fromisoformat(record.run_date)
except ValueError:
d = date.min
return (d, record.version, 1 if record.in_canonical_dir else 0)
return sorted(records, key=sort_key, reverse=True)
def read_frontmatter(path: str) -> Dict[str, str]:
try:
with open(path, "r", encoding="utf-8") as f:
first_line = f.readline()
if first_line.strip() != "---":
return {}
# Read only a bounded header section to avoid loading large files.
frontmatter: Dict[str, str] = {}
total_chars = len(first_line)
for _ in range(200):
line = f.readline()
if not line:
break
total_chars += len(line)
if total_chars > 64 * 1024:
break
raw = line.rstrip("\n")
if raw.strip() == "---":
break
if not raw.strip():
continue
if raw.startswith(" - "):
continue
if ":" not in raw:
continue
key, value = raw.split(":", 1)
frontmatter[key.strip()] = value.strip()
return frontmatter
except (OSError, UnicodeDecodeError):
return {}
def parse_float(value: Optional[str]) -> Optional[float]:
if value is None:
return None
text = value.strip()
if not text:
return None
if text.upper() in {"N/A", "NA", "NONE", "NULL", "PENDING"}:
return None
text = text.replace(",", "")
if text.endswith("%"):
text = text[:-1]
try:
return float(text)
except ValueError:
return None
def parse_bool(value: Optional[str]) -> Optional[bool]:
if value is None:
return None
text = value.strip().lower()
if text in {"true", "yes", "1"}:
return True
if text in {"false", "no", "0"}:
return False
return None
@@ -0,0 +1,122 @@
#!/usr/bin/env python3
"""Compute rolling forecast accuracy from existing report files."""
from __future__ import annotations
import argparse
import json
import os
from statistics import mean
from typing import Dict, List
from _report_utils import discover_reports, parse_bool, parse_float, read_frontmatter
def _window_list(text: str) -> List[int]:
windows = []
for item in text.split(","):
item = item.strip()
if not item:
continue
value = int(item)
if value <= 0:
continue
if value not in windows:
windows.append(value)
return windows or [1, 3, 7, 30]
def _build_review_rows(workdir: str, ticker: str, history_limit: int) -> List[Dict[str, object]]:
reports = discover_reports(workdir, ticker)[:history_limit]
rows: List[Dict[str, object]] = []
seen_run_date = set()
for report in reports:
# Keep the newest report for each run_date to avoid same-day duplicate counting.
if report.run_date in seen_run_date:
continue
frontmatter = read_frontmatter(report.path)
ape = parse_float(frontmatter.get("APE"))
strict = parse_bool(frontmatter.get("strict_hit"))
loose = parse_bool(frontmatter.get("loose_hit"))
if strict is None and ape is not None:
strict = ape <= 1.0
if loose is None and ape is not None:
loose = ape <= 2.0
if ape is None and strict is None and loose is None:
continue
rows.append(
{
"run_date": report.run_date,
"path": report.path,
"ape": ape,
"strict_hit": strict,
"loose_hit": loose,
}
)
seen_run_date.add(report.run_date)
return rows
def _rate(hit_count: int, total: int):
if total == 0:
return None
return round(hit_count * 100.0 / total, 2)
def compute_accuracy(workdir: str, ticker: str, windows: List[int], history_limit: int) -> Dict[str, object]:
rows = _build_review_rows(workdir, ticker, history_limit)
metrics = {}
for window in windows:
sample = rows[:window]
n = len(sample)
strict_hits = sum(1 for r in sample if r["strict_hit"] is True)
loose_hits = sum(1 for r in sample if r["loose_hit"] is True)
ape_values = [r["ape"] for r in sample if isinstance(r["ape"], float)]
metrics[str(window)] = {
"n": n,
"strict_rate_percent": _rate(strict_hits, n),
"loose_rate_percent": _rate(loose_hits, n),
"avg_ape_percent": round(mean(ape_values), 4) if ape_values else None,
}
latest = rows[0] if rows else None
return {
"ticker": ticker.upper(),
"workdir": os.path.abspath(workdir),
"windows": metrics,
"review_samples": len(rows),
"latest_review": latest,
"status": "ok" if rows else "insufficient_history",
"security_scope": "working_directory_only",
}
def _parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Calculate rolling forecast accuracy.")
parser.add_argument("--workdir", default=os.getcwd())
parser.add_argument("--ticker", required=True)
parser.add_argument("--windows", default="1,3,7,30")
parser.add_argument("--history-limit", type=int, default=60)
return parser.parse_args()
def main() -> int:
args = _parse_args()
result = compute_accuracy(
workdir=args.workdir,
ticker=args.ticker,
windows=_window_list(args.windows),
history_limit=max(args.history_limit, 1),
)
print(json.dumps(result, indent=2, ensure_ascii=True))
return 0
if __name__ == "__main__":
raise SystemExit(main())
@@ -0,0 +1,189 @@
#!/usr/bin/env python3
"""Deterministic report path and migration manager for daily-stock-analysis."""
from __future__ import annotations
import argparse
import json
import os
import shutil
from datetime import date
from typing import Dict, List
from _report_utils import (
FILENAME_RE,
canonical_reports_dir,
discover_reports,
is_within_workdir,
)
def _same_day_versions_in_canonical(
reports: List, reports_dir: str, run_date: str, ticker_upper: str
) -> List[int]:
versions = []
for report in reports:
if report.run_date != run_date:
continue
if report.ticker != ticker_upper:
continue
if os.path.dirname(report.path) != reports_dir:
continue
versions.append(report.version)
return versions
def plan_output(
workdir: str,
ticker: str,
run_date: str,
versioning: str,
unattended: bool,
history_limit: int,
) -> Dict[str, object]:
root = os.path.abspath(workdir)
ticker_upper = ticker.upper()
reports_dir = canonical_reports_dir(root)
os.makedirs(reports_dir, exist_ok=True)
reports = discover_reports(root, ticker_upper)
history_files = [r.path for r in reports[:history_limit]]
legacy_files = [r.path for r in reports if not r.in_canonical_dir]
base_name = f"{run_date}-{ticker_upper}-analysis.md"
base_path = os.path.join(reports_dir, base_name)
base_exists = os.path.exists(base_path)
requires_user_choice = False
selected_mode = "new_file"
selected_path = base_path
if base_exists:
if versioning == "overwrite":
selected_mode = "overwrite"
elif versioning == "new_version":
selected_mode = "new_version"
else:
if unattended:
selected_mode = "new_version"
else:
selected_mode = "new_version"
requires_user_choice = True
if selected_mode == "new_version":
versions = _same_day_versions_in_canonical(
reports, reports_dir, run_date, ticker_upper
)
next_version = max(versions or [1]) + 1
selected_path = os.path.join(
reports_dir, f"{run_date}-{ticker_upper}-analysis-v{next_version}.md"
)
return {
"ticker": ticker_upper,
"workdir": root,
"reports_dir": reports_dir,
"base_output_file": base_path,
"selected_output_file": selected_path,
"selected_versioning_mode": selected_mode,
"requires_user_choice": requires_user_choice,
"history_files": history_files,
"legacy_files": legacy_files,
"history_limit": history_limit,
"security_scope": "working_directory_only",
}
def migrate_files(workdir: str, files: List[str]) -> Dict[str, object]:
root = os.path.abspath(workdir)
reports_dir = canonical_reports_dir(root)
os.makedirs(reports_dir, exist_ok=True)
moved = []
skipped = []
for raw_path in files:
src = os.path.abspath(raw_path)
if not is_within_workdir(src, root):
skipped.append({"file": src, "reason": "outside_workdir"})
continue
if not os.path.isfile(src):
skipped.append({"file": src, "reason": "not_file"})
continue
if os.path.islink(src):
skipped.append({"file": src, "reason": "symlink_not_allowed"})
continue
if not FILENAME_RE.match(os.path.basename(src)):
skipped.append({"file": src, "reason": "filename_not_supported"})
continue
dst = os.path.join(reports_dir, os.path.basename(src))
if os.path.abspath(src) == os.path.abspath(dst):
skipped.append({"file": src, "reason": "already_in_reports_dir"})
continue
if os.path.exists(dst):
# Keep migration deterministic and non-destructive.
skipped.append({"file": src, "reason": "target_exists"})
continue
try:
shutil.move(src, dst)
except OSError as exc:
skipped.append({"file": src, "reason": f"move_failed:{exc}"})
continue
moved.append({"from": src, "to": dst})
return {
"reports_dir": reports_dir,
"moved": moved,
"skipped": skipped,
"security_scope": "working_directory_only",
}
def _parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Manage report paths and migrations.")
subparsers = parser.add_subparsers(dest="command", required=True)
plan_parser = subparsers.add_parser("plan", help="Plan output path and history usage.")
plan_parser.add_argument("--workdir", default=os.getcwd())
plan_parser.add_argument("--ticker", required=True)
plan_parser.add_argument("--run-date", default=date.today().isoformat())
plan_parser.add_argument(
"--versioning",
choices=["auto", "overwrite", "new_version"],
default="auto",
)
plan_parser.add_argument("--unattended", action="store_true")
plan_parser.add_argument("--history-limit", type=int, default=5)
migrate_parser = subparsers.add_parser(
"migrate", help="Move legacy report files into canonical reports directory."
)
migrate_parser.add_argument("--workdir", default=os.getcwd())
migrate_parser.add_argument("--file", action="append", required=True)
return parser.parse_args()
def main() -> int:
args = _parse_args()
if args.command == "plan":
result = plan_output(
workdir=args.workdir,
ticker=args.ticker,
run_date=args.run_date,
versioning=args.versioning,
unattended=args.unattended,
history_limit=max(args.history_limit, 1),
)
else:
result = migrate_files(workdir=args.workdir, files=args.file)
print(json.dumps(result, indent=2, ensure_ascii=True))
return 0
if __name__ == "__main__":
raise SystemExit(main())