Sent staged packets when key-pair confirmed

This commit is contained in:
Mathias Hall-Andersen
2019-09-15 15:15:15 +02:00
parent b31becda71
commit f46762183a
4 changed files with 50 additions and 46 deletions

View File

@@ -14,7 +14,7 @@ use treebitmap::IpLookupTable;
use zerocopy::LayoutVerified; use zerocopy::LayoutVerified;
use super::super::constants::*; use super::super::constants::*;
use super::super::types::{Bind, KeyPair, Tun}; use super::super::types::{Bind, Endpoint, KeyPair, Tun};
use super::anti_replay::AntiReplay; use super::anti_replay::AntiReplay;
use super::device::DecryptionState; use super::device::DecryptionState;
@@ -216,6 +216,35 @@ pub fn new_peer<C: Callbacks, T: Tun, B: Bind>(
} }
impl<C: Callbacks, T: Tun, B: Bind> PeerInner<C, T, B> { impl<C: Callbacks, T: Tun, B: Bind> PeerInner<C, T, B> {
fn send_staged(&self) -> bool {
let mut sent = false;
let mut staged = self.staged_packets.lock();
loop {
match staged.pop_front() {
Some(msg) => {
sent = true;
self.send_raw(msg);
}
None => break sent,
}
}
}
fn send_raw(&self, msg: Vec<u8>) -> bool {
match self.send_job(msg) {
Some(job) => {
debug!("send_raw: got obtained send_job");
let index = self.device.queue_next.fetch_add(1, Ordering::SeqCst);
let queues = self.device.queues.lock();
match queues[index % queues.len()].send(job) {
Ok(_) => true,
Err(_) => false,
}
}
None => false,
}
}
pub fn confirm_key(&self, keypair: &Arc<KeyPair>) { pub fn confirm_key(&self, keypair: &Arc<KeyPair>) {
// take lock and check keypair = keys.next // take lock and check keypair = keys.next
let mut keys = self.keys.lock(); let mut keys = self.keys.lock();
@@ -240,6 +269,9 @@ impl<C: Callbacks, T: Tun, B: Bind> PeerInner<C, T, B> {
// set new encryption key // set new encryption key
*self.ekey.lock() = ekey; *self.ekey.lock() = ekey;
// start transmission of staged packets
self.send_staged();
} }
pub fn recv_job( pub fn recv_job(
@@ -327,12 +359,16 @@ impl<C: Callbacks, T: Tun, B: Bind> Peer<C, T, B> {
/// ///
/// This API still permits support for the "sticky socket" behavior, /// This API still permits support for the "sticky socket" behavior,
/// as sockets should be "unsticked" when manually updating the endpoint /// as sockets should be "unsticked" when manually updating the endpoint
pub fn set_endpoint(&self, endpoint: SocketAddr) { pub fn set_endpoint(&self, address: SocketAddr) {
*self.state.endpoint.lock() = Some(endpoint.into()); *self.state.endpoint.lock() = Some(B::Endpoint::from_address(address));
} }
pub fn get_endpoint(&self) -> Option<SocketAddr> { pub fn get_endpoint(&self) -> Option<SocketAddr> {
self.state.endpoint.lock().as_ref().map(|e| (*e).into()) self.state
.endpoint
.lock()
.as_ref()
.map(|e| e.into_address())
} }
/// Add a new keypair /// Add a new keypair
@@ -387,21 +423,8 @@ impl<C: Callbacks, T: Tun, B: Bind> Peer<C, T, B> {
// schedule confirmation // schedule confirmation
if new.initiator { if new.initiator {
// attempt to confirm with staged packets
let mut staged = self.state.staged_packets.lock();
let keepalive = staged.len() == 0;
loop {
match staged.pop_front() {
Some(msg) => {
debug!("send staged packet to confirm key-pair");
self.send_raw(msg);
}
None => break,
}
}
// fall back to keepalive packet // fall back to keepalive packet
if keepalive { if !self.state.send_staged() {
let ok = self.keepalive(); let ok = self.keepalive();
debug!("keepalive for confirmation, sent = {}", ok); debug!("keepalive for confirmation, sent = {}", ok);
} }
@@ -411,25 +434,9 @@ impl<C: Callbacks, T: Tun, B: Bind> Peer<C, T, B> {
release release
} }
fn send_raw(&self, msg: Vec<u8>) -> bool {
match self.state.send_job(msg) {
Some(job) => {
debug!("send_raw: got obtained send_job");
let device = &self.state.device;
let index = device.queue_next.fetch_add(1, Ordering::SeqCst);
let queues = device.queues.lock();
match queues[index % queues.len()].send(job) {
Ok(_) => true,
Err(_) => false,
}
}
None => false,
}
}
pub fn keepalive(&self) -> bool { pub fn keepalive(&self) -> bool {
debug!("send keepalive"); debug!("send keepalive");
self.send_raw(vec![0u8; SIZE_MESSAGE_PREFIX]) self.state.send_raw(vec![0u8; SIZE_MESSAGE_PREFIX])
} }
/// Map a subnet to the peer /// Map a subnet to the peer

View File

@@ -12,7 +12,7 @@ use num_cpus;
use pnet::packet::ipv4::MutableIpv4Packet; use pnet::packet::ipv4::MutableIpv4Packet;
use pnet::packet::ipv6::MutableIpv6Packet; use pnet::packet::ipv6::MutableIpv6Packet;
use super::super::types::{Bind, Key, KeyPair, Tun}; use super::super::types::{Bind, Endpoint, Key, KeyPair, Tun};
use super::{Callbacks, Device, SIZE_MESSAGE_PREFIX}; use super::{Callbacks, Device, SIZE_MESSAGE_PREFIX};
extern crate test; extern crate test;
@@ -70,14 +70,11 @@ impl fmt::Display for TunError {
#[derive(Clone, Copy)] #[derive(Clone, Copy)]
struct UnitEndpoint {} struct UnitEndpoint {}
impl From<SocketAddr> for UnitEndpoint { impl Endpoint for UnitEndpoint {
fn from(addr: SocketAddr) -> UnitEndpoint { fn from_address(_: SocketAddr) -> UnitEndpoint {
UnitEndpoint {} UnitEndpoint {}
} }
} fn into_address(&self) -> SocketAddr {
impl Into<SocketAddr> for UnitEndpoint {
fn into(self) -> SocketAddr {
"127.0.0.1:8080".parse().unwrap() "127.0.0.1:8080".parse().unwrap()
} }
} }

View File

@@ -1,6 +1,5 @@
use std::error::Error; use std::error::Error;
use std::fmt; use std::fmt;
use std::marker::PhantomData;
pub trait Opaque: Send + Sync + 'static {} pub trait Opaque: Send + Sync + 'static {}

View File

@@ -1,5 +1,6 @@
use std::net::SocketAddr; use std::net::SocketAddr;
pub trait Endpoint: Into<SocketAddr> + From<SocketAddr> + Copy + Send {} pub trait Endpoint: Send {
fn from_address(addr: SocketAddr) -> Self;
impl<T> Endpoint for T where T: Into<SocketAddr> + From<SocketAddr> + Copy + Send {} fn into_address(&self) -> SocketAddr;
}