Skip to content

Commit

Permalink
Backport of state: ensure that identical manual virtual IP updates re…
Browse files Browse the repository at this point in the history
…sult in not bumping the modify indexes into release/1.20.x (#21969)

The consul-k8s endpoints controller issues catalog register and manual virtual ip
updates without first checking to see if the updates would be effectively not
changing anything. This is supposed to be reasonable because the state store
functions do the check for a no-op update and should discard repeat updates so
that downstream blocking queries watching one of the resources don't fire
pointlessly (and CPU wastefully).

While this is true for the check/service/node catalog updates, it is not true for
the "manual virtual ip" updates triggered by the PUT /v1/internal/service-virtual-ip.
Forcing the connect injector pod to recycle while watching some lightly
modified FSM code can show that a lot of updates are of the update list of ips
from [A] to [A]. Immediately following this stray update you can see a lot of
activity in proxycfg and xds packages waking up due to blocking queries
triggered by this.

This PR skips updates that change nothing both:

- at the RPC layer before passing it to raft (ideally)
- if the write does make it through raft and get applied to the FSM (failsafe)
  • Loading branch information
hc-github-team-consul-core authored Nov 25, 2024
1 parent 5aca812 commit d335aa3
Show file tree
Hide file tree
Showing 7 changed files with 437 additions and 91 deletions.
3 changes: 3 additions & 0 deletions .changelog/21909.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
state: ensure that identical manual virtual IP updates result in not bumping the modify indexes
```
32 changes: 28 additions & 4 deletions agent/consul/internal_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,18 @@ import (
"fmt"
"net"

hashstructure_v2 "github.com/mitchellh/hashstructure/v2"
"golang.org/x/exp/maps"

"github.com/hashicorp/go-bexpr"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/serf/serf"
hashstructure_v2 "github.com/mitchellh/hashstructure/v2"

"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/lib/stringslice"
)

const MaximumManualVIPsPerService = 8
Expand Down Expand Up @@ -782,17 +785,38 @@ func (m *Internal) AssignManualServiceVIPs(args *structs.AssignServiceManualVIPs
return fmt.Errorf("cannot associate more than %d manual virtual IPs with the same service", MaximumManualVIPsPerService)
}

vipMap := make(map[string]struct{})
for _, ip := range args.ManualVIPs {
parsedIP := net.ParseIP(ip)
if parsedIP == nil || parsedIP.To4() == nil {
return fmt.Errorf("%q is not a valid IPv4 address", parsedIP.String())
}
vipMap[ip] = struct{}{}
}
// Silently ignore duplicates.
args.ManualVIPs = maps.Keys(vipMap)

psn := structs.PeeredServiceName{
ServiceName: structs.NewServiceName(args.Service, &args.EnterpriseMeta),
}

// Check to see if we can skip the raft apply entirely.
{
existingIPs, err := m.srv.fsm.State().ServiceManualVIPs(psn)
if err != nil {
return fmt.Errorf("error checking for existing manual ips for service: %w", err)
}
if existingIPs != nil && stringslice.EqualMapKeys(existingIPs.ManualIPs, vipMap) {
*reply = structs.AssignServiceManualVIPsResponse{
Found: true,
UnassignedFrom: nil,
}
return nil
}
}

req := state.ServiceVirtualIP{
Service: structs.PeeredServiceName{
ServiceName: structs.NewServiceName(args.Service, &args.EnterpriseMeta),
},
Service: psn,
ManualIPs: args.ManualVIPs,
}
resp, err := m.srv.raftApplyMsgpack(structs.UpdateVirtualIPRequestType, req)
Expand Down
64 changes: 51 additions & 13 deletions agent/consul/internal_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ import (
"testing"
"time"

"github.com/hashicorp/consul-net-rpc/net/rpc"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

msgpackrpc "github.com/hashicorp/consul-net-rpc/net-rpc-msgpackrpc"
"github.com/hashicorp/consul-net-rpc/net/rpc"

"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/structs"
Expand Down Expand Up @@ -3885,36 +3885,66 @@ func TestInternal_AssignManualServiceVIPs(t *testing.T) {
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Internal.AssignManualServiceVIPs", req, &resp))

type testcase struct {
name string
req structs.AssignServiceManualVIPsRequest
expect structs.AssignServiceManualVIPsResponse
expectErr string
name string
req structs.AssignServiceManualVIPsRequest
expect structs.AssignServiceManualVIPsResponse
expectAgain structs.AssignServiceManualVIPsResponse
expectErr string
expectIPs []string
}
run := func(t *testing.T, tc testcase) {

run := func(t *testing.T, tc testcase, again bool) {
if tc.expectErr != "" && again {
return // we don't retest known errors
}

var resp structs.AssignServiceManualVIPsResponse
err := msgpackrpc.CallWithCodec(codec, "Internal.AssignManualServiceVIPs", tc.req, &resp)
if tc.expectErr != "" {
require.Error(t, err)
require.Contains(t, err.Error(), tc.expectErr)
return
testutil.RequireErrorContains(t, err, tc.expectErr)
} else {
if again {
require.Equal(t, tc.expectAgain, resp)
} else {
require.Equal(t, tc.expect, resp)
}

psn := structs.PeeredServiceName{ServiceName: structs.NewServiceName(tc.req.Service, nil)}
got, err := s1.fsm.State().ServiceManualVIPs(psn)
require.NoError(t, err)
require.NotNil(t, got)
require.Equal(t, tc.expectIPs, got.ManualIPs)
}
require.Equal(t, tc.expect, resp)
}

tcs := []testcase{
{
name: "successful manual ip assignment",
req: structs.AssignServiceManualVIPsRequest{
Service: "web",
ManualVIPs: []string{"1.1.1.1", "2.2.2.2"},
},
expect: structs.AssignServiceManualVIPsResponse{Found: true},
expectIPs: []string{"1.1.1.1", "2.2.2.2"},
expect: structs.AssignServiceManualVIPsResponse{Found: true},
expectAgain: structs.AssignServiceManualVIPsResponse{Found: true},
},
{
name: "successfully ignoring duplicates",
req: structs.AssignServiceManualVIPsRequest{
Service: "web",
ManualVIPs: []string{"1.2.3.4", "5.6.7.8", "1.2.3.4", "5.6.7.8"},
},
expectIPs: []string{"1.2.3.4", "5.6.7.8"},
expect: structs.AssignServiceManualVIPsResponse{Found: true},
expectAgain: structs.AssignServiceManualVIPsResponse{Found: true},
},
{
name: "reassign existing ip",
req: structs.AssignServiceManualVIPsRequest{
Service: "web",
ManualVIPs: []string{"8.8.8.8"},
},
expectIPs: []string{"8.8.8.8"},
expect: structs.AssignServiceManualVIPsResponse{
Found: true,
UnassignedFrom: []structs.PeeredServiceName{
Expand All @@ -3923,20 +3953,28 @@ func TestInternal_AssignManualServiceVIPs(t *testing.T) {
},
},
},
// When we repeat this operation the second time it's a no-op.
expectAgain: structs.AssignServiceManualVIPsResponse{Found: true},
},
{
name: "invalid ip",
req: structs.AssignServiceManualVIPsRequest{
Service: "web",
ManualVIPs: []string{"3.3.3.3", "invalid"},
},
expect: structs.AssignServiceManualVIPsResponse{},
expectErr: "not a valid",
},
}
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
run(t, tc)
t.Run("initial", func(t *testing.T) {
run(t, tc, false)
})
if tc.expectErr == "" {
t.Run("repeat", func(t *testing.T) {
run(t, tc, true) // only repeat a write if it isn't an known error
})
}
})
}
}
54 changes: 44 additions & 10 deletions agent/consul/state/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"fmt"
"net"
"reflect"
"slices"
"sort"
"strings"

"github.com/hashicorp/go-memdb"
Expand All @@ -18,6 +20,7 @@ import (
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/lib/maps"
"github.com/hashicorp/consul/lib/stringslice"
"github.com/hashicorp/consul/types"
)

Expand Down Expand Up @@ -1106,6 +1109,9 @@ func (s *Store) AssignManualServiceVIPs(idx uint64, psn structs.PeeredServiceNam
for _, ip := range ips {
assignedIPs[ip] = struct{}{}
}

txnNeedsCommit := false

modifiedEntries := make(map[structs.PeeredServiceName]struct{})
for ip := range assignedIPs {
entry, err := tx.First(tableServiceVirtualIPs, indexManualVIPs, psn.ServiceName.PartitionOrDefault(), ip)
Expand All @@ -1118,7 +1124,13 @@ func (s *Store) AssignManualServiceVIPs(idx uint64, psn structs.PeeredServiceNam
}

newEntry := entry.(ServiceVirtualIP)
if newEntry.Service.ServiceName.Matches(psn.ServiceName) {

var (
thisServiceName = newEntry.Service.ServiceName
thisPeer = newEntry.Service.Peer
)

if thisServiceName.Matches(psn.ServiceName) && thisPeer == psn.Peer {
continue
}

Expand All @@ -1130,13 +1142,20 @@ func (s *Store) AssignManualServiceVIPs(idx uint64, psn structs.PeeredServiceNam
filteredIPs = append(filteredIPs, existingIP)
}
}
sort.Strings(filteredIPs)

newEntry.ManualIPs = filteredIPs
newEntry.ModifyIndex = idx
if err := tx.Insert(tableServiceVirtualIPs, newEntry); err != nil {
return false, nil, fmt.Errorf("failed inserting service virtual IP entry: %s", err)
}
modifiedEntries[newEntry.Service] = struct{}{}

if err := updateVirtualIPMaxIndexes(tx, idx, thisServiceName.PartitionOrDefault(), thisPeer); err != nil {
return false, nil, err
}

txnNeedsCommit = true
}

entry, err := tx.First(tableServiceVirtualIPs, indexID, psn)
Expand All @@ -1149,23 +1168,37 @@ func (s *Store) AssignManualServiceVIPs(idx uint64, psn structs.PeeredServiceNam
}

newEntry := entry.(ServiceVirtualIP)
newEntry.ManualIPs = ips
newEntry.ModifyIndex = idx

if err := tx.Insert(tableServiceVirtualIPs, newEntry); err != nil {
return false, nil, fmt.Errorf("failed inserting service virtual IP entry: %s", err)
}
if err := updateVirtualIPMaxIndexes(tx, idx, psn.ServiceName.PartitionOrDefault(), psn.Peer); err != nil {
return false, nil, err
// Check to see if the slice already contains the same ips.
if !stringslice.EqualMapKeys(newEntry.ManualIPs, assignedIPs) {
newEntry.ManualIPs = slices.Clone(ips)
newEntry.ModifyIndex = idx

sort.Strings(newEntry.ManualIPs)

if err := tx.Insert(tableServiceVirtualIPs, newEntry); err != nil {
return false, nil, fmt.Errorf("failed inserting service virtual IP entry: %s", err)
}
if err := updateVirtualIPMaxIndexes(tx, idx, psn.ServiceName.PartitionOrDefault(), psn.Peer); err != nil {
return false, nil, err
}
txnNeedsCommit = true
}
if err = tx.Commit(); err != nil {
return false, nil, err

if txnNeedsCommit {
if err = tx.Commit(); err != nil {
return false, nil, err
}
}

return true, maps.SliceOfKeys(modifiedEntries), nil
}

func updateVirtualIPMaxIndexes(txn WriteTxn, idx uint64, partition, peerName string) error {
// update global max index (for snapshots)
if err := indexUpdateMaxTxn(txn, idx, tableServiceVirtualIPs); err != nil {
return fmt.Errorf("failed while updating index: %w", err)
}
// update per-partition max index
if err := indexUpdateMaxTxn(txn, idx, partitionedIndexEntryName(tableServiceVirtualIPs, partition)); err != nil {
return fmt.Errorf("failed while updating partitioned index: %w", err)
Expand Down Expand Up @@ -3086,6 +3119,7 @@ func servicesVirtualIPsTxn(tx ReadTxn, ws memdb.WatchSet) (uint64, []ServiceVirt
vips = append(vips, vip)
}

// Pull from the global one
idx := maxIndexWatchTxn(tx, nil, tableServiceVirtualIPs)

return idx, vips, nil
Expand Down
Loading

0 comments on commit d335aa3

Please sign in to comment.