From 0651db9901adab3042dce68a3f222da17872647b Mon Sep 17 00:00:00 2001 From: eikek Date: Sun, 14 Nov 2021 22:33:24 +0100 Subject: [PATCH] Make publish async, replace joexclient in periodic job scheduler --- .../src/main/scala/docspell/backend/ops/OJoex.scala | 8 +++++--- .../src/main/scala/docspell/joex/JoexAppImpl.scala | 4 +--- .../docspell/joex/scheduler/PeriodicScheduler.scala | 6 +++--- .../joex/scheduler/PeriodicSchedulerImpl.scala | 8 +++----- .../src/main/scala/docspell/pubsub/api/PubSub.scala | 8 +++++--- .../src/main/scala/docspell/pubsub/api/PubSubT.scala | 12 ++++++------ .../scala/docspell/pubsub/naive/NaivePubSub.scala | 7 +++++-- .../docspell/pubsub/naive/NaivePubSubTest.scala | 2 +- 8 files changed, 29 insertions(+), 26 deletions(-) diff --git a/modules/backend/src/main/scala/docspell/backend/ops/OJoex.scala b/modules/backend/src/main/scala/docspell/backend/ops/OJoex.scala index 1ce43149..e51e8bd6 100644 --- a/modules/backend/src/main/scala/docspell/backend/ops/OJoex.scala +++ b/modules/backend/src/main/scala/docspell/backend/ops/OJoex.scala @@ -6,7 +6,9 @@ package docspell.backend.ops +import cats.Applicative import cats.effect._ +import cats.implicits._ import docspell.backend.msg.{CancelJob, Topics} import docspell.common.Ident @@ -20,13 +22,13 @@ trait OJoex[F[_]] { } object OJoex { - def apply[F[_]](pubSub: PubSubT[F]): Resource[F, OJoex[F]] = + def apply[F[_]: Applicative](pubSub: PubSubT[F]): Resource[F, OJoex[F]] = Resource.pure[F, OJoex[F]](new OJoex[F] { def notifyAllNodes: F[Unit] = - pubSub.publish1IgnoreErrors(Topics.jobsNotify, ()) + pubSub.publish1IgnoreErrors(Topics.jobsNotify, ()).as(()) def cancelJob(job: Ident, worker: Ident): F[Unit] = - pubSub.publish1IgnoreErrors(CancelJob.topic, CancelJob(job, worker)) + pubSub.publish1IgnoreErrors(CancelJob.topic, CancelJob(job, worker)).as(()) }) } diff --git a/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala b/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala index f761bfa8..41dee0ab 100644 --- a/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala +++ b/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala @@ -32,7 +32,6 @@ import docspell.joex.process.ReProcessItem import docspell.joex.scanmailbox._ import docspell.joex.scheduler._ import docspell.joex.updatecheck._ -import docspell.joexapi.client.JoexClient import docspell.pubsub.api.{PubSub, PubSubT} import docspell.store.Store import docspell.store.queue._ @@ -127,7 +126,6 @@ object JoexAppImpl { ): Resource[F, JoexApp[F]] = for { pstore <- PeriodicTaskStore.create(store) - client = JoexClient(httpClient) pubSubT = PubSubT( pubSub, Logger.log4s(org.log4s.getLogger(s"joex-${cfg.appId.id}")) @@ -271,7 +269,7 @@ object JoexAppImpl { sch, queue, pstore, - client + joex ) app = new JoexAppImpl(cfg, store, queue, pubSubT, pstore, termSignal, sch, psch) appR <- Resource.make(app.init.map(_ => app))(_.initShutdown) diff --git a/modules/joex/src/main/scala/docspell/joex/scheduler/PeriodicScheduler.scala b/modules/joex/src/main/scala/docspell/joex/scheduler/PeriodicScheduler.scala index 5366aa13..13f61705 100644 --- a/modules/joex/src/main/scala/docspell/joex/scheduler/PeriodicScheduler.scala +++ b/modules/joex/src/main/scala/docspell/joex/scheduler/PeriodicScheduler.scala @@ -10,7 +10,7 @@ import cats.effect._ import fs2._ import fs2.concurrent.SignallingRef -import docspell.joexapi.client.JoexClient +import docspell.backend.ops.OJoex import docspell.store.queue._ /** A periodic scheduler takes care to submit periodic tasks to the job queue. @@ -40,7 +40,7 @@ object PeriodicScheduler { sch: Scheduler[F], queue: JobQueue[F], store: PeriodicTaskStore[F], - client: JoexClient[F] + joex: OJoex[F] ): Resource[F, PeriodicScheduler[F]] = for { waiter <- Resource.eval(SignallingRef(true)) @@ -50,7 +50,7 @@ object PeriodicScheduler { sch, queue, store, - client, + joex, waiter, state ) diff --git a/modules/joex/src/main/scala/docspell/joex/scheduler/PeriodicSchedulerImpl.scala b/modules/joex/src/main/scala/docspell/joex/scheduler/PeriodicSchedulerImpl.scala index 2a47eaf8..b44ee6c4 100644 --- a/modules/joex/src/main/scala/docspell/joex/scheduler/PeriodicSchedulerImpl.scala +++ b/modules/joex/src/main/scala/docspell/joex/scheduler/PeriodicSchedulerImpl.scala @@ -11,10 +11,10 @@ import cats.implicits._ import fs2._ import fs2.concurrent.SignallingRef +import docspell.backend.ops.OJoex import docspell.common._ import docspell.common.syntax.all._ import docspell.joex.scheduler.PeriodicSchedulerImpl.State -import docspell.joexapi.client.JoexClient import docspell.store.queue._ import docspell.store.records.RPeriodicTask @@ -26,7 +26,7 @@ final class PeriodicSchedulerImpl[F[_]: Async]( sch: Scheduler[F], queue: JobQueue[F], store: PeriodicTaskStore[F], - client: JoexClient[F], + joex: OJoex[F], waiter: SignallingRef[F, Boolean], state: SignallingRef[F, State[F]] ) extends PeriodicScheduler[F] { @@ -119,9 +119,7 @@ final class PeriodicSchedulerImpl[F[_]: Async]( } def notifyJoex: F[Unit] = - sch.notifyChange *> store.findJoexNodes.flatMap( - _.traverse(n => client.notifyJoexIgnoreErrors(n.url)).map(_ => ()) - ) + sch.notifyChange *> joex.notifyAllNodes def scheduleNotify(pj: RPeriodicTask): F[Unit] = Timestamp diff --git a/modules/pubsub/api/src/main/scala/docspell/pubsub/api/PubSub.scala b/modules/pubsub/api/src/main/scala/docspell/pubsub/api/PubSub.scala index 6a9eb9a5..72ff8fe0 100644 --- a/modules/pubsub/api/src/main/scala/docspell/pubsub/api/PubSub.scala +++ b/modules/pubsub/api/src/main/scala/docspell/pubsub/api/PubSub.scala @@ -15,7 +15,7 @@ import docspell.common.{Ident, Timestamp} import io.circe.Json trait PubSub[F[_]] { - def publish1(topic: Topic, msg: Json): F[MessageHead] + def publish1(topic: Topic, msg: Json): F[F[MessageHead]] def publish(topic: Topic): Pipe[F, Json, MessageHead] @@ -24,8 +24,10 @@ trait PubSub[F[_]] { object PubSub { def noop[F[_]: Applicative]: PubSub[F] = new PubSub[F] { - def publish1(topic: Topic, msg: Json): F[MessageHead] = - Applicative[F].pure(MessageHead(Ident.unsafe("0"), Timestamp.Epoch, topic)) + def publish1(topic: Topic, msg: Json): F[F[MessageHead]] = + Applicative[F].pure( + Applicative[F].pure(MessageHead(Ident.unsafe("0"), Timestamp.Epoch, topic)) + ) def publish(topic: Topic): Pipe[F, Json, MessageHead] = _ => Stream.empty diff --git a/modules/pubsub/api/src/main/scala/docspell/pubsub/api/PubSubT.scala b/modules/pubsub/api/src/main/scala/docspell/pubsub/api/PubSubT.scala index d07f5e41..2e51922d 100644 --- a/modules/pubsub/api/src/main/scala/docspell/pubsub/api/PubSubT.scala +++ b/modules/pubsub/api/src/main/scala/docspell/pubsub/api/PubSubT.scala @@ -16,9 +16,9 @@ import docspell.common.Logger trait PubSubT[F[_]] { - def publish1[A](topic: TypedTopic[A], msg: A): F[MessageHead] + def publish1[A](topic: TypedTopic[A], msg: A): F[F[MessageHead]] - def publish1IgnoreErrors[A](topic: TypedTopic[A], msg: A): F[Unit] + def publish1IgnoreErrors[A](topic: TypedTopic[A], msg: A): F[F[Unit]] def publish[A](topic: TypedTopic[A]): Pipe[F, A, MessageHead] @@ -37,15 +37,15 @@ object PubSubT { def apply[F[_]: Async](pubSub: PubSub[F], logger: Logger[F]): PubSubT[F] = new PubSubT[F] { - def publish1[A](topic: TypedTopic[A], msg: A): F[MessageHead] = + def publish1[A](topic: TypedTopic[A], msg: A): F[F[MessageHead]] = pubSub.publish1(topic.topic, topic.codec(msg)) - def publish1IgnoreErrors[A](topic: TypedTopic[A], msg: A): F[Unit] = - publish1(topic, msg).attempt.flatMap { + def publish1IgnoreErrors[A](topic: TypedTopic[A], msg: A): F[F[Unit]] = + publish1(topic, msg).map(_.attempt.flatMap { case Right(_) => ().pure[F] case Left(ex) => logger.error(ex)(s"Error publishing to topic ${topic.topic.name}: $msg") - } + }) def publish[A](topic: TypedTopic[A]): Pipe[F, A, MessageHead] = _.map(topic.codec.apply).through(pubSub.publish(topic.topic)) diff --git a/modules/pubsub/naive/src/main/scala/docspell/pubsub/naive/NaivePubSub.scala b/modules/pubsub/naive/src/main/scala/docspell/pubsub/naive/NaivePubSub.scala index 811b6d58..3fb25e7c 100644 --- a/modules/pubsub/naive/src/main/scala/docspell/pubsub/naive/NaivePubSub.scala +++ b/modules/pubsub/naive/src/main/scala/docspell/pubsub/naive/NaivePubSub.scala @@ -65,7 +65,10 @@ final class NaivePubSub[F[_]: Async]( def withClient(client: Client[F]): NaivePubSub[F] = new NaivePubSub[F](cfg, state, store, client) - def publish1(topic: Topic, msgBody: Json): F[MessageHead] = + def publish1(topic: Topic, msgBody: Json): F[F[MessageHead]] = + Async[F].start(publish0(topic, msgBody)).map(fiber => fiber.joinWithNever) + + def publish0(topic: Topic, msgBody: Json): F[MessageHead] = for { head <- mkMessageHead(topic) msg = Message(head, msgBody) @@ -78,7 +81,7 @@ final class NaivePubSub[F[_]: Async]( def publish(topic: Topic): Pipe[F, Json, MessageHead] = ms => //TODO Do some optimization by grouping messages to the same topic - ms.evalMap(publish1(topic, _)) + ms.evalMap(publish0(topic, _)) def subscribe(topics: NonEmptyList[Topic]): Stream[F, Message[Json]] = (for { diff --git a/modules/pubsub/naive/src/test/scala/docspell/pubsub/naive/NaivePubSubTest.scala b/modules/pubsub/naive/src/test/scala/docspell/pubsub/naive/NaivePubSubTest.scala index 8d3094c2..dfb12e1d 100644 --- a/modules/pubsub/naive/src/test/scala/docspell/pubsub/naive/NaivePubSubTest.scala +++ b/modules/pubsub/naive/src/test/scala/docspell/pubsub/naive/NaivePubSubTest.scala @@ -44,7 +44,7 @@ class NaivePubSubTest extends CatsEffectSuite with Fixtures { for { res <- subscribe(ps, Topics.jobSubmitted) (received, _, subFiber) = res - headSend <- ps.publish1(Topics.jobSubmitted, JobSubmittedMsg("hello".id)) + headSend <- ps.publish1(Topics.jobSubmitted, JobSubmittedMsg("hello".id)).flatten outcome <- subFiber.join msgRec <- received.get _ = assert(outcome.isSuccess)