Move user task to scheduler module

This commit is contained in:
eikek
2022-03-12 15:45:21 +01:00
parent 83d3644b39
commit 69765f05ff
15 changed files with 87 additions and 116 deletions

View File

@ -1,104 +0,0 @@
/*
* Copyright 2020 Eike K. & Contributors
*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
package docspell.store.queries
import fs2._
import docspell.common._
import docspell.store.qb.DSL._
import docspell.store.qb._
import docspell.store.records._
import docspell.store.usertask.{UserTask, UserTaskScope}
import doobie._
object QUserTask {
private val RT = RPeriodicTask.T
def findAll(account: AccountId): Stream[ConnectionIO, UserTask[String]] =
run(
select(RT.all),
from(RT),
RT.group === account.collective && RT.submitter === account.user
).query[RPeriodicTask].stream.map(makeUserTask)
def findByName(
account: AccountId,
name: Ident
): Stream[ConnectionIO, UserTask[String]] =
run(
select(RT.all),
from(RT),
where(
RT.group === account.collective,
RT.submitter === account.user,
RT.task === name
)
).query[RPeriodicTask].stream.map(makeUserTask)
def findById(
account: AccountId,
id: Ident
): ConnectionIO[Option[UserTask[String]]] =
run(
select(RT.all),
from(RT),
where(
RT.group === account.collective,
RT.submitter === account.user,
RT.id === id
)
).query[RPeriodicTask].option.map(_.map(makeUserTask))
def insert(
scope: UserTaskScope,
subject: Option[String],
task: UserTask[String]
): ConnectionIO[Int] =
for {
r <- task.toPeriodicTask[ConnectionIO](scope, subject)
n <- RPeriodicTask.insert(r)
} yield n
def update(
scope: UserTaskScope,
subject: Option[String],
task: UserTask[String]
): ConnectionIO[Int] =
for {
r <- task.toPeriodicTask[ConnectionIO](scope, subject)
n <- RPeriodicTask.update(r)
} yield n
def exists(id: Ident): ConnectionIO[Boolean] =
RPeriodicTask.exists(id)
def delete(account: AccountId, id: Ident): ConnectionIO[Int] =
DML
.delete(
RT,
where(
RT.group === account.collective,
RT.submitter === account.user,
RT.id === id
)
)
def deleteAll(account: AccountId, name: Ident): ConnectionIO[Int] =
DML.delete(
RT,
where(
RT.group === account.collective,
RT.submitter === account.user,
RT.task === name
)
)
def makeUserTask(r: RPeriodicTask): UserTask[String] =
UserTask(r.id, r.task, r.enabled, r.timer, r.summary, r.args)
}

View File

@ -9,16 +9,12 @@ package docspell.store.records
import cats.data.NonEmptyList
import cats.effect._
import cats.implicits._
import docspell.common._
import docspell.store.qb.DSL._
import docspell.store.qb._
import docspell.store.usertask.UserTaskScope
import com.github.eikek.calev.CalEvent
import doobie._
import doobie.implicits._
import io.circe.Encoder
/** A periodic task is a special job description, that shares a few properties of a
* `RJob`. It must provide all information to create a `RJob` value eventually.
@ -65,66 +61,6 @@ case class RPeriodicTask(
object RPeriodicTask {
def create[F[_]: Sync](
enabled: Boolean,
scope: UserTaskScope,
task: Ident,
args: String,
subject: String,
priority: Priority,
timer: CalEvent,
summary: Option[String]
): F[RPeriodicTask] =
Ident
.randomId[F]
.flatMap(id =>
Timestamp
.current[F]
.map { now =>
RPeriodicTask(
id,
enabled,
task,
scope.collective,
args,
subject,
scope.fold(_.user, identity),
priority,
None,
None,
timer,
timer
.nextElapse(now.atZone(Timestamp.UTC))
.map(_.toInstant)
.map(Timestamp.apply)
.getOrElse(Timestamp.Epoch),
now,
summary
)
}
)
def createJson[F[_]: Sync, A](
enabled: Boolean,
scope: UserTaskScope,
task: Ident,
args: A,
subject: String,
priority: Priority,
timer: CalEvent,
summary: Option[String]
)(implicit E: Encoder[A]): F[RPeriodicTask] =
create[F](
enabled,
scope,
task,
E(args).noSpaces,
subject,
priority,
timer,
summary
)
final case class Table(alias: Option[String]) extends TableDef {
val tableName = "periodic_task"

View File

@ -1,67 +0,0 @@
/*
* Copyright 2020 Eike K. & Contributors
*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
package docspell.store.usertask
import cats.effect._
import cats.implicits._
import docspell.common._
import docspell.common.syntax.all._
import docspell.store.records.RPeriodicTask
import com.github.eikek.calev.CalEvent
import io.circe.Decoder
import io.circe.Encoder
case class UserTask[A](
id: Ident,
name: Ident,
enabled: Boolean,
timer: CalEvent,
summary: Option[String],
args: A
) {
def encode(implicit E: Encoder[A]): UserTask[String] =
copy(args = E(args).noSpaces)
def withArgs[B](newArgs: B): UserTask[B] =
copy(args = newArgs)
def mapArgs[B](f: A => B): UserTask[B] =
withArgs(f(args))
}
object UserTask {
implicit final class UserTaskCodec(ut: UserTask[String]) {
def decode[A](implicit D: Decoder[A]): Either[String, UserTask[A]] =
ut.args
.parseJsonAs[A]
.left
.map(_.getMessage)
.map(a => ut.copy(args = a))
def toPeriodicTask[F[_]: Sync](
scope: UserTaskScope,
subject: Option[String]
): F[RPeriodicTask] =
RPeriodicTask
.create[F](
ut.enabled,
scope,
ut.name,
ut.args,
subject.getOrElse(s"${scope.fold(_.user.id, _.id)}: ${ut.name.id}"),
Priority.Low,
ut.timer,
ut.summary
)
.map(r => r.copy(id = ut.id))
}
}

View File

@ -1,52 +0,0 @@
/*
* Copyright 2020 Eike K. & Contributors
*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
package docspell.store.usertask
import docspell.common._
sealed trait UserTaskScope { self: Product =>
def name: String =
productPrefix.toLowerCase
def collective: Ident
def fold[A](fa: AccountId => A, fb: Ident => A): A
/** Maps to the account or uses the collective for both parts if the scope is collective
* wide.
*/
private[usertask] def toAccountId: AccountId =
AccountId(collective, fold(_.user, identity))
}
object UserTaskScope {
final case class Account(account: AccountId) extends UserTaskScope {
val collective = account.collective
def fold[A](fa: AccountId => A, fb: Ident => A): A =
fa(account)
}
final case class Collective(collective: Ident) extends UserTaskScope {
def fold[A](fa: AccountId => A, fb: Ident => A): A =
fb(collective)
}
def collective(id: Ident): UserTaskScope =
Collective(id)
def account(accountId: AccountId): UserTaskScope =
Account(accountId)
def apply(accountId: AccountId): UserTaskScope =
UserTaskScope.account(accountId)
def apply(collective: Ident): UserTaskScope =
UserTaskScope.collective(collective)
}

