Skip to content

Commit

Permalink
Adding PipelineTransform class (#2553)
Browse files Browse the repository at this point in the history
Adding class for new pipelines product
  • Loading branch information
hhoughgg authored Aug 26, 2024
1 parent 7a0bace commit c4ede54
Show file tree
Hide file tree
Showing 8 changed files with 315 additions and 0 deletions.
1 change: 1 addition & 0 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,7 @@
"options": [
"src/cloudflare/internal/test/d1/d1-api-test.wd-test",
"src/cloudflare/internal/test/vectorize/vectorize-api-test.wd-test",
"src/cloudflare/internal/test/pipeline-transform/pipeline-transform.wd-test",
"src/workerd/api/actor-alarms-delete-test.wd-test",
"src/workerd/api/actor-alarms-test.wd-test",
"src/workerd/api/analytics-engine-test.wd-test",
Expand Down
151 changes: 151 additions & 0 deletions src/cloudflare/internal/pipeline-transform.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
// Copyright (c) 2024 Cloudflare, Inc.
// Licensed under the Apache 2.0 license found in the LICENSE file or at:
// https://opensource.org/licenses/Apache-2.0

import entrypoints from 'cloudflare-internal:workers';

async function* readLines(
stream: ReadableStream<string>
): AsyncGenerator<string> {
let start = 0;
let end = 0;
let partial = '';

// @ts-expect-error must have a '[Symbol.asyncIterator]()' method
for await (const chunk of stream) {
const full = partial + chunk;
for (const char of full) {
if (char === '\n') {
yield full.substring(start, end);
end++;
start = end;
} else {
end++;
}
}

partial = full.substring(start, end);
start = 0;
end = 0;
}

if (partial.length > 0) {
yield partial;
}
}

type Batch = {
id: string; // unique identifier for the batch
shard: string; // assigned shard
ts: number; // creation timestamp of the batch

format: Format;
size: {
bytes: number;
rows: number;
};
data: unknown;
};

type JsonStream = Batch & {
format: Format.JSON_STREAM;
data: ReadableStream<Uint8Array>;
};

enum Format {
JSON_STREAM = 'json_stream', // jsonl
}

export class PipelineTransformImpl extends entrypoints.WorkerEntrypoint {
#batch?: Batch;
#initalized: boolean = false;

// stub overriden on the sub class
// eslint-disable-next-line @typescript-eslint/require-await
public async transformJson(_data: object[]): Promise<object[]> {
throw new Error('should be implemented by parent');
}

// called by the dispatcher which then calls the subclass methods
// @ts-expect-error thinks ping is never used
private async _ping(): Promise<void> {
// making sure the function was overriden by an implementing subclass
if (this.transformJson !== PipelineTransformImpl.prototype.transformJson) {
return await Promise.resolve(); // eslint
} else {
throw new Error(
'the transformJson method must be overridden by the PipelineTransform subclass'
);
}
}

// called by the dispatcher which then calls the subclass methods
// the reason this is typescript private and not javascript private is that this must be
// able to be called by the dispatcher but should not be called by the class implementer
// @ts-expect-error _transform is called by rpc
private async _transform(batch: Batch): Promise<JsonStream> {
if (this.#initalized) {
throw new Error('pipeline entrypoint has already been initialized');
}

this.#batch = batch;
this.#initalized = true;

switch (this.#batch.format) {
case Format.JSON_STREAM:
const data = await this.#readJsonStream();
const transformed = await this.transformJson(data);
return this.#sendJson(transformed);
default:
throw new Error('unsupported batch format');
}
}

async #readJsonStream(): Promise<object[]> {
if (this.#batch!.format !== Format.JSON_STREAM) {
throw new Error(`expected JSON_STREAM not ${this.#batch!.format}`);
}

const batch = this.#batch!.data as ReadableStream<Uint8Array>;
const decoder = batch.pipeThrough(new TextDecoderStream());

const data: object[] = [];
for await (const line of readLines(decoder)) {
data.push(JSON.parse(line) as object);
}

return data;
}

#sendJson(data: object[]): JsonStream {
if (!(data instanceof Array)) {
throw new Error('transformJson must return an array of objects');
}

let written = 0;
const encoder = new TextEncoder();
const readable = new ReadableStream<Uint8Array>({
start(controller): void {
for (const obj of data) {
const encoded = encoder.encode(`${JSON.stringify(obj)}\n`);
written += encoded.length;
controller.enqueue(encoded);
}

controller.close();
},
});

return {
id: this.#batch!.id,
shard: this.#batch!.shard,
ts: this.#batch!.ts,
format: Format.JSON_STREAM,
size: {
bytes: written,
rows: data.length,
},
data: readable,
};
}
}
8 changes: 8 additions & 0 deletions src/cloudflare/internal/test/pipeline-transform/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
load("//:build/wd_test.bzl", "wd_test")

