Skip to content

Commit

Permalink
Updates readlines to yield eagerly
Browse files Browse the repository at this point in the history
  • Loading branch information
hhoughgg committed Aug 20, 2024
1 parent 4b9ef15 commit 7f09712
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 12 deletions.
29 changes: 18 additions & 11 deletions src/cloudflare/internal/pipeline-transform.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,31 +5,37 @@
import entrypoints from 'cloudflare-internal:workers'

/**
 * Splits a stream of text chunks into lines, yielding each line as soon as
 * its terminating newline has been seen.
 *
 * Lines are delimited by '\n'; the delimiter is not included in the yielded
 * value. A line may span several chunks: text after the last newline of a
 * chunk is carried over and prepended to the next chunk. Empty lines (two
 * consecutive newlines) are yielded as empty strings. Any text left after the
 * stream ends is yielded as a final line, unless it is empty.
 *
 * @param stream - source of decoded text chunks; consumed via async iteration
 * @returns an async generator producing one string per line
 */
async function* readLines(stream: ReadableStream<string>): AsyncGenerator<string> {
  // text seen since the last newline that has not been yielded yet
  let partial = ''

  // @ts-ignore -- ReadableStream is async-iterable at runtime, but not every TS lib declares it
  for await (const chunk of stream) {
    let start = 0
    // only the new chunk is scanned; the carried-over text is known to hold no newline
    let newline = chunk.indexOf('\n', start)

    while (newline !== -1) {
      // the carry-over belongs in front of the first line completed by this chunk
      yield partial + chunk.substring(start, newline)
      partial = ''
      start = newline + 1
      newline = chunk.indexOf('\n', start)
    }

    // keep the unterminated tail for the next chunk (empty when the chunk ended on '\n')
    partial += chunk.substring(start)
  }

  if (partial.length > 0) {
    yield partial
  }
}

type Batch = {
id: string // unique identifier for the batch
shard?: string // assigned shard
ts: number // timestamp of the event
metadata?: { [key: string]: string }

format: Format
data: unknown
Expand Down Expand Up @@ -60,16 +66,17 @@ export class PipelineTransformImpl extends entrypoints.WorkerEntrypoint {

// called by the dispatcher which then calls the subclass methods
// @ts-ignore
private async _transform(batch: Batch, ping?: boolean): Promise<JsonStream> {
private async _transform(batch: Batch): Promise<JsonStream> {
if (this.#initalized) {
throw new Error('pipeline entrypoint has already been initialized')
}

// if ping is defined it means this is a test call to make sure
// the pipeline entrypoint is set up correctly
if (ping) {
if (batch.metadata && batch.metadata['dispatcher'] === 'ping') {
// making sure the function was overridden by an implementing subclass
if (this.transformJson !== PipelineTransformImpl.prototype.transformJson) {
batch.metadata['dispatcher'] = 'pong'
return {
...batch,
data: batch.data as ReadableStream<Uint8Array>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@ export const tests = {
const transformer = new PipelineTransform(ctx, env)

try {
const ping = await transformer._transform(null, true)
const batch = newBatch()
batch.metadata = { dispatcher: 'ping' }
const ping = await transformer._transform(batch)
throw new Error('this test should trigger catch')
} catch(e) {
assert.equal(e.message, 'the transformJson method must be overidden by the PipelineTransform subclass')
Expand Down

0 comments on commit 7f09712

Please sign in to comment.