diff --git a/internal/database/repository/application_int_test.go b/internal/database/repository/application_int_test.go index 40118fac..c47afa17 100644 --- a/internal/database/repository/application_int_test.go +++ b/internal/database/repository/application_int_test.go @@ -16,13 +16,13 @@ import ( "github.com/G-Research/unicorn-history-server/internal/util" ) -type ApplicationTestSuite struct { +type ApplicationIntTest struct { suite.Suite pool *pgxpool.Pool repo *PostgresRepository } -func (as *ApplicationTestSuite) SetupSuite() { +func (as *ApplicationIntTest) SetupSuite() { ctx := context.Background() require.NotNil(as.T(), as.pool) repo, err := NewPostgresRepository(as.pool) @@ -32,11 +32,11 @@ func (as *ApplicationTestSuite) SetupSuite() { seedApplications(ctx, as.T(), as.repo) } -func (as *ApplicationTestSuite) TearDownSuite() { +func (as *ApplicationIntTest) TearDownSuite() { as.pool.Close() } -func (as *ApplicationTestSuite) TestGetAllApplications() { +func (as *ApplicationIntTest) TestGetAllApplications() { ctx := context.Background() tests := []struct { name string @@ -106,7 +106,7 @@ func (as *ApplicationTestSuite) TestGetAllApplications() { } } -func (as *ApplicationTestSuite) TestGetAppsPerPartitionPerQueue() { +func (as *ApplicationIntTest) TestGetAppsPerPartitionPerQueue() { ctx := context.Background() tests := []struct { name string diff --git a/internal/database/repository/history_int_test.go b/internal/database/repository/history_int_test.go index 7fa1198b..2d2ea333 100644 --- a/internal/database/repository/history_int_test.go +++ b/internal/database/repository/history_int_test.go @@ -15,13 +15,13 @@ import ( "github.com/G-Research/unicorn-history-server/internal/util" ) -type HistoryTestSuite struct { +type HistoryIntTest struct { suite.Suite pool *pgxpool.Pool repo *PostgresRepository } -func (hs *HistoryTestSuite) SetupSuite() { +func (hs *HistoryIntTest) SetupSuite() { ctx := context.Background() require.NotNil(hs.T(), hs.pool) repo, err := 
NewPostgresRepository(hs.pool) @@ -31,11 +31,11 @@ func (hs *HistoryTestSuite) SetupSuite() { seedHistory(ctx, hs.T(), hs.repo) } -func (hs *HistoryTestSuite) TearDownSuite() { +func (hs *HistoryIntTest) TearDownSuite() { hs.pool.Close() } -func (hs *HistoryTestSuite) TestGetApplicationsHistory() { +func (hs *HistoryIntTest) TestGetApplicationsHistory() { ctx := context.Background() tests := []struct { name string @@ -76,7 +76,7 @@ func (hs *HistoryTestSuite) TestGetApplicationsHistory() { } } -func (hs *HistoryTestSuite) TestGetContainersHistory() { +func (hs *HistoryIntTest) TestGetContainersHistory() { ctx := context.Background() tests := []struct { name string diff --git a/internal/database/repository/node_int_test.go b/internal/database/repository/node_int_test.go index 702d3e24..d69fae52 100644 --- a/internal/database/repository/node_int_test.go +++ b/internal/database/repository/node_int_test.go @@ -16,7 +16,7 @@ import ( "github.com/G-Research/unicorn-history-server/internal/util" ) -type NodeTestSuite struct { +type NodeIntTest struct { suite.Suite pool *pgxpool.Pool repo *PostgresRepository @@ -24,7 +24,7 @@ type NodeTestSuite struct { var partitionID = ulid.Make().String() -func (ns *NodeTestSuite) SetupSuite() { +func (ns *NodeIntTest) SetupSuite() { ctx := context.Background() require.NotNil(ns.T(), ns.pool) repo, err := NewPostgresRepository(ns.pool) @@ -34,11 +34,11 @@ func (ns *NodeTestSuite) SetupSuite() { seedNodes(ctx, ns.T(), ns.repo) } -func (ns *NodeTestSuite) TearDownSuite() { +func (ns *NodeIntTest) TearDownSuite() { ns.pool.Close() } -func (ns *NodeTestSuite) TestGetNodesPerPartition() { +func (ns *NodeIntTest) TestGetNodesPerPartition() { ctx := context.Background() tests := []struct { name string diff --git a/internal/database/repository/partitions_init_test.go b/internal/database/repository/partitions_init_test.go index 2d0e26e0..66764f39 100644 --- a/internal/database/repository/partitions_init_test.go +++ 
b/internal/database/repository/partitions_init_test.go @@ -14,13 +14,13 @@ import ( "github.com/G-Research/unicorn-history-server/internal/util" ) -type PartitionTestSuite struct { +type PartitionIntTest struct { suite.Suite pool *pgxpool.Pool repo *PostgresRepository } -func (ps *PartitionTestSuite) SetupSuite() { +func (ps *PartitionIntTest) SetupSuite() { ctx := context.Background() require.NotNil(ps.T(), ps.pool) repo, err := NewPostgresRepository(ps.pool) @@ -30,11 +30,11 @@ func (ps *PartitionTestSuite) SetupSuite() { seedPartitions(ctx, ps.T(), ps.repo) } -func (ps *PartitionTestSuite) TearDownSuite() { +func (ps *PartitionIntTest) TearDownSuite() { ps.pool.Close() } -func (ps *PartitionTestSuite) TestGetAllPartitions() { +func (ps *PartitionIntTest) TestGetAllPartitions() { ctx := context.Background() tests := []struct { name string diff --git a/internal/database/repository/queue_int_test.go b/internal/database/repository/queue_int_test.go index 9d146a1d..309d1088 100644 --- a/internal/database/repository/queue_int_test.go +++ b/internal/database/repository/queue_int_test.go @@ -13,13 +13,13 @@ import ( "github.com/stretchr/testify/suite" ) -type QueueTestSuite struct { +type QueueIntTest struct { suite.Suite pool *pgxpool.Pool repo *PostgresRepository } -func (qs *QueueTestSuite) SetupSuite() { +func (qs *QueueIntTest) SetupSuite() { require.NotNil(qs.T(), qs.pool) repo, err := NewPostgresRepository(qs.pool) require.NoError(qs.T(), err) @@ -28,11 +28,11 @@ func (qs *QueueTestSuite) SetupSuite() { seedQueues(qs.T(), qs.repo) } -func (qs *QueueTestSuite) TearDownSuite() { +func (qs *QueueIntTest) TearDownSuite() { qs.pool.Close() } -func (qs *QueueTestSuite) TestGetAllQueues() { +func (qs *QueueIntTest) TestGetAllQueues() { ctx := context.Background() tests := []struct { name string @@ -53,7 +53,7 @@ func (qs *QueueTestSuite) TestGetAllQueues() { } } -func (qs *QueueTestSuite) TestGetQueuesInPartition() { +func (qs *QueueIntTest) TestGetQueuesInPartition() { 
ctx := context.Background() tests := []struct { name string @@ -81,7 +81,7 @@ func (qs *QueueTestSuite) TestGetQueuesInPartition() { } } -func (qs *QueueTestSuite) TestGetQueue() { +func (qs *QueueIntTest) TestGetQueue() { ctx := context.Background() tests := []struct { name string @@ -119,7 +119,7 @@ func (qs *QueueTestSuite) TestGetQueue() { } } -func (qs *QueueTestSuite) TestDeleteQueues() { +func (qs *QueueIntTest) TestDeleteQueues() { ctx := context.Background() tests := []struct { name string @@ -163,7 +163,7 @@ func (qs *QueueTestSuite) TestDeleteQueues() { } } -func (qs *QueueTestSuite) TestUpdateQueue() { +func (qs *QueueIntTest) TestUpdateQueue() { ctx := context.Background() now := time.Now() tests := []struct { diff --git a/internal/database/repository/repo_int_test.go b/internal/database/repository/repo_suite_test.go similarity index 56% rename from internal/database/repository/repo_int_test.go rename to internal/database/repository/repo_suite_test.go index cda6fb1c..db1b3bf6 100644 --- a/internal/database/repository/repo_int_test.go +++ b/internal/database/repository/repo_suite_test.go @@ -11,13 +11,13 @@ import ( "github.com/G-Research/unicorn-history-server/test/database" ) -type RepositoryTestSuite struct { +type RepositorySuite struct { suite.Suite tp *database.TestPostgresContainer pool *pgxpool.Pool } -func (ts *RepositoryTestSuite) SetupSuite() { +func (ts *RepositorySuite) SetupSuite() { ctx := context.Background() cfg := database.InstanceConfig{ User: "test", @@ -36,35 +36,38 @@ func (ts *RepositoryTestSuite) SetupSuite() { ts.pool = tp.Pool(ctx, ts.T(), &cfg) } -func (ts *RepositoryTestSuite) TearDownSuite() { +func (ts *RepositorySuite) TearDownSuite() { err := ts.tp.Container.Terminate(context.Background()) require.NoError(ts.T(), err) } -func (ts *RepositoryTestSuite) TestSubSuites() { - ts.T().Run("ApplicationTestSuite", func(t *testing.T) { +func (ts *RepositorySuite) TestSubSuites() { + ts.T().Run("ApplicationIntTest", func(t 
*testing.T) { pool := database.CloneDB(t, ts.tp, ts.pool) - suite.Run(t, &ApplicationTestSuite{pool: pool}) + suite.Run(t, &ApplicationIntTest{pool: pool}) }) - ts.T().Run("HistoryTestSuite", func(t *testing.T) { + ts.T().Run("HistoryIntTest", func(t *testing.T) { pool := database.CloneDB(t, ts.tp, ts.pool) - suite.Run(t, &HistoryTestSuite{pool: pool}) + suite.Run(t, &HistoryIntTest{pool: pool}) }) - ts.T().Run("NodeTestSuite", func(t *testing.T) { + ts.T().Run("NodeIntTest", func(t *testing.T) { pool := database.CloneDB(t, ts.tp, ts.pool) - suite.Run(t, &NodeTestSuite{pool: pool}) + suite.Run(t, &NodeIntTest{pool: pool}) }) - ts.T().Run("QueueTestSuite", func(t *testing.T) { + ts.T().Run("QueueIntTest", func(t *testing.T) { pool := database.CloneDB(t, ts.tp, ts.pool) - suite.Run(t, &QueueTestSuite{pool: pool}) + suite.Run(t, &QueueIntTest{pool: pool}) }) - ts.T().Run("PartitionTestSuite", func(t *testing.T) { + ts.T().Run("PartitionIntTest", func(t *testing.T) { pool := database.CloneDB(t, ts.tp, ts.pool) - suite.Run(t, &PartitionTestSuite{pool: pool}) + suite.Run(t, &PartitionIntTest{pool: pool}) }) } -func TestRepositoryIntegrationTestSuite(t *testing.T) { - topSuite := new(RepositoryTestSuite) +func TestRepositoryIntegration(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test in short mode.") + } + topSuite := new(RepositorySuite) suite.Run(t, topSuite) } diff --git a/internal/database/repository/repository.go b/internal/database/repository/repository.go index 3d49259b..eed47676 100644 --- a/internal/database/repository/repository.go +++ b/internal/database/repository/repository.go @@ -22,7 +22,7 @@ type Repository interface { UpdateNode(ctx context.Context, node *model.Node) error GetNodeByID(ctx context.Context, id string) (*model.Node, error) DeleteNodesNotInIDs(ctx context.Context, ids []string, deletedAtNano int64) error - GetNodesPerPartition(ctx context.Context, partition string, filters NodeFilters) ([]*model.Node, error) + 
GetNodesPerPartition(ctx context.Context, partitionID string, filters NodeFilters) ([]*model.Node, error) InsertPartition(ctx context.Context, partition *model.Partition) error UpdatePartition(ctx context.Context, partition *model.Partition) error GetAllPartitions(ctx context.Context, filters PartitionFilters) ([]*model.Partition, error) diff --git a/internal/health/components_int_test.go b/internal/health/components_int_test.go index 2dae1ea5..451c3814 100644 --- a/internal/health/components_int_test.go +++ b/internal/health/components_int_test.go @@ -2,27 +2,30 @@ package health import ( "context" - "testing" - "github.com/stretchr/testify/assert" - - "github.com/G-Research/unicorn-history-server/internal/database/postgres" "github.com/G-Research/unicorn-history-server/internal/yunikorn" "github.com/G-Research/unicorn-history-server/test/config" + "github.com/jackc/pgx/v5/pgxpool" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/suite" ) -func TestNewComponent_Integration(t *testing.T) { - if testing.Short() { - t.Skip("skipping integration test in short mode.") - } +type ComponentsIntTest struct { + suite.Suite + pool *pgxpool.Pool + yunikornClient *yunikorn.RESTClient +} - ctx := context.Background() +func (ts *ComponentsIntTest) SetupSuite() { + ts.yunikornClient = yunikorn.NewRESTClient(config.GetTestYunikornConfig()) +} - yunikornClient := yunikorn.NewRESTClient(config.GetTestYunikornConfig()) - postgresPool, err := postgres.NewConnectionPool(ctx, config.GetTestPostgresConfig()) - if err != nil { - t.Fatalf("error creating postgres connection pool: %v", err) - } +func (ts *ComponentsIntTest) TearDownSuite() { + ts.pool.Close() +} + +func (ts *ComponentsIntTest) TestNewComponents() { + ctx := context.Background() tests := []struct { name string @@ -32,23 +35,23 @@ func TestNewComponent_Integration(t *testing.T) { }{ { name: "should return a valid ComponentStatus when Yunikorn is reachable", - component: 
NewYunikornComponent(yunikornClient), + component: NewYunikornComponent(ts.yunikornClient), expectedIdentifier: "yunikorn", expectedHealthy: true, }, { name: "should return a valid ComponentStatus when Postgres is reachable", - component: NewPostgresComponent(postgresPool), + component: NewPostgresComponent(ts.pool), expectedIdentifier: "postgres", expectedHealthy: true, }, } for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { + ts.Run(tt.name, func() { status := tt.component.Check(ctx) - assert.Equal(t, tt.expectedIdentifier, status.Identifier) - assert.Equal(t, tt.expectedHealthy, status.Healthy) + assert.Equal(ts.T(), tt.expectedIdentifier, status.Identifier) + assert.Equal(ts.T(), tt.expectedHealthy, status.Healthy) }) } } diff --git a/internal/health/health_int_test.go b/internal/health/health_int_test.go index fdf7bc63..8530c1d7 100644 --- a/internal/health/health_int_test.go +++ b/internal/health/health_int_test.go @@ -5,75 +5,76 @@ import ( "testing" "time" + "github.com/jackc/pgx/v5/pgxpool" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/suite" "github.com/G-Research/unicorn-history-server/internal/config" - "github.com/G-Research/unicorn-history-server/internal/database/postgres" "github.com/G-Research/unicorn-history-server/internal/yunikorn" testconfig "github.com/G-Research/unicorn-history-server/test/config" ) -func TestService_Readiness_Integration(t *testing.T) { - if testing.Short() { - t.Skip("skipping integration test in short mode.") - } +type HealthIntTest struct { + suite.Suite + pool *pgxpool.Pool + yunikornClient *yunikorn.RESTClient +} - ctx := context.Background() +func (ts *HealthIntTest) SetupSuite() { + ts.yunikornClient = yunikorn.NewRESTClient(testconfig.GetTestYunikornConfig()) +} + +func (ts *HealthIntTest) TearDownSuite() { + ts.pool.Close() +} - now := time.Now() +func (ts *HealthIntTest) TestService_Readiness() { + ctx := context.Background() + startedAt := time.Now() version := "1.0.0" - 
t.Run("status is unhealthy when one component is unhealthy", func(t *testing.T) { + ts.Run("status is unhealthy when one component is unhealthy", func() { invalidYunikornConfig := config.YunikornConfig{ Host: "invalid-host", Port: 2212, Secure: false, } yunikornClient := yunikorn.NewRESTClient(&invalidYunikornConfig) - postgresPool, err := postgres.NewConnectionPool(ctx, testconfig.GetTestPostgresConfig()) - if err != nil { - t.Fatalf("error creating postgres connection pool: %v", err) - } components := []Component{ NewYunikornComponent(yunikornClient), - NewPostgresComponent(postgresPool), + NewPostgresComponent(ts.pool), } service := Service{ - startedAt: now, + startedAt: startedAt, version: version, components: components, } - status := service.Readiness(context.Background()) + status := service.Readiness(ctx) expectErrorPrefix := `Get "http://invalid-host:2212/ws/v1/scheduler/healthcheck": dial tcp: lookup invalid-host` - assert.False(t, status.Healthy) - assert.Equal(t, 2, len(status.ComponentStatuses)) - assertStatus(t, status.ComponentStatuses, "yunikorn", false, expectErrorPrefix) - assert.Equal(t, now, status.StartedAt) - assert.Equal(t, version, status.Version) + assert.False(ts.T(), status.Healthy) + assert.Equal(ts.T(), 2, len(status.ComponentStatuses)) + assertStatus(ts.T(), status.ComponentStatuses, "yunikorn", false, expectErrorPrefix) + assert.Equal(ts.T(), startedAt, status.StartedAt) + assert.Equal(ts.T(), version, status.Version) }) - t.Run("status is healthy when all components are healthy", func(t *testing.T) { - yunikornClient := yunikorn.NewRESTClient(testconfig.GetTestYunikornConfig()) - postgresPool, err := postgres.NewConnectionPool(ctx, testconfig.GetTestPostgresConfig()) - if err != nil { - t.Fatalf("error creating postgres connection pool: %v", err) - } + ts.Run("status is healthy when all components are healthy", func() { components := []Component{ - NewYunikornComponent(yunikornClient), - NewPostgresComponent(postgresPool), + 
NewYunikornComponent(ts.yunikornClient), + NewPostgresComponent(ts.pool), } service := Service{ - startedAt: now, + startedAt: startedAt, version: version, components: components, } - status := service.Readiness(context.Background()) - assert.True(t, status.Healthy) + status := service.Readiness(ctx) + assert.True(ts.T(), status.Healthy) for _, componentStatus := range status.ComponentStatuses { - assert.True(t, componentStatus.Healthy) + assert.True(ts.T(), componentStatus.Healthy) } - assert.Equal(t, now, status.StartedAt) - assert.Equal(t, version, status.Version) + assert.Equal(ts.T(), startedAt, status.StartedAt) + assert.Equal(ts.T(), version, status.Version) }) } diff --git a/internal/health/health_suite_test.go b/internal/health/health_suite_test.go new file mode 100644 index 00000000..bac9bb77 --- /dev/null +++ b/internal/health/health_suite_test.go @@ -0,0 +1,58 @@ +package health + +import ( + "context" + "testing" + + "github.com/G-Research/unicorn-history-server/test/database" + "github.com/jackc/pgx/v5/pgxpool" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" +) + +type HealthSuite struct { + suite.Suite + tp *database.TestPostgresContainer + pool *pgxpool.Pool +} + +func (ts *HealthSuite) SetupSuite() { + ctx := context.Background() + cfg := database.InstanceConfig{ + User: "test", + Password: "test", + DBName: "template", + Host: "localhost", + Port: 15437, + } + + tp, err := database.NewTestPostgresContainer(ctx, cfg) + require.NoError(ts.T(), err) + ts.tp = tp + ts.pool = tp.Pool(ctx, ts.T(), &cfg) +} + +func (ts *HealthSuite) TearDownSuite() { + err := ts.tp.Container.Terminate(context.Background()) + require.NoError(ts.T(), err) +} + +func (ts *HealthSuite) TestSubSuites() { + ts.T().Run("HealthIntTest", func(t *testing.T) { + pool := database.CloneDB(t, ts.tp, ts.pool) + suite.Run(t, &HealthIntTest{pool: pool}) + }) + ts.T().Run("ComponentsIntTest", func(t *testing.T) { + pool := database.CloneDB(t, ts.tp, ts.pool) 
+ suite.Run(t, &ComponentsIntTest{pool: pool}) + }) + +} + +func TestHealthSuiteIntegration(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test in short mode.") + } + topSuite := new(HealthSuite) + suite.Run(t, topSuite) +} diff --git a/internal/yunikorn/sync.go b/internal/yunikorn/sync.go index f835f41b..779aac07 100644 --- a/internal/yunikorn/sync.go +++ b/internal/yunikorn/sync.go @@ -27,7 +27,6 @@ func (s *Service) syncPartitions(ctx context.Context, partitions []*dao.Partitio for _, p := range partitions { current, err := s.repo.GetPartitionByID(ctx, p.ID) - fmt.Printf("Getting partition resulted in current: %+v, err: %v\n", current, err) if err != nil { fmt.Printf("Error getting partition: %v\n", err) partition := &model.Partition{ diff --git a/internal/yunikorn/sync_applications_int_test.go b/internal/yunikorn/sync_applications_int_test.go new file mode 100644 index 00000000..0232fae4 --- /dev/null +++ b/internal/yunikorn/sync_applications_int_test.go @@ -0,0 +1,177 @@ +package yunikorn + +import ( + "context" + "time" + + "github.com/G-Research/unicorn-history-server/internal/database/repository" + "github.com/G-Research/unicorn-history-server/internal/model" + "github.com/G-Research/yunikorn-core/pkg/webservice/dao" + "github.com/jackc/pgx/v5/pgxpool" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" +) + +type SyncApplicationsIntTest struct { + suite.Suite + pool *pgxpool.Pool + repo *repository.PostgresRepository +} + +func (ss *SyncApplicationsIntTest) SetupSuite() { + require.NotNil(ss.T(), ss.pool) + repo, err := repository.NewPostgresRepository(ss.pool) + require.NoError(ss.T(), err) + ss.repo = repo +} + +func (ss *SyncApplicationsIntTest) TearDownSuite() { + ss.pool.Close() +} + +func (ss *SyncApplicationsIntTest) TestSyncApplications() { + ctx := context.Background() + now := time.Now().UnixNano() + + tests := []struct { + name string + stateApplications 
[]*dao.ApplicationDAOInfo + existingApplications []*model.Application + expectedLive []*model.Application + expectedDeleted []*model.Application + wantErr bool + }{ + { + name: "Sync applications with no existing applications in DB", + stateApplications: []*dao.ApplicationDAOInfo{ + {ID: "1", ApplicationID: "app-1"}, + {ID: "2", ApplicationID: "app-2"}, + }, + existingApplications: nil, + expectedLive: []*model.Application{ + { + Metadata: model.Metadata{ + CreatedAtNano: now, + }, + ApplicationDAOInfo: dao.ApplicationDAOInfo{ + ID: "1", + ApplicationID: "app-1", + }, + }, + { + Metadata: model.Metadata{ + CreatedAtNano: now, + }, + ApplicationDAOInfo: dao.ApplicationDAOInfo{ + ID: "2", + ApplicationID: "app-2", + }, + }, + }, + wantErr: false, + }, + { + name: "Should mark application as deleted in DB", + stateApplications: []*dao.ApplicationDAOInfo{ + {ID: "1", ApplicationID: "app-1"}, + }, + existingApplications: []*model.Application{ + { + Metadata: model.Metadata{ + CreatedAtNano: now, + }, + ApplicationDAOInfo: dao.ApplicationDAOInfo{ + ID: "1", + ApplicationID: "app-1", + }, + }, + { + Metadata: model.Metadata{ + CreatedAtNano: now, + }, + ApplicationDAOInfo: dao.ApplicationDAOInfo{ + ID: "2", + ApplicationID: "app-2", + }, + }, + }, + expectedLive: []*model.Application{ + { + Metadata: model.Metadata{ + CreatedAtNano: now, + }, + ApplicationDAOInfo: dao.ApplicationDAOInfo{ + ID: "1", + ApplicationID: "app-1", + }, + }, + }, + expectedDeleted: []*model.Application{ + { + Metadata: model.Metadata{ + CreatedAtNano: now, + }, + ApplicationDAOInfo: dao.ApplicationDAOInfo{ + ID: "2", + ApplicationID: "app-2", + }, + }, + }, + wantErr: false, + }, + } + + for _, tt := range tests { + ss.Run(tt.name, func() { + // clean up the table after the test + ss.T().Cleanup(func() { + _, err := ss.pool.Exec(ctx, "DELETE FROM applications") + require.NoError(ss.T(), err) + }) + + for _, app := range tt.existingApplications { + err := ss.repo.InsertApplication(ctx, app) + 
require.NoError(ss.T(), err) + } + + s := NewService(ss.repo, nil, nil) + + err := s.syncApplications(ctx, tt.stateApplications) + if tt.wantErr { + require.Error(ss.T(), err) + return + } + require.NoError(ss.T(), err) + + applicationsInDB, err := s.repo.GetAllApplications( + ctx, + repository.ApplicationFilters{}, + ) + require.NoError(ss.T(), err) + + require.Equal(ss.T(), len(tt.expectedLive)+len(tt.expectedDeleted), len(applicationsInDB)) + + lookup := make(map[string]model.Application) + for _, app := range applicationsInDB { + lookup[app.ID] = *app + } + + for _, target := range tt.expectedLive { + state, ok := lookup[target.ID] + require.True(ss.T(), ok) + assert.NotEmpty(ss.T(), state.ID) + assert.Greater(ss.T(), state.Metadata.CreatedAtNano, int64(0)) + assert.Nil(ss.T(), state.Metadata.DeletedAtNano) + } + + for _, target := range tt.expectedDeleted { + state, ok := lookup[target.ID] + require.True(ss.T(), ok) + assert.NotEmpty(ss.T(), state.ID) + assert.Greater(ss.T(), state.Metadata.CreatedAtNano, int64(0)) + assert.NotNil(ss.T(), state.Metadata.DeletedAtNano) + } + }) + } +} diff --git a/internal/yunikorn/sync_int_test.go b/internal/yunikorn/sync_int_test.go deleted file mode 100644 index fba93e83..00000000 --- a/internal/yunikorn/sync_int_test.go +++ /dev/null @@ -1,861 +0,0 @@ -package yunikorn - -import ( - "context" - "testing" - "time" - - "github.com/G-Research/yunikorn-core/pkg/webservice/dao" - "github.com/jackc/pgx/v5/pgxpool" - "github.com/oklog/ulid/v2" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "github.com/G-Research/unicorn-history-server/internal/database/migrations" - "github.com/G-Research/unicorn-history-server/internal/database/postgres" - "github.com/G-Research/unicorn-history-server/internal/database/repository" - "github.com/G-Research/unicorn-history-server/internal/model" - "github.com/G-Research/unicorn-history-server/test/config" - 
"github.com/G-Research/unicorn-history-server/test/database" -) - -func TestSync_syncNodes_Integration(t *testing.T) { - if testing.Short() { - t.Skip("skipping test in short mode.") - } - - ctx, cancel := context.WithCancel(context.Background()) - t.Cleanup(cancel) - - pool, repo, cleanupDB := setupDatabase(t, ctx) - t.Cleanup(cleanupDB) - eventRepository := repository.NewInMemoryEventRepository() - - nowNano := time.Now().UnixNano() - partitionID := ulid.Make().String() - - tests := []struct { - name string - stateNodes []*dao.NodesDAOInfo - existingNodes []*model.Node - expectedNodes []*model.Node - wantErr bool - }{ - { - name: "Sync nodes with no existing nodes", - stateNodes: []*dao.NodesDAOInfo{ - { - PartitionName: "default", - Nodes: []*dao.NodeDAOInfo{ - {ID: "1", NodeID: "node-1", PartitionID: partitionID, HostName: "host-1"}, - {ID: "2", NodeID: "node-2", PartitionID: partitionID, HostName: "host-2"}, - }, - }, - }, - existingNodes: nil, - expectedNodes: []*model.Node{ - {NodeDAOInfo: dao.NodeDAOInfo{ID: "2", NodeID: "node-2", HostName: "host-2"}}, - {NodeDAOInfo: dao.NodeDAOInfo{ID: "1", NodeID: "node-1", HostName: "host-1"}}, - }, - wantErr: false, - }, - { - name: "Sync nodes with existing nodes in DB", - stateNodes: []*dao.NodesDAOInfo{ - { - PartitionName: "default", - Nodes: []*dao.NodeDAOInfo{ - {ID: "2", NodeID: "node-2", PartitionID: partitionID, HostName: "host-2-updated"}, - }, - }, - }, - existingNodes: []*model.Node{ - {NodeDAOInfo: dao.NodeDAOInfo{ID: "1", NodeID: "node-1", HostName: "host-1", PartitionID: partitionID}}, - {NodeDAOInfo: dao.NodeDAOInfo{ID: "2", NodeID: "node-2", HostName: "host-2", PartitionID: partitionID}}, - }, - expectedNodes: []*model.Node{ - {NodeDAOInfo: dao.NodeDAOInfo{ID: "2", NodeID: "node-2", HostName: "host-2-updated", PartitionID: partitionID}}, // updated - { - Metadata: model.Metadata{DeletedAtNano: &nowNano}, // deleted - NodeDAOInfo: dao.NodeDAOInfo{ID: "1", NodeID: "node-1", HostName: "host-1", 
PartitionID: partitionID}, - }, - }, - wantErr: false, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - // clean up the table after the test - t.Cleanup(func() { - _, err := pool.Exec(ctx, "DELETE FROM nodes") - require.NoError(t, err) - }) - - for _, node := range tt.existingNodes { - err := repo.InsertNode(ctx, node) - require.NoError(t, err) - } - - s := NewService(repo, eventRepository, nil) - - err := s.syncNodes(ctx, tt.stateNodes) - if tt.wantErr { - require.Error(t, err) - return - } - require.NoError(t, err) - - nodesInDB, err := s.repo.GetNodesPerPartition(ctx, partitionID, repository.NodeFilters{}) - require.NoError(t, err) - require.Equal(t, len(tt.expectedNodes), len(nodesInDB)) - for i, target := range tt.expectedNodes { - require.Equal(t, target.ID, nodesInDB[i].ID) - require.Equal(t, target.NodeID, nodesInDB[i].NodeID) - require.Equal(t, target.HostName, nodesInDB[i].HostName) - if target.DeletedAtNano != nil { - require.NotNil(t, nodesInDB[i].DeletedAtNano) - } - } - }) - } -} - -func TestSync_syncQueues_Integration(t *testing.T) { - if testing.Short() { - t.Skip("skipping test in short mode.") - } - - ctx, cancel := context.WithCancel(context.Background()) - t.Cleanup(cancel) - - pool, repo, cleanupDB := setupDatabase(t, ctx) - t.Cleanup(cleanupDB) - eventRepository := repository.NewInMemoryEventRepository() - - now := time.Now().UnixNano() - - tests := []struct { - name string - stateQueues []dao.PartitionQueueDAOInfo - existingQueues []*model.Queue - expected []*model.Queue - wantErr bool - }{ - { - name: "Sync queues with no existing queues", - stateQueues: []dao.PartitionQueueDAOInfo{ - { - ID: "1", - QueueName: "root", - PartitionID: "1", - Children: []dao.PartitionQueueDAOInfo{ - { - ID: "2", - QueueName: "root.child-1", - PartitionID: "1", - Children: []dao.PartitionQueueDAOInfo{ - { - ID: "3", - QueueName: "root.child-1.1", - PartitionID: "1", - }, - { - ID: "4", - QueueName: "root.child-1.2", - PartitionID: 
"1", - }, - }, - }, - }, - }, - }, - existingQueues: nil, - expected: []*model.Queue{ - { - PartitionQueueDAOInfo: dao.PartitionQueueDAOInfo{ - ID: "1", - QueueName: "root", - PartitionID: "1", - }, - }, - { - PartitionQueueDAOInfo: dao.PartitionQueueDAOInfo{ - ID: "2", - QueueName: "root.child-1", - PartitionID: "1", - }, - }, - { - PartitionQueueDAOInfo: dao.PartitionQueueDAOInfo{ - ID: "3", - QueueName: "root.child-1.1", - PartitionID: "1", - }, - }, - { - PartitionQueueDAOInfo: dao.PartitionQueueDAOInfo{ - ID: "4", - QueueName: "root.child-1.2", - PartitionID: "1", - }, - }, - }, - wantErr: false, - }, - { - name: "Sync queues with existing queues in DB", - stateQueues: []dao.PartitionQueueDAOInfo{{ - ID: "1", - QueueName: "root", - PartitionID: "1", - Children: []dao.PartitionQueueDAOInfo{ - { - ID: "2", - QueueName: "root.child-1", - PartitionID: "1", - }, - { - ID: "3", - QueueName: "root.child-2", - PartitionID: "1", - }, - }, - }, - }, - - existingQueues: []*model.Queue{ - { - Metadata: model.Metadata{ - CreatedAtNano: now, - }, - PartitionQueueDAOInfo: dao.PartitionQueueDAOInfo{ - ID: "1", - QueueName: "root", - PartitionID: "1", - }, - }, - }, - expected: []*model.Queue{ - { - PartitionQueueDAOInfo: dao.PartitionQueueDAOInfo{ - ID: "1", - QueueName: "root", - PartitionID: "1", - }, - }, - { - PartitionQueueDAOInfo: dao.PartitionQueueDAOInfo{ - ID: "2", - QueueName: "root.child-1", - PartitionID: "1", - }, - }, - { - PartitionQueueDAOInfo: dao.PartitionQueueDAOInfo{ - ID: "3", - QueueName: "root.child-2", - PartitionID: "1", - }, - }, - }, - wantErr: false, - }, - { - name: "Sync queues when queue is deleted", - stateQueues: []dao.PartitionQueueDAOInfo{ - { - ID: "1", - QueueName: "root", - PartitionID: "1", - Children: []dao.PartitionQueueDAOInfo{ - { - ID: "3", - QueueName: "root.child-2", - PartitionID: "1", - }, - }, - }, - }, - existingQueues: []*model.Queue{ - { - Metadata: model.Metadata{ - CreatedAtNano: now, - }, - PartitionQueueDAOInfo: 
dao.PartitionQueueDAOInfo{ - ID: "1", - QueueName: "root", - PartitionID: "1", - }, - }, - { - Metadata: model.Metadata{ - CreatedAtNano: now, - }, - PartitionQueueDAOInfo: dao.PartitionQueueDAOInfo{ - ID: "2", - QueueName: "root.child-1", - PartitionID: "1", - }, - }, - }, - expected: []*model.Queue{ - { - PartitionQueueDAOInfo: dao.PartitionQueueDAOInfo{ - ID: "1", - QueueName: "root", - PartitionID: "1", - }, - }, - { - PartitionQueueDAOInfo: dao.PartitionQueueDAOInfo{ - ID: "3", - QueueName: "root.child-2", - PartitionID: "1", - }, - }, - }, - wantErr: false, - }, - { - name: "Sync queues with multiple partitions", - stateQueues: []dao.PartitionQueueDAOInfo{ - { - ID: "1", - QueueName: "root", - PartitionID: "1", - Children: []dao.PartitionQueueDAOInfo{ - { - ID: "2", - QueueName: "root.child-1", - PartitionID: "1", - }, - { - ID: "3", - QueueName: "root.child-2", - PartitionID: "1", - }, - }, - }, - { - ID: "4", - QueueName: "root", - PartitionID: "2", - Children: []dao.PartitionQueueDAOInfo{ - { - ID: "5", - QueueName: "root.child-1", - PartitionID: "2", - }, - { - ID: "6", - QueueName: "root.child-2", - PartitionID: "2", - }, - }, - }, - }, - existingQueues: nil, - expected: []*model.Queue{ - { - PartitionQueueDAOInfo: dao.PartitionQueueDAOInfo{ - ID: "1", - QueueName: "root", - PartitionID: "1", - }, - }, - { - PartitionQueueDAOInfo: dao.PartitionQueueDAOInfo{ - ID: "2", - QueueName: "root.child-1", - PartitionID: "1", - }, - }, - { - PartitionQueueDAOInfo: dao.PartitionQueueDAOInfo{ - ID: "3", - QueueName: "root.child-2", - PartitionID: "1", - }, - }, - { - PartitionQueueDAOInfo: dao.PartitionQueueDAOInfo{ - ID: "4", - QueueName: "root", - PartitionID: "2", - }, - }, - { - PartitionQueueDAOInfo: dao.PartitionQueueDAOInfo{ - ID: "5", - QueueName: "root.child-1", - PartitionID: "2", - }, - }, - { - PartitionQueueDAOInfo: dao.PartitionQueueDAOInfo{ - ID: "6", - QueueName: "root.child-2", - PartitionID: "2", - }, - }, - }, - - wantErr: false, - }, - { - name: 
"Sync queues with deeply nested queues", - stateQueues: []dao.PartitionQueueDAOInfo{ - { - ID: "1", - QueueName: "root", - PartitionID: "1", - Children: []dao.PartitionQueueDAOInfo{ - { - ID: "2", - QueueName: "root.child-1", - PartitionID: "1", - Children: []dao.PartitionQueueDAOInfo{ - { - ID: "3", - QueueName: "root.child-1.1", - PartitionID: "1", - Children: []dao.PartitionQueueDAOInfo{ - { - ID: "4", - QueueName: "root.child-1.1.1", - PartitionID: "1", - }, - { - ID: "5", - QueueName: "root.child-1.1.2", - PartitionID: "1", - }, - }, - }, - }, - }, - }, - }, - }, - existingQueues: nil, - expected: []*model.Queue{ - { - PartitionQueueDAOInfo: dao.PartitionQueueDAOInfo{ - ID: "1", - QueueName: "root", - PartitionID: "1", - }, - }, - { - PartitionQueueDAOInfo: dao.PartitionQueueDAOInfo{ - ID: "2", - QueueName: "root.child-1", - PartitionID: "1", - }, - }, - { - PartitionQueueDAOInfo: dao.PartitionQueueDAOInfo{ - ID: "3", - QueueName: "root.child-1.1", - PartitionID: "1", - }, - }, - { - PartitionQueueDAOInfo: dao.PartitionQueueDAOInfo{ - ID: "4", - QueueName: "root.child-1.1.1", - PartitionID: "1", - }, - }, - { - PartitionQueueDAOInfo: dao.PartitionQueueDAOInfo{ - ID: "5", - QueueName: "root.child-1.1.2", - PartitionID: "1", - }, - }, - }, - wantErr: false, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - // clean up the table after the test - t.Cleanup(func() { - _, err := pool.Exec(ctx, "DELETE FROM queues") - require.NoError(t, err) - }) - - for _, q := range tt.existingQueues { - err := repo.InsertQueue(ctx, q) - require.NoError(t, err) - } - - s := NewService(repo, eventRepository, nil) - - err := s.syncQueues(context.Background(), tt.stateQueues) - if tt.wantErr { - require.Error(t, err) - return - } - require.NoError(t, err) - queuesInDB, err := s.repo.GetAllQueues(ctx) - require.NoError(t, err) - for _, target := range tt.expected { - if !isQueuePresent(queuesInDB, target) { - t.Errorf("Queue %s in partition %s is not found 
in the DB", target.QueueName, target.PartitionID) - } - } - }) - } -} - -func TestSync_syncPartitions_Integration(t *testing.T) { - if testing.Short() { - t.Skip("skipping test in short mode.") - } - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - pool, repo, cleanupDB := setupDatabase(t, ctx) - t.Cleanup(cleanupDB) - eventRepository := repository.NewInMemoryEventRepository() - - now := time.Now().UnixNano() - - tests := []struct { - name string - statePartitions []*dao.PartitionInfo - existingPartitions []*model.Partition - expected []*model.Partition - wantErr bool - }{ - { - name: "Sync partition with no existing partitions in DB", - statePartitions: []*dao.PartitionInfo{ - { - ID: "1", - Name: "1", - }, - { - ID: "2", - Name: "2", - }, - }, - existingPartitions: nil, - expected: []*model.Partition{ - { - PartitionInfo: dao.PartitionInfo{ - ID: "2", - Name: "2", - }, - }, - { - PartitionInfo: dao.PartitionInfo{ - ID: "1", - Name: "1", - }, - }, - }, - wantErr: false, - }, - { - name: "Should mark partition 2 as deleted in DB", - statePartitions: []*dao.PartitionInfo{ - { - ID: "1", - Name: "1", - }, - }, - - existingPartitions: []*model.Partition{ - { - Metadata: model.Metadata{ - CreatedAtNano: now, - }, - PartitionInfo: dao.PartitionInfo{ - ID: "1", - Name: "1", - }, - }, - { - Metadata: model.Metadata{ - CreatedAtNano: now, - }, - PartitionInfo: dao.PartitionInfo{ - ID: "2", - Name: "2", - }, - }, - }, - expected: []*model.Partition{ - { - Metadata: model.Metadata{ - CreatedAtNano: now, - }, - PartitionInfo: dao.PartitionInfo{ - ID: "1", - Name: "1", - }, - }, - }, - wantErr: false, - }, - // TODO: test syncPartition when statePartitions is nil - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - // clean up the table after the test - t.Cleanup(func() { - _, err := pool.Exec(ctx, "DELETE FROM partitions") - require.NoError(t, err) - }) - - for _, partition := range tt.existingPartitions { - err := 
repo.InsertPartition(ctx, partition) - require.NoError(t, err) - } - - s := NewService(repo, eventRepository, nil) - - // Start the service - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - err := s.syncPartitions(ctx, tt.statePartitions) - if tt.wantErr { - require.Error(t, err) - return - } - require.NoError(t, err) - - var partitionsInDB []*model.Partition - partitionsInDB, err = s.repo.GetAllPartitions(ctx, repository.PartitionFilters{}) - require.NoError(t, err) - - for _, dbPartition := range partitionsInDB { - found := false - for _, expectedPartition := range tt.expected { - if dbPartition.ID == expectedPartition.ID { - assert.Equal(t, expectedPartition.PartitionInfo, dbPartition.PartitionInfo) - assert.Nil(t, expectedPartition.DeletedAtNano) - found = true - } - } - if !found { - assert.NotNil(t, dbPartition.DeletedAtNano) - } - } - - }) - } -} - -func TestSync_syncApplications_Integration(t *testing.T) { - if testing.Short() { - t.Skip("skipping test in short mode.") - } - - ctx, cancel := context.WithCancel(context.Background()) - t.Cleanup(cancel) - - pool, repo, cleanupDB := setupDatabase(t, ctx) - t.Cleanup(cleanupDB) - eventRepository := repository.NewInMemoryEventRepository() - - now := time.Now().UnixNano() - tests := []struct { - name string - stateApplications []*dao.ApplicationDAOInfo - existingApplications []*model.Application - expectedLive []*model.Application - expectedDeleted []*model.Application - wantErr bool - }{ - { - name: "Sync applications with no existing applications in DB", - stateApplications: []*dao.ApplicationDAOInfo{ - {ID: "1", ApplicationID: "app-1"}, - {ID: "2", ApplicationID: "app-2"}, - }, - existingApplications: nil, - expectedLive: []*model.Application{ - { - Metadata: model.Metadata{ - CreatedAtNano: now, - }, - ApplicationDAOInfo: dao.ApplicationDAOInfo{ - ID: "1", - ApplicationID: "app-1", - }, - }, - { - Metadata: model.Metadata{ - CreatedAtNano: now, - }, - ApplicationDAOInfo: 
dao.ApplicationDAOInfo{ - ID: "2", - ApplicationID: "app-2", - }, - }, - }, - wantErr: false, - }, - { - name: "Should mark application as deleted in DB", - stateApplications: []*dao.ApplicationDAOInfo{ - {ID: "1", ApplicationID: "app-1"}, - }, - existingApplications: []*model.Application{ - { - Metadata: model.Metadata{ - CreatedAtNano: now, - }, - ApplicationDAOInfo: dao.ApplicationDAOInfo{ - ID: "1", - ApplicationID: "app-1", - }, - }, - { - Metadata: model.Metadata{ - CreatedAtNano: now, - }, - ApplicationDAOInfo: dao.ApplicationDAOInfo{ - ID: "2", - ApplicationID: "app-2", - }, - }, - }, - expectedLive: []*model.Application{ - { - Metadata: model.Metadata{ - CreatedAtNano: now, - }, - ApplicationDAOInfo: dao.ApplicationDAOInfo{ - ID: "1", - ApplicationID: "app-1", - }, - }, - }, - expectedDeleted: []*model.Application{ - { - Metadata: model.Metadata{ - CreatedAtNano: now, - }, - ApplicationDAOInfo: dao.ApplicationDAOInfo{ - ID: "2", - ApplicationID: "app-2", - }, - }, - }, - wantErr: false, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - // clean up the table after the test - t.Cleanup(func() { - _, err := pool.Exec(ctx, "DELETE FROM applications") - require.NoError(t, err) - }) - - for _, app := range tt.existingApplications { - err := repo.InsertApplication(ctx, app) - require.NoError(t, err) - } - - s := NewService(repo, eventRepository, nil) - - // Start the service - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - err := s.syncApplications(ctx, tt.stateApplications) - if tt.wantErr { - require.Error(t, err) - return - } - require.NoError(t, err) - - applicationsInDB, err := s.repo.GetAllApplications( - ctx, - repository.ApplicationFilters{}, - ) - require.NoError(t, err) - - require.Equal(t, len(tt.expectedLive)+len(tt.expectedDeleted), len(applicationsInDB)) - - lookup := make(map[string]model.Application) - for _, app := range applicationsInDB { - lookup[app.ID] = *app - } - - for _, target := 
range tt.expectedLive { - state, ok := lookup[target.ID] - require.True(t, ok) - assert.NotEmpty(t, state.ID) - assert.Greater(t, state.Metadata.CreatedAtNano, int64(0)) - assert.Nil(t, state.Metadata.DeletedAtNano) - } - - for _, target := range tt.expectedDeleted { - state, ok := lookup[target.ID] - require.True(t, ok) - assert.NotEmpty(t, state.ID) - assert.Greater(t, state.Metadata.CreatedAtNano, int64(0)) - assert.NotNil(t, state.Metadata.DeletedAtNano) - } - }) - } -} - -func isQueuePresent(queuesInDB []*model.Queue, targetQueue *model.Queue) bool { - for _, dbQueue := range queuesInDB { - if dbQueue.QueueName == targetQueue.QueueName && dbQueue.PartitionID == targetQueue.PartitionID { - // Check if DeletedAtNano fields are either both nil or both non-nil - if (dbQueue.DeletedAtNano == nil && targetQueue.DeletedAtNano != nil) || - (dbQueue.DeletedAtNano != nil && targetQueue.DeletedAtNano == nil) { - return false // If one is nil and the other is not, return false - } - return true - } - } - return false -} - -func setupDatabase(t *testing.T, ctx context.Context) (*pgxpool.Pool, repository.Repository, func()) { - schema := database.CreateTestSchema(ctx, t) - cfg := config.GetTestPostgresConfig() - cfg.Schema = schema - m, err := migrations.New(cfg, "../../migrations") - if err != nil { - t.Fatalf("error creating migrator: %v", err) - } - applied, err := m.Up() - if err != nil { - t.Fatalf("error occured while applying migrations: %v", err) - } - if !applied { - t.Fatal("migrator finished but migrations were not applied") - } - - pool, err := postgres.NewConnectionPool(ctx, cfg) - if err != nil { - t.Fatalf("error creating postgres connection pool: %v", err) - } - repo, err := repository.NewPostgresRepository(pool) - if err != nil { - t.Fatalf("error creating postgres repository: %v", err) - } - - cleanup := func() { - database.DropTestSchema(context.Background(), t, schema) - } - - return pool, repo, cleanup -} diff --git 
a/internal/yunikorn/sync_nodes_int_test.go b/internal/yunikorn/sync_nodes_int_test.go new file mode 100644 index 00000000..3d3bf56c --- /dev/null +++ b/internal/yunikorn/sync_nodes_int_test.go @@ -0,0 +1,123 @@ +package yunikorn + +import ( + "context" + "time" + + "github.com/G-Research/unicorn-history-server/internal/database/repository" + "github.com/G-Research/unicorn-history-server/internal/model" + "github.com/G-Research/yunikorn-core/pkg/webservice/dao" + "github.com/jackc/pgx/v5/pgxpool" + "github.com/oklog/ulid/v2" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" +) + +type SyncNodesIntTest struct { + suite.Suite + pool *pgxpool.Pool + repo *repository.PostgresRepository +} + +func (ss *SyncNodesIntTest) SetupSuite() { + require.NotNil(ss.T(), ss.pool) + repo, err := repository.NewPostgresRepository(ss.pool) + require.NoError(ss.T(), err) + ss.repo = repo +} + +func (ss *SyncNodesIntTest) TearDownSuite() { + ss.pool.Close() +} + +func (ss *SyncNodesIntTest) TestSyncNodes() { + + ctx := context.Background() + nowNano := time.Now().UnixNano() + partitionID := ulid.Make().String() + + tests := []struct { + name string + stateNodes []*dao.NodesDAOInfo + existingNodes []*model.Node + expectedNodes []*model.Node + wantErr bool + }{ + { + name: "Sync nodes with no existing nodes", + stateNodes: []*dao.NodesDAOInfo{ + { + PartitionName: "default", + Nodes: []*dao.NodeDAOInfo{ + {ID: "1", NodeID: "node-1", PartitionID: partitionID, HostName: "host-1"}, + {ID: "2", NodeID: "node-2", PartitionID: partitionID, HostName: "host-2"}, + }, + }, + }, + existingNodes: nil, + expectedNodes: []*model.Node{ + {NodeDAOInfo: dao.NodeDAOInfo{ID: "2", NodeID: "node-2", HostName: "host-2"}}, + {NodeDAOInfo: dao.NodeDAOInfo{ID: "1", NodeID: "node-1", HostName: "host-1"}}, + }, + wantErr: false, + }, + { + name: "Sync nodes with existing nodes in DB", + stateNodes: []*dao.NodesDAOInfo{ + { + PartitionName: "default", + Nodes: []*dao.NodeDAOInfo{ + {ID: 
"2", NodeID: "node-2", PartitionID: partitionID, HostName: "host-2-updated"}, + }, + }, + }, + existingNodes: []*model.Node{ + {NodeDAOInfo: dao.NodeDAOInfo{ID: "1", NodeID: "node-1", HostName: "host-1", PartitionID: partitionID}}, + {NodeDAOInfo: dao.NodeDAOInfo{ID: "2", NodeID: "node-2", HostName: "host-2", PartitionID: partitionID}}, + }, + expectedNodes: []*model.Node{ + {NodeDAOInfo: dao.NodeDAOInfo{ID: "2", NodeID: "node-2", HostName: "host-2-updated", PartitionID: partitionID}}, // updated + { + Metadata: model.Metadata{DeletedAtNano: &nowNano}, // deleted + NodeDAOInfo: dao.NodeDAOInfo{ID: "1", NodeID: "node-1", HostName: "host-1", PartitionID: partitionID}, + }, + }, + wantErr: false, + }, + } + + for _, tt := range tests { + ss.Run(tt.name, func() { + // clean up the table after the test + ss.T().Cleanup(func() { + _, err := ss.pool.Exec(ctx, "DELETE FROM nodes") + require.NoError(ss.T(), err) + }) + + for _, node := range tt.existingNodes { + err := ss.repo.InsertNode(ctx, node) + require.NoError(ss.T(), err) + } + + s := NewService(ss.repo, nil, nil) + + err := s.syncNodes(ctx, tt.stateNodes) + if tt.wantErr { + require.Error(ss.T(), err) + return + } + require.NoError(ss.T(), err) + + nodesInDB, err := s.repo.GetNodesPerPartition(ctx, partitionID, repository.NodeFilters{}) + require.NoError(ss.T(), err) + for i, target := range tt.expectedNodes { + require.Equal(ss.T(), target.ID, nodesInDB[i].ID) + require.Equal(ss.T(), target.NodeID, nodesInDB[i].NodeID) + require.Equal(ss.T(), target.HostName, nodesInDB[i].HostName) + if target.DeletedAtNano != nil { + require.NotNil(ss.T(), nodesInDB[i].DeletedAtNano) + } + } + }) + } +} diff --git a/internal/yunikorn/sync_partitions_int_test.go b/internal/yunikorn/sync_partitions_int_test.go new file mode 100644 index 00000000..f55d6379 --- /dev/null +++ b/internal/yunikorn/sync_partitions_int_test.go @@ -0,0 +1,159 @@ +package yunikorn + +import ( + "context" + "time" + + 
"github.com/G-Research/unicorn-history-server/internal/database/repository" + "github.com/G-Research/unicorn-history-server/internal/model" + "github.com/G-Research/yunikorn-core/pkg/webservice/dao" + "github.com/jackc/pgx/v5/pgxpool" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" +) + +type SyncPartitionIntTest struct { + suite.Suite + pool *pgxpool.Pool + repo *repository.PostgresRepository +} + +func (ss *SyncPartitionIntTest) SetupSuite() { + require.NotNil(ss.T(), ss.pool) + repo, err := repository.NewPostgresRepository(ss.pool) + require.NoError(ss.T(), err) + ss.repo = repo +} + +func (ss *SyncPartitionIntTest) TearDownSuite() { + ss.pool.Close() +} + +func (ss *SyncPartitionIntTest) TestSyncPartitions() { + ctx := context.Background() + now := time.Now().UnixNano() + + tests := []struct { + name string + statePartitions []*dao.PartitionInfo + existingPartitions []*model.Partition + expected []*model.Partition + wantErr bool + }{ + { + name: "Sync partition with no existing partitions in DB", + statePartitions: []*dao.PartitionInfo{ + { + ID: "1", + Name: "1", + }, + { + ID: "2", + Name: "2", + }, + }, + existingPartitions: nil, + expected: []*model.Partition{ + { + PartitionInfo: dao.PartitionInfo{ + ID: "2", + Name: "2", + }, + }, + { + PartitionInfo: dao.PartitionInfo{ + ID: "1", + Name: "1", + }, + }, + }, + wantErr: false, + }, + { + name: "Should mark partition 2 as deleted in DB", + statePartitions: []*dao.PartitionInfo{ + { + ID: "1", + Name: "1", + }, + }, + + existingPartitions: []*model.Partition{ + { + Metadata: model.Metadata{ + CreatedAtNano: now, + }, + PartitionInfo: dao.PartitionInfo{ + ID: "1", + Name: "1", + }, + }, + { + Metadata: model.Metadata{ + CreatedAtNano: now, + }, + PartitionInfo: dao.PartitionInfo{ + ID: "2", + Name: "2", + }, + }, + }, + expected: []*model.Partition{ + { + Metadata: model.Metadata{ + CreatedAtNano: now, + }, + PartitionInfo: 
dao.PartitionInfo{ + ID: "1", + Name: "1", + }, + }, + }, + wantErr: false, + }, + // TODO: test syncPartitions when statePartitions is nil + } + + for _, tt := range tests { + ss.Run(tt.name, func() { + // clean up the table after the test + ss.T().Cleanup(func() { + _, err := ss.pool.Exec(ctx, "DELETE FROM partitions") + require.NoError(ss.T(), err) + }) + + for _, partition := range tt.existingPartitions { + err := ss.repo.InsertPartition(ctx, partition) + require.NoError(ss.T(), err) + } + + s := NewService(ss.repo, nil, nil) + + err := s.syncPartitions(ctx, tt.statePartitions) + if tt.wantErr { + require.Error(ss.T(), err) + return + } + require.NoError(ss.T(), err) + + var partitionsInDB []*model.Partition + partitionsInDB, err = s.repo.GetAllPartitions(ctx, repository.PartitionFilters{}) + require.NoError(ss.T(), err) + + for _, dbPartition := range partitionsInDB { + found := false + for _, expectedPartition := range tt.expected { + if dbPartition.ID == expectedPartition.ID { + assert.Equal(ss.T(), expectedPartition.PartitionInfo, dbPartition.PartitionInfo) + assert.Nil(ss.T(), expectedPartition.DeletedAtNano) + found = true + } + } + if !found { + assert.NotNil(ss.T(), dbPartition.DeletedAtNano) + } + } + }) + } +} diff --git a/internal/yunikorn/sync_queues_int_test.go b/internal/yunikorn/sync_queues_int_test.go new file mode 100644 index 00000000..18ae2c9e --- /dev/null +++ b/internal/yunikorn/sync_queues_int_test.go @@ -0,0 +1,425 @@ +package yunikorn + +import ( + "context" + "time" + + "github.com/G-Research/unicorn-history-server/internal/database/repository" + "github.com/G-Research/unicorn-history-server/internal/model" + "github.com/G-Research/yunikorn-core/pkg/webservice/dao" + "github.com/jackc/pgx/v5/pgxpool" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" +) + +type SyncQueuesIntTest struct { + suite.Suite + pool *pgxpool.Pool + repo *repository.PostgresRepository +} + +func (ss *SyncQueuesIntTest) SetupSuite() { + 
require.NotNil(ss.T(), ss.pool) + repo, err := repository.NewPostgresRepository(ss.pool) + require.NoError(ss.T(), err) + ss.repo = repo +} + +func (ss *SyncQueuesIntTest) TearDownSuite() { + ss.pool.Close() +} + +func (ss *SyncQueuesIntTest) TestSyncQueues() { + + ctx := context.Background() + now := time.Now().UnixNano() + + tests := []struct { + name string + stateQueues []dao.PartitionQueueDAOInfo + existingQueues []*model.Queue + expected []*model.Queue + wantErr bool + }{ + { + name: "Sync queues with no existing queues", + stateQueues: []dao.PartitionQueueDAOInfo{ + { + ID: "1", + QueueName: "root", + PartitionID: "1", + Children: []dao.PartitionQueueDAOInfo{ + { + ID: "2", + QueueName: "root.child-1", + PartitionID: "1", + Children: []dao.PartitionQueueDAOInfo{ + { + ID: "3", + QueueName: "root.child-1.1", + PartitionID: "1", + }, + { + ID: "4", + QueueName: "root.child-1.2", + PartitionID: "1", + }, + }, + }, + }, + }, + }, + existingQueues: nil, + expected: []*model.Queue{ + { + PartitionQueueDAOInfo: dao.PartitionQueueDAOInfo{ + ID: "1", + QueueName: "root", + PartitionID: "1", + }, + }, + { + PartitionQueueDAOInfo: dao.PartitionQueueDAOInfo{ + ID: "2", + QueueName: "root.child-1", + PartitionID: "1", + }, + }, + { + PartitionQueueDAOInfo: dao.PartitionQueueDAOInfo{ + ID: "3", + QueueName: "root.child-1.1", + PartitionID: "1", + }, + }, + { + PartitionQueueDAOInfo: dao.PartitionQueueDAOInfo{ + ID: "4", + QueueName: "root.child-1.2", + PartitionID: "1", + }, + }, + }, + wantErr: false, + }, + { + name: "Sync queues with existing queues in DB", + stateQueues: []dao.PartitionQueueDAOInfo{{ + ID: "1", + QueueName: "root", + PartitionID: "1", + Children: []dao.PartitionQueueDAOInfo{ + { + ID: "2", + QueueName: "root.child-1", + PartitionID: "1", + }, + { + ID: "3", + QueueName: "root.child-2", + PartitionID: "1", + }, + }, + }, + }, + + existingQueues: []*model.Queue{ + { + Metadata: model.Metadata{ + CreatedAtNano: now, + }, + PartitionQueueDAOInfo: 
dao.PartitionQueueDAOInfo{ + ID: "1", + QueueName: "root", + PartitionID: "1", + }, + }, + }, + expected: []*model.Queue{ + { + PartitionQueueDAOInfo: dao.PartitionQueueDAOInfo{ + ID: "1", + QueueName: "root", + PartitionID: "1", + }, + }, + { + PartitionQueueDAOInfo: dao.PartitionQueueDAOInfo{ + ID: "2", + QueueName: "root.child-1", + PartitionID: "1", + }, + }, + { + PartitionQueueDAOInfo: dao.PartitionQueueDAOInfo{ + ID: "3", + QueueName: "root.child-2", + PartitionID: "1", + }, + }, + }, + wantErr: false, + }, + { + name: "Sync queues when queue is deleted", + stateQueues: []dao.PartitionQueueDAOInfo{ + { + ID: "1", + QueueName: "root", + PartitionID: "1", + Children: []dao.PartitionQueueDAOInfo{ + { + ID: "3", + QueueName: "root.child-2", + PartitionID: "1", + }, + }, + }, + }, + existingQueues: []*model.Queue{ + { + Metadata: model.Metadata{ + CreatedAtNano: now, + }, + PartitionQueueDAOInfo: dao.PartitionQueueDAOInfo{ + ID: "1", + QueueName: "root", + PartitionID: "1", + }, + }, + { + Metadata: model.Metadata{ + CreatedAtNano: now, + }, + PartitionQueueDAOInfo: dao.PartitionQueueDAOInfo{ + ID: "2", + QueueName: "root.child-1", + PartitionID: "1", + }, + }, + }, + expected: []*model.Queue{ + { + PartitionQueueDAOInfo: dao.PartitionQueueDAOInfo{ + ID: "1", + QueueName: "root", + PartitionID: "1", + }, + }, + { + PartitionQueueDAOInfo: dao.PartitionQueueDAOInfo{ + ID: "3", + QueueName: "root.child-2", + PartitionID: "1", + }, + }, + }, + wantErr: false, + }, + { + name: "Sync queues with multiple partitions", + stateQueues: []dao.PartitionQueueDAOInfo{ + { + ID: "1", + QueueName: "root", + PartitionID: "1", + Children: []dao.PartitionQueueDAOInfo{ + { + ID: "2", + QueueName: "root.child-1", + PartitionID: "1", + }, + { + ID: "3", + QueueName: "root.child-2", + PartitionID: "1", + }, + }, + }, + { + ID: "4", + QueueName: "root", + PartitionID: "2", + Children: []dao.PartitionQueueDAOInfo{ + { + ID: "5", + QueueName: "root.child-1", + PartitionID: "2", + }, + { + 
ID: "6", + QueueName: "root.child-2", + PartitionID: "2", + }, + }, + }, + }, + existingQueues: nil, + expected: []*model.Queue{ + { + PartitionQueueDAOInfo: dao.PartitionQueueDAOInfo{ + ID: "1", + QueueName: "root", + PartitionID: "1", + }, + }, + { + PartitionQueueDAOInfo: dao.PartitionQueueDAOInfo{ + ID: "2", + QueueName: "root.child-1", + PartitionID: "1", + }, + }, + { + PartitionQueueDAOInfo: dao.PartitionQueueDAOInfo{ + ID: "3", + QueueName: "root.child-2", + PartitionID: "1", + }, + }, + { + PartitionQueueDAOInfo: dao.PartitionQueueDAOInfo{ + ID: "4", + QueueName: "root", + PartitionID: "2", + }, + }, + { + PartitionQueueDAOInfo: dao.PartitionQueueDAOInfo{ + ID: "5", + QueueName: "root.child-1", + PartitionID: "2", + }, + }, + { + PartitionQueueDAOInfo: dao.PartitionQueueDAOInfo{ + ID: "6", + QueueName: "root.child-2", + PartitionID: "2", + }, + }, + }, + + wantErr: false, + }, + { + name: "Sync queues with deeply nested queues", + stateQueues: []dao.PartitionQueueDAOInfo{ + { + ID: "1", + QueueName: "root", + PartitionID: "1", + Children: []dao.PartitionQueueDAOInfo{ + { + ID: "2", + QueueName: "root.child-1", + PartitionID: "1", + Children: []dao.PartitionQueueDAOInfo{ + { + ID: "3", + QueueName: "root.child-1.1", + PartitionID: "1", + Children: []dao.PartitionQueueDAOInfo{ + { + ID: "4", + QueueName: "root.child-1.1.1", + PartitionID: "1", + }, + { + ID: "5", + QueueName: "root.child-1.1.2", + PartitionID: "1", + }, + }, + }, + }, + }, + }, + }, + }, + existingQueues: nil, + expected: []*model.Queue{ + { + PartitionQueueDAOInfo: dao.PartitionQueueDAOInfo{ + ID: "1", + QueueName: "root", + PartitionID: "1", + }, + }, + { + PartitionQueueDAOInfo: dao.PartitionQueueDAOInfo{ + ID: "2", + QueueName: "root.child-1", + PartitionID: "1", + }, + }, + { + PartitionQueueDAOInfo: dao.PartitionQueueDAOInfo{ + ID: "3", + QueueName: "root.child-1.1", + PartitionID: "1", + }, + }, + { + PartitionQueueDAOInfo: dao.PartitionQueueDAOInfo{ + ID: "4", + QueueName: 
"root.child-1.1.1", + PartitionID: "1", + }, + }, + { + PartitionQueueDAOInfo: dao.PartitionQueueDAOInfo{ + ID: "5", + QueueName: "root.child-1.1.2", + PartitionID: "1", + }, + }, + }, + wantErr: false, + }, + } + + for _, tt := range tests { + ss.Run(tt.name, func() { + // clean up the table after the test + ss.T().Cleanup(func() { + _, err := ss.pool.Exec(ctx, "DELETE FROM queues") + require.NoError(ss.T(), err) + }) + for _, q := range tt.existingQueues { + err := ss.repo.InsertQueue(ctx, q) + require.NoError(ss.T(), err) + } + + s := NewService(ss.repo, nil, nil) + + err := s.syncQueues(context.Background(), tt.stateQueues) + if tt.wantErr { + require.Error(ss.T(), err) + return + } + require.NoError(ss.T(), err) + queuesInDB, err := s.repo.GetAllQueues(ctx) + require.NoError(ss.T(), err) + for _, target := range tt.expected { + if !isQueuePresent(queuesInDB, target) { + ss.T().Errorf("Queue %s in partition %s is not found in the DB", target.QueueName, target.PartitionID) + } + } + }) + } +} + +func isQueuePresent(queuesInDB []*model.Queue, targetQueue *model.Queue) bool { + for _, dbQueue := range queuesInDB { + if dbQueue.QueueName == targetQueue.QueueName && dbQueue.PartitionID == targetQueue.PartitionID { + // Check if DeletedAtNano fields are either both nil or both non-nil + if (dbQueue.DeletedAtNano == nil && targetQueue.DeletedAtNano != nil) || + (dbQueue.DeletedAtNano != nil && targetQueue.DeletedAtNano == nil) { + return false // If one is nil and the other is not, return false + } + return true + } + } + return false +} diff --git a/internal/yunikorn/sync_suite_test.go b/internal/yunikorn/sync_suite_test.go new file mode 100644 index 00000000..91d559df --- /dev/null +++ b/internal/yunikorn/sync_suite_test.go @@ -0,0 +1,69 @@ +package yunikorn + +import ( + "context" + "testing" + + "github.com/jackc/pgx/v5/pgxpool" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + + 
"github.com/G-Research/unicorn-history-server/test/database" +) + +type SyncSuite struct { + suite.Suite + tp *database.TestPostgresContainer + pool *pgxpool.Pool +} + +func (ts *SyncSuite) SetupSuite() { + ctx := context.Background() + cfg := database.InstanceConfig{ + User: "test", + Password: "test", + DBName: "template", + Host: "localhost", + Port: 15434, + } + + tp, err := database.NewTestPostgresContainer(ctx, cfg) + require.NoError(ts.T(), err) + ts.tp = tp + err = tp.Migrate("../../migrations") + require.NoError(ts.T(), err) + + ts.pool = tp.Pool(ctx, ts.T(), &cfg) +} + +func (ts *SyncSuite) TearDownSuite() { + err := ts.tp.Container.Terminate(context.Background()) + require.NoError(ts.T(), err) +} + +func (ts *SyncSuite) TestSubSuites() { + ts.T().Run("SyncNodesIntTest", func(t *testing.T) { + pool := database.CloneDB(t, ts.tp, ts.pool) + suite.Run(t, &SyncNodesIntTest{pool: pool}) + }) + ts.T().Run("SyncQueuesIntTest", func(t *testing.T) { + pool := database.CloneDB(t, ts.tp, ts.pool) + suite.Run(t, &SyncQueuesIntTest{pool: pool}) + }) + ts.T().Run("SyncPartitionIntTest", func(t *testing.T) { + pool := database.CloneDB(t, ts.tp, ts.pool) + suite.Run(t, &SyncPartitionIntTest{pool: pool}) + }) + ts.T().Run("SyncApplicationsIntTest", func(t *testing.T) { + pool := database.CloneDB(t, ts.tp, ts.pool) + suite.Run(t, &SyncApplicationsIntTest{pool: pool}) + }) +} + +func TestSyncIntegration(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test in short mode.") + } + topSuite := new(SyncSuite) + suite.Run(t, topSuite) +}