Nodes¶
Nodes are the fundamental building blocks of workflows in Floxide. Each node represents a discrete unit of work that can be executed as part of a workflow. This page explains the different types of nodes and how to create and use them.
The Node Trait¶
At the core of Floxide is the Node
trait, which defines the interface for executing a node:
#[async_trait]
pub trait Node<C, A> {
async fn execute(&self, context: &mut C) -> Result<A, FloxideError>;
}
Where:
C
is the context type that the node operates onA
is the action type that the node returns
This simple interface allows for a wide variety of node implementations, from simple function-based nodes to complex stateful nodes.
LifecycleNode¶
The most common type of node in Floxide is the LifecycleNode
, which follows a three-phase lifecycle:
- Preparation (Prep): Extract data from the context
- Execution (Exec): Process the data
- Post-processing (Post): Update the context with the result and determine the next action
The LifecycleNode
trait is defined as:
#[async_trait]
pub trait LifecycleNode<C, A>: Send + Sync {
type PrepOutput: Send;
type ExecOutput: Send;
async fn prep(&self, context: &mut C) -> Result<Self::PrepOutput, FloxideError>;
async fn exec(&self, prep_output: Self::PrepOutput) -> Result<Self::ExecOutput, FloxideError>;
async fn post(
&self,
prep_output: Self::PrepOutput,
exec_output: Self::ExecOutput,
context: &mut C,
) -> Result<A, FloxideError>;
}
This trait allows for a clear separation of concerns between the different phases of node execution.
Creating Nodes¶
Using the lifecycle_node
Function¶
The easiest way to create a LifecycleNode
is using the lifecycle_node
function:
use floxide_core::{lifecycle_node, LifecycleNode, DefaultAction};
// Define your context type
#[derive(Debug, Clone)]
struct MyContext {
input: String,
result: Option<String>,
}
// Create a node
fn create_processor_node() -> impl LifecycleNode<MyContext, DefaultAction> {
lifecycle_node(
Some("processor"), // Node ID
|ctx: &mut MyContext| async move {
// Preparation phase: extract data
Ok(ctx.input.clone())
},
|input: String| async move {
// Execution phase: process data
Ok(input.to_uppercase())
},
|ctx: &mut MyContext, result: String| async move {
// Post-processing phase: update context
ctx.result = Some(result);
Ok(DefaultAction::Next)
},
)
}
Implementing the Trait Manually¶
For more complex nodes, you can implement the LifecycleNode
trait directly:
use std::sync::atomic::{AtomicUsize, Ordering};
struct CounterNode {
id: String,
counter: AtomicUsize,
}
#[async_trait]
impl LifecycleNode<MyContext, DefaultAction> for CounterNode {
type PrepOutput = usize;
type ExecOutput = usize;
async fn prep(&self, _context: &mut MyContext) -> Result<Self::PrepOutput, FloxideError> {
Ok(self.counter.load(Ordering::Relaxed))
}
async fn exec(&self, current: Self::PrepOutput) -> Result<Self::ExecOutput, FloxideError> {
let new_value = current + 1;
self.counter.store(new_value, Ordering::Relaxed);
Ok(new_value)
}
async fn post(
&self,
_prep: Self::PrepOutput,
exec: Self::ExecOutput,
context: &mut MyContext,
) -> Result<DefaultAction, FloxideError> {
context.result = Some(format!("Count: {}", exec));
Ok(DefaultAction::Next)
}
}
Node Types¶
Transform Node¶
Transform nodes are specialized for data transformation operations:
use floxide_transform::{transform_node, TransformNode};
fn create_transform_node() -> impl TransformNode<String, String> {
transform_node(|input: String| async move {
Ok(input.to_uppercase())
})
}
Batch Node¶
Batch nodes process collections of items concurrently:
use floxide_batch::{batch_node, BatchNode};
fn create_batch_node() -> impl BatchNode<Vec<String>, Vec<String>> {
batch_node(
10, // Concurrency limit
|item: String| async move {
Ok(item.to_uppercase())
}
)
}
Event Node¶
Event nodes handle asynchronous events:
use floxide_event::{event_node, EventNode};
fn create_event_node() -> impl EventNode<MyContext, DefaultAction> {
event_node(
|event: Event| async move {
match event {
Event::Message(msg) => Ok(msg.process()),
Event::Timeout => Ok(DefaultAction::Stop),
_ => Ok(DefaultAction::Next),
}
}
)
}
Best Practices¶
1. Error Handling¶
Always implement proper error handling in your nodes:
fn create_robust_node() -> impl LifecycleNode<MyContext, DefaultAction> {
lifecycle_node(
Some("robust_processor"),
|ctx: &mut MyContext| async move {
ctx.input.parse::<i32>()
.map_err(|e| FloxideError::new(format!("Invalid input: {}", e)))
},
|num: i32| async move {
if num < 0 {
Err(FloxideError::new("Negative numbers not allowed"))
} else {
Ok(num * 2)
}
},
|ctx: &mut MyContext, result: i32| async move {
ctx.result = Some(result.to_string());
Ok(DefaultAction::Next)
},
)
}
2. Resource Management¶
For nodes that manage resources, implement proper cleanup:
struct ResourceNode {
connection: Arc<Mutex<Connection>>,
}
impl ResourceNode {
fn new() -> Self {
Self {
connection: Arc::new(Mutex::new(Connection::new())),
}
}
}
#[async_trait]
impl LifecycleNode<MyContext, DefaultAction> for ResourceNode {
// ... implementation ...
async fn post(
&self,
_prep: Self::PrepOutput,
exec: Self::ExecOutput,
context: &mut MyContext,
) -> Result<DefaultAction, FloxideError> {
// Clean up resources
self.connection.lock().await.cleanup();
Ok(DefaultAction::Next)
}
}
3. Context Management¶
Keep context modifications focused and explicit:
fn create_focused_node() -> impl LifecycleNode<MyContext, DefaultAction> {
lifecycle_node(
Some("focused_processor"),
|ctx: &mut MyContext| async move {
// Only access what you need
Ok(ctx.input.clone())
},
|input: String| async move {
Ok(input.to_uppercase())
},
|ctx: &mut MyContext, result: String| async move {
// Only modify what you need
ctx.result = Some(result);
Ok(DefaultAction::Next)
},
)
}
4. Testing¶
Make your nodes testable:
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_processor_node() {
let node = create_processor_node();
let mut context = MyContext {
input: "test".to_string(),
result: None,
};
let result = node.execute(&mut context).await;
assert!(result.is_ok());
assert_eq!(context.result, Some("TEST".to_string()));
}
}
Common Patterns¶
1. Conditional Execution¶
fn create_conditional_node() -> impl LifecycleNode<MyContext, CustomAction> {
lifecycle_node(
Some("conditional"),
|ctx: &mut MyContext| async move {
Ok(ctx.input.clone())
},
|input: String| async move {
if input.is_empty() {
Ok(CustomAction::Skip)
} else {
Ok(CustomAction::Process(input))
}
},
|ctx: &mut MyContext, action: CustomAction| async move {
match action {
CustomAction::Process(result) => {
ctx.result = Some(result);
Ok(CustomAction::Next)
}
CustomAction::Skip => Ok(CustomAction::Skip),
}
},
)
}
2. State Management¶
struct StatefulNode {
state: Arc<RwLock<HashMap<String, String>>>,
}
impl StatefulNode {
fn new() -> Self {
Self {
state: Arc::new(RwLock::new(HashMap::new())),
}
}
}
#[async_trait]
impl LifecycleNode<MyContext, DefaultAction> for StatefulNode {
// ... implementation with state management ...
}
3. Retry Logic¶
fn create_retry_node() -> impl LifecycleNode<MyContext, DefaultAction> {
lifecycle_node(
Some("retry_processor"),
|ctx: &mut MyContext| async move {
Ok((ctx.input.clone(), 0)) // Include retry count
},
|input: (String, i32)| async move {
match process_with_retry(input.0, input.1).await {
Ok(result) => Ok(result),
Err(e) if input.1 < 3 => {
tokio::time::sleep(Duration::from_secs(1)).await;
Err(FloxideError::new("Retry"))
}
Err(e) => Err(e),
}
},
|ctx: &mut MyContext, result: String| async move {
ctx.result = Some(result);
Ok(DefaultAction::Next)
},
)
}
Next Steps¶
Now that you understand nodes, you can:
- Learn about Workflows to see how nodes work together
- Explore Actions for flow control
- Study Contexts for state management
- Check out the Examples section for more patterns
Specialized Node Types¶
Floxide provides several specialized node types for common workflow patterns:
TransformNode¶
A TransformNode
is a simplified node that transforms an input to an output without the need for the full lifecycle. It's useful for data transformation steps in a workflow.
use floxide_transform::{transform_node, TransformNode};
fn create_transform_node() -> impl TransformNode<String, String> {
transform_node(|input: String| async move {
Ok(input.to_uppercase())
})
}
BatchNode¶
A BatchNode
processes a collection of items in parallel, with configurable concurrency limits.
use floxide_batch::{batch_node, BatchNode};
fn create_batch_node() -> impl BatchNode<Vec<String>, Vec<String>> {
batch_node(
10, // Concurrency limit
|item: String| async move {
Ok(item.to_uppercase())
}
)
}
EventNode¶
An EventNode
responds to external events, allowing for event-driven workflows.
use floxide_event::{event_node, EventNode};
fn create_event_node() -> impl EventNode<String, String> {
event_node(
|event: String| async move {
Ok(format!("Processed event: {}", event))
}
)
}
TimerNode¶
A TimerNode
executes based on time schedules, supporting one-time, interval, and calendar-based scheduling.
use floxide_timer::{timer_node, TimerNode, TimerContext};
use std::time::Duration;
fn create_timer_node() -> impl TimerNode<(), String> {
timer_node(
Duration::from_secs(60), // Execute every 60 seconds
|_: ()| async move {
Ok("Timer executed".to_string())
}
)
}
ReactiveNode¶
A ReactiveNode
reacts to changes in external data sources, such as files, databases, or streams.
use floxide_reactive::{reactive_node, ReactiveNode, ReactiveContext};
fn create_reactive_node() -> impl ReactiveNode<String, String> {
reactive_node(
|change: String| async move {
Ok(format!("Reacted to change: {}", change))
}
)
}
LongRunningNode¶
A LongRunningNode
is designed for processes that can be suspended and resumed, with state persistence between executions.
use floxide_longrunning::{longrunning_node, LongRunningNode, LongRunningContext};
fn create_longrunning_node() -> impl LongRunningNode<String, String> {
longrunning_node(
|state: Option<String>, input: String| async move {
let current_state = state.unwrap_or_default();
let new_state = format!("{} + {}", current_state, input);
Ok((new_state.clone(), new_state))
}
)
}
Node Composition¶
Nodes can be composed to create more complex workflows. The most common way to compose nodes is to use the Workflow
struct, which we'll cover in the Workflows section.
Best Practices¶
When creating nodes, consider the following best practices:
- Keep nodes focused: Each node should have a single responsibility.
- Use appropriate node types: Choose the right node type for your use case.
- Handle errors gracefully: Properly handle errors in each phase of the node lifecycle.
- Consider performance: Be mindful of performance implications, especially for long-running or resource-intensive operations.
- Leverage type safety: Use Rust's type system to ensure type safety between nodes.
Next Steps¶
Now that you understand nodes, you can learn about: