Move scheduler queries into the new module

This commit is contained in:
eikek
2022-03-14 12:41:36 +01:00
parent d12c672dcf
commit 8d5fc7f9da
7 changed files with 80 additions and 60 deletions

View File

@ -11,7 +11,6 @@ import cats.implicits._
import docspell.common._
import docspell.store.Store
import docspell.store.queries.QJob
import docspell.store.records.RJob
trait JobQueue[F[_]] {

View File

@ -0,0 +1,232 @@
/*
* Copyright 2020 Eike K. & Contributors
*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
package docspell.scheduler.impl
import cats.effect.Async
import cats.implicits._
import fs2.Stream
import docspell.common._
import docspell.store.Store
import docspell.store.qb.DSL._
import docspell.store.qb._
import docspell.store.records.{RJob, RJobGroupUse}
import doobie.ConnectionIO
object QJob {
private[this] val cioLogger = docspell.logging.getLogger[ConnectionIO]
def takeNextJob[F[_]: Async](
store: Store[F]
)(
priority: Ident => F[Priority],
worker: Ident,
retryPause: Duration
): F[Option[RJob]] = {
val logger = docspell.logging.getLogger[F]
Stream
.range(0, 10)
.evalMap(n => takeNextJob1(store)(priority, worker, retryPause, n))
.evalTap { x =>
if (x.isLeft)
logger.debug(
"Cannot mark job, probably due to concurrent updates. Will retry."
)
else ().pure[F]
}
.find(_.isRight)
.flatMap {
case Right(job) =>
Stream.emit(job)
case Left(_) =>
Stream
.eval(logger.warn("Cannot mark job, even after retrying. Give up."))
.map(_ => None)
}
.compile
.last
.map(_.flatten)
}
private def takeNextJob1[F[_]: Async](store: Store[F])(
priority: Ident => F[Priority],
worker: Ident,
retryPause: Duration,
currentTry: Int
): F[Either[Unit, Option[RJob]]] = {
val logger = docspell.logging.getLogger[F]
// if this fails, we have to restart takeNextJob
def markJob(job: RJob): F[Either[Unit, RJob]] =
store.transact(for {
n <- RJob.setScheduled(job.id, worker)
_ <-
if (n == 1) RJobGroupUse.setGroup(RJobGroupUse(job.group, worker))
else 0.pure[ConnectionIO]
_ <- cioLogger.debug(
s"Scheduled job ${job.info} to worker ${worker.id}"
)
} yield if (n == 1) Right(job) else Left(()))
for {
_ <- logger.trace(
s"About to take next job (worker ${worker.id}), try $currentTry"
)
now <- Timestamp.current[F]
group <- store.transact(selectNextGroup(worker, now, retryPause))
_ <- logger.trace(s"Choose group ${group.map(_.id)}")
prio <- group.map(priority).getOrElse((Priority.Low: Priority).pure[F])
_ <- logger.trace(s"Looking for job of prio $prio")
job <-
group
.map(g => store.transact(selectNextJob(g, prio, retryPause, now)))
.getOrElse((None: Option[RJob]).pure[F])
_ <- logger.trace(s"Found job: ${job.map(_.info)}")
res <- job.traverse(j => markJob(j))
} yield res.map(_.map(_.some)).getOrElse {
if (group.isDefined)
Left(()) // if a group was found, but no job someone else was faster
else Right(None)
}
}
def selectNextGroup(
worker: Ident,
now: Timestamp,
initialPause: Duration
): ConnectionIO[Option[Ident]] = {
val JC = RJob.as("a")
val G = RJobGroupUse.as("b")
val stuckTrigger = stuckTriggerValue(JC, initialPause, now)
val stateCond =
JC.state === JobState.waiting || (JC.state === JobState.stuck && stuckTrigger < now.toMillis)
object AllGroups extends TableDef {
val tableName = "allgroups"
val alias = Some("ag")
val group: Column[Ident] = JC.group.copy(table = this)
val selectAll = Select(JC.group.s, from(JC), stateCond).distinct
}
val sql1 =
Select(
select(min(AllGroups.group).as("g"), lit("0 as n")),
from(AllGroups),
AllGroups.group > Select(G.group.s, from(G), G.worker === worker)
)
val sql2 =
Select(
select(min(AllGroups.group).as("g"), lit("1 as n")),
from(AllGroups)
)
val gcol = Column[String]("g", TableDef(""))
val gnum = Column[Int]("n", TableDef(""))
val groups =
withCte(AllGroups -> AllGroups.selectAll)
.select(Select(gcol.s, from(union(sql1, sql2), "t0"), gcol.isNull.negate))
.orderBy(gnum.asc)
.limit(1)
val frag = groups.build
cioLogger.trace(
s"nextGroupQuery: $frag (now=${now.toMillis}, pause=${initialPause.millis})"
)
frag.query[Ident].option
}
private def stuckTriggerValue(t: RJob.Table, initialPause: Duration, now: Timestamp) =
plus(
coalesce(t.startedmillis.s, const(now.toMillis)).s,
mult(power(2, t.retries.s).s, const(initialPause.millis)).s
)
def selectNextJob(
group: Ident,
prio: Priority,
initialPause: Duration,
now: Timestamp
): ConnectionIO[Option[RJob]] = {
val JC = RJob.T
val psort =
if (prio == Priority.High) JC.priority.desc
else JC.priority.asc
val waiting = JobState.waiting
val stuck = JobState.stuck
val stuckTrigger = stuckTriggerValue(JC, initialPause, now)
val sql =
Select(
select(JC.all),
from(JC),
JC.group === group && (JC.state === waiting ||
(JC.state === stuck && stuckTrigger < now.toMillis))
).orderBy(JC.state.asc, psort, JC.submitted.asc).limit(1)
sql.build.query[RJob].option
}
def setCancelled[F[_]: Async](id: Ident, store: Store[F]): F[Unit] =
for {
now <- Timestamp.current[F]
_ <- store.transact(RJob.setCancelled(id, now))
} yield ()
def setFailed[F[_]: Async](id: Ident, store: Store[F]): F[Unit] =
for {
now <- Timestamp.current[F]
_ <- store.transact(RJob.setFailed(id, now))
} yield ()
def setSuccess[F[_]: Async](id: Ident, store: Store[F]): F[Unit] =
for {
now <- Timestamp.current[F]
_ <- store.transact(RJob.setSuccess(id, now))
} yield ()
def setStuck[F[_]: Async](id: Ident, store: Store[F]): F[Unit] =
for {
now <- Timestamp.current[F]
_ <- store.transact(RJob.setStuck(id, now))
} yield ()
def setRunning[F[_]: Async](id: Ident, workerId: Ident, store: Store[F]): F[Unit] =
for {
now <- Timestamp.current[F]
_ <- store.transact(RJob.setRunning(id, workerId, now))
} yield ()
def setFinalState[F[_]: Async](id: Ident, state: JobState, store: Store[F]): F[Unit] = {
val logger = docspell.logging.getLogger[F]
state match {
case JobState.Success =>
setSuccess(id, store)
case JobState.Failed =>
setFailed(id, store)
case JobState.Cancelled =>
setCancelled(id, store)
case JobState.Stuck =>
setStuck(id, store)
case _ =>
logger.error(s"Invalid final state: $state.")
}
}
def exceedsRetries[F[_]: Async](id: Ident, max: Int, store: Store[F]): F[Boolean] =
store.transact(RJob.getRetries(id)).map(n => n.forall(_ >= max))
def runningToWaiting[F[_]: Async](workerId: Ident, store: Store[F]): F[Unit] =
store.transact(RJob.setRunningToWaiting(workerId)).map(_ => ())
def findAll[F[_]](ids: Seq[Ident], store: Store[F]): F[Vector[RJob]] =
store.transact(RJob.findFromIds(ids))
}

View File

@ -21,7 +21,6 @@ import docspell.scheduler._
import docspell.scheduler.impl.SchedulerImpl._
import docspell.scheduler.msg.{CancelJob, JobDone, JobsNotify}
import docspell.store.Store
import docspell.store.queries.QJob
import docspell.store.records.RJob
import io.circe.Json

View File

@ -0,0 +1,94 @@
/*
* Copyright 2020 Eike K. & Contributors
*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
package docspell.scheduler.impl
import java.time.Instant
import java.util.concurrent.atomic.AtomicLong
import cats.implicits._
import docspell.common._
import docspell.logging.TestLoggingConfig
import docspell.store.StoreFixture
import docspell.store.records.{RJob, RJobGroupUse}
import doobie.implicits._
import munit._
class QJobTest extends CatsEffectSuite with StoreFixture with TestLoggingConfig {
private[this] val c = new AtomicLong(0)
private val worker = Ident.unsafe("joex1")
private val initialPause = Duration.seconds(5)
private val nowTs = Timestamp(Instant.parse("2021-06-26T14:54:00Z"))
private val group1 = Ident.unsafe("group1")
private val group2 = Ident.unsafe("group2")
def createJob(group: Ident): RJob =
RJob.fromJson[Unit](
Ident.unsafe(s"job-${c.incrementAndGet()}"),
Ident.unsafe("task"),
group,
(),
"some subject",
nowTs - Duration.days(3),
Ident.unsafe("user1"),
Priority.Low,
None
)
xa.test("set group must insert or update") { tx =>
val res =
for {
_ <- RJobGroupUse.setGroup(RJobGroupUse(group1, worker)).transact(tx)
res <- RJobGroupUse.findGroup(worker).transact(tx)
} yield res
res.assertEquals(Some(group1))
}
xa.test("selectNextGroup should return first group on initial state") { tx =>
val nextGroup = for {
_ <- List(group1, group2, group1, group2, group2)
.map(createJob)
.map(RJob.insert)
.traverse(_.transact(tx))
_ <- RJobGroupUse.deleteAll.transact(tx)
next <- QJob.selectNextGroup(worker, nowTs, initialPause).transact(tx)
} yield next
nextGroup.assertEquals(Some(group1))
}
xa.test("selectNextGroup should return second group on subsequent call (1)") { tx =>
val nextGroup = for {
_ <- List(group1, group2, group1, group2)
.map(createJob)
.map(RJob.insert)
.traverse(_.transact(tx))
_ <- RJobGroupUse.deleteAll.transact(tx)
_ <- RJobGroupUse.setGroup(RJobGroupUse(group1, worker)).transact(tx)
next <- QJob.selectNextGroup(worker, nowTs, initialPause).transact(tx)
} yield next
nextGroup.assertEquals(Some(group2))
}
xa.test("selectNextGroup should return second group on subsequent call (2)") { tx =>
val nextGroup = for {
_ <- List(group1, group2, group1, group2)
.map(createJob)
.map(RJob.insert)
.traverse(_.transact(tx))
_ <- RJobGroupUse.deleteAll.transact(tx)
_ <- RJobGroupUse.setGroup(RJobGroupUse(group2, worker)).transact(tx)
next <- QJob.selectNextGroup(worker, nowTs, initialPause).transact(tx)
} yield next
nextGroup.assertEquals(Some(group1))
}
}