mirror of
https://github.com/TheAnachronism/docspell.git
synced 2025-04-05 02:49:32 +00:00
commit
9d1717f0e0
@ -307,6 +307,7 @@ val joexapi = project.in(file("modules/joexapi")).
|
||||
name := "docspell-joexapi",
|
||||
libraryDependencies ++=
|
||||
Dependencies.circe ++
|
||||
Dependencies.http4sCirce ++
|
||||
Dependencies.http4sClient,
|
||||
openapiTargetLanguage := Language.Scala,
|
||||
openapiPackage := Pkg("docspell.joexapi.model"),
|
||||
|
@ -2,6 +2,7 @@ package docspell.backend.ops
|
||||
|
||||
import cats.implicits._
|
||||
import cats.effect._
|
||||
import cats.data.OptionT
|
||||
import docspell.backend.ops.OJob.{CollectiveQueueState, JobCancelResult}
|
||||
import docspell.common.{Ident, JobState}
|
||||
import docspell.store.Store
|
||||
@ -22,6 +23,10 @@ object OJob {
|
||||
case object Removed extends JobCancelResult
|
||||
case object CancelRequested extends JobCancelResult
|
||||
case object JobNotFound extends JobCancelResult
|
||||
|
||||
/** Returns `Removed`, typed as the `JobCancelResult` supertype. */
def removed: JobCancelResult = Removed
|
||||
/** Returns `CancelRequested`, typed as the `JobCancelResult` supertype. */
def cancelRequested: JobCancelResult = CancelRequested
|
||||
/** Returns `JobNotFound`, typed as the `JobCancelResult` supertype. */
def jobNotFound: JobCancelResult = JobNotFound
|
||||
}
|
||||
|
||||
case class JobDetail(job: RJob, logs: Vector[RJobLog])
|
||||
@ -49,43 +54,30 @@ object OJob {
|
||||
.map(CollectiveQueueState)
|
||||
|
||||
def cancelJob(id: Ident, collective: Ident): F[JobCancelResult] = {
|
||||
def mustCancel(job: Option[RJob]): Option[(RJob, Ident)] =
|
||||
for {
|
||||
worker <- job.flatMap(_.worker)
|
||||
job <- job.filter(j =>
|
||||
j.state == JobState.Scheduled || j.state == JobState.Running
|
||||
)
|
||||
} yield (job, worker)
|
||||
def remove(job: RJob): F[JobCancelResult] =
|
||||
store.transact(RJob.delete(job.id)) *> JobCancelResult.removed.pure[F]
|
||||
|
||||
def canDelete(j: RJob): Boolean =
|
||||
mustCancel(j.some).isEmpty
|
||||
|
||||
val tryDelete = for {
|
||||
job <- RJob.findByIdAndGroup(id, collective)
|
||||
jobm = job.filter(canDelete)
|
||||
del <- jobm.traverse(j => RJob.delete(j.id))
|
||||
} yield del match {
|
||||
case Some(_) => Right(JobCancelResult.Removed: JobCancelResult)
|
||||
case None => Left(mustCancel(job))
|
||||
}
|
||||
|
||||
def tryCancel(job: RJob, worker: Ident): F[JobCancelResult] =
|
||||
joex
|
||||
.cancelJob(job.id, worker)
|
||||
.map(flag =>
|
||||
if (flag) JobCancelResult.CancelRequested else JobCancelResult.JobNotFound
|
||||
)
|
||||
|
||||
for {
|
||||
tryDel <- store.transact(tryDelete)
|
||||
result <- tryDel match {
|
||||
case Right(r) => r.pure[F]
|
||||
case Left(Some((job, worker))) =>
|
||||
tryCancel(job, worker)
|
||||
case Left(None) =>
|
||||
(JobCancelResult.JobNotFound: OJob.JobCancelResult).pure[F]
|
||||
def tryCancel(job: RJob): F[JobCancelResult] =
|
||||
job.worker match {
|
||||
case Some(worker) =>
|
||||
for {
|
||||
flag <- joex.cancelJob(job.id, worker)
|
||||
res <-
|
||||
if (flag) JobCancelResult.cancelRequested.pure[F]
|
||||
else remove(job)
|
||||
} yield res
|
||||
case None =>
|
||||
remove(job)
|
||||
}
|
||||
} yield result
|
||||
|
||||
(for {
|
||||
job <- OptionT(store.transact(RJob.findByIdAndGroup(id, collective)))
|
||||
result <- OptionT.liftF(
|
||||
if (job.isInProgress) tryCancel(job)
|
||||
else remove(job)
|
||||
)
|
||||
} yield result)
|
||||
.getOrElse(JobCancelResult.jobNotFound)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
@ -2,6 +2,7 @@ package docspell.backend.ops
|
||||
|
||||
import cats.implicits._
|
||||
import cats.effect._
|
||||
import cats.data.OptionT
|
||||
import docspell.common.{Ident, NodeType}
|
||||
import docspell.joexapi.client.JoexClient
|
||||
import docspell.store.Store
|
||||
@ -28,10 +29,10 @@ object OJoex {
|
||||
} yield ()
|
||||
|
||||
def cancelJob(job: Ident, worker: Ident): F[Boolean] =
|
||||
for {
|
||||
node <- store.transact(RNode.findById(worker))
|
||||
cancel <- node.traverse(n => client.cancelJob(n.url, job))
|
||||
} yield cancel.isDefined
|
||||
(for {
|
||||
node <- OptionT(store.transact(RNode.findById(worker)))
|
||||
cancel <- OptionT.liftF(client.cancelJob(node.url, job))
|
||||
} yield cancel.success).getOrElse(false)
|
||||
})
|
||||
|
||||
def create[F[_]: ConcurrentEffect](
|
||||
|
@ -20,7 +20,7 @@ object JobState {
|
||||
/** Is currently executing */
|
||||
case object Running extends JobState {}
|
||||
|
||||
/** Finished with failure and is being retried. */
|
||||
/** Task completed with failure and is being retried. */
|
||||
case object Stuck extends JobState {}
|
||||
|
||||
/** Finished finally with a failure */
|
||||
@ -34,8 +34,9 @@ object JobState {
|
||||
|
||||
val all: Set[JobState] =
|
||||
Set(Waiting, Scheduled, Running, Stuck, Failed, Cancelled, Success)
|
||||
val queued: Set[JobState] = Set(Waiting, Scheduled, Stuck)
|
||||
val done: Set[JobState] = Set(Failed, Cancelled, Success)
|
||||
val queued: Set[JobState] = Set(Waiting, Scheduled, Stuck)
|
||||
val done: Set[JobState] = Set(Failed, Cancelled, Success)
|
||||
val inProgress: Set[JobState] = Set(Scheduled, Running, Stuck)
|
||||
|
||||
def parse(str: String): Either[String, JobState] =
|
||||
str.toLowerCase match {
|
||||
|
@ -2,6 +2,7 @@ package docspell.joex.process
|
||||
|
||||
import cats.implicits._
|
||||
import cats.effect._
|
||||
import cats.data.OptionT
|
||||
import fs2.Stream
|
||||
import docspell.common.{ItemState, ProcessItemArgs}
|
||||
import docspell.joex.Config
|
||||
@ -11,15 +12,22 @@ import docspell.store.records.RItem
|
||||
import docspell.ftsclient.FtsClient
|
||||
|
||||
object ItemHandler {
|
||||
def onCancel[F[_]: Sync: ContextShift]: Task[F, ProcessItemArgs, Unit] =
|
||||
logWarn("Now cancelling. Deleting potentially created data.").flatMap(_ =>
|
||||
deleteByFileIds.flatMap(_ => deleteFiles)
|
||||
type Args = ProcessItemArgs
|
||||
|
||||
/** Task that is run when processing of an item gets cancelled.
  *
  * A warning is logged first. Then `markItemCreated` is tried; only if it
  * reports that no item was marked, the items found for the input files
  * and the input files themselves are deleted.
  */
def onCancel[F[_]: Sync: ContextShift]: Task[F, Args, Unit] = {
  // `itemMarked` is the outcome of `markItemCreated`
  def cleanup(itemMarked: Boolean): Task[F, Args, Unit] =
    if (itemMarked) Task.pure(())
    else deleteByFileIds[F].flatMap(_ => deleteFiles)

  logWarn("Now cancelling.").flatMap(_ =>
    markItemCreated.flatMap(itemMarked => cleanup(itemMarked))
  )
}
|
||||
|
||||
def newItem[F[_]: ConcurrentEffect: ContextShift](
|
||||
cfg: Config,
|
||||
fts: FtsClient[F]
|
||||
): Task[F, ProcessItemArgs, Unit] =
|
||||
): Task[F, Args, Unit] =
|
||||
CreateItem[F]
|
||||
.flatMap(itemStateTask(ItemState.Processing))
|
||||
.flatMap(safeProcess[F](cfg, fts))
|
||||
@ -34,13 +42,13 @@ object ItemHandler {
|
||||
.map(_ => data)
|
||||
)
|
||||
|
||||
def isLastRetry[F[_]: Sync]: Task[F, ProcessItemArgs, Boolean] =
|
||||
def isLastRetry[F[_]: Sync]: Task[F, Args, Boolean] =
|
||||
Task(_.isLastRetry)
|
||||
|
||||
def safeProcess[F[_]: ConcurrentEffect: ContextShift](
|
||||
cfg: Config,
|
||||
fts: FtsClient[F]
|
||||
)(data: ItemData): Task[F, ProcessItemArgs, ItemData] =
|
||||
)(data: ItemData): Task[F, Args, ItemData] =
|
||||
isLastRetry[F].flatMap {
|
||||
case true =>
|
||||
ProcessItem[F](cfg, fts)(data).attempt.flatMap({
|
||||
@ -56,24 +64,50 @@ object ItemHandler {
|
||||
ProcessItem[F](cfg, fts)(data).flatMap(itemStateTask(ItemState.Created))
|
||||
}
|
||||
|
||||
def deleteByFileIds[F[_]: Sync: ContextShift]: Task[F, ProcessItemArgs, Unit] =
|
||||
/** Looks up one item that belongs to the files of the job arguments and,
  * if there is one, updates its state to `ItemState.Created`.
  *
  * Yields `true` when an item was found and the update was issued. When no
  * item is found, a warning is logged and `false` is yielded.
  */
private def markItemCreated[F[_]: Sync]: Task[F, Args, Boolean] =
  Task { ctx =>
    // passing through a Set drops duplicate file ids before querying
    val ids = ctx.args.files.map(_.fileMetaId).toSet.toSeq

    OptionT(ctx.store.transact(QItem.findOneByFileIds(ids)))
      .semiflatMap { found =>
        for {
          _ <- ctx.logger.info("Processing cancelled. Marking item as created anyways.")
          _ <- ctx.store.transact(
            RItem.updateState(found.id, ItemState.Created, ItemState.invalidStates)
          )
        } yield true
      }
      .getOrElseF(
        ctx.logger.warn("Processing cancelled. No item created").map(_ => false)
      )
  }
|
||||
|
||||
def deleteByFileIds[F[_]: Sync: ContextShift]: Task[F, Args, Unit] =
|
||||
Task { ctx =>
|
||||
for {
|
||||
items <- ctx.store.transact(QItem.findByFileIds(ctx.args.files.map(_.fileMetaId)))
|
||||
_ <- ctx.logger.info(s"Deleting items ${items.map(_.id.id)}")
|
||||
_ <- items.traverse(i => QItem.delete(ctx.store)(i.id, ctx.args.meta.collective))
|
||||
_ <-
|
||||
if (items.nonEmpty) ctx.logger.info(s"Deleting items ${items.map(_.id.id)}")
|
||||
else
|
||||
ctx.logger.info(
|
||||
s"No items found for file ids ${ctx.args.files.map(_.fileMetaId)}"
|
||||
)
|
||||
_ <- items.traverse(i => QItem.delete(ctx.store)(i.id, ctx.args.meta.collective))
|
||||
} yield ()
|
||||
}
|
||||
|
||||
private def deleteFiles[F[_]: Sync]: Task[F, ProcessItemArgs, Unit] =
|
||||
private def deleteFiles[F[_]: Sync]: Task[F, Args, Unit] =
|
||||
Task(ctx =>
|
||||
Stream
|
||||
.emits(ctx.args.files.map(_.fileMetaId.id))
|
||||
.flatMap(id => ctx.store.bitpeace.delete(id).attempt.drain)
|
||||
.compile
|
||||
.drain
|
||||
ctx.logger.info("Deleting input files …") *>
|
||||
Stream
|
||||
.emits(ctx.args.files.map(_.fileMetaId.id))
|
||||
.flatMap(id => ctx.store.bitpeace.delete(id).attempt.drain)
|
||||
.compile
|
||||
.drain
|
||||
)
|
||||
|
||||
private def logWarn[F[_]](msg: => String): Task[F, ProcessItemArgs, Unit] =
|
||||
private def logWarn[F[_]](msg: => String): Task[F, Args, Unit] =
|
||||
Task(_.logger.warn(msg))
|
||||
}
|
||||
|
@ -30,5 +30,4 @@ object ProcessItem {
|
||||
.flatMap(FindProposal[F](cfg.processing))
|
||||
.flatMap(EvalProposals[F])
|
||||
.flatMap(SaveProposals[F])
|
||||
|
||||
}
|
||||
|
@ -3,6 +3,7 @@ package docspell.joex.scheduler
|
||||
import fs2.Stream
|
||||
import cats.implicits._
|
||||
import cats.effect.concurrent.Semaphore
|
||||
import cats.data.OptionT
|
||||
import docspell.common._
|
||||
import docspell.common.syntax.all._
|
||||
import docspell.store.queue.JobQueue
|
||||
@ -51,7 +52,16 @@ final class SchedulerImpl[F[_]: ConcurrentEffect: ContextShift](
|
||||
state.get.flatMap(_.cancelRequest(jobId) match {
|
||||
case Some(ct) => ct.map(_ => true)
|
||||
case None =>
|
||||
logger.fwarn(s"Job ${jobId.id} not found, cannot cancel.").map(_ => false)
|
||||
(for {
|
||||
job <- OptionT(store.transact(RJob.findByIdAndWorker(jobId, config.name)))
|
||||
_ <- OptionT.liftF(
|
||||
if (job.isInProgress) executeCancel(job)
|
||||
else ().pure[F]
|
||||
)
|
||||
} yield true)
|
||||
.getOrElseF(
|
||||
logger.fwarn(s"Job ${jobId.id} not found, cannot cancel.").map(_ => false)
|
||||
)
|
||||
})
|
||||
|
||||
def notifyChange: F[Unit] =
|
||||
@ -127,6 +137,31 @@ final class SchedulerImpl[F[_]: ConcurrentEffect: ContextShift](
|
||||
})
|
||||
}
|
||||
|
||||
/** Runs the cancellation routine of the task registered for `job`.
  *
  * When `tasks` has no entry for the job's task name, only an error is
  * logged. Otherwise a context is created for the job, the task's
  * `onCancel` is run with it, `markCancelled` is applied to the state and
  * `onFinish` is called with `JobState.Cancelled`.
  */
private def executeCancel(job: RJob): F[Unit] =
  tasks
    .find(job.task)
    .toRight(s"This executor cannot run tasks with name: ${job.task}")
    .fold(
      reason =>
        logger.ferror(s"Unable to run cancellation task for job ${job.info}: $reason"),
      jobTask =>
        for {
          _ <- logger.fdebug(
            s"Creating context for job ${job.info} to run cancellation $jobTask"
          )
          cancelCtx <- Context[F, String](job, job.args, config, logSink, blocker, store)
          _ <- jobTask.onCancel.run(cancelCtx)
          _ <- state.modify(_.markCancelled(job))
          _ <- onFinish(job, JobState.Cancelled)
          _ <- cancelCtx.logger.warn("Job has been cancelled.")
          _ <- logger.fdebug(s"Job ${job.info} has been cancelled.")
        } yield ()
    )
|
||||
|
||||
def execute(job: RJob): F[Unit] = {
|
||||
val task = for {
|
||||
jobtask <-
|
||||
|
@ -4,9 +4,11 @@ import cats.implicits._
|
||||
import cats.effect._
|
||||
import docspell.common.{Ident, LenientUri}
|
||||
import docspell.common.syntax.all._
|
||||
import docspell.joexapi.model.BasicResult
|
||||
import org.http4s.{Method, Request, Uri}
|
||||
import org.http4s.client.Client
|
||||
import org.http4s.client.blaze.BlazeClientBuilder
|
||||
import org.http4s.circe.CirceEntityDecoder._
|
||||
import scala.concurrent.ExecutionContext
|
||||
|
||||
import org.log4s.getLogger
|
||||
@ -17,7 +19,7 @@ trait JoexClient[F[_]] {
|
||||
|
||||
def notifyJoexIgnoreErrors(base: LenientUri): F[Unit]
|
||||
|
||||
def cancelJob(base: LenientUri, job: Ident): F[Unit]
|
||||
def cancelJob(base: LenientUri, job: Ident): F[BasicResult]
|
||||
|
||||
}
|
||||
|
||||
@ -44,10 +46,10 @@ object JoexClient {
|
||||
()
|
||||
}
|
||||
|
||||
def cancelJob(base: LenientUri, job: Ident): F[Unit] = {
|
||||
def cancelJob(base: LenientUri, job: Ident): F[BasicResult] = {
|
||||
val cancelUrl = base / "api" / "v1" / "job" / job.id / "cancel"
|
||||
val req = Request[F](Method.POST, uri(cancelUrl))
|
||||
client.expect[String](req).map(_ => ())
|
||||
client.expect[BasicResult](req)
|
||||
}
|
||||
|
||||
private def uri(u: LenientUri): Uri =
|
||||
|
@ -36,7 +36,7 @@ that a job is some time waiting until it is picked up by a job
|
||||
executor. You can always start more job executors to help out.
|
||||
|
||||
If a job fails, it is retried after some time. Only if it fails too
|
||||
often (can be configured), it then is finished with *failed* state. If
|
||||
processing finally fails, the item is still created, just without
|
||||
suggestions. But if processing is cancelled by the user, the item is
|
||||
not created.
|
||||
often (can be configured), it then is finished with *failed* state.
|
||||
|
||||
For the document-processing task, if processing finally fails or a job
|
||||
is cancelled, the item is still created, just without suggestions.
|
||||
|
@ -454,21 +454,56 @@ object QItem {
|
||||
n <- store.transact(RItem.deleteByIdAndCollective(itemId, collective))
|
||||
} yield tn + rn + n + mn
|
||||
|
||||
def findByFileIds(fileMetaIds: Seq[Ident]): ConnectionIO[Vector[RItem]] = {
|
||||
val IC = RItem.Columns
|
||||
val AC = RAttachment.Columns
|
||||
val q =
|
||||
fr"SELECT DISTINCT" ++ commas(
|
||||
IC.all.map(_.prefix("i").f)
|
||||
) ++ fr"FROM" ++ RItem.table ++ fr"i" ++
|
||||
fr"INNER JOIN" ++ RAttachment.table ++ fr"a ON" ++ AC.itemId
|
||||
.prefix("a")
|
||||
.is(IC.id.prefix("i")) ++
|
||||
fr"WHERE" ++ AC.fileId.isOneOf(fileMetaIds) ++ orderBy(IC.created.prefix("i").asc)
|
||||
private def findByFileIdsQuery(fileMetaIds: NonEmptyList[Ident], limit: Option[Int]) = {
|
||||
val IC = RItem.Columns.all.map(_.prefix("i"))
|
||||
val aItem = RAttachment.Columns.itemId.prefix("a")
|
||||
val aId = RAttachment.Columns.id.prefix("a")
|
||||
val aFileId = RAttachment.Columns.fileId.prefix("a")
|
||||
val iId = RItem.Columns.id.prefix("i")
|
||||
val sId = RAttachmentSource.Columns.id.prefix("s")
|
||||
val sFileId = RAttachmentSource.Columns.fileId.prefix("s")
|
||||
val rId = RAttachmentArchive.Columns.id.prefix("r")
|
||||
val rFileId = RAttachmentArchive.Columns.fileId.prefix("r")
|
||||
val m1Id = RFileMeta.Columns.id.prefix("m1")
|
||||
val m2Id = RFileMeta.Columns.id.prefix("m2")
|
||||
val m3Id = RFileMeta.Columns.id.prefix("m3")
|
||||
|
||||
q.query[RItem].to[Vector]
|
||||
val from =
|
||||
RItem.table ++ fr"i INNER JOIN" ++ RAttachment.table ++ fr"a ON" ++ aItem.is(iId) ++
|
||||
fr"INNER JOIN" ++ RAttachmentSource.table ++ fr"s ON" ++ aId.is(sId) ++
|
||||
fr"INNER JOIN" ++ RFileMeta.table ++ fr"m1 ON" ++ m1Id.is(aFileId) ++
|
||||
fr"INNER JOIN" ++ RFileMeta.table ++ fr"m2 ON" ++ m2Id.is(sFileId) ++
|
||||
fr"LEFT OUTER JOIN" ++ RAttachmentArchive.table ++ fr"r ON" ++ aId.is(rId) ++
|
||||
fr"LEFT OUTER JOIN" ++ RFileMeta.table ++ fr"m3 ON" ++ m3Id.is(rFileId)
|
||||
|
||||
val q = selectSimple(
|
||||
IC,
|
||||
from,
|
||||
and(or(m1Id.isIn(fileMetaIds), m2Id.isIn(fileMetaIds), m3Id.isIn(fileMetaIds)))
|
||||
)
|
||||
|
||||
limit match {
|
||||
case Some(n) => q ++ fr"LIMIT $n"
|
||||
case None => q
|
||||
}
|
||||
}
|
||||
|
||||
/** Finds at most one item related to any of the given file ids.
  *
  * An empty id list yields `None` without running a query; otherwise the
  * shared file-id query is run with a limit of 1.
  */
def findOneByFileIds(fileMetaIds: Seq[Ident]): ConnectionIO[Option[RItem]] =
  NonEmptyList
    .fromList(fileMetaIds.toList)
    .fold(Option.empty[RItem].pure[ConnectionIO])(ids =>
      findByFileIdsQuery(ids, Some(1)).query[RItem].option
    )
|
||||
|
||||
/** Finds all items related to any of the given file ids.
  *
  * An empty id list yields an empty vector without running a query;
  * otherwise the shared file-id query is run without a limit.
  */
def findByFileIds(fileMetaIds: Seq[Ident]): ConnectionIO[Vector[RItem]] =
  NonEmptyList
    .fromList(fileMetaIds.toList)
    .fold(Vector.empty[RItem].pure[ConnectionIO])(ids =>
      findByFileIdsQuery(ids, None).query[RItem].to[Vector]
    )
|
||||
|
||||
def findByChecksum(checksum: String, collective: Ident): ConnectionIO[Vector[RItem]] = {
|
||||
val IC = RItem.Columns.all.map(_.prefix("i"))
|
||||
val aItem = RAttachment.Columns.itemId.prefix("a")
|
||||
|
@ -31,6 +31,12 @@ case class RJob(
|
||||
|
||||
/** Short description of this job for log messages, built as
  * `<id prefix>.../<group>/<task>/<priority>`.
  *
  * Only the first nine characters of the id are shown. `take` is used
  * rather than `substring(0, 9)` so that an id shorter than nine characters
  * is shown in full instead of raising `StringIndexOutOfBoundsException`.
  */
def info: String =
  s"${id.id.take(9)}.../${group.id}/${task.id}/$priority"
|
||||
|
||||
/** True if the state of this job is a member of `JobState.done`. */
def isFinalState: Boolean =
  JobState.done(state)
|
||||
|
||||
/** True if the state of this job is a member of `JobState.inProgress`. */
def isInProgress: Boolean =
  JobState.inProgress(state)
|
||||
}
|
||||
|
||||
object RJob {
|
||||
@ -121,6 +127,12 @@ object RJob {
|
||||
/** Selects the job with the given id, provided it belongs to the given group. */
def findByIdAndGroup(jobId: Ident, jobGroup: Ident): ConnectionIO[Option[RJob]] = {
  val condition = and(id.is(jobId), group.is(jobGroup))
  selectSimple(all, table, condition).query[RJob].option
}
|
||||
|
||||
/** Selects the job with the given id, if there is one. */
def findById(jobId: Ident): ConnectionIO[Option[RJob]] = {
  val condition = id.is(jobId)
  selectSimple(all, table, condition).query[RJob].option
}
|
||||
|
||||
/** Selects the job with the given id, provided its worker column equals `workerId`. */
def findByIdAndWorker(jobId: Ident, workerId: Ident): ConnectionIO[Option[RJob]] = {
  val condition = and(id.is(jobId), worker.is(workerId))
  selectSimple(all, table, condition).query[RJob].option
}
|
||||
|
||||
def setRunningToWaiting(workerId: Ident): ConnectionIO[Int] = {
|
||||
val states: Seq[JobState] = List(JobState.Running, JobState.Scheduled)
|
||||
updateRow(
|
||||
|
Loading…
x
Reference in New Issue
Block a user