Skip to content

Commit

Permalink
fs2-rabbit backend (#82)
Browse files Browse the repository at this point in the history
* init consumer using rabbit fs2

* implement amqpClient interface with fs2 rabbit

* fix requeue

* WIP separation into modules

* All compilation and tests

* Finish off Delivery MessageEncoder and support all declaration options

* Remove temporary client in examples module

* Requeue on reject for RequeueImmediately support

* Restore example to JavaBackend client

* Bleh! Type erasure is the worst

* `deliveryEncoder` tests

* WIP

* Get compiling again after a rebase

* Passing tests

* Abstract out fs2-rabbit config

* Move unit tests to relevant module

* Fix up test logging

* Sort declarations before executing them!

* Try existing integration tests with new client

* Added lots

Co-authored-by: Andrew Gee <[email protected]> 🫥

* Reworked publisher to return an F[Publisher[F, ???]]

Co-authored-by: Jos Bogan <[email protected]>

* Fixed error in publishing

Co-authored-by: Andrew Gee <[email protected]> 🫥

* working for cross compile

* Supporting 2.12

Co-authored-by: Andrew Gee <[email protected]> 🫥

* More 2.12

Co-authored-by: Andrew Gee <[email protected]> 🫥

* More 2.12 madness

Co-authored-by: Andrew Gee <[email protected]> 🫥

* revert version

* Co-authored-by: Andrew Gee <[email protected]> 🫥

* Time :(

Co-authored-by: Andrew Gee <[email protected]> 🫥

* success?

Co-authored-by: Jos Bogan <[email protected]>

---------

Co-authored-by: Luke Thomas <[email protected]>
Co-authored-by: joseboga <[email protected]>
Co-authored-by: Jos Bogan <[email protected]>
  • Loading branch information
4 people authored Jul 11, 2024
1 parent e3bd4ca commit a765766
Show file tree
Hide file tree
Showing 42 changed files with 1,279 additions and 456 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package com.itv.bucky.backend.fs2rabbit

import cats.effect._
import cats.effect.implicits._
import cats.effect.std.Dispatcher
import cats.implicits._
import com.itv.bucky.publish._
import com.itv.bucky.{AmqpClientConfig, Publisher, publish}
import com.rabbitmq.client.Channel
import com.typesafe.scalalogging.StrictLogging
import dev.profunktor.fs2rabbit.effects.MessageEncoder
import dev.profunktor.fs2rabbit.interpreter.RabbitClient
import dev.profunktor.fs2rabbit.model
import dev.profunktor.fs2rabbit.model.{AMQPChannel, PublishingFlag}

import scala.collection.immutable.TreeMap
import scala.util.Try

class AmqpClientConnectionManager[F[_]: Async](
amqpConfig: AmqpClientConfig,
pendingConfirmListener: PendingConfirmListener[F],
dispatcher: Dispatcher[F],
client: RabbitClient[F]
)(implicit publishCommandEncoder: MessageEncoder[F, PublishCommand], amqpChannel: AMQPChannel)
extends StrictLogging {

val publishChannel: Channel = amqpChannel.value

private def synchroniseIfNeeded[T](f: => T): T = publishChannel.synchronized(f)
private def runWithChannelSync[T](action: F[T]): F[T] =
synchroniseIfNeeded {
Async[F].fromTry(Try {
dispatcher.unsafeRunSync(action)
})
}

def addConfirmListeningToPublisher(publisher: Publisher[F, PublishCommand]): Publisher[F, PublishCommand] = (cmd: PublishCommand) =>
for {
deliveryTag <- Ref.of[F, Option[Long]](None)
_ <- (for {
signal <- Deferred[F, Option[Throwable]]
_ <- runWithChannelSync {
for {
nextPublishSeq <- Async[F].blocking(publishChannel.getNextPublishSeqNo)
_ <- deliveryTag.set(Some(nextPublishSeq))
_ <- pendingConfirmListener.pendingConfirmations.update(_ + (nextPublishSeq -> signal))
_ <- publisher(cmd)
} yield ()
}
_ <- signal.get.flatMap(maybeError => maybeError.traverse(Async[F].raiseError[Unit]))
} yield ())
.timeout(amqpConfig.publishingTimeout)
.recoverWith { case e =>
runWithChannelSync {
for {
dl <- deliveryTag.get
deliveryTag <- Async[F].fromOption(dl, new RuntimeException("Timeout occurred before a delivery tag could be obtained.", e))
_ <- pendingConfirmListener.pop(deliveryTag, multiple = false)
_ <- Async[F].raiseError[Unit](e)
} yield ()
}
}
} yield ()

}

private[bucky] object AmqpClientConnectionManager extends StrictLogging {

def apply[F[_]](
config: AmqpClientConfig,
client: RabbitClient[F],
amqpChannel: AMQPChannel,
dispatcher: Dispatcher[F]
)(implicit
F: Async[F],
publishCommandEncoder: MessageEncoder[F, PublishCommand]
): F[AmqpClientConnectionManager[F]] =
for {
pendingConfirmations <- Ref.of[F, TreeMap[Long, Deferred[F, Option[Throwable]]]](TreeMap.empty)
pendingReturn <- Ref.of[F, Option[Throwable]](None)
publishChannel = amqpChannel.value
_ <- Async[F].blocking(publishChannel.confirmSelect)
confirmListener <- Async[F].blocking(PendingConfirmListener(pendingConfirmations, pendingReturn, dispatcher))
_ <- Async[F].blocking(publishChannel.addConfirmListener(confirmListener))
_ <- Async[F].blocking(publishChannel.addReturnListener(confirmListener))
} yield new AmqpClientConnectionManager(config, confirmListener, dispatcher, client)(implicitly, implicitly, amqpChannel)
}
Loading

0 comments on commit a765766

Please sign in to comment.