
Tutorial: Cloudflare Workflows for Long-Running Tasks

This tutorial walks through implementing durable, long-running background tasks using Cloudflare Workflows. Learn how to orchestrate multi-step processes that can survive restarts, handle failures gracefully, and integrate with AI services.


Table of Contents

  1. Overview
  2. Why Cloudflare Workflows?
  3. Project Setup
  4. Basic Workflow Structure
  5. Steps and State Management
  6. Error Handling
  7. Accessing Environment Variables
  8. Database Operations in Workflows
  9. AI Integration Example
  10. Triggering Workflows
  11. Complete Example
  12. Monitoring and Debugging
  13. Best Practices

Overview

Cloudflare Workflows provide:

  • Durable Execution: Steps are persisted and can resume after failures
  • Hibernation: Workflows pause between steps, saving compute costs
  • Automatic Retries: Failed steps can be retried automatically
  • Long Running: No 30-second timeout like regular Workers

Use Cases

  • AI document processing pipelines
  • Multi-step data analysis
  • Background report generation
  • Complex async workflows
  • Scheduled batch processing

Why Cloudflare Workflows?

Feature            | Regular Workers | Workflows
Max Duration       | 30 seconds      | Unlimited
State Persistence  | None            | Automatic
Failure Recovery   | Manual          | Automatic
Step Orchestration | Manual          | Built-in
Cost               | Pay per request | Pay per step

Project Setup

1. Configure wrangler.jsonc

// wrangler.jsonc
{
  "name": "my-app",
  "main": "workers/app.ts",
  "compatibility_date": "2024-01-01",

  // Define your workflows
  "workflows": [
    {
      "name": "invention-analysis-workflow",
      "binding": "INVENTION_ANALYSIS_WORKFLOW",
      "class_name": "InventionAnalysisWorkflow",
      "script_name": "my-app"
    },
    {
      "name": "report-generation-workflow",
      "binding": "REPORT_GENERATION_WORKFLOW",
      "class_name": "ReportGenerationWorkflow",
      "script_name": "my-app"
    }
  ]
}

2. Export Workflows from Worker

// workers/app.ts

import { createRequestHandler } from "react-router";

// Export your workflow classes
export { InventionAnalysisWorkflow } from "../workflows/invention-analysis";
export { ReportGenerationWorkflow } from "../workflows/report-generation";

// Normal worker handler
export default {
  async fetch(request: Request, env: Env, ctx: ExecutionContext) {
    // Your normal request handling
  },
};

3. TypeScript Types

// workers/env.d.ts

interface Env {
  // Database
  DATABASE: D1Database;

  // Workflow bindings
  INVENTION_ANALYSIS_WORKFLOW: Workflow;
  REPORT_GENERATION_WORKFLOW: Workflow;

  // API keys
  ANTHROPIC_API_KEY: string;
  OPENAI_API_KEY: string;
}
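
Tip: Wrangler can also generate this interface for you with npx wrangler types, which picks up the Workflow bindings declared in wrangler.jsonc and keeps the types in sync with your configuration.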

Basic Workflow Structure

// workflows/my-workflow.ts

import {
  WorkflowEntrypoint,
  type WorkflowEvent,
  type WorkflowStep,
} from "cloudflare:workers";

// Define your payload type
export interface MyWorkflowPayload {
  userId: string;
  documentId: string;
}

// Extend WorkflowEntrypoint with Env and Payload types
export class MyWorkflow extends WorkflowEntrypoint<Env, MyWorkflowPayload> {

  // Main workflow logic
  async run(
    event: WorkflowEvent<MyWorkflowPayload>,
    step: WorkflowStep
  ) {
    // Access payload
    const { userId, documentId } = event.payload;

    console.log(`[Workflow] Starting for user ${userId}`);

    // Step 1: Fetch data
    const data = await step.do("fetch-data", async () => {
      // This code runs in a durable step
      return { fetched: true };
    });

    // Step 2: Process data
    const result = await step.do("process-data", async () => {
      // Use data from previous step
      return { processed: true, input: data };
    });

    // Return final result
    return {
      success: true,
      result,
    };
  }
}

Steps and State Management

The step.do() Function

Each step.do() call:

  1. Executes the async function
  2. Persists the result
  3. On replay, returns the persisted result (doesn't re-execute)

// Each step is idempotent - runs once and caches result
const extractionResult = await step.do("extract-data", async () => {
  console.log("[Step 1] Extracting data...");

  // Expensive operation that should only run once
  const result = await callAIService(data);

  console.log("[Step 1] Extraction complete");
  return result;
});

// This step uses the result from the previous step
const analysisResult = await step.do("analyze-data", async () => {
  console.log("[Step 2] Analyzing extracted data...");

  // extractionResult is available here
  const analysis = await processData(extractionResult);

  return analysis;
});
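
Workflows can also pause themselves explicitly. step.sleep hibernates the instance while the timer runs and resumes at the next line when it fires; a minimal sketch:

// Sketch: pausing between steps; the workflow hibernates during the sleep
const draft = await step.do("create-draft", async () => {
  return { draftId: "draft-123", createdAt: new Date().toISOString() };
});

// Duration strings like "30 minutes" or "1 hour" are accepted
await step.sleep("wait-before-follow-up", "1 hour");

await step.do("send-follow-up", async () => {
  console.log(`Following up on ${draft.draftId}`);
});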

Step Naming Best Practices

// Good: Descriptive, unique names
await step.do("extract-invention-features", async () => { ... });
await step.do("generate-search-queries", async () => { ... });
await step.do("screen-patents-batch-1", async () => { ... });

// Bad: Generic or duplicate names
await step.do("step1", async () => { ... });
await step.do("process", async () => { ... });

Passing Data Between Steps

async run(event: WorkflowEvent<Payload>, step: WorkflowStep) {
  // Step 1: Returns data
  const userData = await step.do("fetch-user", async () => {
    return { name: "John", email: "john@example.com" };
  });

  // Step 2: Uses data from step 1
  const enrichedData = await step.do("enrich-user", async () => {
    // userData is available here
    return {
      ...userData,
      enrichedAt: new Date().toISOString(),
    };
  });

  // Step 3: Uses data from step 2
  await step.do("save-user", async () => {
    await saveToDatabase(enrichedData);
  });
}
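
One thing to keep in mind: step results are persisted so they can be replayed later, so return plain, serializable data from step.do rather than live objects such as Response instances, streams, or API clients. A small sketch of the distinction, using a hypothetical endpoint:

// Good: return plain data - it can be persisted and replayed
const user = await step.do("fetch-user", async () => {
  const response = await fetch("https://api.example.com/user/42"); // hypothetical URL
  const body = (await response.json()) as { id: string; email: string };
  return { id: body.id, email: body.email };
});

// Bad: a live Response object (or an open client) cannot be persisted
// as step state and won't survive hibernation/replay:
//   return await step.do("fetch-user", async () => fetch("https://api.example.com/user/42"));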

Error Handling

Try-Catch in Workflows

async run(event: WorkflowEvent<Payload>, step: WorkflowStep) {
  const { inventionId, userId } = event.payload;

  try {
    // Steps that might fail
    const result = await step.do("risky-operation", async () => {
      const response = await fetch("https://api.example.com/data");
      if (!response.ok) {
        throw new Error(`API error: ${response.status}`);
      }
      return response.json();
    });

    return { success: true, result };

  } catch (error) {
    console.error(`[Workflow ERROR] Failed for ${inventionId}:`, error);

    // Update status to failed in database
    try {
      const db = await getDb(this.env.DATABASE);
      await updateStatus(db, {
        inventionId,
        status: "failed",
        errorMessage: error instanceof Error ? error.message : "Unknown error",
      });
    } catch (updateError) {
      console.error("Failed to update status:", updateError);
    }

    // Re-throw to mark workflow as failed
    throw error;
  }
}

Step-Level Error Handling

const result = await step.do("api-call-with-retry", async () => {
  try {
    return await callExternalAPI();
  } catch (error) {
    // Log but don't throw - return error state instead
    console.error("API call failed:", error);
    return {
      success: false,
      error: error instanceof Error ? error.message : "Unknown",
    };
  }
});

// Check result and handle accordingly
if (!result.success) {
  // Handle failure gracefully
  await step.do("handle-failure", async () => {
    await notifyAdmin(result.error);
  });
}
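
Workflows can also retry failing steps for you: step.do accepts an optional config object (before the callback) with retry and timeout settings. A sketch of that shape, following the Workflows step configuration:

// Retries and timeout configured per step; failed attempts are retried
// automatically with the given backoff before the step is marked failed
const data = await step.do(
  "fetch-upstream-data",
  {
    retries: {
      limit: 5,               // retry up to 5 times
      delay: "10 seconds",    // wait between attempts
      backoff: "exponential", // "constant" | "linear" | "exponential"
    },
    timeout: "2 minutes",     // fail the attempt if it runs longer than this
  },
  async () => {
    const response = await fetch("https://api.example.com/data");
    if (!response.ok) {
      throw new Error(`API error: ${response.status}`);
    }
    return response.json();
  }
);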

Accessing Environment Variables

The this.env Pattern

export class MyWorkflow extends WorkflowEntrypoint<Env, Payload> {

  // Create clients using environment variables
  private getAnthropicClient() {
    return new Anthropic({
      apiKey: this.env.ANTHROPIC_API_KEY,
    });
  }

  private getStripeClient() {
    return new Stripe(this.env.STRIPE_SECRET_KEY);
  }

  async run(event: WorkflowEvent<Payload>, step: WorkflowStep) {
    // Access env directly
    const webhookUrl = this.env.SLACK_WEBHOOK_URL;

    await step.do("call-ai", async () => {
      // Or use helper methods - create the client inside the step
      // (see "Recreate Clients in Steps" below)
      const anthropic = this.getAnthropicClient();
      return await anthropic.messages.create({ ... });
    });
  }
}

Important: Recreate Clients in Steps

Due to workflow hibernation, create clients fresh in each step:

// Good: Create client inside the step
await step.do("call-api", async () => {
  const client = this.getAnthropicClient(); // Fresh client
  return await client.messages.create({ ... });
});

// Bad: Client created outside step may be stale after hibernation
const client = this.getAnthropicClient();
await step.do("call-api", async () => {
  return await client.messages.create({ ... }); // May fail!
});

Database Operations in Workflows

Getting Database Connection

import { getDb } from "@/db";

export class MyWorkflow extends WorkflowEntrypoint<Env, Payload> {

  async run(event: WorkflowEvent<Payload>, step: WorkflowStep) {

    // Step with database operation
    await step.do("update-status", async () => {
      // Create fresh DB connection in each step
      const db = await getDb(this.env.DATABASE);

      await updateRecord(db, {
        id: event.payload.recordId,
        status: "processing",
      });
    });

    // Another step with database
    const data = await step.do("fetch-data", async () => {
      const db = await getDb(this.env.DATABASE);
      return await getRecordById(db, event.payload.recordId);
    });
  }
}
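
The getDb helper used above is project-specific. If the project uses Drizzle with D1, a minimal sketch of such a helper (the name and async signature are assumptions chosen to match the calls in this tutorial) could be:

// db/index.ts (hypothetical) - wraps the D1 binding in a Drizzle client
import { drizzle } from "drizzle-orm/d1";

export async function getDb(d1: D1Database) {
  // drizzle() itself is synchronous; the async signature just matches
  // the `await getDb(...)` calls used throughout this tutorial
  return drizzle(d1);
}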

Using Repository Functions

import { updateAnalysis, initializeAnalysis } from "@/repositories/analysis";
import { updateInvention } from "@/repositories/invention";

await step.do("initialize-analysis", async () => {
  const db = await getDb(this.env.DATABASE);

  await initializeAnalysis(db, {
    inventionId: event.payload.inventionId,
    extractedData: result,
  });

  await updateInvention(db, {
    inventionId: event.payload.inventionId,
    userId: event.payload.userId,
    data: {
      status: "analyzing",
    },
  });
});

AI Integration Example

Structured Output with Zod

import Anthropic from "@anthropic-ai/sdk";
import { z } from "zod";
import { zodToJsonSchema } from "zod-to-json-schema";

// Define your schema
const extractionSchema = z.object({
  title: z.string(),
  description: z.string(),
  features: z.array(z.string()),
});
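
// Hypothetical schema for the query-generation step shown below
// (shape assumed for illustration; adjust to your own needs)
const querySchema = z.object({
  queries: z.array(z.string()),
});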

// Helper function for AI calls with schema validation
async function callAnthropicWithSchema<T extends z.ZodTypeAny>(
  client: Anthropic,
  {
    model,
    systemPrompt,
    userPrompt,
    schema,
    schemaName,
  }: {
    model: string;
    systemPrompt: string;
    userPrompt: string;
    schema: T;
    schemaName: string;
  }
): Promise<z.infer<T>> {
  // Convert Zod to JSON Schema for the prompt
  const jsonSchema = zodToJsonSchema(schema, schemaName);

  const enhancedSystemPrompt = `${systemPrompt}

You MUST respond with ONLY valid JSON matching this schema:
${JSON.stringify(jsonSchema, null, 2)}

Return ONLY the JSON object, no markdown, no explanations.`;

  const response = await client.messages.create({
    model,
    max_tokens: 4096,
    system: enhancedSystemPrompt,
    messages: [{ role: "user", content: userPrompt }],
  });

  // Extract text content
  const textContent = response.content
    .filter((block) => block.type === "text")
    .map((block) => block.text)
    .join("");

  // Parse JSON (handle markdown code blocks)
  let cleaned = textContent.trim();
  const codeBlockMatch = cleaned.match(/^```(?:json)?\s*([\s\S]*?)\s*```$/);
  if (codeBlockMatch) {
    cleaned = codeBlockMatch[1].trim();
  }

  const parsed = JSON.parse(cleaned);

  // Validate with Zod
  return schema.parse(parsed);
}

Using in Workflow Steps

export class AnalysisWorkflow extends WorkflowEntrypoint<Env, Payload> {

  private getAnthropicClient() {
    return new Anthropic({
      apiKey: this.env.ANTHROPIC_API_KEY,
    });
  }

  async run(event: WorkflowEvent<Payload>, step: WorkflowStep) {

    // Step 1: Extract structured data
    const extraction = await step.do("extract-features", async () => {
      const client = this.getAnthropicClient();

      return await callAnthropicWithSchema(client, {
        model: "claude-sonnet-4-5",
        systemPrompt: "Extract invention features from the conversation.",
        userPrompt: event.payload.conversation,
        schema: extractionSchema,
        schemaName: "InventionExtraction",
      });
    });

    console.log(`Extracted ${extraction.features.length} features`);

    // Step 2: Generate queries based on extraction
    const queries = await step.do("generate-queries", async () => {
      const client = this.getAnthropicClient();

      return await callAnthropicWithSchema(client, {
        model: "claude-sonnet-4-5",
        systemPrompt: "Generate search queries for patent research.",
        userPrompt: JSON.stringify(extraction.features),
        schema: querySchema,
        schemaName: "SearchQueries",
      });
    });

    return { extraction, queries };
  }
}

Triggering Workflows

From tRPC Routes

// app/trpc/routes/invention.ts

export const inventionRouter = createTRPCRouter({
  startAnalysis: protectedProcedure
    .input(z.object({ inventionId: z.string() }))
    .mutation(async ({ ctx, input }) => {
      // Get invention data
      const invention = await getInventionById(ctx.db, input);

      // Trigger workflow
      const instance = await ctx.workflows.InventionAnalysisWorkflow.create({
        params: {
          inventionId: input.inventionId,
          userId: ctx.auth.user.id,
          conversation: invention.conversation,
        },
      });

      // Update status
      await updateInvention(ctx.db, {
        inventionId: input.inventionId,
        data: {
          status: "analyzing",
          workflowInstanceId: instance.id,
        },
      });

      return {
        success: true,
        instanceId: instance.id,
      };
    }),
});

Workflow Bindings in tRPC Context

// app/trpc/index.ts

export const createTRPCContext = async (opts: {
  cfContext: Env;
}) => {
  return {
    // ... other context
    workflows: {
      InventionAnalysisWorkflow: opts.cfContext.INVENTION_ANALYSIS_WORKFLOW,
      ReportGenerationWorkflow: opts.cfContext.REPORT_GENERATION_WORKFLOW,
    },
  };
};
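
tRPC is not required: the same Workflow binding can be used from any handler that receives the env. A minimal sketch of triggering the workflow from a plain fetch handler (the route path and request body shape are illustrative):

// workers/app.ts (sketch) - trigger a workflow from a plain fetch handler
export default {
  async fetch(request: Request, env: Env) {
    const url = new URL(request.url);

    if (request.method === "POST" && url.pathname === "/api/analyze") {
      // Illustrative body shape - validate real input before using it
      const body = (await request.json()) as { inventionId: string; userId: string };

      // Create a new workflow instance with the request payload as params
      const instance = await env.INVENTION_ANALYSIS_WORKFLOW.create({
        params: {
          inventionId: body.inventionId,
          userId: body.userId,
          conversation: [],
        },
      });

      return Response.json({ success: true, instanceId: instance.id });
    }

    return new Response("Not found", { status: 404 });
  },
};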

Complete Example

Here's a complete multi-step workflow:

// workflows/invention-analysis.ts

import { getDb } from "@/db";
import {
  WorkflowEntrypoint,
  type WorkflowEvent,
  type WorkflowStep,
} from "cloudflare:workers";
import Anthropic from "@anthropic-ai/sdk";

import { updateInvention } from "@/repositories/invention";
import { initializeAnalysis, updateAnalysis } from "@/repositories/analysis";
import { notifyReportGenerated } from "@/repositories/slack";

export interface AnalysisWorkflowPayload {
  inventionId: string;
  userId: string;
  conversation: Array<{ type: string; message: string }>;
}

export class InventionAnalysisWorkflow extends WorkflowEntrypoint<
  Env,
  AnalysisWorkflowPayload
> {
  private getAnthropicClient() {
    return new Anthropic({
      apiKey: this.env.ANTHROPIC_API_KEY,
    });
  }

  async run(
    event: WorkflowEvent<AnalysisWorkflowPayload>,
    step: WorkflowStep
  ) {
    const { inventionId, userId, conversation } = event.payload;

    console.log(`[Workflow] Starting analysis for ${inventionId}`);

    try {
      // === Step 1: Extract Features ===
      const extraction = await step.do("extract-features", async () => {
        console.log("[Step 1] Extracting features...");

        const client = this.getAnthropicClient();
        const response = await client.messages.create({
          model: "claude-sonnet-4-5",
          max_tokens: 4096,
          system: "Extract invention features from the conversation.",
          messages: [{
            role: "user",
            content: JSON.stringify(conversation),
          }],
        });

        const text = response.content
          .filter((b) => b.type === "text")
          .map((b) => b.text)
          .join("");

        const result = JSON.parse(text);

        // Save to database
        const db = await getDb(this.env.DATABASE);
        await initializeAnalysis(db, {
          inventionId,
          extractedData: result,
        });

        console.log(`[Step 1] Extracted ${result.features.length} features`);
        return result;
      });

      // === Step 2: Search Prior Art ===
      const searchResults = await step.do("search-prior-art", async () => {
        console.log("[Step 2] Searching prior art...");

        // Search for each feature in parallel
        const results = await Promise.all(
          extraction.features.map((feature) =>
            fetch(`${this.env.PATENT_API_URL}/search?q=${encodeURIComponent(feature)}`)
              .then((r) => r.json())
          )
        );

        // Deduplicate results
        const uniquePatents = [...new Map(
          results.flat().map((p) => [p.id, p])
        ).values()];

        console.log(`[Step 2] Found ${uniquePatents.length} patents`);
        return uniquePatents;
      });

      // === Step 3: Analyze Patents ===
      const analysis = await step.do("analyze-patents", async () => {
        console.log("[Step 3] Analyzing patents...");

        const client = this.getAnthropicClient();
        const analyses = [];

        for (const patent of searchResults.slice(0, 10)) {
          const response = await client.messages.create({
            model: "claude-haiku-4-5",
            max_tokens: 1024,
            system: "Analyze patent relevance to the invention.",
            messages: [{
              role: "user",
              content: `Invention: ${extraction.description}\nPatent: ${patent.abstract}`,
            }],
          });

          analyses.push({
            patentId: patent.id,
            analysis: response.content[0].text,
          });
        }

        console.log(`[Step 3] Analyzed ${analyses.length} patents`);
        return analyses;
      });

      // === Step 4: Generate Report ===
      const report = await step.do("generate-report", async () => {
        console.log("[Step 4] Generating report...");

        const client = this.getAnthropicClient();
        const response = await client.messages.create({
          model: "claude-sonnet-4-5",
          max_tokens: 8192,
          system: "Generate a patent analysis report.",
          messages: [{
            role: "user",
            content: JSON.stringify({ extraction, analysis }),
          }],
        });

        const reportText = response.content[0].text;

        // Save to database
        const db = await getDb(this.env.DATABASE);
        await updateAnalysis(db, {
          inventionId,
          status: "completed",
          finalReport: reportText,
        });

        await updateInvention(db, {
          inventionId,
          userId,
          data: {
            status: "completed",
            analysis: reportText,
          },
        });

        console.log(`[Step 4] Report generated (${reportText.length} chars)`);
        return reportText;
      });

      // === Step 5: Send Notification ===
      await step.do("send-notification", async () => {
        if (!this.env.SLACK_WEBHOOK_URL) {
          console.log("[Step 5] Slack not configured, skipping");
          return;
        }

        await notifyReportGenerated(this.env.SLACK_WEBHOOK_URL, {
          inventionId,
          inventionTitle: extraction.title,
        });

        console.log("[Step 5] Notification sent");
      });

      console.log(`[Workflow] Completed successfully for ${inventionId}`);

      return {
        success: true,
        inventionId,
        featuresExtracted: extraction.features.length,
        patentsAnalyzed: analysis.length,
      };

    } catch (error) {
      console.error(`[Workflow ERROR] ${inventionId}:`, error);

      // Update status to failed
      try {
        const db = await getDb(this.env.DATABASE);
        await updateAnalysis(db, {
          inventionId,
          status: "failed",
        });
        await updateInvention(db, {
          inventionId,
          userId,
          data: { status: "failed" },
        });
      } catch (e) {
        console.error("Failed to update status:", e);
      }

      throw error;
    }
  }
}

Monitoring and Debugging

Logging Best Practices

// Use structured logging with step identifiers
console.log(`[Step 1] Starting extraction for ${inventionId}`);
console.log(`[Step 1] Extracted ${features.length} features`);
console.error(`[Step 1 ERROR] Extraction failed:`, error);

// Log workflow lifecycle
console.log(`[Workflow] Starting for ${inventionId} (user: ${userId})`);
console.log(`[Workflow] Completed successfully`);
console.error(`[Workflow ERROR] Failed:`, error);

Checking Workflow Status

// Get workflow instance status
const instance = await ctx.workflows.MyWorkflow.get(instanceId);
const { status } = await instance.status();
console.log("Status:", status); // e.g. "running", "complete", "errored"
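
To surface this in the UI, you can look up the instance ID stored when the workflow was triggered and return the current status from a query. A sketch of a procedure you could add next to startAnalysis, reusing the repositories and bindings assumed earlier in this tutorial:

// app/trpc/routes/invention.ts (sketch) - expose workflow status to the client
getAnalysisStatus: protectedProcedure
  .input(z.object({ inventionId: z.string() }))
  .query(async ({ ctx, input }) => {
    // workflowInstanceId was stored when the workflow was created
    const invention = await getInventionById(ctx.db, input);

    if (!invention.workflowInstanceId) {
      return { status: "not_started" as const };
    }

    const instance = await ctx.workflows.InventionAnalysisWorkflow.get(
      invention.workflowInstanceId
    );
    const { status, error } = await instance.status();

    return { status, error };
  }),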

Best Practices

1. Step Granularity

  • Too Fine: Many small steps = overhead
  • Too Coarse: One big step = no recovery benefit
  • Just Right: Logical units of work

// Good: Logical groupings
await step.do("extract-and-validate", async () => { ... });
await step.do("search-all-databases", async () => { ... });
await step.do("generate-report", async () => { ... });

// Bad: Too granular
await step.do("parse-json", async () => { ... });
await step.do("validate-field-1", async () => { ... });
await step.do("validate-field-2", async () => { ... });

2. Recreate Clients in Steps

// Always create fresh clients inside steps
await step.do("call-api", async () => {
  const client = this.getApiClient(); // Fresh client
  return await client.call();
});

3. Handle Partial Failures

const results = await step.do("process-batch", async () => {
  const successes = [];
  const failures = [];

  for (const item of items) {
    try {
      successes.push(await process(item));
    } catch (e) {
      failures.push({ item, error: e instanceof Error ? e.message : String(e) });
    }
  }

  return { successes, failures };
});

// Continue with partial results
if (results.failures.length > 0) {
  console.warn(`${results.failures.length} items failed`);
}

4. Use Meaningful Return Values

return {
  success: true,
  summary: {
    inventionId,
    featuresExtracted: extraction.features.length,
    patentsFound: searchResults.length,
    patentsAnalyzed: analysis.length,
    reportLength: report.length,
  },
  timing: {
    startedAt: event.timestamp,
    completedAt: new Date().toISOString(),
  },
};

5. Idempotent Operations

Design steps to be safely re-runnable:

// Good: Use upsert or check-then-create
await step.do("save-result", async () => {
  const db = await getDb(this.env.DATABASE);
  await db.insert(results)
    .values({ id: inventionId, data: result })
    .onConflictDoUpdate({
      target: results.id,
      set: { data: result, updatedAt: new Date() },
    });
});


Written by

Sean Stuart Urgel
Senior Software Engineer @ Casper Studios