Compare commits

...

10 Commits

Author SHA1 Message Date
Micheal Smith
585afa5f6f Implemented external processes as potential plugins. 2025-11-20 04:17:09 -06:00
Micheal Smith
30e2d9a448 Renamed commands/Command to plugin/Plugin. 2025-11-14 07:19:28 -06:00
Micheal Smith
70de039610 Broadcast, and FIFO are currently functional. 2025-11-14 05:06:57 -06:00
8ec4f2860c Implementing fifo channel, and command structure. 2025-11-13 21:07:49 -06:00
Micheal Smith
21d9c3f002 Adding response FIFO. 2025-11-12 06:42:30 -06:00
Micheal Smith
f880795b44 Added an integration test for events. 2025-11-11 00:39:58 -06:00
a158ee385f Moved most main.rs functionality into lib.rs 2025-11-10 22:58:44 -06:00
Micheal Smith
2da7cc4450 Breaking out some portions for integration testing. 2025-11-10 22:20:09 -06:00
Micheal Smith
3af95235e6 Added some tests at least for the broadcast buffering. 2025-11-10 05:26:59 -06:00
Micheal Smith
5d390ee9f3 Adding some IPC. 2025-11-09 08:26:39 -06:00
15 changed files with 2214 additions and 258 deletions

469
Cargo.lock generated
View File

