Skip to content

Commit

Permalink
Adding class for new pipelines product
Browse files Browse the repository at this point in the history
  • Loading branch information
hhoughgg committed Aug 20, 2024
1 parent c1f8e1c commit c595b00
Show file tree
Hide file tree
Showing 8 changed files with 310 additions and 0 deletions.
1 change: 1 addition & 0 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,7 @@
"options": [
"src/cloudflare/internal/test/d1/d1-api-test.wd-test",
"src/cloudflare/internal/test/vectorize/vectorize-api-test.wd-test",
"src/cloudflare/internal/test/pipeline-transform/pipeline-transform.wd-test",
"src/workerd/api/actor-alarms-delete-test.wd-test",
"src/workerd/api/actor-alarms-test.wd-test",
"src/workerd/api/analytics-engine-test.wd-test",
Expand Down
146 changes: 146 additions & 0 deletions src/cloudflare/internal/pipeline-transform.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
// Copyright (c) 2024 Cloudflare, Inc.
// Licensed under the Apache 2.0 license found in the LICENSE file or at:
// https://opensource.org/licenses/Apache-2.0

import entrypoints from 'cloudflare-internal:workers'

/**
 * Splits a stream of decoded text chunks into lines.
 *
 * Chunk boundaries are arbitrary, so a line may be spread over several
 * chunks. Any text after the last '\n' of a chunk is carried over and
 * prepended to the next chunk before searching for further newlines.
 *
 * @param stream Stream of already-decoded text chunks
 * @returns An async generator yielding each line without its trailing '\n'.
 *          Empty lines are yielded as empty strings; a final unterminated
 *          line is yielded only when it is non-empty.
 */
async function* readLines(stream: ReadableStream<string>): AsyncGenerator<string> {
  // text received so far that has not yet been terminated by '\n'
  let partial = ''

  // @ts-ignore
  for await (const chunk of stream) {
    // the carry-over belongs in FRONT of the new chunk
    const full = partial + (chunk as string)

    // indexOf/substring both work in UTF-16 code units, so offsets stay
    // consistent even when the text contains astral characters
    let start = 0
    let newline = full.indexOf('\n', start)
    while (newline !== -1) {
      yield full.substring(start, newline)
      start = newline + 1
      newline = full.indexOf('\n', start)
    }

    partial = full.substring(start)
  }

  if (partial.length > 0) {
    yield partial
  }
}

/** Encodings a batch payload can use. */
enum Format {
  JSON_STREAM = 'json_stream', // jsonl
}

/** Envelope describing one batch of events together with its payload. */
interface Batch {
  /** unique identifier for the batch */
  id: string
  /** assigned shard */
  shard?: string
  /** timestamp of the event */
  ts: number

  /** encoding of `data` */
  format: Format
  /** payload size, in bytes and in rows */
  size: {
    bytes: number
    rows: number
  }
  /** payload; its concrete type depends on `format` */
  data: unknown
}

/** A batch whose payload is a byte stream in the JSON_STREAM format. */
interface JsonStream extends Batch {
  format: Format.JSON_STREAM
  data: ReadableStream<Uint8Array>
}

/**
 * Base entrypoint for pipeline transforms.
 *
 * Implementers subclass this and override `transformJson`. The dispatcher
 * drives an instance through `_ping` (to verify the override exists) and
 * `_transform` (to run one batch through the override). An instance accepts
 * a single `_transform` call.
 */
export class PipelineTransformImpl extends entrypoints.WorkerEntrypoint {
  // set by the first _transform call; a second call is rejected
  #initialized = false

  constructor(ctx: unknown, env: unknown) {
    super(ctx, env)
  }

  /**
   * Stub overridden on the subclass.
   *
   * @param data The decoded rows of the batch
   * @returns The rows to send back to the pipeline
   * @throws Error always, when not overridden
   */
  // @ts-ignore
  public async transformJson(data: object[]): Promise<object[]> {
    throw new Error('should be implemented by subclass')
  }

  /**
   * Called by the dispatcher to make sure `transformJson` was overridden by
   * an implementing subclass.
   *
   * @throws Error when `transformJson` is still the base-class stub
   */
  // @ts-ignore
  private async _ping(): Promise<void> {
    if (this.transformJson === PipelineTransformImpl.prototype.transformJson) {
      throw new Error('the transformJson method must be overridden by the PipelineTransform subclass')
    }
  }

  /**
   * Called by the dispatcher, which then calls the subclass methods.
   *
   * The reason this is typescript private and not javascript private is that
   * this must be able to be called by the dispatcher but should not be called
   * by the class implementer.
   *
   * @param batch The incoming batch
   * @returns A new batch carrying the transformed rows
   * @throws Error when called more than once on the same instance, or when
   *         the batch format is not supported
   */
  // @ts-ignore
  private async _transform(batch: Batch): Promise<JsonStream> {
    if (this.#initialized) {
      throw new Error('pipeline entrypoint has already been initialized')
    }

    this.#initialized = true

    switch (batch.format) {
      // braces give the const declarations their own scope
      case Format.JSON_STREAM: {
        const data = await this.#readJsonStream(batch)
        const transformed = await this.transformJson(data)
        return this.#sendJson(batch, transformed)
      }
      default:
        throw new Error('unsupported batch format')
    }
  }

  /**
   * Decodes a JSON_STREAM payload into one parsed value per line.
   *
   * @throws Error when the batch is not JSON_STREAM
   * @throws SyntaxError (from JSON.parse) when a non-blank line is not valid JSON
   */
  async #readJsonStream(batch: Batch): Promise<object[]> {
    if (batch.format !== Format.JSON_STREAM) {
      throw new Error(`expected JSON_STREAM not ${batch.format}`)
    }

    const bytes = batch.data as ReadableStream<Uint8Array>
    const text = bytes.pipeThrough(new TextDecoderStream())

    const data: object[] = []
    for await (const line of readLines(text)) {
      // a blank line carries no record; JSON.parse('') would throw
      if (line.trim().length === 0) {
        continue
      }

      data.push(JSON.parse(line))
    }

    return data
  }

  /**
   * Serialises `data` as one JSON document per line and wraps it in a batch
   * that reuses the id, shard and timestamp of the incoming batch.
   */
  async #sendJson(batch: Batch, data: object[]): Promise<JsonStream> {
    let written = 0
    const encoder = new TextEncoder()

    // start() is invoked synchronously by the ReadableStream constructor, so
    // `written` already holds the final byte count when the result is built
    const readable = new ReadableStream<Uint8Array>({
      start(controller) {
        for (const obj of data) {
          const encoded = encoder.encode(`${JSON.stringify(obj)}\n`)
          written += encoded.length
          controller.enqueue(encoded)
        }

        controller.close()
      }
    })

    return {
      id: batch.id,
      shard: batch.shard,
      ts: batch.ts,
      format: Format.JSON_STREAM,
      size: {
        bytes: written,
        rows: data.length
      },
      data: readable
    } as JsonStream
  }
}
7 changes: 7 additions & 0 deletions src/cloudflare/internal/test/pipeline-transform/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
load("//:build/wd_test.bzl", "wd_test")

