Remove crossbeam dependency
This commit is contained in:
@@ -4,6 +4,7 @@ use super::router;
|
||||
use super::timers::{Events, Timers};
|
||||
use super::{Peer, PeerInner};
|
||||
|
||||
use super::queue::ParallelQueue;
|
||||
use super::tun;
|
||||
use super::tun::Reader as TunReader;
|
||||
|
||||
@@ -35,7 +36,6 @@ use rand::Rng;
|
||||
use spin::{Mutex, RwLock};
|
||||
|
||||
use byteorder::{ByteOrder, LittleEndian};
|
||||
use crossbeam_channel::{bounded, Sender};
|
||||
use x25519_dalek::{PublicKey, StaticSecret};
|
||||
|
||||
const SIZE_HANDSHAKE_QUEUE: usize = 128;
|
||||
@@ -99,7 +99,7 @@ pub struct WireguardInner<T: tun::Tun, B: udp::UDP> {
|
||||
handshake: RwLock<handshake::Device>,
|
||||
under_load: AtomicBool,
|
||||
pending: AtomicUsize, // num of pending handshake packets in queue
|
||||
queue: Mutex<Sender<HandshakeJob<B::Endpoint>>>,
|
||||
queue: ParallelQueue<HandshakeJob<B::Endpoint>>,
|
||||
}
|
||||
|
||||
impl<T: tun::Tun, B: udp::UDP> PeerInner<T, B> {
|
||||
@@ -123,7 +123,7 @@ impl<T: tun::Tun, B: udp::UDP> PeerInner<T, B> {
|
||||
|
||||
if !self.handshake_queued.swap(true, Ordering::SeqCst) {
|
||||
self.wg.pending.fetch_add(1, Ordering::SeqCst);
|
||||
self.queue.lock().send(HandshakeJob::New(self.pk)).unwrap();
|
||||
self.wg.queue.send(HandshakeJob::New(self.pk));
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -288,7 +288,6 @@ impl<T: tun::Tun, B: udp::UDP> Wireguard<T, B> {
|
||||
walltime_last_handshake: Mutex::new(None),
|
||||
last_handshake_sent: Mutex::new(Instant::now() - TIME_HORIZON),
|
||||
handshake_queued: AtomicBool::new(false),
|
||||
queue: Mutex::new(self.state.queue.lock().clone()),
|
||||
rx_bytes: AtomicU64::new(0),
|
||||
tx_bytes: AtomicU64::new(0),
|
||||
timers: RwLock::new(Timers::dummy(&self.runner)),
|
||||
@@ -386,10 +385,7 @@ impl<T: tun::Tun, B: udp::UDP> Wireguard<T, B> {
|
||||
}
|
||||
|
||||
// add to handshake queue
|
||||
wg.queue
|
||||
.lock()
|
||||
.send(HandshakeJob::Message(msg, src))
|
||||
.unwrap();
|
||||
wg.queue.send(HandshakeJob::Message(msg, src));
|
||||
}
|
||||
router::TYPE_TRANSPORT => {
|
||||
debug!("{} : reader, received transport message", wg);
|
||||
@@ -477,12 +473,14 @@ impl<T: tun::Tun, B: udp::UDP> Wireguard<T, B> {
|
||||
}
|
||||
|
||||
pub fn new(writer: T::Writer) -> Wireguard<T, B> {
|
||||
// workers equal to number of physical cores
|
||||
let cpus = num_cpus::get();
|
||||
|
||||
// create device state
|
||||
let mut rng = OsRng::new().unwrap();
|
||||
|
||||
// handshake queue
|
||||
let (tx, rx): (Sender<HandshakeJob<B::Endpoint>>, _) = bounded(SIZE_HANDSHAKE_QUEUE);
|
||||
|
||||
let (tx, mut rxs) = ParallelQueue::new(cpus, 128);
|
||||
let wg = Arc::new(WireguardInner {
|
||||
enabled: RwLock::new(false),
|
||||
tun_readers: WaitHandle::new(),
|
||||
@@ -494,13 +492,12 @@ impl<T: tun::Tun, B: udp::UDP> Wireguard<T, B> {
|
||||
pending: AtomicUsize::new(0),
|
||||
handshake: RwLock::new(handshake::Device::new()),
|
||||
under_load: AtomicBool::new(false),
|
||||
queue: Mutex::new(tx),
|
||||
queue: tx,
|
||||
});
|
||||
|
||||
// start handshake workers
|
||||
for _ in 0..num_cpus::get() {
|
||||
while let Some(rx) = rxs.pop() {
|
||||
let wg = wg.clone();
|
||||
let rx = rx.clone();
|
||||
thread::spawn(move || {
|
||||
debug!("{} : handshake worker, started", wg);
|
||||
|
||||
@@ -509,16 +506,18 @@ impl<T: tun::Tun, B: udp::UDP> Wireguard<T, B> {
|
||||
|
||||
// process elements from the handshake queue
|
||||
for job in rx {
|
||||
// decrement pending
|
||||
// decrement pending pakcets (under_load)
|
||||
let job: HandshakeJob<B::Endpoint> = job;
|
||||
wg.pending.fetch_sub(1, Ordering::SeqCst);
|
||||
|
||||
let device = wg.handshake.read();
|
||||
// demultiplex staged handshake jobs and handshake messages
|
||||
match job {
|
||||
HandshakeJob::Message(msg, src) => {
|
||||
// feed message to handshake device
|
||||
let src_validate = (&src).into_address(); // TODO avoid
|
||||
|
||||
// process message
|
||||
let device = wg.handshake.read();
|
||||
match device.process(
|
||||
&mut rng,
|
||||
&msg[..],
|
||||
@@ -599,6 +598,7 @@ impl<T: tun::Tun, B: udp::UDP> Wireguard<T, B> {
|
||||
"{} : handshake worker, new handshake requested for {}",
|
||||
wg, peer
|
||||
);
|
||||
let device = wg.handshake.read();
|
||||
let _ = device.begin(&mut rng, &peer.pk).map(|msg| {
|
||||
let _ = peer.router.send(&msg[..]).map_err(|e| {
|
||||
debug!("{} : handshake worker, failed to send handshake initiation, error = {}", wg, e)
|
||||
|
||||
Reference in New Issue
Block a user