Skip to content

Commit

Permalink
state: ensure that identical manual virtual IP updates result in not …
Browse files Browse the repository at this point in the history
…bumping the modify indexes (#21909)

The consul-k8s endpoints controller issues catalog register and manual virtual ip
updates without first checking to see if the updates would be effectively not
changing anything. This is supposed to be reasonable because the state store
functions do the check for a no-op update and should discard repeat updates so
that downstream blocking queries watching one of the resources don't fire
pointlessly (and CPU wastefully).

While this is true for the check/service/node catalog updates, it is not true for
the "manual virtual ip" updates triggered by the PUT /v1/internal/service-virtual-ip.
Forcing the connect injector pod to recycle while watching some lightly
modified FSM code can show that a lot of updates are of the update list of ips
from [A] to [A]. Immediately following this stray update you can see a lot of
activity in proxycfg and xds packages waking up due to blocking queries
triggered by this.

This PR skips updates that change nothing both:

- at the RPC layer before passing it to raft (ideally)
- if the write does make it through raft and get applied to the FSM (failsafe)
  • Loading branch information
rboyer authored Nov 22, 2024
1 parent bbb2e79 commit c81dc8c
Show file tree
Hide file tree
Showing 7 changed files with 440 additions and 91 deletions.
3 changes: 3 additions & 0 deletions .changelog/21909.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
state: ensure that identical manual virtual IP updates result in not bumping the modify indexes
```
32 changes: 28 additions & 4 deletions agent/consul/internal_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,18 @@ import (
"fmt"
"net"

hashstructure_v2 "github.com/mitchellh/hashstructure/v2"
"golang.org/x/exp/maps"

"github.com/hashicorp/go-bexpr"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/serf/serf"
hashstructure_v2 "github.com/mitchellh/hashstructure/v2"

"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/lib/stringslice"
)

const MaximumManualVIPsPerService = 8
Expand Down Expand Up @@ -782,17 +785,38 @@ func (m *Internal) AssignManualServiceVIPs(args *structs.AssignServiceManualVIPs
return fmt.Errorf("cannot associate more than %d manual virtual IPs with the same service", MaximumManualVIPsPerService)
}

vipMap := make(map[string]struct{})
for _, ip := range args.ManualVIPs {
parsedIP := net.ParseIP(ip)
if parsedIP == nil || parsedIP.To4() == nil {
return fmt.Errorf("%q is not a valid IPv4 address", parsedIP.String())
}
vipMap[ip] = struct{}{}
}
// Silently ignore duplicates.
args.ManualVIPs = maps.Keys(vipMap)

psn := structs.PeeredServiceName{
ServiceName: structs.NewServiceName(args.Service, &args.EnterpriseMeta),
}

// Check to see if we can skip the raft apply entirely.
{
existingIPs, err := m.srv.fsm.State().ServiceManualVIPs(psn)
if err != nil {
return fmt.Errorf("error checking for existing manual ips for service: %w", err)
}
if existingIPs != nil && stringslice.EqualMapKeys(existingIPs.ManualIPs, vipMap) {
*reply = structs.AssignServiceManualVIPsResponse{
Found: true,
UnassignedFrom: nil,
}
return nil
}
}

req := state.ServiceVirtualIP{
Service: structs.PeeredServiceName{
ServiceName: structs.NewServiceName(args.Service, &args.EnterpriseMeta),
},
Service: psn,
ManualIPs: args.ManualVIPs,
}
resp, err := m.srv.raftApplyMsgpack(structs.UpdateVirtualIPRequestType, req)
Expand Down
67 changes: 54 additions & 13 deletions agent/consul/internal_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ import (
"testing"
"time"

"github.com/hashicorp/consul-net-rpc/net/rpc"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

msgpackrpc "github.com/hashicorp/consul-net-rpc/net-rpc-msgpackrpc"
"github.com/hashicorp/consul-net-rpc/net/rpc"

"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/structs"
Expand Down Expand Up @@ -3885,36 +3885,69 @@ func TestInternal_AssignManualServiceVIPs(t *testing.T) {
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Internal.AssignManualServiceVIPs", req, &resp))

type testcase struct {
name string
req structs.AssignServiceManualVIPsRequest
expect structs.AssignServiceManualVIPsResponse
expectErr string
name string
req structs.AssignServiceManualVIPsRequest
expect structs.AssignServiceManualVIPsResponse
expectAgain structs.AssignServiceManualVIPsResponse
expectErr string
expectIPs []string
}
run := func(t *testing.T, tc testcase) {

run := func(t *testing.T, tc testcase, again bool) {
if tc.expectErr != "" && again {
return // we don't retest known errors
}

var resp structs.AssignServiceManualVIPsResponse
idx1 := s1.raft.CommitIndex()
err := msgpackrpc.CallWithCodec(codec, "Internal.AssignManualServiceVIPs", tc.req, &resp)
idx2 := s1.raft.CommitIndex()
if tc.expectErr != "" {
require.Error(t, err)
require.Contains(t, err.Error(), tc.expectErr)
return
testutil.RequireErrorContains(t, err, tc.expectErr)
} else {
if again {
require.Equal(t, tc.expectAgain, resp)
require.Equal(t, idx1, idx2, "no raft operations occurred")
} else {
require.Equal(t, tc.expect, resp)
}

psn := structs.PeeredServiceName{ServiceName: structs.NewServiceName(tc.req.Service, nil)}
got, err := s1.fsm.State().ServiceManualVIPs(psn)
require.NoError(t, err)
require.NotNil(t, got)
require.Equal(t, tc.expectIPs, got.ManualIPs)
}
require.Equal(t, tc.expect, resp)
}

tcs := []testcase{
{
name: "successful manual ip assignment",
req: structs.AssignServiceManualVIPsRequest{
Service: "web",
ManualVIPs: []string{"1.1.1.1", "2.2.2.2"},
},
expect: structs.AssignServiceManualVIPsResponse{Found: true},
expectIPs: []string{"1.1.1.1", "2.2.2.2"},
expect: structs.AssignServiceManualVIPsResponse{Found: true},
expectAgain: structs.AssignServiceManualVIPsResponse{Found: true},
},
{
name: "successfully ignoring duplicates",
req: structs.AssignServiceManualVIPsRequest{
Service: "web",
ManualVIPs: []string{"1.2.3.4", "5.6.7.8", "1.2.3.4", "5.6.7.8"},
},
expectIPs: []string{"1.2.3.4", "5.6.7.8"},
expect: structs.AssignServiceManualVIPsResponse{Found: true},
expectAgain: structs.AssignServiceManualVIPsResponse{Found: true},
},
{
name: "reassign existing ip",
req: structs.AssignServiceManualVIPsRequest{
Service: "web",
ManualVIPs: []string{"8.8.8.8"},
},
expectIPs: []string{"8.8.8.8"},
expect: structs.AssignServiceManualVIPsResponse{
Found: true,
UnassignedFrom: []structs.PeeredServiceName{
Expand All @@ -3923,20 +3956,28 @@ func TestInternal_AssignManualServiceVIPs(t *testing.T) {
},
},
},
// When we repeat this operation the second time it's a no-op.
expectAgain: structs.AssignServiceManualVIPsResponse{Found: true},
},
{
name: "invalid ip",
req: structs.AssignServiceManualVIPsRequest{
Service: "web",
ManualVIPs: []string{"3.3.3.3", "invalid"},
},
expect: structs.AssignServiceManualVIPsResponse{},
expectErr: "not a valid",
},
}
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
run(t, tc)
t.Run("initial", func(t *testing.T) {
run(t, tc, false)
})
if tc.expectErr == "" {
t.Run("repeat", func(t *testing.T) {
run(t, tc, true) // only repeat a write if it isn't an known error
})
}
})
}
}
54 changes: 44 additions & 10 deletions agent/consul/state/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"fmt"
"net"
"reflect"
"slices"
"sort"
"strings"

"github.com/hashicorp/go-memdb"
Expand All @@ -18,6 +20,7 @@ import (
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/lib/maps"
"github.com/hashicorp/consul/lib/stringslice"
"github.com/hashicorp/consul/types"
)

Expand Down Expand Up @@ -1106,6 +1109,9 @@ func (s *Store) AssignManualServiceVIPs(idx uint64, psn structs.PeeredServiceNam
for _, ip := range ips {
assignedIPs[ip] = struct{}{}
}

txnNeedsCommit := false

modifiedEntries := make(map[structs.PeeredServiceName]struct{})
for ip := range assignedIPs {
entry, err := tx.First(tableServiceVirtualIPs, indexManualVIPs, psn.ServiceName.PartitionOrDefault(), ip)
Expand All @@ -1118,7 +1124,13 @@ func (s *Store) AssignManualServiceVIPs(idx uint64, psn structs.PeeredServiceNam
}

newEntry := entry.(ServiceVirtualIP)
if newEntry.Service.ServiceName.Matches(psn.ServiceName) {

var (
thisServiceName = newEntry.Service.ServiceName
thisPeer = newEntry.Service.Peer
)

if thisServiceName.Matches(psn.ServiceName) && thisPeer == psn.Peer {
continue
}

Expand All @@ -1130,13 +1142,20 @@ func (s *Store) AssignManualServiceVIPs(idx uint64, psn structs.PeeredServiceNam
filteredIPs = append(filteredIPs, existingIP)
}
}
sort.Strings(filteredIPs)

newEntry.ManualIPs = filteredIPs
newEntry.ModifyIndex = idx
if err := tx.Insert(tableServiceVirtualIPs, newEntry); err != nil {
return false, nil, fmt.Errorf("failed inserting service virtual IP entry: %s", err)
}
modifiedEntries[newEntry.Service] = struct{}{}

if err := updateVirtualIPMaxIndexes(tx, idx, thisServiceName.PartitionOrDefault(), thisPeer); err != nil {
return false, nil, err
}

txnNeedsCommit = true
}

entry, err := tx.First(tableServiceVirtualIPs, indexID, psn)
Expand All @@ -1149,23 +1168,37 @@ func (s *Store) AssignManualServiceVIPs(idx uint64, psn structs.PeeredServiceNam
}

newEntry := entry.(ServiceVirtualIP)
newEntry.ManualIPs = ips
newEntry.ModifyIndex = idx

if err := tx.Insert(tableServiceVirtualIPs, newEntry); err != nil {
return false, nil, fmt.Errorf("failed inserting service virtual IP entry: %s", err)
}
if err := updateVirtualIPMaxIndexes(tx, idx, psn.ServiceName.PartitionOrDefault(), psn.Peer); err != nil {
return false, nil, err
// Check to see if the slice already contains the same ips.
if !stringslice.EqualMapKeys(newEntry.ManualIPs, assignedIPs) {
newEntry.ManualIPs = slices.Clone(ips)
newEntry.ModifyIndex = idx

sort.Strings(newEntry.ManualIPs)

if err := tx.Insert(tableServiceVirtualIPs, newEntry); err != nil {
return false, nil, fmt.Errorf("failed inserting service virtual IP entry: %s", err)
}
if err := updateVirtualIPMaxIndexes(tx, idx, psn.ServiceName.PartitionOrDefault(), psn.Peer); err != nil {
return false, nil, err
}
txnNeedsCommit = true
}
if err = tx.Commit(); err != nil {
return false, nil, err

if txnNeedsCommit {
if err = tx.Commit(); err != nil {
return false, nil, err
}
}

return true, maps.SliceOfKeys(modifiedEntries), nil
}

func updateVirtualIPMaxIndexes(txn WriteTxn, idx uint64, partition, peerName string) error {
// update global max index (for snapshots)
if err := indexUpdateMaxTxn(txn, idx, tableServiceVirtualIPs); err != nil {
return fmt.Errorf("failed while updating index: %w", err)
}
// update per-partition max index
if err := indexUpdateMaxTxn(txn, idx, partitionedIndexEntryName(tableServiceVirtualIPs, partition)); err != nil {
return fmt.Errorf("failed while updating partitioned index: %w", err)
Expand Down Expand Up @@ -3086,6 +3119,7 @@ func servicesVirtualIPsTxn(tx ReadTxn, ws memdb.WatchSet) (uint64, []ServiceVirt
vips = append(vips, vip)
}

// Pull from the global one
idx := maxIndexWatchTxn(tx, nil, tableServiceVirtualIPs)

return idx, vips, nil
Expand Down
Loading

0 comments on commit c81dc8c

Please sign in to comment.