mirror of https://github.com/spacejam/sled
Make concurrent iterator test more intense
This commit is contained in:
parent
bc8b14b9f8
commit
8c69736c1f
|
@ -490,6 +490,8 @@ fn concurrent_tree_iter() -> io::Result<()> {
|
|||
|
||||
const N_FORWARD: usize = INTENSITY;
|
||||
const N_REVERSE: usize = INTENSITY;
|
||||
const N_INSERT: usize = INTENSITY;
|
||||
const N_DELETE: usize = INTENSITY;
|
||||
|
||||
let config = Config::tmp()
|
||||
.unwrap()
|
||||
|
@ -521,100 +523,100 @@ fn concurrent_tree_iter() -> io::Result<()> {
|
|||
t.insert(item, item.to_vec())?;
|
||||
}
|
||||
|
||||
let barrier = Arc::new(Barrier::new(N_FORWARD + N_REVERSE + 2));
|
||||
let barrier =
|
||||
Arc::new(Barrier::new(N_FORWARD + N_REVERSE + N_INSERT + N_DELETE));
|
||||
|
||||
let mut threads: Vec<thread::JoinHandle<io::Result<()>>> = vec![];
|
||||
static I: AtomicUsize = AtomicUsize::new(0);
|
||||
|
||||
for i in 0..N_FORWARD {
|
||||
let t: Db = t.clone();
|
||||
let barrier = barrier.clone();
|
||||
|
||||
let thread = thread::Builder::new()
|
||||
.name(format!("forward({})", i))
|
||||
.spawn({
|
||||
let t: Db = t.clone();
|
||||
let barrier = barrier.clone();
|
||||
move || {
|
||||
I.fetch_add(1, SeqCst);
|
||||
barrier.wait();
|
||||
for _ in 0..N {
|
||||
let expected = INDELIBLE.iter();
|
||||
let mut keys = t.iter().keys();
|
||||
.spawn(move || {
|
||||
I.fetch_add(1, SeqCst);
|
||||
barrier.wait();
|
||||
for _ in 0..1024 {
|
||||
let expected = INDELIBLE.iter();
|
||||
let mut keys = t.iter().keys();
|
||||
|
||||
for expect in expected {
|
||||
loop {
|
||||
let k = keys.next().unwrap()?;
|
||||
assert!(
|
||||
&*k <= *expect,
|
||||
"witnessed key is {:?} but we expected \
|
||||
for expect in expected {
|
||||
loop {
|
||||
let k = keys.next().unwrap()?;
|
||||
assert!(
|
||||
&*k <= *expect,
|
||||
"witnessed key is {:?} but we expected \
|
||||
one <= {:?}, so we overshot due to a \
|
||||
concurrent modification",
|
||||
k,
|
||||
expect,
|
||||
);
|
||||
if &*k == *expect {
|
||||
break;
|
||||
}
|
||||
k,
|
||||
expect,
|
||||
);
|
||||
if &*k == *expect {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
I.fetch_sub(1, SeqCst);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
I.fetch_sub(1, SeqCst);
|
||||
|
||||
Ok(())
|
||||
})
|
||||
.unwrap();
|
||||
threads.push(thread);
|
||||
}
|
||||
|
||||
for i in 0..N_REVERSE {
|
||||
let t: Db = t.clone();
|
||||
let barrier = barrier.clone();
|
||||
|
||||
let thread = thread::Builder::new()
|
||||
.name(format!("reverse({})", i))
|
||||
.spawn({
|
||||
let t: Db = t.clone();
|
||||
let barrier = barrier.clone();
|
||||
move || {
|
||||
I.fetch_add(1, SeqCst);
|
||||
barrier.wait();
|
||||
for _ in 0..N {
|
||||
let expected = INDELIBLE.iter().rev();
|
||||
let mut keys = t.iter().keys().rev();
|
||||
.spawn(move || {
|
||||
I.fetch_add(1, SeqCst);
|
||||
barrier.wait();
|
||||
for _ in 0..1024 {
|
||||
let expected = INDELIBLE.iter().rev();
|
||||
let mut keys = t.iter().keys().rev();
|
||||
|
||||
for expect in expected {
|
||||
loop {
|
||||
if let Some(Ok(k)) = keys.next() {
|
||||
assert!(
|
||||
&*k >= *expect,
|
||||
"witnessed key is {:?} but we expected \
|
||||
for expect in expected {
|
||||
loop {
|
||||
if let Some(Ok(k)) = keys.next() {
|
||||
assert!(
|
||||
&*k >= *expect,
|
||||
"witnessed key is {:?} but we expected \
|
||||
one >= {:?}, so we overshot due to a \
|
||||
concurrent modification\n{:?}",
|
||||
k,
|
||||
expect,
|
||||
t,
|
||||
);
|
||||
if &*k == *expect {
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
panic!("undershot key on tree: \n{:?}", t);
|
||||
k,
|
||||
expect,
|
||||
t,
|
||||
);
|
||||
if &*k == *expect {
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
panic!("undershot key on tree: \n{:?}", t);
|
||||
}
|
||||
}
|
||||
}
|
||||
I.fetch_sub(1, SeqCst);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
I.fetch_sub(1, SeqCst);
|
||||
|
||||
Ok(())
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
threads.push(thread);
|
||||
}
|
||||
|
||||
let inserter = thread::Builder::new()
|
||||
.name("inserter".into())
|
||||
.spawn({
|
||||
let t: Db = t.clone();
|
||||
let barrier = barrier.clone();
|
||||
move || {
|
||||
for i in 0..N_INSERT {
|
||||
let t: Db = t.clone();
|
||||
let barrier = barrier.clone();
|
||||
|
||||
let thread = thread::Builder::new()
|
||||
.name(format!("insert({})", i))
|
||||
.spawn(move || {
|
||||
barrier.wait();
|
||||
|
||||
while I.load(SeqCst) != 0 {
|
||||
|
@ -629,17 +631,19 @@ fn concurrent_tree_iter() -> io::Result<()> {
|
|||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
})
|
||||
.unwrap();
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
threads.push(inserter);
|
||||
threads.push(thread);
|
||||
}
|
||||
|
||||
let deleter = thread::Builder::new()
|
||||
.name("deleter".into())
|
||||
.spawn({
|
||||
let t: Db = t.clone();
|
||||
move || {
|
||||
for i in 0..N_DELETE {
|
||||
let t: Db = t.clone();
|
||||
let barrier = barrier.clone();
|
||||
|
||||
let thread = thread::Builder::new()
|
||||
.name(format!("deleter({})", i))
|
||||
.spawn(move || {
|
||||
barrier.wait();
|
||||
|
||||
while I.load(SeqCst) != 0 {
|
||||
|
@ -654,11 +658,11 @@ fn concurrent_tree_iter() -> io::Result<()> {
|
|||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
})
|
||||
.unwrap();
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
threads.push(deleter);
|
||||
threads.push(thread);
|
||||
}
|
||||
|
||||
for thread in threads.into_iter() {
|
||||
thread.join().expect("thread should not have crashed")?;
|
||||
|
|
Loading…
Reference in New Issue