Work on peer timers

This commit is contained in:
Mathias Hall-Andersen
2019-09-22 21:35:06 +02:00
parent 5cc1083499
commit 794933d6dd
8 changed files with 135 additions and 64 deletions

7
Cargo.lock generated
View File

@@ -378,7 +378,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "hjul"
version = "0.1.2"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"mio 0.6.19 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -1598,10 +1598,9 @@ dependencies = [
"futures-channel 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
"generic-array 0.12.3 (registry+https://github.com/rust-lang/crates.io-index)",
"hex 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)",
"hjul 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)",
"hjul 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
"hmac 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)",
"jemallocator 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)",
"lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
"num_cpus 1.10.1 (registry+https://github.com/rust-lang/crates.io-index)",
"parking_lot 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -1710,7 +1709,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
"checksum glob 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)" = "8be18de09a56b60ed0edf84bc9df007e30040691af7acd1c41874faac5895bfb"
"checksum heck 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "20564e78d53d2bb135c343b3f47714a56af2061f1c928fdb541dc7b9fdd94205"
"checksum hex 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "805026a5d0141ffc30abb3be3173848ad46a1b1664fe632428479619a3644d77"
"checksum hjul 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "ea9ffb9dc3e645a3fd2820b8c2f3e1c27f48586678f95cf287d75af018eba577"
"checksum hjul 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "151568250ae270811638e74dab09f4dfcb72d6c125f55cd0fa9b8feb0ef8c5fc"
"checksum hmac 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)" = "5dcb5e64cda4c23119ab41ba960d1e170a774c8e4b9d9e6a9bc18aabf5e59695"
"checksum humantime 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "3ca7e5f2e110db35f93b837c81797f3714500b81d517bf20c431b16d3ca4f114"
"checksum iovec 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "dbe6e417e7d0975db6512b90796e8ce223145ac4e33c377e4a42882a0e88bb08"

View File

@@ -16,14 +16,13 @@ generic-array = "0.12.3"
zerocopy = "0.2.7"
byteorder = "1.3.1"
digest = "0.8.0"
lazy_static = "^1.3"
tokio = "0.1.22"
futures = "0.1.28"
arraydeque = "0.4.5"
treebitmap = "^0.4"
crossbeam-deque = "0.7"
crossbeam-channel = "0.3.9"
hjul = "0.1.2"
hjul = "^0.2"
ring = "0.16.7"
chacha20poly1305 = "^0.1"
aead = "^0.1.1"

View File

@@ -9,3 +9,5 @@ pub const REJECT_AFTER_TIME: Duration = Duration::from_secs(180);
pub const REKEY_ATTEMPT_TIME: Duration = Duration::from_secs(90);
pub const REKEY_TIMEOUT: Duration = Duration::from_secs(5);
pub const KEEPALIVE_TIMEOUT: Duration = Duration::from_secs(10);
pub const MAX_TIMER_HANDSHAKES: usize = 18;

View File

