aboutsummaryrefslogtreecommitdiff
path: root/tools/init_testdata.py
blob: 4079cdb3c42ca6637f2ae762219aed8007794cea (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
#!/usr/bin/env python3
"""
Initialize test data folders by archive signatures.

The script scans all files in --input and copies matching archives into:
  --output/nres/<relative path>
  --output/rsli/<relative path>
"""

from __future__ import annotations

import argparse
import shutil
import sys
from pathlib import Path

# Four-byte signatures compared against the first bytes of each scanned file.
MAGIC_NRES = b"NRes"  # files starting with this are reported as type "nres"
MAGIC_RSLI = b"NL\x00\x01"  # files starting with this are reported as type "rsli"


def is_relative_to(path: Path, base: Path) -> bool:
    """Report whether *path* is *base* itself or is located beneath it.

    The check is delegated to ``Path.relative_to``, which compares path
    components lexically; pass resolved paths when symlinks or ``..`` matter.
    """
    inside = True
    try:
        path.relative_to(base)
    except ValueError:
        # relative_to() signals "not a sub-path" by raising ValueError.
        inside = False
    return inside


def detect_archive_type(path: Path) -> str | None:
    try:
        with path.open("rb") as handle:
            magic = handle.read(4)
    except OSError as exc:
        print(f"[warn] cannot read {path}: {exc}", file=sys.stderr)
        return None

    if magic == MAGIC_NRES:
        return "nres"
    if magic == MAGIC_RSLI:
        return "rsli"
    return None


def scan_archives(input_root: Path, excluded_root: Path | None) -> list[tuple[Path, str]]:
    found: list[tuple[Path, str]] = []
    for path in sorted(input_root.rglob("*")):
        if not path.is_file():
            continue
        if excluded_root and is_relative_to(path.resolve(), excluded_root):
            continue

        archive_type = detect_archive_type(path)
        if archive_type:
            found.append((path, archive_type))
    return found


def confirm_overwrite(path: Path) -> str:
    """Ask the user whether the existing file at *path* may be overwritten.

    The question is repeated until a recognised answer is typed. Returns one
    of ``"yes"``, ``"no"``, ``"all"`` or ``"quit"``; an empty answer counts as
    ``"no"`` and end-of-input on stdin counts as ``"quit"``.
    """
    decisions = {
        "": "no",
        "n": "no",
        "no": "no",
        "y": "yes",
        "yes": "yes",
        "a": "all",
        "all": "all",
        "q": "quit",
        "quit": "quit",
    }
    question = (
        f"File exists: {path}\n"
        "Overwrite? [y]es / [n]o / [a]ll / [q]uit (default: n): "
    )
    while True:
        try:
            reply = input(question)
        except EOFError:
            return "quit"

        # Answers are matched case-insensitively, ignoring surrounding blanks.
        decision = decisions.get(reply.strip().lower())
        if decision is not None:
            return decision
        print("Please answer with y, n, a, or q.")


def copy_archives(
    archives: list[tuple[Path, str]],
    input_root: Path,
    output_root: Path,
    force: bool,
) -> int:
    """Copy detected archives into ``output_root/<type>/<relative path>``.

    Args:
        archives: ``(source, archive_type)`` pairs. Each source must lie under
            ``input_root`` and each type must be ``"nres"`` or ``"rsli"``.
        input_root: Directory the source paths are made relative to.
        output_root: Root under which the per-type subdirectories are created.
        force: Overwrite existing destination files without prompting.

    Returns:
        ``0`` on success; ``2`` on a filesystem error, when a destination is a
        directory, or when a prompt is required but stdin is not interactive;
        ``130`` when the user chooses to quit at an overwrite prompt.
    """
    copied = 0
    skipped = 0
    overwritten = 0
    overwrite_all = force

    type_counts = {"nres": 0, "rsli": 0}
    for _, archive_type in archives:
        type_counts[archive_type] += 1

    print(
        f"Found archives: total={len(archives)}, "
        f"nres={type_counts['nres']}, rsli={type_counts['rsli']}"
    )

    for source, archive_type in archives:
        rel_path = source.relative_to(input_root)
        destination = output_root / archive_type / rel_path

        # Creating the parent can fail (e.g. a path component already exists
        # as a regular file, or permission is denied); report it the same way
        # as a failed copy instead of crashing with a traceback.
        try:
            destination.parent.mkdir(parents=True, exist_ok=True)
        except OSError as exc:
            print(
                f"[error] failed to create directory {destination.parent}: {exc}",
                file=sys.stderr,
            )
            return 2

        if destination.exists():
            if destination.is_dir():
                print(
                    f"[error] destination is a directory, expected file: {destination}",
                    file=sys.stderr,
                )
                return 2

            if not overwrite_all:
                # Prompting is only possible on an interactive terminal.
                if not sys.stdin.isatty():
                    print(
                        "[error] destination file exists but stdin is not interactive. "
                        "Use --force to overwrite without prompts.",
                        file=sys.stderr,
                    )
                    return 2

                decision = confirm_overwrite(destination)
                if decision == "quit":
                    print("Aborted by user.")
                    return 130
                if decision == "no":
                    skipped += 1
                    continue
                if decision == "all":
                    # Stop asking for the remainder of this run.
                    overwrite_all = True

            overwritten += 1

        try:
            shutil.copy2(source, destination)
        except OSError as exc:
            print(f"[error] failed to copy {source} -> {destination}: {exc}", file=sys.stderr)
            return 2
        # Note: "copied" counts every written file, including overwritten ones.
        copied += 1

    print(
        f"Done: copied={copied}, overwritten={overwritten}, skipped={skipped}, "
        f"output={output_root}"
    )
    return 0


def build_parser() -> argparse.ArgumentParser:
    """Create the command-line parser for this script.

    The parser accepts two mandatory options, ``--input`` and ``--output``,
    plus the optional boolean switch ``--force``.
    """
    cli = argparse.ArgumentParser(
        description="Initialize test data by scanning NRes/RsLi signatures."
    )

    # Both directory options are mandatory and differ only in flag and help.
    directory_options = (
        ("--input", "Input directory to scan recursively."),
        ("--output", "Output root directory (archives go to nres/ and rsli/ subdirs)."),
    )
    for flag, description in directory_options:
        cli.add_argument(flag, required=True, help=description)

    cli.add_argument(
        "--force",
        help="Overwrite destination files without confirmation prompts.",
        action="store_true",
    )
    return cli


def main() -> int:
    """Validate the command-line paths, scan the input tree, and copy archives.

    Returns:
        The process exit code: ``2`` when the input/output paths are invalid
        or the output directory cannot be created, otherwise whatever
        ``copy_archives`` returns.
    """
    args = build_parser().parse_args()

    input_root = Path(args.input)
    if not input_root.exists():
        print(f"[error] input directory does not exist: {input_root}", file=sys.stderr)
        return 2
    if not input_root.is_dir():
        print(f"[error] input path is not a directory: {input_root}", file=sys.stderr)
        return 2

    output_root = Path(args.output)
    if output_root.exists() and not output_root.is_dir():
        print(f"[error] output path exists and is not a directory: {output_root}", file=sys.stderr)
        return 2

    input_resolved = input_root.resolve()
    output_resolved = output_root.resolve()
    if input_resolved == output_resolved:
        print("[error] input and output directories must be different.", file=sys.stderr)
        return 2

    # When the output lives inside the input tree, previously copied files
    # must not be picked up again as sources.
    excluded_root: Path | None = None
    if is_relative_to(output_resolved, input_resolved):
        excluded_root = output_resolved
        print(f"Notice: output is inside input, skipping scan under: {excluded_root}")

    archives = scan_archives(input_root, excluded_root)

    # Directory creation can still fail (permissions, a parent that is a
    # regular file); report it like every other path error instead of letting
    # the exception escape as a traceback.
    try:
        output_root.mkdir(parents=True, exist_ok=True)
    except OSError as exc:
        print(f"[error] cannot create output directory {output_root}: {exc}", file=sys.stderr)
        return 2

    return copy_archives(archives, input_root, output_root, force=args.force)


# Run the tool only when executed as a script; main()'s return value becomes
# the process exit status.
if __name__ == "__main__":
    sys.exit(main())