From 7a8527f821102ef8d5eeee4385033c3e2c526713 Mon Sep 17 00:00:00 2001
From: eikek <eike.kettner@posteo.de>
Date: Fri, 12 Nov 2021 21:56:48 +0100
Subject: [PATCH] Indicate number of running jobs in tob nav

Issue: #1069
---
 .../scala/docspell/backend/BackendApp.scala   |  4 +-
 .../backend/msg/JobQueuePublish.scala         | 59 +++++++++++++++++++
 .../docspell/backend/msg/JobSubmitted.scala   | 26 ++++++++
 .../scala/docspell/backend/msg/Topics.scala   |  2 +-
 .../scala/docspell/backend/ops/OJob.scala     |  5 ++
 .../scala/docspell/joex/JoexAppImpl.scala     |  4 +-
 .../docspell/restserver/RestServer.scala      |  2 +-
 .../docspell/restserver/Subscriptions.scala   | 17 ++++--
 .../docspell/restserver/ws/OutputEvent.scala  | 26 +++++---
 .../restserver/ws/WebSocketRoutes.scala       | 18 ++++--
 .../scala/docspell/store/queue/JobQueue.scala | 20 +++----
 .../scala/docspell/store/records/RJob.scala   |  6 ++
 modules/webapp/src/main/elm/App/Data.elm      |  2 +
 modules/webapp/src/main/elm/App/Update.elm    | 28 +++++----
 modules/webapp/src/main/elm/App/View2.elm     | 20 ++++++-
 .../webapp/src/main/elm/Data/ServerEvent.elm  | 12 +++-
 16 files changed, 201 insertions(+), 50 deletions(-)
 create mode 100644 modules/backend/src/main/scala/docspell/backend/msg/JobQueuePublish.scala
 create mode 100644 modules/backend/src/main/scala/docspell/backend/msg/JobSubmitted.scala

diff --git a/modules/backend/src/main/scala/docspell/backend/BackendApp.scala b/modules/backend/src/main/scala/docspell/backend/BackendApp.scala
index cdc92fcc..812bfccd 100644
--- a/modules/backend/src/main/scala/docspell/backend/BackendApp.scala
+++ b/modules/backend/src/main/scala/docspell/backend/BackendApp.scala
@@ -10,12 +10,12 @@ import cats.effect._
 
 import docspell.backend.auth.Login
 import docspell.backend.fulltext.CreateIndex
+import docspell.backend.msg.JobQueuePublish
 import docspell.backend.ops._
 import docspell.backend.signup.OSignup
 import docspell.ftsclient.FtsClient
 import docspell.pubsub.api.PubSubT
 import docspell.store.Store
-import docspell.store.queue.JobQueue
 import docspell.store.usertask.UserTaskStore
 import docspell.totp.Totp
 
