Fixed typo in under load code

This commit is contained in:
Mathias Hall-Andersen
2019-12-27 18:01:11 +01:00
parent 956c3f02d4
commit d4f5d5b723
6 changed files with 69 additions and 25 deletions

View File

@@ -41,6 +41,7 @@ features = ["nightly"]
[features] [features]
profiler = ["cpuprofiler"] profiler = ["cpuprofiler"]
start_up = []
[dev-dependencies] [dev-dependencies]
pnet = "^0.22" pnet = "^0.22"

View File

@@ -299,7 +299,10 @@ impl LinuxTunStatus {
Err(LinuxTunError::Closed) Err(LinuxTunError::Closed)
} else { } else {
Ok(LinuxTunStatus { Ok(LinuxTunStatus {
events: vec![TunEvent::Up(1500)], events: vec![
#[cfg(feature = "start_up")]
TunEvent::Up(1500),
],
index: get_ifindex(&name), index: get_ifindex(&name),
fd, fd,
name, name,

View File

@@ -4,8 +4,8 @@ use super::timers::{Events, Timers};
use super::tun::Tun; use super::tun::Tun;
use super::udp::UDP; use super::udp::UDP;
use super::wireguard::WireGuard;
use super::constants::REKEY_TIMEOUT; use super::constants::REKEY_TIMEOUT;
use super::wireguard::WireGuard;
use super::workers::HandshakeJob; use super::workers::HandshakeJob;
use std::fmt; use std::fmt;
@@ -60,21 +60,31 @@ impl<T: Tun, B: UDP> PeerInner<T, B> {
* The function is ratelimited. * The function is ratelimited.
*/ */
pub fn packet_send_handshake_initiation(&self) { pub fn packet_send_handshake_initiation(&self) {
// the function is rate limited log::trace!("{} : packet_send_handshake_initiation", self);
// the function is rate limited
{ {
let mut lhs = self.last_handshake_sent.lock(); let mut lhs = self.last_handshake_sent.lock();
if lhs.elapsed() < REKEY_TIMEOUT { if lhs.elapsed() < REKEY_TIMEOUT {
log::trace!("{} : packet_send_handshake_initiation, rate-limited!", self);
return; return;
} }
*lhs = Instant::now(); *lhs = Instant::now();
} }
// create a new handshake job for the peer // create a new handshake job for the peer
if !self.handshake_queued.swap(true, Ordering::SeqCst) { if !self.handshake_queued.swap(true, Ordering::SeqCst) {
self.wg.pending.fetch_add(1, Ordering::SeqCst); self.wg.pending.fetch_add(1, Ordering::SeqCst);
self.wg.queue.send(HandshakeJob::New(self.pk)); self.wg.queue.send(HandshakeJob::New(self.pk));
log::trace!(
"{} : packet_send_handshake_initiation, handshake queued",
self
);
} else {
log::trace!(
"{} : packet_send_handshake_initiation, handshake already queued",
self
);
} }
} }
@@ -89,6 +99,12 @@ impl<T: Tun, B: UDP> PeerInner<T, B> {
} }
} }
impl<T: Tun, B: UDP> fmt::Display for PeerInner<T, B> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "peer(id = {})", self.id)
}
}
impl<T: Tun, B: UDP> fmt::Display for Peer<T, B> { impl<T: Tun, B: UDP> fmt::Display for Peer<T, B> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "peer(id = {})", self.id) write!(f, "peer(id = {})", self.id)

View File

