From 616c333fa56eb6056c2077830fa3e50f5c3fc4e9 Mon Sep 17 00:00:00 2001 From: Eike Kettner Date: Sun, 8 Mar 2020 13:56:23 +0100 Subject: [PATCH] Implement storage routines for periodic scheduler --- .../scala/docspell/common/Timestamp.scala | 5 ++ .../scheduler/PeriodicSchedulerImpl.scala | 8 ++- .../mariadb/V1.3.0__periodic_job.sql | 3 + .../postgresql/V1.3.0__periodic_job.sql | 7 ++- .../store/queries/QPeriodicTask.scala | 57 ++++++++++++++++++ .../store/queue/PeriodicTaskStore.scala | 59 ++++++++++++++++--- .../store/records/RPeriodicTask.scala | 26 +++++++- 7 files changed, 152 insertions(+), 13 deletions(-) create mode 100644 modules/store/src/main/scala/docspell/store/queries/QPeriodicTask.scala diff --git a/modules/common/src/main/scala/docspell/common/Timestamp.scala b/modules/common/src/main/scala/docspell/common/Timestamp.scala index ee0735dc..bd496efb 100644 --- a/modules/common/src/main/scala/docspell/common/Timestamp.scala +++ b/modules/common/src/main/scala/docspell/common/Timestamp.scala @@ -28,6 +28,8 @@ case class Timestamp(value: Instant) { def atZone(zone: ZoneId): ZonedDateTime = value.atZone(zone) + def atUTC: ZonedDateTime = atZone(Timestamp.UTC) + def asString: String = value.toString } @@ -39,6 +41,9 @@ object Timestamp { def current[F[_]: Sync]: F[Timestamp] = Sync[F].delay(Timestamp(Instant.now)) + def from(zd: ZonedDateTime): Timestamp = + Timestamp(zd.toInstant) + implicit val encodeTimestamp: Encoder[Timestamp] = BaseJsonCodecs.encodeInstantEpoch.contramap(_.value) diff --git a/modules/joex/src/main/scala/docspell/joex/scheduler/PeriodicSchedulerImpl.scala b/modules/joex/src/main/scala/docspell/joex/scheduler/PeriodicSchedulerImpl.scala index 421deacc..fe0c5c39 100644 --- a/modules/joex/src/main/scala/docspell/joex/scheduler/PeriodicSchedulerImpl.scala +++ b/modules/joex/src/main/scala/docspell/joex/scheduler/PeriodicSchedulerImpl.scala @@ -94,10 +94,12 @@ final class PeriodicSchedulerImpl[F[_]: ConcurrentEffect: ContextShift]( .takeNext(config.name) .use({ case Some(pj) => - (if (isTriggered(pj, now)) submitJob(pj) - else scheduleNotify(pj)).map(_ => true) + logger + .fdebug(s"Found periodic task '${pj.subject}/${pj.timer.asString}'") *> + (if (isTriggered(pj, now)) submitJob(pj) + else scheduleNotify(pj)).map(_ => true) case None => - false.pure[F] + logger.fdebug("No periodic task found") *> false.pure[F] }) ) } yield go diff --git a/modules/store/src/main/resources/db/migration/mariadb/V1.3.0__periodic_job.sql b/modules/store/src/main/resources/db/migration/mariadb/V1.3.0__periodic_job.sql index c882b38a..782d9592 100644 --- a/modules/store/src/main/resources/db/migration/mariadb/V1.3.0__periodic_job.sql +++ b/modules/store/src/main/resources/db/migration/mariadb/V1.3.0__periodic_job.sql @@ -13,3 +13,6 @@ CREATE TABLE `periodic_task` ( `nextrun` timestamp not null, `created` timestamp not null ); + +CREATE INDEX `periodic_task_nextrun_idx` ON `periodic_task`(`nextrun`); +CREATE INDEX `periodic_task_worker_idx` ON `periodic_task`(`worker`); diff --git a/modules/store/src/main/resources/db/migration/postgresql/V1.3.0__periodic_job.sql b/modules/store/src/main/resources/db/migration/postgresql/V1.3.0__periodic_job.sql index 57fc1178..215b9dd7 100644 --- a/modules/store/src/main/resources/db/migration/postgresql/V1.3.0__periodic_job.sql +++ b/modules/store/src/main/resources/db/migration/postgresql/V1.3.0__periodic_job.sql @@ -8,6 +8,11 @@ CREATE TABLE "periodic_task" ( "submitter" varchar(254) not null, "priority" int not null, "worker" varchar(254), + "marked" timestamp, "timer" varchar(254) not null, - "nextrun" timestamp not null + "nextrun" timestamp not null, + "created" timestamp not null ); + +CREATE INDEX "periodic_task_nextrun_idx" ON "periodic_task"("nextrun"); +CREATE INDEX "periodic_task_worker_idx" ON "periodic_task"("worker"); diff --git a/modules/store/src/main/scala/docspell/store/queries/QPeriodicTask.scala b/modules/store/src/main/scala/docspell/store/queries/QPeriodicTask.scala new file mode 100644 index 00000000..248afcfe --- /dev/null +++ b/modules/store/src/main/scala/docspell/store/queries/QPeriodicTask.scala @@ -0,0 +1,57 @@ +package docspell.store.queries + +//import cats.implicits._ +import docspell.common._ +//import docspell.common.syntax.all._ +import docspell.store.impl.Implicits._ +import docspell.store.records._ +import doobie._ +import doobie.implicits._ +//import org.log4s._ + +object QPeriodicTask { +// private[this] val logger = getLogger + + def clearWorkers(name: Ident): ConnectionIO[Int] = { + val worker = RPeriodicTask.Columns.worker + updateRow(RPeriodicTask.table, worker.is(name), worker.setTo[Ident](None)).update.run + } + + def setWorker(pid: Ident, name: Ident): ConnectionIO[Int] = { + val id = RPeriodicTask.Columns.id + val worker = RPeriodicTask.Columns.worker + updateRow(RPeriodicTask.table, and(id.is(pid), worker.isNull), worker.setTo(name)).update.run + } + + def unsetWorker( + pid: Ident, + nextRun: Option[Timestamp] + ): ConnectionIO[Int] = { + val id = RPeriodicTask.Columns.id + val worker = RPeriodicTask.Columns.worker + val next = RPeriodicTask.Columns.nextrun + updateRow( + RPeriodicTask.table, + id.is(pid), + commas(worker.setTo[Ident](None), next.setTo(nextRun)) + ).update.run + } + + def findNext: ConnectionIO[Option[RPeriodicTask]] = { + val order = orderBy(RPeriodicTask.Columns.nextrun.f) ++ fr"ASC" + val sql = + selectSimple(RPeriodicTask.Columns.all, RPeriodicTask.table, Fragment.empty) ++ order + sql.query[RPeriodicTask].streamWithChunkSize(2).take(1).compile.last + } + + def findNonFinal(pid: Ident): ConnectionIO[Option[RJob]] = + selectSimple( + RJob.Columns.all, + RJob.table, + and( + RJob.Columns.tracker.is(pid), + RJob.Columns.state.isOneOf(JobState.all.diff(JobState.done).toSeq) + ) + ).query[RJob].option + +} diff --git a/modules/store/src/main/scala/docspell/store/queue/PeriodicTaskStore.scala b/modules/store/src/main/scala/docspell/store/queue/PeriodicTaskStore.scala index ac9bce6b..477cce03 100644 --- a/modules/store/src/main/scala/docspell/store/queue/PeriodicTaskStore.scala +++ b/modules/store/src/main/scala/docspell/store/queue/PeriodicTaskStore.scala @@ -3,9 +3,13 @@ package docspell.store.queue import cats.effect._ import cats.implicits._ import fs2.Stream +import org.log4s.getLogger +import com.github.eikek.fs2calev._ import docspell.common._ -import docspell.store.Store +import docspell.common.syntax.all._ +import docspell.store.{AddResult, Store} import docspell.store.records._ +import docspell.store.queries.QPeriodicTask trait PeriodicTaskStore[F[_]] { @@ -22,10 +26,17 @@ trait PeriodicTaskStore[F[_]] { def findNonFinalJob(pjobId: Ident): F[Option[RJob]] + /** Insert a task or update if it already exists. + */ def insert(task: RPeriodicTask): F[Unit] + + /** Adds the task only if it not already exists. + */ + def add(task: RPeriodicTask): F[AddResult] } object PeriodicTaskStore { + private[this] val logger = getLogger def create[F[_]: Sync](store: Store[F]): Resource[F, PeriodicTaskStore[F]] = Resource.pure[F, PeriodicTaskStore[F]](new PeriodicTaskStore[F] { @@ -57,21 +68,53 @@ object PeriodicTaskStore { } def getNext: F[Option[RPeriodicTask]] = - Sync[F].raiseError(new Exception("not implemented")) + store.transact(QPeriodicTask.findNext) def mark(pid: Ident, name: Ident): F[Boolean] = - Sync[F].raiseError(new Exception(s"not implemented $pid $name")) + store.transact(QPeriodicTask.setWorker(pid, name)).map(_ > 0) def unmark(job: RPeriodicTask): F[Unit] = - Sync[F].raiseError(new Exception(s"not implemented $job")) + for { + now <- Timestamp.current[F] + nextRun <- CalevFs2 + .nextElapses[F](now.atUTC)(job.timer) + .take(1) + .compile + .last + .map(_.map(Timestamp.from)) + _ <- store.transact(QPeriodicTask.unsetWorker(job.id, nextRun)) + } yield () def clearMarks(name: Ident): F[Unit] = - Sync[F].raiseError(new Exception("not implemented")) + store + .transact(QPeriodicTask.clearWorkers(name)) + .flatMap { n => + if (n > 0) logger.finfo(s"Clearing $n periodic tasks from worker ${name.id}") + else ().pure[F] + } def findNonFinalJob(pjobId: Ident): F[Option[RJob]] = - Sync[F].raiseError(new Exception("not implemented")) + store.transact(QPeriodicTask.findNonFinal(pjobId)) - def insert(task: RPeriodicTask): F[Unit] = - Sync[F].raiseError(new Exception("not implemented")) + def insert(task: RPeriodicTask): F[Unit] = { + val update = store.transact(RPeriodicTask.update(task)) + val insertAttempt = store.transact(RPeriodicTask.insert(task)) + .attempt.map { + case Right(n) => n > 0 + case Left(_) => false + } + + for { + n1 <- update + ins <- if (n1 == 0) insertAttempt else true.pure[F] + _ <- if (ins) 1.pure[F] else update + } yield () + } + + def add(task: RPeriodicTask): F[AddResult] = { + val insert = RPeriodicTask.insert(task) + val exists = RPeriodicTask.exists(task.id) + store.add(insert, exists) + } }) } diff --git a/modules/store/src/main/scala/docspell/store/records/RPeriodicTask.scala b/modules/store/src/main/scala/docspell/store/records/RPeriodicTask.scala index 92b868d4..57d37e20 100644 --- a/modules/store/src/main/scala/docspell/store/records/RPeriodicTask.scala +++ b/modules/store/src/main/scala/docspell/store/records/RPeriodicTask.scala @@ -133,9 +133,33 @@ object RPeriodicTask { val sql = insertRow( table, all, - fr"${v.id},${v.enabled},${v.task},${v.group},${v.args},${v.subject},${v.submitter},${v.priority},${v.worker},${v.marked},${v.timer},${v.nextrun},${v.created}" + fr"${v.id},${v.enabled},${v.task},${v.group},${v.args}," ++ + fr"${v.subject},${v.submitter},${v.priority},${v.worker}," ++ + fr"${v.marked},${v.timer},${v.nextrun},${v.created}" ) sql.update.run } + def update(v: RPeriodicTask): ConnectionIO[Int] = { + val sql = updateRow( + table, + id.is(v.id), + commas( + enabled.setTo(v.enabled), + group.setTo(v.group), + args.setTo(v.args), + subject.setTo(v.subject), + submitter.setTo(v.submitter), + priority.setTo(v.priority), + worker.setTo(v.worker), + marked.setTo(v.marked), + timer.setTo(v.timer), + nextrun.setTo(v.nextrun) + ) + ) + sql.update.run + } + + def exists(pid: Ident): ConnectionIO[Boolean] = + selectCount(id, table, id.is(pid)).query[Int].unique.map(_ > 0) }