WIP: Handshake queue and workers

This commit is contained in:
Mathias Hall-Andersen
2019-09-15 21:10:23 +02:00
parent f46762183a
commit 32c030367c
4 changed files with 92 additions and 28 deletions

10
Cargo.lock generated
View File

@@ -177,6 +177,14 @@ dependencies = [
"bitflags 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "crossbeam-channel"
version = "0.3.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"crossbeam-utils 0.6.6 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "crossbeam-deque"
version = "0.7.1"
@@ -1582,6 +1590,7 @@ dependencies = [
"byteorder 1.3.2 (registry+https://github.com/rust-lang/crates.io-index)",
"chacha20poly1305 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"clear_on_drop 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)",
"crossbeam-channel 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)",
"crossbeam-deque 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)",
"digest 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)",
"env_logger 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -1676,6 +1685,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
"checksum chacha20poly1305 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "40cd3ddeae0b0ea7fe848a06e4fbf3f02463648b9395bd1139368ce42b44543e"
"checksum clear_on_drop 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "97276801e127ffb46b66ce23f35cc96bd454fa311294bced4bbace7baa8b1d17"
"checksum cloudabi 0.0.3 (registry+https://github.com/rust-lang/crates.io-index)" = "ddfc5b9aa5d4507acaf872de71051dfd0e309860e88966e1051e462a077aac4f"
"checksum crossbeam-channel 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)" = "c8ec7fcd21571dc78f96cc96243cab8d8f035247c3efd16c687be154c3fa9efa"
"checksum crossbeam-deque 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)" = "b18cd2e169ad86297e6bc0ad9aa679aee9daa4f19e8163860faf7c164e4f5a71"
"checksum crossbeam-epoch 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)" = "fedcd6772e37f3da2a9af9bf12ebe046c0dfe657992377b4df982a2b54cd37a9"
"checksum crossbeam-queue 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7c979cd6cfe72335896575c6b5688da489e420d36a27a0b9eb0c73db574b4a4b"

View File

@@ -22,6 +22,7 @@ futures = "0.1.28"
arraydeque = "0.4.5"
treebitmap = "^0.4"
crossbeam-deque = "0.7"
crossbeam-channel = "0.3.9"
hjul = "0.1.2"
ring = "0.16.7"
chacha20poly1305 = "^0.1"

View File

@@ -308,28 +308,40 @@ impl<C: Callbacks, T: Tun, B: Bind> PeerInner<C, T, B> {
let mut header: LayoutVerified<&mut [u8], TransportHeader> = header;
// check if has key
let key = match self.ekey.lock().as_mut() {
None => {
// add to staged packets (create no job)
debug!("execute callback: call_need_key");
C::need_key(&self.opaque);
self.staged_packets.lock().push_back(msg);
return None;
}
Some(mut state) => {
// avoid integer overflow in nonce
if state.nonce >= REJECT_AFTER_MESSAGES - 1 {
return None;
}
debug!("encryption state available, nonce = {}", state.nonce);
let key = {
let mut ekey = self.ekey.lock();
let key = match ekey.as_mut() {
None => None,
Some(mut state) => {
// avoid integer overflow in nonce
if state.nonce >= REJECT_AFTER_MESSAGES - 1 {
*ekey = None;
None
} else {
// there should be no stacked packets lingering around
debug_assert_eq!(self.staged_packets.lock().len(), 0);
debug!("encryption state available, nonce = {}", state.nonce);
// set transport message fields
header.f_counter.set(state.nonce);
header.f_receiver.set(state.id);
state.nonce += 1;
state.key
}
};
// set transport message fields
header.f_counter.set(state.nonce);
header.f_receiver.set(state.id);
state.nonce += 1;
Some(state.key)
}
}
};
// If not suitable key was found:
// 1. Stage packet for later transmission
// 2. Request new key
if key.is_none() {
self.staged_packets.lock().push_back(msg);
C::need_key(&self.opaque);
return None;
};
key
}?;
// add job to in-order queue and return sendeer to device for inclusion in worker pool
let (tx, rx) = oneshot();

View File

@@ -2,12 +2,20 @@ use crate::handshake;
use crate::router;
use crate::types::{Bind, Tun};
use byteorder::{ByteOrder, LittleEndian};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::sync_channel;
use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant};
use byteorder::{ByteOrder, LittleEndian};
use crossbeam_channel::bounded;
use x25519_dalek::StaticSecret;
const SIZE_HANDSHAKE_QUEUE: usize = 128;
const THRESHOLD_UNDER_LOAD: usize = SIZE_HANDSHAKE_QUEUE / 4;
const DURATION_UNDER_LOAD: Duration = Duration::from_millis(10_000);
pub struct Timers {}
pub struct Events();
@@ -23,18 +31,30 @@ impl router::Callbacks for Events {
}
pub struct Wireguard<T: Tun, B: Bind> {
router: router::Device<Events, T, B>,
handshake: Option<handshake::Device<()>>,
router: Arc<router::Device<Events, T, B>>,
handshake: Option<Arc<handshake::Device<()>>>,
}
impl<T: Tun, B: Bind> Wireguard<T, B> {
fn start(&self) {}
fn new(tun: T, bind: B) -> Wireguard<T, B> {
let router = router::Device::new(num_cpus::get(), tun.clone(), bind.clone());
let router = Arc::new(router::Device::new(
num_cpus::get(),
tun.clone(),
bind.clone(),
));
let handshake_staged = Arc::new(AtomicUsize::new(0));
// start UDP read IO thread
let (handshake_tx, handshake_rx) = bounded(128);
{
let tun = tun.clone();
thread::spawn(move || {
let mut under_load =
Instant::now() - DURATION_UNDER_LOAD - Duration::from_millis(1000);
loop {
// read UDP packet into vector
let size = tun.mtu() + 148; // maximum message size
@@ -45,14 +65,27 @@ impl<T: Tun, B: Bind> Wireguard<T, B> {
msg.truncate(size);
// message type de-multiplexer
if msg.len() < 4 {
if msg.len() < std::mem::size_of::<u32>() {
continue;
}
match LittleEndian::read_u32(&msg[..]) {
handshake::TYPE_COOKIE_REPLY
| handshake::TYPE_INITIATION
| handshake::TYPE_RESPONSE => {
// handshake message
// detect if under load
if handshake_staged.fetch_add(1, Ordering::SeqCst)
> THRESHOLD_UNDER_LOAD
{
under_load = Instant::now()
}
// pass source address along if under load
if under_load.elapsed() < DURATION_UNDER_LOAD {
handshake_tx.send((msg, Some(src))).unwrap();
} else {
handshake_tx.send((msg, None)).unwrap();
}
}
router::TYPE_TRANSPORT => {
// transport message
@@ -63,6 +96,14 @@ impl<T: Tun, B: Bind> Wireguard<T, B> {
});
}
// start handshake workers
for _ in 0..num_cpus::get() {
let handshake_rx = handshake_rx.clone();
thread::spawn(move || loop {
let (msg, src) = handshake_rx.recv().unwrap(); // TODO handle error
});
}
// start TUN read IO thread
thread::spawn(move || {});