Use job group instead of parsing the task arguments

This commit is contained in:
eikek 2021-11-07 22:39:44 +01:00
parent a09f75e45e
commit 61c01ad79b
3 changed files with 10 additions and 8 deletions

View File

@ -13,7 +13,13 @@ import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder}
import io.circe.{Decoder, Encoder}
/** Message to notify about finished jobs. They have a final state. */
final case class JobDone(jobId: Ident, task: Ident, args: String, state: JobState)
final case class JobDone(
jobId: Ident,
group: Ident,
task: Ident,
args: String,
state: JobState
)
object JobDone {
implicit val jsonDecoder: Decoder[JobDone] =
deriveDecoder[JobDone]

View File

@ -204,7 +204,7 @@ final class SchedulerImpl[F[_]: Async](
_ <- QJob.setFinalState(job.id, finalState, store)
_ <- pubSub.publish1IgnoreErrors(
JobDone.topic,
JobDone(job.id, job.task, job.args, finalState)
JobDone(job.id, job.group, job.task, job.args, finalState)
)
} yield ()

View File

@ -8,10 +8,8 @@ package docspell.restserver
import fs2.Stream
import fs2.concurrent.Topic
import docspell.backend.msg.JobDone
import docspell.common._
import docspell.common.syntax.StringSyntax._
import docspell.common.ProcessItemArgs
import docspell.pubsub.api.PubSubT
import docspell.restserver.ws.OutputEvent
@ -29,7 +27,5 @@ object Subscriptions {
pubSub
.subscribe(JobDone.topic)
.filter(m => m.body.task == ProcessItemArgs.taskName)
.map(m => m.body.args.parseJsonAs[ProcessItemArgs])
.collect { case Right(a) => OutputEvent.ItemProcessed(a.meta.collective) }
.map(m => OutputEvent.ItemProcessed(m.body.group))
}