Chapter 6: Checkpoint & CheckpointStore Trait¶
In the previous chapter, we saw how the `WorkQueue` acts like a central bulletin board, allowing different workers to pick up tasks in a distributed workflow. This is great for distributing the work, but what happens if something goes wrong? And how does a worker know the current state of the shared information (`WorkflowCtx`) when it picks up a task midway through a long workflow?
What's the Problem? Remembering Where We Left Off¶
Imagine playing a long video game. You wouldn't want to start from the very beginning every time you turn the game off or if the power suddenly goes out! You need a way to save your progress.
Similarly, workflows, especially distributed ones, can be long-running and complex.
* What if a worker processing a crucial video segment crashes? Do we lose all the work done so far?
* If Worker A finishes processing step 1 and puts a task for step 2 onto the `WorkQueue`, how does Worker B (which might be on a different computer) know the latest value of the shared `api_key` or `processed_items_count` from the workflow's `Context` when it picks up the step 2 task?
We need a mechanism to:
1. Save the current state of a running workflow at specific points.
2. Load that state back to resume the workflow later, either after a crash or when a different worker takes over.
This mechanism is called checkpointing, and it's absolutely essential for fault tolerance and effective state management in distributed systems like Floxide.
`Checkpoint`: The Workflow Snapshot¶
A `Checkpoint` in Floxide is like taking a snapshot of your workflow's current progress. It captures two critical pieces of information at a particular moment in time for a specific workflow run:
- The Shared Context: The current state of your custom data stored inside `WorkflowCtx::store`. This ensures that when the workflow resumes, it has the correct shared information (like API keys, counters, configuration loaded mid-way, etc.).
- The Pending Work Queue: The list of `WorkItem`s that are still waiting to be processed for this specific workflow run. This ensures the workflow knows which steps are next.
Think of it like saving your game: the save file contains your character's stats, inventory (like the context), and the list of quests you still need to do (like the pending work queue).
Floxide defines a simple struct to hold this snapshot:
```rust
// Simplified from crates/floxide-core/src/checkpoint.rs
use std::collections::VecDeque; // A type of queue
use crate::context::Context;
use crate::workflow::WorkItem;

/// A snapshot of a workflow's pending work and its context.
#[derive(Debug, Clone)] // Allows copying and printing
pub struct Checkpoint<C: Context, WI: WorkItem> {
    /// The user-provided context for the workflow
    pub context: C,
    /// The queue of pending work items for this run
    pub queue: VecDeque<WI>,
}
```
- `context: C`: Holds the actual instance of your shared `Context` data (e.g., `MyWorkflowData` from Chapter 3).
- `queue: VecDeque<WI>`: Holds the list of `WorkItem`s generated by the `workflow!` macro that haven't been processed yet for this run.
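To make this concrete, here is a small sketch of building an initial checkpoint by hand. `MyWorkflowData` and `VideoWorkItem` are placeholder names standing in for your own `Context` type and the `WorkItem` enum the `workflow!` macro generates; in practice the engine creates and updates checkpoints for you.

```rust
// Hypothetical sketch: building the very first checkpoint for a new run.
// `MyWorkflowData` and `VideoWorkItem` are stand-ins for your own context
// type and the work-item enum generated by the `workflow!` macro.
use std::collections::VecDeque;

fn initial_checkpoint(
    ctx: MyWorkflowData,
    first_task: VideoWorkItem,
) -> Checkpoint<MyWorkflowData, VideoWorkItem> {
    // The queue starts with just the first work item; the engine pushes
    // successor items onto it as each node completes.
    let mut queue = VecDeque::new();
    queue.push_back(first_task);
    Checkpoint { context: ctx, queue }
}
```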
Distributed Emphasis: When a worker saves a checkpoint, it's essentially publishing the current state and remaining tasks. Another worker (or the same one after a restart) can then load this checkpoint to seamlessly continue the process with the correct data and task list, even if it's running on a different machine.
`CheckpointStore` Trait: Defining How to Save and Load¶
Okay, we have the snapshot (`Checkpoint`), but where do we save it? And how? Do we write it to a file? Store it in a database? Keep it in memory?
This is where the `CheckpointStore` trait comes in. It defines the contract for how checkpoints are saved and loaded. It's like defining the interface for interacting with a memory card or cloud storage for your game saves.
The trait requires implementors to provide two main functions:
```rust
// Simplified from crates/floxide-core/src/checkpoint.rs
use async_trait::async_trait;
// ... other imports: Checkpoint, Context, WorkItem, CheckpointError

/// A trait for persisting and loading workflow checkpoints.
#[async_trait]
pub trait CheckpointStore<C: Context, WI: WorkItem> {
    /// Persist the given checkpoint under `workflow_id`.
    /// Overwrites any previous checkpoint for the same ID.
    async fn save(
        &self,
        workflow_id: &str,              // Unique ID for the workflow run
        checkpoint: &Checkpoint<C, WI>, // The snapshot to save
    ) -> Result<(), CheckpointError>;   // Returns Ok or an error

    /// Load the last-saved checkpoint for `workflow_id`, if any.
    async fn load(
        &self,
        workflow_id: &str, // Which workflow run to load?
    ) -> Result<Option<Checkpoint<C, WI>>, CheckpointError>;
    // Returns Ok(Some(checkpoint)) if found, Ok(None) if not, or an error
}
```
- `save`: Takes the unique `workflow_id` and the `Checkpoint` data and saves it somewhere. The implementation decides how (e.g., serializing to JSON and writing to Redis).
- `load`: Takes a `workflow_id` and attempts to retrieve the previously saved `Checkpoint` data, deserializing it back into the `Checkpoint` struct. It returns `None` if no checkpoint exists for that ID.
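To see the trait from the caller's side, here is a minimal sketch of a helper that saves a snapshot and immediately reads it back. It uses only the two methods shown above and works with any `CheckpointStore` implementation; the helper itself is illustrative, not part of Floxide.

```rust
// Sketch: generic helper that saves a checkpoint, then reads it back.
// Works with any `CheckpointStore` implementation; the context and
// work-item types are placeholders for your own.
async fn save_and_reload<C, WI, S>(
    store: &S,
    run_id: &str,
    checkpoint: &Checkpoint<C, WI>,
) -> Result<Option<Checkpoint<C, WI>>, CheckpointError>
where
    C: Context,
    WI: WorkItem,
    S: CheckpointStore<C, WI>,
{
    // Overwrites whatever was stored under `run_id` before.
    store.save(run_id, checkpoint).await?;
    // Returns `Ok(None)` if nothing has ever been saved for this run.
    store.load(run_id).await
}
```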
By using a trait, Floxide allows you to choose the storage backend that fits your needs:
* `InMemoryCheckpointStore`: Stores checkpoints in memory (simple, good for tests, but not persistent or distributed).
* Database Store: Could store checkpoints as rows in a database table.
* File System Store: Could save checkpoints as files on disk.
* Cloud Storage Store: Could save checkpoints to services like AWS S3 or Google Cloud Storage.
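As a rough sketch of what a custom backend could look like, here is a hypothetical file-system store that keeps one JSON file per workflow run. It assumes `Checkpoint` is serializable whenever `C` and `WI` are (as the serialization section below describes), and it unwraps errors instead of mapping them into `CheckpointError`, purely to keep the sketch short.

```rust
// Hypothetical file-backed store: one `<workflow_id>.json` file per run.
// Error mapping into `CheckpointError` is elided (expect/unwrap) for brevity.
use std::path::PathBuf;
use async_trait::async_trait;

pub struct FileCheckpointStore {
    dir: PathBuf, // directory holding the checkpoint files
}

#[async_trait]
impl<C, WI> CheckpointStore<C, WI> for FileCheckpointStore
where
    C: Context,
    WI: WorkItem,
    Checkpoint<C, WI>: serde::Serialize + serde::de::DeserializeOwned,
{
    async fn save(
        &self,
        workflow_id: &str,
        checkpoint: &Checkpoint<C, WI>,
    ) -> Result<(), CheckpointError> {
        let path = self.dir.join(format!("{workflow_id}.json"));
        // Serialize the whole snapshot and write it to disk.
        let bytes = serde_json::to_vec(checkpoint).expect("serialize checkpoint");
        tokio::fs::write(path, bytes).await.expect("write checkpoint file");
        Ok(())
    }

    async fn load(
        &self,
        workflow_id: &str,
    ) -> Result<Option<Checkpoint<C, WI>>, CheckpointError> {
        let path = self.dir.join(format!("{workflow_id}.json"));
        match tokio::fs::read(path).await {
            Ok(bytes) => Ok(Some(
                serde_json::from_slice(&bytes).expect("deserialize checkpoint"),
            )),
            Err(_) => Ok(None), // treat a missing file as "no checkpoint yet"
        }
    }
}
```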
How Checkpointing Enables Distributed and Fault-Tolerant Workflows¶
Let's see how the `Checkpoint` and `CheckpointStore` work together, especially in a distributed setup.
1. Start: A workflow run starts (e.g., run ID "video_abc"). An initial checkpoint might be saved with the starting context and the first task in the queue.
2. Worker A Takes Task 1: Worker A loads the checkpoint for "video_abc". It gets the context and sees Task 1 is next. It processes Task 1.
3. Worker A Finishes Task 1: Let's say Task 1 updated the context (e.g., incremented a counter) and produced Task 2 and Task 3.
4. Worker A Saves Checkpoint: The Floxide engine (running within Worker A) creates a new `Checkpoint` containing:
    - The updated context.
    - The remaining work queue (now containing Task 2 and Task 3).
    It then calls `store.save("video_abc", new_checkpoint)`. The old checkpoint is overwritten.
5. Crash! (Optional): Worker A crashes immediately after saving. No work is lost, because the progress is saved!
6. Worker B Takes Task 2: Some time later, Worker B (maybe on a different machine) dequeues Task 2 from the main `WorkQueue`.
7. Worker B Loads Checkpoint: Before processing Task 2, Worker B calls `store.load("video_abc")`. It receives the checkpoint saved by Worker A in step 4.
8. Worker B Resumes: Worker B now has the correct, updated context (with the incremented counter) and knows that Task 3 is also pending (from the checkpoint's queue). It proceeds to execute Task 2.
This save/load cycle ensures that state is consistently passed between steps, even across different workers or process restarts.
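Put together, each worker's handling of a single dequeued task follows a load, process, save pattern. The sketch below shows that shape; `process_step` is a hypothetical helper standing in for the engine actually running the node, and bookkeeping details (such as removing the just-processed item from the saved queue) are glossed over.

```rust
// Sketch of the per-task cycle: load the snapshot, run the step, save the
// updated snapshot. `process_step` is a hypothetical stand-in for the engine
// executing the node for `item` and returning any follow-up work items.
async fn handle_task<C, WI, S>(
    store: &S,
    run_id: &str,
    item: WI,
) -> Result<(), CheckpointError>
where
    C: Context,
    WI: WorkItem,
    S: CheckpointStore<C, WI>,
{
    // 1. Restore the latest snapshot (shared context + pending queue).
    let mut checkpoint = store
        .load(run_id)
        .await?
        .expect("a checkpoint should exist once the run has started");

    // 2. Execute the step against the shared context (hypothetical helper).
    let successors = process_step(&mut checkpoint.context, item).await;

    // 3. Record newly produced work items; removing the just-processed item
    //    from the saved queue is real-engine bookkeeping omitted here.
    checkpoint.queue.extend(successors);

    // 4. Persist the new snapshot, overwriting the previous one.
    store.save(run_id, &checkpoint).await
}
```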
Example: `InMemoryCheckpointStore`¶
Floxide provides a basic `InMemoryCheckpointStore` for testing and simple cases. It uses a standard Rust `HashMap` protected by an `RwLock` (allowing multiple readers or one writer) to store checkpoints in memory.
```rust
// Simplified from crates/floxide-core/src/checkpoint.rs
use std::collections::HashMap;
use std::sync::{Arc, RwLock}; // For thread-safe sharing
// ... other imports

#[derive(Clone)]
pub struct InMemoryCheckpointStore<C: Context, WI: WorkItem> {
    // Arc allows sharing ownership across threads/tasks.
    // RwLock allows multiple readers or one writer at a time.
    inner: Arc<RwLock<HashMap<String, Checkpoint<C, WI>>>>,
    // PhantomData might be needed depending on generic usage
    _phantom: std::marker::PhantomData<(C, WI)>,
}

// --- Implementation of the CheckpointStore trait ---
#[async_trait]
impl<C: Context, WI: WorkItem> CheckpointStore<C, WI> for InMemoryCheckpointStore<C, WI> {
    async fn save(
        &self,
        workflow_id: &str,
        checkpoint: &Checkpoint<C, WI>,
    ) -> Result<(), CheckpointError> {
        // Get write access to the map (blocks other writers/readers)
        let mut map = self.inner.write().unwrap(); // .unwrap() simplifies error handling here
        // Insert a clone of the checkpoint into the map
        map.insert(workflow_id.to_string(), checkpoint.clone());
        Ok(())
        // Lock is released automatically when 'map' goes out of scope
    }

    async fn load(&self, workflow_id: &str) -> Result<Option<Checkpoint<C, WI>>, CheckpointError> {
        // Get read access to the map (blocks writers, allows other readers)
        let map = self.inner.read().unwrap();
        // Get the checkpoint, cloning it if found
        let maybe_checkpoint = map.get(workflow_id).cloned();
        Ok(maybe_checkpoint)
        // Lock is released automatically when 'map' goes out of scope
    }
}
```
- `Arc<RwLock<HashMap<...>>>`: The standard Rust way to share mutable data safely across asynchronous tasks.
- `inner.write().unwrap()` / `inner.read().unwrap()`: Acquire a write lock or a read lock on the map.
- `map.insert(...)` / `map.get(...)`: Standard `HashMap` operations.
- `.clone()`: We clone the `Checkpoint` when saving and loading to avoid ownership issues with the map. This requires `C` and `WI` to be `Clone`.
This implementation is simple but effective for single-process scenarios. For real distributed systems, you'd use an implementation backed by Redis, a database, or another shared storage mechanism.
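Here is a small usage sketch of the in-memory store in a test. It assumes the store can be built with a `new()` constructor (an assumption, since only the struct definition is shown above) and uses `MyWorkflowData` / `VideoWorkItem` as placeholder types.

```rust
// Sketch: round-tripping a checkpoint through the in-memory store.
// `MyWorkflowData` and `VideoWorkItem` are placeholders for your own types,
// and `new()` is assumed to be the store's constructor.
#[tokio::test]
async fn roundtrip_checkpoint() {
    let store: InMemoryCheckpointStore<MyWorkflowData, VideoWorkItem> =
        InMemoryCheckpointStore::new();

    let checkpoint = Checkpoint {
        context: MyWorkflowData::default(),
        queue: std::collections::VecDeque::new(),
    };

    // Save under a run ID, then load it back.
    store.save("video_abc", &checkpoint).await.unwrap();
    assert!(store.load("video_abc").await.unwrap().is_some());

    // An unknown run ID yields Ok(None), not an error.
    assert!(store.load("missing_run").await.unwrap().is_none());
}
```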
Under the Hood: Serialization is Key¶
How does the `CheckpointStore` actually save your custom `Context` data (`C`) and the `WorkItem` (`WI`)? It relies on serialization.
- Serialization: Converting the in-memory Rust structs (`Checkpoint`, your `Context`, your `WorkItem`) into a format that can be stored or transmitted (like JSON, MessagePack, Protobuf, etc.).
- Deserialization: Converting the stored format back into the Rust structs.
Floxide leverages the powerful `serde` library for this. Remember how, in Chapter 3, we required your `Context` struct to derive `Serialize` and `Deserialize`? The `workflow!` macro automatically derives them for the `WorkItem` enum it generates.
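For reference, a context like the one sketched in earlier chapters simply derives those traits alongside the usual ones; the exact fields here are illustrative.

```rust
use serde::{Deserialize, Serialize};

// Illustrative shared context: the serde derives are what let a checkpoint
// containing it be persisted and restored by any store implementation.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct MyWorkflowData {
    pub api_key: String,
    pub processed_items_count: u64,
}
```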
The `CheckpointStore` implementation (like `InMemoryCheckpointStore`, or a hypothetical `RedisCheckpointStore`) is responsible for:
1. During `save`: taking the `Checkpoint<C, WI>` struct, using `serde` (e.g., `serde_json::to_string` or `serde_json::to_vec`) to turn it into a byte array or string, and then storing those bytes/string in its backend (e.g., the in-memory `HashMap`, a Redis key, a database BLOB).
2. During `load`: retrieving the stored bytes/string from the backend, then using `serde` (e.g., `serde_json::from_slice` or `serde_json::from_str`) to parse them back into a `Checkpoint<C, WI>` struct.
This serialization is what allows complex Rust data structures representing your workflow's state to be saved, shared across machines, and loaded back reliably.
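Conceptually, the save and load paths reduce to a serde round trip like the sketch below (again assuming `Checkpoint` is serializable whenever `C` and `WI` are, which the simplified struct shown earlier omits).

```rust
// Sketch of the serialization round trip a store performs internally.
fn roundtrip<C, WI>(checkpoint: &Checkpoint<C, WI>) -> Checkpoint<C, WI>
where
    C: Context,
    WI: WorkItem,
    Checkpoint<C, WI>: serde::Serialize + serde::de::DeserializeOwned,
{
    // save path: in-memory struct -> bytes written to the backend
    let bytes = serde_json::to_vec(checkpoint).expect("serialize checkpoint");
    // load path: bytes read from the backend -> in-memory struct
    serde_json::from_slice(&bytes).expect("deserialize checkpoint")
}
```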
Conclusion¶
Checkpoints are the "save game" mechanism for Floxide workflows.
- A `Checkpoint` is a snapshot containing the current shared `Context` and the list of pending `WorkItem`s for a specific run.
- The `CheckpointStore` trait defines the standard interface (`save`, `load`) for persisting and retrieving these snapshots.
- Implementations of `CheckpointStore` (like `InMemoryCheckpointStore`, or others backed by databases or files) handle the actual storage and rely on `serde` for serialization.
- Checkpointing is crucial for fault tolerance (resuming after crashes) and enables distributed execution by letting state be shared and consistently restored across different workers and machines.
Now that we have all the core pieces – Nodes define steps, Context shares data, Workflow defines structure, WorkQueue distributes tasks, and Checkpoints save progress – let's see how a dedicated `DistributedWorker` uses all these components together to actually execute steps in a distributed environment.