aboutsummaryrefslogtreecommitdiff
path: root/tools/archive_roundtrip_validator.py
diff options
context:
space:
mode:
Diffstat (limited to 'tools/archive_roundtrip_validator.py')
-rw-r--r--tools/archive_roundtrip_validator.py944
1 files changed, 0 insertions, 944 deletions
diff --git a/tools/archive_roundtrip_validator.py b/tools/archive_roundtrip_validator.py
deleted file mode 100644
index 073fd9b..0000000
--- a/tools/archive_roundtrip_validator.py
+++ /dev/null
@@ -1,944 +0,0 @@
-#!/usr/bin/env python3
-"""
-Roundtrip tools for NRes and RsLi archives.
-
-The script can:
-1) scan archives by header signature (ignores file extensions),
-2) unpack / pack NRes archives,
-3) unpack / pack RsLi archives,
-4) validate docs assumptions by full roundtrip and byte-to-byte comparison.
-"""
-
-from __future__ import annotations
-
-import argparse
-import hashlib
-import json
-import re
-import shutil
-import struct
-import tempfile
-import zlib
-from pathlib import Path
-from typing import Any
-
-MAGIC_NRES = b"NRes"
-MAGIC_RSLI = b"NL\x00\x01"
-
-
-class ArchiveFormatError(RuntimeError):
- pass
-
-
-def sha256_hex(data: bytes) -> str:
- return hashlib.sha256(data).hexdigest()
-
-
-def safe_component(value: str, fallback: str = "item", max_len: int = 80) -> str:
- clean = re.sub(r"[^A-Za-z0-9._-]+", "_", value).strip("._-")
- if not clean:
- clean = fallback
- return clean[:max_len]
-
-
-def first_diff(a: bytes, b: bytes) -> tuple[int | None, str | None]:
- if a == b:
- return None, None
- limit = min(len(a), len(b))
- for idx in range(limit):
- if a[idx] != b[idx]:
- return idx, f"{a[idx]:02x}!={b[idx]:02x}"
- return limit, f"len {len(a)}!={len(b)}"
-
-
-def load_json(path: Path) -> dict[str, Any]:
- with path.open("r", encoding="utf-8") as handle:
- return json.load(handle)
-
-
-def dump_json(path: Path, payload: dict[str, Any]) -> None:
- path.parent.mkdir(parents=True, exist_ok=True)
- with path.open("w", encoding="utf-8") as handle:
- json.dump(payload, handle, indent=2, ensure_ascii=False)
- handle.write("\n")
-
-
-def xor_stream(data: bytes, key16: int) -> bytes:
- lo = key16 & 0xFF
- hi = (key16 >> 8) & 0xFF
- out = bytearray(len(data))
- for i, value in enumerate(data):
- lo = (hi ^ ((lo << 1) & 0xFF)) & 0xFF
- out[i] = value ^ lo
- hi = (lo ^ ((hi >> 1) & 0xFF)) & 0xFF
- return bytes(out)
-
-
-def lzss_decompress_simple(data: bytes, expected_size: int) -> bytes:
- ring = bytearray([0x20] * 0x1000)
- ring_pos = 0xFEE
- out = bytearray()
- in_pos = 0
- control = 0
- bits_left = 0
-
- while len(out) < expected_size and in_pos < len(data):
- if bits_left == 0:
- control = data[in_pos]
- in_pos += 1
- bits_left = 8
-
- if control & 1:
- if in_pos >= len(data):
- break
- byte = data[in_pos]
- in_pos += 1
- out.append(byte)
- ring[ring_pos] = byte
- ring_pos = (ring_pos + 1) & 0x0FFF
- else:
- if in_pos + 1 >= len(data):
- break
- low = data[in_pos]
- high = data[in_pos + 1]
- in_pos += 2
- # Real files indicate nibble layout opposite to common LZSS variant:
- # high nibble extends offset, low nibble stores (length - 3).
- offset = low | ((high & 0xF0) << 4)
- length = (high & 0x0F) + 3
- for step in range(length):
- byte = ring[(offset + step) & 0x0FFF]
- out.append(byte)
- ring[ring_pos] = byte
- ring_pos = (ring_pos + 1) & 0x0FFF
- if len(out) >= expected_size:
- break
-
- control >>= 1
- bits_left -= 1
-
- if len(out) != expected_size:
- raise ArchiveFormatError(
- f"LZSS size mismatch: expected {expected_size}, got {len(out)}"
- )
- return bytes(out)
-
-
-def decode_rsli_payload(
- packed: bytes, method: int, sort_to_original: int, unpacked_size: int
-) -> bytes:
- key16 = sort_to_original & 0xFFFF
-
- if method == 0x000:
- out = packed
- elif method == 0x020:
- if len(packed) < unpacked_size:
- raise ArchiveFormatError(
- f"method 0x20 packed too short: {len(packed)} < {unpacked_size}"
- )
- out = xor_stream(packed[:unpacked_size], key16)
- elif method == 0x040:
- out = lzss_decompress_simple(packed, unpacked_size)
- elif method == 0x060:
- out = lzss_decompress_simple(xor_stream(packed, key16), unpacked_size)
- elif method == 0x100:
- try:
- out = zlib.decompress(packed, -15)
- except zlib.error:
- out = zlib.decompress(packed)
- else:
- raise ArchiveFormatError(f"unsupported RsLi method: 0x{method:03X}")
-
- if len(out) != unpacked_size:
- raise ArchiveFormatError(
- f"unpacked_size mismatch: expected {unpacked_size}, got {len(out)}"
- )
- return out
-
-
-def detect_archive_type(path: Path) -> str | None:
- try:
- with path.open("rb") as handle:
- magic = handle.read(4)
- except OSError:
- return None
-
- if magic == MAGIC_NRES:
- return "nres"
- if magic == MAGIC_RSLI:
- return "rsli"
- return None
-
-
-def scan_archives(root: Path) -> list[dict[str, Any]]:
- found: list[dict[str, Any]] = []
- for path in sorted(root.rglob("*")):
- if not path.is_file():
- continue
- archive_type = detect_archive_type(path)
- if not archive_type:
- continue
- found.append(
- {
- "path": str(path),
- "relative_path": str(path.relative_to(root)),
- "type": archive_type,
- "size": path.stat().st_size,
- }
- )
- return found
-
-
-def parse_nres(data: bytes, source: str = "<memory>") -> dict[str, Any]:
- if len(data) < 16:
- raise ArchiveFormatError(f"{source}: NRes too short ({len(data)} bytes)")
-
- magic, version, entry_count, total_size = struct.unpack_from("<4sIII", data, 0)
- if magic != MAGIC_NRES:
- raise ArchiveFormatError(f"{source}: invalid NRes magic")
-
- issues: list[str] = []
- if total_size != len(data):
- issues.append(
- f"header.total_size={total_size} != actual_size={len(data)} (spec 1.2)"
- )
- if version != 0x100:
- issues.append(f"version=0x{version:08X} != 0x00000100 (spec 1.2)")
-
- directory_offset = total_size - entry_count * 64
- if directory_offset < 16 or directory_offset > len(data):
- raise ArchiveFormatError(
- f"{source}: invalid directory offset {directory_offset} for entry_count={entry_count}"
- )
- if directory_offset + entry_count * 64 != len(data):
- issues.append(
- "directory_offset + entry_count*64 != file_size (spec 1.3)"
- )
-
- entries: list[dict[str, Any]] = []
- for index in range(entry_count):
- offset = directory_offset + index * 64
- if offset + 64 > len(data):
- raise ArchiveFormatError(f"{source}: truncated directory entry {index}")
-
- (
- type_id,
- attr1,
- attr2,
- size,
- attr3,
- name_raw,
- data_offset,
- sort_index,
- ) = struct.unpack_from("<IIIII36sII", data, offset)
- name_bytes = name_raw.split(b"\x00", 1)[0]
- name = name_bytes.decode("latin1", errors="replace")
- entries.append(
- {
- "index": index,
- "type_id": type_id,
- "attr1": attr1,
- "attr2": attr2,
- "size": size,
- "attr3": attr3,
- "name": name,
- "name_bytes_hex": name_bytes.hex(),
- "name_raw_hex": name_raw.hex(),
- "data_offset": data_offset,
- "sort_index": sort_index,
- }
- )
-
- # Spec checks.
- expected_sort = sorted(
- range(entry_count),
- key=lambda idx: bytes.fromhex(entries[idx]["name_bytes_hex"]).lower(),
- )
- current_sort = [item["sort_index"] for item in entries]
- if current_sort != expected_sort:
- issues.append(
- "sort_index table does not match case-insensitive name order (spec 1.4)"
- )
-
- data_regions = sorted(
- (
- item["index"],
- item["data_offset"],
- item["size"],
- )
- for item in entries
- )
- for idx, data_offset, size in data_regions:
- if data_offset % 8 != 0:
- issues.append(f"entry {idx}: data_offset={data_offset} not aligned to 8 (spec 1.5)")
- if data_offset < 16 or data_offset + size > directory_offset:
- issues.append(
- f"entry {idx}: data range [{data_offset}, {data_offset + size}) out of data area (spec 1.3)"
- )
- for i in range(len(data_regions) - 1):
- _, start, size = data_regions[i]
- _, next_start, _ = data_regions[i + 1]
- if start + size > next_start:
- issues.append(
- f"entry overlap at data_offset={start}, next={next_start}"
- )
- padding = data[start + size : next_start]
- if any(padding):
- issues.append(
- f"non-zero padding after data block at offset={start + size} (spec 1.5)"
- )
-
- return {
- "format": "NRes",
- "header": {
- "magic": "NRes",
- "version": version,
- "entry_count": entry_count,
- "total_size": total_size,
- "directory_offset": directory_offset,
- },
- "entries": entries,
- "issues": issues,
- }
-
-
-def build_nres_name_field(entry: dict[str, Any]) -> bytes:
- if "name_bytes_hex" in entry:
- raw = bytes.fromhex(entry["name_bytes_hex"])
- else:
- raw = entry.get("name", "").encode("latin1", errors="replace")
- raw = raw[:35]
- return raw + b"\x00" * (36 - len(raw))
-
-
-def unpack_nres_file(archive_path: Path, out_dir: Path, source_root: Path | None = None) -> dict[str, Any]:
- data = archive_path.read_bytes()
- parsed = parse_nres(data, source=str(archive_path))
-
- out_dir.mkdir(parents=True, exist_ok=True)
- entries_dir = out_dir / "entries"
- entries_dir.mkdir(parents=True, exist_ok=True)
-
- manifest: dict[str, Any] = {
- "format": "NRes",
- "source_path": str(archive_path),
- "source_relative_path": str(archive_path.relative_to(source_root)) if source_root else str(archive_path),
- "header": parsed["header"],
- "entries": [],
- "issues": parsed["issues"],
- "source_sha256": sha256_hex(data),
- }
-
- for entry in parsed["entries"]:
- begin = entry["data_offset"]
- end = begin + entry["size"]
- if begin < 0 or end > len(data):
- raise ArchiveFormatError(
- f"{archive_path}: entry {entry['index']} data range outside file"
- )
- payload = data[begin:end]
- base = safe_component(entry["name"], fallback=f"entry_{entry['index']:05d}")
- file_name = (
- f"{entry['index']:05d}__{base}"
- f"__t{entry['type_id']:08X}_a1{entry['attr1']:08X}_a2{entry['attr2']:08X}.bin"
- )
- (entries_dir / file_name).write_bytes(payload)
-
- manifest_entry = dict(entry)
- manifest_entry["data_file"] = f"entries/{file_name}"
- manifest_entry["sha256"] = sha256_hex(payload)
- manifest["entries"].append(manifest_entry)
-
- dump_json(out_dir / "manifest.json", manifest)
- return manifest
-
-
-def pack_nres_manifest(manifest_path: Path, out_file: Path) -> bytes:
- manifest = load_json(manifest_path)
- if manifest.get("format") != "NRes":
- raise ArchiveFormatError(f"{manifest_path}: not an NRes manifest")
-
- entries = manifest["entries"]
- count = len(entries)
- version = int(manifest.get("header", {}).get("version", 0x100))
-
- out = bytearray(b"\x00" * 16)
- data_offsets: list[int] = []
- data_sizes: list[int] = []
-
- for entry in entries:
- payload_path = manifest_path.parent / entry["data_file"]
- payload = payload_path.read_bytes()
- offset = len(out)
- out.extend(payload)
- padding = (-len(out)) % 8
- if padding:
- out.extend(b"\x00" * padding)
- data_offsets.append(offset)
- data_sizes.append(len(payload))
-
- directory_offset = len(out)
- expected_sort = sorted(
- range(count),
- key=lambda idx: bytes.fromhex(entries[idx].get("name_bytes_hex", "")).lower(),
- )
-
- for index, entry in enumerate(entries):
- name_field = build_nres_name_field(entry)
- out.extend(
- struct.pack(
- "<IIIII36sII",
- int(entry["type_id"]),
- int(entry["attr1"]),
- int(entry["attr2"]),
- data_sizes[index],
- int(entry["attr3"]),
- name_field,
- data_offsets[index],
- expected_sort[index],
- )
- )
-
- total_size = len(out)
- struct.pack_into("<4sIII", out, 0, MAGIC_NRES, version, count, total_size)
-
- out_file.parent.mkdir(parents=True, exist_ok=True)
- out_file.write_bytes(out)
- return bytes(out)
-
-
-def parse_rsli(data: bytes, source: str = "<memory>") -> dict[str, Any]:
- if len(data) < 32:
- raise ArchiveFormatError(f"{source}: RsLi too short ({len(data)} bytes)")
- if data[:4] != MAGIC_RSLI:
- raise ArchiveFormatError(f"{source}: invalid RsLi magic")
-
- issues: list[str] = []
- reserved_zero = data[2]
- version = data[3]
- entry_count = struct.unpack_from("<h", data, 4)[0]
- presorted_flag = struct.unpack_from("<H", data, 14)[0]
- seed = struct.unpack_from("<I", data, 20)[0]
-
- if reserved_zero != 0:
- issues.append(f"header[2]={reserved_zero} != 0 (spec 2.2)")
- if version != 1:
- issues.append(f"version={version} != 1 (spec 2.2)")
- if entry_count < 0:
- raise ArchiveFormatError(f"{source}: negative entry_count={entry_count}")
-
- table_offset = 32
- table_size = entry_count * 32
- if table_offset + table_size > len(data):
- raise ArchiveFormatError(
- f"{source}: encrypted table out of file bounds ({table_offset}+{table_size}>{len(data)})"
- )
-
- table_encrypted = data[table_offset : table_offset + table_size]
- table_plain = xor_stream(table_encrypted, seed & 0xFFFF)
-
- trailer: dict[str, Any] = {"present": False}
- overlay_offset = 0
- if len(data) >= 6 and data[-6:-4] == b"AO":
- overlay_offset = struct.unpack_from("<I", data, len(data) - 4)[0]
- trailer = {
- "present": True,
- "signature": "AO",
- "overlay_offset": overlay_offset,
- "raw_hex": data[-6:].hex(),
- }
-
- entries: list[dict[str, Any]] = []
- sort_values: list[int] = []
- for index in range(entry_count):
- row = table_plain[index * 32 : (index + 1) * 32]
- name_raw = row[0:12]
- reserved4 = row[12:16]
- flags_signed, sort_to_original = struct.unpack_from("<hh", row, 16)
- unpacked_size, data_offset, packed_size = struct.unpack_from("<III", row, 20)
- method = flags_signed & 0x1E0
- name = name_raw.split(b"\x00", 1)[0].decode("latin1", errors="replace")
- effective_offset = data_offset + overlay_offset
- entries.append(
- {
- "index": index,
- "name": name,
- "name_raw_hex": name_raw.hex(),
- "reserved_raw_hex": reserved4.hex(),
- "flags_signed": flags_signed,
- "flags_u16": flags_signed & 0xFFFF,
- "method": method,
- "sort_to_original": sort_to_original,
- "unpacked_size": unpacked_size,
- "data_offset": data_offset,
- "effective_data_offset": effective_offset,
- "packed_size": packed_size,
- }
- )
- sort_values.append(sort_to_original)
-
- if effective_offset < 0:
- issues.append(f"entry {index}: negative effective_data_offset={effective_offset}")
- elif effective_offset + packed_size > len(data):
- end = effective_offset + packed_size
- if method == 0x100 and end == len(data) + 1:
- issues.append(
- f"entry {index}: deflate packed_size reaches EOF+1 ({end}); "
- "observed in game data, likely decoder lookahead byte"
- )
- else:
- issues.append(
- f"entry {index}: packed range [{effective_offset}, {end}) out of file"
- )
-
- if presorted_flag == 0xABBA:
- if sorted(sort_values) != list(range(entry_count)):
- issues.append(
- "presorted flag is 0xABBA but sort_to_original is not a permutation [0..N-1] (spec 2.2/2.4)"
- )
-
- return {
- "format": "RsLi",
- "header_raw_hex": data[:32].hex(),
- "header": {
- "magic": "NL\\x00\\x01",
- "entry_count": entry_count,
- "seed": seed,
- "presorted_flag": presorted_flag,
- },
- "entries": entries,
- "issues": issues,
- "trailer": trailer,
- }
-
-
-def unpack_rsli_file(archive_path: Path, out_dir: Path, source_root: Path | None = None) -> dict[str, Any]:
- data = archive_path.read_bytes()
- parsed = parse_rsli(data, source=str(archive_path))
-
- out_dir.mkdir(parents=True, exist_ok=True)
- entries_dir = out_dir / "entries"
- entries_dir.mkdir(parents=True, exist_ok=True)
-
- manifest: dict[str, Any] = {
- "format": "RsLi",
- "source_path": str(archive_path),
- "source_relative_path": str(archive_path.relative_to(source_root)) if source_root else str(archive_path),
- "source_size": len(data),
- "header_raw_hex": parsed["header_raw_hex"],
- "header": parsed["header"],
- "entries": [],
- "issues": list(parsed["issues"]),
- "trailer": parsed["trailer"],
- "source_sha256": sha256_hex(data),
- }
-
- for entry in parsed["entries"]:
- begin = int(entry["effective_data_offset"])
- end = begin + int(entry["packed_size"])
- packed = data[begin:end]
- base = safe_component(entry["name"], fallback=f"entry_{entry['index']:05d}")
- packed_name = f"{entry['index']:05d}__{base}__packed.bin"
- (entries_dir / packed_name).write_bytes(packed)
-
- manifest_entry = dict(entry)
- manifest_entry["packed_file"] = f"entries/{packed_name}"
- manifest_entry["packed_file_size"] = len(packed)
- manifest_entry["packed_sha256"] = sha256_hex(packed)
-
- try:
- unpacked = decode_rsli_payload(
- packed=packed,
- method=int(entry["method"]),
- sort_to_original=int(entry["sort_to_original"]),
- unpacked_size=int(entry["unpacked_size"]),
- )
- unpacked_name = f"{entry['index']:05d}__{base}__unpacked.bin"
- (entries_dir / unpacked_name).write_bytes(unpacked)
- manifest_entry["unpacked_file"] = f"entries/{unpacked_name}"
- manifest_entry["unpacked_sha256"] = sha256_hex(unpacked)
- except ArchiveFormatError as exc:
- manifest_entry["unpack_error"] = str(exc)
- manifest["issues"].append(
- f"entry {entry['index']}: cannot decode method 0x{entry['method']:03X}: {exc}"
- )
-
- manifest["entries"].append(manifest_entry)
-
- dump_json(out_dir / "manifest.json", manifest)
- return manifest
-
-
-def _pack_i16(value: int) -> int:
- if not (-32768 <= int(value) <= 32767):
- raise ArchiveFormatError(f"int16 overflow: {value}")
- return int(value)
-
-
-def pack_rsli_manifest(manifest_path: Path, out_file: Path) -> bytes:
- manifest = load_json(manifest_path)
- if manifest.get("format") != "RsLi":
- raise ArchiveFormatError(f"{manifest_path}: not an RsLi manifest")
-
- entries = manifest["entries"]
- count = len(entries)
-
- header_raw = bytes.fromhex(manifest["header_raw_hex"])
- if len(header_raw) != 32:
- raise ArchiveFormatError(f"{manifest_path}: header_raw_hex must be 32 bytes")
- header = bytearray(header_raw)
- header[:4] = MAGIC_RSLI
- struct.pack_into("<h", header, 4, count)
- seed = int(manifest["header"]["seed"])
- struct.pack_into("<I", header, 20, seed)
-
- rows = bytearray()
- packed_chunks: list[tuple[dict[str, Any], bytes]] = []
-
- for entry in entries:
- packed_path = manifest_path.parent / entry["packed_file"]
- packed = packed_path.read_bytes()
- declared_size = int(entry["packed_size"])
- if len(packed) > declared_size:
- raise ArchiveFormatError(
- f"{packed_path}: packed size {len(packed)} > manifest packed_size {declared_size}"
- )
-
- data_offset = int(entry["data_offset"])
- packed_chunks.append((entry, packed))
-
- row = bytearray(32)
- name_raw = bytes.fromhex(entry["name_raw_hex"])
- reserved_raw = bytes.fromhex(entry["reserved_raw_hex"])
- if len(name_raw) != 12 or len(reserved_raw) != 4:
- raise ArchiveFormatError(
- f"entry {entry['index']}: invalid name/reserved raw length"
- )
- row[0:12] = name_raw
- row[12:16] = reserved_raw
- struct.pack_into(
- "<hhIII",
- row,
- 16,
- _pack_i16(int(entry["flags_signed"])),
- _pack_i16(int(entry["sort_to_original"])),
- int(entry["unpacked_size"]),
- data_offset,
- declared_size,
- )
- rows.extend(row)
-
- encrypted_table = xor_stream(bytes(rows), seed & 0xFFFF)
- trailer = manifest.get("trailer", {})
- trailer_raw = b""
- if trailer.get("present"):
- raw_hex = trailer.get("raw_hex", "")
- trailer_raw = bytes.fromhex(raw_hex)
- if len(trailer_raw) != 6:
- raise ArchiveFormatError("trailer raw length must be 6 bytes")
-
- source_size = manifest.get("source_size")
- table_end = 32 + count * 32
- if source_size is not None:
- pre_trailer_size = int(source_size) - len(trailer_raw)
- if pre_trailer_size < table_end:
- raise ArchiveFormatError(
- f"invalid source_size={source_size}: smaller than header+table"
- )
- else:
- pre_trailer_size = table_end
- for entry, packed in packed_chunks:
- pre_trailer_size = max(
- pre_trailer_size, int(entry["data_offset"]) + len(packed)
- )
-
- out = bytearray(pre_trailer_size)
- out[0:32] = header
- out[32:table_end] = encrypted_table
- occupied = bytearray(pre_trailer_size)
- occupied[0:table_end] = b"\x01" * table_end
-
- for entry, packed in packed_chunks:
- base_offset = int(entry["data_offset"])
- for index, byte in enumerate(packed):
- pos = base_offset + index
- if pos >= pre_trailer_size:
- raise ArchiveFormatError(
- f"entry {entry['index']}: data write at {pos} beyond output size {pre_trailer_size}"
- )
- if occupied[pos] and out[pos] != byte:
- raise ArchiveFormatError(
- f"entry {entry['index']}: overlapping packed data conflict at offset {pos}"
- )
- out[pos] = byte
- occupied[pos] = 1
-
- out.extend(trailer_raw)
- if source_size is not None and len(out) != int(source_size):
- raise ArchiveFormatError(
- f"packed size {len(out)} != source_size {source_size} from manifest"
- )
-
- out_file.parent.mkdir(parents=True, exist_ok=True)
- out_file.write_bytes(out)
- return bytes(out)
-
-
-def cmd_scan(args: argparse.Namespace) -> int:
- root = Path(args.input).resolve()
- archives = scan_archives(root)
- if args.json:
- print(json.dumps(archives, ensure_ascii=False, indent=2))
- else:
- print(f"Found {len(archives)} archive(s) in {root}")
- for item in archives:
- print(f"{item['type']:4} {item['size']:10d} {item['relative_path']}")
- return 0
-
-
-def cmd_nres_unpack(args: argparse.Namespace) -> int:
- archive_path = Path(args.archive).resolve()
- out_dir = Path(args.output).resolve()
- manifest = unpack_nres_file(archive_path, out_dir)
- print(f"NRes unpacked: {archive_path}")
- print(f"Manifest: {out_dir / 'manifest.json'}")
- print(f"Entries : {len(manifest['entries'])}")
- if manifest["issues"]:
- print("Issues:")
- for issue in manifest["issues"]:
- print(f"- {issue}")
- return 0
-
-
-def cmd_nres_pack(args: argparse.Namespace) -> int:
- manifest_path = Path(args.manifest).resolve()
- out_file = Path(args.output).resolve()
- packed = pack_nres_manifest(manifest_path, out_file)
- print(f"NRes packed: {out_file} ({len(packed)} bytes, sha256={sha256_hex(packed)})")
- return 0
-
-
-def cmd_rsli_unpack(args: argparse.Namespace) -> int:
- archive_path = Path(args.archive).resolve()
- out_dir = Path(args.output).resolve()
- manifest = unpack_rsli_file(archive_path, out_dir)
- print(f"RsLi unpacked: {archive_path}")
- print(f"Manifest: {out_dir / 'manifest.json'}")
- print(f"Entries : {len(manifest['entries'])}")
- if manifest["issues"]:
- print("Issues:")
- for issue in manifest["issues"]:
- print(f"- {issue}")
- return 0
-
-
-def cmd_rsli_pack(args: argparse.Namespace) -> int:
- manifest_path = Path(args.manifest).resolve()
- out_file = Path(args.output).resolve()
- packed = pack_rsli_manifest(manifest_path, out_file)
- print(f"RsLi packed: {out_file} ({len(packed)} bytes, sha256={sha256_hex(packed)})")
- return 0
-
-
-def cmd_validate(args: argparse.Namespace) -> int:
- input_root = Path(args.input).resolve()
- archives = scan_archives(input_root)
-
- temp_created = False
- if args.workdir:
- workdir = Path(args.workdir).resolve()
- workdir.mkdir(parents=True, exist_ok=True)
- else:
- workdir = Path(tempfile.mkdtemp(prefix="nres-rsli-validate-"))
- temp_created = True
-
- report: dict[str, Any] = {
- "input_root": str(input_root),
- "workdir": str(workdir),
- "archives_total": len(archives),
- "results": [],
- "summary": {},
- }
-
- failures = 0
- try:
- for idx, item in enumerate(archives):
- rel = item["relative_path"]
- archive_path = input_root / rel
- marker = f"{idx:04d}_{safe_component(rel, fallback='archive')}"
- unpack_dir = workdir / "unpacked" / marker
- repacked_file = workdir / "repacked" / f"{marker}.bin"
- try:
- if item["type"] == "nres":
- manifest = unpack_nres_file(archive_path, unpack_dir, source_root=input_root)
- repacked = pack_nres_manifest(unpack_dir / "manifest.json", repacked_file)
- elif item["type"] == "rsli":
- manifest = unpack_rsli_file(archive_path, unpack_dir, source_root=input_root)
- repacked = pack_rsli_manifest(unpack_dir / "manifest.json", repacked_file)
- else:
- continue
-
- original = archive_path.read_bytes()
- match = original == repacked
- diff_offset, diff_desc = first_diff(original, repacked)
- issues = list(manifest.get("issues", []))
- result = {
- "relative_path": rel,
- "type": item["type"],
- "size_original": len(original),
- "size_repacked": len(repacked),
- "sha256_original": sha256_hex(original),
- "sha256_repacked": sha256_hex(repacked),
- "match": match,
- "first_diff_offset": diff_offset,
- "first_diff": diff_desc,
- "issues": issues,
- "entries": len(manifest.get("entries", [])),
- "error": None,
- }
- except Exception as exc: # pylint: disable=broad-except
- result = {
- "relative_path": rel,
- "type": item["type"],
- "size_original": item["size"],
- "size_repacked": None,
- "sha256_original": None,
- "sha256_repacked": None,
- "match": False,
- "first_diff_offset": None,
- "first_diff": None,
- "issues": [f"processing error: {exc}"],
- "entries": None,
- "error": str(exc),
- }
-
- report["results"].append(result)
-
- if not result["match"]:
- failures += 1
- if result["issues"] and args.fail_on_issues:
- failures += 1
-
- matches = sum(1 for row in report["results"] if row["match"])
- mismatches = len(report["results"]) - matches
- nres_count = sum(1 for row in report["results"] if row["type"] == "nres")
- rsli_count = sum(1 for row in report["results"] if row["type"] == "rsli")
- issues_total = sum(len(row["issues"]) for row in report["results"])
- report["summary"] = {
- "nres_count": nres_count,
- "rsli_count": rsli_count,
- "matches": matches,
- "mismatches": mismatches,
- "issues_total": issues_total,
- }
-
- if args.report:
- dump_json(Path(args.report).resolve(), report)
-
- print(f"Input root : {input_root}")
- print(f"Work dir : {workdir}")
- print(f"NRes archives : {nres_count}")
- print(f"RsLi archives : {rsli_count}")
- print(f"Roundtrip match: {matches}/{len(report['results'])}")
- print(f"Doc issues : {issues_total}")
-
- if mismatches:
- print("\nMismatches:")
- for row in report["results"]:
- if row["match"]:
- continue
- print(
- f"- {row['relative_path']} [{row['type']}] "
- f"diff@{row['first_diff_offset']}: {row['first_diff']}"
- )
-
- if issues_total:
- print("\nIssues:")
- for row in report["results"]:
- if not row["issues"]:
- continue
- print(f"- {row['relative_path']} [{row['type']}]")
- for issue in row["issues"]:
- print(f" * {issue}")
-
- finally:
- if temp_created or args.cleanup:
- shutil.rmtree(workdir, ignore_errors=True)
-
- if failures > 0:
- return 1
- if report["summary"].get("mismatches", 0) > 0 and args.fail_on_diff:
- return 1
- return 0
-
-
-def build_parser() -> argparse.ArgumentParser:
- parser = argparse.ArgumentParser(
- description="NRes/RsLi tools: scan, unpack, repack, and roundtrip validation."
- )
- sub = parser.add_subparsers(dest="command", required=True)
-
- scan = sub.add_parser("scan", help="Scan files by header signatures.")
- scan.add_argument("--input", required=True, help="Root directory to scan.")
- scan.add_argument("--json", action="store_true", help="Print JSON output.")
- scan.set_defaults(func=cmd_scan)
-
- nres_unpack = sub.add_parser("nres-unpack", help="Unpack a single NRes archive.")
- nres_unpack.add_argument("--archive", required=True, help="Path to NRes file.")
- nres_unpack.add_argument("--output", required=True, help="Output directory.")
- nres_unpack.set_defaults(func=cmd_nres_unpack)
-
- nres_pack = sub.add_parser("nres-pack", help="Pack NRes archive from manifest.")
- nres_pack.add_argument("--manifest", required=True, help="Path to manifest.json.")
- nres_pack.add_argument("--output", required=True, help="Output file path.")
- nres_pack.set_defaults(func=cmd_nres_pack)
-
- rsli_unpack = sub.add_parser("rsli-unpack", help="Unpack a single RsLi archive.")
- rsli_unpack.add_argument("--archive", required=True, help="Path to RsLi file.")
- rsli_unpack.add_argument("--output", required=True, help="Output directory.")
- rsli_unpack.set_defaults(func=cmd_rsli_unpack)
-
- rsli_pack = sub.add_parser("rsli-pack", help="Pack RsLi archive from manifest.")
- rsli_pack.add_argument("--manifest", required=True, help="Path to manifest.json.")
- rsli_pack.add_argument("--output", required=True, help="Output file path.")
- rsli_pack.set_defaults(func=cmd_rsli_pack)
-
- validate = sub.add_parser(
- "validate",
- help="Scan all archives and run unpack->repack->byte-compare validation.",
- )
- validate.add_argument("--input", required=True, help="Root with game data files.")
- validate.add_argument(
- "--workdir",
- help="Working directory for temporary unpack/repack files. "
- "If omitted, a temporary directory is used and removed automatically.",
- )
- validate.add_argument("--report", help="Optional JSON report output path.")
- validate.add_argument(
- "--fail-on-diff",
- action="store_true",
- help="Return non-zero exit code if any byte mismatch exists.",
- )
- validate.add_argument(
- "--fail-on-issues",
- action="store_true",
- help="Return non-zero exit code if any spec issue was detected.",
- )
- validate.add_argument(
- "--cleanup",
- action="store_true",
- help="Remove --workdir after completion.",
- )
- validate.set_defaults(func=cmd_validate)
-
- return parser
-
-
-def main() -> int:
- parser = build_parser()
- args = parser.parse_args()
- return int(args.func(args))
-
-
-if __name__ == "__main__":
- raise SystemExit(main())