openclaw-home-pc/workspace/skills/daily-stock-analysis/scripts/report_manager.py
2026-03-21 15:31:06 +08:00

190 lines
5.9 KiB
Python

#!/usr/bin/env python3
"""Deterministic report path and migration manager for daily-stock-analysis."""
from __future__ import annotations
import argparse
import json
import os
import shutil
from datetime import date
from typing import Dict, List
from _report_utils import (
FILENAME_RE,
canonical_reports_dir,
discover_reports,
is_within_workdir,
)
def _same_day_versions_in_canonical(
reports: List, reports_dir: str, run_date: str, ticker_upper: str
) -> List[int]:
versions = []
for report in reports:
if report.run_date != run_date:
continue
if report.ticker != ticker_upper:
continue
if os.path.dirname(report.path) != reports_dir:
continue
versions.append(report.version)
return versions
def plan_output(
    workdir: str,
    ticker: str,
    run_date: str,
    versioning: str,
    unattended: bool,
    history_limit: int,
) -> Dict[str, object]:
    """Decide where the next report for ``ticker`` should be written.

    The canonical reports directory under ``workdir`` is created if it does
    not exist yet. No report file is created, modified or removed here.

    Args:
        workdir: Working directory; resolved to an absolute path.
        ticker: Ticker symbol; upper-cased before use.
        run_date: Date string used as the file-name prefix.
        versioning: ``"overwrite"``, ``"new_version"`` or anything else
            (treated as automatic). Only consulted when the base file for
            ``run_date`` already exists.
        unattended: When true, the automatic mode never asks for a user
            choice.
        history_limit: Maximum number of discovered reports to list as
            history. Negative values are treated as ``0``.

    Returns:
        A JSON-serialisable dict describing the selected output file, the
        versioning mode, whether the user must confirm, and the history and
        legacy file lists.
    """
    root = os.path.abspath(workdir)
    ticker_upper = ticker.upper()
    reports_dir = canonical_reports_dir(root)
    os.makedirs(reports_dir, exist_ok=True)

    reports = discover_reports(root, ticker_upper)

    # A negative limit would turn the slice into "everything except the last
    # N entries" instead of "at most N entries", so clamp it at zero.
    effective_limit = max(history_limit, 0)
    history_files = [r.path for r in reports[:effective_limit]]
    legacy_files = [r.path for r in reports if not r.in_canonical_dir]

    base_path = os.path.join(reports_dir, f"{run_date}-{ticker_upper}-analysis.md")

    selected_mode = "new_file"
    selected_path = base_path
    requires_user_choice = False

    if os.path.exists(base_path):
        if versioning == "overwrite":
            selected_mode = "overwrite"
        else:
            selected_mode = "new_version"
            # Only the automatic mode in an attended run needs confirmation;
            # an explicit "new_version" request is honoured silently.
            requires_user_choice = versioning != "new_version" and not unattended

    if selected_mode == "new_version":
        versions = _same_day_versions_in_canonical(
            reports, reports_dir, run_date, ticker_upper
        )
        next_version = max(versions or [1]) + 1
        selected_path = os.path.join(
            reports_dir, f"{run_date}-{ticker_upper}-analysis-v{next_version}.md"
        )
        # Discovery may have missed a file that is already on disk; a
        # "new version" must never point at an existing report.
        while os.path.exists(selected_path):
            next_version += 1
            selected_path = os.path.join(
                reports_dir,
                f"{run_date}-{ticker_upper}-analysis-v{next_version}.md",
            )

    return {
        "ticker": ticker_upper,
        "workdir": root,
        "reports_dir": reports_dir,
        "base_output_file": base_path,
        "selected_output_file": selected_path,
        "selected_versioning_mode": selected_mode,
        "requires_user_choice": requires_user_choice,
        "history_files": history_files,
        "legacy_files": legacy_files,
        "history_limit": effective_limit,
        "security_scope": "working_directory_only",
    }
def migrate_files(workdir: str, files: List[str]) -> Dict[str, object]:
    """Move the given report files into the canonical reports directory.

    Each path is resolved to an absolute path and then checked in a fixed
    order; the first failing check decides the skip reason. Files that pass
    every check are moved with ``shutil.move``. An existing target is never
    replaced.

    Args:
        workdir: Working directory; resolved to an absolute path.
        files: Paths of the files to migrate.

    Returns:
        A dict with the reports directory, the list of ``{"from", "to"}``
        moves performed and the list of ``{"file", "reason"}`` skips.
    """
    root = os.path.abspath(workdir)
    reports_dir = canonical_reports_dir(root)
    os.makedirs(reports_dir, exist_ok=True)

    moved: List[Dict[str, str]] = []
    skipped: List[Dict[str, str]] = []

    for candidate in files:
        src = os.path.abspath(candidate)
        file_name = os.path.basename(src)
        dst = os.path.join(reports_dir, file_name)

        # An empty reason means the file was moved successfully.
        reason = ""
        if not is_within_workdir(src, root):
            reason = "outside_workdir"
        elif not os.path.isfile(src):
            reason = "not_file"
        elif os.path.islink(src):
            reason = "symlink_not_allowed"
        elif not FILENAME_RE.match(file_name):
            reason = "filename_not_supported"
        elif os.path.abspath(src) == os.path.abspath(dst):
            reason = "already_in_reports_dir"
        elif os.path.exists(dst):
            # Keep migration deterministic and non-destructive.
            reason = "target_exists"
        else:
            try:
                shutil.move(src, dst)
            except OSError as exc:
                reason = f"move_failed:{exc}"

        if reason:
            skipped.append({"file": src, "reason": reason})
        else:
            moved.append({"from": src, "to": dst})

    return {
        "reports_dir": reports_dir,
        "moved": moved,
        "skipped": skipped,
        "security_scope": "working_directory_only",
    }
def _parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Manage report paths and migrations.")
subparsers = parser.add_subparsers(dest="command", required=True)
plan_parser = subparsers.add_parser("plan", help="Plan output path and history usage.")
plan_parser.add_argument("--workdir", default=os.getcwd())
plan_parser.add_argument("--ticker", required=True)
plan_parser.add_argument("--run-date", default=date.today().isoformat())
plan_parser.add_argument(
"--versioning",
choices=["auto", "overwrite", "new_version"],
default="auto",
)
plan_parser.add_argument("--unattended", action="store_true")
plan_parser.add_argument("--history-limit", type=int, default=5)
migrate_parser = subparsers.add_parser(
"migrate", help="Move legacy report files into canonical reports directory."
)
migrate_parser.add_argument("--workdir", default=os.getcwd())
migrate_parser.add_argument("--file", action="append", required=True)
return parser.parse_args()
def main() -> int:
    """Run the selected sub-command and print its result as JSON.

    Returns:
        ``0`` after the result has been printed.
    """
    args = _parse_args()

    if args.command != "plan":
        # "migrate" is the only other sub-command the parser accepts.
        result = migrate_files(workdir=args.workdir, files=args.file)
    else:
        # The CLI always requests at least one history entry.
        limit = max(args.history_limit, 1)
        result = plan_output(
            workdir=args.workdir,
            ticker=args.ticker,
            run_date=args.run_date,
            versioning=args.versioning,
            unattended=args.unattended,
            history_limit=limit,
        )

    rendered = json.dumps(result, indent=2, ensure_ascii=True)
    print(rendered)
    return 0
# Script entry point: the return value of main() becomes the exit status.
if __name__ == "__main__":
    exit_status = main()
    raise SystemExit(exit_status)