View File

@ -1,186 +0,0 @@
/*
* Copyright 2020 Eike K. & Contributors
*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
package docspell.store.usertask
import cats.data.OptionT
import cats.effect._
import cats.implicits._
import fs2.Stream
import docspell.common._
import docspell.store.queries.QUserTask
import docspell.store.{AddResult, Store}
import io.circe._
/** User tasks are `RPeriodicTask`s that can be managed by the user. The user can change
* arguments, enable/disable it or run it just once.
*
* This class defines methods at a higher level, dealing with `UserTask` and
* `UserTaskScope` instead of directly using `RPeriodicTask`. A user task is associated
* to a specific user (not just the collective). But it can be associated to the whole
* collective by using the collective as submitter, too. This is abstracted in
* `UserTaskScope`.
*
* implNote: The mapping is as follows: The collective is the task group. The submitter
* property contains the username. Once a task is saved to the database, it can only be
* referenced uniquely by its id. A user may submit multiple same tasks (with different
* properties).
*/
trait UserTaskStore[F[_]] {
/** Return all tasks of the given user. */
def getAll(scope: UserTaskScope): Stream[F, UserTask[String]]
/** Return all tasks of the given name and user. The task's arguments are returned as
* stored in the database.
*/
def getByNameRaw(scope: UserTaskScope, name: Ident): Stream[F, UserTask[String]]
/** Return all tasks of the given name and user. The task's arguments are decoded using
* the given json decoder.
*/
def getByName[A](scope: UserTaskScope, name: Ident)(implicit
D: Decoder[A]
): Stream[F, UserTask[A]]
/** Return a user-task with the given id. */
def getByIdRaw(scope: UserTaskScope, id: Ident): OptionT[F, UserTask[String]]
/** Updates or inserts the given task.
*
* The task is identified by its id. If no task with this id exists, a new one is
* created. Otherwise the existing task is updated.
*/
def updateTask[A](scope: UserTaskScope, subject: Option[String], ut: UserTask[A])(
implicit E: Encoder[A]
): F[Int]
/** Delete the task with the given id of the given user. */
def deleteTask(scope: UserTaskScope, id: Ident): F[Int]
/** Return the task of the given user and name. If multiple exists, an error is
* returned. The task's arguments are returned as stored in the database.
*/
def getOneByNameRaw(scope: UserTaskScope, name: Ident): OptionT[F, UserTask[String]]
/** Return the task of the given user and name. If multiple exists, an error is
* returned. The task's arguments are decoded using the given json decoder.
*/
def getOneByName[A](scope: UserTaskScope, name: Ident)(implicit
D: Decoder[A]
): OptionT[F, UserTask[A]]
/** Updates or inserts the given task.
*
* Unlike `updateTask`, this ensures that there is at most one task of some name in the
* db. Multiple same tasks (task with same name) may not be allowed to run, depending
* on what they do. This is not ensured by the database, though. The task is identified
* by task name, submitter and group.
*
* If there are currently multiple tasks with same name as `ut` for the user `account`,
* they will all be removed and the given task inserted!
*/
def updateOneTask[A](scope: UserTaskScope, subject: Option[String], ut: UserTask[A])(
implicit E: Encoder[A]
): F[UserTask[String]]
/** Delete all tasks of the given user that have name `name`. */
def deleteAll(scope: UserTaskScope, name: Ident): F[Int]
}
object UserTaskStore {
def apply[F[_]: Async](store: Store[F]): Resource[F, UserTaskStore[F]] =
Resource.pure[F, UserTaskStore[F]](new UserTaskStore[F] {
def getAll(scope: UserTaskScope): Stream[F, UserTask[String]] =
store.transact(QUserTask.findAll(scope.toAccountId))
def getByNameRaw(scope: UserTaskScope, name: Ident): Stream[F, UserTask[String]] =
store.transact(QUserTask.findByName(scope.toAccountId, name))
def getByIdRaw(scope: UserTaskScope, id: Ident): OptionT[F, UserTask[String]] =
OptionT(store.transact(QUserTask.findById(scope.toAccountId, id)))
def getByName[A](scope: UserTaskScope, name: Ident)(implicit
D: Decoder[A]
): Stream[F, UserTask[A]] =
getByNameRaw(scope, name).flatMap(_.decode match {
case Right(ua) => Stream.emit(ua)
case Left(err) => Stream.raiseError[F](new Exception(err))
})
def updateTask[A](scope: UserTaskScope, subject: Option[String], ut: UserTask[A])(
implicit E: Encoder[A]
): F[Int] = {
val exists = QUserTask.exists(ut.id)
val insert = QUserTask.insert(scope, subject, ut.encode)
store.add(insert, exists).flatMap {
case AddResult.Success =>
1.pure[F]
case AddResult.EntityExists(_) =>
store.transact(QUserTask.update(scope, subject, ut.encode))
case AddResult.Failure(ex) =>
Async[F].raiseError(ex)
}
}
def deleteTask(scope: UserTaskScope, id: Ident): F[Int] =
store.transact(QUserTask.delete(scope.toAccountId, id))
def getOneByNameRaw(
scope: UserTaskScope,
name: Ident
): OptionT[F, UserTask[String]] =
OptionT(
getByNameRaw(scope, name)
.take(2)
.compile
.toList
.flatMap {
case Nil => (None: Option[UserTask[String]]).pure[F]
case ut :: Nil => ut.some.pure[F]
case _ => Async[F].raiseError(new Exception("More than one result found"))
}
)
def getOneByName[A](scope: UserTaskScope, name: Ident)(implicit
D: Decoder[A]
): OptionT[F, UserTask[A]] =
getOneByNameRaw(scope, name)
.semiflatMap(_.decode match {
case Right(ua) => ua.pure[F]
case Left(err) => Async[F].raiseError(new Exception(err))
})
def updateOneTask[A](
scope: UserTaskScope,
subject: Option[String],
ut: UserTask[A]
)(implicit
E: Encoder[A]
): F[UserTask[String]] =
getByNameRaw(scope, ut.name).compile.toList.flatMap {
case a :: rest =>
val task = ut.copy(id = a.id).encode
for {
_ <- store.transact(QUserTask.update(scope, subject, task))
_ <- store.transact(
rest.traverse(t => QUserTask.delete(scope.toAccountId, t.id))
)
} yield task
case Nil =>
val task = ut.encode
store.transact(QUserTask.insert(scope, subject, task)).map(_ => task)
}
def deleteAll(scope: UserTaskScope, name: Ident): F[Int] =
store.transact(QUserTask.deleteAll(scope.toAccountId, name))
})
}