Work on callback structure for cryptkey router
This commit is contained in:
30
src/main.rs
30
src/main.rs
@@ -5,18 +5,38 @@ mod handshake;
|
|||||||
mod router;
|
mod router;
|
||||||
mod types;
|
mod types;
|
||||||
|
|
||||||
|
use hjul::*;
|
||||||
|
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
use sodiumoxide;
|
use sodiumoxide;
|
||||||
use types::KeyPair;
|
use types::KeyPair;
|
||||||
|
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
struct PeerTimer {
|
||||||
|
a: Timer,
|
||||||
|
b: Timer,
|
||||||
|
}
|
||||||
|
|
||||||
fn main() {
|
fn main() {
|
||||||
|
let runner = Runner::new(Duration::from_millis(100), 1000, 1024);
|
||||||
|
|
||||||
// choose optimal crypto implementations for platform
|
// choose optimal crypto implementations for platform
|
||||||
sodiumoxide::init().unwrap();
|
sodiumoxide::init().unwrap();
|
||||||
|
|
||||||
let mut router = router::Device::new(8);
|
let router = router::Device::new(
|
||||||
{
|
4,
|
||||||
let peer = router.new_peer();
|
|t: &PeerTimer, data: bool| t.a.reset(Duration::from_millis(1000)),
|
||||||
}
|
|t: &PeerTimer, data: bool| t.b.reset(Duration::from_millis(1000)),
|
||||||
loop {}
|
);
|
||||||
|
|
||||||
|
let pt = PeerTimer {
|
||||||
|
a: runner.timer(|| println!("timer-a fired for peer")),
|
||||||
|
b: runner.timer(|| println!("timer-b fired for peer")),
|
||||||
|
};
|
||||||
|
|
||||||
|
let peer = router.new_peer(pt.clone());
|
||||||
|
|
||||||
|
println!("{:?}", pt);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,32 +1,33 @@
|
|||||||
use arraydeque::{ArrayDeque, Wrapping};
|
|
||||||
use treebitmap::address::Address;
|
|
||||||
use treebitmap::IpLookupTable;
|
|
||||||
|
|
||||||
use crossbeam_deque::{Injector, Steal};
|
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
|
use std::net::{Ipv4Addr, Ipv6Addr};
|
||||||
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
|
use std::sync::atomic::{AtomicBool, Ordering};
|
||||||
use std::sync::mpsc::{sync_channel, SyncSender};
|
use std::sync::{Arc, Weak};
|
||||||
use std::sync::{Arc, Mutex, Weak};
|
|
||||||
use std::thread;
|
use std::thread;
|
||||||
use std::time::Instant;
|
use std::time::Instant;
|
||||||
|
|
||||||
|
use crossbeam_deque::{Injector, Steal};
|
||||||
use spin;
|
use spin;
|
||||||
|
use treebitmap::IpLookupTable;
|
||||||
|
|
||||||
use super::super::constants::*;
|
|
||||||
use super::super::types::KeyPair;
|
use super::super::types::KeyPair;
|
||||||
use super::anti_replay::AntiReplay;
|
use super::anti_replay::AntiReplay;
|
||||||
use super::peer;
|
use super::peer;
|
||||||
use super::peer::{Peer, PeerInner};
|
use super::peer::{Peer, PeerInner};
|
||||||
use super::workers::worker_parallel;
|
|
||||||
|
|
||||||
pub struct DeviceInner {
|
use super::types::{Callback, Opaque};
|
||||||
|
|
||||||
|
pub struct DeviceInner<T: Opaque> {
|
||||||
|
// callbacks (used for timers)
|
||||||
|
pub event_recv: Box<dyn Callback<T>>, // authenticated message received
|
||||||
|
pub event_send: Box<dyn Callback<T>>, // authenticated message send
|
||||||
|
pub event_new_handshake: (), // called when a new handshake is required
|
||||||
|
|
||||||
pub stopped: AtomicBool,
|
pub stopped: AtomicBool,
|
||||||
pub injector: Injector<()>, // parallel enc/dec task injector
|
pub injector: Injector<()>, // parallel enc/dec task injector
|
||||||
pub threads: Vec<thread::JoinHandle<()>>, // join handles of worker threads
|
pub threads: Vec<thread::JoinHandle<()>>, // join handles of worker threads
|
||||||
pub recv: spin::RwLock<HashMap<u32, DecryptionState>>, // receiver id -> decryption state
|
pub recv: spin::RwLock<HashMap<u32, DecryptionState<T>>>, // receiver id -> decryption state
|
||||||
pub ipv4: spin::RwLock<IpLookupTable<Ipv4Addr, Weak<PeerInner>>>, // ipv4 cryptkey routing
|
pub ipv4: spin::RwLock<IpLookupTable<Ipv4Addr, Weak<PeerInner<T>>>>, // ipv4 cryptkey routing
|
||||||
pub ipv6: spin::RwLock<IpLookupTable<Ipv6Addr, Weak<PeerInner>>>, // ipv6 cryptkey routing
|
pub ipv6: spin::RwLock<IpLookupTable<Ipv6Addr, Weak<PeerInner<T>>>>, // ipv6 cryptkey routing
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct EncryptionState {
|
pub struct EncryptionState {
|
||||||
@@ -37,17 +38,17 @@ pub struct EncryptionState {
|
|||||||
// (birth + reject-after-time - keepalive-timeout - rekey-timeout)
|
// (birth + reject-after-time - keepalive-timeout - rekey-timeout)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct DecryptionState {
|
pub struct DecryptionState<T: Opaque> {
|
||||||
pub key: [u8; 32],
|
pub key: [u8; 32],
|
||||||
pub keypair: Weak<KeyPair>,
|
pub keypair: Weak<KeyPair>,
|
||||||
pub protector: spin::Mutex<AntiReplay>,
|
pub protector: spin::Mutex<AntiReplay>,
|
||||||
pub peer: Weak<PeerInner>,
|
pub peer: Weak<PeerInner<T>>,
|
||||||
pub death: Instant, // time when the key can no longer be used for decryption
|
pub death: Instant, // time when the key can no longer be used for decryption
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct Device(Arc<DeviceInner>);
|
pub struct Device<T: Opaque>(Arc<DeviceInner<T>>);
|
||||||
|
|
||||||
impl Drop for Device {
|
impl<T: Opaque> Drop for Device<T> {
|
||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
// mark device as stopped
|
// mark device as stopped
|
||||||
let device = &self.0;
|
let device = &self.0;
|
||||||
@@ -63,9 +64,16 @@ impl Drop for Device {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Device {
|
impl<T: Opaque> Device<T> {
|
||||||
pub fn new(workers: usize) -> Device {
|
pub fn new<F1: Callback<T>, F2: Callback<T>>(
|
||||||
|
workers: usize,
|
||||||
|
event_recv: F1,
|
||||||
|
event_send: F2,
|
||||||
|
) -> Device<T> {
|
||||||
Device(Arc::new(DeviceInner {
|
Device(Arc::new(DeviceInner {
|
||||||
|
event_recv: Box::new(event_recv),
|
||||||
|
event_send: Box::new(event_send),
|
||||||
|
event_new_handshake: (),
|
||||||
threads: vec![],
|
threads: vec![],
|
||||||
stopped: AtomicBool::new(false),
|
stopped: AtomicBool::new(false),
|
||||||
injector: Injector::new(),
|
injector: Injector::new(),
|
||||||
@@ -80,8 +88,8 @@ impl Device {
|
|||||||
/// # Returns
|
/// # Returns
|
||||||
///
|
///
|
||||||
/// A atomic ref. counted peer (with liftime matching the device)
|
/// A atomic ref. counted peer (with liftime matching the device)
|
||||||
pub fn new_peer(&self) -> Peer {
|
pub fn new_peer(&self, opaque: T) -> Peer<T> {
|
||||||
peer::new_peer(self.0.clone())
|
peer::new_peer(self.0.clone(), opaque)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Cryptkey routes and sends a plaintext message (IP packet)
|
/// Cryptkey routes and sends a plaintext message (IP packet)
|
||||||
@@ -96,7 +104,7 @@ impl Device {
|
|||||||
/// This indicates that a handshake should be initated (see the handshake module).
|
/// This indicates that a handshake should be initated (see the handshake module).
|
||||||
/// If this occurs the packet is copied to an internal buffer
|
/// If this occurs the packet is copied to an internal buffer
|
||||||
/// and retransmission can be attempted using send_run_queue
|
/// and retransmission can be attempted using send_run_queue
|
||||||
pub fn send(&self, pt_msg: &mut [u8]) -> Arc<Peer> {
|
pub fn send(&self, pt_msg: &mut [u8]) -> Arc<Peer<T>> {
|
||||||
unimplemented!();
|
unimplemented!();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
mod anti_replay;
|
mod anti_replay;
|
||||||
mod buffer;
|
mod buffer;
|
||||||
mod device;
|
mod device;
|
||||||
|
mod types;
|
||||||
// mod inbound;
|
// mod inbound;
|
||||||
mod workers;
|
mod workers;
|
||||||
mod peer;
|
mod peer;
|
||||||
|
|||||||
@@ -1,9 +1,8 @@
|
|||||||
|
use std::net::{IpAddr, SocketAddr};
|
||||||
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
|
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
|
||||||
|
use std::sync::mpsc::{sync_channel, SyncSender};
|
||||||
use std::sync::{Arc, Weak};
|
use std::sync::{Arc, Weak};
|
||||||
use std::thread;
|
use std::thread;
|
||||||
use std::mem;
|
|
||||||
use std::net::{IpAddr, SocketAddr};
|
|
||||||
use std::sync::mpsc::{sync_channel, SyncSender};
|
|
||||||
|
|
||||||
use spin;
|
use spin;
|
||||||
|
|
||||||
@@ -21,6 +20,8 @@ use super::device::DeviceInner;
|
|||||||
use super::device::EncryptionState;
|
use super::device::EncryptionState;
|
||||||
use super::workers::{worker_inbound, worker_outbound, JobInbound, JobOutbound};
|
use super::workers::{worker_inbound, worker_outbound, JobInbound, JobOutbound};
|
||||||
|
|
||||||
|
use super::types::Opaque;
|
||||||
|
|
||||||
const MAX_STAGED_PACKETS: usize = 128;
|
const MAX_STAGED_PACKETS: usize = 128;
|
||||||
|
|
||||||
pub struct KeyWheel {
|
pub struct KeyWheel {
|
||||||
@@ -30,13 +31,14 @@ pub struct KeyWheel {
|
|||||||
retired: Option<u32>, // retired id (previous id, after confirming key-pair)
|
retired: Option<u32>, // retired id (previous id, after confirming key-pair)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct PeerInner {
|
pub struct PeerInner<T: Opaque> {
|
||||||
pub stopped: AtomicBool,
|
pub stopped: AtomicBool,
|
||||||
pub device: Arc<DeviceInner>,
|
pub opaque: T,
|
||||||
|
pub device: Arc<DeviceInner<T>>,
|
||||||
pub thread_outbound: spin::Mutex<Option<thread::JoinHandle<()>>>,
|
pub thread_outbound: spin::Mutex<Option<thread::JoinHandle<()>>>,
|
||||||
pub thread_inbound: spin::Mutex<Option<thread::JoinHandle<()>>>,
|
pub thread_inbound: spin::Mutex<Option<thread::JoinHandle<()>>>,
|
||||||
pub queue_outbound: SyncSender<JobOutbound>,
|
pub queue_outbound: SyncSender<JobOutbound>,
|
||||||
pub queue_inbound: SyncSender<JobInbound>,
|
pub queue_inbound: SyncSender<JobInbound<T>>,
|
||||||
pub staged_packets: spin::Mutex<ArrayDeque<[Vec<u8>; MAX_STAGED_PACKETS], Wrapping>>, // packets awaiting handshake
|
pub staged_packets: spin::Mutex<ArrayDeque<[Vec<u8>; MAX_STAGED_PACKETS], Wrapping>>, // packets awaiting handshake
|
||||||
pub rx_bytes: AtomicU64, // received bytes
|
pub rx_bytes: AtomicU64, // received bytes
|
||||||
pub tx_bytes: AtomicU64, // transmitted bytes
|
pub tx_bytes: AtomicU64, // transmitted bytes
|
||||||
@@ -45,11 +47,11 @@ pub struct PeerInner {
|
|||||||
pub endpoint: spin::Mutex<Option<Arc<SocketAddr>>>,
|
pub endpoint: spin::Mutex<Option<Arc<SocketAddr>>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct Peer(Arc<PeerInner>);
|
pub struct Peer<T: Opaque>(Arc<PeerInner<T>>);
|
||||||
|
|
||||||
fn treebit_list<A, R>(
|
fn treebit_list<A, R, T: Opaque>(
|
||||||
peer: &Arc<PeerInner>,
|
peer: &Arc<PeerInner<T>>,
|
||||||
table: &spin::RwLock<IpLookupTable<A, Weak<PeerInner>>>,
|
table: &spin::RwLock<IpLookupTable<A, Weak<PeerInner<T>>>>,
|
||||||
callback: Box<dyn Fn(A, u32) -> R>,
|
callback: Box<dyn Fn(A, u32) -> R>,
|
||||||
) -> Vec<R>
|
) -> Vec<R>
|
||||||
where
|
where
|
||||||
@@ -67,10 +69,10 @@ where
|
|||||||
res
|
res
|
||||||
}
|
}
|
||||||
|
|
||||||
fn treebit_remove<A>(peer: &Peer, table: &spin::RwLock<IpLookupTable<A, Weak<PeerInner>>>)
|
fn treebit_remove<A: Address, T: Opaque>(
|
||||||
where
|
peer: &Peer<T>,
|
||||||
A: Address,
|
table: &spin::RwLock<IpLookupTable<A, Weak<PeerInner<T>>>>,
|
||||||
{
|
) {
|
||||||
let mut m = table.write();
|
let mut m = table.write();
|
||||||
|
|
||||||
// collect keys for value
|
// collect keys for value
|
||||||
@@ -91,9 +93,8 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Drop for Peer {
|
impl<T: Opaque> Drop for Peer<T> {
|
||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
|
|
||||||
// mark peer as stopped
|
// mark peer as stopped
|
||||||
|
|
||||||
let peer = &self.0;
|
let peer = &self.0;
|
||||||
@@ -144,11 +145,10 @@ impl Drop for Peer {
|
|||||||
|
|
||||||
*peer.ekey.lock() = None;
|
*peer.ekey.lock() = None;
|
||||||
*peer.endpoint.lock() = None;
|
*peer.endpoint.lock() = None;
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn new_peer(device: Arc<DeviceInner>) -> Peer {
|
pub fn new_peer<T: Opaque>(device: Arc<DeviceInner<T>>, opaque: T) -> Peer<T> {
|
||||||
// allocate in-order queues
|
// allocate in-order queues
|
||||||
let (send_inbound, recv_inbound) = sync_channel(MAX_STAGED_PACKETS);
|
let (send_inbound, recv_inbound) = sync_channel(MAX_STAGED_PACKETS);
|
||||||
let (send_outbound, recv_outbound) = sync_channel(MAX_STAGED_PACKETS);
|
let (send_outbound, recv_outbound) = sync_channel(MAX_STAGED_PACKETS);
|
||||||
@@ -157,6 +157,7 @@ pub fn new_peer(device: Arc<DeviceInner>) -> Peer {
|
|||||||
let peer = {
|
let peer = {
|
||||||
let device = device.clone();
|
let device = device.clone();
|
||||||
Arc::new(PeerInner {
|
Arc::new(PeerInner {
|
||||||
|
opaque,
|
||||||
stopped: AtomicBool::new(false),
|
stopped: AtomicBool::new(false),
|
||||||
device: device,
|
device: device,
|
||||||
ekey: spin::Mutex::new(None),
|
ekey: spin::Mutex::new(None),
|
||||||
@@ -198,8 +199,8 @@ pub fn new_peer(device: Arc<DeviceInner>) -> Peer {
|
|||||||
Peer(peer)
|
Peer(peer)
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Peer {
|
impl<T: Opaque> Peer<T> {
|
||||||
fn new(inner: PeerInner) -> Peer {
|
fn new(inner: PeerInner<T>) -> Peer<T> {
|
||||||
Peer(Arc::new(inner))
|
Peer(Arc::new(inner))
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -315,6 +316,5 @@ impl Peer {
|
|||||||
res
|
res
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn send(&self, msg : Vec<u8>) {
|
pub fn send(&self, msg: Vec<u8>) {}
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|||||||
7
src/router/types.rs
Normal file
7
src/router/types.rs
Normal file
@@ -0,0 +1,7 @@
|
|||||||
|
pub trait Opaque: Send + Sync + 'static {}
|
||||||
|
|
||||||
|
impl<T> Opaque for T where T: Send + Sync + 'static {}
|
||||||
|
|
||||||
|
pub trait Callback<T>: Fn(&T, bool) -> () + Sync + Send + 'static {}
|
||||||
|
|
||||||
|
impl<T, F> Callback<T> for F where F: Fn(&T, bool) -> () + Sync + Send + 'static {}
|
||||||
@@ -10,6 +10,8 @@ use std::sync::mpsc::{sync_channel, Receiver, TryRecvError};
|
|||||||
use std::sync::{Arc, Weak};
|
use std::sync::{Arc, Weak};
|
||||||
use std::thread;
|
use std::thread;
|
||||||
|
|
||||||
|
use super::types::{Opaque, Callback};
|
||||||
|
|
||||||
#[derive(PartialEq)]
|
#[derive(PartialEq)]
|
||||||
enum Operation {
|
enum Operation {
|
||||||
Encryption,
|
Encryption,
|
||||||
@@ -32,7 +34,7 @@ pub struct JobInner {
|
|||||||
|
|
||||||
pub type JobBuffer = Arc<spin::Mutex<JobInner>>;
|
pub type JobBuffer = Arc<spin::Mutex<JobInner>>;
|
||||||
pub type JobParallel = (Arc<thread::JoinHandle<()>>, JobBuffer);
|
pub type JobParallel = (Arc<thread::JoinHandle<()>>, JobBuffer);
|
||||||
pub type JobInbound = (Weak<DecryptionState>, JobBuffer);
|
pub type JobInbound<T> = (Weak<DecryptionState<T>>, JobBuffer);
|
||||||
pub type JobOutbound = JobBuffer;
|
pub type JobOutbound = JobBuffer;
|
||||||
|
|
||||||
/* Strategy for workers acquiring a new job:
|
/* Strategy for workers acquiring a new job:
|
||||||
@@ -80,10 +82,10 @@ fn wait_recv<T>(stopped: &AtomicBool, recv: &Receiver<T>) -> Result<T, TryRecvEr
|
|||||||
return Err(TryRecvError::Disconnected);
|
return Err(TryRecvError::Disconnected);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn worker_inbound(
|
pub fn worker_inbound<T : Opaque>(
|
||||||
device: Arc<DeviceInner>, // related device
|
device: Arc<DeviceInner<T>>, // related device
|
||||||
peer: Arc<PeerInner>, // related peer
|
peer: Arc<PeerInner<T>>, // related peer
|
||||||
recv: Receiver<JobInbound>, // in order queue
|
recv: Receiver<JobInbound<T>>, // in order queue
|
||||||
) {
|
) {
|
||||||
loop {
|
loop {
|
||||||
match wait_recv(&peer.stopped, &recv) {
|
match wait_recv(&peer.stopped, &recv) {
|
||||||
@@ -108,9 +110,9 @@ pub fn worker_inbound(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn worker_outbound(
|
pub fn worker_outbound<T : Opaque>(
|
||||||
device: Arc<DeviceInner>, // related device
|
device: Arc<DeviceInner<T>>, // related device
|
||||||
peer: Arc<PeerInner>, // related peer
|
peer: Arc<PeerInner<T>>, // related peer
|
||||||
recv: Receiver<JobOutbound>, // in order queue
|
recv: Receiver<JobOutbound>, // in order queue
|
||||||
) {
|
) {
|
||||||
loop {
|
loop {
|
||||||
|
|||||||
Reference in New Issue
Block a user