Run file integrity check in house keeping tasks

This commit is contained in:
eikek
2022-03-12 12:06:36 +01:00
parent b71085761b
commit cd3db6ea08
15 changed files with 150 additions and 34 deletions

View File

@ -194,6 +194,11 @@ docspell.joex {
# How often the node must be unreachable, before it is removed. # How often the node must be unreachable, before it is removed.
min-not-found = 2 min-not-found = 2
} }
# Checks all files against their checksum
integrity-check {
enabled = true
}
} }
# A periodic task to check for new releases of docspell. It can # A periodic task to check for new releases of docspell. It can

View File

@ -194,7 +194,7 @@ object JoexAppImpl extends MailAddressCodec {
.withTask( .withTask(
JobTask.json( JobTask.json(
HouseKeepingTask.taskName, HouseKeepingTask.taskName,
HouseKeepingTask[F](cfg), HouseKeepingTask[F](cfg, fileRepo),
HouseKeepingTask.onCancel[F] HouseKeepingTask.onCancel[F]
) )
) )

View File

@ -7,6 +7,7 @@
package docspell.joex.fts package docspell.joex.fts
import cats.effect._ import cats.effect._
import cats.implicits._
import docspell.backend.fulltext.CreateIndex import docspell.backend.fulltext.CreateIndex
import docspell.common._ import docspell.common._
@ -42,7 +43,7 @@ object ReIndexTask {
(collective match { (collective match {
case Some(_) => case Some(_) =>
FtsWork FtsWork
.clearIndex(collective) .clearIndex[F](collective)
.recoverWith( .recoverWith(
FtsWork.log[F](_.info("Clearing data failed. Continue re-indexing.")) FtsWork.log[F](_.info("Clearing data failed. Continue re-indexing."))
) ++ ) ++

View File

@ -18,10 +18,9 @@ import org.http4s.blaze.client.BlazeClientBuilder
import org.http4s.client.Client import org.http4s.client.Client
object CheckNodesTask { object CheckNodesTask {
def apply[F[_]: Async]( def apply[F[_]: Async](
cfg: HouseKeepingConfig.CheckNodes cfg: HouseKeepingConfig.CheckNodes
): Task[F, Unit, Unit] = ): Task[F, Unit, CleanupResult] =
Task { ctx => Task { ctx =>
if (cfg.enabled) if (cfg.enabled)
for { for {
@ -35,9 +34,11 @@ object CheckNodesTask {
) )
n <- removeNodes(ctx, cfg) n <- removeNodes(ctx, cfg)
_ <- ctx.logger.info(s"Removed $n nodes") _ <- ctx.logger.info(s"Removed $n nodes")
} yield () } yield CleanupResult.of(n)
else else
ctx.logger.info("CheckNodes task is disabled in the configuration") ctx.logger.info("CheckNodes task is disabled in the configuration") *>
CleanupResult.disabled.pure[F]
} }
def checkNodes[F[_]: Async](ctx: Context[F, _], client: Client[F]): F[Unit] = def checkNodes[F[_]: Async](ctx: Context[F, _], client: Client[F]): F[Unit] =

View File

@ -15,7 +15,9 @@ import docspell.store.records._
object CleanupInvitesTask { object CleanupInvitesTask {
def apply[F[_]: Sync](cfg: HouseKeepingConfig.CleanupInvites): Task[F, Unit, Unit] = def apply[F[_]: Sync](
cfg: HouseKeepingConfig.CleanupInvites
): Task[F, Unit, CleanupResult] =
Task { ctx => Task { ctx =>
if (cfg.enabled) if (cfg.enabled)
for { for {
@ -24,8 +26,9 @@ object CleanupInvitesTask {
_ <- ctx.logger.info(s"Cleanup invitations older than $ts") _ <- ctx.logger.info(s"Cleanup invitations older than $ts")
n <- ctx.store.transact(RInvitation.deleteOlderThan(ts)) n <- ctx.store.transact(RInvitation.deleteOlderThan(ts))
_ <- ctx.logger.info(s"Removed $n invitations") _ <- ctx.logger.info(s"Removed $n invitations")
} yield () } yield CleanupResult.of(n)
else else
ctx.logger.info("CleanupInvites task is disabled in the configuration") ctx.logger.info("CleanupInvites task is disabled in the configuration") *>
CleanupResult.disabled.pure[F]
} }
} }

View File

@ -17,7 +17,9 @@ import docspell.store.records._
object CleanupJobsTask { object CleanupJobsTask {
def apply[F[_]: Sync](cfg: HouseKeepingConfig.CleanupJobs): Task[F, Unit, Unit] = def apply[F[_]: Sync](
cfg: HouseKeepingConfig.CleanupJobs
): Task[F, Unit, CleanupResult] =
Task { ctx => Task { ctx =>
if (cfg.enabled) if (cfg.enabled)
for { for {
@ -26,9 +28,10 @@ object CleanupJobsTask {
_ <- ctx.logger.info(s"Cleanup jobs older than $ts") _ <- ctx.logger.info(s"Cleanup jobs older than $ts")
n <- deleteDoneJobs(ctx.store, ts, cfg.deleteBatch) n <- deleteDoneJobs(ctx.store, ts, cfg.deleteBatch)
_ <- ctx.logger.info(s"Removed $n jobs") _ <- ctx.logger.info(s"Removed $n jobs")
} yield () } yield CleanupResult.of(n)
else else
ctx.logger.info("CleanupJobs task is disabled in the configuration") ctx.logger.info("CleanupJobs task is disabled in the configuration") *>
CleanupResult.disabled.pure[F]
} }
def deleteDoneJobs[F[_]: Sync](store: Store[F], ts: Timestamp, batch: Int): F[Int] = def deleteDoneJobs[F[_]: Sync](store: Store[F], ts: Timestamp, batch: Int): F[Int] =

View File

@ -14,8 +14,9 @@ import docspell.joex.scheduler.Task
import docspell.store.records._ import docspell.store.records._
object CleanupRememberMeTask { object CleanupRememberMeTask {
def apply[F[_]: Sync](
def apply[F[_]: Sync](cfg: HouseKeepingConfig.CleanupRememberMe): Task[F, Unit, Unit] = cfg: HouseKeepingConfig.CleanupRememberMe
): Task[F, Unit, CleanupResult] =
Task { ctx => Task { ctx =>
if (cfg.enabled) if (cfg.enabled)
for { for {
@ -24,8 +25,9 @@ object CleanupRememberMeTask {
_ <- ctx.logger.info(s"Cleanup remember-me tokens older than $ts") _ <- ctx.logger.info(s"Cleanup remember-me tokens older than $ts")
n <- ctx.store.transact(RRememberMe.deleteOlderThan(ts)) n <- ctx.store.transact(RRememberMe.deleteOlderThan(ts))
_ <- ctx.logger.info(s"Removed $n tokens") _ <- ctx.logger.info(s"Removed $n tokens")
} yield () } yield CleanupResult.of(n)
else else
ctx.logger.info("CleanupRememberMe task is disabled in the configuration") ctx.logger.info("CleanupRememberMe task is disabled in the configuration") *>
CleanupResult.disabled.pure[F]
} }
} }

View File

@ -0,0 +1,21 @@
/*
* Copyright 2020 Eike K. & Contributors
*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
package docspell.joex.hk
import io.circe.Encoder
import io.circe.generic.semiauto.deriveEncoder
case class CleanupResult(removed: Int, disabled: Boolean) {
def asString = if (disabled) "disabled" else s"$removed"
}
object CleanupResult {
def of(n: Int): CleanupResult = CleanupResult(n, false)
def disabled: CleanupResult = CleanupResult(0, true)
implicit val jsonEncoder: Encoder[CleanupResult] =
deriveEncoder
}

View File

@ -16,7 +16,8 @@ case class HouseKeepingConfig(
cleanupInvites: CleanupInvites, cleanupInvites: CleanupInvites,
cleanupJobs: CleanupJobs, cleanupJobs: CleanupJobs,
cleanupRememberMe: CleanupRememberMe, cleanupRememberMe: CleanupRememberMe,
checkNodes: CheckNodes checkNodes: CheckNodes,
integrityCheck: IntegrityCheck
) )
object HouseKeepingConfig { object HouseKeepingConfig {
@ -29,4 +30,5 @@ object HouseKeepingConfig {
case class CheckNodes(enabled: Boolean, minNotFound: Int) case class CheckNodes(enabled: Boolean, minNotFound: Int)
case class IntegrityCheck(enabled: Boolean)
} }

View File

@ -8,26 +8,41 @@ package docspell.joex.hk
import cats.effect._ import cats.effect._
import cats.implicits._ import cats.implicits._
import docspell.backend.ops.OFileRepository
import docspell.common._ import docspell.common._
import docspell.joex.Config import docspell.joex.Config
import docspell.joex.scheduler.Task import docspell.joex.filecopy.FileIntegrityCheckTask
import docspell.joex.scheduler.{JobTaskResultEncoder, Task}
import docspell.store.records._ import docspell.store.records._
import docspell.store.usertask.UserTaskScope import docspell.store.usertask.UserTaskScope
import com.github.eikek.calev._ import com.github.eikek.calev._
import docspell.backend.ops.OFileRepository import io.circe.Encoder
import io.circe.generic.semiauto.deriveEncoder
object HouseKeepingTask { object HouseKeepingTask {
private val periodicId = Ident.unsafe("docspell-houskeeping") private val periodicId = Ident.unsafe("docspell-houskeeping")
val taskName: Ident = Ident.unsafe("housekeeping") val taskName: Ident = Ident.unsafe("housekeeping")
def apply[F[_]: Async](cfg: Config, fileRepo: OFileRepository[F]): Task[F, Unit, Unit] = def apply[F[_]: Async](
cfg: Config,
fileRepo: OFileRepository[F]
): Task[F, Unit, Result] = {
val combined =
(
CheckNodesTask(cfg.houseKeeping.checkNodes),
CleanupInvitesTask(cfg.houseKeeping.cleanupInvites),
CleanupJobsTask(cfg.houseKeeping.cleanupJobs),
CleanupRememberMeTask(cfg.houseKeeping.cleanupRememberMe),
IntegrityCheckTask(cfg.houseKeeping.integrityCheck, fileRepo)
).mapN(Result.apply)
Task Task
.log[F, Unit](_.info(s"Running house-keeping task now")) .log[F, Unit](_.info(s"Running house-keeping task now"))
.flatMap(_ => CleanupInvitesTask(cfg.houseKeeping.cleanupInvites)) .flatMap(_ => combined)
.flatMap(_ => CleanupRememberMeTask(cfg.houseKeeping.cleanupRememberMe)) }
.flatMap(_ => CleanupJobsTask(cfg.houseKeeping.cleanupJobs))
.flatMap(_ => CheckNodesTask(cfg.houseKeeping.checkNodes))
def onCancel[F[_]]: Task[F, Unit, Unit] = def onCancel[F[_]]: Task[F, Unit, Unit] =
Task.log[F, Unit](_.warn("Cancelling house-keeping task")) Task.log[F, Unit](_.warn("Cancelling house-keeping task"))
@ -45,4 +60,27 @@ object HouseKeepingTask {
None None
) )
.map(_.copy(id = periodicId)) .map(_.copy(id = periodicId))
case class Result(
checkNodes: CleanupResult,
cleanupInvites: CleanupResult,
cleanupJobs: CleanupResult,
cleanupRememberMe: CleanupResult,
integrityCheck: FileIntegrityCheckTask.Result
)
object Result {
implicit val jsonEncoder: Encoder[Result] =
deriveEncoder
implicit val jobTaskResultEncoder: JobTaskResultEncoder[Result] =
JobTaskResultEncoder.fromJson[Result].withMessage { r =>
s"- Nodes removed: ${r.checkNodes.asString}\n" +
s"- Invites removed: ${r.cleanupInvites.asString}\n" +
s"- Jobs removed: ${r.cleanupJobs.asString}\n" +
s"- RememberMe removed: ${r.cleanupRememberMe.asString}\n" +
s"- Integrity check: ok=${r.integrityCheck.ok}, failed=${r.integrityCheck.failedKeys.size}, notFound=${r.integrityCheck.notFoundKeys.size}"
}
}
} }

View File

@ -0,0 +1,32 @@
/*
* Copyright 2020 Eike K. & Contributors
*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
package docspell.joex.hk
import cats.effect._
import cats.implicits._
import docspell.backend.ops.OFileRepository
import docspell.common._
import docspell.joex.filecopy.FileIntegrityCheckTask
import docspell.joex.scheduler.Task
object IntegrityCheckTask {
def apply[F[_]: Sync](
cfg: HouseKeepingConfig.IntegrityCheck,
fileRepo: OFileRepository[F]
): Task[F, Unit, FileIntegrityCheckTask.Result] =
Task { ctx =>
if (cfg.enabled)
FileIntegrityCheckTask(fileRepo).run(
ctx.map(_ => FileIntegrityCheckArgs(FileKeyPart.Empty))
)
else
ctx.logger.info("Integrity check task is disabled in the configuration") *>
FileIntegrityCheckTask.Result.empty.pure[F]
}
}

View File

@ -23,7 +23,7 @@ object LearnItemEntities {
maxItems: Int, maxItems: Int,
maxTextLen: Int maxTextLen: Int
): Task[F, A, Unit] = ): Task[F, A, Unit] =
learnCorrOrg(analyser, collective, maxItems, maxTextLen) learnCorrOrg[F, A](analyser, collective, maxItems, maxTextLen)
.flatMap(_ => learnCorrPerson[F, A](analyser, collective, maxItems, maxTextLen)) .flatMap(_ => learnCorrPerson[F, A](analyser, collective, maxItems, maxTextLen))
.flatMap(_ => learnConcPerson(analyser, collective, maxItems, maxTextLen)) .flatMap(_ => learnConcPerson(analyser, collective, maxItems, maxTextLen))
.flatMap(_ => learnConcEquip(analyser, collective, maxItems, maxTextLen)) .flatMap(_ => learnConcEquip(analyser, collective, maxItems, maxTextLen))

View File

@ -25,7 +25,7 @@ object ItemHandler {
type Args = ProcessItemArgs type Args = ProcessItemArgs
def onCancel[F[_]: Sync]: Task[F, Args, Unit] = def onCancel[F[_]: Sync]: Task[F, Args, Unit] =
logWarn("Now cancelling.").flatMap(_ => logWarn[F]("Now cancelling.").flatMap(_ =>
markItemCreated.flatMap { markItemCreated.flatMap {
case true => case true =>
Task.pure(()) Task.pure(())
@ -41,10 +41,10 @@ object ItemHandler {
analyser: TextAnalyser[F], analyser: TextAnalyser[F],
regexNer: RegexNerFile[F] regexNer: RegexNerFile[F]
): Task[F, Args, Option[ItemData]] = ): Task[F, Args, Option[ItemData]] =
logBeginning.flatMap(_ => logBeginning[F].flatMap(_ =>
DuplicateCheck[F] DuplicateCheck[F]
.flatMap(args => .flatMap(args =>
if (args.files.isEmpty) logNoFiles.map(_ => None) if (args.files.isEmpty) logNoFiles[F].map(_ => None)
else { else {
val create: Task[F, Args, ItemData] = val create: Task[F, Args, ItemData] =
CreateItem[F].contramap(_ => args.pure[F]) CreateItem[F].contramap(_ => args.pure[F])

View File

@ -7,6 +7,7 @@
package docspell.joex.process package docspell.joex.process
import cats.effect._ import cats.effect._
import cats.implicits._
import docspell.analysis.TextAnalyser import docspell.analysis.TextAnalyser
import docspell.backend.ops.OItem import docspell.backend.ops.OItem

View File

@ -18,12 +18,6 @@ trait Task[F[_], A, B] {
def run(ctx: Context[F, A]): F[B] def run(ctx: Context[F, A]): F[B]
def map[C](f: B => C)(implicit F: Functor[F]): Task[F, A, C] =
Task(Task.toKleisli(this).map(f))
def flatMap[C](f: B => Task[F, A, C])(implicit F: FlatMap[F]): Task[F, A, C] =
Task(Task.toKleisli(this).flatMap(a => Task.toKleisli(f(a))))
def andThen[C](f: B => F[C])(implicit F: FlatMap[F]): Task[F, A, C] = def andThen[C](f: B => F[C])(implicit F: FlatMap[F]): Task[F, A, C] =
Task(Task.toKleisli(this).andThen(f)) Task(Task.toKleisli(this).andThen(f))
@ -62,4 +56,17 @@ object Task {
def log[F[_], A](f: Logger[F] => F[Unit]): Task[F, A, Unit] = def log[F[_], A](f: Logger[F] => F[Unit]): Task[F, A, Unit] =
Task(ctx => f(ctx.logger)) Task(ctx => f(ctx.logger))
implicit def taskMonad[F[_]: Monad, T]: Monad[Task[F, T, *]] =
new Monad[Task[F, T, *]] {
def pure[A](x: A) = Task(_ => Monad[F].pure(x))
def flatMap[A, B](fa: Task[F, T, A])(f: A => Task[F, T, B]) =
Task(Task.toKleisli(fa).flatMap(a => Task.toKleisli(f(a))))
def tailRecM[A, B](a: A)(f: A => Task[F, T, Either[A, B]]) = {
val monadK = Monad[Kleisli[F, Context[F, T], *]]
val r = monadK.tailRecM(a)(x => Task.toKleisli(f(x)))
Task(r)
}
}
} }