Fixed inbound job bug (add to sequential queue)
This commit is contained in:
@@ -1,8 +1,6 @@
|
|||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::ops::Deref;
|
use std::ops::Deref;
|
||||||
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
use std::sync::atomic::AtomicBool;
|
||||||
use std::sync::mpsc::sync_channel;
|
|
||||||
use std::sync::mpsc::{Receiver, SyncSender};
|
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::thread;
|
use std::thread;
|
||||||
use std::time::Instant;
|
use std::time::Instant;
|
||||||
@@ -25,47 +23,7 @@ use super::SIZE_MESSAGE_PREFIX;
|
|||||||
use super::route::RoutingTable;
|
use super::route::RoutingTable;
|
||||||
|
|
||||||
use super::super::{tun, udp, Endpoint, KeyPair};
|
use super::super::{tun, udp, Endpoint, KeyPair};
|
||||||
|
use super::queue::ParallelQueue;
|
||||||
pub struct ParallelQueue<T> {
|
|
||||||
next: AtomicUsize, // next round-robin index
|
|
||||||
queues: Vec<Mutex<SyncSender<T>>>, // work queues (1 per thread)
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T> ParallelQueue<T> {
|
|
||||||
fn new(queues: usize) -> (Vec<Receiver<T>>, Self) {
|
|
||||||
let mut rxs = vec![];
|
|
||||||
let mut txs = vec![];
|
|
||||||
|
|
||||||
for _ in 0..queues {
|
|
||||||
let (tx, rx) = sync_channel(128);
|
|
||||||
txs.push(Mutex::new(tx));
|
|
||||||
rxs.push(rx);
|
|
||||||
}
|
|
||||||
|
|
||||||
(
|
|
||||||
rxs,
|
|
||||||
ParallelQueue {
|
|
||||||
next: AtomicUsize::new(0),
|
|
||||||
queues: txs,
|
|
||||||
},
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn send(&self, v: T) {
|
|
||||||
let len = self.queues.len();
|
|
||||||
let idx = self.next.fetch_add(1, Ordering::SeqCst);
|
|
||||||
let que = self.queues[idx % len].lock();
|
|
||||||
que.send(v).unwrap();
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn close(&self) {
|
|
||||||
for i in 0..self.queues.len() {
|
|
||||||
let (tx, _) = sync_channel(0);
|
|
||||||
let queue = &self.queues[i];
|
|
||||||
*queue.lock() = tx;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct DeviceInner<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> {
|
pub struct DeviceInner<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> {
|
||||||
// inbound writer (TUN)
|
// inbound writer (TUN)
|
||||||
@@ -171,16 +129,25 @@ impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> DeviceHandle<
|
|||||||
|
|
||||||
// start worker threads
|
// start worker threads
|
||||||
let mut threads = Vec::with_capacity(num_workers);
|
let mut threads = Vec::with_capacity(num_workers);
|
||||||
|
|
||||||
for _ in 0..num_workers {
|
for _ in 0..num_workers {
|
||||||
let rx = inrx.pop().unwrap();
|
let rx = inrx.pop().unwrap();
|
||||||
threads.push(thread::spawn(move || inbound::worker(rx)));
|
threads.push(thread::spawn(move || {
|
||||||
|
log::debug!("inbound router worker started");
|
||||||
|
inbound::worker(rx)
|
||||||
|
}));
|
||||||
}
|
}
|
||||||
|
|
||||||
for _ in 0..num_workers {
|
for _ in 0..num_workers {
|
||||||
let rx = outrx.pop().unwrap();
|
let rx = outrx.pop().unwrap();
|
||||||
threads.push(thread::spawn(move || outbound::worker(rx)));
|
threads.push(thread::spawn(move || {
|
||||||
|
log::debug!("outbound router worker started");
|
||||||
|
outbound::worker(rx)
|
||||||
|
}));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
debug_assert_eq!(threads.len(), num_workers * 2);
|
||||||
|
|
||||||
// return exported device handle
|
// return exported device handle
|
||||||
DeviceHandle {
|
DeviceHandle {
|
||||||
state: Device {
|
state: Device {
|
||||||
@@ -274,7 +241,7 @@ impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> DeviceHandle<
|
|||||||
);
|
);
|
||||||
|
|
||||||
log::trace!(
|
log::trace!(
|
||||||
"Router, handle transport message: (receiver = {}, counter = {})",
|
"handle transport message: (receiver = {}, counter = {})",
|
||||||
header.f_receiver,
|
header.f_receiver,
|
||||||
header.f_counter
|
header.f_counter
|
||||||
);
|
);
|
||||||
@@ -287,9 +254,9 @@ impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> DeviceHandle<
|
|||||||
|
|
||||||
// schedule for decryption and TUN write
|
// schedule for decryption and TUN write
|
||||||
if let Some(job) = dec.peer.recv_job(src, dec.clone(), msg) {
|
if let Some(job) = dec.peer.recv_job(src, dec.clone(), msg) {
|
||||||
|
log::trace!("schedule decryption of transport message");
|
||||||
self.state.inbound_queue.send(job);
|
self.state.inbound_queue.send(job);
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -42,6 +42,8 @@ fn parallel<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>>(
|
|||||||
peer: &Peer<E, C, T, B>,
|
peer: &Peer<E, C, T, B>,
|
||||||
body: &mut Inbound<E, C, T, B>,
|
body: &mut Inbound<E, C, T, B>,
|
||||||
) {
|
) {
|
||||||
|
log::trace!("worker, parallel section, obtained job");
|
||||||
|
|
||||||
// cast to header followed by payload
|
// cast to header followed by payload
|
||||||
let (header, packet): (LayoutVerified<&mut [u8], TransportHeader>, &mut [u8]) =
|
let (header, packet): (LayoutVerified<&mut [u8], TransportHeader>, &mut [u8]) =
|
||||||
match LayoutVerified::new_from_prefix(&mut body.msg[..]) {
|
match LayoutVerified::new_from_prefix(&mut body.msg[..]) {
|
||||||
@@ -70,6 +72,7 @@ fn parallel<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>>(
|
|||||||
Ok(_) => (),
|
Ok(_) => (),
|
||||||
Err(_) => {
|
Err(_) => {
|
||||||
// fault and return early
|
// fault and return early
|
||||||
|
log::trace!("inbound worker: authentication failure");
|
||||||
body.failed = true;
|
body.failed = true;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@@ -89,9 +92,15 @@ fn parallel<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>>(
|
|||||||
// truncate to remove tag
|
// truncate to remove tag
|
||||||
match inner_len {
|
match inner_len {
|
||||||
None => {
|
None => {
|
||||||
|
log::trace!("inbound worker: cryptokey routing failed");
|
||||||
body.failed = true;
|
body.failed = true;
|
||||||
}
|
}
|
||||||
Some(len) => {
|
Some(len) => {
|
||||||
|
log::trace!(
|
||||||
|
"inbound worker: good route, length = {} {}",
|
||||||
|
len,
|
||||||
|
if len == 0 { "(keepalive)" } else { "" }
|
||||||
|
);
|
||||||
body.msg.truncate(mem::size_of::<TransportHeader>() + len);
|
body.msg.truncate(mem::size_of::<TransportHeader>() + len);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -102,8 +111,11 @@ fn sequential<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>>(
|
|||||||
peer: &Peer<E, C, T, B>,
|
peer: &Peer<E, C, T, B>,
|
||||||
body: &mut Inbound<E, C, T, B>,
|
body: &mut Inbound<E, C, T, B>,
|
||||||
) {
|
) {
|
||||||
|
log::trace!("worker, sequential section, obtained job");
|
||||||
|
|
||||||
// decryption failed, return early
|
// decryption failed, return early
|
||||||
if body.failed {
|
if body.failed {
|
||||||
|
log::trace!("job faulted, remove from queue and ignore");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -116,10 +128,6 @@ fn sequential<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>>(
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
debug_assert!(
|
|
||||||
packet.len() >= CHACHA20_POLY1305.tag_len(),
|
|
||||||
"this should be checked earlier in the pipeline (decryption should fail)"
|
|
||||||
);
|
|
||||||
|
|
||||||
// check for replay
|
// check for replay
|
||||||
if !body.state.protector.lock().update(header.f_counter.get()) {
|
if !body.state.protector.lock().update(header.f_counter.get()) {
|
||||||
@@ -136,13 +144,9 @@ fn sequential<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>>(
|
|||||||
// update endpoint
|
// update endpoint
|
||||||
*peer.endpoint.lock() = body.endpoint.take();
|
*peer.endpoint.lock() = body.endpoint.take();
|
||||||
|
|
||||||
// calculate length of IP packet + padding
|
|
||||||
let length = packet.len() - SIZE_TAG;
|
|
||||||
log::debug!("inbound worker: plaintext length = {}", length);
|
|
||||||
|
|
||||||
// check if should be written to TUN
|
// check if should be written to TUN
|
||||||
let mut sent = false;
|
let mut sent = false;
|
||||||
if length > 0 {
|
if packet.len() > 0 {
|
||||||
sent = match peer.device.inbound.write(&packet[..]) {
|
sent = match peer.device.inbound.write(&packet[..]) {
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
log::debug!("failed to write inbound packet to TUN: {:?}", e);
|
log::debug!("failed to write inbound packet to TUN: {:?}", e);
|
||||||
|
|||||||
@@ -7,6 +7,7 @@ mod messages;
|
|||||||
mod outbound;
|
mod outbound;
|
||||||
mod peer;
|
mod peer;
|
||||||
mod pool;
|
mod pool;
|
||||||
|
mod queue;
|
||||||
mod route;
|
mod route;
|
||||||
mod types;
|
mod types;
|
||||||
|
|
||||||
|
|||||||
@@ -35,6 +35,8 @@ fn parallel<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>>(
|
|||||||
_peer: &Peer<E, C, T, B>,
|
_peer: &Peer<E, C, T, B>,
|
||||||
body: &mut Outbound,
|
body: &mut Outbound,
|
||||||
) {
|
) {
|
||||||
|
log::trace!("worker, parallel section, obtained job");
|
||||||
|
|
||||||
// make space for the tag
|
// make space for the tag
|
||||||
body.msg.extend([0u8; SIZE_TAG].iter());
|
body.msg.extend([0u8; SIZE_TAG].iter());
|
||||||
|
|
||||||
@@ -77,6 +79,8 @@ fn sequential<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>>(
|
|||||||
peer: &Peer<E, C, T, B>,
|
peer: &Peer<E, C, T, B>,
|
||||||
body: &mut Outbound,
|
body: &mut Outbound,
|
||||||
) {
|
) {
|
||||||
|
log::trace!("worker, sequential section, obtained job");
|
||||||
|
|
||||||
// send to peer
|
// send to peer
|
||||||
let xmit = peer.send(&body.msg[..]).is_ok();
|
let xmit = peer.send(&body.msg[..]).is_ok();
|
||||||
|
|
||||||
|
|||||||
@@ -276,7 +276,9 @@ impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>> Peer<E, C, T,
|
|||||||
dec: Arc<DecryptionState<E, C, T, B>>,
|
dec: Arc<DecryptionState<E, C, T, B>>,
|
||||||
msg: Vec<u8>,
|
msg: Vec<u8>,
|
||||||
) -> Option<Job<Self, Inbound<E, C, T, B>>> {
|
) -> Option<Job<Self, Inbound<E, C, T, B>>> {
|
||||||
Some(Job::new(self.clone(), Inbound::new(msg, dec, src)))
|
let job = Job::new(self.clone(), Inbound::new(msg, dec, src));
|
||||||
|
self.inbound.send(job.clone());
|
||||||
|
Some(job)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn send_job(&self, msg: Vec<u8>, stage: bool) -> Option<Job<Self, Outbound>> {
|
pub fn send_job(&self, msg: Vec<u8>, stage: bool) -> Option<Job<Self, Outbound>> {
|
||||||
|
|||||||
@@ -106,6 +106,7 @@ pub fn worker_template<
|
|||||||
work_sequential: S, // perform sequential work on peer
|
work_sequential: S, // perform sequential work on peer
|
||||||
queue: Q, // resolve a peer to an inorder queue
|
queue: Q, // resolve a peer to an inorder queue
|
||||||
) {
|
) {
|
||||||
|
log::trace!("router worker started");
|
||||||
loop {
|
loop {
|
||||||
// handle new job
|
// handle new job
|
||||||
let peer = {
|
let peer = {
|
||||||
|
|||||||
46
src/wireguard/router/queue.rs
Normal file
46
src/wireguard/router/queue.rs
Normal file
@@ -0,0 +1,46 @@
|
|||||||
|
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||||
|
use std::sync::mpsc::sync_channel;
|
||||||
|
use std::sync::mpsc::{Receiver, SyncSender};
|
||||||
|
|
||||||
|
use spin::Mutex;
|
||||||
|
|
||||||
|
pub struct ParallelQueue<T> {
|
||||||
|
next: AtomicUsize, // next round-robin index
|
||||||
|
queues: Vec<Mutex<SyncSender<T>>>, // work queues (1 per thread)
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> ParallelQueue<T> {
|
||||||
|
pub fn new(queues: usize) -> (Vec<Receiver<T>>, Self) {
|
||||||
|
let mut rxs = vec![];
|
||||||
|
let mut txs = vec![];
|
||||||
|
|
||||||
|
for _ in 0..queues {
|
||||||
|
let (tx, rx) = sync_channel(128);
|
||||||
|
txs.push(Mutex::new(tx));
|
||||||
|
rxs.push(rx);
|
||||||
|
}
|
||||||
|
|
||||||
|
(
|
||||||
|
rxs,
|
||||||
|
ParallelQueue {
|
||||||
|
next: AtomicUsize::new(0),
|
||||||
|
queues: txs,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn send(&self, v: T) {
|
||||||
|
let len = self.queues.len();
|
||||||
|
let idx = self.next.fetch_add(1, Ordering::SeqCst);
|
||||||
|
let que = self.queues[idx % len].lock();
|
||||||
|
que.send(v).unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn close(&self) {
|
||||||
|
for i in 0..self.queues.len() {
|
||||||
|
let (tx, _) = sync_channel(0);
|
||||||
|
let queue = &self.queues[i];
|
||||||
|
*queue.lock() = tx;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -81,7 +81,7 @@ impl<T: Eq + Clone> RoutingTable<T> {
|
|||||||
LayoutVerified::new_from_prefix(packet)?;
|
LayoutVerified::new_from_prefix(packet)?;
|
||||||
|
|
||||||
log::trace!(
|
log::trace!(
|
||||||
"Router, get route for IPv4 destination: {:?}",
|
"router, get route for IPv4 destination: {:?}",
|
||||||
Ipv4Addr::from(header.f_destination)
|
Ipv4Addr::from(header.f_destination)
|
||||||
);
|
);
|
||||||
|
|
||||||
@@ -97,7 +97,7 @@ impl<T: Eq + Clone> RoutingTable<T> {
|
|||||||
LayoutVerified::new_from_prefix(packet)?;
|
LayoutVerified::new_from_prefix(packet)?;
|
||||||
|
|
||||||
log::trace!(
|
log::trace!(
|
||||||
"Router, get route for IPv6 destination: {:?}",
|
"router, get route for IPv6 destination: {:?}",
|
||||||
Ipv6Addr::from(header.f_destination)
|
Ipv6Addr::from(header.f_destination)
|
||||||
);
|
);
|
||||||
|
|
||||||
@@ -107,7 +107,10 @@ impl<T: Eq + Clone> RoutingTable<T> {
|
|||||||
.longest_match(Ipv6Addr::from(header.f_destination))
|
.longest_match(Ipv6Addr::from(header.f_destination))
|
||||||
.and_then(|(_, _, p)| Some(p.clone()))
|
.and_then(|(_, _, p)| Some(p.clone()))
|
||||||
}
|
}
|
||||||
_ => None,
|
v => {
|
||||||
|
log::trace!("router, invalid IP version {}", v);
|
||||||
|
None
|
||||||
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -120,7 +123,7 @@ impl<T: Eq + Clone> RoutingTable<T> {
|
|||||||
LayoutVerified::new_from_prefix(packet)?;
|
LayoutVerified::new_from_prefix(packet)?;
|
||||||
|
|
||||||
log::trace!(
|
log::trace!(
|
||||||
"Router, check route for IPv4 source: {:?}",
|
"router, check route for IPv4 source: {:?}",
|
||||||
Ipv4Addr::from(header.f_source)
|
Ipv4Addr::from(header.f_source)
|
||||||
);
|
);
|
||||||
|
|
||||||
@@ -142,7 +145,7 @@ impl<T: Eq + Clone> RoutingTable<T> {
|
|||||||
LayoutVerified::new_from_prefix(packet)?;
|
LayoutVerified::new_from_prefix(packet)?;
|
||||||
|
|
||||||
log::trace!(
|
log::trace!(
|
||||||
"Router, check route for IPv6 source: {:?}",
|
"router, check route for IPv6 source: {:?}",
|
||||||
Ipv6Addr::from(header.f_source)
|
Ipv6Addr::from(header.f_source)
|
||||||
);
|
);
|
||||||
|
|
||||||
|
|||||||
@@ -9,7 +9,7 @@ use num_cpus;
|
|||||||
|
|
||||||
use super::super::dummy;
|
use super::super::dummy;
|
||||||
use super::super::dummy_keypair;
|
use super::super::dummy_keypair;
|
||||||
use super::super::tests::make_packet_dst;
|
use super::super::tests::make_packet;
|
||||||
use super::super::udp::*;
|
use super::super::udp::*;
|
||||||
use super::KeyPair;
|
use super::KeyPair;
|
||||||
use super::SIZE_MESSAGE_PREFIX;
|
use super::SIZE_MESSAGE_PREFIX;
|
||||||
@@ -105,15 +105,15 @@ mod tests {
|
|||||||
|
|
||||||
// wait for scheduling
|
// wait for scheduling
|
||||||
fn wait() {
|
fn wait() {
|
||||||
thread::sleep(Duration::from_millis(50));
|
thread::sleep(Duration::from_millis(15));
|
||||||
}
|
}
|
||||||
|
|
||||||
fn init() {
|
fn init() {
|
||||||
let _ = env_logger::builder().is_test(true).try_init();
|
let _ = env_logger::builder().is_test(true).try_init();
|
||||||
}
|
}
|
||||||
|
|
||||||
fn make_packet_dst_padded(size: usize, dst: IpAddr, id: u64) -> Vec<u8> {
|
fn make_packet_padded(size: usize, src: IpAddr, dst: IpAddr, id: u64) -> Vec<u8> {
|
||||||
let p = make_packet_dst(size, dst, id);
|
let p = make_packet(size, src, dst, id);
|
||||||
let mut o = vec![0; p.len() + SIZE_MESSAGE_PREFIX];
|
let mut o = vec![0; p.len() + SIZE_MESSAGE_PREFIX];
|
||||||
o[SIZE_MESSAGE_PREFIX..SIZE_MESSAGE_PREFIX + p.len()].copy_from_slice(&p[..]);
|
o[SIZE_MESSAGE_PREFIX..SIZE_MESSAGE_PREFIX + p.len()].copy_from_slice(&p[..]);
|
||||||
o
|
o
|
||||||
@@ -149,15 +149,21 @@ mod tests {
|
|||||||
peer.add_keypair(dummy_keypair(true));
|
peer.add_keypair(dummy_keypair(true));
|
||||||
|
|
||||||
// add subnet to peer
|
// add subnet to peer
|
||||||
let (mask, len, ip) = ("192.168.1.0", 24, "192.168.1.20");
|
let (mask, len, dst) = ("192.168.1.0", 24, "192.168.1.20");
|
||||||
let mask: IpAddr = mask.parse().unwrap();
|
let mask: IpAddr = mask.parse().unwrap();
|
||||||
let ip1: IpAddr = ip.parse().unwrap();
|
|
||||||
peer.add_allowed_ip(mask, len);
|
peer.add_allowed_ip(mask, len);
|
||||||
|
|
||||||
|
// create "IP packet"
|
||||||
|
let dst = dst.parse().unwrap();
|
||||||
|
let src = match dst {
|
||||||
|
IpAddr::V4(_) => "127.0.0.1".parse().unwrap(),
|
||||||
|
IpAddr::V6(_) => "::1".parse().unwrap()
|
||||||
|
};
|
||||||
|
let msg = make_packet_padded(1024, src, dst, 0);
|
||||||
|
|
||||||
// every iteration sends 10 GB
|
// every iteration sends 10 GB
|
||||||
b.iter(|| {
|
b.iter(|| {
|
||||||
opaque.store(0, Ordering::SeqCst);
|
opaque.store(0, Ordering::SeqCst);
|
||||||
let msg = make_packet_dst_padded(1024, ip1, 0);
|
|
||||||
while opaque.load(Ordering::Acquire) < 10 * 1024 * 1024 {
|
while opaque.load(Ordering::Acquire) < 10 * 1024 * 1024 {
|
||||||
router.send(msg.to_vec()).unwrap();
|
router.send(msg.to_vec()).unwrap();
|
||||||
}
|
}
|
||||||
@@ -197,7 +203,8 @@ mod tests {
|
|||||||
),
|
),
|
||||||
];
|
];
|
||||||
|
|
||||||
for (num, (mask, len, ip, okay)) in tests.iter().enumerate() {
|
for (num, (mask, len, dst, okay)) in tests.iter().enumerate() {
|
||||||
|
println!("Check: {} {} {}/{}", dst, if *okay { "\\in" } else { "\\notin" }, mask, len);
|
||||||
for set_key in vec![true, false] {
|
for set_key in vec![true, false] {
|
||||||
debug!("index = {}, set_key = {}", num, set_key);
|
debug!("index = {}, set_key = {}", num, set_key);
|
||||||
|
|
||||||
@@ -213,7 +220,12 @@ mod tests {
|
|||||||
peer.add_allowed_ip(mask, *len);
|
peer.add_allowed_ip(mask, *len);
|
||||||
|
|
||||||
// create "IP packet"
|
// create "IP packet"
|
||||||
let msg = make_packet_dst_padded(1024, ip.parse().unwrap(), 0);
|
let dst = dst.parse().unwrap();
|
||||||
|
let src = match dst {
|
||||||
|
IpAddr::V4(_) => "127.0.0.1".parse().unwrap(),
|
||||||
|
IpAddr::V6(_) => "::1".parse().unwrap()
|
||||||
|
};
|
||||||
|
let msg = make_packet_padded(1024, src, dst, 0);
|
||||||
|
|
||||||
// cryptkey route the IP packet
|
// cryptkey route the IP packet
|
||||||
let res = router.send(msg);
|
let res = router.send(msg);
|
||||||
@@ -269,17 +281,14 @@ mod tests {
|
|||||||
|
|
||||||
let tests = [
|
let tests = [
|
||||||
(
|
(
|
||||||
false, // confirm with keepalive
|
|
||||||
("192.168.1.0", 24, "192.168.1.20", true),
|
("192.168.1.0", 24, "192.168.1.20", true),
|
||||||
("172.133.133.133", 32, "172.133.133.133", true),
|
("172.133.133.133", 32, "172.133.133.133", true),
|
||||||
),
|
),
|
||||||
(
|
(
|
||||||
true, // confirm with staged packet
|
|
||||||
("192.168.1.0", 24, "192.168.1.20", true),
|
("192.168.1.0", 24, "192.168.1.20", true),
|
||||||
("172.133.133.133", 32, "172.133.133.133", true),
|
("172.133.133.133", 32, "172.133.133.133", true),
|
||||||
),
|
),
|
||||||
(
|
(
|
||||||
false, // confirm with keepalive
|
|
||||||
(
|
(
|
||||||
"2001:db8::ff00:42:8000",
|
"2001:db8::ff00:42:8000",
|
||||||
113,
|
113,
|
||||||
@@ -294,7 +303,6 @@ mod tests {
|
|||||||
),
|
),
|
||||||
),
|
),
|
||||||
(
|
(
|
||||||
false, // confirm with staged packet
|
|
||||||
(
|
(
|
||||||
"2001:db8::ff00:42:8000",
|
"2001:db8::ff00:42:8000",
|
||||||
113,
|
113,
|
||||||
@@ -310,117 +318,152 @@ mod tests {
|
|||||||
),
|
),
|
||||||
];
|
];
|
||||||
|
|
||||||
for (stage, p1, p2) in tests.iter() {
|
for stage in vec![true, false] {
|
||||||
let ((bind_reader1, bind_writer1), (bind_reader2, bind_writer2)) =
|
for (p1, p2) in tests.iter() {
|
||||||
dummy::PairBind::pair();
|
let ((bind_reader1, bind_writer1), (bind_reader2, bind_writer2)) =
|
||||||
|
dummy::PairBind::pair();
|
||||||
|
|
||||||
// create matching device
|
// create matching device
|
||||||
let (_fake, _, tun_writer1, _) = dummy::TunTest::create(false);
|
let (_fake, _, tun_writer1, _) = dummy::TunTest::create(false);
|
||||||
let (_fake, _, tun_writer2, _) = dummy::TunTest::create(false);
|
let (_fake, _, tun_writer2, _) = dummy::TunTest::create(false);
|
||||||
|
|
||||||
let router1: Device<_, TestCallbacks, _, _> = Device::new(1, tun_writer1);
|
let router1: Device<_, TestCallbacks, _, _> = Device::new(1, tun_writer1);
|
||||||
router1.set_outbound_writer(bind_writer1);
|
router1.set_outbound_writer(bind_writer1);
|
||||||
|
|
||||||
let router2: Device<_, TestCallbacks, _, _> = Device::new(1, tun_writer2);
|
let router2: Device<_, TestCallbacks, _, _> = Device::new(1, tun_writer2);
|
||||||
router2.set_outbound_writer(bind_writer2);
|
router2.set_outbound_writer(bind_writer2);
|
||||||
|
|
||||||
// prepare opaque values for tracing callbacks
|
// prepare opaque values for tracing callbacks
|
||||||
|
|
||||||
let opaq1 = Opaque::new();
|
let opaque1 = Opaque::new();
|
||||||
let opaq2 = Opaque::new();
|
let opaque2 = Opaque::new();
|
||||||
|
|
||||||
// create peers with matching keypairs and assign subnets
|
// create peers with matching keypairs and assign subnets
|
||||||
|
|
||||||
let (mask, len, _ip, _okay) = p1;
|
let peer1 = router1.new_peer(opaque1.clone());
|
||||||
let peer1 = router1.new_peer(opaq1.clone());
|
let peer2 = router2.new_peer(opaque2.clone());
|
||||||
let mask: IpAddr = mask.parse().unwrap();
|
|
||||||
peer1.add_allowed_ip(mask, *len);
|
|
||||||
peer1.add_keypair(dummy_keypair(false));
|
|
||||||
|
|
||||||
let (mask, len, _ip, _okay) = p2;
|
{
|
||||||
let peer2 = router2.new_peer(opaq2.clone());
|
let (mask, len, _ip, _okay) = p1;
|
||||||
let mask: IpAddr = mask.parse().unwrap();
|
let mask: IpAddr = mask.parse().unwrap();
|
||||||
peer2.add_allowed_ip(mask, *len);
|
peer1.add_allowed_ip(mask, *len);
|
||||||
peer2.set_endpoint(dummy::UnitEndpoint::new());
|
peer1.add_keypair(dummy_keypair(false));
|
||||||
|
}
|
||||||
|
|
||||||
if *stage {
|
{
|
||||||
// stage a packet which can be used for confirmation (in place of a keepalive)
|
let (mask, len, _ip, _okay) = p2;
|
||||||
let (_mask, _len, ip, _okay) = p2;
|
let mask: IpAddr = mask.parse().unwrap();
|
||||||
let msg = make_packet_dst_padded(1024, ip.parse().unwrap(), 0);
|
peer2.add_allowed_ip(mask, *len);
|
||||||
router2.send(msg).expect("failed to sent staged packet");
|
peer2.set_endpoint(dummy::UnitEndpoint::new());
|
||||||
|
}
|
||||||
|
|
||||||
|
if stage {
|
||||||
|
println!("confirm using staged packet");
|
||||||
|
|
||||||
|
// create IP packet
|
||||||
|
let (_mask, _len, ip1, _okay) = p1;
|
||||||
|
let (_mask, _len, ip2, _okay) = p2;
|
||||||
|
let msg = make_packet_padded(
|
||||||
|
1024,
|
||||||
|
ip1.parse().unwrap(), // src
|
||||||
|
ip2.parse().unwrap(), // dst
|
||||||
|
0,
|
||||||
|
);
|
||||||
|
|
||||||
|
// stage packet for sending
|
||||||
|
router2.send(msg).expect("failed to sent staged packet");
|
||||||
|
wait();
|
||||||
|
|
||||||
|
// validate events
|
||||||
|
assert!(opaque2.recv().is_none());
|
||||||
|
assert!(
|
||||||
|
opaque2.send().is_none(),
|
||||||
|
"sending should fail as not key is set"
|
||||||
|
);
|
||||||
|
assert!(
|
||||||
|
opaque2.need_key().is_some(),
|
||||||
|
"a new key should be requested since a packet was attempted transmitted"
|
||||||
|
);
|
||||||
|
assert!(opaque2.is_empty(), "callbacks should only run once");
|
||||||
|
}
|
||||||
|
|
||||||
|
// this should cause a key-confirmation packet (keepalive or staged packet)
|
||||||
|
// this also causes peer1 to learn the "endpoint" for peer2
|
||||||
|
assert!(peer1.get_endpoint().is_none());
|
||||||
|
peer2.add_keypair(dummy_keypair(true));
|
||||||
|
|
||||||
wait();
|
wait();
|
||||||
assert!(opaq2.recv().is_none());
|
assert!(opaque2.send().is_some());
|
||||||
assert!(
|
assert!(opaque2.is_empty(), "events on peer2 should be 'send'");
|
||||||
opaq2.send().is_none(),
|
assert!(opaque1.is_empty(), "nothing should happened on peer1");
|
||||||
"sending should fail as not key is set"
|
|
||||||
);
|
|
||||||
assert!(
|
|
||||||
opaq2.need_key().is_some(),
|
|
||||||
"a new key should be requested since a packet was attempted transmitted"
|
|
||||||
);
|
|
||||||
assert!(opaq2.is_empty(), "callbacks should only run once");
|
|
||||||
}
|
|
||||||
|
|
||||||
// this should cause a key-confirmation packet (keepalive or staged packet)
|
// read confirming message received by the other end ("across the internet")
|
||||||
// this also causes peer1 to learn the "endpoint" for peer2
|
|
||||||
assert!(peer1.get_endpoint().is_none());
|
|
||||||
peer2.add_keypair(dummy_keypair(true));
|
|
||||||
|
|
||||||
wait();
|
|
||||||
assert!(opaq2.send().is_some());
|
|
||||||
assert!(opaq2.is_empty(), "events on peer2 should be 'send'");
|
|
||||||
assert!(opaq1.is_empty(), "nothing should happened on peer1");
|
|
||||||
|
|
||||||
// read confirming message received by the other end ("across the internet")
|
|
||||||
let mut buf = vec![0u8; 2048];
|
|
||||||
let (len, from) = bind_reader1.read(&mut buf).unwrap();
|
|
||||||
buf.truncate(len);
|
|
||||||
router1.recv(from, buf).unwrap();
|
|
||||||
|
|
||||||
wait();
|
|
||||||
assert!(opaq1.recv().is_some());
|
|
||||||
assert!(opaq1.key_confirmed().is_some());
|
|
||||||
assert!(
|
|
||||||
opaq1.is_empty(),
|
|
||||||
"events on peer1 should be 'recv' and 'key_confirmed'"
|
|
||||||
);
|
|
||||||
assert!(peer1.get_endpoint().is_some());
|
|
||||||
assert!(opaq2.is_empty(), "nothing should happened on peer2");
|
|
||||||
|
|
||||||
// now that peer1 has an endpoint
|
|
||||||
// route packets : peer1 -> peer2
|
|
||||||
|
|
||||||
for id in 0..10 {
|
|
||||||
assert!(
|
|
||||||
opaq1.is_empty(),
|
|
||||||
"we should have asserted a value for every callback on peer1"
|
|
||||||
);
|
|
||||||
assert!(
|
|
||||||
opaq2.is_empty(),
|
|
||||||
"we should have asserted a value for every callback on peer2"
|
|
||||||
);
|
|
||||||
|
|
||||||
// pass IP packet to router
|
|
||||||
let (_mask, _len, ip, _okay) = p1;
|
|
||||||
let msg = make_packet_dst_padded(1024, ip.parse().unwrap(), id);
|
|
||||||
router1.send(msg).unwrap();
|
|
||||||
|
|
||||||
wait();
|
|
||||||
assert!(opaq1.send().is_some());
|
|
||||||
assert!(opaq1.recv().is_none());
|
|
||||||
assert!(opaq1.need_key().is_none());
|
|
||||||
|
|
||||||
// receive ("across the internet") on the other end
|
|
||||||
let mut buf = vec![0u8; 2048];
|
let mut buf = vec![0u8; 2048];
|
||||||
let (len, from) = bind_reader2.read(&mut buf).unwrap();
|
let (len, from) = bind_reader1.read(&mut buf).unwrap();
|
||||||
buf.truncate(len);
|
buf.truncate(len);
|
||||||
router2.recv(from, buf).unwrap();
|
router1.recv(from, buf).unwrap();
|
||||||
|
|
||||||
wait();
|
wait();
|
||||||
assert!(opaq2.send().is_none());
|
assert!(opaque1.recv().is_some());
|
||||||
assert!(opaq2.recv().is_some());
|
assert!(opaque1.key_confirmed().is_some());
|
||||||
assert!(opaq2.need_key().is_none());
|
assert!(
|
||||||
|
opaque1.is_empty(),
|
||||||
|
"events on peer1 should be 'recv' and 'key_confirmed'"
|
||||||
|
);
|
||||||
|
assert!(peer1.get_endpoint().is_some());
|
||||||
|
assert!(opaque2.is_empty(), "nothing should happened on peer2");
|
||||||
|
|
||||||
|
// now that peer1 has an endpoint
|
||||||
|
// route packets : peer1 -> peer2
|
||||||
|
|
||||||
|
for id in 1..11 {
|
||||||
|
println!("round: {}", id);
|
||||||
|
assert!(
|
||||||
|
opaque1.is_empty(),
|
||||||
|
"we should have asserted a value for every callback on peer1"
|
||||||
|
);
|
||||||
|
assert!(
|
||||||
|
opaque2.is_empty(),
|
||||||
|
"we should have asserted a value for every callback on peer2"
|
||||||
|
);
|
||||||
|
|
||||||
|
// pass IP packet to router
|
||||||
|
let (_mask, _len, ip1, _okay) = p1;
|
||||||
|
let (_mask, _len, ip2, _okay) = p2;
|
||||||
|
let msg =
|
||||||
|
make_packet_padded(
|
||||||
|
1024,
|
||||||
|
ip2.parse().unwrap(), // src
|
||||||
|
ip1.parse().unwrap(), // dst
|
||||||
|
id
|
||||||
|
);
|
||||||
|
router1.send(msg).unwrap();
|
||||||
|
|
||||||
|
wait();
|
||||||
|
assert!(opaque1.send().is_some(), "encryption should succeed");
|
||||||
|
assert!(
|
||||||
|
opaque1.recv().is_none(),
|
||||||
|
"receiving callback should not be called"
|
||||||
|
);
|
||||||
|
assert!(opaque1.need_key().is_none());
|
||||||
|
|
||||||
|
// receive ("across the internet") on the other end
|
||||||
|
let mut buf = vec![0u8; 2048];
|
||||||
|
let (len, from) = bind_reader2.read(&mut buf).unwrap();
|
||||||
|
buf.truncate(len);
|
||||||
|
router2.recv(from, buf).unwrap();
|
||||||
|
|
||||||
|
wait();
|
||||||
|
assert!(
|
||||||
|
opaque2.send().is_none(),
|
||||||
|
"sending callback should not be called"
|
||||||
|
);
|
||||||
|
assert!(
|
||||||
|
opaque2.recv().is_some(),
|
||||||
|
"decryption and routing should succeed"
|
||||||
|
);
|
||||||
|
assert!(opaque2.need_key().is_none());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -3,14 +3,14 @@ use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
|||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::{Duration, Instant, SystemTime};
|
use std::time::{Duration, Instant, SystemTime};
|
||||||
|
|
||||||
use log::debug;
|
|
||||||
use hjul::{Runner, Timer};
|
use hjul::{Runner, Timer};
|
||||||
|
use log::debug;
|
||||||
|
|
||||||
use super::constants::*;
|
use super::constants::*;
|
||||||
use super::router::{message_data_len, Callbacks};
|
use super::router::{message_data_len, Callbacks};
|
||||||
use super::{Peer, PeerInner};
|
|
||||||
use super::{udp, tun};
|
|
||||||
use super::types::KeyPair;
|
use super::types::KeyPair;
|
||||||
|
use super::{tun, udp};
|
||||||
|
use super::{Peer, PeerInner};
|
||||||
|
|
||||||
pub struct Timers {
|
pub struct Timers {
|
||||||
// only updated during configuration
|
// only updated during configuration
|
||||||
@@ -36,7 +36,6 @@ impl Timers {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl<T: tun::Tun, B: udp::UDP> PeerInner<T, B> {
|
impl<T: tun::Tun, B: udp::UDP> PeerInner<T, B> {
|
||||||
|
|
||||||
pub fn get_keepalive_interval(&self) -> u64 {
|
pub fn get_keepalive_interval(&self) -> u64 {
|
||||||
self.timers().keepalive_interval
|
self.timers().keepalive_interval
|
||||||
}
|
}
|
||||||
@@ -57,17 +56,19 @@ impl<T: tun::Tun, B: udp::UDP> PeerInner<T, B> {
|
|||||||
timers.send_persistent_keepalive.stop();
|
timers.send_persistent_keepalive.stop();
|
||||||
timers.zero_key_material.stop();
|
timers.zero_key_material.stop();
|
||||||
timers.new_handshake.stop();
|
timers.new_handshake.stop();
|
||||||
|
|
||||||
// reset all timer state
|
// reset all timer state
|
||||||
timers.handshake_attempts.store(0, Ordering::SeqCst);
|
timers.handshake_attempts.store(0, Ordering::SeqCst);
|
||||||
timers.sent_lastminute_handshake.store(false, Ordering::SeqCst);
|
timers
|
||||||
|
.sent_lastminute_handshake
|
||||||
|
.store(false, Ordering::SeqCst);
|
||||||
timers.need_another_keepalive.store(false, Ordering::SeqCst);
|
timers.need_another_keepalive.store(false, Ordering::SeqCst);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn start_timers(&self) {
|
pub fn start_timers(&self) {
|
||||||
// take a write lock preventing simultaneous "stop_timers" call
|
// take a write lock preventing simultaneous "stop_timers" call
|
||||||
let mut timers = self.timers_mut();
|
let mut timers = self.timers_mut();
|
||||||
|
|
||||||
// set flag to reenable timer events
|
// set flag to reenable timer events
|
||||||
if timers.enabled {
|
if timers.enabled {
|
||||||
return;
|
return;
|
||||||
@@ -76,18 +77,20 @@ impl<T: tun::Tun, B: udp::UDP> PeerInner<T, B> {
|
|||||||
|
|
||||||
// start send_persistent_keepalive
|
// start send_persistent_keepalive
|
||||||
if timers.keepalive_interval > 0 {
|
if timers.keepalive_interval > 0 {
|
||||||
timers.send_persistent_keepalive.start(
|
timers
|
||||||
Duration::from_secs(timers.keepalive_interval)
|
.send_persistent_keepalive
|
||||||
);
|
.start(Duration::from_secs(timers.keepalive_interval));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* should be called after an authenticated data packet is sent */
|
/* should be called after an authenticated data packet is sent */
|
||||||
pub fn timers_data_sent(&self) {
|
pub fn timers_data_sent(&self) {
|
||||||
let timers = self.timers();
|
let timers = self.timers();
|
||||||
if timers.enabled {
|
if timers.enabled {
|
||||||
timers.new_handshake.start(KEEPALIVE_TIMEOUT + REKEY_TIMEOUT);
|
timers
|
||||||
}
|
.new_handshake
|
||||||
|
.start(KEEPALIVE_TIMEOUT + REKEY_TIMEOUT);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* should be called after an authenticated data packet is received */
|
/* should be called after an authenticated data packet is received */
|
||||||
@@ -139,7 +142,9 @@ impl<T: tun::Tun, B: udp::UDP> PeerInner<T, B> {
|
|||||||
if timers.enabled {
|
if timers.enabled {
|
||||||
timers.retransmit_handshake.stop();
|
timers.retransmit_handshake.stop();
|
||||||
timers.handshake_attempts.store(0, Ordering::SeqCst);
|
timers.handshake_attempts.store(0, Ordering::SeqCst);
|
||||||
timers.sent_lastminute_handshake.store(false, Ordering::SeqCst);
|
timers
|
||||||
|
.sent_lastminute_handshake
|
||||||
|
.store(false, Ordering::SeqCst);
|
||||||
*self.walltime_last_handshake.lock() = Some(SystemTime::now());
|
*self.walltime_last_handshake.lock() = Some(SystemTime::now());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -161,9 +166,9 @@ impl<T: tun::Tun, B: udp::UDP> PeerInner<T, B> {
|
|||||||
let timers = self.timers();
|
let timers = self.timers();
|
||||||
if timers.enabled && timers.keepalive_interval > 0 {
|
if timers.enabled && timers.keepalive_interval > 0 {
|
||||||
// push persistent_keepalive into the future
|
// push persistent_keepalive into the future
|
||||||
timers.send_persistent_keepalive.reset(Duration::from_secs(
|
timers
|
||||||
timers.keepalive_interval
|
.send_persistent_keepalive
|
||||||
));
|
.reset(Duration::from_secs(timers.keepalive_interval));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -179,7 +184,6 @@ impl<T: tun::Tun, B: udp::UDP> PeerInner<T, B> {
|
|||||||
if timers.enabled {
|
if timers.enabled {
|
||||||
timers.retransmit_handshake.reset(REKEY_TIMEOUT);
|
timers.retransmit_handshake.reset(REKEY_TIMEOUT);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Called after a handshake worker sends a handshake initiation to the peer
|
/* Called after a handshake worker sends a handshake initiation to the peer
|
||||||
@@ -195,7 +199,7 @@ impl<T: tun::Tun, B: udp::UDP> PeerInner<T, B> {
|
|||||||
*self.last_handshake_sent.lock() = Instant::now();
|
*self.last_handshake_sent.lock() = Instant::now();
|
||||||
self.timers_any_authenticated_packet_traversal();
|
self.timers_any_authenticated_packet_traversal();
|
||||||
self.timers_any_authenticated_packet_sent();
|
self.timers_any_authenticated_packet_sent();
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn set_persistent_keepalive_interval(&self, secs: u64) {
|
pub fn set_persistent_keepalive_interval(&self, secs: u64) {
|
||||||
let mut timers = self.timers_mut();
|
let mut timers = self.timers_mut();
|
||||||
@@ -205,10 +209,12 @@ impl<T: tun::Tun, B: udp::UDP> PeerInner<T, B> {
|
|||||||
|
|
||||||
// stop the keepalive timer with the old interval
|
// stop the keepalive timer with the old interval
|
||||||
timers.send_persistent_keepalive.stop();
|
timers.send_persistent_keepalive.stop();
|
||||||
|
|
||||||
// restart the persistent_keepalive timer with the new interval
|
// restart the persistent_keepalive timer with the new interval
|
||||||
if secs > 0 && timers.enabled {
|
if secs > 0 && timers.enabled {
|
||||||
timers.send_persistent_keepalive.start(Duration::from_secs(secs));
|
timers
|
||||||
|
.send_persistent_keepalive
|
||||||
|
.start(Duration::from_secs(secs));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -220,7 +226,6 @@ impl<T: tun::Tun, B: udp::UDP> PeerInner<T, B> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
impl Timers {
|
impl Timers {
|
||||||
pub fn new<T, B>(runner: &Runner, running: bool, peer: Peer<T, B>) -> Timers
|
pub fn new<T, B>(runner: &Runner, running: bool, peer: Peer<T, B>) -> Timers
|
||||||
where
|
where
|
||||||
@@ -242,9 +247,12 @@ impl Timers {
|
|||||||
if !timers.enabled {
|
if !timers.enabled {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// check if handshake attempts remaining
|
// check if handshake attempts remaining
|
||||||
let attempts = peer.timers().handshake_attempts.fetch_add(1, Ordering::SeqCst);
|
let attempts = peer
|
||||||
|
.timers()
|
||||||
|
.handshake_attempts
|
||||||
|
.fetch_add(1, Ordering::SeqCst);
|
||||||
if attempts > MAX_TIMER_HANDSHAKES {
|
if attempts > MAX_TIMER_HANDSHAKES {
|
||||||
debug!(
|
debug!(
|
||||||
"Handshake for peer {} did not complete after {} attempts, giving up",
|
"Handshake for peer {} did not complete after {} attempts, giving up",
|
||||||
@@ -257,8 +265,8 @@ impl Timers {
|
|||||||
} else {
|
} else {
|
||||||
debug!(
|
debug!(
|
||||||
"Handshake for {} did not complete after {} seconds, retrying (try {})",
|
"Handshake for {} did not complete after {} seconds, retrying (try {})",
|
||||||
peer,
|
peer,
|
||||||
REKEY_TIMEOUT.as_secs(),
|
REKEY_TIMEOUT.as_secs(),
|
||||||
attempts
|
attempts
|
||||||
);
|
);
|
||||||
timers.retransmit_handshake.reset(REKEY_TIMEOUT);
|
timers.retransmit_handshake.reset(REKEY_TIMEOUT);
|
||||||
@@ -287,7 +295,7 @@ impl Timers {
|
|||||||
runner.timer(move || {
|
runner.timer(move || {
|
||||||
debug!(
|
debug!(
|
||||||
"Retrying handshake with {} because we stopped hearing back after {} seconds",
|
"Retrying handshake with {} because we stopped hearing back after {} seconds",
|
||||||
peer,
|
peer,
|
||||||
(KEEPALIVE_TIMEOUT + REKEY_TIMEOUT).as_secs()
|
(KEEPALIVE_TIMEOUT + REKEY_TIMEOUT).as_secs()
|
||||||
);
|
);
|
||||||
peer.router.clear_src();
|
peer.router.clear_src();
|
||||||
@@ -307,9 +315,9 @@ impl Timers {
|
|||||||
if timers.enabled && timers.keepalive_interval > 0 {
|
if timers.enabled && timers.keepalive_interval > 0 {
|
||||||
peer.router.send_keepalive();
|
peer.router.send_keepalive();
|
||||||
timers.send_keepalive.stop();
|
timers.send_keepalive.stop();
|
||||||
timers.send_persistent_keepalive.start(Duration::from_secs(
|
timers
|
||||||
timers.keepalive_interval
|
.send_persistent_keepalive
|
||||||
));
|
.start(Duration::from_secs(timers.keepalive_interval));
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
},
|
},
|
||||||
@@ -318,7 +326,7 @@ impl Timers {
|
|||||||
|
|
||||||
pub fn dummy(runner: &Runner) -> Timers {
|
pub fn dummy(runner: &Runner) -> Timers {
|
||||||
Timers {
|
Timers {
|
||||||
enabled: false,
|
enabled: false,
|
||||||
keepalive_interval: 0,
|
keepalive_interval: 0,
|
||||||
need_another_keepalive: AtomicBool::new(false),
|
need_another_keepalive: AtomicBool::new(false),
|
||||||
sent_lastminute_handshake: AtomicBool::new(false),
|
sent_lastminute_handshake: AtomicBool::new(false),
|
||||||
@@ -344,9 +352,8 @@ impl<T: tun::Tun, B: udp::UDP> Callbacks for Events<T, B> {
|
|||||||
*/
|
*/
|
||||||
#[inline(always)]
|
#[inline(always)]
|
||||||
fn send(peer: &Self::Opaque, size: usize, sent: bool, keypair: &Arc<KeyPair>, counter: u64) {
|
fn send(peer: &Self::Opaque, size: usize, sent: bool, keypair: &Arc<KeyPair>, counter: u64) {
|
||||||
|
|
||||||
// update timers and stats
|
// update timers and stats
|
||||||
|
|
||||||
peer.timers_any_authenticated_packet_traversal();
|
peer.timers_any_authenticated_packet_traversal();
|
||||||
peer.timers_any_authenticated_packet_sent();
|
peer.timers_any_authenticated_packet_sent();
|
||||||
peer.tx_bytes.fetch_add(size as u64, Ordering::Relaxed);
|
peer.tx_bytes.fetch_add(size as u64, Ordering::Relaxed);
|
||||||
@@ -375,7 +382,6 @@ impl<T: tun::Tun, B: udp::UDP> Callbacks for Events<T, B> {
|
|||||||
*/
|
*/
|
||||||
#[inline(always)]
|
#[inline(always)]
|
||||||
fn recv(peer: &Self::Opaque, size: usize, sent: bool, keypair: &Arc<KeyPair>) {
|
fn recv(peer: &Self::Opaque, size: usize, sent: bool, keypair: &Arc<KeyPair>) {
|
||||||
|
|
||||||
// update timers and stats
|
// update timers and stats
|
||||||
|
|
||||||
peer.timers_any_authenticated_packet_traversal();
|
peer.timers_any_authenticated_packet_traversal();
|
||||||
@@ -386,13 +392,18 @@ impl<T: tun::Tun, B: udp::UDP> Callbacks for Events<T, B> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// keep_key_fresh
|
// keep_key_fresh
|
||||||
|
|
||||||
#[inline(always)]
|
#[inline(always)]
|
||||||
fn keep_key_fresh(keypair: &Arc<KeyPair>) -> bool {
|
fn keep_key_fresh(keypair: &Arc<KeyPair>) -> bool {
|
||||||
Instant::now() - keypair.birth > REJECT_AFTER_TIME - KEEPALIVE_TIMEOUT - REKEY_TIMEOUT
|
Instant::now() - keypair.birth > REJECT_AFTER_TIME - KEEPALIVE_TIMEOUT - REKEY_TIMEOUT
|
||||||
}
|
}
|
||||||
|
|
||||||
if keep_key_fresh(keypair) && !peer.timers().sent_lastminute_handshake.swap(true, Ordering::Acquire) {
|
if keep_key_fresh(keypair)
|
||||||
|
&& !peer
|
||||||
|
.timers()
|
||||||
|
.sent_lastminute_handshake
|
||||||
|
.swap(true, Ordering::Acquire)
|
||||||
|
{
|
||||||
peer.packet_send_queued_handshake_initiation(false);
|
peer.packet_send_queued_handshake_initiation(false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -405,7 +416,7 @@ impl<T: tun::Tun, B: udp::UDP> Callbacks for Events<T, B> {
|
|||||||
*/
|
*/
|
||||||
#[inline(always)]
|
#[inline(always)]
|
||||||
fn need_key(peer: &Self::Opaque) {
|
fn need_key(peer: &Self::Opaque) {
|
||||||
peer.packet_send_queued_handshake_initiation(false);
|
peer.packet_send_queued_handshake_initiation(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline(always)]
|
#[inline(always)]
|
||||||
|
|||||||
Reference in New Issue
Block a user