Indicate number of running jobs in tob nav

Issue: #1069
This commit is contained in:
eikek 2021-11-12 21:56:48 +01:00
parent 9acdb8ef96
commit 7a8527f821
16 changed files with 201 additions and 50 deletions

View File

@ -10,12 +10,12 @@ import cats.effect._
import docspell.backend.auth.Login
import docspell.backend.fulltext.CreateIndex
import docspell.backend.msg.JobQueuePublish
import docspell.backend.ops._
import docspell.backend.signup.OSignup
import docspell.ftsclient.FtsClient
import docspell.pubsub.api.PubSubT
import docspell.store.Store
import docspell.store.queue.JobQueue
import docspell.store.usertask.UserTaskStore
import docspell.totp.Totp
@ -58,7 +58,7 @@ object BackendApp {
): Resource[F, BackendApp[F]] =
for {
utStore <- UserTaskStore(store)
queue <- JobQueue(store)
queue <- JobQueuePublish(store, pubSubT)
totpImpl <- OTotp(store, Totp.default)
loginImpl <- Login[F](store, Totp.default)
signupImpl <- OSignup[F](store)

View File

@ -0,0 +1,59 @@
/*
* Copyright 2020 Eike K. & Contributors
*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
package docspell.backend.msg
import cats.effect._
import cats.implicits._
import docspell.common.{Duration, Ident, Priority}
import docspell.pubsub.api.PubSubT
import docspell.store.Store
import docspell.store.queue.JobQueue
import docspell.store.records.RJob
final class JobQueuePublish[F[_]: Sync](delegate: JobQueue[F], pubsub: PubSubT[F])
extends JobQueue[F] {
private def msg(job: RJob): JobSubmitted =
JobSubmitted(job.id, job.group, job.task, job.args)
private def publish(job: RJob): F[Unit] =
pubsub.publish1(JobSubmitted.topic, msg(job)).as(())
def insert(job: RJob) =
delegate.insert(job).flatTap(_ => publish(job))
def insertIfNew(job: RJob) =
delegate.insertIfNew(job).flatTap {
case true => publish(job)
case false => ().pure[F]
}
def insertAll(jobs: Seq[RJob]) =
delegate.insertAll(jobs).flatTap { results =>
results.zip(jobs).traverse { case (res, job) =>
if (res) publish(job)
else ().pure[F]
}
}
def insertAllIfNew(jobs: Seq[RJob]) =
delegate.insertAllIfNew(jobs).flatTap { results =>
results.zip(jobs).traverse { case (res, job) =>
if (res) publish(job)
else ().pure[F]
}
}
def nextJob(prio: Ident => F[Priority], worker: Ident, retryPause: Duration) =
delegate.nextJob(prio, worker, retryPause)
}
object JobQueuePublish {
def apply[F[_]: Async](store: Store[F], pubSub: PubSubT[F]): Resource[F, JobQueue[F]] =
JobQueue(store).map(q => new JobQueuePublish[F](q, pubSub))
}

View File

@ -0,0 +1,26 @@
/*
* Copyright 2020 Eike K. & Contributors
*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
package docspell.backend.msg
import docspell.common._
import docspell.pubsub.api.{Topic, TypedTopic}
import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder}
import io.circe.{Decoder, Encoder}
final case class JobSubmitted(jobId: Ident, group: Ident, task: Ident, args: String)
object JobSubmitted {
implicit val jsonDecoder: Decoder[JobSubmitted] =
deriveDecoder
implicit val jsonEncoder: Encoder[JobSubmitted] =
deriveEncoder
val topic: TypedTopic[JobSubmitted] =
TypedTopic(Topic("job-submitted"))
}

View File

@ -19,5 +19,5 @@ object Topics {
/** A list of all topics. It is required to list every topic in use here! */
val all: NonEmptyList[TypedTopic[_]] =
NonEmptyList.of(JobDone.topic, CancelJob.topic, jobsNotify)
NonEmptyList.of(JobDone.topic, CancelJob.topic, jobsNotify, JobSubmitted.topic)
}

