From 1a6329f5be6fbf2be87d2e3ccd960bebc582863e Mon Sep 17 00:00:00 2001 From: Yagiz Nizipli Date: Mon, 12 Aug 2024 10:55:30 -0400 Subject: [PATCH] add zlib.Gzip class --- src/node/internal/internal_errors.ts | 6 + src/node/internal/internal_zlib.ts | 301 ++++++++++++++++++-- src/node/internal/internal_zlib_base.ts | 325 ++++++++++++++++++++++ src/node/internal/streams_util.d.ts | 8 + src/node/internal/validators.ts | 44 +++ src/node/internal/zlib.d.ts | 20 ++ src/node/tsconfig.json | 8 +- src/node/zlib.ts | 55 +++- src/workerd/api/node/zlib-util.c++ | 347 ++++++++++++++++++++++++ src/workerd/api/node/zlib-util.h | 271 +++++++++++++++--- 10 files changed, 1328 insertions(+), 57 deletions(-) create mode 100644 src/node/internal/internal_zlib_base.ts create mode 100644 src/node/internal/streams_util.d.ts diff --git a/src/node/internal/internal_errors.ts b/src/node/internal/internal_errors.ts index 2a85cf33135..00727c0afe3 100644 --- a/src/node/internal/internal_errors.ts +++ b/src/node/internal/internal_errors.ts @@ -508,6 +508,12 @@ export class ERR_STREAM_UNSHIFT_AFTER_END_EVENT extends NodeError { } } +export class ERR_BUFFER_TOO_LARGE extends NodeRangeError { + constructor(value: number) { + super("ERR_BUFFER_TOO_LARGE", `Cannot create a Buffer larger than ${value} bytes`); + } +} + export function aggregateTwoErrors(innerError: any, outerError: any) { if (innerError && outerError && innerError !== outerError) { if (Array.isArray(outerError.errors)) { diff --git a/src/node/internal/internal_zlib.ts b/src/node/internal/internal_zlib.ts index 275c13a631c..0568e27a2fb 100644 --- a/src/node/internal/internal_zlib.ts +++ b/src/node/internal/internal_zlib.ts @@ -1,22 +1,43 @@ +// Copyright (c) 2017-2022 Cloudflare, Inc. +// Licensed under the Apache 2.0 license found in the LICENSE file or at: +// https://opensource.org/licenses/Apache-2.0 +// Copyright Joyent and Node contributors. All rights reserved. MIT license. + import { default as zlibUtil } from 'node-internal:zlib'; import { Buffer } from 'node-internal:internal_buffer'; -import { validateUint32 } from 'node-internal:validators'; -import { isArrayBufferView } from 'node-internal:internal_types'; +import { validateUint32, checkRangesOrGetDefault } from 'node-internal:validators'; import { ERR_INVALID_ARG_TYPE } from 'node-internal:internal_errors'; +import { + isArrayBufferView, + isAnyArrayBuffer +} from 'node-internal:internal_types'; +import type { ZlibOptions } from "node:zlib"; +import { ZlibBase } from 'node-internal:internal_zlib_base'; +import assert from 'node:assert'; -function crc32(data: ArrayBufferView | string, value: number = 0): number { - if (typeof data === 'string') { - data = Buffer.from(data); - } else if (!isArrayBufferView(data)) { - throw new ERR_INVALID_ARG_TYPE('data', 'ArrayBufferView', typeof data); - } - validateUint32(value, 'value'); - return zlibUtil.crc32(data, value); -} +const { + CONST_DEFLATE, + CONST_DEFLATERAW, + CONST_INFLATE, + CONST_INFLATERAW, + CONST_GUNZIP, + CONST_GZIP, + CONST_UNZIP, + CONST_Z_DEFAULT_STRATEGY, + CONST_Z_DEFAULT_MEMLEVEL, + CONST_Z_DEFAULT_WINDOWBITS, + CONST_Z_DEFAULT_COMPRESSION, + CONST_Z_FIXED, + CONST_Z_MAX_LEVEL, + CONST_Z_MAX_MEMLEVEL, + CONST_Z_MAX_WINDOWBITS, + CONST_Z_MIN_LEVEL, + CONST_Z_MIN_MEMLEVEL, + CONST_Z_SYNC_FLUSH, +} = zlibUtil; - -const constPrefix = 'CONST_'; -const constants = {}; +const constPrefix = 'CONST_'; +export const constants: Record = {}; // eslint-disable-next-line @typescript-eslint/no-unsafe-argument Object.defineProperties(constants, Object.fromEntries(Object.entries(Object.getPrototypeOf(zlibUtil)) @@ -29,7 +50,253 @@ Object.defineProperties(constants, Object.fromEntries(Object.entries(Object.getP }]) )); -export { - crc32, - constants, +export function crc32(data: ArrayBufferView | string, value: number = 0): number { + if (typeof data === 'string') { + data = Buffer.from(data); + } else if (!isArrayBufferView(data)) { + throw new ERR_INVALID_ARG_TYPE('data', 'ArrayBufferView', typeof data); + } + validateUint32(value, 'value'); + return zlibUtil.crc32(data, value); +} + +function _processCallback(): void { + // // This callback's context (`this`) is the `_handle` (ZCtx) object. It is + // // important to null out the values once they are no longer needed since + // // `_handle` can stay in memory long after the buffer is needed. + // const handle = this; + // const self = this[owner_symbol]; + // const state = self._writeState; + + // if (self.destroyed) { + // this.buffer = null; + // this.cb(); + // return; + // } + + // const availOutAfter = state[0]; + // const availInAfter = state[1]; + + // const inDelta = handle.availInBefore - availInAfter; + // self.bytesWritten += inDelta; + + // const have = handle.availOutBefore - availOutAfter; + // let streamBufferIsFull = false; + // if (have > 0) { + // const out = self._outBuffer.slice(self._outOffset, self._outOffset + have); + // self._outOffset += have; + // streamBufferIsFull = !self.push(out); + // } else { + // assert(have === 0, 'have should not go down'); + // } + + // if (self.destroyed) { + // this.cb(); + // return; + // } + + // // Exhausted the output buffer, or used all the input create a new one. + // if (availOutAfter === 0 || self._outOffset >= self._chunkSize) { + // handle.availOutBefore = self._chunkSize; + // self._outOffset = 0; + // self._outBuffer = Buffer.allocUnsafe(self._chunkSize); + // } + + // if (availOutAfter === 0) { + // // Not actually done. Need to reprocess. + // // Also, update the availInBefore to the availInAfter value, + // // so that if we have to hit it a third (fourth, etc.) time, + // // it'll have the correct byte counts. + // handle.inOff += inDelta; + // handle.availInBefore = availInAfter; + + + // if (!streamBufferIsFull) { + // this.write(handle.flushFlag, + // this.buffer, // in + // handle.inOff, // in_off + // handle.availInBefore, // in_len + // self._outBuffer, // out + // self._outOffset, // out_off + // self._chunkSize); // out_len + // } else { + // const oldRead = self._read; + // self._read = (n) => { + // self._read = oldRead; + // this.write(handle.flushFlag, + // this.buffer, // in + // handle.inOff, // in_off + // handle.availInBefore, // in_len + // self._outBuffer, // out + // self._outOffset, // out_off + // self._chunkSize); // out_len + // self._read(n); + // }; + // } + // return; + // } + + // if (availInAfter > 0) { + // // If we have more input that should be written, but we also have output + // // space available, that means that the compression library was not + // // interested in receiving more data, and in particular that the input + // // stream has ended early. + // // This applies to streams where we don't check data past the end of + // // what was consumed; that is, everything except Gunzip/Unzip. + // self.push(null); + // } + + // // Finished with the chunk. + // this.buffer = null; + // this.cb(); +} + +class Zlib extends ZlibBase { + public _level = CONST_Z_DEFAULT_COMPRESSION; + public _strategy = CONST_Z_DEFAULT_STRATEGY; + + public constructor(options: ZlibOptions | null | undefined, mode: number) { + let windowBits = CONST_Z_DEFAULT_WINDOWBITS; + let level = CONST_Z_DEFAULT_COMPRESSION; + let memLevel = CONST_Z_DEFAULT_MEMLEVEL; + let strategy = CONST_Z_DEFAULT_STRATEGY; + let dictionary: ZlibOptions['dictionary'] | undefined = undefined; + + if (options != null) { + // Special case: + // - Compression: 0 is an invalid case. + // - Decompression: 0 indicates zlib to use the window size in the header of the compressed stream. + if ((options.windowBits == null || options.windowBits === 0) && (mode === CONST_INFLATE || mode === CONST_GUNZIP || mode === CONST_UNZIP)) { + windowBits = 0; + } else { + // `{ windowBits: 8 }` is valid for DEFLATE but not for GZIP. + const min = zlibUtil.CONST_Z_MIN_WINDOWBITS + (mode === CONST_GZIP ? 1 : 0); + windowBits = checkRangesOrGetDefault(options.windowBits, 'options.windowBits', min, CONST_Z_MAX_WINDOWBITS, CONST_Z_DEFAULT_WINDOWBITS); + } + + level = checkRangesOrGetDefault(options.level, 'options.level', CONST_Z_MIN_LEVEL, CONST_Z_MAX_LEVEL, CONST_Z_DEFAULT_MEMLEVEL); + memLevel = checkRangesOrGetDefault(options.memLevel, 'options.memLevel', CONST_Z_MIN_MEMLEVEL, CONST_Z_MAX_MEMLEVEL, CONST_Z_DEFAULT_MEMLEVEL); + strategy = checkRangesOrGetDefault(options.strategy, 'options.strategy', CONST_Z_DEFAULT_STRATEGY, CONST_Z_FIXED, CONST_Z_DEFAULT_STRATEGY); + dictionary = options.dictionary; + + if (dictionary != null && !isArrayBufferView(dictionary)) { + if (isAnyArrayBuffer(dictionary)) { + dictionary = Buffer.from(dictionary); + } else { + throw new ERR_INVALID_ARG_TYPE('options.dictionary', ['Buffer', 'TypedArray', 'DataView', 'ArrayBuffer'], dictionary); + } + } + } + + + const writeState = new Uint32Array(2); + const handle = new zlibUtil.ZlibStream(mode); + // TODO(soon): Move this to constructor. + handle.initialize(windowBits, level, memLevel, strategy, writeState, _processCallback, dictionary); + + super(options ?? {}, mode, handle); + + this._level = level; + this._strategy = strategy; + this._handle = handle; + this._writeState = writeState; + } + + public params(level: number, strategy: number, callback: () => never): void { + checkRangesOrGetDefault(level, 'level', CONST_Z_MIN_LEVEL, CONST_Z_MAX_LEVEL); + checkRangesOrGetDefault(strategy, 'strategy', CONST_Z_DEFAULT_STRATEGY, CONST_Z_FIXED); + + if (this._level !== level || this._strategy !== strategy) { + this.flush(CONST_Z_SYNC_FLUSH, this.#paramsAfterFlushCallback.bind(this, level, strategy, callback)); + } else { + queueMicrotask(callback); + } + } + + // This callback is used by `.params()` to wait until a full flush happened + // before adjusting the parameters. In particular, the call to the native + // `params()` function should not happen while a write is currently in progress + // on the threadpool. + #paramsAfterFlushCallback(level: number, strategy: number, callback: () => void): void { + assert(this._handle, 'zlib binding closed'); + this._handle.params(level, strategy); + if (!this.destroyed) { + this._level = level; + this._strategy = strategy; + callback?.(); + } + } +} + +export class Gzip extends Zlib { + public constructor(options: ZlibOptions) { + super(options, CONST_GZIP); + } +} + +export class Gunzip extends Zlib { + public constructor(options: ZlibOptions) { + super(options, CONST_GUNZIP); + } +} + +export class Deflate extends Zlib { + public constructor(options: ZlibOptions) { + super(options, CONST_DEFLATE); + } +} + +export class DeflateRaw extends Zlib { + public constructor(options: ZlibOptions) { + if (options?.windowBits === 8) { + options.windowBits = 9 + } + super(options, CONST_DEFLATERAW); + } +} + +export class Inflate extends Zlib { + public constructor(options: ZlibOptions) { + super(options, CONST_INFLATE); + } +} + +export class InflateRaw extends Zlib { + public constructor(options: ZlibOptions) { + super(options, CONST_INFLATERAW); + } +} + +export class Unzip extends Zlib { + public constructor(options: ZlibOptions) { + super(options, CONST_UNZIP); + } +} + +export function createGzip(options: ZlibOptions): Gzip { + return new Gzip(options); +} + +export function createGunzip(options: ZlibOptions): Gunzip { + return new Gunzip(options); +} + +export function createDeflate(options: ZlibOptions): Deflate { + return new Deflate(options); +} + +export function createDeflateRaw(options: ZlibOptions): DeflateRaw { + return new DeflateRaw(options); +} + +export function createInflate(options: ZlibOptions): Inflate { + return new Inflate(options); +} + +export function createInflateRaw(options: ZlibOptions): InflateRaw { + return new InflateRaw(options); +} + +export function createUnzip(options: ZlibOptions): Unzip { + return new Unzip(options); } diff --git a/src/node/internal/internal_zlib_base.ts b/src/node/internal/internal_zlib_base.ts new file mode 100644 index 00000000000..807a657a723 --- /dev/null +++ b/src/node/internal/internal_zlib_base.ts @@ -0,0 +1,325 @@ +// Copyright (c) 2017-2022 Cloudflare, Inc. +// Licensed under the Apache 2.0 license found in the LICENSE file or at: +// https://opensource.org/licenses/Apache-2.0 +// Copyright Joyent and Node contributors. All rights reserved. MIT license. + +import { default as zlibUtil } from 'node-internal:zlib'; +import { Buffer } from 'node-internal:internal_buffer'; +import { checkRangesOrGetDefault, checkFiniteNumber } from 'node-internal:validators'; +import { ERR_OUT_OF_RANGE, ERR_BUFFER_TOO_LARGE } from 'node-internal:internal_errors'; +import { kMaxLength } from 'node-internal:internal_buffer'; +import { Transform } from 'node-internal:streams_transform' +import type { ZlibOptions } from 'node:zlib'; +import type { DuplexOptions } from 'node:stream'; +import { eos as finished } from './streams_util'; +import assert from 'node-internal:internal_assert'; + +const { + CONST_BROTLI_DECODE, + CONST_BROTLI_ENCODE, + CONST_BROTLI_OPERATION_EMIT_METADATA, + CONST_BROTLI_OPERATION_PROCESS, + CONST_Z_BLOCK, + CONST_Z_DEFAULT_CHUNK, + CONST_Z_MIN_CHUNK, + CONST_Z_NO_FLUSH, + CONST_Z_FINISH, + CONST_Z_FULL_FLUSH, + CONST_Z_SYNC_FLUSH, + CONST_Z_PARTIAL_FLUSH, +} = zlibUtil; + +const FLUSH_BOUND = [ + [ CONST_Z_NO_FLUSH, CONST_Z_BLOCK ], + [ CONST_BROTLI_OPERATION_PROCESS, CONST_BROTLI_OPERATION_EMIT_METADATA ], +] as const; +const FLUSH_BOUND_IDX_NORMAL: number = 0; +const FLUSH_BOUND_IDX_BROTLI: number = 1; + +const kFlushFlag = Symbol('kFlushFlag'); + +// If a flush is scheduled while another flush is still pending, a way to figure +// out which one is the "stronger" flush is needed. +// This is currently only used to figure out which flush flag to use for the +// last chunk. +// Roughly, the following holds: +// Z_NO_FLUSH (< Z_TREES) < Z_BLOCK < Z_PARTIAL_FLUSH < +// Z_SYNC_FLUSH < Z_FULL_FLUSH < Z_FINISH +const flushiness: number[] = []; +const kFlushFlagList = [CONST_Z_NO_FLUSH, CONST_Z_BLOCK, CONST_Z_PARTIAL_FLUSH, + CONST_Z_SYNC_FLUSH, CONST_Z_FULL_FLUSH, CONST_Z_FINISH]; +for (let i = 0; i < kFlushFlagList.length; i++) { + flushiness[kFlushFlagList[i] as number] = i; +} + +type BufferWithFlushFlag = Buffer & { [kFlushFlag]: number}; + +// Set up a list of 'special' buffers that can be written using .write() +// from the .flush() code as a way of introducing flushing operations into the +// write sequence. +const kFlushBuffers: BufferWithFlushFlag[] = []; +{ + const dummyArrayBuffer = new ArrayBuffer(0); + for (const flushFlag of kFlushFlagList) { + const buf = Buffer.from(dummyArrayBuffer) as BufferWithFlushFlag; + buf[kFlushFlag] = flushFlag; + kFlushBuffers[flushFlag] = buf; + } +} + +type ZlibBaseDefaultOptions = { + flush?: number | undefined; + finishFlush?: number | undefined; + fullFlush?: number | undefined +} + +const zlibDefaultOptions = { + flush: CONST_Z_NO_FLUSH, + finishFlush: CONST_Z_FINISH, + fullFlush: CONST_Z_FULL_FLUSH, +}; + +export class ZlibBase extends Transform { + public bytesWritten: number = 0; + + public _maxOutputLength: number; + public _outBuffer: Buffer; + public _outOffset: number = 0; + public _chunkSize: number; + public _defaultFlushFlag: number | undefined; + public _finishFlushFlag: number | undefined; + public _defaultFullFlushFlag: number | undefined; + public _info: unknown; + public _handle: zlibUtil.ZlibStream | null = null; + public _writeState = new Uint32Array(2); + + public constructor(opts: ZlibOptions & DuplexOptions, mode: number, handle: zlibUtil.ZlibStream, { flush, finishFlush, fullFlush }: ZlibBaseDefaultOptions = zlibDefaultOptions) { + let chunkSize = CONST_Z_DEFAULT_CHUNK; + let maxOutputLength = kMaxLength; + + let flushBoundIdx; + if (mode !== CONST_BROTLI_ENCODE && mode !== CONST_BROTLI_DECODE) { + flushBoundIdx = FLUSH_BOUND_IDX_NORMAL; + } else { + flushBoundIdx = FLUSH_BOUND_IDX_BROTLI; + } + + if (opts) { + if (opts.chunkSize != null) { + chunkSize = opts.chunkSize; + } + if (!checkFiniteNumber(chunkSize, 'options.chunkSize')) { + chunkSize = CONST_Z_DEFAULT_CHUNK; + } else if (chunkSize < CONST_Z_MIN_CHUNK) { + throw new ERR_OUT_OF_RANGE('options.chunkSize', + `>= ${CONST_Z_MIN_CHUNK}`, chunkSize); + } + + flush = checkRangesOrGetDefault( + opts.flush, 'options.flush', + FLUSH_BOUND[flushBoundIdx]![0], FLUSH_BOUND[flushBoundIdx]![1], flush as number); + + finishFlush = checkRangesOrGetDefault( + opts.finishFlush, 'options.finishFlush', + FLUSH_BOUND[flushBoundIdx]![0], FLUSH_BOUND[flushBoundIdx]![1], + finishFlush as number); + + maxOutputLength = checkRangesOrGetDefault( + opts.maxOutputLength, 'options.maxOutputLength', + 1, kMaxLength, kMaxLength); + + if (opts.encoding || opts.objectMode || opts.writableObjectMode) { + opts = { ...opts }; + opts.encoding = undefined; + opts.objectMode = false; + opts.writableObjectMode = false; + } + } + + // @ts-expect-error TODO: Find a way to avoid having "unknown" + super({ autoDestroy: true, ...opts } as unknown); + this._handle = handle; + // handle[owner_symbol] = this; + this._outBuffer = Buffer.allocUnsafe(chunkSize); + this._outOffset = 0; + + this._chunkSize = chunkSize; + this._defaultFlushFlag = flush; + this._finishFlushFlag = finishFlush; + this._defaultFullFlushFlag = fullFlush; + this._info = opts && opts.info; + this._maxOutputLength = maxOutputLength; + } + + public flush(kind: number | (() => void), callback: (() => void) | undefined = undefined): void { + if (typeof kind === 'function' || (kind == null && !callback)) { + callback = kind; + kind = this._defaultFlushFlag as number; + } + + if (this.writableFinished) { + if (callback) { + queueMicrotask(callback); + } + } else if (this.writableEnded) { + if (callback) { + queueMicrotask(callback); + } + } else { + // encoding is not used. utf8 is passed to conform to typescript. + this.write(kFlushBuffers[kind], 'utf8', callback); + } + } + + public close(callback?: (() => void)): void { + if (callback) { + finished(this, callback); + } + this.destroy(); + } + + public override _destroy(err: T, callback: (err: T) => never): void { + this.#close(); + callback(err); + } + + public override _transform(chunk: Buffer & { [kFlushFlag]?: number }, _encoding: BufferEncoding, cb: () => void): void { + let flushFlag = this._defaultFlushFlag; + // We use a 'fake' zero-length chunk to carry information about flushes from + // the public API to the actual stream implementation. + if (typeof chunk[kFlushFlag] === 'number') { + flushFlag = chunk[kFlushFlag]; + } + + // For the last chunk, also apply `_finishFlushFlag`. + if (this.writableEnded && this.writableLength === chunk.byteLength) { + flushFlag = maxFlush(flushFlag as number, this._finishFlushFlag as number); + } + this.#processChunk(chunk, flushFlag as number, cb); + } + + // _processChunk is left for backwards compatibility. + public _processChunk(chunk: Buffer, flushFlag: number, cb?: () => void): NodeJS.TypedArray | undefined { + if (typeof cb === 'function') { + this.#processChunk(chunk, flushFlag, cb); + return undefined; + } else { + return this.#processChunkSync(chunk, flushFlag); + } + } + + #close(): void { + // Caller may invoke .close after a zlib error (which will null _handle). + this._handle?.close(); + this._handle = null; + } + + #processChunk(chunk: Buffer, flushFlag: number, cb: () => void): void { + if (this._handle == null) { + return queueMicrotask(cb); + } + + this._handle.availOutBefore = this._chunkSize - this._outOffset; + this._handle.availInBefore = chunk.byteLength; + this._handle.flushFlag = flushFlag; + + this._handle.setCallback(cb); + + this._handle.write( + flushFlag, + chunk, // in + 0, // in_off + this._handle.availInBefore, // in_len + this._outBuffer, // out + this._outOffset, // out_off + this._handle.availOutBefore, // out_len + ); + } + + #processChunkSync(chunk: NodeJS.TypedArray, flushFlag: number): NodeJS.TypedArray { + let availInBefore = chunk.byteLength; + let availOutBefore = this._chunkSize - this._outOffset; + let inOff = 0; + let availOutAfter: number | undefined; + let availInAfter: number | undefined; + + let buffers: (Buffer | Uint8Array)[] = []; + let nread = 0; + let inputRead = 0; + const state = this._writeState; + let buffer = this._outBuffer; + let offset = this._outOffset; + const chunkSize = this._chunkSize; + + let error; + this.on('error', function onError(er) { + error = er; + }); + + while (true) { + this._handle?.write(flushFlag, + chunk, // in + inOff, // in_off + availInBefore, // in_len + buffer, // out + offset, // out_off + availOutBefore); // out_len + if (error) + throw error; + + // `as unknown as []` is done to prevent typescript from mistyping + // a fixed array of 2 having undefined value. + [availOutAfter, availInAfter] = state as unknown as [number, number]; + + const inDelta = (availInBefore - availInAfter); + inputRead += inDelta; + + const have = availOutBefore - availOutAfter; + if (have > 0) { + const out = buffer.slice(offset, offset + have); + offset += have; + buffers.push(out); + nread += out.byteLength; + + if (nread > this._maxOutputLength) { + this.#close(); + throw new ERR_BUFFER_TOO_LARGE(this._maxOutputLength); + } + + } else { + assert.strictEqual(have, 0, 'have should not go down'); + } + + // Exhausted the output buffer, or used all the input create a new one. + if (availOutAfter === 0 || offset >= chunkSize) { + availOutBefore = chunkSize; + offset = 0; + buffer = Buffer.allocUnsafe(chunkSize); + } + + if (availOutAfter === 0) { + // Not actually done. Need to reprocess. + // Also, update the availInBefore to the availInAfter value, + // so that if we have to hit it a third (fourth, etc.) time, + // it'll have the correct byte counts. + inOff += inDelta; + availInBefore = availInAfter; + } else { + break; + } + } + + this.bytesWritten = inputRead; + this.#close(); + + if (nread === 0) + return Buffer.alloc(0); + + return (buffers.length === 1 ? buffers[0] as NodeJS.TypedArray : Buffer.concat(buffers, nread)); + } + +} + +function maxFlush(a: number, b: number): number { + return (flushiness[a] as number) > (flushiness[b] as number) ? a : b; +} diff --git a/src/node/internal/streams_util.d.ts b/src/node/internal/streams_util.d.ts new file mode 100644 index 00000000000..1405a0ccf6d --- /dev/null +++ b/src/node/internal/streams_util.d.ts @@ -0,0 +1,8 @@ +import type { FinishedOptions } from 'node:stream'; + +type FinishedStream = NodeJS.ReadableStream | NodeJS.WritableStream | NodeJS.ReadWriteStream; +type FinishedCallback = (err?: NodeJS.ErrnoException | null) => void; + +export function eos(stream: FinishedStream, options: FinishedOptions): void; +export function eos(stream: FinishedStream, options: FinishedOptions, callback?: FinishedCallback): void; +export function eos(stream: FinishedStream, callback?: FinishedCallback): void; diff --git a/src/node/internal/validators.ts b/src/node/internal/validators.ts index 09efb29e877..7462c074f2b 100644 --- a/src/node/internal/validators.ts +++ b/src/node/internal/validators.ts @@ -203,6 +203,46 @@ export function validateArray(value: unknown, name: string, minLength = 0) { } } +// 1. Returns false for undefined and NaN +// 2. Returns true for finite numbers +// 3. Throws ERR_INVALID_ARG_TYPE for non-numbers +// 4. Throws ERR_OUT_OF_RANGE for infinite numbers +export function checkFiniteNumber(number: unknown, name: string): number is number { + // Common case + if (number === undefined) { + return false; + } + + if (Number.isFinite(number)) { + return true; // Is a valid number + } + + if (Number.isNaN(number)) { + return false; + } + + validateNumber(number, name); + + // Infinite numbers + throw new ERR_OUT_OF_RANGE(name, 'a finite number', number); +} + +// 1. Returns def for number when it's undefined or NaN +// 2. Returns number for finite numbers >= lower and <= upper +// 3. Throws ERR_INVALID_ARG_TYPE for non-numbers +// 4. Throws ERR_OUT_OF_RANGE for infinite numbers or numbers > upper or < lower +export function checkRangesOrGetDefault(number: unknown, name: string, lower: number, upper: number, def: number): number; +export function checkRangesOrGetDefault(number: unknown, name: string, lower: number, upper: number, def?: number | undefined): number | undefined; +export function checkRangesOrGetDefault(number: unknown, name: string, lower: number, upper: number, def: number | undefined = undefined): number | undefined { + if (!checkFiniteNumber(number, name)) { + return def; + } + if (number < lower || number > upper) { + throw new ERR_OUT_OF_RANGE(name, `>= ${lower} and <= ${upper}`, number); + } + return number; +} + export default { isInt32, isUint32, @@ -218,4 +258,8 @@ export default { validateOneOf, validateString, validateUint32, + + // Zlib specific + checkFiniteNumber, + checkRangesOrGetDefault, }; diff --git a/src/node/internal/zlib.d.ts b/src/node/internal/zlib.d.ts index 247e34aa816..104fb62028e 100644 --- a/src/node/internal/zlib.d.ts +++ b/src/node/internal/zlib.d.ts @@ -1,3 +1,5 @@ +import type {ZlibOptions} from "node:zlib"; + export function crc32(data: ArrayBufferView, value: number): number; // zlib.constants (part of the API contract for node:zlib) @@ -113,3 +115,21 @@ export const CONST_BROTLI_DECODER_ERROR_ALLOC_RING_BUFFER_1: number; export const CONST_BROTLI_DECODER_ERROR_ALLOC_RING_BUFFER_2: number; export const CONST_BROTLI_DECODER_ERROR_ALLOC_BLOCK_TYPE_TREES: number; export const CONST_BROTLI_DECODER_ERROR_UNREACHABLE: number; + +// IMPORTANT: Options are not exported deliberately. +// Please use `import type { ZlibOptions } from 'node:zlib';` which is +// defined by the @types/node package. + +export class ZlibStream { + public availOutBefore: number; + public availInBefore: number; + public flushFlag: number; + + public constructor(mode: number); + public initialize(windowBits: number, level: number, memLevel: number, strategy: number, writeState: NodeJS.TypedArray, processCallback: () => void, dictionary: ZlibOptions['dictionary']): void; + public close(): void; + public write(flushFlag: number, inputBuffer: NodeJS.TypedArray, inputOffset: number, inputLength: number, outputBuffer: NodeJS.TypedArray, outputOffset: number, outputLength: number): void; + public params(level: number, strategy: number): void; + + public setCallback(cb: () => void): void; +} diff --git a/src/node/tsconfig.json b/src/node/tsconfig.json index e052859d66c..9ec30cf7127 100644 --- a/src/node/tsconfig.json +++ b/src/node/tsconfig.json @@ -21,9 +21,7 @@ "moduleResolution": "node", "declaration": true, "types": [ - // todo: consume generated workerd types - // "@cloudflare/workers-types" - "@types/node", + "@types/node" ], "paths": { "node:*": ["./*"], @@ -33,7 +31,7 @@ "node:stream/*": ["./*"], "node-internal:*": ["./internal/*"], "cloudflare-internal:workers": ["./internal/workers.d.ts"], - "workerd:compatibility-flags": ["./internal/compatibility-flags.d.ts"], + "workerd:compatibility-flags": ["./internal/compatibility-flags.d.ts"] } }, "include": [ @@ -47,5 +45,5 @@ "path/*.ts", "path/*.js", "internal/*.ts" - ], + ] } diff --git a/src/node/zlib.ts b/src/node/zlib.ts index 488c0057fb6..23695482310 100644 --- a/src/node/zlib.ts +++ b/src/node/zlib.ts @@ -1,11 +1,64 @@ -import { crc32, constants } from 'node-internal:internal_zlib'; +import { + crc32, + constants, + Gzip, + Gunzip, + Deflate, + DeflateRaw, + Inflate, + InflateRaw, + Unzip, + createGzip, + createGunzip, + createDeflate, + createDeflateRaw, + createInflate, + createInflateRaw, + createUnzip, +} from 'node-internal:internal_zlib'; export { crc32, constants, + + // Classes + Gzip, + Gunzip, + Deflate, + DeflateRaw, + Inflate, + InflateRaw, + Unzip, + + // Convenience methods to create classes + createGzip, + createGunzip, + createDeflate, + createDeflateRaw, + createInflate, + createInflateRaw, + createUnzip, } export default { crc32, constants, + + // Classes + Gzip, + Gunzip, + Deflate, + DeflateRaw, + Inflate, + InflateRaw, + Unzip, + + // Convenience methods to create classes + createGzip, + createGunzip, + createDeflate, + createDeflateRaw, + createInflate, + createInflateRaw, + createUnzip, } diff --git a/src/workerd/api/node/zlib-util.c++ b/src/workerd/api/node/zlib-util.c++ index 567d4062d74..57890c37dce 100644 --- a/src/workerd/api/node/zlib-util.c++ +++ b/src/workerd/api/node/zlib-util.c++ @@ -1,11 +1,358 @@ // Copyright (c) 2017-2022 Cloudflare, Inc. // Licensed under the Apache 2.0 license found in the LICENSE file or at: // https://opensource.org/licenses/Apache-2.0 +// Copyright Joyent and Node contributors. All rights reserved. MIT license. + #include "zlib-util.h" namespace workerd::api::node { + uint32_t ZlibUtil::crc32Sync(kj::Array data, uint32_t value) { // Note: Bytef is defined in zlib.h return crc32(value, reinterpret_cast(data.begin()), data.size()); } + +void ZlibContext::initialize(int _level, int _windowBits, int _memLevel, int _strategy, kj::Array _dictionary) { + if (!((_windowBits == 0) && (mode == ZlibMode::INFLATE || mode == ZlibMode::GUNZIP || mode == ZlibMode::UNZIP))) { + JSG_ASSERT(_windowBits >= Z_MIN_WINDOWBITS && _windowBits <= Z_MAX_WINDOWBITS, Error, "Invalid windowBits"_kj); + } + + JSG_REQUIRE(_level >= Z_MIN_LEVEL && _level <= Z_MAX_LEVEL, Error, "Invalid compression level"_kj); + JSG_REQUIRE(_memLevel >= Z_MIN_MEMLEVEL && _memLevel <= Z_MAX_MEMLEVEL, Error, "Invalid memlevel"_kj); + JSG_REQUIRE(_strategy == Z_FILTERED || _strategy == Z_HUFFMAN_ONLY || + _strategy == Z_RLE || _strategy == Z_FIXED || + _strategy == Z_DEFAULT_STRATEGY, Error, "invalid strategy"_kj); + + level = _level; + windowBits = _windowBits; + memLevel = _memLevel; + strategy = _strategy; + flush = Z_NO_FLUSH; + + switch (mode) { + case ZlibMode::GZIP: + case ZlibMode::GUNZIP: + windowBits += 16; + break; + case ZlibMode::UNZIP: + windowBits += 32; + break; + case ZlibMode::DEFLATERAW: + case ZlibMode::INFLATERAW: + windowBits *= -1; + break; + default: + break; + } + + dictionary = kj::mv(_dictionary); +} + +kj::Maybe ZlibContext::getError() const { + // Acceptable error states depend on the type of zlib stream. + switch (err) { + case Z_OK: + case Z_BUF_ERROR: + if (stream.avail_out != 0 && flush == Z_FINISH) { + return constructError("unexpected end of file"_kj); + } + case Z_STREAM_END: + // normal statuses, not fatal + break; + case Z_NEED_DICT: + if (dictionary.empty()) + return constructError("Missing dictionary"_kj); + else + return constructError("Bad dictionary"_kj); + default: + // something else. + return constructError("Zlib error"); + } + + return {}; +} + +kj::Maybe ZlibContext::setDictionary() { + if (dictionary.empty()) { + return kj::none; + } + + switch (mode) { + case ZlibMode::DEFLATE: + case ZlibMode::DEFLATERAW: + err = deflateSetDictionary(&stream, dictionary.begin(), dictionary.size());; + break; + case ZlibMode::INFLATERAW: + err = inflateSetDictionary(&stream, dictionary.begin(), dictionary.size()); + break; + default: + break; + } + + if (err != Z_OK) { + return constructError("Failed to set dictionary"_kj); + } + + return kj::none; +} + +bool ZlibContext::initializeZlib() { + if (initialized.lockExclusive()) { + return false; + } + switch (mode) { + case ZlibMode::DEFLATE: + case ZlibMode::GZIP: + case ZlibMode::DEFLATERAW: + err = deflateInit2(&stream, level, Z_DEFLATED, windowBits, memLevel, strategy); + break; + case ZlibMode::INFLATE: + case ZlibMode::GUNZIP: + case ZlibMode::INFLATERAW: + case ZlibMode::UNZIP: + err = inflateInit2(&stream, windowBits); + break; + default: + JSG_FAIL_REQUIRE(Error, "Unreachable"); + } + + if (err != Z_OK) { + dictionary.clear(); + mode = ZlibMode::NONE; + return true; + } + + setDictionary(); + *initialized.lockExclusive() = true; + return true; +} + +kj::Maybe ZlibContext::resetStream() { + bool initialized_now = initializeZlib(); + if (initialized_now && err != Z_OK) { + return constructError("Failed to init stream before reset"); + } + switch (mode) { + case ZlibMode::DEFLATE: + case ZlibMode::DEFLATERAW: + case ZlibMode::GZIP: + err = deflateReset(&stream); + break; + case ZlibMode::INFLATE: + case ZlibMode::INFLATERAW: + case ZlibMode::GUNZIP: + err = inflateReset(&stream); + break; + default: + break; + } + + if (err != Z_OK) { + return constructError("Failed to reset stream"_kj); + } + + return setDictionary(); +} + +void ZlibContext::work() { + bool initialized_now = initializeZlib(); + if (initialized_now && err != Z_OK) { + return; + } + const Bytef* next_expected_header_byte = nullptr; + + auto err = Z_OK; + + // If the avail_out is left at 0, then it means that it ran out + // of room. If there was avail_out left over, then it means + // that all of the input was consumed. + switch (mode) { + case ZlibMode::DEFLATE: + case ZlibMode::GZIP: + case ZlibMode::DEFLATERAW: + err = deflate(&stream, flush); + break; + case ZlibMode::UNZIP: + if (stream.avail_in > 0) { + next_expected_header_byte = stream.next_in; + } + + if (next_expected_header_byte == nullptr) { + break; + } + + switch (gzip_id_bytes_read) { + case 0: + if (*next_expected_header_byte == GZIP_HEADER_ID1) { + gzip_id_bytes_read = 1; + next_expected_header_byte++; + + if (stream.avail_in == 1) { + // The only available byte was already read. + break; + } + } else { + mode = ZlibMode::INFLATE; + break; + } + + [[fallthrough]]; + case 1: + if (*next_expected_header_byte == GZIP_HEADER_ID2) { + gzip_id_bytes_read = 2; + mode = ZlibMode::GUNZIP; + } else { + // There is no actual difference between INFLATE and INFLATERAW + // (after initialization). + mode = ZlibMode::INFLATE; + } + + break; + default: + JSG_FAIL_REQUIRE(Error, "Invalid number of gzip magic number bytes read"); + } + + [[fallthrough]]; + case ZlibMode::INFLATE: + case ZlibMode::GUNZIP: + case ZlibMode::INFLATERAW: + err = inflate(&stream, flush); + + // If data was encoded with dictionary (INFLATERAW will have it set in + // SetDictionary, don't repeat that here) + if (mode != ZlibMode::INFLATERAW && + err == Z_NEED_DICT && + !dictionary.empty()) { + err = inflateSetDictionary(&stream, dictionary.begin(), dictionary.size()); + if (err == Z_OK) { + // And try to decode again + err = inflate(&stream, flush); + } else if (err == Z_DATA_ERROR) { + // Both inflateSetDictionary() and inflate() return Z_DATA_ERROR. + // Make it possible for After() to tell a bad dictionary from bad + // input. + err = Z_NEED_DICT; + } + } + + while (stream.avail_in > 0 && + mode == ZlibMode::GUNZIP && + err == Z_STREAM_END && + stream.next_in[0] != 0x00) { + // Bytes remain in input buffer. Perhaps this is another compressed + // member in the same archive, or just trailing garbage. + // Trailing zero bytes are okay, though, since they are frequently + // used for padding. + + resetStream(); + err = inflate(&stream, flush); + } + break; + default: + JSG_FAIL_REQUIRE(Error, "Unreachable"); + } +} + +kj::Maybe ZlibContext::setParams(int _level, int _strategy) { + bool initialized_now = initializeZlib(); + if (initialized_now && err != Z_OK) { + return constructError("Failed to init stream before set parameters"); + } + err = Z_OK; + + switch (mode) { + case ZlibMode::DEFLATE: + case ZlibMode::DEFLATERAW: + err = deflateParams(&stream, level, strategy); + break; + default: + break; + } + + if (err != Z_OK) { + return constructError("Failed to set parameters"); + } + + return kj::none; +} + +void ZlibContext::close() { + if (!initialized.lockExclusive()) { + dictionary.clear(); + mode = ZlibMode::NONE; + return; + } + + auto status = Z_OK; + switch (mode) { + case ZlibMode::DEFLATE: + case ZlibMode::DEFLATERAW: + case ZlibMode::GZIP: + status = deflateEnd(&stream); + break; + case ZlibMode::INFLATE: + case ZlibMode::INFLATERAW: + case ZlibMode::GUNZIP: + case ZlibMode::UNZIP: + status = inflateEnd(&stream); + break; + default: + JSG_FAIL_REQUIRE(Error, "Unreachable"); + } + + JSG_REQUIRE(status == Z_OK || status == Z_DATA_ERROR, Error, "Uncaught error on closing zlib stream"); + mode = ZlibMode::NONE; + dictionary.clear(); +} + +jsg::Ref ZlibUtil::ZlibStream::constructor(ZlibModeValue mode) { + return jsg::alloc(static_cast(mode)); +} + +template +void CompressionStream::emitError(const CompressionError& error) { + // TODO: Implement this +} + +template +void CompressionStream::writeStream(int flush, kj::ArrayPtr input, kj::ArrayPtr output) { + JSG_REQUIRE(initialized, Error, "Writing before initializing"_kj); + JSG_REQUIRE(!closed, Error, "Already finalized"_kj); + JSG_REQUIRE(!writing, Error, "Writing is in progress"_kj); + JSG_REQUIRE(!pending_close, Error, "Pending close"_kj); + + // Ref(); + context.setBuffers(input, output); + context.setFlush(flush); + + // This implementation always follows sync version. + context.work(); + if (checkError()) { + context.getAfterWriteOffsets(writeResult); + writing = false; + } + // Unref(); +} + +void ZlibUtil::ZlibStream::write(int flush, kj::Array input, int inputOffset, + int inputLength, kj::Array output, int outputOffset, + int outputLength) { + if (flush != Z_NO_FLUSH && + flush != Z_PARTIAL_FLUSH && + flush != Z_SYNC_FLUSH && + flush != Z_FULL_FLUSH && + flush != Z_FINISH && + flush != Z_BLOCK) { + JSG_FAIL_REQUIRE(Error, "Invalid flush value"); + } + + // Check bounds + JSG_REQUIRE(inputOffset > 0 && inputOffset < input.size(), Error, "Offset should be smaller than size and bigger than 0"_kj); + JSG_REQUIRE(input.size() >= inputLength, Error, "Invalid inputLength"_kj); + JSG_REQUIRE(outputOffset > 0 && outputOffset < output.size(), Error, "Offset should be smaller than size and bigger than 0"_kj); + JSG_REQUIRE(output.size() >= outputLength, Error, "Invalid outputLength"_kj); + + writeStream(flush, input.slice(inputOffset, inputLength), output.slice(outputOffset, outputLength)); +} + } // namespace workerd::api::node diff --git a/src/workerd/api/node/zlib-util.h b/src/workerd/api/node/zlib-util.h index c5ec664e291..9eadc5cd7df 100644 --- a/src/workerd/api/node/zlib-util.h +++ b/src/workerd/api/node/zlib-util.h @@ -1,57 +1,258 @@ // Copyright (c) 2017-2022 Cloudflare, Inc. // Licensed under the Apache 2.0 license found in the LICENSE file or at: // https://opensource.org/licenses/Apache-2.0 - +// Copyright Joyent and Node contributors. All rights reserved. MIT license. #include +#include +#include "zlib.h" #include #include -#include "zlib.h" +#include +#include -#include -#include +#include namespace workerd::api::node { +#ifndef ZLIB_ERROR_CODES +#define ZLIB_ERROR_CODES(V) \ + V(Z_OK) \ + V(Z_STREAM_END) \ + V(Z_NEED_DICT) \ + V(Z_ERRNO) \ + V(Z_STREAM_ERROR) \ + V(Z_DATA_ERROR) \ + V(Z_MEM_ERROR) \ + V(Z_BUF_ERROR) \ + V(Z_VERSION_ERROR) \ + +inline const char* ZlibStrerror(int err) { +#define V(code) if (err == code) return #code; + ZLIB_ERROR_CODES(V) +#undef V + return "Z_UNKNOWN_ERROR"; +} +#endif // ZLIB_ERROR_CODES + +// Certain zlib constants are defined by Node.js itself +static constexpr auto Z_MIN_CHUNK = 64; +static constexpr auto Z_MAX_CHUNK = 128 * 1024; +static constexpr auto Z_DEFAULT_CHUNK = 16 * 1024; +static constexpr auto Z_MIN_MEMLEVEL = 1; + +static constexpr auto Z_MAX_MEMLEVEL = 9; +static constexpr auto Z_DEFAULT_MEMLEVEL = 8; +static constexpr auto Z_MIN_LEVEL = -1; +static constexpr auto Z_MAX_LEVEL = 9; +static constexpr auto Z_DEFAULT_LEVEL = Z_DEFAULT_COMPRESSION; +static constexpr auto Z_MIN_WINDOWBITS = 8; +static constexpr auto Z_MAX_WINDOWBITS = 15; +static constexpr auto Z_DEFAULT_WINDOWBITS = 15; + +static constexpr uint8_t GZIP_HEADER_ID1 = 0x1f; +static constexpr uint8_t GZIP_HEADER_ID2 = 0x8b; + +using ZlibModeValue = uint8_t; +enum class ZlibMode : ZlibModeValue { + NONE, + DEFLATE, + INFLATE, + GZIP, + GUNZIP, + DEFLATERAW, + INFLATERAW, + UNZIP, + BROTLI_DECODE, + BROTLI_ENCODE +}; + +struct CompressionError { + CompressionError(kj::StringPtr _message, kj::StringPtr _code, int _err) + : message(kj::str(_message)), + code(kj::str(_code)), + err(_err) { + JSG_REQUIRE(message.size() != 0, Error, "Compression error message should not be null"); + } + + kj::String message; + kj::String code; + int err; +}; + +class ZlibContext final { +public: + ZlibContext() = default; + + KJ_DISALLOW_COPY(ZlibContext); + + void close(); + void setBuffers(kj::ArrayPtr input, kj::ArrayPtr output) { + stream.avail_in = input.size(); + stream.next_in = input.begin(); + stream.avail_out = output.size(); + stream.next_out = output.begin(); + }; + int getFlush() const { return flush; }; + void setFlush(int value) { + flush = value; + }; + void getAfterWriteOffsets(kj::ArrayPtr writeResult) const { + writeResult[0] = stream.avail_out; + writeResult[1] = stream.avail_in; + } + void setMode(ZlibMode value) { + mode = value; + }; + kj::Maybe resetStream(); + kj::Maybe getError() const; + void work(); + + uint getAvailIn() const { return stream.avail_in; }; + void setAvailIn(uint value) { stream.avail_in = value; }; + uint getAvailOut() const { return stream.avail_out; } + void setAvailOut(uint value) { stream.avail_out = value; }; + + // Zlib + void initialize(int _level, int _windowBits, int _memLevel, int _strategy, kj::Array _dictionary); + kj::Maybe setParams(int level, int strategy); + +private: + bool initializeZlib(); + kj::Maybe setDictionary(); + + CompressionError constructError(kj::StringPtr message) const { + if (stream.msg != nullptr) + message = kj::StringPtr(stream.msg); + + return {kj::str(message), kj::str(ZlibStrerror(err)), err}; + }; + + kj::MutexGuarded initialized{false}; + ZlibMode mode = ZlibMode::NONE; + int flush = Z_NO_FLUSH; + int windowBits = 0; + int level = 0; + int memLevel = 0; + int strategy = 0; + kj::Vector dictionary; + + int err = Z_OK; + unsigned int gzip_id_bytes_read = 0; + z_stream stream{}; +}; + +using CompressionStreamCallback = kj::Maybe>>; + +template +class CompressionStream { +public: + CompressionStream() = default; + ~CompressionStream() { + JSG_ASSERT(!writing, Error, "Writing to compression stream"_kj); + close(); + JSG_ASSERT(zlib_memory == 0, Error, "Zlib memory is not 0"_kj); + JSG_ASSERT(unreported_allocations == 0, Error, "Unreported allocations are not 0"_kj); + } + + KJ_DISALLOW_COPY_AND_MOVE(CompressionStream); + + void close() { + pending_close = writing; + if (writing) { + return; + } + closed = true; + JSG_ASSERT(initialized, Error, "Closing before initialized"_kj); + context.close(); + } + + bool checkError() { + KJ_IF_SOME(error, context.getError()) { + emitError(error); + return false; + } + return true; + } + + void emitError(const CompressionError& error); + void writeStream(int flush, kj::ArrayPtr input, kj::ArrayPtr output); + void setCallback(CompressionStreamCallback value) { callback = kj::mv(value); }; + void initializeStream(kj::ArrayPtr _write_result, CompressionStreamCallback _callback) { + writeResult = kj::mv(_write_result); + callback = kj::mv(_callback); + initialized = true; + } + +protected: + CompressionContext context; + +private: + bool initialized = false; + bool writing = false; + bool pending_close = false; + bool closed = false; + size_t zlib_memory = 0; + size_t unreported_allocations = 0; + + CompressionStreamCallback callback; + kj::ArrayPtr writeResult = nullptr; +}; + // Implements utilities in support of the Node.js Zlib class ZlibUtil final : public jsg::Object { public: ZlibUtil() = default; ZlibUtil(jsg::Lock&, const jsg::Url&) {} - // Certain zlib constants are defined by NodeJS itself - static constexpr auto Z_MIN_CHUNK = 64; - static constexpr auto Z_MAX_CHUNK = 128 * 1024; - static constexpr auto Z_DEFAULT_CHUNK = 16 * 1024; - static constexpr auto Z_MIN_MEMLEVEL = 1; - - static constexpr auto Z_MAX_MEMLEVEL = 9; - static constexpr auto Z_DEFAULT_MEMLEVEL = 8; - static constexpr auto Z_MIN_LEVEL = -1; - static constexpr auto Z_MAX_LEVEL = 9; - static constexpr auto Z_DEFAULT_LEVEL = Z_DEFAULT_COMPRESSION; - static constexpr auto Z_MIN_WINDOWBITS = 8; - static constexpr auto Z_MAX_WINDOWBITS = 15; - static constexpr auto Z_DEFAULT_WINDOWBITS = 15; - - using ZlibModeValue = uint8_t; - enum class ZlibMode: ZlibModeValue { - NONE, - DEFLATE, - INFLATE, - GZIP, - GUNZIP, - DEFLATERAW, - INFLATERAW, - UNZIP, - BROTLI_DECODE, - BROTLI_ENCODE + class ZlibStream final : public jsg::Object, public CompressionStream { + public: + explicit ZlibStream(ZlibMode mode) : CompressionStream() { + context.setMode(mode); + }; + + KJ_DISALLOW_COPY_AND_MOVE(ZlibStream); + + static jsg::Ref constructor(ZlibModeValue mode); + + // Instance methods + void initialize(int windowBits, int level, int memLevel, int strategy, kj::Array writeState, CompressionStreamCallback processCallback, kj::Array dictionary) { + initializeStream(writeState.asPtr(), kj::mv(processCallback)); + context.initialize(level, windowBits, memLevel, strategy, kj::mv(dictionary)); + } + void write(int flush, kj::Array input, int inputOffset, int inputLength, kj::Array output, int outputOffset, int outputLength); + void params(const int level, const int strategy) { + context.setParams(level, strategy); + } + + // Property setters/getters + uint32_t getAvailInBefore() const { return context.getAvailIn(); }; + void setAvailInBefore(uint32_t value) { context.setAvailIn(value); }; + + uint32_t getAvailOutBefore() const { return context.getAvailOut(); }; + void setAvailOutBefore(uint32_t value) { context.setAvailOut(value); }; + + int getFlushFlag() const { return context.getFlush(); }; + void setFlushFlag(int value) { context.setFlush(value); }; + + JSG_RESOURCE_TYPE(ZlibStream) { + JSG_METHOD(initialize); + JSG_METHOD(close); + JSG_METHOD(write); + JSG_METHOD(params); + JSG_METHOD(setCallback); + + JSG_PROTOTYPE_PROPERTY(availInBefore, getAvailInBefore, setAvailInBefore); + JSG_PROTOTYPE_PROPERTY(availOutBefore, getAvailOutBefore, setAvailOutBefore); + JSG_PROTOTYPE_PROPERTY(flushFlag, getFlushFlag, setFlushFlag); + } }; uint32_t crc32Sync(kj::Array data, uint32_t value); JSG_RESOURCE_TYPE(ZlibUtil) { JSG_METHOD_NAMED(crc32, crc32Sync); + JSG_NESTED_TYPE(ZlibStream); // zlib.constants (part of the API contract for node:zlib) JSG_STATIC_CONSTANT_NAMED(CONST_Z_NO_FLUSH, Z_NO_FLUSH); @@ -89,8 +290,10 @@ class ZlibUtil final : public jsg::Object { JSG_STATIC_CONSTANT_NAMED(CONST_DEFLATERAW, static_cast(ZlibMode::DEFLATERAW)); JSG_STATIC_CONSTANT_NAMED(CONST_INFLATERAW, static_cast(ZlibMode::INFLATERAW)); JSG_STATIC_CONSTANT_NAMED(CONST_UNZIP, static_cast(ZlibMode::UNZIP)); - JSG_STATIC_CONSTANT_NAMED(CONST_BROTLI_DECODE, static_cast(ZlibMode::BROTLI_DECODE)); - JSG_STATIC_CONSTANT_NAMED(CONST_BROTLI_ENCODE, static_cast(ZlibMode::BROTLI_ENCODE)); + JSG_STATIC_CONSTANT_NAMED(CONST_BROTLI_DECODE, + static_cast(ZlibMode::BROTLI_DECODE)); + JSG_STATIC_CONSTANT_NAMED(CONST_BROTLI_ENCODE, + static_cast(ZlibMode::BROTLI_ENCODE)); JSG_STATIC_CONSTANT_NAMED(CONST_Z_MIN_WINDOWBITS, Z_MIN_WINDOWBITS); JSG_STATIC_CONSTANT_NAMED(CONST_Z_MAX_WINDOWBITS, Z_MAX_WINDOWBITS); @@ -201,6 +404,6 @@ class ZlibUtil final : public jsg::Object { }; }; -#define EW_NODE_ZLIB_ISOLATE_TYPES api::node::ZlibUtil +#define EW_NODE_ZLIB_ISOLATE_TYPES api::node::ZlibUtil, api::node::ZlibUtil::ZlibStream } // namespace workerd::api::node