From 14a413e78769526d152b463019dc9766dc4091f1 Mon Sep 17 00:00:00 2001 From: eikek Date: Sun, 30 Jan 2022 17:09:52 +0100 Subject: [PATCH] Update channel names when retrieving user tasks Usertasks hold their arguments in an opaque json structure that is not connected to the other data at the db level. When loading tasks that holds references (like channels) they could have changed in the meantime. This is now a hacky way around that updates the channels when loading. Should they be deleted, the tasks fails when running. --- .../scala/docspell/backend/BackendApp.scala | 2 +- .../docspell/backend/ops/OUserTask.scala | 52 ++++++++++++++----- .../notification/api/ChannelType.scala | 4 ++ .../main/scala/docspell/store/qb/DSL.scala | 3 ++ .../store/records/RNotificationChannel.scala | 19 ++++++- .../records/RNotificationChannelGotify.scala | 8 +++ .../records/RNotificationChannelHttp.scala | 8 +++ .../records/RNotificationChannelMail.scala | 8 +++ .../records/RNotificationChannelMatrix.scala | 8 +++ .../docspell/store/usertask/UserTask.scala | 5 ++ 10 files changed, 102 insertions(+), 15 deletions(-) diff --git a/modules/backend/src/main/scala/docspell/backend/BackendApp.scala b/modules/backend/src/main/scala/docspell/backend/BackendApp.scala index c18e72e8..fd217fb5 100644 --- a/modules/backend/src/main/scala/docspell/backend/BackendApp.scala +++ b/modules/backend/src/main/scala/docspell/backend/BackendApp.scala @@ -81,7 +81,7 @@ object BackendApp { itemSearchImpl <- OItemSearch(store) fulltextImpl <- OFulltext(itemSearchImpl, ftsClient, store, queue, joexImpl) mailImpl <- OMail(store, javaEmil) - userTaskImpl <- OUserTask(utStore, queue, joexImpl) + userTaskImpl <- OUserTask(utStore, store, queue, joexImpl) folderImpl <- OFolder(store) customFieldsImpl <- OCustomFields(store) simpleSearchImpl = OSimpleSearch(fulltextImpl, itemSearchImpl) diff --git a/modules/backend/src/main/scala/docspell/backend/ops/OUserTask.scala b/modules/backend/src/main/scala/docspell/backend/ops/OUserTask.scala index 9485e4e0..8f8d0ab3 100644 --- a/modules/backend/src/main/scala/docspell/backend/ops/OUserTask.scala +++ b/modules/backend/src/main/scala/docspell/backend/ops/OUserTask.scala @@ -6,15 +6,16 @@ package docspell.backend.ops -import cats.data.OptionT +import cats.data.{NonEmptyList, OptionT} import cats.effect._ import cats.implicits._ import fs2.Stream import docspell.common._ -import docspell.notification.api.PeriodicDueItemsArgs -import docspell.notification.api.PeriodicQueryArgs +import docspell.notification.api.{ChannelRef, PeriodicDueItemsArgs, PeriodicQueryArgs} +import docspell.store.Store import docspell.store.queue.JobQueue +import docspell.store.records.RNotificationChannel import docspell.store.usertask._ import io.circe.Encoder @@ -83,7 +84,8 @@ trait OUserTask[F[_]] { object OUserTask { def apply[F[_]: Async]( - store: UserTaskStore[F], + taskStore: UserTaskStore[F], + store: Store[F], queue: JobQueue[F], joex: OJoex[F] ): Resource[F, OUserTask[F]] = @@ -100,7 +102,7 @@ object OUserTask { } yield () def getScanMailbox(scope: UserTaskScope): Stream[F, UserTask[ScanMailboxArgs]] = - store + taskStore .getByName[ScanMailboxArgs](scope, ScanMailboxArgs.taskName) def findScanMailbox( @@ -111,8 +113,8 @@ object OUserTask { def deleteTask(scope: UserTaskScope, id: Ident): F[Unit] = (for { - _ <- store.getByIdRaw(scope, id) - _ <- OptionT.liftF(store.deleteTask(scope, id)) + _ <- taskStore.getByIdRaw(scope, id) + _ <- OptionT.liftF(taskStore.deleteTask(scope, id)) } yield ()).getOrElse(()) def submitScanMailbox( @@ -121,21 +123,28 @@ object OUserTask { task: UserTask[ScanMailboxArgs] ): F[Unit] = for { - _ <- store.updateTask[ScanMailboxArgs](scope, subject, task) + _ <- taskStore.updateTask[ScanMailboxArgs](scope, subject, task) _ <- joex.notifyAllNodes } yield () def getNotifyDueItems( scope: UserTaskScope ): Stream[F, UserTask[PeriodicDueItemsArgs]] = - store + taskStore .getByName[PeriodicDueItemsArgs](scope, PeriodicDueItemsArgs.taskName) + .evalMap(ut => + resolveChannels(ut.args.channels) + .map(chs => ut.mapArgs(_.copy(channels = chs))) + ) def findNotifyDueItems( id: Ident, scope: UserTaskScope ): OptionT[F, UserTask[PeriodicDueItemsArgs]] = OptionT(getNotifyDueItems(scope).find(_.id == id).compile.last) + .semiflatMap(ut => + resolveChannels(ut.args.channels).map(ch => ut.mapArgs(_.copy(channels = ch))) + ) def submitNotifyDueItems( scope: UserTaskScope, @@ -143,18 +152,26 @@ object OUserTask { task: UserTask[PeriodicDueItemsArgs] ): F[Unit] = for { - _ <- store.updateTask[PeriodicDueItemsArgs](scope, subject, task) + _ <- taskStore.updateTask[PeriodicDueItemsArgs](scope, subject, task) _ <- joex.notifyAllNodes } yield () def getPeriodicQuery(scope: UserTaskScope): Stream[F, UserTask[PeriodicQueryArgs]] = - store.getByName[PeriodicQueryArgs](scope, PeriodicQueryArgs.taskName) + taskStore + .getByName[PeriodicQueryArgs](scope, PeriodicQueryArgs.taskName) + .evalMap(ut => + resolveChannels(ut.args.channels) + .map(chs => ut.mapArgs(_.copy(channels = chs))) + ) def findPeriodicQuery( id: Ident, scope: UserTaskScope ): OptionT[F, UserTask[PeriodicQueryArgs]] = OptionT(getPeriodicQuery(scope).find(_.id == id).compile.last) + .semiflatMap(ut => + resolveChannels(ut.args.channels).map(ch => ut.mapArgs(_.copy(channels = ch))) + ) def submitPeriodicQuery( scope: UserTaskScope, @@ -162,9 +179,18 @@ object OUserTask { task: UserTask[PeriodicQueryArgs] ): F[Unit] = for { - _ <- store.updateTask[PeriodicQueryArgs](scope, subject, task) + _ <- taskStore.updateTask[PeriodicQueryArgs](scope, subject, task) _ <- joex.notifyAllNodes } yield () - }) + // When retrieving arguments containing channel references, we must update + // details because they could have changed in the db. There are no separate + // database models for each user task, so rather a hacky compromise + private def resolveChannels( + refs: NonEmptyList[ChannelRef] + ): F[NonEmptyList[ChannelRef]] = + store.transact(RNotificationChannel.resolveRefs(refs)).map { resolved => + NonEmptyList.fromList(resolved).getOrElse(refs) + } + }) } diff --git a/modules/notification/api/src/main/scala/docspell/notification/api/ChannelType.scala b/modules/notification/api/src/main/scala/docspell/notification/api/ChannelType.scala index 4403f2be..cb5d177c 100644 --- a/modules/notification/api/src/main/scala/docspell/notification/api/ChannelType.scala +++ b/modules/notification/api/src/main/scala/docspell/notification/api/ChannelType.scala @@ -6,6 +6,7 @@ package docspell.notification.api +import cats.Order import cats.data.{NonEmptyList => Nel} import io.circe.Decoder @@ -43,4 +44,7 @@ object ChannelType { Decoder.decodeString.emap(fromString) implicit val jsonEncoder: Encoder[ChannelType] = Encoder.encodeString.contramap(_.name) + + implicit val order: Order[ChannelType] = + Order.by(_.name) } diff --git a/modules/store/src/main/scala/docspell/store/qb/DSL.scala b/modules/store/src/main/scala/docspell/store/qb/DSL.scala index 22e17dbd..e5cc1a9a 100644 --- a/modules/store/src/main/scala/docspell/store/qb/DSL.scala +++ b/modules/store/src/main/scala/docspell/store/qb/DSL.scala @@ -56,6 +56,9 @@ trait DSL extends DoobieMeta { def union(s1: Select, sn: Select*): Select = Select.Union(s1, sn.toVector) + def union(selects: Nel[Select]): Select = + Select.Union(selects.head, selects.tail.toVector) + def intersect(s1: Select, sn: Select*): Select = Select.Intersect(s1, sn.toVector) diff --git a/modules/store/src/main/scala/docspell/store/records/RNotificationChannel.scala b/modules/store/src/main/scala/docspell/store/records/RNotificationChannel.scala index b51e366c..586bd82a 100644 --- a/modules/store/src/main/scala/docspell/store/records/RNotificationChannel.scala +++ b/modules/store/src/main/scala/docspell/store/records/RNotificationChannel.scala @@ -6,11 +6,12 @@ package docspell.store.records -import cats.data.OptionT +import cats.data.{NonEmptyList => Nel, OptionT} import cats.implicits._ import docspell.common._ import docspell.notification.api.{Channel, ChannelRef, ChannelType} +import docspell.store.qb.DSL._ import doobie._ @@ -130,6 +131,22 @@ object RNotificationChannel { RNotificationChannelHttp.getById(userId)(ref.id).map(_.map(Http.apply)) } + def resolveRefs(refs: Nel[ChannelRef]): ConnectionIO[List[ChannelRef]] = { + val byType = refs.groupByNem(_.channelType) + val queries = byType.toNel + .map { + case (ChannelType.Mail, refs) => + RNotificationChannelMail.findRefs(refs.map(_.id)) + case (ChannelType.Matrix, refs) => + RNotificationChannelMatrix.findRefs(refs.map(_.id)) + case (ChannelType.Gotify, refs) => + RNotificationChannelGotify.findRefs(refs.map(_.id)) + case (ChannelType.Http, refs) => + RNotificationChannelHttp.findRefs(refs.map(_.id)) + } + union(queries).build.query[ChannelRef].to[List] + } + def getByHook(hook: RNotificationHook): ConnectionIO[Vector[RNotificationChannel]] = { def opt(id: Option[Ident]): OptionT[ConnectionIO, Ident] = OptionT.fromOption(id) diff --git a/modules/store/src/main/scala/docspell/store/records/RNotificationChannelGotify.scala b/modules/store/src/main/scala/docspell/store/records/RNotificationChannelGotify.scala index b0da9e79..40bb9528 100644 --- a/modules/store/src/main/scala/docspell/store/records/RNotificationChannelGotify.scala +++ b/modules/store/src/main/scala/docspell/store/records/RNotificationChannelGotify.scala @@ -9,6 +9,7 @@ package docspell.store.records import cats.data.NonEmptyList import docspell.common._ +import docspell.notification.api.ChannelType import docspell.store.qb.DSL._ import docspell.store.qb._ @@ -97,4 +98,11 @@ object RNotificationChannelGotify { T.id === id && T.uid.in(Select(select(u.uid), from(u), u.isAccount(account))) ) } + + def findRefs(ids: NonEmptyList[Ident]): Select = + Select( + select(T.id.s, const(ChannelType.Gotify.name), T.name.s), + from(T), + T.id.in(ids) + ) } diff --git a/modules/store/src/main/scala/docspell/store/records/RNotificationChannelHttp.scala b/modules/store/src/main/scala/docspell/store/records/RNotificationChannelHttp.scala index a70eb8d8..b06eaae7 100644 --- a/modules/store/src/main/scala/docspell/store/records/RNotificationChannelHttp.scala +++ b/modules/store/src/main/scala/docspell/store/records/RNotificationChannelHttp.scala @@ -9,6 +9,7 @@ package docspell.store.records import cats.data.NonEmptyList import docspell.common._ +import docspell.notification.api.ChannelType import docspell.store.qb.DSL._ import docspell.store.qb._ @@ -80,4 +81,11 @@ object RNotificationChannelHttp { T.id === id && T.uid.in(Select(select(u.uid), from(u), u.isAccount(account))) ) } + + def findRefs(ids: NonEmptyList[Ident]): Select = + Select( + select(T.id.s, const(ChannelType.Http.name), T.name.s), + from(T), + T.id.in(ids) + ) } diff --git a/modules/store/src/main/scala/docspell/store/records/RNotificationChannelMail.scala b/modules/store/src/main/scala/docspell/store/records/RNotificationChannelMail.scala index afa2d34c..d73fe0c8 100644 --- a/modules/store/src/main/scala/docspell/store/records/RNotificationChannelMail.scala +++ b/modules/store/src/main/scala/docspell/store/records/RNotificationChannelMail.scala @@ -9,6 +9,7 @@ package docspell.store.records import cats.data.NonEmptyList import docspell.common._ +import docspell.notification.api.ChannelType import docspell.store.qb.DSL._ import docspell.store.qb._ @@ -90,4 +91,11 @@ object RNotificationChannelMail { T.id === id && T.uid.in(Select(select(u.uid), from(u), u.isAccount(account))) ) } + + def findRefs(ids: NonEmptyList[Ident]): Select = + Select( + select(T.id.s, const(ChannelType.Mail.name), T.name.s), + from(T), + T.id.in(ids) + ) } diff --git a/modules/store/src/main/scala/docspell/store/records/RNotificationChannelMatrix.scala b/modules/store/src/main/scala/docspell/store/records/RNotificationChannelMatrix.scala index d6981906..f6965b88 100644 --- a/modules/store/src/main/scala/docspell/store/records/RNotificationChannelMatrix.scala +++ b/modules/store/src/main/scala/docspell/store/records/RNotificationChannelMatrix.scala @@ -9,6 +9,7 @@ package docspell.store.records import cats.data.NonEmptyList import docspell.common._ +import docspell.notification.api.ChannelType import docspell.store.qb.DSL._ import docspell.store.qb._ @@ -106,4 +107,11 @@ object RNotificationChannelMatrix { T.id === id && T.uid.in(Select(select(u.uid), from(u), u.isAccount(account))) ) } + + def findRefs(ids: NonEmptyList[Ident]): Select = + Select( + select(T.id.s, const(ChannelType.Matrix.name), T.name.s), + from(T), + T.id.in(ids) + ) } diff --git a/modules/store/src/main/scala/docspell/store/usertask/UserTask.scala b/modules/store/src/main/scala/docspell/store/usertask/UserTask.scala index 60ce80f3..13685aca 100644 --- a/modules/store/src/main/scala/docspell/store/usertask/UserTask.scala +++ b/modules/store/src/main/scala/docspell/store/usertask/UserTask.scala @@ -29,6 +29,11 @@ case class UserTask[A]( def encode(implicit E: Encoder[A]): UserTask[String] = copy(args = E(args).noSpaces) + def withArgs[B](newArgs: B): UserTask[B] = + copy(args = newArgs) + + def mapArgs[B](f: A => B): UserTask[B] = + withArgs(f(args)) } object UserTask {