Improve job-queue query to make sure jobs across all states show up

This commit is contained in:
Eike Kettner 2020-08-13 01:01:02 +02:00
parent 41ea071555
commit 69674eb485
3 changed files with 29 additions and 10 deletions

View File

@ -48,7 +48,9 @@ object OJob {
def queueState(collective: Ident, maxResults: Int): F[CollectiveQueueState] = def queueState(collective: Ident, maxResults: Int): F[CollectiveQueueState] =
store store
.transact(QJob.queueStateSnapshot(collective).take(maxResults.toLong)) .transact(
QJob.queueStateSnapshot(collective, maxResults.toLong)
)
.map(t => JobDetail(t._1, t._2)) .map(t => JobDetail(t._1, t._2))
.compile .compile
.toVector .toVector

View File

@ -21,7 +21,7 @@ object JobQueueRoutes {
HttpRoutes.of { HttpRoutes.of {
case GET -> Root / "state" => case GET -> Root / "state" =>
for { for {
js <- backend.job.queueState(user.account.collective, 200) js <- backend.job.queueState(user.account.collective, 40)
res = Conversions.mkJobQueueState(js) res = Conversions.mkJobQueueState(js)
resp <- Ok(res) resp <- Ok(res)
} yield resp } yield resp

View File

@ -209,7 +209,8 @@ object QJob {
store.transact(RJob.findFromIds(ids)) store.transact(RJob.findFromIds(ids))
def queueStateSnapshot( def queueStateSnapshot(
collective: Ident collective: Ident,
max: Long
): Stream[ConnectionIO, (RJob, Vector[RJobLog])] = { ): Stream[ConnectionIO, (RJob, Vector[RJobLog])] = {
val JC = RJob.Columns val JC = RJob.Columns
val waiting: Set[JobState] = Set(JobState.Waiting, JobState.Stuck, JobState.Scheduled) val waiting: Set[JobState] = Set(JobState.Waiting, JobState.Stuck, JobState.Scheduled)
@ -218,18 +219,34 @@ object QJob {
def selectJobs(now: Timestamp): Stream[ConnectionIO, RJob] = { def selectJobs(now: Timestamp): Stream[ConnectionIO, RJob] = {
val refDate = now.minusHours(24) val refDate = now.minusHours(24)
val sql = selectSimple(
val runningJobs = (selectSimple(
JC.all,
RJob.table,
and(JC.group.is(collective), JC.state.isOneOf(running.toSeq))
) ++ orderBy(JC.submitted.desc)).query[RJob].stream
val waitingJobs = (selectSimple(
JC.all, JC.all,
RJob.table, RJob.table,
and( and(
JC.group.is(collective), JC.group.is(collective),
or( JC.state.isOneOf(waiting.toSeq),
and(JC.state.isOneOf(done.toSeq), JC.submitted.isGt(refDate)), JC.submitted.isGt(refDate)
JC.state.isOneOf((running ++ waiting).toSeq)
) )
) ++ orderBy(JC.submitted.desc)).query[RJob].stream.take(max)
val doneJobs = (selectSimple(
JC.all,
RJob.table,
and(
JC.group.is(collective),
JC.state.isOneOf(done.toSeq),
JC.submitted.isGt(refDate)
) )
) ) ++ orderBy(JC.submitted.desc)).query[RJob].stream.take(max)
(sql ++ orderBy(JC.submitted.desc)).query[RJob].stream
runningJobs ++ waitingJobs ++ doneJobs
} }
def selectLogs(job: RJob): ConnectionIO[Vector[RJobLog]] = def selectLogs(job: RJob): ConnectionIO[Vector[RJobLog]] =