From cf933b60a7bad74afe170414bf6dcb720d60797a Mon Sep 17 00:00:00 2001 From: eikek Date: Thu, 11 Nov 2021 21:55:31 +0100 Subject: [PATCH] Encode ws messages as JSON --- .../scala/docspell/joex/JoexAppImpl.scala | 2 +- .../docspell/restserver/ws/OutputEvent.scala | 31 ++++++++++++-- .../restserver/ws/WebSocketRoutes.scala | 2 - .../scala/docspell/store/queue/JobQueue.scala | 42 ++++++++++--------- modules/webapp/src/main/elm/App/Data.elm | 3 +- modules/webapp/src/main/elm/App/Update.elm | 13 +++--- .../webapp/src/main/elm/Data/ServerEvent.elm | 31 +++++++++++--- modules/webapp/src/main/elm/Main.elm | 2 +- modules/webapp/src/main/elm/Ports.elm | 11 ++++- modules/webapp/src/main/webjar/docspell.js | 8 +++- 10 files changed, 99 insertions(+), 46 deletions(-) diff --git a/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala b/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala index 2ec86d11..80cc3825 100644 --- a/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala +++ b/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala @@ -94,7 +94,7 @@ final class JoexAppImpl[F[_]: Async]( AllPreviewsTask .job(MakePreviewArgs.StoreMode.WhenMissing, None) .flatMap(queue.insertIfNew) *> - AllPageCountTask.job.flatMap(queue.insertIfNew) + AllPageCountTask.job.flatMap(queue.insertIfNew).as(()) private def scheduleEmptyTrashTasks: F[Unit] = store diff --git a/modules/restserver/src/main/scala/docspell/restserver/ws/OutputEvent.scala b/modules/restserver/src/main/scala/docspell/restserver/ws/OutputEvent.scala index 8567f3a7..bb9378f7 100644 --- a/modules/restserver/src/main/scala/docspell/restserver/ws/OutputEvent.scala +++ b/modules/restserver/src/main/scala/docspell/restserver/ws/OutputEvent.scala @@ -8,25 +8,48 @@ package docspell.restserver.ws import docspell.backend.auth.AuthToken import docspell.common._ +import io.circe._ +import io.circe.syntax._ +import io.circe.generic.semiauto.deriveEncoder +/** The event that is sent to clients through a websocket connection. All events are + * encoded as JSON. + */ sealed trait OutputEvent { def forCollective(token: AuthToken): Boolean - def encode: String + def asJson: Json + def encode: String = + asJson.noSpaces } object OutputEvent { case object KeepAlive extends OutputEvent { def forCollective(token: AuthToken): Boolean = true - def encode: String = "keep-alive" + def asJson: Json = + Msg("keep-alive", ()).asJson } final case class ItemProcessed(collective: Ident) extends OutputEvent { def forCollective(token: AuthToken): Boolean = token.account.collective == collective - def encode: String = - "item-processed" + def asJson: Json = + Msg("item-processed", ()).asJson } + final case class JobsWaiting(group: Ident, count: Int) extends OutputEvent { + def forCollective(token: AuthToken): Boolean = + token.account.collective == group + + def asJson: Json = + Msg("jobs-waiting", count).asJson + } + + private case class Msg[A](tag: String, content: A) + private object Msg { + @scala.annotation.nowarn + implicit def jsonEncoder[A: Encoder]: Encoder[Msg[A]] = + deriveEncoder + } } diff --git a/modules/restserver/src/main/scala/docspell/restserver/ws/WebSocketRoutes.scala b/modules/restserver/src/main/scala/docspell/restserver/ws/WebSocketRoutes.scala index 022420b2..1e92d10a 100644 --- a/modules/restserver/src/main/scala/docspell/restserver/ws/WebSocketRoutes.scala +++ b/modules/restserver/src/main/scala/docspell/restserver/ws/WebSocketRoutes.scala @@ -9,9 +9,7 @@ package docspell.restserver.ws import cats.effect.Async import fs2.concurrent.Topic import fs2.{Pipe, Stream} - import docspell.backend.auth.AuthToken - import org.http4s.HttpRoutes import org.http4s.dsl.Http4sDsl import org.http4s.server.websocket.WebSocketBuilder2 diff --git a/modules/store/src/main/scala/docspell/store/queue/JobQueue.scala b/modules/store/src/main/scala/docspell/store/queue/JobQueue.scala index d6920283..f81bfa96 100644 --- a/modules/store/src/main/scala/docspell/store/queue/JobQueue.scala +++ b/modules/store/src/main/scala/docspell/store/queue/JobQueue.scala @@ -10,12 +10,11 @@ import cats.effect._ import cats.implicits._ import docspell.common._ -import docspell.common.syntax.all._ import docspell.store.Store import docspell.store.queries.QJob import docspell.store.records.RJob -import org.log4s._ +import org.log4s.getLogger trait JobQueue[F[_]] { @@ -29,11 +28,11 @@ trait JobQueue[F[_]] { * * If the job has no tracker defined, it is simply inserted. */ - def insertIfNew(job: RJob): F[Unit] + def insertIfNew(job: RJob): F[Boolean] - def insertAll(jobs: Seq[RJob]): F[Unit] + def insertAll(jobs: Seq[RJob]): F[Int] - def insertAllIfNew(jobs: Seq[RJob]): F[Unit] + def insertAllIfNew(jobs: Seq[RJob]): F[Int] def nextJob( prio: Ident => F[Priority], @@ -43,10 +42,9 @@ trait JobQueue[F[_]] { } object JobQueue { - private[this] val logger = getLogger - def apply[F[_]: Async](store: Store[F]): Resource[F, JobQueue[F]] = Resource.pure[F, JobQueue[F]](new JobQueue[F] { + private[this] val logger = Logger.log4s(getLogger) def nextJob( prio: Ident => F[Priority], @@ -54,7 +52,7 @@ object JobQueue { retryPause: Duration ): F[Option[RJob]] = logger - .ftrace("Select next job") *> QJob.takeNextJob(store)(prio, worker, retryPause) + .trace("Select next job") *> QJob.takeNextJob(store)(prio, worker, retryPause) def insert(job: RJob): F[Unit] = store @@ -66,7 +64,7 @@ object JobQueue { else ().pure[F] } - def insertIfNew(job: RJob): F[Unit] = + def insertIfNew(job: RJob): F[Boolean] = for { rj <- job.tracker match { case Some(tid) => @@ -75,26 +73,30 @@ object JobQueue { None.pure[F] } ret <- - if (rj.isDefined) ().pure[F] - else insert(job) + if (rj.isDefined) false.pure[F] + else insert(job).as(true) } yield ret - def insertAll(jobs: Seq[RJob]): F[Unit] = + def insertAll(jobs: Seq[RJob]): F[Int] = jobs.toList .traverse(j => insert(j).attempt) - .map(_.foreach { - case Right(()) => + .flatMap(_.traverse { + case Right(()) => 1.pure[F] case Left(ex) => - logger.error(ex)("Could not insert job. Skipping it.") - }) + logger.error(ex)("Could not insert job. Skipping it.").as(0) - def insertAllIfNew(jobs: Seq[RJob]): F[Unit] = + }) + .map(_.sum) + + def insertAllIfNew(jobs: Seq[RJob]): F[Int] = jobs.toList .traverse(j => insertIfNew(j).attempt) - .map(_.foreach { - case Right(()) => + .flatMap(_.traverse { + case Right(true) => 1.pure[F] + case Right(false) => 0.pure[F] case Left(ex) => - logger.error(ex)("Could not insert job. Skipping it.") + logger.error(ex)("Could not insert job. Skipping it.").as(0) }) + .map(_.sum) }) } diff --git a/modules/webapp/src/main/elm/App/Data.elm b/modules/webapp/src/main/elm/App/Data.elm index f88a36cc..340be3b2 100644 --- a/modules/webapp/src/main/elm/App/Data.elm +++ b/modules/webapp/src/main/elm/App/Data.elm @@ -19,6 +19,7 @@ import Api.Model.VersionInfo exposing (VersionInfo) import Browser exposing (UrlRequest) import Browser.Navigation exposing (Key) import Data.Flags exposing (Flags) +import Data.ServerEvent exposing (ServerEvent) import Data.UiSettings exposing (StoredUiSettings, UiSettings) import Data.UiTheme exposing (UiTheme) import Http @@ -192,7 +193,7 @@ type Msg | SetLanguage UiLanguage | ClientSettingsSaveResp UiSettings (Result Http.Error BasicResult) | ReceiveBrowserSettings StoredUiSettings - | ReceiveWsMessage String + | ReceiveWsMessage (Result String ServerEvent) | ToggleShowNewItemsArrived diff --git a/modules/webapp/src/main/elm/App/Update.elm b/modules/webapp/src/main/elm/App/Update.elm index 60bfcd47..c525945c 100644 --- a/modules/webapp/src/main/elm/App/Update.elm +++ b/modules/webapp/src/main/elm/App/Update.elm @@ -310,12 +310,8 @@ updateWithSub msg model = updateUserSettings texts lm model ReceiveWsMessage data -> - let - se = - Data.ServerEvent.fromString data - in - case se of - Just ItemProcessed -> + case data of + Ok ItemProcessed -> let newModel = { model | showNewItemsArrived = True } @@ -327,7 +323,10 @@ updateWithSub msg model = _ -> ( newModel, Cmd.none, Sub.none ) - Nothing -> + Ok (JobsWaiting n) -> + ( model, Cmd.none, Sub.none ) + + Err err -> ( model, Cmd.none, Sub.none ) ToggleShowNewItemsArrived -> diff --git a/modules/webapp/src/main/elm/Data/ServerEvent.elm b/modules/webapp/src/main/elm/Data/ServerEvent.elm index c4d84b72..5844eaaf 100644 --- a/modules/webapp/src/main/elm/Data/ServerEvent.elm +++ b/modules/webapp/src/main/elm/Data/ServerEvent.elm @@ -5,18 +5,37 @@ -} -module Data.ServerEvent exposing (ServerEvent(..), fromString) +module Data.ServerEvent exposing (ServerEvent(..), decode) + +import Json.Decode as D type ServerEvent = ItemProcessed + | JobsWaiting Int -fromString : String -> Maybe ServerEvent -fromString str = - case String.toLower str of +decoder : D.Decoder ServerEvent +decoder = + D.field "tag" D.string + |> D.andThen decodeTag + + +decode : D.Value -> Result String ServerEvent +decode json = + D.decodeValue decoder json + |> Result.mapError D.errorToString + + +decodeTag : String -> D.Decoder ServerEvent +decodeTag tag = + case tag of "item-processed" -> - Just ItemProcessed + D.succeed ItemProcessed + + "jobs-waiting" -> + D.field "content" D.int + |> D.map JobsWaiting _ -> - Nothing + D.fail ("Unknown tag: " ++ tag) diff --git a/modules/webapp/src/main/elm/Main.elm b/modules/webapp/src/main/elm/Main.elm index be80a076..be94b7ff 100644 --- a/modules/webapp/src/main/elm/Main.elm +++ b/modules/webapp/src/main/elm/Main.elm @@ -93,5 +93,5 @@ subscriptions model = Sub.batch [ model.subs , Ports.receiveUiSettings ReceiveBrowserSettings - , Ports.receiveWsMessage ReceiveWsMessage + , Ports.receiveServerEvent ReceiveWsMessage ] diff --git a/modules/webapp/src/main/elm/Ports.elm b/modules/webapp/src/main/elm/Ports.elm index c366bccc..8a348b85 100644 --- a/modules/webapp/src/main/elm/Ports.elm +++ b/modules/webapp/src/main/elm/Ports.elm @@ -10,8 +10,8 @@ port module Ports exposing , initClipboard , printElement , receiveCheckQueryResult + , receiveServerEvent , receiveUiSettings - , receiveWsMessage , removeAccount , requestUiSettings , setAccount @@ -20,8 +20,10 @@ port module Ports exposing import Api.Model.AuthResult exposing (AuthResult) import Data.QueryParseResult exposing (QueryParseResult) +import Data.ServerEvent exposing (ServerEvent) import Data.UiSettings exposing (StoredUiSettings) import Data.UiTheme exposing (UiTheme) +import Json.Decode as D {-| Save the result of authentication to local storage. @@ -58,7 +60,7 @@ port printElement : String -> Cmd msg {-| Receives messages from the websocket. -} -port receiveWsMessage : (String -> msg) -> Sub msg +port receiveWsMessage : (D.Value -> msg) -> Sub msg @@ -68,3 +70,8 @@ port receiveWsMessage : (String -> msg) -> Sub msg setUiTheme : UiTheme -> Cmd msg setUiTheme theme = internalSetUiTheme (Data.UiTheme.toString theme) + + +receiveServerEvent : (Result String ServerEvent -> msg) -> Sub msg +receiveServerEvent tagger = + receiveWsMessage (Data.ServerEvent.decode >> tagger) diff --git a/modules/webapp/src/main/webjar/docspell.js b/modules/webapp/src/main/webjar/docspell.js index 08aa6aed..c25bbbe0 100644 --- a/modules/webapp/src/main/webjar/docspell.js +++ b/modules/webapp/src/main/webjar/docspell.js @@ -157,8 +157,12 @@ function initWS() { console.log("Initialize websocket at " + url); dsWebSocket = new WebSocket(url); dsWebSocket.addEventListener("message", function(event) { - if (event.data != "keep-alive" && event.data) { - elmApp.ports.receiveWsMessage.send(event.data); + + if (event.data) { + var dataJSON = JSON.parse(event.data); + if (dataJSON.tag !== "keep-alive") { + elmApp.ports.receiveWsMessage.send(dataJSON); + } } }); }