Skip to content

Commit

Permalink
Separate persisted responses without knowing their revision to preven…
Browse files Browse the repository at this point in the history
…t duplicating state during linearization

Signed-off-by: Marek Siarkowicz <[email protected]>
  • Loading branch information
serathius committed Jun 23, 2024
1 parent 16ed0fe commit 51cfad7
Show file tree
Hide file tree
Showing 9 changed files with 163 additions and 37 deletions.
7 changes: 5 additions & 2 deletions tests/robustness/model/describe.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,11 @@ func describeEtcdResponse(request EtcdRequest, response MaybeEtcdResponse) strin
if response.ClientError != "" {
return fmt.Sprintf("err: %q", response.ClientError)
}
if response.PartialResponse {
return fmt.Sprintf("unknown, rev: %d", response.Revision)
if response.Persisted {
if response.PersistedRevision != 0 {
return fmt.Sprintf("unknown, rev: %d", response.PersistedRevision)
}
return fmt.Sprintf("unknown")
}
switch request.Type {
case Range:
Expand Down
28 changes: 19 additions & 9 deletions tests/robustness/model/deterministic.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (s EtcdState) Step(request EtcdRequest) (EtcdState, MaybeEtcdResponse) {
if request.Range.Revision < newState.CompactRevision {
return newState, MaybeEtcdResponse{EtcdResponse: EtcdResponse{ClientError: mvcc.ErrCompacted.Error()}}
}
return newState, MaybeEtcdResponse{PartialResponse: true, EtcdResponse: EtcdResponse{Revision: newState.Revision}}
return newState, MaybeEtcdResponse{Persisted: true, PersistedRevision: newState.Revision}
case Txn:
failure := false
for _, cond := range request.Txn.Conditions {
Expand Down Expand Up @@ -351,15 +351,17 @@ type LeaseRevokeRequest struct {
}
type DefragmentRequest struct{}

// MaybeEtcdResponse extends EtcdResponse to represent partial or failed responses.
// Possible states:
// * Normal response. Only EtcdResponse is set.
// * Partial response. The EtcdResponse.Revision and PartialResponse are set.
// * Failed response. Only Err is set.
// MaybeEtcdResponse extends EtcdResponse to include partial information about responses to a request.
// Possible response state information:
// * Normal response. Client observed response. Only EtcdResponse is set.
// * Persisted. Client didn't observe response, but we know it was persisted by etcd. Only Persisted is set
// * Persisted with Revision. Client didn't observe response, but we know that it was persisted, and it's revision. Both Persisted and PersistedRevision is set.
// * Error response. Client observed error, but we don't know if it was persisted. Only Error is set.
type MaybeEtcdResponse struct {
EtcdResponse
PartialResponse bool
Error string
Persisted bool
PersistedRevision int64
Error string
}

var ErrEtcdFutureRev = errors.New("future rev")
Expand All @@ -376,7 +378,15 @@ type EtcdResponse struct {
}

func Match(r1, r2 MaybeEtcdResponse) bool {
return ((r1.PartialResponse || r2.PartialResponse) && (r1.Revision == r2.Revision)) || reflect.DeepEqual(r1, r2)
r1Revision := r1.Revision
if r1.Persisted {
r1Revision = r1.PersistedRevision
}
r2Revision := r2.Revision
if r2.Persisted {
r2Revision = r2.PersistedRevision
}
return (r1.Persisted && r1.PersistedRevision == 0) || (r2.Persisted && r2.PersistedRevision == 0) || ((r1.Persisted || r2.Persisted) && (r1.Error != "" || r2.Error != "" || r1Revision == r2Revision)) || reflect.DeepEqual(r1, r2)
}

type TxnResponse struct {
Expand Down
1 change: 0 additions & 1 deletion tests/robustness/model/deterministic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"testing"

"github.com/google/go-cmp/cmp"

"go.etcd.io/etcd/api/v3/mvccpb"
)

Expand Down
2 changes: 1 addition & 1 deletion tests/robustness/model/history.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ func failedResponse(err error) MaybeEtcdResponse {
}

func partialResponse(revision int64) MaybeEtcdResponse {
return MaybeEtcdResponse{PartialResponse: true, EtcdResponse: EtcdResponse{Revision: revision}}
return MaybeEtcdResponse{Persisted: true, PersistedRevision: revision}
}

func putRequest(key, value string) EtcdRequest {
Expand Down
32 changes: 22 additions & 10 deletions tests/robustness/model/non_deterministic.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,17 +58,19 @@ func (states nonDeterministicState) apply(request EtcdRequest, response MaybeEtc
var newStates nonDeterministicState
switch {
case response.Error != "":
newStates = states.stepFailedResponse(request)
case response.PartialResponse:
newStates = states.applyResponseRevision(request, response.EtcdResponse.Revision)
newStates = states.applyFailedRequest(request)
case response.Persisted && response.PersistedRevision == 0:
newStates = states.applyPersistedRequest(request)
case response.Persisted && response.PersistedRevision != 0:
newStates = states.applyPersistedRequestWithRevision(request, response.PersistedRevision)
default:
newStates = states.applySuccessfulResponse(request, response.EtcdResponse)
newStates = states.applyRequestWithResponse(request, response.EtcdResponse)
}
return len(newStates) > 0, newStates
}

// stepFailedResponse duplicates number of states by considering both cases, request was persisted and request was lost.
func (states nonDeterministicState) stepFailedResponse(request EtcdRequest) nonDeterministicState {
// applyFailedRequest returns both the original states and states with applied request. It considers both cases, request was persisted and request was lost.
func (states nonDeterministicState) applyFailedRequest(request EtcdRequest) nonDeterministicState {
newStates := make(nonDeterministicState, 0, len(states)*2)
for _, s := range states {
newStates = append(newStates, s)
Expand All @@ -80,8 +82,18 @@ func (states nonDeterministicState) stepFailedResponse(request EtcdRequest) nonD
return newStates
}

// applyResponseRevision filters possible states by leaving ony states that would return proper revision.
func (states nonDeterministicState) applyResponseRevision(request EtcdRequest, responseRevision int64) nonDeterministicState {
// applyPersistedRequest applies request to all possible states.
func (states nonDeterministicState) applyPersistedRequest(request EtcdRequest) nonDeterministicState {
newStates := make(nonDeterministicState, 0, len(states))
for _, s := range states {
newState, _ := s.Step(request)
newStates = append(newStates, newState)
}
return newStates
}

// applyPersistedRequestWithRevision applies request to all possible states, but leaves only states that would return proper revision.
func (states nonDeterministicState) applyPersistedRequestWithRevision(request EtcdRequest, responseRevision int64) nonDeterministicState {
newStates := make(nonDeterministicState, 0, len(states))
for _, s := range states {
newState, modelResponse := s.Step(request)
Expand All @@ -92,8 +104,8 @@ func (states nonDeterministicState) applyResponseRevision(request EtcdRequest, r
return newStates
}

// applySuccessfulResponse filters possible states by leaving ony states that would respond correctly.
func (states nonDeterministicState) applySuccessfulResponse(request EtcdRequest, response EtcdResponse) nonDeterministicState {
// applyRequestWithResponse applies request to all possible states, but leaves only state that would return proper response.
func (states nonDeterministicState) applyRequestWithResponse(request EtcdRequest, response EtcdResponse) nonDeterministicState {
newStates := make(nonDeterministicState, 0, len(states))
for _, s := range states {
newState, modelResponse := s.Step(request)
Expand Down
80 changes: 75 additions & 5 deletions tests/robustness/model/non_deterministic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ func TestModelResponseMatch(t *testing.T) {
},
{
resp1: getResponse("key", "a", 1, 1),
resp2: partialResponse(0),
resp2: partialResponse(2),
expectMatch: false,
},
{
Expand All @@ -416,9 +416,14 @@ func TestModelResponseMatch(t *testing.T) {
},
{
resp1: putResponse(3),
resp2: partialResponse(0),
resp2: partialResponse(1),
expectMatch: false,
},
{
resp1: putResponse(3),
resp2: partialResponse(0),
expectMatch: true,
},
{
resp1: deleteResponse(1, 5),
resp2: deleteResponse(1, 5),
Expand Down Expand Up @@ -446,13 +451,18 @@ func TestModelResponseMatch(t *testing.T) {
},
{
resp1: deleteResponse(0, 5),
resp2: partialResponse(0),
resp2: partialResponse(4),
expectMatch: false,
},
{
resp1: deleteResponse(0, 5),
resp2: partialResponse(0),
expectMatch: true,
},
{
resp1: deleteResponse(1, 5),
resp2: partialResponse(0),
expectMatch: false,
expectMatch: true,
},
{
resp1: deleteResponse(0, 5),
Expand Down Expand Up @@ -491,12 +501,72 @@ func TestModelResponseMatch(t *testing.T) {
},
{
resp1: compareRevisionAndPutResponse(true, 7),
resp2: partialResponse(0),
resp2: partialResponse(4),
expectMatch: false,
},
{
resp1: compareRevisionAndPutResponse(false, 7),
resp2: partialResponse(3),
expectMatch: false,
},
{
resp1: compareRevisionAndPutResponse(false, 7),
resp2: partialResponse(0),
expectMatch: true,
},
{
resp1: MaybeEtcdResponse{EtcdResponse: EtcdResponse{Revision: 1, Txn: &TxnResponse{Failure: false, Results: []EtcdOperationResult{{Deleted: 1}}}}},
resp2: failedResponse(errors.New("failed request")),
expectMatch: false,
},
{
resp1: failedResponse(errors.New("failed request 1")),
resp2: failedResponse(errors.New("failed request 2")),
expectMatch: false,
},
{
resp1: failedResponse(errors.New("failed request")),
resp2: failedResponse(errors.New("failed request")),
expectMatch: true,
},
{
resp1: putResponse(2),
resp2: MaybeEtcdResponse{Persisted: true},
expectMatch: true,
},
{
resp1: putResponse(2),
resp2: MaybeEtcdResponse{Persisted: true, PersistedRevision: 2},
expectMatch: true,
},
{
resp1: putResponse(2),
resp2: MaybeEtcdResponse{Persisted: true, PersistedRevision: 3},
expectMatch: false,
},
{
resp1: failedResponse(errors.New("failed request")),
resp2: MaybeEtcdResponse{Persisted: true},
expectMatch: true,
},
{
resp1: failedResponse(errors.New("failed request")),
resp2: MaybeEtcdResponse{Persisted: true, PersistedRevision: 2},
expectMatch: true,
},
{
resp1: MaybeEtcdResponse{Persisted: true},
resp2: MaybeEtcdResponse{Persisted: true, PersistedRevision: 2},
expectMatch: true,
},
{
resp1: MaybeEtcdResponse{Persisted: true, PersistedRevision: 2},
resp2: MaybeEtcdResponse{Persisted: true, PersistedRevision: 2},
expectMatch: true,
},
{
resp1: MaybeEtcdResponse{Persisted: true, PersistedRevision: 1},
resp2: MaybeEtcdResponse{Persisted: true, PersistedRevision: 2},
expectMatch: false,
},
}
Expand Down
2 changes: 1 addition & 1 deletion tests/robustness/validate/operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func filterSerializableOperations(clients []report.ClientReport) []porcupine.Ope
}

func validateSerializableRead(lg *zap.Logger, replay *model.EtcdReplay, request model.EtcdRequest, response model.MaybeEtcdResponse) error {
if response.PartialResponse || response.Error != "" {
if response.Persisted || response.Error != "" {
return nil
}
state, err := replay.StateForRevision(request.Range.Revision)
Expand Down
25 changes: 17 additions & 8 deletions tests/robustness/validate/patch_history.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ func patchOperations(operations []porcupine.Operation, watchEvents map[model.Eve
newOperations = append(newOperations, op)
continue
}
var resourceVersion int64
if op.Call <= lastObservedOperation.Call {
matchingEvent := matchWatchEvent(request.Txn, watchEvents)
if matchingEvent != nil {
Expand All @@ -84,7 +85,7 @@ func patchOperations(operations []porcupine.Operation, watchEvents map[model.Eve
if eventTime < op.Return {
op.Return = eventTime
}
op.Output = model.MaybeEtcdResponse{PartialResponse: true, EtcdResponse: model.EtcdResponse{Revision: matchingEvent.Revision}}
resourceVersion = matchingEvent.Revision
}
}
persistedReturnTime := matchReturnTime(request, persistedOperations)
Expand All @@ -94,9 +95,17 @@ func patchOperations(operations []porcupine.Operation, watchEvents map[model.Eve
op.Return = *persistedReturnTime
}
}
if persistedReturnTime == nil && canBeDiscarded(request.Txn) {
// Remove non persisted operations
continue
if isUniqueTxn(request.Txn) {
if persistedReturnTime == nil {
// Remove non persisted operations
continue
} else {
if resourceVersion != 0 {
op.Output = model.MaybeEtcdResponse{Persisted: true, PersistedRevision: resourceVersion}
} else {
op.Output = model.MaybeEtcdResponse{Persisted: true}
}
}
}
// Leave operation as it is as we cannot discard it.
newOperations = append(newOperations, op)
Expand Down Expand Up @@ -137,12 +146,12 @@ func matchWatchEvent(request *model.TxnRequest, watchEvents map[model.Event]clie
return nil
}

func canBeDiscarded(request *model.TxnRequest) bool {
return operationsCanBeDiscarded(request.OperationsOnSuccess) && operationsCanBeDiscarded(request.OperationsOnFailure)
func isUniqueTxn(request *model.TxnRequest) bool {
return (hasUniqueWriteOperation(request.OperationsOnSuccess) || !hasWriteOperation(request.OperationsOnSuccess)) && (hasUniqueWriteOperation(request.OperationsOnFailure) || !hasWriteOperation(request.OperationsOnFailure))
}

func operationsCanBeDiscarded(ops []model.EtcdOperation) bool {
return hasUniqueWriteOperation(ops) || !hasWriteOperation(ops)
func hasWriteTxn(txn *model.TxnRequest) bool {
return hasWriteOperation(txn.OperationsOnSuccess) || hasWriteOperation(txn.OperationsOnFailure)
}

func hasWriteOperation(ops []model.EtcdOperation) bool {
Expand Down
23 changes: 23 additions & 0 deletions tests/robustness/validate/patch_history_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,29 @@ func TestPatchHistory(t *testing.T) {
},
expectedRemainingOperations: 1,
},
{
name: "failed delete remains if there is a matching event",
historyFunc: func(baseTime time.Time, h *model.AppendableHistory) {
start := time.Since(baseTime)
time.Sleep(time.Nanosecond)
stop := time.Since(baseTime)
h.AppendDelete("key", start, stop, nil, errors.New("failed"))
},
persistedRequest: &model.EtcdRequest{
Type: model.Txn,
Txn: &model.TxnRequest{
OperationsOnSuccess: []model.EtcdOperation{
{
Type: model.DeleteOperation,
Delete: model.DeleteOptions{
Key: "key",
},
},
},
},
},
expectedRemainingOperations: 1,
},
{
name: "failed put is dropped if event has different key",
historyFunc: func(baseTime time.Time, h *model.AppendableHistory) {
Expand Down

0 comments on commit 51cfad7

Please sign in to comment.