Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cats Effect 3 context #2343

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .dockerignore
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
*
*
!/quill-cassandra/src/test/cql/cassandra-schema.cql
!/build/cassandra-entrypoint.sh
43 changes: 40 additions & 3 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ lazy val codegenModules = Seq[sbt.ClasspathDep[sbt.ProjectReference]](
)

lazy val bigdataModules = Seq[sbt.ClasspathDep[sbt.ProjectReference]](
`quill-cassandra`, `quill-cassandra-lagom`, `quill-cassandra-monix`, `quill-cassandra-zio`, `quill-orientdb`, `quill-spark`
`quill-cassandra`, `quill-cassandra-lagom`, `quill-cassandra-monix`, `quill-cassandra-zio`, `quill-orientdb`, `quill-spark`, `quill-cassandra-ce`
)

lazy val allModules =
Expand All @@ -76,6 +76,7 @@ lazy val scala213Modules = baseModules ++ jsModules ++ dbModules ++ codegenModul
`quill-async-postgres`,
`quill-finagle-mysql`,
`quill-cassandra`,
`quill-cassandra-ce`,
`quill-cassandra-lagom`,
`quill-cassandra-monix`,
`quill-cassandra-zio`,
Expand Down Expand Up @@ -391,8 +392,8 @@ lazy val `quill-monix` =
.settings(
Test / fork := true,
libraryDependencies ++= Seq(
"io.monix" %% "monix-eval" % "3.0.0",
"io.monix" %% "monix-reactive" % "3.0.0"
"io.monix" %% "monix-eval" % "3.0.0" excludeAll(ExclusionRule(organization = "org.typelevel")),
"io.monix" %% "monix-reactive" % "3.0.0" excludeAll(ExclusionRule(organization = "org.typelevel")),
)
)
.dependsOn(`quill-core-jvm` % "compile->compile;test->test")
Expand Down Expand Up @@ -434,6 +435,31 @@ lazy val `quill-zio` =
.dependsOn(`quill-core-jvm` % "compile->compile;test->test")
.enablePlugins(MimaPlugin)

// Core Cats Effect 3 module: an effect-polymorphic quill context built on
// cats-effect `Async` and fs2 streams. Database-specific modules (e.g.
// quill-cassandra-ce) depend on this.
lazy val `quill-ce` =
(project in file("quill-ce"))
.settings(commonSettings: _*)
.settings(mimaSettings: _*)
.settings(
scalacOptions ++= {
// -Ypartial-unification is required for higher-kinded type inference on
// 2.12 and earlier; it is the default (and removed as a flag) from 2.13 on.
CrossVersion.partialVersion(scalaVersion.value) match {
case Some((2, major)) if major <= 12 => Seq("-Ypartial-unification")
case _ => Seq.empty
}
},
libraryDependencies ++= {
// cats-effect 3.x / fs2 3.x are only published for 2.12+, so the module
// compiles to an empty dependency set on older Scala versions.
CrossVersion.partialVersion(scalaVersion.value) match {
case Some((2, x)) if x >= 12 => Seq(
"org.typelevel" %% "cats-core" % "2.3.0",
"org.typelevel" %% "cats-effect" % "3.1.1",
"co.fs2" %% "fs2-core" % "3.0.4",
)
case _ => Seq.empty
}
}
)
.dependsOn(`quill-core-jvm` % "compile->compile;test->test")
.enablePlugins(MimaPlugin)

lazy val `quill-jdbc-zio` =
(project in file("quill-jdbc-zio"))
.settings(commonSettings: _*)
Expand Down Expand Up @@ -666,6 +692,16 @@ lazy val `quill-cassandra-zio` =
.dependsOn(`quill-zio` % "compile->compile;test->test")
.enablePlugins(MimaPlugin)

// Cassandra binding for the Cats Effect 3 context: combines the shared
// Cassandra session/encoding machinery with the effect-polymorphic quill-ce
// context. Tests are forked since they talk to a live Cassandra instance.
lazy val `quill-cassandra-ce` =
(project in file("quill-cassandra-ce"))
.settings(commonSettings: _*)
.settings(mimaSettings: _*)
.settings(
Test / fork := true
)
.dependsOn(`quill-cassandra` % "compile->compile;test->test")
.dependsOn(`quill-ce` % "compile->compile;test->test")
.enablePlugins(MimaPlugin)

