Skip to content

Commit

Permalink
Merge pull request #249 from mreiferson/pause
Browse files Browse the repository at this point in the history
consumer: cleanup RDY handling; fix pausing
  • Loading branch information
mreiferson authored Mar 14, 2019
2 parents 4f1e133 + cd1b94b commit 61f49c0
Show file tree
Hide file tree
Showing 4 changed files with 117 additions and 73 deletions.
7 changes: 3 additions & 4 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ type Conn struct {
messagesInFlight int64
maxRdyCount int64
rdyCount int64
lastRdyCount int64
lastRdyTimestamp int64
lastMsgTimestamp int64

Expand Down Expand Up @@ -207,13 +206,12 @@ func (c *Conn) RDY() int64 {

// LastRDY returns the previously set RDY count
func (c *Conn) LastRDY() int64 {
return atomic.LoadInt64(&c.lastRdyCount)
return atomic.LoadInt64(&c.rdyCount)
}

// SetRDY stores the specified RDY count
func (c *Conn) SetRDY(rdy int64) {
atomic.StoreInt64(&c.rdyCount, rdy)
atomic.StoreInt64(&c.lastRdyCount, rdy)
if rdy > 0 {
atomic.StoreInt64(&c.lastRdyTimestamp, time.Now().UnixNano())
}
Expand All @@ -225,6 +223,8 @@ func (c *Conn) MaxRDY() int64 {
return c.maxRdyCount
}

// LastRdyTime returns the time of the last non-zero RDY
// update for this connection
func (c *Conn) LastRdyTime() time.Time {
return time.Unix(0, atomic.LoadInt64(&c.lastRdyTimestamp))
}
Expand Down Expand Up @@ -523,7 +523,6 @@ func (c *Conn) readLoop() {
msg.Delegate = delegate
msg.NSQDAddress = c.String()

atomic.AddInt64(&c.rdyCount, -1)
atomic.AddInt64(&c.messagesInFlight, 1)
atomic.StoreInt64(&c.lastMsgTimestamp, time.Now().UnixNano())

Expand Down
27 changes: 7 additions & 20 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,11 +111,11 @@ type Consumer struct {

needRDYRedistributed int32

backoffMtx sync.RWMutex
backoffMtx sync.Mutex

incomingMessages chan *Message

rdyRetryMtx sync.RWMutex
rdyRetryMtx sync.Mutex
rdyRetryTimers map[string]*time.Timer

pendingConnections map[string]*Conn
Expand Down Expand Up @@ -264,7 +264,7 @@ func (r *Consumer) perConnMaxInFlight() int64 {
// before being able to receive more messages (ie. RDY count of 0 and not exiting)
func (r *Consumer) IsStarved() bool {
for _, conn := range r.conns() {
threshold := int64(float64(atomic.LoadInt64(&conn.lastRdyCount)) * 0.85)
threshold := int64(float64(conn.RDY()) * 0.85)
inFlight := atomic.LoadInt64(&conn.messagesInFlight)
if inFlight >= threshold && inFlight > 0 && !conn.IsClosing() {
return true
Expand Down Expand Up @@ -642,10 +642,8 @@ func (r *Consumer) DisconnectFromNSQLookupd(addr string) error {
}

func (r *Consumer) onConnMessage(c *Conn, msg *Message) {
atomic.AddInt64(&r.totalRdyCount, -1)
atomic.AddUint64(&r.messagesReceived, 1)
r.incomingMessages <- msg
r.maybeUpdateRDY(c)
}

func (r *Consumer) onConnMessageFinished(c *Conn, msg *Message) {
Expand Down Expand Up @@ -771,11 +769,10 @@ func (r *Consumer) startStopContinueBackoff(conn *Conn, signal backoffSignal) {
// max backoff/normal rate (by ensuring that we dont continually incr/decr
// the counter during a backoff period)
r.backoffMtx.Lock()
defer r.backoffMtx.Unlock()
if r.inBackoffTimeout() {
r.backoffMtx.Unlock()
return
}
defer r.backoffMtx.Unlock()

// update backoff state
backoffUpdated := false
Expand Down Expand Up @@ -879,19 +876,9 @@ func (r *Consumer) maybeUpdateRDY(conn *Conn) {
return
}

remain := conn.RDY()
lastRdyCount := conn.LastRDY()
count := r.perConnMaxInFlight()

// refill when at 1, or at 25%, or if connections have changed and we're imbalanced
if remain <= 1 || remain < (lastRdyCount/4) || (count > 0 && count < remain) {
r.log(LogLevelDebug, "(%s) sending RDY %d (%d remain from last RDY %d)",
conn, count, remain, lastRdyCount)
r.updateRDY(conn, count)
} else {
r.log(LogLevelDebug, "(%s) skip sending RDY %d (%d remain out of last RDY %d)",
conn, count, remain, lastRdyCount)
}
r.log(LogLevelDebug, "(%s) sending RDY %d", conn, count)
r.updateRDY(conn, count)
}

func (r *Consumer) rdyLoop() {
Expand Down Expand Up @@ -961,7 +948,7 @@ func (r *Consumer) sendRDY(c *Conn, count int64) error {
return nil
}

atomic.AddInt64(&r.totalRdyCount, -c.RDY()+count)
atomic.AddInt64(&r.totalRdyCount, count-c.RDY())
c.SetRDY(count)
err := c.WriteCommand(Ready(int(count)))
if err != nil {
Expand Down
13 changes: 1 addition & 12 deletions consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"log"
"net"
"net/http"
"os"
"strconv"
"strings"
"testing"
Expand Down Expand Up @@ -116,11 +115,6 @@ func TestConsumerTLSSnappy(t *testing.T) {
}

func TestConsumerTLSClientCert(t *testing.T) {
envDl := os.Getenv("NSQ_DOWNLOAD")
if strings.HasPrefix(envDl, "nsq-0.2.24") || strings.HasPrefix(envDl, "nsq-0.2.27") {
t.Log("skipping due to older nsqd")
return
}
cert, _ := tls.LoadX509KeyPair("./test/client.pem", "./test/client.key")
consumerTest(t, func(c *Config) {
c.TlsV1 = true
Expand All @@ -132,11 +126,6 @@ func TestConsumerTLSClientCert(t *testing.T) {
}

func TestConsumerTLSClientCertViaSet(t *testing.T) {
envDl := os.Getenv("NSQ_DOWNLOAD")
if strings.HasPrefix(envDl, "nsq-0.2.24") || strings.HasPrefix(envDl, "nsq-0.2.27") {
t.Log("skipping due to older nsqd")
return
}
consumerTest(t, func(c *Config) {
c.Set("tls_v1", true)
c.Set("tls_cert", "./test/client.pem")
Expand Down Expand Up @@ -168,7 +157,7 @@ func consumerTest(t *testing.T, cb func(c *Config)) {
}
topicName = topicName + strconv.Itoa(int(time.Now().Unix()))
q, _ := NewConsumer(topicName, "ch", config)
q.SetLogger(log.New(os.Stderr, "", log.Flags()), LogLevelDebug)
q.SetLogger(newTestLogger(t), LogLevelDebug)

h := &MyTestHandler{
t: t,
Expand Down
Loading

0 comments on commit 61f49c0

Please sign in to comment.