Add websockets and notify frontend when an item is processed

This commit is contained in:
eikek 2021-11-06 21:32:07 +01:00
parent f38d520a1d
commit 3e58d97f72
17 changed files with 243 additions and 114 deletions

View File

@ -1,31 +0,0 @@
/*
* Copyright 2020 Eike K. & Contributors
*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
package docspell.backend.msg
import java.util.concurrent.atomic.AtomicLong
import docspell.pubsub.api.{Topic, TypedTopic}
import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder}
import io.circe.{Decoder, Encoder}
final case class Ping(sender: String, num: Long)
object Ping {
implicit val jsonDecoder: Decoder[Ping] =
deriveDecoder[Ping]
implicit val jsonEncoder: Encoder[Ping] =
deriveEncoder[Ping]
private[this] val counter = new AtomicLong(0)
def next(sender: String): Ping =
Ping(sender, counter.getAndIncrement())
val topic: TypedTopic[Ping] =
TypedTopic[Ping](Topic("ping"))
}

View File

@ -19,5 +19,5 @@ object Topics {
/** A list of all topics. It is required to list every topic in use here! */
val all: NonEmptyList[TypedTopic[_]] =
NonEmptyList.of(Ping.topic, JobDone.topic, CancelJob.topic, jobsNotify)
NonEmptyList.of(JobDone.topic, CancelJob.topic, jobsNotify)
}

View File

@ -12,9 +12,7 @@ import io.circe.Decoder
import io.circe.parser._
trait StringSyntax {
implicit class EvenMoreStringOps(s: String) {
def asNonBlank: Option[String] =
Option(s).filter(_.trim.nonEmpty)
@ -24,5 +22,6 @@ trait StringSyntax {
value <- json.as[A]
} yield value
}
}
object StringSyntax extends StringSyntax

View File

