Move job queue to scheduler-api and fix notification of periodic tasks

@@ -0,0 +1,97 @@
/*
 * Copyright 2020 Eike K. & Contributors
 *
 * SPDX-License-Identifier: AGPL-3.0-or-later
 */

package docspell.scheduler

import cats.effect._
import cats.implicits._
import docspell.common._
import docspell.store.Store
import docspell.store.queries.QJob
import docspell.store.records.RJob

trait JobQueue[F[_]] {

  /** Inserts the job into the queue to get picked up as soon as possible. The job must
    * have a new unique id.
    */
  def insert(job: RJob): F[Unit]

  /** Inserts the job into the queue only if there is no job with the same tracker-id
    * running at the moment. The job id must be a new unique id.
    *
    * If the job has no tracker defined, it is simply inserted.
    */
  def insertIfNew(job: RJob): F[Boolean]

  def insertAll(jobs: Seq[RJob]): F[List[Boolean]]

  def insertAllIfNew(jobs: Seq[RJob]): F[List[Boolean]]

  def nextJob(
      prio: Ident => F[Priority],
      worker: Ident,
      retryPause: Duration
  ): F[Option[RJob]]
}

object JobQueue {
  private[scheduler] def create[F[_]: Async](store: Store[F]): Resource[F, JobQueue[F]] =
    Resource.pure[F, JobQueue[F]](new JobQueue[F] {
      private[this] val logger = docspell.logging.getLogger[F]

      def nextJob(
          prio: Ident => F[Priority],
          worker: Ident,
          retryPause: Duration
      ): F[Option[RJob]] =
        logger
          .trace("Select next job") *> QJob.takeNextJob(store)(prio, worker, retryPause)

      def insert(job: RJob): F[Unit] =
        store
          .transact(RJob.insert(job))
          .flatMap { n =>
            if (n != 1)
              Async[F]
                .raiseError(new Exception(s"Inserting job failed. Update count: $n"))
            else ().pure[F]
          }

      def insertIfNew(job: RJob): F[Boolean] =
        for {
          rj <- job.tracker match {
            case Some(tid) =>
              store.transact(RJob.findNonFinalByTracker(tid))
            case None =>
              None.pure[F]
          }
          ret <-
            if (rj.isDefined) false.pure[F]
            else insert(job).as(true)
        } yield ret

      def insertAll(jobs: Seq[RJob]): F[List[Boolean]] =
        jobs.toList
          .traverse(j => insert(j).attempt)
          .flatMap(_.traverse {
            case Right(()) => true.pure[F]
            case Left(ex) =>
              logger.error(ex)("Could not insert job. Skipping it.").as(false)
          })

      def insertAllIfNew(jobs: Seq[RJob]): F[List[Boolean]] =
        jobs.toList
          .traverse(j => insertIfNew(j).attempt)
          .flatMap(_.traverse {
            case Right(true) => true.pure[F]
            case Right(false) => false.pure[F]
            case Left(ex) =>
              logger.error(ex)("Could not insert job. Skipping it.").as(false)
          })
    })
}

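Example (not part of the commit): a minimal sketch of how code inside the docspell.scheduler package could use insertIfNew to avoid queueing a duplicate follow-up job for the same tracker. The RJob value is assumed to be constructed elsewhere; its constructor is not part of this diff, and JobQueue.create is only visible within the scheduler package.

    import cats.effect._
    import docspell.scheduler.JobQueue
    import docspell.store.Store
    import docspell.store.records.RJob

    // Returns false when a non-final job with the same tracker id already exists.
    def submitOnce[F[_]: Async](store: Store[F], job: RJob): F[Boolean] =
      JobQueue.create(store).use(queue => queue.insertIfNew(job))
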
@@ -27,6 +27,7 @@ trait PeriodicScheduler[F[_]] {
   def periodicAwake: F[Fiber[F, Throwable, Unit]]
 
   def notifyChange: F[Unit]
-}
-
-object PeriodicScheduler {}
+
+  /** Starts listening for notify messages in the background. */
+  def startSubscriptions: F[Unit]
+}

@@ -22,6 +22,10 @@ trait Scheduler[F[_]] {
 
   def notifyChange: F[Unit]
 
+  /** Starts reacting on notify and cancel messages. */
+  def startSubscriptions: F[Unit]
+
   /** Starts the schedulers main loop. */
   def start: Stream[F, Nothing]
 
   /** Requests to shutdown the scheduler.

@@ -0,0 +1,26 @@
/*
 * Copyright 2020 Eike K. & Contributors
 *
 * SPDX-License-Identifier: AGPL-3.0-or-later
 */

package docspell.scheduler.msg

import docspell.common._
import docspell.pubsub.api.{Topic, TypedTopic}
import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder}
import io.circe.{Decoder, Encoder}

/** Message to request to cancel a job. */
final case class CancelJob(jobId: Ident, nodeId: Ident)

object CancelJob {
  implicit val jsonDecoder: Decoder[CancelJob] =
    deriveDecoder[CancelJob]

  implicit val jsonEncoder: Encoder[CancelJob] =
    deriveEncoder[CancelJob]

  val topic: TypedTopic[CancelJob] =
    TypedTopic(Topic("job-cancel-request"))
}

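Example (not part of the commit): a sketch of how a node could ask all job executors to cancel a running job by publishing to the CancelJob topic. The pubsub handle, job id and node id are assumed to come from the surrounding application code.

    import cats.Functor
    import cats.syntax.functor._
    import docspell.common.Ident
    import docspell.pubsub.api.PubSubT
    import docspell.scheduler.msg.CancelJob

    // Every scheduler that called startSubscriptions receives this message
    // and tries to cancel the job locally.
    def cancelEverywhere[F[_]: Functor](pubsub: PubSubT[F], jobId: Ident, nodeId: Ident): F[Unit] =
      pubsub.publish1(CancelJob.topic, CancelJob(jobId, nodeId)).void
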
@@ -0,0 +1,78 @@
/*
 * Copyright 2020 Eike K. & Contributors
 *
 * SPDX-License-Identifier: AGPL-3.0-or-later
 */

package docspell.scheduler.msg

import cats.effect._
import cats.implicits._
import docspell.common.{Duration, Ident, Priority}
import docspell.notification.api.{Event, EventSink}
import docspell.pubsub.api.PubSubT
import docspell.scheduler.JobQueue
import docspell.store.Store
import docspell.store.records.RJob

final class JobQueuePublish[F[_]: Sync](
    delegate: JobQueue[F],
    pubsub: PubSubT[F],
    eventSink: EventSink[F]
) extends JobQueue[F] {

  private def msg(job: RJob): JobSubmitted =
    JobSubmitted(job.id, job.group, job.task, job.args)

  private def event(job: RJob): Event.JobSubmitted =
    Event.JobSubmitted(
      job.id,
      job.group,
      job.task,
      job.args,
      job.state,
      job.subject,
      job.submitter
    )

  private def publish(job: RJob): F[Unit] =
    pubsub.publish1(JobSubmitted.topic, msg(job)).as(()) *>
      eventSink.offer(event(job))

  def insert(job: RJob) =
    delegate.insert(job).flatTap(_ => publish(job))

  def insertIfNew(job: RJob) =
    delegate.insertIfNew(job).flatTap {
      case true => publish(job)
      case false => ().pure[F]
    }

  def insertAll(jobs: Seq[RJob]) =
    delegate.insertAll(jobs).flatTap { results =>
      results.zip(jobs).traverse { case (res, job) =>
        if (res) publish(job)
        else ().pure[F]
      }
    }

  def insertAllIfNew(jobs: Seq[RJob]) =
    delegate.insertAllIfNew(jobs).flatTap { results =>
      results.zip(jobs).traverse { case (res, job) =>
        if (res) publish(job)
        else ().pure[F]
      }
    }

  def nextJob(prio: Ident => F[Priority], worker: Ident, retryPause: Duration) =
    delegate.nextJob(prio, worker, retryPause)
}

object JobQueuePublish {
  def apply[F[_]: Async](
      store: Store[F],
      pubSub: PubSubT[F],
      eventSink: EventSink[F]
  ): Resource[F, JobQueue[F]] =
    JobQueue.create(store).map(q => new JobQueuePublish[F](q, pubSub, eventSink))
}

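JobQueuePublish is a decorator: it delegates every call to the plain JobQueue and, after a successful insert, additionally publishes a JobSubmitted message and offers an event to the EventSink, so callers keep programming against the JobQueue trait. A wiring sketch (not part of the commit; store, pubSub and eventSink are assumed to be created during application startup):

    import cats.effect._
    import docspell.notification.api.EventSink
    import docspell.pubsub.api.PubSubT
    import docspell.scheduler.JobQueue
    import docspell.scheduler.msg.JobQueuePublish
    import docspell.store.Store

    def publishingQueue[F[_]: Async](
        store: Store[F],
        pubSub: PubSubT[F],
        eventSink: EventSink[F]
    ): Resource[F, JobQueue[F]] =
      // inserts hit the database first, then JobSubmitted is broadcast
      JobQueuePublish(store, pubSub, eventSink)
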
@@ -0,0 +1,26 @@
/*
 * Copyright 2020 Eike K. & Contributors
 *
 * SPDX-License-Identifier: AGPL-3.0-or-later
 */

package docspell.scheduler.msg

import docspell.common._
import docspell.pubsub.api.{Topic, TypedTopic}
import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder}
import io.circe.{Decoder, Encoder}

final case class JobSubmitted(jobId: Ident, group: Ident, task: Ident, args: String)

object JobSubmitted {

  implicit val jsonDecoder: Decoder[JobSubmitted] =
    deriveDecoder

  implicit val jsonEncoder: Encoder[JobSubmitted] =
    deriveEncoder

  val topic: TypedTopic[JobSubmitted] =
    TypedTopic(Topic("job-submitted"))
}

@@ -0,0 +1,9 @@
package docspell.scheduler.msg

import docspell.pubsub.api.{Topic, TypedTopic}

/** A generic notification to the job executors to look for new work. */
object JobsNotify {
  def apply(): TypedTopic[Unit] =
    TypedTopic[Unit](Topic("jobs-notify"))
}

@@ -0,0 +1,9 @@
package docspell.scheduler.msg

import docspell.pubsub.api.{Topic, TypedTopic}

/** A generic notification to the periodic task scheduler to look for new work. */
object PeriodicTaskNotify {
  def apply(): TypedTopic[Unit] =
    TypedTopic[Unit](Topic("periodic-task-notify"))
}

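Example (not part of the commit): a sketch of waking up remote schedulers over these topics; `pubsub` is assumed to be the node's PubSubT instance, and publish1IgnoreErrors is used the same way as in PeriodicSchedulerImpl.notifyJoex further down.

    import cats.Functor
    import cats.syntax.functor._
    import docspell.pubsub.api.PubSubT
    import docspell.scheduler.msg.{JobsNotify, PeriodicTaskNotify}

    // Tell every job executor to look for new work.
    def wakeJobExecutors[F[_]: Functor](pubsub: PubSubT[F]): F[Unit] =
      pubsub.publish1IgnoreErrors(JobsNotify(), ()).void

    // Tell the periodic schedulers that the periodic task table changed.
    def wakePeriodicSchedulers[F[_]: Functor](pubsub: PubSubT[F]): F[Unit] =
      pubsub.publish1IgnoreErrors(PeriodicTaskNotify(), ()).void
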
@@ -0,0 +1,22 @@
/*
 * Copyright 2020 Eike K. & Contributors
 *
 * SPDX-License-Identifier: AGPL-3.0-or-later
 */

package docspell.scheduler.impl

sealed trait Marked[+A] {}

object Marked {

  final case class Found[A](value: A) extends Marked[A]

  final case object NotFound extends Marked[Nothing]

  final case object NotMarkable extends Marked[Nothing]

  def found[A](v: A): Marked[A] = Found(v)
  def notFound[A]: Marked[A] = NotFound
  def notMarkable[A]: Marked[A] = NotMarkable
}

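Example (not part of the commit): Marked distinguishes "nothing is due" from "a task is due but another worker claimed it first". A consumer of PeriodicTaskStore.takeNext (shown later in this diff) might branch on it like this:

    import docspell.scheduler.impl.Marked
    import docspell.store.records.RPeriodicTask

    def describe(result: Marked[RPeriodicTask]): String =
      result match {
        case Marked.Found(pj) => s"run periodic task ${pj.id.id}"
        case Marked.NotFound => "no periodic task is due"
        case Marked.NotMarkable => "another worker claimed the task first"
      }
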
@@ -1,8 +1,8 @@
 package docspell.scheduler.impl
 
 import cats.effect._
+import docspell.pubsub.api.PubSubT
 import docspell.scheduler._
-import docspell.store.queue.{JobQueue, PeriodicTaskStore}
 import fs2.concurrent.SignallingRef
 
 object PeriodicSchedulerBuilder {

@@ -12,7 +12,7 @@ object PeriodicSchedulerBuilder {
       sch: Scheduler[F],
       queue: JobQueue[F],
       store: PeriodicTaskStore[F],
-      notifyJoex: F[Unit]
+      pubsub: PubSubT[F]
   ): Resource[F, PeriodicScheduler[F]] =
     for {
       waiter <- Resource.eval(SignallingRef(true))

@@ -22,7 +22,7 @@ object PeriodicSchedulerBuilder {
         sch,
         queue,
         store,
-        notifyJoex,
+        pubsub,
         waiter,
         state
       )

@@ -10,13 +10,12 @@ import cats.effect._
 import cats.implicits._
 import fs2._
 import fs2.concurrent.SignallingRef
 
 import docspell.common._
+import docspell.pubsub.api.PubSubT
 import docspell.scheduler._
 import docspell.scheduler.impl.PeriodicSchedulerImpl.State
-import docspell.store.queue._
+import docspell.scheduler.msg.{JobsNotify, PeriodicTaskNotify}
 import docspell.store.records.RPeriodicTask
 
 import eu.timepit.fs2cron.calev.CalevScheduler
 
 final class PeriodicSchedulerImpl[F[_]: Async](

@@ -24,7 +23,7 @@ final class PeriodicSchedulerImpl[F[_]: Async](
     sch: Scheduler[F],
     queue: JobQueue[F],
     store: PeriodicTaskStore[F],
-    joexNotifyAll: F[Unit],
+    pubSub: PubSubT[F],
     waiter: SignallingRef[F, Boolean],
     state: SignallingRef[F, State[F]]
 ) extends PeriodicScheduler[F] {

@@ -49,6 +48,13 @@ final class PeriodicSchedulerImpl[F[_]: Async](
   def notifyChange: F[Unit] =
     waiter.update(b => !b)
 
+  def startSubscriptions: F[Unit] =
+    for {
+      _ <- Async[F].start(pubSub.subscribeSink(PeriodicTaskNotify()) { _ =>
+        logger.info("Notify periodic scheduler from message") *> notifyChange
+      })
+    } yield ()
+
   // internal
 
   /** On startup, get all periodic jobs from this scheduler and remove the mark, so they

@@ -117,7 +123,7 @@ final class PeriodicSchedulerImpl[F[_]: Async](
   }
 
   def notifyJoex: F[Unit] =
-    sch.notifyChange *> joexNotifyAll
+    sch.notifyChange *> pubSub.publish1IgnoreErrors(JobsNotify(), ()).void
 
   def scheduleNotify(pj: RPeriodicTask): F[Unit] =
     Timestamp

@@ -0,0 +1,120 @@
/*
 * Copyright 2020 Eike K. & Contributors
 *
 * SPDX-License-Identifier: AGPL-3.0-or-later
 */

package docspell.scheduler.impl

import cats.effect._
import cats.implicits._
import docspell.common._
import docspell.store.queries.QPeriodicTask
import docspell.store.records._
import docspell.store.{AddResult, Store}

trait PeriodicTaskStore[F[_]] {

  /** Get the free periodic task due next and reserve it to the given worker.
    *
    * If found, the task is returned and resource finalization takes care of unmarking the
    * task after use and updating `nextRun` with the next timestamp.
    */
  def takeNext(
      worker: Ident,
      excludeId: Option[Ident]
  ): Resource[F, Marked[RPeriodicTask]]

  def clearMarks(name: Ident): F[Unit]

  def findNonFinalJob(pjobId: Ident): F[Option[RJob]]

  /** Insert a task or update it if it already exists. */
  def insert(task: RPeriodicTask): F[Unit]

  /** Adds the task only if it does not already exist. */
  def add(task: RPeriodicTask): F[AddResult]

  /** Find all joex nodes as registered in the database. */
  def findJoexNodes: F[Vector[RNode]]
}

object PeriodicTaskStore {

  def create[F[_]: Sync](store: Store[F]): Resource[F, PeriodicTaskStore[F]] =
    Resource.pure[F, PeriodicTaskStore[F]](new PeriodicTaskStore[F] {
      private[this] val logger = docspell.logging.getLogger[F]

      def takeNext(
          worker: Ident,
          excludeId: Option[Ident]
      ): Resource[F, Marked[RPeriodicTask]] = {
        val chooseNext: F[Marked[RPeriodicTask]] =
          getNext(excludeId).flatMap {
            case Some(pj) =>
              mark(pj.id, worker).map {
                case true => Marked.found(pj.copy(worker = worker.some))
                case false => Marked.notMarkable
              }
            case None =>
              Marked.notFound[RPeriodicTask].pure[F]
          }

        Resource.make(chooseNext) {
          case Marked.Found(pj) => unmark(pj)
          case _ => ().pure[F]
        }
      }

      def getNext(excl: Option[Ident]): F[Option[RPeriodicTask]] =
        store.transact(QPeriodicTask.findNext(excl))

      def mark(pid: Ident, name: Ident): F[Boolean] =
        Timestamp
          .current[F]
          .flatMap(now =>
            store.transact(QPeriodicTask.setWorker(pid, name, now)).map(_ > 0)
          )

      def unmark(job: RPeriodicTask): F[Unit] =
        for {
          now <- Timestamp.current[F]
          nextRun = job.timer.nextElapse(now.atUTC).map(Timestamp.from)
          _ <- store.transact(QPeriodicTask.unsetWorker(job.id, nextRun))
        } yield ()

      def clearMarks(name: Ident): F[Unit] =
        store
          .transact(QPeriodicTask.clearWorkers(name))
          .flatMap { n =>
            if (n > 0) logger.info(s"Clearing $n periodic tasks from worker ${name.id}")
            else ().pure[F]
          }

      def findNonFinalJob(pjobId: Ident): F[Option[RJob]] =
        store.transact(RJob.findNonFinalByTracker(pjobId))

      def insert(task: RPeriodicTask): F[Unit] = {
        val update = store.transact(RPeriodicTask.update(task))
        val insertAttempt = store.transact(RPeriodicTask.insert(task)).attempt.map {
          case Right(n) => n > 0
          case Left(_) => false
        }

        for {
          n1 <- update
          ins <- if (n1 == 0) insertAttempt else true.pure[F]
          _ <- if (ins) 1.pure[F] else update
        } yield ()
      }

      def add(task: RPeriodicTask): F[AddResult] = {
        val insert = RPeriodicTask.insert(task)
        val exists = RPeriodicTask.exists(task.id)
        store.add(insert, exists)
      }

      def findJoexNodes: F[Vector[RNode]] =
        store.transact(RNode.findAll(NodeType.Joex))
    })
}

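Example (not part of the commit): takeNext returns a Resource, so the worker mark is released (and nextRun updated) even when processing fails. A sketch of a consumer, where `submit` stands in for whatever turns a periodic task into a queued job:

    import cats.effect._
    import cats.implicits._
    import docspell.common.Ident
    import docspell.scheduler.impl.{Marked, PeriodicTaskStore}
    import docspell.store.records.RPeriodicTask

    def runNextDue[F[_]: Sync](
        tasks: PeriodicTaskStore[F],
        worker: Ident
    )(submit: RPeriodicTask => F[Unit]): F[Unit] =
      tasks.takeNext(worker, None).use {
        case Marked.Found(pj) => submit(pj) // task is unmarked when the resource is released
        case _ => ().pure[F]
      }
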
@@ -10,12 +10,10 @@ import cats.effect._
 import cats.effect.std.Semaphore
 import cats.implicits._
 import fs2.concurrent.SignallingRef
 
-import docspell.scheduler._
+import docspell.scheduler.{JobQueue, _}
 import docspell.notification.api.EventSink
 import docspell.pubsub.api.PubSubT
 import docspell.store.Store
-import docspell.store.queue.JobQueue
 
 case class SchedulerBuilder[F[_]: Async](
     config: SchedulerConfig,

@@ -88,7 +86,7 @@ object SchedulerBuilder {
       config,
       JobTaskRegistry.empty[F],
       store,
-      JobQueue(store),
+      JobQueue.create(store),
       LogSink.db[F](store),
       PubSubT.noop[F],
       EventSink.silent[F]

@@ -12,19 +12,16 @@ import cats.effect.std.Semaphore
 import cats.implicits._
 import fs2.Stream
 import fs2.concurrent.SignallingRef
 
-import docspell.scheduler.msg.JobDone
+import docspell.scheduler.msg.{CancelJob, JobDone, JobsNotify}
 import docspell.common._
-import docspell.scheduler._
+import docspell.scheduler.{JobQueue, _}
 import docspell.scheduler.impl.SchedulerImpl._
 import docspell.notification.api.Event
 import docspell.notification.api.EventSink
 import docspell.pubsub.api.PubSubT
 import docspell.store.Store
 import docspell.store.queries.QJob
-import docspell.store.queue.JobQueue
 import docspell.store.records.RJob
 
 import io.circe.Json
 
 final class SchedulerImpl[F[_]: Async](

@@ -42,6 +39,16 @@ final class SchedulerImpl[F[_]: Async](
 
   private[this] val logger = docspell.logging.getLogger[F]
 
+  def startSubscriptions =
+    for {
+      _ <- Async[F].start(pubSub.subscribeSink(JobsNotify()) { _ =>
+        notifyChange
+      })
+      _ <- Async[F].start(pubSub.subscribeSink(CancelJob.topic) { msg =>
+        requestCancel(msg.body.jobId).void
+      })
+    } yield ()
+
   /** On startup, get all jobs in state running from this scheduler and put them into
     * waiting state, so they get picked up again.
     */

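Example (not part of the commit): with these subscriptions in place, node startup code is presumably expected to call startSubscriptions once per scheduler so that JobsNotify, PeriodicTaskNotify and CancelJob messages are acted upon. A minimal sketch, assuming both scheduler instances are already built:

    import cats.effect._
    import cats.implicits._
    import docspell.scheduler.{PeriodicScheduler, Scheduler}

    // Run once during application startup, before the main loops are started.
    def subscribeSchedulers[F[_]: Sync](sch: Scheduler[F], psch: PeriodicScheduler[F]): F[Unit] =
      sch.startSubscriptions *> psch.startSubscriptions
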