Wake workers when submitting work
This commit is contained in:
@@ -6,6 +6,8 @@ use std::sync::{Arc, Weak};
|
||||
use std::thread;
|
||||
use std::time::Instant;
|
||||
|
||||
use parking_lot::{Condvar, Mutex};
|
||||
|
||||
use crossbeam_deque::{Injector, Worker};
|
||||
use spin;
|
||||
use treebitmap::IpLookupTable;
|
||||
@@ -40,8 +42,8 @@ pub struct DeviceInner<C: Callbacks, T: Tun, B: Bind> {
|
||||
pub call_need_key: C::CallbackKey,
|
||||
|
||||
// threading and workers
|
||||
pub waker: (Mutex<()>, Condvar),
|
||||
pub running: AtomicBool, // workers running?
|
||||
pub parked: AtomicBool, // any workers parked?
|
||||
pub injector: Injector<JobParallel<C, T, B>>, // parallel enc/dec task injector
|
||||
|
||||
// routing
|
||||
@@ -107,7 +109,7 @@ impl<O: Opaque, R: Callback<O>, S: Callback<O>, K: KeyCallback<O>, T: Tun, B: Bi
|
||||
call_recv,
|
||||
call_send,
|
||||
call_need_key,
|
||||
parked: AtomicBool::new(false),
|
||||
waker: (Mutex::new(()), Condvar::new()),
|
||||
running: AtomicBool::new(true),
|
||||
injector: Injector::new(),
|
||||
recv: spin::RwLock::new(HashMap::new()),
|
||||
@@ -208,15 +210,12 @@ impl<C: Callbacks, T: Tun, B: Bind> Device<C, T, B> {
|
||||
|
||||
// schedule for encryption and transmission to peer
|
||||
if let Some(job) = peer.send_job(msg) {
|
||||
println!("made job!");
|
||||
// add job to parallel worker pool
|
||||
self.0.injector.push((peer.clone(), job));
|
||||
}
|
||||
|
||||
// ensure workers running
|
||||
if self.0.parked.load(Ordering::Acquire) {
|
||||
for handle in &self.1 {
|
||||
handle.thread().unpark();
|
||||
}
|
||||
// ensure workers running, TODO: something faster
|
||||
let &(_, ref cvar) = &self.0.waker;
|
||||
cvar.notify_all();
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -7,7 +7,7 @@ use std::thread;
|
||||
|
||||
use spin::Mutex;
|
||||
|
||||
use arraydeque::{ArrayDeque, Wrapping, Saturating};
|
||||
use arraydeque::{ArrayDeque, Saturating, Wrapping};
|
||||
use zerocopy::{AsBytes, LayoutVerified};
|
||||
|
||||
use treebitmap::address::Address;
|
||||
@@ -40,19 +40,17 @@ pub struct KeyWheel {
|
||||
pub struct PeerInner<C: Callbacks, T: Tun, B: Bind> {
|
||||
pub stopped: AtomicBool,
|
||||
pub opaque: C::Opaque,
|
||||
pub outbound: Mutex<ArrayDeque<[JobOutbound; MAX_STAGED_PACKETS], Wrapping>>,
|
||||
pub inbound: Mutex<ArrayDeque<[JobInbound<C, T, B>; MAX_STAGED_PACKETS], Wrapping>>,
|
||||
pub outbound: Mutex<ArrayDeque<[JobOutbound; MAX_STAGED_PACKETS], Saturating>>,
|
||||
pub inbound: Mutex<ArrayDeque<[JobInbound<C, T, B>; MAX_STAGED_PACKETS], Saturating>>,
|
||||
pub device: Arc<DeviceInner<C, T, B>>,
|
||||
pub thread_outbound: spin::Mutex<Option<thread::JoinHandle<()>>>,
|
||||
pub thread_inbound: spin::Mutex<Option<thread::JoinHandle<()>>>,
|
||||
pub queue_outbound: SyncSender<JobOutbound>,
|
||||
pub queue_inbound: SyncSender<JobInbound<C, T, B>>,
|
||||
pub staged_packets: spin::Mutex<ArrayDeque<[Vec<u8>; MAX_STAGED_PACKETS], Wrapping>>, // packets awaiting handshake
|
||||
pub rx_bytes: AtomicU64, // received bytes
|
||||
pub tx_bytes: AtomicU64, // transmitted bytes
|
||||
pub keys: spin::Mutex<KeyWheel>, // key-wheel
|
||||
pub ekey: spin::Mutex<Option<EncryptionState>>, // encryption state
|
||||
pub endpoint: spin::Mutex<Option<Arc<SocketAddr>>>,
|
||||
pub thread_outbound: Mutex<Option<thread::JoinHandle<()>>>,
|
||||
pub thread_inbound: Mutex<Option<thread::JoinHandle<()>>>,
|
||||
pub staged_packets: Mutex<ArrayDeque<[Vec<u8>; MAX_STAGED_PACKETS], Wrapping>>, // packets awaiting handshake
|
||||
pub rx_bytes: AtomicU64, // received bytes
|
||||
pub tx_bytes: AtomicU64, // transmitted bytes
|
||||
pub keys: Mutex<KeyWheel>, // key-wheel
|
||||
pub ekey: Mutex<Option<EncryptionState>>, // encryption state
|
||||
pub endpoint: Mutex<Option<Arc<SocketAddr>>>,
|
||||
}
|
||||
|
||||
pub struct Peer<C: Callbacks, T: Tun, B: Bind>(Arc<PeerInner<C, T, B>>);
|
||||
@@ -103,7 +101,6 @@ fn treebit_remove<A: Address, C: Callbacks, T: Tun, B: Bind>(
|
||||
|
||||
impl<C: Callbacks, T: Tun, B: Bind> Drop for Peer<C, T, B> {
|
||||
fn drop(&mut self) {
|
||||
println!("drop");
|
||||
// mark peer as stopped
|
||||
|
||||
let peer = &self.0;
|
||||
@@ -161,10 +158,6 @@ pub fn new_peer<C: Callbacks, T: Tun, B: Bind>(
|
||||
device: Arc<DeviceInner<C, T, B>>,
|
||||
opaque: C::Opaque,
|
||||
) -> Peer<C, T, B> {
|
||||
// allocate in-order queues
|
||||
let (send_inbound, recv_inbound) = sync_channel(MAX_STAGED_PACKETS);
|
||||
let (send_outbound, recv_outbound) = sync_channel(MAX_STAGED_PACKETS);
|
||||
|
||||
// allocate peer object
|
||||
let peer = {
|
||||
let device = device.clone();
|
||||
@@ -176,8 +169,6 @@ pub fn new_peer<C: Callbacks, T: Tun, B: Bind>(
|
||||
device: device,
|
||||
ekey: spin::Mutex::new(None),
|
||||
endpoint: spin::Mutex::new(None),
|
||||
queue_inbound: send_inbound,
|
||||
queue_outbound: send_outbound,
|
||||
keys: spin::Mutex::new(KeyWheel {
|
||||
next: None,
|
||||
current: None,
|
||||
@@ -192,22 +183,18 @@ pub fn new_peer<C: Callbacks, T: Tun, B: Bind>(
|
||||
})
|
||||
};
|
||||
|
||||
// spawn inbound thread
|
||||
// spawn outbound thread
|
||||
*peer.thread_inbound.lock() = {
|
||||
let peer = peer.clone();
|
||||
let device = device.clone();
|
||||
Some(thread::spawn(move || {
|
||||
worker_outbound(device, peer, recv_outbound)
|
||||
}))
|
||||
Some(thread::spawn(move || worker_outbound(device, peer)))
|
||||
};
|
||||
|
||||
// spawn outbound thread
|
||||
// spawn inbound thread
|
||||
*peer.thread_outbound.lock() = {
|
||||
let peer = peer.clone();
|
||||
let device = device.clone();
|
||||
Some(thread::spawn(move || {
|
||||
worker_inbound(device, peer, recv_inbound)
|
||||
}))
|
||||
Some(thread::spawn(move || worker_inbound(device, peer)))
|
||||
};
|
||||
|
||||
Peer(peer)
|
||||
@@ -261,12 +248,9 @@ impl<C: Callbacks, T: Tun, B: Bind> PeerInner<C, T, B> {
|
||||
}));
|
||||
|
||||
// add job to in-order queue and return to device for inclusion in worker pool
|
||||
match self.queue_outbound.try_send(job.clone()) {
|
||||
match self.outbound.lock().push_back(job.clone()) {
|
||||
Ok(_) => Some(job),
|
||||
Err(e) => {
|
||||
println!("{:?}", e);
|
||||
None
|
||||
}
|
||||
Err(_) => None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -162,9 +162,6 @@ fn test_outbound() {
|
||||
),
|
||||
];
|
||||
|
||||
thread::sleep(Duration::from_millis(1000));
|
||||
assert!(false);
|
||||
|
||||
peer.add_keypair(dummy_keypair(true));
|
||||
|
||||
for (mask, len, ip, okay) in &tests {
|
||||
@@ -199,10 +196,7 @@ fn test_outbound() {
|
||||
} else {
|
||||
assert!(res.is_err());
|
||||
}
|
||||
|
||||
// clear subnets for next test
|
||||
peer.remove_subnets();
|
||||
}
|
||||
|
||||
assert!(false);
|
||||
}
|
||||
|
||||
@@ -90,69 +90,81 @@ fn wait_recv<T>(running: &AtomicBool, recv: &Receiver<T>) -> Result<T, TryRecvEr
|
||||
}
|
||||
|
||||
pub fn worker_inbound<C: Callbacks, T: Tun, B: Bind>(
|
||||
device: Arc<DeviceInner<C, T, B>>, // related device
|
||||
peer: Arc<PeerInner<C, T, B>>, // related peer
|
||||
recv: Receiver<JobInbound<C, T, B>>, // in order queue
|
||||
device: Arc<DeviceInner<C, T, B>>, // related device
|
||||
peer: Arc<PeerInner<C, T, B>>, // related peer
|
||||
) {
|
||||
loop {
|
||||
match wait_recv(&peer.stopped, &recv) {
|
||||
Ok((state, buf)) => {
|
||||
while !peer.stopped.load(Ordering::Acquire) {
|
||||
match buf.try_lock() {
|
||||
None => (),
|
||||
Some(buf) => match buf.status {
|
||||
Status::Done => {
|
||||
// parse / cast
|
||||
let (header, packet) =
|
||||
match LayoutVerified::new_from_prefix(&buf.msg[..]) {
|
||||
Some(v) => v,
|
||||
None => continue,
|
||||
};
|
||||
let header: LayoutVerified<&[u8], TransportHeader> = header;
|
||||
while !peer.stopped.load(Ordering::Acquire) {
|
||||
inner(&device, &peer)
|
||||
}
|
||||
|
||||
// obtain strong reference to decryption state
|
||||
let state = if let Some(state) = state.upgrade() {
|
||||
state
|
||||
} else {
|
||||
break;
|
||||
};
|
||||
|
||||
// check for replay
|
||||
if !state.protector.lock().update(header.f_counter.get()) {
|
||||
break;
|
||||
}
|
||||
|
||||
// check for confirms key
|
||||
if !state.confirmed.swap(true, Ordering::SeqCst) {
|
||||
peer.confirm_key(state.keypair.clone());
|
||||
}
|
||||
|
||||
// update enpoint, TODO
|
||||
|
||||
// write packet to TUN device, TODO
|
||||
|
||||
// trigger callback
|
||||
debug_assert!(
|
||||
packet.len() >= CHACHA20_POLY1305.nonce_len(),
|
||||
"this should be checked earlier in the pipeline"
|
||||
);
|
||||
(device.call_recv)(
|
||||
&peer.opaque,
|
||||
packet.len() > CHACHA20_POLY1305.nonce_len(),
|
||||
true,
|
||||
);
|
||||
break;
|
||||
}
|
||||
Status::Fault => break,
|
||||
_ => (),
|
||||
},
|
||||
};
|
||||
thread::park();
|
||||
}
|
||||
}
|
||||
Err(_) => {
|
||||
break;
|
||||
fn inner<C: Callbacks, T: Tun, B: Bind>(
|
||||
device: &Arc<DeviceInner<C, T, B>>,
|
||||
peer: &Arc<PeerInner<C, T, B>>,
|
||||
) {
|
||||
// wait for job to be submitted
|
||||
let (state, buf) = loop {
|
||||
match peer.inbound.lock().pop_front() {
|
||||
Some(elem) => break elem,
|
||||
_ => (),
|
||||
}
|
||||
|
||||
// default is to park
|
||||
thread::park()
|
||||
};
|
||||
|
||||
// wait for job to complete
|
||||
loop {
|
||||
match buf.try_lock() {
|
||||
None => (),
|
||||
Some(buf) => match buf.status {
|
||||
Status::Fault => break (),
|
||||
Status::Done => {
|
||||
// parse / cast
|
||||
let (header, packet) = match LayoutVerified::new_from_prefix(&buf.msg[..]) {
|
||||
Some(v) => v,
|
||||
None => continue,
|
||||
};
|
||||
let header: LayoutVerified<&[u8], TransportHeader> = header;
|
||||
|
||||
// obtain strong reference to decryption state
|
||||
let state = if let Some(state) = state.upgrade() {
|
||||
state
|
||||
} else {
|
||||
break;
|
||||
};
|
||||
|
||||
// check for replay
|
||||
if !state.protector.lock().update(header.f_counter.get()) {
|
||||
break;
|
||||
}
|
||||
|
||||
// check for confirms key
|
||||
if !state.confirmed.swap(true, Ordering::SeqCst) {
|
||||
peer.confirm_key(state.keypair.clone());
|
||||
}
|
||||
|
||||
// update endpoint, TODO
|
||||
|
||||
// write packet to TUN device, TODO
|
||||
|
||||
// trigger callback
|
||||
debug_assert!(
|
||||
packet.len() >= CHACHA20_POLY1305.nonce_len(),
|
||||
"this should be checked earlier in the pipeline"
|
||||
);
|
||||
(device.call_recv)(
|
||||
&peer.opaque,
|
||||
packet.len() > CHACHA20_POLY1305.nonce_len(),
|
||||
true,
|
||||
);
|
||||
break;
|
||||
}
|
||||
_ => (),
|
||||
},
|
||||
};
|
||||
|
||||
// default is to park
|
||||
thread::park()
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -160,48 +172,58 @@ pub fn worker_inbound<C: Callbacks, T: Tun, B: Bind>(
|
||||
pub fn worker_outbound<C: Callbacks, T: Tun, B: Bind>(
|
||||
device: Arc<DeviceInner<C, T, B>>, // related device
|
||||
peer: Arc<PeerInner<C, T, B>>, // related peer
|
||||
recv: Receiver<JobOutbound>, // in order queue
|
||||
) {
|
||||
loop {
|
||||
match wait_recv(&peer.stopped, &recv) {
|
||||
Ok(buf) => {
|
||||
while !peer.stopped.load(Ordering::Acquire) {
|
||||
match buf.try_lock() {
|
||||
None => (), // nothing to do
|
||||
Some(buf) => match buf.status {
|
||||
Status::Done => {
|
||||
// parse / cast
|
||||
let (header, packet) =
|
||||
match LayoutVerified::new_from_prefix(&buf.msg[..]) {
|
||||
Some(v) => v,
|
||||
None => continue,
|
||||
};
|
||||
let header: LayoutVerified<&[u8], TransportHeader> = header;
|
||||
while !peer.stopped.load(Ordering::Acquire) {
|
||||
inner(&device, &peer)
|
||||
}
|
||||
|
||||
// write to UDP device, TODO
|
||||
let xmit = false;
|
||||
fn inner<C: Callbacks, T: Tun, B: Bind>(
|
||||
device: &Arc<DeviceInner<C, T, B>>,
|
||||
peer: &Arc<PeerInner<C, T, B>>,
|
||||
) {
|
||||
// wait for job to be submitted
|
||||
let (state, buf) = loop {
|
||||
match peer.inbound.lock().pop_front() {
|
||||
Some(elem) => break elem,
|
||||
_ => (),
|
||||
}
|
||||
|
||||
// trigger callback
|
||||
(device.call_send)(
|
||||
&peer.opaque,
|
||||
buf.msg.len()
|
||||
> CHACHA20_POLY1305.nonce_len()
|
||||
+ mem::size_of::<TransportHeader>(),
|
||||
xmit,
|
||||
);
|
||||
break;
|
||||
}
|
||||
Status::Fault => break,
|
||||
_ => (),
|
||||
},
|
||||
};
|
||||
thread::park();
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
println!("park outbound! {:?}", e);
|
||||
break;
|
||||
}
|
||||
// default is to park
|
||||
thread::park()
|
||||
};
|
||||
|
||||
// wait for job to complete
|
||||
loop {
|
||||
match buf.try_lock() {
|
||||
None => (),
|
||||
Some(buf) => match buf.status {
|
||||
Status::Fault => break (),
|
||||
Status::Done => {
|
||||
// parse / cast
|
||||
let (header, packet) = match LayoutVerified::new_from_prefix(&buf.msg[..]) {
|
||||
Some(v) => v,
|
||||
None => continue,
|
||||
};
|
||||
let header: LayoutVerified<&[u8], TransportHeader> = header;
|
||||
|
||||
// write to UDP device, TODO
|
||||
let xmit = false;
|
||||
|
||||
// trigger callback
|
||||
(device.call_send)(
|
||||
&peer.opaque,
|
||||
buf.msg.len()
|
||||
> CHACHA20_POLY1305.nonce_len() + mem::size_of::<TransportHeader>(),
|
||||
xmit,
|
||||
);
|
||||
break;
|
||||
}
|
||||
_ => (),
|
||||
},
|
||||
};
|
||||
|
||||
// default is to park
|
||||
thread::park()
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -212,11 +234,9 @@ pub fn worker_parallel<C: Callbacks, T: Tun, B: Bind>(
|
||||
stealers: Vec<Stealer<JobParallel<C, T, B>>>, // stealers (from other threads)
|
||||
) {
|
||||
while device.running.load(Ordering::SeqCst) {
|
||||
println!("running");
|
||||
match find_task(&local, &device.injector, &stealers) {
|
||||
Some(job) => {
|
||||
let (peer, buf) = job;
|
||||
println!("jobs!");
|
||||
|
||||
// take ownership of the job buffer and complete it
|
||||
{
|
||||
@@ -272,9 +292,10 @@ pub fn worker_parallel<C: Callbacks, T: Tun, B: Bind>(
|
||||
.unpark();
|
||||
}
|
||||
None => {
|
||||
println!("park");
|
||||
device.parked.store(true, Ordering::Release);
|
||||
thread::park();
|
||||
// wait for notification from device
|
||||
let &(ref lock, ref cvar) = &device.waker;
|
||||
let mut guard = lock.lock();
|
||||
cvar.wait(&mut guard);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user