mirror of
https://github.com/TheAnachronism/docspell.git
synced 2025-06-21 18:08:25 +00:00
Make publish async, replace joexclient in periodic job scheduler
This commit is contained in:
@ -15,7 +15,7 @@ import docspell.common.{Ident, Timestamp}
|
||||
import io.circe.Json
|
||||
|
||||
trait PubSub[F[_]] {
|
||||
def publish1(topic: Topic, msg: Json): F[MessageHead]
|
||||
def publish1(topic: Topic, msg: Json): F[F[MessageHead]]
|
||||
|
||||
def publish(topic: Topic): Pipe[F, Json, MessageHead]
|
||||
|
||||
@ -24,8 +24,10 @@ trait PubSub[F[_]] {
|
||||
object PubSub {
|
||||
def noop[F[_]: Applicative]: PubSub[F] =
|
||||
new PubSub[F] {
|
||||
def publish1(topic: Topic, msg: Json): F[MessageHead] =
|
||||
Applicative[F].pure(MessageHead(Ident.unsafe("0"), Timestamp.Epoch, topic))
|
||||
def publish1(topic: Topic, msg: Json): F[F[MessageHead]] =
|
||||
Applicative[F].pure(
|
||||
Applicative[F].pure(MessageHead(Ident.unsafe("0"), Timestamp.Epoch, topic))
|
||||
)
|
||||
|
||||
def publish(topic: Topic): Pipe[F, Json, MessageHead] =
|
||||
_ => Stream.empty
|
||||
|
@ -16,9 +16,9 @@ import docspell.common.Logger
|
||||
|
||||
trait PubSubT[F[_]] {
|
||||
|
||||
def publish1[A](topic: TypedTopic[A], msg: A): F[MessageHead]
|
||||
def publish1[A](topic: TypedTopic[A], msg: A): F[F[MessageHead]]
|
||||
|
||||
def publish1IgnoreErrors[A](topic: TypedTopic[A], msg: A): F[Unit]
|
||||
def publish1IgnoreErrors[A](topic: TypedTopic[A], msg: A): F[F[Unit]]
|
||||
|
||||
def publish[A](topic: TypedTopic[A]): Pipe[F, A, MessageHead]
|
||||
|
||||
@ -37,15 +37,15 @@ object PubSubT {
|
||||
|
||||
def apply[F[_]: Async](pubSub: PubSub[F], logger: Logger[F]): PubSubT[F] =
|
||||
new PubSubT[F] {
|
||||
def publish1[A](topic: TypedTopic[A], msg: A): F[MessageHead] =
|
||||
def publish1[A](topic: TypedTopic[A], msg: A): F[F[MessageHead]] =
|
||||
pubSub.publish1(topic.topic, topic.codec(msg))
|
||||
|
||||
def publish1IgnoreErrors[A](topic: TypedTopic[A], msg: A): F[Unit] =
|
||||
publish1(topic, msg).attempt.flatMap {
|
||||
def publish1IgnoreErrors[A](topic: TypedTopic[A], msg: A): F[F[Unit]] =
|
||||
publish1(topic, msg).map(_.attempt.flatMap {
|
||||
case Right(_) => ().pure[F]
|
||||
case Left(ex) =>
|
||||
logger.error(ex)(s"Error publishing to topic ${topic.topic.name}: $msg")
|
||||
}
|
||||
})
|
||||
|
||||
def publish[A](topic: TypedTopic[A]): Pipe[F, A, MessageHead] =
|
||||
_.map(topic.codec.apply).through(pubSub.publish(topic.topic))
|
||||
|
@ -65,7 +65,10 @@ final class NaivePubSub[F[_]: Async](
|
||||
def withClient(client: Client[F]): NaivePubSub[F] =
|
||||
new NaivePubSub[F](cfg, state, store, client)
|
||||
|
||||
def publish1(topic: Topic, msgBody: Json): F[MessageHead] =
|
||||
def publish1(topic: Topic, msgBody: Json): F[F[MessageHead]] =
|
||||
Async[F].start(publish0(topic, msgBody)).map(fiber => fiber.joinWithNever)
|
||||
|
||||
def publish0(topic: Topic, msgBody: Json): F[MessageHead] =
|
||||
for {
|
||||
head <- mkMessageHead(topic)
|
||||
msg = Message(head, msgBody)
|
||||
@ -78,7 +81,7 @@ final class NaivePubSub[F[_]: Async](
|
||||
|
||||
def publish(topic: Topic): Pipe[F, Json, MessageHead] =
|
||||
ms => //TODO Do some optimization by grouping messages to the same topic
|
||||
ms.evalMap(publish1(topic, _))
|
||||
ms.evalMap(publish0(topic, _))
|
||||
|
||||
def subscribe(topics: NonEmptyList[Topic]): Stream[F, Message[Json]] =
|
||||
(for {
|
||||
|
@ -44,7 +44,7 @@ class NaivePubSubTest extends CatsEffectSuite with Fixtures {
|
||||
for {
|
||||
res <- subscribe(ps, Topics.jobSubmitted)
|
||||
(received, _, subFiber) = res
|
||||
headSend <- ps.publish1(Topics.jobSubmitted, JobSubmittedMsg("hello".id))
|
||||
headSend <- ps.publish1(Topics.jobSubmitted, JobSubmittedMsg("hello".id)).flatten
|
||||
outcome <- subFiber.join
|
||||
msgRec <- received.get
|
||||
_ = assert(outcome.isSuccess)
|
||||
|
Reference in New Issue
Block a user