Port replay filter and sketch router state

This commit is contained in:
Mathias Hall-Andersen
2019-08-12 21:04:19 +02:00
parent 0e16901261
commit 723a1b8e85
10 changed files with 298 additions and 86 deletions

14
Cargo.lock generated
View File

@@ -5,6 +5,11 @@ name = "adler32"
version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "arraydeque"
version = "0.4.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "arrayvec"
version = "0.4.11"
@@ -952,6 +957,11 @@ dependencies = [
"tokio-reactor 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "treebitmap"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "typenum"
version = "1.10.0"
@@ -1013,6 +1023,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
name = "wireguard-rs"
version = "0.1.0"
dependencies = [
"arraydeque 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)",
"blake2 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)",
"byteorder 1.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
"digest 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -1027,6 +1038,7 @@ dependencies = [
"spin 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)",
"subtle 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio 0.1.22 (registry+https://github.com/rust-lang/crates.io-index)",
"treebitmap 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
"x25519-dalek 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)",
"zerocopy 0.2.7 (registry+https://github.com/rust-lang/crates.io-index)",
]
@@ -1079,6 +1091,7 @@ dependencies = [
[metadata]
"checksum adler32 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)" = "7e522997b529f05601e05166c07ed17789691f562762c7f3b987263d2dedee5c"
"checksum arraydeque 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)" = "f0ffd3d69bd89910509a5d31d1f1353f38ccffdd116dd0099bbd6627f7bd8ad8"
"checksum arrayvec 0.4.11 (registry+https://github.com/rust-lang/crates.io-index)" = "b8d73f9beda665eaa98ab9e4f7442bd4e7de6652587de55b2525e52e29c1b0ba"
"checksum autocfg 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "0e49efa51329a5fd37e7c79db4621af617cd4e3e5bc224939808d076077077bf"
"checksum bit-set 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "e84c238982c4b1e1ee668d136c510c67a13465279c0cb367ea6baf6310620a80"
@@ -1188,6 +1201,7 @@ dependencies = [
"checksum tokio-timer 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)" = "f2106812d500ed25a4f38235b9cae8f78a09edf43203e16e59c3b769a342a60e"
"checksum tokio-udp 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "66268575b80f4a4a710ef83d087fdfeeabdce9b74c797535fbac18a2cb906e92"
"checksum tokio-uds 0.2.5 (registry+https://github.com/rust-lang/crates.io-index)" = "037ffc3ba0e12a0ab4aca92e5234e0dedeb48fddf6ccd260f1f150a36a9f2445"
"checksum treebitmap 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "6bf423939ac9ccf4083788879b883a7149176586f9cf8b0fb1fd88b66ad692b5"
"checksum typenum 1.10.0 (registry+https://github.com/rust-lang/crates.io-index)" = "612d636f949607bdf9b123b4a6f6d966dedf3ff669f7f045890d3a4a73948169"
"checksum ucd-util 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "fa9b3b49edd3468c0e6565d85783f51af95212b6fa3986a5500954f00b460874"
"checksum unicode-xid 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "fc72304796d0818e357ead4e000d19c9c174ab23dc11093ac919054d20a6a7fc"

View File

@@ -19,6 +19,8 @@ sodiumoxide = "0.2.2"
lazy_static = "^1.3"
tokio = "0.1.22"
futures = "0.1.28"
arraydeque = "^0.4"
treebitmap = "^0.4"
[dependencies.x25519-dalek]
version = "^0.5"

View File

@@ -356,12 +356,14 @@ mod tests {
use super::super::messages::*;
use super::*;
use hex;
use std::thread;
use rand::rngs::OsRng;
use std::time::Duration;
use std::net::SocketAddr;
use std::thread;
use std::time::Duration;
fn setup_devices<R: RngCore + CryptoRng>(rng : &mut R) -> (PublicKey, Device<usize>, PublicKey, Device<usize>) {
fn setup_devices<R: RngCore + CryptoRng>(
rng: &mut R,
) -> (PublicKey, Device<usize>, PublicKey, Device<usize>) {
// generate new keypairs
let sk1 = StaticSecret::new(rng);
@@ -404,8 +406,8 @@ mod tests {
let mut rng = OsRng::new().unwrap();
let (_pk1, dev1, pk2, dev2) = setup_devices(&mut rng);
let src1 : SocketAddr = "172.16.0.1:8080".parse().unwrap();
let src2 : SocketAddr = "172.16.0.2:7070".parse().unwrap();
let src1: SocketAddr = "172.16.0.1:8080".parse().unwrap();
let src2: SocketAddr = "172.16.0.2:7070".parse().unwrap();
// 1. device-1 : create first initation
let msg_init = dev1.begin(&mut rng, &pk2).unwrap();
@@ -413,13 +415,13 @@ mod tests {
// 2. device-2 : responds with CookieReply
let msg_cookie = match dev2.process(&mut rng, &msg_init, Some(&src1)).unwrap() {
(None, Some(msg), None) => msg,
_ => panic!("unexpected response")
_ => panic!("unexpected response"),
};
// device-1 : processes CookieReply (no response)
match dev1.process(&mut rng, &msg_cookie, Some(&src2)).unwrap() {
(None, None, None) => (),
_ => panic!("unexpected response")
_ => panic!("unexpected response"),
}
// avoid initation flood
@@ -433,20 +435,20 @@ mod tests {
(Some(_), Some(msg), Some(kp)) => {
assert_eq!(kp.confirmed, false);
msg
},
_ => panic!("unexpected response")
}
_ => panic!("unexpected response"),
};
// 5. device-1 : responds with CookieReply
let msg_cookie = match dev1.process(&mut rng, &msg_response, Some(&src2)).unwrap() {
(None, Some(msg), None) => msg,
_ => panic!("unexpected response")
_ => panic!("unexpected response"),
};
// device-2 : processes CookieReply (no response)
match dev2.process(&mut rng, &msg_cookie, Some(&src1)).unwrap() {
(None, None, None) => (),
_ => panic!("unexpected response")
_ => panic!("unexpected response"),
}
// avoid initation flood
@@ -460,8 +462,8 @@ mod tests {
(Some(_), Some(msg), Some(kp)) => {
assert_eq!(kp.confirmed, false);
(msg, kp)
},
_ => panic!("unexpected response")
}
_ => panic!("unexpected response"),
};
// device-1 : process noise response
@@ -469,8 +471,8 @@ mod tests {
(Some(_), None, Some(kp)) => {
assert_eq!(kp.confirmed, true);
kp
},
_ => panic!("unexpected response")
}
_ => panic!("unexpected response"),
};
assert_eq!(kp1.send, kp2.recv);

View File

@@ -1,10 +1,10 @@
use spin;
use std::collections::HashMap;
use std::net::IpAddr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Condvar, Mutex, Arc};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::{Duration, Instant};
use spin;
use lazy_static::lazy_static;
@@ -24,7 +24,7 @@ struct Entry {
pub struct RateLimiter(Arc<RateLimiterInner>);
struct RateLimiterInner{
struct RateLimiterInner {
gc_running: AtomicBool,
gc_dropped: (Mutex<bool>, Condvar),
table: spin::RwLock<HashMap<IpAddr, spin::Mutex<Entry>>>,
@@ -42,13 +42,11 @@ impl Drop for RateLimiter {
impl RateLimiter {
pub fn new() -> Self {
RateLimiter (
Arc::new(RateLimiterInner {
RateLimiter(Arc::new(RateLimiterInner {
gc_dropped: (Mutex::new(false), Condvar::new()),
gc_running: AtomicBool::from(false),
table: spin::RwLock::new(HashMap::new()),
})
)
}))
}
pub fn allow(&self, addr: &IpAddr) -> bool {
@@ -60,8 +58,8 @@ impl RateLimiter {
let mut entry = entry.lock();
// add tokens earned since last time
entry.tokens =
MAX_TOKENS.min(entry.tokens + u64::from(entry.last_time.elapsed().subsec_nanos()));
entry.tokens = MAX_TOKENS
.min(entry.tokens + u64::from(entry.last_time.elapsed().subsec_nanos()));
entry.last_time = Instant::now();
// subtract cost of packet
@@ -94,7 +92,9 @@ impl RateLimiter {
// garbage collect
{
let mut tw = limiter.table.write();
tw.retain(|_, ref mut entry| entry.lock().last_time.elapsed() <= *GC_INTERVAL);
tw.retain(|_, ref mut entry| {
entry.lock().last_time.elapsed() <= *GC_INTERVAL
});
if tw.len() == 0 {
limiter.gc_running.store(false, Ordering::Relaxed);
return;
@@ -102,7 +102,7 @@ impl RateLimiter {
}
// wait until stopped or new GC (~1 every sec)
let res = cvar.wait_timeout(dropped,*GC_INTERVAL).unwrap();
let res = cvar.wait_timeout(dropped, *GC_INTERVAL).unwrap();
dropped = res.0;
}
});
@@ -110,7 +110,6 @@ impl RateLimiter {
allowed
}
}
#[cfg(test)]

View File

@@ -1,4 +1,7 @@
#![feature(test)]
mod handshake;
mod router;
mod types;
use sodiumoxide;

View File

@@ -1,3 +0,0 @@
mod noise;
mod types;
mod router;

156
src/router/anti_replay.rs Normal file
View File

@@ -0,0 +1,156 @@
use std::mem;
// Implementation of RFC 6479.
// https://tools.ietf.org/html/rfc6479
#[cfg(target_pointer_width = "64")]
type Word = u64;
#[cfg(target_pointer_width = "64")]
const REDUNDANT_BIT_SHIFTS: usize = 6;
#[cfg(target_pointer_width = "32")]
type Word = u32;
#[cfg(target_pointer_width = "32")]
const REDUNDANT_BIT_SHIFTS: usize = 5;
const SIZE_OF_WORD: usize = mem::size_of::<Word>() * 8;
const BITMAP_BITLEN: usize = 2048;
const BITMAP_LEN: usize = (BITMAP_BITLEN / SIZE_OF_WORD);
const BITMAP_INDEX_MASK: u64 = BITMAP_LEN as u64 - 1;
const BITMAP_LOC_MASK: u64 = (SIZE_OF_WORD - 1) as u64;
const WINDOW_SIZE: u64 = (BITMAP_BITLEN - SIZE_OF_WORD) as u64;
pub struct AntiReplay {
bitmap: [Word; BITMAP_LEN],
last: u64,
}
impl Default for AntiReplay {
fn default() -> Self {
AntiReplay::new()
}
}
impl AntiReplay {
pub fn new() -> Self {
debug_assert_eq!(1 << REDUNDANT_BIT_SHIFTS, SIZE_OF_WORD);
debug_assert_eq!(BITMAP_BITLEN % SIZE_OF_WORD, 0);
AntiReplay {
last: 0,
bitmap: [0; BITMAP_LEN],
}
}
// Returns true if check is passed, i.e., not a replay or too old.
//
// Unlike RFC 6479, zero is allowed.
fn check(&self, seq: u64) -> bool {
// Larger is always good.
if seq > self.last {
return true;
}
if self.last - seq > WINDOW_SIZE {
return false;
}
let bit_location = seq & BITMAP_LOC_MASK;
let index = (seq >> REDUNDANT_BIT_SHIFTS) & BITMAP_INDEX_MASK;
self.bitmap[index as usize] & (1 << bit_location) == 0
}
// Should only be called if check returns true.
fn update_store(&mut self, seq: u64) {
debug_assert!(self.check(seq));
let index = seq >> REDUNDANT_BIT_SHIFTS;
if seq > self.last {
let index_cur = self.last >> REDUNDANT_BIT_SHIFTS;
let diff = index - index_cur;
if diff >= BITMAP_LEN as u64 {
self.bitmap = [0; BITMAP_LEN];
} else {
for i in 0..diff {
let real_index = (index_cur + i + 1) & BITMAP_INDEX_MASK;
self.bitmap[real_index as usize] = 0;
}
}
self.last = seq;
}
let index = index & BITMAP_INDEX_MASK;
let bit_location = seq & BITMAP_LOC_MASK;
self.bitmap[index as usize] |= 1 << bit_location;
}
/// Checks and marks a sequence number in the replay filter
///
/// # Arguments
///
/// - seq: Sequence number check for replay and add to filter
///
/// # Returns
///
/// Ok(()) if sequence number is valid (not marked and not behind the moving window).
/// Err if the sequence number is invalid (already marked or "too old").
pub fn update(&mut self, seq: u64) -> Result<(), ()> {
if self.check(seq) {
self.update_store(seq);
Ok(())
} else {
Err(())
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn anti_replay() {
let mut ar = AntiReplay::new();
for i in 0..20000 {
ar.update(i).unwrap();
}
for i in (0..20000).rev() {
assert!(!ar.check(i));
}
ar.update(65536).unwrap();
for i in (65536 - WINDOW_SIZE)..65535 {
ar.update(i).unwrap();
}
for i in (65536 - 10 * WINDOW_SIZE)..65535 {
assert!(!ar.check(i));
}
ar.update(66000).unwrap();
for i in 65537..66000 {
ar.update(i).unwrap();
}
for i in 65537..66000 {
assert!(ar.update(i).is_err());
}
// Test max u64.
let next = u64::max_value();
ar.update(next).unwrap();
assert!(!ar.check(next));
for i in (next - WINDOW_SIZE)..next {
ar.update(i).unwrap();
}
for i in (next - 20 * WINDOW_SIZE)..next {
assert!(!ar.check(i));
}
}
}

View File

@@ -4,20 +4,23 @@
* 2. Inserting into the buffer always succeeds, but might overwrite the oldest item
*/
const BUFFER_SIZE : usize = 1024;
const BUFFER_SIZE: usize = 1024;
pub struct DiscardingRingBuffer<T> {
buf : [ Option<T> ; BUFFER_SIZE],
idx : usize,
next : usize
buf: [Option<T>; BUFFER_SIZE],
idx: usize,
next: usize,
}
impl <T>DiscardingRingBuffer<T> where T: Copy {
impl<T> DiscardingRingBuffer<T>
where
T: Copy,
{
pub fn new() -> Self {
DiscardingRingBuffer{
buf : [None; BUFFER_SIZE],
idx : 0,
next : 0
DiscardingRingBuffer {
buf: [None; BUFFER_SIZE],
idx: 0,
next: 0,
}
}
@@ -29,7 +32,7 @@ impl <T>DiscardingRingBuffer<T> where T: Copy {
}
}
pub fn push(&mut self, val : T) {
pub fn push(&mut self, val: T) {
// assign next slot (free / oldest)
self.buf[self.idx] = Some(val);
self.idx += 1;
@@ -57,20 +60,25 @@ impl <T>DiscardingRingBuffer<T> where T: Copy {
pub fn has_element(&self) -> bool {
match self.buf[self.next] {
None => true,
_ => false
_ => false,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use proptest::prelude::*;
proptest! {
proptest! {
#[test]
fn test_order(elems: Vec<usize>) {
let mut buf = DiscardingRingBuffer<usize>::new();
let mut buf = DiscardingRingBuffer::new();
for e in &elems {
buf.push(e);
}
}
}
}

View File

@@ -1,18 +1,48 @@
use std::net::SocketAddr;
use arraydeque::{ArrayDeque, Wrapping};
use treebitmap::IpLookupTable;
use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr};
use std::sync::atomic::{AtomicPtr, AtomicU64};
use std::sync::{Arc, Mutex};
use std::time::Instant;
use super::super::types::KeyPair;
use super::anti_replay::AntiReplay;
const MAX_STAGED_PACKETS: usize = 128;
pub struct Device {
ipv4: IpLookupTable<Ipv4Addr, Arc<Peer>>,
ipv6: IpLookupTable<Ipv6Addr, Arc<Peer>>,
}
struct KeyState(KeyPair, AntiReplay);
struct EncryptState {
key: [u8; 32], // encryption key
id: u64, // sender id
nonce: AtomicU64, // next available nonce
death: Instant, // can must the key no longer be used:
// (birth + reject-after-time - keepalive-timeout - rekey-timeout)
}
struct KeyWheel {
next: AtomicPtr<Arc<Option<KeyState>>>, // next key state (unconfirmed)
current: AtomicPtr<Arc<Option<KeyState>>>, // current key state (used for encryption)
previous: AtomicPtr<Arc<Option<KeyState>>>, // old key state (used for decryption)
}
pub struct Peer {
staged_packets: Mutex<ArrayDeque<[Vec<u8>; MAX_STAGED_PACKETS], Wrapping>>, // packets awaiting handshake
rx_bytes: AtomicU64, // received bytes
tx_bytes: AtomicU64, // transmitted bytes
keys: KeyWheel, // key-wheel
encryption: AtomicPtr<Arc<EncryptState>>, // current encryption key (starts expired)
}
pub struct PeerRef {}
pub struct PeerRef();
impl Device {
pub fn new() -> Device {
unimplemented!();
}
@@ -54,7 +84,6 @@ impl Device {
unimplemented!();
}
/// Flush the queue of buffered messages awaiting transmission
///
/// # Arguments

View File

@@ -1,2 +1,4 @@
mod anti_replay;
mod buffer;
pub mod device;