Fix notifications for collective scoped tasks

This commit is contained in:
eikek
2023-03-06 16:45:16 +01:00
parent f4615e690a
commit 3718442e29
4 changed files with 33 additions and 15 deletions

View File

@ -136,15 +136,14 @@ final class SchedulerImpl[F[_]: Async](
def mainLoop: Stream[F, Nothing] = {
val body: F[Boolean] =
for {
_ <- permits.available.flatMap(a =>
_ <- permits.available.flatTap(a =>
logger.debug(s"Try to acquire permit ($a free)")
)
_ <- permits.acquire
_ <- logger.debug("New permit acquired")
down <- state.get.map(_.shutdownRequest)
rjob <-
if (down)
logger.info("") *> permits.release *> (None: Option[RJob]).pure[F]
if (down) permits.release.as(Option.empty[RJob])
else
queue.nextJob(
group => state.modify(_.nextPrio(group, config.countingScheme)),
@ -220,7 +219,7 @@ final class SchedulerImpl[F[_]: Async](
}
}
def onFinish(job: RJob, result: JobTaskResult, finishState: JobState): F[Unit] =
private def onFinish(job: RJob, result: JobTaskResult, finishState: JobState): F[Unit] =
for {
_ <- logger.debug(s"Job ${job.info} done $finishState. Releasing resources.")
_ <- permits.release *> permits.available.flatMap(a =>
@ -229,13 +228,13 @@ final class SchedulerImpl[F[_]: Async](
_ <- state.modify(_.removeRunning(job))
_ <- QJob.setFinalState(job.id, finishState, store)
_ <- Sync[F].whenA(JobState.isDone(finishState))(
pubSub.publish1IgnoreErrors(
logger.trace("Publishing JobDone event") *> pubSub.publish1IgnoreErrors(
JobDone.topic,
JobDone(job.id, job.group, job.task, job.args, finishState, result.json)
)
)
_ <- Sync[F].whenA(JobState.isDone(finishState))(
makeJobDoneEvent(job, result)
logger.trace("Sending JobDone to event sink") *> makeJobDoneEvent(job, result)
.semiflatMap(eventSink.offer)
.value
)
@ -243,7 +242,9 @@ final class SchedulerImpl[F[_]: Async](
private def makeJobDoneEvent(job: RJob, result: JobTaskResult) =
for {
acc <- OptionT(findJobOwner(convertJob(job)))
acc <- OptionT(findJobOwner(convertJob(job))).flatTransform(acc =>
logger.debug(s"Found job owner $acc for job $job").as(acc)
)
ev = Event.JobDone(
acc,
job.id,