Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding PipelineTransform class #2553

Merged
merged 2 commits into from
Aug 26, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,7 @@
"options": [
"src/cloudflare/internal/test/d1/d1-api-test.wd-test",
"src/cloudflare/internal/test/vectorize/vectorize-api-test.wd-test",
"src/cloudflare/internal/test/pipeline-transform/pipeline-transform.wd-test",
"src/workerd/api/actor-alarms-delete-test.wd-test",
"src/workerd/api/actor-alarms-test.wd-test",
"src/workerd/api/analytics-engine-test.wd-test",
Expand Down
147 changes: 147 additions & 0 deletions src/cloudflare/internal/pipeline-transform.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
// Copyright (c) 2024 Cloudflare, Inc.
// Licensed under the Apache 2.0 license found in the LICENSE file or at:
// https://opensource.org/licenses/Apache-2.0

import entrypoints from 'cloudflare-internal:workers';

async function* readLines(
stream: ReadableStream<string>
): AsyncGenerator<string> {
let start = 0;
let end = 0;
let partial = '';

// @ts-expect-error must have a '[Symbol.asyncIterator]()' method
for await (const chunk of stream) {
const full = partial + chunk;
for (const char of full) {
if (char === '\n') {
yield full.substring(start, end);
end++;
start = end;
} else {
end++;
}
}

partial = full.substring(start, end);
start = 0;
end = 0;
}

if (partial.length > 0) {
yield partial;
}
}

type Batch = {
id: string; // unique identifier for the batch
shard: string; // assigned shard
ts: number; // timestamp of the event
hhoughgg marked this conversation as resolved.
Show resolved Hide resolved

format: Format;
size: {
bytes: number;
rows: number;
};
data: unknown;
};

type JsonStream = Batch & {
format: Format.JSON_STREAM;
data: ReadableStream<Uint8Array>;
};

enum Format {
JSON_STREAM = 'json_stream', // jsonl
}

export class PipelineTransformImpl extends entrypoints.WorkerEntrypoint {
#batch?: Batch;
#initalized: boolean = false;

// stub overriden on the sub class
// eslint-disable-next-line @typescript-eslint/require-await
public async transformJson(_data: object[]): Promise<object[]> {
throw new Error('should be implemented by parent');
}

// called by the dispatcher which then calls the subclass methods
// @ts-expect-error thinks ping is never used
private async _ping(): Promise<void> {
// making sure the function was overriden by an implementing subclass
if (this.transformJson !== PipelineTransformImpl.prototype.transformJson) {
return await Promise.resolve(); // eslint
} else {
throw new Error(
'the transformJson method must be overridden by the PipelineTransform subclass'
);
}
}

// called by the dispatcher which then calls the subclass methods
// the reason this is typescript private and not javascript private is that this must be
// able to be called by the dispatcher but should not be called by the class implementer
// @ts-expect-error _transform is called by rpc
private async _transform(batch: Batch): Promise<JsonStream> {
hhoughgg marked this conversation as resolved.
Show resolved Hide resolved
if (this.#initalized) {
throw new Error('pipeline entrypoint has already been initialized');
}

this.#batch = batch;
hhoughgg marked this conversation as resolved.
Show resolved Hide resolved
this.#initalized = true;

switch (this.#batch.format) {
case Format.JSON_STREAM:
const data = await this.#readJsonStream();
const transformed = await this.transformJson(data);
return this.#sendJson(transformed);
default:
throw new Error('unsupported batch format');
}
}

async #readJsonStream(): Promise<object[]> {
if (this.#batch!.format !== Format.JSON_STREAM) {
throw new Error(`expected JSON_STREAM not ${this.#batch!.format}`);
}

const batch = this.#batch!.data as ReadableStream<Uint8Array>;
const decoder = batch.pipeThrough(new TextDecoderStream());

const data: object[] = [];
for await (const line of readLines(decoder)) {
data.push(JSON.parse(line) as object);
}

return data;
}

#sendJson(data: object[]): JsonStream {
let written = 0;
const encoder = new TextEncoder();
const readable = new ReadableStream<Uint8Array>({
start(controller): void {
for (const obj of data) {
hhoughgg marked this conversation as resolved.
Show resolved Hide resolved
const encoded = encoder.encode(`${JSON.stringify(obj)}\n`);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In terms of validating the response from the customer's code, it's also possible for them to return values that will throw when passed to JSON.stringify, and we'd probably benefit from handling them more gracefully than just an uncaught exception?

Although then again in the instanceof Array check above all we do is throw an exception, so actually maybe this is already fine and we just need the calling code to detect this case correctly?

https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/JSON/stringify#exceptions

written += encoded.length;
controller.enqueue(encoded);
}

controller.close();
},
});

return {
id: this.#batch!.id,
shard: this.#batch!.shard,
ts: this.#batch!.ts,
format: Format.JSON_STREAM,
size: {
bytes: written,
rows: data.length,
},
data: readable,
};
}
}
8 changes: 8 additions & 0 deletions src/cloudflare/internal/test/pipeline-transform/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
load("//:build/wd_test.bzl", "wd_test")