wd_test(
size = "large",
src = "transform.wd-test",
args = ["--experimental"],
data = glob(["*.js"]),
)
115 changes: 115 additions & 0 deletions src/cloudflare/internal/test/pipeline-transform/transform-test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
// @ts-nocheck
// Copyright (c) 2024 Cloudflare, Inc.
// Licensed under the Apache 2.0 license found in the LICENSE file or at:
// https://opensource.org/licenses/Apache-2.0

import assert from 'node:assert';
import { PipelineTransform } from 'cloudflare:pipeline-transform';

// this is how "Pipeline" would be implemented by the user
const customTransform = class MyEntrypoint extends PipelineTransform {
/**
* @param {any} batch
* @override
*/
async transformJson(batch) {
for (const obj of batch) {
obj.dispatcher = 'was here!';
await new Promise((resolve) => setTimeout(resolve, 50));
obj.wait = 'happened!';
}

return batch;
}
};

const lines = [
`${JSON.stringify({ name: 'jimmy', age: '42' })}\n`,
`${JSON.stringify({ name: 'jonny', age: '9' })}\n`,
`${JSON.stringify({ name: 'joey', age: '108' })}\n`,
];

function newBatch() {
return {
id: 'test',
shard: '0',
ts: Date.now(),
format: 'json_stream',
data: new ReadableStream({
start(controller) {
const encoder = new TextEncoder();
for (const line of lines) {
controller.enqueue(encoder.encode(line));
}
controller.close();
},
}),
};
}

// bazel test //src/cloudflare/internal/test/pipeline-transform:transform --test_output=errors --sandbox_debug
export const tests = {
async test(ctr, env, ctx) {
{
// should fail dispatcher test call when PipelineTransform class not extended
const transformer = new PipelineTransform(ctx, env);
await assert.rejects(transformer._ping(), (err) => {
assert.strictEqual(
err.message,
'the transformJson method must be overridden by the PipelineTransform subclass'
);
return true;
});
}

{
// should correctly handle dispatcher test call
const transform = new customTransform(ctx, env);
await assert.doesNotReject(transform._ping());
}

{
// should return mutated batch
const transformer = new customTransform(ctx, env);
const batch = newBatch();

const result = await transformer._transform(batch);
assert.equal(true, result.data instanceof ReadableStream);

const reader = result.data
.pipeThrough(new TextDecoderStream())
.getReader();

let data = '';
while (true) {
const { done, value } = await reader.read();
if (done) {
break;
} else {
data += value;
}
}

assert.notEqual(data.length, 0);

const objects = [];
const resultLines = data.split('\n');
resultLines.pop();
for (const line of resultLines) {
objects.push(JSON.parse(line));
}
assert.equal(objects.length, 3);

let index = 0;
for (const obj of objects) {
assert.equal(obj.dispatcher, 'was here!');
delete obj.dispatcher;
assert.equal(obj.wait, 'happened!');
delete obj.wait;

assert.equal(`${JSON.stringify(obj)}\n`, lines[index]);
index++;
}
}
},
};
15 changes: 15 additions & 0 deletions src/cloudflare/internal/test/pipeline-transform/transform.wd-test
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
using Workerd = import "/workerd/workerd.capnp";

const unitTests :Workerd.Config = (
services = [
( name = "transform-test",
worker = (
modules = [
(name = "worker", esModule = embed "transform-test.js")
],
compatibilityDate = "2024-08-15",
compatibilityFlags = ["nodejs_compat", "experimental"]
)
)
]
);
9 changes: 9 additions & 0 deletions src/cloudflare/internal/test/pipeline-transform/tsconfig.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{
"extends": "../../../tsconfig.json",
"compilerOptions": {
"checkJs": true,
"noEmit": true
},
"include": ["*.ts", "*.js"],
"files": ["../../../../../types/defines/pipeline-transform.d.ts"]
}
3 changes: 3 additions & 0 deletions src/cloudflare/pipeline-transform.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
import { PipelineTransformImpl } from 'cloudflare-internal:pipeline-transform';

export const PipelineTransform = PipelineTransformImpl;
13 changes: 13 additions & 0 deletions types/defines/pipeline-transform.d.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
// Copyright (c) 2022-2023 Cloudflare, Inc.
// Licensed under the Apache 2.0 license found in the LICENSE file or at:
// https://opensource.org/licenses/Apache-2.0

export abstract class PipelineTransform {
/**
* transformJson recieves an array of javascript objects which can be
* mutated and returned to the pipeline
* @param data The data to be mutated
* @returns A promise containing the mutated data
*/
public transformJson(data: object[]): Promise<object[]>;
}

0 comments on commit c4ede54

Please sign in to comment.