diff --git a/build.sbt b/build.sbt index 570318c6..c6479bd7 100644 --- a/build.sbt +++ b/build.sbt @@ -165,18 +165,16 @@ lazy val commonSettings = Seq( scalacOptions ++= Seq( "-encoding", "UTF-8", - "-target:jvm-1.8", + "-release", "8", "-Xlog-reflective-calls", "-Xlint", "-Ywarn-unused", - "-Ywarn-unused-import", "-deprecation", "-feature", "-language:_", "-unchecked" ), - scalacOptions in (Compile, console) := (scalacOptions in (Global)).value - .filter(_ == "-Ywarn-unused-import"), + scalacOptions in (Compile, console) := (scalacOptions in (Global)).value, scalacOptions in (Test, console) := (scalacOptions in (Compile, console)).value ) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 365c8a2e..c6432bcf 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -1,7 +1,7 @@ import sbt._ object Version { - val Scala = "2.12.17" + val Scala = "2.13.10" val Akka = "2.6.20" val Prometheus = "0.15.0" val Fabric8 = "4.11.2" diff --git a/src/it/scala/com/lightbend/kafkalagexporter/integration/IntegrationSpec.scala b/src/it/scala/com/lightbend/kafkalagexporter/integration/IntegrationSpec.scala index 3d05c749..02aa473f 100644 --- a/src/it/scala/com/lightbend/kafkalagexporter/integration/IntegrationSpec.scala +++ b/src/it/scala/com/lightbend/kafkalagexporter/integration/IntegrationSpec.scala @@ -134,6 +134,7 @@ trait IntegrationSpec val parsedDoubleTry = Try(actual.toDouble) assert(parsedDoubleTry.isSuccess) val parsedDouble = parsedDoubleTry.get + assert(!parsedDouble.isNaN) parsedDouble should be > lastLagInTime lastLagInTime = parsedDouble } diff --git a/src/it/scala/com/lightbend/kafkalagexporter/integration/minikube/MinikubeSpecBase.scala b/src/it/scala/com/lightbend/kafkalagexporter/integration/minikube/MinikubeSpecBase.scala index 6a4b49f4..8ee2c1ed 100644 --- a/src/it/scala/com/lightbend/kafkalagexporter/integration/minikube/MinikubeSpecBase.scala +++ b/src/it/scala/com/lightbend/kafkalagexporter/integration/minikube/MinikubeSpecBase.scala @@ -11,7 +11,6 @@ import org.scalatest.BeforeAndAfterAll import org.scalatest.concurrent.Eventually import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AnyWordSpecLike -import org.slf4j.{Logger, LoggerFactory} import scala.sys.process._ import scala.util.Random @@ -24,7 +23,6 @@ abstract class MinikubeSpecBase with Eventually with PrometheusUtils { - private[this] val log: Logger = LoggerFactory.getLogger(getClass) override val bootstrapServers: String = getNodePortForService("strimzi-kafka-cluster-kafka-external-bootstrap") diff --git a/src/main/scala/com/lightbend/kafkalagexporter/AppConfig.scala b/src/main/scala/com/lightbend/kafkalagexporter/AppConfig.scala index a20160f5..4000d2cd 100644 --- a/src/main/scala/com/lightbend/kafkalagexporter/AppConfig.scala +++ b/src/main/scala/com/lightbend/kafkalagexporter/AppConfig.scala @@ -6,12 +6,11 @@ package com.lightbend.kafkalagexporter import java.util - import com.lightbend.kafkalagexporter.EndpointSink.ClusterGlobalLabels import com.typesafe.config.{Config, ConfigObject} import scala.annotation.tailrec -import scala.collection.JavaConverters._ +import scala.jdk.CollectionConverters.{ListHasAsScala, SetHasAsScala} import scala.compat.java8.DurationConverters._ import scala.concurrent.duration.{Duration, FiniteDuration} import scala.util.Try diff --git a/src/main/scala/com/lightbend/kafkalagexporter/ConsumerGroupCollector.scala b/src/main/scala/com/lightbend/kafkalagexporter/ConsumerGroupCollector.scala index 5d2284b1..52ef14e3 100644 --- a/src/main/scala/com/lightbend/kafkalagexporter/ConsumerGroupCollector.scala +++ b/src/main/scala/com/lightbend/kafkalagexporter/ConsumerGroupCollector.scala @@ -368,9 +368,7 @@ object ConsumerGroupCollector { ): Unit = { val groupLag: immutable.Iterable[GroupPartitionLag] = for { (gtp, groupPoint) <- offsetsSnapshot.lastGroupOffsets - (_, mostRecentPoint) <- offsetsSnapshot.latestOffsets.filterKeys( - _ == gtp.tp - ) + mostRecentPoint <- offsetsSnapshot.latestOffsets.get(gtp.tp) } yield { val (groupOffset, offsetLag, timeLag) = groupPoint match { case Some(point) => @@ -383,7 +381,7 @@ object ConsumerGroupCollector { } val offsetLagCalc = mostRecentPoint.offset - point.offset - val offsetLag = if (offsetLagCalc < 0) 0d else offsetLagCalc + val offsetLag = if (offsetLagCalc < 0) 0d else offsetLagCalc.toDouble log.debug( " Found time_lag=\"{}\" and offset_lag=\"{}\" for offset=\"{}\" in the lookup table ({}, {})", @@ -473,7 +471,7 @@ object ConsumerGroupCollector { Metrics.EarliestOffsetMetric, config.cluster.name, tp, - topicPoint.offset + topicPoint.offset.toDouble ) } } @@ -489,7 +487,7 @@ object ConsumerGroupCollector { Metrics.LatestOffsetMetric, config.cluster.name, tp, - point.offset + point.offset.toDouble ) } @@ -565,7 +563,7 @@ object ConsumerGroupCollector { reporter ! Metrics.ClusterValueMessage( Metrics.PollTimeMetric, config.cluster.name, - metaData.pollTime + metaData.pollTime.toDouble ) } } diff --git a/src/main/scala/com/lightbend/kafkalagexporter/Domain.scala b/src/main/scala/com/lightbend/kafkalagexporter/Domain.scala index cf09cdd9..fffede89 100644 --- a/src/main/scala/com/lightbend/kafkalagexporter/Domain.scala +++ b/src/main/scala/com/lightbend/kafkalagexporter/Domain.scala @@ -56,7 +56,7 @@ object Domain { } def clear(evictedTps: List[TopicPartition]): Unit = - tables = tables.filterKeys(tp => !evictedTps.contains(tp)) + tables = tables.view.filterKeys(tp => !evictedTps.contains(tp)).toMap def all: Map[TopicPartition, LookupTable] = tables } diff --git a/src/main/scala/com/lightbend/kafkalagexporter/GraphiteEndpointSink.scala b/src/main/scala/com/lightbend/kafkalagexporter/GraphiteEndpointSink.scala index 6f036c31..f0289b75 100644 --- a/src/main/scala/com/lightbend/kafkalagexporter/GraphiteEndpointSink.scala +++ b/src/main/scala/com/lightbend/kafkalagexporter/GraphiteEndpointSink.scala @@ -11,8 +11,6 @@ import java.net.Socket import java.io.PrintWriter import scala.util.{Try, Success, Failure} -import scala.util.Try - object GraphiteEndpointSink { def apply( diff --git a/src/main/scala/com/lightbend/kafkalagexporter/KafkaClient.scala b/src/main/scala/com/lightbend/kafkalagexporter/KafkaClient.scala index aa7e6279..94bccf18 100644 --- a/src/main/scala/com/lightbend/kafkalagexporter/KafkaClient.scala +++ b/src/main/scala/com/lightbend/kafkalagexporter/KafkaClient.scala @@ -9,30 +9,17 @@ import java.time.Duration import java.util.Properties import java.util.concurrent.TimeUnit import java.{lang, util} - import com.lightbend.kafkalagexporter.Domain.{GroupOffsets, PartitionOffsets} -import com.lightbend.kafkalagexporter.KafkaClient.{ - AdminKafkaClientContract, - ConsumerKafkaClientContract, - KafkaClientContract -} +import com.lightbend.kafkalagexporter.KafkaClient.{AdminKafkaClientContract, ConsumerKafkaClientContract, KafkaClientContract} import org.apache.kafka.clients.admin._ -import org.apache.kafka.clients.consumer.{ - ConsumerConfig, - KafkaConsumer, - OffsetAndMetadata -} +import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer, OffsetAndMetadata} import org.apache.kafka.common.serialization.ByteArrayDeserializer -import org.apache.kafka.common.{ - KafkaFuture, - TopicPartition => KafkaTopicPartition -} +import org.apache.kafka.common.{KafkaFuture, TopicPartition => KafkaTopicPartition} -import scala.collection.JavaConverters._ -import scala.collection.immutable.Map import scala.compat.java8.DurationConverters._ import scala.concurrent.duration.FiniteDuration import scala.concurrent.{ExecutionContext, Future, Promise} +import scala.jdk.CollectionConverters.{CollectionHasAsScala, MapHasAsScala, SeqHasAsJava, SetHasAsJava, SetHasAsScala} import scala.util.Try object KafkaClient { diff --git a/src/main/scala/com/lightbend/kafkalagexporter/LookupTable.scala b/src/main/scala/com/lightbend/kafkalagexporter/LookupTable.scala index 61aa59bd..f20015f4 100644 --- a/src/main/scala/com/lightbend/kafkalagexporter/LookupTable.scala +++ b/src/main/scala/com/lightbend/kafkalagexporter/LookupTable.scala @@ -92,7 +92,7 @@ object LookupTable { ) extends LookupTable { import config._ - val key = List(prefix, clusterName, tp.topic, tp.partition) + val key = List(prefix, clusterName, tp.topic, String.valueOf(tp.partition)) .mkString(separator) val client = config.client @@ -111,16 +111,16 @@ object LookupTable { val times = client .zrangebyscore( key = key, - min = point.offset, - max = point.offset, + min = point.offset.toDouble, + max = point.offset.toDouble, limit = Some((0, 2): (Int, Int)) ) .get // remove points with the same offset client.zremrangebyscore( key = key, - start = point.offset, - end = point.offset + start = point.offset.toDouble, + end = point.offset.toDouble ) // insert earliest + current points client.zadd(key, point.offset.toDouble, times.minBy(_.toLong)) @@ -196,11 +196,11 @@ object LookupTable { right = mostRecentPoint() } - if (left.isRight && right.isRight) { - predict(offset, left.right.get, right.right.get) - } else { - TooFewPoints - } + (for { + l <- left + r <- right + } yield predict(offset, l, r)) + .getOrElse(TooFewPoints) } mostRecentPoint() match { @@ -337,7 +337,7 @@ object LookupTable { // create a sliding window of 2 elements with a step size of 1 .sliding(size = 2, step = 1) // convert window to a tuple. since we're iterating backwards we match right and left in reverse. - .map { case r :: l :: Nil => (l, r) } + .collect { case Seq(r, l) => (l, r) } // find the Point that contains the offset .find { case (l, r) => offset >= l.offset && offset <= r.offset } // offset is not between any two points in the table diff --git a/src/test/scala/com/lightbend/kafkalagexporter/GraphiteEndpointSinkSpec.scala b/src/test/scala/com/lightbend/kafkalagexporter/GraphiteEndpointSinkSpec.scala index 96944022..de46f068 100644 --- a/src/test/scala/com/lightbend/kafkalagexporter/GraphiteEndpointSinkSpec.scala +++ b/src/test/scala/com/lightbend/kafkalagexporter/GraphiteEndpointSinkSpec.scala @@ -41,9 +41,9 @@ class GraphiteEndpointSinkTest extends FixtureAnyFreeSpec with Matchers { "GraphiteEndpointSinkImpl should" - { "report only metrics which match the regex" in { fixture => - val properties = Map( + val properties: Map[String, Any] = Map( "reporters.graphite.host" -> "localhost", - "reporters.graphite.port" -> fixture.server.server.getLocalPort() + "reporters.graphite.port" -> fixture.server.server.getLocalPort ) val sink = GraphiteEndpointSink( new GraphiteEndpointConfig( diff --git a/src/test/scala/com/lightbend/kafkalagexporter/InfluxDBPusherSinkSpec.scala b/src/test/scala/com/lightbend/kafkalagexporter/InfluxDBPusherSinkSpec.scala index feed5139..433c6cb8 100644 --- a/src/test/scala/com/lightbend/kafkalagexporter/InfluxDBPusherSinkSpec.scala +++ b/src/test/scala/com/lightbend/kafkalagexporter/InfluxDBPusherSinkSpec.scala @@ -6,7 +6,6 @@ package com.lightbend.kafkalagexporter import java.net.URL - import com.typesafe.config.ConfigFactory import org.influxdb.dto.Query import org.scalatest.concurrent.{Eventually, IntegrationPatience} @@ -137,7 +136,7 @@ class InfluxDBPusherSinkSpec new InfluxDBPusherSinkConfig( "InfluxDBPusherSink", List("kafka_consumergroup_group_max_lag"), - ConfigFactory.parseMap(mapAsJavaMap(fixture.properties)) + ConfigFactory.parseMap(fixture.properties.asJava) ), clustersGlobalValuesMap ) diff --git a/src/test/scala/com/lightbend/kafkalagexporter/KafkaClientSpec.scala b/src/test/scala/com/lightbend/kafkalagexporter/KafkaClientSpec.scala index 17e5e6c1..a93e878e 100644 --- a/src/test/scala/com/lightbend/kafkalagexporter/KafkaClientSpec.scala +++ b/src/test/scala/com/lightbend/kafkalagexporter/KafkaClientSpec.scala @@ -6,26 +6,18 @@ package com.lightbend.kafkalagexporter import java.util.Optional - import com.lightbend.kafkalagexporter.Domain.GroupOffsets -import com.lightbend.kafkalagexporter.KafkaClient.{ - AdminKafkaClientContract, - ConsumerKafkaClientContract, - KafkaTopicPartitionOps -} +import com.lightbend.kafkalagexporter.KafkaClient.{AdminKafkaClientContract, ConsumerKafkaClientContract, KafkaTopicPartitionOps} import org.apache.kafka.clients.admin._ import org.apache.kafka.clients.consumer.OffsetAndMetadata -import org.apache.kafka.common.{ - ConsumerGroupState, - TopicPartition => KafkaTopicPartition -} +import org.apache.kafka.common.{ConsumerGroupState, TopicPartition => KafkaTopicPartition} import org.mockito.{ArgumentMatchersSugar, MockitoSugar} import org.scalatest.concurrent.ScalaFutures import org.scalatest.freespec.AnyFreeSpec import org.scalatest.matchers.should.Matchers -import scala.collection.JavaConverters._ import scala.concurrent.{ExecutionContext, ExecutionContextExecutor, Future} +import scala.jdk.CollectionConverters.{IterableHasAsJava, MapHasAsJava, SeqHasAsJava, SetHasAsJava} class KafkaClientSpec extends AnyFreeSpec diff --git a/src/test/scala/com/lightbend/kafkalagexporter/LookupTableSpec.scala b/src/test/scala/com/lightbend/kafkalagexporter/LookupTableSpec.scala index c7a7c547..278009d3 100644 --- a/src/test/scala/com/lightbend/kafkalagexporter/LookupTableSpec.scala +++ b/src/test/scala/com/lightbend/kafkalagexporter/LookupTableSpec.scala @@ -121,7 +121,7 @@ class LookupTableSpec ) tests.foreach(expected => - table.lookup(expected) shouldBe Prediction(expected) + table.lookup(expected) shouldBe Prediction(expected.toDouble) ) } @@ -174,7 +174,7 @@ class LookupTableSpec fail(s"Expected flat entries to compress to a single entry $table") } - if (table.mostRecentPoint().right.get.time != 400) { + if (table.mostRecentPoint().toOption.get.time != 400) { fail(s"Expected compressed table to have last timestamp $table") } @@ -304,9 +304,9 @@ class LookupTableSpec ) } - if (n != result.right.get.offset) { + if (n != result.toOption.get.offset) { fail( - s"Most recent point on $table expected $n, but got ${result.right.get.offset}" + s"Most recent point on $table expected $n, but got ${result.toOption.get.offset}" ) } } @@ -368,7 +368,7 @@ class LookupTableSpec ) tests.foreach(expected => - table.lookup(expected) shouldBe Prediction(expected) + table.lookup(expected) shouldBe Prediction(expected.toDouble) ) } @@ -510,9 +510,9 @@ class LookupTableSpec ) } - if (n != result.right.get.offset) { + if (n != result.toOption.get.offset) { fail( - s"Most recent point on $table expected $n, but got ${result.right.get.offset}" + s"Most recent point on $table expected $n, but got ${result.toOption.get.offset}" ) } } diff --git a/src/test/scala/com/lightbend/kafkalagexporter/PrometheusEndpointSinkSpec.scala b/src/test/scala/com/lightbend/kafkalagexporter/PrometheusEndpointSinkSpec.scala index 0fc43a5a..7c7c8f8a 100644 --- a/src/test/scala/com/lightbend/kafkalagexporter/PrometheusEndpointSinkSpec.scala +++ b/src/test/scala/com/lightbend/kafkalagexporter/PrometheusEndpointSinkSpec.scala @@ -6,17 +6,17 @@ package com.lightbend.kafkalagexporter import java.net.ServerSocket - import io.prometheus.client.CollectorRegistry import io.prometheus.client.exporter.HTTPServer -import scala.collection.JavaConverters._ import scala.util.{Failure, Success, Try} import com.typesafe.config.{Config, ConfigFactory} import org.scalatest.Outcome import org.scalatest.freespec.FixtureAnyFreeSpec import org.scalatest.matchers.should.Matchers +import scala.jdk.CollectionConverters.{EnumerationHasAsScala, ListHasAsScala} + class PrometheusEndpointSinkSpec extends FixtureAnyFreeSpec with Matchers { case class Fixture(