From 5bbe073bf37789d2d02ba4f34ea3412a52febd06 Mon Sep 17 00:00:00 2001 From: eikek Date: Fri, 12 Aug 2022 16:30:32 +0200 Subject: [PATCH] Fix job query for H2 Unfortunately, the new h2 version has some regressions related to CTEs. The query selecting the next group failed only for H2 after the update. The query has been rewritten to not use union on CTE tables. The weird thing was that the error only occured using bind values and was not reproducible with "just string" SQL in the h2 console. The QJobTest are now running on all databases. --- .../scala/docspell/scheduler/impl/QJob.scala | 44 +++---- .../docspell/scheduler/impl/QJobTest.scala | 107 ++++++++++-------- .../scala/docspell/store/records/RJob.scala | 5 - .../scala/docspell/store/DatabaseTest.scala | 25 +++- .../scala/docspell/store/StoreFixture.scala | 14 ++- .../docspell/store/fts/TempFtsOpsTest.scala | 2 +- 6 files changed, 109 insertions(+), 88 deletions(-) diff --git a/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/QJob.scala b/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/QJob.scala index 91c37804..e56f88d0 100644 --- a/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/QJob.scala +++ b/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/QJob.scala @@ -9,13 +9,11 @@ package docspell.scheduler.impl import cats.effect.Async import cats.implicits._ import fs2.Stream - import docspell.common._ import docspell.store.Store import docspell.store.qb.DSL._ import docspell.store.qb._ import docspell.store.records.{RJob, RJobGroupUse} - import doobie.ConnectionIO object QJob { @@ -89,7 +87,7 @@ object QJob { res <- job.traverse(j => markJob(j)) } yield res.map(_.map(_.some)).getOrElse { if (group.isDefined) - Left(()) // if a group was found, but no job someone else was faster + Left(()) // if a group was found but no job, someone else was faster else Right(None) } } @@ -115,33 +113,27 @@ object QJob { val selectAll = Select(JC.group.s, from(JC), stateCond).distinct } - val sql1 = - Select( - select(min(AllGroups.group).as("g"), lit("0 as n")), - from(AllGroups), - AllGroups.group > Select(G.group.s, from(G), G.worker === worker) - ) - - val sql2 = - Select( - select(min(AllGroups.group).as("g"), lit("1 as n")), - from(AllGroups) - ) - - val gcol = Column[String]("g", TableDef("")) - val gnum = Column[Int]("n", TableDef("")) val groups = withCte(AllGroups -> AllGroups.selectAll) - .select(Select(gcol.s, from(union(sql1, sql2), "t0"), gcol.isNull.negate)) - .orderBy(gnum.asc) - .limit(1) + .select( + Select( + coalesce( + Select( + select(min(AllGroups.group)), + from(AllGroups), + AllGroups.group > Select(G.group.s, from(G), G.worker === worker) + ).asSubSelect, + Select(select(min(AllGroups.group)), from(AllGroups)).asSubSelect + ).s + ) + ) val frag = groups.build - cioLogger.trace( - s"nextGroupQuery: $frag (now=${now.toMillis}, pause=${initialPause.millis})" - ) - - frag.query[Ident].option + cioLogger + .trace( + s"nextGroupQuery: $frag (now=${now.toMillis}, pause=${initialPause.millis})" + ) *> + groups.build.query[Ident].option } private def stuckTriggerValue(t: RJob.Table, initialPause: Duration, now: Timestamp) = diff --git a/modules/scheduler/impl/src/test/scala/docspell/scheduler/impl/QJobTest.scala b/modules/scheduler/impl/src/test/scala/docspell/scheduler/impl/QJobTest.scala index 6c78a050..09ee6f1d 100644 --- a/modules/scheduler/impl/src/test/scala/docspell/scheduler/impl/QJobTest.scala +++ b/modules/scheduler/impl/src/test/scala/docspell/scheduler/impl/QJobTest.scala @@ -8,18 +8,12 @@ package docspell.scheduler.impl import java.time.Instant import java.util.concurrent.atomic.AtomicLong - -import cats.implicits._ - +import cats.syntax.all._ import docspell.common._ -import docspell.logging.TestLoggingConfig -import docspell.store.StoreFixture +import docspell.store.{DatabaseTest, Db} import docspell.store.records.{RJob, RJobGroupUse} -import doobie.implicits._ -import munit._ - -class QJobTest extends CatsEffectSuite with StoreFixture with TestLoggingConfig { +class QJobTest extends DatabaseTest { private[this] val c = new AtomicLong(0) private val worker = Ident.unsafe("joex1") @@ -28,6 +22,11 @@ class QJobTest extends CatsEffectSuite with StoreFixture with TestLoggingConfig private val group1 = Ident.unsafe("group1") private val group2 = Ident.unsafe("group2") + override def munitFixtures = h2File ++ mariaDbAll ++ postgresAll + + def createStore(dbms: Db) = + dbms.fold(pgStore(), mariaStore(), h2FileStore()) + def createJob(group: Ident): RJob = RJob.fromJson[Unit]( Ident.unsafe(s"job-${c.incrementAndGet()}"), @@ -41,54 +40,66 @@ class QJobTest extends CatsEffectSuite with StoreFixture with TestLoggingConfig None ) - xa.test("set group must insert or update") { tx => - val res = - for { - _ <- RJobGroupUse.setGroup(RJobGroupUse(group1, worker)).transact(tx) - res <- RJobGroupUse.findGroup(worker).transact(tx) - } yield res + Db.all.toList.foreach { db => + test(s"set group must insert or update ($db)") { + val store = createStore(db) + val res = + for { + _ <- store.transact(RJobGroupUse.setGroup(RJobGroupUse(group1, worker))) + res <- store.transact(RJobGroupUse.findGroup(worker)) + } yield res - res.assertEquals(Some(group1)) + res.assertEquals(Some(group1)) + } } - xa.test("selectNextGroup should return first group on initial state") { tx => - val nextGroup = for { - _ <- List(group1, group2, group1, group2, group2) - .map(createJob) - .map(RJob.insert) - .traverse(_.transact(tx)) - _ <- RJobGroupUse.deleteAll.transact(tx) - next <- QJob.selectNextGroup(worker, nowTs, initialPause).transact(tx) - } yield next + Db.all.toList.foreach { db => + test(s"selectNextGroup should return first group on initial state ($db)") { + val store = createStore(db) + val nextGroup = for { + _ <- List(group1, group2, group1, group2, group2) + .map(createJob) + .map(RJob.insert) + .traverse_(store.transact(_)) + _ <- store.transact(RJobGroupUse.deleteAll) + next <- store.transact(QJob.selectNextGroup(worker, nowTs, initialPause)) + } yield next - nextGroup.assertEquals(Some(group1)) + nextGroup.assertEquals(Some(group1)) + } } - xa.test("selectNextGroup should return second group on subsequent call (1)") { tx => - val nextGroup = for { - _ <- List(group1, group2, group1, group2) - .map(createJob) - .map(RJob.insert) - .traverse(_.transact(tx)) - _ <- RJobGroupUse.deleteAll.transact(tx) - _ <- RJobGroupUse.setGroup(RJobGroupUse(group1, worker)).transact(tx) - next <- QJob.selectNextGroup(worker, nowTs, initialPause).transact(tx) - } yield next + Db.all.toList.foreach { db => + test(s"selectNextGroup should return second group on subsequent call ($db)") { + val store = createStore(db) + val nextGroup = for { + _ <- List(group1, group2, group1, group2) + .map(createJob) + .map(RJob.insert) + .traverse_(store.transact(_)) + _ <- store.transact(RJobGroupUse.deleteAll) + _ <- store.transact(RJobGroupUse.setGroup(RJobGroupUse(group1, worker))) + next <- store.transact(QJob.selectNextGroup(worker, nowTs, initialPause)) + } yield next - nextGroup.assertEquals(Some(group2)) + nextGroup.assertEquals(Some(group2)) + } } - xa.test("selectNextGroup should return second group on subsequent call (2)") { tx => - val nextGroup = for { - _ <- List(group1, group2, group1, group2) - .map(createJob) - .map(RJob.insert) - .traverse(_.transact(tx)) - _ <- RJobGroupUse.deleteAll.transact(tx) - _ <- RJobGroupUse.setGroup(RJobGroupUse(group2, worker)).transact(tx) - next <- QJob.selectNextGroup(worker, nowTs, initialPause).transact(tx) - } yield next + Db.all.toList.foreach { db => + test(s"selectNextGroup should return first group on subsequent call ($db)") { + val store = createStore(db) + val nextGroup = for { + _ <- List(group1, group2, group1, group2) + .map(createJob) + .map(RJob.insert) + .traverse_(store.transact(_)) + _ <- store.transact(RJobGroupUse.deleteAll) + _ <- store.transact(RJobGroupUse.setGroup(RJobGroupUse(group2, worker))) + next <- store.transact(QJob.selectNextGroup(worker, nowTs, initialPause)) + } yield next - nextGroup.assertEquals(Some(group1)) + nextGroup.assertEquals(Some(group1)) + } } } diff --git a/modules/store/src/main/scala/docspell/store/records/RJob.scala b/modules/store/src/main/scala/docspell/store/records/RJob.scala index f8b00e8e..9177df06 100644 --- a/modules/store/src/main/scala/docspell/store/records/RJob.scala +++ b/modules/store/src/main/scala/docspell/store/records/RJob.scala @@ -300,11 +300,6 @@ object RJob { def setProgress(jobId: Ident, perc: Int): ConnectionIO[Int] = DML.update(T, T.id === jobId, DML.set(T.progress.setTo(perc))) - def selectWaiting: ConnectionIO[Option[RJob]] = { - val sql = run(select(T.all), from(T), T.state === JobState.waiting) - sql.query[RJob].to[Vector].map(_.headOption) - } - def selectGroupInState(states: NonEmptyList[JobState]): ConnectionIO[Vector[Ident]] = { val sql = Select(select(T.group), from(T), T.state.in(states)).orderBy(T.group) diff --git a/modules/store/src/test/scala/docspell/store/DatabaseTest.scala b/modules/store/src/test/scala/docspell/store/DatabaseTest.scala index e2e9363e..0eed8ab7 100644 --- a/modules/store/src/test/scala/docspell/store/DatabaseTest.scala +++ b/modules/store/src/test/scala/docspell/store/DatabaseTest.scala @@ -7,12 +7,10 @@ package docspell.store import java.util.UUID - import cats.effect._ - +import cats.syntax.option._ import docspell.common._ import docspell.logging.TestLoggingConfig - import com.dimafeng.testcontainers.munit.fixtures.TestContainersFixtures import com.dimafeng.testcontainers.{ JdbcDatabaseContainer, @@ -20,6 +18,7 @@ import com.dimafeng.testcontainers.{ PostgreSQLContainer } import doobie._ +import fs2.io.file.{Files, Path} import munit.CatsEffectSuite import org.testcontainers.utility.DockerImageName @@ -55,6 +54,15 @@ trait DatabaseTest } ) + lazy val h2FileDataSource = ResourceSuiteLocalFixture( + "h2FileDataSource", + for { + file <- Files[IO].tempFile(Path("target").some, "h2-test-", ".db", None) + jdbc = StoreFixture.fileDB(file) + res <- StoreFixture.dataSource(jdbc).map(ds => (jdbc, ds)) + } yield res + ) + lazy val newH2DataSource = ResourceFixture(for { jdbc <- Resource.eval(IO(StoreFixture.memoryDB(UUID.randomUUID().toString))) ds <- StoreFixture.dataSource(jdbc) @@ -84,9 +92,18 @@ trait DatabaseTest } yield store ) + lazy val h2FileStore = ResourceSuiteLocalFixture( + "h2FileStore", + for { + t <- Resource.eval(IO(h2FileDataSource())) + store <- StoreFixture.store(t._2, t._1) + } yield store + ) + def postgresAll = List(postgresCnt, pgDataSource, pgStore) def mariaDbAll = List(mariadbCnt, mariaDataSource, mariaStore) - def h2All = List(h2DataSource, h2Store) + def h2Memory = List(h2DataSource, h2Store) + def h2File = List(h2FileDataSource, h2FileStore) } object DatabaseTest { diff --git a/modules/store/src/test/scala/docspell/store/StoreFixture.scala b/modules/store/src/test/scala/docspell/store/StoreFixture.scala index d90b8d93..f9631031 100644 --- a/modules/store/src/test/scala/docspell/store/StoreFixture.scala +++ b/modules/store/src/test/scala/docspell/store/StoreFixture.scala @@ -7,15 +7,13 @@ package docspell.store import javax.sql.DataSource - import cats.effect._ - import docspell.common.LenientUri import docspell.store.file.{FileRepository, FileRepositoryConfig} import docspell.store.impl.StoreImpl import docspell.store.migrate.FlywayMigrate - import doobie._ +import fs2.io.file.Path import munit._ import org.h2.jdbcx.{JdbcConnectionPool, JdbcDataSource} import org.mariadb.jdbc.MariaDbDataSource @@ -55,6 +53,15 @@ object StoreFixture { "" ) + def fileDB(file: Path): JdbcConfig = + JdbcConfig( + LenientUri.unsafe( + s"jdbc:h2:file://${file.absolute.toString};MODE=PostgreSQL;DATABASE_TO_LOWER=TRUE" + ), + "sa", + "" + ) + def dataSource(jdbc: JdbcConfig): Resource[IO, JdbcConnectionPool] = { def jdbcConnPool = jdbc.dbms match { @@ -115,5 +122,4 @@ object StoreFixture { case None => IO.raiseError(new Exception(s"Resource not found: $resourceName")) } - } diff --git a/modules/store/src/test/scala/docspell/store/fts/TempFtsOpsTest.scala b/modules/store/src/test/scala/docspell/store/fts/TempFtsOpsTest.scala index fdb06b2b..83d5006a 100644 --- a/modules/store/src/test/scala/docspell/store/fts/TempFtsOpsTest.scala +++ b/modules/store/src/test/scala/docspell/store/fts/TempFtsOpsTest.scala @@ -27,7 +27,7 @@ import doobie._ class TempFtsOpsTest extends DatabaseTest { private[this] val logger = docspell.logging.getLogger[IO] - override def munitFixtures = postgresAll ++ mariaDbAll ++ h2All + override def munitFixtures = postgresAll ++ mariaDbAll ++ h2Memory def id(str: String): Ident = Ident.unsafe(str)