Restructure and job stealing work queue
This commit is contained in:
@@ -14,7 +14,7 @@ fn main() {
|
|||||||
// choose optimal crypto implementations for platform
|
// choose optimal crypto implementations for platform
|
||||||
sodiumoxide::init().unwrap();
|
sodiumoxide::init().unwrap();
|
||||||
|
|
||||||
let mut rdev = router::Device::new(8);
|
let mut router = router::Device::new(8);
|
||||||
|
|
||||||
let pref = rdev.new_peer();
|
let peer = router.new_peer();
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -16,149 +16,37 @@ use spin;
|
|||||||
use super::super::constants::*;
|
use super::super::constants::*;
|
||||||
use super::super::types::KeyPair;
|
use super::super::types::KeyPair;
|
||||||
use super::anti_replay::AntiReplay;
|
use super::anti_replay::AntiReplay;
|
||||||
|
use super::peer;
|
||||||
|
use super::peer::{Peer, PeerInner};
|
||||||
|
use super::workers;
|
||||||
|
|
||||||
use std::u64;
|
pub struct DeviceInner {
|
||||||
|
pub stopped: AtomicBool,
|
||||||
const MAX_STAGED_PACKETS: usize = 128;
|
pub injector: Injector<()>, // parallel enc/dec task injector
|
||||||
|
pub threads: Vec<thread::JoinHandle<()>>, // join handles of worker threads
|
||||||
struct DeviceInner {
|
pub recv: spin::RwLock<HashMap<u32, DecryptionState>>, // receiver id -> decryption state
|
||||||
stopped: AtomicBool,
|
pub ipv4: spin::RwLock<IpLookupTable<Ipv4Addr, Weak<PeerInner>>>, // ipv4 cryptkey routing
|
||||||
injector: Injector<()>, // parallel enc/dec task injector
|
pub ipv6: spin::RwLock<IpLookupTable<Ipv6Addr, Weak<PeerInner>>>, // ipv6 cryptkey routing
|
||||||
threads: Vec<thread::JoinHandle<()>>, // join handles of worker threads
|
|
||||||
recv: spin::RwLock<HashMap<u32, DecryptionState>>, // receiver id -> decryption state
|
|
||||||
ipv4: spin::RwLock<IpLookupTable<Ipv4Addr, Weak<PeerInner>>>, // ipv4 cryptkey routing
|
|
||||||
ipv6: spin::RwLock<IpLookupTable<Ipv6Addr, Weak<PeerInner>>>, // ipv6 cryptkey routing
|
|
||||||
}
|
}
|
||||||
|
|
||||||
struct PeerInner {
|
pub struct EncryptionState {
|
||||||
stopped: AtomicBool,
|
pub key: [u8; 32], // encryption key
|
||||||
device: Arc<DeviceInner>,
|
pub id: u32, // sender id
|
||||||
thread_outbound: spin::Mutex<thread::JoinHandle<()>>,
|
pub nonce: u64, // next available nonce
|
||||||
thread_inbound: spin::Mutex<thread::JoinHandle<()>>,
|
pub death: Instant, // time when the key no longer can be used for encryption
|
||||||
inorder_outbound: SyncSender<()>,
|
// (birth + reject-after-time - keepalive-timeout - rekey-timeout)
|
||||||
inorder_inbound: SyncSender<()>,
|
|
||||||
staged_packets: spin::Mutex<ArrayDeque<[Vec<u8>; MAX_STAGED_PACKETS], Wrapping>>, // packets awaiting handshake
|
|
||||||
rx_bytes: AtomicU64, // received bytes
|
|
||||||
tx_bytes: AtomicU64, // transmitted bytes
|
|
||||||
keys: spin::Mutex<KeyWheel>, // key-wheel
|
|
||||||
ekey: spin::Mutex<Option<EncryptionState>>, // encryption state
|
|
||||||
endpoint: spin::Mutex<Option<Arc<SocketAddr>>>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
struct EncryptionState {
|
pub struct DecryptionState {
|
||||||
key: [u8; 32], // encryption key
|
pub key: [u8; 32],
|
||||||
id: u32, // sender id
|
pub keypair: Weak<KeyPair>,
|
||||||
nonce: u64, // next available nonce
|
pub protector: spin::Mutex<AntiReplay>,
|
||||||
death: Instant, // time when the key no longer can be used for encryption
|
pub peer: Weak<PeerInner>,
|
||||||
// (birth + reject-after-time - keepalive-timeout - rekey-timeout)
|
pub death: Instant, // time when the key can no longer be used for decryption
|
||||||
}
|
}
|
||||||
|
|
||||||
struct DecryptionState {
|
|
||||||
key: [u8; 32],
|
|
||||||
// keypair: Weak<KeyPair>,
|
|
||||||
protector: spin::Mutex<AntiReplay>,
|
|
||||||
peer: Weak<PeerInner>,
|
|
||||||
death: Instant, // time when the key can no longer be used for decryption
|
|
||||||
}
|
|
||||||
|
|
||||||
struct KeyWheel {
|
|
||||||
next: Option<Arc<KeyPair>>, // next key state (unconfirmed)
|
|
||||||
current: Option<Arc<KeyPair>>, // current key state (used for encryption)
|
|
||||||
previous: Option<Arc<KeyPair>>, // old key state (used for decryption)
|
|
||||||
retired: Option<u32>, // retired id (previous id, after confirming key-pair)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct Peer(Arc<PeerInner>);
|
|
||||||
pub struct Device(Arc<DeviceInner>);
|
pub struct Device(Arc<DeviceInner>);
|
||||||
|
|
||||||
fn treebit_list<A, R>(
|
|
||||||
peer: &Arc<PeerInner>,
|
|
||||||
table: &spin::RwLock<IpLookupTable<A, Weak<PeerInner>>>,
|
|
||||||
callback: Box<dyn Fn(A, u32) -> R>,
|
|
||||||
) -> Vec<R>
|
|
||||||
where
|
|
||||||
A: Address,
|
|
||||||
{
|
|
||||||
let mut res = Vec::new();
|
|
||||||
for subnet in table.read().iter() {
|
|
||||||
let (ip, masklen, p) = subnet;
|
|
||||||
if let Some(p) = p.upgrade() {
|
|
||||||
if Arc::ptr_eq(&p, &peer) {
|
|
||||||
res.push(callback(ip, masklen))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
res
|
|
||||||
}
|
|
||||||
|
|
||||||
fn treebit_remove<A>(peer: &Peer, table: &spin::RwLock<IpLookupTable<A, Weak<PeerInner>>>)
|
|
||||||
where
|
|
||||||
A: Address,
|
|
||||||
{
|
|
||||||
let mut m = table.write();
|
|
||||||
|
|
||||||
// collect keys for value
|
|
||||||
let mut subnets = vec![];
|
|
||||||
for subnet in m.iter() {
|
|
||||||
let (ip, masklen, p) = subnet;
|
|
||||||
if let Some(p) = p.upgrade() {
|
|
||||||
if Arc::ptr_eq(&p, &peer.0) {
|
|
||||||
subnets.push((ip, masklen))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// remove all key mappings
|
|
||||||
for subnet in subnets {
|
|
||||||
let r = m.remove(subnet.0, subnet.1);
|
|
||||||
debug_assert!(r.is_some());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Drop for Peer {
|
|
||||||
fn drop(&mut self) {
|
|
||||||
// mark peer as stopped
|
|
||||||
|
|
||||||
let peer = &self.0;
|
|
||||||
peer.stopped.store(true, Ordering::SeqCst);
|
|
||||||
|
|
||||||
// remove from cryptkey router
|
|
||||||
|
|
||||||
treebit_remove(self, &peer.device.ipv4);
|
|
||||||
treebit_remove(self, &peer.device.ipv6);
|
|
||||||
|
|
||||||
// unpark threads
|
|
||||||
|
|
||||||
peer.thread_inbound.lock().thread().unpark();
|
|
||||||
peer.thread_outbound.lock().thread().unpark();
|
|
||||||
|
|
||||||
// release ids from the receiver map
|
|
||||||
|
|
||||||
let mut keys = peer.keys.lock();
|
|
||||||
let mut release = Vec::with_capacity(3);
|
|
||||||
|
|
||||||
keys.next.as_ref().map(|k| release.push(k.recv.id));
|
|
||||||
keys.current.as_ref().map(|k| release.push(k.recv.id));
|
|
||||||
keys.previous.as_ref().map(|k| release.push(k.recv.id));
|
|
||||||
|
|
||||||
if release.len() > 0 {
|
|
||||||
let mut recv = peer.device.recv.write();
|
|
||||||
for id in &release {
|
|
||||||
recv.remove(id);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// null key-material (TODO: extend)
|
|
||||||
|
|
||||||
keys.next = None;
|
|
||||||
keys.current = None;
|
|
||||||
keys.previous = None;
|
|
||||||
|
|
||||||
*peer.ekey.lock() = None;
|
|
||||||
*peer.endpoint.lock() = None;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Drop for Device {
|
impl Drop for Device {
|
||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
// mark device as stopped
|
// mark device as stopped
|
||||||
@@ -175,163 +63,6 @@ impl Drop for Device {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl PeerInner {
|
|
||||||
pub fn keypair_confirm(&self, kp: Weak<KeyPair>) {
|
|
||||||
let mut keys = self.keys.lock();
|
|
||||||
|
|
||||||
// Attempt to upgrade Weak -> Arc
|
|
||||||
// (this should ensure that the key is in the key-wheel,
|
|
||||||
// which holds the only strong reference)
|
|
||||||
let kp = match kp.upgrade() {
|
|
||||||
Some(kp) => kp,
|
|
||||||
None => {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
debug_assert!(
|
|
||||||
keys.retired.is_none(),
|
|
||||||
"retired spot is not free for previous"
|
|
||||||
);
|
|
||||||
|
|
||||||
debug_assert!(
|
|
||||||
if let Some(key) = &keys.next {
|
|
||||||
Arc::ptr_eq(&kp, &key)
|
|
||||||
} else {
|
|
||||||
false
|
|
||||||
},
|
|
||||||
"if next has been overwritten, before confirmation, the key-pair should have been dropped!"
|
|
||||||
);
|
|
||||||
|
|
||||||
// enable use for encryption and set confirmed
|
|
||||||
*self.ekey.lock() = Some(EncryptionState {
|
|
||||||
id: kp.send.id,
|
|
||||||
key: kp.send.key,
|
|
||||||
nonce: 0,
|
|
||||||
death: kp.birth + REJECT_AFTER_TIME,
|
|
||||||
});
|
|
||||||
|
|
||||||
// rotate the key-wheel
|
|
||||||
let release = keys.previous.as_ref().map(|k| k.recv.id);
|
|
||||||
keys.previous = keys.current.as_ref().map(|v| v.clone());
|
|
||||||
keys.current = Some(kp.clone());
|
|
||||||
keys.retired = release;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Public interface and handle to the peer
|
|
||||||
impl Peer {
|
|
||||||
pub fn set_endpoint(&self, endpoint: SocketAddr) {
|
|
||||||
*self.0.endpoint.lock() = Some(Arc::new(endpoint))
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Add a new keypair
|
|
||||||
///
|
|
||||||
/// # Arguments
|
|
||||||
///
|
|
||||||
/// - new: The new confirmed/unconfirmed key pair
|
|
||||||
///
|
|
||||||
/// # Returns
|
|
||||||
///
|
|
||||||
/// A vector of ids which has been released.
|
|
||||||
/// These should be released in the handshake module.
|
|
||||||
pub fn add_keypair(&self, new: KeyPair) -> Vec<u32> {
|
|
||||||
let mut keys = self.0.keys.lock();
|
|
||||||
let mut release = Vec::with_capacity(2);
|
|
||||||
|
|
||||||
// collect ids to be released
|
|
||||||
keys.retired.map(|v| release.push(v));
|
|
||||||
keys.previous.as_ref().map(|k| release.push(k.recv.id));
|
|
||||||
|
|
||||||
// update key-wheel
|
|
||||||
if new.confirmed {
|
|
||||||
// start using key for encryption
|
|
||||||
*self.0.ekey.lock() = Some(EncryptionState {
|
|
||||||
id: new.send.id,
|
|
||||||
key: new.send.key,
|
|
||||||
nonce: 0,
|
|
||||||
death: new.birth + REJECT_AFTER_TIME,
|
|
||||||
});
|
|
||||||
|
|
||||||
// move current into previous
|
|
||||||
keys.previous = keys.current.as_ref().map(|v| v.clone());;
|
|
||||||
keys.current = Some(Arc::new(new));
|
|
||||||
} else {
|
|
||||||
// store the key and await confirmation
|
|
||||||
keys.previous = keys.next.as_ref().map(|v| v.clone());;
|
|
||||||
keys.next = Some(Arc::new(new));
|
|
||||||
};
|
|
||||||
|
|
||||||
// update incoming packet id map
|
|
||||||
{
|
|
||||||
let mut recv = self.0.device.recv.write();
|
|
||||||
|
|
||||||
// purge recv map of released ids
|
|
||||||
for id in &release {
|
|
||||||
recv.remove(&id);
|
|
||||||
}
|
|
||||||
|
|
||||||
// map new id to keypair
|
|
||||||
debug_assert!(!recv.contains_key(&new.recv.id));
|
|
||||||
|
|
||||||
recv.insert(
|
|
||||||
new.recv.id,
|
|
||||||
DecryptionState {
|
|
||||||
key: new.recv.key,
|
|
||||||
protector: spin::Mutex::new(AntiReplay::new()),
|
|
||||||
peer: Arc::downgrade(&self.0),
|
|
||||||
death: new.birth + REJECT_AFTER_TIME,
|
|
||||||
},
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
// return the released id (for handshake state machine)
|
|
||||||
release
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn rx_bytes(&self) -> u64 {
|
|
||||||
self.0.rx_bytes.load(Ordering::Relaxed)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn tx_bytes(&self) -> u64 {
|
|
||||||
self.0.tx_bytes.load(Ordering::Relaxed)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn add_subnet(&self, ip: IpAddr, masklen: u32) {
|
|
||||||
match ip {
|
|
||||||
IpAddr::V4(v4) => {
|
|
||||||
self.0
|
|
||||||
.device
|
|
||||||
.ipv4
|
|
||||||
.write()
|
|
||||||
.insert(v4, masklen, Arc::downgrade(&self.0))
|
|
||||||
}
|
|
||||||
IpAddr::V6(v6) => {
|
|
||||||
self.0
|
|
||||||
.device
|
|
||||||
.ipv6
|
|
||||||
.write()
|
|
||||||
.insert(v6, masklen, Arc::downgrade(&self.0))
|
|
||||||
}
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn list_subnets(&self) -> Vec<(IpAddr, u32)> {
|
|
||||||
let mut res = Vec::new();
|
|
||||||
res.append(&mut treebit_list(
|
|
||||||
&self.0,
|
|
||||||
&self.0.device.ipv4,
|
|
||||||
Box::new(|ip, masklen| (IpAddr::V4(ip), masklen)),
|
|
||||||
));
|
|
||||||
res.append(&mut treebit_list(
|
|
||||||
&self.0,
|
|
||||||
&self.0.device.ipv6,
|
|
||||||
Box::new(|ip, masklen| (IpAddr::V6(ip), masklen)),
|
|
||||||
));
|
|
||||||
res
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Device {
|
impl Device {
|
||||||
pub fn new(workers: usize) -> Device {
|
pub fn new(workers: usize) -> Device {
|
||||||
Device(Arc::new(DeviceInner {
|
Device(Arc::new(DeviceInner {
|
||||||
@@ -350,34 +81,7 @@ impl Device {
|
|||||||
///
|
///
|
||||||
/// A atomic ref. counted peer (with liftime matching the device)
|
/// A atomic ref. counted peer (with liftime matching the device)
|
||||||
pub fn new_peer(&self) -> Peer {
|
pub fn new_peer(&self) -> Peer {
|
||||||
// spawn inbound thread
|
peer::new_peer(self.0.clone())
|
||||||
let (send_inbound, recv_inbound) = sync_channel(1);
|
|
||||||
let handle_inbound = thread::spawn(move || {});
|
|
||||||
|
|
||||||
// spawn outbound thread
|
|
||||||
let (send_outbound, recv_inbound) = sync_channel(1);
|
|
||||||
let handle_outbound = thread::spawn(move || {});
|
|
||||||
|
|
||||||
// allocate peer object
|
|
||||||
Peer(Arc::new(PeerInner {
|
|
||||||
stopped: AtomicBool::new(false),
|
|
||||||
device: self.0.clone(),
|
|
||||||
ekey: spin::Mutex::new(None),
|
|
||||||
endpoint: spin::Mutex::new(None),
|
|
||||||
inorder_inbound: send_inbound,
|
|
||||||
inorder_outbound: send_outbound,
|
|
||||||
keys: spin::Mutex::new(KeyWheel {
|
|
||||||
next: None,
|
|
||||||
current: None,
|
|
||||||
previous: None,
|
|
||||||
retired: None,
|
|
||||||
}),
|
|
||||||
rx_bytes: AtomicU64::new(0),
|
|
||||||
tx_bytes: AtomicU64::new(0),
|
|
||||||
staged_packets: spin::Mutex::new(ArrayDeque::new()),
|
|
||||||
thread_inbound: spin::Mutex::new(handle_inbound),
|
|
||||||
thread_outbound: spin::Mutex::new(handle_outbound),
|
|
||||||
}))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Cryptkey routes and sends a plaintext message (IP packet)
|
/// Cryptkey routes and sends a plaintext message (IP packet)
|
||||||
@@ -396,39 +100,6 @@ impl Device {
|
|||||||
unimplemented!();
|
unimplemented!();
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Sends a message directly to the peer.
|
|
||||||
/// The router device takes care of discovering/managing the endpoint.
|
|
||||||
/// This is used for handshake initiation/response messages
|
|
||||||
///
|
|
||||||
/// # Arguments
|
|
||||||
///
|
|
||||||
/// - peer: Reference to the destination peer
|
|
||||||
/// - msg: Message to transmit
|
|
||||||
pub fn send_raw(&self, peer: Arc<Peer>, msg: &mut [u8]) {
|
|
||||||
unimplemented!();
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Flush the queue of buffered messages awaiting transmission
|
|
||||||
///
|
|
||||||
/// # Arguments
|
|
||||||
///
|
|
||||||
/// - peer: Reference for the peer to flush
|
|
||||||
pub fn flush_queue(&self, peer: Arc<Peer>) {
|
|
||||||
unimplemented!();
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Attempt to route, encrypt and send all elements buffered in the queue
|
|
||||||
///
|
|
||||||
/// # Arguments
|
|
||||||
///
|
|
||||||
/// # Returns
|
|
||||||
///
|
|
||||||
/// A boolean indicating whether packages where sent.
|
|
||||||
/// Note: This is used for implicit confirmation of handshakes.
|
|
||||||
pub fn send_run_queue(&self, peer: Arc<Peer>) -> bool {
|
|
||||||
unimplemented!();
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Receive an encrypted transport message
|
/// Receive an encrypted transport message
|
||||||
///
|
///
|
||||||
/// # Arguments
|
/// # Arguments
|
||||||
@@ -437,21 +108,4 @@ impl Device {
|
|||||||
pub fn recv(&self, ct_msg: &mut [u8]) {
|
pub fn recv(&self, ct_msg: &mut [u8]) {
|
||||||
unimplemented!();
|
unimplemented!();
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns the current endpoint known for the peer
|
|
||||||
///
|
|
||||||
/// # Arguments
|
|
||||||
///
|
|
||||||
/// - peer: The peer to retrieve the endpoint for
|
|
||||||
pub fn get_endpoint(&self, peer: Arc<Peer>) -> SocketAddr {
|
|
||||||
unimplemented!();
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn set_endpoint(&self, peer: Arc<Peer>, endpoint: SocketAddr) {
|
|
||||||
unimplemented!();
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn new_keypair(&self, peer: Arc<Peer>, keypair: KeyPair) {
|
|
||||||
unimplemented!();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -2,6 +2,8 @@ mod anti_replay;
|
|||||||
mod buffer;
|
mod buffer;
|
||||||
mod device;
|
mod device;
|
||||||
// mod inbound;
|
// mod inbound;
|
||||||
// mod outbound;
|
mod workers;
|
||||||
|
mod peer;
|
||||||
|
|
||||||
pub use device::{Device, Peer};
|
pub use peer::Peer;
|
||||||
|
pub use device::Device;
|
||||||
@@ -1,32 +0,0 @@
|
|||||||
use spin;
|
|
||||||
use std::thread;
|
|
||||||
use std::sync::Arc;
|
|
||||||
use std::sync::mpsc::{Receiver, sync_channel};
|
|
||||||
|
|
||||||
struct JobInner {
|
|
||||||
done : bool, // is encryption complete?
|
|
||||||
msg : Vec<u8>, // transport message (id, nonce already set)
|
|
||||||
key : [u8; 32], // encryption key
|
|
||||||
handle : thread::JoinHandle
|
|
||||||
}
|
|
||||||
|
|
||||||
type Job = Arc<spin::Mutex<JobInner>>;
|
|
||||||
|
|
||||||
fn worker_parallel()
|
|
||||||
|
|
||||||
fn worker_inorder(channel : Receiver<Job>) {
|
|
||||||
for ordered in channel.recv().iter() {
|
|
||||||
loop {
|
|
||||||
// check if job is complete
|
|
||||||
match ordered.try_lock() {
|
|
||||||
None => (),
|
|
||||||
Some(guard) => if guard.done {
|
|
||||||
// write to UDP interface
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// wait for job to complete
|
|
||||||
thread::park();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
285
src/router/peer.rs
Normal file
285
src/router/peer.rs
Normal file
@@ -0,0 +1,285 @@
|
|||||||
|
use std::sync::atomic::{AtomicU64, AtomicBool, Ordering};
|
||||||
|
use std::sync::{Weak, Arc};
|
||||||
|
use std::thread;
|
||||||
|
|
||||||
|
use std::net::{IpAddr, SocketAddr};
|
||||||
|
|
||||||
|
use std::sync::mpsc::{sync_channel, SyncSender};
|
||||||
|
|
||||||
|
use spin;
|
||||||
|
|
||||||
|
use arraydeque::{ArrayDeque, Wrapping};
|
||||||
|
|
||||||
|
use treebitmap::IpLookupTable;
|
||||||
|
use treebitmap::address::Address;
|
||||||
|
|
||||||
|
use super::super::types::KeyPair;
|
||||||
|
use super::super::constants::*;
|
||||||
|
|
||||||
|
use super::anti_replay::AntiReplay;
|
||||||
|
use super::device::DeviceInner;
|
||||||
|
use super::device::EncryptionState;
|
||||||
|
use super::device::DecryptionState;
|
||||||
|
|
||||||
|
const MAX_STAGED_PACKETS: usize = 128;
|
||||||
|
|
||||||
|
struct KeyWheel {
|
||||||
|
next: Option<Arc<KeyPair>>, // next key state (unconfirmed)
|
||||||
|
current: Option<Arc<KeyPair>>, // current key state (used for encryption)
|
||||||
|
previous: Option<Arc<KeyPair>>, // old key state (used for decryption)
|
||||||
|
retired: Option<u32>, // retired id (previous id, after confirming key-pair)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct PeerInner {
|
||||||
|
stopped: AtomicBool,
|
||||||
|
device: Arc<DeviceInner>,
|
||||||
|
thread_outbound: spin::Mutex<thread::JoinHandle<()>>,
|
||||||
|
thread_inbound: spin::Mutex<thread::JoinHandle<()>>,
|
||||||
|
inorder_outbound: SyncSender<()>,
|
||||||
|
inorder_inbound: SyncSender<()>,
|
||||||
|
staged_packets: spin::Mutex<ArrayDeque<[Vec<u8>; MAX_STAGED_PACKETS], Wrapping>>, // packets awaiting handshake
|
||||||
|
rx_bytes: AtomicU64, // received bytes
|
||||||
|
tx_bytes: AtomicU64, // transmitted bytes
|
||||||
|
keys: spin::Mutex<KeyWheel>, // key-wheel
|
||||||
|
ekey: spin::Mutex<Option<EncryptionState>>, // encryption state
|
||||||
|
endpoint: spin::Mutex<Option<Arc<SocketAddr>>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct Peer(Arc<PeerInner>);
|
||||||
|
|
||||||
|
fn treebit_list<A, R>(
|
||||||
|
peer: &Arc<PeerInner>,
|
||||||
|
table: &spin::RwLock<IpLookupTable<A, Weak<PeerInner>>>,
|
||||||
|
callback: Box<dyn Fn(A, u32) -> R>,
|
||||||
|
) -> Vec<R>
|
||||||
|
where
|
||||||
|
A: Address,
|
||||||
|
{
|
||||||
|
let mut res = Vec::new();
|
||||||
|
for subnet in table.read().iter() {
|
||||||
|
let (ip, masklen, p) = subnet;
|
||||||
|
if let Some(p) = p.upgrade() {
|
||||||
|
if Arc::ptr_eq(&p, &peer) {
|
||||||
|
res.push(callback(ip, masklen))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
res
|
||||||
|
}
|
||||||
|
|
||||||
|
fn treebit_remove<A>(peer: &Peer, table: &spin::RwLock<IpLookupTable<A, Weak<PeerInner>>>)
|
||||||
|
where
|
||||||
|
A: Address,
|
||||||
|
{
|
||||||
|
let mut m = table.write();
|
||||||
|
|
||||||
|
// collect keys for value
|
||||||
|
let mut subnets = vec![];
|
||||||
|
for subnet in m.iter() {
|
||||||
|
let (ip, masklen, p) = subnet;
|
||||||
|
if let Some(p) = p.upgrade() {
|
||||||
|
if Arc::ptr_eq(&p, &peer.0) {
|
||||||
|
subnets.push((ip, masklen))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// remove all key mappings
|
||||||
|
for subnet in subnets {
|
||||||
|
let r = m.remove(subnet.0, subnet.1);
|
||||||
|
debug_assert!(r.is_some());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Drop for Peer {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
// mark peer as stopped
|
||||||
|
|
||||||
|
let peer = &self.0;
|
||||||
|
peer.stopped.store(true, Ordering::SeqCst);
|
||||||
|
|
||||||
|
// remove from cryptkey router
|
||||||
|
|
||||||
|
treebit_remove(self, &peer.device.ipv4);
|
||||||
|
treebit_remove(self, &peer.device.ipv6);
|
||||||
|
|
||||||
|
// unpark threads
|
||||||
|
|
||||||
|
peer.thread_inbound.lock().thread().unpark();
|
||||||
|
peer.thread_outbound.lock().thread().unpark();
|
||||||
|
|
||||||
|
// release ids from the receiver map
|
||||||
|
|
||||||
|
let mut keys = peer.keys.lock();
|
||||||
|
let mut release = Vec::with_capacity(3);
|
||||||
|
|
||||||
|
keys.next.as_ref().map(|k| release.push(k.recv.id));
|
||||||
|
keys.current.as_ref().map(|k| release.push(k.recv.id));
|
||||||
|
keys.previous.as_ref().map(|k| release.push(k.recv.id));
|
||||||
|
|
||||||
|
if release.len() > 0 {
|
||||||
|
let mut recv = peer.device.recv.write();
|
||||||
|
for id in &release {
|
||||||
|
recv.remove(id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// null key-material (TODO: extend)
|
||||||
|
|
||||||
|
keys.next = None;
|
||||||
|
keys.current = None;
|
||||||
|
keys.previous = None;
|
||||||
|
|
||||||
|
*peer.ekey.lock() = None;
|
||||||
|
*peer.endpoint.lock() = None;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn new_peer(device: Arc<DeviceInner>) -> Peer {
|
||||||
|
// spawn inbound thread
|
||||||
|
let (send_inbound, recv_inbound) = sync_channel(1);
|
||||||
|
let handle_inbound = thread::spawn(move || {});
|
||||||
|
|
||||||
|
// spawn outbound thread
|
||||||
|
let (send_outbound, recv_inbound) = sync_channel(1);
|
||||||
|
let handle_outbound = thread::spawn(move || {});
|
||||||
|
|
||||||
|
// allocate peer object
|
||||||
|
Peer::new(PeerInner {
|
||||||
|
stopped: AtomicBool::new(false),
|
||||||
|
device: device,
|
||||||
|
ekey: spin::Mutex::new(None),
|
||||||
|
endpoint: spin::Mutex::new(None),
|
||||||
|
inorder_inbound: send_inbound,
|
||||||
|
inorder_outbound: send_outbound,
|
||||||
|
keys: spin::Mutex::new(KeyWheel {
|
||||||
|
next: None,
|
||||||
|
current: None,
|
||||||
|
previous: None,
|
||||||
|
retired: None,
|
||||||
|
}),
|
||||||
|
rx_bytes: AtomicU64::new(0),
|
||||||
|
tx_bytes: AtomicU64::new(0),
|
||||||
|
staged_packets: spin::Mutex::new(ArrayDeque::new()),
|
||||||
|
thread_inbound: spin::Mutex::new(handle_inbound),
|
||||||
|
thread_outbound: spin::Mutex::new(handle_outbound),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Peer {
|
||||||
|
fn new(inner : PeerInner) -> Peer {
|
||||||
|
Peer(Arc::new(inner))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn set_endpoint(&self, endpoint: SocketAddr) {
|
||||||
|
*self.0.endpoint.lock() = Some(Arc::new(endpoint))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Add a new keypair
|
||||||
|
///
|
||||||
|
/// # Arguments
|
||||||
|
///
|
||||||
|
/// - new: The new confirmed/unconfirmed key pair
|
||||||
|
///
|
||||||
|
/// # Returns
|
||||||
|
///
|
||||||
|
/// A vector of ids which has been released.
|
||||||
|
/// These should be released in the handshake module.
|
||||||
|
pub fn add_keypair(&self, new: KeyPair) -> Vec<u32> {
|
||||||
|
let mut keys = self.0.keys.lock();
|
||||||
|
let mut release = Vec::with_capacity(2);
|
||||||
|
let new = Arc::new(new);
|
||||||
|
|
||||||
|
// collect ids to be released
|
||||||
|
keys.retired.map(|v| release.push(v));
|
||||||
|
keys.previous.as_ref().map(|k| release.push(k.recv.id));
|
||||||
|
|
||||||
|
// update key-wheel
|
||||||
|
if new.confirmed {
|
||||||
|
// start using key for encryption
|
||||||
|
*self.0.ekey.lock() = Some(EncryptionState {
|
||||||
|
id: new.send.id,
|
||||||
|
key: new.send.key,
|
||||||
|
nonce: 0,
|
||||||
|
death: new.birth + REJECT_AFTER_TIME,
|
||||||
|
});
|
||||||
|
|
||||||
|
// move current into previous
|
||||||
|
keys.previous = keys.current.as_ref().map(|v| v.clone());;
|
||||||
|
keys.current = Some(new.clone());
|
||||||
|
} else {
|
||||||
|
// store the key and await confirmation
|
||||||
|
keys.previous = keys.next.as_ref().map(|v| v.clone());;
|
||||||
|
keys.next = Some(new.clone());
|
||||||
|
};
|
||||||
|
|
||||||
|
// update incoming packet id map
|
||||||
|
{
|
||||||
|
let mut recv = self.0.device.recv.write();
|
||||||
|
|
||||||
|
// purge recv map of released ids
|
||||||
|
for id in &release {
|
||||||
|
recv.remove(&id);
|
||||||
|
}
|
||||||
|
|
||||||
|
// map new id to keypair
|
||||||
|
debug_assert!(!recv.contains_key(&new.recv.id));
|
||||||
|
|
||||||
|
recv.insert(
|
||||||
|
new.recv.id,
|
||||||
|
DecryptionState {
|
||||||
|
keypair: Arc::downgrade(&new),
|
||||||
|
key: new.recv.key,
|
||||||
|
protector: spin::Mutex::new(AntiReplay::new()),
|
||||||
|
peer: Arc::downgrade(&self.0),
|
||||||
|
death: new.birth + REJECT_AFTER_TIME,
|
||||||
|
},
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
// return the released id (for handshake state machine)
|
||||||
|
release
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn rx_bytes(&self) -> u64 {
|
||||||
|
self.0.rx_bytes.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn tx_bytes(&self) -> u64 {
|
||||||
|
self.0.tx_bytes.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn add_subnet(&self, ip: IpAddr, masklen: u32) {
|
||||||
|
match ip {
|
||||||
|
IpAddr::V4(v4) => {
|
||||||
|
self.0
|
||||||
|
.device
|
||||||
|
.ipv4
|
||||||
|
.write()
|
||||||
|
.insert(v4, masklen, Arc::downgrade(&self.0))
|
||||||
|
}
|
||||||
|
IpAddr::V6(v6) => {
|
||||||
|
self.0
|
||||||
|
.device
|
||||||
|
.ipv6
|
||||||
|
.write()
|
||||||
|
.insert(v6, masklen, Arc::downgrade(&self.0))
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn list_subnets(&self) -> Vec<(IpAddr, u32)> {
|
||||||
|
let mut res = Vec::new();
|
||||||
|
res.append(&mut treebit_list(
|
||||||
|
&self.0,
|
||||||
|
&self.0.device.ipv4,
|
||||||
|
Box::new(|ip, masklen| (IpAddr::V4(ip), masklen)),
|
||||||
|
));
|
||||||
|
res.append(&mut treebit_list(
|
||||||
|
&self.0,
|
||||||
|
&self.0.device.ipv6,
|
||||||
|
Box::new(|ip, masklen| (IpAddr::V6(ip), masklen)),
|
||||||
|
));
|
||||||
|
res
|
||||||
|
}
|
||||||
|
}
|
||||||
153
src/router/workers.rs
Normal file
153
src/router/workers.rs
Normal file
@@ -0,0 +1,153 @@
|
|||||||
|
use super::device::DecryptionState;
|
||||||
|
use super::device::DeviceInner;
|
||||||
|
use super::peer::PeerInner;
|
||||||
|
|
||||||
|
use crossbeam_deque::{Injector, Steal, Stealer, Worker};
|
||||||
|
use spin;
|
||||||
|
use std::iter;
|
||||||
|
use std::sync::atomic::{AtomicBool, Ordering};
|
||||||
|
use std::sync::mpsc::{sync_channel, Receiver};
|
||||||
|
use std::sync::{Arc, Weak};
|
||||||
|
use std::thread;
|
||||||
|
|
||||||
|
#[derive(PartialEq)]
|
||||||
|
enum Operation {
|
||||||
|
Encryption,
|
||||||
|
Decryption,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(PartialEq)]
|
||||||
|
enum Status {
|
||||||
|
Fault, // unsealing failed
|
||||||
|
Done, // job valid and complete
|
||||||
|
Waiting, // job awaiting completion
|
||||||
|
}
|
||||||
|
|
||||||
|
struct JobInner {
|
||||||
|
msg: Vec<u8>, // message buffer (nonce and receiver id set)
|
||||||
|
key: [u8; 32], // chacha20poly1305 key
|
||||||
|
status: Status, // state of the job
|
||||||
|
op: Operation, // should be buffer be encrypted / decrypted?
|
||||||
|
}
|
||||||
|
|
||||||
|
type JobBuffer = Arc<spin::Mutex<JobInner>>;
|
||||||
|
type JobParallel = (Arc<thread::JoinHandle<()>>, JobBuffer);
|
||||||
|
type JobInbound = (Arc<DecryptionState>, JobBuffer);
|
||||||
|
type JobOutbound = (Weak<PeerInner>, JobBuffer);
|
||||||
|
|
||||||
|
/* Strategy for workers acquiring a new job:
|
||||||
|
*
|
||||||
|
* 1. Try the local job queue (owned by the thread)
|
||||||
|
* 2. Try fetching a batch of jobs from the global injector
|
||||||
|
* 3. Attempt to steal jobs from other threads.
|
||||||
|
*/
|
||||||
|
fn find_task<T>(local: &Worker<T>, global: &Injector<T>, stealers: &[Stealer<T>]) -> Option<T> {
|
||||||
|
local.pop().or_else(|| {
|
||||||
|
iter::repeat_with(|| {
|
||||||
|
global
|
||||||
|
.steal_batch_and_pop(local)
|
||||||
|
.or_else(|| stealers.iter().map(|s| s.steal()).collect())
|
||||||
|
})
|
||||||
|
.find(|s| !s.is_retry())
|
||||||
|
.and_then(|s| s.success())
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
fn worker_inbound(
|
||||||
|
device: Arc<DeviceInner>, // related device
|
||||||
|
peer: Arc<PeerInner>, // related peer
|
||||||
|
recv: Receiver<JobInbound>, // in order queue
|
||||||
|
) {
|
||||||
|
// reads from in order channel
|
||||||
|
for job in recv.recv().iter() {
|
||||||
|
loop {
|
||||||
|
let (state, buf) = job;
|
||||||
|
|
||||||
|
// check if job is complete
|
||||||
|
match buf.try_lock() {
|
||||||
|
None => (),
|
||||||
|
Some(buf) => {
|
||||||
|
if buf.status != Status::Waiting {
|
||||||
|
// check replay protector
|
||||||
|
|
||||||
|
// check if confirms keypair
|
||||||
|
|
||||||
|
// write to tun device
|
||||||
|
|
||||||
|
// continue to next job (no parking)
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// wait for job to complete
|
||||||
|
thread::park();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn worker_outbound(
|
||||||
|
device: Arc<DeviceInner>, // related device
|
||||||
|
peer: Arc<PeerInner>, // related peer
|
||||||
|
recv: Receiver<JobInbound>, // in order queue
|
||||||
|
) {
|
||||||
|
// reads from in order channel
|
||||||
|
for job in recv.recv().iter() {
|
||||||
|
loop {
|
||||||
|
let (peer, buf) = job;
|
||||||
|
|
||||||
|
// check if job is complete
|
||||||
|
match buf.try_lock() {
|
||||||
|
None => (),
|
||||||
|
Some(buf) => {
|
||||||
|
if buf.status != Status::Waiting {
|
||||||
|
// send buffer to peer endpoint
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// wait for job to complete
|
||||||
|
thread::park();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn worker_parallel(
|
||||||
|
stopped: Arc<AtomicBool>, // stop workers (device has been dropped)
|
||||||
|
parked: Arc<AtomicBool>, // thread has been parked?
|
||||||
|
local: Worker<JobParallel>, // local job queue (local to thread)
|
||||||
|
global: Injector<JobParallel>, // global job injector
|
||||||
|
stealers: Vec<Stealer<JobParallel>>, // stealers (from other threads)
|
||||||
|
) {
|
||||||
|
while !stopped.load(Ordering::SeqCst) {
|
||||||
|
match find_task(&local, &global, &stealers) {
|
||||||
|
Some(job) => {
|
||||||
|
let (handle, buf) = job;
|
||||||
|
|
||||||
|
// take ownership of the job buffer and complete it
|
||||||
|
{
|
||||||
|
let mut buf = buf.lock();
|
||||||
|
match buf.op {
|
||||||
|
Operation::Encryption => {
|
||||||
|
// TODO: encryption
|
||||||
|
buf.status = Status::Done;
|
||||||
|
}
|
||||||
|
Operation::Decryption => {
|
||||||
|
// TODO: decryption
|
||||||
|
buf.status = Status::Done;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ensure consumer is unparked
|
||||||
|
handle.thread().unpark();
|
||||||
|
}
|
||||||
|
None => {
|
||||||
|
// no jobs, park the worker
|
||||||
|
parked.store(true, Ordering::Release);
|
||||||
|
thread::park();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
6
src/types/endpoint.rs
Normal file
6
src/types/endpoint.rs
Normal file
@@ -0,0 +1,6 @@
|
|||||||
|
use std::net::SocketAddr;
|
||||||
|
|
||||||
|
/* The generic implementation (not supporting "sticky-sockets"),
|
||||||
|
* is to simply use SocketAddr directly as the endpoint.
|
||||||
|
*/
|
||||||
|
pub trait Endpoint: Into<SocketAddr> {}
|
||||||
@@ -1,7 +1,9 @@
|
|||||||
|
mod endpoint;
|
||||||
mod keys;
|
mod keys;
|
||||||
mod tun;
|
mod tun;
|
||||||
mod udp;
|
mod udp;
|
||||||
|
|
||||||
|
pub use endpoint::Endpoint;
|
||||||
pub use keys::{Key, KeyPair};
|
pub use keys::{Key, KeyPair};
|
||||||
pub use tun::Tun;
|
pub use tun::Tun;
|
||||||
pub use udp::Bind;
|
pub use udp::Bind;
|
||||||
@@ -1,9 +1,11 @@
|
|||||||
|
use super::Endpoint;
|
||||||
use std::error;
|
use std::error;
|
||||||
|
|
||||||
/* Often times an a file descriptor in an atomic might suffice.
|
/* Often times an a file descriptor in an atomic might suffice.
|
||||||
*/
|
*/
|
||||||
pub trait Bind<Endpoint>: Send + Sync {
|
pub trait Bind: Send + Sync {
|
||||||
type Error : error::Error;
|
type Error: error::Error;
|
||||||
|
type Endpoint: Endpoint;
|
||||||
|
|
||||||
fn new() -> Self;
|
fn new() -> Self;
|
||||||
|
|
||||||
@@ -20,7 +22,6 @@ pub trait Bind<Endpoint>: Send + Sync {
|
|||||||
|
|
||||||
/// Returns the current port of the bind
|
/// Returns the current port of the bind
|
||||||
fn get_port(&self) -> u16;
|
fn get_port(&self) -> u16;
|
||||||
|
fn recv(&self, dst: &mut [u8]) -> Self::Endpoint;
|
||||||
fn recv(&self, dst: &mut [u8]) -> Endpoint;
|
fn send(&self, src: &[u8], dst: &Self::Endpoint);
|
||||||
fn send(&self, src: &[u8], dst: &Endpoint);
|
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user