@@ -58,7 +58,7 @@ object BackendApp {
   ): Resource[F, BackendApp[F]] =
     for {
       utStore <- UserTaskStore(store)
-      queue <- JobQueue(store)
+      queue <- JobQueuePublish(store, pubSubT)
       totpImpl <- OTotp(store, Totp.default)
       loginImpl <- Login[F](store, Totp.default)
       signupImpl <- OSignup[F](store)
diff --git a/modules/backend/src/main/scala/docspell/backend/msg/JobQueuePublish.scala b/modules/backend/src/main/scala/docspell/backend/msg/JobQueuePublish.scala
new file mode 100644
index 00000000..64b0b6d6
--- /dev/null
+++ b/modules/backend/src/main/scala/docspell/backend/msg/JobQueuePublish.scala
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2020 Eike K. & Contributors
+ *
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+
+package docspell.backend.msg
+
+import cats.effect._
+import cats.implicits._
+
+import docspell.common.{Duration, Ident, Priority}
+import docspell.pubsub.api.PubSubT
+import docspell.store.Store
+import docspell.store.queue.JobQueue
+import docspell.store.records.RJob
+
+final class JobQueuePublish[F[_]: Sync](delegate: JobQueue[F], pubsub: PubSubT[F])
+    extends JobQueue[F] {
+
+  private def msg(job: RJob): JobSubmitted =
+    JobSubmitted(job.id, job.group, job.task, job.args)
+
+  private def publish(job: RJob): F[Unit] =
+    pubsub.publish1(JobSubmitted.topic, msg(job)).as(())
+
+  def insert(job: RJob) =
+    delegate.insert(job).flatTap(_ => publish(job))
+
+  def insertIfNew(job: RJob) =
+    delegate.insertIfNew(job).flatTap {
+      case true  => publish(job)
+      case false => ().pure[F]
+    }
+
+  def insertAll(jobs: Seq[RJob]) =
+    delegate.insertAll(jobs).flatTap { results =>
+      results.zip(jobs).traverse { case (res, job) =>
+        if (res) publish(job)
+        else ().pure[F]
+      }
+    }
+
+  def insertAllIfNew(jobs: Seq[RJob]) =
+    delegate.insertAllIfNew(jobs).flatTap { results =>
+      results.zip(jobs).traverse { case (res, job) =>
+        if (res) publish(job)
+        else ().pure[F]
+      }
+    }
+
+  def nextJob(prio: Ident => F[Priority], worker: Ident, retryPause: Duration) =
+    delegate.nextJob(prio, worker, retryPause)
+}
+
+object JobQueuePublish {
+  def apply[F[_]: Async](store: Store[F], pubSub: PubSubT[F]): Resource[F, JobQueue[F]] =
+    JobQueue(store).map(q => new JobQueuePublish[F](q, pubSub))
+}
diff --git a/modules/backend/src/main/scala/docspell/backend/msg/JobSubmitted.scala b/modules/backend/src/main/scala/docspell/backend/msg/JobSubmitted.scala
new file mode 100644
index 00000000..2cecdde8
--- /dev/null
+++ b/modules/backend/src/main/scala/docspell/backend/msg/JobSubmitted.scala
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2020 Eike K. & Contributors
+ *
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+
+package docspell.backend.msg
+import docspell.common._
+import docspell.pubsub.api.{Topic, TypedTopic}
+
+import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder}
+import io.circe.{Decoder, Encoder}
+
+final case class JobSubmitted(jobId: Ident, group: Ident, task: Ident, args: String)
+
+object JobSubmitted {
+
+  implicit val jsonDecoder: Decoder[JobSubmitted] =
+    deriveDecoder
+
+  implicit val jsonEncoder: Encoder[JobSubmitted] =
+    deriveEncoder
+
+  val topic: TypedTopic[JobSubmitted] =
+    TypedTopic(Topic("job-submitted"))
+}
diff --git a/modules/backend/src/main/scala/docspell/backend/msg/Topics.scala b/modules/backend/src/main/scala/docspell/backend/msg/Topics.scala
index 26c594f7..ae53d9f6 100644
--- a/modules/backend/src/main/scala/docspell/backend/msg/Topics.scala
+++ b/modules/backend/src/main/scala/docspell/backend/msg/Topics.scala
@@ -19,5 +19,5 @@ object Topics {
 
   /** A list of all topics. It is required to list every topic in use here! */
   val all: NonEmptyList[TypedTopic[_]] =
-    NonEmptyList.of(JobDone.topic, CancelJob.topic, jobsNotify)
+    NonEmptyList.of(JobDone.topic, CancelJob.topic, jobsNotify, JobSubmitted.topic)
 }
diff --git a/modules/backend/src/main/scala/docspell/backend/ops/OJob.scala b/modules/backend/src/main/scala/docspell/backend/ops/OJob.scala
index 7f7a36fd..bcd70da4 100644
--- a/modules/backend/src/main/scala/docspell/backend/ops/OJob.scala
+++ b/modules/backend/src/main/scala/docspell/backend/ops/OJob.scala
@@ -24,6 +24,8 @@ trait OJob[F[_]] {
   def cancelJob(id: Ident, collective: Ident): F[JobCancelResult]
 
   def setPriority(id: Ident, collective: Ident, prio: Priority): F[UpdateResult]
+
+  def getUnfinishedJobCount(collective: Ident): F[Int]
 }
 
 object OJob {
@@ -93,5 +95,8 @@ object OJob {
         } yield result)
           .getOrElse(JobCancelResult.jobNotFound)
       }