wd_test(
size = "large",
src = "transform.wd-test",
args = ["--experimental"],
data = glob(["*.js"]),
)
110 changes: 110 additions & 0 deletions src/cloudflare/internal/test/pipeline-transform/transform-test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
// @ts-nocheck
// Copyright (c) 2024 Cloudflare, Inc.
// Licensed under the Apache 2.0 license found in the LICENSE file or at:
// https://opensource.org/licenses/Apache-2.0

import assert from 'node:assert';
import { PipelineTransform } from 'cloudflare:pipeline-transform';

// this is how "Pipeline" would be implemented by the user
const customTransform = class MyEntrypoint extends PipelineTransform {
/**
* @param {any} batch
* @override
*/
async transformJson(batch) {
for (const obj of batch) {
obj.dispatcher = 'was here!';
}

return batch;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor, but it'd be nice to test the async nature of this as well -- e.g. modifying each obj a little more after waiting on a scheduler.yield() or something like that.

}
};

const lines = [
`${JSON.stringify({ name: 'jimmy', age: '42' })}\n`,
`${JSON.stringify({ name: 'jonny', age: '9' })}\n`,
`${JSON.stringify({ name: 'joey', age: '108' })}\n`,
];

function newBatch() {
return {
id: 'test',
shard: '0',
ts: Date.now(),
format: 'json_stream',
data: new ReadableStream({
start(controller) {
const encoder = new TextEncoder();
for (const line of lines) {
controller.enqueue(encoder.encode(line));
}
controller.close();
},
}),
};
}

// bazel test //src/cloudflare/internal/test/pipeline-transform:transform --test_output=errors --sandbox_debug
export const tests = {
async test(ctr, env, ctx) {
{
// should fail dispatcher test call when PipelineTransform class not extended
const transformer = new PipelineTransform(ctx, env);
await assert.rejects(transformer._ping(), (err) => {
assert.strictEqual(
err.message,
'the transformJson method must be overridden by the PipelineTransform subclass'
);
return true;
});
}

{
// should correctly handle dispatcher test call
const transform = new customTransform(ctx, env);
await assert.doesNotReject(transform._ping());
}

{
// should return mutated batch
const transformer = new customTransform(ctx, env);
const batch = newBatch();

const result = await transformer._transform(batch);
assert.equal(true, result.data instanceof ReadableStream);

const reader = result.data
.pipeThrough(new TextDecoderStream())
.getReader();

let data = '';
while (true) {
const { done, value } = await reader.read();
if (done) {
break;
} else {
data += value;
}
}

assert.notEqual(data.length, 0);

const objects = [];
const resultLines = data.split('\n');
resultLines.pop();
for (const line of resultLines) {
objects.push(JSON.parse(line));
}

let index = 0;
for (const obj of objects) {
hhoughgg marked this conversation as resolved.
Show resolved Hide resolved
assert.equal(obj.dispatcher, 'was here!');
delete obj.dispatcher;

assert.equal(`${JSON.stringify(obj)}\n`, lines[index]);
index++;
}
}
},
};
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
using Workerd = import "/workerd/workerd.capnp";

const unitTests :Workerd.Config = (
services = [
( name = "transform-test",
worker = (
modules = [
(name = "worker", esModule = embed "transform-test.js")
],
compatibilityDate = "2024-08-15",
compatibilityFlags = ["nodejs_compat", "experimental"]
)
)
]
);
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{
"extends": "../../../tsconfig.json",
"compilerOptions": {
"checkJs": true,
"noEmit": true
},
"include": ["*.ts", "*.js"],
"files": ["../../../../../types/defines/pipeline-transform.d.ts"]
}
3 changes: 3 additions & 0 deletions src/cloudflare/pipeline-transform.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
import { PipelineTransformImpl } from 'cloudflare-internal:pipeline-transform';

export const PipelineTransform = PipelineTransformImpl;
13 changes: 13 additions & 0 deletions types/defines/pipeline-transform.d.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
// Copyright (c) 2022-2023 Cloudflare, Inc.
// Licensed under the Apache 2.0 license found in the LICENSE file or at:
// https://opensource.org/licenses/Apache-2.0

export abstract class PipelineTransform {
/**
* transformJson recieves an array of javascript objects which can be
* mutated and returned to the pipeline
* @param data The data to be mutated
* @returns A promise containing the mutated data
*/
public transformJson(data: object[]): Promise<object[]>;
}
Loading