From e8701195e66f2d27ffe17fb514eae8173795aaf7 Mon Sep 17 00:00:00 2001
From: Georgiy Bondarenko <69736697+nehilo@users.noreply.github.com>
Date: Thu, 4 Mar 2021 22:54:23 +0500
Subject: Initial commit
---
Marlin/src/feature/binary_stream.h | 462 +++++++++++++++++++++++++++++++++++++
1 file changed, 462 insertions(+)
create mode 100644 Marlin/src/feature/binary_stream.h
(limited to 'Marlin/src/feature/binary_stream.h')
diff --git a/Marlin/src/feature/binary_stream.h b/Marlin/src/feature/binary_stream.h
new file mode 100644
index 0000000..81d6e71
--- /dev/null
+++ b/Marlin/src/feature/binary_stream.h
@@ -0,0 +1,462 @@
+/**
+ * Marlin 3D Printer Firmware
+ * Copyright (c) 2020 MarlinFirmware [https://github.com/MarlinFirmware/Marlin]
+ *
+ * Based on Sprinter and grbl.
+ * Copyright (c) 2011 Camiel Gubbels / Erik van der Zalm
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see .
+ *
+ */
+#pragma once
+
+#include "../inc/MarlinConfig.h"
+
+#define BINARY_STREAM_COMPRESSION
+
+#if ENABLED(BINARY_STREAM_COMPRESSION)
+ #include "../libs/heatshrink/heatshrink_decoder.h"
+#endif
+
+inline bool bs_serial_data_available(const uint8_t index) {
+ return SERIAL_IMPL.available(index);
+}
+
+inline int bs_read_serial(const uint8_t index) {
+ return SERIAL_IMPL.read(index);
+}
+
+#if ENABLED(BINARY_STREAM_COMPRESSION)
+ static heatshrink_decoder hsd;
+ static uint8_t decode_buffer[512] = {};
+#endif
+
+class SDFileTransferProtocol {
+private:
+ struct Packet {
+ struct [[gnu::packed]] Open {
+ static bool validate(char* buffer, size_t length) {
+ return (length > sizeof(Open) && buffer[length - 1] == '\0');
+ }
+ static Open& decode(char* buffer) {
+ data = &buffer[2];
+ return *reinterpret_cast(buffer);
+ }
+ bool compression_enabled() { return compression & 0x1; }
+ bool dummy_transfer() { return dummy & 0x1; }
+ static char* filename() { return data; }
+ private:
+ uint8_t dummy, compression;
+ static char* data; // variable length strings complicate things
+ };
+ };
+
+ static bool file_open(char* filename) {
+ if (!dummy_transfer) {
+ card.mount();
+ card.openFileWrite(filename);
+ if (!card.isFileOpen()) return false;
+ }
+ transfer_active = true;
+ data_waiting = 0;
+ TERN_(BINARY_STREAM_COMPRESSION, heatshrink_decoder_reset(&hsd));
+ return true;
+ }
+
+ static bool file_write(char* buffer, const size_t length) {
+ #if ENABLED(BINARY_STREAM_COMPRESSION)
+ if (compression) {
+ size_t total_processed = 0, processed_count = 0;
+ HSD_poll_res presult;
+
+ while (total_processed < length) {
+ heatshrink_decoder_sink(&hsd, reinterpret_cast(&buffer[total_processed]), length - total_processed, &processed_count);
+ total_processed += processed_count;
+ do {
+ presult = heatshrink_decoder_poll(&hsd, &decode_buffer[data_waiting], sizeof(decode_buffer) - data_waiting, &processed_count);
+ data_waiting += processed_count;
+ if (data_waiting == sizeof(decode_buffer)) {
+ if (!dummy_transfer)
+ if (card.write(decode_buffer, data_waiting) < 0) {
+ return false;
+ }
+ data_waiting = 0;
+ }
+ } while (presult == HSDR_POLL_MORE);
+ }
+ return true;
+ }
+ #endif
+ return (dummy_transfer || card.write(buffer, length) >= 0);
+ }
+
+ static bool file_close() {
+ if (!dummy_transfer) {
+ #if ENABLED(BINARY_STREAM_COMPRESSION)
+ // flush any buffered data
+ if (data_waiting) {
+ if (card.write(decode_buffer, data_waiting) < 0) return false;
+ data_waiting = 0;
+ }
+ #endif
+ card.closefile();
+ card.release();
+ }
+ TERN_(BINARY_STREAM_COMPRESSION, heatshrink_decoder_finish(&hsd));
+ transfer_active = false;
+ return true;
+ }
+
+ static void transfer_abort() {
+ if (!dummy_transfer) {
+ card.closefile();
+ card.removeFile(card.filename);
+ card.release();
+ TERN_(BINARY_STREAM_COMPRESSION, heatshrink_decoder_finish(&hsd));
+ }
+ transfer_active = false;
+ return;
+ }
+
+ enum class FileTransfer : uint8_t { QUERY, OPEN, CLOSE, WRITE, ABORT };
+
+ static size_t data_waiting, transfer_timeout, idle_timeout;
+ static bool transfer_active, dummy_transfer, compression;
+
+public:
+
+ static void idle() {
+ // If a transfer is interrupted and a file is left open, abort it after TIMEOUT ms
+ const millis_t ms = millis();
+ if (transfer_active && ELAPSED(ms, idle_timeout)) {
+ idle_timeout = ms + IDLE_PERIOD;
+ if (ELAPSED(ms, transfer_timeout)) transfer_abort();
+ }
+ }
+
+ static void process(uint8_t packet_type, char* buffer, const uint16_t length) {
+ transfer_timeout = millis() + TIMEOUT;
+ switch (static_cast(packet_type)) {
+ case FileTransfer::QUERY:
+ SERIAL_ECHOPAIR("PFT:version:", VERSION_MAJOR, ".", VERSION_MINOR, ".", VERSION_PATCH);
+ #if ENABLED(BINARY_STREAM_COMPRESSION)
+ SERIAL_ECHOLNPAIR(":compresion:heatshrink,", HEATSHRINK_STATIC_WINDOW_BITS, ",", HEATSHRINK_STATIC_LOOKAHEAD_BITS);
+ #else
+ SERIAL_ECHOLNPGM(":compresion:none");
+ #endif
+ break;
+ case FileTransfer::OPEN:
+ if (transfer_active)
+ SERIAL_ECHOLNPGM("PFT:busy");
+ else {
+ if (Packet::Open::validate(buffer, length)) {
+ auto packet = Packet::Open::decode(buffer);
+ compression = packet.compression_enabled();
+ dummy_transfer = packet.dummy_transfer();
+ if (file_open(packet.filename())) {
+ SERIAL_ECHOLNPGM("PFT:success");
+ break;
+ }
+ }
+ SERIAL_ECHOLNPGM("PFT:fail");
+ }
+ break;
+ case FileTransfer::CLOSE:
+ if (transfer_active) {
+ if (file_close())
+ SERIAL_ECHOLNPGM("PFT:success");
+ else
+ SERIAL_ECHOLNPGM("PFT:ioerror");
+ }
+ else SERIAL_ECHOLNPGM("PFT:invalid");
+ break;
+ case FileTransfer::WRITE:
+ if (!transfer_active)
+ SERIAL_ECHOLNPGM("PFT:invalid");
+ else if (!file_write(buffer, length))
+ SERIAL_ECHOLNPGM("PFT:ioerror");
+ break;
+ case FileTransfer::ABORT:
+ transfer_abort();
+ SERIAL_ECHOLNPGM("PFT:success");
+ break;
+ default:
+ SERIAL_ECHOLNPGM("PTF:invalid");
+ break;
+ }
+ }
+
+ static const uint16_t VERSION_MAJOR = 0, VERSION_MINOR = 1, VERSION_PATCH = 0, TIMEOUT = 10000, IDLE_PERIOD = 1000;
+};
+
+class BinaryStream {
+public:
+ enum class Protocol : uint8_t { CONTROL, FILE_TRANSFER };
+
+ enum class ProtocolControl : uint8_t { SYNC = 1, CLOSE };
+
+ enum class StreamState : uint8_t { PACKET_RESET, PACKET_WAIT, PACKET_HEADER, PACKET_DATA, PACKET_FOOTER,
+ PACKET_PROCESS, PACKET_RESEND, PACKET_TIMEOUT, PACKET_ERROR };
+
+ struct Packet { // 10 byte protocol overhead, ascii with checksum and line number has a minimum of 7 increasing with line
+
+ union Header {
+ static constexpr uint16_t HEADER_TOKEN = 0xB5AD;
+ struct [[gnu::packed]] {
+ uint16_t token; // packet start token
+ uint8_t sync; // stream sync, resend id and packet loss detection
+ uint8_t meta; // 4 bit protocol,
+ // 4 bit packet type
+ uint16_t size; // data length
+ uint16_t checksum; // header checksum
+ };
+ uint8_t protocol() { return (meta >> 4) & 0xF; }
+ uint8_t type() { return meta & 0xF; }
+ void reset() { token = 0; sync = 0; meta = 0; size = 0; checksum = 0; }
+ uint8_t data[2];
+ };
+
+ union Footer {
+ struct [[gnu::packed]] {
+ uint16_t checksum; // full packet checksum
+ };
+ void reset() { checksum = 0; }
+ uint8_t data[1];
+ };
+
+ Header header;
+ Footer footer;
+ uint32_t bytes_received;
+ uint16_t checksum, header_checksum;
+ millis_t timeout;
+ char* buffer;
+
+ void reset() {
+ header.reset();
+ footer.reset();
+ bytes_received = 0;
+ checksum = 0;
+ header_checksum = 0;
+ timeout = millis() + PACKET_MAX_WAIT;
+ buffer = nullptr;
+ }
+ } packet{};
+
+ void reset() {
+ sync = 0;
+ packet_retries = 0;
+ buffer_next_index = 0;
+ }
+
+ // fletchers 16 checksum
+ uint32_t checksum(uint32_t cs, uint8_t value) {
+ uint16_t cs_low = (((cs & 0xFF) + value) % 255);
+ return ((((cs >> 8) + cs_low) % 255) << 8) | cs_low;
+ }
+
+ // read the next byte from the data stream keeping track of
+ // whether the stream times out from data starvation
+ // takes the data variable by reference in order to return status
+ bool stream_read(uint8_t& data) {
+ if (stream_state != StreamState::PACKET_WAIT && ELAPSED(millis(), packet.timeout)) {
+ stream_state = StreamState::PACKET_TIMEOUT;
+ return false;
+ }
+ if (!bs_serial_data_available(card.transfer_port_index)) return false;
+ data = bs_read_serial(card.transfer_port_index);
+ packet.timeout = millis() + PACKET_MAX_WAIT;
+ return true;
+ }
+
+ template
+ void receive(char (&buffer)[buffer_size]) {
+ uint8_t data = 0;
+ millis_t transfer_window = millis() + RX_TIMESLICE;
+
+ #if ENABLED(SDSUPPORT)
+ PORT_REDIRECT(SERIAL_PORTMASK(card.transfer_port_index));
+ #endif
+
+ #pragma GCC diagnostic push
+ #pragma GCC diagnostic ignored "-Warray-bounds"
+
+ while (PENDING(millis(), transfer_window)) {
+ switch (stream_state) {
+ /**
+ * Data stream packet handling
+ */
+ case StreamState::PACKET_RESET:
+ packet.reset();
+ stream_state = StreamState::PACKET_WAIT;
+ case StreamState::PACKET_WAIT:
+ if (!stream_read(data)) { idle(); return; } // no active packet so don't wait
+ packet.header.data[1] = data;
+ if (packet.header.token == packet.header.HEADER_TOKEN) {
+ packet.bytes_received = 2;
+ stream_state = StreamState::PACKET_HEADER;
+ }
+ else {
+ // stream corruption drop data
+ packet.header.data[0] = data;
+ }
+ break;
+ case StreamState::PACKET_HEADER:
+ if (!stream_read(data)) break;
+
+ packet.header.data[packet.bytes_received++] = data;
+ packet.checksum = checksum(packet.checksum, data);
+
+ // header checksum calculation can't contain the checksum
+ if (packet.bytes_received == sizeof(Packet::header) - 2)
+ packet.header_checksum = packet.checksum;
+
+ if (packet.bytes_received == sizeof(Packet::header)) {
+ if (packet.header.checksum == packet.header_checksum) {
+ // The SYNC control packet is a special case in that it doesn't require the stream sync to be correct
+ if (static_cast(packet.header.protocol()) == Protocol::CONTROL && static_cast(packet.header.type()) == ProtocolControl::SYNC) {
+ SERIAL_ECHOLNPAIR("ss", sync, ",", buffer_size, ",", VERSION_MAJOR, ".", VERSION_MINOR, ".", VERSION_PATCH);
+ stream_state = StreamState::PACKET_RESET;
+ break;
+ }
+ if (packet.header.sync == sync) {
+ buffer_next_index = 0;
+ packet.bytes_received = 0;
+ if (packet.header.size) {
+ stream_state = StreamState::PACKET_DATA;
+ packet.buffer = static_cast(&buffer[0]); // multipacket buffering not implemented, always allocate whole buffer to packet
+ }
+ else
+ stream_state = StreamState::PACKET_PROCESS;
+ }
+ else if (packet.header.sync == sync - 1) { // ok response must have been lost
+ SERIAL_ECHOLNPAIR("ok", packet.header.sync); // transmit valid packet received and drop the payload
+ stream_state = StreamState::PACKET_RESET;
+ }
+ else if (packet_retries) {
+ stream_state = StreamState::PACKET_RESET; // could be packets already buffered on flow controlled connections, drop them without ack
+ }
+ else {
+ SERIAL_ECHO_MSG("Datastream packet out of order");
+ stream_state = StreamState::PACKET_RESEND;
+ }
+ }
+ else {
+ SERIAL_ECHO_START();
+ SERIAL_ECHOLNPAIR("Packet header(", packet.header.sync, "?) corrupt");
+ stream_state = StreamState::PACKET_RESEND;
+ }
+ }
+ break;
+ case StreamState::PACKET_DATA:
+ if (!stream_read(data)) break;
+
+ if (buffer_next_index < buffer_size)
+ packet.buffer[buffer_next_index] = data;
+ else {
+ SERIAL_ECHO_MSG("Datastream packet data buffer overrun");
+ stream_state = StreamState::PACKET_ERROR;
+ break;
+ }
+
+ packet.checksum = checksum(packet.checksum, data);
+ packet.bytes_received++;
+ buffer_next_index++;
+
+ if (packet.bytes_received == packet.header.size) {
+ stream_state = StreamState::PACKET_FOOTER;
+ packet.bytes_received = 0;
+ }
+ break;
+ case StreamState::PACKET_FOOTER:
+ if (!stream_read(data)) break;
+
+ packet.footer.data[packet.bytes_received++] = data;
+ if (packet.bytes_received == sizeof(Packet::footer)) {
+ if (packet.footer.checksum == packet.checksum) {
+ stream_state = StreamState::PACKET_PROCESS;
+ }
+ else {
+ SERIAL_ECHO_START();
+ SERIAL_ECHOLNPAIR("Packet(", packet.header.sync, ") payload corrupt");
+ stream_state = StreamState::PACKET_RESEND;
+ }
+ }
+ break;
+ case StreamState::PACKET_PROCESS:
+ sync++;
+ packet_retries = 0;
+ bytes_received += packet.header.size;
+
+ SERIAL_ECHOLNPAIR("ok", packet.header.sync); // transmit valid packet received
+ dispatch();
+ stream_state = StreamState::PACKET_RESET;
+ break;
+ case StreamState::PACKET_RESEND:
+ if (packet_retries < MAX_RETRIES || MAX_RETRIES == 0) {
+ packet_retries++;
+ stream_state = StreamState::PACKET_RESET;
+ SERIAL_ECHO_START();
+ SERIAL_ECHOLNPAIR("Resend request ", int(packet_retries));
+ SERIAL_ECHOLNPAIR("rs", sync);
+ }
+ else
+ stream_state = StreamState::PACKET_ERROR;
+ break;
+ case StreamState::PACKET_TIMEOUT:
+ SERIAL_ECHO_MSG("Datastream timeout");
+ stream_state = StreamState::PACKET_RESEND;
+ break;
+ case StreamState::PACKET_ERROR:
+ SERIAL_ECHOLNPAIR("fe", packet.header.sync);
+ reset(); // reset everything, resync required
+ stream_state = StreamState::PACKET_RESET;
+ break;
+ }
+ }
+
+ #pragma GCC diagnostic pop
+ }
+
+ void dispatch() {
+ switch (static_cast(packet.header.protocol())) {
+ case Protocol::CONTROL:
+ switch (static_cast(packet.header.type())) {
+ case ProtocolControl::CLOSE: // revert back to ASCII mode
+ card.flag.binary_mode = false;
+ break;
+ default:
+ SERIAL_ECHO_MSG("Unknown BinaryProtocolControl Packet");
+ }
+ break;
+ case Protocol::FILE_TRANSFER:
+ SDFileTransferProtocol::process(packet.header.type(), packet.buffer, packet.header.size); // send user data to be processed
+ break;
+ default:
+ SERIAL_ECHO_MSG("Unsupported Binary Protocol");
+ }
+ }
+
+ void idle() {
+ // Some Protocols may need periodic updates without new data
+ SDFileTransferProtocol::idle();
+ }
+
+ static const uint16_t PACKET_MAX_WAIT = 500, RX_TIMESLICE = 20, MAX_RETRIES = 0, VERSION_MAJOR = 0, VERSION_MINOR = 1, VERSION_PATCH = 0;
+ uint8_t packet_retries, sync;
+ uint16_t buffer_next_index;
+ uint32_t bytes_received;
+ StreamState stream_state = StreamState::PACKET_RESET;
+};
+
+extern BinaryStream binaryStream[NUM_SERIAL];
--
cgit v1.2.3