From d483d9f176e63f4e8a9ed60c8c72bc00902cb4e8 Mon Sep 17 00:00:00 2001 From: eikek Date: Tue, 2 Nov 2021 00:41:16 +0100 Subject: [PATCH] Initial naive pubsub impl generalising from current setup --- build.sbt | 34 ++- .../scala/docspell/pubsub/api/Message.scala | 28 +++ .../docspell/pubsub/api/MessageHead.scala | 22 ++ .../scala/docspell/pubsub/api/PubSub.scala | 20 ++ .../scala/docspell/pubsub/api/PubSubT.scala | 57 +++++ .../scala/docspell/pubsub/api/Topic.scala | 26 +++ .../scala/docspell/pubsub/api/Topics.scala | 47 ++++ .../docspell/pubsub/api/TypedTopic.scala | 29 +++ .../docspell/pubsub/naive/NaivePubSub.scala | 212 ++++++++++++++++++ .../docspell/pubsub/naive/PubSubConfig.scala | 11 + .../docspell/pubsub/naive/Fixtures.scala | 85 +++++++ .../docspell/pubsub/naive/HttpClientOps.scala | 55 +++++ .../pubsub/naive/NaivePubSubTest.scala | 128 +++++++++++ .../db/migration/h2/V1.28.0__pubsub.sql | 8 + .../db/migration/mariadb/V1.28.0__pubsub.sql | 8 + .../migration/postgresql/V1.28.0__pubsub.sql | 8 + .../docspell/store/records/RPubSub.scala | 86 +++++++ .../store/src/test/resources/logback-test.xml | 2 +- 18 files changed, 864 insertions(+), 2 deletions(-) create mode 100644 modules/pubsub/api/src/main/scala/docspell/pubsub/api/Message.scala create mode 100644 modules/pubsub/api/src/main/scala/docspell/pubsub/api/MessageHead.scala create mode 100644 modules/pubsub/api/src/main/scala/docspell/pubsub/api/PubSub.scala create mode 100644 modules/pubsub/api/src/main/scala/docspell/pubsub/api/PubSubT.scala create mode 100644 modules/pubsub/api/src/main/scala/docspell/pubsub/api/Topic.scala create mode 100644 modules/pubsub/api/src/main/scala/docspell/pubsub/api/Topics.scala create mode 100644 modules/pubsub/api/src/main/scala/docspell/pubsub/api/TypedTopic.scala create mode 100644 modules/pubsub/naive/src/main/scala/docspell/pubsub/naive/NaivePubSub.scala create mode 100644 modules/pubsub/naive/src/main/scala/docspell/pubsub/naive/PubSubConfig.scala create mode 100644 modules/pubsub/naive/src/test/scala/docspell/pubsub/naive/Fixtures.scala create mode 100644 modules/pubsub/naive/src/test/scala/docspell/pubsub/naive/HttpClientOps.scala create mode 100644 modules/pubsub/naive/src/test/scala/docspell/pubsub/naive/NaivePubSubTest.scala create mode 100644 modules/store/src/main/resources/db/migration/h2/V1.28.0__pubsub.sql create mode 100644 modules/store/src/main/resources/db/migration/mariadb/V1.28.0__pubsub.sql create mode 100644 modules/store/src/main/resources/db/migration/postgresql/V1.28.0__pubsub.sql create mode 100644 modules/store/src/main/scala/docspell/store/records/RPubSub.scala diff --git a/build.sbt b/build.sbt index 82ebe6c2..b47c36ce 100644 --- a/build.sbt +++ b/build.sbt @@ -410,6 +410,36 @@ val store = project ) .dependsOn(common, query.jvm, totp, files) +val pubsubApi = project + .in(file("modules/pubsub/api")) + .disablePlugins(RevolverPlugin) + .settings(sharedSettings) + .settings(testSettingsMUnit) + .settings( + name := "docspell-pubsub-api", + addCompilerPlugin(Dependencies.kindProjectorPlugin), + libraryDependencies ++= + Dependencies.fs2 + ) + .dependsOn(common) + +val pubsubNaive = project + .in(file("modules/pubsub/naive")) + .disablePlugins(RevolverPlugin) + .settings(sharedSettings) + .settings(testSettingsMUnit) + .settings( + name := "docspell-pubsub-naive", + addCompilerPlugin(Dependencies.kindProjectorPlugin), + libraryDependencies ++= + Dependencies.fs2 ++ + Dependencies.http4sCirce ++ + Dependencies.http4sDsl ++ + Dependencies.http4sClient ++ + Dependencies.circe + ) + .dependsOn(common, pubsubApi, store % "compile->compile;test->test") + val extract = project .in(file("modules/extract")) .disablePlugins(RevolverPlugin) @@ -781,7 +811,9 @@ val root = project query.jvm, query.js, totp, - oidc + oidc, + pubsubApi, + pubsubNaive ) // --- Helpers diff --git a/modules/pubsub/api/src/main/scala/docspell/pubsub/api/Message.scala b/modules/pubsub/api/src/main/scala/docspell/pubsub/api/Message.scala new file mode 100644 index 00000000..c6d27c02 --- /dev/null +++ b/modules/pubsub/api/src/main/scala/docspell/pubsub/api/Message.scala @@ -0,0 +1,28 @@ +/* + * Copyright 2020 Eike K. & Contributors + * + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +package docspell.pubsub.api + +import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder} +import io.circe.{Decoder, Encoder, Json} + +final case class Message[A](head: MessageHead, body: A) {} + +object Message { + implicit val jsonDecoderJson: Decoder[Message[Json]] = + deriveDecoder[Message[Json]] + + implicit val jsonEncoderJson: Encoder[Message[Json]] = + deriveEncoder[Message[Json]] + + implicit def jsonDecoder[A](implicit da: Decoder[A]): Decoder[Message[A]] = + jsonDecoderJson.emap(mj => + da.decodeJson(mj.body).map(b => mj.copy(body = b)).left.map(_.message) + ) + + implicit def jsonEncoder[A](implicit ea: Encoder[A]): Encoder[Message[A]] = + jsonEncoderJson.contramap(m => m.copy(body = ea(m.body))) +} diff --git a/modules/pubsub/api/src/main/scala/docspell/pubsub/api/MessageHead.scala b/modules/pubsub/api/src/main/scala/docspell/pubsub/api/MessageHead.scala new file mode 100644 index 00000000..c604b52d --- /dev/null +++ b/modules/pubsub/api/src/main/scala/docspell/pubsub/api/MessageHead.scala @@ -0,0 +1,22 @@ +/* + * Copyright 2020 Eike K. & Contributors + * + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +package docspell.pubsub.api + +import docspell.common.{Ident, Timestamp} + +import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder} +import io.circe.{Decoder, Encoder} + +final case class MessageHead(id: Ident, send: Timestamp, topic: Topic) + +object MessageHead { + implicit val jsonDecoder: Decoder[MessageHead] = + deriveDecoder[MessageHead] + + implicit val jsonEncoder: Encoder[MessageHead] = + deriveEncoder[MessageHead] +} diff --git a/modules/pubsub/api/src/main/scala/docspell/pubsub/api/PubSub.scala b/modules/pubsub/api/src/main/scala/docspell/pubsub/api/PubSub.scala new file mode 100644 index 00000000..4ae9ef6a --- /dev/null +++ b/modules/pubsub/api/src/main/scala/docspell/pubsub/api/PubSub.scala @@ -0,0 +1,20 @@ +/* + * Copyright 2020 Eike K. & Contributors + * + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +package docspell.pubsub.api + +import cats.data.NonEmptyList +import fs2.{Pipe, Stream} + +import io.circe.Json + +trait PubSub[F[_]] { + def publish1(topic: Topic, msg: Json): F[MessageHead] + + def publish(topic: Topic): Pipe[F, Json, MessageHead] + + def subscribe(topics: NonEmptyList[Topic]): Stream[F, Message[Json]] +} diff --git a/modules/pubsub/api/src/main/scala/docspell/pubsub/api/PubSubT.scala b/modules/pubsub/api/src/main/scala/docspell/pubsub/api/PubSubT.scala new file mode 100644 index 00000000..176e8885 --- /dev/null +++ b/modules/pubsub/api/src/main/scala/docspell/pubsub/api/PubSubT.scala @@ -0,0 +1,57 @@ +/* + * Copyright 2020 Eike K. & Contributors + * + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +package docspell.pubsub.api + +import cats.data.NonEmptyList +import fs2.{Pipe, Stream} + +import docspell.common.Logger + +trait PubSubT[F[_], P <: PubSub[F]] { + + def publish1[A](topic: TypedTopic[A], msg: A): F[MessageHead] + + def publish[A](topic: TypedTopic[A]): Pipe[F, A, MessageHead] + + def subscribe[A](topic: TypedTopic[A]): Stream[F, Message[A]] + + def delegate: P + + def withDelegate[R <: PubSub[F]](delegate: R): PubSubT[F, R] +} + +object PubSubT { + + def apply[F[_], P <: PubSub[F]](pubSub: P, logger: Logger[F]): PubSubT[F, P] = + new PubSubT[F, P] { + def publish1[A](topic: TypedTopic[A], msg: A): F[MessageHead] = + pubSub.publish1(topic.topic, topic.codec(msg)) + + def publish[A](topic: TypedTopic[A]): Pipe[F, A, MessageHead] = + _.map(topic.codec.apply).through(pubSub.publish(topic.topic)) + + def subscribe[A](topic: TypedTopic[A]): Stream[F, Message[A]] = + pubSub + .subscribe(NonEmptyList.of(topic.topic)) + .flatMap(m => + m.body.as[A](topic.codec) match { + case Right(a) => Stream.emit(Message(m.head, a)) + case Left(err) => + logger.s + .error(err)( + s"Could not decode message to topic ${topic.name} to ${topic.msgClass}: ${m.body.noSpaces}" + ) + .drain + } + ) + + def delegate: P = pubSub + + def withDelegate[R <: PubSub[F]](newDelegate: R): PubSubT[F, R] = + PubSubT(newDelegate, logger) + } +} diff --git a/modules/pubsub/api/src/main/scala/docspell/pubsub/api/Topic.scala b/modules/pubsub/api/src/main/scala/docspell/pubsub/api/Topic.scala new file mode 100644 index 00000000..ab9d664b --- /dev/null +++ b/modules/pubsub/api/src/main/scala/docspell/pubsub/api/Topic.scala @@ -0,0 +1,26 @@ +/* + * Copyright 2020 Eike K. & Contributors + * + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +package docspell.pubsub.api + +import io.circe.{Decoder, Encoder} + +final case class Topic private (topic: String) { + def name: String = topic +} + +object Topic { + implicit val jsonDecoder: Decoder[Topic] = + Decoder.decodeString.map(Topic.apply) + + implicit val jsonEncoder: Encoder[Topic] = + Encoder.encodeString.contramap(_.topic) + + def apply(name: String): Topic = { + require(name.trim.nonEmpty) + new Topic(name) + } +} diff --git a/modules/pubsub/api/src/main/scala/docspell/pubsub/api/Topics.scala b/modules/pubsub/api/src/main/scala/docspell/pubsub/api/Topics.scala new file mode 100644 index 00000000..78b22408 --- /dev/null +++ b/modules/pubsub/api/src/main/scala/docspell/pubsub/api/Topics.scala @@ -0,0 +1,47 @@ +/* + * Copyright 2020 Eike K. & Contributors + * + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +package docspell.pubsub.api + +import cats.data.NonEmptyList + +import docspell.common.Ident + +import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder} +import io.circe.{Decoder, Encoder} + +/** All topics used in Docspell. */ +object Topics { + + /** Notify when a job has finished. */ + val jobDone: TypedTopic[JobDoneMsg] = TypedTopic[JobDoneMsg](Topic("job-done")) + + /** Notify when a job has been submitted. The job executor listens to these messages to + * wake up and do its work. + */ + val jobSubmitted: TypedTopic[JobSubmittedMsg] = + TypedTopic[JobSubmittedMsg](Topic("job-submitted")) + + val all: NonEmptyList[TypedTopic[_]] = NonEmptyList.of(jobDone, jobSubmitted) + + final case class JobSubmittedMsg(id: Ident) + object JobSubmittedMsg { + implicit val jsonDecoder: Decoder[JobSubmittedMsg] = + deriveDecoder[JobSubmittedMsg] + + implicit val jsonEncoder: Encoder[JobSubmittedMsg] = + deriveEncoder[JobSubmittedMsg] + } + + final case class JobDoneMsg(jobId: Ident, task: Ident) + object JobDoneMsg { + implicit val jsonDecoder: Decoder[JobDoneMsg] = + deriveDecoder[JobDoneMsg] + + implicit val jsonEncoder: Encoder[JobDoneMsg] = + deriveEncoder[JobDoneMsg] + } +} diff --git a/modules/pubsub/api/src/main/scala/docspell/pubsub/api/TypedTopic.scala b/modules/pubsub/api/src/main/scala/docspell/pubsub/api/TypedTopic.scala new file mode 100644 index 00000000..542144ee --- /dev/null +++ b/modules/pubsub/api/src/main/scala/docspell/pubsub/api/TypedTopic.scala @@ -0,0 +1,29 @@ +/* + * Copyright 2020 Eike K. & Contributors + * + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +package docspell.pubsub.api + +import scala.reflect.{ClassTag, classTag} + +import io.circe.{Codec, Decoder, Encoder} + +final case class TypedTopic[A](topic: Topic, codec: Codec[A], msgClass: Class[_]) { + def name: String = topic.name + + def withTopic(topic: Topic): TypedTopic[A] = + copy(topic = topic) + + def withName(name: String): TypedTopic[A] = + withTopic(Topic(name)) +} + +object TypedTopic { + + def apply[A: ClassTag]( + topic: Topic + )(implicit dec: Decoder[A], enc: Encoder[A]): TypedTopic[A] = + TypedTopic(topic, Codec.from(dec, enc), classTag[A].runtimeClass) +} diff --git a/modules/pubsub/naive/src/main/scala/docspell/pubsub/naive/NaivePubSub.scala b/modules/pubsub/naive/src/main/scala/docspell/pubsub/naive/NaivePubSub.scala new file mode 100644 index 00000000..55bdc786 --- /dev/null +++ b/modules/pubsub/naive/src/main/scala/docspell/pubsub/naive/NaivePubSub.scala @@ -0,0 +1,212 @@ +/* + * Copyright 2020 Eike K. & Contributors + * + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +package docspell.pubsub.naive + +import cats.data.NonEmptyList +import cats.effect._ +import cats.implicits._ +import fs2.Pipe +import fs2.Stream +import fs2.concurrent.{Topic => Fs2Topic} + +import docspell.common._ +import docspell.pubsub.api._ +import docspell.pubsub.naive.NaivePubSub.State +import docspell.store.Store +import docspell.store.records.RPubSub + +import io.circe.Json +import org.http4s.circe.CirceEntityCodec._ +import org.http4s.client.Client +import org.http4s.client.dsl.Http4sClientDsl +import org.http4s.dsl.Http4sDsl +import org.http4s.{HttpRoutes, Uri} + +/** A pubsub implementation that can be used across machines, but uses a rather + * inefficient protocol. The reason is to make it work with the current setup, i.e. not + * requiring to add another complex piece of software to the mix, like Kafka or RabbitMQ. + * + * However, the api should allow to be used on top of such a tool. This implementation + * can be used in a personal setting, where there are only a few nodes. + * + * How it works: + * + * It is build on the `Topic` class from fs2.concurrent. A map of a topic name to such a + * `Topic` instance is maintained. To work across machines, the database is used as a + * synchronization point. Each subscriber must provide a http api and so its "callback" + * URL is added into the database to the list of remote subscribers. + * + * When publishing a message, the message can be published to the internal fs2 topic. + * Then all URLs to this topic name are looked up in the database and the message is + * POSTed to each URL as JSON. The endpoint of each machine takes this message and + * publishes it to its own internal fs2.concurrent.Topic instance. + * + * Obviously, there are drawbacks: it is slow, because the messages go through http and + * connections must be opened/closed etc and the database is hit as well. Then it doesn't + * scale to lots of machines and messages. The upside is, that it works with the current + * setup and it should be good enough for personal use, where there are only a small + * amount of machines and messages. + * + * The main use case for docspell is to communicate between the rest-server and job + * executor. It is for internal communication and all topics are known at compile time. + */ +final class NaivePubSub[F[_]: Async]( + cfg: PubSubConfig, + state: Ref[F, State[F]], + store: Store[F], + client: Client[F] +) extends PubSub[F] { + private val logger: Logger[F] = Logger.log4s(org.log4s.getLogger) + + def withClient(client: Client[F]): NaivePubSub[F] = + new NaivePubSub[F](cfg, state, store, client) + + def publish1(topic: Topic, msgBody: Json): F[MessageHead] = + for { + head <- mkMessageHead(topic) + msg = Message(head, msgBody) + _ <- logger.trace(s"Publishing: $msg") + // go through all local subscribers and publish to the fs2 topic + _ <- publishLocal(msg) + // get all remote subscribers from the database and send the message via http + _ <- publishRemote(msg) + } yield head + + def publish(topic: Topic): Pipe[F, Json, MessageHead] = + ms => //TODO Do some optimization by grouping messages to the same topic + ms.evalMap(publish1(topic, _)) + + def subscribe(topics: NonEmptyList[Topic]): Stream[F, Message[Json]] = + (for { + _ <- logger.s.info(s"Adding subscriber for topics: $topics") + _ <- Stream.resource[F, Unit](addRemote(topics)) + m <- Stream.eval(addLocal(topics)) + } yield m).flatten + + /** Receive messages from remote publishers and passes them to the local subscribers. */ + def receiveRoute: HttpRoutes[F] = { + val dsl = new Http4sDsl[F] {} + import dsl._ + + HttpRoutes.of { case req @ POST -> Root => + for { + data <- req.as[List[Message[Json]]] + _ <- logger.trace(s"Received external message(s): $data") + _ <- data.traverse(publishLocal) + resp <- Ok(()) + } yield resp + } + } + + // ---- private helpers + + private def mkMessageHead(topic: Topic): F[MessageHead] = + for { + id <- Ident.randomId[F] + ts <- Timestamp.current[F] + head = MessageHead(id, ts, topic) + } yield head + + private def addLocal(topics: NonEmptyList[Topic]): F[Stream[F, Message[Json]]] = { + val topicSet = topics.map(_.name).toList.toSet + for { + st <- state.get + tpc = st.topics.view.filterKeys(topicSet.contains) + _ <- + if (tpc.isEmpty) + logger.warn(s"Subscribing to 0 topics! Topics $topics were not initialized") + else ().pure[F] + data = tpc.values.toList.traverse(t => t.subscribe(cfg.subscriberQueueSize)) + out = data.flatMap(msgs => Stream.emits(msgs)) + } yield out + } + + private def addRemote(topics: NonEmptyList[Topic]): Resource[F, Unit] = { + def subscribe: F[Unit] = + logger.trace(s"Incrementing counter for topics: $topics") *> + store.transact(RPubSub.increment(cfg.url, topics.map(_.name))).as(()) + + def unsubscribe: F[Unit] = + logger.trace(s"Decrementing counter for topics: $topics") *> + store.transact(RPubSub.decrement(cfg.url, topics.map(_.name))).as(()) + + Resource.make(subscribe)(_ => unsubscribe) + } + + private def publishLocal(msg: Message[Json]): F[Unit] = + for { + st <- state.get + _ <- st.topics.get(msg.head.topic.name) match { + case Some(sub) => + logger.trace(s"Publishing message to local topic: $msg") *> + sub.publish1(msg).as(()) + case None => + ().pure[F] + } + } yield () + + private def publishRemote(msg: Message[Json]): F[Unit] = { + val dsl = new Http4sDsl[F] with Http4sClientDsl[F] {} + import dsl._ + + for { + _ <- logger.trace(s"Find all nodes subscribed to topic ${msg.head.topic.name}") + urls <- store.transact(RPubSub.findSubs(msg.head.topic.name)) + _ <- logger.trace(s"Publishing to remote urls ${urls.map(_.asString)}: $msg") + reqs = urls + .map(u => Uri.unsafeFromString(u.asString)) + .map(uri => POST(List(msg), uri)) + res <- reqs.traverse(req => client.status(req)).attempt + _ <- res match { + case Right(s) => + if (s.forall(_.isSuccess)) ().pure[F] + else if (s.size == urls.size) + logger.warn( + s"No nodes was be reached! Reason: $s, message: $msg" + ) + else + logger.warn( + s"Some nodes were not reached! Reason: $s, message: $msg" + ) + case Left(ex) => + logger.error(ex)(s"Error publishing ${msg.head.topic.name} message remotely") + } + } yield () + } +} + +object NaivePubSub { + + def apply[F[_]: Async]( + cfg: PubSubConfig, + store: Store[F], + client: Client[F] + )(topics: NonEmptyList[Topic]): F[NaivePubSub[F]] = + for { + state <- Ref.ofEffect[F, State[F]](State.create[F](topics)) + _ <- store.transact(RPubSub.initTopics(cfg.nodeId, cfg.url, topics.map(_.name))) + } yield new NaivePubSub[F](cfg, state, store, client) + + def create[F[_]: Async]( + cfg: PubSubConfig, + store: Store[F], + client: Client[F], + logger: Logger[F] + )(topics: NonEmptyList[Topic]): F[PubSubT[F, NaivePubSub[F]]] = + apply[F](cfg, store, client)(topics).map(ps => PubSubT(ps, logger)) + + final case class State[F[_]](topics: Map[String, Fs2Topic[F, Message[Json]]]) {} + + object State { + def empty[F[_]]: State[F] = State[F](Map.empty) + def create[F[_]: Async](topics: NonEmptyList[Topic]): F[State[F]] = + topics + .traverse(t => Fs2Topic[F, Message[Json]].map(fs2t => t.name -> fs2t)) + .map(_.toList.toMap) + .map(State.apply) + } +} diff --git a/modules/pubsub/naive/src/main/scala/docspell/pubsub/naive/PubSubConfig.scala b/modules/pubsub/naive/src/main/scala/docspell/pubsub/naive/PubSubConfig.scala new file mode 100644 index 00000000..b00131d6 --- /dev/null +++ b/modules/pubsub/naive/src/main/scala/docspell/pubsub/naive/PubSubConfig.scala @@ -0,0 +1,11 @@ +/* + * Copyright 2020 Eike K. & Contributors + * + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +package docspell.pubsub.naive + +import docspell.common.{Ident, LenientUri} + +case class PubSubConfig(nodeId: Ident, url: LenientUri, subscriberQueueSize: Int) diff --git a/modules/pubsub/naive/src/test/scala/docspell/pubsub/naive/Fixtures.scala b/modules/pubsub/naive/src/test/scala/docspell/pubsub/naive/Fixtures.scala new file mode 100644 index 00000000..cd2a386a --- /dev/null +++ b/modules/pubsub/naive/src/test/scala/docspell/pubsub/naive/Fixtures.scala @@ -0,0 +1,85 @@ +/* + * Copyright 2020 Eike K. & Contributors + * + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +package docspell.pubsub.naive + +import cats.effect._ + +import docspell.common._ +import docspell.pubsub.api._ +import docspell.store.{Store, StoreFixture} + +import munit.CatsEffectSuite +import org.http4s.Response +import org.http4s.client.Client + +trait Fixtures extends HttpClientOps { self: CatsEffectSuite => + + val pubsubEnv = ResourceFixture(Fixtures.envResource("node-1")) + + val pubsubT = ResourceFixture { + Fixtures + .envResource("node-1") + .flatMap(e => Resource.eval(e.pubSub)) + .map(ps => PubSubT(ps, Fixtures.loggerIO)) + } + + def conntectedPubsubs(env: Fixtures.Env) = + for { + // Create two pubsub instances connected to the same database + ps_1 <- env.withNodeId("node-1").pubSubT + ps_2 <- env.withNodeId("node-2").pubSubT + + // both instances have a dummy client. now connect their clients to each other + ps1 = ps_1.withDelegate(ps_1.delegate.withClient(httpClient(ps_2))) + ps2 = ps_2.withDelegate(ps_2.delegate.withClient(httpClient(ps_1))) + } yield (ps1, ps2) + + implicit final class StringId(s: String) { + def id: Ident = Ident.unsafe(s) + } +} + +object Fixtures { + private val loggerIO: Logger[IO] = Logger.log4s(org.log4s.getLogger) + + final case class Env(store: Store[IO], cfg: PubSubConfig) { + def pubSub: IO[NaivePubSub[IO]] = { + val dummyClient = Client[IO](_ => Resource.pure(Response.notFound[IO])) + NaivePubSub(cfg, store, dummyClient)(Topics.all.map(_.topic)) + } + def pubSubT: IO[PubSubT[IO, NaivePubSub[IO]]] = + pubSub.map(PubSubT(_, loggerIO)) + + def withNodeId(nodeId: String): Env = + copy(cfg = + cfg.copy( + nodeId = Ident.unsafe(nodeId), + url = LenientUri.unsafe(s"http://$nodeId/") + ) + ) + } + + def testConfig(nodeId: String) = + PubSubConfig( + Ident.unsafe(nodeId), + LenientUri.unsafe(s"http://${nodeId}/"), + 0 + ) + + def storeResource: Resource[IO, Store[IO]] = + for { + random <- Resource.eval(Ident.randomId[IO]) + cfg = StoreFixture.memoryDB(random.id.take(12)) + store <- StoreFixture.store(cfg) + _ <- Resource.eval(store.migrate) + } yield store + + def envResource(nodeId: String): Resource[IO, Env] = + for { + store <- storeResource + } yield Env(store, testConfig(nodeId)) +} diff --git a/modules/pubsub/naive/src/test/scala/docspell/pubsub/naive/HttpClientOps.scala b/modules/pubsub/naive/src/test/scala/docspell/pubsub/naive/HttpClientOps.scala new file mode 100644 index 00000000..cafdf0c2 --- /dev/null +++ b/modules/pubsub/naive/src/test/scala/docspell/pubsub/naive/HttpClientOps.scala @@ -0,0 +1,55 @@ +/* + * Copyright 2020 Eike K. & Contributors + * + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +package docspell.pubsub.naive + +import cats.effect._ + +import docspell.common._ +import docspell.pubsub.api._ + +import io.circe.Encoder +import org.http4s.circe.CirceEntityCodec._ +import org.http4s.client.Client +import org.http4s.client.dsl.io._ +import org.http4s.dsl.io._ +import org.http4s.{HttpApp, HttpRoutes, Uri} + +trait HttpClientOps { + def httpClient(routes: HttpRoutes[IO]): Client[IO] = + Client.fromHttpApp(HttpApp(routes.orNotFound.run)) + + def httpClient(ps: NaivePubSub[IO]): Client[IO] = + httpClient(ps.receiveRoute) + + def httpClient(ps: PubSubT[IO, NaivePubSub[IO]]): Client[IO] = + httpClient(ps.delegate) + + implicit final class ClientOps(client: Client[IO]) { + val uri = Uri.unsafeFromString("http://localhost/") + + def sendMessage[A: Encoder](topic: Topic, body: A): IO[Unit] = { + val encode: Encoder[List[Message[A]]] = implicitly[Encoder[List[Message[A]]]] + + for { + id <- Ident.randomId[IO] + time <- Timestamp.current[IO] + mesg = List(Message(MessageHead(id, time, topic), body)) + _ <- HttpClientOps.logger.debug(s"Sending message(s): $mesg") + _ <- client.expectOr[Unit](POST(encode(mesg), uri)) { resp => + IO(new Exception(s"Unexpected response: $resp")) + } + } yield () + } + + def send[A](typedTopic: TypedTopic[A], body: A): IO[Unit] = + sendMessage(typedTopic.topic, body)(typedTopic.codec) + } +} + +object HttpClientOps { + private val logger: Logger[IO] = Logger.log4s(org.log4s.getLogger) +} diff --git a/modules/pubsub/naive/src/test/scala/docspell/pubsub/naive/NaivePubSubTest.scala b/modules/pubsub/naive/src/test/scala/docspell/pubsub/naive/NaivePubSubTest.scala new file mode 100644 index 00000000..c244e1da --- /dev/null +++ b/modules/pubsub/naive/src/test/scala/docspell/pubsub/naive/NaivePubSubTest.scala @@ -0,0 +1,128 @@ +/* + * Copyright 2020 Eike K. & Contributors + * + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +package docspell.pubsub.naive + +import scala.concurrent.duration._ + +import cats.effect._ +import cats.implicits._ +import fs2.concurrent.SignallingRef + +import docspell.common._ +import docspell.pubsub.api.Topics.{JobDoneMsg, JobSubmittedMsg} +import docspell.pubsub.api._ + +import munit.CatsEffectSuite + +class NaivePubSubTest extends CatsEffectSuite with Fixtures { + private[this] val logger = Logger.log4s[IO](org.log4s.getLogger) + + def subscribe[A](ps: PubSubT[IO, NaivePubSub[IO]], topic: TypedTopic[A]) = + for { + received <- Ref.of[IO, Option[Message[A]]](None) + halt <- SignallingRef.of[IO, Boolean](false) + fiber <- Async[IO].start( + logger.debug(s"${Thread.currentThread()} Listening for messages...") *> + ps.subscribe(topic) + .evalMap(m => + logger.debug(s"Handling message: $m") *> + received.set(Some(m)) *> + halt.set(true) + ) + .interruptWhen(halt) + .compile + .drain + ) + _ <- IO.sleep(500.millis) + } yield (received, halt, fiber) + + pubsubT.test("local publish receives message") { ps => + for { + res <- subscribe(ps, Topics.jobSubmitted) + (received, _, subFiber) = res + headSend <- ps.publish1(Topics.jobSubmitted, JobSubmittedMsg("hello".id)) + outcome <- subFiber.join + msgRec <- received.get + _ = assert(outcome.isSuccess) + _ = assertEquals(msgRec.map(_.head), Option(headSend)) + } yield () + } + + pubsubT.test("local publish to different topic doesn't receive") { ps => + val otherTopic = Topics.jobSubmitted.withTopic(Topic("other-name")) + for { + res <- subscribe(ps, Topics.jobSubmitted) + (received, halt, subFiber) = res + _ <- ps.publish1(otherTopic, JobSubmittedMsg("hello".id)) + _ <- IO.sleep(100.millis) //allow some time for receiving + _ <- halt.set(true) + outcome <- subFiber.join + _ = assert(outcome.isSuccess) + recMsg <- received.get + _ = assert(recMsg.isEmpty) + } yield () + } + + pubsubT.test("receive messages remotely") { ps => + val msg = JobSubmittedMsg("hello-remote".id) + for { + res <- subscribe(ps, Topics.jobSubmitted) + (received, _, subFiber) = res + client = httpClient(ps.delegate.receiveRoute) + _ <- client.send(Topics.jobSubmitted, msg) + outcome <- subFiber.join + msgRec <- received.get + _ = assert(outcome.isSuccess) + _ = assertEquals(msgRec.map(_.head.topic), Topics.jobSubmitted.topic.some) + _ = assertEquals(msgRec.map(_.body), msg.some) + } yield () + } + + pubsubEnv.test("send messages remotely") { env => + val msg = JobSubmittedMsg("hello-remote".id) + for { + // Create two pubsub instances connected to the same database + pubsubs <- conntectedPubsubs(env) + (ps1, ps2) = pubsubs + + // subscribe to ps1 and send via ps2 + res <- subscribe(ps1, Topics.jobSubmitted) + (received, _, subFiber) = res + _ <- ps2.publish1(Topics.jobSubmitted, msg) + outcome <- subFiber.join + msgRec <- received.get + + // check results + _ = assert(outcome.isSuccess) + _ = assertEquals(msgRec.map(_.head.topic), Topics.jobSubmitted.topic.some) + _ = assertEquals(msgRec.map(_.body), msg.some) + } yield () + } + + pubsubEnv.test("do not receive remote message from other topic") { env => + val msg = JobDoneMsg("job-1".id, "task-2".id) + + for { + // Create two pubsub instances connected to the same database + pubsubs <- conntectedPubsubs(env) + (ps1, ps2) = pubsubs + + // subscribe to ps1 and send via ps2 + res <- subscribe(ps1, Topics.jobSubmitted) + (received, halt, subFiber) = res + _ <- ps2.publish1(Topics.jobDone, msg) + _ <- IO.sleep(100.millis) + _ <- halt.set(true) + outcome <- subFiber.join + msgRec <- received.get + + // check results + _ = assert(outcome.isSuccess) + _ = assertEquals(msgRec, None) + } yield () + } +} diff --git a/modules/store/src/main/resources/db/migration/h2/V1.28.0__pubsub.sql b/modules/store/src/main/resources/db/migration/h2/V1.28.0__pubsub.sql new file mode 100644 index 00000000..ae42b13c --- /dev/null +++ b/modules/store/src/main/resources/db/migration/h2/V1.28.0__pubsub.sql @@ -0,0 +1,8 @@ +CREATE TABLE "pubsub" ( + "id" varchar(254) not null primary key, + "node_id" varchar(254) not null, + "url" varchar(254) not null, + "topic" varchar(254) not null, + "counter" int not null, + unique("url", "topic") +) diff --git a/modules/store/src/main/resources/db/migration/mariadb/V1.28.0__pubsub.sql b/modules/store/src/main/resources/db/migration/mariadb/V1.28.0__pubsub.sql new file mode 100644 index 00000000..2bd09016 --- /dev/null +++ b/modules/store/src/main/resources/db/migration/mariadb/V1.28.0__pubsub.sql @@ -0,0 +1,8 @@ +CREATE TABLE `pubsub` ( + `id` varchar(254) not null primary key, + `node_id` varchar(254) not null, + `url` varchar(254) not null, + `topic` varchar(254) not null, + `counter` int not null, + unique(`url`, `topic`) +) diff --git a/modules/store/src/main/resources/db/migration/postgresql/V1.28.0__pubsub.sql b/modules/store/src/main/resources/db/migration/postgresql/V1.28.0__pubsub.sql new file mode 100644 index 00000000..ae42b13c --- /dev/null +++ b/modules/store/src/main/resources/db/migration/postgresql/V1.28.0__pubsub.sql @@ -0,0 +1,8 @@ +CREATE TABLE "pubsub" ( + "id" varchar(254) not null primary key, + "node_id" varchar(254) not null, + "url" varchar(254) not null, + "topic" varchar(254) not null, + "counter" int not null, + unique("url", "topic") +) diff --git a/modules/store/src/main/scala/docspell/store/records/RPubSub.scala b/modules/store/src/main/scala/docspell/store/records/RPubSub.scala new file mode 100644 index 00000000..f0e2ce6e --- /dev/null +++ b/modules/store/src/main/scala/docspell/store/records/RPubSub.scala @@ -0,0 +1,86 @@ +/* + * Copyright 2020 Eike K. & Contributors + * + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +package docspell.store.records + +import cats.data.NonEmptyList +import cats.implicits._ + +import docspell.common._ +import docspell.store.qb.DSL._ +import docspell.store.qb.{Column, DML, TableDef} + +import doobie._ +import doobie.implicits._ + +/** A table for supporting naive pubsub across nodes. */ +final case class RPubSub( + id: Ident, + nodeId: Ident, + url: LenientUri, + topic: String, + counter: Int +) + +object RPubSub { + final case class Table(alias: Option[String]) extends TableDef { + val tableName: String = "pubsub" + + val id = Column[Ident]("id", this) + val nodeId = Column[Ident]("node_id", this) + val url = Column[LenientUri]("url", this) + val topic = Column[String]("topic", this) + val counter = Column[Int]("counter", this) + + val all: NonEmptyList[Column[_]] = + NonEmptyList.of(id, nodeId, url, topic, counter) + } + def as(alias: String): Table = + Table(Some(alias)) + + val T: Table = Table(None) + + def insert(r: RPubSub): ConnectionIO[Int] = + DML.insert(T, T.all, sql"${r.id}, ${r.nodeId}, ${r.url}, ${r.topic}, ${r.counter}") + + /** Insert all topics with counter = 0 */ + def initTopics( + nodeId: Ident, + url: LenientUri, + topics: NonEmptyList[String] + ): ConnectionIO[Int] = + DML.delete(T, T.nodeId === nodeId) *> + topics.toList + .traverse(t => + Ident + .randomId[ConnectionIO] + .flatMap(id => insert(RPubSub(id, nodeId, url, t, 0))) + ) + .map(_.sum) + + def increment(url: LenientUri, topics: NonEmptyList[String]): ConnectionIO[Int] = + DML.update( + T, + T.url === url && T.topic.in(topics), + DML.set( + T.counter.increment(1) + ) + ) + + def decrement(url: LenientUri, topics: NonEmptyList[String]): ConnectionIO[Int] = + DML.update( + T, + T.url === url && T.topic.in(topics), + DML.set( + T.counter.decrement(1) + ) + ) + + def findSubs(topic: String): ConnectionIO[List[LenientUri]] = + run(select(T.url), from(T), T.topic === topic && T.counter > 0) + .query[LenientUri] + .to[List] +} diff --git a/modules/store/src/test/resources/logback-test.xml b/modules/store/src/test/resources/logback-test.xml index 9cf93b57..6d2ef8c0 100644 --- a/modules/store/src/test/resources/logback-test.xml +++ b/modules/store/src/test/resources/logback-test.xml @@ -3,7 +3,7 @@ true - %highlight(%-5level) %cyan(%logger{15}) - %msg %n + %highlight(%-5level) [%t{10}] %cyan(%logger{15}) - %msg %n