Hide implementation details from PubSubT interface

This commit is contained in:
eikek 2021-11-05 20:56:02 +01:00
parent d483d9f176
commit 4d5c695882
6 changed files with 72 additions and 54 deletions
modules/pubsub
api/src/main/scala/docspell/pubsub/api
naive/src
main/scala/docspell/pubsub/naive
test/scala/docspell/pubsub/naive

@ -11,7 +11,7 @@ import fs2.{Pipe, Stream}
import docspell.common.Logger
trait PubSubT[F[_], P <: PubSub[F]] {
trait PubSubT[F[_]] {
def publish1[A](topic: TypedTopic[A], msg: A): F[MessageHead]
@ -19,15 +19,15 @@ trait PubSubT[F[_], P <: PubSub[F]] {
def subscribe[A](topic: TypedTopic[A]): Stream[F, Message[A]]
def delegate: P
def delegate: PubSub[F]
def withDelegate[R <: PubSub[F]](delegate: R): PubSubT[F, R]
def withDelegate(delegate: PubSub[F]): PubSubT[F]
}
object PubSubT {
def apply[F[_], P <: PubSub[F]](pubSub: P, logger: Logger[F]): PubSubT[F, P] =
new PubSubT[F, P] {
def apply[F[_]](pubSub: PubSub[F], logger: Logger[F]): PubSubT[F] =
new PubSubT[F] {
def publish1[A](topic: TypedTopic[A], msg: A): F[MessageHead] =
pubSub.publish1(topic.topic, topic.codec(msg))
@ -49,9 +49,9 @@ object PubSubT {
}
)
def delegate: P = pubSub
def delegate: PubSub[F] = pubSub
def withDelegate[R <: PubSub[F]](newDelegate: R): PubSubT[F, R] =
def withDelegate(newDelegate: PubSub[F]): PubSubT[F] =
PubSubT(newDelegate, logger)
}
}

@ -25,9 +25,13 @@ object Topics {
val jobSubmitted: TypedTopic[JobSubmittedMsg] =
TypedTopic[JobSubmittedMsg](Topic("job-submitted"))
val all: NonEmptyList[TypedTopic[_]] = NonEmptyList.of(jobDone, jobSubmitted)
/** Notify a node to cancel a job with the given id */
val cancelJob: TypedTopic[CancelJobMsg] =
TypedTopic[CancelJobMsg](Topic("cancel-job"))
final case class JobSubmittedMsg(id: Ident)
val all: NonEmptyList[TypedTopic[_]] = NonEmptyList.of(jobDone, jobSubmitted, cancelJob)
final case class JobSubmittedMsg(task: Ident)
object JobSubmittedMsg {
implicit val jsonDecoder: Decoder[JobSubmittedMsg] =
deriveDecoder[JobSubmittedMsg]
@ -44,4 +48,14 @@ object Topics {
implicit val jsonEncoder: Encoder[JobDoneMsg] =
deriveEncoder[JobDoneMsg]
}
final case class CancelJobMsg(jobId: Ident, nodeId: Ident)
object CancelJobMsg {
implicit val jsonDecoder: Decoder[CancelJobMsg] =
deriveDecoder[CancelJobMsg]
implicit val jsonEncoder: Encoder[CancelJobMsg] =
deriveEncoder[CancelJobMsg]
}
}

@ -185,18 +185,18 @@ object NaivePubSub {
cfg: PubSubConfig,
store: Store[F],
client: Client[F]
)(topics: NonEmptyList[Topic]): F[NaivePubSub[F]] =
for {
)(topics: NonEmptyList[Topic]): Resource[F, NaivePubSub[F]] =
Resource.eval(for {
state <- Ref.ofEffect[F, State[F]](State.create[F](topics))
_ <- store.transact(RPubSub.initTopics(cfg.nodeId, cfg.url, topics.map(_.name)))
} yield new NaivePubSub[F](cfg, state, store, client)
} yield new NaivePubSub[F](cfg, state, store, client))
def create[F[_]: Async](
cfg: PubSubConfig,
store: Store[F],
client: Client[F],
logger: Logger[F]
)(topics: NonEmptyList[Topic]): F[PubSubT[F, NaivePubSub[F]]] =
)(topics: NonEmptyList[Topic]): Resource[F, PubSubT[F]] =
apply[F](cfg, store, client)(topics).map(ps => PubSubT(ps, logger))
final case class State[F[_]](topics: Map[String, Fs2Topic[F, Message[Json]]]) {}

@ -23,7 +23,7 @@ trait Fixtures extends HttpClientOps { self: CatsEffectSuite =>
val pubsubT = ResourceFixture {
Fixtures
.envResource("node-1")
.flatMap(e => Resource.eval(e.pubSub))
.flatMap(_.pubSub)
.map(ps => PubSubT(ps, Fixtures.loggerIO))
}
@ -34,8 +34,8 @@ trait Fixtures extends HttpClientOps { self: CatsEffectSuite =>
ps_2 <- env.withNodeId("node-2").pubSubT
// both instances have a dummy client. now connect their clients to each other
ps1 = ps_1.withDelegate(ps_1.delegate.withClient(httpClient(ps_2)))
ps2 = ps_2.withDelegate(ps_2.delegate.withClient(httpClient(ps_1)))
ps1 = ps_1.withDelegate(ps_1.delegateT.withClient(httpClient(ps_2)))
ps2 = ps_2.withDelegate(ps_2.delegateT.withClient(httpClient(ps_1)))
} yield (ps1, ps2)
implicit final class StringId(s: String) {
@ -47,11 +47,11 @@ object Fixtures {
private val loggerIO: Logger[IO] = Logger.log4s(org.log4s.getLogger)
final case class Env(store: Store[IO], cfg: PubSubConfig) {
def pubSub: IO[NaivePubSub[IO]] = {
def pubSub: Resource[IO, NaivePubSub[IO]] = {
val dummyClient = Client[IO](_ => Resource.pure(Response.notFound[IO]))
NaivePubSub(cfg, store, dummyClient)(Topics.all.map(_.topic))
}
def pubSubT: IO[PubSubT[IO, NaivePubSub[IO]]] =
def pubSubT: Resource[IO, PubSubT[IO]] =
pubSub.map(PubSubT(_, loggerIO))
def withNodeId(nodeId: String): Env =
@ -66,7 +66,7 @@ object Fixtures {
def testConfig(nodeId: String) =
PubSubConfig(
Ident.unsafe(nodeId),
LenientUri.unsafe(s"http://${nodeId}/"),
LenientUri.unsafe(s"http://$nodeId/"),
0
)

@ -25,8 +25,8 @@ trait HttpClientOps {
def httpClient(ps: NaivePubSub[IO]): Client[IO] =
httpClient(ps.receiveRoute)
def httpClient(ps: PubSubT[IO, NaivePubSub[IO]]): Client[IO] =
httpClient(ps.delegate)
def httpClient(ps: PubSubT[IO]): Client[IO] =
httpClient(ps.delegateT)
implicit final class ClientOps(client: Client[IO]) {
val uri = Uri.unsafeFromString("http://localhost/")
@ -48,6 +48,10 @@ trait HttpClientOps {
def send[A](typedTopic: TypedTopic[A], body: A): IO[Unit] =
sendMessage(typedTopic.topic, body)(typedTopic.codec)
}
implicit final class PubSubTestOps(ps: PubSubT[IO]) {
def delegateT: NaivePubSub[IO] = ps.delegate.asInstanceOf[NaivePubSub[IO]]
}
}
object HttpClientOps {

@ -21,7 +21,7 @@ import munit.CatsEffectSuite
class NaivePubSubTest extends CatsEffectSuite with Fixtures {
private[this] val logger = Logger.log4s[IO](org.log4s.getLogger)
def subscribe[A](ps: PubSubT[IO, NaivePubSub[IO]], topic: TypedTopic[A]) =
def subscribe[A](ps: PubSubT[IO], topic: TypedTopic[A]) =
for {
received <- Ref.of[IO, Option[Message[A]]](None)
halt <- SignallingRef.of[IO, Boolean](false)
@ -72,7 +72,7 @@ class NaivePubSubTest extends CatsEffectSuite with Fixtures {
for {
res <- subscribe(ps, Topics.jobSubmitted)
(received, _, subFiber) = res
client = httpClient(ps.delegate.receiveRoute)
client = httpClient(ps.delegateT.receiveRoute)
_ <- client.send(Topics.jobSubmitted, msg)
outcome <- subFiber.join
msgRec <- received.get
@ -84,45 +84,45 @@ class NaivePubSubTest extends CatsEffectSuite with Fixtures {
pubsubEnv.test("send messages remotely") { env =>
val msg = JobSubmittedMsg("hello-remote".id)
for {
// Create two pubsub instances connected to the same database
pubsubs <- conntectedPubsubs(env)
(ps1, ps2) = pubsubs
// subscribe to ps1 and send via ps2
res <- subscribe(ps1, Topics.jobSubmitted)
(received, _, subFiber) = res
_ <- ps2.publish1(Topics.jobSubmitted, msg)
outcome <- subFiber.join
msgRec <- received.get
// Create two pubsub instances connected to the same database
conntectedPubsubs(env).use { case (ps1, ps2) =>
for {
// subscribe to ps1 and send via ps2
res <- subscribe(ps1, Topics.jobSubmitted)
(received, _, subFiber) = res
_ <- ps2.publish1(Topics.jobSubmitted, msg)
outcome <- subFiber.join
msgRec <- received.get
// check results
_ = assert(outcome.isSuccess)
_ = assertEquals(msgRec.map(_.head.topic), Topics.jobSubmitted.topic.some)
_ = assertEquals(msgRec.map(_.body), msg.some)
} yield ()
// check results
_ = assert(outcome.isSuccess)
_ = assertEquals(msgRec.map(_.head.topic), Topics.jobSubmitted.topic.some)
_ = assertEquals(msgRec.map(_.body), msg.some)
} yield ()
}
}
pubsubEnv.test("do not receive remote message from other topic") { env =>
val msg = JobDoneMsg("job-1".id, "task-2".id)
for {
// Create two pubsub instances connected to the same database
pubsubs <- conntectedPubsubs(env)
(ps1, ps2) = pubsubs
// Create two pubsub instances connected to the same database
conntectedPubsubs(env).use { case (ps1, ps2) =>
for {
// subscribe to ps1 and send via ps2
res <- subscribe(ps1, Topics.jobSubmitted)
(received, halt, subFiber) = res
_ <- ps2.publish1(Topics.jobDone, msg)
_ <- IO.sleep(100.millis)
_ <- halt.set(true)
outcome <- subFiber.join
msgRec <- received.get
// subscribe to ps1 and send via ps2
res <- subscribe(ps1, Topics.jobSubmitted)
(received, halt, subFiber) = res
_ <- ps2.publish1(Topics.jobDone, msg)
_ <- IO.sleep(100.millis)
_ <- halt.set(true)
outcome <- subFiber.join
msgRec <- received.get
// check results
_ = assert(outcome.isSuccess)
_ = assertEquals(msgRec, None)
} yield ()
}
// check results
_ = assert(outcome.isSuccess)
_ = assertEquals(msgRec, None)
} yield ()
}
}