# Declares the test driven by transform.wd-test. It is passed the
# --experimental argument, and every .js file in this directory is supplied
# as data.
wd_test(
src = "transform.wd-test",
args = ["--experimental"],
data = glob(["*.js"]),
)
108 changes: 108 additions & 0 deletions src/cloudflare/internal/test/pipeline-transform/transform-test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// @ts-nocheck
// Copyright (c) 2024 Cloudflare, Inc.
// Licensed under the Apache 2.0 license found in the LICENSE file or at:
// https://opensource.org/licenses/Apache-2.0

// @ts-ignore
import assert from 'node:assert'
import { PipelineTransform } from 'cloudflare:pipeline-transform'

// this is how "Pipeline" would be implemented by the user
const customTransform = class MyEntrypoint extends PipelineTransform {
  /**
   * Stamps every record of the batch with a `dispatcher` marker and hands
   * the same (mutated) array back.
   *
   * @param {any} batch
   * @override
   */
  async transformJson(batch) {
    batch.forEach((record) => {
      record.dispatcher = 'was here!'
    })

    return batch
  }
}

const lines = [
`${JSON.stringify({ name: 'jimmy', age: '42' })}\n`,
`${JSON.stringify({ name: 'jonny', age: '9' })}\n`,
`${JSON.stringify({ name: 'joey', age: '108' })}\n`,
]

