From 3103930afa94440f40c09fee5240306104e73afb Mon Sep 17 00:00:00 2001 From: Dionysios Ntaouros Date: Tue, 6 Sep 2022 16:55:17 +0300 Subject: [PATCH 1/2] Use fetchSize, return next page instead of the current one FMT --- .../io/getquill/CassandraMonixContext.scala | 68 +++++++++++++------ 1 file changed, 49 insertions(+), 19 deletions(-) diff --git a/quill-cassandra-monix/src/main/scala/io/getquill/CassandraMonixContext.scala b/quill-cassandra-monix/src/main/scala/io/getquill/CassandraMonixContext.scala index 0988e50a09..a94d92d9ec 100644 --- a/quill-cassandra-monix/src/main/scala/io/getquill/CassandraMonixContext.scala +++ b/quill-cassandra-monix/src/main/scala/io/getquill/CassandraMonixContext.scala @@ -7,10 +7,10 @@ import io.getquill.context.ExecutionInfo import io.getquill.context.cassandra.CqlIdiom import io.getquill.context.monix.MonixContext import io.getquill.util.{ ContextLogger, LoadConfig } -import io.getquill.context.cassandra.util.FutureConversions._ import monix.eval.Task import monix.execution.Scheduler import monix.reactive.Observable + import scala.compat.java8.FutureConverters._ import scala.jdk.CollectionConverters._ import scala.util.{ Failure, Success } @@ -19,8 +19,7 @@ class CassandraMonixContext[+N <: NamingStrategy]( naming: N, session: CqlSession, preparedStatementCacheSize: Long -) - extends CassandraCqlSessionContext[N](naming, session, preparedStatementCacheSize) +) extends CassandraCqlSessionContext[N](naming, session, preparedStatementCacheSize) with MonixContext[CqlIdiom, N] { def this(naming: N, config: CassandraContextConfig) = this(naming, config.session, config.preparedStatementCacheSize) @@ -41,16 +40,25 @@ class CassandraMonixContext[+N <: NamingStrategy]( Task.defer { val page = rs.currentPage().asScala if (rs.hasMorePages) - Task.from(rs.fetchNextPage().toCompletableFuture.toScala).map(_ => page) + Task + .from(rs.fetchNextPage().toCompletableFuture.toScala) + .map(_.currentPage().asScala) else Task.now(page) } - def streamQuery[T](fetchSize: Option[Int], cql: String, prepare: Prepare = identityPrepare, extractor: Extractor[T] = identityExtractor)(info: ExecutionInfo, dc: Runner): Observable[T] = { + def streamQuery[T]( + fetchSize: Option[Int], + cql: String, + prepare: Prepare = identityPrepare, + extractor: Extractor[T] = identityExtractor + )(info: ExecutionInfo, dc: Runner): Observable[T] = { Observable - .fromTask(prepareRowAndLog(cql, prepare)) + .fromTask(prepareRowAndLog(cql, prepare, fetchSize)) .mapEvalF(p => session.executeAsync(p).toScala) - .flatMap(Observable.fromAsyncStateAction((rs: AsyncResultSet) => page(rs).map((_, rs)))(_)) + .flatMap( + Observable.fromAsyncStateAction((rs: AsyncResultSet) => page(rs).map((_, rs)))(_) + ) .takeWhile(_.nonEmpty) .flatMap(Observable.fromIterable) .map(row => extractor(row, this)) @@ -58,11 +66,18 @@ class CassandraMonixContext[+N <: NamingStrategy]( def executeQuery[T](cql: String, prepare: Prepare = identityPrepare, extractor: Extractor[T] = identityExtractor)(info: ExecutionInfo, dc: Runner): Task[List[T]] = { streamQuery[T](None, cql, prepare, extractor)(info, dc) - .foldLeftL(List[T]())({ case (l, r) => r +: l }).map(_.reverse) + .foldLeftL(List[T]())({ case (l, r) => r +: l }) + .map(_.reverse) } - def executeQuerySingle[T](cql: String, prepare: Prepare = identityPrepare, extractor: Extractor[T] = identityExtractor)(info: ExecutionInfo, dc: Runner): Task[T] = - executeQuery(cql, prepare, extractor)(info, dc).map(handleSingleResult(cql, _)) + def executeQuerySingle[T]( + cql: String, + prepare: Prepare = identityPrepare, + extractor: Extractor[T] = identityExtractor + )(info: ExecutionInfo, dc: Runner): Task[T] = + executeQuery(cql, prepare, extractor)(info, dc).map( + handleSingleResult(cql, _) + ) def executeAction(cql: String, prepare: Prepare = identityPrepare)(info: ExecutionInfo, dc: Runner): Task[Unit] = { prepareRowAndLog(cql, prepare) @@ -71,19 +86,34 @@ class CassandraMonixContext[+N <: NamingStrategy]( } def executeBatchAction(groups: List[BatchGroup])(info: ExecutionInfo, dc: Runner): Task[Unit] = - Observable.fromIterable(groups).flatMap { - case BatchGroup(cql, prepare) => - Observable.fromIterable(prepare) - .flatMap(prep => Observable.fromTask(executeAction(cql, prep)(info, dc))) - .map(_ => ()) - }.completedL + Observable + .fromIterable(groups) + .flatMap { + case BatchGroup(cql, prepare) => + Observable + .fromIterable(prepare) + .flatMap(prep => Observable.fromTask(executeAction(cql, prep)(info, dc))) + .map(_ => ()) + } + .completedL - private def prepareRowAndLog(cql: String, prepare: Prepare = identityPrepare): Task[PrepareRow] = { + private def prepareRowAndLog( + cql: String, + prepare: Prepare = identityPrepare, + fetchSize: Option[Int] = None + ): Task[PrepareRow] = { Task.async0[PrepareRow] { (scheduler, callback) => implicit val executor: Scheduler = scheduler - super.prepareAsync(cql) - .map(row => prepare(row, this)) + super + .prepareAsync(cql) + .map { row => + val rowWithPageSize = fetchSize match { + case Some(size) => row.setPageSize(size) + case None => row + } + prepare(rowWithPageSize, this) + } .onComplete { case Success((params, bs)) => logger.logQuery(cql, params) From 520371891e896ab16bfaa7fdd8c83ba40faf00c6 Mon Sep 17 00:00:00 2001 From: Dionysios Ntaouros Date: Wed, 19 Oct 2022 15:31:16 +0200 Subject: [PATCH 2/2] Fix batch issue --- .../io/getquill/CassandraMonixContext.scala | 146 +++++++++++++----- 1 file changed, 109 insertions(+), 37 deletions(-) diff --git a/quill-cassandra-monix/src/main/scala/io/getquill/CassandraMonixContext.scala b/quill-cassandra-monix/src/main/scala/io/getquill/CassandraMonixContext.scala index a94d92d9ec..635ed40d9e 100644 --- a/quill-cassandra-monix/src/main/scala/io/getquill/CassandraMonixContext.scala +++ b/quill-cassandra-monix/src/main/scala/io/getquill/CassandraMonixContext.scala @@ -1,12 +1,14 @@ package io.getquill import com.datastax.oss.driver.api.core.CqlSession -import com.datastax.oss.driver.api.core.cql.{ AsyncResultSet, Row } +import com.datastax.oss.driver.api.core.cql._ +import com.restore.cassandra.baselineMigration.migration.Logging import com.typesafe.config.Config import io.getquill.context.ExecutionInfo import io.getquill.context.cassandra.CqlIdiom import io.getquill.context.monix.MonixContext import io.getquill.util.{ ContextLogger, LoadConfig } +import io.getquill.{ CassandraContextConfig, CassandraCqlSessionContext, NamingStrategy } import monix.eval.Task import monix.execution.Scheduler import monix.reactive.Observable @@ -16,15 +18,23 @@ import scala.jdk.CollectionConverters._ import scala.util.{ Failure, Success } class CassandraMonixContext[+N <: NamingStrategy]( - naming: N, - session: CqlSession, - preparedStatementCacheSize: Long -) extends CassandraCqlSessionContext[N](naming, session, preparedStatementCacheSize) - with MonixContext[CqlIdiom, N] { + naming: N, + session: CqlSession, + preparedStatementCacheSize: Long +) extends CassandraCqlSessionContext[N]( + naming, + session, + preparedStatementCacheSize + ) + with MonixContext[CqlIdiom, N] + with Logging { - def this(naming: N, config: CassandraContextConfig) = this(naming, config.session, config.preparedStatementCacheSize) - def this(naming: N, config: Config) = this(naming, CassandraContextConfig(config)) - def this(naming: N, configPrefix: String) = this(naming, LoadConfig(configPrefix)) + def this(naming: N, config: CassandraContextConfig) = + this(naming, config.session, config.preparedStatementCacheSize) + def this(naming: N, config: Config) = + this(naming, CassandraContextConfig(config)) + def this(naming: N, configPrefix: String) = + this(naming, LoadConfig(configPrefix)) private val logger = ContextLogger(classOf[CassandraMonixContext[_]]) @@ -42,65 +52,126 @@ class CassandraMonixContext[+N <: NamingStrategy]( if (rs.hasMorePages) Task .from(rs.fetchNextPage().toCompletableFuture.toScala) - .map(_.currentPage().asScala) - else + .map(next => page) + else { Task.now(page) + } } def streamQuery[T]( - fetchSize: Option[Int], - cql: String, - prepare: Prepare = identityPrepare, - extractor: Extractor[T] = identityExtractor - )(info: ExecutionInfo, dc: Runner): Observable[T] = { + fetchSize: Option[Int], + cql: String, + prepare: Prepare = identityPrepare, + extractor: Extractor[T] = identityExtractor + )( + info: ExecutionInfo, + dc: Runner + ): Observable[T] = { Observable .fromTask(prepareRowAndLog(cql, prepare, fetchSize)) .mapEvalF(p => session.executeAsync(p).toScala) - .flatMap( - Observable.fromAsyncStateAction((rs: AsyncResultSet) => page(rs).map((_, rs)))(_) + .flatMap(x => + Observable.fromAsyncStateAction { (rs: Option[AsyncResultSet]) => + rs match { + case None => Task.now(Iterable.empty -> None) + case Some(rs) if rs.hasMorePages => { + val page = rs.currentPage() + Task + .fromFuture(rs.fetchNextPage().toScala) + .map(rs => page.asScala -> Some(rs)) + } + case Some(rs) => + val page = rs.currentPage() + Task.now(page.asScala -> None) + } + }(Some(x)) ) .takeWhile(_.nonEmpty) .flatMap(Observable.fromIterable) .map(row => extractor(row, this)) } - def executeQuery[T](cql: String, prepare: Prepare = identityPrepare, extractor: Extractor[T] = identityExtractor)(info: ExecutionInfo, dc: Runner): Task[List[T]] = { + def executeQuery[T]( + cql: String, + prepare: Prepare = identityPrepare, + extractor: Extractor[T] = identityExtractor + )( + info: ExecutionInfo, + dc: Runner + ): Task[List[T]] = { streamQuery[T](None, cql, prepare, extractor)(info, dc) - .foldLeftL(List[T]())({ case (l, r) => r +: l }) + .foldLeftL(List[T]()) { case (l, r) => r +: l } .map(_.reverse) } def executeQuerySingle[T]( - cql: String, - prepare: Prepare = identityPrepare, - extractor: Extractor[T] = identityExtractor - )(info: ExecutionInfo, dc: Runner): Task[T] = + cql: String, + prepare: Prepare = identityPrepare, + extractor: Extractor[T] = identityExtractor + )( + info: ExecutionInfo, + dc: Runner + ): Task[T] = executeQuery(cql, prepare, extractor)(info, dc).map( handleSingleResult(cql, _) ) - def executeAction(cql: String, prepare: Prepare = identityPrepare)(info: ExecutionInfo, dc: Runner): Task[Unit] = { + def executeAction( + cql: String, + prepare: Prepare = identityPrepare + )( + info: ExecutionInfo, + dc: Runner + ): Task[Unit] = { prepareRowAndLog(cql, prepare) .flatMap(r => Task.fromFuture(session.executeAsync(r).toScala)) .map(_ => ()) } - def executeBatchAction(groups: List[BatchGroup])(info: ExecutionInfo, dc: Runner): Task[Unit] = + def executeBatchAction( + groups: List[BatchGroup] + )( + info: ExecutionInfo, + dc: Runner + ): Task[Unit] = Observable .fromIterable(groups) - .flatMap { - case BatchGroup(cql, prepare) => - Observable - .fromIterable(prepare) - .flatMap(prep => Observable.fromTask(executeAction(cql, prep)(info, dc))) - .map(_ => ()) + .flatMap { case BatchGroup(cql, prepare) => + Observable + .fromTask { + Task + .async0[PrepareRow] { (scheduler, callback) => + super + .prepareAsync(cql)(scheduler) + .onComplete { case Success(statement) => + callback.onSuccess(statement) + + }(scheduler) + } + .flatMap { statements => + val preparedStatement = + prepare.map(_.apply(statements, this)._2) + + Task.fromFuture { + val batch = BatchStatement + .builder(DefaultBatchType.UNLOGGED) + .addStatements( + preparedStatement.asJava + .asInstanceOf[java.lang.Iterable[BatchableStatement[_]]] + ) + .build() + session.executeAsync(batch).toScala + } + } + } } + .map(_ => ()) .completedL private def prepareRowAndLog( - cql: String, - prepare: Prepare = identityPrepare, - fetchSize: Option[Int] = None + cql: String, + prepare: Prepare = identityPrepare, + fetchSize: Option[Int] = None ): Task[PrepareRow] = { Task.async0[PrepareRow] { (scheduler, callback) => implicit val executor: Scheduler = scheduler @@ -109,8 +180,9 @@ class CassandraMonixContext[+N <: NamingStrategy]( .prepareAsync(cql) .map { row => val rowWithPageSize = fetchSize match { - case Some(size) => row.setPageSize(size) - case None => row + case Some(size) => + row.setPageSize(size) + case None => row } prepare(rowWithPageSize, this) }