lazy val `quill-cassandra-lagom` =
(project in file("quill-cassandra-lagom"))
Expand All @@ -686,6 +722,7 @@ lazy val `quill-cassandra-lagom` =
) ++ versionSpecificDependencies
}
)
.dependsOn(`quill-cassandra` % "compile->compile;test->test")
.enablePlugins(MimaPlugin)


Expand Down
6 changes: 6 additions & 0 deletions build/Dockerfile-cassandra
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,9 @@ FROM cassandra:3.11.11
MAINTAINER [email protected]

RUN apt-get update; DEBIAN_FRONTEND=noninteractive apt-get install -y --no-install-recommends default-jdk
ENV LANG C.UTF-8
COPY ./build/cassandra-entrypoint.sh /entrypoint.sh
RUN chmod 755 /entrypoint.sh
COPY ./quill-cassandra/src/test/cql/*.cql /docker-entrypoint-initdb.d/
ENTRYPOINT ["/entrypoint.sh"]
CMD ["cassandra", "-f"]
9 changes: 0 additions & 9 deletions build/build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -141,15 +141,6 @@ function wait_for_bigdata() {
sbt scalariformFormat test:scalariformFormat
sbt checkUnformattedFiles
sbt $SBT_ARGS quill-coreJVM/test:compile & COMPILE=$!
./build/setup_bigdata.sh & SETUP=$!

wait $SETUP
if [[ "$?" != "0" ]]; then
echo "build.sh =:> BigData Database setup failed"
sleep 10
kill -9 $COMPILE
exit 1
fi

wait $COMPILE
if [[ "$?" != "0" ]]; then
Expand Down
65 changes: 65 additions & 0 deletions build/cassandra-entrypoint.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
#!/usr/bin/env bash
##
## This script will generate a patched docker-entrypoint.sh that:
## - executes any *.sh script found in /docker-entrypoint-initdb.d
## - boots cassandra up
## - executes any *.cql script found in docker-entrypoint-initdb.d
##
## It is compatible with any cassandra:* image
##


## Create script that executes files found in docker-entrypoint-initdb.d/
# NOTE(review): the heredoc is appended (>>) rather than truncated (>). That is
# only safe because this runs once per image build/container start with no
# pre-existing /run-init-scripts.sh — confirm if this script is ever re-run.
# The quoted 'EOF' delimiter prevents variable expansion, so $LOCK, $INIT_DIR,
# $f etc. below are written literally and evaluated when the generated script runs.

cat <<'EOF' >> /run-init-scripts.sh
#!/usr/bin/env bash

LOCK=/var/lib/cassandra/_init.done
INIT_DIR=docker-entrypoint-initdb.d

if [ -f "$LOCK" ]; then
echo "@@ Initialization already performed."
exit 0
fi

cd $INIT_DIR

echo "@@ Executing bash scripts found in $INIT_DIR"

# execute scripts found in INIT_DIR
for f in $(find . -type f -name "*.sh" -executable -print | sort); do
echo "$0: sourcing $f"
. "$f"
echo "$0: $f executed."
done

# wait for cassandra to be ready and execute cql in background
(
while ! cqlsh -e 'describe cluster' > /dev/null 2>&1; do sleep 6; done
echo "$0: Cassandra cluster ready: executing cql scripts found in $INIT_DIR"
for f in $(find . -type f -name "*.cql" -print | sort); do
echo "$0: running $f"
cqlsh -f "$f"
echo "$0: $f executed"
done
# mark things as initialized (in case /var/lib/cassandra was mapped to a local folder)
touch $LOCK
) &

EOF

## Patch existing entrypoint to call our script in the background
# This has been inspired by https://www.thetopsites.net/article/51594713.shtml
# `sed '$ d'` drops the stock entrypoint's final line (presumably its
# `exec "$@"` — verify against the base image) so we can launch our init
# script in the background and then re-append the exec ourselves.
EP=/patched-entrypoint.sh
sed '$ d' /docker-entrypoint.sh > $EP
cat <<'EOF' >> $EP
/run-init-scripts.sh &
exec "$@"
EOF

# Make both scripts executable
chmod +x /run-init-scripts.sh
chmod +x $EP

# Call the new entrypoint, forwarding the original container arguments
# (e.g. `cassandra -f` from the Dockerfile CMD).
$EP "$@"
15 changes: 0 additions & 15 deletions build/setup_bigdata.sh

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package io.getquill

import cats._
import cats.effect._
import com.datastax.oss.driver.api.core.cql.Row
import cats.syntax.all._
import com.datastax.oss.driver.api.core.CqlSession
import com.datastax.oss.driver.api.core.cql.AsyncResultSet
import fs2.{ Chunk, Stream }
import com.typesafe.config.Config
import io.getquill.context.cassandra.CqlIdiom
import io.getquill.util.{ ContextLogger, LoadConfig }
import io.getquill.context.ExecutionInfo
import io.getquill.context.ce.CeContext

import scala.jdk.CollectionConverters._
import scala.language.higherKinds

/**
 * A Cassandra context whose operations run in an arbitrary effect type `F`
 * with a cats-effect 3 `Async` instance, streaming results via fs2.
 *
 * @param naming the naming strategy applied to queries
 * @param session the shared DataStax `CqlSession`
 * @param preparedStatementCacheSize max entries in the prepared-statement cache
 */
