use std::{sync::Arc, time::Duration}; use robotnik::{event::Event, event_manager::EventManager}; use rstest::rstest; use tokio::{ io::{AsyncBufReadExt, BufReader}, net::UnixStream, time::timeout, }; const TEST_SOCKET_BASE: &str = "/tmp/robotnik_test"; /// Helper to create unique socket paths for parallel tests fn test_socket_path(name: &str) -> String { format!("{}_{}_{}", TEST_SOCKET_BASE, name, std::process::id()) } /// Helper to read one JSON event from a stream async fn read_event( reader: &mut BufReader, ) -> Result> { let mut line = String::new(); reader.read_line(&mut line).await?; let event: Event = serde_json::from_str(&line)?; Ok(event) } /// Helper to read all available events with a timeout async fn read_events_with_timeout( reader: &mut BufReader, max_count: usize, timeout_ms: u64, ) -> Vec { let mut events = Vec::new(); for _ in 0..max_count { let mut line = String::new(); match timeout( Duration::from_millis(timeout_ms), reader.read_line(&mut line), ) .await { Ok(Ok(0)) => break, // EOF Ok(Ok(_)) => events.push(line), Ok(Err(_)) => break, // Read error Err(_) => break, // Timeout } } events } #[tokio::test] async fn test_client_connects_and_receives_event() { let socket_path = test_socket_path("basic_connect"); let manager = Arc::new(EventManager::new().unwrap()); // Start the listener let listener_manager = Arc::clone(&manager); let socket_path_clone = socket_path.clone(); tokio::spawn(async move { listener_manager.start_listening(socket_path_clone).await; }); // Give the listener time to start tokio::time::sleep(Duration::from_millis(50)).await; // Broadcast an event let event = Event::new("test message"); manager.broadcast(&event).await.unwrap(); // Connect as a client let stream = UnixStream::connect(&socket_path).await.unwrap(); let mut reader = BufReader::new(stream); // Read the event let mut line = String::new(); reader.read_line(&mut line).await.unwrap(); assert!(line.contains("test message")); assert!(line.ends_with('\n')); // Cleanup let _ = std::fs::remove_file(&socket_path); } #[tokio::test] async fn test_client_receives_event_history() { let socket_path = test_socket_path("event_history"); let manager = Arc::new(EventManager::new().unwrap()); // Broadcast events BEFORE starting the listener for i in 0..5 { let event = Event::new(format!("historical event {}", i)); manager.broadcast(&event).await.unwrap(); } // Start the listener let listener_manager = Arc::clone(&manager); let socket_path_clone = socket_path.clone(); tokio::spawn(async move { listener_manager.start_listening(socket_path_clone).await; }); tokio::time::sleep(Duration::from_millis(50)).await; // Connect as a client let stream = UnixStream::connect(&socket_path).await.unwrap(); let mut reader = BufReader::new(stream); // Should receive all 5 historical events let events = read_events_with_timeout(&mut reader, 5, 100).await; assert_eq!(events.len(), 5); assert!(events[0].contains("historical event 0")); assert!(events[4].contains("historical event 4")); // Cleanup let _ = std::fs::remove_file(&socket_path); } #[tokio::test] async fn test_multiple_clients_receive_same_events() { let socket_path = test_socket_path("multiple_clients"); let manager = Arc::new(EventManager::new().unwrap()); // Start the listener let listener_manager = Arc::clone(&manager); let socket_path_clone = socket_path.clone(); tokio::spawn(async move { listener_manager.start_listening(socket_path_clone).await; }); tokio::time::sleep(Duration::from_millis(50)).await; // Connect 3 clients let stream1 = UnixStream::connect(&socket_path).await.unwrap(); let stream2 = UnixStream::connect(&socket_path).await.unwrap(); let stream3 = UnixStream::connect(&socket_path).await.unwrap(); let mut reader1 = BufReader::new(stream1); let mut reader2 = BufReader::new(stream2); let mut reader3 = BufReader::new(stream3); // Broadcast a new event let event = Event::new("broadcast to all"); manager.broadcast(&event).await.unwrap(); // All clients should receive the event let mut line1 = String::new(); let mut line2 = String::new(); let mut line3 = String::new(); timeout(Duration::from_millis(100), reader1.read_line(&mut line1)) .await .unwrap() .unwrap(); timeout(Duration::from_millis(100), reader2.read_line(&mut line2)) .await .unwrap() .unwrap(); timeout(Duration::from_millis(100), reader3.read_line(&mut line3)) .await .unwrap() .unwrap(); assert!(line1.contains("broadcast to all")); assert!(line2.contains("broadcast to all")); assert!(line3.contains("broadcast to all")); // Cleanup let _ = std::fs::remove_file(&socket_path); } #[tokio::test] async fn test_late_joiner_receives_full_history() { let socket_path = test_socket_path("late_joiner"); let manager = Arc::new(EventManager::new().unwrap()); // Start the listener let listener_manager = Arc::clone(&manager); let socket_path_clone = socket_path.clone(); tokio::spawn(async move { listener_manager.start_listening(socket_path_clone).await; }); tokio::time::sleep(Duration::from_millis(50)).await; // First client connects let stream1 = UnixStream::connect(&socket_path).await.unwrap(); let mut reader1 = BufReader::new(stream1); // Broadcast several events for i in 0..10 { let event = Event::new(format!("event {}", i)); manager.broadcast(&event).await.unwrap(); } // Consume events from first client let _ = read_events_with_timeout(&mut reader1, 10, 100).await; // Late joiner connects let stream2 = UnixStream::connect(&socket_path).await.unwrap(); let mut reader2 = BufReader::new(stream2); // Late joiner should receive all 10 events from history let events = read_events_with_timeout(&mut reader2, 10, 100).await; assert_eq!(events.len(), 10); assert!(events[0].contains("event 0")); assert!(events[9].contains("event 9")); // Cleanup let _ = std::fs::remove_file(&socket_path); } #[tokio::test] async fn test_client_receives_events_in_order() { let socket_path = test_socket_path("event_order"); let manager = Arc::new(EventManager::new().unwrap()); // Start the listener let listener_manager = Arc::clone(&manager); let socket_path_clone = socket_path.clone(); tokio::spawn(async move { listener_manager.start_listening(socket_path_clone).await; }); tokio::time::sleep(Duration::from_millis(50)).await; // Connect client let stream = UnixStream::connect(&socket_path).await.unwrap(); let mut reader = BufReader::new(stream); // Broadcast events rapidly let count = 50; for i in 0..count { let event = Event::new(format!("sequence {}", i)); manager.broadcast(&event).await.unwrap(); } // Read all events let events = read_events_with_timeout(&mut reader, count, 500).await; assert_eq!(events.len(), count); // Verify order for (i, event) in events.iter().enumerate() { assert!( event.contains(&format!("sequence {}", i)), "Event {} out of order: {}", i, event ); } // Cleanup let _ = std::fs::remove_file(&socket_path); } #[tokio::test] async fn test_concurrent_broadcasts_during_client_connections() { let socket_path = test_socket_path("concurrent_ops"); let manager = Arc::new(EventManager::new().unwrap()); // Start the listener let listener_manager = Arc::clone(&manager); let socket_path_clone = socket_path.clone(); tokio::spawn(async move { listener_manager.start_listening(socket_path_clone).await; }); tokio::time::sleep(Duration::from_millis(50)).await; // Connect client 1 BEFORE any broadcasts let stream1 = UnixStream::connect(&socket_path).await.unwrap(); let mut reader1 = BufReader::new(stream1); // Spawn a task that continuously broadcasts let broadcast_manager = Arc::clone(&manager); let broadcast_handle = tokio::spawn(async move { for i in 0..100 { let event = Event::new(format!("concurrent event {}", i)); broadcast_manager.broadcast(&event).await.unwrap(); tokio::time::sleep(Duration::from_millis(5)).await; } }); // While broadcasting, connect more clients at different times tokio::time::sleep(Duration::from_millis(100)).await; let stream2 = UnixStream::connect(&socket_path).await.unwrap(); let mut reader2 = BufReader::new(stream2); tokio::time::sleep(Duration::from_millis(150)).await; let stream3 = UnixStream::connect(&socket_path).await.unwrap(); let mut reader3 = BufReader::new(stream3); // Wait for broadcasts to complete broadcast_handle.await.unwrap(); // All clients should have received events let events1 = read_events_with_timeout(&mut reader1, 100, 200).await; let events2 = read_events_with_timeout(&mut reader2, 100, 200).await; let events3 = read_events_with_timeout(&mut reader3, 100, 200).await; // Client 1 connected first (before any broadcasts), should get all 100 assert_eq!(events1.len(), 100); // Client 2 connected after ~20 events were broadcast // Gets ~20 from history + ~80 live = 100 assert_eq!(events2.len(), 100); // Client 3 connected after ~50 events were broadcast // Gets ~50 from history + ~50 live = 100 assert_eq!(events3.len(), 100); // Verify they all received events in order assert!(events1[0].contains("concurrent event 0")); assert!(events2[0].contains("concurrent event 0")); assert!(events3[0].contains("concurrent event 0")); // Cleanup let _ = std::fs::remove_file(&socket_path); } #[tokio::test] async fn test_buffer_overflow_affects_new_clients() { let socket_path = test_socket_path("buffer_overflow"); let manager = Arc::new(EventManager::new().unwrap()); // Broadcast more than buffer max (1000) for i in 0..1100 { let event = Event::new(format!("overflow event {}", i)); manager.broadcast(&event).await.unwrap(); } // Start the listener let listener_manager = Arc::clone(&manager); let socket_path_clone = socket_path.clone(); tokio::spawn(async move { listener_manager.start_listening(socket_path_clone).await; }); tokio::time::sleep(Duration::from_millis(50)).await; // New client connects let stream = UnixStream::connect(&socket_path).await.unwrap(); let mut reader = BufReader::new(stream); // Should receive exactly 1000 events (buffer max) let events = read_events_with_timeout(&mut reader, 1100, 500).await; assert_eq!(events.len(), 1000); // First event should be 100 (oldest 100 were evicted) assert!(events[0].contains("overflow event 100")); // Last event should be 1099 assert!(events[999].contains("overflow event 1099")); // Cleanup let _ = std::fs::remove_file(&socket_path); } #[rstest] #[case(10, 1)] #[case(50, 5)] #[tokio::test] async fn test_client_count_scaling(#[case] num_clients: usize, #[case] events_per_client: usize) { let socket_path = test_socket_path(&format!("scaling_{}_{}", num_clients, events_per_client)); let manager = Arc::new(EventManager::new().unwrap()); // Start the listener let listener_manager = Arc::clone(&manager); let socket_path_clone = socket_path.clone(); tokio::spawn(async move { listener_manager.start_listening(socket_path_clone).await; }); tokio::time::sleep(Duration::from_millis(50)).await; // Connect many clients let mut readers = Vec::new(); for _ in 0..num_clients { let stream = UnixStream::connect(&socket_path).await.unwrap(); readers.push(BufReader::new(stream)); } // Broadcast events for i in 0..events_per_client { let event = Event::new(format!("scale event {}", i)); manager.broadcast(&event).await.unwrap(); } // Verify all clients received all events for reader in &mut readers { let events = read_events_with_timeout(reader, events_per_client, 200).await; assert_eq!(events.len(), events_per_client); } // Cleanup let _ = std::fs::remove_file(&socket_path); } #[tokio::test] async fn test_client_disconnect_doesnt_affect_others() { let socket_path = test_socket_path("disconnect"); let manager = Arc::new(EventManager::new().unwrap()); // Start the listener let listener_manager = Arc::clone(&manager); let socket_path_clone = socket_path.clone(); tokio::spawn(async move { listener_manager.start_listening(socket_path_clone).await; }); tokio::time::sleep(Duration::from_millis(50)).await; // Connect 3 clients let stream1 = UnixStream::connect(&socket_path).await.unwrap(); let stream2 = UnixStream::connect(&socket_path).await.unwrap(); let stream3 = UnixStream::connect(&socket_path).await.unwrap(); let mut reader1 = BufReader::new(stream1); let mut reader2 = BufReader::new(stream2); let mut reader3 = BufReader::new(stream3); // Broadcast initial event manager .broadcast(&Event::new("before disconnect")) .await .unwrap(); // All receive it let _ = read_events_with_timeout(&mut reader1, 1, 100).await; let _ = read_events_with_timeout(&mut reader2, 1, 100).await; let _ = read_events_with_timeout(&mut reader3, 1, 100).await; // Drop client 2 (simulates disconnect) drop(reader2); // Broadcast another event manager .broadcast(&Event::new("after disconnect")) .await .unwrap(); // Clients 1 and 3 should still receive it let events1 = read_events_with_timeout(&mut reader1, 1, 100).await; let events3 = read_events_with_timeout(&mut reader3, 1, 100).await; assert_eq!(events1.len(), 1); assert_eq!(events3.len(), 1); assert!(events1[0].contains("after disconnect")); assert!(events3[0].contains("after disconnect")); // Cleanup let _ = std::fs::remove_file(&socket_path); } #[tokio::test] async fn test_json_deserialization_of_received_events() { let socket_path = test_socket_path("json_deser"); let manager = Arc::new(EventManager::new().unwrap()); // Start the listener let listener_manager = Arc::clone(&manager); let socket_path_clone = socket_path.clone(); tokio::spawn(async move { listener_manager.start_listening(socket_path_clone).await; }); tokio::time::sleep(Duration::from_millis(50)).await; // Broadcast an event with special characters let test_message = "special chars: @#$% newline\\n tab\\t quotes \"test\""; manager.broadcast(&Event::new(test_message)).await.unwrap(); // Connect and deserialize let stream = UnixStream::connect(&socket_path).await.unwrap(); let mut reader = BufReader::new(stream); let mut line = String::new(); reader.read_line(&mut line).await.unwrap(); // Should be valid JSON let parsed: serde_json::Value = serde_json::from_str(&line.trim()).unwrap(); assert_eq!(parsed["message"], test_message); // Cleanup let _ = std::fs::remove_file(&socket_path); }