Skip to content

Commit

Permalink
Merge pull request #4932 from nmaupu/fix-ovh-reconcile-issues
Browse files Browse the repository at this point in the history
fix(ovh): cache refresh and duplicates processing
  • Loading branch information
k8s-ci-robot authored Jan 14, 2025
2 parents 9368a24 + fc57879 commit d0ae7cf
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 24 deletions.
74 changes: 51 additions & 23 deletions provider/ovh/ovh.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,18 +137,34 @@ func (p *OVHProvider) Records(ctx context.Context) ([]*endpoint.Endpoint, error)
}

// ApplyChanges applies a given set of changes in a given zone.
func (p *OVHProvider) ApplyChanges(ctx context.Context, changes *plan.Changes) error {
func (p *OVHProvider) ApplyChanges(ctx context.Context, changes *plan.Changes) (err error) {
zones, records, err := p.zonesRecords(ctx)
zonesChangeUniques := map[string]bool{}
if err != nil {
return err
return provider.NewSoftError(err)
}

allChanges := make([]ovhChange, 0, countTargets(changes.Create, changes.UpdateNew, changes.UpdateOld, changes.Delete))
zonesChangeUniques := map[string]bool{}

// Always refresh zones even in case of errors.
defer func() {
log.Debugf("OVH: %d zones will be refreshed", len(zonesChangeUniques))

eg, _ := errgroup.WithContext(ctx)
for zone := range zonesChangeUniques {
// This is necessary because the loop variable zone is reused in each iteration of the loop,
// and without this line, the goroutines launched by eg.Go would all reference the same zone variable.
zone := zone
eg.Go(func() error { return p.refresh(zone) })
}

if e := eg.Wait(); e != nil && err == nil { // return the error only if there is no error during the changes
err = provider.NewSoftError(e)
}
}()

allChanges := make([]ovhChange, 0, countTargets(changes.Create, changes.UpdateNew, changes.UpdateOld, changes.Delete))
allChanges = append(allChanges, newOvhChange(ovhCreate, changes.Create, zones, records)...)
allChanges = append(allChanges, newOvhChange(ovhCreate, changes.UpdateNew, zones, records)...)

allChanges = append(allChanges, newOvhChange(ovhDelete, changes.UpdateOld, zones, records)...)
allChanges = append(allChanges, newOvhChange(ovhDelete, changes.Delete, zones, records)...)

Expand All @@ -161,27 +177,24 @@ func (p *OVHProvider) ApplyChanges(ctx context.Context, changes *plan.Changes) e
eg.Go(func() error { return p.change(change) })
}
if err := eg.Wait(); err != nil {
return err
return provider.NewSoftError(err)
}

log.Infof("OVH: %d zones will be refreshed", len(zonesChangeUniques))

eg, _ = errgroup.WithContext(ctx)
for zone := range zonesChangeUniques {
zone := zone
eg.Go(func() error { return p.refresh(zone) })
}
if err := eg.Wait(); err != nil {
return err
}
return nil
}

func (p *OVHProvider) refresh(zone string) error {
log.Debugf("OVH: Refresh %s zone", zone)

// Zone has been altered so we invalidate the cache
// so that the next run will reload it.
p.invalidateCache(zone)

p.apiRateLimiter.Take()
return p.client.Post(fmt.Sprintf("/domain/zone/%s/refresh", zone), nil, nil)
if err := p.client.Post(fmt.Sprintf("/domain/zone/%s/refresh", zone), nil, nil); err != nil {
return provider.NewSoftError(err)
}
return nil
}

func (p *OVHProvider) change(change ovhChange) error {
Expand All @@ -201,11 +214,15 @@ func (p *OVHProvider) change(change ovhChange) error {
return nil
}

func (p *OVHProvider) invalidateCache(zone string) {
p.cacheInstance.Delete(zone + "#soa")
}

func (p *OVHProvider) zonesRecords(ctx context.Context) ([]string, []ovhRecord, error) {
var allRecords []ovhRecord
zones, err := p.zones()
if err != nil {
return nil, nil, err
return nil, nil, provider.NewSoftError(err)
}

chRecords := make(chan []ovhRecord, len(zones))
Expand All @@ -215,7 +232,7 @@ func (p *OVHProvider) zonesRecords(ctx context.Context) ([]string, []ovhRecord,
eg.Go(func() error { return p.records(&ctx, &zone, chRecords) })
}
if err := eg.Wait(); err != nil {
return nil, nil, err
return nil, nil, provider.NewSoftError(err)
}
close(chRecords)
for records := range chRecords {
Expand Down Expand Up @@ -270,7 +287,7 @@ func (p *OVHProvider) records(ctx *context.Context, zone *string, records chan<-
}
}

p.cacheInstance.Delete(*zone + "#soa")
p.invalidateCache(*zone)
}
}

