Add unit test coverage to Durable Functions types.

This commit adds some unit test coverage for Durable Functions types.
This commit is contained in:
Peter Huene 2019-08-05 00:09:04 -07:00 committed by Peter Huene
parent 41f93702cb
commit 2e398ae6aa
No known key found for this signature in database
GPG Key ID: E1D265D820213D6A
8 changed files with 1098 additions and 68 deletions

View File

@ -3,37 +3,6 @@ use azure_functions_durable::{OrchestrationClient, OrchestrationResult};
use serde::Deserialize;
use serde_json::{from_str, Value};
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
struct CreationUrls {
#[serde(rename = "createNewInstancePostUri")]
create_new_instance_url: String,
#[serde(rename = "createAndWaitOnNewInstancePostUri")]
create_new_instance_and_wait_url: String,
}
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
struct ManagementUrls {
id: String,
#[serde(rename = "statusQueryGetUri")]
status_query_url: String,
#[serde(rename = "sendEventPostUri")]
raise_event_url: String,
#[serde(rename = "terminatePostUri")]
terminate_url: String,
#[serde(rename = "rewindPostUri")]
rewind_url: String,
#[serde(rename = "purgeHistoryDeleteUri")]
purge_history_url: String,
}
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct BindingData {
management_urls: ManagementUrls,
}
/// Represents the Durable Functions orchestration client input binding.
///
/// The following binding attributes are supported:
@ -73,6 +42,19 @@ impl DurableOrchestrationClient {
#[doc(hidden)]
impl From<TypedData> for DurableOrchestrationClient {
fn from(data: TypedData) -> Self {
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
struct ManagementUrls {
#[serde(rename = "statusQueryGetUri")]
status_query_url: String,
}
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct BindingData {
management_urls: ManagementUrls,
}
let data: BindingData = match &data.data {
Some(Data::String(s)) => {
from_str(s).expect("failed to parse durable orchestration client data")

View File

@ -99,3 +99,55 @@ where
..Default::default()
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::durable::{EventType, HistoryEvent};
use chrono::Utc;
use futures::future::FutureExt;
use std::{
future::Future,
ptr::null,
task::{Context, Poll, RawWaker, RawWakerVTable, Waker},
};
pub(crate) fn poll<F, T>(mut future: F) -> Poll<T>
where
F: Future<Output = T> + Unpin,
{
let waker = unsafe {
Waker::from_raw(RawWaker::new(
null(),
&RawWakerVTable::new(waker_clone, waker_wake, waker_wake_by_ref, waker_drop),
))
};
future.poll_unpin(&mut Context::from_waker(&waker))
}
pub(crate) fn create_event(
event_type: EventType,
event_id: i32,
name: Option<String>,
result: Option<String>,
task_scheduled_id: Option<i32>,
) -> HistoryEvent {
HistoryEvent {
event_type,
event_id,
is_played: true,
timestamp: Utc::now(),
is_processed: false,
name,
input: None,
result,
task_scheduled_id,
instance_id: None,
reason: None,
details: None,
fire_at: None,
timer_id: None,
}
}
}

View File

@ -64,3 +64,146 @@ where
self.event_index
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::durable::{
tests::{create_event, poll},
EventType,
};
use serde_json::{from_str, json};
use std::task::Poll;
#[test]
fn it_polls_pending_without_a_result() {
let history = vec![create_event(
EventType::OrchestratorStarted,
-1,
None,
None,
None,
)];
let state = Rc::new(RefCell::new(OrchestrationState::new(history)));
let future = ActionFuture::<()>::new(None, state, None);
assert_eq!(poll(future), Poll::Pending);
}
#[test]
fn it_polls_ready_given_a_result() {
let history = vec![
create_event(EventType::OrchestratorStarted, -1, None, None, None),
create_event(
EventType::TaskScheduled,
0,
Some("hello".to_string()),
None,
None,
),
create_event(
EventType::TaskCompleted,
-1,
Some("hello".to_string()),
Some(json!("hello").to_string()),
Some(0),
),
];
let mut state = OrchestrationState::new(history);
let (idx, event) = state.find_scheduled_task("hello").unwrap();
event.is_processed = true;
let (idx, event) = state.find_finished_task(idx).unwrap();
event.is_processed = true;
let result = Some(from_str(&event.result.as_ref().unwrap()).unwrap());
let state = Rc::new(RefCell::new(state));
let future = ActionFuture::new(result, state, Some(idx));
assert_eq!(future.event_index(), Some(idx));
assert_eq!(poll(future), Poll::Ready(json!("hello")));
}
#[test]
fn it_updates_state() {
let history = vec![
create_event(EventType::OrchestratorStarted, -1, None, None, None),
create_event(
EventType::TaskScheduled,
0,
Some("hello".to_string()),
None,
None,
),
create_event(EventType::OrchestratorCompleted, -1, None, None, None),
create_event(EventType::OrchestratorStarted, -1, None, None, None),
create_event(
EventType::TaskCompleted,
-1,
Some("hello".to_string()),
Some(json!("hello").to_string()),
Some(0),
),
];
let mut state = OrchestrationState::new(history);
assert!(state.is_replaying());
let (idx, event) = state.find_scheduled_task("hello").unwrap();
event.is_processed = true;
let (idx, event) = state.find_finished_task(idx).unwrap();
event.is_processed = true;
let result = Some(from_str(&event.result.as_ref().unwrap()).unwrap());
let state = Rc::new(RefCell::new(state));
let future = ActionFuture::new(result, state.clone(), Some(idx));
assert_eq!(future.event_index(), Some(idx));
assert_eq!(poll(future), Poll::Ready(json!("hello")));
assert!(!state.borrow().is_replaying());
}
#[test]
fn it_does_not_update_state_when_an_inner_future() {
let history = vec![
create_event(EventType::OrchestratorStarted, -1, None, None, None),
create_event(
EventType::TaskScheduled,
0,
Some("hello".to_string()),
None,
None,
),
create_event(EventType::OrchestratorCompleted, -1, None, None, None),
create_event(EventType::OrchestratorStarted, -1, None, None, None),
create_event(
EventType::TaskCompleted,
-1,
Some("hello".to_string()),
Some(json!("hello").to_string()),
Some(0),
),
];
let mut state = OrchestrationState::new(history);
assert!(state.is_replaying());
let (idx, event) = state.find_scheduled_task("hello").unwrap();
event.is_processed = true;
let (idx, event) = state.find_finished_task(idx).unwrap();
event.is_processed = true;
let result = Some(from_str(&event.result.as_ref().unwrap()).unwrap());
let state = Rc::new(RefCell::new(state));
let mut future = ActionFuture::new(result, state.clone(), Some(idx));
future.notify_inner();
assert_eq!(future.event_index(), Some(idx));
assert_eq!(poll(future), Poll::Ready(json!("hello")));
assert!(state.borrow().is_replaying());
}
}

View File

@ -1,13 +0,0 @@
use serde::Deserialize;
/// Represents the Durable Funtions client creation URLs.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct CreationUrls {
/// The URL for creating a new orchestration instance.
#[serde(rename = "createNewInstancePostUri")]
pub create_new_instance_url: String,
/// The URL for creating and waiting on a new orchestration instance.
#[serde(rename = "createAndWaitOnNewInstancePostUri")]
pub create_new_instance_and_wait_url: String,
}

View File

@ -92,3 +92,244 @@ where
self.event_index
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::durable::{
tests::{create_event, poll},
ActionFuture, EventType,
};
use serde_json::{from_str, json};
use std::task::Poll;
#[test]
fn it_polls_pending_without_a_result() {
let history = vec![create_event(
EventType::OrchestratorStarted,
-1,
None,
None,
None,
)];
let state = Rc::new(RefCell::new(OrchestrationState::new(history)));
let future1 = ActionFuture::<()>::new(None, state.clone(), None);
let future2 = ActionFuture::<()>::new(None, state.clone(), None);
let join = JoinAll::new(state.clone(), vec![future1, future2]);
assert_eq!(join.event_index(), None);
assert_eq!(poll(join), Poll::Pending);
}
#[test]
fn it_polls_pending_with_a_result() {
let history = vec![
create_event(EventType::OrchestratorStarted, -1, None, None, None),
create_event(
EventType::TaskScheduled,
0,
Some("hello".to_string()),
None,
None,
),
create_event(
EventType::TaskScheduled,
1,
Some("world".to_string()),
None,
None,
),
create_event(
EventType::TaskCompleted,
-1,
Some("hello".to_string()),
Some(json!("hello").to_string()),
Some(0),
),
create_event(
EventType::TaskCompleted,
-1,
Some("world".to_string()),
Some(json!("world").to_string()),
Some(1),
),
];
let mut state = OrchestrationState::new(history);
let (idx, event) = state.find_scheduled_task("hello").unwrap();
event.is_processed = true;
let (idx, event) = state.find_finished_task(idx).unwrap();
event.is_processed = true;
let result1 = Some(from_str(&event.result.as_ref().unwrap()).unwrap());
let idx1 = Some(idx);
let (idx, event) = state.find_scheduled_task("world").unwrap();
event.is_processed = true;
let (idx, event) = state.find_finished_task(idx).unwrap();
event.is_processed = true;
let result2 = Some(from_str(&event.result.as_ref().unwrap()).unwrap());
let idx2 = Some(idx);
let state = Rc::new(RefCell::new(state));
let future1 = ActionFuture::new(result1, state.clone(), idx1);
let future2 = ActionFuture::new(result2, state.clone(), idx2);
let join = JoinAll::new(state.clone(), vec![future2, future1]);
assert_eq!(join.event_index(), idx2);
assert_eq!(
poll(join),
Poll::Ready(vec![json!("world"), json!("hello")])
);
}
#[test]
fn it_updates_state() {
let history = vec![
create_event(EventType::OrchestratorStarted, -1, None, None, None),
create_event(
EventType::TaskScheduled,
0,
Some("hello".to_string()),
None,
None,
),
create_event(
EventType::TaskScheduled,
1,
Some("world".to_string()),
None,
None,
),
create_event(EventType::OrchestratorCompleted, -1, None, None, None),
create_event(EventType::OrchestratorStarted, -1, None, None, None),
create_event(
EventType::TaskCompleted,
-1,
Some("hello".to_string()),
Some(json!("hello").to_string()),
Some(0),
),
create_event(EventType::OrchestratorCompleted, -1, None, None, None),
create_event(EventType::OrchestratorStarted, -1, None, None, None),
create_event(
EventType::TaskCompleted,
-1,
Some("world".to_string()),
Some(json!("world").to_string()),
Some(1),
),
];
let mut state = OrchestrationState::new(history);
assert!(state.is_replaying());
let (idx, event) = state.find_scheduled_task("hello").unwrap();
event.is_processed = true;
let (idx, event) = state.find_finished_task(idx).unwrap();
event.is_processed = true;
let result1 = Some(from_str(&event.result.as_ref().unwrap()).unwrap());
let idx1 = Some(idx);
let (idx, event) = state.find_scheduled_task("world").unwrap();
event.is_processed = true;
let (idx, event) = state.find_finished_task(idx).unwrap();
event.is_processed = true;
let result2 = Some(from_str(&event.result.as_ref().unwrap()).unwrap());
let idx2 = Some(idx);
let state = Rc::new(RefCell::new(state));
let future1 = ActionFuture::new(result1, state.clone(), idx1);
let future2 = ActionFuture::new(result2, state.clone(), idx2);
let join = JoinAll::new(state.clone(), vec![future2, future1]);
assert_eq!(join.event_index(), idx2);
assert_eq!(
poll(join),
Poll::Ready(vec![json!("world"), json!("hello")])
);
assert!(!state.borrow().is_replaying());
}
#[test]
fn it_does_not_update_state_when_an_inner_future() {
let history = vec![
create_event(EventType::OrchestratorStarted, -1, None, None, None),
create_event(
EventType::TaskScheduled,
0,
Some("hello".to_string()),
None,
None,
),
create_event(
EventType::TaskScheduled,
1,
Some("world".to_string()),
None,
None,
),
create_event(EventType::OrchestratorCompleted, -1, None, None, None),
create_event(EventType::OrchestratorStarted, -1, None, None, None),
create_event(
EventType::TaskCompleted,
-1,
Some("hello".to_string()),
Some(json!("hello").to_string()),
Some(0),
),
create_event(EventType::OrchestratorCompleted, -1, None, None, None),
create_event(EventType::OrchestratorStarted, -1, None, None, None),
create_event(
EventType::TaskCompleted,
-1,
Some("world".to_string()),
Some(json!("world").to_string()),
Some(1),
),
];
let mut state = OrchestrationState::new(history);
assert!(state.is_replaying());
let (idx, event) = state.find_scheduled_task("hello").unwrap();
event.is_processed = true;
let (idx, event) = state.find_finished_task(idx).unwrap();
event.is_processed = true;
let result1 = Some(from_str(&event.result.as_ref().unwrap()).unwrap());
let idx1 = Some(idx);
let (idx, event) = state.find_scheduled_task("world").unwrap();
event.is_processed = true;
let (idx, event) = state.find_finished_task(idx).unwrap();
event.is_processed = true;
let result2 = Some(from_str(&event.result.as_ref().unwrap()).unwrap());
let idx2 = Some(idx);
let state = Rc::new(RefCell::new(state));
let future1 = ActionFuture::new(result1, state.clone(), idx1);
let future2 = ActionFuture::new(result2, state.clone(), idx2);
let mut join = JoinAll::new(state.clone(), vec![future2, future1]);
join.notify_inner();
assert_eq!(join.event_index(), idx2);
assert_eq!(
poll(join),
Poll::Ready(vec![json!("world"), json!("hello")])
);
assert!(state.borrow().is_replaying());
}
}

View File

@ -1,24 +0,0 @@
use serde::Deserialize;
/// Represents the Durable Funtions client management URLs.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ManagementUrls {
/// The ID of the orchestration instance.
pub id: String,
/// The status URL of the orchestration instance.
#[serde(rename = "statusQueryGetUri")]
pub status_query_url: String,
/// The "raise event" URL of the orchestration instance.
#[serde(rename = "sendEventPostUri")]
pub raise_event_url: String,
/// The "terminate" URL of the orchestration instance.
#[serde(rename = "terminatePostUri")]
pub terminate_url: String,
/// The "rewind" URL of the orchestration instance.
#[serde(rename = "rewindPostUri")]
pub rewind_url: String,
/// The "purge history" URL of the orchestration instance.
#[serde(rename = "purgeHistoryDeleteUri")]
pub purge_history_url: String,
}

