Skip to content

Commit

Permalink
Cats Effect 3 context
Browse files Browse the repository at this point in the history
  • Loading branch information
voropaevp committed Dec 12, 2021
1 parent dc80130 commit 54c2323
Show file tree
Hide file tree
Showing 16 changed files with 709 additions and 3 deletions.
43 changes: 40 additions & 3 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ lazy val codegenModules = Seq[sbt.ClasspathDep[sbt.ProjectReference]](
)

lazy val bigdataModules = Seq[sbt.ClasspathDep[sbt.ProjectReference]](
`quill-cassandra`, `quill-cassandra-lagom`, `quill-cassandra-monix`, `quill-cassandra-zio`, `quill-orientdb`, `quill-spark`
`quill-cassandra`, `quill-cassandra-lagom`, `quill-cassandra-monix`, `quill-cassandra-zio`, `quill-orientdb`, `quill-spark`, `quill-cassandra-ce`
)

lazy val allModules =
Expand All @@ -76,6 +76,7 @@ lazy val scala213Modules = baseModules ++ jsModules ++ dbModules ++ codegenModul
`quill-async-postgres`,
`quill-finagle-mysql`,
`quill-cassandra`,
`quill-cassandra-ce`,
`quill-cassandra-lagom`,
`quill-cassandra-monix`,
`quill-cassandra-zio`,
Expand Down Expand Up @@ -391,8 +392,8 @@ lazy val `quill-monix` =
.settings(
Test / fork := true,
libraryDependencies ++= Seq(
"io.monix" %% "monix-eval" % "3.0.0",
"io.monix" %% "monix-reactive" % "3.0.0"
"io.monix" %% "monix-eval" % "3.0.0" excludeAll(ExclusionRule(organization = "org.typelevel")),
"io.monix" %% "monix-reactive" % "3.0.0" excludeAll(ExclusionRule(organization = "org.typelevel")),
)
)
.dependsOn(`quill-core-jvm` % "compile->compile;test->test")
Expand Down Expand Up @@ -434,6 +435,31 @@ lazy val `quill-zio` =
.dependsOn(`quill-core-jvm` % "compile->compile;test->test")
.enablePlugins(MimaPlugin)

// Core Cats Effect 3 integration module: effect-polymorphic context support
// (cats-core / cats-effect / fs2) for Scala 2.12+.
lazy val `quill-ce` =
  (project in file("quill-ce"))
    .settings(commonSettings: _*)
    .settings(mimaSettings: _*)
    .settings(
      // -Ypartial-unification exists only on Scala 2.12 and earlier.
      scalacOptions ++= {
        CrossVersion.partialVersion(scalaVersion.value) match {
          case Some((2, minor)) if minor <= 12 => Seq("-Ypartial-unification")
          case _                               => Seq.empty
        }
      },
      // The CE/fs2 stack is only published for 2.12+; older Scala gets no deps.
      libraryDependencies ++= {
        val ceDependencies = Seq(
          "org.typelevel" %% "cats-core"   % "2.3.0",
          "org.typelevel" %% "cats-effect" % "3.1.1",
          "co.fs2"        %% "fs2-core"    % "3.0.4"
        )
        CrossVersion.partialVersion(scalaVersion.value) match {
          case Some((2, minor)) if minor >= 12 => ceDependencies
          case _                               => Seq.empty
        }
      }
    )
    .dependsOn(`quill-core-jvm` % "compile->compile;test->test")
    .enablePlugins(MimaPlugin)

lazy val `quill-jdbc-zio` =
(project in file("quill-jdbc-zio"))
.settings(commonSettings: _*)
Expand Down Expand Up @@ -666,6 +692,16 @@ lazy val `quill-cassandra-zio` =
.dependsOn(`quill-zio` % "compile->compile;test->test")
.enablePlugins(MimaPlugin)

// Cassandra driver bindings for the Cats Effect 3 context.
lazy val `quill-cassandra-ce` =
  (project in file("quill-cassandra-ce"))
    .settings(commonSettings: _*)
    .settings(mimaSettings: _*)
    .settings(Test / fork := true)
    .dependsOn(
      `quill-cassandra` % "compile->compile;test->test",
      `quill-ce`        % "compile->compile;test->test"
    )
    .enablePlugins(MimaPlugin)

