diff --git a/build.sbt b/build.sbt
index 82ebe6c2..b47c36ce 100644
--- a/build.sbt
+++ b/build.sbt
@@ -410,6 +410,36 @@ val store = project
)
.dependsOn(common, query.jvm, totp, files)
+val pubsubApi = project
+ .in(file("modules/pubsub/api"))
+ .disablePlugins(RevolverPlugin)
+ .settings(sharedSettings)
+ .settings(testSettingsMUnit)
+ .settings(
+ name := "docspell-pubsub-api",
+ addCompilerPlugin(Dependencies.kindProjectorPlugin),
+ libraryDependencies ++=
+ Dependencies.fs2
+ )
+ .dependsOn(common)
+
+val pubsubNaive = project
+ .in(file("modules/pubsub/naive"))
+ .disablePlugins(RevolverPlugin)
+ .settings(sharedSettings)
+ .settings(testSettingsMUnit)
+ .settings(
+ name := "docspell-pubsub-naive",
+ addCompilerPlugin(Dependencies.kindProjectorPlugin),
+ libraryDependencies ++=
+ Dependencies.fs2 ++
+ Dependencies.http4sCirce ++
+ Dependencies.http4sDsl ++
+ Dependencies.http4sClient ++
+ Dependencies.circe
+ )
+ .dependsOn(common, pubsubApi, store % "compile->compile;test->test")
+
val extract = project
.in(file("modules/extract"))
.disablePlugins(RevolverPlugin)
@@ -781,7 +811,9 @@ val root = project
query.jvm,
query.js,
totp,
- oidc
+ oidc,
+ pubsubApi,
+ pubsubNaive
)
// --- Helpers
diff --git a/modules/pubsub/api/src/main/scala/docspell/pubsub/api/Message.scala b/modules/pubsub/api/src/main/scala/docspell/pubsub/api/Message.scala
new file mode 100644
index 00000000..c6d27c02
--- /dev/null
+++ b/modules/pubsub/api/src/main/scala/docspell/pubsub/api/Message.scala
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2020 Eike K. & Contributors
+ *
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+
+package docspell.pubsub.api
+
+import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder}
+import io.circe.{Decoder, Encoder, Json}
+
+final case class Message[A](head: MessageHead, body: A) {}
+
+object Message {
+ implicit val jsonDecoderJson: Decoder[Message[Json]] =
+ deriveDecoder[Message[Json]]
+
+ implicit val jsonEncoderJson: Encoder[Message[Json]] =
+ deriveEncoder[Message[Json]]
+
+ implicit def jsonDecoder[A](implicit da: Decoder[A]): Decoder[Message[A]] =
+ jsonDecoderJson.emap(mj =>
+ da.decodeJson(mj.body).map(b => mj.copy(body = b)).left.map(_.message)
+ )
+
+ implicit def jsonEncoder[A](implicit ea: Encoder[A]): Encoder[Message[A]] =
+ jsonEncoderJson.contramap(m => m.copy(body = ea(m.body)))
+}
diff --git a/modules/pubsub/api/src/main/scala/docspell/pubsub/api/MessageHead.scala b/modules/pubsub/api/src/main/scala/docspell/pubsub/api/MessageHead.scala
new file mode 100644
index 00000000..c604b52d
--- /dev/null
+++ b/modules/pubsub/api/src/main/scala/docspell/pubsub/api/MessageHead.scala
@@ -0,0 +1,22 @@
+/*
+ * Copyright 2020 Eike K. & Contributors
+ *
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+
+package docspell.pubsub.api
+
+import docspell.common.{Ident, Timestamp}
+
+import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder}
+import io.circe.{Decoder, Encoder}
+
+final case class MessageHead(id: Ident, send: Timestamp, topic: Topic)
+
+object MessageHead {
+ implicit val jsonDecoder: Decoder[MessageHead] =
+ deriveDecoder[MessageHead]
+
+ implicit val jsonEncoder: Encoder[MessageHead] =
+ deriveEncoder[MessageHead]
+}
diff --git a/modules/pubsub/api/src/main/scala/docspell/pubsub/api/PubSub.scala b/modules/pubsub/api/src/main/scala/docspell/pubsub/api/PubSub.scala
new file mode 100644
index 00000000..4ae9ef6a
--- /dev/null
+++ b/modules/pubsub/api/src/main/scala/docspell/pubsub/api/PubSub.scala
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2020 Eike K. & Contributors
+ *
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+
+package docspell.pubsub.api
+
+import cats.data.NonEmptyList
+import fs2.{Pipe, Stream}
+
+import io.circe.Json
+
+trait PubSub[F[_]] {
+ def publish1(topic: Topic, msg: Json): F[MessageHead]
+
+ def publish(topic: Topic): Pipe[F, Json, MessageHead]
+
+ def subscribe(topics: NonEmptyList[Topic]): Stream[F, Message[Json]]
+}
diff --git a/modules/pubsub/api/src/main/scala/docspell/pubsub/api/PubSubT.scala b/modules/pubsub/api/src/main/scala/docspell/pubsub/api/PubSubT.scala
new file mode 100644
index 00000000..176e8885
--- /dev/null
+++ b/modules/pubsub/api/src/main/scala/docspell/pubsub/api/PubSubT.scala
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2020 Eike K. & Contributors
+ *
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+
+package docspell.pubsub.api
+
+import cats.data.NonEmptyList
+import fs2.{Pipe, Stream}
+
+import docspell.common.Logger
+
+trait PubSubT[F[_], P <: PubSub[F]] {
+
+ def publish1[A](topic: TypedTopic[A], msg: A): F[MessageHead]
+
+ def publish[A](topic: TypedTopic[A]): Pipe[F, A, MessageHead]
+
+ def subscribe[A](topic: TypedTopic[A]): Stream[F, Message[A]]
+
+ def delegate: P
+
+ def withDelegate[R <: PubSub[F]](delegate: R): PubSubT[F, R]
+}
+
+object PubSubT {
+
+ def apply[F[_], P <: PubSub[F]](pubSub: P, logger: Logger[F]): PubSubT[F, P] =
+ new PubSubT[F, P] {
+ def publish1[A](topic: TypedTopic[A], msg: A): F[MessageHead] =
+ pubSub.publish1(topic.topic, topic.codec(msg))
+
+ def publish[A](topic: TypedTopic[A]): Pipe[F, A, MessageHead] =
+ _.map(topic.codec.apply).through(pubSub.publish(topic.topic))
+
+ def subscribe[A](topic: TypedTopic[A]): Stream[F, Message[A]] =
+ pubSub
+ .subscribe(NonEmptyList.of(topic.topic))
+ .flatMap(m =>
+ m.body.as[A](topic.codec) match {
+ case Right(a) => Stream.emit(Message(m.head, a))
+ case Left(err) =>
+ logger.s
+ .error(err)(
+ s"Could not decode message to topic ${topic.name} to ${topic.msgClass}: ${m.body.noSpaces}"
+ )
+ .drain
+ }
+ )
+
+ def delegate: P = pubSub
+
+ def withDelegate[R <: PubSub[F]](newDelegate: R): PubSubT[F, R] =
+ PubSubT(newDelegate, logger)
+ }
+}
diff --git a/modules/pubsub/api/src/main/scala/docspell/pubsub/api/Topic.scala b/modules/pubsub/api/src/main/scala/docspell/pubsub/api/Topic.scala
new file mode 100644
index 00000000..ab9d664b
--- /dev/null
+++ b/modules/pubsub/api/src/main/scala/docspell/pubsub/api/Topic.scala
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2020 Eike K. & Contributors
+ *
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+
+package docspell.pubsub.api
+
+import io.circe.{Decoder, Encoder}
+
+final case class Topic private (topic: String) {
+ def name: String = topic
+}
+
+object Topic {
+ implicit val jsonDecoder: Decoder[Topic] =
+ Decoder.decodeString.map(Topic.apply)
+
+ implicit val jsonEncoder: Encoder[Topic] =
+ Encoder.encodeString.contramap(_.topic)
+
+ def apply(name: String): Topic = {
+ require(name.trim.nonEmpty)
+ new Topic(name)
+ }
+}
diff --git a/modules/pubsub/api/src/main/scala/docspell/pubsub/api/Topics.scala b/modules/pubsub/api/src/main/scala/docspell/pubsub/api/Topics.scala
new file mode 100644
index 00000000..78b22408
--- /dev/null
+++ b/modules/pubsub/api/src/main/scala/docspell/pubsub/api/Topics.scala
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2020 Eike K. & Contributors
+ *
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+
+package docspell.pubsub.api
+
+import cats.data.NonEmptyList
+
+import docspell.common.Ident
+
+import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder}
+import io.circe.{Decoder, Encoder}
+
+/** All topics used in Docspell. */
+object Topics {
+
+ /** Notify when a job has finished. */
+ val jobDone: TypedTopic[JobDoneMsg] = TypedTopic[JobDoneMsg](Topic("job-done"))
+
+ /** Notify when a job has been submitted. The job executor listens to these messages to
+ * wake up and do its work.
+ */
+ val jobSubmitted: TypedTopic[JobSubmittedMsg] =
+ TypedTopic[JobSubmittedMsg](Topic("job-submitted"))
+
+ val all: NonEmptyList[TypedTopic[_]] = NonEmptyList.of(jobDone, jobSubmitted)
+
+ final case class JobSubmittedMsg(id: Ident)
+ object JobSubmittedMsg {
+ implicit val jsonDecoder: Decoder[JobSubmittedMsg] =
+ deriveDecoder[JobSubmittedMsg]
+
+ implicit val jsonEncoder: Encoder[JobSubmittedMsg] =
+ deriveEncoder[JobSubmittedMsg]
+ }
+
+ final case class JobDoneMsg(jobId: Ident, task: Ident)
+ object JobDoneMsg {
+ implicit val jsonDecoder: Decoder[JobDoneMsg] =
+ deriveDecoder[JobDoneMsg]
+
+ implicit val jsonEncoder: Encoder[JobDoneMsg] =
+ deriveEncoder[JobDoneMsg]
+ }
+}
diff --git a/modules/pubsub/api/src/main/scala/docspell/pubsub/api/TypedTopic.scala b/modules/pubsub/api/src/main/scala/docspell/pubsub/api/TypedTopic.scala
new file mode 100644
index 00000000..542144ee
--- /dev/null
+++ b/modules/pubsub/api/src/main/scala/docspell/pubsub/api/TypedTopic.scala
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2020 Eike K. & Contributors
+ *
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+
+package docspell.pubsub.api
+
+import scala.reflect.{ClassTag, classTag}
+
+import io.circe.{Codec, Decoder, Encoder}
+
+final case class TypedTopic[A](topic: Topic, codec: Codec[A], msgClass: Class[_]) {
+ def name: String = topic.name
+
+ def withTopic(topic: Topic): TypedTopic[A] =
+ copy(topic = topic)
+
+ def withName(name: String): TypedTopic[A] =
+ withTopic(Topic(name))
+}
+
+object TypedTopic {
+
+ def apply[A: ClassTag](
+ topic: Topic
+ )(implicit dec: Decoder[A], enc: Encoder[A]): TypedTopic[A] =
+ TypedTopic(topic, Codec.from(dec, enc), classTag[A].runtimeClass)
+}
diff --git a/modules/pubsub/naive/src/main/scala/docspell/pubsub/naive/NaivePubSub.scala b/modules/pubsub/naive/src/main/scala/docspell/pubsub/naive/NaivePubSub.scala
new file mode 100644
index 00000000..55bdc786
--- /dev/null
+++ b/modules/pubsub/naive/src/main/scala/docspell/pubsub/naive/NaivePubSub.scala
@@ -0,0 +1,212 @@
+/*
+ * Copyright 2020 Eike K. & Contributors
+ *
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+
+package docspell.pubsub.naive
+
+import cats.data.NonEmptyList
+import cats.effect._
+import cats.implicits._
+import fs2.Pipe
+import fs2.Stream
+import fs2.concurrent.{Topic => Fs2Topic}
+
+import docspell.common._
+import docspell.pubsub.api._
+import docspell.pubsub.naive.NaivePubSub.State
+import docspell.store.Store
+import docspell.store.records.RPubSub
+
+import io.circe.Json
+import org.http4s.circe.CirceEntityCodec._
+import org.http4s.client.Client
+import org.http4s.client.dsl.Http4sClientDsl
+import org.http4s.dsl.Http4sDsl
+import org.http4s.{HttpRoutes, Uri}
+
+/** A pubsub implementation that can be used across machines, but uses a rather
+ * inefficient protocol. The reason is to make it work with the current setup, i.e. not
+ * requiring to add another complex piece of software to the mix, like Kafka or RabbitMQ.
+ *
+ * However, the api should allow to be used on top of such a tool. This implementation
+ * can be used in a personal setting, where there are only a few nodes.
+ *
+ * How it works:
+ *
+ * It is build on the `Topic` class from fs2.concurrent. A map of a topic name to such a
+ * `Topic` instance is maintained. To work across machines, the database is used as a
+ * synchronization point. Each subscriber must provide a http api and so its "callback"
+ * URL is added into the database to the list of remote subscribers.
+ *
+ * When publishing a message, the message can be published to the internal fs2 topic.
+ * Then all URLs to this topic name are looked up in the database and the message is
+ * POSTed to each URL as JSON. The endpoint of each machine takes this message and
+ * publishes it to its own internal fs2.concurrent.Topic instance.
+ *
+ * Obviously, there are drawbacks: it is slow, because the messages go through http and
+ * connections must be opened/closed etc and the database is hit as well. Then it doesn't
+ * scale to lots of machines and messages. The upside is, that it works with the current
+ * setup and it should be good enough for personal use, where there are only a small
+ * amount of machines and messages.
+ *
+ * The main use case for docspell is to communicate between the rest-server and job
+ * executor. It is for internal communication and all topics are known at compile time.
+ */
+final class NaivePubSub[F[_]: Async](
+ cfg: PubSubConfig,
+ state: Ref[F, State[F]],
+ store: Store[F],
+ client: Client[F]
+) extends PubSub[F] {
+ private val logger: Logger[F] = Logger.log4s(org.log4s.getLogger)
+
+ def withClient(client: Client[F]): NaivePubSub[F] =
+ new NaivePubSub[F](cfg, state, store, client)
+
+ def publish1(topic: Topic, msgBody: Json): F[MessageHead] =
+ for {
+ head <- mkMessageHead(topic)
+ msg = Message(head, msgBody)
+ _ <- logger.trace(s"Publishing: $msg")
+ // go through all local subscribers and publish to the fs2 topic
+ _ <- publishLocal(msg)
+ // get all remote subscribers from the database and send the message via http
+ _ <- publishRemote(msg)
+ } yield head
+
+ def publish(topic: Topic): Pipe[F, Json, MessageHead] =
+ ms => //TODO Do some optimization by grouping messages to the same topic
+ ms.evalMap(publish1(topic, _))
+
+ def subscribe(topics: NonEmptyList[Topic]): Stream[F, Message[Json]] =
+ (for {
+ _ <- logger.s.info(s"Adding subscriber for topics: $topics")
+ _ <- Stream.resource[F, Unit](addRemote(topics))
+ m <- Stream.eval(addLocal(topics))
+ } yield m).flatten
+
+ /** Receive messages from remote publishers and passes them to the local subscribers. */
+ def receiveRoute: HttpRoutes[F] = {
+ val dsl = new Http4sDsl[F] {}
+ import dsl._
+
+ HttpRoutes.of { case req @ POST -> Root =>
+ for {
+ data <- req.as[List[Message[Json]]]
+ _ <- logger.trace(s"Received external message(s): $data")
+ _ <- data.traverse(publishLocal)
+ resp <- Ok(())
+ } yield resp
+ }
+ }
+
+ // ---- private helpers
+
+ private def mkMessageHead(topic: Topic): F[MessageHead] =
+ for {
+ id <- Ident.randomId[F]
+ ts <- Timestamp.current[F]
+ head = MessageHead(id, ts, topic)
+ } yield head
+
+ private def addLocal(topics: NonEmptyList[Topic]): F[Stream[F, Message[Json]]] = {
+ val topicSet = topics.map(_.name).toList.toSet
+ for {
+ st <- state.get
+ tpc = st.topics.view.filterKeys(topicSet.contains)
+ _ <-
+ if (tpc.isEmpty)
+ logger.warn(s"Subscribing to 0 topics! Topics $topics were not initialized")
+ else ().pure[F]
+ data = tpc.values.toList.traverse(t => t.subscribe(cfg.subscriberQueueSize))
+ out = data.flatMap(msgs => Stream.emits(msgs))
+ } yield out
+ }
+
+ private def addRemote(topics: NonEmptyList[Topic]): Resource[F, Unit] = {
+ def subscribe: F[Unit] =
+ logger.trace(s"Incrementing counter for topics: $topics") *>
+ store.transact(RPubSub.increment(cfg.url, topics.map(_.name))).as(())
+
+ def unsubscribe: F[Unit] =
+ logger.trace(s"Decrementing counter for topics: $topics") *>
+ store.transact(RPubSub.decrement(cfg.url, topics.map(_.name))).as(())
+
+ Resource.make(subscribe)(_ => unsubscribe)
+ }
+
+ private def publishLocal(msg: Message[Json]): F[Unit] =
+ for {
+ st <- state.get
+ _ <- st.topics.get(msg.head.topic.name) match {
+ case Some(sub) =>
+ logger.trace(s"Publishing message to local topic: $msg") *>
+ sub.publish1(msg).as(())
+ case None =>
+ ().pure[F]
+ }
+ } yield ()
+
+ private def publishRemote(msg: Message[Json]): F[Unit] = {
+ val dsl = new Http4sDsl[F] with Http4sClientDsl[F] {}
+ import dsl._
+
+ for {
+ _ <- logger.trace(s"Find all nodes subscribed to topic ${msg.head.topic.name}")
+ urls <- store.transact(RPubSub.findSubs(msg.head.topic.name))
+ _ <- logger.trace(s"Publishing to remote urls ${urls.map(_.asString)}: $msg")
+ reqs = urls
+ .map(u => Uri.unsafeFromString(u.asString))
+ .map(uri => POST(List(msg), uri))
+ res <- reqs.traverse(req => client.status(req)).attempt
+ _ <- res match {
+ case Right(s) =>
+ if (s.forall(_.isSuccess)) ().pure[F]
+ else if (s.size == urls.size)
+ logger.warn(
+ s"No nodes was be reached! Reason: $s, message: $msg"
+ )
+ else
+ logger.warn(
+ s"Some nodes were not reached! Reason: $s, message: $msg"
+ )
+ case Left(ex) =>
+ logger.error(ex)(s"Error publishing ${msg.head.topic.name} message remotely")
+ }
+ } yield ()
+ }
+}
+
+object NaivePubSub {
+
+ def apply[F[_]: Async](
+ cfg: PubSubConfig,
+ store: Store[F],
+ client: Client[F]
+ )(topics: NonEmptyList[Topic]): F[NaivePubSub[F]] =
+ for {
+ state <- Ref.ofEffect[F, State[F]](State.create[F](topics))
+ _ <- store.transact(RPubSub.initTopics(cfg.nodeId, cfg.url, topics.map(_.name)))
+ } yield new NaivePubSub[F](cfg, state, store, client)
+
+ def create[F[_]: Async](
+ cfg: PubSubConfig,
+ store: Store[F],
+ client: Client[F],
+ logger: Logger[F]
+ )(topics: NonEmptyList[Topic]): F[PubSubT[F, NaivePubSub[F]]] =
+ apply[F](cfg, store, client)(topics).map(ps => PubSubT(ps, logger))
+
+ final case class State[F[_]](topics: Map[String, Fs2Topic[F, Message[Json]]]) {}
+
+ object State {
+ def empty[F[_]]: State[F] = State[F](Map.empty)
+ def create[F[_]: Async](topics: NonEmptyList[Topic]): F[State[F]] =
+ topics
+ .traverse(t => Fs2Topic[F, Message[Json]].map(fs2t => t.name -> fs2t))
+ .map(_.toList.toMap)
+ .map(State.apply)
+ }
+}
diff --git a/modules/pubsub/naive/src/main/scala/docspell/pubsub/naive/PubSubConfig.scala b/modules/pubsub/naive/src/main/scala/docspell/pubsub/naive/PubSubConfig.scala
new file mode 100644
index 00000000..b00131d6
--- /dev/null
+++ b/modules/pubsub/naive/src/main/scala/docspell/pubsub/naive/PubSubConfig.scala
@@ -0,0 +1,11 @@
+/*
+ * Copyright 2020 Eike K. & Contributors
+ *
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+
+package docspell.pubsub.naive
+
+import docspell.common.{Ident, LenientUri}
+
+case class PubSubConfig(nodeId: Ident, url: LenientUri, subscriberQueueSize: Int)
diff --git a/modules/pubsub/naive/src/test/scala/docspell/pubsub/naive/Fixtures.scala b/modules/pubsub/naive/src/test/scala/docspell/pubsub/naive/Fixtures.scala
new file mode 100644
index 00000000..cd2a386a
--- /dev/null
+++ b/modules/pubsub/naive/src/test/scala/docspell/pubsub/naive/Fixtures.scala
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2020 Eike K. & Contributors
+ *
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+
+package docspell.pubsub.naive
+
+import cats.effect._
+
+import docspell.common._
+import docspell.pubsub.api._
+import docspell.store.{Store, StoreFixture}
+
+import munit.CatsEffectSuite
+import org.http4s.Response
+import org.http4s.client.Client
+
+trait Fixtures extends HttpClientOps { self: CatsEffectSuite =>
+
+ val pubsubEnv = ResourceFixture(Fixtures.envResource("node-1"))
+
+ val pubsubT = ResourceFixture {
+ Fixtures
+ .envResource("node-1")
+ .flatMap(e => Resource.eval(e.pubSub))
+ .map(ps => PubSubT(ps, Fixtures.loggerIO))
+ }
+
+ def conntectedPubsubs(env: Fixtures.Env) =
+ for {
+ // Create two pubsub instances connected to the same database
+ ps_1 <- env.withNodeId("node-1").pubSubT
+ ps_2 <- env.withNodeId("node-2").pubSubT
+
+ // both instances have a dummy client. now connect their clients to each other
+ ps1 = ps_1.withDelegate(ps_1.delegate.withClient(httpClient(ps_2)))
+ ps2 = ps_2.withDelegate(ps_2.delegate.withClient(httpClient(ps_1)))
+ } yield (ps1, ps2)
+
+ implicit final class StringId(s: String) {
+ def id: Ident = Ident.unsafe(s)
+ }
+}
+
+object Fixtures {
+ private val loggerIO: Logger[IO] = Logger.log4s(org.log4s.getLogger)
+
+ final case class Env(store: Store[IO], cfg: PubSubConfig) {
+ def pubSub: IO[NaivePubSub[IO]] = {
+ val dummyClient = Client[IO](_ => Resource.pure(Response.notFound[IO]))
+ NaivePubSub(cfg, store, dummyClient)(Topics.all.map(_.topic))
+ }
+ def pubSubT: IO[PubSubT[IO, NaivePubSub[IO]]] =
+ pubSub.map(PubSubT(_, loggerIO))
+
+ def withNodeId(nodeId: String): Env =
+ copy(cfg =
+ cfg.copy(
+ nodeId = Ident.unsafe(nodeId),
+ url = LenientUri.unsafe(s"http://$nodeId/")
+ )
+ )
+ }
+
+ def testConfig(nodeId: String) =
+ PubSubConfig(
+ Ident.unsafe(nodeId),
+ LenientUri.unsafe(s"http://${nodeId}/"),
+ 0
+ )
+
+ def storeResource: Resource[IO, Store[IO]] =
+ for {
+ random <- Resource.eval(Ident.randomId[IO])
+ cfg = StoreFixture.memoryDB(random.id.take(12))
+ store <- StoreFixture.store(cfg)
+ _ <- Resource.eval(store.migrate)
+ } yield store
+
+ def envResource(nodeId: String): Resource[IO, Env] =
+ for {
+ store <- storeResource
+ } yield Env(store, testConfig(nodeId))
+}
diff --git a/modules/pubsub/naive/src/test/scala/docspell/pubsub/naive/HttpClientOps.scala b/modules/pubsub/naive/src/test/scala/docspell/pubsub/naive/HttpClientOps.scala
new file mode 100644
index 00000000..cafdf0c2
--- /dev/null
+++ b/modules/pubsub/naive/src/test/scala/docspell/pubsub/naive/HttpClientOps.scala
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2020 Eike K. & Contributors
+ *
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+
+package docspell.pubsub.naive
+
+import cats.effect._
+
+import docspell.common._
+import docspell.pubsub.api._
+
+import io.circe.Encoder
+import org.http4s.circe.CirceEntityCodec._
+import org.http4s.client.Client
+import org.http4s.client.dsl.io._
+import org.http4s.dsl.io._
+import org.http4s.{HttpApp, HttpRoutes, Uri}
+
+trait HttpClientOps {
+ def httpClient(routes: HttpRoutes[IO]): Client[IO] =
+ Client.fromHttpApp(HttpApp(routes.orNotFound.run))
+
+ def httpClient(ps: NaivePubSub[IO]): Client[IO] =
+ httpClient(ps.receiveRoute)
+
+ def httpClient(ps: PubSubT[IO, NaivePubSub[IO]]): Client[IO] =
+ httpClient(ps.delegate)
+
+ implicit final class ClientOps(client: Client[IO]) {
+ val uri = Uri.unsafeFromString("http://localhost/")
+
+ def sendMessage[A: Encoder](topic: Topic, body: A): IO[Unit] = {
+ val encode: Encoder[List[Message[A]]] = implicitly[Encoder[List[Message[A]]]]
+
+ for {
+ id <- Ident.randomId[IO]
+ time <- Timestamp.current[IO]
+ mesg = List(Message(MessageHead(id, time, topic), body))
+ _ <- HttpClientOps.logger.debug(s"Sending message(s): $mesg")
+ _ <- client.expectOr[Unit](POST(encode(mesg), uri)) { resp =>
+ IO(new Exception(s"Unexpected response: $resp"))
+ }
+ } yield ()
+ }
+
+ def send[A](typedTopic: TypedTopic[A], body: A): IO[Unit] =
+ sendMessage(typedTopic.topic, body)(typedTopic.codec)
+ }
+}
+
+object HttpClientOps {
+ private val logger: Logger[IO] = Logger.log4s(org.log4s.getLogger)
+}
diff --git a/modules/pubsub/naive/src/test/scala/docspell/pubsub/naive/NaivePubSubTest.scala b/modules/pubsub/naive/src/test/scala/docspell/pubsub/naive/NaivePubSubTest.scala
new file mode 100644
index 00000000..c244e1da
--- /dev/null
+++ b/modules/pubsub/naive/src/test/scala/docspell/pubsub/naive/NaivePubSubTest.scala
@@ -0,0 +1,128 @@
+/*
+ * Copyright 2020 Eike K. & Contributors
+ *
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+
+package docspell.pubsub.naive
+
+import scala.concurrent.duration._
+
+import cats.effect._
+import cats.implicits._
+import fs2.concurrent.SignallingRef
+
+import docspell.common._
+import docspell.pubsub.api.Topics.{JobDoneMsg, JobSubmittedMsg}
+import docspell.pubsub.api._
+
+import munit.CatsEffectSuite
+
+class NaivePubSubTest extends CatsEffectSuite with Fixtures {
+ private[this] val logger = Logger.log4s[IO](org.log4s.getLogger)
+
+ def subscribe[A](ps: PubSubT[IO, NaivePubSub[IO]], topic: TypedTopic[A]) =
+ for {
+ received <- Ref.of[IO, Option[Message[A]]](None)
+ halt <- SignallingRef.of[IO, Boolean](false)
+ fiber <- Async[IO].start(
+ logger.debug(s"${Thread.currentThread()} Listening for messages...") *>
+ ps.subscribe(topic)
+ .evalMap(m =>
+ logger.debug(s"Handling message: $m") *>
+ received.set(Some(m)) *>
+ halt.set(true)
+ )
+ .interruptWhen(halt)
+ .compile
+ .drain
+ )
+ _ <- IO.sleep(500.millis)
+ } yield (received, halt, fiber)
+
+ pubsubT.test("local publish receives message") { ps =>
+ for {
+ res <- subscribe(ps, Topics.jobSubmitted)
+ (received, _, subFiber) = res
+ headSend <- ps.publish1(Topics.jobSubmitted, JobSubmittedMsg("hello".id))
+ outcome <- subFiber.join
+ msgRec <- received.get
+ _ = assert(outcome.isSuccess)
+ _ = assertEquals(msgRec.map(_.head), Option(headSend))
+ } yield ()
+ }
+
+ pubsubT.test("local publish to different topic doesn't receive") { ps =>
+ val otherTopic = Topics.jobSubmitted.withTopic(Topic("other-name"))
+ for {
+ res <- subscribe(ps, Topics.jobSubmitted)
+ (received, halt, subFiber) = res
+ _ <- ps.publish1(otherTopic, JobSubmittedMsg("hello".id))
+ _ <- IO.sleep(100.millis) //allow some time for receiving
+ _ <- halt.set(true)
+ outcome <- subFiber.join
+ _ = assert(outcome.isSuccess)
+ recMsg <- received.get
+ _ = assert(recMsg.isEmpty)
+ } yield ()
+ }
+
+ pubsubT.test("receive messages remotely") { ps =>
+ val msg = JobSubmittedMsg("hello-remote".id)
+ for {
+ res <- subscribe(ps, Topics.jobSubmitted)
+ (received, _, subFiber) = res
+ client = httpClient(ps.delegate.receiveRoute)
+ _ <- client.send(Topics.jobSubmitted, msg)
+ outcome <- subFiber.join
+ msgRec <- received.get
+ _ = assert(outcome.isSuccess)
+ _ = assertEquals(msgRec.map(_.head.topic), Topics.jobSubmitted.topic.some)
+ _ = assertEquals(msgRec.map(_.body), msg.some)
+ } yield ()
+ }
+
+ pubsubEnv.test("send messages remotely") { env =>
+ val msg = JobSubmittedMsg("hello-remote".id)
+ for {
+ // Create two pubsub instances connected to the same database
+ pubsubs <- conntectedPubsubs(env)
+ (ps1, ps2) = pubsubs
+
+ // subscribe to ps1 and send via ps2
+ res <- subscribe(ps1, Topics.jobSubmitted)
+ (received, _, subFiber) = res
+ _ <- ps2.publish1(Topics.jobSubmitted, msg)
+ outcome <- subFiber.join
+ msgRec <- received.get
+
+ // check results
+ _ = assert(outcome.isSuccess)
+ _ = assertEquals(msgRec.map(_.head.topic), Topics.jobSubmitted.topic.some)
+ _ = assertEquals(msgRec.map(_.body), msg.some)
+ } yield ()
+ }
+
+ pubsubEnv.test("do not receive remote message from other topic") { env =>
+ val msg = JobDoneMsg("job-1".id, "task-2".id)
+
+ for {
+ // Create two pubsub instances connected to the same database
+ pubsubs <- conntectedPubsubs(env)
+ (ps1, ps2) = pubsubs
+
+ // subscribe to ps1 and send via ps2
+ res <- subscribe(ps1, Topics.jobSubmitted)
+ (received, halt, subFiber) = res
+ _ <- ps2.publish1(Topics.jobDone, msg)
+ _ <- IO.sleep(100.millis)
+ _ <- halt.set(true)
+ outcome <- subFiber.join
+ msgRec <- received.get
+
+ // check results
+ _ = assert(outcome.isSuccess)
+ _ = assertEquals(msgRec, None)
+ } yield ()
+ }
+}
diff --git a/modules/store/src/main/resources/db/migration/h2/V1.28.0__pubsub.sql b/modules/store/src/main/resources/db/migration/h2/V1.28.0__pubsub.sql
new file mode 100644
index 00000000..ae42b13c
--- /dev/null
+++ b/modules/store/src/main/resources/db/migration/h2/V1.28.0__pubsub.sql
@@ -0,0 +1,8 @@
+CREATE TABLE "pubsub" (
+ "id" varchar(254) not null primary key,
+ "node_id" varchar(254) not null,
+ "url" varchar(254) not null,
+ "topic" varchar(254) not null,
+ "counter" int not null,
+ unique("url", "topic")
+)
diff --git a/modules/store/src/main/resources/db/migration/mariadb/V1.28.0__pubsub.sql b/modules/store/src/main/resources/db/migration/mariadb/V1.28.0__pubsub.sql
new file mode 100644
index 00000000..2bd09016
--- /dev/null
+++ b/modules/store/src/main/resources/db/migration/mariadb/V1.28.0__pubsub.sql
@@ -0,0 +1,8 @@
+CREATE TABLE `pubsub` (
+ `id` varchar(254) not null primary key,
+ `node_id` varchar(254) not null,
+ `url` varchar(254) not null,
+ `topic` varchar(254) not null,
+ `counter` int not null,
+ unique(`url`, `topic`)
+)
diff --git a/modules/store/src/main/resources/db/migration/postgresql/V1.28.0__pubsub.sql b/modules/store/src/main/resources/db/migration/postgresql/V1.28.0__pubsub.sql
new file mode 100644
index 00000000..ae42b13c
--- /dev/null
+++ b/modules/store/src/main/resources/db/migration/postgresql/V1.28.0__pubsub.sql
@@ -0,0 +1,8 @@
+CREATE TABLE "pubsub" (
+ "id" varchar(254) not null primary key,
+ "node_id" varchar(254) not null,
+ "url" varchar(254) not null,
+ "topic" varchar(254) not null,
+ "counter" int not null,
+ unique("url", "topic")
+)
diff --git a/modules/store/src/main/scala/docspell/store/records/RPubSub.scala b/modules/store/src/main/scala/docspell/store/records/RPubSub.scala
new file mode 100644
index 00000000..f0e2ce6e
--- /dev/null
+++ b/modules/store/src/main/scala/docspell/store/records/RPubSub.scala
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2020 Eike K. & Contributors
+ *
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+
+package docspell.store.records
+
+import cats.data.NonEmptyList
+import cats.implicits._
+
+import docspell.common._
+import docspell.store.qb.DSL._
+import docspell.store.qb.{Column, DML, TableDef}
+
+import doobie._
+import doobie.implicits._
+
+/** A table for supporting naive pubsub across nodes. */
+final case class RPubSub(
+ id: Ident,
+ nodeId: Ident,
+ url: LenientUri,
+ topic: String,
+ counter: Int
+)
+
+object RPubSub {
+ final case class Table(alias: Option[String]) extends TableDef {
+ val tableName: String = "pubsub"
+
+ val id = Column[Ident]("id", this)
+ val nodeId = Column[Ident]("node_id", this)
+ val url = Column[LenientUri]("url", this)
+ val topic = Column[String]("topic", this)
+ val counter = Column[Int]("counter", this)
+
+ val all: NonEmptyList[Column[_]] =
+ NonEmptyList.of(id, nodeId, url, topic, counter)
+ }
+ def as(alias: String): Table =
+ Table(Some(alias))
+
+ val T: Table = Table(None)
+
+ def insert(r: RPubSub): ConnectionIO[Int] =
+ DML.insert(T, T.all, sql"${r.id}, ${r.nodeId}, ${r.url}, ${r.topic}, ${r.counter}")
+
+ /** Insert all topics with counter = 0 */
+ def initTopics(
+ nodeId: Ident,
+ url: LenientUri,
+ topics: NonEmptyList[String]
+ ): ConnectionIO[Int] =
+ DML.delete(T, T.nodeId === nodeId) *>
+ topics.toList
+ .traverse(t =>
+ Ident
+ .randomId[ConnectionIO]
+ .flatMap(id => insert(RPubSub(id, nodeId, url, t, 0)))
+ )
+ .map(_.sum)
+
+ def increment(url: LenientUri, topics: NonEmptyList[String]): ConnectionIO[Int] =
+ DML.update(
+ T,
+ T.url === url && T.topic.in(topics),
+ DML.set(
+ T.counter.increment(1)
+ )
+ )
+
+ def decrement(url: LenientUri, topics: NonEmptyList[String]): ConnectionIO[Int] =
+ DML.update(
+ T,
+ T.url === url && T.topic.in(topics),
+ DML.set(
+ T.counter.decrement(1)
+ )
+ )
+
+ def findSubs(topic: String): ConnectionIO[List[LenientUri]] =
+ run(select(T.url), from(T), T.topic === topic && T.counter > 0)
+ .query[LenientUri]
+ .to[List]
+}
diff --git a/modules/store/src/test/resources/logback-test.xml b/modules/store/src/test/resources/logback-test.xml
index 9cf93b57..6d2ef8c0 100644
--- a/modules/store/src/test/resources/logback-test.xml
+++ b/modules/store/src/test/resources/logback-test.xml
@@ -3,7 +3,7 @@
true
- %highlight(%-5level) %cyan(%logger{15}) - %msg %n
+ %highlight(%-5level) [%t{10}] %cyan(%logger{15}) - %msg %n