View File

@ -131,3 +131,374 @@ impl OrchestrationState {
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::durable::tests::create_event;
use serde_json::json;
#[test]
#[should_panic(expected = "failed to find orchestration started event")]
fn it_requires_an_orchestration_start_event() {
OrchestrationState::new(Vec::new());
}
#[test]
fn it_constructs() {
let history = vec![create_event(
EventType::OrchestratorStarted,
-1,
None,
None,
None,
)];
let timestamp = history[0].timestamp;
let state = OrchestrationState::new(history);
assert_eq!(state.current_time(), timestamp);
assert_eq!(state.is_replaying(), false);
}
#[test]
fn it_pushes_an_action() {
let history = vec![create_event(
EventType::OrchestratorStarted,
-1,
None,
None,
None,
)];
let mut state = OrchestrationState::new(history);
let action = Action::CallActivity {
function_name: "test".to_string(),
input: json!("hello"),
};
state.push_action(action.clone());
assert_eq!(state.result.actions.len(), 1);
assert_eq!(state.result.actions[0].len(), 1);
assert_eq!(state.result.actions[0][0], action);
}
#[test]
fn it_sets_done_with_output() {
let history = vec![create_event(
EventType::OrchestratorStarted,
-1,
None,
None,
None,
)];
let mut state = OrchestrationState::new(history);
state.set_output(json!(42));
assert!(state.result.is_done);
assert_eq!(state.result.output.as_ref().unwrap(), &json!(42));
}
#[test]
fn it_returns_a_json_result() {
let history = vec![create_event(
EventType::OrchestratorStarted,
-1,
None,
None,
None,
)];
let mut state = OrchestrationState::new(history);
state.push_action(Action::CallActivity {
function_name: "test".to_string(),
input: json!("hello"),
});
state.set_output(json!("hello"));
assert_eq!(
state.result(),
r#"{"isDone":true,"actions":[[{"actionType":"callActivity","functionName":"test","input":"hello"}]],"output":"hello","customStatus":null,"error":null}"#
);
}
#[test]
fn it_returns_none_if_scheduled_activity_is_not_in_history() {
let history = vec![create_event(
EventType::OrchestratorStarted,
-1,
None,
None,
None,
)];
let mut state = OrchestrationState::new(history);
assert_eq!(state.find_scheduled_task("foo"), None);
}
#[test]
fn it_returns_some_if_scheduled_activity_is_in_history() {
let history = vec![
create_event(EventType::OrchestratorStarted, -1, None, None, None),
create_event(
EventType::TaskScheduled,
0,
Some("foo".to_string()),
None,
None,
),
];
let mut state = OrchestrationState::new(history);
match state.find_scheduled_task("foo") {
Some((idx, entry)) => {
assert_eq!(idx, 1);
assert_eq!(entry.event_type, EventType::TaskScheduled);
}
None => assert!(false),
}
}
#[test]
fn it_returns_none_if_finished_activity_is_not_in_history() {
let history = vec![
create_event(EventType::OrchestratorStarted, -1, None, None, None),
create_event(
EventType::TaskScheduled,
0,
Some("foo".to_string()),
None,
None,
),
];
let mut state = OrchestrationState::new(history);
match state.find_scheduled_task("foo") {
Some((idx, entry)) => {
assert_eq!(idx, 1);
assert_eq!(entry.event_type, EventType::TaskScheduled);
assert_eq!(state.find_finished_task(idx), None);
}
None => assert!(false),
}
}
#[test]
fn it_returns_some_if_completed_activity_is_in_history() {
let history = vec![
create_event(EventType::OrchestratorStarted, -1, None, None, None),
create_event(
EventType::TaskScheduled,
0,
Some("foo".to_string()),
None,
None,
),
create_event(
EventType::TaskCompleted,
-1,
Some("foo".to_string()),
Some(json!("bar").to_string()),
Some(0),
),
];
let mut state = OrchestrationState::new(history);
match state.find_scheduled_task("foo") {
Some((idx, entry)) => {
assert_eq!(idx, 1);
assert_eq!(entry.event_type, EventType::TaskScheduled);
match state.find_finished_task(idx) {
Some((idx, entry)) => {
assert_eq!(idx, 2);
assert_eq!(entry.event_type, EventType::TaskCompleted);
assert_eq!(entry.result, Some(json!("bar").to_string()));
}
None => assert!(false),
}
}
None => assert!(false),
}
}
#[test]
fn it_returns_some_if_failed_activity_is_in_history() {
let history = vec![
create_event(EventType::OrchestratorStarted, -1, None, None, None),
create_event(
EventType::TaskScheduled,
0,
Some("foo".to_string()),
None,
None,
),
create_event(
EventType::TaskFailed,
-1,
Some("foo".to_string()),
None,
Some(0),
),
];
let mut state = OrchestrationState::new(history);
match state.find_scheduled_task("foo") {
Some((idx, entry)) => {
assert_eq!(idx, 1);
assert_eq!(entry.event_type, EventType::TaskScheduled);
match state.find_finished_task(idx) {
Some((idx, entry)) => {
assert_eq!(idx, 2);
assert_eq!(entry.event_type, EventType::TaskFailed);
}
None => assert!(false),
}
}
None => assert!(false),
}
}
#[test]
fn it_does_not_update_state_if_there_is_no_completed_event() {
let history = vec![
create_event(EventType::OrchestratorStarted, -1, None, None, None),
create_event(
EventType::TaskScheduled,
0,
Some("foo".to_string()),
None,
None,
),
create_event(
EventType::TaskFailed,
-1,
Some("foo".to_string()),
None,
Some(0),
),
];
let mut state = OrchestrationState::new(history);
assert!(!state.is_replaying());
let current_time = state.current_time();
state.update(2);
assert_eq!(state.current_time(), current_time);
assert!(!state.is_replaying());
}
#[test]
fn it_does_not_update_state_if_index_is_less_than_end() {
let history = vec![
create_event(EventType::OrchestratorStarted, -1, None, None, None),
create_event(
EventType::TaskScheduled,
0,
Some("foo".to_string()),
None,
None,
),
create_event(
EventType::TaskFailed,
-1,
Some("foo".to_string()),
None,
Some(0),
),
create_event(EventType::OrchestratorCompleted, -1, None, None, None),
create_event(EventType::OrchestratorStarted, -1, None, None, None),
];
let mut state = OrchestrationState::new(history);
assert!(state.is_replaying());
let current_time = state.current_time();
state.update(2);
assert_eq!(state.current_time(), current_time);
assert!(state.is_replaying());
}
#[test]
fn it_updates_when_the_index_is_greater_with_end() {
let history = vec![
create_event(EventType::OrchestratorStarted, -1, None, None, None),
create_event(
EventType::TaskScheduled,
0,
Some("foo".to_string()),
None,
None,
),
create_event(EventType::OrchestratorCompleted, -1, None, None, None),
create_event(EventType::OrchestratorStarted, -1, None, None, None),
create_event(
EventType::TaskFailed,
-1,
Some("foo".to_string()),
None,
Some(0),
),
create_event(EventType::OrchestratorCompleted, -1, None, None, None),
create_event(EventType::OrchestratorStarted, -1, None, None, None),
];
let mut state = OrchestrationState::new(history);
assert!(state.is_replaying());
let current_time = state.current_time();
state.update(4);
assert_ne!(state.current_time(), current_time);
assert!(state.is_replaying());
}
#[test]
fn it_updates_when_the_index_is_greater() {
let history = vec![
create_event(EventType::OrchestratorStarted, -1, None, None, None),
create_event(
EventType::TaskScheduled,
0,
Some("foo".to_string()),
None,
None,
),
create_event(EventType::OrchestratorCompleted, -1, None, None, None),
create_event(EventType::OrchestratorStarted, -1, None, None, None),
create_event(
EventType::TaskFailed,
-1,
Some("foo".to_string()),
None,
Some(0),
),
];
let mut state = OrchestrationState::new(history);
assert!(state.is_replaying());
let current_time = state.current_time();
state.update(4);
assert_ne!(state.current_time(), current_time);
assert!(!state.is_replaying());
}
}

