aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorValentin Popov <valentin@popov.link>2026-06-21 23:35:19 +0300
committerValentin Popov <valentin@popov.link>2026-06-21 23:35:19 +0300
commit50c2cf4686b53ebd2b76318223096660e92305a4 (patch)
tree8741109102a21faaa09a013a047e5c7b74f62b12
parent96a25b6c0e39ee39bceddbc8eae5bda8f305acbf (diff)
downloadfparkan-50c2cf4686b53ebd2b76318223096660e92305a4.tar.xz
fparkan-50c2cf4686b53ebd2b76318223096660e92305a4.zip
chore: remove Python tooling and resource viewer
-rw-r--r--Cargo.toml2
-rw-r--r--README.md12
-rw-r--r--apps/resource-viewer/Cargo.toml11
-rw-r--r--apps/resource-viewer/src/main.rs518
-rw-r--r--docs/specs/coverage-audit.md10
-rw-r--r--docs/specs/fxid.md2
-rw-r--r--docs/specs/material.md4
-rw-r--r--docs/specs/msh-animation.md4
-rw-r--r--docs/specs/msh-core.md3
-rw-r--r--docs/specs/msh-notes.md2
-rw-r--r--docs/specs/nres.md4
-rw-r--r--docs/specs/render.md2
-rw-r--r--docs/specs/rsli.md5
-rw-r--r--docs/specs/terrain-map-loading.md4
-rw-r--r--docs/specs/texture.md2
-rw-r--r--testdata/README.md5
-rw-r--r--testdata/nres/.gitignore2
-rw-r--r--testdata/rsli/.gitignore2
-rw-r--r--tools/README.md201
-rw-r--r--tools/archive_roundtrip_validator.py944
-rw-r--r--tools/fxid_abs100_audit.py262
-rw-r--r--tools/init_testdata.py204
-rw-r--r--tools/msh_doc_validator.py1000
-rw-r--r--tools/msh_export_obj.py357
-rw-r--r--tools/msh_preview_renderer.py481
-rw-r--r--tools/terrain_map_doc_validator.py809
-rw-r--r--tools/terrain_map_preview_renderer.py679
27 files changed, 22 insertions, 5509 deletions
diff --git a/Cargo.toml b/Cargo.toml
index e508408..34c501a 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,6 +1,6 @@
[workspace]
resolver = "3"
-members = ["crates/*", "apps/resource-viewer"]
+members = ["crates/*"]
[profile.release]
codegen-units = 1
diff --git a/README.md b/README.md
index 86e525a..92a3b64 100644
--- a/README.md
+++ b/README.md
@@ -1,13 +1,12 @@
# FParkan
-Open source проект с реализацией компонентов игрового движка игры **«Паркан: Железная Стратегия»** и набором [вспомогательных инструментов](tools) для исследования.
+Open source проект с реализацией компонентов игрового движка игры **«Паркан: Железная Стратегия»**.
## Описание
Проект находится в активной разработке и включает:
- библиотеки для работы с форматами игровых архивов;
-- инструменты для валидации/подготовки тестовых данных;
- спецификации форматов и сопутствующую документацию.
## Установка
@@ -19,13 +18,6 @@ Open source проект с реализацией компонентов игр
- локально: каталог [`docs/`](docs)
- сайт: <https://fparkan.popov.link>
-## Инструменты
-
-Вспомогательные инструменты находятся в каталоге [`tools/`](tools).
-
-- [tools/archive_roundtrip_validator.py](tools/archive_roundtrip_validator.py) — инструмент верификации документации по архивам `NRes`/`RsLi` на реальных файлах (включая `unpack -> repack -> byte-compare`).
-- [tools/init_testdata.py](tools/init_testdata.py) — подготовка тестовых данных по сигнатурам с раскладкой по каталогам.
-
## Библиотеки
- [crates/nres](crates/nres) — библиотека для работы с файлами архивов NRes (чтение, поиск, редактирование, сохранение).
@@ -37,8 +29,8 @@ Open source проект с реализацией компонентов игр
Для дополнительного тестирования на реальных игровых ресурсах:
-- используйте [tools/init_testdata.py](tools/init_testdata.py) для подготовки локального набора;
- используйте оригинальную копию игры (диск или [GOG-версия](https://www.gog.com/en/game/parkan_iron_strategy));
+- разместите игровые каталоги в [`testdata/`](testdata);
- игровые ресурсы в репозиторий не включаются, так как защищены авторским правом.
## Contributing & Support
diff --git a/apps/resource-viewer/Cargo.toml b/apps/resource-viewer/Cargo.toml
deleted file mode 100644
index beadefe..0000000
--- a/apps/resource-viewer/Cargo.toml
+++ /dev/null
@@ -1,11 +0,0 @@
-[package]
-name = "resource-viewer"
-version = "0.1.0"
-edition = "2021"
-publish = false
-
-[dependencies]
-iced = "0.14"
-rfd = "0.17"
-nres = { path = "../../crates/nres" }
-rsli = { path = "../../crates/rsli" }
diff --git a/apps/resource-viewer/src/main.rs b/apps/resource-viewer/src/main.rs
deleted file mode 100644
index 508c407..0000000
--- a/apps/resource-viewer/src/main.rs
+++ /dev/null
@@ -1,518 +0,0 @@
-use iced::widget::{button, column, container, horizontal_space, row, scrollable, text};
-use iced::{application, Element, Length, Task, Theme};
-use rfd::FileDialog;
-use std::collections::BTreeMap;
-use std::fmt::Write as _;
-use std::fs;
-use std::path::{Path, PathBuf};
-
-fn main() -> iced::Result {
- application("Parkan Resource Viewer", update, view)
- .theme(theme)
- .run_with(|| (ViewerApp::default(), Task::none()))
-}
-
-fn theme(_state: &ViewerApp) -> Theme {
- Theme::Light
-}
-
-#[derive(Debug, Default)]
-struct ViewerApp {
- document: Option<DocumentModel>,
- status: String,
-}
-
-#[derive(Debug, Clone)]
-enum Message {
- OpenRequested,
- SelectNode(Selection),
-}
-
-fn update(state: &mut ViewerApp, message: Message) -> Task<Message> {
- match message {
- Message::OpenRequested => {
- if let Some(path) = pick_archive_file() {
- match load_document(&path) {
- Ok(document) => {
- state.status =
- format!("Loaded {} as {}", path.display(), document.format.label());
- state.document = Some(document);
- }
- Err(err) => {
- state.status = err;
- }
- }
- }
- }
- Message::SelectNode(selection) => {
- if let Some(document) = state.document.as_mut() {
- document.selected = selection;
- }
- }
- }
-
- Task::none()
-}
-
-fn view(state: &ViewerApp) -> Element<'_, Message> {
- let top_bar = row![
- button("Open archive").on_press(Message::OpenRequested),
- text(status_text(state)).size(14)
- ]
- .spacing(12);
-
- let content = if let Some(document) = &state.document {
- view_document(document)
- } else {
- container(text("Open an .nres/.rsli/.lib archive to start.").size(16))
- .width(Length::Fill)
- .height(Length::Fill)
- .center_x(Length::Fill)
- .center_y(Length::Fill)
- .into()
- };
-
- container(column![top_bar, content].spacing(12).padding(12))
- .width(Length::Fill)
- .height(Length::Fill)
- .into()
-}
-
-fn status_text(state: &ViewerApp) -> String {
- if state.status.is_empty() {
- String::from("Ready")
- } else {
- state.status.clone()
- }
-}
-
-fn view_document(document: &DocumentModel) -> Element<'_, Message> {
- let mut tree = column![text("Archive tree").size(18)].spacing(6);
- for item in &document.tree_rows {
- let indent = horizontal_space().width(Length::Fixed(f32::from(item.depth) * 16.0));
-
- let line = row![indent, text(&item.label).size(14)].spacing(6);
- if let Some(selection) = item.selection {
- let mut node_button = button(line)
- .width(Length::Fill)
- .on_press(Message::SelectNode(selection));
-
- if selection == document.selected {
- node_button = node_button.style(button::primary);
- }
-
- tree = tree.push(node_button);
- } else {
- tree = tree.push(line);
- }
- }
-
- let (panel_title, fields) = selected_fields(document);
- let mut fields_column = column![text(panel_title).size(18)].spacing(8);
-
- for field in fields {
- fields_column = fields_column.push(
- row![
- text(&field.key).size(14).width(Length::Fixed(220.0)),
- text(&field.value).size(14).width(Length::Fill)
- ]
- .spacing(12),
- );
- }
-
- let left = container(scrollable(tree))
- .width(Length::FillPortion(2))
- .height(Length::Fill);
-
- let right = container(scrollable(fields_column))
- .width(Length::FillPortion(5))
- .height(Length::Fill);
-
- row![left, right].spacing(12).height(Length::Fill).into()
-}
-
-fn selected_fields(document: &DocumentModel) -> (String, &[FieldRow]) {
- match document.selected {
- Selection::Archive => (
- format!(
- "{} fields ({})",
- document.format.label(),
- document.path.display()
- ),
- &document.archive_fields,
- ),
- Selection::Entry(index) => {
- if let Some(entry) = document.entries.get(index) {
- (entry.panel_title.clone(), &entry.fields)
- } else {
- (String::from("Entry"), &[])
- }
- }
- }
-}
-
-fn pick_archive_file() -> Option<PathBuf> {
- FileDialog::new()
- .set_title("Open Parkan archive")
- .pick_file()
-}
-
-fn load_document(path: &Path) -> Result<DocumentModel, String> {
- let bytes =
- fs::read(path).map_err(|err| format!("Failed to read {}: {err}", path.display()))?;
- let Some(format) = detect_archive_format(&bytes) else {
- return Err(format!(
- "{} is not recognized as NRes/RsLi (unsupported magic).",
- path.display()
- ));
- };
-
- match format {
- ArchiveFormat::Nres => load_nres_document(path),
- ArchiveFormat::Rsli => load_rsli_document(path),
- }
-}
-
-fn detect_archive_format(bytes: &[u8]) -> Option<ArchiveFormat> {
- if bytes.len() >= 4 && &bytes[0..4] == b"NRes" {
- return Some(ArchiveFormat::Nres);
- }
-
- if bytes.len() >= 2 && &bytes[0..2] == b"NL" {
- return Some(ArchiveFormat::Rsli);
- }
-
- None
-}
-
-fn load_nres_document(path: &Path) -> Result<DocumentModel, String> {
- let archive = nres::Archive::open_path(path)
- .map_err(|err| format!("NRes open failed for {}: {err}", path.display()))?;
-
- let info = archive.info();
- let mut archive_fields = vec![
- FieldRow::new("format", "NRes"),
- FieldRow::new("file_size", info.file_size.to_string()),
- FieldRow::new("raw_mode", info.raw_mode.to_string()),
- ];
-
- if let Some(header) = &info.header {
- archive_fields.push(FieldRow::new(
- "magic",
- String::from_utf8_lossy(&header.magic).into_owned(),
- ));
- archive_fields.push(FieldRow::new("version", format_u32_dec_hex(header.version)));
- archive_fields.push(FieldRow::new("entry_count", header.entry_count.to_string()));
- archive_fields.push(FieldRow::new(
- "total_size",
- format!("{} (0x{:08X})", header.total_size, header.total_size),
- ));
- archive_fields.push(FieldRow::new(
- "directory_offset",
- header.directory_offset.to_string(),
- ));
- archive_fields.push(FieldRow::new(
- "directory_size",
- header.directory_size.to_string(),
- ));
- }
-
- let mut entries = Vec::new();
- for entry in archive.entries_inspect() {
- let meta = entry.meta;
- let mut fields = vec![
- FieldRow::new("id", entry.id.0.to_string()),
- FieldRow::new("name", meta.name.clone()),
- FieldRow::new("type_id", format_u32_dec_hex(meta.kind)),
- FieldRow::new("attr1", format_u32_dec_hex(meta.attr1)),
- FieldRow::new("attr2", format_u32_dec_hex(meta.attr2)),
- FieldRow::new("attr3", format_u32_dec_hex(meta.attr3)),
- FieldRow::new("data_offset", meta.data_offset.to_string()),
- FieldRow::new("data_size", meta.data_size.to_string()),
- FieldRow::new("sort_index", meta.sort_index.to_string()),
- FieldRow::new("name_raw_hex", bytes_as_hex(entry.name_raw)),
- FieldRow::new("name_raw_ascii", bytes_as_ascii(entry.name_raw)),
- ];
-
- fields.push(FieldRow::new("find_key", meta.name.to_ascii_lowercase()));
-
- entries.push(EntryView {
- full_name: meta.name.clone(),
- panel_title: format!("NRes entry #{}: {}", entry.id.0, meta.name),
- fields,
- });
- }
-
- let tree_rows = build_tree_rows(&entries);
-
- Ok(DocumentModel {
- path: path.to_path_buf(),
- format: ArchiveFormat::Nres,
- archive_fields,
- entries,
- tree_rows,
- selected: Selection::Archive,
- })
-}
-
-fn load_rsli_document(path: &Path) -> Result<DocumentModel, String> {
- let library = rsli::Library::open_path(path)
- .map_err(|err| format!("RsLi open failed for {}: {err}", path.display()))?;
-
- let header = library.header();
- let mut archive_fields = vec![
- FieldRow::new("format", "RsLi"),
- FieldRow::new("magic", String::from_utf8_lossy(&header.magic).into_owned()),
- FieldRow::new(
- "reserved",
- format!("{} (0x{:02X})", header.reserved, header.reserved),
- ),
- FieldRow::new(
- "version",
- format!("{} (0x{:02X})", header.version, header.version),
- ),
- FieldRow::new("entry_count", header.entry_count.to_string()),
- FieldRow::new("presorted_flag", format!("0x{:04X}", header.presorted_flag)),
- FieldRow::new("xor_seed", format!("0x{:08X}", header.xor_seed)),
- FieldRow::new("header_raw_hex", bytes_as_hex(&header.raw)),
- ];
-
- if let Some(ao) = library.ao_trailer() {
- archive_fields.push(FieldRow::new("ao_trailer", "present"));
- archive_fields.push(FieldRow::new("ao_overlay", ao.overlay.to_string()));
- archive_fields.push(FieldRow::new("ao_raw_hex", bytes_as_hex(&ao.raw)));
- } else {
- archive_fields.push(FieldRow::new("ao_trailer", "absent"));
- }
-
- let mut entries = Vec::new();
- for entry in library.entries_inspect() {
- let meta = entry.meta;
- let method_raw = (meta.flags as u16 as u32) & 0x1E0;
-
- let fields = vec![
- FieldRow::new("id", entry.id.0.to_string()),
- FieldRow::new("name", meta.name.clone()),
- FieldRow::new(
- "flags",
- format!("{} (0x{:04X})", meta.flags, meta.flags as u16),
- ),
- FieldRow::new("method", format!("{:?}", meta.method)),
- FieldRow::new("method_raw", format!("0x{:03X}", method_raw)),
- FieldRow::new("packed_size", meta.packed_size.to_string()),
- FieldRow::new("unpacked_size", meta.unpacked_size.to_string()),
- FieldRow::new("data_offset_effective", meta.data_offset.to_string()),
- FieldRow::new("data_offset_raw", entry.data_offset_raw.to_string()),
- FieldRow::new("sort_to_original", entry.sort_to_original.to_string()),
- FieldRow::new("name_raw_hex", bytes_as_hex(entry.name_raw)),
- FieldRow::new("name_raw_ascii", bytes_as_ascii(entry.name_raw)),
- FieldRow::new("service_tail_hex", bytes_as_hex(entry.service_tail)),
- FieldRow::new("service_tail_ascii", bytes_as_ascii(entry.service_tail)),
- ];
-
- entries.push(EntryView {
- full_name: meta.name.clone(),
- panel_title: format!("RsLi entry #{}: {}", entry.id.0, meta.name),
- fields,
- });
- }
-
- let tree_rows = build_tree_rows(&entries);
-
- Ok(DocumentModel {
- path: path.to_path_buf(),
- format: ArchiveFormat::Rsli,
- archive_fields,
- entries,
- tree_rows,
- selected: Selection::Archive,
- })
-}
-
-fn build_tree_rows(entries: &[EntryView]) -> Vec<TreeRow> {
- let mut root = FolderNode::default();
- for (index, entry) in entries.iter().enumerate() {
- insert_tree_path(&mut root, &entry.full_name, index);
- }
-
- let mut rows = vec![TreeRow {
- depth: 0,
- label: String::from("[Archive fields]"),
- selection: Some(Selection::Archive),
- }];
-
- flatten_tree(&root, 0, &mut rows);
- rows
-}
-
-fn insert_tree_path(root: &mut FolderNode, full_name: &str, entry_index: usize) {
- let mut parts: Vec<&str> = full_name
- .split(['/', '\\'])
- .filter(|part| !part.is_empty())
- .collect();
-
- if parts.is_empty() {
- parts.push(full_name);
- }
-
- if parts.len() == 1 {
- root.files.push((parts[0].to_string(), entry_index));
- return;
- }
-
- let file_name = parts.pop().unwrap_or(full_name);
- let mut node = root;
- for part in parts {
- node = node.folders.entry(part.to_string()).or_default();
- }
-
- node.files.push((file_name.to_string(), entry_index));
-}
-
-fn flatten_tree(node: &FolderNode, depth: u16, out: &mut Vec<TreeRow>) {
- for (folder_name, folder_node) in &node.folders {
- out.push(TreeRow {
- depth,
- label: format!("{folder_name}/"),
- selection: None,
- });
- flatten_tree(folder_node, depth.saturating_add(1), out);
- }
-
- let mut files = node.files.clone();
- files.sort_by(|left, right| left.0.cmp(&right.0));
-
- for (name, index) in files {
- out.push(TreeRow {
- depth,
- label: name,
- selection: Some(Selection::Entry(index)),
- });
- }
-}
-
-fn bytes_as_hex(bytes: &[u8]) -> String {
- let mut out = String::new();
- for (index, byte) in bytes.iter().enumerate() {
- if index > 0 {
- out.push(' ');
- }
- let _ = write!(&mut out, "{byte:02X}");
- }
- out
-}
-
-fn bytes_as_ascii(bytes: &[u8]) -> String {
- bytes
- .iter()
- .map(|byte| {
- if byte.is_ascii_graphic() || *byte == b' ' {
- char::from(*byte)
- } else {
- '.'
- }
- })
- .collect()
-}
-
-fn format_u32_dec_hex(value: u32) -> String {
- format!("{} (0x{:08X})", value, value)
-}
-
-#[derive(Debug, Clone)]
-struct DocumentModel {
- path: PathBuf,
- format: ArchiveFormat,
- archive_fields: Vec<FieldRow>,
- entries: Vec<EntryView>,
- tree_rows: Vec<TreeRow>,
- selected: Selection,
-}
-
-#[derive(Debug, Clone, Copy)]
-enum ArchiveFormat {
- Nres,
- Rsli,
-}
-
-impl ArchiveFormat {
- fn label(self) -> &'static str {
- match self {
- Self::Nres => "NRes",
- Self::Rsli => "RsLi",
- }
- }
-}
-
-#[derive(Debug, Clone)]
-struct EntryView {
- full_name: String,
- panel_title: String,
- fields: Vec<FieldRow>,
-}
-
-#[derive(Debug, Clone)]
-struct FieldRow {
- key: String,
- value: String,
-}
-
-impl FieldRow {
- fn new(key: impl Into<String>, value: impl Into<String>) -> Self {
- Self {
- key: key.into(),
- value: value.into(),
- }
- }
-}
-
-#[derive(Debug, Clone)]
-struct TreeRow {
- depth: u16,
- label: String,
- selection: Option<Selection>,
-}
-
-#[derive(Debug, Clone, Copy, PartialEq, Eq)]
-enum Selection {
- Archive,
- Entry(usize),
-}
-
-#[derive(Default, Debug)]
-struct FolderNode {
- folders: BTreeMap<String, FolderNode>,
- files: Vec<(String, usize)>,
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
-
- #[test]
- fn tree_builds_nested_paths() {
- let entries = vec![
- EntryView {
- full_name: String::from("textures/ui/hud.texm"),
- panel_title: String::new(),
- fields: vec![],
- },
- EntryView {
- full_name: String::from("textures/world/ground.texm"),
- panel_title: String::new(),
- fields: vec![],
- },
- EntryView {
- full_name: String::from("root_file.msh"),
- panel_title: String::new(),
- fields: vec![],
- },
- ];
-
- let rows = build_tree_rows(&entries);
- assert!(rows.iter().any(|row| row.label == "textures/"));
- assert!(rows.iter().any(|row| row.label == "ui/"));
- assert!(rows.iter().any(|row| row.label == "hud.texm"));
- assert!(rows.iter().any(|row| row.label == "root_file.msh"));
- }
-}
diff --git a/docs/specs/coverage-audit.md b/docs/specs/coverage-audit.md
index 638f4c1..bee27ee 100644
--- a/docs/specs/coverage-audit.md
+++ b/docs/specs/coverage-audit.md
@@ -11,9 +11,7 @@
- `RsLi`: `2` архива, roundtrip `2/2` (byte-identical)
- подтвержден один совместимый quirk: `sprites.lib`, entry `23`, `deflate EOF+1`
-Инструмент:
-
-- `tools/archive_roundtrip_validator.py`
+Проверено legacy-валидатором архивов.
## 2. Проверка рендерных форматов
@@ -24,11 +22,7 @@
- `FXID`: `923/923` валидны
- `Terrain/Map` (`Land.msh` + `Land.map`): `33/33` без ошибок/предупреждений
-Инструменты:
-
-- `tools/msh_doc_validator.py`
-- `tools/fxid_abs100_audit.py`
-- `tools/terrain_map_doc_validator.py`
+Проверено legacy-валидаторами рендерных форматов.
## 3. Глобальный статус по подсистемам
diff --git a/docs/specs/fxid.md b/docs/specs/fxid.md
index f723e17..e3a583d 100644
--- a/docs/specs/fxid.md
+++ b/docs/specs/fxid.md
@@ -184,7 +184,7 @@ struct ResourceRef64 {
## 11. Статус валидации
-- Формальные инварианты FXID зафиксированы в `tools/msh_doc_validator.py` и `tools/fxid_abs100_audit.py`.
+- Формальные инварианты FXID зафиксированы в спецификациях проекта и проверены legacy-валидаторами.
- На полном retail-корпусе `testdata/Parkan - Iron Strategy` проверено `923/923` FXID payload без ошибок.
## 12. Статус покрытия и что осталось до 100%
diff --git a/docs/specs/material.md b/docs/specs/material.md
index 12c8296..1aa3510 100644
--- a/docs/specs/material.md
+++ b/docs/specs/material.md
@@ -126,8 +126,8 @@ struct KeyRaw {
## 10. Статус валидации
-- Инварианты MAT0 зафиксированы в текущем toolchain проекта (`docs/specs` + `tools`).
-- Структурная валидация MAT0 включена в корпусный прогон `tools/msh_doc_validator.py` на полном retail-наборе.
+- Инварианты MAT0 зафиксированы в спецификациях проекта.
+- Структурная валидация MAT0 проверена legacy-валидатором на полном retail-наборе.
## 11. Статус покрытия и что осталось до 100%
diff --git a/docs/specs/msh-animation.md b/docs/specs/msh-animation.md
index ec5a256..1c0807a 100644
--- a/docs/specs/msh-animation.md
+++ b/docs/specs/msh-animation.md
@@ -108,8 +108,8 @@ uint16_t map_words[]; // size/2 элементов
## 6. Статус валидации
-- Форматные проверки включены в `tools/msh_doc_validator.py`.
-- Корпусная валидация анимационных инвариантов включена в прогон `tools/msh_doc_validator.py` на полном retail-наборе.
+- Форматные проверки были покрыты legacy-валидатором.
+- Корпусная валидация анимационных инвариантов выполнена на полном retail-наборе.
## 7. Статус покрытия и что осталось до 100%
diff --git a/docs/specs/msh-core.md b/docs/specs/msh-core.md
index 60a4453..db465e7 100644
--- a/docs/specs/msh-core.md
+++ b/docs/specs/msh-core.md
@@ -174,7 +174,7 @@ for each node:
## 8. Статус валидации
-- Инварианты формата реализованы в `tools/msh_doc_validator.py`.
+- Инварианты формата проверены legacy-валидатором.
- На полном retail-корпусе `testdata/Parkan - Iron Strategy` проверено `435/435` MSH-моделей без структурных ошибок.
## 9. Статус покрытия и что осталось до 100%
@@ -190,4 +190,3 @@ for each node:
1. Полная семантика части opaque-полей (`Slot68` tail, `Batch20` opaque-поля) для authoring без copy-through.
2. Полная формализация редких веток (`Res1.attr3 != 38`) на расширенном корпусе.
3. End-to-end writer для генерации новых игровых MSH с подтвержденным runtime-паритетом.
-
diff --git a/docs/specs/msh-notes.md b/docs/specs/msh-notes.md
index 6e77c4f..5c95eb5 100644
--- a/docs/specs/msh-notes.md
+++ b/docs/specs/msh-notes.md
@@ -104,7 +104,7 @@ Fallback:
2. Контракт animation sampling (`Res8 + Res19`).
3. Контракт MAT0/WEAR/Texm на уровне чтения и применения в кадре.
4. Формат FXID-контейнера, командный поток и fixed command sizes.
-5. Валидация на retail-корпусе через `tools/msh_doc_validator.py` (0 ошибок/предупреждений).
+5. Валидация на retail-корпусе legacy-валидатором (0 ошибок/предупреждений).
## 8. Статус покрытия и что осталось до 100%
diff --git a/docs/specs/nres.md b/docs/specs/nres.md
index bb31823..150b38b 100644
--- a/docs/specs/nres.md
+++ b/docs/specs/nres.md
@@ -168,9 +168,7 @@ Fail-safe поведение:
- roundtrip `unpack -> repack -> byte-compare`: `120/120` совпали побайтно;
- критических расхождений формата не обнаружено.
-Инструмент:
-
-- `tools/archive_roundtrip_validator.py`
+Проверено legacy-валидатором архивов.
## 11. Статус покрытия и что осталось до 100%
diff --git a/docs/specs/render.md b/docs/specs/render.md
index ccc941b..f1d098f 100644
--- a/docs/specs/render.md
+++ b/docs/specs/render.md
@@ -152,7 +152,7 @@ void RenderFrame(Scene* scene, Camera* cam, float dt) {
## 10. Статус валидации
- Порядок кадра и подключение `Material.lib / Textures.lib / LightMap.lib` подтверждены текущей runtime-валидацией проекта.
-- Детальные инварианты форматов зафиксированы в `tools/msh_doc_validator.py` и `tools/fxid_abs100_audit.py`.
+- Детальные инварианты форматов зафиксированы в спецификациях проекта и проверены legacy-валидаторами.
## 11. Статус покрытия и что осталось до 100%
diff --git a/docs/specs/rsli.md b/docs/specs/rsli.md
index 298cf2a..239b3ff 100644
--- a/docs/specs/rsli.md
+++ b/docs/specs/rsli.md
@@ -207,10 +207,7 @@ XOR-дешифрование первых `unpacked_size` байт.
- roundtrip `unpack -> repack -> byte-compare`: `2/2` совпали побайтно;
- подтвержден ровно один `deflate EOF+1` случай (`sprites.lib`, entry `23`).
-Инструменты:
-
-- `tools/archive_roundtrip_validator.py`
-- `crates/rsli` tests
+Проверено legacy-валидатором архивов и тестами `crates/rsli`.
## 11. Статус покрытия и что осталось до 100%
diff --git a/docs/specs/terrain-map-loading.md b/docs/specs/terrain-map-loading.md
index 62c1e0a..a511799 100644
--- a/docs/specs/terrain-map-loading.md
+++ b/docs/specs/terrain-map-loading.md
@@ -273,9 +273,7 @@ for (x=0; x<cellsX; x++) {
- `normal` имеет длину ~1.0;
- `reserved_12`, `reserved_36`, `reserved_44` в retail наблюдаются как `0`.
-Инструмент:
-
-- `tools/terrain_map_doc_validator.py`
+Проверено legacy-валидатором terrain/map форматов.
## 7. Статус покрытия и что осталось до 100%
diff --git a/docs/specs/texture.md b/docs/specs/texture.md
index b43ab1a..81ef3b3 100644
--- a/docs/specs/texture.md
+++ b/docs/specs/texture.md
@@ -135,7 +135,7 @@ struct Rect16 {
## 10. Статус валидации
-- Инварианты `Texm` реализованы в `tools/msh_doc_validator.py`.
+- Инварианты `Texm` проверены legacy-валидатором.
- На полном retail-корпусе `testdata/Parkan - Iron Strategy` проверено `518/518` текстурных payload (`Texm`) без ошибок.
## 11. Статус покрытия и что осталось до 100%
diff --git a/testdata/README.md b/testdata/README.md
new file mode 100644
index 0000000..ca8627f
--- /dev/null
+++ b/testdata/README.md
@@ -0,0 +1,5 @@
+# Тестовые данные
+
+Для тестирования на реальных ресурсах разместите в этом каталоге игровые каталоги.
+
+Игровые файлы не включаются в репозиторий.
diff --git a/testdata/nres/.gitignore b/testdata/nres/.gitignore
deleted file mode 100644
index c96a04f..0000000
--- a/testdata/nres/.gitignore
+++ /dev/null
@@ -1,2 +0,0 @@
-*
-!.gitignore \ No newline at end of file
diff --git a/testdata/rsli/.gitignore b/testdata/rsli/.gitignore
deleted file mode 100644
index c96a04f..0000000
--- a/testdata/rsli/.gitignore
+++ /dev/null
@@ -1,2 +0,0 @@
-*
-!.gitignore \ No newline at end of file
diff --git a/tools/README.md b/tools/README.md
deleted file mode 100644
index 2418567..0000000
--- a/tools/README.md
+++ /dev/null
@@ -1,201 +0,0 @@
-# Инструменты в каталоге `tools`
-
-## `archive_roundtrip_validator.py`
-
-Скрипт предназначен для **валидации документации по форматам NRes и RsLi на реальных данных игры**.
-
-Что делает утилита:
-
-- находит архивы по сигнатуре заголовка (а не по расширению файла);
-- распаковывает архивы в структуру `manifest.json + entries/*`;
-- собирает архивы обратно из `manifest.json`;
-- выполняет проверку `unpack -> repack -> byte-compare`;
-- формирует отчёт о расхождениях со спецификацией.
-
-Скрипт не изменяет оригинальные файлы игры. Рабочие файлы создаются только в указанном `--workdir` (или во временной папке).
-
-## Поддерживаемые сигнатуры
-
-- `NRes` (`4E 52 65 73`)
-- `RsLi` в файловом формате библиотеки: `NL 00 01`
-
-## Основные команды
-
-Сканирование архива по сигнатурам:
-
-```bash
-python3 tools/archive_roundtrip_validator.py scan --input tmp/gamedata
-```
-
-Распаковка/упаковка одного NRes:
-
-```bash
-python3 tools/archive_roundtrip_validator.py nres-unpack \
- --archive tmp/gamedata/sounds.lib \
- --output tmp/work/nres_sounds
-
-python3 tools/archive_roundtrip_validator.py nres-pack \
- --manifest tmp/work/nres_sounds/manifest.json \
- --output tmp/work/sounds.repacked.lib
-```
-
-Распаковка/упаковка одного RsLi:
-
-```bash
-python3 tools/archive_roundtrip_validator.py rsli-unpack \
- --archive tmp/gamedata/sprites.lib \
- --output tmp/work/rsli_sprites
-
-python3 tools/archive_roundtrip_validator.py rsli-pack \
- --manifest tmp/work/rsli_sprites/manifest.json \
- --output tmp/work/sprites.repacked.lib
-```
-
-Полная валидация документации на всём наборе данных:
-
-```bash
-python3 tools/archive_roundtrip_validator.py validate \
- --input tmp/gamedata \
- --workdir tmp/validation_work \
- --report tmp/validation_report.json \
- --fail-on-diff
-```
-
-## Формат распаковки
-
-Для каждого архива создаются:
-
-- `manifest.json` — все поля заголовка, записи, индексы, смещения, контрольные суммы;
-- `entries/*.bin` — payload-файлы.
-
-Имена файлов в `entries` включают индекс записи, поэтому коллизии одинаковых имён внутри архива обрабатываются корректно.
-
-## `init_testdata.py`
-
-Скрипт инициализирует тестовые данные по сигнатурам архивов из спецификации:
-
-- `NRes` (`4E 52 65 73`);
-- `RsLi` (`NL 00 01`).
-
-Что делает утилита:
-
-- рекурсивно сканирует все файлы в `--input`;
-- копирует найденные `NRes` в `--output/nres/`;
-- копирует найденные `RsLi` в `--output/rsli/`;
-- сохраняет относительный путь исходного файла внутри целевого каталога;
-- создаёт целевые каталоги автоматически, если их нет.
-
-Базовый запуск:
-
-```bash
-python3 tools/init_testdata.py --input tmp/gamedata --output testdata
-```
-
-Если целевой файл уже существует, скрипт спрашивает подтверждение перезаписи (`yes/no/all/quit`).
-
-Для перезаписи без вопросов используйте `--force`:
-
-```bash
-python3 tools/init_testdata.py --input tmp/gamedata --output testdata --force
-```
-
-Проверки надёжности:
-
-- `--input` должен существовать и быть каталогом;
-- если `--output` указывает на существующий файл, скрипт завершится с ошибкой;
-- если `--output` расположен внутри `--input`, каталог вывода исключается из сканирования;
-- если `stdin` неинтерактивный и требуется перезапись, нужно явно указать `--force`.
-
-## `msh_doc_validator.py`
-
-Скрипт валидирует ключевые инварианты из документации `/Users/valentineus/Developer/personal/fparkan/docs/specs/msh.md` на реальных данных.
-
-Проверяемые группы:
-
-- модели `*.msh` (вложенные `NRes` в архивах `NRes`);
-- текстуры `Texm` (`type_id = 0x6D786554`);
-- эффекты `FXID` (`type_id = 0x44495846`).
-
-Что проверяет для моделей:
-
-- обязательные ресурсы (`Res1/2/3/6/13`) и известные опциональные (`Res4/5/7/8/10/15/16/18/19`);
-- `size/attr1/attr3` и шаги структур по таблицам;
-- диапазоны индексов, батчей и ссылок между таблицами;
-- разбор `Res10` как `len + bytes + NUL` для каждого узла;
-- матрицу слотов в `Res1` (LOD/group) и границы по `Res2/Res7/Res13/Res19`.
-
-Быстрый запуск:
-
-```bash
-python3 tools/msh_doc_validator.py scan --input testdata/nres
-python3 tools/msh_doc_validator.py validate --input testdata/nres --print-limit 20
-```
-
-С отчётом в JSON:
-
-```bash
-python3 tools/msh_doc_validator.py validate \
- --input testdata/nres \
- --report tmp/msh_validation_report.json \
- --fail-on-warnings
-```
-
-## `msh_preview_renderer.py`
-
-Примитивный программный рендерер моделей `*.msh` без внешних зависимостей.
-
-- вход: архив `NRes` (например `animals.rlb`) или прямой payload модели;
-- выход: изображение `PPM` (`P6`);
-- использует `Res3` (позиции), `Res6` (индексы), `Res13` (батчи), `Res1/Res2` (выбор слотов по `lod/group`).
-
-Показать доступные модели в архиве:
-
-```bash
-python3 tools/msh_preview_renderer.py list-models --archive testdata/nres/animals.rlb
-```
-
-Сгенерировать тестовый рендер:
-
-```bash
-python3 tools/msh_preview_renderer.py render \
- --archive testdata/nres/animals.rlb \
- --model A_L_01.msh \
- --output tmp/renders/A_L_01.ppm \
- --width 800 \
- --height 600 \
- --lod 0 \
- --group 0 \
- --wireframe
-```
-
-Ограничения:
-
-- инструмент предназначен для smoke-теста геометрии, а не для пиксельно-точного рендера движка;
-- текстуры/материалы/эффектные проходы не эмулируются.
-
-## `msh_export_obj.py`
-
-Экспортирует геометрию `*.msh` в `Wavefront OBJ`, чтобы открыть модель в Blender/MeshLab.
-
-- вход: `NRes` архив (например `animals.rlb`) или прямой payload модели;
-- выбор геометрии: через `Res1` slot matrix (`lod/group`) как в рендерере;
-- опция `--all-batches` экспортирует все батчи, игнорируя slot matrix.
-
-Показать модели в архиве:
-
-```bash
-python3 tools/msh_export_obj.py list-models --archive testdata/nres/animals.rlb
-```
-
-Экспорт в OBJ:
-
-```bash
-python3 tools/msh_export_obj.py export \
- --archive testdata/nres/animals.rlb \
- --model A_L_01.msh \
- --output tmp/renders/A_L_01.obj \
- --lod 0 \
- --group 0
-```
-
-Файл `OBJ` можно открыть напрямую в Blender (`File -> Import -> Wavefront (.obj)`).
diff --git a/tools/archive_roundtrip_validator.py b/tools/archive_roundtrip_validator.py
deleted file mode 100644
index 073fd9b..0000000
--- a/tools/archive_roundtrip_validator.py
+++ /dev/null
@@ -1,944 +0,0 @@
-#!/usr/bin/env python3
-"""
-Roundtrip tools for NRes and RsLi archives.
-
-The script can:
-1) scan archives by header signature (ignores file extensions),
-2) unpack / pack NRes archives,
-3) unpack / pack RsLi archives,
-4) validate docs assumptions by full roundtrip and byte-to-byte comparison.
-"""
-
-from __future__ import annotations
-
-import argparse
-import hashlib
-import json
-import re
-import shutil
-import struct
-import tempfile
-import zlib
-from pathlib import Path
-from typing import Any
-
-MAGIC_NRES = b"NRes"
-MAGIC_RSLI = b"NL\x00\x01"
-
-
-class ArchiveFormatError(RuntimeError):
- pass
-
-
-def sha256_hex(data: bytes) -> str:
- return hashlib.sha256(data).hexdigest()
-
-
-def safe_component(value: str, fallback: str = "item", max_len: int = 80) -> str:
- clean = re.sub(r"[^A-Za-z0-9._-]+", "_", value).strip("._-")
- if not clean:
- clean = fallback
- return clean[:max_len]
-
-
-def first_diff(a: bytes, b: bytes) -> tuple[int | None, str | None]:
- if a == b:
- return None, None
- limit = min(len(a), len(b))
- for idx in range(limit):
- if a[idx] != b[idx]:
- return idx, f"{a[idx]:02x}!={b[idx]:02x}"
- return limit, f"len {len(a)}!={len(b)}"
-
-
-def load_json(path: Path) -> dict[str, Any]:
- with path.open("r", encoding="utf-8") as handle:
- return json.load(handle)
-
-
-def dump_json(path: Path, payload: dict[str, Any]) -> None:
- path.parent.mkdir(parents=True, exist_ok=True)
- with path.open("w", encoding="utf-8") as handle:
- json.dump(payload, handle, indent=2, ensure_ascii=False)
- handle.write("\n")
-
-
-def xor_stream(data: bytes, key16: int) -> bytes:
- lo = key16 & 0xFF
- hi = (key16 >> 8) & 0xFF
- out = bytearray(len(data))
- for i, value in enumerate(data):
- lo = (hi ^ ((lo << 1) & 0xFF)) & 0xFF
- out[i] = value ^ lo
- hi = (lo ^ ((hi >> 1) & 0xFF)) & 0xFF
- return bytes(out)
-
-
-def lzss_decompress_simple(data: bytes, expected_size: int) -> bytes:
- ring = bytearray([0x20] * 0x1000)
- ring_pos = 0xFEE
- out = bytearray()
- in_pos = 0
- control = 0
- bits_left = 0
-
- while len(out) < expected_size and in_pos < len(data):
- if bits_left == 0:
- control = data[in_pos]
- in_pos += 1
- bits_left = 8
-
- if control & 1:
- if in_pos >= len(data):
- break
- byte = data[in_pos]
- in_pos += 1
- out.append(byte)
- ring[ring_pos] = byte
- ring_pos = (ring_pos + 1) & 0x0FFF
- else:
- if in_pos + 1 >= len(data):
- break
- low = data[in_pos]
- high = data[in_pos + 1]
- in_pos += 2
- # Real files indicate nibble layout opposite to common LZSS variant:
- # high nibble extends offset, low nibble stores (length - 3).
- offset = low | ((high & 0xF0) << 4)
- length = (high & 0x0F) + 3
- for step in range(length):
- byte = ring[(offset + step) & 0x0FFF]
- out.append(byte)
- ring[ring_pos] = byte
- ring_pos = (ring_pos + 1) & 0x0FFF
- if len(out) >= expected_size:
- break
-
- control >>= 1
- bits_left -= 1
-
- if len(out) != expected_size:
- raise ArchiveFormatError(
- f"LZSS size mismatch: expected {expected_size}, got {len(out)}"
- )
- return bytes(out)
-
-
-def decode_rsli_payload(
- packed: bytes, method: int, sort_to_original: int, unpacked_size: int
-) -> bytes:
- key16 = sort_to_original & 0xFFFF
-
- if method == 0x000:
- out = packed
- elif method == 0x020:
- if len(packed) < unpacked_size:
- raise ArchiveFormatError(
- f"method 0x20 packed too short: {len(packed)} < {unpacked_size}"
- )
- out = xor_stream(packed[:unpacked_size], key16)
- elif method == 0x040:
- out = lzss_decompress_simple(packed, unpacked_size)
- elif method == 0x060:
- out = lzss_decompress_simple(xor_stream(packed, key16), unpacked_size)
- elif method == 0x100:
- try:
- out = zlib.decompress(packed, -15)
- except zlib.error:
- out = zlib.decompress(packed)
- else:
- raise ArchiveFormatError(f"unsupported RsLi method: 0x{method:03X}")
-
- if len(out) != unpacked_size:
- raise ArchiveFormatError(
- f"unpacked_size mismatch: expected {unpacked_size}, got {len(out)}"
- )
- return out
-
-
-def detect_archive_type(path: Path) -> str | None:
- try:
- with path.open("rb") as handle:
- magic = handle.read(4)
- except OSError:
- return None
-
- if magic == MAGIC_NRES:
- return "nres"
- if magic == MAGIC_RSLI:
- return "rsli"
- return None
-
-
-def scan_archives(root: Path) -> list[dict[str, Any]]:
- found: list[dict[str, Any]] = []
- for path in sorted(root.rglob("*")):
- if not path.is_file():
- continue
- archive_type = detect_archive_type(path)
- if not archive_type:
- continue
- found.append(
- {
- "path": str(path),
- "relative_path": str(path.relative_to(root)),
- "type": archive_type,
- "size": path.stat().st_size,
- }
- )
- return found
-
-
-def parse_nres(data: bytes, source: str = "<memory>") -> dict[str, Any]:
- if len(data) < 16:
- raise ArchiveFormatError(f"{source}: NRes too short ({len(data)} bytes)")
-
- magic, version, entry_count, total_size = struct.unpack_from("<4sIII", data, 0)
- if magic != MAGIC_NRES:
- raise ArchiveFormatError(f"{source}: invalid NRes magic")
-
- issues: list[str] = []
- if total_size != len(data):
- issues.append(
- f"header.total_size={total_size} != actual_size={len(data)} (spec 1.2)"
- )
- if version != 0x100:
- issues.append(f"version=0x{version:08X} != 0x00000100 (spec 1.2)")
-
- directory_offset = total_size - entry_count * 64
- if directory_offset < 16 or directory_offset > len(data):
- raise ArchiveFormatError(
- f"{source}: invalid directory offset {directory_offset} for entry_count={entry_count}"
- )
- if directory_offset + entry_count * 64 != len(data):
- issues.append(
- "directory_offset + entry_count*64 != file_size (spec 1.3)"
- )
-
- entries: list[dict[str, Any]] = []
- for index in range(entry_count):
- offset = directory_offset + index * 64
- if offset + 64 > len(data):
- raise ArchiveFormatError(f"{source}: truncated directory entry {index}")
-
- (
- type_id,
- attr1,
- attr2,
- size,
- attr3,
- name_raw,
- data_offset,
- sort_index,
- ) = struct.unpack_from("<IIIII36sII", data, offset)
- name_bytes = name_raw.split(b"\x00", 1)[0]
- name = name_bytes.decode("latin1", errors="replace")
- entries.append(
- {
- "index": index,
- "type_id": type_id,
- "attr1": attr1,
- "attr2": attr2,
- "size": size,
- "attr3": attr3,
- "name": name,
- "name_bytes_hex": name_bytes.hex(),
- "name_raw_hex": name_raw.hex(),
- "data_offset": data_offset,
- "sort_index": sort_index,
- }
- )
-
- # Spec checks.
- expected_sort = sorted(
- range(entry_count),
- key=lambda idx: bytes.fromhex(entries[idx]["name_bytes_hex"]).lower(),
- )
- current_sort = [item["sort_index"] for item in entries]
- if current_sort != expected_sort:
- issues.append(
- "sort_index table does not match case-insensitive name order (spec 1.4)"
- )
-
- data_regions = sorted(
- (
- item["index"],
- item["data_offset"],
- item["size"],
- )
- for item in entries
- )
- for idx, data_offset, size in data_regions:
- if data_offset % 8 != 0:
- issues.append(f"entry {idx}: data_offset={data_offset} not aligned to 8 (spec 1.5)")
- if data_offset < 16 or data_offset + size > directory_offset:
- issues.append(
- f"entry {idx}: data range [{data_offset}, {data_offset + size}) out of data area (spec 1.3)"
- )
- for i in range(len(data_regions) - 1):
- _, start, size = data_regions[i]
- _, next_start, _ = data_regions[i + 1]
- if start + size > next_start:
- issues.append(
- f"entry overlap at data_offset={start}, next={next_start}"
- )
- padding = data[start + size : next_start]
- if any(padding):
- issues.append(
- f"non-zero padding after data block at offset={start + size} (spec 1.5)"
- )
-
- return {
- "format": "NRes",
- "header": {
- "magic": "NRes",
- "version": version,
- "entry_count": entry_count,
- "total_size": total_size,
- "directory_offset": directory_offset,
- },
- "entries": entries,
- "issues": issues,
- }
-
-
-def build_nres_name_field(entry: dict[str, Any]) -> bytes:
- if "name_bytes_hex" in entry:
- raw = bytes.fromhex(entry["name_bytes_hex"])
- else:
- raw = entry.get("name", "").encode("latin1", errors="replace")
- raw = raw[:35]
- return raw + b"\x00" * (36 - len(raw))
-
-
-def unpack_nres_file(archive_path: Path, out_dir: Path, source_root: Path | None = None) -> dict[str, Any]:
- data = archive_path.read_bytes()
- parsed = parse_nres(data, source=str(archive_path))
-
- out_dir.mkdir(parents=True, exist_ok=True)
- entries_dir = out_dir / "entries"
- entries_dir.mkdir(parents=True, exist_ok=True)
-
- manifest: dict[str, Any] = {
- "format": "NRes",
- "source_path": str(archive_path),
- "source_relative_path": str(archive_path.relative_to(source_root)) if source_root else str(archive_path),
- "header": parsed["header"],
- "entries": [],
- "issues": parsed["issues"],
- "source_sha256": sha256_hex(data),
- }
-
- for entry in parsed["entries"]:
- begin = entry["data_offset"]
- end = begin + entry["size"]
- if begin < 0 or end > len(data):
- raise ArchiveFormatError(
- f"{archive_path}: entry {entry['index']} data range outside file"
- )
- payload = data[begin:end]
- base = safe_component(entry["name"], fallback=f"entry_{entry['index']:05d}")
- file_name = (
- f"{entry['index']:05d}__{base}"
- f"__t{entry['type_id']:08X}_a1{entry['attr1']:08X}_a2{entry['attr2']:08X}.bin"
- )
- (entries_dir / file_name).write_bytes(payload)
-
- manifest_entry = dict(entry)
- manifest_entry["data_file"] = f"entries/{file_name}"
- manifest_entry["sha256"] = sha256_hex(payload)
- manifest["entries"].append(manifest_entry)
-
- dump_json(out_dir / "manifest.json", manifest)
- return manifest
-
-
-def pack_nres_manifest(manifest_path: Path, out_file: Path) -> bytes:
- manifest = load_json(manifest_path)
- if manifest.get("format") != "NRes":
- raise ArchiveFormatError(f"{manifest_path}: not an NRes manifest")
-
- entries = manifest["entries"]
- count = len(entries)
- version = int(manifest.get("header", {}).get("version", 0x100))
-
- out = bytearray(b"\x00" * 16)
- data_offsets: list[int] = []
- data_sizes: list[int] = []
-
- for entry in entries:
- payload_path = manifest_path.parent / entry["data_file"]
- payload = payload_path.read_bytes()
- offset = len(out)
- out.extend(payload)
- padding = (-len(out)) % 8
- if padding:
- out.extend(b"\x00" * padding)
- data_offsets.append(offset)
- data_sizes.append(len(payload))
-
- directory_offset = len(out)
- expected_sort = sorted(
- range(count),
- key=lambda idx: bytes.fromhex(entries[idx].get("name_bytes_hex", "")).lower(),
- )
-
- for index, entry in enumerate(entries):
- name_field = build_nres_name_field(entry)
- out.extend(
- struct.pack(
- "<IIIII36sII",
- int(entry["type_id"]),
- int(entry["attr1"]),
- int(entry["attr2"]),
- data_sizes[index],
- int(entry["attr3"]),
- name_field,
- data_offsets[index],
- expected_sort[index],
- )
- )
-
- total_size = len(out)
- struct.pack_into("<4sIII", out, 0, MAGIC_NRES, version, count, total_size)
-
- out_file.parent.mkdir(parents=True, exist_ok=True)
- out_file.write_bytes(out)
- return bytes(out)
-
-
-def parse_rsli(data: bytes, source: str = "<memory>") -> dict[str, Any]:
- if len(data) < 32:
- raise ArchiveFormatError(f"{source}: RsLi too short ({len(data)} bytes)")
- if data[:4] != MAGIC_RSLI:
- raise ArchiveFormatError(f"{source}: invalid RsLi magic")
-
- issues: list[str] = []
- reserved_zero = data[2]
- version = data[3]
- entry_count = struct.unpack_from("<h", data, 4)[0]
- presorted_flag = struct.unpack_from("<H", data, 14)[0]
- seed = struct.unpack_from("<I", data, 20)[0]
-
- if reserved_zero != 0:
- issues.append(f"header[2]={reserved_zero} != 0 (spec 2.2)")
- if version != 1:
- issues.append(f"version={version} != 1 (spec 2.2)")
- if entry_count < 0:
- raise ArchiveFormatError(f"{source}: negative entry_count={entry_count}")
-
- table_offset = 32
- table_size = entry_count * 32
- if table_offset + table_size > len(data):
- raise ArchiveFormatError(
- f"{source}: encrypted table out of file bounds ({table_offset}+{table_size}>{len(data)})"
- )
-
- table_encrypted = data[table_offset : table_offset + table_size]
- table_plain = xor_stream(table_encrypted, seed & 0xFFFF)
-
- trailer: dict[str, Any] = {"present": False}
- overlay_offset = 0
- if len(data) >= 6 and data[-6:-4] == b"AO":
- overlay_offset = struct.unpack_from("<I", data, len(data) - 4)[0]
- trailer = {
- "present": True,
- "signature": "AO",
- "overlay_offset": overlay_offset,
- "raw_hex": data[-6:].hex(),
- }
-
- entries: list[dict[str, Any]] = []
- sort_values: list[int] = []
- for index in range(entry_count):
- row = table_plain[index * 32 : (index + 1) * 32]
- name_raw = row[0:12]
- reserved4 = row[12:16]
- flags_signed, sort_to_original = struct.unpack_from("<hh", row, 16)
- unpacked_size, data_offset, packed_size = struct.unpack_from("<III", row, 20)
- method = flags_signed & 0x1E0
- name = name_raw.split(b"\x00", 1)[0].decode("latin1", errors="replace")
- effective_offset = data_offset + overlay_offset
- entries.append(
- {
- "index": index,
- "name": name,
- "name_raw_hex": name_raw.hex(),
- "reserved_raw_hex": reserved4.hex(),
- "flags_signed": flags_signed,
- "flags_u16": flags_signed & 0xFFFF,
- "method": method,
- "sort_to_original": sort_to_original,
- "unpacked_size": unpacked_size,
- "data_offset": data_offset,
- "effective_data_offset": effective_offset,
- "packed_size": packed_size,
- }
- )
- sort_values.append(sort_to_original)
-
- if effective_offset < 0:
- issues.append(f"entry {index}: negative effective_data_offset={effective_offset}")
- elif effective_offset + packed_size > len(data):
- end = effective_offset + packed_size
- if method == 0x100 and end == len(data) + 1:
- issues.append(
- f"entry {index}: deflate packed_size reaches EOF+1 ({end}); "
- "observed in game data, likely decoder lookahead byte"
- )
- else:
- issues.append(
- f"entry {index}: packed range [{effective_offset}, {end}) out of file"
- )
-
- if presorted_flag == 0xABBA:
- if sorted(sort_values) != list(range(entry_count)):
- issues.append(
- "presorted flag is 0xABBA but sort_to_original is not a permutation [0..N-1] (spec 2.2/2.4)"
- )
-
- return {
- "format": "RsLi",
- "header_raw_hex": data[:32].hex(),
- "header": {
- "magic": "NL\\x00\\x01",
- "entry_count": entry_count,
- "seed": seed,
- "presorted_flag": presorted_flag,
- },
- "entries": entries,
- "issues": issues,
- "trailer": trailer,
- }
-
-
-def unpack_rsli_file(archive_path: Path, out_dir: Path, source_root: Path | None = None) -> dict[str, Any]:
- data = archive_path.read_bytes()
- parsed = parse_rsli(data, source=str(archive_path))
-
- out_dir.mkdir(parents=True, exist_ok=True)
- entries_dir = out_dir / "entries"
- entries_dir.mkdir(parents=True, exist_ok=True)
-
- manifest: dict[str, Any] = {
- "format": "RsLi",
- "source_path": str(archive_path),
- "source_relative_path": str(archive_path.relative_to(source_root)) if source_root else str(archive_path),
- "source_size": len(data),
- "header_raw_hex": parsed["header_raw_hex"],
- "header": parsed["header"],
- "entries": [],
- "issues": list(parsed["issues"]),
- "trailer": parsed["trailer"],
- "source_sha256": sha256_hex(data),
- }
-
- for entry in parsed["entries"]:
- begin = int(entry["effective_data_offset"])
- end = begin + int(entry["packed_size"])
- packed = data[begin:end]
- base = safe_component(entry["name"], fallback=f"entry_{entry['index']:05d}")
- packed_name = f"{entry['index']:05d}__{base}__packed.bin"
- (entries_dir / packed_name).write_bytes(packed)
-
- manifest_entry = dict(entry)
- manifest_entry["packed_file"] = f"entries/{packed_name}"
- manifest_entry["packed_file_size"] = len(packed)
- manifest_entry["packed_sha256"] = sha256_hex(packed)
-
- try:
- unpacked = decode_rsli_payload(
- packed=packed,
- method=int(entry["method"]),
- sort_to_original=int(entry["sort_to_original"]),
- unpacked_size=int(entry["unpacked_size"]),
- )
- unpacked_name = f"{entry['index']:05d}__{base}__unpacked.bin"
- (entries_dir / unpacked_name).write_bytes(unpacked)
- manifest_entry["unpacked_file"] = f"entries/{unpacked_name}"
- manifest_entry["unpacked_sha256"] = sha256_hex(unpacked)
- except ArchiveFormatError as exc:
- manifest_entry["unpack_error"] = str(exc)
- manifest["issues"].append(
- f"entry {entry['index']}: cannot decode method 0x{entry['method']:03X}: {exc}"
- )
-
- manifest["entries"].append(manifest_entry)
-
- dump_json(out_dir / "manifest.json", manifest)
- return manifest
-
-
-def _pack_i16(value: int) -> int:
- if not (-32768 <= int(value) <= 32767):
- raise ArchiveFormatError(f"int16 overflow: {value}")
- return int(value)
-
-
-def pack_rsli_manifest(manifest_path: Path, out_file: Path) -> bytes:
- manifest = load_json(manifest_path)
- if manifest.get("format") != "RsLi":
- raise ArchiveFormatError(f"{manifest_path}: not an RsLi manifest")
-
- entries = manifest["entries"]
- count = len(entries)
-
- header_raw = bytes.fromhex(manifest["header_raw_hex"])
- if len(header_raw) != 32:
- raise ArchiveFormatError(f"{manifest_path}: header_raw_hex must be 32 bytes")
- header = bytearray(header_raw)
- header[:4] = MAGIC_RSLI
- struct.pack_into("<h", header, 4, count)
- seed = int(manifest["header"]["seed"])
- struct.pack_into("<I", header, 20, seed)
-
- rows = bytearray()
- packed_chunks: list[tuple[dict[str, Any], bytes]] = []
-
- for entry in entries:
- packed_path = manifest_path.parent / entry["packed_file"]
- packed = packed_path.read_bytes()
- declared_size = int(entry["packed_size"])
- if len(packed) > declared_size:
- raise ArchiveFormatError(
- f"{packed_path}: packed size {len(packed)} > manifest packed_size {declared_size}"
- )
-
- data_offset = int(entry["data_offset"])
- packed_chunks.append((entry, packed))
-
- row = bytearray(32)
- name_raw = bytes.fromhex(entry["name_raw_hex"])
- reserved_raw = bytes.fromhex(entry["reserved_raw_hex"])
- if len(name_raw) != 12 or len(reserved_raw) != 4:
- raise ArchiveFormatError(
- f"entry {entry['index']}: invalid name/reserved raw length"
- )
- row[0:12] = name_raw
- row[12:16] = reserved_raw
- struct.pack_into(
- "<hhIII",
- row,
- 16,
- _pack_i16(int(entry["flags_signed"])),
- _pack_i16(int(entry["sort_to_original"])),
- int(entry["unpacked_size"]),
- data_offset,
- declared_size,
- )
- rows.extend(row)
-
- encrypted_table = xor_stream(bytes(rows), seed & 0xFFFF)
- trailer = manifest.get("trailer", {})
- trailer_raw = b""
- if trailer.get("present"):
- raw_hex = trailer.get("raw_hex", "")
- trailer_raw = bytes.fromhex(raw_hex)
- if len(trailer_raw) != 6:
- raise ArchiveFormatError("trailer raw length must be 6 bytes")
-
- source_size = manifest.get("source_size")
- table_end = 32 + count * 32
- if source_size is not None:
- pre_trailer_size = int(source_size) - len(trailer_raw)
- if pre_trailer_size < table_end:
- raise ArchiveFormatError(
- f"invalid source_size={source_size}: smaller than header+table"
- )
- else:
- pre_trailer_size = table_end
- for entry, packed in packed_chunks:
- pre_trailer_size = max(
- pre_trailer_size, int(entry["data_offset"]) + len(packed)
- )
-
- out = bytearray(pre_trailer_size)
- out[0:32] = header
- out[32:table_end] = encrypted_table
- occupied = bytearray(pre_trailer_size)
- occupied[0:table_end] = b"\x01" * table_end
-
- for entry, packed in packed_chunks:
- base_offset = int(entry["data_offset"])
- for index, byte in enumerate(packed):
- pos = base_offset + index
- if pos >= pre_trailer_size:
- raise ArchiveFormatError(
- f"entry {entry['index']}: data write at {pos} beyond output size {pre_trailer_size}"
- )
- if occupied[pos] and out[pos] != byte:
- raise ArchiveFormatError(
- f"entry {entry['index']}: overlapping packed data conflict at offset {pos}"
- )
- out[pos] = byte
- occupied[pos] = 1
-
- out.extend(trailer_raw)
- if source_size is not None and len(out) != int(source_size):
- raise ArchiveFormatError(
- f"packed size {len(out)} != source_size {source_size} from manifest"
- )
-
- out_file.parent.mkdir(parents=True, exist_ok=True)
- out_file.write_bytes(out)
- return bytes(out)
-
-
-def cmd_scan(args: argparse.Namespace) -> int:
- root = Path(args.input).resolve()
- archives = scan_archives(root)
- if args.json:
- print(json.dumps(archives, ensure_ascii=False, indent=2))
- else:
- print(f"Found {len(archives)} archive(s) in {root}")
- for item in archives:
- print(f"{item['type']:4} {item['size']:10d} {item['relative_path']}")
- return 0
-
-
-def cmd_nres_unpack(args: argparse.Namespace) -> int:
- archive_path = Path(args.archive).resolve()
- out_dir = Path(args.output).resolve()
- manifest = unpack_nres_file(archive_path, out_dir)
- print(f"NRes unpacked: {archive_path}")
- print(f"Manifest: {out_dir / 'manifest.json'}")
- print(f"Entries : {len(manifest['entries'])}")
- if manifest["issues"]:
- print("Issues:")
- for issue in manifest["issues"]:
- print(f"- {issue}")
- return 0
-
-
-def cmd_nres_pack(args: argparse.Namespace) -> int:
- manifest_path = Path(args.manifest).resolve()
- out_file = Path(args.output).resolve()
- packed = pack_nres_manifest(manifest_path, out_file)
- print(f"NRes packed: {out_file} ({len(packed)} bytes, sha256={sha256_hex(packed)})")
- return 0
-
-
-def cmd_rsli_unpack(args: argparse.Namespace) -> int:
- archive_path = Path(args.archive).resolve()
- out_dir = Path(args.output).resolve()
- manifest = unpack_rsli_file(archive_path, out_dir)
- print(f"RsLi unpacked: {archive_path}")
- print(f"Manifest: {out_dir / 'manifest.json'}")
- print(f"Entries : {len(manifest['entries'])}")
- if manifest["issues"]:
- print("Issues:")
- for issue in manifest["issues"]:
- print(f"- {issue}")
- return 0
-
-
-def cmd_rsli_pack(args: argparse.Namespace) -> int:
- manifest_path = Path(args.manifest).resolve()
- out_file = Path(args.output).resolve()
- packed = pack_rsli_manifest(manifest_path, out_file)
- print(f"RsLi packed: {out_file} ({len(packed)} bytes, sha256={sha256_hex(packed)})")
- return 0
-
-
-def cmd_validate(args: argparse.Namespace) -> int:
- input_root = Path(args.input).resolve()
- archives = scan_archives(input_root)
-
- temp_created = False
- if args.workdir:
- workdir = Path(args.workdir).resolve()
- workdir.mkdir(parents=True, exist_ok=True)
- else:
- workdir = Path(tempfile.mkdtemp(prefix="nres-rsli-validate-"))
- temp_created = True
-
- report: dict[str, Any] = {
- "input_root": str(input_root),
- "workdir": str(workdir),
- "archives_total": len(archives),
- "results": [],
- "summary": {},
- }
-
- failures = 0
- try:
- for idx, item in enumerate(archives):
- rel = item["relative_path"]
- archive_path = input_root / rel
- marker = f"{idx:04d}_{safe_component(rel, fallback='archive')}"
- unpack_dir = workdir / "unpacked" / marker
- repacked_file = workdir / "repacked" / f"{marker}.bin"
- try:
- if item["type"] == "nres":
- manifest = unpack_nres_file(archive_path, unpack_dir, source_root=input_root)
- repacked = pack_nres_manifest(unpack_dir / "manifest.json", repacked_file)
- elif item["type"] == "rsli":
- manifest = unpack_rsli_file(archive_path, unpack_dir, source_root=input_root)
- repacked = pack_rsli_manifest(unpack_dir / "manifest.json", repacked_file)
- else:
- continue
-
- original = archive_path.read_bytes()
- match = original == repacked
- diff_offset, diff_desc = first_diff(original, repacked)
- issues = list(manifest.get("issues", []))
- result = {
- "relative_path": rel,
- "type": item["type"],
- "size_original": len(original),
- "size_repacked": len(repacked),
- "sha256_original": sha256_hex(original),
- "sha256_repacked": sha256_hex(repacked),
- "match": match,
- "first_diff_offset": diff_offset,
- "first_diff": diff_desc,
- "issues": issues,
- "entries": len(manifest.get("entries", [])),
- "error": None,
- }
- except Exception as exc: # pylint: disable=broad-except
- result = {
- "relative_path": rel,
- "type": item["type"],
- "size_original": item["size"],
- "size_repacked": None,
- "sha256_original": None,
- "sha256_repacked": None,
- "match": False,
- "first_diff_offset": None,
- "first_diff": None,
- "issues": [f"processing error: {exc}"],
- "entries": None,
- "error": str(exc),
- }
-
- report["results"].append(result)
-
- if not result["match"]:
- failures += 1
- if result["issues"] and args.fail_on_issues:
- failures += 1
-
- matches = sum(1 for row in report["results"] if row["match"])
- mismatches = len(report["results"]) - matches
- nres_count = sum(1 for row in report["results"] if row["type"] == "nres")
- rsli_count = sum(1 for row in report["results"] if row["type"] == "rsli")
- issues_total = sum(len(row["issues"]) for row in report["results"])
- report["summary"] = {
- "nres_count": nres_count,
- "rsli_count": rsli_count,
- "matches": matches,
- "mismatches": mismatches,
- "issues_total": issues_total,
- }
-
- if args.report:
- dump_json(Path(args.report).resolve(), report)
-
- print(f"Input root : {input_root}")
- print(f"Work dir : {workdir}")
- print(f"NRes archives : {nres_count}")
- print(f"RsLi archives : {rsli_count}")
- print(f"Roundtrip match: {matches}/{len(report['results'])}")
- print(f"Doc issues : {issues_total}")
-
- if mismatches:
- print("\nMismatches:")
- for row in report["results"]:
- if row["match"]:
- continue
- print(
- f"- {row['relative_path']} [{row['type']}] "
- f"diff@{row['first_diff_offset']}: {row['first_diff']}"
- )
-
- if issues_total:
- print("\nIssues:")
- for row in report["results"]:
- if not row["issues"]:
- continue
- print(f"- {row['relative_path']} [{row['type']}]")
- for issue in row["issues"]:
- print(f" * {issue}")
-
- finally:
- if temp_created or args.cleanup:
- shutil.rmtree(workdir, ignore_errors=True)
-
- if failures > 0:
- return 1
- if report["summary"].get("mismatches", 0) > 0 and args.fail_on_diff:
- return 1
- return 0
-
-
-def build_parser() -> argparse.ArgumentParser:
- parser = argparse.ArgumentParser(
- description="NRes/RsLi tools: scan, unpack, repack, and roundtrip validation."
- )
- sub = parser.add_subparsers(dest="command", required=True)
-
- scan = sub.add_parser("scan", help="Scan files by header signatures.")
- scan.add_argument("--input", required=True, help="Root directory to scan.")
- scan.add_argument("--json", action="store_true", help="Print JSON output.")
- scan.set_defaults(func=cmd_scan)
-
- nres_unpack = sub.add_parser("nres-unpack", help="Unpack a single NRes archive.")
- nres_unpack.add_argument("--archive", required=True, help="Path to NRes file.")
- nres_unpack.add_argument("--output", required=True, help="Output directory.")
- nres_unpack.set_defaults(func=cmd_nres_unpack)
-
- nres_pack = sub.add_parser("nres-pack", help="Pack NRes archive from manifest.")
- nres_pack.add_argument("--manifest", required=True, help="Path to manifest.json.")
- nres_pack.add_argument("--output", required=True, help="Output file path.")
- nres_pack.set_defaults(func=cmd_nres_pack)
-
- rsli_unpack = sub.add_parser("rsli-unpack", help="Unpack a single RsLi archive.")
- rsli_unpack.add_argument("--archive", required=True, help="Path to RsLi file.")
- rsli_unpack.add_argument("--output", required=True, help="Output directory.")
- rsli_unpack.set_defaults(func=cmd_rsli_unpack)
-
- rsli_pack = sub.add_parser("rsli-pack", help="Pack RsLi archive from manifest.")
- rsli_pack.add_argument("--manifest", required=True, help="Path to manifest.json.")
- rsli_pack.add_argument("--output", required=True, help="Output file path.")
- rsli_pack.set_defaults(func=cmd_rsli_pack)
-
- validate = sub.add_parser(
- "validate",
- help="Scan all archives and run unpack->repack->byte-compare validation.",
- )
- validate.add_argument("--input", required=True, help="Root with game data files.")
- validate.add_argument(
- "--workdir",
- help="Working directory for temporary unpack/repack files. "
- "If omitted, a temporary directory is used and removed automatically.",
- )
- validate.add_argument("--report", help="Optional JSON report output path.")
- validate.add_argument(
- "--fail-on-diff",
- action="store_true",
- help="Return non-zero exit code if any byte mismatch exists.",
- )
- validate.add_argument(
- "--fail-on-issues",
- action="store_true",
- help="Return non-zero exit code if any spec issue was detected.",
- )
- validate.add_argument(
- "--cleanup",
- action="store_true",
- help="Remove --workdir after completion.",
- )
- validate.set_defaults(func=cmd_validate)
-
- return parser
-
-
-def main() -> int:
- parser = build_parser()
- args = parser.parse_args()
- return int(args.func(args))
-
-
-if __name__ == "__main__":
- raise SystemExit(main())
diff --git a/tools/fxid_abs100_audit.py b/tools/fxid_abs100_audit.py
deleted file mode 100644
index 79f3b92..0000000
--- a/tools/fxid_abs100_audit.py
+++ /dev/null
@@ -1,262 +0,0 @@
-#!/usr/bin/env python3
-"""
-Deterministic audit for FXID "absolute parity" checklist.
-
-What this script produces:
-1) strict parsing stats across all FXID payloads in NRes archives,
-2) opcode histogram and rare-branch counters (op6, op1 tail usage),
-3) reference vectors for RNG core (sub_10002220 semantics).
-"""
-
-from __future__ import annotations
-
-import argparse
-import json
-import struct
-from collections import Counter
-from pathlib import Path
-from typing import Any
-
-import archive_roundtrip_validator as arv
-
-TYPE_FXID = 0x44495846
-FX_CMD_SIZE = {1: 224, 2: 148, 3: 200, 4: 204, 5: 112, 6: 4, 7: 208, 8: 248, 9: 208, 10: 208}
-
-
-def _entry_payload(blob: bytes, entry: dict[str, Any]) -> bytes:
- start = int(entry["data_offset"])
- end = start + int(entry["size"])
- return blob[start:end]
-
-
-def _cstr32(raw: bytes) -> str:
- return raw.split(b"\x00", 1)[0].decode("latin1", errors="replace")
-
-
-def _rng_step_sub_10002220(state32: int) -> tuple[int, int]:
- """
- sub_10002220 semantics in 32-bit packed state form:
- lo = state[15:0], hi = state[31:16]
- new_lo = hi ^ (lo << 1)
- new_hi = (hi >> 1) ^ new_lo
- return new_hi (u16), update state=(new_hi<<16)|new_lo
- """
- lo = state32 & 0xFFFF
- hi = (state32 >> 16) & 0xFFFF
- new_lo = (hi ^ ((lo << 1) & 0xFFFF)) & 0xFFFF
- new_hi = ((hi >> 1) ^ new_lo) & 0xFFFF
- return ((new_hi << 16) | new_lo), new_hi
-
-
-def _rng_vectors() -> dict[str, Any]:
- seeds = [0x00000000, 0x00000001, 0x12345678, 0x89ABCDEF, 0xFFFFFFFF]
- out: list[dict[str, Any]] = []
- for seed in seeds:
- state = seed
- outputs: list[int] = []
- states: list[int] = []
- for _ in range(16):
- state, value = _rng_step_sub_10002220(state)
- outputs.append(value)
- states.append(state)
- out.append(
- {
- "seed_hex": f"0x{seed:08X}",
- "outputs_u16_hex": [f"0x{x:04X}" for x in outputs],
- "states_u32_hex": [f"0x{x:08X}" for x in states],
- }
- )
- return {"generator": "sub_10002220", "vectors": out}
-
-
-def run_audit(root: Path) -> dict[str, Any]:
- counters: Counter[str] = Counter()
- opcode_hist: Counter[int] = Counter()
- issues: list[dict[str, Any]] = []
- op1_tail6_samples: list[dict[str, Any]] = []
- op1_optref_samples: list[dict[str, Any]] = []
-
- for item in arv.scan_archives(root):
- if item["type"] != "nres":
- continue
- archive_path = root / item["relative_path"]
- counters["archives_total"] += 1
- data = archive_path.read_bytes()
- try:
- parsed = arv.parse_nres(data, source=str(archive_path))
- except Exception as exc: # pylint: disable=broad-except
- issues.append(
- {
- "severity": "error",
- "archive": str(archive_path),
- "entry": None,
- "message": f"cannot parse NRes: {exc}",
- }
- )
- continue
-
- for entry in parsed["entries"]:
- if int(entry["type_id"]) != TYPE_FXID:
- continue
- counters["fxid_total"] += 1
- payload = _entry_payload(data, entry)
- entry_name = str(entry["name"])
-
- if len(payload) < 60:
- issues.append(
- {
- "severity": "error",
- "archive": str(archive_path),
- "entry": entry_name,
- "message": f"payload too small: {len(payload)}",
- }
- )
- continue
-
- cmd_count = struct.unpack_from("<I", payload, 0)[0]
- ptr = 0x3C
- ok = True
- for idx in range(cmd_count):
- if ptr + 4 > len(payload):
- issues.append(
- {
- "severity": "error",
- "archive": str(archive_path),
- "entry": entry_name,
- "message": f"command {idx}: missing header at offset={ptr}",
- }
- )
- ok = False
- break
-
- word = struct.unpack_from("<I", payload, ptr)[0]
- opcode = word & 0xFF
- size = FX_CMD_SIZE.get(opcode)
- if size is None:
- issues.append(
- {
- "severity": "error",
- "archive": str(archive_path),
- "entry": entry_name,
- "message": f"command {idx}: unknown opcode={opcode} at offset={ptr}",
- }
- )
- ok = False
- break
-
- if ptr + size > len(payload):
- issues.append(
- {
- "severity": "error",
- "archive": str(archive_path),
- "entry": entry_name,
- "message": f"command {idx}: truncated end={ptr + size}, payload={len(payload)}",
- }
- )
- ok = False
- break
-
- opcode_hist[opcode] += 1
- if opcode == 6:
- counters["op6_commands"] += 1
- if opcode == 1:
- tail6 = payload[ptr + 136 : ptr + 160]
- if any(tail6):
- counters["op1_tail6_nonzero"] += 1
- if len(op1_tail6_samples) < 16:
- dwords = list(struct.unpack("<6I", tail6))
- op1_tail6_samples.append(
- {
- "archive": str(archive_path),
- "entry": entry_name,
- "cmd_index": idx,
- "tail6_u32_hex": [f"0x{x:08X}" for x in dwords],
- }
- )
-
- archive_s = _cstr32(payload[ptr + 160 : ptr + 192])
- name_s = _cstr32(payload[ptr + 192 : ptr + 224])
- if archive_s or name_s:
- counters["op1_optref_nonempty"] += 1
- if len(op1_optref_samples) < 16:
- op1_optref_samples.append(
- {
- "archive": str(archive_path),
- "entry": entry_name,
- "cmd_index": idx,
- "opt_archive": archive_s,
- "opt_name": name_s,
- }
- )
-
- ptr += size
-
- if ok and ptr != len(payload):
- issues.append(
- {
- "severity": "error",
- "archive": str(archive_path),
- "entry": entry_name,
- "message": f"tail bytes after command stream: parsed_end={ptr}, payload={len(payload)}",
- }
- )
- ok = False
-
- if ok:
- counters["fxid_ok"] += 1
-
- return {
- "input_root": str(root),
- "summary": {
- "archives_total": counters["archives_total"],
- "fxid_total": counters["fxid_total"],
- "fxid_ok": counters["fxid_ok"],
- "issues_total": len(issues),
- "op6_commands": counters["op6_commands"],
- "op1_tail6_nonzero": counters["op1_tail6_nonzero"],
- "op1_optref_nonempty": counters["op1_optref_nonempty"],
- },
- "opcode_histogram": {str(k): opcode_hist[k] for k in sorted(opcode_hist)},
- "op1_tail6_samples": op1_tail6_samples,
- "op1_optref_samples": op1_optref_samples,
- "rng_reference": _rng_vectors(),
- "rng_states_fx_path": [
- {"state": "dword_10023688", "seed_init": "sub_10002660", "used_by": ["sub_10001720", "sub_10001A40"]},
- {"state": "dword_100238C0", "seed_init": "sub_10003A50", "used_by": ["sub_10002BE0"]},
- {"state": "dword_10024110", "seed_init": "sub_10009180", "used_by": ["sub_10008120", "sub_10007D10"]},
- {"state": "dword_10024810", "seed_init": "sub_1000D370", "used_by": ["sub_1000BF30", "sub_1000C1A0"]},
- {"state": "dword_10024A48", "seed_init": "sub_1000F420", "used_by": ["sub_1000EC50"]},
- {"state": "dword_10024C80", "seed_init": "sub_10010370", "used_by": ["sub_1000F6E0"]},
- {"state": "dword_100250F0", "seed_init": "sub_10012C70", "used_by": ["sub_10011230", "sub_100115C0"]},
- ],
- "issues": issues,
- }
-
-
-def main() -> int:
- parser = argparse.ArgumentParser(description="FXID absolute parity audit.")
- parser.add_argument("--input", required=True, help="Root directory with game/test archives.")
- parser.add_argument("--report", required=True, help="Output JSON report path.")
- args = parser.parse_args()
-
- root = Path(args.input).resolve()
- report_path = Path(args.report).resolve()
- payload = run_audit(root)
- report_path.parent.mkdir(parents=True, exist_ok=True)
- report_path.write_text(json.dumps(payload, indent=2, ensure_ascii=False) + "\n", encoding="utf-8")
-
- summary = payload["summary"]
- print(f"Input root : {root}")
- print(f"NRes archives : {summary['archives_total']}")
- print(f"FXID payloads : {summary['fxid_ok']}/{summary['fxid_total']} valid")
- print(f"Issues : {summary['issues_total']}")
- print(f"Opcode6 commands : {summary['op6_commands']}")
- print(f"Op1 tail6 nonzero : {summary['op1_tail6_nonzero']}")
- print(f"Op1 optref non-empty : {summary['op1_optref_nonempty']}")
- print(f"Report : {report_path}")
-
- return 0
-
-
-if __name__ == "__main__":
- raise SystemExit(main())
diff --git a/tools/init_testdata.py b/tools/init_testdata.py
deleted file mode 100644
index 4079cdb..0000000
--- a/tools/init_testdata.py
+++ /dev/null
@@ -1,204 +0,0 @@
-#!/usr/bin/env python3
-"""
-Initialize test data folders by archive signatures.
-
-The script scans all files in --input and copies matching archives into:
- --output/nres/<relative path>
- --output/rsli/<relative path>
-"""
-
-from __future__ import annotations
-
-import argparse
-import shutil
-import sys
-from pathlib import Path
-
-MAGIC_NRES = b"NRes"
-MAGIC_RSLI = b"NL\x00\x01"
-
-
-def is_relative_to(path: Path, base: Path) -> bool:
- try:
- path.relative_to(base)
- except ValueError:
- return False
- return True
-
-
-def detect_archive_type(path: Path) -> str | None:
- try:
- with path.open("rb") as handle:
- magic = handle.read(4)
- except OSError as exc:
- print(f"[warn] cannot read {path}: {exc}", file=sys.stderr)
- return None
-
- if magic == MAGIC_NRES:
- return "nres"
- if magic == MAGIC_RSLI:
- return "rsli"
- return None
-
-
-def scan_archives(input_root: Path, excluded_root: Path | None) -> list[tuple[Path, str]]:
- found: list[tuple[Path, str]] = []
- for path in sorted(input_root.rglob("*")):
- if not path.is_file():
- continue
- if excluded_root and is_relative_to(path.resolve(), excluded_root):
- continue
-
- archive_type = detect_archive_type(path)
- if archive_type:
- found.append((path, archive_type))
- return found
-
-
-def confirm_overwrite(path: Path) -> str:
- prompt = (
- f"File exists: {path}\n"
- "Overwrite? [y]es / [n]o / [a]ll / [q]uit (default: n): "
- )
- while True:
- try:
- answer = input(prompt).strip().lower()
- except EOFError:
- return "quit"
-
- if answer in {"", "n", "no"}:
- return "no"
- if answer in {"y", "yes"}:
- return "yes"
- if answer in {"a", "all"}:
- return "all"
- if answer in {"q", "quit"}:
- return "quit"
- print("Please answer with y, n, a, or q.")
-
-
-def copy_archives(
- archives: list[tuple[Path, str]],
- input_root: Path,
- output_root: Path,
- force: bool,
-) -> int:
- copied = 0
- skipped = 0
- overwritten = 0
- overwrite_all = force
-
- type_counts = {"nres": 0, "rsli": 0}
- for _, archive_type in archives:
- type_counts[archive_type] += 1
-
- print(
- f"Found archives: total={len(archives)}, "
- f"nres={type_counts['nres']}, rsli={type_counts['rsli']}"
- )
-
- for source, archive_type in archives:
- rel_path = source.relative_to(input_root)
- destination = output_root / archive_type / rel_path
- destination.parent.mkdir(parents=True, exist_ok=True)
-
- if destination.exists():
- if destination.is_dir():
- print(
- f"[error] destination is a directory, expected file: {destination}",
- file=sys.stderr,
- )
- return 2
-
- if not overwrite_all:
- if not sys.stdin.isatty():
- print(
- "[error] destination file exists but stdin is not interactive. "
- "Use --force to overwrite without prompts.",
- file=sys.stderr,
- )
- return 2
-
- decision = confirm_overwrite(destination)
- if decision == "quit":
- print("Aborted by user.")
- return 130
- if decision == "no":
- skipped += 1
- continue
- if decision == "all":
- overwrite_all = True
-
- overwritten += 1
-
- try:
- shutil.copy2(source, destination)
- except OSError as exc:
- print(f"[error] failed to copy {source} -> {destination}: {exc}", file=sys.stderr)
- return 2
- copied += 1
-
- print(
- f"Done: copied={copied}, overwritten={overwritten}, skipped={skipped}, "
- f"output={output_root}"
- )
- return 0
-
-
-def build_parser() -> argparse.ArgumentParser:
- parser = argparse.ArgumentParser(
- description="Initialize test data by scanning NRes/RsLi signatures."
- )
- parser.add_argument(
- "--input",
- required=True,
- help="Input directory to scan recursively.",
- )
- parser.add_argument(
- "--output",
- required=True,
- help="Output root directory (archives go to nres/ and rsli/ subdirs).",
- )
- parser.add_argument(
- "--force",
- action="store_true",
- help="Overwrite destination files without confirmation prompts.",
- )
- return parser
-
-
-def main() -> int:
- args = build_parser().parse_args()
-
- input_root = Path(args.input)
- if not input_root.exists():
- print(f"[error] input directory does not exist: {input_root}", file=sys.stderr)
- return 2
- if not input_root.is_dir():
- print(f"[error] input path is not a directory: {input_root}", file=sys.stderr)
- return 2
-
- output_root = Path(args.output)
- if output_root.exists() and not output_root.is_dir():
- print(f"[error] output path exists and is not a directory: {output_root}", file=sys.stderr)
- return 2
-
- input_resolved = input_root.resolve()
- output_resolved = output_root.resolve()
- if input_resolved == output_resolved:
- print("[error] input and output directories must be different.", file=sys.stderr)
- return 2
-
- excluded_root: Path | None = None
- if is_relative_to(output_resolved, input_resolved):
- excluded_root = output_resolved
- print(f"Notice: output is inside input, skipping scan under: {excluded_root}")
-
- archives = scan_archives(input_root, excluded_root)
-
- output_root.mkdir(parents=True, exist_ok=True)
- return copy_archives(archives, input_root, output_root, force=args.force)
-
-
-if __name__ == "__main__":
- raise SystemExit(main())
diff --git a/tools/msh_doc_validator.py b/tools/msh_doc_validator.py
deleted file mode 100644
index ff096a4..0000000
--- a/tools/msh_doc_validator.py
+++ /dev/null
@@ -1,1000 +0,0 @@
-#!/usr/bin/env python3
-"""
-Validate assumptions from docs/specs/msh.md on real game archives.
-
-The tool checks three groups:
-1) MSH model payloads (nested NRes in *.msh entries),
-2) Texm texture payloads,
-3) FXID effect payloads.
-"""
-
-from __future__ import annotations
-
-import argparse
-import json
-import math
-import struct
-from collections import Counter
-from pathlib import Path
-from typing import Any
-
-import archive_roundtrip_validator as arv
-
-MAGIC_NRES = b"NRes"
-MAGIC_PAGE = b"Page"
-
-TYPE_FXID = 0x44495846
-TYPE_TEXM = 0x6D786554
-
-FX_CMD_SIZE = {1: 224, 2: 148, 3: 200, 4: 204, 5: 112, 6: 4, 7: 208, 8: 248, 9: 208, 10: 208}
-TEXM_KNOWN_FORMATS = {0, 565, 556, 4444, 888, 8888}
-
-
-def _add_issue(
- issues: list[dict[str, Any]],
- severity: str,
- category: str,
- archive: Path,
- entry_name: str | None,
- message: str,
-) -> None:
- issues.append(
- {
- "severity": severity,
- "category": category,
- "archive": str(archive),
- "entry": entry_name,
- "message": message,
- }
- )
-
-
-def _entry_payload(blob: bytes, entry: dict[str, Any]) -> bytes:
- start = int(entry["data_offset"])
- end = start + int(entry["size"])
- return blob[start:end]
-
-
-def _entry_by_type(entries: list[dict[str, Any]]) -> dict[int, list[dict[str, Any]]]:
- by_type: dict[int, list[dict[str, Any]]] = {}
- for item in entries:
- by_type.setdefault(int(item["type_id"]), []).append(item)
- return by_type
-
-
-def _expect_single_resource(
- by_type: dict[int, list[dict[str, Any]]],
- type_id: int,
- label: str,
- issues: list[dict[str, Any]],
- archive: Path,
- model_name: str,
- required: bool,
-) -> dict[str, Any] | None:
- rows = by_type.get(type_id, [])
- if not rows:
- if required:
- _add_issue(
- issues,
- "error",
- "model-resource",
- archive,
- model_name,
- f"missing required resource type={type_id} ({label})",
- )
- return None
- if len(rows) > 1:
- _add_issue(
- issues,
- "warning",
- "model-resource",
- archive,
- model_name,
- f"multiple resources type={type_id} ({label}); using first entry",
- )
- return rows[0]
-
-
-def _check_fixed_stride(
- *,
- entry: dict[str, Any],
- stride: int,
- label: str,
- issues: list[dict[str, Any]],
- archive: Path,
- model_name: str,
- enforce_attr3: bool = True,
- enforce_attr2_zero: bool = True,
-) -> int:
- size = int(entry["size"])
- attr1 = int(entry["attr1"])
- attr2 = int(entry["attr2"])
- attr3 = int(entry["attr3"])
-
- count = -1
- if size % stride != 0:
- _add_issue(
- issues,
- "error",
- "model-stride",
- archive,
- model_name,
- f"{label}: size={size} is not divisible by stride={stride}",
- )
- else:
- count = size // stride
- if attr1 != count:
- _add_issue(
- issues,
- "error",
- "model-attr",
- archive,
- model_name,
- f"{label}: attr1={attr1} != size/stride={count}",
- )
- if enforce_attr3 and attr3 != stride:
- _add_issue(
- issues,
- "error",
- "model-attr",
- archive,
- model_name,
- f"{label}: attr3={attr3} != {stride}",
- )
- if enforce_attr2_zero and attr2 != 0:
- _add_issue(
- issues,
- "warning",
- "model-attr",
- archive,
- model_name,
- f"{label}: attr2={attr2} (expected 0 in known assets)",
- )
- return count
-
-
-def _validate_res10(
- data: bytes,
- node_count: int,
- issues: list[dict[str, Any]],
- archive: Path,
- model_name: str,
-) -> None:
- off = 0
- for idx in range(node_count):
- if off + 4 > len(data):
- _add_issue(
- issues,
- "error",
- "res10",
- archive,
- model_name,
- f"record {idx}: missing u32 length (offset={off}, size={len(data)})",
- )
- return
- ln = struct.unpack_from("<I", data, off)[0]
- off += 4
- need = ln + 1 if ln else 0
- if off + need > len(data):
- _add_issue(
- issues,
- "error",
- "res10",
- archive,
- model_name,
- f"record {idx}: out of bounds (len={ln}, need={need}, offset={off}, size={len(data)})",
- )
- return
- if ln and data[off + ln] != 0:
- _add_issue(
- issues,
- "warning",
- "res10",
- archive,
- model_name,
- f"record {idx}: missing trailing NUL at payload end",
- )
- off += need
-
- if off != len(data):
- _add_issue(
- issues,
- "error",
- "res10",
- archive,
- model_name,
- f"tail bytes after node records: consumed={off}, size={len(data)}",
- )
-
-
-def _validate_model_payload(
- model_blob: bytes,
- archive: Path,
- model_name: str,
- issues: list[dict[str, Any]],
- counters: Counter[str],
-) -> None:
- counters["models_total"] += 1
-
- if model_blob[:4] != MAGIC_NRES:
- _add_issue(
- issues,
- "error",
- "model-container",
- archive,
- model_name,
- "payload is not NRes (missing magic)",
- )
- return
-
- try:
- parsed = arv.parse_nres(model_blob, source=f"{archive}:{model_name}")
- except Exception as exc: # pylint: disable=broad-except
- _add_issue(
- issues,
- "error",
- "model-container",
- archive,
- model_name,
- f"cannot parse nested NRes: {exc}",
- )
- return
-
- for item in parsed.get("issues", []):
- _add_issue(issues, "warning", "model-container", archive, model_name, str(item))
-
- entries = parsed["entries"]
- by_type = _entry_by_type(entries)
-
- res1 = _expect_single_resource(by_type, 1, "Res1", issues, archive, model_name, True)
- res2 = _expect_single_resource(by_type, 2, "Res2", issues, archive, model_name, True)
- res3 = _expect_single_resource(by_type, 3, "Res3", issues, archive, model_name, True)
- res4 = _expect_single_resource(by_type, 4, "Res4", issues, archive, model_name, False)
- res5 = _expect_single_resource(by_type, 5, "Res5", issues, archive, model_name, False)
- res6 = _expect_single_resource(by_type, 6, "Res6", issues, archive, model_name, True)
- res7 = _expect_single_resource(by_type, 7, "Res7", issues, archive, model_name, False)
- res8 = _expect_single_resource(by_type, 8, "Res8", issues, archive, model_name, False)
- res10 = _expect_single_resource(by_type, 10, "Res10", issues, archive, model_name, False)
- res13 = _expect_single_resource(by_type, 13, "Res13", issues, archive, model_name, True)
- res15 = _expect_single_resource(by_type, 15, "Res15", issues, archive, model_name, False)
- res16 = _expect_single_resource(by_type, 16, "Res16", issues, archive, model_name, False)
- res18 = _expect_single_resource(by_type, 18, "Res18", issues, archive, model_name, False)
- res19 = _expect_single_resource(by_type, 19, "Res19", issues, archive, model_name, False)
-
- if not (res1 and res2 and res3 and res6 and res13):
- return
-
- # Res1
- res1_stride = int(res1["attr3"])
- if res1_stride not in (38, 24):
- _add_issue(
- issues,
- "warning",
- "res1",
- archive,
- model_name,
- f"unexpected Res1 stride attr3={res1_stride} (known: 38 or 24)",
- )
- if res1_stride <= 0:
- _add_issue(issues, "error", "res1", archive, model_name, f"invalid Res1 stride={res1_stride}")
- return
- if int(res1["size"]) % res1_stride != 0:
- _add_issue(
- issues,
- "error",
- "res1",
- archive,
- model_name,
- f"Res1 size={res1['size']} not divisible by stride={res1_stride}",
- )
- return
- node_count = int(res1["size"]) // res1_stride
- if int(res1["attr1"]) != node_count:
- _add_issue(
- issues,
- "error",
- "res1",
- archive,
- model_name,
- f"Res1 attr1={res1['attr1']} != node_count={node_count}",
- )
-
- # Res2
- res2_size = int(res2["size"])
- res2_attr1 = int(res2["attr1"])
- res2_attr2 = int(res2["attr2"])
- res2_attr3 = int(res2["attr3"])
- if res2_size < 0x8C:
- _add_issue(issues, "error", "res2", archive, model_name, f"Res2 too small: size={res2_size}")
- return
- slot_bytes = res2_size - 0x8C
- slot_count = -1
- if slot_bytes % 68 != 0:
- _add_issue(
- issues,
- "error",
- "res2",
- archive,
- model_name,
- f"Res2 slot area not divisible by 68: slot_bytes={slot_bytes}",
- )
- else:
- slot_count = slot_bytes // 68
- if res2_attr1 != slot_count:
- _add_issue(
- issues,
- "error",
- "res2",
- archive,
- model_name,
- f"Res2 attr1={res2_attr1} != slot_count={slot_count}",
- )
- if res2_attr2 != 0:
- _add_issue(
- issues,
- "warning",
- "res2",
- archive,
- model_name,
- f"Res2 attr2={res2_attr2} (expected 0 in known assets)",
- )
- if res2_attr3 != 68:
- _add_issue(
- issues,
- "error",
- "res2",
- archive,
- model_name,
- f"Res2 attr3={res2_attr3} != 68",
- )
-
- # Fixed-stride resources
- vertex_count = _check_fixed_stride(
- entry=res3,
- stride=12,
- label="Res3",
- issues=issues,
- archive=archive,
- model_name=model_name,
- )
- _ = _check_fixed_stride(
- entry=res4,
- stride=4,
- label="Res4",
- issues=issues,
- archive=archive,
- model_name=model_name,
- ) if res4 else None
- _ = _check_fixed_stride(
- entry=res5,
- stride=4,
- label="Res5",
- issues=issues,
- archive=archive,
- model_name=model_name,
- ) if res5 else None
- index_count = _check_fixed_stride(
- entry=res6,
- stride=2,
- label="Res6",
- issues=issues,
- archive=archive,
- model_name=model_name,
- )
- tri_desc_count = _check_fixed_stride(
- entry=res7,
- stride=16,
- label="Res7",
- issues=issues,
- archive=archive,
- model_name=model_name,
- ) if res7 else -1
- anim_key_count = _check_fixed_stride(
- entry=res8,
- stride=24,
- label="Res8",
- issues=issues,
- archive=archive,
- model_name=model_name,
- enforce_attr3=False, # format stores attr3=4 in data set
- ) if res8 else -1
- if res8 and int(res8["attr3"]) != 4:
- _add_issue(
- issues,
- "error",
- "res8",
- archive,
- model_name,
- f"Res8 attr3={res8['attr3']} != 4",
- )
- if res13:
- batch_count = _check_fixed_stride(
- entry=res13,
- stride=20,
- label="Res13",
- issues=issues,
- archive=archive,
- model_name=model_name,
- )
- else:
- batch_count = -1
- if res15:
- _check_fixed_stride(
- entry=res15,
- stride=8,
- label="Res15",
- issues=issues,
- archive=archive,
- model_name=model_name,
- )
- if res16:
- _check_fixed_stride(
- entry=res16,
- stride=8,
- label="Res16",
- issues=issues,
- archive=archive,
- model_name=model_name,
- )
- if res18:
- _check_fixed_stride(
- entry=res18,
- stride=4,
- label="Res18",
- issues=issues,
- archive=archive,
- model_name=model_name,
- )
-
- if res19:
- anim_map_count = _check_fixed_stride(
- entry=res19,
- stride=2,
- label="Res19",
- issues=issues,
- archive=archive,
- model_name=model_name,
- enforce_attr3=False,
- enforce_attr2_zero=False,
- )
- if int(res19["attr3"]) != 2:
- _add_issue(
- issues,
- "error",
- "res19",
- archive,
- model_name,
- f"Res19 attr3={res19['attr3']} != 2",
- )
- else:
- anim_map_count = -1
-
- # Res10
- if res10:
- if int(res10["attr1"]) != int(res1["attr1"]):
- _add_issue(
- issues,
- "error",
- "res10",
- archive,
- model_name,
- f"Res10 attr1={res10['attr1']} != Res1.attr1={res1['attr1']}",
- )
- if int(res10["attr3"]) != 0:
- _add_issue(
- issues,
- "warning",
- "res10",
- archive,
- model_name,
- f"Res10 attr3={res10['attr3']} (known assets use 0)",
- )
- _validate_res10(_entry_payload(model_blob, res10), node_count, issues, archive, model_name)
-
- # Cross-table checks.
- if vertex_count > 0 and (res4 and int(res4["size"]) // 4 != vertex_count):
- _add_issue(issues, "error", "model-cross", archive, model_name, "Res4 count != Res3 count")
- if vertex_count > 0 and (res5 and int(res5["size"]) // 4 != vertex_count):
- _add_issue(issues, "error", "model-cross", archive, model_name, "Res5 count != Res3 count")
-
- indices: list[int] = []
- if index_count > 0:
- res6_data = _entry_payload(model_blob, res6)
- indices = list(struct.unpack_from(f"<{index_count}H", res6_data, 0))
-
- if batch_count > 0:
- res13_data = _entry_payload(model_blob, res13)
- for batch_idx in range(batch_count):
- b_off = batch_idx * 20
- (
- _batch_flags,
- _mat_idx,
- _unk4,
- _unk6,
- idx_count,
- idx_start,
- _unk14,
- base_vertex,
- ) = struct.unpack_from("<HHHHHIHI", res13_data, b_off)
- end = idx_start + idx_count
- if index_count > 0 and end > index_count:
- _add_issue(
- issues,
- "error",
- "res13",
- archive,
- model_name,
- f"batch {batch_idx}: index range [{idx_start}, {end}) outside Res6 count={index_count}",
- )
- continue
- if idx_count % 3 != 0:
- _add_issue(
- issues,
- "warning",
- "res13",
- archive,
- model_name,
- f"batch {batch_idx}: indexCount={idx_count} is not divisible by 3",
- )
- if vertex_count > 0 and index_count > 0 and idx_count > 0:
- raw_slice = indices[idx_start:end]
- max_raw = max(raw_slice)
- if base_vertex + max_raw >= vertex_count:
- _add_issue(
- issues,
- "error",
- "res13",
- archive,
- model_name,
- f"batch {batch_idx}: baseVertex+maxIndex={base_vertex + max_raw} >= vertex_count={vertex_count}",
- )
-
- if slot_count > 0:
- res2_data = _entry_payload(model_blob, res2)
- for slot_idx in range(slot_count):
- s_off = 0x8C + slot_idx * 68
- tri_start, tri_count, batch_start, slot_batch_count = struct.unpack_from("<4H", res2_data, s_off)
- if tri_desc_count > 0 and tri_start + tri_count > tri_desc_count:
- _add_issue(
- issues,
- "error",
- "res2-slot",
- archive,
- model_name,
- f"slot {slot_idx}: tri range [{tri_start}, {tri_start + tri_count}) outside Res7 count={tri_desc_count}",
- )
- if batch_count > 0 and batch_start + slot_batch_count > batch_count:
- _add_issue(
- issues,
- "error",
- "res2-slot",
- archive,
- model_name,
- f"slot {slot_idx}: batch range [{batch_start}, {batch_start + slot_batch_count}) outside Res13 count={batch_count}",
- )
- # Slot bounds are 10 float values.
- for f_idx in range(10):
- value = struct.unpack_from("<f", res2_data, s_off + 8 + f_idx * 4)[0]
- if not math.isfinite(value):
- _add_issue(
- issues,
- "error",
- "res2-slot",
- archive,
- model_name,
- f"slot {slot_idx}: non-finite bound float at field {f_idx}",
- )
- break
-
- if tri_desc_count > 0:
- res7_data = _entry_payload(model_blob, res7)
- for tri_idx in range(tri_desc_count):
- t_off = tri_idx * 16
- _flags, l0, l1, l2 = struct.unpack_from("<4H", res7_data, t_off)
- for link in (l0, l1, l2):
- if link != 0xFFFF and link >= tri_desc_count:
- _add_issue(
- issues,
- "error",
- "res7",
- archive,
- model_name,
- f"tri {tri_idx}: link {link} outside tri_desc_count={tri_desc_count}",
- )
- _ = struct.unpack_from("<H", res7_data, t_off + 14)[0]
-
- # Node-level constraints for slot matrix / animation mapping.
- if res1_stride == 38:
- res1_data = _entry_payload(model_blob, res1)
- map_words: list[int] = []
- if anim_map_count > 0 and res19:
- res19_data = _entry_payload(model_blob, res19)
- map_words = list(struct.unpack_from(f"<{anim_map_count}H", res19_data, 0))
- frame_count = int(res19["attr2"]) if res19 else 0
-
- for node_idx in range(node_count):
- n_off = node_idx * 38
- hdr2 = struct.unpack_from("<H", res1_data, n_off + 4)[0]
- hdr3 = struct.unpack_from("<H", res1_data, n_off + 6)[0]
- # Slot matrix: 15 uint16 at +8.
- for w_idx in range(15):
- slot_idx = struct.unpack_from("<H", res1_data, n_off + 8 + w_idx * 2)[0]
- if slot_idx != 0xFFFF and slot_count > 0 and slot_idx >= slot_count:
- _add_issue(
- issues,
- "error",
- "res1-slot",
- archive,
- model_name,
- f"node {node_idx}: slotIndex[{w_idx}]={slot_idx} outside slot_count={slot_count}",
- )
-
- if anim_key_count > 0 and hdr3 != 0xFFFF and hdr3 >= anim_key_count:
- _add_issue(
- issues,
- "error",
- "res1-anim",
- archive,
- model_name,
- f"node {node_idx}: fallbackKeyIndex={hdr3} outside Res8 count={anim_key_count}",
- )
- if map_words and hdr2 != 0xFFFF and frame_count > 0:
- end = hdr2 + frame_count
- if end > len(map_words):
- _add_issue(
- issues,
- "error",
- "res19-map",
- archive,
- model_name,
- f"node {node_idx}: map range [{hdr2}, {end}) outside Res19 count={len(map_words)}",
- )
-
- counters["models_ok"] += 1
-
-
-def _validate_texm_payload(
- payload: bytes,
- archive: Path,
- entry_name: str,
- issues: list[dict[str, Any]],
- counters: Counter[str],
-) -> None:
- counters["texm_total"] += 1
-
- if len(payload) < 32:
- _add_issue(
- issues,
- "error",
- "texm",
- archive,
- entry_name,
- f"payload too small: {len(payload)}",
- )
- return
-
- magic, width, height, mip_count, flags4, flags5, unk6, fmt = struct.unpack_from("<8I", payload, 0)
- if magic != TYPE_TEXM:
- _add_issue(issues, "error", "texm", archive, entry_name, f"magic=0x{magic:08X} != Texm")
- return
- if width == 0 or height == 0:
- _add_issue(issues, "error", "texm", archive, entry_name, f"invalid size {width}x{height}")
- return
- if mip_count == 0:
- _add_issue(issues, "error", "texm", archive, entry_name, "mipCount=0")
- return
- if fmt not in TEXM_KNOWN_FORMATS:
- _add_issue(
- issues,
- "error",
- "texm",
- archive,
- entry_name,
- f"unknown format code {fmt}",
- )
- return
- if flags4 not in (0, 32):
- _add_issue(
- issues,
- "warning",
- "texm",
- archive,
- entry_name,
- f"flags4={flags4} (known values: 0 or 32)",
- )
- if flags5 not in (0, 0x04000000, 0x00800000):
- _add_issue(
- issues,
- "warning",
- "texm",
- archive,
- entry_name,
- f"flags5=0x{flags5:08X} (known values: 0, 0x00800000, 0x04000000)",
- )
-
- bpp = 1 if fmt == 0 else (2 if fmt in (565, 556, 4444) else 4)
- pix_sum = 0
- w = width
- h = height
- for _ in range(mip_count):
- pix_sum += w * h
- w = max(1, w >> 1)
- h = max(1, h >> 1)
- size_core = 32 + (1024 if fmt == 0 else 0) + bpp * pix_sum
- if size_core > len(payload):
- _add_issue(
- issues,
- "error",
- "texm",
- archive,
- entry_name,
- f"sizeCore={size_core} exceeds payload size={len(payload)}",
- )
- return
-
- tail = len(payload) - size_core
- if tail > 0:
- off = size_core
- if tail < 8:
- _add_issue(
- issues,
- "error",
- "texm",
- archive,
- entry_name,
- f"tail too short for Page chunk: tail={tail}",
- )
- return
- if payload[off : off + 4] != MAGIC_PAGE:
- _add_issue(
- issues,
- "error",
- "texm",
- archive,
- entry_name,
- f"tail is present but no Page magic at offset {off}",
- )
- return
- rect_count = struct.unpack_from("<I", payload, off + 4)[0]
- need = 8 + rect_count * 8
- if need > tail:
- _add_issue(
- issues,
- "error",
- "texm",
- archive,
- entry_name,
- f"Page chunk truncated: need={need}, tail={tail}",
- )
- return
- if need != tail:
- _add_issue(
- issues,
- "error",
- "texm",
- archive,
- entry_name,
- f"extra bytes after Page chunk: tail={tail}, pageSize={need}",
- )
- return
-
- _ = unk6 # carried as raw field in spec, semantics intentionally unknown.
- counters["texm_ok"] += 1
-
-
-def _validate_fxid_payload(
- payload: bytes,
- archive: Path,
- entry_name: str,
- issues: list[dict[str, Any]],
- counters: Counter[str],
-) -> None:
- counters["fxid_total"] += 1
-
- if len(payload) < 60:
- _add_issue(
- issues,
- "error",
- "fxid",
- archive,
- entry_name,
- f"payload too small: {len(payload)}",
- )
- return
-
- cmd_count = struct.unpack_from("<I", payload, 0)[0]
- ptr = 0x3C
- for idx in range(cmd_count):
- if ptr + 4 > len(payload):
- _add_issue(
- issues,
- "error",
- "fxid",
- archive,
- entry_name,
- f"command {idx}: missing header at offset={ptr}",
- )
- return
- word = struct.unpack_from("<I", payload, ptr)[0]
- opcode = word & 0xFF
- if opcode not in FX_CMD_SIZE:
- _add_issue(
- issues,
- "error",
- "fxid",
- archive,
- entry_name,
- f"command {idx}: unknown opcode={opcode} at offset={ptr}",
- )
- return
- size = FX_CMD_SIZE[opcode]
- if ptr + size > len(payload):
- _add_issue(
- issues,
- "error",
- "fxid",
- archive,
- entry_name,
- f"command {idx}: truncated, need end={ptr + size}, payload={len(payload)}",
- )
- return
- ptr += size
-
- if ptr != len(payload):
- _add_issue(
- issues,
- "error",
- "fxid",
- archive,
- entry_name,
- f"tail bytes after command stream: parsed_end={ptr}, payload={len(payload)}",
- )
- return
-
- counters["fxid_ok"] += 1
-
-
-def _scan_nres_files(root: Path) -> list[Path]:
- rows = arv.scan_archives(root)
- out: list[Path] = []
- for item in rows:
- if item["type"] != "nres":
- continue
- out.append(root / item["relative_path"])
- return out
-
-
-def run_validation(input_root: Path) -> dict[str, Any]:
- archives = _scan_nres_files(input_root)
- issues: list[dict[str, Any]] = []
- counters: Counter[str] = Counter()
-
- for archive_path in archives:
- counters["archives_total"] += 1
- data = archive_path.read_bytes()
- try:
- parsed = arv.parse_nres(data, source=str(archive_path))
- except Exception as exc: # pylint: disable=broad-except
- _add_issue(issues, "error", "archive", archive_path, None, f"cannot parse NRes: {exc}")
- continue
-
- for item in parsed.get("issues", []):
- _add_issue(issues, "warning", "archive", archive_path, None, str(item))
-
- for entry in parsed["entries"]:
- name = str(entry["name"])
- payload = _entry_payload(data, entry)
- type_id = int(entry["type_id"])
-
- if name.lower().endswith(".msh"):
- _validate_model_payload(payload, archive_path, name, issues, counters)
-
- if type_id == TYPE_TEXM:
- _validate_texm_payload(payload, archive_path, name, issues, counters)
-
- if type_id == TYPE_FXID:
- _validate_fxid_payload(payload, archive_path, name, issues, counters)
-
- errors = sum(1 for row in issues if row["severity"] == "error")
- warnings = sum(1 for row in issues if row["severity"] == "warning")
-
- return {
- "input_root": str(input_root),
- "summary": {
- "archives_total": counters["archives_total"],
- "models_total": counters["models_total"],
- "models_ok": counters["models_ok"],
- "texm_total": counters["texm_total"],
- "texm_ok": counters["texm_ok"],
- "fxid_total": counters["fxid_total"],
- "fxid_ok": counters["fxid_ok"],
- "errors": errors,
- "warnings": warnings,
- "issues_total": len(issues),
- },
- "issues": issues,
- }
-
-
-def cmd_scan(args: argparse.Namespace) -> int:
- root = Path(args.input).resolve()
- report = run_validation(root)
- summary = report["summary"]
- print(f"Input root : {root}")
- print(f"NRes archives : {summary['archives_total']}")
- print(f"MSH models : {summary['models_total']}")
- print(f"Texm textures : {summary['texm_total']}")
- print(f"FXID effects : {summary['fxid_total']}")
- return 0
-
-
-def cmd_validate(args: argparse.Namespace) -> int:
- root = Path(args.input).resolve()
- report = run_validation(root)
- summary = report["summary"]
-
- if args.report:
- arv.dump_json(Path(args.report).resolve(), report)
-
- print(f"Input root : {root}")
- print(f"NRes archives : {summary['archives_total']}")
- print(f"MSH models : {summary['models_ok']}/{summary['models_total']} valid")
- print(f"Texm textures : {summary['texm_ok']}/{summary['texm_total']} valid")
- print(f"FXID effects : {summary['fxid_ok']}/{summary['fxid_total']} valid")
- print(f"Issues : {summary['issues_total']} (errors={summary['errors']}, warnings={summary['warnings']})")
-
- if report["issues"]:
- limit = max(1, int(args.print_limit))
- print("\nSample issues:")
- for item in report["issues"][:limit]:
- where = item["archive"]
- if item["entry"]:
- where = f"{where}::{item['entry']}"
- print(f"- [{item['severity']}] [{item['category']}] {where}: {item['message']}")
- if len(report["issues"]) > limit:
- print(f"... and {len(report['issues']) - limit} more issue(s)")
-
- if summary["errors"] > 0:
- return 1
- if args.fail_on_warnings and summary["warnings"] > 0:
- return 1
- return 0
-
-
-def build_parser() -> argparse.ArgumentParser:
- parser = argparse.ArgumentParser(
- description="Validate docs/specs/msh.md assumptions on real archives."
- )
- sub = parser.add_subparsers(dest="command", required=True)
-
- scan = sub.add_parser("scan", help="Quick scan and counts (models/textures/effects).")
- scan.add_argument("--input", required=True, help="Root directory with game/test archives.")
- scan.set_defaults(func=cmd_scan)
-
- validate = sub.add_parser("validate", help="Run full spec validation.")
- validate.add_argument("--input", required=True, help="Root directory with game/test archives.")
- validate.add_argument("--report", help="Optional JSON report output path.")
- validate.add_argument(
- "--print-limit",
- type=int,
- default=50,
- help="How many issues to print to stdout (default: 50).",
- )
- validate.add_argument(
- "--fail-on-warnings",
- action="store_true",
- help="Return non-zero if warnings are present.",
- )
- validate.set_defaults(func=cmd_validate)
-
- return parser
-
-
-def main() -> int:
- parser = build_parser()
- args = parser.parse_args()
- return int(args.func(args))
-
-
-if __name__ == "__main__":
- raise SystemExit(main())
diff --git a/tools/msh_export_obj.py b/tools/msh_export_obj.py
deleted file mode 100644
index 75a9602..0000000
--- a/tools/msh_export_obj.py
+++ /dev/null
@@ -1,357 +0,0 @@
-#!/usr/bin/env python3
-"""
-Export NGI MSH geometry to Wavefront OBJ.
-
-The exporter is intended for inspection/debugging and uses the same
-batch/slot selection logic as msh_preview_renderer.py.
-"""
-
-from __future__ import annotations
-
-import argparse
-import math
-import struct
-from pathlib import Path
-from typing import Any
-
-import archive_roundtrip_validator as arv
-
-MAGIC_NRES = b"NRes"
-
-
-def _entry_payload(blob: bytes, entry: dict[str, Any]) -> bytes:
- start = int(entry["data_offset"])
- end = start + int(entry["size"])
- return blob[start:end]
-
-
-def _parse_nres(blob: bytes, source: str) -> dict[str, Any]:
- if blob[:4] != MAGIC_NRES:
- raise RuntimeError(f"{source}: not an NRes payload")
- return arv.parse_nres(blob, source=source)
-
-
-def _by_type(entries: list[dict[str, Any]]) -> dict[int, list[dict[str, Any]]]:
- out: dict[int, list[dict[str, Any]]] = {}
- for row in entries:
- out.setdefault(int(row["type_id"]), []).append(row)
- return out
-
-
-def _get_single(by_type: dict[int, list[dict[str, Any]]], type_id: int, label: str) -> dict[str, Any]:
- rows = by_type.get(type_id, [])
- if not rows:
- raise RuntimeError(f"missing resource type {type_id} ({label})")
- return rows[0]
-
-
-def _pick_model_payload(archive_path: Path, model_name: str | None) -> tuple[bytes, str]:
- root_blob = archive_path.read_bytes()
- parsed = _parse_nres(root_blob, str(archive_path))
-
- msh_entries = [row for row in parsed["entries"] if str(row["name"]).lower().endswith(".msh")]
- if msh_entries:
- chosen: dict[str, Any] | None = None
- if model_name:
- model_l = model_name.lower()
- for row in msh_entries:
- name_l = str(row["name"]).lower()
- if name_l == model_l:
- chosen = row
- break
- if chosen is None:
- for row in msh_entries:
- if str(row["name"]).lower().startswith(model_l):
- chosen = row
- break
- else:
- chosen = msh_entries[0]
-
- if chosen is None:
- names = ", ".join(str(row["name"]) for row in msh_entries[:12])
- raise RuntimeError(
- f"model '{model_name}' not found in {archive_path}. Available: {names}"
- )
- return _entry_payload(root_blob, chosen), str(chosen["name"])
-
- by_type = _by_type(parsed["entries"])
- if all(k in by_type for k in (1, 2, 3, 6, 13)):
- return root_blob, archive_path.name
-
- raise RuntimeError(
- f"{archive_path} does not contain .msh entries and does not look like a direct model payload"
- )
-
-
-def _extract_geometry(
- model_blob: bytes,
- *,
- lod: int,
- group: int,
- max_faces: int,
- all_batches: bool,
-) -> tuple[list[tuple[float, float, float]], list[tuple[int, int, int]], dict[str, int]]:
- parsed = _parse_nres(model_blob, "<model>")
- by_type = _by_type(parsed["entries"])
-
- res1 = _get_single(by_type, 1, "Res1")
- res2 = _get_single(by_type, 2, "Res2")
- res3 = _get_single(by_type, 3, "Res3")
- res6 = _get_single(by_type, 6, "Res6")
- res13 = _get_single(by_type, 13, "Res13")
-
- pos_blob = _entry_payload(model_blob, res3)
- if len(pos_blob) % 12 != 0:
- raise RuntimeError(f"Res3 size is not divisible by 12: {len(pos_blob)}")
- vertex_count = len(pos_blob) // 12
- positions = [struct.unpack_from("<3f", pos_blob, i * 12) for i in range(vertex_count)]
-
- idx_blob = _entry_payload(model_blob, res6)
- if len(idx_blob) % 2 != 0:
- raise RuntimeError(f"Res6 size is not divisible by 2: {len(idx_blob)}")
- index_count = len(idx_blob) // 2
- indices = list(struct.unpack_from(f"<{index_count}H", idx_blob, 0))
-
- batch_blob = _entry_payload(model_blob, res13)
- if len(batch_blob) % 20 != 0:
- raise RuntimeError(f"Res13 size is not divisible by 20: {len(batch_blob)}")
- batch_count = len(batch_blob) // 20
- batches: list[tuple[int, int, int, int]] = []
- for i in range(batch_count):
- off = i * 20
- idx_count = struct.unpack_from("<H", batch_blob, off + 8)[0]
- idx_start = struct.unpack_from("<I", batch_blob, off + 10)[0]
- base_vertex = struct.unpack_from("<I", batch_blob, off + 16)[0]
- batches.append((idx_count, idx_start, base_vertex, i))
-
- res2_blob = _entry_payload(model_blob, res2)
- if len(res2_blob) < 0x8C:
- raise RuntimeError("Res2 is too small (< 0x8C)")
- slot_blob = res2_blob[0x8C:]
- if len(slot_blob) % 68 != 0:
- raise RuntimeError(f"Res2 slot area is not divisible by 68: {len(slot_blob)}")
- slot_count = len(slot_blob) // 68
- slots: list[tuple[int, int, int, int]] = []
- for i in range(slot_count):
- off = i * 68
- tri_start, tri_count, batch_start, slot_batch_count = struct.unpack_from("<4H", slot_blob, off)
- slots.append((tri_start, tri_count, batch_start, slot_batch_count))
-
- res1_blob = _entry_payload(model_blob, res1)
- node_stride = int(res1["attr3"])
- node_count = int(res1["attr1"])
- node_slot_indices: list[int] = []
- if not all_batches and node_stride >= 38 and len(res1_blob) >= node_count * node_stride:
- if lod < 0 or lod > 2:
- raise RuntimeError(f"lod must be 0..2 (got {lod})")
- if group < 0 or group > 4:
- raise RuntimeError(f"group must be 0..4 (got {group})")
- matrix_index = lod * 5 + group
- for n in range(node_count):
- off = n * node_stride + 8 + matrix_index * 2
- slot_idx = struct.unpack_from("<H", res1_blob, off)[0]
- if slot_idx == 0xFFFF:
- continue
- if slot_idx >= slot_count:
- continue
- node_slot_indices.append(slot_idx)
-
- faces: list[tuple[int, int, int]] = []
- used_batches = 0
- used_slots = 0
-
- def append_batch(batch_idx: int) -> None:
- nonlocal used_batches
- if batch_idx < 0 or batch_idx >= len(batches):
- return
- idx_count, idx_start, base_vertex, _ = batches[batch_idx]
- if idx_count < 3:
- return
- end = idx_start + idx_count
- if end > len(indices):
- return
- used_batches += 1
- tri_count = idx_count // 3
- for t in range(tri_count):
- i0 = indices[idx_start + t * 3 + 0] + base_vertex
- i1 = indices[idx_start + t * 3 + 1] + base_vertex
- i2 = indices[idx_start + t * 3 + 2] + base_vertex
- if i0 >= vertex_count or i1 >= vertex_count or i2 >= vertex_count:
- continue
- faces.append((i0, i1, i2))
- if len(faces) >= max_faces:
- return
-
- if node_slot_indices:
- for slot_idx in node_slot_indices:
- if len(faces) >= max_faces:
- break
- _tri_start, _tri_count, batch_start, slot_batch_count = slots[slot_idx]
- used_slots += 1
- for bi in range(batch_start, batch_start + slot_batch_count):
- append_batch(bi)
- if len(faces) >= max_faces:
- break
- else:
- for bi in range(batch_count):
- append_batch(bi)
- if len(faces) >= max_faces:
- break
-
- if not faces:
- raise RuntimeError("no faces selected for export")
-
- meta = {
- "vertex_count": vertex_count,
- "index_count": index_count,
- "batch_count": batch_count,
- "slot_count": slot_count,
- "node_count": node_count,
- "used_slots": used_slots,
- "used_batches": used_batches,
- "face_count": len(faces),
- }
- return positions, faces, meta
-
-
-def _compute_vertex_normals(
- positions: list[tuple[float, float, float]],
- faces: list[tuple[int, int, int]],
-) -> list[tuple[float, float, float]]:
- acc = [[0.0, 0.0, 0.0] for _ in positions]
- for i0, i1, i2 in faces:
- p0 = positions[i0]
- p1 = positions[i1]
- p2 = positions[i2]
- ux = p1[0] - p0[0]
- uy = p1[1] - p0[1]
- uz = p1[2] - p0[2]
- vx = p2[0] - p0[0]
- vy = p2[1] - p0[1]
- vz = p2[2] - p0[2]
- nx = uy * vz - uz * vy
- ny = uz * vx - ux * vz
- nz = ux * vy - uy * vx
- acc[i0][0] += nx
- acc[i0][1] += ny
- acc[i0][2] += nz
- acc[i1][0] += nx
- acc[i1][1] += ny
- acc[i1][2] += nz
- acc[i2][0] += nx
- acc[i2][1] += ny
- acc[i2][2] += nz
-
- normals: list[tuple[float, float, float]] = []
- for nx, ny, nz in acc:
- ln = math.sqrt(nx * nx + ny * ny + nz * nz)
- if ln <= 1e-12:
- normals.append((0.0, 1.0, 0.0))
- else:
- normals.append((nx / ln, ny / ln, nz / ln))
- return normals
-
-
-def _write_obj(
- output_path: Path,
- object_name: str,
- positions: list[tuple[float, float, float]],
- faces: list[tuple[int, int, int]],
-) -> None:
- output_path.parent.mkdir(parents=True, exist_ok=True)
- normals = _compute_vertex_normals(positions, faces)
-
- with output_path.open("w", encoding="utf-8", newline="\n") as out:
- out.write("# Exported by msh_export_obj.py\n")
- out.write(f"o {object_name}\n")
- for x, y, z in positions:
- out.write(f"v {x:.9g} {y:.9g} {z:.9g}\n")
- for nx, ny, nz in normals:
- out.write(f"vn {nx:.9g} {ny:.9g} {nz:.9g}\n")
- for i0, i1, i2 in faces:
- a = i0 + 1
- b = i1 + 1
- c = i2 + 1
- out.write(f"f {a}//{a} {b}//{b} {c}//{c}\n")
-
-
-def cmd_list_models(args: argparse.Namespace) -> int:
- archive_path = Path(args.archive).resolve()
- blob = archive_path.read_bytes()
- parsed = _parse_nres(blob, str(archive_path))
- rows = [row for row in parsed["entries"] if str(row["name"]).lower().endswith(".msh")]
- print(f"Archive: {archive_path}")
- print(f"MSH entries: {len(rows)}")
- for row in rows:
- print(f"- {row['name']}")
- return 0
-
-
-def cmd_export(args: argparse.Namespace) -> int:
- archive_path = Path(args.archive).resolve()
- output_path = Path(args.output).resolve()
-
- model_blob, model_label = _pick_model_payload(archive_path, args.model)
- positions, faces, meta = _extract_geometry(
- model_blob,
- lod=int(args.lod),
- group=int(args.group),
- max_faces=int(args.max_faces),
- all_batches=bool(args.all_batches),
- )
- obj_name = Path(model_label).stem or "msh_model"
- _write_obj(output_path, obj_name, positions, faces)
-
- print(f"Exported model : {model_label}")
- print(f"Output OBJ : {output_path}")
- print(f"Object name : {obj_name}")
- print(
- "Geometry : "
- f"vertices={meta['vertex_count']}, faces={meta['face_count']}, "
- f"batches={meta['used_batches']}/{meta['batch_count']}, slots={meta['used_slots']}/{meta['slot_count']}"
- )
- print(
- "Mode : "
- f"lod={args.lod}, group={args.group}, all_batches={bool(args.all_batches)}"
- )
- return 0
-
-
-def build_parser() -> argparse.ArgumentParser:
- parser = argparse.ArgumentParser(
- description="Export NGI MSH geometry to Wavefront OBJ."
- )
- sub = parser.add_subparsers(dest="command", required=True)
-
- list_models = sub.add_parser("list-models", help="List .msh entries in an NRes archive.")
- list_models.add_argument("--archive", required=True, help="Path to archive (e.g. animals.rlb).")
- list_models.set_defaults(func=cmd_list_models)
-
- export = sub.add_parser("export", help="Export one model to OBJ.")
- export.add_argument("--archive", required=True, help="Path to NRes archive or direct model payload.")
- export.add_argument(
- "--model",
- help="Model entry name (*.msh) inside archive. If omitted, first .msh is used.",
- )
- export.add_argument("--output", required=True, help="Output .obj path.")
- export.add_argument("--lod", type=int, default=0, help="LOD index 0..2 (default: 0).")
- export.add_argument("--group", type=int, default=0, help="Group index 0..4 (default: 0).")
- export.add_argument("--max-faces", type=int, default=120000, help="Face limit (default: 120000).")
- export.add_argument(
- "--all-batches",
- action="store_true",
- help="Ignore slot matrix selection and export all batches.",
- )
- export.set_defaults(func=cmd_export)
-
- return parser
-
-
-def main() -> int:
- parser = build_parser()
- args = parser.parse_args()
- return int(args.func(args))
-
-
-if __name__ == "__main__":
- raise SystemExit(main())
diff --git a/tools/msh_preview_renderer.py b/tools/msh_preview_renderer.py
deleted file mode 100644
index 53b4e63..0000000
--- a/tools/msh_preview_renderer.py
+++ /dev/null
@@ -1,481 +0,0 @@
-#!/usr/bin/env python3
-"""
-Primitive software renderer for NGI MSH models.
-
-Output format: binary PPM (P6), no external dependencies.
-"""
-
-from __future__ import annotations
-
-import argparse
-import math
-import struct
-from pathlib import Path
-from typing import Any
-
-import archive_roundtrip_validator as arv
-
-MAGIC_NRES = b"NRes"
-
-
-def _entry_payload(blob: bytes, entry: dict[str, Any]) -> bytes:
- start = int(entry["data_offset"])
- end = start + int(entry["size"])
- return blob[start:end]
-
-
-def _parse_nres(blob: bytes, source: str) -> dict[str, Any]:
- if blob[:4] != MAGIC_NRES:
- raise RuntimeError(f"{source}: not an NRes payload")
- return arv.parse_nres(blob, source=source)
-
-
-def _by_type(entries: list[dict[str, Any]]) -> dict[int, list[dict[str, Any]]]:
- out: dict[int, list[dict[str, Any]]] = {}
- for row in entries:
- out.setdefault(int(row["type_id"]), []).append(row)
- return out
-
-
-def _pick_model_payload(archive_path: Path, model_name: str | None) -> tuple[bytes, str]:
- root_blob = archive_path.read_bytes()
- parsed = _parse_nres(root_blob, str(archive_path))
-
- msh_entries = [row for row in parsed["entries"] if str(row["name"]).lower().endswith(".msh")]
- if msh_entries:
- chosen: dict[str, Any] | None = None
- if model_name:
- model_l = model_name.lower()
- for row in msh_entries:
- name_l = str(row["name"]).lower()
- if name_l == model_l:
- chosen = row
- break
- if chosen is None:
- for row in msh_entries:
- if str(row["name"]).lower().startswith(model_l):
- chosen = row
- break
- else:
- chosen = msh_entries[0]
-
- if chosen is None:
- names = ", ".join(str(row["name"]) for row in msh_entries[:12])
- raise RuntimeError(
- f"model '{model_name}' not found in {archive_path}. Available: {names}"
- )
- return _entry_payload(root_blob, chosen), str(chosen["name"])
-
- # Fallback: treat file itself as a model NRes payload.
- by_type = _by_type(parsed["entries"])
- if all(k in by_type for k in (1, 2, 3, 6, 13)):
- return root_blob, archive_path.name
-
- raise RuntimeError(
- f"{archive_path} does not contain .msh entries and does not look like a direct model payload"
- )
-
-
-def _get_single(by_type: dict[int, list[dict[str, Any]]], type_id: int, label: str) -> dict[str, Any]:
- rows = by_type.get(type_id, [])
- if not rows:
- raise RuntimeError(f"missing resource type {type_id} ({label})")
- return rows[0]
-
-
-def _extract_geometry(
- model_blob: bytes,
- *,
- lod: int,
- group: int,
- max_faces: int,
-) -> tuple[list[tuple[float, float, float]], list[tuple[int, int, int]], dict[str, int]]:
- parsed = _parse_nres(model_blob, "<model>")
- by_type = _by_type(parsed["entries"])
-
- res1 = _get_single(by_type, 1, "Res1")
- res2 = _get_single(by_type, 2, "Res2")
- res3 = _get_single(by_type, 3, "Res3")
- res6 = _get_single(by_type, 6, "Res6")
- res13 = _get_single(by_type, 13, "Res13")
-
- # Positions
- pos_blob = _entry_payload(model_blob, res3)
- if len(pos_blob) % 12 != 0:
- raise RuntimeError(f"Res3 size is not divisible by 12: {len(pos_blob)}")
- vertex_count = len(pos_blob) // 12
- positions = [struct.unpack_from("<3f", pos_blob, i * 12) for i in range(vertex_count)]
-
- # Indices
- idx_blob = _entry_payload(model_blob, res6)
- if len(idx_blob) % 2 != 0:
- raise RuntimeError(f"Res6 size is not divisible by 2: {len(idx_blob)}")
- index_count = len(idx_blob) // 2
- indices = list(struct.unpack_from(f"<{index_count}H", idx_blob, 0))
-
- # Batches
- batch_blob = _entry_payload(model_blob, res13)
- if len(batch_blob) % 20 != 0:
- raise RuntimeError(f"Res13 size is not divisible by 20: {len(batch_blob)}")
- batch_count = len(batch_blob) // 20
- batches: list[tuple[int, int, int, int]] = []
- for i in range(batch_count):
- off = i * 20
- # Keep only fields used by renderer:
- # indexCount, indexStart, baseVertex
- idx_count = struct.unpack_from("<H", batch_blob, off + 8)[0]
- idx_start = struct.unpack_from("<I", batch_blob, off + 10)[0]
- base_vertex = struct.unpack_from("<I", batch_blob, off + 16)[0]
- batches.append((idx_count, idx_start, base_vertex, i))
-
- # Slots
- res2_blob = _entry_payload(model_blob, res2)
- if len(res2_blob) < 0x8C:
- raise RuntimeError("Res2 is too small (< 0x8C)")
- slot_blob = res2_blob[0x8C:]
- if len(slot_blob) % 68 != 0:
- raise RuntimeError(f"Res2 slot area is not divisible by 68: {len(slot_blob)}")
- slot_count = len(slot_blob) // 68
- slots: list[tuple[int, int, int, int]] = []
- for i in range(slot_count):
- off = i * 68
- tri_start, tri_count, batch_start, slot_batch_count = struct.unpack_from("<4H", slot_blob, off)
- slots.append((tri_start, tri_count, batch_start, slot_batch_count))
-
- # Nodes / slot matrix
- res1_blob = _entry_payload(model_blob, res1)
- node_stride = int(res1["attr3"])
- node_count = int(res1["attr1"])
- node_slot_indices: list[int] = []
- if node_stride >= 38 and len(res1_blob) >= node_count * node_stride:
- if lod < 0 or lod > 2:
- raise RuntimeError(f"lod must be 0..2 (got {lod})")
- if group < 0 or group > 4:
- raise RuntimeError(f"group must be 0..4 (got {group})")
- matrix_index = lod * 5 + group
- for n in range(node_count):
- off = n * node_stride + 8 + matrix_index * 2
- slot_idx = struct.unpack_from("<H", res1_blob, off)[0]
- if slot_idx == 0xFFFF:
- continue
- if slot_idx >= slot_count:
- continue
- node_slot_indices.append(slot_idx)
-
- # Build triangle list.
- faces: list[tuple[int, int, int]] = []
- used_batches = 0
- used_slots = 0
-
- def append_batch(batch_idx: int) -> None:
- nonlocal used_batches
- if batch_idx < 0 or batch_idx >= len(batches):
- return
- idx_count, idx_start, base_vertex, _ = batches[batch_idx]
- if idx_count < 3:
- return
- end = idx_start + idx_count
- if end > len(indices):
- return
- used_batches += 1
- tri_count = idx_count // 3
- for t in range(tri_count):
- i0 = indices[idx_start + t * 3 + 0] + base_vertex
- i1 = indices[idx_start + t * 3 + 1] + base_vertex
- i2 = indices[idx_start + t * 3 + 2] + base_vertex
- if i0 >= vertex_count or i1 >= vertex_count or i2 >= vertex_count:
- continue
- faces.append((i0, i1, i2))
- if len(faces) >= max_faces:
- return
-
- if node_slot_indices:
- for slot_idx in node_slot_indices:
- if len(faces) >= max_faces:
- break
- _tri_start, _tri_count, batch_start, slot_batch_count = slots[slot_idx]
- used_slots += 1
- for bi in range(batch_start, batch_start + slot_batch_count):
- append_batch(bi)
- if len(faces) >= max_faces:
- break
- else:
- # Fallback if slot matrix is unavailable: draw all batches.
- for bi in range(batch_count):
- append_batch(bi)
- if len(faces) >= max_faces:
- break
-
- meta = {
- "vertex_count": vertex_count,
- "index_count": index_count,
- "batch_count": batch_count,
- "slot_count": slot_count,
- "node_count": node_count,
- "used_slots": used_slots,
- "used_batches": used_batches,
- "face_count": len(faces),
- }
- if not faces:
- raise RuntimeError("no faces selected for rendering")
- return positions, faces, meta
-
-
-def _write_ppm(path: Path, width: int, height: int, rgb: bytearray) -> None:
- path.parent.mkdir(parents=True, exist_ok=True)
- with path.open("wb") as handle:
- handle.write(f"P6\n{width} {height}\n255\n".encode("ascii"))
- handle.write(rgb)
-
-
-def _render_software(
- positions: list[tuple[float, float, float]],
- faces: list[tuple[int, int, int]],
- *,
- width: int,
- height: int,
- yaw_deg: float,
- pitch_deg: float,
- wireframe: bool,
-) -> bytearray:
- xs = [p[0] for p in positions]
- ys = [p[1] for p in positions]
- zs = [p[2] for p in positions]
- cx = (min(xs) + max(xs)) * 0.5
- cy = (min(ys) + max(ys)) * 0.5
- cz = (min(zs) + max(zs)) * 0.5
- span = max(max(xs) - min(xs), max(ys) - min(ys), max(zs) - min(zs))
- radius = max(span * 0.5, 1e-3)
-
- yaw = math.radians(yaw_deg)
- pitch = math.radians(pitch_deg)
- cyaw = math.cos(yaw)
- syaw = math.sin(yaw)
- cpitch = math.cos(pitch)
- spitch = math.sin(pitch)
-
- camera_dist = radius * 3.2
- scale = min(width, height) * 0.95
-
- # Transform all vertices once.
- vx: list[float] = []
- vy: list[float] = []
- vz: list[float] = []
- sx: list[float] = []
- sy: list[float] = []
- for x, y, z in positions:
- x0 = x - cx
- y0 = y - cy
- z0 = z - cz
- x1 = cyaw * x0 + syaw * z0
- z1 = -syaw * x0 + cyaw * z0
- y2 = cpitch * y0 - spitch * z1
- z2 = spitch * y0 + cpitch * z1 + camera_dist
- if z2 < 1e-3:
- z2 = 1e-3
- vx.append(x1)
- vy.append(y2)
- vz.append(z2)
- sx.append(width * 0.5 + (x1 / z2) * scale)
- sy.append(height * 0.5 - (y2 / z2) * scale)
-
- rgb = bytearray([16, 18, 24] * (width * height))
- zbuf = [float("inf")] * (width * height)
- light_dir = (0.35, 0.45, 1.0)
- l_len = math.sqrt(light_dir[0] ** 2 + light_dir[1] ** 2 + light_dir[2] ** 2)
- light = (light_dir[0] / l_len, light_dir[1] / l_len, light_dir[2] / l_len)
-
- def edge(ax: float, ay: float, bx: float, by: float, px: float, py: float) -> float:
- return (px - ax) * (by - ay) - (py - ay) * (bx - ax)
-
- for i0, i1, i2 in faces:
- x0 = sx[i0]
- y0 = sy[i0]
- x1 = sx[i1]
- y1 = sy[i1]
- x2 = sx[i2]
- y2 = sy[i2]
- area = edge(x0, y0, x1, y1, x2, y2)
- if area == 0.0:
- continue
-
- # Shading from camera-space normal.
- ux = vx[i1] - vx[i0]
- uy = vy[i1] - vy[i0]
- uz = vz[i1] - vz[i0]
- wx = vx[i2] - vx[i0]
- wy = vy[i2] - vy[i0]
- wz = vz[i2] - vz[i0]
- nx = uy * wz - uz * wy
- ny = uz * wx - ux * wz
- nz = ux * wy - uy * wx
- n_len = math.sqrt(nx * nx + ny * ny + nz * nz)
- if n_len > 0.0:
- nx /= n_len
- ny /= n_len
- nz /= n_len
- intensity = nx * light[0] + ny * light[1] + nz * light[2]
- if intensity < 0.0:
- intensity = 0.0
- shade = int(45 + 200 * intensity)
- color = (shade, shade, min(255, shade + 18))
-
- minx = int(max(0, math.floor(min(x0, x1, x2))))
- maxx = int(min(width - 1, math.ceil(max(x0, x1, x2))))
- miny = int(max(0, math.floor(min(y0, y1, y2))))
- maxy = int(min(height - 1, math.ceil(max(y0, y1, y2))))
- if minx > maxx or miny > maxy:
- continue
-
- z0 = vz[i0]
- z1 = vz[i1]
- z2 = vz[i2]
-
- for py in range(miny, maxy + 1):
- fy = py + 0.5
- row = py * width
- for px in range(minx, maxx + 1):
- fx = px + 0.5
- w0 = edge(x1, y1, x2, y2, fx, fy)
- w1 = edge(x2, y2, x0, y0, fx, fy)
- w2 = edge(x0, y0, x1, y1, fx, fy)
- if area > 0:
- if w0 < 0 or w1 < 0 or w2 < 0:
- continue
- else:
- if w0 > 0 or w1 > 0 or w2 > 0:
- continue
- inv_area = 1.0 / area
- bz0 = w0 * inv_area
- bz1 = w1 * inv_area
- bz2 = w2 * inv_area
- depth = bz0 * z0 + bz1 * z1 + bz2 * z2
- idx = row + px
- if depth >= zbuf[idx]:
- continue
- zbuf[idx] = depth
- p = idx * 3
- rgb[p + 0] = color[0]
- rgb[p + 1] = color[1]
- rgb[p + 2] = color[2]
-
- if wireframe:
- def draw_line(xa: float, ya: float, xb: float, yb: float) -> None:
- x0i = int(round(xa))
- y0i = int(round(ya))
- x1i = int(round(xb))
- y1i = int(round(yb))
- dx = abs(x1i - x0i)
- sx_step = 1 if x0i < x1i else -1
- dy = -abs(y1i - y0i)
- sy_step = 1 if y0i < y1i else -1
- err = dx + dy
- x = x0i
- y = y0i
- while True:
- if 0 <= x < width and 0 <= y < height:
- p = (y * width + x) * 3
- rgb[p + 0] = 240
- rgb[p + 1] = 245
- rgb[p + 2] = 255
- if x == x1i and y == y1i:
- break
- e2 = 2 * err
- if e2 >= dy:
- err += dy
- x += sx_step
- if e2 <= dx:
- err += dx
- y += sy_step
-
- for i0, i1, i2 in faces:
- draw_line(sx[i0], sy[i0], sx[i1], sy[i1])
- draw_line(sx[i1], sy[i1], sx[i2], sy[i2])
- draw_line(sx[i2], sy[i2], sx[i0], sy[i0])
-
- return rgb
-
-
-def cmd_list_models(args: argparse.Namespace) -> int:
- archive_path = Path(args.archive).resolve()
- blob = archive_path.read_bytes()
- parsed = _parse_nres(blob, str(archive_path))
- rows = [row for row in parsed["entries"] if str(row["name"]).lower().endswith(".msh")]
- print(f"Archive: {archive_path}")
- print(f"MSH entries: {len(rows)}")
- for row in rows:
- print(f"- {row['name']}")
- return 0
-
-
-def cmd_render(args: argparse.Namespace) -> int:
- archive_path = Path(args.archive).resolve()
- output_path = Path(args.output).resolve()
-
- model_blob, model_label = _pick_model_payload(archive_path, args.model)
- positions, faces, meta = _extract_geometry(
- model_blob,
- lod=int(args.lod),
- group=int(args.group),
- max_faces=int(args.max_faces),
- )
- rgb = _render_software(
- positions,
- faces,
- width=int(args.width),
- height=int(args.height),
- yaw_deg=float(args.yaw),
- pitch_deg=float(args.pitch),
- wireframe=bool(args.wireframe),
- )
- _write_ppm(output_path, int(args.width), int(args.height), rgb)
-
- print(f"Rendered model: {model_label}")
- print(f"Output : {output_path}")
- print(
- "Geometry : "
- f"vertices={meta['vertex_count']}, faces={meta['face_count']}, "
- f"batches={meta['used_batches']}/{meta['batch_count']}, slots={meta['used_slots']}/{meta['slot_count']}"
- )
- print(f"Mode : lod={args.lod}, group={args.group}, wireframe={bool(args.wireframe)}")
- return 0
-
-
-def build_parser() -> argparse.ArgumentParser:
- parser = argparse.ArgumentParser(
- description="Primitive NGI MSH renderer (software, dependency-free)."
- )
- sub = parser.add_subparsers(dest="command", required=True)
-
- list_models = sub.add_parser("list-models", help="List .msh entries in an NRes archive.")
- list_models.add_argument("--archive", required=True, help="Path to archive (e.g. animals.rlb).")
- list_models.set_defaults(func=cmd_list_models)
-
- render = sub.add_parser("render", help="Render one model to PPM image.")
- render.add_argument("--archive", required=True, help="Path to NRes archive or direct model payload.")
- render.add_argument(
- "--model",
- help="Model entry name (*.msh) inside archive. If omitted, first .msh is used.",
- )
- render.add_argument("--output", required=True, help="Output .ppm file path.")
- render.add_argument("--lod", type=int, default=0, help="LOD index 0..2 (default: 0).")
- render.add_argument("--group", type=int, default=0, help="Group index 0..4 (default: 0).")
- render.add_argument("--max-faces", type=int, default=120000, help="Face limit (default: 120000).")
- render.add_argument("--width", type=int, default=1280, help="Image width (default: 1280).")
- render.add_argument("--height", type=int, default=720, help="Image height (default: 720).")
- render.add_argument("--yaw", type=float, default=35.0, help="Yaw angle in degrees (default: 35).")
- render.add_argument("--pitch", type=float, default=18.0, help="Pitch angle in degrees (default: 18).")
- render.add_argument("--wireframe", action="store_true", help="Draw white wireframe overlay.")
- render.set_defaults(func=cmd_render)
-
- return parser
-
-
-def main() -> int:
- parser = build_parser()
- args = parser.parse_args()
- return int(args.func(args))
-
-
-if __name__ == "__main__":
- raise SystemExit(main())
diff --git a/tools/terrain_map_doc_validator.py b/tools/terrain_map_doc_validator.py
deleted file mode 100644
index 63c3077..0000000
--- a/tools/terrain_map_doc_validator.py
+++ /dev/null
@@ -1,809 +0,0 @@
-#!/usr/bin/env python3
-"""
-Validate terrain/map documentation assumptions against real game data.
-
-Targets:
-- tmp/gamedata/DATA/MAPS/**/Land.msh
-- tmp/gamedata/DATA/MAPS/**/Land.map
-"""
-
-from __future__ import annotations
-
-import argparse
-import json
-import math
-import struct
-from collections import Counter, defaultdict
-from dataclasses import dataclass
-from pathlib import Path
-from typing import Any
-
-import archive_roundtrip_validator as arv
-
-MAGIC_NRES = b"NRes"
-
-REQUIRED_MSH_TYPES = (1, 2, 3, 4, 5, 11, 18, 21)
-OPTIONAL_MSH_TYPES = (14,)
-EXPECTED_MSH_ORDER = (1, 2, 3, 4, 5, 18, 14, 11, 21)
-
-MSH_STRIDES = {
- 1: 38,
- 3: 12,
- 4: 4,
- 5: 4,
- 11: 4,
- 14: 4,
- 18: 4,
- 21: 28,
-}
-
-SLOT_TABLE_OFFSET = 0x8C
-
-
-@dataclass
-class ValidationIssue:
- severity: str # error | warning
- category: str
- resource: str
- message: str
-
-
-class TerrainMapDocValidator:
- def __init__(self) -> None:
- self.issues: list[ValidationIssue] = []
- self.stats: dict[str, Any] = {
- "maps_total": 0,
- "msh_total": 0,
- "map_total": 0,
- "msh_type_orders": Counter(),
- "msh_attr_triplets": defaultdict(Counter), # type_id -> Counter[(a1,a2,a3)]
- "msh_type11_header_words": Counter(),
- "msh_type21_flags_top": Counter(),
- "map_logic_flags": Counter(),
- "map_class_ids": Counter(), # record +40
- "map_poly_count": Counter(),
- "map_vertex_count_min": None,
- "map_vertex_count_max": None,
- "map_cell_dims": Counter(),
- "map_reserved_u12": Counter(),
- "map_reserved_u36": Counter(),
- "map_reserved_u44": Counter(),
- "map_area_delta_abs_max": 0.0,
- "map_area_delta_rel_max": 0.0,
- "map_area_rel_gt_05_count": 0,
- "map_normal_len_min": None,
- "map_normal_len_max": None,
- "map_records_total": 0,
- }
-
- def add_issue(self, severity: str, category: str, resource: Path, message: str) -> None:
- self.issues.append(
- ValidationIssue(
- severity=severity,
- category=category,
- resource=str(resource),
- message=message,
- )
- )
-
- def _entry_payload(self, blob: bytes, entry: dict[str, Any]) -> bytes:
- start = int(entry["data_offset"])
- end = start + int(entry["size"])
- return blob[start:end]
-
- def _entry_by_type(self, entries: list[dict[str, Any]]) -> dict[int, list[dict[str, Any]]]:
- by_type: dict[int, list[dict[str, Any]]] = {}
- for item in entries:
- by_type.setdefault(int(item["type_id"]), []).append(item)
- return by_type
-
- def _expect_single_type(
- self,
- *,
- by_type: dict[int, list[dict[str, Any]]],
- type_id: int,
- label: str,
- resource: Path,
- required: bool,
- ) -> dict[str, Any] | None:
- rows = by_type.get(type_id, [])
- if not rows:
- if required:
- self.add_issue(
- "error",
- "msh-chunk",
- resource,
- f"missing required chunk type={type_id} ({label})",
- )
- return None
- if len(rows) > 1:
- self.add_issue(
- "warning",
- "msh-chunk",
- resource,
- f"multiple chunks type={type_id} ({label}); using first",
- )
- return rows[0]
-
- def _check_stride(
- self,
- *,
- resource: Path,
- entry: dict[str, Any],
- stride: int,
- label: str,
- ) -> int:
- size = int(entry["size"])
- attr1 = int(entry["attr1"])
- attr2 = int(entry["attr2"])
- attr3 = int(entry["attr3"])
- self.stats["msh_attr_triplets"][int(entry["type_id"])][(attr1, attr2, attr3)] += 1
-
- if size % stride != 0:
- self.add_issue(
- "error",
- "msh-stride",
- resource,
- f"{label}: size={size} is not divisible by stride={stride}",
- )
- return -1
-
- count = size // stride
- if attr1 != count:
- self.add_issue(
- "error",
- "msh-attr",
- resource,
- f"{label}: attr1={attr1} != size/stride={count}",
- )
- if attr3 != stride:
- self.add_issue(
- "error",
- "msh-attr",
- resource,
- f"{label}: attr3={attr3} != {stride}",
- )
- if attr2 != 0 and int(entry["type_id"]) not in (1,):
- # type 1 has non-zero attr2 in real assets, others are expected zero.
- self.add_issue(
- "warning",
- "msh-attr",
- resource,
- f"{label}: attr2={attr2} (expected 0 for this chunk type)",
- )
- return count
-
- def validate_msh(self, path: Path) -> None:
- self.stats["msh_total"] += 1
- blob = path.read_bytes()
- if blob[:4] != MAGIC_NRES:
- self.add_issue("error", "msh-container", path, "file is not NRes")
- return
-
- try:
- parsed = arv.parse_nres(blob, source=str(path))
- except Exception as exc: # pylint: disable=broad-except
- self.add_issue("error", "msh-container", path, f"failed to parse NRes: {exc}")
- return
-
- for issue in parsed.get("issues", []):
- self.add_issue("warning", "msh-nres", path, issue)
-
- entries = parsed["entries"]
- types_order = tuple(int(item["type_id"]) for item in entries)
- self.stats["msh_type_orders"][types_order] += 1
- if types_order != EXPECTED_MSH_ORDER:
- self.add_issue(
- "warning",
- "msh-order",
- path,
- f"unexpected chunk order {types_order}, expected {EXPECTED_MSH_ORDER}",
- )
-
- by_type = self._entry_by_type(entries)
-
- chunks: dict[int, dict[str, Any]] = {}
- for type_id in REQUIRED_MSH_TYPES:
- chunk = self._expect_single_type(
- by_type=by_type,
- type_id=type_id,
- label=f"type{type_id}",
- resource=path,
- required=True,
- )
- if chunk:
- chunks[type_id] = chunk
- for type_id in OPTIONAL_MSH_TYPES:
- chunk = self._expect_single_type(
- by_type=by_type,
- type_id=type_id,
- label=f"type{type_id}",
- resource=path,
- required=False,
- )
- if chunk:
- chunks[type_id] = chunk
-
- for type_id, stride in MSH_STRIDES.items():
- chunk = chunks.get(type_id)
- if not chunk:
- continue
- self._check_stride(resource=path, entry=chunk, stride=stride, label=f"type{type_id}")
-
- # type 2 includes 0x8C-byte header + 68-byte slot table entries.
- type2 = chunks.get(2)
- if type2:
- size = int(type2["size"])
- attr1 = int(type2["attr1"])
- attr2 = int(type2["attr2"])
- attr3 = int(type2["attr3"])
- self.stats["msh_attr_triplets"][2][(attr1, attr2, attr3)] += 1
- if attr3 != 68:
- self.add_issue(
- "error",
- "msh-attr",
- path,
- f"type2: attr3={attr3} != 68",
- )
- if attr2 != 0:
- self.add_issue(
- "warning",
- "msh-attr",
- path,
- f"type2: attr2={attr2} (expected 0)",
- )
- if size < SLOT_TABLE_OFFSET:
- self.add_issue(
- "error",
- "msh-size",
- path,
- f"type2: size={size} < header_size={SLOT_TABLE_OFFSET}",
- )
- elif (size - SLOT_TABLE_OFFSET) % 68 != 0:
- self.add_issue(
- "error",
- "msh-size",
- path,
- f"type2: (size - 0x8C) is not divisible by 68 (size={size})",
- )
- else:
- slots_by_size = (size - SLOT_TABLE_OFFSET) // 68
- if attr1 != slots_by_size:
- self.add_issue(
- "error",
- "msh-attr",
- path,
- f"type2: attr1={attr1} != (size-0x8C)/68={slots_by_size}",
- )
-
- verts = chunks.get(3)
- face = chunks.get(21)
- slots = chunks.get(2)
- nodes = chunks.get(1)
- type11 = chunks.get(11)
-
- if verts and face:
- vcount = int(verts["attr1"])
- face_payload = self._entry_payload(blob, face)
- fcount = int(face["attr1"])
- if len(face_payload) >= 28:
- for idx in range(fcount):
- off = idx * 28
- if off + 28 > len(face_payload):
- self.add_issue(
- "error",
- "msh-face",
- path,
- f"type21 truncated at face {idx}",
- )
- break
- flags = struct.unpack_from("<I", face_payload, off)[0]
- self.stats["msh_type21_flags_top"][flags] += 1
- i0, i1, i2 = struct.unpack_from("<HHH", face_payload, off + 8)
- for name, value in (("i0", i0), ("i1", i1), ("i2", i2)):
- if value >= vcount:
- self.add_issue(
- "error",
- "msh-face-index",
- path,
- f"type21[{idx}].{name}={value} out of range vertex_count={vcount}",
- )
- n0, n1, n2 = struct.unpack_from("<HHH", face_payload, off + 14)
- for name, value in (("n0", n0), ("n1", n1), ("n2", n2)):
- if value != 0xFFFF and value >= fcount:
- self.add_issue(
- "error",
- "msh-face-neighbour",
- path,
- f"type21[{idx}].{name}={value} out of range face_count={fcount}",
- )
-
- if slots and face:
- slot_count = int(slots["attr1"])
- face_count = int(face["attr1"])
- slot_payload = self._entry_payload(blob, slots)
- need = SLOT_TABLE_OFFSET + slot_count * 68
- if len(slot_payload) < need:
- self.add_issue(
- "error",
- "msh-slot",
- path,
- f"type2 payload too short: size={len(slot_payload)}, need_at_least={need}",
- )
- else:
- if len(slot_payload) != need:
- self.add_issue(
- "warning",
- "msh-slot",
- path,
- f"type2 payload has trailing bytes: size={len(slot_payload)}, expected={need}",
- )
- for idx in range(slot_count):
- off = SLOT_TABLE_OFFSET + idx * 68
- tri_start, tri_count = struct.unpack_from("<HH", slot_payload, off)
- if tri_start + tri_count > face_count:
- self.add_issue(
- "error",
- "msh-slot-range",
- path,
- f"type2 slot[{idx}] range [{tri_start}, {tri_start + tri_count}) exceeds face_count={face_count}",
- )
-
- if nodes and slots:
- node_payload = self._entry_payload(blob, nodes)
- slot_count = int(slots["attr1"])
- node_count = int(nodes["attr1"])
- for node_idx in range(node_count):
- off = node_idx * 38
- if off + 38 > len(node_payload):
- self.add_issue(
- "error",
- "msh-node",
- path,
- f"type1 truncated at node {node_idx}",
- )
- break
- for j in range(19):
- slot_id = struct.unpack_from("<H", node_payload, off + j * 2)[0]
- if slot_id != 0xFFFF and slot_id >= slot_count:
- self.add_issue(
- "error",
- "msh-node-slot",
- path,
- f"type1 node[{node_idx}] slot[{j}]={slot_id} out of range slot_count={slot_count}",
- )
-
- if type11:
- payload = self._entry_payload(blob, type11)
- if len(payload) >= 8:
- w0, w1 = struct.unpack_from("<II", payload, 0)
- self.stats["msh_type11_header_words"][(w0, w1)] += 1
- else:
- self.add_issue(
- "error",
- "msh-type11",
- path,
- f"type11 payload too short: {len(payload)}",
- )
-
- def _update_minmax(self, key_min: str, key_max: str, value: float) -> None:
- if self.stats[key_min] is None or value < self.stats[key_min]:
- self.stats[key_min] = value
- if self.stats[key_max] is None or value > self.stats[key_max]:
- self.stats[key_max] = value
-
- def validate_map(self, path: Path) -> None:
- self.stats["map_total"] += 1
- blob = path.read_bytes()
- if blob[:4] != MAGIC_NRES:
- self.add_issue("error", "map-container", path, "file is not NRes")
- return
-
- try:
- parsed = arv.parse_nres(blob, source=str(path))
- except Exception as exc: # pylint: disable=broad-except
- self.add_issue("error", "map-container", path, f"failed to parse NRes: {exc}")
- return
-
- for issue in parsed.get("issues", []):
- self.add_issue("warning", "map-nres", path, issue)
-
- entries = parsed["entries"]
- if len(entries) != 1 or int(entries[0]["type_id"]) != 12:
- self.add_issue(
- "error",
- "map-chunk",
- path,
- f"expected single chunk type=12, got {[int(e['type_id']) for e in entries]}",
- )
- return
-
- entry = entries[0]
- areal_count = int(entry["attr1"])
- if areal_count <= 0:
- self.add_issue("error", "map-areal", path, f"invalid areal_count={areal_count}")
- return
-
- payload = self._entry_payload(blob, entry)
- ptr = 0
- records: list[dict[str, Any]] = []
-
- for idx in range(areal_count):
- if ptr + 56 > len(payload):
- self.add_issue(
- "error",
- "map-record",
- path,
- f"truncated areal header at index={idx}, ptr={ptr}, size={len(payload)}",
- )
- return
-
- anchor_x, anchor_y, anchor_z = struct.unpack_from("<fff", payload, ptr)
- u12 = struct.unpack_from("<I", payload, ptr + 12)[0]
- area_f = struct.unpack_from("<f", payload, ptr + 16)[0]
- nx, ny, nz = struct.unpack_from("<fff", payload, ptr + 20)
- logic_flag = struct.unpack_from("<I", payload, ptr + 32)[0]
- u36 = struct.unpack_from("<I", payload, ptr + 36)[0]
- class_id = struct.unpack_from("<I", payload, ptr + 40)[0]
- u44 = struct.unpack_from("<I", payload, ptr + 44)[0]
- vertex_count, poly_count = struct.unpack_from("<II", payload, ptr + 48)
-
- self.stats["map_records_total"] += 1
- self.stats["map_logic_flags"][logic_flag] += 1
- self.stats["map_class_ids"][class_id] += 1
- self.stats["map_poly_count"][poly_count] += 1
- self.stats["map_reserved_u12"][u12] += 1
- self.stats["map_reserved_u36"][u36] += 1
- self.stats["map_reserved_u44"][u44] += 1
- self._update_minmax("map_vertex_count_min", "map_vertex_count_max", float(vertex_count))
-
- normal_len = math.sqrt(nx * nx + ny * ny + nz * nz)
- self._update_minmax("map_normal_len_min", "map_normal_len_max", normal_len)
- if abs(normal_len - 1.0) > 1e-3:
- self.add_issue(
- "warning",
- "map-normal",
- path,
- f"record[{idx}] normal length={normal_len:.6f} (expected ~1.0)",
- )
-
- vertices_off = ptr + 56
- vertices_size = 12 * vertex_count
- if vertices_off + vertices_size > len(payload):
- self.add_issue(
- "error",
- "map-vertices",
- path,
- f"record[{idx}] vertices out of bounds",
- )
- return
-
- vertices: list[tuple[float, float, float]] = []
- for i in range(vertex_count):
- vertices.append(struct.unpack_from("<fff", payload, vertices_off + i * 12))
-
- if vertex_count >= 3:
- # signed shoelace area in XY.
- shoelace = 0.0
- for i in range(vertex_count):
- x1, y1, _ = vertices[i]
- x2, y2, _ = vertices[(i + 1) % vertex_count]
- shoelace += x1 * y2 - x2 * y1
- area_xy = abs(shoelace) * 0.5
- delta = abs(area_xy - area_f)
- if delta > self.stats["map_area_delta_abs_max"]:
- self.stats["map_area_delta_abs_max"] = delta
- rel_delta = delta / max(1.0, area_xy)
- if rel_delta > self.stats["map_area_delta_rel_max"]:
- self.stats["map_area_delta_rel_max"] = rel_delta
- if rel_delta > 0.05:
- self.stats["map_area_rel_gt_05_count"] += 1
-
- links_off = vertices_off + vertices_size
- link_count = vertex_count + 3 * poly_count
- links_size = 8 * link_count
- if links_off + links_size > len(payload):
- self.add_issue(
- "error",
- "map-links",
- path,
- f"record[{idx}] link table out of bounds",
- )
- return
-
- edge_links: list[tuple[int, int]] = []
- for i in range(vertex_count):
- area_ref, edge_ref = struct.unpack_from("<ii", payload, links_off + i * 8)
- edge_links.append((area_ref, edge_ref))
-
- poly_links_off = links_off + 8 * vertex_count
- poly_links: list[tuple[int, int]] = []
- for i in range(3 * poly_count):
- area_ref, edge_ref = struct.unpack_from("<ii", payload, poly_links_off + i * 8)
- poly_links.append((area_ref, edge_ref))
-
- p = links_off + links_size
- for poly_idx in range(poly_count):
- if p + 4 > len(payload):
- self.add_issue(
- "error",
- "map-poly",
- path,
- f"record[{idx}] poly header truncated at poly_idx={poly_idx}",
- )
- return
- n = struct.unpack_from("<I", payload, p)[0]
- poly_size = 4 * (3 * n + 1)
- if p + poly_size > len(payload):
- self.add_issue(
- "error",
- "map-poly",
- path,
- f"record[{idx}] poly data out of bounds at poly_idx={poly_idx}",
- )
- return
- p += poly_size
-
- records.append(
- {
- "index": idx,
- "anchor": (anchor_x, anchor_y, anchor_z),
- "logic": logic_flag,
- "class_id": class_id,
- "vertex_count": vertex_count,
- "poly_count": poly_count,
- "edge_links": edge_links,
- "poly_links": poly_links,
- }
- )
- ptr = p
-
- vertex_counts = [int(item["vertex_count"]) for item in records]
- for rec in records:
- idx = int(rec["index"])
- for link_idx, (area_ref, edge_ref) in enumerate(rec["edge_links"]):
- if area_ref == -1:
- if edge_ref != -1:
- self.add_issue(
- "warning",
- "map-link",
- path,
- f"record[{idx}] edge_link[{link_idx}] has area_ref=-1 but edge_ref={edge_ref}",
- )
- continue
- if area_ref < 0 or area_ref >= areal_count:
- self.add_issue(
- "error",
- "map-link",
- path,
- f"record[{idx}] edge_link[{link_idx}] area_ref={area_ref} out of range",
- )
- continue
- dst_vcount = vertex_counts[area_ref]
- if edge_ref < 0 or edge_ref >= dst_vcount:
- self.add_issue(
- "error",
- "map-link",
- path,
- f"record[{idx}] edge_link[{link_idx}] edge_ref={edge_ref} out of range dst_vertex_count={dst_vcount}",
- )
-
- for link_idx, (area_ref, edge_ref) in enumerate(rec["poly_links"]):
- if area_ref == -1:
- if edge_ref != -1:
- self.add_issue(
- "warning",
- "map-poly-link",
- path,
- f"record[{idx}] poly_link[{link_idx}] has area_ref=-1 but edge_ref={edge_ref}",
- )
- continue
- if area_ref < 0 or area_ref >= areal_count:
- self.add_issue(
- "error",
- "map-poly-link",
- path,
- f"record[{idx}] poly_link[{link_idx}] area_ref={area_ref} out of range",
- )
-
- if ptr + 8 > len(payload):
- self.add_issue(
- "error",
- "map-cells",
- path,
- f"missing cells header at ptr={ptr}, size={len(payload)}",
- )
- return
-
- cells_x, cells_y = struct.unpack_from("<II", payload, ptr)
- self.stats["map_cell_dims"][(cells_x, cells_y)] += 1
- ptr += 8
- if cells_x <= 0 or cells_y <= 0:
- self.add_issue(
- "error",
- "map-cells",
- path,
- f"invalid cells dimensions {cells_x}x{cells_y}",
- )
- return
-
- for x in range(cells_x):
- for y in range(cells_y):
- if ptr + 2 > len(payload):
- self.add_issue(
- "error",
- "map-cells",
- path,
- f"truncated hitCount at cell ({x},{y})",
- )
- return
- hit_count = struct.unpack_from("<H", payload, ptr)[0]
- ptr += 2
- need = 2 * hit_count
- if ptr + need > len(payload):
- self.add_issue(
- "error",
- "map-cells",
- path,
- f"truncated areaIds at cell ({x},{y}), hitCount={hit_count}",
- )
- return
- for i in range(hit_count):
- area_id = struct.unpack_from("<H", payload, ptr + 2 * i)[0]
- if area_id >= areal_count:
- self.add_issue(
- "error",
- "map-cells",
- path,
- f"cell ({x},{y}) has area_id={area_id} out of range areal_count={areal_count}",
- )
- ptr += need
-
- if ptr != len(payload):
- self.add_issue(
- "error",
- "map-size",
- path,
- f"payload tail mismatch: consumed={ptr}, payload_size={len(payload)}",
- )
-
- def validate(self, maps_root: Path) -> None:
- msh_paths = sorted(maps_root.rglob("Land.msh"))
- map_paths = sorted(maps_root.rglob("Land.map"))
-
- msh_by_dir = {path.parent: path for path in msh_paths}
- map_by_dir = {path.parent: path for path in map_paths}
-
- all_dirs = sorted(set(msh_by_dir) | set(map_by_dir))
- self.stats["maps_total"] = len(all_dirs)
-
- for folder in all_dirs:
- msh_path = msh_by_dir.get(folder)
- map_path = map_by_dir.get(folder)
- if msh_path is None:
- self.add_issue("error", "pairing", folder, "missing Land.msh")
- continue
- if map_path is None:
- self.add_issue("error", "pairing", folder, "missing Land.map")
- continue
- self.validate_msh(msh_path)
- self.validate_map(map_path)
-
- def build_report(self) -> dict[str, Any]:
- errors = [i for i in self.issues if i.severity == "error"]
- warnings = [i for i in self.issues if i.severity == "warning"]
-
- # Convert counters/defaultdicts to JSON-friendly dicts.
- msh_orders = {
- str(list(order)): count
- for order, count in self.stats["msh_type_orders"].most_common()
- }
- msh_attrs = {
- str(type_id): {str(list(k)): v for k, v in counter.most_common()}
- for type_id, counter in self.stats["msh_attr_triplets"].items()
- }
- type11_hdr = {
- str(list(key)): value
- for key, value in self.stats["msh_type11_header_words"].most_common()
- }
- type21_flags = {
- f"0x{key:08X}": value
- for key, value in self.stats["msh_type21_flags_top"].most_common(32)
- }
-
- return {
- "summary": {
- "maps_total": self.stats["maps_total"],
- "msh_total": self.stats["msh_total"],
- "map_total": self.stats["map_total"],
- "issues_total": len(self.issues),
- "errors_total": len(errors),
- "warnings_total": len(warnings),
- },
- "stats": {
- "msh_type_orders": msh_orders,
- "msh_attr_triplets": msh_attrs,
- "msh_type11_header_words": type11_hdr,
- "msh_type21_flags_top": type21_flags,
- "map_logic_flags": dict(self.stats["map_logic_flags"]),
- "map_class_ids": dict(self.stats["map_class_ids"]),
- "map_poly_count": dict(self.stats["map_poly_count"]),
- "map_vertex_count_min": self.stats["map_vertex_count_min"],
- "map_vertex_count_max": self.stats["map_vertex_count_max"],
- "map_cell_dims": {str(list(k)): v for k, v in self.stats["map_cell_dims"].items()},
- "map_reserved_u12": dict(self.stats["map_reserved_u12"]),
- "map_reserved_u36": dict(self.stats["map_reserved_u36"]),
- "map_reserved_u44": dict(self.stats["map_reserved_u44"]),
- "map_area_delta_abs_max": self.stats["map_area_delta_abs_max"],
- "map_area_delta_rel_max": self.stats["map_area_delta_rel_max"],
- "map_area_rel_gt_05_count": self.stats["map_area_rel_gt_05_count"],
- "map_normal_len_min": self.stats["map_normal_len_min"],
- "map_normal_len_max": self.stats["map_normal_len_max"],
- "map_records_total": self.stats["map_records_total"],
- },
- "issues": [
- {
- "severity": item.severity,
- "category": item.category,
- "resource": item.resource,
- "message": item.message,
- }
- for item in self.issues
- ],
- }
-
-
-def parse_args() -> argparse.Namespace:
- parser = argparse.ArgumentParser(description="Validate terrain/map doc assumptions")
- parser.add_argument(
- "--maps-root",
- type=Path,
- default=Path("tmp/gamedata/DATA/MAPS"),
- help="Root directory containing MAPS/**/Land.msh and Land.map",
- )
- parser.add_argument(
- "--report-json",
- type=Path,
- default=None,
- help="Optional path to save full JSON report",
- )
- parser.add_argument(
- "--fail-on-warning",
- action="store_true",
- help="Return non-zero exit code on warnings too",
- )
- return parser.parse_args()
-
-
-def main() -> int:
- args = parse_args()
- validator = TerrainMapDocValidator()
- validator.validate(args.maps_root)
- report = validator.build_report()
-
- print(
- json.dumps(
- report["summary"],
- indent=2,
- ensure_ascii=False,
- )
- )
-
- if args.report_json:
- args.report_json.parent.mkdir(parents=True, exist_ok=True)
- with args.report_json.open("w", encoding="utf-8") as handle:
- json.dump(report, handle, indent=2, ensure_ascii=False)
- handle.write("\n")
- print(f"report written: {args.report_json}")
-
- has_errors = report["summary"]["errors_total"] > 0
- has_warnings = report["summary"]["warnings_total"] > 0
- if has_errors:
- return 1
- if args.fail_on_warning and has_warnings:
- return 1
- return 0
-
-
-if __name__ == "__main__":
- raise SystemExit(main())
diff --git a/tools/terrain_map_preview_renderer.py b/tools/terrain_map_preview_renderer.py
deleted file mode 100644
index 86d72d7..0000000
--- a/tools/terrain_map_preview_renderer.py
+++ /dev/null
@@ -1,679 +0,0 @@
-#!/usr/bin/env python3
-"""
-Software 3D renderer for terrain Land.msh + Land.map overlay.
-
-Output format: binary PPM (P6), dependency-free.
-"""
-
-from __future__ import annotations
-
-import argparse
-import math
-import struct
-from pathlib import Path
-from typing import Any
-
-import archive_roundtrip_validator as arv
-
-MAGIC_NRES = b"NRes"
-
-
-def _entry_payload(blob: bytes, entry: dict[str, Any]) -> bytes:
- start = int(entry["data_offset"])
- end = start + int(entry["size"])
- return blob[start:end]
-
-
-def _parse_nres(blob: bytes, source: str) -> dict[str, Any]:
- if blob[:4] != MAGIC_NRES:
- raise RuntimeError(f"{source}: not an NRes payload")
- return arv.parse_nres(blob, source=source)
-
-
-def _by_type(entries: list[dict[str, Any]]) -> dict[int, list[dict[str, Any]]]:
- out: dict[int, list[dict[str, Any]]] = {}
- for row in entries:
- out.setdefault(int(row["type_id"]), []).append(row)
- return out
-
-
-def _get_single(by_type: dict[int, list[dict[str, Any]]], type_id: int, label: str) -> dict[str, Any]:
- rows = by_type.get(type_id, [])
- if not rows:
- raise RuntimeError(f"missing resource type {type_id} ({label})")
- return rows[0]
-
-
-def _downsample_faces(
- faces: list[tuple[int, int, int]],
- max_faces: int,
-) -> list[tuple[int, int, int]]:
- if max_faces <= 0 or len(faces) <= max_faces:
- return faces
- step = len(faces) / max_faces
- out: list[tuple[int, int, int]] = []
- pos = 0.0
- while len(out) < max_faces and int(pos) < len(faces):
- out.append(faces[int(pos)])
- pos += step
- return out
-
-
-def load_terrain_msh(
- path: Path,
- *,
- max_faces: int,
-) -> tuple[list[tuple[float, float, float]], list[tuple[int, int, int]], dict[str, int]]:
- blob = path.read_bytes()
- parsed = _parse_nres(blob, str(path))
- by_type = _by_type(parsed["entries"])
-
- res3 = _get_single(by_type, 3, "positions")
- res21 = _get_single(by_type, 21, "terrain faces")
-
- pos_blob = _entry_payload(blob, res3)
- if len(pos_blob) % 12 != 0:
- raise RuntimeError(f"{path}: type 3 payload size is not divisible by 12")
- vertex_count = len(pos_blob) // 12
- positions = [struct.unpack_from("<3f", pos_blob, i * 12) for i in range(vertex_count)]
-
- face_blob = _entry_payload(blob, res21)
- if len(face_blob) % 28 != 0:
- raise RuntimeError(f"{path}: type 21 payload size is not divisible by 28")
- all_faces: list[tuple[int, int, int]] = []
- raw_face_count = len(face_blob) // 28
- dropped = 0
- for i in range(raw_face_count):
- off = i * 28
- i0, i1, i2 = struct.unpack_from("<HHH", face_blob, off + 8)
- if i0 >= vertex_count or i1 >= vertex_count or i2 >= vertex_count:
- dropped += 1
- continue
- all_faces.append((i0, i1, i2))
-
- faces = _downsample_faces(all_faces, max_faces)
- meta = {
- "vertex_count": vertex_count,
- "face_count_raw": raw_face_count,
- "face_count_valid": len(all_faces),
- "face_count_rendered": len(faces),
- "face_dropped_invalid": dropped,
- }
- return positions, faces, meta
-
-
-def load_areal_map(path: Path) -> tuple[list[dict[str, Any]], dict[str, int]]:
- blob = path.read_bytes()
- parsed = _parse_nres(blob, str(path))
- by_type = _by_type(parsed["entries"])
- chunk = _get_single(by_type, 12, "ArealMapGeometry")
-
- payload = _entry_payload(blob, chunk)
- areal_count = int(chunk["attr1"])
- ptr = 0
- areals: list[dict[str, Any]] = []
- for idx in range(areal_count):
- if ptr + 56 > len(payload):
- raise RuntimeError(f"{path}: truncated areal header at index={idx}")
- class_id = struct.unpack_from("<I", payload, ptr + 40)[0]
- vertex_count, poly_count = struct.unpack_from("<II", payload, ptr + 48)
- verts_off = ptr + 56
- verts_size = 12 * vertex_count
- if verts_off + verts_size > len(payload):
- raise RuntimeError(f"{path}: areal[{idx}] vertices out of bounds")
- verts = [struct.unpack_from("<3f", payload, verts_off + 12 * i) for i in range(vertex_count)]
-
- links_off = verts_off + verts_size
- links_size = 8 * (vertex_count + 3 * poly_count)
- p = links_off + links_size
- for _ in range(poly_count):
- if p + 4 > len(payload):
- raise RuntimeError(f"{path}: areal[{idx}] poly header out of bounds")
- n = struct.unpack_from("<I", payload, p)[0]
- p += 4 * (3 * n + 1)
- if p > len(payload):
- raise RuntimeError(f"{path}: areal[{idx}] poly data out of bounds")
-
- areals.append(
- {
- "index": idx,
- "class_id": class_id,
- "vertices": verts,
- }
- )
- ptr = p
-
- if ptr + 8 > len(payload):
- raise RuntimeError(f"{path}: missing cells section")
- cells_x, cells_y = struct.unpack_from("<II", payload, ptr)
- ptr += 8
- for _x in range(cells_x):
- for _y in range(cells_y):
- if ptr + 2 > len(payload):
- raise RuntimeError(f"{path}: cells section truncated")
- hit_count = struct.unpack_from("<H", payload, ptr)[0]
- ptr += 2 + 2 * hit_count
- if ptr > len(payload):
- raise RuntimeError(f"{path}: cells section out of bounds")
- if ptr != len(payload):
- raise RuntimeError(f"{path}: trailing bytes in chunk12 parse ({len(payload) - ptr})")
-
- meta = {
- "areal_count": areal_count,
- "cells_x": cells_x,
- "cells_y": cells_y,
- }
- return areals, meta
-
-
-def _color_for_class(class_id: int) -> tuple[int, int, int]:
- x = (class_id * 1103515245 + 12345) & 0x7FFFFFFF
- r = 60 + (x & 0x7F)
- g = 60 + ((x >> 7) & 0x7F)
- b = 60 + ((x >> 14) & 0x7F)
- return r, g, b
-
-
-def _write_ppm(path: Path, width: int, height: int, rgb: bytearray) -> None:
- path.parent.mkdir(parents=True, exist_ok=True)
- with path.open("wb") as handle:
- handle.write(f"P6\n{width} {height}\n255\n".encode("ascii"))
- handle.write(rgb)
-
-
-def _write_obj(
- path: Path,
- terrain_positions: list[tuple[float, float, float]],
- terrain_faces: list[tuple[int, int, int]],
- areals: list[dict[str, Any]],
- *,
- include_areals: bool,
-) -> None:
- path.parent.mkdir(parents=True, exist_ok=True)
- with path.open("w", encoding="utf-8", newline="\n") as out:
- out.write("# Exported by terrain_map_preview_renderer.py\n")
- out.write("o terrain\n")
- for x, y, z in terrain_positions:
- out.write(f"v {x:.9g} {y:.9g} {z:.9g}\n")
- for i0, i1, i2 in terrain_faces:
- # OBJ indices are 1-based.
- out.write(f"f {i0 + 1} {i1 + 1} {i2 + 1}\n")
-
- if include_areals and areals:
- base = len(terrain_positions)
- area_vertex_counts: list[int] = []
- out.write("o areal_edges\n")
- for area in areals:
- verts = area["vertices"]
- area_vertex_counts.append(len(verts))
- for x, y, z in verts:
- out.write(f"v {x:.9g} {y:.9g} {z:.9g}\n")
-
- ptr = base
- for area_idx, area in enumerate(areals):
- cnt = area_vertex_counts[area_idx]
- if cnt < 2:
- ptr += cnt
- continue
- # closed polyline.
- line = [str(ptr + i + 1) for i in range(cnt)]
- line.append(str(ptr + 1))
- out.write("l " + " ".join(line) + "\n")
- ptr += cnt
-
-
-def _render_scene(
- terrain_positions: list[tuple[float, float, float]],
- terrain_faces: list[tuple[int, int, int]],
- areals: list[dict[str, Any]],
- *,
- width: int,
- height: int,
- yaw_deg: float,
- pitch_deg: float,
- wireframe: bool,
- areal_overlay: bool,
-) -> bytearray:
- all_positions = list(terrain_positions)
- if areal_overlay:
- for area in areals:
- all_positions.extend(area["vertices"])
- if not all_positions:
- raise RuntimeError("scene is empty")
-
- xs = [p[0] for p in all_positions]
- ys = [p[1] for p in all_positions]
- zs = [p[2] for p in all_positions]
- cx = (min(xs) + max(xs)) * 0.5
- cy = (min(ys) + max(ys)) * 0.5
- cz = (min(zs) + max(zs)) * 0.5
- span = max(max(xs) - min(xs), max(ys) - min(ys), max(zs) - min(zs))
- radius = max(span * 0.5, 1e-3)
-
- yaw = math.radians(yaw_deg)
- pitch = math.radians(pitch_deg)
- cyaw = math.cos(yaw)
- syaw = math.sin(yaw)
- cpitch = math.cos(pitch)
- spitch = math.sin(pitch)
- camera_dist = radius * 3.2
- scale = min(width, height) * 0.96
-
- # Terrain transform cache.
- vx: list[float] = []
- vy: list[float] = []
- vz: list[float] = []
- sx: list[float] = []
- sy: list[float] = []
- for x, y, z in terrain_positions:
- x0 = x - cx
- y0 = y - cy
- z0 = z - cz
- x1 = cyaw * x0 + syaw * z0
- z1 = -syaw * x0 + cyaw * z0
- y2 = cpitch * y0 - spitch * z1
- z2 = spitch * y0 + cpitch * z1 + camera_dist
- if z2 < 1e-3:
- z2 = 1e-3
- vx.append(x1)
- vy.append(y2)
- vz.append(z2)
- sx.append(width * 0.5 + (x1 / z2) * scale)
- sy.append(height * 0.5 - (y2 / z2) * scale)
-
- def project_point(x: float, y: float, z: float) -> tuple[float, float, float]:
- x0 = x - cx
- y0 = y - cy
- z0 = z - cz
- x1 = cyaw * x0 + syaw * z0
- z1 = -syaw * x0 + cyaw * z0
- y2 = cpitch * y0 - spitch * z1
- z2 = spitch * y0 + cpitch * z1 + camera_dist
- if z2 < 1e-3:
- z2 = 1e-3
- px = width * 0.5 + (x1 / z2) * scale
- py = height * 0.5 - (y2 / z2) * scale
- return px, py, z2
-
- rgb = bytearray([14, 16, 20] * (width * height))
- zbuf = [float("inf")] * (width * height)
- light_dir = (0.35, 0.45, 1.0)
- l_len = math.sqrt(light_dir[0] ** 2 + light_dir[1] ** 2 + light_dir[2] ** 2)
- light = (light_dir[0] / l_len, light_dir[1] / l_len, light_dir[2] / l_len)
-
- def edge(ax: float, ay: float, bx: float, by: float, px: float, py: float) -> float:
- return (px - ax) * (by - ay) - (py - ay) * (bx - ax)
-
- for i0, i1, i2 in terrain_faces:
- x0 = sx[i0]
- y0 = sy[i0]
- x1 = sx[i1]
- y1 = sy[i1]
- x2 = sx[i2]
- y2 = sy[i2]
- area = edge(x0, y0, x1, y1, x2, y2)
- if area == 0.0:
- continue
-
- ux = vx[i1] - vx[i0]
- uy = vy[i1] - vy[i0]
- uz = vz[i1] - vz[i0]
- wx = vx[i2] - vx[i0]
- wy = vy[i2] - vy[i0]
- wz = vz[i2] - vz[i0]
- nx = uy * wz - uz * wy
- ny = uz * wx - ux * wz
- nz = ux * wy - uy * wx
- n_len = math.sqrt(nx * nx + ny * ny + nz * nz)
- if n_len > 0.0:
- nx /= n_len
- ny /= n_len
- nz /= n_len
- intensity = nx * light[0] + ny * light[1] + nz * light[2]
- if intensity < 0.0:
- intensity = 0.0
- shade = int(45 + 185 * intensity)
- color = (min(255, shade + 6), min(255, shade + 14), min(255, shade + 28))
-
- minx = int(max(0, math.floor(min(x0, x1, x2))))
- maxx = int(min(width - 1, math.ceil(max(x0, x1, x2))))
- miny = int(max(0, math.floor(min(y0, y1, y2))))
- maxy = int(min(height - 1, math.ceil(max(y0, y1, y2))))
- if minx > maxx or miny > maxy:
- continue
-
- z0 = vz[i0]
- z1 = vz[i1]
- z2 = vz[i2]
- inv_area = 1.0 / area
- for py in range(miny, maxy + 1):
- fy = py + 0.5
- row = py * width
- for px in range(minx, maxx + 1):
- fx = px + 0.5
- w0 = edge(x1, y1, x2, y2, fx, fy)
- w1 = edge(x2, y2, x0, y0, fx, fy)
- w2 = edge(x0, y0, x1, y1, fx, fy)
- if area > 0:
- if w0 < 0 or w1 < 0 or w2 < 0:
- continue
- else:
- if w0 > 0 or w1 > 0 or w2 > 0:
- continue
- bz0 = w0 * inv_area
- bz1 = w1 * inv_area
- bz2 = w2 * inv_area
- depth = bz0 * z0 + bz1 * z1 + bz2 * z2
- idx = row + px
- if depth >= zbuf[idx]:
- continue
- zbuf[idx] = depth
- p = idx * 3
- rgb[p + 0] = color[0]
- rgb[p + 1] = color[1]
- rgb[p + 2] = color[2]
-
- def draw_line(
- xa: float,
- ya: float,
- xb: float,
- yb: float,
- color: tuple[int, int, int],
- ) -> None:
- x0i = int(round(xa))
- y0i = int(round(ya))
- x1i = int(round(xb))
- y1i = int(round(yb))
- dx = abs(x1i - x0i)
- sx_step = 1 if x0i < x1i else -1
- dy = -abs(y1i - y0i)
- sy_step = 1 if y0i < y1i else -1
- err = dx + dy
- x = x0i
- y = y0i
- while True:
- if 0 <= x < width and 0 <= y < height:
- p = (y * width + x) * 3
- rgb[p + 0] = color[0]
- rgb[p + 1] = color[1]
- rgb[p + 2] = color[2]
- if x == x1i and y == y1i:
- break
- e2 = 2 * err
- if e2 >= dy:
- err += dy
- x += sx_step
- if e2 <= dx:
- err += dx
- y += sy_step
-
- if wireframe:
- wf = (225, 232, 246)
- for i0, i1, i2 in terrain_faces:
- draw_line(sx[i0], sy[i0], sx[i1], sy[i1], wf)
- draw_line(sx[i1], sy[i1], sx[i2], sy[i2], wf)
- draw_line(sx[i2], sy[i2], sx[i0], sy[i0], wf)
-
- if areal_overlay:
- for area in areals:
- verts = area["vertices"]
- if len(verts) < 2:
- continue
- color = _color_for_class(int(area["class_id"]))
- projected = [project_point(x, y, z + 0.35) for x, y, z in verts]
- for i in range(len(projected)):
- x0, y0, _ = projected[i]
- x1, y1, _ = projected[(i + 1) % len(projected)]
- draw_line(x0, y0, x1, y1, color)
-
- return rgb
-
-
-def cmd_render(args: argparse.Namespace) -> int:
- msh_path = Path(args.land_msh).resolve()
- map_path = Path(args.land_map).resolve() if args.land_map else None
- output_path = Path(args.output).resolve()
-
- positions, faces, terrain_meta = load_terrain_msh(msh_path, max_faces=int(args.max_faces))
- areals: list[dict[str, Any]] = []
- map_meta: dict[str, int] = {"areal_count": 0, "cells_x": 0, "cells_y": 0}
- if map_path:
- areals, map_meta = load_areal_map(map_path)
-
- rgb = _render_scene(
- positions,
- faces,
- areals,
- width=int(args.width),
- height=int(args.height),
- yaw_deg=float(args.yaw),
- pitch_deg=float(args.pitch),
- wireframe=bool(args.wireframe),
- areal_overlay=bool(args.overlay_areals),
- )
- _write_ppm(output_path, int(args.width), int(args.height), rgb)
-
- print(f"Rendered terrain : {msh_path}")
- if map_path:
- print(f"Areal overlay : {map_path}")
- print(f"Output : {output_path}")
- print(
- "Terrain geometry : "
- f"vertices={terrain_meta['vertex_count']}, "
- f"faces={terrain_meta['face_count_rendered']}/{terrain_meta['face_count_valid']} "
- f"(raw={terrain_meta['face_count_raw']}, dropped={terrain_meta['face_dropped_invalid']})"
- )
- if map_path:
- print(
- "Areal map : "
- f"areals={map_meta['areal_count']}, cells={map_meta['cells_x']}x{map_meta['cells_y']}"
- )
- return 0
-
-
-def cmd_export_obj(args: argparse.Namespace) -> int:
- msh_path = Path(args.land_msh).resolve()
- map_path = Path(args.land_map).resolve() if args.land_map else None
- output_path = Path(args.output).resolve()
-
- positions, faces, terrain_meta = load_terrain_msh(msh_path, max_faces=int(args.max_faces))
- areals: list[dict[str, Any]] = []
- if map_path and bool(args.include_areals):
- areals, _ = load_areal_map(map_path)
-
- _write_obj(
- output_path,
- positions,
- faces,
- areals,
- include_areals=bool(args.include_areals),
- )
-
- areal_vertices = sum(len(a["vertices"]) for a in areals)
- print(f"Terrain source : {msh_path}")
- if map_path:
- print(f"Areal source : {map_path}")
- print(f"OBJ output : {output_path}")
- print(
- "Terrain geometry : "
- f"vertices={terrain_meta['vertex_count']}, "
- f"faces={terrain_meta['face_count_rendered']}/{terrain_meta['face_count_valid']}"
- )
- if bool(args.include_areals):
- print(f"Areal edges : areals={len(areals)}, extra_vertices={areal_vertices}")
- return 0
-
-
-def cmd_render_turntable(args: argparse.Namespace) -> int:
- msh_path = Path(args.land_msh).resolve()
- map_path = Path(args.land_map).resolve() if args.land_map else None
- output_dir = Path(args.output_dir).resolve()
- output_dir.mkdir(parents=True, exist_ok=True)
-
- frames = int(args.frames)
- if frames <= 0:
- raise RuntimeError("--frames must be > 0")
-
- positions, faces, terrain_meta = load_terrain_msh(msh_path, max_faces=int(args.max_faces))
- areals: list[dict[str, Any]] = []
- if map_path:
- areals, _ = load_areal_map(map_path)
-
- yaw_start = float(args.yaw_start)
- yaw_end = float(args.yaw_end)
- if frames == 1:
- yaws = [yaw_start]
- else:
- step = (yaw_end - yaw_start) / (frames - 1)
- yaws = [yaw_start + i * step for i in range(frames)]
-
- prefix = str(args.prefix)
- for i, yaw in enumerate(yaws):
- rgb = _render_scene(
- positions,
- faces,
- areals,
- width=int(args.width),
- height=int(args.height),
- yaw_deg=yaw,
- pitch_deg=float(args.pitch),
- wireframe=bool(args.wireframe),
- areal_overlay=bool(args.overlay_areals),
- )
- out = output_dir / f"{prefix}_{i:03d}.ppm"
- _write_ppm(out, int(args.width), int(args.height), rgb)
-
- print(f"Turntable source : {msh_path}")
- if map_path:
- print(f"Areal source : {map_path}")
- print(f"Output dir : {output_dir}")
- print(f"Frames : {frames} ({yaws[0]:.3f} -> {yaws[-1]:.3f} yaw)")
- print(
- "Terrain geometry : "
- f"vertices={terrain_meta['vertex_count']}, faces={terrain_meta['face_count_rendered']}"
- )
- return 0
-
-
-def cmd_render_batch(args: argparse.Namespace) -> int:
- maps_root = Path(args.maps_root).resolve()
- output_dir = Path(args.output_dir).resolve()
- msh_paths = sorted(maps_root.rglob("Land.msh"))
- if not msh_paths:
- raise RuntimeError(f"no Land.msh files under {maps_root}")
-
- rendered = 0
- skipped = 0
- for msh_path in msh_paths:
- map_path = msh_path.with_name("Land.map")
- if not map_path.exists():
- skipped += 1
- continue
- rel = msh_path.parent.relative_to(maps_root)
- out = output_dir / f"{rel.as_posix().replace('/', '__')}.ppm"
- cmd_render(
- argparse.Namespace(
- land_msh=str(msh_path),
- land_map=str(map_path),
- output=str(out),
- max_faces=args.max_faces,
- width=args.width,
- height=args.height,
- yaw=args.yaw,
- pitch=args.pitch,
- wireframe=args.wireframe,
- overlay_areals=args.overlay_areals,
- )
- )
- rendered += 1
-
- print(f"Batch summary: rendered={rendered}, skipped_no_map={skipped}, output_dir={output_dir}")
- return 0
-
-
-def build_parser() -> argparse.ArgumentParser:
- parser = argparse.ArgumentParser(
- description="Software 3D terrain renderer (Land.msh + optional Land.map overlay)."
- )
- sub = parser.add_subparsers(dest="command", required=True)
-
- render = sub.add_parser("render", help="Render one terrain map to PPM.")
- render.add_argument("--land-msh", required=True, help="Path to Land.msh")
- render.add_argument("--land-map", help="Path to Land.map (optional)")
- render.add_argument("--output", required=True, help="Output .ppm path")
- render.add_argument("--max-faces", type=int, default=220000, help="Face limit (default: 220000)")
- render.add_argument("--width", type=int, default=1280, help="Image width (default: 1280)")
- render.add_argument("--height", type=int, default=720, help="Image height (default: 720)")
- render.add_argument("--yaw", type=float, default=38.0, help="Yaw angle in degrees (default: 38)")
- render.add_argument("--pitch", type=float, default=26.0, help="Pitch angle in degrees (default: 26)")
- render.add_argument("--wireframe", action="store_true", help="Draw terrain wireframe overlay")
- render.add_argument(
- "--overlay-areals",
- action="store_true",
- help="Draw ArealMap polygon overlay",
- )
- render.set_defaults(func=cmd_render)
-
- export_obj = sub.add_parser("export-obj", help="Export terrain (and optional areal edges) to OBJ.")
- export_obj.add_argument("--land-msh", required=True, help="Path to Land.msh")
- export_obj.add_argument("--land-map", help="Path to Land.map (optional)")
- export_obj.add_argument("--output", required=True, help="Output .obj path")
- export_obj.add_argument("--max-faces", type=int, default=0, help="Face limit (0 = all)")
- export_obj.add_argument(
- "--include-areals",
- action="store_true",
- help="Export areal polygons as OBJ polyline object",
- )
- export_obj.set_defaults(func=cmd_export_obj)
-
- turn = sub.add_parser("render-turntable", help="Render turntable frame sequence to PPM.")
- turn.add_argument("--land-msh", required=True, help="Path to Land.msh")
- turn.add_argument("--land-map", help="Path to Land.map (optional)")
- turn.add_argument("--output-dir", required=True, help="Output directory for frames")
- turn.add_argument("--prefix", default="frame", help="Frame filename prefix (default: frame)")
- turn.add_argument("--frames", type=int, default=36, help="Frame count (default: 36)")
- turn.add_argument("--yaw-start", type=float, default=0.0, help="Start yaw in degrees (default: 0)")
- turn.add_argument("--yaw-end", type=float, default=360.0, help="End yaw in degrees (default: 360)")
- turn.add_argument("--pitch", type=float, default=26.0, help="Pitch angle in degrees (default: 26)")
- turn.add_argument("--max-faces", type=int, default=160000, help="Face limit (default: 160000)")
- turn.add_argument("--width", type=int, default=960, help="Image width (default: 960)")
- turn.add_argument("--height", type=int, default=540, help="Image height (default: 540)")
- turn.add_argument("--wireframe", action="store_true", help="Draw terrain wireframe overlay")
- turn.add_argument(
- "--overlay-areals",
- action="store_true",
- help="Draw ArealMap polygon overlay",
- )
- turn.set_defaults(func=cmd_render_turntable)
-
- batch = sub.add_parser("render-batch", help="Render all MAPS/**/Land.msh under root.")
- batch.add_argument(
- "--maps-root",
- default="tmp/gamedata/DATA/MAPS",
- help="Root directory with MAPS subfolders (default: tmp/gamedata/DATA/MAPS)",
- )
- batch.add_argument("--output-dir", required=True, help="Directory for output PPM files")
- batch.add_argument("--max-faces", type=int, default=90000, help="Face limit per map (default: 90000)")
- batch.add_argument("--width", type=int, default=960, help="Image width (default: 960)")
- batch.add_argument("--height", type=int, default=540, help="Image height (default: 540)")
- batch.add_argument("--yaw", type=float, default=38.0, help="Yaw angle in degrees (default: 38)")
- batch.add_argument("--pitch", type=float, default=26.0, help="Pitch angle in degrees (default: 26)")
- batch.add_argument("--wireframe", action="store_true", help="Draw terrain wireframe overlay")
- batch.add_argument(
- "--overlay-areals",
- action="store_true",
- help="Draw ArealMap polygon overlay",
- )
- batch.set_defaults(func=cmd_render_batch)
-
- return parser
-
-
-def main() -> int:
- parser = build_parser()
- args = parser.parse_args()
- return int(args.func(args))
-
-
-if __name__ == "__main__":
- raise SystemExit(main())