More extensive outbound test
This commit is contained in:
@@ -8,6 +8,8 @@ use std::sync::{Arc, Weak};
|
||||
use std::thread;
|
||||
use std::time::Instant;
|
||||
|
||||
use log::debug;
|
||||
|
||||
use spin;
|
||||
use treebitmap::IpLookupTable;
|
||||
|
||||
@@ -84,6 +86,8 @@ impl<C: Callbacks, T: Tun, B: Bind> Drop for Device<C, T, B> {
|
||||
}
|
||||
_ => false,
|
||||
} {}
|
||||
|
||||
debug!("device dropped");
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -5,6 +5,8 @@ use std::sync::mpsc::{sync_channel, SyncSender};
|
||||
use std::sync::{Arc, Weak};
|
||||
use std::thread;
|
||||
|
||||
use log::debug;
|
||||
|
||||
use spin::Mutex;
|
||||
|
||||
use arraydeque::{ArrayDeque, Saturating, Wrapping};
|
||||
@@ -54,8 +56,8 @@ pub struct PeerInner<C: Callbacks, T: Tun, B: Bind> {
|
||||
|
||||
pub struct Peer<C: Callbacks, T: Tun, B: Bind> {
|
||||
state: Arc<PeerInner<C, T, B>>,
|
||||
thread_outbound: thread::JoinHandle<()>,
|
||||
thread_inbound: thread::JoinHandle<()>,
|
||||
thread_outbound: Option<thread::JoinHandle<()>>,
|
||||
thread_inbound: Option<thread::JoinHandle<()>>,
|
||||
}
|
||||
|
||||
fn treebit_list<A, E, C: Callbacks, T: Tun, B: Bind>(
|
||||
@@ -109,6 +111,16 @@ impl<C: Callbacks, T: Tun, B: Bind> Drop for Peer<C, T, B> {
|
||||
let peer = &self.state;
|
||||
peer.stopped.store(true, Ordering::SeqCst);
|
||||
|
||||
// drop channels
|
||||
|
||||
mem::replace(&mut *peer.inbound.lock(), sync_channel(0).0);
|
||||
mem::replace(&mut *peer.outbound.lock(), sync_channel(0).0);
|
||||
|
||||
// join with workers
|
||||
|
||||
mem::replace(&mut self.thread_inbound, None).map(|v| v.join());
|
||||
mem::replace(&mut self.thread_outbound, None).map(|v| v.join());
|
||||
|
||||
// remove from cryptkey router
|
||||
|
||||
treebit_remove(self, &peer.device.ipv4);
|
||||
@@ -130,7 +142,7 @@ impl<C: Callbacks, T: Tun, B: Bind> Drop for Peer<C, T, B> {
|
||||
}
|
||||
}
|
||||
|
||||
// null key-material (TODO: extend)
|
||||
// null key-material
|
||||
|
||||
keys.next = None;
|
||||
keys.current = None;
|
||||
@@ -138,6 +150,8 @@ impl<C: Callbacks, T: Tun, B: Bind> Drop for Peer<C, T, B> {
|
||||
|
||||
*peer.ekey.lock() = None;
|
||||
*peer.endpoint.lock() = None;
|
||||
|
||||
debug!("peer dropped & removed from device");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -153,10 +167,10 @@ pub fn new_peer<C: Callbacks, T: Tun, B: Bind>(
|
||||
let device = device.clone();
|
||||
Arc::new(PeerInner {
|
||||
opaque,
|
||||
device,
|
||||
inbound: Mutex::new(in_tx),
|
||||
outbound: Mutex::new(out_tx),
|
||||
stopped: AtomicBool::new(false),
|
||||
device: device,
|
||||
ekey: spin::Mutex::new(None),
|
||||
endpoint: spin::Mutex::new(None),
|
||||
keys: spin::Mutex::new(KeyWheel {
|
||||
@@ -187,8 +201,8 @@ pub fn new_peer<C: Callbacks, T: Tun, B: Bind>(
|
||||
|
||||
Peer {
|
||||
state: peer,
|
||||
thread_inbound,
|
||||
thread_outbound,
|
||||
thread_inbound: Some(thread_inbound),
|
||||
thread_outbound: Some(thread_outbound),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -212,21 +226,22 @@ impl<C: Callbacks, T: Tun, B: Bind> PeerInner<C, T, B> {
|
||||
let key = match self.ekey.lock().as_mut() {
|
||||
None => {
|
||||
// add to staged packets (create no job)
|
||||
debug!("execute callback: call_need_key");
|
||||
(self.device.call_need_key)(&self.opaque);
|
||||
self.staged_packets.lock().push_back(msg);
|
||||
return None;
|
||||
}
|
||||
Some(mut state) => {
|
||||
// allocate nonce
|
||||
state.nonce += 1;
|
||||
if state.nonce >= REJECT_AFTER_MESSAGES {
|
||||
state.nonce -= 1;
|
||||
// avoid integer overflow in nonce
|
||||
if state.nonce >= REJECT_AFTER_MESSAGES - 1 {
|
||||
return None;
|
||||
}
|
||||
debug!("encryption state available, nonce = {}", state.nonce);
|
||||
|
||||
// set transport message fields
|
||||
header.f_counter.set(state.nonce);
|
||||
header.f_receiver.set(state.id);
|
||||
state.nonce += 1;
|
||||
state.key
|
||||
}
|
||||
};
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
use std::error::Error;
|
||||
use std::fmt;
|
||||
use std::io;
|
||||
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::Arc;
|
||||
@@ -121,110 +122,141 @@ fn dummy_keypair(initiator: bool) -> KeyPair {
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_outbound() {
|
||||
// type for tracking events inside the router module
|
||||
struct Flags {
|
||||
send: AtomicBool,
|
||||
recv: AtomicBool,
|
||||
need_key: AtomicBool,
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use env_logger;
|
||||
|
||||
fn init() {
|
||||
let _ = env_logger::builder().is_test(true).try_init();
|
||||
}
|
||||
|
||||
type Opaque = Arc<Flags>;
|
||||
#[test]
|
||||
fn test_outbound() {
|
||||
init();
|
||||
|
||||
let opaque = Arc::new(Flags {
|
||||
send: AtomicBool::new(false),
|
||||
recv: AtomicBool::new(false),
|
||||
need_key: AtomicBool::new(false),
|
||||
});
|
||||
|
||||
// create device
|
||||
let workers = 4;
|
||||
let router = Device::new(
|
||||
workers,
|
||||
TunTest {},
|
||||
BindTest {},
|
||||
|t: &Opaque, data: bool, sent: bool| t.send.store(true, Ordering::SeqCst),
|
||||
|t: &Opaque, data: bool, sent: bool| t.recv.store(true, Ordering::SeqCst),
|
||||
|t: &Opaque| t.need_key.store(true, Ordering::SeqCst),
|
||||
);
|
||||
|
||||
// create peer
|
||||
let peer = router.new_peer(opaque.clone());
|
||||
let tests = vec![
|
||||
("192.168.1.0", 24, "192.168.1.20", true),
|
||||
("172.133.133.133", 32, "172.133.133.133", true),
|
||||
("172.133.133.133", 32, "172.133.133.132", false),
|
||||
(
|
||||
"2001:db8::ff00:42:0000",
|
||||
112,
|
||||
"2001:db8::ff00:42:3242",
|
||||
true,
|
||||
),
|
||||
(
|
||||
"2001:db8::ff00:42:8000",
|
||||
113,
|
||||
"2001:db8::ff00:42:0660",
|
||||
false,
|
||||
),
|
||||
(
|
||||
"2001:db8::ff00:42:8000",
|
||||
113,
|
||||
"2001:db8::ff00:42:ffff",
|
||||
true,
|
||||
),
|
||||
];
|
||||
|
||||
peer.add_keypair(dummy_keypair(true));
|
||||
|
||||
for (mask, len, ip, okay) in &tests {
|
||||
opaque.send.store(false, Ordering::SeqCst);
|
||||
opaque.recv.store(false, Ordering::SeqCst);
|
||||
opaque.need_key.store(false, Ordering::SeqCst);
|
||||
|
||||
let mask: IpAddr = mask.parse().unwrap();
|
||||
|
||||
// map subnet to peer
|
||||
peer.add_subnet(mask, *len);
|
||||
|
||||
// create "IP packet"
|
||||
let mut msg = Vec::<u8>::new();
|
||||
msg.resize(SIZE_MESSAGE_PREFIX + 1024, 0);
|
||||
if mask.is_ipv4() {
|
||||
let mut packet = MutableIpv4Packet::new(&mut msg[SIZE_MESSAGE_PREFIX..]).unwrap();
|
||||
packet.set_destination(ip.parse().unwrap());
|
||||
packet.set_version(4);
|
||||
} else {
|
||||
let mut packet = MutableIpv6Packet::new(&mut msg[SIZE_MESSAGE_PREFIX..]).unwrap();
|
||||
packet.set_destination(ip.parse().unwrap());
|
||||
packet.set_version(6);
|
||||
// type for tracking events inside the router module
|
||||
struct Flags {
|
||||
send: AtomicBool,
|
||||
recv: AtomicBool,
|
||||
need_key: AtomicBool,
|
||||
}
|
||||
|
||||
// cryptkey route the IP packet
|
||||
let res = router.send(msg);
|
||||
type Opaque = Arc<Flags>;
|
||||
|
||||
// allow some scheduling
|
||||
thread::sleep(Duration::from_millis(1));
|
||||
// create device
|
||||
let workers = 4;
|
||||
let router = Device::new(
|
||||
workers,
|
||||
TunTest {},
|
||||
BindTest {},
|
||||
|t: &Opaque, _data: bool, _sent: bool| t.send.store(true, Ordering::SeqCst),
|
||||
|t: &Opaque, _data: bool, _sent: bool| t.recv.store(true, Ordering::SeqCst),
|
||||
|t: &Opaque| t.need_key.store(true, Ordering::SeqCst),
|
||||
);
|
||||
|
||||
if *okay {
|
||||
// cryptkey routing succeeded
|
||||
assert!(res.is_ok());
|
||||
// create peer
|
||||
let tests = vec![
|
||||
("192.168.1.0", 24, "192.168.1.20", true),
|
||||
("172.133.133.133", 32, "172.133.133.133", true),
|
||||
("172.133.133.133", 32, "172.133.133.132", false),
|
||||
(
|
||||
"2001:db8::ff00:42:0000",
|
||||
112,
|
||||
"2001:db8::ff00:42:3242",
|
||||
true,
|
||||
),
|
||||
(
|
||||
"2001:db8::ff00:42:8000",
|
||||
113,
|
||||
"2001:db8::ff00:42:0660",
|
||||
false,
|
||||
),
|
||||
(
|
||||
"2001:db8::ff00:42:8000",
|
||||
113,
|
||||
"2001:db8::ff00:42:ffff",
|
||||
true,
|
||||
),
|
||||
];
|
||||
|
||||
// attempted to send message
|
||||
assert_eq!(opaque.need_key.load(Ordering::Acquire), false);
|
||||
assert_eq!(opaque.send.load(Ordering::Acquire), true);
|
||||
assert_eq!(opaque.recv.load(Ordering::Acquire), false);
|
||||
} else {
|
||||
// no such cryptkey route
|
||||
assert!(res.is_err());
|
||||
for (num, (mask, len, ip, okay)) in tests.iter().enumerate() {
|
||||
for set_key in vec![true, false] {
|
||||
// add new peer
|
||||
let opaque = Arc::new(Flags {
|
||||
send: AtomicBool::new(false),
|
||||
recv: AtomicBool::new(false),
|
||||
need_key: AtomicBool::new(false),
|
||||
});
|
||||
let peer = router.new_peer(opaque.clone());
|
||||
let mask: IpAddr = mask.parse().unwrap();
|
||||
|
||||
// did not attempt to send message
|
||||
assert_eq!(opaque.need_key.load(Ordering::Acquire), false);
|
||||
assert_eq!(opaque.send.load(Ordering::Acquire), false);
|
||||
assert_eq!(opaque.recv.load(Ordering::Acquire), false);
|
||||
if set_key {
|
||||
peer.add_keypair(dummy_keypair(true));
|
||||
}
|
||||
|
||||
// map subnet to peer
|
||||
peer.add_subnet(mask, *len);
|
||||
|
||||
// create "IP packet"
|
||||
let mut msg = Vec::<u8>::new();
|
||||
msg.resize(SIZE_MESSAGE_PREFIX + 1024, 0);
|
||||
if mask.is_ipv4() {
|
||||
let mut packet =
|
||||
MutableIpv4Packet::new(&mut msg[SIZE_MESSAGE_PREFIX..]).unwrap();
|
||||
packet.set_destination(ip.parse().unwrap());
|
||||
packet.set_version(4);
|
||||
} else {
|
||||
let mut packet =
|
||||
MutableIpv6Packet::new(&mut msg[SIZE_MESSAGE_PREFIX..]).unwrap();
|
||||
packet.set_destination(ip.parse().unwrap());
|
||||
packet.set_version(6);
|
||||
}
|
||||
|
||||
// cryptkey route the IP packet
|
||||
let res = router.send(msg);
|
||||
|
||||
// allow some scheduling
|
||||
thread::sleep(Duration::from_millis(20));
|
||||
|
||||
if *okay {
|
||||
// cryptkey routing succeeded
|
||||
assert!(res.is_ok(), "crypt-key routing should succeed");
|
||||
assert_eq!(
|
||||
opaque.need_key.load(Ordering::Acquire),
|
||||
!set_key,
|
||||
"should have requested a new key, if no encryption state was set"
|
||||
);
|
||||
assert_eq!(
|
||||
opaque.send.load(Ordering::Acquire),
|
||||
set_key,
|
||||
"transmission should have been attempted"
|
||||
);
|
||||
assert_eq!(
|
||||
opaque.recv.load(Ordering::Acquire),
|
||||
false,
|
||||
"no messages should have been marked as received"
|
||||
);
|
||||
} else {
|
||||
// no such cryptkey route
|
||||
assert!(res.is_err(), "crypt-key routing should fail");
|
||||
assert_eq!(
|
||||
opaque.need_key.load(Ordering::Acquire),
|
||||
false,
|
||||
"should not request a new-key if crypt-key routing failed"
|
||||
);
|
||||
assert_eq!(
|
||||
opaque.send.load(Ordering::Acquire),
|
||||
false,
|
||||
"transmission should not have been attempted",
|
||||
);
|
||||
assert_eq!(
|
||||
opaque.recv.load(Ordering::Acquire),
|
||||
false,
|
||||
"no messages should have been marked as received",
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// clear subnets for next test
|
||||
peer.remove_subnets();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,16 +1,12 @@
|
||||
use std::iter;
|
||||
use std::mem;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::mpsc::{sync_channel, Receiver, TryRecvError};
|
||||
use std::sync::mpsc::Receiver;
|
||||
use std::sync::{Arc, Weak};
|
||||
use std::thread;
|
||||
|
||||
use futures::sync::oneshot;
|
||||
use futures::*;
|
||||
|
||||
use spin;
|
||||
use log::debug;
|
||||
|
||||
use crossbeam_deque::{Injector, Steal, Stealer, Worker};
|
||||
use ring::aead::{Aad, LessSafeKey, Nonce, UnboundKey, CHACHA20_POLY1305};
|
||||
use zerocopy::{AsBytes, LayoutVerified};
|
||||
|
||||
@@ -174,12 +170,16 @@ pub fn worker_parallel(receiver: Receiver<JobParallel>) {
|
||||
|
||||
match buf.op {
|
||||
Operation::Encryption => {
|
||||
debug!("worker, process encryption");
|
||||
|
||||
// note: extends the vector to accommodate the tag
|
||||
key.seal_in_place_append_tag(nonce, Aad::empty(), &mut buf.msg)
|
||||
.unwrap();
|
||||
buf.okay = true;
|
||||
}
|
||||
Operation::Decryption => {
|
||||
debug!("worker, process decryption");
|
||||
|
||||
// opening failure is signaled by fault state
|
||||
buf.okay = match key.open_in_place(nonce, Aad::empty(), &mut buf.msg) {
|
||||
Ok(_) => true,
|
||||
|
||||
Reference in New Issue
Block a user