Derieve clone for timer (handle)

This commit is contained in:
Mathias Hall-Andersen
2019-08-22 15:46:34 +02:00
parent 543efad980
commit 9528d19cc9

View File

@@ -12,7 +12,7 @@ extern crate test;
type TimerID = u64; type TimerID = u64;
type TimerKey = (u64, usize); type TimerKey = (u64, usize);
type Callback = (Arc<AtomicBool>, Box<dyn Fn() -> () + Send + 'static>); type Callback = (Weak<TimerInner>, Box<dyn Fn() -> () + Send + 'static>);
const ACCURACY: Duration = Duration::from_millis(100); const ACCURACY: Duration = Duration::from_millis(100);
const OFFSET: Duration = Duration::from_millis(1000); const OFFSET: Duration = Duration::from_millis(1000);
@@ -24,13 +24,16 @@ struct RunnerInner {
callback: spin::Mutex<HashMap<TimerID, Callback>>, callback: spin::Mutex<HashMap<TimerID, Callback>>,
} }
pub struct Timer { struct TimerInner {
pending: Arc<AtomicBool>,
runner: Weak<RunnerInner>,
id: u64, id: u64,
pending: AtomicBool,
runner: Weak<RunnerInner>,
cnt: AtomicUsize, cnt: AtomicUsize,
} }
#[derive(Clone)]
pub struct Timer(Arc<TimerInner>);
pub struct Runner(Arc<RunnerInner>, Option<thread::JoinHandle<()>>); pub struct Runner(Arc<RunnerInner>, Option<thread::JoinHandle<()>>);
impl Runner { impl Runner {
@@ -64,13 +67,13 @@ impl Runner {
// handle expired events // handle expired events
for key in &expired { for key in &expired {
if let Some((pending, callback)) = inner.callback.lock().get(&key.0) { let callbacks = inner.callback.lock();
if pending.swap(false, Ordering::SeqCst) { let (timer, callback) = callbacks.get(&key.0).unwrap();
if let Some(timer) = timer.upgrade() {
if timer.pending.swap(false, Ordering::SeqCst) {
callback(); callback();
} }
} else { }
unreachable!()
};
} }
} }
}) })
@@ -81,56 +84,59 @@ impl Runner {
pub fn timer(&self, callback: Box<dyn Fn() -> () + Send + 'static>) -> Timer { pub fn timer(&self, callback: Box<dyn Fn() -> () + Send + 'static>) -> Timer {
let id = self.0.keys.fetch_add(1, Ordering::Relaxed); let id = self.0.keys.fetch_add(1, Ordering::Relaxed);
let pending = Arc::new(AtomicBool::new(false)); let inner = Arc::new(TimerInner {
id,
pending: AtomicBool::new(false),
runner: Arc::downgrade(&self.0.clone()),
cnt: AtomicUsize::new(0),
});
assert!(id < u64::MAX, "wrapping of ids"); assert!(id < u64::MAX, "wrapping of ids");
self.0 self.0
.callback .callback
.lock() .lock()
.insert(id, (pending.clone(), callback)); .insert(id, (Arc::downgrade(&inner), callback));
Timer { Timer(inner)
id,
pending: pending,
runner: Arc::downgrade(&self.0.clone()),
cnt: AtomicUsize::new(0),
}
} }
} }
impl Timer { impl Timer {
pub fn reset(&self, duration: Duration) { pub fn reset(&self, duration: Duration) {
if let Some(runner) = self.runner.upgrade() { let timer = &self.0;
if let Some(runner) = timer.runner.upgrade() {
let mut wheel = runner.wheel.lock(); let mut wheel = runner.wheel.lock();
let cnt = self.cnt.fetch_add(1, Ordering::SeqCst); let cnt = timer.cnt.fetch_add(1, Ordering::SeqCst);
self.pending.store(true, Ordering::SeqCst); timer.pending.store(true, Ordering::SeqCst);
wheel.stop((self.id, cnt)); wheel.stop((timer.id, cnt));
wheel.start((self.id, cnt + 1), duration - OFFSET); wheel.start((timer.id, cnt + 1), duration - OFFSET);
} }
} }
pub fn start(&self, duration: Duration) { pub fn start(&self, duration: Duration) {
if self.pending.load(Ordering::Acquire) { let timer = &self.0;
if timer.pending.load(Ordering::Acquire) {
return; return;
} }
if let Some(runner) = self.runner.upgrade() { if let Some(runner) = timer.runner.upgrade() {
let mut wheel = runner.wheel.lock(); let mut wheel = runner.wheel.lock();
if !self.pending.swap(true, Ordering::SeqCst) { if !timer.pending.swap(true, Ordering::SeqCst) {
let cnt = self.cnt.fetch_add(1, Ordering::SeqCst); let cnt = timer.cnt.fetch_add(1, Ordering::SeqCst);
wheel.start((self.id, cnt + 1), duration - OFFSET); wheel.start((timer.id, cnt + 1), duration - OFFSET);
} }
} }
} }
pub fn stop(&self) { pub fn stop(&self) {
if self.pending.load(Ordering::Acquire) { let timer = &self.0;
if let Some(runner) = self.runner.upgrade() { if timer.pending.load(Ordering::Acquire) {
if let Some(runner) = timer.runner.upgrade() {
let mut wheel = runner.wheel.lock(); let mut wheel = runner.wheel.lock();
if self.pending.swap(false, Ordering::SeqCst) { if timer.pending.swap(false, Ordering::SeqCst) {
let cnt = self.cnt.load(Ordering::SeqCst); let cnt = timer.cnt.load(Ordering::SeqCst);
wheel.stop((self.id, cnt)); wheel.stop((timer.id, cnt));
} }
} }
} }
@@ -146,12 +152,6 @@ impl Drop for Runner {
} }
} }
impl Drop for Timer {
fn drop(&mut self) {
self.stop();
}
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;