Refactor user tasks to support collective and user scopes

Before, there were periodic tasks run per collective and not user by
making sure that submitter + group are the same value. This is now
encoded in `UserTaskScope` so it is now obvious and errors can be
reduced when using this.
This commit is contained in:
eikek 2021-08-14 21:10:09 +02:00
parent 548dfb9a57
commit 31d885ed79
14 changed files with 177 additions and 114 deletions

View File

@ -18,8 +18,7 @@ import docspell.store.UpdateResult
import docspell.store.queries.QCollective import docspell.store.queries.QCollective
import docspell.store.queue.JobQueue import docspell.store.queue.JobQueue
import docspell.store.records._ import docspell.store.records._
import docspell.store.usertask.UserTask import docspell.store.usertask.{UserTask, UserTaskScope, UserTaskStore}
import docspell.store.usertask.UserTaskStore
import docspell.store.{AddResult, Store} import docspell.store.{AddResult, Store}
import com.github.eikek.calev._ import com.github.eikek.calev._
@ -169,7 +168,7 @@ object OCollective {
None, None,
LearnClassifierArgs(coll) LearnClassifierArgs(coll)
) )
_ <- uts.updateOneTask(AccountId(coll, LearnClassifierArgs.taskName), ut) _ <- uts.updateOneTask(UserTaskScope(coll), ut)
_ <- joex.notifyAllNodes _ <- joex.notifyAllNodes
} yield () } yield ()
@ -185,7 +184,7 @@ object OCollective {
None, None,
EmptyTrashArgs(coll) EmptyTrashArgs(coll)
) )
_ <- uts.updateOneTask(AccountId(coll, coll), ut) _ <- uts.updateOneTask(UserTaskScope(coll), ut)
_ <- joex.notifyAllNodes _ <- joex.notifyAllNodes
} yield () } yield ()
@ -199,7 +198,7 @@ object OCollective {
CalEvent(WeekdayComponent.All, DateEvent.All, TimeEvent.All), CalEvent(WeekdayComponent.All, DateEvent.All, TimeEvent.All),
None, None,
LearnClassifierArgs(collective) LearnClassifierArgs(collective)
).encode.toPeriodicTask(AccountId(collective, LearnClassifierArgs.taskName)) ).encode.toPeriodicTask(UserTaskScope(collective))
job <- ut.toJob job <- ut.toJob
_ <- queue.insert(job) _ <- queue.insert(job)
_ <- joex.notifyAllNodes _ <- joex.notifyAllNodes
@ -215,7 +214,7 @@ object OCollective {
CalEvent(WeekdayComponent.All, DateEvent.All, TimeEvent.All), CalEvent(WeekdayComponent.All, DateEvent.All, TimeEvent.All),
None, None,
EmptyTrashArgs(collective) EmptyTrashArgs(collective)
).encode.toPeriodicTask(AccountId(collective, collective)) ).encode.toPeriodicTask(UserTaskScope(collective))
job <- ut.toJob job <- ut.toJob
_ <- queue.insert(job) _ <- queue.insert(job)
_ <- joex.notifyAllNodes _ <- joex.notifyAllNodes

View File

