Start worker threads for device

This commit is contained in:
Mathias Hall-Andersen
2019-08-28 11:52:08 +02:00
parent a1b50aca26
commit 10e6436e6b
2 changed files with 53 additions and 26 deletions

View File

@@ -5,7 +5,7 @@ use std::sync::{Arc, Weak};
use std::thread; use std::thread;
use std::time::Instant; use std::time::Instant;
use crossbeam_deque::{Injector, Steal}; use crossbeam_deque::{Injector, Steal, Stealer, Worker};
use spin; use spin;
use treebitmap::IpLookupTable; use treebitmap::IpLookupTable;
@@ -15,12 +15,13 @@ use super::peer;
use super::peer::{Peer, PeerInner}; use super::peer::{Peer, PeerInner};
use super::types::{Callback, KeyCallback, Opaque}; use super::types::{Callback, KeyCallback, Opaque};
use super::workers::{worker_parallel, JobParallel};
pub struct DeviceInner<T: Opaque, S: Callback<T>, R: Callback<T>, K: KeyCallback<T>> { pub struct DeviceInner<T: Opaque, S: Callback<T>, R: Callback<T>, K: KeyCallback<T>> {
// threading and workers // threading and workers
pub stopped: AtomicBool, pub running: AtomicBool, // workers running?
pub injector: Injector<()>, // parallel enc/dec task injector pub parked: AtomicBool, // any workers parked?
pub threads: Vec<thread::JoinHandle<()>>, // join handles of worker threads pub injector: Injector<JobParallel>, // parallel enc/dec task injector
// unboxed callbacks (used for timers and handshake requests) // unboxed callbacks (used for timers and handshake requests)
pub event_send: S, // called when authenticated message send pub event_send: S, // called when authenticated message send
@@ -52,19 +53,23 @@ pub struct DecryptionState<T: Opaque, S: Callback<T>, R: Callback<T>, K: KeyCall
pub struct Device<T: Opaque, S: Callback<T>, R: Callback<T>, K: KeyCallback<T>>( pub struct Device<T: Opaque, S: Callback<T>, R: Callback<T>, K: KeyCallback<T>>(
Arc<DeviceInner<T, S, R, K>>, Arc<DeviceInner<T, S, R, K>>,
Vec<thread::JoinHandle<()>>,
); );
impl<T: Opaque, S: Callback<T>, R: Callback<T>, K: KeyCallback<T>> Drop for Device<T, S, R, K> { impl<T: Opaque, S: Callback<T>, R: Callback<T>, K: KeyCallback<T>> Drop for Device<T, S, R, K> {
fn drop(&mut self) { fn drop(&mut self) {
// mark device as stopped // mark device as stopped
let device = &self.0; let device = &self.0;
device.stopped.store(true, Ordering::SeqCst); device.running.store(false, Ordering::SeqCst);
// eat all parallel jobs // eat all parallel jobs
while device.injector.steal() != Steal::Empty {} while match device.injector.steal() {
Steal::Empty => true,
_ => false,
} {}
// unpark all threads // unpark all threads
for handle in &device.threads { for handle in &self.1 {
handle.thread().unpark(); handle.thread().unpark();
} }
} }
@@ -72,22 +77,46 @@ impl<T: Opaque, S: Callback<T>, R: Callback<T>, K: KeyCallback<T>> Drop for Devi
impl<T: Opaque, S: Callback<T>, R: Callback<T>, K: KeyCallback<T>> Device<T, S, R, K> { impl<T: Opaque, S: Callback<T>, R: Callback<T>, K: KeyCallback<T>> Device<T, S, R, K> {
pub fn new( pub fn new(
workers: usize, num_workers: usize,
event_recv: R, event_recv: R,
event_send: S, event_send: S,
event_need_key: K, event_need_key: K,
) -> Device<T, S, R, K> { ) -> Device<T, S, R, K> {
Device(Arc::new(DeviceInner { // allocate shared device state
let inner = Arc::new(DeviceInner {
event_recv, event_recv,
event_send, event_send,
event_need_key, event_need_key,
threads: vec![], parked: AtomicBool::new(false),
stopped: AtomicBool::new(false), running: AtomicBool::new(true),
injector: Injector::new(), injector: Injector::new(),
recv: spin::RwLock::new(HashMap::new()), recv: spin::RwLock::new(HashMap::new()),
ipv4: spin::RwLock::new(IpLookupTable::new()), ipv4: spin::RwLock::new(IpLookupTable::new()),
ipv6: spin::RwLock::new(IpLookupTable::new()), ipv6: spin::RwLock::new(IpLookupTable::new()),
})) });
// alloacate work pool resources
let mut workers = Vec::with_capacity(num_workers);
let mut stealers = Vec::with_capacity(num_workers);
for _ in 0..num_workers {
let w = Worker::new_fifo();
stealers.push(w.stealer());
workers.push(w);
}
// start worker threads
let mut threads = Vec::with_capacity(num_workers);
for _ in 0..num_workers {
let device = inner.clone();
let stealers = stealers.clone();
let worker = workers.pop().unwrap();
threads.push(thread::spawn(move || {
worker_parallel(device, worker, stealers)
}));
}
// return exported device handle
Device(inner, threads)
} }
/// Adds a new peer to the device /// Adds a new peer to the device

View File

@@ -18,7 +18,7 @@ use super::peer::PeerInner;
use super::types::{Callback, KeyCallback, Opaque}; use super::types::{Callback, KeyCallback, Opaque};
#[derive(PartialEq, Debug)] #[derive(PartialEq, Debug)]
enum Operation { pub enum Operation {
Encryption, Encryption,
Decryption, Decryption,
} }
@@ -60,8 +60,8 @@ fn find_task<T>(local: &Worker<T>, global: &Injector<T>, stealers: &[Stealer<T>]
}) })
} }
fn wait_buffer(stopped: AtomicBool, buf: &JobBuffer) { fn wait_buffer(running: AtomicBool, buf: &JobBuffer) {
while !stopped.load(Ordering::Acquire) { while running.load(Ordering::Acquire) {
match buf.try_lock() { match buf.try_lock() {
None => (), None => (),
Some(buf) => { Some(buf) => {
@@ -74,8 +74,8 @@ fn wait_buffer(stopped: AtomicBool, buf: &JobBuffer) {
} }
} }
fn wait_recv<T>(stopped: &AtomicBool, recv: &Receiver<T>) -> Result<T, TryRecvError> { fn wait_recv<T>(running: &AtomicBool, recv: &Receiver<T>) -> Result<T, TryRecvError> {
while !stopped.load(Ordering::Acquire) { while running.load(Ordering::Acquire) {
match recv.try_recv() { match recv.try_recv() {
Err(TryRecvError::Empty) => (), Err(TryRecvError::Empty) => (),
value => { value => {
@@ -201,15 +201,13 @@ pub fn worker_outbound<T: Opaque, S: Callback<T>, R: Callback<T>, K: KeyCallback
} }
} }
pub fn worker_parallel( pub fn worker_parallel<T: Opaque, S: Callback<T>, R: Callback<T>, K: KeyCallback<T>>(
stopped: Arc<AtomicBool>, // stop workers (device has been dropped) device: Arc<DeviceInner<T, S, R, K>>,
parked: Arc<AtomicBool>, // thread has been parked? local: Worker<JobParallel>, // local job queue (local to thread)
local: Worker<JobParallel>, // local job queue (local to thread)
global: Injector<JobParallel>, // global job injector
stealers: Vec<Stealer<JobParallel>>, // stealers (from other threads) stealers: Vec<Stealer<JobParallel>>, // stealers (from other threads)
) { ) {
while !stopped.load(Ordering::SeqCst) { while !device.running.load(Ordering::SeqCst) {
match find_task(&local, &global, &stealers) { match find_task(&local, &device.injector, &stealers) {
Some(job) => { Some(job) => {
let (handle, buf) = job; let (handle, buf) = job;
@@ -236,7 +234,7 @@ pub fn worker_parallel(
// create a nonce object // create a nonce object
let mut nonce = [0u8; 12]; let mut nonce = [0u8; 12];
debug_assert_eq!(nonce.len(), CHACHA20_POLY1305.nonce_len()); // why the fuck this is not a constant, god knows... debug_assert_eq!(nonce.len(), CHACHA20_POLY1305.nonce_len()); // why the this is not a constant, god knows...
nonce[4..].copy_from_slice(header.f_counter.as_bytes()); nonce[4..].copy_from_slice(header.f_counter.as_bytes());
let nonce = Nonce::assume_unique_for_key(nonce); let nonce = Nonce::assume_unique_for_key(nonce);
@@ -263,7 +261,7 @@ pub fn worker_parallel(
} }
None => { None => {
// no jobs, park the worker // no jobs, park the worker
parked.store(true, Ordering::Release); device.parked.store(true, Ordering::Release);
thread::park(); thread::park();
} }
} }