diff --git a/build.sbt b/build.sbt index 37aee634..68b11ccc 100644 --- a/build.sbt +++ b/build.sbt @@ -141,6 +141,7 @@ val common = project.in(file("modules/common")). Dependencies.fs2 ++ Dependencies.circe ++ Dependencies.loggingApi ++ + Dependencies.calevCore ++ Dependencies.pureconfig.map(_ % "optional") ) diff --git a/modules/common/src/main/scala/docspell/common/Timestamp.scala b/modules/common/src/main/scala/docspell/common/Timestamp.scala index bd496efb..d5298c56 100644 --- a/modules/common/src/main/scala/docspell/common/Timestamp.scala +++ b/modules/common/src/main/scala/docspell/common/Timestamp.scala @@ -16,6 +16,9 @@ case class Timestamp(value: Instant) { def minus(d: Duration): Timestamp = Timestamp(value.minusNanos(d.nanos)) + def - (d: Duration): Timestamp = + minus(d) + def minusHours(n: Long): Timestamp = Timestamp(value.minusSeconds(n * 60 * 60)) @@ -31,6 +34,9 @@ case class Timestamp(value: Instant) { def atUTC: ZonedDateTime = atZone(Timestamp.UTC) def asString: String = value.toString + + def < (other: Timestamp): Boolean = + this.value.isBefore(other.value) } object Timestamp { diff --git a/modules/common/src/main/scala/docspell/common/pureconfig/Implicits.scala b/modules/common/src/main/scala/docspell/common/pureconfig/Implicits.scala index 7fc880ab..ceee5629 100644 --- a/modules/common/src/main/scala/docspell/common/pureconfig/Implicits.scala +++ b/modules/common/src/main/scala/docspell/common/pureconfig/Implicits.scala @@ -4,6 +4,7 @@ import docspell.common._ import _root_.pureconfig._ import _root_.pureconfig.error.{CannotConvert, FailureReason} import scodec.bits.ByteVector +import com.github.eikek.calev.CalEvent import scala.reflect.ClassTag @@ -31,6 +32,10 @@ object Implicits { else ByteVector.encodeUtf8(str).left.map(ex => s"Invalid utf8 string: ${ex.getMessage}") }) + implicit val caleventReader: ConfigReader[CalEvent] = + ConfigReader[String].emap(reason(CalEvent.parse)) + + def reason[A: ClassTag](f: String => Either[String, A]): String => Either[FailureReason, A] = in => f(in).left.map(str => CannotConvert(in, implicitly[ClassTag[A]].runtimeClass.toString, str)) diff --git a/modules/joex/src/main/resources/reference.conf b/modules/joex/src/main/resources/reference.conf index ba056a83..461f6fce 100644 --- a/modules/joex/src/main/resources/reference.conf +++ b/modules/joex/src/main/resources/reference.conf @@ -77,6 +77,21 @@ docspell.joex { wakeup-period = "10 minutes" } + # Docspell uses periodic house keeping tasks, like cleaning expired + # invites, that can be configured here. + house-keeping { + + # When the house keeping task executes. Default is every 4 hours. + schedule = "*-*-* 0/4:00:00" + + # This task removes invitation keys that have been created but not + # used. The timespan here must be greater than the `invite-time' + # setting in the rest server config file. + cleanup-invites = { + older-than = "30 days" + } + } + # Configuration of text extraction extraction { # For PDF files it is first tried to read the text parts of the diff --git a/modules/joex/src/main/scala/docspell/joex/Config.scala b/modules/joex/src/main/scala/docspell/joex/Config.scala index a3f18d60..1d678766 100644 --- a/modules/joex/src/main/scala/docspell/joex/Config.scala +++ b/modules/joex/src/main/scala/docspell/joex/Config.scala @@ -5,6 +5,7 @@ import docspell.joex.scheduler.{PeriodicSchedulerConfig, SchedulerConfig} import docspell.store.JdbcConfig import docspell.convert.ConvertConfig import docspell.extract.ExtractConfig +import docspell.joex.hk.HouseKeepingConfig case class Config( appId: Ident, @@ -13,6 +14,7 @@ case class Config( jdbc: JdbcConfig, scheduler: SchedulerConfig, periodicScheduler: PeriodicSchedulerConfig, + houseKeeping: HouseKeepingConfig, extraction: ExtractConfig, convert: ConvertConfig ) diff --git a/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala b/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala index 45045137..c16a1a33 100644 --- a/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala +++ b/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala @@ -3,9 +3,10 @@ package docspell.joex import cats.implicits._ import cats.effect._ import docspell.common.{Ident, NodeType, ProcessItemArgs} -import docspell.joex.background._ +import docspell.joex.hk._ import docspell.joex.process.ItemHandler import docspell.joex.scheduler._ +import docspell.joexapi.client.JoexClient import docspell.store.Store import docspell.store.queue._ import docspell.store.ops.ONode @@ -18,6 +19,7 @@ final class JoexAppImpl[F[_]: ConcurrentEffect: ContextShift: Timer]( cfg: Config, nodeOps: ONode[F], store: Store[F], + pstore: PeriodicTaskStore[F], termSignal: SignallingRef[F, Boolean], val scheduler: Scheduler[F], val periodicScheduler: PeriodicScheduler[F] @@ -27,6 +29,7 @@ final class JoexAppImpl[F[_]: ConcurrentEffect: ContextShift: Timer]( val run = scheduler.start.compile.drain val prun = periodicScheduler.start.compile.drain for { + _ <- HouseKeepingTask.submit(pstore, cfg.houseKeeping.schedule) _ <- ConcurrentEffect[F].start(run) _ <- ConcurrentEffect[F].start(prun) _ <- scheduler.periodicAwake @@ -52,14 +55,15 @@ object JoexAppImpl { cfg: Config, termSignal: SignallingRef[F, Boolean], connectEC: ExecutionContext, + clientEC: ExecutionContext, blocker: Blocker ): Resource[F, JoexApp[F]] = for { + client <- JoexClient.resource(clientEC) store <- Store.create(cfg.jdbc, connectEC, blocker) queue <- JobQueue(store) pstore <- PeriodicTaskStore.create(store) nodeOps <- ONode(store) - psch <- PeriodicScheduler.create(cfg.periodicScheduler, queue, pstore, Timer[F]) sch <- SchedulerBuilder(cfg.scheduler, blocker, store) .withQueue(queue) .withTask( @@ -71,13 +75,14 @@ object JoexAppImpl { ) .withTask( JobTask.json( - PeriodicTask.taskName, - PeriodicTask[F](cfg), - PeriodicTask.onCancel[F] + HouseKeepingTask.taskName, + HouseKeepingTask[F](cfg), + HouseKeepingTask.onCancel[F] ) ) .resource - app = new JoexAppImpl(cfg, nodeOps, store, termSignal, sch, psch) + psch <- PeriodicScheduler.create(cfg.periodicScheduler, sch, queue, pstore, client, Timer[F]) + app = new JoexAppImpl(cfg, nodeOps, store, pstore, termSignal, sch, psch) appR <- Resource.make(app.init.map(_ => app))(_.shutdown) } yield appR } diff --git a/modules/joex/src/main/scala/docspell/joex/JoexServer.scala b/modules/joex/src/main/scala/docspell/joex/JoexServer.scala index 00919923..c5966549 100644 --- a/modules/joex/src/main/scala/docspell/joex/JoexServer.scala +++ b/modules/joex/src/main/scala/docspell/joex/JoexServer.scala @@ -24,13 +24,14 @@ object JoexServer { def stream[F[_]: ConcurrentEffect: ContextShift]( cfg: Config, connectEC: ExecutionContext, + clientEC: ExecutionContext, blocker: Blocker )(implicit T: Timer[F]): Stream[F, Nothing] = { val app = for { signal <- Resource.liftF(SignallingRef[F, Boolean](false)) exitCode <- Resource.liftF(Ref[F].of(ExitCode.Success)) - joexApp <- JoexAppImpl.create[F](cfg, signal, connectEC, blocker) + joexApp <- JoexAppImpl.create[F](cfg, signal, connectEC, clientEC, blocker) httpApp = Router( "/api/info" -> InfoRoutes(), diff --git a/modules/joex/src/main/scala/docspell/joex/Main.scala b/modules/joex/src/main/scala/docspell/joex/Main.scala index a800acdb..4c1f7319 100644 --- a/modules/joex/src/main/scala/docspell/joex/Main.scala +++ b/modules/joex/src/main/scala/docspell/joex/Main.scala @@ -13,10 +13,10 @@ import org.log4s._ object Main extends IOApp { private[this] val logger = getLogger - val blockingEc: ExecutionContext = ExecutionContext.fromExecutor( + val blockingEC: ExecutionContext = ExecutionContext.fromExecutor( Executors.newCachedThreadPool(ThreadFactories.ofName("docspell-joex-blocking")) ) - val blocker = Blocker.liftExecutionContext(blockingEc) + val blocker = Blocker.liftExecutionContext(blockingEC) val connectEC: ExecutionContext = ExecutionContext.fromExecutorService( Executors.newFixedThreadPool(5, ThreadFactories.ofName("docspell-joex-dbconnect")) ) @@ -52,6 +52,6 @@ object Main extends IOApp { cfg.baseUrl ) logger.info(s"\n${banner.render("***>")}") - JoexServer.stream[IO](cfg, connectEC, blocker).compile.drain.as(ExitCode.Success) + JoexServer.stream[IO](cfg, connectEC, blockingEC, blocker).compile.drain.as(ExitCode.Success) } } diff --git a/modules/joex/src/main/scala/docspell/joex/hk/CleanupInvitesTask.scala b/modules/joex/src/main/scala/docspell/joex/hk/CleanupInvitesTask.scala new file mode 100644 index 00000000..592a80b8 --- /dev/null +++ b/modules/joex/src/main/scala/docspell/joex/hk/CleanupInvitesTask.scala @@ -0,0 +1,24 @@ +package docspell.joex.hk + +import cats.implicits._ +import cats.effect._ + +import docspell.common._ +import docspell.joex.Config +import docspell.joex.scheduler.Task +import docspell.store.records._ + +object CleanupInvitesTask { + + def apply[F[_]: Sync: ContextShift](cfg: Config): Task[F, Unit, Unit] = + Task { ctx => + val threshold = cfg.houseKeeping.cleanupInvites.olderThan + for { + now <- Timestamp.current[F] + ts = now - threshold + _ <- ctx.logger.info(s"Cleanup invitations older than $ts") + n <- ctx.store.transact(RInvitation.deleteOlderThan(ts)) + _ <- ctx.logger.info(s"Removed $n invitations") + } yield () + } +} diff --git a/modules/joex/src/main/scala/docspell/joex/hk/HouseKeepingConfig.scala b/modules/joex/src/main/scala/docspell/joex/hk/HouseKeepingConfig.scala new file mode 100644 index 00000000..bf826db2 --- /dev/null +++ b/modules/joex/src/main/scala/docspell/joex/hk/HouseKeepingConfig.scala @@ -0,0 +1,17 @@ +package docspell.joex.hk + +import com.github.eikek.calev.CalEvent +import docspell.common._ + +import HouseKeepingConfig._ + +case class HouseKeepingConfig( + schedule: CalEvent, + cleanupInvites: CleanupInvites +) + +object HouseKeepingConfig { + + case class CleanupInvites(olderThan: Duration) + +} diff --git a/modules/joex/src/main/scala/docspell/joex/hk/HouseKeepingTask.scala b/modules/joex/src/main/scala/docspell/joex/hk/HouseKeepingTask.scala new file mode 100644 index 00000000..07339baf --- /dev/null +++ b/modules/joex/src/main/scala/docspell/joex/hk/HouseKeepingTask.scala @@ -0,0 +1,50 @@ +package docspell.joex.hk + +import cats.implicits._ +import cats.effect._ +import com.github.eikek.calev._ + +import docspell.common._ +import docspell.joex.Config +import docspell.joex.scheduler.Task +import docspell.store.queue._ +import docspell.store.records._ + +object HouseKeepingTask { + private val periodicId = Ident.unsafe("docspell-houskeeping") + val systemGroup: Ident = Ident.unsafe("docspell-system") + + val taskName: Ident = Ident.unsafe("housekeeping") + + def apply[F[_]: Sync: ContextShift](cfg: Config): Task[F, Unit, Unit] = + log[F](_.info(s"Running house-keeping task now")) + .flatMap(_ => CleanupInvitesTask(cfg)) + + def onCancel[F[_]: Sync: ContextShift]: Task[F, Unit, Unit] = + Task(_.logger.warn("Cancelling background task")) + + def submit[F[_]: Sync]( + pstore: PeriodicTaskStore[F], + ce: CalEvent + ): F[Unit] = { + val makeJob = + RPeriodicTask.createJson( + true, + taskName, + systemGroup, + (), + "Docspell house-keeping", + systemGroup, + Priority.Low, + ce + ) + + for { + job <- makeJob + _ <- pstore.insert(job.copy(id = periodicId)).attempt + } yield () + } + + private def log[F[_]](f: Logger[F] => F[Unit]): Task[F, Unit, Unit] = + Task(ctx => f(ctx.logger)) +} diff --git a/modules/joex/src/main/scala/docspell/joex/scheduler/PeriodicScheduler.scala b/modules/joex/src/main/scala/docspell/joex/scheduler/PeriodicScheduler.scala index 6789adfa..62d0c345 100644 --- a/modules/joex/src/main/scala/docspell/joex/scheduler/PeriodicScheduler.scala +++ b/modules/joex/src/main/scala/docspell/joex/scheduler/PeriodicScheduler.scala @@ -4,6 +4,7 @@ import fs2._ import fs2.concurrent.SignallingRef import cats.effect._ +import docspell.joexapi.client.JoexClient import docspell.store.queue._ /** A periodic scheduler takes care to submit periodic tasks to the @@ -32,14 +33,16 @@ object PeriodicScheduler { def create[F[_]: ConcurrentEffect: ContextShift]( cfg: PeriodicSchedulerConfig, + sch: Scheduler[F], queue: JobQueue[F], store: PeriodicTaskStore[F], + client: JoexClient[F], timer: Timer[F] ): Resource[F, PeriodicScheduler[F]] = for { waiter <- Resource.liftF(SignallingRef(true)) state <- Resource.liftF(SignallingRef(PeriodicSchedulerImpl.emptyState[F])) - psch = new PeriodicSchedulerImpl[F](cfg, queue, store, waiter, state, timer) + psch = new PeriodicSchedulerImpl[F](cfg, sch, queue, store, client, waiter, state, timer) _ <- Resource.liftF(psch.init) } yield psch diff --git a/modules/joex/src/main/scala/docspell/joex/scheduler/PeriodicSchedulerImpl.scala b/modules/joex/src/main/scala/docspell/joex/scheduler/PeriodicSchedulerImpl.scala index fe0c5c39..f8de3c8c 100644 --- a/modules/joex/src/main/scala/docspell/joex/scheduler/PeriodicSchedulerImpl.scala +++ b/modules/joex/src/main/scala/docspell/joex/scheduler/PeriodicSchedulerImpl.scala @@ -9,45 +9,18 @@ import com.github.eikek.fs2calev._ import docspell.common._ import docspell.common.syntax.all._ +import docspell.joexapi.client.JoexClient import docspell.store.queue._ import docspell.store.records.RPeriodicTask import PeriodicSchedulerImpl.State -/* -onStartUp: -- remove worker value from all of the current - -Loop: -- get earliest pjob -- if none: stop -- if triggered: - - mark worker, restart loop on fail - - submit new job - - check for non-final jobs of that name - - if exist: log info - - if not exist: submit - - update next trigger (in both cases) - - remove worker - - restart loop -- if future - - schedule notify - - stop loop - - -onNotify: -- cancel current scheduled notify -- start Loop - - -onShutdown: -- nothing to do - */ - final class PeriodicSchedulerImpl[F[_]: ConcurrentEffect: ContextShift]( val config: PeriodicSchedulerConfig, + sch: Scheduler[F], queue: JobQueue[F], store: PeriodicTaskStore[F], + client: JoexClient[F], waiter: SignallingRef[F, Boolean], state: SignallingRef[F, State[F]], timer: Timer[F] @@ -91,15 +64,18 @@ final class PeriodicSchedulerImpl[F[_]: ConcurrentEffect: ContextShift]( _ <- logger.fdebug(s"Looking for next periodic task") go <- logThrow("Error getting next task")( store - .takeNext(config.name) + .takeNext(config.name, None) .use({ - case Some(pj) => + case Marked.Found(pj) => logger .fdebug(s"Found periodic task '${pj.subject}/${pj.timer.asString}'") *> (if (isTriggered(pj, now)) submitJob(pj) - else scheduleNotify(pj)).map(_ => true) - case None => + else scheduleNotify(pj).map(_ => false)) + case Marked.NotFound => logger.fdebug("No periodic task found") *> false.pure[F] + case Marked.NotMarkable => + logger.fdebug("Periodic job cannot be marked. Trying again.") *> true + .pure[F] }) ) } yield go @@ -123,32 +99,44 @@ final class PeriodicSchedulerImpl[F[_]: ConcurrentEffect: ContextShift]( } def isTriggered(pj: RPeriodicTask, now: Timestamp): Boolean = - pj.timer.contains(now.value) + pj.nextrun < now - def submitJob(pj: RPeriodicTask): F[Unit] = + def submitJob(pj: RPeriodicTask): F[Boolean] = store .findNonFinalJob(pj.id) .flatMap({ case Some(job) => logger.finfo[F]( s"There is already a job with non-final state '${job.state}' in the queue" - ) + ) *> scheduleNotify(pj) *> false.pure[F] case None => logger.finfo[F](s"Submitting job for periodic task '${pj.task.id}'") *> - pj.toJob.flatMap(queue.insert) + pj.toJob.flatMap(queue.insert) *> notifyJoex *> true.pure[F] }) + def notifyJoex: F[Unit] = + sch.notifyChange *> store.findJoexNodes.flatMap( + _.traverse(n => client.notifyJoexIgnoreErrors(n.url)).map(_ => ()) + ) + def scheduleNotify(pj: RPeriodicTask): F[Unit] = - ConcurrentEffect[F] - .start( - CalevFs2 - .sleep[F](pj.timer) - .evalMap(_ => notifyChange) - .compile - .drain - ) - .flatMap(fb => state.modify(_.setNotify(fb))) + Timestamp + .current[F] + .flatMap(now => + logger.fdebug( + s"Scheduling next notify for timer ${pj.timer.asString} -> ${pj.timer.nextElapse(now.toUtcDateTime)}" + ) + ) *> + ConcurrentEffect[F] + .start( + CalevFs2 + .sleep[F](pj.timer) + .evalMap(_ => notifyChange) + .compile + .drain + ) + .flatMap(fb => state.modify(_.setNotify(fb))) def cancelNotify: F[Unit] = state diff --git a/modules/joexapi/src/main/scala/docspell/joexapi/client/JoexClient.scala b/modules/joexapi/src/main/scala/docspell/joexapi/client/JoexClient.scala index 964d4691..02a4c2cf 100644 --- a/modules/joexapi/src/main/scala/docspell/joexapi/client/JoexClient.scala +++ b/modules/joexapi/src/main/scala/docspell/joexapi/client/JoexClient.scala @@ -3,6 +3,7 @@ package docspell.joexapi.client import cats.implicits._ import cats.effect._ import docspell.common.{Ident, LenientUri} +import docspell.common.syntax.all._ import org.http4s.{Method, Request, Uri} import org.http4s.client.Client import org.http4s.client.blaze.BlazeClientBuilder @@ -28,21 +29,24 @@ object JoexClient { new JoexClient[F] { def notifyJoex(base: LenientUri): F[Unit] = { val notifyUrl = base / "api" / "v1" / "notify" - val req = Request[F](Method.POST, uri(notifyUrl)) - client.expect[String](req).map(_ => ()) + val req = Request[F](Method.POST, uri(notifyUrl)) + logger.fdebug(s"Notify joex at ${notifyUrl.asString}") *> + client.expect[String](req).map(_ => ()) } def notifyJoexIgnoreErrors(base: LenientUri): F[Unit] = notifyJoex(base).attempt.map { case Right(()) => () case Left(ex) => - logger.warn(s"Notifying Joex instance '${base.asString}' failed: ${ex.getMessage}") + logger.warn( + s"Notifying Joex instance '${base.asString}' failed: ${ex.getMessage}" + ) () } def cancelJob(base: LenientUri, job: Ident): F[Unit] = { val cancelUrl = base / "api" / "v1" / "job" / job.id / "cancel" - val req = Request[F](Method.POST, uri(cancelUrl)) + val req = Request[F](Method.POST, uri(cancelUrl)) client.expect[String](req).map(_ => ()) } @@ -52,4 +56,4 @@ object JoexClient { def resource[F[_]: ConcurrentEffect](ec: ExecutionContext): Resource[F, JoexClient[F]] = BlazeClientBuilder[F](ec).resource.map(apply[F]) -} \ No newline at end of file +} diff --git a/modules/microsite/docs/dev/adr.md b/modules/microsite/docs/dev/adr.md index 285571da..20c149b0 100644 --- a/modules/microsite/docs/dev/adr.md +++ b/modules/microsite/docs/dev/adr.md @@ -16,3 +16,4 @@ title: ADRs - [0009 Convert Office Files](adr/0009_convert_office_docs) - [0010 Convert Image Files](adr/0010_convert_image_files) - [0011 Extract Text](adr/0011_extract_text) +- [0012 Periodic Tasks](adr/0012_periodic_tasks) diff --git a/modules/microsite/docs/dev/adr/0012_periodic_tasks.md b/modules/microsite/docs/dev/adr/0012_periodic_tasks.md new file mode 100644 index 00000000..5edd559c --- /dev/null +++ b/modules/microsite/docs/dev/adr/0012_periodic_tasks.md @@ -0,0 +1,102 @@ +--- +layout: docs +title: Periodic Tasks +--- + +# Periodic Tasks + +## Context and Problem Statement + +Currently there is a `Scheduler` that consumes tasks off a queue in +the database. This allows multiple job executors running in parallel +racing for the next job to execute. This is for executing tasks +immediately – as long as there are enough resource. + +What is missing, is a component that maintains periodic tasks. The +reason for this is to have house keeping tasks that run regularily and +clean up stale or unused data. Later, users should be able to create +periodic tasks, for example to read e-mails from an inbox. + +The problem is again, that it must work with multiple job executor +instances running at the same time. This is the same pattern as with +the `Scheduler`: it must be ensured that only one task is used at a +time. Multiple job exectuors must not schedule a perdiodic task more +than once. If a periodic tasks takes longer than the time between +runs, it must wait for the next interval. + + +## Considered Options + +1. Adding a `timer` and `nextrun` field to the current `job` table +2. Creating a separate table for periodic tasks + +## Decision Outcome + +The 2. option. + +For internal housekeeping tasks, it may suffice to reuse the existing +`job` queue by adding more fields such that a job may be considered +periodic. But this conflates with what the `Scheduler` is doing now +(executing tasks as soon as possible while being bound to some +resources) with a completely different subject. + +There will be a new `PeriodicScheduler` that works on a new table in +the database that is representing periodic tasks. This table will +share fields with the `job` table to be able to create `RJob` +instances. This new component is only taking care of periodically +submitting jobs to the job queue such that the `Scheduler` will +eventually pick it up and run it. + +```sql +CREATE TABLE "periodic_task" ( + "id" varchar(254) not null primary key, + "enabled" boolean not null, + "task" varchar(254) not null, + "group_" varchar(254) not null, + "args" text not null, + "subject" varchar(254) not null, + "submitter" varchar(254) not null, + "priority" int not null, + "worker" varchar(254), + "marked" timestamp, + "timer" varchar(254) not null, + "nextrun" timestamp not null, + "created" timestamp not null +); +``` + +Preparing for other features, periodic tasks will be created by users. +It should be possible to disable/enable them. The next 6 properties +are needed to insert jobs into the `job` table. The `worker` field +(and `marked`) are used to mark a periodic job as "being worked on by +a job executor". + +The `timer` is the schedule, which is a +[systemd-like](https://man.cx/systemd.time#heading7) calendar event +string. This is parsed by [this +library](https://github.com/eikek/calev). The `nextrun` field will +store the timestamp of the next time the task would need to be +executed. This is needed to query this table for the newest task. + +The `PeriodicScheduler` works roughly like this: + +On startup: +- Remove stale worker values. If the process has been killed, there + may be marked tasks which must be cleared now. + +Main-Loop: +0. Cancel current scheduled notify (see 4. below) +1. get next (= earliest & enabled) periodic job +2. if none: stop +3. if triggered (= `nextrun <= 'now'`): + - Mark periodic task. On fail: goto 1. + - Submit new job into the jobqueue: + - Update `nextrun` field + - Check for non-final jobs of that name. This is required to not + run the same periodic task multiple times concurrently. + - if exist: goto 4. + - if not exist: submit job + - Unmark periodic task +4. if future + - schedule notify: notify self to run again next time the task + schedule triggers diff --git a/modules/microsite/docs/dev/adr/template.md b/modules/microsite/docs/dev/adr/template.md index 25696bbe..d531e39e 100644 --- a/modules/microsite/docs/dev/adr/template.md +++ b/modules/microsite/docs/dev/adr/template.md @@ -1,3 +1,8 @@ +--- +layout: docs +title: Short Title +--- + # [short title of solved problem and solution] * Status: [proposed | rejected | accepted | deprecated | … | superseded by [ADR-0005](0005-example.md)] diff --git a/modules/store/src/main/scala/docspell/store/impl/Column.scala b/modules/store/src/main/scala/docspell/store/impl/Column.scala index 9d697bf4..a42a09a3 100644 --- a/modules/store/src/main/scala/docspell/store/impl/Column.scala +++ b/modules/store/src/main/scala/docspell/store/impl/Column.scala @@ -30,6 +30,9 @@ case class Column(name: String, ns: String = "", alias: String = "") { def is(c: Column): Fragment = f ++ fr"=" ++ c.f + def isNot[A: Put](value: A): Fragment = + f ++ fr"<> $value" + def isNull: Fragment = f ++ fr"is null" diff --git a/modules/store/src/main/scala/docspell/store/queries/QPeriodicTask.scala b/modules/store/src/main/scala/docspell/store/queries/QPeriodicTask.scala index 248afcfe..e52c27cc 100644 --- a/modules/store/src/main/scala/docspell/store/queries/QPeriodicTask.scala +++ b/modules/store/src/main/scala/docspell/store/queries/QPeriodicTask.scala @@ -1,26 +1,27 @@ package docspell.store.queries -//import cats.implicits._ import docspell.common._ -//import docspell.common.syntax.all._ import docspell.store.impl.Implicits._ import docspell.store.records._ import doobie._ import doobie.implicits._ -//import org.log4s._ object QPeriodicTask { -// private[this] val logger = getLogger def clearWorkers(name: Ident): ConnectionIO[Int] = { val worker = RPeriodicTask.Columns.worker updateRow(RPeriodicTask.table, worker.is(name), worker.setTo[Ident](None)).update.run } - def setWorker(pid: Ident, name: Ident): ConnectionIO[Int] = { + def setWorker(pid: Ident, name: Ident, ts: Timestamp): ConnectionIO[Int] = { val id = RPeriodicTask.Columns.id val worker = RPeriodicTask.Columns.worker - updateRow(RPeriodicTask.table, and(id.is(pid), worker.isNull), worker.setTo(name)).update.run + val marked = RPeriodicTask.Columns.marked + updateRow( + RPeriodicTask.table, + and(id.is(pid), worker.isNull), + commas(worker.setTo(name), marked.setTo(ts)) + ).update.run } def unsetWorker( @@ -37,10 +38,17 @@ object QPeriodicTask { ).update.run } - def findNext: ConnectionIO[Option[RPeriodicTask]] = { - val order = orderBy(RPeriodicTask.Columns.nextrun.f) ++ fr"ASC" + def findNext(excl: Option[Ident]): ConnectionIO[Option[RPeriodicTask]] = { + val enabled = RPeriodicTask.Columns.enabled + val pid = RPeriodicTask.Columns.id + val order = orderBy(RPeriodicTask.Columns.nextrun.f) ++ fr"ASC" + + val where = excl match { + case Some(id) => and(pid.isNot(id), enabled.is(true)) + case None => enabled.is(true) + } val sql = - selectSimple(RPeriodicTask.Columns.all, RPeriodicTask.table, Fragment.empty) ++ order + selectSimple(RPeriodicTask.Columns.all, RPeriodicTask.table, where) ++ order sql.query[RPeriodicTask].streamWithChunkSize(2).take(1).compile.last } diff --git a/modules/store/src/main/scala/docspell/store/queue/JobQueue.scala b/modules/store/src/main/scala/docspell/store/queue/JobQueue.scala index f0a9d722..1eaf2cb7 100644 --- a/modules/store/src/main/scala/docspell/store/queue/JobQueue.scala +++ b/modules/store/src/main/scala/docspell/store/queue/JobQueue.scala @@ -29,7 +29,7 @@ object JobQueue { worker: Ident, retryPause: Duration ): F[Option[RJob]] = - logger.fdebug("Select next job") *> QJob.takeNextJob(store)(prio, worker, retryPause) + logger.ftrace("Select next job") *> QJob.takeNextJob(store)(prio, worker, retryPause) def insert(job: RJob): F[Unit] = store diff --git a/modules/store/src/main/scala/docspell/store/queue/Marked.scala b/modules/store/src/main/scala/docspell/store/queue/Marked.scala new file mode 100644 index 00000000..2b634198 --- /dev/null +++ b/modules/store/src/main/scala/docspell/store/queue/Marked.scala @@ -0,0 +1,17 @@ +package docspell.store.queue + +sealed trait Marked[+A] {} + +object Marked { + + final case class Found[A](value: A) extends Marked[A] + + final case object NotFound extends Marked[Nothing] + + final case object NotMarkable extends Marked[Nothing] + + + def found[A](v: A): Marked[A] = Found(v) + def notFound[A]: Marked[A] = NotFound + def notMarkable[A]: Marked[A] = NotMarkable +} diff --git a/modules/store/src/main/scala/docspell/store/queue/PeriodicTaskStore.scala b/modules/store/src/main/scala/docspell/store/queue/PeriodicTaskStore.scala index 477cce03..787ec6cf 100644 --- a/modules/store/src/main/scala/docspell/store/queue/PeriodicTaskStore.scala +++ b/modules/store/src/main/scala/docspell/store/queue/PeriodicTaskStore.scala @@ -2,7 +2,6 @@ package docspell.store.queue import cats.effect._ import cats.implicits._ -import fs2.Stream import org.log4s.getLogger import com.github.eikek.fs2calev._ import docspell.common._ @@ -20,7 +19,10 @@ trait PeriodicTaskStore[F[_]] { * care of unmarking the task after use and updating `nextRun` with * the next timestamp. */ - def takeNext(worker: Ident): Resource[F, Option[RPeriodicTask]] + def takeNext( + worker: Ident, + excludeId: Option[Ident] + ): Resource[F, Marked[RPeriodicTask]] def clearMarks(name: Ident): F[Unit] @@ -33,6 +35,8 @@ trait PeriodicTaskStore[F[_]] { /** Adds the task only if it not already exists. */ def add(task: RPeriodicTask): F[AddResult] + + def findJoexNodes: F[Vector[RNode]] } object PeriodicTaskStore { @@ -40,38 +44,37 @@ object PeriodicTaskStore { def create[F[_]: Sync](store: Store[F]): Resource[F, PeriodicTaskStore[F]] = Resource.pure[F, PeriodicTaskStore[F]](new PeriodicTaskStore[F] { - println(s"$store") - def takeNext(worker: Ident): Resource[F, Option[RPeriodicTask]] = { - val chooseNext: F[Either[String, Option[RPeriodicTask]]] = - getNext.flatMap { + def takeNext( + worker: Ident, + excludeId: Option[Ident] + ): Resource[F, Marked[RPeriodicTask]] = { + val chooseNext: F[Marked[RPeriodicTask]] = + getNext(excludeId).flatMap { case Some(pj) => mark(pj.id, worker).map { - case true => Right(Some(pj.copy(worker = worker.some))) - case false => Left("Cannot mark periodic task") + case true => Marked.found(pj.copy(worker = worker.some)) + case false => Marked.notMarkable } case None => - val result: Either[String, Option[RPeriodicTask]] = - Right(None) - result.pure[F] + Marked.notFound.pure[F] } - val get = - Stream.eval(chooseNext).repeat.take(10).find(_.isRight).compile.lastOrError - val r = Resource.make(get)({ - case Right(Some(pj)) => unmark(pj) - case _ => ().pure[F] + + Resource.make(chooseNext)({ + case Marked.Found(pj) => unmark(pj) + case _ => ().pure[F] }) - r.flatMap { - case Right(job) => Resource.pure(job) - case Left(err) => Resource.liftF(Sync[F].raiseError(new Exception(err))) - } } - def getNext: F[Option[RPeriodicTask]] = - store.transact(QPeriodicTask.findNext) + def getNext(excl: Option[Ident]): F[Option[RPeriodicTask]] = + store.transact(QPeriodicTask.findNext(excl)) def mark(pid: Ident, name: Ident): F[Boolean] = - store.transact(QPeriodicTask.setWorker(pid, name)).map(_ > 0) + Timestamp + .current[F] + .flatMap(now => + store.transact(QPeriodicTask.setWorker(pid, name, now)).map(_ > 0) + ) def unmark(job: RPeriodicTask): F[Unit] = for { @@ -98,16 +101,15 @@ object PeriodicTaskStore { def insert(task: RPeriodicTask): F[Unit] = { val update = store.transact(RPeriodicTask.update(task)) - val insertAttempt = store.transact(RPeriodicTask.insert(task)) - .attempt.map { - case Right(n) => n > 0 - case Left(_) => false - } + val insertAttempt = store.transact(RPeriodicTask.insert(task)).attempt.map { + case Right(n) => n > 0 + case Left(_) => false + } for { - n1 <- update + n1 <- update ins <- if (n1 == 0) insertAttempt else true.pure[F] - _ <- if (ins) 1.pure[F] else update + _ <- if (ins) 1.pure[F] else update } yield () } @@ -116,5 +118,9 @@ object PeriodicTaskStore { val exists = RPeriodicTask.exists(task.id) store.add(insert, exists) } + + def findJoexNodes: F[Vector[RNode]] = + store.transact(RNode.findAll(NodeType.Joex)) + }) } diff --git a/modules/store/src/main/scala/docspell/store/records/RInvitation.scala b/modules/store/src/main/scala/docspell/store/records/RInvitation.scala index 98d48a51..37764d0c 100644 --- a/modules/store/src/main/scala/docspell/store/records/RInvitation.scala +++ b/modules/store/src/main/scala/docspell/store/records/RInvitation.scala @@ -46,4 +46,7 @@ object RInvitation { _ <- delete(invite) } yield inv > 0 } + + def deleteOlderThan(ts: Timestamp): ConnectionIO[Int] = + deleteFrom(table, created.isLt(ts)).update.run } diff --git a/modules/store/src/main/scala/docspell/store/records/RPeriodicTask.scala b/modules/store/src/main/scala/docspell/store/records/RPeriodicTask.scala index 57d37e20..a510904b 100644 --- a/modules/store/src/main/scala/docspell/store/records/RPeriodicTask.scala +++ b/modules/store/src/main/scala/docspell/store/records/RPeriodicTask.scala @@ -8,6 +8,7 @@ import com.github.eikek.calev.CalEvent import docspell.common._ import docspell.store.impl.Column import docspell.store.impl.Implicits._ +import io.circe.Encoder /** A periodic task is a special job description, that shares a few * properties of a `RJob`. It must provide all information to create @@ -62,8 +63,6 @@ object RPeriodicTask { subject: String, submitter: Ident, priority: Priority, - worker: Option[Ident], - marked: Option[Timestamp], timer: CalEvent ): F[RPeriodicTask] = Ident @@ -81,8 +80,8 @@ object RPeriodicTask { subject, submitter, priority, - worker, - marked, + None, + None, timer, timer .nextElapse(now.atZone(Timestamp.UTC)) @@ -94,6 +93,18 @@ object RPeriodicTask { } ) + def createJson[F[_]: Sync, A]( + enabled: Boolean, + task: Ident, + group: Ident, + args: A, + subject: String, + submitter: Ident, + priority: Priority, + timer: CalEvent + )(implicit E: Encoder[A]): F[RPeriodicTask] = + create[F](enabled, task, group, E(args).noSpaces, subject, submitter, priority, timer) + val table = fr"periodic_task" object Columns { diff --git a/project/Dependencies.scala b/project/Dependencies.scala index dea1cff3..7d221bd2 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -38,10 +38,13 @@ object Dependencies { val ViewerJSVersion = "0.5.8" - val calev = Seq( + val calevCore = Seq( "com.github.eikek" %% "calev-core" % CalevVersion, + ) + val calevFs2 = Seq( "com.github.eikek" %% "calev-fs2" % CalevVersion ) + val calev = calevFs2 ++ calevCore val jclOverSlf4j = Seq( "org.slf4j" % "jcl-over-slf4j" % Slf4jVersion