Reactive Node Example

This example demonstrates how to use reactive nodes in the Floxide framework for building event-driven, reactive workflows.

Overview

Reactive nodes enable: - Stream processing - Event-driven workflows - Real-time data processing - Backpressure handling

Implementation

Let's create a reactive workflow that processes a stream of temperature readings:

use floxide_core::{ActionType, DefaultAction, FloxideError, NodeId};
use floxide_reactive::{CustomReactiveNode, ReactiveNode};
use futures::stream::{self, Stream};
use std::time::{Duration, Instant};
use tokio::time;

// Define our data types
#[derive(Debug, Clone)]
struct Temperature {
    celsius: f64,
    timestamp: Instant,
}

#[derive(Debug, Clone)]
struct Alert {
    message: String,
    level: AlertLevel,
    timestamp: Instant,
}

#[derive(Debug, Clone)]
enum AlertLevel {
    Info,
    Warning,
    Critical,
}

// Define our context
#[derive(Debug, Clone)]
struct MonitoringContext {
    temperatures: Vec<Temperature>,
    alerts: Vec<Alert>,
}

// Create a temperature monitoring node
fn create_temperature_monitor() -> impl ReactiveNode<Temperature, MonitoringContext, DefaultAction> {
    CustomReactiveNode::new(
        // Watch function that creates a stream of temperature readings
        || {
            let stream = stream::unfold(0, |state| async move {
                time::sleep(Duration::from_secs(1)).await;
                let temp = Temperature {
                    celsius: 20.0 + (state as f64 / 10.0),
                    timestamp: Instant::now(),
                };
                Some((temp, state + 1))
            });
            Ok(Box::new(stream))
        },
        // React function that processes each temperature reading
        |temp: Temperature, ctx: &mut MonitoringContext| {
            ctx.temperatures.push(temp.clone());
            
            let alert = match temp.celsius {
                t if t > 25.0 => Some(Alert {
                    message: format!("Critical temperature: {:.1}°C", t),
                    level: AlertLevel::Critical,
                    timestamp: temp.timestamp,
                }),
                t if t > 22.0 => Some(Alert {
                    message: format!("Warning temperature: {:.1}°C", t),
                    level: AlertLevel::Warning,
                    timestamp: temp.timestamp,
                }),
                _ => None,
            };

            if let Some(alert) = alert {
                ctx.alerts.push(alert);
                Ok(DefaultAction::change_detected())
            } else {
                Ok(DefaultAction::no_change())
            }
        },
    )
}

// Example usage
#[tokio::main]
async fn main() -> Result<(), FloxideError> {
    let mut ctx = MonitoringContext {
        temperatures: Vec::new(),
        alerts: Vec::new(),
    };

    let monitor = create_temperature_monitor();
    
    // Set up the watch stream with backpressure handling
    let mut stream = monitor.watch().await?;
    
    // Process temperature readings with proper error handling
    while let Some(temp) = stream.next().await {
        match monitor.react_to_change(temp, &mut ctx).await {
            Ok(action) if action.is_change_detected() => {
                println!("Alert triggered! Total alerts: {}", ctx.alerts.len());
            }
            Ok(_) => {
                println!("Temperature normal: {:.1}°C", ctx.temperatures.last().unwrap().celsius);
            }
            Err(e) => {
                eprintln!("Error processing temperature: {}", e);
                // Implement your error recovery strategy here
            }
        }
    }

    Ok(())
}

Running the Example

Here's how to run the reactive nodes:

#[tokio::main]
async fn main() -> Result<(), FloxideError> {
    // Create the reactive stream
    let (temperature_stream, temperature_sender) = ReactiveStream::new();

    // Create the nodes
    let monitor = create_temperature_monitor();
    let processor = create_alert_processor();

    // Create the workflow
    let workflow = temperature_stream
        .through(monitor)
        .through(processor)
        .for_each(|message| println!("{}", message));

    // Send some test data
    temperature_sender.send(Temperature {
        celsius: 24.0,
        timestamp: Instant::now(),
    }).await;

    temperature_sender.send(Temperature {
        celsius: 26.0,
        timestamp: Instant::now(),
    }).await;

    temperature_sender.send(Temperature {
        celsius: 31.0,
        timestamp: Instant::now(),
    }).await;
}

Advanced Usage

Backpressure Handling

// Create a node with backpressure handling
fn create_throttled_monitor() -> impl ReactiveNode<Temperature, Alert> {
    reactive_node(|temp: Temperature| async move {
        // ... processing logic ...
    })
    .with_buffer(100) // Buffer up to 100 items
    .with_backpressure(BackpressureStrategy::DropOldest)
}

Error Handling

// Create a node with error recovery
fn create_robust_monitor() -> impl ReactiveNode<Temperature, Alert> {
    reactive_node(|temp: Temperature| async move {
        if temp.celsius < -273.15 {
            return Err(FloxideError::new("Invalid temperature"));
        }
        // ... normal processing ...
    })
    .with_retry(3)
    .with_error_handler(|e| {
        println!("Error processing temperature: {}", e);
        None // Skip invalid temperatures
    })
}

Combining Streams

// Combine multiple temperature streams
let combined = ReactiveStream::combine(
    vec![temp_stream1, temp_stream2, temp_stream3],
    |temps: Vec<Temperature>| {
        // Process multiple temperature readings
        let avg = temps.iter().map(|t| t.celsius).sum::<f64>() / temps.len() as f64;
        Temperature {
            celsius: avg,
            timestamp: Instant::now(),
        }
    }
);

Best Practices

  1. Stream Management
  2. Handle backpressure appropriately
  3. Clean up resources when streams are dropped
  4. Use appropriate buffer sizes

  5. Error Handling

  6. Implement proper error recovery
  7. Handle stream completion
  8. Log errors for debugging

  9. Testing

  10. Test stream processing logic
  11. Verify backpressure handling
  12. Test error scenarios

See Also