Skip to content

Commit

Permalink
Make workflow instance.status() return output equal to production w…
Browse files Browse the repository at this point in the history
…orkflows (#7575)

* feat: Make `instance.status()` return output equal to production

Previously,in local dev, the `output` field would return the list of successful steps
outputs in the workflow. This is not expected behaviour compared to
production workflows (where the output is the actual return of the
`run` function), probably just a implementation detail not set left-over from before beta.

This commit makes it so that `output` is equal to production behaviour.
For observability sake, I kept the old step output list in a different
field `__LOCAL_DEV_STEP_OUTPUTS` - I think this is a good enough
compromise right now since we want local dev to be correct against prod
first. We can remove `__LOCAL_DEV_STEP_OUTPUTS` later, once we figure
out on how to add custom stuff to the devtools page.

* fix types

* fix tests

* chore: add better types for readLogs
  • Loading branch information
LuisDuarte1 authored and penalosa committed Jan 10, 2025
1 parent e2f8e0b commit 33ae781
Show file tree
Hide file tree
Showing 8 changed files with 85 additions and 32 deletions.
6 changes: 6 additions & 0 deletions .changeset/great-flowers-compete.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"@cloudflare/workflows-shared": patch
"miniflare": patch
---

Make `Instance.status()` return type the same as production
4 changes: 2 additions & 2 deletions fixtures/workflow-multiple/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ export class Demo extends WorkflowEntrypoint<{}, Params> {
};
});

return [result, result2, timestamp, payload, "workflow1"];
return "i'm workflow1";
}
}

Expand All @@ -53,7 +53,7 @@ export class Demo2 extends WorkflowEntrypoint<{}, Params> {
};
});

return [result, result2, timestamp, payload, "workflow2"];
return "i'm workflow2";
}
}

Expand Down
12 changes: 8 additions & 4 deletions fixtures/workflow-multiple/tests/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ describe("Workflows", () => {
id: "test",
status: {
status: "running",
output: [],
__LOCAL_DEV_STEP_OUTPUTS: [],
output: null,
},
};

Expand All @@ -65,7 +66,8 @@ describe("Workflows", () => {
id: "test",
status: {
status: "running",
output: [{ output: "First step result" }],
__LOCAL_DEV_STEP_OUTPUTS: [{ output: "First step result" }],
output: null,
},
};
await Promise.all([
Expand Down Expand Up @@ -96,10 +98,11 @@ describe("Workflows", () => {
id: "test",
status: {
status: "complete",
output: [
__LOCAL_DEV_STEP_OUTPUTS: [
{ output: "First step result" },
{ output: "workflow1" },
],
output: "i'm workflow1",
},
});
},
Expand All @@ -113,10 +116,11 @@ describe("Workflows", () => {
id: "test",
status: {
status: "complete",
output: [
__LOCAL_DEV_STEP_OUTPUTS: [
{ output: "First step result" },
{ output: "workflow2" },
],
output: "i'm workflow2",
},
});
},
Expand Down
2 changes: 1 addition & 1 deletion fixtures/workflow/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ export class Demo extends WorkflowEntrypoint<{}, Params> {
};
});

return [result, result2, timestamp, payload];
return "i'm a workflow output";
}
}

