Convert job record

This commit is contained in:
Eike Kettner 2020-12-12 01:56:06 +01:00
parent 1aa1f4367e
commit e3f6892abd
20 changed files with 445 additions and 297 deletions

View File

@ -39,7 +39,7 @@ object OJob {
def queued: Vector[JobDetail] = def queued: Vector[JobDetail] =
jobs.filter(r => JobState.queued.contains(r.job.state)) jobs.filter(r => JobState.queued.contains(r.job.state))
def done: Vector[JobDetail] = def done: Vector[JobDetail] =
jobs.filter(r => JobState.done.contains(r.job.state)) jobs.filter(r => JobState.done.toList.contains(r.job.state))
def running: Vector[JobDetail] = def running: Vector[JobDetail] =
jobs.filter(_.job.state == JobState.Running) jobs.filter(_.job.state == JobState.Running)
} }

View File

@ -1,5 +1,7 @@
package docspell.common package docspell.common
import cats.data.NonEmptyList
import io.circe.{Decoder, Encoder} import io.circe.{Decoder, Encoder}
sealed trait JobState { self: Product => sealed trait JobState { self: Product =>
@ -12,8 +14,6 @@ object JobState {
/** Waiting for being executed. */ /** Waiting for being executed. */
case object Waiting extends JobState {} case object Waiting extends JobState {}
def waiting: JobState = Waiting
/** A scheduler has picked up this job and will pass it to the next /** A scheduler has picked up this job and will pass it to the next
* free slot. * free slot.
*/ */
@ -34,10 +34,20 @@ object JobState {
/** Finished with success */ /** Finished with success */
case object Success extends JobState {} case object Success extends JobState {}
val all: Set[JobState] = val waiting: JobState = Waiting
Set(Waiting, Scheduled, Running, Stuck, Failed, Cancelled, Success) val stuck: JobState = Stuck
val queued: Set[JobState] = Set(Waiting, Scheduled, Stuck) val scheduled: JobState = Scheduled
val done: Set[JobState] = Set(Failed, Cancelled, Success) val running: JobState = Running
val failed: JobState = Failed
val cancelled: JobState = Cancelled
val success: JobState = Success
val all: NonEmptyList[JobState] =
NonEmptyList.of(Waiting, Scheduled, Running, Stuck, Failed, Cancelled, Success)
val queued: Set[JobState] = Set(Waiting, Scheduled, Stuck)
val done: NonEmptyList[JobState] = NonEmptyList.of(Failed, Cancelled, Success)
val notDone: NonEmptyList[JobState] = //all - done
NonEmptyList.of(Waiting, Scheduled, Running, Stuck)
val inProgress: Set[JobState] = Set(Scheduled, Running, Stuck) val inProgress: Set[JobState] = Set(Scheduled, Running, Stuck)
def parse(str: String): Either[String, JobState] = def parse(str: String): Either[String, JobState] =

View File

@ -141,7 +141,7 @@ object RegexNerFile {
def latestUpdate(collective: Ident): ConnectionIO[Option[Timestamp]] = { def latestUpdate(collective: Ident): ConnectionIO[Option[Timestamp]] = {
def max_(col: Column[_], cidCol: Column[Ident]): Select = def max_(col: Column[_], cidCol: Column[Ident]): Select =
Select(select(max(col).as("t")), from(col.table), cidCol === collective) Select(List(max(col).as("t")), from(col.table), cidCol === collective)
val sql = union( val sql = union(
max_(ROrganization.T.updated, ROrganization.T.cid), max_(ROrganization.T.updated, ROrganization.T.cid),

View File

@ -6,15 +6,10 @@ object Implicits extends DoobieMeta with DoobieSyntax {
def oldColumn: Column = def oldColumn: Column =
Column(col.name) Column(col.name)
def column: Column = { def column: Column =
val c = col.alias match { col.table.alias match {
case Some(a) => oldColumn.as(a) case Some(p) => oldColumn.prefix(p)
case None => oldColumn case None => oldColumn
} }
col.table.alias match {
case Some(p) => c.prefix(p)
case None => c
}
}
} }
} }

View File

@ -1,8 +1,5 @@
package docspell.store.qb package docspell.store.qb
case class Column[A](name: String, table: TableDef, alias: Option[String] = None) { case class Column[A](name: String, table: TableDef)
def as(alias: String): Column[A] =
copy(alias = Some(alias))
}
object Column {} object Column {}

View File

@ -12,6 +12,10 @@ object Condition {
val P: Put[A] val P: Put[A]
) extends Condition ) extends Condition
case class CompareFVal[A](dbf: DBFunction, op: Operator, value: A)(implicit
val P: Put[A]
) extends Condition
case class CompareCol[A](col1: Column[A], op: Operator, col2: Column[A]) case class CompareCol[A](col1: Column[A], op: Operator, col2: Column[A])
extends Condition extends Condition

View File

@ -1,31 +1,27 @@
package docspell.store.qb package docspell.store.qb
sealed trait DBFunction { sealed trait DBFunction {}
def alias: String
def as(alias: String): DBFunction
}
object DBFunction { object DBFunction {
def countAllAs(alias: String) = val countAll: DBFunction = CountAll
CountAll(alias)
def countAs[A](column: Column[A], alias: String): DBFunction = def countAs[A](column: Column[A]): DBFunction =
Count(column, alias) Count(column)
case class CountAll(alias: String) extends DBFunction { case object CountAll extends DBFunction
def as(a: String) =
copy(alias = a)
}
case class Count(column: Column[_], alias: String) extends DBFunction { case class Count(column: Column[_]) extends DBFunction
def as(a: String) =
copy(alias = a)
}
case class Max(column: Column[_], alias: String) extends DBFunction { case class Max(column: Column[_]) extends DBFunction
def as(a: String) =
copy(alias = a) case class Min(column: Column[_]) extends DBFunction
}
case class Coalesce(expr: SelectExpr, exprs: Vector[SelectExpr]) extends DBFunction
case class Power(expr: SelectExpr, base: Int) extends DBFunction
case class Plus(expr: SelectExpr, exprs: Vector[SelectExpr]) extends DBFunction
case class Mult(expr: SelectExpr, exprs: Vector[SelectExpr]) extends DBFunction
} }

View File

@ -23,13 +23,13 @@ trait DSL extends DoobieMeta {
DoobieQuery.distinct(Select(projection, from, where)) DoobieQuery.distinct(Select(projection, from, where))
def select(dbf: DBFunction): Seq[SelectExpr] = def select(dbf: DBFunction): Seq[SelectExpr] =
Seq(SelectExpr.SelectFun(dbf)) Seq(SelectExpr.SelectFun(dbf, None))
def select(c: Column[_], cs: Column[_]*): Seq[SelectExpr] = def select(c: Column[_], cs: Column[_]*): Seq[SelectExpr] =
select(c :: cs.toList) select(c :: cs.toList)
def select(seq: Seq[Column[_]], seqs: Seq[Column[_]]*): Seq[SelectExpr] = def select(seq: Seq[Column[_]], seqs: Seq[Column[_]]*): Seq[SelectExpr] =
(seq ++ seqs.flatten).map(SelectExpr.SelectColumn.apply) (seq ++ seqs.flatten).map(c => SelectExpr.SelectColumn(c, None))
def union(s1: Select, sn: Select*): Select = def union(s1: Select, sn: Select*): Select =
Select.Union(s1, sn.toVector) Select.Union(s1, sn.toVector)
@ -41,10 +41,28 @@ trait DSL extends DoobieMeta {
FromExpr.SubSelect(sel, "x") FromExpr.SubSelect(sel, "x")
def count(c: Column[_]): DBFunction = def count(c: Column[_]): DBFunction =
DBFunction.Count(c, "cn") DBFunction.Count(c)
def max(c: Column[_]): DBFunction = def max(c: Column[_]): DBFunction =
DBFunction.Max(c, "mn") DBFunction.Max(c)
def min(c: Column[_]): DBFunction =
DBFunction.Min(c)
def coalesce(expr: SelectExpr, more: SelectExpr*): DBFunction.Coalesce =
DBFunction.Coalesce(expr, more.toVector)
def power(base: Int, expr: SelectExpr): DBFunction =
DBFunction.Power(expr, base)
def lit[A](value: A)(implicit P: Put[A]): SelectExpr.SelectLit[A] =
SelectExpr.SelectLit(value, None)
def plus(expr: SelectExpr, more: SelectExpr*): DBFunction =
DBFunction.Plus(expr, more.toVector)
def mult(expr: SelectExpr, more: SelectExpr*): DBFunction =
DBFunction.Mult(expr, more.toVector)
def and(c: Condition, cs: Condition*): Condition = def and(c: Condition, cs: Condition*): Condition =
c match { c match {
@ -75,6 +93,8 @@ trait DSL extends DoobieMeta {
else and(c, cs: _*) else and(c, cs: _*)
implicit final class ColumnOps[A](col: Column[A]) { implicit final class ColumnOps[A](col: Column[A]) {
def s: SelectExpr = SelectExpr.SelectColumn(col, None)
def as(alias: String) = SelectExpr.SelectColumn(col, Some(alias))
def setTo(value: A)(implicit P: Put[A]): Setter[A] = def setTo(value: A)(implicit P: Put[A]): Setter[A] =
Setter.SetValue(col, value) Setter.SetValue(col, value)
@ -86,10 +106,10 @@ trait DSL extends DoobieMeta {
Setter.Increment(col, amount) Setter.Increment(col, amount)
def asc: OrderBy = def asc: OrderBy =
OrderBy(SelectExpr.SelectColumn(col), OrderBy.OrderType.Asc) OrderBy(SelectExpr.SelectColumn(col, None), OrderBy.OrderType.Asc)
def desc: OrderBy = def desc: OrderBy =
OrderBy(SelectExpr.SelectColumn(col), OrderBy.OrderType.Desc) OrderBy(SelectExpr.SelectColumn(col, None), OrderBy.OrderType.Desc)
def ===(value: A)(implicit P: Put[A]): Condition = def ===(value: A)(implicit P: Put[A]): Condition =
Condition.CompareVal(col, Operator.Eq, value) Condition.CompareVal(col, Operator.Eq, value)
@ -155,6 +175,38 @@ trait DSL extends DoobieMeta {
not(c) not(c)
} }
implicit final class DBFunctionOps(dbf: DBFunction) {
def s: SelectExpr = SelectExpr.SelectFun(dbf, None)
def as(alias: String) = SelectExpr.SelectFun(dbf, Some(alias))
def ===[A](value: A)(implicit P: Put[A]): Condition =
Condition.CompareFVal(dbf, Operator.Eq, value)
def ====(value: String): Condition =
Condition.CompareFVal(dbf, Operator.Eq, value)
def like[A](value: A)(implicit P: Put[A]): Condition =
Condition.CompareFVal(dbf, Operator.LowerLike, value)
def likes(value: String): Condition =
Condition.CompareFVal(dbf, Operator.LowerLike, value)
def <=[A](value: A)(implicit P: Put[A]): Condition =
Condition.CompareFVal(dbf, Operator.Lte, value)
def >=[A](value: A)(implicit P: Put[A]): Condition =
Condition.CompareFVal(dbf, Operator.Gte, value)
def >[A](value: A)(implicit P: Put[A]): Condition =
Condition.CompareFVal(dbf, Operator.Gt, value)
def <[A](value: A)(implicit P: Put[A]): Condition =
Condition.CompareFVal(dbf, Operator.Lt, value)
def <>[A](value: A)(implicit P: Put[A]): Condition =
Condition.CompareFVal(dbf, Operator.Neq, value)
}
} }
object DSL extends DSL object DSL extends DSL

View File

@ -6,8 +6,8 @@ object GroupBy {
def apply(c: Column[_], cs: Column[_]*): GroupBy = def apply(c: Column[_], cs: Column[_]*): GroupBy =
GroupBy( GroupBy(
SelectExpr.SelectColumn(c), SelectExpr.SelectColumn(c, None),
cs.toVector.map(SelectExpr.SelectColumn.apply), cs.toVector.map(c => SelectExpr.SelectColumn(c, None)),
None None
) )
} }

View File

@ -11,11 +11,18 @@ sealed trait Select {
def run: Fragment = def run: Fragment =
DoobieQuery(this) DoobieQuery(this)
def orderBy(ob: OrderBy, obs: OrderBy*): Select = def orderBy(ob: OrderBy, obs: OrderBy*): Select.Ordered =
Select.Ordered(this, ob, obs.toVector) Select.Ordered(this, ob, obs.toVector)
def orderBy(c: Column[_]): Select = def orderBy(c: Column[_]): Select.Ordered =
orderBy(OrderBy(SelectExpr.SelectColumn(c), OrderBy.OrderType.Asc)) orderBy(OrderBy(SelectExpr.SelectColumn(c, None), OrderBy.OrderType.Asc))
def limit(n: Int): Select =
this match {
case Select.Limit(q, _) => Select.Limit(q, n)
case _ =>
Select.Limit(this, n)
}
} }
object Select { object Select {
@ -49,4 +56,6 @@ object Select {
case class Ordered(q: Select, orderBy: OrderBy, orderBys: Vector[OrderBy]) case class Ordered(q: Select, orderBy: OrderBy, orderBys: Vector[OrderBy])
extends Select extends Select
case class Limit(q: Select, limit: Int) extends Select
} }

View File

@ -1,11 +1,27 @@
package docspell.store.qb package docspell.store.qb
sealed trait SelectExpr import doobie.Put
sealed trait SelectExpr { self =>
def as(alias: String): SelectExpr
}
object SelectExpr { object SelectExpr {
case class SelectColumn(column: Column[_]) extends SelectExpr case class SelectColumn(column: Column[_], alias: Option[String]) extends SelectExpr {
def as(a: String): SelectColumn =
copy(alias = Some(a))
}
case class SelectFun(fun: DBFunction) extends SelectExpr case class SelectFun(fun: DBFunction, alias: Option[String]) extends SelectExpr {
def as(a: String): SelectFun =
copy(alias = Some(a))
}
case class SelectLit[A](value: A, alias: Option[String])(implicit val P: Put[A])
extends SelectExpr {
def as(a: String): SelectLit[A] =
copy(alias = Some(a))
}
} }

View File

@ -0,0 +1,20 @@
package docspell.store.qb.impl
import docspell.store.qb._
import doobie._
import doobie.implicits._
trait CommonBuilder {
def column(col: Column[_]): Fragment = {
val prefix = col.table.alias.getOrElse(col.table.tableName)
if (prefix.isEmpty) columnNoPrefix(col)
else Fragment.const0(prefix) ++ Fragment.const0(".") ++ Fragment.const0(col.name)
}
def columnNoPrefix(col: Column[_]): Fragment =
Fragment.const0(col.name)
def appendAs(alias: Option[String]): Fragment =
alias.map(a => fr" AS" ++ Fragment.const(a)).getOrElse(Fragment.empty)
}

View File

@ -25,6 +25,17 @@ object ConditionBuilder {
} }
colFrag ++ opFrag ++ valFrag colFrag ++ opFrag ++ valFrag
case c @ Condition.CompareFVal(dbf, op, value) =>
val opFrag = operator(op)
val valFrag = buildValue(value)(c.P)
val dbfFrag = op match {
case Operator.LowerLike =>
lower(dbf)
case _ =>
DBFunctionBuilder.build(dbf)
}
dbfFrag ++ opFrag ++ valFrag
case Condition.CompareCol(c1, op, c2) => case Condition.CompareCol(c1, op, c2) =>
val (c1Frag, c2Frag) = op match { val (c1Frag, c2Frag) = op match {
case Operator.LowerLike => case Operator.LowerLike =>
@ -36,13 +47,13 @@ object ConditionBuilder {
case Condition.InSubSelect(col, subsel) => case Condition.InSubSelect(col, subsel) =>
val sub = DoobieQuery(subsel) val sub = DoobieQuery(subsel)
SelectExprBuilder.column(col) ++ sql" IN (" ++ sub ++ sql")" SelectExprBuilder.column(col) ++ sql" IN (" ++ sub ++ parenClose
case c @ Condition.InValues(col, values, toLower) => case c @ Condition.InValues(col, values, toLower) =>
val cfrag = if (toLower) lower(col) else SelectExprBuilder.column(col) val cfrag = if (toLower) lower(col) else SelectExprBuilder.column(col)
cfrag ++ sql" IN (" ++ values.toList cfrag ++ sql" IN (" ++ values.toList
.map(a => buildValue(a)(c.P)) .map(a => buildValue(a)(c.P))
.reduce(_ ++ comma ++ _) ++ sql")" .reduce(_ ++ comma ++ _) ++ parenClose
case Condition.IsNull(col) => case Condition.IsNull(col) =>
SelectExprBuilder.column(col) ++ fr" is null" SelectExprBuilder.column(col) ++ fr" is null"
@ -89,5 +100,8 @@ object ConditionBuilder {
fr"$v" fr"$v"
def lower(col: Column[_]): Fragment = def lower(col: Column[_]): Fragment =
Fragment.const0("LOWER(") ++ SelectExprBuilder.column(col) ++ sql")" Fragment.const0("LOWER(") ++ SelectExprBuilder.column(col) ++ parenClose
def lower(dbf: DBFunction): Fragment =
Fragment.const0("LOWER(") ++ DBFunctionBuilder.build(dbf) ++ parenClose
} }

View File

@ -0,0 +1,40 @@
package docspell.store.qb.impl
import docspell.store.qb.DBFunction
import doobie._
import doobie.implicits._
object DBFunctionBuilder extends CommonBuilder {
private val comma = fr","
def build(expr: DBFunction): Fragment =
expr match {
case DBFunction.CountAll =>
sql"COUNT(*)"
case DBFunction.Count(col) =>
sql"COUNT(" ++ column(col) ++ fr")"
case DBFunction.Max(col) =>
sql"MAX(" ++ column(col) ++ fr")"
case DBFunction.Min(col) =>
sql"MIN(" ++ column(col) ++ fr")"
case DBFunction.Coalesce(expr, exprs) =>
val v = exprs.prepended(expr).map(SelectExprBuilder.build)
sql"COALESCE(" ++ v.reduce(_ ++ comma ++ _) ++ sql")"
case DBFunction.Power(expr, base) =>
sql"POWER($base, " ++ SelectExprBuilder.build(expr) ++ sql")"
case DBFunction.Plus(expr, more) =>
val v = more.prepended(expr).map(SelectExprBuilder.build)
v.reduce(_ ++ fr" +" ++ _)
case DBFunction.Mult(expr, more) =>
val v = more.prepended(expr).map(SelectExprBuilder.build)
v.reduce(_ ++ fr" *" ++ _)
}
}

View File

@ -34,6 +34,8 @@ object DoobieQuery {
val order = obs.prepended(ob).map(orderBy).reduce(_ ++ comma ++ _) val order = obs.prepended(ob).map(orderBy).reduce(_ ++ comma ++ _)
build(distinct)(q) ++ fr"ORDER BY" ++ order build(distinct)(q) ++ fr"ORDER BY" ++ order
case Select.Limit(q, n) =>
build(distinct)(q) ++ fr" LIMIT $n"
} }
def buildSimple(sq: Select.SimpleSelect): Fragment = { def buildSimple(sq: Select.SimpleSelect): Fragment = {

View File

@ -2,32 +2,21 @@ package docspell.store.qb.impl
import docspell.store.qb._ import docspell.store.qb._
import _root_.doobie.implicits._
import _root_.doobie.{Query => _, _} import _root_.doobie.{Query => _, _}
object SelectExprBuilder { object SelectExprBuilder extends CommonBuilder {
def build(expr: SelectExpr): Fragment = def build(expr: SelectExpr): Fragment =
expr match { expr match {
case SelectExpr.SelectColumn(col) => case SelectExpr.SelectColumn(col, alias) =>
column(col) column(col) ++ appendAs(alias)
case SelectExpr.SelectFun(DBFunction.CountAll(alias)) => case s @ SelectExpr.SelectLit(value, aliasOpt) =>
sql"COUNT(*) AS" ++ Fragment.const(alias) ConditionBuilder.buildValue(value)(s.P) ++ appendAs(aliasOpt)
case SelectExpr.SelectFun(DBFunction.Count(col, alias)) => case SelectExpr.SelectFun(fun, alias) =>
sql"COUNT(" ++ column(col) ++ fr") AS" ++ Fragment.const(alias) DBFunctionBuilder.build(fun) ++ appendAs(alias)
case SelectExpr.SelectFun(DBFunction.Max(col, alias)) =>
sql"MAX(" ++ column(col) ++ fr") AS" ++ Fragment.const(alias)
} }
def column(col: Column[_]): Fragment = {
val prefix = col.table.alias.getOrElse(col.table.tableName)
if (prefix.isEmpty) columnNoPrefix(col)
else Fragment.const0(prefix) ++ Fragment.const0(".") ++ Fragment.const0(col.name)
}
def columnNoPrefix(col: Column[_]): Fragment =
Fragment.const0(col.name)
} }

View File

@ -1,5 +1,6 @@
package docspell.store.queries package docspell.store.queries
import cats.data.NonEmptyList
import cats.effect.Effect import cats.effect.Effect
import cats.implicits._ import cats.implicits._
import fs2.Stream import fs2.Stream
@ -7,7 +8,8 @@ import fs2.Stream
import docspell.common._ import docspell.common._
import docspell.common.syntax.all._ import docspell.common.syntax.all._
import docspell.store.Store import docspell.store.Store
import docspell.store.impl.Implicits._ import docspell.store.qb.DSL._
import docspell.store.qb._
import docspell.store.records.{RJob, RJobGroupUse, RJobLog} import docspell.store.records.{RJob, RJobGroupUse, RJobLog}
import doobie._ import doobie._
@ -89,70 +91,60 @@ object QJob {
now: Timestamp, now: Timestamp,
initialPause: Duration initialPause: Duration
): ConnectionIO[Option[Ident]] = { ): ConnectionIO[Option[Ident]] = {
val JC = RJob.Columns val JC = RJob.as("a")
val waiting: JobState = JobState.Waiting val G = RJobGroupUse.as("b")
val stuck: JobState = JobState.Stuck
val jgroup = JC.group.prefix("a")
val jstate = JC.state.prefix("a")
val ugroup = RJobGroupUse.Columns.group.prefix("b")
val uworker = RJobGroupUse.Columns.worker.prefix("b")
val stuckTrigger = coalesce(JC.startedmillis.prefix("a").f, sql"${now.toMillis}") ++
fr"+" ++ power2(JC.retries.prefix("a")) ++ fr"* ${initialPause.millis}"
val stuckTrigger = stuckTriggerValue(JC, initialPause, now)
val stateCond = val stateCond =
or(jstate.is(waiting), and(jstate.is(stuck), stuckTrigger ++ fr"< ${now.toMillis}")) JC.state === JobState.waiting || (JC.state === JobState.stuck && stuckTrigger < now.toMillis)
val sql1 = fr"SELECT" ++ jgroup.f ++ fr"as g FROM" ++ RJob.table ++ fr"a" ++ val sql1 =
fr"INNER JOIN" ++ RJobGroupUse.table ++ fr"b ON" ++ jgroup.isGt(ugroup) ++ Select(
fr"WHERE" ++ and(uworker.is(worker), stateCond) ++ List(max(JC.group).as("g")),
fr"LIMIT 1" //LIMIT is not sql standard, but supported by h2,mariadb and postgres from(JC).innerJoin(G, JC.group === G.group),
val sql2 = fr"SELECT min(" ++ jgroup.f ++ fr") as g FROM" ++ RJob.table ++ fr"a" ++ G.worker === worker && stateCond
fr"WHERE" ++ stateCond )
val union = val sql2 =
sql"SELECT g FROM ((" ++ sql1 ++ sql") UNION ALL (" ++ sql2 ++ sql")) as t0 WHERE g is not null" Select(List(min(JC.group).as("g")), from(JC), stateCond)
union val gcol = Column[String]("g", TableDef(""))
.query[Ident] val groups =
.to[List] Select(select(gcol), fromSubSelect(union(sql1, sql2)).as("t0"), gcol.isNull.negate)
.map(
_.headOption // either 0, one or two results, but may be empty if RJob table is empty
) // either one or two results, but may be empty if RJob table is empty groups.run.query[Ident].to[List].map(_.headOption)
} }
private def stuckTriggerValue(t: RJob.Table, initialPause: Duration, now: Timestamp) =
plus(
coalesce(t.startedmillis.s, lit(now.toMillis)).s,
mult(power(2, t.retries.s).s, lit(initialPause.millis)).s
)
def selectNextJob( def selectNextJob(
group: Ident, group: Ident,
prio: Priority, prio: Priority,
initialPause: Duration, initialPause: Duration,
now: Timestamp now: Timestamp
): ConnectionIO[Option[RJob]] = { ): ConnectionIO[Option[RJob]] = {
val JC = RJob.Columns val JC = RJob.T
val psort = val psort =
if (prio == Priority.High) JC.priority.desc if (prio == Priority.High) JC.priority.desc
else JC.priority.asc else JC.priority.asc
val waiting: JobState = JobState.Waiting val waiting = JobState.waiting
val stuck: JobState = JobState.Stuck val stuck = JobState.stuck
val stuckTrigger = val stuckTrigger = stuckTriggerValue(JC, initialPause, now)
coalesce(JC.startedmillis.f, sql"${now.toMillis}") ++ fr"+" ++ power2( val sql =
JC.retries Select(
) ++ fr"* ${initialPause.millis}" select(JC.all),
val sql = selectSimple( from(JC),
JC.all, JC.group === group && (JC.state === waiting ||
RJob.table, (JC.state === stuck && stuckTrigger < now.toMillis))
and( ).orderBy(JC.state.asc, psort, JC.submitted.asc).limit(1)
JC.group.is(group),
or(
JC.state.is(waiting),
and(JC.state.is(stuck), stuckTrigger ++ fr"< ${now.toMillis}")
)
)
) ++
orderBy(JC.state.asc, psort, JC.submitted.asc) ++
fr"LIMIT 1"
sql.query[RJob].option sql.run.query[RJob].option
} }
def setCancelled[F[_]: Effect](id: Ident, store: Store[F]): F[Unit] = def setCancelled[F[_]: Effect](id: Ident, store: Store[F]): F[Unit] =
@ -212,39 +204,34 @@ object QJob {
collective: Ident, collective: Ident,
max: Long max: Long
): Stream[ConnectionIO, (RJob, Vector[RJobLog])] = { ): Stream[ConnectionIO, (RJob, Vector[RJobLog])] = {
val JC = RJob.Columns val JC = RJob.T
val waiting: Set[JobState] = Set(JobState.Waiting, JobState.Stuck, JobState.Scheduled) val waiting = NonEmptyList.of(JobState.Waiting, JobState.Stuck, JobState.Scheduled)
val running: Set[JobState] = Set(JobState.Running) val running = NonEmptyList.of(JobState.Running)
val done = JobState.all.diff(waiting).diff(running) //val done = JobState.all.filterNot(js => ).diff(waiting).diff(running)
def selectJobs(now: Timestamp): Stream[ConnectionIO, RJob] = { def selectJobs(now: Timestamp): Stream[ConnectionIO, RJob] = {
val refDate = now.minusHours(24) val refDate = now.minusHours(24)
val runningJobs = Select(
select(JC.all),
from(JC),
JC.group === collective && JC.state.in(running)
).orderBy(JC.submitted.desc).run.query[RJob].stream
val runningJobs = (selectSimple( val waitingJobs = Select(
JC.all, select(JC.all),
RJob.table, from(JC),
and(JC.group.is(collective), JC.state.isOneOf(running.toSeq)) JC.group === collective && JC.state.in(waiting) && JC.submitted > refDate
) ++ orderBy(JC.submitted.desc)).query[RJob].stream ).orderBy(JC.submitted.desc).run.query[RJob].stream.take(max)
val waitingJobs = (selectSimple( val doneJobs = Select(
JC.all, select(JC.all),
RJob.table, from(JC),
and( and(
JC.group.is(collective), JC.group === collective,
JC.state.isOneOf(waiting.toSeq), JC.state.in(JobState.done),
JC.submitted.isGt(refDate) JC.submitted > refDate
) )
) ++ orderBy(JC.submitted.desc)).query[RJob].stream.take(max) ).orderBy(JC.submitted.desc).run.query[RJob].stream.take(max)
val doneJobs = (selectSimple(
JC.all,
RJob.table,
and(
JC.group.is(collective),
JC.state.isOneOf(done.toSeq),
JC.submitted.isGt(refDate)
)
) ++ orderBy(JC.submitted.desc)).query[RJob].stream.take(max)
runningJobs ++ waitingJobs ++ doneJobs runningJobs ++ waitingJobs ++ doneJobs
} }

View File

@ -1,12 +1,12 @@
package docspell.store.records package docspell.store.records
import cats.effect.Sync import cats.data.NonEmptyList
import cats.implicits._ import cats.implicits._
import fs2.Stream import fs2.Stream
import docspell.common._ import docspell.common._
import docspell.store.impl.Column import docspell.store.qb.DSL._
import docspell.store.impl.Implicits._ import docspell.store.qb._
import doobie._ import doobie._
import doobie.implicits._ import doobie.implicits._
@ -34,7 +34,7 @@ case class RJob(
s"${id.id.substring(0, 9)}.../${group.id}/${task.id}/$priority" s"${id.id.substring(0, 9)}.../${group.id}/${task.id}/$priority"
def isFinalState: Boolean = def isFinalState: Boolean =
JobState.done.contains(state) JobState.done.toList.contains(state)
def isInProgress: Boolean = def isInProgress: Boolean =
JobState.inProgress.contains(state) JobState.inProgress.contains(state)
@ -71,25 +71,25 @@ object RJob {
None None
) )
val table = fr"job" final case class Table(alias: Option[String]) extends TableDef {
val tableName = "job"
object Columns { val id = Column[Ident]("jid", this)
val id = Column("jid") val task = Column[Ident]("task", this)
val task = Column("task") val group = Column[Ident]("group_", this)
val group = Column("group_") val args = Column[String]("args", this)
val args = Column("args") val subject = Column[String]("subject", this)
val subject = Column("subject") val submitted = Column[Timestamp]("submitted", this)
val submitted = Column("submitted") val submitter = Column[Ident]("submitter", this)
val submitter = Column("submitter") val priority = Column[Priority]("priority", this)
val priority = Column("priority") val state = Column[JobState]("state", this)
val state = Column("state") val retries = Column[Int]("retries", this)
val retries = Column("retries") val progress = Column[Int]("progress", this)
val progress = Column("progress") val tracker = Column[Ident]("tracker", this)
val tracker = Column("tracker") val worker = Column[Ident]("worker", this)
val worker = Column("worker") val started = Column[Timestamp]("started", this)
val started = Column("started") val startedmillis = Column[Long]("startedmillis", this)
val startedmillis = Column("startedmillis") val finished = Column[Timestamp]("finished", this)
val finished = Column("finished")
val all = List( val all = List(
id, id,
task, task,
@ -109,163 +109,174 @@ object RJob {
) )
} }
import Columns._ val T = Table(None)
def as(alias: String): Table =
Table(Some(alias))
def insert(v: RJob): ConnectionIO[Int] = { def insert(v: RJob): ConnectionIO[Int] = {
val smillis = v.started.map(_.toMillis) val smillis = v.started.map(_.toMillis)
val sql = insertRow( DML.insert(
table, T,
all ++ List(startedmillis), T.all ++ List(T.startedmillis),
fr"${v.id},${v.task},${v.group},${v.args},${v.subject},${v.submitted},${v.submitter},${v.priority},${v.state},${v.retries},${v.progress},${v.tracker},${v.worker},${v.started},${v.finished},$smillis" fr"${v.id},${v.task},${v.group},${v.args},${v.subject},${v.submitted},${v.submitter},${v.priority},${v.state},${v.retries},${v.progress},${v.tracker},${v.worker},${v.started},${v.finished},$smillis"
) )
sql.update.run
} }
def findFromIds(ids: Seq[Ident]): ConnectionIO[Vector[RJob]] = def findFromIds(ids: Seq[Ident]): ConnectionIO[Vector[RJob]] =
if (ids.isEmpty) Sync[ConnectionIO].pure(Vector.empty[RJob]) NonEmptyList.fromList(ids.toList) match {
else selectSimple(all, table, id.isOneOf(ids)).query[RJob].to[Vector] case None =>
Vector.empty[RJob].pure[ConnectionIO]
case Some(nel) =>
run(select(T.all), from(T), T.id.in(nel)).query[RJob].to[Vector]
}
def findByIdAndGroup(jobId: Ident, jobGroup: Ident): ConnectionIO[Option[RJob]] = def findByIdAndGroup(jobId: Ident, jobGroup: Ident): ConnectionIO[Option[RJob]] =
selectSimple(all, table, and(id.is(jobId), group.is(jobGroup))).query[RJob].option run(select(T.all), from(T), T.id === jobId && T.group === jobGroup).query[RJob].option
def findById(jobId: Ident): ConnectionIO[Option[RJob]] = def findById(jobId: Ident): ConnectionIO[Option[RJob]] =
selectSimple(all, table, id.is(jobId)).query[RJob].option run(select(T.all), from(T), T.id === jobId).query[RJob].option
def findByIdAndWorker(jobId: Ident, workerId: Ident): ConnectionIO[Option[RJob]] = def findByIdAndWorker(jobId: Ident, workerId: Ident): ConnectionIO[Option[RJob]] =
selectSimple(all, table, and(id.is(jobId), worker.is(workerId))).query[RJob].option run(select(T.all), from(T), T.id === jobId && T.worker === workerId)
.query[RJob]
.option
def setRunningToWaiting(workerId: Ident): ConnectionIO[Int] = { def setRunningToWaiting(workerId: Ident): ConnectionIO[Int] = {
val states: Seq[JobState] = List(JobState.Running, JobState.Scheduled) val states: NonEmptyList[JobState] =
updateRow( NonEmptyList.of(JobState.Running, JobState.Scheduled)
table, DML.update(
and(worker.is(workerId), state.isOneOf(states)), T,
state.setTo(JobState.Waiting: JobState) where(T.worker === workerId, T.state.in(states)),
).update.run DML.set(T.state.setTo(JobState.waiting))
)
} }
def incrementRetries(jobid: Ident): ConnectionIO[Int] = def incrementRetries(jobid: Ident): ConnectionIO[Int] =
updateRow( DML
table, .update(
and(id.is(jobid), state.is(JobState.Stuck: JobState)), T,
retries.f ++ fr"=" ++ retries.f ++ fr"+ 1" where(T.id === jobid, T.state === JobState.stuck),
).update.run DML.set(T.retries.increment(1))
)
def setRunning(jobId: Ident, workerId: Ident, now: Timestamp): ConnectionIO[Int] = def setRunning(jobId: Ident, workerId: Ident, now: Timestamp): ConnectionIO[Int] =
updateRow( DML.update(
table, T,
id.is(jobId), T.id === jobId,
commas( DML.set(
state.setTo(JobState.Running: JobState), T.state.setTo(JobState.running),
started.setTo(now), T.started.setTo(now),
startedmillis.setTo(now.toMillis), T.startedmillis.setTo(now.toMillis),
worker.setTo(workerId) T.worker.setTo(workerId)
) )
).update.run )
def setWaiting(jobId: Ident): ConnectionIO[Int] = def setWaiting(jobId: Ident): ConnectionIO[Int] =
updateRow( DML
table, .update(
id.is(jobId), T,
commas( T.id === jobId,
state.setTo(JobState.Waiting: JobState), DML.set(
started.setTo(None: Option[Timestamp]), T.state.setTo(JobState.Waiting: JobState),
startedmillis.setTo(None: Option[Long]), T.started.setTo(None: Option[Timestamp]),
finished.setTo(None: Option[Timestamp]) T.startedmillis.setTo(None: Option[Long]),
T.finished.setTo(None: Option[Timestamp])
)
) )
).update.run
def setScheduled(jobId: Ident, workerId: Ident): ConnectionIO[Int] = def setScheduled(jobId: Ident, workerId: Ident): ConnectionIO[Int] =
for { for {
_ <- incrementRetries(jobId) _ <- incrementRetries(jobId)
n <- updateRow( n <- DML.update(
table, T,
and( where(
id.is(jobId), T.id === jobId,
or(worker.isNull, worker.is(workerId)), or(T.worker.isNull, T.worker === workerId),
state.isOneOf(Seq[JobState](JobState.Waiting, JobState.Stuck)) T.state.in(NonEmptyList.of(JobState.waiting, JobState.stuck))
), ),
commas( DML.set(
state.setTo(JobState.Scheduled: JobState), T.state.setTo(JobState.scheduled),
worker.setTo(workerId) T.worker.setTo(workerId)
) )
).update.run )
} yield n } yield n
def setSuccess(jobId: Ident, now: Timestamp): ConnectionIO[Int] = def setSuccess(jobId: Ident, now: Timestamp): ConnectionIO[Int] =
updateRow( DML
table, .update(
id.is(jobId), T,
commas( T.id === jobId,
state.setTo(JobState.Success: JobState), DML.set(
finished.setTo(now) T.state.setTo(JobState.success),
T.finished.setTo(now)
)
) )
).update.run
def setStuck(jobId: Ident, now: Timestamp): ConnectionIO[Int] = def setStuck(jobId: Ident, now: Timestamp): ConnectionIO[Int] =
updateRow( DML.update(
table, T,
id.is(jobId), T.id === jobId,
commas( DML.set(
state.setTo(JobState.Stuck: JobState), T.state.setTo(JobState.stuck),
finished.setTo(now) T.finished.setTo(now)
) )
).update.run )
def setFailed(jobId: Ident, now: Timestamp): ConnectionIO[Int] = def setFailed(jobId: Ident, now: Timestamp): ConnectionIO[Int] =
updateRow( DML.update(
table, T,
id.is(jobId), T.id === jobId,
commas( DML.set(
state.setTo(JobState.Failed: JobState), T.state.setTo(JobState.failed),
finished.setTo(now) T.finished.setTo(now)
) )
).update.run )
def setCancelled(jobId: Ident, now: Timestamp): ConnectionIO[Int] = def setCancelled(jobId: Ident, now: Timestamp): ConnectionIO[Int] =
updateRow( DML.update(
table, T,
id.is(jobId), T.id === jobId,
commas( DML.set(
state.setTo(JobState.Cancelled: JobState), T.state.setTo(JobState.cancelled),
finished.setTo(now) T.finished.setTo(now)
) )
).update.run )
def setPriority(jobId: Ident, jobGroup: Ident, prio: Priority): ConnectionIO[Int] = def setPriority(jobId: Ident, jobGroup: Ident, prio: Priority): ConnectionIO[Int] =
updateRow( DML.update(
table, T,
and(id.is(jobId), group.is(jobGroup), state.is(JobState.waiting)), where(T.id === jobId, T.group === jobGroup, T.state === JobState.waiting),
priority.setTo(prio) DML.set(T.priority.setTo(prio))
).update.run )
def getRetries(jobId: Ident): ConnectionIO[Option[Int]] = def getRetries(jobId: Ident): ConnectionIO[Option[Int]] =
selectSimple(List(retries), table, id.is(jobId)).query[Int].option run(select(T.retries), from(T), T.id === jobId).query[Int].option
def setProgress(jobId: Ident, perc: Int): ConnectionIO[Int] = def setProgress(jobId: Ident, perc: Int): ConnectionIO[Int] =
updateRow(table, id.is(jobId), progress.setTo(perc)).update.run DML.update(T, T.id === jobId, DML.set(T.progress.setTo(perc)))
def selectWaiting: ConnectionIO[Option[RJob]] = { def selectWaiting: ConnectionIO[Option[RJob]] = {
val sql = selectSimple(all, table, state.is(JobState.Waiting: JobState)) val sql = run(select(T.all), from(T), T.state === JobState.waiting)
sql.query[RJob].to[Vector].map(_.headOption) sql.query[RJob].to[Vector].map(_.headOption)
} }
def selectGroupInState(states: Seq[JobState]): ConnectionIO[Vector[Ident]] = { def selectGroupInState(states: NonEmptyList[JobState]): ConnectionIO[Vector[Ident]] = {
val sql = val sql =
selectDistinct(List(group), table, state.isOneOf(states)) ++ orderBy(group.f) Select(select(T.group), from(T), T.state.in(states)).orderBy(T.group)
sql.query[Ident].to[Vector] sql.run.query[Ident].to[Vector]
} }
def delete(jobId: Ident): ConnectionIO[Int] = def delete(jobId: Ident): ConnectionIO[Int] =
for { for {
n0 <- RJobLog.deleteAll(jobId) n0 <- RJobLog.deleteAll(jobId)
n1 <- deleteFrom(table, id.is(jobId)).update.run n1 <- DML.delete(T, T.id === jobId)
} yield n0 + n1 } yield n0 + n1
def findIdsDoneAndOlderThan(ts: Timestamp): Stream[ConnectionIO, Ident] = def findIdsDoneAndOlderThan(ts: Timestamp): Stream[ConnectionIO, Ident] =
selectSimple( run(
Seq(id), select(T.id),
table, from(T),
and(state.isOneOf(JobState.done.toSeq), or(finished.isNull, finished.isLt(ts))) T.state.in(JobState.done) && (T.finished.isNull || T.finished < ts)
).query[Ident].stream ).query[Ident].stream
def deleteDoneAndOlderThan(ts: Timestamp, batch: Int): ConnectionIO[Int] = def deleteDoneAndOlderThan(ts: Timestamp, batch: Int): ConnectionIO[Int] =
@ -277,10 +288,10 @@ object RJob {
.foldMonoid .foldMonoid
def findNonFinalByTracker(trackerId: Ident): ConnectionIO[Option[RJob]] = def findNonFinalByTracker(trackerId: Ident): ConnectionIO[Option[RJob]] =
selectSimple( run(
all, select(T.all),
table, from(T),
and(tracker.is(trackerId), state.isOneOf(JobState.all.diff(JobState.done).toSeq)) where(T.tracker === trackerId, T.state.in(JobState.notDone))
).query[RJob].option ).query[RJob].option
} }

View File

@ -3,8 +3,8 @@ package docspell.store.records
import cats.implicits._ import cats.implicits._
import docspell.common._ import docspell.common._
import docspell.store.impl.Column import docspell.store.qb.DSL._
import docspell.store.impl.Implicits._ import docspell.store.qb._
import doobie._ import doobie._
import doobie.implicits._ import doobie.implicits._
@ -12,25 +12,27 @@ import doobie.implicits._
case class RJobGroupUse(groupId: Ident, workerId: Ident) {} case class RJobGroupUse(groupId: Ident, workerId: Ident) {}
object RJobGroupUse { object RJobGroupUse {
final case class Table(alias: Option[String]) extends TableDef {
val tableName = "jobgroupuse"
val table = fr"jobgroupuse" val group = Column[Ident]("groupid", this)
val worker = Column[Ident]("workerid", this)
object Columns {
val group = Column("groupid")
val worker = Column("workerid")
val all = List(group, worker) val all = List(group, worker)
} }
import Columns._
val T = Table(None)
def as(alias: String): Table =
Table(Some(alias))
def insert(v: RJobGroupUse): ConnectionIO[Int] = def insert(v: RJobGroupUse): ConnectionIO[Int] =
insertRow(table, all, fr"${v.groupId},${v.workerId}").update.run DML.insert(T, T.all, fr"${v.groupId},${v.workerId}")
def updateGroup(v: RJobGroupUse): ConnectionIO[Int] = def updateGroup(v: RJobGroupUse): ConnectionIO[Int] =
updateRow(table, worker.is(v.workerId), group.setTo(v.groupId)).update.run DML.update(T, T.worker === v.workerId, DML.set(T.group.setTo(v.groupId)))
def setGroup(v: RJobGroupUse): ConnectionIO[Int] = def setGroup(v: RJobGroupUse): ConnectionIO[Int] =
updateGroup(v).flatMap(n => if (n > 0) n.pure[ConnectionIO] else insert(v)) updateGroup(v).flatMap(n => if (n > 0) n.pure[ConnectionIO] else insert(v))
def findGroup(workerId: Ident): ConnectionIO[Option[Ident]] = def findGroup(workerId: Ident): ConnectionIO[Option[Ident]] =
selectSimple(List(group), table, worker.is(workerId)).query[Ident].option run(select(T.group), from(T), T.worker === workerId).query[Ident].option
} }

View File

@ -1,8 +1,8 @@
package docspell.store.records package docspell.store.records
import docspell.common._ import docspell.common._
import docspell.store.impl.Column import docspell.store.qb.DSL._
import docspell.store.impl.Implicits._ import docspell.store.qb._
import doobie._ import doobie._
import doobie.implicits._ import doobie.implicits._
@ -16,35 +16,39 @@ case class RJobLog(
) {} ) {}
object RJobLog { object RJobLog {
final case class Table(alias: Option[String]) extends TableDef {
val tableName = "joblog"
val table = fr"joblog" val id = Column[Ident]("id", this)
val jobId = Column[Ident]("jid", this)
object Columns { val level = Column[LogLevel]("level", this)
val id = Column("id") val created = Column[Timestamp]("created", this)
val jobId = Column("jid") val message = Column[String]("message", this)
val level = Column("level")
val created = Column("created")
val message = Column("message")
val all = List(id, jobId, level, created, message) val all = List(id, jobId, level, created, message)
// separate column only for sorting, so not included in `all` and // separate column only for sorting, so not included in `all` and
// the case class // the case class
val counter = Column("counter") val counter = Column[Long]("counter", this)
} }
import Columns._
val T = Table(None)
def as(alias: String): Table =
Table(Some(alias))
def insert(v: RJobLog): ConnectionIO[Int] = def insert(v: RJobLog): ConnectionIO[Int] =
insertRow( DML.insert(
table, T,
all, T.all,
fr"${v.id},${v.jobId},${v.level},${v.created},${v.message}" fr"${v.id},${v.jobId},${v.level},${v.created},${v.message}"
).update.run )
def findLogs(id: Ident): ConnectionIO[Vector[RJobLog]] = def findLogs(id: Ident): ConnectionIO[Vector[RJobLog]] =
(selectSimple(all, table, jobId.is(id)) ++ orderBy(created.asc, counter.asc)) Select(select(T.all), from(T), T.jobId === id)
.orderBy(T.created.asc, T.counter.asc)
.run
.query[RJobLog] .query[RJobLog]
.to[Vector] .to[Vector]
def deleteAll(job: Ident): ConnectionIO[Int] = def deleteAll(job: Ident): ConnectionIO[Int] =
deleteFrom(table, jobId.is(job)).update.run DML.delete(T, T.jobId === job)
} }