mirror of
https://github.com/TheAnachronism/docspell.git
synced 2025-06-21 09:58:26 +00:00
Fix cancelling jobs
A request to cancel a job was not processed correctly. The cancelling routine of a task must run, regardless of the (non-final) state. Now it works like this: if a job is currently running, it is interrupted and its cancel routine is invoked. It then enters "cancelled" state. If it is stuck, it is loaded and only its cancel routine is run. If it is in a final state or waiting, it is removed from the queue.
This commit is contained in:
@ -2,6 +2,7 @@ package docspell.joex.process
|
||||
|
||||
import cats.implicits._
|
||||
import cats.effect._
|
||||
import cats.data.OptionT
|
||||
import fs2.Stream
|
||||
import docspell.common.{ItemState, ProcessItemArgs}
|
||||
import docspell.joex.Config
|
||||
@ -11,15 +12,22 @@ import docspell.store.records.RItem
|
||||
import docspell.ftsclient.FtsClient
|
||||
|
||||
object ItemHandler {
|
||||
def onCancel[F[_]: Sync: ContextShift]: Task[F, ProcessItemArgs, Unit] =
|
||||
logWarn("Now cancelling. Deleting potentially created data.").flatMap(_ =>
|
||||
deleteByFileIds.flatMap(_ => deleteFiles)
|
||||
type Args = ProcessItemArgs
|
||||
|
||||
def onCancel[F[_]: Sync: ContextShift]: Task[F, Args, Unit] =
|
||||
logWarn("Now cancelling.").flatMap(_ =>
|
||||
markItemCreated.flatMap {
|
||||
case true =>
|
||||
Task.pure(())
|
||||
case false =>
|
||||
deleteByFileIds[F].flatMap(_ => deleteFiles)
|
||||
}
|
||||
)
|
||||
|
||||
def newItem[F[_]: ConcurrentEffect: ContextShift](
|
||||
cfg: Config,
|
||||
fts: FtsClient[F]
|
||||
): Task[F, ProcessItemArgs, Unit] =
|
||||
): Task[F, Args, Unit] =
|
||||
CreateItem[F]
|
||||
.flatMap(itemStateTask(ItemState.Processing))
|
||||
.flatMap(safeProcess[F](cfg, fts))
|
||||
@ -34,13 +42,13 @@ object ItemHandler {
|
||||
.map(_ => data)
|
||||
)
|
||||
|
||||
def isLastRetry[F[_]: Sync]: Task[F, ProcessItemArgs, Boolean] =
|
||||
def isLastRetry[F[_]: Sync]: Task[F, Args, Boolean] =
|
||||
Task(_.isLastRetry)
|
||||
|
||||
def safeProcess[F[_]: ConcurrentEffect: ContextShift](
|
||||
cfg: Config,
|
||||
fts: FtsClient[F]
|
||||
)(data: ItemData): Task[F, ProcessItemArgs, ItemData] =
|
||||
)(data: ItemData): Task[F, Args, ItemData] =
|
||||
isLastRetry[F].flatMap {
|
||||
case true =>
|
||||
ProcessItem[F](cfg, fts)(data).attempt.flatMap({
|
||||
@ -56,24 +64,50 @@ object ItemHandler {
|
||||
ProcessItem[F](cfg, fts)(data).flatMap(itemStateTask(ItemState.Created))
|
||||
}
|
||||
|
||||
def deleteByFileIds[F[_]: Sync: ContextShift]: Task[F, ProcessItemArgs, Unit] =
|
||||
private def markItemCreated[F[_]: Sync]: Task[F, Args, Boolean] =
|
||||
Task { ctx =>
|
||||
val fileMetaIds = ctx.args.files.map(_.fileMetaId).toSet
|
||||
(for {
|
||||
item <- OptionT(ctx.store.transact(QItem.findOneByFileIds(fileMetaIds.toSeq)))
|
||||
_ <- OptionT.liftF(
|
||||
ctx.logger.info("Processing cancelled. Marking item as created anyways.")
|
||||
)
|
||||
_ <- OptionT.liftF(
|
||||
ctx.store
|
||||
.transact(
|
||||
RItem.updateState(item.id, ItemState.Created, ItemState.invalidStates)
|
||||
)
|
||||
)
|
||||
} yield true)
|
||||
.getOrElseF(
|
||||
ctx.logger.warn("Processing cancelled. No item created").map(_ => false)
|
||||
)
|
||||
}
|
||||
|
||||
def deleteByFileIds[F[_]: Sync: ContextShift]: Task[F, Args, Unit] =
|
||||
Task { ctx =>
|
||||
for {
|
||||
items <- ctx.store.transact(QItem.findByFileIds(ctx.args.files.map(_.fileMetaId)))
|
||||
_ <- ctx.logger.info(s"Deleting items ${items.map(_.id.id)}")
|
||||
_ <- items.traverse(i => QItem.delete(ctx.store)(i.id, ctx.args.meta.collective))
|
||||
_ <-
|
||||
if (items.nonEmpty) ctx.logger.info(s"Deleting items ${items.map(_.id.id)}")
|
||||
else
|
||||
ctx.logger.info(
|
||||
s"No items found for file ids ${ctx.args.files.map(_.fileMetaId)}"
|
||||
)
|
||||
_ <- items.traverse(i => QItem.delete(ctx.store)(i.id, ctx.args.meta.collective))
|
||||
} yield ()
|
||||
}
|
||||
|
||||
private def deleteFiles[F[_]: Sync]: Task[F, ProcessItemArgs, Unit] =
|
||||
private def deleteFiles[F[_]: Sync]: Task[F, Args, Unit] =
|
||||
Task(ctx =>
|
||||
Stream
|
||||
.emits(ctx.args.files.map(_.fileMetaId.id))
|
||||
.flatMap(id => ctx.store.bitpeace.delete(id).attempt.drain)
|
||||
.compile
|
||||
.drain
|
||||
ctx.logger.info("Deleting input files …") *>
|
||||
Stream
|
||||
.emits(ctx.args.files.map(_.fileMetaId.id))
|
||||
.flatMap(id => ctx.store.bitpeace.delete(id).attempt.drain)
|
||||
.compile
|
||||
.drain
|
||||
)
|
||||
|
||||
private def logWarn[F[_]](msg: => String): Task[F, ProcessItemArgs, Unit] =
|
||||
private def logWarn[F[_]](msg: => String): Task[F, Args, Unit] =
|
||||
Task(_.logger.warn(msg))
|
||||
}
|
||||
|
@ -30,5 +30,4 @@ object ProcessItem {
|
||||
.flatMap(FindProposal[F](cfg.processing))
|
||||
.flatMap(EvalProposals[F])
|
||||
.flatMap(SaveProposals[F])
|
||||
|
||||
}
|
||||
|
@ -3,6 +3,7 @@ package docspell.joex.scheduler
|
||||
import fs2.Stream
|
||||
import cats.implicits._
|
||||
import cats.effect.concurrent.Semaphore
|
||||
import cats.data.OptionT
|
||||
import docspell.common._
|
||||
import docspell.common.syntax.all._
|
||||
import docspell.store.queue.JobQueue
|
||||
@ -51,7 +52,16 @@ final class SchedulerImpl[F[_]: ConcurrentEffect: ContextShift](
|
||||
state.get.flatMap(_.cancelRequest(jobId) match {
|
||||
case Some(ct) => ct.map(_ => true)
|
||||
case None =>
|
||||
logger.fwarn(s"Job ${jobId.id} not found, cannot cancel.").map(_ => false)
|
||||
(for {
|
||||
job <- OptionT(store.transact(RJob.findByIdAndWorker(jobId, config.name)))
|
||||
_ <- OptionT.liftF(
|
||||
if (job.isInProgress) executeCancel(job)
|
||||
else ().pure[F]
|
||||
)
|
||||
} yield true)
|
||||
.getOrElseF(
|
||||
logger.fwarn(s"Job ${jobId.id} not found, cannot cancel.").map(_ => false)
|
||||
)
|
||||
})
|
||||
|
||||
def notifyChange: F[Unit] =
|
||||
@ -127,6 +137,31 @@ final class SchedulerImpl[F[_]: ConcurrentEffect: ContextShift](
|
||||
})
|
||||
}
|
||||
|
||||
private def executeCancel(job: RJob): F[Unit] = {
|
||||
val task = for {
|
||||
jobtask <-
|
||||
tasks
|
||||
.find(job.task)
|
||||
.toRight(s"This executor cannot run tasks with name: ${job.task}")
|
||||
} yield jobtask
|
||||
|
||||
task match {
|
||||
case Left(err) =>
|
||||
logger.ferror(s"Unable to run cancellation task for job ${job.info}: $err")
|
||||
case Right(t) =>
|
||||
for {
|
||||
_ <-
|
||||
logger.fdebug(s"Creating context for job ${job.info} to run cancellation $t")
|
||||
ctx <- Context[F, String](job, job.args, config, logSink, blocker, store)
|
||||
_ <- t.onCancel.run(ctx)
|
||||
_ <- state.modify(_.markCancelled(job))
|
||||
_ <- onFinish(job, JobState.Cancelled)
|
||||
_ <- ctx.logger.warn("Job has been cancelled.")
|
||||
_ <- logger.fdebug(s"Job ${job.info} has been cancelled.")
|
||||
} yield ()
|
||||
}
|
||||
}
|
||||
|
||||
def execute(job: RJob): F[Unit] = {
|
||||
val task = for {
|
||||
jobtask <-
|
||||
|
Reference in New Issue
Block a user