Experiment with addons

Addons allow to execute external programs in some context inside
docspell. Currently it is possible to run them after processing files.
Addons are provided by URLs to zip files.
This commit is contained in:
eikek
2022-04-22 14:07:28 +02:00
parent e04a76faa4
commit 7fdd78ad06
166 changed files with 8181 additions and 115 deletions

View File

@ -6,6 +6,8 @@
package docspell.scheduler
import cats.effect.Sync
import docspell.common._
import docspell.logging.Logger
@ -25,4 +27,8 @@ trait Context[F[_], A] { self =>
def map[C](f: A => C): Context[F, C]
def unit: Context[F, Unit] =
map(_ => ())
def loadJob(implicit F: Sync[F]): F[Job[String]]
}

View File

@ -0,0 +1,26 @@
/*
* Copyright 2020 Eike K. & Contributors
*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
package docspell.scheduler
/** Special "marker" exception to indicate errors in tasks, that should NOT be retried. */
final class PermanentError(cause: Throwable) extends RuntimeException(cause) {
override def fillInStackTrace() = this
}
object PermanentError {
def apply(cause: Throwable): PermanentError =
new PermanentError(cause)
def isPermanent(ex: Throwable): Boolean =
unapply(ex).isDefined
def unapply(ex: Throwable): Option[Throwable] =
ex match {
case p: PermanentError => Some(p.getCause)
case _ => None
}
}

View File

@ -21,6 +21,11 @@ trait Task[F[_], A, B] {
def andThen[C](f: B => F[C])(implicit F: FlatMap[F]): Task[F, A, C] =
Task(Task.toKleisli(this).andThen(f))
def andThenC[C](f: (Context[F, A], B) => F[C])(implicit M: Monad[F]): Task[F, A, C] = {
val run = Task.toKleisli(this).run
Task(ctx => run(ctx).flatMap(b => f(ctx, b)))
}
def mapF[C](f: F[B] => F[C]): Task[F, A, C] =
Task(Task.toKleisli(this).mapF(f))

View File

@ -50,6 +50,9 @@ object UserTaskScope {
def apply(collective: Ident): UserTaskScope =
UserTaskScope.collective(collective)
def apply(collective: Ident, login: Option[Ident]): UserTaskScope =
login.map(AccountId(collective, _)).map(account).getOrElse(apply(collective))
def system: UserTaskScope =
collective(DocspellSystem.taskGroup)
}

View File

@ -24,6 +24,19 @@ class ContextImpl[F[_]: Functor, A](
val jobId: Ident
) extends Context[F, A] {
def loadJob(implicit F: Sync[F]): F[Job[String]] =
JobStoreImpl(store)
.findById(jobId)
.flatMap(
_.fold(
F.raiseError[Job[String]](
new IllegalStateException(s"Job not found: ${jobId.id}")
)
)(
F.pure
)
)
def setProgress(percent: Int): F[Unit] = {
val pval = math.min(100, math.max(0, percent))
store.transact(RJob.setProgress(jobId, pval)).map(_ => ())

View File

@ -34,6 +34,7 @@ object LogSink {
.capture("task", e.taskName)
.capture("group", e.group)
.capture("jobInfo", e.jobInfo)
.captureAll(e.data)
e.level match {
case LogLevel.Info =>

View File

@ -12,8 +12,7 @@ import cats.syntax.all._
import fs2.Stream
import docspell.common.{Ident, LogLevel}
import docspell.logging
import docspell.logging.{Level, Logger}
import docspell.logging.{Level, LogEvent => DsLogEvent, Logger}
/** Background tasks use this logger to emit the log events to a queue. The consumer is
* [[LogSink]], which picks up log events in a separate thread.
@ -29,7 +28,7 @@ object QueueLogger {
): Logger[F] =
new Logger[F] {
def log(logEvent: => logging.LogEvent) =
def log(logEvent: => DsLogEvent) =
LogEvent
.create[F](
jobId,

View File

@ -267,27 +267,36 @@ final class SchedulerImpl[F[_]: Async](
.mapF(fa => onStart(job) *> logger.debug("Starting task now") *> fa)
.mapF(_.attempt.flatMap {
case Right(result) =>
logger.info(s"Job execution successful: ${job.info}")
ctx.logger.info("Job execution successful") *>
logger.info(s"Job execution successful: ${job.info}") *>
ctx.logger.info("Job execution successful") *>
(JobState.Success: JobState, result).pure[F]
case Left(PermanentError(ex)) =>
logger.warn(ex)("Task failed with permanent error") *>
ctx.logger
.warn(ex)("Task failed with permanent error!")
.as(JobState.failed -> JobTaskResult.empty)
case Left(ex) =>
state.get.map(_.wasCancelled(job)).flatMap {
case true =>
logger.error(ex)(s"Job ${job.info} execution failed (cancel = true)")
ctx.logger.error(ex)("Job execution failed (cancel = true)") *>
logger.error(ex)(s"Job ${job.info} execution failed (cancel = true)") *>
ctx.logger.error(ex)("Job execution failed (cancel = true)") *>
(JobState.Cancelled: JobState, JobTaskResult.empty).pure[F]
case false =>
QJob.exceedsRetries(job.id, config.retries, store).flatMap {
case true =>
logger.error(ex)(s"Job ${job.info} execution failed. Retries exceeded.")
ctx.logger
.error(ex)(s"Job ${job.info} execution failed. Retries exceeded.")
.map(_ => (JobState.Failed: JobState, JobTaskResult.empty))
logger
.error(ex)(s"Job ${job.info} execution failed. Retries exceeded.") *>
ctx.logger
.error(ex)(s"Job ${job.info} execution failed. Retries exceeded.")
.map(_ => (JobState.Failed: JobState, JobTaskResult.empty))
case false =>
logger.error(ex)(s"Job ${job.info} execution failed. Retrying later.")
ctx.logger
.error(ex)(s"Job ${job.info} execution failed. Retrying later.")
.map(_ => (JobState.Stuck: JobState, JobTaskResult.empty))
logger
.error(ex)(s"Job ${job.info} execution failed. Retrying later.") *>
ctx.logger
.error(ex)(s"Job ${job.info} execution failed. Retrying later.")
.map(_ => (JobState.Stuck: JobState, JobTaskResult.empty))
}
}
})