aboutsummaryrefslogtreecommitdiff
path: root/tools
diff options
context:
space:
mode:
Diffstat (limited to 'tools')
-rw-r--r--tools/README.md201
-rw-r--r--tools/archive_roundtrip_validator.py944
-rw-r--r--tools/init_testdata.py204
-rw-r--r--tools/msh_doc_validator.py1000
-rw-r--r--tools/msh_export_obj.py357
-rw-r--r--tools/msh_preview_renderer.py481
6 files changed, 3187 insertions, 0 deletions
diff --git a/tools/README.md b/tools/README.md
new file mode 100644
index 0000000..2418567
--- /dev/null
+++ b/tools/README.md
@@ -0,0 +1,201 @@
+# Инструменты в каталоге `tools`
+
+## `archive_roundtrip_validator.py`
+
+Скрипт предназначен для **валидации документации по форматам NRes и RsLi на реальных данных игры**.
+
+Что делает утилита:
+
+- находит архивы по сигнатуре заголовка (а не по расширению файла);
+- распаковывает архивы в структуру `manifest.json + entries/*`;
+- собирает архивы обратно из `manifest.json`;
+- выполняет проверку `unpack -> repack -> byte-compare`;
+- формирует отчёт о расхождениях со спецификацией.
+
+Скрипт не изменяет оригинальные файлы игры. Рабочие файлы создаются только в указанном `--workdir` (или во временной папке).
+
+## Поддерживаемые сигнатуры
+
+- `NRes` (`4E 52 65 73`)
+- `RsLi` в файловом формате библиотеки: `NL 00 01`
+
+## Основные команды
+
+Сканирование архива по сигнатурам:
+
+```bash
+python3 tools/archive_roundtrip_validator.py scan --input tmp/gamedata
+```
+
+Распаковка/упаковка одного NRes:
+
+```bash
+python3 tools/archive_roundtrip_validator.py nres-unpack \
+ --archive tmp/gamedata/sounds.lib \
+ --output tmp/work/nres_sounds
+
+python3 tools/archive_roundtrip_validator.py nres-pack \
+ --manifest tmp/work/nres_sounds/manifest.json \
+ --output tmp/work/sounds.repacked.lib
+```
+
+Распаковка/упаковка одного RsLi:
+
+```bash
+python3 tools/archive_roundtrip_validator.py rsli-unpack \
+ --archive tmp/gamedata/sprites.lib \
+ --output tmp/work/rsli_sprites
+
+python3 tools/archive_roundtrip_validator.py rsli-pack \
+ --manifest tmp/work/rsli_sprites/manifest.json \
+ --output tmp/work/sprites.repacked.lib
+```
+
+Полная валидация документации на всём наборе данных:
+
+```bash
+python3 tools/archive_roundtrip_validator.py validate \
+ --input tmp/gamedata \
+ --workdir tmp/validation_work \
+ --report tmp/validation_report.json \
+ --fail-on-diff
+```
+
+## Формат распаковки
+
+Для каждого архива создаются:
+
+- `manifest.json` — все поля заголовка, записи, индексы, смещения, контрольные суммы;
+- `entries/*.bin` — payload-файлы.
+
+Имена файлов в `entries` включают индекс записи, поэтому коллизии одинаковых имён внутри архива обрабатываются корректно.
+
+## `init_testdata.py`
+
+Скрипт инициализирует тестовые данные по сигнатурам архивов из спецификации:
+
+- `NRes` (`4E 52 65 73`);
+- `RsLi` (`NL 00 01`).
+
+Что делает утилита:
+
+- рекурсивно сканирует все файлы в `--input`;
+- копирует найденные `NRes` в `--output/nres/`;
+- копирует найденные `RsLi` в `--output/rsli/`;
+- сохраняет относительный путь исходного файла внутри целевого каталога;
+- создаёт целевые каталоги автоматически, если их нет.
+
+Базовый запуск:
+
+```bash
+python3 tools/init_testdata.py --input tmp/gamedata --output testdata
+```
+
+Если целевой файл уже существует, скрипт спрашивает подтверждение перезаписи (`yes/no/all/quit`).
+
+Для перезаписи без вопросов используйте `--force`:
+
+```bash
+python3 tools/init_testdata.py --input tmp/gamedata --output testdata --force
+```
+
+Проверки надёжности:
+
+- `--input` должен существовать и быть каталогом;
+- если `--output` указывает на существующий файл, скрипт завершится с ошибкой;
+- если `--output` расположен внутри `--input`, каталог вывода исключается из сканирования;
+- если `stdin` неинтерактивный и требуется перезапись, нужно явно указать `--force`.
+
+## `msh_doc_validator.py`
+
+Скрипт валидирует ключевые инварианты из документации `/Users/valentineus/Developer/personal/fparkan/docs/specs/msh.md` на реальных данных.
+
+Проверяемые группы:
+
+- модели `*.msh` (вложенные `NRes` в архивах `NRes`);
+- текстуры `Texm` (`type_id = 0x6D786554`);
+- эффекты `FXID` (`type_id = 0x44495846`).
+
+Что проверяет для моделей:
+
+- обязательные ресурсы (`Res1/2/3/6/13`) и известные опциональные (`Res4/5/7/8/10/15/16/18/19`);
+- `size/attr1/attr3` и шаги структур по таблицам;
+- диапазоны индексов, батчей и ссылок между таблицами;
+- разбор `Res10` как `len + bytes + NUL` для каждого узла;
+- матрицу слотов в `Res1` (LOD/group) и границы по `Res2/Res7/Res13/Res19`.
+
+Быстрый запуск:
+
+```bash
+python3 tools/msh_doc_validator.py scan --input testdata/nres
+python3 tools/msh_doc_validator.py validate --input testdata/nres --print-limit 20
+```
+
+С отчётом в JSON:
+
+```bash
+python3 tools/msh_doc_validator.py validate \
+ --input testdata/nres \
+ --report tmp/msh_validation_report.json \
+ --fail-on-warnings
+```
+
+## `msh_preview_renderer.py`
+
+Примитивный программный рендерер моделей `*.msh` без внешних зависимостей.
+
+- вход: архив `NRes` (например `animals.rlb`) или прямой payload модели;
+- выход: изображение `PPM` (`P6`);
+- использует `Res3` (позиции), `Res6` (индексы), `Res13` (батчи), `Res1/Res2` (выбор слотов по `lod/group`).
+
+Показать доступные модели в архиве:
+
+```bash
+python3 tools/msh_preview_renderer.py list-models --archive testdata/nres/animals.rlb
+```
+
+Сгенерировать тестовый рендер:
+
+```bash
+python3 tools/msh_preview_renderer.py render \
+ --archive testdata/nres/animals.rlb \
+ --model A_L_01.msh \
+ --output tmp/renders/A_L_01.ppm \
+ --width 800 \
+ --height 600 \
+ --lod 0 \
+ --group 0 \
+ --wireframe
+```
+
+Ограничения:
+
+- инструмент предназначен для smoke-теста геометрии, а не для пиксельно-точного рендера движка;
+- текстуры/материалы/эффектные проходы не эмулируются.
+
+## `msh_export_obj.py`
+
+Экспортирует геометрию `*.msh` в `Wavefront OBJ`, чтобы открыть модель в Blender/MeshLab.
+
+- вход: `NRes` архив (например `animals.rlb`) или прямой payload модели;
+- выбор геометрии: через `Res1` slot matrix (`lod/group`) как в рендерере;
+- опция `--all-batches` экспортирует все батчи, игнорируя slot matrix.
+
+Показать модели в архиве:
+
+```bash
+python3 tools/msh_export_obj.py list-models --archive testdata/nres/animals.rlb
+```
+
+Экспорт в OBJ:
+
+```bash
+python3 tools/msh_export_obj.py export \
+ --archive testdata/nres/animals.rlb \
+ --model A_L_01.msh \
+ --output tmp/renders/A_L_01.obj \
+ --lod 0 \
+ --group 0
+```
+
+Файл `OBJ` можно открыть напрямую в Blender (`File -> Import -> Wavefront (.obj)`).
diff --git a/tools/archive_roundtrip_validator.py b/tools/archive_roundtrip_validator.py
new file mode 100644
index 0000000..073fd9b
--- /dev/null
+++ b/tools/archive_roundtrip_validator.py
@@ -0,0 +1,944 @@
+#!/usr/bin/env python3
+"""
+Roundtrip tools for NRes and RsLi archives.
+
+The script can:
+1) scan archives by header signature (ignores file extensions),
+2) unpack / pack NRes archives,
+3) unpack / pack RsLi archives,
+4) validate docs assumptions by full roundtrip and byte-to-byte comparison.
+"""
+
+from __future__ import annotations
+
+import argparse
+import hashlib
+import json
+import re
+import shutil
+import struct
+import tempfile
+import zlib
+from pathlib import Path
+from typing import Any
+
+MAGIC_NRES = b"NRes"
+MAGIC_RSLI = b"NL\x00\x01"
+
+
+class ArchiveFormatError(RuntimeError):
+ pass
+
+
+def sha256_hex(data: bytes) -> str:
+ return hashlib.sha256(data).hexdigest()
+
+
+def safe_component(value: str, fallback: str = "item", max_len: int = 80) -> str:
+ clean = re.sub(r"[^A-Za-z0-9._-]+", "_", value).strip("._-")
+ if not clean:
+ clean = fallback
+ return clean[:max_len]
+
+
+def first_diff(a: bytes, b: bytes) -> tuple[int | None, str | None]:
+ if a == b:
+ return None, None
+ limit = min(len(a), len(b))
+ for idx in range(limit):
+ if a[idx] != b[idx]:
+ return idx, f"{a[idx]:02x}!={b[idx]:02x}"
+ return limit, f"len {len(a)}!={len(b)}"
+
+
+def load_json(path: Path) -> dict[str, Any]:
+ with path.open("r", encoding="utf-8") as handle:
+ return json.load(handle)
+
+
+def dump_json(path: Path, payload: dict[str, Any]) -> None:
+ path.parent.mkdir(parents=True, exist_ok=True)
+ with path.open("w", encoding="utf-8") as handle:
+ json.dump(payload, handle, indent=2, ensure_ascii=False)
+ handle.write("\n")
+
+
+def xor_stream(data: bytes, key16: int) -> bytes:
+ lo = key16 & 0xFF
+ hi = (key16 >> 8) & 0xFF
+ out = bytearray(len(data))
+ for i, value in enumerate(data):
+ lo = (hi ^ ((lo << 1) & 0xFF)) & 0xFF
+ out[i] = value ^ lo
+ hi = (lo ^ ((hi >> 1) & 0xFF)) & 0xFF
+ return bytes(out)
+
+
+def lzss_decompress_simple(data: bytes, expected_size: int) -> bytes:
+ ring = bytearray([0x20] * 0x1000)
+ ring_pos = 0xFEE
+ out = bytearray()
+ in_pos = 0
+ control = 0
+ bits_left = 0
+
+ while len(out) < expected_size and in_pos < len(data):
+ if bits_left == 0:
+ control = data[in_pos]
+ in_pos += 1
+ bits_left = 8
+
+ if control & 1:
+ if in_pos >= len(data):
+ break
+ byte = data[in_pos]
+ in_pos += 1
+ out.append(byte)
+ ring[ring_pos] = byte
+ ring_pos = (ring_pos + 1) & 0x0FFF
+ else:
+ if in_pos + 1 >= len(data):
+ break
+ low = data[in_pos]
+ high = data[in_pos + 1]
+ in_pos += 2
+ # Real files indicate nibble layout opposite to common LZSS variant:
+ # high nibble extends offset, low nibble stores (length - 3).
+ offset = low | ((high & 0xF0) << 4)
+ length = (high & 0x0F) + 3
+ for step in range(length):
+ byte = ring[(offset + step) & 0x0FFF]
+ out.append(byte)
+ ring[ring_pos] = byte
+ ring_pos = (ring_pos + 1) & 0x0FFF
+ if len(out) >= expected_size:
+ break
+
+ control >>= 1
+ bits_left -= 1
+
+ if len(out) != expected_size:
+ raise ArchiveFormatError(
+ f"LZSS size mismatch: expected {expected_size}, got {len(out)}"
+ )
+ return bytes(out)
+
+
+def decode_rsli_payload(
+ packed: bytes, method: int, sort_to_original: int, unpacked_size: int
+) -> bytes:
+ key16 = sort_to_original & 0xFFFF
+
+ if method == 0x000:
+ out = packed
+ elif method == 0x020:
+ if len(packed) < unpacked_size:
+ raise ArchiveFormatError(
+ f"method 0x20 packed too short: {len(packed)} < {unpacked_size}"
+ )
+ out = xor_stream(packed[:unpacked_size], key16)
+ elif method == 0x040:
+ out = lzss_decompress_simple(packed, unpacked_size)
+ elif method == 0x060:
+ out = lzss_decompress_simple(xor_stream(packed, key16), unpacked_size)
+ elif method == 0x100:
+ try:
+ out = zlib.decompress(packed, -15)
+ except zlib.error:
+ out = zlib.decompress(packed)
+ else:
+ raise ArchiveFormatError(f"unsupported RsLi method: 0x{method:03X}")
+
+ if len(out) != unpacked_size:
+ raise ArchiveFormatError(
+ f"unpacked_size mismatch: expected {unpacked_size}, got {len(out)}"
+ )
+ return out
+
+
+def detect_archive_type(path: Path) -> str | None:
+ try:
+ with path.open("rb") as handle:
+ magic = handle.read(4)
+ except OSError:
+ return None
+
+ if magic == MAGIC_NRES:
+ return "nres"
+ if magic == MAGIC_RSLI:
+ return "rsli"
+ return None
+
+
+def scan_archives(root: Path) -> list[dict[str, Any]]:
+ found: list[dict[str, Any]] = []
+ for path in sorted(root.rglob("*")):
+ if not path.is_file():
+ continue
+ archive_type = detect_archive_type(path)
+ if not archive_type:
+ continue
+ found.append(
+ {
+ "path": str(path),
+ "relative_path": str(path.relative_to(root)),
+ "type": archive_type,
+ "size": path.stat().st_size,
+ }
+ )
+ return found
+
+
+def parse_nres(data: bytes, source: str = "<memory>") -> dict[str, Any]:
+ if len(data) < 16:
+ raise ArchiveFormatError(f"{source}: NRes too short ({len(data)} bytes)")
+
+ magic, version, entry_count, total_size = struct.unpack_from("<4sIII", data, 0)
+ if magic != MAGIC_NRES:
+ raise ArchiveFormatError(f"{source}: invalid NRes magic")
+
+ issues: list[str] = []
+ if total_size != len(data):
+ issues.append(
+ f"header.total_size={total_size} != actual_size={len(data)} (spec 1.2)"
+ )
+ if version != 0x100:
+ issues.append(f"version=0x{version:08X} != 0x00000100 (spec 1.2)")
+
+ directory_offset = total_size - entry_count * 64
+ if directory_offset < 16 or directory_offset > len(data):
+ raise ArchiveFormatError(
+ f"{source}: invalid directory offset {directory_offset} for entry_count={entry_count}"
+ )
+ if directory_offset + entry_count * 64 != len(data):
+ issues.append(
+ "directory_offset + entry_count*64 != file_size (spec 1.3)"
+ )
+
+ entries: list[dict[str, Any]] = []
+ for index in range(entry_count):
+ offset = directory_offset + index * 64
+ if offset + 64 > len(data):
+ raise ArchiveFormatError(f"{source}: truncated directory entry {index}")
+
+ (
+ type_id,
+ attr1,
+ attr2,
+ size,
+ attr3,
+ name_raw,
+ data_offset,
+ sort_index,
+ ) = struct.unpack_from("<IIIII36sII", data, offset)
+ name_bytes = name_raw.split(b"\x00", 1)[0]
+ name = name_bytes.decode("latin1", errors="replace")
+ entries.append(
+ {
+ "index": index,
+ "type_id": type_id,
+ "attr1": attr1,
+ "attr2": attr2,
+ "size": size,
+ "attr3": attr3,
+ "name": name,
+ "name_bytes_hex": name_bytes.hex(),
+ "name_raw_hex": name_raw.hex(),
+ "data_offset": data_offset,
+ "sort_index": sort_index,
+ }
+ )
+
+ # Spec checks.
+ expected_sort = sorted(
+ range(entry_count),
+ key=lambda idx: bytes.fromhex(entries[idx]["name_bytes_hex"]).lower(),
+ )
+ current_sort = [item["sort_index"] for item in entries]
+ if current_sort != expected_sort:
+ issues.append(
+ "sort_index table does not match case-insensitive name order (spec 1.4)"
+ )
+
+ data_regions = sorted(
+ (
+ item["index"],
+ item["data_offset"],
+ item["size"],
+ )
+ for item in entries
+ )
+ for idx, data_offset, size in data_regions:
+ if data_offset % 8 != 0:
+ issues.append(f"entry {idx}: data_offset={data_offset} not aligned to 8 (spec 1.5)")
+ if data_offset < 16 or data_offset + size > directory_offset:
+ issues.append(
+ f"entry {idx}: data range [{data_offset}, {data_offset + size}) out of data area (spec 1.3)"
+ )
+ for i in range(len(data_regions) - 1):
+ _, start, size = data_regions[i]
+ _, next_start, _ = data_regions[i + 1]
+ if start + size > next_start:
+ issues.append(
+ f"entry overlap at data_offset={start}, next={next_start}"
+ )
+ padding = data[start + size : next_start]
+ if any(padding):
+ issues.append(
+ f"non-zero padding after data block at offset={start + size} (spec 1.5)"
+ )
+
+ return {
+ "format": "NRes",
+ "header": {
+ "magic": "NRes",
+ "version": version,
+ "entry_count": entry_count,
+ "total_size": total_size,
+ "directory_offset": directory_offset,
+ },
+ "entries": entries,
+ "issues": issues,
+ }
+
+
+def build_nres_name_field(entry: dict[str, Any]) -> bytes:
+ if "name_bytes_hex" in entry:
+ raw = bytes.fromhex(entry["name_bytes_hex"])
+ else:
+ raw = entry.get("name", "").encode("latin1", errors="replace")
+ raw = raw[:35]
+ return raw + b"\x00" * (36 - len(raw))
+
+
+def unpack_nres_file(archive_path: Path, out_dir: Path, source_root: Path | None = None) -> dict[str, Any]:
+ data = archive_path.read_bytes()
+ parsed = parse_nres(data, source=str(archive_path))
+
+ out_dir.mkdir(parents=True, exist_ok=True)
+ entries_dir = out_dir / "entries"
+ entries_dir.mkdir(parents=True, exist_ok=True)
+
+ manifest: dict[str, Any] = {
+ "format": "NRes",
+ "source_path": str(archive_path),
+ "source_relative_path": str(archive_path.relative_to(source_root)) if source_root else str(archive_path),
+ "header": parsed["header"],
+ "entries": [],
+ "issues": parsed["issues"],
+ "source_sha256": sha256_hex(data),
+ }
+
+ for entry in parsed["entries"]:
+ begin = entry["data_offset"]
+ end = begin + entry["size"]
+ if begin < 0 or end > len(data):
+ raise ArchiveFormatError(
+ f"{archive_path}: entry {entry['index']} data range outside file"
+ )
+ payload = data[begin:end]
+ base = safe_component(entry["name"], fallback=f"entry_{entry['index']:05d}")
+ file_name = (
+ f"{entry['index']:05d}__{base}"
+ f"__t{entry['type_id']:08X}_a1{entry['attr1']:08X}_a2{entry['attr2']:08X}.bin"
+ )
+ (entries_dir / file_name).write_bytes(payload)
+
+ manifest_entry = dict(entry)
+ manifest_entry["data_file"] = f"entries/{file_name}"
+ manifest_entry["sha256"] = sha256_hex(payload)
+ manifest["entries"].append(manifest_entry)
+
+ dump_json(out_dir / "manifest.json", manifest)
+ return manifest
+
+
+def pack_nres_manifest(manifest_path: Path, out_file: Path) -> bytes:
+ manifest = load_json(manifest_path)
+ if manifest.get("format") != "NRes":
+ raise ArchiveFormatError(f"{manifest_path}: not an NRes manifest")
+
+ entries = manifest["entries"]
+ count = len(entries)
+ version = int(manifest.get("header", {}).get("version", 0x100))
+
+ out = bytearray(b"\x00" * 16)
+ data_offsets: list[int] = []
+ data_sizes: list[int] = []
+
+ for entry in entries:
+ payload_path = manifest_path.parent / entry["data_file"]
+ payload = payload_path.read_bytes()
+ offset = len(out)
+ out.extend(payload)
+ padding = (-len(out)) % 8
+ if padding:
+ out.extend(b"\x00" * padding)
+ data_offsets.append(offset)
+ data_sizes.append(len(payload))
+
+ directory_offset = len(out)
+ expected_sort = sorted(
+ range(count),
+ key=lambda idx: bytes.fromhex(entries[idx].get("name_bytes_hex", "")).lower(),
+ )
+
+ for index, entry in enumerate(entries):
+ name_field = build_nres_name_field(entry)
+ out.extend(
+ struct.pack(
+ "<IIIII36sII",
+ int(entry["type_id"]),
+ int(entry["attr1"]),
+ int(entry["attr2"]),
+ data_sizes[index],
+ int(entry["attr3"]),
+ name_field,
+ data_offsets[index],
+ expected_sort[index],
+ )
+ )
+
+ total_size = len(out)
+ struct.pack_into("<4sIII", out, 0, MAGIC_NRES, version, count, total_size)
+
+ out_file.parent.mkdir(parents=True, exist_ok=True)
+ out_file.write_bytes(out)
+ return bytes(out)
+
+
+def parse_rsli(data: bytes, source: str = "<memory>") -> dict[str, Any]:
+ if len(data) < 32:
+ raise ArchiveFormatError(f"{source}: RsLi too short ({len(data)} bytes)")
+ if data[:4] != MAGIC_RSLI:
+ raise ArchiveFormatError(f"{source}: invalid RsLi magic")
+
+ issues: list[str] = []
+ reserved_zero = data[2]
+ version = data[3]
+ entry_count = struct.unpack_from("<h", data, 4)[0]
+ presorted_flag = struct.unpack_from("<H", data, 14)[0]
+ seed = struct.unpack_from("<I", data, 20)[0]
+
+ if reserved_zero != 0:
+ issues.append(f"header[2]={reserved_zero} != 0 (spec 2.2)")
+ if version != 1:
+ issues.append(f"version={version} != 1 (spec 2.2)")
+ if entry_count < 0:
+ raise ArchiveFormatError(f"{source}: negative entry_count={entry_count}")
+
+ table_offset = 32
+ table_size = entry_count * 32
+ if table_offset + table_size > len(data):
+ raise ArchiveFormatError(
+ f"{source}: encrypted table out of file bounds ({table_offset}+{table_size}>{len(data)})"
+ )
+
+ table_encrypted = data[table_offset : table_offset + table_size]
+ table_plain = xor_stream(table_encrypted, seed & 0xFFFF)
+
+ trailer: dict[str, Any] = {"present": False}
+ overlay_offset = 0
+ if len(data) >= 6 and data[-6:-4] == b"AO":
+ overlay_offset = struct.unpack_from("<I", data, len(data) - 4)[0]
+ trailer = {
+ "present": True,
+ "signature": "AO",
+ "overlay_offset": overlay_offset,
+ "raw_hex": data[-6:].hex(),
+ }
+
+ entries: list[dict[str, Any]] = []
+ sort_values: list[int] = []
+ for index in range(entry_count):
+ row = table_plain[index * 32 : (index + 1) * 32]
+ name_raw = row[0:12]
+ reserved4 = row[12:16]
+ flags_signed, sort_to_original = struct.unpack_from("<hh", row, 16)
+ unpacked_size, data_offset, packed_size = struct.unpack_from("<III", row, 20)
+ method = flags_signed & 0x1E0
+ name = name_raw.split(b"\x00", 1)[0].decode("latin1", errors="replace")
+ effective_offset = data_offset + overlay_offset
+ entries.append(
+ {
+ "index": index,
+ "name": name,
+ "name_raw_hex": name_raw.hex(),
+ "reserved_raw_hex": reserved4.hex(),
+ "flags_signed": flags_signed,
+ "flags_u16": flags_signed & 0xFFFF,
+ "method": method,
+ "sort_to_original": sort_to_original,
+ "unpacked_size": unpacked_size,
+ "data_offset": data_offset,
+ "effective_data_offset": effective_offset,
+ "packed_size": packed_size,
+ }
+ )
+ sort_values.append(sort_to_original)
+
+ if effective_offset < 0:
+ issues.append(f"entry {index}: negative effective_data_offset={effective_offset}")
+ elif effective_offset + packed_size > len(data):
+ end = effective_offset + packed_size
+ if method == 0x100 and end == len(data) + 1:
+ issues.append(
+ f"entry {index}: deflate packed_size reaches EOF+1 ({end}); "
+ "observed in game data, likely decoder lookahead byte"
+ )
+ else:
+ issues.append(
+ f"entry {index}: packed range [{effective_offset}, {end}) out of file"
+ )
+
+ if presorted_flag == 0xABBA:
+ if sorted(sort_values) != list(range(entry_count)):
+ issues.append(
+ "presorted flag is 0xABBA but sort_to_original is not a permutation [0..N-1] (spec 2.2/2.4)"
+ )
+
+ return {
+ "format": "RsLi",
+ "header_raw_hex": data[:32].hex(),
+ "header": {
+ "magic": "NL\\x00\\x01",
+ "entry_count": entry_count,
+ "seed": seed,
+ "presorted_flag": presorted_flag,
+ },
+ "entries": entries,
+ "issues": issues,
+ "trailer": trailer,
+ }
+
+
+def unpack_rsli_file(archive_path: Path, out_dir: Path, source_root: Path | None = None) -> dict[str, Any]:
+ data = archive_path.read_bytes()
+ parsed = parse_rsli(data, source=str(archive_path))
+
+ out_dir.mkdir(parents=True, exist_ok=True)
+ entries_dir = out_dir / "entries"
+ entries_dir.mkdir(parents=True, exist_ok=True)
+
+ manifest: dict[str, Any] = {
+ "format": "RsLi",
+ "source_path": str(archive_path),
+ "source_relative_path": str(archive_path.relative_to(source_root)) if source_root else str(archive_path),
+ "source_size": len(data),
+ "header_raw_hex": parsed["header_raw_hex"],
+ "header": parsed["header"],
+ "entries": [],
+ "issues": list(parsed["issues"]),
+ "trailer": parsed["trailer"],
+ "source_sha256": sha256_hex(data),
+ }
+
+ for entry in parsed["entries"]:
+ begin = int(entry["effective_data_offset"])
+ end = begin + int(entry["packed_size"])
+ packed = data[begin:end]
+ base = safe_component(entry["name"], fallback=f"entry_{entry['index']:05d}")
+ packed_name = f"{entry['index']:05d}__{base}__packed.bin"
+ (entries_dir / packed_name).write_bytes(packed)
+
+ manifest_entry = dict(entry)
+ manifest_entry["packed_file"] = f"entries/{packed_name}"
+ manifest_entry["packed_file_size"] = len(packed)
+ manifest_entry["packed_sha256"] = sha256_hex(packed)
+
+ try:
+ unpacked = decode_rsli_payload(
+ packed=packed,
+ method=int(entry["method"]),
+ sort_to_original=int(entry["sort_to_original"]),
+ unpacked_size=int(entry["unpacked_size"]),
+ )
+ unpacked_name = f"{entry['index']:05d}__{base}__unpacked.bin"
+ (entries_dir / unpacked_name).write_bytes(unpacked)
+ manifest_entry["unpacked_file"] = f"entries/{unpacked_name}"
+ manifest_entry["unpacked_sha256"] = sha256_hex(unpacked)
+ except ArchiveFormatError as exc:
+ manifest_entry["unpack_error"] = str(exc)
+ manifest["issues"].append(
+ f"entry {entry['index']}: cannot decode method 0x{entry['method']:03X}: {exc}"
+ )
+
+ manifest["entries"].append(manifest_entry)
+
+ dump_json(out_dir / "manifest.json", manifest)
+ return manifest
+
+
+def _pack_i16(value: int) -> int:
+ if not (-32768 <= int(value) <= 32767):
+ raise ArchiveFormatError(f"int16 overflow: {value}")
+ return int(value)
+
+
+def pack_rsli_manifest(manifest_path: Path, out_file: Path) -> bytes:
+ manifest = load_json(manifest_path)
+ if manifest.get("format") != "RsLi":
+ raise ArchiveFormatError(f"{manifest_path}: not an RsLi manifest")
+
+ entries = manifest["entries"]
+ count = len(entries)
+
+ header_raw = bytes.fromhex(manifest["header_raw_hex"])
+ if len(header_raw) != 32:
+ raise ArchiveFormatError(f"{manifest_path}: header_raw_hex must be 32 bytes")
+ header = bytearray(header_raw)
+ header[:4] = MAGIC_RSLI
+ struct.pack_into("<h", header, 4, count)
+ seed = int(manifest["header"]["seed"])
+ struct.pack_into("<I", header, 20, seed)
+
+ rows = bytearray()
+ packed_chunks: list[tuple[dict[str, Any], bytes]] = []
+
+ for entry in entries:
+ packed_path = manifest_path.parent / entry["packed_file"]
+ packed = packed_path.read_bytes()
+ declared_size = int(entry["packed_size"])
+ if len(packed) > declared_size:
+ raise ArchiveFormatError(
+ f"{packed_path}: packed size {len(packed)} > manifest packed_size {declared_size}"
+ )
+
+ data_offset = int(entry["data_offset"])
+ packed_chunks.append((entry, packed))
+
+ row = bytearray(32)
+ name_raw = bytes.fromhex(entry["name_raw_hex"])
+ reserved_raw = bytes.fromhex(entry["reserved_raw_hex"])
+ if len(name_raw) != 12 or len(reserved_raw) != 4:
+ raise ArchiveFormatError(
+ f"entry {entry['index']}: invalid name/reserved raw length"
+ )
+ row[0:12] = name_raw
+ row[12:16] = reserved_raw
+ struct.pack_into(
+ "<hhIII",
+ row,
+ 16,
+ _pack_i16(int(entry["flags_signed"])),
+ _pack_i16(int(entry["sort_to_original"])),
+ int(entry["unpacked_size"]),
+ data_offset,
+ declared_size,
+ )
+ rows.extend(row)
+
+ encrypted_table = xor_stream(bytes(rows), seed & 0xFFFF)
+ trailer = manifest.get("trailer", {})
+ trailer_raw = b""
+ if trailer.get("present"):
+ raw_hex = trailer.get("raw_hex", "")
+ trailer_raw = bytes.fromhex(raw_hex)
+ if len(trailer_raw) != 6:
+ raise ArchiveFormatError("trailer raw length must be 6 bytes")
+
+ source_size = manifest.get("source_size")
+ table_end = 32 + count * 32
+ if source_size is not None:
+ pre_trailer_size = int(source_size) - len(trailer_raw)
+ if pre_trailer_size < table_end:
+ raise ArchiveFormatError(
+ f"invalid source_size={source_size}: smaller than header+table"
+ )
+ else:
+ pre_trailer_size = table_end
+ for entry, packed in packed_chunks:
+ pre_trailer_size = max(
+ pre_trailer_size, int(entry["data_offset"]) + len(packed)
+ )
+
+ out = bytearray(pre_trailer_size)
+ out[0:32] = header
+ out[32:table_end] = encrypted_table
+ occupied = bytearray(pre_trailer_size)
+ occupied[0:table_end] = b"\x01" * table_end
+
+ for entry, packed in packed_chunks:
+ base_offset = int(entry["data_offset"])
+ for index, byte in enumerate(packed):
+ pos = base_offset + index
+ if pos >= pre_trailer_size:
+ raise ArchiveFormatError(
+ f"entry {entry['index']}: data write at {pos} beyond output size {pre_trailer_size}"
+ )
+ if occupied[pos] and out[pos] != byte:
+ raise ArchiveFormatError(
+ f"entry {entry['index']}: overlapping packed data conflict at offset {pos}"
+ )
+ out[pos] = byte
+ occupied[pos] = 1
+
+ out.extend(trailer_raw)
+ if source_size is not None and len(out) != int(source_size):
+ raise ArchiveFormatError(
+ f"packed size {len(out)} != source_size {source_size} from manifest"
+ )
+
+ out_file.parent.mkdir(parents=True, exist_ok=True)
+ out_file.write_bytes(out)
+ return bytes(out)
+
+
+def cmd_scan(args: argparse.Namespace) -> int:
+ root = Path(args.input).resolve()
+ archives = scan_archives(root)
+ if args.json:
+ print(json.dumps(archives, ensure_ascii=False, indent=2))
+ else:
+ print(f"Found {len(archives)} archive(s) in {root}")
+ for item in archives:
+ print(f"{item['type']:4} {item['size']:10d} {item['relative_path']}")
+ return 0
+
+
+def cmd_nres_unpack(args: argparse.Namespace) -> int:
+ archive_path = Path(args.archive).resolve()
+ out_dir = Path(args.output).resolve()
+ manifest = unpack_nres_file(archive_path, out_dir)
+ print(f"NRes unpacked: {archive_path}")
+ print(f"Manifest: {out_dir / 'manifest.json'}")
+ print(f"Entries : {len(manifest['entries'])}")
+ if manifest["issues"]:
+ print("Issues:")
+ for issue in manifest["issues"]:
+ print(f"- {issue}")
+ return 0
+
+
+def cmd_nres_pack(args: argparse.Namespace) -> int:
+ manifest_path = Path(args.manifest).resolve()
+ out_file = Path(args.output).resolve()
+ packed = pack_nres_manifest(manifest_path, out_file)
+ print(f"NRes packed: {out_file} ({len(packed)} bytes, sha256={sha256_hex(packed)})")
+ return 0
+
+
+def cmd_rsli_unpack(args: argparse.Namespace) -> int:
+ archive_path = Path(args.archive).resolve()
+ out_dir = Path(args.output).resolve()
+ manifest = unpack_rsli_file(archive_path, out_dir)
+ print(f"RsLi unpacked: {archive_path}")
+ print(f"Manifest: {out_dir / 'manifest.json'}")
+ print(f"Entries : {len(manifest['entries'])}")
+ if manifest["issues"]:
+ print("Issues:")
+ for issue in manifest["issues"]:
+ print(f"- {issue}")
+ return 0
+
+
+def cmd_rsli_pack(args: argparse.Namespace) -> int:
+ manifest_path = Path(args.manifest).resolve()
+ out_file = Path(args.output).resolve()
+ packed = pack_rsli_manifest(manifest_path, out_file)
+ print(f"RsLi packed: {out_file} ({len(packed)} bytes, sha256={sha256_hex(packed)})")
+ return 0
+
+
+def cmd_validate(args: argparse.Namespace) -> int:
+ input_root = Path(args.input).resolve()
+ archives = scan_archives(input_root)
+
+ temp_created = False
+ if args.workdir:
+ workdir = Path(args.workdir).resolve()
+ workdir.mkdir(parents=True, exist_ok=True)
+ else:
+ workdir = Path(tempfile.mkdtemp(prefix="nres-rsli-validate-"))
+ temp_created = True
+
+ report: dict[str, Any] = {
+ "input_root": str(input_root),
+ "workdir": str(workdir),
+ "archives_total": len(archives),
+ "results": [],
+ "summary": {},
+ }
+
+ failures = 0
+ try:
+ for idx, item in enumerate(archives):
+ rel = item["relative_path"]
+ archive_path = input_root / rel
+ marker = f"{idx:04d}_{safe_component(rel, fallback='archive')}"
+ unpack_dir = workdir / "unpacked" / marker
+ repacked_file = workdir / "repacked" / f"{marker}.bin"
+ try:
+ if item["type"] == "nres":
+ manifest = unpack_nres_file(archive_path, unpack_dir, source_root=input_root)
+ repacked = pack_nres_manifest(unpack_dir / "manifest.json", repacked_file)
+ elif item["type"] == "rsli":
+ manifest = unpack_rsli_file(archive_path, unpack_dir, source_root=input_root)
+ repacked = pack_rsli_manifest(unpack_dir / "manifest.json", repacked_file)
+ else:
+ continue
+
+ original = archive_path.read_bytes()
+ match = original == repacked
+ diff_offset, diff_desc = first_diff(original, repacked)
+ issues = list(manifest.get("issues", []))
+ result = {
+ "relative_path": rel,
+ "type": item["type"],
+ "size_original": len(original),
+ "size_repacked": len(repacked),
+ "sha256_original": sha256_hex(original),
+ "sha256_repacked": sha256_hex(repacked),
+ "match": match,
+ "first_diff_offset": diff_offset,
+ "first_diff": diff_desc,
+ "issues": issues,
+ "entries": len(manifest.get("entries", [])),
+ "error": None,
+ }
+ except Exception as exc: # pylint: disable=broad-except
+ result = {
+ "relative_path": rel,
+ "type": item["type"],
+ "size_original": item["size"],
+ "size_repacked": None,
+ "sha256_original": None,
+ "sha256_repacked": None,
+ "match": False,
+ "first_diff_offset": None,
+ "first_diff": None,
+ "issues": [f"processing error: {exc}"],
+ "entries": None,
+ "error": str(exc),
+ }
+
+ report["results"].append(result)
+
+ if not result["match"]:
+ failures += 1
+ if result["issues"] and args.fail_on_issues:
+ failures += 1
+
+ matches = sum(1 for row in report["results"] if row["match"])
+ mismatches = len(report["results"]) - matches
+ nres_count = sum(1 for row in report["results"] if row["type"] == "nres")
+ rsli_count = sum(1 for row in report["results"] if row["type"] == "rsli")
+ issues_total = sum(len(row["issues"]) for row in report["results"])
+ report["summary"] = {
+ "nres_count": nres_count,
+ "rsli_count": rsli_count,
+ "matches": matches,
+ "mismatches": mismatches,
+ "issues_total": issues_total,
+ }
+
+ if args.report:
+ dump_json(Path(args.report).resolve(), report)
+
+ print(f"Input root : {input_root}")
+ print(f"Work dir : {workdir}")
+ print(f"NRes archives : {nres_count}")
+ print(f"RsLi archives : {rsli_count}")
+ print(f"Roundtrip match: {matches}/{len(report['results'])}")
+ print(f"Doc issues : {issues_total}")
+
+ if mismatches:
+ print("\nMismatches:")
+ for row in report["results"]:
+ if row["match"]:
+ continue
+ print(
+ f"- {row['relative_path']} [{row['type']}] "
+ f"diff@{row['first_diff_offset']}: {row['first_diff']}"
+ )
+
+ if issues_total:
+ print("\nIssues:")
+ for row in report["results"]:
+ if not row["issues"]:
+ continue
+ print(f"- {row['relative_path']} [{row['type']}]")
+ for issue in row["issues"]:
+ print(f" * {issue}")
+
+ finally:
+ if temp_created or args.cleanup:
+ shutil.rmtree(workdir, ignore_errors=True)
+
+ if failures > 0:
+ return 1
+ if report["summary"].get("mismatches", 0) > 0 and args.fail_on_diff:
+ return 1
+ return 0
+
+
+def build_parser() -> argparse.ArgumentParser:
+ parser = argparse.ArgumentParser(
+ description="NRes/RsLi tools: scan, unpack, repack, and roundtrip validation."
+ )
+ sub = parser.add_subparsers(dest="command", required=True)
+
+ scan = sub.add_parser("scan", help="Scan files by header signatures.")
+ scan.add_argument("--input", required=True, help="Root directory to scan.")
+ scan.add_argument("--json", action="store_true", help="Print JSON output.")
+ scan.set_defaults(func=cmd_scan)
+
+ nres_unpack = sub.add_parser("nres-unpack", help="Unpack a single NRes archive.")
+ nres_unpack.add_argument("--archive", required=True, help="Path to NRes file.")
+ nres_unpack.add_argument("--output", required=True, help="Output directory.")
+ nres_unpack.set_defaults(func=cmd_nres_unpack)
+
+ nres_pack = sub.add_parser("nres-pack", help="Pack NRes archive from manifest.")
+ nres_pack.add_argument("--manifest", required=True, help="Path to manifest.json.")
+ nres_pack.add_argument("--output", required=True, help="Output file path.")
+ nres_pack.set_defaults(func=cmd_nres_pack)
+
+ rsli_unpack = sub.add_parser("rsli-unpack", help="Unpack a single RsLi archive.")
+ rsli_unpack.add_argument("--archive", required=True, help="Path to RsLi file.")
+ rsli_unpack.add_argument("--output", required=True, help="Output directory.")
+ rsli_unpack.set_defaults(func=cmd_rsli_unpack)
+
+ rsli_pack = sub.add_parser("rsli-pack", help="Pack RsLi archive from manifest.")
+ rsli_pack.add_argument("--manifest", required=True, help="Path to manifest.json.")
+ rsli_pack.add_argument("--output", required=True, help="Output file path.")
+ rsli_pack.set_defaults(func=cmd_rsli_pack)
+
+ validate = sub.add_parser(
+ "validate",
+ help="Scan all archives and run unpack->repack->byte-compare validation.",
+ )
+ validate.add_argument("--input", required=True, help="Root with game data files.")
+ validate.add_argument(
+ "--workdir",
+ help="Working directory for temporary unpack/repack files. "
+ "If omitted, a temporary directory is used and removed automatically.",
+ )
+ validate.add_argument("--report", help="Optional JSON report output path.")
+ validate.add_argument(
+ "--fail-on-diff",
+ action="store_true",
+ help="Return non-zero exit code if any byte mismatch exists.",
+ )
+ validate.add_argument(
+ "--fail-on-issues",
+ action="store_true",
+ help="Return non-zero exit code if any spec issue was detected.",
+ )
+ validate.add_argument(
+ "--cleanup",
+ action="store_true",
+ help="Remove --workdir after completion.",
+ )
+ validate.set_defaults(func=cmd_validate)
+
+ return parser
+
+
+def main() -> int:
+ parser = build_parser()
+ args = parser.parse_args()
+ return int(args.func(args))
+
+
+if __name__ == "__main__":
+ raise SystemExit(main())
diff --git a/tools/init_testdata.py b/tools/init_testdata.py
new file mode 100644
index 0000000..4079cdb
--- /dev/null
+++ b/tools/init_testdata.py
@@ -0,0 +1,204 @@
+#!/usr/bin/env python3
+"""
+Initialize test data folders by archive signatures.
+
+The script scans all files in --input and copies matching archives into:
+ --output/nres/<relative path>
+ --output/rsli/<relative path>
+"""
+
+from __future__ import annotations
+
+import argparse
+import shutil
+import sys
+from pathlib import Path
+
+MAGIC_NRES = b"NRes"
+MAGIC_RSLI = b"NL\x00\x01"
+
+
+def is_relative_to(path: Path, base: Path) -> bool:
+ try:
+ path.relative_to(base)
+ except ValueError:
+ return False
+ return True
+
+
+def detect_archive_type(path: Path) -> str | None:
+ try:
+ with path.open("rb") as handle:
+ magic = handle.read(4)
+ except OSError as exc:
+ print(f"[warn] cannot read {path}: {exc}", file=sys.stderr)
+ return None
+
+ if magic == MAGIC_NRES:
+ return "nres"
+ if magic == MAGIC_RSLI:
+ return "rsli"
+ return None
+
+
+def scan_archives(input_root: Path, excluded_root: Path | None) -> list[tuple[Path, str]]:
+ found: list[tuple[Path, str]] = []
+ for path in sorted(input_root.rglob("*")):
+ if not path.is_file():
+ continue
+ if excluded_root and is_relative_to(path.resolve(), excluded_root):
+ continue
+
+ archive_type = detect_archive_type(path)
+ if archive_type:
+ found.append((path, archive_type))
+ return found
+
+
+def confirm_overwrite(path: Path) -> str:
+ prompt = (
+ f"File exists: {path}\n"
+ "Overwrite? [y]es / [n]o / [a]ll / [q]uit (default: n): "
+ )
+ while True:
+ try:
+ answer = input(prompt).strip().lower()
+ except EOFError:
+ return "quit"
+
+ if answer in {"", "n", "no"}:
+ return "no"
+ if answer in {"y", "yes"}:
+ return "yes"
+ if answer in {"a", "all"}:
+ return "all"
+ if answer in {"q", "quit"}:
+ return "quit"
+ print("Please answer with y, n, a, or q.")
+
+
+def copy_archives(
+ archives: list[tuple[Path, str]],
+ input_root: Path,
+ output_root: Path,
+ force: bool,
+) -> int:
+ copied = 0
+ skipped = 0
+ overwritten = 0
+ overwrite_all = force
+
+ type_counts = {"nres": 0, "rsli": 0}
+ for _, archive_type in archives:
+ type_counts[archive_type] += 1
+
+ print(
+ f"Found archives: total={len(archives)}, "
+ f"nres={type_counts['nres']}, rsli={type_counts['rsli']}"
+ )
+
+ for source, archive_type in archives:
+ rel_path = source.relative_to(input_root)
+ destination = output_root / archive_type / rel_path
+ destination.parent.mkdir(parents=True, exist_ok=True)
+
+ if destination.exists():
+ if destination.is_dir():
+ print(
+ f"[error] destination is a directory, expected file: {destination}",
+ file=sys.stderr,
+ )
+ return 2
+
+ if not overwrite_all:
+ if not sys.stdin.isatty():
+ print(
+ "[error] destination file exists but stdin is not interactive. "
+ "Use --force to overwrite without prompts.",
+ file=sys.stderr,
+ )
+ return 2
+
+ decision = confirm_overwrite(destination)
+ if decision == "quit":
+ print("Aborted by user.")
+ return 130
+ if decision == "no":
+ skipped += 1
+ continue
+ if decision == "all":
+ overwrite_all = True
+
+ overwritten += 1
+
+ try:
+ shutil.copy2(source, destination)
+ except OSError as exc:
+ print(f"[error] failed to copy {source} -> {destination}: {exc}", file=sys.stderr)
+ return 2
+ copied += 1
+
+ print(
+ f"Done: copied={copied}, overwritten={overwritten}, skipped={skipped}, "
+ f"output={output_root}"
+ )
+ return 0
+
+
+def build_parser() -> argparse.ArgumentParser:
+ parser = argparse.ArgumentParser(
+ description="Initialize test data by scanning NRes/RsLi signatures."
+ )
+ parser.add_argument(
+ "--input",
+ required=True,
+ help="Input directory to scan recursively.",
+ )
+ parser.add_argument(
+ "--output",
+ required=True,
+ help="Output root directory (archives go to nres/ and rsli/ subdirs).",
+ )
+ parser.add_argument(
+ "--force",
+ action="store_true",
+ help="Overwrite destination files without confirmation prompts.",
+ )
+ return parser
+
+
+def main() -> int:
+ args = build_parser().parse_args()
+
+ input_root = Path(args.input)
+ if not input_root.exists():
+ print(f"[error] input directory does not exist: {input_root}", file=sys.stderr)
+ return 2
+ if not input_root.is_dir():
+ print(f"[error] input path is not a directory: {input_root}", file=sys.stderr)
+ return 2
+
+ output_root = Path(args.output)
+ if output_root.exists() and not output_root.is_dir():
+ print(f"[error] output path exists and is not a directory: {output_root}", file=sys.stderr)
+ return 2
+
+ input_resolved = input_root.resolve()
+ output_resolved = output_root.resolve()
+ if input_resolved == output_resolved:
+ print("[error] input and output directories must be different.", file=sys.stderr)
+ return 2
+
+ excluded_root: Path | None = None
+ if is_relative_to(output_resolved, input_resolved):
+ excluded_root = output_resolved
+ print(f"Notice: output is inside input, skipping scan under: {excluded_root}")
+
+ archives = scan_archives(input_root, excluded_root)
+
+ output_root.mkdir(parents=True, exist_ok=True)
+ return copy_archives(archives, input_root, output_root, force=args.force)
+
+
+if __name__ == "__main__":
+ raise SystemExit(main())
diff --git a/tools/msh_doc_validator.py b/tools/msh_doc_validator.py
new file mode 100644
index 0000000..ff096a4
--- /dev/null
+++ b/tools/msh_doc_validator.py
@@ -0,0 +1,1000 @@
+#!/usr/bin/env python3
+"""
+Validate assumptions from docs/specs/msh.md on real game archives.
+
+The tool checks three groups:
+1) MSH model payloads (nested NRes in *.msh entries),
+2) Texm texture payloads,
+3) FXID effect payloads.
+"""
+
+from __future__ import annotations
+
+import argparse
+import json
+import math
+import struct
+from collections import Counter
+from pathlib import Path
+from typing import Any
+
+import archive_roundtrip_validator as arv
+
+MAGIC_NRES = b"NRes"
+MAGIC_PAGE = b"Page"
+
+TYPE_FXID = 0x44495846
+TYPE_TEXM = 0x6D786554
+
+FX_CMD_SIZE = {1: 224, 2: 148, 3: 200, 4: 204, 5: 112, 6: 4, 7: 208, 8: 248, 9: 208, 10: 208}
+TEXM_KNOWN_FORMATS = {0, 565, 556, 4444, 888, 8888}
+
+
+def _add_issue(
+ issues: list[dict[str, Any]],
+ severity: str,
+ category: str,
+ archive: Path,
+ entry_name: str | None,
+ message: str,
+) -> None:
+ issues.append(
+ {
+ "severity": severity,
+ "category": category,
+ "archive": str(archive),
+ "entry": entry_name,
+ "message": message,
+ }
+ )
+
+
+def _entry_payload(blob: bytes, entry: dict[str, Any]) -> bytes:
+ start = int(entry["data_offset"])
+ end = start + int(entry["size"])
+ return blob[start:end]
+
+
+def _entry_by_type(entries: list[dict[str, Any]]) -> dict[int, list[dict[str, Any]]]:
+ by_type: dict[int, list[dict[str, Any]]] = {}
+ for item in entries:
+ by_type.setdefault(int(item["type_id"]), []).append(item)
+ return by_type
+
+
+def _expect_single_resource(
+ by_type: dict[int, list[dict[str, Any]]],
+ type_id: int,
+ label: str,
+ issues: list[dict[str, Any]],
+ archive: Path,
+ model_name: str,
+ required: bool,
+) -> dict[str, Any] | None:
+ rows = by_type.get(type_id, [])
+ if not rows:
+ if required:
+ _add_issue(
+ issues,
+ "error",
+ "model-resource",
+ archive,
+ model_name,
+ f"missing required resource type={type_id} ({label})",
+ )
+ return None
+ if len(rows) > 1:
+ _add_issue(
+ issues,
+ "warning",
+ "model-resource",
+ archive,
+ model_name,
+ f"multiple resources type={type_id} ({label}); using first entry",
+ )
+ return rows[0]
+
+
+def _check_fixed_stride(
+ *,
+ entry: dict[str, Any],
+ stride: int,
+ label: str,
+ issues: list[dict[str, Any]],
+ archive: Path,
+ model_name: str,
+ enforce_attr3: bool = True,
+ enforce_attr2_zero: bool = True,
+) -> int:
+ size = int(entry["size"])
+ attr1 = int(entry["attr1"])
+ attr2 = int(entry["attr2"])
+ attr3 = int(entry["attr3"])
+
+ count = -1
+ if size % stride != 0:
+ _add_issue(
+ issues,
+ "error",
+ "model-stride",
+ archive,
+ model_name,
+ f"{label}: size={size} is not divisible by stride={stride}",
+ )
+ else:
+ count = size // stride
+ if attr1 != count:
+ _add_issue(
+ issues,
+ "error",
+ "model-attr",
+ archive,
+ model_name,
+ f"{label}: attr1={attr1} != size/stride={count}",
+ )
+ if enforce_attr3 and attr3 != stride:
+ _add_issue(
+ issues,
+ "error",
+ "model-attr",
+ archive,
+ model_name,
+ f"{label}: attr3={attr3} != {stride}",
+ )
+ if enforce_attr2_zero and attr2 != 0:
+ _add_issue(
+ issues,
+ "warning",
+ "model-attr",
+ archive,
+ model_name,
+ f"{label}: attr2={attr2} (expected 0 in known assets)",
+ )
+ return count
+
+
+def _validate_res10(
+ data: bytes,
+ node_count: int,
+ issues: list[dict[str, Any]],
+ archive: Path,
+ model_name: str,
+) -> None:
+ off = 0
+ for idx in range(node_count):
+ if off + 4 > len(data):
+ _add_issue(
+ issues,
+ "error",
+ "res10",
+ archive,
+ model_name,
+ f"record {idx}: missing u32 length (offset={off}, size={len(data)})",
+ )
+ return
+ ln = struct.unpack_from("<I", data, off)[0]
+ off += 4
+ need = ln + 1 if ln else 0
+ if off + need > len(data):
+ _add_issue(
+ issues,
+ "error",
+ "res10",
+ archive,
+ model_name,
+ f"record {idx}: out of bounds (len={ln}, need={need}, offset={off}, size={len(data)})",
+ )
+ return
+ if ln and data[off + ln] != 0:
+ _add_issue(
+ issues,
+ "warning",
+ "res10",
+ archive,
+ model_name,
+ f"record {idx}: missing trailing NUL at payload end",
+ )
+ off += need
+
+ if off != len(data):
+ _add_issue(
+ issues,
+ "error",
+ "res10",
+ archive,
+ model_name,
+ f"tail bytes after node records: consumed={off}, size={len(data)}",
+ )
+
+
+def _validate_model_payload(
+ model_blob: bytes,
+ archive: Path,
+ model_name: str,
+ issues: list[dict[str, Any]],
+ counters: Counter[str],
+) -> None:
+ counters["models_total"] += 1
+
+ if model_blob[:4] != MAGIC_NRES:
+ _add_issue(
+ issues,
+ "error",
+ "model-container",
+ archive,
+ model_name,
+ "payload is not NRes (missing magic)",
+ )
+ return
+
+ try:
+ parsed = arv.parse_nres(model_blob, source=f"{archive}:{model_name}")
+ except Exception as exc: # pylint: disable=broad-except
+ _add_issue(
+ issues,
+ "error",
+ "model-container",
+ archive,
+ model_name,
+ f"cannot parse nested NRes: {exc}",
+ )
+ return
+
+ for item in parsed.get("issues", []):
+ _add_issue(issues, "warning", "model-container", archive, model_name, str(item))
+
+ entries = parsed["entries"]
+ by_type = _entry_by_type(entries)
+
+ res1 = _expect_single_resource(by_type, 1, "Res1", issues, archive, model_name, True)
+ res2 = _expect_single_resource(by_type, 2, "Res2", issues, archive, model_name, True)
+ res3 = _expect_single_resource(by_type, 3, "Res3", issues, archive, model_name, True)
+ res4 = _expect_single_resource(by_type, 4, "Res4", issues, archive, model_name, False)
+ res5 = _expect_single_resource(by_type, 5, "Res5", issues, archive, model_name, False)
+ res6 = _expect_single_resource(by_type, 6, "Res6", issues, archive, model_name, True)
+ res7 = _expect_single_resource(by_type, 7, "Res7", issues, archive, model_name, False)
+ res8 = _expect_single_resource(by_type, 8, "Res8", issues, archive, model_name, False)
+ res10 = _expect_single_resource(by_type, 10, "Res10", issues, archive, model_name, False)
+ res13 = _expect_single_resource(by_type, 13, "Res13", issues, archive, model_name, True)
+ res15 = _expect_single_resource(by_type, 15, "Res15", issues, archive, model_name, False)
+ res16 = _expect_single_resource(by_type, 16, "Res16", issues, archive, model_name, False)
+ res18 = _expect_single_resource(by_type, 18, "Res18", issues, archive, model_name, False)
+ res19 = _expect_single_resource(by_type, 19, "Res19", issues, archive, model_name, False)
+
+ if not (res1 and res2 and res3 and res6 and res13):
+ return
+
+ # Res1
+ res1_stride = int(res1["attr3"])
+ if res1_stride not in (38, 24):
+ _add_issue(
+ issues,
+ "warning",
+ "res1",
+ archive,
+ model_name,
+ f"unexpected Res1 stride attr3={res1_stride} (known: 38 or 24)",
+ )
+ if res1_stride <= 0:
+ _add_issue(issues, "error", "res1", archive, model_name, f"invalid Res1 stride={res1_stride}")
+ return
+ if int(res1["size"]) % res1_stride != 0:
+ _add_issue(
+ issues,
+ "error",
+ "res1",
+ archive,
+ model_name,
+ f"Res1 size={res1['size']} not divisible by stride={res1_stride}",
+ )
+ return
+ node_count = int(res1["size"]) // res1_stride
+ if int(res1["attr1"]) != node_count:
+ _add_issue(
+ issues,
+ "error",
+ "res1",
+ archive,
+ model_name,
+ f"Res1 attr1={res1['attr1']} != node_count={node_count}",
+ )
+
+ # Res2
+ res2_size = int(res2["size"])
+ res2_attr1 = int(res2["attr1"])
+ res2_attr2 = int(res2["attr2"])
+ res2_attr3 = int(res2["attr3"])
+ if res2_size < 0x8C:
+ _add_issue(issues, "error", "res2", archive, model_name, f"Res2 too small: size={res2_size}")
+ return
+ slot_bytes = res2_size - 0x8C
+ slot_count = -1
+ if slot_bytes % 68 != 0:
+ _add_issue(
+ issues,
+ "error",
+ "res2",
+ archive,
+ model_name,
+ f"Res2 slot area not divisible by 68: slot_bytes={slot_bytes}",
+ )
+ else:
+ slot_count = slot_bytes // 68
+ if res2_attr1 != slot_count:
+ _add_issue(
+ issues,
+ "error",
+ "res2",
+ archive,
+ model_name,
+ f"Res2 attr1={res2_attr1} != slot_count={slot_count}",
+ )
+ if res2_attr2 != 0:
+ _add_issue(
+ issues,
+ "warning",
+ "res2",
+ archive,
+ model_name,
+ f"Res2 attr2={res2_attr2} (expected 0 in known assets)",
+ )
+ if res2_attr3 != 68:
+ _add_issue(
+ issues,
+ "error",
+ "res2",
+ archive,
+ model_name,
+ f"Res2 attr3={res2_attr3} != 68",
+ )
+
+ # Fixed-stride resources
+ vertex_count = _check_fixed_stride(
+ entry=res3,
+ stride=12,
+ label="Res3",
+ issues=issues,
+ archive=archive,
+ model_name=model_name,
+ )
+ _ = _check_fixed_stride(
+ entry=res4,
+ stride=4,
+ label="Res4",
+ issues=issues,
+ archive=archive,
+ model_name=model_name,
+ ) if res4 else None
+ _ = _check_fixed_stride(
+ entry=res5,
+ stride=4,
+ label="Res5",
+ issues=issues,
+ archive=archive,
+ model_name=model_name,
+ ) if res5 else None
+ index_count = _check_fixed_stride(
+ entry=res6,
+ stride=2,
+ label="Res6",
+ issues=issues,
+ archive=archive,
+ model_name=model_name,
+ )
+ tri_desc_count = _check_fixed_stride(
+ entry=res7,
+ stride=16,
+ label="Res7",
+ issues=issues,
+ archive=archive,
+ model_name=model_name,
+ ) if res7 else -1
+ anim_key_count = _check_fixed_stride(
+ entry=res8,
+ stride=24,
+ label="Res8",
+ issues=issues,
+ archive=archive,
+ model_name=model_name,
+ enforce_attr3=False, # format stores attr3=4 in data set
+ ) if res8 else -1
+ if res8 and int(res8["attr3"]) != 4:
+ _add_issue(
+ issues,
+ "error",
+ "res8",
+ archive,
+ model_name,
+ f"Res8 attr3={res8['attr3']} != 4",
+ )
+ if res13:
+ batch_count = _check_fixed_stride(
+ entry=res13,
+ stride=20,
+ label="Res13",
+ issues=issues,
+ archive=archive,
+ model_name=model_name,
+ )
+ else:
+ batch_count = -1
+ if res15:
+ _check_fixed_stride(
+ entry=res15,
+ stride=8,
+ label="Res15",
+ issues=issues,
+ archive=archive,
+ model_name=model_name,
+ )
+ if res16:
+ _check_fixed_stride(
+ entry=res16,
+ stride=8,
+ label="Res16",
+ issues=issues,
+ archive=archive,
+ model_name=model_name,
+ )
+ if res18:
+ _check_fixed_stride(
+ entry=res18,
+ stride=4,
+ label="Res18",
+ issues=issues,
+ archive=archive,
+ model_name=model_name,
+ )
+
+ if res19:
+ anim_map_count = _check_fixed_stride(
+ entry=res19,
+ stride=2,
+ label="Res19",
+ issues=issues,
+ archive=archive,
+ model_name=model_name,
+ enforce_attr3=False,
+ enforce_attr2_zero=False,
+ )
+ if int(res19["attr3"]) != 2:
+ _add_issue(
+ issues,
+ "error",
+ "res19",
+ archive,
+ model_name,
+ f"Res19 attr3={res19['attr3']} != 2",
+ )
+ else:
+ anim_map_count = -1
+
+ # Res10
+ if res10:
+ if int(res10["attr1"]) != int(res1["attr1"]):
+ _add_issue(
+ issues,
+ "error",
+ "res10",
+ archive,
+ model_name,
+ f"Res10 attr1={res10['attr1']} != Res1.attr1={res1['attr1']}",
+ )
+ if int(res10["attr3"]) != 0:
+ _add_issue(
+ issues,
+ "warning",
+ "res10",
+ archive,
+ model_name,
+ f"Res10 attr3={res10['attr3']} (known assets use 0)",
+ )
+ _validate_res10(_entry_payload(model_blob, res10), node_count, issues, archive, model_name)
+
+ # Cross-table checks.
+ if vertex_count > 0 and (res4 and int(res4["size"]) // 4 != vertex_count):
+ _add_issue(issues, "error", "model-cross", archive, model_name, "Res4 count != Res3 count")
+ if vertex_count > 0 and (res5 and int(res5["size"]) // 4 != vertex_count):
+ _add_issue(issues, "error", "model-cross", archive, model_name, "Res5 count != Res3 count")
+
+ indices: list[int] = []
+ if index_count > 0:
+ res6_data = _entry_payload(model_blob, res6)
+ indices = list(struct.unpack_from(f"<{index_count}H", res6_data, 0))
+
+ if batch_count > 0:
+ res13_data = _entry_payload(model_blob, res13)
+ for batch_idx in range(batch_count):
+ b_off = batch_idx * 20
+ (
+ _batch_flags,
+ _mat_idx,
+ _unk4,
+ _unk6,
+ idx_count,
+ idx_start,
+ _unk14,
+ base_vertex,
+ ) = struct.unpack_from("<HHHHHIHI", res13_data, b_off)
+ end = idx_start + idx_count
+ if index_count > 0 and end > index_count:
+ _add_issue(
+ issues,
+ "error",
+ "res13",
+ archive,
+ model_name,
+ f"batch {batch_idx}: index range [{idx_start}, {end}) outside Res6 count={index_count}",
+ )
+ continue
+ if idx_count % 3 != 0:
+ _add_issue(
+ issues,
+ "warning",
+ "res13",
+ archive,
+ model_name,
+ f"batch {batch_idx}: indexCount={idx_count} is not divisible by 3",
+ )
+ if vertex_count > 0 and index_count > 0 and idx_count > 0:
+ raw_slice = indices[idx_start:end]
+ max_raw = max(raw_slice)
+ if base_vertex + max_raw >= vertex_count:
+ _add_issue(
+ issues,
+ "error",
+ "res13",
+ archive,
+ model_name,
+ f"batch {batch_idx}: baseVertex+maxIndex={base_vertex + max_raw} >= vertex_count={vertex_count}",
+ )
+
+ if slot_count > 0:
+ res2_data = _entry_payload(model_blob, res2)
+ for slot_idx in range(slot_count):
+ s_off = 0x8C + slot_idx * 68
+ tri_start, tri_count, batch_start, slot_batch_count = struct.unpack_from("<4H", res2_data, s_off)
+ if tri_desc_count > 0 and tri_start + tri_count > tri_desc_count:
+ _add_issue(
+ issues,
+ "error",
+ "res2-slot",
+ archive,
+ model_name,
+ f"slot {slot_idx}: tri range [{tri_start}, {tri_start + tri_count}) outside Res7 count={tri_desc_count}",
+ )
+ if batch_count > 0 and batch_start + slot_batch_count > batch_count:
+ _add_issue(
+ issues,
+ "error",
+ "res2-slot",
+ archive,
+ model_name,
+ f"slot {slot_idx}: batch range [{batch_start}, {batch_start + slot_batch_count}) outside Res13 count={batch_count}",
+ )
+ # Slot bounds are 10 float values.
+ for f_idx in range(10):
+ value = struct.unpack_from("<f", res2_data, s_off + 8 + f_idx * 4)[0]
+ if not math.isfinite(value):
+ _add_issue(
+ issues,
+ "error",
+ "res2-slot",
+ archive,
+ model_name,
+ f"slot {slot_idx}: non-finite bound float at field {f_idx}",
+ )
+ break
+
+ if tri_desc_count > 0:
+ res7_data = _entry_payload(model_blob, res7)
+ for tri_idx in range(tri_desc_count):
+ t_off = tri_idx * 16
+ _flags, l0, l1, l2 = struct.unpack_from("<4H", res7_data, t_off)
+ for link in (l0, l1, l2):
+ if link != 0xFFFF and link >= tri_desc_count:
+ _add_issue(
+ issues,
+ "error",
+ "res7",
+ archive,
+ model_name,
+ f"tri {tri_idx}: link {link} outside tri_desc_count={tri_desc_count}",
+ )
+ _ = struct.unpack_from("<H", res7_data, t_off + 14)[0]
+
+ # Node-level constraints for slot matrix / animation mapping.
+ if res1_stride == 38:
+ res1_data = _entry_payload(model_blob, res1)
+ map_words: list[int] = []
+ if anim_map_count > 0 and res19:
+ res19_data = _entry_payload(model_blob, res19)
+ map_words = list(struct.unpack_from(f"<{anim_map_count}H", res19_data, 0))
+ frame_count = int(res19["attr2"]) if res19 else 0
+
+ for node_idx in range(node_count):
+ n_off = node_idx * 38
+ hdr2 = struct.unpack_from("<H", res1_data, n_off + 4)[0]
+ hdr3 = struct.unpack_from("<H", res1_data, n_off + 6)[0]
+ # Slot matrix: 15 uint16 at +8.
+ for w_idx in range(15):
+ slot_idx = struct.unpack_from("<H", res1_data, n_off + 8 + w_idx * 2)[0]
+ if slot_idx != 0xFFFF and slot_count > 0 and slot_idx >= slot_count:
+ _add_issue(
+ issues,
+ "error",
+ "res1-slot",
+ archive,
+ model_name,
+ f"node {node_idx}: slotIndex[{w_idx}]={slot_idx} outside slot_count={slot_count}",
+ )
+
+ if anim_key_count > 0 and hdr3 != 0xFFFF and hdr3 >= anim_key_count:
+ _add_issue(
+ issues,
+ "error",
+ "res1-anim",
+ archive,
+ model_name,
+ f"node {node_idx}: fallbackKeyIndex={hdr3} outside Res8 count={anim_key_count}",
+ )
+ if map_words and hdr2 != 0xFFFF and frame_count > 0:
+ end = hdr2 + frame_count
+ if end > len(map_words):
+ _add_issue(
+ issues,
+ "error",
+ "res19-map",
+ archive,
+ model_name,
+ f"node {node_idx}: map range [{hdr2}, {end}) outside Res19 count={len(map_words)}",
+ )
+
+ counters["models_ok"] += 1
+
+
+def _validate_texm_payload(
+ payload: bytes,
+ archive: Path,
+ entry_name: str,
+ issues: list[dict[str, Any]],
+ counters: Counter[str],
+) -> None:
+ counters["texm_total"] += 1
+
+ if len(payload) < 32:
+ _add_issue(
+ issues,
+ "error",
+ "texm",
+ archive,
+ entry_name,
+ f"payload too small: {len(payload)}",
+ )
+ return
+
+ magic, width, height, mip_count, flags4, flags5, unk6, fmt = struct.unpack_from("<8I", payload, 0)
+ if magic != TYPE_TEXM:
+ _add_issue(issues, "error", "texm", archive, entry_name, f"magic=0x{magic:08X} != Texm")
+ return
+ if width == 0 or height == 0:
+ _add_issue(issues, "error", "texm", archive, entry_name, f"invalid size {width}x{height}")
+ return
+ if mip_count == 0:
+ _add_issue(issues, "error", "texm", archive, entry_name, "mipCount=0")
+ return
+ if fmt not in TEXM_KNOWN_FORMATS:
+ _add_issue(
+ issues,
+ "error",
+ "texm",
+ archive,
+ entry_name,
+ f"unknown format code {fmt}",
+ )
+ return
+ if flags4 not in (0, 32):
+ _add_issue(
+ issues,
+ "warning",
+ "texm",
+ archive,
+ entry_name,
+ f"flags4={flags4} (known values: 0 or 32)",
+ )
+ if flags5 not in (0, 0x04000000, 0x00800000):
+ _add_issue(
+ issues,
+ "warning",
+ "texm",
+ archive,
+ entry_name,
+ f"flags5=0x{flags5:08X} (known values: 0, 0x00800000, 0x04000000)",
+ )
+
+ bpp = 1 if fmt == 0 else (2 if fmt in (565, 556, 4444) else 4)
+ pix_sum = 0
+ w = width
+ h = height
+ for _ in range(mip_count):
+ pix_sum += w * h
+ w = max(1, w >> 1)
+ h = max(1, h >> 1)
+ size_core = 32 + (1024 if fmt == 0 else 0) + bpp * pix_sum
+ if size_core > len(payload):
+ _add_issue(
+ issues,
+ "error",
+ "texm",
+ archive,
+ entry_name,
+ f"sizeCore={size_core} exceeds payload size={len(payload)}",
+ )
+ return
+
+ tail = len(payload) - size_core
+ if tail > 0:
+ off = size_core
+ if tail < 8:
+ _add_issue(
+ issues,
+ "error",
+ "texm",
+ archive,
+ entry_name,
+ f"tail too short for Page chunk: tail={tail}",
+ )
+ return
+ if payload[off : off + 4] != MAGIC_PAGE:
+ _add_issue(
+ issues,
+ "error",
+ "texm",
+ archive,
+ entry_name,
+ f"tail is present but no Page magic at offset {off}",
+ )
+ return
+ rect_count = struct.unpack_from("<I", payload, off + 4)[0]
+ need = 8 + rect_count * 8
+ if need > tail:
+ _add_issue(
+ issues,
+ "error",
+ "texm",
+ archive,
+ entry_name,
+ f"Page chunk truncated: need={need}, tail={tail}",
+ )
+ return
+ if need != tail:
+ _add_issue(
+ issues,
+ "error",
+ "texm",
+ archive,
+ entry_name,
+ f"extra bytes after Page chunk: tail={tail}, pageSize={need}",
+ )
+ return
+
+ _ = unk6 # carried as raw field in spec, semantics intentionally unknown.
+ counters["texm_ok"] += 1
+
+
+def _validate_fxid_payload(
+ payload: bytes,
+ archive: Path,
+ entry_name: str,
+ issues: list[dict[str, Any]],
+ counters: Counter[str],
+) -> None:
+ counters["fxid_total"] += 1
+
+ if len(payload) < 60:
+ _add_issue(
+ issues,
+ "error",
+ "fxid",
+ archive,
+ entry_name,
+ f"payload too small: {len(payload)}",
+ )
+ return
+
+ cmd_count = struct.unpack_from("<I", payload, 0)[0]
+ ptr = 0x3C
+ for idx in range(cmd_count):
+ if ptr + 4 > len(payload):
+ _add_issue(
+ issues,
+ "error",
+ "fxid",
+ archive,
+ entry_name,
+ f"command {idx}: missing header at offset={ptr}",
+ )
+ return
+ word = struct.unpack_from("<I", payload, ptr)[0]
+ opcode = word & 0xFF
+ if opcode not in FX_CMD_SIZE:
+ _add_issue(
+ issues,
+ "error",
+ "fxid",
+ archive,
+ entry_name,
+ f"command {idx}: unknown opcode={opcode} at offset={ptr}",
+ )
+ return
+ size = FX_CMD_SIZE[opcode]
+ if ptr + size > len(payload):
+ _add_issue(
+ issues,
+ "error",
+ "fxid",
+ archive,
+ entry_name,
+ f"command {idx}: truncated, need end={ptr + size}, payload={len(payload)}",
+ )
+ return
+ ptr += size
+
+ if ptr != len(payload):
+ _add_issue(
+ issues,
+ "error",
+ "fxid",
+ archive,
+ entry_name,
+ f"tail bytes after command stream: parsed_end={ptr}, payload={len(payload)}",
+ )
+ return
+
+ counters["fxid_ok"] += 1
+
+
+def _scan_nres_files(root: Path) -> list[Path]:
+ rows = arv.scan_archives(root)
+ out: list[Path] = []
+ for item in rows:
+ if item["type"] != "nres":
+ continue
+ out.append(root / item["relative_path"])
+ return out
+
+
+def run_validation(input_root: Path) -> dict[str, Any]:
+ archives = _scan_nres_files(input_root)
+ issues: list[dict[str, Any]] = []
+ counters: Counter[str] = Counter()
+
+ for archive_path in archives:
+ counters["archives_total"] += 1
+ data = archive_path.read_bytes()
+ try:
+ parsed = arv.parse_nres(data, source=str(archive_path))
+ except Exception as exc: # pylint: disable=broad-except
+ _add_issue(issues, "error", "archive", archive_path, None, f"cannot parse NRes: {exc}")
+ continue
+
+ for item in parsed.get("issues", []):
+ _add_issue(issues, "warning", "archive", archive_path, None, str(item))
+
+ for entry in parsed["entries"]:
+ name = str(entry["name"])
+ payload = _entry_payload(data, entry)
+ type_id = int(entry["type_id"])
+
+ if name.lower().endswith(".msh"):
+ _validate_model_payload(payload, archive_path, name, issues, counters)
+
+ if type_id == TYPE_TEXM:
+ _validate_texm_payload(payload, archive_path, name, issues, counters)
+
+ if type_id == TYPE_FXID:
+ _validate_fxid_payload(payload, archive_path, name, issues, counters)
+
+ errors = sum(1 for row in issues if row["severity"] == "error")
+ warnings = sum(1 for row in issues if row["severity"] == "warning")
+
+ return {
+ "input_root": str(input_root),
+ "summary": {
+ "archives_total": counters["archives_total"],
+ "models_total": counters["models_total"],
+ "models_ok": counters["models_ok"],
+ "texm_total": counters["texm_total"],
+ "texm_ok": counters["texm_ok"],
+ "fxid_total": counters["fxid_total"],
+ "fxid_ok": counters["fxid_ok"],
+ "errors": errors,
+ "warnings": warnings,
+ "issues_total": len(issues),
+ },
+ "issues": issues,
+ }
+
+
+def cmd_scan(args: argparse.Namespace) -> int:
+ root = Path(args.input).resolve()
+ report = run_validation(root)
+ summary = report["summary"]
+ print(f"Input root : {root}")
+ print(f"NRes archives : {summary['archives_total']}")
+ print(f"MSH models : {summary['models_total']}")
+ print(f"Texm textures : {summary['texm_total']}")
+ print(f"FXID effects : {summary['fxid_total']}")
+ return 0
+
+
+def cmd_validate(args: argparse.Namespace) -> int:
+ root = Path(args.input).resolve()
+ report = run_validation(root)
+ summary = report["summary"]
+
+ if args.report:
+ arv.dump_json(Path(args.report).resolve(), report)
+
+ print(f"Input root : {root}")
+ print(f"NRes archives : {summary['archives_total']}")
+ print(f"MSH models : {summary['models_ok']}/{summary['models_total']} valid")
+ print(f"Texm textures : {summary['texm_ok']}/{summary['texm_total']} valid")
+ print(f"FXID effects : {summary['fxid_ok']}/{summary['fxid_total']} valid")
+ print(f"Issues : {summary['issues_total']} (errors={summary['errors']}, warnings={summary['warnings']})")
+
+ if report["issues"]:
+ limit = max(1, int(args.print_limit))
+ print("\nSample issues:")
+ for item in report["issues"][:limit]:
+ where = item["archive"]
+ if item["entry"]:
+ where = f"{where}::{item['entry']}"
+ print(f"- [{item['severity']}] [{item['category']}] {where}: {item['message']}")
+ if len(report["issues"]) > limit:
+ print(f"... and {len(report['issues']) - limit} more issue(s)")
+
+ if summary["errors"] > 0:
+ return 1
+ if args.fail_on_warnings and summary["warnings"] > 0:
+ return 1
+ return 0
+
+
+def build_parser() -> argparse.ArgumentParser:
+ parser = argparse.ArgumentParser(
+ description="Validate docs/specs/msh.md assumptions on real archives."
+ )
+ sub = parser.add_subparsers(dest="command", required=True)
+
+ scan = sub.add_parser("scan", help="Quick scan and counts (models/textures/effects).")
+ scan.add_argument("--input", required=True, help="Root directory with game/test archives.")
+ scan.set_defaults(func=cmd_scan)
+
+ validate = sub.add_parser("validate", help="Run full spec validation.")
+ validate.add_argument("--input", required=True, help="Root directory with game/test archives.")
+ validate.add_argument("--report", help="Optional JSON report output path.")
+ validate.add_argument(
+ "--print-limit",
+ type=int,
+ default=50,
+ help="How many issues to print to stdout (default: 50).",
+ )
+ validate.add_argument(
+ "--fail-on-warnings",
+ action="store_true",
+ help="Return non-zero if warnings are present.",
+ )
+ validate.set_defaults(func=cmd_validate)
+
+ return parser
+
+
+def main() -> int:
+ parser = build_parser()
+ args = parser.parse_args()
+ return int(args.func(args))
+
+
+if __name__ == "__main__":
+ raise SystemExit(main())
diff --git a/tools/msh_export_obj.py b/tools/msh_export_obj.py
new file mode 100644
index 0000000..75a9602
--- /dev/null
+++ b/tools/msh_export_obj.py
@@ -0,0 +1,357 @@
+#!/usr/bin/env python3
+"""
+Export NGI MSH geometry to Wavefront OBJ.
+
+The exporter is intended for inspection/debugging and uses the same
+batch/slot selection logic as msh_preview_renderer.py.
+"""
+
+from __future__ import annotations
+
+import argparse
+import math
+import struct
+from pathlib import Path
+from typing import Any
+
+import archive_roundtrip_validator as arv
+
+MAGIC_NRES = b"NRes"
+
+
+def _entry_payload(blob: bytes, entry: dict[str, Any]) -> bytes:
+ start = int(entry["data_offset"])
+ end = start + int(entry["size"])
+ return blob[start:end]
+
+
+def _parse_nres(blob: bytes, source: str) -> dict[str, Any]:
+ if blob[:4] != MAGIC_NRES:
+ raise RuntimeError(f"{source}: not an NRes payload")
+ return arv.parse_nres(blob, source=source)
+
+
+def _by_type(entries: list[dict[str, Any]]) -> dict[int, list[dict[str, Any]]]:
+ out: dict[int, list[dict[str, Any]]] = {}
+ for row in entries:
+ out.setdefault(int(row["type_id"]), []).append(row)
+ return out
+
+
+def _get_single(by_type: dict[int, list[dict[str, Any]]], type_id: int, label: str) -> dict[str, Any]:
+ rows = by_type.get(type_id, [])
+ if not rows:
+ raise RuntimeError(f"missing resource type {type_id} ({label})")
+ return rows[0]
+
+
+def _pick_model_payload(archive_path: Path, model_name: str | None) -> tuple[bytes, str]:
+ root_blob = archive_path.read_bytes()
+ parsed = _parse_nres(root_blob, str(archive_path))
+
+ msh_entries = [row for row in parsed["entries"] if str(row["name"]).lower().endswith(".msh")]
+ if msh_entries:
+ chosen: dict[str, Any] | None = None
+ if model_name:
+ model_l = model_name.lower()
+ for row in msh_entries:
+ name_l = str(row["name"]).lower()
+ if name_l == model_l:
+ chosen = row
+ break
+ if chosen is None:
+ for row in msh_entries:
+ if str(row["name"]).lower().startswith(model_l):
+ chosen = row
+ break
+ else:
+ chosen = msh_entries[0]
+
+ if chosen is None:
+ names = ", ".join(str(row["name"]) for row in msh_entries[:12])
+ raise RuntimeError(
+ f"model '{model_name}' not found in {archive_path}. Available: {names}"
+ )
+ return _entry_payload(root_blob, chosen), str(chosen["name"])
+
+ by_type = _by_type(parsed["entries"])
+ if all(k in by_type for k in (1, 2, 3, 6, 13)):
+ return root_blob, archive_path.name
+
+ raise RuntimeError(
+ f"{archive_path} does not contain .msh entries and does not look like a direct model payload"
+ )
+
+
+def _extract_geometry(
+ model_blob: bytes,
+ *,
+ lod: int,
+ group: int,
+ max_faces: int,
+ all_batches: bool,
+) -> tuple[list[tuple[float, float, float]], list[tuple[int, int, int]], dict[str, int]]:
+ parsed = _parse_nres(model_blob, "<model>")
+ by_type = _by_type(parsed["entries"])
+
+ res1 = _get_single(by_type, 1, "Res1")
+ res2 = _get_single(by_type, 2, "Res2")
+ res3 = _get_single(by_type, 3, "Res3")
+ res6 = _get_single(by_type, 6, "Res6")
+ res13 = _get_single(by_type, 13, "Res13")
+
+ pos_blob = _entry_payload(model_blob, res3)
+ if len(pos_blob) % 12 != 0:
+ raise RuntimeError(f"Res3 size is not divisible by 12: {len(pos_blob)}")
+ vertex_count = len(pos_blob) // 12
+ positions = [struct.unpack_from("<3f", pos_blob, i * 12) for i in range(vertex_count)]
+
+ idx_blob = _entry_payload(model_blob, res6)
+ if len(idx_blob) % 2 != 0:
+ raise RuntimeError(f"Res6 size is not divisible by 2: {len(idx_blob)}")
+ index_count = len(idx_blob) // 2
+ indices = list(struct.unpack_from(f"<{index_count}H", idx_blob, 0))
+
+ batch_blob = _entry_payload(model_blob, res13)
+ if len(batch_blob) % 20 != 0:
+ raise RuntimeError(f"Res13 size is not divisible by 20: {len(batch_blob)}")
+ batch_count = len(batch_blob) // 20
+ batches: list[tuple[int, int, int, int]] = []
+ for i in range(batch_count):
+ off = i * 20
+ idx_count = struct.unpack_from("<H", batch_blob, off + 8)[0]
+ idx_start = struct.unpack_from("<I", batch_blob, off + 10)[0]
+ base_vertex = struct.unpack_from("<I", batch_blob, off + 16)[0]
+ batches.append((idx_count, idx_start, base_vertex, i))
+
+ res2_blob = _entry_payload(model_blob, res2)
+ if len(res2_blob) < 0x8C:
+ raise RuntimeError("Res2 is too small (< 0x8C)")
+ slot_blob = res2_blob[0x8C:]
+ if len(slot_blob) % 68 != 0:
+ raise RuntimeError(f"Res2 slot area is not divisible by 68: {len(slot_blob)}")
+ slot_count = len(slot_blob) // 68
+ slots: list[tuple[int, int, int, int]] = []
+ for i in range(slot_count):
+ off = i * 68
+ tri_start, tri_count, batch_start, slot_batch_count = struct.unpack_from("<4H", slot_blob, off)
+ slots.append((tri_start, tri_count, batch_start, slot_batch_count))
+
+ res1_blob = _entry_payload(model_blob, res1)
+ node_stride = int(res1["attr3"])
+ node_count = int(res1["attr1"])
+ node_slot_indices: list[int] = []
+ if not all_batches and node_stride >= 38 and len(res1_blob) >= node_count * node_stride:
+ if lod < 0 or lod > 2:
+ raise RuntimeError(f"lod must be 0..2 (got {lod})")
+ if group < 0 or group > 4:
+ raise RuntimeError(f"group must be 0..4 (got {group})")
+ matrix_index = lod * 5 + group
+ for n in range(node_count):
+ off = n * node_stride + 8 + matrix_index * 2
+ slot_idx = struct.unpack_from("<H", res1_blob, off)[0]
+ if slot_idx == 0xFFFF:
+ continue
+ if slot_idx >= slot_count:
+ continue
+ node_slot_indices.append(slot_idx)
+
+ faces: list[tuple[int, int, int]] = []
+ used_batches = 0
+ used_slots = 0
+
+ def append_batch(batch_idx: int) -> None:
+ nonlocal used_batches
+ if batch_idx < 0 or batch_idx >= len(batches):
+ return
+ idx_count, idx_start, base_vertex, _ = batches[batch_idx]
+ if idx_count < 3:
+ return
+ end = idx_start + idx_count
+ if end > len(indices):
+ return
+ used_batches += 1
+ tri_count = idx_count // 3
+ for t in range(tri_count):
+ i0 = indices[idx_start + t * 3 + 0] + base_vertex
+ i1 = indices[idx_start + t * 3 + 1] + base_vertex
+ i2 = indices[idx_start + t * 3 + 2] + base_vertex
+ if i0 >= vertex_count or i1 >= vertex_count or i2 >= vertex_count:
+ continue
+ faces.append((i0, i1, i2))
+ if len(faces) >= max_faces:
+ return
+
+ if node_slot_indices:
+ for slot_idx in node_slot_indices:
+ if len(faces) >= max_faces:
+ break
+ _tri_start, _tri_count, batch_start, slot_batch_count = slots[slot_idx]
+ used_slots += 1
+ for bi in range(batch_start, batch_start + slot_batch_count):
+ append_batch(bi)
+ if len(faces) >= max_faces:
+ break
+ else:
+ for bi in range(batch_count):
+ append_batch(bi)
+ if len(faces) >= max_faces:
+ break
+
+ if not faces:
+ raise RuntimeError("no faces selected for export")
+
+ meta = {
+ "vertex_count": vertex_count,
+ "index_count": index_count,
+ "batch_count": batch_count,
+ "slot_count": slot_count,
+ "node_count": node_count,
+ "used_slots": used_slots,
+ "used_batches": used_batches,
+ "face_count": len(faces),
+ }
+ return positions, faces, meta
+
+
+def _compute_vertex_normals(
+ positions: list[tuple[float, float, float]],
+ faces: list[tuple[int, int, int]],
+) -> list[tuple[float, float, float]]:
+ acc = [[0.0, 0.0, 0.0] for _ in positions]
+ for i0, i1, i2 in faces:
+ p0 = positions[i0]
+ p1 = positions[i1]
+ p2 = positions[i2]
+ ux = p1[0] - p0[0]
+ uy = p1[1] - p0[1]
+ uz = p1[2] - p0[2]
+ vx = p2[0] - p0[0]
+ vy = p2[1] - p0[1]
+ vz = p2[2] - p0[2]
+ nx = uy * vz - uz * vy
+ ny = uz * vx - ux * vz
+ nz = ux * vy - uy * vx
+ acc[i0][0] += nx
+ acc[i0][1] += ny
+ acc[i0][2] += nz
+ acc[i1][0] += nx
+ acc[i1][1] += ny
+ acc[i1][2] += nz
+ acc[i2][0] += nx
+ acc[i2][1] += ny
+ acc[i2][2] += nz
+
+ normals: list[tuple[float, float, float]] = []
+ for nx, ny, nz in acc:
+ ln = math.sqrt(nx * nx + ny * ny + nz * nz)
+ if ln <= 1e-12:
+ normals.append((0.0, 1.0, 0.0))
+ else:
+ normals.append((nx / ln, ny / ln, nz / ln))
+ return normals
+
+
+def _write_obj(
+ output_path: Path,
+ object_name: str,
+ positions: list[tuple[float, float, float]],
+ faces: list[tuple[int, int, int]],
+) -> None:
+ output_path.parent.mkdir(parents=True, exist_ok=True)
+ normals = _compute_vertex_normals(positions, faces)
+
+ with output_path.open("w", encoding="utf-8", newline="\n") as out:
+ out.write("# Exported by msh_export_obj.py\n")
+ out.write(f"o {object_name}\n")
+ for x, y, z in positions:
+ out.write(f"v {x:.9g} {y:.9g} {z:.9g}\n")
+ for nx, ny, nz in normals:
+ out.write(f"vn {nx:.9g} {ny:.9g} {nz:.9g}\n")
+ for i0, i1, i2 in faces:
+ a = i0 + 1
+ b = i1 + 1
+ c = i2 + 1
+ out.write(f"f {a}//{a} {b}//{b} {c}//{c}\n")
+
+
+def cmd_list_models(args: argparse.Namespace) -> int:
+ archive_path = Path(args.archive).resolve()
+ blob = archive_path.read_bytes()
+ parsed = _parse_nres(blob, str(archive_path))
+ rows = [row for row in parsed["entries"] if str(row["name"]).lower().endswith(".msh")]
+ print(f"Archive: {archive_path}")
+ print(f"MSH entries: {len(rows)}")
+ for row in rows:
+ print(f"- {row['name']}")
+ return 0
+
+
+def cmd_export(args: argparse.Namespace) -> int:
+ archive_path = Path(args.archive).resolve()
+ output_path = Path(args.output).resolve()
+
+ model_blob, model_label = _pick_model_payload(archive_path, args.model)
+ positions, faces, meta = _extract_geometry(
+ model_blob,
+ lod=int(args.lod),
+ group=int(args.group),
+ max_faces=int(args.max_faces),
+ all_batches=bool(args.all_batches),
+ )
+ obj_name = Path(model_label).stem or "msh_model"
+ _write_obj(output_path, obj_name, positions, faces)
+
+ print(f"Exported model : {model_label}")
+ print(f"Output OBJ : {output_path}")
+ print(f"Object name : {obj_name}")
+ print(
+ "Geometry : "
+ f"vertices={meta['vertex_count']}, faces={meta['face_count']}, "
+ f"batches={meta['used_batches']}/{meta['batch_count']}, slots={meta['used_slots']}/{meta['slot_count']}"
+ )
+ print(
+ "Mode : "
+ f"lod={args.lod}, group={args.group}, all_batches={bool(args.all_batches)}"
+ )
+ return 0
+
+
+def build_parser() -> argparse.ArgumentParser:
+ parser = argparse.ArgumentParser(
+ description="Export NGI MSH geometry to Wavefront OBJ."
+ )
+ sub = parser.add_subparsers(dest="command", required=True)
+
+ list_models = sub.add_parser("list-models", help="List .msh entries in an NRes archive.")
+ list_models.add_argument("--archive", required=True, help="Path to archive (e.g. animals.rlb).")
+ list_models.set_defaults(func=cmd_list_models)
+
+ export = sub.add_parser("export", help="Export one model to OBJ.")
+ export.add_argument("--archive", required=True, help="Path to NRes archive or direct model payload.")
+ export.add_argument(
+ "--model",
+ help="Model entry name (*.msh) inside archive. If omitted, first .msh is used.",
+ )
+ export.add_argument("--output", required=True, help="Output .obj path.")
+ export.add_argument("--lod", type=int, default=0, help="LOD index 0..2 (default: 0).")
+ export.add_argument("--group", type=int, default=0, help="Group index 0..4 (default: 0).")
+ export.add_argument("--max-faces", type=int, default=120000, help="Face limit (default: 120000).")
+ export.add_argument(
+ "--all-batches",
+ action="store_true",
+ help="Ignore slot matrix selection and export all batches.",
+ )
+ export.set_defaults(func=cmd_export)
+
+ return parser
+
+
+def main() -> int:
+ parser = build_parser()
+ args = parser.parse_args()
+ return int(args.func(args))
+
+
+if __name__ == "__main__":
+ raise SystemExit(main())
diff --git a/tools/msh_preview_renderer.py b/tools/msh_preview_renderer.py
new file mode 100644
index 0000000..53b4e63
--- /dev/null
+++ b/tools/msh_preview_renderer.py
@@ -0,0 +1,481 @@
+#!/usr/bin/env python3
+"""
+Primitive software renderer for NGI MSH models.
+
+Output format: binary PPM (P6), no external dependencies.
+"""
+
+from __future__ import annotations
+
+import argparse
+import math
+import struct
+from pathlib import Path
+from typing import Any
+
+import archive_roundtrip_validator as arv
+
+MAGIC_NRES = b"NRes"
+
+
+def _entry_payload(blob: bytes, entry: dict[str, Any]) -> bytes:
+ start = int(entry["data_offset"])
+ end = start + int(entry["size"])
+ return blob[start:end]
+
+
+def _parse_nres(blob: bytes, source: str) -> dict[str, Any]:
+ if blob[:4] != MAGIC_NRES:
+ raise RuntimeError(f"{source}: not an NRes payload")
+ return arv.parse_nres(blob, source=source)
+
+
+def _by_type(entries: list[dict[str, Any]]) -> dict[int, list[dict[str, Any]]]:
+ out: dict[int, list[dict[str, Any]]] = {}
+ for row in entries:
+ out.setdefault(int(row["type_id"]), []).append(row)
+ return out
+
+
+def _pick_model_payload(archive_path: Path, model_name: str | None) -> tuple[bytes, str]:
+ root_blob = archive_path.read_bytes()
+ parsed = _parse_nres(root_blob, str(archive_path))
+
+ msh_entries = [row for row in parsed["entries"] if str(row["name"]).lower().endswith(".msh")]
+ if msh_entries:
+ chosen: dict[str, Any] | None = None
+ if model_name:
+ model_l = model_name.lower()
+ for row in msh_entries:
+ name_l = str(row["name"]).lower()
+ if name_l == model_l:
+ chosen = row
+ break
+ if chosen is None:
+ for row in msh_entries:
+ if str(row["name"]).lower().startswith(model_l):
+ chosen = row
+ break
+ else:
+ chosen = msh_entries[0]
+
+ if chosen is None:
+ names = ", ".join(str(row["name"]) for row in msh_entries[:12])
+ raise RuntimeError(
+ f"model '{model_name}' not found in {archive_path}. Available: {names}"
+ )
+ return _entry_payload(root_blob, chosen), str(chosen["name"])
+
+ # Fallback: treat file itself as a model NRes payload.
+ by_type = _by_type(parsed["entries"])
+ if all(k in by_type for k in (1, 2, 3, 6, 13)):
+ return root_blob, archive_path.name
+
+ raise RuntimeError(
+ f"{archive_path} does not contain .msh entries and does not look like a direct model payload"
+ )
+
+
+def _get_single(by_type: dict[int, list[dict[str, Any]]], type_id: int, label: str) -> dict[str, Any]:
+ rows = by_type.get(type_id, [])
+ if not rows:
+ raise RuntimeError(f"missing resource type {type_id} ({label})")
+ return rows[0]
+
+
+def _extract_geometry(
+ model_blob: bytes,
+ *,
+ lod: int,
+ group: int,
+ max_faces: int,
+) -> tuple[list[tuple[float, float, float]], list[tuple[int, int, int]], dict[str, int]]:
+ parsed = _parse_nres(model_blob, "<model>")
+ by_type = _by_type(parsed["entries"])
+
+ res1 = _get_single(by_type, 1, "Res1")
+ res2 = _get_single(by_type, 2, "Res2")
+ res3 = _get_single(by_type, 3, "Res3")
+ res6 = _get_single(by_type, 6, "Res6")
+ res13 = _get_single(by_type, 13, "Res13")
+
+ # Positions
+ pos_blob = _entry_payload(model_blob, res3)
+ if len(pos_blob) % 12 != 0:
+ raise RuntimeError(f"Res3 size is not divisible by 12: {len(pos_blob)}")
+ vertex_count = len(pos_blob) // 12
+ positions = [struct.unpack_from("<3f", pos_blob, i * 12) for i in range(vertex_count)]
+
+ # Indices
+ idx_blob = _entry_payload(model_blob, res6)
+ if len(idx_blob) % 2 != 0:
+ raise RuntimeError(f"Res6 size is not divisible by 2: {len(idx_blob)}")
+ index_count = len(idx_blob) // 2
+ indices = list(struct.unpack_from(f"<{index_count}H", idx_blob, 0))
+
+ # Batches
+ batch_blob = _entry_payload(model_blob, res13)
+ if len(batch_blob) % 20 != 0:
+ raise RuntimeError(f"Res13 size is not divisible by 20: {len(batch_blob)}")
+ batch_count = len(batch_blob) // 20
+ batches: list[tuple[int, int, int, int]] = []
+ for i in range(batch_count):
+ off = i * 20
+ # Keep only fields used by renderer:
+ # indexCount, indexStart, baseVertex
+ idx_count = struct.unpack_from("<H", batch_blob, off + 8)[0]
+ idx_start = struct.unpack_from("<I", batch_blob, off + 10)[0]
+ base_vertex = struct.unpack_from("<I", batch_blob, off + 16)[0]
+ batches.append((idx_count, idx_start, base_vertex, i))
+
+ # Slots
+ res2_blob = _entry_payload(model_blob, res2)
+ if len(res2_blob) < 0x8C:
+ raise RuntimeError("Res2 is too small (< 0x8C)")
+ slot_blob = res2_blob[0x8C:]
+ if len(slot_blob) % 68 != 0:
+ raise RuntimeError(f"Res2 slot area is not divisible by 68: {len(slot_blob)}")
+ slot_count = len(slot_blob) // 68
+ slots: list[tuple[int, int, int, int]] = []
+ for i in range(slot_count):
+ off = i * 68
+ tri_start, tri_count, batch_start, slot_batch_count = struct.unpack_from("<4H", slot_blob, off)
+ slots.append((tri_start, tri_count, batch_start, slot_batch_count))
+
+ # Nodes / slot matrix
+ res1_blob = _entry_payload(model_blob, res1)
+ node_stride = int(res1["attr3"])
+ node_count = int(res1["attr1"])
+ node_slot_indices: list[int] = []
+ if node_stride >= 38 and len(res1_blob) >= node_count * node_stride:
+ if lod < 0 or lod > 2:
+ raise RuntimeError(f"lod must be 0..2 (got {lod})")
+ if group < 0 or group > 4:
+ raise RuntimeError(f"group must be 0..4 (got {group})")
+ matrix_index = lod * 5 + group
+ for n in range(node_count):
+ off = n * node_stride + 8 + matrix_index * 2
+ slot_idx = struct.unpack_from("<H", res1_blob, off)[0]
+ if slot_idx == 0xFFFF:
+ continue
+ if slot_idx >= slot_count:
+ continue
+ node_slot_indices.append(slot_idx)
+
+ # Build triangle list.
+ faces: list[tuple[int, int, int]] = []
+ used_batches = 0
+ used_slots = 0
+
+ def append_batch(batch_idx: int) -> None:
+ nonlocal used_batches
+ if batch_idx < 0 or batch_idx >= len(batches):
+ return
+ idx_count, idx_start, base_vertex, _ = batches[batch_idx]
+ if idx_count < 3:
+ return
+ end = idx_start + idx_count
+ if end > len(indices):
+ return
+ used_batches += 1
+ tri_count = idx_count // 3
+ for t in range(tri_count):
+ i0 = indices[idx_start + t * 3 + 0] + base_vertex
+ i1 = indices[idx_start + t * 3 + 1] + base_vertex
+ i2 = indices[idx_start + t * 3 + 2] + base_vertex
+ if i0 >= vertex_count or i1 >= vertex_count or i2 >= vertex_count:
+ continue
+ faces.append((i0, i1, i2))
+ if len(faces) >= max_faces:
+ return
+
+ if node_slot_indices:
+ for slot_idx in node_slot_indices:
+ if len(faces) >= max_faces:
+ break
+ _tri_start, _tri_count, batch_start, slot_batch_count = slots[slot_idx]
+ used_slots += 1
+ for bi in range(batch_start, batch_start + slot_batch_count):
+ append_batch(bi)
+ if len(faces) >= max_faces:
+ break
+ else:
+ # Fallback if slot matrix is unavailable: draw all batches.
+ for bi in range(batch_count):
+ append_batch(bi)
+ if len(faces) >= max_faces:
+ break
+
+ meta = {
+ "vertex_count": vertex_count,
+ "index_count": index_count,
+ "batch_count": batch_count,
+ "slot_count": slot_count,
+ "node_count": node_count,
+ "used_slots": used_slots,
+ "used_batches": used_batches,
+ "face_count": len(faces),
+ }
+ if not faces:
+ raise RuntimeError("no faces selected for rendering")
+ return positions, faces, meta
+
+
+def _write_ppm(path: Path, width: int, height: int, rgb: bytearray) -> None:
+ path.parent.mkdir(parents=True, exist_ok=True)
+ with path.open("wb") as handle:
+ handle.write(f"P6\n{width} {height}\n255\n".encode("ascii"))
+ handle.write(rgb)
+
+
+def _render_software(
+ positions: list[tuple[float, float, float]],
+ faces: list[tuple[int, int, int]],
+ *,
+ width: int,
+ height: int,
+ yaw_deg: float,
+ pitch_deg: float,
+ wireframe: bool,
+) -> bytearray:
+ xs = [p[0] for p in positions]
+ ys = [p[1] for p in positions]
+ zs = [p[2] for p in positions]
+ cx = (min(xs) + max(xs)) * 0.5
+ cy = (min(ys) + max(ys)) * 0.5
+ cz = (min(zs) + max(zs)) * 0.5
+ span = max(max(xs) - min(xs), max(ys) - min(ys), max(zs) - min(zs))
+ radius = max(span * 0.5, 1e-3)
+
+ yaw = math.radians(yaw_deg)
+ pitch = math.radians(pitch_deg)
+ cyaw = math.cos(yaw)
+ syaw = math.sin(yaw)
+ cpitch = math.cos(pitch)
+ spitch = math.sin(pitch)
+
+ camera_dist = radius * 3.2
+ scale = min(width, height) * 0.95
+
+ # Transform all vertices once.
+ vx: list[float] = []
+ vy: list[float] = []
+ vz: list[float] = []
+ sx: list[float] = []
+ sy: list[float] = []
+ for x, y, z in positions:
+ x0 = x - cx
+ y0 = y - cy
+ z0 = z - cz
+ x1 = cyaw * x0 + syaw * z0
+ z1 = -syaw * x0 + cyaw * z0
+ y2 = cpitch * y0 - spitch * z1
+ z2 = spitch * y0 + cpitch * z1 + camera_dist
+ if z2 < 1e-3:
+ z2 = 1e-3
+ vx.append(x1)
+ vy.append(y2)
+ vz.append(z2)
+ sx.append(width * 0.5 + (x1 / z2) * scale)
+ sy.append(height * 0.5 - (y2 / z2) * scale)
+
+ rgb = bytearray([16, 18, 24] * (width * height))
+ zbuf = [float("inf")] * (width * height)
+ light_dir = (0.35, 0.45, 1.0)
+ l_len = math.sqrt(light_dir[0] ** 2 + light_dir[1] ** 2 + light_dir[2] ** 2)
+ light = (light_dir[0] / l_len, light_dir[1] / l_len, light_dir[2] / l_len)
+
+ def edge(ax: float, ay: float, bx: float, by: float, px: float, py: float) -> float:
+ return (px - ax) * (by - ay) - (py - ay) * (bx - ax)
+
+ for i0, i1, i2 in faces:
+ x0 = sx[i0]
+ y0 = sy[i0]
+ x1 = sx[i1]
+ y1 = sy[i1]
+ x2 = sx[i2]
+ y2 = sy[i2]
+ area = edge(x0, y0, x1, y1, x2, y2)
+ if area == 0.0:
+ continue
+
+ # Shading from camera-space normal.
+ ux = vx[i1] - vx[i0]
+ uy = vy[i1] - vy[i0]
+ uz = vz[i1] - vz[i0]
+ wx = vx[i2] - vx[i0]
+ wy = vy[i2] - vy[i0]
+ wz = vz[i2] - vz[i0]
+ nx = uy * wz - uz * wy
+ ny = uz * wx - ux * wz
+ nz = ux * wy - uy * wx
+ n_len = math.sqrt(nx * nx + ny * ny + nz * nz)
+ if n_len > 0.0:
+ nx /= n_len
+ ny /= n_len
+ nz /= n_len
+ intensity = nx * light[0] + ny * light[1] + nz * light[2]
+ if intensity < 0.0:
+ intensity = 0.0
+ shade = int(45 + 200 * intensity)
+ color = (shade, shade, min(255, shade + 18))
+
+ minx = int(max(0, math.floor(min(x0, x1, x2))))
+ maxx = int(min(width - 1, math.ceil(max(x0, x1, x2))))
+ miny = int(max(0, math.floor(min(y0, y1, y2))))
+ maxy = int(min(height - 1, math.ceil(max(y0, y1, y2))))
+ if minx > maxx or miny > maxy:
+ continue
+
+ z0 = vz[i0]
+ z1 = vz[i1]
+ z2 = vz[i2]
+
+ for py in range(miny, maxy + 1):
+ fy = py + 0.5
+ row = py * width
+ for px in range(minx, maxx + 1):
+ fx = px + 0.5
+ w0 = edge(x1, y1, x2, y2, fx, fy)
+ w1 = edge(x2, y2, x0, y0, fx, fy)
+ w2 = edge(x0, y0, x1, y1, fx, fy)
+ if area > 0:
+ if w0 < 0 or w1 < 0 or w2 < 0:
+ continue
+ else:
+ if w0 > 0 or w1 > 0 or w2 > 0:
+ continue
+ inv_area = 1.0 / area
+ bz0 = w0 * inv_area
+ bz1 = w1 * inv_area
+ bz2 = w2 * inv_area
+ depth = bz0 * z0 + bz1 * z1 + bz2 * z2
+ idx = row + px
+ if depth >= zbuf[idx]:
+ continue
+ zbuf[idx] = depth
+ p = idx * 3
+ rgb[p + 0] = color[0]
+ rgb[p + 1] = color[1]
+ rgb[p + 2] = color[2]
+
+ if wireframe:
+ def draw_line(xa: float, ya: float, xb: float, yb: float) -> None:
+ x0i = int(round(xa))
+ y0i = int(round(ya))
+ x1i = int(round(xb))
+ y1i = int(round(yb))
+ dx = abs(x1i - x0i)
+ sx_step = 1 if x0i < x1i else -1
+ dy = -abs(y1i - y0i)
+ sy_step = 1 if y0i < y1i else -1
+ err = dx + dy
+ x = x0i
+ y = y0i
+ while True:
+ if 0 <= x < width and 0 <= y < height:
+ p = (y * width + x) * 3
+ rgb[p + 0] = 240
+ rgb[p + 1] = 245
+ rgb[p + 2] = 255
+ if x == x1i and y == y1i:
+ break
+ e2 = 2 * err
+ if e2 >= dy:
+ err += dy
+ x += sx_step
+ if e2 <= dx:
+ err += dx
+ y += sy_step
+
+ for i0, i1, i2 in faces:
+ draw_line(sx[i0], sy[i0], sx[i1], sy[i1])
+ draw_line(sx[i1], sy[i1], sx[i2], sy[i2])
+ draw_line(sx[i2], sy[i2], sx[i0], sy[i0])
+
+ return rgb
+
+
+def cmd_list_models(args: argparse.Namespace) -> int:
+ archive_path = Path(args.archive).resolve()
+ blob = archive_path.read_bytes()
+ parsed = _parse_nres(blob, str(archive_path))
+ rows = [row for row in parsed["entries"] if str(row["name"]).lower().endswith(".msh")]
+ print(f"Archive: {archive_path}")
+ print(f"MSH entries: {len(rows)}")
+ for row in rows:
+ print(f"- {row['name']}")
+ return 0
+
+
+def cmd_render(args: argparse.Namespace) -> int:
+ archive_path = Path(args.archive).resolve()
+ output_path = Path(args.output).resolve()
+
+ model_blob, model_label = _pick_model_payload(archive_path, args.model)
+ positions, faces, meta = _extract_geometry(
+ model_blob,
+ lod=int(args.lod),
+ group=int(args.group),
+ max_faces=int(args.max_faces),
+ )
+ rgb = _render_software(
+ positions,
+ faces,
+ width=int(args.width),
+ height=int(args.height),
+ yaw_deg=float(args.yaw),
+ pitch_deg=float(args.pitch),
+ wireframe=bool(args.wireframe),
+ )
+ _write_ppm(output_path, int(args.width), int(args.height), rgb)
+
+ print(f"Rendered model: {model_label}")
+ print(f"Output : {output_path}")
+ print(
+ "Geometry : "
+ f"vertices={meta['vertex_count']}, faces={meta['face_count']}, "
+ f"batches={meta['used_batches']}/{meta['batch_count']}, slots={meta['used_slots']}/{meta['slot_count']}"
+ )
+ print(f"Mode : lod={args.lod}, group={args.group}, wireframe={bool(args.wireframe)}")
+ return 0
+
+
+def build_parser() -> argparse.ArgumentParser:
+ parser = argparse.ArgumentParser(
+ description="Primitive NGI MSH renderer (software, dependency-free)."
+ )
+ sub = parser.add_subparsers(dest="command", required=True)
+
+ list_models = sub.add_parser("list-models", help="List .msh entries in an NRes archive.")
+ list_models.add_argument("--archive", required=True, help="Path to archive (e.g. animals.rlb).")
+ list_models.set_defaults(func=cmd_list_models)
+
+ render = sub.add_parser("render", help="Render one model to PPM image.")
+ render.add_argument("--archive", required=True, help="Path to NRes archive or direct model payload.")
+ render.add_argument(
+ "--model",
+ help="Model entry name (*.msh) inside archive. If omitted, first .msh is used.",
+ )
+ render.add_argument("--output", required=True, help="Output .ppm file path.")
+ render.add_argument("--lod", type=int, default=0, help="LOD index 0..2 (default: 0).")
+ render.add_argument("--group", type=int, default=0, help="Group index 0..4 (default: 0).")
+ render.add_argument("--max-faces", type=int, default=120000, help="Face limit (default: 120000).")
+ render.add_argument("--width", type=int, default=1280, help="Image width (default: 1280).")
+ render.add_argument("--height", type=int, default=720, help="Image height (default: 720).")
+ render.add_argument("--yaw", type=float, default=35.0, help="Yaw angle in degrees (default: 35).")
+ render.add_argument("--pitch", type=float, default=18.0, help="Pitch angle in degrees (default: 18).")
+ render.add_argument("--wireframe", action="store_true", help="Draw white wireframe overlay.")
+ render.set_defaults(func=cmd_render)
+
+ return parser
+
+
+def main() -> int:
+ parser = build_parser()
+ args = parser.parse_args()
+ return int(args.func(args))
+
+
+if __name__ == "__main__":
+ raise SystemExit(main())