From c4ede54b57a6bc9213acf0585fdfabc3e04bb69d Mon Sep 17 00:00:00 2001 From: Harry Hough Date: Mon, 26 Aug 2024 16:05:32 -0500 Subject: [PATCH] Adding PipelineTransform class (#2553) Adding class for new pipelines product --- .vscode/launch.json | 1 + src/cloudflare/internal/pipeline-transform.ts | 151 ++++++++++++++++++ .../test/pipeline-transform/BUILD.bazel | 8 + .../test/pipeline-transform/transform-test.js | 115 +++++++++++++ .../test/pipeline-transform/transform.wd-test | 15 ++ .../test/pipeline-transform/tsconfig.json | 9 ++ src/cloudflare/pipeline-transform.ts | 3 + types/defines/pipeline-transform.d.ts | 13 ++ 8 files changed, 315 insertions(+) create mode 100644 src/cloudflare/internal/pipeline-transform.ts create mode 100644 src/cloudflare/internal/test/pipeline-transform/BUILD.bazel create mode 100644 src/cloudflare/internal/test/pipeline-transform/transform-test.js create mode 100644 src/cloudflare/internal/test/pipeline-transform/transform.wd-test create mode 100644 src/cloudflare/internal/test/pipeline-transform/tsconfig.json create mode 100644 src/cloudflare/pipeline-transform.ts create mode 100644 types/defines/pipeline-transform.d.ts diff --git a/.vscode/launch.json b/.vscode/launch.json index 85d80d6179d..c69526e60f3 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -286,6 +286,7 @@ "options": [ "src/cloudflare/internal/test/d1/d1-api-test.wd-test", "src/cloudflare/internal/test/vectorize/vectorize-api-test.wd-test", + "src/cloudflare/internal/test/pipeline-transform/pipeline-transform.wd-test", "src/workerd/api/actor-alarms-delete-test.wd-test", "src/workerd/api/actor-alarms-test.wd-test", "src/workerd/api/analytics-engine-test.wd-test", diff --git a/src/cloudflare/internal/pipeline-transform.ts b/src/cloudflare/internal/pipeline-transform.ts new file mode 100644 index 00000000000..4cee5c203cb --- /dev/null +++ b/src/cloudflare/internal/pipeline-transform.ts @@ -0,0 +1,151 @@ +// Copyright (c) 2024 Cloudflare, Inc. +// Licensed under the Apache 2.0 license found in the LICENSE file or at: +// https://opensource.org/licenses/Apache-2.0 + +import entrypoints from 'cloudflare-internal:workers'; + +async function* readLines( + stream: ReadableStream +): AsyncGenerator { + let start = 0; + let end = 0; + let partial = ''; + + // @ts-expect-error must have a '[Symbol.asyncIterator]()' method + for await (const chunk of stream) { + const full = partial + chunk; + for (const char of full) { + if (char === '\n') { + yield full.substring(start, end); + end++; + start = end; + } else { + end++; + } + } + + partial = full.substring(start, end); + start = 0; + end = 0; + } + + if (partial.length > 0) { + yield partial; + } +} + +type Batch = { + id: string; // unique identifier for the batch + shard: string; // assigned shard + ts: number; // creation timestamp of the batch + + format: Format; + size: { + bytes: number; + rows: number; + }; + data: unknown; +}; + +type JsonStream = Batch & { + format: Format.JSON_STREAM; + data: ReadableStream; +}; + +enum Format { + JSON_STREAM = 'json_stream', // jsonl +} + +export class PipelineTransformImpl extends entrypoints.WorkerEntrypoint { + #batch?: Batch; + #initalized: boolean = false; + + // stub overriden on the sub class + // eslint-disable-next-line @typescript-eslint/require-await + public async transformJson(_data: object[]): Promise { + throw new Error('should be implemented by parent'); + } + + // called by the dispatcher which then calls the subclass methods + // @ts-expect-error thinks ping is never used + private async _ping(): Promise { + // making sure the function was overriden by an implementing subclass + if (this.transformJson !== PipelineTransformImpl.prototype.transformJson) { + return await Promise.resolve(); // eslint + } else { + throw new Error( + 'the transformJson method must be overridden by the PipelineTransform subclass' + ); + } + } + + // called by the dispatcher which then calls the subclass methods + // the reason this is typescript private and not javascript private is that this must be + // able to be called by the dispatcher but should not be called by the class implementer + // @ts-expect-error _transform is called by rpc + private async _transform(batch: Batch): Promise { + if (this.#initalized) { + throw new Error('pipeline entrypoint has already been initialized'); + } + + this.#batch = batch; + this.#initalized = true; + + switch (this.#batch.format) { + case Format.JSON_STREAM: + const data = await this.#readJsonStream(); + const transformed = await this.transformJson(data); + return this.#sendJson(transformed); + default: + throw new Error('unsupported batch format'); + } + } + + async #readJsonStream(): Promise { + if (this.#batch!.format !== Format.JSON_STREAM) { + throw new Error(`expected JSON_STREAM not ${this.#batch!.format}`); + } + + const batch = this.#batch!.data as ReadableStream; + const decoder = batch.pipeThrough(new TextDecoderStream()); + + const data: object[] = []; + for await (const line of readLines(decoder)) { + data.push(JSON.parse(line) as object); + } + + return data; + } + + #sendJson(data: object[]): JsonStream { + if (!(data instanceof Array)) { + throw new Error('transformJson must return an array of objects'); + } + + let written = 0; + const encoder = new TextEncoder(); + const readable = new ReadableStream({ + start(controller): void { + for (const obj of data) { + const encoded = encoder.encode(`${JSON.stringify(obj)}\n`); + written += encoded.length; + controller.enqueue(encoded); + } + + controller.close(); + }, + }); + + return { + id: this.#batch!.id, + shard: this.#batch!.shard, + ts: this.#batch!.ts, + format: Format.JSON_STREAM, + size: { + bytes: written, + rows: data.length, + }, + data: readable, + }; + } +} diff --git a/src/cloudflare/internal/test/pipeline-transform/BUILD.bazel b/src/cloudflare/internal/test/pipeline-transform/BUILD.bazel new file mode 100644 index 00000000000..9052649b3dc --- /dev/null +++ b/src/cloudflare/internal/test/pipeline-transform/BUILD.bazel @@ -0,0 +1,8 @@ +load("//:build/wd_test.bzl", "wd_test") + +wd_test( + size = "large", + src = "transform.wd-test", + args = ["--experimental"], + data = glob(["*.js"]), +) diff --git a/src/cloudflare/internal/test/pipeline-transform/transform-test.js b/src/cloudflare/internal/test/pipeline-transform/transform-test.js new file mode 100644 index 00000000000..793dcf54fd4 --- /dev/null +++ b/src/cloudflare/internal/test/pipeline-transform/transform-test.js @@ -0,0 +1,115 @@ +// @ts-nocheck +// Copyright (c) 2024 Cloudflare, Inc. +// Licensed under the Apache 2.0 license found in the LICENSE file or at: +// https://opensource.org/licenses/Apache-2.0 + +import assert from 'node:assert'; +import { PipelineTransform } from 'cloudflare:pipeline-transform'; + +// this is how "Pipeline" would be implemented by the user +const customTransform = class MyEntrypoint extends PipelineTransform { + /** + * @param {any} batch + * @override + */ + async transformJson(batch) { + for (const obj of batch) { + obj.dispatcher = 'was here!'; + await new Promise((resolve) => setTimeout(resolve, 50)); + obj.wait = 'happened!'; + } + + return batch; + } +}; + +const lines = [ + `${JSON.stringify({ name: 'jimmy', age: '42' })}\n`, + `${JSON.stringify({ name: 'jonny', age: '9' })}\n`, + `${JSON.stringify({ name: 'joey', age: '108' })}\n`, +]; + +function newBatch() { + return { + id: 'test', + shard: '0', + ts: Date.now(), + format: 'json_stream', + data: new ReadableStream({ + start(controller) { + const encoder = new TextEncoder(); + for (const line of lines) { + controller.enqueue(encoder.encode(line)); + } + controller.close(); + }, + }), + }; +} + +// bazel test //src/cloudflare/internal/test/pipeline-transform:transform --test_output=errors --sandbox_debug +export const tests = { + async test(ctr, env, ctx) { + { + // should fail dispatcher test call when PipelineTransform class not extended + const transformer = new PipelineTransform(ctx, env); + await assert.rejects(transformer._ping(), (err) => { + assert.strictEqual( + err.message, + 'the transformJson method must be overridden by the PipelineTransform subclass' + ); + return true; + }); + } + + { + // should correctly handle dispatcher test call + const transform = new customTransform(ctx, env); + await assert.doesNotReject(transform._ping()); + } + + { + // should return mutated batch + const transformer = new customTransform(ctx, env); + const batch = newBatch(); + + const result = await transformer._transform(batch); + assert.equal(true, result.data instanceof ReadableStream); + + const reader = result.data + .pipeThrough(new TextDecoderStream()) + .getReader(); + + let data = ''; + while (true) { + const { done, value } = await reader.read(); + if (done) { + break; + } else { + data += value; + } + } + + assert.notEqual(data.length, 0); + + const objects = []; + const resultLines = data.split('\n'); + resultLines.pop(); + for (const line of resultLines) { + objects.push(JSON.parse(line)); + } + assert.equal(objects.length, 3); + + let index = 0; + for (const obj of objects) { + assert.equal(obj.dispatcher, 'was here!'); + delete obj.dispatcher; + assert.equal(obj.wait, 'happened!'); + delete obj.wait; + + assert.equal(`${JSON.stringify(obj)}\n`, lines[index]); + index++; + } + } + }, +}; diff --git a/src/cloudflare/internal/test/pipeline-transform/transform.wd-test b/src/cloudflare/internal/test/pipeline-transform/transform.wd-test new file mode 100644 index 00000000000..267990897bb --- /dev/null +++ b/src/cloudflare/internal/test/pipeline-transform/transform.wd-test @@ -0,0 +1,15 @@ +using Workerd = import "/workerd/workerd.capnp"; + +const unitTests :Workerd.Config = ( + services = [ + ( name = "transform-test", + worker = ( + modules = [ + (name = "worker", esModule = embed "transform-test.js") + ], + compatibilityDate = "2024-08-15", + compatibilityFlags = ["nodejs_compat", "experimental"] + ) + ) + ] +); diff --git a/src/cloudflare/internal/test/pipeline-transform/tsconfig.json b/src/cloudflare/internal/test/pipeline-transform/tsconfig.json new file mode 100644 index 00000000000..1e511bc8474 --- /dev/null +++ b/src/cloudflare/internal/test/pipeline-transform/tsconfig.json @@ -0,0 +1,9 @@ +{ + "extends": "../../../tsconfig.json", + "compilerOptions": { + "checkJs": true, + "noEmit": true + }, + "include": ["*.ts", "*.js"], + "files": ["../../../../../types/defines/pipeline-transform.d.ts"] +} diff --git a/src/cloudflare/pipeline-transform.ts b/src/cloudflare/pipeline-transform.ts new file mode 100644 index 00000000000..52f90934c00 --- /dev/null +++ b/src/cloudflare/pipeline-transform.ts @@ -0,0 +1,3 @@ +import { PipelineTransformImpl } from 'cloudflare-internal:pipeline-transform'; + +export const PipelineTransform = PipelineTransformImpl; diff --git a/types/defines/pipeline-transform.d.ts b/types/defines/pipeline-transform.d.ts new file mode 100644 index 00000000000..58706d726c3 --- /dev/null +++ b/types/defines/pipeline-transform.d.ts @@ -0,0 +1,13 @@ +// Copyright (c) 2022-2023 Cloudflare, Inc. +// Licensed under the Apache 2.0 license found in the LICENSE file or at: +// https://opensource.org/licenses/Apache-2.0 + +export abstract class PipelineTransform { + /** + * transformJson recieves an array of javascript objects which can be + * mutated and returned to the pipeline + * @param data The data to be mutated + * @returns A promise containing the mutated data + */ + public transformJson(data: object[]): Promise; +}