From d6829ea69b195eb70c971c45247e26cbf9c1fdfc Mon Sep 17 00:00:00 2001 From: eikek Date: Wed, 9 Feb 2022 23:38:32 +0100 Subject: [PATCH 1/2] Allow background tasks return results that are included in the event --- .../main/scala/docspell/common/JobState.scala | 3 + .../docspell/joex/scheduler/JobTask.scala | 13 +++-- .../joex/scheduler/SchedulerImpl.scala | 58 ++++++++++--------- .../docspell/notification/api/Event.scala | 8 ++- .../impl/context/JobDoneCtx.scala | 17 ++++-- 5 files changed, 60 insertions(+), 39 deletions(-) diff --git a/modules/common/src/main/scala/docspell/common/JobState.scala b/modules/common/src/main/scala/docspell/common/JobState.scala index f2f52775..7f0aa31e 100644 --- a/modules/common/src/main/scala/docspell/common/JobState.scala +++ b/modules/common/src/main/scala/docspell/common/JobState.scala @@ -54,6 +54,9 @@ object JobState { NonEmptyList.of(Waiting, Scheduled, Running, Stuck) val inProgress: Set[JobState] = Set(Scheduled, Running, Stuck) + def isDone(state: JobState): Boolean = + done.exists(_ == state) + def parse(str: String): Either[String, JobState] = str.toLowerCase match { case "waiting" => Right(Waiting) diff --git a/modules/joex/src/main/scala/docspell/joex/scheduler/JobTask.scala b/modules/joex/src/main/scala/docspell/joex/scheduler/JobTask.scala index 9a04ad79..fd291d0b 100644 --- a/modules/joex/src/main/scala/docspell/joex/scheduler/JobTask.scala +++ b/modules/joex/src/main/scala/docspell/joex/scheduler/JobTask.scala @@ -12,7 +12,7 @@ import cats.implicits._ import docspell.common.Ident import docspell.common.syntax.all._ -import io.circe.Decoder +import io.circe.{Decoder, Encoder, Json} /** Binds a Task to a name. 
This is required to lookup the code based on the taskName in * the RJob data and to execute it given the arguments that have to be read from a @@ -24,18 +24,19 @@ import io.circe.Decoder */ case class JobTask[F[_]]( name: Ident, - task: Task[F, String, Unit], + task: Task[F, String, Json], onCancel: Task[F, String, Unit] ) object JobTask { - def json[F[_]: Sync, A]( + def json[F[_]: Sync, A, B]( name: Ident, - task: Task[F, A, Unit], + task: Task[F, A, B], onCancel: Task[F, A, Unit] )(implicit - D: Decoder[A] + D: Decoder[A], + E: Encoder[B] ): JobTask[F] = { val convert: String => F[A] = str => @@ -45,6 +46,6 @@ object JobTask { Sync[F].raiseError(new Exception(s"Cannot parse task arguments: $str", ex)) } - JobTask(name, task.contramap(convert), onCancel.contramap(convert)) + JobTask(name, task.contramap(convert).map(E.apply), onCancel.contramap(convert)) } } diff --git a/modules/joex/src/main/scala/docspell/joex/scheduler/SchedulerImpl.scala b/modules/joex/src/main/scala/docspell/joex/scheduler/SchedulerImpl.scala index 877364b6..d30c43eb 100644 --- a/modules/joex/src/main/scala/docspell/joex/scheduler/SchedulerImpl.scala +++ b/modules/joex/src/main/scala/docspell/joex/scheduler/SchedulerImpl.scala @@ -25,6 +25,7 @@ import docspell.store.queries.QJob import docspell.store.queue.JobQueue import docspell.store.records.RJob +import io.circe.Json import org.log4s.getLogger final class SchedulerImpl[F[_]: Async]( @@ -168,7 +169,7 @@ final class SchedulerImpl[F[_]: Async]( ctx <- Context[F, String](job, job.args, config, logSink, store) _ <- t.onCancel.run(ctx) _ <- state.modify(_.markCancelled(job)) - _ <- onFinish(job, JobState.Cancelled) + _ <- onFinish(job, Json.Null, JobState.Cancelled) _ <- ctx.logger.warn("Job has been cancelled.") _ <- logger.fdebug(s"Job ${job.info} has been cancelled.") } yield () @@ -197,27 +198,32 @@ final class SchedulerImpl[F[_]: Async]( } } - def onFinish(job: RJob, finalState: JobState): F[Unit] = + def onFinish(job: RJob, result: Json, 
finishState: JobState): F[Unit] = for { - _ <- logger.fdebug(s"Job ${job.info} done $finalState. Releasing resources.") + _ <- logger.fdebug(s"Job ${job.info} done $finishState. Releasing resources.") _ <- permits.release *> permits.available.flatMap(a => logger.fdebug(s"Permit released ($a free)") ) _ <- state.modify(_.removeRunning(job)) - _ <- QJob.setFinalState(job.id, finalState, store) - _ <- pubSub.publish1IgnoreErrors( - JobDone.topic, - JobDone(job.id, job.group, job.task, job.args, finalState) + _ <- QJob.setFinalState(job.id, finishState, store) + _ <- Sync[F].whenA(JobState.isDone(finishState))( + pubSub.publish1IgnoreErrors( + JobDone.topic, + JobDone(job.id, job.group, job.task, job.args, finishState) + ) ) - _ <- eventSink.offer( - Event.JobDone( - job.id, - job.group, - job.task, - job.args, - job.state, - job.subject, - job.submitter + _ <- Sync[F].whenA(JobState.isDone(finishState))( + eventSink.offer( + Event.JobDone( + job.id, + job.group, + job.task, + job.args, + job.state, + job.subject, + job.submitter, + result + ) ) ) } yield () @@ -231,45 +237,45 @@ final class SchedulerImpl[F[_]: Async]( def wrapTask( job: RJob, - task: Task[F, String, Unit], + task: Task[F, String, Json], ctx: Context[F, String] ): Task[F, String, Unit] = task .mapF(fa => onStart(job) *> logger.fdebug("Starting task now") *> fa) .mapF(_.attempt.flatMap { - case Right(()) => + case Right(result) => logger.info(s"Job execution successful: ${job.info}") ctx.logger.info("Job execution successful") *> - (JobState.Success: JobState).pure[F] + (JobState.Success: JobState, result).pure[F] case Left(ex) => state.get.map(_.wasCancelled(job)).flatMap { case true => logger.error(ex)(s"Job ${job.info} execution failed (cancel = true)") ctx.logger.error(ex)("Job execution failed (cancel = true)") *> - (JobState.Cancelled: JobState).pure[F] + (JobState.Cancelled: JobState, Json.Null).pure[F] case false => QJob.exceedsRetries(job.id, config.retries, store).flatMap { case true => 
logger.error(ex)(s"Job ${job.info} execution failed. Retries exceeded.") ctx.logger .error(ex)(s"Job ${job.info} execution failed. Retries exceeded.") - .map(_ => JobState.Failed: JobState) + .map(_ => (JobState.Failed: JobState, Json.Null)) case false => logger.error(ex)(s"Job ${job.info} execution failed. Retrying later.") ctx.logger .error(ex)(s"Job ${job.info} execution failed. Retrying later.") - .map(_ => JobState.Stuck: JobState) + .map(_ => (JobState.Stuck: JobState, Json.Null)) } } }) .mapF(_.attempt.flatMap { - case Right(jstate) => - onFinish(job, jstate) + case Right((jstate, result)) => + onFinish(job, result, jstate) case Left(ex) => logger.error(ex)(s"Error happened during post-processing of ${job.info}!") // we don't know the real outcome here… // since tasks should be idempotent, set it to stuck. if above has failed, this might fail anyways - onFinish(job, JobState.Stuck) + onFinish(job, Json.Null, JobState.Stuck) }) def forkRun( @@ -291,7 +297,7 @@ final class SchedulerImpl[F[_]: Async]( () } *> state.modify(_.markCancelled(job)) *> - onFinish(job, JobState.Cancelled) *> + onFinish(job, Json.Null, JobState.Cancelled) *> ctx.logger.warn("Job has been cancelled.") *> logger.fdebug(s"Job ${job.info} has been cancelled.") ) diff --git a/modules/notification/api/src/main/scala/docspell/notification/api/Event.scala b/modules/notification/api/src/main/scala/docspell/notification/api/Event.scala index 18ff790a..3da27455 100644 --- a/modules/notification/api/src/main/scala/docspell/notification/api/Event.scala +++ b/modules/notification/api/src/main/scala/docspell/notification/api/Event.scala @@ -12,7 +12,7 @@ import cats.implicits._ import docspell.common._ -import io.circe.{Decoder, Encoder} +import io.circe.{Decoder, Encoder, Json} /** An event generated in the platform.
*/ sealed trait Event { @@ -203,7 +203,8 @@ object Event { args: String, state: JobState, subject: String, - submitter: Ident + submitter: Ident, + result: Json ) extends Event { val eventType = JobDone val baseUrl = None @@ -220,7 +221,8 @@ object Event { "", JobState.running, "Process 3 files", - account.user + account.user, + Json.Null ) } yield ev } diff --git a/modules/notification/impl/src/main/scala/docspell/notification/impl/context/JobDoneCtx.scala b/modules/notification/impl/src/main/scala/docspell/notification/impl/context/JobDoneCtx.scala index 2b5c9b24..8ef19754 100644 --- a/modules/notification/impl/src/main/scala/docspell/notification/impl/context/JobDoneCtx.scala +++ b/modules/notification/impl/src/main/scala/docspell/notification/impl/context/JobDoneCtx.scala @@ -13,8 +13,8 @@ import docspell.notification.api._ import docspell.notification.impl.AbstractEventContext import doobie._ -import io.circe.Encoder import io.circe.syntax._ +import io.circe.{Encoder, Json} import yamusca.implicits._ final case class JobDoneCtx(event: Event.JobDone, data: JobDoneCtx.Data) @@ -45,14 +45,23 @@ object JobDoneCtx { args: String, state: JobState, subject: String, - submitter: Ident + submitter: Ident, + result: Json ) object Data { implicit val jsonEncoder: Encoder[Data] = io.circe.generic.semiauto.deriveEncoder def apply(ev: Event.JobDone): Data = - Data(ev.jobId, ev.group, ev.task, ev.args, ev.state, ev.subject, ev.submitter) + Data( + ev.jobId, + ev.group, + ev.task, + ev.args, + ev.state, + ev.subject, + ev.submitter, + ev.result + ) } - } From 19e040b029236262a0960dba0e7175df8d93e09d Mon Sep 17 00:00:00 2001 From: eikek Date: Thu, 10 Feb 2022 20:28:07 +0100 Subject: [PATCH 2/2] Send results from processing documents in the event --- .../docspell/joex/process/ItemData.scala | 27 +++++++++++++++++++ .../docspell/joex/process/ItemHandler.scala | 6 ++--- .../notification/api/EventCodec.scala | 23 ---------------- 3 files changed, 30 insertions(+), 26 deletions(-) 
delete mode 100644 modules/notification/api/src/main/scala/docspell/notification/api/EventCodec.scala diff --git a/modules/joex/src/main/scala/docspell/joex/process/ItemData.scala b/modules/joex/src/main/scala/docspell/joex/process/ItemData.scala index 35c1e5d8..7e8bd697 100644 --- a/modules/joex/src/main/scala/docspell/joex/process/ItemData.scala +++ b/modules/joex/src/main/scala/docspell/joex/process/ItemData.scala @@ -10,6 +10,9 @@ import docspell.common._ import docspell.joex.process.ItemData.AttachmentDates import docspell.store.records.{RAttachment, RAttachmentMeta, RItem} +import io.circe.syntax.EncoderOps +import io.circe.{Encoder, Json} + /** Data that is carried across all processing tasks. * * @param item @@ -94,4 +97,28 @@ object ItemData { dates.map(dl => dl.label.copy(label = dl.date.toString)) } + // Used to encode the result passed to the job-done event + implicit val jsonEncoder: Encoder[ItemData] = + Encoder.instance { data => + val metaMap = data.metas.groupMap(_.id)(identity) + Json.obj( + "id" -> data.item.id.asJson, + "name" -> data.item.name.asJson, + "collective" -> data.item.cid.asJson, + "source" -> data.item.source.asJson, + "attachments" -> data.attachments + .map(a => + Json.obj( + "id" -> a.id.asJson, + "name" -> a.name.asJson, + "content" -> metaMap.get(a.id).flatMap(_.head.content).asJson, + "language" -> metaMap.get(a.id).flatMap(_.head.language).asJson, + "pages" -> metaMap.get(a.id).flatMap(_.head.pages).asJson + ) + ) + .asJson, + "tags" -> data.tags.asJson, + "assumedTags" -> data.classifyTags.asJson + ) + } } diff --git a/modules/joex/src/main/scala/docspell/joex/process/ItemHandler.scala b/modules/joex/src/main/scala/docspell/joex/process/ItemHandler.scala index 58a696f9..0afd3784 100644 --- a/modules/joex/src/main/scala/docspell/joex/process/ItemHandler.scala +++ b/modules/joex/src/main/scala/docspell/joex/process/ItemHandler.scala @@ -40,18 +40,18 @@ object ItemHandler { fts: FtsClient[F], analyser: TextAnalyser[F], 
regexNer: RegexNerFile[F] - ): Task[F, Args, Unit] = + ): Task[F, Args, Option[ItemData]] = logBeginning.flatMap(_ => DuplicateCheck[F] .flatMap(args => - if (args.files.isEmpty) logNoFiles + if (args.files.isEmpty) logNoFiles.map(_ => None) else { val create: Task[F, Args, ItemData] = CreateItem[F].contramap(_ => args.pure[F]) create .flatMap(itemStateTask(ItemState.Processing)) .flatMap(safeProcess[F](cfg, itemOps, fts, analyser, regexNer)) - .map(_ => ()) + .map(_.some) } ) ) diff --git a/modules/notification/api/src/main/scala/docspell/notification/api/EventCodec.scala b/modules/notification/api/src/main/scala/docspell/notification/api/EventCodec.scala deleted file mode 100644 index 1b66841f..00000000 --- a/modules/notification/api/src/main/scala/docspell/notification/api/EventCodec.scala +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Copyright 2020 Eike K. & Contributors - * - * SPDX-License-Identifier: AGPL-3.0-or-later - */ - -package docspell.notification.api - -import docspell.notification.api.Event._ - -import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder} -import io.circe.{Decoder, Encoder} - -trait EventCodec { - - implicit val tagsChangedDecoder: Decoder[TagsChanged] = deriveDecoder - implicit val tagsChangedEncoder: Encoder[TagsChanged] = deriveEncoder - - implicit val eventDecoder: Decoder[Event] = - deriveDecoder - implicit val eventEncoder: Encoder[Event] = - deriveEncoder -}