mirror of https://github.com/rust-lang/cargo
Add delays to network retries.
This commit is contained in:
parent
c38e050fc6
commit
6bd1209a55
|
@ -58,6 +58,7 @@ os_info = "3.5.0"
|
|||
pasetors = { version = "0.6.4", features = ["v3", "paserk", "std", "serde"] }
|
||||
pathdiff = "0.2"
|
||||
pretty_env_logger = { version = "0.4", optional = true }
|
||||
rand = "0.8.5"
|
||||
rustfix = "0.6.0"
|
||||
semver = { version = "1.0.3", features = ["serde"] }
|
||||
serde = { version = "1.0.123", features = ["derive"] }
|
||||
|
|
|
@ -28,7 +28,8 @@ use crate::ops;
|
|||
use crate::util::config::PackageCacheLock;
|
||||
use crate::util::errors::{CargoResult, HttpNotSuccessful};
|
||||
use crate::util::interning::InternedString;
|
||||
use crate::util::network::retry::Retry;
|
||||
use crate::util::network::retry::{Retry, RetryResult};
|
||||
use crate::util::network::sleep::SleepTracker;
|
||||
use crate::util::{self, internal, Config, Progress, ProgressStyle};
|
||||
|
||||
pub const MANIFEST_PREAMBLE: &str = "\
|
||||
|
@ -319,6 +320,8 @@ pub struct Downloads<'a, 'cfg> {
|
|||
/// Set of packages currently being downloaded. This should stay in sync
|
||||
/// with `pending`.
|
||||
pending_ids: HashSet<PackageId>,
|
||||
/// Downloads that have failed and are waiting to retry again later.
|
||||
sleeping: SleepTracker<(Download<'cfg>, Easy)>,
|
||||
/// The final result of each download. A pair `(token, result)`. This is a
|
||||
/// temporary holding area, needed because curl can report multiple
|
||||
/// downloads at once, but the main loop (`wait`) is written to only
|
||||
|
@ -442,6 +445,7 @@ impl<'cfg> PackageSet<'cfg> {
|
|||
next: 0,
|
||||
pending: HashMap::new(),
|
||||
pending_ids: HashSet::new(),
|
||||
sleeping: SleepTracker::new(),
|
||||
results: Vec::new(),
|
||||
progress: RefCell::new(Some(Progress::with_style(
|
||||
"Downloading",
|
||||
|
@ -800,7 +804,7 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> {
|
|||
|
||||
/// Returns the number of crates that are still downloading.
|
||||
pub fn remaining(&self) -> usize {
|
||||
self.pending.len()
|
||||
self.pending.len() + self.sleeping.len()
|
||||
}
|
||||
|
||||
/// Blocks the current thread waiting for a package to finish downloading.
|
||||
|
@ -831,51 +835,52 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> {
|
|||
let ret = {
|
||||
let timed_out = &dl.timed_out;
|
||||
let url = &dl.url;
|
||||
dl.retry
|
||||
.r#try(|| {
|
||||
if let Err(e) = result {
|
||||
// If this error is "aborted by callback" then that's
|
||||
// probably because our progress callback aborted due to
|
||||
// a timeout. We'll find out by looking at the
|
||||
// `timed_out` field, looking for a descriptive message.
|
||||
// If one is found we switch the error code (to ensure
|
||||
// it's flagged as spurious) and then attach our extra
|
||||
// information to the error.
|
||||
if !e.is_aborted_by_callback() {
|
||||
return Err(e.into());
|
||||
}
|
||||
|
||||
return Err(match timed_out.replace(None) {
|
||||
Some(msg) => {
|
||||
let code = curl_sys::CURLE_OPERATION_TIMEDOUT;
|
||||
let mut err = curl::Error::new(code);
|
||||
err.set_extra(msg);
|
||||
err
|
||||
}
|
||||
None => e,
|
||||
}
|
||||
.into());
|
||||
dl.retry.r#try(|| {
|
||||
if let Err(e) = result {
|
||||
// If this error is "aborted by callback" then that's
|
||||
// probably because our progress callback aborted due to
|
||||
// a timeout. We'll find out by looking at the
|
||||
// `timed_out` field, looking for a descriptive message.
|
||||
// If one is found we switch the error code (to ensure
|
||||
// it's flagged as spurious) and then attach our extra
|
||||
// information to the error.
|
||||
if !e.is_aborted_by_callback() {
|
||||
return Err(e.into());
|
||||
}
|
||||
|
||||
let code = handle.response_code()?;
|
||||
if code != 200 && code != 0 {
|
||||
let url = handle.effective_url()?.unwrap_or(url);
|
||||
return Err(HttpNotSuccessful {
|
||||
code,
|
||||
url: url.to_string(),
|
||||
body: data,
|
||||
return Err(match timed_out.replace(None) {
|
||||
Some(msg) => {
|
||||
let code = curl_sys::CURLE_OPERATION_TIMEDOUT;
|
||||
let mut err = curl::Error::new(code);
|
||||
err.set_extra(msg);
|
||||
err
|
||||
}
|
||||
.into());
|
||||
None => e,
|
||||
}
|
||||
Ok(data)
|
||||
})
|
||||
.with_context(|| format!("failed to download from `{}`", dl.url))?
|
||||
.into());
|
||||
}
|
||||
|
||||
let code = handle.response_code()?;
|
||||
if code != 200 && code != 0 {
|
||||
let url = handle.effective_url()?.unwrap_or(url);
|
||||
return Err(HttpNotSuccessful {
|
||||
code,
|
||||
url: url.to_string(),
|
||||
body: data,
|
||||
}
|
||||
.into());
|
||||
}
|
||||
Ok(data)
|
||||
})
|
||||
};
|
||||
match ret {
|
||||
Some(data) => break (dl, data),
|
||||
None => {
|
||||
self.pending_ids.insert(dl.id);
|
||||
self.enqueue(dl, handle)?
|
||||
RetryResult::Success(data) => break (dl, data),
|
||||
RetryResult::Err(e) => {
|
||||
return Err(e.context(format!("failed to download from `{}`", dl.url)))
|
||||
}
|
||||
RetryResult::Retry(sleep) => {
|
||||
debug!("download retry {} for {sleep}ms", dl.url);
|
||||
self.sleeping.push(sleep, (dl, handle));
|
||||
}
|
||||
}
|
||||
};
|
||||
|
@ -963,6 +968,7 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> {
|
|||
// actually block waiting for I/O to happen, which we achieve with the
|
||||
// `wait` method on `multi`.
|
||||
loop {
|
||||
self.add_sleepers()?;
|
||||
let n = tls::set(self, || {
|
||||
self.set
|
||||
.multi
|
||||
|
@ -985,17 +991,31 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> {
|
|||
if let Some(pair) = results.pop() {
|
||||
break Ok(pair);
|
||||
}
|
||||
assert!(!self.pending.is_empty());
|
||||
let min_timeout = Duration::new(1, 0);
|
||||
let timeout = self.set.multi.get_timeout()?.unwrap_or(min_timeout);
|
||||
let timeout = timeout.min(min_timeout);
|
||||
self.set
|
||||
.multi
|
||||
.wait(&mut [], timeout)
|
||||
.with_context(|| "failed to wait on curl `Multi`")?;
|
||||
assert_ne!(self.remaining(), 0);
|
||||
if self.pending.is_empty() {
|
||||
let delay = self.sleeping.time_to_next().unwrap();
|
||||
debug!("sleeping main thread for {delay:?}");
|
||||
std::thread::sleep(delay);
|
||||
} else {
|
||||
let min_timeout = Duration::new(1, 0);
|
||||
let timeout = self.set.multi.get_timeout()?.unwrap_or(min_timeout);
|
||||
let timeout = timeout.min(min_timeout);
|
||||
self.set
|
||||
.multi
|
||||
.wait(&mut [], timeout)
|
||||
.with_context(|| "failed to wait on curl `Multi`")?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn add_sleepers(&mut self) -> CargoResult<()> {
|
||||
for (dl, handle) in self.sleeping.to_retry() {
|
||||
self.pending_ids.insert(dl.id);
|
||||
self.enqueue(dl, handle)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn progress(&self, token: usize, total: u64, cur: u64) -> bool {
|
||||
let dl = &self.pending[&token].0;
|
||||
dl.total.set(total);
|
||||
|
@ -1061,7 +1081,7 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> {
|
|||
return Ok(());
|
||||
}
|
||||
}
|
||||
let pending = self.pending.len();
|
||||
let pending = self.remaining();
|
||||
let mut msg = if pending == 1 {
|
||||
format!("{} crate", pending)
|
||||
} else {
|
||||
|
|
|
@ -8,11 +8,12 @@ use crate::sources::registry::download;
|
|||
use crate::sources::registry::MaybeLock;
|
||||
use crate::sources::registry::{LoadResponse, RegistryConfig, RegistryData};
|
||||
use crate::util::errors::{CargoResult, HttpNotSuccessful};
|
||||
use crate::util::network::retry::Retry;
|
||||
use crate::util::network::retry::{Retry, RetryResult};
|
||||
use crate::util::network::sleep::SleepTracker;
|
||||
use crate::util::{auth, Config, Filesystem, IntoUrl, Progress, ProgressStyle};
|
||||
use anyhow::Context;
|
||||
use cargo_util::paths;
|
||||
use curl::easy::{HttpVersion, List};
|
||||
use curl::easy::{Easy, HttpVersion, List};
|
||||
use curl::multi::{EasyHandle, Multi};
|
||||
use log::{debug, trace, warn};
|
||||
use std::cell::RefCell;
|
||||
|
@ -103,6 +104,8 @@ struct Downloads<'cfg> {
|
|||
/// Set of paths currently being downloaded.
|
||||
/// This should stay in sync with `pending`.
|
||||
pending_paths: HashSet<PathBuf>,
|
||||
/// Downloads that have failed and are waiting to retry again later.
|
||||
sleeping: SleepTracker<(Download<'cfg>, Easy)>,
|
||||
/// The final result of each download.
|
||||
results: HashMap<PathBuf, CargoResult<CompletedDownload>>,
|
||||
/// The next ID to use for creating a token (see `Download::token`).
|
||||
|
@ -184,6 +187,7 @@ impl<'cfg> HttpRegistry<'cfg> {
|
|||
next: 0,
|
||||
pending: HashMap::new(),
|
||||
pending_paths: HashSet::new(),
|
||||
sleeping: SleepTracker::new(),
|
||||
results: HashMap::new(),
|
||||
progress: RefCell::new(Some(Progress::with_style(
|
||||
"Fetch",
|
||||
|
@ -265,6 +269,7 @@ impl<'cfg> HttpRegistry<'cfg> {
|
|||
};
|
||||
for (token, result) in results {
|
||||
let (mut download, handle) = self.downloads.pending.remove(&token).unwrap();
|
||||
assert!(self.downloads.pending_paths.remove(&download.path));
|
||||
let mut handle = self.multi.remove(handle)?;
|
||||
let data = download.data.take();
|
||||
let url = self.full_url(&download.path);
|
||||
|
@ -289,21 +294,19 @@ impl<'cfg> HttpRegistry<'cfg> {
|
|||
};
|
||||
Ok((data, code))
|
||||
}) {
|
||||
Ok(Some((data, code))) => Ok(CompletedDownload {
|
||||
RetryResult::Success((data, code)) => Ok(CompletedDownload {
|
||||
response_code: code,
|
||||
data,
|
||||
header_map: download.header_map.take(),
|
||||
}),
|
||||
Ok(None) => {
|
||||
// retry the operation
|
||||
let handle = self.multi.add(handle)?;
|
||||
self.downloads.pending.insert(token, (download, handle));
|
||||
RetryResult::Err(e) => Err(e),
|
||||
RetryResult::Retry(sleep) => {
|
||||
debug!("download retry {:?} for {sleep}ms", download.path);
|
||||
self.downloads.sleeping.push(sleep, (download, handle));
|
||||
continue;
|
||||
}
|
||||
Err(e) => Err(e),
|
||||
};
|
||||
|
||||
assert!(self.downloads.pending_paths.remove(&download.path));
|
||||
self.downloads.results.insert(download.path, result);
|
||||
self.downloads.downloads_finished += 1;
|
||||
}
|
||||
|
@ -395,6 +398,25 @@ impl<'cfg> HttpRegistry<'cfg> {
|
|||
))),
|
||||
}
|
||||
}
|
||||
|
||||
fn add_sleepers(&mut self) -> CargoResult<()> {
|
||||
for (dl, handle) in self.downloads.sleeping.to_retry() {
|
||||
let mut handle = self.multi.add(handle)?;
|
||||
handle.set_token(dl.token)?;
|
||||
assert!(
|
||||
self.downloads.pending_paths.insert(dl.path.to_path_buf()),
|
||||
"path queued for download more than once"
|
||||
);
|
||||
assert!(
|
||||
self.downloads
|
||||
.pending
|
||||
.insert(dl.token, (dl, handle))
|
||||
.is_none(),
|
||||
"dl token queued more than once"
|
||||
);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl<'cfg> RegistryData for HttpRegistry<'cfg> {
|
||||
|
@ -730,6 +752,7 @@ impl<'cfg> RegistryData for HttpRegistry<'cfg> {
|
|||
|
||||
loop {
|
||||
self.handle_completed_downloads()?;
|
||||
self.add_sleepers()?;
|
||||
|
||||
let remaining_in_multi = tls::set(&self.downloads, || {
|
||||
self.multi
|
||||
|
@ -738,19 +761,25 @@ impl<'cfg> RegistryData for HttpRegistry<'cfg> {
|
|||
})?;
|
||||
trace!("{} transfers remaining", remaining_in_multi);
|
||||
|
||||
if remaining_in_multi == 0 {
|
||||
if remaining_in_multi + self.downloads.sleeping.len() as u32 == 0 {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// We have no more replies to provide the caller with,
|
||||
// so we need to wait until cURL has something new for us.
|
||||
let timeout = self
|
||||
.multi
|
||||
.get_timeout()?
|
||||
.unwrap_or_else(|| Duration::new(1, 0));
|
||||
self.multi
|
||||
.wait(&mut [], timeout)
|
||||
.with_context(|| "failed to wait on curl `Multi`")?;
|
||||
if self.downloads.pending.is_empty() {
|
||||
let delay = self.downloads.sleeping.time_to_next().unwrap();
|
||||
debug!("sleeping main thread for {delay:?}");
|
||||
std::thread::sleep(delay);
|
||||
} else {
|
||||
// We have no more replies to provide the caller with,
|
||||
// so we need to wait until cURL has something new for us.
|
||||
let timeout = self
|
||||
.multi
|
||||
.get_timeout()?
|
||||
.unwrap_or_else(|| Duration::new(1, 0));
|
||||
self.multi
|
||||
.wait(&mut [], timeout)
|
||||
.with_context(|| "failed to wait on curl `Multi`")?;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -779,7 +808,7 @@ impl<'cfg> Downloads<'cfg> {
|
|||
&format!(
|
||||
" {} complete; {} pending",
|
||||
self.downloads_finished,
|
||||
self.pending.len()
|
||||
self.pending.len() + self.sleeping.len()
|
||||
),
|
||||
)
|
||||
}
|
||||
|
|
|
@ -3,6 +3,7 @@
|
|||
use std::task::Poll;
|
||||
|
||||
pub mod retry;
|
||||
pub mod sleep;
|
||||
|
||||
pub trait PollExt<T> {
|
||||
fn expect(self, msg: &str) -> T;
|
||||
|
|
|
@ -1,37 +1,71 @@
|
|||
//! Utilities for retrying a network operation.
|
||||
|
||||
use crate::util::errors::HttpNotSuccessful;
|
||||
use crate::{CargoResult, Config};
|
||||
use anyhow::Error;
|
||||
|
||||
use crate::util::errors::{CargoResult, HttpNotSuccessful};
|
||||
use crate::util::Config;
|
||||
use rand::Rng;
|
||||
use std::cmp::min;
|
||||
use std::time::Duration;
|
||||
|
||||
pub struct Retry<'a> {
|
||||
config: &'a Config,
|
||||
remaining: u32,
|
||||
retries: u64,
|
||||
max_retries: u64,
|
||||
}
|
||||
|
||||
pub enum RetryResult<T> {
|
||||
Success(T),
|
||||
Err(anyhow::Error),
|
||||
Retry(u64),
|
||||
}
|
||||
|
||||
/// Maximum amount of time a single retry can be delayed (milliseconds).
|
||||
const MAX_RETRY_SLEEP: u64 = 10 * 1000;
|
||||
/// The minimum initial amount of time a retry will be delayed (milliseconds).
|
||||
///
|
||||
/// The actual amount of time will be a random value above this.
|
||||
const INITIAL_RETRY_SLEEP_BASE: u64 = 500;
|
||||
/// The maximum amount of additional time the initial retry will take (milliseconds).
|
||||
///
|
||||
/// The initial delay will be [`INITIAL_RETRY_SLEEP_BASE`] plus a random range
|
||||
/// from 0 to this value.
|
||||
const INITIAL_RETRY_JITTER: u64 = 1000;
|
||||
|
||||
impl<'a> Retry<'a> {
|
||||
pub fn new(config: &'a Config) -> CargoResult<Retry<'a>> {
|
||||
Ok(Retry {
|
||||
config,
|
||||
remaining: config.net_config()?.retry.unwrap_or(2),
|
||||
retries: 0,
|
||||
max_retries: config.net_config()?.retry.unwrap_or(3) as u64,
|
||||
})
|
||||
}
|
||||
|
||||
/// Returns `Ok(None)` for operations that should be re-tried.
|
||||
pub fn r#try<T>(&mut self, f: impl FnOnce() -> CargoResult<T>) -> CargoResult<Option<T>> {
|
||||
pub fn r#try<T>(&mut self, f: impl FnOnce() -> CargoResult<T>) -> RetryResult<T> {
|
||||
match f() {
|
||||
Err(ref e) if maybe_spurious(e) && self.remaining > 0 => {
|
||||
Err(ref e) if maybe_spurious(e) && self.retries < self.max_retries => {
|
||||
let msg = format!(
|
||||
"spurious network error ({} tries remaining): {}",
|
||||
self.remaining,
|
||||
self.max_retries - self.retries,
|
||||
e.root_cause(),
|
||||
);
|
||||
self.config.shell().warn(msg)?;
|
||||
self.remaining -= 1;
|
||||
Ok(None)
|
||||
if let Err(e) = self.config.shell().warn(msg) {
|
||||
return RetryResult::Err(e);
|
||||
}
|
||||
self.retries += 1;
|
||||
let sleep = if self.retries == 1 {
|
||||
let mut rng = rand::thread_rng();
|
||||
INITIAL_RETRY_SLEEP_BASE + rng.gen_range(0..INITIAL_RETRY_JITTER)
|
||||
} else {
|
||||
min(
|
||||
((self.retries - 1) * 3) * 1000 + INITIAL_RETRY_SLEEP_BASE,
|
||||
MAX_RETRY_SLEEP,
|
||||
)
|
||||
};
|
||||
RetryResult::Retry(sleep)
|
||||
}
|
||||
other => other.map(Some),
|
||||
Err(e) => RetryResult::Err(e),
|
||||
Ok(r) => RetryResult::Success(r),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -100,8 +134,10 @@ where
|
|||
{
|
||||
let mut retry = Retry::new(config)?;
|
||||
loop {
|
||||
if let Some(ret) = retry.r#try(&mut callback)? {
|
||||
return Ok(ret);
|
||||
match retry.r#try(&mut callback) {
|
||||
RetryResult::Success(r) => return Ok(r),
|
||||
RetryResult::Err(e) => return Err(e),
|
||||
RetryResult::Retry(sleep) => std::thread::sleep(Duration::from_millis(sleep)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -155,6 +191,43 @@ fn with_retry_finds_nested_spurious_errors() {
|
|||
assert!(result.is_ok())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn default_retry_schedule() {
|
||||
use crate::core::Shell;
|
||||
|
||||
let spurious = || -> CargoResult<()> {
|
||||
Err(anyhow::Error::from(HttpNotSuccessful {
|
||||
code: 500,
|
||||
url: "Uri".to_string(),
|
||||
body: Vec::new(),
|
||||
}))
|
||||
};
|
||||
let config = Config::default().unwrap();
|
||||
*config.shell() = Shell::from_write(Box::new(Vec::new()));
|
||||
let mut retry = Retry::new(&config).unwrap();
|
||||
match retry.r#try(|| spurious()) {
|
||||
RetryResult::Retry(sleep) => {
|
||||
assert!(
|
||||
sleep >= INITIAL_RETRY_SLEEP_BASE
|
||||
&& sleep < INITIAL_RETRY_SLEEP_BASE + INITIAL_RETRY_JITTER
|
||||
);
|
||||
}
|
||||
_ => panic!("unexpected non-retry"),
|
||||
}
|
||||
match retry.r#try(|| spurious()) {
|
||||
RetryResult::Retry(sleep) => assert_eq!(sleep, 3500),
|
||||
_ => panic!("unexpected non-retry"),
|
||||
}
|
||||
match retry.r#try(|| spurious()) {
|
||||
RetryResult::Retry(sleep) => assert_eq!(sleep, 6500),
|
||||
_ => panic!("unexpected non-retry"),
|
||||
}
|
||||
match retry.r#try(|| spurious()) {
|
||||
RetryResult::Err(_) => {}
|
||||
_ => panic!("unexpected non-retry"),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn curle_http2_stream_is_spurious() {
|
||||
let code = curl_sys::CURLE_HTTP2_STREAM;
|
||||
|
|
|
@ -0,0 +1,94 @@
|
|||
//! Utility for tracking network requests that will be retried in the future.
|
||||
|
||||
use core::cmp::Ordering;
|
||||
use std::collections::BinaryHeap;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
pub struct SleepTracker<T> {
|
||||
heap: BinaryHeap<Sleeper<T>>,
|
||||
}
|
||||
|
||||
struct Sleeper<T> {
|
||||
wakeup: Instant,
|
||||
data: T,
|
||||
}
|
||||
|
||||
impl<T> PartialEq for Sleeper<T> {
|
||||
fn eq(&self, other: &Sleeper<T>) -> bool {
|
||||
self.wakeup == other.wakeup
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> PartialOrd for Sleeper<T> {
|
||||
fn partial_cmp(&self, other: &Sleeper<T>) -> Option<Ordering> {
|
||||
Some(other.wakeup.cmp(&self.wakeup))
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Eq for Sleeper<T> {}
|
||||
|
||||
impl<T> Ord for Sleeper<T> {
|
||||
fn cmp(&self, other: &Sleeper<T>) -> Ordering {
|
||||
self.wakeup.cmp(&other.wakeup)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> SleepTracker<T> {
|
||||
pub fn new() -> SleepTracker<T> {
|
||||
SleepTracker {
|
||||
heap: BinaryHeap::new(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Adds a new download that should be retried in the future.
|
||||
pub fn push(&mut self, sleep: u64, data: T) {
|
||||
self.heap.push(Sleeper {
|
||||
wakeup: Instant::now()
|
||||
.checked_add(Duration::from_millis(sleep))
|
||||
.expect("instant should not wrap"),
|
||||
data,
|
||||
});
|
||||
}
|
||||
|
||||
pub fn len(&self) -> usize {
|
||||
self.heap.len()
|
||||
}
|
||||
|
||||
/// Returns any downloads that are ready to go now.
|
||||
pub fn to_retry(&mut self) -> Vec<T> {
|
||||
let now = Instant::now();
|
||||
let mut result = Vec::new();
|
||||
while let Some(next) = self.heap.peek() {
|
||||
log::debug!("ERIC: now={now:?} next={:?}", next.wakeup);
|
||||
if next.wakeup < now {
|
||||
result.push(self.heap.pop().unwrap().data);
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
result
|
||||
}
|
||||
|
||||
/// Returns the time when the next download is ready to go.
|
||||
///
|
||||
/// Returns None if there are no sleepers remaining.
|
||||
pub fn time_to_next(&self) -> Option<Duration> {
|
||||
self.heap
|
||||
.peek()
|
||||
.map(|s| s.wakeup.saturating_duration_since(Instant::now()))
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn returns_in_order() {
|
||||
let mut s = SleepTracker::new();
|
||||
s.push(3, 3);
|
||||
s.push(1, 1);
|
||||
s.push(6, 6);
|
||||
s.push(5, 5);
|
||||
s.push(2, 2);
|
||||
s.push(10000, 10000);
|
||||
assert_eq!(s.len(), 6);
|
||||
std::thread::sleep(Duration::from_millis(100));
|
||||
assert_eq!(s.to_retry(), &[1, 2, 3, 5, 6]);
|
||||
}
|
|
@ -110,7 +110,7 @@ user-agent = "…" # the user-agent header
|
|||
root = "/some/path" # `cargo install` destination directory
|
||||
|
||||
[net]
|
||||
retry = 2 # network retries
|
||||
retry = 3 # network retries
|
||||
git-fetch-with-cli = true # use the `git` executable for git operations
|
||||
offline = true # do not access the network
|
||||
|
||||
|
@ -724,7 +724,7 @@ The `[net]` table controls networking configuration.
|
|||
|
||||
##### `net.retry`
|
||||
* Type: integer
|
||||
* Default: 2
|
||||
* Default: 3
|
||||
* Environment: `CARGO_NET_RETRY`
|
||||
|
||||
Number of times to retry possibly spurious network errors.
|
||||
|
|
|
@ -327,6 +327,7 @@ fn net_err_suggests_fetch_with_cli() {
|
|||
[UPDATING] git repository `ssh://needs-proxy.invalid/git`
|
||||
warning: spurious network error[..]
|
||||
warning: spurious network error[..]
|
||||
warning: spurious network error[..]
|
||||
[ERROR] failed to get `foo` as a dependency of package `foo v0.0.0 [..]`
|
||||
|
||||
Caused by:
|
||||
|
|
|
@ -9,8 +9,10 @@ use cargo_test_support::registry::{
|
|||
use cargo_test_support::{basic_manifest, project};
|
||||
use cargo_test_support::{git, install::cargo_home, t};
|
||||
use cargo_util::paths::remove_dir_all;
|
||||
use std::fmt::Write;
|
||||
use std::fs::{self, File};
|
||||
use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
use std::sync::Mutex;
|
||||
|
||||
fn setup_http() -> TestRegistry {
|
||||
|
@ -2704,7 +2706,7 @@ Caused by:
|
|||
}
|
||||
|
||||
#[cargo_test]
|
||||
fn sparse_retry() {
|
||||
fn sparse_retry_single() {
|
||||
let fail_count = Mutex::new(0);
|
||||
let _registry = RegistryBuilder::new()
|
||||
.http_index()
|
||||
|
@ -2741,10 +2743,10 @@ fn sparse_retry() {
|
|||
.with_stderr(
|
||||
"\
|
||||
[UPDATING] `dummy-registry` index
|
||||
warning: spurious network error (2 tries remaining): failed to get successful HTTP response from `[..]`, got 500
|
||||
warning: spurious network error (3 tries remaining): failed to get successful HTTP response from `[..]`, got 500
|
||||
body:
|
||||
internal server error
|
||||
warning: spurious network error (1 tries remaining): failed to get successful HTTP response from `[..]`, got 500
|
||||
warning: spurious network error (2 tries remaining): failed to get successful HTTP response from `[..]`, got 500
|
||||
body:
|
||||
internal server error
|
||||
[DOWNLOADING] crates ...
|
||||
|
@ -2757,6 +2759,224 @@ internal server error
|
|||
.run();
|
||||
}
|
||||
|
||||
#[cargo_test]
|
||||
fn sparse_retry_multiple() {
|
||||
// Tests retry behavior of downloading lots of packages with various
|
||||
// failure rates accessing the sparse index.
|
||||
|
||||
// The index is the number of retries, the value is the number of packages
|
||||
// that retry that number of times. Thus 50 packages succeed on first try,
|
||||
// 25 on second, etc.
|
||||
const RETRIES: &[u32] = &[50, 25, 12, 6];
|
||||
|
||||
let pkgs: Vec<_> = RETRIES
|
||||
.iter()
|
||||
.enumerate()
|
||||
.flat_map(|(retries, num)| {
|
||||
(0..*num)
|
||||
.into_iter()
|
||||
.map(move |n| (retries as u32, format!("{}-{n}-{retries}", rand_prefix())))
|
||||
})
|
||||
.collect();
|
||||
|
||||
let mut builder = RegistryBuilder::new().http_index();
|
||||
let fail_counts: Arc<Mutex<Vec<u32>>> = Arc::new(Mutex::new(vec![0; pkgs.len()]));
|
||||
let mut cargo_toml = r#"
|
||||
[package]
|
||||
name = "foo"
|
||||
version = "0.1.0"
|
||||
|
||||
[dependencies]
|
||||
"#
|
||||
.to_string();
|
||||
// The expected stderr output.
|
||||
let mut expected = "\
|
||||
[UPDATING] `dummy-registry` index
|
||||
[DOWNLOADING] crates ...
|
||||
"
|
||||
.to_string();
|
||||
for (n, (retries, name)) in pkgs.iter().enumerate() {
|
||||
let count_clone = fail_counts.clone();
|
||||
let retries = *retries;
|
||||
let ab = &name[..2];
|
||||
let cd = &name[2..4];
|
||||
builder = builder.add_responder(format!("/index/{ab}/{cd}/{name}"), move |req, server| {
|
||||
let mut fail_counts = count_clone.lock().unwrap();
|
||||
if fail_counts[n] < retries {
|
||||
fail_counts[n] += 1;
|
||||
server.internal_server_error(req)
|
||||
} else {
|
||||
server.index(req)
|
||||
}
|
||||
});
|
||||
write!(&mut cargo_toml, "{name} = \"1.0.0\"\n").unwrap();
|
||||
for retry in 0..retries {
|
||||
let remain = 3 - retry;
|
||||
write!(
|
||||
&mut expected,
|
||||
"warning: spurious network error ({remain} tries remaining): \
|
||||
failed to get successful HTTP response from \
|
||||
`http://127.0.0.1:[..]/{ab}/{cd}/{name}`, got 500\n\
|
||||
body:\n\
|
||||
internal server error\n"
|
||||
)
|
||||
.unwrap();
|
||||
}
|
||||
write!(
|
||||
&mut expected,
|
||||
"[DOWNLOADED] {name} v1.0.0 (registry `dummy-registry`)\n"
|
||||
)
|
||||
.unwrap();
|
||||
}
|
||||
let _server = builder.build();
|
||||
for (_, name) in &pkgs {
|
||||
Package::new(name, "1.0.0").publish();
|
||||
}
|
||||
let p = project()
|
||||
.file("Cargo.toml", &cargo_toml)
|
||||
.file("src/lib.rs", "")
|
||||
.build();
|
||||
p.cargo("fetch").with_stderr_unordered(expected).run();
|
||||
}
|
||||
|
||||
#[cargo_test]
|
||||
fn dl_retry_single() {
|
||||
// Tests retry behavior of downloading a package.
|
||||
// This tests a single package which exercises the code path that causes
|
||||
// it to block.
|
||||
let fail_count = Mutex::new(0);
|
||||
let _server = RegistryBuilder::new()
|
||||
.http_index()
|
||||
.add_responder("/dl/bar/1.0.0/download", move |req, server| {
|
||||
let mut fail_count = fail_count.lock().unwrap();
|
||||
if *fail_count < 2 {
|
||||
*fail_count += 1;
|
||||
server.internal_server_error(req)
|
||||
} else {
|
||||
server.dl(req)
|
||||
}
|
||||
})
|
||||
.build();
|
||||
Package::new("bar", "1.0.0").publish();
|
||||
let p = project()
|
||||
.file(
|
||||
"Cargo.toml",
|
||||
r#"
|
||||
[package]
|
||||
name = "foo"
|
||||
version = "0.1.0"
|
||||
|
||||
[dependencies]
|
||||
bar = "1.0"
|
||||
"#,
|
||||
)
|
||||
.file("src/lib.rs", "")
|
||||
.build();
|
||||
p.cargo("fetch")
|
||||
.with_stderr("\
|
||||
[UPDATING] `dummy-registry` index
|
||||
[DOWNLOADING] crates ...
|
||||
warning: spurious network error (3 tries remaining): failed to get successful HTTP response from `http://127.0.0.1:[..]/dl/bar/1.0.0/download`, got 500
|
||||
body:
|
||||
internal server error
|
||||
warning: spurious network error (2 tries remaining): failed to get successful HTTP response from `http://127.0.0.1:[..]/dl/bar/1.0.0/download`, got 500
|
||||
body:
|
||||
internal server error
|
||||
[DOWNLOADED] bar v1.0.0 (registry `dummy-registry`)
|
||||
").run();
|
||||
}
|
||||
|
||||
/// Creates a random prefix to randomly spread out the package names
|
||||
/// to somewhat evenly distribute the different failures at different
|
||||
/// points.
|
||||
fn rand_prefix() -> String {
|
||||
use rand::Rng;
|
||||
const CHARS: &[u8] = b"abcdefghijklmnopqrstuvwxyz";
|
||||
let mut rng = rand::thread_rng();
|
||||
(0..5)
|
||||
.map(|_| CHARS[rng.gen_range(0..CHARS.len())] as char)
|
||||
.collect()
|
||||
}
|
||||
|
||||
#[cargo_test]
|
||||
fn dl_retry_multiple() {
|
||||
// Tests retry behavior of downloading lots of packages with various
|
||||
// failure rates.
|
||||
|
||||
// The index is the number of retries, the value is the number of packages
|
||||
// that retry that number of times. Thus 50 packages succeed on first try,
|
||||
// 25 on second, etc.
|
||||
const RETRIES: &[u32] = &[50, 25, 12, 6];
|
||||
|
||||
let pkgs: Vec<_> = RETRIES
|
||||
.iter()
|
||||
.enumerate()
|
||||
.flat_map(|(retries, num)| {
|
||||
(0..*num)
|
||||
.into_iter()
|
||||
.map(move |n| (retries as u32, format!("{}-{n}-{retries}", rand_prefix())))
|
||||
})
|
||||
.collect();
|
||||
|
||||
let mut builder = RegistryBuilder::new().http_index();
|
||||
let fail_counts: Arc<Mutex<Vec<u32>>> = Arc::new(Mutex::new(vec![0; pkgs.len()]));
|
||||
let mut cargo_toml = r#"
|
||||
[package]
|
||||
name = "foo"
|
||||
version = "0.1.0"
|
||||
|
||||
[dependencies]
|
||||
"#
|
||||
.to_string();
|
||||
// The expected stderr output.
|
||||
let mut expected = "\
|
||||
[UPDATING] `dummy-registry` index
|
||||
[DOWNLOADING] crates ...
|
||||
"
|
||||
.to_string();
|
||||
for (n, (retries, name)) in pkgs.iter().enumerate() {
|
||||
let count_clone = fail_counts.clone();
|
||||
let retries = *retries;
|
||||
builder =
|
||||
builder.add_responder(format!("/dl/{name}/1.0.0/download"), move |req, server| {
|
||||
let mut fail_counts = count_clone.lock().unwrap();
|
||||
if fail_counts[n] < retries {
|
||||
fail_counts[n] += 1;
|
||||
server.internal_server_error(req)
|
||||
} else {
|
||||
server.dl(req)
|
||||
}
|
||||
});
|
||||
write!(&mut cargo_toml, "{name} = \"1.0.0\"\n").unwrap();
|
||||
for retry in 0..retries {
|
||||
let remain = 3 - retry;
|
||||
write!(
|
||||
&mut expected,
|
||||
"warning: spurious network error ({remain} tries remaining): \
|
||||
failed to get successful HTTP response from \
|
||||
`http://127.0.0.1:[..]/dl/{name}/1.0.0/download`, got 500\n\
|
||||
body:\n\
|
||||
internal server error\n"
|
||||
)
|
||||
.unwrap();
|
||||
}
|
||||
write!(
|
||||
&mut expected,
|
||||
"[DOWNLOADED] {name} v1.0.0 (registry `dummy-registry`)\n"
|
||||
)
|
||||
.unwrap();
|
||||
}
|
||||
let _server = builder.build();
|
||||
for (_, name) in &pkgs {
|
||||
Package::new(name, "1.0.0").publish();
|
||||
}
|
||||
let p = project()
|
||||
.file("Cargo.toml", &cargo_toml)
|
||||
.file("src/lib.rs", "")
|
||||
.build();
|
||||
p.cargo("fetch").with_stderr_unordered(expected).run();
|
||||
}
|
||||
|
||||
#[cargo_test]
|
||||
fn deleted_entry() {
|
||||
// Checks the behavior when a package is removed from the index.
|
||||
|
|
Loading…
Reference in New Issue