class CassandraCeContext[N <: NamingStrategy, F[_]](
  naming:                     N,
  session:                    CqlSession,
  preparedStatementCacheSize: Long
)(implicit val af: Async[F])
  extends CassandraCqlSessionContext[N](naming, session, preparedStatementCacheSize)
  with CeContext[CqlIdiom, N, F] {

  private val logger = ContextLogger(classOf[CassandraCeContext[_, F]])

  /**
   * Asynchronously prepares (and caches) the statement for `cql`, binds it via
   * `prepare`, and logs the query together with its bound parameters.
   */
  private[getquill] def prepareRowAndLog(cql: String, prepare: Prepare = identityPrepare): F[PrepareRow] = for {
    ec <- Async[F].executionContext
    futureStatement = Sync[F].delay(prepareAsync(cql)(ec))
    prepStatement <- Async[F].fromFuture(futureStatement)
    (params, bs) = prepare(prepStatement, this)
    _ <- Sync[F].delay(logger.logQuery(cql, params))
  } yield bs

  /**
   * Streams every row of the result set, page by page.
   *
   * Emits the rows of the current page as one chunk and, while the server
   * reports further pages (`hasMorePages`), fetches the next page
   * asynchronously and recurses. The previous implementation never called
   * `fetchNextPage`, so any result set larger than the configured fetch size
   * was silently truncated to its first page.
   */
  protected def page(rs: AsyncResultSet): Stream[F, Row] = {
    val currentPage = Stream.chunk(Chunk.iterable(rs.currentPage().asScala))
    if (rs.hasMorePages)
      currentPage ++ Stream
        .eval(af.fromCompletableFuture(af.delay(rs.fetchNextPage().toCompletableFuture)))
        .flatMap(page)
    else
      currentPage
  }

  /** Runs `cql` and streams the extracted results without materializing them. */
  def streamQuery[T](cql: String, prepare: Prepare = identityPrepare, extractor: Extractor[T] = identityExtractor)(info: ExecutionInfo, dc: Runner): StreamResult[T] = {
    Stream
      .eval(prepareRowAndLog(cql, prepare))
      .evalMap(p => af.fromCompletableFuture(af.delay(session.executeAsync(p).toCompletableFuture)))
      .flatMap(page)
      .map(it => extractor(it, this))
  }

  /** Runs `cql` and collects all extracted rows into a `List`. */
  def executeQuery[T](cql: String, prepare: Prepare = identityPrepare, extractor: Extractor[T] = identityExtractor)(info: ExecutionInfo, dc: Runner): Result[RunQueryResult[T]] =
    streamQuery[T](cql, prepare, extractor)(info, dc).compile.toList

  /** Runs `cql` expecting exactly one row; delegates cardinality handling to `handleSingleResult`. */
  def executeQuerySingle[T](cql: String, prepare: Prepare = identityPrepare, extractor: Extractor[T] = identityExtractor)(info: ExecutionInfo, dc: Runner): Result[RunQuerySingleResult[T]] =
    Functor[F].map(executeQuery(cql, prepare, extractor)(info, dc))(handleSingleResult)

  /** Executes a single mutating statement, discarding the result set. */
  def executeAction(cql: String, prepare: Prepare = identityPrepare)(info: ExecutionInfo, dc: Runner): Result[RunActionResult] = {
    prepareRowAndLog(cql, prepare)
      .flatMap(r => af.fromCompletableFuture(af.delay(session.executeAsync(r).toCompletableFuture)))
      .map(_ => ())
  }

  /** Executes each batch group's prepared statements sequentially, discarding results. */
  def executeBatchAction(groups: List[BatchGroup])(info: ExecutionInfo, dc: Runner): Result[RunBatchActionResult] =
    groups.traverse_ {
      case BatchGroup(cql, prepare) =>
        prepare.traverse_(executeAction(cql, _)(info, dc))
    }
}

