-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathtransport.go
81 lines (69 loc) · 3.08 KB
/
transport.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
package lime
import (
"context"
"encoding/json"
"fmt"
"io"
"net"
)
// Transport defines the basic features of a Lime means of communication.
// It embeds io.Closer.
type Transport interface {
	io.Closer

	// Send sends an envelope to the remote node.
	Send(ctx context.Context, e envelope) error

	// Receive receives an envelope from the remote node.
	Receive(ctx context.Context) (envelope, error)

	// SupportedCompression enumerates the compression options supported by
	// the transport.
	SupportedCompression() []SessionCompression

	// Compression returns the current compression option of the transport.
	Compression() SessionCompression

	// SetCompression defines the compression mode for the transport.
	SetCompression(ctx context.Context, c SessionCompression) error

	// SupportedEncryption enumerates the encryption options supported by
	// the transport.
	SupportedEncryption() []SessionEncryption

	// Encryption returns the current encryption option of the transport.
	Encryption() SessionEncryption

	// SetEncryption defines the encryption mode for the transport.
	SetEncryption(ctx context.Context, e SessionEncryption) error

	// Connected indicates whether the transport is connected.
	Connected() bool

	// LocalAddr returns the local endpoint address.
	LocalAddr() net.Addr

	// RemoteAddr returns the remote endpoint address.
	RemoteAddr() net.Addr
}
// TransportListener defines a listener interface for the transports.
// It embeds io.Closer.
type TransportListener interface {
	io.Closer

	// Listen starts listening for new transport connections on addr.
	Listen(ctx context.Context, addr net.Addr) error

	// Accept accepts a new transport connection.
	Accept(ctx context.Context) (Transport, error)
}
// TraceWriter enables request tracing for network transports.
type TraceWriter interface {
	// SendWriter returns the writer used for the transport send operations.
	SendWriter() *io.Writer

	// ReceiveWriter returns the writer used for the transport receive
	// operations.
	ReceiveWriter() *io.Writer
}
// StdoutTraceWriter implements a TraceWriter that uses the standard output
// for writing sent and received envelopes.
type StdoutTraceWriter struct {
	sendWriter    io.Writer // write end of the pipe traced with the "send" label
	receiveWriter io.Writer // write end of the pipe traced with the "receive" label
}

// NewStdoutTraceWriter creates a TraceWriter backed by two in-memory pipes.
// Each pipe is consumed by its own goroutine, which decodes the written bytes
// as a stream of JSON values and prints every value to the standard output as
// "<action>: <json>", where action is "send" or "receive".
//
// Tracing is best-effort: once a stream can no longer be decoded (for example
// after malformed JSON), its goroutine stops printing but keeps discarding the
// incoming bytes, so writers are never left blocked on a tracer that stopped
// reading. Each goroutine ends when the write end of its pipe is closed;
// this constructor itself never closes the pipes.
func NewStdoutTraceWriter() TraceWriter {
	sendReader, sendWriter := io.Pipe()
	receiveReader, receiveWriter := io.Pipe()

	trace := func(r io.Reader, action string) {
		dec := json.NewDecoder(r)
		for {
			var raw json.RawMessage
			if err := dec.Decode(&raw); err != nil {
				break
			}
			fmt.Printf("%v: %v\n", action, string(raw))
		}
		// A write to an io.Pipe blocks until a reader consumes the data, so
		// leaving the pipe unread after a decode failure would stall every
		// later write. Keep draining instead; the copy result is deliberately
		// ignored because there is nothing left to report to.
		_, _ = io.Copy(io.Discard, r)
	}
	go trace(receiveReader, "receive")
	go trace(sendReader, "send")

	return &StdoutTraceWriter{
		sendWriter:    sendWriter,
		receiveWriter: receiveWriter,
	}
}

// SendWriter returns a pointer to the writer whose content is traced with the
// "send" label. The receiver is a value, so the pointer addresses the field of
// the receiver's copy: assigning through it does not modify the original
// StdoutTraceWriter.
func (t StdoutTraceWriter) SendWriter() *io.Writer {
	return &t.sendWriter
}

// ReceiveWriter returns a pointer to the writer whose content is traced with
// the "receive" label. The receiver is a value, so the pointer addresses the
// field of the receiver's copy: assigning through it does not modify the
// original StdoutTraceWriter.
func (t StdoutTraceWriter) ReceiveWriter() *io.Writer {
	return &t.receiveWriter
}