Implemented keypair_confirm

This commit is contained in:
Mathias Hall-Andersen
2019-08-18 15:44:20 +02:00
parent 78ab1a93e6
commit 31ef3e2871
2 changed files with 178 additions and 96 deletions

View File

@@ -16,5 +16,5 @@ fn main() {
let mut rdev = router::Device::new(8); let mut rdev = router::Device::new(8);
let pref = rdev.add(); let pref = rdev.new_peer();
} }

View File

@@ -6,7 +6,7 @@ use crossbeam_deque::{Injector, Steal};
use std::collections::HashMap; use std::collections::HashMap;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::mpsc::SyncSender; use std::sync::mpsc::{sync_channel, SyncSender};
use std::sync::{Arc, Mutex, Weak}; use std::sync::{Arc, Mutex, Weak};
use std::thread; use std::thread;
use std::time::Instant; use std::time::Instant;
@@ -37,11 +37,11 @@ struct PeerInner {
thread_inbound: spin::Mutex<thread::JoinHandle<()>>, thread_inbound: spin::Mutex<thread::JoinHandle<()>>,
inorder_outbound: SyncSender<()>, inorder_outbound: SyncSender<()>,
inorder_inbound: SyncSender<()>, inorder_inbound: SyncSender<()>,
staged_packets: Mutex<ArrayDeque<[Vec<u8>; MAX_STAGED_PACKETS], Wrapping>>, // packets awaiting handshake staged_packets: spin::Mutex<ArrayDeque<[Vec<u8>; MAX_STAGED_PACKETS], Wrapping>>, // packets awaiting handshake
rx_bytes: AtomicU64, // received bytes rx_bytes: AtomicU64, // received bytes
tx_bytes: AtomicU64, // transmitted bytes tx_bytes: AtomicU64, // transmitted bytes
keys: spin::Mutex<KeyWheel>, // key-wheel keys: spin::Mutex<KeyWheel>, // key-wheel
ekey: spin::Mutex<Option<EncryptionState>>, // encryption state ekey: spin::Mutex<Option<EncryptionState>>, // encryption state
endpoint: spin::Mutex<Option<Arc<SocketAddr>>>, endpoint: spin::Mutex<Option<Arc<SocketAddr>>>,
} }
@@ -55,22 +55,24 @@ struct EncryptionState {
struct DecryptionState { struct DecryptionState {
key: [u8; 32], key: [u8; 32],
protector: Arc<spin::Mutex<AntiReplay>>, // keypair: Weak<KeyPair>,
protector: spin::Mutex<AntiReplay>,
peer: Weak<PeerInner>, peer: Weak<PeerInner>,
death: Instant, // time when the key can no longer be used for decryption death: Instant, // time when the key can no longer be used for decryption
} }
struct KeyWheel { struct KeyWheel {
next: Option<KeyPair>, // next key state (unconfirmed) next: Option<Arc<KeyPair>>, // next key state (unconfirmed)
current: Option<KeyPair>, // current key state (used for encryption) current: Option<Arc<KeyPair>>, // current key state (used for encryption)
previous: Option<KeyPair>, // old key state (used for decryption) previous: Option<Arc<KeyPair>>, // old key state (used for decryption)
retired: Option<u32>, // retired id (previous id, after confirming key-pair)
} }
pub struct Peer(Arc<PeerInner>); pub struct Peer(Arc<PeerInner>);
pub struct Device(DeviceInner); pub struct Device(Arc<DeviceInner>);
fn treebit_list<A, R>( fn treebit_list<A, R>(
peer: &Peer, peer: &Arc<PeerInner>,
table: &spin::RwLock<IpLookupTable<A, Weak<PeerInner>>>, table: &spin::RwLock<IpLookupTable<A, Weak<PeerInner>>>,
callback: Box<dyn Fn(A, u32) -> R>, callback: Box<dyn Fn(A, u32) -> R>,
) -> Vec<R> ) -> Vec<R>
@@ -81,7 +83,7 @@ where
for subnet in table.read().iter() { for subnet in table.read().iter() {
let (ip, masklen, p) = subnet; let (ip, masklen, p) = subnet;
if let Some(p) = p.upgrade() { if let Some(p) = p.upgrade() {
if Arc::ptr_eq(&p, &peer.0) { if Arc::ptr_eq(&p, &peer) {
res.push(callback(ip, masklen)) res.push(callback(ip, masklen))
} }
} }
@@ -116,10 +118,12 @@ where
impl Drop for Peer { impl Drop for Peer {
fn drop(&mut self) { fn drop(&mut self) {
// mark peer as stopped // mark peer as stopped
let peer = &self.0; let peer = &self.0;
peer.stopped.store(true, Ordering::SeqCst); peer.stopped.store(true, Ordering::SeqCst);
// remove from cryptkey router // remove from cryptkey router
treebit_remove(self, &peer.device.ipv4); treebit_remove(self, &peer.device.ipv4);
treebit_remove(self, &peer.device.ipv6); treebit_remove(self, &peer.device.ipv6);
@@ -127,15 +131,16 @@ impl Drop for Peer {
peer.thread_inbound.lock().thread().unpark(); peer.thread_inbound.lock().thread().unpark();
peer.thread_outbound.lock().thread().unpark(); peer.thread_outbound.lock().thread().unpark();
// collect ids to release
// release ids from the receiver map
let mut keys = peer.keys.lock(); let mut keys = peer.keys.lock();
let mut release = Vec::with_capacity(3); let mut release = Vec::with_capacity(3);
keys.next.map(|k| release.push(k.recv.id)); keys.next.as_ref().map(|k| release.push(k.recv.id));
keys.current.map(|k| release.push(k.recv.id)); keys.current.as_ref().map(|k| release.push(k.recv.id));
keys.previous.map(|k| release.push(k.recv.id)); keys.previous.as_ref().map(|k| release.push(k.recv.id));
// remove from receive id map
if release.len() > 0 { if release.len() > 0 {
let mut recv = peer.device.recv.write(); let mut recv = peer.device.recv.write();
for id in &release { for id in &release {
@@ -170,23 +175,73 @@ impl Drop for Device {
} }
} }
impl PeerInner {
pub fn keypair_confirm(&self, kp: Weak<KeyPair>) {
let mut keys = self.keys.lock();
// Attempt to upgrade Weak -> Arc
// (this should ensure that the key is in the key-wheel,
// which holds the only strong reference)
let kp = match kp.upgrade() {
Some(kp) => kp,
None => {
return;
}
};
debug_assert!(
keys.retired.is_none(),
"retired spot is not free for previous"
);
debug_assert!(
if let Some(key) = &keys.next {
Arc::ptr_eq(&kp, &key)
} else {
false
},
"if next has been overwritten, before confirmation, the key-pair should have been dropped!"
);
// enable use for encryption and set confirmed
*self.ekey.lock() = Some(EncryptionState {
id: kp.send.id,
key: kp.send.key,
nonce: 0,
death: kp.birth + REJECT_AFTER_TIME,
});
// rotate the key-wheel
let release = keys.previous.as_ref().map(|k| k.recv.id);
keys.previous = keys.current.as_ref().map(|v| v.clone());
keys.current = Some(kp.clone());
keys.retired = release;
}
}
/// Public interface and handle to the peer
impl Peer { impl Peer {
pub fn set_endpoint(&self, endpoint: SocketAddr) { pub fn set_endpoint(&self, endpoint: SocketAddr) {
*self.0.endpoint.lock() = Some(Arc::new(endpoint)) *self.0.endpoint.lock() = Some(Arc::new(endpoint))
} }
pub fn keypair_confirm(&self, ks: Arc<KeyPair>) { /// Add a new keypair
*self.0.ekey.lock() = Some(EncryptionState { ///
id: ks.send.id, /// # Arguments
key: ks.send.key, ///
nonce: 0, /// - new: The new confirmed/unconfirmed key pair
death: ks.birth + REJECT_AFTER_TIME, ///
}); /// # Returns
} ///
/// A vector of ids which has been released.
fn keypair_add(&self, new: KeyPair) -> Option<u32> { /// These should be released in the handshake module.
pub fn add_keypair(&self, new: KeyPair) -> Vec<u32> {
let mut keys = self.0.keys.lock(); let mut keys = self.0.keys.lock();
let release = keys.previous.map(|k| k.recv.id); let mut release = Vec::with_capacity(2);
// collect ids to be released
keys.retired.map(|v| release.push(v));
keys.previous.as_ref().map(|k| release.push(k.recv.id));
// update key-wheel // update key-wheel
if new.confirmed { if new.confirmed {
@@ -199,14 +254,37 @@ impl Peer {
}); });
// move current into previous // move current into previous
keys.previous = keys.current; keys.previous = keys.current.as_ref().map(|v| v.clone());;
keys.current = Some(new); keys.current = Some(Arc::new(new));
} else { } else {
// store the key and await confirmation // store the key and await confirmation
keys.previous = keys.next; keys.previous = keys.next.as_ref().map(|v| v.clone());;
keys.next = Some(new); keys.next = Some(Arc::new(new));
}; };
// update incoming packet id map
{
let mut recv = self.0.device.recv.write();
// purge recv map of released ids
for id in &release {
recv.remove(&id);
}
// map new id to keypair
debug_assert!(!recv.contains_key(&new.recv.id));
recv.insert(
new.recv.id,
DecryptionState {
key: new.recv.key,
protector: spin::Mutex::new(AntiReplay::new()),
peer: Arc::downgrade(&self.0),
death: new.birth + REJECT_AFTER_TIME,
},
);
}
// return the released id (for handshake state machine) // return the released id (for handshake state machine)
release release
} }
@@ -218,77 +296,52 @@ impl Peer {
pub fn tx_bytes(&self) -> u64 { pub fn tx_bytes(&self) -> u64 {
self.0.tx_bytes.load(Ordering::Relaxed) self.0.tx_bytes.load(Ordering::Relaxed)
} }
pub fn add_subnet(&self, ip: IpAddr, masklen: u32) {
match ip {
IpAddr::V4(v4) => {
self.0
.device
.ipv4
.write()
.insert(v4, masklen, Arc::downgrade(&self.0))
}
IpAddr::V6(v6) => {
self.0
.device
.ipv6
.write()
.insert(v6, masklen, Arc::downgrade(&self.0))
}
};
}
pub fn list_subnets(&self) -> Vec<(IpAddr, u32)> {
let mut res = Vec::new();
res.append(&mut treebit_list(
&self.0,
&self.0.device.ipv4,
Box::new(|ip, masklen| (IpAddr::V4(ip), masklen)),
));
res.append(&mut treebit_list(
&self.0,
&self.0.device.ipv6,
Box::new(|ip, masklen| (IpAddr::V6(ip), masklen)),
));
res
}
} }
impl Device { impl Device {
pub fn new(workers: usize) -> Device { pub fn new(workers: usize) -> Device {
Device(DeviceInner { Device(Arc::new(DeviceInner {
threads: vec![], threads: vec![],
stopped: AtomicBool::new(false), stopped: AtomicBool::new(false),
injector: Injector::new(), injector: Injector::new(),
recv: spin::RwLock::new(HashMap::new()), recv: spin::RwLock::new(HashMap::new()),
ipv4: spin::RwLock::new(IpLookupTable::new()), ipv4: spin::RwLock::new(IpLookupTable::new()),
ipv6: spin::RwLock::new(IpLookupTable::new()), ipv6: spin::RwLock::new(IpLookupTable::new()),
}) }))
}
pub fn add_subnet(&mut self, ip: IpAddr, masklen: u32, peer: Peer) {
match ip {
IpAddr::V4(v4) => self
.0
.ipv4
.write()
.insert(v4, masklen, Arc::downgrade(&peer.0)),
IpAddr::V6(v6) => self
.0
.ipv6
.write()
.insert(v6, masklen, Arc::downgrade(&peer.0)),
};
}
pub fn list_subnets(&self, peer: Peer) -> Vec<(IpAddr, u32)> {
let mut res = Vec::new();
res.append(&mut treebit_list(
&peer,
&self.0.ipv4,
Box::new(|ip, masklen| (IpAddr::V4(ip), masklen)),
));
res.append(&mut treebit_list(
&peer,
&self.0.ipv6,
Box::new(|ip, masklen| (IpAddr::V6(ip), masklen)),
));
res
}
pub fn keypair_add(&self, peer: Peer, new: KeyPair) -> Option<u32> {
// update key-wheel of peer
let release = peer.keypair_add(new);
// update incoming packet id map
let mut recv = self.0.recv.write();
// release id of previous keypair
if let Some(id) = release {
debug_assert!(recv.contains_key(&id));
recv.remove(&id);
};
// map new id to keypair
debug_assert!(!recv.contains_key(&new.recv.id));
recv.insert(
new.recv.id,
DecryptionState {
key: new.recv.key,
protector: Arc::new(spin::Mutex::new(AntiReplay::new())),
peer: Arc::downgrade(&peer.0),
death: new.birth + REJECT_AFTER_TIME,
},
);
release
} }
/// Adds a new peer to the device /// Adds a new peer to the device
@@ -296,7 +349,36 @@ impl Device {
/// # Returns /// # Returns
/// ///
/// A atomic ref. counted peer (with liftime matching the device) /// A atomic ref. counted peer (with liftime matching the device)
pub fn add(&mut self) -> () {} pub fn new_peer(&self) -> Peer {
// spawn inbound thread
let (send_inbound, recv_inbound) = sync_channel(1);
let handle_inbound = thread::spawn(move || {});
// spawn outbound thread
let (send_outbound, recv_inbound) = sync_channel(1);
let handle_outbound = thread::spawn(move || {});
// allocate peer object
Peer(Arc::new(PeerInner {
stopped: AtomicBool::new(false),
device: self.0.clone(),
ekey: spin::Mutex::new(None),
endpoint: spin::Mutex::new(None),
inorder_inbound: send_inbound,
inorder_outbound: send_outbound,
keys: spin::Mutex::new(KeyWheel {
next: None,
current: None,
previous: None,
retired: None,
}),
rx_bytes: AtomicU64::new(0),
tx_bytes: AtomicU64::new(0),
staged_packets: spin::Mutex::new(ArrayDeque::new()),
thread_inbound: spin::Mutex::new(handle_inbound),
thread_outbound: spin::Mutex::new(handle_outbound),
}))
}
/// Cryptkey routes and sends a plaintext message (IP packet) /// Cryptkey routes and sends a plaintext message (IP packet)
/// ///