From 6c547f96810904a90be853b2f5782c023bfbb5ba Mon Sep 17 00:00:00 2001 From: sudipto baral Date: Fri, 17 Jan 2025 12:28:51 -0500 Subject: [PATCH 1/3] implement mirror for /ws/v1/clusters core API. --- cmd/unicorn-history-server/commands/root.go | 2 +- internal/webservice/routes.go | 24 ++++++++++- internal/webservice/routes_test.go | 48 +++++++++++++++++++++ internal/webservice/webservice.go | 4 ++ internal/yunikorn/client.go | 1 + internal/yunikorn/mock_client.go | 15 +++++++ internal/yunikorn/rest.go | 20 +++++++++ 7 files changed, 112 insertions(+), 2 deletions(-) diff --git a/cmd/unicorn-history-server/commands/root.go b/cmd/unicorn-history-server/commands/root.go index 0bc17312..bc919d0e 100644 --- a/cmd/unicorn-history-server/commands/root.go +++ b/cmd/unicorn-history-server/commands/root.go @@ -72,7 +72,7 @@ func Run(ctx context.Context, cfg *config.Config) error { healthService := health.New(info.Version, health.NewYunikornComponent(client), health.NewPostgresComponent(pool)) - ws := webservice.NewWebService(cfg.UHSConfig, mainRepository, eventRepository, healthService) + ws := webservice.NewWebService(cfg.UHSConfig, mainRepository, eventRepository, healthService, client) g.Add( func() error { return ws.Start(ctx) diff --git a/internal/webservice/routes.go b/internal/webservice/routes.go index ea6ccfb6..b67daaf6 100644 --- a/internal/webservice/routes.go +++ b/internal/webservice/routes.go @@ -42,7 +42,15 @@ var startupTime = time.Now() func (ws *WebService) init(ctx context.Context) { service := new(restful.WebService) - + service.Route( + service.GET(routeClusters). + To(ws.getClusters). + Produces(restful.MIME_JSON). + Writes([]*dao.ClusterDAOInfo{}). + Returns(200, "OK", []*dao.ClusterDAOInfo{}). + Returns(500, "Internal Server Error", ProblemDetails{}). + Doc("Get cluster information"), + ) service.Route( service.GET(routePartitions). To(ws.getPartitions). @@ -387,6 +395,20 @@ func (ws *WebService) getNodesPerPartition(req *restful.Request, resp *restful.R jsonResponse(resp, nodes) } +func (ws *WebService) getClusters(req *restful.Request, resp *restful.Response) { + ctx := req.Request.Context() + clusters, err := ws.client.GetClusters(ctx) + if err != nil { + errorResponse(req, resp, err) + return + } + if clusters == nil { + notFoundResponse(req, resp, fmt.Errorf("no cluster found")) + return + } + jsonResponse(resp, clusters) +} + func (ws *WebService) getAppsHistory(req *restful.Request, resp *restful.Response) { ctx := req.Request.Context() filters, err := parseHistoryFilters(req.Request) diff --git a/internal/webservice/routes_test.go b/internal/webservice/routes_test.go index 4b579c53..e0b38803 100644 --- a/internal/webservice/routes_test.go +++ b/internal/webservice/routes_test.go @@ -10,6 +10,7 @@ import ( "time" "github.com/G-Research/unicorn-history-server/internal/database/repository" + "github.com/G-Research/unicorn-history-server/internal/yunikorn" "github.com/G-Research/yunikorn-core/pkg/webservice/dao" "github.com/emicklei/go-restful/v3" "github.com/stretchr/testify/assert" @@ -545,3 +546,50 @@ func TestGetContainersHistory(t *testing.T) { }) } } + +func TestGetCluster(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + mockClient := yunikorn.NewMockClient(ctrl) + + tests := []struct { + name string + expectedClusters []*dao.ClusterDAOInfo + expectedStatus int + }{ + { + name: "ClusterInfo exists", + expectedClusters: []*dao.ClusterDAOInfo{ + { + StartTime: time.Now().UnixNano(), + ClusterName: "cluster1", + PartitionName: "default", + }, + }, + expectedStatus: http.StatusOK, + }, + { + name: "No Cluster found", + expectedClusters: nil, + expectedStatus: http.StatusNotFound, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mockClient.EXPECT(). + GetClusters(gomock.Any()). + Return(tt.expectedClusters, nil) + + ws := &WebService{client: mockClient} + + req, err := http.NewRequest(http.MethodGet, "/api/v1/clusters", nil) + require.NoError(t, err) + + rr := httptest.NewRecorder() + + ws.getClusters(restful.NewRequest(req), restful.NewResponse(rr)) + require.Equal(t, tt.expectedStatus, rr.Code) + }) + } +} diff --git a/internal/webservice/webservice.go b/internal/webservice/webservice.go index b8517333..62ae38bd 100644 --- a/internal/webservice/webservice.go +++ b/internal/webservice/webservice.go @@ -10,6 +10,7 @@ import ( "github.com/G-Research/unicorn-history-server/internal/database/repository" "github.com/G-Research/unicorn-history-server/internal/health" "github.com/G-Research/unicorn-history-server/internal/log" + "github.com/G-Research/unicorn-history-server/internal/yunikorn" ) type WebService struct { @@ -18,6 +19,7 @@ type WebService struct { eventRepository repository.EventRepository healthService health.Interface config config.UHSConfig + client yunikorn.Client } func NewWebService( @@ -25,6 +27,7 @@ func NewWebService( repository repository.Repository, eventRepository repository.EventRepository, healthService health.Interface, + client yunikorn.Client, ) *WebService { return &WebService{ server: &http.Server{ @@ -35,6 +38,7 @@ func NewWebService( eventRepository: eventRepository, healthService: healthService, config: cfg, + client: client, } } diff --git a/internal/yunikorn/client.go b/internal/yunikorn/client.go index 8c405125..c8b2118d 100644 --- a/internal/yunikorn/client.go +++ b/internal/yunikorn/client.go @@ -23,4 +23,5 @@ type Client interface { GetContainersHistory(ctx context.Context) ([]*dao.ContainerHistoryDAOInfo, error) GetEventStream(ctx context.Context) (*http.Response, error) Healthcheck(ctx context.Context) (*dao.SchedulerHealthDAOInfo, error) + GetClusters(ctx context.Context) ([]*dao.ClusterDAOInfo, error) } diff --git a/internal/yunikorn/mock_client.go b/internal/yunikorn/mock_client.go index 01012d5b..77f62afb 100644 --- a/internal/yunikorn/mock_client.go +++ b/internal/yunikorn/mock_client.go @@ -87,6 +87,21 @@ func (mr *MockClientMockRecorder) GetAppsHistory(arg0 any) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetAppsHistory", reflect.TypeOf((*MockClient)(nil).GetAppsHistory), arg0) } +// GetClusters mocks base method. +func (m *MockClient) GetClusters(arg0 context.Context) ([]*dao.ClusterDAOInfo, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetClusters", arg0) + ret0, _ := ret[0].([]*dao.ClusterDAOInfo) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetClusters indicates an expected call of GetClusters. +func (mr *MockClientMockRecorder) GetClusters(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetClusters", reflect.TypeOf((*MockClient)(nil).GetClusters), arg0) +} + // GetContainersHistory mocks base method. func (m *MockClient) GetContainersHistory(arg0 context.Context) ([]*dao.ContainerHistoryDAOInfo, error) { m.ctrl.T.Helper() diff --git a/internal/yunikorn/rest.go b/internal/yunikorn/rest.go index 6d3ad841..9d119653 100644 --- a/internal/yunikorn/rest.go +++ b/internal/yunikorn/rest.go @@ -22,6 +22,7 @@ const ( endpointContainersHistory = "/ws/v1/history/containers" endpointFullStateDump = "/ws/v1/fullstatedump" endpointHealthcheck = "/ws/v1/scheduler/healthcheck" + endpointClusters = "/ws/v1/clusters" ) // RESTClient implements the Client interface which defines functions to interact with the Yunikorn REST API @@ -61,6 +62,25 @@ func (c *RESTClient) GetFullStateDump(ctx context.Context) (*webservice.Aggregat return &state, nil } +func (c *RESTClient) GetClusters(ctx context.Context) ([]*dao.ClusterDAOInfo, error) { + resp, err := c.get(ctx, endpointClusters) + if err != nil { + return nil, err + } + defer closeBody(ctx, resp) + + if resp.StatusCode != 200 { + return nil, handleNonOKResponse(ctx, resp) + } + + var clusters []*dao.ClusterDAOInfo + if err = unmarshallBody(ctx, resp, &clusters); err != nil { + return nil, err + } + + return clusters, nil +} + func (c *RESTClient) GetPartitions(ctx context.Context) ([]*dao.PartitionInfo, error) { resp, err := c.get(ctx, endpointPartitions) if err != nil { From 3faddf0a13fae2e4b6e330becec6db3960764d14 Mon Sep 17 00:00:00 2001 From: sudipto baral Date: Fri, 17 Jan 2025 13:21:53 -0500 Subject: [PATCH 2/3] implement mirror for ws/v1/scheduler/healthcheck core API. --- internal/webservice/routes.go | 24 ++++++++++--- internal/webservice/routes_test.go | 57 ++++++++++++++++++++++++++++++ 2 files changed, 77 insertions(+), 4 deletions(-) diff --git a/internal/webservice/routes.go b/internal/webservice/routes.go index b67daaf6..3e9625d1 100644 --- a/internal/webservice/routes.go +++ b/internal/webservice/routes.go @@ -182,12 +182,12 @@ func (ws *WebService) init(ctx context.Context) { ) service.Route( service.GET(routeSchedulerHealthcheck). - To(ws.LivenessHealthcheck). + To(ws.schedulerHealthcheck). Produces(restful.MIME_JSON). - Writes(health.LivenessStatus{}). - Returns(200, "OK", health.LivenessStatus{}). + Writes(dao.SchedulerHealthDAOInfo{}). + Returns(200, "OK", dao.SchedulerHealthDAOInfo{}). Returns(500, "Internal Server Error", ProblemDetails{}). - Doc("Scheduler liveness healthcheck"), + Doc("Scheduler healthcheck"), ) service.Route( service.GET(routeHealthLiveness). @@ -396,6 +396,7 @@ func (ws *WebService) getNodesPerPartition(req *restful.Request, resp *restful.R } func (ws *WebService) getClusters(req *restful.Request, resp *restful.Response) { + // mirror of yunikorn-core ws/v1/clusters ctx := req.Request.Context() clusters, err := ws.client.GetClusters(ctx) if err != nil { @@ -458,6 +459,21 @@ func (ws *WebService) getEventStatistics(req *restful.Request, resp *restful.Res jsonResponse(resp, counts) } +func (ws *WebService) schedulerHealthcheck(req *restful.Request, resp *restful.Response) { + // mirror of yunikorn-core ws/v1/scheduler/healthcheck + ctx := req.Request.Context() + healthCheck, err := ws.client.Healthcheck(ctx) + if err != nil { + errorResponse(req, resp, err) + return + } + if healthCheck == nil { + notFoundResponse(req, resp, fmt.Errorf("no healthcheck data found")) + return + } + jsonResponse(resp, healthCheck) +} + func (ws *WebService) LivenessHealthcheck(req *restful.Request, resp *restful.Response) { ctx := req.Request.Context() jsonResponse(resp, ws.healthService.Liveness(ctx)) diff --git a/internal/webservice/routes_test.go b/internal/webservice/routes_test.go index e0b38803..36ace9c0 100644 --- a/internal/webservice/routes_test.go +++ b/internal/webservice/routes_test.go @@ -593,3 +593,60 @@ func TestGetCluster(t *testing.T) { }) } } + +func TestSchedulerHealthcheck(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + mockClient := yunikorn.NewMockClient(ctrl) + + tests := []struct { + name string + expected *dao.SchedulerHealthDAOInfo + expectedStatus int + }{ + { + name: "SchedulerHealthDAOInfo exists", + expected: &dao.SchedulerHealthDAOInfo{ + Healthy: true, + HealthChecks: []dao.HealthCheckInfo{ + { + Name: "Scheduling errors", + Succeeded: true, + Description: "Check for scheduling error entries in metrics", + DiagnosisMessage: "There were 0 scheduling errors logged in the metrics", + }, + { + Name: "Failed nodes", + Succeeded: true, + Description: "Check for failed nodes entries in metrics", + DiagnosisMessage: "There were 0 failed nodes logged in the metrics", + }, + }, + }, + expectedStatus: http.StatusOK, + }, + { + name: "No SchedulerHealthDAOInfo found", + expected: nil, + expectedStatus: http.StatusNotFound, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mockClient.EXPECT(). + Healthcheck(gomock.Any()). + Return(tt.expected, nil) + + ws := &WebService{client: mockClient} + + req, err := http.NewRequest(http.MethodGet, "/api/v1/scheduler/healthcheck", nil) + require.NoError(t, err) + + rr := httptest.NewRecorder() + + ws.schedulerHealthcheck(restful.NewRequest(req), restful.NewResponse(rr)) + require.Equal(t, tt.expectedStatus, rr.Code) + }) + } +} From d11ba26e7f75ea15eba1e424919662b60ab0f961 Mon Sep 17 00:00:00 2001 From: sudipto baral Date: Fri, 17 Jan 2025 14:21:30 -0500 Subject: [PATCH 3/3] implement mirror for ws/v1/scheduler/node-utilizations core API. --- internal/webservice/routes.go | 25 +++++++++++ internal/webservice/routes_test.go | 67 ++++++++++++++++++++++++++++++ internal/yunikorn/client.go | 1 + internal/yunikorn/mock_client.go | 15 +++++++ internal/yunikorn/rest.go | 20 +++++++++ 5 files changed, 128 insertions(+) diff --git a/internal/webservice/routes.go b/internal/webservice/routes.go index 3e9625d1..c9978f18 100644 --- a/internal/webservice/routes.go +++ b/internal/webservice/routes.go @@ -32,6 +32,7 @@ const ( routeAppsHistory = "/api/v1/history/apps" routeContainersHistory = "/api/v1/history/containers" routeNodesPerPartition = "/api/v1/partition/{partition_id}/nodes" + routeNodeUtilizations = "/api/v1/scheduler/node-utilizations" routeSchedulerHealthcheck = "/api/v1/scheduler/healthcheck" routeEventStatistics = "/api/v1/event-statistics" routeHealthLiveness = "/api/v1/health/liveness" @@ -180,6 +181,15 @@ func (ws *WebService) init(ctx context.Context) { Returns(500, "Internal Server Error", ProblemDetails{}). Doc("Get event statistics"), ) + service.Route( + service.GET(routeNodeUtilizations). + To(ws.getNodeUtilizations). + Produces(restful.MIME_JSON). + Writes([]*dao.PartitionNodesUtilDAOInfo{}). + Returns(200, "OK", []*dao.PartitionNodesUtilDAOInfo{}). + Returns(500, "Internal Server Error", ProblemDetails{}). + Doc("Get node utilization information"), + ) service.Route( service.GET(routeSchedulerHealthcheck). To(ws.schedulerHealthcheck). @@ -459,6 +469,21 @@ func (ws *WebService) getEventStatistics(req *restful.Request, resp *restful.Res jsonResponse(resp, counts) } +func (ws *WebService) getNodeUtilizations(req *restful.Request, resp *restful.Response) { + // mirror of yunikorn-core ws/v1/scheduler/node-utilizations + ctx := req.Request.Context() + nu, err := ws.client.NodeUtilizations(ctx) + if err != nil { + errorResponse(req, resp, err) + return + } + if nu == nil { + notFoundResponse(req, resp, fmt.Errorf("no node utilizations data found")) + return + } + jsonResponse(resp, nu) +} + func (ws *WebService) schedulerHealthcheck(req *restful.Request, resp *restful.Response) { // mirror of yunikorn-core ws/v1/scheduler/healthcheck ctx := req.Request.Context() diff --git a/internal/webservice/routes_test.go b/internal/webservice/routes_test.go index 36ace9c0..ba6faa9c 100644 --- a/internal/webservice/routes_test.go +++ b/internal/webservice/routes_test.go @@ -650,3 +650,70 @@ func TestSchedulerHealthcheck(t *testing.T) { }) } } + +func TestNodeUtilizations(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + mockClient := yunikorn.NewMockClient(ctrl) + + tests := []struct { + name string + expected []*dao.PartitionNodesUtilDAOInfo + expectedStatus int + }{ + { + name: "NodeUtilizations exist", + expected: []*dao.PartitionNodesUtilDAOInfo{ + { + ClusterID: "mycluster", + Partition: "default", + NodesUtilList: []*dao.NodesUtilDAOInfo{ + { + ResourceType: "memory", + NodesUtil: []*dao.NodeUtilDAOInfo{ + { + BucketName: "0-10%", + NumOfNodes: 1, + NodeNames: []string{"uhs-control-plane"}, + }, + {BucketName: "10-20%"}, + {BucketName: "20-30%"}, + {BucketName: "30-40%"}, + {BucketName: "40-50%"}, + {BucketName: "50-60%"}, + {BucketName: "60-70%"}, + {BucketName: "70-80%"}, + {BucketName: "80-90%"}, + {BucketName: "90-100%"}, + }, + }, + }, + }, + }, + expectedStatus: http.StatusOK, + }, + { + name: "No NodeUtilizations found", + expected: nil, + expectedStatus: http.StatusNotFound, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mockClient.EXPECT(). + NodeUtilizations(gomock.Any()). + Return(tt.expected, nil) + + ws := &WebService{client: mockClient} + + req, err := http.NewRequest(http.MethodGet, "/api/v1/scheduler/node-utilizations", nil) + require.NoError(t, err) + + rr := httptest.NewRecorder() + + ws.getNodeUtilizations(restful.NewRequest(req), restful.NewResponse(rr)) + require.Equal(t, tt.expectedStatus, rr.Code) + }) + } +} diff --git a/internal/yunikorn/client.go b/internal/yunikorn/client.go index c8b2118d..fcdde468 100644 --- a/internal/yunikorn/client.go +++ b/internal/yunikorn/client.go @@ -23,5 +23,6 @@ type Client interface { GetContainersHistory(ctx context.Context) ([]*dao.ContainerHistoryDAOInfo, error) GetEventStream(ctx context.Context) (*http.Response, error) Healthcheck(ctx context.Context) (*dao.SchedulerHealthDAOInfo, error) + NodeUtilizations(ctx context.Context) ([]*dao.PartitionNodesUtilDAOInfo, error) GetClusters(ctx context.Context) ([]*dao.ClusterDAOInfo, error) } diff --git a/internal/yunikorn/mock_client.go b/internal/yunikorn/mock_client.go index 77f62afb..bb9880e7 100644 --- a/internal/yunikorn/mock_client.go +++ b/internal/yunikorn/mock_client.go @@ -221,3 +221,18 @@ func (mr *MockClientMockRecorder) Healthcheck(arg0 any) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Healthcheck", reflect.TypeOf((*MockClient)(nil).Healthcheck), arg0) } + +// NodeUtilizations mocks base method. +func (m *MockClient) NodeUtilizations(arg0 context.Context) ([]*dao.PartitionNodesUtilDAOInfo, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "NodeUtilizations", arg0) + ret0, _ := ret[0].([]*dao.PartitionNodesUtilDAOInfo) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// NodeUtilizations indicates an expected call of NodeUtilizations. +func (mr *MockClientMockRecorder) NodeUtilizations(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NodeUtilizations", reflect.TypeOf((*MockClient)(nil).NodeUtilizations), arg0) +} diff --git a/internal/yunikorn/rest.go b/internal/yunikorn/rest.go index 9d119653..527f7a6c 100644 --- a/internal/yunikorn/rest.go +++ b/internal/yunikorn/rest.go @@ -22,6 +22,7 @@ const ( endpointContainersHistory = "/ws/v1/history/containers" endpointFullStateDump = "/ws/v1/fullstatedump" endpointHealthcheck = "/ws/v1/scheduler/healthcheck" + endpointNodeUtilizations = "/ws/v1/scheduler/node-utilizations" endpointClusters = "/ws/v1/clusters" ) @@ -278,6 +279,25 @@ func (c *RESTClient) Healthcheck(ctx context.Context) (*dao.SchedulerHealthDAOIn return &schedulerHealth, nil } +func (c *RESTClient) NodeUtilizations(ctx context.Context) ([]*dao.PartitionNodesUtilDAOInfo, error) { + resp, err := c.get(ctx, endpointNodeUtilizations) + if err != nil { + return nil, err + } + defer closeBody(ctx, resp) + + if resp.StatusCode != 200 { + return nil, handleNonOKResponse(ctx, resp) + } + + var nodeUtilizations []*dao.PartitionNodesUtilDAOInfo + if err = unmarshallBody(ctx, resp, &nodeUtilizations); err != nil { + return nil, err + } + + return nodeUtilizations, nil +} + func (c *RESTClient) GetEventStream(ctx context.Context) (*http.Response, error) { resp, err := c.get(ctx, endpointStream) if err != nil {