From 4176b6bc64cbc684e043f64fcfe424c91b829982 Mon Sep 17 00:00:00 2001 From: eikek Date: Thu, 18 Nov 2021 21:52:11 +0100 Subject: [PATCH] Publish event when a waiting job is removed Fixes: #1182 --- .../main/scala/docspell/backend/BackendApp.scala | 2 +- .../main/scala/docspell/backend/ops/OJob.scala | 16 ++++++++++++++-- 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/modules/backend/src/main/scala/docspell/backend/BackendApp.scala b/modules/backend/src/main/scala/docspell/backend/BackendApp.scala index 812bfccd..9444901a 100644 --- a/modules/backend/src/main/scala/docspell/backend/BackendApp.scala +++ b/modules/backend/src/main/scala/docspell/backend/BackendApp.scala @@ -70,7 +70,7 @@ object BackendApp { orgImpl <- OOrganization(store) uploadImpl <- OUpload(store, queue, joexImpl) nodeImpl <- ONode(store) - jobImpl <- OJob(store, joexImpl) + jobImpl <- OJob(store, joexImpl, pubSubT) createIndex <- CreateIndex.resource(ftsClient, store) itemImpl <- OItem(store, ftsClient, createIndex, queue, joexImpl) itemSearchImpl <- OItemSearch(store) diff --git a/modules/backend/src/main/scala/docspell/backend/ops/OJob.scala b/modules/backend/src/main/scala/docspell/backend/ops/OJob.scala index bcd70da4..0c3d92ae 100644 --- a/modules/backend/src/main/scala/docspell/backend/ops/OJob.scala +++ b/modules/backend/src/main/scala/docspell/backend/ops/OJob.scala @@ -10,8 +10,10 @@ import cats.data.OptionT import cats.effect._ import cats.implicits._ +import docspell.backend.msg.JobDone import docspell.backend.ops.OJob.{CollectiveQueueState, JobCancelResult} import docspell.common._ +import docspell.pubsub.api.PubSubT import docspell.store.Store import docspell.store.UpdateResult import docspell.store.queries.QJob @@ -53,7 +55,8 @@ object OJob { def apply[F[_]: Sync]( store: Store[F], - joex: OJoex[F] + joex: OJoex[F], + pubsub: PubSubT[F] ): Resource[F, OJob[F]] = Resource.pure[F, OJob[F]](new OJob[F] { private[this] val logger = Logger.log4s(org.log4s.getLogger(OJob.getClass)) @@ -73,7 +76,16 @@ object OJob { def cancelJob(id: Ident, collective: Ident): F[JobCancelResult] = { def remove(job: RJob): F[JobCancelResult] = - store.transact(RJob.delete(job.id)) *> JobCancelResult.removed.pure[F] + for { + n <- store.transact(RJob.delete(job.id)) + _ <- + if (n <= 0) ().pure[F] + else + pubsub.publish1IgnoreErrors( + JobDone.topic, + JobDone(job.id, job.group, job.task, job.args, JobState.Cancelled) + ) + } yield JobCancelResult.removed def tryCancel(job: RJob): F[JobCancelResult] = job.worker match {