mirror of
https://github.com/TheAnachronism/docspell.git
synced 2025-06-22 02:18:26 +00:00
Move scheduler code into separate module
This commit is contained in:
@ -0,0 +1,31 @@
|
||||
package docspell.scheduler.impl
|
||||
|
||||
import cats.effect._
|
||||
import docspell.scheduler._
|
||||
import docspell.store.queue.{JobQueue, PeriodicTaskStore}
|
||||
import fs2.concurrent.SignallingRef
|
||||
|
||||
object PeriodicSchedulerBuilder {
|
||||
|
||||
def build[F[_]: Async](
|
||||
cfg: PeriodicSchedulerConfig,
|
||||
sch: Scheduler[F],
|
||||
queue: JobQueue[F],
|
||||
store: PeriodicTaskStore[F],
|
||||
notifyJoex: F[Unit]
|
||||
): Resource[F, PeriodicScheduler[F]] =
|
||||
for {
|
||||
waiter <- Resource.eval(SignallingRef(true))
|
||||
state <- Resource.eval(SignallingRef(PeriodicSchedulerImpl.emptyState[F]))
|
||||
psch = new PeriodicSchedulerImpl[F](
|
||||
cfg,
|
||||
sch,
|
||||
queue,
|
||||
store,
|
||||
notifyJoex,
|
||||
waiter,
|
||||
state
|
||||
)
|
||||
_ <- Resource.eval(psch.init)
|
||||
} yield psch
|
||||
}
|
@ -0,0 +1,182 @@
|
||||
/*
|
||||
* Copyright 2020 Eike K. & Contributors
|
||||
*
|
||||
* SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
*/
|
||||
|
||||
package docspell.scheduler.impl
|
||||
|
||||
import cats.effect._
|
||||
import cats.implicits._
|
||||
import fs2._
|
||||
import fs2.concurrent.SignallingRef
|
||||
|
||||
import docspell.common._
|
||||
import docspell.scheduler._
|
||||
import docspell.scheduler.impl.PeriodicSchedulerImpl.State
|
||||
import docspell.store.queue._
|
||||
import docspell.store.records.RPeriodicTask
|
||||
|
||||
import eu.timepit.fs2cron.calev.CalevScheduler
|
||||
|
||||
final class PeriodicSchedulerImpl[F[_]: Async](
|
||||
val config: PeriodicSchedulerConfig,
|
||||
sch: Scheduler[F],
|
||||
queue: JobQueue[F],
|
||||
store: PeriodicTaskStore[F],
|
||||
joexNotifyAll: F[Unit],
|
||||
waiter: SignallingRef[F, Boolean],
|
||||
state: SignallingRef[F, State[F]]
|
||||
) extends PeriodicScheduler[F] {
|
||||
private[this] val logger = docspell.logging.getLogger[F]
|
||||
|
||||
def start: Stream[F, Nothing] =
|
||||
logger.stream.info("Starting periodic scheduler").drain ++
|
||||
mainLoop
|
||||
|
||||
def shutdown: F[Unit] =
|
||||
state.modify(_.requestShutdown)
|
||||
|
||||
def periodicAwake: F[Fiber[F, Throwable, Unit]] =
|
||||
Async[F].start(
|
||||
Stream
|
||||
.awakeEvery[F](config.wakeupPeriod.toScala)
|
||||
.evalMap(_ => logger.debug("Periodic awake reached") *> notifyChange)
|
||||
.compile
|
||||
.drain
|
||||
)
|
||||
|
||||
def notifyChange: F[Unit] =
|
||||
waiter.update(b => !b)
|
||||
|
||||
// internal
|
||||
|
||||
/** On startup, get all periodic jobs from this scheduler and remove the mark, so they
|
||||
* get picked up again.
|
||||
*/
|
||||
def init: F[Unit] =
|
||||
logError("Error clearing marks")(store.clearMarks(config.name))
|
||||
|
||||
def mainLoop: Stream[F, Nothing] = {
|
||||
val body: F[Boolean] =
|
||||
for {
|
||||
_ <- logger.debug(s"Going into main loop")
|
||||
now <- Timestamp.current[F]
|
||||
_ <- logger.debug(s"Looking for next periodic task")
|
||||
go <- logThrow("Error getting next task")(
|
||||
store
|
||||
.takeNext(config.name, None)
|
||||
.use {
|
||||
case Marked.Found(pj) =>
|
||||
logger
|
||||
.debug(s"Found periodic task '${pj.subject}/${pj.timer.asString}'") *>
|
||||
(if (isTriggered(pj, now)) submitJob(pj)
|
||||
else scheduleNotify(pj).map(_ => false))
|
||||
case Marked.NotFound =>
|
||||
logger.debug("No periodic task found") *> false.pure[F]
|
||||
case Marked.NotMarkable =>
|
||||
logger.debug("Periodic job cannot be marked. Trying again.") *> true
|
||||
.pure[F]
|
||||
}
|
||||
)
|
||||
} yield go
|
||||
|
||||
Stream
|
||||
.eval(state.get.map(_.shutdownRequest))
|
||||
.evalTap(
|
||||
if (_) logger.info("Stopping main loop due to shutdown request.")
|
||||
else ().pure[F]
|
||||
)
|
||||
.flatMap(if (_) Stream.empty else Stream.eval(cancelNotify *> body))
|
||||
.flatMap {
|
||||
case true =>
|
||||
mainLoop
|
||||
case false =>
|
||||
logger.stream.debug(s"Waiting for notify").drain ++
|
||||
waiter.discrete.take(2).drain ++
|
||||
logger.stream.debug(s"Notify signal, going into main loop").drain ++
|
||||
mainLoop
|
||||
}
|
||||
}
|
||||
|
||||
def isTriggered(pj: RPeriodicTask, now: Timestamp): Boolean =
|
||||
pj.nextrun < now
|
||||
|
||||
def submitJob(pj: RPeriodicTask): F[Boolean] =
|
||||
store
|
||||
.findNonFinalJob(pj.id)
|
||||
.flatMap {
|
||||
case Some(job) =>
|
||||
logger.info(
|
||||
s"There is already a job with non-final state '${job.state}' in the queue"
|
||||
) *> scheduleNotify(pj) *> false.pure[F]
|
||||
|
||||
case None =>
|
||||
logger.info(s"Submitting job for periodic task '${pj.task.id}'") *>
|
||||
pj.toJob.flatMap(queue.insert) *> notifyJoex *> true.pure[F]
|
||||
}
|
||||
|
||||
def notifyJoex: F[Unit] =
|
||||
sch.notifyChange *> joexNotifyAll
|
||||
|
||||
def scheduleNotify(pj: RPeriodicTask): F[Unit] =
|
||||
Timestamp
|
||||
.current[F]
|
||||
.flatMap(now =>
|
||||
logger.debug(
|
||||
s"Scheduling next notify for timer ${pj.timer.asString} -> ${pj.timer.nextElapse(now.toUtcDateTime)}"
|
||||
)
|
||||
) *>
|
||||
Async[F]
|
||||
.start(
|
||||
CalevScheduler
|
||||
.utc[F]
|
||||
.sleep(pj.timer)
|
||||
.evalMap(_ => notifyChange)
|
||||
.compile
|
||||
.drain
|
||||
)
|
||||
.flatMap(fb => state.modify(_.setNotify(fb)))
|
||||
|
||||
def cancelNotify: F[Unit] =
|
||||
state
|
||||
.modify(_.clearNotify)
|
||||
.flatMap {
|
||||
case Some(fb) =>
|
||||
fb.cancel
|
||||
case None =>
|
||||
().pure[F]
|
||||
}
|
||||
|
||||
private def logError(msg: => String)(fa: F[Unit]): F[Unit] =
|
||||
fa.attempt.flatMap {
|
||||
case Right(_) => ().pure[F]
|
||||
case Left(ex) => logger.error(ex)(msg).map(_ => ())
|
||||
}
|
||||
|
||||
private def logThrow[A](msg: => String)(fa: F[A]): F[A] =
|
||||
fa.attempt.flatMap {
|
||||
case r @ Right(_) => (r: Either[Throwable, A]).pure[F]
|
||||
case l @ Left(ex) => logger.error(ex)(msg).map(_ => (l: Either[Throwable, A]))
|
||||
}.rethrow
|
||||
}
|
||||
|
||||
object PeriodicSchedulerImpl {
|
||||
def emptyState[F[_]]: State[F] =
|
||||
State(false, None)
|
||||
|
||||
case class State[F[_]](
|
||||
shutdownRequest: Boolean,
|
||||
scheduledNotify: Option[Fiber[F, Throwable, Unit]]
|
||||
) {
|
||||
def requestShutdown: (State[F], Unit) =
|
||||
(copy(shutdownRequest = true), ())
|
||||
|
||||
def setNotify(fb: Fiber[F, Throwable, Unit]): (State[F], Unit) =
|
||||
(copy(scheduledNotify = Some(fb)), ())
|
||||
|
||||
def clearNotify: (State[F], Option[Fiber[F, Throwable, Unit]]) =
|
||||
(copy(scheduledNotify = None), scheduledNotify)
|
||||
|
||||
}
|
||||
}
|
@ -0,0 +1,97 @@
|
||||
/*
|
||||
* Copyright 2020 Eike K. & Contributors
|
||||
*
|
||||
* SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
*/
|
||||
|
||||
package docspell.scheduler.impl
|
||||
|
||||
import cats.effect._
|
||||
import cats.effect.std.Semaphore
|
||||
import cats.implicits._
|
||||
import fs2.concurrent.SignallingRef
|
||||
|
||||
import docspell.scheduler._
|
||||
import docspell.notification.api.EventSink
|
||||
import docspell.pubsub.api.PubSubT
|
||||
import docspell.store.Store
|
||||
import docspell.store.queue.JobQueue
|
||||
|
||||
case class SchedulerBuilder[F[_]: Async](
|
||||
config: SchedulerConfig,
|
||||
tasks: JobTaskRegistry[F],
|
||||
store: Store[F],
|
||||
queue: Resource[F, JobQueue[F]],
|
||||
logSink: LogSink[F],
|
||||
pubSub: PubSubT[F],
|
||||
eventSink: EventSink[F]
|
||||
) {
|
||||
|
||||
def withConfig(cfg: SchedulerConfig): SchedulerBuilder[F] =
|
||||
copy(config = cfg)
|
||||
|
||||
def withTaskRegistry(reg: JobTaskRegistry[F]): SchedulerBuilder[F] =
|
||||
copy(tasks = reg)
|
||||
|
||||
def withTask[A](task: JobTask[F]): SchedulerBuilder[F] =
|
||||
withTaskRegistry(tasks.withTask(task))
|
||||
|
||||
def withQueue(queue: Resource[F, JobQueue[F]]): SchedulerBuilder[F] =
|
||||
copy(queue = queue)
|
||||
|
||||
def withLogSink(sink: LogSink[F]): SchedulerBuilder[F] =
|
||||
copy(logSink = sink)
|
||||
|
||||
def withQueue(queue: JobQueue[F]): SchedulerBuilder[F] =
|
||||
copy(queue = Resource.pure[F, JobQueue[F]](queue))
|
||||
|
||||
def withPubSub(pubSubT: PubSubT[F]): SchedulerBuilder[F] =
|
||||
copy(pubSub = pubSubT)
|
||||
|
||||
def withEventSink(sink: EventSink[F]): SchedulerBuilder[F] =
|
||||
copy(eventSink = sink)
|
||||
|
||||
def serve: Resource[F, Scheduler[F]] =
|
||||
resource.evalMap(sch => Async[F].start(sch.start.compile.drain).map(_ => sch))
|
||||
|
||||
def resource: Resource[F, Scheduler[F]] = {
|
||||
val scheduler: Resource[F, SchedulerImpl[F]] = for {
|
||||
jq <- queue
|
||||
waiter <- Resource.eval(SignallingRef(true))
|
||||
state <- Resource.eval(SignallingRef(SchedulerImpl.emptyState[F]))
|
||||
perms <- Resource.eval(Semaphore(config.poolSize.toLong))
|
||||
} yield new SchedulerImpl[F](
|
||||
config,
|
||||
jq,
|
||||
pubSub,
|
||||
eventSink,
|
||||
tasks,
|
||||
store,
|
||||
logSink,
|
||||
state,
|
||||
waiter,
|
||||
perms
|
||||
)
|
||||
|
||||
scheduler.evalTap(_.init).map(s => s: Scheduler[F])
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
object SchedulerBuilder {
|
||||
|
||||
def apply[F[_]: Async](
|
||||
config: SchedulerConfig,
|
||||
store: Store[F]
|
||||
): SchedulerBuilder[F] =
|
||||
new SchedulerBuilder[F](
|
||||
config,
|
||||
JobTaskRegistry.empty[F],
|
||||
store,
|
||||
JobQueue(store),
|
||||
LogSink.db[F](store),
|
||||
PubSubT.noop[F],
|
||||
EventSink.silent[F]
|
||||
)
|
||||
|
||||
}
|
@ -0,0 +1,352 @@
|
||||
/*
|
||||
* Copyright 2020 Eike K. & Contributors
|
||||
*
|
||||
* SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
*/
|
||||
|
||||
package docspell.scheduler.impl
|
||||
|
||||
import cats.data.OptionT
|
||||
import cats.effect._
|
||||
import cats.effect.std.Semaphore
|
||||
import cats.implicits._
|
||||
import fs2.Stream
|
||||
import fs2.concurrent.SignallingRef
|
||||
|
||||
import docspell.scheduler.msg.JobDone
|
||||
import docspell.common._
|
||||
import docspell.scheduler._
|
||||
import docspell.scheduler.impl.SchedulerImpl._
|
||||
import docspell.notification.api.Event
|
||||
import docspell.notification.api.EventSink
|
||||
import docspell.pubsub.api.PubSubT
|
||||
import docspell.store.Store
|
||||
import docspell.store.queries.QJob
|
||||
import docspell.store.queue.JobQueue
|
||||
import docspell.store.records.RJob
|
||||
|
||||
import io.circe.Json
|
||||
|
||||
final class SchedulerImpl[F[_]: Async](
|
||||
val config: SchedulerConfig,
|
||||
queue: JobQueue[F],
|
||||
pubSub: PubSubT[F],
|
||||
eventSink: EventSink[F],
|
||||
tasks: JobTaskRegistry[F],
|
||||
store: Store[F],
|
||||
logSink: LogSink[F],
|
||||
state: SignallingRef[F, State[F]],
|
||||
waiter: SignallingRef[F, Boolean],
|
||||
permits: Semaphore[F]
|
||||
) extends Scheduler[F] {
|
||||
|
||||
private[this] val logger = docspell.logging.getLogger[F]
|
||||
|
||||
/** On startup, get all jobs in state running from this scheduler and put them into
|
||||
* waiting state, so they get picked up again.
|
||||
*/
|
||||
def init: F[Unit] =
|
||||
QJob.runningToWaiting(config.name, store)
|
||||
|
||||
def periodicAwake: F[Fiber[F, Throwable, Unit]] =
|
||||
Async[F].start(
|
||||
Stream
|
||||
.awakeEvery[F](config.wakeupPeriod.toScala)
|
||||
.evalMap(_ => logger.debug("Periodic awake reached") *> notifyChange)
|
||||
.compile
|
||||
.drain
|
||||
)
|
||||
|
||||
def getRunning: F[Vector[RJob]] =
|
||||
state.get.flatMap(s => QJob.findAll(s.getRunning, store))
|
||||
|
||||
def requestCancel(jobId: Ident): F[Boolean] =
|
||||
logger.info(s"Scheduler requested to cancel job: ${jobId.id}") *>
|
||||
state.get.flatMap(_.cancelRequest(jobId) match {
|
||||
case Some(ct) => ct.map(_ => true)
|
||||
case None =>
|
||||
(for {
|
||||
job <- OptionT(store.transact(RJob.findByIdAndWorker(jobId, config.name)))
|
||||
_ <- OptionT.liftF(
|
||||
if (job.isInProgress) executeCancel(job)
|
||||
else ().pure[F]
|
||||
)
|
||||
} yield true)
|
||||
.getOrElseF(
|
||||
logger.warn(s"Job ${jobId.id} not found, cannot cancel.").map(_ => false)
|
||||
)
|
||||
})
|
||||
|
||||
def notifyChange: F[Unit] =
|
||||
waiter.update(b => !b)
|
||||
|
||||
def shutdown(cancelAll: Boolean): F[Unit] = {
|
||||
val doCancel =
|
||||
state.get.flatMap(_.cancelTokens.values.toList.traverse(identity)).map(_ => ())
|
||||
|
||||
val runShutdown =
|
||||
state.modify(_.requestShutdown) *> (if (cancelAll) doCancel else ().pure[F])
|
||||
|
||||
val wait = Stream
|
||||
.eval(runShutdown)
|
||||
.evalMap(_ => logger.info("Scheduler is shutting down now."))
|
||||
.flatMap(_ =>
|
||||
Stream.eval(state.get) ++ Stream
|
||||
.suspend(state.discrete.takeWhile(_.getRunning.nonEmpty))
|
||||
)
|
||||
.flatMap { state =>
|
||||
if (state.getRunning.isEmpty) Stream.eval(logger.info("No jobs running."))
|
||||
else
|
||||
Stream.eval(
|
||||
logger.info(s"Waiting for ${state.getRunning.size} jobs to finish.")
|
||||
) ++
|
||||
Stream.emit(state)
|
||||
}
|
||||
|
||||
(wait.drain ++ Stream.emit(())).compile.lastOrError
|
||||
}
|
||||
|
||||
def start: Stream[F, Nothing] =
|
||||
logger.stream.info("Starting scheduler").drain ++
|
||||
mainLoop
|
||||
|
||||
def mainLoop: Stream[F, Nothing] = {
|
||||
val body: F[Boolean] =
|
||||
for {
|
||||
_ <- permits.available.flatMap(a =>
|
||||
logger.debug(s"Try to acquire permit ($a free)")
|
||||
)
|
||||
_ <- permits.acquire
|
||||
_ <- logger.debug("New permit acquired")
|
||||
down <- state.get.map(_.shutdownRequest)
|
||||
rjob <-
|
||||
if (down)
|
||||
logger.info("") *> permits.release *> (None: Option[RJob]).pure[F]
|
||||
else
|
||||
queue.nextJob(
|
||||
group => state.modify(_.nextPrio(group, config.countingScheme)),
|
||||
config.name,
|
||||
config.retryDelay
|
||||
)
|
||||
_ <- logger.debug(s"Next job found: ${rjob.map(_.info)}")
|
||||
_ <- rjob.map(execute).getOrElse(permits.release)
|
||||
} yield rjob.isDefined
|
||||
|
||||
Stream
|
||||
.eval(state.get.map(_.shutdownRequest))
|
||||
.evalTap(
|
||||
if (_) logger.info("Stopping main loop due to shutdown request.")
|
||||
else ().pure[F]
|
||||
)
|
||||
.flatMap(if (_) Stream.empty else Stream.eval(body))
|
||||
.flatMap {
|
||||
case true =>
|
||||
mainLoop
|
||||
case false =>
|
||||
logger.stream.debug(s"Waiting for notify").drain ++
|
||||
waiter.discrete.take(2).drain ++
|
||||
logger.stream.debug(s"Notify signal, going into main loop").drain ++
|
||||
mainLoop
|
||||
}
|
||||
}
|
||||
|
||||
private def executeCancel(job: RJob): F[Unit] = {
|
||||
val task = for {
|
||||
jobtask <-
|
||||
tasks
|
||||
.find(job.task)
|
||||
.toRight(s"This executor cannot run tasks with name: ${job.task}")
|
||||
} yield jobtask
|
||||
|
||||
task match {
|
||||
case Left(err) =>
|
||||
logger.error(s"Unable to run cancellation task for job ${job.info}: $err")
|
||||
case Right(t) =>
|
||||
for {
|
||||
_ <-
|
||||
logger.debug(s"Creating context for job ${job.info} to run cancellation $t")
|
||||
ctx <- Context[F, String](job, job.args, config, logSink, store)
|
||||
_ <- t.onCancel.run(ctx)
|
||||
_ <- state.modify(_.markCancelled(job))
|
||||
_ <- onFinish(job, JobTaskResult.empty, JobState.Cancelled)
|
||||
_ <- ctx.logger.warn("Job has been cancelled.")
|
||||
_ <- logger.debug(s"Job ${job.info} has been cancelled.")
|
||||
} yield ()
|
||||
}
|
||||
}
|
||||
|
||||
def execute(job: RJob): F[Unit] = {
|
||||
val task = for {
|
||||
jobtask <-
|
||||
tasks
|
||||
.find(job.task)
|
||||
.toRight(s"This executor cannot run tasks with name: ${job.task}")
|
||||
} yield jobtask
|
||||
|
||||
task match {
|
||||
case Left(err) =>
|
||||
logger.error(s"Unable to start a task for job ${job.info}: $err")
|
||||
case Right(t) =>
|
||||
for {
|
||||
_ <- logger.debug(s"Creating context for job ${job.info} to run $t")
|
||||
ctx <- Context[F, String](job, job.args, config, logSink, store)
|
||||
jot = wrapTask(job, t.task, ctx)
|
||||
tok <- forkRun(job, jot.run(ctx), t.onCancel.run(ctx), ctx)
|
||||
_ <- state.modify(_.addRunning(job, tok))
|
||||
} yield ()
|
||||
}
|
||||
}
|
||||
|
||||
def onFinish(job: RJob, result: JobTaskResult, finishState: JobState): F[Unit] =
|
||||
for {
|
||||
_ <- logger.debug(s"Job ${job.info} done $finishState. Releasing resources.")
|
||||
_ <- permits.release *> permits.available.flatMap(a =>
|
||||
logger.debug(s"Permit released ($a free)")
|
||||
)
|
||||
_ <- state.modify(_.removeRunning(job))
|
||||
_ <- QJob.setFinalState(job.id, finishState, store)
|
||||
_ <- Sync[F].whenA(JobState.isDone(finishState))(
|
||||
pubSub.publish1IgnoreErrors(
|
||||
JobDone.topic,
|
||||
JobDone(job.id, job.group, job.task, job.args, finishState)
|
||||
)
|
||||
)
|
||||
_ <- Sync[F].whenA(JobState.isDone(finishState))(
|
||||
eventSink.offer(
|
||||
Event.JobDone(
|
||||
job.id,
|
||||
job.group,
|
||||
job.task,
|
||||
job.args,
|
||||
job.state,
|
||||
job.subject,
|
||||
job.submitter,
|
||||
result.json.getOrElse(Json.Null),
|
||||
result.message
|
||||
)
|
||||
)
|
||||
)
|
||||
} yield ()
|
||||
|
||||
def onStart(job: RJob): F[Unit] =
|
||||
QJob.setRunning(
|
||||
job.id,
|
||||
config.name,
|
||||
store
|
||||
) // also increments retries if current state=stuck
|
||||
|
||||
def wrapTask(
|
||||
job: RJob,
|
||||
task: Task[F, String, JobTaskResult],
|
||||
ctx: Context[F, String]
|
||||
): Task[F, String, Unit] =
|
||||
task
|
||||
.mapF(fa => onStart(job) *> logger.debug("Starting task now") *> fa)
|
||||
.mapF(_.attempt.flatMap {
|
||||
case Right(result) =>
|
||||
logger.info(s"Job execution successful: ${job.info}")
|
||||
ctx.logger.info("Job execution successful") *>
|
||||
(JobState.Success: JobState, result).pure[F]
|
||||
case Left(ex) =>
|
||||
state.get.map(_.wasCancelled(job)).flatMap {
|
||||
case true =>
|
||||
logger.error(ex)(s"Job ${job.info} execution failed (cancel = true)")
|
||||
ctx.logger.error(ex)("Job execution failed (cancel = true)") *>
|
||||
(JobState.Cancelled: JobState, JobTaskResult.empty).pure[F]
|
||||
case false =>
|
||||
QJob.exceedsRetries(job.id, config.retries, store).flatMap {
|
||||
case true =>
|
||||
logger.error(ex)(s"Job ${job.info} execution failed. Retries exceeded.")
|
||||
ctx.logger
|
||||
.error(ex)(s"Job ${job.info} execution failed. Retries exceeded.")
|
||||
.map(_ => (JobState.Failed: JobState, JobTaskResult.empty))
|
||||
case false =>
|
||||
logger.error(ex)(s"Job ${job.info} execution failed. Retrying later.")
|
||||
ctx.logger
|
||||
.error(ex)(s"Job ${job.info} execution failed. Retrying later.")
|
||||
.map(_ => (JobState.Stuck: JobState, JobTaskResult.empty))
|
||||
}
|
||||
}
|
||||
})
|
||||
.mapF(_.attempt.flatMap {
|
||||
case Right((jstate, result)) =>
|
||||
onFinish(job, result, jstate)
|
||||
case Left(ex) =>
|
||||
logger.error(ex)(s"Error happened during post-processing of ${job.info}!")
|
||||
// we don't know the real outcome here…
|
||||
// since tasks should be idempotent, set it to stuck. if above has failed, this might fail anyways
|
||||
onFinish(job, JobTaskResult.empty, JobState.Stuck)
|
||||
})
|
||||
|
||||
def forkRun(
|
||||
job: RJob,
|
||||
code: F[Unit],
|
||||
onCancel: F[Unit],
|
||||
ctx: Context[F, String]
|
||||
): F[F[Unit]] =
|
||||
logger.debug(s"Forking job ${job.info}") *>
|
||||
Async[F]
|
||||
.start(code)
|
||||
.map(fiber =>
|
||||
logger.debug(s"Cancelling job ${job.info}") *>
|
||||
fiber.cancel *>
|
||||
onCancel.attempt.map {
|
||||
case Right(_) => ()
|
||||
case Left(ex) =>
|
||||
logger.error(ex)(s"Task's cancelling code failed. Job ${job.info}.")
|
||||
()
|
||||
} *>
|
||||
state.modify(_.markCancelled(job)) *>
|
||||
onFinish(job, JobTaskResult.empty, JobState.Cancelled) *>
|
||||
ctx.logger.warn("Job has been cancelled.") *>
|
||||
logger.debug(s"Job ${job.info} has been cancelled.")
|
||||
)
|
||||
}
|
||||
|
||||
object SchedulerImpl {
|
||||
|
||||
type CancelToken[F[_]] = F[Unit]
|
||||
|
||||
def emptyState[F[_]]: State[F] =
|
||||
State(Map.empty, Set.empty, Map.empty, false)
|
||||
|
||||
case class State[F[_]](
|
||||
counters: Map[Ident, CountingScheme],
|
||||
cancelled: Set[Ident],
|
||||
cancelTokens: Map[Ident, CancelToken[F]],
|
||||
shutdownRequest: Boolean
|
||||
) {
|
||||
|
||||
def nextPrio(group: Ident, initial: CountingScheme): (State[F], Priority) = {
|
||||
val (cs, prio) = counters.getOrElse(group, initial).nextPriority
|
||||
(copy(counters = counters.updated(group, cs)), prio)
|
||||
}
|
||||
|
||||
def addRunning(job: RJob, token: CancelToken[F]): (State[F], Unit) =
|
||||
(
|
||||
State(counters, cancelled, cancelTokens.updated(job.id, token), shutdownRequest),
|
||||
()
|
||||
)
|
||||
|
||||
def removeRunning(job: RJob): (State[F], Unit) =
|
||||
(
|
||||
copy(cancelled = cancelled - job.id, cancelTokens = cancelTokens.removed(job.id)),
|
||||
()
|
||||
)
|
||||
|
||||
def markCancelled(job: RJob): (State[F], Unit) =
|
||||
(copy(cancelled = cancelled + job.id), ())
|
||||
|
||||
def wasCancelled(job: RJob): Boolean =
|
||||
cancelled.contains(job.id)
|
||||
|
||||
def cancelRequest(id: Ident): Option[F[Unit]] =
|
||||
cancelTokens.get(id)
|
||||
|
||||
def getRunning: Seq[Ident] =
|
||||
cancelTokens.keys.toSeq
|
||||
|
||||
def requestShutdown: (State[F], Unit) =
|
||||
(copy(shutdownRequest = true), ())
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user