Skip to content

Commit

Permalink
nsqd: discover topic/channel paused state on new topic discovery
Browse files Browse the repository at this point in the history
  • Loading branch information
thekingofworld committed Dec 5, 2020
1 parent 1124ec8 commit f513954
Show file tree
Hide file tree
Showing 17 changed files with 593 additions and 59 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,5 @@ require (
)

go 1.13

replace github.com/nsqio/go-nsq v1.0.8 => github.com/thekingofworld/go-nsq v1.0.9-0.20200815080834-015554cb0b90
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@ github.com/mreiferson/go-options v1.0.0 h1:RMLidydGlDWpL+lQTXo0bVIf/XT2CTq7AEJMo
github.com/mreiferson/go-options v1.0.0/go.mod h1:zHtCks/HQvOt8ATyfwVe3JJq2PPuImzXINPRTC03+9w=
github.com/nsqio/go-diskqueue v1.0.0 h1:XRqpx7zTMu9yNVH+cHvA5jEiPNKoYcyEsCVqXP3eFg4=
github.com/nsqio/go-diskqueue v1.0.0/go.mod h1:INuJIxl4ayUsyoNtHL5+9MFPDfSZ0zY93hNY6vhBRsI=
github.com/nsqio/go-nsq v1.0.8 h1:3L2F8tNLlwXXlp2slDUrUWSBn2O3nMh8R1/KEDFTHPk=
github.com/nsqio/go-nsq v1.0.8/go.mod h1:vKq36oyeVXgsS5Q8YEO7WghqidAVXQlcFxzQbQTuDEY=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/thekingofworld/go-nsq v1.0.9-0.20200815080834-015554cb0b90 h1:js7rqe9IqkTNFcyXjbmuOo0nV3Z2HBM4yNc8SWqByUs=
github.com/thekingofworld/go-nsq v1.0.9-0.20200815080834-015554cb0b90/go.mod h1:vKq36oyeVXgsS5Q8YEO7WghqidAVXQlcFxzQbQTuDEY=
golang.org/x/sys v0.0.0-20191224085550-c709ea063b76 h1:Dho5nD6R3PcW2SH1or8vS0dszDaXRxIw55lBX7XiE5g=
golang.org/x/sys v0.0.0-20191224085550-c709ea063b76/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
Expand Down
88 changes: 78 additions & 10 deletions internal/clusterinfo/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,14 +119,14 @@ func (c *ClusterInfo) GetLookupdTopics(lookupdHTTPAddrs []string) ([]string, err

// GetLookupdTopicChannels returns a []string containing a union of all the channels
// from all the given lookupd for the given topic
func (c *ClusterInfo) GetLookupdTopicChannels(topic string, lookupdHTTPAddrs []string) ([]string, error) {
var channels []string
func (c *ClusterInfo) GetLookupdTopicChannels(topic string, lookupdHTTPAddrs []string) ([]ChannelState, error) {
var lock sync.Mutex
var wg sync.WaitGroup
var errs []error

type respType struct {
Channels []string `json:"channels"`
topicChannelsMeta := &TopicChannelsMeta{
Channels: []string{},
ChannelsMeta: map[string]*ChannelMeta{},
}

for _, addr := range lookupdHTTPAddrs {
Expand All @@ -137,7 +137,7 @@ func (c *ClusterInfo) GetLookupdTopicChannels(topic string, lookupdHTTPAddrs []s
endpoint := fmt.Sprintf("http://%s/channels?topic=%s", addr, url.QueryEscape(topic))
c.logf("CI: querying nsqlookupd %s", endpoint)

var resp respType
var resp TopicChannelsMeta
err := c.client.GETV1(endpoint, &resp)
if err != nil {
lock.Lock()
Expand All @@ -148,7 +148,15 @@ func (c *ClusterInfo) GetLookupdTopicChannels(topic string, lookupdHTTPAddrs []s

lock.Lock()
defer lock.Unlock()
channels = append(channels, resp.Channels...)
topicChannelsMeta.Channels = append(topicChannelsMeta.Channels, resp.Channels...)
for channelName, channelMeta := range resp.ChannelsMeta {
if curMeta, ok := topicChannelsMeta.ChannelsMeta[channelName]; ok {
if curMeta != nil && curMeta.Paused == true {
continue //one of the lookupd has returned paused,so just continue
}
}
topicChannelsMeta.ChannelsMeta[channelName] = channelMeta
}
}(addr)
}
wg.Wait()
Expand All @@ -157,13 +165,73 @@ func (c *ClusterInfo) GetLookupdTopicChannels(topic string, lookupdHTTPAddrs []s
return nil, fmt.Errorf("Failed to query any nsqlookupd: %s", ErrList(errs))
}

channels = stringy.Uniq(channels)
sort.Strings(channels)
topicChannelsMeta.Channels = stringy.Uniq(topicChannelsMeta.Channels)
sort.Strings(topicChannelsMeta.Channels)

var channelStates []ChannelState
for _, channelName := range topicChannelsMeta.Channels {
channelState := ChannelState{
Name: channelName,
Paused: false,
}
if meta, ok := topicChannelsMeta.ChannelsMeta[channelName]; ok && meta != nil {
channelState.Paused = meta.Paused
}
channelStates = append(channelStates, channelState)
}

if len(errs) > 0 {
return channelStates, ErrList(errs)
}
return channelStates, nil
}

// GetLookupdTopic return a topicMeta info from all the given lookupd for the given topic
func (c *ClusterInfo) GetLookupdTopic(topic string, lookupdHTTPAddrs []string) (TopicMeta, error) {
var lock sync.Mutex
var wg sync.WaitGroup
var errs []error

topicMeta := TopicMeta{}
type respType struct {
Topics []string `json:"topics"`
TopicsMeta map[string]*TopicMeta `json:"topics_meta"`
}
for _, addr := range lookupdHTTPAddrs {
wg.Add(1)
go func(addr string) {
defer wg.Done()

endpoint := fmt.Sprintf("http://%s/topics?topic=%s", addr, url.QueryEscape(topic))
c.logf("CI: querying nsqlookupd %s", endpoint)

var resp respType
err := c.client.GETV1(endpoint, &resp)
if err != nil {
lock.Lock()
errs = append(errs, err)
lock.Unlock()
return
}

lock.Lock()
defer lock.Unlock()
if metaData, ok := resp.TopicsMeta[topic]; ok {
//if one of the lookupd return paused, that should be paused
if metaData != nil && metaData.Paused == true && topicMeta.Paused == false {
topicMeta.Paused = true
}
}
}(addr)
}
wg.Wait()
if len(errs) == len(lookupdHTTPAddrs) {
return topicMeta, fmt.Errorf("Failed to query any nsqlookupd: %s", ErrList(errs))
}
if len(errs) > 0 {
return channels, ErrList(errs)
return topicMeta, ErrList(errs)
}
return channels, nil
return topicMeta, nil
}

// GetLookupdProducers returns Producers of all the nsqd connected to the given lookupds
Expand Down
18 changes: 18 additions & 0 deletions internal/clusterinfo/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,3 +304,21 @@ type ProducersByHost struct {
func (c ProducersByHost) Less(i, j int) bool {
return c.Producers[i].Hostname < c.Producers[j].Hostname
}

type ChannelState struct {
Name string
Paused bool
}

type ChannelMeta struct {
Paused bool `json:"paused"`
}

type TopicMeta struct {
Paused bool `json:"paused"`
}

type TopicChannelsMeta struct {
Channels []string `json:"channels"`
ChannelsMeta map[string]*ChannelMeta `json:"channels_meta"`
}
8 changes: 6 additions & 2 deletions nsqadmin/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,9 +228,13 @@ func (s *httpServer) topicsHandler(w http.ResponseWriter, req *http.Request, ps
producers, _ := s.ci.GetLookupdTopicProducers(
topicName, s.ctx.nsqadmin.getOpts().NSQLookupdHTTPAddresses)
if len(producers) == 0 {
topicChannels, _ := s.ci.GetLookupdTopicChannels(
var channels []string
channelStates, _ := s.ci.GetLookupdTopicChannels(
topicName, s.ctx.nsqadmin.getOpts().NSQLookupdHTTPAddresses)
topicChannelMap[topicName] = topicChannels
for _, channelState := range channelStates {
channels = append(channels, channelState.Name)
}
topicChannelMap[topicName] = channels
}
}
respond:
Expand Down
18 changes: 18 additions & 0 deletions nsqadmin/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ type TopicsDoc struct {
Topics []interface{} `json:"topics"`
}

type TopicsInactiveDoc struct {
Topics map[string][]string `json:"topics"`
}

type TopicStatsDoc struct {
*clusterinfo.TopicStats
Message string `json:"message"`
Expand Down Expand Up @@ -177,6 +181,20 @@ func TestHTTPTopicsGET(t *testing.T) {
test.Nil(t, err)
test.Equal(t, 1, len(tr.Topics))
test.Equal(t, topicName, tr.Topics[0])

url = fmt.Sprintf("http://%s/api/topics?inactive=true", nsqadmin1.RealHTTPAddr())
req, _ = http.NewRequest("GET", url, nil)
resp, err = client.Do(req)
test.Nil(t, err)
test.Equal(t, 200, resp.StatusCode)
body, _ = ioutil.ReadAll(resp.Body)
resp.Body.Close()

t.Logf("%s", body)
ti := TopicsInactiveDoc{}
err = json.Unmarshal(body, &ti)
test.Nil(t, err)
test.Equal(t, 0, len(ti.Topics))
}

func TestHTTPTopicGET(t *testing.T) {
Expand Down
12 changes: 8 additions & 4 deletions nsqd/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func NewChannel(topicName string, channelName string, ctx *context,
)
}

c.ctx.nsqd.Notify(c)
c.ctx.nsqd.Notify(notifyContext{NotifyTypeRegistration, c})

return c
}
Expand Down Expand Up @@ -164,7 +164,7 @@ func (c *Channel) exit(deleted bool) error {

// since we are explicitly deleting a channel (not just at system exit time)
// de-register this from the lookupd
c.ctx.nsqd.Notify(c)
c.ctx.nsqd.Notify(notifyContext{NotifyTypeUnRegistration, c})
} else {
c.ctx.nsqd.logf(LOG_INFO, "CHANNEL(%s): closing", c.name)
}
Expand Down Expand Up @@ -258,11 +258,15 @@ func (c *Channel) Depth() int64 {
}

func (c *Channel) Pause() error {
return c.doPause(true)
err := c.doPause(true)
c.ctx.nsqd.Notify(notifyContext{NotifyTypeStateUpdate, c})
return err
}

func (c *Channel) UnPause() error {
return c.doPause(false)
err := c.doPause(false)
c.ctx.nsqd.Notify(notifyContext{NotifyTypeStateUpdate, c})
return err
}

func (c *Channel) doPause(pause bool) error {
Expand Down
103 changes: 81 additions & 22 deletions nsqd/lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package nsqd
import (
"bytes"
"encoding/json"
"fmt"
"net"
"os"
"strconv"
Expand All @@ -12,6 +13,17 @@ import (
"github.com/nsqio/nsq/internal/version"
)

const (
NotifyTypeRegistration = iota
NotifyTypeUnRegistration
NotifyTypeStateUpdate
)

type notifyContext struct {
notifyType int
v interface{}
}

func connectCallback(n *NSQD, hostname string) func(*lookupPeer) {
return func(lp *lookupPeer) {
ci := make(map[string]interface{})
Expand Down Expand Up @@ -53,11 +65,26 @@ func connectCallback(n *NSQD, hostname string) func(*lookupPeer) {
n.RLock()
for _, topic := range n.topicMap {
topic.RLock()
if len(topic.channelMap) == 0 {
commands = append(commands, nsq.Register(topic.name, ""))
} else {
for _, channel := range topic.channelMap {
commands = append(commands, nsq.Register(channel.topicName, channel.name))
commands = append(commands, nsq.Register(topic.name, ""))
topicPaused := topic.IsPaused()
if topicPaused { //sync state when topic paused
command, err := nsq.SyncState(topic.name, "", map[string]interface{}{"paused": topicPaused})
if err != nil {
n.logf(LOG_ERROR, "LOOKUPD(%s): SyncState - %s", lp, err)
} else {
commands = append(commands, command)
}
}
for _, channel := range topic.channelMap {
commands = append(commands, nsq.Register(channel.topicName, channel.name))
channelPaused := channel.IsPaused()
if channelPaused { //sync state when channel paused
command, err := nsq.SyncState(channel.topicName, channel.name, map[string]interface{}{"paused": channelPaused})
if err != nil {
n.logf(LOG_ERROR, "LOOKUPD(%s): SyncState - %s", lp, err)
continue
}
commands = append(commands, command)
}
}
topic.RUnlock()
Expand Down Expand Up @@ -118,29 +145,61 @@ func (n *NSQD) lookupLoop() {
}
case val := <-n.notifyChan:
var cmd *nsq.Command
var err error
var branch string

switch val.(type) {
case *Channel:
// notify all nsqlookupds that a new channel exists, or that it's removed
branch = "channel"
channel := val.(*Channel)
if channel.Exiting() == true {
cmd = nsq.UnRegister(channel.topicName, channel.name)
} else {
notifyCtx, ok := val.(notifyContext)
if !ok {
panic("non-notifyContext sent to notifyChan - should never happen")
}
switch notifyCtx.notifyType {
case NotifyTypeRegistration:
switch notifyCtx.v.(type) {
case *Channel:
// notify all nsqlookupds that a new channel exists
branch = "channel"
channel := notifyCtx.v.(*Channel)
cmd = nsq.Register(channel.topicName, channel.name)
case *Topic:
// notify all nsqlookupds that a new topic exists
branch = "topic"
topic := notifyCtx.v.(*Topic)
cmd = nsq.Register(topic.name, "")
}
case *Topic:
// notify all nsqlookupds that a new topic exists, or that it's removed
branch = "topic"
topic := val.(*Topic)
if topic.Exiting() == true {
case NotifyTypeUnRegistration:
switch notifyCtx.v.(type) {
case *Channel:
// notify all nsqlookupds that a new channel removed
branch = "channel"
channel := notifyCtx.v.(*Channel)
cmd = nsq.UnRegister(channel.topicName, channel.name)
case *Topic:
// notify all nsqlookupds that a new topic removed
branch = "topic"
topic := notifyCtx.v.(*Topic)
cmd = nsq.UnRegister(topic.name, "")
} else {
cmd = nsq.Register(topic.name, "")
}
case NotifyTypeStateUpdate:
switch notifyCtx.v.(type) {
case *Channel:
// notify all nsqlookupds that channel state changed
branch = "channel"
channel := notifyCtx.v.(*Channel)
cmd, err = nsq.SyncState(channel.topicName, channel.name, map[string]interface{}{"paused": channel.IsPaused()})
if err != nil {
n.logf(LOG_ERROR, "NSQD: build cmd err: %s", err)
}
case *Topic:
// notify all nsqlookupds that topic state changed
branch = "topic"
topic := notifyCtx.v.(*Topic)
cmd, err = nsq.SyncState(topic.name, "", map[string]interface{}{"paused": topic.IsPaused()})
if err != nil {
n.logf(LOG_ERROR, "NSQD: build cmd err: %s", err)
}
}
default:
panic(fmt.Sprintf("unknown notifyType in notifyContext: %d, should never happen", notifyCtx.notifyType))
}

for _, lookupPeer := range lookupPeers {
n.logf(LOG_INFO, "LOOKUPD(%s): %s %s", lookupPeer, branch, cmd)
_, err := lookupPeer.Command(cmd)
Expand Down
Loading

0 comments on commit f513954

Please sign in to comment.