Begin drafting cross-platform interface

This commit is contained in:
Mathias Hall-Andersen
2019-08-16 22:00:48 +02:00
parent 726163b7f1
commit 5aeea9b619
8 changed files with 77 additions and 42 deletions

1
Cargo.lock generated
View File

@@ -1026,6 +1026,7 @@ dependencies = [
"arraydeque 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)", "arraydeque 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)",
"blake2 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)", "blake2 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)",
"byteorder 1.3.1 (registry+https://github.com/rust-lang/crates.io-index)", "byteorder 1.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
"crossbeam-deque 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)",
"digest 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)", "digest 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.1.28 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.28 (registry+https://github.com/rust-lang/crates.io-index)",
"generic-array 0.12.3 (registry+https://github.com/rust-lang/crates.io-index)", "generic-array 0.12.3 (registry+https://github.com/rust-lang/crates.io-index)",

View File

@@ -21,6 +21,7 @@ tokio = "0.1.22"
futures = "0.1.28" futures = "0.1.28"
arraydeque = "^0.4" arraydeque = "^0.4"
treebitmap = "^0.4" treebitmap = "^0.4"
crossbeam-deque = "0.7"
[dependencies.x25519-dalek] [dependencies.x25519-dalek]
version = "^0.5" version = "^0.5"

View File

@@ -1,6 +1,7 @@
#![feature(test)] #![feature(test)]
mod handshake; mod handshake;
mod platform;
mod router; mod router;
mod types; mod types;
@@ -13,7 +14,7 @@ fn main() {
// choose optimal crypto implementations for platform // choose optimal crypto implementations for platform
sodiumoxide::init().unwrap(); sodiumoxide::init().unwrap();
let mut rdev = router::Device::new(); let mut rdev = router::Device::new(8);
let pref = rdev.add(); let pref = rdev.add();
} }

2
src/platform/mod.rs Normal file
View File

@@ -0,0 +1,2 @@
mod tun;
mod udp;

10
src/platform/tun.rs Normal file
View File

@@ -0,0 +1,10 @@
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
pub trait Tun: Send + Sync {
type Error;
fn new(mtu: Arc<AtomicUsize>) -> Self;
fn read(&self, dst: &mut [u8]) -> Result<usize, Self::Error>;
fn write(&self, src: &[u8]) -> Result<(), Self::Error>;
}

11
src/platform/udp.rs Normal file
View File

@@ -0,0 +1,11 @@
/* Often times an a file descriptor in an atomic might suffice.
*/
pub trait Bind<Endpoint>: Send + Sync {
type Error;
fn new() -> Self;
fn set_port(&self, port: u16) -> Result<(), Self::Error>;
fn get_port(&self) -> u16;
fn recv(&self, dst: &mut [u8]) -> Endpoint;
fn send(&self, src: &[u8], dst: &Endpoint);
}

View File

