mirror of
https://github.com/TheAnachronism/docspell.git
synced 2025-06-22 02:18:26 +00:00
Initial naive pubsub impl generalising from current setup
This commit is contained in:
@ -0,0 +1,212 @@
|
||||
/*
|
||||
* Copyright 2020 Eike K. & Contributors
|
||||
*
|
||||
* SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
*/
|
||||
|
||||
package docspell.pubsub.naive
|
||||
|
||||
import cats.data.NonEmptyList
|
||||
import cats.effect._
|
||||
import cats.implicits._
|
||||
import fs2.Pipe
|
||||
import fs2.Stream
|
||||
import fs2.concurrent.{Topic => Fs2Topic}
|
||||
|
||||
import docspell.common._
|
||||
import docspell.pubsub.api._
|
||||
import docspell.pubsub.naive.NaivePubSub.State
|
||||
import docspell.store.Store
|
||||
import docspell.store.records.RPubSub
|
||||
|
||||
import io.circe.Json
|
||||
import org.http4s.circe.CirceEntityCodec._
|
||||
import org.http4s.client.Client
|
||||
import org.http4s.client.dsl.Http4sClientDsl
|
||||
import org.http4s.dsl.Http4sDsl
|
||||
import org.http4s.{HttpRoutes, Uri}
|
||||
|
||||
/** A pubsub implementation that can be used across machines, but uses a rather
|
||||
* inefficient protocol. The reason is to make it work with the current setup, i.e. not
|
||||
* requiring to add another complex piece of software to the mix, like Kafka or RabbitMQ.
|
||||
*
|
||||
* However, the api should allow to be used on top of such a tool. This implementation
|
||||
* can be used in a personal setting, where there are only a few nodes.
|
||||
*
|
||||
* How it works:
|
||||
*
|
||||
* It is build on the `Topic` class from fs2.concurrent. A map of a topic name to such a
|
||||
* `Topic` instance is maintained. To work across machines, the database is used as a
|
||||
* synchronization point. Each subscriber must provide a http api and so its "callback"
|
||||
* URL is added into the database to the list of remote subscribers.
|
||||
*
|
||||
* When publishing a message, the message can be published to the internal fs2 topic.
|
||||
* Then all URLs to this topic name are looked up in the database and the message is
|
||||
* POSTed to each URL as JSON. The endpoint of each machine takes this message and
|
||||
* publishes it to its own internal fs2.concurrent.Topic instance.
|
||||
*
|
||||
* Obviously, there are drawbacks: it is slow, because the messages go through http and
|
||||
* connections must be opened/closed etc and the database is hit as well. Then it doesn't
|
||||
* scale to lots of machines and messages. The upside is, that it works with the current
|
||||
* setup and it should be good enough for personal use, where there are only a small
|
||||
* amount of machines and messages.
|
||||
*
|
||||
* The main use case for docspell is to communicate between the rest-server and job
|
||||
* executor. It is for internal communication and all topics are known at compile time.
|
||||
*/
|
||||
final class NaivePubSub[F[_]: Async](
|
||||
cfg: PubSubConfig,
|
||||
state: Ref[F, State[F]],
|
||||
store: Store[F],
|
||||
client: Client[F]
|
||||
) extends PubSub[F] {
|
||||
private val logger: Logger[F] = Logger.log4s(org.log4s.getLogger)
|
||||
|
||||
def withClient(client: Client[F]): NaivePubSub[F] =
|
||||
new NaivePubSub[F](cfg, state, store, client)
|
||||
|
||||
def publish1(topic: Topic, msgBody: Json): F[MessageHead] =
|
||||
for {
|
||||
head <- mkMessageHead(topic)
|
||||
msg = Message(head, msgBody)
|
||||
_ <- logger.trace(s"Publishing: $msg")
|
||||
// go through all local subscribers and publish to the fs2 topic
|
||||
_ <- publishLocal(msg)
|
||||
// get all remote subscribers from the database and send the message via http
|
||||
_ <- publishRemote(msg)
|
||||
} yield head
|
||||
|
||||
def publish(topic: Topic): Pipe[F, Json, MessageHead] =
|
||||
ms => //TODO Do some optimization by grouping messages to the same topic
|
||||
ms.evalMap(publish1(topic, _))
|
||||
|
||||
def subscribe(topics: NonEmptyList[Topic]): Stream[F, Message[Json]] =
|
||||
(for {
|
||||
_ <- logger.s.info(s"Adding subscriber for topics: $topics")
|
||||
_ <- Stream.resource[F, Unit](addRemote(topics))
|
||||
m <- Stream.eval(addLocal(topics))
|
||||
} yield m).flatten
|
||||
|
||||
/** Receive messages from remote publishers and passes them to the local subscribers. */
|
||||
def receiveRoute: HttpRoutes[F] = {
|
||||
val dsl = new Http4sDsl[F] {}
|
||||
import dsl._
|
||||
|
||||
HttpRoutes.of { case req @ POST -> Root =>
|
||||
for {
|
||||
data <- req.as[List[Message[Json]]]
|
||||
_ <- logger.trace(s"Received external message(s): $data")
|
||||
_ <- data.traverse(publishLocal)
|
||||
resp <- Ok(())
|
||||
} yield resp
|
||||
}
|
||||
}
|
||||
|
||||
// ---- private helpers
|
||||
|
||||
private def mkMessageHead(topic: Topic): F[MessageHead] =
|
||||
for {
|
||||
id <- Ident.randomId[F]
|
||||
ts <- Timestamp.current[F]
|
||||
head = MessageHead(id, ts, topic)
|
||||
} yield head
|
||||
|
||||
private def addLocal(topics: NonEmptyList[Topic]): F[Stream[F, Message[Json]]] = {
|
||||
val topicSet = topics.map(_.name).toList.toSet
|
||||
for {
|
||||
st <- state.get
|
||||
tpc = st.topics.view.filterKeys(topicSet.contains)
|
||||
_ <-
|
||||
if (tpc.isEmpty)
|
||||
logger.warn(s"Subscribing to 0 topics! Topics $topics were not initialized")
|
||||
else ().pure[F]
|
||||
data = tpc.values.toList.traverse(t => t.subscribe(cfg.subscriberQueueSize))
|
||||
out = data.flatMap(msgs => Stream.emits(msgs))
|
||||
} yield out
|
||||
}
|
||||
|
||||
private def addRemote(topics: NonEmptyList[Topic]): Resource[F, Unit] = {
|
||||
def subscribe: F[Unit] =
|
||||
logger.trace(s"Incrementing counter for topics: $topics") *>
|
||||
store.transact(RPubSub.increment(cfg.url, topics.map(_.name))).as(())
|
||||
|
||||
def unsubscribe: F[Unit] =
|
||||
logger.trace(s"Decrementing counter for topics: $topics") *>
|
||||
store.transact(RPubSub.decrement(cfg.url, topics.map(_.name))).as(())
|
||||
|
||||
Resource.make(subscribe)(_ => unsubscribe)
|
||||
}
|
||||
|
||||
private def publishLocal(msg: Message[Json]): F[Unit] =
|
||||
for {
|
||||
st <- state.get
|
||||
_ <- st.topics.get(msg.head.topic.name) match {
|
||||
case Some(sub) =>
|
||||
logger.trace(s"Publishing message to local topic: $msg") *>
|
||||
sub.publish1(msg).as(())
|
||||
case None =>
|
||||
().pure[F]
|
||||
}
|
||||
} yield ()
|
||||
|
||||
private def publishRemote(msg: Message[Json]): F[Unit] = {
|
||||
val dsl = new Http4sDsl[F] with Http4sClientDsl[F] {}
|
||||
import dsl._
|
||||
|
||||
for {
|
||||
_ <- logger.trace(s"Find all nodes subscribed to topic ${msg.head.topic.name}")
|
||||
urls <- store.transact(RPubSub.findSubs(msg.head.topic.name))
|
||||
_ <- logger.trace(s"Publishing to remote urls ${urls.map(_.asString)}: $msg")
|
||||
reqs = urls
|
||||
.map(u => Uri.unsafeFromString(u.asString))
|
||||
.map(uri => POST(List(msg), uri))
|
||||
res <- reqs.traverse(req => client.status(req)).attempt
|
||||
_ <- res match {
|
||||
case Right(s) =>
|
||||
if (s.forall(_.isSuccess)) ().pure[F]
|
||||
else if (s.size == urls.size)
|
||||
logger.warn(
|
||||
s"No nodes was be reached! Reason: $s, message: $msg"
|
||||
)
|
||||
else
|
||||
logger.warn(
|
||||
s"Some nodes were not reached! Reason: $s, message: $msg"
|
||||
)
|
||||
case Left(ex) =>
|
||||
logger.error(ex)(s"Error publishing ${msg.head.topic.name} message remotely")
|
||||
}
|
||||
} yield ()
|
||||
}
|
||||
}
|
||||
|
||||
object NaivePubSub {
|
||||
|
||||
def apply[F[_]: Async](
|
||||
cfg: PubSubConfig,
|
||||
store: Store[F],
|
||||
client: Client[F]
|
||||
)(topics: NonEmptyList[Topic]): F[NaivePubSub[F]] =
|
||||
for {
|
||||
state <- Ref.ofEffect[F, State[F]](State.create[F](topics))
|
||||
_ <- store.transact(RPubSub.initTopics(cfg.nodeId, cfg.url, topics.map(_.name)))
|
||||
} yield new NaivePubSub[F](cfg, state, store, client)
|
||||
|
||||
def create[F[_]: Async](
|
||||
cfg: PubSubConfig,
|
||||
store: Store[F],
|
||||
client: Client[F],
|
||||
logger: Logger[F]
|
||||
)(topics: NonEmptyList[Topic]): F[PubSubT[F, NaivePubSub[F]]] =
|
||||
apply[F](cfg, store, client)(topics).map(ps => PubSubT(ps, logger))
|
||||
|
||||
final case class State[F[_]](topics: Map[String, Fs2Topic[F, Message[Json]]]) {}
|
||||
|
||||
object State {
|
||||
def empty[F[_]]: State[F] = State[F](Map.empty)
|
||||
def create[F[_]: Async](topics: NonEmptyList[Topic]): F[State[F]] =
|
||||
topics
|
||||
.traverse(t => Fs2Topic[F, Message[Json]].map(fs2t => t.name -> fs2t))
|
||||
.map(_.toList.toMap)
|
||||
.map(State.apply)
|
||||
}
|
||||
}
|
@ -0,0 +1,11 @@
|
||||
/*
|
||||
* Copyright 2020 Eike K. & Contributors
|
||||
*
|
||||
* SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
*/
|
||||
|
||||
package docspell.pubsub.naive
|
||||
|
||||
import docspell.common.{Ident, LenientUri}
|
||||
|
||||
case class PubSubConfig(nodeId: Ident, url: LenientUri, subscriberQueueSize: Int)
|
@ -0,0 +1,85 @@
|
||||
/*
|
||||
* Copyright 2020 Eike K. & Contributors
|
||||
*
|
||||
* SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
*/
|
||||
|
||||
package docspell.pubsub.naive
|
||||
|
||||
import cats.effect._
|
||||
|
||||
import docspell.common._
|
||||
import docspell.pubsub.api._
|
||||
import docspell.store.{Store, StoreFixture}
|
||||
|
||||
import munit.CatsEffectSuite
|
||||
import org.http4s.Response
|
||||
import org.http4s.client.Client
|
||||
|
||||
trait Fixtures extends HttpClientOps { self: CatsEffectSuite =>
|
||||
|
||||
val pubsubEnv = ResourceFixture(Fixtures.envResource("node-1"))
|
||||
|
||||
val pubsubT = ResourceFixture {
|
||||
Fixtures
|
||||
.envResource("node-1")
|
||||
.flatMap(e => Resource.eval(e.pubSub))
|
||||
.map(ps => PubSubT(ps, Fixtures.loggerIO))
|
||||
}
|
||||
|
||||
def conntectedPubsubs(env: Fixtures.Env) =
|
||||
for {
|
||||
// Create two pubsub instances connected to the same database
|
||||
ps_1 <- env.withNodeId("node-1").pubSubT
|
||||
ps_2 <- env.withNodeId("node-2").pubSubT
|
||||
|
||||
// both instances have a dummy client. now connect their clients to each other
|
||||
ps1 = ps_1.withDelegate(ps_1.delegate.withClient(httpClient(ps_2)))
|
||||
ps2 = ps_2.withDelegate(ps_2.delegate.withClient(httpClient(ps_1)))
|
||||
} yield (ps1, ps2)
|
||||
|
||||
implicit final class StringId(s: String) {
|
||||
def id: Ident = Ident.unsafe(s)
|
||||
}
|
||||
}
|
||||
|
||||
object Fixtures {
|
||||
private val loggerIO: Logger[IO] = Logger.log4s(org.log4s.getLogger)
|
||||
|
||||
final case class Env(store: Store[IO], cfg: PubSubConfig) {
|
||||
def pubSub: IO[NaivePubSub[IO]] = {
|
||||
val dummyClient = Client[IO](_ => Resource.pure(Response.notFound[IO]))
|
||||
NaivePubSub(cfg, store, dummyClient)(Topics.all.map(_.topic))
|
||||
}
|
||||
def pubSubT: IO[PubSubT[IO, NaivePubSub[IO]]] =
|
||||
pubSub.map(PubSubT(_, loggerIO))
|
||||
|
||||
def withNodeId(nodeId: String): Env =
|
||||
copy(cfg =
|
||||
cfg.copy(
|
||||
nodeId = Ident.unsafe(nodeId),
|
||||
url = LenientUri.unsafe(s"http://$nodeId/")
|
||||
)
|
||||
)
|
||||
}
|
||||
|
||||
def testConfig(nodeId: String) =
|
||||
PubSubConfig(
|
||||
Ident.unsafe(nodeId),
|
||||
LenientUri.unsafe(s"http://${nodeId}/"),
|
||||
0
|
||||
)
|
||||
|
||||
def storeResource: Resource[IO, Store[IO]] =
|
||||
for {
|
||||
random <- Resource.eval(Ident.randomId[IO])
|
||||
cfg = StoreFixture.memoryDB(random.id.take(12))
|
||||
store <- StoreFixture.store(cfg)
|
||||
_ <- Resource.eval(store.migrate)
|
||||
} yield store
|
||||
|
||||
def envResource(nodeId: String): Resource[IO, Env] =
|
||||
for {
|
||||
store <- storeResource
|
||||
} yield Env(store, testConfig(nodeId))
|
||||
}
|
@ -0,0 +1,55 @@
|
||||
/*
|
||||
* Copyright 2020 Eike K. & Contributors
|
||||
*
|
||||
* SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
*/
|
||||
|
||||
package docspell.pubsub.naive
|
||||
|
||||
import cats.effect._
|
||||
|
||||
import docspell.common._
|
||||
import docspell.pubsub.api._
|
||||
|
||||
import io.circe.Encoder
|
||||
import org.http4s.circe.CirceEntityCodec._
|
||||
import org.http4s.client.Client
|
||||
import org.http4s.client.dsl.io._
|
||||
import org.http4s.dsl.io._
|
||||
import org.http4s.{HttpApp, HttpRoutes, Uri}
|
||||
|
||||
trait HttpClientOps {
|
||||
def httpClient(routes: HttpRoutes[IO]): Client[IO] =
|
||||
Client.fromHttpApp(HttpApp(routes.orNotFound.run))
|
||||
|
||||
def httpClient(ps: NaivePubSub[IO]): Client[IO] =
|
||||
httpClient(ps.receiveRoute)
|
||||
|
||||
def httpClient(ps: PubSubT[IO, NaivePubSub[IO]]): Client[IO] =
|
||||
httpClient(ps.delegate)
|
||||
|
||||
implicit final class ClientOps(client: Client[IO]) {
|
||||
val uri = Uri.unsafeFromString("http://localhost/")
|
||||
|
||||
def sendMessage[A: Encoder](topic: Topic, body: A): IO[Unit] = {
|
||||
val encode: Encoder[List[Message[A]]] = implicitly[Encoder[List[Message[A]]]]
|
||||
|
||||
for {
|
||||
id <- Ident.randomId[IO]
|
||||
time <- Timestamp.current[IO]
|
||||
mesg = List(Message(MessageHead(id, time, topic), body))
|
||||
_ <- HttpClientOps.logger.debug(s"Sending message(s): $mesg")
|
||||
_ <- client.expectOr[Unit](POST(encode(mesg), uri)) { resp =>
|
||||
IO(new Exception(s"Unexpected response: $resp"))
|
||||
}
|
||||
} yield ()
|
||||
}
|
||||
|
||||
def send[A](typedTopic: TypedTopic[A], body: A): IO[Unit] =
|
||||
sendMessage(typedTopic.topic, body)(typedTopic.codec)
|
||||
}
|
||||
}
|
||||
|
||||
object HttpClientOps {
|
||||
private val logger: Logger[IO] = Logger.log4s(org.log4s.getLogger)
|
||||
}
|
@ -0,0 +1,128 @@
|
||||
/*
|
||||
* Copyright 2020 Eike K. & Contributors
|
||||
*
|
||||
* SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
*/
|
||||
|
||||
package docspell.pubsub.naive
|
||||
|
||||
import scala.concurrent.duration._
|
||||
|
||||
import cats.effect._
|
||||
import cats.implicits._
|
||||
import fs2.concurrent.SignallingRef
|
||||
|
||||
import docspell.common._
|
||||
import docspell.pubsub.api.Topics.{JobDoneMsg, JobSubmittedMsg}
|
||||
import docspell.pubsub.api._
|
||||
|
||||
import munit.CatsEffectSuite
|
||||
|
||||
class NaivePubSubTest extends CatsEffectSuite with Fixtures {
|
||||
private[this] val logger = Logger.log4s[IO](org.log4s.getLogger)
|
||||
|
||||
def subscribe[A](ps: PubSubT[IO, NaivePubSub[IO]], topic: TypedTopic[A]) =
|
||||
for {
|
||||
received <- Ref.of[IO, Option[Message[A]]](None)
|
||||
halt <- SignallingRef.of[IO, Boolean](false)
|
||||
fiber <- Async[IO].start(
|
||||
logger.debug(s"${Thread.currentThread()} Listening for messages...") *>
|
||||
ps.subscribe(topic)
|
||||
.evalMap(m =>
|
||||
logger.debug(s"Handling message: $m") *>
|
||||
received.set(Some(m)) *>
|
||||
halt.set(true)
|
||||
)
|
||||
.interruptWhen(halt)
|
||||
.compile
|
||||
.drain
|
||||
)
|
||||
_ <- IO.sleep(500.millis)
|
||||
} yield (received, halt, fiber)
|
||||
|
||||
pubsubT.test("local publish receives message") { ps =>
|
||||
for {
|
||||
res <- subscribe(ps, Topics.jobSubmitted)
|
||||
(received, _, subFiber) = res
|
||||
headSend <- ps.publish1(Topics.jobSubmitted, JobSubmittedMsg("hello".id))
|
||||
outcome <- subFiber.join
|
||||
msgRec <- received.get
|
||||
_ = assert(outcome.isSuccess)
|
||||
_ = assertEquals(msgRec.map(_.head), Option(headSend))
|
||||
} yield ()
|
||||
}
|
||||
|
||||
pubsubT.test("local publish to different topic doesn't receive") { ps =>
|
||||
val otherTopic = Topics.jobSubmitted.withTopic(Topic("other-name"))
|
||||
for {
|
||||
res <- subscribe(ps, Topics.jobSubmitted)
|
||||
(received, halt, subFiber) = res
|
||||
_ <- ps.publish1(otherTopic, JobSubmittedMsg("hello".id))
|
||||
_ <- IO.sleep(100.millis) //allow some time for receiving
|
||||
_ <- halt.set(true)
|
||||
outcome <- subFiber.join
|
||||
_ = assert(outcome.isSuccess)
|
||||
recMsg <- received.get
|
||||
_ = assert(recMsg.isEmpty)
|
||||
} yield ()
|
||||
}
|
||||
|
||||
pubsubT.test("receive messages remotely") { ps =>
|
||||
val msg = JobSubmittedMsg("hello-remote".id)
|
||||
for {
|
||||
res <- subscribe(ps, Topics.jobSubmitted)
|
||||
(received, _, subFiber) = res
|
||||
client = httpClient(ps.delegate.receiveRoute)
|
||||
_ <- client.send(Topics.jobSubmitted, msg)
|
||||
outcome <- subFiber.join
|
||||
msgRec <- received.get
|
||||
_ = assert(outcome.isSuccess)
|
||||
_ = assertEquals(msgRec.map(_.head.topic), Topics.jobSubmitted.topic.some)
|
||||
_ = assertEquals(msgRec.map(_.body), msg.some)
|
||||
} yield ()
|
||||
}
|
||||
|
||||
pubsubEnv.test("send messages remotely") { env =>
|
||||
val msg = JobSubmittedMsg("hello-remote".id)
|
||||
for {
|
||||
// Create two pubsub instances connected to the same database
|
||||
pubsubs <- conntectedPubsubs(env)
|
||||
(ps1, ps2) = pubsubs
|
||||
|
||||
// subscribe to ps1 and send via ps2
|
||||
res <- subscribe(ps1, Topics.jobSubmitted)
|
||||
(received, _, subFiber) = res
|
||||
_ <- ps2.publish1(Topics.jobSubmitted, msg)
|
||||
outcome <- subFiber.join
|
||||
msgRec <- received.get
|
||||
|
||||
// check results
|
||||
_ = assert(outcome.isSuccess)
|
||||
_ = assertEquals(msgRec.map(_.head.topic), Topics.jobSubmitted.topic.some)
|
||||
_ = assertEquals(msgRec.map(_.body), msg.some)
|
||||
} yield ()
|
||||
}
|
||||
|
||||
pubsubEnv.test("do not receive remote message from other topic") { env =>
|
||||
val msg = JobDoneMsg("job-1".id, "task-2".id)
|
||||
|
||||
for {
|
||||
// Create two pubsub instances connected to the same database
|
||||
pubsubs <- conntectedPubsubs(env)
|
||||
(ps1, ps2) = pubsubs
|
||||
|
||||
// subscribe to ps1 and send via ps2
|
||||
res <- subscribe(ps1, Topics.jobSubmitted)
|
||||
(received, halt, subFiber) = res
|
||||
_ <- ps2.publish1(Topics.jobDone, msg)
|
||||
_ <- IO.sleep(100.millis)
|
||||
_ <- halt.set(true)
|
||||
outcome <- subFiber.join
|
||||
msgRec <- received.get
|
||||
|
||||
// check results
|
||||
_ = assert(outcome.isSuccess)
|
||||
_ = assertEquals(msgRec, None)
|
||||
} yield ()
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user