From 03d88340f0374567997cf99574de013857417742 Mon Sep 17 00:00:00 2001 From: Yagiz Nizipli Date: Mon, 12 Aug 2024 10:55:30 -0400 Subject: [PATCH] add several `node:zlib` classes --- src/node/internal/internal_assert.ts | 4 +- src/node/internal/internal_errors.ts | 6 + src/node/internal/internal_zlib.ts | 117 +++- src/node/internal/internal_zlib_base.ts | 558 ++++++++++++++++++ src/node/internal/streams_util.d.ts | 9 + src/node/internal/validators.ts | 44 ++ src/node/internal/zlib.d.ts | 40 ++ src/node/zlib.ts | 55 +- .../api/node/tests/zlib-nodejs-test.js | 408 ++++++++++++- src/workerd/api/node/zlib-util.c++ | 415 +++++++++++++ src/workerd/api/node/zlib-util.h | 217 +++++-- 11 files changed, 1812 insertions(+), 61 deletions(-) create mode 100644 src/node/internal/internal_zlib_base.ts create mode 100644 src/node/internal/streams_util.d.ts diff --git a/src/node/internal/internal_assert.ts b/src/node/internal/internal_assert.ts index eb0e36447b1..111dc366f42 100644 --- a/src/node/internal/internal_assert.ts +++ b/src/node/internal/internal_assert.ts @@ -83,7 +83,9 @@ function assert(actual: unknown, message?: string | Error): asserts actual { } as AssertionErrorConstructorOptions ); } } -export const ok = assert; + +type Assert = (actual: unknown, message?: string | Error) => asserts actual; +export const ok: Assert = assert; export function throws( fn: () => void, diff --git a/src/node/internal/internal_errors.ts b/src/node/internal/internal_errors.ts index 2a85cf33135..00727c0afe3 100644 --- a/src/node/internal/internal_errors.ts +++ b/src/node/internal/internal_errors.ts @@ -508,6 +508,12 @@ export class ERR_STREAM_UNSHIFT_AFTER_END_EVENT extends NodeError { } } +export class ERR_BUFFER_TOO_LARGE extends NodeRangeError { + constructor(value: number) { + super("ERR_BUFFER_TOO_LARGE", `Cannot create a Buffer larger than ${value} bytes`); + } +} + export function aggregateTwoErrors(innerError: any, outerError: any) { if (innerError && outerError && innerError !== 
outerError) { if (Array.isArray(outerError.errors)) { diff --git a/src/node/internal/internal_zlib.ts b/src/node/internal/internal_zlib.ts index 275c13a631c..57203211d07 100644 --- a/src/node/internal/internal_zlib.ts +++ b/src/node/internal/internal_zlib.ts @@ -1,22 +1,27 @@ -import { default as zlibUtil } from 'node-internal:zlib'; +// Copyright (c) 2017-2022 Cloudflare, Inc. +// Licensed under the Apache 2.0 license found in the LICENSE file or at: +// https://opensource.org/licenses/Apache-2.0 +// Copyright Joyent and Node contributors. All rights reserved. MIT license. + +import { default as zlibUtil, type ZlibOptions } from 'node-internal:zlib'; import { Buffer } from 'node-internal:internal_buffer'; import { validateUint32 } from 'node-internal:validators'; -import { isArrayBufferView } from 'node-internal:internal_types'; import { ERR_INVALID_ARG_TYPE } from 'node-internal:internal_errors'; +import { isArrayBufferView } from 'node-internal:internal_types'; +import { Zlib } from 'node-internal:internal_zlib_base'; -function crc32(data: ArrayBufferView | string, value: number = 0): number { - if (typeof data === 'string') { - data = Buffer.from(data); - } else if (!isArrayBufferView(data)) { - throw new ERR_INVALID_ARG_TYPE('data', 'ArrayBufferView', typeof data); - } - validateUint32(value, 'value'); - return zlibUtil.crc32(data, value); -} - +const { + CONST_DEFLATE, + CONST_DEFLATERAW, + CONST_INFLATE, + CONST_INFLATERAW, + CONST_GUNZIP, + CONST_GZIP, + CONST_UNZIP, +} = zlibUtil; -const constPrefix = 'CONST_'; -const constants = {}; +const constPrefix = 'CONST_'; +export const constants: Record = {}; // eslint-disable-next-line @typescript-eslint/no-unsafe-argument Object.defineProperties(constants, Object.fromEntries(Object.entries(Object.getPrototypeOf(zlibUtil)) @@ -29,7 +34,85 @@ Object.defineProperties(constants, Object.fromEntries(Object.entries(Object.getP }]) )); -export { - crc32, - constants, +export function crc32(data: ArrayBufferView | 
string, value: number = 0): number { + if (typeof data === 'string') { + data = Buffer.from(data); + } else if (!isArrayBufferView(data)) { + throw new ERR_INVALID_ARG_TYPE('data', 'ArrayBufferView', typeof data); + } + validateUint32(value, 'value'); + return zlibUtil.crc32(data, value); +} + +export class Gzip extends Zlib { + public constructor(options: ZlibOptions) { + super(options, CONST_GZIP); + } +} + +export class Gunzip extends Zlib { + public constructor(options: ZlibOptions) { + super(options, CONST_GUNZIP); + } +} + +export class Deflate extends Zlib { + public constructor(options: ZlibOptions) { + super(options, CONST_DEFLATE); + } +} + +export class DeflateRaw extends Zlib { + public constructor(options?: ZlibOptions) { + if (options?.windowBits === 8) { + options.windowBits = 9 + } + super(options, CONST_DEFLATERAW); + } +} + +export class Inflate extends Zlib { + public constructor(options: ZlibOptions) { + super(options, CONST_INFLATE); + } +} + +export class InflateRaw extends Zlib { + public constructor(options: ZlibOptions) { + super(options, CONST_INFLATERAW); + } +} + +export class Unzip extends Zlib { + public constructor(options: ZlibOptions) { + super(options, CONST_UNZIP); + } +} + +export function createGzip(options: ZlibOptions): Gzip { + return new Gzip(options); +} + +export function createGunzip(options: ZlibOptions): Gunzip { + return new Gunzip(options); +} + +export function createDeflate(options: ZlibOptions): Deflate { + return new Deflate(options); +} + +export function createDeflateRaw(options: ZlibOptions): DeflateRaw { + return new DeflateRaw(options); +} + +export function createInflate(options: ZlibOptions): Inflate { + return new Inflate(options); +} + +export function createInflateRaw(options: ZlibOptions): InflateRaw { + return new InflateRaw(options); +} + +export function createUnzip(options: ZlibOptions): Unzip { + return new Unzip(options); } diff --git a/src/node/internal/internal_zlib_base.ts 
b/src/node/internal/internal_zlib_base.ts new file mode 100644 index 00000000000..d5ba2d3d13b --- /dev/null +++ b/src/node/internal/internal_zlib_base.ts @@ -0,0 +1,558 @@ +// Copyright (c) 2017-2022 Cloudflare, Inc. +// Licensed under the Apache 2.0 license found in the LICENSE file or at: +// https://opensource.org/licenses/Apache-2.0 +// Copyright Joyent and Node contributors. All rights reserved. MIT license. + +import { default as zlibUtil, type ZlibOptions } from 'node-internal:zlib'; +import { Buffer, kMaxLength } from 'node-internal:internal_buffer'; +import { checkRangesOrGetDefault, checkFiniteNumber } from 'node-internal:validators'; +import { ERR_OUT_OF_RANGE, ERR_BUFFER_TOO_LARGE, ERR_INVALID_ARG_TYPE, NodeError } from 'node-internal:internal_errors'; +import { Transform, type DuplexOptions } from 'node-internal:streams_transform' +import { eos as finished } from 'node-internal:streams_util'; +import { isArrayBufferView, isAnyArrayBuffer } from 'node-internal:internal_types'; + +// Explicitly import `ok()` to avoid typescript error requiring every name in the call target to +// be annotated with an explicit type annotation. 
+import assert, { ok } from 'node-internal:internal_assert'; + +const { + CONST_INFLATE, + CONST_GUNZIP, + CONST_GZIP, + CONST_UNZIP, + CONST_Z_DEFAULT_CHUNK, + CONST_Z_DEFAULT_STRATEGY, + CONST_Z_DEFAULT_MEMLEVEL, + CONST_Z_DEFAULT_WINDOWBITS, + CONST_Z_DEFAULT_COMPRESSION, + CONST_Z_FIXED, + CONST_Z_MAX_LEVEL, + CONST_Z_MAX_MEMLEVEL, + CONST_Z_MAX_WINDOWBITS, + CONST_Z_MIN_LEVEL, + CONST_Z_MIN_MEMLEVEL, + CONST_Z_SYNC_FLUSH, + CONST_Z_NO_FLUSH, + CONST_Z_BLOCK, + CONST_Z_MIN_CHUNK, + CONST_Z_PARTIAL_FLUSH, + CONST_Z_FULL_FLUSH, + CONST_Z_FINISH, + CONST_BROTLI_ENCODE, + CONST_BROTLI_DECODE, + CONST_BROTLI_OPERATION_PROCESS, + CONST_BROTLI_OPERATION_EMIT_METADATA, +} = zlibUtil; + +export const owner_symbol = Symbol('owner'); + +const FLUSH_BOUND_IDX_NORMAL: number = 0; +const FLUSH_BOUND_IDX_BROTLI: number = 1; +const FLUSH_BOUND: [[number, number], [number, number]] = [ + [ CONST_Z_NO_FLUSH, CONST_Z_BLOCK ], + [ CONST_BROTLI_OPERATION_PROCESS, CONST_BROTLI_OPERATION_EMIT_METADATA ], +]; + +const kFlushFlag = Symbol('kFlushFlag'); +const kError = Symbol('kError'); + +function processCallback(this: zlibUtil.ZlibStream): void { + // This callback's context (`this`) is the `_handle` (ZCtx) object. It is + // important to null out the values once they are no longer needed since + // `_handle` can stay in memory long after the buffer is needed. 
+ // eslint-disable-next-line @typescript-eslint/no-this-alias + const handle = this; + const self = this[owner_symbol]; + const state = self._writeState; + + if (self.destroyed) { + this.buffer = null; + this.cb(); + return; + } + + const [availOutAfter, availInAfter] = state as unknown as [number, number]; + + const inDelta = handle.availInBefore - availInAfter; + self.bytesWritten += inDelta; + + const have = handle.availOutBefore - availOutAfter; + let streamBufferIsFull = false; + if (have > 0) { + const out = self._outBuffer.slice(self._outOffset, self._outOffset + have); + self._outOffset += have; + streamBufferIsFull = !self.push(out); + } else { + assert.strictEqual(have, 0, 'have should not go down'); + } + + if (self.destroyed) { + this.cb(); + return; + } + + // Exhausted the output buffer, or used all the input create a new one. + if (availOutAfter === 0 || self._outOffset >= self._chunkSize) { + handle.availOutBefore = self._chunkSize; + self._outOffset = 0; + self._outBuffer = Buffer.allocUnsafe(self._chunkSize); + } + + if (availOutAfter === 0) { + // Not actually done. Need to reprocess. + // Also, update the availInBefore to the availInAfter value, + // so that if we have to hit it a third (fourth, etc.) time, + // it'll have the correct byte counts. 
+ handle.inOff += inDelta; + handle.availInBefore = availInAfter; + + + if (!streamBufferIsFull) { + this.write(handle.flushFlag, + this.buffer as NodeJS.TypedArray, // in + handle.inOff, // in_off + handle.availInBefore, // in_len + self._outBuffer, // out + self._outOffset, // out_off + self._chunkSize); // out_len + } else { + // eslint-disable-next-line @typescript-eslint/unbound-method + const oldRead = self._read; + self._read = (n): void => { + self._read = oldRead; + this.write(handle.flushFlag, + this.buffer as NodeJS.TypedArray, // in + handle.inOff, // in_off + handle.availInBefore, // in_len + self._outBuffer, // out + self._outOffset, // out_off + self._chunkSize); // out_len + self._read(n); + }; + } + return; + } + + if (availInAfter > 0) { + // If we have more input that should be written, but we also have output + // space available, that means that the compression library was not + // interested in receiving more data, and in particular that the input + // stream has ended early. + // This applies to streams where we don't check data past the end of + // what was consumed; that is, everything except Gunzip/Unzip. + self.push(null); + } + + // Finished with the chunk. + this.buffer = null; + this.cb(); +} + +// If a flush is scheduled while another flush is still pending, a way to figure +// out which one is the "stronger" flush is needed. +// This is currently only used to figure out which flush flag to use for the +// last chunk. 
+// Roughly, the following holds: +// Z_NO_FLUSH (< Z_TREES) < Z_BLOCK < Z_PARTIAL_FLUSH < +// Z_SYNC_FLUSH < Z_FULL_FLUSH < Z_FINISH +const flushiness: number[] = []; +const kFlushFlagList = [CONST_Z_NO_FLUSH, CONST_Z_BLOCK, CONST_Z_PARTIAL_FLUSH, + CONST_Z_SYNC_FLUSH, CONST_Z_FULL_FLUSH, CONST_Z_FINISH]; +for (let i = 0; i < kFlushFlagList.length; i++) { + flushiness[kFlushFlagList[i] as number] = i; +} + +type BufferWithFlushFlag = Buffer & { [kFlushFlag]: number}; + +// Set up a list of 'special' buffers that can be written using .write() +// from the .flush() code as a way of introducing flushing operations into the +// write sequence. +const kFlushBuffers: BufferWithFlushFlag[] = []; +{ + const dummyArrayBuffer = new ArrayBuffer(0); + for (const flushFlag of kFlushFlagList) { + const buf = Buffer.from(dummyArrayBuffer) as BufferWithFlushFlag; + buf[kFlushFlag] = flushFlag; + kFlushBuffers[flushFlag] = buf; + } +} + +function zlibOnError(this: zlibUtil.ZlibStream, errno: number, code: string, message: string): void { + const self = this[owner_symbol]; + const error = new NodeError(code, message); + // @ts-expect-error Err number is expected. + error.errno = errno; + self.destroy(error); + self[kError] = error; +} + +function processChunkSync(self: Zlib, chunk: Buffer, flushFlag: number): Buffer | Uint8Array { + let availInBefore = chunk.byteLength; + let availOutBefore = self._chunkSize - self._outOffset; + let inOff = 0; + let availOutAfter; + let availInAfter; + + const buffers: (Buffer | Uint8Array)[] = []; + let nread = 0; + let inputRead = 0; + const state = self._writeState; + const handle = self._handle; + let buffer = self._outBuffer; + let offset = self._outOffset; + const chunkSize = self._chunkSize; + + let error; + self.on('error', function onError(er) { + error = er; + }); + + while (true) { + // TODO(soon): This was `writeSync` before, but it's not anymore. 
+ handle?.write(flushFlag, + chunk, // in + inOff, // in_off + availInBefore, // in_len + buffer, // out + offset, // out_off + availOutBefore); // out_len + if (error) + throw error; + else if (self[kError]) + throw self[kError]; + + [availOutAfter, availInAfter] = state as unknown as [number, number]; + + const inDelta = (availInBefore - availInAfter); + inputRead += inDelta; + + const have = availOutBefore - availOutAfter; + if (have > 0) { + const out = buffer.slice(offset, offset + have); + offset += have; + buffers.push(out); + nread += out.byteLength; + + if (nread > self._maxOutputLength) { + _close(self); + throw new ERR_BUFFER_TOO_LARGE(self._maxOutputLength); + } + + } else { + assert.strictEqual(have, 0, 'have should not go down'); + } + + // Exhausted the output buffer, or used all the input create a new one. + if (availOutAfter === 0 || offset >= chunkSize) { + availOutBefore = chunkSize; + offset = 0; + buffer = Buffer.allocUnsafe(chunkSize); + } + + if (availOutAfter === 0) { + // Not actually done. Need to reprocess. + // Also, update the availInBefore to the availInAfter value, + // so that if we have to hit it a third (fourth, etc.) time, + // it'll have the correct byte counts. + inOff += inDelta; + availInBefore = availInAfter; + } else { + break; + } + } + + self.bytesWritten = inputRead; + _close(self); + + if (nread === 0) + return Buffer.alloc(0); + + return (buffers.length === 1 ? 
buffers[0] as Buffer : Buffer.concat(buffers, nread)); +} + +function _close(engine: ZlibBase): void { + engine._handle?.close(); + engine._handle = undefined; +} + +type ZlibDefaultOptions = { + flush: number + finishFlush: number + fullFlush: number +} + +const zlibDefaultOptions = { + flush: CONST_Z_NO_FLUSH, + finishFlush: CONST_Z_FINISH, + fullFlush: CONST_Z_FULL_FLUSH, +}; + +export class ZlibBase extends Transform { + public bytesWritten: number = 0; + + public _maxOutputLength: number; + public _outBuffer: Buffer; + public _outOffset: number = 0; + public _chunkSize: number; + public _defaultFlushFlag: number | undefined; + public _finishFlushFlag: number | undefined; + public _defaultFullFlushFlag: number | undefined; + public _info: unknown; + public _handle: zlibUtil.ZlibStream | undefined; + public _writeState = new Uint32Array(2); + + public [kError]: NodeError | undefined; + + public constructor(opts: ZlibOptions & DuplexOptions, mode: number, handle: zlibUtil.ZlibStream, { flush, finishFlush, fullFlush }: ZlibDefaultOptions = zlibDefaultOptions) { + let chunkSize = CONST_Z_DEFAULT_CHUNK; + let maxOutputLength = kMaxLength; + + let flushBoundIdx; + if (mode !== CONST_BROTLI_ENCODE && mode !== CONST_BROTLI_DECODE) { + flushBoundIdx = FLUSH_BOUND_IDX_NORMAL; + } else { + flushBoundIdx = FLUSH_BOUND_IDX_BROTLI; + } + + if (opts) { + if (opts.chunkSize != null) { + chunkSize = opts.chunkSize; + } + if (!checkFiniteNumber(chunkSize, 'options.chunkSize')) { + chunkSize = CONST_Z_DEFAULT_CHUNK; + } else if (chunkSize < CONST_Z_MIN_CHUNK) { + throw new ERR_OUT_OF_RANGE('options.chunkSize', + `>= ${CONST_Z_MIN_CHUNK}`, chunkSize); + } + + flush = checkRangesOrGetDefault( + opts.flush, 'options.flush', + FLUSH_BOUND[flushBoundIdx]?.[0] as number, FLUSH_BOUND[flushBoundIdx]?.[1] as number, flush); + + finishFlush = checkRangesOrGetDefault( + opts.finishFlush, 'options.finishFlush', + FLUSH_BOUND[flushBoundIdx]?.[0] as number, FLUSH_BOUND[flushBoundIdx]?.[1] as 
number, + finishFlush); + + maxOutputLength = checkRangesOrGetDefault( + opts.maxOutputLength, 'options.maxOutputLength', + 1, kMaxLength, kMaxLength); + + if (opts.encoding || opts.objectMode || opts.writableObjectMode) { + opts = { ...opts }; + opts.encoding = undefined; + opts.objectMode = false; + opts.writableObjectMode = false; + } + } + + // @ts-expect-error TODO: Find a way to avoid having "unknown" + super({ autoDestroy: true, ...opts } as unknown); + + // Error handler by processCallback() and zlibOnError() + handle.setErrorHandler(zlibOnError); + handle[owner_symbol] = this as never; + this._handle = handle; + this._outBuffer = Buffer.allocUnsafe(chunkSize); + this._outOffset = 0; + this._chunkSize = chunkSize; + this._defaultFlushFlag = flush; + this._finishFlushFlag = finishFlush; + this._defaultFullFlushFlag = fullFlush; + this._info = opts && opts.info; + this._maxOutputLength = maxOutputLength; + } + + public get _closed(): boolean { + return this._handle == null; + } + + // @deprecated Use `bytesWritten` instead. + public get bytesRead(): number { + return this.bytesWritten; + } + + // @deprecated Use `bytesWritten` instead. + public set bytesRead(value: number) { + this.bytesWritten = value; + } + + public reset(): void { + ok(this._handle, 'zlib binding closed'); + this._handle.reset(); + } + + // This is the _flush function called by the transform class, + // internally, when the last chunk has been written. + public override _flush(callback: () => void): void { + this._transform(Buffer.alloc(0), 'utf8', callback); + } + + // Force Transform compat behavior. 
+  public override _final(callback: () => void): void {
+    callback();
+  }
+
+  // Queues a flush of the data written so far.
+  //
+  // `kind` is one of the flush flags that has an entry in `kFlushBuffers`; when
+  // it is omitted (or when the only argument is the callback) the stream's
+  // `_defaultFullFlushFlag` is used, matching Node.js where a bare `flush()`
+  // performs a full flush rather than the per-write default flush.
+  // `callback`, if given, is invoked once the flush has gone through.
+  public flush(kind: number, callback: () => void): void;
+  public flush(callback?: () => void): void;
+  public flush(kind?: number | (() => void), callback: (() => void) | undefined = undefined): void {
+    if (typeof kind === 'function' || (kind == null && !callback)) {
+      callback = kind as (() => void) | undefined;
+      kind = this._defaultFullFlushFlag as number;
+    }
+
+    if (this.writableFinished) {
+      // Everything has already been flushed; just report back asynchronously.
+      if (callback) {
+        /* eslint-disable-next-line @typescript-eslint/no-unsafe-call */
+        queueMicrotask(callback);
+      }
+    } else if (this.writableEnded) {
+      // `end()` was called but the stream has not finished yet: no new flush
+      // marker can be written, so wait for the 'end' event instead of calling
+      // back before the pending data has been processed.
+      if (callback) {
+        this.once('end', callback);
+      }
+    } else {
+      // Writing one of the special zero-length flush buffers injects the flush
+      // into the regular write sequence; `_transform` reads the flag back.
+      this.write(kFlushBuffers[kind as number], 'utf8', callback);
+    }
+  }
+
+  // Destroys the stream. If `callback` is given it is registered through
+  // `finished()` before `destroy()` is called.
+  public close(callback?: (() => void)): void {
+    if (callback) {
+      finished(this, callback);
+    }
+    this.destroy();
+  }
+
+  // Releases the native handle via `_close()` and forwards `err` to `callback`.
+  // NOTE(review): `T` is not declared on this line; the method's type parameter
+  // list looks like it was lost in extraction - confirm against the upstream file.
+  public override _destroy(err: T, callback: (err: T) => never): void {
+    _close(this);
+    callback(err);
+  }
+
+  // Picks the flush flag for `chunk` and hands it to `#processChunk`.
+  public override _transform(chunk: Buffer & { [kFlushFlag]?: number }, _encoding: BufferEncoding, cb: () => void): void {
+    let flushFlag = this._defaultFlushFlag;
+    // We use a 'fake' zero-length chunk to carry information about flushes from
+    // the public API to the actual stream implementation.
+    if (typeof chunk[kFlushFlag] === 'number') {
+      flushFlag = chunk[kFlushFlag];
+    }
+
+    // For the last chunk, also apply `_finishFlushFlag`.
+    if (this.writableEnded && this.writableLength === chunk.byteLength) {
+      flushFlag = maxFlush(flushFlag as number, this._finishFlushFlag as number);
+    }
+    this.#processChunk(chunk, flushFlag as number, cb);
+  }
+
+  // This function is left for backwards compatibility.
+ public _processChunk(chunk: Buffer, flushFlag: number, cb?: () => void): Buffer | Uint8Array | undefined { + if (cb != null && typeof cb === 'function') { + this.#processChunk(chunk, flushFlag, cb); + return; + } + return processChunkSync(this as never, chunk, flushFlag); + } + + #processChunk(chunk: Buffer, flushFlag: number, cb: () => void): void { + if (this._handle == null) { + /* eslint-disable-next-line @typescript-eslint/no-unsafe-call */ + queueMicrotask(cb); + return; + } + + this._handle.buffer = null; + this._handle.cb = cb; + this._handle.availOutBefore = this._chunkSize - this._outOffset; + this._handle.availInBefore = chunk.byteLength; + this._handle.flushFlag = flushFlag; + + this._handle.write( + flushFlag, + chunk, // in + 0, // in_off + this._handle.availInBefore, // in_len + this._outBuffer, // out + this._outOffset, // out_off + this._handle.availOutBefore, // out_len + ); + } +} + +function maxFlush(a: number, b: number): number { + return (flushiness[a] as number) > (flushiness[b] as number) ? a : b; +} + +export class Zlib extends ZlibBase { + public _level = CONST_Z_DEFAULT_COMPRESSION; + public _strategy = CONST_Z_DEFAULT_STRATEGY; + + public constructor(options: ZlibOptions | null | undefined, mode: number) { + let windowBits = CONST_Z_DEFAULT_WINDOWBITS; + let level = CONST_Z_DEFAULT_COMPRESSION; + let memLevel = CONST_Z_DEFAULT_MEMLEVEL; + let strategy = CONST_Z_DEFAULT_STRATEGY; + let dictionary: ZlibOptions['dictionary']; + + if (options != null) { + // Special case: + // - Compression: 0 is an invalid case. + // - Decompression: 0 indicates zlib to use the window size in the header of the compressed stream. + if ((options.windowBits == null || options.windowBits === 0) && (mode === CONST_INFLATE || mode === CONST_GUNZIP || mode === CONST_UNZIP)) { + windowBits = 0; + } else { + // `{ windowBits: 8 }` is valid for DEFLATE but not for GZIP. + const min = zlibUtil.CONST_Z_MIN_WINDOWBITS + (mode === CONST_GZIP ? 
1 : 0); + windowBits = checkRangesOrGetDefault(options.windowBits, 'options.windowBits', min, CONST_Z_MAX_WINDOWBITS, CONST_Z_DEFAULT_WINDOWBITS); + } + + level = checkRangesOrGetDefault(options.level, 'options.level', CONST_Z_MIN_LEVEL, CONST_Z_MAX_LEVEL, CONST_Z_DEFAULT_COMPRESSION); + memLevel = checkRangesOrGetDefault(options.memLevel, 'options.memLevel', CONST_Z_MIN_MEMLEVEL, CONST_Z_MAX_MEMLEVEL, CONST_Z_DEFAULT_MEMLEVEL); + strategy = checkRangesOrGetDefault(options.strategy, 'options.strategy', CONST_Z_DEFAULT_STRATEGY, CONST_Z_FIXED, CONST_Z_DEFAULT_STRATEGY); + dictionary = options.dictionary; + + if (dictionary != null && !isArrayBufferView(dictionary)) { + if (isAnyArrayBuffer(dictionary)) { + dictionary = Buffer.from(dictionary); + } else { + throw new ERR_INVALID_ARG_TYPE('options.dictionary', ['Buffer', 'TypedArray', 'DataView', 'ArrayBuffer'], dictionary); + } + } + } + + + const writeState = new Uint32Array(2); + const handle = new zlibUtil.ZlibStream(mode); + handle.initialize(windowBits, level, memLevel, strategy, writeState, processCallback, dictionary); + super(options ?? {}, mode, handle); + handle[owner_symbol] = this; + this._level = level; + this._strategy = strategy; + this._handle = handle; + this._writeState = writeState; + } + + public params(level: number, strategy: number, callback: () => never): void { + checkRangesOrGetDefault(level, 'level', CONST_Z_MIN_LEVEL, CONST_Z_MAX_LEVEL); + checkRangesOrGetDefault(strategy, 'strategy', CONST_Z_DEFAULT_STRATEGY, CONST_Z_FIXED); + + if (this._level !== level || this._strategy !== strategy) { + this.flush(CONST_Z_SYNC_FLUSH, this.#paramsAfterFlushCallback.bind(this, level, strategy, callback)); + } else { + /* eslint-disable-next-line @typescript-eslint/no-unsafe-call */ + queueMicrotask(callback); + } + } + + // This callback is used by `.params()` to wait until a full flush happened + // before adjusting the parameters. 
In particular, the call to the native + // `params()` function should not happen while a write is currently in progress + // on the threadpool. + #paramsAfterFlushCallback(level: number, strategy: number, callback: () => void): void { + ok(this._handle, 'zlib binding closed'); + this._handle.params(level, strategy); + if (!this.destroyed) { + this._level = level; + this._strategy = strategy; + callback?.(); + } + } +} diff --git a/src/node/internal/streams_util.d.ts b/src/node/internal/streams_util.d.ts new file mode 100644 index 00000000000..edca5bc8c5f --- /dev/null +++ b/src/node/internal/streams_util.d.ts @@ -0,0 +1,9 @@ +/* eslint-disable @typescript-eslint/no-redundant-type-constituents */ +import type { FinishedOptions } from 'node:stream'; + +type FinishedStream = NodeJS.ReadableStream | NodeJS.WritableStream | NodeJS.ReadWriteStream; +type FinishedCallback = (err?: NodeJS.ErrnoException | null) => void; + +export function eos(stream: FinishedStream, options: FinishedOptions): void; +export function eos(stream: FinishedStream, options: FinishedOptions, callback?: FinishedCallback): void; +export function eos(stream: FinishedStream, callback?: FinishedCallback): void; diff --git a/src/node/internal/validators.ts b/src/node/internal/validators.ts index 09efb29e877..7462c074f2b 100644 --- a/src/node/internal/validators.ts +++ b/src/node/internal/validators.ts @@ -203,6 +203,46 @@ export function validateArray(value: unknown, name: string, minLength = 0) { } } +// 1. Returns false for undefined and NaN +// 2. Returns true for finite numbers +// 3. Throws ERR_INVALID_ARG_TYPE for non-numbers +// 4. 
Throws ERR_OUT_OF_RANGE for infinite numbers +export function checkFiniteNumber(number: unknown, name: string): number is number { + // Common case + if (number === undefined) { + return false; + } + + if (Number.isFinite(number)) { + return true; // Is a valid number + } + + if (Number.isNaN(number)) { + return false; + } + + validateNumber(number, name); + + // Infinite numbers + throw new ERR_OUT_OF_RANGE(name, 'a finite number', number); +} + +// 1. Returns def for number when it's undefined or NaN +// 2. Returns number for finite numbers >= lower and <= upper +// 3. Throws ERR_INVALID_ARG_TYPE for non-numbers +// 4. Throws ERR_OUT_OF_RANGE for infinite numbers or numbers > upper or < lower +export function checkRangesOrGetDefault(number: unknown, name: string, lower: number, upper: number, def: number): number; +export function checkRangesOrGetDefault(number: unknown, name: string, lower: number, upper: number, def?: number | undefined): number | undefined; +export function checkRangesOrGetDefault(number: unknown, name: string, lower: number, upper: number, def: number | undefined = undefined): number | undefined { + if (!checkFiniteNumber(number, name)) { + return def; + } + if (number < lower || number > upper) { + throw new ERR_OUT_OF_RANGE(name, `>= ${lower} and <= ${upper}`, number); + } + return number; +} + export default { isInt32, isUint32, @@ -218,4 +258,8 @@ export default { validateOneOf, validateString, validateUint32, + + // Zlib specific + checkFiniteNumber, + checkRangesOrGetDefault, }; diff --git a/src/node/internal/zlib.d.ts b/src/node/internal/zlib.d.ts index 247e34aa816..48b114ff5ea 100644 --- a/src/node/internal/zlib.d.ts +++ b/src/node/internal/zlib.d.ts @@ -1,3 +1,5 @@ +import { owner_symbol, type Zlib } from 'node-internal:internal_zlib_base'; + export function crc32(data: ArrayBufferView, value: number): number; // zlib.constants (part of the API contract for node:zlib) @@ -113,3 +115,41 @@ export const 
CONST_BROTLI_DECODER_ERROR_ALLOC_RING_BUFFER_1: number; export const CONST_BROTLI_DECODER_ERROR_ALLOC_RING_BUFFER_2: number; export const CONST_BROTLI_DECODER_ERROR_ALLOC_BLOCK_TYPE_TREES: number; export const CONST_BROTLI_DECODER_ERROR_UNREACHABLE: number; + +export interface ZlibOptions { + flush?: number | undefined; + finishFlush?: number | undefined; + chunkSize?: number | undefined; + windowBits?: number | undefined; + level?: number | undefined; // compression only + memLevel?: number | undefined; // compression only + strategy?: number | undefined; // compression only + dictionary?: ArrayBufferView | undefined; // deflate/inflate only, empty dictionary by default + info?: boolean | undefined; + maxOutputLength?: number | undefined; +} + +type ErrorHandler = (errno: number, code: string, message: string) => void; +type ProcessHandler = () => void; + +export class ZlibStream { + public [owner_symbol]: Zlib; + + // Not used by C++ implementation but required to be Node.js compatible. 
+ public inOff: number; + /* eslint-disable-next-line @typescript-eslint/no-redundant-type-constituents */ + public buffer: NodeJS.TypedArray | null; + public cb: () => void; + public availOutBefore: number; + public availInBefore: number; + public flushFlag: number; + + public constructor(mode: number); + public initialize(windowBits: number, level: number, memLevel: number, strategy: number, writeState: NodeJS.TypedArray, processCallback: ProcessHandler, dictionary: ZlibOptions['dictionary']): void; + public close(): void; + public write(flushFlag: number, inputBuffer: NodeJS.TypedArray, inputOffset: number, inputLength: number, outputBuffer: NodeJS.TypedArray, outputOffset: number, outputLength: number): void; + public params(level: number, strategy: number): void; + public reset(): void; + + public setErrorHandler(cb: ErrorHandler): void; +} diff --git a/src/node/zlib.ts b/src/node/zlib.ts index 488c0057fb6..23695482310 100644 --- a/src/node/zlib.ts +++ b/src/node/zlib.ts @@ -1,11 +1,64 @@ -import { crc32, constants } from 'node-internal:internal_zlib'; +import { + crc32, + constants, + Gzip, + Gunzip, + Deflate, + DeflateRaw, + Inflate, + InflateRaw, + Unzip, + createGzip, + createGunzip, + createDeflate, + createDeflateRaw, + createInflate, + createInflateRaw, + createUnzip, +} from 'node-internal:internal_zlib'; export { crc32, constants, + + // Classes + Gzip, + Gunzip, + Deflate, + DeflateRaw, + Inflate, + InflateRaw, + Unzip, + + // Convenience methods to create classes + createGzip, + createGunzip, + createDeflate, + createDeflateRaw, + createInflate, + createInflateRaw, + createUnzip, } export default { crc32, constants, + + // Classes + Gzip, + Gunzip, + Deflate, + DeflateRaw, + Inflate, + InflateRaw, + Unzip, + + // Convenience methods to create classes + createGzip, + createGunzip, + createDeflate, + createDeflateRaw, + createInflate, + createInflateRaw, + createUnzip, } diff --git a/src/workerd/api/node/tests/zlib-nodejs-test.js 
b/src/workerd/api/node/tests/zlib-nodejs-test.js index 69c5a48533c..d9ee7795547 100644 --- a/src/workerd/api/node/tests/zlib-nodejs-test.js +++ b/src/workerd/api/node/tests/zlib-nodejs-test.js @@ -1,10 +1,11 @@ import { strictEqual, throws, - deepStrictEqual + deepStrictEqual, + ok as assert, } from 'node:assert'; import { Buffer } from 'node:buffer'; -import { crc32, constants } from 'node:zlib'; +import zlib from 'node:zlib'; // The following test data comes from @@ -193,31 +194,31 @@ export const crc32Test = { } const buf = Buffer.from(data, 'utf8'); strictEqual(buf.length, len); - strictEqual(crc32(buf, crc), expected, + strictEqual(zlib.crc32(buf, crc), expected, `crc32('${data}', ${crc}) in buffer is not ${expected}`); - strictEqual(crc32(buf.toString(), crc), expected, + strictEqual(zlib.crc32(buf.toString(), crc), expected, `crc32('${data}', ${crc}) in string is not ${expected}`); if (crc === 0) { - strictEqual(crc32(buf), expected, + strictEqual(zlib.crc32(buf), expected, `crc32('${data}') in buffer is not ${expected}`); - strictEqual(crc32(buf.toString()), expected, + strictEqual(zlib.crc32(buf.toString()), expected, `crc32('${data}') in string is not ${expected}`); } } [undefined, null, true, 1, () => {}, {}].forEach((invalid) => { - throws(() => { crc32(invalid); }, { code: 'ERR_INVALID_ARG_TYPE' }); + throws(() => { zlib.crc32(invalid); }, { code: 'ERR_INVALID_ARG_TYPE' }); }); [null, true, () => {}, {}].forEach((invalid) => { - throws(() => { crc32('test', invalid); }, { code: 'ERR_INVALID_ARG_TYPE' }); + throws(() => { zlib.crc32('test', invalid); }, { code: 'ERR_INVALID_ARG_TYPE' }); }); } } export const constantsTest = { test() { - deepStrictEqual(Object.keys(constants).sort(), [ + deepStrictEqual(Object.keys(zlib.constants).sort(), [ "BROTLI_DECODE", "BROTLI_DECODER_ERROR_ALLOC_BLOCK_TYPE_TREES", "BROTLI_DECODER_ERROR_ALLOC_CONTEXT_MAP", @@ -328,3 +329,392 @@ export const constantsTest = { ]); } } + +// Tests are taken from: +// 
https://github.com/nodejs/node/blob/561bc87c7607208f0d3db6dcd9231efeb48cfe2f/test/parallel/test-zlib-zero-windowBits.js
+export const testZeroWindowBits = {
+  test() {
+    // windowBits is a special case in zlib. On the compression side, 0 is invalid.
+    // On the decompression side, it indicates that zlib should use the value from
+    // the header of the compressed stream.
+    {
+      const inflate = zlib.createInflate({ windowBits: 0 });
+      assert(inflate instanceof zlib.Inflate);
+    }
+
+    {
+      const gunzip = zlib.createGunzip({ windowBits: 0 });
+      assert(gunzip instanceof zlib.Gunzip);
+    }
+
+    {
+      const unzip = zlib.createUnzip({ windowBits: 0 });
+      assert(unzip instanceof zlib.Unzip);
+    }
+
+    throws(() => zlib.createGzip({ windowBits: 0 }), {
+      code: 'ERR_OUT_OF_RANGE',
+      name: 'RangeError',
+    });
+  }
+}
+
+// Tests are taken from:
+// https://github.com/nodejs/node/blob/561bc87c7607208f0d3db6dcd9231efeb48cfe2f/test/parallel/test-zlib-create-raw.js
+export const testCreateRaw = {
+  test() {
+    {
+      const inflateRaw = zlib.createInflateRaw();
+      assert(inflateRaw instanceof zlib.InflateRaw);
+    }
+
+    {
+      const deflateRaw = zlib.createDeflateRaw();
+      assert(deflateRaw instanceof zlib.DeflateRaw);
+    }
+  }
+}
+
+// Tests are taken from:
+// https://github.com/nodejs/node/blob/561bc87c7607208f0d3db6dcd9231efeb48cfe2f/test/parallel/test-zlib-deflate-constructors.js
+export const testDeflateConstructors = {
+  test() {
+    assert(new zlib.Deflate() instanceof zlib.Deflate);
+    assert(new zlib.DeflateRaw() instanceof zlib.DeflateRaw);
+
+    // Throws if `options.chunkSize` is invalid
+    throws(
+      () => new zlib.Deflate({ chunkSize: 'test' }),
+      {
+        code: 'ERR_INVALID_ARG_TYPE',
+        name: 'TypeError',
+      }
+    );
+
+    throws(
+      () => new zlib.Deflate({ chunkSize: -Infinity }),
+      {
+        code: 'ERR_OUT_OF_RANGE',
+        name: 'RangeError',
+      }
+    );
+
+    throws(
+      () => new zlib.Deflate({ chunkSize: 0 }),
+      {
+        code: 'ERR_OUT_OF_RANGE',
+        name: 'RangeError',
+      }
+    );
+
+    // Throws if `options.windowBits` is invalid
+    throws(
+      () => new zlib.Deflate({ windowBits: 'test' }),
+      {
+        code: 'ERR_INVALID_ARG_TYPE',
+        name: 'TypeError',
+      }
+    );
+
+    throws(
+      () => new zlib.Deflate({ windowBits: -Infinity }),
+      {
+        code: 'ERR_OUT_OF_RANGE',
+        name: 'RangeError',
+      }
+    );
+
+    throws(
+      () => new zlib.Deflate({ windowBits: Infinity }),
+      {
+        code: 'ERR_OUT_OF_RANGE',
+        name: 'RangeError',
+      }
+    );
+
+    throws(
+      () => new zlib.Deflate({ windowBits: 0 }),
+      {
+        code: 'ERR_OUT_OF_RANGE',
+        name: 'RangeError',
+      }
+    );
+
+    // Throws if `options.level` is invalid
+    throws(
+      () => new zlib.Deflate({ level: 'test' }),
+      {
+        code: 'ERR_INVALID_ARG_TYPE',
+        name: 'TypeError',
+      }
+    );
+
+    throws(
+      () => new zlib.Deflate({ level: -Infinity }),
+      {
+        code: 'ERR_OUT_OF_RANGE',
+        name: 'RangeError',
+      }
+    );
+
+    throws(
+      () => new zlib.Deflate({ level: Infinity }),
+      {
+        code: 'ERR_OUT_OF_RANGE',
+        name: 'RangeError',
+      }
+    );
+
+    throws(
+      () => new zlib.Deflate({ level: -2 }),
+      {
+        code: 'ERR_OUT_OF_RANGE',
+        name: 'RangeError',
+      }
+    );
+
+    // Throws if `level` invalid in `Deflate.prototype.params()`
+    throws(
+      () => new zlib.Deflate().params('test'),
+      {
+        code: 'ERR_INVALID_ARG_TYPE',
+        name: 'TypeError',
+      }
+    );
+
+    throws(
+      () => new zlib.Deflate().params(-Infinity),
+      {
+        code: 'ERR_OUT_OF_RANGE',
+        name: 'RangeError',
+      }
+    );
+
+    throws(
+      () => new zlib.Deflate().params(Infinity),
+      {
+        code: 'ERR_OUT_OF_RANGE',
+        name: 'RangeError',
+      }
+    );
+
+    throws(
+      () => new zlib.Deflate().params(-2),
+      {
+        code: 'ERR_OUT_OF_RANGE',
+        name: 'RangeError',
+      }
+    );
+
+    // Throws if options.memLevel is invalid
+    throws(
+      () => new zlib.Deflate({ memLevel: 'test' }),
+      {
+        code: 'ERR_INVALID_ARG_TYPE',
+        name: 'TypeError',
+      }
+    );
+
+    throws(
+      () => new zlib.Deflate({ memLevel: -Infinity }),
+      {
+        code: 'ERR_OUT_OF_RANGE',
+        name: 'RangeError',
+      }
+    );
+
+    throws(
+      () => new zlib.Deflate({ memLevel: Infinity }),
+      {
+        code: 'ERR_OUT_OF_RANGE',
+        name: 'RangeError',
+      }
+    );
+
+    throws(
+      () => new zlib.Deflate({ memLevel: -2 }),
+      {
+        code: 'ERR_OUT_OF_RANGE',
+        name: 'RangeError',
+      }
+    );
+
+    // Does not throw if opts.strategy is valid
+    new zlib.Deflate({ strategy: zlib.constants.Z_FILTERED });
+    new zlib.Deflate({ strategy: zlib.constants.Z_HUFFMAN_ONLY });
+    new zlib.Deflate({ strategy: zlib.constants.Z_RLE });
+    new zlib.Deflate({ strategy: zlib.constants.Z_FIXED });
+    new zlib.Deflate({ strategy: zlib.constants.Z_DEFAULT_STRATEGY });
+
+    // Throws if options.strategy is invalid
+    throws(
+      () => new zlib.Deflate({ strategy: 'test' }),
+      {
+        code: 'ERR_INVALID_ARG_TYPE',
+        name: 'TypeError',
+      }
+    );
+
+    throws(
+      () => new zlib.Deflate({ strategy: -Infinity }),
+      {
+        code: 'ERR_OUT_OF_RANGE',
+        name: 'RangeError',
+      }
+    );
+
+    throws(
+      () => new zlib.Deflate({ strategy: Infinity }),
+      {
+        code: 'ERR_OUT_OF_RANGE',
+        name: 'RangeError',
+      }
+    );
+
+    throws(
+      () => new zlib.Deflate({ strategy: -2 }),
+      {
+        code: 'ERR_OUT_OF_RANGE',
+        name: 'RangeError',
+      }
+    );
+
+    // Throws TypeError if `strategy` is invalid in `Deflate.prototype.params()`
+    throws(
+      () => new zlib.Deflate().params(0, 'test'),
+      {
+        code: 'ERR_INVALID_ARG_TYPE',
+        name: 'TypeError',
+      }
+    );
+
+    throws(
+      () => new zlib.Deflate().params(0, -Infinity),
+      {
+        code: 'ERR_OUT_OF_RANGE',
+        name: 'RangeError',
+      }
+    );
+
+    throws(
+      () => new zlib.Deflate().params(0, Infinity),
+      {
+        code: 'ERR_OUT_OF_RANGE',
+        name: 'RangeError',
+      }
+    );
+
+    throws(
+      () => new zlib.Deflate().params(0, -2),
+      {
+        code: 'ERR_OUT_OF_RANGE',
+        name: 'RangeError',
+      }
+    );
+
+    // Throws if opts.dictionary is not a Buffer
+    throws(
+      () => new zlib.Deflate({ dictionary: 'not a buffer' }),
+      {
+        code: 'ERR_INVALID_ARG_TYPE',
+        name: 'TypeError',
+      }
+    );
+  }
+}
+
+// Tests are taken from:
+// https://github.com/nodejs/node/blob/561bc87c7607208f0d3db6dcd9231efeb48cfe2f/test/parallel/test-zlib-failed-init.js
+export const testFailedInit = {
+  test() {
+    // `assert` in this file is the `ok` export of node:assert, so the named
+    // `throws` / `strictEqual` imports are used here instead of relying on
+    // `ok` carrying the rest of the assert API as properties.
+    throws(
+      () => zlib.createGzip({ chunkSize: 0 }),
+      {
+        code: 'ERR_OUT_OF_RANGE',
+        name: 'RangeError',
+      }
+    );
+
+    throws(
+      () => zlib.createGzip({ windowBits: 0 }),
+      {
+        code: 'ERR_OUT_OF_RANGE',
+        name: 'RangeError',
+      }
+    );
+
+    throws(
+      () => zlib.createGzip({ memLevel: 0 }),
+      {
+        code: 'ERR_OUT_OF_RANGE',
+        name: 'RangeError',
+      }
+    );
+
+    // A NaN level/strategy falls back to the zlib default instead of throwing.
+    {
+      const stream = zlib.createGzip({ level: NaN });
+      strictEqual(stream._level, zlib.constants.Z_DEFAULT_COMPRESSION);
+    }
+
+    {
+      const stream = zlib.createGzip({ strategy: NaN });
+      strictEqual(stream._strategy, zlib.constants.Z_DEFAULT_STRATEGY);
+    }
+  }
+}
+
+// Node.js tests relevant to zlib
+//
+// - [ ] test-zlib-brotli-16GB.js
+// - [ ] test-zlib-convenience-methods.js
+// - [ ] test-zlib-flush-drain.js
+// - [ ] test-zlib-invalid-input-memory.js
+// - [ ] test-zlib-sync-no-event.js
+// - [ ] test-zlib-brotli-flush.js
+// - [x] test-zlib-crc32.js
+// - [ ] test-zlib-flush-drain-longblock.js
+// - [ ] test-zlib.js
+// - [ ] test-zlib-truncated.js
+// - [ ] test-zlib-brotli-from-brotli.js
+// - [x] test-zlib-create-raw.js
+// - [ ] test-zlib-flush-flags.js
+// - [ ] test-zlib-kmaxlength-rangeerror.js
+// - [ ] test-zlib-unused-weak.js
+// - [ ] test-zlib-brotli-from-string.js
+// - [x] test-zlib-deflate-constructors.js
+// - [ ] test-zlib-flush.js
+// - [ ] test-zlib-maxOutputLength.js
+// - [ ] test-zlib-unzip-one-byte-chunks.js
+// - [ ] test-zlib-brotli.js
+// - [ ] test-zlib-deflate-raw-inherits.js
+// - [ ] test-zlib-flush-write-sync-interleaved.js
+// - [ ] test-zlib-no-stream.js
+// - [ ] test-zlib-write-after-close.js
+// - [ ] test-zlib-brotli-kmaxlength-rangeerror.js
+// - [ ] test-zlib-destroy.js
+// - [ ] test-zlib-from-concatenated-gzip.js
+// - [ ] test-zlib-not-string-or-buffer.js
+// - [ ] test-zlib-write-after-end.js
+// - [ ] test-zlib-bytes-read.js
+// - [ ] test-zlib-destroy-pipe.js
+// - [ ] test-zlib-from-gzip.js
+// - [ ] test-zlib-object-write.js
+// - [ ] test-zlib-write-after-flush.js
+// - [ ] test-zlib-close-after-error.js
+// - [ ] test-zlib-dictionary-fail.js
+// - [ ] test-zlib-from-gzip-with-trailing-garbage.js
+// - [ ] test-zlib-params.js
+// - [ ] test-zlib-zero-byte.js
+// - [ ] test-zlib-close-after-write.js
+// - [ ] test-zlib-dictionary.js
+// - [ ] test-zlib-from-string.js
+// - [ ] test-zlib-premature-end.js
+// - [x] test-zlib-zero-windowBits.js
+// - [ ] test-zlib-close-in-ondata.js
+// - [ ] test-zlib-empty-buffer.js
+// - [ ] test-zlib-invalid-arg-value-brotli-compress.js
+// - [ ] test-zlib-random-byte-pipes.js
+// - [ ] test-zlib-const.js
+// - [x] test-zlib-failed-init.js
+// - [ ] test-zlib-invalid-input.js
+// - [ ] test-zlib-reset-before-write.js
diff --git a/src/workerd/api/node/zlib-util.c++ b/src/workerd/api/node/zlib-util.c++
index 567d4062d74..fe3f2734658 100644
--- a/src/workerd/api/node/zlib-util.c++
+++ b/src/workerd/api/node/zlib-util.c++
@@ -1,11 +1,426 @@
 // Copyright (c) 2017-2022 Cloudflare, Inc.
 // Licensed under the Apache 2.0 license found in the LICENSE file or at:
 // https://opensource.org/licenses/Apache-2.0
+// Copyright Joyent and Node contributors. All rights reserved. MIT license.
+
 #include "zlib-util.h"
 
 namespace workerd::api::node {
+
+// Returns the CRC-32 of `data`, continuing from the running checksum `value`.
 uint32_t ZlibUtil::crc32Sync(kj::Array<kj::byte> data, uint32_t value) {
   // Note: Bytef is defined in zlib.h
   return crc32(value, reinterpret_cast<const Bytef*>(data.begin()), data.size());
 }
+
+// Validates the zlib parameters and records them on the context. No zlib state is
+// created here; initializeZlib() does that on first use.
+//
+// `_windowBits` may be 0 only for INFLATE, GUNZIP and UNZIP. Throws a JS Error when
+// any parameter is out of range.
+void ZlibContext::initialize(int _level,
+    int _windowBits,
+    int _memLevel,
+    int _strategy,
+    jsg::Optional<kj::Array<kj::byte>> _dictionary) {
+  const bool windowBitsFromHeader = _windowBits == 0 &&
+      (mode == ZlibMode::INFLATE || mode == ZlibMode::GUNZIP || mode == ZlibMode::UNZIP);
+  if (!windowBitsFromHeader) {
+    // windowBits is caller-supplied input, so it is rejected the same way as the
+    // other arguments below.
+    JSG_REQUIRE(_windowBits >= Z_MIN_WINDOWBITS && _windowBits <= Z_MAX_WINDOWBITS, Error,
+        "Invalid windowBits"_kj);
+  }
+
+  JSG_REQUIRE(
+      _level >= Z_MIN_LEVEL && _level <= Z_MAX_LEVEL, Error, "Invalid compression level"_kj);
+  JSG_REQUIRE(
+      _memLevel >= Z_MIN_MEMLEVEL && _memLevel <= Z_MAX_MEMLEVEL, Error, "Invalid memlevel"_kj);
+  JSG_REQUIRE(_strategy == Z_FILTERED || _strategy == Z_HUFFMAN_ONLY ||
+          _strategy == Z_RLE || _strategy == Z_FIXED ||
+          _strategy == Z_DEFAULT_STRATEGY,
+      Error, "invalid strategy"_kj);
+
+  level = _level;
+  windowBits = _windowBits;
+  memLevel = _memLevel;
+  strategy = _strategy;
+  flush = Z_NO_FLUSH;
+
+  // Encode the container format into windowBits the way deflateInit2() and
+  // inflateInit2() expect it: +16 selects gzip framing, +32 auto-detects zlib or
+  // gzip, and a negative value selects a raw deflate stream.
+  switch (mode) {
+    case ZlibMode::GZIP:
+    case ZlibMode::GUNZIP:
+      windowBits += 16;
+      break;
+    case ZlibMode::UNZIP:
+      windowBits += 32;
+      break;
+    case ZlibMode::DEFLATERAW:
+    case ZlibMode::INFLATERAW:
+      windowBits *= -1;
+      break;
+    default:
+      break;
+  }
+
+  KJ_IF_SOME(dict, _dictionary) {
+    dictionary = kj::mv(dict);
+  }
+}
+
+// Translates the last zlib status (`err`) into an error, or returns kj::none when the
+// status is not fatal.
+kj::Maybe<CompressionError> ZlibContext::getError() const {
+  // Acceptable error states depend on the type of zlib stream.
+  switch (err) {
+    case Z_OK:
+    case Z_BUF_ERROR:
+      if (stream.avail_out != 0 && flush == Z_FINISH) {
+        return constructError("unexpected end of file"_kj);
+      }
+      [[fallthrough]];
+    case Z_STREAM_END:
+      // normal statuses, not fatal
+      break;
+    case Z_NEED_DICT:
+      if (dictionary.empty())
+        return constructError("Missing dictionary"_kj);
+      else
+        return constructError("Bad dictionary"_kj);
+    default:
+      // something else.
+      return constructError("Zlib error");
+  }
+
+  return kj::none;
+}
+
+// Installs the configured dictionary on the zlib stream for the modes handled below.
+// Returns an error when zlib rejects it; does nothing when no dictionary was given.
+kj::Maybe<CompressionError> ZlibContext::setDictionary() {
+  if (dictionary.empty()) {
+    return kj::none;
+  }
+
+  switch (mode) {
+    case ZlibMode::DEFLATE:
+    case ZlibMode::DEFLATERAW:
+      err = deflateSetDictionary(&stream, dictionary.begin(), dictionary.size());
+      break;
+    case ZlibMode::INFLATERAW:
+      err = inflateSetDictionary(&stream, dictionary.begin(), dictionary.size());
+      break;
+    default:
+      // The remaining inflate modes install the dictionary from work(), once zlib
+      // reports Z_NEED_DICT.
+      break;
+  }
+
+  if (err != Z_OK) {
+    return constructError("Failed to set dictionary"_kj);
+  }
+
+  return kj::none;
+}
+
+// Performs the one-time deflateInit2()/inflateInit2() call.
+//
+// Returns false when the stream was already initialized. Returns true when this call
+// attempted the initialization; the caller must then inspect `err` for the outcome.
+bool ZlibContext::initializeZlib() {
+  // Dereference the guard: testing the lock object itself does not read the flag.
+  auto isInitialized = initialized.lockExclusive();
+  if (*isInitialized) {
+    return false;
+  }
+  switch (mode) {
+    case ZlibMode::DEFLATE:
+    case ZlibMode::GZIP:
+    case ZlibMode::DEFLATERAW:
+      err = deflateInit2(&stream, level, Z_DEFLATED, windowBits, memLevel, strategy);
+      break;
+    case ZlibMode::INFLATE:
+    case ZlibMode::GUNZIP:
+    case ZlibMode::INFLATERAW:
+    case ZlibMode::UNZIP:
+      err = inflateInit2(&stream, windowBits);
+      break;
+    default:
+      KJ_UNREACHABLE;
+  }
+
+  if (err != Z_OK) {
+    dictionary.clear();
+    mode = ZlibMode::NONE;
+    return true;
+  }
+
+  // The returned error is not needed here: a failure is also recorded in `err`,
+  // which every caller checks after a first-time initialization.
+  setDictionary();
+  *isInitialized = true;
+  return true;
+}
+
+// Resets the zlib stream so it can process a new input, re-installing the dictionary.
+kj::Maybe<CompressionError> ZlibContext::resetStream() {
+  bool initialized_now = initializeZlib();
+  if (initialized_now && err != Z_OK) {
+    return constructError("Failed to init stream before reset");
+  }
+
+  // Clear any status left by an earlier operation so that modes without a reset call
+  // below do not report a stale failure.
+  err = Z_OK;
+
+  switch (mode) {
+    case ZlibMode::DEFLATE:
+    case ZlibMode::DEFLATERAW:
+    case ZlibMode::GZIP:
+      err = deflateReset(&stream);
+      break;
+    case ZlibMode::INFLATE:
+    case ZlibMode::INFLATERAW:
+    case ZlibMode::GUNZIP:
+      err = inflateReset(&stream);
+      break;
+    default:
+      break;
+  }
+
+  if (err != Z_OK) {
+    return constructError("Failed to reset stream"_kj);
+  }
+
+  return setDictionary();
+}
+
+// Runs one deflate()/inflate() step over the buffers installed by setBuffers(), using
+// the flush mode installed by setFlush(). The resulting status is left in `err` for
+// getError().
+void ZlibContext::work() {
+  bool initialized_now = initializeZlib();
+  if (initialized_now && err != Z_OK) {
+    return;
+  }
+  const Bytef* next_expected_header_byte = nullptr;
+
+  err = Z_OK;
+
+  // If the avail_out is left at 0, then it means that it ran out
+  // of room. If there was avail_out left over, then it means
+  // that all the input was consumed.
+  switch (mode) {
+    case ZlibMode::DEFLATE:
+    case ZlibMode::GZIP:
+    case ZlibMode::DEFLATERAW:
+      err = deflate(&stream, flush);
+      break;
+    case ZlibMode::UNZIP:
+      if (stream.avail_in > 0) {
+        next_expected_header_byte = stream.next_in;
+      }
+
+      if (next_expected_header_byte == nullptr) {
+        break;
+      }
+
+      // Sniff the two gzip magic bytes to decide between gzip and zlib framing.
+      switch (gzip_id_bytes_read) {
+        case 0:
+          if (*next_expected_header_byte == GZIP_HEADER_ID1) {
+            gzip_id_bytes_read = 1;
+            next_expected_header_byte++;
+
+            if (stream.avail_in == 1) {
+              // The only available byte was already read.
+              break;
+            }
+          } else {
+            mode = ZlibMode::INFLATE;
+            break;
+          }
+
+          [[fallthrough]];
+        case 1:
+          if (*next_expected_header_byte == GZIP_HEADER_ID2) {
+            gzip_id_bytes_read = 2;
+            mode = ZlibMode::GUNZIP;
+          } else {
+            // There is no actual difference between INFLATE and INFLATERAW
+            // (after initialization).
+            mode = ZlibMode::INFLATE;
+          }
+
+          break;
+        default:
+          JSG_FAIL_REQUIRE(Error, "Invalid number of gzip magic number bytes read");
+      }
+
+      [[fallthrough]];
+    case ZlibMode::INFLATE:
+    case ZlibMode::GUNZIP:
+    case ZlibMode::INFLATERAW:
+      err = inflate(&stream, flush);
+
+      // If data was encoded with dictionary (INFLATERAW will have it set in
+      // SetDictionary, don't repeat that here)
+      if (mode != ZlibMode::INFLATERAW &&
+          err == Z_NEED_DICT &&
+          !dictionary.empty()) {
+        err = inflateSetDictionary(&stream, dictionary.begin(), dictionary.size());
+        if (err == Z_OK) {
+          // And try to decode again
+          err = inflate(&stream, flush);
+        } else if (err == Z_DATA_ERROR) {
+          // Both inflateSetDictionary() and inflate() return Z_DATA_ERROR.
+          // Make it possible for After() to tell a bad dictionary from bad
+          // input.
+          err = Z_NEED_DICT;
+        }
+      }
+
+      while (stream.avail_in > 0 &&
+             mode == ZlibMode::GUNZIP &&
+             err == Z_STREAM_END &&
+             stream.next_in[0] != '\0') {
+        // Bytes remain in input buffer. Perhaps this is another compressed
+        // member in the same archive, or just trailing garbage.
+        // Trailing zero bytes are okay, though, since they are frequently
+        // used for padding.
+
+        resetStream();
+        err = inflate(&stream, flush);
+      }
+      break;
+    default:
+      KJ_UNREACHABLE;
+  }
+}
+
+// Applies a new compression level and strategy to DEFLATE and DEFLATERAW streams;
+// other modes are left untouched.
+kj::Maybe<CompressionError> ZlibContext::setParams(int _level, int _strategy) {
+  bool initialized_now = initializeZlib();
+  if (initialized_now && err != Z_OK) {
+    return constructError("Failed to init stream before set parameters");
+  }
+  err = Z_OK;
+
+  switch (mode) {
+    case ZlibMode::DEFLATE:
+    case ZlibMode::DEFLATERAW:
+      // Pass the requested values, not the ones recorded by initialize().
+      err = deflateParams(&stream, _level, _strategy);
+      break;
+    default:
+      break;
+  }
+
+  if (err != Z_OK) {
+    return constructError("Failed to set parameters");
+  }
+
+  return kj::none;
+}
+
+// Releases the zlib stream state. Calling it on a context that was never initialized,
+// or that was already closed, only clears the bookkeeping.
+void ZlibContext::close() {
+  // mode is NONE after a previous close() or a failed initialization; in both cases
+  // there is no zlib state to end.
+  if (!*initialized.lockExclusive() || mode == ZlibMode::NONE) {
+    dictionary.clear();
+    mode = ZlibMode::NONE;
+    return;
+  }
+
+  auto status = Z_OK;
+  switch (mode) {
+    case ZlibMode::DEFLATE:
+    case ZlibMode::DEFLATERAW:
+    case ZlibMode::GZIP:
+      status = deflateEnd(&stream);
+      break;
+    case ZlibMode::INFLATE:
+    case ZlibMode::INFLATERAW:
+    case ZlibMode::GUNZIP:
+    case ZlibMode::UNZIP:
+      status = inflateEnd(&stream);
+      break;
+    default:
+      KJ_UNREACHABLE;
+  }
+
+  JSG_REQUIRE(status == Z_OK || status == Z_DATA_ERROR, Error,
+      "Uncaught error on closing zlib stream");
+  mode = ZlibMode::NONE;
+  dictionary.clear();
+}
+
+// Points the zlib stream at the input/output buffers for the next work() call.
+void ZlibContext::setBuffers(kj::ArrayPtr<kj::byte> input,
+    uint32_t inputLength,
+    kj::ArrayPtr<kj::byte> output,
+    uint32_t outputLength) {
+  stream.avail_in = inputLength;
+  stream.next_in = input.begin();
+  stream.avail_out = outputLength;
+  stream.next_out = output.begin();
+}
+
+jsg::Ref<ZlibUtil::ZlibStream> ZlibUtil::ZlibStream::constructor(ZlibModeValue mode) {
+  return jsg::alloc<ZlibStream>(static_cast<ZlibMode>(mode));
+}
+
+template <typename CompressionContext>
+CompressionStream<CompressionContext>::~CompressionStream() {
+  JSG_ASSERT(!writing, Error, "Writing to compression stream"_kj);
+  // A stream that was never initialized, or that was closed explicitly, has nothing
+  // left to release; close() would otherwise assert on it.
+  if (initialized && !closed) {
+    close();
+  }
+}
+
+// Reports `error` to the registered error handler (if any), then completes a close
+// that was requested while the write was in progress.
+template <typename CompressionContext>
+void CompressionStream<CompressionContext>::emitError(
+    jsg::Lock& js, const CompressionError& error) {
+  KJ_IF_SOME(onError, errorHandler) {
+    // `error` is const, so its strings are passed as-is rather than moved.
+    onError(js, error.err, error.code, error.message);
+  }
+
+  writing = false;
+  if (pending_close) {
+    close();
+  }
+}
+
+// Runs one synchronous compression step and, on success, publishes the remaining
+// output/input byte counts into the write-result buffer given to initializeStream().
+template <typename CompressionContext>
+void CompressionStream<CompressionContext>::writeStream(jsg::Lock& js,
+    int flush,
+    kj::ArrayPtr<kj::byte> input,
+    uint32_t inputLength,
+    kj::ArrayPtr<kj::byte> output,
+    uint32_t outputLength) {
+  JSG_REQUIRE(initialized, Error, "Writing before initializing"_kj);
+  JSG_REQUIRE(!closed, Error, "Already finalized"_kj);
+  JSG_REQUIRE(!writing, Error, "Writing is in progress"_kj);
+  JSG_REQUIRE(!pending_close, Error, "Pending close"_kj);
+
+  // Ref();
+  context.setBuffers(input, inputLength, output, outputLength);
+  context.setFlush(flush);
+
+  // This implementation always follow the sync version.
+  context.work();
+  if (checkError(js)) {
+    context.getAfterWriteOffsets(writeResult);
+    writing = false;
+  }
+  // Unref();
+}
+
+// Closes the stream. A close requested during a write is deferred until the write
+// finishes; closing an already closed stream is a no-op.
+template <typename CompressionContext>
+void CompressionStream<CompressionContext>::close() {
+  pending_close = writing;
+  if (writing) {
+    return;
+  }
+  if (closed) {
+    return;
+  }
+  JSG_ASSERT(initialized, Error, "Closing before initialized"_kj);
+  closed = true;
+  context.close();
+}
+
+// Returns true when the last operation succeeded; otherwise emits the error and
+// returns false.
+template <typename CompressionContext>
+bool CompressionStream<CompressionContext>::checkError(jsg::Lock& js) {
+  KJ_IF_SOME(error, context.getError()) {
+    emitError(js, kj::mv(error));
+    return false;
+  }
+  return true;
+}
+
+// Registers the write-result buffer and the write callback and marks the stream as
+// initialized.
+template <typename CompressionContext>
+void CompressionStream<CompressionContext>::initializeStream(
+    kj::ArrayPtr<uint32_t> _writeResult, jsg::Function<void()> _writeCallback) {
+  // writeStream() publishes two values (remaining output, remaining input).
+  JSG_REQUIRE(_writeResult.size() >= 2, Error, "Invalid write result buffer"_kj);
+  writeResult = kj::mv(_writeResult);
+  writeCallback = kj::mv(_writeCallback);
+  initialized = true;
+}
+
+ZlibUtil::ZlibStream::ZlibStream(ZlibMode mode) : CompressionStream() {
+  context.setMode(mode);
+}
+
+void ZlibUtil::ZlibStream::initialize(int windowBits,
+    int level,
+    int memLevel,
+    int strategy,
+    kj::Array<uint32_t> writeState,
+    jsg::Function<void()> writeCallback,
+    jsg::Optional<kj::Array<kj::byte>> dictionary) {
+  // Validate the zlib parameters first so that a rejected configuration does not
+  // leave the stream marked as initialized.
+  context.initialize(level, windowBits, memLevel, strategy, kj::mv(dictionary));
+  // NOTE(review): only a pointer into `writeState` is retained; presumably the
+  // JavaScript wrapper keeps the typed array alive for the life of the stream -- confirm.
+  initializeStream(writeState.asPtr(), kj::mv(writeCallback));
+}
+
+// Validates the flush mode and the buffer ranges, then runs one compression step over
+// input[inputOffset, inputOffset + inputLength) into
+// output[outputOffset, outputOffset + outputLength).
+void ZlibUtil::ZlibStream::write(jsg::Lock& js, int flush, kj::Array<kj::byte> input, int inputOffset,
+    int inputLength, kj::Array<kj::byte> output, int outputOffset,
+    int outputLength) {
+  if (flush != Z_NO_FLUSH &&
+      flush != Z_PARTIAL_FLUSH &&
+      flush != Z_SYNC_FLUSH &&
+      flush != Z_FULL_FLUSH &&
+      flush != Z_FINISH &&
+      flush != Z_BLOCK) {
+    JSG_FAIL_REQUIRE(Error, "Invalid flush value");
+  }
+
+  // Check bounds: offset 0 is valid, and offset + length must stay inside the buffer
+  // because zlib reads and writes exactly `length` bytes starting at `offset`.
+  JSG_REQUIRE(inputOffset >= 0 && static_cast<size_t>(inputOffset) <= input.size(), Error,
+      "Invalid inputOffset"_kj);
+  JSG_REQUIRE(inputLength >= 0 &&
+          static_cast<size_t>(inputLength) <= input.size() - static_cast<size_t>(inputOffset),
+      Error, "Invalid inputLength"_kj);
+  JSG_REQUIRE(outputOffset >= 0 && static_cast<size_t>(outputOffset) <= output.size(), Error,
+      "Invalid outputOffset"_kj);
+  JSG_REQUIRE(outputLength >= 0 &&
+          static_cast<size_t>(outputLength) <= output.size() - static_cast<size_t>(outputOffset),
+      Error, "Invalid outputLength"_kj);
+
+  writeStream(js, flush, input.slice(inputOffset, inputOffset + inputLength), inputLength,
+      output.slice(outputOffset, outputOffset + outputLength), outputLength);
+}
+
+void ZlibUtil::ZlibStream::params(int level, int strategy) {
+  // NOTE(review): an error returned by setParams() is dropped here; reporting it
+  // through emitError() needs a jsg::Lock&, which this signature does not receive.
+  context.setParams(level, strategy);
+}
+
+void ZlibUtil::ZlibStream::reset(jsg::Lock& js) {
+  KJ_IF_SOME(error, context.resetStream()) {
+    emitError(js, kj::mv(error));
+  }
+}
+
 } // namespace workerd::api::node
diff --git a/src/workerd/api/node/zlib-util.h b/src/workerd/api/node/zlib-util.h
index c5ec664e291..ca9ed3217df 100644
--- a/src/workerd/api/node/zlib-util.h
+++ b/src/workerd/api/node/zlib-util.h
@@ -1,57 +1,206 @@
 // Copyright (c) 2017-2022 Cloudflare, Inc.
 // Licensed under the Apache 2.0 license found in the LICENSE file or at:
 // https://opensource.org/licenses/Apache-2.0
+// Copyright Joyent and Node contributors. All rights reserved. MIT license.
+#pragma once #include +#include +#include "zlib.h" #include #include -#include "zlib.h" +#include +#include -#include -#include +#include namespace workerd::api::node { +#ifndef ZLIB_ERROR_CODES +#define ZLIB_ERROR_CODES(V) \ + V(Z_OK) \ + V(Z_STREAM_END) \ + V(Z_NEED_DICT) \ + V(Z_ERRNO) \ + V(Z_STREAM_ERROR) \ + V(Z_DATA_ERROR) \ + V(Z_MEM_ERROR) \ + V(Z_BUF_ERROR) \ + V(Z_VERSION_ERROR) \ + +inline const char* ZlibStrerror(int err) { +#define V(code) if (err == code) return #code; + ZLIB_ERROR_CODES(V) +#undef V + return "Z_UNKNOWN_ERROR"; +} +#endif // ZLIB_ERROR_CODES + +// Certain zlib constants are defined by Node.js itself +static constexpr auto Z_MIN_CHUNK = 64; +static constexpr auto Z_MAX_CHUNK = 128 * 1024 * 1024; +static constexpr auto Z_DEFAULT_CHUNK = 16 * 1024; +static constexpr auto Z_MIN_MEMLEVEL = 1; + +static constexpr auto Z_MAX_MEMLEVEL = 9; +static constexpr auto Z_DEFAULT_MEMLEVEL = 8; +static constexpr auto Z_MIN_LEVEL = -1; +static constexpr auto Z_MAX_LEVEL = 9; +static constexpr auto Z_DEFAULT_LEVEL = Z_DEFAULT_COMPRESSION; +static constexpr auto Z_MIN_WINDOWBITS = 8; +static constexpr auto Z_MAX_WINDOWBITS = 15; +static constexpr auto Z_DEFAULT_WINDOWBITS = 15; + +static constexpr uint8_t GZIP_HEADER_ID1 = 0x1f; +static constexpr uint8_t GZIP_HEADER_ID2 = 0x8b; + +using ZlibModeValue = uint8_t; +enum class ZlibMode : ZlibModeValue { + NONE, + DEFLATE, + INFLATE, + GZIP, + GUNZIP, + DEFLATERAW, + INFLATERAW, + UNZIP, + BROTLI_DECODE, + BROTLI_ENCODE +}; + +struct CompressionError { + CompressionError(kj::StringPtr _message, kj::StringPtr _code, int _err) + : message(kj::str(_message)), + code(kj::str(_code)), + err(_err) { + JSG_REQUIRE(message.size() != 0, Error, "Compression error message should not be null"); + } + + kj::String message; + kj::String code; + int err; +}; + +class ZlibContext final { +public: + ZlibContext() = default; + + KJ_DISALLOW_COPY(ZlibContext); + + void close(); + void setBuffers(kj::ArrayPtr input, uint32_t 
inputLength, kj::ArrayPtr output, uint32_t outputLength); + int getFlush() const { return flush; }; + void setFlush(int value) { flush = value;}; + void getAfterWriteOffsets(kj::ArrayPtr writeResult) const { + writeResult[0] = stream.avail_out; + writeResult[1] = stream.avail_in; + } + void setMode(ZlibMode value) { + mode = value; + }; + kj::Maybe resetStream(); + kj::Maybe getError() const; + void work(); + + uint getAvailIn() const { return stream.avail_in; }; + void setAvailIn(uint value) { stream.avail_in = value; }; + uint getAvailOut() const { return stream.avail_out; } + void setAvailOut(uint value) { stream.avail_out = value; }; + + // Zlib + void initialize(int _level, int _windowBits, int _memLevel, int _strategy, jsg::Optional> _dictionary); + kj::Maybe setParams(int level, int strategy); + +private: + bool initializeZlib(); + kj::Maybe setDictionary(); + + CompressionError constructError(kj::StringPtr message) const { + if (stream.msg != nullptr) + message = kj::StringPtr(stream.msg); + + return {kj::str(message), kj::str(ZlibStrerror(err)), err}; + }; + + kj::MutexGuarded initialized{false}; + ZlibMode mode = ZlibMode::NONE; + int flush = Z_NO_FLUSH; + int windowBits = 0; + int level = 0; + int memLevel = 0; + int strategy = 0; + kj::Vector dictionary{}; + + int err = Z_OK; + unsigned int gzip_id_bytes_read = 0; + z_stream stream{}; +}; + +using CompressionStreamErrorHandler = jsg::Function; + +template +class CompressionStream { +public: + CompressionStream() = default; + ~CompressionStream(); + KJ_DISALLOW_COPY_AND_MOVE(CompressionStream); + + void close(); + bool checkError(jsg::Lock& js); + void emitError(jsg::Lock& js, const CompressionError& error); + void writeStream(jsg::Lock& js, int flush, kj::ArrayPtr input, uint32_t inputLength, kj::ArrayPtr output, uint32_t outputLength); + void setErrorHandler(CompressionStreamErrorHandler handler) { errorHandler = kj::mv(handler); }; + void initializeStream(kj::ArrayPtr _write_result, jsg::Function 
writeCallback); + +protected: + CompressionContext context; + +private: + bool initialized = false; + bool writing = false; + bool pending_close = false; + bool closed = false; + + // Equivalent to `write_js_callback` in Node.js + jsg::Optional> writeCallback; + kj::ArrayPtr writeResult = nullptr; + jsg::Optional errorHandler; +}; + // Implements utilities in support of the Node.js Zlib class ZlibUtil final : public jsg::Object { public: ZlibUtil() = default; ZlibUtil(jsg::Lock&, const jsg::Url&) {} - // Certain zlib constants are defined by NodeJS itself - static constexpr auto Z_MIN_CHUNK = 64; - static constexpr auto Z_MAX_CHUNK = 128 * 1024; - static constexpr auto Z_DEFAULT_CHUNK = 16 * 1024; - static constexpr auto Z_MIN_MEMLEVEL = 1; - - static constexpr auto Z_MAX_MEMLEVEL = 9; - static constexpr auto Z_DEFAULT_MEMLEVEL = 8; - static constexpr auto Z_MIN_LEVEL = -1; - static constexpr auto Z_MAX_LEVEL = 9; - static constexpr auto Z_DEFAULT_LEVEL = Z_DEFAULT_COMPRESSION; - static constexpr auto Z_MIN_WINDOWBITS = 8; - static constexpr auto Z_MAX_WINDOWBITS = 15; - static constexpr auto Z_DEFAULT_WINDOWBITS = 15; - - using ZlibModeValue = uint8_t; - enum class ZlibMode: ZlibModeValue { - NONE, - DEFLATE, - INFLATE, - GZIP, - GUNZIP, - DEFLATERAW, - INFLATERAW, - UNZIP, - BROTLI_DECODE, - BROTLI_ENCODE + class ZlibStream final : public jsg::Object, public CompressionStream { + public: + ZlibStream(ZlibMode mode); + KJ_DISALLOW_COPY_AND_MOVE(ZlibStream); + static jsg::Ref constructor(ZlibModeValue mode); + + // Instance methods + void initialize(int windowBits, int level, int memLevel, int strategy, kj::Array writeState, jsg::Function writeCallback, jsg::Optional> dictionary); + void write(jsg::Lock& js,int flush, kj::Array input, int inputOffset, int inputLength, kj::Array output, int outputOffset, int outputLength); + void params(int level, int strategy); + void reset(jsg::Lock& js); + + JSG_RESOURCE_TYPE(ZlibStream) { + JSG_METHOD(initialize); + 
JSG_METHOD(close); + JSG_METHOD(write); + JSG_METHOD(params); + JSG_METHOD(setErrorHandler); + JSG_METHOD(reset); + } }; uint32_t crc32Sync(kj::Array data, uint32_t value); JSG_RESOURCE_TYPE(ZlibUtil) { JSG_METHOD_NAMED(crc32, crc32Sync); + JSG_NESTED_TYPE(ZlibStream); // zlib.constants (part of the API contract for node:zlib) JSG_STATIC_CONSTANT_NAMED(CONST_Z_NO_FLUSH, Z_NO_FLUSH); @@ -89,8 +238,10 @@ class ZlibUtil final : public jsg::Object { JSG_STATIC_CONSTANT_NAMED(CONST_DEFLATERAW, static_cast(ZlibMode::DEFLATERAW)); JSG_STATIC_CONSTANT_NAMED(CONST_INFLATERAW, static_cast(ZlibMode::INFLATERAW)); JSG_STATIC_CONSTANT_NAMED(CONST_UNZIP, static_cast(ZlibMode::UNZIP)); - JSG_STATIC_CONSTANT_NAMED(CONST_BROTLI_DECODE, static_cast(ZlibMode::BROTLI_DECODE)); - JSG_STATIC_CONSTANT_NAMED(CONST_BROTLI_ENCODE, static_cast(ZlibMode::BROTLI_ENCODE)); + JSG_STATIC_CONSTANT_NAMED(CONST_BROTLI_DECODE, + static_cast(ZlibMode::BROTLI_DECODE)); + JSG_STATIC_CONSTANT_NAMED(CONST_BROTLI_ENCODE, + static_cast(ZlibMode::BROTLI_ENCODE)); JSG_STATIC_CONSTANT_NAMED(CONST_Z_MIN_WINDOWBITS, Z_MIN_WINDOWBITS); JSG_STATIC_CONSTANT_NAMED(CONST_Z_MAX_WINDOWBITS, Z_MAX_WINDOWBITS); @@ -201,6 +352,6 @@ class ZlibUtil final : public jsg::Object { }; }; -#define EW_NODE_ZLIB_ISOLATE_TYPES api::node::ZlibUtil +#define EW_NODE_ZLIB_ISOLATE_TYPES api::node::ZlibUtil, api::node::ZlibUtil::ZlibStream } // namespace workerd::api::node