Publish event when a waiting job is removed

Fixes: #1182
This commit is contained in:
eikek 2021-11-18 21:52:11 +01:00
parent 95ded62cb9
commit 4176b6bc64
2 changed files with 15 additions and 3 deletions

View File

@ -70,7 +70,7 @@ object BackendApp {
orgImpl <- OOrganization(store)
uploadImpl <- OUpload(store, queue, joexImpl)
nodeImpl <- ONode(store)
jobImpl <- OJob(store, joexImpl)
jobImpl <- OJob(store, joexImpl, pubSubT)
createIndex <- CreateIndex.resource(ftsClient, store)
itemImpl <- OItem(store, ftsClient, createIndex, queue, joexImpl)
itemSearchImpl <- OItemSearch(store)

View File

@ -10,8 +10,10 @@ import cats.data.OptionT
import cats.effect._
import cats.implicits._
import docspell.backend.msg.JobDone
import docspell.backend.ops.OJob.{CollectiveQueueState, JobCancelResult}
import docspell.common._
import docspell.pubsub.api.PubSubT
import docspell.store.Store
import docspell.store.UpdateResult
import docspell.store.queries.QJob
@ -53,7 +55,8 @@ object OJob {
def apply[F[_]: Sync](
store: Store[F],
joex: OJoex[F]
joex: OJoex[F],
pubsub: PubSubT[F]
): Resource[F, OJob[F]] =
Resource.pure[F, OJob[F]](new OJob[F] {
private[this] val logger = Logger.log4s(org.log4s.getLogger(OJob.getClass))
@ -73,7 +76,16 @@ object OJob {
def cancelJob(id: Ident, collective: Ident): F[JobCancelResult] = {
def remove(job: RJob): F[JobCancelResult] =
store.transact(RJob.delete(job.id)) *> JobCancelResult.removed.pure[F]
for {
n <- store.transact(RJob.delete(job.id))
_ <-
if (n <= 0) ().pure[F]
else
pubsub.publish1IgnoreErrors(
JobDone.topic,
JobDone(job.id, job.group, job.task, job.args, JobState.Cancelled)
)
} yield JobCancelResult.removed
def tryCancel(job: RJob): F[JobCancelResult] =
job.worker match {