Merge pull request #33 from mixalturek/name-threads

Name all threads spawned by dipstick.
This commit is contained in:
Francis Lalonde 2019-01-08 10:20:05 -05:00 committed by GitHub
commit 7ba1a7687e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 56 additions and 35 deletions

View File

@ -29,6 +29,10 @@ impl CancelHandle {
/// Schedule a task to run periodically.
/// Starts a new thread for every task.
///
/// # Panics
///
/// Panics if the OS fails to create a thread.
fn set_schedule<F>(every: Duration, operation: F) -> CancelHandle
where
F: Fn() -> () + Send + 'static,
@ -36,13 +40,16 @@ where
let handle = CancelHandle::new();
let inner_handle = handle.clone();
thread::spawn(move || loop {
thread::sleep(every);
if inner_handle.is_cancelled() {
break;
}
operation();
});
thread::Builder::new()
.name("dipstick-scheduler".to_string())
.spawn(move || loop {
thread::sleep(every);
if inner_handle.is_cancelled() {
break;
}
operation();
})
.unwrap(); // TODO: Panic, change API to return Result?
handle
}

View File

@ -25,24 +25,31 @@ pub trait QueuedInput: Input + Send + Sync + 'static + Sized {
}
}
/// # Panics
///
/// Panics if the OS fails to create a thread.
fn new_async_channel(length: usize) -> Arc<mpsc::SyncSender<InputQueueCmd>> {
let (sender, receiver) = mpsc::sync_channel::<InputQueueCmd>(length);
thread::spawn(move || {
let mut done = false;
while !done {
match receiver.recv() {
Ok(InputQueueCmd::Write(metric, value, labels)) => metric.write(value, labels),
Ok(InputQueueCmd::Flush(scope)) => if let Err(e) = scope.flush() {
debug!("Could not asynchronously flush metrics: {}", e);
},
Err(e) => {
debug!("Async metrics receive loop terminated: {}", e);
// cannot break from within match, use safety pin instead
done = true
thread::Builder::new()
.name("dipstick-queue-in".to_string())
.spawn(move || {
let mut done = false;
while !done {
match receiver.recv() {
Ok(InputQueueCmd::Write(metric, value, labels)) => metric.write(value, labels),
Ok(InputQueueCmd::Flush(scope)) => if let Err(e) = scope.flush() {
debug!("Could not asynchronously flush metrics: {}", e);
},
Err(e) => {
debug!("Async metrics receive loop terminated: {}", e);
// cannot break from within match, use safety pin instead
done = true
}
}
}
}
});
})
.unwrap(); // TODO: Panic, change API to return Result?
Arc::new(sender)
}

View File

@ -28,24 +28,31 @@ pub trait QueuedOutput: Output + Sized {
}
}
/// # Panics
///
/// Panics if the OS fails to create a thread.
fn new_async_channel(length: usize) -> Arc<mpsc::SyncSender<OutputQueueCmd>> {
let (sender, receiver) = mpsc::sync_channel::<OutputQueueCmd>(length);
thread::spawn(move || {
let mut done = false;
while !done {
match receiver.recv() {
Ok(OutputQueueCmd::Write(metric, value, labels)) => metric.write(value, labels),
Ok(OutputQueueCmd::Flush(scope)) => if let Err(e) = scope.flush() {
debug!("Could not asynchronously flush metrics: {}", e);
},
Err(e) => {
debug!("Async metrics receive loop terminated: {}", e);
// cannot break from within match, use safety pin instead
done = true
thread::Builder::new()
.name("dipstick-queue-out".to_string())
.spawn(move || {
let mut done = false;
while !done {
match receiver.recv() {
Ok(OutputQueueCmd::Write(metric, value, labels)) => metric.write(value, labels),
Ok(OutputQueueCmd::Flush(scope)) => if let Err(e) = scope.flush() {
debug!("Could not asynchronously flush metrics: {}", e);
},
Err(e) => {
debug!("Async metrics receive loop terminated: {}", e);
// cannot break from within match, use safety pin instead
done = true
}
}
}
}
});
})
.unwrap(); // TODO: Panic, change API to return Result?
Arc::new(sender)
}