diff --git a/build.sbt b/build.sbt
index 051a767b..f0e27e5d 100644
--- a/build.sbt
+++ b/build.sbt
@@ -307,6 +307,7 @@ val joexapi = project.in(file("modules/joexapi")).
     name := "docspell-joexapi",
     libraryDependencies ++=
       Dependencies.circe ++
+        Dependencies.http4sCirce ++
         Dependencies.http4sClient,
     openapiTargetLanguage := Language.Scala,
     openapiPackage := Pkg("docspell.joexapi.model"),
diff --git a/modules/backend/src/main/scala/docspell/backend/ops/OJob.scala b/modules/backend/src/main/scala/docspell/backend/ops/OJob.scala
index 04329488..d24cd975 100644
--- a/modules/backend/src/main/scala/docspell/backend/ops/OJob.scala
+++ b/modules/backend/src/main/scala/docspell/backend/ops/OJob.scala
@@ -2,6 +2,7 @@ package docspell.backend.ops

 import cats.implicits._
 import cats.effect._
+import cats.data.OptionT
 import docspell.backend.ops.OJob.{CollectiveQueueState, JobCancelResult}
 import docspell.common.{Ident, JobState}
 import docspell.store.Store
@@ -22,6 +23,10 @@ object OJob {
     case object Removed extends JobCancelResult
     case object CancelRequested extends JobCancelResult
     case object JobNotFound extends JobCancelResult
+
+    def removed: JobCancelResult = Removed
+    def cancelRequested: JobCancelResult = CancelRequested
+    def jobNotFound: JobCancelResult = JobNotFound
   }

   case class JobDetail(job: RJob, logs: Vector[RJobLog])
@@ -49,43 +54,30 @@ object OJob {
           .map(CollectiveQueueState)

       def cancelJob(id: Ident, collective: Ident): F[JobCancelResult] = {
-        def mustCancel(job: Option[RJob]): Option[(RJob, Ident)] =
-          for {
-            worker <- job.flatMap(_.worker)
-            job <- job.filter(j =>
-              j.state == JobState.Scheduled || j.state == JobState.Running
-            )
-          } yield (job, worker)
+        def remove(job: RJob): F[JobCancelResult] =
+          store.transact(RJob.delete(job.id)) *> JobCancelResult.removed.pure[F]

-        def canDelete(j: RJob): Boolean =
-          mustCancel(j.some).isEmpty
-
-        val tryDelete = for {
-          job <- RJob.findByIdAndGroup(id, collective)
-          jobm = job.filter(canDelete)
-          del <- jobm.traverse(j => RJob.delete(j.id))
-        } yield del match {
-          case Some(_) => Right(JobCancelResult.Removed: JobCancelResult)
-          case None    => Left(mustCancel(job))
-        }
-
-        def tryCancel(job: RJob, worker: Ident): F[JobCancelResult] =
-          joex
-            .cancelJob(job.id, worker)
-            .map(flag =>
-              if (flag) JobCancelResult.CancelRequested else JobCancelResult.JobNotFound
-            )
-
-        for {
-          tryDel <- store.transact(tryDelete)
-          result <- tryDel match {
-            case Right(r) => r.pure[F]
-            case Left(Some((job, worker))) =>
-              tryCancel(job, worker)
-            case Left(None) =>
-              (JobCancelResult.JobNotFound: OJob.JobCancelResult).pure[F]
+        def tryCancel(job: RJob): F[JobCancelResult] =
+          job.worker match {
+            case Some(worker) =>
+              for {
+                flag <- joex.cancelJob(job.id, worker)
+                res <-
+                  if (flag) JobCancelResult.cancelRequested.pure[F]
+                  else remove(job)
+              } yield res
+            case None =>
+              remove(job)
           }
-        } yield result
+
+        (for {
+          job    <- OptionT(store.transact(RJob.findByIdAndGroup(id, collective)))
+          result <- OptionT.liftF(
+            if (job.isInProgress) tryCancel(job)
+            else remove(job)
+          )
+        } yield result)
+          .getOrElse(JobCancelResult.jobNotFound)
       }
     })
 }
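Note (not part of the patch): OJob.cancelJob now always resolves to one of the three JobCancelResult values introduced above. A minimal sketch of how a call site might fold that result into a user-facing message; the toMessage helper and its wording are illustrative assumptions, not code from this change:

    // Hypothetical call-site helper; only the JobCancelResult cases come from the patch.
    import docspell.backend.ops.OJob.JobCancelResult

    def toMessage(result: JobCancelResult): String =
      result match {
        case JobCancelResult.Removed         => "The job has been removed from the queue."
        case JobCancelResult.CancelRequested => "A cancel request was sent to the job executor."
        case JobCancelResult.JobNotFound     => "The job could not be found."
      }
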
diff --git a/modules/backend/src/main/scala/docspell/backend/ops/OJoex.scala b/modules/backend/src/main/scala/docspell/backend/ops/OJoex.scala
index 541646b5..95659b93 100644
--- a/modules/backend/src/main/scala/docspell/backend/ops/OJoex.scala
+++ b/modules/backend/src/main/scala/docspell/backend/ops/OJoex.scala
@@ -2,6 +2,7 @@ package docspell.backend.ops

 import cats.implicits._
 import cats.effect._
+import cats.data.OptionT
 import docspell.common.{Ident, NodeType}
 import docspell.joexapi.client.JoexClient
 import docspell.store.Store
@@ -28,10 +29,10 @@
       } yield ()

       def cancelJob(job: Ident, worker: Ident): F[Boolean] =
-        for {
-          node   <- store.transact(RNode.findById(worker))
-          cancel <- node.traverse(n => client.cancelJob(n.url, job))
-        } yield cancel.isDefined
+        (for {
+          node   <- OptionT(store.transact(RNode.findById(worker)))
+          cancel <- OptionT.liftF(client.cancelJob(node.url, job))
+        } yield cancel.success).getOrElse(false)
     })

   def create[F[_]: ConcurrentEffect](
diff --git a/modules/common/src/main/scala/docspell/common/JobState.scala b/modules/common/src/main/scala/docspell/common/JobState.scala
index 68567dcd..007bd478 100644
--- a/modules/common/src/main/scala/docspell/common/JobState.scala
+++ b/modules/common/src/main/scala/docspell/common/JobState.scala
@@ -20,7 +20,7 @@ object JobState {
   /** Is currently executing */
   case object Running extends JobState {}

-  /** Finished with failure and is being retried. */
+  /** Task completed with failure and is being retried. */
   case object Stuck extends JobState {}

   /** Finished finally with a failure */
@@ -34,8 +34,9 @@
   val all: Set[JobState] =
     Set(Waiting, Scheduled, Running, Stuck, Failed, Cancelled, Success)

-  val queued: Set[JobState] = Set(Waiting, Scheduled, Stuck)
-  val done: Set[JobState]   = Set(Failed, Cancelled, Success)
+  val queued: Set[JobState]     = Set(Waiting, Scheduled, Stuck)
+  val done: Set[JobState]       = Set(Failed, Cancelled, Success)
+  val inProgress: Set[JobState] = Set(Scheduled, Running, Stuck)

   def parse(str: String): Either[String, JobState] =
     str.toLowerCase match {
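Note (not part of the patch): the new inProgress set overlaps queued (Scheduled and Stuck belong to both), Running is in progress but not queued, and inProgress is disjoint from done. A small illustrative check of these relations, assuming a REPL or test scope:

    import docspell.common.JobState

    // Sketch only: relations between the state sets defined in JobState.
    assert(JobState.inProgress.intersect(JobState.done).isEmpty)
    assert(JobState.queued.intersect(JobState.inProgress) == Set[JobState](JobState.Scheduled, JobState.Stuck))
    assert(JobState.all == JobState.queued ++ JobState.done + JobState.Running)
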
diff --git a/modules/joex/src/main/scala/docspell/joex/process/ItemHandler.scala b/modules/joex/src/main/scala/docspell/joex/process/ItemHandler.scala
index 5b7e9552..2346d69c 100644
--- a/modules/joex/src/main/scala/docspell/joex/process/ItemHandler.scala
+++ b/modules/joex/src/main/scala/docspell/joex/process/ItemHandler.scala
@@ -2,6 +2,7 @@ package docspell.joex.process

 import cats.implicits._
 import cats.effect._
+import cats.data.OptionT
 import fs2.Stream
 import docspell.common.{ItemState, ProcessItemArgs}
 import docspell.joex.Config
@@ -11,15 +12,22 @@ import docspell.store.records.RItem
 import docspell.ftsclient.FtsClient

 object ItemHandler {
-  def onCancel[F[_]: Sync: ContextShift]: Task[F, ProcessItemArgs, Unit] =
-    logWarn("Now cancelling. Deleting potentially created data.").flatMap(_ =>
-      deleteByFileIds.flatMap(_ => deleteFiles)
+  type Args = ProcessItemArgs
+
+  def onCancel[F[_]: Sync: ContextShift]: Task[F, Args, Unit] =
+    logWarn("Now cancelling.").flatMap(_ =>
+      markItemCreated.flatMap {
+        case true =>
+          Task.pure(())
+        case false =>
+          deleteByFileIds[F].flatMap(_ => deleteFiles)
+      }
     )

   def newItem[F[_]: ConcurrentEffect: ContextShift](
       cfg: Config,
       fts: FtsClient[F]
-  ): Task[F, ProcessItemArgs, Unit] =
+  ): Task[F, Args, Unit] =
     CreateItem[F]
       .flatMap(itemStateTask(ItemState.Processing))
       .flatMap(safeProcess[F](cfg, fts))
@@ -34,13 +42,13 @@
           .map(_ => data)
       )

-  def isLastRetry[F[_]: Sync]: Task[F, ProcessItemArgs, Boolean] =
+  def isLastRetry[F[_]: Sync]: Task[F, Args, Boolean] =
     Task(_.isLastRetry)

   def safeProcess[F[_]: ConcurrentEffect: ContextShift](
       cfg: Config,
       fts: FtsClient[F]
-  )(data: ItemData): Task[F, ProcessItemArgs, ItemData] =
+  )(data: ItemData): Task[F, Args, ItemData] =
     isLastRetry[F].flatMap {
       case true =>
         ProcessItem[F](cfg, fts)(data).attempt.flatMap({
@@ -56,24 +64,50 @@
         ProcessItem[F](cfg, fts)(data).flatMap(itemStateTask(ItemState.Created))
     }

-  def deleteByFileIds[F[_]: Sync: ContextShift]: Task[F, ProcessItemArgs, Unit] =
+  private def markItemCreated[F[_]: Sync]: Task[F, Args, Boolean] =
+    Task { ctx =>
+      val fileMetaIds = ctx.args.files.map(_.fileMetaId).toSet
+      (for {
+        item <- OptionT(ctx.store.transact(QItem.findOneByFileIds(fileMetaIds.toSeq)))
+        _ <- OptionT.liftF(
+          ctx.logger.info("Processing cancelled. Marking item as created anyways.")
+        )
+        _ <- OptionT.liftF(
+          ctx.store
+            .transact(
+              RItem.updateState(item.id, ItemState.Created, ItemState.invalidStates)
+            )
+        )
+      } yield true)
+        .getOrElseF(
+          ctx.logger.warn("Processing cancelled. No item created").map(_ => false)
+        )
+    }
+
+  def deleteByFileIds[F[_]: Sync: ContextShift]: Task[F, Args, Unit] =
     Task { ctx =>
       for {
         items <- ctx.store.transact(QItem.findByFileIds(ctx.args.files.map(_.fileMetaId)))
-        _     <- ctx.logger.info(s"Deleting items ${items.map(_.id.id)}")
-        _     <- items.traverse(i => QItem.delete(ctx.store)(i.id, ctx.args.meta.collective))
+        _ <-
+          if (items.nonEmpty) ctx.logger.info(s"Deleting items ${items.map(_.id.id)}")
+          else
+            ctx.logger.info(
+              s"No items found for file ids ${ctx.args.files.map(_.fileMetaId)}"
+            )
+        _ <- items.traverse(i => QItem.delete(ctx.store)(i.id, ctx.args.meta.collective))
       } yield ()
     }

-  private def deleteFiles[F[_]: Sync]: Task[F, ProcessItemArgs, Unit] =
+  private def deleteFiles[F[_]: Sync]: Task[F, Args, Unit] =
     Task(ctx =>
-      Stream
-        .emits(ctx.args.files.map(_.fileMetaId.id))
-        .flatMap(id => ctx.store.bitpeace.delete(id).attempt.drain)
-        .compile
-        .drain
+      ctx.logger.info("Deleting input files …") *>
+        Stream
+          .emits(ctx.args.files.map(_.fileMetaId.id))
+          .flatMap(id => ctx.store.bitpeace.delete(id).attempt.drain)
+          .compile
+          .drain
     )

-  private def logWarn[F[_]](msg: => String): Task[F, ProcessItemArgs, Unit] =
+  private def logWarn[F[_]](msg: => String): Task[F, Args, Unit] =
     Task(_.logger.warn(msg))
 }
diff --git a/modules/joex/src/main/scala/docspell/joex/process/ProcessItem.scala b/modules/joex/src/main/scala/docspell/joex/process/ProcessItem.scala
index 1de74072..48542638 100644
--- a/modules/joex/src/main/scala/docspell/joex/process/ProcessItem.scala
+++ b/modules/joex/src/main/scala/docspell/joex/process/ProcessItem.scala
@@ -30,5 +30,4 @@ object ProcessItem {
       .flatMap(FindProposal[F](cfg.processing))
       .flatMap(EvalProposals[F])
       .flatMap(SaveProposals[F])
-
 }
No item created").map(_ => false) + ) + } + + def deleteByFileIds[F[_]: Sync: ContextShift]: Task[F, Args, Unit] = Task { ctx => for { items <- ctx.store.transact(QItem.findByFileIds(ctx.args.files.map(_.fileMetaId))) - _ <- ctx.logger.info(s"Deleting items ${items.map(_.id.id)}") - _ <- items.traverse(i => QItem.delete(ctx.store)(i.id, ctx.args.meta.collective)) + _ <- + if (items.nonEmpty) ctx.logger.info(s"Deleting items ${items.map(_.id.id)}") + else + ctx.logger.info( + s"No items found for file ids ${ctx.args.files.map(_.fileMetaId)}" + ) + _ <- items.traverse(i => QItem.delete(ctx.store)(i.id, ctx.args.meta.collective)) } yield () } - private def deleteFiles[F[_]: Sync]: Task[F, ProcessItemArgs, Unit] = + private def deleteFiles[F[_]: Sync]: Task[F, Args, Unit] = Task(ctx => - Stream - .emits(ctx.args.files.map(_.fileMetaId.id)) - .flatMap(id => ctx.store.bitpeace.delete(id).attempt.drain) - .compile - .drain + ctx.logger.info("Deleting input files …") *> + Stream + .emits(ctx.args.files.map(_.fileMetaId.id)) + .flatMap(id => ctx.store.bitpeace.delete(id).attempt.drain) + .compile + .drain ) - private def logWarn[F[_]](msg: => String): Task[F, ProcessItemArgs, Unit] = + private def logWarn[F[_]](msg: => String): Task[F, Args, Unit] = Task(_.logger.warn(msg)) } diff --git a/modules/joex/src/main/scala/docspell/joex/process/ProcessItem.scala b/modules/joex/src/main/scala/docspell/joex/process/ProcessItem.scala index 1de74072..48542638 100644 --- a/modules/joex/src/main/scala/docspell/joex/process/ProcessItem.scala +++ b/modules/joex/src/main/scala/docspell/joex/process/ProcessItem.scala @@ -30,5 +30,4 @@ object ProcessItem { .flatMap(FindProposal[F](cfg.processing)) .flatMap(EvalProposals[F]) .flatMap(SaveProposals[F]) - } diff --git a/modules/joex/src/main/scala/docspell/joex/scheduler/SchedulerImpl.scala b/modules/joex/src/main/scala/docspell/joex/scheduler/SchedulerImpl.scala index 9fb6d160..ab2e7cc1 100644 --- a/modules/joex/src/main/scala/docspell/joex/scheduler/SchedulerImpl.scala +++ b/modules/joex/src/main/scala/docspell/joex/scheduler/SchedulerImpl.scala @@ -3,6 +3,7 @@ package docspell.joex.scheduler import fs2.Stream import cats.implicits._ import cats.effect.concurrent.Semaphore +import cats.data.OptionT import docspell.common._ import docspell.common.syntax.all._ import docspell.store.queue.JobQueue @@ -51,7 +52,16 @@ final class SchedulerImpl[F[_]: ConcurrentEffect: ContextShift]( state.get.flatMap(_.cancelRequest(jobId) match { case Some(ct) => ct.map(_ => true) case None => - logger.fwarn(s"Job ${jobId.id} not found, cannot cancel.").map(_ => false) + (for { + job <- OptionT(store.transact(RJob.findByIdAndWorker(jobId, config.name))) + _ <- OptionT.liftF( + if (job.isInProgress) executeCancel(job) + else ().pure[F] + ) + } yield true) + .getOrElseF( + logger.fwarn(s"Job ${jobId.id} not found, cannot cancel.").map(_ => false) + ) }) def notifyChange: F[Unit] = @@ -127,6 +137,31 @@ final class SchedulerImpl[F[_]: ConcurrentEffect: ContextShift]( }) } + private def executeCancel(job: RJob): F[Unit] = { + val task = for { + jobtask <- + tasks + .find(job.task) + .toRight(s"This executor cannot run tasks with name: ${job.task}") + } yield jobtask + + task match { + case Left(err) => + logger.ferror(s"Unable to run cancellation task for job ${job.info}: $err") + case Right(t) => + for { + _ <- + logger.fdebug(s"Creating context for job ${job.info} to run cancellation $t") + ctx <- Context[F, String](job, job.args, config, logSink, blocker, store) + _ <- t.onCancel.run(ctx) + 
diff --git a/modules/joexapi/src/main/scala/docspell/joexapi/client/JoexClient.scala b/modules/joexapi/src/main/scala/docspell/joexapi/client/JoexClient.scala
index 02a4c2cf..509d9689 100644
--- a/modules/joexapi/src/main/scala/docspell/joexapi/client/JoexClient.scala
+++ b/modules/joexapi/src/main/scala/docspell/joexapi/client/JoexClient.scala
@@ -4,9 +4,11 @@
 import cats.implicits._
 import cats.effect._
 import docspell.common.{Ident, LenientUri}
 import docspell.common.syntax.all._
+import docspell.joexapi.model.BasicResult
 import org.http4s.{Method, Request, Uri}
 import org.http4s.client.Client
 import org.http4s.client.blaze.BlazeClientBuilder
+import org.http4s.circe.CirceEntityDecoder._
 import scala.concurrent.ExecutionContext
 import org.log4s.getLogger
@@ -17,7 +19,7 @@ trait JoexClient[F[_]] {

   def notifyJoexIgnoreErrors(base: LenientUri): F[Unit]

-  def cancelJob(base: LenientUri, job: Ident): F[Unit]
+  def cancelJob(base: LenientUri, job: Ident): F[BasicResult]

 }

@@ -44,10 +46,10 @@ object JoexClient {
          ()
        }

-    def cancelJob(base: LenientUri, job: Ident): F[Unit] = {
+    def cancelJob(base: LenientUri, job: Ident): F[BasicResult] = {
       val cancelUrl = base / "api" / "v1" / "job" / job.id / "cancel"
       val req       = Request[F](Method.POST, uri(cancelUrl))
-      client.expect[String](req).map(_ => ())
+      client.expect[BasicResult](req)
     }

     private def uri(u: LenientUri): Uri =
diff --git a/modules/microsite/docs/doc/processing.md b/modules/microsite/docs/doc/processing.md
index 9051873f..1554e815 100644
--- a/modules/microsite/docs/doc/processing.md
+++ b/modules/microsite/docs/doc/processing.md
@@ -36,7 +36,7 @@ that a job is some time waiting until it is picked up by a job
 executor. You can always start more job executors to help out.

 If a job fails, it is retried after some time. Only if it fails too
-often (can be configured), it then is finished with *failed* state. If
-processing finally fails, the item is still created, just without
-suggestions. But if processing is cancelled by the user, the item is
-not created.
+often (this is configurable), it is finished in the *failed* state.
+
+For the document-processing task, if processing finally fails or a job
+is cancelled, the item is still created, just without suggestions.
diff --git a/modules/store/src/main/scala/docspell/store/queries/QItem.scala b/modules/store/src/main/scala/docspell/store/queries/QItem.scala
index a5a72ff2..d481656f 100644
--- a/modules/store/src/main/scala/docspell/store/queries/QItem.scala
+++ b/modules/store/src/main/scala/docspell/store/queries/QItem.scala
@@ -454,21 +454,56 @@
       n  <- store.transact(RItem.deleteByIdAndCollective(itemId, collective))
     } yield tn + rn + n + mn

-  def findByFileIds(fileMetaIds: Seq[Ident]): ConnectionIO[Vector[RItem]] = {
-    val IC = RItem.Columns
-    val AC = RAttachment.Columns
-    val q =
-      fr"SELECT DISTINCT" ++ commas(
-        IC.all.map(_.prefix("i").f)
-      ) ++ fr"FROM" ++ RItem.table ++ fr"i" ++
-        fr"INNER JOIN" ++ RAttachment.table ++ fr"a ON" ++ AC.itemId
-          .prefix("a")
-          .is(IC.id.prefix("i")) ++
-        fr"WHERE" ++ AC.fileId.isOneOf(fileMetaIds) ++ orderBy(IC.created.prefix("i").asc)
+  private def findByFileIdsQuery(fileMetaIds: NonEmptyList[Ident], limit: Option[Int]) = {
+    val IC      = RItem.Columns.all.map(_.prefix("i"))
+    val aItem   = RAttachment.Columns.itemId.prefix("a")
+    val aId     = RAttachment.Columns.id.prefix("a")
+    val aFileId = RAttachment.Columns.fileId.prefix("a")
+    val iId     = RItem.Columns.id.prefix("i")
+    val sId     = RAttachmentSource.Columns.id.prefix("s")
+    val sFileId = RAttachmentSource.Columns.fileId.prefix("s")
+    val rId     = RAttachmentArchive.Columns.id.prefix("r")
+    val rFileId = RAttachmentArchive.Columns.fileId.prefix("r")
+    val m1Id    = RFileMeta.Columns.id.prefix("m1")
+    val m2Id    = RFileMeta.Columns.id.prefix("m2")
+    val m3Id    = RFileMeta.Columns.id.prefix("m3")

-    q.query[RItem].to[Vector]
+    val from =
+      RItem.table ++ fr"i INNER JOIN" ++ RAttachment.table ++ fr"a ON" ++ aItem.is(iId) ++
+        fr"INNER JOIN" ++ RAttachmentSource.table ++ fr"s ON" ++ aId.is(sId) ++
+        fr"INNER JOIN" ++ RFileMeta.table ++ fr"m1 ON" ++ m1Id.is(aFileId) ++
+        fr"INNER JOIN" ++ RFileMeta.table ++ fr"m2 ON" ++ m2Id.is(sFileId) ++
+        fr"LEFT OUTER JOIN" ++ RAttachmentArchive.table ++ fr"r ON" ++ aId.is(rId) ++
+        fr"LEFT OUTER JOIN" ++ RFileMeta.table ++ fr"m3 ON" ++ m3Id.is(rFileId)
+
+    val q = selectSimple(
+      IC,
+      from,
+      and(or(m1Id.isIn(fileMetaIds), m2Id.isIn(fileMetaIds), m3Id.isIn(fileMetaIds)))
+    )
+
+    limit match {
+      case Some(n) => q ++ fr"LIMIT $n"
+      case None    => q
+    }
   }

+  def findOneByFileIds(fileMetaIds: Seq[Ident]): ConnectionIO[Option[RItem]] =
+    NonEmptyList.fromList(fileMetaIds.toList) match {
+      case Some(nel) =>
+        findByFileIdsQuery(nel, Some(1)).query[RItem].option
+      case None =>
+        (None: Option[RItem]).pure[ConnectionIO]
+    }
+
+  def findByFileIds(fileMetaIds: Seq[Ident]): ConnectionIO[Vector[RItem]] =
+    NonEmptyList.fromList(fileMetaIds.toList) match {
+      case Some(nel) =>
+        findByFileIdsQuery(nel, None).query[RItem].to[Vector]
+      case None =>
+        Vector.empty[RItem].pure[ConnectionIO]
+    }
+
   def findByChecksum(checksum: String, collective: Ident): ConnectionIO[Vector[RItem]] = {
     val IC    = RItem.Columns.all.map(_.prefix("i"))
     val aItem = RAttachment.Columns.itemId.prefix("a")
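Note (not part of the patch): findOneByFileIds is what the reworked ItemHandler.markItemCreated uses to check whether an item already exists for the uploaded files; an item matches when the file of one of its attachments, of that attachment's source, or of its archive is among the given file-meta ids. A minimal usage sketch; the wrapper method, its name and the IO effect are assumptions:

    import cats.effect.IO
    import docspell.common.Ident
    import docspell.store.Store
    import docspell.store.queries.QItem
    import docspell.store.records.RItem

    // Sketch: look up at most one item whose attachment, source or archive
    // file matches any of the given file-meta ids.
    def findItemForFiles(store: Store[IO], fileMetaIds: Seq[Ident]): IO[Option[RItem]] =
      store.transact(QItem.findOneByFileIds(fileMetaIds))
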
diff --git a/modules/store/src/main/scala/docspell/store/records/RJob.scala b/modules/store/src/main/scala/docspell/store/records/RJob.scala
index 9f607c5c..0fbe7e2c 100644
--- a/modules/store/src/main/scala/docspell/store/records/RJob.scala
+++ b/modules/store/src/main/scala/docspell/store/records/RJob.scala
@@ -31,6 +31,12 @@ case class RJob(

   def info: String =
     s"${id.id.substring(0, 9)}.../${group.id}/${task.id}/$priority"
+
+  def isFinalState: Boolean =
+    JobState.done.contains(state)
+
+  def isInProgress: Boolean =
+    JobState.inProgress.contains(state)
 }

 object RJob {
@@ -121,6 +127,12 @@
   def findByIdAndGroup(jobId: Ident, jobGroup: Ident): ConnectionIO[Option[RJob]] =
     selectSimple(all, table, and(id.is(jobId), group.is(jobGroup))).query[RJob].option

+  def findById(jobId: Ident): ConnectionIO[Option[RJob]] =
+    selectSimple(all, table, id.is(jobId)).query[RJob].option
+
+  def findByIdAndWorker(jobId: Ident, workerId: Ident): ConnectionIO[Option[RJob]] =
+    selectSimple(all, table, and(id.is(jobId), worker.is(workerId))).query[RJob].option
+
   def setRunningToWaiting(workerId: Ident): ConnectionIO[Int] = {
     val states: Seq[JobState] = List(JobState.Running, JobState.Scheduled)
     updateRow(