mirror of
				https://github.com/TheAnachronism/docspell.git
				synced 2025-10-31 09:30:12 +00:00 
			
		
		
		
	| @@ -54,6 +54,9 @@ object JobState { | ||||
|     NonEmptyList.of(Waiting, Scheduled, Running, Stuck) | ||||
|   val inProgress: Set[JobState] = Set(Scheduled, Running, Stuck) | ||||
|  | ||||
|   def isDone(state: JobState): Boolean = | ||||
|     done.exists(_ == state) | ||||
|  | ||||
|   def parse(str: String): Either[String, JobState] = | ||||
|     str.toLowerCase match { | ||||
|       case "waiting"   => Right(Waiting) | ||||
|   | ||||
| @@ -10,6 +10,9 @@ import docspell.common._ | ||||
| import docspell.joex.process.ItemData.AttachmentDates | ||||
| import docspell.store.records.{RAttachment, RAttachmentMeta, RItem} | ||||
|  | ||||
| import io.circe.syntax.EncoderOps | ||||
| import io.circe.{Encoder, Json} | ||||
|  | ||||
| /** Data that is carried across all processing tasks. | ||||
|   * | ||||
|   * @param item | ||||
| @@ -94,4 +97,28 @@ object ItemData { | ||||
|       dates.map(dl => dl.label.copy(label = dl.date.toString)) | ||||
|   } | ||||
|  | ||||
|   // Used to encode the result passed to the job-done event | ||||
|   implicit val jsonEncoder: Encoder[ItemData] = | ||||
|     Encoder.instance { data => | ||||
|       val metaMap = data.metas.groupMap(_.id)(identity) | ||||
|       Json.obj( | ||||
|         "id" -> data.item.id.asJson, | ||||
|         "name" -> data.item.name.asJson, | ||||
|         "collective" -> data.item.cid.asJson, | ||||
|         "source" -> data.item.source.asJson, | ||||
|         "attachments" -> data.attachments | ||||
|           .map(a => | ||||
|             Json.obj( | ||||
|               "id" -> a.id.asJson, | ||||
|               "name" -> a.name.asJson, | ||||
|               "content" -> metaMap.get(a.id).flatMap(_.head.content).asJson, | ||||
|               "language" -> metaMap.get(a.id).flatMap(_.head.language).asJson, | ||||
|               "pages" -> metaMap.get(a.id).flatMap(_.head.pages).asJson | ||||
|             ) | ||||
|           ) | ||||
|           .asJson, | ||||
|         "tags" -> data.tags.asJson, | ||||
|         "assumedTags" -> data.classifyTags.asJson | ||||
|       ) | ||||
|     } | ||||
| } | ||||
|   | ||||
| @@ -40,18 +40,18 @@ object ItemHandler { | ||||
|       fts: FtsClient[F], | ||||
|       analyser: TextAnalyser[F], | ||||
|       regexNer: RegexNerFile[F] | ||||
|   ): Task[F, Args, Unit] = | ||||
|   ): Task[F, Args, Option[ItemData]] = | ||||
|     logBeginning.flatMap(_ => | ||||
|       DuplicateCheck[F] | ||||
|         .flatMap(args => | ||||
|           if (args.files.isEmpty) logNoFiles | ||||
|           if (args.files.isEmpty) logNoFiles.map(_ => None) | ||||
|           else { | ||||
|             val create: Task[F, Args, ItemData] = | ||||
|               CreateItem[F].contramap(_ => args.pure[F]) | ||||
|             create | ||||
|               .flatMap(itemStateTask(ItemState.Processing)) | ||||
|               .flatMap(safeProcess[F](cfg, itemOps, fts, analyser, regexNer)) | ||||
|               .map(_ => ()) | ||||
|               .map(_.some) | ||||
|           } | ||||
|         ) | ||||
|     ) | ||||
|   | ||||
| @@ -12,7 +12,7 @@ import cats.implicits._ | ||||
| import docspell.common.Ident | ||||
| import docspell.common.syntax.all._ | ||||
|  | ||||
| import io.circe.Decoder | ||||
| import io.circe.{Decoder, Encoder, Json} | ||||
|  | ||||
| /** Binds a Task to a name. This is required to lookup the code based on the taskName in | ||||
|   * the RJob data and to execute it given the arguments that have to be read from a | ||||
| @@ -24,18 +24,19 @@ import io.circe.Decoder | ||||
|   */ | ||||
| case class JobTask[F[_]]( | ||||
|     name: Ident, | ||||
|     task: Task[F, String, Unit], | ||||
|     task: Task[F, String, Json], | ||||
|     onCancel: Task[F, String, Unit] | ||||
| ) | ||||
|  | ||||
| object JobTask { | ||||
|  | ||||
|   def json[F[_]: Sync, A]( | ||||
|   def json[F[_]: Sync, A, B]( | ||||
|       name: Ident, | ||||
|       task: Task[F, A, Unit], | ||||
|       task: Task[F, A, B], | ||||
|       onCancel: Task[F, A, Unit] | ||||
|   )(implicit | ||||
|       D: Decoder[A] | ||||
|       D: Decoder[A], | ||||
|       E: Encoder[B] | ||||
|   ): JobTask[F] = { | ||||
|     val convert: String => F[A] = | ||||
|       str => | ||||
| @@ -45,6 +46,6 @@ object JobTask { | ||||
|             Sync[F].raiseError(new Exception(s"Cannot parse task arguments: $str", ex)) | ||||
|         } | ||||
|  | ||||
|     JobTask(name, task.contramap(convert), onCancel.contramap(convert)) | ||||
|     JobTask(name, task.contramap(convert).map(E.apply), onCancel.contramap(convert)) | ||||
|   } | ||||
| } | ||||
|   | ||||
| @@ -25,6 +25,7 @@ import docspell.store.queries.QJob | ||||
| import docspell.store.queue.JobQueue | ||||
| import docspell.store.records.RJob | ||||
|  | ||||
| import io.circe.Json | ||||
| import org.log4s.getLogger | ||||
|  | ||||
| final class SchedulerImpl[F[_]: Async]( | ||||
| @@ -168,7 +169,7 @@ final class SchedulerImpl[F[_]: Async]( | ||||
|           ctx <- Context[F, String](job, job.args, config, logSink, store) | ||||
|           _ <- t.onCancel.run(ctx) | ||||
|           _ <- state.modify(_.markCancelled(job)) | ||||
|           _ <- onFinish(job, JobState.Cancelled) | ||||
|           _ <- onFinish(job, Json.Null, JobState.Cancelled) | ||||
|           _ <- ctx.logger.warn("Job has been cancelled.") | ||||
|           _ <- logger.fdebug(s"Job ${job.info} has been cancelled.") | ||||
|         } yield () | ||||
| @@ -197,27 +198,32 @@ final class SchedulerImpl[F[_]: Async]( | ||||
|     } | ||||
|   } | ||||
|  | ||||
|   def onFinish(job: RJob, finalState: JobState): F[Unit] = | ||||
|   def onFinish(job: RJob, result: Json, finishState: JobState): F[Unit] = | ||||
|     for { | ||||
|       _ <- logger.fdebug(s"Job ${job.info} done $finalState. Releasing resources.") | ||||
|       _ <- logger.fdebug(s"Job ${job.info} done $finishState. Releasing resources.") | ||||
|       _ <- permits.release *> permits.available.flatMap(a => | ||||
|         logger.fdebug(s"Permit released ($a free)") | ||||
|       ) | ||||
|       _ <- state.modify(_.removeRunning(job)) | ||||
|       _ <- QJob.setFinalState(job.id, finalState, store) | ||||
|       _ <- pubSub.publish1IgnoreErrors( | ||||
|         JobDone.topic, | ||||
|         JobDone(job.id, job.group, job.task, job.args, finalState) | ||||
|       _ <- QJob.setFinalState(job.id, finishState, store) | ||||
|       _ <- Sync[F].whenA(JobState.isDone(finishState))( | ||||
|         pubSub.publish1IgnoreErrors( | ||||
|           JobDone.topic, | ||||
|           JobDone(job.id, job.group, job.task, job.args, finishState) | ||||
|         ) | ||||
|       ) | ||||
|       _ <- eventSink.offer( | ||||
|         Event.JobDone( | ||||
|           job.id, | ||||
|           job.group, | ||||
|           job.task, | ||||
|           job.args, | ||||
|           job.state, | ||||
|           job.subject, | ||||
|           job.submitter | ||||
|       _ <- Sync[F].whenA(JobState.isDone(finishState))( | ||||
|         eventSink.offer( | ||||
|           Event.JobDone( | ||||
|             job.id, | ||||
|             job.group, | ||||
|             job.task, | ||||
|             job.args, | ||||
|             job.state, | ||||
|             job.subject, | ||||
|             job.submitter, | ||||
|             result | ||||
|           ) | ||||
|         ) | ||||
|       ) | ||||
|     } yield () | ||||
| @@ -231,45 +237,45 @@ final class SchedulerImpl[F[_]: Async]( | ||||
|  | ||||
|   def wrapTask( | ||||
|       job: RJob, | ||||
|       task: Task[F, String, Unit], | ||||
|       task: Task[F, String, Json], | ||||
|       ctx: Context[F, String] | ||||
|   ): Task[F, String, Unit] = | ||||
|     task | ||||
|       .mapF(fa => onStart(job) *> logger.fdebug("Starting task now") *> fa) | ||||
|       .mapF(_.attempt.flatMap { | ||||
|         case Right(()) => | ||||
|         case Right(result) => | ||||
|           logger.info(s"Job execution successful: ${job.info}") | ||||
|           ctx.logger.info("Job execution successful") *> | ||||
|             (JobState.Success: JobState).pure[F] | ||||
|             (JobState.Success: JobState, result).pure[F] | ||||
|         case Left(ex) => | ||||
|           state.get.map(_.wasCancelled(job)).flatMap { | ||||
|             case true => | ||||
|               logger.error(ex)(s"Job ${job.info} execution failed (cancel = true)") | ||||
|               ctx.logger.error(ex)("Job execution failed (cancel = true)") *> | ||||
|                 (JobState.Cancelled: JobState).pure[F] | ||||
|                 (JobState.Cancelled: JobState, Json.Null).pure[F] | ||||
|             case false => | ||||
|               QJob.exceedsRetries(job.id, config.retries, store).flatMap { | ||||
|                 case true => | ||||
|                   logger.error(ex)(s"Job ${job.info} execution failed. Retries exceeded.") | ||||
|                   ctx.logger | ||||
|                     .error(ex)(s"Job ${job.info} execution failed. Retries exceeded.") | ||||
|                     .map(_ => JobState.Failed: JobState) | ||||
|                     .map(_ => (JobState.Failed: JobState, Json.Null)) | ||||
|                 case false => | ||||
|                   logger.error(ex)(s"Job ${job.info} execution failed. Retrying later.") | ||||
|                   ctx.logger | ||||
|                     .error(ex)(s"Job ${job.info} execution failed. Retrying later.") | ||||
|                     .map(_ => JobState.Stuck: JobState) | ||||
|                     .map(_ => (JobState.Stuck: JobState, Json.Null)) | ||||
|               } | ||||
|           } | ||||
|       }) | ||||
|       .mapF(_.attempt.flatMap { | ||||
|         case Right(jstate) => | ||||
|           onFinish(job, jstate) | ||||
|         case Right((jstate, result)) => | ||||
|           onFinish(job, result, jstate) | ||||
|         case Left(ex) => | ||||
|           logger.error(ex)(s"Error happened during post-processing of ${job.info}!") | ||||
|           // we don't know the real outcome here… | ||||
|           // since tasks should be idempotent, set it to stuck. if above has failed, this might fail anyways | ||||
|           onFinish(job, JobState.Stuck) | ||||
|           onFinish(job, Json.Null, JobState.Stuck) | ||||
|       }) | ||||
|  | ||||
|   def forkRun( | ||||
| @@ -291,7 +297,7 @@ final class SchedulerImpl[F[_]: Async]( | ||||
|                 () | ||||
|             } *> | ||||
|             state.modify(_.markCancelled(job)) *> | ||||
|             onFinish(job, JobState.Cancelled) *> | ||||
|             onFinish(job, Json.Null, JobState.Cancelled) *> | ||||
|             ctx.logger.warn("Job has been cancelled.") *> | ||||
|             logger.fdebug(s"Job ${job.info} has been cancelled.") | ||||
|         ) | ||||
|   | ||||
| @@ -12,7 +12,7 @@ import cats.implicits._ | ||||
|  | ||||
| import docspell.common._ | ||||
|  | ||||
| import io.circe.{Decoder, Encoder} | ||||
| import io.circe.{Decoder, Encoder, Json} | ||||
|  | ||||
| /** An event generated in the platform. */ | ||||
| sealed trait Event { | ||||
| @@ -203,7 +203,8 @@ object Event { | ||||
|       args: String, | ||||
|       state: JobState, | ||||
|       subject: String, | ||||
|       submitter: Ident | ||||
|       submitter: Ident, | ||||
|       result: Json | ||||
|   ) extends Event { | ||||
|     val eventType = JobDone | ||||
|     val baseUrl = None | ||||
| @@ -220,7 +221,8 @@ object Event { | ||||
|           "", | ||||
|           JobState.running, | ||||
|           "Process 3 files", | ||||
|           account.user | ||||
|           account.user, | ||||
|           Json.Null | ||||
|         ) | ||||
|       } yield ev | ||||
|   } | ||||
|   | ||||
| @@ -1,23 +0,0 @@ | ||||
| /* | ||||
|  * Copyright 2020 Eike K. & Contributors | ||||
|  * | ||||
|  * SPDX-License-Identifier: AGPL-3.0-or-later | ||||
|  */ | ||||
|  | ||||
| package docspell.notification.api | ||||
|  | ||||
| import docspell.notification.api.Event._ | ||||
|  | ||||
| import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder} | ||||
| import io.circe.{Decoder, Encoder} | ||||
|  | ||||
| trait EventCodec { | ||||
|  | ||||
|   implicit val tagsChangedDecoder: Decoder[TagsChanged] = deriveDecoder | ||||
|   implicit val tagsChangedEncoder: Encoder[TagsChanged] = deriveEncoder | ||||
|  | ||||
|   implicit val eventDecoder: Decoder[Event] = | ||||
|     deriveDecoder | ||||
|   implicit val eventEncoder: Encoder[Event] = | ||||
|     deriveEncoder | ||||
| } | ||||
| @@ -13,8 +13,8 @@ import docspell.notification.api._ | ||||
| import docspell.notification.impl.AbstractEventContext | ||||
|  | ||||
| import doobie._ | ||||
| import io.circe.Encoder | ||||
| import io.circe.syntax._ | ||||
| import io.circe.{Encoder, Json} | ||||
| import yamusca.implicits._ | ||||
|  | ||||
| final case class JobDoneCtx(event: Event.JobDone, data: JobDoneCtx.Data) | ||||
| @@ -45,14 +45,23 @@ object JobDoneCtx { | ||||
|       args: String, | ||||
|       state: JobState, | ||||
|       subject: String, | ||||
|       submitter: Ident | ||||
|       submitter: Ident, | ||||
|       result: Json | ||||
|   ) | ||||
|   object Data { | ||||
|     implicit val jsonEncoder: Encoder[Data] = | ||||
|       io.circe.generic.semiauto.deriveEncoder | ||||
|  | ||||
|     def apply(ev: Event.JobDone): Data = | ||||
|       Data(ev.jobId, ev.group, ev.task, ev.args, ev.state, ev.subject, ev.submitter) | ||||
|       Data( | ||||
|         ev.jobId, | ||||
|         ev.group, | ||||
|         ev.task, | ||||
|         ev.args, | ||||
|         ev.state, | ||||
|         ev.subject, | ||||
|         ev.submitter, | ||||
|         ev.result | ||||
|       ) | ||||
|   } | ||||
|  | ||||
| } | ||||
|   | ||||
		Reference in New Issue
	
	Block a user