+
+      def getUnfinishedJobCount(collective: Ident): F[Int] =
+        store.transact(RJob.getUnfinishedCount(collective))
     })
 }
diff --git a/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala b/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala
index 80cc3825..f761bfa8 100644
--- a/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala
+++ b/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala
@@ -12,7 +12,7 @@ import fs2.concurrent.SignallingRef
 
 import docspell.analysis.TextAnalyser
 import docspell.backend.fulltext.CreateIndex
-import docspell.backend.msg.{CancelJob, Topics}
+import docspell.backend.msg.{CancelJob, JobQueuePublish, Topics}
 import docspell.backend.ops._
 import docspell.common._
 import docspell.ftsclient.FtsClient
@@ -126,13 +126,13 @@ object JoexAppImpl {
       pubSub: PubSub[F]
   ): Resource[F, JoexApp[F]] =
     for {
-      queue <- JobQueue(store)
       pstore <- PeriodicTaskStore.create(store)
       client = JoexClient(httpClient)
       pubSubT = PubSubT(
         pubSub,
         Logger.log4s(org.log4s.getLogger(s"joex-${cfg.appId.id}"))
       )
+      queue <- JobQueuePublish(store, pubSubT)
       joex <- OJoex(pubSubT)
       upload <- OUpload(store, queue, joex)
       fts <- createFtsClient(cfg)(httpClient)
diff --git a/modules/restserver/src/main/scala/docspell/restserver/RestServer.scala b/modules/restserver/src/main/scala/docspell/restserver/RestServer.scala
index 1122231d..663cc962 100644
--- a/modules/restserver/src/main/scala/docspell/restserver/RestServer.scala
+++ b/modules/restserver/src/main/scala/docspell/restserver/RestServer.scala
@@ -138,7 +138,7 @@ object RestServer {
       token: AuthToken
   ): HttpRoutes[F] =
     Router(
-      "ws" -> WebSocketRoutes(token, topic, wsB),
+      "ws" -> WebSocketRoutes(token, restApp.backend, topic, wsB),
       "auth" -> LoginRoutes.session(restApp.backend.login, cfg, token),
       "tag" -> TagRoutes(restApp.backend, token),
       "equipment" -> EquipmentRoutes(restApp.backend, token),
diff --git a/modules/restserver/src/main/scala/docspell/restserver/Subscriptions.scala b/modules/restserver/src/main/scala/docspell/restserver/Subscriptions.scala
index 3dc04aa5..faecac11 100644
--- a/modules/restserver/src/main/scala/docspell/restserver/Subscriptions.scala
+++ b/modules/restserver/src/main/scala/docspell/restserver/Subscriptions.scala
@@ -6,11 +6,11 @@
 
 package docspell.restserver
 
+import cats.effect.Async
 import fs2.Stream
 import fs2.concurrent.Topic
 
-import docspell.backend.msg.JobDone
-import docspell.common.ProcessItemArgs
+import docspell.backend.msg.{JobDone, JobSubmitted}
 import docspell.pubsub.api.PubSubT
 import docspell.restserver.ws.OutputEvent
 
@@ -18,15 +18,20 @@ import docspell.restserver.ws.OutputEvent
   */
 object Subscriptions {
 
-  def apply[F[_]](
+  def apply[F[_]: Async](
       wsTopic: Topic[F, OutputEvent],
       pubSub: PubSubT[F]
   ): Stream[F, Nothing] =
-    jobDone(pubSub).through(wsTopic.publish)
+    jobDone(pubSub).merge(jobSubmitted(pubSub)).through(wsTopic.publish)
 
   def jobDone[F[_]](pubSub: PubSubT[F]): Stream[F, OutputEvent] =
     pubSub
       .subscribe(JobDone.topic)
-      .filter(m => m.body.task == ProcessItemArgs.taskName)
-      .map(m => OutputEvent.ItemProcessed(m.body.group))
+      .map(m => OutputEvent.JobDone(m.body.group, m.body.task))
+
+  def jobSubmitted[F[_]](pubSub: PubSubT[F]): Stream[F, OutputEvent] =
+    pubSub
+      .subscribe(JobSubmitted.topic)
+      .map(m => OutputEvent.JobSubmitted(m.body.group, m.body.task))
+
 }
diff --git a/modules/restserver/src/main/scala/docspell/restserver/ws/OutputEvent.scala b/modules/restserver/src/main/scala/docspell/restserver/ws/OutputEvent.scala
index f7accaa8..d04424b2 100644
--- a/modules/restserver/src/main/scala/docspell/restserver/ws/OutputEvent.scala
+++ b/modules/restserver/src/main/scala/docspell/restserver/ws/OutputEvent.scala
@@ -31,18 +31,26 @@ object OutputEvent {
       Msg("keep-alive", ()).asJson
   }
 
-  final case class ItemProcessed(collective: Ident) extends OutputEvent {
-    def forCollective(token: AuthToken): Boolean =
-      token.account.collective == collective
-
-    def asJson: Json =
-      Msg("item-processed", ()).asJson
-  }
-
-  final case class JobsWaiting(group: Ident, count: Int) extends OutputEvent {
+  final case class JobSubmitted(group: Ident, task: Ident) extends OutputEvent {
     def forCollective(token: AuthToken): Boolean =
       token.account.collective == group
 
+    def asJson: Json =
+      Msg("job-submitted", task).asJson
+  }
+
+  final case class JobDone(group: Ident, task: Ident) extends OutputEvent {
+    def forCollective(token: AuthToken): Boolean =
+      token.account.collective == group
+
+    def asJson: Json =
+      Msg("job-done", task).asJson
+  }
+
+  final case class JobsWaiting(collective: Ident, count: Int) extends OutputEvent {
+    def forCollective(token: AuthToken): Boolean =
+      token.account.collective == collective
+
     def asJson: Json =
       Msg("jobs-waiting", count).asJson
   }
diff --git a/modules/restserver/src/main/scala/docspell/restserver/ws/WebSocketRoutes.scala b/modules/restserver/src/main/scala/docspell/restserver/ws/WebSocketRoutes.scala
index 022420b2..febbc0e1 100644
--- a/modules/restserver/src/main/scala/docspell/restserver/ws/WebSocketRoutes.scala
+++ b/modules/restserver/src/main/scala/docspell/restserver/ws/WebSocketRoutes.scala
@@ -7,9 +7,11 @@
 package docspell.restserver.ws
 
 import cats.effect.Async
+import cats.implicits._
 import fs2.concurrent.Topic
 import fs2.{Pipe, Stream}
 
+import docspell.backend.BackendApp
 import docspell.backend.auth.AuthToken
 
 import org.http4s.HttpRoutes
@@ -22,6 +24,7 @@ object WebSocketRoutes {
 
   def apply[F[_]: Async](
       user: AuthToken,
+      backend: BackendApp[F],
       topic: Topic[F, OutputEvent],
       wsb: WebSocketBuilder2[F]
   ): HttpRoutes[F] = {
@@ -29,11 +32,18 @@ object WebSocketRoutes {
     import dsl._
 
     HttpRoutes.of { case GET -> Root =>
+      val init =
+        for {
+          jc <- backend.job.getUnfinishedJobCount(user.account.collective)
+          msg = OutputEvent.JobsWaiting(user.account.collective, jc)
+        } yield Text(msg.encode)
+
       val toClient: Stream[F, WebSocketFrame.Text] =
-        topic
-          .subscribe(500)
-          .filter(_.forCollective(user))
-          .map(msg => Text(msg.encode))
+        Stream.eval(init) ++
+          topic
+            .subscribe(500)
+            .filter(_.forCollective(user))
+            .map(msg => Text(msg.encode))
 
       val toServer: Pipe[F, WebSocketFrame, Unit] =
         _.map(_ => ())
diff --git a/modules/store/src/main/scala/docspell/store/queue/JobQueue.scala b/modules/store/src/main/scala/docspell/store/queue/JobQueue.scala
index f81bfa96..feb59f60 100644
--- a/modules/store/src/main/scala/docspell/store/queue/JobQueue.scala
+++ b/modules/store/src/main/scala/docspell/store/queue/JobQueue.scala
@@ -30,9 +30,9 @@ trait JobQueue[F[_]] {
     */
   def insertIfNew(job: RJob): F[Boolean]
 
-  def insertAll(jobs: Seq[RJob]): F[Int]
+  def insertAll(jobs: Seq[RJob]): F[List[Boolean]]
 
-  def insertAllIfNew(jobs: Seq[RJob]): F[Int]
+  def insertAllIfNew(jobs: Seq[RJob]): F[List[Boolean]]
 
   def nextJob(
       prio: Ident => F[Priority],
@@ -77,26 +77,24 @@ object JobQueue {
             else insert(job).as(true)
         } yield ret
 
-      def insertAll(jobs: Seq[RJob]): F[Int] =
+      def insertAll(jobs: Seq[RJob]): F[List[Boolean]] =
         jobs.toList
           .traverse(j => insert(j).attempt)
           .flatMap(_.traverse {
-            case Right(()) => 1.pure[F]
+            case Right(()) => true.pure[F]
             case Left(ex) =>
-              logger.error(ex)("Could not insert job. Skipping it.").as(0)
+              logger.error(ex)("Could not insert job. Skipping it.").as(false)
 
           })
-          .map(_.sum)
 
-      def insertAllIfNew(jobs: Seq[RJob]): F[Int] =
+      def insertAllIfNew(jobs: Seq[RJob]): F[List[Boolean]] =
         jobs.toList
           .traverse(j => insertIfNew(j).attempt)
           .flatMap(_.traverse {
-            case Right(true)  => 1.pure[F]
-            case Right(false) => 0.pure[F]
+            case Right(true)  => true.pure[F]
+            case Right(false) => false.pure[F]
             case Left(ex) =>
-              logger.error(ex)("Could not insert job. Skipping it.").as(0)
+              logger.error(ex)("Could not insert job. Skipping it.").as(false)
           })
-          .map(_.sum)
     })
 }
diff --git a/modules/store/src/main/scala/docspell/store/records/RJob.scala b/modules/store/src/main/scala/docspell/store/records/RJob.scala
index 65d5718d..89852623 100644
--- a/modules/store/src/main/scala/docspell/store/records/RJob.scala
+++ b/modules/store/src/main/scala/docspell/store/records/RJob.scala
@@ -300,4 +300,10 @@ object RJob {
       where(T.tracker === trackerId, T.state.in(JobState.notDone))
     ).query[RJob].option
 
+  def getUnfinishedCount(group: Ident): ConnectionIO[Int] =
+    run(
+      select(count(T.id)),
+      from(T),
+      T.group === group && T.state.in(JobState.notDone)
+    ).query[Int].unique
 }
diff --git a/modules/webapp/src/main/elm/App/Data.elm b/modules/webapp/src/main/elm/App/Data.elm
index 340be3b2..4fcd485d 100644
--- a/modules/webapp/src/main/elm/App/Data.elm
+++ b/modules/webapp/src/main/elm/App/Data.elm
@@ -66,6 +66,7 @@ type alias Model =
     , anonymousUiLang : UiLanguage
     , langMenuOpen : Bool
     , showNewItemsArrived : Bool
+    , jobsWaiting : Int
     }
 
 
@@ -129,6 +130,7 @@ init key url flags_ settings =
       , anonymousUiLang = Messages.UiLanguage.English
       , langMenuOpen = False
       , showNewItemsArrived = False
+      , jobsWaiting = 0
       }
     , Cmd.batch
         [ Cmd.map UserSettingsMsg uc
diff --git a/modules/webapp/src/main/elm/App/Update.elm b/modules/webapp/src/main/elm/App/Update.elm
index c525945c..24a3ab41 100644
--- a/modules/webapp/src/main/elm/App/Update.elm
+++ b/modules/webapp/src/main/elm/App/Update.elm
@@ -311,20 +311,28 @@ updateWithSub msg model =
 
         ReceiveWsMessage data ->
             case data of
-                Ok ItemProcessed ->
+                Ok (JobDone task) ->
                     let
-                        newModel =
-                            { model | showNewItemsArrived = True }
-                    in
-                    case model.page of
-                        HomePage ->
-                            updateHome texts Page.Home.Data.RefreshView newModel
+                        isProcessItem =
+                            task == "process-item"
 
-                        _ ->
-                            ( newModel, Cmd.none, Sub.none )
+                        newModel =
+                            { model
+                                | showNewItemsArrived = isProcessItem
+                                , jobsWaiting = max 0 (model.jobsWaiting - 1)
+                            }
+                    in
+                    if model.page == HomePage && isProcessItem then
+                        updateHome texts Page.Home.Data.RefreshView newModel
+
+                    else
+                        ( newModel, Cmd.none, Sub.none )
+
+                Ok (JobSubmitted _) ->
+                    ( { model | jobsWaiting = model.jobsWaiting + 1 }, Cmd.none, Sub.none )
 
                 Ok (JobsWaiting n) ->
-                    ( model, Cmd.none, Sub.none )
+                    ( { model | jobsWaiting = max 0 n }, Cmd.none, Sub.none )
 
                 Err err ->
                     ( model, Cmd.none, Sub.none )
diff --git a/modules/webapp/src/main/elm/App/View2.elm b/modules/webapp/src/main/elm/App/View2.elm
index 4e3d657c..6d943e32 100644
--- a/modules/webapp/src/main/elm/App/View2.elm
+++ b/modules/webapp/src/main/elm/App/View2.elm
@@ -259,10 +259,21 @@ dataMenu texts _ model =
     div [ class "relative" ]
         [ a
             [ class dropdownLink
+            , class "inline-block relative"
             , onClick ToggleNavMenu
             , href "#"
             ]
             [ i [ class "fa fa-cogs" ] []
+            , div
+                [ class "h-5 w-5 rounded-full text-xs px-1 py-1 absolute top-1 left-1 font-bold"
+                , class "dark:bg-lightblue-500 dark:border-gray-50 dark:text-gray-800"
+                , class "bg-blue-500 text-gray-50"
+                , classList [ ( "hidden", model.jobsWaiting <= 0 ) ]
+                ]
+                [ div [ class "-mt-0.5 ml-0.5" ]
+                    [ text (String.fromInt model.jobsWaiting)
+                    ]
+                ]
             ]
         , div
             [ class dropdownMenu
@@ -301,7 +312,14 @@ dataMenu texts _ model =
             , dataPageLink model
                 QueuePage
                 []
-                [ i [ class "fa fa-tachometer-alt w-6" ] []
+                [ i
+                    [ if model.jobsWaiting <= 0 then
+                        class "fa fa-tachometer-alt w-6"
+
+                      else
+                        class "fa fa-circle dark:text-lightblue-500 text-blue-500"
+                    ]
+                    []
                 , span [ class "ml-1" ]
                     [ text texts.processingQueue
                     ]
diff --git a/modules/webapp/src/main/elm/Data/ServerEvent.elm b/modules/webapp/src/main/elm/Data/ServerEvent.elm
index 5844eaaf..194ba0cc 100644
--- a/modules/webapp/src/main/elm/Data/ServerEvent.elm
+++ b/modules/webapp/src/main/elm/Data/ServerEvent.elm
@@ -11,7 +11,8 @@ import Json.Decode as D
 
 
 type ServerEvent
-    = ItemProcessed
+    = JobSubmitted String
+    | JobDone String
     | JobsWaiting Int
 
 
@@ -30,8 +31,13 @@ decode json =
 decodeTag : String -> D.Decoder ServerEvent
 decodeTag tag =
     case tag of
-        "item-processed" ->
-            D.succeed ItemProcessed
+        "job-done" ->
+            D.field "content" D.string
+                |> D.map JobDone
+
+        "job-submitted" ->
+            D.field "content" D.string
+                |> D.map JobSubmitted
 
         "jobs-waiting" ->
             D.field "content" D.int