From 41c0f70d3b8ae25056d4cd513dab7b7f8b0dd651 Mon Sep 17 00:00:00 2001 From: Eike Kettner Date: Fri, 26 Jun 2020 22:10:11 +0200 Subject: [PATCH] Fix cancelling jobs A request to cancel a job was not processed correctly. The cancelling routine of a task must run, regardless of the (non-final) state. Now it works like this: if a job is currently running, it is interrupted and its cancel routine is invoked. It then enters "cancelled" state. If it is stuck, it is loaded and only its cancel routine is run. If it is in a final state or waiting, it is removed from the queue. --- build.sbt | 1 + .../scala/docspell/backend/ops/OJob.scala | 62 ++++++++--------- .../scala/docspell/backend/ops/OJoex.scala | 9 +-- .../main/scala/docspell/common/JobState.scala | 7 +- .../docspell/joex/process/ItemHandler.scala | 66 ++++++++++++++----- .../docspell/joex/process/ProcessItem.scala | 1 - .../joex/scheduler/SchedulerImpl.scala | 37 ++++++++++- .../docspell/joexapi/client/JoexClient.scala | 8 ++- modules/microsite/docs/doc/processing.md | 8 +-- .../scala/docspell/store/queries/QItem.scala | 59 +++++++++++++---- .../scala/docspell/store/records/RJob.scala | 12 ++++ 11 files changed, 191 insertions(+), 79 deletions(-) diff --git a/build.sbt b/build.sbt index 051a767b..f0e27e5d 100644 --- a/build.sbt +++ b/build.sbt @@ -307,6 +307,7 @@ val joexapi = project.in(file("modules/joexapi")). name := "docspell-joexapi", libraryDependencies ++= Dependencies.circe ++ + Dependencies.http4sCirce ++ Dependencies.http4sClient, openapiTargetLanguage := Language.Scala, openapiPackage := Pkg("docspell.joexapi.model"), diff --git a/modules/backend/src/main/scala/docspell/backend/ops/OJob.scala b/modules/backend/src/main/scala/docspell/backend/ops/OJob.scala index 04329488..d24cd975 100644 --- a/modules/backend/src/main/scala/docspell/backend/ops/OJob.scala +++ b/modules/backend/src/main/scala/docspell/backend/ops/OJob.scala @@ -2,6 +2,7 @@ package docspell.backend.ops import cats.implicits._ import cats.effect._ +import cats.data.OptionT import docspell.backend.ops.OJob.{CollectiveQueueState, JobCancelResult} import docspell.common.{Ident, JobState} import docspell.store.Store @@ -22,6 +23,10 @@ object OJob { case object Removed extends JobCancelResult case object CancelRequested extends JobCancelResult case object JobNotFound extends JobCancelResult + + def removed: JobCancelResult = Removed + def cancelRequested: JobCancelResult = CancelRequested + def jobNotFound: JobCancelResult = JobNotFound } case class JobDetail(job: RJob, logs: Vector[RJobLog]) @@ -49,43 +54,30 @@ object OJob { .map(CollectiveQueueState) def cancelJob(id: Ident, collective: Ident): F[JobCancelResult] = { - def mustCancel(job: Option[RJob]): Option[(RJob, Ident)] = - for { - worker <- job.flatMap(_.worker) - job <- job.filter(j => - j.state == JobState.Scheduled || j.state == JobState.Running - ) - } yield (job, worker) + def remove(job: RJob): F[JobCancelResult] = + store.transact(RJob.delete(job.id)) *> JobCancelResult.removed.pure[F] - def canDelete(j: RJob): Boolean = - mustCancel(j.some).isEmpty - - val tryDelete = for { - job <- RJob.findByIdAndGroup(id, collective) - jobm = job.filter(canDelete) - del <- jobm.traverse(j => RJob.delete(j.id)) - } yield del match { - case Some(_) => Right(JobCancelResult.Removed: JobCancelResult) - case None => Left(mustCancel(job)) - } - - def tryCancel(job: RJob, worker: Ident): F[JobCancelResult] = - joex - .cancelJob(job.id, worker) - .map(flag => - if (flag) JobCancelResult.CancelRequested else 
JobCancelResult.JobNotFound - ) - - for { - tryDel <- store.transact(tryDelete) - result <- tryDel match { - case Right(r) => r.pure[F] - case Left(Some((job, worker))) => - tryCancel(job, worker) - case Left(None) => - (JobCancelResult.JobNotFound: OJob.JobCancelResult).pure[F] + def tryCancel(job: RJob): F[JobCancelResult] = + job.worker match { + case Some(worker) => + for { + flag <- joex.cancelJob(job.id, worker) + res <- + if (flag) JobCancelResult.cancelRequested.pure[F] + else remove(job) + } yield res + case None => + remove(job) } - } yield result + + (for { + job <- OptionT(store.transact(RJob.findByIdAndGroup(id, collective))) + result <- OptionT.liftF( + if (job.isInProgress) tryCancel(job) + else remove(job) + ) + } yield result) + .getOrElse(JobCancelResult.jobNotFound) } }) } diff --git a/modules/backend/src/main/scala/docspell/backend/ops/OJoex.scala b/modules/backend/src/main/scala/docspell/backend/ops/OJoex.scala index 541646b5..95659b93 100644 --- a/modules/backend/src/main/scala/docspell/backend/ops/OJoex.scala +++ b/modules/backend/src/main/scala/docspell/backend/ops/OJoex.scala @@ -2,6 +2,7 @@ package docspell.backend.ops import cats.implicits._ import cats.effect._ +import cats.data.OptionT import docspell.common.{Ident, NodeType} import docspell.joexapi.client.JoexClient import docspell.store.Store @@ -28,10 +29,10 @@ object OJoex { } yield () def cancelJob(job: Ident, worker: Ident): F[Boolean] = - for { - node <- store.transact(RNode.findById(worker)) - cancel <- node.traverse(n => client.cancelJob(n.url, job)) - } yield cancel.isDefined + (for { + node <- OptionT(store.transact(RNode.findById(worker))) + cancel <- OptionT.liftF(client.cancelJob(node.url, job)) + } yield cancel.success).getOrElse(false) }) def create[F[_]: ConcurrentEffect]( diff --git a/modules/common/src/main/scala/docspell/common/JobState.scala b/modules/common/src/main/scala/docspell/common/JobState.scala index 68567dcd..007bd478 100644 --- a/modules/common/src/main/scala/docspell/common/JobState.scala +++ b/modules/common/src/main/scala/docspell/common/JobState.scala @@ -20,7 +20,7 @@ object JobState { /** Is currently executing */ case object Running extends JobState {} - /** Finished with failure and is being retried. */ + /** Task completed with failure and is being retried. 
*/ case object Stuck extends JobState {} /** Finished finally with a failure */ @@ -34,8 +34,9 @@ object JobState { val all: Set[JobState] = Set(Waiting, Scheduled, Running, Stuck, Failed, Cancelled, Success) - val queued: Set[JobState] = Set(Waiting, Scheduled, Stuck) - val done: Set[JobState] = Set(Failed, Cancelled, Success) + val queued: Set[JobState] = Set(Waiting, Scheduled, Stuck) + val done: Set[JobState] = Set(Failed, Cancelled, Success) + val inProgress: Set[JobState] = Set(Scheduled, Running, Stuck) def parse(str: String): Either[String, JobState] = str.toLowerCase match { diff --git a/modules/joex/src/main/scala/docspell/joex/process/ItemHandler.scala b/modules/joex/src/main/scala/docspell/joex/process/ItemHandler.scala index 5b7e9552..2346d69c 100644 --- a/modules/joex/src/main/scala/docspell/joex/process/ItemHandler.scala +++ b/modules/joex/src/main/scala/docspell/joex/process/ItemHandler.scala @@ -2,6 +2,7 @@ package docspell.joex.process import cats.implicits._ import cats.effect._ +import cats.data.OptionT import fs2.Stream import docspell.common.{ItemState, ProcessItemArgs} import docspell.joex.Config @@ -11,15 +12,22 @@ import docspell.store.records.RItem import docspell.ftsclient.FtsClient object ItemHandler { - def onCancel[F[_]: Sync: ContextShift]: Task[F, ProcessItemArgs, Unit] = - logWarn("Now cancelling. Deleting potentially created data.").flatMap(_ => - deleteByFileIds.flatMap(_ => deleteFiles) + type Args = ProcessItemArgs + + def onCancel[F[_]: Sync: ContextShift]: Task[F, Args, Unit] = + logWarn("Now cancelling.").flatMap(_ => + markItemCreated.flatMap { + case true => + Task.pure(()) + case false => + deleteByFileIds[F].flatMap(_ => deleteFiles) + } ) def newItem[F[_]: ConcurrentEffect: ContextShift]( cfg: Config, fts: FtsClient[F] - ): Task[F, ProcessItemArgs, Unit] = + ): Task[F, Args, Unit] = CreateItem[F] .flatMap(itemStateTask(ItemState.Processing)) .flatMap(safeProcess[F](cfg, fts)) @@ -34,13 +42,13 @@ object ItemHandler { .map(_ => data) ) - def isLastRetry[F[_]: Sync]: Task[F, ProcessItemArgs, Boolean] = + def isLastRetry[F[_]: Sync]: Task[F, Args, Boolean] = Task(_.isLastRetry) def safeProcess[F[_]: ConcurrentEffect: ContextShift]( cfg: Config, fts: FtsClient[F] - )(data: ItemData): Task[F, ProcessItemArgs, ItemData] = + )(data: ItemData): Task[F, Args, ItemData] = isLastRetry[F].flatMap { case true => ProcessItem[F](cfg, fts)(data).attempt.flatMap({ @@ -56,24 +64,50 @@ object ItemHandler { ProcessItem[F](cfg, fts)(data).flatMap(itemStateTask(ItemState.Created)) } - def deleteByFileIds[F[_]: Sync: ContextShift]: Task[F, ProcessItemArgs, Unit] = + private def markItemCreated[F[_]: Sync]: Task[F, Args, Boolean] = + Task { ctx => + val fileMetaIds = ctx.args.files.map(_.fileMetaId).toSet + (for { + item <- OptionT(ctx.store.transact(QItem.findOneByFileIds(fileMetaIds.toSeq))) + _ <- OptionT.liftF( + ctx.logger.info("Processing cancelled. Marking item as created anyways.") + ) + _ <- OptionT.liftF( + ctx.store + .transact( + RItem.updateState(item.id, ItemState.Created, ItemState.invalidStates) + ) + ) + } yield true) + .getOrElseF( + ctx.logger.warn("Processing cancelled. 
No item created").map(_ => false) + ) + } + + def deleteByFileIds[F[_]: Sync: ContextShift]: Task[F, Args, Unit] = Task { ctx => for { items <- ctx.store.transact(QItem.findByFileIds(ctx.args.files.map(_.fileMetaId))) - _ <- ctx.logger.info(s"Deleting items ${items.map(_.id.id)}") - _ <- items.traverse(i => QItem.delete(ctx.store)(i.id, ctx.args.meta.collective)) + _ <- + if (items.nonEmpty) ctx.logger.info(s"Deleting items ${items.map(_.id.id)}") + else + ctx.logger.info( + s"No items found for file ids ${ctx.args.files.map(_.fileMetaId)}" + ) + _ <- items.traverse(i => QItem.delete(ctx.store)(i.id, ctx.args.meta.collective)) } yield () } - private def deleteFiles[F[_]: Sync]: Task[F, ProcessItemArgs, Unit] = + private def deleteFiles[F[_]: Sync]: Task[F, Args, Unit] = Task(ctx => - Stream - .emits(ctx.args.files.map(_.fileMetaId.id)) - .flatMap(id => ctx.store.bitpeace.delete(id).attempt.drain) - .compile - .drain + ctx.logger.info("Deleting input files …") *> + Stream + .emits(ctx.args.files.map(_.fileMetaId.id)) + .flatMap(id => ctx.store.bitpeace.delete(id).attempt.drain) + .compile + .drain ) - private def logWarn[F[_]](msg: => String): Task[F, ProcessItemArgs, Unit] = + private def logWarn[F[_]](msg: => String): Task[F, Args, Unit] = Task(_.logger.warn(msg)) } diff --git a/modules/joex/src/main/scala/docspell/joex/process/ProcessItem.scala b/modules/joex/src/main/scala/docspell/joex/process/ProcessItem.scala index 1de74072..48542638 100644 --- a/modules/joex/src/main/scala/docspell/joex/process/ProcessItem.scala +++ b/modules/joex/src/main/scala/docspell/joex/process/ProcessItem.scala @@ -30,5 +30,4 @@ object ProcessItem { .flatMap(FindProposal[F](cfg.processing)) .flatMap(EvalProposals[F]) .flatMap(SaveProposals[F]) - } diff --git a/modules/joex/src/main/scala/docspell/joex/scheduler/SchedulerImpl.scala b/modules/joex/src/main/scala/docspell/joex/scheduler/SchedulerImpl.scala index 9fb6d160..ab2e7cc1 100644 --- a/modules/joex/src/main/scala/docspell/joex/scheduler/SchedulerImpl.scala +++ b/modules/joex/src/main/scala/docspell/joex/scheduler/SchedulerImpl.scala @@ -3,6 +3,7 @@ package docspell.joex.scheduler import fs2.Stream import cats.implicits._ import cats.effect.concurrent.Semaphore +import cats.data.OptionT import docspell.common._ import docspell.common.syntax.all._ import docspell.store.queue.JobQueue @@ -51,7 +52,16 @@ final class SchedulerImpl[F[_]: ConcurrentEffect: ContextShift]( state.get.flatMap(_.cancelRequest(jobId) match { case Some(ct) => ct.map(_ => true) case None => - logger.fwarn(s"Job ${jobId.id} not found, cannot cancel.").map(_ => false) + (for { + job <- OptionT(store.transact(RJob.findByIdAndWorker(jobId, config.name))) + _ <- OptionT.liftF( + if (job.isInProgress) executeCancel(job) + else ().pure[F] + ) + } yield true) + .getOrElseF( + logger.fwarn(s"Job ${jobId.id} not found, cannot cancel.").map(_ => false) + ) }) def notifyChange: F[Unit] = @@ -127,6 +137,31 @@ final class SchedulerImpl[F[_]: ConcurrentEffect: ContextShift]( }) } + private def executeCancel(job: RJob): F[Unit] = { + val task = for { + jobtask <- + tasks + .find(job.task) + .toRight(s"This executor cannot run tasks with name: ${job.task}") + } yield jobtask + + task match { + case Left(err) => + logger.ferror(s"Unable to run cancellation task for job ${job.info}: $err") + case Right(t) => + for { + _ <- + logger.fdebug(s"Creating context for job ${job.info} to run cancellation $t") + ctx <- Context[F, String](job, job.args, config, logSink, blocker, store) + _ <- t.onCancel.run(ctx) + 
_ <- state.modify(_.markCancelled(job)) + _ <- onFinish(job, JobState.Cancelled) + _ <- ctx.logger.warn("Job has been cancelled.") + _ <- logger.fdebug(s"Job ${job.info} has been cancelled.") + } yield () + } + } + def execute(job: RJob): F[Unit] = { val task = for { jobtask <- diff --git a/modules/joexapi/src/main/scala/docspell/joexapi/client/JoexClient.scala b/modules/joexapi/src/main/scala/docspell/joexapi/client/JoexClient.scala index 02a4c2cf..509d9689 100644 --- a/modules/joexapi/src/main/scala/docspell/joexapi/client/JoexClient.scala +++ b/modules/joexapi/src/main/scala/docspell/joexapi/client/JoexClient.scala @@ -4,9 +4,11 @@ import cats.implicits._ import cats.effect._ import docspell.common.{Ident, LenientUri} import docspell.common.syntax.all._ +import docspell.joexapi.model.BasicResult import org.http4s.{Method, Request, Uri} import org.http4s.client.Client import org.http4s.client.blaze.BlazeClientBuilder +import org.http4s.circe.CirceEntityDecoder._ import scala.concurrent.ExecutionContext import org.log4s.getLogger @@ -17,7 +19,7 @@ trait JoexClient[F[_]] { def notifyJoexIgnoreErrors(base: LenientUri): F[Unit] - def cancelJob(base: LenientUri, job: Ident): F[Unit] + def cancelJob(base: LenientUri, job: Ident): F[BasicResult] } @@ -44,10 +46,10 @@ object JoexClient { () } - def cancelJob(base: LenientUri, job: Ident): F[Unit] = { + def cancelJob(base: LenientUri, job: Ident): F[BasicResult] = { val cancelUrl = base / "api" / "v1" / "job" / job.id / "cancel" val req = Request[F](Method.POST, uri(cancelUrl)) - client.expect[String](req).map(_ => ()) + client.expect[BasicResult](req) } private def uri(u: LenientUri): Uri = diff --git a/modules/microsite/docs/doc/processing.md b/modules/microsite/docs/doc/processing.md index 9051873f..1554e815 100644 --- a/modules/microsite/docs/doc/processing.md +++ b/modules/microsite/docs/doc/processing.md @@ -36,7 +36,7 @@ that a job is some time waiting until it is picked up by a job executor. You can always start more job executors to help out. If a job fails, it is retried after some time. Only if it fails too -often (can be configured), it then is finished with *failed* state. If -processing finally fails, the item is still created, just without -suggestions. But if processing is cancelled by the user, the item is -not created. +often (can be configured), it then is finished with *failed* state. + +For the document-processing task, if processing finally fails or a job +is cancelled, the item is still created, just without suggestions. 
diff --git a/modules/store/src/main/scala/docspell/store/queries/QItem.scala b/modules/store/src/main/scala/docspell/store/queries/QItem.scala index a5a72ff2..d481656f 100644 --- a/modules/store/src/main/scala/docspell/store/queries/QItem.scala +++ b/modules/store/src/main/scala/docspell/store/queries/QItem.scala @@ -454,21 +454,56 @@ object QItem { n <- store.transact(RItem.deleteByIdAndCollective(itemId, collective)) } yield tn + rn + n + mn - def findByFileIds(fileMetaIds: Seq[Ident]): ConnectionIO[Vector[RItem]] = { - val IC = RItem.Columns - val AC = RAttachment.Columns - val q = - fr"SELECT DISTINCT" ++ commas( - IC.all.map(_.prefix("i").f) - ) ++ fr"FROM" ++ RItem.table ++ fr"i" ++ - fr"INNER JOIN" ++ RAttachment.table ++ fr"a ON" ++ AC.itemId - .prefix("a") - .is(IC.id.prefix("i")) ++ - fr"WHERE" ++ AC.fileId.isOneOf(fileMetaIds) ++ orderBy(IC.created.prefix("i").asc) + private def findByFileIdsQuery(fileMetaIds: NonEmptyList[Ident], limit: Option[Int]) = { + val IC = RItem.Columns.all.map(_.prefix("i")) + val aItem = RAttachment.Columns.itemId.prefix("a") + val aId = RAttachment.Columns.id.prefix("a") + val aFileId = RAttachment.Columns.fileId.prefix("a") + val iId = RItem.Columns.id.prefix("i") + val sId = RAttachmentSource.Columns.id.prefix("s") + val sFileId = RAttachmentSource.Columns.fileId.prefix("s") + val rId = RAttachmentArchive.Columns.id.prefix("r") + val rFileId = RAttachmentArchive.Columns.fileId.prefix("r") + val m1Id = RFileMeta.Columns.id.prefix("m1") + val m2Id = RFileMeta.Columns.id.prefix("m2") + val m3Id = RFileMeta.Columns.id.prefix("m3") - q.query[RItem].to[Vector] + val from = + RItem.table ++ fr"i INNER JOIN" ++ RAttachment.table ++ fr"a ON" ++ aItem.is(iId) ++ + fr"INNER JOIN" ++ RAttachmentSource.table ++ fr"s ON" ++ aId.is(sId) ++ + fr"INNER JOIN" ++ RFileMeta.table ++ fr"m1 ON" ++ m1Id.is(aFileId) ++ + fr"INNER JOIN" ++ RFileMeta.table ++ fr"m2 ON" ++ m2Id.is(sFileId) ++ + fr"LEFT OUTER JOIN" ++ RAttachmentArchive.table ++ fr"r ON" ++ aId.is(rId) ++ + fr"LEFT OUTER JOIN" ++ RFileMeta.table ++ fr"m3 ON" ++ m3Id.is(rFileId) + + val q = selectSimple( + IC, + from, + and(or(m1Id.isIn(fileMetaIds), m2Id.isIn(fileMetaIds), m3Id.isIn(fileMetaIds))) + ) + + limit match { + case Some(n) => q ++ fr"LIMIT $n" + case None => q + } } + def findOneByFileIds(fileMetaIds: Seq[Ident]): ConnectionIO[Option[RItem]] = + NonEmptyList.fromList(fileMetaIds.toList) match { + case Some(nel) => + findByFileIdsQuery(nel, Some(1)).query[RItem].option + case None => + (None: Option[RItem]).pure[ConnectionIO] + } + + def findByFileIds(fileMetaIds: Seq[Ident]): ConnectionIO[Vector[RItem]] = + NonEmptyList.fromList(fileMetaIds.toList) match { + case Some(nel) => + findByFileIdsQuery(nel, None).query[RItem].to[Vector] + case None => + Vector.empty[RItem].pure[ConnectionIO] + } + def findByChecksum(checksum: String, collective: Ident): ConnectionIO[Vector[RItem]] = { val IC = RItem.Columns.all.map(_.prefix("i")) val aItem = RAttachment.Columns.itemId.prefix("a") diff --git a/modules/store/src/main/scala/docspell/store/records/RJob.scala b/modules/store/src/main/scala/docspell/store/records/RJob.scala index 9f607c5c..0fbe7e2c 100644 --- a/modules/store/src/main/scala/docspell/store/records/RJob.scala +++ b/modules/store/src/main/scala/docspell/store/records/RJob.scala @@ -31,6 +31,12 @@ case class RJob( def info: String = s"${id.id.substring(0, 9)}.../${group.id}/${task.id}/$priority" + + def isFinalState: Boolean = + JobState.done.contains(state) + + def isInProgress: Boolean = + 
JobState.inProgress.contains(state) } object RJob { @@ -121,6 +127,12 @@ object RJob { def findByIdAndGroup(jobId: Ident, jobGroup: Ident): ConnectionIO[Option[RJob]] = selectSimple(all, table, and(id.is(jobId), group.is(jobGroup))).query[RJob].option + def findById(jobId: Ident): ConnectionIO[Option[RJob]] = + selectSimple(all, table, id.is(jobId)).query[RJob].option + + def findByIdAndWorker(jobId: Ident, workerId: Ident): ConnectionIO[Option[RJob]] = + selectSimple(all, table, and(id.is(jobId), worker.is(workerId))).query[RJob].option + def setRunningToWaiting(workerId: Ident): ConnectionIO[Int] = { val states: Seq[JobState] = List(JobState.Running, JobState.Scheduled) updateRow(
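
The cancellation flow introduced above can be summarized as follows: while a job is in progress (scheduled, running, or stuck) and a worker node is known for it, that worker is asked to interrupt the job and run its onCancel routine; otherwise the job is simply removed from the queue. Below is a minimal, illustrative Scala sketch of that decision only — the names `CancelDecisionSketch`, `CancelAction` and `decide` are hypothetical and are not code from this patch.

    // Illustrative sketch (not part of the patch): the cancel decision from the
    // commit message, reduced to a pure function over the job state.
    object CancelDecisionSketch {

      sealed trait JobState
      object JobState {
        case object Waiting   extends JobState
        case object Scheduled extends JobState
        case object Running   extends JobState
        case object Stuck     extends JobState
        case object Failed    extends JobState
        case object Cancelled extends JobState
        case object Success   extends JobState

        // mirrors the `inProgress` set added to JobState.scala in this patch
        val inProgress: Set[JobState] = Set(Scheduled, Running, Stuck)
      }

      sealed trait CancelAction
      object CancelAction {
        // ask the worker node to interrupt the job and run its onCancel routine
        case object RequestCancelFromWorker extends CancelAction
        // job is waiting or already in a final state: drop it from the queue
        case object RemoveFromQueue extends CancelAction
      }

      /** Decide how to handle a cancel request, given the job state and
        * whether a worker node is known for the job.
        */
      def decide(state: JobState, hasWorker: Boolean): CancelAction =
        if (JobState.inProgress.contains(state) && hasWorker)
          CancelAction.RequestCancelFromWorker
        else
          CancelAction.RemoveFromQueue

      def main(args: Array[String]): Unit = {
        // a running job with a known worker is cancelled via that worker ...
        println(decide(JobState.Running, hasWorker = true))  // RequestCancelFromWorker
        // ... while a waiting job is simply removed from the queue
        println(decide(JobState.Waiting, hasWorker = false)) // RemoveFromQueue
      }
    }

In the patch itself, this split lives on the backend side in `OJob.cancelJob` (via the new `RJob.isInProgress` and the worker lookup before calling `OJoex.cancelJob`), and on the executor side in `SchedulerImpl.requestCancel`/`executeCancel`, which runs the task's onCancel routine and moves the job to the cancelled state.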