Sketch a scheduler for running periodic tasks
Periodic tasks are special in that they are usually kept around and started based on a schedule. A new component checks periodic tasks and submits them to the queue once they are due. To avoid duplicate periodic jobs, the tracker of a job is used to store the periodic job id. Each time a periodic task is due, it is first checked whether a job for this task is already running or queued.
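
The check can be pictured with a short sketch. The types and helper names below (PeriodicJob, QueuedJob, findNonFinalJob, submit) are simplified stand-ins for illustration, not docspell's actual records or API:

import cats.Monad
import cats.implicits._

object PeriodicSubmitSketch {
  // Toy model of the idea above: a job's tracker field stores the periodic
  // task id, and a due task is only submitted when no tracked job is still
  // queued or running. These types are simplified stand-ins, not docspell's.
  final case class PeriodicJob(id: String, dueNow: Boolean)
  final case class QueuedJob(tracker: Option[String], state: String)

  def submitIfDue[F[_]: Monad](
      pj: PeriodicJob,
      findNonFinalJob: String => F[Option[QueuedJob]], // lookup by tracker == periodic job id
      submit: PeriodicJob => F[Unit]
  ): F[Unit] =
    if (!pj.dueNow) Monad[F].unit
    else
      findNonFinalJob(pj.id).flatMap {
        case Some(_) => Monad[F].unit // still running or queued: skip, no duplicate
        case None    => submit(pj)    // nothing tracked: submit a new job
      }
}

In the actual change below, this corresponds to PeriodicSchedulerImpl.submitJob, which asks the store for a non-final job tracked by the periodic task id before inserting a new one.
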
@@ -64,6 +64,19 @@ docspell.joex {
     wakeup-period = "30 minutes"
   }
+
+  periodic-scheduler {
+
+    # Each scheduler needs a unique name. This defaults to the node
+    # name, which must be unique, too.
+    name = ${docspell.joex.app-id}
+
+    # A fallback to start looking for due periodic tasks regularily.
+    # Usually joex instances should be notified via REST calls if
+    # external processes change tasks. But these requests may get
+    # lost.
+    wakeup-period = "10 minutes"
+  }
 
   # Configuration of text extraction
   extraction {
     # For PDF files it is first tried to read the text parts of the

@@ -1,7 +1,7 @@
 package docspell.joex
 
 import docspell.common.{Ident, LenientUri}
-import docspell.joex.scheduler.SchedulerConfig
+import docspell.joex.scheduler.{PeriodicSchedulerConfig, SchedulerConfig}
 import docspell.store.JdbcConfig
 import docspell.convert.ConvertConfig
 import docspell.extract.ExtractConfig

@@ -12,6 +12,7 @@ case class Config(
     bind: Config.Bind,
     jdbc: JdbcConfig,
     scheduler: SchedulerConfig,
+    periodicScheduler: PeriodicSchedulerConfig,
     extraction: ExtractConfig,
     convert: ConvertConfig
 )

@@ -1,7 +1,7 @@
 package docspell.joex
 
 import docspell.common.Ident
-import docspell.joex.scheduler.Scheduler
+import docspell.joex.scheduler.{PeriodicScheduler, Scheduler}
 import docspell.store.records.RJobLog
 
 trait JoexApp[F[_]] {

@@ -10,6 +10,8 @@ trait JoexApp[F[_]] {
 
   def scheduler: Scheduler[F]
 
+  def periodicScheduler: PeriodicScheduler[F]
+
   def findLogs(jobId: Ident): F[Vector[RJobLog]]
 
   /** Shuts down the job executor.

@@ -3,9 +3,11 @@ package docspell.joex
 import cats.implicits._
 import cats.effect._
 import docspell.common.{Ident, NodeType, ProcessItemArgs}
+import docspell.joex.background._
 import docspell.joex.process.ItemHandler
-import docspell.joex.scheduler.{JobTask, Scheduler, SchedulerBuilder}
+import docspell.joex.scheduler._
 import docspell.store.Store
+import docspell.store.queue._
 import docspell.store.ops.ONode
 import docspell.store.records.RJobLog
 import fs2.concurrent.SignallingRef

@@ -17,14 +19,18 @@ final class JoexAppImpl[F[_]: ConcurrentEffect: ContextShift: Timer](
     nodeOps: ONode[F],
     store: Store[F],
     termSignal: SignallingRef[F, Boolean],
-    val scheduler: Scheduler[F]
+    val scheduler: Scheduler[F],
+    val periodicScheduler: PeriodicScheduler[F]
 ) extends JoexApp[F] {
 
   def init: F[Unit] = {
-    val run = scheduler.start.compile.drain
+    val run  = scheduler.start.compile.drain
+    val prun = periodicScheduler.start.compile.drain
     for {
       _ <- ConcurrentEffect[F].start(run)
+      _ <- ConcurrentEffect[F].start(prun)
       _ <- scheduler.periodicAwake
+      _ <- periodicScheduler.periodicAwake
       _ <- nodeOps.register(cfg.appId, NodeType.Joex, cfg.baseUrl)
     } yield ()
   }

@@ -36,7 +42,7 @@ final class JoexAppImpl[F[_]: ConcurrentEffect: ContextShift: Timer](
     nodeOps.unregister(cfg.appId)
 
   def initShutdown: F[Unit] =
-    scheduler.shutdown(false) *> termSignal.set(true)
+    periodicScheduler.shutdown *> scheduler.shutdown(false) *> termSignal.set(true)
 
 }
 

@@ -50,8 +56,12 @@ object JoexAppImpl {
   ): Resource[F, JoexApp[F]] =
     for {
       store <- Store.create(cfg.jdbc, connectEC, blocker)
+      queue <- JobQueue(store)
+      pstore <- PeriodicTaskStore.create(store)
       nodeOps <- ONode(store)
+      psch <- PeriodicScheduler.create(cfg.periodicScheduler, queue, pstore, Timer[F])
       sch <- SchedulerBuilder(cfg.scheduler, blocker, store)
+        .withQueue(queue)
         .withTask(
           JobTask.json(
             ProcessItemArgs.taskName,

@@ -59,8 +69,15 @@ object JoexAppImpl {
             ItemHandler.onCancel[F]
           )
         )
+        .withTask(
+          JobTask.json(
+            PeriodicTask.taskName,
+            PeriodicTask[F](cfg),
+            PeriodicTask.onCancel[F]
+          )
+        )
         .resource
-      app = new JoexAppImpl(cfg, nodeOps, store, termSignal, sch)
+      app = new JoexAppImpl(cfg, nodeOps, store, termSignal, sch, psch)
       appR <- Resource.make(app.init.map(_ => app))(_.shutdown)
     } yield appR
 }

@@ -19,7 +19,8 @@ object JoexRoutes {
       case POST -> Root / "notify" =>
         for {
           _ <- app.scheduler.notifyChange
-          resp <- Ok(BasicResult(true, "Scheduler notified."))
+          _ <- app.periodicScheduler.notifyChange
+          resp <- Ok(BasicResult(true, "Schedulers notified."))
         } yield resp
 
       case GET -> Root / "running" =>

@@ -0,0 +1,46 @@
+package docspell.joex.scheduler
+
+import fs2._
+import fs2.concurrent.SignallingRef
+import cats.effect._
+
+import docspell.store.queue._
+
+/** A periodic scheduler takes care to submit periodic tasks to the
+  * job queue.
+  *
+  * It is run in the background to regularily find a periodic task to
+  * execute. If the task is due, it will be submitted into the job
+  * queue where it will be picked up by the scheduler from some joex
+  * instance. If it is due in the future, a notification is scheduled
+  * to be received at that time so the task can be looked up again.
+  */
+trait PeriodicScheduler[F[_]] {
+
+  def config: PeriodicSchedulerConfig
+
+  def start: Stream[F, Nothing]
+
+  def shutdown: F[Unit]
+
+  def periodicAwake: F[Fiber[F, Unit]]
+
+  def notifyChange: F[Unit]
+}
+
+object PeriodicScheduler {
+
+  def create[F[_]: ConcurrentEffect: ContextShift](
+      cfg: PeriodicSchedulerConfig,
+      queue: JobQueue[F],
+      store: PeriodicTaskStore[F],
+      timer: Timer[F]
+  ): Resource[F, PeriodicScheduler[F]] =
+    for {
+      waiter <- Resource.liftF(SignallingRef(true))
+      state <- Resource.liftF(SignallingRef(PeriodicSchedulerImpl.emptyState[F]))
+      psch = new PeriodicSchedulerImpl[F](cfg, queue, store, waiter, state, timer)
+      _ <- Resource.liftF(psch.init)
+    } yield psch
+
+}

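The last sentence of the doc comment above, scheduling a notification for a future trigger, boils down to a cancellable background fiber. A minimal standalone sketch of that pattern, using cats-effect 2 APIs as in the code above; the fixed delays and the println stand in for the real calendar event and notifyChange:

import cats.effect._
import cats.implicits._
import scala.concurrent.duration._

// A background fiber sleeps until the trigger and then notifies; it can be
// cancelled when a change arrives earlier. Delays and println are placeholders
// for the real calendar event and notifyChange.
object ScheduleNotifySketch extends IOApp {
  def run(args: List[String]): IO[ExitCode] =
    for {
      fiber <- (IO.sleep(10.seconds) *> IO(println("trigger reached, notify"))).start
      _ <- IO.sleep(1.second)
      _ <- fiber.cancel // cancelNotify: the task changed before the trigger fired
    } yield ExitCode.Success
}
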
@@ -0,0 +1,8 @@
+package docspell.joex.scheduler
+
+import docspell.common._
+
+case class PeriodicSchedulerConfig(
+    name: Ident,
+    wakeupPeriod: Duration
+)

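For illustration, building this configuration directly could look like the following; Ident.unsafe and Duration.minutes are assumed helpers from docspell.common, and the values mirror the reference.conf defaults shown further up:

import docspell.common._

// Hypothetical wiring: "joex1" stands in for the node/app id, and the
// ten-minute wakeup matches the default above. Ident.unsafe and
// Duration.minutes are assumed docspell.common constructors.
val periodicSchedulerCfg = PeriodicSchedulerConfig(
  name = Ident.unsafe("joex1"),
  wakeupPeriod = Duration.minutes(10)
)
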
@@ -0,0 +1,194 @@
+package docspell.joex.scheduler
+
+import fs2._
+import fs2.concurrent.SignallingRef
+import cats.effect._
+import cats.implicits._
+import org.log4s.getLogger
+import com.github.eikek.fs2calev._
+
+import docspell.common._
+import docspell.common.syntax.all._
+import docspell.store.queue._
+import docspell.store.records.RPeriodicTask
+
+import PeriodicSchedulerImpl.State
+
+/*
+  onStartUp:
+  - remove worker value from all of the current
+
+  Loop:
+  - get earliest pjob
+  - if none: stop
+  - if triggered:
+    - mark worker, restart loop on fail
+    - submit new job
+      - check for non-final jobs of that name
+      - if exist: log info
+      - if not exist: submit
+    - update next trigger (in both cases)
+    - remove worker
+    - restart loop
+  - if future
+    - schedule notify
+    - stop loop
+
+
+  onNotify:
+  - cancel current scheduled notify
+  - start Loop
+
+
+  onShutdown:
+  - nothing to do
+ */
+
+final class PeriodicSchedulerImpl[F[_]: ConcurrentEffect: ContextShift](
+    val config: PeriodicSchedulerConfig,
+    queue: JobQueue[F],
+    store: PeriodicTaskStore[F],
+    waiter: SignallingRef[F, Boolean],
+    state: SignallingRef[F, State[F]],
+    timer: Timer[F]
+) extends PeriodicScheduler[F] {
+  private[this] val logger = getLogger
+  implicit private val _timer: Timer[F] = timer
+
+  def start: Stream[F, Nothing] =
+    logger.sinfo("Starting periodic scheduler") ++
+      mainLoop
+
+  def shutdown: F[Unit] =
+    state.modify(_.requestShutdown)
+
+  def periodicAwake: F[Fiber[F, Unit]] =
+    ConcurrentEffect[F].start(
+      Stream
+        .awakeEvery[F](config.wakeupPeriod.toScala)
+        .evalMap(_ => logger.fdebug("Periodic awake reached") *> notifyChange)
+        .compile
+        .drain
+    )
+
+  def notifyChange: F[Unit] =
+    waiter.update(b => !b)
+
+  // internal
+
+  /**
+    * On startup, get all periodic jobs from this scheduler and remove
+    * the mark, so they get picked up again.
+    */
+  def init: F[Unit] =
+    logError("Error clearing marks")(store.clearMarks(config.name))
+
+  def mainLoop: Stream[F, Nothing] = {
+    val body: F[Boolean] =
+      for {
+        _ <- logger.fdebug(s"Going into main loop")
+        now <- Timestamp.current[F]
+        _ <- logger.fdebug(s"Looking for next periodic task")
+        go <- logThrow("Error getting next task")(
+          store
+            .takeNext(config.name)
+            .use({
+              case Some(pj) =>
+                (if (isTriggered(pj, now)) submitJob(pj)
+                 else scheduleNotify(pj)).map(_ => true)
+              case None =>
+                false.pure[F]
+            })
+        )
+      } yield go
+
+    Stream
+      .eval(state.get.map(_.shutdownRequest))
+      .evalTap(
+        if (_) logger.finfo[F]("Stopping main loop due to shutdown request.")
+        else ().pure[F]
+      )
+      .flatMap(if (_) Stream.empty else Stream.eval(cancelNotify *> body))
+      .flatMap({
+        case true =>
+          mainLoop
+        case false =>
+          logger.sdebug(s"Waiting for notify") ++
+            waiter.discrete.take(2).drain ++
+            logger.sdebug(s"Notify signal, going into main loop") ++
+            mainLoop
+      })
+  }
+
+  def isTriggered(pj: RPeriodicTask, now: Timestamp): Boolean =
+    pj.timer.contains(now.value)
+
+  def submitJob(pj: RPeriodicTask): F[Unit] =
+    store
+      .findNonFinalJob(pj.id)
+      .flatMap({
+        case Some(job) =>
+          logger.finfo[F](
+            s"There is already a job with non-final state '${job.state}' in the queue"
+          )
+
+        case None =>
+          logger.finfo[F](s"Submitting job for periodic task '${pj.task.id}'") *>
+            pj.toJob.flatMap(queue.insert)
+      })
+
+  def scheduleNotify(pj: RPeriodicTask): F[Unit] =
+    ConcurrentEffect[F]
+      .start(
+        CalevFs2
+          .sleep[F](pj.timer)
+          .evalMap(_ => notifyChange)
+          .compile
+          .drain
+      )
+      .flatMap(fb => state.modify(_.setNotify(fb)))
+
+  def cancelNotify: F[Unit] =
+    state
+      .modify(_.clearNotify)
+      .flatMap({
+        case Some(fb) =>
+          fb.cancel
+        case None =>
+          ().pure[F]
+      })
+
+  private def logError(msg: => String)(fa: F[Unit]): F[Unit] =
+    fa.attempt.flatMap {
+      case Right(_) => ().pure[F]
+      case Left(ex) => logger.ferror(ex)(msg).map(_ => ())
+    }
+
+  private def logThrow[A](msg: => String)(fa: F[A]): F[A] =
+    fa.attempt
+      .flatMap({
+        case r @ Right(_) => (r: Either[Throwable, A]).pure[F]
+        case l @ Left(ex) => logger.ferror(ex)(msg).map(_ => (l: Either[Throwable, A]))
+      })
+      .rethrow
+}
+
+object PeriodicSchedulerImpl {
+  def emptyState[F[_]]: State[F] =
+    State(false, None)
+
+  case class State[F[_]](
+      shutdownRequest: Boolean,
+      scheduledNotify: Option[Fiber[F, Unit]]
+  ) {
+    def requestShutdown: (State[F], Unit) =
+      (copy(shutdownRequest = true), ())
+
+    def setNotify(fb: Fiber[F, Unit]): (State[F], Unit) =
+      (copy(scheduledNotify = Some(fb)), ())
+
+    def clearNotify: (State[F], Option[Fiber[F, Unit]]) =
+      (copy(scheduledNotify = None), scheduledNotify)
+
+  }
+}

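One detail worth spelling out is the waiter.discrete.take(2) idiom in mainLoop: discrete first emits the current value of the signal, so the stream completes only after one further change, i.e. after some notifyChange call has toggled the flag. A standalone sketch of that wake-up mechanism (cats-effect 2 / fs2 2 APIs as used above; the one-second delay and the println are illustrative):

import cats.effect._
import cats.implicits._
import fs2.concurrent.SignallingRef
import scala.concurrent.duration._

object WaiterSketch extends IOApp {
  // Wait until the boolean signal flips: discrete emits the current value
  // first, so take(2) only completes after one change has been observed.
  def run(args: List[String]): IO[ExitCode] =
    for {
      waiter <- SignallingRef[IO, Boolean](true)
      waitForNotify = waiter.discrete.take(2).compile.drain
      notify = IO.sleep(1.second) *> waiter.update(b => !b) // like notifyChange
      _ <- (waitForNotify, notify).parTupled // run waiter and notifier concurrently
      _ <- IO(println("woken up by a change"))
    } yield ExitCode.Success
}
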
@@ -34,6 +34,9 @@ case class SchedulerBuilder[F[_]: ConcurrentEffect: ContextShift](
   def withLogSink(sink: LogSink[F]): SchedulerBuilder[F] =
     copy(logSink = sink)
 
+  def withQueue(queue: JobQueue[F]): SchedulerBuilder[F] =
+    copy(queue = Resource.pure[F, JobQueue[F]](queue))
+
   def serve: Resource[F, Scheduler[F]] =
     resource.evalMap(sch => ConcurrentEffect[F].start(sch.start.compile.drain).map(_ => sch))
 