@ -21,47 +21,47 @@ trait OUserTask[F[_]] {
/** Return the settings for all scan-mailbox tasks of the current user. /** Return the settings for all scan-mailbox tasks of the current user.
*/ */
def getScanMailbox(account: AccountId): Stream[F, UserTask[ScanMailboxArgs]] def getScanMailbox(scope: UserTaskScope): Stream[F, UserTask[ScanMailboxArgs]]
/** Find a scan-mailbox task by the given id. */ /** Find a scan-mailbox task by the given id. */
def findScanMailbox( def findScanMailbox(
id: Ident, id: Ident,
account: AccountId scope: UserTaskScope
): OptionT[F, UserTask[ScanMailboxArgs]] ): OptionT[F, UserTask[ScanMailboxArgs]]
/** Updates the scan-mailbox tasks and notifies the joex nodes. /** Updates the scan-mailbox tasks and notifies the joex nodes.
*/ */
def submitScanMailbox( def submitScanMailbox(
account: AccountId, scope: UserTaskScope,
task: UserTask[ScanMailboxArgs] task: UserTask[ScanMailboxArgs]
): F[Unit] ): F[Unit]
/** Return the settings for all the notify-due-items task of the /** Return the settings for all the notify-due-items task of the
* current user. * current user.
*/ */
def getNotifyDueItems(account: AccountId): Stream[F, UserTask[NotifyDueItemsArgs]] def getNotifyDueItems(scope: UserTaskScope): Stream[F, UserTask[NotifyDueItemsArgs]]
/** Find a notify-due-items task by the given id. */ /** Find a notify-due-items task by the given id. */
def findNotifyDueItems( def findNotifyDueItems(
id: Ident, id: Ident,
account: AccountId scope: UserTaskScope
): OptionT[F, UserTask[NotifyDueItemsArgs]] ): OptionT[F, UserTask[NotifyDueItemsArgs]]
/** Updates the notify-due-items tasks and notifies the joex nodes. /** Updates the notify-due-items tasks and notifies the joex nodes.
*/ */
def submitNotifyDueItems( def submitNotifyDueItems(
account: AccountId, scope: UserTaskScope,
task: UserTask[NotifyDueItemsArgs] task: UserTask[NotifyDueItemsArgs]
): F[Unit] ): F[Unit]
/** Removes a user task with the given id. */ /** Removes a user task with the given id. */
def deleteTask(account: AccountId, id: Ident): F[Unit] def deleteTask(scope: UserTaskScope, id: Ident): F[Unit]
/** Discards the schedule and immediately submits the task to the job /** Discards the schedule and immediately submits the task to the job
* executor's queue. It will not update the corresponding periodic * executor's queue. It will not update the corresponding periodic
* task. * task.
*/ */
def executeNow[A](account: AccountId, task: UserTask[A])(implicit def executeNow[A](scope: UserTaskScope, task: UserTask[A])(implicit
E: Encoder[A] E: Encoder[A]
): F[Unit] ): F[Unit]
} }
@ -75,57 +75,59 @@ object OUserTask {
): Resource[F, OUserTask[F]] = ): Resource[F, OUserTask[F]] =
Resource.pure[F, OUserTask[F]](new OUserTask[F] { Resource.pure[F, OUserTask[F]](new OUserTask[F] {
def executeNow[A](account: AccountId, task: UserTask[A])(implicit def executeNow[A](scope: UserTaskScope, task: UserTask[A])(implicit
E: Encoder[A] E: Encoder[A]
): F[Unit] = ): F[Unit] =
for { for {
ptask <- task.encode.toPeriodicTask(account) ptask <- task.encode.toPeriodicTask(scope)
job <- ptask.toJob job <- ptask.toJob
_ <- queue.insert(job) _ <- queue.insert(job)
_ <- joex.notifyAllNodes _ <- joex.notifyAllNodes
} yield () } yield ()
def getScanMailbox(account: AccountId): Stream[F, UserTask[ScanMailboxArgs]] = def getScanMailbox(scope: UserTaskScope): Stream[F, UserTask[ScanMailboxArgs]] =
store store
.getByName[ScanMailboxArgs](account, ScanMailboxArgs.taskName) .getByName[ScanMailboxArgs](scope, ScanMailboxArgs.taskName)
def findScanMailbox( def findScanMailbox(
id: Ident, id: Ident,
account: AccountId scope: UserTaskScope
): OptionT[F, UserTask[ScanMailboxArgs]] = ): OptionT[F, UserTask[ScanMailboxArgs]] =
OptionT(getScanMailbox(account).find(_.id == id).compile.last) OptionT(getScanMailbox(scope).find(_.id == id).compile.last)
def deleteTask(account: AccountId, id: Ident): F[Unit] = def deleteTask(scope: UserTaskScope, id: Ident): F[Unit] =
(for { (for {
_ <- store.getByIdRaw(account, id) _ <- store.getByIdRaw(scope, id)
_ <- OptionT.liftF(store.deleteTask(account, id)) _ <- OptionT.liftF(store.deleteTask(scope, id))
} yield ()).getOrElse(()) } yield ()).getOrElse(())
def submitScanMailbox( def submitScanMailbox(
account: AccountId, scope: UserTaskScope,
task: UserTask[ScanMailboxArgs] task: UserTask[ScanMailboxArgs]
): F[Unit] = ): F[Unit] =
for { for {
_ <- store.updateTask[ScanMailboxArgs](account, task) _ <- store.updateTask[ScanMailboxArgs](scope, task)
_ <- joex.notifyAllNodes _ <- joex.notifyAllNodes
} yield () } yield ()
def getNotifyDueItems(account: AccountId): Stream[F, UserTask[NotifyDueItemsArgs]] = def getNotifyDueItems(
scope: UserTaskScope
): Stream[F, UserTask[NotifyDueItemsArgs]] =
store store
.getByName[NotifyDueItemsArgs](account, NotifyDueItemsArgs.taskName) .getByName[NotifyDueItemsArgs](scope, NotifyDueItemsArgs.taskName)
def findNotifyDueItems( def findNotifyDueItems(
id: Ident, id: Ident,
account: AccountId scope: UserTaskScope
): OptionT[F, UserTask[NotifyDueItemsArgs]] = ): OptionT[F, UserTask[NotifyDueItemsArgs]] =
OptionT(getNotifyDueItems(account).find(_.id == id).compile.last) OptionT(getNotifyDueItems(scope).find(_.id == id).compile.last)
def submitNotifyDueItems( def submitNotifyDueItems(
account: AccountId, scope: UserTaskScope,
task: UserTask[NotifyDueItemsArgs] task: UserTask[NotifyDueItemsArgs]
): F[Unit] = ): F[Unit] =
for { for {
_ <- store.updateTask[NotifyDueItemsArgs](account, task) _ <- store.updateTask[NotifyDueItemsArgs](scope, task)
_ <- joex.notifyAllNodes _ <- joex.notifyAllNodes
} yield () } yield ()
}) })

