Chapter 7: DistributedWorker

In the previous chapter, we learned about Checkpoint and CheckpointStore, the "save game" system for Floxide workflows. This allows us to save the state of a workflow run (its shared Context and pending tasks) so we can resume later or recover from crashes.

But who actually does the work in a distributed setup? We have the blueprint (Workflow), the task list (WorkQueue), and the save system (CheckpointStore). Now we need the actual employees who run the assembly line stations.

What's the Problem? Actively Processing Tasks

Imagine our distributed video processing factory. We have:

  • A list of jobs (the WorkQueue).
  • Instructions for each job (the Node logic within the Workflow blueprint).
  • A way to save progress after each job (the CheckpointStore).

But nothing happens on its own! We need programs – workers – that actively:

  1. Look at the job list.
  2. Pick up the next available job.
  3. Figure out the current state of the project (load the latest save).
  4. Do the work for that job step.
  5. Save the updated project state.
  6. Potentially add new follow-up jobs to the list.
  7. Go back to step 1 and repeat.

This tireless entity is what Floxide calls a DistributedWorker.

What is a DistributedWorker? The Engine's Employee

A DistributedWorker is an independent process or task whose main job is to execute workflow steps in a distributed Floxide setup. It's like an employee who performs the core work cycle:

  1. Check Queue: Repeatedly asks the shared WorkQueue: "Any jobs available?"
  2. Get Task: If a task (WorkItem) is available, the worker takes it.
  3. Load State: Reads the latest Checkpoint for the specific workflow run this task belongs to. This gives it the correct Context.
  4. Execute Step: Runs the processing logic defined by the corresponding Node (using the Workflow definition to find the right code).
  5. Handle Outcome: Based on the Transition returned by the Node:
    • Enqueues new tasks onto the WorkQueue for subsequent steps.
    • Updates the state if necessary.
  6. Save State: Saves a new Checkpoint reflecting the updated context and remaining tasks for this run.
  7. Repeat: Goes back to checking the queue for the next job.

Distributed Emphasis: The magic happens when you run multiple DistributedWorker instances. They can run on the same machine or, more importantly, on different machines. They all connect to the same shared WorkQueue and CheckpointStore. This allows them to work together in parallel, processing different tasks for different workflow runs (or even different tasks for the same run if a step used Transition::NextAll). This parallel, distributed processing is Floxide's core strength.

graph TD
    subgraph Shared Infrastructure
        WQ[(Work Queue)]
        CS[(Checkpoint Store)]
    end
    subgraph Machine 1
        W1[Worker 1]
    end
    subgraph Machine 2
        W2[Worker 2]
        W3[Worker 3]
    end
    %% Worker interactions (same pattern for all)
    W1 -- "Get/Put Task" --> WQ
    W1 -- "Load/Save State" --> CS
    W1 -- "Process Step" --> W1
    W2 -- "Get/Put Task" --> WQ
    W2 -- "Load/Save State" --> CS
    W2 -- "Process Step" --> W2
    W3 -- "Get/Put Task" --> WQ
    W3 -- "Load/Save State" --> CS
    W3 -- "Process Step" --> W3

How a DistributedWorker Uses Floxide Components

The DistributedWorker doesn't do everything from scratch. It relies heavily on the other abstractions we've learned about:

  • Workflow: The worker needs the workflow definition to know which Node logic to execute for a given WorkItem and how to handle transitions based on the defined edges. It primarily uses the step_distributed method provided by the Workflow trait implementation (generated by the workflow! macro).
  • WorkQueue: The worker constantly interacts with the queue to dequeue tasks to process and enqueue subsequent tasks.
  • CheckpointStore: Before executing a step, the worker loads the state. After execution, it saves the potentially updated state.
  • Distributed Stores (Chapter 9): Workers also interact with other stores to report their status (LivenessStore), record errors (ErrorStore), update metrics (MetricsStore), and manage the state of individual work items (WorkItemStateStore). This provides observability and control.
  • RetryPolicy (Chapter 10): The worker can be configured with a retry policy to automatically handle transient errors during step execution.

The Worker Loop in Action (Simplified)

Let's walk through a single cycle for one worker (Worker #42):

  1. Dequeue: Worker #42 calls queue.dequeue().
  2. Result: It receives Some(("video_abc", WorkItem::ExtractAudio("chunk_3.mp4"))). (It got a job!)
  3. Load Checkpoint: Worker #42 calls checkpoint_store.load("video_abc"). It gets back the Checkpoint containing the current Context (maybe {"api_key": "xyz", "processed_chunks": 2}) and the pending queue for this run (maybe [WorkItem::GenerateSubtitles("chunk_3.mp4")]).
  4. Execute Step: The worker looks at WorkItem::ExtractAudio. Using the Workflow definition, it finds the ExtractAudioNode logic. It calls the process method of that Node, passing the loaded Context and the input "chunk_3.mp4".
  5. Node Returns: The ExtractAudioNode finishes and returns Ok(Transition::Next("chunk_3.aac")). It might have internally used the api_key from the context. Let's assume it also updated the context conceptually to {"api_key": "xyz", "processed_chunks": 3}.
  6. Handle Transition: The worker sees Transition::Next. It checks the Workflow's edges for ExtractAudioNode. Let's say the edge points to GenerateSubtitlesNode.
  7. Enqueue Next: The worker creates WorkItem::GenerateSubtitles("chunk_3.aac") (using the output from step 5) and calls queue.enqueue("video_abc", new_work_item).
  8. Save Checkpoint: The worker creates a new Checkpoint containing the updated context ({"api_key": "xyz", "processed_chunks": 3}) and the run's updated pending-task list, which now reflects both the item just processed and the follow-up item enqueued in step 7. It calls checkpoint_store.save("video_abc", new_checkpoint).
  9. Loop: Worker #42 goes back to step 1, ready for the next job.

If the dequeue in steps 1–2 had returned None instead, the worker would typically wait a short period and then try again.
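
Here is the same cycle written out as a rough Rust sketch. This is illustrative pseudocode rather than code you would write yourself: the method names follow this chapter's examples, process_work_item is the dispatch helper mentioned later in this chapter, and next_work_item_for is a hypothetical helper standing in for the edge lookup. In real Floxide all of this lives inside Workflow::step_distributed.

// Illustrative sketch of one worker cycle (see caveats above).
// Steps 1-2: dequeue a task for some run.
if let Some((run_id, item)) = queue.dequeue().await? {
    // Step 3: load the latest checkpoint (context + pending tasks) for that run.
    let mut checkpoint = checkpoint_store.load(&run_id).await?;
    // Steps 4-5: dispatch to the right Node's `process` method for this WorkItem.
    let transition = workflow.process_work_item(&mut checkpoint.context, item).await?;
    // Steps 6-7: follow the workflow's edges and enqueue the follow-up task.
    if let Transition::Next(output) = transition {
        // `next_work_item_for` is hypothetical: it maps this node's output to
        // the WorkItem for the next node along the defined edge.
        queue.enqueue(&run_id, next_work_item_for(output)).await?;
    }
    // Step 8: persist the updated context and remaining tasks for this run.
    checkpoint_store.save(&run_id, &checkpoint).await?;
}
// Step 9: loop back to the dequeue call.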

Using DistributedWorker

You typically don't interact with the DistributedWorker struct directly in your Node logic. Instead, you configure and run it as a separate process or task. Floxide provides helpers to build and run workers.

First, you need instances of your workflow definition and all the required store implementations, backed by distributed infrastructure such as Redis or Kafka; the in-memory implementations are fine for testing but only work within a single process.

use floxide::{
    Workflow, DistributedWorker, WorkQueue, CheckpointStore, Context,
    // Import your specific workflow, context, and store implementations
    // e.g., TextProcessor, SimpleContext, RedisWorkQueue, RedisCheckpointStore etc.
};
use std::sync::Arc;
// Assume these are properly configured instances for distributed use
// let my_workflow: TextProcessor = // ... initialized workflow struct
// let my_queue: RedisWorkQueue<...> = // ... connected queue
// let my_checkpoint_store: RedisCheckpointStore<...> = // ... connected store
// let my_run_info_store: RedisRunInfoStore = // ...
// let my_metrics_store: RedisMetricsStore = // ...
// let my_error_store: RedisErrorStore = // ...
// let my_liveness_store: RedisLivenessStore = // ...
// let my_work_item_state_store: RedisWorkItemStateStore<...> = // ...

// Create a worker instance
let worker = DistributedWorker::new(
    my_workflow,
    my_queue,
    my_checkpoint_store,
    my_run_info_store,
    my_metrics_store,
    my_error_store,
    my_liveness_store,
    my_work_item_state_store
);

// Optional: Configure retry policy
// worker.set_retry_policy(RetryPolicy::default());

// Define a unique ID for this worker instance (e.g., from hostname/PID)
let worker_id = 42; // Or generate dynamically

This code sets up a DistributedWorker instance, providing it with everything it needs to operate: the workflow logic, the queue, and various state stores.

Then, you typically run the worker's main loop in an async task:

// This would usually be run within an async runtime like Tokio

// Clone the worker if needed (it implements Clone)
let worker_clone = worker.clone();

// Spawn an async task to run the worker loop forever
tokio::spawn(async move {
    // run_forever loops indefinitely, processing tasks as they appear
    worker_clone.run_forever(worker_id).await;
    // Note: run_forever never returns normally; it runs until the task is cancelled or the process exits.
});

println!("Worker {} started and polling for tasks...", worker_id);
// Keep the main program alive, or manage worker tasks (e.g., using WorkerPool)
// ...

This code starts the worker. The run_forever method contains the core loop described earlier (dequeue, load, process, save, enqueue). You would typically launch many such worker tasks, potentially across multiple machines, all configured with the same shared stores and workflow definition.
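
Launching more workers on the same machine is just a loop over tokio::spawn. A minimal sketch, assuming the same worker value as above (the count of four and the sequential worker IDs are arbitrary choices for illustration):

// Spawn several workers; each clone talks to the same shared
// queue and stores, so they cooperate automatically.
for worker_id in 0..4 {
    let w = worker.clone();
    tokio::spawn(async move {
        w.run_forever(worker_id).await;
    });
}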

Under the Hood: The step_distributed Method

The DistributedWorker itself doesn't contain the complex logic for loading, executing, saving, and enqueuing based on transitions. It delegates most of this to the Workflow trait's step_distributed method (which is implemented by the code generated by the workflow! macro).

The worker.run_once(worker_id) method essentially does this:

  1. Updates worker liveness (heartbeat) using the provided stores.
  2. Calls self.workflow.step_distributed(...), passing the checkpoint_store, the queue, this worker_id, and a set of callbacks for work-item status updates.
  3. The step_distributed implementation (generated by workflow!) handles:
    • Dequeuing the next task for some run (queue.dequeue()), yielding a run_id and work_item.
    • Loading the checkpoint (store.load(run_id)).
    • Calling the correct Node's process method (using process_work_item).
    • Handling the Transition and enqueuing next items (queue.enqueue(...)).
    • Saving the new checkpoint (store.save(run_id, ...)).
  4. Updates worker status based on the result of step_distributed.
  5. If no item was dequeued, the worker records itself as idle.

Here's a simplified sequence diagram of run_once:

sequenceDiagram
    participant Worker as DistributedWorker (run_once)
    participant WorkflowImpl as Workflow (step_distributed)
    participant WQ as WorkQueue
    participant Stores as Other Stores (Liveness, etc.)
    Worker->>Stores: Heartbeat (worker alive)
    Worker->>WorkflowImpl: step_distributed(store, queue, worker_id, callbacks)
    WorkflowImpl->>WQ: dequeue()
    alt Task found (run_id, item)
        WQ-->>WorkflowImpl: Some((run_id, item))
        Note right of WorkflowImpl: Loads Checkpoint,<br/>Executes Node,<br/>Enqueues Next Tasks,<br/>Saves Checkpoint
        WorkflowImpl-->>Worker: Ok(Some((run_id, output))) or Ok(None) or Err
        Worker->>Stores: Update status (item finished/failed)
    else No task found
        WQ-->>WorkflowImpl: None
        WorkflowImpl-->>Worker: Ok(None)
        Worker->>Stores: Update status (idle)
    end

Let's look at simplified code from floxide-core/src/distributed/worker.rs:

The DistributedWorker struct holds all the necessary components:

// Simplified from crates/floxide-core/src/distributed/worker.rs
pub struct DistributedWorker<W, C, Q, S, RIS /* ...other store params... */>
where
    W: Workflow<C, WorkItem: 'static>,
    C: Context,
    Q: WorkQueue<C, W::WorkItem>,
    S: CheckpointStore<C, W::WorkItem>,
    // ... other store bounds ...
{
    workflow: W,
    queue: Q,
    checkpoint_store: S,
    run_info_store: RIS,
    // ... other stores ...
    retry_policy: Option<RetryPolicy>,
    // ...
}

The run_forever method loops, calling run_once:

// Simplified from crates/floxide-core/src/distributed/worker.rs
impl<...> DistributedWorker<...> {
    // Returning `Infallible` documents that this loop never exits normally.
    pub async fn run_forever(&self, worker_id: usize) -> std::convert::Infallible {
        loop {
            match self.run_once(worker_id).await {
                Ok(Some((_run_id, _output))) => {
                    // Work was done, loop immediately for more
                }
                Ok(None) => {
                    // No work found, sleep briefly
                    sleep(Duration::from_millis(100)).await;
                }
                Err(e) => {
                    // Error occurred, log and sleep
                    error!(worker_id, error = ?e, "Worker error");
                    sleep(Duration::from_millis(100)).await;
                }
            }
        }
    }
}

And run_once orchestrates the call to the workflow's core distributed step logic:

// Simplified CONCEPT from crates/floxide-core/src/distributed/worker.rs
// The actual implementation uses callbacks for state updates.
impl<...> DistributedWorker<...> {
    pub async fn run_once(&self, worker_id: usize) -> Result<Option<(String, W::Output)>, FloxideError> {

        // Update liveness/heartbeat (simplified)
        self.heartbeat(worker_id).await;

        // *** Core Logic: Delegate to Workflow::step_distributed ***
        // This method encapsulates: dequeue, load, process, save, enqueue
        match self.workflow.step_distributed(
            &self.checkpoint_store,
            &self.queue,
            worker_id,
            self.build_callbacks(worker_id) // Provides hooks for state updates
        ).await {
            Ok(Some((run_id, output))) => {
                // Workflow run completed!
                // Update status to idle
                self.on_idle_state_updates(worker_id).await?;
                Ok(Some((run_id, output)))
            }
            Ok(None) => {
                // Step processed, but workflow not finished OR queue was empty
                // Update status to idle
                self.on_idle_state_updates(worker_id).await?;
                Ok(None)
            }
            Err(step_error) => {
                // Handle step error (potentially using retry policy logic)
                // Update status to idle or failed
                self.on_idle_state_updates(worker_id).await?;
                Err(step_error.error) // Return the underlying FloxideError
            }
        }
    }

    // Helper to create callback object
    fn build_callbacks(&self, worker_id: usize) -> Arc<dyn StepCallbacks<C, W>> {
        // ... creates an object that updates stores on events ...
    }

    // Helpers for state updates via callbacks (simplified)
    async fn on_idle_state_updates(&self, worker_id: usize) -> Result<(), FloxideError>;
    async fn heartbeat(&self, worker_id: usize);
    // ... other state update helpers for start, success, error ...
}

The key takeaway is that the DistributedWorker acts as the runner or host process, performing the loop and calling the appropriate methods on the Workflow, WorkQueue, and CheckpointStore to execute the distributed steps defined by your application.

Worker Pools

Running and managing individual worker tasks by hand can be tedious. Floxide provides a WorkerPool utility that simplifies starting, stopping, and monitoring multiple DistributedWorker instances concurrently.
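
To get a feel for what such a pool does, here is a minimal hand-rolled stand-in built on tokio::task::JoinSet. This is not the WorkerPool API itself (check the floxide docs for that); it just shows the spawn-and-supervise pattern a pool automates:

use tokio::task::JoinSet;

// Spawn N workers into a JoinSet so we can supervise them as a group.
let mut set = JoinSet::new();
for worker_id in 0..4 {
    let w = worker.clone();
    set.spawn(async move { w.run_forever(worker_id).await });
}

// run_forever never returns normally, so a completed task means a panic
// (or cancellation). Here we react by shutting the whole group down.
if set.join_next().await.is_some() {
    set.abort_all();
}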

Conclusion

The DistributedWorker is the active entity in a Floxide distributed workflow. It's the "employee" that continuously:

  • Pulls tasks from the shared WorkQueue.
  • Loads the necessary state from the CheckpointStore.
  • Executes the Node logic defined in the Workflow (via step_distributed).
  • Saves the updated state back to the CheckpointStore.
  • Enqueues follow-up tasks.

By running multiple workers, potentially across many machines, all interacting with the same shared queue and stores, Floxide achieves parallel and distributed workflow execution.

But how do we start a new workflow run in this distributed environment? We can't just call my_workflow.run() anymore. We need a way to kick things off, create the initial checkpoint, and put the very first task onto the queue for the workers to find. That's the job of the DistributedOrchestrator.

Next: Chapter 8: DistributedOrchestrator