diff --git a/modules/backend/src/main/scala/docspell/backend/msg/Ping.scala b/modules/backend/src/main/scala/docspell/backend/msg/Ping.scala deleted file mode 100644 index b300a76d..00000000 --- a/modules/backend/src/main/scala/docspell/backend/msg/Ping.scala +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Copyright 2020 Eike K. & Contributors - * - * SPDX-License-Identifier: AGPL-3.0-or-later - */ - -package docspell.backend.msg - -import java.util.concurrent.atomic.AtomicLong - -import docspell.pubsub.api.{Topic, TypedTopic} - -import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder} -import io.circe.{Decoder, Encoder} - -final case class Ping(sender: String, num: Long) - -object Ping { - implicit val jsonDecoder: Decoder[Ping] = - deriveDecoder[Ping] - - implicit val jsonEncoder: Encoder[Ping] = - deriveEncoder[Ping] - - private[this] val counter = new AtomicLong(0) - def next(sender: String): Ping = - Ping(sender, counter.getAndIncrement()) - - val topic: TypedTopic[Ping] = - TypedTopic[Ping](Topic("ping")) -} diff --git a/modules/backend/src/main/scala/docspell/backend/msg/Topics.scala b/modules/backend/src/main/scala/docspell/backend/msg/Topics.scala index 996ff7c6..26c594f7 100644 --- a/modules/backend/src/main/scala/docspell/backend/msg/Topics.scala +++ b/modules/backend/src/main/scala/docspell/backend/msg/Topics.scala @@ -19,5 +19,5 @@ object Topics { /** A list of all topics. It is required to list every topic in use here! */ val all: NonEmptyList[TypedTopic[_]] = - NonEmptyList.of(Ping.topic, JobDone.topic, CancelJob.topic, jobsNotify) + NonEmptyList.of(JobDone.topic, CancelJob.topic, jobsNotify) } diff --git a/modules/common/src/main/scala/docspell/common/syntax/StringSyntax.scala b/modules/common/src/main/scala/docspell/common/syntax/StringSyntax.scala index 6fc06440..40d5bf80 100644 --- a/modules/common/src/main/scala/docspell/common/syntax/StringSyntax.scala +++ b/modules/common/src/main/scala/docspell/common/syntax/StringSyntax.scala @@ -12,9 +12,7 @@ import io.circe.Decoder import io.circe.parser._ trait StringSyntax { - implicit class EvenMoreStringOps(s: String) { - def asNonBlank: Option[String] = Option(s).filter(_.trim.nonEmpty) @@ -24,5 +22,6 @@ trait StringSyntax { value <- json.as[A] } yield value } - } + +object StringSyntax extends StringSyntax diff --git a/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala b/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala index 85359092..2ec86d11 100644 --- a/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala +++ b/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala @@ -12,7 +12,7 @@ import fs2.concurrent.SignallingRef import docspell.analysis.TextAnalyser import docspell.backend.fulltext.CreateIndex -import docspell.backend.msg.{CancelJob, Ping, Topics} +import docspell.backend.msg.{CancelJob, Topics} import docspell.backend.ops._ import docspell.common._ import docspell.ftsclient.FtsClient @@ -53,9 +53,6 @@ final class JoexAppImpl[F[_]: Async]( val scheduler: Scheduler[F], val periodicScheduler: PeriodicScheduler[F] ) extends JoexApp[F] { - private[this] val logger: Logger[F] = - Logger.log4s(org.log4s.getLogger(s"Joex-${cfg.appId.id}")) - def init: F[Unit] = { val run = scheduler.start.compile.drain val prun = periodicScheduler.start.compile.drain @@ -71,9 +68,6 @@ final class JoexAppImpl[F[_]: Async]( def subscriptions = for { - _ <- Async[F].start(pubSubT.subscribeSink(Ping.topic) { msg => - logger.info(s">>>> PING $msg") - }) _ <- Async[F].start(pubSubT.subscribeSink(Topics.jobsNotify) { _ => scheduler.notifyChange }) diff --git a/modules/pubsub/naive/src/main/scala/docspell/pubsub/naive/NaivePubSub.scala b/modules/pubsub/naive/src/main/scala/docspell/pubsub/naive/NaivePubSub.scala index cb7370ad..16ad7096 100644 --- a/modules/pubsub/naive/src/main/scala/docspell/pubsub/naive/NaivePubSub.scala +++ b/modules/pubsub/naive/src/main/scala/docspell/pubsub/naive/NaivePubSub.scala @@ -26,30 +26,30 @@ import org.http4s.client.dsl.Http4sClientDsl import org.http4s.dsl.Http4sDsl import org.http4s.{HttpRoutes, Uri} -/** A pubsub implementation that can be used across machines, but uses a rather - * inefficient protocol. The reason is to make it work with the current setup, i.e. not - * requiring to add another complex piece of software to the mix, like Kafka or RabbitMQ. +/** A pubsub implementation that can be used across machines, using a rather inefficient + * but simple protocol. It can therefore work with the current setup, i.e. not requiring + * to add another complex piece of software to the mix, like Kafka or RabbitMQ. * * However, the api should allow to be used on top of such a tool. This implementation * can be used in a personal setting, where there are only a few nodes. * - * How it works: + * How it works: Each node has a set of local subscribers and a http endpoint. If it + * publishes a message, it notifies all local subscribers and sends out a json message to + * all endpoints that are registered for this topic. If it receives a messagen through + * its endpoint, it notifies all local subscribers. * - * It is build on the `Topic` class from fs2.concurrent. A map of a topic name to such a + * It is build on the `Topic` class from fs2.concurrent. A map of the name to such a * `Topic` instance is maintained. To work across machines, the database is used as a - * synchronization point. Each subscriber must provide a http api and so its "callback" - * URL is added into the database to the list of remote subscribers. + * synchronization point. Each node must provide a http api and so its "callback" URL is + * added into the database associated to a topic name. * * When publishing a message, the message can be published to the internal fs2 topic. * Then all URLs to this topic name are looked up in the database and the message is * POSTed to each URL as JSON. The endpoint of each machine takes this message and * publishes it to its own internal fs2.concurrent.Topic instance. * - * Obviously, there are drawbacks: it is slow, because the messages go through http and - * connections must be opened/closed etc and the database is hit as well. Then it doesn't - * scale to lots of machines and messages. The upside is, that it works with the current - * setup and it should be good enough for personal use, where there are only a small - * amount of machines and messages. + * Obviously, this doesn't scale well to lots of machines and messages. It should be good + * enough for personal use, where there are only a small amount of machines and messages. * * The main use case for docspell is to communicate between the rest-server and job * executor. It is for internal communication and all topics are known at compile time. diff --git a/modules/restserver/src/main/scala/docspell/restserver/Main.scala b/modules/restserver/src/main/scala/docspell/restserver/Main.scala index 5907ff41..61844bdd 100644 --- a/modules/restserver/src/main/scala/docspell/restserver/Main.scala +++ b/modules/restserver/src/main/scala/docspell/restserver/Main.scala @@ -38,12 +38,6 @@ object Main extends IOApp { pools = connectEC.map(Pools.apply) rc <- - pools.use(p => - RestServer - .stream[IO](cfg, p) - .compile - .drain - .as(ExitCode.Success) - ) + pools.use(p => RestServer.serve[IO](cfg, p)) } yield rc } diff --git a/modules/restserver/src/main/scala/docspell/restserver/RestAppImpl.scala b/modules/restserver/src/main/scala/docspell/restserver/RestAppImpl.scala index 313ec05e..65a5ad5e 100644 --- a/modules/restserver/src/main/scala/docspell/restserver/RestAppImpl.scala +++ b/modules/restserver/src/main/scala/docspell/restserver/RestAppImpl.scala @@ -7,10 +7,8 @@ package docspell.restserver import cats.effect._ -import cats.implicits._ import docspell.backend.BackendApp -import docspell.backend.msg.{JobDone, Ping} import docspell.common.Logger import docspell.ftsclient.FtsClient import docspell.ftssolr.SolrFtsClient @@ -35,24 +33,10 @@ object RestAppImpl { ftsClient <- createFtsClient(cfg)(httpClient) pubSubT = PubSubT(pubSub, logger) backend <- BackendApp.create[F](cfg.backend, store, ftsClient, pubSubT) - _ <- Resource.eval(subscriptions(backend, logger)) app = new RestAppImpl[F](cfg, backend) } yield app } - private def subscriptions[F[_]: Async]( - backend: BackendApp[F], - logger: Logger[F] - ): F[Unit] = - for { - _ <- Async[F].start(backend.pubSub.subscribeSink(Ping.topic) { msg => - logger.info(s">>>> PING $msg") - }) - _ <- Async[F].start(backend.pubSub.subscribeSink(JobDone.topic) { msg => - logger.info(s">>>> Job Done $msg") - }) - } yield () - private def createFtsClient[F[_]: Async]( cfg: Config )(client: Client[F]): Resource[F, FtsClient[F]] = diff --git a/modules/restserver/src/main/scala/docspell/restserver/RestServer.scala b/modules/restserver/src/main/scala/docspell/restserver/RestServer.scala index e5531fbe..707c7aeb 100644 --- a/modules/restserver/src/main/scala/docspell/restserver/RestServer.scala +++ b/modules/restserver/src/main/scala/docspell/restserver/RestServer.scala @@ -6,9 +6,12 @@ package docspell.restserver +import scala.concurrent.duration._ + import cats.effect._ import cats.implicits._ import fs2.Stream +import fs2.concurrent.Topic import docspell.backend.auth.{AuthToken, ShareToken} import docspell.backend.msg.Topics @@ -19,6 +22,8 @@ import docspell.restserver.auth.OpenId import docspell.restserver.http4s.EnvMiddleware import docspell.restserver.routes._ import docspell.restserver.webapp._ +import docspell.restserver.ws.OutputEvent.KeepAlive +import docspell.restserver.ws.{OutputEvent, WebSocketRoutes} import docspell.store.Store import org.http4s._ @@ -30,63 +35,96 @@ import org.http4s.headers.Location import org.http4s.implicits._ import org.http4s.server.Router import org.http4s.server.middleware.Logger +import org.http4s.server.websocket.WebSocketBuilder2 object RestServer { - def stream[F[_]: Async](cfg: Config, pools: Pools): Stream[F, Nothing] = { + def serve[F[_]: Async](cfg: Config, pools: Pools): F[ExitCode] = + for { + wsTopic <- Topic[F, OutputEvent] + keepAlive = Stream + .awakeEvery[F](30.seconds) + .map(_ => KeepAlive) + .through(wsTopic.publish) - val templates = TemplateRoutes[F](cfg) - val app = for { + server = + Stream + .resource(createApp(cfg, pools)) + .flatMap { case (restApp, pubSub, httpClient) => + Stream( + Subscriptions(wsTopic, restApp.backend.pubSub), + BlazeServerBuilder[F] + .bindHttp(cfg.bind.port, cfg.bind.address) + .withoutBanner + .withHttpWebSocketApp( + createHttpApp(cfg, httpClient, pubSub, restApp, wsTopic) + ) + .serve + .drain + ) + } + + exit <- + (server ++ Stream(keepAlive)).parJoinUnbounded.compile.drain.as(ExitCode.Success) + } yield exit + + def createApp[F[_]: Async]( + cfg: Config, + pools: Pools + ): Resource[F, (RestApp[F], NaivePubSub[F], Client[F])] = + for { + httpClient <- BlazeClientBuilder[F].resource store <- Store.create[F]( cfg.backend.jdbc, cfg.backend.files.chunkSize, pools.connectEC ) - httpClient <- BlazeClientBuilder[F].resource pubSub <- NaivePubSub(cfg.pubSubConfig, store, httpClient)(Topics.all.map(_.topic)) restApp <- RestAppImpl.create[F](cfg, store, httpClient, pubSub) - httpClient <- BlazeClientBuilder[F].resource - httpApp = Router( - "/internal/pubsub" -> pubSub.receiveRoute, - "/api/info" -> routes.InfoRoutes(), - "/api/v1/open/" -> openRoutes(cfg, httpClient, restApp), - "/api/v1/sec/" -> Authenticate(restApp.backend.login, cfg.auth) { token => - securedRoutes(cfg, restApp, token) - }, - "/api/v1/admin" -> AdminAuth(cfg.adminEndpoint) { - adminRoutes(cfg, restApp) - }, - "/api/v1/share" -> ShareAuth(restApp.backend.share, cfg.auth) { token => - shareRoutes(cfg, restApp, token) - }, - "/api/doc" -> templates.doc, - "/app/assets" -> EnvMiddleware(WebjarRoutes.appRoutes[F]), - "/app" -> EnvMiddleware(templates.app), - "/sw.js" -> EnvMiddleware(templates.serviceWorker), - "/" -> redirectTo("/app") - ).orNotFound + } yield (restApp, pubSub, httpClient) - finalHttpApp = Logger.httpApp(logHeaders = false, logBody = false)(httpApp) + def createHttpApp[F[_]: Async]( + cfg: Config, + httpClient: Client[F], + pubSub: NaivePubSub[F], + restApp: RestApp[F], + topic: Topic[F, OutputEvent] + )( + wsB: WebSocketBuilder2[F] + ) = { + val templates = TemplateRoutes[F](cfg) + val httpApp = Router( + "/internal/pubsub" -> pubSub.receiveRoute, + "/api/info" -> routes.InfoRoutes(), + "/api/v1/open/" -> openRoutes(cfg, httpClient, restApp), + "/api/v1/sec/" -> Authenticate(restApp.backend.login, cfg.auth) { token => + securedRoutes(cfg, restApp, wsB, topic, token) + }, + "/api/v1/admin" -> AdminAuth(cfg.adminEndpoint) { + adminRoutes(cfg, restApp) + }, + "/api/v1/share" -> ShareAuth(restApp.backend.share, cfg.auth) { token => + shareRoutes(cfg, restApp, token) + }, + "/api/doc" -> templates.doc, + "/app/assets" -> EnvMiddleware(WebjarRoutes.appRoutes[F]), + "/app" -> EnvMiddleware(templates.app), + "/sw.js" -> EnvMiddleware(templates.serviceWorker), + "/" -> redirectTo("/app") + ).orNotFound - } yield finalHttpApp - - Stream - .resource(app) - .flatMap(httpApp => - BlazeServerBuilder[F] - .bindHttp(cfg.bind.port, cfg.bind.address) - .withHttpApp(httpApp) - .withoutBanner - .serve - ) - }.drain + Logger.httpApp(logHeaders = false, logBody = false)(httpApp) + } def securedRoutes[F[_]: Async]( cfg: Config, restApp: RestApp[F], + wsB: WebSocketBuilder2[F], + topic: Topic[F, OutputEvent], token: AuthToken ): HttpRoutes[F] = Router( + "ws" -> WebSocketRoutes(token, topic, wsB), "auth" -> LoginRoutes.session(restApp.backend.login, cfg, token), "tag" -> TagRoutes(restApp.backend, token), "equipment" -> EquipmentRoutes(restApp.backend, token), diff --git a/modules/restserver/src/main/scala/docspell/restserver/Subscriptions.scala b/modules/restserver/src/main/scala/docspell/restserver/Subscriptions.scala new file mode 100644 index 00000000..a574f0f2 --- /dev/null +++ b/modules/restserver/src/main/scala/docspell/restserver/Subscriptions.scala @@ -0,0 +1,35 @@ +/* + * Copyright 2020 Eike K. & Contributors + * + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +package docspell.restserver + +import fs2.Stream +import fs2.concurrent.Topic + +import docspell.backend.msg.JobDone +import docspell.common._ +import docspell.common.syntax.StringSyntax._ +import docspell.pubsub.api.PubSubT +import docspell.restserver.ws.OutputEvent + +/** Subscribes to those events from docspell that are forwarded to the websocket endpoints + */ +object Subscriptions { + + def apply[F[_]]( + wsTopic: Topic[F, OutputEvent], + pubSub: PubSubT[F] + ): Stream[F, Nothing] = + jobDone(pubSub).through(wsTopic.publish) + + def jobDone[F[_]](pubSub: PubSubT[F]): Stream[F, OutputEvent] = + pubSub + .subscribe(JobDone.topic) + .filter(m => m.body.task == ProcessItemArgs.taskName) + .map(m => m.body.args.parseJsonAs[ProcessItemArgs]) + .collect { case Right(a) => OutputEvent.ItemProcessed(a.meta.collective) } + +} diff --git a/modules/restserver/src/main/scala/docspell/restserver/ws/InputMessage.scala b/modules/restserver/src/main/scala/docspell/restserver/ws/InputMessage.scala new file mode 100644 index 00000000..4ea8dc02 --- /dev/null +++ b/modules/restserver/src/main/scala/docspell/restserver/ws/InputMessage.scala @@ -0,0 +1,11 @@ +/* + * Copyright 2020 Eike K. & Contributors + * + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +package docspell.restserver.ws + +sealed trait InputMessage + +object InputMessage {} diff --git a/modules/restserver/src/main/scala/docspell/restserver/ws/OutputEvent.scala b/modules/restserver/src/main/scala/docspell/restserver/ws/OutputEvent.scala new file mode 100644 index 00000000..8567f3a7 --- /dev/null +++ b/modules/restserver/src/main/scala/docspell/restserver/ws/OutputEvent.scala @@ -0,0 +1,32 @@ +/* + * Copyright 2020 Eike K. & Contributors + * + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +package docspell.restserver.ws + +import docspell.backend.auth.AuthToken +import docspell.common._ + +sealed trait OutputEvent { + def forCollective(token: AuthToken): Boolean + def encode: String +} + +object OutputEvent { + + case object KeepAlive extends OutputEvent { + def forCollective(token: AuthToken): Boolean = true + def encode: String = "keep-alive" + } + + final case class ItemProcessed(collective: Ident) extends OutputEvent { + def forCollective(token: AuthToken): Boolean = + token.account.collective == collective + + def encode: String = + "item-processed" + } + +} diff --git a/modules/restserver/src/main/scala/docspell/restserver/ws/WebSocketRoutes.scala b/modules/restserver/src/main/scala/docspell/restserver/ws/WebSocketRoutes.scala new file mode 100644 index 00000000..022420b2 --- /dev/null +++ b/modules/restserver/src/main/scala/docspell/restserver/ws/WebSocketRoutes.scala @@ -0,0 +1,44 @@ +/* + * Copyright 2020 Eike K. & Contributors + * + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +package docspell.restserver.ws + +import cats.effect.Async +import fs2.concurrent.Topic +import fs2.{Pipe, Stream} + +import docspell.backend.auth.AuthToken + +import org.http4s.HttpRoutes +import org.http4s.dsl.Http4sDsl +import org.http4s.server.websocket.WebSocketBuilder2 +import org.http4s.websocket.WebSocketFrame +import org.http4s.websocket.WebSocketFrame.Text + +object WebSocketRoutes { + + def apply[F[_]: Async]( + user: AuthToken, + topic: Topic[F, OutputEvent], + wsb: WebSocketBuilder2[F] + ): HttpRoutes[F] = { + val dsl = new Http4sDsl[F] {} + import dsl._ + + HttpRoutes.of { case GET -> Root => + val toClient: Stream[F, WebSocketFrame.Text] = + topic + .subscribe(500) + .filter(_.forCollective(user)) + .map(msg => Text(msg.encode)) + + val toServer: Pipe[F, WebSocketFrame, Unit] = + _.map(_ => ()) + + wsb.build(toClient, toServer) + } + } +} diff --git a/modules/webapp/src/main/elm/App/Data.elm b/modules/webapp/src/main/elm/App/Data.elm index c64c192a..73a813f4 100644 --- a/modules/webapp/src/main/elm/App/Data.elm +++ b/modules/webapp/src/main/elm/App/Data.elm @@ -190,6 +190,7 @@ type Msg | SetLanguage UiLanguage | ClientSettingsSaveResp UiSettings (Result Http.Error BasicResult) | ReceiveBrowserSettings StoredUiSettings + | ReceiveWsMessage String defaultPage : Flags -> Page diff --git a/modules/webapp/src/main/elm/App/Update.elm b/modules/webapp/src/main/elm/App/Update.elm index 47d7ad9b..f3b7e22f 100644 --- a/modules/webapp/src/main/elm/App/Update.elm +++ b/modules/webapp/src/main/elm/App/Update.elm @@ -308,6 +308,13 @@ updateWithSub msg model = in updateUserSettings texts lm model + ReceiveWsMessage data -> + let + _ = + Debug.log "WS-msg" data + in + ( model, Cmd.none, Sub.none ) + applyClientSettings : Messages -> Model -> UiSettings -> ( Model, Cmd Msg, Sub Msg ) applyClientSettings texts model settings = diff --git a/modules/webapp/src/main/elm/Main.elm b/modules/webapp/src/main/elm/Main.elm index a5b7eda3..be80a076 100644 --- a/modules/webapp/src/main/elm/Main.elm +++ b/modules/webapp/src/main/elm/Main.elm @@ -93,4 +93,5 @@ subscriptions model = Sub.batch [ model.subs , Ports.receiveUiSettings ReceiveBrowserSettings + , Ports.receiveWsMessage ReceiveWsMessage ] diff --git a/modules/webapp/src/main/elm/Ports.elm b/modules/webapp/src/main/elm/Ports.elm index 233e6c63..c366bccc 100644 --- a/modules/webapp/src/main/elm/Ports.elm +++ b/modules/webapp/src/main/elm/Ports.elm @@ -11,6 +11,7 @@ port module Ports exposing , printElement , receiveCheckQueryResult , receiveUiSettings + , receiveWsMessage , removeAccount , requestUiSettings , setAccount @@ -55,6 +56,15 @@ and calls the print dialog. port printElement : String -> Cmd msg +{-| Receives messages from the websocket. +-} +port receiveWsMessage : (String -> msg) -> Sub msg + + + +--- Higher level functions based on ports + + setUiTheme : UiTheme -> Cmd msg setUiTheme theme = internalSetUiTheme (Data.UiTheme.toString theme) diff --git a/modules/webapp/src/main/webjar/docspell.js b/modules/webapp/src/main/webjar/docspell.js index ef4ef03c..e70a00bd 100644 --- a/modules/webapp/src/main/webjar/docspell.js +++ b/modules/webapp/src/main/webjar/docspell.js @@ -135,3 +135,13 @@ elmApp.ports.printElement.subscribe(function(id) { } } }); + +var socket = new WebSocket('ws://localhost:7880/api/v1/sec/ws'); +socket.addEventListener("message", function(event) { + if (event.data != "keep-alive" && event.data) { + elmApp.ports.receiveWsMessage.send(event.data); + } +}); +// elmApp.ports.sendWsMessage.subscribe(function(msg) { +// socket.send(msg); +// });