Event-Driven Workflow Example¶
This document provides a complete example of using event-driven workflow capabilities in the Floxide framework.
Overview¶
Event-driven workflows allow you to build reactive systems that respond to external events in real-time. This example demonstrates how to create and use event-driven nodes to process events from various sources.
Prerequisites¶
Before running this example, ensure you have the following dependencies in your Cargo.toml
:
[dependencies]
floxide-core = "0.1.0"
floxide-transform = "0.1.0"
floxide-event = "0.1.0"
tokio = { version = "1.0", features = ["full"] }
chrono = { version = "0.4", features = ["serde"] }
Example Implementation¶
Step 1: Define Event Types¶
First, define the event types for your event-driven workflow:
use floxide_core::prelude::*;
use floxide_event::prelude::*;
use chrono::{DateTime, Utc};
use std::sync::Arc;
// Define a sensor event type
#[derive(Clone, Debug)]
struct SensorEvent {
id: String,
value: f64,
timestamp: DateTime<Utc>,
}
// Implement the Event trait for the sensor event
impl Event for SensorEvent {}
Step 2: Create Event Sources¶
Next, create event sources that will provide events to your workflow:
use tokio::sync::mpsc;
// Create channel-based event sources
let (sensor_tx, sensor_rx) = mpsc::channel::<SensorEvent>(100);
let sensor_source = ChannelEventSource::new(sensor_rx);
// Create a shared event source that can be used by multiple nodes
let shared_sensor_source = Arc::new(sensor_source);
Step 3: Implement Event-Driven Nodes¶
Now, implement event-driven nodes that will process the events:
// Create a sensor monitor node
struct SensorMonitorNode {
id: NodeId,
event_source: Arc<dyn EventSource<SensorEvent>>,
threshold: f64,
}
impl SensorMonitorNode {
fn new(id_str: &str, event_source: Arc<dyn EventSource<SensorEvent>>, threshold: f64) -> Self {
Self {
id: NodeId::from_string(id_str),
event_source,
threshold,
}
}
}
#[async_trait]
impl EventDrivenNode<SensorEvent> for SensorMonitorNode {
fn id(&self) -> NodeId {
self.id
}
async fn wait_for_event(&self) -> Result<SensorEvent, FloxideError> {
self.event_source.next_event().await
}
async fn process_event(&self, event: SensorEvent) -> Result<EventAction<SensorEvent>, FloxideError> {
println!("Processing sensor event: id={}, value={}", event.id, event.value);
// Route based on the sensor value
if event.value > self.threshold {
println!("High value detected, routing to alert handler");
Ok(EventAction::Route("alert_handler".to_string(), event))
} else {
println!("Normal value, routing to standard handler");
Ok(EventAction::Route("standard_handler".to_string(), event))
}
}
}
// Create an alert handler node
struct AlertHandlerNode {
id: NodeId,
}
impl AlertHandlerNode {
fn new(id_str: &str) -> Self {
Self {
id: NodeId::from_string(id_str),
}
}
}
#[async_trait]
impl EventDrivenNode<SensorEvent> for AlertHandlerNode {
fn id(&self) -> NodeId {
self.id
}
async fn wait_for_event(&self) -> Result<SensorEvent, FloxideError> {
// This node doesn't wait for events directly, it receives them from the workflow
// In a real implementation, you might want to add a timeout here
Err(FloxideError::EventSourceClosed)
}
async fn process_event(&self, event: SensorEvent) -> Result<EventAction<SensorEvent>, FloxideError> {
println!("ALERT: High sensor value detected: id={}, value={}", event.id, event.value);
// Send an alert notification (in a real system)
// ...
// Continue the workflow
Ok(EventAction::Route("logging".to_string(), event))
}
}
// Create a standard handler node
struct StandardHandlerNode {
id: NodeId,
}
impl StandardHandlerNode {
fn new(id_str: &str) -> Self {
Self {
id: NodeId::from_string(id_str),
}
}
}
#[async_trait]
impl EventDrivenNode<SensorEvent> for StandardHandlerNode {
fn id(&self) -> NodeId {
self.id
}
async fn wait_for_event(&self) -> Result<SensorEvent, FloxideError> {
// This node doesn't wait for events directly, it receives them from the workflow
Err(FloxideError::EventSourceClosed)
}
async fn process_event(&self, event: SensorEvent) -> Result<EventAction<SensorEvent>, FloxideError> {
println!("Standard processing for sensor: id={}, value={}", event.id, event.value);
// Process the event (in a real system)
// ...
// Continue the workflow
Ok(EventAction::Route("logging".to_string(), event))
}
}
// Create a logging node
struct LoggingNode {
id: NodeId,
}
impl LoggingNode {
fn new(id_str: &str) -> Self {
Self {
id: NodeId::from_string(id_str),
}
}
}
#[async_trait]
impl EventDrivenNode<SensorEvent> for LoggingNode {
fn id(&self) -> NodeId {
self.id
}
async fn wait_for_event(&self) -> Result<SensorEvent, FloxideError> {
// This node doesn't wait for events directly, it receives them from the workflow
Err(FloxideError::EventSourceClosed)
}
async fn process_event(&self, event: SensorEvent) -> Result<EventAction<SensorEvent>, FloxideError> {
println!("Logging event: id={}, value={}, timestamp={}",
event.id, event.value, event.timestamp);
// In a real system, you might log to a database or file
// ...
// Return to the monitor node to wait for the next event
Ok(EventAction::Route("monitor".to_string(), event))
}
}
Step 4: Create and Configure the Workflow¶
Now, create and configure the event-driven workflow:
// Create the event-driven workflow
let mut workflow = EventDrivenWorkflow::<SensorEvent>::new();
// Create the nodes
let monitor_node = SensorMonitorNode::new("monitor", shared_sensor_source, 100.0);
let alert_handler = AlertHandlerNode::new("alert_handler");
let standard_handler = StandardHandlerNode::new("standard_handler");
let logging_node = LoggingNode::new("logging");
// Add nodes to the workflow
let monitor_id = workflow.add_node(monitor_node);
let alert_id = workflow.add_node(alert_handler);
let standard_id = workflow.add_node(standard_handler);
let logging_id = workflow.add_node(logging_node);
// Configure routes
workflow.set_initial_node(monitor_id);
workflow.set_route("alert_handler", alert_id);
workflow.set_route("standard_handler", standard_id);
workflow.set_route("logging", logging_id);
workflow.set_route("monitor", monitor_id);
// Set a timeout for the workflow (optional)
workflow.set_timeout(std::time::Duration::from_secs(300)); // 5 minutes
Step 5: Run the Workflow and Send Events¶
Finally, run the workflow and send events to it:
#[tokio::main]
async fn main() -> Result<(), FloxideError> {
// Create and configure the workflow (as shown above)
// ...
// Run the workflow in a separate task
let workflow_handle = tokio::spawn(async move {
if let Err(e) = workflow.run().await {
eprintln!("Workflow error: {}", e);
}
});
// Send some test events
for i in 1..=5 {
let value = if i % 2 == 0 { 120.0 } else { 80.0 };
let event = SensorEvent {
id: format!("sensor-{}", i),
value,
timestamp: Utc::now(),
};
println!("Sending event: id={}, value={}", event.id, event.value);
sensor_tx.send(event).await.unwrap();
// Wait a bit between events
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
}
// Wait a bit for processing to complete
tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
// In a real application, you might wait for user input to terminate
// or use a signal handler to gracefully shut down
Ok(())
}
Running the Example¶
To run this example:
- Create a new Rust project with the dependencies listed above
- Copy the code into your
src/main.rs
file - Run the example with
cargo run
You should see output similar to:
Sending event: id=sensor-1, value=80.0
Processing sensor event: id=sensor-1, value=80.0
Normal value, routing to standard handler
Standard processing for sensor: id=sensor-1, value=80.0
Logging event: id=sensor-1, value=80.0, timestamp=2024-02-25T12:34:56.789012Z
Sending event: id=sensor-2, value=120.0
Processing sensor event: id=sensor-2, value=120.0
High value detected, routing to alert handler
ALERT: High sensor value detected: id=sensor-2, value=120.0
Logging event: id=sensor-2, value=120.0, timestamp=2024-02-25T12:34:57.789012Z
...
Advanced Techniques¶
Custom Event Sources¶
You can create custom event sources for different types of event producers:
// Create a WebSocket event source
struct WebSocketEventSource<E> {
// WebSocket connection details
// ...
_phantom: PhantomData<E>,
}
#[async_trait]
impl<E> EventSource<E> for WebSocketEventSource<E>
where
E: Event + Send + 'static,
for<'de> E: serde::Deserialize<'de>,
{
async fn next_event(&self) -> Result<E, FloxideError> {
// Wait for and parse the next WebSocket message
// ...
}
async fn has_more_events(&self) -> Result<bool, FloxideError> {
// Check if the WebSocket connection is still open
// ...
}
}
Event Filtering¶
You can add filtering capabilities to your event-driven nodes:
struct FilteredSensorNode {
id: NodeId,
event_source: Arc<dyn EventSource<SensorEvent>>,
filter: Box<dyn Fn(&SensorEvent) -> bool + Send + Sync>,
}
impl FilteredSensorNode {
fn new(
id_str: &str,
event_source: Arc<dyn EventSource<SensorEvent>>,
filter: impl Fn(&SensorEvent) -> bool + Send + Sync + 'static,
) -> Self {
Self {
id: NodeId::from_string(id_str),
event_source,
filter: Box::new(filter),
}
}
}
#[async_trait]
impl EventDrivenNode<SensorEvent> for FilteredSensorNode {
// ... implementation with filtering logic
}
// Usage
let temperature_filter = |event: &SensorEvent| event.id.starts_with("temp-");
let filtered_node = FilteredSensorNode::new("temp_monitor", shared_source, temperature_filter);
Conclusion¶
This example demonstrates how to use event-driven workflows in the Floxide framework to build reactive systems. By leveraging the EventDrivenNode
, EventSource
, and EventDrivenWorkflow
abstractions, you can create powerful and flexible event processing pipelines.
For more information on event-driven workflows, refer to the Event-Driven Workflow Pattern documentation.