@ -12,7 +12,7 @@ import fs2.concurrent.SignallingRef
import docspell.analysis.TextAnalyser
import docspell.backend.fulltext.CreateIndex
import docspell.backend.msg.{CancelJob, Ping, Topics}
import docspell.backend.msg.{CancelJob, Topics}
import docspell.backend.ops._
import docspell.common._
import docspell.ftsclient.FtsClient
@ -53,9 +53,6 @@ final class JoexAppImpl[F[_]: Async](
val scheduler: Scheduler[F],
val periodicScheduler: PeriodicScheduler[F]
) extends JoexApp[F] {
private[this] val logger: Logger[F] =
Logger.log4s(org.log4s.getLogger(s"Joex-${cfg.appId.id}"))
def init: F[Unit] = {
val run = scheduler.start.compile.drain
val prun = periodicScheduler.start.compile.drain
@ -71,9 +68,6 @@ final class JoexAppImpl[F[_]: Async](
def subscriptions =
for {
_ <- Async[F].start(pubSubT.subscribeSink(Ping.topic) { msg =>
logger.info(s">>>> PING $msg")
})
_ <- Async[F].start(pubSubT.subscribeSink(Topics.jobsNotify) { _ =>
scheduler.notifyChange
})

View File

@ -26,30 +26,30 @@ import org.http4s.client.dsl.Http4sClientDsl
import org.http4s.dsl.Http4sDsl
import org.http4s.{HttpRoutes, Uri}
/** A pubsub implementation that can be used across machines, but uses a rather
* inefficient protocol. The reason is to make it work with the current setup, i.e. not
* requiring to add another complex piece of software to the mix, like Kafka or RabbitMQ.
/** A pubsub implementation that can be used across machines, using a rather inefficient
* but simple protocol. It can therefore work with the current setup, i.e. not requiring
* to add another complex piece of software to the mix, like Kafka or RabbitMQ.
*
* However, the api should allow to be used on top of such a tool. This implementation
* can be used in a personal setting, where there are only a few nodes.
*
* How it works:
* How it works: Each node has a set of local subscribers and a http endpoint. If it
* publishes a message, it notifies all local subscribers and sends out a json message to
* all endpoints that are registered for this topic. If it receives a messagen through
* its endpoint, it notifies all local subscribers.
*
* It is build on the `Topic` class from fs2.concurrent. A map of a topic name to such a
* It is build on the `Topic` class from fs2.concurrent. A map of the name to such a
* `Topic` instance is maintained. To work across machines, the database is used as a
* synchronization point. Each subscriber must provide a http api and so its "callback"
* URL is added into the database to the list of remote subscribers.
* synchronization point. Each node must provide a http api and so its "callback" URL is
* added into the database associated to a topic name.
*
* When publishing a message, the message can be published to the internal fs2 topic.
* Then all URLs to this topic name are looked up in the database and the message is
* POSTed to each URL as JSON. The endpoint of each machine takes this message and
* publishes it to its own internal fs2.concurrent.Topic instance.
*
* Obviously, there are drawbacks: it is slow, because the messages go through http and
* connections must be opened/closed etc and the database is hit as well. Then it doesn't
* scale to lots of machines and messages. The upside is, that it works with the current
* setup and it should be good enough for personal use, where there are only a small
* amount of machines and messages.
* Obviously, this doesn't scale well to lots of machines and messages. It should be good
* enough for personal use, where there are only a small amount of machines and messages.
*
* The main use case for docspell is to communicate between the rest-server and job
* executor. It is for internal communication and all topics are known at compile time.

View File

@ -38,12 +38,6 @@ object Main extends IOApp {
pools = connectEC.map(Pools.apply)
rc <-
pools.use(p =>
RestServer
.stream[IO](cfg, p)
.compile
.drain
.as(ExitCode.Success)
)
pools.use(p => RestServer.serve[IO](cfg, p))
} yield rc
}

View File

@ -7,10 +7,8 @@
package docspell.restserver
import cats.effect._
import cats.implicits._
import docspell.backend.BackendApp
import docspell.backend.msg.{JobDone, Ping}
import docspell.common.Logger
import docspell.ftsclient.FtsClient
import docspell.ftssolr.SolrFtsClient
@ -35,24 +33,10 @@ object RestAppImpl {
ftsClient <- createFtsClient(cfg)(httpClient)
pubSubT = PubSubT(pubSub, logger)
backend <- BackendApp.create[F](cfg.backend, store, ftsClient, pubSubT)
_ <- Resource.eval(subscriptions(backend, logger))
app = new RestAppImpl[F](cfg, backend)
} yield app
}
private def subscriptions[F[_]: Async](
backend: BackendApp[F],
logger: Logger[F]
): F[Unit] =
for {
_ <- Async[F].start(backend.pubSub.subscribeSink(Ping.topic) { msg =>
logger.info(s">>>> PING $msg")
})
_ <- Async[F].start(backend.pubSub.subscribeSink(JobDone.topic) { msg =>
logger.info(s">>>> Job Done $msg")
})
} yield ()
private def createFtsClient[F[_]: Async](
cfg: Config
)(client: Client[F]): Resource[F, FtsClient[F]] =

View File

@ -6,9 +6,12 @@
package docspell.restserver
import scala.concurrent.duration._
import cats.effect._
import cats.implicits._
import fs2.Stream
import fs2.concurrent.Topic
import docspell.backend.auth.{AuthToken, ShareToken}
import docspell.backend.msg.Topics
@ -19,6 +22,8 @@ import docspell.restserver.auth.OpenId
import docspell.restserver.http4s.EnvMiddleware
import docspell.restserver.routes._
import docspell.restserver.webapp._
import docspell.restserver.ws.OutputEvent.KeepAlive
import docspell.restserver.ws.{OutputEvent, WebSocketRoutes}
import docspell.store.Store
import org.http4s._
@ -30,63 +35,96 @@ import org.http4s.headers.Location
import org.http4s.implicits._
import org.http4s.server.Router
import org.http4s.server.middleware.Logger
import org.http4s.server.websocket.WebSocketBuilder2
object RestServer {
def stream[F[_]: Async](cfg: Config, pools: Pools): Stream[F, Nothing] = {
def serve[F[_]: Async](cfg: Config, pools: Pools): F[ExitCode] =
for {
wsTopic <- Topic[F, OutputEvent]
keepAlive = Stream
.awakeEvery[F](30.seconds)
.map(_ => KeepAlive)
.through(wsTopic.publish)
val templates = TemplateRoutes[F](cfg)
val app = for {
server =
Stream
.resource(createApp(cfg, pools))
.flatMap { case (restApp, pubSub, httpClient) =>
Stream(
Subscriptions(wsTopic, restApp.backend.pubSub),
BlazeServerBuilder[F]
.bindHttp(cfg.bind.port, cfg.bind.address)
.withoutBanner
.withHttpWebSocketApp(
createHttpApp(cfg, httpClient, pubSub, restApp, wsTopic)
)
.serve
.drain
)
}
exit <-
(server ++ Stream(keepAlive)).parJoinUnbounded.compile.drain.as(ExitCode.Success)
} yield exit
def createApp[F[_]: Async](
cfg: Config,
pools: Pools
): Resource[F, (RestApp[F], NaivePubSub[F], Client[F])] =
for {
httpClient <- BlazeClientBuilder[F].resource
store <- Store.create[F](
cfg.backend.jdbc,
cfg.backend.files.chunkSize,
pools.connectEC
)
httpClient <- BlazeClientBuilder[F].resource
pubSub <- NaivePubSub(cfg.pubSubConfig, store, httpClient)(Topics.all.map(_.topic))
restApp <- RestAppImpl.create[F](cfg, store, httpClient, pubSub)
httpClient <- BlazeClientBuilder[F].resource
httpApp = Router(
"/internal/pubsub" -> pubSub.receiveRoute,
"/api/info" -> routes.InfoRoutes(),
"/api/v1/open/" -> openRoutes(cfg, httpClient, restApp),
"/api/v1/sec/" -> Authenticate(restApp.backend.login, cfg.auth) { token =>
securedRoutes(cfg, restApp, token)
},
"/api/v1/admin" -> AdminAuth(cfg.adminEndpoint) {
adminRoutes(cfg, restApp)
},
"/api/v1/share" -> ShareAuth(restApp.backend.share, cfg.auth) { token =>
shareRoutes(cfg, restApp, token)
},
"/api/doc" -> templates.doc,
"/app/assets" -> EnvMiddleware(WebjarRoutes.appRoutes[F]),
"/app" -> EnvMiddleware(templates.app),
"/sw.js" -> EnvMiddleware(templates.serviceWorker),
"/" -> redirectTo("/app")
).orNotFound
} yield (restApp, pubSub, httpClient)
finalHttpApp = Logger.httpApp(logHeaders = false, logBody = false)(httpApp)
def createHttpApp[F[_]: Async](
cfg: Config,
httpClient: Client[F],
pubSub: NaivePubSub[F],
restApp: RestApp[F],
topic: Topic[F, OutputEvent]
)(
wsB: WebSocketBuilder2[F]
) = {
val templates = TemplateRoutes[F](cfg)
val httpApp = Router(
"/internal/pubsub" -> pubSub.receiveRoute,
"/api/info" -> routes.InfoRoutes(),
"/api/v1/open/" -> openRoutes(cfg, httpClient, restApp),
"/api/v1/sec/" -> Authenticate(restApp.backend.login, cfg.auth) { token =>
securedRoutes(cfg, restApp, wsB, topic, token)
},
"/api/v1/admin" -> AdminAuth(cfg.adminEndpoint) {
adminRoutes(cfg, restApp)
},
"/api/v1/share" -> ShareAuth(restApp.backend.share, cfg.auth) { token =>
shareRoutes(cfg, restApp, token)
},
"/api/doc" -> templates.doc,
"/app/assets" -> EnvMiddleware(WebjarRoutes.appRoutes[F]),
"/app" -> EnvMiddleware(templates.app),
"/sw.js" -> EnvMiddleware(templates.serviceWorker),
"/" -> redirectTo("/app")
).orNotFound
} yield finalHttpApp
Stream
.resource(app)
.flatMap(httpApp =>
BlazeServerBuilder[F]
.bindHttp(cfg.bind.port, cfg.bind.address)
.withHttpApp(httpApp)
.withoutBanner
.serve
)
}.drain
Logger.httpApp(logHeaders = false, logBody = false)(httpApp)
}
def securedRoutes[F[_]: Async](
cfg: Config,
restApp: RestApp[F],
wsB: WebSocketBuilder2[F],
topic: Topic[F, OutputEvent],
token: AuthToken
): HttpRoutes[F] =
Router(
"ws" -> WebSocketRoutes(token, topic, wsB),
"auth" -> LoginRoutes.session(restApp.backend.login, cfg, token),
"tag" -> TagRoutes(restApp.backend, token),
"equipment" -> EquipmentRoutes(restApp.backend, token),

View File

@ -0,0 +1,35 @@
/*
* Copyright 2020 Eike K. & Contributors
*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
package docspell.restserver
import fs2.Stream
import fs2.concurrent.Topic
import docspell.backend.msg.JobDone
import docspell.common._
import docspell.common.syntax.StringSyntax._
import docspell.pubsub.api.PubSubT
import docspell.restserver.ws.OutputEvent
/** Subscribes to those events from docspell that are forwarded to the websocket endpoints
*/
object Subscriptions {
def apply[F[_]](
wsTopic: Topic[F, OutputEvent],
pubSub: PubSubT[F]
): Stream[F, Nothing] =
jobDone(pubSub).through(wsTopic.publish)
def jobDone[F[_]](pubSub: PubSubT[F]): Stream[F, OutputEvent] =
pubSub
.subscribe(JobDone.topic)
.filter(m => m.body.task == ProcessItemArgs.taskName)
.map(m => m.body.args.parseJsonAs[ProcessItemArgs])
.collect { case Right(a) => OutputEvent.ItemProcessed(a.meta.collective) }
}

View File

@ -0,0 +1,11 @@
/*
* Copyright 2020 Eike K. & Contributors
*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
package docspell.restserver.ws
sealed trait InputMessage
object InputMessage {}

View File

@ -0,0 +1,32 @@
/*
* Copyright 2020 Eike K. & Contributors
*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
package docspell.restserver.ws
import docspell.backend.auth.AuthToken
import docspell.common._
sealed trait OutputEvent {
def forCollective(token: AuthToken): Boolean
def encode: String
}
object OutputEvent {
case object KeepAlive extends OutputEvent {
def forCollective(token: AuthToken): Boolean = true
def encode: String = "keep-alive"
}
final case class ItemProcessed(collective: Ident) extends OutputEvent {
def forCollective(token: AuthToken): Boolean =
token.account.collective == collective
def encode: String =
"item-processed"
}
}

View File

@ -0,0 +1,44 @@
/*
* Copyright 2020 Eike K. & Contributors
*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
package docspell.restserver.ws
import cats.effect.Async
import fs2.concurrent.Topic
import fs2.{Pipe, Stream}
import docspell.backend.auth.AuthToken
import org.http4s.HttpRoutes
import org.http4s.dsl.Http4sDsl
import org.http4s.server.websocket.WebSocketBuilder2
import org.http4s.websocket.WebSocketFrame
import org.http4s.websocket.WebSocketFrame.Text
object WebSocketRoutes {
def apply[F[_]: Async](
user: AuthToken,
topic: Topic[F, OutputEvent],
wsb: WebSocketBuilder2[F]
): HttpRoutes[F] = {
val dsl = new Http4sDsl[F] {}
import dsl._
HttpRoutes.of { case GET -> Root =>
val toClient: Stream[F, WebSocketFrame.Text] =
topic
.subscribe(500)
.filter(_.forCollective(user))
.map(msg => Text(msg.encode))
val toServer: Pipe[F, WebSocketFrame, Unit] =
_.map(_ => ())
wsb.build(toClient, toServer)
}
}
}

View File

@ -190,6 +190,7 @@ type Msg
| SetLanguage UiLanguage
| ClientSettingsSaveResp UiSettings (Result Http.Error BasicResult)
| ReceiveBrowserSettings StoredUiSettings
| ReceiveWsMessage String
defaultPage : Flags -> Page

View File

@ -308,6 +308,13 @@ updateWithSub msg model =
in
updateUserSettings texts lm model
ReceiveWsMessage data ->
let
_ =
Debug.log "WS-msg" data
in
( model, Cmd.none, Sub.none )
applyClientSettings : Messages -> Model -> UiSettings -> ( Model, Cmd Msg, Sub Msg )
applyClientSettings texts model settings =

View File

@ -93,4 +93,5 @@ subscriptions model =
Sub.batch
[ model.subs
, Ports.receiveUiSettings ReceiveBrowserSettings
, Ports.receiveWsMessage ReceiveWsMessage
]

View File

@ -11,6 +11,7 @@ port module Ports exposing
, printElement
, receiveCheckQueryResult
, receiveUiSettings
, receiveWsMessage
, removeAccount
, requestUiSettings
, setAccount
@ -55,6 +56,15 @@ and calls the print dialog.
port printElement : String -> Cmd msg
{-| Receives messages from the websocket.
-}
port receiveWsMessage : (String -> msg) -> Sub msg
--- Higher level functions based on ports
setUiTheme : UiTheme -> Cmd msg
setUiTheme theme =
internalSetUiTheme (Data.UiTheme.toString theme)

View File

@ -135,3 +135,13 @@ elmApp.ports.printElement.subscribe(function(id) {
}
}
});
var socket = new WebSocket('ws://localhost:7880/api/v1/sec/ws');
socket.addEventListener("message", function(event) {
if (event.data != "keep-alive" && event.data) {
elmApp.ports.receiveWsMessage.send(event.data);
}
});
// elmApp.ports.sendWsMessage.subscribe(function(msg) {
// socket.send(msg);
// });