diff --git a/modules/common/src/main/scala/docspell/common/JobState.scala b/modules/common/src/main/scala/docspell/common/JobState.scala index f2f52775..7f0aa31e 100644 --- a/modules/common/src/main/scala/docspell/common/JobState.scala +++ b/modules/common/src/main/scala/docspell/common/JobState.scala @@ -54,6 +54,9 @@ object JobState { NonEmptyList.of(Waiting, Scheduled, Running, Stuck) val inProgress: Set[JobState] = Set(Scheduled, Running, Stuck) + def isDone(state: JobState): Boolean = + done.exists(_ == state) + def parse(str: String): Either[String, JobState] = str.toLowerCase match { case "waiting" => Right(Waiting) diff --git a/modules/joex/src/main/scala/docspell/joex/scheduler/JobTask.scala b/modules/joex/src/main/scala/docspell/joex/scheduler/JobTask.scala index 9a04ad79..fd291d0b 100644 --- a/modules/joex/src/main/scala/docspell/joex/scheduler/JobTask.scala +++ b/modules/joex/src/main/scala/docspell/joex/scheduler/JobTask.scala @@ -12,7 +12,7 @@ import cats.implicits._ import docspell.common.Ident import docspell.common.syntax.all._ -import io.circe.Decoder +import io.circe.{Decoder, Encoder, Json} /** Binds a Task to a name. This is required to lookup the code based on the taskName in * the RJob data and to execute it given the arguments that have to be read from a @@ -24,18 +24,19 @@ import io.circe.Decoder */ case class JobTask[F[_]]( name: Ident, - task: Task[F, String, Unit], + task: Task[F, String, Json], onCancel: Task[F, String, Unit] ) object JobTask { - def json[F[_]: Sync, A]( + def json[F[_]: Sync, A, B]( name: Ident, - task: Task[F, A, Unit], + task: Task[F, A, B], onCancel: Task[F, A, Unit] )(implicit - D: Decoder[A] + D: Decoder[A], + E: Encoder[B] ): JobTask[F] = { val convert: String => F[A] = str => @@ -45,6 +46,6 @@ object JobTask { Sync[F].raiseError(new Exception(s"Cannot parse task arguments: $str", ex)) } - JobTask(name, task.contramap(convert), onCancel.contramap(convert)) + JobTask(name, task.contramap(convert).map(E.apply), onCancel.contramap(convert)) } } diff --git a/modules/joex/src/main/scala/docspell/joex/scheduler/SchedulerImpl.scala b/modules/joex/src/main/scala/docspell/joex/scheduler/SchedulerImpl.scala index 877364b6..d30c43eb 100644 --- a/modules/joex/src/main/scala/docspell/joex/scheduler/SchedulerImpl.scala +++ b/modules/joex/src/main/scala/docspell/joex/scheduler/SchedulerImpl.scala @@ -25,6 +25,7 @@ import docspell.store.queries.QJob import docspell.store.queue.JobQueue import docspell.store.records.RJob +import io.circe.Json import org.log4s.getLogger final class SchedulerImpl[F[_]: Async]( @@ -168,7 +169,7 @@ final class SchedulerImpl[F[_]: Async]( ctx <- Context[F, String](job, job.args, config, logSink, store) _ <- t.onCancel.run(ctx) _ <- state.modify(_.markCancelled(job)) - _ <- onFinish(job, JobState.Cancelled) + _ <- onFinish(job, Json.Null, JobState.Cancelled) _ <- ctx.logger.warn("Job has been cancelled.") _ <- logger.fdebug(s"Job ${job.info} has been cancelled.") } yield () @@ -197,27 +198,32 @@ final class SchedulerImpl[F[_]: Async]( } } - def onFinish(job: RJob, finalState: JobState): F[Unit] = + def onFinish(job: RJob, result: Json, finishState: JobState): F[Unit] = for { - _ <- logger.fdebug(s"Job ${job.info} done $finalState. Releasing resources.") + _ <- logger.fdebug(s"Job ${job.info} done $finishState. Releasing resources.") _ <- permits.release *> permits.available.flatMap(a => logger.fdebug(s"Permit released ($a free)") ) _ <- state.modify(_.removeRunning(job)) - _ <- QJob.setFinalState(job.id, finalState, store) - _ <- pubSub.publish1IgnoreErrors( - JobDone.topic, - JobDone(job.id, job.group, job.task, job.args, finalState) + _ <- QJob.setFinalState(job.id, finishState, store) + _ <- Sync[F].whenA(JobState.isDone(finishState))( + pubSub.publish1IgnoreErrors( + JobDone.topic, + JobDone(job.id, job.group, job.task, job.args, finishState) + ) ) - _ <- eventSink.offer( - Event.JobDone( - job.id, - job.group, - job.task, - job.args, - job.state, - job.subject, - job.submitter + _ <- Sync[F].whenA(JobState.isDone(finishState))( + eventSink.offer( + Event.JobDone( + job.id, + job.group, + job.task, + job.args, + job.state, + job.subject, + job.submitter, + result + ) ) ) } yield () @@ -231,45 +237,45 @@ final class SchedulerImpl[F[_]: Async]( def wrapTask( job: RJob, - task: Task[F, String, Unit], + task: Task[F, String, Json], ctx: Context[F, String] ): Task[F, String, Unit] = task .mapF(fa => onStart(job) *> logger.fdebug("Starting task now") *> fa) .mapF(_.attempt.flatMap { - case Right(()) => + case Right(result) => logger.info(s"Job execution successful: ${job.info}") ctx.logger.info("Job execution successful") *> - (JobState.Success: JobState).pure[F] + (JobState.Success: JobState, result).pure[F] case Left(ex) => state.get.map(_.wasCancelled(job)).flatMap { case true => logger.error(ex)(s"Job ${job.info} execution failed (cancel = true)") ctx.logger.error(ex)("Job execution failed (cancel = true)") *> - (JobState.Cancelled: JobState).pure[F] + (JobState.Cancelled: JobState, Json.Null).pure[F] case false => QJob.exceedsRetries(job.id, config.retries, store).flatMap { case true => logger.error(ex)(s"Job ${job.info} execution failed. Retries exceeded.") ctx.logger .error(ex)(s"Job ${job.info} execution failed. Retries exceeded.") - .map(_ => JobState.Failed: JobState) + .map(_ => (JobState.Failed: JobState, Json.Null)) case false => logger.error(ex)(s"Job ${job.info} execution failed. Retrying later.") ctx.logger .error(ex)(s"Job ${job.info} execution failed. Retrying later.") - .map(_ => JobState.Stuck: JobState) + .map(_ => (JobState.Stuck: JobState, Json.Null)) } } }) .mapF(_.attempt.flatMap { - case Right(jstate) => - onFinish(job, jstate) + case Right((jstate, result)) => + onFinish(job, result, jstate) case Left(ex) => logger.error(ex)(s"Error happened during post-processing of ${job.info}!") // we don't know the real outcome hereā€¦ // since tasks should be idempotent, set it to stuck. if above has failed, this might fail anyways - onFinish(job, JobState.Stuck) + onFinish(job, Json.Null, JobState.Stuck) }) def forkRun( @@ -291,7 +297,7 @@ final class SchedulerImpl[F[_]: Async]( () } *> state.modify(_.markCancelled(job)) *> - onFinish(job, JobState.Cancelled) *> + onFinish(job, Json.Null, JobState.Cancelled) *> ctx.logger.warn("Job has been cancelled.") *> logger.fdebug(s"Job ${job.info} has been cancelled.") ) diff --git a/modules/notification/api/src/main/scala/docspell/notification/api/Event.scala b/modules/notification/api/src/main/scala/docspell/notification/api/Event.scala index 18ff790a..3da27455 100644 --- a/modules/notification/api/src/main/scala/docspell/notification/api/Event.scala +++ b/modules/notification/api/src/main/scala/docspell/notification/api/Event.scala @@ -12,7 +12,7 @@ import cats.implicits._ import docspell.common._ -import io.circe.{Decoder, Encoder} +import io.circe.{Decoder, Encoder, Json} /** An event generated in the platform. */ sealed trait Event { @@ -203,7 +203,8 @@ object Event { args: String, state: JobState, subject: String, - submitter: Ident + submitter: Ident, + result: Json ) extends Event { val eventType = JobDone val baseUrl = None @@ -220,7 +221,8 @@ object Event { "", JobState.running, "Process 3 files", - account.user + account.user, + Json.Null ) } yield ev } diff --git a/modules/notification/impl/src/main/scala/docspell/notification/impl/context/JobDoneCtx.scala b/modules/notification/impl/src/main/scala/docspell/notification/impl/context/JobDoneCtx.scala index 2b5c9b24..8ef19754 100644 --- a/modules/notification/impl/src/main/scala/docspell/notification/impl/context/JobDoneCtx.scala +++ b/modules/notification/impl/src/main/scala/docspell/notification/impl/context/JobDoneCtx.scala @@ -13,8 +13,8 @@ import docspell.notification.api._ import docspell.notification.impl.AbstractEventContext import doobie._ -import io.circe.Encoder import io.circe.syntax._ +import io.circe.{Encoder, Json} import yamusca.implicits._ final case class JobDoneCtx(event: Event.JobDone, data: JobDoneCtx.Data) @@ -45,14 +45,23 @@ object JobDoneCtx { args: String, state: JobState, subject: String, - submitter: Ident + submitter: Ident, + result: Json ) object Data { implicit val jsonEncoder: Encoder[Data] = io.circe.generic.semiauto.deriveEncoder def apply(ev: Event.JobDone): Data = - Data(ev.jobId, ev.group, ev.task, ev.args, ev.state, ev.subject, ev.submitter) + Data( + ev.jobId, + ev.group, + ev.task, + ev.args, + ev.state, + ev.subject, + ev.submitter, + ev.result + ) } - }