Expand Down Expand Up @@ -359,6 +376,10 @@ func ovhGroupByNameAndType(records []ovhRecord) []*endpoint.Endpoint {
}

func newOvhChange(action int, endpoints []*endpoint.Endpoint, zones []string, records []ovhRecord) []ovhChange {
// Copy the records because we need to mutate the list.
newRecords := make([]ovhRecord, len(records))
copy(newRecords, records)

zoneNameIDMapper := provider.ZoneIDName{}
ovhChanges := make([]ovhChange, 0, countTargets(endpoints))
for _, zone := range zones {
Expand Down Expand Up @@ -390,9 +411,16 @@ func newOvhChange(action int, endpoints []*endpoint.Endpoint, zones []string, re
if e.RecordTTL.IsConfigured() {
change.TTL = int64(e.RecordTTL)
}
for _, record := range records {
if record.Zone == change.Zone && record.SubDomain == change.SubDomain && record.FieldType == change.FieldType && record.Target == change.Target {
change.ID = record.ID

// The Zone might have multiple records with the same target. In order to avoid applying the action to the
// same OVH record, we remove a record from the list when a match is found.
for i := 0; i < len(newRecords); i++ {
rec := newRecords[i]
if rec.Zone == change.Zone && rec.SubDomain == change.SubDomain && rec.FieldType == change.FieldType && rec.Target == change.Target {
change.ID = rec.ID
// Deleting this record from the list to avoid retargetting it later if a change with a similar target exists.
newRecords = append(newRecords[:i], newRecords[i+1:]...)
break
}
}
ovhChanges = append(ovhChanges, change)
Expand Down
7 changes: 6 additions & 1 deletion provider/ovh/ovh_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,14 +324,18 @@ func TestOvhNewChange(t *testing.T) {

// Delete change
endpoints = []*endpoint.Endpoint{
{DNSName: "ovh.example.net", RecordType: "A", Targets: []string{"203.0.113.42"}},
{DNSName: "ovh.example.net", RecordType: "A", Targets: []string{"203.0.113.42", "203.0.113.42", "203.0.113.42"}},
}
records := []ovhRecord{
{ID: 42, Zone: "example.net", ovhRecordFields: ovhRecordFields{FieldType: "A", SubDomain: "ovh", Target: "203.0.113.42"}},
{ID: 43, Zone: "example.net", ovhRecordFields: ovhRecordFields{FieldType: "A", SubDomain: "ovh", Target: "203.0.113.42"}},
{ID: 44, Zone: "example.net", ovhRecordFields: ovhRecordFields{FieldType: "A", SubDomain: "ovh", Target: "203.0.113.42"}},
}
changes = newOvhChange(ovhDelete, endpoints, []string{"example.net"}, records)
assert.ElementsMatch(changes, []ovhChange{
{Action: ovhDelete, ovhRecord: ovhRecord{ID: 42, Zone: "example.net", ovhRecordFields: ovhRecordFields{SubDomain: "ovh", FieldType: "A", TTL: ovhDefaultTTL, Target: "203.0.113.42"}}},
{Action: ovhDelete, ovhRecord: ovhRecord{ID: 43, Zone: "example.net", ovhRecordFields: ovhRecordFields{SubDomain: "ovh", FieldType: "A", TTL: ovhDefaultTTL, Target: "203.0.113.42"}}},
{Action: ovhDelete, ovhRecord: ovhRecord{ID: 44, Zone: "example.net", ovhRecordFields: ovhRecordFields{SubDomain: "ovh", FieldType: "A", TTL: ovhDefaultTTL, Target: "203.0.113.42"}}},
})
}

Expand Down Expand Up @@ -368,6 +372,7 @@ func TestOvhApplyChanges(t *testing.T) {
client.On("Get", "/domain/zone").Return([]string{"example.net"}, nil).Once()
client.On("Get", "/domain/zone/example.net/record").Return([]uint64{}, nil).Once()
client.On("Post", "/domain/zone/example.net/record", ovhRecordFields{SubDomain: "", FieldType: "A", TTL: 10, Target: "203.0.113.42"}).Return(nil, ovh.ErrAPIDown).Once()
client.On("Post", "/domain/zone/example.net/refresh", nil).Return(nil, nil).Once()
assert.Error(provider.ApplyChanges(context.TODO(), &plan.Changes{
Create: []*endpoint.Endpoint{
{DNSName: ".example.net", RecordType: "A", RecordTTL: 10, Targets: []string{"203.0.113.42"}},
Expand Down

0 comments on commit d0ae7cf

Please sign in to comment.