Add support for more generic notification

This is a start to have different kinds of notifications. It is
possible to be notified via e-mail, matrix or gotify. It also extends
the current "periodic query" for due items by allowing notification
over different channels. A "generic periodic query" variant is added
as well.
This commit is contained in:
eikek
2021-11-22 00:22:51 +01:00
parent 93a828720c
commit 4ffc8d1f14
175 changed files with 13041 additions and 599 deletions

View File

@ -11,6 +11,7 @@ import cats.implicits._
import fs2.concurrent.SignallingRef
import docspell.analysis.TextAnalyser
import docspell.backend.MailAddressCodec
import docspell.backend.fulltext.CreateIndex
import docspell.backend.msg.{CancelJob, JobQueuePublish, Topics}
import docspell.backend.ops._
@ -32,6 +33,8 @@ import docspell.joex.process.ReProcessItem
import docspell.joex.scanmailbox._
import docspell.joex.scheduler._
import docspell.joex.updatecheck._
import docspell.notification.api.NotificationModule
import docspell.notification.impl.NotificationModuleImpl
import docspell.pubsub.api.{PubSub, PubSubT}
import docspell.store.Store
import docspell.store.queue._
@ -49,16 +52,19 @@ final class JoexAppImpl[F[_]: Async](
pubSubT: PubSubT[F],
pstore: PeriodicTaskStore[F],
termSignal: SignallingRef[F, Boolean],
notificationMod: NotificationModule[F],
val scheduler: Scheduler[F],
val periodicScheduler: PeriodicScheduler[F]
) extends JoexApp[F] {
def init: F[Unit] = {
val run = scheduler.start.compile.drain
val prun = periodicScheduler.start.compile.drain
val eventConsume = notificationMod.consumeAllEvents(2).compile.drain
for {
_ <- scheduleBackgroundTasks
_ <- Async[F].start(run)
_ <- Async[F].start(prun)
_ <- Async[F].start(eventConsume)
_ <- scheduler.periodicAwake
_ <- periodicScheduler.periodicAwake
_ <- subscriptions
@ -115,7 +121,7 @@ final class JoexAppImpl[F[_]: Async](
}
object JoexAppImpl {
object JoexAppImpl extends MailAddressCodec {
def create[F[_]: Async](
cfg: Config,
@ -130,7 +136,12 @@ object JoexAppImpl {
pubSub,
Logger.log4s(org.log4s.getLogger(s"joex-${cfg.appId.id}"))
)
queue <- JobQueuePublish(store, pubSubT)
javaEmil =
JavaMailEmil(Settings.defaultSettings.copy(debug = cfg.mailDebug))
notificationMod <- Resource.eval(
NotificationModuleImpl[F](store, javaEmil, httpClient, 200)
)
queue <- JobQueuePublish(store, pubSubT, notificationMod)
joex <- OJoex(pubSubT)
upload <- OUpload(store, queue, joex)
fts <- createFtsClient(cfg)(httpClient)
@ -140,11 +151,11 @@ object JoexAppImpl {
analyser <- TextAnalyser.create[F](cfg.textAnalysis.textAnalysisConfig)
regexNer <- RegexNerFile(cfg.textAnalysis.regexNerFileConfig, store)
updateCheck <- UpdateCheck.resource(httpClient)
javaEmil =
JavaMailEmil(Settings.defaultSettings.copy(debug = cfg.mailDebug))
notification <- ONotification(store, notificationMod)
sch <- SchedulerBuilder(cfg.scheduler, store)
.withQueue(queue)
.withPubSub(pubSubT)
.withEventSink(notificationMod)
.withTask(
JobTask.json(
ProcessItemArgs.taskName,
@ -263,6 +274,20 @@ object JoexAppImpl {
UpdateCheckTask.onCancel[F]
)
)
.withTask(
JobTask.json(
PeriodicQueryTask.taskName,
PeriodicQueryTask[F](notification),
PeriodicQueryTask.onCancel[F]
)
)
.withTask(
JobTask.json(
PeriodicDueItemsTask.taskName,
PeriodicDueItemsTask[F](notification),
PeriodicDueItemsTask.onCancel[F]
)
)
.resource
psch <- PeriodicScheduler.create(
cfg.periodicScheduler,
@ -271,7 +296,17 @@ object JoexAppImpl {
pstore,
joex
)
app = new JoexAppImpl(cfg, store, queue, pubSubT, pstore, termSignal, sch, psch)
app = new JoexAppImpl(
cfg,
store,
queue,
pubSubT,
pstore,
termSignal,
notificationMod,
sch,
psch
)
appR <- Resource.make(app.init.map(_ => app))(_.initShutdown)
} yield appR

View File

@ -0,0 +1,106 @@
/*
* Copyright 2020 Eike K. & Contributors
*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
package docspell.joex.notify
import cats.data.NonEmptyList
import cats.effect._
import cats.implicits._
import docspell.backend.ops.ONotification
import docspell.common._
import docspell.joex.scheduler.Context
import docspell.joex.scheduler.Task
import docspell.notification.api.EventContext
import docspell.notification.api.NotificationChannel
import docspell.notification.api.PeriodicDueItemsArgs
import docspell.query.Date
import docspell.query.ItemQuery._
import docspell.query.ItemQueryDsl._
import docspell.store.qb.Batch
import docspell.store.queries.ListItem
import docspell.store.queries.{QItem, Query}
object PeriodicDueItemsTask {
val taskName = PeriodicDueItemsArgs.taskName
type Args = PeriodicDueItemsArgs
def onCancel[F[_]]: Task[F, Args, Unit] =
Task.log(_.warn(s"Cancelling ${taskName.id} task"))
def apply[F[_]: Sync](notificationOps: ONotification[F]): Task[F, Args, Unit] =
Task { ctx =>
val limit = 7
Timestamp.current[F].flatMap { now =>
withItems(ctx, limit, now) { items =>
withEventContext(ctx, items, limit, now) { eventCtx =>
withChannel(ctx, notificationOps) { channels =>
notificationOps.sendMessage(ctx.logger, eventCtx, channels)
}
}
}
}
}
def withChannel[F[_]: Sync](ctx: Context[F, Args], ops: ONotification[F])(
cont: Vector[NotificationChannel] => F[Unit]
): F[Unit] =
TaskOperations.withChannel(ctx.logger, ctx.args.channel, ops)(cont)
def withItems[F[_]: Sync](ctx: Context[F, Args], limit: Int, now: Timestamp)(
cont: Vector[ListItem] => F[Unit]
): F[Unit] = {
val rightDate = Date((now + Duration.days(ctx.args.remindDays.toLong)).toMillis)
val q =
Query
.all(ctx.args.account)
.withOrder(orderAsc = _.dueDate)
.withFix(_.copy(query = Expr.ValidItemStates.some))
.withCond(_ =>
Query.QueryExpr(
Attr.DueDate <= rightDate &&?
ctx.args.daysBack.map(back =>
Attr.DueDate >= Date((now - Duration.days(back.toLong)).toMillis)
) &&?
NonEmptyList
.fromList(ctx.args.tagsInclude)
.map(ids => Q.tagIdsEq(ids.map(_.id))) &&?
NonEmptyList
.fromList(ctx.args.tagsExclude)
.map(ids => Q.tagIdsIn(ids.map(_.id)).negate)
)
)
for {
res <-
ctx.store
.transact(
QItem
.findItems(q, now.toUtcDate, 0, Batch.limit(limit))
.take(limit.toLong)
)
.compile
.toVector
_ <- cont(res)
} yield ()
}
def withEventContext[F[_]](
ctx: Context[F, Args],
items: Vector[ListItem],
limit: Int,
now: Timestamp
)(cont: EventContext => F[Unit]): F[Unit] =
TaskOperations.withEventContext(
ctx.logger,
ctx.args.account,
ctx.args.baseUrl,
items,
limit,
now
)(cont)
}

View File

@ -0,0 +1,84 @@
/*
* Copyright 2020 Eike K. & Contributors
*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
package docspell.joex.notify
import cats.effect._
import cats.implicits._
import docspell.backend.ops.ONotification
import docspell.common._
import docspell.joex.scheduler.Context
import docspell.joex.scheduler.Task
import docspell.notification.api.EventContext
import docspell.notification.api.NotificationChannel
import docspell.notification.api.PeriodicQueryArgs
import docspell.query.ItemQueryParser
import docspell.store.qb.Batch
import docspell.store.queries.ListItem
import docspell.store.queries.{QItem, Query}
object PeriodicQueryTask {
val taskName = PeriodicQueryArgs.taskName
type Args = PeriodicQueryArgs
def onCancel[F[_]]: Task[F, Args, Unit] =
Task.log(_.warn(s"Cancelling ${taskName.id} task"))
def apply[F[_]: Sync](notificationOps: ONotification[F]): Task[F, Args, Unit] =
Task { ctx =>
val limit = 7
Timestamp.current[F].flatMap { now =>
withItems(ctx, limit, now) { items =>
withEventContext(ctx, items, limit, now) { eventCtx =>
withChannel(ctx, notificationOps) { channels =>
notificationOps.sendMessage(ctx.logger, eventCtx, channels)
}
}
}
}
}
def withChannel[F[_]: Sync](ctx: Context[F, Args], ops: ONotification[F])(
cont: Vector[NotificationChannel] => F[Unit]
): F[Unit] =
TaskOperations.withChannel(ctx.logger, ctx.args.channel, ops)(cont)
def withItems[F[_]: Sync](ctx: Context[F, Args], limit: Int, now: Timestamp)(
cont: Vector[ListItem] => F[Unit]
): F[Unit] =
ItemQueryParser.parse(ctx.args.query.query) match {
case Right(q) =>
val query = Query(Query.Fix(ctx.args.account, Some(q.expr), None))
val items = ctx.store
.transact(QItem.findItems(query, now.toUtcDate, 0, Batch.limit(limit)))
.compile
.to(Vector)
items.flatMap(cont)
case Left(err) =>
ctx.logger.error(
s"Item query is invalid, stopping: ${ctx.args.query} - ${err.render}"
)
}
def withEventContext[F[_]](
ctx: Context[F, Args],
items: Vector[ListItem],
limit: Int,
now: Timestamp
)(cont: EventContext => F[Unit]): F[Unit] =
TaskOperations.withEventContext(
ctx.logger,
ctx.args.account,
ctx.args.baseUrl,
items,
limit,
now
)(cont)
}

View File

@ -0,0 +1,64 @@
/*
* Copyright 2020 Eike K. & Contributors
*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
package docspell.joex.notify
import cats.data.NonEmptyList
import cats.effect._
import cats.implicits._
import docspell.backend.ops.ONotification
import docspell.common._
import docspell.notification.api.ChannelOrRef
import docspell.notification.api.Event
import docspell.notification.api.EventContext
import docspell.notification.api.NotificationChannel
import docspell.notification.impl.context.ItemSelectionCtx
import docspell.store.queries.ListItem
trait TaskOperations {
def withChannel[F[_]: Sync](
logger: Logger[F],
channel: ChannelOrRef,
ops: ONotification[F]
)(
cont: Vector[NotificationChannel] => F[Unit]
): F[Unit] = {
val channels = channel match {
case Right(ch) => ops.mkNotificationChannel(ch)
case Left(ref) => ops.findNotificationChannel(ref)
}
channels.flatMap { ch =>
if (ch.isEmpty)
logger.error(s"No channels found for the given data: ${channel}")
else cont(ch)
}
}
def withEventContext[F[_]](
logger: Logger[F],
account: AccountId,
baseUrl: Option[LenientUri],
items: Vector[ListItem],
limit: Int,
now: Timestamp
)(cont: EventContext => F[Unit]): F[Unit] =
NonEmptyList.fromFoldable(items) match {
case Some(nel) =>
val more = items.size >= limit
val eventCtx = ItemSelectionCtx(
Event.ItemSelection(account, nel.map(_.id), more, baseUrl),
ItemSelectionCtx.Data
.create(account, items, baseUrl, more, now)
)
cont(eventCtx)
case None =>
logger.info(s"The query selected no items. Notification aborted")
}
}
object TaskOperations extends TaskOperations

View File

@ -11,6 +11,7 @@ import cats.effect.std.Semaphore
import cats.implicits._
import fs2.concurrent.SignallingRef
import docspell.notification.api.EventSink
import docspell.pubsub.api.PubSubT
import docspell.store.Store
import docspell.store.queue.JobQueue
@ -21,7 +22,8 @@ case class SchedulerBuilder[F[_]: Async](
store: Store[F],
queue: Resource[F, JobQueue[F]],
logSink: LogSink[F],
pubSub: PubSubT[F]
pubSub: PubSubT[F],
eventSink: EventSink[F]
) {
def withConfig(cfg: SchedulerConfig): SchedulerBuilder[F] =
@ -45,6 +47,9 @@ case class SchedulerBuilder[F[_]: Async](
def withPubSub(pubSubT: PubSubT[F]): SchedulerBuilder[F] =
copy(pubSub = pubSubT)
def withEventSink(sink: EventSink[F]): SchedulerBuilder[F] =
copy(eventSink = sink)
def serve: Resource[F, Scheduler[F]] =
resource.evalMap(sch => Async[F].start(sch.start.compile.drain).map(_ => sch))
@ -58,6 +63,7 @@ case class SchedulerBuilder[F[_]: Async](
config,
jq,
pubSub,
eventSink,
tasks,
store,
logSink,
@ -83,7 +89,8 @@ object SchedulerBuilder {
store,
JobQueue(store),
LogSink.db[F](store),
PubSubT.noop[F]
PubSubT.noop[F],
EventSink.silent[F]
)
}

View File

@ -17,6 +17,8 @@ import docspell.backend.msg.JobDone
import docspell.common._
import docspell.common.syntax.all._
import docspell.joex.scheduler.SchedulerImpl._
import docspell.notification.api.Event
import docspell.notification.api.EventSink
import docspell.pubsub.api.PubSubT
import docspell.store.Store
import docspell.store.queries.QJob
@ -29,6 +31,7 @@ final class SchedulerImpl[F[_]: Async](
val config: SchedulerConfig,
queue: JobQueue[F],
pubSub: PubSubT[F],
eventSink: EventSink[F],
tasks: JobTaskRegistry[F],
store: Store[F],
logSink: LogSink[F],
@ -206,6 +209,17 @@ final class SchedulerImpl[F[_]: Async](
JobDone.topic,
JobDone(job.id, job.group, job.task, job.args, finalState)
)
_ <- eventSink.offer(
Event.JobDone(
job.id,
job.group,
job.task,
job.args,
job.state,
job.subject,
job.submitter
)
)
} yield ()
def onStart(job: RJob): F[Unit] =