diff --git a/modules/backend/src/main/scala/docspell/backend/BackendApp.scala b/modules/backend/src/main/scala/docspell/backend/BackendApp.scala index 72ce0138..6ff3c73e 100644 --- a/modules/backend/src/main/scala/docspell/backend/BackendApp.scala +++ b/modules/backend/src/main/scala/docspell/backend/BackendApp.scala @@ -61,7 +61,7 @@ object BackendApp { uploadImpl <- OUpload(store, queue, cfg.files, joexImpl) nodeImpl <- ONode(store) jobImpl <- OJob(store, joexImpl) - itemImpl <- OItem(store, ftsClient) + itemImpl <- OItem(store, ftsClient, queue, joexImpl) itemSearchImpl <- OItemSearch(store) fulltextImpl <- OFulltext(itemSearchImpl, ftsClient, store, queue, joexImpl) javaEmil = diff --git a/modules/backend/src/main/scala/docspell/backend/JobFactory.scala b/modules/backend/src/main/scala/docspell/backend/JobFactory.scala index 396352b4..bc05a188 100644 --- a/modules/backend/src/main/scala/docspell/backend/JobFactory.scala +++ b/modules/backend/src/main/scala/docspell/backend/JobFactory.scala @@ -25,15 +25,16 @@ object JobFactory { now, account.user, prio, - None + collective + .map(c => c / ConvertAllPdfArgs.taskName) + .orElse(ConvertAllPdfArgs.taskName.some) ) } yield job def reprocessItem[F[_]: Sync]( args: ReProcessItemArgs, account: AccountId, - prio: Priority, - tracker: Option[Ident] + prio: Priority ): F[RJob] = for { id <- Ident.randomId[F] @@ -47,7 +48,7 @@ object JobFactory { now, account.user, prio, - tracker + Some(ReProcessItemArgs.taskName / args.itemId) ) } yield job diff --git a/modules/backend/src/main/scala/docspell/backend/ops/OItem.scala b/modules/backend/src/main/scala/docspell/backend/ops/OItem.scala index 4919fdfe..da3efce2 100644 --- a/modules/backend/src/main/scala/docspell/backend/ops/OItem.scala +++ b/modules/backend/src/main/scala/docspell/backend/ops/OItem.scala @@ -4,10 +4,12 @@ import cats.data.OptionT import cats.effect.{Effect, Resource} import cats.implicits._ +import docspell.backend.JobFactory import docspell.common._ import 
docspell.ftsclient.FtsClient import docspell.store.UpdateResult import docspell.store.queries.{QAttachment, QItem} +import docspell.store.queue.JobQueue import docspell.store.records._ import docspell.store.{AddResult, Store} @@ -76,11 +78,38 @@ trait OItem[F[_]] { name: Option[String], collective: Ident ): F[AddResult] + + /** Submits the item for re-processing. The list of attachment ids can + * be used to only re-process a subset of the item's attachments. + * If this list is empty, all attachments are reprocessed. This + * call only submits the job into the queue. + */ + def reprocess( + item: Ident, + attachments: List[Ident], + account: AccountId, + notifyJoex: Boolean + ): F[UpdateResult] + + /** Submits a task that finds all non-converted pdfs and triggers + * converting them using ocrmypdf. Each file is converted by a + * separate task. + */ + def convertAllPdf( + collective: Option[Ident], + account: AccountId, + notifyJoex: Boolean + ): F[UpdateResult] } object OItem { - def apply[F[_]: Effect](store: Store[F], fts: FtsClient[F]): Resource[F, OItem[F]] = + def apply[F[_]: Effect]( + store: Store[F], + fts: FtsClient[F], + queue: JobQueue[F], + joex: OJoex[F] + ): Resource[F, OItem[F]] = for { otag <- OTag(store) oorg <- OOrganization(store) @@ -400,6 +429,35 @@ object OItem { ) ) + def reprocess( + item: Ident, + attachments: List[Ident], + account: AccountId, + notifyJoex: Boolean + ): F[UpdateResult] = + (for { + _ <- OptionT( + store.transact(RItem.findByIdAndCollective(item, account.collective)) + ) + args = ReProcessItemArgs(item, attachments) + job <- OptionT.liftF( + JobFactory.reprocessItem[F](args, account, Priority.Low) + ) + _ <- OptionT.liftF(queue.insertIfNew(job)) + _ <- OptionT.liftF(if (notifyJoex) joex.notifyAllNodes else ().pure[F]) + } yield UpdateResult.success).getOrElse(UpdateResult.notFound) + + def convertAllPdf( + collective: Option[Ident], + account: AccountId, + notifyJoex: Boolean + ): F[UpdateResult] = + for { + job <- 
JobFactory.convertAllPdfs[F](collective, account, Priority.Low) + _ <- queue.insertIfNew(job) + _ <- if (notifyJoex) joex.notifyAllNodes else ().pure[F] + } yield UpdateResult.success + private def onSuccessIgnoreError(update: F[Unit])(ar: AddResult): F[Unit] = ar match { case AddResult.Success => diff --git a/modules/backend/src/main/scala/docspell/backend/ops/OUpload.scala b/modules/backend/src/main/scala/docspell/backend/ops/OUpload.scala index c6edbfb2..a9145f72 100644 --- a/modules/backend/src/main/scala/docspell/backend/ops/OUpload.scala +++ b/modules/backend/src/main/scala/docspell/backend/ops/OUpload.scala @@ -44,24 +44,6 @@ trait OUpload[F[_]] { case Left(srcId) => submit(data, srcId, notifyJoex, itemId) } - - /** Submits the item for re-processing. The list of attachment ids can - * be used to only re-process a subset of the item's attachments. - * If this list is empty, all attachments are reprocessed. This - * call only submits the job into the queue. - */ - def reprocess( - item: Ident, - attachments: List[Ident], - account: AccountId, - notifyJoex: Boolean - ): F[OUpload.UploadResult] - - def convertAllPdf( - collective: Option[Ident], - account: AccountId, - notifyJoex: Boolean - ): F[OUpload.UploadResult] } object OUpload { @@ -177,31 +159,6 @@ object OUpload { result <- OptionT.liftF(submit(updata, accId, notifyJoex, itemId)) } yield result).getOrElse(UploadResult.noSource) - def reprocess( - item: Ident, - attachments: List[Ident], - account: AccountId, - notifyJoex: Boolean - ): F[UploadResult] = - (for { - _ <- - OptionT(store.transact(RItem.findByIdAndCollective(item, account.collective))) - args = ReProcessItemArgs(item, attachments) - job <- - OptionT.liftF(JobFactory.reprocessItem[F](args, account, Priority.Low, None)) - res <- OptionT.liftF(submitJobs(notifyJoex)(Vector(job))) - } yield res).getOrElse(UploadResult.noItem) - - def convertAllPdf( - collective: Option[Ident], - account: AccountId, - notifyJoex: Boolean - ): 
F[OUpload.UploadResult] = - for { - job <- JobFactory.convertAllPdfs(collective, account, Priority.Low) - res <- submitJobs(notifyJoex)(Vector(job)) - } yield res - private def submitJobs( notifyJoex: Boolean )(jobs: Vector[RJob]): F[OUpload.UploadResult] = diff --git a/modules/common/src/main/scala/docspell/common/ConvertAllPdfArgs.scala b/modules/common/src/main/scala/docspell/common/ConvertAllPdfArgs.scala index d4ae5ba7..eb2978d7 100644 --- a/modules/common/src/main/scala/docspell/common/ConvertAllPdfArgs.scala +++ b/modules/common/src/main/scala/docspell/common/ConvertAllPdfArgs.scala @@ -3,12 +3,24 @@ package docspell.common import io.circe._ import io.circe.generic.semiauto._ +/** Arguments for the task that finds all pdf files that have not been + * converted and submits a job for each that will convert the file + * using ocrmypdf. + * + * If the `collective` argument is present, then this task and the + * ones that are submitted by this task run in the realm of the + * collective (and only their files are considered). If it is empty, + * it is a system task and all files are considered. 
+ */ case class ConvertAllPdfArgs(collective: Option[Ident]) object ConvertAllPdfArgs { + val taskName = Ident.unsafe("submit-pdf-migration-tasks") + implicit val jsonDecoder: Decoder[ConvertAllPdfArgs] = deriveDecoder[ConvertAllPdfArgs] + implicit val jsonEncoder: Encoder[ConvertAllPdfArgs] = deriveEncoder[ConvertAllPdfArgs] } diff --git a/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala b/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala index f07e089e..bc415446 100644 --- a/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala +++ b/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala @@ -87,7 +87,7 @@ object JoexAppImpl { joex <- OJoex(client, store) upload <- OUpload(store, queue, cfg.files, joex) fts <- createFtsClient(cfg)(httpClient) - itemOps <- OItem(store, fts) + itemOps <- OItem(store, fts, queue, joex) javaEmil = JavaMailEmil(blocker, Settings.defaultSettings.copy(debug = cfg.mailDebug)) sch <- SchedulerBuilder(cfg.scheduler, blocker, store) diff --git a/modules/joex/src/main/scala/docspell/joex/pdfconv/ConvertAllPdfTask.scala b/modules/joex/src/main/scala/docspell/joex/pdfconv/ConvertAllPdfTask.scala index c40d0783..019894fa 100644 --- a/modules/joex/src/main/scala/docspell/joex/pdfconv/ConvertAllPdfTask.scala +++ b/modules/joex/src/main/scala/docspell/joex/pdfconv/ConvertAllPdfTask.scala @@ -11,15 +11,19 @@ import docspell.store.queue.JobQueue import docspell.store.records.RAttachment import docspell.store.records._ +/* A task to find all non-converted pdf files (of a collective, or + * all) and convert them using ocrmypdf by submitting a job for + * each found file. 
+ */ object ConvertAllPdfTask { type Args = ConvertAllPdfArgs def apply[F[_]: Sync](queue: JobQueue[F], joex: OJoex[F]): Task[F, Args, Unit] = Task { ctx => for { - _ <- ctx.logger.info("Converting older pdfs using ocrmypdf") + _ <- ctx.logger.info("Converting pdfs using ocrmypdf") n <- submitConversionJobs(ctx, queue) - _ <- ctx.logger.info(s"Submitted $n jobs for file conversion") + _ <- ctx.logger.info(s"Submitted $n file conversion jobs") _ <- joex.notifyAllNodes } yield () } @@ -36,7 +40,7 @@ object ConvertAllPdfTask { .chunks .flatMap(createJobs[F](ctx)) .chunks - .evalMap(jobs => queue.insertAll(jobs.toVector).map(_ => jobs.size)) + .evalMap(jobs => queue.insertAllIfNew(jobs.toVector).map(_ => jobs.size)) .evalTap(n => ctx.logger.debug(s"Submitted $n jobs …")) .compile .foldMonoid @@ -59,7 +63,7 @@ object ConvertAllPdfTask { now, collectiveOrSystem, Priority.Low, - Some(ra.id) + Some(PdfConvTask.taskName / ra.id) ) val jobs = ras.traverse(mkJob) diff --git a/modules/joex/src/main/scala/docspell/joex/process/ProcessItem.scala b/modules/joex/src/main/scala/docspell/joex/process/ProcessItem.scala index 72eefa39..9b4d050f 100644 --- a/modules/joex/src/main/scala/docspell/joex/process/ProcessItem.scala +++ b/modules/joex/src/main/scala/docspell/joex/process/ProcessItem.scala @@ -17,12 +17,7 @@ object ProcessItem { )(item: ItemData): Task[F, ProcessItemArgs, ItemData] = ExtractArchive(item) .flatMap(Task.setProgress(20)) - .flatMap(ConvertPdf(cfg.convert, _)) - .flatMap(Task.setProgress(40)) - .flatMap(TextExtraction(cfg.extraction, fts)) - .flatMap(Task.setProgress(60)) - .flatMap(analysisOnly[F](cfg)) - .flatMap(Task.setProgress(80)) + .flatMap(processAttachments0(cfg, fts, (40, 60, 80))) .flatMap(LinkProposal[F]) .flatMap(SetGivenData[F](itemOps)) .flatMap(Task.setProgress(99)) @@ -31,12 +26,7 @@ object ProcessItem { cfg: Config, fts: FtsClient[F] )(item: ItemData): Task[F, ProcessItemArgs, ItemData] = - ConvertPdf(cfg.convert, item) - 
.flatMap(Task.setProgress(30)) - .flatMap(TextExtraction(cfg.extraction, fts)) - .flatMap(Task.setProgress(60)) - .flatMap(analysisOnly[F](cfg)) - .flatMap(Task.setProgress(90)) + processAttachments0[F](cfg, fts, (30, 60, 90))(item) def analysisOnly[F[_]: Sync]( cfg: Config @@ -45,4 +35,16 @@ object ProcessItem { .flatMap(FindProposal[F](cfg.processing)) .flatMap(EvalProposals[F]) .flatMap(SaveProposals[F]) + + private def processAttachments0[F[_]: ConcurrentEffect: ContextShift]( + cfg: Config, + fts: FtsClient[F], + progress: (Int, Int, Int) + )(item: ItemData): Task[F, ProcessItemArgs, ItemData] = + ConvertPdf(cfg.convert, item) + .flatMap(Task.setProgress(progress._1)) + .flatMap(TextExtraction(cfg.extraction, fts)) + .flatMap(Task.setProgress(progress._2)) + .flatMap(analysisOnly[F](cfg)) + .flatMap(Task.setProgress(progress._3)) } diff --git a/modules/restapi/src/main/resources/docspell-openapi.yml b/modules/restapi/src/main/resources/docspell-openapi.yml index c8831ed4..94f84dd0 100644 --- a/modules/restapi/src/main/resources/docspell-openapi.yml +++ b/modules/restapi/src/main/resources/docspell-openapi.yml @@ -1213,6 +1213,33 @@ paths: schema: $ref: "#/components/schemas/BasicResult" + /sec/item/convertallpdfs: + post: + tags: [ Item ] + summary: Convert all non-converted pdfs. + description: | + Submits a job that will find all pdf files that have not been + converted and converts them using the ocrmypdf tool (if + enabled). This tool has been added in version 0.9.0 and so + older files can be "migrated" this way, or maybe after + enabling the tool. + + The task finds all files of the current collective and submits + task for each file to convert. These tasks are submitted with + a low priority so that normal processing can still proceed. + + The body of the request should be empty. 
+ security: + - authTokenHeader: [] + responses: + 200: + description: Ok + content: + application/json: + schema: + $ref: "#/components/schemas/BasicResult" + + /sec/item/search: post: tags: [ Item ] @@ -1811,7 +1838,7 @@ paths: content: application/json: schema: - $ref: "#/components/schemas/StringList" + $ref: "#/components/schemas/IdList" responses: 200: description: Ok @@ -2629,6 +2656,17 @@ paths: components: schemas: + IdList: + description: + A list of identifiers. + required: + - ids + properties: + ids: + type: array + items: + type: string + format: ident StringList: description: | A simple list of strings. diff --git a/modules/restserver/src/main/scala/docspell/restserver/routes/ItemRoutes.scala b/modules/restserver/src/main/scala/docspell/restserver/routes/ItemRoutes.scala index 49363696..a033791d 100644 --- a/modules/restserver/src/main/scala/docspell/restserver/routes/ItemRoutes.scala +++ b/modules/restserver/src/main/scala/docspell/restserver/routes/ItemRoutes.scala @@ -34,8 +34,8 @@ object ItemRoutes { case POST -> Root / "convertallpdfs" => for { res <- - backend.upload.convertAllPdf(user.account.collective.some, user.account, true) - resp <- Ok(Conversions.basicResult(res)) + backend.item.convertAllPdf(user.account.collective.some, user.account, true) + resp <- Ok(Conversions.basicResult(res, "Task submitted")) } yield resp case req @ POST -> Root / "search" => @@ -288,11 +288,11 @@ object ItemRoutes { case req @ POST -> Root / Ident(id) / "reprocess" => for { - data <- req.as[StringList] - ids = data.items.flatMap(s => Ident.fromString(s).toOption) + data <- req.as[IdList] + ids = data.ids.flatMap(s => Ident.fromString(s).toOption) _ <- logger.fdebug(s"Re-process item ${id.id}") - res <- backend.upload.reprocess(id, ids, user.account, true) - resp <- Ok(Conversions.basicResult(res)) + res <- backend.item.reprocess(id, ids, user.account, true) + resp <- Ok(Conversions.basicResult(res, "Re-process task submitted.")) } yield resp case DELETE -> 
Root / Ident(id) => diff --git a/modules/store/src/main/scala/docspell/store/queue/JobQueue.scala b/modules/store/src/main/scala/docspell/store/queue/JobQueue.scala index f7d15ed5..127a45e1 100644 --- a/modules/store/src/main/scala/docspell/store/queue/JobQueue.scala +++ b/modules/store/src/main/scala/docspell/store/queue/JobQueue.scala @@ -28,6 +28,8 @@ trait JobQueue[F[_]] { def insertAll(jobs: Seq[RJob]): F[Unit] + def insertAllIfNew(jobs: Seq[RJob]): F[Unit] + def nextJob( prio: Ident => F[Priority], worker: Ident, @@ -81,5 +83,13 @@ object JobQueue { logger.error(ex)("Could not insert job. Skipping it.") }) + def insertAllIfNew(jobs: Seq[RJob]): F[Unit] = + jobs.toList + .traverse(j => insertIfNew(j).attempt) + .map(_.foreach { + case Right(()) => + case Left(ex) => + logger.error(ex)("Could not insert job. Skipping it.") + }) }) }