Skip to content

Commit

Permalink
Implement Compaction support in robustness test
Browse files Browse the repository at this point in the history
Signed-off-by: Marek Siarkowicz <[email protected]>
  • Loading branch information
serathius committed May 28, 2024
1 parent f8d8f6a commit b1032f6
Show file tree
Hide file tree
Showing 11 changed files with 154 additions and 55 deletions.
4 changes: 4 additions & 0 deletions tests/robustness/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,9 +193,13 @@ func (c *RecordingClient) Defragment(ctx context.Context) (*clientv3.DefragmentR
func (c *RecordingClient) Compact(ctx context.Context, rev int64) (*clientv3.CompactResponse, error) {
c.kvMux.Lock()
defer c.kvMux.Unlock()
callTime := time.Since(c.baseTime)
resp, err := c.client.Compact(ctx, rev)
returnTime := time.Since(c.baseTime)
c.kvOperations.AppendCompact(rev, callTime, returnTime, resp, err)
return resp, err
}

func (c *RecordingClient) MemberList(ctx context.Context, opts ...clientv3.OpOption) (*clientv3.MemberListResponse, error) {
c.kvMux.Lock()
defer c.kvMux.Unlock()
Expand Down
7 changes: 6 additions & 1 deletion tests/robustness/model/describe.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ func describeEtcdResponse(request EtcdRequest, response MaybeEtcdResponse) strin
if response.Error != "" {
return fmt.Sprintf("err: %q", response.Error)
}
if response.ClientError != "" {
return fmt.Sprintf("err: %q", response.ClientError)
}
if response.PartialResponse {
return fmt.Sprintf("unknown, rev: %d", response.Revision)
}
Expand All @@ -33,7 +36,7 @@ func describeEtcdResponse(request EtcdRequest, response MaybeEtcdResponse) strin
return fmt.Sprintf("%s, rev: %d", describeRangeResponse(request.Range.RangeOptions, *response.Range), response.Revision)
case Txn:
return fmt.Sprintf("%s, rev: %d", describeTxnResponse(request.Txn, response.Txn), response.Revision)
case LeaseGrant, LeaseRevoke, Defragment:
case LeaseGrant, LeaseRevoke, Defragment, Compact:
if response.Revision == 0 {
return "ok"
}
Expand Down Expand Up @@ -67,6 +70,8 @@ func describeEtcdRequest(request EtcdRequest) string {
return fmt.Sprintf("leaseRevoke(%d)", request.LeaseRevoke.LeaseID)
case Defragment:
return fmt.Sprintf("defragment()")
case Compact:
return fmt.Sprintf("compact(%d)", request.Compact.Revision)
default:
return fmt.Sprintf("<! unknown request type: %q !>", request.Type)
}
Expand Down
41 changes: 33 additions & 8 deletions tests/robustness/model/deterministic.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"sort"

"github.com/anishathalye/porcupine"

"go.etcd.io/etcd/server/v3/storage/mvcc"
)

// DeterministicModel assumes a deterministic execution of etcd requests. All
Expand Down Expand Up @@ -64,10 +66,11 @@ var DeterministicModel = porcupine.Model{
}

type EtcdState struct {
Revision int64
KeyValues map[string]ValueRevision
KeyLeases map[string]int64
Leases map[int64]EtcdLease
Revision int64
CompactRevision int64
KeyValues map[string]ValueRevision
KeyLeases map[string]int64
Leases map[int64]EtcdLease
}

func (s EtcdState) apply(request EtcdRequest, response EtcdResponse) (bool, EtcdState) {
Expand All @@ -77,10 +80,12 @@ func (s EtcdState) apply(request EtcdRequest, response EtcdResponse) (bool, Etcd

func freshEtcdState() EtcdState {
return EtcdState{
Revision: 1,
KeyValues: map[string]ValueRevision{},
KeyLeases: map[string]int64{},
Leases: map[int64]EtcdLease{},
Revision: 1,
// Start from CompactRevision equal -1 as etcd allows client to compact revision 0 for some reason.
CompactRevision: -1,
KeyValues: map[string]ValueRevision{},
KeyLeases: map[string]int64{},
Leases: map[int64]EtcdLease{},
}
}

Expand All @@ -100,6 +105,9 @@ func (s EtcdState) Step(request EtcdRequest) (EtcdState, MaybeEtcdResponse) {
if request.Range.Revision > s.Revision {
return s, MaybeEtcdResponse{Error: ErrEtcdFutureRev.Error()}
}
if request.Range.Revision < s.CompactRevision {
return s, MaybeEtcdResponse{EtcdResponse: EtcdResponse{ClientError: mvcc.ErrCompacted.Error()}}
}
return s, MaybeEtcdResponse{PartialResponse: true, EtcdResponse: EtcdResponse{Revision: s.Revision}}
case Txn:
failure := false
Expand Down Expand Up @@ -178,6 +186,12 @@ func (s EtcdState) Step(request EtcdRequest) (EtcdState, MaybeEtcdResponse) {
return s, MaybeEtcdResponse{EtcdResponse: EtcdResponse{Revision: s.Revision, LeaseRevoke: &LeaseRevokeResponse{}}}
case Defragment:
return s, MaybeEtcdResponse{EtcdResponse: EtcdResponse{Defragment: &DefragmentResponse{}, Revision: s.Revision}}
case Compact:
if request.Compact.Revision <= s.CompactRevision {
return s, MaybeEtcdResponse{EtcdResponse: EtcdResponse{ClientError: mvcc.ErrCompacted.Error()}}
}
s.CompactRevision = request.Compact.Revision
return s, MaybeEtcdResponse{EtcdResponse: EtcdResponse{Compact: &CompactResponse{}, Revision: s.Revision}}
default:
panic(fmt.Sprintf("Unknown request type: %v", request.Type))
}
Expand Down Expand Up @@ -237,6 +251,7 @@ const (
LeaseGrant RequestType = "leaseGrant"
LeaseRevoke RequestType = "leaseRevoke"
Defragment RequestType = "defragment"
Compact RequestType = "compact"
)

type EtcdRequest struct {
Expand All @@ -246,6 +261,7 @@ type EtcdRequest struct {
Range *RangeRequest
Txn *TxnRequest
Defragment *DefragmentRequest
Compact *CompactRequest
}

func (r *EtcdRequest) IsRead() bool {
Expand Down Expand Up @@ -337,6 +353,8 @@ type EtcdResponse struct {
LeaseGrant *LeaseGrantReponse
LeaseRevoke *LeaseRevokeResponse
Defragment *DefragmentResponse
Compact *CompactResponse
ClientError string
Revision int64
}

Expand Down Expand Up @@ -398,3 +416,10 @@ func ToValueOrHash(value string) ValueOrHash {
}
return v
}

type CompactResponse struct {
}

type CompactRequest struct {
Revision int64
}
29 changes: 29 additions & 0 deletions tests/robustness/model/history.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@ package model

import (
"fmt"
"strings"
"time"

"github.com/anishathalye/porcupine"

"go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/api/v3/mvccpb"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/server/v3/storage/mvcc"
"go.etcd.io/etcd/tests/v3/robustness/identity"
)

Expand Down Expand Up @@ -259,6 +261,25 @@ func (h *AppendableHistory) AppendDefragment(start, end time.Duration, resp *cli
h.appendSuccessful(request, start, end, defragmentResponse(revision))
}

func (h *AppendableHistory) AppendCompact(rev int64, start, end time.Duration, resp *clientv3.CompactResponse, err error) {
request := compactRequest(rev)
if err != nil {
if strings.Contains(err.Error(), mvcc.ErrCompacted.Error()) {
h.appendSuccessful(request, start, end, MaybeEtcdResponse{
EtcdResponse: EtcdResponse{ClientError: mvcc.ErrCompacted.Error()},
})
return
}
h.appendFailed(request, start, end, err)
return
}
var revision int64
if resp != nil && resp.Header != nil {
revision = resp.Header.Revision
}
h.appendSuccessful(request, start, end, compactResponse(revision))
}

func (h *AppendableHistory) appendFailed(request EtcdRequest, start, end time.Duration, err error) {
op := porcupine.Operation{
ClientId: h.streamID,
Expand Down Expand Up @@ -444,6 +465,14 @@ func defragmentResponse(revision int64) MaybeEtcdResponse {
return MaybeEtcdResponse{EtcdResponse: EtcdResponse{Defragment: &DefragmentResponse{}, Revision: revision}}
}

func compactRequest(rev int64) EtcdRequest {
return EtcdRequest{Type: Compact, Compact: &CompactRequest{Revision: rev}}
}

func compactResponse(revision int64) MaybeEtcdResponse {
return MaybeEtcdResponse{EtcdResponse: EtcdResponse{Compact: &CompactResponse{}, Revision: revision}}
}

type History struct {
operations []porcupine.Operation
}
Expand Down
6 changes: 5 additions & 1 deletion tests/robustness/report/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,11 @@ func parseEntryNormal(ent raftpb.Entry) (*model.EtcdRequest, error) {
case raftReq.ClusterVersionSet != nil:
return nil, nil
case raftReq.Compaction != nil:
return nil, nil
request := model.EtcdRequest{
Type: model.Compact,
Compact: &model.CompactRequest{Revision: raftReq.Compaction.Revision},
}
return &request, nil
case raftReq.Txn != nil:
txn := model.TxnRequest{
Conditions: []model.EtcdCondition{},
Expand Down
15 changes: 12 additions & 3 deletions tests/robustness/traffic/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,14 @@ var (
{choice: List, weight: 15},
{choice: StaleGet, weight: 10},
{choice: StaleList, weight: 10},
{choice: Put, weight: 23},
{choice: LargePut, weight: 2},
{choice: Delete, weight: 5},
{choice: MultiOpTxn, weight: 5},
{choice: PutWithLease, weight: 5},
{choice: LeaseRevoke, weight: 5},
{choice: CompareAndSet, weight: 5},
{choice: Put, weight: 22},
{choice: LargePut, weight: 2},
{choice: Compact, weight: 1},
},
}
EtcdPut = etcdTraffic{
Expand All @@ -56,9 +57,10 @@ var (
{choice: List, weight: 15},
{choice: StaleGet, weight: 10},
{choice: StaleList, weight: 10},
{choice: Put, weight: 40},
{choice: MultiOpTxn, weight: 5},
{choice: LargePut, weight: 5},
{choice: Put, weight: 39},
{choice: Compact, weight: 1},
},
}
)
Expand Down Expand Up @@ -89,6 +91,7 @@ const (
LeaseRevoke etcdRequestType = "leaseRevoke"
CompareAndSet etcdRequestType = "compareAndSet"
Defragment etcdRequestType = "defragment"
Compact etcdRequestType = "compact"
)

func (t etcdTraffic) Name() string {
Expand Down Expand Up @@ -264,6 +267,12 @@ func (c etcdTrafficClient) Request(ctx context.Context, request etcdRequestType,
if resp != nil {
rev = resp.Header.Revision
}
case Compact:
var resp *clientv3.CompactResponse
resp, err = c.client.Compact(opCtx, lastRev)
if resp != nil {
rev = resp.Header.Revision
}
default:
panic("invalid choice")
}
Expand Down
17 changes: 13 additions & 4 deletions tests/robustness/traffic/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,10 @@ var (
resource: "pods",
namespace: "default",
writeChoices: []choiceWeight[KubernetesRequestType]{
{choice: KubernetesUpdate, weight: 90},
{choice: KubernetesUpdate, weight: 89},
{choice: KubernetesDelete, weight: 5},
{choice: KubernetesCreate, weight: 5},
{choice: KubernetesCompact, weight: 5},
},
}
)
Expand Down Expand Up @@ -167,6 +168,8 @@ func (t kubernetesTraffic) Write(ctx context.Context, kc *kubernetesClient, ids
_, err = kc.OptimisticUpdate(writeCtx, key, fmt.Sprintf("%d", ids.NewRequestID()), rev)
case KubernetesCreate:
err = kc.OptimisticCreate(writeCtx, t.generateKey(), fmt.Sprintf("%d", ids.NewRequestID()))
case KubernetesCompact:
err = kc.Compact(writeCtx, rev)
default:
panic(fmt.Sprintf("invalid choice: %q", op))
}
Expand Down Expand Up @@ -209,9 +212,10 @@ func (t kubernetesTraffic) generateKey() string {
type KubernetesRequestType string

const (
KubernetesDelete KubernetesRequestType = "delete"
KubernetesUpdate KubernetesRequestType = "update"
KubernetesCreate KubernetesRequestType = "create"
KubernetesDelete KubernetesRequestType = "delete"
KubernetesUpdate KubernetesRequestType = "update"
KubernetesCreate KubernetesRequestType = "create"
KubernetesCompact KubernetesRequestType = "compact"
)

type kubernetesClient struct {
Expand Down Expand Up @@ -250,6 +254,11 @@ func (k kubernetesClient) RequestProgress(ctx context.Context) error {
return k.client.RequestProgress(clientv3.WithRequireLeader(ctx))
}

func (k kubernetesClient) Compact(ctx context.Context, rev int64) error {
_, err := k.client.Compact(ctx, rev)
return err
}

// Kubernetes optimistically assumes that key didn't change since it was last observed, so it executes operations within a transaction conditioned on key not changing.
// However, if the keys value changed it wants imminently to read it, thus the Get operation on failure.
func (k kubernetesClient) optimisticOperationOrGet(ctx context.Context, key string, operation clientv3.Op, expectedRevision int64) (*mvccpb.KeyValue, error) {
Expand Down
2 changes: 1 addition & 1 deletion tests/robustness/traffic/traffic.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (

var (
DefaultLeaseTTL int64 = 7200
RequestTimeout = 40 * time.Millisecond
RequestTimeout = 200 * time.Millisecond
WatchTimeout = 400 * time.Millisecond
MultiOpTxnOpCount = 4

Expand Down
2 changes: 2 additions & 0 deletions tests/robustness/validate/patch_history.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ func persistedOperationsReturnTime(allOperations []porcupine.Operation, persiste
}
}
case model.LeaseGrant:
case model.Compact:
default:
panic(fmt.Sprintf("Unknown request type: %q", request.Type))
}
Expand All @@ -216,6 +217,7 @@ func operationReturnTime(operations []porcupine.Operation) map[model.EtcdOperati
}
case model.Range:
case model.LeaseGrant:
case model.Compact:
default:
panic(fmt.Sprintf("Unknown request type: %q", request.Type))
}
Expand Down
8 changes: 3 additions & 5 deletions tests/robustness/validate/validate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,9 @@ func TestDataReports(t *testing.T) {
}
visualize := ValidateAndReturnVisualize(t, zaptest.NewLogger(t), Config{}, reports, persistedRequests, 5*time.Minute)

if t.Failed() {
err := visualize(filepath.Join(path, "history.html"))
if err != nil {
t.Fatal(err)
}
err = visualize(filepath.Join(path, "history.html"))
if err != nil {
t.Fatal(err)
}
})
}
Expand Down
Loading

0 comments on commit b1032f6

Please sign in to comment.