Server-side stub impl for notify-due-items

This commit is contained in:
Eike Kettner
2020-04-19 20:31:28 +02:00
parent e97e0db45c
commit ad772c0c25
16 changed files with 562 additions and 37 deletions

View File

@ -0,0 +1,89 @@
package docspell.store.queries
import fs2._
import docspell.common._
import docspell.store.impl.Implicits._
import docspell.store.records._
import docspell.store.usertask.UserTask
import doobie._
object QUserTask {
private val cols = RPeriodicTask.Columns
def findAll(account: AccountId): Stream[ConnectionIO, UserTask[String]] =
selectSimple(
RPeriodicTask.Columns.all,
RPeriodicTask.table,
and(cols.group.is(account.collective), cols.submitter.is(account.user))
).query[RPeriodicTask].stream.map(makeUserTask)
def findByName(
account: AccountId,
name: Ident
): Stream[ConnectionIO, UserTask[String]] =
selectSimple(
RPeriodicTask.Columns.all,
RPeriodicTask.table,
and(
cols.group.is(account.collective),
cols.submitter.is(account.user),
cols.task.is(name)
)
).query[RPeriodicTask].stream.map(makeUserTask)
def insert(account: AccountId, task: UserTask[String]): ConnectionIO[Int] =
for {
r <- makePeriodicTask(account, task)
n <- RPeriodicTask.insert(r)
} yield n
def update(account: AccountId, task: UserTask[String]): ConnectionIO[Int] =
for {
r <- makePeriodicTask(account, task)
n <- RPeriodicTask.update(r)
} yield n
def exists(id: Ident): ConnectionIO[Boolean] =
RPeriodicTask.exists(id)
def delete(account: AccountId, id: Ident): ConnectionIO[Int] =
deleteFrom(
RPeriodicTask.table,
and(
cols.group.is(account.collective),
cols.submitter.is(account.user),
cols.id.is(id)
)
).update.run
def deleteAll(account: AccountId, name: Ident): ConnectionIO[Int] =
deleteFrom(
RPeriodicTask.table,
and(
cols.group.is(account.collective),
cols.submitter.is(account.user),
cols.task.is(name)
)
).update.run
def makeUserTask(r: RPeriodicTask): UserTask[String] =
UserTask(r.id, r.task, r.enabled, r.timer, r.args)
def makePeriodicTask(
account: AccountId,
t: UserTask[String]
): ConnectionIO[RPeriodicTask] =
RPeriodicTask
.create[ConnectionIO](
t.enabled,
t.name,
account.collective,
t.args,
s"${account.user.id}: ${t.name.id}",
account.user,
Priority.Low,
t.timer
)
.map(r => r.copy(id = t.id))
}

View File

@ -0,0 +1,33 @@
package docspell.store.usertask
import com.github.eikek.calev.CalEvent
import io.circe.Decoder
import io.circe.Encoder
import docspell.common._
import docspell.common.syntax.all._
case class UserTask[A](
id: Ident,
name: Ident,
enabled: Boolean,
timer: CalEvent,
args: A
) {
def encode(implicit E: Encoder[A]): UserTask[String] =
copy(args = E(args).noSpaces)
}
object UserTask {
implicit final class UserTaskCodec(ut: UserTask[String]) {
def decode[A](implicit D: Decoder[A]): Either[String, UserTask[A]] =
ut.args.parseJsonAs[A]
.left.map(_.getMessage)
.map(a => ut.copy(args = a))
}
}

View File