lazy val `quill-cassandra-lagom` =
(project in file("quill-cassandra-lagom"))
Expand All @@ -686,6 +722,7 @@ lazy val `quill-cassandra-lagom` =
) ++ versionSpecificDependencies
}
)
.dependsOn(`quill-cassandra` % "compile->compile;test->test")
.enablePlugins(MimaPlugin)


Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package io.getquill

import cats._
import cats.effect._
import com.datastax.oss.driver.api.core.cql.Row
import cats.syntax.all._
import com.datastax.oss.driver.api.core.CqlSession
import com.datastax.oss.driver.api.core.cql.AsyncResultSet
import fs2.{ Chunk, Stream }
import com.typesafe.config.Config
import io.getquill.context.cassandra.CqlIdiom
import io.getquill.util.{ ContextLogger, LoadConfig }
import io.getquill.context.ExecutionInfo
import io.getquill.context.ce.CeContext

import scala.jdk.CollectionConverters._
import scala.language.higherKinds

/**
 * Quill Cassandra context whose operations run in a polymorphic Cats Effect 3
 * effect type `F[_]` (anything with an `Async` instance) and whose streaming
 * results are fs2 `Stream`s.
 *
 * @param naming                     naming strategy applied to generated CQL
 * @param session                    DataStax driver session used to prepare/execute statements
 * @param preparedStatementCacheSize cache size passed to the underlying session context
 * @param af                         `Async` instance used to suspend driver calls into `F`
 */
class CassandraCeContext[N <: NamingStrategy, F[_]](
  naming: N,
  session: CqlSession,
  preparedStatementCacheSize: Long
)(implicit val af: Async[F])
  extends CassandraCqlSessionContext[N](naming, session, preparedStatementCacheSize)
  with CeContext[CqlIdiom, N, F] {

  private val logger = ContextLogger(classOf[CassandraCeContext[_, F]])

  /**
   * Asynchronously prepares `cql`, applies `prepare` to bind its parameters, and
   * logs the query with those parameters before yielding the bound statement.
   * The driver Future is created lazily inside `delay`, so nothing runs until
   * the returned `F` is executed.
   */
  private[getquill] def prepareRowAndLog(cql: String, prepare: Prepare = identityPrepare): F[PrepareRow] = for {
    ec <- Async[F].executionContext
    futureStatement = Sync[F].delay(prepareAsync(cql)(ec))
    prepStatement <- Async[F].fromFuture(futureStatement)
    (params, bs) = prepare(prepStatement, this)
    _ <- Sync[F].delay(logger.logQuery(cql, params))
  } yield bs

  // Unfolds the rows of `rs` into an fs2 Stream, emitting one chunk per step and
  // terminating once `rs.remaining()` reports zero rows.
  // NOTE(review): only `rs.currentPage()` is consumed and `fetchNextPage` is never
  // called, so results spanning multiple driver pages appear truncated to the first
  // page — confirm against the driver's paging semantics and the configured page size.
  protected def page(rs: AsyncResultSet): Stream[F, Row] =
    Stream.unfoldChunkEval(rs.remaining())(rem =>
      if (rem > 0)
        af.delay[Option[(Chunk[Row], Int)]] {
          val chunk: Chunk[Row] = Chunk.iterable(rs.currentPage().asScala)
          Some((chunk, rs.remaining()))
        }
      else
        af.pure[Option[(Chunk[Row], Int)]](None))

  /**
   * Streams query results: prepare+log the statement, execute it asynchronously
   * (via the driver's CompletableFuture), then page rows through `extractor`.
   */
  def streamQuery[T](cql: String, prepare: Prepare = identityPrepare, extractor: Extractor[T] = identityExtractor)(info: ExecutionInfo, dc: Runner): StreamResult[T] = {
    Stream
      .eval(prepareRowAndLog(cql, prepare))
      .evalMap(p => af.fromCompletableFuture(af.delay(session.executeAsync(p).toCompletableFuture)))
      .flatMap(page)
      .map(it => extractor(it, this))
  }

  /** Runs the query as a stream and collects every extracted row into a List. */
  def executeQuery[T](cql: String, prepare: Prepare = identityPrepare, extractor: Extractor[T] = identityExtractor)(info: ExecutionInfo, dc: Runner): Result[RunQueryResult[T]] =
    streamQuery[T](cql, prepare, extractor)(info, dc).compile.toList

  /** Runs the query and reduces the collected rows via `handleSingleResult`. */
  def executeQuerySingle[T](cql: String, prepare: Prepare = identityPrepare, extractor: Extractor[T] = identityExtractor)(info: ExecutionInfo, dc: Runner): Result[RunQuerySingleResult[T]] =
    Functor[F].map(executeQuery(cql, prepare, extractor)(info, dc))(handleSingleResult)

  /** Executes a statement for its side effect only; the driver's result set is discarded. */
  def executeAction(cql: String, prepare: Prepare = identityPrepare)(info: ExecutionInfo, dc: Runner): Result[RunActionResult] = {
    prepareRowAndLog(cql, prepare)
      .flatMap(r => af.fromCompletableFuture(af.delay(session.executeAsync(r).toCompletableFuture)))
      .map(_ => ())
  }

  /**
   * Executes every statement of every batch group sequentially (traverse_),
   * discarding individual results.
   */
  def executeBatchAction(groups: List[BatchGroup])(info: ExecutionInfo, dc: Runner): Result[RunBatchActionResult] =
    groups.traverse_ {
      case BatchGroup(cql, prepare) =>
        prepare.traverse_(executeAction(cql, _)(info, dc))
    }
}

