Work on Up/Down operation on WireGuard device

This commit is contained in:
Mathias Hall-Andersen
2019-11-04 13:19:27 +01:00
parent a0fa261a8a
commit 6ba40f17cb
8 changed files with 267 additions and 152 deletions

View File

@@ -1,7 +1,7 @@
use spin::Mutex; use spin::Mutex;
use std::net::{IpAddr, SocketAddr}; use std::net::{IpAddr, SocketAddr};
use std::sync::atomic::Ordering; use std::sync::atomic::Ordering;
use std::time::SystemTime; use std::time::{Duration, SystemTime};
use x25519_dalek::{PublicKey, StaticSecret}; use x25519_dalek::{PublicKey, StaticSecret};
use super::*; use super::*;
@@ -125,11 +125,8 @@ pub trait Configuration {
/// ///
/// - `peer': The public key of the peer /// - `peer': The public key of the peer
/// - `psk` /// - `psk`
fn set_persistent_keepalive_interval( fn set_persistent_keepalive_interval(&self, peer: &PublicKey, secs: u64)
&self, -> Option<ConfigError>;
peer: &PublicKey,
interval: usize,
) -> Option<ConfigError>;
/// Remove all allowed IPs from the peer /// Remove all allowed IPs from the peer
/// ///
@@ -254,11 +251,11 @@ impl<T: tun::Tun, B: bind::Platform> Configuration for WireguardConfig<T, B> {
fn set_persistent_keepalive_interval( fn set_persistent_keepalive_interval(
&self, &self,
peer: &PublicKey, peer: &PublicKey,
interval: usize, secs: u64,
) -> Option<ConfigError> { ) -> Option<ConfigError> {
match self.wireguard.lookup_peer(peer) { match self.wireguard.lookup_peer(peer) {
Some(peer) => { Some(peer) => {
peer.set_persistent_keepalive_interval(interval); peer.set_persistent_keepalive_interval(secs);
None None
} }
None => Some(ConfigError::NoSuchPeer), None => Some(ConfigError::NoSuchPeer),
@@ -292,7 +289,7 @@ impl<T: tun::Tun, B: bind::Platform> Configuration for WireguardConfig<T, B> {
// convert the system time to (secs, nano) since epoch // convert the system time to (secs, nano) since epoch
let last_handshake = (*p.walltime_last_handshake.lock()) let last_handshake = (*p.walltime_last_handshake.lock())
.duration_since(SystemTime::UNIX_EPOCH) .duration_since(SystemTime::UNIX_EPOCH)
.expect("There should be no earlier time"); .unwrap_or(Duration::from_secs(0)); // any time before epoch is mapped to epoch
// extract state into PeerState // extract state into PeerState
state.push(PeerState { state.push(PeerState {

View File

@@ -4,18 +4,15 @@ mod wireguard;
mod endpoint; mod endpoint;
mod handshake; mod handshake;
mod peer;
mod router; mod router;
mod types; mod types;
#[cfg(test)] #[cfg(test)]
mod tests; mod tests;
/// The WireGuard sub-module contains a pure, configurable implementation of WireGuard. pub use peer::Peer;
/// The implementation is generic over: pub use wireguard::Wireguard;
///
/// - TUN type, specifying how packets are received on the interface side: a reader/writer and MTU reporting interface.
/// - Bind type, specifying how WireGuard messages are sent/received from the internet and what constitutes an "endpoint"
pub use wireguard::{Peer, Wireguard};
#[cfg(test)] #[cfg(test)]
pub use types::dummy_keypair; pub use types::dummy_keypair;
@@ -24,4 +21,6 @@ pub use types::dummy_keypair;
use super::platform::dummy; use super::platform::dummy;
use super::platform::{bind, tun, Endpoint}; use super::platform::{bind, tun, Endpoint};
use types::{Key, KeyPair}; use peer::PeerInner;
use types::KeyPair;
use wireguard::HandshakeJob;

111
src/wireguard/peer.rs Normal file
View File

@@ -0,0 +1,111 @@
use super::constants::*;
use super::router;
use super::timers::{Events, Timers};
use super::HandshakeJob;
use super::bind::Bind;
use super::bind::Reader as BindReader;
use super::tun::{Reader, Tun};
use std::fmt;
use std::ops::Deref;
use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::{Instant, SystemTime};
use spin::{Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard};
use crossbeam_channel::Sender;
use x25519_dalek::PublicKey;
pub struct Peer<T: Tun, B: Bind> {
pub router: Arc<router::Peer<B::Endpoint, Events<T, B>, T::Writer, B::Writer>>,
pub state: Arc<PeerInner<B>>,
}
pub struct PeerInner<B: Bind> {
// internal id (for logging)
pub id: u64,
// handshake state
pub walltime_last_handshake: Mutex<SystemTime>,
pub last_handshake_sent: Mutex<Instant>, // instant for last handshake
pub handshake_queued: AtomicBool, // is a handshake job currently queued for the peer?
pub queue: Mutex<Sender<HandshakeJob<B::Endpoint>>>, // handshake queue
// stats and configuration
pub pk: PublicKey, // public key, DISCUSS: avoid this. TODO: remove
pub keepalive_interval: AtomicU64, // keepalive interval
pub rx_bytes: AtomicU64, // received bytes
pub tx_bytes: AtomicU64, // transmitted bytes
// timer model
pub timers: RwLock<Timers>,
}
impl<T: Tun, B: Bind> Clone for Peer<T, B> {
fn clone(&self) -> Peer<T, B> {
Peer {
router: self.router.clone(),
state: self.state.clone(),
}
}
}
impl<B: Bind> PeerInner<B> {
#[inline(always)]
pub fn timers(&self) -> RwLockReadGuard<Timers> {
self.timers.read()
}
#[inline(always)]
pub fn timers_mut(&self) -> RwLockWriteGuard<Timers> {
self.timers.write()
}
}
impl<T: Tun, B: Bind> fmt::Display for Peer<T, B> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "peer(id = {})", self.id)
}
}
impl<T: Tun, B: Bind> Deref for Peer<T, B> {
type Target = PeerInner<B>;
fn deref(&self) -> &Self::Target {
&self.state
}
}
impl<T: Tun, B: Bind> Peer<T, B> {
pub fn down(&self) {
self.stop_timers();
}
pub fn up(&self) {}
}
impl<B: Bind> PeerInner<B> {
/* Queue a handshake request for the parallel workers
* (if one does not already exist)
*
* The function is ratelimited.
*/
pub fn packet_send_handshake_initiation(&self) {
// the function is rate limited
{
let mut lhs = self.last_handshake_sent.lock();
if lhs.elapsed() < REKEY_TIMEOUT {
return;
}
*lhs = Instant::now();
}
// create a new handshake job for the peer
if !self.handshake_queued.swap(true, Ordering::SeqCst) {
self.queue.lock().send(HandshakeJob::New(self.pk)).unwrap();
}
}
}

View File

@@ -27,6 +27,8 @@ use super::route::get_route;
use super::super::{bind, tun, Endpoint, KeyPair}; use super::super::{bind, tun, Endpoint, KeyPair};
pub struct DeviceInner<E: Endpoint, C: Callbacks, T: tun::Writer, B: bind::Writer<E>> { pub struct DeviceInner<E: Endpoint, C: Callbacks, T: tun::Writer, B: bind::Writer<E>> {
pub enabled: AtomicBool,
// inbound writer (TUN) // inbound writer (TUN)
pub inbound: T, pub inbound: T,
@@ -91,6 +93,7 @@ impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: bind::Writer<E>> Device<E, C,
// allocate shared device state // allocate shared device state
let inner = DeviceInner { let inner = DeviceInner {
inbound: tun, inbound: tun,
enabled: AtomicBool::new(true),
outbound: RwLock::new(None), outbound: RwLock::new(None),
queues: Mutex::new(Vec::with_capacity(num_workers)), queues: Mutex::new(Vec::with_capacity(num_workers)),
queue_next: AtomicUsize::new(0), queue_next: AtomicUsize::new(0),
@@ -114,6 +117,16 @@ impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: bind::Writer<E>> Device<E, C,
} }
} }
/// Brings the router down.
/// When the router is brought down it:
/// - Prevents transmission of outbound messages.
/// - Erases all key state (key-wheels) of all peers
pub fn down(&self) {}
/// Brints the router up
/// When the router is brought up it enables the transmission of outbound messages.
pub fn up(&self) {}
/// A new secret key has been set for the device. /// A new secret key has been set for the device.
/// According to WireGuard semantics, this should cause all "sending" keys to be discarded. /// According to WireGuard semantics, this should cause all "sending" keys to be discarded.
pub fn new_sk(&self) {} pub fn new_sk(&self) {}

View File

@@ -1,5 +1,6 @@
use std::mem; use std::mem;
use std::net::{IpAddr, SocketAddr}; use std::net::{IpAddr, SocketAddr};
use std::ops::Deref;
use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering; use std::sync::atomic::Ordering;
use std::sync::mpsc::{sync_channel, SyncSender}; use std::sync::mpsc::{sync_channel, SyncSender};
@@ -11,7 +12,6 @@ use log::debug;
use spin::Mutex; use spin::Mutex;
use treebitmap::address::Address; use treebitmap::address::Address;
use treebitmap::IpLookupTable; use treebitmap::IpLookupTable;
use zerocopy::LayoutVerified;
use super::super::constants::*; use super::super::constants::*;
use super::super::{bind, tun, Endpoint, KeyPair}; use super::super::{bind, tun, Endpoint, KeyPair};
@@ -55,6 +55,14 @@ pub struct Peer<E: Endpoint, C: Callbacks, T: tun::Writer, B: bind::Writer<E>> {
thread_inbound: Option<thread::JoinHandle<()>>, thread_inbound: Option<thread::JoinHandle<()>>,
} }
impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: bind::Writer<E>> Deref for Peer<E, C, T, B> {
type Target = Arc<PeerInner<E, C, T, B>>;
fn deref(&self) -> &Self::Target {
&self.state
}
}
fn treebit_list<A, R, E: Endpoint, C: Callbacks, T: tun::Writer, B: bind::Writer<E>>( fn treebit_list<A, R, E: Endpoint, C: Callbacks, T: tun::Writer, B: bind::Writer<E>>(
peer: &Arc<PeerInner<E, C, T, B>>, peer: &Arc<PeerInner<E, C, T, B>>,
table: &spin::RwLock<IpLookupTable<A, Arc<PeerInner<E, C, T, B>>>>, table: &spin::RwLock<IpLookupTable<A, Arc<PeerInner<E, C, T, B>>>>,
@@ -199,7 +207,7 @@ pub fn new_peer<E: Endpoint, C: Callbacks, T: tun::Writer, B: bind::Writer<E>>(
let thread_inbound = { let thread_inbound = {
let peer = peer.clone(); let peer = peer.clone();
let device = device.clone(); let device = device.clone();
thread::spawn(move || worker_outbound(device, peer, out_rx)) thread::spawn(move || worker_outbound(peer, out_rx))
}; };
// spawn inbound thread // spawn inbound thread
@@ -217,6 +225,36 @@ pub fn new_peer<E: Endpoint, C: Callbacks, T: tun::Writer, B: bind::Writer<E>>(
} }
impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: bind::Writer<E>> PeerInner<E, C, T, B> { impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: bind::Writer<E>> PeerInner<E, C, T, B> {
/// Send a raw message to the peer (used for handshake messages)
///
/// # Arguments
///
/// - `msg`, message body to send to peer
///
/// # Returns
///
/// Unit if packet was sent, or an error indicating why sending failed
pub fn send(&self, msg: &[u8]) -> Result<(), RouterError> {
debug!("peer.send");
// check if device is enabled
if !self.device.enabled.load(Ordering::Acquire) {
return Ok(());
}
// send to endpoint (if known)
match self.endpoint.lock().as_ref() {
Some(endpoint) => self
.device
.outbound
.read()
.as_ref()
.ok_or(RouterError::SendError)
.and_then(|w| w.write(msg, endpoint).map_err(|_| RouterError::SendError)),
None => Err(RouterError::NoEndpoint),
}
}
fn send_staged(&self) -> bool { fn send_staged(&self) -> bool {
debug!("peer.send_staged"); debug!("peer.send_staged");
let mut sent = false; let mut sent = false;
@@ -498,7 +536,7 @@ impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: bind::Writer<E>> Peer<E, C, T
pub fn send_keepalive(&self) -> bool { pub fn send_keepalive(&self) -> bool {
debug!("peer.send_keepalive"); debug!("peer.send_keepalive");
self.state.send_raw(vec![0u8; SIZE_MESSAGE_PREFIX]) self.send_raw(vec![0u8; SIZE_MESSAGE_PREFIX])
} }
/// Map a subnet to the peer /// Map a subnet to the peer
@@ -565,30 +603,6 @@ impl<E: Endpoint, C: Callbacks, T: tun::Writer, B: bind::Writer<E>> Peer<E, C, T
treebit_remove(self, &self.state.device.ipv6); treebit_remove(self, &self.state.device.ipv6);
} }
/// Send a raw message to the peer (used for handshake messages)
///
/// # Arguments
///
/// - `msg`, message body to send to peer
///
/// # Returns
///
/// Unit if packet was sent, or an error indicating why sending failed
pub fn send(&self, msg: &[u8]) -> Result<(), RouterError> {
debug!("peer.send");
let inner = &self.state;
match inner.endpoint.lock().as_ref() {
Some(endpoint) => inner
.device
.outbound
.read()
.as_ref()
.ok_or(RouterError::SendError)
.and_then(|w| w.write(msg, endpoint).map_err(|_| RouterError::SendError)),
None => Err(RouterError::NoEndpoint),
}
}
pub fn clear_src(&self) { pub fn clear_src(&self) {
(*self.state.endpoint.lock()) (*self.state.endpoint.lock())
.as_mut() .as_mut()

View File

@@ -141,8 +141,7 @@ pub fn worker_inbound<E: Endpoint, C: Callbacks, T: tun::Writer, B: bind::Writer
/* TODO: Replace with run-queue /* TODO: Replace with run-queue
*/ */
pub fn worker_outbound<E: Endpoint, C: Callbacks, T: tun::Writer, B: bind::Writer<E>>( pub fn worker_outbound<E: Endpoint, C: Callbacks, T: tun::Writer, B: bind::Writer<E>>(
device: Arc<DeviceInner<E, C, T, B>>, // related device peer: Arc<PeerInner<E, C, T, B>>,
peer: Arc<PeerInner<E, C, T, B>>, // related peer
receiver: Receiver<JobOutbound>, receiver: Receiver<JobOutbound>,
) { ) {
loop { loop {
@@ -160,23 +159,8 @@ pub fn worker_outbound<E: Endpoint, C: Callbacks, T: tun::Writer, B: bind::Write
.map(|buf| { .map(|buf| {
debug!("outbound worker: job complete"); debug!("outbound worker: job complete");
// write to UDP bind // send to peer
let xmit = if let Some(dst) = peer.endpoint.lock().as_ref() { let xmit = peer.send(&buf.msg[..]).is_ok();
let send: &Option<B> = &*device.outbound.read();
if let Some(writer) = send.as_ref() {
match writer.write(&buf.msg[..], dst) {
Err(e) => {
debug!("failed to send outbound packet: {:?}", e);
false
}
Ok(_) => true,
}
} else {
false
}
} else {
false
};
// trigger callback // trigger callback
C::send(&peer.opaque, buf.msg.len(), xmit, &buf.keypair, buf.counter); C::send(&peer.opaque, buf.msg.len(), xmit, &buf.keypair, buf.counter);

View File

@@ -3,17 +3,18 @@ use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc; use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime}; use std::time::{Duration, Instant, SystemTime};
use log::{debug, info}; use log::debug;
use hjul::{Runner, Timer}; use hjul::{Runner, Timer};
use super::constants::*; use super::constants::*;
use super::router::{message_data_len, Callbacks}; use super::router::{message_data_len, Callbacks};
use super::wireguard::{Peer, PeerInner}; use super::{Peer, PeerInner};
use super::{bind, tun}; use super::{bind, tun};
use super::types::KeyPair; use super::types::KeyPair;
pub struct Timers { pub struct Timers {
enabled: bool,
handshake_attempts: AtomicUsize, handshake_attempts: AtomicUsize,
sent_lastminute_handshake: AtomicBool, sent_lastminute_handshake: AtomicBool,
need_another_keepalive: AtomicBool, need_another_keepalive: AtomicBool,
@@ -33,6 +34,48 @@ impl Timers {
} }
impl<B: bind::Bind> PeerInner<B> { impl<B: bind::Bind> PeerInner<B> {
pub fn stop_timers(&self) {
// take a write lock preventing simultaneous timer events or "start_timers" call
let mut timers = self.timers_mut();
// set flag to prevent future timer events
if !timers.enabled {
return;
}
timers.enabled = false;
// stop all pending timers
timers.retransmit_handshake.stop();
timers.send_keepalive.stop();
timers.send_persistent_keepalive.stop();
timers.zero_key_material.stop();
timers.new_handshake.stop();
// reset all timer state
timers.handshake_attempts.store(0, Ordering::SeqCst);
timers.sent_lastminute_handshake.store(false, Ordering::SeqCst);
timers.need_another_keepalive.store(false, Ordering::SeqCst);
}
pub fn start_timers(&self) {
// take a write lock preventing simultaneous "stop_timers" call
let mut timers = self.timers_mut();
// set flag to renable timer events
if timers.enabled {
return;
}
timers.enabled = true;
// start send_persistent_keepalive
let interval = self.keepalive_interval.load(Ordering::Acquire);
if interval > 0 {
timers.send_persistent_keepalive.start(
Duration::from_secs(interval)
);
}
}
/* should be called after an authenticated data packet is sent */ /* should be called after an authenticated data packet is sent */
pub fn timers_data_sent(&self) { pub fn timers_data_sent(&self) {
self.timers() self.timers()
@@ -95,7 +138,7 @@ impl<B: bind::Bind> PeerInner<B> {
* keepalive, data, or handshake is sent, or after one is received. * keepalive, data, or handshake is sent, or after one is received.
*/ */
pub fn timers_any_authenticated_packet_traversal(&self) { pub fn timers_any_authenticated_packet_traversal(&self) {
let keepalive = self.keepalive.load(Ordering::Acquire); let keepalive = self.keepalive_interval.load(Ordering::Acquire);
if keepalive > 0 { if keepalive > 0 {
// push persistent_keepalive into the future // push persistent_keepalive into the future
self.timers() self.timers()
@@ -125,9 +168,9 @@ impl<B: bind::Bind> PeerInner<B> {
} }
pub fn set_persistent_keepalive_interval(&self, interval: usize) { pub fn set_persistent_keepalive_interval(&self, interval: u64) {
self.timers().send_persistent_keepalive.stop(); self.timers().send_persistent_keepalive.stop();
self.keepalive.store(interval, Ordering::SeqCst); self.keepalive_interval.store(interval, Ordering::SeqCst);
if interval > 0 { if interval > 0 {
self.timers() self.timers()
.send_persistent_keepalive .send_persistent_keepalive
@@ -154,6 +197,7 @@ impl Timers {
{ {
// create a timer instance for the provided peer // create a timer instance for the provided peer
Timers { Timers {
enabled: true,
need_another_keepalive: AtomicBool::new(false), need_another_keepalive: AtomicBool::new(false),
sent_lastminute_handshake: AtomicBool::new(false), sent_lastminute_handshake: AtomicBool::new(false),
handshake_attempts: AtomicUsize::new(0), handshake_attempts: AtomicUsize::new(0),
@@ -213,7 +257,7 @@ impl Timers {
send_persistent_keepalive: { send_persistent_keepalive: {
let peer = peer.clone(); let peer = peer.clone();
runner.timer(move || { runner.timer(move || {
let keepalive = peer.state.keepalive.load(Ordering::Acquire); let keepalive = peer.state.keepalive_interval.load(Ordering::Acquire);
if keepalive > 0 { if keepalive > 0 {
peer.router.send_keepalive(); peer.router.send_keepalive();
peer.timers().send_keepalive.stop(); peer.timers().send_keepalive.stop();
@@ -235,6 +279,7 @@ impl Timers {
pub fn dummy(runner: &Runner) -> Timers { pub fn dummy(runner: &Runner) -> Timers {
Timers { Timers {
enabled: false,
need_another_keepalive: AtomicBool::new(false), need_another_keepalive: AtomicBool::new(false),
sent_lastminute_handshake: AtomicBool::new(false), sent_lastminute_handshake: AtomicBool::new(false),
handshake_attempts: AtomicUsize::new(0), handshake_attempts: AtomicUsize::new(0),

View File

@@ -2,6 +2,7 @@ use super::constants::*;
use super::handshake; use super::handshake;
use super::router; use super::router;
use super::timers::{Events, Timers}; use super::timers::{Events, Timers};
use super::{Peer, PeerInner};
use super::bind::Reader as BindReader; use super::bind::Reader as BindReader;
use super::bind::{Bind, Writer}; use super::bind::{Bind, Writer};
@@ -22,7 +23,7 @@ use std::collections::HashMap;
use log::debug; use log::debug;
use rand::rngs::OsRng; use rand::rngs::OsRng;
use rand::Rng; use rand::Rng;
use spin::{Mutex, RwLock, RwLockReadGuard}; use spin::{Mutex, RwLock};
use byteorder::{ByteOrder, LittleEndian}; use byteorder::{ByteOrder, LittleEndian};
use crossbeam_channel::{bounded, Sender}; use crossbeam_channel::{bounded, Sender};
@@ -32,45 +33,19 @@ const SIZE_HANDSHAKE_QUEUE: usize = 128;
const THRESHOLD_UNDER_LOAD: usize = SIZE_HANDSHAKE_QUEUE / 4; const THRESHOLD_UNDER_LOAD: usize = SIZE_HANDSHAKE_QUEUE / 4;
const DURATION_UNDER_LOAD: Duration = Duration::from_millis(10_000); const DURATION_UNDER_LOAD: Duration = Duration::from_millis(10_000);
pub struct Peer<T: Tun, B: Bind> {
pub router: Arc<router::Peer<B::Endpoint, Events<T, B>, T::Writer, B::Writer>>,
pub state: Arc<PeerInner<B>>,
}
pub struct PeerInner<B: Bind> {
// internal id (for logging)
pub id: u64,
// handshake state
pub walltime_last_handshake: Mutex<SystemTime>,
pub last_handshake_sent: Mutex<Instant>, // instant for last handshake
pub handshake_queued: AtomicBool, // is a handshake job currently queued for the peer?
pub queue: Mutex<Sender<HandshakeJob<B::Endpoint>>>, // handshake queue
// stats and configuration
pub pk: PublicKey, // public key, DISCUSS: avoid this. TODO: remove
pub keepalive: AtomicUsize, // keepalive interval
pub rx_bytes: AtomicU64, // received bytes
pub tx_bytes: AtomicU64, // transmitted bytes
// timer model
pub timers: RwLock<Timers>,
}
pub struct WireguardInner<T: Tun, B: Bind> { pub struct WireguardInner<T: Tun, B: Bind> {
// identifier (for logging) // identifier (for logging)
id: u32, id: u32,
start: Instant, start: Instant,
// provides access to the MTU value of the tun device // provides access to the MTU value of the tun device
// (otherwise owned solely by the router and a dedicated read IO thread)
mtu: T::MTU, mtu: T::MTU,
send: RwLock<Option<B::Writer>>, send: RwLock<Option<B::Writer>>,
// identify and configuration map // identify and configuration map
peers: RwLock<HashMap<[u8; 32], Peer<T, B>>>, peers: RwLock<HashMap<[u8; 32], Peer<T, B>>>,
// cryptkey router // cryptokey router
router: router::Device<B::Endpoint, Events<T, B>, T::Writer, B::Writer>, router: router::Device<B::Endpoint, Events<T, B>, T::Writer, B::Writer>,
// handshake related state // handshake related state
@@ -90,66 +65,12 @@ pub struct WireguardHandle<T: Tun, B: Bind> {
inner: Arc<WireguardInner<T, B>>, inner: Arc<WireguardInner<T, B>>,
} }
impl<T: Tun, B: Bind> Clone for Peer<T, B> {
fn clone(&self) -> Peer<T, B> {
Peer {
router: self.router.clone(),
state: self.state.clone(),
}
}
}
impl<B: Bind> PeerInner<B> {
#[inline(always)]
pub fn timers(&self) -> RwLockReadGuard<Timers> {
self.timers.read()
}
}
impl<T: Tun, B: Bind> fmt::Display for Peer<T, B> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "peer(id = {})", self.id)
}
}
impl<T: Tun, B: Bind> fmt::Display for WireguardInner<T, B> { impl<T: Tun, B: Bind> fmt::Display for WireguardInner<T, B> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "wireguard({:x})", self.id) write!(f, "wireguard({:x})", self.id)
} }
} }
impl<T: Tun, B: Bind> Deref for Peer<T, B> {
type Target = PeerInner<B>;
fn deref(&self) -> &Self::Target {
&self.state
}
}
impl<B: Bind> PeerInner<B> {
/* Queue a handshake request for the parallel workers
* (if one does not already exist)
*
* The function is ratelimited.
*/
pub fn packet_send_handshake_initiation(&self) {
// the function is rate limited
{
let mut lhs = self.last_handshake_sent.lock();
if lhs.elapsed() < REKEY_TIMEOUT {
return;
}
*lhs = Instant::now();
}
// create a new handshake job for the peer
if !self.handshake_queued.swap(true, Ordering::SeqCst) {
self.queue.lock().send(HandshakeJob::New(self.pk)).unwrap();
}
}
}
struct Handshake { struct Handshake {
device: handshake::Device, device: handshake::Device,
active: bool, active: bool,
@@ -196,6 +117,37 @@ const fn padding(size: usize, mtu: usize) -> usize {
} }
impl<T: Tun, B: Bind> Wireguard<T, B> { impl<T: Tun, B: Bind> Wireguard<T, B> {
/// Brings the WireGuard device down.
/// Usually called when the associated interface is brought down.
///
/// This stops any further action/timer on any peer
/// and prevents transmission of further messages,
/// however the device retrains its state.
///
/// The instance will continue to consume and discard messages
/// on both ends of the device.
pub fn down(&self) {
// ensure exclusive access (to avoid race with "up" call)
let peers = self.peers.write();
// set all peers down (stops timers)
for peer in peers.values() {
peer.down();
}
}
/// Brings the WireGuard device up.
/// Usually called when the associated interface is brought up.
pub fn up(&self) {
// ensure exclusive access (to avoid race with "down" call)
let peers = self.peers.write();
// set all peers up (restarts timers)
for peer in peers.values() {
peer.up();
}
}
pub fn clear_peers(&self) { pub fn clear_peers(&self) {
self.state.peers.write().clear(); self.state.peers.write().clear();
} }
@@ -263,7 +215,7 @@ impl<T: Tun, B: Bind> Wireguard<T, B> {
last_handshake_sent: Mutex::new(self.state.start - TIME_HORIZON), last_handshake_sent: Mutex::new(self.state.start - TIME_HORIZON),
handshake_queued: AtomicBool::new(false), handshake_queued: AtomicBool::new(false),
queue: Mutex::new(self.state.queue.lock().clone()), queue: Mutex::new(self.state.queue.lock().clone()),
keepalive: AtomicUsize::new(0), keepalive_interval: AtomicU64::new(0), // disabled
rx_bytes: AtomicU64::new(0), rx_bytes: AtomicU64::new(0),
tx_bytes: AtomicU64::new(0), tx_bytes: AtomicU64::new(0),
timers: RwLock::new(Timers::dummy(&self.runner)), timers: RwLock::new(Timers::dummy(&self.runner)),