This repository has been archived by the owner on Mar 17, 2024. It is now read-only.

scala 2.13 migration #424

Open · wants to merge 3 commits into base: master
6 changes: 2 additions & 4 deletions build.sbt
@@ -165,18 +165,16 @@ lazy val commonSettings = Seq(
scalacOptions ++= Seq(
"-encoding",
"UTF-8",
"-target:jvm-1.8",
"-release", "8",
"-Xlog-reflective-calls",
"-Xlint",
"-Ywarn-unused",
"-Ywarn-unused-import",
"-deprecation",
"-feature",
"-language:_",
"-unchecked"
),
scalacOptions in (Compile, console) := (scalacOptions in (Global)).value
.filter(_ == "-Ywarn-unused-import"),
scalacOptions in (Compile, console) := (scalacOptions in (Global)).value,
scalacOptions in (Test, console) := (scalacOptions in (Compile, console)).value
)
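A note on the flag changes: on 2.13, "-release 8" supersedes "-target:jvm-1.8" and additionally checks code against the JDK 8 API, and the old "-Ywarn-unused-import" flag is not accepted (its check is folded into -Ywarn-unused / -Wunused:imports), which is also why the console-specific filter keyed on it goes away. If cross-building against 2.12 were ever needed, a version switch along these lines would work (a sketch only, not part of this PR):

// Sketch: switching compiler flags per Scala version in build.sbt
scalacOptions ++= (CrossVersion.partialVersion(scalaVersion.value) match {
  case Some((2, 12)) => Seq("-target:jvm-1.8", "-Ywarn-unused-import")
  case _             => Seq("-release", "8") // 2.13+: unused imports are covered by -Ywarn-unused
})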

2 changes: 1 addition & 1 deletion project/Dependencies.scala
@@ -1,7 +1,7 @@
import sbt._

object Version {
val Scala = "2.12.17"
val Scala = "2.13.10"
val Akka = "2.6.20"
val Prometheus = "0.15.0"
val Fabric8 = "4.11.2"
@@ -134,6 +134,7 @@ trait IntegrationSpec
val parsedDoubleTry = Try(actual.toDouble)
assert(parsedDoubleTry.isSuccess)
val parsedDouble = parsedDoubleTry.get
assert(!parsedDouble.isNaN)
parsedDouble should be > lastLagInTime
lastLagInTime = parsedDouble
}
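The extra isNaN assertion matters because "NaN".toDouble parses successfully, while every ordered comparison against NaN evaluates to false, so a NaN lag value would otherwise only surface later as a confusing matcher failure. A quick illustration:

// Why the NaN guard gives an earlier, clearer failure:
scala.util.Try("NaN".toDouble)   // Success(NaN) -- the parse check alone would pass
Double.NaN > 0.0                 // false        -- so "should be >" fails for a non-obvious reason
Double.NaN.isNaN                 // true         -- the new assert reports the real problem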
@@ -11,7 +11,6 @@ import org.scalatest.BeforeAndAfterAll
import org.scalatest.concurrent.Eventually
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpecLike
import org.slf4j.{Logger, LoggerFactory}

import scala.sys.process._
import scala.util.Random
@@ -24,7 +23,6 @@ abstract class MinikubeSpecBase
with Eventually
with PrometheusUtils {

private[this] val log: Logger = LoggerFactory.getLogger(getClass)
override val bootstrapServers: String =
getNodePortForService("strimzi-kafka-cluster-kafka-external-bootstrap")

@@ -6,12 +6,11 @@
package com.lightbend.kafkalagexporter

import java.util

import com.lightbend.kafkalagexporter.EndpointSink.ClusterGlobalLabels
import com.typesafe.config.{Config, ConfigObject}

import scala.annotation.tailrec
import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters.{ListHasAsScala, SetHasAsScala}
import scala.compat.java8.DurationConverters._
import scala.concurrent.duration.{Duration, FiniteDuration}
import scala.util.Try
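scala.collection.JavaConverters is deprecated since 2.13; scala.jdk.CollectionConverters provides the same asScala/asJava extension methods, and importing only the implicit classes that are actually used (ListHasAsScala, SetHasAsScala here) keeps -Ywarn-unused quiet. A standalone sketch of the new style, not taken from this file:

// Minimal 2.13 converter usage
import java.util.{List => JList}
import scala.jdk.CollectionConverters.ListHasAsScala

def keyLengths(xs: JList[String]): List[Int] = xs.asScala.toList.map(_.length)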
@@ -368,9 +368,7 @@ object ConsumerGroupCollector {
): Unit = {
val groupLag: immutable.Iterable[GroupPartitionLag] = for {
(gtp, groupPoint) <- offsetsSnapshot.lastGroupOffsets
(_, mostRecentPoint) <- offsetsSnapshot.latestOffsets.filterKeys(
_ == gtp.tp
)
mostRecentPoint <- offsetsSnapshot.latestOffsets.get(gtp.tp)
} yield {
val (groupOffset, offsetLag, timeLag) = groupPoint match {
case Some(point) =>
@@ -383,7 +381,7 @@
}

val offsetLagCalc = mostRecentPoint.offset - point.offset
val offsetLag = if (offsetLagCalc < 0) 0d else offsetLagCalc
val offsetLag = if (offsetLagCalc < 0) 0d else offsetLagCalc.toDouble

log.debug(
" Found time_lag=\"{}\" and offset_lag=\"{}\" for offset=\"{}\" in the lookup table ({}, {})",
@@ -473,7 +471,7 @@
Metrics.EarliestOffsetMetric,
config.cluster.name,
tp,
topicPoint.offset
topicPoint.offset.toDouble
)
}
}
@@ -489,7 +487,7 @@
Metrics.LatestOffsetMetric,
config.cluster.name,
tp,
point.offset
point.offset.toDouble
)
}

@@ -565,7 +563,7 @@
reporter ! Metrics.ClusterValueMessage(
Metrics.PollTimeMetric,
config.cluster.name,
metaData.pollTime
metaData.pollTime.toDouble
)
}
}
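Two recurring 2.13 patterns in this file: Map.filterKeys is deprecated (it now lives on MapView and is lazy), so the single-key case reads better as Map.get, which also composes directly in the for-comprehension; and the added .toDouble calls spell out the Long-to-Double widening instead of relying on the implicit widening that -Xlint flags. A simplified sketch with hypothetical names:

// Sketch: direct lookup instead of filterKeys, plus explicit numeric widening
val latestOffsets: Map[String, Long] = Map("topic-0" -> 42L)
val groupOffset = 40L

val offsetLag: Option[Double] =
  for (mostRecent <- latestOffsets.get("topic-0")) yield {   // replaces filterKeys(_ == key)
    val calc = mostRecent - groupOffset
    if (calc < 0) 0d else calc.toDouble                       // explicit Long => Double
  }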
2 changes: 1 addition & 1 deletion src/main/scala/com/lightbend/kafkalagexporter/Domain.scala
@@ -56,7 +56,7 @@ object Domain {
}

def clear(evictedTps: List[TopicPartition]): Unit =
tables = tables.filterKeys(tp => !evictedTps.contains(tp))
tables = tables.view.filterKeys(tp => !evictedTps.contains(tp)).toMap

def all: Map[TopicPartition, LookupTable] = tables
}
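Same filterKeys deprecation as above: on 2.13 the operation returns a lazy MapView, so the .view ... .toMap round trip keeps the reassignment strict rather than holding a view that captures evictedTps. In miniature, with hypothetical values:

// Sketch: strict key filtering on 2.13
var tables: Map[Int, String] = Map(1 -> "a", 2 -> "b")
val evicted = List(2)
tables = tables.view.filterKeys(tp => !evicted.contains(tp)).toMap   // .toMap forces a new strict Map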
@@ -11,8 +11,6 @@ import java.net.Socket
import java.io.PrintWriter
import scala.util.{Try, Success, Failure}

import scala.util.Try

object GraphiteEndpointSink {

def apply(
21 changes: 4 additions & 17 deletions src/main/scala/com/lightbend/kafkalagexporter/KafkaClient.scala
@@ -9,30 +9,17 @@ import java.time.Duration
import java.util.Properties
import java.util.concurrent.TimeUnit
import java.{lang, util}

import com.lightbend.kafkalagexporter.Domain.{GroupOffsets, PartitionOffsets}
import com.lightbend.kafkalagexporter.KafkaClient.{
AdminKafkaClientContract,
ConsumerKafkaClientContract,
KafkaClientContract
}
import com.lightbend.kafkalagexporter.KafkaClient.{AdminKafkaClientContract, ConsumerKafkaClientContract, KafkaClientContract}
import org.apache.kafka.clients.admin._
import org.apache.kafka.clients.consumer.{
ConsumerConfig,
KafkaConsumer,
OffsetAndMetadata
}
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer, OffsetAndMetadata}
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import org.apache.kafka.common.{
KafkaFuture,
TopicPartition => KafkaTopicPartition
}
import org.apache.kafka.common.{KafkaFuture, TopicPartition => KafkaTopicPartition}

import scala.collection.JavaConverters._
import scala.collection.immutable.Map
import scala.compat.java8.DurationConverters._
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.jdk.CollectionConverters.{CollectionHasAsScala, MapHasAsScala, SeqHasAsJava, SetHasAsJava, SetHasAsScala}
import scala.util.Try

object KafkaClient {
22 changes: 11 additions & 11 deletions src/main/scala/com/lightbend/kafkalagexporter/LookupTable.scala
@@ -92,7 +92,7 @@ object LookupTable {
) extends LookupTable {
import config._

val key = List(prefix, clusterName, tp.topic, tp.partition)
val key = List(prefix, clusterName, tp.topic, String.valueOf(tp.partition))
.mkString(separator)
val client = config.client

@@ -111,16 +111,16 @@
val times = client
.zrangebyscore(
key = key,
min = point.offset,
max = point.offset,
min = point.offset.toDouble,
max = point.offset.toDouble,
limit = Some((0, 2): (Int, Int))
)
.get
// remove points with the same offset
client.zremrangebyscore(
key = key,
start = point.offset,
end = point.offset
start = point.offset.toDouble,
end = point.offset.toDouble
)
// insert earliest + current points
client.zadd(key, point.offset.toDouble, times.minBy(_.toLong))
@@ -196,11 +196,11 @@
right = mostRecentPoint()
}

if (left.isRight && right.isRight) {
predict(offset, left.right.get, right.right.get)
} else {
TooFewPoints
}
(for {
l <- left
r <- right
} yield predict(offset, l, r))
.getOrElse(TooFewPoints)
}

mostRecentPoint() match {
@@ -337,7 +337,7 @@ object LookupTable {
// create a sliding window of 2 elements with a step size of 1
.sliding(size = 2, step = 1)
// convert window to a tuple. since we're iterating backwards we match right and left in reverse.
.map { case r :: l :: Nil => (l, r) }
.collect { case Seq(r, l) => (l, r) }
// find the Point that contains the offset
.find { case (l, r) => offset >= l.offset && offset <= r.offset }
// offset is not between any two points in the table
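Two 2.13-era cleanups in this file: Either has been right-biased since 2.12, so the deprecated .right.get calls give way to composing both lookups in a for-comprehension, with getOrElse(TooFewPoints) covering either Left; and the window pairing switches to collect with a Seq pattern, which avoids the non-exhaustive match that 2.13 warns about inside map and simply skips any window that does not hold exactly two points. A standalone sketch of both:

// Right-biased Either: compose instead of .right.get
val left: Either[String, Int]  = Right(1)
val right: Either[String, Int] = Right(3)
val sum: Int = (for { l <- left; r <- right } yield l + r).getOrElse(-1)   // a Left short-circuits to the fallback

// Pairing consecutive elements without a partial match inside map
val pairs = List(1, 2, 3).sliding(size = 2, step = 1).collect { case Seq(a, b) => (a, b) }.toList
// pairs == List((1,2), (2,3)); a lone size-1 window would simply be dropped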
@@ -41,9 +41,9 @@ class GraphiteEndpointSinkTest extends FixtureAnyFreeSpec with Matchers {
"GraphiteEndpointSinkImpl should" - {

"report only metrics which match the regex" in { fixture =>
val properties = Map(
val properties: Map[String, Any] = Map(
"reporters.graphite.host" -> "localhost",
"reporters.graphite.port" -> fixture.server.server.getLocalPort()
"reporters.graphite.port" -> fixture.server.server.getLocalPort
)
val sink = GraphiteEndpointSink(
new GraphiteEndpointConfig(
@@ -6,7 +6,6 @@
package com.lightbend.kafkalagexporter

import java.net.URL

import com.typesafe.config.ConfigFactory
import org.influxdb.dto.Query
import org.scalatest.concurrent.{Eventually, IntegrationPatience}
@@ -137,7 +136,7 @@ class InfluxDBPusherSinkSpec
new InfluxDBPusherSinkConfig(
"InfluxDBPusherSink",
List("kafka_consumergroup_group_max_lag"),
ConfigFactory.parseMap(mapAsJavaMap(fixture.properties))
ConfigFactory.parseMap(fixture.properties.asJava)
),
clustersGlobalValuesMap
)
@@ -6,26 +6,18 @@
package com.lightbend.kafkalagexporter

import java.util.Optional

import com.lightbend.kafkalagexporter.Domain.GroupOffsets
import com.lightbend.kafkalagexporter.KafkaClient.{
AdminKafkaClientContract,
ConsumerKafkaClientContract,
KafkaTopicPartitionOps
}
import com.lightbend.kafkalagexporter.KafkaClient.{AdminKafkaClientContract, ConsumerKafkaClientContract, KafkaTopicPartitionOps}
import org.apache.kafka.clients.admin._
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.{
ConsumerGroupState,
TopicPartition => KafkaTopicPartition
}
import org.apache.kafka.common.{ConsumerGroupState, TopicPartition => KafkaTopicPartition}
import org.mockito.{ArgumentMatchersSugar, MockitoSugar}
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.freespec.AnyFreeSpec
import org.scalatest.matchers.should.Matchers

import scala.collection.JavaConverters._
import scala.concurrent.{ExecutionContext, ExecutionContextExecutor, Future}
import scala.jdk.CollectionConverters.{IterableHasAsJava, MapHasAsJava, SeqHasAsJava, SetHasAsJava}

class KafkaClientSpec
extends AnyFreeSpec
@@ -121,7 +121,7 @@ class LookupTableSpec
)

tests.foreach(expected =>
table.lookup(expected) shouldBe Prediction(expected)
table.lookup(expected) shouldBe Prediction(expected.toDouble)
)
}

@@ -174,7 +174,7 @@
fail(s"Expected flat entries to compress to a single entry $table")
}

if (table.mostRecentPoint().right.get.time != 400) {
if (table.mostRecentPoint().toOption.get.time != 400) {
fail(s"Expected compressed table to have last timestamp $table")
}

@@ -304,9 +304,9 @@
)
}

if (n != result.right.get.offset) {
if (n != result.toOption.get.offset) {
fail(
s"Most recent point on $table expected $n, but got ${result.right.get.offset}"
s"Most recent point on $table expected $n, but got ${result.toOption.get.offset}"
)
}
}
@@ -368,7 +368,7 @@
)

tests.foreach(expected =>
table.lookup(expected) shouldBe Prediction(expected)
table.lookup(expected) shouldBe Prediction(expected.toDouble)
)
}

@@ -510,9 +510,9 @@
)
}

if (n != result.right.get.offset) {
if (n != result.toOption.get.offset) {
fail(
s"Most recent point on $table expected $n, but got ${result.right.get.offset}"
s"Most recent point on $table expected $n, but got ${result.toOption.get.offset}"
)
}
}
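The spec-side counterpart of the Either change: the deprecated .right projection disappears, and .toOption.get still throws on a Left, which is exactly the loud failure a test wants; the .toDouble on the expected values makes the Long-to-Double widening explicit (assuming Prediction carries a Double). In short:

// Sketch: reading the Right value in a test without the deprecated .right projection
val mostRecent: Either[String, Long] = Right(400L)
mostRecent.toOption.get   // 400L; throws NoSuchElementException on a Left, failing the test loudly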
@@ -6,17 +6,17 @@
package com.lightbend.kafkalagexporter

import java.net.ServerSocket

import io.prometheus.client.CollectorRegistry
import io.prometheus.client.exporter.HTTPServer

import scala.collection.JavaConverters._
import scala.util.{Failure, Success, Try}
import com.typesafe.config.{Config, ConfigFactory}
import org.scalatest.Outcome
import org.scalatest.freespec.FixtureAnyFreeSpec
import org.scalatest.matchers.should.Matchers

import scala.jdk.CollectionConverters.{EnumerationHasAsScala, ListHasAsScala}

class PrometheusEndpointSinkSpec extends FixtureAnyFreeSpec with Matchers {

case class Fixture(