mirror of
https://github.com/TheAnachronism/docspell.git
synced 2025-01-26 00:18:26 +00:00
Fix next-job query to do round-robin through job groups
This commit is contained in:
parent
988367a281
commit
ce6f53cc29
@ -8,6 +8,9 @@ val elmCompileMode = settingKey[ElmCompileMode]("How to compile elm sources")
|
||||
|
||||
// --- Settings
|
||||
|
||||
def inTest(d0: Seq[ModuleID], ds: Seq[ModuleID]*) =
|
||||
ds.fold(d0)(_ ++ _).map(_ % Test)
|
||||
|
||||
val scalafixSettings = Seq(
|
||||
semanticdbEnabled := true, // enable SemanticDB
|
||||
semanticdbVersion := scalafixSemanticdb.revision, //"4.4.0"
|
||||
@ -45,7 +48,7 @@ val sharedSettings = Seq(
|
||||
) ++ scalafixSettings
|
||||
|
||||
val testSettingsMUnit = Seq(
|
||||
libraryDependencies ++= Dependencies.munit.map(_ % Test),
|
||||
libraryDependencies ++= inTest(Dependencies.munit, Dependencies.logging),
|
||||
testFrameworks += new TestFramework("munit.Framework")
|
||||
)
|
||||
|
||||
|
@ -60,8 +60,11 @@ object QJob {
|
||||
store.transact(for {
|
||||
n <- RJob.setScheduled(job.id, worker)
|
||||
_ <-
|
||||
if (n == 1) RJobGroupUse.setGroup(RJobGroupUse(worker, job.group))
|
||||
if (n == 1) RJobGroupUse.setGroup(RJobGroupUse(job.group, worker))
|
||||
else 0.pure[ConnectionIO]
|
||||
_ <- logger.fdebug[ConnectionIO](
|
||||
s"Scheduled job ${job.info} to worker ${worker.id}"
|
||||
)
|
||||
} yield if (n == 1) Right(job) else Left(()))
|
||||
|
||||
for {
|
||||
@ -98,22 +101,42 @@ object QJob {
|
||||
val stateCond =
|
||||
JC.state === JobState.waiting || (JC.state === JobState.stuck && stuckTrigger < now.toMillis)
|
||||
|
||||
object AllGroups extends TableDef {
|
||||
val tableName = "allgroups"
|
||||
val alias = Some("ag")
|
||||
|
||||
val group: Column[Ident] = JC.group.copy(table = this)
|
||||
|
||||
val selectAll = Select(JC.group.s, from(JC), stateCond).distinct
|
||||
}
|
||||
|
||||
val sql1 =
|
||||
Select(
|
||||
max(JC.group).as("g"),
|
||||
from(JC).innerJoin(G, JC.group === G.group),
|
||||
G.worker === worker && stateCond
|
||||
select(min(AllGroups.group).as("g"), lit("0 as n")),
|
||||
from(AllGroups),
|
||||
AllGroups.group > Select(G.group.s, from(G), G.worker === worker)
|
||||
)
|
||||
|
||||
val sql2 =
|
||||
Select(min(JC.group).as("g"), from(JC), stateCond)
|
||||
Select(
|
||||
select(min(AllGroups.group).as("g"), lit("1 as n")),
|
||||
from(AllGroups)
|
||||
)
|
||||
|
||||
val gcol = Column[String]("g", TableDef(""))
|
||||
val gnum = Column[Int]("n", TableDef(""))
|
||||
val groups =
|
||||
Select(select(gcol), from(union(sql1, sql2), "t0"), gcol.isNull.negate)
|
||||
withCte(AllGroups -> AllGroups.selectAll)
|
||||
.select(Select(gcol.s, from(union(sql1, sql2), "t0"), gcol.isNull.negate))
|
||||
.orderBy(gnum.asc)
|
||||
.limit(1)
|
||||
|
||||
// either 0, one or two results, but may be empty if RJob table is empty
|
||||
groups.build.query[Ident].to[List].map(_.headOption)
|
||||
val frag = groups.build
|
||||
logger.trace(
|
||||
s"nextGroupQuery: $frag (now=${now.toMillis}, pause=${initialPause.millis})"
|
||||
)
|
||||
|
||||
frag.query[Ident].option
|
||||
}
|
||||
|
||||
private def stuckTriggerValue(t: RJob.Table, initialPause: Duration, now: Timestamp) =
|
||||
|
@ -36,4 +36,7 @@ object RJobGroupUse {
|
||||
|
||||
def findGroup(workerId: Ident): ConnectionIO[Option[Ident]] =
|
||||
run(select(T.group), from(T), T.worker === workerId).query[Ident].option
|
||||
|
||||
def deleteAll: ConnectionIO[Int] =
|
||||
DML.delete(T, T.group.isNotNull)
|
||||
}
|
||||
|
14
modules/store/src/test/resources/logback-test.xml
Normal file
14
modules/store/src/test/resources/logback-test.xml
Normal file
@ -0,0 +1,14 @@
|
||||
<configuration>
|
||||
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
|
||||
<withJansi>true</withJansi>
|
||||
|
||||
<encoder>
|
||||
<pattern>%highlight(%-5level) %cyan(%logger{15}) - %msg %n</pattern>
|
||||
</encoder>
|
||||
</appender>
|
||||
|
||||
<logger name="docspell" level="warn" />
|
||||
<root level="error">
|
||||
<appender-ref ref="STDOUT" />
|
||||
</root>
|
||||
</configuration>
|
@ -1,27 +1,33 @@
|
||||
package docspell.store
|
||||
|
||||
import cats.effect._
|
||||
import cats.effect.unsafe.implicits.global
|
||||
|
||||
import docspell.common.LenientUri
|
||||
import docspell.store.impl.StoreImpl
|
||||
|
||||
import doobie._
|
||||
import munit._
|
||||
import org.h2.jdbcx.JdbcConnectionPool
|
||||
|
||||
trait StoreFixture {
|
||||
def withStore(db: String)(code: Store[IO] => IO[Unit]): Unit = {
|
||||
//StoreFixture.store(StoreFixture.memoryDB(db)).use(code).unsafeRunSync()
|
||||
val jdbc = StoreFixture.memoryDB(db)
|
||||
val xa = StoreFixture.globalXA(jdbc)
|
||||
val store = new StoreImpl[IO](jdbc, xa)
|
||||
store.migrate.unsafeRunSync()
|
||||
code(store).unsafeRunSync()
|
||||
trait StoreFixture extends CatsEffectFunFixtures { self: CatsEffectSuite =>
|
||||
|
||||
val xa = ResourceFixture {
|
||||
val cfg = StoreFixture.memoryDB("test")
|
||||
for {
|
||||
xa <- StoreFixture.makeXA(cfg)
|
||||
store = new StoreImpl[IO](cfg, xa)
|
||||
_ <- Resource.eval(store.migrate)
|
||||
} yield xa
|
||||
}
|
||||
|
||||
def withXA(db: String)(code: Transactor[IO] => IO[Unit]): Unit =
|
||||
StoreFixture.makeXA(StoreFixture.memoryDB(db)).use(code).unsafeRunSync()
|
||||
|
||||
val store = ResourceFixture {
|
||||
val cfg = StoreFixture.memoryDB("test")
|
||||
for {
|
||||
xa <- StoreFixture.makeXA(cfg)
|
||||
store = new StoreImpl[IO](cfg, xa)
|
||||
_ <- Resource.eval(store.migrate)
|
||||
} yield store
|
||||
}
|
||||
}
|
||||
|
||||
object StoreFixture {
|
||||
|
@ -0,0 +1,88 @@
|
||||
package docspell.store.queries
|
||||
|
||||
import java.time.Instant
|
||||
import java.util.concurrent.atomic.AtomicLong
|
||||
|
||||
import cats.implicits._
|
||||
|
||||
import docspell.common._
|
||||
import docspell.store.StoreFixture
|
||||
import docspell.store.records.RJob
|
||||
import docspell.store.records.RJobGroupUse
|
||||
|
||||
import doobie.implicits._
|
||||
import munit._
|
||||
|
||||
class QJobTest extends CatsEffectSuite with StoreFixture {
|
||||
private[this] val c = new AtomicLong(0)
|
||||
|
||||
private val worker = Ident.unsafe("joex1")
|
||||
private val initialPause = Duration.seconds(5)
|
||||
private val nowTs = Timestamp(Instant.parse("2021-06-26T14:54:00Z"))
|
||||
private val group1 = Ident.unsafe("group1")
|
||||
private val group2 = Ident.unsafe("group2")
|
||||
|
||||
def createJob(group: Ident): RJob =
|
||||
RJob.newJob[Unit](
|
||||
Ident.unsafe(s"job-${c.incrementAndGet()}"),
|
||||
Ident.unsafe("task"),
|
||||
group,
|
||||
(),
|
||||
"some subject",
|
||||
nowTs - Duration.days(3),
|
||||
Ident.unsafe("user1"),
|
||||
Priority.Low,
|
||||
None
|
||||
)
|
||||
|
||||
xa.test("set group must insert or update") { tx =>
|
||||
val res =
|
||||
for {
|
||||
_ <- RJobGroupUse.setGroup(RJobGroupUse(group1, worker)).transact(tx)
|
||||
res <- RJobGroupUse.findGroup(worker).transact(tx)
|
||||
} yield res
|
||||
|
||||
res.assertEquals(Some(group1))
|
||||
}
|
||||
|
||||
xa.test("selectNextGroup should return first group on initial state") { tx =>
|
||||
val nextGroup = for {
|
||||
_ <- List(group1, group2, group1, group2, group2)
|
||||
.map(createJob)
|
||||
.map(RJob.insert)
|
||||
.traverse(_.transact(tx))
|
||||
_ <- RJobGroupUse.deleteAll.transact(tx)
|
||||
next <- QJob.selectNextGroup(worker, nowTs, initialPause).transact(tx)
|
||||
} yield next
|
||||
|
||||
nextGroup.assertEquals(Some(group1))
|
||||
}
|
||||
|
||||
xa.test("selectNextGroup should return second group on subsequent call (1)") { tx =>
|
||||
val nextGroup = for {
|
||||
_ <- List(group1, group2, group1, group2)
|
||||
.map(createJob)
|
||||
.map(RJob.insert)
|
||||
.traverse(_.transact(tx))
|
||||
_ <- RJobGroupUse.deleteAll.transact(tx)
|
||||
_ <- RJobGroupUse.setGroup(RJobGroupUse(group1, worker)).transact(tx)
|
||||
next <- QJob.selectNextGroup(worker, nowTs, initialPause).transact(tx)
|
||||
} yield next
|
||||
|
||||
nextGroup.assertEquals(Some(group2))
|
||||
}
|
||||
|
||||
xa.test("selectNextGroup should return second group on subsequent call (2)") { tx =>
|
||||
val nextGroup = for {
|
||||
_ <- List(group1, group2, group1, group2)
|
||||
.map(createJob)
|
||||
.map(RJob.insert)
|
||||
.traverse(_.transact(tx))
|
||||
_ <- RJobGroupUse.deleteAll.transact(tx)
|
||||
_ <- RJobGroupUse.setGroup(RJobGroupUse(group2, worker)).transact(tx)
|
||||
next <- QJob.selectNextGroup(worker, nowTs, initialPause).transact(tx)
|
||||
} yield next
|
||||
|
||||
nextGroup.assertEquals(Some(group1))
|
||||
}
|
||||
}
|
@ -29,6 +29,7 @@ object Dependencies {
|
||||
val LogbackVersion = "1.2.3"
|
||||
val MariaDbVersion = "2.7.3"
|
||||
val MUnitVersion = "0.7.26"
|
||||
val MUnitCatsEffectVersion = "1.0.5"
|
||||
val OrganizeImportsVersion = "0.5.0"
|
||||
val PdfboxVersion = "2.0.24"
|
||||
val PoiVersion = "4.1.2"
|
||||
@ -265,8 +266,9 @@ object Dependencies {
|
||||
)
|
||||
|
||||
val munit = Seq(
|
||||
"org.scalameta" %% "munit" % MUnitVersion,
|
||||
"org.scalameta" %% "munit-scalacheck" % MUnitVersion
|
||||
"org.scalameta" %% "munit" % MUnitVersion,
|
||||
"org.scalameta" %% "munit-scalacheck" % MUnitVersion,
|
||||
"org.typelevel" %% "munit-cats-effect-3" % MUnitCatsEffectVersion
|
||||
)
|
||||
|
||||
val kindProjectorPlugin = "org.typelevel" %% "kind-projector" % KindProjectorVersion
|
||||
|
Loading…
Reference in New Issue
Block a user