-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathClient.scala
74 lines (65 loc) · 2.15 KB
/
Client.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
package frankenpaxos.voting
import collection.mutable
import com.google.protobuf.ByteString
import com.github.tototoshi.csv.CSVWriter
import frankenpaxos.Actor
import frankenpaxos.Chan
import frankenpaxos.Logger
import frankenpaxos.ProtoSerializer
import java.io.File
import scala.concurrent.Future
import scala.concurrent.Promise
import scala.scalajs.js.annotation._
/** Serializer for [[ClientReply]] messages.
  *
  * All three operations forward unchanged to [[ProtoSerializer]].
  *
  * NOTE(review): the overrides are presumably redeclared here so that
  * `@JSExportAll` picks them up as members of this object; confirm against
  * the Scala.js export rules before removing them.
  */
@JSExportAll
object ClientReplySerializer extends ProtoSerializer[ClientReply] {

  /** Alias for the message type handled by this serializer. */
  type A = ClientReply

  /** Encodes `x` by delegating to the parent serializer. */
  override def toBytes(x: A): Array[Byte] =
    super.toBytes(x)

  /** Decodes `bytes` by delegating to the parent serializer. */
  override def fromBytes(bytes: Array[Byte]): A =
    super.fromBytes(bytes)

  /** Renders `x` as text by delegating to the parent serializer. */
  override def toPrettyString(x: A): String =
    super.toPrettyString(x)
}
/** Companion of the `Client` actor; holds the serializer used for the
  * actor's inbound [[ClientReply]] messages.
  */
@JSExportAll
object Client {

  /** The one serializer instance shared by every `Client`. */
  val serializer: ClientReplySerializer.type = ClientReplySerializer
}
/** Client actor of the voting protocol.
  *
  * Every command is wrapped in a [[ClientRequest]] and sent to the leader at
  * `config.leaderAddress`. The future handed back to the caller is completed
  * once a [[ClientReply]] carrying the same id is received. Nothing in this
  * class fails or times out a pending future.
  *
  * @tparam Transport the transport implementation this actor runs on.
  * @param address   address of this client; its serialized form is attached to
  *                  every request as `clientAddress`.
  * @param config    provides `leaderAddress`, the destination of all requests.
  * @param transport passed to [[Actor]]; also supplies the address serializer
  *                  and the execution context on which requests are issued.
  * @param logger    passed to [[Actor]]; used to report replies that match no
  *                  pending request.
  */
@JSExportAll
class Client[Transport <: frankenpaxos.Transport[Transport]](
    address: Transport#Address,
    config: Config[Transport],
    transport: Transport,
    logger: Logger
) extends Actor(address, transport, logger) {
  override type InboundMessage = ClientReply
  override def serializer = Client.serializer

  // Channel to the leader named in the configuration.
  private val leader =
    chan[Leader[Transport]](config.leaderAddress, Leader.serializer)

  // Id given to the next outgoing request; bumped after every send.
  private var id: Long = 0

  // Requests that were sent but not yet answered, keyed by request id.
  private val promises = mutable.Map[Long, Promise[Unit]]()

  // Serialized form of this client's address, computed once and reused for
  // every request.
  private val addressAsBytes: ByteString =
    ByteString.copyFrom(transport.addressSerializer.toBytes(address))

  /** Handles a reply by completing and forgetting the matching request.
    *
    * @param src   address the reply came from (not inspected).
    * @param reply reply whose `id` selects the pending request.
    */
  override def receive(src: Transport#Address, reply: InboundMessage): Unit = {
    // `remove` both looks up and drops the entry, so a second reply with the
    // same id falls through to the `None` branch.
    promises.remove(reply.id) match {
      case Some(promise) =>
        promise.success(())
      case None =>
        logger.fatal(s"Received reply for unpending request ${reply.id}.")
    }
  }

  /** Sends `command` to the leader and registers `promise` under the id that
    * was placed in the request. Called by [[request]].
    *
    * NOTE(review): this reads and writes `id` and `promises` without any
    * locking; presumably it must only run on the transport's execution
    * context, as `request` arranges - confirm before calling it directly.
    *
    * @param command opaque command bytes; copied into the outgoing message.
    * @param promise completed by `receive` when the matching reply arrives.
    */
  def _request(command: Array[Byte], promise: Promise[Unit]): Unit = {
    // Capture the id once so the sent message and the map key cannot diverge.
    val requestId = id
    leader.send(
      LeaderInbound().withClientRequest(
        ClientRequest(
          id = requestId,
          clientAddress = addressAsBytes,
          command = ByteString.copyFrom(command)
        )
      )
    )
    promises(requestId) = promise
    id += 1
  }

  /** Submits `command` and returns a future for its acknowledgement.
    *
    * The actual send is scheduled on `transport.executionContext` instead of
    * being performed on the caller's thread.
    *
    * @param command opaque command bytes to forward to the leader.
    * @return a future completed with `()` when the reply for this request is
    *         received.
    */
  def request(command: Array[Byte]): Future[Unit] = {
    val promise = Promise[Unit]()
    transport.executionContext.execute(() => _request(command, promise))
    promise.future
  }
}