190 lines
5.9 KiB
Python
190 lines
5.9 KiB
Python
#!/usr/bin/env python3
|
|
"""Deterministic report path and migration manager for daily-stock-analysis."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import shutil
|
|
from datetime import date
|
|
from typing import Dict, List
|
|
|
|
from _report_utils import (
|
|
FILENAME_RE,
|
|
canonical_reports_dir,
|
|
discover_reports,
|
|
is_within_workdir,
|
|
)
|
|
|
|
|
|
def _same_day_versions_in_canonical(
|
|
reports: List, reports_dir: str, run_date: str, ticker_upper: str
|
|
) -> List[int]:
|
|
versions = []
|
|
for report in reports:
|
|
if report.run_date != run_date:
|
|
continue
|
|
if report.ticker != ticker_upper:
|
|
continue
|
|
if os.path.dirname(report.path) != reports_dir:
|
|
continue
|
|
versions.append(report.version)
|
|
return versions
|
|
|
|
|
|
def plan_output(
|
|
workdir: str,
|
|
ticker: str,
|
|
run_date: str,
|
|
versioning: str,
|
|
unattended: bool,
|
|
history_limit: int,
|
|
) -> Dict[str, object]:
|
|
root = os.path.abspath(workdir)
|
|
ticker_upper = ticker.upper()
|
|
reports_dir = canonical_reports_dir(root)
|
|
os.makedirs(reports_dir, exist_ok=True)
|
|
|
|
reports = discover_reports(root, ticker_upper)
|
|
history_files = [r.path for r in reports[:history_limit]]
|
|
legacy_files = [r.path for r in reports if not r.in_canonical_dir]
|
|
|
|
base_name = f"{run_date}-{ticker_upper}-analysis.md"
|
|
base_path = os.path.join(reports_dir, base_name)
|
|
base_exists = os.path.exists(base_path)
|
|
|
|
requires_user_choice = False
|
|
selected_mode = "new_file"
|
|
selected_path = base_path
|
|
|
|
if base_exists:
|
|
if versioning == "overwrite":
|
|
selected_mode = "overwrite"
|
|
elif versioning == "new_version":
|
|
selected_mode = "new_version"
|
|
else:
|
|
if unattended:
|
|
selected_mode = "new_version"
|
|
else:
|
|
selected_mode = "new_version"
|
|
requires_user_choice = True
|
|
|
|
if selected_mode == "new_version":
|
|
versions = _same_day_versions_in_canonical(
|
|
reports, reports_dir, run_date, ticker_upper
|
|
)
|
|
next_version = max(versions or [1]) + 1
|
|
selected_path = os.path.join(
|
|
reports_dir, f"{run_date}-{ticker_upper}-analysis-v{next_version}.md"
|
|
)
|
|
|
|
return {
|
|
"ticker": ticker_upper,
|
|
"workdir": root,
|
|
"reports_dir": reports_dir,
|
|
"base_output_file": base_path,
|
|
"selected_output_file": selected_path,
|
|
"selected_versioning_mode": selected_mode,
|
|
"requires_user_choice": requires_user_choice,
|
|
"history_files": history_files,
|
|
"legacy_files": legacy_files,
|
|
"history_limit": history_limit,
|
|
"security_scope": "working_directory_only",
|
|
}
|
|
|
|
|
|
def migrate_files(workdir: str, files: List[str]) -> Dict[str, object]:
|
|
root = os.path.abspath(workdir)
|
|
reports_dir = canonical_reports_dir(root)
|
|
os.makedirs(reports_dir, exist_ok=True)
|
|
|
|
moved = []
|
|
skipped = []
|
|
|
|
for raw_path in files:
|
|
src = os.path.abspath(raw_path)
|
|
if not is_within_workdir(src, root):
|
|
skipped.append({"file": src, "reason": "outside_workdir"})
|
|
continue
|
|
if not os.path.isfile(src):
|
|
skipped.append({"file": src, "reason": "not_file"})
|
|
continue
|
|
if os.path.islink(src):
|
|
skipped.append({"file": src, "reason": "symlink_not_allowed"})
|
|
continue
|
|
if not FILENAME_RE.match(os.path.basename(src)):
|
|
skipped.append({"file": src, "reason": "filename_not_supported"})
|
|
continue
|
|
|
|
dst = os.path.join(reports_dir, os.path.basename(src))
|
|
if os.path.abspath(src) == os.path.abspath(dst):
|
|
skipped.append({"file": src, "reason": "already_in_reports_dir"})
|
|
continue
|
|
|
|
if os.path.exists(dst):
|
|
# Keep migration deterministic and non-destructive.
|
|
skipped.append({"file": src, "reason": "target_exists"})
|
|
continue
|
|
|
|
try:
|
|
shutil.move(src, dst)
|
|
except OSError as exc:
|
|
skipped.append({"file": src, "reason": f"move_failed:{exc}"})
|
|
continue
|
|
|
|
moved.append({"from": src, "to": dst})
|
|
|
|
return {
|
|
"reports_dir": reports_dir,
|
|
"moved": moved,
|
|
"skipped": skipped,
|
|
"security_scope": "working_directory_only",
|
|
}
|
|
|
|
|
|
def _parse_args() -> argparse.Namespace:
|
|
parser = argparse.ArgumentParser(description="Manage report paths and migrations.")
|
|
subparsers = parser.add_subparsers(dest="command", required=True)
|
|
|
|
plan_parser = subparsers.add_parser("plan", help="Plan output path and history usage.")
|
|
plan_parser.add_argument("--workdir", default=os.getcwd())
|
|
plan_parser.add_argument("--ticker", required=True)
|
|
plan_parser.add_argument("--run-date", default=date.today().isoformat())
|
|
plan_parser.add_argument(
|
|
"--versioning",
|
|
choices=["auto", "overwrite", "new_version"],
|
|
default="auto",
|
|
)
|
|
plan_parser.add_argument("--unattended", action="store_true")
|
|
plan_parser.add_argument("--history-limit", type=int, default=5)
|
|
|
|
migrate_parser = subparsers.add_parser(
|
|
"migrate", help="Move legacy report files into canonical reports directory."
|
|
)
|
|
migrate_parser.add_argument("--workdir", default=os.getcwd())
|
|
migrate_parser.add_argument("--file", action="append", required=True)
|
|
|
|
return parser.parse_args()
|
|
|
|
|
|
def main() -> int:
|
|
args = _parse_args()
|
|
if args.command == "plan":
|
|
result = plan_output(
|
|
workdir=args.workdir,
|
|
ticker=args.ticker,
|
|
run_date=args.run_date,
|
|
versioning=args.versioning,
|
|
unattended=args.unattended,
|
|
history_limit=max(args.history_limit, 1),
|
|
)
|
|
else:
|
|
result = migrate_files(workdir=args.workdir, files=args.file)
|
|
print(json.dumps(result, indent=2, ensure_ascii=True))
|
|
return 0
|
|
|
|
|
|
if __name__ == "__main__":
|
|
raise SystemExit(main())
|