Simply passing of JobBuffer ownership

This commit is contained in:
Mathias Hall-Andersen
2019-09-04 19:08:13 +02:00
parent f55014ef8f
commit 6d11da441b
7 changed files with 279 additions and 304 deletions

View File

@@ -1,14 +1,13 @@
use std::cmp;
use std::collections::HashMap;
use std::net::{Ipv4Addr, Ipv6Addr};
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::mpsc::sync_channel;
use std::sync::mpsc::SyncSender;
use std::sync::{Arc, Weak};
use std::thread;
use std::time::Instant;
use parking_lot::{Condvar, Mutex};
use crossbeam_deque::{Injector, Worker};
use spin;
use treebitmap::IpLookupTable;
@@ -41,11 +40,6 @@ pub struct DeviceInner<C: Callbacks, T: Tun, B: Bind> {
pub call_send: C::CallbackSend,
pub call_need_key: C::CallbackKey,
// threading and workers
pub waker: (Mutex<()>, Condvar),
pub running: AtomicBool, // workers running?
pub injector: Injector<JobParallel<C, T, B>>, // parallel enc/dec task injector
// routing
pub recv: spin::RwLock<HashMap<u32, DecryptionState<C, T, B>>>, // receiver id -> decryption state
pub ipv4: spin::RwLock<IpLookupTable<Ipv4Addr, Weak<PeerInner<C, T, B>>>>, // ipv4 cryptkey routing
@@ -68,19 +62,20 @@ pub struct DecryptionState<C: Callbacks, T: Tun, B: Bind> {
pub death: Instant, // time when the key can no longer be used for decryption
}
pub struct Device<C: Callbacks, T: Tun, B: Bind>(
Arc<DeviceInner<C, T, B>>, // reference to device state
Vec<thread::JoinHandle<()>>, // join handles for workers
);
pub struct Device<C: Callbacks, T: Tun, B: Bind> {
pub state: Arc<DeviceInner<C, T, B>>, // reference to device state
pub handles: Vec<thread::JoinHandle<()>>, // join handles for workers
pub queue_next: AtomicUsize, // next round-robin index
pub queues: Vec<spin::Mutex<SyncSender<JobParallel>>>, // work queues (1 per thread)
}
impl<C: Callbacks, T: Tun, B: Bind> Drop for Device<C, T, B> {
fn drop(&mut self) {
// mark device as stopped
let device = &self.0;
device.running.store(false, Ordering::SeqCst);
// drop all queues
while self.queues.pop().is_some() {}
// join all worker threads
while match self.1.pop() {
while match self.handles.pop() {
Some(handle) => {
handle.thread().unpark();
handle.join().unwrap();
@@ -98,8 +93,8 @@ impl<O: Opaque, R: Callback<O>, S: Callback<O>, K: KeyCallback<O>, T: Tun, B: Bi
num_workers: usize,
tun: T,
bind: B,
call_recv: R,
call_send: S,
call_recv: R,
call_need_key: K,
) -> Device<PhantomCallbacks<O, R, S, K>, T, B> {
// allocate shared device state
@@ -109,36 +104,31 @@ impl<O: Opaque, R: Callback<O>, S: Callback<O>, K: KeyCallback<O>, T: Tun, B: Bi
call_recv,
call_send,
call_need_key,
waker: (Mutex::new(()), Condvar::new()),
running: AtomicBool::new(true),
injector: Injector::new(),
recv: spin::RwLock::new(HashMap::new()),
ipv4: spin::RwLock::new(IpLookupTable::new()),
ipv6: spin::RwLock::new(IpLookupTable::new()),
});
// allocate work pool resources
let mut workers = Vec::with_capacity(num_workers);
let mut stealers = Vec::with_capacity(num_workers);
for _ in 0..num_workers {
let w = Worker::new_fifo();
stealers.push(w.stealer());
workers.push(w);
}
// start worker threads
let mut queues = Vec::with_capacity(num_workers);
let mut threads = Vec::with_capacity(num_workers);
for _ in 0..num_workers {
// allocate work queue
let (tx, rx) = sync_channel(128);
queues.push(spin::Mutex::new(tx));
// start worker thread
let device = inner.clone();
let stealers = stealers.clone();
let worker = workers.pop().unwrap();
threads.push(thread::spawn(move || {
worker_parallel(device, worker, stealers)
}));
threads.push(thread::spawn(move || worker_parallel(device, rx)));
}
// return exported device handle
Device(inner, threads)
Device {
state: inner,
handles: threads,
queue_next: AtomicUsize::new(0),
queues: queues,
}
}
}
@@ -149,7 +139,7 @@ impl<C: Callbacks, T: Tun, B: Bind> Device<C, T, B> {
///
/// A atomic ref. counted peer (with liftime matching the device)
pub fn new_peer(&self, opaque: C::Opaque) -> Peer<C, T, B> {
peer::new_peer(self.0.clone(), opaque)
peer::new_peer(self.state.clone(), opaque)
}
/// Cryptkey routes and sends a plaintext message (IP packet)
@@ -177,7 +167,7 @@ impl<C: Callbacks, T: Tun, B: Bind> Device<C, T, B> {
let dst = Ipv4Addr::from(dst);
// lookup peer (project unto and clone "value" field)
self.0
self.state
.ipv4
.read()
.longest_match(dst)
@@ -195,7 +185,7 @@ impl<C: Callbacks, T: Tun, B: Bind> Device<C, T, B> {
let dst = Ipv6Addr::from(dst);
// lookup peer (project unto and clone "value" field)
self.0
self.state
.ipv6
.read()
.longest_match(dst)
@@ -210,12 +200,12 @@ impl<C: Callbacks, T: Tun, B: Bind> Device<C, T, B> {
// schedule for encryption and transmission to peer
if let Some(job) = peer.send_job(msg) {
// add job to parallel worker pool
self.0.injector.push((peer.clone(), job));
// ensure workers running, TODO: something faster
let &(_, ref cvar) = &self.0.waker;
cvar.notify_all();
// add job to worker queue
let idx = self.queue_next.fetch_add(1, Ordering::SeqCst);
self.queues[idx % self.queues.len()]
.lock()
.send(job)
.unwrap();
}
Ok(())