Replace RwLock<HashMap> with DashMap in handshake
This commit is contained in:
@@ -1,11 +1,12 @@
|
||||
use spin::RwLock;
|
||||
use std::collections::hash_map;
|
||||
use std::collections::HashMap;
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::Mutex;
|
||||
use zerocopy::AsBytes;
|
||||
|
||||
use byteorder::{ByteOrder, LittleEndian};
|
||||
use dashmap::mapref::entry::Entry;
|
||||
use dashmap::DashMap;
|
||||
use zerocopy::AsBytes;
|
||||
|
||||
use rand::prelude::{CryptoRng, RngCore};
|
||||
use rand::Rng;
|
||||
@@ -36,7 +37,7 @@ pub struct KeyState {
|
||||
/// (the instance is a Peer object in the parent module)
|
||||
pub struct Device<O> {
|
||||
keyst: Option<KeyState>,
|
||||
id_map: RwLock<HashMap<u32, [u8; 32]>>,
|
||||
id_map: DashMap<u32, [u8; 32]>, // concurrent map
|
||||
pk_map: HashMap<[u8; 32], Peer<O>>,
|
||||
limiter: Mutex<RateLimiter>,
|
||||
}
|
||||
@@ -62,7 +63,7 @@ impl<'a, O> Iterator for Iter<'a, O> {
|
||||
*/
|
||||
impl<O> Device<O> {
|
||||
pub fn clear(&mut self) {
|
||||
self.id_map.write().clear();
|
||||
self.id_map.clear();
|
||||
self.pk_map.clear();
|
||||
}
|
||||
|
||||
@@ -96,7 +97,7 @@ impl<O> Device<O> {
|
||||
pub fn new() -> Device<O> {
|
||||
Device {
|
||||
keyst: None,
|
||||
id_map: RwLock::new(HashMap::new()),
|
||||
id_map: DashMap::new(),
|
||||
pk_map: HashMap::new(),
|
||||
limiter: Mutex::new(RateLimiter::new()),
|
||||
}
|
||||
@@ -208,16 +209,14 @@ impl<O> Device<O> {
|
||||
///
|
||||
/// The call might fail if the public key is not found
|
||||
pub fn remove(&mut self, pk: &PublicKey) -> Result<(), ConfigError> {
|
||||
// take write-lock on receive id table
|
||||
let mut id_map = self.id_map.write();
|
||||
|
||||
// remove the peer
|
||||
self.pk_map
|
||||
.remove(pk.as_bytes())
|
||||
.ok_or(ConfigError::new("Public key not in device"))?;
|
||||
|
||||
// purge the id map (linear scan)
|
||||
id_map.retain(|_, v| v != pk.as_bytes());
|
||||
// remove every id entry for the peer in the public key map
|
||||
// O(n) operations, however it is rare: only when removing peers.
|
||||
self.id_map.retain(|_, v| v != pk.as_bytes());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -265,9 +264,8 @@ impl<O> Device<O> {
|
||||
///
|
||||
/// * `id` - The (sender) id to release
|
||||
pub fn release(&self, id: u32) {
|
||||
let mut m = self.id_map.write();
|
||||
debug_assert!(m.contains_key(&id), "Releasing id not allocated");
|
||||
m.remove(&id);
|
||||
let old = self.id_map.remove(&id);
|
||||
assert!(old.is_some(), "released id not allocated");
|
||||
}
|
||||
|
||||
/// Begin a new handshake
|
||||
@@ -446,32 +444,40 @@ impl<O> Device<O> {
|
||||
//
|
||||
// Return the peer currently associated with the receiver identifier
|
||||
pub(super) fn lookup_id(&self, id: u32) -> Result<(&Peer<O>, PublicKey), HandshakeError> {
|
||||
let im = self.id_map.read();
|
||||
let pk = im.get(&id).ok_or(HandshakeError::UnknownReceiverId)?;
|
||||
match self.pk_map.get(pk) {
|
||||
// obtain a read reference to entry in the id_map
|
||||
let pk = self
|
||||
.id_map
|
||||
.get(&id)
|
||||
.ok_or(HandshakeError::UnknownReceiverId)?;
|
||||
|
||||
// lookup the public key from the pk map
|
||||
match self.pk_map.get(&*pk) {
|
||||
Some(peer) => Ok((peer, PublicKey::from(*pk))),
|
||||
_ => unreachable!(), // if the id-lookup succeeded, the peer should exist
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
|
||||
// Internal function
|
||||
//
|
||||
// Allocated a new receiver identifier for the peer
|
||||
// Allocated a new receiver identifier for the peer.
|
||||
// Implemented via rejection sampling.
|
||||
fn allocate<R: RngCore + CryptoRng>(&self, rng: &mut R, pk: &PublicKey) -> u32 {
|
||||
loop {
|
||||
let id = rng.gen();
|
||||
|
||||
// check membership with read lock
|
||||
if self.id_map.read().contains_key(&id) {
|
||||
// read lock the shard and do quick check
|
||||
if self.id_map.contains_key(&id) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// take write lock and add index
|
||||
let mut m = self.id_map.write();
|
||||
if !m.contains_key(&id) {
|
||||
m.insert(id, *pk.as_bytes());
|
||||
return id;
|
||||
}
|
||||
// write lock the shard and insert
|
||||
match self.id_map.entry(id) {
|
||||
Entry::Vacant(entry) => {
|
||||
entry.insert(*pk.as_bytes());
|
||||
return id;
|
||||
}
|
||||
_ => (),
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -22,13 +22,15 @@ pub struct PeerInner<T: Tun, B: UDP> {
|
||||
// wireguard device state
|
||||
pub wg: WireGuard<T, B>,
|
||||
|
||||
// TODO: eliminate
|
||||
pub pk: PublicKey,
|
||||
|
||||
// handshake state
|
||||
pub walltime_last_handshake: Mutex<Option<SystemTime>>, // walltime for last handshake (for UAPI status)
|
||||
pub last_handshake_sent: Mutex<Instant>, // instant for last handshake
|
||||
pub handshake_queued: AtomicBool, // is a handshake job currently queued for the peer?
|
||||
pub handshake_queued: AtomicBool, // is a handshake job currently queued?
|
||||
|
||||
// stats and configuration
|
||||
pub pk: PublicKey, // public key (TODO: there has to be a way to remove this)
|
||||
pub rx_bytes: AtomicU64, // received bytes
|
||||
pub tx_bytes: AtomicU64, // transmitted bytes
|
||||
|
||||
|
||||
@@ -3,7 +3,6 @@ use std::ops::Deref;
|
||||
use std::sync::atomic::AtomicBool;
|
||||
use std::sync::Arc;
|
||||
use std::thread;
|
||||
use std::time::Instant;
|
||||
|
||||
use log;
|
||||
use spin::{Mutex, RwLock};
|
||||
|
||||
Reference in New Issue
Block a user