Skip to content

Commit

Permalink
perf: Add workflow template informer to server (#13672)
Browse files Browse the repository at this point in the history
Signed-off-by: Jakub Buczak <[email protected]>
  • Loading branch information
jakkubu authored Nov 26, 2024
1 parent 40aded2 commit f22ae3b
Show file tree
Hide file tree
Showing 22 changed files with 622 additions and 65 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -623,7 +623,7 @@ Test%:
E2E_WAIT_TIMEOUT=$(E2E_WAIT_TIMEOUT) go test -failfast -v -timeout $(E2E_SUITE_TIMEOUT) -count 1 --tags $(ALL_BUILD_TAGS) -parallel $(E2E_PARALLEL) ./test/e2e -run='.*/$*'

Benchmark%:
go test --tags $(ALL_BUILD_TAGS) ./test/e2e -run='$@' -benchmem -bench .
go test --tags $(ALL_BUILD_TAGS) ./test/e2e -run='$@' -benchmem -bench '$@' .

# clean

Expand Down
3 changes: 2 additions & 1 deletion pkg/apiclient/apiclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type Client interface {

type Opts struct {
ArgoServerOpts ArgoServerOpts
ArgoKubeOpts ArgoKubeOpts
InstanceID string
AuthSupplier func() string
// DEPRECATED: use `ClientConfigSupplier`
Expand Down Expand Up @@ -84,7 +85,7 @@ func NewClientFromOpts(opts Opts) (context.Context, Client, error) {
opts.ClientConfig = opts.ClientConfigSupplier()
}

ctx, client, err := newArgoKubeClient(opts.GetContext(), opts.ClientConfig, instanceid.NewService(opts.InstanceID))
ctx, client, err := newArgoKubeClient(opts.GetContext(), opts.ArgoKubeOpts, opts.ClientConfig, instanceid.NewService(opts.InstanceID))
return ctx, client, err
}
}
80 changes: 71 additions & 9 deletions pkg/apiclient/argo-kube-client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ import (
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"

"github.com/argoproj/argo-workflows/v3/server/workflow/store"

"github.com/argoproj/argo-workflows/v3"
"github.com/argoproj/argo-workflows/v3/persist/sqldb"
"github.com/argoproj/argo-workflows/v3/pkg/apiclient/clusterworkflowtemplate"
Expand All @@ -27,6 +25,8 @@ import (
cronworkflowserver "github.com/argoproj/argo-workflows/v3/server/cronworkflow"
"github.com/argoproj/argo-workflows/v3/server/types"
workflowserver "github.com/argoproj/argo-workflows/v3/server/workflow"
"github.com/argoproj/argo-workflows/v3/server/workflow/store"
workflowstore "github.com/argoproj/argo-workflows/v3/server/workflow/store"
workflowtemplateserver "github.com/argoproj/argo-workflows/v3/server/workflowtemplate"
"github.com/argoproj/argo-workflows/v3/util/help"
"github.com/argoproj/argo-workflows/v3/util/instanceid"
Expand All @@ -37,14 +37,34 @@ var (
NoArgoServerErr = fmt.Errorf("this is impossible if you are not using the Argo Server, see %s", help.CLI())
)

type ArgoKubeOpts struct {
// Closing caching channel will stop caching informers
CachingCloseCh chan struct{}

// Whether to cache WorkflowTemplates, ClusterWorkflowTemplates and Workflows
// This improves performance of reading
// It is especially visible during validating templates,
//
// Note that templates caching currently uses informers, so not all template
// get/list can use it, since informer has limited capabilities (such as filtering)
//
// Workflow caching uses in-memory SQLite DB and it provides full capabilities
UseCaching bool
}

type argoKubeClient struct {
opts ArgoKubeOpts
instanceIDService instanceid.Service
wfClient workflow.Interface
wfTmplStore types.WorkflowTemplateStore
cwfTmplStore types.ClusterWorkflowTemplateStore
wfLister workflowstore.WorkflowLister
wfStore workflowstore.WorkflowStore
}

var _ Client = &argoKubeClient{}

func newArgoKubeClient(ctx context.Context, clientConfig clientcmd.ClientConfig, instanceIDService instanceid.Service) (context.Context, Client, error) {
func newArgoKubeClient(ctx context.Context, opts ArgoKubeOpts, clientConfig clientcmd.ClientConfig, instanceIDService instanceid.Service) (context.Context, Client, error) {
restConfig, err := clientConfig.ClientConfig()
if err != nil {
return nil, nil, err
Expand All @@ -59,6 +79,10 @@ func newArgoKubeClient(ctx context.Context, clientConfig clientcmd.ClientConfig,
if err != nil {
return nil, nil, err
}
namespace, _, err := clientConfig.Namespace()
if err != nil {
return nil, nil, err
}
eventSourceInterface, err := eventsource.NewForConfig(restConfig)
if err != nil {
return nil, nil, err
Expand Down Expand Up @@ -86,21 +110,59 @@ func newArgoKubeClient(ctx context.Context, clientConfig clientcmd.ClientConfig,
if err != nil {
return nil, nil, err
}
return ctx, &argoKubeClient{instanceIDService, wfClient}, nil

client := &argoKubeClient{
opts: opts,
instanceIDService: instanceIDService,
wfClient: wfClient,
}
err = client.startStores(restConfig, namespace)
if err != nil {
return nil, nil, err
}

return ctx, client, nil
}

func (a *argoKubeClient) startStores(restConfig *restclient.Config, namespace string) error {
if a.opts.UseCaching {
wftmplInformer, err := workflowtemplateserver.NewInformer(restConfig, namespace)
if err != nil {
return err
}
cwftmplInformer, err := clusterworkflowtmplserver.NewInformer(restConfig)
if err != nil {
return err
}
wfStore, err := store.NewSQLiteStore(a.instanceIDService)
if err != nil {
return err
}
wftmplInformer.Run(a.opts.CachingCloseCh)
cwftmplInformer.Run(a.opts.CachingCloseCh)
a.wfStore = wfStore
a.wfLister = wfStore
a.wfTmplStore = wftmplInformer
a.cwfTmplStore = cwftmplInformer
} else {
a.wfLister = store.NewKubeLister(a.wfClient)
a.wfTmplStore = workflowtemplateserver.NewWorkflowTemplateClientStore()
a.cwfTmplStore = clusterworkflowtmplserver.NewClusterWorkflowTemplateClientStore()
}
return nil
}

func (a *argoKubeClient) NewWorkflowServiceClient() workflowpkg.WorkflowServiceClient {
wfArchive := sqldb.NullWorkflowArchive
wfLister := store.NewKubeLister(a.wfClient)
return &errorTranslatingWorkflowServiceClient{&argoKubeWorkflowServiceClient{workflowserver.NewWorkflowServer(a.instanceIDService, argoKubeOffloadNodeStatusRepo, wfArchive, a.wfClient, wfLister, nil, nil)}}
return &errorTranslatingWorkflowServiceClient{&argoKubeWorkflowServiceClient{workflowserver.NewWorkflowServer(a.instanceIDService, argoKubeOffloadNodeStatusRepo, wfArchive, a.wfClient, a.wfLister, a.wfStore, a.wfTmplStore, a.cwfTmplStore, nil)}}
}

func (a *argoKubeClient) NewCronWorkflowServiceClient() (cronworkflow.CronWorkflowServiceClient, error) {
return &errorTranslatingCronWorkflowServiceClient{&argoKubeCronWorkflowServiceClient{cronworkflowserver.NewCronWorkflowServer(a.instanceIDService)}}, nil
return &errorTranslatingCronWorkflowServiceClient{&argoKubeCronWorkflowServiceClient{cronworkflowserver.NewCronWorkflowServer(a.instanceIDService, a.wfTmplStore, a.cwfTmplStore)}}, nil
}

func (a *argoKubeClient) NewWorkflowTemplateServiceClient() (workflowtemplate.WorkflowTemplateServiceClient, error) {
return &errorTranslatingWorkflowTemplateServiceClient{&argoKubeWorkflowTemplateServiceClient{workflowtemplateserver.NewWorkflowTemplateServer(a.instanceIDService)}}, nil
return &errorTranslatingWorkflowTemplateServiceClient{&argoKubeWorkflowTemplateServiceClient{workflowtemplateserver.NewWorkflowTemplateServer(a.instanceIDService, a.wfTmplStore, a.cwfTmplStore)}}, nil
}

func (a *argoKubeClient) NewArchivedWorkflowServiceClient() (workflowarchivepkg.ArchivedWorkflowServiceClient, error) {
Expand All @@ -112,5 +174,5 @@ func (a *argoKubeClient) NewInfoServiceClient() (infopkg.InfoServiceClient, erro
}

func (a *argoKubeClient) NewClusterWorkflowTemplateServiceClient() (clusterworkflowtemplate.ClusterWorkflowTemplateServiceClient, error) {
return &errorTranslatingWorkflowClusterTemplateServiceClient{&argoKubeWorkflowClusterTemplateServiceClient{clusterworkflowtmplserver.NewClusterWorkflowTemplateServer(a.instanceIDService)}}, nil
return &errorTranslatingWorkflowClusterTemplateServiceClient{&argoKubeWorkflowClusterTemplateServiceClient{clusterworkflowtmplserver.NewClusterWorkflowTemplateServer(a.instanceIDService, a.cwfTmplStore)}}, nil
}
26 changes: 19 additions & 7 deletions server/apiserver/argoserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ type argoServer struct {
apiRateLimiter limiter.Store
allowedLinkProtocol []string
cache *cache.ResourceCache
restConfig *rest.Config
}

type ArgoServerOpts struct {
Expand Down Expand Up @@ -184,6 +185,7 @@ func NewArgoServer(ctx context.Context, opts ArgoServerOpts) (*argoServer, error
apiRateLimiter: store,
allowedLinkProtocol: opts.AllowedLinkProtocol,
cache: resourceCache,
restConfig: opts.RestConfig,
}, nil
}

Expand Down Expand Up @@ -227,6 +229,15 @@ func (as *argoServer) Run(ctx context.Context, port int, browserOpenFunc func(st
// disable the archiving - and still read old records
wfArchive = sqldb.NewWorkflowArchive(session, persistence.GetClusterName(), as.managedNamespace, instanceIDService)
}
resourceCacheNamespace := getResourceCacheNamespace(as.managedNamespace)
wftmplStore, err := workflowtemplate.NewInformer(as.restConfig, resourceCacheNamespace)
if err != nil {
log.Fatal(err)
}
cwftmplInformer, err := clusterworkflowtemplate.NewInformer(as.restConfig)
if err != nil {
log.Fatal(err)
}
eventRecorderManager := events.NewEventRecorderManager(as.clients.Kubernetes)
artifactRepositories := artifactrepositories.New(as.clients.Kubernetes, as.managedNamespace, &config.ArtifactRepository)
artifactServer := artifacts.NewArtifactServer(as.gatekeeper, hydrator.New(offloadRepo), wfArchive, instanceIDService, artifactRepositories)
Expand All @@ -236,9 +247,8 @@ func (as *argoServer) Run(ctx context.Context, port int, browserOpenFunc func(st
if err != nil {
log.Fatal(err)
}
resourceCacheNamespace := getResourceCacheNamespace(as.managedNamespace)
workflowServer := workflow.NewWorkflowServer(instanceIDService, offloadRepo, wfArchive, as.clients.Workflow, wfStore, wfStore, &resourceCacheNamespace)
grpcServer := as.newGRPCServer(instanceIDService, workflowServer, wfArchiveServer, eventServer, config.Links, config.Columns, config.NavColor)
workflowServer := workflow.NewWorkflowServer(instanceIDService, offloadRepo, wfArchive, as.clients.Workflow, wfStore, wfStore, wftmplStore, cwftmplInformer, &resourceCacheNamespace)
grpcServer := as.newGRPCServer(instanceIDService, workflowServer, wftmplStore, cwftmplInformer, wfArchiveServer, eventServer, config.Links, config.Columns, config.NavColor)
httpServer := as.newHTTPServer(ctx, port, artifactServer)

// Start listener
Expand Down Expand Up @@ -267,6 +277,8 @@ func (as *argoServer) Run(ctx context.Context, port int, browserOpenFunc func(st
httpL := tcpm.Match(cmux.HTTP1Fast())
grpcL := tcpm.Match(cmux.Any())

wftmplStore.Run(as.stopCh)
cwftmplInformer.Run(as.stopCh)
go eventServer.Run(as.stopCh)
go workflowServer.Run(as.stopCh)
go func() { as.checkServeErr("grpcServer", grpcServer.Serve(grpcL)) }()
Expand All @@ -285,7 +297,7 @@ func (as *argoServer) Run(ctx context.Context, port int, browserOpenFunc func(st
<-as.stopCh
}

func (as *argoServer) newGRPCServer(instanceIDService instanceid.Service, workflowServer workflowpkg.WorkflowServiceServer, wfArchiveServer workflowarchivepkg.ArchivedWorkflowServiceServer, eventServer *event.Controller, links []*v1alpha1.Link, columns []*v1alpha1.Column, navColor string) *grpc.Server {
func (as *argoServer) newGRPCServer(instanceIDService instanceid.Service, workflowServer workflowpkg.WorkflowServiceServer, wftmplStore types.WorkflowTemplateStore, cwftmplStore types.ClusterWorkflowTemplateStore, wfArchiveServer workflowarchivepkg.ArchivedWorkflowServiceServer, eventServer *event.Controller, links []*v1alpha1.Link, columns []*v1alpha1.Column, navColor string) *grpc.Server {
serverLog := log.NewEntry(log.StandardLogger())

// "Prometheus histograms are a great way to measure latency distributions of your RPCs. However, since it is bad practice to have metrics of high cardinality the latency monitoring metrics are disabled by default. To enable them please call the following in your server initialization code:"
Expand Down Expand Up @@ -324,10 +336,10 @@ func (as *argoServer) newGRPCServer(instanceIDService instanceid.Service, workfl
eventsourcepkg.RegisterEventSourceServiceServer(grpcServer, eventsource.NewEventSourceServer())
sensorpkg.RegisterSensorServiceServer(grpcServer, sensor.NewSensorServer())
workflowpkg.RegisterWorkflowServiceServer(grpcServer, workflowServer)
workflowtemplatepkg.RegisterWorkflowTemplateServiceServer(grpcServer, workflowtemplate.NewWorkflowTemplateServer(instanceIDService))
cronworkflowpkg.RegisterCronWorkflowServiceServer(grpcServer, cronworkflow.NewCronWorkflowServer(instanceIDService))
workflowtemplatepkg.RegisterWorkflowTemplateServiceServer(grpcServer, workflowtemplate.NewWorkflowTemplateServer(instanceIDService, wftmplStore, cwftmplStore))
cronworkflowpkg.RegisterCronWorkflowServiceServer(grpcServer, cronworkflow.NewCronWorkflowServer(instanceIDService, wftmplStore, cwftmplStore))
workflowarchivepkg.RegisterArchivedWorkflowServiceServer(grpcServer, wfArchiveServer)
clusterwftemplatepkg.RegisterClusterWorkflowTemplateServiceServer(grpcServer, clusterworkflowtemplate.NewClusterWorkflowTemplateServer(instanceIDService))
clusterwftemplatepkg.RegisterClusterWorkflowTemplateServiceServer(grpcServer, clusterworkflowtemplate.NewClusterWorkflowTemplateServer(instanceIDService, cwftmplStore))
grpc_prometheus.Register(grpcServer)
return grpcServer
}
Expand Down
20 changes: 11 additions & 9 deletions server/clusterworkflowtemplate/cluster_workflow_template_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,24 @@ import (
clusterwftmplpkg "github.com/argoproj/argo-workflows/v3/pkg/apiclient/clusterworkflowtemplate"
"github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo-workflows/v3/server/auth"
servertypes "github.com/argoproj/argo-workflows/v3/server/types"
"github.com/argoproj/argo-workflows/v3/util/instanceid"
"github.com/argoproj/argo-workflows/v3/workflow/creator"
"github.com/argoproj/argo-workflows/v3/workflow/templateresolution"
"github.com/argoproj/argo-workflows/v3/workflow/validate"

serverutils "github.com/argoproj/argo-workflows/v3/server/utils"
)

type ClusterWorkflowTemplateServer struct {
instanceIDService instanceid.Service
cwftmplStore servertypes.ClusterWorkflowTemplateStore
}

func NewClusterWorkflowTemplateServer(instanceID instanceid.Service) clusterwftmplpkg.ClusterWorkflowTemplateServiceServer {
return &ClusterWorkflowTemplateServer{instanceID}
func NewClusterWorkflowTemplateServer(instanceID instanceid.Service, cwftmplStore servertypes.ClusterWorkflowTemplateStore) clusterwftmplpkg.ClusterWorkflowTemplateServiceServer {
if cwftmplStore == nil {
cwftmplStore = NewClusterWorkflowTemplateClientStore()
}
return &ClusterWorkflowTemplateServer{instanceID, cwftmplStore}
}

func (cwts *ClusterWorkflowTemplateServer) CreateClusterWorkflowTemplate(ctx context.Context, req *clusterwftmplpkg.ClusterWorkflowTemplateCreateRequest) (*v1alpha1.ClusterWorkflowTemplate, error) {
Expand All @@ -34,7 +38,7 @@ func (cwts *ClusterWorkflowTemplateServer) CreateClusterWorkflowTemplate(ctx con
}
cwts.instanceIDService.Label(req.Template)
creator.Label(ctx, req.Template)
cwftmplGetter := templateresolution.WrapClusterWorkflowTemplateInterface(wfClient.ArgoprojV1alpha1().ClusterWorkflowTemplates())
cwftmplGetter := cwts.cwftmplStore.Getter(ctx)
err := validate.ValidateClusterWorkflowTemplate(nil, cwftmplGetter, req.Template, validate.ValidateOpts{})
if err != nil {
return nil, serverutils.ToStatusError(err, codes.InvalidArgument)
Expand All @@ -55,8 +59,7 @@ func (cwts *ClusterWorkflowTemplateServer) GetClusterWorkflowTemplate(ctx contex
}

func (cwts *ClusterWorkflowTemplateServer) getTemplateAndValidate(ctx context.Context, name string) (*v1alpha1.ClusterWorkflowTemplate, error) {
wfClient := auth.GetWfClient(ctx)
wfTmpl, err := wfClient.ArgoprojV1alpha1().ClusterWorkflowTemplates().Get(ctx, name, v1.GetOptions{})
wfTmpl, err := cwts.cwftmplStore.Getter(ctx).Get(name)
if err != nil {
return nil, serverutils.ToStatusError(err, codes.Internal)
}
Expand Down Expand Up @@ -101,8 +104,7 @@ func (cwts *ClusterWorkflowTemplateServer) DeleteClusterWorkflowTemplate(ctx con
func (cwts *ClusterWorkflowTemplateServer) LintClusterWorkflowTemplate(ctx context.Context, req *clusterwftmplpkg.ClusterWorkflowTemplateLintRequest) (*v1alpha1.ClusterWorkflowTemplate, error) {
cwts.instanceIDService.Label(req.Template)
creator.Label(ctx, req.Template)
wfClient := auth.GetWfClient(ctx)
cwftmplGetter := templateresolution.WrapClusterWorkflowTemplateInterface(wfClient.ArgoprojV1alpha1().ClusterWorkflowTemplates())
cwftmplGetter := cwts.cwftmplStore.Getter(ctx)

err := validate.ValidateClusterWorkflowTemplate(nil, cwftmplGetter, req.Template, validate.ValidateOpts{Lint: true})
if err != nil {
Expand All @@ -121,7 +123,7 @@ func (cwts *ClusterWorkflowTemplateServer) UpdateClusterWorkflowTemplate(ctx con
return nil, serverutils.ToStatusError(err, codes.InvalidArgument)
}
wfClient := auth.GetWfClient(ctx)
cwftmplGetter := templateresolution.WrapClusterWorkflowTemplateInterface(wfClient.ArgoprojV1alpha1().ClusterWorkflowTemplates())
cwftmplGetter := cwts.cwftmplStore.Getter(ctx)

err = validate.ValidateClusterWorkflowTemplate(nil, cwftmplGetter, req.Template, validate.ValidateOpts{})
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ func getClusterWorkflowTemplateServer() (clusterwftmplpkg.ClusterWorkflowTemplat
kubeClientSet := fake.NewSimpleClientset()
wfClientset := wftFake.NewSimpleClientset(&unlabelled, &cwftObj2, &cwftObj3)
ctx := context.WithValue(context.WithValue(context.WithValue(context.TODO(), auth.WfKey, wfClientset), auth.KubeKey, kubeClientSet), auth.ClaimsKey, &types.Claims{Claims: jwt.Claims{Subject: "my-sub"}})
return NewClusterWorkflowTemplateServer(instanceid.NewService("my-instanceid")), ctx
return NewClusterWorkflowTemplateServer(instanceid.NewService("my-instanceid"), nil), ctx
}

func TestWorkflowTemplateServer_CreateClusterWorkflowTemplate(t *testing.T) {
Expand Down
62 changes: 62 additions & 0 deletions server/clusterworkflowtemplate/informer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package clusterworkflowtemplate

import (
"context"
"time"

log "github.com/sirupsen/logrus"

"k8s.io/client-go/dynamic"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"

wfextvv1alpha1 "github.com/argoproj/argo-workflows/v3/pkg/client/informers/externalversions/workflow/v1alpha1"
"github.com/argoproj/argo-workflows/v3/server/types"
"github.com/argoproj/argo-workflows/v3/workflow/controller/informer"
"github.com/argoproj/argo-workflows/v3/workflow/templateresolution"
)

const (
workflowTemplateResyncPeriod = 20 * time.Minute
)

var _ types.ClusterWorkflowTemplateStore = &Informer{}

type Informer struct {
informer wfextvv1alpha1.ClusterWorkflowTemplateInformer
}

func NewInformer(restConfig *rest.Config) (*Informer, error) {
dynamicInterface, err := dynamic.NewForConfig(restConfig)
if err != nil {
return nil, err
}
informer := informer.NewTolerantClusterWorkflowTemplateInformer(
dynamicInterface,
workflowTemplateResyncPeriod,
)
return &Informer{
informer: informer,
}, nil
}

// Start informer in separate go-routine and block until cache sync
func (cwti *Informer) Run(stopCh <-chan struct{}) {

go cwti.informer.Informer().Run(stopCh)

if !cache.WaitForCacheSync(
stopCh,
cwti.informer.Informer().HasSynced,
) {
log.Fatal("Timed out waiting for caches to sync")
}
}

// if namespace contains empty string Lister will use the namespace provided during initialization
func (cwti *Informer) Getter(_ context.Context) templateresolution.ClusterWorkflowTemplateGetter {
if cwti.informer == nil {
log.Fatal("Template informer not started")
}
return cwti.informer.Lister()
}
21 changes: 21 additions & 0 deletions server/clusterworkflowtemplate/wf_client_store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package clusterworkflowtemplate

import (
"context"

"github.com/argoproj/argo-workflows/v3/server/auth"
"github.com/argoproj/argo-workflows/v3/workflow/templateresolution"
)

// Store is a wrapper around informer
type ClusterWorkflowTemplateClientStore struct {
}

func NewClusterWorkflowTemplateClientStore() *ClusterWorkflowTemplateClientStore {
return &ClusterWorkflowTemplateClientStore{}
}

func (wcs *ClusterWorkflowTemplateClientStore) Getter(ctx context.Context) templateresolution.ClusterWorkflowTemplateGetter {
wfClient := auth.GetWfClient(ctx)
return templateresolution.WrapClusterWorkflowTemplateInterface(wfClient.ArgoprojV1alpha1().ClusterWorkflowTemplates())
}
Loading

0 comments on commit f22ae3b

Please sign in to comment.