Layout work on router

This commit is contained in:
Mathias Hall-Andersen
2019-08-16 12:33:10 +02:00
parent 657a1ccb44
commit 726163b7f1
9 changed files with 249 additions and 106 deletions

7
Cargo.lock generated
View File

@@ -306,11 +306,6 @@ dependencies = [
"vcpkg 0.2.7 (registry+https://github.com/rust-lang/crates.io-index)", "vcpkg 0.2.7 (registry+https://github.com/rust-lang/crates.io-index)",
] ]
[[package]]
name = "lifeguard"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]] [[package]]
name = "lock_api" name = "lock_api"
version = "0.1.5" version = "0.1.5"
@@ -1037,7 +1032,6 @@ dependencies = [
"hex 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)", "hex 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)",
"hmac 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)", "hmac 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)",
"lazy_static 1.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "lazy_static 1.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
"lifeguard 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)",
"proptest 0.9.4 (registry+https://github.com/rust-lang/crates.io-index)", "proptest 0.9.4 (registry+https://github.com/rust-lang/crates.io-index)",
"rand 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)",
"sodiumoxide 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", "sodiumoxide 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -1136,7 +1130,6 @@ dependencies = [
"checksum libc 0.2.60 (registry+https://github.com/rust-lang/crates.io-index)" = "d44e80633f007889c7eff624b709ab43c92d708caad982295768a7b13ca3b5eb" "checksum libc 0.2.60 (registry+https://github.com/rust-lang/crates.io-index)" = "d44e80633f007889c7eff624b709ab43c92d708caad982295768a7b13ca3b5eb"
"checksum libflate 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)" = "90c6f86f4b0caa347206f916f8b687b51d77c6ef8ff18d52dd007491fd580529" "checksum libflate 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)" = "90c6f86f4b0caa347206f916f8b687b51d77c6ef8ff18d52dd007491fd580529"
"checksum libsodium-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "de29595a79ddae2612ad0f27793a0b86cdf05a12f94ad5b87674540cc568171e" "checksum libsodium-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "de29595a79ddae2612ad0f27793a0b86cdf05a12f94ad5b87674540cc568171e"
"checksum lifeguard 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)" = "eee00513d51f9a08737b74a286c761fc641114d1d5d6329beb11510049ec405f"
"checksum lock_api 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "62ebf1391f6acad60e5c8b43706dde4582df75c06698ab44511d15016bc2442c" "checksum lock_api 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "62ebf1391f6acad60e5c8b43706dde4582df75c06698ab44511d15016bc2442c"
"checksum log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)" = "14b6052be84e6b71ab17edffc2eeabf5c2c3ae1fdb464aae35ac50c67a44e1f7" "checksum log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)" = "14b6052be84e6b71ab17edffc2eeabf5c2c3ae1fdb464aae35ac50c67a44e1f7"
"checksum memoffset 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "ce6075db033bbbb7ee5a0bbd3a3186bbae616f57fb001c485c7ff77955f8177f" "checksum memoffset 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "ce6075db033bbbb7ee5a0bbd3a3186bbae616f57fb001c485c7ff77955f8177f"

View File

@@ -21,7 +21,6 @@ tokio = "0.1.22"
futures = "0.1.28" futures = "0.1.28"
arraydeque = "^0.4" arraydeque = "^0.4"
treebitmap = "^0.4" treebitmap = "^0.4"
lifeguard = "0.6.0"
[dependencies.x25519-dalek] [dependencies.x25519-dalek]
version = "^0.5" version = "^0.5"

View File

@@ -23,6 +23,8 @@ use super::types::*;
use crate::types::{Key, KeyPair}; use crate::types::{Key, KeyPair};
use std::time::Instant;
// HMAC hasher (generic construction) // HMAC hasher (generic construction)
type HMACBlake2s = Hmac<Blake2s>; type HMACBlake2s = Hmac<Blake2s>;
@@ -388,6 +390,7 @@ pub fn create_response<T: Copy, R: RngCore + CryptoRng>(
// return unconfirmed key-pair // return unconfirmed key-pair
Ok(KeyPair { Ok(KeyPair {
birth: Instant::now(),
confirmed: false, confirmed: false,
send: Key { send: Key {
id: sender, id: sender,
@@ -462,6 +465,7 @@ pub fn consume_response<T: Copy>(
Some(peer.identifier), // proves overship of the public key (e.g. for updating the endpoint) Some(peer.identifier), // proves overship of the public key (e.g. for updating the endpoint)
None, // no response message None, // no response message
Some(KeyPair { Some(KeyPair {
birth: Instant::now(),
confirmed: true, confirmed: true,
send: Key { send: Key {
id: sender, id: sender,

View File

@@ -100,12 +100,12 @@ impl AntiReplay {
/// ///
/// Ok(()) if sequence number is valid (not marked and not behind the moving window). /// Ok(()) if sequence number is valid (not marked and not behind the moving window).
/// Err if the sequence number is invalid (already marked or "too old"). /// Err if the sequence number is invalid (already marked or "too old").
pub fn update(&mut self, seq: u64) -> Result<(), ()> { pub fn update(&mut self, seq: u64) -> bool {
if self.check(seq) { if self.check(seq) {
self.update_store(seq); self.update_store(seq);
Ok(()) true
} else { } else {
Err(()) false
} }
} }
} }
@@ -119,35 +119,36 @@ mod tests {
let mut ar = AntiReplay::new(); let mut ar = AntiReplay::new();
for i in 0..20000 { for i in 0..20000 {
ar.update(i).unwrap(); assert!(ar.update(i));
} }
for i in (0..20000).rev() { for i in (0..20000).rev() {
assert!(!ar.check(i)); assert!(!ar.check(i));
} }
ar.update(65536).unwrap(); assert!(ar.update(65536));
for i in (65536 - WINDOW_SIZE)..65535 { for i in (65536 - WINDOW_SIZE)..65535 {
ar.update(i).unwrap(); assert!(ar.update(i));
} }
for i in (65536 - 10 * WINDOW_SIZE)..65535 { for i in (65536 - 10 * WINDOW_SIZE)..65535 {
assert!(!ar.check(i)); assert!(!ar.check(i));
} }
ar.update(66000).unwrap(); assert!(ar.update(66000));
for i in 65537..66000 { for i in 65537..66000 {
ar.update(i).unwrap(); assert!(ar.update(i));
} }
for i in 65537..66000 { for i in 65537..66000 {
assert!(ar.update(i).is_err()); assert_eq!(ar.update(i), false);
} }
// Test max u64. // Test max u64.
let next = u64::max_value(); let next = u64::max_value();
ar.update(next).unwrap(); assert!(ar.update(next));
assert!(!ar.check(next)); assert!(!ar.check(next));
for i in (next - WINDOW_SIZE)..next { for i in (next - WINDOW_SIZE)..next {
ar.update(i).unwrap(); assert!(ar.update(i));
} }
for i in (next - 20 * WINDOW_SIZE)..next { for i in (next - 20 * WINDOW_SIZE)..next {
assert!(!ar.check(i)); assert!(!ar.check(i));

View File

@@ -1,14 +1,17 @@
use arraydeque::{ArrayDeque, Saturating, Wrapping}; use arraydeque::{ArrayDeque, Saturating, Wrapping};
use lifeguard::{Pool, Recycled};
use treebitmap::IpLookupTable; use treebitmap::IpLookupTable;
use std::collections::HashMap; use std::collections::HashMap;
use std::error::Error;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
use std::sync::atomic::{AtomicPtr, AtomicU64, Ordering}; use std::ptr;
use std::sync::atomic::{AtomicBool, AtomicPtr, AtomicU64, Ordering};
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::sync::{Arc, Mutex, Weak}; use std::sync::{Arc, Mutex, Weak};
use std::thread;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use spin::RwLock; use spin;
use super::super::types::KeyPair; use super::super::types::KeyPair;
use super::anti_replay::AntiReplay; use super::anti_replay::AntiReplay;
@@ -18,143 +21,197 @@ use std::u64;
const REJECT_AFTER_MESSAGES: u64 = u64::MAX - (1 << 4); const REJECT_AFTER_MESSAGES: u64 = u64::MAX - (1 << 4);
const MAX_STAGED_PACKETS: usize = 128; const MAX_STAGED_PACKETS: usize = 128;
pub struct Device<'a> { pub struct Device {
recv: RwLock<HashMap<u32, Arc<Peer<'a>>>>, // map receiver id -> peer recv: spin::RwLock<HashMap<u32, DecryptionState>>,
ipv4: IpLookupTable<Ipv4Addr, Arc<Peer<'a>>>, // ipv4 trie ipv4: IpLookupTable<Ipv4Addr, Weak<PeerInner>>,
ipv6: IpLookupTable<Ipv6Addr, Arc<Peer<'a>>>, // ipv6 trie ipv6: IpLookupTable<Ipv6Addr, Weak<PeerInner>>,
pool: Pool<Vec<u8>>, // message buffer pool
} }
struct KeyState(KeyPair, AntiReplay);
struct EncryptionState { struct EncryptionState {
key: [u8; 32], // encryption key key: [u8; 32], // encryption key
id: u64, // sender id id: u32, // sender id
nonce: AtomicU64, // next available nonce nonce: u64, // next available nonce
death: Instant, // can must the key no longer be used: death: Instant, // time when the key no longer can be used for encryption
// (birth + reject-after-time - keepalive-timeout - rekey-timeout) // (birth + reject-after-time - keepalive-timeout - rekey-timeout)
}
struct DecryptionState {
key: [u8; 32],
protector: Arc<spin::Mutex<AntiReplay>>,
peer: Weak<PeerInner>,
death: Instant, // time when the key can no longer be used for decryption
} }
struct KeyWheel { struct KeyWheel {
next: AtomicPtr<Arc<Option<KeyState>>>, // next key state (unconfirmed) next: Option<KeyPair>, // next key state (unconfirmed)
current: AtomicPtr<Arc<Option<KeyState>>>, // current key state (used for encryption) current: Option<KeyPair>, // current key state (used for encryption)
previous: AtomicPtr<Arc<Option<KeyState>>>, // old key state (used for decryption) previous: Option<KeyPair>, // old key state (used for decryption)
} }
pub struct Peer<'a> { struct PeerInner {
inorder: Mutex<ArrayDeque<[Option<Recycled<'a, Vec<u8>>>; MAX_STAGED_PACKETS], Saturating>>, // inorder queue inorder_outbound: SyncSender<()>,
inorder_inbound: SyncSender<()>,
staged_packets: Mutex<ArrayDeque<[Vec<u8>; MAX_STAGED_PACKETS], Wrapping>>, // packets awaiting handshake staged_packets: Mutex<ArrayDeque<[Vec<u8>; MAX_STAGED_PACKETS], Wrapping>>, // packets awaiting handshake
rx_bytes: AtomicU64, // received bytes rx_bytes: AtomicU64, // received bytes
tx_bytes: AtomicU64, // transmitted bytes tx_bytes: AtomicU64, // transmitted bytes
keys: KeyWheel, // key-wheel keys: spin::Mutex<KeyWheel>, // key-wheel
ekey: AtomicPtr<Arc<EncryptionState>>, // encryption state ekey: spin::Mutex<EncryptionState>, // encryption state
endpoint: AtomicPtr<Arc<Option<SocketAddr>>>, endpoint: spin::Mutex<Option<Arc<SocketAddr>>>,
} }
impl<'a> Peer<'a> { pub struct Peer(Arc<PeerInner>);
pub fn set_endpoint(&self, endpoint: SocketAddr) {
self.endpoint impl Drop for Peer {
.store(&mut Arc::new(Some(endpoint)), Ordering::Relaxed) fn drop(&mut self) {
// stop threads and remove peer from device
}
}
impl Drop for Device {
fn drop(&mut self) {
// stop threads
}
}
impl Peer {
fn set_endpoint(&self, endpoint: SocketAddr) {
*self.0.endpoint.lock() = Some(Arc::new(endpoint))
} }
pub fn add_keypair(&self, keypair: KeyPair) { pub fn keypair_confirm(&self, ks: Arc<KeyPair>) {
let confirmed = keypair.confirmed; *self.0.ekey.lock() = EncryptionState {
let mut st_new = Arc::new(Some(KeyState(keypair, AntiReplay::new()))); id: ks.send.id,
let st_previous = self.keys.previous.load(Ordering::Relaxed); key: ks.send.key,
if confirmed { nonce: 0,
// previous <- current death: ks.birth + Duration::from_millis(1337), // todo
self.keys.previous.compare_and_swap( };
st_previous, }
self.keys.current.load(Ordering::Relaxed),
Ordering::Relaxed,
);
// current <- new pub fn keypair_add(&self, new: KeyPair) -> Option<u32> {
self.keys.next.store(&mut st_new, Ordering::Relaxed) let mut keys = self.0.keys.lock();
let release = keys.previous.map(|k| k.recv.id);
// update key-wheel
if new.confirmed {
// start using key for encryption
*self.0.ekey.lock() = EncryptionState {
id: new.send.id,
key: new.send.key,
nonce: 0,
death: new.birth + Duration::from_millis(1337), // todo
};
// move current into previous
keys.previous = keys.current;
keys.current = Some(new);
} else { } else {
// previous <- next // store the key and await confirmation
self.keys.previous.compare_and_swap( keys.previous = keys.next;
st_previous, keys.next = Some(new);
self.keys.next.load(Ordering::Relaxed), };
Ordering::Relaxed,
);
// next <- new // return the released id (for handshake state machine)
self.keys.next.store(&mut st_new, Ordering::Relaxed) release
}
} }
pub fn rx_bytes(&self) -> u64 { pub fn rx_bytes(&self) -> u64 {
self.rx_bytes.load(Ordering::Relaxed) self.0.rx_bytes.load(Ordering::Relaxed)
} }
pub fn tx_bytes(&self) -> u64 { pub fn tx_bytes(&self) -> u64 {
self.tx_bytes.load(Ordering::Relaxed) self.0.tx_bytes.load(Ordering::Relaxed)
} }
} }
impl<'a> Device<'a> { impl Device {
pub fn new() -> Device<'a> { pub fn new() -> Device {
Device { Device {
recv: RwLock::new(HashMap::new()), recv: spin::RwLock::new(HashMap::new()),
ipv4: IpLookupTable::new(), ipv4: IpLookupTable::new(),
ipv6: IpLookupTable::new(), ipv6: IpLookupTable::new(),
pool: Pool::with_size_and_max(0, MAX_STAGED_PACKETS * 2),
} }
} }
pub fn subnets(&self, peer: Arc<Peer<'a>>) -> Vec<(IpAddr, u32)> { pub fn release(&self, id: u32) {
debug_assert!(
if let Some(_) = self.recv.read().get(&id) {
true
} else {
false
},
true
);
self.recv.write().remove(&id);
}
pub fn add_subnet(&mut self, ip: IpAddr, masklen: u32, peer: Peer) {
match ip {
IpAddr::V4(v4) => self.ipv4.insert(v4, masklen, Arc::downgrade(&peer.0)),
IpAddr::V6(v6) => self.ipv6.insert(v6, masklen, Arc::downgrade(&peer.0)),
};
}
pub fn subnets(&self, peer: Peer) -> Vec<(IpAddr, u32)> {
let mut subnets = Vec::new(); let mut subnets = Vec::new();
// extract ipv4 entries // extract ipv4 entries
for subnet in self.ipv4.iter() { for subnet in self.ipv4.iter() {
let (ip, masklen, p) = subnet; let (ip, masklen, p) = subnet;
if Arc::ptr_eq(&peer, p) { if let Some(p) = p.upgrade() {
subnets.push((IpAddr::V4(ip), masklen)) if Arc::ptr_eq(&p, &peer.0) {
subnets.push((IpAddr::V4(ip), masklen))
}
} }
} }
// extract ipv6 entries // extract ipv6 entries
for subnet in self.ipv6.iter() { for subnet in self.ipv6.iter() {
let (ip, masklen, p) = subnet; let (ip, masklen, p) = subnet;
if Arc::ptr_eq(&peer, p) { if let Some(p) = p.upgrade() {
subnets.push((IpAddr::V6(ip), masklen)) if Arc::ptr_eq(&p, &peer.0) {
subnets.push((IpAddr::V6(ip), masklen))
}
} }
} }
subnets subnets
} }
pub fn keypair_add(&self, peer: Peer, new: KeyPair) -> Option<u32> {
// update key-wheel of peer
let release = peer.keypair_add(new);
// update incoming packet id map
let mut recv = self.recv.write();
// release id of previous keypair
if let Some(id) = release {
debug_assert!(recv.contains_key(&id));
recv.remove(&id);
};
// map new id to keypair
debug_assert!(!recv.contains_key(&new.recv.id));
recv.insert(
new.recv.id,
DecryptionState {
key: new.recv.key,
protector: Arc::new(spin::Mutex::new(AntiReplay::new())),
peer: Arc::downgrade(&peer.0),
death: new.birth + Duration::from_millis(2600), // todo
},
);
release
}
/// Adds a new peer to the device /// Adds a new peer to the device
/// ///
/// # Returns /// # Returns
/// ///
/// A atomic ref. counted peer (with liftime matching the device) /// A atomic ref. counted peer (with liftime matching the device)
pub fn add(&mut self) -> Arc<Peer<'a>> { pub fn add(&mut self) -> () {}
Arc::new(Peer {
inorder: Mutex::new(ArrayDeque::new()),
staged_packets: Mutex::new(ArrayDeque::new()),
rx_bytes: AtomicU64::new(0),
tx_bytes: AtomicU64::new(0),
keys: KeyWheel {
next: AtomicPtr::new(&mut Arc::new(None)),
current: AtomicPtr::new(&mut Arc::new(None)),
previous: AtomicPtr::new(&mut Arc::new(None)),
},
// long expired encryption key
ekey: AtomicPtr::new(&mut Arc::new(EncryptionState {
key: [0u8; 32],
id: 0,
nonce: AtomicU64::new(REJECT_AFTER_MESSAGES),
death: Instant::now() - Duration::from_secs(31536000),
})),
endpoint: AtomicPtr::new(&mut Arc::new(None)),
})
}
pub fn get_buffer(&self) -> Recycled<Vec<u8>> {
self.pool.new()
}
/// Cryptkey routes and sends a plaintext message (IP packet) /// Cryptkey routes and sends a plaintext message (IP packet)
/// ///

52
src/router/inbound.rs Normal file
View File

@@ -0,0 +1,52 @@
use std::thread;
use spin;
use lifeguard::Recycled;
use super::anti_replay::AntiReplay;
use std::sync::mpsc::{Receiver, sync_channel};
use std::sync::Arc;
struct ParallelJobInner {
done : bool,
msg : Vec<u8>,
key : [u8; 32]
}
type ParallelJob = spin::Mutex<ParallelJobInner>;
struct InboundInorder {
job : Arc<ParallelJob>,
state : Arc<KeyState>,
}
struct Inorder<'a> (Arc<spin::Mutex<Option<Job<'a>>>>);
struct Job<'a> {
msg : Recycled<'a, Vec<u8>>,
arp : Arc<KeyState>, // replay protector and key-pair
key : Option<(Arc<Peer>, Arc<KeyPair>)> // provided if the key has not been confirmed
}
fn worker_inorder<'a>(channel : Receiver<Inorder<'a>>) {
let mut current = 0;
// reads from inorder channel
for ordered in channel.recv().iter() {
loop {
// check if job is complete
match ordered.0.try_lock() {
None => (),
Some(guard) => if let Some(job) = *guard {
if job.arp.lock().update(6) {
// write to output
break;
}
}
}
// wait for job to complete
thread::park();
}
}
}

View File

@@ -1,5 +1,7 @@
mod anti_replay; mod anti_replay;
mod buffer; mod buffer;
mod device; mod device;
// mod inbound;
// mod outbound;
pub use device::Device; pub use device::Device;

32
src/router/outbound.rs Normal file
View File

@@ -0,0 +1,32 @@
use spin;
use std::thread;
use std::sync::Arc;
use std::sync::mpsc::{Receiver, sync_channel};
struct JobInner {
done : bool, // is encryption complete?
msg : Vec<u8>, // transport message (id, nonce already set)
key : [u8; 32], // encryption key
handle : thread::JoinHandle
}
type Job = Arc<spin::Mutex<JobInner>>;
fn worker_parallel()
fn worker_inorder(channel : Receiver<Job>) {
for ordered in channel.recv().iter() {
loop {
// check if job is complete
match ordered.try_lock() {
None => (),
Some(guard) => if guard.done {
// write to UDP interface
}
}
// wait for job to complete
thread::park();
}
}
}

View File

@@ -1,8 +1,10 @@
use std::time::Instant;
/* This file holds types passed between components. /* This file holds types passed between components.
* Whenever a type cannot be held local to a single module. * Whenever a type cannot be held local to a single module.
*/ */
#[derive(Debug)] #[derive(Debug, Clone, Copy)]
pub struct Key { pub struct Key {
pub key: [u8; 32], pub key: [u8; 32],
pub id: u32, pub id: u32,
@@ -15,8 +17,9 @@ impl PartialEq for Key {
} }
} }
#[derive(Debug)] #[derive(Debug, Clone, Copy)]
pub struct KeyPair { pub struct KeyPair {
pub birth: Instant, // when was the key-pair created
pub confirmed: bool, // has the key-pair been confirmed? pub confirmed: bool, // has the key-pair been confirmed?
pub send: Key, // key for outbound messages pub send: Key, // key for outbound messages
pub recv: Key, // key for inbound messages pub recv: Key, // key for inbound messages