Merge pull request #1280 from eikek/fix/app-ids

Remove pubsub entries when shutting down and initialize by uri
This commit is contained in:
mergify[bot]
2022-01-12 20:40:40 +00:00
committed by GitHub
2 changed files with 8 additions and 3 deletions

View File

@ -182,10 +182,12 @@ object NaivePubSub {
store: Store[F], store: Store[F],
client: Client[F] client: Client[F]
)(topics: NonEmptyList[Topic]): Resource[F, NaivePubSub[F]] = )(topics: NonEmptyList[Topic]): Resource[F, NaivePubSub[F]] =
Resource.eval(for { Resource.make(for {
state <- Ref.ofEffect[F, State[F]](State.create[F](topics)) state <- Ref.ofEffect[F, State[F]](State.create[F](topics))
_ <- store.transact(RPubSub.initTopics(cfg.nodeId, cfg.url, topics.map(_.name))) _ <- store.transact(RPubSub.initTopics(cfg.nodeId, cfg.url, topics.map(_.name)))
} yield new NaivePubSub[F](cfg, state, store, client)) } yield new NaivePubSub[F](cfg, state, store, client))(_ =>
store.transact(RPubSub.deleteTopics(cfg.nodeId)).as(())
)
def create[F[_]: Async]( def create[F[_]: Async](
cfg: PubSubConfig, cfg: PubSubConfig,

View File

@ -52,7 +52,7 @@ object RPubSub {
url: LenientUri, url: LenientUri,
topics: NonEmptyList[String] topics: NonEmptyList[String]
): ConnectionIO[Int] = ): ConnectionIO[Int] =
DML.delete(T, T.nodeId === nodeId) *> DML.delete(T, T.nodeId === nodeId || T.url === url) *>
topics.toList topics.toList
.traverse(t => .traverse(t =>
Ident Ident
@ -61,6 +61,9 @@ object RPubSub {
) )
.map(_.sum) .map(_.sum)
def deleteTopics(nodeId: Ident): ConnectionIO[Int] =
DML.delete(T, T.nodeId === nodeId)
def increment(url: LenientUri, topics: NonEmptyList[String]): ConnectionIO[Int] = def increment(url: LenientUri, topics: NonEmptyList[String]): ConnectionIO[Int] =
DML.update( DML.update(
T, T,