mirror of
https://github.com/TheAnachronism/docspell.git
synced 2025-04-05 10:59:33 +00:00
Add a task implementation to delete items
This commit is contained in:
parent
4901276c66
commit
50706c3d6d
@ -23,6 +23,8 @@ import doobie.implicits._
|
|||||||
trait OItemSearch[F[_]] {
|
trait OItemSearch[F[_]] {
|
||||||
def findItem(id: Ident, collective: Ident): F[Option[ItemData]]
|
def findItem(id: Ident, collective: Ident): F[Option[ItemData]]
|
||||||
|
|
||||||
|
def findDeleted(collective: Ident, limit: Int): F[Vector[RItem]]
|
||||||
|
|
||||||
def findItems(maxNoteLen: Int)(q: Query, batch: Batch): F[Vector[ListItem]]
|
def findItems(maxNoteLen: Int)(q: Query, batch: Batch): F[Vector[ListItem]]
|
||||||
|
|
||||||
/** Same as `findItems` but does more queries per item to find all tags. */
|
/** Same as `findItems` but does more queries per item to find all tags. */
|
||||||
@ -145,6 +147,13 @@ object OItemSearch {
|
|||||||
.toVector
|
.toVector
|
||||||
}
|
}
|
||||||
|
|
||||||
|
def findDeleted(collective: Ident, limit: Int): F[Vector[RItem]] =
|
||||||
|
store
|
||||||
|
.transact(RItem.findDeleted(collective, limit))
|
||||||
|
.take(limit.toLong)
|
||||||
|
.compile
|
||||||
|
.toVector
|
||||||
|
|
||||||
def findItemsWithTags(
|
def findItemsWithTags(
|
||||||
maxNoteLen: Int
|
maxNoteLen: Int
|
||||||
)(q: Query, batch: Batch): F[Vector[ListItemWithTags]] =
|
)(q: Query, batch: Batch): F[Vector[ListItemWithTags]] =
|
||||||
|
@ -18,6 +18,7 @@ import docspell.common._
|
|||||||
import docspell.ftsclient.FtsClient
|
import docspell.ftsclient.FtsClient
|
||||||
import docspell.ftssolr.SolrFtsClient
|
import docspell.ftssolr.SolrFtsClient
|
||||||
import docspell.joex.analysis.RegexNerFile
|
import docspell.joex.analysis.RegexNerFile
|
||||||
|
import docspell.joex.emptytrash._
|
||||||
import docspell.joex.fts.{MigrationTask, ReIndexTask}
|
import docspell.joex.fts.{MigrationTask, ReIndexTask}
|
||||||
import docspell.joex.hk._
|
import docspell.joex.hk._
|
||||||
import docspell.joex.learn.LearnClassifierTask
|
import docspell.joex.learn.LearnClassifierTask
|
||||||
@ -102,6 +103,7 @@ object JoexAppImpl {
|
|||||||
upload <- OUpload(store, queue, cfg.files, joex)
|
upload <- OUpload(store, queue, cfg.files, joex)
|
||||||
fts <- createFtsClient(cfg)(httpClient)
|
fts <- createFtsClient(cfg)(httpClient)
|
||||||
itemOps <- OItem(store, fts, queue, joex)
|
itemOps <- OItem(store, fts, queue, joex)
|
||||||
|
itemSearchOps <- OItemSearch(store)
|
||||||
analyser <- TextAnalyser.create[F](cfg.textAnalysis.textAnalysisConfig)
|
analyser <- TextAnalyser.create[F](cfg.textAnalysis.textAnalysisConfig)
|
||||||
regexNer <- RegexNerFile(cfg.textAnalysis.regexNerFileConfig, store)
|
regexNer <- RegexNerFile(cfg.textAnalysis.regexNerFileConfig, store)
|
||||||
javaEmil =
|
javaEmil =
|
||||||
@ -206,6 +208,13 @@ object JoexAppImpl {
|
|||||||
AllPageCountTask.onCancel[F]
|
AllPageCountTask.onCancel[F]
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
.withTask(
|
||||||
|
JobTask.json(
|
||||||
|
EmptyTrashArgs.taskName,
|
||||||
|
EmptyTrashTask[F](itemOps, itemSearchOps),
|
||||||
|
EmptyTrashTask.onCancel[F]
|
||||||
|
)
|
||||||
|
)
|
||||||
.resource
|
.resource
|
||||||
psch <- PeriodicScheduler.create(
|
psch <- PeriodicScheduler.create(
|
||||||
cfg.periodicScheduler,
|
cfg.periodicScheduler,
|
||||||
|
@ -0,0 +1,68 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 2020 Docspell Contributors
|
||||||
|
*
|
||||||
|
* SPDX-License-Identifier: GPL-3.0-or-later
|
||||||
|
*/
|
||||||
|
|
||||||
|
package docspell.joex.emptytrash
|
||||||
|
|
||||||
|
import cats.effect._
|
||||||
|
import cats.implicits._
|
||||||
|
import fs2.Stream
|
||||||
|
|
||||||
|
import docspell.backend.ops.{OItem, OItemSearch}
|
||||||
|
import docspell.common._
|
||||||
|
import docspell.joex.scheduler._
|
||||||
|
import docspell.store.records.RItem
|
||||||
|
|
||||||
|
object EmptyTrashTask {
|
||||||
|
|
||||||
|
type Args = EmptyTrashArgs
|
||||||
|
|
||||||
|
def onCancel[F[_]]: Task[F, Args, Unit] =
|
||||||
|
Task.log(_.warn("Cancelling empty-trash task"))
|
||||||
|
|
||||||
|
private val pageSize = 20
|
||||||
|
|
||||||
|
def apply[F[_]: Async](
|
||||||
|
itemOps: OItem[F],
|
||||||
|
itemSearchOps: OItemSearch[F]
|
||||||
|
): Task[F, Args, Unit] =
|
||||||
|
Task { ctx =>
|
||||||
|
val collId = ctx.args.collective
|
||||||
|
for {
|
||||||
|
_ <- ctx.logger.info(s"Starting removing all soft-deleted items")
|
||||||
|
nDeleted <- deleteAll(collId, itemOps, itemSearchOps, ctx)
|
||||||
|
_ <- ctx.logger.info(s"Finished deleting ${nDeleted} items")
|
||||||
|
} yield ()
|
||||||
|
}
|
||||||
|
|
||||||
|
private def deleteAll[F[_]: Async](
|
||||||
|
collective: Ident,
|
||||||
|
itemOps: OItem[F],
|
||||||
|
itemSearchOps: OItemSearch[F],
|
||||||
|
ctx: Context[F, _]
|
||||||
|
): F[Int] =
|
||||||
|
Stream
|
||||||
|
.eval(itemSearchOps.findDeleted(collective, pageSize))
|
||||||
|
.evalMap(deleteChunk(collective, itemOps, ctx))
|
||||||
|
.repeat
|
||||||
|
.takeWhile(_ > 0)
|
||||||
|
.compile
|
||||||
|
.foldMonoid
|
||||||
|
|
||||||
|
private def deleteChunk[F[_]: Async](
|
||||||
|
collective: Ident,
|
||||||
|
itemOps: OItem[F],
|
||||||
|
ctx: Context[F, _]
|
||||||
|
)(chunk: Vector[RItem]): F[Int] =
|
||||||
|
if (chunk.isEmpty) {
|
||||||
|
0.pure[F]
|
||||||
|
} else {
|
||||||
|
ctx.logger.info(s"Deleting next ${chunk.size} items …") *>
|
||||||
|
chunk.traverse(i =>
|
||||||
|
ctx.logger.debug(s"Delete item ${i.id.id} / ${i.name} now") *>
|
||||||
|
itemOps.deleteItem(i.id, collective)
|
||||||
|
) *> chunk.size.pure[F]
|
||||||
|
}
|
||||||
|
}
|
@ -9,6 +9,7 @@ package docspell.store.records
|
|||||||
import cats.data.NonEmptyList
|
import cats.data.NonEmptyList
|
||||||
import cats.effect.Sync
|
import cats.effect.Sync
|
||||||
import cats.implicits._
|
import cats.implicits._
|
||||||
|
import fs2.Stream
|
||||||
|
|
||||||
import docspell.common._
|
import docspell.common._
|
||||||
import docspell.store.qb.DSL._
|
import docspell.store.qb.DSL._
|
||||||
@ -388,6 +389,11 @@ object RItem {
|
|||||||
def findById(itemId: Ident): ConnectionIO[Option[RItem]] =
|
def findById(itemId: Ident): ConnectionIO[Option[RItem]] =
|
||||||
run(select(T.all), from(T), T.id === itemId).query[RItem].option
|
run(select(T.all), from(T), T.id === itemId).query[RItem].option
|
||||||
|
|
||||||
|
def findDeleted(collective: Ident, chunkSize: Int): Stream[ConnectionIO, RItem] =
|
||||||
|
run(select(T.all), from(T), T.cid === collective && T.state === ItemState.deleted)
|
||||||
|
.query[RItem]
|
||||||
|
.streamWithChunkSize(chunkSize)
|
||||||
|
|
||||||
def checkByIdAndCollective(itemId: Ident, coll: Ident): ConnectionIO[Option[Ident]] =
|
def checkByIdAndCollective(itemId: Ident, coll: Ident): ConnectionIO[Option[Ident]] =
|
||||||
Select(T.id.s, from(T), T.id === itemId && T.cid === coll).build.query[Ident].option
|
Select(T.id.s, from(T), T.id === itemId && T.cid === coll).build.query[Ident].option
|
||||||
|
|
||||||
|
@ -83,10 +83,10 @@ trait UserTaskStore[F[_]] {
|
|||||||
*
|
*
|
||||||
* Unlike `updateTask`, this ensures that there is at most one task
|
* Unlike `updateTask`, this ensures that there is at most one task
|
||||||
* of some name in the db. Multiple same tasks (task with same
|
* of some name in the db. Multiple same tasks (task with same
|
||||||
* name) may not be allowed to run, dependening on what they do.
|
* name) may not be allowed to run, depending on what they do.
|
||||||
* This is not ensured by the database, though.
|
* This is not ensured by the database, though.
|
||||||
*
|
*
|
||||||
* If there are currently mutliple tasks with same name as `ut` for
|
* If there are currently multiple tasks with same name as `ut` for
|
||||||
* the user `account`, they will all be removed and the given task
|
* the user `account`, they will all be removed and the given task
|
||||||
* inserted!
|
* inserted!
|
||||||
*/
|
*/
|
||||||
|
Loading…
x
Reference in New Issue
Block a user