Diffstat (limited to 'tools')
-rw-r--r--  tools/fxid_abs100_audit.py             262
-rw-r--r--  tools/terrain_map_doc_validator.py     809
-rw-r--r--  tools/terrain_map_preview_renderer.py  679
3 files changed, 1750 insertions, 0 deletions
diff --git a/tools/fxid_abs100_audit.py b/tools/fxid_abs100_audit.py
new file mode 100644
index 0000000..79f3b92
--- /dev/null
+++ b/tools/fxid_abs100_audit.py
@@ -0,0 +1,262 @@
+#!/usr/bin/env python3
+"""
+Deterministic audit for the FXID "absolute parity" checklist.
+
+What this script produces:
+1) strict parsing stats across all FXID payloads in NRes archives,
+2) opcode histogram and rare-branch counters (op6, op1 tail usage),
+3) reference vectors for RNG core (sub_10002220 semantics).
+"""
+
+from __future__ import annotations
+
+import argparse
+import json
+import struct
+from collections import Counter
+from pathlib import Path
+from typing import Any
+
+import archive_roundtrip_validator as arv
+
+TYPE_FXID = 0x44495846
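+# Fixed per-command sizes in bytes, keyed by the opcode byte (low byte of each command's header word).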
+FX_CMD_SIZE = {1: 224, 2: 148, 3: 200, 4: 204, 5: 112, 6: 4, 7: 208, 8: 248, 9: 208, 10: 208}
+
+
+def _entry_payload(blob: bytes, entry: dict[str, Any]) -> bytes:
+ start = int(entry["data_offset"])
+ end = start + int(entry["size"])
+ return blob[start:end]
+
+
+def _cstr32(raw: bytes) -> str:
+ return raw.split(b"\x00", 1)[0].decode("latin1", errors="replace")
+
+
+def _rng_step_sub_10002220(state32: int) -> tuple[int, int]:
+ """
+ sub_10002220 semantics in 32-bit packed state form:
+ lo = state[15:0], hi = state[31:16]
+ new_lo = hi ^ (lo << 1)
+ new_hi = (hi >> 1) ^ new_lo
+    new_state = (new_hi << 16) | new_lo; the function returns (new_state, new_hi as u16)
+ """
+ lo = state32 & 0xFFFF
+ hi = (state32 >> 16) & 0xFFFF
+ new_lo = (hi ^ ((lo << 1) & 0xFFFF)) & 0xFFFF
+ new_hi = ((hi >> 1) ^ new_lo) & 0xFFFF
+ return ((new_hi << 16) | new_lo), new_hi
+
+
+def _rng_vectors() -> dict[str, Any]:
+ seeds = [0x00000000, 0x00000001, 0x12345678, 0x89ABCDEF, 0xFFFFFFFF]
+ out: list[dict[str, Any]] = []
+ for seed in seeds:
+ state = seed
+ outputs: list[int] = []
+ states: list[int] = []
+ for _ in range(16):
+ state, value = _rng_step_sub_10002220(state)
+ outputs.append(value)
+ states.append(state)
+ out.append(
+ {
+ "seed_hex": f"0x{seed:08X}",
+ "outputs_u16_hex": [f"0x{x:04X}" for x in outputs],
+ "states_u32_hex": [f"0x{x:08X}" for x in states],
+ }
+ )
+ return {"generator": "sub_10002220", "vectors": out}
+
+
+def run_audit(root: Path) -> dict[str, Any]:
+ counters: Counter[str] = Counter()
+ opcode_hist: Counter[int] = Counter()
+ issues: list[dict[str, Any]] = []
+ op1_tail6_samples: list[dict[str, Any]] = []
+ op1_optref_samples: list[dict[str, Any]] = []
+
+ for item in arv.scan_archives(root):
+ if item["type"] != "nres":
+ continue
+ archive_path = root / item["relative_path"]
+ counters["archives_total"] += 1
+ data = archive_path.read_bytes()
+ try:
+ parsed = arv.parse_nres(data, source=str(archive_path))
+ except Exception as exc: # pylint: disable=broad-except
+ issues.append(
+ {
+ "severity": "error",
+ "archive": str(archive_path),
+ "entry": None,
+ "message": f"cannot parse NRes: {exc}",
+ }
+ )
+ continue
+
+ for entry in parsed["entries"]:
+ if int(entry["type_id"]) != TYPE_FXID:
+ continue
+ counters["fxid_total"] += 1
+ payload = _entry_payload(data, entry)
+ entry_name = str(entry["name"])
+
+ if len(payload) < 60:
+ issues.append(
+ {
+ "severity": "error",
+ "archive": str(archive_path),
+ "entry": entry_name,
+ "message": f"payload too small: {len(payload)}",
+ }
+ )
+ continue
+
+ cmd_count = struct.unpack_from("<I", payload, 0)[0]
+ ptr = 0x3C
+ ok = True
+ for idx in range(cmd_count):
+ if ptr + 4 > len(payload):
+ issues.append(
+ {
+ "severity": "error",
+ "archive": str(archive_path),
+ "entry": entry_name,
+ "message": f"command {idx}: missing header at offset={ptr}",
+ }
+ )
+ ok = False
+ break
+
+ word = struct.unpack_from("<I", payload, ptr)[0]
+ opcode = word & 0xFF
+ size = FX_CMD_SIZE.get(opcode)
+ if size is None:
+ issues.append(
+ {
+ "severity": "error",
+ "archive": str(archive_path),
+ "entry": entry_name,
+ "message": f"command {idx}: unknown opcode={opcode} at offset={ptr}",
+ }
+ )
+ ok = False
+ break
+
+ if ptr + size > len(payload):
+ issues.append(
+ {
+ "severity": "error",
+ "archive": str(archive_path),
+ "entry": entry_name,
+ "message": f"command {idx}: truncated end={ptr + size}, payload={len(payload)}",
+ }
+ )
+ ok = False
+ break
+
+ opcode_hist[opcode] += 1
+ if opcode == 6:
+ counters["op6_commands"] += 1
+ if opcode == 1:
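+                        # Opcode 1: bytes +136..+160 form a six-dword tail; +160..+192 and +192..+224 hold optional archive/name strings.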
+ tail6 = payload[ptr + 136 : ptr + 160]
+ if any(tail6):
+ counters["op1_tail6_nonzero"] += 1
+ if len(op1_tail6_samples) < 16:
+ dwords = list(struct.unpack("<6I", tail6))
+ op1_tail6_samples.append(
+ {
+ "archive": str(archive_path),
+ "entry": entry_name,
+ "cmd_index": idx,
+ "tail6_u32_hex": [f"0x{x:08X}" for x in dwords],
+ }
+ )
+
+ archive_s = _cstr32(payload[ptr + 160 : ptr + 192])
+ name_s = _cstr32(payload[ptr + 192 : ptr + 224])
+ if archive_s or name_s:
+ counters["op1_optref_nonempty"] += 1
+ if len(op1_optref_samples) < 16:
+ op1_optref_samples.append(
+ {
+ "archive": str(archive_path),
+ "entry": entry_name,
+ "cmd_index": idx,
+ "opt_archive": archive_s,
+ "opt_name": name_s,
+ }
+ )
+
+ ptr += size
+
+ if ok and ptr != len(payload):
+ issues.append(
+ {
+ "severity": "error",
+ "archive": str(archive_path),
+ "entry": entry_name,
+ "message": f"tail bytes after command stream: parsed_end={ptr}, payload={len(payload)}",
+ }
+ )
+ ok = False
+
+ if ok:
+ counters["fxid_ok"] += 1
+
+ return {
+ "input_root": str(root),
+ "summary": {
+ "archives_total": counters["archives_total"],
+ "fxid_total": counters["fxid_total"],
+ "fxid_ok": counters["fxid_ok"],
+ "issues_total": len(issues),
+ "op6_commands": counters["op6_commands"],
+ "op1_tail6_nonzero": counters["op1_tail6_nonzero"],
+ "op1_optref_nonempty": counters["op1_optref_nonempty"],
+ },
+ "opcode_histogram": {str(k): opcode_hist[k] for k in sorted(opcode_hist)},
+ "op1_tail6_samples": op1_tail6_samples,
+ "op1_optref_samples": op1_optref_samples,
+ "rng_reference": _rng_vectors(),
+ "rng_states_fx_path": [
+ {"state": "dword_10023688", "seed_init": "sub_10002660", "used_by": ["sub_10001720", "sub_10001A40"]},
+ {"state": "dword_100238C0", "seed_init": "sub_10003A50", "used_by": ["sub_10002BE0"]},
+ {"state": "dword_10024110", "seed_init": "sub_10009180", "used_by": ["sub_10008120", "sub_10007D10"]},
+ {"state": "dword_10024810", "seed_init": "sub_1000D370", "used_by": ["sub_1000BF30", "sub_1000C1A0"]},
+ {"state": "dword_10024A48", "seed_init": "sub_1000F420", "used_by": ["sub_1000EC50"]},
+ {"state": "dword_10024C80", "seed_init": "sub_10010370", "used_by": ["sub_1000F6E0"]},
+ {"state": "dword_100250F0", "seed_init": "sub_10012C70", "used_by": ["sub_10011230", "sub_100115C0"]},
+ ],
+ "issues": issues,
+ }
+
+
+def main() -> int:
+ parser = argparse.ArgumentParser(description="FXID absolute parity audit.")
+ parser.add_argument("--input", required=True, help="Root directory with game/test archives.")
+ parser.add_argument("--report", required=True, help="Output JSON report path.")
+ args = parser.parse_args()
+
+ root = Path(args.input).resolve()
+ report_path = Path(args.report).resolve()
+ payload = run_audit(root)
+ report_path.parent.mkdir(parents=True, exist_ok=True)
+ report_path.write_text(json.dumps(payload, indent=2, ensure_ascii=False) + "\n", encoding="utf-8")
+
+ summary = payload["summary"]
+ print(f"Input root : {root}")
+ print(f"NRes archives : {summary['archives_total']}")
+ print(f"FXID payloads : {summary['fxid_ok']}/{summary['fxid_total']} valid")
+ print(f"Issues : {summary['issues_total']}")
+ print(f"Opcode6 commands : {summary['op6_commands']}")
+ print(f"Op1 tail6 nonzero : {summary['op1_tail6_nonzero']}")
+ print(f"Op1 optref non-empty : {summary['op1_optref_nonempty']}")
+ print(f"Report : {report_path}")
+
+ return 0
+
+
+if __name__ == "__main__":
+ raise SystemExit(main())
diff --git a/tools/terrain_map_doc_validator.py b/tools/terrain_map_doc_validator.py
new file mode 100644
index 0000000..63c3077
--- /dev/null
+++ b/tools/terrain_map_doc_validator.py
@@ -0,0 +1,809 @@
+#!/usr/bin/env python3
+"""
+Validate terrain/map documentation assumptions against real game data.
+
+Targets:
+- tmp/gamedata/DATA/MAPS/**/Land.msh
+- tmp/gamedata/DATA/MAPS/**/Land.map
+"""
+
+from __future__ import annotations
+
+import argparse
+import json
+import math
+import struct
+from collections import Counter, defaultdict
+from dataclasses import dataclass
+from pathlib import Path
+from typing import Any
+
+import archive_roundtrip_validator as arv
+
+MAGIC_NRES = b"NRes"
+
+REQUIRED_MSH_TYPES = (1, 2, 3, 4, 5, 11, 18, 21)
+OPTIONAL_MSH_TYPES = (14,)
+EXPECTED_MSH_ORDER = (1, 2, 3, 4, 5, 18, 14, 11, 21)
+
+MSH_STRIDES = {
+ 1: 38,
+ 3: 12,
+ 4: 4,
+ 5: 4,
+ 11: 4,
+ 14: 4,
+ 18: 4,
+ 21: 28,
+}
+
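+# Type 2 chunks start with a 0x8C-byte header followed by 68-byte slot-table entries.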
+SLOT_TABLE_OFFSET = 0x8C
+
+
+@dataclass
+class ValidationIssue:
+ severity: str # error | warning
+ category: str
+ resource: str
+ message: str
+
+
+class TerrainMapDocValidator:
+ def __init__(self) -> None:
+ self.issues: list[ValidationIssue] = []
+ self.stats: dict[str, Any] = {
+ "maps_total": 0,
+ "msh_total": 0,
+ "map_total": 0,
+ "msh_type_orders": Counter(),
+ "msh_attr_triplets": defaultdict(Counter), # type_id -> Counter[(a1,a2,a3)]
+ "msh_type11_header_words": Counter(),
+ "msh_type21_flags_top": Counter(),
+ "map_logic_flags": Counter(),
+ "map_class_ids": Counter(), # record +40
+ "map_poly_count": Counter(),
+ "map_vertex_count_min": None,
+ "map_vertex_count_max": None,
+ "map_cell_dims": Counter(),
+ "map_reserved_u12": Counter(),
+ "map_reserved_u36": Counter(),
+ "map_reserved_u44": Counter(),
+ "map_area_delta_abs_max": 0.0,
+ "map_area_delta_rel_max": 0.0,
+ "map_area_rel_gt_05_count": 0,
+ "map_normal_len_min": None,
+ "map_normal_len_max": None,
+ "map_records_total": 0,
+ }
+
+ def add_issue(self, severity: str, category: str, resource: Path, message: str) -> None:
+ self.issues.append(
+ ValidationIssue(
+ severity=severity,
+ category=category,
+ resource=str(resource),
+ message=message,
+ )
+ )
+
+ def _entry_payload(self, blob: bytes, entry: dict[str, Any]) -> bytes:
+ start = int(entry["data_offset"])
+ end = start + int(entry["size"])
+ return blob[start:end]
+
+ def _entry_by_type(self, entries: list[dict[str, Any]]) -> dict[int, list[dict[str, Any]]]:
+ by_type: dict[int, list[dict[str, Any]]] = {}
+ for item in entries:
+ by_type.setdefault(int(item["type_id"]), []).append(item)
+ return by_type
+
+ def _expect_single_type(
+ self,
+ *,
+ by_type: dict[int, list[dict[str, Any]]],
+ type_id: int,
+ label: str,
+ resource: Path,
+ required: bool,
+ ) -> dict[str, Any] | None:
+ rows = by_type.get(type_id, [])
+ if not rows:
+ if required:
+ self.add_issue(
+ "error",
+ "msh-chunk",
+ resource,
+ f"missing required chunk type={type_id} ({label})",
+ )
+ return None
+ if len(rows) > 1:
+ self.add_issue(
+ "warning",
+ "msh-chunk",
+ resource,
+ f"multiple chunks type={type_id} ({label}); using first",
+ )
+ return rows[0]
+
+ def _check_stride(
+ self,
+ *,
+ resource: Path,
+ entry: dict[str, Any],
+ stride: int,
+ label: str,
+ ) -> int:
+ size = int(entry["size"])
+ attr1 = int(entry["attr1"])
+ attr2 = int(entry["attr2"])
+ attr3 = int(entry["attr3"])
+ self.stats["msh_attr_triplets"][int(entry["type_id"])][(attr1, attr2, attr3)] += 1
+
+ if size % stride != 0:
+ self.add_issue(
+ "error",
+ "msh-stride",
+ resource,
+ f"{label}: size={size} is not divisible by stride={stride}",
+ )
+ return -1
+
+ count = size // stride
+ if attr1 != count:
+ self.add_issue(
+ "error",
+ "msh-attr",
+ resource,
+ f"{label}: attr1={attr1} != size/stride={count}",
+ )
+ if attr3 != stride:
+ self.add_issue(
+ "error",
+ "msh-attr",
+ resource,
+ f"{label}: attr3={attr3} != {stride}",
+ )
+ if attr2 != 0 and int(entry["type_id"]) not in (1,):
+            # Type 1 carries a non-zero attr2 in real assets; other chunk types are expected to have attr2 == 0.
+ self.add_issue(
+ "warning",
+ "msh-attr",
+ resource,
+ f"{label}: attr2={attr2} (expected 0 for this chunk type)",
+ )
+ return count
+
+ def validate_msh(self, path: Path) -> None:
+ self.stats["msh_total"] += 1
+ blob = path.read_bytes()
+ if blob[:4] != MAGIC_NRES:
+ self.add_issue("error", "msh-container", path, "file is not NRes")
+ return
+
+ try:
+ parsed = arv.parse_nres(blob, source=str(path))
+ except Exception as exc: # pylint: disable=broad-except
+ self.add_issue("error", "msh-container", path, f"failed to parse NRes: {exc}")
+ return
+
+ for issue in parsed.get("issues", []):
+ self.add_issue("warning", "msh-nres", path, issue)
+
+ entries = parsed["entries"]
+ types_order = tuple(int(item["type_id"]) for item in entries)
+ self.stats["msh_type_orders"][types_order] += 1
+ if types_order != EXPECTED_MSH_ORDER:
+ self.add_issue(
+ "warning",
+ "msh-order",
+ path,
+ f"unexpected chunk order {types_order}, expected {EXPECTED_MSH_ORDER}",
+ )
+
+ by_type = self._entry_by_type(entries)
+
+ chunks: dict[int, dict[str, Any]] = {}
+ for type_id in REQUIRED_MSH_TYPES:
+ chunk = self._expect_single_type(
+ by_type=by_type,
+ type_id=type_id,
+ label=f"type{type_id}",
+ resource=path,
+ required=True,
+ )
+ if chunk:
+ chunks[type_id] = chunk
+ for type_id in OPTIONAL_MSH_TYPES:
+ chunk = self._expect_single_type(
+ by_type=by_type,
+ type_id=type_id,
+ label=f"type{type_id}",
+ resource=path,
+ required=False,
+ )
+ if chunk:
+ chunks[type_id] = chunk
+
+ for type_id, stride in MSH_STRIDES.items():
+ chunk = chunks.get(type_id)
+ if not chunk:
+ continue
+ self._check_stride(resource=path, entry=chunk, stride=stride, label=f"type{type_id}")
+
+ # type 2 includes 0x8C-byte header + 68-byte slot table entries.
+ type2 = chunks.get(2)
+ if type2:
+ size = int(type2["size"])
+ attr1 = int(type2["attr1"])
+ attr2 = int(type2["attr2"])
+ attr3 = int(type2["attr3"])
+ self.stats["msh_attr_triplets"][2][(attr1, attr2, attr3)] += 1
+ if attr3 != 68:
+ self.add_issue(
+ "error",
+ "msh-attr",
+ path,
+ f"type2: attr3={attr3} != 68",
+ )
+ if attr2 != 0:
+ self.add_issue(
+ "warning",
+ "msh-attr",
+ path,
+ f"type2: attr2={attr2} (expected 0)",
+ )
+ if size < SLOT_TABLE_OFFSET:
+ self.add_issue(
+ "error",
+ "msh-size",
+ path,
+ f"type2: size={size} < header_size={SLOT_TABLE_OFFSET}",
+ )
+ elif (size - SLOT_TABLE_OFFSET) % 68 != 0:
+ self.add_issue(
+ "error",
+ "msh-size",
+ path,
+ f"type2: (size - 0x8C) is not divisible by 68 (size={size})",
+ )
+ else:
+ slots_by_size = (size - SLOT_TABLE_OFFSET) // 68
+ if attr1 != slots_by_size:
+ self.add_issue(
+ "error",
+ "msh-attr",
+ path,
+ f"type2: attr1={attr1} != (size-0x8C)/68={slots_by_size}",
+ )
+
+ verts = chunks.get(3)
+ face = chunks.get(21)
+ slots = chunks.get(2)
+ nodes = chunks.get(1)
+ type11 = chunks.get(11)
+
+ if verts and face:
+ vcount = int(verts["attr1"])
+ face_payload = self._entry_payload(blob, face)
+ fcount = int(face["attr1"])
+ if len(face_payload) >= 28:
+ for idx in range(fcount):
+ off = idx * 28
+ if off + 28 > len(face_payload):
+ self.add_issue(
+ "error",
+ "msh-face",
+ path,
+ f"type21 truncated at face {idx}",
+ )
+ break
+ flags = struct.unpack_from("<I", face_payload, off)[0]
+ self.stats["msh_type21_flags_top"][flags] += 1
+ i0, i1, i2 = struct.unpack_from("<HHH", face_payload, off + 8)
+ for name, value in (("i0", i0), ("i1", i1), ("i2", i2)):
+ if value >= vcount:
+ self.add_issue(
+ "error",
+ "msh-face-index",
+ path,
+ f"type21[{idx}].{name}={value} out of range vertex_count={vcount}",
+ )
+ n0, n1, n2 = struct.unpack_from("<HHH", face_payload, off + 14)
+ for name, value in (("n0", n0), ("n1", n1), ("n2", n2)):
+ if value != 0xFFFF and value >= fcount:
+ self.add_issue(
+ "error",
+ "msh-face-neighbour",
+ path,
+ f"type21[{idx}].{name}={value} out of range face_count={fcount}",
+ )
+
+ if slots and face:
+ slot_count = int(slots["attr1"])
+ face_count = int(face["attr1"])
+ slot_payload = self._entry_payload(blob, slots)
+ need = SLOT_TABLE_OFFSET + slot_count * 68
+ if len(slot_payload) < need:
+ self.add_issue(
+ "error",
+ "msh-slot",
+ path,
+ f"type2 payload too short: size={len(slot_payload)}, need_at_least={need}",
+ )
+ else:
+ if len(slot_payload) != need:
+ self.add_issue(
+ "warning",
+ "msh-slot",
+ path,
+ f"type2 payload has trailing bytes: size={len(slot_payload)}, expected={need}",
+ )
+ for idx in range(slot_count):
+ off = SLOT_TABLE_OFFSET + idx * 68
+ tri_start, tri_count = struct.unpack_from("<HH", slot_payload, off)
+ if tri_start + tri_count > face_count:
+ self.add_issue(
+ "error",
+ "msh-slot-range",
+ path,
+ f"type2 slot[{idx}] range [{tri_start}, {tri_start + tri_count}) exceeds face_count={face_count}",
+ )
+
+ if nodes and slots:
+ node_payload = self._entry_payload(blob, nodes)
+ slot_count = int(slots["attr1"])
+ node_count = int(nodes["attr1"])
+ for node_idx in range(node_count):
+ off = node_idx * 38
+ if off + 38 > len(node_payload):
+ self.add_issue(
+ "error",
+ "msh-node",
+ path,
+ f"type1 truncated at node {node_idx}",
+ )
+ break
+ for j in range(19):
+ slot_id = struct.unpack_from("<H", node_payload, off + j * 2)[0]
+ if slot_id != 0xFFFF and slot_id >= slot_count:
+ self.add_issue(
+ "error",
+ "msh-node-slot",
+ path,
+ f"type1 node[{node_idx}] slot[{j}]={slot_id} out of range slot_count={slot_count}",
+ )
+
+ if type11:
+ payload = self._entry_payload(blob, type11)
+ if len(payload) >= 8:
+ w0, w1 = struct.unpack_from("<II", payload, 0)
+ self.stats["msh_type11_header_words"][(w0, w1)] += 1
+ else:
+ self.add_issue(
+ "error",
+ "msh-type11",
+ path,
+ f"type11 payload too short: {len(payload)}",
+ )
+
+ def _update_minmax(self, key_min: str, key_max: str, value: float) -> None:
+ if self.stats[key_min] is None or value < self.stats[key_min]:
+ self.stats[key_min] = value
+ if self.stats[key_max] is None or value > self.stats[key_max]:
+ self.stats[key_max] = value
+
+ def validate_map(self, path: Path) -> None:
+ self.stats["map_total"] += 1
+ blob = path.read_bytes()
+ if blob[:4] != MAGIC_NRES:
+ self.add_issue("error", "map-container", path, "file is not NRes")
+ return
+
+ try:
+ parsed = arv.parse_nres(blob, source=str(path))
+ except Exception as exc: # pylint: disable=broad-except
+ self.add_issue("error", "map-container", path, f"failed to parse NRes: {exc}")
+ return
+
+ for issue in parsed.get("issues", []):
+ self.add_issue("warning", "map-nres", path, issue)
+
+ entries = parsed["entries"]
+ if len(entries) != 1 or int(entries[0]["type_id"]) != 12:
+ self.add_issue(
+ "error",
+ "map-chunk",
+ path,
+ f"expected single chunk type=12, got {[int(e['type_id']) for e in entries]}",
+ )
+ return
+
+ entry = entries[0]
+ areal_count = int(entry["attr1"])
+ if areal_count <= 0:
+ self.add_issue("error", "map-areal", path, f"invalid areal_count={areal_count}")
+ return
+
+ payload = self._entry_payload(blob, entry)
+ ptr = 0
+ records: list[dict[str, Any]] = []
+
+ for idx in range(areal_count):
+ if ptr + 56 > len(payload):
+ self.add_issue(
+ "error",
+ "map-record",
+ path,
+ f"truncated areal header at index={idx}, ptr={ptr}, size={len(payload)}",
+ )
+ return
+
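+            # Each areal record starts with a 56-byte header: anchor xyz, u12, area, normal xyz, logic flag, u36, class id, u44, vertex_count, poly_count.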
+ anchor_x, anchor_y, anchor_z = struct.unpack_from("<fff", payload, ptr)
+ u12 = struct.unpack_from("<I", payload, ptr + 12)[0]
+ area_f = struct.unpack_from("<f", payload, ptr + 16)[0]
+ nx, ny, nz = struct.unpack_from("<fff", payload, ptr + 20)
+ logic_flag = struct.unpack_from("<I", payload, ptr + 32)[0]
+ u36 = struct.unpack_from("<I", payload, ptr + 36)[0]
+ class_id = struct.unpack_from("<I", payload, ptr + 40)[0]
+ u44 = struct.unpack_from("<I", payload, ptr + 44)[0]
+ vertex_count, poly_count = struct.unpack_from("<II", payload, ptr + 48)
+
+ self.stats["map_records_total"] += 1
+ self.stats["map_logic_flags"][logic_flag] += 1
+ self.stats["map_class_ids"][class_id] += 1
+ self.stats["map_poly_count"][poly_count] += 1
+ self.stats["map_reserved_u12"][u12] += 1
+ self.stats["map_reserved_u36"][u36] += 1
+ self.stats["map_reserved_u44"][u44] += 1
+ self._update_minmax("map_vertex_count_min", "map_vertex_count_max", float(vertex_count))
+
+ normal_len = math.sqrt(nx * nx + ny * ny + nz * nz)
+ self._update_minmax("map_normal_len_min", "map_normal_len_max", normal_len)
+ if abs(normal_len - 1.0) > 1e-3:
+ self.add_issue(
+ "warning",
+ "map-normal",
+ path,
+ f"record[{idx}] normal length={normal_len:.6f} (expected ~1.0)",
+ )
+
+ vertices_off = ptr + 56
+ vertices_size = 12 * vertex_count
+ if vertices_off + vertices_size > len(payload):
+ self.add_issue(
+ "error",
+ "map-vertices",
+ path,
+ f"record[{idx}] vertices out of bounds",
+ )
+ return
+
+ vertices: list[tuple[float, float, float]] = []
+ for i in range(vertex_count):
+ vertices.append(struct.unpack_from("<fff", payload, vertices_off + i * 12))
+
+ if vertex_count >= 3:
+ # signed shoelace area in XY.
+ shoelace = 0.0
+ for i in range(vertex_count):
+ x1, y1, _ = vertices[i]
+ x2, y2, _ = vertices[(i + 1) % vertex_count]
+ shoelace += x1 * y2 - x2 * y1
+ area_xy = abs(shoelace) * 0.5
+ delta = abs(area_xy - area_f)
+ if delta > self.stats["map_area_delta_abs_max"]:
+ self.stats["map_area_delta_abs_max"] = delta
+ rel_delta = delta / max(1.0, area_xy)
+ if rel_delta > self.stats["map_area_delta_rel_max"]:
+ self.stats["map_area_delta_rel_max"] = rel_delta
+ if rel_delta > 0.05:
+ self.stats["map_area_rel_gt_05_count"] += 1
+
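+            # Link table: one (area_ref, edge_ref) i32 pair per vertex edge, then three pairs per polygon.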
+ links_off = vertices_off + vertices_size
+ link_count = vertex_count + 3 * poly_count
+ links_size = 8 * link_count
+ if links_off + links_size > len(payload):
+ self.add_issue(
+ "error",
+ "map-links",
+ path,
+ f"record[{idx}] link table out of bounds",
+ )
+ return
+
+ edge_links: list[tuple[int, int]] = []
+ for i in range(vertex_count):
+ area_ref, edge_ref = struct.unpack_from("<ii", payload, links_off + i * 8)
+ edge_links.append((area_ref, edge_ref))
+
+ poly_links_off = links_off + 8 * vertex_count
+ poly_links: list[tuple[int, int]] = []
+ for i in range(3 * poly_count):
+ area_ref, edge_ref = struct.unpack_from("<ii", payload, poly_links_off + i * 8)
+ poly_links.append((area_ref, edge_ref))
+
+ p = links_off + links_size
+ for poly_idx in range(poly_count):
+ if p + 4 > len(payload):
+ self.add_issue(
+ "error",
+ "map-poly",
+ path,
+ f"record[{idx}] poly header truncated at poly_idx={poly_idx}",
+ )
+ return
+ n = struct.unpack_from("<I", payload, p)[0]
+ poly_size = 4 * (3 * n + 1)
+ if p + poly_size > len(payload):
+ self.add_issue(
+ "error",
+ "map-poly",
+ path,
+ f"record[{idx}] poly data out of bounds at poly_idx={poly_idx}",
+ )
+ return
+ p += poly_size
+
+ records.append(
+ {
+ "index": idx,
+ "anchor": (anchor_x, anchor_y, anchor_z),
+ "logic": logic_flag,
+ "class_id": class_id,
+ "vertex_count": vertex_count,
+ "poly_count": poly_count,
+ "edge_links": edge_links,
+ "poly_links": poly_links,
+ }
+ )
+ ptr = p
+
+ vertex_counts = [int(item["vertex_count"]) for item in records]
+ for rec in records:
+ idx = int(rec["index"])
+ for link_idx, (area_ref, edge_ref) in enumerate(rec["edge_links"]):
+ if area_ref == -1:
+ if edge_ref != -1:
+ self.add_issue(
+ "warning",
+ "map-link",
+ path,
+ f"record[{idx}] edge_link[{link_idx}] has area_ref=-1 but edge_ref={edge_ref}",
+ )
+ continue
+ if area_ref < 0 or area_ref >= areal_count:
+ self.add_issue(
+ "error",
+ "map-link",
+ path,
+ f"record[{idx}] edge_link[{link_idx}] area_ref={area_ref} out of range",
+ )
+ continue
+ dst_vcount = vertex_counts[area_ref]
+ if edge_ref < 0 or edge_ref >= dst_vcount:
+ self.add_issue(
+ "error",
+ "map-link",
+ path,
+ f"record[{idx}] edge_link[{link_idx}] edge_ref={edge_ref} out of range dst_vertex_count={dst_vcount}",
+ )
+
+ for link_idx, (area_ref, edge_ref) in enumerate(rec["poly_links"]):
+ if area_ref == -1:
+ if edge_ref != -1:
+ self.add_issue(
+ "warning",
+ "map-poly-link",
+ path,
+ f"record[{idx}] poly_link[{link_idx}] has area_ref=-1 but edge_ref={edge_ref}",
+ )
+ continue
+ if area_ref < 0 or area_ref >= areal_count:
+ self.add_issue(
+ "error",
+ "map-poly-link",
+ path,
+ f"record[{idx}] poly_link[{link_idx}] area_ref={area_ref} out of range",
+ )
+
+ if ptr + 8 > len(payload):
+ self.add_issue(
+ "error",
+ "map-cells",
+ path,
+ f"missing cells header at ptr={ptr}, size={len(payload)}",
+ )
+ return
+
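+        # Cells section: a cells_x * cells_y grid where each cell stores a u16 hit count followed by that many u16 area ids.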
+ cells_x, cells_y = struct.unpack_from("<II", payload, ptr)
+ self.stats["map_cell_dims"][(cells_x, cells_y)] += 1
+ ptr += 8
+ if cells_x <= 0 or cells_y <= 0:
+ self.add_issue(
+ "error",
+ "map-cells",
+ path,
+ f"invalid cells dimensions {cells_x}x{cells_y}",
+ )
+ return
+
+ for x in range(cells_x):
+ for y in range(cells_y):
+ if ptr + 2 > len(payload):
+ self.add_issue(
+ "error",
+ "map-cells",
+ path,
+ f"truncated hitCount at cell ({x},{y})",
+ )
+ return
+ hit_count = struct.unpack_from("<H", payload, ptr)[0]
+ ptr += 2
+ need = 2 * hit_count
+ if ptr + need > len(payload):
+ self.add_issue(
+ "error",
+ "map-cells",
+ path,
+ f"truncated areaIds at cell ({x},{y}), hitCount={hit_count}",
+ )
+ return
+ for i in range(hit_count):
+ area_id = struct.unpack_from("<H", payload, ptr + 2 * i)[0]
+ if area_id >= areal_count:
+ self.add_issue(
+ "error",
+ "map-cells",
+ path,
+ f"cell ({x},{y}) has area_id={area_id} out of range areal_count={areal_count}",
+ )
+ ptr += need
+
+ if ptr != len(payload):
+ self.add_issue(
+ "error",
+ "map-size",
+ path,
+ f"payload tail mismatch: consumed={ptr}, payload_size={len(payload)}",
+ )
+
+ def validate(self, maps_root: Path) -> None:
+ msh_paths = sorted(maps_root.rglob("Land.msh"))
+ map_paths = sorted(maps_root.rglob("Land.map"))
+
+ msh_by_dir = {path.parent: path for path in msh_paths}
+ map_by_dir = {path.parent: path for path in map_paths}
+
+ all_dirs = sorted(set(msh_by_dir) | set(map_by_dir))
+ self.stats["maps_total"] = len(all_dirs)
+
+ for folder in all_dirs:
+ msh_path = msh_by_dir.get(folder)
+ map_path = map_by_dir.get(folder)
+ if msh_path is None:
+ self.add_issue("error", "pairing", folder, "missing Land.msh")
+ continue
+ if map_path is None:
+ self.add_issue("error", "pairing", folder, "missing Land.map")
+ continue
+ self.validate_msh(msh_path)
+ self.validate_map(map_path)
+
+ def build_report(self) -> dict[str, Any]:
+ errors = [i for i in self.issues if i.severity == "error"]
+ warnings = [i for i in self.issues if i.severity == "warning"]
+
+ # Convert counters/defaultdicts to JSON-friendly dicts.
+ msh_orders = {
+ str(list(order)): count
+ for order, count in self.stats["msh_type_orders"].most_common()
+ }
+ msh_attrs = {
+ str(type_id): {str(list(k)): v for k, v in counter.most_common()}
+ for type_id, counter in self.stats["msh_attr_triplets"].items()
+ }
+ type11_hdr = {
+ str(list(key)): value
+ for key, value in self.stats["msh_type11_header_words"].most_common()
+ }
+ type21_flags = {
+ f"0x{key:08X}": value
+ for key, value in self.stats["msh_type21_flags_top"].most_common(32)
+ }
+
+ return {
+ "summary": {
+ "maps_total": self.stats["maps_total"],
+ "msh_total": self.stats["msh_total"],
+ "map_total": self.stats["map_total"],
+ "issues_total": len(self.issues),
+ "errors_total": len(errors),
+ "warnings_total": len(warnings),
+ },
+ "stats": {
+ "msh_type_orders": msh_orders,
+ "msh_attr_triplets": msh_attrs,
+ "msh_type11_header_words": type11_hdr,
+ "msh_type21_flags_top": type21_flags,
+ "map_logic_flags": dict(self.stats["map_logic_flags"]),
+ "map_class_ids": dict(self.stats["map_class_ids"]),
+ "map_poly_count": dict(self.stats["map_poly_count"]),
+ "map_vertex_count_min": self.stats["map_vertex_count_min"],
+ "map_vertex_count_max": self.stats["map_vertex_count_max"],
+ "map_cell_dims": {str(list(k)): v for k, v in self.stats["map_cell_dims"].items()},
+ "map_reserved_u12": dict(self.stats["map_reserved_u12"]),
+ "map_reserved_u36": dict(self.stats["map_reserved_u36"]),
+ "map_reserved_u44": dict(self.stats["map_reserved_u44"]),
+ "map_area_delta_abs_max": self.stats["map_area_delta_abs_max"],
+ "map_area_delta_rel_max": self.stats["map_area_delta_rel_max"],
+ "map_area_rel_gt_05_count": self.stats["map_area_rel_gt_05_count"],
+ "map_normal_len_min": self.stats["map_normal_len_min"],
+ "map_normal_len_max": self.stats["map_normal_len_max"],
+ "map_records_total": self.stats["map_records_total"],
+ },
+ "issues": [
+ {
+ "severity": item.severity,
+ "category": item.category,
+ "resource": item.resource,
+ "message": item.message,
+ }
+ for item in self.issues
+ ],
+ }
+
+
+def parse_args() -> argparse.Namespace:
+ parser = argparse.ArgumentParser(description="Validate terrain/map doc assumptions")
+ parser.add_argument(
+ "--maps-root",
+ type=Path,
+ default=Path("tmp/gamedata/DATA/MAPS"),
+ help="Root directory containing MAPS/**/Land.msh and Land.map",
+ )
+ parser.add_argument(
+ "--report-json",
+ type=Path,
+ default=None,
+ help="Optional path to save full JSON report",
+ )
+ parser.add_argument(
+ "--fail-on-warning",
+ action="store_true",
+ help="Return non-zero exit code on warnings too",
+ )
+ return parser.parse_args()
+
+
+def main() -> int:
+ args = parse_args()
+ validator = TerrainMapDocValidator()
+ validator.validate(args.maps_root)
+ report = validator.build_report()
+
+ print(
+ json.dumps(
+ report["summary"],
+ indent=2,
+ ensure_ascii=False,
+ )
+ )
+
+ if args.report_json:
+ args.report_json.parent.mkdir(parents=True, exist_ok=True)
+ with args.report_json.open("w", encoding="utf-8") as handle:
+ json.dump(report, handle, indent=2, ensure_ascii=False)
+ handle.write("\n")
+ print(f"report written: {args.report_json}")
+
+ has_errors = report["summary"]["errors_total"] > 0
+ has_warnings = report["summary"]["warnings_total"] > 0
+ if has_errors:
+ return 1
+ if args.fail_on_warning and has_warnings:
+ return 1
+ return 0
+
+
+if __name__ == "__main__":
+ raise SystemExit(main())
diff --git a/tools/terrain_map_preview_renderer.py b/tools/terrain_map_preview_renderer.py
new file mode 100644
index 0000000..86d72d7
--- /dev/null
+++ b/tools/terrain_map_preview_renderer.py
@@ -0,0 +1,679 @@
+#!/usr/bin/env python3
+"""
+Software 3D renderer for terrain Land.msh + Land.map overlay.
+
+Output format: binary PPM (P6), dependency-free.
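+
+Usage example (paths are illustrative):
+    python tools/terrain_map_preview_renderer.py render \
+        --land-msh DATA/MAPS/<map>/Land.msh --land-map DATA/MAPS/<map>/Land.map \
+        --output preview.ppm --overlay-areals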
+"""
+
+from __future__ import annotations
+
+import argparse
+import math
+import struct
+from pathlib import Path
+from typing import Any
+
+import archive_roundtrip_validator as arv
+
+MAGIC_NRES = b"NRes"
+
+
+def _entry_payload(blob: bytes, entry: dict[str, Any]) -> bytes:
+ start = int(entry["data_offset"])
+ end = start + int(entry["size"])
+ return blob[start:end]
+
+
+def _parse_nres(blob: bytes, source: str) -> dict[str, Any]:
+ if blob[:4] != MAGIC_NRES:
+ raise RuntimeError(f"{source}: not an NRes payload")
+ return arv.parse_nres(blob, source=source)
+
+
+def _by_type(entries: list[dict[str, Any]]) -> dict[int, list[dict[str, Any]]]:
+ out: dict[int, list[dict[str, Any]]] = {}
+ for row in entries:
+ out.setdefault(int(row["type_id"]), []).append(row)
+ return out
+
+
+def _get_single(by_type: dict[int, list[dict[str, Any]]], type_id: int, label: str) -> dict[str, Any]:
+ rows = by_type.get(type_id, [])
+ if not rows:
+ raise RuntimeError(f"missing resource type {type_id} ({label})")
+ return rows[0]
+
+
+def _downsample_faces(
+ faces: list[tuple[int, int, int]],
+ max_faces: int,
+) -> list[tuple[int, int, int]]:
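+    # Evenly subsample the face list so at most max_faces triangles are kept; max_faces <= 0 disables the limit.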
+ if max_faces <= 0 or len(faces) <= max_faces:
+ return faces
+ step = len(faces) / max_faces
+ out: list[tuple[int, int, int]] = []
+ pos = 0.0
+ while len(out) < max_faces and int(pos) < len(faces):
+ out.append(faces[int(pos)])
+ pos += step
+ return out
+
+
+def load_terrain_msh(
+ path: Path,
+ *,
+ max_faces: int,
+) -> tuple[list[tuple[float, float, float]], list[tuple[int, int, int]], dict[str, int]]:
+ blob = path.read_bytes()
+ parsed = _parse_nres(blob, str(path))
+ by_type = _by_type(parsed["entries"])
+
+ res3 = _get_single(by_type, 3, "positions")
+ res21 = _get_single(by_type, 21, "terrain faces")
+
+ pos_blob = _entry_payload(blob, res3)
+ if len(pos_blob) % 12 != 0:
+ raise RuntimeError(f"{path}: type 3 payload size is not divisible by 12")
+ vertex_count = len(pos_blob) // 12
+ positions = [struct.unpack_from("<3f", pos_blob, i * 12) for i in range(vertex_count)]
+
+ face_blob = _entry_payload(blob, res21)
+ if len(face_blob) % 28 != 0:
+ raise RuntimeError(f"{path}: type 21 payload size is not divisible by 28")
+ all_faces: list[tuple[int, int, int]] = []
+ raw_face_count = len(face_blob) // 28
+ dropped = 0
+ for i in range(raw_face_count):
+ off = i * 28
+ i0, i1, i2 = struct.unpack_from("<HHH", face_blob, off + 8)
+ if i0 >= vertex_count or i1 >= vertex_count or i2 >= vertex_count:
+ dropped += 1
+ continue
+ all_faces.append((i0, i1, i2))
+
+ faces = _downsample_faces(all_faces, max_faces)
+ meta = {
+ "vertex_count": vertex_count,
+ "face_count_raw": raw_face_count,
+ "face_count_valid": len(all_faces),
+ "face_count_rendered": len(faces),
+ "face_dropped_invalid": dropped,
+ }
+ return positions, faces, meta
+
+
+def load_areal_map(path: Path) -> tuple[list[dict[str, Any]], dict[str, int]]:
+ blob = path.read_bytes()
+ parsed = _parse_nres(blob, str(path))
+ by_type = _by_type(parsed["entries"])
+ chunk = _get_single(by_type, 12, "ArealMapGeometry")
+
+ payload = _entry_payload(blob, chunk)
+ areal_count = int(chunk["attr1"])
+ ptr = 0
+ areals: list[dict[str, Any]] = []
+ for idx in range(areal_count):
+ if ptr + 56 > len(payload):
+ raise RuntimeError(f"{path}: truncated areal header at index={idx}")
+ class_id = struct.unpack_from("<I", payload, ptr + 40)[0]
+ vertex_count, poly_count = struct.unpack_from("<II", payload, ptr + 48)
+ verts_off = ptr + 56
+ verts_size = 12 * vertex_count
+ if verts_off + verts_size > len(payload):
+ raise RuntimeError(f"{path}: areal[{idx}] vertices out of bounds")
+ verts = [struct.unpack_from("<3f", payload, verts_off + 12 * i) for i in range(vertex_count)]
+
+ links_off = verts_off + verts_size
+ links_size = 8 * (vertex_count + 3 * poly_count)
+ p = links_off + links_size
+ for _ in range(poly_count):
+ if p + 4 > len(payload):
+ raise RuntimeError(f"{path}: areal[{idx}] poly header out of bounds")
+ n = struct.unpack_from("<I", payload, p)[0]
+ p += 4 * (3 * n + 1)
+ if p > len(payload):
+ raise RuntimeError(f"{path}: areal[{idx}] poly data out of bounds")
+
+ areals.append(
+ {
+ "index": idx,
+ "class_id": class_id,
+ "vertices": verts,
+ }
+ )
+ ptr = p
+
+ if ptr + 8 > len(payload):
+ raise RuntimeError(f"{path}: missing cells section")
+ cells_x, cells_y = struct.unpack_from("<II", payload, ptr)
+ ptr += 8
+ for _x in range(cells_x):
+ for _y in range(cells_y):
+ if ptr + 2 > len(payload):
+ raise RuntimeError(f"{path}: cells section truncated")
+ hit_count = struct.unpack_from("<H", payload, ptr)[0]
+ ptr += 2 + 2 * hit_count
+ if ptr > len(payload):
+ raise RuntimeError(f"{path}: cells section out of bounds")
+ if ptr != len(payload):
+ raise RuntimeError(f"{path}: trailing bytes in chunk12 parse ({len(payload) - ptr})")
+
+ meta = {
+ "areal_count": areal_count,
+ "cells_x": cells_x,
+ "cells_y": cells_y,
+ }
+ return areals, meta
+
+
+def _color_for_class(class_id: int) -> tuple[int, int, int]:
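+    # Mix the class id with LCG-style constants to derive a stable, distinct colour per class.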
+ x = (class_id * 1103515245 + 12345) & 0x7FFFFFFF
+ r = 60 + (x & 0x7F)
+ g = 60 + ((x >> 7) & 0x7F)
+ b = 60 + ((x >> 14) & 0x7F)
+ return r, g, b
+
+
+def _write_ppm(path: Path, width: int, height: int, rgb: bytearray) -> None:
+ path.parent.mkdir(parents=True, exist_ok=True)
+ with path.open("wb") as handle:
+ handle.write(f"P6\n{width} {height}\n255\n".encode("ascii"))
+ handle.write(rgb)
+
+
+def _write_obj(
+ path: Path,
+ terrain_positions: list[tuple[float, float, float]],
+ terrain_faces: list[tuple[int, int, int]],
+ areals: list[dict[str, Any]],
+ *,
+ include_areals: bool,
+) -> None:
+ path.parent.mkdir(parents=True, exist_ok=True)
+ with path.open("w", encoding="utf-8", newline="\n") as out:
+ out.write("# Exported by terrain_map_preview_renderer.py\n")
+ out.write("o terrain\n")
+ for x, y, z in terrain_positions:
+ out.write(f"v {x:.9g} {y:.9g} {z:.9g}\n")
+ for i0, i1, i2 in terrain_faces:
+ # OBJ indices are 1-based.
+ out.write(f"f {i0 + 1} {i1 + 1} {i2 + 1}\n")
+
+ if include_areals and areals:
+ base = len(terrain_positions)
+ area_vertex_counts: list[int] = []
+ out.write("o areal_edges\n")
+ for area in areals:
+ verts = area["vertices"]
+ area_vertex_counts.append(len(verts))
+ for x, y, z in verts:
+ out.write(f"v {x:.9g} {y:.9g} {z:.9g}\n")
+
+ ptr = base
+ for area_idx, area in enumerate(areals):
+ cnt = area_vertex_counts[area_idx]
+ if cnt < 2:
+ ptr += cnt
+ continue
+ # closed polyline.
+ line = [str(ptr + i + 1) for i in range(cnt)]
+ line.append(str(ptr + 1))
+ out.write("l " + " ".join(line) + "\n")
+ ptr += cnt
+
+
+def _render_scene(
+ terrain_positions: list[tuple[float, float, float]],
+ terrain_faces: list[tuple[int, int, int]],
+ areals: list[dict[str, Any]],
+ *,
+ width: int,
+ height: int,
+ yaw_deg: float,
+ pitch_deg: float,
+ wireframe: bool,
+ areal_overlay: bool,
+) -> bytearray:
+ all_positions = list(terrain_positions)
+ if areal_overlay:
+ for area in areals:
+ all_positions.extend(area["vertices"])
+ if not all_positions:
+ raise RuntimeError("scene is empty")
+
+ xs = [p[0] for p in all_positions]
+ ys = [p[1] for p in all_positions]
+ zs = [p[2] for p in all_positions]
+ cx = (min(xs) + max(xs)) * 0.5
+ cy = (min(ys) + max(ys)) * 0.5
+ cz = (min(zs) + max(zs)) * 0.5
+ span = max(max(xs) - min(xs), max(ys) - min(ys), max(zs) - min(zs))
+ radius = max(span * 0.5, 1e-3)
+
+ yaw = math.radians(yaw_deg)
+ pitch = math.radians(pitch_deg)
+ cyaw = math.cos(yaw)
+ syaw = math.sin(yaw)
+ cpitch = math.cos(pitch)
+ spitch = math.sin(pitch)
+ camera_dist = radius * 3.2
+ scale = min(width, height) * 0.96
+
+ # Terrain transform cache.
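+    # Yaw rotates around the vertical axis, pitch tilts toward the camera, then x/y are divided by depth for the perspective projection.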
+ vx: list[float] = []
+ vy: list[float] = []
+ vz: list[float] = []
+ sx: list[float] = []
+ sy: list[float] = []
+ for x, y, z in terrain_positions:
+ x0 = x - cx
+ y0 = y - cy
+ z0 = z - cz
+ x1 = cyaw * x0 + syaw * z0
+ z1 = -syaw * x0 + cyaw * z0
+ y2 = cpitch * y0 - spitch * z1
+ z2 = spitch * y0 + cpitch * z1 + camera_dist
+ if z2 < 1e-3:
+ z2 = 1e-3
+ vx.append(x1)
+ vy.append(y2)
+ vz.append(z2)
+ sx.append(width * 0.5 + (x1 / z2) * scale)
+ sy.append(height * 0.5 - (y2 / z2) * scale)
+
+ def project_point(x: float, y: float, z: float) -> tuple[float, float, float]:
+ x0 = x - cx
+ y0 = y - cy
+ z0 = z - cz
+ x1 = cyaw * x0 + syaw * z0
+ z1 = -syaw * x0 + cyaw * z0
+ y2 = cpitch * y0 - spitch * z1
+ z2 = spitch * y0 + cpitch * z1 + camera_dist
+ if z2 < 1e-3:
+ z2 = 1e-3
+ px = width * 0.5 + (x1 / z2) * scale
+ py = height * 0.5 - (y2 / z2) * scale
+ return px, py, z2
+
+ rgb = bytearray([14, 16, 20] * (width * height))
+ zbuf = [float("inf")] * (width * height)
+ light_dir = (0.35, 0.45, 1.0)
+ l_len = math.sqrt(light_dir[0] ** 2 + light_dir[1] ** 2 + light_dir[2] ** 2)
+ light = (light_dir[0] / l_len, light_dir[1] / l_len, light_dir[2] / l_len)
+
+ def edge(ax: float, ay: float, bx: float, by: float, px: float, py: float) -> float:
+ return (px - ax) * (by - ay) - (py - ay) * (bx - ax)
+
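+    # Fill each projected triangle using edge-function coverage tests and a z-buffer depth test.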
+ for i0, i1, i2 in terrain_faces:
+ x0 = sx[i0]
+ y0 = sy[i0]
+ x1 = sx[i1]
+ y1 = sy[i1]
+ x2 = sx[i2]
+ y2 = sy[i2]
+ area = edge(x0, y0, x1, y1, x2, y2)
+ if area == 0.0:
+ continue
+
+ ux = vx[i1] - vx[i0]
+ uy = vy[i1] - vy[i0]
+ uz = vz[i1] - vz[i0]
+ wx = vx[i2] - vx[i0]
+ wy = vy[i2] - vy[i0]
+ wz = vz[i2] - vz[i0]
+ nx = uy * wz - uz * wy
+ ny = uz * wx - ux * wz
+ nz = ux * wy - uy * wx
+ n_len = math.sqrt(nx * nx + ny * ny + nz * nz)
+ if n_len > 0.0:
+ nx /= n_len
+ ny /= n_len
+ nz /= n_len
+ intensity = nx * light[0] + ny * light[1] + nz * light[2]
+ if intensity < 0.0:
+ intensity = 0.0
+ shade = int(45 + 185 * intensity)
+ color = (min(255, shade + 6), min(255, shade + 14), min(255, shade + 28))
+
+ minx = int(max(0, math.floor(min(x0, x1, x2))))
+ maxx = int(min(width - 1, math.ceil(max(x0, x1, x2))))
+ miny = int(max(0, math.floor(min(y0, y1, y2))))
+ maxy = int(min(height - 1, math.ceil(max(y0, y1, y2))))
+ if minx > maxx or miny > maxy:
+ continue
+
+ z0 = vz[i0]
+ z1 = vz[i1]
+ z2 = vz[i2]
+ inv_area = 1.0 / area
+ for py in range(miny, maxy + 1):
+ fy = py + 0.5
+ row = py * width
+ for px in range(minx, maxx + 1):
+ fx = px + 0.5
+ w0 = edge(x1, y1, x2, y2, fx, fy)
+ w1 = edge(x2, y2, x0, y0, fx, fy)
+ w2 = edge(x0, y0, x1, y1, fx, fy)
+ if area > 0:
+ if w0 < 0 or w1 < 0 or w2 < 0:
+ continue
+ else:
+ if w0 > 0 or w1 > 0 or w2 > 0:
+ continue
+ bz0 = w0 * inv_area
+ bz1 = w1 * inv_area
+ bz2 = w2 * inv_area
+ depth = bz0 * z0 + bz1 * z1 + bz2 * z2
+ idx = row + px
+ if depth >= zbuf[idx]:
+ continue
+ zbuf[idx] = depth
+ p = idx * 3
+ rgb[p + 0] = color[0]
+ rgb[p + 1] = color[1]
+ rgb[p + 2] = color[2]
+
+ def draw_line(
+ xa: float,
+ ya: float,
+ xb: float,
+ yb: float,
+ color: tuple[int, int, int],
+ ) -> None:
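+        # Bresenham line into the RGB buffer, clipping each pixel to the image bounds.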
+ x0i = int(round(xa))
+ y0i = int(round(ya))
+ x1i = int(round(xb))
+ y1i = int(round(yb))
+ dx = abs(x1i - x0i)
+ sx_step = 1 if x0i < x1i else -1
+ dy = -abs(y1i - y0i)
+ sy_step = 1 if y0i < y1i else -1
+ err = dx + dy
+ x = x0i
+ y = y0i
+ while True:
+ if 0 <= x < width and 0 <= y < height:
+ p = (y * width + x) * 3
+ rgb[p + 0] = color[0]
+ rgb[p + 1] = color[1]
+ rgb[p + 2] = color[2]
+ if x == x1i and y == y1i:
+ break
+ e2 = 2 * err
+ if e2 >= dy:
+ err += dy
+ x += sx_step
+ if e2 <= dx:
+ err += dx
+ y += sy_step
+
+ if wireframe:
+ wf = (225, 232, 246)
+ for i0, i1, i2 in terrain_faces:
+ draw_line(sx[i0], sy[i0], sx[i1], sy[i1], wf)
+ draw_line(sx[i1], sy[i1], sx[i2], sy[i2], wf)
+ draw_line(sx[i2], sy[i2], sx[i0], sy[i0], wf)
+
+ if areal_overlay:
+ for area in areals:
+ verts = area["vertices"]
+ if len(verts) < 2:
+ continue
+ color = _color_for_class(int(area["class_id"]))
+ projected = [project_point(x, y, z + 0.35) for x, y, z in verts]
+ for i in range(len(projected)):
+ x0, y0, _ = projected[i]
+ x1, y1, _ = projected[(i + 1) % len(projected)]
+ draw_line(x0, y0, x1, y1, color)
+
+ return rgb
+
+
+def cmd_render(args: argparse.Namespace) -> int:
+ msh_path = Path(args.land_msh).resolve()
+ map_path = Path(args.land_map).resolve() if args.land_map else None
+ output_path = Path(args.output).resolve()
+
+ positions, faces, terrain_meta = load_terrain_msh(msh_path, max_faces=int(args.max_faces))
+ areals: list[dict[str, Any]] = []
+ map_meta: dict[str, int] = {"areal_count": 0, "cells_x": 0, "cells_y": 0}
+ if map_path:
+ areals, map_meta = load_areal_map(map_path)
+
+ rgb = _render_scene(
+ positions,
+ faces,
+ areals,
+ width=int(args.width),
+ height=int(args.height),
+ yaw_deg=float(args.yaw),
+ pitch_deg=float(args.pitch),
+ wireframe=bool(args.wireframe),
+ areal_overlay=bool(args.overlay_areals),
+ )
+ _write_ppm(output_path, int(args.width), int(args.height), rgb)
+
+ print(f"Rendered terrain : {msh_path}")
+ if map_path:
+ print(f"Areal overlay : {map_path}")
+ print(f"Output : {output_path}")
+ print(
+ "Terrain geometry : "
+ f"vertices={terrain_meta['vertex_count']}, "
+ f"faces={terrain_meta['face_count_rendered']}/{terrain_meta['face_count_valid']} "
+ f"(raw={terrain_meta['face_count_raw']}, dropped={terrain_meta['face_dropped_invalid']})"
+ )
+ if map_path:
+ print(
+ "Areal map : "
+ f"areals={map_meta['areal_count']}, cells={map_meta['cells_x']}x{map_meta['cells_y']}"
+ )
+ return 0
+
+
+def cmd_export_obj(args: argparse.Namespace) -> int:
+ msh_path = Path(args.land_msh).resolve()
+ map_path = Path(args.land_map).resolve() if args.land_map else None
+ output_path = Path(args.output).resolve()
+
+ positions, faces, terrain_meta = load_terrain_msh(msh_path, max_faces=int(args.max_faces))
+ areals: list[dict[str, Any]] = []
+ if map_path and bool(args.include_areals):
+ areals, _ = load_areal_map(map_path)
+
+ _write_obj(
+ output_path,
+ positions,
+ faces,
+ areals,
+ include_areals=bool(args.include_areals),
+ )
+
+ areal_vertices = sum(len(a["vertices"]) for a in areals)
+ print(f"Terrain source : {msh_path}")
+ if map_path:
+ print(f"Areal source : {map_path}")
+ print(f"OBJ output : {output_path}")
+ print(
+ "Terrain geometry : "
+ f"vertices={terrain_meta['vertex_count']}, "
+ f"faces={terrain_meta['face_count_rendered']}/{terrain_meta['face_count_valid']}"
+ )
+ if bool(args.include_areals):
+ print(f"Areal edges : areals={len(areals)}, extra_vertices={areal_vertices}")
+ return 0
+
+
+def cmd_render_turntable(args: argparse.Namespace) -> int:
+ msh_path = Path(args.land_msh).resolve()
+ map_path = Path(args.land_map).resolve() if args.land_map else None
+ output_dir = Path(args.output_dir).resolve()
+ output_dir.mkdir(parents=True, exist_ok=True)
+
+ frames = int(args.frames)
+ if frames <= 0:
+ raise RuntimeError("--frames must be > 0")
+
+ positions, faces, terrain_meta = load_terrain_msh(msh_path, max_faces=int(args.max_faces))
+ areals: list[dict[str, Any]] = []
+ if map_path:
+ areals, _ = load_areal_map(map_path)
+
+ yaw_start = float(args.yaw_start)
+ yaw_end = float(args.yaw_end)
+ if frames == 1:
+ yaws = [yaw_start]
+ else:
+ step = (yaw_end - yaw_start) / (frames - 1)
+ yaws = [yaw_start + i * step for i in range(frames)]
+
+ prefix = str(args.prefix)
+ for i, yaw in enumerate(yaws):
+ rgb = _render_scene(
+ positions,
+ faces,
+ areals,
+ width=int(args.width),
+ height=int(args.height),
+ yaw_deg=yaw,
+ pitch_deg=float(args.pitch),
+ wireframe=bool(args.wireframe),
+ areal_overlay=bool(args.overlay_areals),
+ )
+ out = output_dir / f"{prefix}_{i:03d}.ppm"
+ _write_ppm(out, int(args.width), int(args.height), rgb)
+
+ print(f"Turntable source : {msh_path}")
+ if map_path:
+ print(f"Areal source : {map_path}")
+ print(f"Output dir : {output_dir}")
+ print(f"Frames : {frames} ({yaws[0]:.3f} -> {yaws[-1]:.3f} yaw)")
+ print(
+ "Terrain geometry : "
+ f"vertices={terrain_meta['vertex_count']}, faces={terrain_meta['face_count_rendered']}"
+ )
+ return 0
+
+
+def cmd_render_batch(args: argparse.Namespace) -> int:
+ maps_root = Path(args.maps_root).resolve()
+ output_dir = Path(args.output_dir).resolve()
+ msh_paths = sorted(maps_root.rglob("Land.msh"))
+ if not msh_paths:
+ raise RuntimeError(f"no Land.msh files under {maps_root}")
+
+ rendered = 0
+ skipped = 0
+ for msh_path in msh_paths:
+ map_path = msh_path.with_name("Land.map")
+ if not map_path.exists():
+ skipped += 1
+ continue
+ rel = msh_path.parent.relative_to(maps_root)
+ out = output_dir / f"{rel.as_posix().replace('/', '__')}.ppm"
+ cmd_render(
+ argparse.Namespace(
+ land_msh=str(msh_path),
+ land_map=str(map_path),
+ output=str(out),
+ max_faces=args.max_faces,
+ width=args.width,
+ height=args.height,
+ yaw=args.yaw,
+ pitch=args.pitch,
+ wireframe=args.wireframe,
+ overlay_areals=args.overlay_areals,
+ )
+ )
+ rendered += 1
+
+ print(f"Batch summary: rendered={rendered}, skipped_no_map={skipped}, output_dir={output_dir}")
+ return 0
+
+
+def build_parser() -> argparse.ArgumentParser:
+ parser = argparse.ArgumentParser(
+ description="Software 3D terrain renderer (Land.msh + optional Land.map overlay)."
+ )
+ sub = parser.add_subparsers(dest="command", required=True)
+
+ render = sub.add_parser("render", help="Render one terrain map to PPM.")
+ render.add_argument("--land-msh", required=True, help="Path to Land.msh")
+ render.add_argument("--land-map", help="Path to Land.map (optional)")
+ render.add_argument("--output", required=True, help="Output .ppm path")
+ render.add_argument("--max-faces", type=int, default=220000, help="Face limit (default: 220000)")
+ render.add_argument("--width", type=int, default=1280, help="Image width (default: 1280)")
+ render.add_argument("--height", type=int, default=720, help="Image height (default: 720)")
+ render.add_argument("--yaw", type=float, default=38.0, help="Yaw angle in degrees (default: 38)")
+ render.add_argument("--pitch", type=float, default=26.0, help="Pitch angle in degrees (default: 26)")
+ render.add_argument("--wireframe", action="store_true", help="Draw terrain wireframe overlay")
+ render.add_argument(
+ "--overlay-areals",
+ action="store_true",
+ help="Draw ArealMap polygon overlay",
+ )
+ render.set_defaults(func=cmd_render)
+
+ export_obj = sub.add_parser("export-obj", help="Export terrain (and optional areal edges) to OBJ.")
+ export_obj.add_argument("--land-msh", required=True, help="Path to Land.msh")
+ export_obj.add_argument("--land-map", help="Path to Land.map (optional)")
+ export_obj.add_argument("--output", required=True, help="Output .obj path")
+ export_obj.add_argument("--max-faces", type=int, default=0, help="Face limit (0 = all)")
+ export_obj.add_argument(
+ "--include-areals",
+ action="store_true",
+ help="Export areal polygons as OBJ polyline object",
+ )
+ export_obj.set_defaults(func=cmd_export_obj)
+
+ turn = sub.add_parser("render-turntable", help="Render turntable frame sequence to PPM.")
+ turn.add_argument("--land-msh", required=True, help="Path to Land.msh")
+ turn.add_argument("--land-map", help="Path to Land.map (optional)")
+ turn.add_argument("--output-dir", required=True, help="Output directory for frames")
+ turn.add_argument("--prefix", default="frame", help="Frame filename prefix (default: frame)")
+ turn.add_argument("--frames", type=int, default=36, help="Frame count (default: 36)")
+ turn.add_argument("--yaw-start", type=float, default=0.0, help="Start yaw in degrees (default: 0)")
+ turn.add_argument("--yaw-end", type=float, default=360.0, help="End yaw in degrees (default: 360)")
+ turn.add_argument("--pitch", type=float, default=26.0, help="Pitch angle in degrees (default: 26)")
+ turn.add_argument("--max-faces", type=int, default=160000, help="Face limit (default: 160000)")
+ turn.add_argument("--width", type=int, default=960, help="Image width (default: 960)")
+ turn.add_argument("--height", type=int, default=540, help="Image height (default: 540)")
+ turn.add_argument("--wireframe", action="store_true", help="Draw terrain wireframe overlay")
+ turn.add_argument(
+ "--overlay-areals",
+ action="store_true",
+ help="Draw ArealMap polygon overlay",
+ )
+ turn.set_defaults(func=cmd_render_turntable)
+
+ batch = sub.add_parser("render-batch", help="Render all MAPS/**/Land.msh under root.")
+ batch.add_argument(
+ "--maps-root",
+ default="tmp/gamedata/DATA/MAPS",
+ help="Root directory with MAPS subfolders (default: tmp/gamedata/DATA/MAPS)",
+ )
+ batch.add_argument("--output-dir", required=True, help="Directory for output PPM files")
+ batch.add_argument("--max-faces", type=int, default=90000, help="Face limit per map (default: 90000)")
+ batch.add_argument("--width", type=int, default=960, help="Image width (default: 960)")
+ batch.add_argument("--height", type=int, default=540, help="Image height (default: 540)")
+ batch.add_argument("--yaw", type=float, default=38.0, help="Yaw angle in degrees (default: 38)")
+ batch.add_argument("--pitch", type=float, default=26.0, help="Pitch angle in degrees (default: 26)")
+ batch.add_argument("--wireframe", action="store_true", help="Draw terrain wireframe overlay")
+ batch.add_argument(
+ "--overlay-areals",
+ action="store_true",
+ help="Draw ArealMap polygon overlay",
+ )
+ batch.set_defaults(func=cmd_render_batch)
+
+ return parser
+
+
+def main() -> int:
+ parser = build_parser()
+ args = parser.parse_args()
+ return int(args.func(args))
+
+
+if __name__ == "__main__":
+ raise SystemExit(main())