From 50c2cf4686b53ebd2b76318223096660e92305a4 Mon Sep 17 00:00:00 2001 From: Valentin Popov Date: Mon, 22 Jun 2026 00:35:19 +0400 Subject: chore: remove Python tooling and resource viewer --- Cargo.toml | 2 +- README.md | 12 +- apps/resource-viewer/Cargo.toml | 11 - apps/resource-viewer/src/main.rs | 518 ----------------- docs/specs/coverage-audit.md | 10 +- docs/specs/fxid.md | 2 +- docs/specs/material.md | 4 +- docs/specs/msh-animation.md | 4 +- docs/specs/msh-core.md | 3 +- docs/specs/msh-notes.md | 2 +- docs/specs/nres.md | 4 +- docs/specs/render.md | 2 +- docs/specs/rsli.md | 5 +- docs/specs/terrain-map-loading.md | 4 +- docs/specs/texture.md | 2 +- testdata/README.md | 5 + testdata/nres/.gitignore | 2 - testdata/rsli/.gitignore | 2 - tools/README.md | 201 ------- tools/archive_roundtrip_validator.py | 944 ------------------------------- tools/fxid_abs100_audit.py | 262 --------- tools/init_testdata.py | 204 ------- tools/msh_doc_validator.py | 1000 --------------------------------- tools/msh_export_obj.py | 357 ------------ tools/msh_preview_renderer.py | 481 ---------------- tools/terrain_map_doc_validator.py | 809 -------------------------- tools/terrain_map_preview_renderer.py | 679 ---------------------- 27 files changed, 22 insertions(+), 5509 deletions(-) delete mode 100644 apps/resource-viewer/Cargo.toml delete mode 100644 apps/resource-viewer/src/main.rs create mode 100644 testdata/README.md delete mode 100644 testdata/nres/.gitignore delete mode 100644 testdata/rsli/.gitignore delete mode 100644 tools/README.md delete mode 100644 tools/archive_roundtrip_validator.py delete mode 100644 tools/fxid_abs100_audit.py delete mode 100644 tools/init_testdata.py delete mode 100644 tools/msh_doc_validator.py delete mode 100644 tools/msh_export_obj.py delete mode 100644 tools/msh_preview_renderer.py delete mode 100644 tools/terrain_map_doc_validator.py delete mode 100644 tools/terrain_map_preview_renderer.py diff --git a/Cargo.toml b/Cargo.toml index e508408..34c501a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [workspace] resolver = "3" -members = ["crates/*", "apps/resource-viewer"] +members = ["crates/*"] [profile.release] codegen-units = 1 diff --git a/README.md b/README.md index 86e525a..92a3b64 100644 --- a/README.md +++ b/README.md @@ -1,13 +1,12 @@ # FParkan -Open source проект с реализацией компонентов игрового движка игры **«Паркан: Железная Стратегия»** и набором [вспомогательных инструментов](tools) для исследования. +Open source проект с реализацией компонентов игрового движка игры **«Паркан: Железная Стратегия»**. ## Описание Проект находится в активной разработке и включает: - библиотеки для работы с форматами игровых архивов; -- инструменты для валидации/подготовки тестовых данных; - спецификации форматов и сопутствующую документацию. ## Установка @@ -19,13 +18,6 @@ Open source проект с реализацией компонентов игр - локально: каталог [`docs/`](docs) - сайт: -## Инструменты - -Вспомогательные инструменты находятся в каталоге [`tools/`](tools). - -- [tools/archive_roundtrip_validator.py](tools/archive_roundtrip_validator.py) — инструмент верификации документации по архивам `NRes`/`RsLi` на реальных файлах (включая `unpack -> repack -> byte-compare`). -- [tools/init_testdata.py](tools/init_testdata.py) — подготовка тестовых данных по сигнатурам с раскладкой по каталогам. - ## Библиотеки - [crates/nres](crates/nres) — библиотека для работы с файлами архивов NRes (чтение, поиск, редактирование, сохранение). @@ -37,8 +29,8 @@ Open source проект с реализацией компонентов игр Для дополнительного тестирования на реальных игровых ресурсах: -- используйте [tools/init_testdata.py](tools/init_testdata.py) для подготовки локального набора; - используйте оригинальную копию игры (диск или [GOG-версия](https://www.gog.com/en/game/parkan_iron_strategy)); +- разместите игровые каталоги в [`testdata/`](testdata); - игровые ресурсы в репозиторий не включаются, так как защищены авторским правом. ## Contributing & Support diff --git a/apps/resource-viewer/Cargo.toml b/apps/resource-viewer/Cargo.toml deleted file mode 100644 index beadefe..0000000 --- a/apps/resource-viewer/Cargo.toml +++ /dev/null @@ -1,11 +0,0 @@ -[package] -name = "resource-viewer" -version = "0.1.0" -edition = "2021" -publish = false - -[dependencies] -iced = "0.14" -rfd = "0.17" -nres = { path = "../../crates/nres" } -rsli = { path = "../../crates/rsli" } diff --git a/apps/resource-viewer/src/main.rs b/apps/resource-viewer/src/main.rs deleted file mode 100644 index 508c407..0000000 --- a/apps/resource-viewer/src/main.rs +++ /dev/null @@ -1,518 +0,0 @@ -use iced::widget::{button, column, container, horizontal_space, row, scrollable, text}; -use iced::{application, Element, Length, Task, Theme}; -use rfd::FileDialog; -use std::collections::BTreeMap; -use std::fmt::Write as _; -use std::fs; -use std::path::{Path, PathBuf}; - -fn main() -> iced::Result { - application("Parkan Resource Viewer", update, view) - .theme(theme) - .run_with(|| (ViewerApp::default(), Task::none())) -} - -fn theme(_state: &ViewerApp) -> Theme { - Theme::Light -} - -#[derive(Debug, Default)] -struct ViewerApp { - document: Option, - status: String, -} - -#[derive(Debug, Clone)] -enum Message { - OpenRequested, - SelectNode(Selection), -} - -fn update(state: &mut ViewerApp, message: Message) -> Task { - match message { - Message::OpenRequested => { - if let Some(path) = pick_archive_file() { - match load_document(&path) { - Ok(document) => { - state.status = - format!("Loaded {} as {}", path.display(), document.format.label()); - state.document = Some(document); - } - Err(err) => { - state.status = err; - } - } - } - } - Message::SelectNode(selection) => { - if let Some(document) = state.document.as_mut() { - document.selected = selection; - } - } - } - - Task::none() -} - -fn view(state: &ViewerApp) -> Element<'_, Message> { - let top_bar = row![ - button("Open archive").on_press(Message::OpenRequested), - text(status_text(state)).size(14) - ] - .spacing(12); - - let content = if let Some(document) = &state.document { - view_document(document) - } else { - container(text("Open an .nres/.rsli/.lib archive to start.").size(16)) - .width(Length::Fill) - .height(Length::Fill) - .center_x(Length::Fill) - .center_y(Length::Fill) - .into() - }; - - container(column![top_bar, content].spacing(12).padding(12)) - .width(Length::Fill) - .height(Length::Fill) - .into() -} - -fn status_text(state: &ViewerApp) -> String { - if state.status.is_empty() { - String::from("Ready") - } else { - state.status.clone() - } -} - -fn view_document(document: &DocumentModel) -> Element<'_, Message> { - let mut tree = column![text("Archive tree").size(18)].spacing(6); - for item in &document.tree_rows { - let indent = horizontal_space().width(Length::Fixed(f32::from(item.depth) * 16.0)); - - let line = row![indent, text(&item.label).size(14)].spacing(6); - if let Some(selection) = item.selection { - let mut node_button = button(line) - .width(Length::Fill) - .on_press(Message::SelectNode(selection)); - - if selection == document.selected { - node_button = node_button.style(button::primary); - } - - tree = tree.push(node_button); - } else { - tree = tree.push(line); - } - } - - let (panel_title, fields) = selected_fields(document); - let mut fields_column = column![text(panel_title).size(18)].spacing(8); - - for field in fields { - fields_column = fields_column.push( - row![ - text(&field.key).size(14).width(Length::Fixed(220.0)), - text(&field.value).size(14).width(Length::Fill) - ] - .spacing(12), - ); - } - - let left = container(scrollable(tree)) - .width(Length::FillPortion(2)) - .height(Length::Fill); - - let right = container(scrollable(fields_column)) - .width(Length::FillPortion(5)) - .height(Length::Fill); - - row![left, right].spacing(12).height(Length::Fill).into() -} - -fn selected_fields(document: &DocumentModel) -> (String, &[FieldRow]) { - match document.selected { - Selection::Archive => ( - format!( - "{} fields ({})", - document.format.label(), - document.path.display() - ), - &document.archive_fields, - ), - Selection::Entry(index) => { - if let Some(entry) = document.entries.get(index) { - (entry.panel_title.clone(), &entry.fields) - } else { - (String::from("Entry"), &[]) - } - } - } -} - -fn pick_archive_file() -> Option { - FileDialog::new() - .set_title("Open Parkan archive") - .pick_file() -} - -fn load_document(path: &Path) -> Result { - let bytes = - fs::read(path).map_err(|err| format!("Failed to read {}: {err}", path.display()))?; - let Some(format) = detect_archive_format(&bytes) else { - return Err(format!( - "{} is not recognized as NRes/RsLi (unsupported magic).", - path.display() - )); - }; - - match format { - ArchiveFormat::Nres => load_nres_document(path), - ArchiveFormat::Rsli => load_rsli_document(path), - } -} - -fn detect_archive_format(bytes: &[u8]) -> Option { - if bytes.len() >= 4 && &bytes[0..4] == b"NRes" { - return Some(ArchiveFormat::Nres); - } - - if bytes.len() >= 2 && &bytes[0..2] == b"NL" { - return Some(ArchiveFormat::Rsli); - } - - None -} - -fn load_nres_document(path: &Path) -> Result { - let archive = nres::Archive::open_path(path) - .map_err(|err| format!("NRes open failed for {}: {err}", path.display()))?; - - let info = archive.info(); - let mut archive_fields = vec![ - FieldRow::new("format", "NRes"), - FieldRow::new("file_size", info.file_size.to_string()), - FieldRow::new("raw_mode", info.raw_mode.to_string()), - ]; - - if let Some(header) = &info.header { - archive_fields.push(FieldRow::new( - "magic", - String::from_utf8_lossy(&header.magic).into_owned(), - )); - archive_fields.push(FieldRow::new("version", format_u32_dec_hex(header.version))); - archive_fields.push(FieldRow::new("entry_count", header.entry_count.to_string())); - archive_fields.push(FieldRow::new( - "total_size", - format!("{} (0x{:08X})", header.total_size, header.total_size), - )); - archive_fields.push(FieldRow::new( - "directory_offset", - header.directory_offset.to_string(), - )); - archive_fields.push(FieldRow::new( - "directory_size", - header.directory_size.to_string(), - )); - } - - let mut entries = Vec::new(); - for entry in archive.entries_inspect() { - let meta = entry.meta; - let mut fields = vec![ - FieldRow::new("id", entry.id.0.to_string()), - FieldRow::new("name", meta.name.clone()), - FieldRow::new("type_id", format_u32_dec_hex(meta.kind)), - FieldRow::new("attr1", format_u32_dec_hex(meta.attr1)), - FieldRow::new("attr2", format_u32_dec_hex(meta.attr2)), - FieldRow::new("attr3", format_u32_dec_hex(meta.attr3)), - FieldRow::new("data_offset", meta.data_offset.to_string()), - FieldRow::new("data_size", meta.data_size.to_string()), - FieldRow::new("sort_index", meta.sort_index.to_string()), - FieldRow::new("name_raw_hex", bytes_as_hex(entry.name_raw)), - FieldRow::new("name_raw_ascii", bytes_as_ascii(entry.name_raw)), - ]; - - fields.push(FieldRow::new("find_key", meta.name.to_ascii_lowercase())); - - entries.push(EntryView { - full_name: meta.name.clone(), - panel_title: format!("NRes entry #{}: {}", entry.id.0, meta.name), - fields, - }); - } - - let tree_rows = build_tree_rows(&entries); - - Ok(DocumentModel { - path: path.to_path_buf(), - format: ArchiveFormat::Nres, - archive_fields, - entries, - tree_rows, - selected: Selection::Archive, - }) -} - -fn load_rsli_document(path: &Path) -> Result { - let library = rsli::Library::open_path(path) - .map_err(|err| format!("RsLi open failed for {}: {err}", path.display()))?; - - let header = library.header(); - let mut archive_fields = vec![ - FieldRow::new("format", "RsLi"), - FieldRow::new("magic", String::from_utf8_lossy(&header.magic).into_owned()), - FieldRow::new( - "reserved", - format!("{} (0x{:02X})", header.reserved, header.reserved), - ), - FieldRow::new( - "version", - format!("{} (0x{:02X})", header.version, header.version), - ), - FieldRow::new("entry_count", header.entry_count.to_string()), - FieldRow::new("presorted_flag", format!("0x{:04X}", header.presorted_flag)), - FieldRow::new("xor_seed", format!("0x{:08X}", header.xor_seed)), - FieldRow::new("header_raw_hex", bytes_as_hex(&header.raw)), - ]; - - if let Some(ao) = library.ao_trailer() { - archive_fields.push(FieldRow::new("ao_trailer", "present")); - archive_fields.push(FieldRow::new("ao_overlay", ao.overlay.to_string())); - archive_fields.push(FieldRow::new("ao_raw_hex", bytes_as_hex(&ao.raw))); - } else { - archive_fields.push(FieldRow::new("ao_trailer", "absent")); - } - - let mut entries = Vec::new(); - for entry in library.entries_inspect() { - let meta = entry.meta; - let method_raw = (meta.flags as u16 as u32) & 0x1E0; - - let fields = vec![ - FieldRow::new("id", entry.id.0.to_string()), - FieldRow::new("name", meta.name.clone()), - FieldRow::new( - "flags", - format!("{} (0x{:04X})", meta.flags, meta.flags as u16), - ), - FieldRow::new("method", format!("{:?}", meta.method)), - FieldRow::new("method_raw", format!("0x{:03X}", method_raw)), - FieldRow::new("packed_size", meta.packed_size.to_string()), - FieldRow::new("unpacked_size", meta.unpacked_size.to_string()), - FieldRow::new("data_offset_effective", meta.data_offset.to_string()), - FieldRow::new("data_offset_raw", entry.data_offset_raw.to_string()), - FieldRow::new("sort_to_original", entry.sort_to_original.to_string()), - FieldRow::new("name_raw_hex", bytes_as_hex(entry.name_raw)), - FieldRow::new("name_raw_ascii", bytes_as_ascii(entry.name_raw)), - FieldRow::new("service_tail_hex", bytes_as_hex(entry.service_tail)), - FieldRow::new("service_tail_ascii", bytes_as_ascii(entry.service_tail)), - ]; - - entries.push(EntryView { - full_name: meta.name.clone(), - panel_title: format!("RsLi entry #{}: {}", entry.id.0, meta.name), - fields, - }); - } - - let tree_rows = build_tree_rows(&entries); - - Ok(DocumentModel { - path: path.to_path_buf(), - format: ArchiveFormat::Rsli, - archive_fields, - entries, - tree_rows, - selected: Selection::Archive, - }) -} - -fn build_tree_rows(entries: &[EntryView]) -> Vec { - let mut root = FolderNode::default(); - for (index, entry) in entries.iter().enumerate() { - insert_tree_path(&mut root, &entry.full_name, index); - } - - let mut rows = vec![TreeRow { - depth: 0, - label: String::from("[Archive fields]"), - selection: Some(Selection::Archive), - }]; - - flatten_tree(&root, 0, &mut rows); - rows -} - -fn insert_tree_path(root: &mut FolderNode, full_name: &str, entry_index: usize) { - let mut parts: Vec<&str> = full_name - .split(['/', '\\']) - .filter(|part| !part.is_empty()) - .collect(); - - if parts.is_empty() { - parts.push(full_name); - } - - if parts.len() == 1 { - root.files.push((parts[0].to_string(), entry_index)); - return; - } - - let file_name = parts.pop().unwrap_or(full_name); - let mut node = root; - for part in parts { - node = node.folders.entry(part.to_string()).or_default(); - } - - node.files.push((file_name.to_string(), entry_index)); -} - -fn flatten_tree(node: &FolderNode, depth: u16, out: &mut Vec) { - for (folder_name, folder_node) in &node.folders { - out.push(TreeRow { - depth, - label: format!("{folder_name}/"), - selection: None, - }); - flatten_tree(folder_node, depth.saturating_add(1), out); - } - - let mut files = node.files.clone(); - files.sort_by(|left, right| left.0.cmp(&right.0)); - - for (name, index) in files { - out.push(TreeRow { - depth, - label: name, - selection: Some(Selection::Entry(index)), - }); - } -} - -fn bytes_as_hex(bytes: &[u8]) -> String { - let mut out = String::new(); - for (index, byte) in bytes.iter().enumerate() { - if index > 0 { - out.push(' '); - } - let _ = write!(&mut out, "{byte:02X}"); - } - out -} - -fn bytes_as_ascii(bytes: &[u8]) -> String { - bytes - .iter() - .map(|byte| { - if byte.is_ascii_graphic() || *byte == b' ' { - char::from(*byte) - } else { - '.' - } - }) - .collect() -} - -fn format_u32_dec_hex(value: u32) -> String { - format!("{} (0x{:08X})", value, value) -} - -#[derive(Debug, Clone)] -struct DocumentModel { - path: PathBuf, - format: ArchiveFormat, - archive_fields: Vec, - entries: Vec, - tree_rows: Vec, - selected: Selection, -} - -#[derive(Debug, Clone, Copy)] -enum ArchiveFormat { - Nres, - Rsli, -} - -impl ArchiveFormat { - fn label(self) -> &'static str { - match self { - Self::Nres => "NRes", - Self::Rsli => "RsLi", - } - } -} - -#[derive(Debug, Clone)] -struct EntryView { - full_name: String, - panel_title: String, - fields: Vec, -} - -#[derive(Debug, Clone)] -struct FieldRow { - key: String, - value: String, -} - -impl FieldRow { - fn new(key: impl Into, value: impl Into) -> Self { - Self { - key: key.into(), - value: value.into(), - } - } -} - -#[derive(Debug, Clone)] -struct TreeRow { - depth: u16, - label: String, - selection: Option, -} - -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -enum Selection { - Archive, - Entry(usize), -} - -#[derive(Default, Debug)] -struct FolderNode { - folders: BTreeMap, - files: Vec<(String, usize)>, -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn tree_builds_nested_paths() { - let entries = vec![ - EntryView { - full_name: String::from("textures/ui/hud.texm"), - panel_title: String::new(), - fields: vec![], - }, - EntryView { - full_name: String::from("textures/world/ground.texm"), - panel_title: String::new(), - fields: vec![], - }, - EntryView { - full_name: String::from("root_file.msh"), - panel_title: String::new(), - fields: vec![], - }, - ]; - - let rows = build_tree_rows(&entries); - assert!(rows.iter().any(|row| row.label == "textures/")); - assert!(rows.iter().any(|row| row.label == "ui/")); - assert!(rows.iter().any(|row| row.label == "hud.texm")); - assert!(rows.iter().any(|row| row.label == "root_file.msh")); - } -} diff --git a/docs/specs/coverage-audit.md b/docs/specs/coverage-audit.md index 638f4c1..bee27ee 100644 --- a/docs/specs/coverage-audit.md +++ b/docs/specs/coverage-audit.md @@ -11,9 +11,7 @@ - `RsLi`: `2` архива, roundtrip `2/2` (byte-identical) - подтвержден один совместимый quirk: `sprites.lib`, entry `23`, `deflate EOF+1` -Инструмент: - -- `tools/archive_roundtrip_validator.py` +Проверено legacy-валидатором архивов. ## 2. Проверка рендерных форматов @@ -24,11 +22,7 @@ - `FXID`: `923/923` валидны - `Terrain/Map` (`Land.msh` + `Land.map`): `33/33` без ошибок/предупреждений -Инструменты: - -- `tools/msh_doc_validator.py` -- `tools/fxid_abs100_audit.py` -- `tools/terrain_map_doc_validator.py` +Проверено legacy-валидаторами рендерных форматов. ## 3. Глобальный статус по подсистемам diff --git a/docs/specs/fxid.md b/docs/specs/fxid.md index f723e17..e3a583d 100644 --- a/docs/specs/fxid.md +++ b/docs/specs/fxid.md @@ -184,7 +184,7 @@ struct ResourceRef64 { ## 11. Статус валидации -- Формальные инварианты FXID зафиксированы в `tools/msh_doc_validator.py` и `tools/fxid_abs100_audit.py`. +- Формальные инварианты FXID зафиксированы в спецификациях проекта и проверены legacy-валидаторами. - На полном retail-корпусе `testdata/Parkan - Iron Strategy` проверено `923/923` FXID payload без ошибок. ## 12. Статус покрытия и что осталось до 100% diff --git a/docs/specs/material.md b/docs/specs/material.md index 12c8296..1aa3510 100644 --- a/docs/specs/material.md +++ b/docs/specs/material.md @@ -126,8 +126,8 @@ struct KeyRaw { ## 10. Статус валидации -- Инварианты MAT0 зафиксированы в текущем toolchain проекта (`docs/specs` + `tools`). -- Структурная валидация MAT0 включена в корпусный прогон `tools/msh_doc_validator.py` на полном retail-наборе. +- Инварианты MAT0 зафиксированы в спецификациях проекта. +- Структурная валидация MAT0 проверена legacy-валидатором на полном retail-наборе. ## 11. Статус покрытия и что осталось до 100% diff --git a/docs/specs/msh-animation.md b/docs/specs/msh-animation.md index ec5a256..1c0807a 100644 --- a/docs/specs/msh-animation.md +++ b/docs/specs/msh-animation.md @@ -108,8 +108,8 @@ uint16_t map_words[]; // size/2 элементов ## 6. Статус валидации -- Форматные проверки включены в `tools/msh_doc_validator.py`. -- Корпусная валидация анимационных инвариантов включена в прогон `tools/msh_doc_validator.py` на полном retail-наборе. +- Форматные проверки были покрыты legacy-валидатором. +- Корпусная валидация анимационных инвариантов выполнена на полном retail-наборе. ## 7. Статус покрытия и что осталось до 100% diff --git a/docs/specs/msh-core.md b/docs/specs/msh-core.md index 60a4453..db465e7 100644 --- a/docs/specs/msh-core.md +++ b/docs/specs/msh-core.md @@ -174,7 +174,7 @@ for each node: ## 8. Статус валидации -- Инварианты формата реализованы в `tools/msh_doc_validator.py`. +- Инварианты формата проверены legacy-валидатором. - На полном retail-корпусе `testdata/Parkan - Iron Strategy` проверено `435/435` MSH-моделей без структурных ошибок. ## 9. Статус покрытия и что осталось до 100% @@ -190,4 +190,3 @@ for each node: 1. Полная семантика части opaque-полей (`Slot68` tail, `Batch20` opaque-поля) для authoring без copy-through. 2. Полная формализация редких веток (`Res1.attr3 != 38`) на расширенном корпусе. 3. End-to-end writer для генерации новых игровых MSH с подтвержденным runtime-паритетом. - diff --git a/docs/specs/msh-notes.md b/docs/specs/msh-notes.md index 6e77c4f..5c95eb5 100644 --- a/docs/specs/msh-notes.md +++ b/docs/specs/msh-notes.md @@ -104,7 +104,7 @@ Fallback: 2. Контракт animation sampling (`Res8 + Res19`). 3. Контракт MAT0/WEAR/Texm на уровне чтения и применения в кадре. 4. Формат FXID-контейнера, командный поток и fixed command sizes. -5. Валидация на retail-корпусе через `tools/msh_doc_validator.py` (0 ошибок/предупреждений). +5. Валидация на retail-корпусе legacy-валидатором (0 ошибок/предупреждений). ## 8. Статус покрытия и что осталось до 100% diff --git a/docs/specs/nres.md b/docs/specs/nres.md index bb31823..150b38b 100644 --- a/docs/specs/nres.md +++ b/docs/specs/nres.md @@ -168,9 +168,7 @@ Fail-safe поведение: - roundtrip `unpack -> repack -> byte-compare`: `120/120` совпали побайтно; - критических расхождений формата не обнаружено. -Инструмент: - -- `tools/archive_roundtrip_validator.py` +Проверено legacy-валидатором архивов. ## 11. Статус покрытия и что осталось до 100% diff --git a/docs/specs/render.md b/docs/specs/render.md index ccc941b..f1d098f 100644 --- a/docs/specs/render.md +++ b/docs/specs/render.md @@ -152,7 +152,7 @@ void RenderFrame(Scene* scene, Camera* cam, float dt) { ## 10. Статус валидации - Порядок кадра и подключение `Material.lib / Textures.lib / LightMap.lib` подтверждены текущей runtime-валидацией проекта. -- Детальные инварианты форматов зафиксированы в `tools/msh_doc_validator.py` и `tools/fxid_abs100_audit.py`. +- Детальные инварианты форматов зафиксированы в спецификациях проекта и проверены legacy-валидаторами. ## 11. Статус покрытия и что осталось до 100% diff --git a/docs/specs/rsli.md b/docs/specs/rsli.md index 298cf2a..239b3ff 100644 --- a/docs/specs/rsli.md +++ b/docs/specs/rsli.md @@ -207,10 +207,7 @@ XOR-дешифрование первых `unpacked_size` байт. - roundtrip `unpack -> repack -> byte-compare`: `2/2` совпали побайтно; - подтвержден ровно один `deflate EOF+1` случай (`sprites.lib`, entry `23`). -Инструменты: - -- `tools/archive_roundtrip_validator.py` -- `crates/rsli` tests +Проверено legacy-валидатором архивов и тестами `crates/rsli`. ## 11. Статус покрытия и что осталось до 100% diff --git a/docs/specs/terrain-map-loading.md b/docs/specs/terrain-map-loading.md index 62c1e0a..a511799 100644 --- a/docs/specs/terrain-map-loading.md +++ b/docs/specs/terrain-map-loading.md @@ -273,9 +273,7 @@ for (x=0; x repack -> byte-compare`; -- формирует отчёт о расхождениях со спецификацией. - -Скрипт не изменяет оригинальные файлы игры. Рабочие файлы создаются только в указанном `--workdir` (или во временной папке). - -## Поддерживаемые сигнатуры - -- `NRes` (`4E 52 65 73`) -- `RsLi` в файловом формате библиотеки: `NL 00 01` - -## Основные команды - -Сканирование архива по сигнатурам: - -```bash -python3 tools/archive_roundtrip_validator.py scan --input tmp/gamedata -``` - -Распаковка/упаковка одного NRes: - -```bash -python3 tools/archive_roundtrip_validator.py nres-unpack \ - --archive tmp/gamedata/sounds.lib \ - --output tmp/work/nres_sounds - -python3 tools/archive_roundtrip_validator.py nres-pack \ - --manifest tmp/work/nres_sounds/manifest.json \ - --output tmp/work/sounds.repacked.lib -``` - -Распаковка/упаковка одного RsLi: - -```bash -python3 tools/archive_roundtrip_validator.py rsli-unpack \ - --archive tmp/gamedata/sprites.lib \ - --output tmp/work/rsli_sprites - -python3 tools/archive_roundtrip_validator.py rsli-pack \ - --manifest tmp/work/rsli_sprites/manifest.json \ - --output tmp/work/sprites.repacked.lib -``` - -Полная валидация документации на всём наборе данных: - -```bash -python3 tools/archive_roundtrip_validator.py validate \ - --input tmp/gamedata \ - --workdir tmp/validation_work \ - --report tmp/validation_report.json \ - --fail-on-diff -``` - -## Формат распаковки - -Для каждого архива создаются: - -- `manifest.json` — все поля заголовка, записи, индексы, смещения, контрольные суммы; -- `entries/*.bin` — payload-файлы. - -Имена файлов в `entries` включают индекс записи, поэтому коллизии одинаковых имён внутри архива обрабатываются корректно. - -## `init_testdata.py` - -Скрипт инициализирует тестовые данные по сигнатурам архивов из спецификации: - -- `NRes` (`4E 52 65 73`); -- `RsLi` (`NL 00 01`). - -Что делает утилита: - -- рекурсивно сканирует все файлы в `--input`; -- копирует найденные `NRes` в `--output/nres/`; -- копирует найденные `RsLi` в `--output/rsli/`; -- сохраняет относительный путь исходного файла внутри целевого каталога; -- создаёт целевые каталоги автоматически, если их нет. - -Базовый запуск: - -```bash -python3 tools/init_testdata.py --input tmp/gamedata --output testdata -``` - -Если целевой файл уже существует, скрипт спрашивает подтверждение перезаписи (`yes/no/all/quit`). - -Для перезаписи без вопросов используйте `--force`: - -```bash -python3 tools/init_testdata.py --input tmp/gamedata --output testdata --force -``` - -Проверки надёжности: - -- `--input` должен существовать и быть каталогом; -- если `--output` указывает на существующий файл, скрипт завершится с ошибкой; -- если `--output` расположен внутри `--input`, каталог вывода исключается из сканирования; -- если `stdin` неинтерактивный и требуется перезапись, нужно явно указать `--force`. - -## `msh_doc_validator.py` - -Скрипт валидирует ключевые инварианты из документации `/Users/valentineus/Developer/personal/fparkan/docs/specs/msh.md` на реальных данных. - -Проверяемые группы: - -- модели `*.msh` (вложенные `NRes` в архивах `NRes`); -- текстуры `Texm` (`type_id = 0x6D786554`); -- эффекты `FXID` (`type_id = 0x44495846`). - -Что проверяет для моделей: - -- обязательные ресурсы (`Res1/2/3/6/13`) и известные опциональные (`Res4/5/7/8/10/15/16/18/19`); -- `size/attr1/attr3` и шаги структур по таблицам; -- диапазоны индексов, батчей и ссылок между таблицами; -- разбор `Res10` как `len + bytes + NUL` для каждого узла; -- матрицу слотов в `Res1` (LOD/group) и границы по `Res2/Res7/Res13/Res19`. - -Быстрый запуск: - -```bash -python3 tools/msh_doc_validator.py scan --input testdata/nres -python3 tools/msh_doc_validator.py validate --input testdata/nres --print-limit 20 -``` - -С отчётом в JSON: - -```bash -python3 tools/msh_doc_validator.py validate \ - --input testdata/nres \ - --report tmp/msh_validation_report.json \ - --fail-on-warnings -``` - -## `msh_preview_renderer.py` - -Примитивный программный рендерер моделей `*.msh` без внешних зависимостей. - -- вход: архив `NRes` (например `animals.rlb`) или прямой payload модели; -- выход: изображение `PPM` (`P6`); -- использует `Res3` (позиции), `Res6` (индексы), `Res13` (батчи), `Res1/Res2` (выбор слотов по `lod/group`). - -Показать доступные модели в архиве: - -```bash -python3 tools/msh_preview_renderer.py list-models --archive testdata/nres/animals.rlb -``` - -Сгенерировать тестовый рендер: - -```bash -python3 tools/msh_preview_renderer.py render \ - --archive testdata/nres/animals.rlb \ - --model A_L_01.msh \ - --output tmp/renders/A_L_01.ppm \ - --width 800 \ - --height 600 \ - --lod 0 \ - --group 0 \ - --wireframe -``` - -Ограничения: - -- инструмент предназначен для smoke-теста геометрии, а не для пиксельно-точного рендера движка; -- текстуры/материалы/эффектные проходы не эмулируются. - -## `msh_export_obj.py` - -Экспортирует геометрию `*.msh` в `Wavefront OBJ`, чтобы открыть модель в Blender/MeshLab. - -- вход: `NRes` архив (например `animals.rlb`) или прямой payload модели; -- выбор геометрии: через `Res1` slot matrix (`lod/group`) как в рендерере; -- опция `--all-batches` экспортирует все батчи, игнорируя slot matrix. - -Показать модели в архиве: - -```bash -python3 tools/msh_export_obj.py list-models --archive testdata/nres/animals.rlb -``` - -Экспорт в OBJ: - -```bash -python3 tools/msh_export_obj.py export \ - --archive testdata/nres/animals.rlb \ - --model A_L_01.msh \ - --output tmp/renders/A_L_01.obj \ - --lod 0 \ - --group 0 -``` - -Файл `OBJ` можно открыть напрямую в Blender (`File -> Import -> Wavefront (.obj)`). diff --git a/tools/archive_roundtrip_validator.py b/tools/archive_roundtrip_validator.py deleted file mode 100644 index 073fd9b..0000000 --- a/tools/archive_roundtrip_validator.py +++ /dev/null @@ -1,944 +0,0 @@ -#!/usr/bin/env python3 -""" -Roundtrip tools for NRes and RsLi archives. - -The script can: -1) scan archives by header signature (ignores file extensions), -2) unpack / pack NRes archives, -3) unpack / pack RsLi archives, -4) validate docs assumptions by full roundtrip and byte-to-byte comparison. -""" - -from __future__ import annotations - -import argparse -import hashlib -import json -import re -import shutil -import struct -import tempfile -import zlib -from pathlib import Path -from typing import Any - -MAGIC_NRES = b"NRes" -MAGIC_RSLI = b"NL\x00\x01" - - -class ArchiveFormatError(RuntimeError): - pass - - -def sha256_hex(data: bytes) -> str: - return hashlib.sha256(data).hexdigest() - - -def safe_component(value: str, fallback: str = "item", max_len: int = 80) -> str: - clean = re.sub(r"[^A-Za-z0-9._-]+", "_", value).strip("._-") - if not clean: - clean = fallback - return clean[:max_len] - - -def first_diff(a: bytes, b: bytes) -> tuple[int | None, str | None]: - if a == b: - return None, None - limit = min(len(a), len(b)) - for idx in range(limit): - if a[idx] != b[idx]: - return idx, f"{a[idx]:02x}!={b[idx]:02x}" - return limit, f"len {len(a)}!={len(b)}" - - -def load_json(path: Path) -> dict[str, Any]: - with path.open("r", encoding="utf-8") as handle: - return json.load(handle) - - -def dump_json(path: Path, payload: dict[str, Any]) -> None: - path.parent.mkdir(parents=True, exist_ok=True) - with path.open("w", encoding="utf-8") as handle: - json.dump(payload, handle, indent=2, ensure_ascii=False) - handle.write("\n") - - -def xor_stream(data: bytes, key16: int) -> bytes: - lo = key16 & 0xFF - hi = (key16 >> 8) & 0xFF - out = bytearray(len(data)) - for i, value in enumerate(data): - lo = (hi ^ ((lo << 1) & 0xFF)) & 0xFF - out[i] = value ^ lo - hi = (lo ^ ((hi >> 1) & 0xFF)) & 0xFF - return bytes(out) - - -def lzss_decompress_simple(data: bytes, expected_size: int) -> bytes: - ring = bytearray([0x20] * 0x1000) - ring_pos = 0xFEE - out = bytearray() - in_pos = 0 - control = 0 - bits_left = 0 - - while len(out) < expected_size and in_pos < len(data): - if bits_left == 0: - control = data[in_pos] - in_pos += 1 - bits_left = 8 - - if control & 1: - if in_pos >= len(data): - break - byte = data[in_pos] - in_pos += 1 - out.append(byte) - ring[ring_pos] = byte - ring_pos = (ring_pos + 1) & 0x0FFF - else: - if in_pos + 1 >= len(data): - break - low = data[in_pos] - high = data[in_pos + 1] - in_pos += 2 - # Real files indicate nibble layout opposite to common LZSS variant: - # high nibble extends offset, low nibble stores (length - 3). - offset = low | ((high & 0xF0) << 4) - length = (high & 0x0F) + 3 - for step in range(length): - byte = ring[(offset + step) & 0x0FFF] - out.append(byte) - ring[ring_pos] = byte - ring_pos = (ring_pos + 1) & 0x0FFF - if len(out) >= expected_size: - break - - control >>= 1 - bits_left -= 1 - - if len(out) != expected_size: - raise ArchiveFormatError( - f"LZSS size mismatch: expected {expected_size}, got {len(out)}" - ) - return bytes(out) - - -def decode_rsli_payload( - packed: bytes, method: int, sort_to_original: int, unpacked_size: int -) -> bytes: - key16 = sort_to_original & 0xFFFF - - if method == 0x000: - out = packed - elif method == 0x020: - if len(packed) < unpacked_size: - raise ArchiveFormatError( - f"method 0x20 packed too short: {len(packed)} < {unpacked_size}" - ) - out = xor_stream(packed[:unpacked_size], key16) - elif method == 0x040: - out = lzss_decompress_simple(packed, unpacked_size) - elif method == 0x060: - out = lzss_decompress_simple(xor_stream(packed, key16), unpacked_size) - elif method == 0x100: - try: - out = zlib.decompress(packed, -15) - except zlib.error: - out = zlib.decompress(packed) - else: - raise ArchiveFormatError(f"unsupported RsLi method: 0x{method:03X}") - - if len(out) != unpacked_size: - raise ArchiveFormatError( - f"unpacked_size mismatch: expected {unpacked_size}, got {len(out)}" - ) - return out - - -def detect_archive_type(path: Path) -> str | None: - try: - with path.open("rb") as handle: - magic = handle.read(4) - except OSError: - return None - - if magic == MAGIC_NRES: - return "nres" - if magic == MAGIC_RSLI: - return "rsli" - return None - - -def scan_archives(root: Path) -> list[dict[str, Any]]: - found: list[dict[str, Any]] = [] - for path in sorted(root.rglob("*")): - if not path.is_file(): - continue - archive_type = detect_archive_type(path) - if not archive_type: - continue - found.append( - { - "path": str(path), - "relative_path": str(path.relative_to(root)), - "type": archive_type, - "size": path.stat().st_size, - } - ) - return found - - -def parse_nres(data: bytes, source: str = "") -> dict[str, Any]: - if len(data) < 16: - raise ArchiveFormatError(f"{source}: NRes too short ({len(data)} bytes)") - - magic, version, entry_count, total_size = struct.unpack_from("<4sIII", data, 0) - if magic != MAGIC_NRES: - raise ArchiveFormatError(f"{source}: invalid NRes magic") - - issues: list[str] = [] - if total_size != len(data): - issues.append( - f"header.total_size={total_size} != actual_size={len(data)} (spec 1.2)" - ) - if version != 0x100: - issues.append(f"version=0x{version:08X} != 0x00000100 (spec 1.2)") - - directory_offset = total_size - entry_count * 64 - if directory_offset < 16 or directory_offset > len(data): - raise ArchiveFormatError( - f"{source}: invalid directory offset {directory_offset} for entry_count={entry_count}" - ) - if directory_offset + entry_count * 64 != len(data): - issues.append( - "directory_offset + entry_count*64 != file_size (spec 1.3)" - ) - - entries: list[dict[str, Any]] = [] - for index in range(entry_count): - offset = directory_offset + index * 64 - if offset + 64 > len(data): - raise ArchiveFormatError(f"{source}: truncated directory entry {index}") - - ( - type_id, - attr1, - attr2, - size, - attr3, - name_raw, - data_offset, - sort_index, - ) = struct.unpack_from(" directory_offset: - issues.append( - f"entry {idx}: data range [{data_offset}, {data_offset + size}) out of data area (spec 1.3)" - ) - for i in range(len(data_regions) - 1): - _, start, size = data_regions[i] - _, next_start, _ = data_regions[i + 1] - if start + size > next_start: - issues.append( - f"entry overlap at data_offset={start}, next={next_start}" - ) - padding = data[start + size : next_start] - if any(padding): - issues.append( - f"non-zero padding after data block at offset={start + size} (spec 1.5)" - ) - - return { - "format": "NRes", - "header": { - "magic": "NRes", - "version": version, - "entry_count": entry_count, - "total_size": total_size, - "directory_offset": directory_offset, - }, - "entries": entries, - "issues": issues, - } - - -def build_nres_name_field(entry: dict[str, Any]) -> bytes: - if "name_bytes_hex" in entry: - raw = bytes.fromhex(entry["name_bytes_hex"]) - else: - raw = entry.get("name", "").encode("latin1", errors="replace") - raw = raw[:35] - return raw + b"\x00" * (36 - len(raw)) - - -def unpack_nres_file(archive_path: Path, out_dir: Path, source_root: Path | None = None) -> dict[str, Any]: - data = archive_path.read_bytes() - parsed = parse_nres(data, source=str(archive_path)) - - out_dir.mkdir(parents=True, exist_ok=True) - entries_dir = out_dir / "entries" - entries_dir.mkdir(parents=True, exist_ok=True) - - manifest: dict[str, Any] = { - "format": "NRes", - "source_path": str(archive_path), - "source_relative_path": str(archive_path.relative_to(source_root)) if source_root else str(archive_path), - "header": parsed["header"], - "entries": [], - "issues": parsed["issues"], - "source_sha256": sha256_hex(data), - } - - for entry in parsed["entries"]: - begin = entry["data_offset"] - end = begin + entry["size"] - if begin < 0 or end > len(data): - raise ArchiveFormatError( - f"{archive_path}: entry {entry['index']} data range outside file" - ) - payload = data[begin:end] - base = safe_component(entry["name"], fallback=f"entry_{entry['index']:05d}") - file_name = ( - f"{entry['index']:05d}__{base}" - f"__t{entry['type_id']:08X}_a1{entry['attr1']:08X}_a2{entry['attr2']:08X}.bin" - ) - (entries_dir / file_name).write_bytes(payload) - - manifest_entry = dict(entry) - manifest_entry["data_file"] = f"entries/{file_name}" - manifest_entry["sha256"] = sha256_hex(payload) - manifest["entries"].append(manifest_entry) - - dump_json(out_dir / "manifest.json", manifest) - return manifest - - -def pack_nres_manifest(manifest_path: Path, out_file: Path) -> bytes: - manifest = load_json(manifest_path) - if manifest.get("format") != "NRes": - raise ArchiveFormatError(f"{manifest_path}: not an NRes manifest") - - entries = manifest["entries"] - count = len(entries) - version = int(manifest.get("header", {}).get("version", 0x100)) - - out = bytearray(b"\x00" * 16) - data_offsets: list[int] = [] - data_sizes: list[int] = [] - - for entry in entries: - payload_path = manifest_path.parent / entry["data_file"] - payload = payload_path.read_bytes() - offset = len(out) - out.extend(payload) - padding = (-len(out)) % 8 - if padding: - out.extend(b"\x00" * padding) - data_offsets.append(offset) - data_sizes.append(len(payload)) - - directory_offset = len(out) - expected_sort = sorted( - range(count), - key=lambda idx: bytes.fromhex(entries[idx].get("name_bytes_hex", "")).lower(), - ) - - for index, entry in enumerate(entries): - name_field = build_nres_name_field(entry) - out.extend( - struct.pack( - " dict[str, Any]: - if len(data) < 32: - raise ArchiveFormatError(f"{source}: RsLi too short ({len(data)} bytes)") - if data[:4] != MAGIC_RSLI: - raise ArchiveFormatError(f"{source}: invalid RsLi magic") - - issues: list[str] = [] - reserved_zero = data[2] - version = data[3] - entry_count = struct.unpack_from(" len(data): - raise ArchiveFormatError( - f"{source}: encrypted table out of file bounds ({table_offset}+{table_size}>{len(data)})" - ) - - table_encrypted = data[table_offset : table_offset + table_size] - table_plain = xor_stream(table_encrypted, seed & 0xFFFF) - - trailer: dict[str, Any] = {"present": False} - overlay_offset = 0 - if len(data) >= 6 and data[-6:-4] == b"AO": - overlay_offset = struct.unpack_from(" len(data): - end = effective_offset + packed_size - if method == 0x100 and end == len(data) + 1: - issues.append( - f"entry {index}: deflate packed_size reaches EOF+1 ({end}); " - "observed in game data, likely decoder lookahead byte" - ) - else: - issues.append( - f"entry {index}: packed range [{effective_offset}, {end}) out of file" - ) - - if presorted_flag == 0xABBA: - if sorted(sort_values) != list(range(entry_count)): - issues.append( - "presorted flag is 0xABBA but sort_to_original is not a permutation [0..N-1] (spec 2.2/2.4)" - ) - - return { - "format": "RsLi", - "header_raw_hex": data[:32].hex(), - "header": { - "magic": "NL\\x00\\x01", - "entry_count": entry_count, - "seed": seed, - "presorted_flag": presorted_flag, - }, - "entries": entries, - "issues": issues, - "trailer": trailer, - } - - -def unpack_rsli_file(archive_path: Path, out_dir: Path, source_root: Path | None = None) -> dict[str, Any]: - data = archive_path.read_bytes() - parsed = parse_rsli(data, source=str(archive_path)) - - out_dir.mkdir(parents=True, exist_ok=True) - entries_dir = out_dir / "entries" - entries_dir.mkdir(parents=True, exist_ok=True) - - manifest: dict[str, Any] = { - "format": "RsLi", - "source_path": str(archive_path), - "source_relative_path": str(archive_path.relative_to(source_root)) if source_root else str(archive_path), - "source_size": len(data), - "header_raw_hex": parsed["header_raw_hex"], - "header": parsed["header"], - "entries": [], - "issues": list(parsed["issues"]), - "trailer": parsed["trailer"], - "source_sha256": sha256_hex(data), - } - - for entry in parsed["entries"]: - begin = int(entry["effective_data_offset"]) - end = begin + int(entry["packed_size"]) - packed = data[begin:end] - base = safe_component(entry["name"], fallback=f"entry_{entry['index']:05d}") - packed_name = f"{entry['index']:05d}__{base}__packed.bin" - (entries_dir / packed_name).write_bytes(packed) - - manifest_entry = dict(entry) - manifest_entry["packed_file"] = f"entries/{packed_name}" - manifest_entry["packed_file_size"] = len(packed) - manifest_entry["packed_sha256"] = sha256_hex(packed) - - try: - unpacked = decode_rsli_payload( - packed=packed, - method=int(entry["method"]), - sort_to_original=int(entry["sort_to_original"]), - unpacked_size=int(entry["unpacked_size"]), - ) - unpacked_name = f"{entry['index']:05d}__{base}__unpacked.bin" - (entries_dir / unpacked_name).write_bytes(unpacked) - manifest_entry["unpacked_file"] = f"entries/{unpacked_name}" - manifest_entry["unpacked_sha256"] = sha256_hex(unpacked) - except ArchiveFormatError as exc: - manifest_entry["unpack_error"] = str(exc) - manifest["issues"].append( - f"entry {entry['index']}: cannot decode method 0x{entry['method']:03X}: {exc}" - ) - - manifest["entries"].append(manifest_entry) - - dump_json(out_dir / "manifest.json", manifest) - return manifest - - -def _pack_i16(value: int) -> int: - if not (-32768 <= int(value) <= 32767): - raise ArchiveFormatError(f"int16 overflow: {value}") - return int(value) - - -def pack_rsli_manifest(manifest_path: Path, out_file: Path) -> bytes: - manifest = load_json(manifest_path) - if manifest.get("format") != "RsLi": - raise ArchiveFormatError(f"{manifest_path}: not an RsLi manifest") - - entries = manifest["entries"] - count = len(entries) - - header_raw = bytes.fromhex(manifest["header_raw_hex"]) - if len(header_raw) != 32: - raise ArchiveFormatError(f"{manifest_path}: header_raw_hex must be 32 bytes") - header = bytearray(header_raw) - header[:4] = MAGIC_RSLI - struct.pack_into(" declared_size: - raise ArchiveFormatError( - f"{packed_path}: packed size {len(packed)} > manifest packed_size {declared_size}" - ) - - data_offset = int(entry["data_offset"]) - packed_chunks.append((entry, packed)) - - row = bytearray(32) - name_raw = bytes.fromhex(entry["name_raw_hex"]) - reserved_raw = bytes.fromhex(entry["reserved_raw_hex"]) - if len(name_raw) != 12 or len(reserved_raw) != 4: - raise ArchiveFormatError( - f"entry {entry['index']}: invalid name/reserved raw length" - ) - row[0:12] = name_raw - row[12:16] = reserved_raw - struct.pack_into( - "= pre_trailer_size: - raise ArchiveFormatError( - f"entry {entry['index']}: data write at {pos} beyond output size {pre_trailer_size}" - ) - if occupied[pos] and out[pos] != byte: - raise ArchiveFormatError( - f"entry {entry['index']}: overlapping packed data conflict at offset {pos}" - ) - out[pos] = byte - occupied[pos] = 1 - - out.extend(trailer_raw) - if source_size is not None and len(out) != int(source_size): - raise ArchiveFormatError( - f"packed size {len(out)} != source_size {source_size} from manifest" - ) - - out_file.parent.mkdir(parents=True, exist_ok=True) - out_file.write_bytes(out) - return bytes(out) - - -def cmd_scan(args: argparse.Namespace) -> int: - root = Path(args.input).resolve() - archives = scan_archives(root) - if args.json: - print(json.dumps(archives, ensure_ascii=False, indent=2)) - else: - print(f"Found {len(archives)} archive(s) in {root}") - for item in archives: - print(f"{item['type']:4} {item['size']:10d} {item['relative_path']}") - return 0 - - -def cmd_nres_unpack(args: argparse.Namespace) -> int: - archive_path = Path(args.archive).resolve() - out_dir = Path(args.output).resolve() - manifest = unpack_nres_file(archive_path, out_dir) - print(f"NRes unpacked: {archive_path}") - print(f"Manifest: {out_dir / 'manifest.json'}") - print(f"Entries : {len(manifest['entries'])}") - if manifest["issues"]: - print("Issues:") - for issue in manifest["issues"]: - print(f"- {issue}") - return 0 - - -def cmd_nres_pack(args: argparse.Namespace) -> int: - manifest_path = Path(args.manifest).resolve() - out_file = Path(args.output).resolve() - packed = pack_nres_manifest(manifest_path, out_file) - print(f"NRes packed: {out_file} ({len(packed)} bytes, sha256={sha256_hex(packed)})") - return 0 - - -def cmd_rsli_unpack(args: argparse.Namespace) -> int: - archive_path = Path(args.archive).resolve() - out_dir = Path(args.output).resolve() - manifest = unpack_rsli_file(archive_path, out_dir) - print(f"RsLi unpacked: {archive_path}") - print(f"Manifest: {out_dir / 'manifest.json'}") - print(f"Entries : {len(manifest['entries'])}") - if manifest["issues"]: - print("Issues:") - for issue in manifest["issues"]: - print(f"- {issue}") - return 0 - - -def cmd_rsli_pack(args: argparse.Namespace) -> int: - manifest_path = Path(args.manifest).resolve() - out_file = Path(args.output).resolve() - packed = pack_rsli_manifest(manifest_path, out_file) - print(f"RsLi packed: {out_file} ({len(packed)} bytes, sha256={sha256_hex(packed)})") - return 0 - - -def cmd_validate(args: argparse.Namespace) -> int: - input_root = Path(args.input).resolve() - archives = scan_archives(input_root) - - temp_created = False - if args.workdir: - workdir = Path(args.workdir).resolve() - workdir.mkdir(parents=True, exist_ok=True) - else: - workdir = Path(tempfile.mkdtemp(prefix="nres-rsli-validate-")) - temp_created = True - - report: dict[str, Any] = { - "input_root": str(input_root), - "workdir": str(workdir), - "archives_total": len(archives), - "results": [], - "summary": {}, - } - - failures = 0 - try: - for idx, item in enumerate(archives): - rel = item["relative_path"] - archive_path = input_root / rel - marker = f"{idx:04d}_{safe_component(rel, fallback='archive')}" - unpack_dir = workdir / "unpacked" / marker - repacked_file = workdir / "repacked" / f"{marker}.bin" - try: - if item["type"] == "nres": - manifest = unpack_nres_file(archive_path, unpack_dir, source_root=input_root) - repacked = pack_nres_manifest(unpack_dir / "manifest.json", repacked_file) - elif item["type"] == "rsli": - manifest = unpack_rsli_file(archive_path, unpack_dir, source_root=input_root) - repacked = pack_rsli_manifest(unpack_dir / "manifest.json", repacked_file) - else: - continue - - original = archive_path.read_bytes() - match = original == repacked - diff_offset, diff_desc = first_diff(original, repacked) - issues = list(manifest.get("issues", [])) - result = { - "relative_path": rel, - "type": item["type"], - "size_original": len(original), - "size_repacked": len(repacked), - "sha256_original": sha256_hex(original), - "sha256_repacked": sha256_hex(repacked), - "match": match, - "first_diff_offset": diff_offset, - "first_diff": diff_desc, - "issues": issues, - "entries": len(manifest.get("entries", [])), - "error": None, - } - except Exception as exc: # pylint: disable=broad-except - result = { - "relative_path": rel, - "type": item["type"], - "size_original": item["size"], - "size_repacked": None, - "sha256_original": None, - "sha256_repacked": None, - "match": False, - "first_diff_offset": None, - "first_diff": None, - "issues": [f"processing error: {exc}"], - "entries": None, - "error": str(exc), - } - - report["results"].append(result) - - if not result["match"]: - failures += 1 - if result["issues"] and args.fail_on_issues: - failures += 1 - - matches = sum(1 for row in report["results"] if row["match"]) - mismatches = len(report["results"]) - matches - nres_count = sum(1 for row in report["results"] if row["type"] == "nres") - rsli_count = sum(1 for row in report["results"] if row["type"] == "rsli") - issues_total = sum(len(row["issues"]) for row in report["results"]) - report["summary"] = { - "nres_count": nres_count, - "rsli_count": rsli_count, - "matches": matches, - "mismatches": mismatches, - "issues_total": issues_total, - } - - if args.report: - dump_json(Path(args.report).resolve(), report) - - print(f"Input root : {input_root}") - print(f"Work dir : {workdir}") - print(f"NRes archives : {nres_count}") - print(f"RsLi archives : {rsli_count}") - print(f"Roundtrip match: {matches}/{len(report['results'])}") - print(f"Doc issues : {issues_total}") - - if mismatches: - print("\nMismatches:") - for row in report["results"]: - if row["match"]: - continue - print( - f"- {row['relative_path']} [{row['type']}] " - f"diff@{row['first_diff_offset']}: {row['first_diff']}" - ) - - if issues_total: - print("\nIssues:") - for row in report["results"]: - if not row["issues"]: - continue - print(f"- {row['relative_path']} [{row['type']}]") - for issue in row["issues"]: - print(f" * {issue}") - - finally: - if temp_created or args.cleanup: - shutil.rmtree(workdir, ignore_errors=True) - - if failures > 0: - return 1 - if report["summary"].get("mismatches", 0) > 0 and args.fail_on_diff: - return 1 - return 0 - - -def build_parser() -> argparse.ArgumentParser: - parser = argparse.ArgumentParser( - description="NRes/RsLi tools: scan, unpack, repack, and roundtrip validation." - ) - sub = parser.add_subparsers(dest="command", required=True) - - scan = sub.add_parser("scan", help="Scan files by header signatures.") - scan.add_argument("--input", required=True, help="Root directory to scan.") - scan.add_argument("--json", action="store_true", help="Print JSON output.") - scan.set_defaults(func=cmd_scan) - - nres_unpack = sub.add_parser("nres-unpack", help="Unpack a single NRes archive.") - nres_unpack.add_argument("--archive", required=True, help="Path to NRes file.") - nres_unpack.add_argument("--output", required=True, help="Output directory.") - nres_unpack.set_defaults(func=cmd_nres_unpack) - - nres_pack = sub.add_parser("nres-pack", help="Pack NRes archive from manifest.") - nres_pack.add_argument("--manifest", required=True, help="Path to manifest.json.") - nres_pack.add_argument("--output", required=True, help="Output file path.") - nres_pack.set_defaults(func=cmd_nres_pack) - - rsli_unpack = sub.add_parser("rsli-unpack", help="Unpack a single RsLi archive.") - rsli_unpack.add_argument("--archive", required=True, help="Path to RsLi file.") - rsli_unpack.add_argument("--output", required=True, help="Output directory.") - rsli_unpack.set_defaults(func=cmd_rsli_unpack) - - rsli_pack = sub.add_parser("rsli-pack", help="Pack RsLi archive from manifest.") - rsli_pack.add_argument("--manifest", required=True, help="Path to manifest.json.") - rsli_pack.add_argument("--output", required=True, help="Output file path.") - rsli_pack.set_defaults(func=cmd_rsli_pack) - - validate = sub.add_parser( - "validate", - help="Scan all archives and run unpack->repack->byte-compare validation.", - ) - validate.add_argument("--input", required=True, help="Root with game data files.") - validate.add_argument( - "--workdir", - help="Working directory for temporary unpack/repack files. " - "If omitted, a temporary directory is used and removed automatically.", - ) - validate.add_argument("--report", help="Optional JSON report output path.") - validate.add_argument( - "--fail-on-diff", - action="store_true", - help="Return non-zero exit code if any byte mismatch exists.", - ) - validate.add_argument( - "--fail-on-issues", - action="store_true", - help="Return non-zero exit code if any spec issue was detected.", - ) - validate.add_argument( - "--cleanup", - action="store_true", - help="Remove --workdir after completion.", - ) - validate.set_defaults(func=cmd_validate) - - return parser - - -def main() -> int: - parser = build_parser() - args = parser.parse_args() - return int(args.func(args)) - - -if __name__ == "__main__": - raise SystemExit(main()) diff --git a/tools/fxid_abs100_audit.py b/tools/fxid_abs100_audit.py deleted file mode 100644 index 79f3b92..0000000 --- a/tools/fxid_abs100_audit.py +++ /dev/null @@ -1,262 +0,0 @@ -#!/usr/bin/env python3 -""" -Deterministic audit for FXID "absolute parity" checklist. - -What this script produces: -1) strict parsing stats across all FXID payloads in NRes archives, -2) opcode histogram and rare-branch counters (op6, op1 tail usage), -3) reference vectors for RNG core (sub_10002220 semantics). -""" - -from __future__ import annotations - -import argparse -import json -import struct -from collections import Counter -from pathlib import Path -from typing import Any - -import archive_roundtrip_validator as arv - -TYPE_FXID = 0x44495846 -FX_CMD_SIZE = {1: 224, 2: 148, 3: 200, 4: 204, 5: 112, 6: 4, 7: 208, 8: 248, 9: 208, 10: 208} - - -def _entry_payload(blob: bytes, entry: dict[str, Any]) -> bytes: - start = int(entry["data_offset"]) - end = start + int(entry["size"]) - return blob[start:end] - - -def _cstr32(raw: bytes) -> str: - return raw.split(b"\x00", 1)[0].decode("latin1", errors="replace") - - -def _rng_step_sub_10002220(state32: int) -> tuple[int, int]: - """ - sub_10002220 semantics in 32-bit packed state form: - lo = state[15:0], hi = state[31:16] - new_lo = hi ^ (lo << 1) - new_hi = (hi >> 1) ^ new_lo - return new_hi (u16), update state=(new_hi<<16)|new_lo - """ - lo = state32 & 0xFFFF - hi = (state32 >> 16) & 0xFFFF - new_lo = (hi ^ ((lo << 1) & 0xFFFF)) & 0xFFFF - new_hi = ((hi >> 1) ^ new_lo) & 0xFFFF - return ((new_hi << 16) | new_lo), new_hi - - -def _rng_vectors() -> dict[str, Any]: - seeds = [0x00000000, 0x00000001, 0x12345678, 0x89ABCDEF, 0xFFFFFFFF] - out: list[dict[str, Any]] = [] - for seed in seeds: - state = seed - outputs: list[int] = [] - states: list[int] = [] - for _ in range(16): - state, value = _rng_step_sub_10002220(state) - outputs.append(value) - states.append(state) - out.append( - { - "seed_hex": f"0x{seed:08X}", - "outputs_u16_hex": [f"0x{x:04X}" for x in outputs], - "states_u32_hex": [f"0x{x:08X}" for x in states], - } - ) - return {"generator": "sub_10002220", "vectors": out} - - -def run_audit(root: Path) -> dict[str, Any]: - counters: Counter[str] = Counter() - opcode_hist: Counter[int] = Counter() - issues: list[dict[str, Any]] = [] - op1_tail6_samples: list[dict[str, Any]] = [] - op1_optref_samples: list[dict[str, Any]] = [] - - for item in arv.scan_archives(root): - if item["type"] != "nres": - continue - archive_path = root / item["relative_path"] - counters["archives_total"] += 1 - data = archive_path.read_bytes() - try: - parsed = arv.parse_nres(data, source=str(archive_path)) - except Exception as exc: # pylint: disable=broad-except - issues.append( - { - "severity": "error", - "archive": str(archive_path), - "entry": None, - "message": f"cannot parse NRes: {exc}", - } - ) - continue - - for entry in parsed["entries"]: - if int(entry["type_id"]) != TYPE_FXID: - continue - counters["fxid_total"] += 1 - payload = _entry_payload(data, entry) - entry_name = str(entry["name"]) - - if len(payload) < 60: - issues.append( - { - "severity": "error", - "archive": str(archive_path), - "entry": entry_name, - "message": f"payload too small: {len(payload)}", - } - ) - continue - - cmd_count = struct.unpack_from(" len(payload): - issues.append( - { - "severity": "error", - "archive": str(archive_path), - "entry": entry_name, - "message": f"command {idx}: missing header at offset={ptr}", - } - ) - ok = False - break - - word = struct.unpack_from(" len(payload): - issues.append( - { - "severity": "error", - "archive": str(archive_path), - "entry": entry_name, - "message": f"command {idx}: truncated end={ptr + size}, payload={len(payload)}", - } - ) - ok = False - break - - opcode_hist[opcode] += 1 - if opcode == 6: - counters["op6_commands"] += 1 - if opcode == 1: - tail6 = payload[ptr + 136 : ptr + 160] - if any(tail6): - counters["op1_tail6_nonzero"] += 1 - if len(op1_tail6_samples) < 16: - dwords = list(struct.unpack("<6I", tail6)) - op1_tail6_samples.append( - { - "archive": str(archive_path), - "entry": entry_name, - "cmd_index": idx, - "tail6_u32_hex": [f"0x{x:08X}" for x in dwords], - } - ) - - archive_s = _cstr32(payload[ptr + 160 : ptr + 192]) - name_s = _cstr32(payload[ptr + 192 : ptr + 224]) - if archive_s or name_s: - counters["op1_optref_nonempty"] += 1 - if len(op1_optref_samples) < 16: - op1_optref_samples.append( - { - "archive": str(archive_path), - "entry": entry_name, - "cmd_index": idx, - "opt_archive": archive_s, - "opt_name": name_s, - } - ) - - ptr += size - - if ok and ptr != len(payload): - issues.append( - { - "severity": "error", - "archive": str(archive_path), - "entry": entry_name, - "message": f"tail bytes after command stream: parsed_end={ptr}, payload={len(payload)}", - } - ) - ok = False - - if ok: - counters["fxid_ok"] += 1 - - return { - "input_root": str(root), - "summary": { - "archives_total": counters["archives_total"], - "fxid_total": counters["fxid_total"], - "fxid_ok": counters["fxid_ok"], - "issues_total": len(issues), - "op6_commands": counters["op6_commands"], - "op1_tail6_nonzero": counters["op1_tail6_nonzero"], - "op1_optref_nonempty": counters["op1_optref_nonempty"], - }, - "opcode_histogram": {str(k): opcode_hist[k] for k in sorted(opcode_hist)}, - "op1_tail6_samples": op1_tail6_samples, - "op1_optref_samples": op1_optref_samples, - "rng_reference": _rng_vectors(), - "rng_states_fx_path": [ - {"state": "dword_10023688", "seed_init": "sub_10002660", "used_by": ["sub_10001720", "sub_10001A40"]}, - {"state": "dword_100238C0", "seed_init": "sub_10003A50", "used_by": ["sub_10002BE0"]}, - {"state": "dword_10024110", "seed_init": "sub_10009180", "used_by": ["sub_10008120", "sub_10007D10"]}, - {"state": "dword_10024810", "seed_init": "sub_1000D370", "used_by": ["sub_1000BF30", "sub_1000C1A0"]}, - {"state": "dword_10024A48", "seed_init": "sub_1000F420", "used_by": ["sub_1000EC50"]}, - {"state": "dword_10024C80", "seed_init": "sub_10010370", "used_by": ["sub_1000F6E0"]}, - {"state": "dword_100250F0", "seed_init": "sub_10012C70", "used_by": ["sub_10011230", "sub_100115C0"]}, - ], - "issues": issues, - } - - -def main() -> int: - parser = argparse.ArgumentParser(description="FXID absolute parity audit.") - parser.add_argument("--input", required=True, help="Root directory with game/test archives.") - parser.add_argument("--report", required=True, help="Output JSON report path.") - args = parser.parse_args() - - root = Path(args.input).resolve() - report_path = Path(args.report).resolve() - payload = run_audit(root) - report_path.parent.mkdir(parents=True, exist_ok=True) - report_path.write_text(json.dumps(payload, indent=2, ensure_ascii=False) + "\n", encoding="utf-8") - - summary = payload["summary"] - print(f"Input root : {root}") - print(f"NRes archives : {summary['archives_total']}") - print(f"FXID payloads : {summary['fxid_ok']}/{summary['fxid_total']} valid") - print(f"Issues : {summary['issues_total']}") - print(f"Opcode6 commands : {summary['op6_commands']}") - print(f"Op1 tail6 nonzero : {summary['op1_tail6_nonzero']}") - print(f"Op1 optref non-empty : {summary['op1_optref_nonempty']}") - print(f"Report : {report_path}") - - return 0 - - -if __name__ == "__main__": - raise SystemExit(main()) diff --git a/tools/init_testdata.py b/tools/init_testdata.py deleted file mode 100644 index 4079cdb..0000000 --- a/tools/init_testdata.py +++ /dev/null @@ -1,204 +0,0 @@ -#!/usr/bin/env python3 -""" -Initialize test data folders by archive signatures. - -The script scans all files in --input and copies matching archives into: - --output/nres/ - --output/rsli/ -""" - -from __future__ import annotations - -import argparse -import shutil -import sys -from pathlib import Path - -MAGIC_NRES = b"NRes" -MAGIC_RSLI = b"NL\x00\x01" - - -def is_relative_to(path: Path, base: Path) -> bool: - try: - path.relative_to(base) - except ValueError: - return False - return True - - -def detect_archive_type(path: Path) -> str | None: - try: - with path.open("rb") as handle: - magic = handle.read(4) - except OSError as exc: - print(f"[warn] cannot read {path}: {exc}", file=sys.stderr) - return None - - if magic == MAGIC_NRES: - return "nres" - if magic == MAGIC_RSLI: - return "rsli" - return None - - -def scan_archives(input_root: Path, excluded_root: Path | None) -> list[tuple[Path, str]]: - found: list[tuple[Path, str]] = [] - for path in sorted(input_root.rglob("*")): - if not path.is_file(): - continue - if excluded_root and is_relative_to(path.resolve(), excluded_root): - continue - - archive_type = detect_archive_type(path) - if archive_type: - found.append((path, archive_type)) - return found - - -def confirm_overwrite(path: Path) -> str: - prompt = ( - f"File exists: {path}\n" - "Overwrite? [y]es / [n]o / [a]ll / [q]uit (default: n): " - ) - while True: - try: - answer = input(prompt).strip().lower() - except EOFError: - return "quit" - - if answer in {"", "n", "no"}: - return "no" - if answer in {"y", "yes"}: - return "yes" - if answer in {"a", "all"}: - return "all" - if answer in {"q", "quit"}: - return "quit" - print("Please answer with y, n, a, or q.") - - -def copy_archives( - archives: list[tuple[Path, str]], - input_root: Path, - output_root: Path, - force: bool, -) -> int: - copied = 0 - skipped = 0 - overwritten = 0 - overwrite_all = force - - type_counts = {"nres": 0, "rsli": 0} - for _, archive_type in archives: - type_counts[archive_type] += 1 - - print( - f"Found archives: total={len(archives)}, " - f"nres={type_counts['nres']}, rsli={type_counts['rsli']}" - ) - - for source, archive_type in archives: - rel_path = source.relative_to(input_root) - destination = output_root / archive_type / rel_path - destination.parent.mkdir(parents=True, exist_ok=True) - - if destination.exists(): - if destination.is_dir(): - print( - f"[error] destination is a directory, expected file: {destination}", - file=sys.stderr, - ) - return 2 - - if not overwrite_all: - if not sys.stdin.isatty(): - print( - "[error] destination file exists but stdin is not interactive. " - "Use --force to overwrite without prompts.", - file=sys.stderr, - ) - return 2 - - decision = confirm_overwrite(destination) - if decision == "quit": - print("Aborted by user.") - return 130 - if decision == "no": - skipped += 1 - continue - if decision == "all": - overwrite_all = True - - overwritten += 1 - - try: - shutil.copy2(source, destination) - except OSError as exc: - print(f"[error] failed to copy {source} -> {destination}: {exc}", file=sys.stderr) - return 2 - copied += 1 - - print( - f"Done: copied={copied}, overwritten={overwritten}, skipped={skipped}, " - f"output={output_root}" - ) - return 0 - - -def build_parser() -> argparse.ArgumentParser: - parser = argparse.ArgumentParser( - description="Initialize test data by scanning NRes/RsLi signatures." - ) - parser.add_argument( - "--input", - required=True, - help="Input directory to scan recursively.", - ) - parser.add_argument( - "--output", - required=True, - help="Output root directory (archives go to nres/ and rsli/ subdirs).", - ) - parser.add_argument( - "--force", - action="store_true", - help="Overwrite destination files without confirmation prompts.", - ) - return parser - - -def main() -> int: - args = build_parser().parse_args() - - input_root = Path(args.input) - if not input_root.exists(): - print(f"[error] input directory does not exist: {input_root}", file=sys.stderr) - return 2 - if not input_root.is_dir(): - print(f"[error] input path is not a directory: {input_root}", file=sys.stderr) - return 2 - - output_root = Path(args.output) - if output_root.exists() and not output_root.is_dir(): - print(f"[error] output path exists and is not a directory: {output_root}", file=sys.stderr) - return 2 - - input_resolved = input_root.resolve() - output_resolved = output_root.resolve() - if input_resolved == output_resolved: - print("[error] input and output directories must be different.", file=sys.stderr) - return 2 - - excluded_root: Path | None = None - if is_relative_to(output_resolved, input_resolved): - excluded_root = output_resolved - print(f"Notice: output is inside input, skipping scan under: {excluded_root}") - - archives = scan_archives(input_root, excluded_root) - - output_root.mkdir(parents=True, exist_ok=True) - return copy_archives(archives, input_root, output_root, force=args.force) - - -if __name__ == "__main__": - raise SystemExit(main()) diff --git a/tools/msh_doc_validator.py b/tools/msh_doc_validator.py deleted file mode 100644 index ff096a4..0000000 --- a/tools/msh_doc_validator.py +++ /dev/null @@ -1,1000 +0,0 @@ -#!/usr/bin/env python3 -""" -Validate assumptions from docs/specs/msh.md on real game archives. - -The tool checks three groups: -1) MSH model payloads (nested NRes in *.msh entries), -2) Texm texture payloads, -3) FXID effect payloads. -""" - -from __future__ import annotations - -import argparse -import json -import math -import struct -from collections import Counter -from pathlib import Path -from typing import Any - -import archive_roundtrip_validator as arv - -MAGIC_NRES = b"NRes" -MAGIC_PAGE = b"Page" - -TYPE_FXID = 0x44495846 -TYPE_TEXM = 0x6D786554 - -FX_CMD_SIZE = {1: 224, 2: 148, 3: 200, 4: 204, 5: 112, 6: 4, 7: 208, 8: 248, 9: 208, 10: 208} -TEXM_KNOWN_FORMATS = {0, 565, 556, 4444, 888, 8888} - - -def _add_issue( - issues: list[dict[str, Any]], - severity: str, - category: str, - archive: Path, - entry_name: str | None, - message: str, -) -> None: - issues.append( - { - "severity": severity, - "category": category, - "archive": str(archive), - "entry": entry_name, - "message": message, - } - ) - - -def _entry_payload(blob: bytes, entry: dict[str, Any]) -> bytes: - start = int(entry["data_offset"]) - end = start + int(entry["size"]) - return blob[start:end] - - -def _entry_by_type(entries: list[dict[str, Any]]) -> dict[int, list[dict[str, Any]]]: - by_type: dict[int, list[dict[str, Any]]] = {} - for item in entries: - by_type.setdefault(int(item["type_id"]), []).append(item) - return by_type - - -def _expect_single_resource( - by_type: dict[int, list[dict[str, Any]]], - type_id: int, - label: str, - issues: list[dict[str, Any]], - archive: Path, - model_name: str, - required: bool, -) -> dict[str, Any] | None: - rows = by_type.get(type_id, []) - if not rows: - if required: - _add_issue( - issues, - "error", - "model-resource", - archive, - model_name, - f"missing required resource type={type_id} ({label})", - ) - return None - if len(rows) > 1: - _add_issue( - issues, - "warning", - "model-resource", - archive, - model_name, - f"multiple resources type={type_id} ({label}); using first entry", - ) - return rows[0] - - -def _check_fixed_stride( - *, - entry: dict[str, Any], - stride: int, - label: str, - issues: list[dict[str, Any]], - archive: Path, - model_name: str, - enforce_attr3: bool = True, - enforce_attr2_zero: bool = True, -) -> int: - size = int(entry["size"]) - attr1 = int(entry["attr1"]) - attr2 = int(entry["attr2"]) - attr3 = int(entry["attr3"]) - - count = -1 - if size % stride != 0: - _add_issue( - issues, - "error", - "model-stride", - archive, - model_name, - f"{label}: size={size} is not divisible by stride={stride}", - ) - else: - count = size // stride - if attr1 != count: - _add_issue( - issues, - "error", - "model-attr", - archive, - model_name, - f"{label}: attr1={attr1} != size/stride={count}", - ) - if enforce_attr3 and attr3 != stride: - _add_issue( - issues, - "error", - "model-attr", - archive, - model_name, - f"{label}: attr3={attr3} != {stride}", - ) - if enforce_attr2_zero and attr2 != 0: - _add_issue( - issues, - "warning", - "model-attr", - archive, - model_name, - f"{label}: attr2={attr2} (expected 0 in known assets)", - ) - return count - - -def _validate_res10( - data: bytes, - node_count: int, - issues: list[dict[str, Any]], - archive: Path, - model_name: str, -) -> None: - off = 0 - for idx in range(node_count): - if off + 4 > len(data): - _add_issue( - issues, - "error", - "res10", - archive, - model_name, - f"record {idx}: missing u32 length (offset={off}, size={len(data)})", - ) - return - ln = struct.unpack_from(" len(data): - _add_issue( - issues, - "error", - "res10", - archive, - model_name, - f"record {idx}: out of bounds (len={ln}, need={need}, offset={off}, size={len(data)})", - ) - return - if ln and data[off + ln] != 0: - _add_issue( - issues, - "warning", - "res10", - archive, - model_name, - f"record {idx}: missing trailing NUL at payload end", - ) - off += need - - if off != len(data): - _add_issue( - issues, - "error", - "res10", - archive, - model_name, - f"tail bytes after node records: consumed={off}, size={len(data)}", - ) - - -def _validate_model_payload( - model_blob: bytes, - archive: Path, - model_name: str, - issues: list[dict[str, Any]], - counters: Counter[str], -) -> None: - counters["models_total"] += 1 - - if model_blob[:4] != MAGIC_NRES: - _add_issue( - issues, - "error", - "model-container", - archive, - model_name, - "payload is not NRes (missing magic)", - ) - return - - try: - parsed = arv.parse_nres(model_blob, source=f"{archive}:{model_name}") - except Exception as exc: # pylint: disable=broad-except - _add_issue( - issues, - "error", - "model-container", - archive, - model_name, - f"cannot parse nested NRes: {exc}", - ) - return - - for item in parsed.get("issues", []): - _add_issue(issues, "warning", "model-container", archive, model_name, str(item)) - - entries = parsed["entries"] - by_type = _entry_by_type(entries) - - res1 = _expect_single_resource(by_type, 1, "Res1", issues, archive, model_name, True) - res2 = _expect_single_resource(by_type, 2, "Res2", issues, archive, model_name, True) - res3 = _expect_single_resource(by_type, 3, "Res3", issues, archive, model_name, True) - res4 = _expect_single_resource(by_type, 4, "Res4", issues, archive, model_name, False) - res5 = _expect_single_resource(by_type, 5, "Res5", issues, archive, model_name, False) - res6 = _expect_single_resource(by_type, 6, "Res6", issues, archive, model_name, True) - res7 = _expect_single_resource(by_type, 7, "Res7", issues, archive, model_name, False) - res8 = _expect_single_resource(by_type, 8, "Res8", issues, archive, model_name, False) - res10 = _expect_single_resource(by_type, 10, "Res10", issues, archive, model_name, False) - res13 = _expect_single_resource(by_type, 13, "Res13", issues, archive, model_name, True) - res15 = _expect_single_resource(by_type, 15, "Res15", issues, archive, model_name, False) - res16 = _expect_single_resource(by_type, 16, "Res16", issues, archive, model_name, False) - res18 = _expect_single_resource(by_type, 18, "Res18", issues, archive, model_name, False) - res19 = _expect_single_resource(by_type, 19, "Res19", issues, archive, model_name, False) - - if not (res1 and res2 and res3 and res6 and res13): - return - - # Res1 - res1_stride = int(res1["attr3"]) - if res1_stride not in (38, 24): - _add_issue( - issues, - "warning", - "res1", - archive, - model_name, - f"unexpected Res1 stride attr3={res1_stride} (known: 38 or 24)", - ) - if res1_stride <= 0: - _add_issue(issues, "error", "res1", archive, model_name, f"invalid Res1 stride={res1_stride}") - return - if int(res1["size"]) % res1_stride != 0: - _add_issue( - issues, - "error", - "res1", - archive, - model_name, - f"Res1 size={res1['size']} not divisible by stride={res1_stride}", - ) - return - node_count = int(res1["size"]) // res1_stride - if int(res1["attr1"]) != node_count: - _add_issue( - issues, - "error", - "res1", - archive, - model_name, - f"Res1 attr1={res1['attr1']} != node_count={node_count}", - ) - - # Res2 - res2_size = int(res2["size"]) - res2_attr1 = int(res2["attr1"]) - res2_attr2 = int(res2["attr2"]) - res2_attr3 = int(res2["attr3"]) - if res2_size < 0x8C: - _add_issue(issues, "error", "res2", archive, model_name, f"Res2 too small: size={res2_size}") - return - slot_bytes = res2_size - 0x8C - slot_count = -1 - if slot_bytes % 68 != 0: - _add_issue( - issues, - "error", - "res2", - archive, - model_name, - f"Res2 slot area not divisible by 68: slot_bytes={slot_bytes}", - ) - else: - slot_count = slot_bytes // 68 - if res2_attr1 != slot_count: - _add_issue( - issues, - "error", - "res2", - archive, - model_name, - f"Res2 attr1={res2_attr1} != slot_count={slot_count}", - ) - if res2_attr2 != 0: - _add_issue( - issues, - "warning", - "res2", - archive, - model_name, - f"Res2 attr2={res2_attr2} (expected 0 in known assets)", - ) - if res2_attr3 != 68: - _add_issue( - issues, - "error", - "res2", - archive, - model_name, - f"Res2 attr3={res2_attr3} != 68", - ) - - # Fixed-stride resources - vertex_count = _check_fixed_stride( - entry=res3, - stride=12, - label="Res3", - issues=issues, - archive=archive, - model_name=model_name, - ) - _ = _check_fixed_stride( - entry=res4, - stride=4, - label="Res4", - issues=issues, - archive=archive, - model_name=model_name, - ) if res4 else None - _ = _check_fixed_stride( - entry=res5, - stride=4, - label="Res5", - issues=issues, - archive=archive, - model_name=model_name, - ) if res5 else None - index_count = _check_fixed_stride( - entry=res6, - stride=2, - label="Res6", - issues=issues, - archive=archive, - model_name=model_name, - ) - tri_desc_count = _check_fixed_stride( - entry=res7, - stride=16, - label="Res7", - issues=issues, - archive=archive, - model_name=model_name, - ) if res7 else -1 - anim_key_count = _check_fixed_stride( - entry=res8, - stride=24, - label="Res8", - issues=issues, - archive=archive, - model_name=model_name, - enforce_attr3=False, # format stores attr3=4 in data set - ) if res8 else -1 - if res8 and int(res8["attr3"]) != 4: - _add_issue( - issues, - "error", - "res8", - archive, - model_name, - f"Res8 attr3={res8['attr3']} != 4", - ) - if res13: - batch_count = _check_fixed_stride( - entry=res13, - stride=20, - label="Res13", - issues=issues, - archive=archive, - model_name=model_name, - ) - else: - batch_count = -1 - if res15: - _check_fixed_stride( - entry=res15, - stride=8, - label="Res15", - issues=issues, - archive=archive, - model_name=model_name, - ) - if res16: - _check_fixed_stride( - entry=res16, - stride=8, - label="Res16", - issues=issues, - archive=archive, - model_name=model_name, - ) - if res18: - _check_fixed_stride( - entry=res18, - stride=4, - label="Res18", - issues=issues, - archive=archive, - model_name=model_name, - ) - - if res19: - anim_map_count = _check_fixed_stride( - entry=res19, - stride=2, - label="Res19", - issues=issues, - archive=archive, - model_name=model_name, - enforce_attr3=False, - enforce_attr2_zero=False, - ) - if int(res19["attr3"]) != 2: - _add_issue( - issues, - "error", - "res19", - archive, - model_name, - f"Res19 attr3={res19['attr3']} != 2", - ) - else: - anim_map_count = -1 - - # Res10 - if res10: - if int(res10["attr1"]) != int(res1["attr1"]): - _add_issue( - issues, - "error", - "res10", - archive, - model_name, - f"Res10 attr1={res10['attr1']} != Res1.attr1={res1['attr1']}", - ) - if int(res10["attr3"]) != 0: - _add_issue( - issues, - "warning", - "res10", - archive, - model_name, - f"Res10 attr3={res10['attr3']} (known assets use 0)", - ) - _validate_res10(_entry_payload(model_blob, res10), node_count, issues, archive, model_name) - - # Cross-table checks. - if vertex_count > 0 and (res4 and int(res4["size"]) // 4 != vertex_count): - _add_issue(issues, "error", "model-cross", archive, model_name, "Res4 count != Res3 count") - if vertex_count > 0 and (res5 and int(res5["size"]) // 4 != vertex_count): - _add_issue(issues, "error", "model-cross", archive, model_name, "Res5 count != Res3 count") - - indices: list[int] = [] - if index_count > 0: - res6_data = _entry_payload(model_blob, res6) - indices = list(struct.unpack_from(f"<{index_count}H", res6_data, 0)) - - if batch_count > 0: - res13_data = _entry_payload(model_blob, res13) - for batch_idx in range(batch_count): - b_off = batch_idx * 20 - ( - _batch_flags, - _mat_idx, - _unk4, - _unk6, - idx_count, - idx_start, - _unk14, - base_vertex, - ) = struct.unpack_from(" 0 and end > index_count: - _add_issue( - issues, - "error", - "res13", - archive, - model_name, - f"batch {batch_idx}: index range [{idx_start}, {end}) outside Res6 count={index_count}", - ) - continue - if idx_count % 3 != 0: - _add_issue( - issues, - "warning", - "res13", - archive, - model_name, - f"batch {batch_idx}: indexCount={idx_count} is not divisible by 3", - ) - if vertex_count > 0 and index_count > 0 and idx_count > 0: - raw_slice = indices[idx_start:end] - max_raw = max(raw_slice) - if base_vertex + max_raw >= vertex_count: - _add_issue( - issues, - "error", - "res13", - archive, - model_name, - f"batch {batch_idx}: baseVertex+maxIndex={base_vertex + max_raw} >= vertex_count={vertex_count}", - ) - - if slot_count > 0: - res2_data = _entry_payload(model_blob, res2) - for slot_idx in range(slot_count): - s_off = 0x8C + slot_idx * 68 - tri_start, tri_count, batch_start, slot_batch_count = struct.unpack_from("<4H", res2_data, s_off) - if tri_desc_count > 0 and tri_start + tri_count > tri_desc_count: - _add_issue( - issues, - "error", - "res2-slot", - archive, - model_name, - f"slot {slot_idx}: tri range [{tri_start}, {tri_start + tri_count}) outside Res7 count={tri_desc_count}", - ) - if batch_count > 0 and batch_start + slot_batch_count > batch_count: - _add_issue( - issues, - "error", - "res2-slot", - archive, - model_name, - f"slot {slot_idx}: batch range [{batch_start}, {batch_start + slot_batch_count}) outside Res13 count={batch_count}", - ) - # Slot bounds are 10 float values. - for f_idx in range(10): - value = struct.unpack_from(" 0: - res7_data = _entry_payload(model_blob, res7) - for tri_idx in range(tri_desc_count): - t_off = tri_idx * 16 - _flags, l0, l1, l2 = struct.unpack_from("<4H", res7_data, t_off) - for link in (l0, l1, l2): - if link != 0xFFFF and link >= tri_desc_count: - _add_issue( - issues, - "error", - "res7", - archive, - model_name, - f"tri {tri_idx}: link {link} outside tri_desc_count={tri_desc_count}", - ) - _ = struct.unpack_from(" 0 and res19: - res19_data = _entry_payload(model_blob, res19) - map_words = list(struct.unpack_from(f"<{anim_map_count}H", res19_data, 0)) - frame_count = int(res19["attr2"]) if res19 else 0 - - for node_idx in range(node_count): - n_off = node_idx * 38 - hdr2 = struct.unpack_from(" 0 and slot_idx >= slot_count: - _add_issue( - issues, - "error", - "res1-slot", - archive, - model_name, - f"node {node_idx}: slotIndex[{w_idx}]={slot_idx} outside slot_count={slot_count}", - ) - - if anim_key_count > 0 and hdr3 != 0xFFFF and hdr3 >= anim_key_count: - _add_issue( - issues, - "error", - "res1-anim", - archive, - model_name, - f"node {node_idx}: fallbackKeyIndex={hdr3} outside Res8 count={anim_key_count}", - ) - if map_words and hdr2 != 0xFFFF and frame_count > 0: - end = hdr2 + frame_count - if end > len(map_words): - _add_issue( - issues, - "error", - "res19-map", - archive, - model_name, - f"node {node_idx}: map range [{hdr2}, {end}) outside Res19 count={len(map_words)}", - ) - - counters["models_ok"] += 1 - - -def _validate_texm_payload( - payload: bytes, - archive: Path, - entry_name: str, - issues: list[dict[str, Any]], - counters: Counter[str], -) -> None: - counters["texm_total"] += 1 - - if len(payload) < 32: - _add_issue( - issues, - "error", - "texm", - archive, - entry_name, - f"payload too small: {len(payload)}", - ) - return - - magic, width, height, mip_count, flags4, flags5, unk6, fmt = struct.unpack_from("<8I", payload, 0) - if magic != TYPE_TEXM: - _add_issue(issues, "error", "texm", archive, entry_name, f"magic=0x{magic:08X} != Texm") - return - if width == 0 or height == 0: - _add_issue(issues, "error", "texm", archive, entry_name, f"invalid size {width}x{height}") - return - if mip_count == 0: - _add_issue(issues, "error", "texm", archive, entry_name, "mipCount=0") - return - if fmt not in TEXM_KNOWN_FORMATS: - _add_issue( - issues, - "error", - "texm", - archive, - entry_name, - f"unknown format code {fmt}", - ) - return - if flags4 not in (0, 32): - _add_issue( - issues, - "warning", - "texm", - archive, - entry_name, - f"flags4={flags4} (known values: 0 or 32)", - ) - if flags5 not in (0, 0x04000000, 0x00800000): - _add_issue( - issues, - "warning", - "texm", - archive, - entry_name, - f"flags5=0x{flags5:08X} (known values: 0, 0x00800000, 0x04000000)", - ) - - bpp = 1 if fmt == 0 else (2 if fmt in (565, 556, 4444) else 4) - pix_sum = 0 - w = width - h = height - for _ in range(mip_count): - pix_sum += w * h - w = max(1, w >> 1) - h = max(1, h >> 1) - size_core = 32 + (1024 if fmt == 0 else 0) + bpp * pix_sum - if size_core > len(payload): - _add_issue( - issues, - "error", - "texm", - archive, - entry_name, - f"sizeCore={size_core} exceeds payload size={len(payload)}", - ) - return - - tail = len(payload) - size_core - if tail > 0: - off = size_core - if tail < 8: - _add_issue( - issues, - "error", - "texm", - archive, - entry_name, - f"tail too short for Page chunk: tail={tail}", - ) - return - if payload[off : off + 4] != MAGIC_PAGE: - _add_issue( - issues, - "error", - "texm", - archive, - entry_name, - f"tail is present but no Page magic at offset {off}", - ) - return - rect_count = struct.unpack_from(" tail: - _add_issue( - issues, - "error", - "texm", - archive, - entry_name, - f"Page chunk truncated: need={need}, tail={tail}", - ) - return - if need != tail: - _add_issue( - issues, - "error", - "texm", - archive, - entry_name, - f"extra bytes after Page chunk: tail={tail}, pageSize={need}", - ) - return - - _ = unk6 # carried as raw field in spec, semantics intentionally unknown. - counters["texm_ok"] += 1 - - -def _validate_fxid_payload( - payload: bytes, - archive: Path, - entry_name: str, - issues: list[dict[str, Any]], - counters: Counter[str], -) -> None: - counters["fxid_total"] += 1 - - if len(payload) < 60: - _add_issue( - issues, - "error", - "fxid", - archive, - entry_name, - f"payload too small: {len(payload)}", - ) - return - - cmd_count = struct.unpack_from(" len(payload): - _add_issue( - issues, - "error", - "fxid", - archive, - entry_name, - f"command {idx}: missing header at offset={ptr}", - ) - return - word = struct.unpack_from(" len(payload): - _add_issue( - issues, - "error", - "fxid", - archive, - entry_name, - f"command {idx}: truncated, need end={ptr + size}, payload={len(payload)}", - ) - return - ptr += size - - if ptr != len(payload): - _add_issue( - issues, - "error", - "fxid", - archive, - entry_name, - f"tail bytes after command stream: parsed_end={ptr}, payload={len(payload)}", - ) - return - - counters["fxid_ok"] += 1 - - -def _scan_nres_files(root: Path) -> list[Path]: - rows = arv.scan_archives(root) - out: list[Path] = [] - for item in rows: - if item["type"] != "nres": - continue - out.append(root / item["relative_path"]) - return out - - -def run_validation(input_root: Path) -> dict[str, Any]: - archives = _scan_nres_files(input_root) - issues: list[dict[str, Any]] = [] - counters: Counter[str] = Counter() - - for archive_path in archives: - counters["archives_total"] += 1 - data = archive_path.read_bytes() - try: - parsed = arv.parse_nres(data, source=str(archive_path)) - except Exception as exc: # pylint: disable=broad-except - _add_issue(issues, "error", "archive", archive_path, None, f"cannot parse NRes: {exc}") - continue - - for item in parsed.get("issues", []): - _add_issue(issues, "warning", "archive", archive_path, None, str(item)) - - for entry in parsed["entries"]: - name = str(entry["name"]) - payload = _entry_payload(data, entry) - type_id = int(entry["type_id"]) - - if name.lower().endswith(".msh"): - _validate_model_payload(payload, archive_path, name, issues, counters) - - if type_id == TYPE_TEXM: - _validate_texm_payload(payload, archive_path, name, issues, counters) - - if type_id == TYPE_FXID: - _validate_fxid_payload(payload, archive_path, name, issues, counters) - - errors = sum(1 for row in issues if row["severity"] == "error") - warnings = sum(1 for row in issues if row["severity"] == "warning") - - return { - "input_root": str(input_root), - "summary": { - "archives_total": counters["archives_total"], - "models_total": counters["models_total"], - "models_ok": counters["models_ok"], - "texm_total": counters["texm_total"], - "texm_ok": counters["texm_ok"], - "fxid_total": counters["fxid_total"], - "fxid_ok": counters["fxid_ok"], - "errors": errors, - "warnings": warnings, - "issues_total": len(issues), - }, - "issues": issues, - } - - -def cmd_scan(args: argparse.Namespace) -> int: - root = Path(args.input).resolve() - report = run_validation(root) - summary = report["summary"] - print(f"Input root : {root}") - print(f"NRes archives : {summary['archives_total']}") - print(f"MSH models : {summary['models_total']}") - print(f"Texm textures : {summary['texm_total']}") - print(f"FXID effects : {summary['fxid_total']}") - return 0 - - -def cmd_validate(args: argparse.Namespace) -> int: - root = Path(args.input).resolve() - report = run_validation(root) - summary = report["summary"] - - if args.report: - arv.dump_json(Path(args.report).resolve(), report) - - print(f"Input root : {root}") - print(f"NRes archives : {summary['archives_total']}") - print(f"MSH models : {summary['models_ok']}/{summary['models_total']} valid") - print(f"Texm textures : {summary['texm_ok']}/{summary['texm_total']} valid") - print(f"FXID effects : {summary['fxid_ok']}/{summary['fxid_total']} valid") - print(f"Issues : {summary['issues_total']} (errors={summary['errors']}, warnings={summary['warnings']})") - - if report["issues"]: - limit = max(1, int(args.print_limit)) - print("\nSample issues:") - for item in report["issues"][:limit]: - where = item["archive"] - if item["entry"]: - where = f"{where}::{item['entry']}" - print(f"- [{item['severity']}] [{item['category']}] {where}: {item['message']}") - if len(report["issues"]) > limit: - print(f"... and {len(report['issues']) - limit} more issue(s)") - - if summary["errors"] > 0: - return 1 - if args.fail_on_warnings and summary["warnings"] > 0: - return 1 - return 0 - - -def build_parser() -> argparse.ArgumentParser: - parser = argparse.ArgumentParser( - description="Validate docs/specs/msh.md assumptions on real archives." - ) - sub = parser.add_subparsers(dest="command", required=True) - - scan = sub.add_parser("scan", help="Quick scan and counts (models/textures/effects).") - scan.add_argument("--input", required=True, help="Root directory with game/test archives.") - scan.set_defaults(func=cmd_scan) - - validate = sub.add_parser("validate", help="Run full spec validation.") - validate.add_argument("--input", required=True, help="Root directory with game/test archives.") - validate.add_argument("--report", help="Optional JSON report output path.") - validate.add_argument( - "--print-limit", - type=int, - default=50, - help="How many issues to print to stdout (default: 50).", - ) - validate.add_argument( - "--fail-on-warnings", - action="store_true", - help="Return non-zero if warnings are present.", - ) - validate.set_defaults(func=cmd_validate) - - return parser - - -def main() -> int: - parser = build_parser() - args = parser.parse_args() - return int(args.func(args)) - - -if __name__ == "__main__": - raise SystemExit(main()) diff --git a/tools/msh_export_obj.py b/tools/msh_export_obj.py deleted file mode 100644 index 75a9602..0000000 --- a/tools/msh_export_obj.py +++ /dev/null @@ -1,357 +0,0 @@ -#!/usr/bin/env python3 -""" -Export NGI MSH geometry to Wavefront OBJ. - -The exporter is intended for inspection/debugging and uses the same -batch/slot selection logic as msh_preview_renderer.py. -""" - -from __future__ import annotations - -import argparse -import math -import struct -from pathlib import Path -from typing import Any - -import archive_roundtrip_validator as arv - -MAGIC_NRES = b"NRes" - - -def _entry_payload(blob: bytes, entry: dict[str, Any]) -> bytes: - start = int(entry["data_offset"]) - end = start + int(entry["size"]) - return blob[start:end] - - -def _parse_nres(blob: bytes, source: str) -> dict[str, Any]: - if blob[:4] != MAGIC_NRES: - raise RuntimeError(f"{source}: not an NRes payload") - return arv.parse_nres(blob, source=source) - - -def _by_type(entries: list[dict[str, Any]]) -> dict[int, list[dict[str, Any]]]: - out: dict[int, list[dict[str, Any]]] = {} - for row in entries: - out.setdefault(int(row["type_id"]), []).append(row) - return out - - -def _get_single(by_type: dict[int, list[dict[str, Any]]], type_id: int, label: str) -> dict[str, Any]: - rows = by_type.get(type_id, []) - if not rows: - raise RuntimeError(f"missing resource type {type_id} ({label})") - return rows[0] - - -def _pick_model_payload(archive_path: Path, model_name: str | None) -> tuple[bytes, str]: - root_blob = archive_path.read_bytes() - parsed = _parse_nres(root_blob, str(archive_path)) - - msh_entries = [row for row in parsed["entries"] if str(row["name"]).lower().endswith(".msh")] - if msh_entries: - chosen: dict[str, Any] | None = None - if model_name: - model_l = model_name.lower() - for row in msh_entries: - name_l = str(row["name"]).lower() - if name_l == model_l: - chosen = row - break - if chosen is None: - for row in msh_entries: - if str(row["name"]).lower().startswith(model_l): - chosen = row - break - else: - chosen = msh_entries[0] - - if chosen is None: - names = ", ".join(str(row["name"]) for row in msh_entries[:12]) - raise RuntimeError( - f"model '{model_name}' not found in {archive_path}. Available: {names}" - ) - return _entry_payload(root_blob, chosen), str(chosen["name"]) - - by_type = _by_type(parsed["entries"]) - if all(k in by_type for k in (1, 2, 3, 6, 13)): - return root_blob, archive_path.name - - raise RuntimeError( - f"{archive_path} does not contain .msh entries and does not look like a direct model payload" - ) - - -def _extract_geometry( - model_blob: bytes, - *, - lod: int, - group: int, - max_faces: int, - all_batches: bool, -) -> tuple[list[tuple[float, float, float]], list[tuple[int, int, int]], dict[str, int]]: - parsed = _parse_nres(model_blob, "") - by_type = _by_type(parsed["entries"]) - - res1 = _get_single(by_type, 1, "Res1") - res2 = _get_single(by_type, 2, "Res2") - res3 = _get_single(by_type, 3, "Res3") - res6 = _get_single(by_type, 6, "Res6") - res13 = _get_single(by_type, 13, "Res13") - - pos_blob = _entry_payload(model_blob, res3) - if len(pos_blob) % 12 != 0: - raise RuntimeError(f"Res3 size is not divisible by 12: {len(pos_blob)}") - vertex_count = len(pos_blob) // 12 - positions = [struct.unpack_from("<3f", pos_blob, i * 12) for i in range(vertex_count)] - - idx_blob = _entry_payload(model_blob, res6) - if len(idx_blob) % 2 != 0: - raise RuntimeError(f"Res6 size is not divisible by 2: {len(idx_blob)}") - index_count = len(idx_blob) // 2 - indices = list(struct.unpack_from(f"<{index_count}H", idx_blob, 0)) - - batch_blob = _entry_payload(model_blob, res13) - if len(batch_blob) % 20 != 0: - raise RuntimeError(f"Res13 size is not divisible by 20: {len(batch_blob)}") - batch_count = len(batch_blob) // 20 - batches: list[tuple[int, int, int, int]] = [] - for i in range(batch_count): - off = i * 20 - idx_count = struct.unpack_from("= 38 and len(res1_blob) >= node_count * node_stride: - if lod < 0 or lod > 2: - raise RuntimeError(f"lod must be 0..2 (got {lod})") - if group < 0 or group > 4: - raise RuntimeError(f"group must be 0..4 (got {group})") - matrix_index = lod * 5 + group - for n in range(node_count): - off = n * node_stride + 8 + matrix_index * 2 - slot_idx = struct.unpack_from("= slot_count: - continue - node_slot_indices.append(slot_idx) - - faces: list[tuple[int, int, int]] = [] - used_batches = 0 - used_slots = 0 - - def append_batch(batch_idx: int) -> None: - nonlocal used_batches - if batch_idx < 0 or batch_idx >= len(batches): - return - idx_count, idx_start, base_vertex, _ = batches[batch_idx] - if idx_count < 3: - return - end = idx_start + idx_count - if end > len(indices): - return - used_batches += 1 - tri_count = idx_count // 3 - for t in range(tri_count): - i0 = indices[idx_start + t * 3 + 0] + base_vertex - i1 = indices[idx_start + t * 3 + 1] + base_vertex - i2 = indices[idx_start + t * 3 + 2] + base_vertex - if i0 >= vertex_count or i1 >= vertex_count or i2 >= vertex_count: - continue - faces.append((i0, i1, i2)) - if len(faces) >= max_faces: - return - - if node_slot_indices: - for slot_idx in node_slot_indices: - if len(faces) >= max_faces: - break - _tri_start, _tri_count, batch_start, slot_batch_count = slots[slot_idx] - used_slots += 1 - for bi in range(batch_start, batch_start + slot_batch_count): - append_batch(bi) - if len(faces) >= max_faces: - break - else: - for bi in range(batch_count): - append_batch(bi) - if len(faces) >= max_faces: - break - - if not faces: - raise RuntimeError("no faces selected for export") - - meta = { - "vertex_count": vertex_count, - "index_count": index_count, - "batch_count": batch_count, - "slot_count": slot_count, - "node_count": node_count, - "used_slots": used_slots, - "used_batches": used_batches, - "face_count": len(faces), - } - return positions, faces, meta - - -def _compute_vertex_normals( - positions: list[tuple[float, float, float]], - faces: list[tuple[int, int, int]], -) -> list[tuple[float, float, float]]: - acc = [[0.0, 0.0, 0.0] for _ in positions] - for i0, i1, i2 in faces: - p0 = positions[i0] - p1 = positions[i1] - p2 = positions[i2] - ux = p1[0] - p0[0] - uy = p1[1] - p0[1] - uz = p1[2] - p0[2] - vx = p2[0] - p0[0] - vy = p2[1] - p0[1] - vz = p2[2] - p0[2] - nx = uy * vz - uz * vy - ny = uz * vx - ux * vz - nz = ux * vy - uy * vx - acc[i0][0] += nx - acc[i0][1] += ny - acc[i0][2] += nz - acc[i1][0] += nx - acc[i1][1] += ny - acc[i1][2] += nz - acc[i2][0] += nx - acc[i2][1] += ny - acc[i2][2] += nz - - normals: list[tuple[float, float, float]] = [] - for nx, ny, nz in acc: - ln = math.sqrt(nx * nx + ny * ny + nz * nz) - if ln <= 1e-12: - normals.append((0.0, 1.0, 0.0)) - else: - normals.append((nx / ln, ny / ln, nz / ln)) - return normals - - -def _write_obj( - output_path: Path, - object_name: str, - positions: list[tuple[float, float, float]], - faces: list[tuple[int, int, int]], -) -> None: - output_path.parent.mkdir(parents=True, exist_ok=True) - normals = _compute_vertex_normals(positions, faces) - - with output_path.open("w", encoding="utf-8", newline="\n") as out: - out.write("# Exported by msh_export_obj.py\n") - out.write(f"o {object_name}\n") - for x, y, z in positions: - out.write(f"v {x:.9g} {y:.9g} {z:.9g}\n") - for nx, ny, nz in normals: - out.write(f"vn {nx:.9g} {ny:.9g} {nz:.9g}\n") - for i0, i1, i2 in faces: - a = i0 + 1 - b = i1 + 1 - c = i2 + 1 - out.write(f"f {a}//{a} {b}//{b} {c}//{c}\n") - - -def cmd_list_models(args: argparse.Namespace) -> int: - archive_path = Path(args.archive).resolve() - blob = archive_path.read_bytes() - parsed = _parse_nres(blob, str(archive_path)) - rows = [row for row in parsed["entries"] if str(row["name"]).lower().endswith(".msh")] - print(f"Archive: {archive_path}") - print(f"MSH entries: {len(rows)}") - for row in rows: - print(f"- {row['name']}") - return 0 - - -def cmd_export(args: argparse.Namespace) -> int: - archive_path = Path(args.archive).resolve() - output_path = Path(args.output).resolve() - - model_blob, model_label = _pick_model_payload(archive_path, args.model) - positions, faces, meta = _extract_geometry( - model_blob, - lod=int(args.lod), - group=int(args.group), - max_faces=int(args.max_faces), - all_batches=bool(args.all_batches), - ) - obj_name = Path(model_label).stem or "msh_model" - _write_obj(output_path, obj_name, positions, faces) - - print(f"Exported model : {model_label}") - print(f"Output OBJ : {output_path}") - print(f"Object name : {obj_name}") - print( - "Geometry : " - f"vertices={meta['vertex_count']}, faces={meta['face_count']}, " - f"batches={meta['used_batches']}/{meta['batch_count']}, slots={meta['used_slots']}/{meta['slot_count']}" - ) - print( - "Mode : " - f"lod={args.lod}, group={args.group}, all_batches={bool(args.all_batches)}" - ) - return 0 - - -def build_parser() -> argparse.ArgumentParser: - parser = argparse.ArgumentParser( - description="Export NGI MSH geometry to Wavefront OBJ." - ) - sub = parser.add_subparsers(dest="command", required=True) - - list_models = sub.add_parser("list-models", help="List .msh entries in an NRes archive.") - list_models.add_argument("--archive", required=True, help="Path to archive (e.g. animals.rlb).") - list_models.set_defaults(func=cmd_list_models) - - export = sub.add_parser("export", help="Export one model to OBJ.") - export.add_argument("--archive", required=True, help="Path to NRes archive or direct model payload.") - export.add_argument( - "--model", - help="Model entry name (*.msh) inside archive. If omitted, first .msh is used.", - ) - export.add_argument("--output", required=True, help="Output .obj path.") - export.add_argument("--lod", type=int, default=0, help="LOD index 0..2 (default: 0).") - export.add_argument("--group", type=int, default=0, help="Group index 0..4 (default: 0).") - export.add_argument("--max-faces", type=int, default=120000, help="Face limit (default: 120000).") - export.add_argument( - "--all-batches", - action="store_true", - help="Ignore slot matrix selection and export all batches.", - ) - export.set_defaults(func=cmd_export) - - return parser - - -def main() -> int: - parser = build_parser() - args = parser.parse_args() - return int(args.func(args)) - - -if __name__ == "__main__": - raise SystemExit(main()) diff --git a/tools/msh_preview_renderer.py b/tools/msh_preview_renderer.py deleted file mode 100644 index 53b4e63..0000000 --- a/tools/msh_preview_renderer.py +++ /dev/null @@ -1,481 +0,0 @@ -#!/usr/bin/env python3 -""" -Primitive software renderer for NGI MSH models. - -Output format: binary PPM (P6), no external dependencies. -""" - -from __future__ import annotations - -import argparse -import math -import struct -from pathlib import Path -from typing import Any - -import archive_roundtrip_validator as arv - -MAGIC_NRES = b"NRes" - - -def _entry_payload(blob: bytes, entry: dict[str, Any]) -> bytes: - start = int(entry["data_offset"]) - end = start + int(entry["size"]) - return blob[start:end] - - -def _parse_nres(blob: bytes, source: str) -> dict[str, Any]: - if blob[:4] != MAGIC_NRES: - raise RuntimeError(f"{source}: not an NRes payload") - return arv.parse_nres(blob, source=source) - - -def _by_type(entries: list[dict[str, Any]]) -> dict[int, list[dict[str, Any]]]: - out: dict[int, list[dict[str, Any]]] = {} - for row in entries: - out.setdefault(int(row["type_id"]), []).append(row) - return out - - -def _pick_model_payload(archive_path: Path, model_name: str | None) -> tuple[bytes, str]: - root_blob = archive_path.read_bytes() - parsed = _parse_nres(root_blob, str(archive_path)) - - msh_entries = [row for row in parsed["entries"] if str(row["name"]).lower().endswith(".msh")] - if msh_entries: - chosen: dict[str, Any] | None = None - if model_name: - model_l = model_name.lower() - for row in msh_entries: - name_l = str(row["name"]).lower() - if name_l == model_l: - chosen = row - break - if chosen is None: - for row in msh_entries: - if str(row["name"]).lower().startswith(model_l): - chosen = row - break - else: - chosen = msh_entries[0] - - if chosen is None: - names = ", ".join(str(row["name"]) for row in msh_entries[:12]) - raise RuntimeError( - f"model '{model_name}' not found in {archive_path}. Available: {names}" - ) - return _entry_payload(root_blob, chosen), str(chosen["name"]) - - # Fallback: treat file itself as a model NRes payload. - by_type = _by_type(parsed["entries"]) - if all(k in by_type for k in (1, 2, 3, 6, 13)): - return root_blob, archive_path.name - - raise RuntimeError( - f"{archive_path} does not contain .msh entries and does not look like a direct model payload" - ) - - -def _get_single(by_type: dict[int, list[dict[str, Any]]], type_id: int, label: str) -> dict[str, Any]: - rows = by_type.get(type_id, []) - if not rows: - raise RuntimeError(f"missing resource type {type_id} ({label})") - return rows[0] - - -def _extract_geometry( - model_blob: bytes, - *, - lod: int, - group: int, - max_faces: int, -) -> tuple[list[tuple[float, float, float]], list[tuple[int, int, int]], dict[str, int]]: - parsed = _parse_nres(model_blob, "") - by_type = _by_type(parsed["entries"]) - - res1 = _get_single(by_type, 1, "Res1") - res2 = _get_single(by_type, 2, "Res2") - res3 = _get_single(by_type, 3, "Res3") - res6 = _get_single(by_type, 6, "Res6") - res13 = _get_single(by_type, 13, "Res13") - - # Positions - pos_blob = _entry_payload(model_blob, res3) - if len(pos_blob) % 12 != 0: - raise RuntimeError(f"Res3 size is not divisible by 12: {len(pos_blob)}") - vertex_count = len(pos_blob) // 12 - positions = [struct.unpack_from("<3f", pos_blob, i * 12) for i in range(vertex_count)] - - # Indices - idx_blob = _entry_payload(model_blob, res6) - if len(idx_blob) % 2 != 0: - raise RuntimeError(f"Res6 size is not divisible by 2: {len(idx_blob)}") - index_count = len(idx_blob) // 2 - indices = list(struct.unpack_from(f"<{index_count}H", idx_blob, 0)) - - # Batches - batch_blob = _entry_payload(model_blob, res13) - if len(batch_blob) % 20 != 0: - raise RuntimeError(f"Res13 size is not divisible by 20: {len(batch_blob)}") - batch_count = len(batch_blob) // 20 - batches: list[tuple[int, int, int, int]] = [] - for i in range(batch_count): - off = i * 20 - # Keep only fields used by renderer: - # indexCount, indexStart, baseVertex - idx_count = struct.unpack_from("= 38 and len(res1_blob) >= node_count * node_stride: - if lod < 0 or lod > 2: - raise RuntimeError(f"lod must be 0..2 (got {lod})") - if group < 0 or group > 4: - raise RuntimeError(f"group must be 0..4 (got {group})") - matrix_index = lod * 5 + group - for n in range(node_count): - off = n * node_stride + 8 + matrix_index * 2 - slot_idx = struct.unpack_from("= slot_count: - continue - node_slot_indices.append(slot_idx) - - # Build triangle list. - faces: list[tuple[int, int, int]] = [] - used_batches = 0 - used_slots = 0 - - def append_batch(batch_idx: int) -> None: - nonlocal used_batches - if batch_idx < 0 or batch_idx >= len(batches): - return - idx_count, idx_start, base_vertex, _ = batches[batch_idx] - if idx_count < 3: - return - end = idx_start + idx_count - if end > len(indices): - return - used_batches += 1 - tri_count = idx_count // 3 - for t in range(tri_count): - i0 = indices[idx_start + t * 3 + 0] + base_vertex - i1 = indices[idx_start + t * 3 + 1] + base_vertex - i2 = indices[idx_start + t * 3 + 2] + base_vertex - if i0 >= vertex_count or i1 >= vertex_count or i2 >= vertex_count: - continue - faces.append((i0, i1, i2)) - if len(faces) >= max_faces: - return - - if node_slot_indices: - for slot_idx in node_slot_indices: - if len(faces) >= max_faces: - break - _tri_start, _tri_count, batch_start, slot_batch_count = slots[slot_idx] - used_slots += 1 - for bi in range(batch_start, batch_start + slot_batch_count): - append_batch(bi) - if len(faces) >= max_faces: - break - else: - # Fallback if slot matrix is unavailable: draw all batches. - for bi in range(batch_count): - append_batch(bi) - if len(faces) >= max_faces: - break - - meta = { - "vertex_count": vertex_count, - "index_count": index_count, - "batch_count": batch_count, - "slot_count": slot_count, - "node_count": node_count, - "used_slots": used_slots, - "used_batches": used_batches, - "face_count": len(faces), - } - if not faces: - raise RuntimeError("no faces selected for rendering") - return positions, faces, meta - - -def _write_ppm(path: Path, width: int, height: int, rgb: bytearray) -> None: - path.parent.mkdir(parents=True, exist_ok=True) - with path.open("wb") as handle: - handle.write(f"P6\n{width} {height}\n255\n".encode("ascii")) - handle.write(rgb) - - -def _render_software( - positions: list[tuple[float, float, float]], - faces: list[tuple[int, int, int]], - *, - width: int, - height: int, - yaw_deg: float, - pitch_deg: float, - wireframe: bool, -) -> bytearray: - xs = [p[0] for p in positions] - ys = [p[1] for p in positions] - zs = [p[2] for p in positions] - cx = (min(xs) + max(xs)) * 0.5 - cy = (min(ys) + max(ys)) * 0.5 - cz = (min(zs) + max(zs)) * 0.5 - span = max(max(xs) - min(xs), max(ys) - min(ys), max(zs) - min(zs)) - radius = max(span * 0.5, 1e-3) - - yaw = math.radians(yaw_deg) - pitch = math.radians(pitch_deg) - cyaw = math.cos(yaw) - syaw = math.sin(yaw) - cpitch = math.cos(pitch) - spitch = math.sin(pitch) - - camera_dist = radius * 3.2 - scale = min(width, height) * 0.95 - - # Transform all vertices once. - vx: list[float] = [] - vy: list[float] = [] - vz: list[float] = [] - sx: list[float] = [] - sy: list[float] = [] - for x, y, z in positions: - x0 = x - cx - y0 = y - cy - z0 = z - cz - x1 = cyaw * x0 + syaw * z0 - z1 = -syaw * x0 + cyaw * z0 - y2 = cpitch * y0 - spitch * z1 - z2 = spitch * y0 + cpitch * z1 + camera_dist - if z2 < 1e-3: - z2 = 1e-3 - vx.append(x1) - vy.append(y2) - vz.append(z2) - sx.append(width * 0.5 + (x1 / z2) * scale) - sy.append(height * 0.5 - (y2 / z2) * scale) - - rgb = bytearray([16, 18, 24] * (width * height)) - zbuf = [float("inf")] * (width * height) - light_dir = (0.35, 0.45, 1.0) - l_len = math.sqrt(light_dir[0] ** 2 + light_dir[1] ** 2 + light_dir[2] ** 2) - light = (light_dir[0] / l_len, light_dir[1] / l_len, light_dir[2] / l_len) - - def edge(ax: float, ay: float, bx: float, by: float, px: float, py: float) -> float: - return (px - ax) * (by - ay) - (py - ay) * (bx - ax) - - for i0, i1, i2 in faces: - x0 = sx[i0] - y0 = sy[i0] - x1 = sx[i1] - y1 = sy[i1] - x2 = sx[i2] - y2 = sy[i2] - area = edge(x0, y0, x1, y1, x2, y2) - if area == 0.0: - continue - - # Shading from camera-space normal. - ux = vx[i1] - vx[i0] - uy = vy[i1] - vy[i0] - uz = vz[i1] - vz[i0] - wx = vx[i2] - vx[i0] - wy = vy[i2] - vy[i0] - wz = vz[i2] - vz[i0] - nx = uy * wz - uz * wy - ny = uz * wx - ux * wz - nz = ux * wy - uy * wx - n_len = math.sqrt(nx * nx + ny * ny + nz * nz) - if n_len > 0.0: - nx /= n_len - ny /= n_len - nz /= n_len - intensity = nx * light[0] + ny * light[1] + nz * light[2] - if intensity < 0.0: - intensity = 0.0 - shade = int(45 + 200 * intensity) - color = (shade, shade, min(255, shade + 18)) - - minx = int(max(0, math.floor(min(x0, x1, x2)))) - maxx = int(min(width - 1, math.ceil(max(x0, x1, x2)))) - miny = int(max(0, math.floor(min(y0, y1, y2)))) - maxy = int(min(height - 1, math.ceil(max(y0, y1, y2)))) - if minx > maxx or miny > maxy: - continue - - z0 = vz[i0] - z1 = vz[i1] - z2 = vz[i2] - - for py in range(miny, maxy + 1): - fy = py + 0.5 - row = py * width - for px in range(minx, maxx + 1): - fx = px + 0.5 - w0 = edge(x1, y1, x2, y2, fx, fy) - w1 = edge(x2, y2, x0, y0, fx, fy) - w2 = edge(x0, y0, x1, y1, fx, fy) - if area > 0: - if w0 < 0 or w1 < 0 or w2 < 0: - continue - else: - if w0 > 0 or w1 > 0 or w2 > 0: - continue - inv_area = 1.0 / area - bz0 = w0 * inv_area - bz1 = w1 * inv_area - bz2 = w2 * inv_area - depth = bz0 * z0 + bz1 * z1 + bz2 * z2 - idx = row + px - if depth >= zbuf[idx]: - continue - zbuf[idx] = depth - p = idx * 3 - rgb[p + 0] = color[0] - rgb[p + 1] = color[1] - rgb[p + 2] = color[2] - - if wireframe: - def draw_line(xa: float, ya: float, xb: float, yb: float) -> None: - x0i = int(round(xa)) - y0i = int(round(ya)) - x1i = int(round(xb)) - y1i = int(round(yb)) - dx = abs(x1i - x0i) - sx_step = 1 if x0i < x1i else -1 - dy = -abs(y1i - y0i) - sy_step = 1 if y0i < y1i else -1 - err = dx + dy - x = x0i - y = y0i - while True: - if 0 <= x < width and 0 <= y < height: - p = (y * width + x) * 3 - rgb[p + 0] = 240 - rgb[p + 1] = 245 - rgb[p + 2] = 255 - if x == x1i and y == y1i: - break - e2 = 2 * err - if e2 >= dy: - err += dy - x += sx_step - if e2 <= dx: - err += dx - y += sy_step - - for i0, i1, i2 in faces: - draw_line(sx[i0], sy[i0], sx[i1], sy[i1]) - draw_line(sx[i1], sy[i1], sx[i2], sy[i2]) - draw_line(sx[i2], sy[i2], sx[i0], sy[i0]) - - return rgb - - -def cmd_list_models(args: argparse.Namespace) -> int: - archive_path = Path(args.archive).resolve() - blob = archive_path.read_bytes() - parsed = _parse_nres(blob, str(archive_path)) - rows = [row for row in parsed["entries"] if str(row["name"]).lower().endswith(".msh")] - print(f"Archive: {archive_path}") - print(f"MSH entries: {len(rows)}") - for row in rows: - print(f"- {row['name']}") - return 0 - - -def cmd_render(args: argparse.Namespace) -> int: - archive_path = Path(args.archive).resolve() - output_path = Path(args.output).resolve() - - model_blob, model_label = _pick_model_payload(archive_path, args.model) - positions, faces, meta = _extract_geometry( - model_blob, - lod=int(args.lod), - group=int(args.group), - max_faces=int(args.max_faces), - ) - rgb = _render_software( - positions, - faces, - width=int(args.width), - height=int(args.height), - yaw_deg=float(args.yaw), - pitch_deg=float(args.pitch), - wireframe=bool(args.wireframe), - ) - _write_ppm(output_path, int(args.width), int(args.height), rgb) - - print(f"Rendered model: {model_label}") - print(f"Output : {output_path}") - print( - "Geometry : " - f"vertices={meta['vertex_count']}, faces={meta['face_count']}, " - f"batches={meta['used_batches']}/{meta['batch_count']}, slots={meta['used_slots']}/{meta['slot_count']}" - ) - print(f"Mode : lod={args.lod}, group={args.group}, wireframe={bool(args.wireframe)}") - return 0 - - -def build_parser() -> argparse.ArgumentParser: - parser = argparse.ArgumentParser( - description="Primitive NGI MSH renderer (software, dependency-free)." - ) - sub = parser.add_subparsers(dest="command", required=True) - - list_models = sub.add_parser("list-models", help="List .msh entries in an NRes archive.") - list_models.add_argument("--archive", required=True, help="Path to archive (e.g. animals.rlb).") - list_models.set_defaults(func=cmd_list_models) - - render = sub.add_parser("render", help="Render one model to PPM image.") - render.add_argument("--archive", required=True, help="Path to NRes archive or direct model payload.") - render.add_argument( - "--model", - help="Model entry name (*.msh) inside archive. If omitted, first .msh is used.", - ) - render.add_argument("--output", required=True, help="Output .ppm file path.") - render.add_argument("--lod", type=int, default=0, help="LOD index 0..2 (default: 0).") - render.add_argument("--group", type=int, default=0, help="Group index 0..4 (default: 0).") - render.add_argument("--max-faces", type=int, default=120000, help="Face limit (default: 120000).") - render.add_argument("--width", type=int, default=1280, help="Image width (default: 1280).") - render.add_argument("--height", type=int, default=720, help="Image height (default: 720).") - render.add_argument("--yaw", type=float, default=35.0, help="Yaw angle in degrees (default: 35).") - render.add_argument("--pitch", type=float, default=18.0, help="Pitch angle in degrees (default: 18).") - render.add_argument("--wireframe", action="store_true", help="Draw white wireframe overlay.") - render.set_defaults(func=cmd_render) - - return parser - - -def main() -> int: - parser = build_parser() - args = parser.parse_args() - return int(args.func(args)) - - -if __name__ == "__main__": - raise SystemExit(main()) diff --git a/tools/terrain_map_doc_validator.py b/tools/terrain_map_doc_validator.py deleted file mode 100644 index 63c3077..0000000 --- a/tools/terrain_map_doc_validator.py +++ /dev/null @@ -1,809 +0,0 @@ -#!/usr/bin/env python3 -""" -Validate terrain/map documentation assumptions against real game data. - -Targets: -- tmp/gamedata/DATA/MAPS/**/Land.msh -- tmp/gamedata/DATA/MAPS/**/Land.map -""" - -from __future__ import annotations - -import argparse -import json -import math -import struct -from collections import Counter, defaultdict -from dataclasses import dataclass -from pathlib import Path -from typing import Any - -import archive_roundtrip_validator as arv - -MAGIC_NRES = b"NRes" - -REQUIRED_MSH_TYPES = (1, 2, 3, 4, 5, 11, 18, 21) -OPTIONAL_MSH_TYPES = (14,) -EXPECTED_MSH_ORDER = (1, 2, 3, 4, 5, 18, 14, 11, 21) - -MSH_STRIDES = { - 1: 38, - 3: 12, - 4: 4, - 5: 4, - 11: 4, - 14: 4, - 18: 4, - 21: 28, -} - -SLOT_TABLE_OFFSET = 0x8C - - -@dataclass -class ValidationIssue: - severity: str # error | warning - category: str - resource: str - message: str - - -class TerrainMapDocValidator: - def __init__(self) -> None: - self.issues: list[ValidationIssue] = [] - self.stats: dict[str, Any] = { - "maps_total": 0, - "msh_total": 0, - "map_total": 0, - "msh_type_orders": Counter(), - "msh_attr_triplets": defaultdict(Counter), # type_id -> Counter[(a1,a2,a3)] - "msh_type11_header_words": Counter(), - "msh_type21_flags_top": Counter(), - "map_logic_flags": Counter(), - "map_class_ids": Counter(), # record +40 - "map_poly_count": Counter(), - "map_vertex_count_min": None, - "map_vertex_count_max": None, - "map_cell_dims": Counter(), - "map_reserved_u12": Counter(), - "map_reserved_u36": Counter(), - "map_reserved_u44": Counter(), - "map_area_delta_abs_max": 0.0, - "map_area_delta_rel_max": 0.0, - "map_area_rel_gt_05_count": 0, - "map_normal_len_min": None, - "map_normal_len_max": None, - "map_records_total": 0, - } - - def add_issue(self, severity: str, category: str, resource: Path, message: str) -> None: - self.issues.append( - ValidationIssue( - severity=severity, - category=category, - resource=str(resource), - message=message, - ) - ) - - def _entry_payload(self, blob: bytes, entry: dict[str, Any]) -> bytes: - start = int(entry["data_offset"]) - end = start + int(entry["size"]) - return blob[start:end] - - def _entry_by_type(self, entries: list[dict[str, Any]]) -> dict[int, list[dict[str, Any]]]: - by_type: dict[int, list[dict[str, Any]]] = {} - for item in entries: - by_type.setdefault(int(item["type_id"]), []).append(item) - return by_type - - def _expect_single_type( - self, - *, - by_type: dict[int, list[dict[str, Any]]], - type_id: int, - label: str, - resource: Path, - required: bool, - ) -> dict[str, Any] | None: - rows = by_type.get(type_id, []) - if not rows: - if required: - self.add_issue( - "error", - "msh-chunk", - resource, - f"missing required chunk type={type_id} ({label})", - ) - return None - if len(rows) > 1: - self.add_issue( - "warning", - "msh-chunk", - resource, - f"multiple chunks type={type_id} ({label}); using first", - ) - return rows[0] - - def _check_stride( - self, - *, - resource: Path, - entry: dict[str, Any], - stride: int, - label: str, - ) -> int: - size = int(entry["size"]) - attr1 = int(entry["attr1"]) - attr2 = int(entry["attr2"]) - attr3 = int(entry["attr3"]) - self.stats["msh_attr_triplets"][int(entry["type_id"])][(attr1, attr2, attr3)] += 1 - - if size % stride != 0: - self.add_issue( - "error", - "msh-stride", - resource, - f"{label}: size={size} is not divisible by stride={stride}", - ) - return -1 - - count = size // stride - if attr1 != count: - self.add_issue( - "error", - "msh-attr", - resource, - f"{label}: attr1={attr1} != size/stride={count}", - ) - if attr3 != stride: - self.add_issue( - "error", - "msh-attr", - resource, - f"{label}: attr3={attr3} != {stride}", - ) - if attr2 != 0 and int(entry["type_id"]) not in (1,): - # type 1 has non-zero attr2 in real assets, others are expected zero. - self.add_issue( - "warning", - "msh-attr", - resource, - f"{label}: attr2={attr2} (expected 0 for this chunk type)", - ) - return count - - def validate_msh(self, path: Path) -> None: - self.stats["msh_total"] += 1 - blob = path.read_bytes() - if blob[:4] != MAGIC_NRES: - self.add_issue("error", "msh-container", path, "file is not NRes") - return - - try: - parsed = arv.parse_nres(blob, source=str(path)) - except Exception as exc: # pylint: disable=broad-except - self.add_issue("error", "msh-container", path, f"failed to parse NRes: {exc}") - return - - for issue in parsed.get("issues", []): - self.add_issue("warning", "msh-nres", path, issue) - - entries = parsed["entries"] - types_order = tuple(int(item["type_id"]) for item in entries) - self.stats["msh_type_orders"][types_order] += 1 - if types_order != EXPECTED_MSH_ORDER: - self.add_issue( - "warning", - "msh-order", - path, - f"unexpected chunk order {types_order}, expected {EXPECTED_MSH_ORDER}", - ) - - by_type = self._entry_by_type(entries) - - chunks: dict[int, dict[str, Any]] = {} - for type_id in REQUIRED_MSH_TYPES: - chunk = self._expect_single_type( - by_type=by_type, - type_id=type_id, - label=f"type{type_id}", - resource=path, - required=True, - ) - if chunk: - chunks[type_id] = chunk - for type_id in OPTIONAL_MSH_TYPES: - chunk = self._expect_single_type( - by_type=by_type, - type_id=type_id, - label=f"type{type_id}", - resource=path, - required=False, - ) - if chunk: - chunks[type_id] = chunk - - for type_id, stride in MSH_STRIDES.items(): - chunk = chunks.get(type_id) - if not chunk: - continue - self._check_stride(resource=path, entry=chunk, stride=stride, label=f"type{type_id}") - - # type 2 includes 0x8C-byte header + 68-byte slot table entries. - type2 = chunks.get(2) - if type2: - size = int(type2["size"]) - attr1 = int(type2["attr1"]) - attr2 = int(type2["attr2"]) - attr3 = int(type2["attr3"]) - self.stats["msh_attr_triplets"][2][(attr1, attr2, attr3)] += 1 - if attr3 != 68: - self.add_issue( - "error", - "msh-attr", - path, - f"type2: attr3={attr3} != 68", - ) - if attr2 != 0: - self.add_issue( - "warning", - "msh-attr", - path, - f"type2: attr2={attr2} (expected 0)", - ) - if size < SLOT_TABLE_OFFSET: - self.add_issue( - "error", - "msh-size", - path, - f"type2: size={size} < header_size={SLOT_TABLE_OFFSET}", - ) - elif (size - SLOT_TABLE_OFFSET) % 68 != 0: - self.add_issue( - "error", - "msh-size", - path, - f"type2: (size - 0x8C) is not divisible by 68 (size={size})", - ) - else: - slots_by_size = (size - SLOT_TABLE_OFFSET) // 68 - if attr1 != slots_by_size: - self.add_issue( - "error", - "msh-attr", - path, - f"type2: attr1={attr1} != (size-0x8C)/68={slots_by_size}", - ) - - verts = chunks.get(3) - face = chunks.get(21) - slots = chunks.get(2) - nodes = chunks.get(1) - type11 = chunks.get(11) - - if verts and face: - vcount = int(verts["attr1"]) - face_payload = self._entry_payload(blob, face) - fcount = int(face["attr1"]) - if len(face_payload) >= 28: - for idx in range(fcount): - off = idx * 28 - if off + 28 > len(face_payload): - self.add_issue( - "error", - "msh-face", - path, - f"type21 truncated at face {idx}", - ) - break - flags = struct.unpack_from("= vcount: - self.add_issue( - "error", - "msh-face-index", - path, - f"type21[{idx}].{name}={value} out of range vertex_count={vcount}", - ) - n0, n1, n2 = struct.unpack_from("= fcount: - self.add_issue( - "error", - "msh-face-neighbour", - path, - f"type21[{idx}].{name}={value} out of range face_count={fcount}", - ) - - if slots and face: - slot_count = int(slots["attr1"]) - face_count = int(face["attr1"]) - slot_payload = self._entry_payload(blob, slots) - need = SLOT_TABLE_OFFSET + slot_count * 68 - if len(slot_payload) < need: - self.add_issue( - "error", - "msh-slot", - path, - f"type2 payload too short: size={len(slot_payload)}, need_at_least={need}", - ) - else: - if len(slot_payload) != need: - self.add_issue( - "warning", - "msh-slot", - path, - f"type2 payload has trailing bytes: size={len(slot_payload)}, expected={need}", - ) - for idx in range(slot_count): - off = SLOT_TABLE_OFFSET + idx * 68 - tri_start, tri_count = struct.unpack_from(" face_count: - self.add_issue( - "error", - "msh-slot-range", - path, - f"type2 slot[{idx}] range [{tri_start}, {tri_start + tri_count}) exceeds face_count={face_count}", - ) - - if nodes and slots: - node_payload = self._entry_payload(blob, nodes) - slot_count = int(slots["attr1"]) - node_count = int(nodes["attr1"]) - for node_idx in range(node_count): - off = node_idx * 38 - if off + 38 > len(node_payload): - self.add_issue( - "error", - "msh-node", - path, - f"type1 truncated at node {node_idx}", - ) - break - for j in range(19): - slot_id = struct.unpack_from("= slot_count: - self.add_issue( - "error", - "msh-node-slot", - path, - f"type1 node[{node_idx}] slot[{j}]={slot_id} out of range slot_count={slot_count}", - ) - - if type11: - payload = self._entry_payload(blob, type11) - if len(payload) >= 8: - w0, w1 = struct.unpack_from(" None: - if self.stats[key_min] is None or value < self.stats[key_min]: - self.stats[key_min] = value - if self.stats[key_max] is None or value > self.stats[key_max]: - self.stats[key_max] = value - - def validate_map(self, path: Path) -> None: - self.stats["map_total"] += 1 - blob = path.read_bytes() - if blob[:4] != MAGIC_NRES: - self.add_issue("error", "map-container", path, "file is not NRes") - return - - try: - parsed = arv.parse_nres(blob, source=str(path)) - except Exception as exc: # pylint: disable=broad-except - self.add_issue("error", "map-container", path, f"failed to parse NRes: {exc}") - return - - for issue in parsed.get("issues", []): - self.add_issue("warning", "map-nres", path, issue) - - entries = parsed["entries"] - if len(entries) != 1 or int(entries[0]["type_id"]) != 12: - self.add_issue( - "error", - "map-chunk", - path, - f"expected single chunk type=12, got {[int(e['type_id']) for e in entries]}", - ) - return - - entry = entries[0] - areal_count = int(entry["attr1"]) - if areal_count <= 0: - self.add_issue("error", "map-areal", path, f"invalid areal_count={areal_count}") - return - - payload = self._entry_payload(blob, entry) - ptr = 0 - records: list[dict[str, Any]] = [] - - for idx in range(areal_count): - if ptr + 56 > len(payload): - self.add_issue( - "error", - "map-record", - path, - f"truncated areal header at index={idx}, ptr={ptr}, size={len(payload)}", - ) - return - - anchor_x, anchor_y, anchor_z = struct.unpack_from(" 1e-3: - self.add_issue( - "warning", - "map-normal", - path, - f"record[{idx}] normal length={normal_len:.6f} (expected ~1.0)", - ) - - vertices_off = ptr + 56 - vertices_size = 12 * vertex_count - if vertices_off + vertices_size > len(payload): - self.add_issue( - "error", - "map-vertices", - path, - f"record[{idx}] vertices out of bounds", - ) - return - - vertices: list[tuple[float, float, float]] = [] - for i in range(vertex_count): - vertices.append(struct.unpack_from("= 3: - # signed shoelace area in XY. - shoelace = 0.0 - for i in range(vertex_count): - x1, y1, _ = vertices[i] - x2, y2, _ = vertices[(i + 1) % vertex_count] - shoelace += x1 * y2 - x2 * y1 - area_xy = abs(shoelace) * 0.5 - delta = abs(area_xy - area_f) - if delta > self.stats["map_area_delta_abs_max"]: - self.stats["map_area_delta_abs_max"] = delta - rel_delta = delta / max(1.0, area_xy) - if rel_delta > self.stats["map_area_delta_rel_max"]: - self.stats["map_area_delta_rel_max"] = rel_delta - if rel_delta > 0.05: - self.stats["map_area_rel_gt_05_count"] += 1 - - links_off = vertices_off + vertices_size - link_count = vertex_count + 3 * poly_count - links_size = 8 * link_count - if links_off + links_size > len(payload): - self.add_issue( - "error", - "map-links", - path, - f"record[{idx}] link table out of bounds", - ) - return - - edge_links: list[tuple[int, int]] = [] - for i in range(vertex_count): - area_ref, edge_ref = struct.unpack_from(" len(payload): - self.add_issue( - "error", - "map-poly", - path, - f"record[{idx}] poly header truncated at poly_idx={poly_idx}", - ) - return - n = struct.unpack_from(" len(payload): - self.add_issue( - "error", - "map-poly", - path, - f"record[{idx}] poly data out of bounds at poly_idx={poly_idx}", - ) - return - p += poly_size - - records.append( - { - "index": idx, - "anchor": (anchor_x, anchor_y, anchor_z), - "logic": logic_flag, - "class_id": class_id, - "vertex_count": vertex_count, - "poly_count": poly_count, - "edge_links": edge_links, - "poly_links": poly_links, - } - ) - ptr = p - - vertex_counts = [int(item["vertex_count"]) for item in records] - for rec in records: - idx = int(rec["index"]) - for link_idx, (area_ref, edge_ref) in enumerate(rec["edge_links"]): - if area_ref == -1: - if edge_ref != -1: - self.add_issue( - "warning", - "map-link", - path, - f"record[{idx}] edge_link[{link_idx}] has area_ref=-1 but edge_ref={edge_ref}", - ) - continue - if area_ref < 0 or area_ref >= areal_count: - self.add_issue( - "error", - "map-link", - path, - f"record[{idx}] edge_link[{link_idx}] area_ref={area_ref} out of range", - ) - continue - dst_vcount = vertex_counts[area_ref] - if edge_ref < 0 or edge_ref >= dst_vcount: - self.add_issue( - "error", - "map-link", - path, - f"record[{idx}] edge_link[{link_idx}] edge_ref={edge_ref} out of range dst_vertex_count={dst_vcount}", - ) - - for link_idx, (area_ref, edge_ref) in enumerate(rec["poly_links"]): - if area_ref == -1: - if edge_ref != -1: - self.add_issue( - "warning", - "map-poly-link", - path, - f"record[{idx}] poly_link[{link_idx}] has area_ref=-1 but edge_ref={edge_ref}", - ) - continue - if area_ref < 0 or area_ref >= areal_count: - self.add_issue( - "error", - "map-poly-link", - path, - f"record[{idx}] poly_link[{link_idx}] area_ref={area_ref} out of range", - ) - - if ptr + 8 > len(payload): - self.add_issue( - "error", - "map-cells", - path, - f"missing cells header at ptr={ptr}, size={len(payload)}", - ) - return - - cells_x, cells_y = struct.unpack_from(" len(payload): - self.add_issue( - "error", - "map-cells", - path, - f"truncated hitCount at cell ({x},{y})", - ) - return - hit_count = struct.unpack_from(" len(payload): - self.add_issue( - "error", - "map-cells", - path, - f"truncated areaIds at cell ({x},{y}), hitCount={hit_count}", - ) - return - for i in range(hit_count): - area_id = struct.unpack_from("= areal_count: - self.add_issue( - "error", - "map-cells", - path, - f"cell ({x},{y}) has area_id={area_id} out of range areal_count={areal_count}", - ) - ptr += need - - if ptr != len(payload): - self.add_issue( - "error", - "map-size", - path, - f"payload tail mismatch: consumed={ptr}, payload_size={len(payload)}", - ) - - def validate(self, maps_root: Path) -> None: - msh_paths = sorted(maps_root.rglob("Land.msh")) - map_paths = sorted(maps_root.rglob("Land.map")) - - msh_by_dir = {path.parent: path for path in msh_paths} - map_by_dir = {path.parent: path for path in map_paths} - - all_dirs = sorted(set(msh_by_dir) | set(map_by_dir)) - self.stats["maps_total"] = len(all_dirs) - - for folder in all_dirs: - msh_path = msh_by_dir.get(folder) - map_path = map_by_dir.get(folder) - if msh_path is None: - self.add_issue("error", "pairing", folder, "missing Land.msh") - continue - if map_path is None: - self.add_issue("error", "pairing", folder, "missing Land.map") - continue - self.validate_msh(msh_path) - self.validate_map(map_path) - - def build_report(self) -> dict[str, Any]: - errors = [i for i in self.issues if i.severity == "error"] - warnings = [i for i in self.issues if i.severity == "warning"] - - # Convert counters/defaultdicts to JSON-friendly dicts. - msh_orders = { - str(list(order)): count - for order, count in self.stats["msh_type_orders"].most_common() - } - msh_attrs = { - str(type_id): {str(list(k)): v for k, v in counter.most_common()} - for type_id, counter in self.stats["msh_attr_triplets"].items() - } - type11_hdr = { - str(list(key)): value - for key, value in self.stats["msh_type11_header_words"].most_common() - } - type21_flags = { - f"0x{key:08X}": value - for key, value in self.stats["msh_type21_flags_top"].most_common(32) - } - - return { - "summary": { - "maps_total": self.stats["maps_total"], - "msh_total": self.stats["msh_total"], - "map_total": self.stats["map_total"], - "issues_total": len(self.issues), - "errors_total": len(errors), - "warnings_total": len(warnings), - }, - "stats": { - "msh_type_orders": msh_orders, - "msh_attr_triplets": msh_attrs, - "msh_type11_header_words": type11_hdr, - "msh_type21_flags_top": type21_flags, - "map_logic_flags": dict(self.stats["map_logic_flags"]), - "map_class_ids": dict(self.stats["map_class_ids"]), - "map_poly_count": dict(self.stats["map_poly_count"]), - "map_vertex_count_min": self.stats["map_vertex_count_min"], - "map_vertex_count_max": self.stats["map_vertex_count_max"], - "map_cell_dims": {str(list(k)): v for k, v in self.stats["map_cell_dims"].items()}, - "map_reserved_u12": dict(self.stats["map_reserved_u12"]), - "map_reserved_u36": dict(self.stats["map_reserved_u36"]), - "map_reserved_u44": dict(self.stats["map_reserved_u44"]), - "map_area_delta_abs_max": self.stats["map_area_delta_abs_max"], - "map_area_delta_rel_max": self.stats["map_area_delta_rel_max"], - "map_area_rel_gt_05_count": self.stats["map_area_rel_gt_05_count"], - "map_normal_len_min": self.stats["map_normal_len_min"], - "map_normal_len_max": self.stats["map_normal_len_max"], - "map_records_total": self.stats["map_records_total"], - }, - "issues": [ - { - "severity": item.severity, - "category": item.category, - "resource": item.resource, - "message": item.message, - } - for item in self.issues - ], - } - - -def parse_args() -> argparse.Namespace: - parser = argparse.ArgumentParser(description="Validate terrain/map doc assumptions") - parser.add_argument( - "--maps-root", - type=Path, - default=Path("tmp/gamedata/DATA/MAPS"), - help="Root directory containing MAPS/**/Land.msh and Land.map", - ) - parser.add_argument( - "--report-json", - type=Path, - default=None, - help="Optional path to save full JSON report", - ) - parser.add_argument( - "--fail-on-warning", - action="store_true", - help="Return non-zero exit code on warnings too", - ) - return parser.parse_args() - - -def main() -> int: - args = parse_args() - validator = TerrainMapDocValidator() - validator.validate(args.maps_root) - report = validator.build_report() - - print( - json.dumps( - report["summary"], - indent=2, - ensure_ascii=False, - ) - ) - - if args.report_json: - args.report_json.parent.mkdir(parents=True, exist_ok=True) - with args.report_json.open("w", encoding="utf-8") as handle: - json.dump(report, handle, indent=2, ensure_ascii=False) - handle.write("\n") - print(f"report written: {args.report_json}") - - has_errors = report["summary"]["errors_total"] > 0 - has_warnings = report["summary"]["warnings_total"] > 0 - if has_errors: - return 1 - if args.fail_on_warning and has_warnings: - return 1 - return 0 - - -if __name__ == "__main__": - raise SystemExit(main()) diff --git a/tools/terrain_map_preview_renderer.py b/tools/terrain_map_preview_renderer.py deleted file mode 100644 index 86d72d7..0000000 --- a/tools/terrain_map_preview_renderer.py +++ /dev/null @@ -1,679 +0,0 @@ -#!/usr/bin/env python3 -""" -Software 3D renderer for terrain Land.msh + Land.map overlay. - -Output format: binary PPM (P6), dependency-free. -""" - -from __future__ import annotations - -import argparse -import math -import struct -from pathlib import Path -from typing import Any - -import archive_roundtrip_validator as arv - -MAGIC_NRES = b"NRes" - - -def _entry_payload(blob: bytes, entry: dict[str, Any]) -> bytes: - start = int(entry["data_offset"]) - end = start + int(entry["size"]) - return blob[start:end] - - -def _parse_nres(blob: bytes, source: str) -> dict[str, Any]: - if blob[:4] != MAGIC_NRES: - raise RuntimeError(f"{source}: not an NRes payload") - return arv.parse_nres(blob, source=source) - - -def _by_type(entries: list[dict[str, Any]]) -> dict[int, list[dict[str, Any]]]: - out: dict[int, list[dict[str, Any]]] = {} - for row in entries: - out.setdefault(int(row["type_id"]), []).append(row) - return out - - -def _get_single(by_type: dict[int, list[dict[str, Any]]], type_id: int, label: str) -> dict[str, Any]: - rows = by_type.get(type_id, []) - if not rows: - raise RuntimeError(f"missing resource type {type_id} ({label})") - return rows[0] - - -def _downsample_faces( - faces: list[tuple[int, int, int]], - max_faces: int, -) -> list[tuple[int, int, int]]: - if max_faces <= 0 or len(faces) <= max_faces: - return faces - step = len(faces) / max_faces - out: list[tuple[int, int, int]] = [] - pos = 0.0 - while len(out) < max_faces and int(pos) < len(faces): - out.append(faces[int(pos)]) - pos += step - return out - - -def load_terrain_msh( - path: Path, - *, - max_faces: int, -) -> tuple[list[tuple[float, float, float]], list[tuple[int, int, int]], dict[str, int]]: - blob = path.read_bytes() - parsed = _parse_nres(blob, str(path)) - by_type = _by_type(parsed["entries"]) - - res3 = _get_single(by_type, 3, "positions") - res21 = _get_single(by_type, 21, "terrain faces") - - pos_blob = _entry_payload(blob, res3) - if len(pos_blob) % 12 != 0: - raise RuntimeError(f"{path}: type 3 payload size is not divisible by 12") - vertex_count = len(pos_blob) // 12 - positions = [struct.unpack_from("<3f", pos_blob, i * 12) for i in range(vertex_count)] - - face_blob = _entry_payload(blob, res21) - if len(face_blob) % 28 != 0: - raise RuntimeError(f"{path}: type 21 payload size is not divisible by 28") - all_faces: list[tuple[int, int, int]] = [] - raw_face_count = len(face_blob) // 28 - dropped = 0 - for i in range(raw_face_count): - off = i * 28 - i0, i1, i2 = struct.unpack_from("= vertex_count or i1 >= vertex_count or i2 >= vertex_count: - dropped += 1 - continue - all_faces.append((i0, i1, i2)) - - faces = _downsample_faces(all_faces, max_faces) - meta = { - "vertex_count": vertex_count, - "face_count_raw": raw_face_count, - "face_count_valid": len(all_faces), - "face_count_rendered": len(faces), - "face_dropped_invalid": dropped, - } - return positions, faces, meta - - -def load_areal_map(path: Path) -> tuple[list[dict[str, Any]], dict[str, int]]: - blob = path.read_bytes() - parsed = _parse_nres(blob, str(path)) - by_type = _by_type(parsed["entries"]) - chunk = _get_single(by_type, 12, "ArealMapGeometry") - - payload = _entry_payload(blob, chunk) - areal_count = int(chunk["attr1"]) - ptr = 0 - areals: list[dict[str, Any]] = [] - for idx in range(areal_count): - if ptr + 56 > len(payload): - raise RuntimeError(f"{path}: truncated areal header at index={idx}") - class_id = struct.unpack_from(" len(payload): - raise RuntimeError(f"{path}: areal[{idx}] vertices out of bounds") - verts = [struct.unpack_from("<3f", payload, verts_off + 12 * i) for i in range(vertex_count)] - - links_off = verts_off + verts_size - links_size = 8 * (vertex_count + 3 * poly_count) - p = links_off + links_size - for _ in range(poly_count): - if p + 4 > len(payload): - raise RuntimeError(f"{path}: areal[{idx}] poly header out of bounds") - n = struct.unpack_from(" len(payload): - raise RuntimeError(f"{path}: areal[{idx}] poly data out of bounds") - - areals.append( - { - "index": idx, - "class_id": class_id, - "vertices": verts, - } - ) - ptr = p - - if ptr + 8 > len(payload): - raise RuntimeError(f"{path}: missing cells section") - cells_x, cells_y = struct.unpack_from(" len(payload): - raise RuntimeError(f"{path}: cells section truncated") - hit_count = struct.unpack_from(" len(payload): - raise RuntimeError(f"{path}: cells section out of bounds") - if ptr != len(payload): - raise RuntimeError(f"{path}: trailing bytes in chunk12 parse ({len(payload) - ptr})") - - meta = { - "areal_count": areal_count, - "cells_x": cells_x, - "cells_y": cells_y, - } - return areals, meta - - -def _color_for_class(class_id: int) -> tuple[int, int, int]: - x = (class_id * 1103515245 + 12345) & 0x7FFFFFFF - r = 60 + (x & 0x7F) - g = 60 + ((x >> 7) & 0x7F) - b = 60 + ((x >> 14) & 0x7F) - return r, g, b - - -def _write_ppm(path: Path, width: int, height: int, rgb: bytearray) -> None: - path.parent.mkdir(parents=True, exist_ok=True) - with path.open("wb") as handle: - handle.write(f"P6\n{width} {height}\n255\n".encode("ascii")) - handle.write(rgb) - - -def _write_obj( - path: Path, - terrain_positions: list[tuple[float, float, float]], - terrain_faces: list[tuple[int, int, int]], - areals: list[dict[str, Any]], - *, - include_areals: bool, -) -> None: - path.parent.mkdir(parents=True, exist_ok=True) - with path.open("w", encoding="utf-8", newline="\n") as out: - out.write("# Exported by terrain_map_preview_renderer.py\n") - out.write("o terrain\n") - for x, y, z in terrain_positions: - out.write(f"v {x:.9g} {y:.9g} {z:.9g}\n") - for i0, i1, i2 in terrain_faces: - # OBJ indices are 1-based. - out.write(f"f {i0 + 1} {i1 + 1} {i2 + 1}\n") - - if include_areals and areals: - base = len(terrain_positions) - area_vertex_counts: list[int] = [] - out.write("o areal_edges\n") - for area in areals: - verts = area["vertices"] - area_vertex_counts.append(len(verts)) - for x, y, z in verts: - out.write(f"v {x:.9g} {y:.9g} {z:.9g}\n") - - ptr = base - for area_idx, area in enumerate(areals): - cnt = area_vertex_counts[area_idx] - if cnt < 2: - ptr += cnt - continue - # closed polyline. - line = [str(ptr + i + 1) for i in range(cnt)] - line.append(str(ptr + 1)) - out.write("l " + " ".join(line) + "\n") - ptr += cnt - - -def _render_scene( - terrain_positions: list[tuple[float, float, float]], - terrain_faces: list[tuple[int, int, int]], - areals: list[dict[str, Any]], - *, - width: int, - height: int, - yaw_deg: float, - pitch_deg: float, - wireframe: bool, - areal_overlay: bool, -) -> bytearray: - all_positions = list(terrain_positions) - if areal_overlay: - for area in areals: - all_positions.extend(area["vertices"]) - if not all_positions: - raise RuntimeError("scene is empty") - - xs = [p[0] for p in all_positions] - ys = [p[1] for p in all_positions] - zs = [p[2] for p in all_positions] - cx = (min(xs) + max(xs)) * 0.5 - cy = (min(ys) + max(ys)) * 0.5 - cz = (min(zs) + max(zs)) * 0.5 - span = max(max(xs) - min(xs), max(ys) - min(ys), max(zs) - min(zs)) - radius = max(span * 0.5, 1e-3) - - yaw = math.radians(yaw_deg) - pitch = math.radians(pitch_deg) - cyaw = math.cos(yaw) - syaw = math.sin(yaw) - cpitch = math.cos(pitch) - spitch = math.sin(pitch) - camera_dist = radius * 3.2 - scale = min(width, height) * 0.96 - - # Terrain transform cache. - vx: list[float] = [] - vy: list[float] = [] - vz: list[float] = [] - sx: list[float] = [] - sy: list[float] = [] - for x, y, z in terrain_positions: - x0 = x - cx - y0 = y - cy - z0 = z - cz - x1 = cyaw * x0 + syaw * z0 - z1 = -syaw * x0 + cyaw * z0 - y2 = cpitch * y0 - spitch * z1 - z2 = spitch * y0 + cpitch * z1 + camera_dist - if z2 < 1e-3: - z2 = 1e-3 - vx.append(x1) - vy.append(y2) - vz.append(z2) - sx.append(width * 0.5 + (x1 / z2) * scale) - sy.append(height * 0.5 - (y2 / z2) * scale) - - def project_point(x: float, y: float, z: float) -> tuple[float, float, float]: - x0 = x - cx - y0 = y - cy - z0 = z - cz - x1 = cyaw * x0 + syaw * z0 - z1 = -syaw * x0 + cyaw * z0 - y2 = cpitch * y0 - spitch * z1 - z2 = spitch * y0 + cpitch * z1 + camera_dist - if z2 < 1e-3: - z2 = 1e-3 - px = width * 0.5 + (x1 / z2) * scale - py = height * 0.5 - (y2 / z2) * scale - return px, py, z2 - - rgb = bytearray([14, 16, 20] * (width * height)) - zbuf = [float("inf")] * (width * height) - light_dir = (0.35, 0.45, 1.0) - l_len = math.sqrt(light_dir[0] ** 2 + light_dir[1] ** 2 + light_dir[2] ** 2) - light = (light_dir[0] / l_len, light_dir[1] / l_len, light_dir[2] / l_len) - - def edge(ax: float, ay: float, bx: float, by: float, px: float, py: float) -> float: - return (px - ax) * (by - ay) - (py - ay) * (bx - ax) - - for i0, i1, i2 in terrain_faces: - x0 = sx[i0] - y0 = sy[i0] - x1 = sx[i1] - y1 = sy[i1] - x2 = sx[i2] - y2 = sy[i2] - area = edge(x0, y0, x1, y1, x2, y2) - if area == 0.0: - continue - - ux = vx[i1] - vx[i0] - uy = vy[i1] - vy[i0] - uz = vz[i1] - vz[i0] - wx = vx[i2] - vx[i0] - wy = vy[i2] - vy[i0] - wz = vz[i2] - vz[i0] - nx = uy * wz - uz * wy - ny = uz * wx - ux * wz - nz = ux * wy - uy * wx - n_len = math.sqrt(nx * nx + ny * ny + nz * nz) - if n_len > 0.0: - nx /= n_len - ny /= n_len - nz /= n_len - intensity = nx * light[0] + ny * light[1] + nz * light[2] - if intensity < 0.0: - intensity = 0.0 - shade = int(45 + 185 * intensity) - color = (min(255, shade + 6), min(255, shade + 14), min(255, shade + 28)) - - minx = int(max(0, math.floor(min(x0, x1, x2)))) - maxx = int(min(width - 1, math.ceil(max(x0, x1, x2)))) - miny = int(max(0, math.floor(min(y0, y1, y2)))) - maxy = int(min(height - 1, math.ceil(max(y0, y1, y2)))) - if minx > maxx or miny > maxy: - continue - - z0 = vz[i0] - z1 = vz[i1] - z2 = vz[i2] - inv_area = 1.0 / area - for py in range(miny, maxy + 1): - fy = py + 0.5 - row = py * width - for px in range(minx, maxx + 1): - fx = px + 0.5 - w0 = edge(x1, y1, x2, y2, fx, fy) - w1 = edge(x2, y2, x0, y0, fx, fy) - w2 = edge(x0, y0, x1, y1, fx, fy) - if area > 0: - if w0 < 0 or w1 < 0 or w2 < 0: - continue - else: - if w0 > 0 or w1 > 0 or w2 > 0: - continue - bz0 = w0 * inv_area - bz1 = w1 * inv_area - bz2 = w2 * inv_area - depth = bz0 * z0 + bz1 * z1 + bz2 * z2 - idx = row + px - if depth >= zbuf[idx]: - continue - zbuf[idx] = depth - p = idx * 3 - rgb[p + 0] = color[0] - rgb[p + 1] = color[1] - rgb[p + 2] = color[2] - - def draw_line( - xa: float, - ya: float, - xb: float, - yb: float, - color: tuple[int, int, int], - ) -> None: - x0i = int(round(xa)) - y0i = int(round(ya)) - x1i = int(round(xb)) - y1i = int(round(yb)) - dx = abs(x1i - x0i) - sx_step = 1 if x0i < x1i else -1 - dy = -abs(y1i - y0i) - sy_step = 1 if y0i < y1i else -1 - err = dx + dy - x = x0i - y = y0i - while True: - if 0 <= x < width and 0 <= y < height: - p = (y * width + x) * 3 - rgb[p + 0] = color[0] - rgb[p + 1] = color[1] - rgb[p + 2] = color[2] - if x == x1i and y == y1i: - break - e2 = 2 * err - if e2 >= dy: - err += dy - x += sx_step - if e2 <= dx: - err += dx - y += sy_step - - if wireframe: - wf = (225, 232, 246) - for i0, i1, i2 in terrain_faces: - draw_line(sx[i0], sy[i0], sx[i1], sy[i1], wf) - draw_line(sx[i1], sy[i1], sx[i2], sy[i2], wf) - draw_line(sx[i2], sy[i2], sx[i0], sy[i0], wf) - - if areal_overlay: - for area in areals: - verts = area["vertices"] - if len(verts) < 2: - continue - color = _color_for_class(int(area["class_id"])) - projected = [project_point(x, y, z + 0.35) for x, y, z in verts] - for i in range(len(projected)): - x0, y0, _ = projected[i] - x1, y1, _ = projected[(i + 1) % len(projected)] - draw_line(x0, y0, x1, y1, color) - - return rgb - - -def cmd_render(args: argparse.Namespace) -> int: - msh_path = Path(args.land_msh).resolve() - map_path = Path(args.land_map).resolve() if args.land_map else None - output_path = Path(args.output).resolve() - - positions, faces, terrain_meta = load_terrain_msh(msh_path, max_faces=int(args.max_faces)) - areals: list[dict[str, Any]] = [] - map_meta: dict[str, int] = {"areal_count": 0, "cells_x": 0, "cells_y": 0} - if map_path: - areals, map_meta = load_areal_map(map_path) - - rgb = _render_scene( - positions, - faces, - areals, - width=int(args.width), - height=int(args.height), - yaw_deg=float(args.yaw), - pitch_deg=float(args.pitch), - wireframe=bool(args.wireframe), - areal_overlay=bool(args.overlay_areals), - ) - _write_ppm(output_path, int(args.width), int(args.height), rgb) - - print(f"Rendered terrain : {msh_path}") - if map_path: - print(f"Areal overlay : {map_path}") - print(f"Output : {output_path}") - print( - "Terrain geometry : " - f"vertices={terrain_meta['vertex_count']}, " - f"faces={terrain_meta['face_count_rendered']}/{terrain_meta['face_count_valid']} " - f"(raw={terrain_meta['face_count_raw']}, dropped={terrain_meta['face_dropped_invalid']})" - ) - if map_path: - print( - "Areal map : " - f"areals={map_meta['areal_count']}, cells={map_meta['cells_x']}x{map_meta['cells_y']}" - ) - return 0 - - -def cmd_export_obj(args: argparse.Namespace) -> int: - msh_path = Path(args.land_msh).resolve() - map_path = Path(args.land_map).resolve() if args.land_map else None - output_path = Path(args.output).resolve() - - positions, faces, terrain_meta = load_terrain_msh(msh_path, max_faces=int(args.max_faces)) - areals: list[dict[str, Any]] = [] - if map_path and bool(args.include_areals): - areals, _ = load_areal_map(map_path) - - _write_obj( - output_path, - positions, - faces, - areals, - include_areals=bool(args.include_areals), - ) - - areal_vertices = sum(len(a["vertices"]) for a in areals) - print(f"Terrain source : {msh_path}") - if map_path: - print(f"Areal source : {map_path}") - print(f"OBJ output : {output_path}") - print( - "Terrain geometry : " - f"vertices={terrain_meta['vertex_count']}, " - f"faces={terrain_meta['face_count_rendered']}/{terrain_meta['face_count_valid']}" - ) - if bool(args.include_areals): - print(f"Areal edges : areals={len(areals)}, extra_vertices={areal_vertices}") - return 0 - - -def cmd_render_turntable(args: argparse.Namespace) -> int: - msh_path = Path(args.land_msh).resolve() - map_path = Path(args.land_map).resolve() if args.land_map else None - output_dir = Path(args.output_dir).resolve() - output_dir.mkdir(parents=True, exist_ok=True) - - frames = int(args.frames) - if frames <= 0: - raise RuntimeError("--frames must be > 0") - - positions, faces, terrain_meta = load_terrain_msh(msh_path, max_faces=int(args.max_faces)) - areals: list[dict[str, Any]] = [] - if map_path: - areals, _ = load_areal_map(map_path) - - yaw_start = float(args.yaw_start) - yaw_end = float(args.yaw_end) - if frames == 1: - yaws = [yaw_start] - else: - step = (yaw_end - yaw_start) / (frames - 1) - yaws = [yaw_start + i * step for i in range(frames)] - - prefix = str(args.prefix) - for i, yaw in enumerate(yaws): - rgb = _render_scene( - positions, - faces, - areals, - width=int(args.width), - height=int(args.height), - yaw_deg=yaw, - pitch_deg=float(args.pitch), - wireframe=bool(args.wireframe), - areal_overlay=bool(args.overlay_areals), - ) - out = output_dir / f"{prefix}_{i:03d}.ppm" - _write_ppm(out, int(args.width), int(args.height), rgb) - - print(f"Turntable source : {msh_path}") - if map_path: - print(f"Areal source : {map_path}") - print(f"Output dir : {output_dir}") - print(f"Frames : {frames} ({yaws[0]:.3f} -> {yaws[-1]:.3f} yaw)") - print( - "Terrain geometry : " - f"vertices={terrain_meta['vertex_count']}, faces={terrain_meta['face_count_rendered']}" - ) - return 0 - - -def cmd_render_batch(args: argparse.Namespace) -> int: - maps_root = Path(args.maps_root).resolve() - output_dir = Path(args.output_dir).resolve() - msh_paths = sorted(maps_root.rglob("Land.msh")) - if not msh_paths: - raise RuntimeError(f"no Land.msh files under {maps_root}") - - rendered = 0 - skipped = 0 - for msh_path in msh_paths: - map_path = msh_path.with_name("Land.map") - if not map_path.exists(): - skipped += 1 - continue - rel = msh_path.parent.relative_to(maps_root) - out = output_dir / f"{rel.as_posix().replace('/', '__')}.ppm" - cmd_render( - argparse.Namespace( - land_msh=str(msh_path), - land_map=str(map_path), - output=str(out), - max_faces=args.max_faces, - width=args.width, - height=args.height, - yaw=args.yaw, - pitch=args.pitch, - wireframe=args.wireframe, - overlay_areals=args.overlay_areals, - ) - ) - rendered += 1 - - print(f"Batch summary: rendered={rendered}, skipped_no_map={skipped}, output_dir={output_dir}") - return 0 - - -def build_parser() -> argparse.ArgumentParser: - parser = argparse.ArgumentParser( - description="Software 3D terrain renderer (Land.msh + optional Land.map overlay)." - ) - sub = parser.add_subparsers(dest="command", required=True) - - render = sub.add_parser("render", help="Render one terrain map to PPM.") - render.add_argument("--land-msh", required=True, help="Path to Land.msh") - render.add_argument("--land-map", help="Path to Land.map (optional)") - render.add_argument("--output", required=True, help="Output .ppm path") - render.add_argument("--max-faces", type=int, default=220000, help="Face limit (default: 220000)") - render.add_argument("--width", type=int, default=1280, help="Image width (default: 1280)") - render.add_argument("--height", type=int, default=720, help="Image height (default: 720)") - render.add_argument("--yaw", type=float, default=38.0, help="Yaw angle in degrees (default: 38)") - render.add_argument("--pitch", type=float, default=26.0, help="Pitch angle in degrees (default: 26)") - render.add_argument("--wireframe", action="store_true", help="Draw terrain wireframe overlay") - render.add_argument( - "--overlay-areals", - action="store_true", - help="Draw ArealMap polygon overlay", - ) - render.set_defaults(func=cmd_render) - - export_obj = sub.add_parser("export-obj", help="Export terrain (and optional areal edges) to OBJ.") - export_obj.add_argument("--land-msh", required=True, help="Path to Land.msh") - export_obj.add_argument("--land-map", help="Path to Land.map (optional)") - export_obj.add_argument("--output", required=True, help="Output .obj path") - export_obj.add_argument("--max-faces", type=int, default=0, help="Face limit (0 = all)") - export_obj.add_argument( - "--include-areals", - action="store_true", - help="Export areal polygons as OBJ polyline object", - ) - export_obj.set_defaults(func=cmd_export_obj) - - turn = sub.add_parser("render-turntable", help="Render turntable frame sequence to PPM.") - turn.add_argument("--land-msh", required=True, help="Path to Land.msh") - turn.add_argument("--land-map", help="Path to Land.map (optional)") - turn.add_argument("--output-dir", required=True, help="Output directory for frames") - turn.add_argument("--prefix", default="frame", help="Frame filename prefix (default: frame)") - turn.add_argument("--frames", type=int, default=36, help="Frame count (default: 36)") - turn.add_argument("--yaw-start", type=float, default=0.0, help="Start yaw in degrees (default: 0)") - turn.add_argument("--yaw-end", type=float, default=360.0, help="End yaw in degrees (default: 360)") - turn.add_argument("--pitch", type=float, default=26.0, help="Pitch angle in degrees (default: 26)") - turn.add_argument("--max-faces", type=int, default=160000, help="Face limit (default: 160000)") - turn.add_argument("--width", type=int, default=960, help="Image width (default: 960)") - turn.add_argument("--height", type=int, default=540, help="Image height (default: 540)") - turn.add_argument("--wireframe", action="store_true", help="Draw terrain wireframe overlay") - turn.add_argument( - "--overlay-areals", - action="store_true", - help="Draw ArealMap polygon overlay", - ) - turn.set_defaults(func=cmd_render_turntable) - - batch = sub.add_parser("render-batch", help="Render all MAPS/**/Land.msh under root.") - batch.add_argument( - "--maps-root", - default="tmp/gamedata/DATA/MAPS", - help="Root directory with MAPS subfolders (default: tmp/gamedata/DATA/MAPS)", - ) - batch.add_argument("--output-dir", required=True, help="Directory for output PPM files") - batch.add_argument("--max-faces", type=int, default=90000, help="Face limit per map (default: 90000)") - batch.add_argument("--width", type=int, default=960, help="Image width (default: 960)") - batch.add_argument("--height", type=int, default=540, help="Image height (default: 540)") - batch.add_argument("--yaw", type=float, default=38.0, help="Yaw angle in degrees (default: 38)") - batch.add_argument("--pitch", type=float, default=26.0, help="Pitch angle in degrees (default: 26)") - batch.add_argument("--wireframe", action="store_true", help="Draw terrain wireframe overlay") - batch.add_argument( - "--overlay-areals", - action="store_true", - help="Draw ArealMap polygon overlay", - ) - batch.set_defaults(func=cmd_render_batch) - - return parser - - -def main() -> int: - parser = build_parser() - args = parser.parse_args() - return int(args.func(args)) - - -if __name__ == "__main__": - raise SystemExit(main()) -- cgit v1.2.3