More comprehensive unit tests for router

Mathias Hall-Andersen
2020-02-20 13:21:37 +01:00
parent ead75828cd
commit db02609334
9 changed files with 667 additions and 605 deletions

View File

@@ -5,7 +5,7 @@ use std::sync::Arc;
 use std::thread;
 use std::time::Instant;
 
-use log::debug;
+use log;
 use spin::{Mutex, RwLock};
 use zerocopy::LayoutVerified;
@@ -91,20 +91,17 @@ impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> Drop
     for DeviceHandle<E, C, T, B>
 {
     fn drop(&mut self) {
-        debug!("router: dropping device");
+        log::debug!("router: dropping device");
 
         // close worker queue
         self.state.work.close();
 
         // join all worker threads
-        while match self.handles.pop() {
-            Some(handle) => {
-                handle.thread().unpark();
-                handle.join().unwrap();
-                true
-            }
-            _ => false,
-        } {}
+        while let Some(handle) = self.handles.pop() {
+            handle.thread().unpark();
+            handle.join().unwrap();
+        }
+        log::debug!("router: joined with all workers from pool");
     }
 }
@@ -124,8 +121,13 @@ impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> DeviceHandle<
         // start worker threads
        let mut threads = Vec::with_capacity(num_workers);
        while let Some(rx) = consumers.pop() {
-            threads.push(thread::spawn(move || worker(rx)));
+            println!("spawn");
+            threads.push(thread::spawn(move || {
+                println!("spawned");
+                worker(rx);
+            }));
         }
+        debug_assert!(num_workers > 0, "zero worker threads");
         debug_assert_eq!(threads.len(), num_workers);
 
         // return exported device handle
@@ -135,14 +137,14 @@ impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> DeviceHandle<
         }
     }
 
-    pub fn send_raw(&self, msg : &[u8], dst: &mut E) -> Result<(), B::Error> {
+    pub fn send_raw(&self, msg: &[u8], dst: &mut E) -> Result<(), B::Error> {
         let bind = self.state.outbound.read();
         if bind.0 {
             if let Some(bind) = bind.1.as_ref() {
                 return bind.write(msg, dst);
             }
         }
-        return Ok(())
+        return Ok(());
     }
 
     /// Brings the router down.

View File

@@ -1,5 +1,8 @@
+use std::mem;
+
 use byteorder::BigEndian;
 use zerocopy::byteorder::U16;
+use zerocopy::LayoutVerified;
 use zerocopy::{AsBytes, FromBytes};
 
 pub const VERSION_IP4: u8 = 4;
@@ -24,3 +27,23 @@ pub struct IPv6Header {
     pub f_source: [u8; 16],
     pub f_destination: [u8; 16],
 }
+
+#[inline(always)]
+pub fn inner_length(packet: &[u8]) -> Option<usize> {
+    match packet.get(0)? >> 4 {
+        VERSION_IP4 => {
+            let (header, _): (LayoutVerified<&[u8], IPv4Header>, _) =
+                LayoutVerified::new_from_prefix(packet)?;
+            Some(header.f_total_len.get() as usize)
+        }
+        VERSION_IP6 => {
+            // check length and cast to IPv6 header
+            let (header, _): (LayoutVerified<&[u8], IPv6Header>, _) =
+                LayoutVerified::new_from_prefix(packet)?;
+            Some(header.f_len.get() as usize + mem::size_of::<IPv6Header>())
+        }
+        _ => None,
+    }
+}
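
The new inner_length helper recovers the length of the decrypted IP datagram so that padding and the authentication tag can later be stripped: for IPv4 it is the header's total-length field, for IPv6 the payload length plus the fixed 40-byte header. A rough, self-contained sketch of the same computation using only std (field offsets per RFC 791 / RFC 8200; this is an illustration, not the crate's code):

    // Simplified stand-in for inner_length(); assumes `packet` starts with an IP header.
    fn inner_length_sketch(packet: &[u8]) -> Option<usize> {
        match packet.first()? >> 4 {
            4 => {
                // IPv4: "total length" (bytes 2..4) covers header + payload
                let len = u16::from_be_bytes([*packet.get(2)?, *packet.get(3)?]);
                Some(len as usize)
            }
            6 => {
                // IPv6: "payload length" (bytes 4..6) excludes the fixed 40-byte header
                let len = u16::from_be_bytes([*packet.get(4)?, *packet.get(5)?]);
                Some(len as usize + 40)
            }
            _ => None,
        }
    }

    fn main() {
        // minimal IPv4 header claiming a total length of 84 bytes
        let mut pkt = vec![0u8; 84];
        pkt[0] = 0x45;
        pkt[2..4].copy_from_slice(&84u16.to_be_bytes());
        assert_eq!(inner_length_sketch(&pkt), Some(84));
    }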

View File

@@ -24,6 +24,7 @@ use super::types::*;
 pub const SIZE_TAG: usize = 16;
 pub const SIZE_MESSAGE_PREFIX: usize = mem::size_of::<TransportHeader>();
+pub const SIZE_KEEPALIVE: usize = mem::size_of::<TransportHeader>() + SIZE_TAG;
 pub const CAPACITY_MESSAGE_POSTFIX: usize = SIZE_TAG;
 
 pub const fn message_data_len(payload: usize) -> usize {

View File

@@ -22,7 +22,7 @@ use std::sync::atomic::AtomicBool;
 use std::sync::Arc;
 
 use arraydeque::{ArrayDeque, Wrapping};
-use log::debug;
+use log;
 use spin::Mutex;
 
 pub struct KeyWheel {
@@ -148,7 +148,7 @@ impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> Drop for Peer
         *peer.enc_key.lock() = None;
         *peer.endpoint.lock() = None;
 
-        debug!("peer dropped & removed from device");
+        log::debug!("peer dropped & removed from device");
     }
 }
@@ -192,8 +192,6 @@ impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> PeerInner<E,
     ///
     /// Unit if packet was sent, or an error indicating why sending failed
     pub fn send_raw(&self, msg: &[u8]) -> Result<(), RouterError> {
-        debug!("peer.send");
-
         // send to endpoint (if known)
         match self.endpoint.lock().as_mut() {
             Some(endpoint) => {
@@ -227,6 +225,7 @@ impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> Peer<E, C, T,
         let mut enc_key = self.enc_key.lock();
         match enc_key.as_mut() {
             None => {
+                log::debug!("no key encryption key available");
                 if stage {
                     self.staged_packets.lock().push_back(msg);
                 };
@@ -235,13 +234,14 @@ impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> Peer<E, C, T,
             Some(mut state) => {
                 // avoid integer overflow in nonce
                 if state.nonce >= REJECT_AFTER_MESSAGES - 1 {
+                    log::debug!("encryption key expired");
                     *enc_key = None;
                     if stage {
                         self.staged_packets.lock().push_back(msg);
                     }
                     (None, true)
                 } else {
-                    debug!("encryption state available, nonce = {}", state.nonce);
+                    log::debug!("encryption state available, nonce = {}", state.nonce);
                     let job =
                         SendJob::new(msg, state.nonce, state.keypair.clone(), self.clone());
                     if self.outbound.push(job.clone()) {
@@ -256,18 +256,20 @@ impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> Peer<E, C, T,
         };
 
         if need_key {
+            log::debug!("request new key");
             debug_assert!(job.is_none());
             C::need_key(&self.opaque);
         };
 
         if let Some(job) = job {
+            log::debug!("schedule outbound job");
             self.device.work.send(JobUnion::Outbound(job))
         }
     }
 
     // Transmit all staged packets
     fn send_staged(&self) -> bool {
-        debug!("peer.send_staged");
+        log::trace!("peer.send_staged");
         let mut sent = false;
         let mut staged = self.staged_packets.lock();
         loop {
@@ -282,7 +284,7 @@ impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> Peer<E, C, T,
     }
 
     pub(super) fn confirm_key(&self, keypair: &Arc<KeyPair>) {
-        debug!("peer.confirm_key");
+        log::trace!("peer.confirm_key");
         {
             // take lock and check keypair = keys.next
             let mut keys = self.keys.lock();
@@ -329,7 +331,7 @@ impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> PeerHandle<E,
     /// This API still permits support for the "sticky socket" behavior,
     /// as sockets should be "unsticked" when manually updating the endpoint
     pub fn set_endpoint(&self, endpoint: E) {
-        debug!("peer.set_endpoint");
+        log::trace!("peer.set_endpoint");
         *self.peer.endpoint.lock() = Some(endpoint);
     }
@@ -339,13 +341,13 @@ impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> PeerHandle<E,
     ///
     /// Does not convey potential "sticky socket" information
     pub fn get_endpoint(&self) -> Option<SocketAddr> {
-        debug!("peer.get_endpoint");
+        log::trace!("peer.get_endpoint");
         self.peer.endpoint.lock().as_ref().map(|e| e.into_address())
     }
 
     /// Zero all key-material related to the peer
     pub fn zero_keys(&self) {
-        debug!("peer.zero_keys");
+        log::trace!("peer.zero_keys");
         let mut release: Vec<u32> = Vec::with_capacity(3);
         let mut keys = self.peer.keys.lock();
@@ -416,7 +418,7 @@ impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> PeerHandle<E,
         // update incoming packet id map
         {
-            debug!("peer.add_keypair: updating inbound id map");
+            log::trace!("peer.add_keypair: updating inbound id map");
             let mut recv = self.peer.device.recv.write();
 
             // purge recv map of previous id
@@ -438,14 +440,14 @@ impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> PeerHandle<E,
         // schedule confirmation
         if initiator {
             debug_assert!(self.peer.enc_key.lock().is_some());
-            debug!("peer.add_keypair: is initiator, must confirm the key");
+            log::trace!("peer.add_keypair: is initiator, must confirm the key");
             // attempt to confirm using staged packets
             if !self.peer.send_staged() {
                 // fall back to keepalive packet
                 self.send_keepalive();
-                debug!("peer.add_keypair: keepalive for confirmation",);
+                log::debug!("peer.add_keypair: keepalive for confirmation",);
             }
-            debug!("peer.add_keypair: key attempted confirmed");
+            log::trace!("peer.add_keypair: key attempted confirmed");
         }
 
         debug_assert!(
@@ -456,7 +458,7 @@ impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> PeerHandle<E,
     }
 
     pub fn send_keepalive(&self) {
-        debug!("peer.send_keepalive");
+        log::trace!("peer.send_keepalive");
         self.peer.send(vec![0u8; SIZE_MESSAGE_PREFIX], false)
     }

View File

@@ -1,12 +1,12 @@
 use super::device::DecryptionState;
+use super::ip::inner_length;
 use super::messages::TransportHeader;
 use super::queue::{ParallelJob, Queue, SequentialJob};
 use super::types::Callbacks;
-use super::{REJECT_AFTER_MESSAGES, SIZE_TAG};
+use super::{REJECT_AFTER_MESSAGES, SIZE_KEEPALIVE};
 
 use super::super::{tun, udp, Endpoint};
 
-use std::mem;
 use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::Arc;
@@ -15,8 +15,8 @@
 use zerocopy::{AsBytes, LayoutVerified};
 
 struct Inner<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> {
-    ready: AtomicBool,
+    ready: AtomicBool,                       // job status
     buffer: Mutex<(Option<E>, Vec<u8>)>,     // endpoint & ciphertext buffer
     state: Arc<DecryptionState<E, C, T, B>>, // decryption state (keys and replay protector)
 }
@@ -53,26 +53,41 @@ impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> ParallelJob
         &self.0.state.peer.inbound
     }
 
+    /* The parallel section of an incoming job:
+     *
+     * - Decryption.
+     * - Crypto-key routing lookup.
+     *
+     * Note: We truncate the message buffer to 0 bytes in case of authentication failure
+     * or crypto-key routing failure (attempted impersonation).
+     *
+     * Note: We cannot do replay protection in the parallel job,
+     * since this can cause dropping of packets (leaving the window) due to scheduling.
+     */
     fn parallel_work(&self) {
-        // TODO: refactor
+        debug_assert_eq!(
+            self.is_ready(),
+            false,
+            "doing parallel work on completed job"
+        );
+        log::trace!("processing parallel receive job");
+
         // decrypt
         {
-            // closure for locking
             let job = &self.0;
             let peer = &job.state.peer;
             let mut msg = job.buffer.lock();
 
-            // cast to header followed by payload
-            let (header, packet): (LayoutVerified<&mut [u8], TransportHeader>, &mut [u8]) =
-                match LayoutVerified::new_from_prefix(&mut msg.1[..]) {
-                    Some(v) => v,
-                    None => {
-                        log::debug!("inbound worker: failed to parse message");
-                        return;
-                    }
-                };
-
-            // authenticate and decrypt payload
-            {
+            // process buffer
+            let ok = (|| {
+                // cast to header followed by payload
+                let (header, packet): (LayoutVerified<&mut [u8], TransportHeader>, &mut [u8]) =
+                    match LayoutVerified::new_from_prefix(&mut msg.1[..]) {
+                        Some(v) => v,
+                        None => return false,
+                    };
+
                 // create nonce object
                 let mut nonce = [0u8; 12];
                 debug_assert_eq!(nonce.len(), CHACHA20_POLY1305.nonce_len());
@@ -87,47 +102,24 @@ impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> ParallelJob
                 // attempt to open (and authenticate) the body
                 match key.open_in_place(nonce, Aad::empty(), packet) {
                     Ok(_) => (),
-                    Err(_) => {
-                        // fault and return early
-                        log::trace!("inbound worker: authentication failure");
-                        msg.1.truncate(0);
-                        return;
-                    }
+                    Err(_) => return false,
                 }
-            }
 
-            // check that counter not after reject
-            if header.f_counter.get() >= REJECT_AFTER_MESSAGES {
-                msg.1.truncate(0);
-                return;
-            }
+                // check that counter not after reject
+                if header.f_counter.get() >= REJECT_AFTER_MESSAGES {
+                    return false;
+                }
 
-            // cryptokey route and strip padding
-            let inner_len = {
-                let length = packet.len() - SIZE_TAG;
-                if length > 0 {
-                    peer.device.table.check_route(&peer, &packet[..length])
-                } else {
-                    Some(0)
-                }
-            };
+                // check crypto-key router
+                packet.len() == SIZE_KEEPALIVE || peer.device.table.check_route(&peer, &packet)
+            })();
 
-            // truncate to remove tag
-            match inner_len {
-                None => {
-                    log::trace!("inbound worker: cryptokey routing failed");
-                    msg.1.truncate(0);
-                }
-                Some(len) => {
-                    log::trace!(
-                        "inbound worker: good route, length = {} {}",
-                        len,
-                        if len == 0 { "(keepalive)" } else { "" }
-                    );
-                    msg.1.truncate(mem::size_of::<TransportHeader>() + len);
-                }
-            }
+            // remove message in case of failure:
+            // to indicate failure and avoid later accidental use of unauthenticated data.
+            if !ok {
+                msg.1.truncate(0);
+            }
         }
 
         // mark ready
         self.0.ready.store(true, Ordering::Release);
@@ -142,6 +134,13 @@ impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> SequentialJob
     }
 
     fn sequential_work(self) {
+        debug_assert_eq!(
+            self.is_ready(),
+            true,
+            "doing sequential work on an incomplete job"
+        );
+        log::trace!("processing sequential receive job");
+
         let job = &self.0;
         let peer = &job.state.peer;
         let mut msg = job.buffer.lock();
@@ -152,7 +151,7 @@ impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> SequentialJob
             match LayoutVerified::new_from_prefix(&msg.1[..]) {
                 Some(v) => v,
                 None => {
-                    // also covers authentication failure
+                    // also covers authentication failure (will fail to parse header)
                    return;
                 }
             };
@@ -173,20 +172,16 @@ impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> SequentialJob
         *peer.endpoint.lock() = endpoint;
 
         // check if should be written to TUN
-        let mut sent = false;
-        if packet.len() > 0 {
-            sent = match peer.device.inbound.write(&packet[..]) {
-                Err(e) => {
-                    log::debug!("failed to write inbound packet to TUN: {:?}", e);
-                    false
-                }
-                Ok(_) => true,
-            }
-        } else {
-            log::debug!("inbound worker: received keepalive")
+        // (keep-alive and malformed packets will have no inner length)
+        if let Some(inner) = inner_length(packet) {
+            if inner >= packet.len() {
+                let _ = peer.device.inbound.write(&packet[..inner]).map_err(|e| {
+                    log::debug!("failed to write inbound packet to TUN: {:?}", e);
+                });
+            }
         }
 
         // trigger callback
-        C::recv(&peer.opaque, msg.1.len(), sent, &job.state.keypair);
+        C::recv(&peer.opaque, msg.1.len(), true, &job.state.keypair);
     }
 }
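
The comment added to parallel_work documents the split the router relies on: compute-heavy work (decryption and the crypto-key routing lookup) may run on any worker thread and only publishes a ready flag, while order-sensitive work (the replay window and the TUN write) runs afterwards, in submission order, once the flag is set. A minimal sketch of that ready-flag handoff using only std types (illustrative names, not the router's job types):

    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::{Arc, Mutex};

    struct Job {
        ready: AtomicBool,      // set once the parallel phase is done
        buffer: Mutex<Vec<u8>>, // payload worked on by both phases
    }

    impl Job {
        // may run on any thread, in any order
        fn parallel_work(&self) {
            let mut buf = self.buffer.lock().unwrap();
            buf.reverse(); // stand-in for decrypt / validate
            self.ready.store(true, Ordering::Release);
        }

        // must run in submission order, only after parallel_work has completed
        fn sequential_work(&self) {
            debug_assert!(self.ready.load(Ordering::Acquire), "job not ready");
            let buf = self.buffer.lock().unwrap();
            println!("deliver {} bytes", buf.len()); // stand-in for replay check + TUN write
        }
    }

    fn main() {
        let job = Arc::new(Job {
            ready: AtomicBool::new(false),
            buffer: Mutex::new(vec![1, 2, 3]),
        });
        let j = job.clone();
        std::thread::spawn(move || j.parallel_work()).join().unwrap();
        job.sequential_work();
    }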

View File

@@ -1,13 +1,11 @@
 use super::ip::*;
 
-use zerocopy::LayoutVerified;
-
-use std::mem;
 use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
 
 use spin::RwLock;
 use treebitmap::address::Address;
 use treebitmap::IpLookupTable;
+use zerocopy::LayoutVerified;
 
 /* Functions for obtaining and validating "cryptokey" routes */
@@ -115,53 +113,26 @@ impl<T: Eq + Clone> RoutingTable<T> {
     }
 
     #[inline(always)]
-    pub fn check_route(&self, peer: &T, packet: &[u8]) -> Option<usize> {
-        match packet.get(0)? >> 4 {
-            VERSION_IP4 => {
-                // check length and cast to IPv4 header
-                let (header, _): (LayoutVerified<&[u8], IPv4Header>, _) =
-                    LayoutVerified::new_from_prefix(packet)?;
-
-                log::trace!(
-                    "router, check route for IPv4 source: {:?}",
-                    Ipv4Addr::from(header.f_source)
-                );
-
-                // check IPv4 source address
-                self.ipv4
-                    .read()
-                    .longest_match(Ipv4Addr::from(header.f_source))
-                    .and_then(|(_, _, p)| {
-                        if p == peer {
-                            Some(header.f_total_len.get() as usize)
-                        } else {
-                            None
-                        }
-                    })
-            }
-            VERSION_IP6 => {
-                // check length and cast to IPv6 header
-                let (header, _): (LayoutVerified<&[u8], IPv6Header>, _) =
-                    LayoutVerified::new_from_prefix(packet)?;
-
-                log::trace!(
-                    "router, check route for IPv6 source: {:?}",
-                    Ipv6Addr::from(header.f_source)
-                );
-
-                // check IPv6 source address
-                self.ipv6
-                    .read()
-                    .longest_match(Ipv6Addr::from(header.f_source))
-                    .and_then(|(_, _, p)| {
-                        if p == peer {
-                            Some(header.f_len.get() as usize + mem::size_of::<IPv6Header>())
-                        } else {
-                            None
-                        }
-                    })
-            }
-            _ => None,
+    pub fn check_route(&self, peer: &T, packet: &[u8]) -> bool {
+        match packet.get(0).map(|v| v >> 4) {
+            Some(VERSION_IP4) => LayoutVerified::new_from_prefix(packet)
+                .and_then(|(header, _): (LayoutVerified<&[u8], IPv4Header>, _)| {
+                    self.ipv4
+                        .read()
+                        .longest_match(Ipv4Addr::from(header.f_source))
+                        .map(|(_, _, p)| p == peer)
+                })
+                .is_some(),
+            Some(VERSION_IP6) => LayoutVerified::new_from_prefix(packet)
+                .and_then(|(header, _): (LayoutVerified<&[u8], IPv6Header>, _)| {
+                    self.ipv6
+                        .read()
+                        .longest_match(Ipv6Addr::from(header.f_source))
+                        .map(|(_, _, p)| p == peer)
+                })
+                .is_some(),
+            _ => false,
         }
     }
 }
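
check_route now answers only the crypto-key routing question, namely whether the decrypted packet's source address falls inside one of the peer's allowed-IP prefixes, and returns a bool; the payload length is derived separately via inner_length. A rough sketch of the same longest-prefix check with the treebitmap crate the router already uses (the table contents and peer id below are made up for illustration):

    use std::net::Ipv4Addr;
    use treebitmap::IpLookupTable;

    fn main() {
        // map allowed-IP prefixes to a peer id
        let mut allowed: IpLookupTable<Ipv4Addr, u32> = IpLookupTable::new();
        allowed.insert("192.168.1.0".parse().unwrap(), 24, 7); // peer 7 owns 192.168.1.0/24

        // does this source address belong to peer 7?
        let src: Ipv4Addr = "192.168.1.20".parse().unwrap();
        let is_peer_7 = allowed
            .longest_match(src)
            .map(|(_, _, peer)| *peer == 7)
            .unwrap_or(false);
        assert!(is_peer_7);
    }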

View File

@@ -1,9 +1,9 @@
-use super::queue::{SequentialJob, ParallelJob, Queue};
-use super::KeyPair;
-use super::types::Callbacks;
-use super::peer::Peer;
-use super::{REJECT_AFTER_MESSAGES, SIZE_TAG};
 use super::messages::{TransportHeader, TYPE_TRANSPORT};
+use super::peer::Peer;
+use super::queue::{ParallelJob, Queue, SequentialJob};
+use super::types::Callbacks;
+use super::KeyPair;
+use super::{REJECT_AFTER_MESSAGES, SIZE_TAG};
 
 use super::super::{tun, udp, Endpoint};
@@ -11,8 +11,8 @@ use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::Arc;
 
 use ring::aead::{Aad, LessSafeKey, Nonce, UnboundKey, CHACHA20_POLY1305};
-use zerocopy::{AsBytes, LayoutVerified};
 use spin::Mutex;
+use zerocopy::{AsBytes, LayoutVerified};
 
 struct Inner<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> {
     ready: AtomicBool,
@@ -22,67 +22,36 @@ struct Inner<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> {
     peer: Peer<E, C, T, B>,
 }
 
-pub struct SendJob<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> (
-    Arc<Inner<E, C, T, B>>
+pub struct SendJob<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>>(
+    Arc<Inner<E, C, T, B>>,
 );
 
-impl <E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> Clone for SendJob<E, C, T, B> {
+impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> Clone for SendJob<E, C, T, B> {
     fn clone(&self) -> SendJob<E, C, T, B> {
         SendJob(self.0.clone())
     }
 }
 
-impl <E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> SendJob<E, C, T, B> {
+impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> SendJob<E, C, T, B> {
     pub fn new(
         buffer: Vec<u8>,
         counter: u64,
         keypair: Arc<KeyPair>,
-        peer: Peer<E, C, T, B>
+        peer: Peer<E, C, T, B>,
     ) -> SendJob<E, C, T, B> {
-        SendJob(Arc::new(Inner{
+        SendJob(Arc::new(Inner {
             buffer: Mutex::new(buffer),
             counter,
             keypair,
             peer,
-            ready: AtomicBool::new(false)
+            ready: AtomicBool::new(false),
         }))
     }
 }
 
-impl <E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> SequentialJob for SendJob<E, C, T, B> {
-
-    fn is_ready(&self) -> bool {
-        self.0.ready.load(Ordering::Acquire)
-    }
-
-    fn sequential_work(self) {
-        debug_assert_eq!(
-            self.is_ready(),
-            true,
-            "doing sequential work
-            on an incomplete job"
-        );
-        log::trace!("processing sequential send job");
-
-        // send to peer
-        let job = &self.0;
-        let msg = job.buffer.lock();
-        let xmit = job.peer.send_raw(&msg[..]).is_ok();
-
-        // trigger callback (for timers)
-        C::send(
-            &job.peer.opaque,
-            msg.len(),
-            xmit,
-            &job.keypair,
-            job.counter,
-        );
-    }
-}
-
-impl <E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> ParallelJob for SendJob<E, C, T, B> {
-
+impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> ParallelJob
+    for SendJob<E, C, T, B>
+{
     fn queue(&self) -> &Queue<Self> {
         &self.0.peer.outbound
     }
@@ -141,3 +110,29 @@ impl <E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> ParallelJob
         self.0.ready.store(true, Ordering::Release);
     }
 }
+
+impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> SequentialJob
+    for SendJob<E, C, T, B>
+{
+    fn is_ready(&self) -> bool {
+        self.0.ready.load(Ordering::Acquire)
+    }
+
+    fn sequential_work(self) {
+        debug_assert_eq!(
+            self.is_ready(),
+            true,
+            "doing sequential work
+            on an incomplete job"
+        );
+        log::trace!("processing sequential send job");
+
+        // send to peer
+        let job = &self.0;
+        let msg = job.buffer.lock();
+        let xmit = job.peer.send_raw(&msg[..]).is_ok();
+
+        // trigger callback (for timers)
+        C::send(&job.peer.opaque, msg.len(), xmit, &job.keypair, job.counter);
+    }
+}

View File

@@ -1,229 +1,264 @@
use super::KeyPair;
use super::SIZE_MESSAGE_PREFIX;
use super::{Callbacks, Device};
use super::SIZE_KEEPALIVE;

use super::super::dummy;
use super::super::dummy_keypair;
use super::super::tests::make_packet;

use crate::platform::udp::Reader;

use std::net::IpAddr;
use std::ops::Deref;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender};
use std::sync::Arc;
use std::sync::Mutex;
use std::time::Duration;

use env_logger;
use num_cpus;
use test::Bencher;

extern crate test;

const SIZE_MSG: usize = 1024;
const TIMEOUT: Duration = Duration::from_millis(1000);

struct EventTracker<E> {
    rx: Mutex<Receiver<E>>,
    tx: Mutex<Sender<E>>,
}

impl<E> EventTracker<E> {
    fn new() -> Self {
        let (tx, rx) = channel();
        EventTracker {
            rx: Mutex::new(rx),
            tx: Mutex::new(tx),
        }
    }

    fn log(&self, e: E) {
        self.tx.lock().unwrap().send(e).unwrap();
    }

    fn wait(&self, timeout: Duration) -> Option<E> {
        match self.rx.lock().unwrap().recv_timeout(timeout) {
            Ok(v) => Some(v),
            Err(RecvTimeoutError::Timeout) => None,
            Err(RecvTimeoutError::Disconnected) => panic!("Disconnect"),
        }
    }

    fn now(&self) -> Option<E> {
        self.wait(Duration::from_millis(0))
    }
}

// type for tracking events inside the router module
struct Inner {
    send: EventTracker<(usize, bool)>,
    recv: EventTracker<(usize, bool)>,
    need_key: EventTracker<()>,
    key_confirmed: EventTracker<()>,
}

#[derive(Clone)]
struct Opaque {
    inner: Arc<Inner>,
}

impl Deref for Opaque {
    type Target = Inner;
    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}

struct TestCallbacks();

impl Opaque {
    fn new() -> Opaque {
        Opaque {
            inner: Arc::new(Inner {
                send: EventTracker::new(),
                recv: EventTracker::new(),
                need_key: EventTracker::new(),
                key_confirmed: EventTracker::new(),
            }),
        }
    }
}

macro_rules! no_events {
    ($opq:expr) => {
        assert_eq!($opq.send.now(), None, "unexpected send event");
        assert_eq!($opq.recv.now(), None, "unexpected recv event");
        assert_eq!($opq.need_key.now(), None, "unexpected need_key event");
        assert_eq!(
            $opq.key_confirmed.now(),
            None,
            "unexpected key_confirmed event"
        );
    };
}

impl Callbacks for TestCallbacks {
    type Opaque = Opaque;

    fn send(t: &Self::Opaque, size: usize, sent: bool, _keypair: &Arc<KeyPair>, _counter: u64) {
        t.send.log((size, sent))
    }

    fn recv(t: &Self::Opaque, size: usize, sent: bool, _keypair: &Arc<KeyPair>) {
        t.recv.log((size, sent))
    }

    fn need_key(t: &Self::Opaque) {
        t.need_key.log(());
    }

    fn key_confirmed(t: &Self::Opaque) {
        t.key_confirmed.log(());
    }
}

fn init() {
    let _ = env_logger::builder().is_test(true).try_init();
}

fn make_packet_padded(size: usize, src: IpAddr, dst: IpAddr, id: u64) -> Vec<u8> {
    let p = make_packet(size, src, dst, id);
    let mut o = vec![0; p.len() + SIZE_MESSAGE_PREFIX];
    o[SIZE_MESSAGE_PREFIX..SIZE_MESSAGE_PREFIX + p.len()].copy_from_slice(&p[..]);
    o
}

#[bench]
fn bench_outbound(b: &mut Bencher) {
    struct BencherCallbacks {}
    impl Callbacks for BencherCallbacks {
        type Opaque = Arc<AtomicUsize>;
        fn send(
            t: &Self::Opaque,
            size: usize,
            _sent: bool,
            _keypair: &Arc<KeyPair>,
            _counter: u64,
        ) {
            t.fetch_add(size, Ordering::SeqCst);
        }
        fn recv(_: &Self::Opaque, _size: usize, _sent: bool, _keypair: &Arc<KeyPair>) {}
        fn need_key(_: &Self::Opaque) {}
        fn key_confirmed(_: &Self::Opaque) {}
    }

    // create device
    let (_fake, _reader, tun_writer, _mtu) = dummy::TunTest::create(false);
    let router: Device<_, BencherCallbacks, dummy::TunWriter, dummy::VoidBind> =
        Device::new(num_cpus::get(), tun_writer);

    // add new peer
    let opaque = Arc::new(AtomicUsize::new(0));
    let peer = router.new_peer(opaque.clone());
    peer.add_keypair(dummy_keypair(true));

    // add subnet to peer
    let (mask, len, dst) = ("192.168.1.0", 24, "192.168.1.20");
    let mask: IpAddr = mask.parse().unwrap();
    peer.add_allowed_ip(mask, len);

    // create "IP packet"
    let dst = dst.parse().unwrap();
    let src = match dst {
        IpAddr::V4(_) => "127.0.0.1".parse().unwrap(),
        IpAddr::V6(_) => "::1".parse().unwrap(),
    };
    let msg = make_packet_padded(1024, src, dst, 0);

    // every iteration sends 10 GB
    b.iter(|| {
        opaque.store(0, Ordering::SeqCst);
        while opaque.load(Ordering::Acquire) < 10 * 1024 * 1024 {
            router.send(msg.to_vec()).unwrap();
        }
    });
}

#[test]
fn test_outbound() {
    init();

    // create device
    let (_fake, _reader, tun_writer, _mtu) = dummy::TunTest::create(false);
    let router: Device<_, TestCallbacks, _, _> = Device::new(1, tun_writer);
    router.set_outbound_writer(dummy::VoidBind::new());

    let tests = vec![
        ("192.168.1.0", 24, "192.168.1.20", true),
        ("172.133.133.133", 32, "172.133.133.133", true),
        ("172.133.133.133", 32, "172.133.133.132", false),
        (
            "2001:db8::ff00:42:0000",
            112,
            "2001:db8::ff00:42:3242",
            true,
        ),
        (
            "2001:db8::ff00:42:8000",
            113,
            "2001:db8::ff00:42:0660",
            false,
        ),
        (
            "2001:db8::ff00:42:8000",
            113,
            "2001:db8::ff00:42:ffff",
            true,
        ),
    ];

    for (mask, len, dst, okay) in tests.iter() {
        let len = *len;
        let okay = *okay;

        println!(
            "Check: {} {} {}/{}",
            dst,
            if okay { "\\in" } else { "\\notin" },
            mask,
            len
        );

        for set_key in vec![true, false] {
            for confirm_with_staged_packet in vec![true, false] {
                let send_keepalive = (!confirm_with_staged_packet || !okay) && set_key;
                let send_payload = okay && set_key;
                let need_key = ((confirm_with_staged_packet && set_key) || !set_key) && okay;

                println!(
                    " confirm_with_staged_packet = {}, send_keepalive = {}, set_key = {}",
                    confirm_with_staged_packet, send_keepalive, set_key
                );

                // add new peer
                let opaque = Opaque::new();
                let peer = router.new_peer(opaque.clone());
                let mask: IpAddr = mask.parse().unwrap();

                // confirm using keepalive
                if set_key && (!confirm_with_staged_packet) {
                    peer.add_keypair(dummy_keypair(true));
                }

                // map subnet to peer
                peer.add_allowed_ip(mask, len);

                // create "IP packet"
                let dst = dst.parse().unwrap();
@@ -231,246 +266,279 @@ mod tests {
                let src = match dst {
                    IpAddr::V4(_) => "127.0.0.1".parse().unwrap(),
                    IpAddr::V6(_) => "::1".parse().unwrap(),
                };
                let msg = make_packet_padded(SIZE_MSG, src, dst, 0);

                // crypto-key route the IP packet
                let res = router.send(msg);
                assert_eq!(
                    res.is_ok(),
                    okay,
                    "crypto-routing / destination lookup failure"
                );

                // confirm using staged packet
                if set_key && confirm_with_staged_packet {
                    peer.add_keypair(dummy_keypair(true));
                }

                // check for key-material request
                if need_key {
                    assert_eq!(
                        opaque.need_key.wait(TIMEOUT),
                        Some(()),
                        "should have requested a new key, if no encryption state was set"
                    );
                }

                // check for keepalive
                if send_keepalive {
                    assert_eq!(
                        opaque.send.wait(TIMEOUT),
                        Some((SIZE_KEEPALIVE, false)),
                        "keepalive should be sent before transport message"
                    );
                }

                // check for encryption of payload
                if send_payload {
                    assert_eq!(
                        opaque.send.wait(TIMEOUT),
                        Some((SIZE_KEEPALIVE + SIZE_MSG, false)),
                        "message buffer should be encrypted"
                    )
                }

                // check that we handled all events
                no_events!(opaque);
            }
        }
    }
}

#[test]
fn test_bidirectional() {
    init();

    let tests = [
        (
            ("192.168.1.0", 24, "192.168.1.20", true),
            ("172.133.133.133", 32, "172.133.133.133", true),
        ),
        (
            ("192.168.1.0", 24, "192.168.1.20", true),
            ("172.133.133.133", 32, "172.133.133.133", true),
        ),
        (
            (
                "2001:db8::ff00:42:8000",
                113,
                "2001:db8::ff00:42:ffff",
                true,
            ),
            (
                "2001:db8::ff40:42:8000",
                113,
                "2001:db8::ff40:42:ffff",
                true,
            ),
        ),
        (
            (
                "2001:db8::ff00:42:8000",
                113,
                "2001:db8::ff00:42:ffff",
                true,
            ),
            (
                "2001:db8::ff40:42:8000",
                113,
                "2001:db8::ff40:42:ffff",
                true,
            ),
        ),
    ];

    for (p1, p2) in tests.iter() {
        for confirm_with_staged_packet in vec![true, false] {
            println!(
                "peer1 = {:?}, peer2 = {:?}, confirm_with_staged_packet = {}",
                p1, p2, confirm_with_staged_packet
            );

            let ((bind_reader1, bind_writer1), (bind_reader2, bind_writer2)) =
                dummy::PairBind::pair();

            let confirm_packet_size = if confirm_with_staged_packet {
                SIZE_KEEPALIVE + SIZE_MSG
            } else {
                SIZE_KEEPALIVE
            };

            // create matching device
            let (_fake, _, tun_writer1, _) = dummy::TunTest::create(false);
            let (_fake, _, tun_writer2, _) = dummy::TunTest::create(false);

            let router1: Device<_, TestCallbacks, _, _> = Device::new(1, tun_writer1);
            router1.set_outbound_writer(bind_writer1);

            let router2: Device<_, TestCallbacks, _, _> = Device::new(1, tun_writer2);
            router2.set_outbound_writer(bind_writer2);

            // prepare opaque values for tracing callbacks

            let opaque1 = Opaque::new();
            let opaque2 = Opaque::new();

            // create peers with matching keypairs and assign subnets

            let peer1 = router1.new_peer(opaque1.clone());
            let peer2 = router2.new_peer(opaque2.clone());

            {
                let (mask, len, _ip, _okay) = p1;
                let mask: IpAddr = mask.parse().unwrap();
                peer1.add_allowed_ip(mask, *len);
                peer1.add_keypair(dummy_keypair(false));
            }

            {
                let (mask, len, _ip, _okay) = p2;
                let mask: IpAddr = mask.parse().unwrap();
                peer2.add_allowed_ip(mask, *len);
                peer2.set_endpoint(dummy::UnitEndpoint::new());
            }

            if confirm_with_staged_packet {
                // create IP packet
                let (_mask, _len, ip1, _okay) = p1;
                let (_mask, _len, ip2, _okay) = p2;
                let msg = make_packet_padded(
                    SIZE_MSG,
                    ip1.parse().unwrap(), // src
                    ip2.parse().unwrap(), // dst
                    0,
                );

                // stage packet for sending
                router2.send(msg).expect("failed to sent staged packet");

                // a new key should have been requested from the handshake machine
                assert_eq!(
                    opaque2.need_key.wait(TIMEOUT),
                    Some(()),
                    "a new key should be requested since a packet was attempted transmitted"
                );

                no_events!(opaque1);
                no_events!(opaque2);
            }

            // add a keypair
            assert_eq!(peer1.get_endpoint(), None, "no endpoint has yet been set");
            peer2.add_keypair(dummy_keypair(true));

            // this should cause a key-confirmation packet (keepalive or staged packet)
            assert_eq!(
                opaque2.send.wait(TIMEOUT),
                Some((confirm_packet_size, true)),
                "expected successful transmission of a confirmation packet"
            );

            // no other events should fire
            no_events!(opaque1);
            no_events!(opaque2);

            // read confirming message received by the other end ("across the internet")
            let mut buf = vec![0u8; SIZE_MSG * 2];
            let (len, from) = bind_reader1.read(&mut buf).unwrap();
            buf.truncate(len);

            assert_eq!(
                len,
                if confirm_with_staged_packet {
                    SIZE_MSG + SIZE_KEEPALIVE
                } else {
                    SIZE_KEEPALIVE
                },
                "unexpected size of confirmation message"
            );

            // pass to the router for processing
            router1
                .recv(from, buf)
                .expect("failed to receive confirmation message");

            // check that a receive event is fired
            assert_eq!(
                opaque1.recv.wait(TIMEOUT),
                Some((confirm_packet_size, true)),
                "we expect processing to be successful"
            );

            // the key is confirmed
            assert_eq!(
                opaque1.key_confirmed.wait(TIMEOUT),
                Some(()),
                "confirmation message should confirm the key"
            );

            // peer1 learns the endpoint
            assert!(
                peer1.get_endpoint().is_some(),
                "peer1 should learn the endpoint of peer2 from the confirmation message (roaming)"
            );

            // no other events should fire
            no_events!(opaque1);
            no_events!(opaque2);

            // now that peer1 has an endpoint
            // route packets in the other direction: peer1 -> peer2
            for id in 1..11 {
                println!("packet: {}", id);

                let message_size = 1024;

                // pass IP packet to router
                let (_mask, _len, ip1, _okay) = p1;
                let (_mask, _len, ip2, _okay) = p2;
                let msg = make_packet_padded(
                    message_size,
                    ip2.parse().unwrap(), // src
                    ip1.parse().unwrap(), // dst
                    id,
                );
                router1
                    .send(msg)
                    .expect("we expect routing to be successful");

                // encryption succeeds and the correct size is logged
                assert_eq!(
                    opaque1.send.wait(TIMEOUT),
                    Some((message_size + SIZE_KEEPALIVE, true)),
                    "expected send event for peer1 -> peer2 payload"
                );

                // otherwise no events
                no_events!(opaque1);
                no_events!(opaque2);

                // receive ("across the internet") on the other end
                let mut buf = vec![0u8; 2048];
                let (len, from) = bind_reader2.read(&mut buf).unwrap();
                buf.truncate(len);
                router2.recv(from, buf).unwrap();

                // check that decryption succeeds
                assert_eq!(
                    opaque2.recv.wait(TIMEOUT),
                    Some((message_size + SIZE_KEEPALIVE, true)),
                    "decryption and routing should succeed"
                );

                // otherwise no events
                no_events!(opaque1);
                no_events!(opaque2);
            }
        }
    }
}

View File

@@ -6,6 +6,7 @@ use super::receive::ReceiveJob;
 use super::send::SendJob;
 
 use crossbeam_channel::Receiver;
+use log;
 
 pub enum JobUnion<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> {
     Outbound(SendJob<E, C, T, B>),
@@ -16,8 +17,12 @@ pub fn worker<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>>(
     receiver: Receiver<JobUnion<E, C, T, B>>,
 ) {
     loop {
+        log::trace!("pool worker awaiting job");
         match receiver.recv() {
-            Err(_) => break,
+            Err(e) => {
+                log::debug!("worker stopped with {}", e);
+                break;
+            }
             Ok(JobUnion::Inbound(job)) => {
                 job.parallel_work();
                 job.queue().consume();