Skip to content

Commit

Permalink
fix(consumer): send on closed channel
Browse files Browse the repository at this point in the history
Rejects the connection and quickly exits the reconnect loop after the consumer stops.

Signed-off-by: Allenxuxu <[email protected]>
  • Loading branch information
Allenxuxu committed Oct 9, 2021
1 parent 2a2babe commit 6bcc7f2
Showing 1 changed file with 27 additions and 15 deletions.
42 changes: 27 additions & 15 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -581,6 +581,12 @@ func (r *Consumer) ConnectToNSQD(addr string) error {

r.mtx.Lock()
delete(r.pendingConnections, addr)
if atomic.LoadInt32(&r.stopFlag) == 1 {
r.mtx.Unlock()

conn.Close()
return errors.New("consumer stopped")
}
r.connections[addr] = conn
r.mtx.Unlock()

Expand Down Expand Up @@ -751,25 +757,31 @@ func (r *Consumer) onConnClose(c *Conn) {
// there are no lookupd and we still have this nsqd TCP address in our list...
// try to reconnect after a bit
go func(addr string) {
ticker := time.NewTicker(r.config.LookupdPollInterval)
defer ticker.Stop()

for {
r.log(LogLevelInfo, "(%s) re-connecting in %s", addr, r.config.LookupdPollInterval)
time.Sleep(r.config.LookupdPollInterval)
if atomic.LoadInt32(&r.stopFlag) == 1 {
break
}
r.mtx.RLock()
reconnect := indexOf(addr, r.nsqdTCPAddrs) >= 0
r.mtx.RUnlock()
if !reconnect {
r.log(LogLevelWarning, "(%s) skipped reconnect after removal...", addr)
select {
case <-ticker.C:
r.mtx.RLock()
reconnect := indexOf(addr, r.nsqdTCPAddrs) >= 0
r.mtx.RUnlock()
if !reconnect {
r.log(LogLevelWarning, "(%s) skipped reconnect after removal...", addr)
return
}

err := r.ConnectToNSQD(addr)
if err != nil && err != ErrAlreadyConnected {
r.log(LogLevelError, "(%s) error connecting to nsqd - %s", addr, err)
continue
}

return
case <-r.exitChan:
return
}
err := r.ConnectToNSQD(addr)
if err != nil && err != ErrAlreadyConnected {
r.log(LogLevelError, "(%s) error connecting to nsqd - %s", addr, err)
continue
}
break
}
}(c.String())
}
Expand Down

0 comments on commit 6bcc7f2

Please sign in to comment.