Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement collection GETs for node pools #883

Merged
merged 6 commits into from
Jan 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 83 additions & 33 deletions frontend/pkg/frontend/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"net"
"net/http"
"os"
"path"
"strconv"
"strings"
"sync/atomic"
Expand Down Expand Up @@ -175,6 +176,8 @@ func (f *Frontend) ArmResourceList(writer http.ResponseWriter, request *http.Req

subscriptionID := request.PathValue(PathSegmentSubscriptionID)
resourceGroupName := request.PathValue(PathSegmentResourceGroupName)
resourceName := request.PathValue(PathSegmentResourceName)
resourceTypeName := path.Base(request.URL.Path)

// Even though the bulk of the list content comes from Cluster Service,
// we start by querying Cosmos DB because its continuation token meets
Expand All @@ -185,24 +188,45 @@ func (f *Frontend) ArmResourceList(writer http.ResponseWriter, request *http.Req
if resourceGroupName != "" {
prefixString += "/resourceGroups/" + resourceGroupName
}
if resourceName != "" {
// This is a nested resource request. Build a resource ID for
// the parent cluster. We use this below to get the cluster's
// ResourceDocument from Cosmos DB.
prefixString += "/providers/" + api.ProviderNamespace
prefixString += "/" + api.ClusterResourceTypeName + "/" + resourceName
}
prefix, err := arm.ParseResourceID(prefixString)
if err != nil {
f.logger.Error(err.Error())
arm.WriteInternalServerError(writer)
return
}

documentList, continuationToken, err := f.dbClient.ListResourceDocs(ctx, prefix, &api.ClusterResourceType, pageSizeHint, continuationToken)
if err != nil {
f.logger.Error(err.Error())
arm.WriteInternalServerError(writer)
return
}
dbIterator := f.dbClient.ListResourceDocs(ctx, prefix, pageSizeHint, continuationToken)

// Build a map of cluster documents by Cluster Service cluster ID.
documentMap := make(map[string]*database.ResourceDocument)
for _, doc := range documentList {
documentMap[doc.InternalID.ID()] = doc
for item := range dbIterator.Items(ctx) {
var doc database.ResourceDocument

err = json.Unmarshal(item, &doc)
if err != nil {
f.logger.Error(err.Error())
arm.WriteInternalServerError(writer)
return
}

// FIXME This filtering could be made part of the query expression. It would
// require some reworking (or elimination) of the DBClient interface.
if strings.HasSuffix(strings.ToLower(doc.Key.ResourceType.Type), resourceTypeName) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this be done with a WHERE clause at the DB layer?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Technically yes, but see my in-memory cache answer below.

I'm going to leave this as is for this PR but I added a "FIXME" comment here to circle back to it once I've gotten rid of the generic database interface I described below.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good. Not being able to delegate this kind of thing to the DB because of the vestigial in-memory implementation IMO makes getting rid of it a good use of time before long.

documentMap[doc.InternalID.ID()] = &doc
}
}

err = dbIterator.GetError()
if err != nil {
f.logger.Error(err.Error())
arm.WriteInternalServerError(writer)
}

// Build a Cluster Service query that looks for
Expand All @@ -214,39 +238,65 @@ func (f *Frontend) ArmResourceList(writer http.ResponseWriter, request *http.Req
query := fmt.Sprintf("id in (%s)", strings.Join(queryIDs, ", "))
f.logger.Info(fmt.Sprintf("Searching Cluster Service for %q", query))

listRequest := f.clusterServiceClient.GetConn().ClustersMgmt().V1().Clusters().List().Search(query)

// XXX This SHOULD avoid dealing with pagination from Cluster Service.
// As far I can tell, uhc-cluster-service does not impose its own
// limit on the page size. Further testing is needed to verify.
listRequest.Size(len(documentMap))

listResponse, err := listRequest.SendContext(ctx)
if err != nil {
f.logger.Error(err.Error())
arm.WriteInternalServerError(writer)
return
}

for _, csCluster := range listResponse.Items().Slice() {
if doc, ok := documentMap[csCluster.ID()]; ok {
value, err := marshalCSCluster(csCluster, doc, versionedInterface)
if err != nil {
f.logger.Error(err.Error())
arm.WriteInternalServerError(writer)
return
switch resourceTypeName {
case strings.ToLower(api.ClusterResourceTypeName):
csIterator := f.clusterServiceClient.ListCSClusters(query)

for csCluster := range csIterator.Items(ctx) {
if doc, ok := documentMap[csCluster.ID()]; ok {
value, err := marshalCSCluster(csCluster, doc, versionedInterface)
if err != nil {
f.logger.Error(err.Error())
arm.WriteInternalServerError(writer)
return
}
pagedResponse.AddValue(value)
}
pagedResponse.AddValue(value)
}
}
err = csIterator.GetError()

case strings.ToLower(api.NodePoolResourceTypeName):
var resourceDoc *database.ResourceDocument

if continuationToken != nil {
err = pagedResponse.SetNextLink(request.Referer(), *continuationToken)
// Fetch the cluster document for the Cluster Service ID.
resourceDoc, err = f.dbClient.GetResourceDoc(ctx, prefix)
if err != nil {
f.logger.Error(err.Error())
arm.WriteInternalServerError(writer)
return
}

csIterator := f.clusterServiceClient.ListCSNodePools(resourceDoc.InternalID, query)

for csNodePool := range csIterator.Items(ctx) {
if doc, ok := documentMap[csNodePool.ID()]; ok {
value, err := marshalCSNodePool(csNodePool, doc, versionedInterface)
if err != nil {
f.logger.Error(err.Error())
arm.WriteInternalServerError(writer)
return
}
pagedResponse.AddValue(value)
}
}
err = csIterator.GetError()

default:
err = fmt.Errorf("unsupported resource type: %s", resourceTypeName)
}

// Check for iteration error.
if err != nil {
f.logger.Error(err.Error())
arm.WriteInternalServerError(writer)
return
}

err = pagedResponse.SetNextLink(request.Referer(), dbIterator.GetContinuationToken())
if err != nil {
f.logger.Error(err.Error())
arm.WriteInternalServerError(writer)
return
}

_, err = arm.WriteJSONResponse(writer, http.StatusOK, pagedResponse)
Expand Down
11 changes: 11 additions & 0 deletions frontend/pkg/frontend/middleware_resourceid_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,17 @@ func TestMiddlewareResourceID(t *testing.T) {
"Microsoft.Resources/tenants",
},
},
{
name: "node pool collection",
path: "/SUBSCRIPTIONS/00000000-0000-0000-0000-000000000000/RESOURCEGROUPS/MyResourceGroup/PROVIDERS/MICROSOFT.REDHATOPENSHIFT/HCPOPENSHIFTCLUSTERS/myCluster/NODEPOOLS",
resourceTypes: []string{
"MICROSOFT.REDHATOPENSHIFT/HCPOPENSHIFTCLUSTERS/NODEPOOLS",
"MICROSOFT.REDHATOPENSHIFT/HCPOPENSHIFTCLUSTERS",
"Microsoft.Resources/resourceGroups",
"Microsoft.Resources/subscriptions",
"Microsoft.Resources/tenants",
},
},
{
name: "preflight deployment",
path: "/SUBSCRIPTIONS/00000000-0000-0000-0000-000000000000/RESOURCEGROUPS/MyResourceGroup/PROVIDERS/MICROSOFT.REDHATOPENSHIFT/DEPLOYMENTS/preflight",
Expand Down
47 changes: 22 additions & 25 deletions frontend/pkg/frontend/middleware_validatestatic.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ import (
var rxHCPOpenShiftClusterResourceName = regexp.MustCompile(`^[a-zA-Z][a-zA-Z0-9-]{2,53}$`)
var rxNodePoolResourceName = regexp.MustCompile(`^[a-zA-Z][a-zA-Z0-9-]{2,14}$`)

var resourceTypeSubscription = "Microsoft.Resources/subscriptions"

func MiddlewareValidateStatic(w http.ResponseWriter, r *http.Request, next http.HandlerFunc) {
// To conform with "OAPI012: Resource IDs must not be case sensitive"
// we need to use the original, non-lowercased resource ID components
Expand All @@ -40,29 +38,28 @@ func MiddlewareValidateStatic(w http.ResponseWriter, r *http.Request, next http.
}
}

// Skip static validation for subscription resources
if !strings.EqualFold(resource.ResourceType.String(), resourceTypeSubscription) {
switch strings.ToLower(resource.ResourceType.Type) {
case strings.ToLower(api.ClusterResourceType.Type):
if !rxHCPOpenShiftClusterResourceName.MatchString(resource.Name) {
arm.WriteError(w, http.StatusBadRequest,
arm.CloudErrorCodeInvalidResourceName,
resource.String(),
"The Resource '%s/%s' under resource group '%s' does not conform to the naming restriction.",
resource.ResourceType, resource.Name,
resource.ResourceGroupName)
return
}
case strings.ToLower(api.NodePoolResourceType.Type):
if !rxNodePoolResourceName.MatchString(resource.Name) {
arm.WriteError(w, http.StatusBadRequest,
arm.CloudErrorCodeInvalidResourceName,
resource.String(),
"The Resource '%s/%s' under resource group '%s' does not conform to the naming restriction.",
resource.ResourceType, resource.Name,
resource.ResourceGroupName)
return
}
switch strings.ToLower(resource.ResourceType.Type) {
case strings.ToLower(api.ClusterResourceType.Type):
if !rxHCPOpenShiftClusterResourceName.MatchString(resource.Name) {
arm.WriteError(w, http.StatusBadRequest,
arm.CloudErrorCodeInvalidResourceName,
resource.String(),
"The Resource '%s/%s' under resource group '%s' does not conform to the naming restriction.",
resource.ResourceType, resource.Name,
resource.ResourceGroupName)
return
}
case strings.ToLower(api.NodePoolResourceType.Type):
// The collection GET endpoint for nested resources
// parses into a ResourceID with an empty Name field.
if resource.Name != "" && !rxNodePoolResourceName.MatchString(resource.Name) {
arm.WriteError(w, http.StatusBadRequest,
arm.CloudErrorCodeInvalidResourceName,
resource.String(),
"The Resource '%s/%s' under resource group '%s' does not conform to the naming restriction.",
resource.ResourceType, resource.Name,
resource.ResourceGroupName)
return
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions frontend/pkg/frontend/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ func (f *Frontend) routes() *MiddlewareMux {
mux.Handle(
MuxPattern(http.MethodGet, PatternSubscriptions, PatternResourceGroups, PatternProviders, api.ClusterResourceTypeName),
postMuxMiddleware.HandlerFunc(f.ArmResourceList))
mux.Handle(
MuxPattern(http.MethodGet, PatternSubscriptions, PatternResourceGroups, PatternProviders, PatternClusters, api.NodePoolResourceTypeName),
postMuxMiddleware.HandlerFunc(f.ArmResourceList))

// Resource ID endpoints
// Request context holds an azcorearm.ResourceID
Expand Down
34 changes: 19 additions & 15 deletions internal/database/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ import (
"iter"
"strings"

azcorearm "github.com/Azure/azure-sdk-for-go/sdk/azcore/arm"

"github.com/Azure/ARO-HCP/internal/api/arm"
)

Expand All @@ -24,14 +22,14 @@ type Cache struct {
subscription map[string]*SubscriptionDocument
}

type operationCacheIterator struct {
operation map[string]*OperationDocument
err error
type cacheIterator struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For my understanding, why is there an in-memory cache for CosmosDB? Was there a particular REST p95 that couldn't be reached without caching?

Copy link
Collaborator Author

@mbarnes mbarnes Nov 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's there for historical reasons, from early in development before we had access to Cosmos DB. Both the Cosmos DB client and the in-memory cache are hidden behind a generic database interface. Once we got Cosmos DB access it was decided to keep the interface and cache around as a unit test aid, and for a time it was helpful when our database use cases were simple. But it's technical debt now; it's become an obstacle to leveraging more advanced features of Cosmos DB. I'm honestly ready to ditch it and just use gomock or some such for unit tests, but I'll need to find time to pay that debt.

docs []any
err error
}

func (iter operationCacheIterator) Items(ctx context.Context) iter.Seq[[]byte] {
func (iter cacheIterator) Items(ctx context.Context) iter.Seq[[]byte] {
return func(yield func([]byte) bool) {
for _, doc := range iter.operation {
for _, doc := range iter.docs {
// Marshalling the document struct only to immediately unmarshal
// it back to a document struct is a little silly but this is to
// conform to the DBClientIterator interface.
Expand All @@ -48,7 +46,11 @@ func (iter operationCacheIterator) Items(ctx context.Context) iter.Seq[[]byte] {
}
}

func (iter operationCacheIterator) GetError() error {
func (iter cacheIterator) GetContinuationToken() string {
return ""
}

func (iter cacheIterator) GetError() error {
return iter.err
}

Expand Down Expand Up @@ -108,21 +110,19 @@ func (c *Cache) DeleteResourceDoc(ctx context.Context, resourceID *arm.ResourceI
return nil
}

func (c *Cache) ListResourceDocs(ctx context.Context, prefix *arm.ResourceID, resourceType *azcorearm.ResourceType, pageSizeHint int32, continuationToken *string) ([]*ResourceDocument, *string, error) {
var resourceList []*ResourceDocument
func (c *Cache) ListResourceDocs(ctx context.Context, prefix *arm.ResourceID, maxItems int32, continuationToken *string) DBClientIterator {
var iterator cacheIterator

// Make sure key prefix is lowercase.
prefixString := strings.ToLower(prefix.String() + "/")

for key, doc := range c.resource {
if strings.HasPrefix(key, prefixString) {
if resourceType == nil || strings.EqualFold(resourceType.String(), doc.Key.ResourceType.String()) {
resourceList = append(resourceList, doc)
}
iterator.docs = append(iterator.docs, doc)
}
}

return resourceList, nil, nil
return iterator
}

func (c *Cache) GetOperationDoc(ctx context.Context, operationID string) (*OperationDocument, error) {
Expand Down Expand Up @@ -164,7 +164,11 @@ func (c *Cache) DeleteOperationDoc(ctx context.Context, operationID string) erro
}

func (c *Cache) ListAllOperationDocs(ctx context.Context) DBClientIterator {
return operationCacheIterator{operation: c.operation}
var iterator cacheIterator
for _, doc := range c.operation {
iterator.docs = append(iterator.docs, doc)
}
return iterator
}

func (c *Cache) GetSubscriptionDoc(ctx context.Context, subscriptionID string) (*SubscriptionDocument, error) {
Expand Down
Loading
Loading