Add cleanup jobs task

This commit is contained in:
Eike Kettner 2020-03-09 20:24:00 +01:00
parent 854a596da3
commit 718e44a21c
8 changed files with 126 additions and 40 deletions

View File

@ -81,15 +81,39 @@ docspell.joex {
# invites, that can be configured here. # invites, that can be configured here.
house-keeping { house-keeping {
# When the house keeping task executes. Default is every 4 hours. # When the house keeping tasks execute. Default is to run every
schedule = "*-*-* 0/4:00:00" # week.
schedule = "Sun *-*-* 00:00:00"
# This task removes invitation keys that have been created but not # This task removes invitation keys that have been created but not
# used. The timespan here must be greater than the `invite-time' # used. The timespan here must be greater than the `invite-time'
# setting in the rest server config file. # setting in the rest server config file.
cleanup-invites = { cleanup-invites = {
# Whether this task is enabled.
enabled = true
# The minimum age of invites to be deleted.
older-than = "30 days" older-than = "30 days"
} }
# Jobs store their log output in the database. Normally this data
# is only interesting for some period of time. The processing logs
# of old files can be removed eventually.
cleanup-jobs = {
# Whether this task is enabled.
enabled = true
# The minimum age of jobs to delete. It is matched against the
# `finished' timestamp.
older-than = "30 days"
# This defines how many jobs are deleted in one transaction.
# Since the data to delete may get large, it can be configured
# whether more or less memory should be used.
delete-batch = "100"
}
} }
# Configuration of text extraction # Configuration of text extraction

View File

@ -29,7 +29,7 @@ final class JoexAppImpl[F[_]: ConcurrentEffect: ContextShift: Timer](
val run = scheduler.start.compile.drain val run = scheduler.start.compile.drain
val prun = periodicScheduler.start.compile.drain val prun = periodicScheduler.start.compile.drain
for { for {
_ <- HouseKeepingTask.submit(pstore, cfg.houseKeeping.schedule) _ <- scheduleBackgroundTasks
_ <- ConcurrentEffect[F].start(run) _ <- ConcurrentEffect[F].start(run)
_ <- ConcurrentEffect[F].start(prun) _ <- ConcurrentEffect[F].start(prun)
_ <- scheduler.periodicAwake _ <- scheduler.periodicAwake
@ -47,6 +47,8 @@ final class JoexAppImpl[F[_]: ConcurrentEffect: ContextShift: Timer](
def initShutdown: F[Unit] = def initShutdown: F[Unit] =
periodicScheduler.shutdown *> scheduler.shutdown(false) *> termSignal.set(true) periodicScheduler.shutdown *> scheduler.shutdown(false) *> termSignal.set(true)
private def scheduleBackgroundTasks: F[Unit] =
HouseKeepingTask.periodicTask[F](cfg.houseKeeping.schedule).flatMap(pstore.insert)
} }
object JoexAppImpl { object JoexAppImpl {
@ -59,7 +61,7 @@ object JoexAppImpl {
blocker: Blocker blocker: Blocker
): Resource[F, JoexApp[F]] = ): Resource[F, JoexApp[F]] =
for { for {
client <- JoexClient.resource(clientEC) client <- JoexClient.resource(clientEC)
store <- Store.create(cfg.jdbc, connectEC, blocker) store <- Store.create(cfg.jdbc, connectEC, blocker)
queue <- JobQueue(store) queue <- JobQueue(store)
pstore <- PeriodicTaskStore.create(store) pstore <- PeriodicTaskStore.create(store)
@ -81,7 +83,14 @@ object JoexAppImpl {
) )
) )
.resource .resource
psch <- PeriodicScheduler.create(cfg.periodicScheduler, sch, queue, pstore, client, Timer[F]) psch <- PeriodicScheduler.create(
cfg.periodicScheduler,
sch,
queue,
pstore,
client,
Timer[F]
)
app = new JoexAppImpl(cfg, nodeOps, store, pstore, termSignal, sch, psch) app = new JoexAppImpl(cfg, nodeOps, store, pstore, termSignal, sch, psch)
appR <- Resource.make(app.init.map(_ => app))(_.shutdown) appR <- Resource.make(app.init.map(_ => app))(_.shutdown)
} yield appR } yield appR

View File

@ -4,21 +4,23 @@ import cats.implicits._
import cats.effect._ import cats.effect._
import docspell.common._ import docspell.common._
import docspell.joex.Config
import docspell.joex.scheduler.Task import docspell.joex.scheduler.Task
import docspell.store.records._ import docspell.store.records._
object CleanupInvitesTask { object CleanupInvitesTask {
def apply[F[_]: Sync: ContextShift](cfg: Config): Task[F, Unit, Unit] = def apply[F[_]: Sync](cfg: HouseKeepingConfig.CleanupInvites): Task[F, Unit, Unit] =
Task { ctx => Task { ctx =>
val threshold = cfg.houseKeeping.cleanupInvites.olderThan if (cfg.enabled) {
for { for {
now <- Timestamp.current[F] now <- Timestamp.current[F]
ts = now - threshold ts = now - cfg.olderThan
_ <- ctx.logger.info(s"Cleanup invitations older than $ts") _ <- ctx.logger.info(s"Cleanup invitations older than $ts")
n <- ctx.store.transact(RInvitation.deleteOlderThan(ts)) n <- ctx.store.transact(RInvitation.deleteOlderThan(ts))
_ <- ctx.logger.info(s"Removed $n invitations") _ <- ctx.logger.info(s"Removed $n invitations")
} yield () } yield ()
} else {
ctx.logger.info("CleanupInvites task is disabled in the configuration")
}
} }
} }

View File

@ -0,0 +1,36 @@
package docspell.joex.hk
import cats.implicits._
import cats.effect._
import fs2.Stream
import docspell.common._
import docspell.joex.scheduler.Task
import docspell.store.Store
import docspell.store.records._
object CleanupJobsTask {
def apply[F[_]: Sync](cfg: HouseKeepingConfig.CleanupJobs): Task[F, Unit, Unit] =
Task { ctx =>
if (cfg.enabled) {
for {
now <- Timestamp.current[F]
ts = now - cfg.olderThan
_ <- ctx.logger.info(s"Cleanup jobs older than $ts")
n <- deleteDoneJobs(ctx.store, ts, cfg.deleteBatch)
_ <- ctx.logger.info(s"Removed $n jobs")
} yield ()
} else {
ctx.logger.info("CleanupJobs task is disabled in the configuration")
}
}
def deleteDoneJobs[F[_]: Sync](store: Store[F], ts: Timestamp, batch: Int): F[Int] =
Stream
.eval(store.transact(RJob.deleteDoneAndOlderThan(ts, batch)))
.repeat
.takeWhile(_ > 0)
.compile
.foldMonoid
}

View File

@ -7,11 +7,14 @@ import HouseKeepingConfig._
case class HouseKeepingConfig( case class HouseKeepingConfig(
schedule: CalEvent, schedule: CalEvent,
cleanupInvites: CleanupInvites cleanupInvites: CleanupInvites,
cleanupJobs: CleanupJobs
) )
object HouseKeepingConfig { object HouseKeepingConfig {
case class CleanupInvites(olderThan: Duration) case class CleanupInvites(enabled: Boolean, olderThan: Duration)
case class CleanupJobs(enabled: Boolean, olderThan: Duration, deleteBatch: Int)
} }

View File

@ -7,7 +7,6 @@ import com.github.eikek.calev._
import docspell.common._ import docspell.common._
import docspell.joex.Config import docspell.joex.Config
import docspell.joex.scheduler.Task import docspell.joex.scheduler.Task
import docspell.store.queue._
import docspell.store.records._ import docspell.store.records._
object HouseKeepingTask { object HouseKeepingTask {
@ -16,19 +15,17 @@ object HouseKeepingTask {
val taskName: Ident = Ident.unsafe("housekeeping") val taskName: Ident = Ident.unsafe("housekeeping")
def apply[F[_]: Sync: ContextShift](cfg: Config): Task[F, Unit, Unit] = def apply[F[_]: Sync](cfg: Config): Task[F, Unit, Unit] =
log[F](_.info(s"Running house-keeping task now")) Task.log[F](_.info(s"Running house-keeping task now"))
.flatMap(_ => CleanupInvitesTask(cfg)) .flatMap(_ => CleanupInvitesTask(cfg.houseKeeping.cleanupInvites))
.flatMap(_ => CleanupJobsTask(cfg.houseKeeping.cleanupJobs))
def onCancel[F[_]: Sync: ContextShift]: Task[F, Unit, Unit] = def onCancel[F[_]: Sync]: Task[F, Unit, Unit] =
Task(_.logger.warn("Cancelling background task")) Task.log(_.warn("Cancelling house-keeping task"))
def submit[F[_]: Sync]( def periodicTask[F[_]: Sync](ce: CalEvent): F[RPeriodicTask] =
pstore: PeriodicTaskStore[F], RPeriodicTask
ce: CalEvent .createJson(
): F[Unit] = {
val makeJob =
RPeriodicTask.createJson(
true, true,
taskName, taskName,
systemGroup, systemGroup,
@ -38,13 +35,5 @@ object HouseKeepingTask {
Priority.Low, Priority.Low,
ce ce
) )
.map(_.copy(id = periodicId))
for {
job <- makeJob
_ <- pstore.insert(job.copy(id = periodicId)).attempt
} yield ()
}
private def log[F[_]](f: Logger[F] => F[Unit]): Task[F, Unit, Unit] =
Task(ctx => f(ctx.logger))
} }

View File

@ -4,6 +4,7 @@ import cats.implicits._
import cats.{Applicative, ApplicativeError, FlatMap, Functor} import cats.{Applicative, ApplicativeError, FlatMap, Functor}
import cats.data.Kleisli import cats.data.Kleisli
import cats.effect.Sync import cats.effect.Sync
import docspell.common.Logger
/** /**
* The code that is executed by the scheduler * The code that is executed by the scheduler
@ -51,4 +52,7 @@ object Task {
def setProgress[F[_]: Sync, A, B](n: Int)(data: B): Task[F, A, B] = def setProgress[F[_]: Sync, A, B](n: Int)(data: B): Task[F, A, B] =
Task(_.setProgress(n).map(_ => data)) Task(_.setProgress(n).map(_ => data))
def log[F[_]](f: Logger[F] => F[Unit]): Task[F, Unit, Unit] =
Task(ctx => f(ctx.logger))
} }

View File

@ -1,12 +1,15 @@
package docspell.store.records package docspell.store.records
import cats.effect.Sync import cats.effect.Sync
import cats.implicits._
import fs2.Stream
import doobie._ import doobie._
import doobie.implicits._ import doobie.implicits._
import io.circe.Encoder
import docspell.common._ import docspell.common._
import docspell.store.impl.Column import docspell.store.impl.Column
import docspell.store.impl.Implicits._ import docspell.store.impl.Implicits._
import io.circe.Encoder
case class RJob( case class RJob(
id: Ident, id: Ident,
@ -227,7 +230,8 @@ object RJob {
} }
def selectGroupInState(states: Seq[JobState]): ConnectionIO[Vector[Ident]] = { def selectGroupInState(states: Seq[JobState]): ConnectionIO[Vector[Ident]] = {
val sql = selectDistinct(List(group), table, state.isOneOf(states)) ++ orderBy(group.f) val sql =
selectDistinct(List(group), table, state.isOneOf(states)) ++ orderBy(group.f)
sql.query[Ident].to[Vector] sql.query[Ident].to[Vector]
} }
@ -236,4 +240,19 @@ object RJob {
n0 <- RJobLog.deleteAll(jobId) n0 <- RJobLog.deleteAll(jobId)
n1 <- deleteFrom(table, id.is(jobId)).update.run n1 <- deleteFrom(table, id.is(jobId)).update.run
} yield n0 + n1 } yield n0 + n1
def findIdsDoneAndOlderThan(ts: Timestamp): Stream[ConnectionIO, Ident] =
selectSimple(
Seq(id),
table,
and(state.isOneOf(JobState.done.toSeq), or(finished.isNull, finished.isLt(ts)))
).query[Ident].stream
def deleteDoneAndOlderThan(ts: Timestamp, batch: Int): ConnectionIO[Int] =
findIdsDoneAndOlderThan(ts)
.take(batch.toLong)
.evalMap(delete)
.map(_ => 1)
.compile
.foldMonoid
} }