diff --git a/CHANGELOG.md b/CHANGELOG.md index a84a0f4..58a8d79 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,11 @@ +# 5.2.0 + +- **BinaryWriter**: + - **New Feature**: Added `skip(int count)` — advances the write position by [count] bytes without writing data, for reserving space (Reserve & Backpatch pattern). + - **New Feature**: Added `shiftBytes(int start, int end, int target)` — shifts a block of written bytes within the buffer, enabling in-place compaction when reserved header space exceeds actual needs. + # 5.1.0 - **BinaryWriterPool**: diff --git a/README.md b/README.md index 5112f38..d175344 100644 --- a/README.md +++ b/README.md @@ -20,7 +20,7 @@ ```yaml dependencies: - pro_binary: ^5.1.0 + pro_binary: ^5.2.0 ``` ## Quick Start @@ -170,7 +170,7 @@ Explore the [example](example/) directory for complete, runnable projects: | Component | Description | | --------- | ----------- | -| **BinaryWriter** | Fast encoder for fixed-width, VarInt/ZigZag, and one-pass strings. Features automatic expansion and pooling. | +| **BinaryWriter** | Fast encoder for fixed-width, VarInt/ZigZag, and one-pass strings. Features automatic expansion, pooling, and in-place buffer manipulation (`skip`, `shiftBytes`). | | **BinaryReader** | Zero-copy decoder with advanced navigation (`seek`, `rewind`, `peek`). Optimized for performance. | | **StreamBinaryReader** | Handles async data chunks seamlessly with a transactional `bookmark`/`rollback` model for partial data. | | **BinaryStreamTransformer** | The easiest way to parse a `Stream>` into a stream of typed messages or objects. | @@ -184,16 +184,16 @@ Run benchmarks to see it in action: ```bash # Serialization (Writer) -dart run performance/serialization_bench.dart +dart run benchmark_harness:bench --flavor aot --target performance/serialization_bench.dart # Deserialization (Reader) -dart run performance/deserialization_bench.dart +dart run benchmark_harness:bench --flavor aot --target performance/deserialization_bench.dart # String encoding (One-pass vs Two-pass vs Standard) -dart run performance/strings_bench.dart +dart run benchmark_harness:bench --flavor aot --target performance/strings_bench.dart # Object Pooling (GC impact mitigation) -dart run performance/pool_bench.dart +dart run benchmark_harness:bench --flavor aot --target performance/pool_bench.dart ``` ## Testing diff --git a/lib/src/binary_reader.dart b/lib/src/binary_reader.dart index 517df95..8ea8c1e 100644 --- a/lib/src/binary_reader.dart +++ b/lib/src/binary_reader.dart @@ -105,6 +105,7 @@ extension type BinaryReader._(_ReaderState _rs) { var byte = list[offset++]; if ((byte & 0x80) == 0) { _rs.offset = offset; + return byte; } @@ -121,6 +122,7 @@ extension type BinaryReader._(_ReaderState _rs) { if ((byte & 0x80) == 0) { _rs.offset = offset; + return result; } @@ -134,6 +136,7 @@ extension type BinaryReader._(_ReaderState _rs) { if ((byte & 0x80) == 0) { _rs.offset = offset; + return result; } @@ -149,6 +152,7 @@ extension type BinaryReader._(_ReaderState _rs) { if ((byte & 0x80) == 0) { _rs.offset = offset; + return result; } @@ -417,11 +421,12 @@ extension type BinaryReader._(_ReaderState _rs) { // Create a view of the underlying buffer without copying final bOffset = _rs.baseOffset; - final bytes = _rs.data.buffer.asUint8List(bOffset + _rs.offset, length); + final offset = _rs.offset; + final view = _rs.data.buffer.asUint8List(bOffset + offset, length); _rs.offset += length; - return bytes; + return view; } /// Reads all remaining bytes from the current position to the end of the @@ -500,7 +505,9 @@ extension type BinaryReader._(_ReaderState _rs) { _checkBounds(length, 'String'); final bOffset = _rs.baseOffset; - final view = _rs.data.buffer.asUint8List(bOffset + _rs.offset, length); + final offset = _rs.offset; + final view = _rs.data.buffer.asUint8List(bOffset + offset, length); + _rs.offset += length; return utf8.decode(view, allowMalformed: allowMalformed); @@ -640,12 +647,13 @@ extension type BinaryReader._(_ReaderState _rs) { } final peekOffset = offset ?? _rs.offset; + _checkBounds(length, 'Peek Bytes', peekOffset); final bOffset = _rs.baseOffset; - final bytes = _rs.data.buffer.asUint8List(bOffset + peekOffset, length); + final view = _rs.data.buffer.asUint8List(bOffset + peekOffset, length); - return bytes; + return view; } /// Returns the byte at the current read position without advancing the diff --git a/lib/src/binary_writer.dart b/lib/src/binary_writer.dart index 4a5ad3d..4efce0a 100644 --- a/lib/src/binary_writer.dart +++ b/lib/src/binary_writer.dart @@ -176,7 +176,6 @@ extension type BinaryWriter._(_WriterState _ws) { @pragma('vm:prefer-inline') @pragma('dart2js:tryInline') // Disable lint to allow positional boolean parameter for simplicity - // ignore: avoid_positional_boolean_parameters void writeBool(bool value) { writeUint8(value ? 1 : 0); } @@ -843,6 +842,63 @@ extension type BinaryWriter._(_WriterState _ws) { _ws.offset = position; } + /// Advances the write position by [count] bytes without writing data. + /// + /// The skipped bytes may contain garbage. This is primarily used to reserve + /// space for a header that will be written later + /// (Reserve & Backpatch pattern). + /// + /// Throws [RangeError] if [count] is negative. + @pragma('vm:prefer-inline') + @pragma('dart2js:tryInline') + void skip(int count) { + if (count < 0) { + throw RangeError.value(count, 'count', 'must be non-negative'); + } + + if (count == 0) { + return; + } + + _ws + ..ensureSize(count) + ..offset += count; + } + + /// Shifts a block of written bytes within the buffer. + /// + /// Used for the "Reserve & Backpatch" pattern when the reserved header space + /// was larger than actually needed. This allows shifting the payload left to + /// overwrite the unused reserved space, avoiding a new array allocation. + /// + /// [start] - The starting index of the block to shift. + /// [end] - The ending index (exclusive) of the block to shift. + /// [target] - The index where the block should be moved to + /// (must be <= start). + /// + /// Throws [RangeError] if parameters define an invalid range or would cause + /// data corruption. + @pragma('vm:prefer-inline') + @pragma('dart2js:tryInline') + void shiftBytes(int start, int end, int target) { + assert(start >= 0, 'start must be non-negative'); + assert(end >= start, 'end must be >= start'); + assert(end <= _ws.offset, 'end exceeds current bytesWritten'); + assert(target >= 0, 'target must be non-negative'); + assert(target <= start, 'target must be <= start (can only shift left)'); + + final length = end - start; + if (length == 0) { + return; + } + + _ws.list.setRange(target, target + length, _ws.list, start); + + if (end == _ws.offset) { + _ws.offset = target + length; + } + } + /// Returns the byte at the specified [index] without changing the current /// write position. /// @@ -918,6 +974,7 @@ extension type BinaryWriter._(_WriterState _ws) { list[offset] = 0xEF; list[offset + 1] = 0xBF; list[offset + 2] = 0xBD; + return offset + 3; } @@ -991,6 +1048,7 @@ final class _WriterState { @pragma('dart2js:tryInline') void ensureSize(int size) { assert(!_isInPool, 'Cannot ensure size on a pooled writer'); + if (offset + size > capacity) { _expand(size); } @@ -1000,6 +1058,7 @@ final class _WriterState { @pragma('dart2js:tryInline') void ensureOneByte() { assert(!_isInPool, 'Cannot ensure size on a pooled writer'); + if (offset + 1 > capacity) { _expand(1); } @@ -1009,6 +1068,7 @@ final class _WriterState { @pragma('dart2js:tryInline') void ensureTwoBytes() { assert(!_isInPool, 'Cannot ensure size on a pooled writer'); + if (offset + 2 > capacity) { _expand(2); } @@ -1018,6 +1078,7 @@ final class _WriterState { @pragma('dart2js:tryInline') void ensureFourBytes() { assert(!_isInPool, 'Cannot ensure size on a pooled writer'); + if (offset + 4 > capacity) { _expand(4); } @@ -1027,6 +1088,7 @@ final class _WriterState { @pragma('dart2js:tryInline') void ensureEightBytes() { assert(!_isInPool, 'Cannot ensure size on a pooled writer'); + if (offset + 8 > capacity) { _expand(8); } diff --git a/lib/src/binary_writer_pool.dart b/lib/src/binary_writer_pool.dart index 5ae17ae..ae39571 100644 --- a/lib/src/binary_writer_pool.dart +++ b/lib/src/binary_writer_pool.dart @@ -1,6 +1,5 @@ part of 'binary_writer.dart'; -// Disable lint to allow static-only class for pooling /// Object pool for reusing [BinaryWriter] instances to reduce GC pressure. /// /// This pool maintains a cache of [BinaryWriter] instances with their @@ -13,49 +12,44 @@ part of 'binary_writer.dart'; /// buffers ≤ `maxReusableCapacity` /// - **Size limits:** Maintains max `maxPoolSize` pooled instances /// - **Safe:** Prevents double-release and handles edge cases +/// - **Instantiable:** Create custom pools for specific use cases or use the +/// default global pool. /// /// ## Usage Pattern -/// Use `acquire()` and `release()` for short-lived write operations: /// +/// **Using the default global pool:** /// ```dart /// final writer = BinaryWriterPool.acquire(); /// try { /// writer.writeUint32(42); /// writer.writeString('Hello'); /// final bytes = writer.toBytes(); -/// // Use bytes... /// } finally { -/// BinaryWriterPool.release(writer); // Return to pool +/// BinaryWriterPool.release(writer); /// } /// ``` /// -/// ## Thread Safety -/// This pool is isolate-local. Each Dart isolate maintains its own -/// static pool instance. +/// **Creating a custom pool:** +/// ```dart +/// final myPool = BinaryWriterPool( +/// maxPoolSize: 10, +/// initialBufferSize: 512, +/// maxReusableCapacity: 4096, +/// ); /// -/// Avoid sharing [BinaryWriter] instances between different isolates. -/// For concurrent operations within the same isolate, ensure writers -/// are acquired and released synchronously or protected by logic -/// to prevent interleaved usage. +/// final writer = myPool.acquireInstance(); +/// // ... +/// myPool.releaseInstance(writer); +/// ``` /// /// ## Performance Considerations /// - Pooling is beneficial for high-frequency write operations /// - Overhead is minimal for single-use writers (use regular constructor) /// - Large buffers (>64 KiB by default) are discarded to avoid memory waste /// -/// ## Memory Management -/// - Default pool max size: 32 writers -/// - Default max reusable buffer: 64 KiB -/// - Default initial buffer size: 1 KiB -/// - Use [configure] to change these limits -/// - Use [clear] to free pooled memory explicitly -/// -/// See also: [BinaryWriter], [stats] for pool monitoring -// ignore: avoid_classes_with_only_static_members -abstract final class BinaryWriterPool { - /// Configures the pool settings. - /// - /// This should typically be called once at application startup. +/// See also: [BinaryWriter] +final class BinaryWriterPool { + /// Creates a new, isolated [BinaryWriterPool]. /// /// Parameters: /// - [maxPoolSize]: Maximum number of writers to keep in the pool @@ -64,20 +58,41 @@ abstract final class BinaryWriterPool { /// (default: 1 KiB). /// - [maxReusableCapacity]: Maximum buffer capacity allowed for pooling /// (default: 64 KiB). Writers exceeding this are discarded on release. - /// - /// Example: - /// ```dart - /// // Configure for heavy load with larger buffers - /// BinaryWriterPool.configure( - /// maxPoolSize: 64, - /// maxReusableCapacity: 256 * 1024, - /// ); - /// ``` - static void configure({ + BinaryWriterPool({ int maxPoolSize = 32, int initialBufferSize = 1024, int maxReusableCapacity = 64 * 1024, }) { + _validateConfig(maxPoolSize, initialBufferSize, maxReusableCapacity); + _maxPoolSize = maxPoolSize; + _initialBufferSize = initialBufferSize; + _maxReusableCapacity = maxReusableCapacity; + } + + /// The default global instance of [BinaryWriterPool]. + /// + /// This is used when calling the static methods on [BinaryWriterPool]. + static final global = BinaryWriterPool(); + + // The internal pool of reusable writer states. + final List<_WriterState> _pool = []; + + late int _maxPoolSize; + late int _initialBufferSize; + late int _maxReusableCapacity; + + // Performance counters + var _acquireHit = 0; + var _acquireMiss = 0; + var _peakPoolSize = 0; + var _discardedLargeBuffers = 0; + var _discardedPoolFull = 0; + + static void _validateConfig( + int maxPoolSize, + int initialBufferSize, + int maxReusableCapacity, + ) { if (maxPoolSize <= 0) { throw ArgumentError.value(maxPoolSize, 'maxPoolSize', 'Must be positive'); } @@ -105,65 +120,41 @@ abstract final class BinaryWriterPool { 'This would cause all pooled writers to be discarded immediately.', ); } + } + + /// Configures the default global pool settings. + /// + /// This should typically be called once at application startup. + static void configure({ + int maxPoolSize = 32, + int initialBufferSize = 1024, + int maxReusableCapacity = 64 * 1024, + }) { + global.reconfigure( + maxPoolSize: maxPoolSize, + initialBufferSize: initialBufferSize, + maxReusableCapacity: maxReusableCapacity, + ); + } + /// Reconfigures the settings of this specific pool instance. + void reconfigure({ + int maxPoolSize = 32, + int initialBufferSize = 1024, + int maxReusableCapacity = 64 * 1024, + }) { + _validateConfig(maxPoolSize, initialBufferSize, maxReusableCapacity); _maxPoolSize = maxPoolSize; _initialBufferSize = initialBufferSize; _maxReusableCapacity = maxReusableCapacity; } - // The internal pool of reusable writer states. - static final _pool = <_WriterState>[]; + /// Acquires a [BinaryWriter] from the default global pool. + static BinaryWriter acquire([int? initialBufferSize]) => + global.acquireInstance(initialBufferSize); - /// Maximum number of writers to keep in the pool. - static var _maxPoolSize = 32; - - /// Default initial buffer size for new writers (1 KiB). - static var _initialBufferSize = 1024; - - /// Maximum buffer capacity allowed for pooling (64 KiB). - /// Writers that exceed this size are discarded to free up system memory. - static var _maxReusableCapacity = 64 * 1024; - - // Performance counters - static var _acquireHit = 0; - static var _acquireMiss = 0; - static var _peakPoolSize = 0; - static var _discardedLargeBuffers = 0; - static var _discardedPoolFull = 0; - - /// Acquires a [BinaryWriter] from the pool or creates a new one. - /// - /// Returns a pooled writer if available, otherwise creates a fresh instance - /// with the default buffer size (1 KiB). - /// - /// The returned writer is ready to use and should be returned to the pool - /// via [release] when no longer needed. - /// - /// **Best Practice:** Always use a `try-finally` block. - /// - /// There are two ways to get the data: - /// 1. Use [BinaryWriter.toBytes] if you consume data **inside** the try - /// block (zero-copy view). This is the fastest method but the view - /// becomes invalid if the writer is reused. - /// 2. Use [BinaryWriter.takeBytes] with `copy: true` if you need to - /// **return** the data. This copies the written bytes but **retains** - /// the internal buffer for the pool, preventing future re-allocations. - /// 3. Use [BinaryWriter.takeBytes] with `copy: false` (default) for a - /// zero-copy transfer of ownership. This detaches the buffer from the - /// writer, causing the pool to allocate a new buffer next time. - /// - /// ```dart - /// final writer = BinaryWriterPool.acquire(); - /// try { - /// writer.writeUint32(123); - /// return writer.takeBytes(copy: true); // Recommended for pooling - /// } finally { - /// BinaryWriterPool.release(writer); - /// } - /// ``` - /// - /// Returns: A [BinaryWriter] ready for use. - static BinaryWriter acquire([int? initialBufferSize]) { + /// Acquires a [BinaryWriter] from this specific pool instance. + BinaryWriter acquireInstance([int? initialBufferSize]) { final size = initialBufferSize ?? _initialBufferSize; if (size <= 0) { @@ -175,8 +166,6 @@ abstract final class BinaryWriterPool { } if (_pool.isNotEmpty) { - // Find the best-fitting buffer: smallest one that is >= requested size. - // If none, take the largest available to minimize expansions. var bestIndex = -1; var smallestSuitableCapacity = double.infinity; @@ -188,7 +177,6 @@ abstract final class BinaryWriterPool { } } - // If no suitable buffer found, take the largest one to minimize growth if (bestIndex == -1) { var largestCap = -1; for (var i = 0; i < _pool.length; i++) { @@ -214,68 +202,44 @@ abstract final class BinaryWriterPool { return BinaryWriter(initialBufferSize: size); } - /// Acquires a writer, executes the given [action], and automatically - /// releases the writer back to the pool. - /// - /// This is the recommended way to use the pool as it ensures the writer - /// is always released even if an exception occurs. - /// - /// Parameters: - /// - [action]: The function to execute with the acquired writer - /// - [initialBufferSize]: Initial buffer size for new writers - /// (defaults to pool setting) - /// - /// Example: - /// ```dart - /// final bytes = BinaryWriterPool.withWriter((writer) { - /// writer.writeUint32(42); - /// return writer.takeBytes(copy: true); - /// }); - /// ``` + /// Acquires a writer from the default global pool, executes the given + /// [action], and automatically releases it. static T withWriter( T Function(BinaryWriter writer) action, [ int? initialBufferSize, + ]) => global.withWriterInstance(action, initialBufferSize); + + /// Acquires a writer from this pool instance, executes the given + /// [action], and automatically releases it. + T withWriterInstance( + T Function(BinaryWriter writer) action, [ + int? initialBufferSize, ]) { - final writer = acquire(initialBufferSize); + final writer = acquireInstance(initialBufferSize); try { return action(writer); } finally { - release(writer); + releaseInstance(writer); } } - /// Returns a [BinaryWriter] to the pool for future reuse. - /// - /// The writer is reset (offset cleared) and stored for future [acquire] - /// calls. Writers with buffers larger than `maxReusableCapacity` are not - /// pooled to avoid long-term memory retention. - /// - /// **Safe to call multiple times** (duplicate releases are ignored). - /// - /// Only writers with capacity ≤ [_maxReusableCapacity] are pooled. - /// Writers exceeding this limit are discarded, allowing the buffer to be - /// garbage collected. - /// - /// **Do NOT use the writer after releasing it.** - /// - /// Parameters: - /// - [writer]: The [BinaryWriter] to return to the pool - static void release(BinaryWriter writer) { + /// Returns a [BinaryWriter] to the default global pool. + static void release(BinaryWriter writer) => global.releaseInstance(writer); + + /// Returns a [BinaryWriter] to this specific pool instance. + void releaseInstance(BinaryWriter writer) { final state = writer._ws; - // Prevent double-release and state corruption if (state._isInPool) { return; } - // Only pool writers with reasonable buffer sizes if (state.capacity <= _maxReusableCapacity && _pool.length < _maxPoolSize) { state ..offset = 0 .._isInPool = true; _pool.add(state); - // Track peak pool size if (_pool.length > _peakPoolSize) { _peakPoolSize = _pool.length; } @@ -286,28 +250,11 @@ abstract final class BinaryWriterPool { } } - /// Returns pool statistics for monitoring and debugging. - /// - /// Useful for performance analysis and detecting pool inefficiencies. - /// - /// Returns a map with keys: - /// - `'pooled'`: Number of writers currently in the pool - /// - `'maxPoolSize'`: Maximum pool capacity - /// - `'initialBufferSize'`: Initial buffer size for new writers - /// - `'maxReusableCapacity'`: Maximum buffer size for pooling - /// - `'acquireHit'`: Number of successful reuses from pool - /// - `'acquireMiss'`: Number of new writer allocations - /// - `'peakPoolSize'`: Maximum pool size reached - /// - `'discardedLargeBuffers'`: Number of oversized buffers discarded - /// - `'discardedPoolFull'`: Number of writers discarded when pool is full - /// - /// Example: - /// ```dart - /// final stats = BinaryWriterPool.stats; - /// print('Pooled writers: ${stats.pooled}'); - /// print('Hit rate: ${stats.hitRate}'); - /// ``` - static PoolStatistics get stats => PoolStatistics({ + /// Returns statistics for the default global pool. + static PoolStatistics get stats => global.instanceStats; + + /// Returns statistics for this specific pool instance. + PoolStatistics get instanceStats => PoolStatistics({ 'pooled': _pool.length, 'maxPoolSize': _maxPoolSize, 'initialBufferSize': _initialBufferSize, @@ -319,15 +266,11 @@ abstract final class BinaryWriterPool { 'discardedPoolFull': _discardedPoolFull, }); - /// Clears the pool, releasing all cached writers. - /// - /// Use this to: - /// - Free memory during low-activity periods - /// - Reset pool state in tests - /// - Handle memory pressure - /// - /// After clearing, subsequent [acquire] calls will create new writers. - static void clear() { + /// Clears the default global pool. + static void clear() => global.clearInstance(); + + /// Clears this specific pool instance. + void clearInstance() { _pool.clear(); _acquireHit = 0; _acquireMiss = 0; diff --git a/lib/src/stream/stream_binary_reader.dart b/lib/src/stream/stream_binary_reader.dart index 0bcba0e..e59f0e1 100644 --- a/lib/src/stream/stream_binary_reader.dart +++ b/lib/src/stream/stream_binary_reader.dart @@ -86,10 +86,13 @@ extension type StreamBinaryReader._(_StreamReaderState _s) // Invariant: availableBytes > 0 implies currentReader != null final cr = _s.currentReader!; final val = cr.readUint8(); + _s.availableBytes -= 1; + if (cr.availableBytes == 0) { _advanceChunk(); } + return val; } @@ -100,12 +103,16 @@ extension type StreamBinaryReader._(_StreamReaderState _s) @pragma('dart2js:tryInline') int readInt8() { _checkAvailable(1); + final cr = _s.currentReader!; final val = cr.readInt8(); + _s.availableBytes -= 1; + if (cr.availableBytes == 0) { _advanceChunk(); } + return val; } @@ -118,9 +125,12 @@ extension type StreamBinaryReader._(_StreamReaderState _s) @pragma('dart2js:tryInline') bool readBool() { _checkAvailable(1); + final cr = _s.currentReader!; final val = cr.readUint8(); + _s.availableBytes -= 1; + if (cr.availableBytes == 0) { _advanceChunk(); } @@ -137,15 +147,20 @@ extension type StreamBinaryReader._(_StreamReaderState _s) @pragma('dart2js:tryInline') int readUint16([Endian endian = Endian.big]) { _checkAvailable(2); + final cr = _s.currentReader!; if (cr.availableBytes >= 2) { final val = cr.readUint16(endian); + _s.availableBytes -= 2; + if (cr.availableBytes == 0) { _advanceChunk(); } + return val; } + return _readCrossChunk(2, (data) => data.getUint16(0, endian)); } @@ -158,15 +173,20 @@ extension type StreamBinaryReader._(_StreamReaderState _s) @pragma('dart2js:tryInline') int readInt16([Endian endian = Endian.big]) { _checkAvailable(2); + final cr = _s.currentReader!; if (cr.availableBytes >= 2) { final val = cr.readInt16(endian); + _s.availableBytes -= 2; + if (cr.availableBytes == 0) { _advanceChunk(); } + return val; } + return _readCrossChunk(2, (data) => data.getInt16(0, endian)); } @@ -179,15 +199,20 @@ extension type StreamBinaryReader._(_StreamReaderState _s) @pragma('dart2js:tryInline') int readUint32([Endian endian = Endian.big]) { _checkAvailable(4); + final cr = _s.currentReader!; if (cr.availableBytes >= 4) { final val = cr.readUint32(endian); + _s.availableBytes -= 4; + if (cr.availableBytes == 0) { _advanceChunk(); } + return val; } + return _readCrossChunk(4, (data) => data.getUint32(0, endian)); } @@ -200,15 +225,20 @@ extension type StreamBinaryReader._(_StreamReaderState _s) @pragma('dart2js:tryInline') int readInt32([Endian endian = Endian.big]) { _checkAvailable(4); + final cr = _s.currentReader!; if (cr.availableBytes >= 4) { final val = cr.readInt32(endian); + _s.availableBytes -= 4; + if (cr.availableBytes == 0) { _advanceChunk(); } + return val; } + return _readCrossChunk(4, (data) => data.getInt32(0, endian)); } @@ -221,15 +251,20 @@ extension type StreamBinaryReader._(_StreamReaderState _s) @pragma('dart2js:tryInline') int readUint64([Endian endian = Endian.big]) { _checkAvailable(8); + final cr = _s.currentReader!; if (cr.availableBytes >= 8) { final val = cr.readUint64(endian); + _s.availableBytes -= 8; + if (cr.availableBytes == 0) { _advanceChunk(); } + return val; } + return _readCrossChunk(8, (data) => data.getUint64(0, endian)); } @@ -242,15 +277,20 @@ extension type StreamBinaryReader._(_StreamReaderState _s) @pragma('dart2js:tryInline') int readInt64([Endian endian = Endian.big]) { _checkAvailable(8); + final cr = _s.currentReader!; if (cr.availableBytes >= 8) { final val = cr.readInt64(endian); + _s.availableBytes -= 8; + if (cr.availableBytes == 0) { _advanceChunk(); } + return val; } + return _readCrossChunk(8, (data) => data.getInt64(0, endian)); } @@ -263,15 +303,20 @@ extension type StreamBinaryReader._(_StreamReaderState _s) @pragma('dart2js:tryInline') double readFloat32([Endian endian = Endian.big]) { _checkAvailable(4); + final cr = _s.currentReader!; if (cr.availableBytes >= 4) { final val = cr.readFloat32(endian); + _s.availableBytes -= 4; + if (cr.availableBytes == 0) { _advanceChunk(); } + return val; } + return _readCrossChunk(4, (data) => data.getFloat32(0, endian)); } @@ -284,23 +329,38 @@ extension type StreamBinaryReader._(_StreamReaderState _s) @pragma('dart2js:tryInline') double readFloat64([Endian endian = Endian.big]) { _checkAvailable(8); + final cr = _s.currentReader!; if (cr.availableBytes >= 8) { final val = cr.readFloat64(endian); + _s.availableBytes -= 8; + if (cr.availableBytes == 0) { _advanceChunk(); } + return val; } + return _readCrossChunk(8, (data) => data.getFloat64(0, endian)); } @pragma('vm:prefer-inline') @pragma('dart2js:tryInline') T _readCrossChunk(int length, T Function(ByteData) parser) { + if (length <= 8) { + final scratch = _s.scratchBuffer; + for (var i = 0; i < length; i++) { + scratch[i] = readUint8(); + } + + return parser(_s.scratchData); + } + final bytes = readBytes(length); final data = ByteData.sublistView(bytes); + return parser(data); } @@ -315,10 +375,13 @@ extension type StreamBinaryReader._(_StreamReaderState _s) final before = cr.availableBytes; final value = cr.readVarUint(); final readLen = before - cr.availableBytes; + _s.availableBytes -= readLen; + if (cr.availableBytes == 0) { _advanceChunk(); } + return value; } @@ -330,8 +393,10 @@ extension type StreamBinaryReader._(_StreamReaderState _s) if ((byte & 0x80) == 0) { return result; } + shift += 7; } + throw const FormatException('VarInt is too long (more than 10 bytes)'); } @@ -342,6 +407,7 @@ extension type StreamBinaryReader._(_StreamReaderState _s) @pragma('dart2js:tryInline') int readVarInt() { final v = readVarUint(); + return (v >>> 1) ^ -(v & 1); } @@ -355,6 +421,7 @@ extension type StreamBinaryReader._(_StreamReaderState _s) if (length < 0) { throw RangeError.value(length, 'length', 'Length must be non-negative'); } + if (length == 0) { return emptyUintList_; } @@ -364,7 +431,9 @@ extension type StreamBinaryReader._(_StreamReaderState _s) final cr = _s.currentReader!; if (cr.availableBytes >= length) { final bytes = cr.readBytes(length); + _s.availableBytes -= length; + if (cr.availableBytes == 0) { _advanceChunk(); } @@ -383,7 +452,9 @@ extension type StreamBinaryReader._(_StreamReaderState _s) if (chunkAvailable >= remaining) { final bytes = chunkReader.readBytes(remaining); result.setRange(resultOffset, resultOffset + remaining, bytes); + _s.availableBytes -= remaining; + if (chunkReader.availableBytes == 0) { _advanceChunk(); } @@ -392,13 +463,16 @@ extension type StreamBinaryReader._(_StreamReaderState _s) if (chunkAvailable > 0) { final bytes = chunkReader.readBytes(chunkAvailable); result.setRange(resultOffset, resultOffset + chunkAvailable, bytes); + resultOffset += chunkAvailable; remaining -= chunkAvailable; _s.availableBytes -= chunkAvailable; } + _advanceChunk(); } } + return result; } @@ -417,6 +491,7 @@ extension type StreamBinaryReader._(_StreamReaderState _s) @pragma('dart2js:tryInline') Uint8List readVarBytes() { final length = readVarUint(); + return readBytes(length); } @@ -430,9 +505,11 @@ extension type StreamBinaryReader._(_StreamReaderState _s) if (length < 0) { throw RangeError.value(length, 'length', 'Length must be non-negative'); } + if (length == 0) { return ''; } + _checkAvailable(length); final cr = _s.currentReader!; @@ -442,6 +519,7 @@ extension type StreamBinaryReader._(_StreamReaderState _s) if (cr.availableBytes == 0) { _advanceChunk(); } + return value; } @@ -458,6 +536,7 @@ extension type StreamBinaryReader._(_StreamReaderState _s) @pragma('dart2js:tryInline') String readVarString({bool allowMalformed = false}) { final length = readVarUint(); + return readString(length, allowMalformed: allowMalformed); } @@ -472,6 +551,7 @@ extension type StreamBinaryReader._(_StreamReaderState _s) bool allowMalformed = false, }) { final length = _readLength(lengthEncoding); + return readString(length, allowMalformed: allowMalformed); } @@ -494,7 +574,9 @@ extension type StreamBinaryReader._(_StreamReaderState _s) if (length < 0) { throw RangeError.value(length, 'length', 'Length must be non-negative'); } + _checkAvailable(length); + var remaining = length; while (remaining > 0) { @@ -521,6 +603,7 @@ extension type StreamBinaryReader._(_StreamReaderState _s) if (length < 0) { throw RangeError.value(length, 'length', 'Length must be non-negative'); } + return _s.availableBytes >= length; } } @@ -528,10 +611,21 @@ extension type StreamBinaryReader._(_StreamReaderState _s) /// Internal state holder for [StreamBinaryReader]. final class _StreamReaderState extends ChunkedTransactionalState implements TransactionalReader> { - _StreamReaderState() : bookmarkReaderOffset = Int32List(16), super(); + _StreamReaderState() + : bookmarkReaderOffset = Int32List(16), + scratchBuffer = Uint8List(8), + super() { + scratchData = ByteData.sublistView(scratchBuffer); + } BinaryReader? currentReader; + /// Pre-allocated buffer for zero-allocation cross-chunk primitive reads. + final Uint8List scratchBuffer; + + /// ByteData view for the [scratchBuffer]. + late final ByteData scratchData; + // Zero-allocation bookmarks using parallel arrays Int32List bookmarkReaderOffset; diff --git a/lib/src/stream/transactional_stream_transformer.dart b/lib/src/stream/transactional_stream_transformer.dart index eea1527..ec69bed 100644 --- a/lib/src/stream/transactional_stream_transformer.dart +++ b/lib/src/stream/transactional_stream_transformer.dart @@ -1,4 +1,5 @@ import 'dart:async'; + import 'transactional_reader.dart'; /// A [StreamTransformer] that simplifies parsing messages from a stream diff --git a/pubspec.yaml b/pubspec.yaml index 69d33a7..8a127d2 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -1,7 +1,7 @@ name: pro_binary description: Efficient binary serialization library for Dart. Encodes and decodes various data types. -version: 5.1.0 +version: 5.2.0 repository: https://github.com/pro100andrey/pro_binary issue_tracker: https://github.com/pro100andrey/pro_binary/issues @@ -31,7 +31,7 @@ dependencies: dev_dependencies: benchmark_harness: ^2.4.0 - pro_lints: ^6.0.0 + pro_lints: ^6.1.0 test: ^1.31.1 diff --git a/test/stream/stream_binary_reader_test.dart b/test/stream/stream_binary_reader_test.dart index 7468e1a..02a0fc6 100644 --- a/test/stream/stream_binary_reader_test.dart +++ b/test/stream/stream_binary_reader_test.dart @@ -25,6 +25,27 @@ void main() { expect(reader.readUint32(), equals(42)); }); + test('readFloat64 handles complex chunk boundary', () { + // 3.141592653589793 in big-endian bytes + final bytes = [0x40, 0x09, 0x21, 0xFB, 0x54, 0x44, 0x2D, 0x18]; + reader + ..addChunk(bytes.sublist(0, 2)) + ..addChunk(bytes.sublist(2, 5)) + ..addChunk(bytes.sublist(5, 8)); + expect(reader.readFloat64(), equals(3.141592653589793)); + }); + + test('readInt64 handles chunk boundary', () { + // 0x0102030405060708 + final bytes = [1, 2, 3, 4, 5, 6, 7, 8]; + reader + ..addChunk(bytes.sublist(0, 7)) + ..addChunk(bytes.sublist(7, 8)); + // + // ignore: avoid_js_rounded_ints + expect(reader.readInt64(), equals(0x0102030405060708)); + }); + test('readVarUint handles chunk boundary', () { // 300 is [0xAC, 0x02] reader diff --git a/test/unit/binary_writer_pool_test.dart b/test/unit/binary_writer_pool_test.dart index 98ee62c..ffe4930 100644 --- a/test/unit/binary_writer_pool_test.dart +++ b/test/unit/binary_writer_pool_test.dart @@ -125,5 +125,45 @@ void main() { }, ); }); + + group('Custom Instances', () { + test('custom pool is isolated from global pool', () { + final customPool = BinaryWriterPool(maxPoolSize: 5); + final writer = customPool.acquireInstance(); + customPool.releaseInstance(writer); + + expect(customPool.instanceStats.pooled, equals(1)); + expect(BinaryWriterPool.stats.pooled, equals(0)); + }); + + test('custom pool uses its own configuration', () { + final customPool = BinaryWriterPool( + maxPoolSize: 2, + initialBufferSize: 256, + maxReusableCapacity: 512, + ); + + final stats = customPool.instanceStats; + expect(stats.maxPoolSize, equals(2)); + expect(stats.initialBufferSize, equals(256)); + expect(stats.maxReusableCapacity, equals(512)); + }); + + test('withWriterInstance works correctly', () { + final customPool = BinaryWriterPool(); + final result = customPool.withWriterInstance((w) { + w.writeUint8(100); + return w.toBytes(); + }); + expect(result, equals([100])); + expect(customPool.instanceStats.pooled, equals(1)); + }); + + test('reconfigure updates instance settings', () { + final customPool = BinaryWriterPool(maxPoolSize: 5) + ..reconfigure(maxPoolSize: 10); + expect(customPool.instanceStats.maxPoolSize, equals(10)); + }); + }); }); } diff --git a/test/unit/binary_writer_skip_shift_test.dart b/test/unit/binary_writer_skip_shift_test.dart new file mode 100644 index 0000000..afa4878 --- /dev/null +++ b/test/unit/binary_writer_skip_shift_test.dart @@ -0,0 +1,231 @@ +import 'dart:convert'; + +import 'package:pro_binary/pro_binary.dart'; +import 'package:test/test.dart'; + +void main() { + group('BinaryWriter skip', () { + late BinaryWriter writer; + + setUp(() { + writer = BinaryWriter(); + }); + + test('skip 0 bytes is a no-op', () { + writer.writeUint32(42); + final before = writer.bytesWritten; + writer.skip(0); + expect(writer.bytesWritten, equals(before)); + }); + + test('skip advances write position', () { + writer.writeUint32(42); + expect(writer.bytesWritten, equals(4)); + writer.skip(8); + expect(writer.bytesWritten, equals(12)); + }); + + test('skip expands buffer when needed', () { + final initialCapacity = writer.capacity; + writer.skip(initialCapacity + 100); + expect(writer.bytesWritten, equals(initialCapacity + 100)); + expect(writer.capacity, greaterThan(initialCapacity)); + }); + + test('skip negative throws RangeError', () { + expect(() => writer.skip(-1), throwsA(isA())); + }); + + test('skip then write verifies data integrity', () { + writer + ..writeUint32(0xDEADBEEF) + ..skip(4) + ..writeUint32(0xCAFEBABE); + // skip advances offset, so second write goes to position 8 + expect(writer[8], equals(0xCA)); + expect(writer[9], equals(0xFE)); + expect(writer[10], equals(0xBA)); + expect(writer[11], equals(0xBE)); + }); + + test('skip then backpatch overwrites skipped bytes', () { + writer + ..writeUint32(0) // placeholder + ..skip(4) // reserve 4 bytes + ..writeUint32(0x12345678); + // backpatch: overwrite the skipped region + writer[0] = 0x99; + writer[1] = 0x88; + writer[2] = 0x77; + writer[3] = 0x66; + expect(writer[0], equals(0x99)); + expect(writer[1], equals(0x88)); + expect(writer[2], equals(0x77)); + expect(writer[3], equals(0x66)); + }); + }); + + group('BinaryWriter shiftBytes', () { + late BinaryWriter writer; + + setUp(() { + writer = BinaryWriter(); + }); + + test('shift block left within buffer', () { + writer + ..writeUint32(0x11111111) + ..writeUint32(0x22222222) + ..writeUint32(0x33333333) + // shift bytes [4..8) to position 0 + ..shiftBytes(4, 8, 0); + expect(writer.bytesWritten, equals(12)); + expect(writer[0], equals(0x22)); + expect(writer[1], equals(0x22)); + expect(writer[2], equals(0x22)); + expect(writer[3], equals(0x22)); + }); + + test('shift to position 0', () { + writer + ..writeUint8(0) + ..writeUint8(0) + ..writeUint8(0xAA) + ..writeUint8(0xBB) + ..shiftBytes(2, 4, 0); + // end == offset, so offset becomes target + length = 2 + expect(writer.bytesWritten, equals(2)); + expect(writer[0], equals(0xAA)); + expect(writer[1], equals(0xBB)); + }); + + test('shift with target == start is a no-op', () { + writer + ..writeUint8(0xAA) + ..writeUint8(0xBB) + ..shiftBytes(0, 2, 0); + expect(writer[0], equals(0xAA)); + expect(writer[1], equals(0xBB)); + }); + + test('shift with start == end (empty range) is a no-op', () { + writer + ..writeUint8(0xAA) + ..writeUint8(0xBB) + ..shiftBytes(2, 2, 0); + expect(writer[0], equals(0xAA)); + expect(writer[1], equals(0xBB)); + }); + + test('shift updates offset when shifting the end boundary', () { + writer + ..writeUint32(0x11111111) + ..writeUint32(0x22222222) + ..writeUint32(0x33333333); + expect(writer.bytesWritten, equals(12)); + // shift [8..12) to [4..8), end == offset so offset should update + writer.shiftBytes(8, 12, 4); + expect(writer.bytesWritten, equals(8)); + }); + + test('shift does not update offset when not shifting the end', () { + writer + ..writeUint32(0x11111111) + ..writeUint32(0x22222222) + ..writeUint32(0x33333333); + expect(writer.bytesWritten, equals(12)); + // shift [4..8) to [0..4), end != offset so offset stays + writer.shiftBytes(4, 8, 0); + expect(writer.bytesWritten, equals(12)); + }); + + test('shift negative start throws AssertionError', () { + writer.writeUint8(0xAA); + expect(() => writer.shiftBytes(-1, 1, 0), throwsA(isA())); + }); + + test('shift end < start throws AssertionError', () { + writer.writeUint8(0xAA); + expect(() => writer.shiftBytes(1, 0, 0), throwsA(isA())); + }); + + test('shift end > offset throws AssertionError', () { + writer.writeUint8(0xAA); + expect(() => writer.shiftBytes(0, 5, 0), throwsA(isA())); + }); + + test('shift target < 0 throws AssertionError', () { + writer.writeUint8(0xAA); + expect(() => writer.shiftBytes(0, 1, -1), throwsA(isA())); + }); + + test('shift target > start throws AssertionError', () { + writer + ..writeUint8(0xAA) + ..writeUint8(0xBB); + expect(() => writer.shiftBytes(0, 1, 1), throwsA(isA())); + }); + + test('shift preserves data integrity for non-shifted bytes', () { + writer + ..writeUint8(0x01) + ..writeUint8(0x02) + ..writeUint8(0x03) + ..writeUint8(0x04) + ..writeUint8(0x05) + // shift [2..4) to [0..2) + ..shiftBytes(2, 4, 0); + expect(writer.bytesWritten, equals(5)); + expect(writer[0], equals(0x03)); + expect(writer[1], equals(0x04)); + expect(writer[2], equals(0x03)); + expect(writer[3], equals(0x04)); + expect(writer[4], equals(0x05)); + }); + }); + + group('BinaryWriter Reserve & Backpatch pattern', () { + late BinaryWriter writer; + + setUp(() { + writer = BinaryWriter(); + }); + + test('full reserve and backpatch with shift', () { + // Reserve 8 bytes for header (over-allocated) + writer + ..skip(8) + // Write payload + ..writeUint32(0x11111111) + ..writeUint32(0x22222222); + expect(writer.bytesWritten, equals(16)); + + // Actual header is 4 bytes, shift payload left by 4 + writer.shiftBytes(8, 16, 4); + expect(writer.bytesWritten, equals(12)); + + // Now backpatch the 4-byte header using seek to go back + writer + ..seek(0) + ..writeUint32(12); + // 12 = 0x0000000C in big-endian + expect(writer[0], equals(0x00)); + expect(writer[1], equals(0x00)); + expect(writer[2], equals(0x00)); + expect(writer[3], equals(0x0C)); + }); + + test('reserve and backpatch via direct write', () { + writer + ..skip(4) + ..writeVarString('hello'); + // Backpatch the length at position 0 + final len = utf8.encode('hello').length; + writer[0] = len & 0xFF; + writer[1] = (len >> 8) & 0xFF; + writer[2] = (len >> 16) & 0xFF; + writer[3] = (len >> 24) & 0xFF; + expect(writer[0], equals(len & 0xFF)); + }); + }); +}