-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmod.rs
90 lines (74 loc) · 2.53 KB
/
mod.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
//! This is where the actual backend implementation goes, structured as an [asynchronous agent][run_until].
use std::time::Duration;
use tokio::sync::mpsc::error::SendError;
use tokio::sync::mpsc::{Sender, UnboundedReceiver};
use tokio::sync::watch::Receiver;
use crate::agent::protocol::*;
pub mod protocol;
pub type Inbox = UnboundedReceiver<Command>;
pub type Outbox = Sender<Event>;
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("Failed to send event.")]
OutboxError(#[from] SendError<Event>),
#[error("The agent has crashed.")]
Crash,
}
pub async fn run_until(
mut shutdown: Receiver<bool>,
mut inbox: Inbox,
outbox: Outbox,
) -> Result<(), Error> {
let mut one_sec = tokio::time::interval(Duration::from_secs(1));
let mut secs = 1;
outbox.send(Event::Started).await?;
loop {
tokio::select! {
_ = shutdown.changed() => break,
_ = one_sec.tick() => {
outbox.send(Event::Tick { secs }).await?;
secs += 1;
},
Some(c) = inbox.recv() => {
match c {
Command::Ping { payload } => {
// simulate an error
if payload == "crash" {
// open problem: how to transparently propagate the backtrace from here?
return Err(Error::Crash)
}
outbox.send(Event::Pong { payload }).await?;
}
}
},
else => break,
}
}
Ok(())
}
#[cfg(test)]
mod tests {
use tokio::sync::{mpsc, watch};
use crate::agent::protocol::{Command, Event};
use crate::agent::run_until;
#[tokio::test]
async fn agent_is_well_behaved() -> eyre::Result<()> {
let (commands, inbox) = mpsc::unbounded_channel();
let (outbox, mut events) = mpsc::channel(1);
let (shutdown_sender, shutdown) = watch::channel(false);
let join_handle = tokio::spawn(async { run_until(shutdown, inbox, outbox).await });
commands.send(Command::Ping {
payload: String::from("command"),
})?;
assert_eq!(events.recv().await, Some(Event::Started));
assert_eq!(
events.recv().await,
Some(Event::Pong {
payload: String::from("command")
})
);
let _ = shutdown_sender.send(true);
assert_eq!(join_handle.await?.map_err(|e| format!("{:?}", e)), Ok(()));
Ok(())
}
}