diff --git a/modules/pubsub/naive/src/main/scala/docspell/pubsub/naive/NaivePubSub.scala b/modules/pubsub/naive/src/main/scala/docspell/pubsub/naive/NaivePubSub.scala index 519cb12c..00385ad0 100644 --- a/modules/pubsub/naive/src/main/scala/docspell/pubsub/naive/NaivePubSub.scala +++ b/modules/pubsub/naive/src/main/scala/docspell/pubsub/naive/NaivePubSub.scala @@ -182,10 +182,12 @@ object NaivePubSub { store: Store[F], client: Client[F] )(topics: NonEmptyList[Topic]): Resource[F, NaivePubSub[F]] = - Resource.eval(for { + Resource.make(for { state <- Ref.ofEffect[F, State[F]](State.create[F](topics)) _ <- store.transact(RPubSub.initTopics(cfg.nodeId, cfg.url, topics.map(_.name))) - } yield new NaivePubSub[F](cfg, state, store, client)) + } yield new NaivePubSub[F](cfg, state, store, client))(_ => + store.transact(RPubSub.deleteTopics(cfg.nodeId)).as(()) + ) def create[F[_]: Async]( cfg: PubSubConfig, diff --git a/modules/store/src/main/scala/docspell/store/records/RPubSub.scala b/modules/store/src/main/scala/docspell/store/records/RPubSub.scala index 6509aaea..022cd11d 100644 --- a/modules/store/src/main/scala/docspell/store/records/RPubSub.scala +++ b/modules/store/src/main/scala/docspell/store/records/RPubSub.scala @@ -52,7 +52,7 @@ object RPubSub { url: LenientUri, topics: NonEmptyList[String] ): ConnectionIO[Int] = - DML.delete(T, T.nodeId === nodeId) *> + DML.delete(T, T.nodeId === nodeId || T.url === url) *> topics.toList .traverse(t => Ident @@ -61,6 +61,9 @@ object RPubSub { ) .map(_.sum) + def deleteTopics(nodeId: Ident): ConnectionIO[Int] = + DML.delete(T, T.nodeId === nodeId) + def increment(url: LenientUri, topics: NonEmptyList[String]): ConnectionIO[Int] = DML.update( T,