floxide-event API Reference¶
This document provides a reference for the floxide-event
crate, which extends the Floxide framework with event-driven capabilities.
Overview¶
The floxide-event
crate provides extensions to the core Floxide framework for building event-driven workflows, including:
- Event-driven node abstractions
- Event source implementations
- Event-driven workflow orchestration
- Integration with standard workflows
Core Modules¶
Event Module¶
The Event module provides the core event abstractions:
use floxide_event::event::{Event, EventAction};
// Define a custom event type
#[derive(Clone)]
struct SensorEvent {
id: String,
value: f64,
}
// Implement the Event trait
impl Event for SensorEvent {}
EventDrivenNode Module¶
The EventDrivenNode module provides the core node abstractions for event-driven workflows:
use floxide_event::node::{EventDrivenNode, NodeId};
use async_trait::async_trait;
struct MyEventNode {
id: NodeId,
}
#[async_trait]
impl EventDrivenNode<SensorEvent> for MyEventNode {
fn id(&self) -> NodeId {
self.id
}
async fn wait_for_event(&self) -> Result<SensorEvent, FloxideError> {
// Wait for an event
// ...
}
async fn process_event(&self, event: SensorEvent) -> Result<EventAction<SensorEvent>, FloxideError> {
// Process the event
// ...
Ok(EventAction::Route("next_node".to_string(), event))
}
}
EventSource Module¶
The EventSource module provides abstractions and implementations for event sources:
use floxide_event::source::{EventSource, ChannelEventSource};
use tokio::sync::mpsc;
// Create a channel-based event source
let (tx, rx) = mpsc::channel::<SensorEvent>(100);
let source = ChannelEventSource::new(rx);
// Send an event
tx.send(SensorEvent { id: "sensor1".to_string(), value: 42.0 }).await?;
Workflow Module¶
The Workflow module provides the event-driven workflow orchestration:
use floxide_event::workflow::EventDrivenWorkflow;
// Create an event-driven workflow
let mut workflow = EventDrivenWorkflow::<SensorEvent>::new();
// Add nodes and configure routes
let node_id = workflow.add_node(MyEventNode { id: NodeId::new() });
workflow.set_initial_node(node_id);
workflow.set_route("next_node", node_id); // Loop back to the same node
// Run the workflow
workflow.run().await?;
Key Types¶
Event Trait¶
The core trait for event types:
pub trait Event: Clone + Send + 'static {}
EventDrivenNode Trait¶
The core trait for event-driven nodes:
#[async_trait]
pub trait EventDrivenNode<E>: Send + Sync + 'static
where
E: Event + Send + 'static,
{
/// Get the node's unique identifier
fn id(&self) -> NodeId;
/// Wait for an event to arrive
async fn wait_for_event(&self) -> Result<E, FloxideError>;
/// Process an event and return an action
async fn process_event(&self, event: E) -> Result<EventAction<E>, FloxideError>;
}
EventSource Trait¶
The core trait for event sources:
#[async_trait]
pub trait EventSource<E>: Send + Sync + 'static
where
E: Event + Send + 'static,
{
/// Get the next event from the source
async fn next_event(&self) -> Result<E, FloxideError>;
/// Check if the source has more events
async fn has_more_events(&self) -> Result<bool, FloxideError>;
}
EventAction Enum¶
The action returned by event-driven nodes:
pub enum EventAction<E>
where
E: Event + Send + 'static,
{
/// Route the event to another node
Route(String, E),
/// Terminate the workflow
Terminate,
}
EventDrivenWorkflow¶
The workflow orchestrator for event-driven nodes:
pub struct EventDrivenWorkflow<E>
where
E: Event + Send + Clone + 'static,
{
nodes: HashMap<NodeId, Box<dyn EventDrivenNode<E>>>,
routes: HashMap<String, NodeId>,
initial_node: NodeId,
timeout: Option<Duration>,
}
Usage Examples¶
Basic Event-Driven Workflow¶
use floxide_core::prelude::*;
use floxide_event::prelude::*;
use async_trait::async_trait;
use std::sync::Arc;
use tokio::sync::mpsc;
// Define an event type
#[derive(Clone)]
struct SensorEvent {
id: String,
value: f64,
}
impl Event for SensorEvent {}
// Create an event source
let (tx, rx) = mpsc::channel::<SensorEvent>(100);
let source = Arc::new(ChannelEventSource::new(rx));
// Create an event-driven node
struct SensorNode {
id: NodeId,
source: Arc<dyn EventSource<SensorEvent>>,
}
#[async_trait]
impl EventDrivenNode<SensorEvent> for SensorNode {
fn id(&self) -> NodeId {
self.id
}
async fn wait_for_event(&self) -> Result<SensorEvent, FloxideError> {
self.source.next_event().await
}
async fn process_event(&self, event: SensorEvent) -> Result<EventAction<SensorEvent>, FloxideError> {
println!("Sensor {}: {}", event.id, event.value);
if event.value > 100.0 {
Ok(EventAction::Terminate)
} else {
Ok(EventAction::Route("self".to_string(), event))
}
}
}
// Create and run the workflow
#[tokio::main]
async fn main() -> Result<(), FloxideError> {
let mut workflow = EventDrivenWorkflow::<SensorEvent>::new();
let node = SensorNode {
id: NodeId::new(),
source: Arc::clone(&source),
};
let node_id = workflow.add_node(node);
workflow.set_initial_node(node_id);
workflow.set_route("self", node_id);
// Run the workflow in a separate task
let workflow_handle = tokio::spawn(async move {
workflow.run().await
});
// Send events
tx.send(SensorEvent { id: "sensor1".to_string(), value: 42.0 }).await?;
tx.send(SensorEvent { id: "sensor1".to_string(), value: 75.0 }).await?;
tx.send(SensorEvent { id: "sensor1".to_string(), value: 120.0 }).await?;
// Wait for the workflow to complete
workflow_handle.await??;
Ok(())
}
Integration with Standard Workflows¶
use floxide_core::prelude::*;
use floxide_event::prelude::*;
use floxide_event::adapter::EventDrivenNodeAdapter;
// Create an event-driven node adapter
let event_node = SensorNode {
id: NodeId::new(),
source: Arc::clone(&source),
};
let adapter = EventDrivenNodeAdapter::new(event_node);
// Use the adapter in a standard workflow
let mut workflow = Workflow::new();
workflow.add_node("event_node", adapter);
workflow.set_initial_node("event_node");
// Run the standard workflow
workflow.run(&mut context).await?;
Best Practices¶
When using the floxide-event
crate, consider these best practices:
- Event Design: Design event types to be small, focused, and immutable.
- Error Handling: Properly handle errors in event processing, especially for external event sources.
- Timeouts: Use timeouts to prevent workflows from hanging indefinitely.
- Resource Management: Ensure event sources are properly closed when no longer needed.
- Backpressure: Implement backpressure mechanisms for high-volume event sources.