diff --git a/modules/backend/src/main/scala/docspell/backend/ops/OItemSearch.scala b/modules/backend/src/main/scala/docspell/backend/ops/OItemSearch.scala
index 3abc6771..cb469880 100644
--- a/modules/backend/src/main/scala/docspell/backend/ops/OItemSearch.scala
+++ b/modules/backend/src/main/scala/docspell/backend/ops/OItemSearch.scala
@@ -23,6 +23,9 @@ import doobie.implicits._
 trait OItemSearch[F[_]] {
   def findItem(id: Ident, collective: Ident): F[Option[ItemData]]
 
+  /** Loads at most `limit` items of `collective` that are in state `deleted`. */
+  def findDeleted(collective: Ident, limit: Int): F[Vector[RItem]]
+
   def findItems(maxNoteLen: Int)(q: Query, batch: Batch): F[Vector[ListItem]]
 
   /** Same as `findItems` but does more queries per item to find all tags. */
@@ -145,6 +148,14 @@ object OItemSearch {
           .toVector
       }
 
+      def findDeleted(collective: Ident, limit: Int): F[Vector[RItem]] =
+        store
+          .transact(RItem.findDeleted(collective, limit))
+          // the query has no row limit of its own; `take` caps the result
+          .take(limit.toLong)
+          .compile
+          .toVector
+
       def findItemsWithTags(
           maxNoteLen: Int
       )(q: Query, batch: Batch): F[Vector[ListItemWithTags]] =
diff --git a/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala b/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala
index 1d7f3419..cd2a114c 100644
--- a/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala
+++ b/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala
@@ -18,6 +18,7 @@ import docspell.common._
 import docspell.ftsclient.FtsClient
 import docspell.ftssolr.SolrFtsClient
 import docspell.joex.analysis.RegexNerFile
+import docspell.joex.emptytrash._
 import docspell.joex.fts.{MigrationTask, ReIndexTask}
 import docspell.joex.hk._
 import docspell.joex.learn.LearnClassifierTask
@@ -94,16 +95,17 @@ object JoexAppImpl {
     for {
       httpClient <- BlazeClientBuilder[F](clientEC).resource
       client = JoexClient(httpClient)
-      store    <- Store.create(cfg.jdbc, connectEC)
-      queue    <- JobQueue(store)
-      pstore   <- PeriodicTaskStore.create(store)
-      nodeOps  <- ONode(store)
-      joex     <- OJoex(client, store)
-      upload   <- OUpload(store, queue, cfg.files, joex)
-      fts      <- createFtsClient(cfg)(httpClient)
-      itemOps  <- OItem(store, fts, queue, joex)
-      analyser <- TextAnalyser.create[F](cfg.textAnalysis.textAnalysisConfig)
-      regexNer <- RegexNerFile(cfg.textAnalysis.regexNerFileConfig, store)
+      store         <- Store.create(cfg.jdbc, connectEC)
+      queue         <- JobQueue(store)
+      pstore        <- PeriodicTaskStore.create(store)
+      nodeOps       <- ONode(store)
+      joex          <- OJoex(client, store)
+      upload        <- OUpload(store, queue, cfg.files, joex)
+      fts           <- createFtsClient(cfg)(httpClient)
+      itemOps       <- OItem(store, fts, queue, joex)
+      itemSearchOps <- OItemSearch(store)
+      analyser      <- TextAnalyser.create[F](cfg.textAnalysis.textAnalysisConfig)
+      regexNer      <- RegexNerFile(cfg.textAnalysis.regexNerFileConfig, store)
       javaEmil =
         JavaMailEmil(Settings.defaultSettings.copy(debug = cfg.mailDebug))
       sch <- SchedulerBuilder(cfg.scheduler, store)
@@ -206,6 +208,13 @@ object JoexAppImpl {
             AllPageCountTask.onCancel[F]
           )
         )
+        .withTask(
+          JobTask.json(
+            EmptyTrashArgs.taskName,
+            EmptyTrashTask[F](itemOps, itemSearchOps),
+            EmptyTrashTask.onCancel[F]
+          )
+        )
         .resource
       psch <- PeriodicScheduler.create(
         cfg.periodicScheduler,
diff --git a/modules/joex/src/main/scala/docspell/joex/emptytrash/EmptyTrashTask.scala b/modules/joex/src/main/scala/docspell/joex/emptytrash/EmptyTrashTask.scala
new file mode 100644
index 00000000..12173cb2
--- /dev/null
+++ b/modules/joex/src/main/scala/docspell/joex/emptytrash/EmptyTrashTask.scala
@@ -0,0 +1,81 @@
+/*
+ * Copyright 2020 Docspell Contributors
+ *
+ * SPDX-License-Identifier: GPL-3.0-or-later
+ */
+
+package docspell.joex.emptytrash
+
+import cats.effect._
+import cats.implicits._
+import fs2.Stream
+
+import docspell.backend.ops.{OItem, OItemSearch}
+import docspell.common._
+import docspell.joex.scheduler._
+import docspell.store.records.RItem
+
+/** Scheduler task that removes the soft-deleted items of a collective page by page. */
+object EmptyTrashTask {
+
+  type Args = EmptyTrashArgs
+
+  /** Cancellation hook; it only logs a warning. */
+  def onCancel[F[_]]: Task[F, Args, Unit] =
+    Task.log(_.warn("Cancelling empty-trash task"))
+
+  /** Number of items requested from `findDeleted` per round. */
+  private val pageSize = 20
+
+  /** Builds the task that deletes the soft-deleted items of `ctx.args.collective`. */
+  def apply[F[_]: Async](
+      itemOps: OItem[F],
+      itemSearchOps: OItemSearch[F]
+  ): Task[F, Args, Unit] =
+    Task { ctx =>
+      val collId = ctx.args.collective
+      for {
+        _        <- ctx.logger.info("Starting removing all soft-deleted items")
+        nDeleted <- deleteAll(collId, itemOps, itemSearchOps, ctx)
+        _        <- ctx.logger.info(s"Finished deleting ${nDeleted} items")
+      } yield ()
+    }
+
+  /** Fetches and deletes one page after another until a page comes back empty;
+    * yields the sum of the page sizes.
+    */
+  private def deleteAll[F[_]: Async](
+      collective: Ident,
+      itemOps: OItem[F],
+      itemSearchOps: OItemSearch[F],
+      ctx: Context[F, _]
+  ): F[Int] =
+    // NOTE(review): the loop only ends once `findDeleted` returns nothing; this
+    // presumably relies on `deleteItem` removing every item it is given - confirm.
+    Stream
+      .eval(itemSearchOps.findDeleted(collective, pageSize))
+      .evalMap(deleteChunk(collective, itemOps, ctx))
+      .repeat
+      .takeWhile(_ > 0)
+      .compile
+      .foldMonoid
+
+  /** Deletes the items of `chunk` one by one and yields `chunk.size`. */
+  private def deleteChunk[F[_]: Async](
+      collective: Ident,
+      itemOps: OItem[F],
+      ctx: Context[F, _]
+  )(chunk: Vector[RItem]): F[Int] =
+    if (chunk.isEmpty) {
+      0.pure[F]
+    } else {
+      // the per-item results are not used, hence `traverse_` instead of `traverse`
+      ctx.logger.info(s"Deleting next ${chunk.size} items …") *>
+        chunk
+          .traverse_(i =>
+            ctx.logger.debug(s"Delete item ${i.id.id} / ${i.name} now") *>
+              itemOps.deleteItem(i.id, collective)
+          )
+          .as(chunk.size)
+    }
+}
diff --git a/modules/store/src/main/scala/docspell/store/records/RItem.scala b/modules/store/src/main/scala/docspell/store/records/RItem.scala
index 78eb5416..0fcdc7e9 100644
--- a/modules/store/src/main/scala/docspell/store/records/RItem.scala
+++ b/modules/store/src/main/scala/docspell/store/records/RItem.scala
@@ -9,6 +9,7 @@ package docspell.store.records
 import cats.data.NonEmptyList
 import cats.effect.Sync
 import cats.implicits._
+import fs2.Stream
 
 import docspell.common._
 import docspell.store.qb.DSL._
@@ -388,6 +389,12 @@ object RItem {
   def findById(itemId: Ident): ConnectionIO[Option[RItem]] =
     run(select(T.all), from(T), T.id === itemId).query[RItem].option
 
+  /** Streams the items of `collective` whose state is `ItemState.deleted`. */
+  def findDeleted(collective: Ident, chunkSize: Int): Stream[ConnectionIO, RItem] =
+    run(select(T.all), from(T), T.cid === collective && T.state === ItemState.deleted)
+      .query[RItem]
+      .streamWithChunkSize(chunkSize)
+
   def checkByIdAndCollective(itemId: Ident, coll: Ident): ConnectionIO[Option[Ident]] =
     Select(T.id.s, from(T), T.id === itemId && T.cid === coll).build.query[Ident].option
 
diff --git a/modules/store/src/main/scala/docspell/store/usertask/UserTaskStore.scala b/modules/store/src/main/scala/docspell/store/usertask/UserTaskStore.scala
index 914e1357..6caab8f0 100644
--- a/modules/store/src/main/scala/docspell/store/usertask/UserTaskStore.scala
+++ b/modules/store/src/main/scala/docspell/store/usertask/UserTaskStore.scala
@@ -83,10 +83,10 @@ trait UserTaskStore[F[_]] {
     *
     * Unlike `updateTask`, this ensures that there is at most one task
     * of some name in the db. Multiple same tasks (task with same
-    * name) may not be allowed to run, dependening on what they do.
+    * name) may not be allowed to run, depending on what they do.
     * This is not ensured by the database, though.
     *
-    * If there are currently mutliple tasks with same name as `ut` for
+    * If there are currently multiple tasks with same name as `ut` for
     * the user `account`, they will all be removed and the given task
     * inserted!
     */