View File

@ -6,8 +6,9 @@
package docspell.common package docspell.common
import com.github.eikek.calev.CalEvent
import docspell.common.syntax.all._ import docspell.common.syntax.all._
import com.github.eikek.calev.CalEvent
import io.circe._ import io.circe._
import io.circe.generic.semiauto._ import io.circe.generic.semiauto._

View File

@ -7,9 +7,11 @@
package docspell.joex package docspell.joex
import scala.concurrent.ExecutionContext import scala.concurrent.ExecutionContext
import cats.effect._ import cats.effect._
import cats.implicits._ import cats.implicits._
import fs2.concurrent.SignallingRef import fs2.concurrent.SignallingRef
import docspell.analysis.TextAnalyser import docspell.analysis.TextAnalyser
import docspell.backend.ops._ import docspell.backend.ops._
import docspell.common._ import docspell.common._
@ -33,6 +35,7 @@ import docspell.joexapi.client.JoexClient
import docspell.store.Store import docspell.store.Store
import docspell.store.queue._ import docspell.store.queue._
import docspell.store.records.{REmptyTrashSetting, RJobLog} import docspell.store.records.{REmptyTrashSetting, RJobLog}
import emil.javamail._ import emil.javamail._
import org.http4s.blaze.client.BlazeClientBuilder import org.http4s.blaze.client.BlazeClientBuilder
import org.http4s.client.Client import org.http4s.client.Client

View File

