From cfceff7e47cf38212835735d7bdb758abed69b41 Mon Sep 17 00:00:00 2001 From: Isaak Date: Fri, 23 Aug 2024 14:25:53 -0400 Subject: [PATCH] #3042 added pipeline update --- .../management/AdapterUpdateManagement.java | 8 ++++++ .../rest/impl/pe/DataStreamResource.java | 26 ++++++++++++++++--- 2 files changed, 31 insertions(+), 3 deletions(-) diff --git a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterUpdateManagement.java b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterUpdateManagement.java index 9aae62888d..bc9d57738a 100644 --- a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterUpdateManagement.java +++ b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterUpdateManagement.java @@ -105,6 +105,14 @@ public void updateAdapter(AdapterDescription ad) } } + public void updateDataStream(SpDataStream dataStream) throws AdapterException { + var correspondingAdapter = adapterMasterManagement.getAdapter(dataStream.getCorrespondingAdapterId()); + dataStreamResourceManager.update(dataStream); + + correspondingAdapter.setDataStream(dataStream); + updateAdapter(correspondingAdapter); + } + public List checkPipelineMigrations(AdapterDescription adapterDescription) { var affectedPipelines = PipelineManager .getPipelinesContainingElements(adapterDescription.getCorrespondingDataStreamElementId()); diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/pe/DataStreamResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/pe/DataStreamResource.java index 2ecab6d806..b5b6ab452a 100644 --- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/pe/DataStreamResource.java +++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/pe/DataStreamResource.java @@ -18,13 +18,19 @@ package org.apache.streampipes.rest.impl.pe; +import org.apache.streampipes.commons.exceptions.connect.AdapterException; +import org.apache.streampipes.commons.prometheus.adapter.AdapterMetricsManager; +import org.apache.streampipes.connect.management.management.AdapterMasterManagement; +import org.apache.streampipes.connect.management.management.AdapterUpdateManagement; import org.apache.streampipes.model.SpDataStream; import org.apache.streampipes.model.message.Message; import org.apache.streampipes.model.message.NotificationType; import org.apache.streampipes.model.monitoring.SpLogMessage; import org.apache.streampipes.resource.management.DataStreamResourceManager; -import org.apache.streampipes.rest.core.base.impl.AbstractAuthGuardedRestResource; +import org.apache.streampipes.resource.management.SpResourceManager; +import org.apache.streampipes.rest.impl.connect.AbstractAdapterResource; import org.apache.streampipes.rest.security.AuthConstants; +import org.apache.streampipes.storage.management.StorageDispatcher; import org.springframework.http.MediaType; import org.springframework.http.ResponseEntity; @@ -43,7 +49,17 @@ @RestController @RequestMapping("/api/v2/streams") -public class DataStreamResource extends AbstractAuthGuardedRestResource { +public class DataStreamResource extends AbstractAdapterResource { + + public DataStreamResource() { + super(() -> new AdapterMasterManagement( + StorageDispatcher.INSTANCE.getNoSqlStore() + .getAdapterInstanceStorage(), + new SpResourceManager().manageAdapters(), + new SpResourceManager().manageDataStreams(), + AdapterMetricsManager.INSTANCE.getAdapterMetrics() + )); + } @GetMapping(path = "/available", produces = MediaType.APPLICATION_JSON_VALUE) @PreAuthorize(AuthConstants.HAS_READ_PIPELINE_ELEMENT_PRIVILEGE) @@ -98,8 +114,12 @@ public ResponseEntity addDataStream(@RequestBody SpDataStream dataStream) { public ResponseEntity updateDataStream(@RequestBody SpDataStream dataStream) { try { getDataStreamResourceManager().update(dataStream); + var updateManager = new AdapterUpdateManagement(managementService); + + updateManager.updateDataStream(dataStream); + return ok(); - } catch (IllegalArgumentException e) { + } catch (AdapterException e) { return badRequest(e.getMessage()); } }