Expand Down
12 changes: 8 additions & 4 deletions fixtures/workflow/tests/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ describe("Workflows", () => {
fetchJson(`http://${ip}:${port}/create?workflowName=test`)
).resolves.toEqual({
status: "running",
output: [],
__LOCAL_DEV_STEP_OUTPUTS: [],
output: null,
});

await vi.waitFor(
Expand All @@ -55,7 +56,8 @@ describe("Workflows", () => {
fetchJson(`http://${ip}:${port}/status?workflowName=test`)
).resolves.toEqual({
status: "running",
output: [{ output: "First step result" }],
__LOCAL_DEV_STEP_OUTPUTS: [{ output: "First step result" }],
output: null,
});
},
{ timeout: 5000 }
Expand All @@ -67,10 +69,11 @@ describe("Workflows", () => {
fetchJson(`http://${ip}:${port}/status?workflowName=test`)
).resolves.toEqual({
status: "complete",
output: [
__LOCAL_DEV_STEP_OUTPUTS: [
{ output: "First step result" },
{ output: "Second step result" },
],
output: "i'm a workflow output",
});
},
{ timeout: 5000 }
Expand All @@ -80,7 +83,8 @@ describe("Workflows", () => {
it("creates a workflow without id", async ({ expect }) => {
await expect(fetchJson(`http://${ip}:${port}/create`)).resolves.toEqual({
status: "running",
output: [],
__LOCAL_DEV_STEP_OUTPUTS: [],
output: null,
});
});

Expand Down
15 changes: 12 additions & 3 deletions packages/miniflare/test/plugins/workflows/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,10 @@ test("persists Workflow data on file-system between runs", async (t) => {
t.teardown(() => mf.dispose());

let res = await mf.dispatchFetch("http://localhost");
t.is(await res.text(), '{"status":"running","output":[]}');
t.is(
await res.text(),
'{"status":"running","__LOCAL_DEV_STEP_OUTPUTS":[],"output":null}'
);

// there's no waitUntil in ava haha
const begin = performance.now();
Expand All @@ -50,7 +53,10 @@ test("persists Workflow data on file-system between runs", async (t) => {
const res = await mf.dispatchFetch("http://localhost");
console.log(test);
test = await res.text();
if (test === '{"status":"complete","output":["yes you are"]}') {
if (
test ===
'{"status":"complete","__LOCAL_DEV_STEP_OUTPUTS":["yes you are"],"output":"I\'m a output string"}'
) {
success = true;
break;
}
Expand All @@ -68,5 +74,8 @@ test("persists Workflow data on file-system between runs", async (t) => {

// state should be persisted now
res = await mf.dispatchFetch("http://localhost");
t.is(await res.text(), '{"status":"complete","output":["yes you are"]}');
t.is(
await res.text(),
'{"status":"complete","__LOCAL_DEV_STEP_OUTPUTS":["yes you are"],"output":"I\'m a output string"}'
);
});
33 changes: 26 additions & 7 deletions packages/workflows-shared/src/binding.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import type {
DatabaseVersion,
DatabaseWorkflow,
Engine,
EngineLogs,
} from "./engine";

type Env = {
Expand Down Expand Up @@ -93,16 +94,34 @@ export class WorkflowHandle extends RpcTarget implements WorkflowInstance {
throw new Error("Not implemented yet");
}

public async status(): Promise<InstanceStatus> {
public async status(): Promise<
InstanceStatus & { __LOCAL_DEV_STEP_OUTPUTS: unknown[] }
> {
const status = await this.stub.getStatus(0, this.id);
const { logs } = await this.stub.readLogs();
// @ts-expect-error TODO: Fix this

// NOTE(lduarte): for some reason, sync functions over RPC are typed as never instead of Promise<EngineLogs>
const { logs } =
await (this.stub.readLogs() as unknown as Promise<EngineLogs>);

const workflowSuccessEvent = logs
.filter((log) => log.event === InstanceEvent.WORKFLOW_SUCCESS)
.at(0);

const filteredLogs = logs.filter(
// @ts-expect-error TODO: Fix this
(log) => log.event === InstanceEvent.STEP_SUCCESS
);
// @ts-expect-error TODO: Fix this
const output = filteredLogs.map((log) => log.metadata.result);
return { status: instanceStatusName(status), output }; // output, error
const stepOutputs = filteredLogs.map((log) => log.metadata.result);

const workflowOutput =
workflowSuccessEvent !== undefined
? workflowSuccessEvent.metadata.result
: null;

return {
status: instanceStatusName(status),
__LOCAL_DEV_STEP_OUTPUTS: stepOutputs,
// @ts-expect-error types are wrong, will remove this expect-error once I fix them
output: workflowOutput,
}; // output, error
}
}
33 changes: 22 additions & 11 deletions packages/workflows-shared/src/engine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,7 @@ import {
startGracePeriod,
} from "./lib/gracePeriodSemaphore";
import { TimePriorityQueue } from "./lib/timePriorityQueue";
import type {
InstanceLogsResponse,
InstanceMetadata,
RawInstanceLog,
} from "./instance";
import type { InstanceMetadata, RawInstanceLog } from "./instance";
import type { WorkflowEntrypoint, WorkflowEvent } from "cloudflare:workers";

export interface Env {
Expand Down Expand Up @@ -53,6 +49,19 @@ export type DatabaseInstance = {
ended_on: string | null;
};

export type Log = {
event: InstanceEvent;
group: string | null;
target: string | null;
metadata: {
result: unknown;
};
};

export type EngineLogs = {
logs: Log[];
};

const ENGINE_STATUS_KEY = "ENGINE_STATUS";

export class Engine extends DurableObject<Env> {
Expand Down Expand Up @@ -121,18 +130,20 @@ export class Engine extends DurableObject<Env> {
return [];
}

readLogs(): InstanceLogsResponse {
readLogs(): EngineLogs {
const logs = [
...this.ctx.storage.sql.exec<Record<string, string | number>>(
"SELECT event, groupKey, target, metadata FROM states"
),
...this.ctx.storage.sql.exec<{
event: InstanceEvent;
groupKey: string | null;
target: string | null;
metadata: string;
}>("SELECT event, groupKey, target, metadata FROM states"),
];

return {
// @ts-expect-error TODO: Fix this
logs: logs.map((log) => ({
...log,
metadata: JSON.parse(log.metadata as string),
metadata: JSON.parse(log.metadata),
group: log.groupKey,
})),
};
Expand Down

0 comments on commit 33ae781

Please sign in to comment.