Igni/affine steal sort (#261)

Executor rework
This commit is contained in:
Jeremy Lempereur 2020-09-24 21:13:42 +02:00 committed by GitHub
parent 75904cfe43
commit aa066f2cea
28 changed files with 1033 additions and 726 deletions

View File

@ -25,7 +25,7 @@ travis-ci = { repository = "bastion-rs/bastion", branch = "master" }
maintenance = { status = "actively-developed" }
[features]
unstable = ["numanji", "allocator-suite", "jemallocator"]
unstable = []
[dependencies]
lightproc = "0.3.5"
@ -42,13 +42,12 @@ num_cpus = "1.13"
pin-utils = "0.1.0"
# Allocator
numanji = { version = "^0.1", optional = true, default-features = false }
allocator-suite = { version = "^0.1", optional = true, default-features = false }
arrayvec = { version = "0.5.1", features = ["array-sizes-129-255"]}
futures-timer = "3.0.2"
[target.'cfg(not(any(target_os = "android", target_os = "linux")))'.dependencies]
jemallocator = { version = "^0.3", optional = true, default-features = false }
once_cell = "1.4.0"
lever = "0.1.1-alpha.11"
tracing = "0.1.19"
crossbeam-queue = "0.2.3"
[target.'cfg(target_os = "windows")'.dependencies]
winapi = { version = "^0.3.8", features = ["basetsd"] }
@ -56,3 +55,4 @@ winapi = { version = "^0.3.8", features = ["basetsd"] }
[dev-dependencies]
proptest = "^0.10"
futures = "0.3.5"
tracing-subscriber = "0.2.11"

View File

@ -3,10 +3,7 @@
extern crate test;
use bastion_executor::blocking;
use bastion_executor::run::run;
use futures::future::join_all;
use lightproc::proc_stack::ProcStack;
use lightproc::recoverable_handle::RecoverableHandle;
use std::thread;
use std::time::Duration;
use test::Bencher;
@ -15,7 +12,7 @@ use test::Bencher;
#[bench]
fn blocking(b: &mut Bencher) {
b.iter(|| {
let handles = (0..10_000)
(0..10_000)
.map(|_| {
blocking::spawn_blocking(
async {
@ -25,9 +22,7 @@ fn blocking(b: &mut Bencher) {
ProcStack::default(),
)
})
.collect::<Vec<RecoverableHandle<()>>>();
run(join_all(handles), ProcStack::default());
.collect::<Vec<_>>()
});
}

View File

@ -0,0 +1,48 @@
#![feature(test)]
extern crate test;
use bastion_executor::blocking;
use bastion_executor::run::run;
use futures::future::join_all;
use lightproc::proc_stack::ProcStack;
use std::thread;
use std::time::Duration;
use test::Bencher;
// Benchmark for a 10K burst task spawn
#[bench]
fn run_blocking(b: &mut Bencher) {
b.iter(|| {
let handles = (0..10_000)
.map(|_| {
blocking::spawn_blocking(
async {
let duration = Duration::from_millis(1);
thread::sleep(duration);
},
ProcStack::default(),
)
})
.collect::<Vec<_>>();
run(join_all(handles), ProcStack::default())
});
}
// Benchmark for a single blocking task spawn
#[bench]
fn run_blocking_single(b: &mut Bencher) {
b.iter(|| {
run(
blocking::spawn_blocking(
async {
let duration = Duration::from_millis(1);
thread::sleep(duration);
},
ProcStack::default(),
),
ProcStack::default(),
)
});
}

View File

@ -2,54 +2,43 @@
extern crate test;
use bastion_executor::load_balancer;
use bastion_executor::prelude::spawn;
use bastion_executor::run::run;
use futures::future::join_all;
use futures_timer::Delay;
use lightproc::proc_stack::ProcStack;
use lightproc::recoverable_handle::RecoverableHandle;
use std::time::Duration;
use test::Bencher;
// Benchmark for a 10K burst task spawn
#[bench]
fn spawn_lot(b: &mut Bencher) {
let proc_stack = ProcStack::default();
b.iter(|| {
let proc_stack = ProcStack::default();
let handles = (0..10_000)
let _ = (0..10_000)
.map(|_| {
spawn(
async {
let duration = Duration::from_millis(0);
let duration = Duration::from_millis(1);
Delay::new(duration).await;
},
proc_stack.clone(),
)
})
.collect::<Vec<RecoverableHandle<()>>>();
run(join_all(handles), proc_stack);
.collect::<Vec<_>>();
});
}
// Benchmark for a single blocking task spawn
// Benchmark for a single task spawn
#[bench]
fn spawn_single(b: &mut Bencher) {
let proc_stack = ProcStack::default();
b.iter(|| {
let proc_stack = ProcStack::default();
let handle = spawn(
spawn(
async {
let duration = Duration::from_millis(0);
let duration = Duration::from_millis(1);
Delay::new(duration).await;
},
proc_stack.clone(),
);
run(
async {
handle.await;
},
proc_stack,
)
});
}

View File

@ -1,16 +1,16 @@
#![feature(test)]
extern crate test;
use bastion_executor::load_balancer::{stats, SmpStats};
use bastion_executor::load_balancer::{core_count, get_cores, stats, SmpStats};
use bastion_executor::placement;
use std::thread;
use test::Bencher;
fn stress_stats<S: SmpStats + Sync + Send>(stats: &'static S) {
let cores = placement::get_core_ids().expect("Core mapping couldn't be fetched");
let mut handles = Vec::new();
for core in cores {
let mut handles = Vec::with_capacity(*core_count());
for core in get_cores() {
let handle = thread::spawn(move || {
placement::set_for_current(core);
placement::set_for_current(*core);
for i in 0..100 {
stats.store_load(core.id, 10);
if i % 3 == 0 {
@ -25,7 +25,6 @@ fn stress_stats<S: SmpStats + Sync + Send>(stats: &'static S) {
handle.join().unwrap();
}
}
use test::Bencher;
// previous lock based stats benchmark 1,352,791 ns/iter (+/- 2,682,013)
@ -36,3 +35,37 @@ fn lockless_stats_bench(b: &mut Bencher) {
stress_stats(stats());
});
}
#[bench]
fn lockless_stats_bad_load(b: &mut Bencher) {
let stats = stats();
const MAX_CORE: usize = 256;
for i in 0..MAX_CORE {
// Generating the worst possible mergesort scenario
// [0,2,4,6,8,10,1,3,5,7,9]...
if i <= MAX_CORE / 2 {
stats.store_load(i, i * 2);
} else {
stats.store_load(i, i - 1 - MAX_CORE / 2);
}
}
b.iter(|| {
let _sorted_load = stats.get_sorted_load();
});
}
#[bench]
fn lockless_stats_good_load(b: &mut Bencher) {
let stats = stats();
const MAX_CORE: usize = 256;
for i in 0..MAX_CORE {
// Generating the best possible mergesort scenario
// [0,1,2,3,4,5,6,7,8,9]...
stats.store_load(i, i);
}
b.iter(|| {
let _sorted_load = stats.get_sorted_load();
});
}
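
The sorted-load contract these benchmarks exercise can be illustrated with a small, hypothetical snippet (the core ids and loads are invented; stats() is a process-wide singleton, so real output also reflects loads stored elsewhere):

use bastion_executor::load_balancer::{stats, SmpStats};

fn main() {
    let stats = stats();
    // Store a load per core slot: (core id, load).
    stats.store_load(0, 5);
    stats.store_load(1, 20);
    stats.store_load(2, 10);
    // get_sorted_load() returns (core, load) pairs ordered from the highest
    // load to the lowest, so (1, 20) is expected before (2, 10) and (0, 5).
    println!("{:?}", stats.get_sorted_load());
}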

View File

@ -1,21 +0,0 @@
//!
//! NUMA-aware locality enabled allocator with optional fallback.
//!
//! Currently this API is marked as `unstable` and can only be used with the `unstable` feature.
//!
//! This allocator checks for NUMA-aware locality and, if suitable, starts
//! with the local allocation policy [MPOL_LOCAL].
//! Otherwise it tries to use jemalloc.
//!
//! The underlying allocator is [Numanji].
//!
//! [Numanji]: https://docs.rs/numanji
//! [MPOL_LOCAL]: http://man7.org/linux/man-pages/man2/set_mempolicy.2.html
//!
unstable_api! {
// Allocation selector import
use numanji::*;
// Drive selection of allocator here
autoselect!();
}

View File

@ -1,321 +1,28 @@
//! A thread pool for running blocking functions asynchronously.
//!
//! Blocking thread pool consists of four elements:
//! * Frequency Detector
//! * Trend Estimator
//! * Predictive Upscaler
//! * Time-based Downscaler
//! Pool of threads to run heavy processes
//!
//! ## Frequency Detector
//! Detects how many tasks are submitted from the scheduler to the thread pool in a given time frame.
//! The pool manager thread does this sampling every 200 milliseconds.
//! This value is going to be used for the trend estimation phase.
//!
//! ## Trend Estimator
//! Holds up to the given number of frequencies to create an estimation.
//! The trend estimator holds 10 frequencies at a time.
//! This value is stored as a constant in [FREQUENCY_QUEUE_SIZE](constant.FREQUENCY_QUEUE_SIZE.html).
//! The estimation algorithm and the prediction use the Exponentially Weighted Moving Average algorithm.
//!
//! This algorithm is adapted from [A Novel Predictive and Self-Adaptive Dynamic Thread Pool Management](https://doi.org/10.1109/ISPA.2011.61)
//! and altered to:
//! * utilize thread redundancy, i.e. the sum of the differences between the predicted and observed values, instead of a heavy trend calculation.
//! * use exponential rather than linear trend estimation, where the formula is:
//! ```text
//! LOW_WATERMARK * (predicted - observed) + LOW_WATERMARK
//! ```
//! *NOTE:* To tweak this algorithm, increase [LOW_WATERMARK](constant.LOW_WATERMARK.html); the additional dynamic thread spawn count adapts automatically.
//! * operate without watermarking by timestamps (used in the paper to measure the algorithm's own performance during execution).
//! * operate without extensive subsampling, which congests the pool manager thread.
//! * operate without keeping track of thread idle time or a job-out queue, unlike the TEMA and FOPS implementations.
//!
//! ## Predictive Upscaler
//! The upscaler has three cases (also described in the paper):
//! * The rate slightly increases and there are many idle threads.
//! * The number of worker threads tends to be reduced since the workload of the system is descending.
//! * The system has no requests or is stalled (in our case, when the current tasks block further tasks from being processed: throughput hogs).
//!
//! For the first two, EMA calculation and exponential trend estimation give good performance.
//! For the last case, the upscaler selects the upscaling amount from the number of tasks mapped when throughput hogs happen.
//!
//! **Example scenario:** say we have 10_000 tasks and every one of them blocks for 1 second. The scheduler will map plenty of tasks but they will get rejected.
//! This makes the estimation calculation nearly 0 for both the entering and exiting parts. When this happens and we still see tasks mapped from the scheduler,
//! we start to slowly increase threads linearly by the frequency amount. Increasing this value sharply either makes us hit the thread threshold on
//! some OSes or congests the program's other thread utilization because of context switches.
//!
//! Throughput hogs are determined by a combination of the job in / job out frequency and the current scheduler task assignment frequency.
//! The threshold of the EMA difference is guarded by machine epsilon to absorb floating-point arithmetic errors.
//!
//! ## Time-based Downscaler
//! When threads become idle, they will not shut down immediately.
//! Instead, they wait a random amount of time between 1 and 11 seconds
//! to even out the load.
//! We spawn futures onto the pool with the [spawn_blocking] method of the global run queue or
//! with the corresponding [Worker]'s spawn method.
use std::collections::VecDeque;
use std::future::Future;
use std::io::ErrorKind;
use std::iter::Iterator;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Mutex;
use std::time::Duration;
use std::{env, thread};
use crossbeam_channel::{bounded, Receiver, Sender};
use bastion_utils::math;
use crate::thread_manager::{DynamicPoolManager, DynamicRunner};
use crossbeam_channel::{unbounded, Receiver, Sender};
use lazy_static::lazy_static;
use lightproc::lightproc::LightProc;
use lightproc::proc_stack::ProcStack;
use lightproc::recoverable_handle::RecoverableHandle;
use crate::placement::CoreId;
use crate::{load_balancer, placement};
use once_cell::sync::{Lazy, OnceCell};
use std::future::Future;
use std::iter::Iterator;
use std::sync::Arc;
use std::time::Duration;
use std::{env, thread};
use tracing::trace;
/// If low watermark isn't configured this is the default scaler value.
/// This value is used for the heuristics of the scaler
const DEFAULT_LOW_WATERMARK: u64 = 2;
/// Pool managers interval time (milliseconds).
/// This is the actual interval which makes adaptation calculation.
const MANAGER_POLL_INTERVAL: u64 = 200;
/// Frequency histogram's sliding window size.
/// Defines how many frequencies will be considered for adaptation.
const FREQUENCY_QUEUE_SIZE: usize = 10;
/// Exponential moving average smoothing coefficient for limited window.
/// Smoothing factor is estimated with: 2 / (N + 1) where N is sample size.
const EMA_COEFFICIENT: f64 = 2_f64 / (FREQUENCY_QUEUE_SIZE as f64 + 1_f64);
/// Pool task frequency variable.
/// Holds scheduled tasks onto the thread pool for the calculation time window.
static FREQUENCY: AtomicU64 = AtomicU64::new(0);
/// Possible max threads (without OS contract).
static MAX_THREADS: AtomicU64 = AtomicU64::new(10_000);
/// Pool interface between the scheduler and thread pool
struct Pool {
sender: Sender<LightProc>,
receiver: Receiver<LightProc>,
}
lazy_static! {
/// Blocking pool with static starting thread count.
static ref POOL: Pool = {
for _ in 0..*low_watermark() {
thread::Builder::new()
.name("bastion-blocking-driver".to_string())
.spawn(|| {
self::affinity_pinner();
for task in &POOL.receiver {
task.run();
}
})
.expect("cannot start a thread driving blocking tasks");
}
// Pool manager to check frequency of task rates
// and take action by scaling the pool accordingly.
thread::Builder::new()
.name("bastion-pool-manager".to_string())
.spawn(|| {
let poll_interval = Duration::from_millis(MANAGER_POLL_INTERVAL);
loop {
scale_pool();
thread::sleep(poll_interval);
}
})
.expect("thread pool manager cannot be started");
// We want to use an unbuffered channel here to help
// us drive our dynamic control. In effect, the
// kernel's scheduler becomes the queue, reducing
// the number of buffers that work must flow through
// before being acted on by a core. This helps keep
// latency snappy in the overall async system by
// reducing bufferbloat.
let (sender, receiver) = bounded(0);
Pool { sender, receiver }
};
static ref ROUND_ROBIN_PIN: Mutex<CoreId> = Mutex::new(CoreId { id: 0 });
/// Sliding window for pool task frequency calculation
static ref FREQ_QUEUE: Mutex<VecDeque<u64>> = {
Mutex::new(VecDeque::with_capacity(FREQUENCY_QUEUE_SIZE.saturating_add(1)))
};
/// Dynamic pool thread count variable
static ref POOL_SIZE: Mutex<u64> = Mutex::new(*low_watermark());
}
/// Exponentially Weighted Moving Average calculation
///
/// This allows us to find the EMA value.
/// This value represents the trend of tasks mapped onto the thread pool.
/// The calculation is as follows:
/// ```text
/// +--------+-----------------+----------------------------------+
/// | Symbol | Identifier | Explanation |
/// +--------+-----------------+----------------------------------+
/// | α | EMA_COEFFICIENT | smoothing factor between 0 and 1 |
/// | Yt | freq | frequency sample at time t |
/// | St | acc | EMA at time t |
/// +--------+-----------------+----------------------------------+
/// ```
/// Under these definitions the formula is as follows:
/// ```text
/// EMA = α * [ Yt + (1 - α)*Yt-1 + ((1 - α)^2)*Yt-2 + ((1 - α)^3)*Yt-3 ... ]
/// ```
/// # Arguments
///
/// * `freq_queue` - Sliding window of frequency samples
#[inline]
fn calculate_ema(freq_queue: &VecDeque<u64>) -> f64 {
freq_queue.iter().enumerate().fold(0_f64, |acc, (i, freq)| {
acc + ((*freq as f64) * ((1_f64 - EMA_COEFFICIENT).powf(i as f64) as f64))
}) * EMA_COEFFICIENT as f64
}
/// Adaptive pool scaling function
///
/// This allows spawning new threads to make room for incoming task pressure.
/// Works in the background detached from the pool system and scales up the pool based
/// on the request rate.
///
/// It uses a frequency-based calculation to define work, utilizing the average processing rate.
fn scale_pool() {
// Fetch current frequency, it does matter that operations are ordered in this approach.
let current_frequency = FREQUENCY.swap(0, Ordering::SeqCst);
let mut freq_queue = FREQ_QUEUE.lock().unwrap();
// Make it safe to start for calculations by adding initial frequency scale
if freq_queue.len() == 0 {
freq_queue.push_back(0);
}
// Calculate message rate for the given time window
let frequency = (current_frequency as f64 / MANAGER_POLL_INTERVAL as f64) as u64;
// Calculates current time window's EMA value (including last sample)
let prev_ema_frequency = calculate_ema(&freq_queue);
// Add seen frequency data to the frequency histogram.
freq_queue.push_back(frequency);
if freq_queue.len() == FREQUENCY_QUEUE_SIZE.saturating_add(1) {
freq_queue.pop_front();
}
// Calculates current time window's EMA value (including last sample)
let curr_ema_frequency = calculate_ema(&freq_queue);
// Adapts the thread count of pool
//
// Sliding window of frequencies visited by the pool manager.
// Pool manager creates EMA value for previous window and current window.
// Compare them to determine scaling amount based on the trends.
// If current EMA value is bigger, we will scale up.
if curr_ema_frequency > prev_ema_frequency {
// "Scale by" amount can be seen as "how much load is coming".
// "Scale" amount is "how many threads we should spawn".
let scale_by: f64 = curr_ema_frequency - prev_ema_frequency;
let scale = num_cpus::get().min(
((DEFAULT_LOW_WATERMARK as f64 * scale_by) + DEFAULT_LOW_WATERMARK as f64) as usize,
);
// It is time to scale the pool!
(0..scale).for_each(|_| {
create_blocking_thread();
});
} else if (curr_ema_frequency - prev_ema_frequency).abs() < std::f64::EPSILON
&& current_frequency != 0
{
// Throughput is low. Allocate more threads to unblock flow.
// If we fall into this case, the scheduler is congested by long-hauling tasks.
// To unblock the flow we should add some threads to the pool, but not so many that we
// stagger the program's operation.
(0..DEFAULT_LOW_WATERMARK).for_each(|_| {
create_blocking_thread();
});
}
}
/// Creates blocking thread to receive tasks
/// Dynamic threads will terminate themselves if they don't
/// receive any work within one to ten seconds.
fn create_blocking_thread() {
// Check that thread is spawnable.
// If it hits to the OS limits don't spawn it.
{
let pool_size = *POOL_SIZE.lock().unwrap();
if pool_size >= MAX_THREADS.load(Ordering::SeqCst) {
MAX_THREADS.store(10_000, Ordering::SeqCst);
return;
}
}
// We want to avoid having all threads terminate at
// exactly the same time, causing thundering herd
// effects. We want to stagger their destruction over
// 10 seconds or so to make the costs fade into
// background noise.
//
// Generate a simple random number of milliseconds
let rand_sleep_ms = 1000_u64
.checked_add(u64::from(math::random(10_000)))
.expect("shouldn't overflow");
let _ = thread::Builder::new()
.name("bastion-blocking-driver-dynamic".to_string())
.spawn(move || {
self::affinity_pinner();
let wait_limit = Duration::from_millis(rand_sleep_ms);
// Adjust the pool size counter before and after spawn
*POOL_SIZE.lock().unwrap() += 1;
while let Ok(task) = POOL.receiver.recv_timeout(wait_limit) {
task.run();
}
*POOL_SIZE.lock().unwrap() -= 1;
})
.map_err(|err| {
match err.kind() {
ErrorKind::WouldBlock => {
// The maximum allowed number of threads per process varies from system to system.
// Also, some systems have such a limit (like macOS), and some don't (Linux).
// This case is not expected to happen,
// but when it does it shouldn't cause a panic.
let guarded_count = POOL_SIZE
.lock()
.unwrap()
.checked_sub(1)
.expect("shouldn't underflow");
MAX_THREADS.store(guarded_count, Ordering::SeqCst);
}
_ => eprintln!(
"cannot start a dynamic thread driving blocking tasks: {}",
err
),
}
});
}
/// Enqueues work, attempting to send to the thread pool in a
/// nonblocking way and spinning up the needed number of threads
/// based on the previous statistics, without relying on
/// whether there is a thread ready to accept the work or not.
fn schedule(t: LightProc) {
// Add up for every incoming scheduled task
FREQUENCY.fetch_add(1, Ordering::Acquire);
if let Err(err) = POOL.sender.try_send(t) {
// We were not able to send to the channel without
// blocking.
POOL.sender.send(err.into_inner()).unwrap();
}
}
const THREAD_RECV_TIMEOUT: Duration = Duration::from_millis(100);
/// Spawns a blocking task.
///
@ -330,12 +37,84 @@ where
handle
}
struct BlockingRunner {}
impl DynamicRunner for BlockingRunner {
fn run_static(&self, park_timeout: Duration) -> ! {
loop {
while let Ok(task) = POOL.receiver.recv_timeout(THREAD_RECV_TIMEOUT) {
trace!("static thread: running task");
task.run();
}
trace!("static: empty queue, parking with timeout");
thread::park_timeout(park_timeout);
}
}
fn run_dynamic(&self, parker: &dyn Fn()) -> ! {
loop {
while let Ok(task) = POOL.receiver.recv_timeout(THREAD_RECV_TIMEOUT) {
trace!("dynamic thread: running task");
task.run();
}
trace!(
"dynamic thread: parking - {:?}",
std::thread::current().id()
);
parker();
}
}
fn run_standalone(&self) {
while let Ok(task) = POOL.receiver.recv_timeout(THREAD_RECV_TIMEOUT) {
task.run();
}
trace!("standalone thread: quitting.");
}
}
/// Pool interface between the scheduler and thread pool
struct Pool {
sender: Sender<LightProc>,
receiver: Receiver<LightProc>,
}
static DYNAMIC_POOL_MANAGER: OnceCell<DynamicPoolManager> = OnceCell::new();
static POOL: Lazy<Pool> = Lazy::new(|| {
let runner = Arc::new(BlockingRunner {});
DYNAMIC_POOL_MANAGER
.set(DynamicPoolManager::new(*low_watermark() as usize, runner))
.expect("couldn't create dynamic pool manager");
DYNAMIC_POOL_MANAGER
.get()
.expect("couldn't get static pool manager")
.initialize();
let (sender, receiver) = unbounded();
Pool { sender, receiver }
});
/// Enqueues work, attempting to send to the thread pool in a
/// nonblocking way and spinning up the needed number of threads
/// based on the previous statistics, without relying on
/// whether there is a thread ready to accept the work or not.
fn schedule(t: LightProc) {
if let Err(err) = POOL.sender.try_send(t) {
// We were not able to send to the channel without
// blocking.
POOL.sender.send(err.into_inner()).unwrap();
}
// Add up for every incoming scheduled task
DYNAMIC_POOL_MANAGER.get().unwrap().increment_frequency();
}
///
/// Low watermark value, defines the bare minimum of the pool.
/// Spawns initial thread set.
/// Can be configured with the env var `BASTION_BLOCKING_THREADS` at runtime.
#[inline]
pub fn low_watermark() -> &'static u64 {
fn low_watermark() -> &'static u64 {
lazy_static! {
static ref LOW_WATERMARK: u64 = {
env::var_os("BASTION_BLOCKING_THREADS")
@ -346,15 +125,3 @@ pub fn low_watermark() -> &'static u64 {
&*LOW_WATERMARK
}
///
/// Affinity pinner for blocking pool
/// Pinning isn't going to be enabled for single core systems.
#[inline]
pub fn affinity_pinner() {
if 1 != *load_balancer::core_retrieval() {
let mut core = ROUND_ROBIN_PIN.lock().unwrap();
placement::set_for_current(*core);
core.id = (core.id + 1) % *load_balancer::core_retrieval();
}
}
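
As a usage note (illustrative, not part of this diff): because the low watermark is read once through a lazy_static, `BASTION_BLOCKING_THREADS` has to be set before the first blocking task is spawned, for example:

use bastion_executor::blocking;
use lightproc::proc_stack::ProcStack;

fn main() {
    // Must happen before the first spawn_blocking call; the low watermark is
    // read only once and then cached for the lifetime of the process.
    std::env::set_var("BASTION_BLOCKING_THREADS", "8"); // the value 8 is illustrative
    let _handle = blocking::spawn_blocking(async { /* heavy work */ }, ProcStack::default());
}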

View File

@ -1,46 +0,0 @@
//!
//! Cache affine thread pool distributor
//!
//! The distributor provides a fair distribution of threads, pinning them to cores for fair execution.
//! It assigns threads in round-robin fashion to all cores.
use crate::placement::{self, CoreId};
use crate::run_queue::{Stealer, Worker};
use crate::worker;
use lightproc::prelude::*;
use std::thread;
pub(crate) struct Distributor {
pub(crate) cores: Vec<CoreId>,
}
impl Distributor {
pub(crate) fn new() -> Self {
Distributor {
cores: placement::get_core_ids().expect("Core mapping couldn't be fetched"),
}
}
pub(crate) fn assign(self) -> Vec<Stealer<LightProc>> {
let mut stealers = Vec::<Stealer<LightProc>>::new();
for core in self.cores {
let wrk = Worker::new_fifo();
stealers.push(wrk.stealer());
thread::Builder::new()
.name("bastion-async-thread".to_string())
.spawn(move || {
// affinity assignment
placement::set_for_current(core);
// run initial stats generation for cores
worker::stats_generator(core.id, &wrk);
// actual execution
worker::main_loop(core.id, wrk);
})
.expect("cannot start the thread for running proc");
}
stealers
}
}

View File

@ -28,26 +28,15 @@
// Force missing implementations
#![warn(missing_docs)]
#![warn(missing_debug_implementations)]
#![cfg_attr(
any(feature = "numanji", feature = "allocator-suite"),
feature(allocator_api)
)]
#![cfg_attr(
any(feature = "numanji", feature = "allocator-suite"),
feature(nonnull_slice_from_raw_parts)
)]
#[macro_use]
mod macros;
pub mod allocator;
pub mod blocking;
pub mod distributor;
pub mod load_balancer;
pub mod placement;
pub mod pool;
pub mod run;
pub mod run_queue;
pub mod sleepers;
mod thread_manager;
pub mod worker;
///

View File

@ -7,51 +7,104 @@
use crate::load_balancer;
use crate::placement;
use arrayvec::ArrayVec;
use fmt::{Debug, Formatter};
use lazy_static::*;
use once_cell::sync::Lazy;
use placement::CoreId;
use std::mem::MaybeUninit;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;
use std::time::Duration;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::RwLock;
use std::time::{Duration, Instant};
use std::{fmt, usize};
use tracing::{debug, error};
const MEAN_UPDATE_TRESHOLD: Duration = Duration::from_millis(200);
/// Stats of all the smp queues.
pub trait SmpStats {
/// Stores the load of the given queue.
fn store_load(&self, affinity: usize, load: usize);
/// returns tuple of queue id and load in a sorted order.
/// returns tuple of queue id and load ordered from highest load to lowest.
fn get_sorted_load(&self) -> ArrayVec<[(usize, usize); MAX_CORE]>;
/// mean of all the smp queue loads.
fn mean(&self) -> usize;
/// update the smp mean.
fn update_mean(&self);
}
///
/// Load-balancer struct which is just a convenience wrapper over the statistics calculations.
#[derive(Debug)]
pub struct LoadBalancer;
static LOAD_BALANCER: Lazy<LoadBalancer> = Lazy::new(|| {
let lb = LoadBalancer::new(placement::get_core_ids().unwrap());
debug!("Instantiated load_balancer: {:?}", lb);
lb
});
/// Load-balancer struct which allows us to update the mean load
pub struct LoadBalancer {
/// The number of cores
/// available for this program
pub num_cores: usize,
/// The core Ids available for this program
/// This doesn't take affinity into account
pub cores: Vec<CoreId>,
mean_last_updated_at: RwLock<Instant>,
}
impl LoadBalancer {
///
/// AMQL sampling thread for run queue load balancing.
pub fn amql_generation() {
thread::Builder::new()
.name("bastion-load-balancer-thread".to_string())
.spawn(move || {
loop {
load_balancer::stats().update_mean();
// We don't have β-reduction here… Life is unfair. Life is cruel.
//
// Try sleeping for a while to wait
// Should be smaller time slice than 4 times per second to not miss
thread::sleep(Duration::from_millis(245));
// Yield immediately back to os so we can advance in workers
thread::yield_now();
}
})
.expect("load-balancer couldn't start");
/// Creates a new LoadBalancer.
/// If you're looking for `num_cores` and `cores`,
/// have a look at `load_balancer::core_count()`
/// and `load_balancer::get_cores()` respectively.
pub fn new(cores: Vec<CoreId>) -> Self {
Self {
num_cores: cores.len(),
cores,
mean_last_updated_at: RwLock::new(Instant::now()),
}
}
}
impl Debug for LoadBalancer {
fn fmt(&self, fmt: &mut Formatter) -> fmt::Result {
fmt.debug_struct("LoadBalancer")
.field("num_cores", &self.num_cores)
.field("cores", &self.cores)
.field("mean_last_updated_at", &self.mean_last_updated_at)
.finish()
}
}
impl LoadBalancer {
/// Iterates the statistics to get the mean load across the cores
pub fn update_load_mean(&self) {
// Check if update should occur
if !self.should_update() {
return;
}
self.mean_last_updated_at
.write()
.map(|mut last_updated_at| {
*last_updated_at = Instant::now();
})
.unwrap_or_else(|e| error!("couldn't update mean timestamp - {}", e));
load_balancer::stats().update_mean();
}
fn should_update(&self) -> bool {
// If we couldn't acquire a lock on the mean last_updated_at,
// There is probably someone else updating already
self.mean_last_updated_at
.try_read()
.map(|last_updated_at| last_updated_at.elapsed() > MEAN_UPDATE_TRESHOLD)
.unwrap_or(false)
}
}
/// Update the mean load on the singleton
pub fn update() {
LOAD_BALANCER.update_load_mean()
}
/// Maximum number of cores supported by modern computers.
const MAX_CORE: usize = 256;
@ -64,6 +117,7 @@ const MAX_CORE: usize = 256;
pub struct Stats {
smp_load: [AtomicUsize; MAX_CORE],
mean_level: AtomicUsize,
updating_mean: AtomicBool,
}
impl fmt::Debug for Stats {
@ -71,6 +125,7 @@ impl fmt::Debug for Stats {
fmt.debug_struct("Stats")
.field("smp_load", &&self.smp_load[..])
.field("mean_level", &self.mean_level)
.field("updating_mean", &self.updating_mean)
.finish()
}
}
@ -81,26 +136,24 @@ impl Stats {
let smp_load: [AtomicUsize; MAX_CORE] = {
let mut data: [MaybeUninit<AtomicUsize>; MAX_CORE] =
unsafe { MaybeUninit::uninit().assume_init() };
let mut i = 0;
while i < MAX_CORE {
if i < num_cores {
unsafe {
std::ptr::write(data[i].as_mut_ptr(), AtomicUsize::new(0));
}
i += 1;
continue;
}
// MAX is for unused slot.
for core_data in data.iter_mut().take(num_cores) {
unsafe {
std::ptr::write(data[i].as_mut_ptr(), AtomicUsize::new(usize::MAX));
std::ptr::write(core_data.as_mut_ptr(), AtomicUsize::new(0));
}
i += 1;
}
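// The remaining, unused slots are marked with usize::MAX below so that readers
// such as get_sorted_load can stop at the first unused slot.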
for core_data in data.iter_mut().take(MAX_CORE).skip(num_cores) {
unsafe {
std::ptr::write(core_data.as_mut_ptr(), AtomicUsize::new(usize::MAX));
}
}
unsafe { std::mem::transmute::<_, [AtomicUsize; MAX_CORE]>(data) }
};
Stats {
smp_load,
mean_level: AtomicUsize::new(0),
updating_mean: AtomicBool::new(false),
}
}
}
@ -116,38 +169,43 @@ impl SmpStats for Stats {
fn get_sorted_load(&self) -> ArrayVec<[(usize, usize); MAX_CORE]> {
let mut sorted_load = ArrayVec::<[(usize, usize); MAX_CORE]>::new();
for (i, item) in self.smp_load.iter().enumerate() {
let load = item.load(Ordering::SeqCst);
for (core, load) in self.smp_load.iter().enumerate() {
let load = load.load(Ordering::SeqCst);
// load till maximum core.
if load == usize::MAX {
break;
}
// unsafe is ok here because self.smp_load.len() is MAX_CORE
unsafe { sorted_load.push_unchecked((i, load)) };
unsafe { sorted_load.push_unchecked((core, load)) };
}
sorted_load.sort_by(|x, y| y.1.cmp(&x.1));
sorted_load
}
fn mean(&self) -> usize {
self.mean_level.load(Ordering::SeqCst)
self.mean_level.load(Ordering::Acquire)
}
fn update_mean(&self) {
let mut sum: usize = 0;
for item in self.smp_load.iter() {
let load = item.load(Ordering::SeqCst);
if let Some(tmp) = sum.checked_add(load) {
sum = tmp;
continue;
}
break;
// Don't update if it's updating already
if self.updating_mean.load(Ordering::Acquire) {
return;
}
self.mean_level.store(
sum.wrapping_div(placement::get_core_ids().unwrap().len()),
Ordering::SeqCst,
);
self.updating_mean.store(true, Ordering::Release);
let mut sum: usize = 0;
let num_cores = LOAD_BALANCER.num_cores;
for item in self.smp_load.iter().take(num_cores) {
if let Some(tmp) = sum.checked_add(item.load(Ordering::Acquire)) {
sum = tmp;
}
}
self.mean_level
.store(sum.wrapping_div(num_cores), Ordering::Release);
self.updating_mean.store(false, Ordering::Release);
}
}
@ -156,7 +214,7 @@ impl SmpStats for Stats {
#[inline]
pub fn stats() -> &'static Stats {
lazy_static! {
static ref LOCKLESS_STATS: Stats = Stats::new(*core_retrieval());
static ref LOCKLESS_STATS: Stats = Stats::new(*core_count());
}
&*LOCKLESS_STATS
}
@ -164,10 +222,13 @@ pub fn stats() -> &'static Stats {
///
/// Retrieve core count for the runtime scheduling purposes
#[inline]
pub fn core_retrieval() -> &'static usize {
lazy_static! {
static ref CORE_COUNT: usize = placement::get_core_ids().unwrap().len();
}
&*CORE_COUNT
pub fn core_count() -> &'static usize {
&LOAD_BALANCER.num_cores
}
///
/// Retrieve cores for the runtime scheduling purposes
#[inline]
pub fn get_cores() -> &'static [CoreId] {
&*LOAD_BALANCER.cores
}

View File

@ -1,11 +0,0 @@
///
/// Marker of unstable API.
#[doc(hidden)]
macro_rules! unstable_api {
($($block:item)*) => {
$(
#[cfg(feature = "unstable")]
$block
)*
}
}

View File

@ -10,9 +10,15 @@ pub fn get_core_ids() -> Option<Vec<CoreId>> {
get_core_ids_helper()
}
/// This function tries to retrieve
/// the number of active "cores" on the system.
pub fn get_num_cores() -> Option<usize> {
get_core_ids().map(|ids| ids.len())
}
///
/// Sets the current threads affinity
pub fn set_for_current(core_id: CoreId) {
tracing::info!("Executor: placement: set affinity on core {}", core_id.id);
set_for_current_helper(core_id);
}

View File

@ -1,16 +1,23 @@
//!
//! Pool of threads to run lightweight processes
//!
//! Pool management and tracking belongs here.
//! We spawn futures onto the pool with [spawn] method of global run queue or
//! with corresponding [Worker]'s spawn method.
use crate::distributor::Distributor;
use crate::run_queue::{Injector, Stealer};
use crate::sleepers::Sleepers;
use crate::thread_manager::{DynamicPoolManager, DynamicRunner};
use crate::worker;
use crossbeam_channel::{unbounded, Receiver, Sender};
use lazy_static::lazy_static;
use lightproc::prelude::*;
use lightproc::lightproc::LightProc;
use lightproc::proc_stack::ProcStack;
use lightproc::recoverable_handle::RecoverableHandle;
use once_cell::sync::{Lazy, OnceCell};
use std::future::Future;
use std::iter::Iterator;
use std::sync::Arc;
use std::time::Duration;
use std::{env, thread};
use tracing::trace;
///
/// Spawn a process (which contains future + process stack) onto the executor from the global level.
@ -42,22 +49,29 @@ where
F: Future<Output = T> + Send + 'static,
T: Send + 'static,
{
self::get().spawn(future, stack)
let (task, handle) = LightProc::recoverable(future, worker::schedule, stack);
task.schedule();
handle
}
/// Spawns a blocking task.
///
/// The task will be spawned onto a thread pool specifically dedicated to blocking tasks.
pub fn spawn_blocking<F, R>(future: F, stack: ProcStack) -> RecoverableHandle<R>
where
F: Future<Output = R> + Send + 'static,
R: Send + 'static,
{
let (task, handle) = LightProc::recoverable(future, schedule, stack);
task.schedule();
handle
}
///
/// Pool that global run queue, stealers of the workers, and parked threads.
#[derive(Debug)]
pub struct Pool {
///
/// Global run queue implementation
pub(crate) injector: Injector<LightProc>,
///
/// Stealers of the workers
pub(crate) stealers: Vec<Stealer<LightProc>>,
///
/// Container of parked threads
pub(crate) sleepers: Sleepers,
/// Acquire the static Pool reference
#[inline]
pub fn get() -> &'static Pool {
&*POOL
}
impl Pool {
@ -78,21 +92,96 @@ impl Pool {
}
}
///
/// Acquire the static Pool reference
#[inline]
pub fn get() -> &'static Pool {
lazy_static! {
static ref POOL: Pool = {
let distributor = Distributor::new();
let stealers = distributor.assign();
/// Enqueues work, attempting to send to the thread pool in a
/// nonblocking way and spinning up the needed number of threads
/// based on the previous statistics, without relying on
/// whether there is a thread ready to accept the work or not.
pub(crate) fn schedule(t: LightProc) {
if let Err(err) = POOL.sender.try_send(t) {
// We were not able to send to the channel without
// blocking.
POOL.sender.send(err.into_inner()).unwrap();
}
// Add up for every incoming scheduled task
DYNAMIC_POOL_MANAGER.get().unwrap().increment_frequency();
}
Pool {
injector: Injector::new(),
stealers,
sleepers: Sleepers::new(),
}
///
/// Low watermark value, defines the bare minimum of the pool.
/// Spawns initial thread set.
/// Can be configurable with env var `BASTION_BLOCKING_THREADS` at runtime.
#[inline]
fn low_watermark() -> &'static u64 {
lazy_static! {
static ref LOW_WATERMARK: u64 = {
env::var_os("BASTION_BLOCKING_THREADS")
.map(|x| x.to_str().unwrap().parse::<u64>().unwrap())
.unwrap_or(DEFAULT_LOW_WATERMARK)
};
}
&*POOL
&*LOW_WATERMARK
}
/// If low watermark isn't configured this is the default scaler value.
/// This value is used for the heuristics of the scaler
const DEFAULT_LOW_WATERMARK: u64 = 2;
/// Pool interface between the scheduler and thread pool
#[derive(Debug)]
pub struct Pool {
sender: Sender<LightProc>,
receiver: Receiver<LightProc>,
}
struct AsyncRunner {}
impl DynamicRunner for AsyncRunner {
fn run_static(&self, park_timeout: Duration) -> ! {
loop {
for task in &POOL.receiver {
trace!("static: running task");
task.run();
}
trace!("static: empty queue, parking with timeout");
thread::park_timeout(park_timeout);
}
}
fn run_dynamic(&self, parker: &dyn Fn()) -> ! {
loop {
while let Ok(task) = POOL.receiver.try_recv() {
trace!("dynamic thread: running task");
task.run();
}
trace!(
"dynamic thread: parking - {:?}",
std::thread::current().id()
);
parker();
}
}
fn run_standalone(&self) {
while let Ok(task) = POOL.receiver.try_recv() {
task.run();
}
trace!("standalone thread: quitting.");
}
}
static DYNAMIC_POOL_MANAGER: OnceCell<DynamicPoolManager> = OnceCell::new();
static POOL: Lazy<Pool> = Lazy::new(|| {
let runner = Arc::new(AsyncRunner {});
DYNAMIC_POOL_MANAGER
.set(DynamicPoolManager::new(*low_watermark() as usize, runner))
.expect("couldn't create dynamic pool manager");
DYNAMIC_POOL_MANAGER
.get()
.expect("couldn't get static pool manager")
.initialize();
let (sender, receiver) = unbounded();
Pool { sender, receiver }
});
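
For orientation, a minimal usage sketch of the API above; it mirrors the crate's run-blocking integration test but uses the async pool's `spawn` (the literal values are illustrative):

use bastion_executor::prelude::spawn;
use bastion_executor::run::run;
use lightproc::proc_stack::ProcStack;

fn main() {
    // Spawn a task onto the async pool, then block the current thread on its
    // recoverable handle; the handle resolves to Some(value) when the task
    // completes without panicking.
    let handle = spawn(async { 40 + 2 }, ProcStack::default());
    let output = run(handle, ProcStack::default());
    assert_eq!(output, Some(42));
}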

View File

@ -1727,26 +1727,17 @@ pub enum Steal<T> {
impl<T> Steal<T> {
/// Returns `true` if the queue was empty at the time of stealing.
pub fn is_empty(&self) -> bool {
match self {
Steal::Empty => true,
_ => false,
}
matches!(self, Steal::Empty)
}
/// Returns `true` if at least one task was stolen.
pub fn is_success(&self) -> bool {
match self {
Steal::Success(_) => true,
_ => false,
}
matches!(self, Steal::Success(_))
}
/// Returns `true` if the steal operation needs to be retried.
pub fn is_retry(&self) -> bool {
match self {
Steal::Retry => true,
_ => false,
}
matches!(self, Steal::Retry)
}
/// Returns the result of the operation, if successful.

View File

@ -0,0 +1,407 @@
//! A thread manager to predict how many threads should be spawned to handle the upcoming load.
//!
//! The thread manager consists of three elements:
//! * Frequency Detector
//! * Trend Estimator
//! * Predictive Upscaler
//!
//! ## Frequency Detector
//! Detects how many tasks are submitted from the scheduler to the thread pool in a given time frame.
//! The pool manager thread does this sampling every 90 milliseconds.
//! This value is going to be used for the trend estimation phase.
//!
//! ## Trend Estimator
//! Holds up to the given number of frequencies to create an estimation.
//! The trend estimator holds 10 frequencies at a time.
//! This value is stored as a constant in [FREQUENCY_QUEUE_SIZE](constant.FREQUENCY_QUEUE_SIZE.html).
//! The estimation algorithm and the prediction use the Exponentially Weighted Moving Average algorithm.
//!
//! This algorithm is adapted from [A Novel Predictive and Self-Adaptive Dynamic Thread Pool Management](https://doi.org/10.1109/ISPA.2011.61)
//! and altered to:
//! * utilize thread redundancy, i.e. the sum of the differences between the predicted and observed values, instead of a heavy trend calculation.
//! * use exponential rather than linear trend estimation, where the formula is:
//! ```text
//! LOW_WATERMARK * (predicted - observed) + LOW_WATERMARK
//! ```
//! *NOTE:* To tweak this algorithm, increase [LOW_WATERMARK](constant.LOW_WATERMARK.html); the additional dynamic thread spawn count adapts automatically.
//! * operate without watermarking by timestamps (used in the paper to measure the algorithm's own performance during execution).
//! * operate without extensive subsampling, which congests the pool manager thread.
//! * operate without keeping track of thread idle time or a job-out queue, unlike the TEMA and FOPS implementations.
//!
//! ## Predictive Upscaler
//! The upscaler has three cases (also described in the paper):
//! * The rate slightly increases and there are many idle threads.
//! * The number of worker threads tends to be reduced since the workload of the system is descending.
//! * The system has no requests or is stalled (in our case, when the current tasks block further tasks from being processed: throughput hogs).
//!
//! For the first two, EMA calculation and exponential trend estimation give good performance.
//! For the last case, the upscaler selects the upscaling amount from the number of tasks mapped when throughput hogs happen.
//!
//! **Example scenario:** say we have 10_000 tasks and every one of them blocks for 1 second. The scheduler will map plenty of tasks but they will get rejected.
//! This makes the estimation calculation nearly 0 for both the entering and exiting parts. When this happens and we still see tasks mapped from the scheduler,
//! we start to slowly increase threads linearly by the frequency amount. Increasing this value sharply either makes us hit the thread threshold on
//! some OSes or congests the program's other thread utilization because of context switches.
//!
//! Throughput hogs are determined by a combination of the job in / job out frequency and the current scheduler task assignment frequency.
//! The threshold of the EMA difference is guarded by machine epsilon to absorb floating-point arithmetic errors.
use crate::{load_balancer, placement};
use core::fmt;
use crossbeam_queue::ArrayQueue;
use fmt::{Debug, Formatter};
use lazy_static::lazy_static;
use lever::prelude::TTas;
use placement::CoreId;
use std::collections::VecDeque;
use std::time::Duration;
use std::{
sync::{
atomic::{AtomicU64, Ordering},
Arc, Mutex,
},
thread::{self, Thread},
};
use tracing::{debug, trace};
/// The default thread park timeout before checking for new tasks.
const THREAD_PARK_TIMEOUT: Duration = Duration::from_millis(1);
/// Frequency histogram's sliding window size.
/// Defines how many frequencies will be considered for adaptation.
const FREQUENCY_QUEUE_SIZE: usize = 10;
/// If low watermark isn't configured this is the default scaler value.
/// This value is used for the heuristics of the scaler
const DEFAULT_LOW_WATERMARK: u64 = 2;
/// Pool scaler interval time (milliseconds).
/// This is the actual interval which makes adaptation calculation.
const SCALER_POLL_INTERVAL: u64 = 90;
/// Exponential moving average smoothing coefficient for limited window.
/// Smoothing factor is estimated with: 2 / (N + 1) where N is sample size.
const EMA_COEFFICIENT: f64 = 2_f64 / (FREQUENCY_QUEUE_SIZE as f64 + 1_f64);
lazy_static! {
static ref ROUND_ROBIN_PIN: Mutex<CoreId> = Mutex::new(CoreId { id: 0 });
}
/// The `DynamicRunner` is piloted by `DynamicPoolManager`.
/// Upon request it needs to be able to provide runner routines for:
/// * Static threads.
/// * Dynamic threads.
/// * Standalone threads.
///
/// Your implementation of `DynamicRunner`
/// will allow you to define what tasks must be accomplished.
///
/// Run static threads:
///
/// run_static should never return, and park for park_timeout instead.
///
/// Run dynamic threads:
/// run_dynamic should never return, and call `parker()` when it has no more tasks to process.
/// It will be unparked automatically by the `DynamicPoolManager` if need be.
///
/// Run standalone threads:
/// run_standalone should return once it has no more tasks to process.
/// The `DynamicPoolManager` will spawn other standalone threads if need be.
pub trait DynamicRunner {
fn run_static(&self, park_timeout: Duration) -> !;
fn run_dynamic(&self, parker: &dyn Fn()) -> !;
fn run_standalone(&self);
}
/// The `DynamicPoolManager` is responsible for
/// growing and shrinking a pool according to EMA rules.
///
/// It needs to be passed a structure that implements `DynamicRunner`,
/// which will be responsible for actually spawning threads.
///
/// The `DynamicPoolManager` keeps track of the number
/// of threads required to process the load correctly,
/// and depending on the current state it will:
/// - Spawn a lot of threads (we're predicting a load spike, and we need to prepare for it)
/// - Spawn few threads (there's a constant load, and throughput is low because the current resources are busy)
/// - Do nothing (the load is shrinking, threads will automatically stop once they're done).
///
/// Kinds of threads:
///
/// ## Static threads:
/// Defined in the constructor, they will always be available. They park for `THREAD_PARK_TIMEOUT` on idle.
///
/// ## Dynamic threads:
/// Created during `DynamicPoolManager` initialization, they will park on idle.
/// The `DynamicPoolManager` grows the number of Dynamic threads
/// so the total number of Static threads + Dynamic threads
/// is the number of available cores on the machine. (`num_cpus::get()`)
///
/// ## Standalone threads:
/// They are created when there aren't enough static and dynamic threads to process the expected load.
/// They will be destroyed on idle.
///
/// ## Spawn order:
/// In order to handle a growing load, the pool manager will ask to:
/// - Use Static threads
/// - Unpark Dynamic threads
/// - Spawn Standalone threads
///
/// The pool manager is not responsible for the tasks to be performed by the threads; that is handled by the `DynamicRunner`.
///
/// If you use tracing, you can have a look at the trace! logs generated by the structure.
///
pub struct DynamicPoolManager {
static_threads: usize,
dynamic_threads: usize,
parked_threads: ArrayQueue<Thread>,
runner: Arc<dyn DynamicRunner + Send + Sync>,
last_frequency: AtomicU64,
frequencies: TTas<VecDeque<u64>>,
}
impl Debug for DynamicPoolManager {
fn fmt(&self, fmt: &mut Formatter) -> fmt::Result {
fmt.debug_struct("DynamicPoolManager")
.field("static_threads", &self.static_threads)
.field("dynamic_threads", &self.dynamic_threads)
.field("parked_threads", &self.parked_threads.len())
.field("parked_threads", &self.parked_threads.len())
.field("last_frequency", &self.last_frequency)
.field("frequencies", &self.frequencies.try_lock())
.finish()
}
}
impl DynamicPoolManager {
pub fn new(static_threads: usize, runner: Arc<dyn DynamicRunner + Send + Sync>) -> Self {
let dynamic_threads = 1.max(num_cpus::get().checked_sub(static_threads).unwrap_or(0));
Self {
static_threads,
dynamic_threads,
parked_threads: ArrayQueue::new(dynamic_threads),
runner,
last_frequency: AtomicU64::new(0),
frequencies: TTas::new(VecDeque::with_capacity(
FREQUENCY_QUEUE_SIZE.saturating_add(1),
)),
}
}
pub fn increment_frequency(&self) {
self.last_frequency.fetch_add(1, Ordering::Acquire);
}
/// Initialize the dynamic pool
/// that will be scaled
pub fn initialize(&'static self) {
// Static thread manager that will always be available
trace!("setting up the static thread manager");
(0..self.static_threads).for_each(|_| {
let clone = Arc::clone(&self.runner);
thread::Builder::new()
.name("bastion-driver-static".to_string())
.spawn(move || {
Self::affinity_pinner();
clone.run_static(THREAD_PARK_TIMEOUT);
})
.expect("couldn't spawn static thread");
});
// Dynamic thread manager that will allow us to unpark threads
// According to the needs
trace!("setting up the dynamic thread manager");
(0..self.dynamic_threads).for_each(|_| {
let clone = Arc::clone(&self.runner);
thread::Builder::new()
.name("bastion-driver-dynamic".to_string())
.spawn(move || {
Self::affinity_pinner();
let parker = || self.park_thread();
clone.run_dynamic(&parker);
})
.expect("cannot start dynamic thread");
});
// Pool manager to check frequency of task rates
// and take action by scaling the pool accordingly.
thread::Builder::new()
.name("bastion-pool-manager".to_string())
.spawn(move || {
let poll_interval = Duration::from_millis(SCALER_POLL_INTERVAL);
trace!("setting up the pool manager");
loop {
self.scale_pool();
thread::park_timeout(poll_interval);
}
})
.expect("thread pool manager cannot be started");
}
/// Provision threads takes a number of threads that need to be made available.
/// It will try to unpark threads from the dynamic pool, and spawn more threads if need be.
pub fn provision_threads(&'static self, n: usize) {
for i in 0..n {
if !self.unpark_thread() {
let new_threads = n - i;
trace!(
"no more threads to unpark, spawning {} new threads",
new_threads
);
return self.spawn_threads(new_threads);
}
}
}
fn spawn_threads(&'static self, n: usize) {
(0..n).for_each(|_| {
let clone = Arc::clone(&self.runner);
thread::Builder::new()
.name("bastion-blocking-driver-standalone".to_string())
.spawn(move || {
Self::affinity_pinner();
clone.run_standalone();
})
.unwrap();
})
}
/// Parks a thread until unpark_thread unparks it
pub fn park_thread(&self) {
let _ = self
.parked_threads
.push(std::thread::current())
.map(|_| {
trace!("parking thread {:?}", std::thread::current().id());
std::thread::park();
})
.map_err(|e| {
debug!(
"couldn't park thread {:?} - {}",
std::thread::current().id(),
e
);
});
}
/// Pops a thread from the parked_threads queue and unparks it.
/// Returns true on success.
fn unpark_thread(&self) -> bool {
if self.parked_threads.is_empty() {
trace!("no parked threads");
false
} else {
trace!("parked_threads: len is {}", self.parked_threads.len());
self.parked_threads
.pop()
.map(|thread| {
debug!("Executor: unpark_thread: unparking {:?}", thread.id());
thread.unpark();
})
.map_err(|e| {
debug!("Executor: unpark_thread: couldn't unpark thread - {}", e);
})
.is_ok()
}
}
///
/// Affinity pinner for blocking pool
/// Pinning isn't going to be enabled for single core systems.
#[inline]
fn affinity_pinner() {
if 1 != *load_balancer::core_count() {
let mut core = ROUND_ROBIN_PIN.lock().unwrap();
placement::set_for_current(*core);
core.id = (core.id + 1) % *load_balancer::core_count();
}
}
/// Exponentially Weighted Moving Average calculation
///
/// This allows us to find the EMA value.
/// This value represents the trend of tasks mapped onto the thread pool.
/// The calculation is as follows:
/// ```text
/// +--------+-----------------+----------------------------------+
/// | Symbol | Identifier | Explanation |
/// +--------+-----------------+----------------------------------+
/// | α | EMA_COEFFICIENT | smoothing factor between 0 and 1 |
/// | Yt | freq | frequency sample at time t |
/// | St | acc | EMA at time t |
/// +--------+-----------------+----------------------------------+
/// ```
/// Under these definitions the formula is as follows:
/// ```text
/// EMA = α * [ Yt + (1 - α)*Yt-1 + ((1 - α)^2)*Yt-2 + ((1 - α)^3)*Yt-3 ... ]
/// ```
/// # Arguments
///
/// * `freq_queue` - Sliding window of frequency samples
#[inline]
fn calculate_ema(freq_queue: &VecDeque<u64>) -> f64 {
freq_queue.iter().enumerate().fold(0_f64, |acc, (i, freq)| {
acc + ((*freq as f64) * ((1_f64 - EMA_COEFFICIENT).powf(i as f64) as f64))
}) * EMA_COEFFICIENT as f64
}
/// Adaptive pool scaling function
///
/// This allows spawning new threads to make room for incoming task pressure.
/// Works in the background detached from the pool system and scales up the pool based
/// on the request rate.
///
/// It uses a frequency-based calculation to define work, utilizing the average processing rate.
fn scale_pool(&'static self) {
// Fetch current frequency, it does matter that operations are ordered in this approach.
let current_frequency = self.last_frequency.swap(0, Ordering::SeqCst);
let mut freq_queue = self.frequencies.lock();
// Make it safe to start for calculations by adding initial frequency scale
if freq_queue.len() == 0 {
freq_queue.push_back(0);
}
// Calculate message rate for the given time window
let frequency = (current_frequency as f64 / SCALER_POLL_INTERVAL as f64) as u64;
// Calculates current time window's EMA value (including last sample)
let prev_ema_frequency = Self::calculate_ema(&freq_queue);
// Add seen frequency data to the frequency histogram.
freq_queue.push_back(frequency);
if freq_queue.len() == FREQUENCY_QUEUE_SIZE.saturating_add(1) {
freq_queue.pop_front();
}
// Calculates current time window's EMA value (including last sample)
let curr_ema_frequency = Self::calculate_ema(&freq_queue);
// Adapts the thread count of pool
//
// Sliding window of frequencies visited by the pool manager.
// Pool manager creates EMA value for previous window and current window.
// Compare them to determine scaling amount based on the trends.
// If current EMA value is bigger, we will scale up.
if curr_ema_frequency > prev_ema_frequency {
// "Scale by" amount can be seen as "how much load is coming".
// "Scale" amount is "how many threads we should spawn".
let scale_by: f64 = curr_ema_frequency - prev_ema_frequency;
let scale = num_cpus::get().min(
((DEFAULT_LOW_WATERMARK as f64 * scale_by) + DEFAULT_LOW_WATERMARK as f64) as usize,
);
trace!("unparking {} threads", scale);
// It is time to scale the pool!
self.provision_threads(scale);
} else if (curr_ema_frequency - prev_ema_frequency).abs() < std::f64::EPSILON
&& current_frequency != 0
{
// Throughput is low. Allocate more threads to unblock flow.
// If we fall into this case, the scheduler is congested by long-hauling tasks.
// To unblock the flow we should add some threads to the pool, but not so many that we
// stagger the program's operation.
trace!("unparking {} threads", DEFAULT_LOW_WATERMARK);
self.provision_threads(DEFAULT_LOW_WATERMARK as usize);
}
}
}
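
To make the EMA-driven scaling above concrete, here is a standalone sketch; the constants are copied from this module, the sample window is invented:

use std::collections::VecDeque;

const FREQUENCY_QUEUE_SIZE: usize = 10;
const EMA_COEFFICIENT: f64 = 2_f64 / (FREQUENCY_QUEUE_SIZE as f64 + 1_f64);

// Standalone copy of the fold used by calculate_ema: each sample is weighted
// by (1 - α)^i, where i is its position in the sliding window, and the sum is
// scaled by α = 2 / (N + 1) ≈ 0.18.
fn calculate_ema(freq_queue: &VecDeque<u64>) -> f64 {
    freq_queue.iter().enumerate().fold(0_f64, |acc, (i, freq)| {
        acc + ((*freq as f64) * (1_f64 - EMA_COEFFICIENT).powf(i as f64))
    }) * EMA_COEFFICIENT
}

fn main() {
    // An invented frequency window, oldest sample at the front.
    let window: VecDeque<u64> = vec![0, 1, 2, 4].into();
    let prev = calculate_ema(&window);
    let mut next = window.clone();
    next.push_back(8); // a new, higher sample arrives
    let curr = calculate_ema(&next);
    // When curr > prev, scale_pool provisions roughly
    // DEFAULT_LOW_WATERMARK * (curr - prev) + DEFAULT_LOW_WATERMARK threads,
    // capped at num_cpus::get().
    println!("prev EMA = {:.3}, curr EMA = {:.3}", prev, curr);
}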

View File

@ -3,13 +3,17 @@
//!
//! This worker implementation relies on worker run queue statistics which are hold in the pinned global memory
//! where workload distribution calculated and amended to their own local queues.
use crate::load_balancer;
use crate::pool::{self, Pool};
use crate::run_queue::{Steal, Worker};
use crate::pool;
use lightproc::prelude::*;
use load_balancer::SmpStats;
use std::cell::{Cell, UnsafeCell};
use std::{iter, ptr};
use std::cell::Cell;
use std::ptr;
use std::time::Duration;
/// The timeout we'll use when parking before another steal attempt
pub const THREAD_PARK_TIMEOUT: Duration = Duration::from_millis(1);
///
/// Get the current process's stack
pub fn current() -> ProcStack {
@ -55,97 +59,6 @@ where
}
}
thread_local! {
static QUEUE: UnsafeCell<Option<Worker<LightProc>>> = UnsafeCell::new(None);
}
pub(crate) fn schedule(proc: LightProc) {
QUEUE.with(|queue| {
let local = unsafe { (*queue.get()).as_ref() };
match local {
None => pool::get().injector.push(proc),
Some(q) => q.push(proc),
}
});
pool::get().sleepers.notify_one();
}
///
/// Fetch the process from the run queue.
/// Does the work of work-stealing if process doesn't exist in the local run queue.
pub fn fetch_proc(affinity: usize) -> Option<LightProc> {
let pool = pool::get();
QUEUE.with(|queue| {
let local = unsafe { (*queue.get()).as_ref().unwrap() };
local.pop().or_else(|| affine_steal(pool, local, affinity))
})
}
fn affine_steal(pool: &Pool, local: &Worker<LightProc>, affinity: usize) -> Option<LightProc> {
let load_mean = load_balancer::stats().mean();
// Pop a task from the local queue, if not empty.
local.pop().or_else(|| {
// Otherwise, we need to look for a task elsewhere.
iter::repeat_with(|| {
let core_vec = load_balancer::stats().get_sorted_load();
// First try to get procs from global queue
pool.injector.steal_batch_and_pop(&local).or_else(|| {
match core_vec.get(0) {
Some((core, _)) => {
// If our affinity is the one with the highest load, let others do the stealing
if *core == affinity {
Steal::Retry
} else {
// Try iterating through biggest to smallest
core_vec
.iter()
.map(|s| {
// Steal the mean amount to balance all queues considering incoming workloads
// Otherwise do an ignorant steal (which is going to be useless)
if load_mean > 0 {
pool.stealers
.get(s.0)
.unwrap()
.steal_batch_and_pop_with_amount(&local, load_mean)
} else {
pool.stealers.get(s.0).unwrap().steal_batch_and_pop(&local)
// TODO: Set evacuation flag in thread_local
}
})
.collect()
}
}
_ => Steal::Retry,
}
})
})
// Loop while no task was stolen and any steal operation needs to be retried.
.find(|s| !s.is_retry())
// Extract the stolen task, if there is one.
.and_then(|s| s.success())
})
}
pub(crate) fn stats_generator(affinity: usize, local: &Worker<LightProc>) {
load_balancer::stats().store_load(affinity, local.worker_run_queue_size());
}
pub(crate) fn main_loop(affinity: usize, local: Worker<LightProc>) {
QUEUE.with(|queue| unsafe { *queue.get() = Some(local) });
loop {
QUEUE.with(|queue| {
let local = unsafe { (*queue.get()).as_ref().unwrap() };
stats_generator(affinity, local);
});
match fetch_proc(affinity) {
Some(proc) => set_stack(proc.stack(), || proc.run()),
None => pool::get().sleepers.wait(),
}
}
pool::schedule(proc)
}

View File

@ -0,0 +1,23 @@
use bastion_executor::blocking;
use bastion_executor::run::run;
use lightproc::proc_stack::ProcStack;
use std::thread;
use std::time::Duration;
#[test]
fn test_run_blocking() {
let output = run(
blocking::spawn_blocking(
async {
let duration = Duration::from_millis(1);
thread::sleep(duration);
42
},
ProcStack::default(),
),
ProcStack::default(),
)
.unwrap();
assert_eq!(42, output);
}

View File

@ -74,6 +74,7 @@ artillery-core = { version = "0.1.2-alpha.3", optional = true }
tracing-subscriber = "0.2.6"
tracing = "0.1.15"
anyhow = "1.0.31"
crossbeam-queue = "0.2.3"
[target.'cfg(not(windows))'.dependencies]
nuclei = "0.1.2-alpha.1"

View File

@ -1,6 +1,9 @@
use bastion::prelude::*;
#[cfg(not(target_os = "windows"))]
use futures::*;
use std::fs::{File, OpenOptions};
#[cfg(not(target_os = "windows"))]
use std::fs::File;
use std::fs::OpenOptions;
#[cfg(target_os = "windows")]
use std::io::Write;
use std::path::PathBuf;

View File

@ -26,16 +26,18 @@ fn main() {
// Supervisor that tracks only the single actor with input data
fn input_supervisor(supervisor: Supervisor) -> Supervisor {
supervisor.children(|children| input_group(children))
supervisor.children(input_group)
}
// Supervisor that tracks the actor group with rescaling in runtime.
fn auto_resize_group_supervisor(supervisor: Supervisor) -> Supervisor {
supervisor.children(|children| auto_resize_group(children))
supervisor.children(auto_resize_group)
}
#[allow(clippy::unnecessary_mut_passed)]
fn input_group(children: Children) -> Children {
// we would have fully chained the children builder if it weren't for the feature flag
#[allow(unused_mut)]
let mut children = children.with_redundancy(1);
#[cfg(feature = "scaling")]
{
@ -68,6 +70,7 @@ fn input_group(children: Children) -> Children {
fn auto_resize_group(children: Children) -> Children {
// we would have fully chained the children builder if it weren't for the feature flag
#[allow(unused_mut)]
let mut children = children
.with_redundancy(3) // Start with 3 actors
.with_heartbeat_tick(Duration::from_secs(5)); // Do heartbeat each 5 seconds

View File

@ -3,7 +3,9 @@ use bastion::prelude::*;
use futures::io;
#[cfg(target_os = "windows")]
use std::io::{self, Read, Write};
use std::net::{TcpListener, TcpStream, ToSocketAddrs};
#[cfg(not(target_os = "windows"))]
use std::net::TcpListener;
use std::net::{TcpStream, ToSocketAddrs};
use std::sync::atomic::{AtomicUsize, Ordering};
#[cfg(not(target_os = "windows"))]
@ -81,7 +83,7 @@ fn main() {
let port = TCP_SERVERS.fetch_sub(1, Ordering::SeqCst) + 2000;
let addr = format!("127.0.0.1:{}", port);
run(addr);
run(addr).await.unwrap();
Ok(())
})

View File

@ -10,7 +10,7 @@ use crate::message::BastionMessage;
use crate::resizer::ActorGroupStats;
use crate::system::SYSTEM;
use anyhow::Result as AnyResult;
use async_mutex::Mutex;
use bastion_executor::pool;
use futures::pending;
use futures::poll;
@ -35,11 +35,11 @@ pub(crate) struct Child {
callbacks: Callbacks,
// The future that this child is executing.
exec: Exec,
// A lock behind which is the child's context state.
// The child's context state.
// This is used to store the messages that were received
// for the child's associated future to be able to
// retrieve them.
state: Arc<Mutex<Pin<Box<ContextState>>>>,
state: Arc<Pin<Box<ContextState>>>,
// Messages that were received before the child was
// started. Those will be "replayed" once a start message
// is received.
@ -71,7 +71,7 @@ impl Child {
exec: Exec,
callbacks: Callbacks,
bcast: Broadcast,
state: Arc<Mutex<Pin<Box<ContextState>>>>,
state: Arc<Pin<Box<ContextState>>>,
child_ref: ChildRef,
) -> Self {
debug!("Child({}): Initializing.", bcast.id());
@ -200,9 +200,7 @@ impl Child {
sign,
} => {
debug!("Child({}): Received a message: {:?}", self.id(), msg);
let state = self.state.clone();
let mut guard = state.lock().await;
guard.push_message(msg, sign);
self.state.push_message(msg, sign);
}
Envelope {
msg: BastionMessage::RestartRequired { .. },
@ -288,17 +286,16 @@ impl Child {
#[cfg(feature = "scaling")]
async fn update_stats(&mut self) {
let guard = self.state.lock().await;
let context_state = guard.as_ref();
let storage = guard.stats();
let mailbox_size = self.state.mailbox_size();
let storage = self.state.stats();
let mut stats = ActorGroupStats::load(storage.clone());
stats.update_average_mailbox_size(context_state.mailbox_size());
stats.update_average_mailbox_size(mailbox_size);
stats.store(storage);
let actor_stats_table = guard.actor_stats();
let actor_stats_table = self.state.actor_stats();
actor_stats_table
.insert(self.bcast.id().clone(), context_state.mailbox_size())
.insert(self.bcast.id().clone(), mailbox_size)
.ok();
}
@ -416,9 +413,10 @@ impl Child {
#[cfg(feature = "scaling")]
async fn cleanup_actors_stats(&mut self) {
let guard = self.state.lock().await;
let actor_stats_table = guard.actor_stats();
actor_stats_table.remove(&self.bcast.id().clone()).ok();
self.state
.actor_stats()
.remove(&self.bcast.id().clone())
.ok();
}
}

View File

@ -14,7 +14,7 @@ use crate::path::BastionPathElement;
use crate::resizer::{ActorGroupStats, OptimalSizeExploringResizer, ScalingRule};
use crate::system::SYSTEM;
use anyhow::Result as AnyResult;
use async_mutex::Mutex;
use bastion_executor::pool;
use futures::pending;
use futures::poll;
@ -604,7 +604,7 @@ impl Children {
}
}
fn restart_child(&mut self, old_id: &BastionId, old_state: Arc<Mutex<Pin<Box<ContextState>>>>) {
fn restart_child(&mut self, old_id: &BastionId, old_state: Arc<Pin<Box<ContextState>>>) {
let parent = Parent::children(self.as_ref());
let bcast = Broadcast::new(parent, BastionPathElement::Child(old_id.clone()));
@ -616,7 +616,7 @@ impl Children {
let children = self.as_ref();
let supervisor = self.bcast.parent().clone().into_supervisor();
let state = Arc::new(Mutex::new(Box::pin(ContextState::new())));
let state = Arc::new(Box::pin(ContextState::new()));
let ctx = BastionContext::new(
id.clone(),
@ -890,7 +890,7 @@ impl Children {
#[cfg(feature = "scaling")]
self.init_data_for_scaling(&mut state);
let state = Arc::new(Mutex::new(Box::pin(state)));
let state = Arc::new(Box::pin(state));
let ctx = BastionContext::new(
id.clone(),
@ -939,7 +939,7 @@ impl Children {
let children = self.as_ref();
let supervisor = self.bcast.parent().clone().into_supervisor();
let state = Arc::new(Mutex::new(Box::pin(ContextState::new())));
let state = Arc::new(Box::pin(ContextState::new()));
let ctx = BastionContext::new(id, child_ref.clone(), children, supervisor, state.clone());
let init = self.get_heartbeat_fut();

View File

@ -9,13 +9,13 @@ use crate::envelope::{Envelope, RefAddr, SignedMessage};
use crate::message::{Answer, BastionMessage, Message, Msg};
use crate::supervisor::SupervisorRef;
use crate::{prelude::ReceiveError, system::SYSTEM};
use async_mutex::Mutex;
use crossbeam_queue::SegQueue;
use futures::pending;
use futures::FutureExt;
use futures_timer::Delay;
#[cfg(feature = "scaling")]
use lever::table::lotable::LOTable;
use std::collections::VecDeque;
use std::fmt::{self, Display, Formatter};
use std::pin::Pin;
#[cfg(feature = "scaling")]
@ -109,12 +109,12 @@ pub struct BastionContext {
child: ChildRef,
children: ChildrenRef,
supervisor: Option<SupervisorRef>,
state: Arc<Mutex<Pin<Box<ContextState>>>>,
state: Arc<Pin<Box<ContextState>>>,
}
#[derive(Debug)]
pub(crate) struct ContextState {
messages: VecDeque<SignedMessage>,
messages: SegQueue<SignedMessage>,
#[cfg(feature = "scaling")]
stats: Arc<AtomicU64>,
#[cfg(feature = "scaling")]
@ -135,7 +135,7 @@ impl BastionContext {
child: ChildRef,
children: ChildrenRef,
supervisor: Option<SupervisorRef>,
state: Arc<Mutex<Pin<Box<ContextState>>>>,
state: Arc<Pin<Box<ContextState>>>,
) -> Self {
debug!("BastionContext({}): Creating.", id);
BastionContext {
@ -306,17 +306,19 @@ impl BastionContext {
/// [`try_recv_timeout`]: #method.try_recv_timeout
/// [`SignedMessage`]: ../prelude/struct.SignedMessage.html
pub async fn try_recv(&self) -> Option<SignedMessage> {
self.try_recv_timeout(std::time::Duration::from_nanos(0))
.await
.map(|msg| {
trace!("BastionContext({}): Received message: {:?}", self.id, msg);
msg
})
.map_err(|e| {
trace!("BastionContext({}): Received no message.", self.id);
e
})
.ok()
// We want to let a tick pass,
// otherwise the message queue may not contain anything yet.
Delay::new(Duration::from_millis(0)).await;
trace!("BastionContext({}): Trying to receive message.", self.id);
if let Some(msg) = self.state.pop_message() {
trace!("BastionContext({}): Received message: {:?}", self.id, msg);
Some(msg)
} else {
trace!("BastionContext({}): Received no message.", self.id);
None
}
}
/// Retrieves asynchronously a message received by the element
@ -361,15 +363,10 @@ impl BastionContext {
pub async fn recv(&self) -> Result<SignedMessage, ()> {
debug!("BastionContext({}): Waiting to receive message.", self.id);
loop {
let state = self.state.clone();
let mut guard = state.lock().await;
if let Some(msg) = guard.pop_message() {
if let Some(msg) = self.state.pop_message() {
trace!("BastionContext({}): Received message: {:?}", self.id, msg);
return Ok(msg);
}
drop(guard);
pending!();
}
}
@ -420,15 +417,11 @@ impl BastionContext {
/// [`try_recv`]: #method.try_recv
/// [`SignedMessage`]: ../prelude/struct.SignedMessage.html
pub async fn try_recv_timeout(&self, timeout: Duration) -> Result<SignedMessage, ReceiveError> {
if timeout == std::time::Duration::from_nanos(0) {
debug!("BastionContext({}): Trying to receive message.", self.id);
} else {
debug!(
"BastionContext({}): Waiting to receive message within {} milliseconds.",
self.id,
timeout.as_millis()
);
}
debug!(
"BastionContext({}): Waiting to receive message within {} milliseconds.",
self.id,
timeout.as_millis()
);
futures::select! {
message = self.recv().fuse() => {
message.map_err(|_| ReceiveError::Other)
@ -651,7 +644,7 @@ impl BastionContext {
impl ContextState {
pub(crate) fn new() -> Self {
ContextState {
messages: VecDeque::new(),
messages: SegQueue::new(),
#[cfg(feature = "scaling")]
stats: Arc::new(AtomicU64::new(0)),
#[cfg(feature = "scaling")]
@ -679,12 +672,12 @@ impl ContextState {
self.actor_stats.clone()
}
pub(crate) fn push_message(&mut self, msg: Msg, sign: RefAddr) {
self.messages.push_back(SignedMessage::new(msg, sign))
pub(crate) fn push_message(&self, msg: Msg, sign: RefAddr) {
self.messages.push(SignedMessage::new(msg, sign))
}
pub(crate) fn pop_message(&mut self) -> Option<SignedMessage> {
self.messages.pop_front()
pub(crate) fn pop_message(&self) -> Option<SignedMessage> {
self.messages.pop().ok()
}
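The switch from a VecDeque behind an async Mutex to a SegQueue is what lets push_message and pop_message take &self, so ContextState can be shared through a plain Arc. A minimal sketch of that pattern, assuming the crossbeam-queue 0.2 API (where pop returns a Result) and using Mailbox as an illustrative stand-in for ContextState:

use crossbeam_queue::SegQueue;
use std::sync::Arc;

struct Mailbox<T> {
    messages: SegQueue<T>,
}

impl<T> Mailbox<T> {
    fn new() -> Self {
        Mailbox { messages: SegQueue::new() }
    }
    // Both operations take &self: the queue synchronizes internally,
    // so no lock is needed around the mailbox.
    fn push_message(&self, msg: T) {
        self.messages.push(msg)
    }
    fn pop_message(&self) -> Option<T> {
        self.messages.pop().ok()
    }
}

fn main() {
    let mailbox = Arc::new(Mailbox::new());
    mailbox.push_message("hello");
    assert_eq!(mailbox.pop_message(), Some("hello"));
    assert_eq!(mailbox.pop_message(), None);
}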
#[cfg(feature = "scaling")]
@ -762,7 +755,7 @@ mod context_tests {
}
fn test_try_recv_fail() {
let children = Bastion::children(|children| {
Bastion::children(|children| {
children.with_exec(|ctx: BastionContext| async move {
assert!(ctx.try_recv().await.is_none());
Ok(())

View File

@ -1,7 +1,17 @@
//!
//! Describes the error types that may occur within Bastion.
//! Given Bastion follows a let-it-crash strategy, most errors aren't surfaced to the caller.
//! A ReceiveError may however be raised when calling try_recv() or try_recv_timeout().
//! More error types may be added in the future.
use std::time::Duration;
#[derive(Debug)]
/// Errors that may be returned
/// when try_recv() or try_recv_timeout() is invoked.
pub enum ReceiveError {
/// We didn't receive a message on time
Timeout(Duration),
/// Generic error. Not used yet
Other,
}
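A hedged sketch of how a child could branch on ReceiveError when using try_recv_timeout; error_aware_group and the 100 ms timeout are illustrative, not part of this commit:

use bastion::prelude::*;
use std::time::Duration;

fn error_aware_group(children: Children) -> Children {
    children.with_exec(|ctx: BastionContext| async move {
        match ctx.try_recv_timeout(Duration::from_millis(100)).await {
            Ok(message) => {
                // A SignedMessage arrived in time; in real code, hand it to msg!.
                drop(message);
            }
            Err(ReceiveError::Timeout(waited)) => {
                // Nothing arrived within `waited`.
                let _ = waited;
            }
            Err(ReceiveError::Other) => {
                // Reserved for other receive failures; unused as of this commit.
            }
        }
        Ok(())
    })
}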

View File

@ -11,7 +11,7 @@ use crate::children::Children;
use crate::context::{BastionId, ContextState};
use crate::envelope::{RefAddr, SignedMessage};
use crate::supervisor::{SupervisionStrategy, Supervisor};
use async_mutex::Mutex;
use futures::channel::oneshot::{self, Receiver};
use std::any::{type_name, Any};
use std::fmt::Debug;
@ -200,7 +200,7 @@ pub(crate) enum BastionMessage {
InstantiatedChild {
parent_id: BastionId,
child_id: BastionId,
state: Arc<Mutex<Pin<Box<ContextState>>>>,
state: Arc<Pin<Box<ContextState>>>,
},
Message(Msg),
RestartRequired {
@ -214,13 +214,13 @@ pub(crate) enum BastionMessage {
RestartSubtree,
RestoreChild {
id: BastionId,
state: Arc<Mutex<Pin<Box<ContextState>>>>,
state: Arc<Pin<Box<ContextState>>>,
},
DropChild {
id: BastionId,
},
SetState {
state: Arc<Mutex<Pin<Box<ContextState>>>>,
state: Arc<Pin<Box<ContextState>>>,
},
Stopped {
id: BastionId,
@ -418,7 +418,7 @@ impl BastionMessage {
pub(crate) fn instantiated_child(
parent_id: BastionId,
child_id: BastionId,
state: Arc<Mutex<Pin<Box<ContextState>>>>,
state: Arc<Pin<Box<ContextState>>>,
) -> Self {
BastionMessage::InstantiatedChild {
parent_id,
@ -454,7 +454,7 @@ impl BastionMessage {
BastionMessage::RestartSubtree
}
pub(crate) fn restore_child(id: BastionId, state: Arc<Mutex<Pin<Box<ContextState>>>>) -> Self {
pub(crate) fn restore_child(id: BastionId, state: Arc<Pin<Box<ContextState>>>) -> Self {
BastionMessage::RestoreChild { id, state }
}
@ -462,7 +462,7 @@ impl BastionMessage {
BastionMessage::DropChild { id }
}
pub(crate) fn set_state(state: Arc<Mutex<Pin<Box<ContextState>>>>) -> Self {
pub(crate) fn set_state(state: Arc<Pin<Box<ContextState>>>) -> Self {
BastionMessage::SetState { state }
}

View File

@ -9,7 +9,7 @@ use crate::context::{BastionId, ContextState};
use crate::envelope::Envelope;
use crate::message::{BastionMessage, Deployment, Message};
use crate::path::{BastionPath, BastionPathElement};
use async_mutex::Mutex;
use bastion_executor::pool;
use futures::prelude::*;
use futures::stream::FuturesOrdered;
@ -106,7 +106,7 @@ pub struct Supervisor {
#[derive(Debug, Clone)]
struct TrackedChildState {
id: BastionId,
state: Arc<Mutex<Pin<Box<ContextState>>>>,
state: Arc<Pin<Box<ContextState>>>,
restarts_counts: usize,
}
@ -1750,7 +1750,7 @@ impl SupervisorRef {
}
impl TrackedChildState {
fn new(id: BastionId, state: Arc<Mutex<Pin<Box<ContextState>>>>) -> Self {
fn new(id: BastionId, state: Arc<Pin<Box<ContextState>>>) -> Self {
TrackedChildState {
id,
state,
@ -1762,7 +1762,7 @@ impl TrackedChildState {
self.id.clone()
}
fn state(&self) -> Arc<Mutex<Pin<Box<ContextState>>>> {
fn state(&self) -> Arc<Pin<Box<ContextState>>> {
self.state.clone()
}

View File

@ -0,0 +1,64 @@
use bastion::prelude::*;
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc,
};
use std::thread;
use std::time::Duration;
#[test]
fn test_run_blocking() {
Bastion::init();
Bastion::start();
let c = Bastion::children(|children| {
// Create the closure that each child will execute
children.with_exec(|ctx: BastionContext| {
let received_messages = Arc::new(AtomicUsize::new(0));
async move {
let received_messages = Arc::clone(&received_messages);
loop {
msg! {
ctx.recv().await?,
msg: &'static str =!>
{
assert_eq!(msg, "hello");
let messages = received_messages.fetch_add(1, Ordering::SeqCst);
answer!(ctx, messages + 1).expect("couldn't reply :(");
};
_: _ => panic!();
}
}
Ok(())
}
})
})
.unwrap();
let child = c.elems()[0].clone();
let output = (0..100)
.map(|_| {
let child = child.clone();
run!(blocking!(
async move {
let duration = Duration::from_millis(1);
thread::sleep(duration);
msg! {
child.clone()
.ask_anonymously("hello").unwrap().await.unwrap(),
output: usize => output;
_: _ => panic!();
}
}
.await
))
.unwrap()
})
.collect::<Vec<_>>();
Bastion::stop();
Bastion::block_until_stopped();
assert_eq!((1..=100).collect::<Vec<_>>(), output);
}