/** Factory methods for constructing a [[CassandraCeContext]] from various configuration sources. */
object CassandraCeContext {

  /** Builds a context from an already-materialized `CassandraContextConfig`. */
  def apply[N <: NamingStrategy, F[_]: Async: FlatMap](naming: N, config: CassandraContextConfig): CassandraCeContext[N, F] =
    new CassandraCeContext[N, F](naming, config.session, config.preparedStatementCacheSize)

  /** Builds a context from a raw Typesafe `Config`. */
  def apply[N <: NamingStrategy, F[_]: Async: FlatMap](naming: N, config: Config): CassandraCeContext[N, F] =
    apply[N, F](naming, CassandraContextConfig(config))

  /** Builds a context by loading the configuration section at `configPrefix`. */
  def apply[N <: NamingStrategy, F[_]: Async: FlatMap](naming: N, configPrefix: String): CassandraCeContext[N, F] =
    apply[N, F](naming, LoadConfig(configPrefix))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package io.getquill

import cats._
import cats.effect._
import com.datastax.oss.driver.api.core.cql.Row
import cats.syntax.all._
import com.datastax.oss.driver.api.core.CqlSession
import com.datastax.oss.driver.api.core.cql.AsyncResultSet
import fs2.{ Chunk, Stream }
import com.typesafe.config.Config
import io.getquill.context.cassandra.CqlIdiom
import io.getquill.util.{ ContextLogger, LoadConfig }
import io.getquill.context.ExecutionInfo
import io.getquill.context.ce.CeContext

import scala.jdk.CollectionConverters._
import scala.language.higherKinds

/**
 * A Cassandra context whose operations run in an arbitrary effect type `F`
 * with a cats-effect 3 `Async` instance, streaming results via fs2.
 *
 * @param naming the naming strategy applied to queries
 * @param session the shared DataStax `CqlSession`
 * @param preparedStatementCacheSize max entries in the prepared-statement cache
 */
class CassandraCeContext[N <: NamingStrategy, F[_]](
  naming:                     N,
  session:                    CqlSession,
  preparedStatementCacheSize: Long
)(implicit val af: Async[F])
  extends CassandraCqlSessionContext[N](naming, session, preparedStatementCacheSize)
  with CeContext[CqlIdiom, N, F] {

  private val logger = ContextLogger(classOf[CassandraCeContext[_, F]])

  /**
   * Asynchronously prepares (and caches) the statement for `cql`, binds it via
   * `prepare`, and logs the query together with its bound parameters.
   */
  private[getquill] def prepareRowAndLog(cql: String, prepare: Prepare = identityPrepare): F[PrepareRow] = for {
    ec <- Async[F].executionContext
    futureStatement = Sync[F].delay(prepareAsync(cql)(ec))
    prepStatement <- Async[F].fromFuture(futureStatement)
    (params, bs) = prepare(prepStatement, this)
    _ <- Sync[F].delay(logger.logQuery(cql, params))
  } yield bs

  /**
   * Streams every row of the result set, page by page.
   *
   * Emits the rows of the current page as one chunk and, while the server
   * reports further pages (`hasMorePages`), fetches the next page
   * asynchronously and recurses. The previous implementation never called
   * `fetchNextPage`, so any result set larger than the configured fetch size
   * was silently truncated to its first page.
   */
  protected def page(rs: AsyncResultSet): Stream[F, Row] = {
    val currentPage = Stream.chunk(Chunk.iterable(rs.currentPage().asScala))
    if (rs.hasMorePages)
      currentPage ++ Stream
        .eval(af.fromCompletableFuture(af.delay(rs.fetchNextPage().toCompletableFuture)))
        .flatMap(page)
    else
      currentPage
  }

  /** Runs `cql` and streams the extracted results without materializing them. */
  def streamQuery[T](cql: String, prepare: Prepare = identityPrepare, extractor: Extractor[T] = identityExtractor)(info: ExecutionInfo, dc: Runner): StreamResult[T] = {
    Stream
      .eval(prepareRowAndLog(cql, prepare))
      .evalMap(p => af.fromCompletableFuture(af.delay(session.executeAsync(p).toCompletableFuture)))
      .flatMap(page)
      .map(it => extractor(it, this))
  }

  /** Runs `cql` and collects all extracted rows into a `List`. */
  def executeQuery[T](cql: String, prepare: Prepare = identityPrepare, extractor: Extractor[T] = identityExtractor)(info: ExecutionInfo, dc: Runner): Result[RunQueryResult[T]] =
    streamQuery[T](cql, prepare, extractor)(info, dc).compile.toList

  /** Runs `cql` expecting exactly one row; delegates cardinality handling to `handleSingleResult`. */
  def executeQuerySingle[T](cql: String, prepare: Prepare = identityPrepare, extractor: Extractor[T] = identityExtractor)(info: ExecutionInfo, dc: Runner): Result[RunQuerySingleResult[T]] =
    Functor[F].map(executeQuery(cql, prepare, extractor)(info, dc))(handleSingleResult)

  /** Executes a single mutating statement, discarding the result set. */
  def executeAction(cql: String, prepare: Prepare = identityPrepare)(info: ExecutionInfo, dc: Runner): Result[RunActionResult] = {
    prepareRowAndLog(cql, prepare)
      .flatMap(r => af.fromCompletableFuture(af.delay(session.executeAsync(r).toCompletableFuture)))
      .map(_ => ())
  }

  /** Executes each batch group's prepared statements sequentially, discarding results. */
  def executeBatchAction(groups: List[BatchGroup])(info: ExecutionInfo, dc: Runner): Result[RunBatchActionResult] =
    groups.traverse_ {
      case BatchGroup(cql, prepare) =>
        prepare.traverse_(executeAction(cql, _)(info, dc))
    }
}

/** Factory methods for constructing a [[CassandraCeContext]] from various configuration sources. */
object CassandraCeContext {