@@ -1,5 +1,4 @@
use generic_array::GenericArray;
use lazy_static::lazy_static;
use rand::{CryptoRng, RngCore};
use spin::RwLock;
use std::time::{Duration, Instant};
@@ -27,9 +26,7 @@ const SIZE_SECRET: usize = 32;
const SIZE_MAC: usize = 16; // blake2s-mac128
const SIZE_TAG: usize = 16; // xchacha20poly1305 tag
lazy_static! {
pub static ref COOKIE_UPDATE_INTERVAL: Duration = Duration::new(120, 0);
}
const COOKIE_UPDATE_INTERVAL: Duration = Duration::from_secs(120);
macro_rules! HASH {
( $($input:expr),* ) => {{
@@ -168,7 +165,7 @@ impl Generator {
macs.f_mac1 = MAC!(&self.mac1_key, inner);
macs.f_mac2 = match &self.cookie {
Some(cookie) => {
if cookie.birth.elapsed() > *COOKIE_UPDATE_INTERVAL {
if cookie.birth.elapsed() > COOKIE_UPDATE_INTERVAL {
self.cookie = None;
[0u8; SIZE_MAC]
} else {
@@ -206,7 +203,7 @@ impl Validator {
fn get_tau(&self, src: &[u8]) -> Option<[u8; SIZE_COOKIE]> {
let secret = self.secret.read();
if secret.birth.elapsed() < *COOKIE_UPDATE_INTERVAL {
if secret.birth.elapsed() < COOKIE_UPDATE_INTERVAL {
Some(MAC!(&secret.value, src))
} else {
None
@@ -217,7 +214,7 @@ impl Validator {
// check if current value is still valid
{
let secret = self.secret.read();
if secret.birth.elapsed() < *COOKIE_UPDATE_INTERVAL {
if secret.birth.elapsed() < COOKIE_UPDATE_INTERVAL {
return MAC!(&secret.value, src);
};
}
@@ -225,7 +222,7 @@ impl Validator {
// take write lock, check again
{
let mut secret = self.secret.write();
if secret.birth.elapsed() < *COOKIE_UPDATE_INTERVAL {
if secret.birth.elapsed() < COOKIE_UPDATE_INTERVAL {
return MAC!(&secret.value, src);
};

View File

@@ -1,4 +1,3 @@
use lazy_static::lazy_static;
use spin::Mutex;
use std::mem;
@@ -18,9 +17,7 @@ use super::macs;
use super::timestamp;
use super::types::*;
lazy_static! {
pub static ref TIME_BETWEEN_INITIATIONS: Duration = Duration::from_millis(20);
}
const TIME_BETWEEN_INITIATIONS: Duration = Duration::from_millis(20);
/* Represents the recomputation and state of a peer.
*
@@ -123,7 +120,7 @@ impl Peer {
// check flood attack
match *last_initiation_consumption {
Some(last) => {
if last.elapsed() < *TIME_BETWEEN_INITIATIONS {
if last.elapsed() < TIME_BETWEEN_INITIATIONS {
return Err(HandshakeError::InitiationFlood);
}
}

View File

@@ -6,16 +6,12 @@ use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::{Duration, Instant};
use lazy_static::lazy_static;
const PACKETS_PER_SECOND: u64 = 20;
const PACKETS_BURSTABLE: u64 = 5;
const PACKET_COST: u64 = 1_000_000_000 / PACKETS_PER_SECOND;
const MAX_TOKENS: u64 = PACKET_COST * PACKETS_BURSTABLE;
lazy_static! {
pub static ref GC_INTERVAL: Duration = Duration::new(1, 0);
}
const GC_INTERVAL: Duration = Duration::from_secs(1);
struct Entry {
pub last_time: Instant,
@@ -93,7 +89,7 @@ impl RateLimiter {
{
let mut tw = limiter.table.write();
tw.retain(|_, ref mut entry| {
entry.lock().last_time.elapsed() <= *GC_INTERVAL
entry.lock().last_time.elapsed() <= GC_INTERVAL
});
if tw.len() == 0 {
limiter.gc_running.store(false, Ordering::Relaxed);
@@ -102,7 +98,7 @@ impl RateLimiter {
}
// wait until stopped or new GC (~1 every sec)
let res = cvar.wait_timeout(dropped, *GC_INTERVAL).unwrap();
let res = cvar.wait_timeout(dropped, GC_INTERVAL).unwrap();
dropped = res.0;
}
});

View File

@@ -1,14 +1,15 @@
use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
use std::marker::PhantomData;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::time::Duration;
use hjul::{Runner, Timer};
use crate::constants::*;
use crate::router::Callbacks;
use crate::types::{Bind, Tun};
use crate::wireguard::Peer;
const ZERO_DURATION: Duration = Duration::from_micros(0);
pub struct TimersInner {
pub struct Timers {
handshake_pending: AtomicBool,
handshake_attempts: AtomicUsize,
@@ -16,24 +17,80 @@ pub struct TimersInner {
send_keepalive: Timer,
zero_key_material: Timer,
new_handshake: Timer,
// stats
rx_bytes: AtomicU64,
tx_bytes: AtomicU64,
}
impl TimersInner {
pub fn new(runner: &Runner) -> Timers {
Arc::new(TimersInner {
impl Timers {
pub fn new<T, B>(runner: &Runner, peer: Peer<T, B>) -> Timers
where
T: Tun,
B: Bind,
{
// create a timer instance for the provided peer
Timers {
handshake_pending: AtomicBool::new(false),
handshake_attempts: AtomicUsize::new(0),
retransmit_handshake: {
let peer = peer.clone();
runner.timer(move || {
if peer.timers.read().handshake_retry() {
peer.new_handshake();
}
})
},
new_handshake: {
let peer = peer.clone();
runner.timer(move || {
peer.new_handshake();
peer.timers.read().handshake_begun();
})
},
send_keepalive: {
let peer = peer.clone();
runner.timer(move || {
peer.router.keepalive();
let keepalive = peer.keepalive.load(Ordering::Acquire);
if keepalive > 0 {
peer.timers
.read()
.send_keepalive
.reset(Duration::from_secs(keepalive as u64))
}
})
},
zero_key_material: {
let peer = peer.clone();
runner.timer(move || {
peer.router.zero_keys();
})
},
}
}
fn handshake_begun(&self) {
self.handshake_pending.store(true, Ordering::SeqCst);
self.handshake_attempts.store(0, Ordering::SeqCst);
self.retransmit_handshake.reset(REKEY_TIMEOUT);
}
fn handshake_retry(&self) -> bool {
if self.handshake_attempts.fetch_add(1, Ordering::SeqCst) <= MAX_TIMER_HANDSHAKES {
self.retransmit_handshake.reset(REKEY_TIMEOUT);
true
} else {
self.handshake_pending.store(false, Ordering::SeqCst);
false
}
}
pub fn dummy(runner: &Runner) -> Timers {
Timers {
handshake_pending: AtomicBool::new(false),
handshake_attempts: AtomicUsize::new(0),
retransmit_handshake: runner.timer(|| {}),
new_handshake: runner.timer(|| {}),
send_keepalive: runner.timer(|| {}),
zero_key_material: runner.timer(|| {}),
rx_bytes: AtomicU64::new(0),
tx_bytes: AtomicU64::new(0),
})
}
}
pub fn handshake_sent(&self) {
@@ -41,25 +98,26 @@ impl TimersInner {
}
}
pub type Timers = Arc<TimersInner>;
/* Instance of the router callbacks */
pub struct Events();
pub struct Events<T, B>(PhantomData<(T, B)>);
impl Callbacks for Events {
type Opaque = Timers;
impl<T: Tun, B: Bind> Callbacks for Events<T, B> {
type Opaque = Peer<T, B>;
fn send(t: &Timers, size: usize, data: bool, sent: bool) {
t.tx_bytes.fetch_add(size as u64, Ordering::Relaxed);
fn send(peer: &Peer<T, B>, size: usize, data: bool, sent: bool) {
peer.tx_bytes.fetch_add(size as u64, Ordering::Relaxed);
}
fn recv(t: &Timers, size: usize, data: bool, sent: bool) {
t.rx_bytes.fetch_add(size as u64, Ordering::Relaxed);
fn recv(peer: &Peer<T, B>, size: usize, data: bool, sent: bool) {
peer.rx_bytes.fetch_add(size as u64, Ordering::Relaxed);
}
fn need_key(t: &Timers) {
if !t.handshake_pending.swap(true, Ordering::SeqCst) {
t.handshake_attempts.store(0, Ordering::SeqCst);
t.new_handshake.reset(ZERO_DURATION);
fn need_key(peer: &Peer<T, B>) {
let timers = peer.timers.read();
if !timers.handshake_pending.swap(true, Ordering::SeqCst) {
timers.handshake_attempts.store(0, Ordering::SeqCst);
timers.new_handshake.fire();
}
}
}

View File

@@ -22,27 +22,36 @@ const SIZE_HANDSHAKE_QUEUE: usize = 128;
const THRESHOLD_UNDER_LOAD: usize = SIZE_HANDSHAKE_QUEUE / 4;
const DURATION_UNDER_LOAD: Duration = Duration::from_millis(10_000);
type Peer<T: Tun, B: Bind> = Arc<PeerInner<T, B>>;
pub type Peer<T: Tun, B: Bind> = Arc<PeerInner<T, B>>;
pub struct PeerInner<T: Tun, B: Bind> {
queue: Mutex<Sender<HandshakeJob<B::Endpoint>>>, // handshake queue
router: router::Peer<Events, T, B>, // router peer
timers: Option<Timers>, //
pub keepalive: AtomicUsize, // keepalive interval
pub rx_bytes: AtomicU64,
pub tx_bytes: AtomicU64,
pub pk: PublicKey, // DISCUSS: Change layout in handshake module (adopt pattern of router), to avoid this.
pub queue: Mutex<Sender<HandshakeJob<B::Endpoint>>>, // handshake queue
pub router: router::Peer<Events<T, B>, T, B>, // router peer
pub timers: RwLock<Timers>, //
}
impl<T: Tun, B: Bind> PeerInner<T, B> {
#[inline(always)]
fn timers(&self) -> &Timers {
self.timers.as_ref().unwrap()
pub fn new_handshake(&self) {
self.queue.lock().send(HandshakeJob::New(self.pk)).unwrap();
}
}
macro_rules! timers {
($peer:expr) => {
$peer.timers.read()
};
}
struct Handshake {
device: handshake::Device,
active: bool,
}
enum HandshakeJob<E> {
pub enum HandshakeJob<E> {
Message(Vec<u8>, E),
New(PublicKey),
}
@@ -51,8 +60,8 @@ struct WireguardInner<T: Tun, B: Bind> {
// identify and configuration map
peers: RwLock<HashMap<[u8; 32], Peer<T, B>>>,
// cryptkey routing
router: router::Device<Events, T, B>,
// cryptkey router
router: router::Device<Events<T, B>, T, B>,
// handshake related state
handshake: RwLock<Handshake>,
@@ -84,6 +93,20 @@ impl<T: Tun, B: Bind> Wireguard<T, B> {
}
}
/*
fn new_peer(&self, pk: PublicKey) -> Peer<T, B> {
let router = self.state.router.new_peer();
Arc::new(PeerInner {
pk,
queue: Mutex::new(self.state.queue.lock().clone()),
keepalive: AtomicUsize::new(0),
rx_bytes: AtomicU64::new(0),
tx_bytes: AtomicU64::new(0),
})
}
*/
fn new(tun: T, bind: B) -> Wireguard<T, B> {
// create device state
let mut rng = OsRng::new().unwrap();
@@ -166,7 +189,7 @@ impl<T: Tun, B: Bind> Wireguard<T, B> {
let msg = state.device.begin(&mut rng, &pk).unwrap(); // TODO handle
if let Some(peer) = wg.peers.read().get(pk.as_bytes()) {
peer.router.send(&msg[..]);
peer.timers().handshake_sent();
timers!(peer).handshake_sent();
}
}
}