Skip to content

Commit

Permalink
zmtp: implement XSUB/XPUB, PUB/SUB and REQ/REP (#73)
Browse files Browse the repository at this point in the history
* zmtp: improve perfs of Connection.(read|send){,Multipart}

This CL uses io.ReadFull to make sure all requested bytes are read from
an io.Reader.
It's also using binary.ByteOrder.Uint64 and binary.ByteOrder.PutUint64
directly instead of going the round about way through (slow) reflection.

Fixes #67.
Fixes #61.

* zmtp: reduce number of allocs in Connection.SendCommand

* zmtp: reduce number of allocs in Connection.writeMetadata

* zmtp: removed reflection from de/serializing greetings

* zmtp: remove slow reflection in Connection.recvMetadata

* zmtp: implement REQ/REP

Updates #65.

* zmtp: implement PUB/SUB

Updates zeromq/mq#66.

* zmtp: implement XSUB/XPUB

Updates zeromq/mq#66.

* zmtp: cosmetics
  • Loading branch information
sbinet authored and Luna Duclos committed Apr 19, 2018
1 parent 44e4a3e commit 44a15e8
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 24 deletions.
37 changes: 13 additions & 24 deletions zmtp/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,29 +34,18 @@ func (id SocketIdentity) String() string {
}

const (
// ClientSocketType is a ZMQ_CLIENT socket
ClientSocketType SocketType = "CLIENT"

// ServerSocketType is a ZMQ_SERVER socket
ServerSocketType SocketType = "SERVER"

// PullSocketType is a ZMQ_PULL socket
PullSocketType SocketType = "PULL"

// PushSocketType is a ZMQ_PUSH socket
PushSocketType SocketType = "PUSH"

// DealerSocketType is a ZMQ_DEALER socket
DealerSocketType SocketType = "DEALER"

// RouterSocketType is a ZMQ_ROUTER socket
RouterSocketType SocketType = "ROUTER"

// ReqSocketType is a ZMQ_REQ socket
ReqSocketType SocketType = "REQ"

// RepSocketType is a ZMQ_REP socket
RepSocketType SocketType = "REP"
ClientSocketType SocketType = "CLIENT" // a ZMQ_CLIENT socket
ServerSocketType SocketType = "SERVER" // a ZMQ_SERVER socket
PullSocketType SocketType = "PULL" // a ZMQ_PULL socket
PushSocketType SocketType = "PUSH" // a ZMQ_PUSH socket
DealerSocketType SocketType = "DEALER" // a ZMQ_DEALER socket
RouterSocketType SocketType = "ROUTER" // a ZMQ_ROUTER socket
ReqSocketType SocketType = "REQ" // a ZMQ_REQ socket
RepSocketType SocketType = "REP" // a ZMQ_REP socket
PubSocketType SocketType = "PUB" // a ZMQ_PUB socket
SubSocketType SocketType = "SUB" // a ZMQ_SUB socket
XPubSocketType SocketType = "XPUB" // a ZMQ_XPUB socket
XSubSocketType SocketType = "XSUB" // a ZMQ_XSUB socket
)

// NewConnection accepts an io.ReadWriter and creates a new ZMTP connection
Expand Down Expand Up @@ -276,7 +265,7 @@ func (c *Connection) SendCommand(commandName string, body []byte) error {
buf[0] = byte(cmdLen)
copy(buf[1:], []byte(commandName))
copy(buf[1+cmdLen:], body)

return c.send(true, buf)
}

Expand Down
84 changes: 84 additions & 0 deletions zmtp/socket.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,14 @@ func NewSocket(socketType SocketType) (Socket, error) {
return reqSocket{}, nil
case RepSocketType:
return repSocket{}, nil
case PubSocketType:
return pubSocket{}, nil
case SubSocketType:
return subSocket{}, nil
case XPubSocketType:
return xpubSocket{}, nil
case XSubSocketType:
return xsubSocket{}, nil
default:
return nil, errors.New("Invalid socket type")
}
Expand Down Expand Up @@ -190,3 +198,79 @@ func (repSocket) IsCommandTypeValid(name string) bool {
// FIXME
return false
}

type pubSocket struct{}

func (pubSocket) Type() SocketType {
return PubSocketType
}

func (pubSocket) IsSocketTypeCompatible(socketType SocketType) bool {
switch socketType {
case SubSocketType, XSubSocketType:
return true
}
return false
}

func (pubSocket) IsCommandTypeValid(name string) bool {
// FIXME
return false
}

type subSocket struct{}

func (subSocket) Type() SocketType {
return SubSocketType
}

func (subSocket) IsSocketTypeCompatible(socketType SocketType) bool {
switch socketType {
case PubSocketType, XPubSocketType:
return true
}
return false
}

func (subSocket) IsCommandTypeValid(name string) bool {
// FIXME
return false
}

type xpubSocket struct{}

func (xpubSocket) Type() SocketType {
return XPubSocketType
}

func (xpubSocket) IsSocketTypeCompatible(socketType SocketType) bool {
switch socketType {
case SubSocketType, XSubSocketType:
return true
}
return false
}

func (xpubSocket) IsCommandTypeValid(name string) bool {
// FIXME
return false
}

type xsubSocket struct{}

func (xsubSocket) Type() SocketType {
return XSubSocketType
}

func (xsubSocket) IsSocketTypeCompatible(socketType SocketType) bool {
switch socketType {
case PubSocketType, XPubSocketType:
return true
}
return false
}

func (xsubSocket) IsCommandTypeValid(name string) bool {
// FIXME
return false
}

0 comments on commit 44a15e8

Please sign in to comment.