From 5035d022206bf9ace54a43b4d65abe0b9fc0f361 Mon Sep 17 00:00:00 2001 From: Valentin Popov Date: Tue, 10 Feb 2026 23:27:43 +0000 Subject: Add MSH geometry export and preview rendering tools - Implemented msh_export_obj.py for exporting NGI MSH geometry to Wavefront OBJ format, including model selection and geometry extraction. - Added msh_preview_renderer.py for rendering NGI MSH models to binary PPM images, featuring a primitive software renderer with customizable parameters. - Both tools utilize the same NRes parsing logic and provide command-line interfaces for listing models and exporting or rendering geometry. --- tools/README.md | 94 ++++ tools/msh_doc_validator.py | 1000 +++++++++++++++++++++++++++++++++++++++++ tools/msh_export_obj.py | 357 +++++++++++++++ tools/msh_preview_renderer.py | 481 ++++++++++++++++++++ 4 files changed, 1932 insertions(+) create mode 100644 tools/msh_doc_validator.py create mode 100644 tools/msh_export_obj.py create mode 100644 tools/msh_preview_renderer.py (limited to 'tools') diff --git a/tools/README.md b/tools/README.md index 19de2e5..2418567 100644 --- a/tools/README.md +++ b/tools/README.md @@ -105,3 +105,97 @@ python3 tools/init_testdata.py --input tmp/gamedata --output testdata --force - если `--output` указывает на существующий файл, скрипт завершится с ошибкой; - если `--output` расположен внутри `--input`, каталог вывода исключается из сканирования; - если `stdin` неинтерактивный и требуется перезапись, нужно явно указать `--force`. + +## `msh_doc_validator.py` + +Скрипт валидирует ключевые инварианты из документации `/Users/valentineus/Developer/personal/fparkan/docs/specs/msh.md` на реальных данных. + +Проверяемые группы: + +- модели `*.msh` (вложенные `NRes` в архивах `NRes`); +- текстуры `Texm` (`type_id = 0x6D786554`); +- эффекты `FXID` (`type_id = 0x44495846`). + +Что проверяет для моделей: + +- обязательные ресурсы (`Res1/2/3/6/13`) и известные опциональные (`Res4/5/7/8/10/15/16/18/19`); +- `size/attr1/attr3` и шаги структур по таблицам; +- диапазоны индексов, батчей и ссылок между таблицами; +- разбор `Res10` как `len + bytes + NUL` для каждого узла; +- матрицу слотов в `Res1` (LOD/group) и границы по `Res2/Res7/Res13/Res19`. + +Быстрый запуск: + +```bash +python3 tools/msh_doc_validator.py scan --input testdata/nres +python3 tools/msh_doc_validator.py validate --input testdata/nres --print-limit 20 +``` + +С отчётом в JSON: + +```bash +python3 tools/msh_doc_validator.py validate \ + --input testdata/nres \ + --report tmp/msh_validation_report.json \ + --fail-on-warnings +``` + +## `msh_preview_renderer.py` + +Примитивный программный рендерер моделей `*.msh` без внешних зависимостей. + +- вход: архив `NRes` (например `animals.rlb`) или прямой payload модели; +- выход: изображение `PPM` (`P6`); +- использует `Res3` (позиции), `Res6` (индексы), `Res13` (батчи), `Res1/Res2` (выбор слотов по `lod/group`). + +Показать доступные модели в архиве: + +```bash +python3 tools/msh_preview_renderer.py list-models --archive testdata/nres/animals.rlb +``` + +Сгенерировать тестовый рендер: + +```bash +python3 tools/msh_preview_renderer.py render \ + --archive testdata/nres/animals.rlb \ + --model A_L_01.msh \ + --output tmp/renders/A_L_01.ppm \ + --width 800 \ + --height 600 \ + --lod 0 \ + --group 0 \ + --wireframe +``` + +Ограничения: + +- инструмент предназначен для smoke-теста геометрии, а не для пиксельно-точного рендера движка; +- текстуры/материалы/эффектные проходы не эмулируются. + +## `msh_export_obj.py` + +Экспортирует геометрию `*.msh` в `Wavefront OBJ`, чтобы открыть модель в Blender/MeshLab. + +- вход: `NRes` архив (например `animals.rlb`) или прямой payload модели; +- выбор геометрии: через `Res1` slot matrix (`lod/group`) как в рендерере; +- опция `--all-batches` экспортирует все батчи, игнорируя slot matrix. + +Показать модели в архиве: + +```bash +python3 tools/msh_export_obj.py list-models --archive testdata/nres/animals.rlb +``` + +Экспорт в OBJ: + +```bash +python3 tools/msh_export_obj.py export \ + --archive testdata/nres/animals.rlb \ + --model A_L_01.msh \ + --output tmp/renders/A_L_01.obj \ + --lod 0 \ + --group 0 +``` + +Файл `OBJ` можно открыть напрямую в Blender (`File -> Import -> Wavefront (.obj)`). diff --git a/tools/msh_doc_validator.py b/tools/msh_doc_validator.py new file mode 100644 index 0000000..ff096a4 --- /dev/null +++ b/tools/msh_doc_validator.py @@ -0,0 +1,1000 @@ +#!/usr/bin/env python3 +""" +Validate assumptions from docs/specs/msh.md on real game archives. + +The tool checks three groups: +1) MSH model payloads (nested NRes in *.msh entries), +2) Texm texture payloads, +3) FXID effect payloads. +""" + +from __future__ import annotations + +import argparse +import json +import math +import struct +from collections import Counter +from pathlib import Path +from typing import Any + +import archive_roundtrip_validator as arv + +MAGIC_NRES = b"NRes" +MAGIC_PAGE = b"Page" + +TYPE_FXID = 0x44495846 +TYPE_TEXM = 0x6D786554 + +FX_CMD_SIZE = {1: 224, 2: 148, 3: 200, 4: 204, 5: 112, 6: 4, 7: 208, 8: 248, 9: 208, 10: 208} +TEXM_KNOWN_FORMATS = {0, 565, 556, 4444, 888, 8888} + + +def _add_issue( + issues: list[dict[str, Any]], + severity: str, + category: str, + archive: Path, + entry_name: str | None, + message: str, +) -> None: + issues.append( + { + "severity": severity, + "category": category, + "archive": str(archive), + "entry": entry_name, + "message": message, + } + ) + + +def _entry_payload(blob: bytes, entry: dict[str, Any]) -> bytes: + start = int(entry["data_offset"]) + end = start + int(entry["size"]) + return blob[start:end] + + +def _entry_by_type(entries: list[dict[str, Any]]) -> dict[int, list[dict[str, Any]]]: + by_type: dict[int, list[dict[str, Any]]] = {} + for item in entries: + by_type.setdefault(int(item["type_id"]), []).append(item) + return by_type + + +def _expect_single_resource( + by_type: dict[int, list[dict[str, Any]]], + type_id: int, + label: str, + issues: list[dict[str, Any]], + archive: Path, + model_name: str, + required: bool, +) -> dict[str, Any] | None: + rows = by_type.get(type_id, []) + if not rows: + if required: + _add_issue( + issues, + "error", + "model-resource", + archive, + model_name, + f"missing required resource type={type_id} ({label})", + ) + return None + if len(rows) > 1: + _add_issue( + issues, + "warning", + "model-resource", + archive, + model_name, + f"multiple resources type={type_id} ({label}); using first entry", + ) + return rows[0] + + +def _check_fixed_stride( + *, + entry: dict[str, Any], + stride: int, + label: str, + issues: list[dict[str, Any]], + archive: Path, + model_name: str, + enforce_attr3: bool = True, + enforce_attr2_zero: bool = True, +) -> int: + size = int(entry["size"]) + attr1 = int(entry["attr1"]) + attr2 = int(entry["attr2"]) + attr3 = int(entry["attr3"]) + + count = -1 + if size % stride != 0: + _add_issue( + issues, + "error", + "model-stride", + archive, + model_name, + f"{label}: size={size} is not divisible by stride={stride}", + ) + else: + count = size // stride + if attr1 != count: + _add_issue( + issues, + "error", + "model-attr", + archive, + model_name, + f"{label}: attr1={attr1} != size/stride={count}", + ) + if enforce_attr3 and attr3 != stride: + _add_issue( + issues, + "error", + "model-attr", + archive, + model_name, + f"{label}: attr3={attr3} != {stride}", + ) + if enforce_attr2_zero and attr2 != 0: + _add_issue( + issues, + "warning", + "model-attr", + archive, + model_name, + f"{label}: attr2={attr2} (expected 0 in known assets)", + ) + return count + + +def _validate_res10( + data: bytes, + node_count: int, + issues: list[dict[str, Any]], + archive: Path, + model_name: str, +) -> None: + off = 0 + for idx in range(node_count): + if off + 4 > len(data): + _add_issue( + issues, + "error", + "res10", + archive, + model_name, + f"record {idx}: missing u32 length (offset={off}, size={len(data)})", + ) + return + ln = struct.unpack_from(" len(data): + _add_issue( + issues, + "error", + "res10", + archive, + model_name, + f"record {idx}: out of bounds (len={ln}, need={need}, offset={off}, size={len(data)})", + ) + return + if ln and data[off + ln] != 0: + _add_issue( + issues, + "warning", + "res10", + archive, + model_name, + f"record {idx}: missing trailing NUL at payload end", + ) + off += need + + if off != len(data): + _add_issue( + issues, + "error", + "res10", + archive, + model_name, + f"tail bytes after node records: consumed={off}, size={len(data)}", + ) + + +def _validate_model_payload( + model_blob: bytes, + archive: Path, + model_name: str, + issues: list[dict[str, Any]], + counters: Counter[str], +) -> None: + counters["models_total"] += 1 + + if model_blob[:4] != MAGIC_NRES: + _add_issue( + issues, + "error", + "model-container", + archive, + model_name, + "payload is not NRes (missing magic)", + ) + return + + try: + parsed = arv.parse_nres(model_blob, source=f"{archive}:{model_name}") + except Exception as exc: # pylint: disable=broad-except + _add_issue( + issues, + "error", + "model-container", + archive, + model_name, + f"cannot parse nested NRes: {exc}", + ) + return + + for item in parsed.get("issues", []): + _add_issue(issues, "warning", "model-container", archive, model_name, str(item)) + + entries = parsed["entries"] + by_type = _entry_by_type(entries) + + res1 = _expect_single_resource(by_type, 1, "Res1", issues, archive, model_name, True) + res2 = _expect_single_resource(by_type, 2, "Res2", issues, archive, model_name, True) + res3 = _expect_single_resource(by_type, 3, "Res3", issues, archive, model_name, True) + res4 = _expect_single_resource(by_type, 4, "Res4", issues, archive, model_name, False) + res5 = _expect_single_resource(by_type, 5, "Res5", issues, archive, model_name, False) + res6 = _expect_single_resource(by_type, 6, "Res6", issues, archive, model_name, True) + res7 = _expect_single_resource(by_type, 7, "Res7", issues, archive, model_name, False) + res8 = _expect_single_resource(by_type, 8, "Res8", issues, archive, model_name, False) + res10 = _expect_single_resource(by_type, 10, "Res10", issues, archive, model_name, False) + res13 = _expect_single_resource(by_type, 13, "Res13", issues, archive, model_name, True) + res15 = _expect_single_resource(by_type, 15, "Res15", issues, archive, model_name, False) + res16 = _expect_single_resource(by_type, 16, "Res16", issues, archive, model_name, False) + res18 = _expect_single_resource(by_type, 18, "Res18", issues, archive, model_name, False) + res19 = _expect_single_resource(by_type, 19, "Res19", issues, archive, model_name, False) + + if not (res1 and res2 and res3 and res6 and res13): + return + + # Res1 + res1_stride = int(res1["attr3"]) + if res1_stride not in (38, 24): + _add_issue( + issues, + "warning", + "res1", + archive, + model_name, + f"unexpected Res1 stride attr3={res1_stride} (known: 38 or 24)", + ) + if res1_stride <= 0: + _add_issue(issues, "error", "res1", archive, model_name, f"invalid Res1 stride={res1_stride}") + return + if int(res1["size"]) % res1_stride != 0: + _add_issue( + issues, + "error", + "res1", + archive, + model_name, + f"Res1 size={res1['size']} not divisible by stride={res1_stride}", + ) + return + node_count = int(res1["size"]) // res1_stride + if int(res1["attr1"]) != node_count: + _add_issue( + issues, + "error", + "res1", + archive, + model_name, + f"Res1 attr1={res1['attr1']} != node_count={node_count}", + ) + + # Res2 + res2_size = int(res2["size"]) + res2_attr1 = int(res2["attr1"]) + res2_attr2 = int(res2["attr2"]) + res2_attr3 = int(res2["attr3"]) + if res2_size < 0x8C: + _add_issue(issues, "error", "res2", archive, model_name, f"Res2 too small: size={res2_size}") + return + slot_bytes = res2_size - 0x8C + slot_count = -1 + if slot_bytes % 68 != 0: + _add_issue( + issues, + "error", + "res2", + archive, + model_name, + f"Res2 slot area not divisible by 68: slot_bytes={slot_bytes}", + ) + else: + slot_count = slot_bytes // 68 + if res2_attr1 != slot_count: + _add_issue( + issues, + "error", + "res2", + archive, + model_name, + f"Res2 attr1={res2_attr1} != slot_count={slot_count}", + ) + if res2_attr2 != 0: + _add_issue( + issues, + "warning", + "res2", + archive, + model_name, + f"Res2 attr2={res2_attr2} (expected 0 in known assets)", + ) + if res2_attr3 != 68: + _add_issue( + issues, + "error", + "res2", + archive, + model_name, + f"Res2 attr3={res2_attr3} != 68", + ) + + # Fixed-stride resources + vertex_count = _check_fixed_stride( + entry=res3, + stride=12, + label="Res3", + issues=issues, + archive=archive, + model_name=model_name, + ) + _ = _check_fixed_stride( + entry=res4, + stride=4, + label="Res4", + issues=issues, + archive=archive, + model_name=model_name, + ) if res4 else None + _ = _check_fixed_stride( + entry=res5, + stride=4, + label="Res5", + issues=issues, + archive=archive, + model_name=model_name, + ) if res5 else None + index_count = _check_fixed_stride( + entry=res6, + stride=2, + label="Res6", + issues=issues, + archive=archive, + model_name=model_name, + ) + tri_desc_count = _check_fixed_stride( + entry=res7, + stride=16, + label="Res7", + issues=issues, + archive=archive, + model_name=model_name, + ) if res7 else -1 + anim_key_count = _check_fixed_stride( + entry=res8, + stride=24, + label="Res8", + issues=issues, + archive=archive, + model_name=model_name, + enforce_attr3=False, # format stores attr3=4 in data set + ) if res8 else -1 + if res8 and int(res8["attr3"]) != 4: + _add_issue( + issues, + "error", + "res8", + archive, + model_name, + f"Res8 attr3={res8['attr3']} != 4", + ) + if res13: + batch_count = _check_fixed_stride( + entry=res13, + stride=20, + label="Res13", + issues=issues, + archive=archive, + model_name=model_name, + ) + else: + batch_count = -1 + if res15: + _check_fixed_stride( + entry=res15, + stride=8, + label="Res15", + issues=issues, + archive=archive, + model_name=model_name, + ) + if res16: + _check_fixed_stride( + entry=res16, + stride=8, + label="Res16", + issues=issues, + archive=archive, + model_name=model_name, + ) + if res18: + _check_fixed_stride( + entry=res18, + stride=4, + label="Res18", + issues=issues, + archive=archive, + model_name=model_name, + ) + + if res19: + anim_map_count = _check_fixed_stride( + entry=res19, + stride=2, + label="Res19", + issues=issues, + archive=archive, + model_name=model_name, + enforce_attr3=False, + enforce_attr2_zero=False, + ) + if int(res19["attr3"]) != 2: + _add_issue( + issues, + "error", + "res19", + archive, + model_name, + f"Res19 attr3={res19['attr3']} != 2", + ) + else: + anim_map_count = -1 + + # Res10 + if res10: + if int(res10["attr1"]) != int(res1["attr1"]): + _add_issue( + issues, + "error", + "res10", + archive, + model_name, + f"Res10 attr1={res10['attr1']} != Res1.attr1={res1['attr1']}", + ) + if int(res10["attr3"]) != 0: + _add_issue( + issues, + "warning", + "res10", + archive, + model_name, + f"Res10 attr3={res10['attr3']} (known assets use 0)", + ) + _validate_res10(_entry_payload(model_blob, res10), node_count, issues, archive, model_name) + + # Cross-table checks. + if vertex_count > 0 and (res4 and int(res4["size"]) // 4 != vertex_count): + _add_issue(issues, "error", "model-cross", archive, model_name, "Res4 count != Res3 count") + if vertex_count > 0 and (res5 and int(res5["size"]) // 4 != vertex_count): + _add_issue(issues, "error", "model-cross", archive, model_name, "Res5 count != Res3 count") + + indices: list[int] = [] + if index_count > 0: + res6_data = _entry_payload(model_blob, res6) + indices = list(struct.unpack_from(f"<{index_count}H", res6_data, 0)) + + if batch_count > 0: + res13_data = _entry_payload(model_blob, res13) + for batch_idx in range(batch_count): + b_off = batch_idx * 20 + ( + _batch_flags, + _mat_idx, + _unk4, + _unk6, + idx_count, + idx_start, + _unk14, + base_vertex, + ) = struct.unpack_from(" 0 and end > index_count: + _add_issue( + issues, + "error", + "res13", + archive, + model_name, + f"batch {batch_idx}: index range [{idx_start}, {end}) outside Res6 count={index_count}", + ) + continue + if idx_count % 3 != 0: + _add_issue( + issues, + "warning", + "res13", + archive, + model_name, + f"batch {batch_idx}: indexCount={idx_count} is not divisible by 3", + ) + if vertex_count > 0 and index_count > 0 and idx_count > 0: + raw_slice = indices[idx_start:end] + max_raw = max(raw_slice) + if base_vertex + max_raw >= vertex_count: + _add_issue( + issues, + "error", + "res13", + archive, + model_name, + f"batch {batch_idx}: baseVertex+maxIndex={base_vertex + max_raw} >= vertex_count={vertex_count}", + ) + + if slot_count > 0: + res2_data = _entry_payload(model_blob, res2) + for slot_idx in range(slot_count): + s_off = 0x8C + slot_idx * 68 + tri_start, tri_count, batch_start, slot_batch_count = struct.unpack_from("<4H", res2_data, s_off) + if tri_desc_count > 0 and tri_start + tri_count > tri_desc_count: + _add_issue( + issues, + "error", + "res2-slot", + archive, + model_name, + f"slot {slot_idx}: tri range [{tri_start}, {tri_start + tri_count}) outside Res7 count={tri_desc_count}", + ) + if batch_count > 0 and batch_start + slot_batch_count > batch_count: + _add_issue( + issues, + "error", + "res2-slot", + archive, + model_name, + f"slot {slot_idx}: batch range [{batch_start}, {batch_start + slot_batch_count}) outside Res13 count={batch_count}", + ) + # Slot bounds are 10 float values. + for f_idx in range(10): + value = struct.unpack_from(" 0: + res7_data = _entry_payload(model_blob, res7) + for tri_idx in range(tri_desc_count): + t_off = tri_idx * 16 + _flags, l0, l1, l2 = struct.unpack_from("<4H", res7_data, t_off) + for link in (l0, l1, l2): + if link != 0xFFFF and link >= tri_desc_count: + _add_issue( + issues, + "error", + "res7", + archive, + model_name, + f"tri {tri_idx}: link {link} outside tri_desc_count={tri_desc_count}", + ) + _ = struct.unpack_from(" 0 and res19: + res19_data = _entry_payload(model_blob, res19) + map_words = list(struct.unpack_from(f"<{anim_map_count}H", res19_data, 0)) + frame_count = int(res19["attr2"]) if res19 else 0 + + for node_idx in range(node_count): + n_off = node_idx * 38 + hdr2 = struct.unpack_from(" 0 and slot_idx >= slot_count: + _add_issue( + issues, + "error", + "res1-slot", + archive, + model_name, + f"node {node_idx}: slotIndex[{w_idx}]={slot_idx} outside slot_count={slot_count}", + ) + + if anim_key_count > 0 and hdr3 != 0xFFFF and hdr3 >= anim_key_count: + _add_issue( + issues, + "error", + "res1-anim", + archive, + model_name, + f"node {node_idx}: fallbackKeyIndex={hdr3} outside Res8 count={anim_key_count}", + ) + if map_words and hdr2 != 0xFFFF and frame_count > 0: + end = hdr2 + frame_count + if end > len(map_words): + _add_issue( + issues, + "error", + "res19-map", + archive, + model_name, + f"node {node_idx}: map range [{hdr2}, {end}) outside Res19 count={len(map_words)}", + ) + + counters["models_ok"] += 1 + + +def _validate_texm_payload( + payload: bytes, + archive: Path, + entry_name: str, + issues: list[dict[str, Any]], + counters: Counter[str], +) -> None: + counters["texm_total"] += 1 + + if len(payload) < 32: + _add_issue( + issues, + "error", + "texm", + archive, + entry_name, + f"payload too small: {len(payload)}", + ) + return + + magic, width, height, mip_count, flags4, flags5, unk6, fmt = struct.unpack_from("<8I", payload, 0) + if magic != TYPE_TEXM: + _add_issue(issues, "error", "texm", archive, entry_name, f"magic=0x{magic:08X} != Texm") + return + if width == 0 or height == 0: + _add_issue(issues, "error", "texm", archive, entry_name, f"invalid size {width}x{height}") + return + if mip_count == 0: + _add_issue(issues, "error", "texm", archive, entry_name, "mipCount=0") + return + if fmt not in TEXM_KNOWN_FORMATS: + _add_issue( + issues, + "error", + "texm", + archive, + entry_name, + f"unknown format code {fmt}", + ) + return + if flags4 not in (0, 32): + _add_issue( + issues, + "warning", + "texm", + archive, + entry_name, + f"flags4={flags4} (known values: 0 or 32)", + ) + if flags5 not in (0, 0x04000000, 0x00800000): + _add_issue( + issues, + "warning", + "texm", + archive, + entry_name, + f"flags5=0x{flags5:08X} (known values: 0, 0x00800000, 0x04000000)", + ) + + bpp = 1 if fmt == 0 else (2 if fmt in (565, 556, 4444) else 4) + pix_sum = 0 + w = width + h = height + for _ in range(mip_count): + pix_sum += w * h + w = max(1, w >> 1) + h = max(1, h >> 1) + size_core = 32 + (1024 if fmt == 0 else 0) + bpp * pix_sum + if size_core > len(payload): + _add_issue( + issues, + "error", + "texm", + archive, + entry_name, + f"sizeCore={size_core} exceeds payload size={len(payload)}", + ) + return + + tail = len(payload) - size_core + if tail > 0: + off = size_core + if tail < 8: + _add_issue( + issues, + "error", + "texm", + archive, + entry_name, + f"tail too short for Page chunk: tail={tail}", + ) + return + if payload[off : off + 4] != MAGIC_PAGE: + _add_issue( + issues, + "error", + "texm", + archive, + entry_name, + f"tail is present but no Page magic at offset {off}", + ) + return + rect_count = struct.unpack_from(" tail: + _add_issue( + issues, + "error", + "texm", + archive, + entry_name, + f"Page chunk truncated: need={need}, tail={tail}", + ) + return + if need != tail: + _add_issue( + issues, + "error", + "texm", + archive, + entry_name, + f"extra bytes after Page chunk: tail={tail}, pageSize={need}", + ) + return + + _ = unk6 # carried as raw field in spec, semantics intentionally unknown. + counters["texm_ok"] += 1 + + +def _validate_fxid_payload( + payload: bytes, + archive: Path, + entry_name: str, + issues: list[dict[str, Any]], + counters: Counter[str], +) -> None: + counters["fxid_total"] += 1 + + if len(payload) < 60: + _add_issue( + issues, + "error", + "fxid", + archive, + entry_name, + f"payload too small: {len(payload)}", + ) + return + + cmd_count = struct.unpack_from(" len(payload): + _add_issue( + issues, + "error", + "fxid", + archive, + entry_name, + f"command {idx}: missing header at offset={ptr}", + ) + return + word = struct.unpack_from(" len(payload): + _add_issue( + issues, + "error", + "fxid", + archive, + entry_name, + f"command {idx}: truncated, need end={ptr + size}, payload={len(payload)}", + ) + return + ptr += size + + if ptr != len(payload): + _add_issue( + issues, + "error", + "fxid", + archive, + entry_name, + f"tail bytes after command stream: parsed_end={ptr}, payload={len(payload)}", + ) + return + + counters["fxid_ok"] += 1 + + +def _scan_nres_files(root: Path) -> list[Path]: + rows = arv.scan_archives(root) + out: list[Path] = [] + for item in rows: + if item["type"] != "nres": + continue + out.append(root / item["relative_path"]) + return out + + +def run_validation(input_root: Path) -> dict[str, Any]: + archives = _scan_nres_files(input_root) + issues: list[dict[str, Any]] = [] + counters: Counter[str] = Counter() + + for archive_path in archives: + counters["archives_total"] += 1 + data = archive_path.read_bytes() + try: + parsed = arv.parse_nres(data, source=str(archive_path)) + except Exception as exc: # pylint: disable=broad-except + _add_issue(issues, "error", "archive", archive_path, None, f"cannot parse NRes: {exc}") + continue + + for item in parsed.get("issues", []): + _add_issue(issues, "warning", "archive", archive_path, None, str(item)) + + for entry in parsed["entries"]: + name = str(entry["name"]) + payload = _entry_payload(data, entry) + type_id = int(entry["type_id"]) + + if name.lower().endswith(".msh"): + _validate_model_payload(payload, archive_path, name, issues, counters) + + if type_id == TYPE_TEXM: + _validate_texm_payload(payload, archive_path, name, issues, counters) + + if type_id == TYPE_FXID: + _validate_fxid_payload(payload, archive_path, name, issues, counters) + + errors = sum(1 for row in issues if row["severity"] == "error") + warnings = sum(1 for row in issues if row["severity"] == "warning") + + return { + "input_root": str(input_root), + "summary": { + "archives_total": counters["archives_total"], + "models_total": counters["models_total"], + "models_ok": counters["models_ok"], + "texm_total": counters["texm_total"], + "texm_ok": counters["texm_ok"], + "fxid_total": counters["fxid_total"], + "fxid_ok": counters["fxid_ok"], + "errors": errors, + "warnings": warnings, + "issues_total": len(issues), + }, + "issues": issues, + } + + +def cmd_scan(args: argparse.Namespace) -> int: + root = Path(args.input).resolve() + report = run_validation(root) + summary = report["summary"] + print(f"Input root : {root}") + print(f"NRes archives : {summary['archives_total']}") + print(f"MSH models : {summary['models_total']}") + print(f"Texm textures : {summary['texm_total']}") + print(f"FXID effects : {summary['fxid_total']}") + return 0 + + +def cmd_validate(args: argparse.Namespace) -> int: + root = Path(args.input).resolve() + report = run_validation(root) + summary = report["summary"] + + if args.report: + arv.dump_json(Path(args.report).resolve(), report) + + print(f"Input root : {root}") + print(f"NRes archives : {summary['archives_total']}") + print(f"MSH models : {summary['models_ok']}/{summary['models_total']} valid") + print(f"Texm textures : {summary['texm_ok']}/{summary['texm_total']} valid") + print(f"FXID effects : {summary['fxid_ok']}/{summary['fxid_total']} valid") + print(f"Issues : {summary['issues_total']} (errors={summary['errors']}, warnings={summary['warnings']})") + + if report["issues"]: + limit = max(1, int(args.print_limit)) + print("\nSample issues:") + for item in report["issues"][:limit]: + where = item["archive"] + if item["entry"]: + where = f"{where}::{item['entry']}" + print(f"- [{item['severity']}] [{item['category']}] {where}: {item['message']}") + if len(report["issues"]) > limit: + print(f"... and {len(report['issues']) - limit} more issue(s)") + + if summary["errors"] > 0: + return 1 + if args.fail_on_warnings and summary["warnings"] > 0: + return 1 + return 0 + + +def build_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser( + description="Validate docs/specs/msh.md assumptions on real archives." + ) + sub = parser.add_subparsers(dest="command", required=True) + + scan = sub.add_parser("scan", help="Quick scan and counts (models/textures/effects).") + scan.add_argument("--input", required=True, help="Root directory with game/test archives.") + scan.set_defaults(func=cmd_scan) + + validate = sub.add_parser("validate", help="Run full spec validation.") + validate.add_argument("--input", required=True, help="Root directory with game/test archives.") + validate.add_argument("--report", help="Optional JSON report output path.") + validate.add_argument( + "--print-limit", + type=int, + default=50, + help="How many issues to print to stdout (default: 50).", + ) + validate.add_argument( + "--fail-on-warnings", + action="store_true", + help="Return non-zero if warnings are present.", + ) + validate.set_defaults(func=cmd_validate) + + return parser + + +def main() -> int: + parser = build_parser() + args = parser.parse_args() + return int(args.func(args)) + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/tools/msh_export_obj.py b/tools/msh_export_obj.py new file mode 100644 index 0000000..75a9602 --- /dev/null +++ b/tools/msh_export_obj.py @@ -0,0 +1,357 @@ +#!/usr/bin/env python3 +""" +Export NGI MSH geometry to Wavefront OBJ. + +The exporter is intended for inspection/debugging and uses the same +batch/slot selection logic as msh_preview_renderer.py. +""" + +from __future__ import annotations + +import argparse +import math +import struct +from pathlib import Path +from typing import Any + +import archive_roundtrip_validator as arv + +MAGIC_NRES = b"NRes" + + +def _entry_payload(blob: bytes, entry: dict[str, Any]) -> bytes: + start = int(entry["data_offset"]) + end = start + int(entry["size"]) + return blob[start:end] + + +def _parse_nres(blob: bytes, source: str) -> dict[str, Any]: + if blob[:4] != MAGIC_NRES: + raise RuntimeError(f"{source}: not an NRes payload") + return arv.parse_nres(blob, source=source) + + +def _by_type(entries: list[dict[str, Any]]) -> dict[int, list[dict[str, Any]]]: + out: dict[int, list[dict[str, Any]]] = {} + for row in entries: + out.setdefault(int(row["type_id"]), []).append(row) + return out + + +def _get_single(by_type: dict[int, list[dict[str, Any]]], type_id: int, label: str) -> dict[str, Any]: + rows = by_type.get(type_id, []) + if not rows: + raise RuntimeError(f"missing resource type {type_id} ({label})") + return rows[0] + + +def _pick_model_payload(archive_path: Path, model_name: str | None) -> tuple[bytes, str]: + root_blob = archive_path.read_bytes() + parsed = _parse_nres(root_blob, str(archive_path)) + + msh_entries = [row for row in parsed["entries"] if str(row["name"]).lower().endswith(".msh")] + if msh_entries: + chosen: dict[str, Any] | None = None + if model_name: + model_l = model_name.lower() + for row in msh_entries: + name_l = str(row["name"]).lower() + if name_l == model_l: + chosen = row + break + if chosen is None: + for row in msh_entries: + if str(row["name"]).lower().startswith(model_l): + chosen = row + break + else: + chosen = msh_entries[0] + + if chosen is None: + names = ", ".join(str(row["name"]) for row in msh_entries[:12]) + raise RuntimeError( + f"model '{model_name}' not found in {archive_path}. Available: {names}" + ) + return _entry_payload(root_blob, chosen), str(chosen["name"]) + + by_type = _by_type(parsed["entries"]) + if all(k in by_type for k in (1, 2, 3, 6, 13)): + return root_blob, archive_path.name + + raise RuntimeError( + f"{archive_path} does not contain .msh entries and does not look like a direct model payload" + ) + + +def _extract_geometry( + model_blob: bytes, + *, + lod: int, + group: int, + max_faces: int, + all_batches: bool, +) -> tuple[list[tuple[float, float, float]], list[tuple[int, int, int]], dict[str, int]]: + parsed = _parse_nres(model_blob, "") + by_type = _by_type(parsed["entries"]) + + res1 = _get_single(by_type, 1, "Res1") + res2 = _get_single(by_type, 2, "Res2") + res3 = _get_single(by_type, 3, "Res3") + res6 = _get_single(by_type, 6, "Res6") + res13 = _get_single(by_type, 13, "Res13") + + pos_blob = _entry_payload(model_blob, res3) + if len(pos_blob) % 12 != 0: + raise RuntimeError(f"Res3 size is not divisible by 12: {len(pos_blob)}") + vertex_count = len(pos_blob) // 12 + positions = [struct.unpack_from("<3f", pos_blob, i * 12) for i in range(vertex_count)] + + idx_blob = _entry_payload(model_blob, res6) + if len(idx_blob) % 2 != 0: + raise RuntimeError(f"Res6 size is not divisible by 2: {len(idx_blob)}") + index_count = len(idx_blob) // 2 + indices = list(struct.unpack_from(f"<{index_count}H", idx_blob, 0)) + + batch_blob = _entry_payload(model_blob, res13) + if len(batch_blob) % 20 != 0: + raise RuntimeError(f"Res13 size is not divisible by 20: {len(batch_blob)}") + batch_count = len(batch_blob) // 20 + batches: list[tuple[int, int, int, int]] = [] + for i in range(batch_count): + off = i * 20 + idx_count = struct.unpack_from("= 38 and len(res1_blob) >= node_count * node_stride: + if lod < 0 or lod > 2: + raise RuntimeError(f"lod must be 0..2 (got {lod})") + if group < 0 or group > 4: + raise RuntimeError(f"group must be 0..4 (got {group})") + matrix_index = lod * 5 + group + for n in range(node_count): + off = n * node_stride + 8 + matrix_index * 2 + slot_idx = struct.unpack_from("= slot_count: + continue + node_slot_indices.append(slot_idx) + + faces: list[tuple[int, int, int]] = [] + used_batches = 0 + used_slots = 0 + + def append_batch(batch_idx: int) -> None: + nonlocal used_batches + if batch_idx < 0 or batch_idx >= len(batches): + return + idx_count, idx_start, base_vertex, _ = batches[batch_idx] + if idx_count < 3: + return + end = idx_start + idx_count + if end > len(indices): + return + used_batches += 1 + tri_count = idx_count // 3 + for t in range(tri_count): + i0 = indices[idx_start + t * 3 + 0] + base_vertex + i1 = indices[idx_start + t * 3 + 1] + base_vertex + i2 = indices[idx_start + t * 3 + 2] + base_vertex + if i0 >= vertex_count or i1 >= vertex_count or i2 >= vertex_count: + continue + faces.append((i0, i1, i2)) + if len(faces) >= max_faces: + return + + if node_slot_indices: + for slot_idx in node_slot_indices: + if len(faces) >= max_faces: + break + _tri_start, _tri_count, batch_start, slot_batch_count = slots[slot_idx] + used_slots += 1 + for bi in range(batch_start, batch_start + slot_batch_count): + append_batch(bi) + if len(faces) >= max_faces: + break + else: + for bi in range(batch_count): + append_batch(bi) + if len(faces) >= max_faces: + break + + if not faces: + raise RuntimeError("no faces selected for export") + + meta = { + "vertex_count": vertex_count, + "index_count": index_count, + "batch_count": batch_count, + "slot_count": slot_count, + "node_count": node_count, + "used_slots": used_slots, + "used_batches": used_batches, + "face_count": len(faces), + } + return positions, faces, meta + + +def _compute_vertex_normals( + positions: list[tuple[float, float, float]], + faces: list[tuple[int, int, int]], +) -> list[tuple[float, float, float]]: + acc = [[0.0, 0.0, 0.0] for _ in positions] + for i0, i1, i2 in faces: + p0 = positions[i0] + p1 = positions[i1] + p2 = positions[i2] + ux = p1[0] - p0[0] + uy = p1[1] - p0[1] + uz = p1[2] - p0[2] + vx = p2[0] - p0[0] + vy = p2[1] - p0[1] + vz = p2[2] - p0[2] + nx = uy * vz - uz * vy + ny = uz * vx - ux * vz + nz = ux * vy - uy * vx + acc[i0][0] += nx + acc[i0][1] += ny + acc[i0][2] += nz + acc[i1][0] += nx + acc[i1][1] += ny + acc[i1][2] += nz + acc[i2][0] += nx + acc[i2][1] += ny + acc[i2][2] += nz + + normals: list[tuple[float, float, float]] = [] + for nx, ny, nz in acc: + ln = math.sqrt(nx * nx + ny * ny + nz * nz) + if ln <= 1e-12: + normals.append((0.0, 1.0, 0.0)) + else: + normals.append((nx / ln, ny / ln, nz / ln)) + return normals + + +def _write_obj( + output_path: Path, + object_name: str, + positions: list[tuple[float, float, float]], + faces: list[tuple[int, int, int]], +) -> None: + output_path.parent.mkdir(parents=True, exist_ok=True) + normals = _compute_vertex_normals(positions, faces) + + with output_path.open("w", encoding="utf-8", newline="\n") as out: + out.write("# Exported by msh_export_obj.py\n") + out.write(f"o {object_name}\n") + for x, y, z in positions: + out.write(f"v {x:.9g} {y:.9g} {z:.9g}\n") + for nx, ny, nz in normals: + out.write(f"vn {nx:.9g} {ny:.9g} {nz:.9g}\n") + for i0, i1, i2 in faces: + a = i0 + 1 + b = i1 + 1 + c = i2 + 1 + out.write(f"f {a}//{a} {b}//{b} {c}//{c}\n") + + +def cmd_list_models(args: argparse.Namespace) -> int: + archive_path = Path(args.archive).resolve() + blob = archive_path.read_bytes() + parsed = _parse_nres(blob, str(archive_path)) + rows = [row for row in parsed["entries"] if str(row["name"]).lower().endswith(".msh")] + print(f"Archive: {archive_path}") + print(f"MSH entries: {len(rows)}") + for row in rows: + print(f"- {row['name']}") + return 0 + + +def cmd_export(args: argparse.Namespace) -> int: + archive_path = Path(args.archive).resolve() + output_path = Path(args.output).resolve() + + model_blob, model_label = _pick_model_payload(archive_path, args.model) + positions, faces, meta = _extract_geometry( + model_blob, + lod=int(args.lod), + group=int(args.group), + max_faces=int(args.max_faces), + all_batches=bool(args.all_batches), + ) + obj_name = Path(model_label).stem or "msh_model" + _write_obj(output_path, obj_name, positions, faces) + + print(f"Exported model : {model_label}") + print(f"Output OBJ : {output_path}") + print(f"Object name : {obj_name}") + print( + "Geometry : " + f"vertices={meta['vertex_count']}, faces={meta['face_count']}, " + f"batches={meta['used_batches']}/{meta['batch_count']}, slots={meta['used_slots']}/{meta['slot_count']}" + ) + print( + "Mode : " + f"lod={args.lod}, group={args.group}, all_batches={bool(args.all_batches)}" + ) + return 0 + + +def build_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser( + description="Export NGI MSH geometry to Wavefront OBJ." + ) + sub = parser.add_subparsers(dest="command", required=True) + + list_models = sub.add_parser("list-models", help="List .msh entries in an NRes archive.") + list_models.add_argument("--archive", required=True, help="Path to archive (e.g. animals.rlb).") + list_models.set_defaults(func=cmd_list_models) + + export = sub.add_parser("export", help="Export one model to OBJ.") + export.add_argument("--archive", required=True, help="Path to NRes archive or direct model payload.") + export.add_argument( + "--model", + help="Model entry name (*.msh) inside archive. If omitted, first .msh is used.", + ) + export.add_argument("--output", required=True, help="Output .obj path.") + export.add_argument("--lod", type=int, default=0, help="LOD index 0..2 (default: 0).") + export.add_argument("--group", type=int, default=0, help="Group index 0..4 (default: 0).") + export.add_argument("--max-faces", type=int, default=120000, help="Face limit (default: 120000).") + export.add_argument( + "--all-batches", + action="store_true", + help="Ignore slot matrix selection and export all batches.", + ) + export.set_defaults(func=cmd_export) + + return parser + + +def main() -> int: + parser = build_parser() + args = parser.parse_args() + return int(args.func(args)) + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/tools/msh_preview_renderer.py b/tools/msh_preview_renderer.py new file mode 100644 index 0000000..53b4e63 --- /dev/null +++ b/tools/msh_preview_renderer.py @@ -0,0 +1,481 @@ +#!/usr/bin/env python3 +""" +Primitive software renderer for NGI MSH models. + +Output format: binary PPM (P6), no external dependencies. +""" + +from __future__ import annotations + +import argparse +import math +import struct +from pathlib import Path +from typing import Any + +import archive_roundtrip_validator as arv + +MAGIC_NRES = b"NRes" + + +def _entry_payload(blob: bytes, entry: dict[str, Any]) -> bytes: + start = int(entry["data_offset"]) + end = start + int(entry["size"]) + return blob[start:end] + + +def _parse_nres(blob: bytes, source: str) -> dict[str, Any]: + if blob[:4] != MAGIC_NRES: + raise RuntimeError(f"{source}: not an NRes payload") + return arv.parse_nres(blob, source=source) + + +def _by_type(entries: list[dict[str, Any]]) -> dict[int, list[dict[str, Any]]]: + out: dict[int, list[dict[str, Any]]] = {} + for row in entries: + out.setdefault(int(row["type_id"]), []).append(row) + return out + + +def _pick_model_payload(archive_path: Path, model_name: str | None) -> tuple[bytes, str]: + root_blob = archive_path.read_bytes() + parsed = _parse_nres(root_blob, str(archive_path)) + + msh_entries = [row for row in parsed["entries"] if str(row["name"]).lower().endswith(".msh")] + if msh_entries: + chosen: dict[str, Any] | None = None + if model_name: + model_l = model_name.lower() + for row in msh_entries: + name_l = str(row["name"]).lower() + if name_l == model_l: + chosen = row + break + if chosen is None: + for row in msh_entries: + if str(row["name"]).lower().startswith(model_l): + chosen = row + break + else: + chosen = msh_entries[0] + + if chosen is None: + names = ", ".join(str(row["name"]) for row in msh_entries[:12]) + raise RuntimeError( + f"model '{model_name}' not found in {archive_path}. Available: {names}" + ) + return _entry_payload(root_blob, chosen), str(chosen["name"]) + + # Fallback: treat file itself as a model NRes payload. + by_type = _by_type(parsed["entries"]) + if all(k in by_type for k in (1, 2, 3, 6, 13)): + return root_blob, archive_path.name + + raise RuntimeError( + f"{archive_path} does not contain .msh entries and does not look like a direct model payload" + ) + + +def _get_single(by_type: dict[int, list[dict[str, Any]]], type_id: int, label: str) -> dict[str, Any]: + rows = by_type.get(type_id, []) + if not rows: + raise RuntimeError(f"missing resource type {type_id} ({label})") + return rows[0] + + +def _extract_geometry( + model_blob: bytes, + *, + lod: int, + group: int, + max_faces: int, +) -> tuple[list[tuple[float, float, float]], list[tuple[int, int, int]], dict[str, int]]: + parsed = _parse_nres(model_blob, "") + by_type = _by_type(parsed["entries"]) + + res1 = _get_single(by_type, 1, "Res1") + res2 = _get_single(by_type, 2, "Res2") + res3 = _get_single(by_type, 3, "Res3") + res6 = _get_single(by_type, 6, "Res6") + res13 = _get_single(by_type, 13, "Res13") + + # Positions + pos_blob = _entry_payload(model_blob, res3) + if len(pos_blob) % 12 != 0: + raise RuntimeError(f"Res3 size is not divisible by 12: {len(pos_blob)}") + vertex_count = len(pos_blob) // 12 + positions = [struct.unpack_from("<3f", pos_blob, i * 12) for i in range(vertex_count)] + + # Indices + idx_blob = _entry_payload(model_blob, res6) + if len(idx_blob) % 2 != 0: + raise RuntimeError(f"Res6 size is not divisible by 2: {len(idx_blob)}") + index_count = len(idx_blob) // 2 + indices = list(struct.unpack_from(f"<{index_count}H", idx_blob, 0)) + + # Batches + batch_blob = _entry_payload(model_blob, res13) + if len(batch_blob) % 20 != 0: + raise RuntimeError(f"Res13 size is not divisible by 20: {len(batch_blob)}") + batch_count = len(batch_blob) // 20 + batches: list[tuple[int, int, int, int]] = [] + for i in range(batch_count): + off = i * 20 + # Keep only fields used by renderer: + # indexCount, indexStart, baseVertex + idx_count = struct.unpack_from("= 38 and len(res1_blob) >= node_count * node_stride: + if lod < 0 or lod > 2: + raise RuntimeError(f"lod must be 0..2 (got {lod})") + if group < 0 or group > 4: + raise RuntimeError(f"group must be 0..4 (got {group})") + matrix_index = lod * 5 + group + for n in range(node_count): + off = n * node_stride + 8 + matrix_index * 2 + slot_idx = struct.unpack_from("= slot_count: + continue + node_slot_indices.append(slot_idx) + + # Build triangle list. + faces: list[tuple[int, int, int]] = [] + used_batches = 0 + used_slots = 0 + + def append_batch(batch_idx: int) -> None: + nonlocal used_batches + if batch_idx < 0 or batch_idx >= len(batches): + return + idx_count, idx_start, base_vertex, _ = batches[batch_idx] + if idx_count < 3: + return + end = idx_start + idx_count + if end > len(indices): + return + used_batches += 1 + tri_count = idx_count // 3 + for t in range(tri_count): + i0 = indices[idx_start + t * 3 + 0] + base_vertex + i1 = indices[idx_start + t * 3 + 1] + base_vertex + i2 = indices[idx_start + t * 3 + 2] + base_vertex + if i0 >= vertex_count or i1 >= vertex_count or i2 >= vertex_count: + continue + faces.append((i0, i1, i2)) + if len(faces) >= max_faces: + return + + if node_slot_indices: + for slot_idx in node_slot_indices: + if len(faces) >= max_faces: + break + _tri_start, _tri_count, batch_start, slot_batch_count = slots[slot_idx] + used_slots += 1 + for bi in range(batch_start, batch_start + slot_batch_count): + append_batch(bi) + if len(faces) >= max_faces: + break + else: + # Fallback if slot matrix is unavailable: draw all batches. + for bi in range(batch_count): + append_batch(bi) + if len(faces) >= max_faces: + break + + meta = { + "vertex_count": vertex_count, + "index_count": index_count, + "batch_count": batch_count, + "slot_count": slot_count, + "node_count": node_count, + "used_slots": used_slots, + "used_batches": used_batches, + "face_count": len(faces), + } + if not faces: + raise RuntimeError("no faces selected for rendering") + return positions, faces, meta + + +def _write_ppm(path: Path, width: int, height: int, rgb: bytearray) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + with path.open("wb") as handle: + handle.write(f"P6\n{width} {height}\n255\n".encode("ascii")) + handle.write(rgb) + + +def _render_software( + positions: list[tuple[float, float, float]], + faces: list[tuple[int, int, int]], + *, + width: int, + height: int, + yaw_deg: float, + pitch_deg: float, + wireframe: bool, +) -> bytearray: + xs = [p[0] for p in positions] + ys = [p[1] for p in positions] + zs = [p[2] for p in positions] + cx = (min(xs) + max(xs)) * 0.5 + cy = (min(ys) + max(ys)) * 0.5 + cz = (min(zs) + max(zs)) * 0.5 + span = max(max(xs) - min(xs), max(ys) - min(ys), max(zs) - min(zs)) + radius = max(span * 0.5, 1e-3) + + yaw = math.radians(yaw_deg) + pitch = math.radians(pitch_deg) + cyaw = math.cos(yaw) + syaw = math.sin(yaw) + cpitch = math.cos(pitch) + spitch = math.sin(pitch) + + camera_dist = radius * 3.2 + scale = min(width, height) * 0.95 + + # Transform all vertices once. + vx: list[float] = [] + vy: list[float] = [] + vz: list[float] = [] + sx: list[float] = [] + sy: list[float] = [] + for x, y, z in positions: + x0 = x - cx + y0 = y - cy + z0 = z - cz + x1 = cyaw * x0 + syaw * z0 + z1 = -syaw * x0 + cyaw * z0 + y2 = cpitch * y0 - spitch * z1 + z2 = spitch * y0 + cpitch * z1 + camera_dist + if z2 < 1e-3: + z2 = 1e-3 + vx.append(x1) + vy.append(y2) + vz.append(z2) + sx.append(width * 0.5 + (x1 / z2) * scale) + sy.append(height * 0.5 - (y2 / z2) * scale) + + rgb = bytearray([16, 18, 24] * (width * height)) + zbuf = [float("inf")] * (width * height) + light_dir = (0.35, 0.45, 1.0) + l_len = math.sqrt(light_dir[0] ** 2 + light_dir[1] ** 2 + light_dir[2] ** 2) + light = (light_dir[0] / l_len, light_dir[1] / l_len, light_dir[2] / l_len) + + def edge(ax: float, ay: float, bx: float, by: float, px: float, py: float) -> float: + return (px - ax) * (by - ay) - (py - ay) * (bx - ax) + + for i0, i1, i2 in faces: + x0 = sx[i0] + y0 = sy[i0] + x1 = sx[i1] + y1 = sy[i1] + x2 = sx[i2] + y2 = sy[i2] + area = edge(x0, y0, x1, y1, x2, y2) + if area == 0.0: + continue + + # Shading from camera-space normal. + ux = vx[i1] - vx[i0] + uy = vy[i1] - vy[i0] + uz = vz[i1] - vz[i0] + wx = vx[i2] - vx[i0] + wy = vy[i2] - vy[i0] + wz = vz[i2] - vz[i0] + nx = uy * wz - uz * wy + ny = uz * wx - ux * wz + nz = ux * wy - uy * wx + n_len = math.sqrt(nx * nx + ny * ny + nz * nz) + if n_len > 0.0: + nx /= n_len + ny /= n_len + nz /= n_len + intensity = nx * light[0] + ny * light[1] + nz * light[2] + if intensity < 0.0: + intensity = 0.0 + shade = int(45 + 200 * intensity) + color = (shade, shade, min(255, shade + 18)) + + minx = int(max(0, math.floor(min(x0, x1, x2)))) + maxx = int(min(width - 1, math.ceil(max(x0, x1, x2)))) + miny = int(max(0, math.floor(min(y0, y1, y2)))) + maxy = int(min(height - 1, math.ceil(max(y0, y1, y2)))) + if minx > maxx or miny > maxy: + continue + + z0 = vz[i0] + z1 = vz[i1] + z2 = vz[i2] + + for py in range(miny, maxy + 1): + fy = py + 0.5 + row = py * width + for px in range(minx, maxx + 1): + fx = px + 0.5 + w0 = edge(x1, y1, x2, y2, fx, fy) + w1 = edge(x2, y2, x0, y0, fx, fy) + w2 = edge(x0, y0, x1, y1, fx, fy) + if area > 0: + if w0 < 0 or w1 < 0 or w2 < 0: + continue + else: + if w0 > 0 or w1 > 0 or w2 > 0: + continue + inv_area = 1.0 / area + bz0 = w0 * inv_area + bz1 = w1 * inv_area + bz2 = w2 * inv_area + depth = bz0 * z0 + bz1 * z1 + bz2 * z2 + idx = row + px + if depth >= zbuf[idx]: + continue + zbuf[idx] = depth + p = idx * 3 + rgb[p + 0] = color[0] + rgb[p + 1] = color[1] + rgb[p + 2] = color[2] + + if wireframe: + def draw_line(xa: float, ya: float, xb: float, yb: float) -> None: + x0i = int(round(xa)) + y0i = int(round(ya)) + x1i = int(round(xb)) + y1i = int(round(yb)) + dx = abs(x1i - x0i) + sx_step = 1 if x0i < x1i else -1 + dy = -abs(y1i - y0i) + sy_step = 1 if y0i < y1i else -1 + err = dx + dy + x = x0i + y = y0i + while True: + if 0 <= x < width and 0 <= y < height: + p = (y * width + x) * 3 + rgb[p + 0] = 240 + rgb[p + 1] = 245 + rgb[p + 2] = 255 + if x == x1i and y == y1i: + break + e2 = 2 * err + if e2 >= dy: + err += dy + x += sx_step + if e2 <= dx: + err += dx + y += sy_step + + for i0, i1, i2 in faces: + draw_line(sx[i0], sy[i0], sx[i1], sy[i1]) + draw_line(sx[i1], sy[i1], sx[i2], sy[i2]) + draw_line(sx[i2], sy[i2], sx[i0], sy[i0]) + + return rgb + + +def cmd_list_models(args: argparse.Namespace) -> int: + archive_path = Path(args.archive).resolve() + blob = archive_path.read_bytes() + parsed = _parse_nres(blob, str(archive_path)) + rows = [row for row in parsed["entries"] if str(row["name"]).lower().endswith(".msh")] + print(f"Archive: {archive_path}") + print(f"MSH entries: {len(rows)}") + for row in rows: + print(f"- {row['name']}") + return 0 + + +def cmd_render(args: argparse.Namespace) -> int: + archive_path = Path(args.archive).resolve() + output_path = Path(args.output).resolve() + + model_blob, model_label = _pick_model_payload(archive_path, args.model) + positions, faces, meta = _extract_geometry( + model_blob, + lod=int(args.lod), + group=int(args.group), + max_faces=int(args.max_faces), + ) + rgb = _render_software( + positions, + faces, + width=int(args.width), + height=int(args.height), + yaw_deg=float(args.yaw), + pitch_deg=float(args.pitch), + wireframe=bool(args.wireframe), + ) + _write_ppm(output_path, int(args.width), int(args.height), rgb) + + print(f"Rendered model: {model_label}") + print(f"Output : {output_path}") + print( + "Geometry : " + f"vertices={meta['vertex_count']}, faces={meta['face_count']}, " + f"batches={meta['used_batches']}/{meta['batch_count']}, slots={meta['used_slots']}/{meta['slot_count']}" + ) + print(f"Mode : lod={args.lod}, group={args.group}, wireframe={bool(args.wireframe)}") + return 0 + + +def build_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser( + description="Primitive NGI MSH renderer (software, dependency-free)." + ) + sub = parser.add_subparsers(dest="command", required=True) + + list_models = sub.add_parser("list-models", help="List .msh entries in an NRes archive.") + list_models.add_argument("--archive", required=True, help="Path to archive (e.g. animals.rlb).") + list_models.set_defaults(func=cmd_list_models) + + render = sub.add_parser("render", help="Render one model to PPM image.") + render.add_argument("--archive", required=True, help="Path to NRes archive or direct model payload.") + render.add_argument( + "--model", + help="Model entry name (*.msh) inside archive. If omitted, first .msh is used.", + ) + render.add_argument("--output", required=True, help="Output .ppm file path.") + render.add_argument("--lod", type=int, default=0, help="LOD index 0..2 (default: 0).") + render.add_argument("--group", type=int, default=0, help="Group index 0..4 (default: 0).") + render.add_argument("--max-faces", type=int, default=120000, help="Face limit (default: 120000).") + render.add_argument("--width", type=int, default=1280, help="Image width (default: 1280).") + render.add_argument("--height", type=int, default=720, help="Image height (default: 720).") + render.add_argument("--yaw", type=float, default=35.0, help="Yaw angle in degrees (default: 35).") + render.add_argument("--pitch", type=float, default=18.0, help="Pitch angle in degrees (default: 18).") + render.add_argument("--wireframe", action="store_true", help="Draw white wireframe overlay.") + render.set_defaults(func=cmd_render) + + return parser + + +def main() -> int: + parser = build_parser() + args = parser.parse_args() + return int(args.func(args)) + + +if __name__ == "__main__": + raise SystemExit(main()) -- cgit v1.2.3