From d483d9f176e63f4e8a9ed60c8c72bc00902cb4e8 Mon Sep 17 00:00:00 2001 From: eikek Date: Tue, 2 Nov 2021 00:41:16 +0100 Subject: [PATCH 1/7] Initial naive pubsub impl generalising from current setup --- build.sbt | 34 ++- .../scala/docspell/pubsub/api/Message.scala | 28 +++ .../docspell/pubsub/api/MessageHead.scala | 22 ++ .../scala/docspell/pubsub/api/PubSub.scala | 20 ++ .../scala/docspell/pubsub/api/PubSubT.scala | 57 +++++ .../scala/docspell/pubsub/api/Topic.scala | 26 +++ .../scala/docspell/pubsub/api/Topics.scala | 47 ++++ .../docspell/pubsub/api/TypedTopic.scala | 29 +++ .../docspell/pubsub/naive/NaivePubSub.scala | 212 ++++++++++++++++++ .../docspell/pubsub/naive/PubSubConfig.scala | 11 + .../docspell/pubsub/naive/Fixtures.scala | 85 +++++++ .../docspell/pubsub/naive/HttpClientOps.scala | 55 +++++ .../pubsub/naive/NaivePubSubTest.scala | 128 +++++++++++ .../db/migration/h2/V1.28.0__pubsub.sql | 8 + .../db/migration/mariadb/V1.28.0__pubsub.sql | 8 + .../migration/postgresql/V1.28.0__pubsub.sql | 8 + .../docspell/store/records/RPubSub.scala | 86 +++++++ .../store/src/test/resources/logback-test.xml | 2 +- 18 files changed, 864 insertions(+), 2 deletions(-) create mode 100644 modules/pubsub/api/src/main/scala/docspell/pubsub/api/Message.scala create mode 100644 modules/pubsub/api/src/main/scala/docspell/pubsub/api/MessageHead.scala create mode 100644 modules/pubsub/api/src/main/scala/docspell/pubsub/api/PubSub.scala create mode 100644 modules/pubsub/api/src/main/scala/docspell/pubsub/api/PubSubT.scala create mode 100644 modules/pubsub/api/src/main/scala/docspell/pubsub/api/Topic.scala create mode 100644 modules/pubsub/api/src/main/scala/docspell/pubsub/api/Topics.scala create mode 100644 modules/pubsub/api/src/main/scala/docspell/pubsub/api/TypedTopic.scala create mode 100644 modules/pubsub/naive/src/main/scala/docspell/pubsub/naive/NaivePubSub.scala create mode 100644 modules/pubsub/naive/src/main/scala/docspell/pubsub/naive/PubSubConfig.scala create mode 100644 modules/pubsub/naive/src/test/scala/docspell/pubsub/naive/Fixtures.scala create mode 100644 modules/pubsub/naive/src/test/scala/docspell/pubsub/naive/HttpClientOps.scala create mode 100644 modules/pubsub/naive/src/test/scala/docspell/pubsub/naive/NaivePubSubTest.scala create mode 100644 modules/store/src/main/resources/db/migration/h2/V1.28.0__pubsub.sql create mode 100644 modules/store/src/main/resources/db/migration/mariadb/V1.28.0__pubsub.sql create mode 100644 modules/store/src/main/resources/db/migration/postgresql/V1.28.0__pubsub.sql create mode 100644 modules/store/src/main/scala/docspell/store/records/RPubSub.scala diff --git a/build.sbt b/build.sbt index 82ebe6c2..b47c36ce 100644 --- a/build.sbt +++ b/build.sbt @@ -410,6 +410,36 @@ val store = project ) .dependsOn(common, query.jvm, totp, files) +val pubsubApi = project + .in(file("modules/pubsub/api")) + .disablePlugins(RevolverPlugin) + .settings(sharedSettings) + .settings(testSettingsMUnit) + .settings( + name := "docspell-pubsub-api", + addCompilerPlugin(Dependencies.kindProjectorPlugin), + libraryDependencies ++= + Dependencies.fs2 + ) + .dependsOn(common) + +val pubsubNaive = project + .in(file("modules/pubsub/naive")) + .disablePlugins(RevolverPlugin) + .settings(sharedSettings) + .settings(testSettingsMUnit) + .settings( + name := "docspell-pubsub-naive", + addCompilerPlugin(Dependencies.kindProjectorPlugin), + libraryDependencies ++= + Dependencies.fs2 ++ + Dependencies.http4sCirce ++ + Dependencies.http4sDsl ++ + Dependencies.http4sClient ++ + Dependencies.circe + ) + .dependsOn(common, pubsubApi, store % "compile->compile;test->test") + val extract = project .in(file("modules/extract")) .disablePlugins(RevolverPlugin) @@ -781,7 +811,9 @@ val root = project query.jvm, query.js, totp, - oidc + oidc, + pubsubApi, + pubsubNaive ) // --- Helpers diff --git a/modules/pubsub/api/src/main/scala/docspell/pubsub/api/Message.scala b/modules/pubsub/api/src/main/scala/docspell/pubsub/api/Message.scala new file mode 100644 index 00000000..c6d27c02 --- /dev/null +++ b/modules/pubsub/api/src/main/scala/docspell/pubsub/api/Message.scala @@ -0,0 +1,28 @@ +/* + * Copyright 2020 Eike K. & Contributors + * + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +package docspell.pubsub.api + +import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder} +import io.circe.{Decoder, Encoder, Json} + +final case class Message[A](head: MessageHead, body: A) {} + +object Message { + implicit val jsonDecoderJson: Decoder[Message[Json]] = + deriveDecoder[Message[Json]] + + implicit val jsonEncoderJson: Encoder[Message[Json]] = + deriveEncoder[Message[Json]] + + implicit def jsonDecoder[A](implicit da: Decoder[A]): Decoder[Message[A]] = + jsonDecoderJson.emap(mj => + da.decodeJson(mj.body).map(b => mj.copy(body = b)).left.map(_.message) + ) + + implicit def jsonEncoder[A](implicit ea: Encoder[A]): Encoder[Message[A]] = + jsonEncoderJson.contramap(m => m.copy(body = ea(m.body))) +} diff --git a/modules/pubsub/api/src/main/scala/docspell/pubsub/api/MessageHead.scala b/modules/pubsub/api/src/main/scala/docspell/pubsub/api/MessageHead.scala new file mode 100644 index 00000000..c604b52d --- /dev/null +++ b/modules/pubsub/api/src/main/scala/docspell/pubsub/api/MessageHead.scala @@ -0,0 +1,22 @@ +/* + * Copyright 2020 Eike K. & Contributors + * + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +package docspell.pubsub.api + +import docspell.common.{Ident, Timestamp} + +import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder} +import io.circe.{Decoder, Encoder} + +final case class MessageHead(id: Ident, send: Timestamp, topic: Topic) + +object MessageHead { + implicit val jsonDecoder: Decoder[MessageHead] = + deriveDecoder[MessageHead] + + implicit val jsonEncoder: Encoder[MessageHead] = + deriveEncoder[MessageHead] +} diff --git a/modules/pubsub/api/src/main/scala/docspell/pubsub/api/PubSub.scala b/modules/pubsub/api/src/main/scala/docspell/pubsub/api/PubSub.scala new file mode 100644 index 00000000..4ae9ef6a --- /dev/null +++ b/modules/pubsub/api/src/main/scala/docspell/pubsub/api/PubSub.scala @@ -0,0 +1,20 @@ +/* + * Copyright 2020 Eike K. & Contributors + * + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +package docspell.pubsub.api + +import cats.data.NonEmptyList +import fs2.{Pipe, Stream} + +import io.circe.Json + +trait PubSub[F[_]] { + def publish1(topic: Topic, msg: Json): F[MessageHead] + + def publish(topic: Topic): Pipe[F, Json, MessageHead] + + def subscribe(topics: NonEmptyList[Topic]): Stream[F, Message[Json]] +} diff --git a/modules/pubsub/api/src/main/scala/docspell/pubsub/api/PubSubT.scala b/modules/pubsub/api/src/main/scala/docspell/pubsub/api/PubSubT.scala new file mode 100644 index 00000000..176e8885 --- /dev/null +++ b/modules/pubsub/api/src/main/scala/docspell/pubsub/api/PubSubT.scala @@ -0,0 +1,57 @@ +/* + * Copyright 2020 Eike K. & Contributors + * + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +package docspell.pubsub.api + +import cats.data.NonEmptyList +import fs2.{Pipe, Stream} + +import docspell.common.Logger + +trait PubSubT[F[_], P <: PubSub[F]] { + + def publish1[A](topic: TypedTopic[A], msg: A): F[MessageHead] + + def publish[A](topic: TypedTopic[A]): Pipe[F, A, MessageHead] + + def subscribe[A](topic: TypedTopic[A]): Stream[F, Message[A]] + + def delegate: P + + def withDelegate[R <: PubSub[F]](delegate: R): PubSubT[F, R] +} + +object PubSubT { + + def apply[F[_], P <: PubSub[F]](pubSub: P, logger: Logger[F]): PubSubT[F, P] = + new PubSubT[F, P] { + def publish1[A](topic: TypedTopic[A], msg: A): F[MessageHead] = + pubSub.publish1(topic.topic, topic.codec(msg)) + + def publish[A](topic: TypedTopic[A]): Pipe[F, A, MessageHead] = + _.map(topic.codec.apply).through(pubSub.publish(topic.topic)) + + def subscribe[A](topic: TypedTopic[A]): Stream[F, Message[A]] = + pubSub + .subscribe(NonEmptyList.of(topic.topic)) + .flatMap(m => + m.body.as[A](topic.codec) match { + case Right(a) => Stream.emit(Message(m.head, a)) + case Left(err) => + logger.s + .error(err)( + s"Could not decode message to topic ${topic.name} to ${topic.msgClass}: ${m.body.noSpaces}" + ) + .drain + } + ) + + def delegate: P = pubSub + + def withDelegate[R <: PubSub[F]](newDelegate: R): PubSubT[F, R] = + PubSubT(newDelegate, logger) + } +} diff --git a/modules/pubsub/api/src/main/scala/docspell/pubsub/api/Topic.scala b/modules/pubsub/api/src/main/scala/docspell/pubsub/api/Topic.scala new file mode 100644 index 00000000..ab9d664b --- /dev/null +++ b/modules/pubsub/api/src/main/scala/docspell/pubsub/api/Topic.scala @@ -0,0 +1,26 @@ +/* + * Copyright 2020 Eike K. & Contributors + * + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +package docspell.pubsub.api + +import io.circe.{Decoder, Encoder} + +final case class Topic private (topic: String) { + def name: String = topic +} + +object Topic { + implicit val jsonDecoder: Decoder[Topic] = + Decoder.decodeString.map(Topic.apply) + + implicit val jsonEncoder: Encoder[Topic] = + Encoder.encodeString.contramap(_.topic) + + def apply(name: String): Topic = { + require(name.trim.nonEmpty) + new Topic(name) + } +} diff --git a/modules/pubsub/api/src/main/scala/docspell/pubsub/api/Topics.scala b/modules/pubsub/api/src/main/scala/docspell/pubsub/api/Topics.scala new file mode 100644 index 00000000..78b22408 --- /dev/null +++ b/modules/pubsub/api/src/main/scala/docspell/pubsub/api/Topics.scala @@ -0,0 +1,47 @@ +/* + * Copyright 2020 Eike K. & Contributors + * + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +package docspell.pubsub.api + +import cats.data.NonEmptyList + +import docspell.common.Ident + +import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder} +import io.circe.{Decoder, Encoder} + +/** All topics used in Docspell. */ +object Topics { + + /** Notify when a job has finished. */ + val jobDone: TypedTopic[JobDoneMsg] = TypedTopic[JobDoneMsg](Topic("job-done")) + + /** Notify when a job has been submitted. The job executor listens to these messages to + * wake up and do its work. + */ + val jobSubmitted: TypedTopic[JobSubmittedMsg] = + TypedTopic[JobSubmittedMsg](Topic("job-submitted")) + + val all: NonEmptyList[TypedTopic[_]] = NonEmptyList.of(jobDone, jobSubmitted) + + final case class JobSubmittedMsg(id: Ident) + object JobSubmittedMsg { + implicit val jsonDecoder: Decoder[JobSubmittedMsg] = + deriveDecoder[JobSubmittedMsg] + + implicit val jsonEncoder: Encoder[JobSubmittedMsg] = + deriveEncoder[JobSubmittedMsg] + } + + final case class JobDoneMsg(jobId: Ident, task: Ident) + object JobDoneMsg { + implicit val jsonDecoder: Decoder[JobDoneMsg] = + deriveDecoder[JobDoneMsg] + + implicit val jsonEncoder: Encoder[JobDoneMsg] = + deriveEncoder[JobDoneMsg] + } +} diff --git a/modules/pubsub/api/src/main/scala/docspell/pubsub/api/TypedTopic.scala b/modules/pubsub/api/src/main/scala/docspell/pubsub/api/TypedTopic.scala new file mode 100644 index 00000000..542144ee --- /dev/null +++ b/modules/pubsub/api/src/main/scala/docspell/pubsub/api/TypedTopic.scala @@ -0,0 +1,29 @@ +/* + * Copyright 2020 Eike K. & Contributors + * + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +package docspell.pubsub.api + +import scala.reflect.{ClassTag, classTag} + +import io.circe.{Codec, Decoder, Encoder} + +final case class TypedTopic[A](topic: Topic, codec: Codec[A], msgClass: Class[_]) { + def name: String = topic.name + + def withTopic(topic: Topic): TypedTopic[A] = + copy(topic = topic) + + def withName(name: String): TypedTopic[A] = + withTopic(Topic(name)) +} + +object TypedTopic { + + def apply[A: ClassTag]( + topic: Topic + )(implicit dec: Decoder[A], enc: Encoder[A]): TypedTopic[A] = + TypedTopic(topic, Codec.from(dec, enc), classTag[A].runtimeClass) +} diff --git a/modules/pubsub/naive/src/main/scala/docspell/pubsub/naive/NaivePubSub.scala b/modules/pubsub/naive/src/main/scala/docspell/pubsub/naive/NaivePubSub.scala new file mode 100644 index 00000000..55bdc786 --- /dev/null +++ b/modules/pubsub/naive/src/main/scala/docspell/pubsub/naive/NaivePubSub.scala @@ -0,0 +1,212 @@ +/* + * Copyright 2020 Eike K. & Contributors + * + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +package docspell.pubsub.naive + +import cats.data.NonEmptyList +import cats.effect._ +import cats.implicits._ +import fs2.Pipe +import fs2.Stream +import fs2.concurrent.{Topic => Fs2Topic} + +import docspell.common._ +import docspell.pubsub.api._ +import docspell.pubsub.naive.NaivePubSub.State +import docspell.store.Store +import docspell.store.records.RPubSub + +import io.circe.Json +import org.http4s.circe.CirceEntityCodec._ +import org.http4s.client.Client +import org.http4s.client.dsl.Http4sClientDsl +import org.http4s.dsl.Http4sDsl +import org.http4s.{HttpRoutes, Uri} + +/** A pubsub implementation that can be used across machines, but uses a rather + * inefficient protocol. The reason is to make it work with the current setup, i.e. not + * requiring to add another complex piece of software to the mix, like Kafka or RabbitMQ. + * + * However, the api should allow to be used on top of such a tool. This implementation + * can be used in a personal setting, where there are only a few nodes. + * + * How it works: + * + * It is build on the `Topic` class from fs2.concurrent. A map of a topic name to such a + * `Topic` instance is maintained. To work across machines, the database is used as a + * synchronization point. Each subscriber must provide a http api and so its "callback" + * URL is added into the database to the list of remote subscribers. + * + * When publishing a message, the message can be published to the internal fs2 topic. + * Then all URLs to this topic name are looked up in the database and the message is + * POSTed to each URL as JSON. The endpoint of each machine takes this message and + * publishes it to its own internal fs2.concurrent.Topic instance. + * + * Obviously, there are drawbacks: it is slow, because the messages go through http and + * connections must be opened/closed etc and the database is hit as well. Then it doesn't + * scale to lots of machines and messages. The upside is, that it works with the current + * setup and it should be good enough for personal use, where there are only a small + * amount of machines and messages. + * + * The main use case for docspell is to communicate between the rest-server and job + * executor. It is for internal communication and all topics are known at compile time. + */ +final class NaivePubSub[F[_]: Async]( + cfg: PubSubConfig, + state: Ref[F, State[F]], + store: Store[F], + client: Client[F] +) extends PubSub[F] { + private val logger: Logger[F] = Logger.log4s(org.log4s.getLogger) + + def withClient(client: Client[F]): NaivePubSub[F] = + new NaivePubSub[F](cfg, state, store, client) + + def publish1(topic: Topic, msgBody: Json): F[MessageHead] = + for { + head <- mkMessageHead(topic) + msg = Message(head, msgBody) + _ <- logger.trace(s"Publishing: $msg") + // go through all local subscribers and publish to the fs2 topic + _ <- publishLocal(msg) + // get all remote subscribers from the database and send the message via http + _ <- publishRemote(msg) + } yield head + + def publish(topic: Topic): Pipe[F, Json, MessageHead] = + ms => //TODO Do some optimization by grouping messages to the same topic + ms.evalMap(publish1(topic, _)) + + def subscribe(topics: NonEmptyList[Topic]): Stream[F, Message[Json]] = + (for { + _ <- logger.s.info(s"Adding subscriber for topics: $topics") + _ <- Stream.resource[F, Unit](addRemote(topics)) + m <- Stream.eval(addLocal(topics)) + } yield m).flatten + + /** Receive messages from remote publishers and passes them to the local subscribers. */ + def receiveRoute: HttpRoutes[F] = { + val dsl = new Http4sDsl[F] {} + import dsl._ + + HttpRoutes.of { case req @ POST -> Root => + for { + data <- req.as[List[Message[Json]]] + _ <- logger.trace(s"Received external message(s): $data") + _ <- data.traverse(publishLocal) + resp <- Ok(()) + } yield resp + } + } + + // ---- private helpers + + private def mkMessageHead(topic: Topic): F[MessageHead] = + for { + id <- Ident.randomId[F] + ts <- Timestamp.current[F] + head = MessageHead(id, ts, topic) + } yield head + + private def addLocal(topics: NonEmptyList[Topic]): F[Stream[F, Message[Json]]] = { + val topicSet = topics.map(_.name).toList.toSet + for { + st <- state.get + tpc = st.topics.view.filterKeys(topicSet.contains) + _ <- + if (tpc.isEmpty) + logger.warn(s"Subscribing to 0 topics! Topics $topics were not initialized") + else ().pure[F] + data = tpc.values.toList.traverse(t => t.subscribe(cfg.subscriberQueueSize)) + out = data.flatMap(msgs => Stream.emits(msgs)) + } yield out + } + + private def addRemote(topics: NonEmptyList[Topic]): Resource[F, Unit] = { + def subscribe: F[Unit] = + logger.trace(s"Incrementing counter for topics: $topics") *> + store.transact(RPubSub.increment(cfg.url, topics.map(_.name))).as(()) + + def unsubscribe: F[Unit] = + logger.trace(s"Decrementing counter for topics: $topics") *> + store.transact(RPubSub.decrement(cfg.url, topics.map(_.name))).as(()) + + Resource.make(subscribe)(_ => unsubscribe) + } + + private def publishLocal(msg: Message[Json]): F[Unit] = + for { + st <- state.get + _ <- st.topics.get(msg.head.topic.name) match { + case Some(sub) => + logger.trace(s"Publishing message to local topic: $msg") *> + sub.publish1(msg).as(()) + case None => + ().pure[F] + } + } yield () + + private def publishRemote(msg: Message[Json]): F[Unit] = { + val dsl = new Http4sDsl[F] with Http4sClientDsl[F] {} + import dsl._ + + for { + _ <- logger.trace(s"Find all nodes subscribed to topic ${msg.head.topic.name}") + urls <- store.transact(RPubSub.findSubs(msg.head.topic.name)) + _ <- logger.trace(s"Publishing to remote urls ${urls.map(_.asString)}: $msg") + reqs = urls + .map(u => Uri.unsafeFromString(u.asString)) + .map(uri => POST(List(msg), uri)) + res <- reqs.traverse(req => client.status(req)).attempt + _ <- res match { + case Right(s) => + if (s.forall(_.isSuccess)) ().pure[F] + else if (s.size == urls.size) + logger.warn( + s"No nodes was be reached! Reason: $s, message: $msg" + ) + else + logger.warn( + s"Some nodes were not reached! Reason: $s, message: $msg" + ) + case Left(ex) => + logger.error(ex)(s"Error publishing ${msg.head.topic.name} message remotely") + } + } yield () + } +} + +object NaivePubSub { + + def apply[F[_]: Async]( + cfg: PubSubConfig, + store: Store[F], + client: Client[F] + )(topics: NonEmptyList[Topic]): F[NaivePubSub[F]] = + for { + state <- Ref.ofEffect[F, State[F]](State.create[F](topics)) + _ <- store.transact(RPubSub.initTopics(cfg.nodeId, cfg.url, topics.map(_.name))) + } yield new NaivePubSub[F](cfg, state, store, client) + + def create[F[_]: Async]( + cfg: PubSubConfig, + store: Store[F], + client: Client[F], + logger: Logger[F] + )(topics: NonEmptyList[Topic]): F[PubSubT[F, NaivePubSub[F]]] = + apply[F](cfg, store, client)(topics).map(ps => PubSubT(ps, logger)) + + final case class State[F[_]](topics: Map[String, Fs2Topic[F, Message[Json]]]) {} + + object State { + def empty[F[_]]: State[F] = State[F](Map.empty) + def create[F[_]: Async](topics: NonEmptyList[Topic]): F[State[F]] = + topics + .traverse(t => Fs2Topic[F, Message[Json]].map(fs2t => t.name -> fs2t)) + .map(_.toList.toMap) + .map(State.apply) + } +} diff --git a/modules/pubsub/naive/src/main/scala/docspell/pubsub/naive/PubSubConfig.scala b/modules/pubsub/naive/src/main/scala/docspell/pubsub/naive/PubSubConfig.scala new file mode 100644 index 00000000..b00131d6 --- /dev/null +++ b/modules/pubsub/naive/src/main/scala/docspell/pubsub/naive/PubSubConfig.scala @@ -0,0 +1,11 @@ +/* + * Copyright 2020 Eike K. & Contributors + * + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +package docspell.pubsub.naive + +import docspell.common.{Ident, LenientUri} + +case class PubSubConfig(nodeId: Ident, url: LenientUri, subscriberQueueSize: Int) diff --git a/modules/pubsub/naive/src/test/scala/docspell/pubsub/naive/Fixtures.scala b/modules/pubsub/naive/src/test/scala/docspell/pubsub/naive/Fixtures.scala new file mode 100644 index 00000000..cd2a386a --- /dev/null +++ b/modules/pubsub/naive/src/test/scala/docspell/pubsub/naive/Fixtures.scala @@ -0,0 +1,85 @@ +/* + * Copyright 2020 Eike K. & Contributors + * + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +package docspell.pubsub.naive + +import cats.effect._ + +import docspell.common._ +import docspell.pubsub.api._ +import docspell.store.{Store, StoreFixture} + +import munit.CatsEffectSuite +import org.http4s.Response +import org.http4s.client.Client + +trait Fixtures extends HttpClientOps { self: CatsEffectSuite => + + val pubsubEnv = ResourceFixture(Fixtures.envResource("node-1")) + + val pubsubT = ResourceFixture { + Fixtures + .envResource("node-1") + .flatMap(e => Resource.eval(e.pubSub)) + .map(ps => PubSubT(ps, Fixtures.loggerIO)) + } + + def conntectedPubsubs(env: Fixtures.Env) = + for { + // Create two pubsub instances connected to the same database + ps_1 <- env.withNodeId("node-1").pubSubT + ps_2 <- env.withNodeId("node-2").pubSubT + + // both instances have a dummy client. now connect their clients to each other + ps1 = ps_1.withDelegate(ps_1.delegate.withClient(httpClient(ps_2))) + ps2 = ps_2.withDelegate(ps_2.delegate.withClient(httpClient(ps_1))) + } yield (ps1, ps2) + + implicit final class StringId(s: String) { + def id: Ident = Ident.unsafe(s) + } +} + +object Fixtures { + private val loggerIO: Logger[IO] = Logger.log4s(org.log4s.getLogger) + + final case class Env(store: Store[IO], cfg: PubSubConfig) { + def pubSub: IO[NaivePubSub[IO]] = { + val dummyClient = Client[IO](_ => Resource.pure(Response.notFound[IO])) + NaivePubSub(cfg, store, dummyClient)(Topics.all.map(_.topic)) + } + def pubSubT: IO[PubSubT[IO, NaivePubSub[IO]]] = + pubSub.map(PubSubT(_, loggerIO)) + + def withNodeId(nodeId: String): Env = + copy(cfg = + cfg.copy( + nodeId = Ident.unsafe(nodeId), + url = LenientUri.unsafe(s"http://$nodeId/") + ) + ) + } + + def testConfig(nodeId: String) = + PubSubConfig( + Ident.unsafe(nodeId), + LenientUri.unsafe(s"http://${nodeId}/"), + 0 + ) + + def storeResource: Resource[IO, Store[IO]] = + for { + random <- Resource.eval(Ident.randomId[IO]) + cfg = StoreFixture.memoryDB(random.id.take(12)) + store <- StoreFixture.store(cfg) + _ <- Resource.eval(store.migrate) + } yield store + + def envResource(nodeId: String): Resource[IO, Env] = + for { + store <- storeResource + } yield Env(store, testConfig(nodeId)) +} diff --git a/modules/pubsub/naive/src/test/scala/docspell/pubsub/naive/HttpClientOps.scala b/modules/pubsub/naive/src/test/scala/docspell/pubsub/naive/HttpClientOps.scala new file mode 100644 index 00000000..cafdf0c2 --- /dev/null +++ b/modules/pubsub/naive/src/test/scala/docspell/pubsub/naive/HttpClientOps.scala @@ -0,0 +1,55 @@ +/* + * Copyright 2020 Eike K. & Contributors + * + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +package docspell.pubsub.naive + +import cats.effect._ + +import docspell.common._ +import docspell.pubsub.api._ + +import io.circe.Encoder +import org.http4s.circe.CirceEntityCodec._ +import org.http4s.client.Client +import org.http4s.client.dsl.io._ +import org.http4s.dsl.io._ +import org.http4s.{HttpApp, HttpRoutes, Uri} + +trait HttpClientOps { + def httpClient(routes: HttpRoutes[IO]): Client[IO] = + Client.fromHttpApp(HttpApp(routes.orNotFound.run)) + + def httpClient(ps: NaivePubSub[IO]): Client[IO] = + httpClient(ps.receiveRoute) + + def httpClient(ps: PubSubT[IO, NaivePubSub[IO]]): Client[IO] = + httpClient(ps.delegate) + + implicit final class ClientOps(client: Client[IO]) { + val uri = Uri.unsafeFromString("http://localhost/") + + def sendMessage[A: Encoder](topic: Topic, body: A): IO[Unit] = { + val encode: Encoder[List[Message[A]]] = implicitly[Encoder[List[Message[A]]]] + + for { + id <- Ident.randomId[IO] + time <- Timestamp.current[IO] + mesg = List(Message(MessageHead(id, time, topic), body)) + _ <- HttpClientOps.logger.debug(s"Sending message(s): $mesg") + _ <- client.expectOr[Unit](POST(encode(mesg), uri)) { resp => + IO(new Exception(s"Unexpected response: $resp")) + } + } yield () + } + + def send[A](typedTopic: TypedTopic[A], body: A): IO[Unit] = + sendMessage(typedTopic.topic, body)(typedTopic.codec) + } +} + +object HttpClientOps { + private val logger: Logger[IO] = Logger.log4s(org.log4s.getLogger) +} diff --git a/modules/pubsub/naive/src/test/scala/docspell/pubsub/naive/NaivePubSubTest.scala b/modules/pubsub/naive/src/test/scala/docspell/pubsub/naive/NaivePubSubTest.scala new file mode 100644 index 00000000..c244e1da --- /dev/null +++ b/modules/pubsub/naive/src/test/scala/docspell/pubsub/naive/NaivePubSubTest.scala @@ -0,0 +1,128 @@ +/* + * Copyright 2020 Eike K. & Contributors + * + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +package docspell.pubsub.naive + +import scala.concurrent.duration._ + +import cats.effect._ +import cats.implicits._ +import fs2.concurrent.SignallingRef + +import docspell.common._ +import docspell.pubsub.api.Topics.{JobDoneMsg, JobSubmittedMsg} +import docspell.pubsub.api._ + +import munit.CatsEffectSuite + +class NaivePubSubTest extends CatsEffectSuite with Fixtures { + private[this] val logger = Logger.log4s[IO](org.log4s.getLogger) + + def subscribe[A](ps: PubSubT[IO, NaivePubSub[IO]], topic: TypedTopic[A]) = + for { + received <- Ref.of[IO, Option[Message[A]]](None) + halt <- SignallingRef.of[IO, Boolean](false) + fiber <- Async[IO].start( + logger.debug(s"${Thread.currentThread()} Listening for messages...") *> + ps.subscribe(topic) + .evalMap(m => + logger.debug(s"Handling message: $m") *> + received.set(Some(m)) *> + halt.set(true) + ) + .interruptWhen(halt) + .compile + .drain + ) + _ <- IO.sleep(500.millis) + } yield (received, halt, fiber) + + pubsubT.test("local publish receives message") { ps => + for { + res <- subscribe(ps, Topics.jobSubmitted) + (received, _, subFiber) = res + headSend <- ps.publish1(Topics.jobSubmitted, JobSubmittedMsg("hello".id)) + outcome <- subFiber.join + msgRec <- received.get + _ = assert(outcome.isSuccess) + _ = assertEquals(msgRec.map(_.head), Option(headSend)) + } yield () + } + + pubsubT.test("local publish to different topic doesn't receive") { ps => + val otherTopic = Topics.jobSubmitted.withTopic(Topic("other-name")) + for { + res <- subscribe(ps, Topics.jobSubmitted) + (received, halt, subFiber) = res + _ <- ps.publish1(otherTopic, JobSubmittedMsg("hello".id)) + _ <- IO.sleep(100.millis) //allow some time for receiving + _ <- halt.set(true) + outcome <- subFiber.join + _ = assert(outcome.isSuccess) + recMsg <- received.get + _ = assert(recMsg.isEmpty) + } yield () + } + + pubsubT.test("receive messages remotely") { ps => + val msg = JobSubmittedMsg("hello-remote".id) + for { + res <- subscribe(ps, Topics.jobSubmitted) + (received, _, subFiber) = res + client = httpClient(ps.delegate.receiveRoute) + _ <- client.send(Topics.jobSubmitted, msg) + outcome <- subFiber.join + msgRec <- received.get + _ = assert(outcome.isSuccess) + _ = assertEquals(msgRec.map(_.head.topic), Topics.jobSubmitted.topic.some) + _ = assertEquals(msgRec.map(_.body), msg.some) + } yield () + } + + pubsubEnv.test("send messages remotely") { env => + val msg = JobSubmittedMsg("hello-remote".id) + for { + // Create two pubsub instances connected to the same database + pubsubs <- conntectedPubsubs(env) + (ps1, ps2) = pubsubs + + // subscribe to ps1 and send via ps2 + res <- subscribe(ps1, Topics.jobSubmitted) + (received, _, subFiber) = res + _ <- ps2.publish1(Topics.jobSubmitted, msg) + outcome <- subFiber.join + msgRec <- received.get + + // check results + _ = assert(outcome.isSuccess) + _ = assertEquals(msgRec.map(_.head.topic), Topics.jobSubmitted.topic.some) + _ = assertEquals(msgRec.map(_.body), msg.some) + } yield () + } + + pubsubEnv.test("do not receive remote message from other topic") { env => + val msg = JobDoneMsg("job-1".id, "task-2".id) + + for { + // Create two pubsub instances connected to the same database + pubsubs <- conntectedPubsubs(env) + (ps1, ps2) = pubsubs + + // subscribe to ps1 and send via ps2 + res <- subscribe(ps1, Topics.jobSubmitted) + (received, halt, subFiber) = res + _ <- ps2.publish1(Topics.jobDone, msg) + _ <- IO.sleep(100.millis) + _ <- halt.set(true) + outcome <- subFiber.join + msgRec <- received.get + + // check results + _ = assert(outcome.isSuccess) + _ = assertEquals(msgRec, None) + } yield () + } +} diff --git a/modules/store/src/main/resources/db/migration/h2/V1.28.0__pubsub.sql b/modules/store/src/main/resources/db/migration/h2/V1.28.0__pubsub.sql new file mode 100644 index 00000000..ae42b13c --- /dev/null +++ b/modules/store/src/main/resources/db/migration/h2/V1.28.0__pubsub.sql @@ -0,0 +1,8 @@ +CREATE TABLE "pubsub" ( + "id" varchar(254) not null primary key, + "node_id" varchar(254) not null, + "url" varchar(254) not null, + "topic" varchar(254) not null, + "counter" int not null, + unique("url", "topic") +) diff --git a/modules/store/src/main/resources/db/migration/mariadb/V1.28.0__pubsub.sql b/modules/store/src/main/resources/db/migration/mariadb/V1.28.0__pubsub.sql new file mode 100644 index 00000000..2bd09016 --- /dev/null +++ b/modules/store/src/main/resources/db/migration/mariadb/V1.28.0__pubsub.sql @@ -0,0 +1,8 @@ +CREATE TABLE `pubsub` ( + `id` varchar(254) not null primary key, + `node_id` varchar(254) not null, + `url` varchar(254) not null, + `topic` varchar(254) not null, + `counter` int not null, + unique(`url`, `topic`) +) diff --git a/modules/store/src/main/resources/db/migration/postgresql/V1.28.0__pubsub.sql b/modules/store/src/main/resources/db/migration/postgresql/V1.28.0__pubsub.sql new file mode 100644 index 00000000..ae42b13c --- /dev/null +++ b/modules/store/src/main/resources/db/migration/postgresql/V1.28.0__pubsub.sql @@ -0,0 +1,8 @@ +CREATE TABLE "pubsub" ( + "id" varchar(254) not null primary key, + "node_id" varchar(254) not null, + "url" varchar(254) not null, + "topic" varchar(254) not null, + "counter" int not null, + unique("url", "topic") +) diff --git a/modules/store/src/main/scala/docspell/store/records/RPubSub.scala b/modules/store/src/main/scala/docspell/store/records/RPubSub.scala new file mode 100644 index 00000000..f0e2ce6e --- /dev/null +++ b/modules/store/src/main/scala/docspell/store/records/RPubSub.scala @@ -0,0 +1,86 @@ +/* + * Copyright 2020 Eike K. & Contributors + * + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +package docspell.store.records + +import cats.data.NonEmptyList +import cats.implicits._ + +import docspell.common._ +import docspell.store.qb.DSL._ +import docspell.store.qb.{Column, DML, TableDef} + +import doobie._ +import doobie.implicits._ + +/** A table for supporting naive pubsub across nodes. */ +final case class RPubSub( + id: Ident, + nodeId: Ident, + url: LenientUri, + topic: String, + counter: Int +) + +object RPubSub { + final case class Table(alias: Option[String]) extends TableDef { + val tableName: String = "pubsub" + + val id = Column[Ident]("id", this) + val nodeId = Column[Ident]("node_id", this) + val url = Column[LenientUri]("url", this) + val topic = Column[String]("topic", this) + val counter = Column[Int]("counter", this) + + val all: NonEmptyList[Column[_]] = + NonEmptyList.of(id, nodeId, url, topic, counter) + } + def as(alias: String): Table = + Table(Some(alias)) + + val T: Table = Table(None) + + def insert(r: RPubSub): ConnectionIO[Int] = + DML.insert(T, T.all, sql"${r.id}, ${r.nodeId}, ${r.url}, ${r.topic}, ${r.counter}") + + /** Insert all topics with counter = 0 */ + def initTopics( + nodeId: Ident, + url: LenientUri, + topics: NonEmptyList[String] + ): ConnectionIO[Int] = + DML.delete(T, T.nodeId === nodeId) *> + topics.toList + .traverse(t => + Ident + .randomId[ConnectionIO] + .flatMap(id => insert(RPubSub(id, nodeId, url, t, 0))) + ) + .map(_.sum) + + def increment(url: LenientUri, topics: NonEmptyList[String]): ConnectionIO[Int] = + DML.update( + T, + T.url === url && T.topic.in(topics), + DML.set( + T.counter.increment(1) + ) + ) + + def decrement(url: LenientUri, topics: NonEmptyList[String]): ConnectionIO[Int] = + DML.update( + T, + T.url === url && T.topic.in(topics), + DML.set( + T.counter.decrement(1) + ) + ) + + def findSubs(topic: String): ConnectionIO[List[LenientUri]] = + run(select(T.url), from(T), T.topic === topic && T.counter > 0) + .query[LenientUri] + .to[List] +} diff --git a/modules/store/src/test/resources/logback-test.xml b/modules/store/src/test/resources/logback-test.xml index 9cf93b57..6d2ef8c0 100644 --- a/modules/store/src/test/resources/logback-test.xml +++ b/modules/store/src/test/resources/logback-test.xml @@ -3,7 +3,7 @@ true - %highlight(%-5level) %cyan(%logger{15}) - %msg %n + %highlight(%-5level) [%t{10}] %cyan(%logger{15}) - %msg %n From 4d5c695882c4b19bb9c51384252d7255a21939fb Mon Sep 17 00:00:00 2001 From: eikek Date: Fri, 5 Nov 2021 20:56:02 +0100 Subject: [PATCH 2/7] Hide implementation details from PubSubT interface --- .../scala/docspell/pubsub/api/PubSubT.scala | 14 ++-- .../scala/docspell/pubsub/api/Topics.scala | 18 ++++- .../docspell/pubsub/naive/NaivePubSub.scala | 8 +-- .../docspell/pubsub/naive/Fixtures.scala | 12 ++-- .../docspell/pubsub/naive/HttpClientOps.scala | 8 ++- .../pubsub/naive/NaivePubSubTest.scala | 66 +++++++++---------- 6 files changed, 72 insertions(+), 54 deletions(-) diff --git a/modules/pubsub/api/src/main/scala/docspell/pubsub/api/PubSubT.scala b/modules/pubsub/api/src/main/scala/docspell/pubsub/api/PubSubT.scala index 176e8885..9f28b798 100644 --- a/modules/pubsub/api/src/main/scala/docspell/pubsub/api/PubSubT.scala +++ b/modules/pubsub/api/src/main/scala/docspell/pubsub/api/PubSubT.scala @@ -11,7 +11,7 @@ import fs2.{Pipe, Stream} import docspell.common.Logger -trait PubSubT[F[_], P <: PubSub[F]] { +trait PubSubT[F[_]] { def publish1[A](topic: TypedTopic[A], msg: A): F[MessageHead] @@ -19,15 +19,15 @@ trait PubSubT[F[_], P <: PubSub[F]] { def subscribe[A](topic: TypedTopic[A]): Stream[F, Message[A]] - def delegate: P + def delegate: PubSub[F] - def withDelegate[R <: PubSub[F]](delegate: R): PubSubT[F, R] + def withDelegate(delegate: PubSub[F]): PubSubT[F] } object PubSubT { - def apply[F[_], P <: PubSub[F]](pubSub: P, logger: Logger[F]): PubSubT[F, P] = - new PubSubT[F, P] { + def apply[F[_]](pubSub: PubSub[F], logger: Logger[F]): PubSubT[F] = + new PubSubT[F] { def publish1[A](topic: TypedTopic[A], msg: A): F[MessageHead] = pubSub.publish1(topic.topic, topic.codec(msg)) @@ -49,9 +49,9 @@ object PubSubT { } ) - def delegate: P = pubSub + def delegate: PubSub[F] = pubSub - def withDelegate[R <: PubSub[F]](newDelegate: R): PubSubT[F, R] = + def withDelegate(newDelegate: PubSub[F]): PubSubT[F] = PubSubT(newDelegate, logger) } } diff --git a/modules/pubsub/api/src/main/scala/docspell/pubsub/api/Topics.scala b/modules/pubsub/api/src/main/scala/docspell/pubsub/api/Topics.scala index 78b22408..a536607a 100644 --- a/modules/pubsub/api/src/main/scala/docspell/pubsub/api/Topics.scala +++ b/modules/pubsub/api/src/main/scala/docspell/pubsub/api/Topics.scala @@ -25,9 +25,13 @@ object Topics { val jobSubmitted: TypedTopic[JobSubmittedMsg] = TypedTopic[JobSubmittedMsg](Topic("job-submitted")) - val all: NonEmptyList[TypedTopic[_]] = NonEmptyList.of(jobDone, jobSubmitted) + /** Notify a node to cancel a job with the given id */ + val cancelJob: TypedTopic[CancelJobMsg] = + TypedTopic[CancelJobMsg](Topic("cancel-job")) - final case class JobSubmittedMsg(id: Ident) + val all: NonEmptyList[TypedTopic[_]] = NonEmptyList.of(jobDone, jobSubmitted, cancelJob) + + final case class JobSubmittedMsg(task: Ident) object JobSubmittedMsg { implicit val jsonDecoder: Decoder[JobSubmittedMsg] = deriveDecoder[JobSubmittedMsg] @@ -44,4 +48,14 @@ object Topics { implicit val jsonEncoder: Encoder[JobDoneMsg] = deriveEncoder[JobDoneMsg] } + + final case class CancelJobMsg(jobId: Ident, nodeId: Ident) + object CancelJobMsg { + implicit val jsonDecoder: Decoder[CancelJobMsg] = + deriveDecoder[CancelJobMsg] + + implicit val jsonEncoder: Encoder[CancelJobMsg] = + deriveEncoder[CancelJobMsg] + + } } diff --git a/modules/pubsub/naive/src/main/scala/docspell/pubsub/naive/NaivePubSub.scala b/modules/pubsub/naive/src/main/scala/docspell/pubsub/naive/NaivePubSub.scala index 55bdc786..fb7bb1cb 100644 --- a/modules/pubsub/naive/src/main/scala/docspell/pubsub/naive/NaivePubSub.scala +++ b/modules/pubsub/naive/src/main/scala/docspell/pubsub/naive/NaivePubSub.scala @@ -185,18 +185,18 @@ object NaivePubSub { cfg: PubSubConfig, store: Store[F], client: Client[F] - )(topics: NonEmptyList[Topic]): F[NaivePubSub[F]] = - for { + )(topics: NonEmptyList[Topic]): Resource[F, NaivePubSub[F]] = + Resource.eval(for { state <- Ref.ofEffect[F, State[F]](State.create[F](topics)) _ <- store.transact(RPubSub.initTopics(cfg.nodeId, cfg.url, topics.map(_.name))) - } yield new NaivePubSub[F](cfg, state, store, client) + } yield new NaivePubSub[F](cfg, state, store, client)) def create[F[_]: Async]( cfg: PubSubConfig, store: Store[F], client: Client[F], logger: Logger[F] - )(topics: NonEmptyList[Topic]): F[PubSubT[F, NaivePubSub[F]]] = + )(topics: NonEmptyList[Topic]): Resource[F, PubSubT[F]] = apply[F](cfg, store, client)(topics).map(ps => PubSubT(ps, logger)) final case class State[F[_]](topics: Map[String, Fs2Topic[F, Message[Json]]]) {} diff --git a/modules/pubsub/naive/src/test/scala/docspell/pubsub/naive/Fixtures.scala b/modules/pubsub/naive/src/test/scala/docspell/pubsub/naive/Fixtures.scala index cd2a386a..99e863ce 100644 --- a/modules/pubsub/naive/src/test/scala/docspell/pubsub/naive/Fixtures.scala +++ b/modules/pubsub/naive/src/test/scala/docspell/pubsub/naive/Fixtures.scala @@ -23,7 +23,7 @@ trait Fixtures extends HttpClientOps { self: CatsEffectSuite => val pubsubT = ResourceFixture { Fixtures .envResource("node-1") - .flatMap(e => Resource.eval(e.pubSub)) + .flatMap(_.pubSub) .map(ps => PubSubT(ps, Fixtures.loggerIO)) } @@ -34,8 +34,8 @@ trait Fixtures extends HttpClientOps { self: CatsEffectSuite => ps_2 <- env.withNodeId("node-2").pubSubT // both instances have a dummy client. now connect their clients to each other - ps1 = ps_1.withDelegate(ps_1.delegate.withClient(httpClient(ps_2))) - ps2 = ps_2.withDelegate(ps_2.delegate.withClient(httpClient(ps_1))) + ps1 = ps_1.withDelegate(ps_1.delegateT.withClient(httpClient(ps_2))) + ps2 = ps_2.withDelegate(ps_2.delegateT.withClient(httpClient(ps_1))) } yield (ps1, ps2) implicit final class StringId(s: String) { @@ -47,11 +47,11 @@ object Fixtures { private val loggerIO: Logger[IO] = Logger.log4s(org.log4s.getLogger) final case class Env(store: Store[IO], cfg: PubSubConfig) { - def pubSub: IO[NaivePubSub[IO]] = { + def pubSub: Resource[IO, NaivePubSub[IO]] = { val dummyClient = Client[IO](_ => Resource.pure(Response.notFound[IO])) NaivePubSub(cfg, store, dummyClient)(Topics.all.map(_.topic)) } - def pubSubT: IO[PubSubT[IO, NaivePubSub[IO]]] = + def pubSubT: Resource[IO, PubSubT[IO]] = pubSub.map(PubSubT(_, loggerIO)) def withNodeId(nodeId: String): Env = @@ -66,7 +66,7 @@ object Fixtures { def testConfig(nodeId: String) = PubSubConfig( Ident.unsafe(nodeId), - LenientUri.unsafe(s"http://${nodeId}/"), + LenientUri.unsafe(s"http://$nodeId/"), 0 ) diff --git a/modules/pubsub/naive/src/test/scala/docspell/pubsub/naive/HttpClientOps.scala b/modules/pubsub/naive/src/test/scala/docspell/pubsub/naive/HttpClientOps.scala index cafdf0c2..3659f88e 100644 --- a/modules/pubsub/naive/src/test/scala/docspell/pubsub/naive/HttpClientOps.scala +++ b/modules/pubsub/naive/src/test/scala/docspell/pubsub/naive/HttpClientOps.scala @@ -25,8 +25,8 @@ trait HttpClientOps { def httpClient(ps: NaivePubSub[IO]): Client[IO] = httpClient(ps.receiveRoute) - def httpClient(ps: PubSubT[IO, NaivePubSub[IO]]): Client[IO] = - httpClient(ps.delegate) + def httpClient(ps: PubSubT[IO]): Client[IO] = + httpClient(ps.delegateT) implicit final class ClientOps(client: Client[IO]) { val uri = Uri.unsafeFromString("http://localhost/") @@ -48,6 +48,10 @@ trait HttpClientOps { def send[A](typedTopic: TypedTopic[A], body: A): IO[Unit] = sendMessage(typedTopic.topic, body)(typedTopic.codec) } + + implicit final class PubSubTestOps(ps: PubSubT[IO]) { + def delegateT: NaivePubSub[IO] = ps.delegate.asInstanceOf[NaivePubSub[IO]] + } } object HttpClientOps { diff --git a/modules/pubsub/naive/src/test/scala/docspell/pubsub/naive/NaivePubSubTest.scala b/modules/pubsub/naive/src/test/scala/docspell/pubsub/naive/NaivePubSubTest.scala index c244e1da..ece6f443 100644 --- a/modules/pubsub/naive/src/test/scala/docspell/pubsub/naive/NaivePubSubTest.scala +++ b/modules/pubsub/naive/src/test/scala/docspell/pubsub/naive/NaivePubSubTest.scala @@ -21,7 +21,7 @@ import munit.CatsEffectSuite class NaivePubSubTest extends CatsEffectSuite with Fixtures { private[this] val logger = Logger.log4s[IO](org.log4s.getLogger) - def subscribe[A](ps: PubSubT[IO, NaivePubSub[IO]], topic: TypedTopic[A]) = + def subscribe[A](ps: PubSubT[IO], topic: TypedTopic[A]) = for { received <- Ref.of[IO, Option[Message[A]]](None) halt <- SignallingRef.of[IO, Boolean](false) @@ -72,7 +72,7 @@ class NaivePubSubTest extends CatsEffectSuite with Fixtures { for { res <- subscribe(ps, Topics.jobSubmitted) (received, _, subFiber) = res - client = httpClient(ps.delegate.receiveRoute) + client = httpClient(ps.delegateT.receiveRoute) _ <- client.send(Topics.jobSubmitted, msg) outcome <- subFiber.join msgRec <- received.get @@ -84,45 +84,45 @@ class NaivePubSubTest extends CatsEffectSuite with Fixtures { pubsubEnv.test("send messages remotely") { env => val msg = JobSubmittedMsg("hello-remote".id) - for { - // Create two pubsub instances connected to the same database - pubsubs <- conntectedPubsubs(env) - (ps1, ps2) = pubsubs - // subscribe to ps1 and send via ps2 - res <- subscribe(ps1, Topics.jobSubmitted) - (received, _, subFiber) = res - _ <- ps2.publish1(Topics.jobSubmitted, msg) - outcome <- subFiber.join - msgRec <- received.get + // Create two pubsub instances connected to the same database + conntectedPubsubs(env).use { case (ps1, ps2) => + for { + // subscribe to ps1 and send via ps2 + res <- subscribe(ps1, Topics.jobSubmitted) + (received, _, subFiber) = res + _ <- ps2.publish1(Topics.jobSubmitted, msg) + outcome <- subFiber.join + msgRec <- received.get - // check results - _ = assert(outcome.isSuccess) - _ = assertEquals(msgRec.map(_.head.topic), Topics.jobSubmitted.topic.some) - _ = assertEquals(msgRec.map(_.body), msg.some) - } yield () + // check results + _ = assert(outcome.isSuccess) + _ = assertEquals(msgRec.map(_.head.topic), Topics.jobSubmitted.topic.some) + _ = assertEquals(msgRec.map(_.body), msg.some) + } yield () + } } pubsubEnv.test("do not receive remote message from other topic") { env => val msg = JobDoneMsg("job-1".id, "task-2".id) - for { - // Create two pubsub instances connected to the same database - pubsubs <- conntectedPubsubs(env) - (ps1, ps2) = pubsubs + // Create two pubsub instances connected to the same database + conntectedPubsubs(env).use { case (ps1, ps2) => + for { + // subscribe to ps1 and send via ps2 + res <- subscribe(ps1, Topics.jobSubmitted) + (received, halt, subFiber) = res + _ <- ps2.publish1(Topics.jobDone, msg) + _ <- IO.sleep(100.millis) + _ <- halt.set(true) + outcome <- subFiber.join + msgRec <- received.get - // subscribe to ps1 and send via ps2 - res <- subscribe(ps1, Topics.jobSubmitted) - (received, halt, subFiber) = res - _ <- ps2.publish1(Topics.jobDone, msg) - _ <- IO.sleep(100.millis) - _ <- halt.set(true) - outcome <- subFiber.join - msgRec <- received.get + // check results + _ = assert(outcome.isSuccess) + _ = assertEquals(msgRec, None) + } yield () + } - // check results - _ = assert(outcome.isSuccess) - _ = assertEquals(msgRec, None) - } yield () } } From f38d520a1d0f68d309a78d3d165724f52dd71b77 Mon Sep 17 00:00:00 2001 From: eikek Date: Fri, 5 Nov 2021 21:01:02 +0100 Subject: [PATCH 3/7] Hooking the new pubsub impl into the application --- build.sbt | 7 ++- .../scala/docspell/backend/BackendApp.scala | 25 ++------ .../docspell/backend/msg/CancelJob.scala | 27 ++++++++ .../scala/docspell/backend/msg/JobDone.scala | 26 ++++++++ .../scala/docspell/backend/msg/Ping.scala | 31 ++++++++++ .../scala/docspell/backend/msg/Topics.scala | 23 +++++++ .../scala/docspell/backend/ops/OJob.scala | 12 ++-- .../scala/docspell/backend/ops/OJoex.scala | 32 +++------- .../main/scala/docspell/common/Logger.scala | 22 +++++++ .../src/main/scala/docspell/joex/Config.scala | 7 ++- .../scala/docspell/joex/JoexAppImpl.scala | 47 +++++++++----- .../main/scala/docspell/joex/JoexServer.scala | 16 ++++- .../joex/scheduler/SchedulerBuilder.scala | 13 +++- .../joex/scheduler/SchedulerImpl.scala | 38 +++++++----- .../scala/docspell/pubsub/api/PubSub.scala | 16 +++++ .../scala/docspell/pubsub/api/PubSubT.scala | 30 ++++++++- .../scala/docspell/pubsub/api/Topics.scala | 61 ------------------- .../docspell/pubsub/naive/NaivePubSub.scala | 17 ++---- .../pubsub/naive/NaivePubSubTest.scala | 6 +- .../scala/docspell/pubsub/naive/Topics.scala | 37 +++++++++++ .../scala/docspell/restserver/Config.scala | 4 ++ .../scala/docspell/restserver/RestApp.scala | 2 - .../docspell/restserver/RestAppImpl.scala | 47 ++++++++------ .../docspell/restserver/RestServer.scala | 13 +++- .../docspell/store/records/RPubSub.scala | 8 ++- 25 files changed, 379 insertions(+), 188 deletions(-) create mode 100644 modules/backend/src/main/scala/docspell/backend/msg/CancelJob.scala create mode 100644 modules/backend/src/main/scala/docspell/backend/msg/JobDone.scala create mode 100644 modules/backend/src/main/scala/docspell/backend/msg/Ping.scala create mode 100644 modules/backend/src/main/scala/docspell/backend/msg/Topics.scala delete mode 100644 modules/pubsub/api/src/main/scala/docspell/pubsub/api/Topics.scala create mode 100644 modules/pubsub/naive/src/test/scala/docspell/pubsub/naive/Topics.scala diff --git a/build.sbt b/build.sbt index b47c36ce..6ac36289 100644 --- a/build.sbt +++ b/build.sbt @@ -564,7 +564,7 @@ val backend = project Dependencies.http4sClient ++ Dependencies.emil ) - .dependsOn(store, joexapi, ftsclient, totp) + .dependsOn(store, joexapi, ftsclient, totp, pubsubApi) val oidc = project .in(file("modules/oidc")) @@ -655,7 +655,8 @@ val joex = project analysis, joexapi, restapi, - ftssolr + ftssolr, + pubsubNaive ) val restserver = project @@ -719,7 +720,7 @@ val restserver = project } } ) - .dependsOn(config, restapi, joexapi, backend, webapp, ftssolr, oidc) + .dependsOn(config, restapi, joexapi, backend, webapp, ftssolr, oidc, pubsubNaive) // --- Website Documentation diff --git a/modules/backend/src/main/scala/docspell/backend/BackendApp.scala b/modules/backend/src/main/scala/docspell/backend/BackendApp.scala index 4aea530f..cdc92fcc 100644 --- a/modules/backend/src/main/scala/docspell/backend/BackendApp.scala +++ b/modules/backend/src/main/scala/docspell/backend/BackendApp.scala @@ -6,8 +6,6 @@ package docspell.backend -import scala.concurrent.ExecutionContext - import cats.effect._ import docspell.backend.auth.Login @@ -15,15 +13,13 @@ import docspell.backend.fulltext.CreateIndex import docspell.backend.ops._ import docspell.backend.signup.OSignup import docspell.ftsclient.FtsClient -import docspell.joexapi.client.JoexClient +import docspell.pubsub.api.PubSubT import docspell.store.Store import docspell.store.queue.JobQueue import docspell.store.usertask.UserTaskStore import docspell.totp.Totp import emil.javamail.{JavaMailEmil, Settings} -import org.http4s.blaze.client.BlazeClientBuilder -import org.http4s.client.Client trait BackendApp[F[_]] { @@ -49,6 +45,7 @@ trait BackendApp[F[_]] { def clientSettings: OClientSettings[F] def totp: OTotp[F] def share: OShare[F] + def pubSub: PubSubT[F] } object BackendApp { @@ -56,8 +53,8 @@ object BackendApp { def create[F[_]: Async]( cfg: Config, store: Store[F], - httpClient: Client[F], - ftsClient: FtsClient[F] + ftsClient: FtsClient[F], + pubSubT: PubSubT[F] ): Resource[F, BackendApp[F]] = for { utStore <- UserTaskStore(store) @@ -65,7 +62,7 @@ object BackendApp { totpImpl <- OTotp(store, Totp.default) loginImpl <- Login[F](store, Totp.default) signupImpl <- OSignup[F](store) - joexImpl <- OJoex(JoexClient(httpClient), store) + joexImpl <- OJoex(pubSubT) collImpl <- OCollective[F](store, utStore, queue, joexImpl) sourceImpl <- OSource[F](store) tagImpl <- OTag[F](store) @@ -90,6 +87,7 @@ object BackendApp { OShare(store, itemSearchImpl, simpleSearchImpl, javaEmil) ) } yield new BackendApp[F] { + val pubSub = pubSubT val login = loginImpl val signup = signupImpl val collective = collImpl @@ -113,15 +111,4 @@ object BackendApp { val totp = totpImpl val share = shareImpl } - - def apply[F[_]: Async]( - cfg: Config, - connectEC: ExecutionContext - )(ftsFactory: Client[F] => Resource[F, FtsClient[F]]): Resource[F, BackendApp[F]] = - for { - store <- Store.create(cfg.jdbc, cfg.files.chunkSize, connectEC) - httpClient <- BlazeClientBuilder[F].resource - ftsClient <- ftsFactory(httpClient) - backend <- create(cfg, store, httpClient, ftsClient) - } yield backend } diff --git a/modules/backend/src/main/scala/docspell/backend/msg/CancelJob.scala b/modules/backend/src/main/scala/docspell/backend/msg/CancelJob.scala new file mode 100644 index 00000000..677f1fba --- /dev/null +++ b/modules/backend/src/main/scala/docspell/backend/msg/CancelJob.scala @@ -0,0 +1,27 @@ +/* + * Copyright 2020 Eike K. & Contributors + * + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +package docspell.backend.msg + +import docspell.common._ +import docspell.pubsub.api.{Topic, TypedTopic} + +import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder} +import io.circe.{Decoder, Encoder} + +/** Message to request to cancel a job. */ +final case class CancelJob(jobId: Ident, nodeId: Ident) + +object CancelJob { + implicit val jsonDecoder: Decoder[CancelJob] = + deriveDecoder[CancelJob] + + implicit val jsonEncoder: Encoder[CancelJob] = + deriveEncoder[CancelJob] + + val topic: TypedTopic[CancelJob] = + TypedTopic(Topic("job-cancel-request")) +} diff --git a/modules/backend/src/main/scala/docspell/backend/msg/JobDone.scala b/modules/backend/src/main/scala/docspell/backend/msg/JobDone.scala new file mode 100644 index 00000000..9b09b8f1 --- /dev/null +++ b/modules/backend/src/main/scala/docspell/backend/msg/JobDone.scala @@ -0,0 +1,26 @@ +/* + * Copyright 2020 Eike K. & Contributors + * + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +package docspell.backend.msg + +import docspell.common._ +import docspell.pubsub.api.{Topic, TypedTopic} + +import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder} +import io.circe.{Decoder, Encoder} + +/** Message to notify about finished jobs. They have a final state. */ +final case class JobDone(jobId: Ident, task: Ident, args: String, state: JobState) +object JobDone { + implicit val jsonDecoder: Decoder[JobDone] = + deriveDecoder[JobDone] + + implicit val jsonEncoder: Encoder[JobDone] = + deriveEncoder[JobDone] + + val topic: TypedTopic[JobDone] = + TypedTopic(Topic("job-finished")) +} diff --git a/modules/backend/src/main/scala/docspell/backend/msg/Ping.scala b/modules/backend/src/main/scala/docspell/backend/msg/Ping.scala new file mode 100644 index 00000000..b300a76d --- /dev/null +++ b/modules/backend/src/main/scala/docspell/backend/msg/Ping.scala @@ -0,0 +1,31 @@ +/* + * Copyright 2020 Eike K. & Contributors + * + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +package docspell.backend.msg + +import java.util.concurrent.atomic.AtomicLong + +import docspell.pubsub.api.{Topic, TypedTopic} + +import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder} +import io.circe.{Decoder, Encoder} + +final case class Ping(sender: String, num: Long) + +object Ping { + implicit val jsonDecoder: Decoder[Ping] = + deriveDecoder[Ping] + + implicit val jsonEncoder: Encoder[Ping] = + deriveEncoder[Ping] + + private[this] val counter = new AtomicLong(0) + def next(sender: String): Ping = + Ping(sender, counter.getAndIncrement()) + + val topic: TypedTopic[Ping] = + TypedTopic[Ping](Topic("ping")) +} diff --git a/modules/backend/src/main/scala/docspell/backend/msg/Topics.scala b/modules/backend/src/main/scala/docspell/backend/msg/Topics.scala new file mode 100644 index 00000000..996ff7c6 --- /dev/null +++ b/modules/backend/src/main/scala/docspell/backend/msg/Topics.scala @@ -0,0 +1,23 @@ +/* + * Copyright 2020 Eike K. & Contributors + * + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +package docspell.backend.msg + +import cats.data.NonEmptyList + +import docspell.pubsub.api.{Topic, TypedTopic} + +/** All topics used in Docspell. */ +object Topics { + + /** A generic notification to the job executors to look for new work. */ + val jobsNotify: TypedTopic[Unit] = + TypedTopic[Unit](Topic("jobs-notify")) + + /** A list of all topics. It is required to list every topic in use here! */ + val all: NonEmptyList[TypedTopic[_]] = + NonEmptyList.of(Ping.topic, JobDone.topic, CancelJob.topic, jobsNotify) +} diff --git a/modules/backend/src/main/scala/docspell/backend/ops/OJob.scala b/modules/backend/src/main/scala/docspell/backend/ops/OJob.scala index 4ab45fb4..7f7a36fd 100644 --- a/modules/backend/src/main/scala/docspell/backend/ops/OJob.scala +++ b/modules/backend/src/main/scala/docspell/backend/ops/OJob.scala @@ -11,8 +11,7 @@ import cats.effect._ import cats.implicits._ import docspell.backend.ops.OJob.{CollectiveQueueState, JobCancelResult} -import docspell.common.Priority -import docspell.common.{Ident, JobState} +import docspell.common._ import docspell.store.Store import docspell.store.UpdateResult import docspell.store.queries.QJob @@ -55,6 +54,7 @@ object OJob { joex: OJoex[F] ): Resource[F, OJob[F]] = Resource.pure[F, OJob[F]](new OJob[F] { + private[this] val logger = Logger.log4s(org.log4s.getLogger(OJob.getClass)) def queueState(collective: Ident, maxResults: Int): F[CollectiveQueueState] = store @@ -77,11 +77,9 @@ object OJob { job.worker match { case Some(worker) => for { - flag <- joex.cancelJob(job.id, worker) - res <- - if (flag) JobCancelResult.cancelRequested.pure[F] - else remove(job) - } yield res + _ <- logger.debug(s"Attempt to cancel job: ${job.id.id}") + _ <- joex.cancelJob(job.id, worker) + } yield JobCancelResult.cancelRequested case None => remove(job) } diff --git a/modules/backend/src/main/scala/docspell/backend/ops/OJoex.scala b/modules/backend/src/main/scala/docspell/backend/ops/OJoex.scala index d4aaf956..1ce43149 100644 --- a/modules/backend/src/main/scala/docspell/backend/ops/OJoex.scala +++ b/modules/backend/src/main/scala/docspell/backend/ops/OJoex.scala @@ -6,41 +6,27 @@ package docspell.backend.ops -import cats.data.OptionT import cats.effect._ -import cats.implicits._ -import docspell.common.{Ident, NodeType} -import docspell.joexapi.client.JoexClient -import docspell.store.Store -import docspell.store.records.RNode +import docspell.backend.msg.{CancelJob, Topics} +import docspell.common.Ident +import docspell.pubsub.api.PubSubT trait OJoex[F[_]] { def notifyAllNodes: F[Unit] - def cancelJob(job: Ident, worker: Ident): F[Boolean] - + def cancelJob(job: Ident, worker: Ident): F[Unit] } object OJoex { - - def apply[F[_]: Sync](client: JoexClient[F], store: Store[F]): Resource[F, OJoex[F]] = + def apply[F[_]](pubSub: PubSubT[F]): Resource[F, OJoex[F]] = Resource.pure[F, OJoex[F]](new OJoex[F] { + def notifyAllNodes: F[Unit] = - for { - nodes <- store.transact(RNode.findAll(NodeType.Joex)) - _ <- nodes.toList.traverse(n => client.notifyJoexIgnoreErrors(n.url)) - } yield () + pubSub.publish1IgnoreErrors(Topics.jobsNotify, ()) - def cancelJob(job: Ident, worker: Ident): F[Boolean] = - (for { - node <- OptionT(store.transact(RNode.findById(worker))) - cancel <- OptionT.liftF(client.cancelJob(node.url, job)) - } yield cancel.success).getOrElse(false) + def cancelJob(job: Ident, worker: Ident): F[Unit] = + pubSub.publish1IgnoreErrors(CancelJob.topic, CancelJob(job, worker)) }) - - def create[F[_]: Async](store: Store[F]): Resource[F, OJoex[F]] = - JoexClient.resource.flatMap(client => apply(client, store)) - } diff --git a/modules/common/src/main/scala/docspell/common/Logger.scala b/modules/common/src/main/scala/docspell/common/Logger.scala index 936c9d34..01265ef4 100644 --- a/modules/common/src/main/scala/docspell/common/Logger.scala +++ b/modules/common/src/main/scala/docspell/common/Logger.scala @@ -6,6 +6,7 @@ package docspell.common +import cats.Applicative import cats.effect.Sync import fs2.Stream @@ -45,6 +46,27 @@ trait Logger[F[_]] { self => object Logger { + def off[F[_]: Applicative]: Logger[F] = + new Logger[F] { + def trace(msg: => String): F[Unit] = + Applicative[F].pure(()) + + def debug(msg: => String): F[Unit] = + Applicative[F].pure(()) + + def info(msg: => String): F[Unit] = + Applicative[F].pure(()) + + def warn(msg: => String): F[Unit] = + Applicative[F].pure(()) + + def error(ex: Throwable)(msg: => String): F[Unit] = + Applicative[F].pure(()) + + def error(msg: => String): F[Unit] = + Applicative[F].pure(()) + } + def log4s[F[_]: Sync](log: Log4sLogger): Logger[F] = new Logger[F] { def trace(msg: => String): F[Unit] = diff --git a/modules/joex/src/main/scala/docspell/joex/Config.scala b/modules/joex/src/main/scala/docspell/joex/Config.scala index 2554d9bf..81172ff2 100644 --- a/modules/joex/src/main/scala/docspell/joex/Config.scala +++ b/modules/joex/src/main/scala/docspell/joex/Config.scala @@ -20,6 +20,7 @@ import docspell.joex.analysis.RegexNerFile import docspell.joex.hk.HouseKeepingConfig import docspell.joex.scheduler.{PeriodicSchedulerConfig, SchedulerConfig} import docspell.joex.updatecheck.UpdateCheckConfig +import docspell.pubsub.naive.PubSubConfig import docspell.store.JdbcConfig case class Config( @@ -39,7 +40,11 @@ case class Config( mailDebug: Boolean, fullTextSearch: Config.FullTextSearch, updateCheck: UpdateCheckConfig -) +) { + + def pubSubConfig: PubSubConfig = + PubSubConfig(appId, baseUrl / "internal" / "pubsub", 100) +} object Config { case class Bind(address: String, port: Int) diff --git a/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala b/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala index 86c65efc..85359092 100644 --- a/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala +++ b/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala @@ -6,14 +6,13 @@ package docspell.joex -import scala.concurrent.ExecutionContext - import cats.effect._ import cats.implicits._ import fs2.concurrent.SignallingRef import docspell.analysis.TextAnalyser import docspell.backend.fulltext.CreateIndex +import docspell.backend.msg.{CancelJob, Ping, Topics} import docspell.backend.ops._ import docspell.common._ import docspell.ftsclient.FtsClient @@ -34,6 +33,7 @@ import docspell.joex.scanmailbox._ import docspell.joex.scheduler._ import docspell.joex.updatecheck._ import docspell.joexapi.client.JoexClient +import docspell.pubsub.api.{PubSub, PubSubT} import docspell.store.Store import docspell.store.queue._ import docspell.store.records.{REmptyTrashSetting, RJobLog} @@ -41,19 +41,20 @@ import docspell.store.usertask.UserTaskScope import docspell.store.usertask.UserTaskStore import emil.javamail._ -import org.http4s.blaze.client.BlazeClientBuilder import org.http4s.client.Client final class JoexAppImpl[F[_]: Async]( cfg: Config, - nodeOps: ONode[F], store: Store[F], queue: JobQueue[F], + pubSubT: PubSubT[F], pstore: PeriodicTaskStore[F], termSignal: SignallingRef[F, Boolean], val scheduler: Scheduler[F], val periodicScheduler: PeriodicScheduler[F] ) extends JoexApp[F] { + private[this] val logger: Logger[F] = + Logger.log4s(org.log4s.getLogger(s"Joex-${cfg.appId.id}")) def init: F[Unit] = { val run = scheduler.start.compile.drain @@ -64,16 +65,26 @@ final class JoexAppImpl[F[_]: Async]( _ <- Async[F].start(prun) _ <- scheduler.periodicAwake _ <- periodicScheduler.periodicAwake - _ <- nodeOps.register(cfg.appId, NodeType.Joex, cfg.baseUrl) + _ <- subscriptions } yield () } + def subscriptions = + for { + _ <- Async[F].start(pubSubT.subscribeSink(Ping.topic) { msg => + logger.info(s">>>> PING $msg") + }) + _ <- Async[F].start(pubSubT.subscribeSink(Topics.jobsNotify) { _ => + scheduler.notifyChange + }) + _ <- Async[F].start(pubSubT.subscribeSink(CancelJob.topic) { msg => + scheduler.requestCancel(msg.body.jobId).as(()) + }) + } yield () + def findLogs(jobId: Ident): F[Vector[RJobLog]] = store.transact(RJobLog.findLogs(jobId)) - def shutdown: F[Unit] = - nodeOps.unregister(cfg.appId) - def initShutdown: F[Unit] = periodicScheduler.shutdown *> scheduler.shutdown(false) *> termSignal.set(true) @@ -116,16 +127,19 @@ object JoexAppImpl { def create[F[_]: Async]( cfg: Config, termSignal: SignallingRef[F, Boolean], - connectEC: ExecutionContext + store: Store[F], + httpClient: Client[F], + pubSub: PubSub[F] ): Resource[F, JoexApp[F]] = for { - httpClient <- BlazeClientBuilder[F].resource - client = JoexClient(httpClient) - store <- Store.create(cfg.jdbc, cfg.files.chunkSize, connectEC) queue <- JobQueue(store) pstore <- PeriodicTaskStore.create(store) - nodeOps <- ONode(store) - joex <- OJoex(client, store) + client = JoexClient(httpClient) + pubSubT = PubSubT( + pubSub, + Logger.log4s(org.log4s.getLogger(s"joex-${cfg.appId.id}")) + ) + joex <- OJoex(pubSubT) upload <- OUpload(store, queue, joex) fts <- createFtsClient(cfg)(httpClient) createIndex <- CreateIndex.resource(fts, store) @@ -138,6 +152,7 @@ object JoexAppImpl { JavaMailEmil(Settings.defaultSettings.copy(debug = cfg.mailDebug)) sch <- SchedulerBuilder(cfg.scheduler, store) .withQueue(queue) + .withPubSub(pubSubT) .withTask( JobTask.json( ProcessItemArgs.taskName, @@ -264,8 +279,8 @@ object JoexAppImpl { pstore, client ) - app = new JoexAppImpl(cfg, nodeOps, store, queue, pstore, termSignal, sch, psch) - appR <- Resource.make(app.init.map(_ => app))(_.shutdown) + app = new JoexAppImpl(cfg, store, queue, pubSubT, pstore, termSignal, sch, psch) + appR <- Resource.make(app.init.map(_ => app))(_.initShutdown) } yield appR private def createFtsClient[F[_]: Async]( diff --git a/modules/joex/src/main/scala/docspell/joex/JoexServer.scala b/modules/joex/src/main/scala/docspell/joex/JoexServer.scala index 8c4773dc..cad75cbc 100644 --- a/modules/joex/src/main/scala/docspell/joex/JoexServer.scala +++ b/modules/joex/src/main/scala/docspell/joex/JoexServer.scala @@ -11,10 +11,14 @@ import cats.effect._ import fs2.Stream import fs2.concurrent.SignallingRef +import docspell.backend.msg.Topics import docspell.common.Pools import docspell.joex.routes._ +import docspell.pubsub.naive.NaivePubSub +import docspell.store.Store import org.http4s.HttpApp +import org.http4s.blaze.client.BlazeClientBuilder import org.http4s.blaze.server.BlazeServerBuilder import org.http4s.implicits._ import org.http4s.server.Router @@ -33,9 +37,19 @@ object JoexServer { val app = for { signal <- Resource.eval(SignallingRef[F, Boolean](false)) exitCode <- Resource.eval(Ref[F].of(ExitCode.Success)) - joexApp <- JoexAppImpl.create[F](cfg, signal, pools.connectEC) + + store <- Store.create[F]( + cfg.jdbc, + cfg.files.chunkSize, + pools.connectEC + ) + httpClient <- BlazeClientBuilder[F].resource + pubSub <- NaivePubSub(cfg.pubSubConfig, store, httpClient)(Topics.all.map(_.topic)) + + joexApp <- JoexAppImpl.create[F](cfg, signal, store, httpClient, pubSub) httpApp = Router( + "/internal/pubsub" -> pubSub.receiveRoute, "/api/info" -> InfoRoutes(cfg), "/api/v1" -> JoexRoutes(joexApp) ).orNotFound diff --git a/modules/joex/src/main/scala/docspell/joex/scheduler/SchedulerBuilder.scala b/modules/joex/src/main/scala/docspell/joex/scheduler/SchedulerBuilder.scala index 4f981a87..cc09f7da 100644 --- a/modules/joex/src/main/scala/docspell/joex/scheduler/SchedulerBuilder.scala +++ b/modules/joex/src/main/scala/docspell/joex/scheduler/SchedulerBuilder.scala @@ -11,6 +11,7 @@ import cats.effect.std.Semaphore import cats.implicits._ import fs2.concurrent.SignallingRef +import docspell.pubsub.api.PubSubT import docspell.store.Store import docspell.store.queue.JobQueue @@ -19,7 +20,8 @@ case class SchedulerBuilder[F[_]: Async]( tasks: JobTaskRegistry[F], store: Store[F], queue: Resource[F, JobQueue[F]], - logSink: LogSink[F] + logSink: LogSink[F], + pubSub: PubSubT[F] ) { def withConfig(cfg: SchedulerConfig): SchedulerBuilder[F] = @@ -32,7 +34,7 @@ case class SchedulerBuilder[F[_]: Async]( withTaskRegistry(tasks.withTask(task)) def withQueue(queue: Resource[F, JobQueue[F]]): SchedulerBuilder[F] = - SchedulerBuilder[F](config, tasks, store, queue, logSink) + copy(queue = queue) def withLogSink(sink: LogSink[F]): SchedulerBuilder[F] = copy(logSink = sink) @@ -40,6 +42,9 @@ case class SchedulerBuilder[F[_]: Async]( def withQueue(queue: JobQueue[F]): SchedulerBuilder[F] = copy(queue = Resource.pure[F, JobQueue[F]](queue)) + def withPubSub(pubSubT: PubSubT[F]): SchedulerBuilder[F] = + copy(pubSub = pubSubT) + def serve: Resource[F, Scheduler[F]] = resource.evalMap(sch => Async[F].start(sch.start.compile.drain).map(_ => sch)) @@ -52,6 +57,7 @@ case class SchedulerBuilder[F[_]: Async]( } yield new SchedulerImpl[F]( config, jq, + pubSub, tasks, store, logSink, @@ -76,7 +82,8 @@ object SchedulerBuilder { JobTaskRegistry.empty[F], store, JobQueue(store), - LogSink.db[F](store) + LogSink.db[F](store), + PubSubT.noop[F] ) } diff --git a/modules/joex/src/main/scala/docspell/joex/scheduler/SchedulerImpl.scala b/modules/joex/src/main/scala/docspell/joex/scheduler/SchedulerImpl.scala index d01d6756..e38a282b 100644 --- a/modules/joex/src/main/scala/docspell/joex/scheduler/SchedulerImpl.scala +++ b/modules/joex/src/main/scala/docspell/joex/scheduler/SchedulerImpl.scala @@ -13,19 +13,22 @@ import cats.implicits._ import fs2.Stream import fs2.concurrent.SignallingRef +import docspell.backend.msg.JobDone import docspell.common._ import docspell.common.syntax.all._ import docspell.joex.scheduler.SchedulerImpl._ +import docspell.pubsub.api.PubSubT import docspell.store.Store import docspell.store.queries.QJob import docspell.store.queue.JobQueue import docspell.store.records.RJob -import org.log4s._ +import org.log4s.getLogger final class SchedulerImpl[F[_]: Async]( val config: SchedulerConfig, queue: JobQueue[F], + pubSub: PubSubT[F], tasks: JobTaskRegistry[F], store: Store[F], logSink: LogSink[F], @@ -55,20 +58,21 @@ final class SchedulerImpl[F[_]: Async]( state.get.flatMap(s => QJob.findAll(s.getRunning, store)) def requestCancel(jobId: Ident): F[Boolean] = - state.get.flatMap(_.cancelRequest(jobId) match { - case Some(ct) => ct.map(_ => true) - case None => - (for { - job <- OptionT(store.transact(RJob.findByIdAndWorker(jobId, config.name))) - _ <- OptionT.liftF( - if (job.isInProgress) executeCancel(job) - else ().pure[F] - ) - } yield true) - .getOrElseF( - logger.fwarn(s"Job ${jobId.id} not found, cannot cancel.").map(_ => false) - ) - }) + logger.finfo(s"Scheduler requested to cancel job: ${jobId.id}") *> + state.get.flatMap(_.cancelRequest(jobId) match { + case Some(ct) => ct.map(_ => true) + case None => + (for { + job <- OptionT(store.transact(RJob.findByIdAndWorker(jobId, config.name))) + _ <- OptionT.liftF( + if (job.isInProgress) executeCancel(job) + else ().pure[F] + ) + } yield true) + .getOrElseF( + logger.fwarn(s"Job ${jobId.id} not found, cannot cancel.").map(_ => false) + ) + }) def notifyChange: F[Unit] = waiter.update(b => !b) @@ -198,6 +202,10 @@ final class SchedulerImpl[F[_]: Async]( ) _ <- state.modify(_.removeRunning(job)) _ <- QJob.setFinalState(job.id, finalState, store) + _ <- pubSub.publish1IgnoreErrors( + JobDone.topic, + JobDone(job.id, job.task, job.args, finalState) + ) } yield () def onStart(job: RJob): F[Unit] = diff --git a/modules/pubsub/api/src/main/scala/docspell/pubsub/api/PubSub.scala b/modules/pubsub/api/src/main/scala/docspell/pubsub/api/PubSub.scala index 4ae9ef6a..6a9eb9a5 100644 --- a/modules/pubsub/api/src/main/scala/docspell/pubsub/api/PubSub.scala +++ b/modules/pubsub/api/src/main/scala/docspell/pubsub/api/PubSub.scala @@ -6,9 +6,12 @@ package docspell.pubsub.api +import cats.Applicative import cats.data.NonEmptyList import fs2.{Pipe, Stream} +import docspell.common.{Ident, Timestamp} + import io.circe.Json trait PubSub[F[_]] { @@ -18,3 +21,16 @@ trait PubSub[F[_]] { def subscribe(topics: NonEmptyList[Topic]): Stream[F, Message[Json]] } +object PubSub { + def noop[F[_]: Applicative]: PubSub[F] = + new PubSub[F] { + def publish1(topic: Topic, msg: Json): F[MessageHead] = + Applicative[F].pure(MessageHead(Ident.unsafe("0"), Timestamp.Epoch, topic)) + + def publish(topic: Topic): Pipe[F, Json, MessageHead] = + _ => Stream.empty + + def subscribe(topics: NonEmptyList[Topic]): Stream[F, Message[Json]] = + Stream.empty + } +} diff --git a/modules/pubsub/api/src/main/scala/docspell/pubsub/api/PubSubT.scala b/modules/pubsub/api/src/main/scala/docspell/pubsub/api/PubSubT.scala index 9f28b798..d07f5e41 100644 --- a/modules/pubsub/api/src/main/scala/docspell/pubsub/api/PubSubT.scala +++ b/modules/pubsub/api/src/main/scala/docspell/pubsub/api/PubSubT.scala @@ -7,6 +7,9 @@ package docspell.pubsub.api import cats.data.NonEmptyList +import cats.effect._ +import cats.implicits._ +import fs2.concurrent.SignallingRef import fs2.{Pipe, Stream} import docspell.common.Logger @@ -15,22 +18,35 @@ trait PubSubT[F[_]] { def publish1[A](topic: TypedTopic[A], msg: A): F[MessageHead] + def publish1IgnoreErrors[A](topic: TypedTopic[A], msg: A): F[Unit] + def publish[A](topic: TypedTopic[A]): Pipe[F, A, MessageHead] def subscribe[A](topic: TypedTopic[A]): Stream[F, Message[A]] + def subscribeSink[A](topic: TypedTopic[A])(handler: Message[A] => F[Unit]): F[F[Unit]] + def delegate: PubSub[F] def withDelegate(delegate: PubSub[F]): PubSubT[F] } object PubSubT { + def noop[F[_]: Async]: PubSubT[F] = + PubSubT(PubSub.noop[F], Logger.off[F]) - def apply[F[_]](pubSub: PubSub[F], logger: Logger[F]): PubSubT[F] = + def apply[F[_]: Async](pubSub: PubSub[F], logger: Logger[F]): PubSubT[F] = new PubSubT[F] { def publish1[A](topic: TypedTopic[A], msg: A): F[MessageHead] = pubSub.publish1(topic.topic, topic.codec(msg)) + def publish1IgnoreErrors[A](topic: TypedTopic[A], msg: A): F[Unit] = + publish1(topic, msg).attempt.flatMap { + case Right(_) => ().pure[F] + case Left(ex) => + logger.error(ex)(s"Error publishing to topic ${topic.topic.name}: $msg") + } + def publish[A](topic: TypedTopic[A]): Pipe[F, A, MessageHead] = _.map(topic.codec.apply).through(pubSub.publish(topic.topic)) @@ -49,6 +65,18 @@ object PubSubT { } ) + def subscribeSink[A]( + topic: TypedTopic[A] + )(handler: Message[A] => F[Unit]): F[F[Unit]] = + for { + halt <- SignallingRef.of[F, Boolean](false) + _ <- subscribe(topic) + .evalMap(handler) + .interruptWhen(halt) + .compile + .drain + } yield halt.set(true) + def delegate: PubSub[F] = pubSub def withDelegate(newDelegate: PubSub[F]): PubSubT[F] = diff --git a/modules/pubsub/api/src/main/scala/docspell/pubsub/api/Topics.scala b/modules/pubsub/api/src/main/scala/docspell/pubsub/api/Topics.scala deleted file mode 100644 index a536607a..00000000 --- a/modules/pubsub/api/src/main/scala/docspell/pubsub/api/Topics.scala +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Copyright 2020 Eike K. & Contributors - * - * SPDX-License-Identifier: AGPL-3.0-or-later - */ - -package docspell.pubsub.api - -import cats.data.NonEmptyList - -import docspell.common.Ident - -import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder} -import io.circe.{Decoder, Encoder} - -/** All topics used in Docspell. */ -object Topics { - - /** Notify when a job has finished. */ - val jobDone: TypedTopic[JobDoneMsg] = TypedTopic[JobDoneMsg](Topic("job-done")) - - /** Notify when a job has been submitted. The job executor listens to these messages to - * wake up and do its work. - */ - val jobSubmitted: TypedTopic[JobSubmittedMsg] = - TypedTopic[JobSubmittedMsg](Topic("job-submitted")) - - /** Notify a node to cancel a job with the given id */ - val cancelJob: TypedTopic[CancelJobMsg] = - TypedTopic[CancelJobMsg](Topic("cancel-job")) - - val all: NonEmptyList[TypedTopic[_]] = NonEmptyList.of(jobDone, jobSubmitted, cancelJob) - - final case class JobSubmittedMsg(task: Ident) - object JobSubmittedMsg { - implicit val jsonDecoder: Decoder[JobSubmittedMsg] = - deriveDecoder[JobSubmittedMsg] - - implicit val jsonEncoder: Encoder[JobSubmittedMsg] = - deriveEncoder[JobSubmittedMsg] - } - - final case class JobDoneMsg(jobId: Ident, task: Ident) - object JobDoneMsg { - implicit val jsonDecoder: Decoder[JobDoneMsg] = - deriveDecoder[JobDoneMsg] - - implicit val jsonEncoder: Encoder[JobDoneMsg] = - deriveEncoder[JobDoneMsg] - } - - final case class CancelJobMsg(jobId: Ident, nodeId: Ident) - object CancelJobMsg { - implicit val jsonDecoder: Decoder[CancelJobMsg] = - deriveDecoder[CancelJobMsg] - - implicit val jsonEncoder: Encoder[CancelJobMsg] = - deriveEncoder[CancelJobMsg] - - } -} diff --git a/modules/pubsub/naive/src/main/scala/docspell/pubsub/naive/NaivePubSub.scala b/modules/pubsub/naive/src/main/scala/docspell/pubsub/naive/NaivePubSub.scala index fb7bb1cb..cb7370ad 100644 --- a/modules/pubsub/naive/src/main/scala/docspell/pubsub/naive/NaivePubSub.scala +++ b/modules/pubsub/naive/src/main/scala/docspell/pubsub/naive/NaivePubSub.scala @@ -155,23 +155,16 @@ final class NaivePubSub[F[_]: Async]( for { _ <- logger.trace(s"Find all nodes subscribed to topic ${msg.head.topic.name}") - urls <- store.transact(RPubSub.findSubs(msg.head.topic.name)) + urls <- store.transact(RPubSub.findSubs(msg.head.topic.name, cfg.nodeId)) _ <- logger.trace(s"Publishing to remote urls ${urls.map(_.asString)}: $msg") reqs = urls .map(u => Uri.unsafeFromString(u.asString)) .map(uri => POST(List(msg), uri)) - res <- reqs.traverse(req => client.status(req)).attempt - _ <- res match { + resList <- reqs.traverse(req => client.status(req).attempt) + _ <- resList.traverse { case Right(s) => - if (s.forall(_.isSuccess)) ().pure[F] - else if (s.size == urls.size) - logger.warn( - s"No nodes was be reached! Reason: $s, message: $msg" - ) - else - logger.warn( - s"Some nodes were not reached! Reason: $s, message: $msg" - ) + if (s.isSuccess) ().pure[F] + else logger.warn(s"A node was not reached! Reason: $s, message: $msg") case Left(ex) => logger.error(ex)(s"Error publishing ${msg.head.topic.name} message remotely") } diff --git a/modules/pubsub/naive/src/test/scala/docspell/pubsub/naive/NaivePubSubTest.scala b/modules/pubsub/naive/src/test/scala/docspell/pubsub/naive/NaivePubSubTest.scala index ece6f443..8d3094c2 100644 --- a/modules/pubsub/naive/src/test/scala/docspell/pubsub/naive/NaivePubSubTest.scala +++ b/modules/pubsub/naive/src/test/scala/docspell/pubsub/naive/NaivePubSubTest.scala @@ -13,8 +13,8 @@ import cats.implicits._ import fs2.concurrent.SignallingRef import docspell.common._ -import docspell.pubsub.api.Topics.{JobDoneMsg, JobSubmittedMsg} import docspell.pubsub.api._ +import docspell.pubsub.naive.Topics._ import munit.CatsEffectSuite @@ -104,7 +104,7 @@ class NaivePubSubTest extends CatsEffectSuite with Fixtures { } pubsubEnv.test("do not receive remote message from other topic") { env => - val msg = JobDoneMsg("job-1".id, "task-2".id) + val msg = JobCancelMsg("job-1".id) // Create two pubsub instances connected to the same database conntectedPubsubs(env).use { case (ps1, ps2) => @@ -112,7 +112,7 @@ class NaivePubSubTest extends CatsEffectSuite with Fixtures { // subscribe to ps1 and send via ps2 res <- subscribe(ps1, Topics.jobSubmitted) (received, halt, subFiber) = res - _ <- ps2.publish1(Topics.jobDone, msg) + _ <- ps2.publish1(Topics.jobCancel, msg) _ <- IO.sleep(100.millis) _ <- halt.set(true) outcome <- subFiber.join diff --git a/modules/pubsub/naive/src/test/scala/docspell/pubsub/naive/Topics.scala b/modules/pubsub/naive/src/test/scala/docspell/pubsub/naive/Topics.scala new file mode 100644 index 00000000..8388efa0 --- /dev/null +++ b/modules/pubsub/naive/src/test/scala/docspell/pubsub/naive/Topics.scala @@ -0,0 +1,37 @@ +/* + * Copyright 2020 Eike K. & Contributors + * + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +package docspell.pubsub.naive + +import cats.data.NonEmptyList + +import docspell.common.Ident +import docspell.pubsub.api._ + +import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder} +import io.circe.{Decoder, Encoder} + +object Topics { + val jobSubmitted: TypedTopic[JobSubmittedMsg] = + TypedTopic[JobSubmittedMsg](Topic("test-job-submitted")) + + final case class JobSubmittedMsg(task: Ident) + object JobSubmittedMsg { + implicit val encode: Encoder[JobSubmittedMsg] = deriveEncoder[JobSubmittedMsg] + implicit val decode: Decoder[JobSubmittedMsg] = deriveDecoder[JobSubmittedMsg] + } + + val jobCancel: TypedTopic[JobCancelMsg] = + TypedTopic[JobCancelMsg](Topic("test-job-done")) + final case class JobCancelMsg(id: Ident) + object JobCancelMsg { + implicit val encode: Encoder[JobCancelMsg] = deriveEncoder[JobCancelMsg] + implicit val decode: Decoder[JobCancelMsg] = deriveDecoder[JobCancelMsg] + } + + def all: NonEmptyList[TypedTopic[_]] = + NonEmptyList.of(jobSubmitted, jobCancel) +} diff --git a/modules/restserver/src/main/scala/docspell/restserver/Config.scala b/modules/restserver/src/main/scala/docspell/restserver/Config.scala index 4b64d7e7..589e5ff1 100644 --- a/modules/restserver/src/main/scala/docspell/restserver/Config.scala +++ b/modules/restserver/src/main/scala/docspell/restserver/Config.scala @@ -11,6 +11,7 @@ import docspell.backend.{Config => BackendConfig} import docspell.common._ import docspell.ftssolr.SolrConfig import docspell.oidc.ProviderConfig +import docspell.pubsub.naive.PubSubConfig import docspell.restserver.Config.OpenIdConfig import docspell.restserver.auth.OpenId @@ -33,6 +34,9 @@ case class Config( ) { def openIdEnabled: Boolean = openid.exists(_.enabled) + + def pubSubConfig: PubSubConfig = + PubSubConfig(appId, baseUrl / "internal" / "pubsub", 100) } object Config { diff --git a/modules/restserver/src/main/scala/docspell/restserver/RestApp.scala b/modules/restserver/src/main/scala/docspell/restserver/RestApp.scala index 975ae73c..68383b0a 100644 --- a/modules/restserver/src/main/scala/docspell/restserver/RestApp.scala +++ b/modules/restserver/src/main/scala/docspell/restserver/RestApp.scala @@ -10,8 +10,6 @@ import docspell.backend.BackendApp trait RestApp[F[_]] { - def init: F[Unit] - def config: Config def backend: BackendApp[F] diff --git a/modules/restserver/src/main/scala/docspell/restserver/RestAppImpl.scala b/modules/restserver/src/main/scala/docspell/restserver/RestAppImpl.scala index 74b6a303..313ec05e 100644 --- a/modules/restserver/src/main/scala/docspell/restserver/RestAppImpl.scala +++ b/modules/restserver/src/main/scala/docspell/restserver/RestAppImpl.scala @@ -6,41 +6,52 @@ package docspell.restserver -import scala.concurrent.ExecutionContext - import cats.effect._ import cats.implicits._ import docspell.backend.BackendApp -import docspell.common.NodeType +import docspell.backend.msg.{JobDone, Ping} +import docspell.common.Logger import docspell.ftsclient.FtsClient import docspell.ftssolr.SolrFtsClient +import docspell.pubsub.api.{PubSub, PubSubT} +import docspell.store.Store import org.http4s.client.Client final class RestAppImpl[F[_]](val config: Config, val backend: BackendApp[F]) - extends RestApp[F] { - - def init: F[Unit] = - backend.node.register(config.appId, NodeType.Restserver, config.baseUrl) - - def shutdown: F[Unit] = - backend.node.unregister(config.appId) -} + extends RestApp[F] {} object RestAppImpl { def create[F[_]: Async]( cfg: Config, - connectEC: ExecutionContext - ): Resource[F, RestApp[F]] = + store: Store[F], + httpClient: Client[F], + pubSub: PubSub[F] + ): Resource[F, RestApp[F]] = { + val logger = Logger.log4s(org.log4s.getLogger(s"restserver-${cfg.appId.id}")) for { - backend <- BackendApp(cfg.backend, connectEC)( - createFtsClient[F](cfg) - ) + ftsClient <- createFtsClient(cfg)(httpClient) + pubSubT = PubSubT(pubSub, logger) + backend <- BackendApp.create[F](cfg.backend, store, ftsClient, pubSubT) + _ <- Resource.eval(subscriptions(backend, logger)) app = new RestAppImpl[F](cfg, backend) - appR <- Resource.make(app.init.map(_ => app))(_.shutdown) - } yield appR + } yield app + } + + private def subscriptions[F[_]: Async]( + backend: BackendApp[F], + logger: Logger[F] + ): F[Unit] = + for { + _ <- Async[F].start(backend.pubSub.subscribeSink(Ping.topic) { msg => + logger.info(s">>>> PING $msg") + }) + _ <- Async[F].start(backend.pubSub.subscribeSink(JobDone.topic) { msg => + logger.info(s">>>> Job Done $msg") + }) + } yield () private def createFtsClient[F[_]: Async]( cfg: Config diff --git a/modules/restserver/src/main/scala/docspell/restserver/RestServer.scala b/modules/restserver/src/main/scala/docspell/restserver/RestServer.scala index f64f0a93..e5531fbe 100644 --- a/modules/restserver/src/main/scala/docspell/restserver/RestServer.scala +++ b/modules/restserver/src/main/scala/docspell/restserver/RestServer.scala @@ -11,12 +11,15 @@ import cats.implicits._ import fs2.Stream import docspell.backend.auth.{AuthToken, ShareToken} +import docspell.backend.msg.Topics import docspell.common._ import docspell.oidc.CodeFlowRoutes +import docspell.pubsub.naive.NaivePubSub import docspell.restserver.auth.OpenId import docspell.restserver.http4s.EnvMiddleware import docspell.restserver.routes._ import docspell.restserver.webapp._ +import docspell.store.Store import org.http4s._ import org.http4s.blaze.client.BlazeClientBuilder @@ -34,9 +37,17 @@ object RestServer { val templates = TemplateRoutes[F](cfg) val app = for { - restApp <- RestAppImpl.create[F](cfg, pools.connectEC) + store <- Store.create[F]( + cfg.backend.jdbc, + cfg.backend.files.chunkSize, + pools.connectEC + ) + httpClient <- BlazeClientBuilder[F].resource + pubSub <- NaivePubSub(cfg.pubSubConfig, store, httpClient)(Topics.all.map(_.topic)) + restApp <- RestAppImpl.create[F](cfg, store, httpClient, pubSub) httpClient <- BlazeClientBuilder[F].resource httpApp = Router( + "/internal/pubsub" -> pubSub.receiveRoute, "/api/info" -> routes.InfoRoutes(), "/api/v1/open/" -> openRoutes(cfg, httpClient, restApp), "/api/v1/sec/" -> Authenticate(restApp.backend.login, cfg.auth) { token => diff --git a/modules/store/src/main/scala/docspell/store/records/RPubSub.scala b/modules/store/src/main/scala/docspell/store/records/RPubSub.scala index f0e2ce6e..6509aaea 100644 --- a/modules/store/src/main/scala/docspell/store/records/RPubSub.scala +++ b/modules/store/src/main/scala/docspell/store/records/RPubSub.scala @@ -79,8 +79,12 @@ object RPubSub { ) ) - def findSubs(topic: String): ConnectionIO[List[LenientUri]] = - run(select(T.url), from(T), T.topic === topic && T.counter > 0) + def findSubs(topic: String, excludeNode: Ident): ConnectionIO[List[LenientUri]] = + run( + select(T.url), + from(T), + T.topic === topic && T.counter > 0 && T.nodeId <> excludeNode + ) .query[LenientUri] .to[List] } From 3e58d97f72c990b2bd5b70b84dc3236dd33205ca Mon Sep 17 00:00:00 2001 From: eikek Date: Sat, 6 Nov 2021 21:32:07 +0100 Subject: [PATCH 4/7] Add websockets and notify frontend when an item is processed --- .../scala/docspell/backend/msg/Ping.scala | 31 ----- .../scala/docspell/backend/msg/Topics.scala | 2 +- .../docspell/common/syntax/StringSyntax.scala | 5 +- .../scala/docspell/joex/JoexAppImpl.scala | 8 +- .../docspell/pubsub/naive/NaivePubSub.scala | 24 ++-- .../main/scala/docspell/restserver/Main.scala | 8 +- .../docspell/restserver/RestAppImpl.scala | 16 --- .../docspell/restserver/RestServer.scala | 112 ++++++++++++------ .../docspell/restserver/Subscriptions.scala | 35 ++++++ .../docspell/restserver/ws/InputMessage.scala | 11 ++ .../docspell/restserver/ws/OutputEvent.scala | 32 +++++ .../restserver/ws/WebSocketRoutes.scala | 44 +++++++ modules/webapp/src/main/elm/App/Data.elm | 1 + modules/webapp/src/main/elm/App/Update.elm | 7 ++ modules/webapp/src/main/elm/Main.elm | 1 + modules/webapp/src/main/elm/Ports.elm | 10 ++ modules/webapp/src/main/webjar/docspell.js | 10 ++ 17 files changed, 243 insertions(+), 114 deletions(-) delete mode 100644 modules/backend/src/main/scala/docspell/backend/msg/Ping.scala create mode 100644 modules/restserver/src/main/scala/docspell/restserver/Subscriptions.scala create mode 100644 modules/restserver/src/main/scala/docspell/restserver/ws/InputMessage.scala create mode 100644 modules/restserver/src/main/scala/docspell/restserver/ws/OutputEvent.scala create mode 100644 modules/restserver/src/main/scala/docspell/restserver/ws/WebSocketRoutes.scala diff --git a/modules/backend/src/main/scala/docspell/backend/msg/Ping.scala b/modules/backend/src/main/scala/docspell/backend/msg/Ping.scala deleted file mode 100644 index b300a76d..00000000 --- a/modules/backend/src/main/scala/docspell/backend/msg/Ping.scala +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Copyright 2020 Eike K. & Contributors - * - * SPDX-License-Identifier: AGPL-3.0-or-later - */ - -package docspell.backend.msg - -import java.util.concurrent.atomic.AtomicLong - -import docspell.pubsub.api.{Topic, TypedTopic} - -import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder} -import io.circe.{Decoder, Encoder} - -final case class Ping(sender: String, num: Long) - -object Ping { - implicit val jsonDecoder: Decoder[Ping] = - deriveDecoder[Ping] - - implicit val jsonEncoder: Encoder[Ping] = - deriveEncoder[Ping] - - private[this] val counter = new AtomicLong(0) - def next(sender: String): Ping = - Ping(sender, counter.getAndIncrement()) - - val topic: TypedTopic[Ping] = - TypedTopic[Ping](Topic("ping")) -} diff --git a/modules/backend/src/main/scala/docspell/backend/msg/Topics.scala b/modules/backend/src/main/scala/docspell/backend/msg/Topics.scala index 996ff7c6..26c594f7 100644 --- a/modules/backend/src/main/scala/docspell/backend/msg/Topics.scala +++ b/modules/backend/src/main/scala/docspell/backend/msg/Topics.scala @@ -19,5 +19,5 @@ object Topics { /** A list of all topics. It is required to list every topic in use here! */ val all: NonEmptyList[TypedTopic[_]] = - NonEmptyList.of(Ping.topic, JobDone.topic, CancelJob.topic, jobsNotify) + NonEmptyList.of(JobDone.topic, CancelJob.topic, jobsNotify) } diff --git a/modules/common/src/main/scala/docspell/common/syntax/StringSyntax.scala b/modules/common/src/main/scala/docspell/common/syntax/StringSyntax.scala index 6fc06440..40d5bf80 100644 --- a/modules/common/src/main/scala/docspell/common/syntax/StringSyntax.scala +++ b/modules/common/src/main/scala/docspell/common/syntax/StringSyntax.scala @@ -12,9 +12,7 @@ import io.circe.Decoder import io.circe.parser._ trait StringSyntax { - implicit class EvenMoreStringOps(s: String) { - def asNonBlank: Option[String] = Option(s).filter(_.trim.nonEmpty) @@ -24,5 +22,6 @@ trait StringSyntax { value <- json.as[A] } yield value } - } + +object StringSyntax extends StringSyntax diff --git a/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala b/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala index 85359092..2ec86d11 100644 --- a/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala +++ b/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala @@ -12,7 +12,7 @@ import fs2.concurrent.SignallingRef import docspell.analysis.TextAnalyser import docspell.backend.fulltext.CreateIndex -import docspell.backend.msg.{CancelJob, Ping, Topics} +import docspell.backend.msg.{CancelJob, Topics} import docspell.backend.ops._ import docspell.common._ import docspell.ftsclient.FtsClient @@ -53,9 +53,6 @@ final class JoexAppImpl[F[_]: Async]( val scheduler: Scheduler[F], val periodicScheduler: PeriodicScheduler[F] ) extends JoexApp[F] { - private[this] val logger: Logger[F] = - Logger.log4s(org.log4s.getLogger(s"Joex-${cfg.appId.id}")) - def init: F[Unit] = { val run = scheduler.start.compile.drain val prun = periodicScheduler.start.compile.drain @@ -71,9 +68,6 @@ final class JoexAppImpl[F[_]: Async]( def subscriptions = for { - _ <- Async[F].start(pubSubT.subscribeSink(Ping.topic) { msg => - logger.info(s">>>> PING $msg") - }) _ <- Async[F].start(pubSubT.subscribeSink(Topics.jobsNotify) { _ => scheduler.notifyChange }) diff --git a/modules/pubsub/naive/src/main/scala/docspell/pubsub/naive/NaivePubSub.scala b/modules/pubsub/naive/src/main/scala/docspell/pubsub/naive/NaivePubSub.scala index cb7370ad..16ad7096 100644 --- a/modules/pubsub/naive/src/main/scala/docspell/pubsub/naive/NaivePubSub.scala +++ b/modules/pubsub/naive/src/main/scala/docspell/pubsub/naive/NaivePubSub.scala @@ -26,30 +26,30 @@ import org.http4s.client.dsl.Http4sClientDsl import org.http4s.dsl.Http4sDsl import org.http4s.{HttpRoutes, Uri} -/** A pubsub implementation that can be used across machines, but uses a rather - * inefficient protocol. The reason is to make it work with the current setup, i.e. not - * requiring to add another complex piece of software to the mix, like Kafka or RabbitMQ. +/** A pubsub implementation that can be used across machines, using a rather inefficient + * but simple protocol. It can therefore work with the current setup, i.e. not requiring + * to add another complex piece of software to the mix, like Kafka or RabbitMQ. * * However, the api should allow to be used on top of such a tool. This implementation * can be used in a personal setting, where there are only a few nodes. * - * How it works: + * How it works: Each node has a set of local subscribers and a http endpoint. If it + * publishes a message, it notifies all local subscribers and sends out a json message to + * all endpoints that are registered for this topic. If it receives a messagen through + * its endpoint, it notifies all local subscribers. * - * It is build on the `Topic` class from fs2.concurrent. A map of a topic name to such a + * It is build on the `Topic` class from fs2.concurrent. A map of the name to such a * `Topic` instance is maintained. To work across machines, the database is used as a - * synchronization point. Each subscriber must provide a http api and so its "callback" - * URL is added into the database to the list of remote subscribers. + * synchronization point. Each node must provide a http api and so its "callback" URL is + * added into the database associated to a topic name. * * When publishing a message, the message can be published to the internal fs2 topic. * Then all URLs to this topic name are looked up in the database and the message is * POSTed to each URL as JSON. The endpoint of each machine takes this message and * publishes it to its own internal fs2.concurrent.Topic instance. * - * Obviously, there are drawbacks: it is slow, because the messages go through http and - * connections must be opened/closed etc and the database is hit as well. Then it doesn't - * scale to lots of machines and messages. The upside is, that it works with the current - * setup and it should be good enough for personal use, where there are only a small - * amount of machines and messages. + * Obviously, this doesn't scale well to lots of machines and messages. It should be good + * enough for personal use, where there are only a small amount of machines and messages. * * The main use case for docspell is to communicate between the rest-server and job * executor. It is for internal communication and all topics are known at compile time. diff --git a/modules/restserver/src/main/scala/docspell/restserver/Main.scala b/modules/restserver/src/main/scala/docspell/restserver/Main.scala index 5907ff41..61844bdd 100644 --- a/modules/restserver/src/main/scala/docspell/restserver/Main.scala +++ b/modules/restserver/src/main/scala/docspell/restserver/Main.scala @@ -38,12 +38,6 @@ object Main extends IOApp { pools = connectEC.map(Pools.apply) rc <- - pools.use(p => - RestServer - .stream[IO](cfg, p) - .compile - .drain - .as(ExitCode.Success) - ) + pools.use(p => RestServer.serve[IO](cfg, p)) } yield rc } diff --git a/modules/restserver/src/main/scala/docspell/restserver/RestAppImpl.scala b/modules/restserver/src/main/scala/docspell/restserver/RestAppImpl.scala index 313ec05e..65a5ad5e 100644 --- a/modules/restserver/src/main/scala/docspell/restserver/RestAppImpl.scala +++ b/modules/restserver/src/main/scala/docspell/restserver/RestAppImpl.scala @@ -7,10 +7,8 @@ package docspell.restserver import cats.effect._ -import cats.implicits._ import docspell.backend.BackendApp -import docspell.backend.msg.{JobDone, Ping} import docspell.common.Logger import docspell.ftsclient.FtsClient import docspell.ftssolr.SolrFtsClient @@ -35,24 +33,10 @@ object RestAppImpl { ftsClient <- createFtsClient(cfg)(httpClient) pubSubT = PubSubT(pubSub, logger) backend <- BackendApp.create[F](cfg.backend, store, ftsClient, pubSubT) - _ <- Resource.eval(subscriptions(backend, logger)) app = new RestAppImpl[F](cfg, backend) } yield app } - private def subscriptions[F[_]: Async]( - backend: BackendApp[F], - logger: Logger[F] - ): F[Unit] = - for { - _ <- Async[F].start(backend.pubSub.subscribeSink(Ping.topic) { msg => - logger.info(s">>>> PING $msg") - }) - _ <- Async[F].start(backend.pubSub.subscribeSink(JobDone.topic) { msg => - logger.info(s">>>> Job Done $msg") - }) - } yield () - private def createFtsClient[F[_]: Async]( cfg: Config )(client: Client[F]): Resource[F, FtsClient[F]] = diff --git a/modules/restserver/src/main/scala/docspell/restserver/RestServer.scala b/modules/restserver/src/main/scala/docspell/restserver/RestServer.scala index e5531fbe..707c7aeb 100644 --- a/modules/restserver/src/main/scala/docspell/restserver/RestServer.scala +++ b/modules/restserver/src/main/scala/docspell/restserver/RestServer.scala @@ -6,9 +6,12 @@ package docspell.restserver +import scala.concurrent.duration._ + import cats.effect._ import cats.implicits._ import fs2.Stream +import fs2.concurrent.Topic import docspell.backend.auth.{AuthToken, ShareToken} import docspell.backend.msg.Topics @@ -19,6 +22,8 @@ import docspell.restserver.auth.OpenId import docspell.restserver.http4s.EnvMiddleware import docspell.restserver.routes._ import docspell.restserver.webapp._ +import docspell.restserver.ws.OutputEvent.KeepAlive +import docspell.restserver.ws.{OutputEvent, WebSocketRoutes} import docspell.store.Store import org.http4s._ @@ -30,63 +35,96 @@ import org.http4s.headers.Location import org.http4s.implicits._ import org.http4s.server.Router import org.http4s.server.middleware.Logger +import org.http4s.server.websocket.WebSocketBuilder2 object RestServer { - def stream[F[_]: Async](cfg: Config, pools: Pools): Stream[F, Nothing] = { + def serve[F[_]: Async](cfg: Config, pools: Pools): F[ExitCode] = + for { + wsTopic <- Topic[F, OutputEvent] + keepAlive = Stream + .awakeEvery[F](30.seconds) + .map(_ => KeepAlive) + .through(wsTopic.publish) - val templates = TemplateRoutes[F](cfg) - val app = for { + server = + Stream + .resource(createApp(cfg, pools)) + .flatMap { case (restApp, pubSub, httpClient) => + Stream( + Subscriptions(wsTopic, restApp.backend.pubSub), + BlazeServerBuilder[F] + .bindHttp(cfg.bind.port, cfg.bind.address) + .withoutBanner + .withHttpWebSocketApp( + createHttpApp(cfg, httpClient, pubSub, restApp, wsTopic) + ) + .serve + .drain + ) + } + + exit <- + (server ++ Stream(keepAlive)).parJoinUnbounded.compile.drain.as(ExitCode.Success) + } yield exit + + def createApp[F[_]: Async]( + cfg: Config, + pools: Pools + ): Resource[F, (RestApp[F], NaivePubSub[F], Client[F])] = + for { + httpClient <- BlazeClientBuilder[F].resource store <- Store.create[F]( cfg.backend.jdbc, cfg.backend.files.chunkSize, pools.connectEC ) - httpClient <- BlazeClientBuilder[F].resource pubSub <- NaivePubSub(cfg.pubSubConfig, store, httpClient)(Topics.all.map(_.topic)) restApp <- RestAppImpl.create[F](cfg, store, httpClient, pubSub) - httpClient <- BlazeClientBuilder[F].resource - httpApp = Router( - "/internal/pubsub" -> pubSub.receiveRoute, - "/api/info" -> routes.InfoRoutes(), - "/api/v1/open/" -> openRoutes(cfg, httpClient, restApp), - "/api/v1/sec/" -> Authenticate(restApp.backend.login, cfg.auth) { token => - securedRoutes(cfg, restApp, token) - }, - "/api/v1/admin" -> AdminAuth(cfg.adminEndpoint) { - adminRoutes(cfg, restApp) - }, - "/api/v1/share" -> ShareAuth(restApp.backend.share, cfg.auth) { token => - shareRoutes(cfg, restApp, token) - }, - "/api/doc" -> templates.doc, - "/app/assets" -> EnvMiddleware(WebjarRoutes.appRoutes[F]), - "/app" -> EnvMiddleware(templates.app), - "/sw.js" -> EnvMiddleware(templates.serviceWorker), - "/" -> redirectTo("/app") - ).orNotFound + } yield (restApp, pubSub, httpClient) - finalHttpApp = Logger.httpApp(logHeaders = false, logBody = false)(httpApp) + def createHttpApp[F[_]: Async]( + cfg: Config, + httpClient: Client[F], + pubSub: NaivePubSub[F], + restApp: RestApp[F], + topic: Topic[F, OutputEvent] + )( + wsB: WebSocketBuilder2[F] + ) = { + val templates = TemplateRoutes[F](cfg) + val httpApp = Router( + "/internal/pubsub" -> pubSub.receiveRoute, + "/api/info" -> routes.InfoRoutes(), + "/api/v1/open/" -> openRoutes(cfg, httpClient, restApp), + "/api/v1/sec/" -> Authenticate(restApp.backend.login, cfg.auth) { token => + securedRoutes(cfg, restApp, wsB, topic, token) + }, + "/api/v1/admin" -> AdminAuth(cfg.adminEndpoint) { + adminRoutes(cfg, restApp) + }, + "/api/v1/share" -> ShareAuth(restApp.backend.share, cfg.auth) { token => + shareRoutes(cfg, restApp, token) + }, + "/api/doc" -> templates.doc, + "/app/assets" -> EnvMiddleware(WebjarRoutes.appRoutes[F]), + "/app" -> EnvMiddleware(templates.app), + "/sw.js" -> EnvMiddleware(templates.serviceWorker), + "/" -> redirectTo("/app") + ).orNotFound - } yield finalHttpApp - - Stream - .resource(app) - .flatMap(httpApp => - BlazeServerBuilder[F] - .bindHttp(cfg.bind.port, cfg.bind.address) - .withHttpApp(httpApp) - .withoutBanner - .serve - ) - }.drain + Logger.httpApp(logHeaders = false, logBody = false)(httpApp) + } def securedRoutes[F[_]: Async]( cfg: Config, restApp: RestApp[F], + wsB: WebSocketBuilder2[F], + topic: Topic[F, OutputEvent], token: AuthToken ): HttpRoutes[F] = Router( + "ws" -> WebSocketRoutes(token, topic, wsB), "auth" -> LoginRoutes.session(restApp.backend.login, cfg, token), "tag" -> TagRoutes(restApp.backend, token), "equipment" -> EquipmentRoutes(restApp.backend, token), diff --git a/modules/restserver/src/main/scala/docspell/restserver/Subscriptions.scala b/modules/restserver/src/main/scala/docspell/restserver/Subscriptions.scala new file mode 100644 index 00000000..a574f0f2 --- /dev/null +++ b/modules/restserver/src/main/scala/docspell/restserver/Subscriptions.scala @@ -0,0 +1,35 @@ +/* + * Copyright 2020 Eike K. & Contributors + * + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +package docspell.restserver + +import fs2.Stream +import fs2.concurrent.Topic + +import docspell.backend.msg.JobDone +import docspell.common._ +import docspell.common.syntax.StringSyntax._ +import docspell.pubsub.api.PubSubT +import docspell.restserver.ws.OutputEvent + +/** Subscribes to those events from docspell that are forwarded to the websocket endpoints + */ +object Subscriptions { + + def apply[F[_]]( + wsTopic: Topic[F, OutputEvent], + pubSub: PubSubT[F] + ): Stream[F, Nothing] = + jobDone(pubSub).through(wsTopic.publish) + + def jobDone[F[_]](pubSub: PubSubT[F]): Stream[F, OutputEvent] = + pubSub + .subscribe(JobDone.topic) + .filter(m => m.body.task == ProcessItemArgs.taskName) + .map(m => m.body.args.parseJsonAs[ProcessItemArgs]) + .collect { case Right(a) => OutputEvent.ItemProcessed(a.meta.collective) } + +} diff --git a/modules/restserver/src/main/scala/docspell/restserver/ws/InputMessage.scala b/modules/restserver/src/main/scala/docspell/restserver/ws/InputMessage.scala new file mode 100644 index 00000000..4ea8dc02 --- /dev/null +++ b/modules/restserver/src/main/scala/docspell/restserver/ws/InputMessage.scala @@ -0,0 +1,11 @@ +/* + * Copyright 2020 Eike K. & Contributors + * + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +package docspell.restserver.ws + +sealed trait InputMessage + +object InputMessage {} diff --git a/modules/restserver/src/main/scala/docspell/restserver/ws/OutputEvent.scala b/modules/restserver/src/main/scala/docspell/restserver/ws/OutputEvent.scala new file mode 100644 index 00000000..8567f3a7 --- /dev/null +++ b/modules/restserver/src/main/scala/docspell/restserver/ws/OutputEvent.scala @@ -0,0 +1,32 @@ +/* + * Copyright 2020 Eike K. & Contributors + * + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +package docspell.restserver.ws + +import docspell.backend.auth.AuthToken +import docspell.common._ + +sealed trait OutputEvent { + def forCollective(token: AuthToken): Boolean + def encode: String +} + +object OutputEvent { + + case object KeepAlive extends OutputEvent { + def forCollective(token: AuthToken): Boolean = true + def encode: String = "keep-alive" + } + + final case class ItemProcessed(collective: Ident) extends OutputEvent { + def forCollective(token: AuthToken): Boolean = + token.account.collective == collective + + def encode: String = + "item-processed" + } + +} diff --git a/modules/restserver/src/main/scala/docspell/restserver/ws/WebSocketRoutes.scala b/modules/restserver/src/main/scala/docspell/restserver/ws/WebSocketRoutes.scala new file mode 100644 index 00000000..022420b2 --- /dev/null +++ b/modules/restserver/src/main/scala/docspell/restserver/ws/WebSocketRoutes.scala @@ -0,0 +1,44 @@ +/* + * Copyright 2020 Eike K. & Contributors + * + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +package docspell.restserver.ws + +import cats.effect.Async +import fs2.concurrent.Topic +import fs2.{Pipe, Stream} + +import docspell.backend.auth.AuthToken + +import org.http4s.HttpRoutes +import org.http4s.dsl.Http4sDsl +import org.http4s.server.websocket.WebSocketBuilder2 +import org.http4s.websocket.WebSocketFrame +import org.http4s.websocket.WebSocketFrame.Text + +object WebSocketRoutes { + + def apply[F[_]: Async]( + user: AuthToken, + topic: Topic[F, OutputEvent], + wsb: WebSocketBuilder2[F] + ): HttpRoutes[F] = { + val dsl = new Http4sDsl[F] {} + import dsl._ + + HttpRoutes.of { case GET -> Root => + val toClient: Stream[F, WebSocketFrame.Text] = + topic + .subscribe(500) + .filter(_.forCollective(user)) + .map(msg => Text(msg.encode)) + + val toServer: Pipe[F, WebSocketFrame, Unit] = + _.map(_ => ()) + + wsb.build(toClient, toServer) + } + } +} diff --git a/modules/webapp/src/main/elm/App/Data.elm b/modules/webapp/src/main/elm/App/Data.elm index c64c192a..73a813f4 100644 --- a/modules/webapp/src/main/elm/App/Data.elm +++ b/modules/webapp/src/main/elm/App/Data.elm @@ -190,6 +190,7 @@ type Msg | SetLanguage UiLanguage | ClientSettingsSaveResp UiSettings (Result Http.Error BasicResult) | ReceiveBrowserSettings StoredUiSettings + | ReceiveWsMessage String defaultPage : Flags -> Page diff --git a/modules/webapp/src/main/elm/App/Update.elm b/modules/webapp/src/main/elm/App/Update.elm index 47d7ad9b..f3b7e22f 100644 --- a/modules/webapp/src/main/elm/App/Update.elm +++ b/modules/webapp/src/main/elm/App/Update.elm @@ -308,6 +308,13 @@ updateWithSub msg model = in updateUserSettings texts lm model + ReceiveWsMessage data -> + let + _ = + Debug.log "WS-msg" data + in + ( model, Cmd.none, Sub.none ) + applyClientSettings : Messages -> Model -> UiSettings -> ( Model, Cmd Msg, Sub Msg ) applyClientSettings texts model settings = diff --git a/modules/webapp/src/main/elm/Main.elm b/modules/webapp/src/main/elm/Main.elm index a5b7eda3..be80a076 100644 --- a/modules/webapp/src/main/elm/Main.elm +++ b/modules/webapp/src/main/elm/Main.elm @@ -93,4 +93,5 @@ subscriptions model = Sub.batch [ model.subs , Ports.receiveUiSettings ReceiveBrowserSettings + , Ports.receiveWsMessage ReceiveWsMessage ] diff --git a/modules/webapp/src/main/elm/Ports.elm b/modules/webapp/src/main/elm/Ports.elm index 233e6c63..c366bccc 100644 --- a/modules/webapp/src/main/elm/Ports.elm +++ b/modules/webapp/src/main/elm/Ports.elm @@ -11,6 +11,7 @@ port module Ports exposing , printElement , receiveCheckQueryResult , receiveUiSettings + , receiveWsMessage , removeAccount , requestUiSettings , setAccount @@ -55,6 +56,15 @@ and calls the print dialog. port printElement : String -> Cmd msg +{-| Receives messages from the websocket. +-} +port receiveWsMessage : (String -> msg) -> Sub msg + + + +--- Higher level functions based on ports + + setUiTheme : UiTheme -> Cmd msg setUiTheme theme = internalSetUiTheme (Data.UiTheme.toString theme) diff --git a/modules/webapp/src/main/webjar/docspell.js b/modules/webapp/src/main/webjar/docspell.js index ef4ef03c..e70a00bd 100644 --- a/modules/webapp/src/main/webjar/docspell.js +++ b/modules/webapp/src/main/webjar/docspell.js @@ -135,3 +135,13 @@ elmApp.ports.printElement.subscribe(function(id) { } } }); + +var socket = new WebSocket('ws://localhost:7880/api/v1/sec/ws'); +socket.addEventListener("message", function(event) { + if (event.data != "keep-alive" && event.data) { + elmApp.ports.receiveWsMessage.send(event.data); + } +}); +// elmApp.ports.sendWsMessage.subscribe(function(msg) { +// socket.send(msg); +// }); From 42767e7732c2edbe8b160762ea12ad8f071f8116 Mon Sep 17 00:00:00 2001 From: eikek Date: Sat, 6 Nov 2021 23:35:43 +0100 Subject: [PATCH 5/7] Refresh home page on item-process event --- modules/webapp/src/main/elm/App/Update.elm | 17 +++++++++++--- .../webapp/src/main/elm/Data/ServerEvent.elm | 22 +++++++++++++++++++ .../webapp/src/main/elm/Page/Home/Data.elm | 1 + .../webapp/src/main/elm/Page/Home/Update.elm | 16 ++++++++++++++ 4 files changed, 53 insertions(+), 3 deletions(-) create mode 100644 modules/webapp/src/main/elm/Data/ServerEvent.elm diff --git a/modules/webapp/src/main/elm/App/Update.elm b/modules/webapp/src/main/elm/App/Update.elm index f3b7e22f..c5c852af 100644 --- a/modules/webapp/src/main/elm/App/Update.elm +++ b/modules/webapp/src/main/elm/App/Update.elm @@ -15,6 +15,7 @@ import App.Data exposing (..) import Browser exposing (UrlRequest(..)) import Browser.Navigation as Nav import Data.Flags +import Data.ServerEvent exposing (ServerEvent(..)) import Data.UiSettings exposing (UiSettings) import Data.UiTheme import Messages exposing (Messages) @@ -310,10 +311,20 @@ updateWithSub msg model = ReceiveWsMessage data -> let - _ = - Debug.log "WS-msg" data + se = + Data.ServerEvent.fromString data in - ( model, Cmd.none, Sub.none ) + case se of + Just ItemProcessed -> + case model.page of + HomePage -> + updateHome texts Page.Home.Data.RefreshView model + + _ -> + ( model, Cmd.none, Sub.none ) + + Nothing -> + ( model, Cmd.none, Sub.none ) applyClientSettings : Messages -> Model -> UiSettings -> ( Model, Cmd Msg, Sub Msg ) diff --git a/modules/webapp/src/main/elm/Data/ServerEvent.elm b/modules/webapp/src/main/elm/Data/ServerEvent.elm new file mode 100644 index 00000000..c4d84b72 --- /dev/null +++ b/modules/webapp/src/main/elm/Data/ServerEvent.elm @@ -0,0 +1,22 @@ +{- + Copyright 2020 Eike K. & Contributors + + SPDX-License-Identifier: AGPL-3.0-or-later +-} + + +module Data.ServerEvent exposing (ServerEvent(..), fromString) + + +type ServerEvent + = ItemProcessed + + +fromString : String -> Maybe ServerEvent +fromString str = + case String.toLower str of + "item-processed" -> + Just ItemProcessed + + _ -> + Nothing diff --git a/modules/webapp/src/main/elm/Page/Home/Data.elm b/modules/webapp/src/main/elm/Page/Home/Data.elm index 0a2deed7..7c1bef6a 100644 --- a/modules/webapp/src/main/elm/Page/Home/Data.elm +++ b/modules/webapp/src/main/elm/Page/Home/Data.elm @@ -229,6 +229,7 @@ type Msg | PublishItemsMsg Comp.PublishItems.Msg | TogglePublishCurrentQueryView | PublishViewMsg Comp.PublishItems.Msg + | RefreshView type SearchType diff --git a/modules/webapp/src/main/elm/Page/Home/Update.elm b/modules/webapp/src/main/elm/Page/Home/Update.elm index 27ddf0fd..998b2017 100644 --- a/modules/webapp/src/main/elm/Page/Home/Update.elm +++ b/modules/webapp/src/main/elm/Page/Home/Update.elm @@ -227,6 +227,22 @@ update mId key flags texts settings msg model = else doSearch param nm + RefreshView -> + let + param = + { flags = flags + , searchType = model.lastSearchType + , pageSize = settings.itemSearchPageSize + , offset = model.searchOffset + , scroll = False + } + in + if model.searchInProgress then + withSub ( model, Cmd.none ) + + else + doSearch param model + ToggleSearchMenu -> let nextView = From 8ae314bead56c963052f5c722926397f72537f87 Mon Sep 17 00:00:00 2001 From: eikek Date: Sun, 7 Nov 2021 00:40:43 +0100 Subject: [PATCH 6/7] Fix websocket initialization --- modules/webapp/src/main/webjar/docspell.js | 38 +++++++++++++++++----- 1 file changed, 29 insertions(+), 9 deletions(-) diff --git a/modules/webapp/src/main/webjar/docspell.js b/modules/webapp/src/main/webjar/docspell.js index e70a00bd..08aa6aed 100644 --- a/modules/webapp/src/main/webjar/docspell.js +++ b/modules/webapp/src/main/webjar/docspell.js @@ -12,9 +12,9 @@ function extend() { var result = {}; for (var i = 0; i < arguments.length; i++) { forEachIn(arguments[i], - function(obj, key) { - result[key] = obj; - }); + function(obj, key) { + result[key] = obj; + }); } return result; } @@ -41,13 +41,18 @@ elmApp.ports.internalSetUiTheme.subscribe(function(themeName) { }); elmApp.ports.setAccount.subscribe(function(authResult) { - console.log("Add account from local storage"); + console.log("Add account to local storage"); localStorage.setItem("account", JSON.stringify(authResult)); + + if (!dsWebSocket) { + initWS(); + } }); elmApp.ports.removeAccount.subscribe(function() { console.log("Remove account from local storage"); localStorage.removeItem("account"); + closeWS(); }); elmApp.ports.requestUiSettings.subscribe(function(args) { @@ -136,12 +141,27 @@ elmApp.ports.printElement.subscribe(function(id) { } }); -var socket = new WebSocket('ws://localhost:7880/api/v1/sec/ws'); -socket.addEventListener("message", function(event) { - if (event.data != "keep-alive" && event.data) { - elmApp.ports.receiveWsMessage.send(event.data); + +var dsWebSocket = null; +function closeWS() { + if (dsWebSocket) { + console.log("Closing websocket connection"); + dsWebSocket.close(1000, "Done"); + dsWebSocket = null; } -}); +} +function initWS() { + closeWS(); + var protocol = (window.location.protocol === 'https:') ? 'wss:' : 'ws:'; + var url = protocol + '//' + window.location.host + '/api/v1/sec/ws'; + console.log("Initialize websocket at " + url); + dsWebSocket = new WebSocket(url); + dsWebSocket.addEventListener("message", function(event) { + if (event.data != "keep-alive" && event.data) { + elmApp.ports.receiveWsMessage.send(event.data); + } + }); +} // elmApp.ports.sendWsMessage.subscribe(function(msg) { // socket.send(msg); // }); From 62d02d75b44a027e2765f85144f9913fe7278fcd Mon Sep 17 00:00:00 2001 From: eikek Date: Sun, 7 Nov 2021 17:14:02 +0100 Subject: [PATCH 7/7] Show a message when a new document arrived --- modules/webapp/src/main/elm/App/Data.elm | 3 +++ modules/webapp/src/main/elm/App/Update.elm | 14 ++++++++++++-- modules/webapp/src/main/elm/App/View2.elm | 14 +++++++++++++- modules/webapp/src/main/elm/Messages/App.elm | 3 +++ modules/webapp/src/main/elm/Styles.elm | 7 ++++++- 5 files changed, 37 insertions(+), 4 deletions(-) diff --git a/modules/webapp/src/main/elm/App/Data.elm b/modules/webapp/src/main/elm/App/Data.elm index 73a813f4..f88a36cc 100644 --- a/modules/webapp/src/main/elm/App/Data.elm +++ b/modules/webapp/src/main/elm/App/Data.elm @@ -64,6 +64,7 @@ type alias Model = , anonymousTheme : UiTheme , anonymousUiLang : UiLanguage , langMenuOpen : Bool + , showNewItemsArrived : Bool } @@ -126,6 +127,7 @@ init key url flags_ settings = , anonymousTheme = Data.UiTheme.Light , anonymousUiLang = Messages.UiLanguage.English , langMenuOpen = False + , showNewItemsArrived = False } , Cmd.batch [ Cmd.map UserSettingsMsg uc @@ -191,6 +193,7 @@ type Msg | ClientSettingsSaveResp UiSettings (Result Http.Error BasicResult) | ReceiveBrowserSettings StoredUiSettings | ReceiveWsMessage String + | ToggleShowNewItemsArrived defaultPage : Flags -> Page diff --git a/modules/webapp/src/main/elm/App/Update.elm b/modules/webapp/src/main/elm/App/Update.elm index c5c852af..60bfcd47 100644 --- a/modules/webapp/src/main/elm/App/Update.elm +++ b/modules/webapp/src/main/elm/App/Update.elm @@ -316,16 +316,26 @@ updateWithSub msg model = in case se of Just ItemProcessed -> + let + newModel = + { model | showNewItemsArrived = True } + in case model.page of HomePage -> - updateHome texts Page.Home.Data.RefreshView model + updateHome texts Page.Home.Data.RefreshView newModel _ -> - ( model, Cmd.none, Sub.none ) + ( newModel, Cmd.none, Sub.none ) Nothing -> ( model, Cmd.none, Sub.none ) + ToggleShowNewItemsArrived -> + ( { model | showNewItemsArrived = not model.showNewItemsArrived } + , Cmd.none + , Sub.none + ) + applyClientSettings : Messages -> Model -> UiSettings -> ( Model, Cmd Msg, Sub Msg ) applyClientSettings texts model settings = diff --git a/modules/webapp/src/main/elm/App/View2.elm b/modules/webapp/src/main/elm/App/View2.elm index 75159ed1..4e3d657c 100644 --- a/modules/webapp/src/main/elm/App/View2.elm +++ b/modules/webapp/src/main/elm/App/View2.elm @@ -71,7 +71,19 @@ topNavUser auth model = , activeStyle = "hover:bg-blue-200 dark:hover:bg-bluegray-800 w-12" } , headerNavItem True model - , div [ class "flex flex-grow justify-end" ] + , div [ class "flex flex-grow justify-center" ] + [ a + [ class S.infoMessageBase + , class "my-2 px-1 py-1 rounded-lg inline-block hover:opacity-50" + , classList [ ( "hidden", not model.showNewItemsArrived ) ] + , href "#" + , onClick ToggleShowNewItemsArrived + ] + [ i [ class "fa fa-exclamation-circle mr-1" ] [] + , text texts.app.newItemsArrived + ] + ] + , div [ class "flex justify-end" ] [ userMenu texts.app auth model , dataMenu texts.app auth model ] diff --git a/modules/webapp/src/main/elm/Messages/App.elm b/modules/webapp/src/main/elm/Messages/App.elm index 0288e4a7..06d32a73 100644 --- a/modules/webapp/src/main/elm/Messages/App.elm +++ b/modules/webapp/src/main/elm/Messages/App.elm @@ -23,6 +23,7 @@ type alias Texts = , processingQueue : String , newInvites : String , help : String + , newItemsArrived : String } @@ -38,6 +39,7 @@ gb = , processingQueue = "Processing Queue" , newInvites = "New Invites" , help = "Help" + , newItemsArrived = "New items arrived!" } @@ -53,4 +55,5 @@ de = , processingQueue = "Verarbeitung" , newInvites = "Neue Einladung" , help = "Hilfe (English)" + , newItemsArrived = "Neue Dokumente eingetroffen!" } diff --git a/modules/webapp/src/main/elm/Styles.elm b/modules/webapp/src/main/elm/Styles.elm index 645d2324..e7cbdb6e 100644 --- a/modules/webapp/src/main/elm/Styles.elm +++ b/modules/webapp/src/main/elm/Styles.elm @@ -63,9 +63,14 @@ warnMessageColors = " border-yellow-800 bg-yellow-50 text-yellow-800 dark:border-amber-200 dark:bg-amber-800 dark:text-amber-200 " +infoMessageBase : String +infoMessageBase = + " border border-blue-800 bg-blue-100 text-blue-800 dark:border-lightblue-200 dark:bg-lightblue-800 dark:text-lightblue-200 dark:bg-opacity-25 " + + infoMessage : String infoMessage = - " border border-blue-800 bg-blue-100 text-blue-800 dark:border-lightblue-200 dark:bg-lightblue-800 dark:text-lightblue-200 dark:bg-opacity-25 px-2 py-2 rounded " + infoMessageBase ++ " px-2 py-2 rounded " message : String