Tutorial: Cloudflare Workflows for Long-Running Tasks
This tutorial walks through implementing durable, long-running background tasks using Cloudflare Workflows. Learn how to orchestrate multi-step processes that can survive restarts, handle failures gracefully, and integrate with AI services.
Table of Contents
- Overview
- Why Cloudflare Workflows?
- Project Setup
- Basic Workflow Structure
- Steps and State Management
- Error Handling
- Accessing Environment Variables
- Database Operations in Workflows
- AI Integration Example
- Triggering Workflows
- Complete Example
- Monitoring and Debugging
- Best Practices
Overview
Cloudflare Workflows provide:
- Durable Execution: Steps are persisted and can resume after failures
- Hibernation: Workflows pause between steps, saving compute costs
- Automatic Retries: Failed steps can be retried automatically
- Long Running: A workflow is not bound by the execution limits of a single Worker request; it can span minutes, hours, or days across its steps
Use Cases
- AI document processing pipelines
- Multi-step data analysis
- Background report generation
- Complex async workflows
- Scheduled batch processing
Why Cloudflare Workflows?
| Feature | Regular Workers | Workflows |
|---|---|---|
| Max Duration | Bounded by per-request limits (CPU time) | Minutes to days, spread across steps |
| State Persistence | None | Automatic |
| Failure Recovery | Manual | Automatic |
| Step Orchestration | Manual | Built-in |
| Cost | Requests and CPU time | Requests and CPU time, plus storage for persisted state |
Project Setup
1. Configure wrangler.jsonc
// wrangler.jsonc
{
"name": "my-app",
"main": "workers/app.ts",
"compatibility_date": "2024-01-01",
// Define your workflows
"workflows": [
{
"name": "invention-analysis-workflow",
"binding": "INVENTION_ANALYSIS_WORKFLOW",
"class_name": "InventionAnalysisWorkflow",
"script_name": "my-app"
},
{
"name": "report-generation-workflow",
"binding": "REPORT_GENERATION_WORKFLOW",
"class_name": "ReportGenerationWorkflow",
"script_name": "my-app"
}
]
}
2. Export Workflows from Worker
// workers/app.ts
import { createRequestHandler } from "react-router";
// Export your workflow classes
export { InventionAnalysisWorkflow } from "../workflows/invention-analysis";
export { ReportGenerationWorkflow } from "../workflows/report-generation";
// Normal worker handler
export default {
  async fetch(request: Request, env: Env, ctx: ExecutionContext): Promise<Response> {
    // Your normal request handling goes here; a fetch handler must return a Response
    return new Response("OK");
  },
};
3. TypeScript Types
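// workers/env.d.ts
interface Env {
  // Database
  DATABASE: D1Database;
  // Workflow bindings
  INVENTION_ANALYSIS_WORKFLOW: Workflow;
  REPORT_GENERATION_WORKFLOW: Workflow;
  // API keys
  ANTHROPIC_API_KEY: string;
  OPENAI_API_KEY: string;
  STRIPE_SECRET_KEY: string;
  // Other settings referenced later in this tutorial
  PATENT_API_URL: string;
  SLACK_WEBHOOK_URL?: string; // optional: the notification step is skipped when unset
}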
Basic Workflow Structure
// workflows/my-workflow.ts
import {
WorkflowEntrypoint,
type WorkflowEvent,
type WorkflowStep,
} from "cloudflare:workers";
// Define your payload type
export interface MyWorkflowPayload {
userId: string;
documentId: string;
}
// Extend WorkflowEntrypoint with Env and Payload types
export class MyWorkflow extends WorkflowEntrypoint<Env, MyWorkflowPayload> {
// Main workflow logic
async run(
event: WorkflowEvent<MyWorkflowPayload>,
step: WorkflowStep
) {
// Access payload
const { userId, documentId } = event.payload;
console.log(`[Workflow] Starting for user ${userId}`);
// Step 1: Fetch data
const data = await step.do("fetch-data", async () => {
// This code runs in a durable step
return { fetched: true };
});
// Step 2: Process data
const result = await step.do("process-data", async () => {
// Use data from previous step
return { processed: true, input: data };
});
// Return final result
return {
success: true,
result,
};
}
}
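Besides running work, a workflow can pause on purpose between steps. The sketch below polls a condition and sleeps between checks with `step.sleep`, which the Workflows step API offers alongside `step.do`. `SleepingStepLike` is a hypothetical, trimmed-down stand-in for `WorkflowStep` so the example runs outside the Workers runtime; `waitUntilReady` and `isReady` are illustrative names, not library functions.

```typescript
// Hypothetical minimal shape of the step object (a stand-in for WorkflowStep).
interface SleepingStepLike {
  do<T>(name: string, callback: () => Promise<T>): Promise<T>;
  sleep(name: string, duration: string | number): Promise<void>;
}

// Check a condition up to maxChecks times, sleeping between checks.
// Step names include the attempt number so every step name stays unique.
async function waitUntilReady(
  step: SleepingStepLike,
  isReady: () => Promise<boolean>,
  maxChecks = 5,
): Promise<boolean> {
  for (let attempt = 1; attempt <= maxChecks; attempt++) {
    const ready = await step.do(`check-ready-${attempt}`, isReady);
    if (ready) return true;
    if (attempt < maxChecks) {
      // While sleeping, the workflow can hibernate instead of holding compute.
      await step.sleep(`wait-after-check-${attempt}`, "1 minute");
    }
  }
  return false;
}
```

Inside `run()`, this would be called as `await waitUntilReady(step, () => checkSomething())`, where `checkSomething` is whatever readiness check the application needs.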
Steps and State Management
The step.do() Function
Each step.do() call:
- Executes the async function
- Persists the result
- On replay, returns the persisted result (doesn't re-execute)
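// A step's result is cached once it succeeds, so it is not re-executed on replay.
// A step that throws may run again on retry, so keep its side effects idempotent.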
const extractionResult = await step.do("extract-data", async () => {
console.log("[Step 1] Extracting data...");
// Expensive operation that should only run once
const result = await callAIService(data);
console.log("[Step 1] Extraction complete");
return result;
});
// This step uses the result from the previous step
const analysisResult = await step.do("analyze-data", async () => {
console.log("[Step 2] Analyzing extracted data...");
// extractionResult is available here
const analysis = await processData(extractionResult);
return analysis;
});
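To make the replay behaviour concrete, here is a toy model of a step runner. It is only a sketch of the idea (results keyed by step name, callback skipped when a result already exists) and not how Cloudflare implements persistence; `ToyStepRunner` and `demoReplay` are hypothetical names, and a `Map` stands in for durable storage.

```typescript
// Toy model of durable steps. NOT the Cloudflare implementation.
class ToyStepRunner {
  // Stands in for the platform's durable storage (hypothetical).
  private readonly store: Map<string, unknown>;

  constructor(store: Map<string, unknown>) {
    this.store = store;
  }

  async do<T>(name: string, callback: () => Promise<T>): Promise<T> {
    if (this.store.has(name)) {
      return this.store.get(name) as T; // replay: return the saved result
    }
    const result = await callback();
    this.store.set(name, result); // persist only after the callback succeeds
    return result;
  }
}

// Run the same "workflow" twice against the same store to simulate a restart.
async function demoReplay(): Promise<{ executions: number; first: number; second: number }> {
  const store = new Map<string, unknown>();
  let executions = 0;

  const workflow = (step: ToyStepRunner): Promise<number> =>
    step.do("extract-data", async () => {
      executions += 1; // counts real executions of the callback
      return 42;
    });

  const first = await workflow(new ToyStepRunner(store));
  const second = await workflow(new ToyStepRunner(store)); // simulated restart
  return { executions, first, second };
}
```

Both runs return the same value, but the callback body executes only once. This is also why the step name matters: it is the key under which the result is looked up.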
Step Naming Best Practices
// Good: Descriptive, unique names
await step.do("extract-invention-features", async () => { ... });
await step.do("generate-search-queries", async () => { ... });
await step.do("screen-patents-batch-1", async () => { ... });
// Bad: Generic or duplicate names
await step.do("step1", async () => { ... });
await step.do("process", async () => { ... });
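When work is split into batches, as the `screen-patents-batch-1` name suggests, the step names are best derived from stable inputs such as the batch index, never from random values or timestamps, so that a replayed run produces the same names. A minimal sketch, where `planBatches` is a hypothetical helper and not part of the Workflows API:

```typescript
// Split items into fixed-size batches and pair each batch with a stable step name.
function planBatches<T>(
  items: readonly T[],
  batchSize: number,
  prefix: string,
): Array<{ stepName: string; batch: T[] }> {
  if (!Number.isInteger(batchSize) || batchSize <= 0) {
    throw new RangeError("batchSize must be a positive integer");
  }
  const plan: Array<{ stepName: string; batch: T[] }> = [];
  for (let start = 0; start < items.length; start += batchSize) {
    const index = start / batchSize + 1; // 1-based batch number
    plan.push({
      stepName: `${prefix}-batch-${index}`,
      batch: items.slice(start, start + batchSize),
    });
  }
  return plan;
}

// Possible use inside run(), with `screen` as a hypothetical function:
// for (const { stepName, batch } of planBatches(patents, 25, "screen-patents")) {
//   await step.do(stepName, async () => screen(batch));
// }
```

The same input always yields the same plan, provided the list itself comes from an earlier step's persisted result.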
Passing Data Between Steps
async run(event: WorkflowEvent<Payload>, step: WorkflowStep) {
// Step 1: Returns data
const userData = await step.do("fetch-user", async () => {
return { name: "John", email: "john@example.com" };
});
// Step 2: Uses data from step 1
const enrichedData = await step.do("enrich-user", async () => {
// userData is available here
return {
...userData,
enrichedAt: new Date().toISOString(),
};
});
// Step 3: Uses data from step 2
await step.do("save-user", async () => {
await saveToDatabase(enrichedData);
});
}
Error Handling
Try-Catch in Workflows
async run(event: WorkflowEvent<Payload>, step: WorkflowStep) {
const { inventionId, userId } = event.payload;
try {
// Steps that might fail
const result = await step.do("risky-operation", async () => {
const response = await fetch("https://api.example.com/data");
if (!response.ok) {
throw new Error(`API error: ${response.status}`);
}
return response.json();
});
return { success: true, result };
} catch (error) {
console.error(`[Workflow ERROR] Failed for ${inventionId}:`, error);
// Update status to failed in database
try {
const db = await getDb(this.env.DATABASE);
await updateStatus(db, {
inventionId,
status: "failed",
errorMessage: error instanceof Error ? error.message : "Unknown error",
});
} catch (updateError) {
console.error("Failed to update status:", updateError);
}
// Re-throw to mark workflow as failed
throw error;
}
}
Step-Level Error Handling
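// Note: catching the error inside the step means the engine sees a successful step,
// so the error result is persisted and the step is not retried automatically.
const result = await step.do("api-call-soft-fail", async () => {
  try {
    return { success: true as const, data: await callExternalAPI() };
  } catch (error) {
    // Log but don't throw - return error state instead
    console.error("API call failed:", error);
    return {
      success: false as const,
      error: error instanceof Error ? error.message : "Unknown",
    };
  }
});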
// Check result and handle accordingly
if (!result.success) {
// Handle failure gracefully
await step.do("handle-failure", async () => {
await notifyAdmin(result.error);
});
}
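The opposite approach is to let the step throw and have the engine retry it. `step.do` accepts an optional configuration object between the name and the callback, with a `retries` policy and a `timeout`. The sketch below wraps that in a helper; `StepConfigLike` and `RetryingStepLike` are hypothetical stand-ins for the real step types so the code runs outside the Workers runtime, and the policy values are illustrative rather than recommended defaults.

```typescript
// Hypothetical stand-ins for the step types, so the sketch runs anywhere.
interface StepConfigLike {
  retries?: {
    limit: number;
    delay: string | number;
    backoff?: "constant" | "linear" | "exponential";
  };
  timeout?: string | number;
}

interface RetryingStepLike {
  do<T>(name: string, config: StepConfigLike, callback: () => Promise<T>): Promise<T>;
}

// Illustrative policy values; tune them for the API being called.
const EXTERNAL_API_POLICY: StepConfigLike = {
  retries: { limit: 3, delay: "10 seconds", backoff: "exponential" },
  timeout: "2 minutes",
};

// Let the callback throw: a thrown error is what signals a retryable failure.
async function callWithRetries<T>(
  step: RetryingStepLike,
  name: string,
  operation: () => Promise<T>,
): Promise<T> {
  return step.do(name, EXTERNAL_API_POLICY, operation);
}
```

Only after the retry budget is used up does the error reach a surrounding try/catch like the one shown earlier, so the two patterns combine: retries inside the step, status bookkeeping around the workflow.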
Accessing Environment Variables
The this.env Pattern
import Anthropic from "@anthropic-ai/sdk";
import Stripe from "stripe";
export class MyWorkflow extends WorkflowEntrypoint<Env, Payload> {
  // Create clients using environment variables
  private getAnthropicClient() {
    return new Anthropic({
      apiKey: this.env.ANTHROPIC_API_KEY,
    });
  }
  private getStripeClient() {
    return new Stripe(this.env.STRIPE_SECRET_KEY);
  }
  async run(event: WorkflowEvent<Payload>, step: WorkflowStep) {
    // Access env directly
    const webhookUrl = this.env.SLACK_WEBHOOK_URL;
    await step.do("call-ai", async () => {
      // Or use helper methods, creating the client inside the step that uses it
      const anthropic = this.getAnthropicClient();
      return await anthropic.messages.create({ ... });
    });
  }
}
Important: Recreate Clients in Steps
Workflows can hibernate between steps and later re-enter run(), so objects held in memory are not guaranteed to survive from one step to the next. Creating clients inside the step that uses them keeps each step self-contained:
// Good: Create client inside the step
await step.do("call-api", async () => {
  const client = this.getAnthropicClient(); // Fresh client
  return await client.messages.create({ ... });
});
// Avoid: the step depends on an object created outside it
const client = this.getAnthropicClient();
await step.do("call-api", async () => {
  return await client.messages.create({ ... });
});
Database Operations in Workflows
Getting Database Connection
import { getDb } from "@/db";
export class MyWorkflow extends WorkflowEntrypoint<Env, Payload> {
async run(event: WorkflowEvent<Payload>, step: WorkflowStep) {
// Step with database operation
await step.do("update-status", async () => {
// Create fresh DB connection in each step
const db = await getDb(this.env.DATABASE);
await updateRecord(db, {
id: event.payload.recordId,
status: "processing",
});
});
// Another step with database
const data = await step.do("fetch-data", async () => {
const db = await getDb(this.env.DATABASE);
return await getRecordById(db, event.payload.recordId);
});
}
}
Using Repository Functions
import { updateAnalysis, initializeAnalysis } from "@/repositories/analysis";
import { updateInvention } from "@/repositories/invention";
await step.do("initialize-analysis", async () => {
const db = await getDb(this.env.DATABASE);
await initializeAnalysis(db, {
inventionId: event.payload.inventionId,
extractedData: result,
});
await updateInvention(db, {
inventionId: event.payload.inventionId,
userId: event.payload.userId,
data: {
status: "analyzing",
},
});
});
AI Integration Example
Structured Output with Zod
import Anthropic from "@anthropic-ai/sdk";
import { z } from "zod";
import { zodToJsonSchema } from "zod-to-json-schema";
// Define your schema
const extractionSchema = z.object({
title: z.string(),
description: z.string(),
features: z.array(z.string()),
});
// Helper function for AI calls with schema validation
async function callAnthropicWithSchema<T extends z.ZodTypeAny>(
client: Anthropic,
{
model,
systemPrompt,
userPrompt,
schema,
schemaName,
}: {
model: string;
systemPrompt: string;
userPrompt: string;
schema: T;
schemaName: string;
}
): Promise<z.infer<T>> {
// Convert Zod to JSON Schema for the prompt
const jsonSchema = zodToJsonSchema(schema, schemaName);
const enhancedSystemPrompt = `${systemPrompt}
You MUST respond with ONLY valid JSON matching this schema:
${JSON.stringify(jsonSchema, null, 2)}
Return ONLY the JSON object, no markdown, no explanations.`;
const response = await client.messages.create({
model,
max_tokens: 4096,
system: enhancedSystemPrompt,
messages: [{ role: "user", content: userPrompt }],
});
// Extract text content
const textContent = response.content
.filter((block) => block.type === "text")
.map((block) => block.text)
.join("");
// Parse JSON (handle markdown code blocks)
let cleaned = textContent.trim();
const codeBlockMatch = cleaned.match(/^```(?:json)?\s*([\s\S]*?)\s*```$/);
if (codeBlockMatch) {
cleaned = codeBlockMatch[1].trim();
}
const parsed = JSON.parse(cleaned);
// Validate with Zod
return schema.parse(parsed);
}
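The fence-stripping and parsing part of the helper above is the piece most likely to meet odd model output, so it can help to isolate it as a pure function that is testable without calling the API. A minimal sketch; `parseJsonFromModelText` is a hypothetical name, and the pattern is the same one used above, only assembled from a constant.

```typescript
// Three backticks, built at runtime so this snippet can itself sit inside a fenced block.
const FENCE = "`".repeat(3);
const FENCED_JSON = new RegExp("^" + FENCE + "(?:json)?\\s*([\\s\\S]*?)\\s*" + FENCE + "$");

// Parse model output that is either bare JSON or JSON wrapped in a Markdown code fence.
function parseJsonFromModelText(text: string): unknown {
  const trimmed = text.trim();
  const match = FENCED_JSON.exec(trimmed);
  const candidate = match ? (match[1] ?? "").trim() : trimmed;
  try {
    return JSON.parse(candidate);
  } catch {
    // Include a short prefix of the raw text to make step logs easier to read.
    throw new Error(`Model output is not valid JSON: ${candidate.slice(0, 80)}`);
  }
}
```

The return type is `unknown` on purpose: the value still has to pass `schema.parse(...)` before the rest of the workflow can trust its shape.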
Using in Workflow Steps
export class AnalysisWorkflow extends WorkflowEntrypoint<Env, Payload> {
private getAnthropicClient() {
return new Anthropic({
apiKey: this.env.ANTHROPIC_API_KEY,
});
}
async run(event: WorkflowEvent<Payload>, step: WorkflowStep) {
// Step 1: Extract structured data
const extraction = await step.do("extract-features", async () => {
const client = this.getAnthropicClient();
return await callAnthropicWithSchema(client, {
model: "claude-sonnet-4-5",
systemPrompt: "Extract invention features from the conversation.",
userPrompt: JSON.stringify(event.payload.conversation), // conversation is an array of messages, so serialize it
schema: extractionSchema,
schemaName: "InventionExtraction",
});
});
console.log(`Extracted ${extraction.features.length} features`);
// Step 2: Generate queries based on extraction
const queries = await step.do("generate-queries", async () => {
const client = this.getAnthropicClient();
return await callAnthropicWithSchema(client, {
model: "claude-sonnet-4-5",
systemPrompt: "Generate search queries for patent research.",
userPrompt: JSON.stringify(extraction.features),
schema: querySchema, // a Zod schema defined like extractionSchema above (not shown)
schemaName: "SearchQueries",
});
});
return { extraction, queries };
}
}
Triggering Workflows
From tRPC Routes
// app/trpc/routes/invention.ts
export const inventionRouter = createTRPCRouter({
startAnalysis: protectedProcedure
.input(z.object({ inventionId: z.string() }))
.mutation(async ({ ctx, input }) => {
// Get invention data
const invention = await getInventionById(ctx.db, input);
// Trigger workflow
const instance = await ctx.workflows.InventionAnalysisWorkflow.create({
params: {
inventionId: input.inventionId,
userId: ctx.auth.user.id,
conversation: invention.conversation,
},
});
// Update status
await updateInvention(ctx.db, {
inventionId: input.inventionId,
data: {
status: "analyzing",
workflowInstanceId: instance.id,
},
});
return {
success: true,
instanceId: instance.id
};
}),
});
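`create()` also accepts an optional `id`. Deriving that ID from stable inputs is one way to guard against starting the same analysis twice, for example after a double click. The sketch below assumes the platform rejects a second instance with an existing ID; `WorkflowBindingLike` is a hypothetical stand-in for the real binding type, and the ID format is only an illustration.

```typescript
// Hypothetical minimal shape of a Workflow binding (a stand-in for the real type).
interface WorkflowBindingLike<P> {
  create(options: { id?: string; params?: P }): Promise<{ id: string }>;
}

// Derive the instance ID from stable inputs so repeated requests map to the same ID.
function analysisInstanceId(inventionId: string, runNumber: number): string {
  return `analysis-${inventionId}-run-${runNumber}`;
}

async function startAnalysis<P>(
  binding: WorkflowBindingLike<P>,
  inventionId: string,
  runNumber: number,
  params: P,
): Promise<{ id: string }> {
  const id = analysisInstanceId(inventionId, runNumber);
  // Assumption: creating an instance with an ID that already exists is rejected,
  // so a duplicate request surfaces as an error here instead of a second run.
  return binding.create({ id, params });
}
```

In the tRPC route above, this would replace the direct `create({ params })` call, with `runNumber` taken from wherever the application counts analysis attempts.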
Workflow Bindings in tRPC Context
// app/trpc/index.ts
export const createTRPCContext = async (opts: {
cfContext: Env;
}) => {
return {
// ... other context
workflows: {
InventionAnalysisWorkflow: opts.cfContext.INVENTION_ANALYSIS_WORKFLOW,
ReportGenerationWorkflow: opts.cfContext.REPORT_GENERATION_WORKFLOW,
},
};
};
Complete Example
Here's a complete multi-step workflow:
// workflows/invention-analysis.ts
import { getDb } from "@/db";
import {
WorkflowEntrypoint,
type WorkflowEvent,
type WorkflowStep,
} from "cloudflare:workers";
import Anthropic from "@anthropic-ai/sdk";
import { updateInvention } from "@/repositories/invention";
import { initializeAnalysis, updateAnalysis } from "@/repositories/analysis";
import { notifyReportGenerated } from "@/repositories/slack";
export interface AnalysisWorkflowPayload {
inventionId: string;
userId: string;
conversation: Array<{ type: string; message: string }>;
}
export class InventionAnalysisWorkflow extends WorkflowEntrypoint<
Env,
AnalysisWorkflowPayload
> {
private getAnthropicClient() {
return new Anthropic({
apiKey: this.env.ANTHROPIC_API_KEY,
});
}
async run(
event: WorkflowEvent<AnalysisWorkflowPayload>,
step: WorkflowStep
) {
const { inventionId, userId, conversation } = event.payload;
console.log(`[Workflow] Starting analysis for ${inventionId}`);
try {
// === Step 1: Extract Features ===
const extraction = await step.do("extract-features", async () => {
console.log("[Step 1] Extracting features...");
const client = this.getAnthropicClient();
const response = await client.messages.create({
model: "claude-sonnet-4-5",
max_tokens: 4096,
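system: "Extract invention features from the conversation. Respond with ONLY a JSON object with the keys title, description, and features (an array of strings).",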
messages: [{
role: "user",
content: JSON.stringify(conversation),
}],
});
const text = response.content
.filter((b) => b.type === "text")
.map((b) => b.text)
.join("");
const result = JSON.parse(text);
// Save to database
const db = await getDb(this.env.DATABASE);
await initializeAnalysis(db, {
inventionId,
extractedData: result,
});
console.log(`[Step 1] Extracted ${result.features.length} features`);
return result;
});
// === Step 2: Search Prior Art ===
const searchResults = await step.do("search-prior-art", async () => {
console.log("[Step 2] Searching prior art...");
// Search for each feature in parallel
const results = await Promise.all(
extraction.features.map((feature) =>
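fetch(`${this.env.PATENT_API_URL}/search?q=${encodeURIComponent(feature)}`)
.then((r) => {
// Fail the step (and let it retry) instead of parsing an error body
if (!r.ok) throw new Error(`Patent search failed: ${r.status}`);
return r.json();
})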
)
);
// Deduplicate results
const uniquePatents = [...new Map(
results.flat().map((p) => [p.id, p])
).values()];
console.log(`[Step 2] Found ${uniquePatents.length} patents`);
return uniquePatents;
});
// === Step 3: Analyze Patents ===
const analysis = await step.do("analyze-patents", async () => {
console.log("[Step 3] Analyzing patents...");
const client = this.getAnthropicClient();
const analyses = [];
for (const patent of searchResults.slice(0, 10)) {
const response = await client.messages.create({
model: "claude-haiku-4-5",
max_tokens: 1024,
system: "Analyze patent relevance to the invention.",
messages: [{
role: "user",
content: `Invention: ${extraction.description}\nPatent: ${patent.abstract}`,
}],
});
analyses.push({
patentId: patent.id,
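// Collect only text blocks; content[0] is not guaranteed to be a text block
analysis: response.content.flatMap((b) => (b.type === "text" ? [b.text] : [])).join(""),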
});
}
console.log(`[Step 3] Analyzed ${analyses.length} patents`);
return analyses;
});
// === Step 4: Generate Report ===
const report = await step.do("generate-report", async () => {
console.log("[Step 4] Generating report...");
const client = this.getAnthropicClient();
const response = await client.messages.create({
model: "claude-sonnet-4-5",
max_tokens: 8192,
system: "Generate a patent analysis report.",
messages: [{
role: "user",
content: JSON.stringify({ extraction, analysis }),
}],
});
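// Collect only text blocks; content[0] is not guaranteed to be a text block
const reportText = response.content.flatMap((b) => (b.type === "text" ? [b.text] : [])).join("");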
// Save to database
const db = await getDb(this.env.DATABASE);
await updateAnalysis(db, {
inventionId,
status: "completed",
finalReport: reportText,
});
await updateInvention(db, {
inventionId,
userId,
data: {
status: "completed",
analysis: reportText,
},
});
console.log(`[Step 4] Report generated (${reportText.length} chars)`);
return reportText;
});
// === Step 5: Send Notification ===
await step.do("send-notification", async () => {
if (!this.env.SLACK_WEBHOOK_URL) {
console.log("[Step 5] Slack not configured, skipping");
return;
}
await notifyReportGenerated(this.env.SLACK_WEBHOOK_URL, {
inventionId,
inventionTitle: extraction.title,
});
console.log("[Step 5] Notification sent");
});
console.log(`[Workflow] Completed successfully for ${inventionId}`);
return {
success: true,
inventionId,
featuresExtracted: extraction.features.length,
patentsAnalyzed: analysis.length,
};
} catch (error) {
console.error(`[Workflow ERROR] ${inventionId}:`, error);
// Update status to failed
try {
const db = await getDb(this.env.DATABASE);
await updateAnalysis(db, {
inventionId,
status: "failed",
});
await updateInvention(db, {
inventionId,
userId,
data: { status: "failed" },
});
} catch (e) {
console.error("Failed to update status:", e);
}
throw error;
}
}
}
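Step 2 deduplicates the search results with a `Map` keyed by patent ID. The same idea as a small standalone function, which makes the behaviour easier to see: when an ID appears more than once, the later entry wins but keeps the position of the first occurrence. `dedupeById` is a hypothetical helper name and the sample IDs in the usage comment are made up.

```typescript
// Merge several result lists and keep one entry per id.
// Later duplicates overwrite earlier ones; Map preserves first-insertion order.
function dedupeById<T extends { id: string }>(groups: readonly (readonly T[])[]): T[] {
  const byId = new Map<string, T>();
  for (const group of groups) {
    for (const item of group) {
      byId.set(item.id, item);
    }
  }
  return Array.from(byId.values());
}

// Example with made-up data:
// dedupeById([[{ id: "US-1" }, { id: "US-2" }], [{ id: "US-1" }]]) keeps two entries.
```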
Monitoring and Debugging
Logging Best Practices
// Prefix log lines with a step identifier so they are easy to filter
console.log(`[Step 1] Starting extraction for ${inventionId}`);
console.log(`[Step 1] Extracted ${features.length} features`);
console.error(`[Step 1 ERROR] Extraction failed:`, error);
// Log workflow lifecycle
console.log(`[Workflow] Starting for ${inventionId} (user: ${userId})`);
console.log(`[Workflow] Completed successfully`);
console.error(`[Workflow ERROR] Failed:`, error);
Checking Workflow Status
// Get workflow instance status
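const instance = await ctx.workflows.MyWorkflow.get(instanceId);
// status() is an async method that returns an object describing the instance
const { status } = await instance.status();
console.log("Status:", status); // e.g. "queued", "running", "complete", "errored"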
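For scripts and tests it can be handy to wait until an instance reaches a final state. The sketch below polls an instance, assuming it exposes an async `status()` method that resolves to an object with a `status` string; `WorkflowInstanceLike` is a hypothetical stand-in for the real instance type, and the list of terminal statuses is an assumption to check against the current documentation.

```typescript
// Hypothetical minimal shapes (stand-ins for the real instance types).
interface InstanceStatusLike {
  status: string;
  error?: unknown;
  output?: unknown;
}

interface WorkflowInstanceLike {
  status(): Promise<InstanceStatusLike>;
}

// Assumed set of final states.
const TERMINAL_STATUSES: ReadonlySet<string> = new Set(["complete", "errored", "terminated"]);

async function waitForTerminalStatus(
  instance: WorkflowInstanceLike,
  options: { intervalMs?: number; maxPolls?: number } = {},
): Promise<InstanceStatusLike> {
  const intervalMs = options.intervalMs ?? 2000;
  const maxPolls = options.maxPolls ?? 30;
  for (let poll = 1; poll <= maxPolls; poll++) {
    const current = await instance.status();
    if (TERMINAL_STATUSES.has(current.status)) return current;
    if (poll < maxPolls) {
      await new Promise<void>((resolve) => setTimeout(resolve, intervalMs));
    }
  }
  throw new Error(`Workflow still not finished after ${maxPolls} polls`);
}
```

Inside a request handler it is usually better to return the instance ID right away and let the client ask for the status later, rather than holding the request open while polling.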
Best Practices
1. Step Granularity
- Too Fine: Many small steps = overhead
- Too Coarse: One big step = no recovery benefit
- Just Right: Logical units of work
// Good: Logical groupings
await step.do("extract-and-validate", async () => { ... });
await step.do("search-all-databases", async () => { ... });
await step.do("generate-report", async () => { ... });
// Bad: Too granular
await step.do("parse-json", async () => { ... });
await step.do("validate-field-1", async () => { ... });
await step.do("validate-field-2", async () => { ... });
2. Recreate Clients in Steps
// Always create fresh clients inside steps
await step.do("call-api", async () => {
const client = this.getApiClient(); // Fresh client
return await client.call();
});
3. Handle Partial Failures
const results = await step.do("process-batch", async () => {
const successes = [];
const failures = [];
for (const item of items) {
try {
successes.push(await process(item));
} catch (e) {
failures.push({ item, error: e instanceof Error ? e.message : String(e) });
}
}
return { successes, failures };
});
// Continue with partial results
if (results.failures.length > 0) {
console.warn(`${results.failures.length} items failed`);
}
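The loop above processes items one after another. When the items are independent, the same success/failure split can be built on `Promise.allSettled`, which runs them concurrently and never rejects as a whole. A minimal sketch with hypothetical names (`processBatch`, `BatchOutcome`); note that it swaps the sequential loop for concurrent execution, which may not suit rate-limited APIs.

```typescript
type BatchOutcome<T, R> = {
  successes: R[];
  failures: Array<{ item: T; error: string }>;
};

// Run worker() for every item concurrently and split the outcomes.
async function processBatch<T, R>(
  items: readonly T[],
  worker: (item: T) => Promise<R>,
): Promise<BatchOutcome<T, R>> {
  // The async wrapper also turns a synchronous throw into a rejection.
  const settled = await Promise.allSettled(items.map(async (item) => worker(item)));
  const outcome: BatchOutcome<T, R> = { successes: [], failures: [] };
  settled.forEach((entry, index) => {
    if (entry.status === "fulfilled") {
      outcome.successes.push(entry.value);
    } else {
      const reason: unknown = entry.reason;
      outcome.failures.push({
        item: items[index] as T,
        error: reason instanceof Error ? reason.message : String(reason),
      });
    }
  });
  return outcome;
}
```

Errors are stored as plain strings so the whole outcome remains a simple value that a step can return and persist.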
4. Use Meaningful Return Values
return {
success: true,
summary: {
inventionId,
featuresExtracted: extraction.features.length,
patentsFound: searchResults.length,
patentsAnalyzed: analysis.length,
reportLength: report.length,
},
timing: {
startedAt: event.timestamp.toISOString(), // event.timestamp is a Date; use the same format as completedAt
completedAt: new Date().toISOString(),
},
};
5. Idempotent Operations
Design steps to be safely re-runnable:
// Good: Use upsert or check-then-create
await step.do("save-result", async () => {
const db = await getDb(this.env.DATABASE);
await db.insert(results)
.values({ id: inventionId, data: result })
.onConflictDoUpdate({
target: results.id,
set: { data: result, updatedAt: new Date() },
});
});
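Why the upsert matters becomes visible when a step runs more than once, for example because it failed after the write and was retried. The toy comparison below uses in-memory structures in place of a database; `appendRow`, `upsertRow`, and `simulateRetriedStep` are hypothetical names used only for this illustration.

```typescript
type ResultRow = { id: string; data: string };

// Not idempotent: every retry appends another row.
function appendRow(table: ResultRow[], row: ResultRow): void {
  table.push(row);
}

// Idempotent: keyed by id, so a retry overwrites instead of duplicating.
function upsertRow(table: Map<string, ResultRow>, row: ResultRow): void {
  table.set(row.id, row);
}

// Simulate the same step body running `attempts` times.
function simulateRetriedStep(attempts: number): { appendedRows: number; upsertedRows: number } {
  const appended: ResultRow[] = [];
  const upserted = new Map<string, ResultRow>();
  const row: ResultRow = { id: "inv_123", data: "report" };
  for (let attempt = 0; attempt < attempts; attempt++) {
    appendRow(appended, row);
    upsertRow(upserted, row);
  }
  return { appendedRows: appended.length, upsertedRows: upserted.size };
}
```

With three attempts the append-only table ends up with three rows while the keyed table still holds one, which is the property the `onConflictDoUpdate` call above provides against a real database.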
