-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathleader.rs
98 lines (83 loc) · 3.46 KB
/
leader.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
use frankenpaxos::automicrobenchmarks_proto;
use hydroflow::bytes::BytesMut;
use hydroflow::util::{
cli::{
launch_flow, ConnectedBidi, ConnectedDemux, ConnectedSink, ConnectedSource,
ConnectedTagged, ServerOrBound,
},
deserialize_from_bytes, serialize_to_bytes,
};
use hydroflow_datalog::datalog;
use prost::Message;
use std::rc::Rc;
use std::{collections::HashMap, io::Cursor};
#[derive(clap::Args, Debug)]
pub struct LeaderArgs {}
fn decrypt_and_deserialize(msg: BytesMut) -> (i64, u32, Rc<Vec<u8>>) {
let s =
automicrobenchmarks_proto::ServerInbound::decode(&mut Cursor::new(msg.as_ref())).unwrap();
return (s.id, s.ballot, Rc::new(s.payload));
}
fn encrypt_and_serialize(id: i64, payload: Rc<Vec<u8>>) -> bytes::Bytes {
let out = automicrobenchmarks_proto::ClientInbound {
request: Some(
automicrobenchmarks_proto::client_inbound::Request::ClientReply(
automicrobenchmarks_proto::ClientReply {
id,
ballot: None,
payload: Some(payload.as_ref().clone()),
},
),
),
};
let mut buf = Vec::new();
out.encode(&mut buf).unwrap();
return bytes::Bytes::from(buf);
}
pub async fn run(_cfg: LeaderArgs, mut ports: HashMap<String, ServerOrBound>) {
// Client setup
let client_recv = ports
.remove("receive_from$clients$0")
.unwrap()
.connect::<ConnectedTagged<ConnectedBidi>>()
.await
.into_source();
let client_send = ports
.remove("send_to$clients$0")
.unwrap()
.connect::<ConnectedDemux<ConnectedBidi>>()
.await
.into_sink();
// Replica setup
let to_replica_port = ports
.remove("send_to$replicas$0")
.unwrap()
.connect::<ConnectedDemux<ConnectedBidi>>()
.await;
let peers = to_replica_port.keys.clone();
let num_replicas = peers.len();
let to_replica_sink = to_replica_port.into_sink();
let from_replica_source = ports
.remove("receive_from$replicas$0")
.unwrap()
.connect::<ConnectedTagged<ConnectedBidi>>()
.await
.into_source();
let df = datalog!(
r#"
.input replicas `repeat_iter(peers.clone()) -> map(|p| (p,))`
.input numReplicas `repeat_iter([(num_replicas,),])`
.async voteToReplica `map(|(node_id, v)| (node_id, serialize_to_bytes(v))) -> dest_sink(to_replica_sink)` `null::<(u32,i64,u32,Rc<Vec<u8>>)>()`
.async voteFromReplica `null::<(u32,u32,i64,Rc<Vec<u8>>)>()` `source_stream(from_replica_source) -> map(|v| deserialize_from_bytes::<(u32,u32,i64,Rc<Vec<u8>>)>(v.unwrap().1).unwrap())`
.async clientIn `null::<(u32,i64,Rc<Vec<u8>>)>()` `source_stream(client_recv) -> map(|x| {let input = x.unwrap(); let v = decrypt_and_deserialize(input.1); (input.0, v.0, v.2,)})`
.async clientOut `map(|(node_id, (id, payload,))| (node_id, encrypt_and_serialize(id, payload))) -> dest_sink(client_send)` `null::<(i64,Rc<Vec<u8>>)>()`
voteToReplica@addr(client, id, v) :~ clientIn(client, id, v), replicas(addr)
allVotes(l, client, id, v) :- voteFromReplica(l, client, id, v)
allVotes(l, client, id, v) :+ allVotes(l, client, id, v), !committed(client, id, _)
voteCounts(count(l), client, id) :- allVotes(l, client, id, v)
committed(client, id, v) :- voteCounts(n, client, id), numReplicas(n), allVotes(l, client, id, v)
clientOut@client(id, v) :~ committed(client, id, v)
"#
);
launch_flow(df).await
}