Implement storage routines for periodic scheduler

This commit is contained in:
Eike Kettner 2020-03-08 13:56:23 +01:00
parent 1e598bd902
commit 616c333fa5
7 changed files with 152 additions and 13 deletions

View File

@ -28,6 +28,8 @@ case class Timestamp(value: Instant) {
def atZone(zone: ZoneId): ZonedDateTime = def atZone(zone: ZoneId): ZonedDateTime =
value.atZone(zone) value.atZone(zone)
def atUTC: ZonedDateTime = atZone(Timestamp.UTC)
def asString: String = value.toString def asString: String = value.toString
} }
@ -39,6 +41,9 @@ object Timestamp {
def current[F[_]: Sync]: F[Timestamp] = def current[F[_]: Sync]: F[Timestamp] =
Sync[F].delay(Timestamp(Instant.now)) Sync[F].delay(Timestamp(Instant.now))
def from(zd: ZonedDateTime): Timestamp =
Timestamp(zd.toInstant)
implicit val encodeTimestamp: Encoder[Timestamp] = implicit val encodeTimestamp: Encoder[Timestamp] =
BaseJsonCodecs.encodeInstantEpoch.contramap(_.value) BaseJsonCodecs.encodeInstantEpoch.contramap(_.value)

View File

@ -94,10 +94,12 @@ final class PeriodicSchedulerImpl[F[_]: ConcurrentEffect: ContextShift](
.takeNext(config.name) .takeNext(config.name)
.use({ .use({
case Some(pj) => case Some(pj) =>
logger
.fdebug(s"Found periodic task '${pj.subject}/${pj.timer.asString}'") *>
(if (isTriggered(pj, now)) submitJob(pj) (if (isTriggered(pj, now)) submitJob(pj)
else scheduleNotify(pj)).map(_ => true) else scheduleNotify(pj)).map(_ => true)
case None => case None =>
false.pure[F] logger.fdebug("No periodic task found") *> false.pure[F]
}) })
) )
} yield go } yield go

View File

@ -13,3 +13,6 @@ CREATE TABLE `periodic_task` (
`nextrun` timestamp not null, `nextrun` timestamp not null,
`created` timestamp not null `created` timestamp not null
); );
CREATE INDEX `periodic_task_nextrun_idx` ON `periodic_task`(`nextrun`);
CREATE INDEX `periodic_task_worker_idx` ON `periodic_task`(`worker`);

View File

@ -8,6 +8,11 @@ CREATE TABLE "periodic_task" (
"submitter" varchar(254) not null, "submitter" varchar(254) not null,
"priority" int not null, "priority" int not null,
"worker" varchar(254), "worker" varchar(254),
"marked" timestamp,
"timer" varchar(254) not null, "timer" varchar(254) not null,
"nextrun" timestamp not null "nextrun" timestamp not null,
"created" timestamp not null
); );
CREATE INDEX "periodic_task_nextrun_idx" ON "periodic_task"("nextrun");
CREATE INDEX "periodic_task_worker_idx" ON "periodic_task"("worker");

View File

@ -0,0 +1,57 @@
package docspell.store.queries
//import cats.implicits._
import docspell.common._
//import docspell.common.syntax.all._
import docspell.store.impl.Implicits._
import docspell.store.records._
import doobie._
import doobie.implicits._
//import org.log4s._
object QPeriodicTask {
// private[this] val logger = getLogger
def clearWorkers(name: Ident): ConnectionIO[Int] = {
val worker = RPeriodicTask.Columns.worker
updateRow(RPeriodicTask.table, worker.is(name), worker.setTo[Ident](None)).update.run
}
def setWorker(pid: Ident, name: Ident): ConnectionIO[Int] = {
val id = RPeriodicTask.Columns.id
val worker = RPeriodicTask.Columns.worker
updateRow(RPeriodicTask.table, and(id.is(pid), worker.isNull), worker.setTo(name)).update.run
}
def unsetWorker(
pid: Ident,
nextRun: Option[Timestamp]
): ConnectionIO[Int] = {
val id = RPeriodicTask.Columns.id
val worker = RPeriodicTask.Columns.worker
val next = RPeriodicTask.Columns.nextrun
updateRow(
RPeriodicTask.table,
id.is(pid),
commas(worker.setTo[Ident](None), next.setTo(nextRun))
).update.run
}
def findNext: ConnectionIO[Option[RPeriodicTask]] = {
val order = orderBy(RPeriodicTask.Columns.nextrun.f) ++ fr"ASC"
val sql =
selectSimple(RPeriodicTask.Columns.all, RPeriodicTask.table, Fragment.empty) ++ order
sql.query[RPeriodicTask].streamWithChunkSize(2).take(1).compile.last
}
def findNonFinal(pid: Ident): ConnectionIO[Option[RJob]] =
selectSimple(
RJob.Columns.all,
RJob.table,
and(
RJob.Columns.tracker.is(pid),
RJob.Columns.state.isOneOf(JobState.all.diff(JobState.done).toSeq)
)
).query[RJob].option
}

View File

