Reconsider inorder queueing

This commit is contained in:
Mathias Hall-Andersen
2019-09-02 20:22:47 +02:00
parent 929eadb651
commit 62d71a7a67
6 changed files with 85 additions and 17 deletions

View File

@@ -17,6 +17,7 @@ use super::peer;
use super::peer::{Peer, PeerInner};
use super::SIZE_MESSAGE_PREFIX;
use super::messages::TYPE_TRANSPORT;
use super::types::{Callback, Callbacks, KeyCallback, Opaque, PhantomCallbacks, RouterError};
use super::workers::{worker_parallel, JobParallel};
@@ -66,8 +67,8 @@ pub struct DecryptionState<C: Callbacks, T: Tun, B: Bind> {
}
pub struct Device<C: Callbacks, T: Tun, B: Bind>(
Arc<DeviceInner<C, T, B>>,
Vec<thread::JoinHandle<()>>,
Arc<DeviceInner<C, T, B>>, // reference to device state
Vec<thread::JoinHandle<()>>, // join handles for workers
);
impl<C: Callbacks, T: Tun, B: Bind> Drop for Device<C, T, B> {
@@ -207,8 +208,17 @@ impl<C: Callbacks, T: Tun, B: Bind> Device<C, T, B> {
// schedule for encryption and transmission to peer
if let Some(job) = peer.send_job(msg) {
println!("made job!");
self.0.injector.push((peer.clone(), job));
}
// ensure workers running
if self.0.parked.load(Ordering::Acquire) {
for handle in &self.1 {
handle.thread().unpark();
}
}
Ok(())
}
@@ -216,8 +226,17 @@ impl<C: Callbacks, T: Tun, B: Bind> Device<C, T, B> {
///
/// # Arguments
///
/// - ct_msg: Encrypted transport message
pub fn recv(&self, ct_msg: &mut [u8]) {
/// - msg: Encrypted transport message
pub fn recv(&self, msg: Vec<u8>) -> Result<(), RouterError> {
// ensure that the type field access is within bounds
if msg.len() < SIZE_MESSAGE_PREFIX || msg[0] != TYPE_TRANSPORT {
return Err(RouterError::MalformedTransportMessage);
}
// parse / cast
// lookup peer based on receiver id
unimplemented!();
}
}

View File

@@ -2,6 +2,8 @@ use byteorder::LittleEndian;
use zerocopy::byteorder::{U32, U64};
use zerocopy::{AsBytes, ByteSlice, FromBytes, LayoutVerified};
pub const TYPE_TRANSPORT: u8 = 4;
#[repr(packed)]
#[derive(Copy, Clone, FromBytes, AsBytes)]
pub struct TransportHeader {

View File

@@ -5,9 +5,9 @@ use std::sync::mpsc::{sync_channel, SyncSender};
use std::sync::{Arc, Weak};
use std::thread;
use spin;
use spin::Mutex;
use arraydeque::{ArrayDeque, Wrapping};
use arraydeque::{ArrayDeque, Wrapping, Saturating};
use zerocopy::{AsBytes, LayoutVerified};
use treebitmap::address::Address;
@@ -40,6 +40,8 @@ pub struct KeyWheel {
pub struct PeerInner<C: Callbacks, T: Tun, B: Bind> {
pub stopped: AtomicBool,
pub opaque: C::Opaque,
pub outbound: Mutex<ArrayDeque<[JobOutbound; MAX_STAGED_PACKETS], Wrapping>>,
pub inbound: Mutex<ArrayDeque<[JobInbound<C, T, B>; MAX_STAGED_PACKETS], Wrapping>>,
pub device: Arc<DeviceInner<C, T, B>>,
pub thread_outbound: spin::Mutex<Option<thread::JoinHandle<()>>>,
pub thread_inbound: spin::Mutex<Option<thread::JoinHandle<()>>>,
@@ -101,6 +103,7 @@ fn treebit_remove<A: Address, C: Callbacks, T: Tun, B: Bind>(
impl<C: Callbacks, T: Tun, B: Bind> Drop for Peer<C, T, B> {
fn drop(&mut self) {
println!("drop");
// mark peer as stopped
let peer = &self.0;
@@ -167,6 +170,8 @@ pub fn new_peer<C: Callbacks, T: Tun, B: Bind>(
let device = device.clone();
Arc::new(PeerInner {
opaque,
inbound: Mutex::new(ArrayDeque::new()),
outbound: Mutex::new(ArrayDeque::new()),
stopped: AtomicBool::new(false),
device: device,
ekey: spin::Mutex::new(None),
@@ -258,7 +263,10 @@ impl<C: Callbacks, T: Tun, B: Bind> PeerInner<C, T, B> {
// add job to in-order queue and return to device for inclusion in worker pool
match self.queue_outbound.try_send(job.clone()) {
Ok(_) => Some(job),
Err(_) => None,
Err(e) => {
println!("{:?}", e);
None
}
}
}
}

View File

@@ -3,11 +3,13 @@ use std::fmt;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant};
use pnet::packet::ipv4::MutableIpv4Packet;
use pnet::packet::ipv6::MutableIpv6Packet;
use super::super::types::{Bind, Tun};
use super::super::types::{Bind, Key, KeyPair, Tun};
use super::{Device, Peer, SIZE_MESSAGE_PREFIX};
#[derive(Debug)]
@@ -93,6 +95,32 @@ impl fmt::Display for BindError {
}
}
fn dummy_keypair(initiator: bool) -> KeyPair {
let k1 = Key {
key: [0x53u8; 32],
id: 0x646e6573,
};
let k2 = Key {
key: [0x52u8; 32],
id: 0x76636572,
};
if initiator {
KeyPair {
birth: Instant::now(),
initiator: true,
send: k1,
recv: k2,
}
} else {
KeyPair {
birth: Instant::now(),
initiator: false,
send: k2,
recv: k1,
}
}
}
#[test]
fn test_outbound() {
let opaque = Arc::new(AtomicBool::new(false));
@@ -134,6 +162,11 @@ fn test_outbound() {
),
];
thread::sleep(Duration::from_millis(1000));
assert!(false);
peer.add_keypair(dummy_keypair(true));
for (mask, len, ip, okay) in &tests {
opaque.store(false, Ordering::SeqCst);
@@ -162,7 +195,7 @@ fn test_outbound() {
assert!(res.is_ok());
// and a key should have been requested
assert!(opaque.load(Ordering::Acquire));
// assert!(opaque.load(Ordering::Acquire), "did not request key");
} else {
assert!(res.is_err());
}
@@ -170,4 +203,6 @@ fn test_outbound() {
// clear subnets for next test
peer.remove_subnets();
}
assert!(false);
}

View File

@@ -1,6 +1,6 @@
use std::error::Error;
use std::fmt;
use std::marker::PhantomData;
use std::error::Error;
pub trait Opaque: Send + Sync + 'static {}
@@ -52,19 +52,19 @@ impl<O: Opaque, R: Callback<O>, S: Callback<O>, K: KeyCallback<O>> Callbacks
type CallbackKey = K;
}
#[derive(Debug)]
pub enum RouterError {
NoCryptKeyRoute,
MalformedIPHeader,
MalformedTransportMessage,
}
impl fmt::Display for RouterError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
RouterError::NoCryptKeyRoute => write!(f, "No cryptkey route configured for subnet"),
RouterError::MalformedIPHeader => write!(f, "IP header is malformed")
RouterError::MalformedIPHeader => write!(f, "IP header is malformed"),
RouterError::MalformedTransportMessage => write!(f, "IP header is malformed"),
}
}
}

View File

@@ -167,7 +167,7 @@ pub fn worker_outbound<C: Callbacks, T: Tun, B: Bind>(
Ok(buf) => {
while !peer.stopped.load(Ordering::Acquire) {
match buf.try_lock() {
None => (),
None => (), // nothing to do
Some(buf) => match buf.status {
Status::Done => {
// parse / cast
@@ -198,7 +198,8 @@ pub fn worker_outbound<C: Callbacks, T: Tun, B: Bind>(
thread::park();
}
}
Err(_) => {
Err(e) => {
println!("park outbound! {:?}", e);
break;
}
}
@@ -211,9 +212,11 @@ pub fn worker_parallel<C: Callbacks, T: Tun, B: Bind>(
stealers: Vec<Stealer<JobParallel<C, T, B>>>, // stealers (from other threads)
) {
while device.running.load(Ordering::SeqCst) {
println!("running");
match find_task(&local, &device.injector, &stealers) {
Some(job) => {
let (peer, buf) = job;
println!("jobs!");
// take ownership of the job buffer and complete it
{
@@ -269,6 +272,7 @@ pub fn worker_parallel<C: Callbacks, T: Tun, B: Bind>(
.unpark();
}
None => {
println!("park");
device.parked.store(true, Ordering::Release);
thread::park();
}