@ -0,0 +1,173 @@
package docspell.store.usertask
import fs2._
import cats.implicits._
import cats.effect._
import cats.data.OptionT
import _root_.io.circe._
import docspell.common._
import docspell.store.{AddResult, Store}
import docspell.store.queries.QUserTask
/** User tasks are `RPeriodicTask`s that can be managed by the user.
* The user can change arguments, enable/disable it or run it just
* once.
*
* This class defines methods at a higher level, dealing with
* `UserTask` and `AccountId` instead of directly using
* `RPeriodicTask`. A user task is associated to a specific user (not
* just the collective).
*
* @implNote: The mapping is as follows: The collective is the task
* group. The submitter property contains the username. Once a task
* is saved to the database, it can only be refernced uniquely by its
* id. A user may submit multiple same tasks (with different
* properties).
*/
trait UserTaskStore[F[_]] {
/** Return all tasks of the given user.
*/
def getAll(account: AccountId): Stream[F, UserTask[String]]
/** Return all tasks of the given name and user. The task's arguments
* are returned as stored in the database.
*/
def getByNameRaw(account: AccountId, name: Ident): Stream[F, UserTask[String]]
/** Return all tasks of the given name and user. The task's arguments
* are decoded using the given json decoder.
*/
def getByName[A](account: AccountId, name: Ident)(
implicit D: Decoder[A]
): Stream[F, UserTask[A]]
/** Updates or inserts the given task.
*
* The task is identified by its id. If no task with this id
* exists, a new one is created. Otherwise the existing task is
* updated. The job executors are notified if a task has been
* enabled.
*/
def updateTask[A](account: AccountId, ut: UserTask[A])(implicit E: Encoder[A]): F[Int]
/** Delete the task with the given id of the given user.
*/
def deleteTask(account: AccountId, id: Ident): F[Int]
/** Return the task of the given user and name. If multiple exists, an
* error is returned. The task's arguments are returned as stored
* in the database.
*/
def getOneByNameRaw(account: AccountId, name: Ident): OptionT[F, UserTask[String]]
/** Return the task of the given user and name. If multiple exists, an
* error is returned. The task's arguments are decoded using the
* given json decoder.
*/
def getOneByName[A](account: AccountId, name: Ident)(
implicit D: Decoder[A]
): OptionT[F, UserTask[A]]
/** Updates or inserts the given task.
*
* Unlike `updateTask`, this ensures that there is at most one task
* of some name in the db. Multiple same tasks (task with same
* name) may not be allowed to run, dependening on what they do.
* This is not ensured by the database, though.
*
* If there are currently mutliple tasks with same name as `ut` for
* the user `account`, they will all be removed and the given task
* inserted!
*/
def updateOneTask[A](account: AccountId, ut: UserTask[A])(
implicit E: Encoder[A]
): F[UserTask[String]]
/** Delete all tasks of the given user that have name `name'.
*/
def deleteAll(account: AccountId, name: Ident): F[Int]
}
object UserTaskStore {
def apply[F[_]: Effect](store: Store[F]): Resource[F, UserTaskStore[F]] =
Resource.pure[F, UserTaskStore[F]](new UserTaskStore[F] {
def getAll(account: AccountId): Stream[F, UserTask[String]] =
store.transact(QUserTask.findAll(account))
def getByNameRaw(account: AccountId, name: Ident): Stream[F, UserTask[String]] =
store.transact(QUserTask.findByName(account, name))
def getByName[A](account: AccountId, name: Ident)(
implicit D: Decoder[A]
): Stream[F, UserTask[A]] =
getByNameRaw(account, name).flatMap(_.decode match {
case Right(ua) => Stream.emit(ua)
case Left(err) => Stream.raiseError[F](new Exception(err))
})
def updateTask[A](account: AccountId, ut: UserTask[A])(
implicit E: Encoder[A]
): F[Int] = {
val exists = QUserTask.exists(ut.id)
val insert = QUserTask.insert(account, ut.encode)
store.add(insert, exists).flatMap {
case AddResult.Success =>
1.pure[F]
case AddResult.EntityExists(_) =>
store.transact(QUserTask.update(account, ut.encode))
case AddResult.Failure(ex) =>
Effect[F].raiseError(ex)
}
}
def deleteTask(account: AccountId, id: Ident): F[Int] =
store.transact(QUserTask.delete(account, id))
def getOneByNameRaw(
account: AccountId,
name: Ident
): OptionT[F, UserTask[String]] =
OptionT(
getByNameRaw(account, name)
.take(2)
.compile
.toList
.flatMap {
case Nil => (None: Option[UserTask[String]]).pure[F]
case ut :: Nil => ut.some.pure[F]
case _ => Effect[F].raiseError(new Exception("More than one result found"))
}
)
def getOneByName[A](account: AccountId, name: Ident)(
implicit D: Decoder[A]
): OptionT[F, UserTask[A]] =
getOneByNameRaw(account, name)
.semiflatMap(_.decode match {
case Right(ua) => ua.pure[F]
case Left(err) => Effect[F].raiseError(new Exception(err))
})
def updateOneTask[A](account: AccountId, ut: UserTask[A])(
implicit E: Encoder[A]
): F[UserTask[String]] =
getByNameRaw(account, ut.name).compile.toList.flatMap {
case a :: rest =>
val task = ut.copy(id = a.id).encode
for {
_ <- store.transact(QUserTask.update(account, task))
_ <- store.transact(rest.traverse(t => QUserTask.delete(account, t.id)))
} yield task
case Nil =>
val task = ut.encode
store.transact(QUserTask.insert(account, task)).map(_ => task)
}
def deleteAll(account: AccountId, name: Ident): F[Int] =
store.transact(QUserTask.deleteAll(account, name))
})
}