Added profiler feature

This commit is contained in:
Mathias Hall-Andersen
2019-12-14 13:37:51 +01:00
parent 6566796387
commit e0db9861bc
12 changed files with 181 additions and 46 deletions

View File

@@ -2,6 +2,19 @@
#![feature(weak_into_raw)]
#![allow(dead_code)]
#[cfg(feature = "profiler")]
extern crate cpuprofiler;
#[cfg(feature = "profiler")]
use cpuprofiler::PROFILER;
#[cfg(feature = "profiler")]
use libc::atexit;
mod configuration;
mod platform;
mod wireguard;
use log;
use daemonize::Daemonize;
@@ -10,18 +23,47 @@ use std::env;
use std::process::exit;
use std::thread;
mod configuration;
mod platform;
mod wireguard;
use configuration::Configuration;
use platform::tun::{PlatformTun, Status};
use platform::uapi::{BindUAPI, PlatformUAPI};
use platform::*;
// destructor which stops the profiler upon program exit.
#[cfg(feature = "profiler")]
// C-ABI destructor registered with libc::atexit() in profiler_start.
// NOTE(review): the body is empty even though it is described as stopping the
// profiler upon program exit — confirm whether it should call profiler_stop().
// Beware: the exit paths already call profiler_stop() explicitly, so stopping
// here as well would double-stop and panic on the inner unwrap.
pub extern "C" fn dtor_profiler_stop() {
}
#[cfg(feature = "profiler")]
/// Stop the global CPU profiler and flush the collected profile to disk.
///
/// Panics if the profiler mutex is poisoned or if the profiler was not
/// running (e.g. never started, or already stopped).
fn profiler_stop() {
    let mut profiler = PROFILER.lock().unwrap();
    profiler.stop().unwrap();
}
#[cfg(not(feature = "profiler"))]
// No-op stand-in so call sites need no cfg-gating when profiling is disabled.
fn profiler_stop() {}
#[cfg(feature = "profiler")]
/// Start the global CPU profiler, writing to the first free output path.
///
/// Probes "./{name}-{n}.profile" for n = 0, 1, 2, ... and begins profiling
/// into the first path that does not already exist. Also registers
/// dtor_profiler_stop with atexit so the profile is handled on process exit.
///
/// Panics if the profiler fails to start or atexit registration fails.
fn profiler_start(name: &str) {
    use std::path::Path;
    // probe candidate output paths until an unused one is found
    let mut attempt = 0;
    loop {
        let path = format!("./{}-{}.profile", name, attempt);
        if Path::new(&path).exists() {
            attempt += 1;
            continue;
        }
        println!("Starting profiler: {}", path);
        PROFILER.lock().unwrap().start(path).unwrap();
        // run the stop destructor when the process terminates normally
        unsafe {
            assert_eq!(atexit(dtor_profiler_stop), 0);
        }
        break;
    }
}
fn main() {
// parse commandline arguments
// parse command line arguments
let mut name = None;
let mut drop_privileges = true;
let mut foreground = false;
@@ -82,6 +124,10 @@ fn main() {
// drop privileges
if drop_privileges {}
// start profiler (if enabled)
#[cfg(feature = "profiler")]
profiler_start(name.as_str());
// create WireGuard device
let wg: wireguard::Wireguard<plt::Tun, plt::UDP> = wireguard::Wireguard::new(writer);
@@ -104,6 +150,7 @@ fn main() {
match status.event() {
Err(e) => {
log::info!("Tun device error {}", e);
profiler_stop();
exit(0);
}
Ok(tun::TunEvent::Up(mtu)) => {
@@ -134,6 +181,7 @@ fn main() {
// start UAPI server
thread::spawn(move || loop {
// accept and handle UAPI config connections
match uapi.connect() {
Ok(mut stream) => {
let cfg = cfg.clone();
@@ -146,8 +194,13 @@ fn main() {
break;
}
}
// exit
profiler_stop();
exit(0);
});
// block until all tun readers closed
wait.wait();
profiler_stop();
}

View File

@@ -23,4 +23,4 @@ pub const MESSAGE_PADDING_MULTIPLE: usize = 16;
* used in places to avoid Option<Instant> by instead using a long "expired" Instant:
* (Instant::now() - TIME_HORIZON)
*/
pub const TIME_HORIZON: Duration = Duration::from_secs(3600 * 24);
pub const TIME_HORIZON: Duration = Duration::from_secs(60 * 60 * 24);

View File

@@ -28,9 +28,9 @@ pub struct PeerInner<T: Tun, B: UDP> {
pub wg: Arc<WireguardInner<T, B>>,
// handshake state
pub walltime_last_handshake: Mutex<Option<SystemTime>>,
pub last_handshake_sent: Mutex<Instant>, // instant for last handshake
pub handshake_queued: AtomicBool, // is a handshake job currently queued for the peer?
pub walltime_last_handshake: Mutex<Option<SystemTime>>, // walltime for last handshake (for UAPI status)
pub last_handshake_sent: Mutex<Instant>, // instant for last handshake
pub handshake_queued: AtomicBool, // is a handshake job currently queued for the peer?
// stats and configuration
pub pk: PublicKey, // public key, DISCUSS: avoid this. TODO: remove

View File

@@ -21,7 +21,7 @@ impl<T> ParallelQueue<T> {
///
/// # Arguments
///
/// - `queues`: number of readers/writers
/// - `queues`: number of readers
/// - `capacity`: capacity of each internal queue
///
pub fn new(queues: usize, capacity: usize) -> (Self, Vec<Receiver<T>>) {

View File

@@ -4,4 +4,6 @@ pub const MAX_STAGED_PACKETS: usize = 128;
// performance constants
pub const WORKER_QUEUE_SIZE: usize = MAX_STAGED_PACKETS;
pub const PARALLEL_QUEUE_SIZE: usize = MAX_STAGED_PACKETS;
pub const INORDER_QUEUE_SIZE: usize = PARALLEL_QUEUE_SIZE;
pub const MAX_INORDER_CONSUME: usize = INORDER_QUEUE_SIZE;

View File

@@ -1,3 +1,4 @@
use super::constants::MAX_INORDER_CONSUME;
use super::device::DecryptionState;
use super::device::Device;
use super::messages::TransportHeader;
@@ -185,6 +186,7 @@ pub fn sequential<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>>(
// handle message from the peers inbound queue
device.run_inbound.run(|peer| {
peer.inbound.handle(|body| work(&peer, body));
peer.inbound
.handle(|body| work(&peer, body), MAX_INORDER_CONSUME)
});
}

View File

@@ -1,3 +1,4 @@
use super::constants::MAX_INORDER_CONSUME;
use super::device::Device;
use super::messages::{TransportHeader, TYPE_TRANSPORT};
use super::peer::Peer;
@@ -88,20 +89,23 @@ pub fn sequential<E: Endpoint, C: Callbacks, T: tun::Writer, B: udp::Writer<E>>(
device: Device<E, C, T, B>,
) {
device.run_outbound.run(|peer| {
peer.outbound.handle(|body| {
log::trace!("worker, sequential section, obtained job");
peer.outbound.handle(
|body| {
log::trace!("worker, sequential section, obtained job");
// send to peer
let xmit = peer.send(&body.msg[..]).is_ok();
// send to peer
let xmit = peer.send(&body.msg[..]).is_ok();
// trigger callback
C::send(
&peer.opaque,
body.msg.len(),
xmit,
&body.keypair,
body.counter,
);
});
// trigger callback
C::send(
&peer.opaque,
body.msg.len(),
xmit,
&body.keypair,
body.counter,
);
},
MAX_INORDER_CONSUME,
)
});
}

View File

@@ -4,10 +4,9 @@ use std::mem;
use std::sync::mpsc::Receiver;
use std::sync::Arc;
use super::constants::INORDER_QUEUE_SIZE;
use super::runq::{RunQueue, ToKey};
const INORDER_QUEUE_SIZE: usize = 64;
pub struct InnerJob<P, B> {
// peer (used by worker to schedule/handle inorder queue),
// when the peer is None, the job is complete
@@ -52,28 +51,50 @@ pub struct InorderQueue<P, B> {
}
impl<P, B> InorderQueue<P, B> {
pub fn send(&self, job: Job<P, B>) -> bool {
self.queue.lock().push_back(job).is_ok()
}
pub fn new() -> InorderQueue<P, B> {
InorderQueue {
queue: Mutex::new(ArrayDeque::new()),
}
}
/// Add a new job to the in-order queue
///
/// # Arguments
///
/// - `job`: The job added to the back of the queue
///
/// # Returns
///
/// True if the element was added,
/// false to indicate that the queue is full.
pub fn send(&self, job: Job<P, B>) -> bool {
self.queue.lock().push_back(job).is_ok()
}
/// Consume completed jobs from the in-order queue
///
/// # Arguments
///
/// - `f`: function to apply to the body of each job.
/// - `limit`: maximum number of jobs to handle before returning
///
/// # Returns
///
/// A boolean indicating if the limit was reached:
/// true indicating that the limit was reached,
/// while false implies that the queue is empty or an uncompleted job was reached.
#[inline(always)]
pub fn handle<F: Fn(&mut B)>(&self, f: F) {
pub fn handle<F: Fn(&mut B)>(&self, f: F, mut limit: usize) -> bool {
// take the mutex
let mut queue = self.queue.lock();
loop {
while limit > 0 {
// attempt to extract front element
let front = queue.pop_front();
let elem = match front {
Some(elem) => elem,
_ => {
return;
return false;
}
};
@@ -90,13 +111,17 @@ impl<P, B> InorderQueue<P, B> {
// job not complete yet, return job to front
if ret {
queue.push_front(elem).unwrap();
return;
return false;
}
limit -= 1;
}
// did not complete all jobs
true
}
}
/// Allows easy construction of a semi-parallel worker.
/// Allows easy construction of a parallel worker.
/// Applicable for both decryption and encryption workers.
#[inline(always)]
pub fn worker_parallel<

View File

@@ -58,7 +58,21 @@ impl<T: ToKey> RunQueue<T> {
}
}
pub fn run<F: Fn(&T) -> ()>(&self, f: F) {
/// Run (consume from) the run queue using the provided function.
/// The function should return whether the given element should be rescheduled.
///
/// # Arguments
///
/// - `f` : function to apply to every element
///
/// # Note
///
/// The function f may be called again even when the element was not inserted back into the
/// queue since the last application and no rescheduling was requested.
///
/// This happens when the function handles all work for T,
/// but T is added to the run queue while the function is running.
pub fn run<F: Fn(&T) -> bool>(&self, f: F) {
let mut inner = self.inner.lock().unwrap();
loop {
// fetch next element
@@ -86,10 +100,16 @@ impl<T: ToKey> RunQueue<T> {
mem::drop(inner); // drop guard
// handle element
f(&elem);
let rerun = f(&elem);
// retake lock and check if should be added back to queue
// if the function requested a re-run add the element to the back of the queue
inner = self.inner.lock().unwrap();
if rerun {
inner.queue.push_back(elem);
continue;
}
// otherwise check if new requests have come in since we ran the function
match inner.members.entry(key) {
Entry::Occupied(occ) => {
if *occ.get() == old_n {
@@ -111,7 +131,6 @@ impl<T: ToKey> RunQueue<T> {
#[cfg(test)]
mod tests {
use super::*;
use std::sync::Arc;
use std::thread;
use std::time::Duration;

View File

@@ -343,8 +343,7 @@ impl<T: tun::Tun, B: udp::UDP> Wireguard<T, B> {
// create vector big enough for any message given current MTU
let mtu = wg.mtu.load(Ordering::Relaxed);
let size = mtu + handshake::MAX_HANDSHAKE_MSG_SIZE;
let mut msg: Vec<u8> = Vec::with_capacity(size);
msg.resize(size, 0);
let mut msg: Vec<u8> = vec![0; size];
// read UDP packet into vector
let (size, src) = match reader.read(&mut msg) {
@@ -413,8 +412,7 @@ impl<T: tun::Tun, B: udp::UDP> Wireguard<T, B> {
// create vector big enough for any transport message (based on MTU)
let mtu = wg.mtu.load(Ordering::Relaxed);
let size = mtu + router::SIZE_MESSAGE_PREFIX + 1;
let mut msg: Vec<u8> = Vec::with_capacity(size + router::CAPACITY_MESSAGE_POSTFIX);
msg.resize(size, 0);
let mut msg: Vec<u8> = vec![0; size + router::CAPACITY_MESSAGE_POSTFIX];
// read a new IP packet
let payload = match reader.read(&mut msg[..], router::SIZE_MESSAGE_PREFIX) {
@@ -426,7 +424,7 @@ impl<T: tun::Tun, B: udp::UDP> Wireguard<T, B> {
};
debug!("TUN worker, IP packet of {} bytes (MTU = {})", payload, mtu);
// TODO: start device down
// check if device is down
if mtu == 0 {
continue;
}