Remove pubsub entries when shutting down and initialize by uri

This commit is contained in:
eikek
2022-01-12 21:17:49 +01:00
parent 02790ba8e7
commit 42fb87a432
2 changed files with 8 additions and 3 deletions

View File

@ -182,10 +182,12 @@ object NaivePubSub {
store: Store[F],
client: Client[F]
)(topics: NonEmptyList[Topic]): Resource[F, NaivePubSub[F]] =
Resource.eval(for {
Resource.make(for {
state <- Ref.ofEffect[F, State[F]](State.create[F](topics))
_ <- store.transact(RPubSub.initTopics(cfg.nodeId, cfg.url, topics.map(_.name)))
} yield new NaivePubSub[F](cfg, state, store, client))
} yield new NaivePubSub[F](cfg, state, store, client))(_ =>
store.transact(RPubSub.deleteTopics(cfg.nodeId)).as(())
)
def create[F[_]: Async](
cfg: PubSubConfig,