From 69674eb485c00bced53e334570287bf17eac2499 Mon Sep 17 00:00:00 2001 From: Eike Kettner Date: Thu, 13 Aug 2020 01:01:02 +0200 Subject: [PATCH] Improve job-queue query to make sure jobs across all states show up --- .../scala/docspell/backend/ops/OJob.scala | 4 ++- .../restserver/routes/JobQueueRoutes.scala | 2 +- .../scala/docspell/store/queries/QJob.scala | 33 ++++++++++++++----- 3 files changed, 29 insertions(+), 10 deletions(-) diff --git a/modules/backend/src/main/scala/docspell/backend/ops/OJob.scala b/modules/backend/src/main/scala/docspell/backend/ops/OJob.scala index 9a05337c..ade2fda0 100644 --- a/modules/backend/src/main/scala/docspell/backend/ops/OJob.scala +++ b/modules/backend/src/main/scala/docspell/backend/ops/OJob.scala @@ -48,7 +48,9 @@ object OJob { def queueState(collective: Ident, maxResults: Int): F[CollectiveQueueState] = store - .transact(QJob.queueStateSnapshot(collective).take(maxResults.toLong)) + .transact( + QJob.queueStateSnapshot(collective, maxResults.toLong) + ) .map(t => JobDetail(t._1, t._2)) .compile .toVector diff --git a/modules/restserver/src/main/scala/docspell/restserver/routes/JobQueueRoutes.scala b/modules/restserver/src/main/scala/docspell/restserver/routes/JobQueueRoutes.scala index fc605f74..4a34b219 100644 --- a/modules/restserver/src/main/scala/docspell/restserver/routes/JobQueueRoutes.scala +++ b/modules/restserver/src/main/scala/docspell/restserver/routes/JobQueueRoutes.scala @@ -21,7 +21,7 @@ object JobQueueRoutes { HttpRoutes.of { case GET -> Root / "state" => for { - js <- backend.job.queueState(user.account.collective, 200) + js <- backend.job.queueState(user.account.collective, 40) res = Conversions.mkJobQueueState(js) resp <- Ok(res) } yield resp diff --git a/modules/store/src/main/scala/docspell/store/queries/QJob.scala b/modules/store/src/main/scala/docspell/store/queries/QJob.scala index 99f94b67..f3521ed9 100644 --- a/modules/store/src/main/scala/docspell/store/queries/QJob.scala +++ b/modules/store/src/main/scala/docspell/store/queries/QJob.scala @@ -209,7 +209,8 @@ object QJob { store.transact(RJob.findFromIds(ids)) def queueStateSnapshot( - collective: Ident + collective: Ident, + max: Long ): Stream[ConnectionIO, (RJob, Vector[RJobLog])] = { val JC = RJob.Columns val waiting: Set[JobState] = Set(JobState.Waiting, JobState.Stuck, JobState.Scheduled) @@ -218,18 +219,34 @@ object QJob { def selectJobs(now: Timestamp): Stream[ConnectionIO, RJob] = { val refDate = now.minusHours(24) - val sql = selectSimple( + + val runningJobs = (selectSimple( + JC.all, + RJob.table, + and(JC.group.is(collective), JC.state.isOneOf(running.toSeq)) + ) ++ orderBy(JC.submitted.desc)).query[RJob].stream + + val waitingJobs = (selectSimple( JC.all, RJob.table, and( JC.group.is(collective), - or( - and(JC.state.isOneOf(done.toSeq), JC.submitted.isGt(refDate)), - JC.state.isOneOf((running ++ waiting).toSeq) - ) + JC.state.isOneOf(waiting.toSeq), + JC.submitted.isGt(refDate) ) - ) - (sql ++ orderBy(JC.submitted.desc)).query[RJob].stream + ) ++ orderBy(JC.submitted.desc)).query[RJob].stream.take(max) + + val doneJobs = (selectSimple( + JC.all, + RJob.table, + and( + JC.group.is(collective), + JC.state.isOneOf(done.toSeq), + JC.submitted.isGt(refDate) + ) + ) ++ orderBy(JC.submitted.desc)).query[RJob].stream.take(max) + + runningJobs ++ waitingJobs ++ doneJobs } def selectLogs(job: RJob): ConnectionIO[Vector[RJobLog]] =