diff options
Diffstat (limited to 'tools')
| -rw-r--r-- | tools/fxid_abs100_audit.py | 262 | ||||
| -rw-r--r-- | tools/terrain_map_doc_validator.py | 809 | ||||
| -rw-r--r-- | tools/terrain_map_preview_renderer.py | 679 |
3 files changed, 1750 insertions, 0 deletions
diff --git a/tools/fxid_abs100_audit.py b/tools/fxid_abs100_audit.py new file mode 100644 index 0000000..79f3b92 --- /dev/null +++ b/tools/fxid_abs100_audit.py @@ -0,0 +1,262 @@ +#!/usr/bin/env python3 +""" +Deterministic audit for FXID "absolute parity" checklist. + +What this script produces: +1) strict parsing stats across all FXID payloads in NRes archives, +2) opcode histogram and rare-branch counters (op6, op1 tail usage), +3) reference vectors for RNG core (sub_10002220 semantics). +""" + +from __future__ import annotations + +import argparse +import json +import struct +from collections import Counter +from pathlib import Path +from typing import Any + +import archive_roundtrip_validator as arv + +TYPE_FXID = 0x44495846 +FX_CMD_SIZE = {1: 224, 2: 148, 3: 200, 4: 204, 5: 112, 6: 4, 7: 208, 8: 248, 9: 208, 10: 208} + + +def _entry_payload(blob: bytes, entry: dict[str, Any]) -> bytes: + start = int(entry["data_offset"]) + end = start + int(entry["size"]) + return blob[start:end] + + +def _cstr32(raw: bytes) -> str: + return raw.split(b"\x00", 1)[0].decode("latin1", errors="replace") + + +def _rng_step_sub_10002220(state32: int) -> tuple[int, int]: + """ + sub_10002220 semantics in 32-bit packed state form: + lo = state[15:0], hi = state[31:16] + new_lo = hi ^ (lo << 1) + new_hi = (hi >> 1) ^ new_lo + return new_hi (u16), update state=(new_hi<<16)|new_lo + """ + lo = state32 & 0xFFFF + hi = (state32 >> 16) & 0xFFFF + new_lo = (hi ^ ((lo << 1) & 0xFFFF)) & 0xFFFF + new_hi = ((hi >> 1) ^ new_lo) & 0xFFFF + return ((new_hi << 16) | new_lo), new_hi + + +def _rng_vectors() -> dict[str, Any]: + seeds = [0x00000000, 0x00000001, 0x12345678, 0x89ABCDEF, 0xFFFFFFFF] + out: list[dict[str, Any]] = [] + for seed in seeds: + state = seed + outputs: list[int] = [] + states: list[int] = [] + for _ in range(16): + state, value = _rng_step_sub_10002220(state) + outputs.append(value) + states.append(state) + out.append( + { + "seed_hex": f"0x{seed:08X}", + "outputs_u16_hex": [f"0x{x:04X}" for x in outputs], + "states_u32_hex": [f"0x{x:08X}" for x in states], + } + ) + return {"generator": "sub_10002220", "vectors": out} + + +def run_audit(root: Path) -> dict[str, Any]: + counters: Counter[str] = Counter() + opcode_hist: Counter[int] = Counter() + issues: list[dict[str, Any]] = [] + op1_tail6_samples: list[dict[str, Any]] = [] + op1_optref_samples: list[dict[str, Any]] = [] + + for item in arv.scan_archives(root): + if item["type"] != "nres": + continue + archive_path = root / item["relative_path"] + counters["archives_total"] += 1 + data = archive_path.read_bytes() + try: + parsed = arv.parse_nres(data, source=str(archive_path)) + except Exception as exc: # pylint: disable=broad-except + issues.append( + { + "severity": "error", + "archive": str(archive_path), + "entry": None, + "message": f"cannot parse NRes: {exc}", + } + ) + continue + + for entry in parsed["entries"]: + if int(entry["type_id"]) != TYPE_FXID: + continue + counters["fxid_total"] += 1 + payload = _entry_payload(data, entry) + entry_name = str(entry["name"]) + + if len(payload) < 60: + issues.append( + { + "severity": "error", + "archive": str(archive_path), + "entry": entry_name, + "message": f"payload too small: {len(payload)}", + } + ) + continue + + cmd_count = struct.unpack_from("<I", payload, 0)[0] + ptr = 0x3C + ok = True + for idx in range(cmd_count): + if ptr + 4 > len(payload): + issues.append( + { + "severity": "error", + "archive": str(archive_path), + "entry": entry_name, + "message": f"command {idx}: missing header at offset={ptr}", + } + ) + ok = False + break + + word = struct.unpack_from("<I", payload, ptr)[0] + opcode = word & 0xFF + size = FX_CMD_SIZE.get(opcode) + if size is None: + issues.append( + { + "severity": "error", + "archive": str(archive_path), + "entry": entry_name, + "message": f"command {idx}: unknown opcode={opcode} at offset={ptr}", + } + ) + ok = False + break + + if ptr + size > len(payload): + issues.append( + { + "severity": "error", + "archive": str(archive_path), + "entry": entry_name, + "message": f"command {idx}: truncated end={ptr + size}, payload={len(payload)}", + } + ) + ok = False + break + + opcode_hist[opcode] += 1 + if opcode == 6: + counters["op6_commands"] += 1 + if opcode == 1: + tail6 = payload[ptr + 136 : ptr + 160] + if any(tail6): + counters["op1_tail6_nonzero"] += 1 + if len(op1_tail6_samples) < 16: + dwords = list(struct.unpack("<6I", tail6)) + op1_tail6_samples.append( + { + "archive": str(archive_path), + "entry": entry_name, + "cmd_index": idx, + "tail6_u32_hex": [f"0x{x:08X}" for x in dwords], + } + ) + + archive_s = _cstr32(payload[ptr + 160 : ptr + 192]) + name_s = _cstr32(payload[ptr + 192 : ptr + 224]) + if archive_s or name_s: + counters["op1_optref_nonempty"] += 1 + if len(op1_optref_samples) < 16: + op1_optref_samples.append( + { + "archive": str(archive_path), + "entry": entry_name, + "cmd_index": idx, + "opt_archive": archive_s, + "opt_name": name_s, + } + ) + + ptr += size + + if ok and ptr != len(payload): + issues.append( + { + "severity": "error", + "archive": str(archive_path), + "entry": entry_name, + "message": f"tail bytes after command stream: parsed_end={ptr}, payload={len(payload)}", + } + ) + ok = False + + if ok: + counters["fxid_ok"] += 1 + + return { + "input_root": str(root), + "summary": { + "archives_total": counters["archives_total"], + "fxid_total": counters["fxid_total"], + "fxid_ok": counters["fxid_ok"], + "issues_total": len(issues), + "op6_commands": counters["op6_commands"], + "op1_tail6_nonzero": counters["op1_tail6_nonzero"], + "op1_optref_nonempty": counters["op1_optref_nonempty"], + }, + "opcode_histogram": {str(k): opcode_hist[k] for k in sorted(opcode_hist)}, + "op1_tail6_samples": op1_tail6_samples, + "op1_optref_samples": op1_optref_samples, + "rng_reference": _rng_vectors(), + "rng_states_fx_path": [ + {"state": "dword_10023688", "seed_init": "sub_10002660", "used_by": ["sub_10001720", "sub_10001A40"]}, + {"state": "dword_100238C0", "seed_init": "sub_10003A50", "used_by": ["sub_10002BE0"]}, + {"state": "dword_10024110", "seed_init": "sub_10009180", "used_by": ["sub_10008120", "sub_10007D10"]}, + {"state": "dword_10024810", "seed_init": "sub_1000D370", "used_by": ["sub_1000BF30", "sub_1000C1A0"]}, + {"state": "dword_10024A48", "seed_init": "sub_1000F420", "used_by": ["sub_1000EC50"]}, + {"state": "dword_10024C80", "seed_init": "sub_10010370", "used_by": ["sub_1000F6E0"]}, + {"state": "dword_100250F0", "seed_init": "sub_10012C70", "used_by": ["sub_10011230", "sub_100115C0"]}, + ], + "issues": issues, + } + + +def main() -> int: + parser = argparse.ArgumentParser(description="FXID absolute parity audit.") + parser.add_argument("--input", required=True, help="Root directory with game/test archives.") + parser.add_argument("--report", required=True, help="Output JSON report path.") + args = parser.parse_args() + + root = Path(args.input).resolve() + report_path = Path(args.report).resolve() + payload = run_audit(root) + report_path.parent.mkdir(parents=True, exist_ok=True) + report_path.write_text(json.dumps(payload, indent=2, ensure_ascii=False) + "\n", encoding="utf-8") + + summary = payload["summary"] + print(f"Input root : {root}") + print(f"NRes archives : {summary['archives_total']}") + print(f"FXID payloads : {summary['fxid_ok']}/{summary['fxid_total']} valid") + print(f"Issues : {summary['issues_total']}") + print(f"Opcode6 commands : {summary['op6_commands']}") + print(f"Op1 tail6 nonzero : {summary['op1_tail6_nonzero']}") + print(f"Op1 optref non-empty : {summary['op1_optref_nonempty']}") + print(f"Report : {report_path}") + + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/tools/terrain_map_doc_validator.py b/tools/terrain_map_doc_validator.py new file mode 100644 index 0000000..63c3077 --- /dev/null +++ b/tools/terrain_map_doc_validator.py @@ -0,0 +1,809 @@ +#!/usr/bin/env python3 +""" +Validate terrain/map documentation assumptions against real game data. + +Targets: +- tmp/gamedata/DATA/MAPS/**/Land.msh +- tmp/gamedata/DATA/MAPS/**/Land.map +""" + +from __future__ import annotations + +import argparse +import json +import math +import struct +from collections import Counter, defaultdict +from dataclasses import dataclass +from pathlib import Path +from typing import Any + +import archive_roundtrip_validator as arv + +MAGIC_NRES = b"NRes" + +REQUIRED_MSH_TYPES = (1, 2, 3, 4, 5, 11, 18, 21) +OPTIONAL_MSH_TYPES = (14,) +EXPECTED_MSH_ORDER = (1, 2, 3, 4, 5, 18, 14, 11, 21) + +MSH_STRIDES = { + 1: 38, + 3: 12, + 4: 4, + 5: 4, + 11: 4, + 14: 4, + 18: 4, + 21: 28, +} + +SLOT_TABLE_OFFSET = 0x8C + + +@dataclass +class ValidationIssue: + severity: str # error | warning + category: str + resource: str + message: str + + +class TerrainMapDocValidator: + def __init__(self) -> None: + self.issues: list[ValidationIssue] = [] + self.stats: dict[str, Any] = { + "maps_total": 0, + "msh_total": 0, + "map_total": 0, + "msh_type_orders": Counter(), + "msh_attr_triplets": defaultdict(Counter), # type_id -> Counter[(a1,a2,a3)] + "msh_type11_header_words": Counter(), + "msh_type21_flags_top": Counter(), + "map_logic_flags": Counter(), + "map_class_ids": Counter(), # record +40 + "map_poly_count": Counter(), + "map_vertex_count_min": None, + "map_vertex_count_max": None, + "map_cell_dims": Counter(), + "map_reserved_u12": Counter(), + "map_reserved_u36": Counter(), + "map_reserved_u44": Counter(), + "map_area_delta_abs_max": 0.0, + "map_area_delta_rel_max": 0.0, + "map_area_rel_gt_05_count": 0, + "map_normal_len_min": None, + "map_normal_len_max": None, + "map_records_total": 0, + } + + def add_issue(self, severity: str, category: str, resource: Path, message: str) -> None: + self.issues.append( + ValidationIssue( + severity=severity, + category=category, + resource=str(resource), + message=message, + ) + ) + + def _entry_payload(self, blob: bytes, entry: dict[str, Any]) -> bytes: + start = int(entry["data_offset"]) + end = start + int(entry["size"]) + return blob[start:end] + + def _entry_by_type(self, entries: list[dict[str, Any]]) -> dict[int, list[dict[str, Any]]]: + by_type: dict[int, list[dict[str, Any]]] = {} + for item in entries: + by_type.setdefault(int(item["type_id"]), []).append(item) + return by_type + + def _expect_single_type( + self, + *, + by_type: dict[int, list[dict[str, Any]]], + type_id: int, + label: str, + resource: Path, + required: bool, + ) -> dict[str, Any] | None: + rows = by_type.get(type_id, []) + if not rows: + if required: + self.add_issue( + "error", + "msh-chunk", + resource, + f"missing required chunk type={type_id} ({label})", + ) + return None + if len(rows) > 1: + self.add_issue( + "warning", + "msh-chunk", + resource, + f"multiple chunks type={type_id} ({label}); using first", + ) + return rows[0] + + def _check_stride( + self, + *, + resource: Path, + entry: dict[str, Any], + stride: int, + label: str, + ) -> int: + size = int(entry["size"]) + attr1 = int(entry["attr1"]) + attr2 = int(entry["attr2"]) + attr3 = int(entry["attr3"]) + self.stats["msh_attr_triplets"][int(entry["type_id"])][(attr1, attr2, attr3)] += 1 + + if size % stride != 0: + self.add_issue( + "error", + "msh-stride", + resource, + f"{label}: size={size} is not divisible by stride={stride}", + ) + return -1 + + count = size // stride + if attr1 != count: + self.add_issue( + "error", + "msh-attr", + resource, + f"{label}: attr1={attr1} != size/stride={count}", + ) + if attr3 != stride: + self.add_issue( + "error", + "msh-attr", + resource, + f"{label}: attr3={attr3} != {stride}", + ) + if attr2 != 0 and int(entry["type_id"]) not in (1,): + # type 1 has non-zero attr2 in real assets, others are expected zero. + self.add_issue( + "warning", + "msh-attr", + resource, + f"{label}: attr2={attr2} (expected 0 for this chunk type)", + ) + return count + + def validate_msh(self, path: Path) -> None: + self.stats["msh_total"] += 1 + blob = path.read_bytes() + if blob[:4] != MAGIC_NRES: + self.add_issue("error", "msh-container", path, "file is not NRes") + return + + try: + parsed = arv.parse_nres(blob, source=str(path)) + except Exception as exc: # pylint: disable=broad-except + self.add_issue("error", "msh-container", path, f"failed to parse NRes: {exc}") + return + + for issue in parsed.get("issues", []): + self.add_issue("warning", "msh-nres", path, issue) + + entries = parsed["entries"] + types_order = tuple(int(item["type_id"]) for item in entries) + self.stats["msh_type_orders"][types_order] += 1 + if types_order != EXPECTED_MSH_ORDER: + self.add_issue( + "warning", + "msh-order", + path, + f"unexpected chunk order {types_order}, expected {EXPECTED_MSH_ORDER}", + ) + + by_type = self._entry_by_type(entries) + + chunks: dict[int, dict[str, Any]] = {} + for type_id in REQUIRED_MSH_TYPES: + chunk = self._expect_single_type( + by_type=by_type, + type_id=type_id, + label=f"type{type_id}", + resource=path, + required=True, + ) + if chunk: + chunks[type_id] = chunk + for type_id in OPTIONAL_MSH_TYPES: + chunk = self._expect_single_type( + by_type=by_type, + type_id=type_id, + label=f"type{type_id}", + resource=path, + required=False, + ) + if chunk: + chunks[type_id] = chunk + + for type_id, stride in MSH_STRIDES.items(): + chunk = chunks.get(type_id) + if not chunk: + continue + self._check_stride(resource=path, entry=chunk, stride=stride, label=f"type{type_id}") + + # type 2 includes 0x8C-byte header + 68-byte slot table entries. + type2 = chunks.get(2) + if type2: + size = int(type2["size"]) + attr1 = int(type2["attr1"]) + attr2 = int(type2["attr2"]) + attr3 = int(type2["attr3"]) + self.stats["msh_attr_triplets"][2][(attr1, attr2, attr3)] += 1 + if attr3 != 68: + self.add_issue( + "error", + "msh-attr", + path, + f"type2: attr3={attr3} != 68", + ) + if attr2 != 0: + self.add_issue( + "warning", + "msh-attr", + path, + f"type2: attr2={attr2} (expected 0)", + ) + if size < SLOT_TABLE_OFFSET: + self.add_issue( + "error", + "msh-size", + path, + f"type2: size={size} < header_size={SLOT_TABLE_OFFSET}", + ) + elif (size - SLOT_TABLE_OFFSET) % 68 != 0: + self.add_issue( + "error", + "msh-size", + path, + f"type2: (size - 0x8C) is not divisible by 68 (size={size})", + ) + else: + slots_by_size = (size - SLOT_TABLE_OFFSET) // 68 + if attr1 != slots_by_size: + self.add_issue( + "error", + "msh-attr", + path, + f"type2: attr1={attr1} != (size-0x8C)/68={slots_by_size}", + ) + + verts = chunks.get(3) + face = chunks.get(21) + slots = chunks.get(2) + nodes = chunks.get(1) + type11 = chunks.get(11) + + if verts and face: + vcount = int(verts["attr1"]) + face_payload = self._entry_payload(blob, face) + fcount = int(face["attr1"]) + if len(face_payload) >= 28: + for idx in range(fcount): + off = idx * 28 + if off + 28 > len(face_payload): + self.add_issue( + "error", + "msh-face", + path, + f"type21 truncated at face {idx}", + ) + break + flags = struct.unpack_from("<I", face_payload, off)[0] + self.stats["msh_type21_flags_top"][flags] += 1 + i0, i1, i2 = struct.unpack_from("<HHH", face_payload, off + 8) + for name, value in (("i0", i0), ("i1", i1), ("i2", i2)): + if value >= vcount: + self.add_issue( + "error", + "msh-face-index", + path, + f"type21[{idx}].{name}={value} out of range vertex_count={vcount}", + ) + n0, n1, n2 = struct.unpack_from("<HHH", face_payload, off + 14) + for name, value in (("n0", n0), ("n1", n1), ("n2", n2)): + if value != 0xFFFF and value >= fcount: + self.add_issue( + "error", + "msh-face-neighbour", + path, + f"type21[{idx}].{name}={value} out of range face_count={fcount}", + ) + + if slots and face: + slot_count = int(slots["attr1"]) + face_count = int(face["attr1"]) + slot_payload = self._entry_payload(blob, slots) + need = SLOT_TABLE_OFFSET + slot_count * 68 + if len(slot_payload) < need: + self.add_issue( + "error", + "msh-slot", + path, + f"type2 payload too short: size={len(slot_payload)}, need_at_least={need}", + ) + else: + if len(slot_payload) != need: + self.add_issue( + "warning", + "msh-slot", + path, + f"type2 payload has trailing bytes: size={len(slot_payload)}, expected={need}", + ) + for idx in range(slot_count): + off = SLOT_TABLE_OFFSET + idx * 68 + tri_start, tri_count = struct.unpack_from("<HH", slot_payload, off) + if tri_start + tri_count > face_count: + self.add_issue( + "error", + "msh-slot-range", + path, + f"type2 slot[{idx}] range [{tri_start}, {tri_start + tri_count}) exceeds face_count={face_count}", + ) + + if nodes and slots: + node_payload = self._entry_payload(blob, nodes) + slot_count = int(slots["attr1"]) + node_count = int(nodes["attr1"]) + for node_idx in range(node_count): + off = node_idx * 38 + if off + 38 > len(node_payload): + self.add_issue( + "error", + "msh-node", + path, + f"type1 truncated at node {node_idx}", + ) + break + for j in range(19): + slot_id = struct.unpack_from("<H", node_payload, off + j * 2)[0] + if slot_id != 0xFFFF and slot_id >= slot_count: + self.add_issue( + "error", + "msh-node-slot", + path, + f"type1 node[{node_idx}] slot[{j}]={slot_id} out of range slot_count={slot_count}", + ) + + if type11: + payload = self._entry_payload(blob, type11) + if len(payload) >= 8: + w0, w1 = struct.unpack_from("<II", payload, 0) + self.stats["msh_type11_header_words"][(w0, w1)] += 1 + else: + self.add_issue( + "error", + "msh-type11", + path, + f"type11 payload too short: {len(payload)}", + ) + + def _update_minmax(self, key_min: str, key_max: str, value: float) -> None: + if self.stats[key_min] is None or value < self.stats[key_min]: + self.stats[key_min] = value + if self.stats[key_max] is None or value > self.stats[key_max]: + self.stats[key_max] = value + + def validate_map(self, path: Path) -> None: + self.stats["map_total"] += 1 + blob = path.read_bytes() + if blob[:4] != MAGIC_NRES: + self.add_issue("error", "map-container", path, "file is not NRes") + return + + try: + parsed = arv.parse_nres(blob, source=str(path)) + except Exception as exc: # pylint: disable=broad-except + self.add_issue("error", "map-container", path, f"failed to parse NRes: {exc}") + return + + for issue in parsed.get("issues", []): + self.add_issue("warning", "map-nres", path, issue) + + entries = parsed["entries"] + if len(entries) != 1 or int(entries[0]["type_id"]) != 12: + self.add_issue( + "error", + "map-chunk", + path, + f"expected single chunk type=12, got {[int(e['type_id']) for e in entries]}", + ) + return + + entry = entries[0] + areal_count = int(entry["attr1"]) + if areal_count <= 0: + self.add_issue("error", "map-areal", path, f"invalid areal_count={areal_count}") + return + + payload = self._entry_payload(blob, entry) + ptr = 0 + records: list[dict[str, Any]] = [] + + for idx in range(areal_count): + if ptr + 56 > len(payload): + self.add_issue( + "error", + "map-record", + path, + f"truncated areal header at index={idx}, ptr={ptr}, size={len(payload)}", + ) + return + + anchor_x, anchor_y, anchor_z = struct.unpack_from("<fff", payload, ptr) + u12 = struct.unpack_from("<I", payload, ptr + 12)[0] + area_f = struct.unpack_from("<f", payload, ptr + 16)[0] + nx, ny, nz = struct.unpack_from("<fff", payload, ptr + 20) + logic_flag = struct.unpack_from("<I", payload, ptr + 32)[0] + u36 = struct.unpack_from("<I", payload, ptr + 36)[0] + class_id = struct.unpack_from("<I", payload, ptr + 40)[0] + u44 = struct.unpack_from("<I", payload, ptr + 44)[0] + vertex_count, poly_count = struct.unpack_from("<II", payload, ptr + 48) + + self.stats["map_records_total"] += 1 + self.stats["map_logic_flags"][logic_flag] += 1 + self.stats["map_class_ids"][class_id] += 1 + self.stats["map_poly_count"][poly_count] += 1 + self.stats["map_reserved_u12"][u12] += 1 + self.stats["map_reserved_u36"][u36] += 1 + self.stats["map_reserved_u44"][u44] += 1 + self._update_minmax("map_vertex_count_min", "map_vertex_count_max", float(vertex_count)) + + normal_len = math.sqrt(nx * nx + ny * ny + nz * nz) + self._update_minmax("map_normal_len_min", "map_normal_len_max", normal_len) + if abs(normal_len - 1.0) > 1e-3: + self.add_issue( + "warning", + "map-normal", + path, + f"record[{idx}] normal length={normal_len:.6f} (expected ~1.0)", + ) + + vertices_off = ptr + 56 + vertices_size = 12 * vertex_count + if vertices_off + vertices_size > len(payload): + self.add_issue( + "error", + "map-vertices", + path, + f"record[{idx}] vertices out of bounds", + ) + return + + vertices: list[tuple[float, float, float]] = [] + for i in range(vertex_count): + vertices.append(struct.unpack_from("<fff", payload, vertices_off + i * 12)) + + if vertex_count >= 3: + # signed shoelace area in XY. + shoelace = 0.0 + for i in range(vertex_count): + x1, y1, _ = vertices[i] + x2, y2, _ = vertices[(i + 1) % vertex_count] + shoelace += x1 * y2 - x2 * y1 + area_xy = abs(shoelace) * 0.5 + delta = abs(area_xy - area_f) + if delta > self.stats["map_area_delta_abs_max"]: + self.stats["map_area_delta_abs_max"] = delta + rel_delta = delta / max(1.0, area_xy) + if rel_delta > self.stats["map_area_delta_rel_max"]: + self.stats["map_area_delta_rel_max"] = rel_delta + if rel_delta > 0.05: + self.stats["map_area_rel_gt_05_count"] += 1 + + links_off = vertices_off + vertices_size + link_count = vertex_count + 3 * poly_count + links_size = 8 * link_count + if links_off + links_size > len(payload): + self.add_issue( + "error", + "map-links", + path, + f"record[{idx}] link table out of bounds", + ) + return + + edge_links: list[tuple[int, int]] = [] + for i in range(vertex_count): + area_ref, edge_ref = struct.unpack_from("<ii", payload, links_off + i * 8) + edge_links.append((area_ref, edge_ref)) + + poly_links_off = links_off + 8 * vertex_count + poly_links: list[tuple[int, int]] = [] + for i in range(3 * poly_count): + area_ref, edge_ref = struct.unpack_from("<ii", payload, poly_links_off + i * 8) + poly_links.append((area_ref, edge_ref)) + + p = links_off + links_size + for poly_idx in range(poly_count): + if p + 4 > len(payload): + self.add_issue( + "error", + "map-poly", + path, + f"record[{idx}] poly header truncated at poly_idx={poly_idx}", + ) + return + n = struct.unpack_from("<I", payload, p)[0] + poly_size = 4 * (3 * n + 1) + if p + poly_size > len(payload): + self.add_issue( + "error", + "map-poly", + path, + f"record[{idx}] poly data out of bounds at poly_idx={poly_idx}", + ) + return + p += poly_size + + records.append( + { + "index": idx, + "anchor": (anchor_x, anchor_y, anchor_z), + "logic": logic_flag, + "class_id": class_id, + "vertex_count": vertex_count, + "poly_count": poly_count, + "edge_links": edge_links, + "poly_links": poly_links, + } + ) + ptr = p + + vertex_counts = [int(item["vertex_count"]) for item in records] + for rec in records: + idx = int(rec["index"]) + for link_idx, (area_ref, edge_ref) in enumerate(rec["edge_links"]): + if area_ref == -1: + if edge_ref != -1: + self.add_issue( + "warning", + "map-link", + path, + f"record[{idx}] edge_link[{link_idx}] has area_ref=-1 but edge_ref={edge_ref}", + ) + continue + if area_ref < 0 or area_ref >= areal_count: + self.add_issue( + "error", + "map-link", + path, + f"record[{idx}] edge_link[{link_idx}] area_ref={area_ref} out of range", + ) + continue + dst_vcount = vertex_counts[area_ref] + if edge_ref < 0 or edge_ref >= dst_vcount: + self.add_issue( + "error", + "map-link", + path, + f"record[{idx}] edge_link[{link_idx}] edge_ref={edge_ref} out of range dst_vertex_count={dst_vcount}", + ) + + for link_idx, (area_ref, edge_ref) in enumerate(rec["poly_links"]): + if area_ref == -1: + if edge_ref != -1: + self.add_issue( + "warning", + "map-poly-link", + path, + f"record[{idx}] poly_link[{link_idx}] has area_ref=-1 but edge_ref={edge_ref}", + ) + continue + if area_ref < 0 or area_ref >= areal_count: + self.add_issue( + "error", + "map-poly-link", + path, + f"record[{idx}] poly_link[{link_idx}] area_ref={area_ref} out of range", + ) + + if ptr + 8 > len(payload): + self.add_issue( + "error", + "map-cells", + path, + f"missing cells header at ptr={ptr}, size={len(payload)}", + ) + return + + cells_x, cells_y = struct.unpack_from("<II", payload, ptr) + self.stats["map_cell_dims"][(cells_x, cells_y)] += 1 + ptr += 8 + if cells_x <= 0 or cells_y <= 0: + self.add_issue( + "error", + "map-cells", + path, + f"invalid cells dimensions {cells_x}x{cells_y}", + ) + return + + for x in range(cells_x): + for y in range(cells_y): + if ptr + 2 > len(payload): + self.add_issue( + "error", + "map-cells", + path, + f"truncated hitCount at cell ({x},{y})", + ) + return + hit_count = struct.unpack_from("<H", payload, ptr)[0] + ptr += 2 + need = 2 * hit_count + if ptr + need > len(payload): + self.add_issue( + "error", + "map-cells", + path, + f"truncated areaIds at cell ({x},{y}), hitCount={hit_count}", + ) + return + for i in range(hit_count): + area_id = struct.unpack_from("<H", payload, ptr + 2 * i)[0] + if area_id >= areal_count: + self.add_issue( + "error", + "map-cells", + path, + f"cell ({x},{y}) has area_id={area_id} out of range areal_count={areal_count}", + ) + ptr += need + + if ptr != len(payload): + self.add_issue( + "error", + "map-size", + path, + f"payload tail mismatch: consumed={ptr}, payload_size={len(payload)}", + ) + + def validate(self, maps_root: Path) -> None: + msh_paths = sorted(maps_root.rglob("Land.msh")) + map_paths = sorted(maps_root.rglob("Land.map")) + + msh_by_dir = {path.parent: path for path in msh_paths} + map_by_dir = {path.parent: path for path in map_paths} + + all_dirs = sorted(set(msh_by_dir) | set(map_by_dir)) + self.stats["maps_total"] = len(all_dirs) + + for folder in all_dirs: + msh_path = msh_by_dir.get(folder) + map_path = map_by_dir.get(folder) + if msh_path is None: + self.add_issue("error", "pairing", folder, "missing Land.msh") + continue + if map_path is None: + self.add_issue("error", "pairing", folder, "missing Land.map") + continue + self.validate_msh(msh_path) + self.validate_map(map_path) + + def build_report(self) -> dict[str, Any]: + errors = [i for i in self.issues if i.severity == "error"] + warnings = [i for i in self.issues if i.severity == "warning"] + + # Convert counters/defaultdicts to JSON-friendly dicts. + msh_orders = { + str(list(order)): count + for order, count in self.stats["msh_type_orders"].most_common() + } + msh_attrs = { + str(type_id): {str(list(k)): v for k, v in counter.most_common()} + for type_id, counter in self.stats["msh_attr_triplets"].items() + } + type11_hdr = { + str(list(key)): value + for key, value in self.stats["msh_type11_header_words"].most_common() + } + type21_flags = { + f"0x{key:08X}": value + for key, value in self.stats["msh_type21_flags_top"].most_common(32) + } + + return { + "summary": { + "maps_total": self.stats["maps_total"], + "msh_total": self.stats["msh_total"], + "map_total": self.stats["map_total"], + "issues_total": len(self.issues), + "errors_total": len(errors), + "warnings_total": len(warnings), + }, + "stats": { + "msh_type_orders": msh_orders, + "msh_attr_triplets": msh_attrs, + "msh_type11_header_words": type11_hdr, + "msh_type21_flags_top": type21_flags, + "map_logic_flags": dict(self.stats["map_logic_flags"]), + "map_class_ids": dict(self.stats["map_class_ids"]), + "map_poly_count": dict(self.stats["map_poly_count"]), + "map_vertex_count_min": self.stats["map_vertex_count_min"], + "map_vertex_count_max": self.stats["map_vertex_count_max"], + "map_cell_dims": {str(list(k)): v for k, v in self.stats["map_cell_dims"].items()}, + "map_reserved_u12": dict(self.stats["map_reserved_u12"]), + "map_reserved_u36": dict(self.stats["map_reserved_u36"]), + "map_reserved_u44": dict(self.stats["map_reserved_u44"]), + "map_area_delta_abs_max": self.stats["map_area_delta_abs_max"], + "map_area_delta_rel_max": self.stats["map_area_delta_rel_max"], + "map_area_rel_gt_05_count": self.stats["map_area_rel_gt_05_count"], + "map_normal_len_min": self.stats["map_normal_len_min"], + "map_normal_len_max": self.stats["map_normal_len_max"], + "map_records_total": self.stats["map_records_total"], + }, + "issues": [ + { + "severity": item.severity, + "category": item.category, + "resource": item.resource, + "message": item.message, + } + for item in self.issues + ], + } + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser(description="Validate terrain/map doc assumptions") + parser.add_argument( + "--maps-root", + type=Path, + default=Path("tmp/gamedata/DATA/MAPS"), + help="Root directory containing MAPS/**/Land.msh and Land.map", + ) + parser.add_argument( + "--report-json", + type=Path, + default=None, + help="Optional path to save full JSON report", + ) + parser.add_argument( + "--fail-on-warning", + action="store_true", + help="Return non-zero exit code on warnings too", + ) + return parser.parse_args() + + +def main() -> int: + args = parse_args() + validator = TerrainMapDocValidator() + validator.validate(args.maps_root) + report = validator.build_report() + + print( + json.dumps( + report["summary"], + indent=2, + ensure_ascii=False, + ) + ) + + if args.report_json: + args.report_json.parent.mkdir(parents=True, exist_ok=True) + with args.report_json.open("w", encoding="utf-8") as handle: + json.dump(report, handle, indent=2, ensure_ascii=False) + handle.write("\n") + print(f"report written: {args.report_json}") + + has_errors = report["summary"]["errors_total"] > 0 + has_warnings = report["summary"]["warnings_total"] > 0 + if has_errors: + return 1 + if args.fail_on_warning and has_warnings: + return 1 + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/tools/terrain_map_preview_renderer.py b/tools/terrain_map_preview_renderer.py new file mode 100644 index 0000000..86d72d7 --- /dev/null +++ b/tools/terrain_map_preview_renderer.py @@ -0,0 +1,679 @@ +#!/usr/bin/env python3 +""" +Software 3D renderer for terrain Land.msh + Land.map overlay. + +Output format: binary PPM (P6), dependency-free. +""" + +from __future__ import annotations + +import argparse +import math +import struct +from pathlib import Path +from typing import Any + +import archive_roundtrip_validator as arv + +MAGIC_NRES = b"NRes" + + +def _entry_payload(blob: bytes, entry: dict[str, Any]) -> bytes: + start = int(entry["data_offset"]) + end = start + int(entry["size"]) + return blob[start:end] + + +def _parse_nres(blob: bytes, source: str) -> dict[str, Any]: + if blob[:4] != MAGIC_NRES: + raise RuntimeError(f"{source}: not an NRes payload") + return arv.parse_nres(blob, source=source) + + +def _by_type(entries: list[dict[str, Any]]) -> dict[int, list[dict[str, Any]]]: + out: dict[int, list[dict[str, Any]]] = {} + for row in entries: + out.setdefault(int(row["type_id"]), []).append(row) + return out + + +def _get_single(by_type: dict[int, list[dict[str, Any]]], type_id: int, label: str) -> dict[str, Any]: + rows = by_type.get(type_id, []) + if not rows: + raise RuntimeError(f"missing resource type {type_id} ({label})") + return rows[0] + + +def _downsample_faces( + faces: list[tuple[int, int, int]], + max_faces: int, +) -> list[tuple[int, int, int]]: + if max_faces <= 0 or len(faces) <= max_faces: + return faces + step = len(faces) / max_faces + out: list[tuple[int, int, int]] = [] + pos = 0.0 + while len(out) < max_faces and int(pos) < len(faces): + out.append(faces[int(pos)]) + pos += step + return out + + +def load_terrain_msh( + path: Path, + *, + max_faces: int, +) -> tuple[list[tuple[float, float, float]], list[tuple[int, int, int]], dict[str, int]]: + blob = path.read_bytes() + parsed = _parse_nres(blob, str(path)) + by_type = _by_type(parsed["entries"]) + + res3 = _get_single(by_type, 3, "positions") + res21 = _get_single(by_type, 21, "terrain faces") + + pos_blob = _entry_payload(blob, res3) + if len(pos_blob) % 12 != 0: + raise RuntimeError(f"{path}: type 3 payload size is not divisible by 12") + vertex_count = len(pos_blob) // 12 + positions = [struct.unpack_from("<3f", pos_blob, i * 12) for i in range(vertex_count)] + + face_blob = _entry_payload(blob, res21) + if len(face_blob) % 28 != 0: + raise RuntimeError(f"{path}: type 21 payload size is not divisible by 28") + all_faces: list[tuple[int, int, int]] = [] + raw_face_count = len(face_blob) // 28 + dropped = 0 + for i in range(raw_face_count): + off = i * 28 + i0, i1, i2 = struct.unpack_from("<HHH", face_blob, off + 8) + if i0 >= vertex_count or i1 >= vertex_count or i2 >= vertex_count: + dropped += 1 + continue + all_faces.append((i0, i1, i2)) + + faces = _downsample_faces(all_faces, max_faces) + meta = { + "vertex_count": vertex_count, + "face_count_raw": raw_face_count, + "face_count_valid": len(all_faces), + "face_count_rendered": len(faces), + "face_dropped_invalid": dropped, + } + return positions, faces, meta + + +def load_areal_map(path: Path) -> tuple[list[dict[str, Any]], dict[str, int]]: + blob = path.read_bytes() + parsed = _parse_nres(blob, str(path)) + by_type = _by_type(parsed["entries"]) + chunk = _get_single(by_type, 12, "ArealMapGeometry") + + payload = _entry_payload(blob, chunk) + areal_count = int(chunk["attr1"]) + ptr = 0 + areals: list[dict[str, Any]] = [] + for idx in range(areal_count): + if ptr + 56 > len(payload): + raise RuntimeError(f"{path}: truncated areal header at index={idx}") + class_id = struct.unpack_from("<I", payload, ptr + 40)[0] + vertex_count, poly_count = struct.unpack_from("<II", payload, ptr + 48) + verts_off = ptr + 56 + verts_size = 12 * vertex_count + if verts_off + verts_size > len(payload): + raise RuntimeError(f"{path}: areal[{idx}] vertices out of bounds") + verts = [struct.unpack_from("<3f", payload, verts_off + 12 * i) for i in range(vertex_count)] + + links_off = verts_off + verts_size + links_size = 8 * (vertex_count + 3 * poly_count) + p = links_off + links_size + for _ in range(poly_count): + if p + 4 > len(payload): + raise RuntimeError(f"{path}: areal[{idx}] poly header out of bounds") + n = struct.unpack_from("<I", payload, p)[0] + p += 4 * (3 * n + 1) + if p > len(payload): + raise RuntimeError(f"{path}: areal[{idx}] poly data out of bounds") + + areals.append( + { + "index": idx, + "class_id": class_id, + "vertices": verts, + } + ) + ptr = p + + if ptr + 8 > len(payload): + raise RuntimeError(f"{path}: missing cells section") + cells_x, cells_y = struct.unpack_from("<II", payload, ptr) + ptr += 8 + for _x in range(cells_x): + for _y in range(cells_y): + if ptr + 2 > len(payload): + raise RuntimeError(f"{path}: cells section truncated") + hit_count = struct.unpack_from("<H", payload, ptr)[0] + ptr += 2 + 2 * hit_count + if ptr > len(payload): + raise RuntimeError(f"{path}: cells section out of bounds") + if ptr != len(payload): + raise RuntimeError(f"{path}: trailing bytes in chunk12 parse ({len(payload) - ptr})") + + meta = { + "areal_count": areal_count, + "cells_x": cells_x, + "cells_y": cells_y, + } + return areals, meta + + +def _color_for_class(class_id: int) -> tuple[int, int, int]: + x = (class_id * 1103515245 + 12345) & 0x7FFFFFFF + r = 60 + (x & 0x7F) + g = 60 + ((x >> 7) & 0x7F) + b = 60 + ((x >> 14) & 0x7F) + return r, g, b + + +def _write_ppm(path: Path, width: int, height: int, rgb: bytearray) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + with path.open("wb") as handle: + handle.write(f"P6\n{width} {height}\n255\n".encode("ascii")) + handle.write(rgb) + + +def _write_obj( + path: Path, + terrain_positions: list[tuple[float, float, float]], + terrain_faces: list[tuple[int, int, int]], + areals: list[dict[str, Any]], + *, + include_areals: bool, +) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + with path.open("w", encoding="utf-8", newline="\n") as out: + out.write("# Exported by terrain_map_preview_renderer.py\n") + out.write("o terrain\n") + for x, y, z in terrain_positions: + out.write(f"v {x:.9g} {y:.9g} {z:.9g}\n") + for i0, i1, i2 in terrain_faces: + # OBJ indices are 1-based. + out.write(f"f {i0 + 1} {i1 + 1} {i2 + 1}\n") + + if include_areals and areals: + base = len(terrain_positions) + area_vertex_counts: list[int] = [] + out.write("o areal_edges\n") + for area in areals: + verts = area["vertices"] + area_vertex_counts.append(len(verts)) + for x, y, z in verts: + out.write(f"v {x:.9g} {y:.9g} {z:.9g}\n") + + ptr = base + for area_idx, area in enumerate(areals): + cnt = area_vertex_counts[area_idx] + if cnt < 2: + ptr += cnt + continue + # closed polyline. + line = [str(ptr + i + 1) for i in range(cnt)] + line.append(str(ptr + 1)) + out.write("l " + " ".join(line) + "\n") + ptr += cnt + + +def _render_scene( + terrain_positions: list[tuple[float, float, float]], + terrain_faces: list[tuple[int, int, int]], + areals: list[dict[str, Any]], + *, + width: int, + height: int, + yaw_deg: float, + pitch_deg: float, + wireframe: bool, + areal_overlay: bool, +) -> bytearray: + all_positions = list(terrain_positions) + if areal_overlay: + for area in areals: + all_positions.extend(area["vertices"]) + if not all_positions: + raise RuntimeError("scene is empty") + + xs = [p[0] for p in all_positions] + ys = [p[1] for p in all_positions] + zs = [p[2] for p in all_positions] + cx = (min(xs) + max(xs)) * 0.5 + cy = (min(ys) + max(ys)) * 0.5 + cz = (min(zs) + max(zs)) * 0.5 + span = max(max(xs) - min(xs), max(ys) - min(ys), max(zs) - min(zs)) + radius = max(span * 0.5, 1e-3) + + yaw = math.radians(yaw_deg) + pitch = math.radians(pitch_deg) + cyaw = math.cos(yaw) + syaw = math.sin(yaw) + cpitch = math.cos(pitch) + spitch = math.sin(pitch) + camera_dist = radius * 3.2 + scale = min(width, height) * 0.96 + + # Terrain transform cache. + vx: list[float] = [] + vy: list[float] = [] + vz: list[float] = [] + sx: list[float] = [] + sy: list[float] = [] + for x, y, z in terrain_positions: + x0 = x - cx + y0 = y - cy + z0 = z - cz + x1 = cyaw * x0 + syaw * z0 + z1 = -syaw * x0 + cyaw * z0 + y2 = cpitch * y0 - spitch * z1 + z2 = spitch * y0 + cpitch * z1 + camera_dist + if z2 < 1e-3: + z2 = 1e-3 + vx.append(x1) + vy.append(y2) + vz.append(z2) + sx.append(width * 0.5 + (x1 / z2) * scale) + sy.append(height * 0.5 - (y2 / z2) * scale) + + def project_point(x: float, y: float, z: float) -> tuple[float, float, float]: + x0 = x - cx + y0 = y - cy + z0 = z - cz + x1 = cyaw * x0 + syaw * z0 + z1 = -syaw * x0 + cyaw * z0 + y2 = cpitch * y0 - spitch * z1 + z2 = spitch * y0 + cpitch * z1 + camera_dist + if z2 < 1e-3: + z2 = 1e-3 + px = width * 0.5 + (x1 / z2) * scale + py = height * 0.5 - (y2 / z2) * scale + return px, py, z2 + + rgb = bytearray([14, 16, 20] * (width * height)) + zbuf = [float("inf")] * (width * height) + light_dir = (0.35, 0.45, 1.0) + l_len = math.sqrt(light_dir[0] ** 2 + light_dir[1] ** 2 + light_dir[2] ** 2) + light = (light_dir[0] / l_len, light_dir[1] / l_len, light_dir[2] / l_len) + + def edge(ax: float, ay: float, bx: float, by: float, px: float, py: float) -> float: + return (px - ax) * (by - ay) - (py - ay) * (bx - ax) + + for i0, i1, i2 in terrain_faces: + x0 = sx[i0] + y0 = sy[i0] + x1 = sx[i1] + y1 = sy[i1] + x2 = sx[i2] + y2 = sy[i2] + area = edge(x0, y0, x1, y1, x2, y2) + if area == 0.0: + continue + + ux = vx[i1] - vx[i0] + uy = vy[i1] - vy[i0] + uz = vz[i1] - vz[i0] + wx = vx[i2] - vx[i0] + wy = vy[i2] - vy[i0] + wz = vz[i2] - vz[i0] + nx = uy * wz - uz * wy + ny = uz * wx - ux * wz + nz = ux * wy - uy * wx + n_len = math.sqrt(nx * nx + ny * ny + nz * nz) + if n_len > 0.0: + nx /= n_len + ny /= n_len + nz /= n_len + intensity = nx * light[0] + ny * light[1] + nz * light[2] + if intensity < 0.0: + intensity = 0.0 + shade = int(45 + 185 * intensity) + color = (min(255, shade + 6), min(255, shade + 14), min(255, shade + 28)) + + minx = int(max(0, math.floor(min(x0, x1, x2)))) + maxx = int(min(width - 1, math.ceil(max(x0, x1, x2)))) + miny = int(max(0, math.floor(min(y0, y1, y2)))) + maxy = int(min(height - 1, math.ceil(max(y0, y1, y2)))) + if minx > maxx or miny > maxy: + continue + + z0 = vz[i0] + z1 = vz[i1] + z2 = vz[i2] + inv_area = 1.0 / area + for py in range(miny, maxy + 1): + fy = py + 0.5 + row = py * width + for px in range(minx, maxx + 1): + fx = px + 0.5 + w0 = edge(x1, y1, x2, y2, fx, fy) + w1 = edge(x2, y2, x0, y0, fx, fy) + w2 = edge(x0, y0, x1, y1, fx, fy) + if area > 0: + if w0 < 0 or w1 < 0 or w2 < 0: + continue + else: + if w0 > 0 or w1 > 0 or w2 > 0: + continue + bz0 = w0 * inv_area + bz1 = w1 * inv_area + bz2 = w2 * inv_area + depth = bz0 * z0 + bz1 * z1 + bz2 * z2 + idx = row + px + if depth >= zbuf[idx]: + continue + zbuf[idx] = depth + p = idx * 3 + rgb[p + 0] = color[0] + rgb[p + 1] = color[1] + rgb[p + 2] = color[2] + + def draw_line( + xa: float, + ya: float, + xb: float, + yb: float, + color: tuple[int, int, int], + ) -> None: + x0i = int(round(xa)) + y0i = int(round(ya)) + x1i = int(round(xb)) + y1i = int(round(yb)) + dx = abs(x1i - x0i) + sx_step = 1 if x0i < x1i else -1 + dy = -abs(y1i - y0i) + sy_step = 1 if y0i < y1i else -1 + err = dx + dy + x = x0i + y = y0i + while True: + if 0 <= x < width and 0 <= y < height: + p = (y * width + x) * 3 + rgb[p + 0] = color[0] + rgb[p + 1] = color[1] + rgb[p + 2] = color[2] + if x == x1i and y == y1i: + break + e2 = 2 * err + if e2 >= dy: + err += dy + x += sx_step + if e2 <= dx: + err += dx + y += sy_step + + if wireframe: + wf = (225, 232, 246) + for i0, i1, i2 in terrain_faces: + draw_line(sx[i0], sy[i0], sx[i1], sy[i1], wf) + draw_line(sx[i1], sy[i1], sx[i2], sy[i2], wf) + draw_line(sx[i2], sy[i2], sx[i0], sy[i0], wf) + + if areal_overlay: + for area in areals: + verts = area["vertices"] + if len(verts) < 2: + continue + color = _color_for_class(int(area["class_id"])) + projected = [project_point(x, y, z + 0.35) for x, y, z in verts] + for i in range(len(projected)): + x0, y0, _ = projected[i] + x1, y1, _ = projected[(i + 1) % len(projected)] + draw_line(x0, y0, x1, y1, color) + + return rgb + + +def cmd_render(args: argparse.Namespace) -> int: + msh_path = Path(args.land_msh).resolve() + map_path = Path(args.land_map).resolve() if args.land_map else None + output_path = Path(args.output).resolve() + + positions, faces, terrain_meta = load_terrain_msh(msh_path, max_faces=int(args.max_faces)) + areals: list[dict[str, Any]] = [] + map_meta: dict[str, int] = {"areal_count": 0, "cells_x": 0, "cells_y": 0} + if map_path: + areals, map_meta = load_areal_map(map_path) + + rgb = _render_scene( + positions, + faces, + areals, + width=int(args.width), + height=int(args.height), + yaw_deg=float(args.yaw), + pitch_deg=float(args.pitch), + wireframe=bool(args.wireframe), + areal_overlay=bool(args.overlay_areals), + ) + _write_ppm(output_path, int(args.width), int(args.height), rgb) + + print(f"Rendered terrain : {msh_path}") + if map_path: + print(f"Areal overlay : {map_path}") + print(f"Output : {output_path}") + print( + "Terrain geometry : " + f"vertices={terrain_meta['vertex_count']}, " + f"faces={terrain_meta['face_count_rendered']}/{terrain_meta['face_count_valid']} " + f"(raw={terrain_meta['face_count_raw']}, dropped={terrain_meta['face_dropped_invalid']})" + ) + if map_path: + print( + "Areal map : " + f"areals={map_meta['areal_count']}, cells={map_meta['cells_x']}x{map_meta['cells_y']}" + ) + return 0 + + +def cmd_export_obj(args: argparse.Namespace) -> int: + msh_path = Path(args.land_msh).resolve() + map_path = Path(args.land_map).resolve() if args.land_map else None + output_path = Path(args.output).resolve() + + positions, faces, terrain_meta = load_terrain_msh(msh_path, max_faces=int(args.max_faces)) + areals: list[dict[str, Any]] = [] + if map_path and bool(args.include_areals): + areals, _ = load_areal_map(map_path) + + _write_obj( + output_path, + positions, + faces, + areals, + include_areals=bool(args.include_areals), + ) + + areal_vertices = sum(len(a["vertices"]) for a in areals) + print(f"Terrain source : {msh_path}") + if map_path: + print(f"Areal source : {map_path}") + print(f"OBJ output : {output_path}") + print( + "Terrain geometry : " + f"vertices={terrain_meta['vertex_count']}, " + f"faces={terrain_meta['face_count_rendered']}/{terrain_meta['face_count_valid']}" + ) + if bool(args.include_areals): + print(f"Areal edges : areals={len(areals)}, extra_vertices={areal_vertices}") + return 0 + + +def cmd_render_turntable(args: argparse.Namespace) -> int: + msh_path = Path(args.land_msh).resolve() + map_path = Path(args.land_map).resolve() if args.land_map else None + output_dir = Path(args.output_dir).resolve() + output_dir.mkdir(parents=True, exist_ok=True) + + frames = int(args.frames) + if frames <= 0: + raise RuntimeError("--frames must be > 0") + + positions, faces, terrain_meta = load_terrain_msh(msh_path, max_faces=int(args.max_faces)) + areals: list[dict[str, Any]] = [] + if map_path: + areals, _ = load_areal_map(map_path) + + yaw_start = float(args.yaw_start) + yaw_end = float(args.yaw_end) + if frames == 1: + yaws = [yaw_start] + else: + step = (yaw_end - yaw_start) / (frames - 1) + yaws = [yaw_start + i * step for i in range(frames)] + + prefix = str(args.prefix) + for i, yaw in enumerate(yaws): + rgb = _render_scene( + positions, + faces, + areals, + width=int(args.width), + height=int(args.height), + yaw_deg=yaw, + pitch_deg=float(args.pitch), + wireframe=bool(args.wireframe), + areal_overlay=bool(args.overlay_areals), + ) + out = output_dir / f"{prefix}_{i:03d}.ppm" + _write_ppm(out, int(args.width), int(args.height), rgb) + + print(f"Turntable source : {msh_path}") + if map_path: + print(f"Areal source : {map_path}") + print(f"Output dir : {output_dir}") + print(f"Frames : {frames} ({yaws[0]:.3f} -> {yaws[-1]:.3f} yaw)") + print( + "Terrain geometry : " + f"vertices={terrain_meta['vertex_count']}, faces={terrain_meta['face_count_rendered']}" + ) + return 0 + + +def cmd_render_batch(args: argparse.Namespace) -> int: + maps_root = Path(args.maps_root).resolve() + output_dir = Path(args.output_dir).resolve() + msh_paths = sorted(maps_root.rglob("Land.msh")) + if not msh_paths: + raise RuntimeError(f"no Land.msh files under {maps_root}") + + rendered = 0 + skipped = 0 + for msh_path in msh_paths: + map_path = msh_path.with_name("Land.map") + if not map_path.exists(): + skipped += 1 + continue + rel = msh_path.parent.relative_to(maps_root) + out = output_dir / f"{rel.as_posix().replace('/', '__')}.ppm" + cmd_render( + argparse.Namespace( + land_msh=str(msh_path), + land_map=str(map_path), + output=str(out), + max_faces=args.max_faces, + width=args.width, + height=args.height, + yaw=args.yaw, + pitch=args.pitch, + wireframe=args.wireframe, + overlay_areals=args.overlay_areals, + ) + ) + rendered += 1 + + print(f"Batch summary: rendered={rendered}, skipped_no_map={skipped}, output_dir={output_dir}") + return 0 + + +def build_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser( + description="Software 3D terrain renderer (Land.msh + optional Land.map overlay)." + ) + sub = parser.add_subparsers(dest="command", required=True) + + render = sub.add_parser("render", help="Render one terrain map to PPM.") + render.add_argument("--land-msh", required=True, help="Path to Land.msh") + render.add_argument("--land-map", help="Path to Land.map (optional)") + render.add_argument("--output", required=True, help="Output .ppm path") + render.add_argument("--max-faces", type=int, default=220000, help="Face limit (default: 220000)") + render.add_argument("--width", type=int, default=1280, help="Image width (default: 1280)") + render.add_argument("--height", type=int, default=720, help="Image height (default: 720)") + render.add_argument("--yaw", type=float, default=38.0, help="Yaw angle in degrees (default: 38)") + render.add_argument("--pitch", type=float, default=26.0, help="Pitch angle in degrees (default: 26)") + render.add_argument("--wireframe", action="store_true", help="Draw terrain wireframe overlay") + render.add_argument( + "--overlay-areals", + action="store_true", + help="Draw ArealMap polygon overlay", + ) + render.set_defaults(func=cmd_render) + + export_obj = sub.add_parser("export-obj", help="Export terrain (and optional areal edges) to OBJ.") + export_obj.add_argument("--land-msh", required=True, help="Path to Land.msh") + export_obj.add_argument("--land-map", help="Path to Land.map (optional)") + export_obj.add_argument("--output", required=True, help="Output .obj path") + export_obj.add_argument("--max-faces", type=int, default=0, help="Face limit (0 = all)") + export_obj.add_argument( + "--include-areals", + action="store_true", + help="Export areal polygons as OBJ polyline object", + ) + export_obj.set_defaults(func=cmd_export_obj) + + turn = sub.add_parser("render-turntable", help="Render turntable frame sequence to PPM.") + turn.add_argument("--land-msh", required=True, help="Path to Land.msh") + turn.add_argument("--land-map", help="Path to Land.map (optional)") + turn.add_argument("--output-dir", required=True, help="Output directory for frames") + turn.add_argument("--prefix", default="frame", help="Frame filename prefix (default: frame)") + turn.add_argument("--frames", type=int, default=36, help="Frame count (default: 36)") + turn.add_argument("--yaw-start", type=float, default=0.0, help="Start yaw in degrees (default: 0)") + turn.add_argument("--yaw-end", type=float, default=360.0, help="End yaw in degrees (default: 360)") + turn.add_argument("--pitch", type=float, default=26.0, help="Pitch angle in degrees (default: 26)") + turn.add_argument("--max-faces", type=int, default=160000, help="Face limit (default: 160000)") + turn.add_argument("--width", type=int, default=960, help="Image width (default: 960)") + turn.add_argument("--height", type=int, default=540, help="Image height (default: 540)") + turn.add_argument("--wireframe", action="store_true", help="Draw terrain wireframe overlay") + turn.add_argument( + "--overlay-areals", + action="store_true", + help="Draw ArealMap polygon overlay", + ) + turn.set_defaults(func=cmd_render_turntable) + + batch = sub.add_parser("render-batch", help="Render all MAPS/**/Land.msh under root.") + batch.add_argument( + "--maps-root", + default="tmp/gamedata/DATA/MAPS", + help="Root directory with MAPS subfolders (default: tmp/gamedata/DATA/MAPS)", + ) + batch.add_argument("--output-dir", required=True, help="Directory for output PPM files") + batch.add_argument("--max-faces", type=int, default=90000, help="Face limit per map (default: 90000)") + batch.add_argument("--width", type=int, default=960, help="Image width (default: 960)") + batch.add_argument("--height", type=int, default=540, help="Image height (default: 540)") + batch.add_argument("--yaw", type=float, default=38.0, help="Yaw angle in degrees (default: 38)") + batch.add_argument("--pitch", type=float, default=26.0, help="Pitch angle in degrees (default: 26)") + batch.add_argument("--wireframe", action="store_true", help="Draw terrain wireframe overlay") + batch.add_argument( + "--overlay-areals", + action="store_true", + help="Draw ArealMap polygon overlay", + ) + batch.set_defaults(func=cmd_render_batch) + + return parser + + +def main() -> int: + parser = build_parser() + args = parser.parse_args() + return int(args.func(args)) + + +if __name__ == "__main__": + raise SystemExit(main()) |
