diff --git a/tests/event_test.rs b/tests/event_test.rs new file mode 100644 index 0000000..5237b76 --- /dev/null +++ b/tests/event_test.rs @@ -0,0 +1,492 @@ +use std::{sync::Arc, time::Duration}; + +use robotnik::{event::Event, event_manager::EventManager}; +use rstest::rstest; +use tokio::{ + io::{AsyncBufReadExt, BufReader}, + net::UnixStream, + time::timeout, +}; + +const TEST_SOCKET_BASE: &str = "/tmp/robotnik_test"; + +/// Helper to create unique socket paths for parallel tests +fn test_socket_path(name: &str) -> String { + format!("{}_{}_{}", TEST_SOCKET_BASE, name, std::process::id()) +} + +/// Helper to read one JSON event from a stream +async fn read_event( + reader: &mut BufReader, +) -> Result> { + let mut line = String::new(); + reader.read_line(&mut line).await?; + let event: Event = serde_json::from_str(&line)?; + Ok(event) +} + +/// Helper to read all available events with a timeout +async fn read_events_with_timeout( + reader: &mut BufReader, + max_count: usize, + timeout_ms: u64, +) -> Vec { + let mut events = Vec::new(); + for _ in 0..max_count { + let mut line = String::new(); + match timeout( + Duration::from_millis(timeout_ms), + reader.read_line(&mut line), + ) + .await + { + Ok(Ok(0)) => break, // EOF + Ok(Ok(_)) => events.push(line), + Ok(Err(_)) => break, // Read error + Err(_) => break, // Timeout + } + } + events +} + +#[tokio::test] +async fn test_client_connects_and_receives_event() { + let socket_path = test_socket_path("basic_connect"); + let manager = Arc::new(EventManager::new().unwrap()); + + // Start the listener + let listener_manager = Arc::clone(&manager); + let socket_path_clone = socket_path.clone(); + tokio::spawn(async move { + listener_manager.start_listening(socket_path_clone).await; + }); + + // Give the listener time to start + tokio::time::sleep(Duration::from_millis(50)).await; + + // Broadcast an event + let event = Event::new("test message"); + manager.broadcast(&event).await.unwrap(); + + // Connect as a client + let stream = UnixStream::connect(&socket_path).await.unwrap(); + let mut reader = BufReader::new(stream); + + // Read the event + let mut line = String::new(); + reader.read_line(&mut line).await.unwrap(); + + assert!(line.contains("test message")); + assert!(line.ends_with('\n')); + + // Cleanup + let _ = std::fs::remove_file(&socket_path); +} + +#[tokio::test] +async fn test_client_receives_event_history() { + let socket_path = test_socket_path("event_history"); + let manager = Arc::new(EventManager::new().unwrap()); + + // Broadcast events BEFORE starting the listener + for i in 0..5 { + let event = Event::new(format!("historical event {}", i)); + manager.broadcast(&event).await.unwrap(); + } + + // Start the listener + let listener_manager = Arc::clone(&manager); + let socket_path_clone = socket_path.clone(); + tokio::spawn(async move { + listener_manager.start_listening(socket_path_clone).await; + }); + + tokio::time::sleep(Duration::from_millis(50)).await; + + // Connect as a client + let stream = UnixStream::connect(&socket_path).await.unwrap(); + let mut reader = BufReader::new(stream); + + // Should receive all 5 historical events + let events = read_events_with_timeout(&mut reader, 5, 100).await; + + assert_eq!(events.len(), 5); + assert!(events[0].contains("historical event 0")); + assert!(events[4].contains("historical event 4")); + + // Cleanup + let _ = std::fs::remove_file(&socket_path); +} + +#[tokio::test] +async fn test_multiple_clients_receive_same_events() { + let socket_path = test_socket_path("multiple_clients"); + let manager = Arc::new(EventManager::new().unwrap()); + + // Start the listener + let listener_manager = Arc::clone(&manager); + let socket_path_clone = socket_path.clone(); + tokio::spawn(async move { + listener_manager.start_listening(socket_path_clone).await; + }); + + tokio::time::sleep(Duration::from_millis(50)).await; + + // Connect 3 clients + let stream1 = UnixStream::connect(&socket_path).await.unwrap(); + let stream2 = UnixStream::connect(&socket_path).await.unwrap(); + let stream3 = UnixStream::connect(&socket_path).await.unwrap(); + + let mut reader1 = BufReader::new(stream1); + let mut reader2 = BufReader::new(stream2); + let mut reader3 = BufReader::new(stream3); + + // Broadcast a new event + let event = Event::new("broadcast to all"); + manager.broadcast(&event).await.unwrap(); + + // All clients should receive the event + let mut line1 = String::new(); + let mut line2 = String::new(); + let mut line3 = String::new(); + + timeout(Duration::from_millis(100), reader1.read_line(&mut line1)) + .await + .unwrap() + .unwrap(); + timeout(Duration::from_millis(100), reader2.read_line(&mut line2)) + .await + .unwrap() + .unwrap(); + timeout(Duration::from_millis(100), reader3.read_line(&mut line3)) + .await + .unwrap() + .unwrap(); + + assert!(line1.contains("broadcast to all")); + assert!(line2.contains("broadcast to all")); + assert!(line3.contains("broadcast to all")); + + // Cleanup + let _ = std::fs::remove_file(&socket_path); +} + +#[tokio::test] +async fn test_late_joiner_receives_full_history() { + let socket_path = test_socket_path("late_joiner"); + let manager = Arc::new(EventManager::new().unwrap()); + + // Start the listener + let listener_manager = Arc::clone(&manager); + let socket_path_clone = socket_path.clone(); + tokio::spawn(async move { + listener_manager.start_listening(socket_path_clone).await; + }); + + tokio::time::sleep(Duration::from_millis(50)).await; + + // First client connects + let stream1 = UnixStream::connect(&socket_path).await.unwrap(); + let mut reader1 = BufReader::new(stream1); + + // Broadcast several events + for i in 0..10 { + let event = Event::new(format!("event {}", i)); + manager.broadcast(&event).await.unwrap(); + } + + // Consume events from first client + let _ = read_events_with_timeout(&mut reader1, 10, 100).await; + + // Late joiner connects + let stream2 = UnixStream::connect(&socket_path).await.unwrap(); + let mut reader2 = BufReader::new(stream2); + + // Late joiner should receive all 10 events from history + let events = read_events_with_timeout(&mut reader2, 10, 100).await; + + assert_eq!(events.len(), 10); + assert!(events[0].contains("event 0")); + assert!(events[9].contains("event 9")); + + // Cleanup + let _ = std::fs::remove_file(&socket_path); +} + +#[tokio::test] +async fn test_client_receives_events_in_order() { + let socket_path = test_socket_path("event_order"); + let manager = Arc::new(EventManager::new().unwrap()); + + // Start the listener + let listener_manager = Arc::clone(&manager); + let socket_path_clone = socket_path.clone(); + tokio::spawn(async move { + listener_manager.start_listening(socket_path_clone).await; + }); + + tokio::time::sleep(Duration::from_millis(50)).await; + + // Connect client + let stream = UnixStream::connect(&socket_path).await.unwrap(); + let mut reader = BufReader::new(stream); + + // Broadcast events rapidly + let count = 50; + for i in 0..count { + let event = Event::new(format!("sequence {}", i)); + manager.broadcast(&event).await.unwrap(); + } + + // Read all events + let events = read_events_with_timeout(&mut reader, count, 500).await; + + assert_eq!(events.len(), count); + + // Verify order + for (i, event) in events.iter().enumerate() { + assert!( + event.contains(&format!("sequence {}", i)), + "Event {} out of order: {}", + i, + event + ); + } + + // Cleanup + let _ = std::fs::remove_file(&socket_path); +} + +#[tokio::test] +async fn test_concurrent_broadcasts_during_client_connections() { + let socket_path = test_socket_path("concurrent_ops"); + let manager = Arc::new(EventManager::new().unwrap()); + + // Start the listener + let listener_manager = Arc::clone(&manager); + let socket_path_clone = socket_path.clone(); + tokio::spawn(async move { + listener_manager.start_listening(socket_path_clone).await; + }); + + tokio::time::sleep(Duration::from_millis(50)).await; + + // Connect client 1 BEFORE any broadcasts + let stream1 = UnixStream::connect(&socket_path).await.unwrap(); + let mut reader1 = BufReader::new(stream1); + + // Spawn a task that continuously broadcasts + let broadcast_manager = Arc::clone(&manager); + let broadcast_handle = tokio::spawn(async move { + for i in 0..100 { + let event = Event::new(format!("concurrent event {}", i)); + broadcast_manager.broadcast(&event).await.unwrap(); + tokio::time::sleep(Duration::from_millis(5)).await; + } + }); + + // While broadcasting, connect more clients at different times + tokio::time::sleep(Duration::from_millis(100)).await; + let stream2 = UnixStream::connect(&socket_path).await.unwrap(); + let mut reader2 = BufReader::new(stream2); + + tokio::time::sleep(Duration::from_millis(150)).await; + let stream3 = UnixStream::connect(&socket_path).await.unwrap(); + let mut reader3 = BufReader::new(stream3); + + // Wait for broadcasts to complete + broadcast_handle.await.unwrap(); + + // All clients should have received events + let events1 = read_events_with_timeout(&mut reader1, 100, 200).await; + let events2 = read_events_with_timeout(&mut reader2, 100, 200).await; + let events3 = read_events_with_timeout(&mut reader3, 100, 200).await; + + // Client 1 connected first (before any broadcasts), should get all 100 + assert_eq!(events1.len(), 100); + + // Client 2 connected after ~20 events were broadcast + // Gets ~20 from history + ~80 live = 100 + assert_eq!(events2.len(), 100); + + // Client 3 connected after ~50 events were broadcast + // Gets ~50 from history + ~50 live = 100 + assert_eq!(events3.len(), 100); + + // Verify they all received events in order + assert!(events1[0].contains("concurrent event 0")); + assert!(events2[0].contains("concurrent event 0")); + assert!(events3[0].contains("concurrent event 0")); + + // Cleanup + let _ = std::fs::remove_file(&socket_path); +} + +#[tokio::test] +async fn test_buffer_overflow_affects_new_clients() { + let socket_path = test_socket_path("buffer_overflow"); + let manager = Arc::new(EventManager::new().unwrap()); + + // Broadcast more than buffer max (1000) + for i in 0..1100 { + let event = Event::new(format!("overflow event {}", i)); + manager.broadcast(&event).await.unwrap(); + } + + // Start the listener + let listener_manager = Arc::clone(&manager); + let socket_path_clone = socket_path.clone(); + tokio::spawn(async move { + listener_manager.start_listening(socket_path_clone).await; + }); + + tokio::time::sleep(Duration::from_millis(50)).await; + + // New client connects + let stream = UnixStream::connect(&socket_path).await.unwrap(); + let mut reader = BufReader::new(stream); + + // Should receive exactly 1000 events (buffer max) + let events = read_events_with_timeout(&mut reader, 1100, 500).await; + + assert_eq!(events.len(), 1000); + + // First event should be 100 (oldest 100 were evicted) + assert!(events[0].contains("overflow event 100")); + + // Last event should be 1099 + assert!(events[999].contains("overflow event 1099")); + + // Cleanup + let _ = std::fs::remove_file(&socket_path); +} + +#[rstest] +#[case(10, 1)] +#[case(50, 5)] +#[tokio::test] +async fn test_client_count_scaling(#[case] num_clients: usize, #[case] events_per_client: usize) { + let socket_path = test_socket_path(&format!("scaling_{}_{}", num_clients, events_per_client)); + let manager = Arc::new(EventManager::new().unwrap()); + + // Start the listener + let listener_manager = Arc::clone(&manager); + let socket_path_clone = socket_path.clone(); + tokio::spawn(async move { + listener_manager.start_listening(socket_path_clone).await; + }); + + tokio::time::sleep(Duration::from_millis(50)).await; + + // Connect many clients + let mut readers = Vec::new(); + for _ in 0..num_clients { + let stream = UnixStream::connect(&socket_path).await.unwrap(); + readers.push(BufReader::new(stream)); + } + + // Broadcast events + for i in 0..events_per_client { + let event = Event::new(format!("scale event {}", i)); + manager.broadcast(&event).await.unwrap(); + } + + // Verify all clients received all events + for reader in &mut readers { + let events = read_events_with_timeout(reader, events_per_client, 200).await; + assert_eq!(events.len(), events_per_client); + } + + // Cleanup + let _ = std::fs::remove_file(&socket_path); +} + +#[tokio::test] +async fn test_client_disconnect_doesnt_affect_others() { + let socket_path = test_socket_path("disconnect"); + let manager = Arc::new(EventManager::new().unwrap()); + + // Start the listener + let listener_manager = Arc::clone(&manager); + let socket_path_clone = socket_path.clone(); + tokio::spawn(async move { + listener_manager.start_listening(socket_path_clone).await; + }); + + tokio::time::sleep(Duration::from_millis(50)).await; + + // Connect 3 clients + let stream1 = UnixStream::connect(&socket_path).await.unwrap(); + let stream2 = UnixStream::connect(&socket_path).await.unwrap(); + let stream3 = UnixStream::connect(&socket_path).await.unwrap(); + + let mut reader1 = BufReader::new(stream1); + let mut reader2 = BufReader::new(stream2); + let mut reader3 = BufReader::new(stream3); + + // Broadcast initial event + manager + .broadcast(&Event::new("before disconnect")) + .await + .unwrap(); + + // All receive it + let _ = read_events_with_timeout(&mut reader1, 1, 100).await; + let _ = read_events_with_timeout(&mut reader2, 1, 100).await; + let _ = read_events_with_timeout(&mut reader3, 1, 100).await; + + // Drop client 2 (simulates disconnect) + drop(reader2); + + // Broadcast another event + manager + .broadcast(&Event::new("after disconnect")) + .await + .unwrap(); + + // Clients 1 and 3 should still receive it + let events1 = read_events_with_timeout(&mut reader1, 1, 100).await; + let events3 = read_events_with_timeout(&mut reader3, 1, 100).await; + + assert_eq!(events1.len(), 1); + assert_eq!(events3.len(), 1); + assert!(events1[0].contains("after disconnect")); + assert!(events3[0].contains("after disconnect")); + + // Cleanup + let _ = std::fs::remove_file(&socket_path); +} + +#[tokio::test] +async fn test_json_deserialization_of_received_events() { + let socket_path = test_socket_path("json_deser"); + let manager = Arc::new(EventManager::new().unwrap()); + + // Start the listener + let listener_manager = Arc::clone(&manager); + let socket_path_clone = socket_path.clone(); + tokio::spawn(async move { + listener_manager.start_listening(socket_path_clone).await; + }); + + tokio::time::sleep(Duration::from_millis(50)).await; + + // Broadcast an event with special characters + let test_message = "special chars: @#$% newline\\n tab\\t quotes \"test\""; + manager.broadcast(&Event::new(test_message)).await.unwrap(); + + // Connect and deserialize + let stream = UnixStream::connect(&socket_path).await.unwrap(); + let mut reader = BufReader::new(stream); + + let mut line = String::new(); + reader.read_line(&mut line).await.unwrap(); + + // Should be valid JSON + let parsed: serde_json::Value = serde_json::from_str(&line.trim()).unwrap(); + + assert_eq!(parsed["message"], test_message); + + // Cleanup + let _ = std::fs::remove_file(&socket_path); +}