  /** Builds a context from an already-materialized `CassandraContextConfig`. */
  def apply[N <: NamingStrategy, F[_]: Async: FlatMap](naming: N, config: CassandraContextConfig): CassandraCeContext[N, F] =
    new CassandraCeContext[N, F](naming, config.session, config.preparedStatementCacheSize)

  /** Builds a context from a raw Typesafe `Config`. */
  def apply[N <: NamingStrategy, F[_]: Async: FlatMap](naming: N, config: Config): CassandraCeContext[N, F] =
    apply[N, F](naming, CassandraContextConfig(config))

  /** Builds a context by loading the configuration section at `configPrefix`. */
  def apply[N <: NamingStrategy, F[_]: Async: FlatMap](naming: N, configPrefix: String): CassandraCeContext[N, F] =
    apply[N, F](naming, LoadConfig(configPrefix))
}
12 changes: 12 additions & 0 deletions quill-cassandra-ce/src/test/resources/application.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Cassandra context configuration used by the quill-cassandra-ce test suite.
testStreamDB {
preparedStatementCacheSize=1
keyspace=quill_test

session {
# Contact points come from the environment when set (e.g. in CI/docker);
# the ${?VAR} form drops the entry entirely when the variable is absent.
basic.contact-points = [ ${?CASSANDRA_CONTACT_POINT_0}, ${?CASSANDRA_CONTACT_POINT_1} ]
basic.load-balancing-policy.local-datacenter = ${?CASSANDRA_DC}
basic.request.consistency = LOCAL_QUORUM
# Large fetch page so most test queries fit in a single page.
basic.request.page-size = 999
}

}
Loading