/** Factory methods for [[CassandraCeContext]] covering the usual configuration entry points. */
object CassandraCeContext {

  /** Builds a context from an already-parsed [[CassandraContextConfig]]. */
  def apply[N <: NamingStrategy, F[_]: Async: FlatMap](naming: N, config: CassandraContextConfig): CassandraCeContext[N, F] = {
    val context = new CassandraCeContext[N, F](naming, config.session, config.preparedStatementCacheSize)
    context
  }

  /** Builds a context from a raw Typesafe `Config` block. */
  def apply[N <: NamingStrategy, F[_]: Async: FlatMap](naming: N, config: Config): CassandraCeContext[N, F] =
    apply[N, F](naming, CassandraContextConfig(config))

  /** Builds a context by loading the configuration section named `configPrefix`. */
  def apply[N <: NamingStrategy, F[_]: Async: FlatMap](naming: N, configPrefix: String): CassandraCeContext[N, F] =
    apply[N, F](naming, LoadConfig(configPrefix))

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package io.getquill

import cats._
import cats.effect._
import cats.syntax.all._
import fs2.{Chunk, Stream}
import com.typesafe.config.Config
import com.datastax.oss.driver.api.core.cql.Row
import com.datastax.oss.driver.api.core.CqlSession
import com.datastax.oss.driver.api.core.cql.AsyncResultSet
import io.getquill.context.cassandra.CqlIdiom
import io.getquill.util.{ContextLogger, LoadConfig}
import io.getquill.context.ExecutionInfo
import io.getquill.context.ce.CeContext

import scala.jdk.CollectionConverters._
import scala.language.higherKinds

/**
 * Quill Cassandra context whose operations run in a polymorphic Cats Effect 3
 * effect type `F[_]` (anything with an `Async` instance) and whose streaming
 * results are fs2 `Stream`s.
 *
 * @param naming                     naming strategy applied to generated CQL
 * @param session                    DataStax driver session used to prepare/execute statements
 * @param preparedStatementCacheSize cache size passed to the underlying session context
 * @param af                         `Async` instance used to suspend driver calls into `F`
 */
class CassandraCeContext[N <: NamingStrategy, F[_]](
  naming: N,
  session: CqlSession,
  preparedStatementCacheSize: Long
)(implicit val af: Async[F])
  extends CassandraCqlSessionContext[N](naming, session, preparedStatementCacheSize)
  with CeContext[CqlIdiom, N, F] {

  private val logger = ContextLogger(classOf[CassandraCeContext[_, F]])

  /**
   * Asynchronously prepares `cql`, applies `prepare` to bind its parameters, and
   * logs the query with those parameters before yielding the bound statement.
   * The driver Future is created lazily inside `delay`, so nothing runs until
   * the returned `F` is executed.
   */
  private[getquill] def prepareRowAndLog(cql: String, prepare: Prepare = identityPrepare): F[PrepareRow] = for {
    ec <- Async[F].executionContext
    futureStatement = Sync[F].delay(prepareAsync(cql)(ec))
    prepStatement <- Async[F].fromFuture(futureStatement)
    (params, bs) = prepare(prepStatement, this)
    _ <- Sync[F].delay(logger.logQuery(cql, params))
  } yield bs

  // Unfolds the rows of `rs` into an fs2 Stream, emitting one chunk per step and
  // terminating once `rs.remaining()` reports zero rows.
  // NOTE(review): only `rs.currentPage()` is consumed and `fetchNextPage` is never
  // called, so results spanning multiple driver pages appear truncated to the first
  // page — confirm against the driver's paging semantics and the configured page size.
  protected def page(rs: AsyncResultSet): Stream[F, Row] =
    Stream.unfoldChunkEval(rs.remaining())(rem =>
      if (rem > 0)
        af.delay(Some(Chunk.iterable(rs.currentPage().asScala), rs.remaining()))
      else
        af.pure(None)
    )

  /**
   * Streams query results: prepare+log the statement, execute it asynchronously
   * (via the driver's CompletableFuture), then page rows through `extractor`.
   */
  def streamQuery[T](cql: String, prepare: Prepare = identityPrepare, extractor: Extractor[T] = identityExtractor)(info: ExecutionInfo, dc: Runner): StreamResult[T] = {
    Stream
      .eval(prepareRowAndLog(cql, prepare))
      .evalMap(p => af.fromCompletableFuture(af.delay(session.executeAsync(p).toCompletableFuture)))
      .flatMap(page)
      .map(it => extractor(it, this))
  }

  /** Runs the query as a stream and collects every extracted row into a List. */
  def executeQuery[T](cql: String, prepare: Prepare = identityPrepare, extractor: Extractor[T] = identityExtractor)(info: ExecutionInfo, dc: Runner): Result[RunQueryResult[T]] =
    streamQuery[T](cql, prepare, extractor)(info, dc).compile.toList

  /** Runs the query and reduces the collected rows via `handleSingleResult`. */
  def executeQuerySingle[T](cql: String, prepare: Prepare = identityPrepare, extractor: Extractor[T] = identityExtractor)(info: ExecutionInfo, dc: Runner): Result[RunQuerySingleResult[T]] =
    Functor[F].map(executeQuery(cql, prepare, extractor)(info, dc))(handleSingleResult)

  /** Executes a statement for its side effect only; the driver's result set is discarded. */
  def executeAction(cql: String, prepare: Prepare = identityPrepare)(info: ExecutionInfo, dc: Runner): Result[RunActionResult] = {
    prepareRowAndLog(cql, prepare)
      .flatMap(r => af.fromCompletableFuture(af.delay(session.executeAsync(r).toCompletableFuture)))
      .map(_ => ())
  }

  /**
   * Executes every statement of every batch group sequentially (traverse_),
   * discarding individual results.
   */
  def executeBatchAction(groups: List[BatchGroup])(info: ExecutionInfo, dc: Runner): Result[RunBatchActionResult] =
    groups.traverse_ {
      case BatchGroup(cql, prepare) =>
        prepare.traverse_(executeAction(cql, _)(info, dc))
    }
}

/** Factory methods for [[CassandraCeContext]] covering the usual configuration entry points. */
object CassandraCeContext {