View File

@ -24,6 +24,8 @@ trait OJob[F[_]] {
def cancelJob(id: Ident, collective: Ident): F[JobCancelResult]
def setPriority(id: Ident, collective: Ident, prio: Priority): F[UpdateResult]
def getUnfinishedJobCount(collective: Ident): F[Int]
}
object OJob {
@ -93,5 +95,8 @@ object OJob {
} yield result)
.getOrElse(JobCancelResult.jobNotFound)
}
def getUnfinishedJobCount(collective: Ident): F[Int] =
store.transact(RJob.getUnfinishedCount(collective))
})
}

View File

@ -12,7 +12,7 @@ import fs2.concurrent.SignallingRef
import docspell.analysis.TextAnalyser
import docspell.backend.fulltext.CreateIndex
import docspell.backend.msg.{CancelJob, Topics}
import docspell.backend.msg.{CancelJob, JobQueuePublish, Topics}
import docspell.backend.ops._
import docspell.common._
import docspell.ftsclient.FtsClient
@ -126,13 +126,13 @@ object JoexAppImpl {
pubSub: PubSub[F]
): Resource[F, JoexApp[F]] =
for {
queue <- JobQueue(store)
pstore <- PeriodicTaskStore.create(store)
client = JoexClient(httpClient)
pubSubT = PubSubT(
pubSub,
Logger.log4s(org.log4s.getLogger(s"joex-${cfg.appId.id}"))
)
queue <- JobQueuePublish(store, pubSubT)
joex <- OJoex(pubSubT)
upload <- OUpload(store, queue, joex)
fts <- createFtsClient(cfg)(httpClient)

View File

@ -138,7 +138,7 @@ object RestServer {
token: AuthToken
): HttpRoutes[F] =
Router(
"ws" -> WebSocketRoutes(token, topic, wsB),
"ws" -> WebSocketRoutes(token, restApp.backend, topic, wsB),
"auth" -> LoginRoutes.session(restApp.backend.login, cfg, token),
"tag" -> TagRoutes(restApp.backend, token),
"equipment" -> EquipmentRoutes(restApp.backend, token),

View File

@ -6,11 +6,11 @@
package docspell.restserver
import cats.effect.Async
import fs2.Stream
import fs2.concurrent.Topic
import docspell.backend.msg.JobDone
import docspell.common.ProcessItemArgs
import docspell.backend.msg.{JobDone, JobSubmitted}
import docspell.pubsub.api.PubSubT
import docspell.restserver.ws.OutputEvent
@ -18,15 +18,20 @@ import docspell.restserver.ws.OutputEvent
*/
object Subscriptions {
def apply[F[_]](
def apply[F[_]: Async](
wsTopic: Topic[F, OutputEvent],
pubSub: PubSubT[F]
): Stream[F, Nothing] =
jobDone(pubSub).through(wsTopic.publish)
jobDone(pubSub).merge(jobSubmitted(pubSub)).through(wsTopic.publish)
def jobDone[F[_]](pubSub: PubSubT[F]): Stream[F, OutputEvent] =
pubSub
.subscribe(JobDone.topic)
.filter(m => m.body.task == ProcessItemArgs.taskName)
.map(m => OutputEvent.ItemProcessed(m.body.group))
.map(m => OutputEvent.JobDone(m.body.group, m.body.task))
def jobSubmitted[F[_]](pubSub: PubSubT[F]): Stream[F, OutputEvent] =
pubSub
.subscribe(JobSubmitted.topic)
.map(m => OutputEvent.JobSubmitted(m.body.group, m.body.task))
}

View File

@ -31,18 +31,26 @@ object OutputEvent {
Msg("keep-alive", ()).asJson
}
final case class ItemProcessed(collective: Ident) extends OutputEvent {
def forCollective(token: AuthToken): Boolean =
token.account.collective == collective
def asJson: Json =
Msg("item-processed", ()).asJson
}
final case class JobsWaiting(group: Ident, count: Int) extends OutputEvent {
final case class JobSubmitted(group: Ident, task: Ident) extends OutputEvent {
def forCollective(token: AuthToken): Boolean =
token.account.collective == group
def asJson: Json =
Msg("job-submitted", task).asJson
}
final case class JobDone(group: Ident, task: Ident) extends OutputEvent {
def forCollective(token: AuthToken): Boolean =
token.account.collective == group
def asJson: Json =
Msg("job-done", task).asJson
}
final case class JobsWaiting(collective: Ident, count: Int) extends OutputEvent {
def forCollective(token: AuthToken): Boolean =
token.account.collective == collective
def asJson: Json =
Msg("jobs-waiting", count).asJson
}

View File

@ -7,9 +7,11 @@
package docspell.restserver.ws
import cats.effect.Async
import cats.implicits._
import fs2.concurrent.Topic
import fs2.{Pipe, Stream}
import docspell.backend.BackendApp
import docspell.backend.auth.AuthToken
import org.http4s.HttpRoutes
@ -22,6 +24,7 @@ object WebSocketRoutes {
def apply[F[_]: Async](
user: AuthToken,
backend: BackendApp[F],
topic: Topic[F, OutputEvent],
wsb: WebSocketBuilder2[F]
): HttpRoutes[F] = {
@ -29,11 +32,18 @@ object WebSocketRoutes {
import dsl._
HttpRoutes.of { case GET -> Root =>
val init =
for {
jc <- backend.job.getUnfinishedJobCount(user.account.collective)
msg = OutputEvent.JobsWaiting(user.account.collective, jc)
} yield Text(msg.encode)
val toClient: Stream[F, WebSocketFrame.Text] =
topic
.subscribe(500)
.filter(_.forCollective(user))
.map(msg => Text(msg.encode))
Stream.eval(init) ++
topic
.subscribe(500)
.filter(_.forCollective(user))
.map(msg => Text(msg.encode))
val toServer: Pipe[F, WebSocketFrame, Unit] =
_.map(_ => ())

View File

@ -30,9 +30,9 @@ trait JobQueue[F[_]] {
*/
def insertIfNew(job: RJob): F[Boolean]
def insertAll(jobs: Seq[RJob]): F[Int]
def insertAll(jobs: Seq[RJob]): F[List[Boolean]]
def insertAllIfNew(jobs: Seq[RJob]): F[Int]
def insertAllIfNew(jobs: Seq[RJob]): F[List[Boolean]]
def nextJob(
prio: Ident => F[Priority],
@ -77,26 +77,24 @@ object JobQueue {
else insert(job).as(true)
} yield ret
def insertAll(jobs: Seq[RJob]): F[Int] =
def insertAll(jobs: Seq[RJob]): F[List[Boolean]] =
jobs.toList
.traverse(j => insert(j).attempt)
.flatMap(_.traverse {
case Right(()) => 1.pure[F]
case Right(()) => true.pure[F]
case Left(ex) =>
logger.error(ex)("Could not insert job. Skipping it.").as(0)
logger.error(ex)("Could not insert job. Skipping it.").as(false)
})
.map(_.sum)
def insertAllIfNew(jobs: Seq[RJob]): F[Int] =
def insertAllIfNew(jobs: Seq[RJob]): F[List[Boolean]] =
jobs.toList
.traverse(j => insertIfNew(j).attempt)
.flatMap(_.traverse {
case Right(true) => 1.pure[F]
case Right(false) => 0.pure[F]
case Right(true) => true.pure[F]
case Right(false) => false.pure[F]
case Left(ex) =>
logger.error(ex)("Could not insert job. Skipping it.").as(0)
logger.error(ex)("Could not insert job. Skipping it.").as(false)
})
.map(_.sum)
})
}

View File

@ -300,4 +300,10 @@ object RJob {
where(T.tracker === trackerId, T.state.in(JobState.notDone))
).query[RJob].option
def getUnfinishedCount(group: Ident): ConnectionIO[Int] =
run(
select(count(T.id)),
from(T),
T.group === group && T.state.in(JobState.notDone)
).query[Int].unique
}

View File

@ -66,6 +66,7 @@ type alias Model =
, anonymousUiLang : UiLanguage
, langMenuOpen : Bool
, showNewItemsArrived : Bool
, jobsWaiting : Int
}
@ -129,6 +130,7 @@ init key url flags_ settings =
, anonymousUiLang = Messages.UiLanguage.English
, langMenuOpen = False
, showNewItemsArrived = False
, jobsWaiting = 0
}
, Cmd.batch
[ Cmd.map UserSettingsMsg uc

View File

@ -311,20 +311,28 @@ updateWithSub msg model =
ReceiveWsMessage data ->
case data of
Ok ItemProcessed ->
Ok (JobDone task) ->
let
newModel =
{ model | showNewItemsArrived = True }
in
case model.page of
HomePage ->
updateHome texts Page.Home.Data.RefreshView newModel
isProcessItem =
task == "process-item"
_ ->
( newModel, Cmd.none, Sub.none )
newModel =
{ model
| showNewItemsArrived = isProcessItem
, jobsWaiting = max 0 (model.jobsWaiting - 1)
}
in
if model.page == HomePage && isProcessItem then
updateHome texts Page.Home.Data.RefreshView newModel
else
( newModel, Cmd.none, Sub.none )
Ok (JobSubmitted _) ->
( { model | jobsWaiting = model.jobsWaiting + 1 }, Cmd.none, Sub.none )
Ok (JobsWaiting n) ->
( model, Cmd.none, Sub.none )
( { model | jobsWaiting = max 0 n }, Cmd.none, Sub.none )
Err err ->
( model, Cmd.none, Sub.none )

View File

@ -259,10 +259,21 @@ dataMenu texts _ model =
div [ class "relative" ]
[ a
[ class dropdownLink
, class "inline-block relative"
, onClick ToggleNavMenu
, href "#"
]
[ i [ class "fa fa-cogs" ] []
, div
[ class "h-5 w-5 rounded-full text-xs px-1 py-1 absolute top-1 left-1 font-bold"
, class "dark:bg-lightblue-500 dark:border-gray-50 dark:text-gray-800"
, class "bg-blue-500 text-gray-50"
, classList [ ( "hidden", model.jobsWaiting <= 0 ) ]
]
[ div [ class "-mt-0.5 ml-0.5" ]
[ text (String.fromInt model.jobsWaiting)
]
]
]
, div
[ class dropdownMenu
@ -301,7 +312,14 @@ dataMenu texts _ model =
, dataPageLink model
QueuePage
[]
[ i [ class "fa fa-tachometer-alt w-6" ] []
[ i
[ if model.jobsWaiting <= 0 then
class "fa fa-tachometer-alt w-6"
else
class "fa fa-circle dark:text-lightblue-500 text-blue-500"
]
[]
, span [ class "ml-1" ]
[ text texts.processingQueue
]

View File

@ -11,7 +11,8 @@ import Json.Decode as D
type ServerEvent
= ItemProcessed
= JobSubmitted String
| JobDone String
| JobsWaiting Int
@ -30,8 +31,13 @@ decode json =
decodeTag : String -> D.Decoder ServerEvent
decodeTag tag =
case tag of
"item-processed" ->
D.succeed ItemProcessed
"job-done" ->
D.field "content" D.string
|> D.map JobDone
"job-submitted" ->
D.field "content" D.string
|> D.map JobSubmitted
"jobs-waiting" ->
D.field "content" D.int