diff --git a/src/router/constants.rs b/src/router/constants.rs new file mode 100644 index 0000000..b3015ed --- /dev/null +++ b/src/router/constants.rs @@ -0,0 +1,2 @@ +pub const MAX_STAGED_PACKETS: usize = 128; +pub const WORKER_QUEUE_SIZE: usize = MAX_STAGED_PACKETS; diff --git a/src/router/device.rs b/src/router/device.rs index 58ca2f6..2617350 100644 --- a/src/router/device.rs +++ b/src/router/device.rs @@ -18,6 +18,7 @@ use super::peer; use super::peer::{Peer, PeerInner}; use super::SIZE_MESSAGE_PREFIX; +use super::constants::WORKER_QUEUE_SIZE; use super::messages::TYPE_TRANSPORT; use super::types::{Callback, Callbacks, KeyCallback, Opaque, PhantomCallbacks, RouterError}; use super::workers::{worker_parallel, JobParallel}; @@ -113,13 +114,9 @@ impl, S: Callback, K: KeyCallback, T: Tun, B: Bi let mut queues = Vec::with_capacity(num_workers); let mut threads = Vec::with_capacity(num_workers); for _ in 0..num_workers { - // allocate work queue - let (tx, rx) = sync_channel(128); + let (tx, rx) = sync_channel(WORKER_QUEUE_SIZE); queues.push(spin::Mutex::new(tx)); - - // start worker thread - let device = inner.clone(); - threads.push(thread::spawn(move || worker_parallel(device, rx))); + threads.push(thread::spawn(move || worker_parallel(rx))); } // return exported device handle diff --git a/src/router/mod.rs b/src/router/mod.rs index 0e4bce1..ec560b4 100644 --- a/src/router/mod.rs +++ b/src/router/mod.rs @@ -1,4 +1,5 @@ mod anti_replay; +mod constants; mod device; mod messages; mod peer; diff --git a/src/router/peer.rs b/src/router/peer.rs index a31dfcf..e9f62d5 100644 --- a/src/router/peer.rs +++ b/src/router/peer.rs @@ -22,17 +22,15 @@ use super::device::DeviceInner; use super::device::EncryptionState; use super::messages::TransportHeader; -use futures::sync::oneshot; use futures::*; use super::workers::Operation; use super::workers::{worker_inbound, worker_outbound}; use super::workers::{JobBuffer, JobInbound, JobOutbound, JobParallel}; +use super::constants::MAX_STAGED_PACKETS; use super::types::Callbacks; -const MAX_STAGED_PACKETS: usize = 128; - pub struct KeyWheel { next: Option>, // next key state (unconfirmed) current: Option>, // current key state (used for encryption) diff --git a/src/router/tests.rs b/src/router/tests.rs index 1e049c4..c2ff378 100644 --- a/src/router/tests.rs +++ b/src/router/tests.rs @@ -123,17 +123,30 @@ fn dummy_keypair(initiator: bool) -> KeyPair { #[test] fn test_outbound() { - let opaque = Arc::new(AtomicBool::new(false)); + // type for tracking events inside the router module + struct Flags { + send: AtomicBool, + recv: AtomicBool, + need_key: AtomicBool, + } - // create device (with Opaque = ()) + type Opaque = Arc; + + let opaque = Arc::new(Flags { + send: AtomicBool::new(false), + recv: AtomicBool::new(false), + need_key: AtomicBool::new(false), + }); + + // create device let workers = 4; let router = Device::new( workers, TunTest {}, BindTest {}, - |t: &Arc, data: bool, sent: bool| println!("send"), - |t: &Arc, data: bool, sent: bool| {}, - |t: &Arc| t.store(true, Ordering::SeqCst), + |t: &Opaque, data: bool, sent: bool| t.send.store(true, Ordering::SeqCst), + |t: &Opaque, data: bool, sent: bool| t.recv.store(true, Ordering::SeqCst), + |t: &Opaque| t.need_key.store(true, Ordering::SeqCst), ); // create peer @@ -165,7 +178,9 @@ fn test_outbound() { peer.add_keypair(dummy_keypair(true)); for (mask, len, ip, okay) in &tests { - opaque.store(false, Ordering::SeqCst); + opaque.send.store(false, Ordering::SeqCst); + opaque.recv.store(false, Ordering::SeqCst); + opaque.need_key.store(false, Ordering::SeqCst); let mask: IpAddr = mask.parse().unwrap(); @@ -187,15 +202,28 @@ fn test_outbound() { // cryptkey route the IP packet let res = router.send(msg); + + // allow some scheduling + thread::sleep(Duration::from_millis(1)); + if *okay { // cryptkey routing succeeded assert!(res.is_ok()); - // and a key should have been requested - // assert!(opaque.load(Ordering::Acquire), "did not request key"); + // attempted to send message + assert_eq!(opaque.need_key.load(Ordering::Acquire), false); + assert_eq!(opaque.send.load(Ordering::Acquire), true); + assert_eq!(opaque.recv.load(Ordering::Acquire), false); } else { + // no such cryptkey route assert!(res.is_err()); + + // did not attempt to send message + assert_eq!(opaque.need_key.load(Ordering::Acquire), false); + assert_eq!(opaque.send.load(Ordering::Acquire), false); + assert_eq!(opaque.recv.load(Ordering::Acquire), false); } + // clear subnets for next test peer.remove_subnets(); } diff --git a/src/router/workers.rs b/src/router/workers.rs index e79502f..537f238 100644 --- a/src/router/workers.rs +++ b/src/router/workers.rs @@ -141,10 +141,7 @@ pub fn worker_outbound( } } -pub fn worker_parallel( - device: Arc>, - receiver: Receiver, -) { +pub fn worker_parallel(receiver: Receiver) { loop { // fetch next job let (tx, mut buf) = match receiver.recv() {