From a7657669b133a837fd521f2ce1bc848de1e8afa4 Mon Sep 17 00:00:00 2001 From: Andrew Gee Date: Thu, 11 Jul 2024 08:54:45 +0100 Subject: [PATCH] fs2-rabbit backend (#82) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * init consumer using rabbit fs2 * implement amqpClient interface with fs2 rabbit * fix requeue * WIP separation into modules * All compilation and tests * Finish off Delivery MessageEncoder and support all declaration options * Remove temporary client in examples module * Requeue on reject for RequeueImmediately support * Restore example to JavaBackend client * Bleh! Type erasure is the worst * `deliveryEncoder` tests * WIP * Get compiling again after a rebase * Passing tests * Abstract out fs2-rabbit config * Move unit tests to relevant module * Fix up test logging * Sort declarations before executing them! * Try existing integration tests with new client * Added lots Co-authored-by: Andrew Gee 🫥 * Reworked publisher to return an F[Publisher[F, ???]] Co-authored-by: Jos Bogan * Fixed error in publishing Co-authored-by: Andrew Gee 🫥 * working for cross compile * Supporting 2.12 Co-authored-by: Andrew Gee 🫥 * More 2.12 Co-authored-by: Andrew Gee 🫥 * More 2.12 madness Co-authored-by: Andrew Gee 🫥 * revert version * Co-authored-by: Andrew Gee 🫥 * Time :( Co-authored-by: Andrew Gee 🫥 * success? Co-authored-by: Jos Bogan --------- Co-authored-by: Luke Thomas Co-authored-by: joseboga Co-authored-by: Jos Bogan --- .../AmqpClientConnectionManager.scala | 87 +++++ .../fs2rabbit/Fs2RabbitAmqpClient.scala | 334 ++++++++++++++++++ .../fs2rabbit/PendingConfirmListener.scala | 59 ++++ .../fs2rabbit/Fs2RabbitAmqpClientSpec.scala | 225 ++++++++++++ .../AmqpClientConnectionManager.scala | 18 +- .../itv/bucky/backend/javaamqp}/Channel.scala | 12 +- .../backend/javaamqp/EnvelopeConversion.scala | 9 + .../javaamqp/JavaBackendAmqpClient.scala | 175 +++++++++ .../MessagePropertiesConverters.scala | 3 +- .../backend/javaamqp}/consume/Consumer.scala | 6 +- .../publish/PendingConfirmListener.scala | 4 +- .../MessagePropertiesConvertersTest.scala | 11 +- build.sbt | 51 ++- .../main/scala/com/itv/bucky/AmqpClient.scala | 165 +-------- .../com/itv/bucky/AmqpClientConfig.scala | 28 +- .../main/scala/com/itv/bucky/Envelope.scala | 5 - .../com/itv/bucky/LoggingAmqpClient.scala | 80 +++-- .../main/scala/com/itv/bucky/package.scala | 75 ++-- .../itv/bucky/pattern/requeue/package.scala | 9 +- .../bucky/publish/PublishCommandBuilder.scala | 9 +- .../scala/com/itv/bucky/publish/package.scala | 2 +- .../scala/com/itv/bucky/wiring/Wiring.scala | 11 +- core/src/test/resources/logback.xml | 4 +- example/src/main/resources/logback.xml | 4 +- .../circe/CirceMarshalledPublisher.scala | 9 +- .../circe/CirceUnmarshalledConsumer.scala | 4 +- .../marshalling/MarshallingPublisher.scala | 6 +- .../marshalling/UnmarshallingConsumer.scala | 3 +- .../example/requeue/RequeueConsumer.scala | 20 +- it/src/test/resources/logback.xml | 10 +- .../{integrationTest => }/HammerTest.scala | 48 ++- .../package.scala => IntegrationSpec.scala} | 2 +- .../PublishIntegrationTest.scala | 29 +- .../RequeueIntegrationTest.scala | 21 +- ...queueWithExpiryActionIntegrationTest.scala | 26 +- .../ShutdownTimeoutTest.scala | 60 ++-- .../scala/com/itv/bucky/test/package.scala | 5 +- .../itv/bucky/test/stubs/StubChannel.scala | 5 +- test/src/test/resources/logback.xml | 4 +- .../itv/bucky/test/PublishConsumeTest.scala | 40 +-- .../com/itv/bucky/test/PublisherTest.scala | 29 +- .../scala/com/itv/bucky/test/StubTest.scala | 28 +- 42 files changed, 1279 insertions(+), 456 deletions(-) create mode 100644 backendFs2Rabbit/src/main/scala/com/itv/bucky/backend/fs2rabbit/AmqpClientConnectionManager.scala create mode 100644 backendFs2Rabbit/src/main/scala/com/itv/bucky/backend/fs2rabbit/Fs2RabbitAmqpClient.scala create mode 100644 backendFs2Rabbit/src/main/scala/com/itv/bucky/backend/fs2rabbit/PendingConfirmListener.scala create mode 100644 backendFs2Rabbit/src/test/scala/fs2rabbit/Fs2RabbitAmqpClientSpec.scala rename {core/src/main/scala/com/itv/bucky => backendJavaAmqp/src/main/scala/com/itv/bucky/backend/javaamqp}/AmqpClientConnectionManager.scala (91%) rename {core/src/main/scala/com/itv/bucky => backendJavaAmqp/src/main/scala/com/itv/bucky/backend/javaamqp}/Channel.scala (94%) create mode 100644 backendJavaAmqp/src/main/scala/com/itv/bucky/backend/javaamqp/EnvelopeConversion.scala create mode 100644 backendJavaAmqp/src/main/scala/com/itv/bucky/backend/javaamqp/JavaBackendAmqpClient.scala rename {core/src/main/scala/com/itv/bucky => backendJavaAmqp/src/main/scala/com/itv/bucky/backend/javaamqp}/MessagePropertiesConverters.scala (96%) rename {core/src/main/scala/com/itv/bucky => backendJavaAmqp/src/main/scala/com/itv/bucky/backend/javaamqp}/consume/Consumer.scala (67%) rename {core/src/main/scala/com/itv/bucky => backendJavaAmqp/src/main/scala/com/itv/bucky/backend/javaamqp}/publish/PendingConfirmListener.scala (96%) rename {core/src/test/scala/com/itv/bucky => backendJavaAmqp/src/test/scala/com/itv/bucky/backend/javaamqp}/MessagePropertiesConvertersTest.scala (91%) rename it/src/test/scala/com/itv/bucky/{integrationTest => }/HammerTest.scala (80%) rename it/src/test/scala/com/itv/bucky/{integrationTest/package.scala => IntegrationSpec.scala} (96%) rename it/src/test/scala/com/itv/bucky/{integrationTest => }/PublishIntegrationTest.scala (68%) rename it/src/test/scala/com/itv/bucky/{integrationTest => }/RequeueIntegrationTest.scala (83%) rename it/src/test/scala/com/itv/bucky/{integrationTest => }/RequeueWithExpiryActionIntegrationTest.scala (82%) rename it/src/test/scala/com/itv/bucky/{integrationTest => }/ShutdownTimeoutTest.scala (62%) diff --git a/backendFs2Rabbit/src/main/scala/com/itv/bucky/backend/fs2rabbit/AmqpClientConnectionManager.scala b/backendFs2Rabbit/src/main/scala/com/itv/bucky/backend/fs2rabbit/AmqpClientConnectionManager.scala new file mode 100644 index 00000000..4768d792 --- /dev/null +++ b/backendFs2Rabbit/src/main/scala/com/itv/bucky/backend/fs2rabbit/AmqpClientConnectionManager.scala @@ -0,0 +1,87 @@ +package com.itv.bucky.backend.fs2rabbit + +import cats.effect._ +import cats.effect.implicits._ +import cats.effect.std.Dispatcher +import cats.implicits._ +import com.itv.bucky.publish._ +import com.itv.bucky.{AmqpClientConfig, Publisher, publish} +import com.rabbitmq.client.Channel +import com.typesafe.scalalogging.StrictLogging +import dev.profunktor.fs2rabbit.effects.MessageEncoder +import dev.profunktor.fs2rabbit.interpreter.RabbitClient +import dev.profunktor.fs2rabbit.model +import dev.profunktor.fs2rabbit.model.{AMQPChannel, PublishingFlag} + +import scala.collection.immutable.TreeMap +import scala.util.Try + +class AmqpClientConnectionManager[F[_]: Async]( + amqpConfig: AmqpClientConfig, + pendingConfirmListener: PendingConfirmListener[F], + dispatcher: Dispatcher[F], + client: RabbitClient[F] +)(implicit publishCommandEncoder: MessageEncoder[F, PublishCommand], amqpChannel: AMQPChannel) + extends StrictLogging { + + val publishChannel: Channel = amqpChannel.value + + private def synchroniseIfNeeded[T](f: => T): T = publishChannel.synchronized(f) + private def runWithChannelSync[T](action: F[T]): F[T] = + synchroniseIfNeeded { + Async[F].fromTry(Try { + dispatcher.unsafeRunSync(action) + }) + } + + def addConfirmListeningToPublisher(publisher: Publisher[F, PublishCommand]): Publisher[F, PublishCommand] = (cmd: PublishCommand) => + for { + deliveryTag <- Ref.of[F, Option[Long]](None) + _ <- (for { + signal <- Deferred[F, Option[Throwable]] + _ <- runWithChannelSync { + for { + nextPublishSeq <- Async[F].blocking(publishChannel.getNextPublishSeqNo) + _ <- deliveryTag.set(Some(nextPublishSeq)) + _ <- pendingConfirmListener.pendingConfirmations.update(_ + (nextPublishSeq -> signal)) + _ <- publisher(cmd) + } yield () + } + _ <- signal.get.flatMap(maybeError => maybeError.traverse(Async[F].raiseError[Unit])) + } yield ()) + .timeout(amqpConfig.publishingTimeout) + .recoverWith { case e => + runWithChannelSync { + for { + dl <- deliveryTag.get + deliveryTag <- Async[F].fromOption(dl, new RuntimeException("Timeout occurred before a delivery tag could be obtained.", e)) + _ <- pendingConfirmListener.pop(deliveryTag, multiple = false) + _ <- Async[F].raiseError[Unit](e) + } yield () + } + } + } yield () + +} + +private[bucky] object AmqpClientConnectionManager extends StrictLogging { + + def apply[F[_]]( + config: AmqpClientConfig, + client: RabbitClient[F], + amqpChannel: AMQPChannel, + dispatcher: Dispatcher[F] + )(implicit + F: Async[F], + publishCommandEncoder: MessageEncoder[F, PublishCommand] + ): F[AmqpClientConnectionManager[F]] = + for { + pendingConfirmations <- Ref.of[F, TreeMap[Long, Deferred[F, Option[Throwable]]]](TreeMap.empty) + pendingReturn <- Ref.of[F, Option[Throwable]](None) + publishChannel = amqpChannel.value + _ <- Async[F].blocking(publishChannel.confirmSelect) + confirmListener <- Async[F].blocking(PendingConfirmListener(pendingConfirmations, pendingReturn, dispatcher)) + _ <- Async[F].blocking(publishChannel.addConfirmListener(confirmListener)) + _ <- Async[F].blocking(publishChannel.addReturnListener(confirmListener)) + } yield new AmqpClientConnectionManager(config, confirmListener, dispatcher, client)(implicitly, implicitly, amqpChannel) +} diff --git a/backendFs2Rabbit/src/main/scala/com/itv/bucky/backend/fs2rabbit/Fs2RabbitAmqpClient.scala b/backendFs2Rabbit/src/main/scala/com/itv/bucky/backend/fs2rabbit/Fs2RabbitAmqpClient.scala new file mode 100644 index 00000000..0fcf0206 --- /dev/null +++ b/backendFs2Rabbit/src/main/scala/com/itv/bucky/backend/fs2rabbit/Fs2RabbitAmqpClient.scala @@ -0,0 +1,334 @@ +package com.itv.bucky.backend.fs2rabbit + +import cats.data.Kleisli +import cats.effect.implicits.{genSpawnOps, genTemporalOps} +import cats.effect.std.Dispatcher +import cats.effect.{Async, Ref, Resource} +import cats.implicits._ +import com.itv.bucky +import com.itv.bucky.backend.fs2rabbit.Fs2RabbitAmqpClient.deliveryDecoder +import com.itv.bucky.consume.DeliveryMode +import com.itv.bucky.decl.ExchangeType +import com.itv.bucky.publish.{ContentEncoding, ContentType, PublishCommand} +import com.itv.bucky.{ + AmqpClient, + AmqpClientConfig, + Envelope, + ExchangeName, + Handler, + Payload, + Publisher, + QueueName, + RoutingKey, + consume, + decl, + publish +} +import com.rabbitmq.client.LongString +import dev.profunktor.fs2rabbit.arguments.SafeArg +import dev.profunktor.fs2rabbit.config.Fs2RabbitConfig +import dev.profunktor.fs2rabbit.config.declaration._ +import dev.profunktor.fs2rabbit.effects.{EnvelopeDecoder, MessageEncoder} +import dev.profunktor.fs2rabbit.interpreter.RabbitClient +import dev.profunktor.fs2rabbit.model +import dev.profunktor.fs2rabbit.model.AmqpFieldValue.{ + ArrayVal, + BooleanVal, + ByteArrayVal, + ByteVal, + DecimalVal, + DoubleVal, + FloatVal, + IntVal, + LongVal, + NullVal, + ShortVal, + StringVal, + TableVal, + TimestampVal +} +import dev.profunktor.fs2rabbit.model.{AMQPChannel, PublishingFlag, ShortString} +import scodec.bits.ByteVector + +import java.util.{Date, UUID} +import scala.concurrent.duration.FiniteDuration +import scala.jdk.CollectionConverters._ +import scala.language.higherKinds +import Fs2RabbitAmqpClient._ +import cats.effect.kernel.Temporal + +class Fs2RabbitAmqpClient[F[_]: Async: Temporal]( + client: RabbitClient[F], + connection: model.AMQPConnection, + publishChannel: model.AMQPChannel, + amqpClientConnectionManager: AmqpClientConnectionManager[F] +) extends AmqpClient[F] { + + override def declare(declarations: decl.Declaration*): F[Unit] = declare(declarations.toList) + + override def declare(declarations: Iterable[decl.Declaration]): F[Unit] = { + + def mapToSafeArg(kv: (String, AnyRef)): (String, SafeArg) = kv match { + case (k, arg: String) => k -> arg + case (k, arg: BigDecimal) => k -> arg + case (k, arg: Integer) => k -> arg.intValue() + case (k, arg: java.lang.Long) => k -> arg.longValue() + case (k, arg: java.lang.Double) => k -> arg.doubleValue() + case (k, arg: java.lang.Float) => k -> arg.floatValue() + case (k, arg: java.lang.Short) => k -> arg.shortValue() + case (k, arg: java.lang.Boolean) => k -> arg.booleanValue() + case (k, arg: java.lang.Byte) => k -> arg.byteValue() + case (k, arg: java.util.Date) => k -> arg + case t => throw new IllegalArgumentException(s"Unsupported type for rabbit arguments $t") + } + + def argumentsFromAnyRef(arguments: Map[String, AnyRef]): Map[String, SafeArg] = + arguments.map(mapToSafeArg) + + def exchangeTypeToFs2ExchangeType(exchangeType: ExchangeType): model.ExchangeType = + exchangeType match { + case decl.Direct => model.ExchangeType.Direct + case decl.Topic => model.ExchangeType.Topic + case decl.Headers => model.ExchangeType.Headers + case decl.Fanout => model.ExchangeType.FanOut + } + + client.createChannel(connection).use { implicit channel => + declarations.toList + .sortBy { + case _: decl.Queue => 0 + case _: decl.Exchange => 1 + case _ => 2 + } + .map { + case decl.Exchange(name, exchangeType, isDurable, shouldAutoDelete, isInternal, arguments, bindings) => + client.declareExchange( + DeclarationExchangeConfig + .default(model.ExchangeName(name.value), exchangeTypeToFs2ExchangeType(exchangeType)) + .copy( + arguments = argumentsFromAnyRef(arguments), + durable = if (isDurable) Durable else NonDurable, + autoDelete = if (shouldAutoDelete) AutoDelete else NonAutoDelete, + internal = if (isInternal) Internal else NonInternal + ) + ) *> + bindings.traverse_ { binding => + client.bindQueue( + model.QueueName(binding.queueName.value), + model.ExchangeName(binding.exchangeName.value), + model.RoutingKey(binding.routingKey.value), + model.QueueBindingArgs(argumentsFromAnyRef(binding.arguments)) + ) + } + case decl.Binding(exchangeName, queueName, routingKey, arguments) => + client.bindQueue( + model.QueueName(queueName.value), + model.ExchangeName(exchangeName.value), + model.RoutingKey(routingKey.value), + model.QueueBindingArgs(argumentsFromAnyRef(arguments)) + ) + case decl.ExchangeBinding(destinationExchangeName, sourceExchangeName, routingKey, arguments) => + client.bindExchange( + model.ExchangeName(destinationExchangeName.value), + model.ExchangeName(sourceExchangeName.value), + model.RoutingKey(routingKey.value), + model.ExchangeBindingArgs(argumentsFromAnyRef(arguments)) + ) + case decl.Queue(name, isDurable, isExclusive, shouldAutoDelete, arguments) => + client.declareQueue( + DeclarationQueueConfig + .default(model.QueueName(name.value)) + .copy( + arguments = argumentsFromAnyRef(arguments), + durable = if (isDurable) Durable else NonDurable, + autoDelete = if (shouldAutoDelete) AutoDelete else NonAutoDelete, + exclusive = if (isExclusive) Exclusive else NonExclusive + ) + ) + } + .sequence_ + } + + } + + override def publisher(mandatory: Boolean): F[Publisher[F, publish.PublishCommand]] = + client + .createBasicPublisherWithListener[PublishCommand]( + PublishingFlag(mandatory), + _ => Async[F].unit // Mandatory returns ignored here, but are handled in AmqpClientConnectionManager + )(publishChannel, implicitly) + .map { publisher => (publishCommand: PublishCommand) => + publisher( + model.ExchangeName(publishCommand.exchange.value), + model.RoutingKey(publishCommand.routingKey.value), + publishCommand + ) + } + .map(amqpClientConnectionManager.addConfirmListeningToPublisher) + + private def repeatUntil[A](eval: F[A])(pred: A => Boolean)(sleep: FiniteDuration): F[Unit] = + for { + result <- eval + ended <- Async[F].pure(pred(result)) + _ <- if (ended) Async[F].unit else Temporal[F].sleep(sleep) *> repeatUntil(eval)(pred)(sleep) + } yield () + + override def registerConsumer( + queueName: bucky.QueueName, + handler: Handler[F, consume.Delivery], + exceptionalAction: consume.ConsumeAction, + prefetchCount: Int, + shutdownTimeout: FiniteDuration, + shutdownRetry: FiniteDuration + ): Resource[F, Unit] = + client.createChannel(connection).flatMap { implicit channel => + implicit val decoder: EnvelopeDecoder[F, consume.Delivery] = deliveryDecoder(queueName) + Resource.eval(Ref.of[F, Set[UUID]](Set.empty)).flatMap { consumptionIds => + Resource.eval(client.createAckerConsumer[consume.Delivery](model.QueueName(queueName.value))).flatMap { case (acker, consumer) => + consumer + .evalMap(delivery => + for { + uuid <- Async[F].delay(UUID.randomUUID()) + _ <- consumptionIds.update(set => set + uuid) + res <- handler(delivery.payload).attempt + tag = delivery.deliveryTag + _ <- consumptionIds.update(set => set - uuid) + result <- Async[F].fromEither(res) + } yield (result, tag) + ) + .evalMap { + case (consume.Ack, tag) => acker(model.AckResult.Ack(tag)) + case (consume.DeadLetter, tag) => acker(model.AckResult.NAck(tag)) + case (consume.RequeueImmediately, tag) => acker(model.AckResult.Reject(tag)) + } + .compile + .drain + .background + .flatMap { _ => + Resource.onFinalize( + repeatUntil(consumptionIds.get)(_.isEmpty)(shutdownRetry).timeout(shutdownTimeout) + ) + } + .map(_ => ()) + } + } + } + + override def isConnectionOpen: F[Boolean] = Async[F].pure(connection.value.isOpen) + +} + +object Fs2RabbitAmqpClient { + + def apply[F[_]: Async](config: AmqpClientConfig): Resource[F, Fs2RabbitAmqpClient[F]] = { + val fs2RabbitConfig = Fs2RabbitConfig( + config.host, + config.port, + config.virtualHost.getOrElse("/"), + config.connectionTimeout, + ssl = config.ssl, + Some(config.username), + Some(config.password), + requeueOnNack = config.requeueOnNack, + requeueOnReject = config.requeueOnReject, + config.internalQueueSize + ) + for { + client <- RabbitClient.default[F](fs2RabbitConfig).resource + dispatcher <- Dispatcher.parallel[F] + connection <- client.createConnection + publishChannel <- client.createConnectionChannel + amqpClientConnectionManager <- Resource.eval( + AmqpClientConnectionManager[F]( + config = config, + client = client, + dispatcher = dispatcher, + amqpChannel = publishChannel + ) + ) + } yield new Fs2RabbitAmqpClient(client, connection, publishChannel, amqpClientConnectionManager) + } + + implicit def deliveryEncoder[F[_]: Async]: MessageEncoder[F, PublishCommand] = + Kleisli { publishCommand => + def toAmqpValue(value: AnyRef): model.AmqpFieldValue = value match { + case bd: java.math.BigDecimal => DecimalVal.unsafeFrom(bd) + case ts: java.time.Instant => TimestampVal.from(ts) + case d: java.util.Date => TimestampVal.from(d) + case t: java.util.Map[_, _] => + TableVal(t.asScala.toMap.collect { case (key: String, v: AnyRef) => ShortString.unsafeFrom(key) -> toAmqpValue(v) }) + case byte: java.lang.Byte => ByteVal(byte) + case double: java.lang.Double => DoubleVal(double) + case float: java.lang.Float => FloatVal(float) + case short: java.lang.Short => ShortVal(short) + case byteArray: Array[Byte] => ByteArrayVal(ByteVector(byteArray)) + case b: java.lang.Boolean => BooleanVal(b) + case i: java.lang.Integer => IntVal(i) + case l: java.lang.Long => LongVal(l) + case s: java.lang.String => StringVal(s) + case ls: LongString => StringVal(ls.toString) + case a: java.util.List[_] => ArrayVal(a.asScala.toVector.collect { case v: AnyRef => toAmqpValue(v) }) + case _ => NullVal + } + + val fs2MessageHeaders: Map[String, model.AmqpFieldValue] = publishCommand.basicProperties.headers.map { case (key, headerValue) => + key -> toAmqpValue(headerValue) + } + + val message = model.AmqpMessage( + publishCommand.body.value, + model.AmqpProperties( + contentType = publishCommand.basicProperties.contentType.map(_.value), + contentEncoding = publishCommand.basicProperties.contentEncoding.map(_.value), + priority = publishCommand.basicProperties.priority, + deliveryMode = publishCommand.basicProperties.deliveryMode.map(dm => model.DeliveryMode.from(dm.value)), + correlationId = publishCommand.basicProperties.correlationId, + messageId = publishCommand.basicProperties.messageId, + `type` = publishCommand.basicProperties.messageType, + userId = publishCommand.basicProperties.userId, + appId = publishCommand.basicProperties.appId, + expiration = publishCommand.basicProperties.expiration, + replyTo = publishCommand.basicProperties.replyTo, + clusterId = publishCommand.basicProperties.clusterId, + timestamp = publishCommand.basicProperties.timestamp.map(_.toInstant), + headers = fs2MessageHeaders + ) + ) + + Async[F].pure(message) + } + + def deliveryDecoder[F[_]: Async](queueName: QueueName): EnvelopeDecoder[F, consume.Delivery] = + Kleisli { amqpEnvelope => + val messageProperties = publish.MessageProperties( + contentType = amqpEnvelope.properties.contentType.map(ContentType.apply), + contentEncoding = amqpEnvelope.properties.contentEncoding.map(ContentEncoding.apply), + headers = amqpEnvelope.properties.headers.map { case (key, headerValue) => key -> headerValue.toValueWriterCompatibleJava }, + deliveryMode = amqpEnvelope.properties.deliveryMode.map(dm => DeliveryMode(dm.value)), + priority = amqpEnvelope.properties.priority, + correlationId = amqpEnvelope.properties.correlationId, + replyTo = amqpEnvelope.properties.replyTo, + expiration = amqpEnvelope.properties.expiration, + messageId = amqpEnvelope.properties.messageId, + timestamp = amqpEnvelope.properties.timestamp.map(Date.from), + messageType = amqpEnvelope.properties.`type`, + userId = amqpEnvelope.properties.userId, + appId = amqpEnvelope.properties.appId, + clusterId = amqpEnvelope.properties.clusterId + ) + + Async[F].pure( + consume.Delivery( + Payload(amqpEnvelope.payload), + consume.ConsumerTag.create(queueName), + Envelope( + amqpEnvelope.deliveryTag.value, + amqpEnvelope.redelivered, + ExchangeName(amqpEnvelope.exchangeName.value), + RoutingKey(amqpEnvelope.routingKey.value) + ), + messageProperties + ) + ) + } +} diff --git a/backendFs2Rabbit/src/main/scala/com/itv/bucky/backend/fs2rabbit/PendingConfirmListener.scala b/backendFs2Rabbit/src/main/scala/com/itv/bucky/backend/fs2rabbit/PendingConfirmListener.scala new file mode 100644 index 00000000..93546159 --- /dev/null +++ b/backendFs2Rabbit/src/main/scala/com/itv/bucky/backend/fs2rabbit/PendingConfirmListener.scala @@ -0,0 +1,59 @@ +package com.itv.bucky.backend.fs2rabbit + +import cats.effect._ +import cats.effect.std.Dispatcher +import cats.implicits._ +import com.rabbitmq.client.{AMQP, ConfirmListener, ReturnListener} +import com.typesafe.scalalogging.StrictLogging + +import scala.collection.immutable.TreeMap + +private[bucky] case class PendingConfirmListener[F[_]]( + pendingConfirmations: Ref[F, TreeMap[Long, Deferred[F, Option[Throwable]]]], + pendingReturn: Ref[F, Option[Throwable]], + dispatcher: Dispatcher[F] +)(implicit F: Sync[F]) + extends ConfirmListener + with ReturnListener + with StrictLogging { + + def pop[T](deliveryTag: Long, multiple: Boolean): F[List[Deferred[F, Option[Throwable]]]] = + pendingConfirmations.modify { x => + if (multiple) { + val entries = x.until(deliveryTag + 1).toList + (x -- entries.map { case (key, _) => key }, entries.map { case (_, value) => value }) + } else { + (x - deliveryTag, x.get(deliveryTag).toList) + } + } + + override def handleAck(deliveryTag: Long, multiple: Boolean): Unit = + dispatcher.unsafeRunSync( + for { + returnRef <- pendingReturn.getAndSet(None) + toComplete <- pop(deliveryTag, multiple) + _ <- F.delay(logger.info("Received ack for delivery tag: {} and multiple: {}", deliveryTag, multiple)) + _ <- toComplete.traverse(_.complete(returnRef)) + } yield () + ) + + override def handleNack(deliveryTag: Long, multiple: Boolean): Unit = + dispatcher.unsafeRunSync( + for { + toComplete <- pop(deliveryTag, multiple) + _ <- F.delay(logger.error("Received Nack for delivery tag: {} and multiple: {}", deliveryTag, multiple)) + _ <- pendingReturn.set(None) + _ <- toComplete.traverse(_.complete(Some(new Throwable(s"Received Nack for delivery tag: $deliveryTag and multiple: $multiple")))) + } yield () + ) + + override def handleReturn( + replyCode: Int, + replyText: String, + exchange: String, + routingKey: String, + properties: AMQP.BasicProperties, + body: Array[Byte] + ): Unit = + dispatcher.unsafeRunSync(pendingReturn.set(Some(new Throwable(s"Message publish returned with $replyCode: " + replyText)))) +} diff --git a/backendFs2Rabbit/src/test/scala/fs2rabbit/Fs2RabbitAmqpClientSpec.scala b/backendFs2Rabbit/src/test/scala/fs2rabbit/Fs2RabbitAmqpClientSpec.scala new file mode 100644 index 00000000..05613a25 --- /dev/null +++ b/backendFs2Rabbit/src/test/scala/fs2rabbit/Fs2RabbitAmqpClientSpec.scala @@ -0,0 +1,225 @@ +package fs2rabbit + +import cats.effect.IO +import cats.effect.testing.scalatest.AsyncIOSpec +import com.itv.bucky.PayloadMarshaller.StringPayloadMarshaller +import com.itv.bucky.backend.fs2rabbit.Fs2RabbitAmqpClient +import com.itv.bucky.consume.DeliveryMode +import com.itv.bucky.publish.{MessageProperties, PublishCommandBuilder} +import com.itv.bucky.{AmqpClientConfig, ExchangeName, QueueName, RoutingKey, consume, publish} +import com.rabbitmq.client.impl.LongStringHelper +import dev.profunktor.fs2rabbit.model +import dev.profunktor.fs2rabbit.model.{AmqpEnvelope, AmqpProperties, DeliveryTag, ShortString} +import org.scalatest.OptionValues +import org.scalatest.matchers.should.Matchers +import org.scalatest.wordspec.AsyncWordSpec +import scodec.bits.ByteVector + +import java.time.Instant +import java.time.temporal.ChronoUnit +import java.util.Date +import scala.jdk.CollectionConverters._ + +class Fs2RabbitAmqpClientSpec extends AsyncWordSpec with AsyncIOSpec with Matchers with OptionValues { + + val amqpClientConfig: AmqpClientConfig = AmqpClientConfig("localhost", 5672, "guest", "guest") + + val now: Instant = Instant.now().truncatedTo(ChronoUnit.SECONDS) + + val amqpMessageHeaders: Map[String, model.AmqpFieldValue] = Map( + "bigDecimal" -> model.AmqpFieldValue.DecimalVal.unsafeFrom(1), + "instant" -> model.AmqpFieldValue.TimestampVal.from(now), + "date" -> model.AmqpFieldValue.TimestampVal.from(Date.from(now)), + "map" -> model.AmqpFieldValue.TableVal( + Map( + ShortString.unsafeFrom("inside") -> model.AmqpFieldValue.IntVal(1) + ) + ), + "byte" -> model.AmqpFieldValue.ByteVal('c'.toByte), + "double" -> model.AmqpFieldValue.DoubleVal(Double.box(4.5)), + "float" -> model.AmqpFieldValue.FloatVal(Double.box(4.5).toFloat), + "short" -> model.AmqpFieldValue.ShortVal(Short.box(1)), + "byteArray" -> model.AmqpFieldValue.ByteArrayVal(ByteVector(0.toByte, 1.toByte)), + "int" -> model.AmqpFieldValue.IntVal(Integer.MIN_VALUE), + "long" -> model.AmqpFieldValue.LongVal(Long.MinValue), + "string" -> model.AmqpFieldValue.StringVal("blah"), + "longString" -> model.AmqpFieldValue.StringVal("blahlong"), + "list" -> model.AmqpFieldValue.ArrayVal( + Vector( + model.AmqpFieldValue.IntVal(1), + model.AmqpFieldValue.IntVal(2), + model.AmqpFieldValue.IntVal(3) + ) + ) + ) + + val messagePropertyHeaders: Map[String, AnyRef] = Map( + "bigDecimal" -> java.math.BigDecimal.ONE, + "instant" -> now, + "date" -> Date.from(now), + "map" -> Map( + "inside" -> 1 + ).asJava, + "byte" -> Byte.box('c'.toByte), + "double" -> Double.box(4.5), + "float" -> Float.box(Double.box(4.5).toFloat), + "short" -> Short.box(1), + "byteArray" -> Array(0.toByte, 1.toByte), + "int" -> Int.box(Integer.MIN_VALUE), + "long" -> Long.box(Long.MinValue), + "string" -> "blah", + "longString" -> LongStringHelper.asLongString("blahlong"), + "list" -> List(1, 2, 3).asJava + ) + + "deliveryEncoder" should { + val basicPublishCommand = PublishCommandBuilder + .publishCommandBuilder(StringPayloadMarshaller) + .using(ExchangeName("blah")) + .using(RoutingKey("routing")) + .toPublishCommand("message") + + "decode a basic publish command" in { + Fs2RabbitAmqpClient.deliveryEncoder[IO].apply(basicPublishCommand).map { amqpMessage => + amqpMessage.payload shouldBe "message".getBytes + } + + } + + "decode a publish command with all the properties, apart from headers" in { + val publishCommand = basicPublishCommand.copy(basicProperties = + MessageProperties( + contentType = Some(publish.ContentType.textPlain), + contentEncoding = Some(publish.ContentEncoding.utf8), + headers = Map.empty, + deliveryMode = Some(consume.DeliveryMode.persistent), + priority = Some(1), + correlationId = Some("correlationid"), + replyTo = Some("replyto"), + expiration = Some("expiration"), + messageId = Some("messageId"), + timestamp = Some(Date.from(Instant.now())), + messageType = Some("messageType"), + userId = Some("userId"), + appId = Some("appId"), + clusterId = Some("clusterId") + ) + ) + + Fs2RabbitAmqpClient.deliveryEncoder[IO].apply(publishCommand).map { amqpMessage => + amqpMessage.properties shouldBe AmqpProperties( + contentType = Some("text/plain"), + contentEncoding = Some("utf-8"), + priority = Some(1), + deliveryMode = Some(model.DeliveryMode.Persistent), + correlationId = Some("correlationid"), + messageId = Some("messageId"), + `type` = Some("messageType"), + userId = Some("userId"), + appId = Some("appId"), + expiration = Some("expiration"), + replyTo = Some("replyto"), + clusterId = Some("clusterId"), + timestamp = Some(publishCommand.basicProperties.timestamp.get.toInstant), + headers = Map.empty + ) + + } + } + + "decode a publish command with headers" in { + val publishCommand = basicPublishCommand.copy(basicProperties = + MessageProperties.basic.copy( + headers = messagePropertyHeaders + ) + ) + + Fs2RabbitAmqpClient.deliveryEncoder[IO].apply(publishCommand).map { amqpMessage => + amqpMessage.properties.headers shouldBe amqpMessageHeaders + } + + } + } + + "deliveryDecoder" should { + val basicEnvelope = AmqpEnvelope( + DeliveryTag(123L), + "payload".getBytes, + AmqpProperties.empty, + model.ExchangeName("exchange"), + model.RoutingKey("routing"), + redelivered = false + ) + val queueName = QueueName("queue") + + "decode a basic AmqpEnvelope" in { + Fs2RabbitAmqpClient.deliveryDecoder[IO](queueName).apply(basicEnvelope).map { delivery => + delivery.envelope.deliveryTag shouldBe 123L + delivery.body.value shouldBe "payload".getBytes + delivery.envelope.exchangeName.value shouldBe "exchange" + delivery.envelope.routingKey.value shouldBe "routing" + delivery.envelope.redeliver shouldBe false + } + + } + + "decode an AmqpEnvelope with properties" in { + + val instant = Instant.ofEpochSecond(now.getEpochSecond) + + val amqpProperties = AmqpProperties.apply( + contentType = Some("content-type"), + contentEncoding = Some("content-encoding"), + priority = Some(5), + deliveryMode = Some(model.DeliveryMode.Persistent), + correlationId = Some("correlation-id"), + messageId = Some("message-id"), + `type` = Some("type"), + userId = Some("user"), + appId = Some("app"), + expiration = Some("expiry"), + replyTo = Some("reply"), + clusterId = Some("cluster"), + timestamp = Some(instant), + headers = amqpMessageHeaders + ) + + Fs2RabbitAmqpClient.deliveryDecoder[IO](queueName).apply(basicEnvelope.copy(properties = amqpProperties)).map { delivery => + delivery.properties.headers.get("bigDecimal") shouldBe Some(java.math.BigDecimal.ONE) + delivery.properties.headers.get("instant").value.asInstanceOf[Date].toInstant shouldBe now + delivery.properties.headers.get("date") shouldBe Some(Date.from(now)) + delivery.properties.headers.get("map") shouldBe Some( + Map( + "inside" -> 1 + ).asJava + ) + delivery.properties.headers.get("byte") shouldBe Some(Byte.box('c'.toByte)) + delivery.properties.headers.get("double") shouldBe Some(Double.box(4.5)) + delivery.properties.headers.get("float") shouldBe Some(Float.box(Double.box(4.5).toFloat)) + delivery.properties.headers.get("short") shouldBe Some(Short.box(1)) + delivery.properties.headers.get("byteArray").value.asInstanceOf[Array[Byte]] shouldBe Array(0.toByte, 1.toByte) + delivery.properties.headers.get("int") shouldBe Some(Int.box(Integer.MIN_VALUE)) + delivery.properties.headers.get("long") shouldBe Some(Long.box(Long.MinValue)) + delivery.properties.headers.get("string") shouldBe Some("blah") + delivery.properties.headers.get("longString") shouldBe Some("blahlong") + delivery.properties.headers.get("list") shouldBe Some(List(1, 2, 3).asJava) + + delivery.properties.contentType.map(_.value) shouldBe amqpProperties.contentType + delivery.properties.contentEncoding.map(_.value) shouldBe amqpProperties.contentEncoding + delivery.properties.deliveryMode shouldBe Some(DeliveryMode.persistent) + delivery.properties.priority shouldBe Some(5) + delivery.properties.correlationId shouldBe Some("correlation-id") + delivery.properties.replyTo shouldBe Some("reply") + delivery.properties.expiration shouldBe Some("expiry") + delivery.properties.messageId shouldBe Some("message-id") + delivery.properties.timestamp shouldBe Some(Date.from(instant)) + delivery.properties.messageType shouldBe Some("type") + delivery.properties.userId shouldBe Some("user") + delivery.properties.appId shouldBe Some("app") + delivery.properties.clusterId shouldBe Some("cluster") + } + } + + } + +} diff --git a/core/src/main/scala/com/itv/bucky/AmqpClientConnectionManager.scala b/backendJavaAmqp/src/main/scala/com/itv/bucky/backend/javaamqp/AmqpClientConnectionManager.scala similarity index 91% rename from core/src/main/scala/com/itv/bucky/AmqpClientConnectionManager.scala rename to backendJavaAmqp/src/main/scala/com/itv/bucky/backend/javaamqp/AmqpClientConnectionManager.scala index d4c2f9b4..e015982f 100644 --- a/core/src/main/scala/com/itv/bucky/AmqpClientConnectionManager.scala +++ b/backendJavaAmqp/src/main/scala/com/itv/bucky/backend/javaamqp/AmqpClientConnectionManager.scala @@ -1,21 +1,19 @@ -package com.itv.bucky +package com.itv.bucky.backend.javaamqp import cats.effect._ import cats.effect.implicits._ import cats.effect.std.Dispatcher import cats.implicits._ +import com.itv.bucky.backend.javaamqp.publish.PendingConfirmListener import com.itv.bucky.consume._ -import com.itv.bucky.publish._ import com.itv.bucky.decl.Declaration -import com.itv.bucky.publish.PendingConfirmListener +import com.itv.bucky.publish._ +import com.itv.bucky.{AmqpClientConfig, Handler, QueueName} import com.typesafe.scalalogging.StrictLogging import scala.collection.immutable.TreeMap -import scala.util.Try -import cats.effect.{Deferred, Ref, Temporal} - -import java.util.concurrent.Executors import scala.concurrent.ExecutionContext +import scala.util.Try private[bucky] case class AmqpClientConnectionManager[F[_]]( amqpConfig: AmqpClientConfig, @@ -33,7 +31,7 @@ private[bucky] case class AmqpClientConnectionManager[F[_]]( }) } - def publish(cmd: PublishCommand): F[Unit] = + def publish(cmd: PublishCommand, mandatory: Boolean): F[Unit] = for { deliveryTag <- Ref.of[F, Option[Long]](None) _ <- (for { @@ -43,7 +41,7 @@ private[bucky] case class AmqpClientConnectionManager[F[_]]( nextPublishSeq <- publishChannel.getNextPublishSeqNo _ <- deliveryTag.set(Some(nextPublishSeq)) _ <- pendingConfirmListener.pendingConfirmations.update(_ + (nextPublishSeq -> signal)) - _ <- publishChannel.publish(nextPublishSeq, cmd) + _ <- publishChannel.publish(nextPublishSeq, cmd, mandatory) } yield () } _ <- signal.get.ifM(F.unit, F.raiseError[Unit](new RuntimeException(s"Failed to publish msg: ${cmd}"))) @@ -89,7 +87,7 @@ private[bucky] object AmqpClientConnectionManager extends StrictLogging { pendingConfirmations <- Ref.of[F, TreeMap[Long, Deferred[F, Boolean]]](TreeMap.empty) pendingReturn <- Ref.of[F, Boolean](false) _ <- publishChannel.confirmSelect - confirmListener <- F.blocking(publish.PendingConfirmListener(pendingConfirmations, pendingReturn, dispatcher)) + confirmListener <- F.blocking(PendingConfirmListener(pendingConfirmations, pendingReturn, dispatcher)) _ <- publishChannel.addConfirmListener(confirmListener) _ <- publishChannel.addReturnListener(confirmListener) } yield AmqpClientConnectionManager(config, publishChannel, confirmListener, dispatcher, executionContext) diff --git a/core/src/main/scala/com/itv/bucky/Channel.scala b/backendJavaAmqp/src/main/scala/com/itv/bucky/backend/javaamqp/Channel.scala similarity index 94% rename from core/src/main/scala/com/itv/bucky/Channel.scala rename to backendJavaAmqp/src/main/scala/com/itv/bucky/backend/javaamqp/Channel.scala index 60232522..a1e95d12 100644 --- a/core/src/main/scala/com/itv/bucky/Channel.scala +++ b/backendJavaAmqp/src/main/scala/com/itv/bucky/backend/javaamqp/Channel.scala @@ -1,12 +1,14 @@ -package com.itv.bucky +package com.itv.bucky.backend.javaamqp import cats.effect.implicits._ import cats.effect.std.Dispatcher import cats.effect.{Async, Sync} import cats.implicits._ +import com.itv.bucky.backend.javaamqp.consume.Consumer import com.itv.bucky.consume._ import com.itv.bucky.decl._ import com.itv.bucky.publish.PublishCommand +import com.itv.bucky.{Envelope, Handler, QueueName} import com.rabbitmq.client.AMQP.BasicProperties import com.rabbitmq.client.{ConfirmListener, DefaultConsumer, ReturnListener, Channel => RabbitChannel, Envelope => RabbitMQEnvelope} import com.typesafe.scalalogging.StrictLogging @@ -23,7 +25,7 @@ trait Channel[F[_]] { def addConfirmListener(listener: ConfirmListener): F[Unit] def addReturnListener(listener: ReturnListener): F[Unit] def getNextPublishSeqNo: F[Long] - def publish(sequenceNumber: Long, cmd: PublishCommand): F[Unit] + def publish(sequenceNumber: Long, cmd: PublishCommand, mandatory: Boolean): F[Unit] def sendAction(action: ConsumeAction)(envelope: Envelope): F[Unit] def declareExchange(exchange: Exchange): F[Unit] def declareQueue(queue: Queue): F[Unit] @@ -66,12 +68,12 @@ object Channel { override def addReturnListener(listener: ReturnListener): F[Unit] = F.delay(channel.addReturnListener(listener)) override def getNextPublishSeqNo: F[Long] = F.delay(channel.getNextPublishSeqNo) - override def publish(sequenceNumber: Long, cmd: PublishCommand): F[Unit] = + override def publish(sequenceNumber: Long, cmd: PublishCommand, mandatory: Boolean): F[Unit] = for { _ <- F.delay(logger.debug("Publishing command with exchange:{} rk: {}.", cmd.exchange, cmd.routingKey)) _ <- F.blocking( channel - .basicPublish(cmd.exchange.value, cmd.routingKey.value, cmd.mandatory, false, MessagePropertiesConverters(cmd.basicProperties), cmd.body.value) + .basicPublish(cmd.exchange.value, cmd.routingKey.value, mandatory, false, MessagePropertiesConverters(cmd.basicProperties), cmd.body.value) ) _ <- F.delay(logger.info("Published message: {}", cmd)) } yield () @@ -142,7 +144,7 @@ object Channel { case Left(e) => F.delay(logger.debug(s"Handler failure with {} will recover to: {}", e.getMessage, onHandlerException)) *> F.delay(onHandlerException) } - .flatMap(sendAction(_)(Envelope.fromEnvelope(envelope))) + .flatMap(sendAction(_)(EnvelopeConversion.fromJavaEnvelope(envelope))) ) } } diff --git a/backendJavaAmqp/src/main/scala/com/itv/bucky/backend/javaamqp/EnvelopeConversion.scala b/backendJavaAmqp/src/main/scala/com/itv/bucky/backend/javaamqp/EnvelopeConversion.scala new file mode 100644 index 00000000..58f00a49 --- /dev/null +++ b/backendJavaAmqp/src/main/scala/com/itv/bucky/backend/javaamqp/EnvelopeConversion.scala @@ -0,0 +1,9 @@ +package com.itv.bucky.backend.javaamqp + +import com.itv.bucky.{Envelope, ExchangeName, RoutingKey} +import com.rabbitmq.client.{Envelope => RabbitEnvelope} + +object EnvelopeConversion { + def fromJavaEnvelope(envelope: RabbitEnvelope): Envelope = + Envelope(envelope.getDeliveryTag, envelope.isRedeliver, ExchangeName(envelope.getExchange), RoutingKey(envelope.getRoutingKey)) +} diff --git a/backendJavaAmqp/src/main/scala/com/itv/bucky/backend/javaamqp/JavaBackendAmqpClient.scala b/backendJavaAmqp/src/main/scala/com/itv/bucky/backend/javaamqp/JavaBackendAmqpClient.scala new file mode 100644 index 00000000..d11afd93 --- /dev/null +++ b/backendJavaAmqp/src/main/scala/com/itv/bucky/backend/javaamqp/JavaBackendAmqpClient.scala @@ -0,0 +1,175 @@ +package com.itv.bucky.backend.javaamqp + +import cats.effect._ +import cats.effect.implicits._ +import cats.effect.std.Dispatcher +import cats.implicits._ +import com.itv.bucky.{AmqpClient, AmqpClientConfig, Handler, Publisher, QueueName} +import com.itv.bucky.consume.{ConsumeAction, DeadLetter, Delivery} +import com.itv.bucky.decl._ +import com.itv.bucky.publish.PublishCommand +import com.rabbitmq.client.{ConnectionFactory, ShutdownListener, ShutdownSignalException, Channel => RabbitChannel, Connection => RabbitConnection} +import com.typesafe.scalalogging.StrictLogging + +import java.util.concurrent.{AbstractExecutorService, Executors, TimeUnit} +import java.util.{Collections, UUID} +import scala.concurrent.duration.{FiniteDuration, _} +import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService} +import scala.language.higherKinds + +object JavaBackendAmqpClient extends StrictLogging { + private def createChannel[F[_]](connection: RabbitConnection, executionContext: ExecutionContext)(implicit + F: Async[F] + ): Resource[F, RabbitChannel] = { + val make = + F.blocking { + logger.info(s"Starting Channel") + val channel = connection.createChannel() + channel.addShutdownListener(new ShutdownListener { + override def shutdownCompleted(cause: ShutdownSignalException): Unit = + if (cause.isInitiatedByApplication) + logger.info(s"Channel shut down due to explicit application action: ${cause.getMessage}") + else + logger.error(s"Channel shut down by broker or because of detectable non-deliberate application failure", cause) + }) + channel + } + .attempt + .flatTap { + case Right(_) => + F.delay(logger.info(s"Channel has been started successfully!")) + case Left(exception) => + F.delay(logger.error(s"Failure when starting Channel because ${exception.getMessage}", exception)) + } + .rethrow + + Resource.make(make.evalOn(executionContext))(channel => F.blocking(channel.close())) + } + + private def createConnection[F[_]](config: AmqpClientConfig, executionContext: ExecutionContext)(implicit + F: Async[F] + ): Resource[F, RabbitConnection] = { + val make = + F.blocking { + logger.info(s"Starting AmqpClient") + val connectionFactory = new ConnectionFactory() + connectionFactory.setHost(config.host) + connectionFactory.setPort(config.port) + connectionFactory.setUsername(config.username) + connectionFactory.setPassword(config.password) + connectionFactory.setAutomaticRecoveryEnabled(config.networkRecoveryInterval.isDefined) + connectionFactory.setSharedExecutor(executionContext match { + case null => throw null + case eces: ExecutionContextExecutorService => eces + case other => + new AbstractExecutorService with ExecutionContextExecutorService { + override def prepare(): ExecutionContext = other + override def isShutdown = false + override def isTerminated = false + override def shutdown() = () + override def shutdownNow() = Collections.emptyList[Runnable] + override def execute(runnable: Runnable): Unit = other execute runnable + override def reportFailure(t: Throwable): Unit = other reportFailure t + override def awaitTermination(length: Long, unit: TimeUnit): Boolean = false + } + }) + config.networkRecoveryInterval.map(_.toMillis.toInt).foreach(connectionFactory.setNetworkRecoveryInterval) + config.virtualHost.foreach(connectionFactory.setVirtualHost) + connectionFactory.newConnection() + } + .attempt + .flatTap { + case Right(_) => + logger.info(s"AmqpClient has been started successfully!").pure[F] + case Left(exception) => + logger.error(s"Failure when starting AmqpClient because ${exception.getMessage}", exception).pure[F] + } + .rethrow + + Resource.make(make.evalOn(executionContext))(connection => F.blocking(connection.close())) + } + + def apply[F[_]](config: AmqpClientConfig)(implicit + F: Async[F], + t: Temporal[F], + connectionExecutionContext: ExecutionContext + ): Resource[F, AmqpClient[F]] = + for { + channelEc <- defaultChannelExecutionContext + client <- apply[F](config, connectionExecutionContext, channelEc) + } yield client + + def apply[F[_]](config: AmqpClientConfig, connectionExecutionContext: ExecutionContext, channelExecutionContext: ExecutionContext)(implicit + F: Async[F], + t: Temporal[F] + ): Resource[F, AmqpClient[F]] = + for { + dispatcher <- Dispatcher.parallel[F] + connection <- createConnection(config, connectionExecutionContext) + publishChannel = createChannel(connection, channelExecutionContext).map(Channel.apply[F](_, dispatcher, channelExecutionContext)) + buildChannel = () => createChannel(connection, channelExecutionContext).map(Channel.apply[F](_, dispatcher, channelExecutionContext)) + client <- apply[F](config, buildChannel, publishChannel, dispatcher, channelExecutionContext) + } yield client + + private def defaultChannelExecutionContext[F[_]](implicit F: Sync[F]) = + Resource.make(F.delay(Executors.newCachedThreadPool()))(e => F.delay(e.shutdown())).map(ExecutionContext.fromExecutor) + + def apply[F[_]]( + config: AmqpClientConfig, + buildChannel: () => Resource[F, Channel[F]], + publishChannel: Resource[F, Channel[F]], + dispatcher: Dispatcher[F], + channelExecutionContext: ExecutionContext + )(implicit F: Async[F], t: Temporal[F]): Resource[F, AmqpClient[F]] = + publishChannel.flatMap { channel => + val make = + for { + connectionManager <- AmqpClientConnectionManager(config, channel, dispatcher, channelExecutionContext) + } yield mkClient(buildChannel, connectionManager, channelExecutionContext) + Resource.make(make.evalOn(channelExecutionContext))(_ => F.unit) + } + + private def mkClient[F[_]]( + buildChannel: () => Resource[F, Channel[F]], + connectionManager: AmqpClientConnectionManager[F], + executionContext: ExecutionContext + )(implicit F: Async[F], t: Temporal[F]): AmqpClient[F] = + new AmqpClient[F] { + private def repeatUntil[A](eval: F[A])(pred: A => Boolean)(sleep: FiniteDuration): F[Unit] = + for { + result <- eval + ended <- F.pure(pred(result)) + _ <- if (ended) F.unit else t.sleep(sleep) *> repeatUntil(eval)(pred)(sleep) + } yield () + + override def publisher(mandatory: Boolean = false): F[Publisher[F, PublishCommand]] = F.pure( + cmd => connectionManager.publish(cmd, mandatory).evalOn(executionContext) + ) + + override def registerConsumer(queueName: QueueName, + handler: Handler[F, Delivery], + exceptionalAction: ConsumeAction, + prefetchCount: Int, + shutdownTimeout: FiniteDuration, + shutdownRetry: FiniteDuration): Resource[F, Unit] = + for { + channel <- buildChannel() + handling <- Resource.make(Ref.of[F, Set[UUID]](Set.empty))(set => + repeatUntil(F.blocking(logger.debug("Verifying running handlers.")) *> set.get)(_.isEmpty)(shutdownRetry).timeout(shutdownTimeout)) + newHandler = (delivery: Delivery) => + for { + id <- F.delay(UUID.randomUUID()) + _ <- handling.update(set => set + id) + resultAttempt <- handler(delivery).attempt + _ <- handling.update(set => set - id) + result <- F.fromEither(resultAttempt) + } yield result + result <- Resource.eval(connectionManager.registerConsumer(channel, queueName, newHandler, exceptionalAction, prefetchCount)) + } yield result + + override def declare(declarations: Declaration*): F[Unit] = connectionManager.declare(declarations) + override def declare(declarations: Iterable[Declaration]): F[Unit] = connectionManager.declare(declarations) + + override def isConnectionOpen: F[Boolean] = connectionManager.publishChannel.isConnectionOpen + } +} diff --git a/core/src/main/scala/com/itv/bucky/MessagePropertiesConverters.scala b/backendJavaAmqp/src/main/scala/com/itv/bucky/backend/javaamqp/MessagePropertiesConverters.scala similarity index 96% rename from core/src/main/scala/com/itv/bucky/MessagePropertiesConverters.scala rename to backendJavaAmqp/src/main/scala/com/itv/bucky/backend/javaamqp/MessagePropertiesConverters.scala index 4ed25060..a23a6ea2 100644 --- a/core/src/main/scala/com/itv/bucky/MessagePropertiesConverters.scala +++ b/backendJavaAmqp/src/main/scala/com/itv/bucky/backend/javaamqp/MessagePropertiesConverters.scala @@ -1,7 +1,8 @@ -package com.itv.bucky +package com.itv.bucky.backend.javaamqp import com.itv.bucky.consume.DeliveryMode import com.itv.bucky.publish.{ContentEncoding, ContentType, MessageProperties} +import com.itv.bucky.{Envelope, ExchangeName, RoutingKey} import com.rabbitmq.client.AMQP.BasicProperties import com.rabbitmq.client.{Envelope => RabbitMQEnvelope} diff --git a/core/src/main/scala/com/itv/bucky/consume/Consumer.scala b/backendJavaAmqp/src/main/scala/com/itv/bucky/backend/javaamqp/consume/Consumer.scala similarity index 67% rename from core/src/main/scala/com/itv/bucky/consume/Consumer.scala rename to backendJavaAmqp/src/main/scala/com/itv/bucky/backend/javaamqp/consume/Consumer.scala index 37d02447..b9881c39 100644 --- a/core/src/main/scala/com/itv/bucky/consume/Consumer.scala +++ b/backendJavaAmqp/src/main/scala/com/itv/bucky/backend/javaamqp/consume/Consumer.scala @@ -1,6 +1,8 @@ -package com.itv.bucky.consume +package com.itv.bucky.backend.javaamqp.consume -import com.itv.bucky.{MessagePropertiesConverters, Payload} +import com.itv.bucky.Payload +import com.itv.bucky.backend.javaamqp.MessagePropertiesConverters +import com.itv.bucky.consume.{ConsumerTag, Delivery} import com.rabbitmq.client.AMQP.BasicProperties import com.rabbitmq.client.Envelope import com.typesafe.scalalogging.StrictLogging diff --git a/core/src/main/scala/com/itv/bucky/publish/PendingConfirmListener.scala b/backendJavaAmqp/src/main/scala/com/itv/bucky/backend/javaamqp/publish/PendingConfirmListener.scala similarity index 96% rename from core/src/main/scala/com/itv/bucky/publish/PendingConfirmListener.scala rename to backendJavaAmqp/src/main/scala/com/itv/bucky/backend/javaamqp/publish/PendingConfirmListener.scala index 8891cbe7..46410b6d 100644 --- a/core/src/main/scala/com/itv/bucky/publish/PendingConfirmListener.scala +++ b/backendJavaAmqp/src/main/scala/com/itv/bucky/backend/javaamqp/publish/PendingConfirmListener.scala @@ -1,7 +1,7 @@ -package com.itv.bucky.publish +package com.itv.bucky.backend.javaamqp.publish import cats.effect.std.Dispatcher -import cats.effect.{Deferred, Ref, _} +import cats.effect._ import cats.implicits._ import com.rabbitmq.client.{AMQP, ConfirmListener, ReturnListener} import com.typesafe.scalalogging.StrictLogging diff --git a/core/src/test/scala/com/itv/bucky/MessagePropertiesConvertersTest.scala b/backendJavaAmqp/src/test/scala/com/itv/bucky/backend/javaamqp/MessagePropertiesConvertersTest.scala similarity index 91% rename from core/src/test/scala/com/itv/bucky/MessagePropertiesConvertersTest.scala rename to backendJavaAmqp/src/test/scala/com/itv/bucky/backend/javaamqp/MessagePropertiesConvertersTest.scala index ecf509f7..3fbe2472 100644 --- a/core/src/test/scala/com/itv/bucky/MessagePropertiesConvertersTest.scala +++ b/backendJavaAmqp/src/test/scala/com/itv/bucky/backend/javaamqp/MessagePropertiesConvertersTest.scala @@ -1,15 +1,12 @@ -package com.itv.bucky - -import java.util.Date +package com.itv.bucky.backend.javaamqp import com.itv.bucky.consume.DeliveryMode import com.itv.bucky.publish.{ContentEncoding, ContentType, MessageProperties} import com.rabbitmq.client.AMQP.BasicProperties - import org.scalatest.funsuite.AnyFunSuite import org.scalatest.matchers.should.Matchers._ - +import java.util.Date import scala.jdk.CollectionConverters._ class MessagePropertiesConvertersTest extends AnyFunSuite { @@ -17,9 +14,9 @@ class MessagePropertiesConvertersTest extends AnyFunSuite { test("should be able to convert minimal basic properties to message properties") { val basicProperties: BasicProperties = new BasicProperties() - val messageProperties: publish.MessageProperties = MessagePropertiesConverters.apply(basicProperties) + val messageProperties: MessageProperties = MessagePropertiesConverters.apply(basicProperties) - messageProperties shouldBe publish.MessageProperties( + messageProperties shouldBe MessageProperties( contentType = None, contentEncoding = None, headers = Map.empty, diff --git a/build.sbt b/build.sbt index ebc70497..c2b157a7 100644 --- a/build.sbt +++ b/build.sbt @@ -50,7 +50,7 @@ lazy val kernelSettings = Seq( crossScalaVersions := Seq(scala212, scala213), scalaVersion := scala213, organization := "com.itv", - scalacOptions ++= Seq("-feature", "-deprecation", "-Xfatal-warnings", "-language:higherKinds"), + scalacOptions ++= Seq("-feature", "-deprecation", "-language:higherKinds"), publishTo := { val nexus = "https://oss.sonatype.org/" if (isSnapshot.value) @@ -141,9 +141,45 @@ lazy val core = project "org.scalatest" %% "scalatest" % scalaTestVersion % "test", "org.typelevel" %% "cats-effect-testing-scalatest" % catsEffectScalaTestVersion % "test", "org.typelevel" %% "cats-effect" % catsEffectVersion, + "ch.qos.logback" % "logback-classic" % logbackVersion % "test", + "org.scala-lang.modules" %% "scala-collection-compat" % "2.5.0" + ) + ) + +lazy val backendJavaAmqp = project + .settings(name := "com.itv") + .settings(moduleName := "bucky-backend-java-amqp") + .settings(kernelSettings: _*) + .dependsOn(core) + .aggregate(core) + .settings( + libraryDependencies ++= Seq( "com.rabbitmq" % "amqp-client" % amqpClientVersion, + "com.typesafe.scala-logging" %% "scala-logging" % scalaLoggingVersion, + "org.scalatest" %% "scalatest" % scalaTestVersion % "test", + "org.typelevel" %% "cats-effect-testing-scalatest" % catsEffectScalaTestVersion % "test", + "org.typelevel" %% "cats-effect" % catsEffectVersion, + "ch.qos.logback" % "logback-classic" % logbackVersion % "test", + "org.scala-lang.modules" %% "scala-collection-compat" % "2.5.0" + ) + ) + +lazy val backendFs2Rabbit = project + .settings(name := "com.itv") + .settings(moduleName := "bucky-backend-fs2-rabbit") + .settings(kernelSettings) + .dependsOn(core) + .aggregate(core) + .settings( + libraryDependencies ++= Seq( + "com.typesafe.scala-logging" %% "scala-logging" % scalaLoggingVersion, + "org.scalatest" %% "scalatest" % scalaTestVersion % "test", + "org.typelevel" %% "cats-effect-testing-scalatest" % catsEffectScalaTestVersion % "test", + "org.typelevel" %% "cats-effect" % catsEffectVersion, + "dev.profunktor" %% "fs2-rabbit" % "5.0.0", + "dev.profunktor" %% "fs2-rabbit-circe" % "5.0.0", "ch.qos.logback" % "logback-classic" % logbackVersion % "test", - "org.scala-lang.modules" %% "scala-collection-compat" % "2.11.0" + "org.scala-lang.modules" %% "scala-collection-compat" % "2.5.0" ) ) @@ -151,7 +187,7 @@ lazy val test = project .settings(name := "com.itv") .settings(moduleName := "bucky-test") .settings(kernelSettings) - .dependsOn(core) + .dependsOn(core, backendJavaAmqp) .aggregate(core) .settings( libraryDependencies ++= Seq( @@ -168,7 +204,7 @@ lazy val it = project .settings(name := "com.itv") .settings(moduleName := "bucky-it") .settings(kernelSettings) - .dependsOn(core, test) + .dependsOn(core, test, backendFs2Rabbit) .aggregate(core) .settings( libraryDependencies ++= Seq( @@ -194,7 +230,10 @@ lazy val example = project "com.typesafe.scala-logging" %% "scala-logging" % scalaLoggingVersion, "org.scalatest" %% "scalatest" % scalaTestVersion, "org.typelevel" %% "cats-effect-testing-scalatest" % catsEffectScalaTestVersion % "test", - "com.typesafe" % "config" % typeSafeVersion + "com.typesafe" % "config" % typeSafeVersion, + "ch.qos.logback" % "logback-classic" % logbackVersion, + "dev.profunktor" %% "fs2-rabbit" % "5.0.0", + "dev.profunktor" %% "fs2-rabbit-circe" % "5.0.0" ) ) @@ -246,5 +285,5 @@ lazy val xml = project ) lazy val root = (project in file(".")) - .aggregate(xml, circe, argonaut, example, test, core) + .aggregate(xml, circe, argonaut, example, test, backendJavaAmqp, backendFs2Rabbit, core) .settings(publishArtifact := false) diff --git a/core/src/main/scala/com/itv/bucky/AmqpClient.scala b/core/src/main/scala/com/itv/bucky/AmqpClient.scala index b534c273..c73e91c3 100644 --- a/core/src/main/scala/com/itv/bucky/AmqpClient.scala +++ b/core/src/main/scala/com/itv/bucky/AmqpClient.scala @@ -1,24 +1,17 @@ package com.itv.bucky import cats.effect._ -import cats.effect.implicits._ -import cats.effect.std.Dispatcher -import cats.implicits._ import com.itv.bucky.consume.{ConsumeAction, DeadLetter, Delivery} import com.itv.bucky.decl._ import com.itv.bucky.publish.PublishCommand -import com.rabbitmq.client.{ConnectionFactory, ShutdownListener, ShutdownSignalException, Channel => RabbitChannel, Connection => RabbitConnection} -import com.typesafe.scalalogging.StrictLogging -import java.util.concurrent.{AbstractExecutorService, Executors, TimeUnit} -import java.util.{Collections, UUID} import scala.concurrent.duration.{FiniteDuration, _} import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService} trait AmqpClient[F[_]] { def declare(declarations: Declaration*): F[Unit] def declare(declarations: Iterable[Declaration]): F[Unit] - def publisher(): Publisher[F, PublishCommand] + def publisher(mandatory: Boolean = false): F[Publisher[F, PublishCommand]] def registerConsumer(queueName: QueueName, handler: Handler[F, Delivery], exceptionalAction: ConsumeAction = DeadLetter, @@ -27,159 +20,3 @@ trait AmqpClient[F[_]] { shutdownRetry: FiniteDuration = 500.millis): Resource[F, Unit] def isConnectionOpen: F[Boolean] } - -object AmqpClient extends StrictLogging { - - private def createChannel[F[_]](connection: RabbitConnection, executionContext: ExecutionContext)(implicit - F: Async[F] - ): Resource[F, RabbitChannel] = { - val make = - F.blocking { - logger.info(s"Starting Channel") - val channel = connection.createChannel() - channel.addShutdownListener(new ShutdownListener { - override def shutdownCompleted(cause: ShutdownSignalException): Unit = - if (cause.isInitiatedByApplication) - logger.info(s"Channel shut down due to explicit application action: ${cause.getMessage}") - else - logger.error(s"Channel shut down by broker or because of detectable non-deliberate application failure", cause) - }) - channel - } - .attempt - .flatTap { - case Right(_) => - F.delay(logger.info(s"Channel has been started successfully!")) - case Left(exception) => - F.delay(logger.error(s"Failure when starting Channel because ${exception.getMessage}", exception)) - } - .rethrow - - Resource.make(make.evalOn(executionContext))(channel => F.blocking(channel.close())) - } - - private def createConnection[F[_]](config: AmqpClientConfig, executionContext: ExecutionContext)(implicit - F: Async[F] - ): Resource[F, RabbitConnection] = { - val make = - F.blocking { - logger.info(s"Starting AmqpClient") - val connectionFactory = new ConnectionFactory() - connectionFactory.setHost(config.host) - connectionFactory.setPort(config.port) - connectionFactory.setUsername(config.username) - connectionFactory.setPassword(config.password) - connectionFactory.setAutomaticRecoveryEnabled(config.networkRecoveryInterval.isDefined) - connectionFactory.setSharedExecutor(executionContext match { - case null => throw null - case eces: ExecutionContextExecutorService => eces - case other => - new AbstractExecutorService with ExecutionContextExecutorService { - override def prepare(): ExecutionContext = other - override def isShutdown = false - override def isTerminated = false - override def shutdown() = () - override def shutdownNow() = Collections.emptyList[Runnable] - override def execute(runnable: Runnable): Unit = other execute runnable - override def reportFailure(t: Throwable): Unit = other reportFailure t - override def awaitTermination(length: Long, unit: TimeUnit): Boolean = false - } - }) - config.networkRecoveryInterval.map(_.toMillis.toInt).foreach(connectionFactory.setNetworkRecoveryInterval) - config.virtualHost.foreach(connectionFactory.setVirtualHost) - connectionFactory.newConnection() - } - .attempt - .flatTap { - case Right(_) => - logger.info(s"AmqpClient has been started successfully!").pure[F] - case Left(exception) => - logger.error(s"Failure when starting AmqpClient because ${exception.getMessage}", exception).pure[F] - } - .rethrow - - Resource.make(make.evalOn(executionContext))(connection => F.blocking(connection.close())) - } - - def apply[F[_]](config: AmqpClientConfig)(implicit - F: Async[F], - t: Temporal[F], - connectionExecutionContext: ExecutionContext - ): Resource[F, AmqpClient[F]] = - for { - channelEc <- defaultChannelExecutionContext - client <- apply[F](config, connectionExecutionContext, channelEc) - } yield client - - def apply[F[_]](config: AmqpClientConfig, connectionExecutionContext: ExecutionContext, channelExecutionContext: ExecutionContext)(implicit - F: Async[F], - t: Temporal[F] - ): Resource[F, AmqpClient[F]] = - for { - dispatcher <- Dispatcher.parallel[F] - connection <- createConnection(config, connectionExecutionContext) - publishChannel = createChannel(connection, channelExecutionContext).map(Channel.apply[F](_, dispatcher, channelExecutionContext)) - buildChannel = () => createChannel(connection, channelExecutionContext).map(Channel.apply[F](_, dispatcher, channelExecutionContext)) - client <- apply[F](config, buildChannel, publishChannel, dispatcher, channelExecutionContext) - } yield client - - private def defaultChannelExecutionContext[F[_]](implicit F: Sync[F]) = - Resource.make(F.delay(Executors.newCachedThreadPool()))(e => F.delay(e.shutdown())).map(ExecutionContext.fromExecutor) - - def apply[F[_]]( - config: AmqpClientConfig, - buildChannel: () => Resource[F, Channel[F]], - publishChannel: Resource[F, Channel[F]], - dispatcher: Dispatcher[F], - channelExecutionContext: ExecutionContext - )(implicit F: Async[F], t: Temporal[F]): Resource[F, AmqpClient[F]] = - publishChannel.flatMap { channel => - val make = - for { - connectionManager <- AmqpClientConnectionManager(config, channel, dispatcher, channelExecutionContext) - } yield mkClient(buildChannel, connectionManager, channelExecutionContext) - Resource.make(make.evalOn(channelExecutionContext))(_ => F.unit) - } - - private def mkClient[F[_]]( - buildChannel: () => Resource[F, Channel[F]], - connectionManager: AmqpClientConnectionManager[F], - executionContext: ExecutionContext - )(implicit F: Async[F], t: Temporal[F]): AmqpClient[F] = - new AmqpClient[F] { - private def repeatUntil[A](eval: F[A])(pred: A => Boolean)(sleep: FiniteDuration): F[Unit] = - for { - result <- eval - ended <- F.pure(pred(result)) - _ <- if (ended) F.unit else t.sleep(sleep) *> repeatUntil(eval)(pred)(sleep) - } yield () - - override def publisher(): Publisher[F, PublishCommand] = cmd => connectionManager.publish(cmd).evalOn(executionContext) - - override def registerConsumer(queueName: QueueName, - handler: Handler[F, Delivery], - exceptionalAction: ConsumeAction, - prefetchCount: Int, - shutdownTimeout: FiniteDuration, - shutdownRetry: FiniteDuration): Resource[F, Unit] = - for { - channel <- buildChannel() - handling <- Resource.make(Ref.of[F, Set[UUID]](Set.empty))(set => - repeatUntil(F.blocking(logger.debug("Verifying running handlers.")) *> set.get)(_.isEmpty)(shutdownRetry).timeout(shutdownTimeout)) - newHandler = (delivery: Delivery) => - for { - id <- F.delay(UUID.randomUUID()) - _ <- handling.update(set => set + id) - resultAttempt <- handler(delivery).attempt - _ <- handling.update(set => set - id) - result <- F.fromEither(resultAttempt) - } yield result - result <- Resource.eval(connectionManager.registerConsumer(channel, queueName, newHandler, exceptionalAction, prefetchCount)) - } yield result - - override def declare(declarations: Declaration*): F[Unit] = connectionManager.declare(declarations) - override def declare(declarations: Iterable[Declaration]): F[Unit] = connectionManager.declare(declarations) - - override def isConnectionOpen: F[Boolean] = connectionManager.publishChannel.isConnectionOpen - } -} diff --git a/core/src/main/scala/com/itv/bucky/AmqpClientConfig.scala b/core/src/main/scala/com/itv/bucky/AmqpClientConfig.scala index 3625bb1e..62f461b3 100644 --- a/core/src/main/scala/com/itv/bucky/AmqpClientConfig.scala +++ b/core/src/main/scala/com/itv/bucky/AmqpClientConfig.scala @@ -2,18 +2,24 @@ package com.itv.bucky import scala.concurrent.duration._ -/** - * AmqpClient configuration. +/** AmqpClient configuration. */ -case class AmqpClientConfig(host: String, - port: Int, - username: String, - password: String, - networkRecoveryInterval: Option[FiniteDuration] = Some(3.seconds), - networkRecoveryIntervalOnStart: Option[NetworkRecoveryOnStart] = Some(NetworkRecoveryOnStart()), - publishingTimeout: FiniteDuration = 15.seconds, - virtualHost: Option[String] = None) +case class AmqpClientConfig( + host: String, + port: Int, + username: String, + password: String, + networkRecoveryInterval: Option[FiniteDuration] = Some(3.seconds), + networkRecoveryIntervalOnStart: Option[NetworkRecoveryOnStart] = Some(NetworkRecoveryOnStart()), + publishingTimeout: FiniteDuration = 15.seconds, + virtualHost: Option[String] = None, + connectionTimeout: FiniteDuration = 10.seconds, + ssl: Boolean = false, + requeueOnNack: Boolean = false, + requeueOnReject: Boolean = true, + internalQueueSize: Option[Int] = None +) case class NetworkRecoveryOnStart(interval: FiniteDuration = 3.seconds, max: FiniteDuration = 3.seconds) { - val numberOfRetries = max.toMillis / interval.toMillis + val numberOfRetries: Long = max.toMillis / interval.toMillis } diff --git a/core/src/main/scala/com/itv/bucky/Envelope.scala b/core/src/main/scala/com/itv/bucky/Envelope.scala index 49c203b3..af8e738b 100644 --- a/core/src/main/scala/com/itv/bucky/Envelope.scala +++ b/core/src/main/scala/com/itv/bucky/Envelope.scala @@ -1,8 +1,3 @@ package com.itv.bucky -import com.rabbitmq.client.{Envelope => RabbitEnvelope} case class Envelope(deliveryTag: Long, redeliver: Boolean, exchangeName: ExchangeName, routingKey: RoutingKey) -object Envelope { - def fromEnvelope(envelope: RabbitEnvelope) = - Envelope(envelope.getDeliveryTag, envelope.isRedeliver, ExchangeName(envelope.getExchange), RoutingKey(envelope.getRoutingKey)) -} diff --git a/core/src/main/scala/com/itv/bucky/LoggingAmqpClient.scala b/core/src/main/scala/com/itv/bucky/LoggingAmqpClient.scala index a96cba7c..7b6d5c0a 100644 --- a/core/src/main/scala/com/itv/bucky/LoggingAmqpClient.scala +++ b/core/src/main/scala/com/itv/bucky/LoggingAmqpClient.scala @@ -16,26 +16,32 @@ object LoggingAmqpClient extends StrictLogging { private[bucky] def logSuccessfullPublishMessage[F[_]](charset: Charset, cmd: PublishCommand)(implicit F: Sync[F]): F[Unit] = F.delay( - logger.info("Successfully published message with rk:'{}', exchange:{} and message:'{}'", - cmd.routingKey.value, - cmd.exchange.value, - new String(cmd.body.value, charset)) + logger.info( + "Successfully published message with rk:'{}', exchange:{} and message:'{}'", + cmd.routingKey.value, + cmd.exchange.value, + new String(cmd.body.value, charset) + ) ) private[bucky] def logFailedPublishMessage[F[_]](t: Throwable, charset: Charset, cmd: PublishCommand)(implicit F: Sync[F]): F[Unit] = F.delay( - logger.error("Failed to publish message with rk:'{}', exchange:'{}' and message:'{}'", - cmd.routingKey.value, - cmd.exchange.value, - new String(cmd.body.value, charset), - t) + logger.error( + "Failed to publish message with rk:'{}', exchange:'{}' and message:'{}'", + cmd.routingKey.value, + cmd.exchange.value, + new String(cmd.body.value, charset), + t + ) ) - private[bucky] def logFailedHandler[F[_]](charset: Charset, - queueName: QueueName, - exceptionalAction: ConsumeAction, - delivery: Delivery, - t: Throwable)(implicit F: Sync[F]): F[Unit] = F.delay { + private[bucky] def logFailedHandler[F[_]]( + charset: Charset, + queueName: QueueName, + exceptionalAction: ConsumeAction, + delivery: Delivery, + t: Throwable + )(implicit F: Sync[F]): F[Unit] = F.delay { logger.error( s"Failed to execute handler for message with rk '{}' on queue '{}' and exchange '{}'. Will return '{}'. message: '{}', headers:'{}'", delivery.envelope.routingKey.value, @@ -48,8 +54,9 @@ object LoggingAmqpClient extends StrictLogging { ) } - private[bucky] def logSuccessfulHandler[F[_]](charset: Charset, queueName: QueueName, delivery: Delivery, ca: ConsumeAction)( - implicit F: Sync[F]): F[Unit] = F.delay { + private[bucky] def logSuccessfulHandler[F[_]](charset: Charset, queueName: QueueName, delivery: Delivery, ca: ConsumeAction)(implicit + F: Sync[F] + ): F[Unit] = F.delay { logger.info( "Executed handler for message with rk:'{}' on queue:'{}' and exchange '{}'. Will return '{}'. message: '{}'", delivery.envelope.routingKey.value, @@ -65,30 +72,31 @@ object LoggingAmqpClient extends StrictLogging { override def declare(declarations: decl.Declaration*): F[Unit] = amqpClient.declare(declarations) override def declare(declarations: Iterable[decl.Declaration]): F[Unit] = amqpClient.declare(declarations) - override def publisher(): Publisher[F, PublishCommand] = { - val originalPublisher = amqpClient.publisher() - cmd: PublishCommand => - { - (for { - result <- originalPublisher(cmd).attempt - _ <- result.fold[F[Unit]](logFailedPublishMessage(_, charset, cmd), _ => logSuccessfullPublishMessage(charset, cmd)) - } yield result).rethrow - } - } + override def publisher(mandatory: Boolean): F[Publisher[F, PublishCommand]] = + amqpClient.publisher().map { originalPublisher => + cmd: PublishCommand => + (for { + result <- originalPublisher(cmd).attempt + _ <- result.fold[F[Unit]](logFailedPublishMessage(_, charset, cmd), _ => logSuccessfullPublishMessage(charset, cmd)) + } yield result).rethrow + } - override def registerConsumer(queueName: QueueName, - handler: Handler[F, Delivery], - exceptionalAction: ConsumeAction, - prefetchCount: Int, - shutdownTimeout: FiniteDuration = 1.minutes, - shutdownRetry: FiniteDuration = 500.millis): Resource[F, Unit] = { - val newHandler = (delivery: Delivery) => { + override def registerConsumer( + queueName: QueueName, + handler: Handler[F, Delivery], + exceptionalAction: ConsumeAction, + prefetchCount: Int, + shutdownTimeout: FiniteDuration = 1.minutes, + shutdownRetry: FiniteDuration = 500.millis + ): Resource[F, Unit] = { + val newHandler = (delivery: Delivery) => (for { result <- handler(delivery).attempt - _ <- result.fold(logFailedHandler(charset, queueName, exceptionalAction, delivery, _), - logSuccessfulHandler(charset, queueName, delivery, _)) + _ <- result.fold( + logFailedHandler(charset, queueName, exceptionalAction, delivery, _), + logSuccessfulHandler(charset, queueName, delivery, _) + ) } yield result).rethrow - } amqpClient.registerConsumer(queueName, newHandler, exceptionalAction, prefetchCount) } diff --git a/core/src/main/scala/com/itv/bucky/package.scala b/core/src/main/scala/com/itv/bucky/package.scala index 84944ed5..86081108 100644 --- a/core/src/main/scala/com/itv/bucky/package.scala +++ b/core/src/main/scala/com/itv/bucky/package.scala @@ -1,8 +1,8 @@ package com.itv import java.nio.charset.{Charset, StandardCharsets} - import cats.effect.{Resource, Sync} +import cats.implicits.toFunctorOps import cats.{Applicative, ApplicativeError} import com.itv.bucky.Unmarshaller.toDeliveryUnmarshaller import com.itv.bucky.consume._ @@ -32,21 +32,21 @@ package object bucky { implicit class ConsumerSugar[F[_]](amqpClient: AmqpClient[F])(implicit val F: Sync[F]) { - def registerConsumerOf[T](queueName: QueueName, - handler: Handler[F, T], - exceptionalAction: ConsumeAction = DeadLetter, - prefetchCount: Int = defaultPreFetchCount)(implicit payloadUnmarshaller: PayloadUnmarshaller[T], - ae: ApplicativeError[F, Throwable]): Resource[F, Unit] = + def registerConsumerOf[T]( + queueName: QueueName, + handler: Handler[F, T], + exceptionalAction: ConsumeAction = DeadLetter, + prefetchCount: Int = defaultPreFetchCount + )(implicit payloadUnmarshaller: PayloadUnmarshaller[T], ae: ApplicativeError[F, Throwable]): Resource[F, Unit] = amqpClient.registerConsumer( queueName, - (delivery: Delivery) => { + (delivery: Delivery) => payloadUnmarshaller.unmarshal(delivery.body) match { case Right(value) => handler.apply(value) case Left(e) => ae.raiseError(e) - } - }, + }, exceptionalAction, prefetchCount ) @@ -58,7 +58,8 @@ package object bucky { onHandlerException: RequeueConsumeAction = Requeue, unmarshalFailureAction: RequeueConsumeAction = DeadLetter, onRequeueExpiryAction: T => F[ConsumeAction] = (_: T) => F.point[ConsumeAction](DeadLetter), - prefetchCount: Int = defaultPreFetchCount)(implicit unmarshaller: PayloadUnmarshaller[T], F: Sync[F]): Resource[F, Unit] = + prefetchCount: Int = defaultPreFetchCount + )(implicit unmarshaller: PayloadUnmarshaller[T], F: Sync[F]): Resource[F, Unit] = new RequeueOps(amqpClient).requeueDeliveryHandlerOf[T]( queueName = queueName, handler = handler, @@ -77,7 +78,8 @@ package object bucky { onHandlerException: RequeueConsumeAction = Requeue, unmarshalFailureAction: RequeueConsumeAction = DeadLetter, onRequeueExpiryAction: T => F[ConsumeAction] = (_: T) => F.point[ConsumeAction](DeadLetter), - prefetchCount: Int = defaultPreFetchCount)(implicit unmarshaller: DeliveryUnmarshaller[T], F: Sync[F]): Resource[F, Unit] = + prefetchCount: Int = defaultPreFetchCount + )(implicit unmarshaller: DeliveryUnmarshaller[T], F: Sync[F]): Resource[F, Unit] = new RequeueOps(amqpClient).requeueDeliveryHandlerOf[T]( queueName = queueName, handler = handler, @@ -100,45 +102,66 @@ package object bucky { } - implicit class PublisherSugar[F[_]](amqpClient: AmqpClient[F]) { + implicit class PublisherSugar[F[_]: Applicative](amqpClient: AmqpClient[F]) { - def publisherOf[T](implicit publishCommandBuilder: PublishCommandBuilder[T]): Publisher[F, T] = { - val basePublisher = amqpClient.publisher() - value: T => + def publisherOf[T](mandatory: Boolean)(implicit publishCommandBuilder: PublishCommandBuilder[T]): F[Publisher[F, T]] = + amqpClient.publisher(mandatory).map { basePublisher => value: T => { val command = publishCommandBuilder.toPublishCommand(value) basePublisher.apply(command) } - } + } + + def publisherOf[T](implicit publishCommandBuilder: PublishCommandBuilder[T]): F[Publisher[F, T]] = + publisherOf(mandatory = false) - def publisherOf[T](exchangeName: ExchangeName, routingKey: RoutingKey)(implicit marshaller: PayloadMarshaller[T]): Publisher[F, T] = { + def publisherOf[T](exchangeName: ExchangeName, routingKey: RoutingKey, mandatory: Boolean)(implicit + marshaller: PayloadMarshaller[T] + ): F[Publisher[F, T]] = { val pcb = PublishCommandBuilder .publishCommandBuilder(marshaller) .using(exchangeName) .using(routingKey) - publisherOf[T](pcb) + publisherOf[T](mandatory)(pcb) } - def publisherWithHeadersOf[T](exchangeName: ExchangeName, - routingKey: RoutingKey)(implicit F: Sync[F], marshaller: PayloadMarshaller[T]): PublisherWithHeaders[F, T] = { + def publisherOf[T](exchangeName: ExchangeName, routingKey: RoutingKey)(implicit + marshaller: PayloadMarshaller[T] + ): F[Publisher[F, T]] = + publisherOf(exchangeName, routingKey, mandatory = false) + + def publisherWithHeadersOf[T](exchangeName: ExchangeName, routingKey: RoutingKey, mandatory: Boolean)(implicit + F: Sync[F], + marshaller: PayloadMarshaller[T] + ): F[PublisherWithHeaders[F, T]] = { val pcb = PublishCommandBuilder .publishCommandBuilder(marshaller) .using(exchangeName) .using(routingKey) - publisherWithHeadersOf[T](pcb) + publisherWithHeadersOf[T](pcb, mandatory) } - def publisherWithHeadersOf[T](commandBuilder: PublishCommandBuilder[T])(implicit F: Sync[F]): PublisherWithHeaders[F, T] = - (message: T, headers: Map[String, AnyRef]) => + def publisherWithHeadersOf[T](exchangeName: ExchangeName, routingKey: RoutingKey)(implicit + F: Sync[F], + marshaller: PayloadMarshaller[T] + ): F[PublisherWithHeaders[F, T]] = + publisherWithHeadersOf[T](exchangeName, routingKey, mandatory = false) + + def publisherWithHeadersOf[T](commandBuilder: PublishCommandBuilder[T], mandatory: Boolean)(implicit F: Sync[F]): F[PublisherWithHeaders[F, T]] = + amqpClient.publisher(mandatory).map { publisher => (message: T, headers: Map[String, AnyRef]) => F.flatMap(F.delay { val command = commandBuilder.toPublishCommand(message) - command.copy(basicProperties = headers.foldLeft(command.basicProperties) { - case (props, (headerName, headerValue)) => props.withHeader(headerName -> headerValue) + command.copy(basicProperties = headers.foldLeft(command.basicProperties) { case (props, (headerName, headerValue)) => + props.withHeader(headerName -> headerValue) }) - })(amqpClient.publisher()) + })(publisher) + } + + def publisherWithHeadersOf[T](commandBuilder: PublishCommandBuilder[T])(implicit F: Sync[F]): F[PublisherWithHeaders[F, T]] = + publisherWithHeadersOf(commandBuilder, mandatory = false) } diff --git a/core/src/main/scala/com/itv/bucky/pattern/requeue/package.scala b/core/src/main/scala/com/itv/bucky/pattern/requeue/package.scala index 40a73e1b..3d58f0d8 100644 --- a/core/src/main/scala/com/itv/bucky/pattern/requeue/package.scala +++ b/core/src/main/scala/com/itv/bucky/pattern/requeue/package.scala @@ -59,10 +59,11 @@ package object requeue { onRequeueExpiryAction: Delivery => F[ConsumeAction] = (_: Delivery) => F.point[ConsumeAction](DeadLetter), prefetchCount: Int = defaultPreFetchCount): Resource[F, Unit] = { val requeueExchange = ExchangeName(s"${queueName.value}.requeue") - val requeuePublish = amqpClient.publisher() - amqpClient.registerConsumer(queueName, - RequeueTransformer(requeuePublish, requeueExchange, requeuePolicy, onHandlerException, onRequeueExpiryAction)(handler), - prefetchCount = prefetchCount) + Resource.eval(amqpClient.publisher(mandatory = true)).flatMap { requeuePublish => + amqpClient.registerConsumer(queueName, + RequeueTransformer(requeuePublish, requeueExchange, requeuePolicy, onHandlerException, onRequeueExpiryAction)(handler), + prefetchCount = prefetchCount) + } } } diff --git a/core/src/main/scala/com/itv/bucky/publish/PublishCommandBuilder.scala b/core/src/main/scala/com/itv/bucky/publish/PublishCommandBuilder.scala index bf4d0287..6b48d3de 100644 --- a/core/src/main/scala/com/itv/bucky/publish/PublishCommandBuilder.scala +++ b/core/src/main/scala/com/itv/bucky/publish/PublishCommandBuilder.scala @@ -60,19 +60,14 @@ object PublishCommandBuilder { case class Builder[T](exchange: ExchangeName, routingKey: RoutingKey, properties: Option[MessageProperties], - marshaller: PayloadMarshaller[T], - mandatory: Boolean = false) + marshaller: PayloadMarshaller[T]) extends PublishCommandBuilder[T] { override def toPublishCommand(t: T): PublishCommand = - PublishCommand(exchange, routingKey, properties.fold(MessageProperties.persistentBasic)(identity), marshaller(t), mandatory) + PublishCommand(exchange, routingKey, properties.fold(MessageProperties.persistentBasic)(identity), marshaller(t)) def using(basicProperties: MessageProperties): Builder[T] = copy(properties = Some(basicProperties)) - - def usingMandatory(mandatory: Boolean): Builder[T] = - copy(mandatory = mandatory) - } } diff --git a/core/src/main/scala/com/itv/bucky/publish/package.scala b/core/src/main/scala/com/itv/bucky/publish/package.scala index d5c461bb..e8f085e8 100644 --- a/core/src/main/scala/com/itv/bucky/publish/package.scala +++ b/core/src/main/scala/com/itv/bucky/publish/package.scala @@ -61,7 +61,7 @@ package object publish { priority = Some(0) ) } - case class PublishCommand(exchange: ExchangeName, routingKey: RoutingKey, basicProperties: MessageProperties, body: Payload, mandatory: Boolean = false) { + case class PublishCommand(exchange: ExchangeName, routingKey: RoutingKey, basicProperties: MessageProperties, body: Payload) { def description = s"${exchange.value}:${routingKey.value} $body" } } diff --git a/core/src/main/scala/com/itv/bucky/wiring/Wiring.scala b/core/src/main/scala/com/itv/bucky/wiring/Wiring.scala index f6cb90c6..0813ce55 100644 --- a/core/src/main/scala/com/itv/bucky/wiring/Wiring.scala +++ b/core/src/main/scala/com/itv/bucky/wiring/Wiring.scala @@ -14,7 +14,6 @@ import cats.effect.implicits._ import com.itv.bucky.consume.{ConsumeAction, DeadLetter, RequeueConsumeAction} import com.itv.bucky.publish.PublishCommandBuilder - final case class WiringName(value: String) extends AnyVal class Wiring[T]( @@ -81,8 +80,9 @@ class Wiring[T]( s"requeuePolicy=$requeuePolicy" ) ) - _ <- client.declare(publisherDeclarations) - } yield client.publisherOf(publisherBuilder) + _ <- client.declare(publisherDeclarations) + publisher <- client.publisherOf(publisherBuilder) + } yield publisher def publisherWithHeaders[F[_]](client: AmqpClient[F])(implicit F: Sync[F]): F[PublisherWithHeaders[F, T]] = for { @@ -96,8 +96,9 @@ class Wiring[T]( s"requeuePolicy=$requeuePolicy" ) } - _ <- client.declare(publisherDeclarations) - } yield client.publisherWithHeadersOf(publisherBuilder) + _ <- client.declare(publisherDeclarations) + publisher <- client.publisherWithHeadersOf(publisherBuilder) + } yield publisher def registerConsumer[F[_]](client: AmqpClient[F])(handleMessage: T => F[ConsumeAction])(implicit F: Sync[F]): Resource[F, Unit] = { val runDeclarations = diff --git a/core/src/test/resources/logback.xml b/core/src/test/resources/logback.xml index 2c3d386a..8837a709 100644 --- a/core/src/test/resources/logback.xml +++ b/core/src/test/resources/logback.xml @@ -2,14 +2,14 @@ - %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - "%msg"%n + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - "%delivery"%n localhost LOCAL3 - content-delivery - work-order-to-workflow-translator - [%thread] %-5level %logger - "%msg"%n + content-delivery - work-order-to-workflow-translator - [%thread] %-5level %logger - "%delivery"%n diff --git a/example/src/main/resources/logback.xml b/example/src/main/resources/logback.xml index 29df8fb5..6a75a7bc 100644 --- a/example/src/main/resources/logback.xml +++ b/example/src/main/resources/logback.xml @@ -2,14 +2,14 @@ - %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - "%msg"%n + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - "%delivery"%n localhost LOCAL3 - content-delivery - work-order-to-workflow-translator - [%thread] %-5level %logger - "%msg"%n + content-delivery - work-order-to-workflow-translator - [%thread] %-5level %logger - "%delivery"%n diff --git a/example/src/main/scala/com/itv/bucky/example/circe/CirceMarshalledPublisher.scala b/example/src/main/scala/com/itv/bucky/example/circe/CirceMarshalledPublisher.scala index ad5c2563..ca5fe5a8 100644 --- a/example/src/main/scala/com/itv/bucky/example/circe/CirceMarshalledPublisher.scala +++ b/example/src/main/scala/com/itv/bucky/example/circe/CirceMarshalledPublisher.scala @@ -2,6 +2,7 @@ package com.itv.bucky.example.circe import cats.effect.{ExitCode, IO, IOApp} import com.itv.bucky._ +import com.itv.bucky.backend.javaamqp.JavaBackendAmqpClient import com.itv.bucky.circe.auto._ import com.itv.bucky.decl._ import com.itv.bucky.example.circe.Shared.Person @@ -28,11 +29,11 @@ object CirceMarshalledPublisher extends IOApp { val amqpClientConfig: AmqpClientConfig = AmqpClientConfig(config.getString("rmq.host"), 5672, "guest", "guest") override def run(args: List[String]): IO[ExitCode] = - AmqpClient[IO](amqpClientConfig).use { client => + JavaBackendAmqpClient[IO](amqpClientConfig).use { client => for { - _ <- client.declare(Declarations.all) - publisher = client.publisherOf[Person](Declarations.exchange.name, Declarations.routingKey) - _ <- publisher(Person("bob", 22)) + _ <- client.declare(Declarations.all) + publisher <- client.publisherOf[Person](Declarations.exchange.name, Declarations.routingKey) + _ <- publisher(Person("bob", 22)) } yield ExitCode.Success } diff --git a/example/src/main/scala/com/itv/bucky/example/circe/CirceUnmarshalledConsumer.scala b/example/src/main/scala/com/itv/bucky/example/circe/CirceUnmarshalledConsumer.scala index b4b77843..1fb55d6e 100644 --- a/example/src/main/scala/com/itv/bucky/example/circe/CirceUnmarshalledConsumer.scala +++ b/example/src/main/scala/com/itv/bucky/example/circe/CirceUnmarshalledConsumer.scala @@ -11,6 +11,8 @@ import com.itv.bucky.example.circe.Shared.Person import com.typesafe.config.{Config, ConfigFactory} import cats.effect._ import cats.implicits._ +import com.itv.bucky.backend.javaamqp.JavaBackendAmqpClient + import scala.concurrent.ExecutionContext.Implicits.global /* @@ -37,7 +39,7 @@ object CirceUnmarshalledConsumer extends IOApp with StrictLogging { override def run(args: List[String]): IO[ExitCode] = (for { - client <- AmqpClient[IO](amqpClientConfig) + client <- JavaBackendAmqpClient[IO](amqpClientConfig) _ <- Resource.eval(client.declare(Declarations.all)) _ <- client.registerConsumerOf(Declarations.queue.name, personHandler) } yield ()).use(_ => IO.never *> IO(ExitCode.Success)) diff --git a/example/src/main/scala/com/itv/bucky/example/marshalling/MarshallingPublisher.scala b/example/src/main/scala/com/itv/bucky/example/marshalling/MarshallingPublisher.scala index 3be5b58c..b1a9e516 100644 --- a/example/src/main/scala/com/itv/bucky/example/marshalling/MarshallingPublisher.scala +++ b/example/src/main/scala/com/itv/bucky/example/marshalling/MarshallingPublisher.scala @@ -2,6 +2,7 @@ package com.itv.bucky.example.marshalling import cats.effect.{ExitCode, IO, IOApp} import com.itv.bucky.PayloadMarshaller.StringPayloadMarshaller +import com.itv.bucky.backend.javaamqp.JavaBackendAmqpClient import com.itv.bucky.decl._ import com.itv.bucky.example.marshalling.Shared.Person import com.itv.bucky.publish._ @@ -9,7 +10,6 @@ import com.typesafe.config.ConfigFactory import scala.concurrent._ import scala.concurrent.duration._ - import scala.concurrent.ExecutionContext.Implicits.global object MarshallingPublisher extends IOApp { @@ -37,10 +37,10 @@ object MarshallingPublisher extends IOApp { //start snippet 3 override def run(args: List[String]): IO[ExitCode] = - AmqpClient[IO](amqpClientConfig).use { client => + JavaBackendAmqpClient[IO](amqpClientConfig).use { client => for { _ <- client.declare(Seq(Declarations.exchange)) - publisher = client.publisherOf[Person] + publisher <- client.publisherOf[Person] _ <- publisher(Person("Bob", 67)) } yield ExitCode.Success } diff --git a/example/src/main/scala/com/itv/bucky/example/marshalling/UnmarshallingConsumer.scala b/example/src/main/scala/com/itv/bucky/example/marshalling/UnmarshallingConsumer.scala index c196603b..69103848 100644 --- a/example/src/main/scala/com/itv/bucky/example/marshalling/UnmarshallingConsumer.scala +++ b/example/src/main/scala/com/itv/bucky/example/marshalling/UnmarshallingConsumer.scala @@ -9,6 +9,7 @@ import com.typesafe.config.ConfigFactory import com.typesafe.scalalogging.StrictLogging import cats.effect._ import cats.implicits._ +import com.itv.bucky.backend.javaamqp.JavaBackendAmqpClient import scala.concurrent.ExecutionContext.Implicits.global @@ -55,7 +56,7 @@ object UnmarshallingConsumer extends IOApp with StrictLogging { //start snippet 4 override def run(args: List[String]): IO[ExitCode] = (for { - amqpClient <- AmqpClient[IO](amqpClientConfig) + amqpClient <- JavaBackendAmqpClient[IO](amqpClientConfig) _ <- Resource.eval(amqpClient.declare(Declarations.all)) _ <- amqpClient.registerConsumerOf(Declarations.queue.name, personHandler) } yield ()).use(_ => IO.never *> IO.delay(ExitCode.Success)) diff --git a/example/src/main/scala/com/itv/bucky/example/requeue/RequeueConsumer.scala b/example/src/main/scala/com/itv/bucky/example/requeue/RequeueConsumer.scala index 22919fb7..192f5ad0 100644 --- a/example/src/main/scala/com/itv/bucky/example/requeue/RequeueConsumer.scala +++ b/example/src/main/scala/com/itv/bucky/example/requeue/RequeueConsumer.scala @@ -2,9 +2,10 @@ package com.itv.bucky.example.requeue import cats.effect.{ExitCode, IO, IOApp, Resource} import com.itv.bucky.Unmarshaller.StringPayloadUnmarshaller -import com.itv.bucky.decl._ import com.itv.bucky._ +import com.itv.bucky.backend.javaamqp.JavaBackendAmqpClient import com.itv.bucky.consume._ +import com.itv.bucky.decl._ import com.itv.bucky.pattern.requeue._ import com.typesafe.config.{Config, ConfigFactory} import com.typesafe.scalalogging.StrictLogging @@ -12,9 +13,6 @@ import com.typesafe.scalalogging.StrictLogging import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.duration._ -import cats.effect._ -import cats.implicits._ - object RequeueConsumer extends IOApp with StrictLogging { object Declarations { @@ -28,7 +26,7 @@ object RequeueConsumer extends IOApp with StrictLogging { val stringToLogRequeueHandler: RequeueHandler[IO, String] = RequeueHandler[IO, String] { message: String => IO.delay { - logger.info(message) + println(message) message match { case "requeue" => Requeue @@ -40,9 +38,9 @@ object RequeueConsumer extends IOApp with StrictLogging { override def run(args: List[String]): IO[ExitCode] = (for { - amqpClient <- AmqpClient[IO](amqpClientConfig) - _ <- Resource.eval(amqpClient.declare(Declarations.all)) - _ <- amqpClient.registerRequeueConsumerOf(Declarations.queue.name, - stringToLogRequeueHandler) - } yield ()).use(_ => IO.never *> IO(ExitCode.Success)) -} + amqpClient <- JavaBackendAmqpClient[IO](amqpClientConfig) + _ <- Resource.eval(amqpClient.declare(Declarations.all)) + _ <- amqpClient.registerRequeueConsumerOf(Declarations.queue.name, + stringToLogRequeueHandler, requeuePolicy = RequeuePolicy(10, 5.seconds)) + } yield ()).use(_ => IO.never *> IO(ExitCode.Success)) +} \ No newline at end of file diff --git a/it/src/test/resources/logback.xml b/it/src/test/resources/logback.xml index 06ca111a..11a983a8 100644 --- a/it/src/test/resources/logback.xml +++ b/it/src/test/resources/logback.xml @@ -1,22 +1,14 @@ - - %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - "%msg"%n + %d{HH:mm:ss.SSS} %thread %-5level %logger{36} - "%msg"%n - - localhost - LOCAL3 - content-delivery - work-order-to-workflow-translator - [%thread] %-5level %logger - "%msg"%n - - - diff --git a/it/src/test/scala/com/itv/bucky/integrationTest/HammerTest.scala b/it/src/test/scala/com/itv/bucky/HammerTest.scala similarity index 80% rename from it/src/test/scala/com/itv/bucky/integrationTest/HammerTest.scala rename to it/src/test/scala/com/itv/bucky/HammerTest.scala index a9720500..a517bfde 100644 --- a/it/src/test/scala/com/itv/bucky/integrationTest/HammerTest.scala +++ b/it/src/test/scala/com/itv/bucky/HammerTest.scala @@ -1,4 +1,4 @@ -package com.itv.bucky.integrationTest +package com.itv.bucky import cats.effect.testing.scalatest.EffectTestSupport import cats.effect.unsafe.IORuntime @@ -7,6 +7,8 @@ import cats.implicits._ import com.itv.bucky.PayloadMarshaller.StringPayloadMarshaller import com.itv.bucky.Unmarshaller.StringPayloadUnmarshaller import com.itv.bucky._ +import com.itv.bucky.backend.fs2rabbit.Fs2RabbitAmqpClient +import com.itv.bucky.backend.javaamqp.JavaBackendAmqpClient import com.itv.bucky.consume.Ack import com.itv.bucky.decl.{Exchange, Queue} import com.itv.bucky.test.StubHandlers @@ -21,7 +23,14 @@ import java.util.UUID import scala.collection.immutable.TreeSet import scala.concurrent.duration._ -class HammerTest extends AsyncFunSuite with EffectTestSupport with Eventually with IntegrationPatience with StrictLogging with Matchers { +class HammerTest + extends AsyncFunSuite + with IntegrationSpec + with EffectTestSupport + with Eventually + with IntegrationPatience + with StrictLogging + with Matchers { implicit override val ioRuntime: IORuntime = packageIORuntime case class TestFixture(stubHandler: RecordingHandler[IO, String], publisher: Publisher[IO, String], client: AmqpClient[IO]) @@ -47,15 +56,17 @@ class HammerTest extends AsyncFunSuite with EffectTestSupport with Eventually wi Exchange(exchangeName).binding(routingKey -> queueName).autoDelete.expires(20.minutes) ) - AmqpClient[IO](config).use { client => + Fs2RabbitAmqpClient[IO](config).use { client => val handler = StubHandlers.ackHandler[IO, String] val handlerResource = Resource.eval(client.declare(declarations)).flatMap(_ => client.registerConsumerOf(queueName, handler)) handlerResource.use { _ => - val pub = client.publisherOf[String](exchangeName, routingKey) - val fixture = TestFixture(handler, pub, client) - test(fixture) + client.publisherOf[String](exchangeName, routingKey).flatMap { pub => + val fixture = TestFixture(handler, pub, client) + + test(fixture) + } } } } @@ -105,9 +116,6 @@ class HammerTest extends AsyncFunSuite with EffectTestSupport with Eventually wi val order: Ref[IO, List[String]] = Ref.of[IO, List[String]](List.empty).unsafeRunSync() - val fastPublisher = testFixture.client.publisherOf[String](exchange, fastRk) - val slowPublisher = testFixture.client.publisherOf[String](exchange, slowRk) - val fastHandler = new RecordingHandler[IO, String]((v1: String) => for { _ <- order.update(_ :+ "fast") @@ -132,15 +140,19 @@ class HammerTest extends AsyncFunSuite with EffectTestSupport with Eventually wi val handlersResource = Resource.eval(testFixture.client.declare(declarations)).flatMap(_ => handlers) - handlersResource.use { _ => - for { - _ <- slowPublisher("slow one") - _ <- IO.sleep(1.second) - _ <- fastPublisher("fast one") - } yield eventually { - val messages = order.get.unsafeRunSync() - messages should have size 2 - messages shouldBe List("fast", "slow") + testFixture.client.publisherOf[String](exchange, fastRk).flatMap { fastPublisher => + testFixture.client.publisherOf[String](exchange, slowRk).flatMap { slowPublisher => + handlersResource.use { _ => + for { + _ <- slowPublisher("slow one") + _ <- IO.sleep(1.second) + _ <- fastPublisher("fast one") + } yield eventually { + val messages = order.get.unsafeRunSync() + messages should have size 2 + messages shouldBe List("fast", "slow") + } + } } } } diff --git a/it/src/test/scala/com/itv/bucky/integrationTest/package.scala b/it/src/test/scala/com/itv/bucky/IntegrationSpec.scala similarity index 96% rename from it/src/test/scala/com/itv/bucky/integrationTest/package.scala rename to it/src/test/scala/com/itv/bucky/IntegrationSpec.scala index 5287b2f9..8fdead32 100644 --- a/it/src/test/scala/com/itv/bucky/integrationTest/package.scala +++ b/it/src/test/scala/com/itv/bucky/IntegrationSpec.scala @@ -5,7 +5,7 @@ import cats.effect.unsafe.{IORuntime, IORuntimeConfig, Scheduler} import java.util.concurrent.{Executors, ScheduledThreadPoolExecutor} import scala.concurrent.ExecutionContext -package object integrationTest { +trait IntegrationSpec { val schedulerExecutor = new ScheduledThreadPoolExecutor( 1, diff --git a/it/src/test/scala/com/itv/bucky/integrationTest/PublishIntegrationTest.scala b/it/src/test/scala/com/itv/bucky/PublishIntegrationTest.scala similarity index 68% rename from it/src/test/scala/com/itv/bucky/integrationTest/PublishIntegrationTest.scala rename to it/src/test/scala/com/itv/bucky/PublishIntegrationTest.scala index 5d5eb9c8..4a287865 100644 --- a/it/src/test/scala/com/itv/bucky/integrationTest/PublishIntegrationTest.scala +++ b/it/src/test/scala/com/itv/bucky/PublishIntegrationTest.scala @@ -1,9 +1,11 @@ -package com.itv.bucky.integrationTest +package com.itv.bucky import cats.effect.unsafe.IORuntime import cats.effect.{IO, Resource} import com.itv.bucky.PayloadMarshaller.StringPayloadMarshaller import com.itv.bucky.Unmarshaller.StringPayloadUnmarshaller +import com.itv.bucky.backend.fs2rabbit.Fs2RabbitAmqpClient +import com.itv.bucky.backend.javaamqp.JavaBackendAmqpClient import com.itv.bucky.decl.Exchange import com.itv.bucky.pattern.requeue.RequeuePolicy import com.itv.bucky.publish.{PublishCommand, PublishCommandBuilder} @@ -18,27 +20,31 @@ import java.util.concurrent.Executors import scala.concurrent.ExecutionContext import scala.concurrent.duration.DurationInt -class PublishIntegrationTest extends AnyFunSuite with Eventually with IntegrationPatience { +class PublishIntegrationTest extends AnyFunSuite with IntegrationSpec with Eventually with IntegrationPatience { implicit val ioRuntime: IORuntime = packageIORuntime implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(300)) val requeuePolicy: RequeuePolicy = RequeuePolicy(maximumProcessAttempts = 5, requeueAfter = 2.seconds) - test("publisher should error if mandatory is true and there is no routing") { - withTestFixture{ + test("publisher should error if mandatory is and there is no routing") { + withTestFixture(mandatory = true){ case (builder, publisher) => - publisher(builder.usingMandatory(true).toPublishCommand("Where am I going?")).attempt.map(_.isLeft shouldBe true) + publisher(builder.toPublishCommand("Where am I going?")).attempt.map(res => { + println("result: " + res) + res.isLeft shouldBe true + } + ) } } test("publisher should publish if mandatory is false and there is no routing") { - withTestFixture { + withTestFixture(mandatory = false) { case (builder, publisher) => - publisher(builder.usingMandatory(false).toPublishCommand("But seriously though, where am I going?")).attempt.map(_.isRight shouldBe true) + publisher(builder.toPublishCommand("But seriously though, where am I going?")).attempt.map(_.isRight shouldBe true) } } - def withTestFixture(test: (PublishCommandBuilder.Builder[String], Publisher[IO, PublishCommand]) => IO[Unit]): Unit = { + def withTestFixture(mandatory: Boolean)(test: (PublishCommandBuilder.Builder[String], Publisher[IO, PublishCommand]) => IO[Unit]): Unit = { val rawConfig = ConfigFactory.load("bucky") val config = AmqpClientConfig(rawConfig.getString("rmq.host"), @@ -51,7 +57,7 @@ class PublishIntegrationTest extends AnyFunSuite with Eventually with Integratio val exchangeName = ExchangeName(UUID.randomUUID().toString) val routingKey = RoutingKey(UUID.randomUUID().toString) - AmqpClient[IO](config) + Fs2RabbitAmqpClient[IO](config) .use { client => Resource .eval( @@ -59,8 +65,9 @@ class PublishIntegrationTest extends AnyFunSuite with Eventually with Integratio ) .use { _ => val pcb = publishCommandBuilder[String](implicitly).using(exchangeName).using(routingKey) - val pub = client.publisher() - test(pcb, pub) + client.publisher(mandatory).flatMap { pub => + test(pcb, pub) + } } } .unsafeRunSync() diff --git a/it/src/test/scala/com/itv/bucky/integrationTest/RequeueIntegrationTest.scala b/it/src/test/scala/com/itv/bucky/RequeueIntegrationTest.scala similarity index 83% rename from it/src/test/scala/com/itv/bucky/integrationTest/RequeueIntegrationTest.scala rename to it/src/test/scala/com/itv/bucky/RequeueIntegrationTest.scala index be958f70..87edb49c 100644 --- a/it/src/test/scala/com/itv/bucky/integrationTest/RequeueIntegrationTest.scala +++ b/it/src/test/scala/com/itv/bucky/RequeueIntegrationTest.scala @@ -1,4 +1,4 @@ -package com.itv.bucky.integrationTest +package com.itv.bucky import cats.effect.testing.scalatest.EffectTestSupport import cats.effect.unsafe.IORuntime @@ -6,6 +6,8 @@ import cats.effect.{IO, Resource} import com.itv.bucky.PayloadMarshaller.StringPayloadMarshaller import com.itv.bucky.Unmarshaller.StringPayloadUnmarshaller import com.itv.bucky._ +import com.itv.bucky.backend.fs2rabbit.Fs2RabbitAmqpClient +import com.itv.bucky.backend.javaamqp.JavaBackendAmqpClient import com.itv.bucky.consume._ import com.itv.bucky.decl.Exchange import com.itv.bucky.pattern.requeue @@ -23,7 +25,7 @@ import java.util.concurrent.Executors import scala.concurrent.ExecutionContext import scala.concurrent.duration._ -class RequeueIntegrationTest extends AsyncFunSuite with EffectTestSupport with Eventually with IntegrationPatience { +class RequeueIntegrationTest extends AsyncFunSuite with IntegrationSpec with EffectTestSupport with Eventually with IntegrationPatience { case class TestFixture( stubHandler: RecordingRequeueHandler[IO, Delivery], @@ -33,8 +35,8 @@ class RequeueIntegrationTest extends AsyncFunSuite with EffectTestSupport with E ) implicit override val ioRuntime: IORuntime = packageIORuntime - implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(300)) - val requeuePolicy = RequeuePolicy(maximumProcessAttempts = 5, requeueAfter = 2.seconds) + implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(300)) + val requeuePolicy = RequeuePolicy(maximumProcessAttempts = 5, requeueAfter = 2.seconds) def withTestFixture(test: TestFixture => IO[Unit]): IO[Unit] = { val rawConfig = ConfigFactory.load("bucky") @@ -57,7 +59,7 @@ class RequeueIntegrationTest extends AsyncFunSuite with EffectTestSupport with E Exchange(exchangeName).binding(routingKey -> queueName) ) ++ requeue.requeueDeclarations(queueName, routingKey) - AmqpClient[IO](config).use { client => + Fs2RabbitAmqpClient[IO](config).use { client => val handler = StubHandlers.requeueRequeueHandler[IO, Delivery] val dlqHandler = StubHandlers.ackHandler[IO, Delivery] @@ -70,10 +72,11 @@ class RequeueIntegrationTest extends AsyncFunSuite with EffectTestSupport with E } yield () ) .use { _ => - val pub = client.publisher() - val pcb = publishCommandBuilder[String](implicitly).using(exchangeName).using(routingKey) - val fixture = TestFixture(handler, dlqHandler, pcb, pub) - test(fixture) + client.publisher().flatMap { pub => + val pcb = publishCommandBuilder[String](implicitly).using(exchangeName).using(routingKey) + val fixture = TestFixture(handler, dlqHandler, pcb, pub) + test(fixture) + } } } } diff --git a/it/src/test/scala/com/itv/bucky/integrationTest/RequeueWithExpiryActionIntegrationTest.scala b/it/src/test/scala/com/itv/bucky/RequeueWithExpiryActionIntegrationTest.scala similarity index 82% rename from it/src/test/scala/com/itv/bucky/integrationTest/RequeueWithExpiryActionIntegrationTest.scala rename to it/src/test/scala/com/itv/bucky/RequeueWithExpiryActionIntegrationTest.scala index 57d2ea43..80970442 100644 --- a/it/src/test/scala/com/itv/bucky/integrationTest/RequeueWithExpiryActionIntegrationTest.scala +++ b/it/src/test/scala/com/itv/bucky/RequeueWithExpiryActionIntegrationTest.scala @@ -1,4 +1,4 @@ -package com.itv.bucky.integrationTest +package com.itv.bucky import cats.data.Kleisli import cats.effect.testing.scalatest.EffectTestSupport @@ -7,6 +7,8 @@ import cats.effect.{IO, Resource} import com.itv.bucky.PayloadMarshaller.StringPayloadMarshaller import com.itv.bucky.Unmarshaller.StringPayloadUnmarshaller import com.itv.bucky._ +import com.itv.bucky.backend.fs2rabbit.Fs2RabbitAmqpClient +import com.itv.bucky.backend.javaamqp.JavaBackendAmqpClient import com.itv.bucky.consume._ import com.itv.bucky.decl.Exchange import com.itv.bucky.pattern.requeue @@ -25,7 +27,12 @@ import scala.collection.mutable.ListBuffer import scala.concurrent.ExecutionContext import scala.concurrent.duration._ -class RequeueWithExpiryActionIntegrationTest extends AsyncFunSuite with EffectTestSupport with Eventually with IntegrationPatience { +class RequeueWithExpiryActionIntegrationTest + extends AsyncFunSuite + with IntegrationSpec + with EffectTestSupport + with Eventually + with IntegrationPatience { case class TestFixture( stubHandler: RecordingRequeueHandler[IO, String], @@ -35,8 +42,8 @@ class RequeueWithExpiryActionIntegrationTest extends AsyncFunSuite with EffectTe ) implicit override val ioRuntime: IORuntime = packageIORuntime - implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(300)) - val requeuePolicy: RequeuePolicy = RequeuePolicy(maximumProcessAttempts = 5, requeueAfter = 2.seconds) + implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(300)) + val requeuePolicy: RequeuePolicy = RequeuePolicy(maximumProcessAttempts = 5, requeueAfter = 2.seconds) def withTestFixture[F[_]](onRequeueExpiryAction: String => IO[ConsumeAction], handlerAction: String => IO[Unit] = _ => IO.unit)( test: TestFixture => IO[Unit] @@ -63,7 +70,7 @@ class RequeueWithExpiryActionIntegrationTest extends AsyncFunSuite with EffectTe Exchange(exchangeName).binding(routingKey -> queueName) ) ++ requeue.requeueDeclarations(queueName, routingKey) - AmqpClient[IO](config).use { client => + Fs2RabbitAmqpClient[IO](config).use { client => val handler = new RecordingRequeueHandler[IO, String](Kleisli(handlerAction).andThen(_ => IO(Requeue)).run) val dlqHandler = StubHandlers.ackHandler[IO, String] @@ -81,10 +88,11 @@ class RequeueWithExpiryActionIntegrationTest extends AsyncFunSuite with EffectTe } yield () ) .use { _ => - val pub = client.publisher() - val pcb = publishCommandBuilder[String](implicitly).using(exchangeName).using(routingKey) - val fixture = TestFixture(handler, dlqHandler, pcb, pub) - test(fixture) + client.publisher().flatMap { pub => + val pcb = publishCommandBuilder[String](implicitly).using(exchangeName).using(routingKey) + val fixture = TestFixture(handler, dlqHandler, pcb, pub) + test(fixture) + } } } } diff --git a/it/src/test/scala/com/itv/bucky/integrationTest/ShutdownTimeoutTest.scala b/it/src/test/scala/com/itv/bucky/ShutdownTimeoutTest.scala similarity index 62% rename from it/src/test/scala/com/itv/bucky/integrationTest/ShutdownTimeoutTest.scala rename to it/src/test/scala/com/itv/bucky/ShutdownTimeoutTest.scala index 7fc817d8..5020d7bb 100644 --- a/it/src/test/scala/com/itv/bucky/integrationTest/ShutdownTimeoutTest.scala +++ b/it/src/test/scala/com/itv/bucky/ShutdownTimeoutTest.scala @@ -1,11 +1,13 @@ -package com.itv.bucky.integrationTest +package com.itv.bucky -import cats.effect.testing.scalatest.EffectTestSupport +import cats.effect.testing.scalatest.{AsyncIOSpec, EffectTestSupport} import cats.effect.unsafe.IORuntime -import cats.effect.{IO, Resource} +import cats.effect.{Deferred, IO, Ref, Resource} import com.itv.bucky.PayloadMarshaller.StringPayloadMarshaller import com.itv.bucky.Unmarshaller.StringPayloadUnmarshaller import com.itv.bucky._ +import com.itv.bucky.backend.fs2rabbit.Fs2RabbitAmqpClient +import com.itv.bucky.backend.javaamqp.JavaBackendAmqpClient import com.itv.bucky.consume._ import com.itv.bucky.decl.Exchange import com.itv.bucky.pattern.requeue @@ -20,21 +22,17 @@ import org.scalatest.matchers.should.Matchers._ import java.time.{Clock, Instant, LocalDateTime, ZoneOffset} import java.util.UUID -import java.util.concurrent.Executors -import scala.concurrent.ExecutionContext import scala.concurrent.duration._ -class ShutdownTimeoutTest extends AsyncFunSuite with EffectTestSupport with Eventually with IntegrationPatience { +class ShutdownTimeoutTest extends AsyncFunSuite with AsyncIOSpec with Eventually with IntegrationPatience { case class TestFixture( - stubHandler: RecordingRequeueHandler[IO, Delivery], - dlqHandler: RecordingHandler[IO, Delivery], - publishCommandBuilder: PublishCommandBuilder.Builder[String], - publisher: Publisher[IO, PublishCommand] - ) + stubHandler: RecordingRequeueHandler[IO, Delivery], + dlqHandler: RecordingHandler[IO, Delivery], + publishCommandBuilder: PublishCommandBuilder.Builder[String], + publisher: Publisher[IO, PublishCommand] + ) - implicit override val ioRuntime: IORuntime = packageIORuntime - implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(300)) val requeuePolicy: RequeuePolicy = RequeuePolicy(maximumProcessAttempts = 5, requeueAfter = 2.seconds) def runTest[A](test: IO[A]): IO[A] = { @@ -53,24 +51,26 @@ class ShutdownTimeoutTest extends AsyncFunSuite with EffectTestSupport with Even val queueName = QueueName(UUID.randomUUID().toString) val declarations = List(Exchange(exchangeName).binding(routingKey -> queueName)) ++ requeue.requeueDeclarations(queueName, routingKey) - AmqpClient[IO](config) - .use { client => - val handler = StubHandlers.recordingHandler[IO, Delivery]((_: Delivery) => IO.sleep(3.seconds).map(_ => Ack)) - Resource - .eval(client.declare(declarations)) - .flatMap(_ => - for { - _ <- client.registerConsumer(queueName, handler) - } yield () - ) - .use { _ => - val pcb = publishCommandBuilder[String](implicitly).using(exchangeName).using(routingKey) - client.publisher()(pcb.toPublishCommand("a message")).flatMap(_ => test) - } - } + Deferred[IO, Boolean].flatMap { consumingMessage => + Fs2RabbitAmqpClient[IO](config) + .use { client => + val handler = (_: Delivery) => consumingMessage.complete(true) *> IO.sleep(3.seconds).as(Ack) + Resource + .eval(client.declare(declarations)) + .flatMap(_ => client.registerConsumer(queueName, handler)) + .use { _ => + val pcb = publishCommandBuilder[String](implicitly).using(exchangeName).using(routingKey) + client.publisher().flatMap { publisher => + publisher(pcb.toPublishCommand("a message")) *> + consumingMessage.get + .flatMap(_ => test) + } + } + } + } } - test("Should wait until a handler finishes executing before shuttind down") { + test("Should wait until a handler finishes executing before shutting down") { val clock = Clock.systemUTC() val start = Instant.now(clock) runTest[Instant](IO.delay(Instant.now())).map { result => @@ -81,4 +81,4 @@ class ShutdownTimeoutTest extends AsyncFunSuite with EffectTestSupport with Even (result.toEpochMilli - after.toEpochMilli) < 3000 shouldBe true } } -} \ No newline at end of file +} diff --git a/test/src/main/scala/com/itv/bucky/test/package.scala b/test/src/main/scala/com/itv/bucky/test/package.scala index 045f2b2a..0581a28b 100644 --- a/test/src/main/scala/com/itv/bucky/test/package.scala +++ b/test/src/main/scala/com/itv/bucky/test/package.scala @@ -14,6 +14,7 @@ import scala.concurrent.duration._ import cats.effect.Temporal import cats.effect.std.Dispatcher import cats.effect.unsafe.IORuntime +import com.itv.bucky.backend.javaamqp.{Channel, JavaBackendAmqpClient} package object test { object Config { @@ -36,7 +37,7 @@ package object test { def publishNoAck[F[_]](implicit F: Async[F], t: Temporal[F]): StubChannel[F] = new StubChannel[F]() { - override def publish(sequenceNumber: Long, cmd: PublishCommand): F[Unit] = F.delay { + override def publish(sequenceNumber: Long, cmd: PublishCommand, mandatory: Boolean): F[Unit] = F.delay { pubSeqLock.synchronized { publishSeq = sequenceNumber + 1 } @@ -82,7 +83,7 @@ package object test { def client(channel: StubChannel[F], config: AmqpClientConfig)(implicit async: Async[F]): Resource[F, AmqpClient[F]] = { Dispatcher.parallel[F].flatMap { dispatcher => - AmqpClient[F]( + JavaBackendAmqpClient[F]( config, () => Resource.pure[F, Channel[F]](channel), Resource.pure[F, Channel[F]](channel), diff --git a/test/src/main/scala/com/itv/bucky/test/stubs/StubChannel.scala b/test/src/main/scala/com/itv/bucky/test/stubs/StubChannel.scala index 9c831ace..e58c6f74 100644 --- a/test/src/main/scala/com/itv/bucky/test/stubs/StubChannel.scala +++ b/test/src/main/scala/com/itv/bucky/test/stubs/StubChannel.scala @@ -5,12 +5,13 @@ import java.util.concurrent.atomic.AtomicLong import com.itv.bucky import com.itv.bucky.consume._ import com.itv.bucky.publish._ -import com.itv.bucky.{Channel, Envelope, ExchangeName, Handler, QueueName, RoutingKey} +import com.itv.bucky.{Envelope, ExchangeName, Handler, QueueName, RoutingKey} import com.itv.bucky.decl.{Binding, Direct, Exchange, ExchangeBinding, ExchangeType, Headers, Queue, Topic} import com.rabbitmq.client.{ConfirmListener, ReturnListener} import cats._ import cats.effect._ import cats.implicits._ +import com.itv.bucky.backend.javaamqp.Channel import com.typesafe.scalalogging.StrictLogging import scala.collection.mutable @@ -134,7 +135,7 @@ abstract class StubChannel[F[_]](implicit F: Async[F]) extends Channel[F] with S .map(_.queueName) .toList) - override def publish(sequenceNumber: Long, cmd: PublishCommand): F[Unit] = { + override def publish(sequenceNumber: Long, cmd: PublishCommand, mandatory: Boolean): F[Unit] = { val queues = lookupQueues(cmd) val subscribedHandlers = handlers.view .filterKeys(queues.contains) diff --git a/test/src/test/resources/logback.xml b/test/src/test/resources/logback.xml index 2c3d386a..8837a709 100644 --- a/test/src/test/resources/logback.xml +++ b/test/src/test/resources/logback.xml @@ -2,14 +2,14 @@ - %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - "%msg"%n + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - "%delivery"%n localhost LOCAL3 - content-delivery - work-order-to-workflow-translator - [%thread] %-5level %logger - "%msg"%n + content-delivery - work-order-to-workflow-translator - [%thread] %-5level %logger - "%delivery"%n diff --git a/test/src/test/scala/com/itv/bucky/test/PublishConsumeTest.scala b/test/src/test/scala/com/itv/bucky/test/PublishConsumeTest.scala index 030ddbb9..e66a38a1 100644 --- a/test/src/test/scala/com/itv/bucky/test/PublishConsumeTest.scala +++ b/test/src/test/scala/com/itv/bucky/test/PublishConsumeTest.scala @@ -44,7 +44,7 @@ class PublishConsumeTest Resource.eval(client.declare(declarations)).flatMap(_ => client.registerConsumer(queue, handler)).use { _ => for { - _ <- client.publisher()(commandBuilder) + _ <- client.publisher().flatMap(publisher => publisher(commandBuilder)) } yield handler.receivedMessages should have size 1 } } @@ -66,7 +66,7 @@ class PublishConsumeTest Resource.eval(client.declare(declarations)).flatMap(_ => client.registerConsumer(queue, handler)).use { _ => for { - _ <- client.publisher()(commandBuilder) + _ <- client.publisher().flatMap(publisher => publisher(commandBuilder)) } yield handler.receivedMessages should have size 1 } } @@ -91,8 +91,8 @@ class PublishConsumeTest Resource.eval(client.declare(declarations)).flatMap(_ => client.registerConsumer(queue, handler)).use { _ => for { - _ <- client.publisher()(commandBuilder.using(rkRouted).toPublishCommand(message)) - _ <- client.publisher()(commandBuilder.using(rkUnrouted).toPublishCommand(message)) + _ <- client.publisher().flatMap(publisher => publisher(commandBuilder.using(rkRouted).toPublishCommand(message))) + _ <- client.publisher().flatMap(publisher => publisher(commandBuilder.using(rkUnrouted).toPublishCommand(message))) } yield handler.receivedMessages should have size 1 } } @@ -117,8 +117,8 @@ class PublishConsumeTest Resource.eval(client.declare(declarations)).flatMap(_ => client.registerConsumer(queue, handler)).use { _ => for { - _ <- client.publisher()(commandBuilder.using(rkRouted).toPublishCommand(message)) - _ <- client.publisher()(commandBuilder.using(rkUnrouted).toPublishCommand(message)) + _ <- client.publisher().flatMap(publisher => publisher(commandBuilder.using(rkRouted).toPublishCommand(message))) + _ <- client.publisher().flatMap(publisher => publisher(commandBuilder.using(rkUnrouted).toPublishCommand(message))) } yield handler.receivedMessages should have size 1 } } @@ -148,7 +148,7 @@ class PublishConsumeTest Resource.eval(client.declare(declarations)).flatMap(_ => client.registerConsumer(queue, handler)).use { _ => for { - _ <- client.publisher()(commandBuilder) + _ <- client.publisher().flatMap(publisher => publisher(commandBuilder)) } yield handler.receivedMessages should have size 1 } } @@ -179,7 +179,7 @@ class PublishConsumeTest Resource.eval(client.declare(declarations)).flatMap(_ => client.registerConsumer(queue, handler)).use { _ => for { - _ <- client.publisher()(commandBuilder) + _ <- client.publisher().flatMap(publisher => publisher(commandBuilder)) } yield handler.receivedMessages should have size 1 } } @@ -217,9 +217,9 @@ class PublishConsumeTest Resource.eval(client.declare(declarations)).flatMap(_ => client.registerConsumer(queue, handler)).use { _ => for { - _ <- client.publisher()(message1) + _ <- client.publisher().flatMap(publisher => publisher(message1)) firstCount = handler.receivedMessages.size - _ <- client.publisher()(message2) + _ <- client.publisher().flatMap(publisher => publisher(message2)) secondCount = handler.receivedMessages.size } yield { firstCount shouldBe 0 @@ -257,7 +257,7 @@ class PublishConsumeTest Resource.eval(client.declare(declarations)).flatMap(_ => client.registerConsumer(queue, handler)).use { _ => for { - _ <- client.publisher()(commandBuilder) + _ <- client.publisher().flatMap(publisher => publisher(commandBuilder)) } yield handler.receivedMessages should have size 1 } } @@ -280,9 +280,9 @@ class PublishConsumeTest val headers: Map[String, AnyRef] = Map("foo" -> "bar") Resource.eval(client.declare(declarations)).flatMap(_ => client.registerConsumer(queue, handler)).use { _ => - val publisher = new PublisherSugar(client).publisherWithHeadersOf(commandBuilder) for { - _ <- publisher(message, headers) + publisher <- new PublisherSugar(client).publisherWithHeadersOf(commandBuilder) + _ <- publisher(message, headers) } yield { handler.receivedMessages should have size 1 handler.receivedMessages.head.properties.headers shouldBe headers @@ -307,7 +307,7 @@ class PublishConsumeTest Resource.eval(client.declare(declarations)).flatMap(_ => client.registerConsumer(queue, handler)).use { _ => for { - publishResult <- client.publisher()(commandBuilder).attempt + publishResult <- client.publisher().flatMap(publisher => publisher(commandBuilder)).attempt } yield { publishResult.left.value shouldBe a[TimeoutException] handler.receivedMessages should have size 0 @@ -331,9 +331,9 @@ class PublishConsumeTest val declarations = List(Queue(queue), Exchange(exchange).binding((rk, queue))) Resource.eval(client.declare(declarations)).flatMap(_ => client.registerConsumer(queue, handler)).use { _ => - val publisher = client.publisherOf[String] for { - _ <- publisher(message) + publisher <- client.publisherOf[String] + _ <- publisher(message) } yield handler.receivedMessages should have size 1 } } @@ -351,9 +351,9 @@ class PublishConsumeTest val declarations = List(Queue(queue), Exchange(exchange).binding((rk, queue))) Resource.eval(client.declare(declarations)).flatMap(_ => client.registerConsumer(queue, handler)).use { _ => - val publisher = client.publisherOf[String](exchange, rk) for { - _ <- publisher(message) + publisher <- client.publisherOf[String](exchange, rk) + _ <- publisher(message) } yield handler.receivedMessages should have size 1 } } @@ -388,9 +388,9 @@ class PublishConsumeTest } yield () ) .use { _ => - val publisher = client.publisherOf[String](exchange, rk) for { - _ <- publisher("hello") + publisher <- client.publisherOf[String](exchange, rk) + _ <- publisher("hello") } yield requeueHandler.receivedMessages should have size 1 } } diff --git a/test/src/test/scala/com/itv/bucky/test/PublisherTest.scala b/test/src/test/scala/com/itv/bucky/test/PublisherTest.scala index 4699193f..f35acab4 100644 --- a/test/src/test/scala/com/itv/bucky/test/PublisherTest.scala +++ b/test/src/test/scala/com/itv/bucky/test/PublisherTest.scala @@ -5,6 +5,7 @@ import cats.effect.unsafe.IORuntime import cats.effect.{IO, Outcome} import cats.implicits._ import com.itv.bucky.PayloadMarshaller.StringPayloadMarshaller +import com.itv.bucky.backend.javaamqp.publish.PendingConfirmListener import com.itv.bucky.publish._ import com.itv.bucky.{ExchangeName, QueueName, RoutingKey} import com.rabbitmq.client.AMQP.BasicProperties @@ -40,7 +41,7 @@ class PublisherTest extends AnyFunSuite with IOAmqpClientTest with EitherValues runAmqpTestIO(client(channel, Config.empty(10.seconds))) { client => for { pubSeq <- IO(channel.publishSeq) - future <- IO(client.publisher()(commandBuilder).unsafeToFuture()) + future <- IO(client.publisher().flatMap(publisher => publisher(commandBuilder)).unsafeToFuture()) _ <- IO.sleep(3.seconds) _ <- IO.sleep(3.seconds) isCompleted1 <- IO(future.isCompleted) @@ -55,7 +56,7 @@ class PublisherTest extends AnyFunSuite with IOAmqpClientTest with EitherValues runAmqpTest(client(channel, Config.empty(10.seconds))) { client => for { pubSeq <- IO(channel.publishSeq) - future <- IO(client.publisher()(commandBuilder).unsafeToFuture()) + future <- IO(client.publisher().flatMap(publisher => publisher(commandBuilder)).unsafeToFuture()) properties = new BasicProperties _ = properties.builder().build() _ <- IO(channel.returnListeners.foreach(_.handleReturn(400, "reply", exchange.value, rk.value, properties, message.getBytes))) @@ -71,7 +72,7 @@ class PublisherTest extends AnyFunSuite with IOAmqpClientTest with EitherValues val channel = StubChannels.publishNoAck[IO] runAmqpTestIO(client(channel, Config.empty(1.second))) { client => for { - result <- client.publisher()(commandBuilder).attempt + result <- client.publisher().flatMap(publisher => publisher(commandBuilder)).attempt listeners <- IO(channel.confirmListeners.map(_.asInstanceOf[PendingConfirmListener[IO]])) pendingConf <- listeners.toList.map(_.pendingConfirmations.get).sequence } yield { @@ -88,9 +89,9 @@ class PublisherTest extends AnyFunSuite with IOAmqpClientTest with EitherValues val channel = StubChannels.publishNoAck[IO] runAmqpTestIO(client(channel, Config.empty(30.seconds))) { client => for { - fiber1 <- client.publisher()(commandBuilder).start - fiber2 <- client.publisher()(commandBuilder).start - fiber3 <- client.publisher()(commandBuilder).start + fiber1 <- client.publisher().flatMap(publisher => publisher(commandBuilder)).start + fiber2 <- client.publisher().flatMap(publisher => publisher(commandBuilder)).start + fiber3 <- client.publisher().flatMap(publisher => publisher(commandBuilder)).start _ <- IO.sleep(5.seconds) _ <- IO(channel.confirmListeners.foreach(_.handleAck(2, true))) outcome1 <- fiber1.join @@ -108,9 +109,9 @@ class PublisherTest extends AnyFunSuite with IOAmqpClientTest with EitherValues val channel = StubChannels.publishNoAck[IO] runAmqpTestIO(client(channel, Config.empty(30.seconds))) { client => for { - fiber1 <- client.publisher()(commandBuilder).start - fiber2 <- client.publisher()(commandBuilder).start - fiber3 <- client.publisher()(commandBuilder).start + fiber1 <- client.publisher().flatMap(publisher => publisher(commandBuilder)).start + fiber2 <- client.publisher().flatMap(publisher => publisher(commandBuilder)).start + fiber3 <- client.publisher().flatMap(publisher => publisher(commandBuilder)).start _ <- IO.sleep(5.seconds) _ <- IO(channel.confirmListeners.foreach(_.handleNack(2, true))) outcome1 <- fiber1.join @@ -127,12 +128,12 @@ class PublisherTest extends AnyFunSuite with IOAmqpClientTest with EitherValues test("Multiple messages can be published and some can be acked and some can be Nacked.") { val channel = StubChannels.publishNoAck[IO] runAmqpTestIO(client(channel, Config.empty(10.seconds))) { client => - val pub: IO[Unit] = client.publisher()(commandBuilder) + val pub: IO[Unit] = client.publisher().flatMap(publisher => publisher(commandBuilder)) for { - fiber1 <- client.publisher()(commandBuilder).start - fiber2 <- client.publisher()(commandBuilder).start - fiber3 <- client.publisher()(commandBuilder).start - fiber4 <- client.publisher()(commandBuilder).start + fiber1 <- client.publisher().flatMap(publisher => publisher(commandBuilder)).start + fiber2 <- client.publisher().flatMap(publisher => publisher(commandBuilder)).start + fiber3 <- client.publisher().flatMap(publisher => publisher(commandBuilder)).start + fiber4 <- client.publisher().flatMap(publisher => publisher(commandBuilder)).start _ <- IO.sleep(3.seconds) _ <- IO.sleep(3.seconds) _ <- IO(channel.confirmListeners.foreach(_.handleNack(0, false))) diff --git a/test/src/test/scala/com/itv/bucky/test/StubTest.scala b/test/src/test/scala/com/itv/bucky/test/StubTest.scala index 3f72264a..69a4c33e 100644 --- a/test/src/test/scala/com/itv/bucky/test/StubTest.scala +++ b/test/src/test/scala/com/itv/bucky/test/StubTest.scala @@ -23,7 +23,7 @@ class StubTest extends AnyFunSuite with IOAmqpClientTest { val consumer = StubHandlers.ackHandler[IO, String] Resource.eval(client.declare(declarations)).flatMap(_ => client.registerConsumerOf(queue, consumer)).use { _ => for { - publisher <- IO(client.publisherOf[String](exchange, rk)) + publisher <- client.publisherOf[String](exchange, rk) _ <- (1 to 10).toList.map(_ => publisher(message)).sequence } yield { all(consumer.receivedMessages) should be(message) @@ -35,10 +35,11 @@ class StubTest extends AnyFunSuite with IOAmqpClientTest { test("Should not suffer from deadlock") { runAmqpTestAllAck { client => - val publisher = client.publisherOf[String](ExchangeName("x"), RoutingKey("y")) val handler = new Handler[IO, String] { override def apply(delivery: String): IO[consume.ConsumeAction] = - publisher("publish from handler").map(_ => Ack) + client.publisherOf[String](ExchangeName("x"), RoutingKey("y")).flatMap { publisher => + publisher("publish from handler").map(_ => Ack) + } } Resource @@ -47,8 +48,9 @@ class StubTest extends AnyFunSuite with IOAmqpClientTest { client.registerConsumerOf(queue, handler) } .use { _ => - val publisher = client.publisherOf[String](exchange, rk) - publisher(message) + client.publisherOf[String](exchange, rk).flatMap { publisher => + publisher(message) + } } } } @@ -66,7 +68,7 @@ class StubTest extends AnyFunSuite with IOAmqpClientTest { } yield ()) .use { _ => for { - publisher <- IO(client.publisherOf[String](exchange, rk)) + publisher <- client.publisherOf[String](exchange, rk) _ <- publisher(message) } yield stubPubslisher.recordedMessages shouldBe List(message) } @@ -84,7 +86,7 @@ class StubTest extends AnyFunSuite with IOAmqpClientTest { _ <- client.registerConsumerOf(queue2, consumer2) } yield ()).use { _ => for { - publisher <- IO(client.publisherOf[String](exchange, rk)) + publisher <- client.publisherOf[String](exchange, rk) _ <- (1 to 10).toList.map(_ => publisher(message)).sequence } yield { all(consumer1.receivedMessages) should be(message) @@ -104,7 +106,7 @@ class StubTest extends AnyFunSuite with IOAmqpClientTest { _ <- client.registerConsumerOf(queue, consumer) } yield ()).use { _ => for { - publisher <- IO(client.publisherOf[String](exchange, rk)) + publisher <- client.publisherOf[String](exchange, rk) publishRes <- publisher(message).attempt } yield { consumer.receivedMessages shouldBe List(message) @@ -123,7 +125,7 @@ class StubTest extends AnyFunSuite with IOAmqpClientTest { _ <- client.registerConsumerOf(queue, consumer) } yield ()).use { _ => for { - publisher <- IO(client.publisherOf[String](exchange, rk)) + publisher <- client.publisherOf[String](exchange, rk) publishRes <- publisher(message).attempt } yield { consumer.receivedMessages shouldBe List(message) @@ -142,7 +144,7 @@ class StubTest extends AnyFunSuite with IOAmqpClientTest { _ <- client.registerConsumerOf(queue, consumer) } yield ()).use { _ => for { - publisher <- IO(client.publisherOf[String](exchange, rk)) + publisher <- client.publisherOf[String](exchange, rk) publishRes <- publisher(message).attempt } yield { consumer.receivedMessages shouldBe List(message) @@ -161,7 +163,7 @@ class StubTest extends AnyFunSuite with IOAmqpClientTest { } yield ()).use { _ => for { - publisher <- IO(client.publisherOf[String](exchange, rk)) + publisher <- client.publisherOf[String](exchange, rk) publishRes <- publisher(message).attempt } yield { consumer.receivedMessages shouldBe List(message) @@ -179,7 +181,7 @@ class StubTest extends AnyFunSuite with IOAmqpClientTest { _ <- client.registerConsumerOf(queue, consumer) } yield ()).use { _ => for { - publisher <- IO(client.publisherOf[String](exchange, rk)) + publisher <- client.publisherOf[String](exchange, rk) publishRes <- publisher(message).attempt } yield { consumer.receivedMessages shouldBe List(message) @@ -199,7 +201,7 @@ class StubTest extends AnyFunSuite with IOAmqpClientTest { } yield ()).use { _ => for { - publisher <- IO(client.publisherOf[String](exchange, rk)) + publisher <- client.publisherOf[String](exchange, rk) publishRes <- publisher(message).attempt } yield { consumer.receivedMessages shouldBe List(message)