Work on inbound/outbound consume code

This commit is contained in:
Mathias Hall-Andersen
2019-08-27 22:20:22 +02:00
parent cdbcd55eda
commit e5f515098a
3 changed files with 69 additions and 8 deletions

View File

@@ -44,6 +44,7 @@ pub struct EncryptionState {
pub struct DecryptionState<T: Opaque, S: Callback<T>, R: Callback<T>, K: KeyCallback<T>> {
pub key: [u8; 32],
pub keypair: Weak<KeyPair>,
pub confirmed: AtomicBool,
pub protector: spin::Mutex<AntiReplay>,
pub peer: Weak<PeerInner<T, S, R, K>>,
pub death: Instant, // time when the key can no longer be used for decryption

View File

@@ -265,6 +265,7 @@ impl<T: Opaque, S: Callback<T>, R: Callback<T>, K: KeyCallback<T>> Peer<T, S, R,
recv.insert(
new.recv.id,
DecryptionState {
confirmed: AtomicBool::new(false),
keypair: Arc::downgrade(&new),
key: new.recv.key,
protector: spin::Mutex::new(AntiReplay::new()),

View File

@@ -1,4 +1,5 @@
use std::iter;
use std::mem;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{sync_channel, Receiver, TryRecvError};
use std::sync::{Arc, Weak};
@@ -97,12 +98,50 @@ pub fn worker_inbound<T: Opaque, S: Callback<T>, R: Callback<T>, K: KeyCallback<
while !peer.stopped.load(Ordering::Acquire) {
match buf.try_lock() {
None => (),
Some(buf) => {
if buf.status != Status::Waiting {
// consume
Some(buf) => match buf.status {
Status::Done => {
// cast
let (header, packet) =
match LayoutVerified::new_from_prefix(&buf.msg[..]) {
Some(v) => v,
None => continue,
};
let header: LayoutVerified<&[u8], TransportHeader> = header;
// obtain strong reference to decryption state
let state = if let Some(state) = state.upgrade() {
state
} else {
break;
};
// check for replay
if !state.protector.lock().update(header.f_counter.get()) {
break;
}
// check for confirms key
if state.confirmed.swap(true, Ordering::SeqCst) {
// TODO: confirm key
}
// write packet to TUN device
// trigger callback
debug_assert!(
packet.len() >= CHACHA20_POLY1305.nonce_len(),
"this should be checked earlier in the pipeline"
);
(device.event_recv)(
&peer.opaque,
packet.len() > CHACHA20_POLY1305.nonce_len(),
true,
);
break;
}
Status::Fault => break,
_ => (),
},
};
thread::park();
}
@@ -125,12 +164,32 @@ pub fn worker_outbound<T: Opaque, S: Callback<T>, R: Callback<T>, K: KeyCallback
while !peer.stopped.load(Ordering::Acquire) {
match buf.try_lock() {
None => (),
Some(buf) => {
if buf.status != Status::Waiting {
// consume
Some(buf) => match buf.status {
Status::Done => {
// cast
let (header, packet) =
match LayoutVerified::new_from_prefix(&buf.msg[..]) {
Some(v) => v,
None => continue,
};
let header: LayoutVerified<&[u8], TransportHeader> = header;
// write to UDP device
let xmit = false;
// trigger callback
(device.event_send)(
&peer.opaque,
buf.msg.len()
> CHACHA20_POLY1305.nonce_len()
+ mem::size_of::<TransportHeader>(),
xmit,
);
break;
}
}
Status::Fault => break,
_ => (),
},
};
thread::park();
}