diff --git a/src/node/internal/compatibility-flags.d.ts b/src/node/internal/compatibility-flags.d.ts index 1bb1a90c397..a8e830009c6 100644 --- a/src/node/internal/compatibility-flags.d.ts +++ b/src/node/internal/compatibility-flags.d.ts @@ -30,3 +30,4 @@ export const specCompliantResponseRedirect: boolean; export const workerdExperimental: boolean; export const durableObjectGetExisting: boolean; export const vectorizeQueryMetadataOptional: boolean; +export const nodeJsZlib: boolean; diff --git a/src/node/internal/internal_assert.ts b/src/node/internal/internal_assert.ts index a23e9759e15..87075aac5b9 100644 --- a/src/node/internal/internal_assert.ts +++ b/src/node/internal/internal_assert.ts @@ -77,7 +77,9 @@ function assert(actual: unknown, message?: string | Error): asserts actual { } as AssertionErrorConstructorOptions); } } -export const ok = assert; + +type Assert = (actual: unknown, message?: string | Error) => asserts actual; +export const ok: Assert = assert; export function throws( fn: () => void, diff --git a/src/node/internal/internal_errors.ts b/src/node/internal/internal_errors.ts index aad91d506c2..3f829e27c3e 100644 --- a/src/node/internal/internal_errors.ts +++ b/src/node/internal/internal_errors.ts @@ -526,6 +526,15 @@ export class ERR_STREAM_UNSHIFT_AFTER_END_EVENT extends NodeError { } } +export class ERR_BUFFER_TOO_LARGE extends NodeRangeError { + constructor(value: number) { + super( + 'ERR_BUFFER_TOO_LARGE', + `Cannot create a Buffer larger than ${value} bytes` + ); + } +} + export function aggregateTwoErrors(innerError: any, outerError: any) { if (innerError && outerError && innerError !== outerError) { if (Array.isArray(outerError.errors)) { diff --git a/src/node/internal/internal_zlib.ts b/src/node/internal/internal_zlib.ts index f9c88c8bfa5..7c2836792ee 100644 --- a/src/node/internal/internal_zlib.ts +++ b/src/node/internal/internal_zlib.ts @@ -1,21 +1,27 @@ -import { default as zlibUtil } from 'node-internal:zlib'; +// Copyright (c) 2017-2022 Cloudflare, Inc. +// Licensed under the Apache 2.0 license found in the LICENSE file or at: +// https://opensource.org/licenses/Apache-2.0 +// Copyright Joyent and Node contributors. All rights reserved. MIT license. + +import { default as zlibUtil, type ZlibOptions } from 'node-internal:zlib'; import { Buffer } from 'node-internal:internal_buffer'; import { validateUint32 } from 'node-internal:validators'; -import { isArrayBufferView } from 'node-internal:internal_types'; import { ERR_INVALID_ARG_TYPE } from 'node-internal:internal_errors'; +import { isArrayBufferView } from 'node-internal:internal_types'; +import { Zlib } from 'node-internal:internal_zlib_base'; -function crc32(data: ArrayBufferView | string, value: number = 0): number { - if (typeof data === 'string') { - data = Buffer.from(data); - } else if (!isArrayBufferView(data)) { - throw new ERR_INVALID_ARG_TYPE('data', 'ArrayBufferView', typeof data); - } - validateUint32(value, 'value'); - return zlibUtil.crc32(data, value); -} +const { + CONST_DEFLATE, + CONST_DEFLATERAW, + CONST_INFLATE, + CONST_INFLATERAW, + CONST_GUNZIP, + CONST_GZIP, + CONST_UNZIP, +} = zlibUtil; const constPrefix = 'CONST_'; -const constants = {}; +export const constants: Record = {}; Object.defineProperties( constants, @@ -35,4 +41,88 @@ Object.defineProperties( ) ); -export { crc32, constants }; +export function crc32( + data: ArrayBufferView | string, + value: number = 0 +): number { + if (typeof data === 'string') { + data = Buffer.from(data); + } else if (!isArrayBufferView(data)) { + throw new ERR_INVALID_ARG_TYPE('data', 'ArrayBufferView', typeof data); + } + validateUint32(value, 'value'); + return zlibUtil.crc32(data, value); +} + +export class Gzip extends Zlib { + public constructor(options: ZlibOptions) { + super(options, CONST_GZIP); + } +} + +export class Gunzip extends Zlib { + public constructor(options: ZlibOptions) { + super(options, CONST_GUNZIP); + } +} + +export class Deflate extends Zlib { + public constructor(options: ZlibOptions) { + super(options, CONST_DEFLATE); + } +} + +export class DeflateRaw extends Zlib { + public constructor(options?: ZlibOptions) { + if (options?.windowBits === 8) { + options.windowBits = 9; + } + super(options, CONST_DEFLATERAW); + } +} + +export class Inflate extends Zlib { + public constructor(options: ZlibOptions) { + super(options, CONST_INFLATE); + } +} + +export class InflateRaw extends Zlib { + public constructor(options: ZlibOptions) { + super(options, CONST_INFLATERAW); + } +} + +export class Unzip extends Zlib { + public constructor(options: ZlibOptions) { + super(options, CONST_UNZIP); + } +} + +export function createGzip(options: ZlibOptions): Gzip { + return new Gzip(options); +} + +export function createGunzip(options: ZlibOptions): Gunzip { + return new Gunzip(options); +} + +export function createDeflate(options: ZlibOptions): Deflate { + return new Deflate(options); +} + +export function createDeflateRaw(options: ZlibOptions): DeflateRaw { + return new DeflateRaw(options); +} + +export function createInflate(options: ZlibOptions): Inflate { + return new Inflate(options); +} + +export function createInflateRaw(options: ZlibOptions): InflateRaw { + return new InflateRaw(options); +} + +export function createUnzip(options: ZlibOptions): Unzip { + return new Unzip(options); +} diff --git a/src/node/internal/internal_zlib_base.ts b/src/node/internal/internal_zlib_base.ts new file mode 100644 index 00000000000..4d15a2ac69d --- /dev/null +++ b/src/node/internal/internal_zlib_base.ts @@ -0,0 +1,681 @@ +// Copyright (c) 2017-2022 Cloudflare, Inc. +// Licensed under the Apache 2.0 license found in the LICENSE file or at: +// https://opensource.org/licenses/Apache-2.0 +// Copyright Joyent and Node contributors. All rights reserved. MIT license. + +import { default as zlibUtil, type ZlibOptions } from 'node-internal:zlib'; +import { Buffer, kMaxLength } from 'node-internal:internal_buffer'; +import { + checkRangesOrGetDefault, + checkFiniteNumber, +} from 'node-internal:validators'; +import { + ERR_OUT_OF_RANGE, + ERR_BUFFER_TOO_LARGE, + ERR_INVALID_ARG_TYPE, + NodeError, +} from 'node-internal:internal_errors'; +import { Transform, type DuplexOptions } from 'node-internal:streams_transform'; +import { eos as finished } from 'node-internal:streams_util'; +import { + isArrayBufferView, + isAnyArrayBuffer, +} from 'node-internal:internal_types'; + +// Explicitly import `ok()` to avoid typescript error requiring every name in the call target to +// be annotated with an explicit type annotation. +import assert, { ok } from 'node-internal:internal_assert'; + +const { + CONST_INFLATE, + CONST_GUNZIP, + CONST_GZIP, + CONST_UNZIP, + CONST_Z_DEFAULT_CHUNK, + CONST_Z_DEFAULT_STRATEGY, + CONST_Z_DEFAULT_MEMLEVEL, + CONST_Z_DEFAULT_WINDOWBITS, + CONST_Z_DEFAULT_COMPRESSION, + CONST_Z_FIXED, + CONST_Z_MAX_LEVEL, + CONST_Z_MAX_MEMLEVEL, + CONST_Z_MAX_WINDOWBITS, + CONST_Z_MIN_LEVEL, + CONST_Z_MIN_MEMLEVEL, + CONST_Z_SYNC_FLUSH, + CONST_Z_NO_FLUSH, + CONST_Z_BLOCK, + CONST_Z_MIN_CHUNK, + CONST_Z_PARTIAL_FLUSH, + CONST_Z_FULL_FLUSH, + CONST_Z_FINISH, + CONST_BROTLI_ENCODE, + CONST_BROTLI_DECODE, + CONST_BROTLI_OPERATION_PROCESS, + CONST_BROTLI_OPERATION_EMIT_METADATA, +} = zlibUtil; + +export const owner_symbol = Symbol('owner'); + +const FLUSH_BOUND_IDX_NORMAL: number = 0; +const FLUSH_BOUND_IDX_BROTLI: number = 1; +const FLUSH_BOUND: [[number, number], [number, number]] = [ + [CONST_Z_NO_FLUSH, CONST_Z_BLOCK], + [CONST_BROTLI_OPERATION_PROCESS, CONST_BROTLI_OPERATION_EMIT_METADATA], +]; + +const kFlushFlag = Symbol('kFlushFlag'); +const kError = Symbol('kError'); + +function processCallback(this: zlibUtil.ZlibStream): void { + // This callback's context (`this`) is the `_handle` (ZCtx) object. It is + // important to null out the values once they are no longer needed since + // `_handle` can stay in memory long after the buffer is needed. + // eslint-disable-next-line @typescript-eslint/no-this-alias + const handle = this; + const self = this[owner_symbol]; + const state = self._writeState; + + if (self.destroyed) { + this.buffer = null; + this.cb(); + return; + } + + const [availOutAfter, availInAfter] = state as unknown as [number, number]; + + const inDelta = handle.availInBefore - availInAfter; + self.bytesWritten += inDelta; + + const have = handle.availOutBefore - availOutAfter; + let streamBufferIsFull = false; + if (have > 0) { + const out = self._outBuffer.slice(self._outOffset, self._outOffset + have); + self._outOffset += have; + streamBufferIsFull = !self.push(out); + } else { + assert.strictEqual(have, 0, 'have should not go down'); + } + + if (self.destroyed) { + this.cb(); + return; + } + + // Exhausted the output buffer, or used all the input create a new one. + if (availOutAfter === 0 || self._outOffset >= self._chunkSize) { + handle.availOutBefore = self._chunkSize; + self._outOffset = 0; + self._outBuffer = Buffer.allocUnsafe(self._chunkSize); + } + + if (availOutAfter === 0) { + // Not actually done. Need to reprocess. + // Also, update the availInBefore to the availInAfter value, + // so that if we have to hit it a third (fourth, etc.) time, + // it'll have the correct byte counts. + handle.inOff += inDelta; + handle.availInBefore = availInAfter; + + if (!streamBufferIsFull) { + this.write( + handle.flushFlag, + this.buffer as NodeJS.TypedArray, // in + handle.inOff, // in_off + handle.availInBefore, // in_len + self._outBuffer, // out + self._outOffset, // out_off + self._chunkSize + ); // out_len + } else { + // eslint-disable-next-line @typescript-eslint/unbound-method + const oldRead = self._read; + self._read = (n): void => { + self._read = oldRead; + this.write( + handle.flushFlag, + this.buffer as NodeJS.TypedArray, // in + handle.inOff, // in_off + handle.availInBefore, // in_len + self._outBuffer, // out + self._outOffset, // out_off + self._chunkSize + ); // out_len + self._read(n); + }; + } + return; + } + + if (availInAfter > 0) { + // If we have more input that should be written, but we also have output + // space available, that means that the compression library was not + // interested in receiving more data, and in particular that the input + // stream has ended early. + // This applies to streams where we don't check data past the end of + // what was consumed; that is, everything except Gunzip/Unzip. + self.push(null); + } + + // Finished with the chunk. + this.buffer = null; + this.cb(); +} + +// If a flush is scheduled while another flush is still pending, a way to figure +// out which one is the "stronger" flush is needed. +// This is currently only used to figure out which flush flag to use for the +// last chunk. +// Roughly, the following holds: +// Z_NO_FLUSH (< Z_TREES) < Z_BLOCK < Z_PARTIAL_FLUSH < +// Z_SYNC_FLUSH < Z_FULL_FLUSH < Z_FINISH +const flushiness: number[] = []; +const kFlushFlagList = [ + CONST_Z_NO_FLUSH, + CONST_Z_BLOCK, + CONST_Z_PARTIAL_FLUSH, + CONST_Z_SYNC_FLUSH, + CONST_Z_FULL_FLUSH, + CONST_Z_FINISH, +]; +for (let i = 0; i < kFlushFlagList.length; i++) { + flushiness[kFlushFlagList[i] as number] = i; +} + +type BufferWithFlushFlag = Buffer & { [kFlushFlag]: number }; + +// Set up a list of 'special' buffers that can be written using .write() +// from the .flush() code as a way of introducing flushing operations into the +// write sequence. +const kFlushBuffers: BufferWithFlushFlag[] = []; +{ + const dummyArrayBuffer = new ArrayBuffer(0); + for (const flushFlag of kFlushFlagList) { + const buf = Buffer.from(dummyArrayBuffer) as BufferWithFlushFlag; + buf[kFlushFlag] = flushFlag; + kFlushBuffers[flushFlag] = buf; + } +} + +function zlibOnError( + this: zlibUtil.ZlibStream, + errno: number, + code: string, + message: string +): void { + const self = this[owner_symbol]; + const error = new NodeError(code, message); + // @ts-expect-error Err number is expected. + error.errno = errno; + self.destroy(error); + self[kError] = error; +} + +function processChunkSync( + self: Zlib, + chunk: Buffer, + flushFlag: number +): Buffer | Uint8Array { + let availInBefore = chunk.byteLength; + let availOutBefore = self._chunkSize - self._outOffset; + let inOff = 0; + let availOutAfter; + let availInAfter; + + const buffers: (Buffer | Uint8Array)[] = []; + let nread = 0; + let inputRead = 0; + const state = self._writeState; + const handle = self._handle; + let buffer = self._outBuffer; + let offset = self._outOffset; + const chunkSize = self._chunkSize; + + let error; + self.on('error', function onError(er) { + error = er; + }); + + while (true) { + // TODO(soon): This was `writeSync` before, but it's not anymore. + handle?.write( + flushFlag, + chunk, // in + inOff, // in_off + availInBefore, // in_len + buffer, // out + offset, // out_off + availOutBefore + ); // out_len + if (error) throw error; + else if (self[kError]) throw self[kError]; + + [availOutAfter, availInAfter] = state as unknown as [number, number]; + + const inDelta = availInBefore - availInAfter; + inputRead += inDelta; + + const have = availOutBefore - availOutAfter; + if (have > 0) { + const out = buffer.slice(offset, offset + have); + offset += have; + buffers.push(out); + nread += out.byteLength; + + if (nread > self._maxOutputLength) { + _close(self); + throw new ERR_BUFFER_TOO_LARGE(self._maxOutputLength); + } + } else { + assert.strictEqual(have, 0, 'have should not go down'); + } + + // Exhausted the output buffer, or used all the input create a new one. + if (availOutAfter === 0 || offset >= chunkSize) { + availOutBefore = chunkSize; + offset = 0; + buffer = Buffer.allocUnsafe(chunkSize); + } + + if (availOutAfter === 0) { + // Not actually done. Need to reprocess. + // Also, update the availInBefore to the availInAfter value, + // so that if we have to hit it a third (fourth, etc.) time, + // it'll have the correct byte counts. + inOff += inDelta; + availInBefore = availInAfter; + } else { + break; + } + } + + self.bytesWritten = inputRead; + _close(self); + + if (nread === 0) return Buffer.alloc(0); + + return buffers.length === 1 + ? (buffers[0] as Buffer) + : Buffer.concat(buffers, nread); +} + +function _close(engine: ZlibBase): void { + engine._handle?.close(); + engine._handle = undefined; +} + +type ZlibDefaultOptions = { + flush: number; + finishFlush: number; + fullFlush: number; +}; + +const zlibDefaultOptions = { + flush: CONST_Z_NO_FLUSH, + finishFlush: CONST_Z_FINISH, + fullFlush: CONST_Z_FULL_FLUSH, +}; + +export class ZlibBase extends Transform { + public bytesWritten: number = 0; + + public _maxOutputLength: number; + public _outBuffer: Buffer; + public _outOffset: number = 0; + public _chunkSize: number; + public _defaultFlushFlag: number | undefined; + public _finishFlushFlag: number | undefined; + public _defaultFullFlushFlag: number | undefined; + public _info: unknown; + public _handle: zlibUtil.ZlibStream | undefined; + public _writeState = new Uint32Array(2); + + public [kError]: NodeError | undefined; + + public constructor( + opts: ZlibOptions & DuplexOptions, + mode: number, + handle: zlibUtil.ZlibStream, + { flush, finishFlush, fullFlush }: ZlibDefaultOptions = zlibDefaultOptions + ) { + let chunkSize = CONST_Z_DEFAULT_CHUNK; + let maxOutputLength = kMaxLength; + + let flushBoundIdx; + if (mode !== CONST_BROTLI_ENCODE && mode !== CONST_BROTLI_DECODE) { + flushBoundIdx = FLUSH_BOUND_IDX_NORMAL; + } else { + flushBoundIdx = FLUSH_BOUND_IDX_BROTLI; + } + + if (opts) { + if (opts.chunkSize != null) { + chunkSize = opts.chunkSize; + } + if (!checkFiniteNumber(chunkSize, 'options.chunkSize')) { + chunkSize = CONST_Z_DEFAULT_CHUNK; + } else if (chunkSize < CONST_Z_MIN_CHUNK) { + throw new ERR_OUT_OF_RANGE( + 'options.chunkSize', + `>= ${CONST_Z_MIN_CHUNK}`, + chunkSize + ); + } + + flush = checkRangesOrGetDefault( + opts.flush, + 'options.flush', + FLUSH_BOUND[flushBoundIdx]?.[0] as number, + FLUSH_BOUND[flushBoundIdx]?.[1] as number, + flush + ); + + finishFlush = checkRangesOrGetDefault( + opts.finishFlush, + 'options.finishFlush', + FLUSH_BOUND[flushBoundIdx]?.[0] as number, + FLUSH_BOUND[flushBoundIdx]?.[1] as number, + finishFlush + ); + + maxOutputLength = checkRangesOrGetDefault( + opts.maxOutputLength, + 'options.maxOutputLength', + 1, + kMaxLength, + kMaxLength + ); + + if (opts.encoding || opts.objectMode || opts.writableObjectMode) { + opts = { ...opts }; + opts.encoding = undefined; + opts.objectMode = false; + opts.writableObjectMode = false; + } + } + + // @ts-expect-error TODO: Find a way to avoid having "unknown" + super({ autoDestroy: true, ...opts } as unknown); + + // Error handler by processCallback() and zlibOnError() + handle.setErrorHandler(zlibOnError); + handle[owner_symbol] = this as never; + this._handle = handle; + this._outBuffer = Buffer.allocUnsafe(chunkSize); + this._outOffset = 0; + this._chunkSize = chunkSize; + this._defaultFlushFlag = flush; + this._finishFlushFlag = finishFlush; + this._defaultFullFlushFlag = fullFlush; + this._info = opts && opts.info; + this._maxOutputLength = maxOutputLength; + } + + public get _closed(): boolean { + return this._handle == null; + } + + // @deprecated Use `bytesWritten` instead. + public get bytesRead(): number { + return this.bytesWritten; + } + + // @deprecated Use `bytesWritten` instead. + public set bytesRead(value: number) { + this.bytesWritten = value; + } + + public reset(): void { + ok(this._handle, 'zlib binding closed'); + this._handle.reset(); + } + + // This is the _flush function called by the transform class, + // internally, when the last chunk has been written. + public override _flush(callback: () => void): void { + this._transform(Buffer.alloc(0), 'utf8', callback); + } + + // Force Transform compat behavior. + public override _final(callback: () => void): void { + callback(); + } + + public flush(kind: number, callback: () => void): void; + public flush(callback?: () => void): void; + public flush( + kind?: number | (() => void), + callback: (() => void) | undefined = undefined + ): void { + if (typeof kind === 'function' || (kind == null && !callback)) { + callback = kind as (() => void) | undefined; + kind = this._defaultFlushFlag as number; + } + + if (this.writableFinished) { + if (callback) { + /* eslint-disable-next-line @typescript-eslint/no-unsafe-call */ + queueMicrotask(callback); + } + } else if (this.writableEnded) { + if (callback) { + /* eslint-disable-next-line @typescript-eslint/no-unsafe-call */ + queueMicrotask(callback); + } + } else { + this.write(kFlushBuffers[kind as number], 'utf8', callback); + } + } + + public close(callback?: () => void): void { + if (callback) { + finished(this, callback); + } + this.destroy(); + } + + public override _destroy( + err: T, + callback: (err: T) => never + ): void { + _close(this); + callback(err); + } + + public override _transform( + chunk: Buffer & { [kFlushFlag]?: number }, + _encoding: BufferEncoding, + cb: () => void + ): void { + let flushFlag = this._defaultFlushFlag; + // We use a 'fake' zero-length chunk to carry information about flushes from + // the public API to the actual stream implementation. + if (typeof chunk[kFlushFlag] === 'number') { + flushFlag = chunk[kFlushFlag]; + } + + // For the last chunk, also apply `_finishFlushFlag`. + if (this.writableEnded && this.writableLength === chunk.byteLength) { + flushFlag = maxFlush( + flushFlag as number, + this._finishFlushFlag as number + ); + } + this.#processChunk(chunk, flushFlag as number, cb); + } + + // This function is left for backwards compatibility. + public _processChunk( + chunk: Buffer, + flushFlag: number, + cb?: () => void + ): Buffer | Uint8Array | undefined { + if (cb != null && typeof cb === 'function') { + this.#processChunk(chunk, flushFlag, cb); + return; + } + return processChunkSync(this as never, chunk, flushFlag); + } + + #processChunk(chunk: Buffer, flushFlag: number, cb: () => void): void { + if (this._handle == null) { + /* eslint-disable-next-line @typescript-eslint/no-unsafe-call */ + queueMicrotask(cb); + return; + } + + this._handle.buffer = null; + this._handle.cb = cb; + this._handle.availOutBefore = this._chunkSize - this._outOffset; + this._handle.availInBefore = chunk.byteLength; + this._handle.flushFlag = flushFlag; + + this._handle.write( + flushFlag, + chunk, // in + 0, // in_off + this._handle.availInBefore, // in_len + this._outBuffer, // out + this._outOffset, // out_off + this._handle.availOutBefore // out_len + ); + } +} + +function maxFlush(a: number, b: number): number { + return (flushiness[a] as number) > (flushiness[b] as number) ? a : b; +} + +export class Zlib extends ZlibBase { + public _level = CONST_Z_DEFAULT_COMPRESSION; + public _strategy = CONST_Z_DEFAULT_STRATEGY; + + public constructor(options: ZlibOptions | null | undefined, mode: number) { + let windowBits = CONST_Z_DEFAULT_WINDOWBITS; + let level = CONST_Z_DEFAULT_COMPRESSION; + let memLevel = CONST_Z_DEFAULT_MEMLEVEL; + let strategy = CONST_Z_DEFAULT_STRATEGY; + let dictionary: ZlibOptions['dictionary']; + + if (options != null) { + // Special case: + // - Compression: 0 is an invalid case. + // - Decompression: 0 indicates zlib to use the window size in the header of the compressed stream. + if ( + (options.windowBits == null || options.windowBits === 0) && + (mode === CONST_INFLATE || + mode === CONST_GUNZIP || + mode === CONST_UNZIP) + ) { + windowBits = 0; + } else { + // `{ windowBits: 8 }` is valid for DEFLATE but not for GZIP. + const min = + zlibUtil.CONST_Z_MIN_WINDOWBITS + (mode === CONST_GZIP ? 1 : 0); + windowBits = checkRangesOrGetDefault( + options.windowBits, + 'options.windowBits', + min, + CONST_Z_MAX_WINDOWBITS, + CONST_Z_DEFAULT_WINDOWBITS + ); + } + + level = checkRangesOrGetDefault( + options.level, + 'options.level', + CONST_Z_MIN_LEVEL, + CONST_Z_MAX_LEVEL, + CONST_Z_DEFAULT_COMPRESSION + ); + memLevel = checkRangesOrGetDefault( + options.memLevel, + 'options.memLevel', + CONST_Z_MIN_MEMLEVEL, + CONST_Z_MAX_MEMLEVEL, + CONST_Z_DEFAULT_MEMLEVEL + ); + strategy = checkRangesOrGetDefault( + options.strategy, + 'options.strategy', + CONST_Z_DEFAULT_STRATEGY, + CONST_Z_FIXED, + CONST_Z_DEFAULT_STRATEGY + ); + dictionary = options.dictionary; + + if (dictionary != null && !isArrayBufferView(dictionary)) { + if (isAnyArrayBuffer(dictionary)) { + dictionary = Buffer.from(dictionary); + } else { + throw new ERR_INVALID_ARG_TYPE( + 'options.dictionary', + ['Buffer', 'TypedArray', 'DataView', 'ArrayBuffer'], + dictionary + ); + } + } + } + + const writeState = new Uint32Array(2); + const handle = new zlibUtil.ZlibStream(mode); + handle.initialize( + windowBits, + level, + memLevel, + strategy, + writeState, + processCallback, + dictionary + ); + super(options ?? {}, mode, handle); + handle[owner_symbol] = this; + this._level = level; + this._strategy = strategy; + this._handle = handle; + this._writeState = writeState; + } + + public params(level: number, strategy: number, callback: () => never): void { + checkRangesOrGetDefault( + level, + 'level', + CONST_Z_MIN_LEVEL, + CONST_Z_MAX_LEVEL + ); + checkRangesOrGetDefault( + strategy, + 'strategy', + CONST_Z_DEFAULT_STRATEGY, + CONST_Z_FIXED + ); + + if (this._level !== level || this._strategy !== strategy) { + this.flush( + CONST_Z_SYNC_FLUSH, + this.#paramsAfterFlushCallback.bind(this, level, strategy, callback) + ); + } else { + /* eslint-disable-next-line @typescript-eslint/no-unsafe-call */ + queueMicrotask(callback); + } + } + + // This callback is used by `.params()` to wait until a full flush happened + // before adjusting the parameters. In particular, the call to the native + // `params()` function should not happen while a write is currently in progress + // on the threadpool. + #paramsAfterFlushCallback( + level: number, + strategy: number, + callback: () => void + ): void { + ok(this._handle, 'zlib binding closed'); + this._handle.params(level, strategy); + if (!this.destroyed) { + this._level = level; + this._strategy = strategy; + callback?.(); + } + } +} diff --git a/src/node/internal/streams_util.d.ts b/src/node/internal/streams_util.d.ts new file mode 100644 index 00000000000..8f3a8f7ee19 --- /dev/null +++ b/src/node/internal/streams_util.d.ts @@ -0,0 +1,16 @@ +/* eslint-disable @typescript-eslint/no-redundant-type-constituents */ +import type { FinishedOptions } from 'node:stream'; + +type FinishedStream = + | NodeJS.ReadableStream + | NodeJS.WritableStream + | NodeJS.ReadWriteStream; +type FinishedCallback = (err?: NodeJS.ErrnoException | null) => void; + +export function eos(stream: FinishedStream, options: FinishedOptions): void; +export function eos( + stream: FinishedStream, + options: FinishedOptions, + callback?: FinishedCallback +): void; +export function eos(stream: FinishedStream, callback?: FinishedCallback): void; diff --git a/src/node/internal/validators.ts b/src/node/internal/validators.ts index 79c67e1e132..d4e5bb6c2ba 100644 --- a/src/node/internal/validators.ts +++ b/src/node/internal/validators.ts @@ -209,6 +209,67 @@ export function validateArray(value: unknown, name: string, minLength = 0) { } } +// 1. Returns false for undefined and NaN +// 2. Returns true for finite numbers +// 3. Throws ERR_INVALID_ARG_TYPE for non-numbers +// 4. Throws ERR_OUT_OF_RANGE for infinite numbers +export function checkFiniteNumber( + number: unknown, + name: string +): number is number { + // Common case + if (number === undefined) { + return false; + } + + if (Number.isFinite(number)) { + return true; // Is a valid number + } + + if (Number.isNaN(number)) { + return false; + } + + validateNumber(number, name); + + // Infinite numbers + throw new ERR_OUT_OF_RANGE(name, 'a finite number', number); +} + +// 1. Returns def for number when it's undefined or NaN +// 2. Returns number for finite numbers >= lower and <= upper +// 3. Throws ERR_INVALID_ARG_TYPE for non-numbers +// 4. Throws ERR_OUT_OF_RANGE for infinite numbers or numbers > upper or < lower +export function checkRangesOrGetDefault( + number: unknown, + name: string, + lower: number, + upper: number, + def: number +): number; +export function checkRangesOrGetDefault( + number: unknown, + name: string, + lower: number, + upper: number, + def?: number | undefined +): number | undefined; +export function checkRangesOrGetDefault( + number: unknown, + name: string, + lower: number, + upper: number, + def: number | undefined = undefined +): number | undefined { + if (!checkFiniteNumber(number, name)) { + return def; + } + if (number < lower || number > upper) { + throw new ERR_OUT_OF_RANGE(name, `>= ${lower} and <= ${upper}`, number); + } + return number; +} + export default { isInt32, isUint32, @@ -224,4 +285,8 @@ export default { validateOneOf, validateString, validateUint32, + + // Zlib specific + checkFiniteNumber, + checkRangesOrGetDefault, }; diff --git a/src/node/internal/zlib.d.ts b/src/node/internal/zlib.d.ts index 247e34aa816..ce050f1110b 100644 --- a/src/node/internal/zlib.d.ts +++ b/src/node/internal/zlib.d.ts @@ -1,3 +1,5 @@ +import { owner_symbol, type Zlib } from 'node-internal:internal_zlib_base'; + export function crc32(data: ArrayBufferView, value: number): number; // zlib.constants (part of the API contract for node:zlib) @@ -113,3 +115,57 @@ export const CONST_BROTLI_DECODER_ERROR_ALLOC_RING_BUFFER_1: number; export const CONST_BROTLI_DECODER_ERROR_ALLOC_RING_BUFFER_2: number; export const CONST_BROTLI_DECODER_ERROR_ALLOC_BLOCK_TYPE_TREES: number; export const CONST_BROTLI_DECODER_ERROR_UNREACHABLE: number; + +export interface ZlibOptions { + flush?: number | undefined; + finishFlush?: number | undefined; + chunkSize?: number | undefined; + windowBits?: number | undefined; + level?: number | undefined; // compression only + memLevel?: number | undefined; // compression only + strategy?: number | undefined; // compression only + dictionary?: ArrayBufferView | undefined; // deflate/inflate only, empty dictionary by default + info?: boolean | undefined; + maxOutputLength?: number | undefined; +} + +type ErrorHandler = (errno: number, code: string, message: string) => void; +type ProcessHandler = () => void; + +export class ZlibStream { + public [owner_symbol]: Zlib; + + // Not used by C++ implementation but required to be Node.js compatible. + public inOff: number; + /* eslint-disable-next-line @typescript-eslint/no-redundant-type-constituents */ + public buffer: NodeJS.TypedArray | null; + public cb: () => void; + public availOutBefore: number; + public availInBefore: number; + public flushFlag: number; + + public constructor(mode: number); + public initialize( + windowBits: number, + level: number, + memLevel: number, + strategy: number, + writeState: NodeJS.TypedArray, + processCallback: ProcessHandler, + dictionary: ZlibOptions['dictionary'] + ): void; + public close(): void; + public write( + flushFlag: number, + inputBuffer: NodeJS.TypedArray, + inputOffset: number, + inputLength: number, + outputBuffer: NodeJS.TypedArray, + outputOffset: number, + outputLength: number + ): void; + public params(level: number, strategy: number): void; + public reset(): void; + + public setErrorHandler(cb: ErrorHandler): void; +} diff --git a/src/node/zlib.ts b/src/node/zlib.ts index ec400f88d0c..efecda82e4c 100644 --- a/src/node/zlib.ts +++ b/src/node/zlib.ts @@ -1,8 +1,76 @@ +import * as zlib from 'node-internal:internal_zlib'; import { crc32, constants } from 'node-internal:internal_zlib'; +import { default as compatFlags } from 'workerd:compatibility-flags'; -export { crc32, constants }; +const { nodeJsZlib } = compatFlags; + +function protectMethod(method: unknown): unknown { + if (!nodeJsZlib) { + return function notImplemented() { + throw new Error('Compatibility flag "nodejs_zlib" is not enabled'); + }; + } + + return method; +} + +const Gzip = protectMethod(zlib.Gzip); +const Gunzip = protectMethod(zlib.Gunzip); +const Deflate = protectMethod(zlib.Deflate); +const DeflateRaw = protectMethod(zlib.DeflateRaw); +const Inflate = protectMethod(zlib.Inflate); +const InflateRaw = protectMethod(zlib.InflateRaw); +const Unzip = protectMethod(zlib.Unzip); +const createGzip = protectMethod(zlib.createGzip); +const createGunzip = protectMethod(zlib.createGunzip); +const createDeflate = protectMethod(zlib.createDeflate); +const createDeflateRaw = protectMethod(zlib.createDeflateRaw); +const createInflate = protectMethod(zlib.createInflate); +const createInflateRaw = protectMethod(zlib.createInflateRaw); +const createUnzip = protectMethod(zlib.createUnzip); + +export { + crc32, + constants, + + // Classes + Gzip, + Gunzip, + Deflate, + DeflateRaw, + Inflate, + InflateRaw, + Unzip, + + // Convenience methods to create classes + createGzip, + createGunzip, + createDeflate, + createDeflateRaw, + createInflate, + createInflateRaw, + createUnzip, +}; export default { crc32, constants, + + // Classes + Gzip, + Gunzip, + Deflate, + DeflateRaw, + Inflate, + InflateRaw, + Unzip, + + // Convenience methods to create classes + createGzip, + createGunzip, + createDeflate, + createDeflateRaw, + createInflate, + createInflateRaw, + createUnzip, }; diff --git a/src/workerd/api/node/tests/zlib-nodejs-test.js b/src/workerd/api/node/tests/zlib-nodejs-test.js index e9ac7714a66..0cc072a3505 100644 --- a/src/workerd/api/node/tests/zlib-nodejs-test.js +++ b/src/workerd/api/node/tests/zlib-nodejs-test.js @@ -1,6 +1,11 @@ -import { strictEqual, throws, deepStrictEqual } from 'node:assert'; +import { + strictEqual, + throws, + deepStrictEqual, + ok as assert, +} from 'node:assert'; import { Buffer } from 'node:buffer'; -import { crc32, constants } from 'node:zlib'; +import zlib from 'node:zlib'; // The following test data comes from // https://github.com/zlib-ng/zlib-ng/blob/5401b24/test/test_crc32.cc @@ -210,23 +215,23 @@ export const crc32Test = { const buf = Buffer.from(data, 'utf8'); strictEqual(buf.length, len); strictEqual( - crc32(buf, crc), + zlib.crc32(buf, crc), expected, `crc32('${data}', ${crc}) in buffer is not ${expected}` ); strictEqual( - crc32(buf.toString(), crc), + zlib.crc32(buf.toString(), crc), expected, `crc32('${data}', ${crc}) in string is not ${expected}` ); if (crc === 0) { strictEqual( - crc32(buf), + zlib.crc32(buf), expected, `crc32('${data}') in buffer is not ${expected}` ); strictEqual( - crc32(buf.toString()), + zlib.crc32(buf.toString()), expected, `crc32('${data}') in string is not ${expected}` ); @@ -236,7 +241,7 @@ export const crc32Test = { [undefined, null, true, 1, () => {}, {}].forEach((invalid) => { throws( () => { - crc32(invalid); + zlib.crc32(invalid); }, { code: 'ERR_INVALID_ARG_TYPE' } ); @@ -245,7 +250,7 @@ export const crc32Test = { [null, true, () => {}, {}].forEach((invalid) => { throws( () => { - crc32('test', invalid); + zlib.crc32('test', invalid); }, { code: 'ERR_INVALID_ARG_TYPE' } ); @@ -255,7 +260,7 @@ export const crc32Test = { export const constantsTest = { test() { - deepStrictEqual(Object.keys(constants).sort(), [ + deepStrictEqual(Object.keys(zlib.constants).sort(), [ 'BROTLI_DECODE', 'BROTLI_DECODER_ERROR_ALLOC_BLOCK_TYPE_TREES', 'BROTLI_DECODER_ERROR_ALLOC_CONTEXT_MAP', @@ -366,3 +371,299 @@ export const constantsTest = { ]); }, }; + +// Tests are taken from: +// https://github.com/nodejs/node/blob/561bc87c7607208f0d3db6dcd9231efeb48cfe2f/test/parallel/test-zlib-zero-windowBits.js +export const testZeroWindowBits = { + test() { + // windowBits is a special case in zlib. On the compression side, 0 is invalid. + // On the decompression side, it indicates that zlib should use the value from + // the header of the compressed stream. + { + const inflate = zlib.createInflate({ windowBits: 0 }); + assert(inflate instanceof zlib.Inflate); + } + + { + const gunzip = zlib.createGunzip({ windowBits: 0 }); + assert(gunzip instanceof zlib.Gunzip); + } + + { + const unzip = zlib.createUnzip({ windowBits: 0 }); + assert(unzip instanceof zlib.Unzip); + } + + throws(() => zlib.createGzip({ windowBits: 0 }), { + code: 'ERR_OUT_OF_RANGE', + name: 'RangeError', + }); + }, +}; + +// Tests are taken from: +// https://github.com/nodejs/node/blob/561bc87c7607208f0d3db6dcd9231efeb48cfe2f/test/parallel/test-zlib-create-raw.js +export const testCreateRaw = { + test() { + { + const inflateRaw = zlib.createInflateRaw(); + assert(inflateRaw instanceof zlib.InflateRaw); + } + + { + const deflateRaw = zlib.createDeflateRaw(); + assert(deflateRaw instanceof zlib.DeflateRaw); + } + }, +}; + +// Tests are taken from: +// https://github.com/nodejs/node/blob/561bc87c7607208f0d3db6dcd9231efeb48cfe2f/test/parallel/test-zlib-deflate-constructors.js +export const testDeflateConstructors = { + test() { + assert(new zlib.Deflate() instanceof zlib.Deflate); + assert(new zlib.DeflateRaw() instanceof zlib.DeflateRaw); + + // Throws if `options.chunkSize` is invalid + throws(() => new zlib.Deflate({ chunkSize: 'test' }), { + code: 'ERR_INVALID_ARG_TYPE', + name: 'TypeError', + }); + + throws(() => new zlib.Deflate({ chunkSize: -Infinity }), { + code: 'ERR_OUT_OF_RANGE', + name: 'RangeError', + }); + + throws(() => new zlib.Deflate({ chunkSize: 0 }), { + code: 'ERR_OUT_OF_RANGE', + name: 'RangeError', + }); + + // Throws if `options.windowBits` is invalid + throws(() => new zlib.Deflate({ windowBits: 'test' }), { + code: 'ERR_INVALID_ARG_TYPE', + name: 'TypeError', + }); + + throws(() => new zlib.Deflate({ windowBits: -Infinity }), { + code: 'ERR_OUT_OF_RANGE', + name: 'RangeError', + }); + + throws(() => new zlib.Deflate({ windowBits: Infinity }), { + code: 'ERR_OUT_OF_RANGE', + name: 'RangeError', + }); + + throws(() => new zlib.Deflate({ windowBits: 0 }), { + code: 'ERR_OUT_OF_RANGE', + name: 'RangeError', + }); + + // Throws if `options.level` is invalid + throws(() => new zlib.Deflate({ level: 'test' }), { + code: 'ERR_INVALID_ARG_TYPE', + name: 'TypeError', + }); + + throws(() => new zlib.Deflate({ level: -Infinity }), { + code: 'ERR_OUT_OF_RANGE', + name: 'RangeError', + }); + + throws(() => new zlib.Deflate({ level: Infinity }), { + code: 'ERR_OUT_OF_RANGE', + name: 'RangeError', + }); + + throws(() => new zlib.Deflate({ level: -2 }), { + code: 'ERR_OUT_OF_RANGE', + name: 'RangeError', + }); + + // Throws if `level` invalid in `Deflate.prototype.params()` + throws(() => new zlib.Deflate().params('test'), { + code: 'ERR_INVALID_ARG_TYPE', + name: 'TypeError', + }); + + throws(() => new zlib.Deflate().params(-Infinity), { + code: 'ERR_OUT_OF_RANGE', + name: 'RangeError', + }); + + throws(() => new zlib.Deflate().params(Infinity), { + code: 'ERR_OUT_OF_RANGE', + name: 'RangeError', + }); + + throws(() => new zlib.Deflate().params(-2), { + code: 'ERR_OUT_OF_RANGE', + name: 'RangeError', + }); + + // Throws if options.memLevel is invalid + throws(() => new zlib.Deflate({ memLevel: 'test' }), { + code: 'ERR_INVALID_ARG_TYPE', + name: 'TypeError', + }); + + throws(() => new zlib.Deflate({ memLevel: -Infinity }), { + code: 'ERR_OUT_OF_RANGE', + name: 'RangeError', + }); + + throws(() => new zlib.Deflate({ memLevel: Infinity }), { + code: 'ERR_OUT_OF_RANGE', + name: 'RangeError', + }); + + throws(() => new zlib.Deflate({ memLevel: -2 }), { + code: 'ERR_OUT_OF_RANGE', + name: 'RangeError', + }); + + // Does not throw if opts.strategy is valid + new zlib.Deflate({ strategy: zlib.constants.Z_FILTERED }); + new zlib.Deflate({ strategy: zlib.constants.Z_HUFFMAN_ONLY }); + new zlib.Deflate({ strategy: zlib.constants.Z_RLE }); + new zlib.Deflate({ strategy: zlib.constants.Z_FIXED }); + new zlib.Deflate({ strategy: zlib.constants.Z_DEFAULT_STRATEGY }); + + // Throws if options.strategy is invalid + throws(() => new zlib.Deflate({ strategy: 'test' }), { + code: 'ERR_INVALID_ARG_TYPE', + name: 'TypeError', + }); + + throws(() => new zlib.Deflate({ strategy: -Infinity }), { + code: 'ERR_OUT_OF_RANGE', + name: 'RangeError', + }); + + throws(() => new zlib.Deflate({ strategy: Infinity }), { + code: 'ERR_OUT_OF_RANGE', + name: 'RangeError', + }); + + throws(() => new zlib.Deflate({ strategy: -2 }), { + code: 'ERR_OUT_OF_RANGE', + name: 'RangeError', + }); + + // Throws TypeError if `strategy` is invalid in `Deflate.prototype.params()` + throws(() => new zlib.Deflate().params(0, 'test'), { + code: 'ERR_INVALID_ARG_TYPE', + name: 'TypeError', + }); + + throws(() => new zlib.Deflate().params(0, -Infinity), { + code: 'ERR_OUT_OF_RANGE', + name: 'RangeError', + }); + + throws(() => new zlib.Deflate().params(0, Infinity), { + code: 'ERR_OUT_OF_RANGE', + name: 'RangeError', + }); + + throws(() => new zlib.Deflate().params(0, -2), { + code: 'ERR_OUT_OF_RANGE', + name: 'RangeError', + }); + + // Throws if opts.dictionary is not a Buffer + throws(() => new zlib.Deflate({ dictionary: 'not a buffer' }), { + code: 'ERR_INVALID_ARG_TYPE', + name: 'TypeError', + }); + }, +}; + +// Tests are taken from: +// https://github.com/nodejs/node/blob/561bc87c7607208f0d3db6dcd9231efeb48cfe2f/test/parallel/test-zlib-failed-init.js +export const testFailedInit = { + test() { + assert.throws(() => zlib.createGzip({ chunkSize: 0 }), { + code: 'ERR_OUT_OF_RANGE', + name: 'RangeError', + }); + + assert.throws(() => zlib.createGzip({ windowBits: 0 }), { + code: 'ERR_OUT_OF_RANGE', + name: 'RangeError', + }); + + assert.throws(() => zlib.createGzip({ memLevel: 0 }), { + code: 'ERR_OUT_OF_RANGE', + name: 'RangeError', + }); + + { + const stream = zlib.createGzip({ level: NaN }); + assert.strictEqual(stream._level, zlib.constants.Z_DEFAULT_COMPRESSION); + } + + { + const stream = zlib.createGzip({ strategy: NaN }); + assert.strictEqual(stream._strategy, zlib.constants.Z_DEFAULT_STRATEGY); + } + }, +}; + +// Node.js tests relevant to zlib +// +// - [ ] test-zlib-brotli-16GB.js +// - [ ] test-zlib-convenience-methods.js +// - [ ] test-zlib-flush-drain.js +// - [ ] test-zlib-invalid-input-memory.js +// - [ ] test-zlib-sync-no-event.js +// - [ ] test-zlib-brotli-flush.js +// - [x] test-zlib-crc32.js +// - [ ] test-zlib-flush-drain-longblock.js +// - [ ] test-zlib.js +// - [ ] test-zlib-truncated.js +// - [ ] test-zlib-brotli-from-brotli.js +// - [x] test-zlib-create-raw.js +// - [ ] test-zlib-flush-flags.js +// - [ ] test-zlib-kmaxlength-rangeerror.js +// - [ ] test-zlib-unused-weak.js +// - [ ] test-zlib-brotli-from-string.js +// - [x] test-zlib-deflate-constructors.js +// - [ ] test-zlib-flush.js +// - [ ] test-zlib-maxOutputLength.js +// - [ ] test-zlib-unzip-one-byte-chunks.js +// - [ ] test-zlib-brotli.js +// - [ ] test-zlib-deflate-raw-inherits.js +// - [ ] test-zlib-flush-write-sync-interleaved.js +// - [ ] test-zlib-no-stream.js +// - [ ] test-zlib-write-after-close.js +// - [ ] test-zlib-brotli-kmaxlength-rangeerror.js +// - [ ] test-zlib-destroy.js +// - [ ] test-zlib-from-concatenated-gzip.js +// - [ ] test-zlib-not-string-or-buffer.js +// - [ ] test-zlib-write-after-end.js +// - [ ] test-zlib-bytes-read.js +// - [ ] test-zlib-destroy-pipe.js +// - [ ] test-zlib-from-gzip.js +// - [ ] test-zlib-object-write.js +// - [ ] test-zlib-write-after-flush.js +// - [ ] test-zlib-close-after-error.js +// - [ ] test-zlib-dictionary-fail.js +// - [ ] test-zlib-from-gzip-with-trailing-garbage.js +// - [ ] test-zlib-params.js +// - [ ] test-zlib-zero-byte.js +// - [ ] test-zlib-close-after-write.js +// - [ ] test-zlib-dictionary.js +// - [ ] test-zlib-from-string.js +// - [ ] test-zlib-premature-end.js +// - [x] test-zlib-zero-windowBits.js +// - [ ] test-zlib-close-in-ondata.js +// - [ ] test-zlib-empty-buffer.js +// - [ ] test-zlib-invalid-arg-value-brotli-compress.js +// - [ ] test-zlib-random-byte-pipes.js +// - [ ] test-zlib-const.js +// - [x] test-zlib-failed-init.js +// - [ ] test-zlib-invalid-input.js +// - [ ] test-zlib-reset-before-write.js diff --git a/src/workerd/api/node/tests/zlib-nodejs-test.wd-test b/src/workerd/api/node/tests/zlib-nodejs-test.wd-test index 14bb95faafe..0631bc2fee8 100644 --- a/src/workerd/api/node/tests/zlib-nodejs-test.wd-test +++ b/src/workerd/api/node/tests/zlib-nodejs-test.wd-test @@ -8,7 +8,7 @@ const unitTests :Workerd.Config = ( (name = "worker", esModule = embed "zlib-nodejs-test.js") ], compatibilityDate = "2023-01-15", - compatibilityFlags = ["experimental", "nodejs_compat"], + compatibilityFlags = ["experimental", "nodejs_compat", "nodejs_zlib"], ) ), ], diff --git a/src/workerd/api/node/zlib-util.c++ b/src/workerd/api/node/zlib-util.c++ index f94f1618cd5..77632c6c9b5 100644 --- a/src/workerd/api/node/zlib-util.c++ +++ b/src/workerd/api/node/zlib-util.c++ @@ -1,11 +1,453 @@ // Copyright (c) 2017-2022 Cloudflare, Inc. // Licensed under the Apache 2.0 license found in the LICENSE file or at: // https://opensource.org/licenses/Apache-2.0 +// Copyright Joyent and Node contributors. All rights reserved. MIT license. + #include "zlib-util.h" namespace workerd::api::node { + uint32_t ZlibUtil::crc32Sync(kj::Array data, uint32_t value) { // Note: Bytef is defined in zlib.h return crc32(value, reinterpret_cast(data.begin()), data.size()); } + +void ZlibContext::initialize(int _level, + int _windowBits, + int _memLevel, + int _strategy, + jsg::Optional> _dictionary) { + if (!((_windowBits == 0) && + (mode == ZlibMode::INFLATE || mode == ZlibMode::GUNZIP || mode == ZlibMode::UNZIP))) { + JSG_ASSERT(_windowBits >= Z_MIN_WINDOWBITS && _windowBits <= Z_MAX_WINDOWBITS, Error, + "Invalid windowBits"_kj); + } + + JSG_REQUIRE( + _level >= Z_MIN_LEVEL && _level <= Z_MAX_LEVEL, Error, "Invalid compression level"_kj); + JSG_REQUIRE( + _memLevel >= Z_MIN_MEMLEVEL && _memLevel <= Z_MAX_MEMLEVEL, Error, "Invalid memlevel"_kj); + JSG_REQUIRE(_strategy == Z_FILTERED || _strategy == Z_HUFFMAN_ONLY || _strategy == Z_RLE || + _strategy == Z_FIXED || _strategy == Z_DEFAULT_STRATEGY, + Error, "invalid strategy"_kj); + + level = _level; + windowBits = _windowBits; + memLevel = _memLevel; + strategy = _strategy; + flush = Z_NO_FLUSH; + + switch (mode) { + case ZlibMode::GZIP: + case ZlibMode::GUNZIP: + windowBits += 16; + break; + case ZlibMode::UNZIP: + windowBits += 32; + break; + case ZlibMode::DEFLATERAW: + case ZlibMode::INFLATERAW: + windowBits *= -1; + break; + default: + break; + } + + KJ_IF_SOME(dict, _dictionary) { + dictionary = kj::mv(dict); + } +} + +kj::Maybe ZlibContext::getError() const { + // Acceptable error states depend on the type of zlib stream. + switch (err) { + case Z_OK: + case Z_BUF_ERROR: + if (stream.avail_out != 0 && flush == Z_FINISH) { + return constructError("unexpected end of file"_kj); + } + break; + case Z_STREAM_END: + // normal statuses, not fatal + break; + case Z_NEED_DICT: + if (dictionary.empty()) + return constructError("Missing dictionary"_kj); + else + return constructError("Bad dictionary"_kj); + default: + // something else. + return constructError("Zlib error"); + } + + return {}; +} + +kj::Maybe ZlibContext::setDictionary() { + if (dictionary.empty()) { + return kj::none; + } + + switch (mode) { + case ZlibMode::DEFLATE: + case ZlibMode::DEFLATERAW: + err = deflateSetDictionary(&stream, dictionary.begin(), dictionary.size()); + ; + break; + case ZlibMode::INFLATERAW: + err = inflateSetDictionary(&stream, dictionary.begin(), dictionary.size()); + break; + default: + break; + } + + if (err != Z_OK) { + return constructError("Failed to set dictionary"_kj); + } + + return kj::none; +} + +bool ZlibContext::initializeZlib() { + if (initialized) { + return false; + } + switch (mode) { + case ZlibMode::DEFLATE: + case ZlibMode::GZIP: + case ZlibMode::DEFLATERAW: + err = deflateInit2(&stream, level, Z_DEFLATED, windowBits, memLevel, strategy); + break; + case ZlibMode::INFLATE: + case ZlibMode::GUNZIP: + case ZlibMode::INFLATERAW: + case ZlibMode::UNZIP: + err = inflateInit2(&stream, windowBits); + break; + default: + KJ_UNREACHABLE; + } + + if (err != Z_OK) { + dictionary.clear(); + mode = ZlibMode::NONE; + return true; + } + + setDictionary(); + initialized = true; + return true; +} + +kj::Maybe ZlibContext::resetStream() { + bool initialized_now = initializeZlib(); + if (initialized_now && err != Z_OK) { + return constructError("Failed to init stream before reset"); + } + switch (mode) { + case ZlibMode::DEFLATE: + case ZlibMode::DEFLATERAW: + case ZlibMode::GZIP: + err = deflateReset(&stream); + break; + case ZlibMode::INFLATE: + case ZlibMode::INFLATERAW: + case ZlibMode::GUNZIP: + err = inflateReset(&stream); + break; + default: + break; + } + + if (err != Z_OK) { + return constructError("Failed to reset stream"_kj); + } + + return setDictionary(); +} + +void ZlibContext::work() { + bool initialized_now = initializeZlib(); + if (initialized_now && err != Z_OK) { + return; + } + const Bytef* next_expected_header_byte = nullptr; + + err = Z_OK; + + // If the avail_out is left at 0, then it means that it ran out + // of room. If there was avail_out left over, then it means + // that all the input was consumed. + switch (mode) { + case ZlibMode::DEFLATE: + case ZlibMode::GZIP: + case ZlibMode::DEFLATERAW: + err = deflate(&stream, flush); + break; + case ZlibMode::UNZIP: + if (stream.avail_in > 0) { + next_expected_header_byte = stream.next_in; + } + + if (next_expected_header_byte == nullptr) { + break; + } + + switch (gzip_id_bytes_read) { + case 0: + if (*next_expected_header_byte == GZIP_HEADER_ID1) { + gzip_id_bytes_read = 1; + next_expected_header_byte++; + + if (stream.avail_in == 1) { + // The only available byte was already read. + break; + } + } else { + mode = ZlibMode::INFLATE; + break; + } + + [[fallthrough]]; + case 1: + if (*next_expected_header_byte == GZIP_HEADER_ID2) { + gzip_id_bytes_read = 2; + mode = ZlibMode::GUNZIP; + } else { + // There is no actual difference between INFLATE and INFLATERAW + // (after initialization). + mode = ZlibMode::INFLATE; + } + + break; + default: + JSG_FAIL_REQUIRE(Error, "Invalid number of gzip magic number bytes read"); + } + + [[fallthrough]]; + case ZlibMode::INFLATE: + case ZlibMode::GUNZIP: + case ZlibMode::INFLATERAW: + err = inflate(&stream, flush); + + // If data was encoded with dictionary (INFLATERAW will have it set in + // SetDictionary, don't repeat that here) + if (mode != ZlibMode::INFLATERAW && err == Z_NEED_DICT && !dictionary.empty()) { + err = inflateSetDictionary(&stream, dictionary.begin(), dictionary.size()); + if (err == Z_OK) { + // And try to decode again + err = inflate(&stream, flush); + } else if (err == Z_DATA_ERROR) { + // Both inflateSetDictionary() and inflate() return Z_DATA_ERROR. + // Make it possible for After() to tell a bad dictionary from bad + // input. + err = Z_NEED_DICT; + } + } + + while (stream.avail_in > 0 && mode == ZlibMode::GUNZIP && err == Z_STREAM_END && + stream.next_in[0] != '\0') { + // Bytes remain in input buffer. Perhaps this is another compressed + // member in the same archive, or just trailing garbage. + // Trailing zero bytes are okay, though, since they are frequently + // used for padding. + + resetStream(); + err = inflate(&stream, flush); + } + break; + default: + KJ_UNREACHABLE; + } +} + +kj::Maybe ZlibContext::setParams(int _level, int _strategy) { + bool initialized_now = initializeZlib(); + if (initialized_now && err != Z_OK) { + return constructError("Failed to init stream before set parameters"); + } + err = Z_OK; + + switch (mode) { + case ZlibMode::DEFLATE: + case ZlibMode::DEFLATERAW: + err = deflateParams(&stream, level, strategy); + break; + default: + break; + } + + if (err != Z_OK) { + return constructError("Failed to set parameters"); + } + + return kj::none; +} + +void ZlibContext::close() { + if (!initialized) { + dictionary.clear(); + mode = ZlibMode::NONE; + return; + } + + auto status = Z_OK; + switch (mode) { + case ZlibMode::DEFLATE: + case ZlibMode::DEFLATERAW: + case ZlibMode::GZIP: + status = deflateEnd(&stream); + break; + case ZlibMode::INFLATE: + case ZlibMode::INFLATERAW: + case ZlibMode::GUNZIP: + case ZlibMode::UNZIP: + status = inflateEnd(&stream); + break; + default: + KJ_UNREACHABLE; + } + + JSG_REQUIRE( + status == Z_OK || status == Z_DATA_ERROR, Error, "Uncaught error on closing zlib stream"); + mode = ZlibMode::NONE; + dictionary.clear(); +} + +void ZlibContext::setBuffers(kj::ArrayPtr input, + uint32_t inputLength, + kj::ArrayPtr output, + uint32_t outputLength) { + stream.avail_in = inputLength; + stream.next_in = input.begin(); + stream.avail_out = outputLength; + stream.next_out = output.begin(); +} + +jsg::Ref ZlibUtil::ZlibStream::constructor(ZlibModeValue mode) { + return jsg::alloc(static_cast(mode)); +} + +template +CompressionStream::~CompressionStream() noexcept(false) { + JSG_ASSERT(!writing, Error, "Writing to compression stream"_kj); + close(); +} + +template +void CompressionStream::emitError( + jsg::Lock& js, const CompressionError& error) { + KJ_IF_SOME(onError, errorHandler) { + onError(js, error.err, kj::mv(error.code), kj::mv(error.message)); + } + + writing = false; + if (pending_close) { + close(); + } +} + +template +void CompressionStream::writeStream(jsg::Lock& js, + int flush, + kj::ArrayPtr input, + uint32_t inputLength, + kj::ArrayPtr output, + uint32_t outputLength) { + JSG_REQUIRE(initialized, Error, "Writing before initializing"_kj); + JSG_REQUIRE(!closed, Error, "Already finalized"_kj); + JSG_REQUIRE(!writing, Error, "Writing is in progress"_kj); + JSG_REQUIRE(!pending_close, Error, "Pending close"_kj); + + // Ref(); + context.setBuffers(input, inputLength, output, outputLength); + context.setFlush(flush); + + // This implementation always follow the sync version. + context.work(); + if (checkError(js)) { + context.getAfterWriteOffsets(writeResult); + writing = false; + } + // Unref(); +} + +template +void CompressionStream::close() { + pending_close = writing; + if (writing) { + return; + } + closed = true; + JSG_ASSERT(initialized, Error, "Closing before initialized"_kj); + context.close(); +} + +template +bool CompressionStream::checkError(jsg::Lock& js) { + KJ_IF_SOME(error, context.getError()) { + emitError(js, kj::mv(error)); + return false; + } + return true; +} + +template +void CompressionStream::initializeStream( + kj::ArrayPtr _writeResult, jsg::Function _writeCallback) { + writeResult = kj::mv(_writeResult); + writeCallback = kj::mv(_writeCallback); + initialized = true; +} + +ZlibUtil::ZlibStream::ZlibStream(ZlibMode mode): CompressionStream() { + context.setMode(mode); +} + +void ZlibUtil::ZlibStream::initialize(int windowBits, + int level, + int memLevel, + int strategy, + kj::Array writeState, + jsg::Function writeCallback, + jsg::Optional> dictionary) { + initializeStream(writeState.asPtr(), kj::mv(writeCallback)); + context.initialize(level, windowBits, memLevel, strategy, kj::mv(dictionary)); +} + +void ZlibUtil::ZlibStream::write(jsg::Lock& js, + int flush, + kj::Array input, + int inputOffset, + int inputLength, + kj::Array output, + int outputOffset, + int outputLength) { + if (flush != Z_NO_FLUSH && flush != Z_PARTIAL_FLUSH && flush != Z_SYNC_FLUSH && + flush != Z_FULL_FLUSH && flush != Z_FINISH && flush != Z_BLOCK) { + JSG_FAIL_REQUIRE(Error, "Invalid flush value"); + } + + // Check bounds + JSG_REQUIRE(inputOffset > 0 && inputOffset < input.size(), Error, + "Offset should be smaller than size and bigger than 0"_kj); + JSG_REQUIRE(input.size() >= inputLength, Error, "Invalid inputLength"_kj); + JSG_REQUIRE(outputOffset > 0 && outputOffset < output.size(), Error, + "Offset should be smaller than size and bigger than 0"_kj); + JSG_REQUIRE(output.size() >= outputLength, Error, "Invalid outputLength"_kj); + + writeStream( + js, flush, input.slice(inputOffset), inputLength, output.slice(outputOffset), outputLength); +} + +void ZlibUtil::ZlibStream::params(int level, int strategy) { + context.setParams(level, strategy); +} + +void ZlibUtil::ZlibStream::reset(jsg::Lock& js) { + KJ_IF_SOME(error, context.resetStream()) { + emitError(js, kj::mv(error)); + } +} + } // namespace workerd::api::node diff --git a/src/workerd/api/node/zlib-util.h b/src/workerd/api/node/zlib-util.h index 56f0f707949..ecf34a2aa37 100644 --- a/src/workerd/api/node/zlib-util.h +++ b/src/workerd/api/node/zlib-util.h @@ -1,57 +1,244 @@ // Copyright (c) 2017-2022 Cloudflare, Inc. // Licensed under the Apache 2.0 license found in the LICENSE file or at: // https://opensource.org/licenses/Apache-2.0 +// Copyright Joyent and Node contributors. All rights reserved. MIT license. +#pragma once #include +#include +#include "zlib.h" #include #include -#include "zlib.h" +#include -#include -#include +#include namespace workerd::api::node { +#ifndef ZLIB_ERROR_CODES +#define ZLIB_ERROR_CODES(V) \ + V(Z_OK) \ + V(Z_STREAM_END) \ + V(Z_NEED_DICT) \ + V(Z_ERRNO) \ + V(Z_STREAM_ERROR) \ + V(Z_DATA_ERROR) \ + V(Z_MEM_ERROR) \ + V(Z_BUF_ERROR) \ + V(Z_VERSION_ERROR) + +inline const char* ZlibStrerror(int err) { +#define V(code) \ + if (err == code) return #code; + ZLIB_ERROR_CODES(V) +#undef V + return "Z_UNKNOWN_ERROR"; +} +#endif // ZLIB_ERROR_CODES + +// Certain zlib constants are defined by Node.js itself +static constexpr auto Z_MIN_CHUNK = 64; +static constexpr auto Z_MAX_CHUNK = 128 * 1024 * 1024; +static constexpr auto Z_DEFAULT_CHUNK = 16 * 1024; +static constexpr auto Z_MIN_MEMLEVEL = 1; + +static constexpr auto Z_MAX_MEMLEVEL = 9; +static constexpr auto Z_DEFAULT_MEMLEVEL = 8; +static constexpr auto Z_MIN_LEVEL = -1; +static constexpr auto Z_MAX_LEVEL = 9; +static constexpr auto Z_DEFAULT_LEVEL = Z_DEFAULT_COMPRESSION; +static constexpr auto Z_MIN_WINDOWBITS = 8; +static constexpr auto Z_MAX_WINDOWBITS = 15; +static constexpr auto Z_DEFAULT_WINDOWBITS = 15; + +static constexpr uint8_t GZIP_HEADER_ID1 = 0x1f; +static constexpr uint8_t GZIP_HEADER_ID2 = 0x8b; + +using ZlibModeValue = uint8_t; +enum class ZlibMode : ZlibModeValue { + NONE, + DEFLATE, + INFLATE, + GZIP, + GUNZIP, + DEFLATERAW, + INFLATERAW, + UNZIP, + BROTLI_DECODE, + BROTLI_ENCODE +}; + +struct CompressionError { + CompressionError(kj::StringPtr _message, kj::StringPtr _code, int _err) + : message(kj::str(_message)), + code(kj::str(_code)), + err(_err) { + JSG_REQUIRE(message.size() != 0, Error, "Compression error message should not be null"); + } + + kj::String message; + kj::String code; + int err; +}; + +class ZlibContext final { +public: + ZlibContext() = default; + + KJ_DISALLOW_COPY(ZlibContext); + + void close(); + void setBuffers(kj::ArrayPtr input, + uint32_t inputLength, + kj::ArrayPtr output, + uint32_t outputLength); + int getFlush() const { + return flush; + }; + void setFlush(int value) { + flush = value; + }; + void getAfterWriteOffsets(kj::ArrayPtr writeResult) const { + writeResult[0] = stream.avail_out; + writeResult[1] = stream.avail_in; + } + void setMode(ZlibMode value) { + mode = value; + }; + kj::Maybe resetStream(); + kj::Maybe getError() const; + void work(); + + uint getAvailIn() const { + return stream.avail_in; + }; + void setAvailIn(uint value) { + stream.avail_in = value; + }; + uint getAvailOut() const { + return stream.avail_out; + } + void setAvailOut(uint value) { + stream.avail_out = value; + }; + + // Zlib + void initialize(int _level, + int _windowBits, + int _memLevel, + int _strategy, + jsg::Optional> _dictionary); + kj::Maybe setParams(int level, int strategy); + +private: + bool initializeZlib(); + kj::Maybe setDictionary(); + + CompressionError constructError(kj::StringPtr message) const { + if (stream.msg != nullptr) message = kj::StringPtr(stream.msg); + + return {kj::str(message), kj::str(ZlibStrerror(err)), err}; + }; + + bool initialized = false; + ZlibMode mode = ZlibMode::NONE; + int flush = Z_NO_FLUSH; + int windowBits = 0; + int level = 0; + int memLevel = 0; + int strategy = 0; + kj::Vector dictionary{}; + + int err = Z_OK; + unsigned int gzip_id_bytes_read = 0; + z_stream stream{}; +}; + +using CompressionStreamErrorHandler = jsg::Function; + +template +class CompressionStream { +public: + CompressionStream() = default; + ~CompressionStream() noexcept(false); + KJ_DISALLOW_COPY_AND_MOVE(CompressionStream); + + void close(); + bool checkError(jsg::Lock& js); + void emitError(jsg::Lock& js, const CompressionError& error); + void writeStream(jsg::Lock& js, + int flush, + kj::ArrayPtr input, + uint32_t inputLength, + kj::ArrayPtr output, + uint32_t outputLength); + void setErrorHandler(CompressionStreamErrorHandler handler) { + errorHandler = kj::mv(handler); + }; + void initializeStream(kj::ArrayPtr _write_result, jsg::Function writeCallback); + +protected: + CompressionContext context; + +private: + bool initialized = false; + bool writing = false; + bool pending_close = false; + bool closed = false; + + // Equivalent to `write_js_callback` in Node.js + jsg::Optional> writeCallback; + kj::ArrayPtr writeResult = nullptr; + jsg::Optional errorHandler; +}; + // Implements utilities in support of the Node.js Zlib class ZlibUtil final: public jsg::Object { public: ZlibUtil() = default; ZlibUtil(jsg::Lock&, const jsg::Url&) {} - // Certain zlib constants are defined by NodeJS itself - static constexpr auto Z_MIN_CHUNK = 64; - static constexpr auto Z_MAX_CHUNK = 128 * 1024; - static constexpr auto Z_DEFAULT_CHUNK = 16 * 1024; - static constexpr auto Z_MIN_MEMLEVEL = 1; - - static constexpr auto Z_MAX_MEMLEVEL = 9; - static constexpr auto Z_DEFAULT_MEMLEVEL = 8; - static constexpr auto Z_MIN_LEVEL = -1; - static constexpr auto Z_MAX_LEVEL = 9; - static constexpr auto Z_DEFAULT_LEVEL = Z_DEFAULT_COMPRESSION; - static constexpr auto Z_MIN_WINDOWBITS = 8; - static constexpr auto Z_MAX_WINDOWBITS = 15; - static constexpr auto Z_DEFAULT_WINDOWBITS = 15; - - using ZlibModeValue = uint8_t; - enum class ZlibMode : ZlibModeValue { - NONE, - DEFLATE, - INFLATE, - GZIP, - GUNZIP, - DEFLATERAW, - INFLATERAW, - UNZIP, - BROTLI_DECODE, - BROTLI_ENCODE + class ZlibStream final: public jsg::Object, public CompressionStream { + public: + ZlibStream(ZlibMode mode); + KJ_DISALLOW_COPY_AND_MOVE(ZlibStream); + static jsg::Ref constructor(ZlibModeValue mode); + + // Instance methods + void initialize(int windowBits, + int level, + int memLevel, + int strategy, + kj::Array writeState, + jsg::Function writeCallback, + jsg::Optional> dictionary); + void write(jsg::Lock& js, + int flush, + kj::Array input, + int inputOffset, + int inputLength, + kj::Array output, + int outputOffset, + int outputLength); + void params(int level, int strategy); + void reset(jsg::Lock& js); + + JSG_RESOURCE_TYPE(ZlibStream) { + JSG_METHOD(initialize); + JSG_METHOD(close); + JSG_METHOD(write); + JSG_METHOD(params); + JSG_METHOD(setErrorHandler); + JSG_METHOD(reset); + } }; uint32_t crc32Sync(kj::Array data, uint32_t value); JSG_RESOURCE_TYPE(ZlibUtil) { JSG_METHOD_NAMED(crc32, crc32Sync); + JSG_NESTED_TYPE(ZlibStream); // zlib.constants (part of the API contract for node:zlib) JSG_STATIC_CONSTANT_NAMED(CONST_Z_NO_FLUSH, Z_NO_FLUSH); @@ -93,7 +280,6 @@ class ZlibUtil final: public jsg::Object { CONST_BROTLI_DECODE, static_cast(ZlibMode::BROTLI_DECODE)); JSG_STATIC_CONSTANT_NAMED( CONST_BROTLI_ENCODE, static_cast(ZlibMode::BROTLI_ENCODE)); - JSG_STATIC_CONSTANT_NAMED(CONST_Z_MIN_WINDOWBITS, Z_MIN_WINDOWBITS); JSG_STATIC_CONSTANT_NAMED(CONST_Z_MAX_WINDOWBITS, Z_MAX_WINDOWBITS); JSG_STATIC_CONSTANT_NAMED(CONST_Z_DEFAULT_WINDOWBITS, Z_DEFAULT_WINDOWBITS); @@ -203,6 +389,6 @@ class ZlibUtil final: public jsg::Object { }; }; -#define EW_NODE_ZLIB_ISOLATE_TYPES api::node::ZlibUtil +#define EW_NODE_ZLIB_ISOLATE_TYPES api::node::ZlibUtil, api::node::ZlibUtil::ZlibStream } // namespace workerd::api::node diff --git a/src/workerd/io/compatibility-date.capnp b/src/workerd/io/compatibility-date.capnp index 5f323ab863f..24851cf1d6c 100644 --- a/src/workerd/io/compatibility-date.capnp +++ b/src/workerd/io/compatibility-date.capnp @@ -580,4 +580,12 @@ struct CompatibilityFlags @0x8f8c1b68151b6cef { packages = "2024-03-01", backport = 0) $experimental; # Enables Python Workers and uses the bundle from the Pyodide source directory directly. For testing only. + + nodeJsZlib @59 :Bool + $compatEnableFlag("nodejs_zlib") + $compatDisableFlag("no_nodejs_zlib") + $experimental; + # Enables node:zlib implementation while it is in-development. + # Once the node:zlib implementation is complete, this will be automatically enabled when + # nodejs_compat is enabled. }