@@ -1,11 +1,11 @@
use arraydeque::{ArrayDeque, Saturating, Wrapping}; use arraydeque::{ArrayDeque, Wrapping};
use treebitmap::IpLookupTable; use treebitmap::IpLookupTable;
use crossbeam_deque::{Injector, Steal};
use std::collections::HashMap; use std::collections::HashMap;
use std::error::Error; use std::mem;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
use std::ptr; use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::atomic::{AtomicBool, AtomicPtr, AtomicU64, Ordering};
use std::sync::mpsc::{sync_channel, Receiver, SyncSender}; use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::sync::{Arc, Mutex, Weak}; use std::sync::{Arc, Mutex, Weak};
use std::thread; use std::thread;
@@ -21,12 +21,29 @@ use std::u64;
const REJECT_AFTER_MESSAGES: u64 = u64::MAX - (1 << 4); const REJECT_AFTER_MESSAGES: u64 = u64::MAX - (1 << 4);
const MAX_STAGED_PACKETS: usize = 128; const MAX_STAGED_PACKETS: usize = 128;
pub struct Device { struct DeviceInner {
stopped: AtomicBool,
injector: Injector<()>, // parallel enc/dec task injector
threads: Vec<thread::JoinHandle<()>>,
recv: spin::RwLock<HashMap<u32, DecryptionState>>, recv: spin::RwLock<HashMap<u32, DecryptionState>>,
ipv4: IpLookupTable<Ipv4Addr, Weak<PeerInner>>, ipv4: IpLookupTable<Ipv4Addr, Weak<PeerInner>>,
ipv6: IpLookupTable<Ipv6Addr, Weak<PeerInner>>, ipv6: IpLookupTable<Ipv6Addr, Weak<PeerInner>>,
} }
struct PeerInner {
stopped: AtomicBool,
thread_outbound: spin::Mutex<thread::JoinHandle<()>>,
thread_inbound: spin::Mutex<thread::JoinHandle<()>>,
inorder_outbound: SyncSender<()>,
inorder_inbound: SyncSender<()>,
staged_packets: Mutex<ArrayDeque<[Vec<u8>; MAX_STAGED_PACKETS], Wrapping>>, // packets awaiting handshake
rx_bytes: AtomicU64, // received bytes
tx_bytes: AtomicU64, // transmitted bytes
keys: spin::Mutex<KeyWheel>, // key-wheel
ekey: spin::Mutex<EncryptionState>, // encryption state
endpoint: spin::Mutex<Option<Arc<SocketAddr>>>,
}
struct EncryptionState { struct EncryptionState {
key: [u8; 32], // encryption key key: [u8; 32], // encryption key
id: u32, // sender id id: u32, // sender id
@@ -48,33 +65,34 @@ struct KeyWheel {
previous: Option<KeyPair>, // old key state (used for decryption) previous: Option<KeyPair>, // old key state (used for decryption)
} }
struct PeerInner {
inorder_outbound: SyncSender<()>,
inorder_inbound: SyncSender<()>,
staged_packets: Mutex<ArrayDeque<[Vec<u8>; MAX_STAGED_PACKETS], Wrapping>>, // packets awaiting handshake
rx_bytes: AtomicU64, // received bytes
tx_bytes: AtomicU64, // transmitted bytes
keys: spin::Mutex<KeyWheel>, // key-wheel
ekey: spin::Mutex<EncryptionState>, // encryption state
endpoint: spin::Mutex<Option<Arc<SocketAddr>>>,
}
pub struct Peer(Arc<PeerInner>); pub struct Peer(Arc<PeerInner>);
pub struct Device(DeviceInner);
impl Drop for Peer { impl Drop for Peer {
fn drop(&mut self) { fn drop(&mut self) {
// stop threads and remove peer from device // mark peer as stopped
let inner = &self.0;
inner.stopped.store(true, Ordering::SeqCst);
// unpark threads to stop
inner.thread_inbound.lock().thread().unpark();
inner.thread_outbound.lock().thread().unpark();
} }
} }
impl Drop for Device { impl Drop for Device {
fn drop(&mut self) { fn drop(&mut self) {
// stop threads // mark device as stopped
let inner = &self.0;
inner.stopped.store(true, Ordering::SeqCst);
// eat all parallel jobs
while inner.injector.steal() != Steal::Empty {}
} }
} }
impl Peer { impl Peer {
fn set_endpoint(&self, endpoint: SocketAddr) { pub fn set_endpoint(&self, endpoint: SocketAddr) {
*self.0.endpoint.lock() = Some(Arc::new(endpoint)) *self.0.endpoint.lock() = Some(Arc::new(endpoint))
} }
@@ -87,7 +105,7 @@ impl Peer {
}; };
} }
pub fn keypair_add(&self, new: KeyPair) -> Option<u32> { fn keypair_add(&self, new: KeyPair) -> Option<u32> {
let mut keys = self.0.keys.lock(); let mut keys = self.0.keys.lock();
let release = keys.previous.map(|k| k.recv.id); let release = keys.previous.map(|k| k.recv.id);
@@ -124,30 +142,21 @@ impl Peer {
} }
impl Device { impl Device {
pub fn new() -> Device { pub fn new(workers: usize) -> Device {
Device { Device(DeviceInner {
threads: vec![],
stopped: AtomicBool::new(false),
injector: Injector::new(),
recv: spin::RwLock::new(HashMap::new()), recv: spin::RwLock::new(HashMap::new()),
ipv4: IpLookupTable::new(), ipv4: IpLookupTable::new(),
ipv6: IpLookupTable::new(), ipv6: IpLookupTable::new(),
} })
}
pub fn release(&self, id: u32) {
debug_assert!(
if let Some(_) = self.recv.read().get(&id) {
true
} else {
false
},
true
);
self.recv.write().remove(&id);
} }
pub fn add_subnet(&mut self, ip: IpAddr, masklen: u32, peer: Peer) { pub fn add_subnet(&mut self, ip: IpAddr, masklen: u32, peer: Peer) {
match ip { match ip {
IpAddr::V4(v4) => self.ipv4.insert(v4, masklen, Arc::downgrade(&peer.0)), IpAddr::V4(v4) => self.0.ipv4.insert(v4, masklen, Arc::downgrade(&peer.0)),
IpAddr::V6(v6) => self.ipv6.insert(v6, masklen, Arc::downgrade(&peer.0)), IpAddr::V6(v6) => self.0.ipv6.insert(v6, masklen, Arc::downgrade(&peer.0)),
}; };
} }
@@ -155,7 +164,7 @@ impl Device {
let mut subnets = Vec::new(); let mut subnets = Vec::new();
// extract ipv4 entries // extract ipv4 entries
for subnet in self.ipv4.iter() { for subnet in self.0.ipv4.iter() {
let (ip, masklen, p) = subnet; let (ip, masklen, p) = subnet;
if let Some(p) = p.upgrade() { if let Some(p) = p.upgrade() {
if Arc::ptr_eq(&p, &peer.0) { if Arc::ptr_eq(&p, &peer.0) {
@@ -165,7 +174,7 @@ impl Device {
} }
// extract ipv6 entries // extract ipv6 entries
for subnet in self.ipv6.iter() { for subnet in self.0.ipv6.iter() {
let (ip, masklen, p) = subnet; let (ip, masklen, p) = subnet;
if let Some(p) = p.upgrade() { if let Some(p) = p.upgrade() {
if Arc::ptr_eq(&p, &peer.0) { if Arc::ptr_eq(&p, &peer.0) {
@@ -182,7 +191,7 @@ impl Device {
let release = peer.keypair_add(new); let release = peer.keypair_add(new);
// update incoming packet id map // update incoming packet id map
let mut recv = self.recv.write(); let mut recv = self.0.recv.write();
// release id of previous keypair // release id of previous keypair
if let Some(id) = release { if let Some(id) = release {

View File

@@ -4,4 +4,4 @@ mod device;
// mod inbound; // mod inbound;
// mod outbound; // mod outbound;
pub use device::Device; pub use device::{Device, Peer};