mirror of
https://github.com/TheAnachronism/docspell.git
synced 2025-06-22 02:18:26 +00:00
Fix job query for H2
Unfortunately, the new h2 version has some regressions related to CTEs. The query selecting the next group failed only for H2 after the update. The query has been rewritten to not use union on CTE tables. The weird thing was that the error only occured using bind values and was not reproducible with "just string" SQL in the h2 console. The QJobTest are now running on all databases.
This commit is contained in:
@ -9,13 +9,11 @@ package docspell.scheduler.impl
|
|||||||
import cats.effect.Async
|
import cats.effect.Async
|
||||||
import cats.implicits._
|
import cats.implicits._
|
||||||
import fs2.Stream
|
import fs2.Stream
|
||||||
|
|
||||||
import docspell.common._
|
import docspell.common._
|
||||||
import docspell.store.Store
|
import docspell.store.Store
|
||||||
import docspell.store.qb.DSL._
|
import docspell.store.qb.DSL._
|
||||||
import docspell.store.qb._
|
import docspell.store.qb._
|
||||||
import docspell.store.records.{RJob, RJobGroupUse}
|
import docspell.store.records.{RJob, RJobGroupUse}
|
||||||
|
|
||||||
import doobie.ConnectionIO
|
import doobie.ConnectionIO
|
||||||
|
|
||||||
object QJob {
|
object QJob {
|
||||||
@ -89,7 +87,7 @@ object QJob {
|
|||||||
res <- job.traverse(j => markJob(j))
|
res <- job.traverse(j => markJob(j))
|
||||||
} yield res.map(_.map(_.some)).getOrElse {
|
} yield res.map(_.map(_.some)).getOrElse {
|
||||||
if (group.isDefined)
|
if (group.isDefined)
|
||||||
Left(()) // if a group was found, but no job someone else was faster
|
Left(()) // if a group was found but no job, someone else was faster
|
||||||
else Right(None)
|
else Right(None)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -115,33 +113,27 @@ object QJob {
|
|||||||
val selectAll = Select(JC.group.s, from(JC), stateCond).distinct
|
val selectAll = Select(JC.group.s, from(JC), stateCond).distinct
|
||||||
}
|
}
|
||||||
|
|
||||||
val sql1 =
|
|
||||||
Select(
|
|
||||||
select(min(AllGroups.group).as("g"), lit("0 as n")),
|
|
||||||
from(AllGroups),
|
|
||||||
AllGroups.group > Select(G.group.s, from(G), G.worker === worker)
|
|
||||||
)
|
|
||||||
|
|
||||||
val sql2 =
|
|
||||||
Select(
|
|
||||||
select(min(AllGroups.group).as("g"), lit("1 as n")),
|
|
||||||
from(AllGroups)
|
|
||||||
)
|
|
||||||
|
|
||||||
val gcol = Column[String]("g", TableDef(""))
|
|
||||||
val gnum = Column[Int]("n", TableDef(""))
|
|
||||||
val groups =
|
val groups =
|
||||||
withCte(AllGroups -> AllGroups.selectAll)
|
withCte(AllGroups -> AllGroups.selectAll)
|
||||||
.select(Select(gcol.s, from(union(sql1, sql2), "t0"), gcol.isNull.negate))
|
.select(
|
||||||
.orderBy(gnum.asc)
|
Select(
|
||||||
.limit(1)
|
coalesce(
|
||||||
|
Select(
|
||||||
|
select(min(AllGroups.group)),
|
||||||
|
from(AllGroups),
|
||||||
|
AllGroups.group > Select(G.group.s, from(G), G.worker === worker)
|
||||||
|
).asSubSelect,
|
||||||
|
Select(select(min(AllGroups.group)), from(AllGroups)).asSubSelect
|
||||||
|
).s
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
val frag = groups.build
|
val frag = groups.build
|
||||||
cioLogger.trace(
|
cioLogger
|
||||||
s"nextGroupQuery: $frag (now=${now.toMillis}, pause=${initialPause.millis})"
|
.trace(
|
||||||
)
|
s"nextGroupQuery: $frag (now=${now.toMillis}, pause=${initialPause.millis})"
|
||||||
|
) *>
|
||||||
frag.query[Ident].option
|
groups.build.query[Ident].option
|
||||||
}
|
}
|
||||||
|
|
||||||
private def stuckTriggerValue(t: RJob.Table, initialPause: Duration, now: Timestamp) =
|
private def stuckTriggerValue(t: RJob.Table, initialPause: Duration, now: Timestamp) =
|
||||||
|
@ -8,18 +8,12 @@ package docspell.scheduler.impl
|
|||||||
|
|
||||||
import java.time.Instant
|
import java.time.Instant
|
||||||
import java.util.concurrent.atomic.AtomicLong
|
import java.util.concurrent.atomic.AtomicLong
|
||||||
|
import cats.syntax.all._
|
||||||
import cats.implicits._
|
|
||||||
|
|
||||||
import docspell.common._
|
import docspell.common._
|
||||||
import docspell.logging.TestLoggingConfig
|
import docspell.store.{DatabaseTest, Db}
|
||||||
import docspell.store.StoreFixture
|
|
||||||
import docspell.store.records.{RJob, RJobGroupUse}
|
import docspell.store.records.{RJob, RJobGroupUse}
|
||||||
|
|
||||||
import doobie.implicits._
|
class QJobTest extends DatabaseTest {
|
||||||
import munit._
|
|
||||||
|
|
||||||
class QJobTest extends CatsEffectSuite with StoreFixture with TestLoggingConfig {
|
|
||||||
private[this] val c = new AtomicLong(0)
|
private[this] val c = new AtomicLong(0)
|
||||||
|
|
||||||
private val worker = Ident.unsafe("joex1")
|
private val worker = Ident.unsafe("joex1")
|
||||||
@ -28,6 +22,11 @@ class QJobTest extends CatsEffectSuite with StoreFixture with TestLoggingConfig
|
|||||||
private val group1 = Ident.unsafe("group1")
|
private val group1 = Ident.unsafe("group1")
|
||||||
private val group2 = Ident.unsafe("group2")
|
private val group2 = Ident.unsafe("group2")
|
||||||
|
|
||||||
|
override def munitFixtures = h2File ++ mariaDbAll ++ postgresAll
|
||||||
|
|
||||||
|
def createStore(dbms: Db) =
|
||||||
|
dbms.fold(pgStore(), mariaStore(), h2FileStore())
|
||||||
|
|
||||||
def createJob(group: Ident): RJob =
|
def createJob(group: Ident): RJob =
|
||||||
RJob.fromJson[Unit](
|
RJob.fromJson[Unit](
|
||||||
Ident.unsafe(s"job-${c.incrementAndGet()}"),
|
Ident.unsafe(s"job-${c.incrementAndGet()}"),
|
||||||
@ -41,54 +40,66 @@ class QJobTest extends CatsEffectSuite with StoreFixture with TestLoggingConfig
|
|||||||
None
|
None
|
||||||
)
|
)
|
||||||
|
|
||||||
xa.test("set group must insert or update") { tx =>
|
Db.all.toList.foreach { db =>
|
||||||
val res =
|
test(s"set group must insert or update ($db)") {
|
||||||
for {
|
val store = createStore(db)
|
||||||
_ <- RJobGroupUse.setGroup(RJobGroupUse(group1, worker)).transact(tx)
|
val res =
|
||||||
res <- RJobGroupUse.findGroup(worker).transact(tx)
|
for {
|
||||||
} yield res
|
_ <- store.transact(RJobGroupUse.setGroup(RJobGroupUse(group1, worker)))
|
||||||
|
res <- store.transact(RJobGroupUse.findGroup(worker))
|
||||||
|
} yield res
|
||||||
|
|
||||||
res.assertEquals(Some(group1))
|
res.assertEquals(Some(group1))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
xa.test("selectNextGroup should return first group on initial state") { tx =>
|
Db.all.toList.foreach { db =>
|
||||||
val nextGroup = for {
|
test(s"selectNextGroup should return first group on initial state ($db)") {
|
||||||
_ <- List(group1, group2, group1, group2, group2)
|
val store = createStore(db)
|
||||||
.map(createJob)
|
val nextGroup = for {
|
||||||
.map(RJob.insert)
|
_ <- List(group1, group2, group1, group2, group2)
|
||||||
.traverse(_.transact(tx))
|
.map(createJob)
|
||||||
_ <- RJobGroupUse.deleteAll.transact(tx)
|
.map(RJob.insert)
|
||||||
next <- QJob.selectNextGroup(worker, nowTs, initialPause).transact(tx)
|
.traverse_(store.transact(_))
|
||||||
} yield next
|
_ <- store.transact(RJobGroupUse.deleteAll)
|
||||||
|
next <- store.transact(QJob.selectNextGroup(worker, nowTs, initialPause))
|
||||||
|
} yield next
|
||||||
|
|
||||||
nextGroup.assertEquals(Some(group1))
|
nextGroup.assertEquals(Some(group1))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
xa.test("selectNextGroup should return second group on subsequent call (1)") { tx =>
|
Db.all.toList.foreach { db =>
|
||||||
val nextGroup = for {
|
test(s"selectNextGroup should return second group on subsequent call ($db)") {
|
||||||
_ <- List(group1, group2, group1, group2)
|
val store = createStore(db)
|
||||||
.map(createJob)
|
val nextGroup = for {
|
||||||
.map(RJob.insert)
|
_ <- List(group1, group2, group1, group2)
|
||||||
.traverse(_.transact(tx))
|
.map(createJob)
|
||||||
_ <- RJobGroupUse.deleteAll.transact(tx)
|
.map(RJob.insert)
|
||||||
_ <- RJobGroupUse.setGroup(RJobGroupUse(group1, worker)).transact(tx)
|
.traverse_(store.transact(_))
|
||||||
next <- QJob.selectNextGroup(worker, nowTs, initialPause).transact(tx)
|
_ <- store.transact(RJobGroupUse.deleteAll)
|
||||||
} yield next
|
_ <- store.transact(RJobGroupUse.setGroup(RJobGroupUse(group1, worker)))
|
||||||
|
next <- store.transact(QJob.selectNextGroup(worker, nowTs, initialPause))
|
||||||
|
} yield next
|
||||||
|
|
||||||
nextGroup.assertEquals(Some(group2))
|
nextGroup.assertEquals(Some(group2))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
xa.test("selectNextGroup should return second group on subsequent call (2)") { tx =>
|
Db.all.toList.foreach { db =>
|
||||||
val nextGroup = for {
|
test(s"selectNextGroup should return first group on subsequent call ($db)") {
|
||||||
_ <- List(group1, group2, group1, group2)
|
val store = createStore(db)
|
||||||
.map(createJob)
|
val nextGroup = for {
|
||||||
.map(RJob.insert)
|
_ <- List(group1, group2, group1, group2)
|
||||||
.traverse(_.transact(tx))
|
.map(createJob)
|
||||||
_ <- RJobGroupUse.deleteAll.transact(tx)
|
.map(RJob.insert)
|
||||||
_ <- RJobGroupUse.setGroup(RJobGroupUse(group2, worker)).transact(tx)
|
.traverse_(store.transact(_))
|
||||||
next <- QJob.selectNextGroup(worker, nowTs, initialPause).transact(tx)
|
_ <- store.transact(RJobGroupUse.deleteAll)
|
||||||
} yield next
|
_ <- store.transact(RJobGroupUse.setGroup(RJobGroupUse(group2, worker)))
|
||||||
|
next <- store.transact(QJob.selectNextGroup(worker, nowTs, initialPause))
|
||||||
|
} yield next
|
||||||
|
|
||||||
nextGroup.assertEquals(Some(group1))
|
nextGroup.assertEquals(Some(group1))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -300,11 +300,6 @@ object RJob {
|
|||||||
def setProgress(jobId: Ident, perc: Int): ConnectionIO[Int] =
|
def setProgress(jobId: Ident, perc: Int): ConnectionIO[Int] =
|
||||||
DML.update(T, T.id === jobId, DML.set(T.progress.setTo(perc)))
|
DML.update(T, T.id === jobId, DML.set(T.progress.setTo(perc)))
|
||||||
|
|
||||||
def selectWaiting: ConnectionIO[Option[RJob]] = {
|
|
||||||
val sql = run(select(T.all), from(T), T.state === JobState.waiting)
|
|
||||||
sql.query[RJob].to[Vector].map(_.headOption)
|
|
||||||
}
|
|
||||||
|
|
||||||
def selectGroupInState(states: NonEmptyList[JobState]): ConnectionIO[Vector[Ident]] = {
|
def selectGroupInState(states: NonEmptyList[JobState]): ConnectionIO[Vector[Ident]] = {
|
||||||
val sql =
|
val sql =
|
||||||
Select(select(T.group), from(T), T.state.in(states)).orderBy(T.group)
|
Select(select(T.group), from(T), T.state.in(states)).orderBy(T.group)
|
||||||
|
@ -7,12 +7,10 @@
|
|||||||
package docspell.store
|
package docspell.store
|
||||||
|
|
||||||
import java.util.UUID
|
import java.util.UUID
|
||||||
|
|
||||||
import cats.effect._
|
import cats.effect._
|
||||||
|
import cats.syntax.option._
|
||||||
import docspell.common._
|
import docspell.common._
|
||||||
import docspell.logging.TestLoggingConfig
|
import docspell.logging.TestLoggingConfig
|
||||||
|
|
||||||
import com.dimafeng.testcontainers.munit.fixtures.TestContainersFixtures
|
import com.dimafeng.testcontainers.munit.fixtures.TestContainersFixtures
|
||||||
import com.dimafeng.testcontainers.{
|
import com.dimafeng.testcontainers.{
|
||||||
JdbcDatabaseContainer,
|
JdbcDatabaseContainer,
|
||||||
@ -20,6 +18,7 @@ import com.dimafeng.testcontainers.{
|
|||||||
PostgreSQLContainer
|
PostgreSQLContainer
|
||||||
}
|
}
|
||||||
import doobie._
|
import doobie._
|
||||||
|
import fs2.io.file.{Files, Path}
|
||||||
import munit.CatsEffectSuite
|
import munit.CatsEffectSuite
|
||||||
import org.testcontainers.utility.DockerImageName
|
import org.testcontainers.utility.DockerImageName
|
||||||
|
|
||||||
@ -55,6 +54,15 @@ trait DatabaseTest
|
|||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
lazy val h2FileDataSource = ResourceSuiteLocalFixture(
|
||||||
|
"h2FileDataSource",
|
||||||
|
for {
|
||||||
|
file <- Files[IO].tempFile(Path("target").some, "h2-test-", ".db", None)
|
||||||
|
jdbc = StoreFixture.fileDB(file)
|
||||||
|
res <- StoreFixture.dataSource(jdbc).map(ds => (jdbc, ds))
|
||||||
|
} yield res
|
||||||
|
)
|
||||||
|
|
||||||
lazy val newH2DataSource = ResourceFixture(for {
|
lazy val newH2DataSource = ResourceFixture(for {
|
||||||
jdbc <- Resource.eval(IO(StoreFixture.memoryDB(UUID.randomUUID().toString)))
|
jdbc <- Resource.eval(IO(StoreFixture.memoryDB(UUID.randomUUID().toString)))
|
||||||
ds <- StoreFixture.dataSource(jdbc)
|
ds <- StoreFixture.dataSource(jdbc)
|
||||||
@ -84,9 +92,18 @@ trait DatabaseTest
|
|||||||
} yield store
|
} yield store
|
||||||
)
|
)
|
||||||
|
|
||||||
|
lazy val h2FileStore = ResourceSuiteLocalFixture(
|
||||||
|
"h2FileStore",
|
||||||
|
for {
|
||||||
|
t <- Resource.eval(IO(h2FileDataSource()))
|
||||||
|
store <- StoreFixture.store(t._2, t._1)
|
||||||
|
} yield store
|
||||||
|
)
|
||||||
|
|
||||||
def postgresAll = List(postgresCnt, pgDataSource, pgStore)
|
def postgresAll = List(postgresCnt, pgDataSource, pgStore)
|
||||||
def mariaDbAll = List(mariadbCnt, mariaDataSource, mariaStore)
|
def mariaDbAll = List(mariadbCnt, mariaDataSource, mariaStore)
|
||||||
def h2All = List(h2DataSource, h2Store)
|
def h2Memory = List(h2DataSource, h2Store)
|
||||||
|
def h2File = List(h2FileDataSource, h2FileStore)
|
||||||
}
|
}
|
||||||
|
|
||||||
object DatabaseTest {
|
object DatabaseTest {
|
||||||
|
@ -7,15 +7,13 @@
|
|||||||
package docspell.store
|
package docspell.store
|
||||||
|
|
||||||
import javax.sql.DataSource
|
import javax.sql.DataSource
|
||||||
|
|
||||||
import cats.effect._
|
import cats.effect._
|
||||||
|
|
||||||
import docspell.common.LenientUri
|
import docspell.common.LenientUri
|
||||||
import docspell.store.file.{FileRepository, FileRepositoryConfig}
|
import docspell.store.file.{FileRepository, FileRepositoryConfig}
|
||||||
import docspell.store.impl.StoreImpl
|
import docspell.store.impl.StoreImpl
|
||||||
import docspell.store.migrate.FlywayMigrate
|
import docspell.store.migrate.FlywayMigrate
|
||||||
|
|
||||||
import doobie._
|
import doobie._
|
||||||
|
import fs2.io.file.Path
|
||||||
import munit._
|
import munit._
|
||||||
import org.h2.jdbcx.{JdbcConnectionPool, JdbcDataSource}
|
import org.h2.jdbcx.{JdbcConnectionPool, JdbcDataSource}
|
||||||
import org.mariadb.jdbc.MariaDbDataSource
|
import org.mariadb.jdbc.MariaDbDataSource
|
||||||
@ -55,6 +53,15 @@ object StoreFixture {
|
|||||||
""
|
""
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def fileDB(file: Path): JdbcConfig =
|
||||||
|
JdbcConfig(
|
||||||
|
LenientUri.unsafe(
|
||||||
|
s"jdbc:h2:file://${file.absolute.toString};MODE=PostgreSQL;DATABASE_TO_LOWER=TRUE"
|
||||||
|
),
|
||||||
|
"sa",
|
||||||
|
""
|
||||||
|
)
|
||||||
|
|
||||||
def dataSource(jdbc: JdbcConfig): Resource[IO, JdbcConnectionPool] = {
|
def dataSource(jdbc: JdbcConfig): Resource[IO, JdbcConnectionPool] = {
|
||||||
def jdbcConnPool =
|
def jdbcConnPool =
|
||||||
jdbc.dbms match {
|
jdbc.dbms match {
|
||||||
@ -115,5 +122,4 @@ object StoreFixture {
|
|||||||
case None =>
|
case None =>
|
||||||
IO.raiseError(new Exception(s"Resource not found: $resourceName"))
|
IO.raiseError(new Exception(s"Resource not found: $resourceName"))
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -27,7 +27,7 @@ import doobie._
|
|||||||
class TempFtsOpsTest extends DatabaseTest {
|
class TempFtsOpsTest extends DatabaseTest {
|
||||||
private[this] val logger = docspell.logging.getLogger[IO]
|
private[this] val logger = docspell.logging.getLogger[IO]
|
||||||
|
|
||||||
override def munitFixtures = postgresAll ++ mariaDbAll ++ h2All
|
override def munitFixtures = postgresAll ++ mariaDbAll ++ h2Memory
|
||||||
|
|
||||||
def id(str: String): Ident = Ident.unsafe(str)
|
def id(str: String): Ident = Ident.unsafe(str)
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user