Skip to content

Commit

Permalink
feat(actors): expose ActorContext to actor RPCs & init
Browse files Browse the repository at this point in the history
  • Loading branch information
NathanFlurry committed Jul 2, 2024
1 parent 6b0c50b commit 5d52aa3
Show file tree
Hide file tree
Showing 15 changed files with 271 additions and 58 deletions.
22 changes: 11 additions & 11 deletions artifacts/runtime_archive.json

Large diffs are not rendered by default.

16 changes: 16 additions & 0 deletions docs/build/actors.mdx
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
---
title: Actor
---

<Note>
Documentation coming very soon!
</Note>

## Performance Tips

### Do as little work on the actor as possible

Opt to run as many database queries and other expensive operations on the request before sending a message to the actor.

Scripts scale horizontally while actors can be a performance bottleneck if misused.

4 changes: 2 additions & 2 deletions src/build/entrypoint.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ export async function generateEntrypoint(project: Project, opts: BuildOpts) {
dependenciesCamel: DependenciesCamel,
}>(
config,
new ActorDriver(config),
new ActorDriver(config, dependencyCaseConversionMap, actorCaseConversionMap),
dependencyCaseConversionMap,
actorCaseConversionMap,
);
Expand Down Expand Up @@ -139,7 +139,7 @@ export async function generateEntrypoint(project: Project, opts: BuildOpts) {
dependenciesCamel: DependenciesCamel,
}>(
config,
new ActorDriver(config),
new ActorDriver(config, dependencyCaseConversionMap, actorCaseConversionMap),
dependencyCaseConversionMap,
actorCaseConversionMap,
);
Expand Down
9 changes: 5 additions & 4 deletions src/build/gen/module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,13 @@ export async function compileModuleHelper(
ModuleContext as ModuleContextInner,
TestContext as TestContextInner,
ScriptContext as ScriptContextInner,
ActorContext as ActorContextInner,
Runtime,
} from "${runtimePath}";
import config from "${runtimeConfigPath}";
import { dependencyCaseConversionMap } from "${dependencyCaseConversionMapPath}";
import { actorCaseConversionMap } from "${actorCaseConversionMapPath}";
import { ActorBase as ActorBaseInner } from ${JSON.stringify(genRuntimeActorPath(project))};
import { ActorBase } from ${JSON.stringify(genRuntimeActorPath(project))};
import { ActorDriver } from ${JSON.stringify(genRuntimeActorDriverPath(project, opts.runtime))};
`;

Expand All @@ -65,7 +66,7 @@ export async function compileModuleHelper(

// Common exports
helper.chunk.append`
export { RuntimeError };
export { RuntimeError, ActorBase };
`;

// Gen blocks
Expand Down Expand Up @@ -255,7 +256,7 @@ function genTest(
export function test(name: string, fn: TestFn) {
Runtime.test(
config,
new ActorDriver(config),
new ActorDriver(config, dependencyCaseConversionMap, actorCaseConversionMap),
"${module.name}",
name,
fn,
Expand Down Expand Up @@ -288,6 +289,6 @@ function genActor(
helper.chunk.withNewlinesPerChunk(1)
.newline()
.append`
export abstract class ActorBase<Input, State> extends ActorBaseInner<ModuleContextParams, Input, State> {}
export type ActorContext = ActorContextInner<ModuleContextParams>;
`;
}
23 changes: 18 additions & 5 deletions src/runtime/actor/actor.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,34 @@
import { ModuleContextParams } from "../context.ts";
import { Config } from "../mod.ts";
import { ActorContext } from "../mod.ts";
import { ScheduleDriver, StorageDriver } from "./driver.ts";

/**
* Actor implementation that user-made actors will extend.
*/
export abstract class ActorBase<
Params extends ModuleContextParams,
Input,
State,
> {
public state!: State;

public constructor(
public readonly config: Config,
public readonly storage: StorageDriver,
public readonly schedule: ScheduleDriver,
public state: State,
) {}

public abstract initialize(input: Input): State | Promise<State>;
public abstract initialize(ctx: ActorContext<ModuleContextParams>, input: Input): State | Promise<State>;

/**
* Runs a promise in the background.
*
* This allows the actor runtime to ensure that a promise completes while
* returning from an RPC request early.
*/
protected runInBackground(promise: Promise<void>) {
// TODO: Pass this to the actor driver

promise
.then(() => console.log("Actor background promise complete"))
.catch((err) => console.error("Actor background promise failed", err));
}
}
5 changes: 5 additions & 0 deletions src/runtime/actor/driver.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import { Trace } from "../mod.ts";

export interface CreateOpts {
moduleName: string;
actorName: string;
instanceName: string;
input: unknown;
trace: Trace;
}

export interface CallOpts {
Expand All @@ -11,6 +14,7 @@ export interface CallOpts {
instanceName: string;
fn: string;
request: unknown;
trace: Trace;
}

export interface GetOrCreateAndCallOpts {
Expand All @@ -20,6 +24,7 @@ export interface GetOrCreateAndCallOpts {
input: unknown;
fn: string;
request: unknown;
trace: Trace;
}

export interface ExistsOpts {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ export class CloudflareDurableObjectsActorDriver implements ActorDriver {
actor: opts.actorName,
instance: opts.instanceName,
input: opts.input,
trace: opts.trace,
});
}

Expand All @@ -25,6 +26,7 @@ export class CloudflareDurableObjectsActorDriver implements ActorDriver {
return await stub.callRpc({
fn: opts.fn,
request: opts.request,
trace: opts.trace,
});
}

Expand All @@ -38,9 +40,11 @@ export class CloudflareDurableObjectsActorDriver implements ActorDriver {
actor: opts.actorName,
instance: opts.instanceName,
input: opts.input,
trace: opts.trace,
},
fn: opts.fn,
request: opts.request,
trace: opts.trace,
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,19 @@ import { DurableObject } from "cloudflare:workers";
import { CloudflareDurableObjectsStorage } from "./storage.ts";
import { CloudflareDurableObjectsSchedule } from "./schedule.ts";
import { ActorBase } from "../../actor.ts";
import { Config, ModuleContextParams } from "../../../mod.ts";
import {
Actor,
ActorContext,
appendTraceEntry,
Config,
Module,
ModuleContextParams,
Runtime,
Trace,
} from "../../../mod.ts";
import { RegistryCallMap } from "../../../proxy.ts";
import { ActorDriver } from "./driver.ts";
import { newTrace } from "../../../trace.ts";

const KEYS = {
META: {
Expand Down Expand Up @@ -43,25 +55,27 @@ interface InitOpts {
actor: string;
instance: string;
input: any;
trace: Trace;
ignoreAlreadyInitialized?: boolean;
}

interface GetOrCreateAndCallOpts {
init: InitOpts;
fn: string;
request: unknown;
trace: Trace;
}

interface CallRpcOpts {
fn: string;
request: unknown;
trace: Trace;
}

/*
* __GlobalDurableObject type used for referencing an instance of the class.
*/
export interface __GlobalDurableObjectT extends DurableObject {
constructActor(): Promise<ActorBase<ModuleContextParams, unknown, unknown>>;
init(opts: InitOpts): Promise<void>;
initialized(): Promise<boolean>;
getOrCreateAndCallRpc(opts: GetOrCreateAndCallOpts): Promise<any>;
Expand All @@ -71,6 +85,15 @@ export interface __GlobalDurableObjectT extends DurableObject {
get storage(): DurableObjectStorage;
}

/**
* Actor data & config read from the actor state.
*/
interface ActorMeta {
moduleName: string;
actorName: string;
state: any;
}

/**
* Generate a __GlobalDurableObject class that has access to the current config.
*
Expand All @@ -81,10 +104,31 @@ export interface __GlobalDurableObjectT extends DurableObject {
* is better since it ensures that you _can't_ create an instance of
* __GlobalDurableObject that doesn't have an associated config.
*/
export function buildGlobalDurableObjectClass(config: Config) {
export function buildGlobalDurableObjectClass(
config: Config,
dependencyCaseConversionMap: RegistryCallMap,
actorDependencyCaseConversionMap: RegistryCallMap,
) {
class __GlobalDurableObject extends DurableObject implements __GlobalDurableObjectT {
// TODO: optimize to use in-memory state
async constructActor(): Promise<ActorBase<ModuleContextParams, unknown, unknown>> {
private runtime: Runtime<ModuleContextParams>;

constructor(ctx: DurableObjectState, env: unknown) {
super(ctx, env);

this.runtime = new Runtime(
config,
new ActorDriver(config),
dependencyCaseConversionMap,
actorDependencyCaseConversionMap,
);
}

/**
* Reads the metadata related to this actor from storage.
*
* This data is set in `init`.
*/
async getMeta(): Promise<ActorMeta> {
// Create actor instance
const storageRes = await this.ctx.storage.get<string>([KEYS.META.MODULE, KEYS.META.ACTOR, KEYS.STATE]);
const moduleName = storageRes.get(KEYS.META.MODULE);
Expand All @@ -93,22 +137,45 @@ export function buildGlobalDurableObjectClass(config: Config) {
if (moduleName == undefined || actorName == undefined) throw new Error("actor not initialized");
if (state == undefined) throw Error("actor state not initiated");

return { moduleName, actorName, state };
}

// TODO: optimize to use in-memory state
private async constructActor(meta: ActorMeta): Promise<ActorBase<unknown, unknown>> {
// Get actor config
if (!(moduleName in config.modules)) throw new Error("module not found");
const moduleConfig = config.modules[moduleName];
if (!(actorName in moduleConfig.actors)) throw new Error("actor not found");
const actorConfig = moduleConfig.actors[actorName];
if (!(meta.moduleName in config.modules)) throw new Error("module not found");
const moduleConfig = config.modules[meta.moduleName];
if (!(meta.actorName in moduleConfig.actors)) throw new Error("actor not found");
const actorConfig = moduleConfig.actors[meta.actorName];

// TODO: cache actor instance in memory
// TODO: use ctx.waitUntil for all calls
// Run actor function
const storage = new CloudflareDurableObjectsStorage(this);
const schedule = new CloudflareDurableObjectsSchedule(this);
const actor = new (actorConfig.actor)(config, storage, schedule, state);
const actor = new (actorConfig.actor)(
new CloudflareDurableObjectsStorage(this),
new CloudflareDurableObjectsSchedule(this),
);

return actor;
}

private createActorContext(moduleName: string, actorName: string, trace: Trace): ActorContext<ModuleContextParams> {
// Build context
const module = config.modules[moduleName];
const context = new ActorContext<ModuleContextParams>(
this.runtime,
trace,
moduleName,
this.runtime.postgres.getOrCreatePrismaClient(this.runtime.config, module),
module.db?.schema,
actorName,
dependencyCaseConversionMap,
actorDependencyCaseConversionMap,
);

return context;
}

async init(opts: InitOpts) {
// Check if already initialized
if (await this.initialized()) {
Expand All @@ -123,8 +190,17 @@ export function buildGlobalDurableObjectClass(config: Config) {
});

// Build initial state
const actor = await this.constructActor();
const state = actor.initialize(opts.input);
const actor = await this.constructActor({
moduleName: opts.module,
actorName: opts.actor,
state: undefined,
});
const context = this.createActorContext(
opts.module,
opts.actor,
appendTraceEntry(opts.trace, { actorInitialize: { module: opts.module, actor: opts.actor } }),
);
const state = actor.initialize(context, opts.input);
await this.ctx.storage.put(KEYS.STATE, state);
}

Expand All @@ -138,18 +214,26 @@ export function buildGlobalDurableObjectClass(config: Config) {
ignoreAlreadyInitialized: true,
});

return await this.callRpc({ fn: opts.fn, request: opts.request });
return await this.callRpc({ fn: opts.fn, request: opts.request, trace: opts.trace });
}

async callRpc({ fn, request }: CallRpcOpts): Promise<any> {
const actor = await this.constructActor();
async callRpc({ fn, request, trace }: CallRpcOpts): Promise<any> {
const meta = await this.getMeta();
const actor = await this.constructActor(meta);
actor.state = meta.state;

const context = this.createActorContext(
meta.moduleName,
meta.actorName,
appendTraceEntry(trace, { actorCall: { module: meta.moduleName, actor: meta.actorName, fn } }),
);

// Call fn
let callRes = (actor as any)[fn](request);
let callRes = (actor as any)[fn](context, request);
if (callRes instanceof Promise) callRes = await callRes;

// Update state
await this.ctx.storage.put(KEYS.STATE, actor!.state);
await this.ctx.storage.put(KEYS.STATE, actor.state);

return callRes;
}
Expand Down Expand Up @@ -214,7 +298,8 @@ export function buildGlobalDurableObjectClass(config: Config) {
const event = scheduleEvents.get(eventKey)!;
try {
// TODO: how do we handle this promise cleanly?
this.callRpc({ fn: event.fn, request: event.request });
const res = this.callRpc({ fn: event.fn, request: event.request, trace: newTrace({ actorSchedule: {} }) });
if (res instanceof Promise) await res;
} catch (err) {
console.error("Failed to run scheduled event", err, event);
}
Expand Down
Loading

0 comments on commit 5d52aa3

Please sign in to comment.