From 70de039610dcc848015ecbef6acc8901b608968f Mon Sep 17 00:00:00 2001 From: Micheal Smith Date: Fri, 14 Nov 2025 05:06:57 -0600 Subject: [PATCH] Broadcast, and FIFO are currently functional. --- Cargo.lock | 315 ++++++++++++++++++++++++++++++++----------- Cargo.toml | 1 + src/chat.rs | 98 ++++++++++---- src/commands.rs | 4 +- src/event.rs | 20 ++- src/event_manager.rs | 292 +++++++++++++++++++++++++++++++++++++-- src/lib.rs | 18 ++- tests/event_test.rs | 25 ++-- 8 files changed, 642 insertions(+), 131 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1eee981..590684a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -67,22 +67,22 @@ dependencies = [ [[package]] name = "anstyle-query" -version = "1.1.4" +version = "1.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9e231f6134f61b71076a3eab506c379d4f36122f2af15a9ff04415ea4c3339e2" +checksum = "40c48f72fd53cd289104fc64099abca73db4166ad86ea0b4341abe65af83dadc" dependencies = [ - "windows-sys 0.60.2", + "windows-sys 0.61.2", ] [[package]] name = "anstyle-wincon" -version = "3.0.10" +version = "3.0.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3e0633414522a32ffaac8ac6cc8f748e090c5717661fddeea04219e2344f5f2a" +checksum = "291e6a250ff86cd4a820112fb8898808a366d8f9f58ce16d1f538353ad55747d" dependencies = [ "anstyle", "once_cell_polyfill", - "windows-sys 0.60.2", + "windows-sys 0.61.2", ] [[package]] @@ -129,12 +129,6 @@ dependencies = [ "windows-link", ] -[[package]] -name = "base64" -version = "0.21.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d297deb1925b89f2ccc13d7635fa0714f12c87adce1c75356b39ca9b7178567" - [[package]] name = "base64" version = "0.22.1" @@ -169,6 +163,15 @@ dependencies = [ "generic-array", ] +[[package]] +name = "block2" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cdeb9d870516001442e364c5220d3574d2da8dc765554b4a617230d33fa58ef5" +dependencies = [ + "objc2", +] + [[package]] name = "bumpalo" version = "3.19.0" @@ -189,9 +192,9 @@ checksum = "7b02b629252fe8ef6460461409564e2c21d0c8e77e0944f3d189ff06c4e932ad" [[package]] name = "cc" -version = "1.2.43" +version = "1.2.46" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "739eb0f94557554b3ca9a86d2d37bebd49c5e6d0c1d2bda35ba5bdac830befc2" +checksum = "b97463e1064cb1b1c1384ad0a0b9c8abd0988e2a91f52606c80ef14aadb63e36" dependencies = [ "find-msvc-tools", "shlex", @@ -296,9 +299,9 @@ checksum = "b05b61dc5112cbb17e4b6cd61790d9845d13888356391624cbe7e41efeac1e75" [[package]] name = "config" -version = "0.15.18" +version = "0.15.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "180e549344080374f9b32ed41bf3b6b57885ff6a289367b3dbc10eea8acc1918" +checksum = "b30fa8254caad766fc03cb0ccae691e14bf3bd72bfff27f72802ce729551b3d6" dependencies = [ "async-trait", "convert_case", @@ -388,9 +391,9 @@ checksum = "460fbee9c2c2f33933d720630a6a0bac33ba7053db5344fac858d4b8952d77d5" [[package]] name = "crypto-common" -version = "0.1.6" +version = "0.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1bfb12502f3fc46cca1bb51ac28df9d618d813cdc3d2f25b9fe775a34af26bb3" +checksum = "78c8292055d1c1df0cce5d180393dc8cce0abec0a7102adb6c7b1eef6016d60a" dependencies = [ "generic-array", "typenum", @@ -513,6 +516,16 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "dispatch2" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "89a09f22a6c6069a18470eb92d2298acf25463f14256d24778e1230d789a2aec" +dependencies = [ + "bitflags", + "objc2", +] + [[package]] name = "displaydoc" version = "0.2.5" @@ -626,9 +639,9 @@ checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f" [[package]] name = "erased-serde" -version = "0.4.8" +version = "0.4.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "259d404d09818dec19332e31d94558aeb442fea04c817006456c24b5460bbd4b" +checksum = "89e8918065695684b2b0702da20382d5ae6065cf3327bc2d6436bd49a71ce9f3" dependencies = [ "serde", "serde_core", @@ -674,9 +687,9 @@ checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" [[package]] name = "find-msvc-tools" -version = "0.1.4" +version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "52051878f80a721bb68ebfbc930e07b65ba72f2da88968ea5c06fd6ca3d3a127" +checksum = "3a3076410a55c90011c298b04d0cfa770b00fa04e1e3c97d3f6c9de105a03844" [[package]] name = "fnv" @@ -832,9 +845,9 @@ dependencies = [ [[package]] name = "generic-array" -version = "0.14.9" +version = "0.14.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4bb6743198531e02858aeaea5398fcc883e71851fcbcb5a2f773e2fb6cb1edf2" +checksum = "85649ca51fd72272d7821adaf274ad91c288277713d9c18820d8499a7ff69e9a" dependencies = [ "typenum", "version_check", @@ -985,9 +998,9 @@ dependencies = [ [[package]] name = "hyper" -version = "1.7.0" +version = "1.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eb3aa54a13a0dfe7fbe3a59e0c76093041720fdc77b110cc0fc260fafb4dc51e" +checksum = "2ab2d4f250c3d7b1c9fcdff1cece94ea4e2dfbec68614f7b87cb205f24ca9d11" dependencies = [ "atomic-waker", "bytes", @@ -1023,11 +1036,11 @@ dependencies = [ [[package]] name = "hyper-util" -version = "0.1.17" +version = "0.1.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3c6995591a8f1380fcb4ba966a252a4b29188d51d2b89e3a252f5305be65aea8" +checksum = "52e9a2a24dc5c6821e71a7030e1e14b7b632acac55c40e9d2e082c621261bb56" dependencies = [ - "base64 0.22.1", + "base64", "bytes", "futures-channel", "futures-core", @@ -1250,9 +1263,9 @@ dependencies = [ [[package]] name = "iri-string" -version = "0.7.8" +version = "0.7.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dbc5ebe9c3a1a7a5127f920a418f7585e9e758e911d0466ed004f393b0e380b2" +checksum = "4f867b9d1d896b67beb18518eda36fdb77a32ea590de864f1325b294a6d14397" dependencies = [ "memchr", "serde", @@ -1447,6 +1460,165 @@ dependencies = [ "autocfg", ] +[[package]] +name = "objc2" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b7c2599ce0ec54857b29ce62166b0ed9b4f6f1a70ccc9a71165b6154caca8c05" +dependencies = [ + "objc2-encode", +] + +[[package]] +name = "objc2-cloud-kit" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73ad74d880bb43877038da939b7427bba67e9dd42004a18b809ba7d87cee241c" +dependencies = [ + "bitflags", + "objc2", + "objc2-foundation", +] + +[[package]] +name = "objc2-core-data" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b402a653efbb5e82ce4df10683b6b28027616a2715e90009947d50b8dd298fa" +dependencies = [ + "objc2", + "objc2-foundation", +] + +[[package]] +name = "objc2-core-foundation" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2a180dd8642fa45cdb7dd721cd4c11b1cadd4929ce112ebd8b9f5803cc79d536" +dependencies = [ + "bitflags", + "dispatch2", + "objc2", +] + +[[package]] +name = "objc2-core-graphics" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e022c9d066895efa1345f8e33e584b9f958da2fd4cd116792e15e07e4720a807" +dependencies = [ + "bitflags", + "dispatch2", + "objc2", + "objc2-core-foundation", + "objc2-io-surface", +] + +[[package]] +name = "objc2-core-image" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5d563b38d2b97209f8e861173de434bd0214cf020e3423a52624cd1d989f006" +dependencies = [ + "objc2", + "objc2-foundation", +] + +[[package]] +name = "objc2-core-location" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca347214e24bc973fc025fd0d36ebb179ff30536ed1f80252706db19ee452009" +dependencies = [ + "objc2", + "objc2-foundation", +] + +[[package]] +name = "objc2-core-text" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0cde0dfb48d25d2b4862161a4d5fcc0e3c24367869ad306b0c9ec0073bfed92d" +dependencies = [ + "bitflags", + "objc2", + "objc2-core-foundation", + "objc2-core-graphics", +] + +[[package]] +name = "objc2-encode" +version = "4.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef25abbcd74fb2609453eb695bd2f860d389e457f67dc17cafc8b8cbc89d0c33" + +[[package]] +name = "objc2-foundation" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3e0adef53c21f888deb4fa59fc59f7eb17404926ee8a6f59f5df0fd7f9f3272" +dependencies = [ + "bitflags", + "block2", + "libc", + "objc2", + "objc2-core-foundation", +] + +[[package]] +name = "objc2-io-surface" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "180788110936d59bab6bd83b6060ffdfffb3b922ba1396b312ae795e1de9d81d" +dependencies = [ + "bitflags", + "objc2", + "objc2-core-foundation", +] + +[[package]] +name = "objc2-quartz-core" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "96c1358452b371bf9f104e21ec536d37a650eb10f7ee379fff67d2e08d537f1f" +dependencies = [ + "bitflags", + "objc2", + "objc2-core-foundation", + "objc2-foundation", +] + +[[package]] +name = "objc2-ui-kit" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d87d638e33c06f577498cbcc50491496a3ed4246998a7fbba7ccb98b1e7eab22" +dependencies = [ + "bitflags", + "block2", + "objc2", + "objc2-cloud-kit", + "objc2-core-data", + "objc2-core-foundation", + "objc2-core-graphics", + "objc2-core-image", + "objc2-core-location", + "objc2-core-text", + "objc2-foundation", + "objc2-quartz-core", + "objc2-user-notifications", +] + +[[package]] +name = "objc2-user-notifications" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9df9128cbbfef73cda168416ccf7f837b62737d748333bfe9ab71c245d76613e" +dependencies = [ + "objc2", + "objc2-foundation", +] + [[package]] name = "object" version = "0.37.3" @@ -1470,9 +1642,9 @@ checksum = "384b8ab6d37215f3c5301a95a4accb5d64aa607f1fcb26a11b5303878451b4fe" [[package]] name = "openssl" -version = "0.10.74" +version = "0.10.75" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "24ad14dd45412269e1a30f52ad8f0664f0f4f4a89ee8fe28c3b3527021ebb654" +checksum = "08838db121398ad17ab8531ce9de97b244589089e290a384c900cb9ff7434328" dependencies = [ "bitflags", "cfg-if", @@ -1502,9 +1674,9 @@ checksum = "d05e27ee213611ffe7d6348b942e8f942b37114c00cc03cec254295a4a17852e" [[package]] name = "openssl-sys" -version = "0.9.110" +version = "0.9.111" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0a9f0075ba3c21b09f8e8b2026584b1d18d49388648f2fbbf3c97ea8deced8e2" +checksum = "82cab2d520aa75e3c58898289429321eb788c3106963d0dc886ec7a5f4adc321" dependencies = [ "cc", "libc", @@ -1530,14 +1702,18 @@ dependencies = [ [[package]] name = "os_info" -version = "3.12.0" +version = "3.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d0e1ac5fde8d43c34139135df8ea9ee9465394b2d8d20f032d38998f64afffc3" +checksum = "7c39b5918402d564846d5aba164c09a66cc88d232179dfd3e3c619a25a268392" dependencies = [ + "android_system_properties", "log", - "plist", + "nix", + "objc2", + "objc2-foundation", + "objc2-ui-kit", "serde", - "windows-sys 0.52.0", + "windows-sys 0.61.2", ] [[package]] @@ -1662,19 +1838,6 @@ version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7edddbd0b52d732b21ad9a5fab5c704c14cd949e5e9a1ec5929a24fded1b904c" -[[package]] -name = "plist" -version = "1.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "740ebea15c5d1428f910cd1a5f52cebf8d25006245ed8ade92702f4943d91e07" -dependencies = [ - "base64 0.22.1", - "indexmap 2.12.0", - "quick-xml", - "serde", - "time", -] - [[package]] name = "potential_utf" version = "0.1.4" @@ -1717,15 +1880,6 @@ dependencies = [ "unicode-ident", ] -[[package]] -name = "quick-xml" -version = "0.38.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "42a232e7487fc2ef313d96dde7948e7a3c05101870d8985e4fd8d26aedd27b89" -dependencies = [ - "memchr", -] - [[package]] name = "quinn" version = "0.11.9" @@ -1783,9 +1937,9 @@ dependencies = [ [[package]] name = "quote" -version = "1.0.41" +version = "1.0.42" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ce25767e7b499d1b604768e7cde645d14cc8584231ea6b295e9c9eb22c02e1d1" +checksum = "a338cc41d27e6cc6dce6cefc13a0729dfbb81c262b1f519331575dd80ef3067f" dependencies = [ "proc-macro2", ] @@ -1906,7 +2060,7 @@ version = "0.12.24" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9d0946410b9f7b082a427e4ef5c8ff541a88b357bc6c637c40db3a68ac70a36f" dependencies = [ - "base64 0.22.1", + "base64", "bytes", "futures-core", "futures-util", @@ -1989,6 +2143,7 @@ dependencies = [ "rstest", "serde", "serde_json", + "tempfile", "tokio", "tracing", "tracing-subscriber", @@ -1996,14 +2151,16 @@ dependencies = [ [[package]] name = "ron" -version = "0.8.1" +version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b91f7eff05f748767f183df4320a63d6936e9c6107d97c9e6bdd9784f4289c94" +checksum = "fd490c5b18261893f14449cbd28cb9c0b637aebf161cd77900bfdedaff21ec32" dependencies = [ - "base64 0.21.7", "bitflags", + "once_cell", "serde", "serde_derive", + "typeid", + "unicode-ident", ] [[package]] @@ -2082,9 +2239,9 @@ dependencies = [ [[package]] name = "rustls" -version = "0.23.34" +version = "0.23.35" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6a9586e9ee2b4f8fab52a0048ca7334d7024eef48e2cb9407e3497bb7cab7fa7" +checksum = "533f54bc6a7d4f647e46ad909549eda97bf5afc1585190ef692b4286b198bd8f" dependencies = [ "once_cell", "ring", @@ -2150,9 +2307,9 @@ dependencies = [ [[package]] name = "schemars" -version = "1.0.4" +version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "82d20c4491bc164fa2f6c5d44565947a52ad80b9505d8e36f8d54c27c739fcd0" +checksum = "9558e172d4e8533736ba97870c4b2cd63f84b382a3d6eb063da41b91cce17289" dependencies = [ "dyn-clone", "ref-cast", @@ -2286,13 +2443,13 @@ version = "3.15.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "aa66c845eee442168b2c8134fec70ac50dc20e760769c8ba0ad1319ca1959b04" dependencies = [ - "base64 0.22.1", + "base64", "chrono", "hex", "indexmap 1.9.3", "indexmap 2.12.0", "schemars 0.9.0", - "schemars 1.0.4", + "schemars 1.1.0", "serde_core", "serde_json", "serde_with_macros", @@ -2379,9 +2536,9 @@ checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292" [[package]] name = "syn" -version = "2.0.108" +version = "2.0.110" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "da58917d35242480a05c2897064da0a80589a2a0476c9a3f2fdc83b53502e917" +checksum = "a99801b5bd34ede4cf3fc688c5919368fea4e4814a4664359503e6015b280aea" dependencies = [ "proc-macro2", "quote", @@ -2594,9 +2751,9 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.7.16" +version = "0.7.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "14307c986784f72ef81c89db7d9e28d6ac26d16213b109ea501696195e6e3ce5" +checksum = "2efa149fe76073d6e8fd97ef4f4eca7b67f599660115591483572e406e165594" dependencies = [ "bytes", "futures-core", @@ -3028,9 +3185,9 @@ dependencies = [ [[package]] name = "webpki-roots" -version = "1.0.3" +version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "32b130c0d2d49f8b6889abc456e795e82525204f27c42cf767cf0d7734e089b8" +checksum = "b2878ef029c47c6e8cf779119f20fcf52bde7ad42a731b2a304bc221df17571e" dependencies = [ "rustls-pki-types", ] diff --git a/Cargo.toml b/Cargo.toml index 6d1b3e3..d0fb103 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -37,6 +37,7 @@ tracing-subscriber = "0.3" [dev-dependencies] rstest = "0.24" +tempfile = "3.13" [dev-dependencies.cargo-husky] version = "1" diff --git a/src/chat.rs b/src/chat.rs index 5bd7239..ecf8b79 100644 --- a/src/chat.rs +++ b/src/chat.rs @@ -1,20 +1,28 @@ +use std::sync::Arc; + use color_eyre::{Result, eyre::WrapErr}; -// Lots of namespace confusion potential -use crate::qna::LLMHandle; use config::Config as MainConfig; use futures::StreamExt; -use irc::client::prelude::{Client as IRCClient, Command, Config as IRCConfig}; +use irc::client::prelude::{Client, Command, Config as IRCConfig, Message}; +use tokio::sync::mpsc; use tracing::{Level, event, instrument}; +use crate::{Event, EventManager, LLMHandle, commands}; + #[derive(Debug)] pub struct Chat { - client: IRCClient, + client: Client, + event_manager: Arc, llm_handle: LLMHandle, // FIXME: This needs to be thread safe, and shared, etc. } // Need: owners, channels, username, nick, server, password #[instrument] -pub async fn new(settings: &MainConfig, handle: &LLMHandle) -> Result { +pub async fn new( + settings: &MainConfig, + handle: &LLMHandle, + manager: Arc, +) -> Result { // Going to just assign and let the irc library handle errors for now, and // add my own checking if necessary. let port: u16 = settings.get("port")?; @@ -35,33 +43,77 @@ pub async fn new(settings: &MainConfig, handle: &LLMHandle) -> Result { event!(Level::INFO, "IRC connection starting..."); Ok(Chat { - client: IRCClient::from_config(config).await?, + client: Client::from_config(config).await?, llm_handle: handle.clone(), + event_manager: manager, }) } impl Chat { - pub async fn run(&mut self) -> Result<()> { - let client = &mut self.client; + pub async fn run(&mut self, mut command_in: mpsc::Receiver) -> Result<()> { + self.client.identify()?; - client.identify()?; + let mut stream = self.client.stream()?; - let mut stream = client.stream()?; + loop { + tokio::select! { + message = stream.next() => { + match message { + Some(Ok(msg)) => { + self.handle_chat_message(&msg).await?; + } + Some(Err(e)) => return Err(e.into()), + None => break, // disconnected + } + } + command = command_in.recv() => { + event!(Level::INFO, "Received command {:#?}", command); + match command { + Some(commands::Command::SendMessage {channel, message} ) => { + // Now to pass on the message. + event!(Level::INFO, "Trying to send to channel."); + self.client.send_privmsg(&channel, &message).wrap_err("Couldn't send to channel")?; + event!(Level::INFO, "Message sent successfully."); - while let Some(message) = stream.next().await.transpose()? { - if let Command::PRIVMSG(channel, message) = message.command - && message.starts_with("!gem") - { - let mut msg = self.llm_handle.send_request(&message).await?; - event!(Level::INFO, "Asked: {}", message); - event!(Level::INFO, "Answered: {}", msg); + } + None => { + event!(Level::ERROR, + "Command channel unexpectedly closed - \ + FIFO reader may have crashed"); + break; + } + } + } + } + } - // Make it all one line. - msg.retain(|c| c != '\n' && c != '\r'); - msg.truncate(500); - client - .send_privmsg(&channel, msg) - .wrap_err("Could not send to {channel}")?; + Ok(()) + } + + async fn handle_chat_message(&mut self, message: &Message) -> Result<()> { + // Broadcast anything here. If it should not be broadcasted then + // TryFrom should fail. + if let Ok(event) = Event::try_from(message) { + self.event_manager.broadcast(&event).await?; + } + + // Only handle PRIVMSG for now. + if let Command::PRIVMSG(channel, msg) = &message.command { + // Just preserve the original behavior for now. + if msg.starts_with("!gem") { + let mut llm_response = self.llm_handle.send_request(msg).await?; + + event!(Level::INFO, "Asked: {message}"); + event!(Level::INFO, "Response: {llm_response}"); + + // Keep responses to one line for now. + llm_response.retain(|c| c != '\n' && c != '\r'); + + // TODO: Make this configurable. + llm_response.truncate(500); + + event!(Level::INFO, "Sending {llm_response} to channel {channel}"); + self.client.send_privmsg(channel, llm_response)?; } } diff --git a/src/commands.rs b/src/commands.rs index 5935fcc..84ec85c 100644 --- a/src/commands.rs +++ b/src/commands.rs @@ -1,8 +1,8 @@ use std::fmt::Display; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; -#[derive(Debug, Deserialize)] +#[derive(Debug, Deserialize, Serialize)] pub enum Command { SendMessage { channel: String, message: String }, } diff --git a/src/event.rs b/src/event.rs index bc7356d..e62b0db 100644 --- a/src/event.rs +++ b/src/event.rs @@ -1,14 +1,32 @@ +use irc::proto::{Command, Message}; use serde::{Deserialize, Serialize}; #[derive(Deserialize, Serialize)] pub struct Event { + from: String, message: String, } impl Event { - pub fn new(msg: impl Into) -> Self { + pub fn new(from: impl Into, msg: impl Into) -> Self { Self { + from: from.into(), message: msg.into(), } } } + +impl TryFrom<&Message> for Event { + type Error = &'static str; + + fn try_from(value: &Message) -> Result { + let from = value.response_target().unwrap_or("unknown").to_string(); + match &value.command { + Command::PRIVMSG(_channel, message) => Ok(Event { + from, + message: message.clone(), + }), + _ => Err("Not a PRIVMSG"), + } + } +} diff --git a/src/event_manager.rs b/src/event_manager.rs index bfe0b1f..a5361cb 100644 --- a/src/event_manager.rs +++ b/src/event_manager.rs @@ -15,6 +15,7 @@ use crate::{commands::Command, event::Event}; const EVENT_BUF_MAX: usize = 1000; // Manager for communication with plugins. +#[derive(Debug)] pub struct EventManager { announce: broadcast::Sender, // Everything broadcasts here. events: Arc>>, // Ring buffer. @@ -130,7 +131,7 @@ mod tests { #[tokio::test] async fn test_broadcast_adds_event_to_buffer() { let manager = EventManager::new().unwrap(); - let event = Event::new("test message"); + let event = Event::new("test_user", "test message"); manager.broadcast(&event).await.unwrap(); @@ -143,7 +144,7 @@ mod tests { #[tokio::test] async fn test_broadcast_serializes_event_as_json() { let manager = EventManager::new().unwrap(); - let event = Event::new("hello world"); + let event = Event::new("test_user", "hello world"); manager.broadcast(&event).await.unwrap(); @@ -165,7 +166,7 @@ mod tests { let manager = EventManager::new().unwrap(); for i in 0..count { - let event = Event::new(format!("event {}", i)); + let event = Event::new("test_user", format!("event {}", i)); manager.broadcast(&event).await.unwrap(); } @@ -179,7 +180,7 @@ mod tests { // Fill to exactly EVENT_BUF_MAX (1000) for i in 0..EVENT_BUF_MAX { - let event = Event::new(format!("event {}", i)); + let event = Event::new("test_user", format!("event {}", i)); manager.broadcast(&event).await.unwrap(); } @@ -201,7 +202,7 @@ mod tests { // Broadcast more events than buffer can hold for i in 0..total { - let event = Event::new(format!("event {}", i)); + let event = Event::new("test_user", format!("event {}", i)); manager.broadcast(&event).await.unwrap(); } @@ -229,7 +230,7 @@ mod tests { let messages = vec!["first", "second", "third", "fourth", "fifth"]; for msg in &messages { - let event = Event::new(*msg); + let event = Event::new("test_user", *msg); manager.broadcast(&event).await.unwrap(); } @@ -247,13 +248,13 @@ mod tests { // Fill buffer completely for i in 0..EVENT_BUF_MAX { - let event = Event::new(format!("old {}", i)); + let event = Event::new("test_user", format!("old {}", i)); manager.broadcast(&event).await.unwrap(); } // Add 5 more events for i in 0..5 { - let event = Event::new(format!("new {}", i)); + let event = Event::new("test_user", format!("new {}", i)); manager.broadcast(&event).await.unwrap(); } @@ -283,7 +284,7 @@ mod tests { let manager_clone = Arc::clone(&manager); let handle = tokio::spawn(async move { for i in 0..10 { - let event = Event::new(format!("task {} event {}", task_id, i)); + let event = Event::new("test_user", format!("task {} event {}", task_id, i)); manager_clone.broadcast(&event).await.unwrap(); } }); @@ -298,4 +299,277 @@ mod tests { let events = manager.events.read().await; assert_eq!(events.len(), 100); } + + #[tokio::test] + async fn test_fifo_receives_and_forwards_single_command() { + let temp_dir = tempfile::tempdir().unwrap(); + let fifo_path = temp_dir.path().join("test.fifo"); + let (tx, mut rx) = mpsc::channel(10); + + // Spawn the FIFO reader + let path = fifo_path.clone(); + tokio::spawn(async move { + let _ = EventManager::start_fifo(&path, tx).await; + }); + + // Give it time to create the FIFO + tokio::time::sleep(tokio::time::Duration::from_millis(50)).await; + + // Write a command to the FIFO + let cmd = Command::SendMessage { + channel: "#test".to_string(), + message: "hello".to_string(), + }; + let json = serde_json::to_string(&cmd).unwrap() + "\n"; + + // Open FIFO for writing and write the command + tokio::spawn(async move { + use tokio::io::AsyncWriteExt; + let tx = pipe::OpenOptions::new().open_sender(&fifo_path).unwrap(); + let mut tx = tokio::io::BufWriter::new(tx); + tx.write_all(json.as_bytes()).await.unwrap(); + tx.flush().await.unwrap(); + }); + + // Should receive the command within a reasonable time + let received = tokio::time::timeout(tokio::time::Duration::from_secs(1), rx.recv()) + .await + .expect("timeout waiting for command") + .expect("channel closed"); + + match received { + Command::SendMessage { channel, message } => { + assert_eq!(channel, "#test"); + assert_eq!(message, "hello"); + } + } + } + + #[tokio::test] + async fn test_fifo_handles_multiple_commands() { + let temp_dir = tempfile::tempdir().unwrap(); + let fifo_path = temp_dir.path().join("test.fifo"); + let (tx, mut rx) = mpsc::channel(10); + + // Spawn the FIFO reader + let path = fifo_path.clone(); + tokio::spawn(async move { + let _ = EventManager::start_fifo(&path, tx).await; + }); + + // Give it time to create the FIFO + tokio::time::sleep(tokio::time::Duration::from_millis(50)).await; + + // Write multiple commands + let commands = vec![ + Command::SendMessage { + channel: "#chan1".to_string(), + message: "first".to_string(), + }, + Command::SendMessage { + channel: "#chan2".to_string(), + message: "second".to_string(), + }, + Command::SendMessage { + channel: "#chan3".to_string(), + message: "third".to_string(), + }, + ]; + + tokio::spawn(async move { + use tokio::io::AsyncWriteExt; + let tx = pipe::OpenOptions::new().open_sender(&fifo_path).unwrap(); + let mut tx = tokio::io::BufWriter::new(tx); + + for cmd in commands { + let json = serde_json::to_string(&cmd).unwrap() + "\n"; + tx.write_all(json.as_bytes()).await.unwrap(); + } + tx.flush().await.unwrap(); + }); + + // Receive all three commands in order + let first = tokio::time::timeout(tokio::time::Duration::from_secs(1), rx.recv()) + .await + .expect("timeout on first") + .expect("channel closed"); + + match first { + Command::SendMessage { channel, message } => { + assert_eq!(channel, "#chan1"); + assert_eq!(message, "first"); + } + } + + let second = tokio::time::timeout(tokio::time::Duration::from_secs(1), rx.recv()) + .await + .expect("timeout on second") + .expect("channel closed"); + + match second { + Command::SendMessage { channel, message } => { + assert_eq!(channel, "#chan2"); + assert_eq!(message, "second"); + } + } + + let third = tokio::time::timeout(tokio::time::Duration::from_secs(1), rx.recv()) + .await + .expect("timeout on third") + .expect("channel closed"); + + match third { + Command::SendMessage { channel, message } => { + assert_eq!(channel, "#chan3"); + assert_eq!(message, "third"); + } + } + } + + #[tokio::test] + async fn test_fifo_reopens_after_writer_closes() { + let temp_dir = tempfile::tempdir().unwrap(); + let fifo_path = temp_dir.path().join("test.fifo"); + let (tx, mut rx) = mpsc::channel(10); + + // Spawn the FIFO reader + let path = fifo_path.clone(); + tokio::spawn(async move { + let _ = EventManager::start_fifo(&path, tx).await; + }); + + // Give it time to create the FIFO + tokio::time::sleep(tokio::time::Duration::from_millis(50)).await; + + // First writer sends a command and closes + { + use tokio::io::AsyncWriteExt; + let path = fifo_path.clone(); + tokio::spawn(async move { + let tx = pipe::OpenOptions::new().open_sender(&path).unwrap(); + let mut tx = tokio::io::BufWriter::new(tx); + + let cmd = Command::SendMessage { + channel: "#first".to_string(), + message: "batch1".to_string(), + }; + let json = serde_json::to_string(&cmd).unwrap() + "\n"; + tx.write_all(json.as_bytes()).await.unwrap(); + tx.flush().await.unwrap(); + // Writer drops here, closing the FIFO + }); + + let first = tokio::time::timeout(tokio::time::Duration::from_secs(1), rx.recv()) + .await + .expect("timeout on first batch") + .expect("channel closed"); + + match first { + Command::SendMessage { channel, message } => { + assert_eq!(channel, "#first"); + assert_eq!(message, "batch1"); + } + } + } + + // Give the FIFO time to reopen + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + + // Second writer opens and sends a command + { + use tokio::io::AsyncWriteExt; + tokio::spawn(async move { + let tx = pipe::OpenOptions::new().open_sender(&fifo_path).unwrap(); + let mut tx = tokio::io::BufWriter::new(tx); + + let cmd = Command::SendMessage { + channel: "#second".to_string(), + message: "batch2".to_string(), + }; + let json = serde_json::to_string(&cmd).unwrap() + "\n"; + tx.write_all(json.as_bytes()).await.unwrap(); + tx.flush().await.unwrap(); + }); + + let second = tokio::time::timeout(tokio::time::Duration::from_secs(1), rx.recv()) + .await + .expect("timeout on second batch - FIFO may not have reopened") + .expect("channel closed"); + + match second { + Command::SendMessage { channel, message } => { + assert_eq!(channel, "#second"); + assert_eq!(message, "batch2"); + } + } + } + } + + #[tokio::test] + async fn test_fifo_handles_empty_lines() { + let temp_dir = tempfile::tempdir().unwrap(); + let fifo_path = temp_dir.path().join("test.fifo"); + let (tx, mut rx) = mpsc::channel(10); + + // Spawn the FIFO reader + let path = fifo_path.clone(); + let handle = tokio::spawn(async move { EventManager::start_fifo(&path, tx).await }); + + // Give it time to create the FIFO + tokio::time::sleep(tokio::time::Duration::from_millis(50)).await; + + // Write command, empty line, whitespace, another command + tokio::spawn(async move { + use tokio::io::AsyncWriteExt; + let tx = pipe::OpenOptions::new().open_sender(&fifo_path).unwrap(); + let mut tx = tokio::io::BufWriter::new(tx); + + let cmd1 = Command::SendMessage { + channel: "#test".to_string(), + message: "first".to_string(), + }; + let json1 = serde_json::to_string(&cmd1).unwrap() + "\n"; + tx.write_all(json1.as_bytes()).await.unwrap(); + + // Write empty line + tx.write_all(b"\n").await.unwrap(); + + // Write whitespace line + tx.write_all(b" \n").await.unwrap(); + + let cmd2 = Command::SendMessage { + channel: "#test".to_string(), + message: "second".to_string(), + }; + let json2 = serde_json::to_string(&cmd2).unwrap() + "\n"; + tx.write_all(json2.as_bytes()).await.unwrap(); + tx.flush().await.unwrap(); + }); + + // Should receive first command + let first = tokio::time::timeout(tokio::time::Duration::from_millis(500), rx.recv()) + .await + .expect("timeout on first") + .expect("channel closed"); + + match first { + Command::SendMessage { channel, message } => { + assert_eq!(channel, "#test"); + assert_eq!(message, "first"); + } + } + + // The empty/whitespace lines should cause JSON parse errors + // which will cause start_fifo to error and exit + // So we expect the handle to complete (with an error) + let result = tokio::time::timeout(tokio::time::Duration::from_secs(1), handle) + .await + .expect("FIFO task should exit due to parse error"); + + // The task should have errored + assert!( + result.unwrap().is_err(), + "Expected parse error from empty line" + ); + } } diff --git a/src/lib.rs b/src/lib.rs index ac72eaf..92efda8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -4,6 +4,7 @@ use std::{os::unix::fs, sync::Arc}; use color_eyre::{Result, eyre::WrapErr}; use human_panic::setup_panic; +use tokio::sync::mpsc; use tracing::{Level, info}; use tracing_subscriber::FmtSubscriber; @@ -67,18 +68,23 @@ pub async fn run() -> Result<()> { let ev_manager = Arc::new(EventManager::new()?); let ev_manager_clone = Arc::clone(&ev_manager); - ev_manager_clone - .broadcast(&Event::new("Starting...")) - .await?; - let mut c = chat::new(&config, &handle).await?; + let mut c = chat::new(&config, &handle, Arc::clone(&ev_manager)).await?; + + let (from_plugins, to_chat) = mpsc::channel(100); tokio::select! { _ = ev_manager_clone.start_listening("/tmp/robo.sock") => { // Event listener ended } - result = c.run() => { - result.unwrap(); + result = c.run(to_chat) => { + if let Err(e) = result { + tracing::error!("Chat run error: {:?}", e); + return Err(e); + } + } + fifo = EventManager::start_fifo("/tmp/robo_in.sock", from_plugins) => { + fifo.wrap_err("FIFO reader failed.")?; } } diff --git a/tests/event_test.rs b/tests/event_test.rs index 5237b76..1a1b6ca 100644 --- a/tests/event_test.rs +++ b/tests/event_test.rs @@ -65,7 +65,7 @@ async fn test_client_connects_and_receives_event() { tokio::time::sleep(Duration::from_millis(50)).await; // Broadcast an event - let event = Event::new("test message"); + let event = Event::new("test_user", "test message"); manager.broadcast(&event).await.unwrap(); // Connect as a client @@ -90,7 +90,7 @@ async fn test_client_receives_event_history() { // Broadcast events BEFORE starting the listener for i in 0..5 { - let event = Event::new(format!("historical event {}", i)); + let event = Event::new("test_user", format!("historical event {}", i)); manager.broadcast(&event).await.unwrap(); } @@ -142,7 +142,7 @@ async fn test_multiple_clients_receive_same_events() { let mut reader3 = BufReader::new(stream3); // Broadcast a new event - let event = Event::new("broadcast to all"); + let event = Event::new("test_user", "broadcast to all"); manager.broadcast(&event).await.unwrap(); // All clients should receive the event @@ -191,7 +191,7 @@ async fn test_late_joiner_receives_full_history() { // Broadcast several events for i in 0..10 { - let event = Event::new(format!("event {}", i)); + let event = Event::new("test_user", format!("event {}", i)); manager.broadcast(&event).await.unwrap(); } @@ -234,7 +234,7 @@ async fn test_client_receives_events_in_order() { // Broadcast events rapidly let count = 50; for i in 0..count { - let event = Event::new(format!("sequence {}", i)); + let event = Event::new("test_user", format!("sequence {}", i)); manager.broadcast(&event).await.unwrap(); } @@ -279,7 +279,7 @@ async fn test_concurrent_broadcasts_during_client_connections() { let broadcast_manager = Arc::clone(&manager); let broadcast_handle = tokio::spawn(async move { for i in 0..100 { - let event = Event::new(format!("concurrent event {}", i)); + let event = Event::new("test_user", format!("concurrent event {}", i)); broadcast_manager.broadcast(&event).await.unwrap(); tokio::time::sleep(Duration::from_millis(5)).await; } @@ -329,7 +329,7 @@ async fn test_buffer_overflow_affects_new_clients() { // Broadcast more than buffer max (1000) for i in 0..1100 { - let event = Event::new(format!("overflow event {}", i)); + let event = Event::new("test_user", format!("overflow event {}", i)); manager.broadcast(&event).await.unwrap(); } @@ -387,7 +387,7 @@ async fn test_client_count_scaling(#[case] num_clients: usize, #[case] events_pe // Broadcast events for i in 0..events_per_client { - let event = Event::new(format!("scale event {}", i)); + let event = Event::new("test_user", format!("scale event {}", i)); manager.broadcast(&event).await.unwrap(); } @@ -426,7 +426,7 @@ async fn test_client_disconnect_doesnt_affect_others() { // Broadcast initial event manager - .broadcast(&Event::new("before disconnect")) + .broadcast(&Event::new("test_user", "before disconnect")) .await .unwrap(); @@ -440,7 +440,7 @@ async fn test_client_disconnect_doesnt_affect_others() { // Broadcast another event manager - .broadcast(&Event::new("after disconnect")) + .broadcast(&Event::new("test_user", "after disconnect")) .await .unwrap(); @@ -473,7 +473,10 @@ async fn test_json_deserialization_of_received_events() { // Broadcast an event with special characters let test_message = "special chars: @#$% newline\\n tab\\t quotes \"test\""; - manager.broadcast(&Event::new(test_message)).await.unwrap(); + manager + .broadcast(&Event::new("test_user", test_message)) + .await + .unwrap(); // Connect and deserialize let stream = UnixStream::connect(&socket_path).await.unwrap();