Chapter 7: DistributedWorker
In the previous chapter, we learned about `Checkpoint` and `CheckpointStore`, the "save game" system for Floxide workflows. They let us save the state of a workflow run (its shared `Context` and pending tasks) so we can resume later or recover from crashes.

But who actually does the work in a distributed setup? We have the blueprint (`Workflow`), the task list (`WorkQueue`), and the save system (`CheckpointStore`). Now we need the actual employees who run the assembly-line stations.
What's the Problem? Actively Processing Tasks
Imagine our distributed video processing factory. We have:

* A list of jobs (the `WorkQueue`).
* Instructions for each job (the `Node` logic within the `Workflow` blueprint).
* A way to save progress after each job (the `CheckpointStore`).

But nothing happens on its own! We need programs, workers, that actively:

1. Look at the job list.
2. Pick up the next available job.
3. Figure out the current state of the project (load the latest save).
4. Do the work for that job step.
5. Save the updated project state.
6. Potentially add new follow-up jobs to the list.
7. Go back to step 1 and repeat.

This tireless entity is what Floxide calls a `DistributedWorker`.
What is a `DistributedWorker`? The Engine's Employee

A `DistributedWorker` is an independent process or task whose main job is to execute workflow steps in a distributed Floxide setup. It's like an employee who performs the core work cycle:
- Check Queue: Repeatedly asks the shared `WorkQueue`: "Any jobs available?"
- Get Task: If a task (a `WorkItem`) is available, the worker takes it.
- Load State: Reads the latest `Checkpoint` for the specific workflow run this task belongs to. This gives it the correct `Context`.
- Execute Step: Runs the processing logic defined by the corresponding `Node` (using the `Workflow` definition to find the right code).
- Handle Outcome: Based on the `Transition` returned by the Node:
    - Enqueues new tasks onto the `WorkQueue` for subsequent steps.
    - Updates the state if necessary.
- Save State: Saves a new `Checkpoint` reflecting the updated context and remaining tasks for this run.
- Repeat: Goes back to checking the queue for the next job.
Distributed Emphasis: The magic happens when you run multiple `DistributedWorker` instances. They can run on the same machine or, more importantly, on different machines. They all connect to the same shared `WorkQueue` and `CheckpointStore`. This allows them to work together in parallel, processing different tasks for different workflow runs (or even different tasks for the same run, if a step used `Transition::NextAll`). This parallel, distributed processing is Floxide's core strength.
How a `DistributedWorker` Uses Floxide Components
The `DistributedWorker` doesn't do everything from scratch. It relies heavily on the other abstractions we've learned about:

- `Workflow`: The worker needs the workflow definition to know which `Node` logic to execute for a given `WorkItem` and how to handle transitions based on the defined `edges`. It primarily uses the `step_distributed` method provided by the `Workflow` trait implementation (generated by the `workflow!` macro).
- `WorkQueue`: The worker constantly interacts with the queue to `dequeue` tasks to process and `enqueue` subsequent tasks.
- `CheckpointStore`: Before executing a step, the worker `load`s the state. After execution, it `save`s the potentially updated state.
- Distributed Stores (Chapter 9): Workers also interact with other stores to report their status (`LivenessStore`), record errors (`ErrorStore`), update metrics (`MetricsStore`), and manage the state of individual work items (`WorkItemStateStore`). This provides observability and control.
- `RetryPolicy` (Chapter 10): The worker can be configured with a retry policy to automatically handle transient errors during step execution.
The Worker Loop in Action (Simplified)
Let's walk through a single cycle for one worker (Worker #42):

1. Dequeue: Worker #42 calls `queue.dequeue()`.
2. Result: It receives `Some(("video_abc", WorkItem::ExtractAudio("chunk_3.mp4")))`. (It got a job!)
3. Load Checkpoint: Worker #42 calls `checkpoint_store.load("video_abc")`. It gets back the `Checkpoint` containing the current `Context` (maybe `{"api_key": "xyz", "processed_chunks": 2}`) and the pending queue for this run (maybe `[WorkItem::GenerateSubtitles("chunk_3.mp4")]`).
4. Execute Step: The worker looks at `WorkItem::ExtractAudio`. Using the `Workflow` definition, it finds the `ExtractAudioNode` logic. It calls the `process` method of that Node, passing the loaded `Context` and the input `"chunk_3.mp4"`.
5. Node Returns: The `ExtractAudioNode` finishes and returns `Ok(Transition::Next("chunk_3.aac"))`. It might have internally used the `api_key` from the context. Let's assume it also updated the context conceptually to `{"api_key": "xyz", "processed_chunks": 3}`.
6. Handle Transition: The worker sees `Transition::Next`. It checks the `Workflow`'s `edges` for `ExtractAudioNode`. Let's say the edge points to `GenerateSubtitlesNode`.
7. Enqueue Next: The worker creates `WorkItem::GenerateSubtitles("chunk_3.aac")` (using the output from step 5) and calls `queue.enqueue("video_abc", new_work_item)`.
8. Save Checkpoint: The worker creates a new `Checkpoint` containing the updated context (`{"api_key": "xyz", "processed_chunks": 3}`) and the updated pending queue for this run (which is now empty, as the `GenerateSubtitles` item from step 3 was already present and the new one from step 7 was added to the main work queue). It calls `checkpoint_store.save("video_abc", new_checkpoint)`.
9. Loop: Worker #42 goes back to step 1, ready for the next job.

If `queue.dequeue()` had returned `None` in step 2, the worker would typically wait for a short period and then try again.
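To make the walkthrough concrete, here is roughly what the types for this video example might look like. These are illustrative stand-ins only; in a real project your own context type and the workflow definition generated by the `workflow!` macro play these roles.

```rust
use serde::{Deserialize, Serialize};

// Hypothetical work-item enum for the video example above (in a real project
// the workflow definition provides the actual WorkItem type).
#[derive(Clone, Debug, Serialize, Deserialize)]
enum VideoWorkItem {
    ExtractAudio(String),      // e.g. ExtractAudio("chunk_3.mp4")
    GenerateSubtitles(String), // e.g. GenerateSubtitles("chunk_3.aac")
}

// Hypothetical shared context stored in each checkpoint for run "video_abc".
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
struct VideoContext {
    api_key: String,
    processed_chunks: u32,
}
```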
Using `DistributedWorker`

You typically don't interact with the `DistributedWorker` struct directly in your `Node` logic. Instead, you configure and run it as a separate process or task. Floxide provides helpers to build and run workers.
First, you need instances of your workflow definition and all the required store implementations (for real distribution, use backends like Redis or Kafka rather than the in-memory implementations).
```rust
use floxide::{
Workflow, DistributedWorker, WorkQueue, CheckpointStore, Context,
// Import your specific workflow, context, and store implementations
// e.g., TextProcessor, SimpleContext, RedisWorkQueue, RedisCheckpointStore etc.
};
use std::sync::Arc;
// Assume these are properly configured instances for distributed use
// let my_workflow: TextProcessor = // ... initialized workflow struct
// let my_queue: RedisWorkQueue<...> = // ... connected queue
// let my_checkpoint_store: RedisCheckpointStore<...> = // ... connected store
// let my_run_info_store: RedisRunInfoStore = // ...
// let my_metrics_store: RedisMetricsStore = // ...
// let my_error_store: RedisErrorStore = // ...
// let my_liveness_store: RedisLivenessStore = // ...
// let my_work_item_state_store: RedisWorkItemStateStore<...> = // ...
// Create a worker instance
let worker = DistributedWorker::new(
my_workflow,
my_queue,
my_checkpoint_store,
my_run_info_store,
my_metrics_store,
my_error_store,
my_liveness_store,
my_work_item_state_store
);
// Optional: Configure retry policy
// worker.set_retry_policy(RetryPolicy::default());
// Define a unique ID for this worker instance (e.g., from hostname/PID)
let worker_id = 42; // Or generate dynamically
```
This code sets up a `DistributedWorker` instance, providing it with everything it needs to operate: the workflow logic, the queue, and the various state stores.
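The snippet above hard-codes `worker_id` to 42; as the `// Or generate dynamically` comment suggests, real deployments usually derive it. One hypothetical approach (not part of Floxide) combines the OS process ID with a per-process counter:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Hypothetical helper, not part of Floxide: give each worker in this process
// a distinct ID by mixing the OS process ID with a local counter. For
// guaranteed uniqueness across machines, assign IDs from your deployment
// tooling instead.
static NEXT_LOCAL_ID: AtomicUsize = AtomicUsize::new(0);

fn next_worker_id() -> usize {
    let local = NEXT_LOCAL_ID.fetch_add(1, Ordering::Relaxed);
    (std::process::id() as usize) * 1_000 + local
}
```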
Then, you typically run the worker's main loop in an async task:
```rust
// This would usually be run within an async runtime like Tokio
// Clone the worker if needed (it's designed to be Clone-able)
let worker_clone = worker.clone();
// Spawn an async task to run the worker loop forever
tokio::spawn(async move {
// run_forever loops indefinitely, processing tasks as they appear
worker_clone.run_forever(worker_id).await;
// Note: run_forever never returns normally (its return type is Infallible);
// it runs until the task is cancelled or it panics.
});
println!("Worker {} started and polling for tasks...", worker_id);
// Keep the main program alive, or manage worker tasks (e.g., using WorkerPool)
// ...
```
This code starts the worker. The `run_forever` method contains the core loop described earlier (dequeue, load, process, save, enqueue). You would typically launch many such worker tasks, potentially across multiple machines, all configured with the same shared stores and workflow definition, as sketched below.
Under the Hood: The `step_distributed` Method

The `DistributedWorker` itself doesn't contain the complex logic for loading, executing, saving, and enqueuing based on transitions. It delegates most of this to the `Workflow` trait's `step_distributed` method (which is implemented by the code generated by the `workflow!` macro).
The `worker.run_once(worker_id)` method essentially does this:

- Calls `queue.dequeue()`.
- If a `work_item` and `run_id` are received:
    - Updates worker status (liveness, work item state) using the provided stores.
    - Calls `self.workflow.step_distributed(...)`, passing the `checkpoint_store`, `queue`, `run_id`, and `work_item`.
    - The `step_distributed` implementation (generated by `workflow!`) handles:
        - Loading the checkpoint (`store.load(run_id)`).
        - Calling the correct Node's `process` method (using `process_work_item`).
        - Handling the `Transition` and enqueuing next items (`queue.enqueue(...)`).
        - Saving the new checkpoint (`store.save(run_id, ...)`).
    - Updates worker status based on the result of `step_distributed`.
- If no item is dequeued, it indicates idleness.
Let's look at simplified code from `floxide-core/src/distributed/worker.rs`.

The `DistributedWorker` struct holds all the necessary components:
```rust
// Simplified from crates/floxide-core/src/distributed/worker.rs
pub struct DistributedWorker<W, C, Q, S, ...>
where
W: Workflow<C, WorkItem: 'static>,
C: Context,
Q: WorkQueue<C, W::WorkItem>,
S: CheckpointStore<C, W::WorkItem>,
// ... other store bounds ...
{
workflow: W,
queue: Q,
checkpoint_store: S,
run_info_store: RIS,
// ... other stores ...
retry_policy: Option<RetryPolicy>,
// ...
}
```
The `run_forever` method loops, calling `run_once`:
```rust
// Simplified from crates/floxide-core/src/distributed/worker.rs
impl<...> DistributedWorker<...> {
pub async fn run_forever(&self, worker_id: usize) -> std::convert::Infallible {
loop {
match self.run_once(worker_id).await {
Ok(Some((_run_id, _output))) => {
// Work was done, loop immediately for more
}
Ok(None) => {
// No work found, sleep briefly
sleep(Duration::from_millis(100)).await;
}
Err(e) => {
// Error occurred, log and sleep
error!(worker_id, error = ?e, "Worker error");
sleep(Duration::from_millis(100)).await;
}
}
}
}
}
```
And `run_once` orchestrates the call to the workflow's core distributed step logic:
```rust
// Simplified CONCEPT from crates/floxide-core/src/distributed/worker.rs
// The actual implementation uses callbacks for state updates.
impl<...> DistributedWorker<...> {
pub async fn run_once(&self, worker_id: usize) -> Result<Option<(String, W::Output)>, FloxideError> {
// Update liveness/heartbeat (simplified)
self.heartbeat(worker_id).await;
// *** Core Logic: Delegate to Workflow::step_distributed ***
// This method encapsulates: dequeue, load, process, save, enqueue
match self.workflow.step_distributed(
&self.checkpoint_store,
&self.queue,
worker_id,
self.build_callbacks(worker_id) // Provides hooks for state updates
).await {
Ok(Some((run_id, output))) => {
// Workflow run completed!
// Update status to idle
self.on_idle_state_updates(worker_id).await?;
Ok(Some((run_id, output)))
}
Ok(None) => {
// Step processed, but workflow not finished OR queue was empty
// Update status to idle
self.on_idle_state_updates(worker_id).await?;
Ok(None)
}
Err(step_error) => {
// Handle step error (potentially using retry policy logic)
// Update status to idle or failed
self.on_idle_state_updates(worker_id).await?;
Err(step_error.error) // Return the underlying FloxideError
}
}
}
// Helper to create callback object
fn build_callbacks(&self, worker_id: usize) -> Arc<dyn StepCallbacks<C, W>> {
// ... creates an object that updates stores on events ...
}
// Helpers for state updates via callbacks (simplified)
async fn on_idle_state_updates(&self, worker_id: usize) -> Result<(), FloxideError>;
async fn heartbeat(&self, worker_id: usize);
// ... other state update helpers for start, success, error ...
}
```
The key takeaway is that the `DistributedWorker` acts as the runner or host process, performing the loop and calling the appropriate methods on the `Workflow`, `WorkQueue`, and `CheckpointStore` to execute the distributed steps defined by your application.
Worker Pools

Running and managing individual worker tasks by hand can be tedious. Floxide also provides a `WorkerPool` utility that simplifies starting, stopping, and monitoring multiple `DistributedWorker` instances concurrently.
Conclusion

The `DistributedWorker` is the active entity in a Floxide distributed workflow. It's the "employee" that continuously:

* Pulls tasks from the shared `WorkQueue`.
* Loads the necessary state from the `CheckpointStore`.
* Executes the `Node` logic defined in the `Workflow` (via `step_distributed`).
* Saves the updated state back to the `CheckpointStore`.
* Enqueues follow-up tasks.

By running multiple workers, potentially across many machines, all interacting with the same shared queue and stores, Floxide achieves parallel and distributed workflow execution.
But how do we start a new workflow run in this distributed environment? We can't just call `my_workflow.run()` anymore. We need a way to kick things off, create the initial checkpoint, and put the very first task onto the queue for the workers to find. That's the job of the `DistributedOrchestrator`.