Batch Processing Implementation¶
This document describes the batch processing implementation in the Floxide framework.
Overview¶
Batch processing in the Floxide framework enables efficient parallel execution of workflows on collections of items. This capability is essential for handling large datasets and maximizing throughput in workflow applications.
Core Concepts¶
The batch processing implementation is built around several key concepts:
- BatchContext: A specialized context that manages collections of items
- BatchNode: A node that can process items in parallel
- BatchFlow: A workflow orchestrator for batch processing
BatchContext¶
The BatchContext
trait defines how contexts that support batch operations should behave:
/// Trait for contexts that support batch processing
pub trait BatchContext<T> {
/// Get the items to process in batch
fn get_batch_items(&self) -> Result<Vec<T>, FloxideError>;
/// Create a context for a single item
fn create_item_context(&self, item: T) -> Result<Self, FloxideError> where Self: Sized;
/// Update the main context with results from item processing
fn update_with_results(&mut self, results: Vec<Result<T, FloxideError>>) -> Result<(), FloxideError>;
}
A concrete implementation of BatchContext
typically wraps a standard context and adds batch-specific functionality:
pub struct SimpleBatchContext<S, T> {
inner_context: Context<S>,
items: Vec<T>,
results: Vec<Result<T, FloxideError>>,
}
impl<S, T> BatchContext<T> for SimpleBatchContext<S, T>
where
S: Clone + Send + 'static,
T: Clone + Send + 'static,
{
fn get_batch_items(&self) -> Result<Vec<T>, FloxideError> {
Ok(self.items.clone())
}
fn create_item_context(&self, item: T) -> Result<Self, FloxideError> {
Ok(SimpleBatchContext {
inner_context: Context::new(self.inner_context.state().clone()),
items: vec![item],
results: Vec::new(),
})
}
fn update_with_results(&mut self, results: Vec<Result<T, FloxideError>>) -> Result<(), FloxideError> {
self.results = results;
Ok(())
}
}
BatchNode¶
The BatchNode
trait extends the standard Node
trait with batch processing capabilities:
pub trait BatchNode<T>: Node {
/// Process a batch of items concurrently
fn process_batch(&self, ctx: &mut Self::Context, concurrency: usize) -> Result<(), FloxideError>
where
Self::Context: BatchContext<T>,
{
let items = ctx.get_batch_items()?;
let processor = BatchProcessor::new(concurrency);
let results = processor.process_items(items, |item| {
let mut item_ctx = ctx.create_item_context(item)?;
self.exec(&mut item_ctx)?;
Ok(item)
})?;
ctx.update_with_results(results)?;
Ok(())
}
}
BatchProcessor¶
The BatchProcessor
handles the actual parallel execution of items:
pub struct BatchProcessor {
concurrency_limit: usize,
}
impl BatchProcessor {
pub fn new(concurrency_limit: usize) -> Self {
Self { concurrency_limit }
}
pub fn process_items<T, F>(&self, items: Vec<T>, processor: F) -> Result<Vec<Result<T, FloxideError>>, FloxideError>
where
F: Fn(T) -> Result<T, FloxideError> + Send + Sync + 'static,
T: Send + 'static,
{
let semaphore = Arc::new(Semaphore::new(self.concurrency_limit));
let processor = Arc::new(processor);
let handles: Vec<_> = items
.into_iter()
.map(|item| {
let semaphore = Arc::clone(&semaphore);
let processor = Arc::clone(&processor);
thread::spawn(move || {
let _permit = semaphore.acquire();
processor(item)
})
})
.collect();
let results = handles
.into_iter()
.map(|handle| match handle.join() {
Ok(result) => result,
Err(_) => Err(FloxideError::ThreadPanicked),
})
.collect();
Ok(results)
}
}
BatchFlow¶
The BatchFlow
provides a simplified API for batch processing workflows:
pub struct BatchFlow<T, N>
where
N: BatchNode<T>,
{
node: N,
concurrency: usize,
_phantom: PhantomData<T>,
}
impl<T, N> BatchFlow<T, N>
where
N: BatchNode<T>,
{
pub fn new(node: N, concurrency: usize) -> Self {
Self {
node,
concurrency,
_phantom: PhantomData,
}
}
pub fn execute<C>(&self, mut context: C) -> Result<C, FloxideError>
where
C: BatchContext<T>,
{
self.node.process_batch(&mut context, self.concurrency)?;
Ok(context)
}
}
Usage Example¶
Here's an example of using the batch processing capabilities:
// Define a state type
struct MyState {
processed_count: usize,
}
// Define an item type
struct MyItem {
id: usize,
value: String,
}
// Create a batch context
let state = MyState { processed_count: 0 };
let items = vec![
MyItem { id: 1, value: "item1".to_string() },
MyItem { id: 2, value: "item2".to_string() },
MyItem { id: 3, value: "item3".to_string() },
];
let context = SimpleBatchContext::new(state, items);
// Create a batch node
struct MyBatchNode;
impl Node for MyBatchNode {
type Context = SimpleBatchContext<MyState, MyItem>;
fn exec(&self, ctx: &mut Self::Context) -> NodeResult {
// Process a single item
let item = &ctx.get_batch_items()?[0];
println!("Processing item {}: {}", item.id, item.value);
ctx.inner_context.state_mut().processed_count += 1;
Ok(())
}
}
impl BatchNode<MyItem> for MyBatchNode {}
// Create and execute a batch flow
let batch_flow = BatchFlow::new(MyBatchNode, 4);
let result = batch_flow.execute(context);
Conclusion¶
The batch processing implementation in the Floxide framework provides a powerful and flexible approach to parallel execution of workflows. By leveraging Rust's concurrency features and the framework's core abstractions, it enables efficient processing of large datasets while maintaining type safety and proper error handling.
For more detailed information on the batch processing implementation, refer to the Batch Processing Implementation ADR.