Allow background tasks to return results that are included in the event

eikek 2022-02-09 23:38:32 +01:00
parent b76fa52d67
commit d6829ea69b
5 changed files with 60 additions and 39 deletions
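
In short: the scheduler's task type changes from Task[F, String, Unit] to Task[F, String, Json], JobTask.json gains a result type parameter B with a circe Encoder, SchedulerImpl threads the returned Json through onFinish, and the result is attached to the Event.JobDone payload (and to the JobDoneCtx template data). Failure and cancellation paths fall back to Json.Null, and the JobDone notifications are now only published once a job reaches a terminal state (see the new JobState.isDone).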

View File

@@ -54,6 +54,9 @@ object JobState {
NonEmptyList.of(Waiting, Scheduled, Running, Stuck)
val inProgress: Set[JobState] = Set(Scheduled, Running, Stuck)
def isDone(state: JobState): Boolean =
done.exists(_ == state)
def parse(str: String): Either[String, JobState] =
str.toLowerCase match {
case "waiting" => Right(Waiting)

View File

@@ -12,7 +12,7 @@ import cats.implicits._
import docspell.common.Ident
import docspell.common.syntax.all._
import io.circe.Decoder
import io.circe.{Decoder, Encoder, Json}
/** Binds a Task to a name. This is required to lookup the code based on the taskName in
* the RJob data and to execute it given the arguments that have to be read from a
@@ -24,18 +24,19 @@ import io.circe.Decoder
*/
case class JobTask[F[_]](
name: Ident,
task: Task[F, String, Unit],
task: Task[F, String, Json],
onCancel: Task[F, String, Unit]
)
object JobTask {
def json[F[_]: Sync, A](
def json[F[_]: Sync, A, B](
name: Ident,
task: Task[F, A, Unit],
task: Task[F, A, B],
onCancel: Task[F, A, Unit]
)(implicit
D: Decoder[A]
D: Decoder[A],
E: Encoder[B]
): JobTask[F] = {
val convert: String => F[A] =
str =>
@@ -45,6 +46,6 @@ object JobTask {
Sync[F].raiseError(new Exception(s"Cannot parse task arguments: $str", ex))
}
JobTask(name, task.contramap(convert), onCancel.contramap(convert))
JobTask(name, task.contramap(convert).map(E.apply), onCancel.contramap(convert))
}
}
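
With the extra type parameter B and its Encoder, a task can return a typed result that JobTask.json encodes to Json before it is stored with the finished job. A hedged sketch of a registration, where CountArgs, CountResult and the task bodies are made-up placeholders:

import cats.effect.Sync
import io.circe.{Decoder, Encoder}
import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder}

// Hypothetical argument and result types for some task.
final case class CountArgs(collective: String)
object CountArgs { implicit val dec: Decoder[CountArgs] = deriveDecoder }

final case class CountResult(files: Int)
object CountResult { implicit val enc: Encoder[CountResult] = deriveEncoder }

// Placeholder task implementations.
def countTask[F[_]: Sync]: Task[F, CountArgs, CountResult] = ???
def cancelCount[F[_]: Sync]: Task[F, CountArgs, Unit] = ???

// JobTask.json now accepts Task[F, A, B] and maps the result through Encoder[B].
def countJobTask[F[_]: Sync](name: Ident): JobTask[F] =
  JobTask.json(name, countTask[F], cancelCount[F])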

View File

@@ -25,6 +25,7 @@ import docspell.store.queries.QJob
import docspell.store.queue.JobQueue
import docspell.store.records.RJob
import io.circe.Json
import org.log4s.getLogger
final class SchedulerImpl[F[_]: Async](
@@ -168,7 +169,7 @@ final class SchedulerImpl[F[_]: Async](
ctx <- Context[F, String](job, job.args, config, logSink, store)
_ <- t.onCancel.run(ctx)
_ <- state.modify(_.markCancelled(job))
_ <- onFinish(job, JobState.Cancelled)
_ <- onFinish(job, Json.Null, JobState.Cancelled)
_ <- ctx.logger.warn("Job has been cancelled.")
_ <- logger.fdebug(s"Job ${job.info} has been cancelled.")
} yield ()
@@ -197,27 +198,32 @@
}
}
def onFinish(job: RJob, finalState: JobState): F[Unit] =
def onFinish(job: RJob, result: Json, finishState: JobState): F[Unit] =
for {
_ <- logger.fdebug(s"Job ${job.info} done $finalState. Releasing resources.")
_ <- logger.fdebug(s"Job ${job.info} done $finishState. Releasing resources.")
_ <- permits.release *> permits.available.flatMap(a =>
logger.fdebug(s"Permit released ($a free)")
)
_ <- state.modify(_.removeRunning(job))
_ <- QJob.setFinalState(job.id, finalState, store)
_ <- pubSub.publish1IgnoreErrors(
JobDone.topic,
JobDone(job.id, job.group, job.task, job.args, finalState)
_ <- QJob.setFinalState(job.id, finishState, store)
_ <- Sync[F].whenA(JobState.isDone(finishState))(
pubSub.publish1IgnoreErrors(
JobDone.topic,
JobDone(job.id, job.group, job.task, job.args, finishState)
)
)
_ <- eventSink.offer(
Event.JobDone(
job.id,
job.group,
job.task,
job.args,
job.state,
job.subject,
job.submitter
_ <- Sync[F].whenA(JobState.isDone(finishState))(
eventSink.offer(
Event.JobDone(
job.id,
job.group,
job.task,
job.args,
job.state,
job.subject,
job.submitter,
result
)
)
)
} yield ()
@@ -231,45 +237,45 @@ final class SchedulerImpl[F[_]: Async](
def wrapTask(
job: RJob,
task: Task[F, String, Unit],
task: Task[F, String, Json],
ctx: Context[F, String]
): Task[F, String, Unit] =
task
.mapF(fa => onStart(job) *> logger.fdebug("Starting task now") *> fa)
.mapF(_.attempt.flatMap {
case Right(()) =>
case Right(result) =>
logger.info(s"Job execution successful: ${job.info}")
ctx.logger.info("Job execution successful") *>
(JobState.Success: JobState).pure[F]
(JobState.Success: JobState, result).pure[F]
case Left(ex) =>
state.get.map(_.wasCancelled(job)).flatMap {
case true =>
logger.error(ex)(s"Job ${job.info} execution failed (cancel = true)")
ctx.logger.error(ex)("Job execution failed (cancel = true)") *>
(JobState.Cancelled: JobState).pure[F]
(JobState.Cancelled: JobState, Json.Null).pure[F]
case false =>
QJob.exceedsRetries(job.id, config.retries, store).flatMap {
case true =>
logger.error(ex)(s"Job ${job.info} execution failed. Retries exceeded.")
ctx.logger
.error(ex)(s"Job ${job.info} execution failed. Retries exceeded.")
.map(_ => JobState.Failed: JobState)
.map(_ => (JobState.Failed: JobState, Json.Null))
case false =>
logger.error(ex)(s"Job ${job.info} execution failed. Retrying later.")
ctx.logger
.error(ex)(s"Job ${job.info} execution failed. Retrying later.")
.map(_ => JobState.Stuck: JobState)
.map(_ => (JobState.Stuck: JobState, Json.Null))
}
}
})
.mapF(_.attempt.flatMap {
case Right(jstate) =>
onFinish(job, jstate)
case Right((jstate, result)) =>
onFinish(job, result, jstate)
case Left(ex) =>
logger.error(ex)(s"Error happened during post-processing of ${job.info}!")
// we don't know the real outcome here
// since tasks should be idempotent, set it to stuck. if above has failed, this might fail anyways
onFinish(job, JobState.Stuck)
onFinish(job, Json.Null, JobState.Stuck)
})
def forkRun(
@@ -291,7 +297,7 @@ final class SchedulerImpl[F[_]: Async](
()
} *>
state.modify(_.markCancelled(job)) *>
onFinish(job, JobState.Cancelled) *>
onFinish(job, Json.Null, JobState.Cancelled) *>
ctx.logger.warn("Job has been cancelled.") *>
logger.fdebug(s"Job ${job.info} has been cancelled.")
)
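
wrapTask now folds the task outcome into a pair of final state and Json result; error and cancellation branches fall back to Json.Null so that onFinish always receives a result. A reduced sketch of that fold, ignoring cancellation and retry handling:

import cats.effect.Sync
import cats.syntax.all._
import io.circe.Json

// Simplified illustration of the success/failure fold used in wrapTask.
def outcome[F[_]: Sync](run: F[Json]): F[(JobState, Json)] =
  run.attempt.map {
    case Right(result) => (JobState.Success: JobState, result)
    case Left(_)       => (JobState.Failed: JobState, Json.Null)
  }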

View File

@@ -12,7 +12,7 @@ import cats.implicits._
import docspell.common._
import io.circe.{Decoder, Encoder}
import io.circe.{Decoder, Encoder, Json}
/** An event generated in the platform. */
sealed trait Event {
@@ -203,7 +203,8 @@ object Event {
args: String,
state: JobState,
subject: String,
submitter: Ident
submitter: Ident,
result: Json
) extends Event {
val eventType = JobDone
val baseUrl = None
@@ -220,7 +221,8 @@ object Event {
"",
JobState.running,
"Process 3 files",
account.user
account.user,
Json.Null
)
} yield ev
}
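
Consumers of Event.JobDone can read the task's result straight from the event. A small illustrative sketch (the describe function is made up; the field names come from the case class above):

// Illustrative: summarize a finished job using its attached result.
def describe(ev: Event.JobDone): String =
  if (ev.result.isNull) s"Job '${ev.subject}' finished without a result"
  else s"Job '${ev.subject}' finished with result: ${ev.result.noSpaces}"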

View File

@@ -13,8 +13,8 @@ import docspell.notification.api._
import docspell.notification.impl.AbstractEventContext
import doobie._
import io.circe.Encoder
import io.circe.syntax._
import io.circe.{Encoder, Json}
import yamusca.implicits._
final case class JobDoneCtx(event: Event.JobDone, data: JobDoneCtx.Data)
@@ -45,14 +45,23 @@ object JobDoneCtx {
args: String,
state: JobState,
subject: String,
submitter: Ident
submitter: Ident,
result: Json
)
object Data {
implicit val jsonEncoder: Encoder[Data] =
io.circe.generic.semiauto.deriveEncoder
def apply(ev: Event.JobDone): Data =
Data(ev.jobId, ev.group, ev.task, ev.args, ev.state, ev.subject, ev.submitter)
Data(
ev.jobId,
ev.group,
ev.task,
ev.args,
ev.state,
ev.subject,
ev.submitter,
ev.result
)
}
}
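
Because Data's encoder is derived, the new result field flows into the rendered notification payload unchanged. A quick illustrative sketch (templateData is a made-up name):

import io.circe.Json
import io.circe.syntax._

// Illustrative: the derived encoder for Data emits the new "result" member
// alongside the other job fields.
def templateData(ev: Event.JobDone): Json =
  JobDoneCtx.Data(ev).asJson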