Added Bind trait to router

This commit is contained in:
Mathias Hall-Andersen
2019-08-31 21:00:10 +02:00
parent 46d76b80c6
commit d16521f4c7
6 changed files with 70 additions and 60 deletions

View File

@@ -52,14 +52,14 @@ impl Tun for TunTest {
} }
} }
struct Test {} struct BindTest {}
impl Bind for Test { impl Bind for BindTest {
type Error = BindError; type Error = BindError;
type Endpoint = SocketAddr; type Endpoint = SocketAddr;
fn new() -> Test { fn new() -> BindTest {
Test {} BindTest {}
} }
fn set_port(&self, port: u16) -> Result<(), Self::Error> { fn set_port(&self, port: u16) -> Result<(), Self::Error> {
@@ -111,6 +111,7 @@ fn main() {
let router = router::Device::new( let router = router::Device::new(
4, 4,
TunTest {}, TunTest {},
BindTest {},
|t: &PeerTimer, data: bool, sent: bool| t.a.reset(Duration::from_millis(1000)), |t: &PeerTimer, data: bool, sent: bool| t.a.reset(Duration::from_millis(1000)),
|t: &PeerTimer, data: bool, sent: bool| t.b.reset(Duration::from_millis(1000)), |t: &PeerTimer, data: bool, sent: bool| t.b.reset(Duration::from_millis(1000)),
|t: &PeerTimer| println!("new key requested"), |t: &PeerTimer| println!("new key requested"),

View File

@@ -15,12 +15,13 @@ use super::anti_replay::AntiReplay;
use super::peer; use super::peer;
use super::peer::{Peer, PeerInner}; use super::peer::{Peer, PeerInner};
use super::types::{Callback, Callbacks, CallbacksPhantom, KeyCallback, Opaque}; use super::types::{Callback, Callbacks, KeyCallback, Opaque, PhantomCallbacks};
use super::workers::{worker_parallel, JobParallel}; use super::workers::{worker_parallel, JobParallel};
pub struct DeviceInner<C: Callbacks, T: Tun> { pub struct DeviceInner<C: Callbacks, T: Tun, B: Bind> {
// IO & timer generics // IO & timer generics
pub tun: T, pub tun: T,
pub bind: B,
pub call_recv: C::CallbackRecv, pub call_recv: C::CallbackRecv,
pub call_send: C::CallbackSend, pub call_send: C::CallbackSend,
pub call_need_key: C::CallbackKey, pub call_need_key: C::CallbackKey,
@@ -31,9 +32,9 @@ pub struct DeviceInner<C: Callbacks, T: Tun> {
pub injector: Injector<JobParallel>, // parallel enc/dec task injector pub injector: Injector<JobParallel>, // parallel enc/dec task injector
// routing // routing
pub recv: spin::RwLock<HashMap<u32, DecryptionState<C, T>>>, // receiver id -> decryption state pub recv: spin::RwLock<HashMap<u32, DecryptionState<C, T, B>>>, // receiver id -> decryption state
pub ipv4: spin::RwLock<IpLookupTable<Ipv4Addr, Weak<PeerInner<C, T>>>>, // ipv4 cryptkey routing pub ipv4: spin::RwLock<IpLookupTable<Ipv4Addr, Weak<PeerInner<C, T, B>>>>, // ipv4 cryptkey routing
pub ipv6: spin::RwLock<IpLookupTable<Ipv6Addr, Weak<PeerInner<C, T>>>>, // ipv6 cryptkey routing pub ipv6: spin::RwLock<IpLookupTable<Ipv6Addr, Weak<PeerInner<C, T, B>>>>, // ipv6 cryptkey routing
} }
pub struct EncryptionState { pub struct EncryptionState {
@@ -44,18 +45,21 @@ pub struct EncryptionState {
// (birth + reject-after-time - keepalive-timeout - rekey-timeout) // (birth + reject-after-time - keepalive-timeout - rekey-timeout)
} }
pub struct DecryptionState<C: Callbacks, T: Tun> { pub struct DecryptionState<C: Callbacks, T: Tun, B: Bind> {
pub key: [u8; 32], pub key: [u8; 32],
pub keypair: Weak<KeyPair>, pub keypair: Weak<KeyPair>,
pub confirmed: AtomicBool, pub confirmed: AtomicBool,
pub protector: spin::Mutex<AntiReplay>, pub protector: spin::Mutex<AntiReplay>,
pub peer: Weak<PeerInner<C, T>>, pub peer: Weak<PeerInner<C, T, B>>,
pub death: Instant, // time when the key can no longer be used for decryption pub death: Instant, // time when the key can no longer be used for decryption
} }
pub struct Device<C: Callbacks, T: Tun>(Arc<DeviceInner<C, T>>, Vec<thread::JoinHandle<()>>); pub struct Device<C: Callbacks, T: Tun, B: Bind>(
Arc<DeviceInner<C, T, B>>,
Vec<thread::JoinHandle<()>>,
);
impl<C: Callbacks, T: Tun> Drop for Device<C, T> { impl<C: Callbacks, T: Tun, B: Bind> Drop for Device<C, T, B> {
fn drop(&mut self) { fn drop(&mut self) {
// mark device as stopped // mark device as stopped
let device = &self.0; let device = &self.0;
@@ -73,19 +77,21 @@ impl<C: Callbacks, T: Tun> Drop for Device<C, T> {
} }
} }
impl<O: Opaque, R: Callback<O>, S: Callback<O>, K: KeyCallback<O>, T: Tun> impl<O: Opaque, R: Callback<O>, S: Callback<O>, K: KeyCallback<O>, T: Tun, B: Bind>
Device<CallbacksPhantom<O, R, S, K>, T> Device<PhantomCallbacks<O, R, S, K>, T, B>
{ {
pub fn new( pub fn new(
num_workers: usize, num_workers: usize,
tun: T, tun: T,
bind: B,
call_recv: R, call_recv: R,
call_send: S, call_send: S,
call_need_key: K, call_need_key: K,
) -> Device<CallbacksPhantom<O, R, S, K>, T> { ) -> Device<PhantomCallbacks<O, R, S, K>, T, B> {
// allocate shared device state // allocate shared device state
let inner = Arc::new(DeviceInner { let inner = Arc::new(DeviceInner {
tun, tun,
bind,
call_recv, call_recv,
call_send, call_send,
call_need_key, call_need_key,
@@ -122,13 +128,13 @@ impl<O: Opaque, R: Callback<O>, S: Callback<O>, K: KeyCallback<O>, T: Tun>
} }
} }
impl<C: Callbacks, T: Tun> Device<C, T> { impl<C: Callbacks, T: Tun, B: Bind> Device<C, T, B> {
/// Adds a new peer to the device /// Adds a new peer to the device
/// ///
/// # Returns /// # Returns
/// ///
/// A atomic ref. counted peer (with liftime matching the device) /// A atomic ref. counted peer (with liftime matching the device)
pub fn new_peer(&self, opaque: C::Opaque) -> Peer<C, T> { pub fn new_peer(&self, opaque: C::Opaque) -> Peer<C, T, B> {
peer::new_peer(self.0.clone(), opaque) peer::new_peer(self.0.clone(), opaque)
} }

View File

@@ -12,7 +12,7 @@ use treebitmap::address::Address;
use treebitmap::IpLookupTable; use treebitmap::IpLookupTable;
use super::super::constants::*; use super::super::constants::*;
use super::super::types::{KeyPair, Tun}; use super::super::types::{KeyPair, Tun, Bind};
use super::anti_replay::AntiReplay; use super::anti_replay::AntiReplay;
use super::device::DecryptionState; use super::device::DecryptionState;
@@ -31,14 +31,14 @@ pub struct KeyWheel {
retired: Option<u32>, // retired id (previous id, after confirming key-pair) retired: Option<u32>, // retired id (previous id, after confirming key-pair)
} }
pub struct PeerInner<C: Callbacks, T: Tun> { pub struct PeerInner<C: Callbacks, T: Tun, B: Bind> {
pub stopped: AtomicBool, pub stopped: AtomicBool,
pub opaque: C::Opaque, pub opaque: C::Opaque,
pub device: Arc<DeviceInner<C, T>>, pub device: Arc<DeviceInner<C, T, B>>,
pub thread_outbound: spin::Mutex<Option<thread::JoinHandle<()>>>, pub thread_outbound: spin::Mutex<Option<thread::JoinHandle<()>>>,
pub thread_inbound: spin::Mutex<Option<thread::JoinHandle<()>>>, pub thread_inbound: spin::Mutex<Option<thread::JoinHandle<()>>>,
pub queue_outbound: SyncSender<JobOutbound>, pub queue_outbound: SyncSender<JobOutbound>,
pub queue_inbound: SyncSender<JobInbound<C, T>>, pub queue_inbound: SyncSender<JobInbound<C, T, B>>,
pub staged_packets: spin::Mutex<ArrayDeque<[Vec<u8>; MAX_STAGED_PACKETS], Wrapping>>, // packets awaiting handshake pub staged_packets: spin::Mutex<ArrayDeque<[Vec<u8>; MAX_STAGED_PACKETS], Wrapping>>, // packets awaiting handshake
pub rx_bytes: AtomicU64, // received bytes pub rx_bytes: AtomicU64, // received bytes
pub tx_bytes: AtomicU64, // transmitted bytes pub tx_bytes: AtomicU64, // transmitted bytes
@@ -47,13 +47,13 @@ pub struct PeerInner<C: Callbacks, T: Tun> {
pub endpoint: spin::Mutex<Option<Arc<SocketAddr>>>, pub endpoint: spin::Mutex<Option<Arc<SocketAddr>>>,
} }
pub struct Peer<C: Callbacks, T: Tun>( pub struct Peer<C: Callbacks, T: Tun, B: Bind>(
Arc<PeerInner<C, T>>, Arc<PeerInner<C, T, B>>,
); );
fn treebit_list<A, E, C: Callbacks, T: Tun>( fn treebit_list<A, E, C: Callbacks, T: Tun, B: Bind>(
peer: &Arc<PeerInner<C, T>>, peer: &Arc<PeerInner<C, T, B>>,
table: &spin::RwLock<IpLookupTable<A, Weak<PeerInner<C, T>>>>, table: &spin::RwLock<IpLookupTable<A, Weak<PeerInner<C, T, B>>>>,
callback: Box<dyn Fn(A, u32) -> E>, callback: Box<dyn Fn(A, u32) -> E>,
) -> Vec<E> ) -> Vec<E>
where where
@@ -71,9 +71,9 @@ where
res res
} }
fn treebit_remove<A: Address, C: Callbacks, T: Tun>( fn treebit_remove<A: Address, C: Callbacks, T: Tun, B: Bind>(
peer: &Peer<C, T>, peer: &Peer<C, T, B>,
table: &spin::RwLock<IpLookupTable<A, Weak<PeerInner<C, T>>>>, table: &spin::RwLock<IpLookupTable<A, Weak<PeerInner<C, T, B>>>>,
) { ) {
let mut m = table.write(); let mut m = table.write();
@@ -95,7 +95,7 @@ fn treebit_remove<A: Address, C: Callbacks, T: Tun>(
} }
} }
impl<C: Callbacks, T: Tun> Drop for Peer<C, T> { impl<C: Callbacks, T: Tun, B: Bind> Drop for Peer<C, T, B> {
fn drop(&mut self) { fn drop(&mut self) {
// mark peer as stopped // mark peer as stopped
@@ -150,10 +150,10 @@ impl<C: Callbacks, T: Tun> Drop for Peer<C, T> {
} }
} }
pub fn new_peer<C: Callbacks, T: Tun>( pub fn new_peer<C: Callbacks, T: Tun, B: Bind>(
device: Arc<DeviceInner<C, T>>, device: Arc<DeviceInner<C, T, B>>,
opaque: C::Opaque, opaque: C::Opaque,
) -> Peer<C, T> { ) -> Peer<C, T, B> {
// allocate in-order queues // allocate in-order queues
let (send_inbound, recv_inbound) = sync_channel(MAX_STAGED_PACKETS); let (send_inbound, recv_inbound) = sync_channel(MAX_STAGED_PACKETS);
let (send_outbound, recv_outbound) = sync_channel(MAX_STAGED_PACKETS); let (send_outbound, recv_outbound) = sync_channel(MAX_STAGED_PACKETS);
@@ -204,7 +204,7 @@ pub fn new_peer<C: Callbacks, T: Tun>(
Peer(peer) Peer(peer)
} }
impl<C: Callbacks, T: Tun> PeerInner<C, T> { impl<C: Callbacks, T: Tun, B: Bind> PeerInner<C, T, B> {
pub fn confirm_key(&self, kp: Weak<KeyPair>) { pub fn confirm_key(&self, kp: Weak<KeyPair>) {
// upgrade key-pair to strong reference // upgrade key-pair to strong reference
@@ -214,8 +214,8 @@ impl<C: Callbacks, T: Tun> PeerInner<C, T> {
} }
} }
impl<C: Callbacks, T: Tun> Peer<C, T> { impl<C: Callbacks, T: Tun, B: Bind> Peer<C, T, B> {
fn new(inner: PeerInner<C, T>) -> Peer<C, T> { fn new(inner: PeerInner<C, T, B>) -> Peer<C, T, B> {
Peer(Arc::new(inner)) Peer(Arc::new(inner))
} }

View File

@@ -20,10 +20,6 @@ pub trait KeyCallback<T>: Fn(&T) -> () + Sync + Send + 'static {}
impl<T, F> KeyCallback<T> for F where F: Fn(&T) -> () + Sync + Send + 'static {} impl<T, F> KeyCallback<T> for F where F: Fn(&T) -> () + Sync + Send + 'static {}
pub trait TunCallback<T>: Fn(&T, bool, bool) -> () + Sync + Send + 'static {}
pub trait BindCallback<T>: Fn(&T, bool, bool) -> () + Sync + Send + 'static {}
pub trait Endpoint: Send + Sync {} pub trait Endpoint: Send + Sync {}
pub trait Callbacks: Send + Sync + 'static { pub trait Callbacks: Send + Sync + 'static {
@@ -33,14 +29,21 @@ pub trait Callbacks: Send + Sync + 'static {
type CallbackKey: KeyCallback<Self::Opaque>; type CallbackKey: KeyCallback<Self::Opaque>;
} }
pub struct CallbacksPhantom<O: Opaque, R: Callback<O>, S: Callback<O>, K: KeyCallback<O>> { /* Concrete implementation of "Callbacks",
* used to hide the constituent type parameters.
*
* This type is never instantiated.
*/
pub struct PhantomCallbacks<O: Opaque, R: Callback<O>, S: Callback<O>, K: KeyCallback<O>> {
_phantom_opaque: PhantomData<O>, _phantom_opaque: PhantomData<O>,
_phantom_recv: PhantomData<R>, _phantom_recv: PhantomData<R>,
_phantom_send: PhantomData<S>, _phantom_send: PhantomData<S>,
_phantom_key: PhantomData<K> _phantom_key: PhantomData<K>,
} }
impl <O: Opaque, R: Callback<O>, S: Callback<O>, K: KeyCallback<O>> Callbacks for CallbacksPhantom<O, R, S, K> { impl<O: Opaque, R: Callback<O>, S: Callback<O>, K: KeyCallback<O>> Callbacks
for PhantomCallbacks<O, R, S, K>
{
type Opaque = O; type Opaque = O;
type CallbackRecv = R; type CallbackRecv = R;
type CallbackSend = S; type CallbackSend = S;

View File

@@ -17,7 +17,7 @@ use super::messages::TransportHeader;
use super::peer::PeerInner; use super::peer::PeerInner;
use super::types::Callbacks; use super::types::Callbacks;
use super::super::types::Tun; use super::super::types::{Tun, Bind};
#[derive(PartialEq, Debug)] #[derive(PartialEq, Debug)]
pub enum Operation { pub enum Operation {
@@ -41,7 +41,7 @@ pub struct JobInner {
pub type JobBuffer = Arc<spin::Mutex<JobInner>>; pub type JobBuffer = Arc<spin::Mutex<JobInner>>;
pub type JobParallel = (Arc<thread::JoinHandle<()>>, JobBuffer); pub type JobParallel = (Arc<thread::JoinHandle<()>>, JobBuffer);
pub type JobInbound<C, T> = (Weak<DecryptionState<C, T>>, JobBuffer); pub type JobInbound<C, T, B> = (Weak<DecryptionState<C, T, B>>, JobBuffer);
pub type JobOutbound = JobBuffer; pub type JobOutbound = JobBuffer;
/* Strategy for workers acquiring a new job: /* Strategy for workers acquiring a new job:
@@ -89,10 +89,10 @@ fn wait_recv<T>(running: &AtomicBool, recv: &Receiver<T>) -> Result<T, TryRecvEr
return Err(TryRecvError::Disconnected); return Err(TryRecvError::Disconnected);
} }
pub fn worker_inbound<C: Callbacks, T: Tun>( pub fn worker_inbound<C: Callbacks, T: Tun, B: Bind>(
device: Arc<DeviceInner<C, T>>, // related device device: Arc<DeviceInner<C, T, B>>, // related device
peer: Arc<PeerInner<C, T>>, // related peer peer: Arc<PeerInner<C, T, B>>, // related peer
recv: Receiver<JobInbound<C, T>>, // in order queue recv: Receiver<JobInbound<C, T, B>>, // in order queue
) { ) {
loop { loop {
match wait_recv(&peer.stopped, &recv) { match wait_recv(&peer.stopped, &recv) {
@@ -157,9 +157,9 @@ pub fn worker_inbound<C: Callbacks, T: Tun>(
} }
} }
pub fn worker_outbound<C: Callbacks, T: Tun>( pub fn worker_outbound<C: Callbacks, T: Tun, B: Bind>(
device: Arc<DeviceInner<C, T>>, // related device device: Arc<DeviceInner<C, T, B>>, // related device
peer: Arc<PeerInner<C, T>>, // related peer peer: Arc<PeerInner<C, T, B>>, // related peer
recv: Receiver<JobOutbound>, // in order queue recv: Receiver<JobOutbound>, // in order queue
) { ) {
loop { loop {
@@ -205,8 +205,8 @@ pub fn worker_outbound<C: Callbacks, T: Tun>(
} }
} }
pub fn worker_parallel<C: Callbacks, T: Tun>( pub fn worker_parallel<C: Callbacks, T: Tun, B: Bind>(
device: Arc<DeviceInner<C, T>>, device: Arc<DeviceInner<C, T, B>>,
local: Worker<JobParallel>, // local job queue (local to thread) local: Worker<JobParallel>, // local job queue (local to thread)
stealers: Vec<Stealer<JobParallel>>, // stealers (from other threads) stealers: Vec<Stealer<JobParallel>>, // stealers (from other threads)
) { ) {

View File

@@ -3,7 +3,7 @@ use std::error;
/* Often times an a file descriptor in an atomic might suffice. /* Often times an a file descriptor in an atomic might suffice.
*/ */
pub trait Bind: Send + Sync { pub trait Bind: Send + Sync + 'static {
type Error: error::Error; type Error: error::Error;
type Endpoint: Endpoint; type Endpoint: Endpoint;