@ -8,13 +8,15 @@ package docspell.joex.emptytrash
import cats.effect._ import cats.effect._
import cats.implicits._ import cats.implicits._
import com.github.eikek.calev.CalEvent
import fs2.Stream import fs2.Stream
import docspell.backend.ops.{OItem, OItemSearch} import docspell.backend.ops.{OItem, OItemSearch}
import docspell.common._ import docspell.common._
import docspell.joex.scheduler._ import docspell.joex.scheduler._
import docspell.store.records.{RItem, RPeriodicTask} import docspell.store.records.{RItem, RPeriodicTask}
import docspell.store.usertask.UserTask import docspell.store.usertask.{UserTask, UserTaskScope}
import com.github.eikek.calev.CalEvent
object EmptyTrashTask { object EmptyTrashTask {
type Args = EmptyTrashArgs type Args = EmptyTrashArgs
@ -24,8 +26,10 @@ object EmptyTrashTask {
private val pageSize = 20 private val pageSize = 20
def periodicTask[F[_]: Sync](collective: Ident, ce: CalEvent): F[RPeriodicTask] = { def periodicTask[F[_]: Sync](collective: Ident, ce: CalEvent): F[RPeriodicTask] =
Ident.randomId[F].flatMap( id => Ident
.randomId[F]
.flatMap(id =>
UserTask( UserTask(
id, id,
EmptyTrashArgs.taskName, EmptyTrashArgs.taskName,
@ -33,9 +37,8 @@ object EmptyTrashTask {
ce, ce,
None, None,
EmptyTrashArgs(collective) EmptyTrashArgs(collective)
).encode.toPeriodicTask(AccountId(collective, collective))) ).encode.toPeriodicTask(UserTaskScope(collective))
} )
def apply[F[_]: Async]( def apply[F[_]: Async](
itemOps: OItem[F], itemOps: OItem[F],

View File

@ -13,6 +13,7 @@ import docspell.common._
import docspell.joex.Config import docspell.joex.Config
import docspell.joex.scheduler.Task import docspell.joex.scheduler.Task
import docspell.store.records._ import docspell.store.records._
import docspell.store.usertask.UserTaskScope
import com.github.eikek.calev._ import com.github.eikek.calev._
@ -36,11 +37,10 @@ object HouseKeepingTask {
RPeriodicTask RPeriodicTask
.createJson( .createJson(
true, true,
UserTaskScope(DocspellSystem.taskGroup),
taskName, taskName,
DocspellSystem.taskGroup,
(), (),
"Docspell house-keeping", "Docspell house-keeping",
DocspellSystem.taskGroup,
Priority.Low, Priority.Low,
ce, ce,
None None

View File

@ -8,6 +8,7 @@ package docspell.restserver.routes
import cats.effect._ import cats.effect._
import cats.implicits._ import cats.implicits._
import docspell.backend.BackendApp import docspell.backend.BackendApp
import docspell.backend.auth.AuthToken import docspell.backend.auth.AuthToken
import docspell.backend.ops.OCollective import docspell.backend.ops.OCollective
@ -15,6 +16,7 @@ import docspell.common.{EmptyTrashArgs, ListType}
import docspell.restapi.model._ import docspell.restapi.model._
import docspell.restserver.conv.Conversions import docspell.restserver.conv.Conversions
import docspell.restserver.http4s._ import docspell.restserver.http4s._
import com.github.eikek.calev.CalEvent import com.github.eikek.calev.CalEvent
import org.http4s.HttpRoutes import org.http4s.HttpRoutes
import org.http4s.circe.CirceEntityDecoder._ import org.http4s.circe.CirceEntityDecoder._

View File

@ -38,7 +38,7 @@ object NotifyDueItemsRoutes {
HttpRoutes.of { HttpRoutes.of {
case GET -> Root / Ident(id) => case GET -> Root / Ident(id) =>
(for { (for {
task <- ut.findNotifyDueItems(id, user.account) task <- ut.findNotifyDueItems(id, UserTaskScope(user.account))
res <- OptionT.liftF(taskToSettings(user.account, backend, task)) res <- OptionT.liftF(taskToSettings(user.account, backend, task))
resp <- OptionT.liftF(Ok(res)) resp <- OptionT.liftF(Ok(res))
} yield resp).getOrElseF(NotFound()) } yield resp).getOrElseF(NotFound())
@ -49,7 +49,7 @@ object NotifyDueItemsRoutes {
newId <- Ident.randomId[F] newId <- Ident.randomId[F]
task <- makeTask(newId, getBaseUrl(cfg, req), user.account, data) task <- makeTask(newId, getBaseUrl(cfg, req), user.account, data)
res <- res <-
ut.executeNow(user.account, task) ut.executeNow(UserTaskScope(user.account), task)
.attempt .attempt
.map(Conversions.basicResult(_, "Submitted successfully.")) .map(Conversions.basicResult(_, "Submitted successfully."))
resp <- Ok(res) resp <- Ok(res)
@ -58,7 +58,7 @@ object NotifyDueItemsRoutes {
case DELETE -> Root / Ident(id) => case DELETE -> Root / Ident(id) =>
for { for {
res <- res <-
ut.deleteTask(user.account, id) ut.deleteTask(UserTaskScope(user.account), id)
.attempt .attempt
.map(Conversions.basicResult(_, "Deleted successfully")) .map(Conversions.basicResult(_, "Deleted successfully"))
resp <- Ok(res) resp <- Ok(res)
@ -69,7 +69,7 @@ object NotifyDueItemsRoutes {
for { for {
task <- makeTask(data.id, getBaseUrl(cfg, req), user.account, data) task <- makeTask(data.id, getBaseUrl(cfg, req), user.account, data)
res <- res <-
ut.submitNotifyDueItems(user.account, task) ut.submitNotifyDueItems(UserTaskScope(user.account), task)
.attempt .attempt
.map(Conversions.basicResult(_, "Saved successfully")) .map(Conversions.basicResult(_, "Saved successfully"))
resp <- Ok(res) resp <- Ok(res)
@ -87,14 +87,14 @@ object NotifyDueItemsRoutes {
newId <- Ident.randomId[F] newId <- Ident.randomId[F]
task <- makeTask(newId, getBaseUrl(cfg, req), user.account, data) task <- makeTask(newId, getBaseUrl(cfg, req), user.account, data)
res <- res <-
ut.submitNotifyDueItems(user.account, task) ut.submitNotifyDueItems(UserTaskScope(user.account), task)
.attempt .attempt
.map(Conversions.basicResult(_, "Saved successfully.")) .map(Conversions.basicResult(_, "Saved successfully."))
resp <- Ok(res) resp <- Ok(res)
} yield resp } yield resp
case GET -> Root => case GET -> Root =>
ut.getNotifyDueItems(user.account) ut.getNotifyDueItems(UserTaskScope(user.account))
.evalMap(task => taskToSettings(user.account, backend, task)) .evalMap(task => taskToSettings(user.account, backend, task))
.compile .compile
.toVector .toVector

View File

@ -35,7 +35,7 @@ object ScanMailboxRoutes {
HttpRoutes.of { HttpRoutes.of {
case GET -> Root / Ident(id) => case GET -> Root / Ident(id) =>
(for { (for {
task <- ut.findScanMailbox(id, user.account) task <- ut.findScanMailbox(id, UserTaskScope(user.account))
res <- OptionT.liftF(taskToSettings(user.account, backend, task)) res <- OptionT.liftF(taskToSettings(user.account, backend, task))
resp <- OptionT.liftF(Ok(res)) resp <- OptionT.liftF(Ok(res))
} yield resp).getOrElseF(NotFound()) } yield resp).getOrElseF(NotFound())
@ -46,7 +46,7 @@ object ScanMailboxRoutes {
newId <- Ident.randomId[F] newId <- Ident.randomId[F]
task <- makeTask(newId, user.account, data) task <- makeTask(newId, user.account, data)
res <- res <-
ut.executeNow(user.account, task) ut.executeNow(UserTaskScope(user.account), task)
.attempt .attempt
.map(Conversions.basicResult(_, "Submitted successfully.")) .map(Conversions.basicResult(_, "Submitted successfully."))
resp <- Ok(res) resp <- Ok(res)
@ -55,7 +55,7 @@ object ScanMailboxRoutes {
case DELETE -> Root / Ident(id) => case DELETE -> Root / Ident(id) =>
for { for {
res <- res <-
ut.deleteTask(user.account, id) ut.deleteTask(UserTaskScope(user.account), id)
.attempt .attempt
.map(Conversions.basicResult(_, "Deleted successfully.")) .map(Conversions.basicResult(_, "Deleted successfully."))
resp <- Ok(res) resp <- Ok(res)
@ -66,7 +66,7 @@ object ScanMailboxRoutes {
for { for {
task <- makeTask(data.id, user.account, data) task <- makeTask(data.id, user.account, data)
res <- res <-
ut.submitScanMailbox(user.account, task) ut.submitScanMailbox(UserTaskScope(user.account), task)
.attempt .attempt
.map(Conversions.basicResult(_, "Saved successfully.")) .map(Conversions.basicResult(_, "Saved successfully."))
resp <- Ok(res) resp <- Ok(res)
@ -84,14 +84,14 @@ object ScanMailboxRoutes {
newId <- Ident.randomId[F] newId <- Ident.randomId[F]
task <- makeTask(newId, user.account, data) task <- makeTask(newId, user.account, data)
res <- res <-
ut.submitScanMailbox(user.account, task) ut.submitScanMailbox(UserTaskScope(user.account), task)
.attempt .attempt
.map(Conversions.basicResult(_, "Saved successfully.")) .map(Conversions.basicResult(_, "Saved successfully."))
resp <- Ok(res) resp <- Ok(res)
} yield resp } yield resp
case GET -> Root => case GET -> Root =>
ut.getScanMailbox(user.account) ut.getScanMailbox(UserTaskScope(user.account))
.evalMap(task => taskToSettings(user.account, backend, task)) .evalMap(task => taskToSettings(user.account, backend, task))
.compile .compile
.toVector .toVector

View File

@ -12,7 +12,7 @@ import docspell.common._
import docspell.store.qb.DSL._ import docspell.store.qb.DSL._
import docspell.store.qb._ import docspell.store.qb._
import docspell.store.records._ import docspell.store.records._
import docspell.store.usertask.UserTask import docspell.store.usertask.{UserTask, UserTaskScope}
import doobie._ import doobie._
@ -54,15 +54,15 @@ object QUserTask {
) )
).query[RPeriodicTask].option.map(_.map(makeUserTask)) ).query[RPeriodicTask].option.map(_.map(makeUserTask))
def insert(account: AccountId, task: UserTask[String]): ConnectionIO[Int] = def insert(scope: UserTaskScope, task: UserTask[String]): ConnectionIO[Int] =
for { for {
r <- task.toPeriodicTask[ConnectionIO](account) r <- task.toPeriodicTask[ConnectionIO](scope)
n <- RPeriodicTask.insert(r) n <- RPeriodicTask.insert(r)
} yield n } yield n
def update(account: AccountId, task: UserTask[String]): ConnectionIO[Int] = def update(scope: UserTaskScope, task: UserTask[String]): ConnectionIO[Int] =
for { for {
r <- task.toPeriodicTask[ConnectionIO](account) r <- task.toPeriodicTask[ConnectionIO](scope)
n <- RPeriodicTask.update(r) n <- RPeriodicTask.update(r)
} yield n } yield n

View File

@ -13,6 +13,7 @@ import cats.implicits._
import docspell.common._ import docspell.common._
import docspell.store.qb.DSL._ import docspell.store.qb.DSL._
import docspell.store.qb._ import docspell.store.qb._
import docspell.store.usertask.UserTaskScope
import com.github.eikek.calev.CalEvent import com.github.eikek.calev.CalEvent
import doobie._ import doobie._
@ -67,11 +68,10 @@ object RPeriodicTask {
def create[F[_]: Sync]( def create[F[_]: Sync](
enabled: Boolean, enabled: Boolean,
scope: UserTaskScope,
task: Ident, task: Ident,
group: Ident,
args: String, args: String,
subject: String, subject: String,
submitter: Ident,
priority: Priority, priority: Priority,
timer: CalEvent, timer: CalEvent,
summary: Option[String] summary: Option[String]
@ -86,10 +86,10 @@ object RPeriodicTask {
id, id,
enabled, enabled,
task, task,
group, scope.collective,
args, args,
subject, subject,
submitter, scope.fold(_.user, identity),
priority, priority,
None, None,
None, None,
@ -107,22 +107,20 @@ object RPeriodicTask {
def createJson[F[_]: Sync, A]( def createJson[F[_]: Sync, A](
enabled: Boolean, enabled: Boolean,
scope: UserTaskScope,
task: Ident, task: Ident,
group: Ident,
args: A, args: A,
subject: String, subject: String,
submitter: Ident,
priority: Priority, priority: Priority,
timer: CalEvent, timer: CalEvent,
summary: Option[String] summary: Option[String]
)(implicit E: Encoder[A]): F[RPeriodicTask] = )(implicit E: Encoder[A]): F[RPeriodicTask] =
create[F]( create[F](
enabled, enabled,
scope,
task, task,
group,
E(args).noSpaces, E(args).noSpaces,
subject, subject,
submitter,
priority, priority,
timer, timer,
summary summary

View File

@ -43,16 +43,15 @@ object UserTask {
.map(a => ut.copy(args = a)) .map(a => ut.copy(args = a))
def toPeriodicTask[F[_]: Sync]( def toPeriodicTask[F[_]: Sync](
account: AccountId scope: UserTaskScope
): F[RPeriodicTask] = ): F[RPeriodicTask] =
RPeriodicTask RPeriodicTask
.create[F]( .create[F](
ut.enabled, ut.enabled,
scope,
ut.name, ut.name,
account.collective,
ut.args, ut.args,
s"${account.user.id}: ${ut.name.id}", s"${scope.fold(_.user.id, _.id)}: ${ut.name.id}",
account.user,
Priority.Low, Priority.Low,
ut.timer, ut.timer,
ut.summary ut.summary

View File

@ -0,0 +1,52 @@
/*
* Copyright 2020 Docspell Contributors
*
* SPDX-License-Identifier: GPL-3.0-or-later
*/
package docspell.store.usertask
import docspell.common._
sealed trait UserTaskScope { self: Product =>
def name: String =
productPrefix.toLowerCase
def collective: Ident
def fold[A](fa: AccountId => A, fb: Ident => A): A
/** Maps to the account or uses the collective for both parts if the
* scope is collective wide.
*/
private[usertask] def toAccountId: AccountId =
AccountId(collective, fold(_.user, identity))
}
object UserTaskScope {
final case class Account(account: AccountId) extends UserTaskScope {
val collective = account.collective
def fold[A](fa: AccountId => A, fb: Ident => A): A =
fa(account)
}
final case class Collective(collective: Ident) extends UserTaskScope {
def fold[A](fa: AccountId => A, fb: Ident => A): A =
fb(collective)
}
def collective(id: Ident): UserTaskScope =
Collective(id)
def account(accountId: AccountId): UserTaskScope =
Account(accountId)
def apply(accountId: AccountId): UserTaskScope =
UserTaskScope.account(accountId)
def apply(collective: Ident): UserTaskScope =
UserTaskScope.collective(collective)
}

View File

@ -22,13 +22,15 @@ import io.circe._
* once. * once.
* *
* This class defines methods at a higher level, dealing with * This class defines methods at a higher level, dealing with
* `UserTask` and `AccountId` instead of directly using * `UserTask` and `UserTaskScope` instead of directly using
* `RPeriodicTask`. A user task is associated to a specific user (not * `RPeriodicTask`. A user task is associated to a specific user (not
* just the collective). * just the collective). But it can be associated to the whole
* collective by using the collective as submitter, too. This is
* abstracted in `UserTaskScope`.
* *
* implNote: The mapping is as follows: The collective is the task * implNote: The mapping is as follows: The collective is the task
* group. The submitter property contains the username. Once a task * group. The submitter property contains the username. Once a task
* is saved to the database, it can only be refernced uniquely by its * is saved to the database, it can only be referenced uniquely by its
* id. A user may submit multiple same tasks (with different * id. A user may submit multiple same tasks (with different
* properties). * properties).
*/ */
@ -36,22 +38,22 @@ trait UserTaskStore[F[_]] {
/** Return all tasks of the given user. /** Return all tasks of the given user.
*/ */
def getAll(account: AccountId): Stream[F, UserTask[String]] def getAll(scope: UserTaskScope): Stream[F, UserTask[String]]
/** Return all tasks of the given name and user. The task's arguments /** Return all tasks of the given name and user. The task's arguments
* are returned as stored in the database. * are returned as stored in the database.
*/ */
def getByNameRaw(account: AccountId, name: Ident): Stream[F, UserTask[String]] def getByNameRaw(scope: UserTaskScope, name: Ident): Stream[F, UserTask[String]]
/** Return all tasks of the given name and user. The task's arguments /** Return all tasks of the given name and user. The task's arguments
* are decoded using the given json decoder. * are decoded using the given json decoder.
*/ */
def getByName[A](account: AccountId, name: Ident)(implicit def getByName[A](scope: UserTaskScope, name: Ident)(implicit
D: Decoder[A] D: Decoder[A]
): Stream[F, UserTask[A]] ): Stream[F, UserTask[A]]
/** Return a user-task with the given id. */ /** Return a user-task with the given id. */
def getByIdRaw(account: AccountId, id: Ident): OptionT[F, UserTask[String]] def getByIdRaw(scope: UserTaskScope, id: Ident): OptionT[F, UserTask[String]]
/** Updates or inserts the given task. /** Updates or inserts the given task.
* *
@ -59,23 +61,23 @@ trait UserTaskStore[F[_]] {
* exists, a new one is created. Otherwise the existing task is * exists, a new one is created. Otherwise the existing task is
* updated. * updated.
*/ */
def updateTask[A](account: AccountId, ut: UserTask[A])(implicit E: Encoder[A]): F[Int] def updateTask[A](scope: UserTaskScope, ut: UserTask[A])(implicit E: Encoder[A]): F[Int]
/** Delete the task with the given id of the given user. /** Delete the task with the given id of the given user.
*/ */
def deleteTask(account: AccountId, id: Ident): F[Int] def deleteTask(scope: UserTaskScope, id: Ident): F[Int]
/** Return the task of the given user and name. If multiple exists, an /** Return the task of the given user and name. If multiple exists, an
* error is returned. The task's arguments are returned as stored * error is returned. The task's arguments are returned as stored
* in the database. * in the database.
*/ */
def getOneByNameRaw(account: AccountId, name: Ident): OptionT[F, UserTask[String]] def getOneByNameRaw(scope: UserTaskScope, name: Ident): OptionT[F, UserTask[String]]
/** Return the task of the given user and name. If multiple exists, an /** Return the task of the given user and name. If multiple exists, an
* error is returned. The task's arguments are decoded using the * error is returned. The task's arguments are decoded using the
* given json decoder. * given json decoder.
*/ */
def getOneByName[A](account: AccountId, name: Ident)(implicit def getOneByName[A](scope: UserTaskScope, name: Ident)(implicit
D: Decoder[A] D: Decoder[A]
): OptionT[F, UserTask[A]] ): OptionT[F, UserTask[A]]
@ -90,13 +92,13 @@ trait UserTaskStore[F[_]] {
* the user `account`, they will all be removed and the given task * the user `account`, they will all be removed and the given task
* inserted! * inserted!
*/ */
def updateOneTask[A](account: AccountId, ut: UserTask[A])(implicit def updateOneTask[A](scope: UserTaskScope, ut: UserTask[A])(implicit
E: Encoder[A] E: Encoder[A]
): F[UserTask[String]] ): F[UserTask[String]]
/** Delete all tasks of the given user that have name `name'. /** Delete all tasks of the given user that have name `name'.
*/ */
def deleteAll(account: AccountId, name: Ident): F[Int] def deleteAll(scope: UserTaskScope, name: Ident): F[Int]
} }
object UserTaskStore { object UserTaskStore {
@ -104,47 +106,47 @@ object UserTaskStore {
def apply[F[_]: Async](store: Store[F]): Resource[F, UserTaskStore[F]] = def apply[F[_]: Async](store: Store[F]): Resource[F, UserTaskStore[F]] =
Resource.pure[F, UserTaskStore[F]](new UserTaskStore[F] { Resource.pure[F, UserTaskStore[F]](new UserTaskStore[F] {
def getAll(account: AccountId): Stream[F, UserTask[String]] = def getAll(scope: UserTaskScope): Stream[F, UserTask[String]] =
store.transact(QUserTask.findAll(account)) store.transact(QUserTask.findAll(scope.toAccountId))
def getByNameRaw(account: AccountId, name: Ident): Stream[F, UserTask[String]] = def getByNameRaw(scope: UserTaskScope, name: Ident): Stream[F, UserTask[String]] =
store.transact(QUserTask.findByName(account, name)) store.transact(QUserTask.findByName(scope.toAccountId, name))
def getByIdRaw(account: AccountId, id: Ident): OptionT[F, UserTask[String]] = def getByIdRaw(scope: UserTaskScope, id: Ident): OptionT[F, UserTask[String]] =
OptionT(store.transact(QUserTask.findById(account, id))) OptionT(store.transact(QUserTask.findById(scope.toAccountId, id)))
def getByName[A](account: AccountId, name: Ident)(implicit def getByName[A](scope: UserTaskScope, name: Ident)(implicit
D: Decoder[A] D: Decoder[A]
): Stream[F, UserTask[A]] = ): Stream[F, UserTask[A]] =
getByNameRaw(account, name).flatMap(_.decode match { getByNameRaw(scope, name).flatMap(_.decode match {
case Right(ua) => Stream.emit(ua) case Right(ua) => Stream.emit(ua)
case Left(err) => Stream.raiseError[F](new Exception(err)) case Left(err) => Stream.raiseError[F](new Exception(err))
}) })
def updateTask[A](account: AccountId, ut: UserTask[A])(implicit def updateTask[A](scope: UserTaskScope, ut: UserTask[A])(implicit
E: Encoder[A] E: Encoder[A]
): F[Int] = { ): F[Int] = {
val exists = QUserTask.exists(ut.id) val exists = QUserTask.exists(ut.id)
val insert = QUserTask.insert(account, ut.encode) val insert = QUserTask.insert(scope, ut.encode)
store.add(insert, exists).flatMap { store.add(insert, exists).flatMap {
case AddResult.Success => case AddResult.Success =>
1.pure[F] 1.pure[F]
case AddResult.EntityExists(_) => case AddResult.EntityExists(_) =>
store.transact(QUserTask.update(account, ut.encode)) store.transact(QUserTask.update(scope, ut.encode))
case AddResult.Failure(ex) => case AddResult.Failure(ex) =>
Async[F].raiseError(ex) Async[F].raiseError(ex)
} }
} }
def deleteTask(account: AccountId, id: Ident): F[Int] = def deleteTask(scope: UserTaskScope, id: Ident): F[Int] =
store.transact(QUserTask.delete(account, id)) store.transact(QUserTask.delete(scope.toAccountId, id))
def getOneByNameRaw( def getOneByNameRaw(
account: AccountId, scope: UserTaskScope,
name: Ident name: Ident
): OptionT[F, UserTask[String]] = ): OptionT[F, UserTask[String]] =
OptionT( OptionT(
getByNameRaw(account, name) getByNameRaw(scope, name)
.take(2) .take(2)
.compile .compile
.toList .toList
@ -155,32 +157,34 @@ object UserTaskStore {
} }
) )
def getOneByName[A](account: AccountId, name: Ident)(implicit def getOneByName[A](scope: UserTaskScope, name: Ident)(implicit
D: Decoder[A] D: Decoder[A]
): OptionT[F, UserTask[A]] = ): OptionT[F, UserTask[A]] =
getOneByNameRaw(account, name) getOneByNameRaw(scope, name)
.semiflatMap(_.decode match { .semiflatMap(_.decode match {
case Right(ua) => ua.pure[F] case Right(ua) => ua.pure[F]
case Left(err) => Async[F].raiseError(new Exception(err)) case Left(err) => Async[F].raiseError(new Exception(err))
}) })
def updateOneTask[A](account: AccountId, ut: UserTask[A])(implicit def updateOneTask[A](scope: UserTaskScope, ut: UserTask[A])(implicit
E: Encoder[A] E: Encoder[A]
): F[UserTask[String]] = ): F[UserTask[String]] =
getByNameRaw(account, ut.name).compile.toList.flatMap { getByNameRaw(scope, ut.name).compile.toList.flatMap {
case a :: rest => case a :: rest =>
val task = ut.copy(id = a.id).encode val task = ut.copy(id = a.id).encode
for { for {
_ <- store.transact(QUserTask.update(account, task)) _ <- store.transact(QUserTask.update(scope, task))
_ <- store.transact(rest.traverse(t => QUserTask.delete(account, t.id))) _ <- store.transact(
rest.traverse(t => QUserTask.delete(scope.toAccountId, t.id))
)
} yield task } yield task
case Nil => case Nil =>
val task = ut.encode val task = ut.encode
store.transact(QUserTask.insert(account, task)).map(_ => task) store.transact(QUserTask.insert(scope, task)).map(_ => task)
} }
def deleteAll(account: AccountId, name: Ident): F[Int] = def deleteAll(scope: UserTaskScope, name: Ident): F[Int] =
store.transact(QUserTask.deleteAll(account, name)) store.transact(QUserTask.deleteAll(scope.toAccountId, name))
}) })
} }