From 42fb87a432629502f0fa479fcf7049e319f2ea66 Mon Sep 17 00:00:00 2001
From: eikek <eike.kettner@posteo.de>
Date: Wed, 12 Jan 2022 21:17:49 +0100
Subject: [PATCH] Remove pubsub entries when shutting down and initialize by
 uri

---
 .../src/main/scala/docspell/pubsub/naive/NaivePubSub.scala  | 6 ++++--
 .../src/main/scala/docspell/store/records/RPubSub.scala     | 5 ++++-
 2 files changed, 8 insertions(+), 3 deletions(-)

diff --git a/modules/pubsub/naive/src/main/scala/docspell/pubsub/naive/NaivePubSub.scala b/modules/pubsub/naive/src/main/scala/docspell/pubsub/naive/NaivePubSub.scala
index 519cb12c..00385ad0 100644
--- a/modules/pubsub/naive/src/main/scala/docspell/pubsub/naive/NaivePubSub.scala
+++ b/modules/pubsub/naive/src/main/scala/docspell/pubsub/naive/NaivePubSub.scala
@@ -182,10 +182,12 @@ object NaivePubSub {
       store: Store[F],
       client: Client[F]
   )(topics: NonEmptyList[Topic]): Resource[F, NaivePubSub[F]] =
-    Resource.eval(for {
+    Resource.make(for {
       state <- Ref.ofEffect[F, State[F]](State.create[F](topics))
       _ <- store.transact(RPubSub.initTopics(cfg.nodeId, cfg.url, topics.map(_.name)))
-    } yield new NaivePubSub[F](cfg, state, store, client))
+    } yield new NaivePubSub[F](cfg, state, store, client))(_ =>
+      store.transact(RPubSub.deleteTopics(cfg.nodeId)).as(())
+    )
 
   def create[F[_]: Async](
       cfg: PubSubConfig,
diff --git a/modules/store/src/main/scala/docspell/store/records/RPubSub.scala b/modules/store/src/main/scala/docspell/store/records/RPubSub.scala
index 6509aaea..022cd11d 100644
--- a/modules/store/src/main/scala/docspell/store/records/RPubSub.scala
+++ b/modules/store/src/main/scala/docspell/store/records/RPubSub.scala
@@ -52,7 +52,7 @@ object RPubSub {
       url: LenientUri,
       topics: NonEmptyList[String]
   ): ConnectionIO[Int] =
-    DML.delete(T, T.nodeId === nodeId) *>
+    DML.delete(T, T.nodeId === nodeId || T.url === url) *>
       topics.toList
         .traverse(t =>
           Ident
@@ -61,6 +61,9 @@ object RPubSub {
         )
         .map(_.sum)
 
+  def deleteTopics(nodeId: Ident): ConnectionIO[Int] =
+    DML.delete(T, T.nodeId === nodeId)
+
   def increment(url: LenientUri, topics: NonEmptyList[String]): ConnectionIO[Int] =
     DML.update(
       T,