Update channel names when retrieving user tasks

Usertasks hold their arguments in an opaque json structure that is not
connected to the other data at the db level. When loading tasks that
holds references (like channels) they could have changed in the
meantime. This is now a hacky way around that updates the channels
when loading. Should they be deleted, the tasks fails when running.
This commit is contained in:
eikek
2022-01-30 17:09:52 +01:00
parent 0097d2bc73
commit 14a413e787
10 changed files with 102 additions and 15 deletions

View File

@ -81,7 +81,7 @@ object BackendApp {
itemSearchImpl <- OItemSearch(store)
fulltextImpl <- OFulltext(itemSearchImpl, ftsClient, store, queue, joexImpl)
mailImpl <- OMail(store, javaEmil)
userTaskImpl <- OUserTask(utStore, queue, joexImpl)
userTaskImpl <- OUserTask(utStore, store, queue, joexImpl)
folderImpl <- OFolder(store)
customFieldsImpl <- OCustomFields(store)
simpleSearchImpl = OSimpleSearch(fulltextImpl, itemSearchImpl)

View File

@ -6,15 +6,16 @@
package docspell.backend.ops
import cats.data.OptionT
import cats.data.{NonEmptyList, OptionT}
import cats.effect._
import cats.implicits._
import fs2.Stream
import docspell.common._
import docspell.notification.api.PeriodicDueItemsArgs
import docspell.notification.api.PeriodicQueryArgs
import docspell.notification.api.{ChannelRef, PeriodicDueItemsArgs, PeriodicQueryArgs}
import docspell.store.Store
import docspell.store.queue.JobQueue
import docspell.store.records.RNotificationChannel
import docspell.store.usertask._
import io.circe.Encoder
@ -83,7 +84,8 @@ trait OUserTask[F[_]] {
object OUserTask {
def apply[F[_]: Async](
store: UserTaskStore[F],
taskStore: UserTaskStore[F],
store: Store[F],
queue: JobQueue[F],
joex: OJoex[F]
): Resource[F, OUserTask[F]] =
@ -100,7 +102,7 @@ object OUserTask {
} yield ()
def getScanMailbox(scope: UserTaskScope): Stream[F, UserTask[ScanMailboxArgs]] =
store
taskStore
.getByName[ScanMailboxArgs](scope, ScanMailboxArgs.taskName)
def findScanMailbox(
@ -111,8 +113,8 @@ object OUserTask {
def deleteTask(scope: UserTaskScope, id: Ident): F[Unit] =
(for {
_ <- store.getByIdRaw(scope, id)
_ <- OptionT.liftF(store.deleteTask(scope, id))
_ <- taskStore.getByIdRaw(scope, id)
_ <- OptionT.liftF(taskStore.deleteTask(scope, id))
} yield ()).getOrElse(())
def submitScanMailbox(
@ -121,21 +123,28 @@ object OUserTask {
task: UserTask[ScanMailboxArgs]
): F[Unit] =
for {
_ <- store.updateTask[ScanMailboxArgs](scope, subject, task)
_ <- taskStore.updateTask[ScanMailboxArgs](scope, subject, task)
_ <- joex.notifyAllNodes
} yield ()
def getNotifyDueItems(
scope: UserTaskScope
): Stream[F, UserTask[PeriodicDueItemsArgs]] =
store
taskStore
.getByName[PeriodicDueItemsArgs](scope, PeriodicDueItemsArgs.taskName)
.evalMap(ut =>
resolveChannels(ut.args.channels)
.map(chs => ut.mapArgs(_.copy(channels = chs)))
)
def findNotifyDueItems(
id: Ident,
scope: UserTaskScope
): OptionT[F, UserTask[PeriodicDueItemsArgs]] =
OptionT(getNotifyDueItems(scope).find(_.id == id).compile.last)
.semiflatMap(ut =>
resolveChannels(ut.args.channels).map(ch => ut.mapArgs(_.copy(channels = ch)))
)
def submitNotifyDueItems(
scope: UserTaskScope,
@ -143,18 +152,26 @@ object OUserTask {
task: UserTask[PeriodicDueItemsArgs]
): F[Unit] =
for {
_ <- store.updateTask[PeriodicDueItemsArgs](scope, subject, task)
_ <- taskStore.updateTask[PeriodicDueItemsArgs](scope, subject, task)
_ <- joex.notifyAllNodes
} yield ()
def getPeriodicQuery(scope: UserTaskScope): Stream[F, UserTask[PeriodicQueryArgs]] =
store.getByName[PeriodicQueryArgs](scope, PeriodicQueryArgs.taskName)
taskStore
.getByName[PeriodicQueryArgs](scope, PeriodicQueryArgs.taskName)
.evalMap(ut =>
resolveChannels(ut.args.channels)
.map(chs => ut.mapArgs(_.copy(channels = chs)))
)
def findPeriodicQuery(
id: Ident,
scope: UserTaskScope
): OptionT[F, UserTask[PeriodicQueryArgs]] =
OptionT(getPeriodicQuery(scope).find(_.id == id).compile.last)
.semiflatMap(ut =>
resolveChannels(ut.args.channels).map(ch => ut.mapArgs(_.copy(channels = ch)))
)
def submitPeriodicQuery(
scope: UserTaskScope,
@ -162,9 +179,18 @@ object OUserTask {
task: UserTask[PeriodicQueryArgs]
): F[Unit] =
for {
_ <- store.updateTask[PeriodicQueryArgs](scope, subject, task)
_ <- taskStore.updateTask[PeriodicQueryArgs](scope, subject, task)
_ <- joex.notifyAllNodes
} yield ()
})
// When retrieving arguments containing channel references, we must update
// details because they could have changed in the db. There are no separate
// database models for each user task, so rather a hacky compromise
private def resolveChannels(
refs: NonEmptyList[ChannelRef]
): F[NonEmptyList[ChannelRef]] =
store.transact(RNotificationChannel.resolveRefs(refs)).map { resolved =>
NonEmptyList.fromList(resolved).getOrElse(refs)
}
})
}

View File

@ -6,6 +6,7 @@
package docspell.notification.api
import cats.Order
import cats.data.{NonEmptyList => Nel}
import io.circe.Decoder
@ -43,4 +44,7 @@ object ChannelType {
Decoder.decodeString.emap(fromString)
implicit val jsonEncoder: Encoder[ChannelType] =
Encoder.encodeString.contramap(_.name)
implicit val order: Order[ChannelType] =
Order.by(_.name)
}

View File

@ -56,6 +56,9 @@ trait DSL extends DoobieMeta {
def union(s1: Select, sn: Select*): Select =
Select.Union(s1, sn.toVector)
def union(selects: Nel[Select]): Select =
Select.Union(selects.head, selects.tail.toVector)
def intersect(s1: Select, sn: Select*): Select =
Select.Intersect(s1, sn.toVector)

View File

@ -6,11 +6,12 @@
package docspell.store.records
import cats.data.OptionT
import cats.data.{NonEmptyList => Nel, OptionT}
import cats.implicits._
import docspell.common._
import docspell.notification.api.{Channel, ChannelRef, ChannelType}
import docspell.store.qb.DSL._
import doobie._
@ -130,6 +131,22 @@ object RNotificationChannel {
RNotificationChannelHttp.getById(userId)(ref.id).map(_.map(Http.apply))
}
def resolveRefs(refs: Nel[ChannelRef]): ConnectionIO[List[ChannelRef]] = {
val byType = refs.groupByNem(_.channelType)
val queries = byType.toNel
.map {
case (ChannelType.Mail, refs) =>
RNotificationChannelMail.findRefs(refs.map(_.id))
case (ChannelType.Matrix, refs) =>
RNotificationChannelMatrix.findRefs(refs.map(_.id))
case (ChannelType.Gotify, refs) =>
RNotificationChannelGotify.findRefs(refs.map(_.id))
case (ChannelType.Http, refs) =>
RNotificationChannelHttp.findRefs(refs.map(_.id))
}
union(queries).build.query[ChannelRef].to[List]
}
def getByHook(hook: RNotificationHook): ConnectionIO[Vector[RNotificationChannel]] = {
def opt(id: Option[Ident]): OptionT[ConnectionIO, Ident] =
OptionT.fromOption(id)

View File

@ -9,6 +9,7 @@ package docspell.store.records
import cats.data.NonEmptyList
import docspell.common._
import docspell.notification.api.ChannelType
import docspell.store.qb.DSL._
import docspell.store.qb._
@ -97,4 +98,11 @@ object RNotificationChannelGotify {
T.id === id && T.uid.in(Select(select(u.uid), from(u), u.isAccount(account)))
)
}
def findRefs(ids: NonEmptyList[Ident]): Select =
Select(
select(T.id.s, const(ChannelType.Gotify.name), T.name.s),
from(T),
T.id.in(ids)
)
}

View File

@ -9,6 +9,7 @@ package docspell.store.records
import cats.data.NonEmptyList
import docspell.common._
import docspell.notification.api.ChannelType
import docspell.store.qb.DSL._
import docspell.store.qb._
@ -80,4 +81,11 @@ object RNotificationChannelHttp {
T.id === id && T.uid.in(Select(select(u.uid), from(u), u.isAccount(account)))
)
}
def findRefs(ids: NonEmptyList[Ident]): Select =
Select(
select(T.id.s, const(ChannelType.Http.name), T.name.s),
from(T),
T.id.in(ids)
)
}

View File

@ -9,6 +9,7 @@ package docspell.store.records
import cats.data.NonEmptyList
import docspell.common._
import docspell.notification.api.ChannelType
import docspell.store.qb.DSL._
import docspell.store.qb._
@ -90,4 +91,11 @@ object RNotificationChannelMail {
T.id === id && T.uid.in(Select(select(u.uid), from(u), u.isAccount(account)))
)
}
def findRefs(ids: NonEmptyList[Ident]): Select =
Select(
select(T.id.s, const(ChannelType.Mail.name), T.name.s),
from(T),
T.id.in(ids)
)
}

View File

@ -9,6 +9,7 @@ package docspell.store.records
import cats.data.NonEmptyList
import docspell.common._
import docspell.notification.api.ChannelType
import docspell.store.qb.DSL._
import docspell.store.qb._
@ -106,4 +107,11 @@ object RNotificationChannelMatrix {
T.id === id && T.uid.in(Select(select(u.uid), from(u), u.isAccount(account)))
)
}
def findRefs(ids: NonEmptyList[Ident]): Select =
Select(
select(T.id.s, const(ChannelType.Matrix.name), T.name.s),
from(T),
T.id.in(ids)
)
}

View File

@ -29,6 +29,11 @@ case class UserTask[A](
def encode(implicit E: Encoder[A]): UserTask[String] =
copy(args = E(args).noSpaces)
def withArgs[B](newArgs: B): UserTask[B] =
copy(args = newArgs)
def mapArgs[B](f: A => B): UserTask[B] =
withArgs(f(args))
}
object UserTask {