From 718e44a21cb637a51006b7ba08bbd3b2fab194c0 Mon Sep 17 00:00:00 2001 From: Eike Kettner Date: Mon, 9 Mar 2020 20:24:00 +0100 Subject: [PATCH] Add cleanup jobs task --- .../joex/src/main/resources/reference.conf | 28 +++++++++++++-- .../scala/docspell/joex/JoexAppImpl.scala | 15 ++++++-- .../docspell/joex/hk/CleanupInvitesTask.scala | 22 ++++++------ .../docspell/joex/hk/CleanupJobsTask.scala | 36 +++++++++++++++++++ .../docspell/joex/hk/HouseKeepingConfig.scala | 7 ++-- .../docspell/joex/hk/HouseKeepingTask.scala | 31 ++++++---------- .../scala/docspell/joex/scheduler/Task.scala | 4 +++ .../scala/docspell/store/records/RJob.scala | 23 ++++++++++-- 8 files changed, 126 insertions(+), 40 deletions(-) create mode 100644 modules/joex/src/main/scala/docspell/joex/hk/CleanupJobsTask.scala diff --git a/modules/joex/src/main/resources/reference.conf b/modules/joex/src/main/resources/reference.conf index 461f6fce..cd345cfb 100644 --- a/modules/joex/src/main/resources/reference.conf +++ b/modules/joex/src/main/resources/reference.conf @@ -81,15 +81,39 @@ docspell.joex { # invites, that can be configured here. house-keeping { - # When the house keeping task executes. Default is every 4 hours. - schedule = "*-*-* 0/4:00:00" + # When the house keeping tasks execute. Default is to run every + # week. + schedule = "Sun *-*-* 00:00:00" # This task removes invitation keys that have been created but not # used. The timespan here must be greater than the `invite-time' # setting in the rest server config file. cleanup-invites = { + + # Whether this task is enabled. + enabled = true + + # The minimum age of invites to be deleted. older-than = "30 days" } + + # Jobs store their log output in the database. Normally this data + # is only interesting for some period of time. The processing logs + # of old files can be removed eventually. + cleanup-jobs = { + + # Whether this task is enabled. + enabled = true + + # The minimum age of jobs to delete. It is matched against the + # `finished' timestamp. + older-than = "30 days" + + # This defines how many jobs are deleted in one transaction. + # Since the data to delete may get large, it can be configured + # whether more or less memory should be used. + delete-batch = "100" + } } # Configuration of text extraction diff --git a/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala b/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala index c16a1a33..f613b736 100644 --- a/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala +++ b/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala @@ -29,7 +29,7 @@ final class JoexAppImpl[F[_]: ConcurrentEffect: ContextShift: Timer]( val run = scheduler.start.compile.drain val prun = periodicScheduler.start.compile.drain for { - _ <- HouseKeepingTask.submit(pstore, cfg.houseKeeping.schedule) + _ <- scheduleBackgroundTasks _ <- ConcurrentEffect[F].start(run) _ <- ConcurrentEffect[F].start(prun) _ <- scheduler.periodicAwake @@ -47,6 +47,8 @@ final class JoexAppImpl[F[_]: ConcurrentEffect: ContextShift: Timer]( def initShutdown: F[Unit] = periodicScheduler.shutdown *> scheduler.shutdown(false) *> termSignal.set(true) + private def scheduleBackgroundTasks: F[Unit] = + HouseKeepingTask.periodicTask[F](cfg.houseKeeping.schedule).flatMap(pstore.insert) } object JoexAppImpl { @@ -59,7 +61,7 @@ object JoexAppImpl { blocker: Blocker ): Resource[F, JoexApp[F]] = for { - client <- JoexClient.resource(clientEC) + client <- JoexClient.resource(clientEC) store <- Store.create(cfg.jdbc, connectEC, blocker) queue <- JobQueue(store) pstore <- PeriodicTaskStore.create(store) @@ -81,7 +83,14 @@ object JoexAppImpl { ) ) .resource - psch <- PeriodicScheduler.create(cfg.periodicScheduler, sch, queue, pstore, client, Timer[F]) + psch <- PeriodicScheduler.create( + cfg.periodicScheduler, + sch, + queue, + pstore, + client, + Timer[F] + ) app = new JoexAppImpl(cfg, nodeOps, store, pstore, termSignal, sch, psch) appR <- Resource.make(app.init.map(_ => app))(_.shutdown) } yield appR diff --git a/modules/joex/src/main/scala/docspell/joex/hk/CleanupInvitesTask.scala b/modules/joex/src/main/scala/docspell/joex/hk/CleanupInvitesTask.scala index 592a80b8..345d488d 100644 --- a/modules/joex/src/main/scala/docspell/joex/hk/CleanupInvitesTask.scala +++ b/modules/joex/src/main/scala/docspell/joex/hk/CleanupInvitesTask.scala @@ -4,21 +4,23 @@ import cats.implicits._ import cats.effect._ import docspell.common._ -import docspell.joex.Config import docspell.joex.scheduler.Task import docspell.store.records._ object CleanupInvitesTask { - def apply[F[_]: Sync: ContextShift](cfg: Config): Task[F, Unit, Unit] = + def apply[F[_]: Sync](cfg: HouseKeepingConfig.CleanupInvites): Task[F, Unit, Unit] = Task { ctx => - val threshold = cfg.houseKeeping.cleanupInvites.olderThan - for { - now <- Timestamp.current[F] - ts = now - threshold - _ <- ctx.logger.info(s"Cleanup invitations older than $ts") - n <- ctx.store.transact(RInvitation.deleteOlderThan(ts)) - _ <- ctx.logger.info(s"Removed $n invitations") - } yield () + if (cfg.enabled) { + for { + now <- Timestamp.current[F] + ts = now - cfg.olderThan + _ <- ctx.logger.info(s"Cleanup invitations older than $ts") + n <- ctx.store.transact(RInvitation.deleteOlderThan(ts)) + _ <- ctx.logger.info(s"Removed $n invitations") + } yield () + } else { + ctx.logger.info("CleanupInvites task is disabled in the configuration") + } } } diff --git a/modules/joex/src/main/scala/docspell/joex/hk/CleanupJobsTask.scala b/modules/joex/src/main/scala/docspell/joex/hk/CleanupJobsTask.scala new file mode 100644 index 00000000..43105814 --- /dev/null +++ b/modules/joex/src/main/scala/docspell/joex/hk/CleanupJobsTask.scala @@ -0,0 +1,36 @@ +package docspell.joex.hk + +import cats.implicits._ +import cats.effect._ +import fs2.Stream + +import docspell.common._ +import docspell.joex.scheduler.Task +import docspell.store.Store +import docspell.store.records._ + +object CleanupJobsTask { + + def apply[F[_]: Sync](cfg: HouseKeepingConfig.CleanupJobs): Task[F, Unit, Unit] = + Task { ctx => + if (cfg.enabled) { + for { + now <- Timestamp.current[F] + ts = now - cfg.olderThan + _ <- ctx.logger.info(s"Cleanup jobs older than $ts") + n <- deleteDoneJobs(ctx.store, ts, cfg.deleteBatch) + _ <- ctx.logger.info(s"Removed $n jobs") + } yield () + } else { + ctx.logger.info("CleanupJobs task is disabled in the configuration") + } + } + + def deleteDoneJobs[F[_]: Sync](store: Store[F], ts: Timestamp, batch: Int): F[Int] = + Stream + .eval(store.transact(RJob.deleteDoneAndOlderThan(ts, batch))) + .repeat + .takeWhile(_ > 0) + .compile + .foldMonoid +} diff --git a/modules/joex/src/main/scala/docspell/joex/hk/HouseKeepingConfig.scala b/modules/joex/src/main/scala/docspell/joex/hk/HouseKeepingConfig.scala index bf826db2..217d7d88 100644 --- a/modules/joex/src/main/scala/docspell/joex/hk/HouseKeepingConfig.scala +++ b/modules/joex/src/main/scala/docspell/joex/hk/HouseKeepingConfig.scala @@ -7,11 +7,14 @@ import HouseKeepingConfig._ case class HouseKeepingConfig( schedule: CalEvent, - cleanupInvites: CleanupInvites + cleanupInvites: CleanupInvites, + cleanupJobs: CleanupJobs ) object HouseKeepingConfig { - case class CleanupInvites(olderThan: Duration) + case class CleanupInvites(enabled: Boolean, olderThan: Duration) + + case class CleanupJobs(enabled: Boolean, olderThan: Duration, deleteBatch: Int) } diff --git a/modules/joex/src/main/scala/docspell/joex/hk/HouseKeepingTask.scala b/modules/joex/src/main/scala/docspell/joex/hk/HouseKeepingTask.scala index 07339baf..1d4d558d 100644 --- a/modules/joex/src/main/scala/docspell/joex/hk/HouseKeepingTask.scala +++ b/modules/joex/src/main/scala/docspell/joex/hk/HouseKeepingTask.scala @@ -7,7 +7,6 @@ import com.github.eikek.calev._ import docspell.common._ import docspell.joex.Config import docspell.joex.scheduler.Task -import docspell.store.queue._ import docspell.store.records._ object HouseKeepingTask { @@ -16,19 +15,17 @@ object HouseKeepingTask { val taskName: Ident = Ident.unsafe("housekeeping") - def apply[F[_]: Sync: ContextShift](cfg: Config): Task[F, Unit, Unit] = - log[F](_.info(s"Running house-keeping task now")) - .flatMap(_ => CleanupInvitesTask(cfg)) + def apply[F[_]: Sync](cfg: Config): Task[F, Unit, Unit] = + Task.log[F](_.info(s"Running house-keeping task now")) + .flatMap(_ => CleanupInvitesTask(cfg.houseKeeping.cleanupInvites)) + .flatMap(_ => CleanupJobsTask(cfg.houseKeeping.cleanupJobs)) - def onCancel[F[_]: Sync: ContextShift]: Task[F, Unit, Unit] = - Task(_.logger.warn("Cancelling background task")) + def onCancel[F[_]: Sync]: Task[F, Unit, Unit] = + Task.log(_.warn("Cancelling house-keeping task")) - def submit[F[_]: Sync]( - pstore: PeriodicTaskStore[F], - ce: CalEvent - ): F[Unit] = { - val makeJob = - RPeriodicTask.createJson( + def periodicTask[F[_]: Sync](ce: CalEvent): F[RPeriodicTask] = + RPeriodicTask + .createJson( true, taskName, systemGroup, @@ -38,13 +35,5 @@ object HouseKeepingTask { Priority.Low, ce ) - - for { - job <- makeJob - _ <- pstore.insert(job.copy(id = periodicId)).attempt - } yield () - } - - private def log[F[_]](f: Logger[F] => F[Unit]): Task[F, Unit, Unit] = - Task(ctx => f(ctx.logger)) + .map(_.copy(id = periodicId)) } diff --git a/modules/joex/src/main/scala/docspell/joex/scheduler/Task.scala b/modules/joex/src/main/scala/docspell/joex/scheduler/Task.scala index 8ac59a1b..ae35fab2 100644 --- a/modules/joex/src/main/scala/docspell/joex/scheduler/Task.scala +++ b/modules/joex/src/main/scala/docspell/joex/scheduler/Task.scala @@ -4,6 +4,7 @@ import cats.implicits._ import cats.{Applicative, ApplicativeError, FlatMap, Functor} import cats.data.Kleisli import cats.effect.Sync +import docspell.common.Logger /** * The code that is executed by the scheduler @@ -51,4 +52,7 @@ object Task { def setProgress[F[_]: Sync, A, B](n: Int)(data: B): Task[F, A, B] = Task(_.setProgress(n).map(_ => data)) + + def log[F[_]](f: Logger[F] => F[Unit]): Task[F, Unit, Unit] = + Task(ctx => f(ctx.logger)) } diff --git a/modules/store/src/main/scala/docspell/store/records/RJob.scala b/modules/store/src/main/scala/docspell/store/records/RJob.scala index 87520852..9a286743 100644 --- a/modules/store/src/main/scala/docspell/store/records/RJob.scala +++ b/modules/store/src/main/scala/docspell/store/records/RJob.scala @@ -1,12 +1,15 @@ package docspell.store.records import cats.effect.Sync +import cats.implicits._ +import fs2.Stream import doobie._ import doobie.implicits._ +import io.circe.Encoder + import docspell.common._ import docspell.store.impl.Column import docspell.store.impl.Implicits._ -import io.circe.Encoder case class RJob( id: Ident, @@ -227,7 +230,8 @@ object RJob { } def selectGroupInState(states: Seq[JobState]): ConnectionIO[Vector[Ident]] = { - val sql = selectDistinct(List(group), table, state.isOneOf(states)) ++ orderBy(group.f) + val sql = + selectDistinct(List(group), table, state.isOneOf(states)) ++ orderBy(group.f) sql.query[Ident].to[Vector] } @@ -236,4 +240,19 @@ object RJob { n0 <- RJobLog.deleteAll(jobId) n1 <- deleteFrom(table, id.is(jobId)).update.run } yield n0 + n1 + + def findIdsDoneAndOlderThan(ts: Timestamp): Stream[ConnectionIO, Ident] = + selectSimple( + Seq(id), + table, + and(state.isOneOf(JobState.done.toSeq), or(finished.isNull, finished.isLt(ts))) + ).query[Ident].stream + + def deleteDoneAndOlderThan(ts: Timestamp, batch: Int): ConnectionIO[Int] = + findIdsDoneAndOlderThan(ts) + .take(batch.toLong) + .evalMap(delete) + .map(_ => 1) + .compile + .foldMonoid }