@ -3,9 +3,13 @@ package docspell.store.queue
import cats.effect._ import cats.effect._
import cats.implicits._ import cats.implicits._
import fs2.Stream import fs2.Stream
import org.log4s.getLogger
import com.github.eikek.fs2calev._
import docspell.common._ import docspell.common._
import docspell.store.Store import docspell.common.syntax.all._
import docspell.store.{AddResult, Store}
import docspell.store.records._ import docspell.store.records._
import docspell.store.queries.QPeriodicTask
trait PeriodicTaskStore[F[_]] { trait PeriodicTaskStore[F[_]] {
@ -22,10 +26,17 @@ trait PeriodicTaskStore[F[_]] {
def findNonFinalJob(pjobId: Ident): F[Option[RJob]] def findNonFinalJob(pjobId: Ident): F[Option[RJob]]
/** Insert a task or update if it already exists.
*/
def insert(task: RPeriodicTask): F[Unit] def insert(task: RPeriodicTask): F[Unit]
/** Adds the task only if it not already exists.
*/
def add(task: RPeriodicTask): F[AddResult]
} }
object PeriodicTaskStore { object PeriodicTaskStore {
private[this] val logger = getLogger
def create[F[_]: Sync](store: Store[F]): Resource[F, PeriodicTaskStore[F]] = def create[F[_]: Sync](store: Store[F]): Resource[F, PeriodicTaskStore[F]] =
Resource.pure[F, PeriodicTaskStore[F]](new PeriodicTaskStore[F] { Resource.pure[F, PeriodicTaskStore[F]](new PeriodicTaskStore[F] {
@ -57,21 +68,53 @@ object PeriodicTaskStore {
} }
def getNext: F[Option[RPeriodicTask]] = def getNext: F[Option[RPeriodicTask]] =
Sync[F].raiseError(new Exception("not implemented")) store.transact(QPeriodicTask.findNext)
def mark(pid: Ident, name: Ident): F[Boolean] = def mark(pid: Ident, name: Ident): F[Boolean] =
Sync[F].raiseError(new Exception(s"not implemented $pid $name")) store.transact(QPeriodicTask.setWorker(pid, name)).map(_ > 0)
def unmark(job: RPeriodicTask): F[Unit] = def unmark(job: RPeriodicTask): F[Unit] =
Sync[F].raiseError(new Exception(s"not implemented $job")) for {
now <- Timestamp.current[F]
nextRun <- CalevFs2
.nextElapses[F](now.atUTC)(job.timer)
.take(1)
.compile
.last
.map(_.map(Timestamp.from))
_ <- store.transact(QPeriodicTask.unsetWorker(job.id, nextRun))
} yield ()
def clearMarks(name: Ident): F[Unit] = def clearMarks(name: Ident): F[Unit] =
Sync[F].raiseError(new Exception("not implemented")) store
.transact(QPeriodicTask.clearWorkers(name))
.flatMap { n =>
if (n > 0) logger.finfo(s"Clearing $n periodic tasks from worker ${name.id}")
else ().pure[F]
}
def findNonFinalJob(pjobId: Ident): F[Option[RJob]] = def findNonFinalJob(pjobId: Ident): F[Option[RJob]] =
Sync[F].raiseError(new Exception("not implemented")) store.transact(QPeriodicTask.findNonFinal(pjobId))
def insert(task: RPeriodicTask): F[Unit] = def insert(task: RPeriodicTask): F[Unit] = {
Sync[F].raiseError(new Exception("not implemented")) val update = store.transact(RPeriodicTask.update(task))
val insertAttempt = store.transact(RPeriodicTask.insert(task))
.attempt.map {
case Right(n) => n > 0
case Left(_) => false
}
for {
n1 <- update
ins <- if (n1 == 0) insertAttempt else true.pure[F]
_ <- if (ins) 1.pure[F] else update
} yield ()
}
def add(task: RPeriodicTask): F[AddResult] = {
val insert = RPeriodicTask.insert(task)
val exists = RPeriodicTask.exists(task.id)
store.add(insert, exists)
}
}) })
} }

View File

@ -133,9 +133,33 @@ object RPeriodicTask {
val sql = insertRow( val sql = insertRow(
table, table,
all, all,
fr"${v.id},${v.enabled},${v.task},${v.group},${v.args},${v.subject},${v.submitter},${v.priority},${v.worker},${v.marked},${v.timer},${v.nextrun},${v.created}" fr"${v.id},${v.enabled},${v.task},${v.group},${v.args}," ++
fr"${v.subject},${v.submitter},${v.priority},${v.worker}," ++
fr"${v.marked},${v.timer},${v.nextrun},${v.created}"
) )
sql.update.run sql.update.run
} }
def update(v: RPeriodicTask): ConnectionIO[Int] = {
val sql = updateRow(
table,
id.is(v.id),
commas(
enabled.setTo(v.enabled),
group.setTo(v.group),
args.setTo(v.args),
subject.setTo(v.subject),
submitter.setTo(v.submitter),
priority.setTo(v.priority),
worker.setTo(v.worker),
marked.setTo(v.marked),
timer.setTo(v.timer),
nextrun.setTo(v.nextrun)
)
)
sql.update.run
}
def exists(pid: Ident): ConnectionIO[Boolean] =
selectCount(id, table, id.is(pid)).query[Int].unique.map(_ > 0)
} }