  /** Builds a context from an already-parsed [[CassandraContextConfig]]. */
  def apply[N <: NamingStrategy, F[_] : Async : FlatMap](naming: N, config: CassandraContextConfig): CassandraCeContext[N, F] = {
    val context = new CassandraCeContext[N, F](naming, config.session, config.preparedStatementCacheSize)
    context
  }

  /** Builds a context from a raw Typesafe `Config` block. */
  def apply[N <: NamingStrategy, F[_] : Async : FlatMap](naming: N, config: Config): CassandraCeContext[N, F] =
    apply[N, F](naming, CassandraContextConfig(config))

  /** Builds a context by loading the configuration section named `configPrefix`. */
  def apply[N <: NamingStrategy, F[_] : Async : FlatMap](naming: N, configPrefix: String): CassandraCeContext[N, F] =
    apply[N, F](naming, LoadConfig(configPrefix))

}
12 changes: 12 additions & 0 deletions quill-cassandra-ce/src/test/resources/application.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Cassandra connection used by the quill-cassandra-ce test suite.
testStreamDB {
  # Number of prepared statements the context may cache.
  preparedStatementCacheSize=1
  keyspace=quill_test

  # Settings forwarded to the DataStax driver session.
  session {
    # ${?VAR} entries are optional HOCON substitutions: each resolves only when
    # the corresponding environment variable is set.
    basic.contact-points = [ ${?CASSANDRA_CONTACT_POINT_0}, ${?CASSANDRA_CONTACT_POINT_1} ]
    basic.load-balancing-policy.local-datacenter = ${?CASSANDRA_DC}
    basic.request.consistency = LOCAL_QUORUM
    # NOTE(review): presumably a deliberately small page size so tests exercise
    # the paging path — confirm against the paging implementation.
    basic.request.page-size = 999
  }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package io.getquill.context.cassandra.catEffect

import io.getquill._

class DecodeNullSpec extends Spec {

  "no default values when reading null" - {
    "stream" in {
      import io.getquill.context.cassandra.catsEffect.testCeDB
      import io.getquill.context.cassandra.catsEffect.testCeDB._
      import cats.effect.unsafe.implicits.global

      // Write through a schema whose `value` column is optional, then read the
      // same table back into an entity whose `value` is a non-optional Int.
      val writeSchema = quote(querySchema[DecodeNullTestWriteEntity]("DecodeNullTestEntity"))

      val roundTrip =
        for {
          _    <- testCeDB.run(writeSchema.delete)
          _    <- testCeDB.run(writeSchema.insert(lift(insertValue)))
          rows <- testCeDB.run(query[DecodeNullTestEntity])
        } yield rows

      // Decoding a null column into a non-optional field must raise, not default.
      intercept[IllegalStateException] {
        await(roundTrip.unsafeToFuture())
      }
    }
  }

  case class DecodeNullTestEntity(id: Int, value: Int)

  case class DecodeNullTestWriteEntity(id: Int, value: Option[Int])

  val insertValue = DecodeNullTestWriteEntity(0, None)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package io.getquill.context.cassandra.catEffect

import io.getquill.Query
import io.getquill.context.cassandra.EncodingSpecHelper
import io.getquill.context.cassandra.catsEffect.testCeDB._
import io.getquill.context.cassandra.catsEffect.testCeDB
import cats.effect.unsafe.implicits.global

class EncodingSpec extends EncodingSpecHelper {

  "encodes and decodes types" - {
    "stream" in {
      // Round-trip the fixture rows: clear, insert, read back, then verify.
      val roundTrip =
        for {
          _    <- testCeDB.run(query[EncodingTestEntity].delete)
          _    <- testCeDB.run(liftQuery(insertValues).foreach(e => query[EncodingTestEntity].insert(e)))
          rows <- testCeDB.run(query[EncodingTestEntity])
        } yield rows
      verify(await(roundTrip.unsafeToFuture()))
    }
  }

  "encodes collections" - {
    "stream" in {
      // Filter by a lifted collection of ids to exercise collection encoding.
      val byIds = quote { (list: Query[Int]) =>
        query[EncodingTestEntity].filter(t => list.contains(t.id))
      }
      val roundTrip =
        for {
          _    <- testCeDB.run(query[EncodingTestEntity].delete)
          _    <- testCeDB.run(liftQuery(insertValues).foreach(e => query[EncodingTestEntity].insert(e)))
          rows <- testCeDB.run(byIds(liftQuery(insertValues.map(_.id))))
        } yield rows
      verify(await(roundTrip.unsafeToFuture()))
    }
  }
}
Loading

0 comments on commit 54c2323

Please sign in to comment.