Additional in-order queue test for router
This commit is contained in:
@@ -97,25 +97,92 @@ impl<J: SequentialJob> Queue<J> {
|
|||||||
mod tests {
|
mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
|
|
||||||
use std::sync::Arc;
|
|
||||||
use std::thread;
|
use std::thread;
|
||||||
|
|
||||||
|
use std::sync::Arc;
|
||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
use rand::thread_rng;
|
use rand::thread_rng;
|
||||||
use rand::Rng;
|
use rand::Rng;
|
||||||
|
|
||||||
struct TestJob {}
|
#[test]
|
||||||
|
fn test_consume_queue() {
|
||||||
impl SequentialJob for TestJob {
|
struct TestJob {
|
||||||
fn is_ready(&self) -> bool {
|
cnt: Arc<AtomicUsize>,
|
||||||
true
|
wait_sequential: Duration,
|
||||||
}
|
}
|
||||||
|
|
||||||
fn sequential_work(self) {}
|
impl SequentialJob for TestJob {
|
||||||
|
fn is_ready(&self) -> bool {
|
||||||
|
true
|
||||||
|
}
|
||||||
|
|
||||||
|
fn sequential_work(self) {
|
||||||
|
thread::sleep(self.wait_sequential);
|
||||||
|
self.cnt.fetch_add(1, Ordering::SeqCst);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn hammer(queue: &Arc<Queue<TestJob>>, cnt: Arc<AtomicUsize>) -> usize {
|
||||||
|
let mut jobs = 0;
|
||||||
|
let mut rng = thread_rng();
|
||||||
|
for _ in 0..10_000 {
|
||||||
|
if rng.gen() {
|
||||||
|
let wait_sequential: u64 = rng.gen();
|
||||||
|
let wait_sequential = wait_sequential % 1000;
|
||||||
|
|
||||||
|
let wait_parallel: u64 = rng.gen();
|
||||||
|
let wait_parallel = wait_parallel % 1000;
|
||||||
|
|
||||||
|
thread::sleep(Duration::from_micros(wait_parallel));
|
||||||
|
|
||||||
|
queue.push(TestJob {
|
||||||
|
cnt: cnt.clone(),
|
||||||
|
wait_sequential: Duration::from_micros(wait_sequential),
|
||||||
|
});
|
||||||
|
jobs += 1;
|
||||||
|
} else {
|
||||||
|
queue.consume();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
queue.consume();
|
||||||
|
jobs
|
||||||
|
}
|
||||||
|
|
||||||
|
let queue = Arc::new(Queue::new());
|
||||||
|
let counter = Arc::new(AtomicUsize::new(0));
|
||||||
|
|
||||||
|
// repeatedly apply operations randomly from concurrent threads
|
||||||
|
let other = {
|
||||||
|
let queue = queue.clone();
|
||||||
|
let counter = counter.clone();
|
||||||
|
thread::spawn(move || hammer(&queue, counter))
|
||||||
|
};
|
||||||
|
let mut jobs = hammer(&queue, counter.clone());
|
||||||
|
|
||||||
|
// wait, consume and check empty
|
||||||
|
jobs += other.join().unwrap();
|
||||||
|
assert_eq!(queue.queue.lock().len(), 0, "elements left in queue");
|
||||||
|
assert_eq!(
|
||||||
|
jobs,
|
||||||
|
counter.load(Ordering::Acquire),
|
||||||
|
"did not consume every job"
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Fuzz the Queue */
|
/* Fuzz the Queue */
|
||||||
#[test]
|
#[test]
|
||||||
fn test_queue() {
|
fn test_fuzz_queue() {
|
||||||
|
struct TestJob {}
|
||||||
|
|
||||||
|
impl SequentialJob for TestJob {
|
||||||
|
fn is_ready(&self) -> bool {
|
||||||
|
true
|
||||||
|
}
|
||||||
|
|
||||||
|
fn sequential_work(self) {}
|
||||||
|
}
|
||||||
|
|
||||||
fn hammer(queue: &Arc<Queue<TestJob>>) {
|
fn hammer(queue: &Arc<Queue<TestJob>>) {
|
||||||
let mut rng = thread_rng();
|
let mut rng = thread_rng();
|
||||||
for _ in 0..1_000_000 {
|
for _ in 0..1_000_000 {
|
||||||
|
|||||||
Reference in New Issue
Block a user