From e3f6892abd7b505524f03946ce8a3cdd1448216b Mon Sep 17 00:00:00 2001 From: Eike Kettner Date: Sat, 12 Dec 2020 01:56:06 +0100 Subject: [PATCH] Convert job record --- .../scala/docspell/backend/ops/OJob.scala | 2 +- .../main/scala/docspell/common/JobState.scala | 22 +- .../docspell/joex/analysis/RegexNerFile.scala | 2 +- .../scala/docspell/store/impl/Implicits.scala | 11 +- .../main/scala/docspell/store/qb/Column.scala | 5 +- .../scala/docspell/store/qb/Condition.scala | 4 + .../scala/docspell/store/qb/DBFunction.scala | 38 ++- .../main/scala/docspell/store/qb/DSL.scala | 64 ++++- .../scala/docspell/store/qb/GroupBy.scala | 4 +- .../main/scala/docspell/store/qb/Select.scala | 15 +- .../scala/docspell/store/qb/SelectExpr.scala | 22 +- .../store/qb/impl/CommonBuilder.scala | 20 ++ .../store/qb/impl/ConditionBuilder.scala | 20 +- .../store/qb/impl/DBFunctionBuilder.scala | 40 +++ .../docspell/store/qb/impl/DoobieQuery.scala | 2 + .../store/qb/impl/SelectExprBuilder.scala | 25 +- .../scala/docspell/store/queries/QJob.scala | 133 +++++----- .../scala/docspell/store/records/RJob.scala | 249 +++++++++--------- .../docspell/store/records/RJobGroupUse.scala | 24 +- .../docspell/store/records/RJobLog.scala | 40 +-- 20 files changed, 445 insertions(+), 297 deletions(-) create mode 100644 modules/store/src/main/scala/docspell/store/qb/impl/CommonBuilder.scala create mode 100644 modules/store/src/main/scala/docspell/store/qb/impl/DBFunctionBuilder.scala diff --git a/modules/backend/src/main/scala/docspell/backend/ops/OJob.scala b/modules/backend/src/main/scala/docspell/backend/ops/OJob.scala index 4b83a30a..6809f259 100644 --- a/modules/backend/src/main/scala/docspell/backend/ops/OJob.scala +++ b/modules/backend/src/main/scala/docspell/backend/ops/OJob.scala @@ -39,7 +39,7 @@ object OJob { def queued: Vector[JobDetail] = jobs.filter(r => JobState.queued.contains(r.job.state)) def done: Vector[JobDetail] = - jobs.filter(r => JobState.done.contains(r.job.state)) + jobs.filter(r => JobState.done.toList.contains(r.job.state)) def running: Vector[JobDetail] = jobs.filter(_.job.state == JobState.Running) } diff --git a/modules/common/src/main/scala/docspell/common/JobState.scala b/modules/common/src/main/scala/docspell/common/JobState.scala index 69f0e622..0ab68584 100644 --- a/modules/common/src/main/scala/docspell/common/JobState.scala +++ b/modules/common/src/main/scala/docspell/common/JobState.scala @@ -1,5 +1,7 @@ package docspell.common +import cats.data.NonEmptyList + import io.circe.{Decoder, Encoder} sealed trait JobState { self: Product => @@ -12,8 +14,6 @@ object JobState { /** Waiting for being executed. */ case object Waiting extends JobState {} - def waiting: JobState = Waiting - /** A scheduler has picked up this job and will pass it to the next * free slot. */ @@ -34,10 +34,20 @@ object JobState { /** Finished with success */ case object Success extends JobState {} - val all: Set[JobState] = - Set(Waiting, Scheduled, Running, Stuck, Failed, Cancelled, Success) - val queued: Set[JobState] = Set(Waiting, Scheduled, Stuck) - val done: Set[JobState] = Set(Failed, Cancelled, Success) + val waiting: JobState = Waiting + val stuck: JobState = Stuck + val scheduled: JobState = Scheduled + val running: JobState = Running + val failed: JobState = Failed + val cancelled: JobState = Cancelled + val success: JobState = Success + + val all: NonEmptyList[JobState] = + NonEmptyList.of(Waiting, Scheduled, Running, Stuck, Failed, Cancelled, Success) + val queued: Set[JobState] = Set(Waiting, Scheduled, Stuck) + val done: NonEmptyList[JobState] = NonEmptyList.of(Failed, Cancelled, Success) + val notDone: NonEmptyList[JobState] = //all - done + NonEmptyList.of(Waiting, Scheduled, Running, Stuck) val inProgress: Set[JobState] = Set(Scheduled, Running, Stuck) def parse(str: String): Either[String, JobState] = diff --git a/modules/joex/src/main/scala/docspell/joex/analysis/RegexNerFile.scala b/modules/joex/src/main/scala/docspell/joex/analysis/RegexNerFile.scala index fb5f097c..e5dcce3e 100644 --- a/modules/joex/src/main/scala/docspell/joex/analysis/RegexNerFile.scala +++ b/modules/joex/src/main/scala/docspell/joex/analysis/RegexNerFile.scala @@ -141,7 +141,7 @@ object RegexNerFile { def latestUpdate(collective: Ident): ConnectionIO[Option[Timestamp]] = { def max_(col: Column[_], cidCol: Column[Ident]): Select = - Select(select(max(col).as("t")), from(col.table), cidCol === collective) + Select(List(max(col).as("t")), from(col.table), cidCol === collective) val sql = union( max_(ROrganization.T.updated, ROrganization.T.cid), diff --git a/modules/store/src/main/scala/docspell/store/impl/Implicits.scala b/modules/store/src/main/scala/docspell/store/impl/Implicits.scala index 30cba7ca..2047b301 100644 --- a/modules/store/src/main/scala/docspell/store/impl/Implicits.scala +++ b/modules/store/src/main/scala/docspell/store/impl/Implicits.scala @@ -6,15 +6,10 @@ object Implicits extends DoobieMeta with DoobieSyntax { def oldColumn: Column = Column(col.name) - def column: Column = { - val c = col.alias match { - case Some(a) => oldColumn.as(a) + def column: Column = + col.table.alias match { + case Some(p) => oldColumn.prefix(p) case None => oldColumn } - col.table.alias match { - case Some(p) => c.prefix(p) - case None => c - } - } } } diff --git a/modules/store/src/main/scala/docspell/store/qb/Column.scala b/modules/store/src/main/scala/docspell/store/qb/Column.scala index 90936f90..a8465417 100644 --- a/modules/store/src/main/scala/docspell/store/qb/Column.scala +++ b/modules/store/src/main/scala/docspell/store/qb/Column.scala @@ -1,8 +1,5 @@ package docspell.store.qb -case class Column[A](name: String, table: TableDef, alias: Option[String] = None) { - def as(alias: String): Column[A] = - copy(alias = Some(alias)) -} +case class Column[A](name: String, table: TableDef) object Column {} diff --git a/modules/store/src/main/scala/docspell/store/qb/Condition.scala b/modules/store/src/main/scala/docspell/store/qb/Condition.scala index c1f623cb..4738545a 100644 --- a/modules/store/src/main/scala/docspell/store/qb/Condition.scala +++ b/modules/store/src/main/scala/docspell/store/qb/Condition.scala @@ -12,6 +12,10 @@ object Condition { val P: Put[A] ) extends Condition + case class CompareFVal[A](dbf: DBFunction, op: Operator, value: A)(implicit + val P: Put[A] + ) extends Condition + case class CompareCol[A](col1: Column[A], op: Operator, col2: Column[A]) extends Condition diff --git a/modules/store/src/main/scala/docspell/store/qb/DBFunction.scala b/modules/store/src/main/scala/docspell/store/qb/DBFunction.scala index fbbaac6f..52b50024 100644 --- a/modules/store/src/main/scala/docspell/store/qb/DBFunction.scala +++ b/modules/store/src/main/scala/docspell/store/qb/DBFunction.scala @@ -1,31 +1,27 @@ package docspell.store.qb -sealed trait DBFunction { - def alias: String - - def as(alias: String): DBFunction -} +sealed trait DBFunction {} object DBFunction { - def countAllAs(alias: String) = - CountAll(alias) + val countAll: DBFunction = CountAll - def countAs[A](column: Column[A], alias: String): DBFunction = - Count(column, alias) + def countAs[A](column: Column[A]): DBFunction = + Count(column) - case class CountAll(alias: String) extends DBFunction { - def as(a: String) = - copy(alias = a) - } + case object CountAll extends DBFunction - case class Count(column: Column[_], alias: String) extends DBFunction { - def as(a: String) = - copy(alias = a) - } + case class Count(column: Column[_]) extends DBFunction - case class Max(column: Column[_], alias: String) extends DBFunction { - def as(a: String) = - copy(alias = a) - } + case class Max(column: Column[_]) extends DBFunction + + case class Min(column: Column[_]) extends DBFunction + + case class Coalesce(expr: SelectExpr, exprs: Vector[SelectExpr]) extends DBFunction + + case class Power(expr: SelectExpr, base: Int) extends DBFunction + + case class Plus(expr: SelectExpr, exprs: Vector[SelectExpr]) extends DBFunction + + case class Mult(expr: SelectExpr, exprs: Vector[SelectExpr]) extends DBFunction } diff --git a/modules/store/src/main/scala/docspell/store/qb/DSL.scala b/modules/store/src/main/scala/docspell/store/qb/DSL.scala index e399515b..849305c4 100644 --- a/modules/store/src/main/scala/docspell/store/qb/DSL.scala +++ b/modules/store/src/main/scala/docspell/store/qb/DSL.scala @@ -23,13 +23,13 @@ trait DSL extends DoobieMeta { DoobieQuery.distinct(Select(projection, from, where)) def select(dbf: DBFunction): Seq[SelectExpr] = - Seq(SelectExpr.SelectFun(dbf)) + Seq(SelectExpr.SelectFun(dbf, None)) def select(c: Column[_], cs: Column[_]*): Seq[SelectExpr] = select(c :: cs.toList) def select(seq: Seq[Column[_]], seqs: Seq[Column[_]]*): Seq[SelectExpr] = - (seq ++ seqs.flatten).map(SelectExpr.SelectColumn.apply) + (seq ++ seqs.flatten).map(c => SelectExpr.SelectColumn(c, None)) def union(s1: Select, sn: Select*): Select = Select.Union(s1, sn.toVector) @@ -41,10 +41,28 @@ trait DSL extends DoobieMeta { FromExpr.SubSelect(sel, "x") def count(c: Column[_]): DBFunction = - DBFunction.Count(c, "cn") + DBFunction.Count(c) def max(c: Column[_]): DBFunction = - DBFunction.Max(c, "mn") + DBFunction.Max(c) + + def min(c: Column[_]): DBFunction = + DBFunction.Min(c) + + def coalesce(expr: SelectExpr, more: SelectExpr*): DBFunction.Coalesce = + DBFunction.Coalesce(expr, more.toVector) + + def power(base: Int, expr: SelectExpr): DBFunction = + DBFunction.Power(expr, base) + + def lit[A](value: A)(implicit P: Put[A]): SelectExpr.SelectLit[A] = + SelectExpr.SelectLit(value, None) + + def plus(expr: SelectExpr, more: SelectExpr*): DBFunction = + DBFunction.Plus(expr, more.toVector) + + def mult(expr: SelectExpr, more: SelectExpr*): DBFunction = + DBFunction.Mult(expr, more.toVector) def and(c: Condition, cs: Condition*): Condition = c match { @@ -75,6 +93,8 @@ trait DSL extends DoobieMeta { else and(c, cs: _*) implicit final class ColumnOps[A](col: Column[A]) { + def s: SelectExpr = SelectExpr.SelectColumn(col, None) + def as(alias: String) = SelectExpr.SelectColumn(col, Some(alias)) def setTo(value: A)(implicit P: Put[A]): Setter[A] = Setter.SetValue(col, value) @@ -86,10 +106,10 @@ trait DSL extends DoobieMeta { Setter.Increment(col, amount) def asc: OrderBy = - OrderBy(SelectExpr.SelectColumn(col), OrderBy.OrderType.Asc) + OrderBy(SelectExpr.SelectColumn(col, None), OrderBy.OrderType.Asc) def desc: OrderBy = - OrderBy(SelectExpr.SelectColumn(col), OrderBy.OrderType.Desc) + OrderBy(SelectExpr.SelectColumn(col, None), OrderBy.OrderType.Desc) def ===(value: A)(implicit P: Put[A]): Condition = Condition.CompareVal(col, Operator.Eq, value) @@ -155,6 +175,38 @@ trait DSL extends DoobieMeta { not(c) } + implicit final class DBFunctionOps(dbf: DBFunction) { + def s: SelectExpr = SelectExpr.SelectFun(dbf, None) + def as(alias: String) = SelectExpr.SelectFun(dbf, Some(alias)) + + def ===[A](value: A)(implicit P: Put[A]): Condition = + Condition.CompareFVal(dbf, Operator.Eq, value) + + def ====(value: String): Condition = + Condition.CompareFVal(dbf, Operator.Eq, value) + + def like[A](value: A)(implicit P: Put[A]): Condition = + Condition.CompareFVal(dbf, Operator.LowerLike, value) + + def likes(value: String): Condition = + Condition.CompareFVal(dbf, Operator.LowerLike, value) + + def <=[A](value: A)(implicit P: Put[A]): Condition = + Condition.CompareFVal(dbf, Operator.Lte, value) + + def >=[A](value: A)(implicit P: Put[A]): Condition = + Condition.CompareFVal(dbf, Operator.Gte, value) + + def >[A](value: A)(implicit P: Put[A]): Condition = + Condition.CompareFVal(dbf, Operator.Gt, value) + + def <[A](value: A)(implicit P: Put[A]): Condition = + Condition.CompareFVal(dbf, Operator.Lt, value) + + def <>[A](value: A)(implicit P: Put[A]): Condition = + Condition.CompareFVal(dbf, Operator.Neq, value) + } + } object DSL extends DSL diff --git a/modules/store/src/main/scala/docspell/store/qb/GroupBy.scala b/modules/store/src/main/scala/docspell/store/qb/GroupBy.scala index 51250513..fffa53f9 100644 --- a/modules/store/src/main/scala/docspell/store/qb/GroupBy.scala +++ b/modules/store/src/main/scala/docspell/store/qb/GroupBy.scala @@ -6,8 +6,8 @@ object GroupBy { def apply(c: Column[_], cs: Column[_]*): GroupBy = GroupBy( - SelectExpr.SelectColumn(c), - cs.toVector.map(SelectExpr.SelectColumn.apply), + SelectExpr.SelectColumn(c, None), + cs.toVector.map(c => SelectExpr.SelectColumn(c, None)), None ) } diff --git a/modules/store/src/main/scala/docspell/store/qb/Select.scala b/modules/store/src/main/scala/docspell/store/qb/Select.scala index fd660332..c7076be5 100644 --- a/modules/store/src/main/scala/docspell/store/qb/Select.scala +++ b/modules/store/src/main/scala/docspell/store/qb/Select.scala @@ -11,11 +11,18 @@ sealed trait Select { def run: Fragment = DoobieQuery(this) - def orderBy(ob: OrderBy, obs: OrderBy*): Select = + def orderBy(ob: OrderBy, obs: OrderBy*): Select.Ordered = Select.Ordered(this, ob, obs.toVector) - def orderBy(c: Column[_]): Select = - orderBy(OrderBy(SelectExpr.SelectColumn(c), OrderBy.OrderType.Asc)) + def orderBy(c: Column[_]): Select.Ordered = + orderBy(OrderBy(SelectExpr.SelectColumn(c, None), OrderBy.OrderType.Asc)) + + def limit(n: Int): Select = + this match { + case Select.Limit(q, _) => Select.Limit(q, n) + case _ => + Select.Limit(this, n) + } } object Select { @@ -49,4 +56,6 @@ object Select { case class Ordered(q: Select, orderBy: OrderBy, orderBys: Vector[OrderBy]) extends Select + + case class Limit(q: Select, limit: Int) extends Select } diff --git a/modules/store/src/main/scala/docspell/store/qb/SelectExpr.scala b/modules/store/src/main/scala/docspell/store/qb/SelectExpr.scala index 1ccb3b90..fec6eee4 100644 --- a/modules/store/src/main/scala/docspell/store/qb/SelectExpr.scala +++ b/modules/store/src/main/scala/docspell/store/qb/SelectExpr.scala @@ -1,11 +1,27 @@ package docspell.store.qb -sealed trait SelectExpr +import doobie.Put + +sealed trait SelectExpr { self => + def as(alias: String): SelectExpr +} object SelectExpr { - case class SelectColumn(column: Column[_]) extends SelectExpr + case class SelectColumn(column: Column[_], alias: Option[String]) extends SelectExpr { + def as(a: String): SelectColumn = + copy(alias = Some(a)) + } - case class SelectFun(fun: DBFunction) extends SelectExpr + case class SelectFun(fun: DBFunction, alias: Option[String]) extends SelectExpr { + def as(a: String): SelectFun = + copy(alias = Some(a)) + } + + case class SelectLit[A](value: A, alias: Option[String])(implicit val P: Put[A]) + extends SelectExpr { + def as(a: String): SelectLit[A] = + copy(alias = Some(a)) + } } diff --git a/modules/store/src/main/scala/docspell/store/qb/impl/CommonBuilder.scala b/modules/store/src/main/scala/docspell/store/qb/impl/CommonBuilder.scala new file mode 100644 index 00000000..8b79cfdf --- /dev/null +++ b/modules/store/src/main/scala/docspell/store/qb/impl/CommonBuilder.scala @@ -0,0 +1,20 @@ +package docspell.store.qb.impl + +import docspell.store.qb._ + +import doobie._ +import doobie.implicits._ + +trait CommonBuilder { + def column(col: Column[_]): Fragment = { + val prefix = col.table.alias.getOrElse(col.table.tableName) + if (prefix.isEmpty) columnNoPrefix(col) + else Fragment.const0(prefix) ++ Fragment.const0(".") ++ Fragment.const0(col.name) + } + + def columnNoPrefix(col: Column[_]): Fragment = + Fragment.const0(col.name) + + def appendAs(alias: Option[String]): Fragment = + alias.map(a => fr" AS" ++ Fragment.const(a)).getOrElse(Fragment.empty) +} diff --git a/modules/store/src/main/scala/docspell/store/qb/impl/ConditionBuilder.scala b/modules/store/src/main/scala/docspell/store/qb/impl/ConditionBuilder.scala index 2dfbd8a8..8f56738a 100644 --- a/modules/store/src/main/scala/docspell/store/qb/impl/ConditionBuilder.scala +++ b/modules/store/src/main/scala/docspell/store/qb/impl/ConditionBuilder.scala @@ -25,6 +25,17 @@ object ConditionBuilder { } colFrag ++ opFrag ++ valFrag + case c @ Condition.CompareFVal(dbf, op, value) => + val opFrag = operator(op) + val valFrag = buildValue(value)(c.P) + val dbfFrag = op match { + case Operator.LowerLike => + lower(dbf) + case _ => + DBFunctionBuilder.build(dbf) + } + dbfFrag ++ opFrag ++ valFrag + case Condition.CompareCol(c1, op, c2) => val (c1Frag, c2Frag) = op match { case Operator.LowerLike => @@ -36,13 +47,13 @@ object ConditionBuilder { case Condition.InSubSelect(col, subsel) => val sub = DoobieQuery(subsel) - SelectExprBuilder.column(col) ++ sql" IN (" ++ sub ++ sql")" + SelectExprBuilder.column(col) ++ sql" IN (" ++ sub ++ parenClose case c @ Condition.InValues(col, values, toLower) => val cfrag = if (toLower) lower(col) else SelectExprBuilder.column(col) cfrag ++ sql" IN (" ++ values.toList .map(a => buildValue(a)(c.P)) - .reduce(_ ++ comma ++ _) ++ sql")" + .reduce(_ ++ comma ++ _) ++ parenClose case Condition.IsNull(col) => SelectExprBuilder.column(col) ++ fr" is null" @@ -89,5 +100,8 @@ object ConditionBuilder { fr"$v" def lower(col: Column[_]): Fragment = - Fragment.const0("LOWER(") ++ SelectExprBuilder.column(col) ++ sql")" + Fragment.const0("LOWER(") ++ SelectExprBuilder.column(col) ++ parenClose + + def lower(dbf: DBFunction): Fragment = + Fragment.const0("LOWER(") ++ DBFunctionBuilder.build(dbf) ++ parenClose } diff --git a/modules/store/src/main/scala/docspell/store/qb/impl/DBFunctionBuilder.scala b/modules/store/src/main/scala/docspell/store/qb/impl/DBFunctionBuilder.scala new file mode 100644 index 00000000..cbc48ec8 --- /dev/null +++ b/modules/store/src/main/scala/docspell/store/qb/impl/DBFunctionBuilder.scala @@ -0,0 +1,40 @@ +package docspell.store.qb.impl + +import docspell.store.qb.DBFunction + +import doobie._ +import doobie.implicits._ + +object DBFunctionBuilder extends CommonBuilder { + private val comma = fr"," + + def build(expr: DBFunction): Fragment = + expr match { + case DBFunction.CountAll => + sql"COUNT(*)" + + case DBFunction.Count(col) => + sql"COUNT(" ++ column(col) ++ fr")" + + case DBFunction.Max(col) => + sql"MAX(" ++ column(col) ++ fr")" + + case DBFunction.Min(col) => + sql"MIN(" ++ column(col) ++ fr")" + + case DBFunction.Coalesce(expr, exprs) => + val v = exprs.prepended(expr).map(SelectExprBuilder.build) + sql"COALESCE(" ++ v.reduce(_ ++ comma ++ _) ++ sql")" + + case DBFunction.Power(expr, base) => + sql"POWER($base, " ++ SelectExprBuilder.build(expr) ++ sql")" + + case DBFunction.Plus(expr, more) => + val v = more.prepended(expr).map(SelectExprBuilder.build) + v.reduce(_ ++ fr" +" ++ _) + + case DBFunction.Mult(expr, more) => + val v = more.prepended(expr).map(SelectExprBuilder.build) + v.reduce(_ ++ fr" *" ++ _) + } +} diff --git a/modules/store/src/main/scala/docspell/store/qb/impl/DoobieQuery.scala b/modules/store/src/main/scala/docspell/store/qb/impl/DoobieQuery.scala index e20d9e72..b1a45d70 100644 --- a/modules/store/src/main/scala/docspell/store/qb/impl/DoobieQuery.scala +++ b/modules/store/src/main/scala/docspell/store/qb/impl/DoobieQuery.scala @@ -34,6 +34,8 @@ object DoobieQuery { val order = obs.prepended(ob).map(orderBy).reduce(_ ++ comma ++ _) build(distinct)(q) ++ fr"ORDER BY" ++ order + case Select.Limit(q, n) => + build(distinct)(q) ++ fr" LIMIT $n" } def buildSimple(sq: Select.SimpleSelect): Fragment = { diff --git a/modules/store/src/main/scala/docspell/store/qb/impl/SelectExprBuilder.scala b/modules/store/src/main/scala/docspell/store/qb/impl/SelectExprBuilder.scala index f59d1fe5..b027b704 100644 --- a/modules/store/src/main/scala/docspell/store/qb/impl/SelectExprBuilder.scala +++ b/modules/store/src/main/scala/docspell/store/qb/impl/SelectExprBuilder.scala @@ -2,32 +2,21 @@ package docspell.store.qb.impl import docspell.store.qb._ -import _root_.doobie.implicits._ import _root_.doobie.{Query => _, _} -object SelectExprBuilder { +object SelectExprBuilder extends CommonBuilder { def build(expr: SelectExpr): Fragment = expr match { - case SelectExpr.SelectColumn(col) => - column(col) + case SelectExpr.SelectColumn(col, alias) => + column(col) ++ appendAs(alias) - case SelectExpr.SelectFun(DBFunction.CountAll(alias)) => - sql"COUNT(*) AS" ++ Fragment.const(alias) + case s @ SelectExpr.SelectLit(value, aliasOpt) => + ConditionBuilder.buildValue(value)(s.P) ++ appendAs(aliasOpt) - case SelectExpr.SelectFun(DBFunction.Count(col, alias)) => - sql"COUNT(" ++ column(col) ++ fr") AS" ++ Fragment.const(alias) + case SelectExpr.SelectFun(fun, alias) => + DBFunctionBuilder.build(fun) ++ appendAs(alias) - case SelectExpr.SelectFun(DBFunction.Max(col, alias)) => - sql"MAX(" ++ column(col) ++ fr") AS" ++ Fragment.const(alias) } - def column(col: Column[_]): Fragment = { - val prefix = col.table.alias.getOrElse(col.table.tableName) - if (prefix.isEmpty) columnNoPrefix(col) - else Fragment.const0(prefix) ++ Fragment.const0(".") ++ Fragment.const0(col.name) - } - - def columnNoPrefix(col: Column[_]): Fragment = - Fragment.const0(col.name) } diff --git a/modules/store/src/main/scala/docspell/store/queries/QJob.scala b/modules/store/src/main/scala/docspell/store/queries/QJob.scala index f3521ed9..815f9de6 100644 --- a/modules/store/src/main/scala/docspell/store/queries/QJob.scala +++ b/modules/store/src/main/scala/docspell/store/queries/QJob.scala @@ -1,5 +1,6 @@ package docspell.store.queries +import cats.data.NonEmptyList import cats.effect.Effect import cats.implicits._ import fs2.Stream @@ -7,7 +8,8 @@ import fs2.Stream import docspell.common._ import docspell.common.syntax.all._ import docspell.store.Store -import docspell.store.impl.Implicits._ +import docspell.store.qb.DSL._ +import docspell.store.qb._ import docspell.store.records.{RJob, RJobGroupUse, RJobLog} import doobie._ @@ -89,70 +91,60 @@ object QJob { now: Timestamp, initialPause: Duration ): ConnectionIO[Option[Ident]] = { - val JC = RJob.Columns - val waiting: JobState = JobState.Waiting - val stuck: JobState = JobState.Stuck - val jgroup = JC.group.prefix("a") - val jstate = JC.state.prefix("a") - val ugroup = RJobGroupUse.Columns.group.prefix("b") - val uworker = RJobGroupUse.Columns.worker.prefix("b") - - val stuckTrigger = coalesce(JC.startedmillis.prefix("a").f, sql"${now.toMillis}") ++ - fr"+" ++ power2(JC.retries.prefix("a")) ++ fr"* ${initialPause.millis}" + val JC = RJob.as("a") + val G = RJobGroupUse.as("b") + val stuckTrigger = stuckTriggerValue(JC, initialPause, now) val stateCond = - or(jstate.is(waiting), and(jstate.is(stuck), stuckTrigger ++ fr"< ${now.toMillis}")) + JC.state === JobState.waiting || (JC.state === JobState.stuck && stuckTrigger < now.toMillis) - val sql1 = fr"SELECT" ++ jgroup.f ++ fr"as g FROM" ++ RJob.table ++ fr"a" ++ - fr"INNER JOIN" ++ RJobGroupUse.table ++ fr"b ON" ++ jgroup.isGt(ugroup) ++ - fr"WHERE" ++ and(uworker.is(worker), stateCond) ++ - fr"LIMIT 1" //LIMIT is not sql standard, but supported by h2,mariadb and postgres - val sql2 = fr"SELECT min(" ++ jgroup.f ++ fr") as g FROM" ++ RJob.table ++ fr"a" ++ - fr"WHERE" ++ stateCond + val sql1 = + Select( + List(max(JC.group).as("g")), + from(JC).innerJoin(G, JC.group === G.group), + G.worker === worker && stateCond + ) - val union = - sql"SELECT g FROM ((" ++ sql1 ++ sql") UNION ALL (" ++ sql2 ++ sql")) as t0 WHERE g is not null" + val sql2 = + Select(List(min(JC.group).as("g")), from(JC), stateCond) - union - .query[Ident] - .to[List] - .map( - _.headOption - ) // either one or two results, but may be empty if RJob table is empty + val gcol = Column[String]("g", TableDef("")) + val groups = + Select(select(gcol), fromSubSelect(union(sql1, sql2)).as("t0"), gcol.isNull.negate) + + // either 0, one or two results, but may be empty if RJob table is empty + groups.run.query[Ident].to[List].map(_.headOption) } + private def stuckTriggerValue(t: RJob.Table, initialPause: Duration, now: Timestamp) = + plus( + coalesce(t.startedmillis.s, lit(now.toMillis)).s, + mult(power(2, t.retries.s).s, lit(initialPause.millis)).s + ) + def selectNextJob( group: Ident, prio: Priority, initialPause: Duration, now: Timestamp ): ConnectionIO[Option[RJob]] = { - val JC = RJob.Columns + val JC = RJob.T val psort = if (prio == Priority.High) JC.priority.desc else JC.priority.asc - val waiting: JobState = JobState.Waiting - val stuck: JobState = JobState.Stuck + val waiting = JobState.waiting + val stuck = JobState.stuck - val stuckTrigger = - coalesce(JC.startedmillis.f, sql"${now.toMillis}") ++ fr"+" ++ power2( - JC.retries - ) ++ fr"* ${initialPause.millis}" - val sql = selectSimple( - JC.all, - RJob.table, - and( - JC.group.is(group), - or( - JC.state.is(waiting), - and(JC.state.is(stuck), stuckTrigger ++ fr"< ${now.toMillis}") - ) - ) - ) ++ - orderBy(JC.state.asc, psort, JC.submitted.asc) ++ - fr"LIMIT 1" + val stuckTrigger = stuckTriggerValue(JC, initialPause, now) + val sql = + Select( + select(JC.all), + from(JC), + JC.group === group && (JC.state === waiting || + (JC.state === stuck && stuckTrigger < now.toMillis)) + ).orderBy(JC.state.asc, psort, JC.submitted.asc).limit(1) - sql.query[RJob].option + sql.run.query[RJob].option } def setCancelled[F[_]: Effect](id: Ident, store: Store[F]): F[Unit] = @@ -212,39 +204,34 @@ object QJob { collective: Ident, max: Long ): Stream[ConnectionIO, (RJob, Vector[RJobLog])] = { - val JC = RJob.Columns - val waiting: Set[JobState] = Set(JobState.Waiting, JobState.Stuck, JobState.Scheduled) - val running: Set[JobState] = Set(JobState.Running) - val done = JobState.all.diff(waiting).diff(running) + val JC = RJob.T + val waiting = NonEmptyList.of(JobState.Waiting, JobState.Stuck, JobState.Scheduled) + val running = NonEmptyList.of(JobState.Running) + //val done = JobState.all.filterNot(js => ).diff(waiting).diff(running) def selectJobs(now: Timestamp): Stream[ConnectionIO, RJob] = { val refDate = now.minusHours(24) + val runningJobs = Select( + select(JC.all), + from(JC), + JC.group === collective && JC.state.in(running) + ).orderBy(JC.submitted.desc).run.query[RJob].stream - val runningJobs = (selectSimple( - JC.all, - RJob.table, - and(JC.group.is(collective), JC.state.isOneOf(running.toSeq)) - ) ++ orderBy(JC.submitted.desc)).query[RJob].stream + val waitingJobs = Select( + select(JC.all), + from(JC), + JC.group === collective && JC.state.in(waiting) && JC.submitted > refDate + ).orderBy(JC.submitted.desc).run.query[RJob].stream.take(max) - val waitingJobs = (selectSimple( - JC.all, - RJob.table, + val doneJobs = Select( + select(JC.all), + from(JC), and( - JC.group.is(collective), - JC.state.isOneOf(waiting.toSeq), - JC.submitted.isGt(refDate) + JC.group === collective, + JC.state.in(JobState.done), + JC.submitted > refDate ) - ) ++ orderBy(JC.submitted.desc)).query[RJob].stream.take(max) - - val doneJobs = (selectSimple( - JC.all, - RJob.table, - and( - JC.group.is(collective), - JC.state.isOneOf(done.toSeq), - JC.submitted.isGt(refDate) - ) - ) ++ orderBy(JC.submitted.desc)).query[RJob].stream.take(max) + ).orderBy(JC.submitted.desc).run.query[RJob].stream.take(max) runningJobs ++ waitingJobs ++ doneJobs } diff --git a/modules/store/src/main/scala/docspell/store/records/RJob.scala b/modules/store/src/main/scala/docspell/store/records/RJob.scala index 0bc2dc14..5f7b8850 100644 --- a/modules/store/src/main/scala/docspell/store/records/RJob.scala +++ b/modules/store/src/main/scala/docspell/store/records/RJob.scala @@ -1,12 +1,12 @@ package docspell.store.records -import cats.effect.Sync +import cats.data.NonEmptyList import cats.implicits._ import fs2.Stream import docspell.common._ -import docspell.store.impl.Column -import docspell.store.impl.Implicits._ +import docspell.store.qb.DSL._ +import docspell.store.qb._ import doobie._ import doobie.implicits._ @@ -34,7 +34,7 @@ case class RJob( s"${id.id.substring(0, 9)}.../${group.id}/${task.id}/$priority" def isFinalState: Boolean = - JobState.done.contains(state) + JobState.done.toList.contains(state) def isInProgress: Boolean = JobState.inProgress.contains(state) @@ -71,25 +71,25 @@ object RJob { None ) - val table = fr"job" + final case class Table(alias: Option[String]) extends TableDef { + val tableName = "job" - object Columns { - val id = Column("jid") - val task = Column("task") - val group = Column("group_") - val args = Column("args") - val subject = Column("subject") - val submitted = Column("submitted") - val submitter = Column("submitter") - val priority = Column("priority") - val state = Column("state") - val retries = Column("retries") - val progress = Column("progress") - val tracker = Column("tracker") - val worker = Column("worker") - val started = Column("started") - val startedmillis = Column("startedmillis") - val finished = Column("finished") + val id = Column[Ident]("jid", this) + val task = Column[Ident]("task", this) + val group = Column[Ident]("group_", this) + val args = Column[String]("args", this) + val subject = Column[String]("subject", this) + val submitted = Column[Timestamp]("submitted", this) + val submitter = Column[Ident]("submitter", this) + val priority = Column[Priority]("priority", this) + val state = Column[JobState]("state", this) + val retries = Column[Int]("retries", this) + val progress = Column[Int]("progress", this) + val tracker = Column[Ident]("tracker", this) + val worker = Column[Ident]("worker", this) + val started = Column[Timestamp]("started", this) + val startedmillis = Column[Long]("startedmillis", this) + val finished = Column[Timestamp]("finished", this) val all = List( id, task, @@ -109,163 +109,174 @@ object RJob { ) } - import Columns._ + val T = Table(None) + def as(alias: String): Table = + Table(Some(alias)) def insert(v: RJob): ConnectionIO[Int] = { val smillis = v.started.map(_.toMillis) - val sql = insertRow( - table, - all ++ List(startedmillis), + DML.insert( + T, + T.all ++ List(T.startedmillis), fr"${v.id},${v.task},${v.group},${v.args},${v.subject},${v.submitted},${v.submitter},${v.priority},${v.state},${v.retries},${v.progress},${v.tracker},${v.worker},${v.started},${v.finished},$smillis" ) - sql.update.run } def findFromIds(ids: Seq[Ident]): ConnectionIO[Vector[RJob]] = - if (ids.isEmpty) Sync[ConnectionIO].pure(Vector.empty[RJob]) - else selectSimple(all, table, id.isOneOf(ids)).query[RJob].to[Vector] + NonEmptyList.fromList(ids.toList) match { + case None => + Vector.empty[RJob].pure[ConnectionIO] + case Some(nel) => + run(select(T.all), from(T), T.id.in(nel)).query[RJob].to[Vector] + } def findByIdAndGroup(jobId: Ident, jobGroup: Ident): ConnectionIO[Option[RJob]] = - selectSimple(all, table, and(id.is(jobId), group.is(jobGroup))).query[RJob].option + run(select(T.all), from(T), T.id === jobId && T.group === jobGroup).query[RJob].option def findById(jobId: Ident): ConnectionIO[Option[RJob]] = - selectSimple(all, table, id.is(jobId)).query[RJob].option + run(select(T.all), from(T), T.id === jobId).query[RJob].option def findByIdAndWorker(jobId: Ident, workerId: Ident): ConnectionIO[Option[RJob]] = - selectSimple(all, table, and(id.is(jobId), worker.is(workerId))).query[RJob].option + run(select(T.all), from(T), T.id === jobId && T.worker === workerId) + .query[RJob] + .option def setRunningToWaiting(workerId: Ident): ConnectionIO[Int] = { - val states: Seq[JobState] = List(JobState.Running, JobState.Scheduled) - updateRow( - table, - and(worker.is(workerId), state.isOneOf(states)), - state.setTo(JobState.Waiting: JobState) - ).update.run + val states: NonEmptyList[JobState] = + NonEmptyList.of(JobState.Running, JobState.Scheduled) + DML.update( + T, + where(T.worker === workerId, T.state.in(states)), + DML.set(T.state.setTo(JobState.waiting)) + ) } def incrementRetries(jobid: Ident): ConnectionIO[Int] = - updateRow( - table, - and(id.is(jobid), state.is(JobState.Stuck: JobState)), - retries.f ++ fr"=" ++ retries.f ++ fr"+ 1" - ).update.run + DML + .update( + T, + where(T.id === jobid, T.state === JobState.stuck), + DML.set(T.retries.increment(1)) + ) def setRunning(jobId: Ident, workerId: Ident, now: Timestamp): ConnectionIO[Int] = - updateRow( - table, - id.is(jobId), - commas( - state.setTo(JobState.Running: JobState), - started.setTo(now), - startedmillis.setTo(now.toMillis), - worker.setTo(workerId) + DML.update( + T, + T.id === jobId, + DML.set( + T.state.setTo(JobState.running), + T.started.setTo(now), + T.startedmillis.setTo(now.toMillis), + T.worker.setTo(workerId) ) - ).update.run + ) def setWaiting(jobId: Ident): ConnectionIO[Int] = - updateRow( - table, - id.is(jobId), - commas( - state.setTo(JobState.Waiting: JobState), - started.setTo(None: Option[Timestamp]), - startedmillis.setTo(None: Option[Long]), - finished.setTo(None: Option[Timestamp]) + DML + .update( + T, + T.id === jobId, + DML.set( + T.state.setTo(JobState.Waiting: JobState), + T.started.setTo(None: Option[Timestamp]), + T.startedmillis.setTo(None: Option[Long]), + T.finished.setTo(None: Option[Timestamp]) + ) ) - ).update.run def setScheduled(jobId: Ident, workerId: Ident): ConnectionIO[Int] = for { _ <- incrementRetries(jobId) - n <- updateRow( - table, - and( - id.is(jobId), - or(worker.isNull, worker.is(workerId)), - state.isOneOf(Seq[JobState](JobState.Waiting, JobState.Stuck)) + n <- DML.update( + T, + where( + T.id === jobId, + or(T.worker.isNull, T.worker === workerId), + T.state.in(NonEmptyList.of(JobState.waiting, JobState.stuck)) ), - commas( - state.setTo(JobState.Scheduled: JobState), - worker.setTo(workerId) + DML.set( + T.state.setTo(JobState.scheduled), + T.worker.setTo(workerId) ) - ).update.run + ) } yield n def setSuccess(jobId: Ident, now: Timestamp): ConnectionIO[Int] = - updateRow( - table, - id.is(jobId), - commas( - state.setTo(JobState.Success: JobState), - finished.setTo(now) + DML + .update( + T, + T.id === jobId, + DML.set( + T.state.setTo(JobState.success), + T.finished.setTo(now) + ) ) - ).update.run def setStuck(jobId: Ident, now: Timestamp): ConnectionIO[Int] = - updateRow( - table, - id.is(jobId), - commas( - state.setTo(JobState.Stuck: JobState), - finished.setTo(now) + DML.update( + T, + T.id === jobId, + DML.set( + T.state.setTo(JobState.stuck), + T.finished.setTo(now) ) - ).update.run + ) def setFailed(jobId: Ident, now: Timestamp): ConnectionIO[Int] = - updateRow( - table, - id.is(jobId), - commas( - state.setTo(JobState.Failed: JobState), - finished.setTo(now) + DML.update( + T, + T.id === jobId, + DML.set( + T.state.setTo(JobState.failed), + T.finished.setTo(now) ) - ).update.run + ) def setCancelled(jobId: Ident, now: Timestamp): ConnectionIO[Int] = - updateRow( - table, - id.is(jobId), - commas( - state.setTo(JobState.Cancelled: JobState), - finished.setTo(now) + DML.update( + T, + T.id === jobId, + DML.set( + T.state.setTo(JobState.cancelled), + T.finished.setTo(now) ) - ).update.run + ) def setPriority(jobId: Ident, jobGroup: Ident, prio: Priority): ConnectionIO[Int] = - updateRow( - table, - and(id.is(jobId), group.is(jobGroup), state.is(JobState.waiting)), - priority.setTo(prio) - ).update.run + DML.update( + T, + where(T.id === jobId, T.group === jobGroup, T.state === JobState.waiting), + DML.set(T.priority.setTo(prio)) + ) def getRetries(jobId: Ident): ConnectionIO[Option[Int]] = - selectSimple(List(retries), table, id.is(jobId)).query[Int].option + run(select(T.retries), from(T), T.id === jobId).query[Int].option def setProgress(jobId: Ident, perc: Int): ConnectionIO[Int] = - updateRow(table, id.is(jobId), progress.setTo(perc)).update.run + DML.update(T, T.id === jobId, DML.set(T.progress.setTo(perc))) def selectWaiting: ConnectionIO[Option[RJob]] = { - val sql = selectSimple(all, table, state.is(JobState.Waiting: JobState)) + val sql = run(select(T.all), from(T), T.state === JobState.waiting) sql.query[RJob].to[Vector].map(_.headOption) } - def selectGroupInState(states: Seq[JobState]): ConnectionIO[Vector[Ident]] = { + def selectGroupInState(states: NonEmptyList[JobState]): ConnectionIO[Vector[Ident]] = { val sql = - selectDistinct(List(group), table, state.isOneOf(states)) ++ orderBy(group.f) - sql.query[Ident].to[Vector] + Select(select(T.group), from(T), T.state.in(states)).orderBy(T.group) + sql.run.query[Ident].to[Vector] } def delete(jobId: Ident): ConnectionIO[Int] = for { n0 <- RJobLog.deleteAll(jobId) - n1 <- deleteFrom(table, id.is(jobId)).update.run + n1 <- DML.delete(T, T.id === jobId) } yield n0 + n1 def findIdsDoneAndOlderThan(ts: Timestamp): Stream[ConnectionIO, Ident] = - selectSimple( - Seq(id), - table, - and(state.isOneOf(JobState.done.toSeq), or(finished.isNull, finished.isLt(ts))) + run( + select(T.id), + from(T), + T.state.in(JobState.done) && (T.finished.isNull || T.finished < ts) ).query[Ident].stream def deleteDoneAndOlderThan(ts: Timestamp, batch: Int): ConnectionIO[Int] = @@ -277,10 +288,10 @@ object RJob { .foldMonoid def findNonFinalByTracker(trackerId: Ident): ConnectionIO[Option[RJob]] = - selectSimple( - all, - table, - and(tracker.is(trackerId), state.isOneOf(JobState.all.diff(JobState.done).toSeq)) + run( + select(T.all), + from(T), + where(T.tracker === trackerId, T.state.in(JobState.notDone)) ).query[RJob].option } diff --git a/modules/store/src/main/scala/docspell/store/records/RJobGroupUse.scala b/modules/store/src/main/scala/docspell/store/records/RJobGroupUse.scala index 1ce0e448..9cf4aec4 100644 --- a/modules/store/src/main/scala/docspell/store/records/RJobGroupUse.scala +++ b/modules/store/src/main/scala/docspell/store/records/RJobGroupUse.scala @@ -3,8 +3,8 @@ package docspell.store.records import cats.implicits._ import docspell.common._ -import docspell.store.impl.Column -import docspell.store.impl.Implicits._ +import docspell.store.qb.DSL._ +import docspell.store.qb._ import doobie._ import doobie.implicits._ @@ -12,25 +12,27 @@ import doobie.implicits._ case class RJobGroupUse(groupId: Ident, workerId: Ident) {} object RJobGroupUse { + final case class Table(alias: Option[String]) extends TableDef { + val tableName = "jobgroupuse" - val table = fr"jobgroupuse" - - object Columns { - val group = Column("groupid") - val worker = Column("workerid") + val group = Column[Ident]("groupid", this) + val worker = Column[Ident]("workerid", this) val all = List(group, worker) } - import Columns._ + + val T = Table(None) + def as(alias: String): Table = + Table(Some(alias)) def insert(v: RJobGroupUse): ConnectionIO[Int] = - insertRow(table, all, fr"${v.groupId},${v.workerId}").update.run + DML.insert(T, T.all, fr"${v.groupId},${v.workerId}") def updateGroup(v: RJobGroupUse): ConnectionIO[Int] = - updateRow(table, worker.is(v.workerId), group.setTo(v.groupId)).update.run + DML.update(T, T.worker === v.workerId, DML.set(T.group.setTo(v.groupId))) def setGroup(v: RJobGroupUse): ConnectionIO[Int] = updateGroup(v).flatMap(n => if (n > 0) n.pure[ConnectionIO] else insert(v)) def findGroup(workerId: Ident): ConnectionIO[Option[Ident]] = - selectSimple(List(group), table, worker.is(workerId)).query[Ident].option + run(select(T.group), from(T), T.worker === workerId).query[Ident].option } diff --git a/modules/store/src/main/scala/docspell/store/records/RJobLog.scala b/modules/store/src/main/scala/docspell/store/records/RJobLog.scala index 546aa5fe..999e9570 100644 --- a/modules/store/src/main/scala/docspell/store/records/RJobLog.scala +++ b/modules/store/src/main/scala/docspell/store/records/RJobLog.scala @@ -1,8 +1,8 @@ package docspell.store.records import docspell.common._ -import docspell.store.impl.Column -import docspell.store.impl.Implicits._ +import docspell.store.qb.DSL._ +import docspell.store.qb._ import doobie._ import doobie.implicits._ @@ -16,35 +16,39 @@ case class RJobLog( ) {} object RJobLog { + final case class Table(alias: Option[String]) extends TableDef { + val tableName = "joblog" - val table = fr"joblog" - - object Columns { - val id = Column("id") - val jobId = Column("jid") - val level = Column("level") - val created = Column("created") - val message = Column("message") + val id = Column[Ident]("id", this) + val jobId = Column[Ident]("jid", this) + val level = Column[LogLevel]("level", this) + val created = Column[Timestamp]("created", this) + val message = Column[String]("message", this) val all = List(id, jobId, level, created, message) // separate column only for sorting, so not included in `all` and // the case class - val counter = Column("counter") + val counter = Column[Long]("counter", this) } - import Columns._ + + val T = Table(None) + def as(alias: String): Table = + Table(Some(alias)) def insert(v: RJobLog): ConnectionIO[Int] = - insertRow( - table, - all, + DML.insert( + T, + T.all, fr"${v.id},${v.jobId},${v.level},${v.created},${v.message}" - ).update.run + ) def findLogs(id: Ident): ConnectionIO[Vector[RJobLog]] = - (selectSimple(all, table, jobId.is(id)) ++ orderBy(created.asc, counter.asc)) + Select(select(T.all), from(T), T.jobId === id) + .orderBy(T.created.asc, T.counter.asc) + .run .query[RJobLog] .to[Vector] def deleteAll(job: Ident): ConnectionIO[Int] = - deleteFrom(table, jobId.is(job)).update.run + DML.delete(T, T.jobId === job) }