diff --git a/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/QJob.scala b/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/QJob.scala index 91c37804..e56f88d0 100644 --- a/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/QJob.scala +++ b/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/QJob.scala @@ -9,13 +9,11 @@ package docspell.scheduler.impl import cats.effect.Async import cats.implicits._ import fs2.Stream - import docspell.common._ import docspell.store.Store import docspell.store.qb.DSL._ import docspell.store.qb._ import docspell.store.records.{RJob, RJobGroupUse} - import doobie.ConnectionIO object QJob { @@ -89,7 +87,7 @@ object QJob { res <- job.traverse(j => markJob(j)) } yield res.map(_.map(_.some)).getOrElse { if (group.isDefined) - Left(()) // if a group was found, but no job someone else was faster + Left(()) // if a group was found but no job, someone else was faster else Right(None) } } @@ -115,33 +113,27 @@ object QJob { val selectAll = Select(JC.group.s, from(JC), stateCond).distinct } - val sql1 = - Select( - select(min(AllGroups.group).as("g"), lit("0 as n")), - from(AllGroups), - AllGroups.group > Select(G.group.s, from(G), G.worker === worker) - ) - - val sql2 = - Select( - select(min(AllGroups.group).as("g"), lit("1 as n")), - from(AllGroups) - ) - - val gcol = Column[String]("g", TableDef("")) - val gnum = Column[Int]("n", TableDef("")) val groups = withCte(AllGroups -> AllGroups.selectAll) - .select(Select(gcol.s, from(union(sql1, sql2), "t0"), gcol.isNull.negate)) - .orderBy(gnum.asc) - .limit(1) + .select( + Select( + coalesce( + Select( + select(min(AllGroups.group)), + from(AllGroups), + AllGroups.group > Select(G.group.s, from(G), G.worker === worker) + ).asSubSelect, + Select(select(min(AllGroups.group)), from(AllGroups)).asSubSelect + ).s + ) + ) val frag = groups.build - cioLogger.trace( - s"nextGroupQuery: $frag (now=${now.toMillis}, pause=${initialPause.millis})" - ) - - frag.query[Ident].option + cioLogger + .trace( + s"nextGroupQuery: $frag (now=${now.toMillis}, pause=${initialPause.millis})" + ) *> + groups.build.query[Ident].option } private def stuckTriggerValue(t: RJob.Table, initialPause: Duration, now: Timestamp) = diff --git a/modules/scheduler/impl/src/test/scala/docspell/scheduler/impl/QJobTest.scala b/modules/scheduler/impl/src/test/scala/docspell/scheduler/impl/QJobTest.scala index 6c78a050..09ee6f1d 100644 --- a/modules/scheduler/impl/src/test/scala/docspell/scheduler/impl/QJobTest.scala +++ b/modules/scheduler/impl/src/test/scala/docspell/scheduler/impl/QJobTest.scala @@ -8,18 +8,12 @@ package docspell.scheduler.impl import java.time.Instant import java.util.concurrent.atomic.AtomicLong - -import cats.implicits._ - +import cats.syntax.all._ import docspell.common._ -import docspell.logging.TestLoggingConfig -import docspell.store.StoreFixture +import docspell.store.{DatabaseTest, Db} import docspell.store.records.{RJob, RJobGroupUse} -import doobie.implicits._ -import munit._ - -class QJobTest extends CatsEffectSuite with StoreFixture with TestLoggingConfig { +class QJobTest extends DatabaseTest { private[this] val c = new AtomicLong(0) private val worker = Ident.unsafe("joex1") @@ -28,6 +22,11 @@ class QJobTest extends CatsEffectSuite with StoreFixture with TestLoggingConfig private val group1 = Ident.unsafe("group1") private val group2 = Ident.unsafe("group2") + override def munitFixtures = h2File ++ mariaDbAll ++ postgresAll + + def createStore(dbms: Db) = + dbms.fold(pgStore(), mariaStore(), h2FileStore()) + def createJob(group: Ident): RJob = RJob.fromJson[Unit]( Ident.unsafe(s"job-${c.incrementAndGet()}"), @@ -41,54 +40,66 @@ class QJobTest extends CatsEffectSuite with StoreFixture with TestLoggingConfig None ) - xa.test("set group must insert or update") { tx => - val res = - for { - _ <- RJobGroupUse.setGroup(RJobGroupUse(group1, worker)).transact(tx) - res <- RJobGroupUse.findGroup(worker).transact(tx) - } yield res + Db.all.toList.foreach { db => + test(s"set group must insert or update ($db)") { + val store = createStore(db) + val res = + for { + _ <- store.transact(RJobGroupUse.setGroup(RJobGroupUse(group1, worker))) + res <- store.transact(RJobGroupUse.findGroup(worker)) + } yield res - res.assertEquals(Some(group1)) + res.assertEquals(Some(group1)) + } } - xa.test("selectNextGroup should return first group on initial state") { tx => - val nextGroup = for { - _ <- List(group1, group2, group1, group2, group2) - .map(createJob) - .map(RJob.insert) - .traverse(_.transact(tx)) - _ <- RJobGroupUse.deleteAll.transact(tx) - next <- QJob.selectNextGroup(worker, nowTs, initialPause).transact(tx) - } yield next + Db.all.toList.foreach { db => + test(s"selectNextGroup should return first group on initial state ($db)") { + val store = createStore(db) + val nextGroup = for { + _ <- List(group1, group2, group1, group2, group2) + .map(createJob) + .map(RJob.insert) + .traverse_(store.transact(_)) + _ <- store.transact(RJobGroupUse.deleteAll) + next <- store.transact(QJob.selectNextGroup(worker, nowTs, initialPause)) + } yield next - nextGroup.assertEquals(Some(group1)) + nextGroup.assertEquals(Some(group1)) + } } - xa.test("selectNextGroup should return second group on subsequent call (1)") { tx => - val nextGroup = for { - _ <- List(group1, group2, group1, group2) - .map(createJob) - .map(RJob.insert) - .traverse(_.transact(tx)) - _ <- RJobGroupUse.deleteAll.transact(tx) - _ <- RJobGroupUse.setGroup(RJobGroupUse(group1, worker)).transact(tx) - next <- QJob.selectNextGroup(worker, nowTs, initialPause).transact(tx) - } yield next + Db.all.toList.foreach { db => + test(s"selectNextGroup should return second group on subsequent call ($db)") { + val store = createStore(db) + val nextGroup = for { + _ <- List(group1, group2, group1, group2) + .map(createJob) + .map(RJob.insert) + .traverse_(store.transact(_)) + _ <- store.transact(RJobGroupUse.deleteAll) + _ <- store.transact(RJobGroupUse.setGroup(RJobGroupUse(group1, worker))) + next <- store.transact(QJob.selectNextGroup(worker, nowTs, initialPause)) + } yield next - nextGroup.assertEquals(Some(group2)) + nextGroup.assertEquals(Some(group2)) + } } - xa.test("selectNextGroup should return second group on subsequent call (2)") { tx => - val nextGroup = for { - _ <- List(group1, group2, group1, group2) - .map(createJob) - .map(RJob.insert) - .traverse(_.transact(tx)) - _ <- RJobGroupUse.deleteAll.transact(tx) - _ <- RJobGroupUse.setGroup(RJobGroupUse(group2, worker)).transact(tx) - next <- QJob.selectNextGroup(worker, nowTs, initialPause).transact(tx) - } yield next + Db.all.toList.foreach { db => + test(s"selectNextGroup should return first group on subsequent call ($db)") { + val store = createStore(db) + val nextGroup = for { + _ <- List(group1, group2, group1, group2) + .map(createJob) + .map(RJob.insert) + .traverse_(store.transact(_)) + _ <- store.transact(RJobGroupUse.deleteAll) + _ <- store.transact(RJobGroupUse.setGroup(RJobGroupUse(group2, worker))) + next <- store.transact(QJob.selectNextGroup(worker, nowTs, initialPause)) + } yield next - nextGroup.assertEquals(Some(group1)) + nextGroup.assertEquals(Some(group1)) + } } } diff --git a/modules/store/src/main/scala/docspell/store/records/RJob.scala b/modules/store/src/main/scala/docspell/store/records/RJob.scala index f8b00e8e..9177df06 100644 --- a/modules/store/src/main/scala/docspell/store/records/RJob.scala +++ b/modules/store/src/main/scala/docspell/store/records/RJob.scala @@ -300,11 +300,6 @@ object RJob { def setProgress(jobId: Ident, perc: Int): ConnectionIO[Int] = DML.update(T, T.id === jobId, DML.set(T.progress.setTo(perc))) - def selectWaiting: ConnectionIO[Option[RJob]] = { - val sql = run(select(T.all), from(T), T.state === JobState.waiting) - sql.query[RJob].to[Vector].map(_.headOption) - } - def selectGroupInState(states: NonEmptyList[JobState]): ConnectionIO[Vector[Ident]] = { val sql = Select(select(T.group), from(T), T.state.in(states)).orderBy(T.group) diff --git a/modules/store/src/test/scala/docspell/store/DatabaseTest.scala b/modules/store/src/test/scala/docspell/store/DatabaseTest.scala index e2e9363e..0eed8ab7 100644 --- a/modules/store/src/test/scala/docspell/store/DatabaseTest.scala +++ b/modules/store/src/test/scala/docspell/store/DatabaseTest.scala @@ -7,12 +7,10 @@ package docspell.store import java.util.UUID - import cats.effect._ - +import cats.syntax.option._ import docspell.common._ import docspell.logging.TestLoggingConfig - import com.dimafeng.testcontainers.munit.fixtures.TestContainersFixtures import com.dimafeng.testcontainers.{ JdbcDatabaseContainer, @@ -20,6 +18,7 @@ import com.dimafeng.testcontainers.{ PostgreSQLContainer } import doobie._ +import fs2.io.file.{Files, Path} import munit.CatsEffectSuite import org.testcontainers.utility.DockerImageName @@ -55,6 +54,15 @@ trait DatabaseTest } ) + lazy val h2FileDataSource = ResourceSuiteLocalFixture( + "h2FileDataSource", + for { + file <- Files[IO].tempFile(Path("target").some, "h2-test-", ".db", None) + jdbc = StoreFixture.fileDB(file) + res <- StoreFixture.dataSource(jdbc).map(ds => (jdbc, ds)) + } yield res + ) + lazy val newH2DataSource = ResourceFixture(for { jdbc <- Resource.eval(IO(StoreFixture.memoryDB(UUID.randomUUID().toString))) ds <- StoreFixture.dataSource(jdbc) @@ -84,9 +92,18 @@ trait DatabaseTest } yield store ) + lazy val h2FileStore = ResourceSuiteLocalFixture( + "h2FileStore", + for { + t <- Resource.eval(IO(h2FileDataSource())) + store <- StoreFixture.store(t._2, t._1) + } yield store + ) + def postgresAll = List(postgresCnt, pgDataSource, pgStore) def mariaDbAll = List(mariadbCnt, mariaDataSource, mariaStore) - def h2All = List(h2DataSource, h2Store) + def h2Memory = List(h2DataSource, h2Store) + def h2File = List(h2FileDataSource, h2FileStore) } object DatabaseTest { diff --git a/modules/store/src/test/scala/docspell/store/StoreFixture.scala b/modules/store/src/test/scala/docspell/store/StoreFixture.scala index d90b8d93..f9631031 100644 --- a/modules/store/src/test/scala/docspell/store/StoreFixture.scala +++ b/modules/store/src/test/scala/docspell/store/StoreFixture.scala @@ -7,15 +7,13 @@ package docspell.store import javax.sql.DataSource - import cats.effect._ - import docspell.common.LenientUri import docspell.store.file.{FileRepository, FileRepositoryConfig} import docspell.store.impl.StoreImpl import docspell.store.migrate.FlywayMigrate - import doobie._ +import fs2.io.file.Path import munit._ import org.h2.jdbcx.{JdbcConnectionPool, JdbcDataSource} import org.mariadb.jdbc.MariaDbDataSource @@ -55,6 +53,15 @@ object StoreFixture { "" ) + def fileDB(file: Path): JdbcConfig = + JdbcConfig( + LenientUri.unsafe( + s"jdbc:h2:file://${file.absolute.toString};MODE=PostgreSQL;DATABASE_TO_LOWER=TRUE" + ), + "sa", + "" + ) + def dataSource(jdbc: JdbcConfig): Resource[IO, JdbcConnectionPool] = { def jdbcConnPool = jdbc.dbms match { @@ -115,5 +122,4 @@ object StoreFixture { case None => IO.raiseError(new Exception(s"Resource not found: $resourceName")) } - } diff --git a/modules/store/src/test/scala/docspell/store/fts/TempFtsOpsTest.scala b/modules/store/src/test/scala/docspell/store/fts/TempFtsOpsTest.scala index fdb06b2b..83d5006a 100644 --- a/modules/store/src/test/scala/docspell/store/fts/TempFtsOpsTest.scala +++ b/modules/store/src/test/scala/docspell/store/fts/TempFtsOpsTest.scala @@ -27,7 +27,7 @@ import doobie._ class TempFtsOpsTest extends DatabaseTest { private[this] val logger = docspell.logging.getLogger[IO] - override def munitFixtures = postgresAll ++ mariaDbAll ++ h2All + override def munitFixtures = postgresAll ++ mariaDbAll ++ h2Memory def id(str: String): Ident = Ident.unsafe(str)