Expanded outbound test

This commit is contained in:
Mathias Hall-Andersen
2019-09-04 19:22:47 +02:00
parent 6d11da441b
commit 310be99fa6
6 changed files with 44 additions and 21 deletions

2
src/router/constants.rs Normal file
View File

@@ -0,0 +1,2 @@
pub const MAX_STAGED_PACKETS: usize = 128;
pub const WORKER_QUEUE_SIZE: usize = MAX_STAGED_PACKETS;

View File

@@ -18,6 +18,7 @@ use super::peer;
use super::peer::{Peer, PeerInner}; use super::peer::{Peer, PeerInner};
use super::SIZE_MESSAGE_PREFIX; use super::SIZE_MESSAGE_PREFIX;
use super::constants::WORKER_QUEUE_SIZE;
use super::messages::TYPE_TRANSPORT; use super::messages::TYPE_TRANSPORT;
use super::types::{Callback, Callbacks, KeyCallback, Opaque, PhantomCallbacks, RouterError}; use super::types::{Callback, Callbacks, KeyCallback, Opaque, PhantomCallbacks, RouterError};
use super::workers::{worker_parallel, JobParallel}; use super::workers::{worker_parallel, JobParallel};
@@ -113,13 +114,9 @@ impl<O: Opaque, R: Callback<O>, S: Callback<O>, K: KeyCallback<O>, T: Tun, B: Bi
let mut queues = Vec::with_capacity(num_workers); let mut queues = Vec::with_capacity(num_workers);
let mut threads = Vec::with_capacity(num_workers); let mut threads = Vec::with_capacity(num_workers);
for _ in 0..num_workers { for _ in 0..num_workers {
// allocate work queue let (tx, rx) = sync_channel(WORKER_QUEUE_SIZE);
let (tx, rx) = sync_channel(128);
queues.push(spin::Mutex::new(tx)); queues.push(spin::Mutex::new(tx));
threads.push(thread::spawn(move || worker_parallel(rx)));
// start worker thread
let device = inner.clone();
threads.push(thread::spawn(move || worker_parallel(device, rx)));
} }
// return exported device handle // return exported device handle

View File

@@ -1,4 +1,5 @@
mod anti_replay; mod anti_replay;
mod constants;
mod device; mod device;
mod messages; mod messages;
mod peer; mod peer;

View File

@@ -22,17 +22,15 @@ use super::device::DeviceInner;
use super::device::EncryptionState; use super::device::EncryptionState;
use super::messages::TransportHeader; use super::messages::TransportHeader;
use futures::sync::oneshot;
use futures::*; use futures::*;
use super::workers::Operation; use super::workers::Operation;
use super::workers::{worker_inbound, worker_outbound}; use super::workers::{worker_inbound, worker_outbound};
use super::workers::{JobBuffer, JobInbound, JobOutbound, JobParallel}; use super::workers::{JobBuffer, JobInbound, JobOutbound, JobParallel};
use super::constants::MAX_STAGED_PACKETS;
use super::types::Callbacks; use super::types::Callbacks;
const MAX_STAGED_PACKETS: usize = 128;
pub struct KeyWheel { pub struct KeyWheel {
next: Option<Arc<KeyPair>>, // next key state (unconfirmed) next: Option<Arc<KeyPair>>, // next key state (unconfirmed)
current: Option<Arc<KeyPair>>, // current key state (used for encryption) current: Option<Arc<KeyPair>>, // current key state (used for encryption)

View File

@@ -123,17 +123,30 @@ fn dummy_keypair(initiator: bool) -> KeyPair {
#[test] #[test]
fn test_outbound() { fn test_outbound() {
let opaque = Arc::new(AtomicBool::new(false)); // type for tracking events inside the router module
struct Flags {
send: AtomicBool,
recv: AtomicBool,
need_key: AtomicBool,
}
// create device (with Opaque = ()) type Opaque = Arc<Flags>;
let opaque = Arc::new(Flags {
send: AtomicBool::new(false),
recv: AtomicBool::new(false),
need_key: AtomicBool::new(false),
});
// create device
let workers = 4; let workers = 4;
let router = Device::new( let router = Device::new(
workers, workers,
TunTest {}, TunTest {},
BindTest {}, BindTest {},
|t: &Arc<AtomicBool>, data: bool, sent: bool| println!("send"), |t: &Opaque, data: bool, sent: bool| t.send.store(true, Ordering::SeqCst),
|t: &Arc<AtomicBool>, data: bool, sent: bool| {}, |t: &Opaque, data: bool, sent: bool| t.recv.store(true, Ordering::SeqCst),
|t: &Arc<AtomicBool>| t.store(true, Ordering::SeqCst), |t: &Opaque| t.need_key.store(true, Ordering::SeqCst),
); );
// create peer // create peer
@@ -165,7 +178,9 @@ fn test_outbound() {
peer.add_keypair(dummy_keypair(true)); peer.add_keypair(dummy_keypair(true));
for (mask, len, ip, okay) in &tests { for (mask, len, ip, okay) in &tests {
opaque.store(false, Ordering::SeqCst); opaque.send.store(false, Ordering::SeqCst);
opaque.recv.store(false, Ordering::SeqCst);
opaque.need_key.store(false, Ordering::SeqCst);
let mask: IpAddr = mask.parse().unwrap(); let mask: IpAddr = mask.parse().unwrap();
@@ -187,15 +202,28 @@ fn test_outbound() {
// cryptkey route the IP packet // cryptkey route the IP packet
let res = router.send(msg); let res = router.send(msg);
// allow some scheduling
thread::sleep(Duration::from_millis(1));
if *okay { if *okay {
// cryptkey routing succeeded // cryptkey routing succeeded
assert!(res.is_ok()); assert!(res.is_ok());
// and a key should have been requested // attempted to send message
// assert!(opaque.load(Ordering::Acquire), "did not request key"); assert_eq!(opaque.need_key.load(Ordering::Acquire), false);
assert_eq!(opaque.send.load(Ordering::Acquire), true);
assert_eq!(opaque.recv.load(Ordering::Acquire), false);
} else { } else {
// no such cryptkey route
assert!(res.is_err()); assert!(res.is_err());
// did not attempt to send message
assert_eq!(opaque.need_key.load(Ordering::Acquire), false);
assert_eq!(opaque.send.load(Ordering::Acquire), false);
assert_eq!(opaque.recv.load(Ordering::Acquire), false);
} }
// clear subnets for next test // clear subnets for next test
peer.remove_subnets(); peer.remove_subnets();
} }

View File

@@ -141,10 +141,7 @@ pub fn worker_outbound<C: Callbacks, T: Tun, B: Bind>(
} }
} }
pub fn worker_parallel<C: Callbacks, T: Tun, B: Bind>( pub fn worker_parallel(receiver: Receiver<JobParallel>) {
device: Arc<DeviceInner<C, T, B>>,
receiver: Receiver<JobParallel>,
) {
loop { loop {
// fetch next job // fetch next job
let (tx, mut buf) = match receiver.recv() { let (tx, mut buf) = match receiver.recv() {