From 4d5c695882c4b19bb9c51384252d7255a21939fb Mon Sep 17 00:00:00 2001 From: eikek Date: Fri, 5 Nov 2021 20:56:02 +0100 Subject: [PATCH] Hide implementation details from PubSubT interface --- .../scala/docspell/pubsub/api/PubSubT.scala | 14 ++-- .../scala/docspell/pubsub/api/Topics.scala | 18 ++++- .../docspell/pubsub/naive/NaivePubSub.scala | 8 +-- .../docspell/pubsub/naive/Fixtures.scala | 12 ++-- .../docspell/pubsub/naive/HttpClientOps.scala | 8 ++- .../pubsub/naive/NaivePubSubTest.scala | 66 +++++++++---------- 6 files changed, 72 insertions(+), 54 deletions(-) diff --git a/modules/pubsub/api/src/main/scala/docspell/pubsub/api/PubSubT.scala b/modules/pubsub/api/src/main/scala/docspell/pubsub/api/PubSubT.scala index 176e8885..9f28b798 100644 --- a/modules/pubsub/api/src/main/scala/docspell/pubsub/api/PubSubT.scala +++ b/modules/pubsub/api/src/main/scala/docspell/pubsub/api/PubSubT.scala @@ -11,7 +11,7 @@ import fs2.{Pipe, Stream} import docspell.common.Logger -trait PubSubT[F[_], P <: PubSub[F]] { +trait PubSubT[F[_]] { def publish1[A](topic: TypedTopic[A], msg: A): F[MessageHead] @@ -19,15 +19,15 @@ trait PubSubT[F[_], P <: PubSub[F]] { def subscribe[A](topic: TypedTopic[A]): Stream[F, Message[A]] - def delegate: P + def delegate: PubSub[F] - def withDelegate[R <: PubSub[F]](delegate: R): PubSubT[F, R] + def withDelegate(delegate: PubSub[F]): PubSubT[F] } object PubSubT { - def apply[F[_], P <: PubSub[F]](pubSub: P, logger: Logger[F]): PubSubT[F, P] = - new PubSubT[F, P] { + def apply[F[_]](pubSub: PubSub[F], logger: Logger[F]): PubSubT[F] = + new PubSubT[F] { def publish1[A](topic: TypedTopic[A], msg: A): F[MessageHead] = pubSub.publish1(topic.topic, topic.codec(msg)) @@ -49,9 +49,9 @@ object PubSubT { } ) - def delegate: P = pubSub + def delegate: PubSub[F] = pubSub - def withDelegate[R <: PubSub[F]](newDelegate: R): PubSubT[F, R] = + def withDelegate(newDelegate: PubSub[F]): PubSubT[F] = PubSubT(newDelegate, logger) } } diff --git a/modules/pubsub/api/src/main/scala/docspell/pubsub/api/Topics.scala b/modules/pubsub/api/src/main/scala/docspell/pubsub/api/Topics.scala index 78b22408..a536607a 100644 --- a/modules/pubsub/api/src/main/scala/docspell/pubsub/api/Topics.scala +++ b/modules/pubsub/api/src/main/scala/docspell/pubsub/api/Topics.scala @@ -25,9 +25,13 @@ object Topics { val jobSubmitted: TypedTopic[JobSubmittedMsg] = TypedTopic[JobSubmittedMsg](Topic("job-submitted")) - val all: NonEmptyList[TypedTopic[_]] = NonEmptyList.of(jobDone, jobSubmitted) + /** Notify a node to cancel a job with the given id */ + val cancelJob: TypedTopic[CancelJobMsg] = + TypedTopic[CancelJobMsg](Topic("cancel-job")) - final case class JobSubmittedMsg(id: Ident) + val all: NonEmptyList[TypedTopic[_]] = NonEmptyList.of(jobDone, jobSubmitted, cancelJob) + + final case class JobSubmittedMsg(task: Ident) object JobSubmittedMsg { implicit val jsonDecoder: Decoder[JobSubmittedMsg] = deriveDecoder[JobSubmittedMsg] @@ -44,4 +48,14 @@ object Topics { implicit val jsonEncoder: Encoder[JobDoneMsg] = deriveEncoder[JobDoneMsg] } + + final case class CancelJobMsg(jobId: Ident, nodeId: Ident) + object CancelJobMsg { + implicit val jsonDecoder: Decoder[CancelJobMsg] = + deriveDecoder[CancelJobMsg] + + implicit val jsonEncoder: Encoder[CancelJobMsg] = + deriveEncoder[CancelJobMsg] + + } } diff --git a/modules/pubsub/naive/src/main/scala/docspell/pubsub/naive/NaivePubSub.scala b/modules/pubsub/naive/src/main/scala/docspell/pubsub/naive/NaivePubSub.scala index 55bdc786..fb7bb1cb 100644 --- a/modules/pubsub/naive/src/main/scala/docspell/pubsub/naive/NaivePubSub.scala +++ b/modules/pubsub/naive/src/main/scala/docspell/pubsub/naive/NaivePubSub.scala @@ -185,18 +185,18 @@ object NaivePubSub { cfg: PubSubConfig, store: Store[F], client: Client[F] - )(topics: NonEmptyList[Topic]): F[NaivePubSub[F]] = - for { + )(topics: NonEmptyList[Topic]): Resource[F, NaivePubSub[F]] = + Resource.eval(for { state <- Ref.ofEffect[F, State[F]](State.create[F](topics)) _ <- store.transact(RPubSub.initTopics(cfg.nodeId, cfg.url, topics.map(_.name))) - } yield new NaivePubSub[F](cfg, state, store, client) + } yield new NaivePubSub[F](cfg, state, store, client)) def create[F[_]: Async]( cfg: PubSubConfig, store: Store[F], client: Client[F], logger: Logger[F] - )(topics: NonEmptyList[Topic]): F[PubSubT[F, NaivePubSub[F]]] = + )(topics: NonEmptyList[Topic]): Resource[F, PubSubT[F]] = apply[F](cfg, store, client)(topics).map(ps => PubSubT(ps, logger)) final case class State[F[_]](topics: Map[String, Fs2Topic[F, Message[Json]]]) {} diff --git a/modules/pubsub/naive/src/test/scala/docspell/pubsub/naive/Fixtures.scala b/modules/pubsub/naive/src/test/scala/docspell/pubsub/naive/Fixtures.scala index cd2a386a..99e863ce 100644 --- a/modules/pubsub/naive/src/test/scala/docspell/pubsub/naive/Fixtures.scala +++ b/modules/pubsub/naive/src/test/scala/docspell/pubsub/naive/Fixtures.scala @@ -23,7 +23,7 @@ trait Fixtures extends HttpClientOps { self: CatsEffectSuite => val pubsubT = ResourceFixture { Fixtures .envResource("node-1") - .flatMap(e => Resource.eval(e.pubSub)) + .flatMap(_.pubSub) .map(ps => PubSubT(ps, Fixtures.loggerIO)) } @@ -34,8 +34,8 @@ trait Fixtures extends HttpClientOps { self: CatsEffectSuite => ps_2 <- env.withNodeId("node-2").pubSubT // both instances have a dummy client. now connect their clients to each other - ps1 = ps_1.withDelegate(ps_1.delegate.withClient(httpClient(ps_2))) - ps2 = ps_2.withDelegate(ps_2.delegate.withClient(httpClient(ps_1))) + ps1 = ps_1.withDelegate(ps_1.delegateT.withClient(httpClient(ps_2))) + ps2 = ps_2.withDelegate(ps_2.delegateT.withClient(httpClient(ps_1))) } yield (ps1, ps2) implicit final class StringId(s: String) { @@ -47,11 +47,11 @@ object Fixtures { private val loggerIO: Logger[IO] = Logger.log4s(org.log4s.getLogger) final case class Env(store: Store[IO], cfg: PubSubConfig) { - def pubSub: IO[NaivePubSub[IO]] = { + def pubSub: Resource[IO, NaivePubSub[IO]] = { val dummyClient = Client[IO](_ => Resource.pure(Response.notFound[IO])) NaivePubSub(cfg, store, dummyClient)(Topics.all.map(_.topic)) } - def pubSubT: IO[PubSubT[IO, NaivePubSub[IO]]] = + def pubSubT: Resource[IO, PubSubT[IO]] = pubSub.map(PubSubT(_, loggerIO)) def withNodeId(nodeId: String): Env = @@ -66,7 +66,7 @@ object Fixtures { def testConfig(nodeId: String) = PubSubConfig( Ident.unsafe(nodeId), - LenientUri.unsafe(s"http://${nodeId}/"), + LenientUri.unsafe(s"http://$nodeId/"), 0 ) diff --git a/modules/pubsub/naive/src/test/scala/docspell/pubsub/naive/HttpClientOps.scala b/modules/pubsub/naive/src/test/scala/docspell/pubsub/naive/HttpClientOps.scala index cafdf0c2..3659f88e 100644 --- a/modules/pubsub/naive/src/test/scala/docspell/pubsub/naive/HttpClientOps.scala +++ b/modules/pubsub/naive/src/test/scala/docspell/pubsub/naive/HttpClientOps.scala @@ -25,8 +25,8 @@ trait HttpClientOps { def httpClient(ps: NaivePubSub[IO]): Client[IO] = httpClient(ps.receiveRoute) - def httpClient(ps: PubSubT[IO, NaivePubSub[IO]]): Client[IO] = - httpClient(ps.delegate) + def httpClient(ps: PubSubT[IO]): Client[IO] = + httpClient(ps.delegateT) implicit final class ClientOps(client: Client[IO]) { val uri = Uri.unsafeFromString("http://localhost/") @@ -48,6 +48,10 @@ trait HttpClientOps { def send[A](typedTopic: TypedTopic[A], body: A): IO[Unit] = sendMessage(typedTopic.topic, body)(typedTopic.codec) } + + implicit final class PubSubTestOps(ps: PubSubT[IO]) { + def delegateT: NaivePubSub[IO] = ps.delegate.asInstanceOf[NaivePubSub[IO]] + } } object HttpClientOps { diff --git a/modules/pubsub/naive/src/test/scala/docspell/pubsub/naive/NaivePubSubTest.scala b/modules/pubsub/naive/src/test/scala/docspell/pubsub/naive/NaivePubSubTest.scala index c244e1da..ece6f443 100644 --- a/modules/pubsub/naive/src/test/scala/docspell/pubsub/naive/NaivePubSubTest.scala +++ b/modules/pubsub/naive/src/test/scala/docspell/pubsub/naive/NaivePubSubTest.scala @@ -21,7 +21,7 @@ import munit.CatsEffectSuite class NaivePubSubTest extends CatsEffectSuite with Fixtures { private[this] val logger = Logger.log4s[IO](org.log4s.getLogger) - def subscribe[A](ps: PubSubT[IO, NaivePubSub[IO]], topic: TypedTopic[A]) = + def subscribe[A](ps: PubSubT[IO], topic: TypedTopic[A]) = for { received <- Ref.of[IO, Option[Message[A]]](None) halt <- SignallingRef.of[IO, Boolean](false) @@ -72,7 +72,7 @@ class NaivePubSubTest extends CatsEffectSuite with Fixtures { for { res <- subscribe(ps, Topics.jobSubmitted) (received, _, subFiber) = res - client = httpClient(ps.delegate.receiveRoute) + client = httpClient(ps.delegateT.receiveRoute) _ <- client.send(Topics.jobSubmitted, msg) outcome <- subFiber.join msgRec <- received.get @@ -84,45 +84,45 @@ class NaivePubSubTest extends CatsEffectSuite with Fixtures { pubsubEnv.test("send messages remotely") { env => val msg = JobSubmittedMsg("hello-remote".id) - for { - // Create two pubsub instances connected to the same database - pubsubs <- conntectedPubsubs(env) - (ps1, ps2) = pubsubs - // subscribe to ps1 and send via ps2 - res <- subscribe(ps1, Topics.jobSubmitted) - (received, _, subFiber) = res - _ <- ps2.publish1(Topics.jobSubmitted, msg) - outcome <- subFiber.join - msgRec <- received.get + // Create two pubsub instances connected to the same database + conntectedPubsubs(env).use { case (ps1, ps2) => + for { + // subscribe to ps1 and send via ps2 + res <- subscribe(ps1, Topics.jobSubmitted) + (received, _, subFiber) = res + _ <- ps2.publish1(Topics.jobSubmitted, msg) + outcome <- subFiber.join + msgRec <- received.get - // check results - _ = assert(outcome.isSuccess) - _ = assertEquals(msgRec.map(_.head.topic), Topics.jobSubmitted.topic.some) - _ = assertEquals(msgRec.map(_.body), msg.some) - } yield () + // check results + _ = assert(outcome.isSuccess) + _ = assertEquals(msgRec.map(_.head.topic), Topics.jobSubmitted.topic.some) + _ = assertEquals(msgRec.map(_.body), msg.some) + } yield () + } } pubsubEnv.test("do not receive remote message from other topic") { env => val msg = JobDoneMsg("job-1".id, "task-2".id) - for { - // Create two pubsub instances connected to the same database - pubsubs <- conntectedPubsubs(env) - (ps1, ps2) = pubsubs + // Create two pubsub instances connected to the same database + conntectedPubsubs(env).use { case (ps1, ps2) => + for { + // subscribe to ps1 and send via ps2 + res <- subscribe(ps1, Topics.jobSubmitted) + (received, halt, subFiber) = res + _ <- ps2.publish1(Topics.jobDone, msg) + _ <- IO.sleep(100.millis) + _ <- halt.set(true) + outcome <- subFiber.join + msgRec <- received.get - // subscribe to ps1 and send via ps2 - res <- subscribe(ps1, Topics.jobSubmitted) - (received, halt, subFiber) = res - _ <- ps2.publish1(Topics.jobDone, msg) - _ <- IO.sleep(100.millis) - _ <- halt.set(true) - outcome <- subFiber.join - msgRec <- received.get + // check results + _ = assert(outcome.isSuccess) + _ = assertEquals(msgRec, None) + } yield () + } - // check results - _ = assert(outcome.isSuccess) - _ = assertEquals(msgRec, None) - } yield () } }