Convert periodic tasks

This commit is contained in:
Eike Kettner 2020-12-11 23:15:18 +01:00
parent 3cef932ccd
commit 1aa1f4367e
7 changed files with 126 additions and 110 deletions

View File

@ -20,6 +20,8 @@ object Condition {
val P: Put[A] val P: Put[A]
) extends Condition ) extends Condition
case class IsNull(col: Column[_]) extends Condition
case class And(c: Condition, cs: Vector[Condition]) extends Condition case class And(c: Condition, cs: Vector[Condition]) extends Condition
case class Or(c: Condition, cs: Vector[Condition]) extends Condition case class Or(c: Condition, cs: Vector[Condition]) extends Condition
case class Not(c: Condition) extends Condition case class Not(c: Condition) extends Condition

View File

@ -94,14 +94,12 @@ trait DSL extends DoobieMeta {
def ===(value: A)(implicit P: Put[A]): Condition = def ===(value: A)(implicit P: Put[A]): Condition =
Condition.CompareVal(col, Operator.Eq, value) Condition.CompareVal(col, Operator.Eq, value)
//TODO find some better way around the cast
def ====(value: String): Condition = def ====(value: String): Condition =
Condition.CompareVal(col.asInstanceOf[Column[String]], Operator.Eq, value) Condition.CompareVal(col.asInstanceOf[Column[String]], Operator.Eq, value)
def like(value: A)(implicit P: Put[A]): Condition = def like(value: A)(implicit P: Put[A]): Condition =
Condition.CompareVal(col, Operator.LowerLike, value) Condition.CompareVal(col, Operator.LowerLike, value)
//TODO find some better way around the cast
def likes(value: String): Condition = def likes(value: String): Condition =
Condition.CompareVal(col.asInstanceOf[Column[String]], Operator.LowerLike, value) Condition.CompareVal(col.asInstanceOf[Column[String]], Operator.LowerLike, value)
@ -117,6 +115,9 @@ trait DSL extends DoobieMeta {
def <(value: A)(implicit P: Put[A]): Condition = def <(value: A)(implicit P: Put[A]): Condition =
Condition.CompareVal(col, Operator.Lt, value) Condition.CompareVal(col, Operator.Lt, value)
def <>(value: A)(implicit P: Put[A]): Condition =
Condition.CompareVal(col, Operator.Neq, value)
def in(subsel: Select): Condition = def in(subsel: Select): Condition =
Condition.InSubSelect(col, subsel) Condition.InSubSelect(col, subsel)
@ -126,6 +127,9 @@ trait DSL extends DoobieMeta {
def inLower(values: NonEmptyList[A])(implicit P: Put[A]): Condition = def inLower(values: NonEmptyList[A])(implicit P: Put[A]): Condition =
Condition.InValues(col, values, true) Condition.InValues(col, values, true)
def isNull: Condition =
Condition.IsNull(col)
def ===(other: Column[A]): Condition = def ===(other: Column[A]): Condition =
Condition.CompareCol(col, Operator.Eq, other) Condition.CompareCol(col, Operator.Eq, other)
} }

View File

@ -5,6 +5,7 @@ sealed trait Operator
object Operator { object Operator {
case object Eq extends Operator case object Eq extends Operator
case object Neq extends Operator
case object Gt extends Operator case object Gt extends Operator
case object Lt extends Operator case object Lt extends Operator
case object Gte extends Operator case object Gte extends Operator

View File

@ -44,6 +44,9 @@ object ConditionBuilder {
.map(a => buildValue(a)(c.P)) .map(a => buildValue(a)(c.P))
.reduce(_ ++ comma ++ _) ++ sql")" .reduce(_ ++ comma ++ _) ++ sql")"
case Condition.IsNull(col) =>
SelectExprBuilder.column(col) ++ fr" is null"
case Condition.And(c, cs) => case Condition.And(c, cs) =>
val inner = cs.prepended(c).map(build).reduce(_ ++ and ++ _) val inner = cs.prepended(c).map(build).reduce(_ ++ and ++ _)
if (cs.isEmpty) inner if (cs.isEmpty) inner
@ -54,6 +57,9 @@ object ConditionBuilder {
if (cs.isEmpty) inner if (cs.isEmpty) inner
else parenOpen ++ inner ++ parenClose else parenOpen ++ inner ++ parenClose
case Condition.Not(Condition.IsNull(col)) =>
SelectExprBuilder.column(col) ++ fr" is not null"
case Condition.Not(c) => case Condition.Not(c) =>
fr"NOT" ++ build(c) fr"NOT" ++ build(c)
} }
@ -62,6 +68,8 @@ object ConditionBuilder {
op match { op match {
case Operator.Eq => case Operator.Eq =>
fr" =" fr" ="
case Operator.Neq =>
fr" <>"
case Operator.Gt => case Operator.Gt =>
fr" >" fr" >"
case Operator.Lt => case Operator.Lt =>

View File

@ -1,7 +1,8 @@
package docspell.store.queries package docspell.store.queries
import docspell.common._ import docspell.common._
import docspell.store.impl.Implicits._ import docspell.store.qb.DSL._
import docspell.store.qb._
import docspell.store.records._ import docspell.store.records._
import doobie._ import doobie._
@ -9,47 +10,47 @@ import doobie.implicits._
object QPeriodicTask { object QPeriodicTask {
def clearWorkers(name: Ident): ConnectionIO[Int] = { private val RT = RPeriodicTask.T
val worker = RPeriodicTask.Columns.worker
updateRow(RPeriodicTask.table, worker.is(name), worker.setTo[Ident](None)).update.run
}
def setWorker(pid: Ident, name: Ident, ts: Timestamp): ConnectionIO[Int] = { def clearWorkers(name: Ident): ConnectionIO[Int] =
val id = RPeriodicTask.Columns.id DML.update(
val worker = RPeriodicTask.Columns.worker RT,
val marked = RPeriodicTask.Columns.marked RT.worker === name,
updateRow( DML.set(RT.worker.setTo(None: Option[Ident]))
RPeriodicTask.table, )
and(id.is(pid), worker.isNull),
commas(worker.setTo(name), marked.setTo(ts)) def setWorker(pid: Ident, name: Ident, ts: Timestamp): ConnectionIO[Int] =
).update.run DML
} .update(
RT,
RT.id === pid && RT.worker.isNull,
DML.set(
RT.worker.setTo(name),
RT.marked.setTo(ts)
)
)
def unsetWorker( def unsetWorker(
pid: Ident, pid: Ident,
nextRun: Option[Timestamp] nextRun: Option[Timestamp]
): ConnectionIO[Int] = { ): ConnectionIO[Int] =
val id = RPeriodicTask.Columns.id DML.update(
val worker = RPeriodicTask.Columns.worker RT,
val next = RPeriodicTask.Columns.nextrun RT.id === pid,
updateRow( DML.set(
RPeriodicTask.table, RT.worker.setTo(None),
id.is(pid), RT.nextrun.setTo(nextRun)
commas(worker.setTo[Ident](None), next.setTo(nextRun)) )
).update.run )
}
def findNext(excl: Option[Ident]): ConnectionIO[Option[RPeriodicTask]] = { def findNext(excl: Option[Ident]): ConnectionIO[Option[RPeriodicTask]] = {
val enabled = RPeriodicTask.Columns.enabled
val pid = RPeriodicTask.Columns.id
val order = orderBy(RPeriodicTask.Columns.nextrun.f) ++ fr"ASC"
val where = excl match { val where = excl match {
case Some(id) => and(pid.isNot(id), enabled.is(true)) case Some(id) => RT.id <> id && RT.enabled === true
case None => enabled.is(true) case None => RT.enabled === true
} }
val sql = val sql =
selectSimple(RPeriodicTask.Columns.all, RPeriodicTask.table, where) ++ order Select(select(RT.all), from(RT), where).orderBy(RT.nextrun.asc).run
sql.query[RPeriodicTask].streamWithChunkSize(2).take(1).compile.last sql.query[RPeriodicTask].streamWithChunkSize(2).take(1).compile.last
} }
} }

View File

@ -3,33 +3,34 @@ package docspell.store.queries
import fs2._ import fs2._
import docspell.common._ import docspell.common._
import docspell.store.impl.Implicits._ import docspell.store.qb.DSL._
import docspell.store.qb._
import docspell.store.records._ import docspell.store.records._
import docspell.store.usertask.UserTask import docspell.store.usertask.UserTask
import doobie._ import doobie._
object QUserTask { object QUserTask {
private val cols = RPeriodicTask.Columns private val RT = RPeriodicTask.T
def findAll(account: AccountId): Stream[ConnectionIO, UserTask[String]] = def findAll(account: AccountId): Stream[ConnectionIO, UserTask[String]] =
selectSimple( run(
RPeriodicTask.Columns.all, select(RT.all),
RPeriodicTask.table, from(RT),
and(cols.group.is(account.collective), cols.submitter.is(account.user)) RT.group === account.collective && RT.submitter === account.user
).query[RPeriodicTask].stream.map(makeUserTask) ).query[RPeriodicTask].stream.map(makeUserTask)
def findByName( def findByName(
account: AccountId, account: AccountId,
name: Ident name: Ident
): Stream[ConnectionIO, UserTask[String]] = ): Stream[ConnectionIO, UserTask[String]] =
selectSimple( run(
RPeriodicTask.Columns.all, select(RT.all),
RPeriodicTask.table, from(RT),
and( where(
cols.group.is(account.collective), RT.group === account.collective,
cols.submitter.is(account.user), RT.submitter === account.user,
cols.task.is(name) RT.task === name
) )
).query[RPeriodicTask].stream.map(makeUserTask) ).query[RPeriodicTask].stream.map(makeUserTask)
@ -37,13 +38,13 @@ object QUserTask {
account: AccountId, account: AccountId,
id: Ident id: Ident
): ConnectionIO[Option[UserTask[String]]] = ): ConnectionIO[Option[UserTask[String]]] =
selectSimple( run(
RPeriodicTask.Columns.all, select(RT.all),
RPeriodicTask.table, from(RT),
and( where(
cols.group.is(account.collective), RT.group === account.collective,
cols.submitter.is(account.user), RT.submitter === account.user,
cols.id.is(id) RT.id === id
) )
).query[RPeriodicTask].option.map(_.map(makeUserTask)) ).query[RPeriodicTask].option.map(_.map(makeUserTask))
@ -63,24 +64,25 @@ object QUserTask {
RPeriodicTask.exists(id) RPeriodicTask.exists(id)
def delete(account: AccountId, id: Ident): ConnectionIO[Int] = def delete(account: AccountId, id: Ident): ConnectionIO[Int] =
deleteFrom( DML
RPeriodicTask.table, .delete(
and( RT,
cols.group.is(account.collective), where(
cols.submitter.is(account.user), RT.group === account.collective,
cols.id.is(id) RT.submitter === account.user,
RT.id === id
)
) )
).update.run
def deleteAll(account: AccountId, name: Ident): ConnectionIO[Int] = def deleteAll(account: AccountId, name: Ident): ConnectionIO[Int] =
deleteFrom( DML.delete(
RPeriodicTask.table, RT,
and( where(
cols.group.is(account.collective), RT.group === account.collective,
cols.submitter.is(account.user), RT.submitter === account.user,
cols.task.is(name) RT.task === name
) )
).update.run )
def makeUserTask(r: RPeriodicTask): UserTask[String] = def makeUserTask(r: RPeriodicTask): UserTask[String] =
UserTask(r.id, r.task, r.enabled, r.timer, r.args) UserTask(r.id, r.task, r.enabled, r.timer, r.args)

View File

@ -4,8 +4,8 @@ import cats.effect._
import cats.implicits._ import cats.implicits._
import docspell.common._ import docspell.common._
import docspell.store.impl.Column import docspell.store.qb.DSL._
import docspell.store.impl.Implicits._ import docspell.store.qb._
import com.github.eikek.calev.CalEvent import com.github.eikek.calev.CalEvent
import doobie._ import doobie._
@ -107,22 +107,22 @@ object RPeriodicTask {
)(implicit E: Encoder[A]): F[RPeriodicTask] = )(implicit E: Encoder[A]): F[RPeriodicTask] =
create[F](enabled, task, group, E(args).noSpaces, subject, submitter, priority, timer) create[F](enabled, task, group, E(args).noSpaces, subject, submitter, priority, timer)
val table = fr"periodic_task" final case class Table(alias: Option[String]) extends TableDef {
val tableName = "periodic_task"
object Columns { val id = Column[Ident]("id", this)
val id = Column("id") val enabled = Column[Boolean]("enabled", this)
val enabled = Column("enabled") val task = Column[Ident]("task", this)
val task = Column("task") val group = Column[Ident]("group_", this)
val group = Column("group_") val args = Column[String]("args", this)
val args = Column("args") val subject = Column[String]("subject", this)
val subject = Column("subject") val submitter = Column[Ident]("submitter", this)
val submitter = Column("submitter") val priority = Column[Priority]("priority", this)
val priority = Column("priority") val worker = Column[Ident]("worker", this)
val worker = Column("worker") val marked = Column[Timestamp]("marked", this)
val marked = Column("marked") val timer = Column[CalEvent]("timer", this)
val timer = Column("timer") val nextrun = Column[Timestamp]("nextrun", this)
val nextrun = Column("nextrun") val created = Column[Timestamp]("created", this)
val created = Column("created")
val all = List( val all = List(
id, id,
enabled, enabled,
@ -140,39 +140,37 @@ object RPeriodicTask {
) )
} }
import Columns._ val T = Table(None)
def as(alias: String): Table =
Table(Some(alias))
def insert(v: RPeriodicTask): ConnectionIO[Int] = { def insert(v: RPeriodicTask): ConnectionIO[Int] =
val sql = insertRow( DML.insert(
table, T,
all, T.all,
fr"${v.id},${v.enabled},${v.task},${v.group},${v.args}," ++ fr"${v.id},${v.enabled},${v.task},${v.group},${v.args}," ++
fr"${v.subject},${v.submitter},${v.priority},${v.worker}," ++ fr"${v.subject},${v.submitter},${v.priority},${v.worker}," ++
fr"${v.marked},${v.timer},${v.nextrun},${v.created}" fr"${v.marked},${v.timer},${v.nextrun},${v.created}"
) )
sql.update.run
}
def update(v: RPeriodicTask): ConnectionIO[Int] = { def update(v: RPeriodicTask): ConnectionIO[Int] =
val sql = updateRow( DML.update(
table, T,
id.is(v.id), T.id === v.id,
commas( DML.set(
enabled.setTo(v.enabled), T.enabled.setTo(v.enabled),
group.setTo(v.group), T.group.setTo(v.group),
args.setTo(v.args), T.args.setTo(v.args),
subject.setTo(v.subject), T.subject.setTo(v.subject),
submitter.setTo(v.submitter), T.submitter.setTo(v.submitter),
priority.setTo(v.priority), T.priority.setTo(v.priority),
worker.setTo(v.worker), T.worker.setTo(v.worker),
marked.setTo(v.marked), T.marked.setTo(v.marked),
timer.setTo(v.timer), T.timer.setTo(v.timer),
nextrun.setTo(v.nextrun) T.nextrun.setTo(v.nextrun)
) )
) )
sql.update.run
}
def exists(pid: Ident): ConnectionIO[Boolean] = def exists(pid: Ident): ConnectionIO[Boolean] =
selectCount(id, table, id.is(pid)).query[Int].unique.map(_ > 0) run(select(count(T.id)), from(T), T.id === pid).query[Int].unique.map(_ > 0)
} }