From 7a8527f821102ef8d5eeee4385033c3e2c526713 Mon Sep 17 00:00:00 2001 From: eikek Date: Fri, 12 Nov 2021 21:56:48 +0100 Subject: [PATCH] Indicate number of running jobs in tob nav Issue: #1069 --- .../scala/docspell/backend/BackendApp.scala | 4 +- .../backend/msg/JobQueuePublish.scala | 59 +++++++++++++++++++ .../docspell/backend/msg/JobSubmitted.scala | 26 ++++++++ .../scala/docspell/backend/msg/Topics.scala | 2 +- .../scala/docspell/backend/ops/OJob.scala | 5 ++ .../scala/docspell/joex/JoexAppImpl.scala | 4 +- .../docspell/restserver/RestServer.scala | 2 +- .../docspell/restserver/Subscriptions.scala | 17 ++++-- .../docspell/restserver/ws/OutputEvent.scala | 26 +++++--- .../restserver/ws/WebSocketRoutes.scala | 18 ++++-- .../scala/docspell/store/queue/JobQueue.scala | 20 +++---- .../scala/docspell/store/records/RJob.scala | 6 ++ modules/webapp/src/main/elm/App/Data.elm | 2 + modules/webapp/src/main/elm/App/Update.elm | 28 +++++---- modules/webapp/src/main/elm/App/View2.elm | 20 ++++++- .../webapp/src/main/elm/Data/ServerEvent.elm | 12 +++- 16 files changed, 201 insertions(+), 50 deletions(-) create mode 100644 modules/backend/src/main/scala/docspell/backend/msg/JobQueuePublish.scala create mode 100644 modules/backend/src/main/scala/docspell/backend/msg/JobSubmitted.scala diff --git a/modules/backend/src/main/scala/docspell/backend/BackendApp.scala b/modules/backend/src/main/scala/docspell/backend/BackendApp.scala index cdc92fcc..812bfccd 100644 --- a/modules/backend/src/main/scala/docspell/backend/BackendApp.scala +++ b/modules/backend/src/main/scala/docspell/backend/BackendApp.scala @@ -10,12 +10,12 @@ import cats.effect._ import docspell.backend.auth.Login import docspell.backend.fulltext.CreateIndex +import docspell.backend.msg.JobQueuePublish import docspell.backend.ops._ import docspell.backend.signup.OSignup import docspell.ftsclient.FtsClient import docspell.pubsub.api.PubSubT import docspell.store.Store -import docspell.store.queue.JobQueue import docspell.store.usertask.UserTaskStore import docspell.totp.Totp @@ -58,7 +58,7 @@ object BackendApp { ): Resource[F, BackendApp[F]] = for { utStore <- UserTaskStore(store) - queue <- JobQueue(store) + queue <- JobQueuePublish(store, pubSubT) totpImpl <- OTotp(store, Totp.default) loginImpl <- Login[F](store, Totp.default) signupImpl <- OSignup[F](store) diff --git a/modules/backend/src/main/scala/docspell/backend/msg/JobQueuePublish.scala b/modules/backend/src/main/scala/docspell/backend/msg/JobQueuePublish.scala new file mode 100644 index 00000000..64b0b6d6 --- /dev/null +++ b/modules/backend/src/main/scala/docspell/backend/msg/JobQueuePublish.scala @@ -0,0 +1,59 @@ +/* + * Copyright 2020 Eike K. & Contributors + * + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +package docspell.backend.msg + +import cats.effect._ +import cats.implicits._ + +import docspell.common.{Duration, Ident, Priority} +import docspell.pubsub.api.PubSubT +import docspell.store.Store +import docspell.store.queue.JobQueue +import docspell.store.records.RJob + +final class JobQueuePublish[F[_]: Sync](delegate: JobQueue[F], pubsub: PubSubT[F]) + extends JobQueue[F] { + + private def msg(job: RJob): JobSubmitted = + JobSubmitted(job.id, job.group, job.task, job.args) + + private def publish(job: RJob): F[Unit] = + pubsub.publish1(JobSubmitted.topic, msg(job)).as(()) + + def insert(job: RJob) = + delegate.insert(job).flatTap(_ => publish(job)) + + def insertIfNew(job: RJob) = + delegate.insertIfNew(job).flatTap { + case true => publish(job) + case false => ().pure[F] + } + + def insertAll(jobs: Seq[RJob]) = + delegate.insertAll(jobs).flatTap { results => + results.zip(jobs).traverse { case (res, job) => + if (res) publish(job) + else ().pure[F] + } + } + + def insertAllIfNew(jobs: Seq[RJob]) = + delegate.insertAllIfNew(jobs).flatTap { results => + results.zip(jobs).traverse { case (res, job) => + if (res) publish(job) + else ().pure[F] + } + } + + def nextJob(prio: Ident => F[Priority], worker: Ident, retryPause: Duration) = + delegate.nextJob(prio, worker, retryPause) +} + +object JobQueuePublish { + def apply[F[_]: Async](store: Store[F], pubSub: PubSubT[F]): Resource[F, JobQueue[F]] = + JobQueue(store).map(q => new JobQueuePublish[F](q, pubSub)) +} diff --git a/modules/backend/src/main/scala/docspell/backend/msg/JobSubmitted.scala b/modules/backend/src/main/scala/docspell/backend/msg/JobSubmitted.scala new file mode 100644 index 00000000..2cecdde8 --- /dev/null +++ b/modules/backend/src/main/scala/docspell/backend/msg/JobSubmitted.scala @@ -0,0 +1,26 @@ +/* + * Copyright 2020 Eike K. & Contributors + * + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +package docspell.backend.msg +import docspell.common._ +import docspell.pubsub.api.{Topic, TypedTopic} + +import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder} +import io.circe.{Decoder, Encoder} + +final case class JobSubmitted(jobId: Ident, group: Ident, task: Ident, args: String) + +object JobSubmitted { + + implicit val jsonDecoder: Decoder[JobSubmitted] = + deriveDecoder + + implicit val jsonEncoder: Encoder[JobSubmitted] = + deriveEncoder + + val topic: TypedTopic[JobSubmitted] = + TypedTopic(Topic("job-submitted")) +} diff --git a/modules/backend/src/main/scala/docspell/backend/msg/Topics.scala b/modules/backend/src/main/scala/docspell/backend/msg/Topics.scala index 26c594f7..ae53d9f6 100644 --- a/modules/backend/src/main/scala/docspell/backend/msg/Topics.scala +++ b/modules/backend/src/main/scala/docspell/backend/msg/Topics.scala @@ -19,5 +19,5 @@ object Topics { /** A list of all topics. It is required to list every topic in use here! */ val all: NonEmptyList[TypedTopic[_]] = - NonEmptyList.of(JobDone.topic, CancelJob.topic, jobsNotify) + NonEmptyList.of(JobDone.topic, CancelJob.topic, jobsNotify, JobSubmitted.topic) } diff --git a/modules/backend/src/main/scala/docspell/backend/ops/OJob.scala b/modules/backend/src/main/scala/docspell/backend/ops/OJob.scala index 7f7a36fd..bcd70da4 100644 --- a/modules/backend/src/main/scala/docspell/backend/ops/OJob.scala +++ b/modules/backend/src/main/scala/docspell/backend/ops/OJob.scala @@ -24,6 +24,8 @@ trait OJob[F[_]] { def cancelJob(id: Ident, collective: Ident): F[JobCancelResult] def setPriority(id: Ident, collective: Ident, prio: Priority): F[UpdateResult] + + def getUnfinishedJobCount(collective: Ident): F[Int] } object OJob { @@ -93,5 +95,8 @@ object OJob { } yield result) .getOrElse(JobCancelResult.jobNotFound) } + + def getUnfinishedJobCount(collective: Ident): F[Int] = + store.transact(RJob.getUnfinishedCount(collective)) }) } diff --git a/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala b/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala index 80cc3825..f761bfa8 100644 --- a/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala +++ b/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala @@ -12,7 +12,7 @@ import fs2.concurrent.SignallingRef import docspell.analysis.TextAnalyser import docspell.backend.fulltext.CreateIndex -import docspell.backend.msg.{CancelJob, Topics} +import docspell.backend.msg.{CancelJob, JobQueuePublish, Topics} import docspell.backend.ops._ import docspell.common._ import docspell.ftsclient.FtsClient @@ -126,13 +126,13 @@ object JoexAppImpl { pubSub: PubSub[F] ): Resource[F, JoexApp[F]] = for { - queue <- JobQueue(store) pstore <- PeriodicTaskStore.create(store) client = JoexClient(httpClient) pubSubT = PubSubT( pubSub, Logger.log4s(org.log4s.getLogger(s"joex-${cfg.appId.id}")) ) + queue <- JobQueuePublish(store, pubSubT) joex <- OJoex(pubSubT) upload <- OUpload(store, queue, joex) fts <- createFtsClient(cfg)(httpClient) diff --git a/modules/restserver/src/main/scala/docspell/restserver/RestServer.scala b/modules/restserver/src/main/scala/docspell/restserver/RestServer.scala index 1122231d..663cc962 100644 --- a/modules/restserver/src/main/scala/docspell/restserver/RestServer.scala +++ b/modules/restserver/src/main/scala/docspell/restserver/RestServer.scala @@ -138,7 +138,7 @@ object RestServer { token: AuthToken ): HttpRoutes[F] = Router( - "ws" -> WebSocketRoutes(token, topic, wsB), + "ws" -> WebSocketRoutes(token, restApp.backend, topic, wsB), "auth" -> LoginRoutes.session(restApp.backend.login, cfg, token), "tag" -> TagRoutes(restApp.backend, token), "equipment" -> EquipmentRoutes(restApp.backend, token), diff --git a/modules/restserver/src/main/scala/docspell/restserver/Subscriptions.scala b/modules/restserver/src/main/scala/docspell/restserver/Subscriptions.scala index 3dc04aa5..faecac11 100644 --- a/modules/restserver/src/main/scala/docspell/restserver/Subscriptions.scala +++ b/modules/restserver/src/main/scala/docspell/restserver/Subscriptions.scala @@ -6,11 +6,11 @@ package docspell.restserver +import cats.effect.Async import fs2.Stream import fs2.concurrent.Topic -import docspell.backend.msg.JobDone -import docspell.common.ProcessItemArgs +import docspell.backend.msg.{JobDone, JobSubmitted} import docspell.pubsub.api.PubSubT import docspell.restserver.ws.OutputEvent @@ -18,15 +18,20 @@ import docspell.restserver.ws.OutputEvent */ object Subscriptions { - def apply[F[_]]( + def apply[F[_]: Async]( wsTopic: Topic[F, OutputEvent], pubSub: PubSubT[F] ): Stream[F, Nothing] = - jobDone(pubSub).through(wsTopic.publish) + jobDone(pubSub).merge(jobSubmitted(pubSub)).through(wsTopic.publish) def jobDone[F[_]](pubSub: PubSubT[F]): Stream[F, OutputEvent] = pubSub .subscribe(JobDone.topic) - .filter(m => m.body.task == ProcessItemArgs.taskName) - .map(m => OutputEvent.ItemProcessed(m.body.group)) + .map(m => OutputEvent.JobDone(m.body.group, m.body.task)) + + def jobSubmitted[F[_]](pubSub: PubSubT[F]): Stream[F, OutputEvent] = + pubSub + .subscribe(JobSubmitted.topic) + .map(m => OutputEvent.JobSubmitted(m.body.group, m.body.task)) + } diff --git a/modules/restserver/src/main/scala/docspell/restserver/ws/OutputEvent.scala b/modules/restserver/src/main/scala/docspell/restserver/ws/OutputEvent.scala index f7accaa8..d04424b2 100644 --- a/modules/restserver/src/main/scala/docspell/restserver/ws/OutputEvent.scala +++ b/modules/restserver/src/main/scala/docspell/restserver/ws/OutputEvent.scala @@ -31,18 +31,26 @@ object OutputEvent { Msg("keep-alive", ()).asJson } - final case class ItemProcessed(collective: Ident) extends OutputEvent { - def forCollective(token: AuthToken): Boolean = - token.account.collective == collective - - def asJson: Json = - Msg("item-processed", ()).asJson - } - - final case class JobsWaiting(group: Ident, count: Int) extends OutputEvent { + final case class JobSubmitted(group: Ident, task: Ident) extends OutputEvent { def forCollective(token: AuthToken): Boolean = token.account.collective == group + def asJson: Json = + Msg("job-submitted", task).asJson + } + + final case class JobDone(group: Ident, task: Ident) extends OutputEvent { + def forCollective(token: AuthToken): Boolean = + token.account.collective == group + + def asJson: Json = + Msg("job-done", task).asJson + } + + final case class JobsWaiting(collective: Ident, count: Int) extends OutputEvent { + def forCollective(token: AuthToken): Boolean = + token.account.collective == collective + def asJson: Json = Msg("jobs-waiting", count).asJson } diff --git a/modules/restserver/src/main/scala/docspell/restserver/ws/WebSocketRoutes.scala b/modules/restserver/src/main/scala/docspell/restserver/ws/WebSocketRoutes.scala index 022420b2..febbc0e1 100644 --- a/modules/restserver/src/main/scala/docspell/restserver/ws/WebSocketRoutes.scala +++ b/modules/restserver/src/main/scala/docspell/restserver/ws/WebSocketRoutes.scala @@ -7,9 +7,11 @@ package docspell.restserver.ws import cats.effect.Async +import cats.implicits._ import fs2.concurrent.Topic import fs2.{Pipe, Stream} +import docspell.backend.BackendApp import docspell.backend.auth.AuthToken import org.http4s.HttpRoutes @@ -22,6 +24,7 @@ object WebSocketRoutes { def apply[F[_]: Async]( user: AuthToken, + backend: BackendApp[F], topic: Topic[F, OutputEvent], wsb: WebSocketBuilder2[F] ): HttpRoutes[F] = { @@ -29,11 +32,18 @@ object WebSocketRoutes { import dsl._ HttpRoutes.of { case GET -> Root => + val init = + for { + jc <- backend.job.getUnfinishedJobCount(user.account.collective) + msg = OutputEvent.JobsWaiting(user.account.collective, jc) + } yield Text(msg.encode) + val toClient: Stream[F, WebSocketFrame.Text] = - topic - .subscribe(500) - .filter(_.forCollective(user)) - .map(msg => Text(msg.encode)) + Stream.eval(init) ++ + topic + .subscribe(500) + .filter(_.forCollective(user)) + .map(msg => Text(msg.encode)) val toServer: Pipe[F, WebSocketFrame, Unit] = _.map(_ => ()) diff --git a/modules/store/src/main/scala/docspell/store/queue/JobQueue.scala b/modules/store/src/main/scala/docspell/store/queue/JobQueue.scala index f81bfa96..feb59f60 100644 --- a/modules/store/src/main/scala/docspell/store/queue/JobQueue.scala +++ b/modules/store/src/main/scala/docspell/store/queue/JobQueue.scala @@ -30,9 +30,9 @@ trait JobQueue[F[_]] { */ def insertIfNew(job: RJob): F[Boolean] - def insertAll(jobs: Seq[RJob]): F[Int] + def insertAll(jobs: Seq[RJob]): F[List[Boolean]] - def insertAllIfNew(jobs: Seq[RJob]): F[Int] + def insertAllIfNew(jobs: Seq[RJob]): F[List[Boolean]] def nextJob( prio: Ident => F[Priority], @@ -77,26 +77,24 @@ object JobQueue { else insert(job).as(true) } yield ret - def insertAll(jobs: Seq[RJob]): F[Int] = + def insertAll(jobs: Seq[RJob]): F[List[Boolean]] = jobs.toList .traverse(j => insert(j).attempt) .flatMap(_.traverse { - case Right(()) => 1.pure[F] + case Right(()) => true.pure[F] case Left(ex) => - logger.error(ex)("Could not insert job. Skipping it.").as(0) + logger.error(ex)("Could not insert job. Skipping it.").as(false) }) - .map(_.sum) - def insertAllIfNew(jobs: Seq[RJob]): F[Int] = + def insertAllIfNew(jobs: Seq[RJob]): F[List[Boolean]] = jobs.toList .traverse(j => insertIfNew(j).attempt) .flatMap(_.traverse { - case Right(true) => 1.pure[F] - case Right(false) => 0.pure[F] + case Right(true) => true.pure[F] + case Right(false) => false.pure[F] case Left(ex) => - logger.error(ex)("Could not insert job. Skipping it.").as(0) + logger.error(ex)("Could not insert job. Skipping it.").as(false) }) - .map(_.sum) }) } diff --git a/modules/store/src/main/scala/docspell/store/records/RJob.scala b/modules/store/src/main/scala/docspell/store/records/RJob.scala index 65d5718d..89852623 100644 --- a/modules/store/src/main/scala/docspell/store/records/RJob.scala +++ b/modules/store/src/main/scala/docspell/store/records/RJob.scala @@ -300,4 +300,10 @@ object RJob { where(T.tracker === trackerId, T.state.in(JobState.notDone)) ).query[RJob].option + def getUnfinishedCount(group: Ident): ConnectionIO[Int] = + run( + select(count(T.id)), + from(T), + T.group === group && T.state.in(JobState.notDone) + ).query[Int].unique } diff --git a/modules/webapp/src/main/elm/App/Data.elm b/modules/webapp/src/main/elm/App/Data.elm index 340be3b2..4fcd485d 100644 --- a/modules/webapp/src/main/elm/App/Data.elm +++ b/modules/webapp/src/main/elm/App/Data.elm @@ -66,6 +66,7 @@ type alias Model = , anonymousUiLang : UiLanguage , langMenuOpen : Bool , showNewItemsArrived : Bool + , jobsWaiting : Int } @@ -129,6 +130,7 @@ init key url flags_ settings = , anonymousUiLang = Messages.UiLanguage.English , langMenuOpen = False , showNewItemsArrived = False + , jobsWaiting = 0 } , Cmd.batch [ Cmd.map UserSettingsMsg uc diff --git a/modules/webapp/src/main/elm/App/Update.elm b/modules/webapp/src/main/elm/App/Update.elm index c525945c..24a3ab41 100644 --- a/modules/webapp/src/main/elm/App/Update.elm +++ b/modules/webapp/src/main/elm/App/Update.elm @@ -311,20 +311,28 @@ updateWithSub msg model = ReceiveWsMessage data -> case data of - Ok ItemProcessed -> + Ok (JobDone task) -> let - newModel = - { model | showNewItemsArrived = True } - in - case model.page of - HomePage -> - updateHome texts Page.Home.Data.RefreshView newModel + isProcessItem = + task == "process-item" - _ -> - ( newModel, Cmd.none, Sub.none ) + newModel = + { model + | showNewItemsArrived = isProcessItem + , jobsWaiting = max 0 (model.jobsWaiting - 1) + } + in + if model.page == HomePage && isProcessItem then + updateHome texts Page.Home.Data.RefreshView newModel + + else + ( newModel, Cmd.none, Sub.none ) + + Ok (JobSubmitted _) -> + ( { model | jobsWaiting = model.jobsWaiting + 1 }, Cmd.none, Sub.none ) Ok (JobsWaiting n) -> - ( model, Cmd.none, Sub.none ) + ( { model | jobsWaiting = max 0 n }, Cmd.none, Sub.none ) Err err -> ( model, Cmd.none, Sub.none ) diff --git a/modules/webapp/src/main/elm/App/View2.elm b/modules/webapp/src/main/elm/App/View2.elm index 4e3d657c..6d943e32 100644 --- a/modules/webapp/src/main/elm/App/View2.elm +++ b/modules/webapp/src/main/elm/App/View2.elm @@ -259,10 +259,21 @@ dataMenu texts _ model = div [ class "relative" ] [ a [ class dropdownLink + , class "inline-block relative" , onClick ToggleNavMenu , href "#" ] [ i [ class "fa fa-cogs" ] [] + , div + [ class "h-5 w-5 rounded-full text-xs px-1 py-1 absolute top-1 left-1 font-bold" + , class "dark:bg-lightblue-500 dark:border-gray-50 dark:text-gray-800" + , class "bg-blue-500 text-gray-50" + , classList [ ( "hidden", model.jobsWaiting <= 0 ) ] + ] + [ div [ class "-mt-0.5 ml-0.5" ] + [ text (String.fromInt model.jobsWaiting) + ] + ] ] , div [ class dropdownMenu @@ -301,7 +312,14 @@ dataMenu texts _ model = , dataPageLink model QueuePage [] - [ i [ class "fa fa-tachometer-alt w-6" ] [] + [ i + [ if model.jobsWaiting <= 0 then + class "fa fa-tachometer-alt w-6" + + else + class "fa fa-circle dark:text-lightblue-500 text-blue-500" + ] + [] , span [ class "ml-1" ] [ text texts.processingQueue ] diff --git a/modules/webapp/src/main/elm/Data/ServerEvent.elm b/modules/webapp/src/main/elm/Data/ServerEvent.elm index 5844eaaf..194ba0cc 100644 --- a/modules/webapp/src/main/elm/Data/ServerEvent.elm +++ b/modules/webapp/src/main/elm/Data/ServerEvent.elm @@ -11,7 +11,8 @@ import Json.Decode as D type ServerEvent - = ItemProcessed + = JobSubmitted String + | JobDone String | JobsWaiting Int @@ -30,8 +31,13 @@ decode json = decodeTag : String -> D.Decoder ServerEvent decodeTag tag = case tag of - "item-processed" -> - D.succeed ItemProcessed + "job-done" -> + D.field "content" D.string + |> D.map JobDone + + "job-submitted" -> + D.field "content" D.string + |> D.map JobSubmitted "jobs-waiting" -> D.field "content" D.int