Provide tasks with ability to return data and human message

To allow better communication from background tasks, tasks can return
not only data (JSON), but also a human-readable message which is sent
via notification channels.
This commit is contained in:
eikek
2022-03-11 22:56:14 +01:00
parent c1ce0769eb
commit 290b4ca58b
16 changed files with 250 additions and 76 deletions

View File

@ -9,14 +9,12 @@ package docspell.joex.filecopy
import cats.data.NonEmptyList
import cats.effect._
import cats.implicits._
import docspell.common.FileCopyTaskArgs.Selection
import docspell.common.{FileCopyTaskArgs, Ident}
import docspell.joex.Config
import docspell.joex.scheduler.Task
import docspell.joex.scheduler.{JobTaskResultEncoder, Task}
import docspell.logging.Logger
import docspell.store.file.{BinnyUtils, FileRepository, FileRepositoryConfig}
import binny.CopyTool.Counter
import binny.{BinaryId, BinaryStore, CopyTool}
import io.circe.generic.semiauto.deriveCodec
@ -56,6 +54,16 @@ object FileCopyTask {
deriveCodec
implicit val jsonCodec: Codec[CopyResult] =
deriveCodec
/** Turns a [[CopyResult]] into a job result: the JSON part comes from the implicit
  * circe encoder, the message summarises how many files were copied or failed.
  */
implicit val jobTaskResultEncoder: JobTaskResultEncoder[CopyResult] =
  JobTaskResultEncoder.fromJson[CopyResult].withMessage { copyResult =>
    // Totals are summed over all counters of the result.
    val copiedCount = copyResult.counter.map(_.success).sum
    val failedCount = copyResult.counter.map(_.failed.size).sum
    copyResult.success match {
      case true =>
        s"Successfully copied $copiedCount files to ${copyResult.counter.size} stores."
      case false =>
        s"Copying files failed for $failedCount files! $copiedCount were copied successfully."
    }
  }
}
def onCancel[F[_]]: Task[F, Args, Unit] =
@ -91,7 +99,7 @@ object FileCopyTask {
data match {
case Right((from, tos)) =>
ctx.logger.info(s"Start copying all files from ") *>
ctx.logger.info(s"Start copying all files from $from") *>
copy(ctx.logger, from, tos).flatTap(r =>
if (r.success) ctx.logger.info(s"Copying finished: ${r.counter}")
else ctx.logger.error(s"Copying failed: $r")

View File

@ -9,34 +9,49 @@ package docspell.joex.filecopy
import cats.Monoid
import cats.effect._
import cats.implicits._
import docspell.backend.ops.OFileRepository
import docspell.backend.ops.OFileRepository.IntegrityResult
import docspell.common.{FileIntegrityCheckArgs, FileKey}
import docspell.joex.scheduler.Task
import docspell.joex.scheduler.{JobTaskResultEncoder, Task}
import docspell.store.records.RFileMeta
import io.circe.Encoder
import io.circe.generic.semiauto.deriveEncoder
object FileIntegrityCheckTask {
type Args = FileIntegrityCheckArgs
case class Result(ok: Int, failedKeys: Set[FileKey]) {
case class Result(ok: Int, failedKeys: Set[FileKey], notFoundKeys: Set[FileKey]) {
override def toString: String =
s"Result(ok=$ok, failed=${failedKeys.size}, keysFailed=$failedKeys)"
s"Result(ok=$ok, failed=${failedKeys.size}, notFound=${notFoundKeys.size}, " +
s"keysFailed=$failedKeys, notFoundKeys=$notFoundKeys)"
}
object Result {
val empty = Result(0, Set.empty)
val empty = Result(0, Set.empty, Set.empty)
def notFound(key: FileKey) = Result(0, Set.empty, Set(key))
def from(r: IntegrityResult): Result =
if (r.ok) Result(1, Set.empty) else Result(0, Set(r.key))
if (r.ok) Result(1, Set.empty, Set.empty) else Result(0, Set(r.key), Set.empty)
implicit val monoid: Monoid[Result] =
Monoid.instance(empty, (a, b) => Result(a.ok + b.ok, a.failedKeys ++ b.failedKeys))
Monoid.instance(
empty,
(a, b) =>
Result(
a.ok + b.ok,
a.failedKeys ++ b.failedKeys,
a.notFoundKeys ++ b.notFoundKeys
)
)
implicit val jsonEncoder: Encoder[Result] =
deriveEncoder
/** Job result encoder for the integrity check: JSON via the implicit `Encoder[Result]`,
  * plus a one-line summary with the ok, failed and not-found counts.
  */
implicit val jobTaskResultEncoder: JobTaskResultEncoder[Result] =
  JobTaskResultEncoder.fromJson[Result].withMessage { checked =>
    val okCount = checked.ok
    val failedCount = checked.failedKeys.size
    val missingCount = checked.notFoundKeys.size
    s"Integrity check finished. Ok: $okCount, Failed: $failedCount, Not found: $missingCount"
  }
}
def apply[F[_]: Sync](ops: OFileRepository[F]): Task[F, Args, Result] =
@ -49,13 +64,16 @@ object FileIntegrityCheckTask {
.chunks
.evalTap(c => ctx.logger.info(s"Checking next ${c.size} files…"))
.unchunks
.evalMap(meta => ops.checkIntegrity(meta.id, meta.checksum.some))
.evalMap {
case Some(r) =>
Result.from(r).pure[F]
case None =>
ctx.logger.error(s"File not found").as(Result.empty)
}
.evalMap(meta =>
ops.checkIntegrity(meta.id, meta.checksum.some).flatMap {
case Some(r) =>
Result.from(r).pure[F]
case None =>
ctx.logger
.error(s"File '${meta.id.toString}' not found in file repository")
.as(Result.notFound(meta.id))
}
)
.foldMonoid
.compile
.lastOrError
@ -67,5 +85,4 @@ object FileIntegrityCheckTask {
/** Task to run when the integrity check job is cancelled; it only writes a warning
  * that names the task.
  */
def onCancel[F[_]]: Task[F, Args, Unit] =
  Task.log { logger =>
    logger.warn(s"Cancelling ${FileIntegrityCheckArgs.taskName.id} task")
  }
}

View File

@ -8,6 +8,7 @@ package docspell.joex.process
import docspell.common._
import docspell.joex.process.ItemData.AttachmentDates
import docspell.joex.scheduler.JobTaskResultEncoder
import docspell.store.records.{RAttachment, RAttachmentMeta, RItem}
import io.circe.syntax.EncoderOps
@ -118,7 +119,28 @@ object ItemData {
)
.asJson,
"tags" -> data.tags.asJson,
"assumedTags" -> data.classifyTags.asJson
"assumedTags" -> data.classifyTags.asJson,
"assumedCorrOrg" -> data.finalProposals
.find(MetaProposalType.CorrOrg)
.map(_.values.head.ref)
.asJson
)
}
/** Encodes processed [[ItemData]] as a job result.
  *
  * The JSON part is produced by the implicit `Encoder[ItemData]`. The message has the
  * form `Processed '<item name>' [<tags>] by <org>/<person>`, where the tag list and
  * the correspondent part are left out entirely when there is nothing to show.
  */
implicit val jobTaskResultEncoder: JobTaskResultEncoder[ItemData] =
  JobTaskResultEncoder.fromJson[ItemData].withMessage { data =>
    // Each optional part carries its own leading space, so omitting a part leaves
    // neither a double space in the middle nor a trailing space at the end.
    val allTags = data.tags ++ data.classifyTags
    val tags =
      if (allTags.isEmpty) ""
      else allTags.mkString(" [", ", ", "]")

    // NOTE(review): `values.head` presumes the candidate list is never empty — confirm.
    val corg =
      data.finalProposals.find(MetaProposalType.CorrOrg).map(_.values.head.ref.name)
    val cpers =
      data.finalProposals.find(MetaProposalType.CorrPerson).map(_.values.head.ref.name)

    val org = corg match {
      case Some(o) => s" by $o" + cpers.map(p => s"/$p").getOrElse("")
      case None    => cpers.map(p => s" by $p").getOrElse("")
    }

    s"Processed '${data.item.name}'$tags$org"
  }
}

View File

@ -12,7 +12,7 @@ import cats.implicits._
import docspell.common.Ident
import docspell.common.syntax.all._
import io.circe.{Decoder, Encoder, Json}
import io.circe.Decoder
/** Binds a Task to a name. This is required to lookup the code based on the taskName in
* the RJob data and to execute it given the arguments that have to be read from a
@ -24,7 +24,7 @@ import io.circe.{Decoder, Encoder, Json}
*/
case class JobTask[F[_]](
name: Ident,
task: Task[F, String, Json],
task: Task[F, String, JobTaskResult],
onCancel: Task[F, String, Unit]
)
@ -36,7 +36,7 @@ object JobTask {
onCancel: Task[F, A, Unit]
)(implicit
D: Decoder[A],
E: Encoder[B]
E: JobTaskResultEncoder[B]
): JobTask[F] = {
val convert: String => F[A] =
str =>
@ -46,6 +46,6 @@ object JobTask {
Sync[F].raiseError(new Exception(s"Cannot parse task arguments: $str", ex))
}
JobTask(name, task.contramap(convert).map(E.apply), onCancel.contramap(convert))
JobTask(name, task.contramap(convert).map(E.encode), onCancel.contramap(convert))
}
}

View File

@ -0,0 +1,27 @@
/*
* Copyright 2020 Eike K. & Contributors
*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
package docspell.joex.scheduler
import io.circe.Json
/** Outcome of a job task: an optional human readable message and an optional JSON
  * payload. Either part may be absent.
  */
final case class JobTaskResult(message: Option[String], json: Option[Json]) {

  /** Returns a result with the message set to `m`, keeping the JSON part. */
  def withMessage(m: String): JobTaskResult =
    JobTaskResult(Some(m), this.json)

  /** Returns a result with the JSON part set to `json`, keeping the message. */
  def withJson(json: Json): JobTaskResult =
    JobTaskResult(this.message, Some(json))
}
object JobTaskResult {

  /** A result carrying neither a message nor JSON. */
  val empty: JobTaskResult = JobTaskResult(None, None)

  /** A result consisting only of a message. */
  def message(msg: String): JobTaskResult = empty.withMessage(msg)

  /** A result consisting only of a JSON payload. */
  def json(json: Json): JobTaskResult = empty.withJson(json)
}

View File

@ -0,0 +1,49 @@
/*
* Copyright 2020 Eike K. & Contributors
*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
package docspell.joex.scheduler
import docspell.joex.scheduler.JobTaskResultEncoder.instance
import io.circe.Encoder
/** Type class describing how a task's return value of type `A` is turned into a
  * [[JobTaskResult]].
  */
trait JobTaskResultEncoder[A] { self =>

  /** Converts the given value into a job result. */
  def encode(a: A): JobTaskResult

  /** Derives an encoder for `B` by first converting it to `A` with `f`. */
  final def contramap[B](f: B => A): JobTaskResultEncoder[B] =
    JobTaskResultEncoder.instance { b =>
      val converted = f(b)
      self.encode(converted)
    }

  /** Post-processes every result produced by this encoder with `f`. */
  final def map(f: JobTaskResult => JobTaskResult): JobTaskResultEncoder[A] =
    JobTaskResultEncoder.instance { a =>
      val encoded = self.encode(a)
      f(encoded)
    }

  /** Like `map`, but `f` also receives the value that was encoded. */
  final def modify(f: (A, JobTaskResult) => JobTaskResult): JobTaskResultEncoder[A] =
    JobTaskResultEncoder.instance { a =>
      val encoded = self.encode(a)
      f(a, encoded)
    }

  /** Sets the message of every result to the text `f` computes from the input. */
  final def withMessage(f: A => String): JobTaskResultEncoder[A] =
    JobTaskResultEncoder.instance { a =>
      val encoded = self.encode(a)
      encoded.withMessage(f(a))
    }
}
object JobTaskResultEncoder {

  /** Summons the implicit encoder for `A`. */
  def apply[A](implicit v: JobTaskResultEncoder[A]): JobTaskResultEncoder[A] = v

  /** Creates an encoder from a plain function. */
  def instance[A](f: A => JobTaskResult): JobTaskResultEncoder[A] =
    new JobTaskResultEncoder[A] {
      def encode(a: A): JobTaskResult = f(a)
    }

  /** Creates an encoder whose result holds only the JSON produced by the circe
    * `Encoder[A]`; no message is set.
    */
  def fromJson[A: Encoder]: JobTaskResultEncoder[A] = {
    val jsonEncoder = Encoder[A]
    instance(value => JobTaskResult.json(jsonEncoder(value)))
  }

  /** `Unit` results carry no information and encode to the empty result. */
  implicit val unitJobTaskResultEncoder: JobTaskResultEncoder[Unit] =
    instance[Unit](_ => JobTaskResult.empty)

  /** Encodes `Some(a)` with the encoder for `A` and `None` as the empty result. */
  implicit def optionJobTaskResultEncoder[A](implicit
      ea: JobTaskResultEncoder[A]
  ): JobTaskResultEncoder[Option[A]] =
    instance[Option[A]](opt => opt.fold(JobTaskResult.empty)(a => ea.encode(a)))
}

View File

@ -167,7 +167,7 @@ final class SchedulerImpl[F[_]: Async](
ctx <- Context[F, String](job, job.args, config, logSink, store)
_ <- t.onCancel.run(ctx)
_ <- state.modify(_.markCancelled(job))
_ <- onFinish(job, Json.Null, JobState.Cancelled)
_ <- onFinish(job, JobTaskResult.empty, JobState.Cancelled)
_ <- ctx.logger.warn("Job has been cancelled.")
_ <- logger.debug(s"Job ${job.info} has been cancelled.")
} yield ()
@ -196,7 +196,7 @@ final class SchedulerImpl[F[_]: Async](
}
}
def onFinish(job: RJob, result: Json, finishState: JobState): F[Unit] =
def onFinish(job: RJob, result: JobTaskResult, finishState: JobState): F[Unit] =
for {
_ <- logger.debug(s"Job ${job.info} done $finishState. Releasing resources.")
_ <- permits.release *> permits.available.flatMap(a =>
@ -220,7 +220,8 @@ final class SchedulerImpl[F[_]: Async](
job.state,
job.subject,
job.submitter,
result
result.json.getOrElse(Json.Null),
result.message
)
)
)
@ -235,7 +236,7 @@ final class SchedulerImpl[F[_]: Async](
def wrapTask(
job: RJob,
task: Task[F, String, Json],
task: Task[F, String, JobTaskResult],
ctx: Context[F, String]
): Task[F, String, Unit] =
task
@ -250,19 +251,19 @@ final class SchedulerImpl[F[_]: Async](
case true =>
logger.error(ex)(s"Job ${job.info} execution failed (cancel = true)")
ctx.logger.error(ex)("Job execution failed (cancel = true)") *>
(JobState.Cancelled: JobState, Json.Null).pure[F]
(JobState.Cancelled: JobState, JobTaskResult.empty).pure[F]
case false =>
QJob.exceedsRetries(job.id, config.retries, store).flatMap {
case true =>
logger.error(ex)(s"Job ${job.info} execution failed. Retries exceeded.")
ctx.logger
.error(ex)(s"Job ${job.info} execution failed. Retries exceeded.")
.map(_ => (JobState.Failed: JobState, Json.Null))
.map(_ => (JobState.Failed: JobState, JobTaskResult.empty))
case false =>
logger.error(ex)(s"Job ${job.info} execution failed. Retrying later.")
ctx.logger
.error(ex)(s"Job ${job.info} execution failed. Retrying later.")
.map(_ => (JobState.Stuck: JobState, Json.Null))
.map(_ => (JobState.Stuck: JobState, JobTaskResult.empty))
}
}
})
@ -273,7 +274,7 @@ final class SchedulerImpl[F[_]: Async](
logger.error(ex)(s"Error happened during post-processing of ${job.info}!")
// we don't know the real outcome here…
// since tasks should be idempotent, set it to stuck. if above has failed, this might fail anyways
onFinish(job, Json.Null, JobState.Stuck)
onFinish(job, JobTaskResult.empty, JobState.Stuck)
})
def forkRun(
@ -295,7 +296,7 @@ final class SchedulerImpl[F[_]: Async](
()
} *>
state.modify(_.markCancelled(job)) *>
onFinish(job, Json.Null, JobState.Cancelled) *>
onFinish(job, JobTaskResult.empty, JobState.Cancelled) *>
ctx.logger.warn("Job has been cancelled.") *>
logger.debug(s"Job ${job.info} has been cancelled.")
)