2025-11-11 00:39:58 -06:00
|
|
|
use std::{sync::Arc, time::Duration};
|
|
|
|
|
|
|
|
|
|
use robotnik::{event::Event, event_manager::EventManager};
|
|
|
|
|
use rstest::rstest;
|
|
|
|
|
use tokio::{
|
|
|
|
|
io::{AsyncBufReadExt, BufReader},
|
|
|
|
|
net::UnixStream,
|
|
|
|
|
time::timeout,
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
/// Common prefix shared by every socket path these tests create.
const TEST_SOCKET_BASE: &str = "/tmp/robotnik_test";

/// Builds the socket path used by the test identified by `name`.
///
/// The path is `TEST_SOCKET_BASE`, `name` and the current process id joined
/// with underscores, so different test names map to different paths within a
/// single test process.
fn test_socket_path(name: &str) -> String {
    let pid = std::process::id().to_string();
    [TEST_SOCKET_BASE, name, pid.as_str()].join("_")
}
|
|
|
|
|
|
|
|
|
|
/// Helper to read one JSON event from a stream
|
|
|
|
|
async fn read_event(
|
|
|
|
|
reader: &mut BufReader<UnixStream>,
|
|
|
|
|
) -> Result<Event, Box<dyn std::error::Error>> {
|
|
|
|
|
let mut line = String::new();
|
|
|
|
|
reader.read_line(&mut line).await?;
|
|
|
|
|
let event: Event = serde_json::from_str(&line)?;
|
|
|
|
|
Ok(event)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Helper to read all available events with a timeout
|
|
|
|
|
async fn read_events_with_timeout(
|
|
|
|
|
reader: &mut BufReader<UnixStream>,
|
|
|
|
|
max_count: usize,
|
|
|
|
|
timeout_ms: u64,
|
|
|
|
|
) -> Vec<String> {
|
|
|
|
|
let mut events = Vec::new();
|
|
|
|
|
for _ in 0..max_count {
|
|
|
|
|
let mut line = String::new();
|
|
|
|
|
match timeout(
|
|
|
|
|
Duration::from_millis(timeout_ms),
|
|
|
|
|
reader.read_line(&mut line),
|
|
|
|
|
)
|
|
|
|
|
.await
|
|
|
|
|
{
|
|
|
|
|
Ok(Ok(0)) => break, // EOF
|
|
|
|
|
Ok(Ok(_)) => events.push(line),
|
|
|
|
|
Ok(Err(_)) => break, // Read error
|
|
|
|
|
Err(_) => break, // Timeout
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
events
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[tokio::test]
|
|
|
|
|
async fn test_client_connects_and_receives_event() {
|
|
|
|
|
let socket_path = test_socket_path("basic_connect");
|
|
|
|
|
let manager = Arc::new(EventManager::new().unwrap());
|
|
|
|
|
|
|
|
|
|
// Start the listener
|
|
|
|
|
let listener_manager = Arc::clone(&manager);
|
|
|
|
|
let socket_path_clone = socket_path.clone();
|
|
|
|
|
tokio::spawn(async move {
|
|
|
|
|
listener_manager.start_listening(socket_path_clone).await;
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
// Give the listener time to start
|
|
|
|
|
tokio::time::sleep(Duration::from_millis(50)).await;
|
|
|
|
|
|
|
|
|
|
// Broadcast an event
|
2025-11-14 05:06:57 -06:00
|
|
|
let event = Event::new("test_user", "test message");
|
2025-11-11 00:39:58 -06:00
|
|
|
manager.broadcast(&event).await.unwrap();
|
|
|
|
|
|
|
|
|
|
// Connect as a client
|
|
|
|
|
let stream = UnixStream::connect(&socket_path).await.unwrap();
|
|
|
|
|
let mut reader = BufReader::new(stream);
|
|
|
|
|
|
|
|
|
|
// Read the event
|
|
|
|
|
let mut line = String::new();
|
|
|
|
|
reader.read_line(&mut line).await.unwrap();
|
|
|
|
|
|
|
|
|
|
assert!(line.contains("test message"));
|
|
|
|
|
assert!(line.ends_with('\n'));
|
|
|
|
|
|
|
|
|
|
// Cleanup
|
|
|
|
|
let _ = std::fs::remove_file(&socket_path);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[tokio::test]
|
|
|
|
|
async fn test_client_receives_event_history() {
|
|
|
|
|
let socket_path = test_socket_path("event_history");
|
|
|
|
|
let manager = Arc::new(EventManager::new().unwrap());
|
|
|
|
|
|
|
|
|
|
// Broadcast events BEFORE starting the listener
|
|
|
|
|
for i in 0..5 {
|
2025-11-14 05:06:57 -06:00
|
|
|
let event = Event::new("test_user", format!("historical event {}", i));
|
2025-11-11 00:39:58 -06:00
|
|
|
manager.broadcast(&event).await.unwrap();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Start the listener
|
|
|
|
|
let listener_manager = Arc::clone(&manager);
|
|
|
|
|
let socket_path_clone = socket_path.clone();
|
|
|
|
|
tokio::spawn(async move {
|
|
|
|
|
listener_manager.start_listening(socket_path_clone).await;
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
tokio::time::sleep(Duration::from_millis(50)).await;
|
|
|
|
|
|
|
|
|
|
// Connect as a client
|
|
|
|
|
let stream = UnixStream::connect(&socket_path).await.unwrap();
|
|
|
|
|
let mut reader = BufReader::new(stream);
|
|
|
|
|
|
|
|
|
|
// Should receive all 5 historical events
|
|
|
|
|
let events = read_events_with_timeout(&mut reader, 5, 100).await;
|
|
|
|
|
|
|
|
|
|
assert_eq!(events.len(), 5);
|
|
|
|
|
assert!(events[0].contains("historical event 0"));
|
|
|
|
|
assert!(events[4].contains("historical event 4"));
|
|
|
|
|
|
|
|
|
|
// Cleanup
|
|
|
|
|
let _ = std::fs::remove_file(&socket_path);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[tokio::test]
|
|
|
|
|
async fn test_multiple_clients_receive_same_events() {
|
|
|
|
|
let socket_path = test_socket_path("multiple_clients");
|
|
|
|
|
let manager = Arc::new(EventManager::new().unwrap());
|
|
|
|
|
|
|
|
|
|
// Start the listener
|
|
|
|
|
let listener_manager = Arc::clone(&manager);
|
|
|
|
|
let socket_path_clone = socket_path.clone();
|
|
|
|
|
tokio::spawn(async move {
|
|
|
|
|
listener_manager.start_listening(socket_path_clone).await;
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
tokio::time::sleep(Duration::from_millis(50)).await;
|
|
|
|
|
|
|
|
|
|
// Connect 3 clients
|
|
|
|
|
let stream1 = UnixStream::connect(&socket_path).await.unwrap();
|
|
|
|
|
let stream2 = UnixStream::connect(&socket_path).await.unwrap();
|
|
|
|
|
let stream3 = UnixStream::connect(&socket_path).await.unwrap();
|
|
|
|
|
|
|
|
|
|
let mut reader1 = BufReader::new(stream1);
|
|
|
|
|
let mut reader2 = BufReader::new(stream2);
|
|
|
|
|
let mut reader3 = BufReader::new(stream3);
|
|
|
|
|
|
|
|
|
|
// Broadcast a new event
|
2025-11-14 05:06:57 -06:00
|
|
|
let event = Event::new("test_user", "broadcast to all");
|
2025-11-11 00:39:58 -06:00
|
|
|
manager.broadcast(&event).await.unwrap();
|
|
|
|
|
|
|
|
|
|
// All clients should receive the event
|
|
|
|
|
let mut line1 = String::new();
|
|
|
|
|
let mut line2 = String::new();
|
|
|
|
|
let mut line3 = String::new();
|
|
|
|
|
|
|
|
|
|
timeout(Duration::from_millis(100), reader1.read_line(&mut line1))
|
|
|
|
|
.await
|
|
|
|
|
.unwrap()
|
|
|
|
|
.unwrap();
|
|
|
|
|
timeout(Duration::from_millis(100), reader2.read_line(&mut line2))
|
|
|
|
|
.await
|
|
|
|
|
.unwrap()
|
|
|
|
|
.unwrap();
|
|
|
|
|
timeout(Duration::from_millis(100), reader3.read_line(&mut line3))
|
|
|
|
|
.await
|
|
|
|
|
.unwrap()
|
|
|
|
|
.unwrap();
|
|
|
|
|
|
|
|
|
|
assert!(line1.contains("broadcast to all"));
|
|
|
|
|
assert!(line2.contains("broadcast to all"));
|
|
|
|
|
assert!(line3.contains("broadcast to all"));
|
|
|
|
|
|
|
|
|
|
// Cleanup
|
|
|
|
|
let _ = std::fs::remove_file(&socket_path);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[tokio::test]
|
|
|
|
|
async fn test_late_joiner_receives_full_history() {
|
|
|
|
|
let socket_path = test_socket_path("late_joiner");
|
|
|
|
|
let manager = Arc::new(EventManager::new().unwrap());
|
|
|
|
|
|
|
|
|
|
// Start the listener
|
|
|
|
|
let listener_manager = Arc::clone(&manager);
|
|
|
|
|
let socket_path_clone = socket_path.clone();
|
|
|
|
|
tokio::spawn(async move {
|
|
|
|
|
listener_manager.start_listening(socket_path_clone).await;
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
tokio::time::sleep(Duration::from_millis(50)).await;
|
|
|
|
|
|
|
|
|
|
// First client connects
|
|
|
|
|
let stream1 = UnixStream::connect(&socket_path).await.unwrap();
|
|
|
|
|
let mut reader1 = BufReader::new(stream1);
|
|
|
|
|
|
|
|
|
|
// Broadcast several events
|
|
|
|
|
for i in 0..10 {
|
2025-11-14 05:06:57 -06:00
|
|
|
let event = Event::new("test_user", format!("event {}", i));
|
2025-11-11 00:39:58 -06:00
|
|
|
manager.broadcast(&event).await.unwrap();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Consume events from first client
|
|
|
|
|
let _ = read_events_with_timeout(&mut reader1, 10, 100).await;
|
|
|
|
|
|
|
|
|
|
// Late joiner connects
|
|
|
|
|
let stream2 = UnixStream::connect(&socket_path).await.unwrap();
|
|
|
|
|
let mut reader2 = BufReader::new(stream2);
|
|
|
|
|
|
|
|
|
|
// Late joiner should receive all 10 events from history
|
|
|
|
|
let events = read_events_with_timeout(&mut reader2, 10, 100).await;
|
|
|
|
|
|
|
|
|
|
assert_eq!(events.len(), 10);
|
|
|
|
|
assert!(events[0].contains("event 0"));
|
|
|
|
|
assert!(events[9].contains("event 9"));
|
|
|
|
|
|
|
|
|
|
// Cleanup
|
|
|
|
|
let _ = std::fs::remove_file(&socket_path);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[tokio::test]
|
|
|
|
|
async fn test_client_receives_events_in_order() {
|
|
|
|
|
let socket_path = test_socket_path("event_order");
|
|
|
|
|
let manager = Arc::new(EventManager::new().unwrap());
|
|
|
|
|
|
|
|
|
|
// Start the listener
|
|
|
|
|
let listener_manager = Arc::clone(&manager);
|
|
|
|
|
let socket_path_clone = socket_path.clone();
|
|
|
|
|
tokio::spawn(async move {
|
|
|
|
|
listener_manager.start_listening(socket_path_clone).await;
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
tokio::time::sleep(Duration::from_millis(50)).await;
|
|
|
|
|
|
|
|
|
|
// Connect client
|
|
|
|
|
let stream = UnixStream::connect(&socket_path).await.unwrap();
|
|
|
|
|
let mut reader = BufReader::new(stream);
|
|
|
|
|
|
|
|
|
|
// Broadcast events rapidly
|
|
|
|
|
let count = 50;
|
|
|
|
|
for i in 0..count {
|
2025-11-14 05:06:57 -06:00
|
|
|
let event = Event::new("test_user", format!("sequence {}", i));
|
2025-11-11 00:39:58 -06:00
|
|
|
manager.broadcast(&event).await.unwrap();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Read all events
|
|
|
|
|
let events = read_events_with_timeout(&mut reader, count, 500).await;
|
|
|
|
|
|
|
|
|
|
assert_eq!(events.len(), count);
|
|
|
|
|
|
|
|
|
|
// Verify order
|
|
|
|
|
for (i, event) in events.iter().enumerate() {
|
|
|
|
|
assert!(
|
|
|
|
|
event.contains(&format!("sequence {}", i)),
|
|
|
|
|
"Event {} out of order: {}",
|
|
|
|
|
i,
|
|
|
|
|
event
|
|
|
|
|
);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Cleanup
|
|
|
|
|
let _ = std::fs::remove_file(&socket_path);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[tokio::test]
|
|
|
|
|
async fn test_concurrent_broadcasts_during_client_connections() {
|
|
|
|
|
let socket_path = test_socket_path("concurrent_ops");
|
|
|
|
|
let manager = Arc::new(EventManager::new().unwrap());
|
|
|
|
|
|
|
|
|
|
// Start the listener
|
|
|
|
|
let listener_manager = Arc::clone(&manager);
|
|
|
|
|
let socket_path_clone = socket_path.clone();
|
|
|
|
|
tokio::spawn(async move {
|
|
|
|
|
listener_manager.start_listening(socket_path_clone).await;
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
tokio::time::sleep(Duration::from_millis(50)).await;
|
|
|
|
|
|
|
|
|
|
// Connect client 1 BEFORE any broadcasts
|
|
|
|
|
let stream1 = UnixStream::connect(&socket_path).await.unwrap();
|
|
|
|
|
let mut reader1 = BufReader::new(stream1);
|
|
|
|
|
|
|
|
|
|
// Spawn a task that continuously broadcasts
|
|
|
|
|
let broadcast_manager = Arc::clone(&manager);
|
|
|
|
|
let broadcast_handle = tokio::spawn(async move {
|
|
|
|
|
for i in 0..100 {
|
2025-11-14 05:06:57 -06:00
|
|
|
let event = Event::new("test_user", format!("concurrent event {}", i));
|
2025-11-11 00:39:58 -06:00
|
|
|
broadcast_manager.broadcast(&event).await.unwrap();
|
|
|
|
|
tokio::time::sleep(Duration::from_millis(5)).await;
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
// While broadcasting, connect more clients at different times
|
|
|
|
|
tokio::time::sleep(Duration::from_millis(100)).await;
|
|
|
|
|
let stream2 = UnixStream::connect(&socket_path).await.unwrap();
|
|
|
|
|
let mut reader2 = BufReader::new(stream2);
|
|
|
|
|
|
|
|
|
|
tokio::time::sleep(Duration::from_millis(150)).await;
|
|
|
|
|
let stream3 = UnixStream::connect(&socket_path).await.unwrap();
|
|
|
|
|
let mut reader3 = BufReader::new(stream3);
|
|
|
|
|
|
|
|
|
|
// Wait for broadcasts to complete
|
|
|
|
|
broadcast_handle.await.unwrap();
|
|
|
|
|
|
|
|
|
|
// All clients should have received events
|
|
|
|
|
let events1 = read_events_with_timeout(&mut reader1, 100, 200).await;
|
|
|
|
|
let events2 = read_events_with_timeout(&mut reader2, 100, 200).await;
|
|
|
|
|
let events3 = read_events_with_timeout(&mut reader3, 100, 200).await;
|
|
|
|
|
|
|
|
|
|
// Client 1 connected first (before any broadcasts), should get all 100
|
|
|
|
|
assert_eq!(events1.len(), 100);
|
|
|
|
|
|
|
|
|
|
// Client 2 connected after ~20 events were broadcast
|
|
|
|
|
// Gets ~20 from history + ~80 live = 100
|
|
|
|
|
assert_eq!(events2.len(), 100);
|
|
|
|
|
|
|
|
|
|
// Client 3 connected after ~50 events were broadcast
|
|
|
|
|
// Gets ~50 from history + ~50 live = 100
|
|
|
|
|
assert_eq!(events3.len(), 100);
|
|
|
|
|
|
|
|
|
|
// Verify they all received events in order
|
|
|
|
|
assert!(events1[0].contains("concurrent event 0"));
|
|
|
|
|
assert!(events2[0].contains("concurrent event 0"));
|
|
|
|
|
assert!(events3[0].contains("concurrent event 0"));
|
|
|
|
|
|
|
|
|
|
// Cleanup
|
|
|
|
|
let _ = std::fs::remove_file(&socket_path);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[tokio::test]
|
|
|
|
|
async fn test_buffer_overflow_affects_new_clients() {
|
|
|
|
|
let socket_path = test_socket_path("buffer_overflow");
|
|
|
|
|
let manager = Arc::new(EventManager::new().unwrap());
|
|
|
|
|
|
|
|
|
|
// Broadcast more than buffer max (1000)
|
|
|
|
|
for i in 0..1100 {
|
2025-11-14 05:06:57 -06:00
|
|
|
let event = Event::new("test_user", format!("overflow event {}", i));
|
2025-11-11 00:39:58 -06:00
|
|
|
manager.broadcast(&event).await.unwrap();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Start the listener
|
|
|
|
|
let listener_manager = Arc::clone(&manager);
|
|
|
|
|
let socket_path_clone = socket_path.clone();
|
|
|
|
|
tokio::spawn(async move {
|
|
|
|
|
listener_manager.start_listening(socket_path_clone).await;
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
tokio::time::sleep(Duration::from_millis(50)).await;
|
|
|
|
|
|
|
|
|
|
// New client connects
|
|
|
|
|
let stream = UnixStream::connect(&socket_path).await.unwrap();
|
|
|
|
|
let mut reader = BufReader::new(stream);
|
|
|
|
|
|
|
|
|
|
// Should receive exactly 1000 events (buffer max)
|
|
|
|
|
let events = read_events_with_timeout(&mut reader, 1100, 500).await;
|
|
|
|
|
|
|
|
|
|
assert_eq!(events.len(), 1000);
|
|
|
|
|
|
|
|
|
|
// First event should be 100 (oldest 100 were evicted)
|
|
|
|
|
assert!(events[0].contains("overflow event 100"));
|
|
|
|
|
|
|
|
|
|
// Last event should be 1099
|
|
|
|
|
assert!(events[999].contains("overflow event 1099"));
|
|
|
|
|
|
|
|
|
|
// Cleanup
|
|
|
|
|
let _ = std::fs::remove_file(&socket_path);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[rstest]
|
|
|
|
|
#[case(10, 1)]
|
|
|
|
|
#[case(50, 5)]
|
|
|
|
|
#[tokio::test]
|
|
|
|
|
async fn test_client_count_scaling(#[case] num_clients: usize, #[case] events_per_client: usize) {
|
|
|
|
|
let socket_path = test_socket_path(&format!("scaling_{}_{}", num_clients, events_per_client));
|
|
|
|
|
let manager = Arc::new(EventManager::new().unwrap());
|
|
|
|
|
|
|
|
|
|
// Start the listener
|
|
|
|
|
let listener_manager = Arc::clone(&manager);
|
|
|
|
|
let socket_path_clone = socket_path.clone();
|
|
|
|
|
tokio::spawn(async move {
|
|
|
|
|
listener_manager.start_listening(socket_path_clone).await;
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
tokio::time::sleep(Duration::from_millis(50)).await;
|
|
|
|
|
|
|
|
|
|
// Connect many clients
|
|
|
|
|
let mut readers = Vec::new();
|
|
|
|
|
for _ in 0..num_clients {
|
|
|
|
|
let stream = UnixStream::connect(&socket_path).await.unwrap();
|
|
|
|
|
readers.push(BufReader::new(stream));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Broadcast events
|
|
|
|
|
for i in 0..events_per_client {
|
2025-11-14 05:06:57 -06:00
|
|
|
let event = Event::new("test_user", format!("scale event {}", i));
|
2025-11-11 00:39:58 -06:00
|
|
|
manager.broadcast(&event).await.unwrap();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Verify all clients received all events
|
|
|
|
|
for reader in &mut readers {
|
|
|
|
|
let events = read_events_with_timeout(reader, events_per_client, 200).await;
|
|
|
|
|
assert_eq!(events.len(), events_per_client);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Cleanup
|
|
|
|
|
let _ = std::fs::remove_file(&socket_path);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[tokio::test]
|
|
|
|
|
async fn test_client_disconnect_doesnt_affect_others() {
|
|
|
|
|
let socket_path = test_socket_path("disconnect");
|
|
|
|
|
let manager = Arc::new(EventManager::new().unwrap());
|
|
|
|
|
|
|
|
|
|
// Start the listener
|
|
|
|
|
let listener_manager = Arc::clone(&manager);
|
|
|
|
|
let socket_path_clone = socket_path.clone();
|
|
|
|
|
tokio::spawn(async move {
|
|
|
|
|
listener_manager.start_listening(socket_path_clone).await;
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
tokio::time::sleep(Duration::from_millis(50)).await;
|
|
|
|
|
|
|
|
|
|
// Connect 3 clients
|
|
|
|
|
let stream1 = UnixStream::connect(&socket_path).await.unwrap();
|
|
|
|
|
let stream2 = UnixStream::connect(&socket_path).await.unwrap();
|
|
|
|
|
let stream3 = UnixStream::connect(&socket_path).await.unwrap();
|
|
|
|
|
|
|
|
|
|
let mut reader1 = BufReader::new(stream1);
|
|
|
|
|
let mut reader2 = BufReader::new(stream2);
|
|
|
|
|
let mut reader3 = BufReader::new(stream3);
|
|
|
|
|
|
|
|
|
|
// Broadcast initial event
|
|
|
|
|
manager
|
2025-11-14 05:06:57 -06:00
|
|
|
.broadcast(&Event::new("test_user", "before disconnect"))
|
2025-11-11 00:39:58 -06:00
|
|
|
.await
|
|
|
|
|
.unwrap();
|
|
|
|
|
|
|
|
|
|
// All receive it
|
|
|
|
|
let _ = read_events_with_timeout(&mut reader1, 1, 100).await;
|
|
|
|
|
let _ = read_events_with_timeout(&mut reader2, 1, 100).await;
|
|
|
|
|
let _ = read_events_with_timeout(&mut reader3, 1, 100).await;
|
|
|
|
|
|
|
|
|
|
// Drop client 2 (simulates disconnect)
|
|
|
|
|
drop(reader2);
|
|
|
|
|
|
|
|
|
|
// Broadcast another event
|
|
|
|
|
manager
|
2025-11-14 05:06:57 -06:00
|
|
|
.broadcast(&Event::new("test_user", "after disconnect"))
|
2025-11-11 00:39:58 -06:00
|
|
|
.await
|
|
|
|
|
.unwrap();
|
|
|
|
|
|
|
|
|
|
// Clients 1 and 3 should still receive it
|
|
|
|
|
let events1 = read_events_with_timeout(&mut reader1, 1, 100).await;
|
|
|
|
|
let events3 = read_events_with_timeout(&mut reader3, 1, 100).await;
|
|
|
|
|
|
|
|
|
|
assert_eq!(events1.len(), 1);
|
|
|
|
|
assert_eq!(events3.len(), 1);
|
|
|
|
|
assert!(events1[0].contains("after disconnect"));
|
|
|
|
|
assert!(events3[0].contains("after disconnect"));
|
|
|
|
|
|
|
|
|
|
// Cleanup
|
|
|
|
|
let _ = std::fs::remove_file(&socket_path);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[tokio::test]
|
|
|
|
|
async fn test_json_deserialization_of_received_events() {
|
|
|
|
|
let socket_path = test_socket_path("json_deser");
|
|
|
|
|
let manager = Arc::new(EventManager::new().unwrap());
|
|
|
|
|
|
|
|
|
|
// Start the listener
|
|
|
|
|
let listener_manager = Arc::clone(&manager);
|
|
|
|
|
let socket_path_clone = socket_path.clone();
|
|
|
|
|
tokio::spawn(async move {
|
|
|
|
|
listener_manager.start_listening(socket_path_clone).await;
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
tokio::time::sleep(Duration::from_millis(50)).await;
|
|
|
|
|
|
|
|
|
|
// Broadcast an event with special characters
|
|
|
|
|
let test_message = "special chars: @#$% newline\\n tab\\t quotes \"test\"";
|
2025-11-14 05:06:57 -06:00
|
|
|
manager
|
|
|
|
|
.broadcast(&Event::new("test_user", test_message))
|
|
|
|
|
.await
|
|
|
|
|
.unwrap();
|
2025-11-11 00:39:58 -06:00
|
|
|
|
|
|
|
|
// Connect and deserialize
|
|
|
|
|
let stream = UnixStream::connect(&socket_path).await.unwrap();
|
|
|
|
|
let mut reader = BufReader::new(stream);
|
|
|
|
|
|
|
|
|
|
let mut line = String::new();
|
|
|
|
|
reader.read_line(&mut line).await.unwrap();
|
|
|
|
|
|
|
|
|
|
// Should be valid JSON
|
|
|
|
|
let parsed: serde_json::Value = serde_json::from_str(&line.trim()).unwrap();
|
|
|
|
|
|
|
|
|
|
assert_eq!(parsed["message"], test_message);
|
|
|
|
|
|
|
|
|
|
// Cleanup
|
|
|
|
|
let _ = std::fs::remove_file(&socket_path);
|
|
|
|
|
}
|