Encode ws messages as JSON

This commit is contained in:
eikek 2021-11-11 21:55:31 +01:00
parent d0f3d54060
commit cf933b60a7
10 changed files with 99 additions and 46 deletions

View File

@ -94,7 +94,7 @@ final class JoexAppImpl[F[_]: Async](
AllPreviewsTask
.job(MakePreviewArgs.StoreMode.WhenMissing, None)
.flatMap(queue.insertIfNew) *>
AllPageCountTask.job.flatMap(queue.insertIfNew)
AllPageCountTask.job.flatMap(queue.insertIfNew).as(())
private def scheduleEmptyTrashTasks: F[Unit] =
store

View File

@ -8,25 +8,48 @@ package docspell.restserver.ws
import docspell.backend.auth.AuthToken
import docspell.common._
import io.circe._
import io.circe.syntax._
import io.circe.generic.semiauto.deriveEncoder
/** The event that is sent to clients through a websocket connection. All events are
* encoded as JSON.
*/
sealed trait OutputEvent {
def forCollective(token: AuthToken): Boolean
def encode: String
def asJson: Json
def encode: String =
asJson.noSpaces
}
object OutputEvent {
case object KeepAlive extends OutputEvent {
def forCollective(token: AuthToken): Boolean = true
def encode: String = "keep-alive"
def asJson: Json =
Msg("keep-alive", ()).asJson
}
final case class ItemProcessed(collective: Ident) extends OutputEvent {
def forCollective(token: AuthToken): Boolean =
token.account.collective == collective
def encode: String =
"item-processed"
def asJson: Json =
Msg("item-processed", ()).asJson
}
final case class JobsWaiting(group: Ident, count: Int) extends OutputEvent {
def forCollective(token: AuthToken): Boolean =
token.account.collective == group
def asJson: Json =
Msg("jobs-waiting", count).asJson
}
private case class Msg[A](tag: String, content: A)
private object Msg {
@scala.annotation.nowarn
implicit def jsonEncoder[A: Encoder]: Encoder[Msg[A]] =
deriveEncoder
}
}

View File

@ -9,9 +9,7 @@ package docspell.restserver.ws
import cats.effect.Async
import fs2.concurrent.Topic
import fs2.{Pipe, Stream}
import docspell.backend.auth.AuthToken
import org.http4s.HttpRoutes
import org.http4s.dsl.Http4sDsl
import org.http4s.server.websocket.WebSocketBuilder2

View File

@ -10,12 +10,11 @@ import cats.effect._
import cats.implicits._
import docspell.common._
import docspell.common.syntax.all._
import docspell.store.Store
import docspell.store.queries.QJob
import docspell.store.records.RJob
import org.log4s._
import org.log4s.getLogger
trait JobQueue[F[_]] {
@ -29,11 +28,11 @@ trait JobQueue[F[_]] {
*
* If the job has no tracker defined, it is simply inserted.
*/
def insertIfNew(job: RJob): F[Unit]
def insertIfNew(job: RJob): F[Boolean]
def insertAll(jobs: Seq[RJob]): F[Unit]
def insertAll(jobs: Seq[RJob]): F[Int]
def insertAllIfNew(jobs: Seq[RJob]): F[Unit]
def insertAllIfNew(jobs: Seq[RJob]): F[Int]
def nextJob(
prio: Ident => F[Priority],
@ -43,10 +42,9 @@ trait JobQueue[F[_]] {
}
object JobQueue {
private[this] val logger = getLogger
def apply[F[_]: Async](store: Store[F]): Resource[F, JobQueue[F]] =
Resource.pure[F, JobQueue[F]](new JobQueue[F] {
private[this] val logger = Logger.log4s(getLogger)
def nextJob(
prio: Ident => F[Priority],
@ -54,7 +52,7 @@ object JobQueue {
retryPause: Duration
): F[Option[RJob]] =
logger
.ftrace("Select next job") *> QJob.takeNextJob(store)(prio, worker, retryPause)
.trace("Select next job") *> QJob.takeNextJob(store)(prio, worker, retryPause)
def insert(job: RJob): F[Unit] =
store
@ -66,7 +64,7 @@ object JobQueue {
else ().pure[F]
}
def insertIfNew(job: RJob): F[Unit] =
def insertIfNew(job: RJob): F[Boolean] =
for {
rj <- job.tracker match {
case Some(tid) =>
@ -75,26 +73,30 @@ object JobQueue {
None.pure[F]
}
ret <-
if (rj.isDefined) ().pure[F]
else insert(job)
if (rj.isDefined) false.pure[F]
else insert(job).as(true)
} yield ret
def insertAll(jobs: Seq[RJob]): F[Unit] =
def insertAll(jobs: Seq[RJob]): F[Int] =
jobs.toList
.traverse(j => insert(j).attempt)
.map(_.foreach {
case Right(()) =>
.flatMap(_.traverse {
case Right(()) => 1.pure[F]
case Left(ex) =>
logger.error(ex)("Could not insert job. Skipping it.")
})
logger.error(ex)("Could not insert job. Skipping it.").as(0)
def insertAllIfNew(jobs: Seq[RJob]): F[Unit] =
})
.map(_.sum)
def insertAllIfNew(jobs: Seq[RJob]): F[Int] =
jobs.toList
.traverse(j => insertIfNew(j).attempt)
.map(_.foreach {
case Right(()) =>
.flatMap(_.traverse {
case Right(true) => 1.pure[F]
case Right(false) => 0.pure[F]
case Left(ex) =>
logger.error(ex)("Could not insert job. Skipping it.")
logger.error(ex)("Could not insert job. Skipping it.").as(0)
})
.map(_.sum)
})
}

View File

@ -19,6 +19,7 @@ import Api.Model.VersionInfo exposing (VersionInfo)
import Browser exposing (UrlRequest)
import Browser.Navigation exposing (Key)
import Data.Flags exposing (Flags)
import Data.ServerEvent exposing (ServerEvent)
import Data.UiSettings exposing (StoredUiSettings, UiSettings)
import Data.UiTheme exposing (UiTheme)
import Http
@ -192,7 +193,7 @@ type Msg
| SetLanguage UiLanguage
| ClientSettingsSaveResp UiSettings (Result Http.Error BasicResult)
| ReceiveBrowserSettings StoredUiSettings
| ReceiveWsMessage String
| ReceiveWsMessage (Result String ServerEvent)
| ToggleShowNewItemsArrived

View File

@ -310,12 +310,8 @@ updateWithSub msg model =
updateUserSettings texts lm model
ReceiveWsMessage data ->
let
se =
Data.ServerEvent.fromString data
in
case se of
Just ItemProcessed ->
case data of
Ok ItemProcessed ->
let
newModel =
{ model | showNewItemsArrived = True }
@ -327,7 +323,10 @@ updateWithSub msg model =
_ ->
( newModel, Cmd.none, Sub.none )
Nothing ->
Ok (JobsWaiting n) ->
( model, Cmd.none, Sub.none )
Err err ->
( model, Cmd.none, Sub.none )
ToggleShowNewItemsArrived ->

View File

@ -5,18 +5,37 @@
-}
module Data.ServerEvent exposing (ServerEvent(..), fromString)
module Data.ServerEvent exposing (ServerEvent(..), decode)
import Json.Decode as D
type ServerEvent
= ItemProcessed
| JobsWaiting Int
fromString : String -> Maybe ServerEvent
fromString str =
case String.toLower str of
decoder : D.Decoder ServerEvent
decoder =
D.field "tag" D.string
|> D.andThen decodeTag
decode : D.Value -> Result String ServerEvent
decode json =
D.decodeValue decoder json
|> Result.mapError D.errorToString
decodeTag : String -> D.Decoder ServerEvent
decodeTag tag =
case tag of
"item-processed" ->
Just ItemProcessed
D.succeed ItemProcessed
"jobs-waiting" ->
D.field "content" D.int
|> D.map JobsWaiting
_ ->
Nothing
D.fail ("Unknown tag: " ++ tag)

View File

@ -93,5 +93,5 @@ subscriptions model =
Sub.batch
[ model.subs
, Ports.receiveUiSettings ReceiveBrowserSettings
, Ports.receiveWsMessage ReceiveWsMessage
, Ports.receiveServerEvent ReceiveWsMessage
]

View File

@ -10,8 +10,8 @@ port module Ports exposing
, initClipboard
, printElement
, receiveCheckQueryResult
, receiveServerEvent
, receiveUiSettings
, receiveWsMessage
, removeAccount
, requestUiSettings
, setAccount
@ -20,8 +20,10 @@ port module Ports exposing
import Api.Model.AuthResult exposing (AuthResult)
import Data.QueryParseResult exposing (QueryParseResult)
import Data.ServerEvent exposing (ServerEvent)
import Data.UiSettings exposing (StoredUiSettings)
import Data.UiTheme exposing (UiTheme)
import Json.Decode as D
{-| Save the result of authentication to local storage.
@ -58,7 +60,7 @@ port printElement : String -> Cmd msg
{-| Receives messages from the websocket.
-}
port receiveWsMessage : (String -> msg) -> Sub msg
port receiveWsMessage : (D.Value -> msg) -> Sub msg
@ -68,3 +70,8 @@ port receiveWsMessage : (String -> msg) -> Sub msg
setUiTheme : UiTheme -> Cmd msg
setUiTheme theme =
internalSetUiTheme (Data.UiTheme.toString theme)
receiveServerEvent : (Result String ServerEvent -> msg) -> Sub msg
receiveServerEvent tagger =
receiveWsMessage (Data.ServerEvent.decode >> tagger)

View File

@ -157,8 +157,12 @@ function initWS() {
console.log("Initialize websocket at " + url);
dsWebSocket = new WebSocket(url);
dsWebSocket.addEventListener("message", function(event) {
if (event.data != "keep-alive" && event.data) {
elmApp.ports.receiveWsMessage.send(event.data);
if (event.data) {
var dataJSON = JSON.parse(event.data);
if (dataJSON.tag !== "keep-alive") {
elmApp.ports.receiveWsMessage.send(dataJSON);
}
}
});
}