From c595b002db4147ebfef3f3581048e5f8c6554600 Mon Sep 17 00:00:00 2001 From: Harry Hough Date: Fri, 16 Aug 2024 15:36:26 -0500 Subject: [PATCH] Adding class for new pipelines product --- .vscode/launch.json | 1 + src/cloudflare/internal/pipeline-transform.ts | 146 ++++++++++++++++++ .../test/pipeline-transform/BUILD.bazel | 7 + .../test/pipeline-transform/transform-test.js | 108 +++++++++++++ .../test/pipeline-transform/transform.wd-test | 15 ++ .../test/pipeline-transform/tsconfig.json | 12 ++ src/cloudflare/pipeline-transform.ts | 3 + types/defines/pipeline-transform.d.ts | 18 +++ 8 files changed, 310 insertions(+) create mode 100644 src/cloudflare/internal/pipeline-transform.ts create mode 100644 src/cloudflare/internal/test/pipeline-transform/BUILD.bazel create mode 100644 src/cloudflare/internal/test/pipeline-transform/transform-test.js create mode 100644 src/cloudflare/internal/test/pipeline-transform/transform.wd-test create mode 100644 src/cloudflare/internal/test/pipeline-transform/tsconfig.json create mode 100644 src/cloudflare/pipeline-transform.ts create mode 100644 types/defines/pipeline-transform.d.ts diff --git a/.vscode/launch.json b/.vscode/launch.json index 85d80d6179d..c69526e60f3 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -286,6 +286,7 @@ "options": [ "src/cloudflare/internal/test/d1/d1-api-test.wd-test", "src/cloudflare/internal/test/vectorize/vectorize-api-test.wd-test", + "src/cloudflare/internal/test/pipeline-transform/pipeline-transform.wd-test", "src/workerd/api/actor-alarms-delete-test.wd-test", "src/workerd/api/actor-alarms-test.wd-test", "src/workerd/api/analytics-engine-test.wd-test", diff --git a/src/cloudflare/internal/pipeline-transform.ts b/src/cloudflare/internal/pipeline-transform.ts new file mode 100644 index 00000000000..fa5ab4025b5 --- /dev/null +++ b/src/cloudflare/internal/pipeline-transform.ts @@ -0,0 +1,146 @@ +// Copyright (c) 2024 Cloudflare, Inc. +// Licensed under the Apache 2.0 license found in the LICENSE file or at: +// https://opensource.org/licenses/Apache-2.0 + +import entrypoints from 'cloudflare-internal:workers' + +async function* readLines(stream: ReadableStream): AsyncGenerator { + let start, end = 0 + let partial = '' + + // @ts-ignore + for await (const chunk of stream) { + const full = chunk+partial as string + for (const char of full) { + if (char === '\n') { + yield chunk.substring(start, end) + end++ + start = end + } else { + end++ + } + } + + partial = chunk.substring(start, end) + start = 0 + end = 0 + } + + if (partial.length > 0) { + yield partial + } +} + +type Batch = { + id: string // unique identifier for the batch + shard?: string // assigned shard + ts: number // timestamp of the event + + format: Format + size: { + bytes: number + rows: number + } + data: unknown +} + +type JsonStream = Batch & { + format: Format.JSON_STREAM, + data: ReadableStream +} + +enum Format { + JSON_STREAM = 'json_stream', //jsonl +} + +export class PipelineTransformImpl extends entrypoints.WorkerEntrypoint { + #batch?: Batch + #initalized: boolean = false + + constructor(ctx: unknown, env: unknown) { + super(ctx, env) + } + + // stub overriden on the sub class + // @ts-ignore + public async transformJson(data: object[]): Promise { + throw new Error('should be implemented by parent') + } + + // called by the dispatcher which then calls the subclass methods + // @ts-ignore + private async _ping(): Promise { + // making sure the function was overriden by an implementing subclass + if (this.transformJson !== PipelineTransformImpl.prototype.transformJson) { + return + } else { + throw new Error('the transformJson method must be overridden by the PipelineTransform subclass') + } + } + + // called by the dispatcher which then calls the subclass methods + // the reason this is typescript private and not javascript private is that this must be + // able to be called by the dispatcher but should not be called by the class implementer + // @ts-ignore + private async _transform(batch: Batch): Promise { + if (this.#initalized) { + throw new Error('pipeline entrypoint has already been initialized') + } + + this.#batch = batch + this.#initalized = true + + switch (this.#batch!.format) { + case Format.JSON_STREAM: + const data = await this.#readJsonStream() + const transformed = await this.transformJson(data) + return this.#sendJson(transformed) + default: + throw new Error('unsupported batch format') + } + } + + async #readJsonStream(): Promise { + if (this.#batch!.format !== Format.JSON_STREAM) { + throw new Error(`expected JSON_STREAM not ${this.#batch!.format}`) + } + + const batch = this.#batch!.data as ReadableStream + const decoder = batch.pipeThrough(new TextDecoderStream()) + + const data: object[] = [] + for await (const line of readLines(decoder)) { + data.push(JSON.parse(line)) + } + + return data + } + + async #sendJson(data: object[]): Promise { + let written = 0 + const encoder = new TextEncoder() + const readable = new ReadableStream({ + start(controller) { + for (const obj of data) { + const encoded = encoder.encode(`${JSON.stringify(obj)}\n`) + written += encoded.length + controller.enqueue(encoded) + } + + controller.close() + } + }) + + return { + id: this.#batch!.id, + shard: this.#batch!.shard, + ts: this.#batch!.ts, + format: Format.JSON_STREAM, + size: { + bytes: written, + rows: data.length + }, + data: readable + } as JsonStream + } +} diff --git a/src/cloudflare/internal/test/pipeline-transform/BUILD.bazel b/src/cloudflare/internal/test/pipeline-transform/BUILD.bazel new file mode 100644 index 00000000000..11367ffa711 --- /dev/null +++ b/src/cloudflare/internal/test/pipeline-transform/BUILD.bazel @@ -0,0 +1,7 @@ +load("//:build/wd_test.bzl", "wd_test") + +wd_test( + src = "transform.wd-test", + args = ["--experimental"], + data = glob(["*.js"]), +) diff --git a/src/cloudflare/internal/test/pipeline-transform/transform-test.js b/src/cloudflare/internal/test/pipeline-transform/transform-test.js new file mode 100644 index 00000000000..0133c516edb --- /dev/null +++ b/src/cloudflare/internal/test/pipeline-transform/transform-test.js @@ -0,0 +1,108 @@ +// @ts-nocheck +// Copyright (c) 2024 Cloudflare, Inc. +// Licensed under the Apache 2.0 license found in the LICENSE file or at: +// https://opensource.org/licenses/Apache-2.0 + +// @ts-ignore +import assert from 'node:assert' +import { PipelineTransform } from 'cloudflare:pipeline-transform' + +// this is how "Pipeline" would be implemented by the user +const customTransform = class MyEntrypoint extends PipelineTransform { + /** + * @param {any} batch + * @override + */ + async transformJson(batch) { + for (const obj of batch) { + obj.dispatcher = 'was here!' + } + + return batch + } +} + +const lines = [ + `${JSON.stringify({ name: 'jimmy', age: '42' })}\n`, + `${JSON.stringify({ name: 'jonny', age: '9' })}\n`, + `${JSON.stringify({ name: 'joey', age: '108' })}\n`, +] + +function newBatch() { + return { + id: 'test', + shard: '0', + ts: Date.now(), + format: 'json_stream', + data: new ReadableStream({ + start(controller) { + const encoder = new TextEncoder() + for (const line of lines) { + controller.enqueue(encoder.encode(line)) + } + controller.close() + } + }) + } + } + +// bazel test //src/cloudflare/internal/test/pipeline-transform:transform --test_output=errors --sandbox_debug +export const tests = { + async test(ctr, env, ctx) { + { + // should fail dispatcher test call when PipelineTransform class not extended + const transformer = new PipelineTransform(ctx, env) + await assert.rejects(transformer._ping(), (err) => { + assert.strictEqual(err.message, 'the transformJson method must be overridden by the PipelineTransform subclass') + return true + }) + } + + { + // should correctly handle dispatcher test call + const transform = new customTransform(ctx, env) + await assert.doesNotReject(transform._ping()) + } + + { + // should return mutated batch + const transformer = new customTransform(ctx, env) + const batch = newBatch() + + const result = await transformer._transform(batch) + assert.equal(true, result.data instanceof ReadableStream) + + const reader = result.data + .pipeThrough(new TextDecoderStream()) + .getReader() + + let data = '' + while (true) { + const { done, value } = await reader.read() + if (done) { + break + } else { + data += value + } + } + + assert.notEqual(data.length, 0) + + const objects = [] + const resultLines = data.split('\n') + resultLines.pop() + for (const line of resultLines) { + objects.push(JSON.parse(line)) + } + + let index = 0 + for (const obj of objects) { + assert.equal(obj.dispatcher, 'was here!') + delete obj.dispatcher + + assert.equal(`${JSON.stringify(obj)}\n`, lines[index]) + index++ + } + } + }, +} diff --git a/src/cloudflare/internal/test/pipeline-transform/transform.wd-test b/src/cloudflare/internal/test/pipeline-transform/transform.wd-test new file mode 100644 index 00000000000..267990897bb --- /dev/null +++ b/src/cloudflare/internal/test/pipeline-transform/transform.wd-test @@ -0,0 +1,15 @@ +using Workerd = import "/workerd/workerd.capnp"; + +const unitTests :Workerd.Config = ( + services = [ + ( name = "transform-test", + worker = ( + modules = [ + (name = "worker", esModule = embed "transform-test.js") + ], + compatibilityDate = "2024-08-15", + compatibilityFlags = ["nodejs_compat", "experimental"] + ) + ) + ] +); diff --git a/src/cloudflare/internal/test/pipeline-transform/tsconfig.json b/src/cloudflare/internal/test/pipeline-transform/tsconfig.json new file mode 100644 index 00000000000..78d33756d4e --- /dev/null +++ b/src/cloudflare/internal/test/pipeline-transform/tsconfig.json @@ -0,0 +1,12 @@ +{ + "extends": "../../../tsconfig.json", + "compilerOptions": { + "checkJs": true, + "noEmit": true + }, + "include": [ + "*.ts", + "*.js", + ], + "files": ["../../../../../types/defines/pipeline-transform.d.ts"] +} diff --git a/src/cloudflare/pipeline-transform.ts b/src/cloudflare/pipeline-transform.ts new file mode 100644 index 00000000000..69c993668bd --- /dev/null +++ b/src/cloudflare/pipeline-transform.ts @@ -0,0 +1,3 @@ +import { PipelineTransformImpl } from 'cloudflare-internal:pipeline-transform' + +export const PipelineTransform = PipelineTransformImpl diff --git a/types/defines/pipeline-transform.d.ts b/types/defines/pipeline-transform.d.ts new file mode 100644 index 00000000000..0ea8d9a6632 --- /dev/null +++ b/types/defines/pipeline-transform.d.ts @@ -0,0 +1,18 @@ +// Copyright (c) 2022-2023 Cloudflare, Inc. +// Licensed under the Apache 2.0 license found in the LICENSE file or at: +// https://opensource.org/licenses/Apache-2.0 + +import { WorkerEntrypoint } from "cloudflare:workers"; + +/** + * The Pipelines class is called by the Pipelines product to support transformations + */ +declare abstract class PipelineTransform extends WorkerEntrypoint { + /** + * transformJson recieves an array of javascript objects which can be + * mutated and returned to the pipeline + * @param data The data to be mutated + * @returns A promise containing the mutated data + */ + public transformJson(data: object[]): Promise +}