WIP: Work on handshake worker

This commit is contained in:
Mathias Hall-Andersen
2019-09-16 22:33:46 +02:00
parent 32c030367c
commit dfe4a22920
5 changed files with 76 additions and 24 deletions

View File

@@ -4,7 +4,7 @@ use std::net::SocketAddr;
use std::sync::Mutex; use std::sync::Mutex;
use zerocopy::AsBytes; use zerocopy::AsBytes;
use byteorder::{LittleEndian, ByteOrder}; use byteorder::{ByteOrder, LittleEndian};
use rand::prelude::*; use rand::prelude::*;
@@ -35,7 +35,7 @@ pub struct Device<T> {
*/ */
impl<T> Device<T> impl<T> Device<T>
where where
T: Copy, T: Clone,
{ {
/// Initialize a new handshake state machine /// Initialize a new handshake state machine
/// ///
@@ -270,7 +270,7 @@ where
// return unconfirmed keypair and the response as vector // return unconfirmed keypair and the response as vector
Ok(( Ok((
Some(peer.identifier), Some(peer.identifier.clone()),
Some(resp.as_bytes().to_owned()), Some(resp.as_bytes().to_owned()),
Some(keys), Some(keys),
)) ))

View File

@@ -18,4 +18,4 @@ mod types;
// publicly exposed interface // publicly exposed interface
pub use device::Device; pub use device::Device;
pub use messages::{TYPE_COOKIE_REPLY, TYPE_INITIATION, TYPE_RESPONSE }; pub use messages::{TYPE_COOKIE_REPLY, TYPE_INITIATION, TYPE_RESPONSE};

View File

@@ -215,7 +215,7 @@ mod tests {
} }
} }
pub fn create_initiation<T: Copy, R: RngCore + CryptoRng>( pub fn create_initiation<T: Clone, R: RngCore + CryptoRng>(
rng: &mut R, rng: &mut R,
device: &Device<T>, device: &Device<T>,
peer: &Peer<T>, peer: &Peer<T>,
@@ -296,7 +296,7 @@ pub fn create_initiation<T: Copy, R: RngCore + CryptoRng>(
}) })
} }
pub fn consume_initiation<'a, T: Copy>( pub fn consume_initiation<'a, T: Clone>(
device: &'a Device<T>, device: &'a Device<T>,
msg: &NoiseInitiation, msg: &NoiseInitiation,
) -> Result<(&'a Peer<T>, TemporaryState), HandshakeError> { ) -> Result<(&'a Peer<T>, TemporaryState), HandshakeError> {
@@ -370,7 +370,7 @@ pub fn consume_initiation<'a, T: Copy>(
}) })
} }
pub fn create_response<T: Copy, R: RngCore + CryptoRng>( pub fn create_response<T: Clone, R: RngCore + CryptoRng>(
rng: &mut R, rng: &mut R,
peer: &Peer<T>, peer: &Peer<T>,
sender: u32, // sending identifier sender: u32, // sending identifier
@@ -456,7 +456,7 @@ pub fn create_response<T: Copy, R: RngCore + CryptoRng>(
* allow concurrent processing of potential responses to the initiation, * allow concurrent processing of potential responses to the initiation,
* in order to better mitigate DoS from malformed response messages. * in order to better mitigate DoS from malformed response messages.
*/ */
pub fn consume_response<T: Copy>( pub fn consume_response<T: Clone>(
device: &Device<T>, device: &Device<T>,
msg: &NoiseResponse, msg: &NoiseResponse,
) -> Result<Output<T>, HandshakeError> { ) -> Result<Output<T>, HandshakeError> {
@@ -530,7 +530,7 @@ pub fn consume_response<T: Copy>(
// return confirmed key-pair // return confirmed key-pair
Ok(( Ok((
Some(peer.identifier), Some(peer.identifier.clone()),
None, None,
Some(KeyPair { Some(KeyPair {
birth, birth,

View File

@@ -55,19 +55,19 @@ pub enum State {
impl Drop for State { impl Drop for State {
fn drop(&mut self) { fn drop(&mut self) {
match self { match self {
State::InitiationSent{hs, ck, ..} => { State::InitiationSent { hs, ck, .. } => {
// eph_sk already cleared by dalek-x25519 // eph_sk already cleared by dalek-x25519
hs.clear(); hs.clear();
ck.clear(); ck.clear();
}, }
_ => () _ => (),
} }
} }
} }
impl<T> Peer<T> impl<T> Peer<T>
where where
T: Copy, T: Clone,
{ {
pub fn new( pub fn new(
identifier: T, // external identifier identifier: T, // external identifier
@@ -141,4 +141,4 @@ where
*last_initiation_consumption = Some(Instant::now()); *last_initiation_consumption = Some(Instant::now());
Ok(()) Ok(())
} }
} }

View File

@@ -1,13 +1,15 @@
use crate::handshake; use crate::handshake;
use crate::router; use crate::router;
use crate::types::{Bind, Tun}; use crate::types::{Bind, Endpoint, Tun};
use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::sync_channel;
use std::sync::Arc; use std::sync::Arc;
use std::thread; use std::thread;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use log::debug;
use rand::rngs::OsRng;
use byteorder::{ByteOrder, LittleEndian}; use byteorder::{ByteOrder, LittleEndian};
use crossbeam_channel::bounded; use crossbeam_channel::bounded;
use x25519_dalek::StaticSecret; use x25519_dalek::StaticSecret;
@@ -16,6 +18,14 @@ const SIZE_HANDSHAKE_QUEUE: usize = 128;
const THRESHOLD_UNDER_LOAD: usize = SIZE_HANDSHAKE_QUEUE / 4; const THRESHOLD_UNDER_LOAD: usize = SIZE_HANDSHAKE_QUEUE / 4;
const DURATION_UNDER_LOAD: Duration = Duration::from_millis(10_000); const DURATION_UNDER_LOAD: Duration = Duration::from_millis(10_000);
#[derive(Clone)]
pub struct Peer<T: Tun, B: Bind>(Arc<PeerInner<T, B>>);
pub struct PeerInner<T: Tun, B: Bind> {
peer: router::Peer<Events, T, B>,
timers: Timers,
}
pub struct Timers {} pub struct Timers {}
pub struct Events(); pub struct Events();
@@ -38,7 +48,7 @@ pub struct Wireguard<T: Tun, B: Bind> {
impl<T: Tun, B: Bind> Wireguard<T, B> { impl<T: Tun, B: Bind> Wireguard<T, B> {
fn start(&self) {} fn start(&self) {}
fn new(tun: T, bind: B) -> Wireguard<T, B> { fn new(tun: T, bind: B, sk: StaticSecret) -> Wireguard<T, B> {
let router = Arc::new(router::Device::new( let router = Arc::new(router::Device::new(
num_cpus::get(), num_cpus::get(),
tun.clone(), tun.clone(),
@@ -46,11 +56,14 @@ impl<T: Tun, B: Bind> Wireguard<T, B> {
)); ));
let handshake_staged = Arc::new(AtomicUsize::new(0)); let handshake_staged = Arc::new(AtomicUsize::new(0));
let handshake_device: Arc<handshake::Device<Peer<T, B>>> =
Arc::new(handshake::Device::new(sk));
// start UDP read IO thread // start UDP read IO thread
let (handshake_tx, handshake_rx) = bounded(128); let (handshake_tx, handshake_rx) = bounded(128);
{ {
let tun = tun.clone(); let tun = tun.clone();
let bind = bind.clone();
thread::spawn(move || { thread::spawn(move || {
let mut under_load = let mut under_load =
Instant::now() - DURATION_UNDER_LOAD - Duration::from_millis(1000); Instant::now() - DURATION_UNDER_LOAD - Duration::from_millis(1000);
@@ -81,11 +94,9 @@ impl<T: Tun, B: Bind> Wireguard<T, B> {
} }
// pass source address along if under load // pass source address along if under load
if under_load.elapsed() < DURATION_UNDER_LOAD { handshake_tx
handshake_tx.send((msg, Some(src))).unwrap(); .send((msg, src, under_load.elapsed() < DURATION_UNDER_LOAD))
} else { .unwrap();
handshake_tx.send((msg, None)).unwrap();
}
} }
router::TYPE_TRANSPORT => { router::TYPE_TRANSPORT => {
// transport message // transport message
@@ -98,9 +109,50 @@ impl<T: Tun, B: Bind> Wireguard<T, B> {
// start handshake workers // start handshake workers
for _ in 0..num_cpus::get() { for _ in 0..num_cpus::get() {
let bind = bind.clone();
let handshake_rx = handshake_rx.clone(); let handshake_rx = handshake_rx.clone();
thread::spawn(move || loop { let handshake_device = handshake_device.clone();
let (msg, src) = handshake_rx.recv().unwrap(); // TODO handle error thread::spawn(move || {
// prepare OsRng instance for this thread
let mut rng = OsRng::new().unwrap();
// process elements from the handshake queue
for (msg, src, under_load) in handshake_rx {
// feed message to handshake device
let src_validate = (&src).into_address(); // TODO avoid
match handshake_device.process(
&mut rng,
&msg[..],
if under_load {
Some(&src_validate)
} else {
None
},
) {
Ok((identity, msg, keypair)) => {
// send response
if let Some(msg) = msg {
let _ = bind.send(&msg[..], &src).map_err(|e| {
debug!(
"handshake worker, failed to send response, error = {:?}",
e
)
});
}
// update timers
if let Some(identity) = identity {
// add keypair to peer and free any unused ids
if let Some(keypair) = keypair {
for id in identity.0.peer.add_keypair(keypair) {
handshake_device.release(id);
}
}
}
}
Err(e) => debug!("handshake worker, error = {:?}", e),
}
}
}); });
} }