Allow bookmarks in periodic query notification

This commit is contained in:
eikek
2022-01-10 14:25:20 +01:00
parent ccb4df5bd7
commit 699cf091e6
19 changed files with 497 additions and 82 deletions

View File

@ -7,6 +7,7 @@
package docspell.joex.notify
import cats.data.OptionT
import cats.data.{NonEmptyList => Nel}
import cats.effect._
import cats.implicits._
@ -17,10 +18,14 @@ import docspell.joex.scheduler.Task
import docspell.notification.api.EventContext
import docspell.notification.api.NotificationChannel
import docspell.notification.api.PeriodicQueryArgs
import docspell.query.ItemQuery
import docspell.query.ItemQuery.Expr.AndExpr
import docspell.query.ItemQueryParser
import docspell.store.qb.Batch
import docspell.store.queries.ListItem
import docspell.store.queries.{QItem, Query}
import docspell.store.records.RQueryBookmark
import docspell.store.records.RShare
import docspell.store.records.RUser
object PeriodicQueryTask {
@ -54,22 +59,77 @@ object PeriodicQueryTask {
)
.getOrElse(())
private def queryString(q: ItemQuery.Expr) =
ItemQueryParser.asString(q)
def makeQuery[F[_]: Sync](ctx: Context[F, Args])(cont: Query => F[Unit]): F[Unit] = {
def fromBookmark(id: String) =
ctx.store
.transact(RQueryBookmark.findByNameOrId(ctx.args.account, id))
.map(_.map(_.query))
.flatTap(q =>
ctx.logger.debug(s"Loaded bookmark '$id': ${q.map(_.expr).map(queryString)}")
)
def fromShare(id: String) =
ctx.store
.transact(RShare.findOneByCollective(ctx.args.account.collective, Some(true), id))
.map(_.map(_.query))
.flatTap(q =>
ctx.logger.debug(s"Loaded share '$id': ${q.map(_.expr).map(queryString)}")
)
def fromBookmarkOrShare(id: String) =
OptionT(fromBookmark(id)).orElse(OptionT(fromShare(id))).value
def withQuery(bm: Option[ItemQuery], str: String): F[Unit] =
ItemQueryParser.parse(str) match {
case Right(q) =>
val expr = bm.map(b => AndExpr(Nel.of(b.expr, q.expr))).getOrElse(q.expr)
val query = Query(Query.Fix(ctx.args.account, Some(expr), None))
ctx.logger.debug(s"Running query: ${queryString(expr)}") *> cont(query)
case Left(err) =>
ctx.logger.error(
s"Item query is invalid, stopping: ${ctx.args.query.map(_.query)} - ${err.render}"
)
}
(ctx.args.bookmark, ctx.args.query) match {
case (Some(bm), Some(qstr)) =>
ctx.logger.debug(s"Using bookmark $bm and query $qstr") *>
fromBookmarkOrShare(bm).flatMap(bq => withQuery(bq, qstr.query))
case (Some(bm), None) =>
fromBookmarkOrShare(bm).flatMap {
case Some(bq) =>
val query = Query(Query.Fix(ctx.args.account, Some(bq.expr), None))
ctx.logger.debug(s"Using bookmark: ${queryString(bq.expr)}") *> cont(query)
case None =>
ctx.logger.error(
s"No bookmark found for id: $bm. Can't continue. Please fix the task query."
)
}
case (None, Some(qstr)) =>
ctx.logger.debug(s"Using query: ${qstr.query}") *> withQuery(None, qstr.query)
case (None, None) =>
ctx.logger.error(s"No query provided for task $taskName!")
}
}
def withItems[F[_]: Sync](ctx: Context[F, Args], limit: Int, now: Timestamp)(
cont: Vector[ListItem] => F[Unit]
): F[Unit] =
ItemQueryParser.parse(ctx.args.query.query) match {
case Right(q) =>
val query = Query(Query.Fix(ctx.args.account, Some(q.expr), None))
val items = ctx.store
.transact(QItem.findItems(query, now.toUtcDate, 0, Batch.limit(limit)))
.compile
.to(Vector)
makeQuery(ctx) { query =>
val items = ctx.store
.transact(QItem.findItems(query, now.toUtcDate, 0, Batch.limit(limit)))
.compile
.to(Vector)
items.flatMap(cont)
case Left(err) =>
ctx.logger.error(
s"Item query is invalid, stopping: ${ctx.args.query} - ${err.render}"
)
items.flatMap(cont)
}
def withEventContext[F[_]](