@@ -17,6 +17,15 @@ version = "2.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "320119579fcad9c21884f5c4861d16174d0e06250625266f50fe6898340abefa"
[[package]]
name = "aho-corasick"
version = "1.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ddd31a130427c27518df266943a5308ed92d4b226cc639f5a8f1002816174301"
dependencies = [
"memchr",
]
[[package]]
name = "android_system_properties"
version = "0.1.5"
@@ -58,22 +67,22 @@ dependencies = [
[[package]]
name = "anstyle-query"
version = "1.1.4"
version = "1.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e231f6134f61b71076a3eab506c379d4f36122f2af15a9ff04415ea4c3339e2"
checksum = "40c48f72fd53cd289104fc64099abca73db4166ad86ea0b4341abe65af83dadc"
dependencies = [
"windows-sys 0.60.2",
"windows-sys 0.61.2",
]
[[package]]
name = "anstyle-wincon"
version = "3.0.10"
version = "3.0.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3e0633414522a32ffaac8ac6cc8f748e090c5717661fddeea04219e2344f5f2a"
checksum = "291e6a250ff86cd4a820112fb8898808a366d8f9f58ce16d1f538353ad55747d"
dependencies = [
"anstyle",
"once_cell_polyfill",
"windows-sys 0.60.2",
"windows-sys 0.61.2",
]
[[package]]
@@ -120,12 +129,6 @@ dependencies = [
"windows-link",
]
[[package]]
name = "base64"
version = "0.21.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9d297deb1925b89f2ccc13d7635fa0714f12c87adce1c75356b39ca9b7178567"
[[package]]
name = "base64"
version = "0.22.1"
@@ -160,6 +163,15 @@ dependencies = [
"generic-array",
]
[[package]]
name = "block2"
version = "0.6.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cdeb9d870516001442e364c5220d3574d2da8dc765554b4a617230d33fa58ef5"
dependencies = [
"objc2",
]
[[package]]
name = "bumpalo"
version = "3.19.0"
@@ -173,10 +185,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d71b6127be86fdcfddb610f7182ac57211d4b18a3e9c82eb2d17662f2227ad6a"
[[package]]
name = "cc"
version = "1.2.43"
name = "cargo-husky"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "739eb0f94557554b3ca9a86d2d37bebd49c5e6d0c1d2bda35ba5bdac830befc2"
checksum = "7b02b629252fe8ef6460461409564e2c21d0c8e77e0944f3d189ff06c4e932ad"
[[package]]
name = "cc"
version = "1.2.46"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b97463e1064cb1b1c1384ad0a0b9c8abd0988e2a91f52606c80ef14aadb63e36"
dependencies = [
"find-msvc-tools",
"shlex",
@@ -281,9 +299,9 @@ checksum = "b05b61dc5112cbb17e4b6cd61790d9845d13888356391624cbe7e41efeac1e75"
[[package]]
name = "config"
version = "0.15.18"
version = "0.15.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "180e549344080374f9b32ed41bf3b6b57885ff6a289367b3dbc10eea8acc1918"
checksum = "b30fa8254caad766fc03cb0ccae691e14bf3bd72bfff27f72802ce729551b3d6"
dependencies = [
"async-trait",
"convert_case",
@@ -373,9 +391,9 @@ checksum = "460fbee9c2c2f33933d720630a6a0bac33ba7053db5344fac858d4b8952d77d5"
[[package]]
name = "crypto-common"
version = "0.1.6"
version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1bfb12502f3fc46cca1bb51ac28df9d618d813cdc3d2f25b9fe775a34af26bb3"
checksum = "78c8292055d1c1df0cce5d180393dc8cce0abec0a7102adb6c7b1eef6016d60a"
dependencies = [
"generic-array",
"typenum",
@@ -498,6 +516,16 @@ dependencies = [
"windows-sys 0.61.2",
]
[[package]]
name = "dispatch2"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "89a09f22a6c6069a18470eb92d2298acf25463f14256d24778e1230d789a2aec"
dependencies = [
"bitflags",
"objc2",
]
[[package]]
name = "displaydoc"
version = "0.2.5"
@@ -611,9 +639,9 @@ checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f"
[[package]]
name = "erased-serde"
version = "0.4.8"
version = "0.4.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "259d404d09818dec19332e31d94558aeb442fea04c817006456c24b5460bbd4b"
checksum = "89e8918065695684b2b0702da20382d5ae6065cf3327bc2d6436bd49a71ce9f3"
dependencies = [
"serde",
"serde_core",
@@ -659,9 +687,9 @@ checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be"
[[package]]
name = "find-msvc-tools"
version = "0.1.4"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "52051878f80a721bb68ebfbc930e07b65ba72f2da88968ea5c06fd6ca3d3a127"
checksum = "3a3076410a55c90011c298b04d0cfa770b00fa04e1e3c97d3f6c9de105a03844"
[[package]]
name = "fnv"
@@ -817,9 +845,9 @@ dependencies = [
[[package]]
name = "generic-array"
version = "0.14.9"
version = "0.14.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4bb6743198531e02858aeaea5398fcc883e71851fcbcb5a2f773e2fb6cb1edf2"
checksum = "85649ca51fd72272d7821adaf274ad91c288277713d9c18820d8499a7ff69e9a"
dependencies = [
"typenum",
"version_check",
@@ -858,6 +886,12 @@ version = "0.32.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e629b9b98ef3dd8afe6ca2bd0f89306cec16d43d907889945bc5d6687f2f13c7"
[[package]]
name = "glob"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0cc23270f6e1808e30a928bdc84dea0b9b4136a8bc82338574f23baf47bbd280"
[[package]]
name = "hashbrown"
version = "0.12.3"
@@ -964,9 +998,9 @@ dependencies = [
[[package]]
name = "hyper"
version = "1.7.0"
version = "1.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eb3aa54a13a0dfe7fbe3a59e0c76093041720fdc77b110cc0fc260fafb4dc51e"
checksum = "2ab2d4f250c3d7b1c9fcdff1cece94ea4e2dfbec68614f7b87cb205f24ca9d11"
dependencies = [
"atomic-waker",
"bytes",
@@ -1002,11 +1036,11 @@ dependencies = [
[[package]]
name = "hyper-util"
version = "0.1.17"
version = "0.1.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3c6995591a8f1380fcb4ba966a252a4b29188d51d2b89e3a252f5305be65aea8"
checksum = "52e9a2a24dc5c6821e71a7030e1e14b7b632acac55c40e9d2e082c621261bb56"
dependencies = [
"base64 0.22.1",
"base64",
"bytes",
"futures-channel",
"futures-core",
@@ -1229,9 +1263,9 @@ dependencies = [
[[package]]
name = "iri-string"
version = "0.7.8"
version = "0.7.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dbc5ebe9c3a1a7a5127f920a418f7585e9e758e911d0466ed004f393b0e380b2"
checksum = "4f867b9d1d896b67beb18518eda36fdb77a32ea590de864f1325b294a6d14397"
dependencies = [
"memchr",
"serde",
@@ -1380,6 +1414,18 @@ dependencies = [
"tempfile",
]
[[package]]
name = "nix"
version = "0.30.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "74523f3a35e05aba87a1d978330aef40f67b0304ac79c1c00b294c9830543db6"
dependencies = [
"bitflags",
"cfg-if",
"cfg_aliases",
"libc",
]
[[package]]
name = "nom"
version = "7.1.3"
@@ -1414,6 +1460,165 @@ dependencies = [
"autocfg",
]
[[package]]
name = "objc2"
version = "0.6.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b7c2599ce0ec54857b29ce62166b0ed9b4f6f1a70ccc9a71165b6154caca8c05"
dependencies = [
"objc2-encode",
]
[[package]]
name = "objc2-cloud-kit"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "73ad74d880bb43877038da939b7427bba67e9dd42004a18b809ba7d87cee241c"
dependencies = [
"bitflags",
"objc2",
"objc2-foundation",
]
[[package]]
name = "objc2-core-data"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b402a653efbb5e82ce4df10683b6b28027616a2715e90009947d50b8dd298fa"
dependencies = [
"objc2",
"objc2-foundation",
]
[[package]]
name = "objc2-core-foundation"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2a180dd8642fa45cdb7dd721cd4c11b1cadd4929ce112ebd8b9f5803cc79d536"
dependencies = [
"bitflags",
"dispatch2",
"objc2",
]
[[package]]
name = "objc2-core-graphics"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e022c9d066895efa1345f8e33e584b9f958da2fd4cd116792e15e07e4720a807"
dependencies = [
"bitflags",
"dispatch2",
"objc2",
"objc2-core-foundation",
"objc2-io-surface",
]
[[package]]
name = "objc2-core-image"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5d563b38d2b97209f8e861173de434bd0214cf020e3423a52624cd1d989f006"
dependencies = [
"objc2",
"objc2-foundation",
]
[[package]]
name = "objc2-core-location"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ca347214e24bc973fc025fd0d36ebb179ff30536ed1f80252706db19ee452009"
dependencies = [
"objc2",
"objc2-foundation",
]
[[package]]
name = "objc2-core-text"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0cde0dfb48d25d2b4862161a4d5fcc0e3c24367869ad306b0c9ec0073bfed92d"
dependencies = [
"bitflags",
"objc2",
"objc2-core-foundation",
"objc2-core-graphics",
]
[[package]]
name = "objc2-encode"
version = "4.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ef25abbcd74fb2609453eb695bd2f860d389e457f67dc17cafc8b8cbc89d0c33"
[[package]]
name = "objc2-foundation"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e3e0adef53c21f888deb4fa59fc59f7eb17404926ee8a6f59f5df0fd7f9f3272"
dependencies = [
"bitflags",
"block2",
"libc",
"objc2",
"objc2-core-foundation",
]
[[package]]
name = "objc2-io-surface"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "180788110936d59bab6bd83b6060ffdfffb3b922ba1396b312ae795e1de9d81d"
dependencies = [
"bitflags",
"objc2",
"objc2-core-foundation",
]
[[package]]
name = "objc2-quartz-core"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "96c1358452b371bf9f104e21ec536d37a650eb10f7ee379fff67d2e08d537f1f"
dependencies = [
"bitflags",
"objc2",
"objc2-core-foundation",
"objc2-foundation",
]
[[package]]
name = "objc2-ui-kit"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d87d638e33c06f577498cbcc50491496a3ed4246998a7fbba7ccb98b1e7eab22"
dependencies = [
"bitflags",
"block2",
"objc2",
"objc2-cloud-kit",
"objc2-core-data",
"objc2-core-foundation",
"objc2-core-graphics",
"objc2-core-image",
"objc2-core-location",
"objc2-core-text",
"objc2-foundation",
"objc2-quartz-core",
"objc2-user-notifications",
]
[[package]]
name = "objc2-user-notifications"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9df9128cbbfef73cda168416ccf7f837b62737d748333bfe9ab71c245d76613e"
dependencies = [
"objc2",
"objc2-foundation",
]
[[package]]
name = "object"
version = "0.37.3"
@@ -1437,9 +1642,9 @@ checksum = "384b8ab6d37215f3c5301a95a4accb5d64aa607f1fcb26a11b5303878451b4fe"
[[package]]
name = "openssl"
version = "0.10.74"
version = "0.10.75"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "24ad14dd45412269e1a30f52ad8f0664f0f4f4a89ee8fe28c3b3527021ebb654"
checksum = "08838db121398ad17ab8531ce9de97b244589089e290a384c900cb9ff7434328"
dependencies = [
"bitflags",
"cfg-if",
@@ -1469,9 +1674,9 @@ checksum = "d05e27ee213611ffe7d6348b942e8f942b37114c00cc03cec254295a4a17852e"
[[package]]
name = "openssl-sys"
version = "0.9.110"
version = "0.9.111"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0a9f0075ba3c21b09f8e8b2026584b1d18d49388648f2fbbf3c97ea8deced8e2"
checksum = "82cab2d520aa75e3c58898289429321eb788c3106963d0dc886ec7a5f4adc321"
dependencies = [
"cc",
"libc",
@@ -1497,14 +1702,18 @@ dependencies = [
[[package]]
name = "os_info"
version = "3.12.0"
version = "3.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d0e1ac5fde8d43c34139135df8ea9ee9465394b2d8d20f032d38998f64afffc3"
checksum = "7c39b5918402d564846d5aba164c09a66cc88d232179dfd3e3c619a25a268392"
dependencies = [
"android_system_properties",
"log",
"plist",
"nix",
"objc2",
"objc2-foundation",
"objc2-ui-kit",
"serde",
"windows-sys 0.52.0",
"windows-sys 0.61.2",
]
[[package]]
@@ -1629,19 +1838,6 @@ version = "0.3.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7edddbd0b52d732b21ad9a5fab5c704c14cd949e5e9a1ec5929a24fded1b904c"
[[package]]
name = "plist"
version = "1.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "740ebea15c5d1428f910cd1a5f52cebf8d25006245ed8ade92702f4943d91e07"
dependencies = [
"base64 0.22.1",
"indexmap 2.12.0",
"quick-xml",
"serde",
"time",
]
[[package]]
name = "potential_utf"
version = "0.1.4"
@@ -1666,6 +1862,15 @@ dependencies = [
"zerocopy",
]
[[package]]
name = "proc-macro-crate"
version = "3.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "219cb19e96be00ab2e37d6e299658a0cfa83e52429179969b0f0121b4ac46983"
dependencies = [
"toml_edit 0.23.7",
]
[[package]]
name = "proc-macro2"
version = "1.0.103"
@@ -1675,15 +1880,6 @@ dependencies = [
"unicode-ident",
]
[[package]]
name = "quick-xml"
version = "0.38.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "42a232e7487fc2ef313d96dde7948e7a3c05101870d8985e4fd8d26aedd27b89"
dependencies = [
"memchr",
]
[[package]]
name = "quinn"
version = "0.11.9"
@@ -1741,9 +1937,9 @@ dependencies = [
[[package]]
name = "quote"
version = "1.0.41"
version = "1.0.42"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ce25767e7b499d1b604768e7cde645d14cc8584231ea6b295e9c9eb22c02e1d1"
checksum = "a338cc41d27e6cc6dce6cefc13a0729dfbb81c262b1f519331575dd80ef3067f"
dependencies = [
"proc-macro2",
]
@@ -1823,13 +2019,48 @@ dependencies = [
"syn",
]
[[package]]
name = "regex"
version = "1.12.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "843bc0191f75f3e22651ae5f1e72939ab2f72a4bc30fa80a066bd66edefc24d4"
dependencies = [
"aho-corasick",
"memchr",
"regex-automata",
"regex-syntax",
]
[[package]]
name = "regex-automata"
version = "0.4.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5276caf25ac86c8d810222b3dbb938e512c55c6831a10f3e6ed1c93b84041f1c"
dependencies = [
"aho-corasick",
"memchr",
"regex-syntax",
]
[[package]]
name = "regex-syntax"
version = "0.8.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7a2d987857b319362043e95f5353c0535c1f58eec5336fdfcf626430af7def58"
[[package]]
name = "relative-path"
version = "1.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba39f3699c378cd8970968dcbff9c43159ea4cfbd88d43c00b22f2ef10a435d2"
[[package]]
name = "reqwest"
version = "0.12.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9d0946410b9f7b082a427e4ef5c8ff541a88b357bc6c637c40db3a68ac70a36f"
dependencies = [
"base64 0.22.1",
"base64",
"bytes",
"futures-core",
"futures-util",
@@ -1899,6 +2130,8 @@ name = "robotnik"
version = "0.1.0"
dependencies = [
"better-panic",
"bytes",
"cargo-husky",
"clap",
"color-eyre",
"config",
@@ -1907,6 +2140,11 @@ dependencies = [
"genai",
"human-panic",
"irc",
"nix",
"rstest",
"serde",
"serde_json",
"tempfile",
"tokio",
"tracing",
"tracing-subscriber",
@@ -1914,14 +2152,46 @@ dependencies = [
[[package]]
name = "ron"
version = "0.8.1"
version = "0.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b91f7eff05f748767f183df4320a63d6936e9c6107d97c9e6bdd9784f4289c94"
checksum = "fd490c5b18261893f14449cbd28cb9c0b637aebf161cd77900bfdedaff21ec32"
dependencies = [
"base64 0.21.7",
"bitflags",
"once_cell",
"serde",
"serde_derive",
"typeid",
"unicode-ident",
]
[[package]]
name = "rstest"
version = "0.24.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "03e905296805ab93e13c1ec3a03f4b6c4f35e9498a3d5fa96dc626d22c03cd89"
dependencies = [
"futures-timer",
"futures-util",
"rstest_macros",
"rustc_version",
]
[[package]]
name = "rstest_macros"
version = "0.24.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ef0053bbffce09062bee4bcc499b0fbe7a57b879f1efe088d6d8d4c7adcdef9b"
dependencies = [
"cfg-if",
"glob",
"proc-macro-crate",
"proc-macro2",
"quote",
"regex",
"relative-path",
"rustc_version",
"syn",
"unicode-ident",
]
[[package]]
@@ -1946,6 +2216,15 @@ version = "2.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "357703d41365b4b27c590e3ed91eabb1b663f07c4c084095e60cbed4362dff0d"
[[package]]
name = "rustc_version"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cfcb3a22ef46e85b45de6ee7e79d063319ebb6594faafcf1c225ea92ab6e9b92"
dependencies = [
"semver",
]
[[package]]
name = "rustix"
version = "1.1.2"
@@ -1961,9 +2240,9 @@ dependencies = [
[[package]]
name = "rustls"
version = "0.23.34"
version = "0.23.35"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6a9586e9ee2b4f8fab52a0048ca7334d7024eef48e2cb9407e3497bb7cab7fa7"
checksum = "533f54bc6a7d4f647e46ad909549eda97bf5afc1585190ef692b4286b198bd8f"
dependencies = [
"once_cell",
"ring",
@@ -2029,9 +2308,9 @@ dependencies = [
[[package]]
name = "schemars"
version = "1.0.4"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "82d20c4491bc164fa2f6c5d44565947a52ad80b9505d8e36f8d54c27c739fcd0"
checksum = "9558e172d4e8533736ba97870c4b2cd63f84b382a3d6eb063da41b91cce17289"
dependencies = [
"dyn-clone",
"ref-cast",
@@ -2068,6 +2347,12 @@ dependencies = [
"libc",
]
[[package]]
name = "semver"
version = "1.0.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d767eb0aabc880b29956c35734170f26ed551a859dbd361d140cdbeca61ab1e2"
[[package]]
name = "serde"
version = "1.0.228"
@@ -2159,13 +2444,13 @@ version = "3.15.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "aa66c845eee442168b2c8134fec70ac50dc20e760769c8ba0ad1319ca1959b04"
dependencies = [
"base64 0.22.1",
"base64",
"chrono",
"hex",
"indexmap 1.9.3",
"indexmap 2.12.0",
"schemars 0.9.0",
"schemars 1.0.4",
"schemars 1.1.0",
"serde_core",
"serde_json",
"serde_with_macros",
@@ -2210,6 +2495,15 @@ version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64"
[[package]]
name = "signal-hook-registry"
version = "1.4.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b2a4719bff48cee6b39d12c020eeb490953ad2443b7055bd0b21fca26bd8c28b"
dependencies = [
"libc",
]
[[package]]
name = "slab"
version = "0.4.11"
@@ -2252,9 +2546,9 @@ checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292"
[[package]]
name = "syn"
version = "2.0.108"
version = "2.0.110"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "da58917d35242480a05c2897064da0a80589a2a0476c9a3f2fdc83b53502e917"
checksum = "a99801b5bd34ede4cf3fc688c5919368fea4e4814a4664359503e6015b280aea"
dependencies = [
"proc-macro2",
"quote",
@@ -2418,6 +2712,7 @@ dependencies = [
"libc",
"mio",
"pin-project-lite",
"signal-hook-registry",
"socket2",
"tokio-macros",
"windows-sys 0.61.2",
@@ -2467,9 +2762,9 @@ dependencies = [
[[package]]
name = "tokio-util"
version = "0.7.16"
version = "0.7.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "14307c986784f72ef81c89db7d9e28d6ac26d16213b109ea501696195e6e3ce5"
checksum = "2efa149fe76073d6e8fd97ef4f4eca7b67f599660115591483572e406e165594"
dependencies = [
"bytes",
"futures-core",
@@ -2487,7 +2782,7 @@ dependencies = [
"serde",
"serde_spanned 0.6.9",
"toml_datetime 0.6.11",
"toml_edit",
"toml_edit 0.19.15",
]
[[package]]
@@ -2535,6 +2830,18 @@ dependencies = [
"winnow 0.5.40",
]
[[package]]
name = "toml_edit"
version = "0.23.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6485ef6d0d9b5d0ec17244ff7eb05310113c3f316f2d14200d4de56b3cb98f8d"
dependencies = [
"indexmap 2.12.0",
"toml_datetime 0.7.3",
"toml_parser",
"winnow 0.7.13",
]
[[package]]
name = "toml_parser"
version = "1.0.4"
@@ -2889,9 +3196,9 @@ dependencies = [
[[package]]
name = "webpki-roots"
version = "1.0.3"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "32b130c0d2d49f8b6889abc456e795e82525204f27c42cf767cf0d7734e089b8"
checksum = "b2878ef029c47c6e8cf779119f20fcf52bde7ad42a731b2a304bc221df17571e"
dependencies = [
"rustls-pki-types",
]

View File

@@ -4,24 +4,57 @@ version = "0.1.0"
edition = "2024"
[dependencies]
# TODO: make this a dev and/or debug dependency later.
better-panic = "0.3.0"
clap = { version = "4.5", features = [ "derive" ] }
bytes = "1"
color-eyre = "0.6.3"
config = { version = "0.15", features = [ "toml" ] }
directories = "6.0"
futures = "0.3"
human-panic = "2.0"
genai = "0.4.3"
irc = "1.1"
tokio = { version = "1", features = [ "macros", "rt-multi-thread" ] }
serde_json = "1.0"
tracing = "0.1"
tracing-subscriber = "0.3"
[dependencies.nix]
version = "0.30.1"
features = ["fs"]
[dependencies.clap]
version = "4.5"
features = ["derive"]
[dependencies.config]
version = "0.15"
features = ["toml"]
[dependencies.serde]
version = "1.0"
features = ["derive"]
[dependencies.tokio]
version = "1"
features = [
"io-util",
"macros",
"net",
"process",
"rt-multi-thread",
"sync",
"time",
]
[dev-dependencies]
rstest = "0.24"
tempfile = "3.13"
[dev-dependencies.cargo-husky]
version = "1"
features = ["run-cargo-check", "run-cargo-clippy"]
[profile.release]
strip = true
opt-level = "z" # Optimize for size
lto = true # Link-time optimization
opt-level = "z"
lto = true
codegen-units = 1
# Comment this if unwinding is needed. Compiling without release works too.
panic = "abort"

View File

@@ -3,5 +3,5 @@ style_edition = "2024"
comment_width = 100
format_code_in_doc_comments = true
imports_granularity = "Crate"
imports_layout = "Vertical"
imports_layout = "HorizontalVertical"
wrap_comments = true

View File

@@ -1,34 +1,28 @@
use color_eyre::{
Result,
eyre::{
OptionExt,
WrapErr,
},
};
// Lots of namespace confusion potential
use crate::qna::LLMHandle;
use std::sync::Arc;
use color_eyre::{Result, eyre::WrapErr};
use config::Config as MainConfig;
use futures::StreamExt;
use irc::client::prelude::{
Client as IRCClient,
Command,
Config as IRCConfig,
};
use tracing::{
Level,
event,
instrument,
};
use irc::client::prelude::{Client, Command, Config as IRCConfig, Message};
use tokio::sync::mpsc;
use tracing::{Level, event, instrument};
use crate::{Event, EventManager, LLMHandle, plugin};
#[derive(Debug)]
pub struct Chat {
client: IRCClient,
client: Client,
event_manager: Arc<EventManager>,
llm_handle: LLMHandle, // FIXME: This needs to be thread safe, and shared, etc.
}
// Need: owners, channels, username, nick, server, password
#[instrument]
pub async fn new(settings: &MainConfig, handle: &LLMHandle) -> Result<Chat> {
pub async fn new(
settings: &MainConfig,
handle: &LLMHandle,
manager: Arc<EventManager>,
) -> Result<Chat> {
// Going to just assign and let the irc library handle errors for now, and
// add my own checking if necessary.
let port: u16 = settings.get("port")?;
@@ -49,31 +43,77 @@ pub async fn new(settings: &MainConfig, handle: &LLMHandle) -> Result<Chat> {
event!(Level::INFO, "IRC connection starting...");
Ok(Chat {
client: IRCClient::from_config(config).await?,
client: Client::from_config(config).await?,
llm_handle: handle.clone(),
event_manager: manager,
})
}
impl Chat {
pub async fn run(&mut self) -> Result<()> {
let client = &mut self.client;
pub async fn run(&mut self, mut command_in: mpsc::Receiver<plugin::Plugin>) -> Result<()> {
self.client.identify()?;
client.identify()?;
let mut stream = self.client.stream()?;
let mut stream = client.stream()?;
loop {
tokio::select! {
message = stream.next() => {
match message {
Some(Ok(msg)) => {
self.handle_chat_message(&msg).await?;
}
Some(Err(e)) => return Err(e.into()),
None => break, // disconnected
}
}
command = command_in.recv() => {
event!(Level::INFO, "Received command {:#?}", command);
match command {
Some(plugin::Plugin::SendMessage {channel, message} ) => {
// Now to pass on the message.
event!(Level::INFO, "Trying to send to channel.");
self.client.send_privmsg(&channel, &message).wrap_err("Couldn't send to channel")?;
event!(Level::INFO, "Message sent successfully.");
while let Some(message) = stream.next().await.transpose()? {
if let Command::PRIVMSG(channel, message) = message.command
&& message.starts_with("!gem")
{
let mut msg = self.llm_handle.send_request(&message).await?;
event!(Level::INFO, "Asked: {}", message);
event!(Level::INFO, "Answered: {}", msg);
}
None => {
event!(Level::ERROR,
"Command channel unexpectedly closed - \
FIFO reader may have crashed");
break;
}
}
}
}
}
// Make it all one line.
msg.retain(|c| c != '\n' && c != '\r');
msg.truncate(500);
client.send_privmsg(&channel, msg).wrap_err("Could not send to {channel}")?;
Ok(())
}
async fn handle_chat_message(&mut self, message: &Message) -> Result<()> {
// Broadcast anything here. If it should not be broadcasted then
// TryFrom should fail.
if let Ok(event) = Event::try_from(message) {
self.event_manager.broadcast(&event).await?;
}
// Only handle PRIVMSG for now.
if let Command::PRIVMSG(channel, msg) = &message.command {
// Just preserve the original behavior for now.
if msg.starts_with("!gem") {
let mut llm_response = self.llm_handle.send_request(msg).await?;
event!(Level::INFO, "Asked: {message}");
event!(Level::INFO, "Response: {llm_response}");
// Keep responses to one line for now.
llm_response.retain(|c| c != '\n' && c != '\r');
// TODO: Make this configurable.
llm_response.truncate(500);
event!(Level::INFO, "Sending {llm_response} to channel {channel}");
self.client.send_privmsg(channel, llm_response)?;
}
}

183
src/command.rs Normal file
View File

@@ -0,0 +1,183 @@
// Commands that are associated with external processes (commands).
use std::{
path::{Path, PathBuf},
time::Duration,
};
use bytes::Bytes;
use color_eyre::{Result, eyre::eyre};
use tokio::{fs::try_exists, process::Command, time::timeout};
use tracing::{Level, event};
#[derive(Debug)]
pub struct CommandDir {
command_path: PathBuf,
}
impl CommandDir {
pub fn new(command_path: impl AsRef<Path>) -> Self {
event!(
Level::INFO,
"CommandDir initialized with path: {:?}",
command_path.as_ref()
);
CommandDir {
command_path: command_path.as_ref().to_path_buf(),
}
}
async fn find_command(&self, name: impl AsRef<Path>) -> Result<String> {
let path = self.command_path.join(name.as_ref());
event!(
Level::INFO,
"Looking for {} command.",
name.as_ref().display()
);
match try_exists(&path).await {
Ok(true) => Ok(path.to_string_lossy().to_string()),
Ok(false) => Err(eyre!(format!("{} Not found.", path.to_string_lossy()))),
Err(e) => Err(e.into()),
}
}
pub async fn run_command(
&self,
command_name: impl AsRef<str>,
input: impl AsRef<str>,
) -> Result<Bytes> {
let path = self.find_command(Path::new(command_name.as_ref())).await?;
// Well it exists let's cross our fingers...
let output = Command::new(path).arg(input.as_ref()).output().await?;
if output.status.success() {
// So far so good
Ok(Bytes::from(output.stdout))
} else {
// Whoops
Err(eyre!(format!(
"Error running {}: {}",
command_name.as_ref(),
output.status
)))
}
}
pub async fn run_command_with_timeout(
&self,
command_name: impl AsRef<str>,
input: impl AsRef<str>,
time_out: Duration,
) -> Result<Bytes> {
timeout(time_out, self.run_command(command_name, input)).await?
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::{
fs::{self, Permissions},
os::unix::fs::PermissionsExt,
};
use tempfile::TempDir;
fn create_test_script(dir: &Path, name: &str, script: &str) -> PathBuf {
let path = dir.join(name);
fs::write(&path, script).unwrap();
fs::set_permissions(&path, Permissions::from_mode(0o755)).unwrap();
path
}
#[test]
fn test_command_dir_new() {
let dir = CommandDir::new("/some/path");
assert_eq!(dir.command_path, PathBuf::from("/some/path"));
}
#[tokio::test]
async fn test_find_command_exists() {
let temp = TempDir::new().unwrap();
create_test_script(temp.path(), "test_cmd", "#!/bin/bash\necho hello");
let cmd_dir = CommandDir::new(temp.path());
let result = cmd_dir.find_command("test_cmd").await;
assert!(result.is_ok());
assert!(result.unwrap().contains("test_cmd"));
}
#[tokio::test]
async fn test_find_command_not_found() {
let temp = TempDir::new().unwrap();
let cmd_dir = CommandDir::new(temp.path());
let result = cmd_dir.find_command("nonexistent").await;
assert!(result.is_err());
assert!(result.unwrap_err().to_string().contains("Not found"));
}
#[tokio::test]
async fn test_run_command_success() {
let temp = TempDir::new().unwrap();
create_test_script(temp.path(), "echo_cmd", "#!/bin/bash\necho \"$1\"");
let cmd_dir = CommandDir::new(temp.path());
let result = cmd_dir.run_command("echo_cmd", "hello world").await;
assert!(result.is_ok());
let output = result.unwrap();
assert_eq!(output.as_ref(), b"hello world\n");
}
#[tokio::test]
async fn test_run_command_failure() {
let temp = TempDir::new().unwrap();
create_test_script(temp.path(), "fail_cmd", "#!/bin/bash\nexit 1");
let cmd_dir = CommandDir::new(temp.path());
let result = cmd_dir.run_command("fail_cmd", "input").await;
assert!(result.is_err());
assert!(result.unwrap_err().to_string().contains("Error running"));
}
#[tokio::test]
async fn test_run_command_not_found() {
let temp = TempDir::new().unwrap();
let cmd_dir = CommandDir::new(temp.path());
let result = cmd_dir.run_command("missing", "input").await;
assert!(result.is_err());
}
#[tokio::test]
async fn test_run_command_with_timeout_success() {
let temp = TempDir::new().unwrap();
create_test_script(temp.path(), "fast_cmd", "#!/bin/bash\necho \"$1\"");
let cmd_dir = CommandDir::new(temp.path());
let result = cmd_dir
.run_command_with_timeout("fast_cmd", "quick", Duration::from_secs(5))
.await;
assert!(result.is_ok());
}
#[tokio::test]
async fn test_run_command_with_timeout_expires() {
let temp = TempDir::new().unwrap();
create_test_script(temp.path(), "slow_cmd", "#!/bin/bash\nsleep 10\necho done");
let cmd_dir = CommandDir::new(temp.path());
let result = cmd_dir
.run_command_with_timeout("slow_cmd", "input", Duration::from_millis(100))
.await;
assert!(result.is_err());
}
}

View File

@@ -1,22 +0,0 @@
use color_eyre::Result;
use std::path::{
Path,
PathBuf,
};
#[derive(Clone, Debug)]
pub struct Root {
path: PathBuf,
}
impl Root {
pub fn new(path: impl AsRef<Path>) -> Self {
Root {
path: path.as_ref().to_owned(),
}
}
pub fn run_command(cmd_string: impl AsRef<str>) -> Result<()> {
todo!();
}
}

32
src/event.rs Normal file
View File

@@ -0,0 +1,32 @@
use irc::proto::{Command, Message};
use serde::{Deserialize, Serialize};
#[derive(Deserialize, Serialize)]
pub struct Event {
from: String,
message: String,
}
impl Event {
pub fn new(from: impl Into<String>, msg: impl Into<String>) -> Self {
Self {
from: from.into(),
message: msg.into(),
}
}
}
impl TryFrom<&Message> for Event {
type Error = &'static str;
fn try_from(value: &Message) -> Result<Self, Self::Error> {
let from = value.response_target().unwrap_or("unknown").to_string();
match &value.command {
Command::PRIVMSG(_channel, message) => Ok(Event {
from,
message: message.clone(),
}),
_ => Err("Not a PRIVMSG"),
}
}
}

575
src/event_manager.rs Normal file
View File

@@ -0,0 +1,575 @@
use std::{collections::VecDeque, path::Path, sync::Arc};
use color_eyre::Result;
use nix::{NixPath, sys::stat, unistd::mkfifo};
use tokio::{
io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
net::{UnixListener, UnixStream, unix::pipe},
sync::{RwLock, broadcast, mpsc},
};
use tracing::{error, info};
use crate::{event::Event, plugin::Plugin};
// Hard coding for now. Maybe make this a parameter to new.
const EVENT_BUF_MAX: usize = 1000;
// Manager for communication with plugins.
#[derive(Debug)]
pub struct EventManager {
announce: broadcast::Sender<String>, // Everything broadcasts here.
events: Arc<RwLock<VecDeque<String>>>, // Ring buffer.
}
impl EventManager {
pub fn new() -> Result<Self> {
let (announce, _) = broadcast::channel(100);
Ok(Self {
announce,
events: Arc::new(RwLock::new(VecDeque::<String>::new())),
})
}
pub async fn broadcast(&self, event: &Event) -> Result<()> {
let msg = serde_json::to_string(event)? + "\n";
let mut events = self.events.write().await;
if events.len() >= EVENT_BUF_MAX {
events.pop_front();
}
events.push_back(msg.clone());
drop(events);
let _ = self.announce.send(msg);
Ok(())
}
// NB: This assumes it has exclusive control of the FIFO.
pub async fn start_fifo<P>(path: &P, command_tx: mpsc::Sender<Plugin>) -> Result<()>
where
P: AsRef<Path> + NixPath + ?Sized,
{
// Overwrite, or create the FIFO.
let _ = std::fs::remove_file(path);
mkfifo(path, stat::Mode::S_IRWXU)?;
loop {
let rx = pipe::OpenOptions::new().open_receiver(path)?;
let mut reader = BufReader::new(rx);
let mut line = String::new();
while reader.read_line(&mut line).await? > 0 {
// Now handle the command.
let cmd: Plugin = serde_json::from_str(&line)?;
info!("Command received: {:?}.", cmd);
command_tx.send(cmd).await?;
line.clear();
}
}
}
pub async fn start_listening(self: Arc<Self>, broadcast_path: impl AsRef<Path>) {
let listener = UnixListener::bind(broadcast_path).unwrap();
loop {
match listener.accept().await {
Ok((stream, _addr)) => {
info!("New broadcast subscriber");
// Spawn a new stream for the plugin. The loop
// runs recursively from there.
let broadcaster = Arc::clone(&self);
tokio::spawn(async move {
// send events.
let _ = broadcaster.send_events(stream).await;
});
}
Err(e) => error!("Accept error: {e}"),
}
}
}
async fn send_events(&self, stream: UnixStream) -> Result<()> {
let mut writer = stream;
// Take care of history.
let events = self.events.read().await;
for event in events.iter() {
writer.write_all(event.as_bytes()).await?;
}
drop(events);
// Now just broadcast the new events.
let mut rx = self.announce.subscribe();
while let Ok(event) = rx.recv().await {
if writer.write_all(event.as_bytes()).await.is_err() {
// *click*
break;
}
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use rstest::rstest;
#[tokio::test]
async fn test_new_event_manager_has_empty_buffer() {
let manager = EventManager::new().unwrap();
let events = manager.events.read().await;
assert_eq!(events.len(), 0);
}
#[tokio::test]
async fn test_broadcast_adds_event_to_buffer() {
let manager = EventManager::new().unwrap();
let event = Event::new("test_user", "test message");
manager.broadcast(&event).await.unwrap();
let events = manager.events.read().await;
assert_eq!(events.len(), 1);
assert!(events[0].contains("test message"));
assert!(events[0].ends_with('\n'));
}
#[tokio::test]
async fn test_broadcast_serializes_event_as_json() {
let manager = EventManager::new().unwrap();
let event = Event::new("test_user", "hello world");
manager.broadcast(&event).await.unwrap();
let events = manager.events.read().await;
let stored = &events[0];
// Should be valid JSON
let parsed: serde_json::Value = serde_json::from_str(stored.trim()).unwrap();
assert_eq!(parsed["message"], "hello world");
}
#[rstest]
#[case(1)]
#[case(10)]
#[case(100)]
#[case(999)]
#[tokio::test]
async fn test_buffer_holds_events_below_max(#[case] count: usize) {
let manager = EventManager::new().unwrap();
for i in 0..count {
let event = Event::new("test_user", format!("event {}", i));
manager.broadcast(&event).await.unwrap();
}
let events = manager.events.read().await;
assert_eq!(events.len(), count);
}
#[tokio::test]
async fn test_buffer_at_exactly_max_capacity() {
let manager = EventManager::new().unwrap();
// Fill to exactly EVENT_BUF_MAX (1000)
for i in 0..EVENT_BUF_MAX {
let event = Event::new("test_user", format!("event {}", i));
manager.broadcast(&event).await.unwrap();
}
let events = manager.events.read().await;
assert_eq!(events.len(), EVENT_BUF_MAX);
assert!(events[0].contains("event 0"));
assert!(events[EVENT_BUF_MAX - 1].contains("event 999"));
}
#[rstest]
#[case(1)]
#[case(10)]
#[case(100)]
#[case(500)]
#[tokio::test]
async fn test_buffer_overflow_evicts_oldest_fifo(#[case] overflow: usize) {
let manager = EventManager::new().unwrap();
let total = EVENT_BUF_MAX + overflow;
// Broadcast more events than buffer can hold
for i in 0..total {
let event = Event::new("test_user", format!("event {}", i));
manager.broadcast(&event).await.unwrap();
}
let events = manager.events.read().await;
// Buffer should still be at max capacity
assert_eq!(events.len(), EVENT_BUF_MAX);
// Oldest events (0 through overflow-1) should be evicted
// Buffer should contain events [overflow..total)
let first_event = &events[0];
let last_event = &events[EVENT_BUF_MAX - 1];
assert!(first_event.contains(&format!("event {}", overflow)));
assert!(last_event.contains(&format!("event {}", total - 1)));
// Verify the evicted events are NOT in the buffer
let buffer_string = events.iter().cloned().collect::<String>();
assert!(!buffer_string.contains(r#""message":"event 0""#));
}
#[tokio::test]
async fn test_multiple_broadcasts_maintain_order() {
let manager = EventManager::new().unwrap();
let messages = vec!["first", "second", "third", "fourth", "fifth"];
for msg in &messages {
let event = Event::new("test_user", *msg);
manager.broadcast(&event).await.unwrap();
}
let events = manager.events.read().await;
assert_eq!(events.len(), messages.len());
for (i, expected) in messages.iter().enumerate() {
assert!(events[i].contains(expected));
}
}
#[tokio::test]
async fn test_buffer_wraparound_maintains_newest_events() {
let manager = EventManager::new().unwrap();
// Fill buffer completely
for i in 0..EVENT_BUF_MAX {
let event = Event::new("test_user", format!("old {}", i));
manager.broadcast(&event).await.unwrap();
}
// Add 5 more events
for i in 0..5 {
let event = Event::new("test_user", format!("new {}", i));
manager.broadcast(&event).await.unwrap();
}
let events = manager.events.read().await;
assert_eq!(events.len(), EVENT_BUF_MAX);
// First 5 old events should be gone
let buffer_string = events.iter().cloned().collect::<String>();
assert!(!buffer_string.contains(r#""message":"old 0""#));
assert!(!buffer_string.contains(r#""message":"old 4""#));
// But old 5 should still be there (now at the front)
assert!(events[0].contains("old 5"));
// New events should be at the end
assert!(events[EVENT_BUF_MAX - 5].contains("new 0"));
assert!(events[EVENT_BUF_MAX - 1].contains("new 4"));
}
#[tokio::test]
async fn test_concurrent_broadcasts_all_stored() {
let manager = Arc::new(EventManager::new().unwrap());
let mut handles = vec![];
// Spawn 10 concurrent tasks, each broadcasting 10 events
for task_id in 0..10 {
let manager_clone = Arc::clone(&manager);
let handle = tokio::spawn(async move {
for i in 0..10 {
let event = Event::new("test_user", format!("task {} event {}", task_id, i));
manager_clone.broadcast(&event).await.unwrap();
}
});
handles.push(handle);
}
// Wait for all tasks to complete
for handle in handles {
handle.await.unwrap();
}
let events = manager.events.read().await;
assert_eq!(events.len(), 100);
}
#[tokio::test]
async fn test_fifo_receives_and_forwards_single_command() {
let temp_dir = tempfile::tempdir().unwrap();
let fifo_path = temp_dir.path().join("test.fifo");
let (tx, mut rx) = mpsc::channel(10);
// Spawn the FIFO reader
let path = fifo_path.clone();
tokio::spawn(async move {
let _ = EventManager::start_fifo(&path, tx).await;
});
// Give it time to create the FIFO
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
// Write a command to the FIFO
let cmd = Plugin::SendMessage {
channel: "#test".to_string(),
message: "hello".to_string(),
};
let json = serde_json::to_string(&cmd).unwrap() + "\n";
// Open FIFO for writing and write the command
tokio::spawn(async move {
use tokio::io::AsyncWriteExt;
let tx = pipe::OpenOptions::new().open_sender(&fifo_path).unwrap();
let mut tx = tokio::io::BufWriter::new(tx);
tx.write_all(json.as_bytes()).await.unwrap();
tx.flush().await.unwrap();
});
// Should receive the command within a reasonable time
let received = tokio::time::timeout(tokio::time::Duration::from_secs(1), rx.recv())
.await
.expect("timeout waiting for command")
.expect("channel closed");
match received {
Plugin::SendMessage { channel, message } => {
assert_eq!(channel, "#test");
assert_eq!(message, "hello");
}
}
}
#[tokio::test]
async fn test_fifo_handles_multiple_commands() {
let temp_dir = tempfile::tempdir().unwrap();
let fifo_path = temp_dir.path().join("test.fifo");
let (tx, mut rx) = mpsc::channel(10);
// Spawn the FIFO reader
let path = fifo_path.clone();
tokio::spawn(async move {
let _ = EventManager::start_fifo(&path, tx).await;
});
// Give it time to create the FIFO
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
// Write multiple commands
let commands = vec![
Plugin::SendMessage {
channel: "#chan1".to_string(),
message: "first".to_string(),
},
Plugin::SendMessage {
channel: "#chan2".to_string(),
message: "second".to_string(),
},
Plugin::SendMessage {
channel: "#chan3".to_string(),
message: "third".to_string(),
},
];
tokio::spawn(async move {
use tokio::io::AsyncWriteExt;
let tx = pipe::OpenOptions::new().open_sender(&fifo_path).unwrap();
let mut tx = tokio::io::BufWriter::new(tx);
for cmd in commands {
let json = serde_json::to_string(&cmd).unwrap() + "\n";
tx.write_all(json.as_bytes()).await.unwrap();
}
tx.flush().await.unwrap();
});
// Receive all three commands in order
let first = tokio::time::timeout(tokio::time::Duration::from_secs(1), rx.recv())
.await
.expect("timeout on first")
.expect("channel closed");
match first {
Plugin::SendMessage { channel, message } => {
assert_eq!(channel, "#chan1");
assert_eq!(message, "first");
}
}
let second = tokio::time::timeout(tokio::time::Duration::from_secs(1), rx.recv())
.await
.expect("timeout on second")
.expect("channel closed");
match second {
Plugin::SendMessage { channel, message } => {
assert_eq!(channel, "#chan2");
assert_eq!(message, "second");
}
}
let third = tokio::time::timeout(tokio::time::Duration::from_secs(1), rx.recv())
.await
.expect("timeout on third")
.expect("channel closed");
match third {
Plugin::SendMessage { channel, message } => {
assert_eq!(channel, "#chan3");
assert_eq!(message, "third");
}
}
}
#[tokio::test]
async fn test_fifo_reopens_after_writer_closes() {
let temp_dir = tempfile::tempdir().unwrap();
let fifo_path = temp_dir.path().join("test.fifo");
let (tx, mut rx) = mpsc::channel(10);
// Spawn the FIFO reader
let path = fifo_path.clone();
tokio::spawn(async move {
let _ = EventManager::start_fifo(&path, tx).await;
});
// Give it time to create the FIFO
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
// First writer sends a command and closes
{
use tokio::io::AsyncWriteExt;
let path = fifo_path.clone();
tokio::spawn(async move {
let tx = pipe::OpenOptions::new().open_sender(&path).unwrap();
let mut tx = tokio::io::BufWriter::new(tx);
let cmd = Plugin::SendMessage {
channel: "#first".to_string(),
message: "batch1".to_string(),
};
let json = serde_json::to_string(&cmd).unwrap() + "\n";
tx.write_all(json.as_bytes()).await.unwrap();
tx.flush().await.unwrap();
// Writer drops here, closing the FIFO
});
let first = tokio::time::timeout(tokio::time::Duration::from_secs(1), rx.recv())
.await
.expect("timeout on first batch")
.expect("channel closed");
match first {
Plugin::SendMessage { channel, message } => {
assert_eq!(channel, "#first");
assert_eq!(message, "batch1");
}
}
}
// Give the FIFO time to reopen
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
// Second writer opens and sends a command
{
use tokio::io::AsyncWriteExt;
tokio::spawn(async move {
let tx = pipe::OpenOptions::new().open_sender(&fifo_path).unwrap();
let mut tx = tokio::io::BufWriter::new(tx);
let cmd = Plugin::SendMessage {
channel: "#second".to_string(),
message: "batch2".to_string(),
};
let json = serde_json::to_string(&cmd).unwrap() + "\n";
tx.write_all(json.as_bytes()).await.unwrap();
tx.flush().await.unwrap();
});
let second = tokio::time::timeout(tokio::time::Duration::from_secs(1), rx.recv())
.await
.expect("timeout on second batch - FIFO may not have reopened")
.expect("channel closed");
match second {
Plugin::SendMessage { channel, message } => {
assert_eq!(channel, "#second");
assert_eq!(message, "batch2");
}
}
}
}
#[tokio::test]
async fn test_fifo_handles_empty_lines() {
let temp_dir = tempfile::tempdir().unwrap();
let fifo_path = temp_dir.path().join("test.fifo");
let (tx, mut rx) = mpsc::channel(10);
// Spawn the FIFO reader
let path = fifo_path.clone();
let handle = tokio::spawn(async move { EventManager::start_fifo(&path, tx).await });
// Give it time to create the FIFO
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
// Write command, empty line, whitespace, another command
tokio::spawn(async move {
use tokio::io::AsyncWriteExt;
let tx = pipe::OpenOptions::new().open_sender(&fifo_path).unwrap();
let mut tx = tokio::io::BufWriter::new(tx);
let cmd1 = Plugin::SendMessage {
channel: "#test".to_string(),
message: "first".to_string(),
};
let json1 = serde_json::to_string(&cmd1).unwrap() + "\n";
tx.write_all(json1.as_bytes()).await.unwrap();
// Write empty line
tx.write_all(b"\n").await.unwrap();
// Write whitespace line
tx.write_all(b" \n").await.unwrap();
let cmd2 = Plugin::SendMessage {
channel: "#test".to_string(),
message: "second".to_string(),
};
let json2 = serde_json::to_string(&cmd2).unwrap() + "\n";
tx.write_all(json2.as_bytes()).await.unwrap();
tx.flush().await.unwrap();
});
// Should receive first command
let first = tokio::time::timeout(tokio::time::Duration::from_millis(500), rx.recv())
.await
.expect("timeout on first")
.expect("channel closed");
match first {
Plugin::SendMessage { channel, message } => {
assert_eq!(channel, "#test");
assert_eq!(message, "first");
}
}
// The empty/whitespace lines should cause JSON parse errors
// which will cause start_fifo to error and exit
// So we expect the handle to complete (with an error)
let result = tokio::time::timeout(tokio::time::Duration::from_secs(1), handle)
.await
.expect("FIFO task should exit due to parse error");
// The task should have errored
assert!(
result.unwrap().is_err(),
"Expected parse error from empty line"
);
}
}

92
src/lib.rs Normal file
View File

@@ -0,0 +1,92 @@
// Robotnik libraries
use std::{os::unix::fs, sync::Arc};
use color_eyre::{Result, eyre::WrapErr};
use human_panic::setup_panic;
use tokio::sync::mpsc;
use tracing::{Level, info};
use tracing_subscriber::FmtSubscriber;
pub mod chat;
pub mod command;
pub mod event;
pub mod event_manager;
pub mod plugin;
pub mod qna;
pub mod setup;
pub use event::Event;
pub use event_manager::EventManager;
pub use qna::LLMHandle;
const DEFAULT_INSTRUCT: &str =
"You are a shady, yet helpful IRC bot. You try to give responses that can
be sent in a single IRC response according to the specification. Keep answers to
500 characters or less.";
// NB: Everything should fail if logging doesn't start properly.
async fn init_logging() {
better_panic::install();
setup_panic!();
let subscriber = FmtSubscriber::builder()
.with_max_level(Level::TRACE)
.finish();
tracing::subscriber::set_global_default(subscriber).unwrap();
}
pub async fn run() -> Result<()> {
init_logging().await;
info!("Starting up.");
let settings = setup::init().await.wrap_err("Failed to initialize.")?;
let config = settings.config;
// NOTE: Doing chroot this way might be impractical.
if let Ok(chroot_path) = config.get_string("chroot-dir") {
info!("Attempting to chroot to {}", chroot_path);
fs::chroot(&chroot_path)
.wrap_err_with(|| format!("Failed setting chroot '{}'", chroot_path))?;
std::env::set_current_dir("/").wrap_err("Couldn't change directory after chroot.")?;
}
let handle = qna::LLMHandle::new(
config.get_string("api-key").wrap_err("API missing.")?,
config
.get_string("base-url")
.wrap_err("base-url missing.")?,
config
.get_string("model")
.wrap_err("model string missing.")?,
config
.get_string("instruct")
.unwrap_or_else(|_| DEFAULT_INSTRUCT.to_string()),
)
.wrap_err("Couldn't initialize LLM handle.")?;
let ev_manager = Arc::new(EventManager::new()?);
let ev_manager_clone = Arc::clone(&ev_manager);
let mut c = chat::new(&config, &handle, Arc::clone(&ev_manager)).await?;
let (from_plugins, to_chat) = mpsc::channel(100);
tokio::select! {
_ = ev_manager_clone.start_listening("/tmp/robo.sock") => {
// Event listener ended
}
result = c.run(to_chat) => {
if let Err(e) = result {
tracing::error!("Chat run error: {:?}", e);
return Err(e);
}
}
fifo = EventManager::start_fifo("/tmp/robo_in.sock", from_plugins) => {
fifo.wrap_err("FIFO reader failed.")?;
}
}
Ok(())
}

View File

@@ -1,75 +1,6 @@
use color_eyre::{
Result,
eyre::WrapErr,
};
use human_panic::setup_panic;
use std::os::unix::fs;
use tracing::{
Level,
info,
};
use tracing_subscriber::FmtSubscriber;
mod chat;
mod commands;
mod qna;
mod setup;
const DEFAULT_INSTRUCT: &str =
"You are a shady, yet helpful IRC bot. You try to give responses that can
be sent in a single IRC response according to the specification. Keep answers to
500 characters or less.";
use color_eyre::Result;
#[tokio::main]
async fn main() -> Result<()> {
// Some error sprucing.
better_panic::install();
setup_panic!();
let subscriber = FmtSubscriber::builder()
.with_max_level(Level::TRACE)
.finish();
tracing::subscriber::set_global_default(subscriber)
.wrap_err("Failed to setup trace logging.")?;
info!("Starting");
let settings = setup::init().await.wrap_err("Failed to initialize.")?;
let config = settings.config;
// chroot if applicable.
if let Ok(chroot_path) = config.get_string("chroot-dir") {
info!("Attempting to chroot to {}", chroot_path);
fs::chroot(&chroot_path)
.wrap_err_with(|| format!("Failed setting chroot '{}'", chroot_path))?;
std::env::set_current_dir("/").wrap_err("Couldn't change directory after chroot.")?;
}
// Setup root path for commands.
let cmd_root = if let Ok(command_path) = config.get_string("command-path") {
Some(commands::Root::new(command_path))
} else {
None
};
let handle = qna::LLMHandle::new(
config.get_string("api-key").wrap_err("API missing.")?,
config
.get_string("base-url")
.wrap_err("base-url missing.")?,
cmd_root,
config
.get_string("model")
.wrap_err("model string missing.")?,
config
.get_string("instruct")
.unwrap_or_else(|_| DEFAULT_INSTRUCT.to_string()),
)
.wrap_err("Couldn't initialize LLM handle.")?;
let mut c = chat::new(&config, &handle).await?;
c.run().await.unwrap();
Ok(())
robotnik::run().await
}

18
src/plugin.rs Normal file
View File

@@ -0,0 +1,18 @@
use std::fmt::Display;
use serde::{Deserialize, Serialize};
#[derive(Debug, Deserialize, Serialize)]
pub enum Plugin {
SendMessage { channel: String, message: String },
}
impl Display for Plugin {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::SendMessage { channel, message } => {
write!(f, "[{channel}]: {message}")
}
}
}
}

View File

@@ -1,19 +1,10 @@
use crate::commands;
use color_eyre::Result;
use futures::StreamExt;
use genai::{
Client,
ModelIden,
chat::{
ChatMessage,
ChatRequest,
ChatStreamEvent,
StreamChunk,
},
resolver::{
AuthData,
AuthResolver,
},
chat::{ChatMessage, ChatRequest, ChatStreamEvent, StreamChunk},
resolver::{AuthData, AuthResolver},
};
use tracing::info;
@@ -23,7 +14,6 @@ use tracing::info;
pub struct LLMHandle {
chat_request: ChatRequest,
client: Client,
cmd_root: Option<commands::Root>,
model: String,
}
@@ -31,7 +21,6 @@ impl LLMHandle {
pub fn new(
api_key: String,
_base_url: impl AsRef<str>,
cmd_root: Option<commands::Root>,
model: impl Into<String>,
system_role: String,
) -> Result<LLMHandle> {
@@ -51,7 +40,6 @@ impl LLMHandle {
Ok(LLMHandle {
client,
chat_request,
cmd_root,
model: model.into(),
})
}

View File

@@ -1,78 +1,72 @@
use clap::Parser;
use color_eyre::{
Result,
eyre::WrapErr,
};
use color_eyre::{Result, eyre::WrapErr};
use config::Config;
use directories::ProjectDirs;
use std::path::PathBuf;
use tracing::{
info,
instrument,
};
use tracing::{info, instrument};
// TODO: use [clap(long, short, help_heading = Some(section))]
#[derive(Clone, Debug, Parser)]
#[command(about, version)]
pub(crate) struct Args {
pub struct Args {
#[arg(short, long)]
/// API Key for the LLM in use.
pub(crate) api_key: Option<String>,
pub api_key: Option<String>,
#[arg(short, long, default_value = "https://api.openai.com")]
/// Base URL for the LLM API to use.
pub(crate) base_url: Option<String>,
pub base_url: Option<String>,
/// Directory to use for chroot (recommended).
#[arg(long)]
pub(crate) chroot_dir: Option<String>,
pub chroot_dir: Option<String>,
/// Root directory for file based command structure.
#[arg(long)]
pub(crate) command_dir: Option<String>,
pub command_dir: Option<String>,
#[arg(long)]
/// Instructions to the model on how to behave.
pub(crate) instruct: Option<String>,
pub instruct: Option<String>,
#[arg(long)]
pub(crate) model: Option<String>,
pub model: Option<String>,
#[arg(long)]
/// List of IRC channels to join.
pub(crate) channels: Option<Vec<String>>,
pub channels: Option<Vec<String>>,
#[arg(short, long)]
/// Custom configuration file location if need be.
pub(crate) config_file: Option<PathBuf>,
pub config_file: Option<PathBuf>,
#[arg(short, long, default_value = "irc.libera.chat")]
/// IRC server.
pub(crate) server: Option<String>,
pub server: Option<String>,
#[arg(short, long, default_value = "6697")]
/// Port of the IRC server.
pub(crate) port: Option<String>,
pub port: Option<String>,
#[arg(long)]
/// IRC Nickname.
pub(crate) nickname: Option<String>,
pub nickname: Option<String>,
#[arg(long)]
/// IRC Nick Password
pub(crate) nick_password: Option<String>,
pub nick_password: Option<String>,
#[arg(long)]
/// IRC Username
pub(crate) username: Option<String>,
pub username: Option<String>,
#[arg(long)]
/// Whether or not to use TLS when connecting to the IRC server.
pub(crate) use_tls: Option<bool>,
pub use_tls: Option<bool>,
}
pub(crate) struct Setup {
pub(crate) config: Config,
pub struct Setup {
pub config: Config,
}
#[instrument]

290
tests/command_test.rs Normal file
View File

@@ -0,0 +1,290 @@
use std::{
fs::{self, Permissions},
os::unix::fs::PermissionsExt,
path::Path,
time::Duration,
};
use robotnik::command::CommandDir;
use tempfile::TempDir;
/// Helper to create executable test scripts
fn create_command(dir: &Path, name: &str, script: &str) {
let path = dir.join(name);
fs::write(&path, script).unwrap();
fs::set_permissions(&path, Permissions::from_mode(0o755)).unwrap();
}
/// Parse a bot message like "!weather 73135" into (command_name, argument)
fn parse_bot_message(message: &str) -> Option<(&str, &str)> {
if !message.starts_with('!') {
return None;
}
let without_prefix = &message[1..];
let mut parts = without_prefix.splitn(2, ' ');
let command = parts.next()?;
let arg = parts.next().unwrap_or("");
Some((command, arg))
}
#[tokio::test]
async fn test_bot_message_finds_and_runs_command() {
let temp = TempDir::new().unwrap();
// Create a weather command that echoes the zip code
create_command(
temp.path(),
"weather",
r#"#!/bin/bash
echo "Weather for $1: Sunny, 72°F"
"#,
);
let cmd_dir = CommandDir::new(temp.path());
let message = "!weather 73135";
// Parse the message
let (command_name, arg) = parse_bot_message(message).unwrap();
assert_eq!(command_name, "weather");
assert_eq!(arg, "73135");
// Find and run the command
let result = cmd_dir.run_command(command_name, arg).await;
assert!(result.is_ok());
let bytes = result.unwrap();
let output = String::from_utf8_lossy(&bytes);
assert!(output.contains("Weather for 73135"));
assert!(output.contains("Sunny"));
}
#[tokio::test]
async fn test_bot_message_command_not_found() {
let temp = TempDir::new().unwrap();
let cmd_dir = CommandDir::new(temp.path());
let message = "!nonexistent arg";
let (command_name, arg) = parse_bot_message(message).unwrap();
let result = cmd_dir.run_command(command_name, arg).await;
assert!(result.is_err());
assert!(result.unwrap_err().to_string().contains("Not found"));
}
#[tokio::test]
async fn test_bot_message_with_multiple_arguments() {
let temp = TempDir::new().unwrap();
// Create a command that handles multiple words as a single argument
create_command(
temp.path(),
"echo",
r#"#!/bin/bash
echo "You said: $1"
"#,
);
let cmd_dir = CommandDir::new(temp.path());
let message = "!echo hello world how are you";
let (command_name, arg) = parse_bot_message(message).unwrap();
assert_eq!(command_name, "echo");
assert_eq!(arg, "hello world how are you");
let result = cmd_dir.run_command(command_name, arg).await;
assert!(result.is_ok());
let bytes = result.unwrap();
let output = String::from_utf8_lossy(&bytes);
assert!(output.contains("hello world how are you"));
}
#[tokio::test]
async fn test_bot_message_without_argument() {
let temp = TempDir::new().unwrap();
create_command(
temp.path(),
"help",
r#"#!/bin/bash
echo "Available commands: weather, echo, help"
"#,
);
let cmd_dir = CommandDir::new(temp.path());
let message = "!help";
let (command_name, arg) = parse_bot_message(message).unwrap();
assert_eq!(command_name, "help");
assert_eq!(arg, "");
let result = cmd_dir.run_command(command_name, arg).await;
assert!(result.is_ok());
let bytes = result.unwrap();
let output = String::from_utf8_lossy(&bytes);
assert!(output.contains("Available commands"));
}
#[tokio::test]
async fn test_bot_message_command_returns_error_exit_code() {
let temp = TempDir::new().unwrap();
// Create a command that fails for invalid input
create_command(
temp.path(),
"validate",
r#"#!/bin/bash
if [ -z "$1" ]; then
echo "Error: Input required" >&2
exit 1
fi
echo "Valid: $1"
"#,
);
let cmd_dir = CommandDir::new(temp.path());
let message = "!validate";
let (command_name, arg) = parse_bot_message(message).unwrap();
let result = cmd_dir.run_command(command_name, arg).await;
assert!(result.is_err());
assert!(result.unwrap_err().to_string().contains("Error running"));
}
#[tokio::test]
async fn test_bot_message_with_timeout() {
let temp = TempDir::new().unwrap();
create_command(
temp.path(),
"quick",
r#"#!/bin/bash
echo "Result: $1"
"#,
);
let cmd_dir = CommandDir::new(temp.path());
let message = "!quick test";
let (command_name, arg) = parse_bot_message(message).unwrap();
let result = cmd_dir
.run_command_with_timeout(command_name, arg, Duration::from_secs(5))
.await;
assert!(result.is_ok());
}
#[tokio::test]
async fn test_bot_message_command_times_out() {
let temp = TempDir::new().unwrap();
create_command(
temp.path(),
"slow",
r#"#!/bin/bash
sleep 10
echo "Done"
"#,
);
let cmd_dir = CommandDir::new(temp.path());
let message = "!slow arg";
let (command_name, arg) = parse_bot_message(message).unwrap();
let result = cmd_dir
.run_command_with_timeout(command_name, arg, Duration::from_millis(100))
.await;
assert!(result.is_err());
}
#[tokio::test]
async fn test_multiple_commands_in_directory() {
let temp = TempDir::new().unwrap();
create_command(
temp.path(),
"weather",
r#"#!/bin/bash
echo "Weather: Sunny"
"#,
);
create_command(
temp.path(),
"time",
r#"#!/bin/bash
echo "Time: 12:00"
"#,
);
create_command(
temp.path(),
"joke",
r#"#!/bin/bash
echo "Why did the robot go on vacation? To recharge!"
"#,
);
let cmd_dir = CommandDir::new(temp.path());
// Test each command
let messages = ["!weather", "!time", "!joke"];
let expected = ["Sunny", "12:00", "recharge"];
for (message, expect) in messages.iter().zip(expected.iter()) {
let (command_name, arg) = parse_bot_message(message).unwrap();
let result = cmd_dir.run_command(command_name, arg).await;
assert!(result.is_ok());
let bytes = result.unwrap();
let output = String::from_utf8_lossy(&bytes);
assert!(
output.contains(expect),
"Expected '{}' in '{}'",
expect,
output
);
}
}
#[tokio::test]
async fn test_non_bot_message_ignored() {
// Messages not starting with ! should be ignored
let messages = ["hello world", "weather 73135", "?help", "/command", ""];
for message in messages {
assert!(
parse_bot_message(message).is_none(),
"Should ignore: {}",
message
);
}
}
#[tokio::test]
async fn test_command_output_is_bytes() {
let temp = TempDir::new().unwrap();
// Create a command that outputs binary-safe content
create_command(
temp.path(),
"binary",
r#"#!/bin/bash
printf "Hello\x00World"
"#,
);
let cmd_dir = CommandDir::new(temp.path());
let message = "!binary test";
let (command_name, arg) = parse_bot_message(message).unwrap();
let result = cmd_dir.run_command(command_name, arg).await;
assert!(result.is_ok());
let output = result.unwrap();
// Should preserve the null byte
assert_eq!(&output[..], b"Hello\x00World");
}

495
tests/event_test.rs Normal file
View File

@@ -0,0 +1,495 @@
use std::{sync::Arc, time::Duration};
use robotnik::{event::Event, event_manager::EventManager};
use rstest::rstest;
use tokio::{
io::{AsyncBufReadExt, BufReader},
net::UnixStream,
time::timeout,
};
const TEST_SOCKET_BASE: &str = "/tmp/robotnik_test";
/// Helper to create unique socket paths for parallel tests
fn test_socket_path(name: &str) -> String {
format!("{}_{}_{}", TEST_SOCKET_BASE, name, std::process::id())
}
/// Helper to read one JSON event from a stream
async fn read_event(
reader: &mut BufReader<UnixStream>,
) -> Result<Event, Box<dyn std::error::Error>> {
let mut line = String::new();
reader.read_line(&mut line).await?;
let event: Event = serde_json::from_str(&line)?;
Ok(event)
}
/// Helper to read all available events with a timeout
async fn read_events_with_timeout(
reader: &mut BufReader<UnixStream>,
max_count: usize,
timeout_ms: u64,
) -> Vec<String> {
let mut events = Vec::new();
for _ in 0..max_count {
let mut line = String::new();
match timeout(
Duration::from_millis(timeout_ms),
reader.read_line(&mut line),
)
.await
{
Ok(Ok(0)) => break, // EOF
Ok(Ok(_)) => events.push(line),
Ok(Err(_)) => break, // Read error
Err(_) => break, // Timeout
}
}
events
}
#[tokio::test]
async fn test_client_connects_and_receives_event() {
let socket_path = test_socket_path("basic_connect");
let manager = Arc::new(EventManager::new().unwrap());
// Start the listener
let listener_manager = Arc::clone(&manager);
let socket_path_clone = socket_path.clone();
tokio::spawn(async move {
listener_manager.start_listening(socket_path_clone).await;
});
// Give the listener time to start
tokio::time::sleep(Duration::from_millis(50)).await;
// Broadcast an event
let event = Event::new("test_user", "test message");
manager.broadcast(&event).await.unwrap();
// Connect as a client
let stream = UnixStream::connect(&socket_path).await.unwrap();
let mut reader = BufReader::new(stream);
// Read the event
let mut line = String::new();
reader.read_line(&mut line).await.unwrap();
assert!(line.contains("test message"));
assert!(line.ends_with('\n'));
// Cleanup
let _ = std::fs::remove_file(&socket_path);
}
#[tokio::test]
async fn test_client_receives_event_history() {
let socket_path = test_socket_path("event_history");
let manager = Arc::new(EventManager::new().unwrap());
// Broadcast events BEFORE starting the listener
for i in 0..5 {
let event = Event::new("test_user", format!("historical event {}", i));
manager.broadcast(&event).await.unwrap();
}
// Start the listener
let listener_manager = Arc::clone(&manager);
let socket_path_clone = socket_path.clone();
tokio::spawn(async move {
listener_manager.start_listening(socket_path_clone).await;
});
tokio::time::sleep(Duration::from_millis(50)).await;
// Connect as a client
let stream = UnixStream::connect(&socket_path).await.unwrap();
let mut reader = BufReader::new(stream);
// Should receive all 5 historical events
let events = read_events_with_timeout(&mut reader, 5, 100).await;
assert_eq!(events.len(), 5);
assert!(events[0].contains("historical event 0"));
assert!(events[4].contains("historical event 4"));
// Cleanup
let _ = std::fs::remove_file(&socket_path);
}
#[tokio::test]
async fn test_multiple_clients_receive_same_events() {
let socket_path = test_socket_path("multiple_clients");
let manager = Arc::new(EventManager::new().unwrap());
// Start the listener
let listener_manager = Arc::clone(&manager);
let socket_path_clone = socket_path.clone();
tokio::spawn(async move {
listener_manager.start_listening(socket_path_clone).await;
});
tokio::time::sleep(Duration::from_millis(50)).await;
// Connect 3 clients
let stream1 = UnixStream::connect(&socket_path).await.unwrap();
let stream2 = UnixStream::connect(&socket_path).await.unwrap();
let stream3 = UnixStream::connect(&socket_path).await.unwrap();
let mut reader1 = BufReader::new(stream1);
let mut reader2 = BufReader::new(stream2);
let mut reader3 = BufReader::new(stream3);
// Broadcast a new event
let event = Event::new("test_user", "broadcast to all");
manager.broadcast(&event).await.unwrap();
// All clients should receive the event
let mut line1 = String::new();
let mut line2 = String::new();
let mut line3 = String::new();
timeout(Duration::from_millis(100), reader1.read_line(&mut line1))
.await
.unwrap()
.unwrap();
timeout(Duration::from_millis(100), reader2.read_line(&mut line2))
.await
.unwrap()
.unwrap();
timeout(Duration::from_millis(100), reader3.read_line(&mut line3))
.await
.unwrap()
.unwrap();
assert!(line1.contains("broadcast to all"));
assert!(line2.contains("broadcast to all"));
assert!(line3.contains("broadcast to all"));
// Cleanup
let _ = std::fs::remove_file(&socket_path);
}
#[tokio::test]
async fn test_late_joiner_receives_full_history() {
let socket_path = test_socket_path("late_joiner");
let manager = Arc::new(EventManager::new().unwrap());
// Start the listener
let listener_manager = Arc::clone(&manager);
let socket_path_clone = socket_path.clone();
tokio::spawn(async move {
listener_manager.start_listening(socket_path_clone).await;
});
tokio::time::sleep(Duration::from_millis(50)).await;
// First client connects
let stream1 = UnixStream::connect(&socket_path).await.unwrap();
let mut reader1 = BufReader::new(stream1);
// Broadcast several events
for i in 0..10 {
let event = Event::new("test_user", format!("event {}", i));
manager.broadcast(&event).await.unwrap();
}
// Consume events from first client
let _ = read_events_with_timeout(&mut reader1, 10, 100).await;
// Late joiner connects
let stream2 = UnixStream::connect(&socket_path).await.unwrap();
let mut reader2 = BufReader::new(stream2);
// Late joiner should receive all 10 events from history
let events = read_events_with_timeout(&mut reader2, 10, 100).await;
assert_eq!(events.len(), 10);
assert!(events[0].contains("event 0"));
assert!(events[9].contains("event 9"));
// Cleanup
let _ = std::fs::remove_file(&socket_path);
}
#[tokio::test]
async fn test_client_receives_events_in_order() {
let socket_path = test_socket_path("event_order");
let manager = Arc::new(EventManager::new().unwrap());
// Start the listener
let listener_manager = Arc::clone(&manager);
let socket_path_clone = socket_path.clone();
tokio::spawn(async move {
listener_manager.start_listening(socket_path_clone).await;
});
tokio::time::sleep(Duration::from_millis(50)).await;
// Connect client
let stream = UnixStream::connect(&socket_path).await.unwrap();
let mut reader = BufReader::new(stream);
// Broadcast events rapidly
let count = 50;
for i in 0..count {
let event = Event::new("test_user", format!("sequence {}", i));
manager.broadcast(&event).await.unwrap();
}
// Read all events
let events = read_events_with_timeout(&mut reader, count, 500).await;
assert_eq!(events.len(), count);
// Verify order
for (i, event) in events.iter().enumerate() {
assert!(
event.contains(&format!("sequence {}", i)),
"Event {} out of order: {}",
i,
event
);
}
// Cleanup
let _ = std::fs::remove_file(&socket_path);
}
#[tokio::test]
async fn test_concurrent_broadcasts_during_client_connections() {
let socket_path = test_socket_path("concurrent_ops");
let manager = Arc::new(EventManager::new().unwrap());
// Start the listener
let listener_manager = Arc::clone(&manager);
let socket_path_clone = socket_path.clone();
tokio::spawn(async move {
listener_manager.start_listening(socket_path_clone).await;
});
tokio::time::sleep(Duration::from_millis(50)).await;
// Connect client 1 BEFORE any broadcasts
let stream1 = UnixStream::connect(&socket_path).await.unwrap();
let mut reader1 = BufReader::new(stream1);
// Spawn a task that continuously broadcasts
let broadcast_manager = Arc::clone(&manager);
let broadcast_handle = tokio::spawn(async move {
for i in 0..100 {
let event = Event::new("test_user", format!("concurrent event {}", i));
broadcast_manager.broadcast(&event).await.unwrap();
tokio::time::sleep(Duration::from_millis(5)).await;
}
});
// While broadcasting, connect more clients at different times
tokio::time::sleep(Duration::from_millis(100)).await;
let stream2 = UnixStream::connect(&socket_path).await.unwrap();
let mut reader2 = BufReader::new(stream2);
tokio::time::sleep(Duration::from_millis(150)).await;
let stream3 = UnixStream::connect(&socket_path).await.unwrap();
let mut reader3 = BufReader::new(stream3);
// Wait for broadcasts to complete
broadcast_handle.await.unwrap();
// All clients should have received events
let events1 = read_events_with_timeout(&mut reader1, 100, 200).await;
let events2 = read_events_with_timeout(&mut reader2, 100, 200).await;
let events3 = read_events_with_timeout(&mut reader3, 100, 200).await;
// Client 1 connected first (before any broadcasts), should get all 100
assert_eq!(events1.len(), 100);
// Client 2 connected after ~20 events were broadcast
// Gets ~20 from history + ~80 live = 100
assert_eq!(events2.len(), 100);
// Client 3 connected after ~50 events were broadcast
// Gets ~50 from history + ~50 live = 100
assert_eq!(events3.len(), 100);
// Verify they all received events in order
assert!(events1[0].contains("concurrent event 0"));
assert!(events2[0].contains("concurrent event 0"));
assert!(events3[0].contains("concurrent event 0"));
// Cleanup
let _ = std::fs::remove_file(&socket_path);
}
#[tokio::test]
async fn test_buffer_overflow_affects_new_clients() {
let socket_path = test_socket_path("buffer_overflow");
let manager = Arc::new(EventManager::new().unwrap());
// Broadcast more than buffer max (1000)
for i in 0..1100 {
let event = Event::new("test_user", format!("overflow event {}", i));
manager.broadcast(&event).await.unwrap();
}
// Start the listener
let listener_manager = Arc::clone(&manager);
let socket_path_clone = socket_path.clone();
tokio::spawn(async move {
listener_manager.start_listening(socket_path_clone).await;
});
tokio::time::sleep(Duration::from_millis(50)).await;
// New client connects
let stream = UnixStream::connect(&socket_path).await.unwrap();
let mut reader = BufReader::new(stream);
// Should receive exactly 1000 events (buffer max)
let events = read_events_with_timeout(&mut reader, 1100, 500).await;
assert_eq!(events.len(), 1000);
// First event should be 100 (oldest 100 were evicted)
assert!(events[0].contains("overflow event 100"));
// Last event should be 1099
assert!(events[999].contains("overflow event 1099"));
// Cleanup
let _ = std::fs::remove_file(&socket_path);
}
#[rstest]
#[case(10, 1)]
#[case(50, 5)]
#[tokio::test]
async fn test_client_count_scaling(#[case] num_clients: usize, #[case] events_per_client: usize) {
let socket_path = test_socket_path(&format!("scaling_{}_{}", num_clients, events_per_client));
let manager = Arc::new(EventManager::new().unwrap());
// Start the listener
let listener_manager = Arc::clone(&manager);
let socket_path_clone = socket_path.clone();
tokio::spawn(async move {
listener_manager.start_listening(socket_path_clone).await;
});
tokio::time::sleep(Duration::from_millis(50)).await;
// Connect many clients
let mut readers = Vec::new();
for _ in 0..num_clients {
let stream = UnixStream::connect(&socket_path).await.unwrap();
readers.push(BufReader::new(stream));
}
// Broadcast events
for i in 0..events_per_client {
let event = Event::new("test_user", format!("scale event {}", i));
manager.broadcast(&event).await.unwrap();
}
// Verify all clients received all events
for reader in &mut readers {
let events = read_events_with_timeout(reader, events_per_client, 200).await;
assert_eq!(events.len(), events_per_client);
}
// Cleanup
let _ = std::fs::remove_file(&socket_path);
}
#[tokio::test]
async fn test_client_disconnect_doesnt_affect_others() {
let socket_path = test_socket_path("disconnect");
let manager = Arc::new(EventManager::new().unwrap());
// Start the listener
let listener_manager = Arc::clone(&manager);
let socket_path_clone = socket_path.clone();
tokio::spawn(async move {
listener_manager.start_listening(socket_path_clone).await;
});
tokio::time::sleep(Duration::from_millis(50)).await;
// Connect 3 clients
let stream1 = UnixStream::connect(&socket_path).await.unwrap();
let stream2 = UnixStream::connect(&socket_path).await.unwrap();
let stream3 = UnixStream::connect(&socket_path).await.unwrap();
let mut reader1 = BufReader::new(stream1);
let mut reader2 = BufReader::new(stream2);
let mut reader3 = BufReader::new(stream3);
// Broadcast initial event
manager
.broadcast(&Event::new("test_user", "before disconnect"))
.await
.unwrap();
// All receive it
let _ = read_events_with_timeout(&mut reader1, 1, 100).await;
let _ = read_events_with_timeout(&mut reader2, 1, 100).await;
let _ = read_events_with_timeout(&mut reader3, 1, 100).await;
// Drop client 2 (simulates disconnect)
drop(reader2);
// Broadcast another event
manager
.broadcast(&Event::new("test_user", "after disconnect"))
.await
.unwrap();
// Clients 1 and 3 should still receive it
let events1 = read_events_with_timeout(&mut reader1, 1, 100).await;
let events3 = read_events_with_timeout(&mut reader3, 1, 100).await;
assert_eq!(events1.len(), 1);
assert_eq!(events3.len(), 1);
assert!(events1[0].contains("after disconnect"));
assert!(events3[0].contains("after disconnect"));
// Cleanup
let _ = std::fs::remove_file(&socket_path);
}
#[tokio::test]
async fn test_json_deserialization_of_received_events() {
let socket_path = test_socket_path("json_deser");
let manager = Arc::new(EventManager::new().unwrap());
// Start the listener
let listener_manager = Arc::clone(&manager);
let socket_path_clone = socket_path.clone();
tokio::spawn(async move {
listener_manager.start_listening(socket_path_clone).await;
});
tokio::time::sleep(Duration::from_millis(50)).await;
// Broadcast an event with special characters
let test_message = "special chars: @#$% newline\\n tab\\t quotes \"test\"";
manager
.broadcast(&Event::new("test_user", test_message))
.await
.unwrap();
// Connect and deserialize
let stream = UnixStream::connect(&socket_path).await.unwrap();
let mut reader = BufReader::new(stream);
let mut line = String::new();
reader.read_line(&mut line).await.unwrap();
// Should be valid JSON
let parsed: serde_json::Value = serde_json::from_str(line.trim()).unwrap();
assert_eq!(parsed["message"], test_message);
// Cleanup
let _ = std::fs::remove_file(&socket_path);
}