@@ -232,7 +232,7 @@ impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> Peer<E, C, T,
match staged.pop_front() { match staged.pop_front() {
Some(msg) => { Some(msg) => {
sent = true; sent = true;
self.send_raw(msg); self.send_raw(msg, false);
} }
None => break sent, None => break sent,
} }
@@ -240,10 +240,11 @@ impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> Peer<E, C, T,
} }
// Treat the msg as the payload of a transport message // Treat the msg as the payload of a transport message
// Unlike device.send, peer.send_raw does not buffer messages when a key is not available. //
fn send_raw(&self, msg: Vec<u8>) -> bool { // Returns true if the message was queued for transmission.
fn send_raw(&self, msg: Vec<u8>, stage: bool) -> bool {
log::debug!("peer.send_raw"); log::debug!("peer.send_raw");
match self.send_job(msg, false) { match self.send_job(msg, stage) {
Some(job) => { Some(job) => {
self.device.queue_outbound.send(job); self.device.queue_outbound.send(job);
debug!("send_raw: got obtained send_job"); debug!("send_raw: got obtained send_job");
@@ -300,7 +301,11 @@ impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> Peer<E, C, T,
} }
pub fn send_job(&self, msg: Vec<u8>, stage: bool) -> Option<Job<Self, Outbound>> { pub fn send_job(&self, msg: Vec<u8>, stage: bool) -> Option<Job<Self, Outbound>> {
debug!("peer.send_job"); debug!(
"peer.send_job, msg.len() = {}, stage = {}",
msg.len(),
stage
);
debug_assert!( debug_assert!(
msg.len() >= mem::size_of::<TransportHeader>(), msg.len() >= mem::size_of::<TransportHeader>(),
"received message with size: {:}", "received message with size: {:}",
@@ -333,6 +338,7 @@ impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> Peer<E, C, T,
// 1. Stage packet for later transmission // 1. Stage packet for later transmission
// 2. Request new key // 2. Request new key
if keypair.is_none() && stage { if keypair.is_none() && stage {
log::trace!("packet staged");
self.staged_packets.lock().push_back(msg); self.staged_packets.lock().push_back(msg);
C::need_key(&self.opaque); C::need_key(&self.opaque);
return None; return None;
@@ -491,7 +497,7 @@ impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> PeerHandle<E,
pub fn send_keepalive(&self) -> bool { pub fn send_keepalive(&self) -> bool {
debug!("peer.send_keepalive"); debug!("peer.send_keepalive");
self.peer.send_raw(vec![0u8; SIZE_MESSAGE_PREFIX]) self.peer.send_raw(vec![0u8; SIZE_MESSAGE_PREFIX], true)
} }
/// Map a subnet to the peer /// Map a subnet to the peer

View File

@@ -80,7 +80,7 @@ impl<T: Tun, B: UDP> PeerInner<T, B> {
if timers.keepalive_interval > 0 { if timers.keepalive_interval > 0 {
timers timers
.send_persistent_keepalive .send_persistent_keepalive
.start(Duration::from_secs(timers.keepalive_interval)); .start(Duration::from_secs(0));
} }
} }
@@ -108,6 +108,7 @@ impl<T: Tun, B: UDP> PeerInner<T, B> {
* - handshake * - handshake
*/ */
pub fn timers_any_authenticated_packet_sent(&self) { pub fn timers_any_authenticated_packet_sent(&self) {
log::trace!("timers_any_authenticated_packet_sent");
let timers = self.timers(); let timers = self.timers();
if timers.enabled { if timers.enabled {
timers.send_keepalive.stop() timers.send_keepalive.stop()
@@ -120,6 +121,7 @@ impl<T: Tun, B: UDP> PeerInner<T, B> {
* - handshake * - handshake
*/ */
pub fn timers_any_authenticated_packet_received(&self) { pub fn timers_any_authenticated_packet_received(&self) {
log::trace!("timers_any_authenticated_packet_received");
let timers = self.timers(); let timers = self.timers();
if timers.enabled { if timers.enabled {
timers.new_handshake.stop(); timers.new_handshake.stop();
@@ -128,6 +130,7 @@ impl<T: Tun, B: UDP> PeerInner<T, B> {
/* Should be called after a handshake initiation message is sent. */ /* Should be called after a handshake initiation message is sent. */
pub fn timers_handshake_initiated(&self) { pub fn timers_handshake_initiated(&self) {
log::trace!("timers_handshake_initiated");
let timers = self.timers(); let timers = self.timers();
if timers.enabled { if timers.enabled {
timers.send_keepalive.stop(); timers.send_keepalive.stop();
@@ -139,6 +142,7 @@ impl<T: Tun, B: UDP> PeerInner<T, B> {
* or when getting key confirmation via the first data message. * or when getting key confirmation via the first data message.
*/ */
pub fn timers_handshake_complete(&self) { pub fn timers_handshake_complete(&self) {
log::trace!("timers_handshake_complete");
let timers = self.timers(); let timers = self.timers();
if timers.enabled { if timers.enabled {
timers.retransmit_handshake.stop(); timers.retransmit_handshake.stop();
@@ -154,6 +158,7 @@ impl<T: Tun, B: UDP> PeerInner<T, B> {
* handshake response or after receiving a handshake response. * handshake response or after receiving a handshake response.
*/ */
pub fn timers_session_derived(&self) { pub fn timers_session_derived(&self) {
log::trace!("timers_session_derived");
let timers = self.timers(); let timers = self.timers();
if timers.enabled { if timers.enabled {
timers.zero_key_material.reset(REJECT_AFTER_TIME * 3); timers.zero_key_material.reset(REJECT_AFTER_TIME * 3);
@@ -164,6 +169,7 @@ impl<T: Tun, B: UDP> PeerInner<T, B> {
* keepalive, data, or handshake is sent, or after one is received. * keepalive, data, or handshake is sent, or after one is received.
*/ */
pub fn timers_any_authenticated_packet_traversal(&self) { pub fn timers_any_authenticated_packet_traversal(&self) {
log::trace!("timers_any_authenticated_packet_traversal");
let timers = self.timers(); let timers = self.timers();
if timers.enabled && timers.keepalive_interval > 0 { if timers.enabled && timers.keepalive_interval > 0 {
// push persistent_keepalive into the future // push persistent_keepalive into the future
@@ -174,6 +180,7 @@ impl<T: Tun, B: UDP> PeerInner<T, B> {
} }
fn timers_set_retransmit_handshake(&self) { fn timers_set_retransmit_handshake(&self) {
log::trace!("timers_set_retransmit_handshake");
let timers = self.timers(); let timers = self.timers();
if timers.enabled { if timers.enabled {
timers.retransmit_handshake.reset(REKEY_TIMEOUT); timers.retransmit_handshake.reset(REKEY_TIMEOUT);
@@ -205,11 +212,11 @@ impl<T: Tun, B: UDP> PeerInner<T, B> {
// stop the keepalive timer with the old interval // stop the keepalive timer with the old interval
timers.send_persistent_keepalive.stop(); timers.send_persistent_keepalive.stop();
// restart the persistent_keepalive timer with the new interval // cause immediate expiry of persistent_keepalive timer
if secs > 0 && timers.enabled { if secs > 0 && timers.enabled {
timers timers
.send_persistent_keepalive .send_persistent_keepalive
.start(Duration::from_secs(secs)); .reset(Duration::from_secs(0));
} }
} }
@@ -233,6 +240,8 @@ impl Timers {
retransmit_handshake: { retransmit_handshake: {
let peer = peer.clone(); let peer = peer.clone();
runner.timer(move || { runner.timer(move || {
log::trace!("{} : timer fired (retransmit_handshake)", peer);
// ignore if timers are disabled // ignore if timers are disabled
let timers = peer.timers(); let timers = peer.timers();
if !timers.enabled { if !timers.enabled {
@@ -269,6 +278,8 @@ impl Timers {
send_keepalive: { send_keepalive: {
let peer = peer.clone(); let peer = peer.clone();
runner.timer(move || { runner.timer(move || {
log::trace!("{} : timer fired (send_keepalive)", peer);
// ignore if timers are disabled // ignore if timers are disabled
let timers = peer.timers(); let timers = peer.timers();
if !timers.enabled { if !timers.enabled {
@@ -284,7 +295,8 @@ impl Timers {
new_handshake: { new_handshake: {
let peer = peer.clone(); let peer = peer.clone();
runner.timer(move || { runner.timer(move || {
debug!( log::trace!("{} : timer fired (new_handshake)", peer);
log::debug!(
"Retrying handshake with {} because we stopped hearing back after {} seconds", "Retrying handshake with {} because we stopped hearing back after {} seconds",
peer, peer,
(KEEPALIVE_TIMEOUT + REKEY_TIMEOUT).as_secs() (KEEPALIVE_TIMEOUT + REKEY_TIMEOUT).as_secs()
@@ -296,16 +308,19 @@ impl Timers {
zero_key_material: { zero_key_material: {
let peer = peer.clone(); let peer = peer.clone();
runner.timer(move || { runner.timer(move || {
log::trace!("{} : timer fired (zero_key_material)", peer);
peer.router.zero_keys(); peer.router.zero_keys();
}) })
}, },
send_persistent_keepalive: { send_persistent_keepalive: {
let peer = peer.clone(); let peer = peer.clone();
runner.timer(move || { runner.timer(move || {
log::trace!("{} : timer fired (send_persistent_keepalive)", peer);
let timers = peer.timers(); let timers = peer.timers();
if timers.enabled && timers.keepalive_interval > 0 { if timers.enabled && timers.keepalive_interval > 0 {
peer.router.send_keepalive();
timers.send_keepalive.stop(); timers.send_keepalive.stop();
let queued = peer.router.send_keepalive();
log::trace!("{} : keepalive queued {}", peer, queued);
timers timers
.send_persistent_keepalive .send_persistent_keepalive
.start(Duration::from_secs(timers.keepalive_interval)); .start(Duration::from_secs(timers.keepalive_interval));
@@ -331,8 +346,7 @@ impl Timers {
} }
} }
/* Instance of the router callbacks */ /* instance of the router callbacks */
pub struct Events<T, B>(PhantomData<(T, B)>); pub struct Events<T, B>(PhantomData<(T, B)>);
impl<T: Tun, B: UDP> Callbacks for Events<T, B> { impl<T: Tun, B: UDP> Callbacks for Events<T, B> {
@@ -343,6 +357,8 @@ impl<T: Tun, B: UDP> Callbacks for Events<T, B> {
*/ */
#[inline(always)] #[inline(always)]
fn send(peer: &Self::Opaque, size: usize, sent: bool, keypair: &Arc<KeyPair>, counter: u64) { fn send(peer: &Self::Opaque, size: usize, sent: bool, keypair: &Arc<KeyPair>, counter: u64) {
log::trace!("{} : EVENT(send)", peer);
// update timers and stats // update timers and stats
peer.timers_any_authenticated_packet_traversal(); peer.timers_any_authenticated_packet_traversal();
@@ -373,6 +389,8 @@ impl<T: Tun, B: UDP> Callbacks for Events<T, B> {
*/ */
#[inline(always)] #[inline(always)]
fn recv(peer: &Self::Opaque, size: usize, sent: bool, keypair: &Arc<KeyPair>) { fn recv(peer: &Self::Opaque, size: usize, sent: bool, keypair: &Arc<KeyPair>) {
log::trace!("{} : EVENT(recv)", peer);
// update timers and stats // update timers and stats
peer.timers_any_authenticated_packet_traversal(); peer.timers_any_authenticated_packet_traversal();
@@ -407,11 +425,13 @@ impl<T: Tun, B: UDP> Callbacks for Events<T, B> {
*/ */
#[inline(always)] #[inline(always)]
fn need_key(peer: &Self::Opaque) { fn need_key(peer: &Self::Opaque) {
log::trace!("{} : EVENT(need_key)", peer);
peer.packet_send_queued_handshake_initiation(false); peer.packet_send_queued_handshake_initiation(false);
} }
#[inline(always)] #[inline(always)]
fn key_confirmed(peer: &Self::Opaque) { fn key_confirmed(peer: &Self::Opaque) {
log::trace!("{} : EVENT(key_confirmed)", peer);
peer.timers_handshake_complete(); peer.timers_handshake_complete();
} }
} }

View File

@@ -20,7 +20,7 @@ use super::udp::UDP;
// constants // constants
use super::constants::{ use super::constants::{
DURATION_UNDER_LOAD, MAX_QUEUED_INCOMING_HANDSHAKES, MESSAGE_PADDING_MULTIPLE, DURATION_UNDER_LOAD, MAX_QUEUED_INCOMING_HANDSHAKES, MESSAGE_PADDING_MULTIPLE,
THRESHOLD_UNDER_LOAD, TIME_HORIZON, THRESHOLD_UNDER_LOAD,
}; };
use super::handshake::MAX_HANDSHAKE_MSG_SIZE; use super::handshake::MAX_HANDSHAKE_MSG_SIZE;
use super::handshake::{TYPE_COOKIE_REPLY, TYPE_INITIATION, TYPE_RESPONSE}; use super::handshake::{TYPE_COOKIE_REPLY, TYPE_INITIATION, TYPE_RESPONSE};
@@ -102,8 +102,6 @@ pub fn tun_worker<T: Tun, B: UDP>(wg: &WireGuard<T, B>, reader: T::Reader) {
} }
pub fn udp_worker<T: Tun, B: UDP>(wg: &WireGuard<T, B>, reader: B::Reader) { pub fn udp_worker<T: Tun, B: UDP>(wg: &WireGuard<T, B>, reader: B::Reader) {
let mut last_under_load = Instant::now() - TIME_HORIZON;
loop { loop {
// create vector big enough for any message given current MTU // create vector big enough for any message given current MTU
let mtu = wg.mtu.load(Ordering::Relaxed); let mtu = wg.mtu.load(Ordering::Relaxed);
@@ -160,26 +158,26 @@ pub fn handshake_worker<T: Tun, B: UDP>(
// process elements from the handshake queue // process elements from the handshake queue
for job in rx { for job in rx {
// check if under load // check if under load
let mut under_load = false;
let job: HandshakeJob<B::Endpoint> = job; let job: HandshakeJob<B::Endpoint> = job;
let pending = wg.pending.fetch_sub(1, Ordering::SeqCst); let pending = wg.pending.fetch_sub(1, Ordering::SeqCst);
let mut under_load = false;
debug_assert!(pending < MAX_QUEUED_INCOMING_HANDSHAKES + (1 << 16)); debug_assert!(pending < MAX_QUEUED_INCOMING_HANDSHAKES + (1 << 16));
// immediate go under load if too many handshakes pending // immediate go under load if too many handshakes pending
if pending > THRESHOLD_UNDER_LOAD { if pending > THRESHOLD_UNDER_LOAD {
log::trace!("{} : handshake worker, under load (above threshold)", wg);
*wg.last_under_load.lock() = Instant::now(); *wg.last_under_load.lock() = Instant::now();
under_load = true; under_load = true;
} }
// remain under load for a while // remain under load for DURATION_UNDER_LOAD
if !under_load { if !under_load {
let elapsed = wg.last_under_load.lock().elapsed(); let elapsed = wg.last_under_load.lock().elapsed();
if elapsed > DURATION_UNDER_LOAD { if DURATION_UNDER_LOAD >= elapsed {
log::trace!("{} : handshake worker, under load (recent)", wg);
under_load = true; under_load = true;
} }
} }
log::trace!("{} : handshake worker, under_load = {}", wg, under_load);
// de-multiplex staged handshake jobs and handshake messages // de-multiplex staged handshake jobs and handshake messages
match job { match job {