# ADR-0007: Batch Processing Implementation

## Status

Accepted

## Date

2025-02-27

## Context
The floxide framework needs batch processing capabilities to efficiently execute workflows in parallel over collections of items, leveraging Rust's ownership model and concurrency features. The design must satisfy several requirements:
- Efficiently processes collections of items in parallel
- Respects configurable concurrency limits
- Provides proper error handling for individual item failures
- Integrates well with the existing workflow system
- Follows Rust idioms and best practices
## Decision
We'll implement batch processing with three components:

- A `BatchContext` trait to define contexts that support batch operations
- A `BatchNode` implementation that processes items concurrently
- A `BatchFlow` wrapper that provides a simplified API for batch execution
### BatchContext Trait

The `BatchContext` trait defines how batch-supporting contexts behave:
```rust
/// Trait for contexts that support batch processing
pub trait BatchContext<T> {
    /// Get the items to process in batch
    fn get_batch_items(&self) -> Result<Vec<T>, FloxideError>;

    /// Create a context for a single item
    fn create_item_context(&self, item: T) -> Result<Self, FloxideError>
    where
        Self: Sized;

    /// Update the main context with results from item processing
    fn update_with_results(&mut self, results: Vec<Result<T, FloxideError>>) -> Result<(), FloxideError>;
}
```
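To make the contract concrete, here is a minimal, self-contained sketch of implementing the trait. The real floxide types aren't shown in this ADR, so `FloxideError` is stubbed locally, the trait is copied in a simplified form, and `NumbersContext` is a hypothetical context that doubles a batch of numbers:

```rust
/// Stand-in for floxide's error type, stubbed so this example compiles on its own.
#[derive(Debug, Clone, PartialEq)]
pub struct FloxideError(pub String);

/// Simplified local copy of the BatchContext trait from this ADR.
pub trait BatchContext<T>: Sized {
    fn get_batch_items(&self) -> Result<Vec<T>, FloxideError>;
    fn create_item_context(&self, item: T) -> Result<Self, FloxideError>;
    fn update_with_results(&mut self, results: Vec<Result<T, FloxideError>>) -> Result<(), FloxideError>;
}

/// Hypothetical context: a batch of numbers to process, plus accumulated output.
#[derive(Clone, Default)]
pub struct NumbersContext {
    pub items: Vec<i64>,
    pub processed: Vec<i64>,
    pub failed: usize,
}

impl BatchContext<i64> for NumbersContext {
    fn get_batch_items(&self) -> Result<Vec<i64>, FloxideError> {
        Ok(self.items.clone())
    }

    fn create_item_context(&self, item: i64) -> Result<Self, FloxideError> {
        // Each per-item context sees only its own item.
        Ok(NumbersContext { items: vec![item], ..Default::default() })
    }

    fn update_with_results(&mut self, results: Vec<Result<i64, FloxideError>>) -> Result<(), FloxideError> {
        // Fan the per-item results back into the main context;
        // failures are tallied rather than aborting the batch.
        for result in results {
            match result {
                Ok(value) => self.processed.push(value),
                Err(_) => self.failed += 1,
            }
        }
        Ok(())
    }
}

fn main() {
    let mut ctx = NumbersContext { items: vec![1, 2, 3], ..Default::default() };
    // Simulate the batch engine: double each item, failing on even inputs.
    let results = ctx
        .get_batch_items()
        .unwrap()
        .into_iter()
        .map(|i| if i % 2 == 0 { Err(FloxideError("even".into())) } else { Ok(i * 2) })
        .collect();
    ctx.update_with_results(results).unwrap();
    assert_eq!(ctx.processed, vec![2, 6]);
    assert_eq!(ctx.failed, 1);
    println!("processed={:?} failed={}", ctx.processed, ctx.failed);
}
```

The split between `create_item_context` and `update_with_results` is what lets the engine fan work out to independent per-item contexts and merge results back without shared mutable state.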
### BatchNode Implementation

The `BatchNode` implements the `Node` trait and uses Tokio tasks to process items in parallel, with a semaphore to control concurrency:
```rust
pub struct BatchNode<Context, ItemType, A = crate::action::DefaultAction>
where
    Context: BatchContext<ItemType> + Send + Sync + 'static,
    ItemType: Send + Sync + 'static,
    A: ActionType + Send + Sync + 'static,
{
    id: NodeId,
    item_workflow: Arc<Workflow<Context, A>>,
    parallelism: usize,
    _phantom: PhantomData<(Context, ItemType, A)>,
}
```
### BatchFlow Implementation

The `BatchFlow` provides a simpler way to execute batch operations without dealing directly with nodes:
```rust
pub struct BatchFlow<Context, ItemType, A = crate::action::DefaultAction>
where
    Context: BatchContext<ItemType> + Send + Sync + 'static,
    ItemType: Send + Sync + 'static,
    A: ActionType + Send + Sync + 'static,
{
    id: NodeId,
    batch_node: BatchNode<Context, ItemType, A>,
}
```
## Consequences

### Advantages
- Better Parallelism: Rust's async runtime with Tokio provides efficient parallel processing
- Type Safety: The approach is fully type-safe with no runtime type checking needed
- Resource Control: Explicit concurrency controls prevent overwhelming system resources
- Integration: Seamlessly integrates with the existing workflow system
- Error Isolation: Individual item failures don't stop the entire batch
### Disadvantages
- Complexity: Requires implementing the BatchContext trait for contexts that support batch operations
- Resource Overhead: Each parallel task incurs some overhead for spawning and synchronization
- Context Cloning: Requires contexts to be clonable, which might be inefficient for large contexts
## Testing and Verification

We've added comprehensive tests for both `BatchNode` and `BatchFlow` to verify:
- Parallel processing capabilities
- Proper error handling
- Context updates after processing
- Integration with the workflow system
## Alternatives Considered

### Stream-Based Processing

We initially considered a purely stream-based approach using the `futures` crate's `StreamExt` combinators:
```rust
stream::iter(items)
    .map(|item| async { /* process item */ })
    .buffer_unordered(self.parallelism)
    .collect::<Vec<_>>()
    .await
```
However, this approach was more limited in handling context updates and didn't provide as much control over error handling.
### Single-Threaded Processing
We considered a simpler, single-threaded approach that processes items sequentially. While simpler, this would not take advantage of multi-core systems for CPU-bound tasks.
## Implementation Notes
- We're using Tokio's Semaphore for concurrency control
- Each item gets its own task, allowing true parallelism for CPU-bound operations
- Results are collected back in the original context after all items are processed
- Error handling preserves information about individual item failures
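The bounded-parallelism pattern in the notes above can be sketched without depending on Tokio or floxide: a channel pre-seeded with N permits plays the role of `tokio::sync::Semaphore`, and std threads stand in for Tokio tasks. All names here are illustrative, not floxide API:

```rust
use std::sync::mpsc;
use std::thread;

/// Process `items` with at most `parallelism` workers running at once,
/// collecting a per-item Result so one failure doesn't abort the batch.
fn process_batch(items: Vec<i64>, parallelism: usize) -> Vec<Result<i64, String>> {
    let parallelism = parallelism.max(1); // guard against a zero-permit deadlock
    let (permit_tx, permit_rx) = mpsc::channel();
    // Seed the channel with `parallelism` permits, like Semaphore::new(parallelism).
    for _ in 0..parallelism {
        permit_tx.send(()).unwrap();
    }
    let mut handles = Vec::new();
    for item in items {
        permit_rx.recv().unwrap(); // acquire a permit before starting a worker
        let permit_tx = permit_tx.clone();
        handles.push(thread::spawn(move || {
            // Stand-in for running the per-item workflow: double the item,
            // treating negative inputs as item-level failures.
            let result = if item >= 0 {
                Ok(item * 2)
            } else {
                Err(format!("negative item: {}", item))
            };
            permit_tx.send(()).unwrap(); // release the permit for the next item
            result
        }));
    }
    // Join in spawn order so results line up with the input items.
    handles.into_iter().map(|h| h.join().unwrap()).collect()
}

fn main() {
    let results = process_batch(vec![1, -2, 3], 2);
    assert_eq!(results[0], Ok(2));
    assert!(results[1].is_err());
    assert_eq!(results[2], Ok(6));
    println!("{:?}", results);
}
```

Joining handles in spawn order is what lets results be written back to the original context aligned with the input items, as described in the notes above.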