View File

@ -100,3 +100,281 @@ where
self.event_index
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::durable::{
tests::{create_event, poll},
ActionFuture, EventType,
};
use serde_json::{from_str, json, Value};
use std::task::Poll;
#[test]
fn it_polls_pending_without_a_result() {
let history = vec![create_event(
EventType::OrchestratorStarted,
-1,
None,
None,
None,
)];
let state = Rc::new(RefCell::new(OrchestrationState::new(history)));
let future1 = ActionFuture::<()>::new(None, state.clone(), None);
let future2 = ActionFuture::<()>::new(None, state.clone(), None);
let select = SelectAll::new(state.clone(), vec![future1, future2]);
assert_eq!(select.event_index(), None);
match poll(select) {
Poll::Ready(_) => assert!(false),
Poll::Pending => {}
};
}
#[test]
fn it_polls_pending_with_a_result() {
let history = vec![
create_event(EventType::OrchestratorStarted, -1, None, None, None),
create_event(
EventType::TaskScheduled,
0,
Some("hello".to_string()),
None,
None,
),
create_event(
EventType::TaskScheduled,
1,
Some("world".to_string()),
None,
None,
),
create_event(
EventType::TaskCompleted,
-1,
Some("hello".to_string()),
Some(json!("hello").to_string()),
Some(0),
),
create_event(
EventType::TaskCompleted,
-1,
Some("world".to_string()),
Some(json!("world").to_string()),
Some(1),
),
];
let mut state = OrchestrationState::new(history);
let (idx, event) = state.find_scheduled_task("hello").unwrap();
event.is_processed = true;
let (idx, event) = state.find_finished_task(idx).unwrap();
event.is_processed = true;
let result1 = Some(from_str(&event.result.as_ref().unwrap()).unwrap());
let idx1 = Some(idx);
let (idx, event) = state.find_scheduled_task("world").unwrap();
event.is_processed = true;
let (idx, event) = state.find_finished_task(idx).unwrap();
event.is_processed = true;
let result2 = Some(from_str(&event.result.as_ref().unwrap()).unwrap());
let idx2 = Some(idx);
let state = Rc::new(RefCell::new(state));
let future1 = ActionFuture::new(result1, state.clone(), idx1);
let future2 = ActionFuture::new(result2, state.clone(), idx2);
let select = SelectAll::new(state.clone(), vec![future2, future1]);
assert_eq!(select.event_index(), idx1);
match poll(select) {
Poll::Ready((r, i, mut remaining)) => {
assert_eq!(r, json!("hello"));
assert_eq!(i, 1);
assert_eq!(remaining.len(), 1);
assert_eq!(poll(remaining.pop().unwrap()), Poll::Ready(json!("world")));
}
Poll::Pending => assert!(false),
};
}
#[test]
fn it_updates_state() {
let history = vec![
create_event(EventType::OrchestratorStarted, -1, None, None, None),
create_event(
EventType::TaskScheduled,
0,
Some("hello".to_string()),
None,
None,
),
create_event(
EventType::TaskScheduled,
1,
Some("world".to_string()),
None,
None,
),
create_event(EventType::OrchestratorCompleted, -1, None, None, None),
create_event(EventType::OrchestratorStarted, -1, None, None, None),
create_event(
EventType::TaskCompleted,
-1,
Some("hello".to_string()),
Some(json!("hello").to_string()),
Some(0),
),
create_event(EventType::OrchestratorCompleted, -1, None, None, None),
create_event(EventType::OrchestratorStarted, -1, None, None, None),
create_event(
EventType::TaskCompleted,
-1,
Some("world".to_string()),
Some(json!("world").to_string()),
Some(1),
),
];
let mut state = OrchestrationState::new(history);
assert!(state.is_replaying());
let (idx, event) = state.find_scheduled_task("hello").unwrap();
event.is_processed = true;
let (idx, event) = state.find_finished_task(idx).unwrap();
event.is_processed = true;
let result1: Option<Value> = Some(from_str(&event.result.as_ref().unwrap()).unwrap());
let idx1 = Some(idx);
let (idx, event) = state.find_scheduled_task("world").unwrap();
event.is_processed = true;
let (idx, event) = state.find_finished_task(idx).unwrap();
event.is_processed = true;
let result2 = Some(from_str(&event.result.as_ref().unwrap()).unwrap());
let idx2 = Some(idx);
let state = Rc::new(RefCell::new(state));
let future1 = ActionFuture::new(result1, state.clone(), idx1);
let future2 = ActionFuture::new(result2, state.clone(), idx2);
let select = SelectAll::new(state.clone(), vec![future2, future1]);
assert_eq!(select.event_index(), idx1);
match poll(select) {
Poll::Ready((r, i, remaining)) => {
assert_eq!(r, json!("hello"));
assert_eq!(i, 1);
assert_eq!(remaining.len(), 1);
match poll(SelectAll::new(state.clone(), remaining)) {
Poll::Ready((r, i, remaining)) => {
assert_eq!(r, json!("world"));
assert_eq!(i, 0);
assert!(remaining.is_empty());
assert!(!state.borrow().is_replaying());
}
Poll::Pending => assert!(false),
};
}
Poll::Pending => assert!(false),
};
}
#[test]
fn it_does_not_update_state_when_an_inner_future() {
let history = vec![
create_event(EventType::OrchestratorStarted, -1, None, None, None),
create_event(
EventType::TaskScheduled,
0,
Some("hello".to_string()),
None,
None,
),
create_event(
EventType::TaskScheduled,
1,
Some("world".to_string()),
None,
None,
),
create_event(EventType::OrchestratorCompleted, -1, None, None, None),
create_event(EventType::OrchestratorStarted, -1, None, None, None),
create_event(
EventType::TaskCompleted,
-1,
Some("hello".to_string()),
Some(json!("hello").to_string()),
Some(0),
),
create_event(EventType::OrchestratorCompleted, -1, None, None, None),
create_event(EventType::OrchestratorStarted, -1, None, None, None),
create_event(
EventType::TaskCompleted,
-1,
Some("world".to_string()),
Some(json!("world").to_string()),
Some(1),
),
];
let mut state = OrchestrationState::new(history);
assert!(state.is_replaying());
let (idx, event) = state.find_scheduled_task("hello").unwrap();
event.is_processed = true;
let (idx, event) = state.find_finished_task(idx).unwrap();
event.is_processed = true;
let result1: Option<Value> = Some(from_str(&event.result.as_ref().unwrap()).unwrap());
let idx1 = Some(idx);
let (idx, event) = state.find_scheduled_task("world").unwrap();
event.is_processed = true;
let (idx, event) = state.find_finished_task(idx).unwrap();
event.is_processed = true;
let result2 = Some(from_str(&event.result.as_ref().unwrap()).unwrap());
let idx2 = Some(idx);
let state = Rc::new(RefCell::new(state));
let future1 = ActionFuture::new(result1, state.clone(), idx1);
let future2 = ActionFuture::new(result2, state.clone(), idx2);
let mut select = SelectAll::new(state.clone(), vec![future2, future1]);
select.notify_inner();
assert_eq!(select.event_index(), idx1);
match poll(select) {
Poll::Ready((r, i, remaining)) => {
assert_eq!(r, json!("hello"));
assert_eq!(i, 1);
assert_eq!(remaining.len(), 1);
let mut select = SelectAll::new(state.clone(), remaining);
select.notify_inner();
match poll(select) {
Poll::Ready((r, i, remaining)) => {
assert_eq!(r, json!("world"));
assert_eq!(i, 0);
assert!(remaining.is_empty());
assert!(state.borrow().is_replaying());
}
Poll::Pending => assert!(false),
};
}
Poll::Pending => assert!(false),
};
}
}