floxide-reactive API Reference

The floxide-reactive crate provides support for reactive patterns in the Floxide framework, enabling nodes to respond to changes in external data sources using a stream-based approach.

Core Types

ReactiveNode

#[async_trait]
pub trait ReactiveNode<Change, Context, Action>: Send + Sync
where
    Change: Send + 'static,
    Context: Send + Sync + 'static,
    Action: ActionType + Send + Sync + 'static + Debug,
{
    /// Set up a stream of changes to watch
    async fn watch(&self) -> Result<impl Stream<Item = Change> + Send, FloxideError>;

    /// React to a detected change
    async fn react_to_change(
        &self,
        change: Change,
        context: &mut Context,
    ) -> Result<Action, FloxideError>;

    /// Get the node's unique identifier
    fn id(&self) -> NodeId;
}

The ReactiveNode trait is the core abstraction for reactive patterns, providing:

- Stream-based change detection
- Context-aware change handling
- Integration with the workflow engine
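The shape of the trait, a source of changes plus a handler that maps each change to an action, can be illustrated without the async machinery. What follows is a simplified, synchronous sketch built only on the standard library. `SimpleReactive`, `CounterNode`, `Action`, and `drive` are hypothetical names chosen for illustration and are not part of floxide-reactive.

```rust
use std::sync::mpsc::{channel, Receiver};

/// Hypothetical stand-in for an action type; not the crate's `ActionType`.
#[derive(Debug, PartialEq, Clone, Copy)]
pub enum Action {
    Next,
    Stop,
}

/// Synchronous analogue of a reactive node: `watch` yields a receiver of
/// changes, `react_to_change` maps one change to an action.
pub trait SimpleReactive<Change, Context> {
    fn watch(&self) -> Receiver<Change>;
    fn react_to_change(&self, change: Change, context: &mut Context) -> Action;
}

/// Example node: emits a fixed list of numbers and sums them into the context.
pub struct CounterNode {
    pub values: Vec<i64>,
}

impl SimpleReactive<i64, i64> for CounterNode {
    fn watch(&self) -> Receiver<i64> {
        let (tx, rx) = channel();
        for v in &self.values {
            // Sending cannot fail here because `rx` is still alive.
            tx.send(*v).expect("receiver alive");
        }
        // `tx` is dropped on return, so the receiver ends after the last value.
        rx
    }

    fn react_to_change(&self, change: i64, context: &mut i64) -> Action {
        if change < 0 {
            // In this sketch a negative value asks the driver to stop.
            return Action::Stop;
        }
        *context += change;
        Action::Next
    }
}

/// Drives a node until its change source closes or it returns `Action::Stop`.
pub fn drive<C, X, N: SimpleReactive<C, X>>(node: &N, context: &mut X) -> Action {
    let mut last = Action::Next;
    for change in node.watch() {
        last = node.react_to_change(change, context);
        if last == Action::Stop {
            break;
        }
    }
    last
}
```

Driving a `CounterNode` over `[1, 2, 3]` with a context starting at zero leaves the context at 6. The real trait differs in that `watch` returns an asynchronous stream and both methods can fail with `FloxideError`.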

ReactiveError

pub enum ReactiveError {
    WatchError(String),
    StreamClosed,
    ConnectionError(String),
    ResourceNotFound(String),
}

Specialized error types for reactive operations, handling:

- Resource watching failures
- Stream lifecycle issues
- Connection problems
- Resource availability
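The listing shows only the variants, not any trait implementations. As a sketch of how such an enum is commonly made usable as an error value, the block below re-declares it locally and adds `Display` and `Error` implementations. The message wording and the `is_transient` helper are assumptions for illustration, not the crate's behavior.

```rust
use std::fmt;

/// Local copy of the enum from the listing, so the sketch compiles on its own.
#[derive(Debug)]
pub enum ReactiveError {
    WatchError(String),
    StreamClosed,
    ConnectionError(String),
    ResourceNotFound(String),
}

impl fmt::Display for ReactiveError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            ReactiveError::WatchError(msg) => write!(f, "watch error: {}", msg),
            ReactiveError::StreamClosed => write!(f, "stream closed"),
            ReactiveError::ConnectionError(msg) => write!(f, "connection error: {}", msg),
            ReactiveError::ResourceNotFound(name) => write!(f, "resource not found: {}", name),
        }
    }
}

impl std::error::Error for ReactiveError {}

/// Hypothetical helper: treats only connection problems as worth retrying.
/// Which variants are actually transient depends on the application.
pub fn is_transient(err: &ReactiveError) -> bool {
    matches!(err, ReactiveError::ConnectionError(_))
}
```

A caller can then branch on `is_transient` to decide between retrying and giving up, and use `to_string()` when logging.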

Built-in Implementations

FileWatcherNode

pub struct FileWatcherNode<Context, Action>
where
    Context: Send + Sync + 'static,
    Action: ActionType + Send + Sync + 'static,
{
    // ...
}

A specialized node for file system monitoring:

- File modification detection
- Configurable polling intervals
- Custom change handlers
- Metadata tracking
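How FileWatcherNode detects modifications internally is not shown here. One common way to implement polling-based detection with metadata tracking is to compare a snapshot of the file's size and modification time between polls. The sketch below does that with the standard library only; `Snapshot`, `PollResult`, and `poll_once` are hypothetical names.

```rust
use std::fs;
use std::io;
use std::path::Path;
use std::time::SystemTime;

/// Hypothetical snapshot of the metadata a poller might track.
#[derive(Debug, Clone, PartialEq)]
pub struct Snapshot {
    pub len: u64,
    pub modified: SystemTime,
}

/// Hypothetical outcome of a single poll.
#[derive(Debug, PartialEq)]
pub enum PollResult {
    Unchanged,
    Modified,
    NotFound,
}

/// Reads the size and modification time of `path`.
pub fn snapshot(path: &Path) -> io::Result<Snapshot> {
    let meta = fs::metadata(path)?;
    Ok(Snapshot {
        len: meta.len(),
        modified: meta.modified()?,
    })
}

/// Compares the file's current metadata with the previous snapshot and
/// updates the snapshot in place. The first successful poll after a missing
/// snapshot reports `Modified`, since there is nothing to compare against.
pub fn poll_once(path: &Path, last: &mut Option<Snapshot>) -> PollResult {
    match snapshot(path) {
        Ok(current) => {
            let changed = last.as_ref() != Some(&current);
            *last = Some(current);
            if changed {
                PollResult::Modified
            } else {
                PollResult::Unchanged
            }
        }
        // Any metadata error is treated as "missing" in this sketch; a real
        // watcher would likely distinguish permission errors from absence.
        Err(_) => {
            *last = None;
            PollResult::NotFound
        }
    }
}
```

A watcher loop would call `poll_once` repeatedly, sleeping for the configured poll interval between calls. Note that two writes within the file system's timestamp resolution that leave the size unchanged would go unnoticed by this approach.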

CustomReactiveNode

pub struct CustomReactiveNode<Change, Context, Action, WatchFn, ReactFn>
where
    Change: Send + Sync + 'static,
    Context: Send + Sync + 'static,
    Action: ActionType + Send + Sync + 'static,
{
    // ...
}

A flexible implementation for custom reactive patterns:

- User-defined watch functions
- Custom change reaction logic
- Configurable node identity

Stream Management

ReactiveNodeAdapter

pub struct ReactiveNodeAdapter<R, Change, Context, Action>
where
    R: ReactiveNode<Change, Context, Action>,
{
    // ...
}

Adapts reactive nodes to the standard Node interface:

- Buffering with backpressure handling
- Stream lifecycle management
- Background processing

Buffering and Backpressure

The implementation includes configurable buffering to handle rapid changes:

// Configure buffer size
let node = reactive_node
    .with_buffer_size(100)
    .with_backoff(Duration::from_millis(100));
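The crate's buffering internals are not shown, but the underlying idea of a bounded buffer that pushes back on a fast producer can be sketched with the standard library's `sync_channel`. In the block below, `offer_all` is a hypothetical helper: it offers each change with `try_send` and counts how many were rejected because the buffer was full.

```rust
use std::sync::mpsc::{sync_channel, Receiver, TrySendError};

/// Offers every change to a bounded buffer of size `capacity`.
/// Returns the receiver plus the number of accepted and rejected changes.
pub fn offer_all(changes: &[u32], capacity: usize) -> (Receiver<u32>, usize, usize) {
    let (tx, rx) = sync_channel(capacity);
    let mut accepted = 0;
    let mut rejected = 0;
    for &change in changes {
        match tx.try_send(change) {
            Ok(()) => accepted += 1,
            // Buffer full: the producer learns it is ahead of the consumer.
            Err(TrySendError::Full(_)) => rejected += 1,
            // Receiver dropped: nothing can consume further changes.
            Err(TrySendError::Disconnected(_)) => break,
        }
    }
    // `tx` is dropped here, so the receiver ends once the buffer is drained.
    (rx, accepted, rejected)
}
```

With five changes and a capacity of three, nothing is consumed while the producer runs, so three changes are buffered and two are rejected. A producer that should wait rather than drop changes would use the blocking `send` instead, or sleep for a backoff interval and retry the rejected change.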

Usage Examples

File Watching

use std::time::Duration;

use floxide_reactive::{FileWatcherNode, FileChange};

let watcher = FileWatcherNode::new("config.toml")
    .with_poll_interval(Duration::from_secs(5))
    .with_change_handler(|change: FileChange, ctx: &mut Context| async move {
        match change {
            FileChange::Modified(path) => {
                println!("File modified: {}", path.display());
                Ok(DefaultAction::Next)
            }
            FileChange::NotFound => {
                println!("File not found");
                Ok(DefaultAction::Stop)
            }
        }
    });

let mut workflow = Workflow::new(watcher);
workflow.run(context).await?;

Custom Reactive Pattern

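use floxide_reactive::CustomReactiveNode;
use futures::StreamExt; // provides .boxed()
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;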

let node = CustomReactiveNode::new(
    // Watch function returns a stream of changes
    || async {
        let (tx, rx) = mpsc::channel(100);
        // Set up change detection...
        Ok(ReceiverStream::new(rx).boxed())
    },
    // React function handles changes
    |change, ctx| async move {
        println!("Processing change: {:?}", change);
        Ok(DefaultAction::Next)
    }
);

Best Practices

  1. Stream Management
     - Configure appropriate buffer sizes based on change frequency
     - Implement proper backpressure handling
     - Clean up resources when streams are dropped
     - Handle stream closure gracefully

  2. Error Handling
     - Handle transient failures with retries
     - Provide clear error context
     - Clean up resources on error
     - Log relevant error details

  3. Resource Usage
     - Monitor memory usage with large streams
     - Use appropriate polling intervals
     - Implement proper cleanup
     - Consider system resource limits

  4. Testing
     - Test stream handling
     - Verify error scenarios
     - Check resource cleanup
     - Test backpressure handling
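As an illustration of the advice to handle transient failures with retries, the following sketch retries a fallible operation with exponential backoff. `retry_with_backoff` is a hypothetical helper written against the standard library only; it blocks the calling thread while waiting, so an async workflow would substitute a non-blocking sleep.

```rust
use std::thread;
use std::time::Duration;

/// Calls `op` until it succeeds or `max_attempts` is reached, doubling the
/// delay after each failure. `op` receives the 1-based attempt number and is
/// always called at least once.
pub fn retry_with_backoff<T, E>(
    max_attempts: u32,
    initial_delay: Duration,
    mut op: impl FnMut(u32) -> Result<T, E>,
) -> Result<T, E> {
    let mut delay = initial_delay;
    let mut attempt = 1;
    loop {
        match op(attempt) {
            Ok(value) => return Ok(value),
            // Out of attempts: surface the last error to the caller.
            Err(err) if attempt >= max_attempts => return Err(err),
            Err(_) => {
                thread::sleep(delay);
                delay = delay.saturating_mul(2);
                attempt += 1;
            }
        }
    }
}
```

An operation that fails twice and then succeeds is called three times in total. In practice the closure would return an error only for failures considered transient and the caller would log the final error with enough context to diagnose it.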

See Also