function newBatch() {
return {
id: 'test',
shard: '0',
ts: Date.now(),
format: 'json_stream',
data: new ReadableStream({
start(controller) {
const encoder = new TextEncoder()
for (const line of lines) {
controller.enqueue(encoder.encode(line))
}
controller.close()
}
})
}
}

// bazel test //src/cloudflare/internal/test/pipeline-transform:transform --test_output=errors --sandbox_debug
export const tests = {
  /**
   * Exercises the dispatcher-facing surface of PipelineTransform:
   *  - `_ping` rejects on the base class and resolves on a subclass
   *  - `_transform` returns a stream holding every input row, mutated by
   *    the subclass's `transformJson`
   */
  async test(ctr, env, ctx) {
    {
      // should fail dispatcher test call when PipelineTransform class not extended
      const transformer = new PipelineTransform(ctx, env)
      await assert.rejects(transformer._ping(), (err) => {
        assert.strictEqual(err.message, 'the transformJson method must be overridden by the PipelineTransform subclass')
        return true
      })
    }

    {
      // should correctly handle dispatcher test call
      const transform = new customTransform(ctx, env)
      await assert.doesNotReject(transform._ping())
    }

    {
      // should return mutated batch
      const transformer = new customTransform(ctx, env)
      const batch = newBatch()

      const result = await transformer._transform(batch)
      assert.ok(result.data instanceof ReadableStream)
      assert.strictEqual(result.size.rows, lines.length)

      const reader = result.data
        .pipeThrough(new TextDecoderStream())
        .getReader()

      let data = ''
      while (true) {
        const { done, value } = await reader.read()
        if (done) {
          break
        }

        data += value
      }

      assert.notStrictEqual(data.length, 0)

      // every line is '\n'-terminated, so the last split entry is empty
      const resultLines = data.split('\n')
      resultLines.pop()
      const objects = resultLines.map((line) => JSON.parse(line))

      // without this the per-row loop below would pass vacuously if rows
      // were dropped
      assert.strictEqual(objects.length, lines.length)

      for (const [index, obj] of objects.entries()) {
        assert.strictEqual(obj.dispatcher, 'was here!')
        delete obj.dispatcher

        assert.strictEqual(`${JSON.stringify(obj)}\n`, lines[index])
      }
    }
  },
}
15 changes: 15 additions & 0 deletions src/cloudflare/internal/test/pipeline-transform/transform.wd-test
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
using Workerd = import "/workerd/workerd.capnp";

# Configuration with a single service, "transform-test", whose worker embeds
# transform-test.js as its only ES module. The worker runs with the
# "nodejs_compat" and "experimental" compatibility flags.
const unitTests :Workerd.Config = (
services = [
( name = "transform-test",
worker = (
modules = [
(name = "worker", esModule = embed "transform-test.js")
],
compatibilityDate = "2024-08-15",
compatibilityFlags = ["nodejs_compat", "experimental"]
)
)
]
);
12 changes: 12 additions & 0 deletions src/cloudflare/internal/test/pipeline-transform/tsconfig.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{
"extends": "../../../tsconfig.json",
"compilerOptions": {
"checkJs": true,
"noEmit": true
},
"include": [
"*.ts",
"*.js",
],
"files": ["../../../../../types/defines/pipeline-transform.d.ts"]
}
3 changes: 3 additions & 0 deletions src/cloudflare/pipeline-transform.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
import { PipelineTransformImpl } from 'cloudflare-internal:pipeline-transform'

// Re-exports the internal PipelineTransformImpl class under the name PipelineTransform.
export const PipelineTransform = PipelineTransformImpl
18 changes: 18 additions & 0 deletions types/defines/pipeline-transform.d.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Copyright (c) 2022-2023 Cloudflare, Inc.
// Licensed under the Apache 2.0 license found in the LICENSE file or at:
// https://opensource.org/licenses/Apache-2.0

import { WorkerEntrypoint } from "cloudflare:workers";

/**
* The PipelineTransform class is called by the Pipelines product to support
* transformations. Extend it and implement `transformJson`.
*/
declare abstract class PipelineTransform extends WorkerEntrypoint {
/**
* transformJson receives an array of JavaScript objects which can be
* mutated and returned to the pipeline
* @param data The objects of the batch; may be mutated in place
* @returns A promise containing the mutated data
*/
public transformJson(data: object[]): Promise<object[]>
}

0 comments on commit c595b00

Please sign in to comment.