Chapter 6: Checkpoint & CheckpointStore Trait

In the previous chapter, we saw how the WorkQueue acts like a central bulletin board, allowing different workers to pick up tasks in a distributed workflow. This is great for distributing the work, but what happens if something goes wrong? And how does a worker know the current state of the shared information (WorkflowCtx) when it picks up a task midway through a long workflow?

What's the Problem? Remembering Where We Left Off

Imagine playing a long video game. You wouldn't want to start from the very beginning every time you turn the game off or if the power suddenly goes out! You need a way to save your progress.

Similarly, workflows, especially distributed ones, can be long-running and complex.

  • What if a worker processing a crucial video segment crashes? Do we lose all the work done so far?
  • If Worker A finishes processing step 1 and puts a task for step 2 onto the WorkQueue, how does Worker B (which might be on a different computer) know the latest value of the shared api_key or processed_items_count from the workflow's Context when it picks up the step 2 task?

We need a mechanism to:

  1. Save the current state of a running workflow at specific points.
  2. Load that state back to resume the workflow later, either after a crash or when a different worker takes over.

This mechanism is called checkpointing, and it's absolutely essential for fault tolerance and effective state management in distributed systems like Floxide.

Checkpoint: The Workflow Snapshot

A Checkpoint in Floxide is like taking a snapshot of your workflow's current progress. It captures two critical pieces of information at a particular moment in time for a specific workflow run:

  1. The Shared Context: The current state of your custom data stored inside the WorkflowCtx::store. This ensures that when the workflow resumes, it has the correct shared information (like API keys, counters, configuration loaded mid-way, etc.).
  2. The Pending Work Queue: The list of WorkItems that are still waiting to be processed for this specific workflow run. This ensures the workflow knows which steps are next.

Think of it like saving your game: the save file contains your character's stats, inventory (like the context), and the list of quests you still need to do (like the pending work queue).

Floxide defines a simple struct to hold this snapshot:

// Simplified from crates/floxide-core/src/checkpoint.rs

use std::collections::VecDeque; // A type of queue
use crate::context::Context;
use crate::workflow::WorkItem;

/// A snapshot of a workflow's pending work and its context.
#[derive(Debug, Clone)] // Allows cloning and debug-printing
pub struct Checkpoint<C: Context, WI: WorkItem> {
    /// The user-provided context for the workflow
    pub context: C,
    /// The queue of pending work items for this run
    pub queue: VecDeque<WI>,
}
  • context: C: Holds the actual instance of your shared Context data (e.g., MyWorkflowData from Chapter 3).
  • queue: VecDeque<WI>: Holds the list of WorkItems generated by the workflow! macro that haven't been processed yet for this run.

Distributed Emphasis: When a worker saves a checkpoint, it's essentially publishing the current state and remaining tasks. Another worker (or the same one after a restart) can then load this checkpoint to seamlessly continue the process with the correct data and task list, even if it's running on a different machine.

CheckpointStore Trait: Defining How to Save and Load

Okay, we have the snapshot (Checkpoint), but where do we save it? And how? Do we write it to a file? Store it in a database? Keep it in memory?

This is where the CheckpointStore trait comes in. It defines the contract for how checkpoints are saved and loaded. It's like defining the interface for interacting with a memory card or cloud storage for your game saves.

The trait requires implementors to provide two main functions:

// Simplified from crates/floxide-core/src/checkpoint.rs

use async_trait::async_trait;
// ... other imports: Checkpoint, Context, WorkItem, CheckpointError

/// A trait for persisting and loading workflow checkpoints.
#[async_trait]
pub trait CheckpointStore<C: Context, WI: WorkItem> {
    /// Persist the given checkpoint under `workflow_id`.
    /// Overwrites any previous checkpoint for the same ID.
    async fn save(
        &self,
        workflow_id: &str, // Unique ID for the workflow run
        checkpoint: &Checkpoint<C, WI>, // The snapshot to save
    ) -> Result<(), CheckpointError>; // Returns Ok or an error

    /// Load the last-saved checkpoint for `workflow_id`, if any.
    async fn load(
        &self,
        workflow_id: &str, // Which workflow run to load?
    ) -> Result<Option<Checkpoint<C, WI>>, CheckpointError>;
    // Returns Ok(Some(checkpoint)) if found, Ok(None) if not, or an error
}
  • save: Takes the unique workflow_id and the Checkpoint data and saves it somewhere. The implementation decides how (e.g., serializing to JSON and writing to Redis).
  • load: Takes a workflow_id and attempts to retrieve the previously saved Checkpoint data, deserializing it back into the Checkpoint struct. It returns None if no checkpoint exists for that ID.

By using a trait, Floxide lets you choose the storage backend that fits your needs:

  • InMemoryCheckpointStore: Stores checkpoints in memory (simple, good for tests, but not persistent or distributed).
  • Database Store: Could store checkpoints as rows in a database table.
  • File System Store: Could save checkpoints as files on disk (a sketch of this approach follows below).
  • Cloud Storage Store: Could save checkpoints to services like AWS S3 or Google Cloud Storage.
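To make that pluggability concrete, here is a minimal sketch of what a file-based store might look like. It is not part of Floxide: it assumes Checkpoint<C, WI> implements Serialize and Deserialize (which holds when C and WI do, as explained in the "Serialization is Key" section below), and the CheckpointError::Other variant used for error conversion is a hypothetical placeholder.

// Hypothetical sketch (not taken from the Floxide codebase).

use std::path::PathBuf;
use async_trait::async_trait;
use serde::{de::DeserializeOwned, Serialize};
// ... other imports: Checkpoint, CheckpointStore, CheckpointError, Context, WorkItem

pub struct FileCheckpointStore {
    dir: PathBuf, // directory holding one JSON file per workflow run
}

#[async_trait]
impl<C, WI> CheckpointStore<C, WI> for FileCheckpointStore
where
    // Assumed bounds; the real Context/WorkItem traits may already imply them.
    C: Context + Serialize + DeserializeOwned,
    WI: WorkItem + Serialize + DeserializeOwned,
{
    async fn save(
        &self,
        workflow_id: &str,
        checkpoint: &Checkpoint<C, WI>,
    ) -> Result<(), CheckpointError> {
        let path = self.dir.join(format!("{workflow_id}.json"));
        // Serialize the whole snapshot to JSON bytes...
        let bytes = serde_json::to_vec(checkpoint)
            .map_err(|e| CheckpointError::Other(e.to_string()))?; // hypothetical variant
        // ...and overwrite the previous file for this run.
        tokio::fs::write(&path, bytes)
            .await
            .map_err(|e| CheckpointError::Other(e.to_string()))?; // hypothetical variant
        Ok(())
    }

    async fn load(
        &self,
        workflow_id: &str,
    ) -> Result<Option<Checkpoint<C, WI>>, CheckpointError> {
        let path = self.dir.join(format!("{workflow_id}.json"));
        match tokio::fs::read(&path).await {
            // Found a file: parse it back into a Checkpoint.
            Ok(bytes) => serde_json::from_slice(&bytes)
                .map(Some)
                .map_err(|e| CheckpointError::Other(e.to_string())), // hypothetical variant
            // No file yet: no checkpoint exists for this run.
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
            Err(e) => Err(CheckpointError::Other(e.to_string())), // hypothetical variant
        }
    }
}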

How Checkpointing Enables Distributed and Fault-Tolerant Workflows

Let's see how the Checkpoint and CheckpointStore work together, especially in a distributed setup.

  1. Start: A workflow run starts (e.g., run ID "video_abc"). An initial checkpoint might be saved with the starting context and the first task in the queue.
  2. Worker A Takes Task 1: Worker A loads the checkpoint for "video_abc". It gets the context and sees Task 1 is next. It processes Task 1.
  3. Worker A Finishes Task 1: Let's say Task 1 updated the context (e.g., incremented a counter) and produced Task 2 and Task 3.
  4. Worker A Saves Checkpoint: The Floxide engine (running within Worker A) creates a new Checkpoint containing:
    • The updated context.
    • The remaining work queue (now containing Task 2 and Task 3).
    It then calls store.save("video_abc", new_checkpoint), overwriting the old checkpoint.
  5. Crash! (Optional): Worker A crashes immediately after saving. No work is lost because the progress is saved!
  6. Worker B Takes Task 2: Some time later, Worker B (maybe on a different machine) dequeues Task 2 from the main WorkQueue.
  7. Worker B Loads Checkpoint: Before processing Task 2, Worker B calls store.load("video_abc"). It receives the checkpoint saved by Worker A in step 4.
  8. Worker B Resumes: Worker B now has the correct, updated context (with the incremented counter) and knows that Task 3 is also pending (from the checkpoint's queue). It proceeds to execute Task 2.
sequenceDiagram
    participant WorkerA as Worker A
    participant Store as Checkpoint Store
    participant WorkerB as Worker B
    participant Engine as Floxide Engine
    Note over WorkerA, Store: Run "video_abc", Worker A processes Task 1
    WorkerA->>Engine: Finish Task 1 (Context updated, Task 2 & 3 next)
    Engine->>Engine: Create Checkpoint { updated_context, queue:[Task2, Task3] }
    Engine->>Store: save("video_abc", checkpoint)
    Store-->>Engine: Ok
    Note over WorkerA, WorkerB: Worker A might crash here - state is safe!
    Note over WorkerB, Store: Later, Worker B needs to process Task 2 for "video_abc"
    WorkerB->>Store: load("video_abc")
    Store-->>WorkerB: Return Checkpoint { updated_context, queue:[Task2, Task3] }
    WorkerB->>Engine: Restore context, know Task 3 is pending
    Engine->>WorkerB: Execute Task 2 with updated_context

This save/load cycle ensures that state is consistently passed between steps, even across different workers or process restarts.
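The load-process-save cycle each worker performs can be sketched roughly as follows. This is an illustrative outline, not the real engine code: run_one_step and the execute_item closure are stand-ins for the node execution the Floxide engine actually performs.

// Illustrative outline of one worker step; not the real Floxide engine code.

async fn run_one_step<C, WI, S>(
    store: &S,
    run_id: &str,
    item: &WI,
    // Stand-in for node execution: updates the context, returns successor items.
    execute_item: impl FnOnce(&mut C, &WI) -> Vec<WI>,
) -> Result<(), CheckpointError>
where
    C: Context,
    WI: WorkItem,
    S: CheckpointStore<C, WI>,
{
    // 1. Restore the latest snapshot for this run (step 7 in the walkthrough).
    let mut checkpoint = store
        .load(run_id)
        .await?
        .expect("an enqueued run should already have a checkpoint");

    // 2. Run the step against the restored context (step 8).
    let successors = execute_item(&mut checkpoint.context, item);

    // 3. Add the new pending work and persist the updated snapshot (step 4).
    //    (A real implementation would also drop `item` from the pending queue.)
    checkpoint.queue.extend(successors);
    store.save(run_id, &checkpoint).await?;
    Ok(())
}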

Example: InMemoryCheckpointStore

Floxide provides a basic InMemoryCheckpointStore for testing and simple cases. It uses a standard Rust HashMap protected by a RwLock (allowing multiple readers or one writer) to store checkpoints in memory.

// Simplified from crates/floxide-core/src/checkpoint.rs

use std::collections::HashMap;
use std::sync::{Arc, RwLock}; // For thread-safe sharing
// ... other imports

#[derive(Clone)]
pub struct InMemoryCheckpointStore<C: Context, WI: WorkItem> {
    // Arc allows sharing ownership across threads/tasks
    // RwLock allows multiple readers or one writer at a time
    inner: Arc<RwLock<HashMap<String, Checkpoint<C, WI>>>>,
}

// --- Implementation of the CheckpointStore trait ---

#[async_trait]
impl<C: Context, WI: WorkItem> CheckpointStore<C, WI> for InMemoryCheckpointStore<C, WI> {
    async fn save(
        &self,
        workflow_id: &str,
        checkpoint: &Checkpoint<C, WI>,
    ) -> Result<(), CheckpointError> {
        // Get write access to the map (blocks other writers/readers)
        let mut map = self.inner.write().unwrap(); // .unwrap() simplifies error handling here
        // Insert a clone of the checkpoint into the map
        map.insert(workflow_id.to_string(), checkpoint.clone());
        Ok(())
        // Lock is released automatically when 'map' goes out of scope
    }

    async fn load(&self, workflow_id: &str) -> Result<Option<Checkpoint<C, WI>>, CheckpointError> {
        // Get read access to the map (blocks writers, allows other readers)
        let map = self.inner.read().unwrap();
        // Get the checkpoint, cloning it out of the map if found
        let maybe_checkpoint = map.get(workflow_id).cloned();
        Ok(maybe_checkpoint)
        // Lock is released automatically when 'map' goes out of scope
    }
}
  • Arc<RwLock<HashMap<...>>>: A standard Rust pattern for sharing mutable data safely across threads and tasks. It is fine here because the lock is only held for the brief map operations and never across an .await.
  • inner.write().unwrap(): Acquires a write lock on the map.
  • inner.read().unwrap(): Acquires a read lock.
  • map.insert(...) / map.get(...): Standard HashMap operations.
  • .clone(): We clone the Checkpoint when saving and loading to avoid ownership issues with the map. This requires C and WI to be Clone.

This implementation is simple but effective for single-process scenarios. For real distributed systems, you'd use an implementation backed by Redis, a database, or another shared storage mechanism.
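As a quick usage sketch, saving and re-loading a snapshot with the in-memory store looks roughly like this. MyWorkflowData is the example context from Chapter 3; MyWorkItem stands in for the enum the workflow! macro generates, and its Start variant, the Default impl on the context, and the new() constructor are illustrative assumptions.

// Illustrative usage: type, variant, and constructor names are placeholders.

use std::collections::VecDeque;

let store: InMemoryCheckpointStore<MyWorkflowData, MyWorkItem> =
    InMemoryCheckpointStore::new(); // assuming a `new()` (or Default) constructor

// Snapshot the starting state: a fresh context plus the first pending task.
let checkpoint = Checkpoint {
    context: MyWorkflowData::default(),
    queue: VecDeque::from([MyWorkItem::Start("video_abc.mp4".to_string())]),
};

store.save("video_abc", &checkpoint).await?;

// Later (possibly from another task), restore the snapshot by run ID.
if let Some(restored) = store.load("video_abc").await? {
    println!("{} item(s) still pending", restored.queue.len());
}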

Under the Hood: Serialization is Key

How does the CheckpointStore actually save your custom Context data (C) and the WorkItem (WI)? It relies on serialization.

  • Serialization: Converting the in-memory Rust structs (Checkpoint, your Context, your WorkItem) into a format that can be stored or transmitted (like JSON, MessagePack, Protobuf, etc.).
  • Deserialization: Converting the stored format back into the Rust structs.

Floxide leverages the powerful serde library for this. Remember in Chapter 3, we required your Context struct to derive Serialize and Deserialize? And the workflow! macro automatically derives them for the WorkItem enum it generates.

The CheckpointStore implementation (like InMemoryCheckpointStore, or a hypothetical RedisCheckpointStore) is responsible for:

  1. During save: Taking the Checkpoint<C, WI> struct and using serde (e.g., serde_json::to_string or serde_json::to_vec) to turn it into a string or byte array, then storing those bytes in its backend (e.g., a Redis key, a file on disk, or a database BLOB).
  2. During load: Retrieving the stored bytes from the backend, then using serde (e.g., serde_json::from_slice or serde_json::from_str) to parse them back into a Checkpoint<C, WI> struct.

(The InMemoryCheckpointStore shown above is a special case: it keeps live Rust values in its HashMap and simply clones them, so no serialization is involved.)

This serialization is what allows complex Rust data structures representing your workflow's state to be saved, shared across machines, and loaded back reliably.
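In code, that round trip is just a pair of serde calls. Below is a minimal sketch, reusing the placeholder MyWorkflowData and MyWorkItem names from the usage example above and assuming Checkpoint<C, WI> derives Serialize and Deserialize (which holds when C and WI do):

// Sketch of the serialize/deserialize round trip a store performs.

// save: Checkpoint -> bytes, ready to write to Redis, a file, a database column...
let bytes: Vec<u8> = serde_json::to_vec(&checkpoint)
    .expect("context and work items are Serialize, so this should succeed");

// load: bytes -> Checkpoint, restoring exactly the state that was saved.
let restored: Checkpoint<MyWorkflowData, MyWorkItem> =
    serde_json::from_slice(&bytes).expect("stored data should parse back");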

Conclusion

Checkpoints are the "save game" mechanism for Floxide workflows.

  • A Checkpoint is a snapshot containing the current shared Context and the list of pending WorkItems for a specific run.
  • The CheckpointStore trait defines the standard interface (save, load) for persisting and retrieving these snapshots.
  • Implementations of CheckpointStore (like InMemoryCheckpointStore or others using databases/files) handle the actual storage and rely on serde for serialization.
  • Checkpointing is crucial for fault tolerance (resuming after crashes) and enabling distributed execution by allowing state to be shared and consistently restored across different workers and machines.

Now that we have all the core pieces – Nodes define steps, Context shares data, Workflow defines structure, WorkQueue distributes tasks, and Checkpoints save progress – let's see how a dedicated DistributedWorker uses all these components together to actually execute steps in a distributed environment.

Next: Chapter 7: DistributedWorker