Skip to content

Commit

Permalink
channel: improve deliverability to zone and region local
Browse files Browse the repository at this point in the history
  • Loading branch information
jehiah committed Mar 20, 2024
1 parent 4631064 commit 8fc9060
Showing 1 changed file with 27 additions and 5 deletions.
32 changes: 27 additions & 5 deletions nsqd/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,22 +316,44 @@ func (c *Channel) PutMessage(m *Message) error {

func (c *Channel) put(m *Message) error {
if c.topologyAwareConsumption {
// Attempt zone local, region local and finally the memory channel
// we do this to ensure that we preferentially deliver messages based on toplogy
//
// Because messagePump is intermittently unavailable while writing a msg to a client
// we continue to have higher priority channels in the select loop, this means at each
// attempt a higher priority channel can still win
select {
case c.zoneLocalMsgChan <- m:
return nil
default:
}
select {
case c.zoneLocalMsgChan <- m:
return nil
case c.regionLocalMsgChan <- m:
return nil
default:
}

select {
case c.zoneLocalMsgChan <- m:
return nil
case c.regionLocalMsgChan <- m:
return nil
case c.memoryMsgChan <- m:
return nil
default:
}

} else {

select {
case c.memoryMsgChan <- m:
return nil
default:
}
}
select {
case c.memoryMsgChan <- m:
return nil
default:
}

err := writeMessageToBackend(m, c.backend)
c.nsqd.SetHealth(err)
if err != nil {
Expand Down

0 comments on commit 8fc9060

Please sign in to comment.