diff --git a/modules/backend/src/main/scala/docspell/backend/BackendApp.scala b/modules/backend/src/main/scala/docspell/backend/BackendApp.scala index 72ce0138..6ff3c73e 100644 --- a/modules/backend/src/main/scala/docspell/backend/BackendApp.scala +++ b/modules/backend/src/main/scala/docspell/backend/BackendApp.scala @@ -61,7 +61,7 @@ object BackendApp { uploadImpl <- OUpload(store, queue, cfg.files, joexImpl) nodeImpl <- ONode(store) jobImpl <- OJob(store, joexImpl) - itemImpl <- OItem(store, ftsClient) + itemImpl <- OItem(store, ftsClient, queue, joexImpl) itemSearchImpl <- OItemSearch(store) fulltextImpl <- OFulltext(itemSearchImpl, ftsClient, store, queue, joexImpl) javaEmil = diff --git a/modules/backend/src/main/scala/docspell/backend/JobFactory.scala b/modules/backend/src/main/scala/docspell/backend/JobFactory.scala index d7d8fe91..bc05a188 100644 --- a/modules/backend/src/main/scala/docspell/backend/JobFactory.scala +++ b/modules/backend/src/main/scala/docspell/backend/JobFactory.scala @@ -8,6 +8,50 @@ import docspell.store.records.RJob object JobFactory { + def convertAllPdfs[F[_]: Sync]( + collective: Option[Ident], + account: AccountId, + prio: Priority + ): F[RJob] = + for { + id <- Ident.randomId[F] + now <- Timestamp.current[F] + job = RJob.newJob( + id, + ConvertAllPdfArgs.taskName, + account.collective, + ConvertAllPdfArgs(collective), + s"Convert all pdfs not yet converted", + now, + account.user, + prio, + collective + .map(c => c / ConvertAllPdfArgs.taskName) + .orElse(ConvertAllPdfArgs.taskName.some) + ) + } yield job + + def reprocessItem[F[_]: Sync]( + args: ReProcessItemArgs, + account: AccountId, + prio: Priority + ): F[RJob] = + for { + id <- Ident.randomId[F] + now <- Timestamp.current[F] + job = RJob.newJob( + id, + ReProcessItemArgs.taskName, + account.collective, + args, + s"Re-process files of item ${args.itemId.id}", + now, + account.user, + prio, + Some(ReProcessItemArgs.taskName / args.itemId) + ) + } yield job + def processItem[F[_]: Sync]( args: ProcessItemArgs, account: AccountId, diff --git a/modules/backend/src/main/scala/docspell/backend/ops/OItem.scala b/modules/backend/src/main/scala/docspell/backend/ops/OItem.scala index 4919fdfe..da3efce2 100644 --- a/modules/backend/src/main/scala/docspell/backend/ops/OItem.scala +++ b/modules/backend/src/main/scala/docspell/backend/ops/OItem.scala @@ -4,10 +4,12 @@ import cats.data.OptionT import cats.effect.{Effect, Resource} import cats.implicits._ +import docspell.backend.JobFactory import docspell.common._ import docspell.ftsclient.FtsClient import docspell.store.UpdateResult import docspell.store.queries.{QAttachment, QItem} +import docspell.store.queue.JobQueue import docspell.store.records._ import docspell.store.{AddResult, Store} @@ -76,11 +78,38 @@ trait OItem[F[_]] { name: Option[String], collective: Ident ): F[AddResult] + + /** Submits the item for re-processing. The list of attachment ids can + * be used to only re-process a subset of the item's attachments. + * If this list is empty, all attachments are reprocessed. This + * call only submits the job into the queue. + */ + def reprocess( + item: Ident, + attachments: List[Ident], + account: AccountId, + notifyJoex: Boolean + ): F[UpdateResult] + + /** Submits a task that finds all non-converted pdfs and triggers + * converting them using ocrmypdf. Each file is converted by a + * separate task. + */ + def convertAllPdf( + collective: Option[Ident], + account: AccountId, + notifyJoex: Boolean + ): F[UpdateResult] } object OItem { - def apply[F[_]: Effect](store: Store[F], fts: FtsClient[F]): Resource[F, OItem[F]] = + def apply[F[_]: Effect]( + store: Store[F], + fts: FtsClient[F], + queue: JobQueue[F], + joex: OJoex[F] + ): Resource[F, OItem[F]] = for { otag <- OTag(store) oorg <- OOrganization(store) @@ -400,6 +429,35 @@ object OItem { ) ) + def reprocess( + item: Ident, + attachments: List[Ident], + account: AccountId, + notifyJoex: Boolean + ): F[UpdateResult] = + (for { + _ <- OptionT( + store.transact(RItem.findByIdAndCollective(item, account.collective)) + ) + args = ReProcessItemArgs(item, attachments) + job <- OptionT.liftF( + JobFactory.reprocessItem[F](args, account, Priority.Low) + ) + _ <- OptionT.liftF(queue.insertIfNew(job)) + _ <- OptionT.liftF(if (notifyJoex) joex.notifyAllNodes else ().pure[F]) + } yield UpdateResult.success).getOrElse(UpdateResult.notFound) + + def convertAllPdf( + collective: Option[Ident], + account: AccountId, + notifyJoex: Boolean + ): F[UpdateResult] = + for { + job <- JobFactory.convertAllPdfs[F](collective, account, Priority.Low) + _ <- queue.insertIfNew(job) + _ <- if (notifyJoex) joex.notifyAllNodes else ().pure[F] + } yield UpdateResult.success + private def onSuccessIgnoreError(update: F[Unit])(ar: AddResult): F[Unit] = ar match { case AddResult.Success => diff --git a/modules/backend/src/main/scala/docspell/backend/ops/OJob.scala b/modules/backend/src/main/scala/docspell/backend/ops/OJob.scala index 9a05337c..ade2fda0 100644 --- a/modules/backend/src/main/scala/docspell/backend/ops/OJob.scala +++ b/modules/backend/src/main/scala/docspell/backend/ops/OJob.scala @@ -48,7 +48,9 @@ object OJob { def queueState(collective: Ident, maxResults: Int): F[CollectiveQueueState] = store - .transact(QJob.queueStateSnapshot(collective).take(maxResults.toLong)) + .transact( + QJob.queueStateSnapshot(collective, maxResults.toLong) + ) .map(t => JobDetail(t._1, t._2)) .compile .toVector diff --git a/modules/common/src/main/scala/docspell/common/ConvertAllPdfArgs.scala b/modules/common/src/main/scala/docspell/common/ConvertAllPdfArgs.scala new file mode 100644 index 00000000..eb2978d7 --- /dev/null +++ b/modules/common/src/main/scala/docspell/common/ConvertAllPdfArgs.scala @@ -0,0 +1,26 @@ +package docspell.common + +import io.circe._ +import io.circe.generic.semiauto._ + +/** Arguments for the task that finds all pdf files that have not been + * converted and submits for each a job that will convert the file + * using ocrmypdf. + * + * If the `collective` argument is present, then this task and the + * ones that are submitted by this task run in the realm of the + * collective (and only their files are considered). If it is empty, + * it is a system task and all files are considered. + */ +case class ConvertAllPdfArgs(collective: Option[Ident]) + +object ConvertAllPdfArgs { + + val taskName = Ident.unsafe("submit-pdf-migration-tasks") + + implicit val jsonDecoder: Decoder[ConvertAllPdfArgs] = + deriveDecoder[ConvertAllPdfArgs] + + implicit val jsonEncoder: Encoder[ConvertAllPdfArgs] = + deriveEncoder[ConvertAllPdfArgs] +} diff --git a/modules/common/src/main/scala/docspell/common/ReProcessItemArgs.scala b/modules/common/src/main/scala/docspell/common/ReProcessItemArgs.scala new file mode 100644 index 00000000..f8afdd58 --- /dev/null +++ b/modules/common/src/main/scala/docspell/common/ReProcessItemArgs.scala @@ -0,0 +1,24 @@ +package docspell.common + +import io.circe.generic.semiauto._ +import io.circe.{Decoder, Encoder} + +/** Arguments when re-processing an item. + * + * The `itemId` must exist and point to some item. If the attachment + * list is non-empty, only those attachments are re-processed. They + * must belong to the given item. If the list is empty, then all + * attachments are re-processed. + */ +case class ReProcessItemArgs(itemId: Ident, attachments: List[Ident]) + +object ReProcessItemArgs { + + val taskName: Ident = Ident.unsafe("re-process-item") + + implicit val jsonEncoder: Encoder[ReProcessItemArgs] = + deriveEncoder[ReProcessItemArgs] + + implicit val jsonDecoder: Decoder[ReProcessItemArgs] = + deriveDecoder[ReProcessItemArgs] +} diff --git a/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala b/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala index 965659b7..bc415446 100644 --- a/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala +++ b/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala @@ -13,7 +13,10 @@ import docspell.ftssolr.SolrFtsClient import docspell.joex.fts.{MigrationTask, ReIndexTask} import docspell.joex.hk._ import docspell.joex.notify._ +import docspell.joex.pdfconv.ConvertAllPdfTask +import docspell.joex.pdfconv.PdfConvTask import docspell.joex.process.ItemHandler +import docspell.joex.process.ReProcessItem import docspell.joex.scanmailbox._ import docspell.joex.scheduler._ import docspell.joexapi.client.JoexClient @@ -84,7 +87,7 @@ object JoexAppImpl { joex <- OJoex(client, store) upload <- OUpload(store, queue, cfg.files, joex) fts <- createFtsClient(cfg)(httpClient) - itemOps <- OItem(store, fts) + itemOps <- OItem(store, fts, queue, joex) javaEmil = JavaMailEmil(blocker, Settings.defaultSettings.copy(debug = cfg.mailDebug)) sch <- SchedulerBuilder(cfg.scheduler, blocker, store) @@ -96,6 +99,13 @@ object JoexAppImpl { ItemHandler.onCancel[F] ) ) + .withTask( + JobTask.json( + ReProcessItemArgs.taskName, + ReProcessItem[F](cfg, fts), + ReProcessItem.onCancel[F] + ) + ) .withTask( JobTask.json( NotifyDueItemsArgs.taskName, @@ -131,6 +141,20 @@ object JoexAppImpl { HouseKeepingTask.onCancel[F] ) ) + .withTask( + JobTask.json( + PdfConvTask.taskName, + PdfConvTask[F](cfg), + PdfConvTask.onCancel[F] + ) + ) + .withTask( + JobTask.json( + ConvertAllPdfArgs.taskName, + ConvertAllPdfTask[F](queue, joex), + ConvertAllPdfTask.onCancel[F] + ) + ) .resource psch <- PeriodicScheduler.create( cfg.periodicScheduler, diff --git a/modules/joex/src/main/scala/docspell/joex/pdfconv/ConvertAllPdfTask.scala b/modules/joex/src/main/scala/docspell/joex/pdfconv/ConvertAllPdfTask.scala new file mode 100644 index 00000000..019894fa --- /dev/null +++ b/modules/joex/src/main/scala/docspell/joex/pdfconv/ConvertAllPdfTask.scala @@ -0,0 +1,72 @@ +package docspell.joex.pdfconv + +import cats.effect._ +import cats.implicits._ +import fs2.{Chunk, Stream} + +import docspell.backend.ops.OJoex +import docspell.common._ +import docspell.joex.scheduler.{Context, Task} +import docspell.store.queue.JobQueue +import docspell.store.records.RAttachment +import docspell.store.records._ + +/* A task to find all non-converted pdf files (of a collective, or + * all) and converting them using ocrmypdf by submitting a job for + * each found file. + */ +object ConvertAllPdfTask { + type Args = ConvertAllPdfArgs + + def apply[F[_]: Sync](queue: JobQueue[F], joex: OJoex[F]): Task[F, Args, Unit] = + Task { ctx => + for { + _ <- ctx.logger.info("Converting pdfs using ocrmypdf") + n <- submitConversionJobs(ctx, queue) + _ <- ctx.logger.info(s"Submitted $n file conversion jobs") + _ <- joex.notifyAllNodes + } yield () + } + + def onCancel[F[_]: Sync]: Task[F, Args, Unit] = + Task.log(_.warn("Cancelling convert-old-pdf task")) + + def submitConversionJobs[F[_]: Sync]( + ctx: Context[F, Args], + queue: JobQueue[F] + ): F[Int] = + ctx.store + .transact(RAttachment.findNonConvertedPdf(ctx.args.collective, 50)) + .chunks + .flatMap(createJobs[F](ctx)) + .chunks + .evalMap(jobs => queue.insertAllIfNew(jobs.toVector).map(_ => jobs.size)) + .evalTap(n => ctx.logger.debug(s"Submitted $n jobs …")) + .compile + .foldMonoid + + private def createJobs[F[_]: Sync]( + ctx: Context[F, Args] + )(ras: Chunk[RAttachment]): Stream[F, RJob] = { + val collectiveOrSystem = ctx.args.collective.getOrElse(DocspellSystem.taskGroup) + + def mkJob(ra: RAttachment): F[RJob] = + for { + id <- Ident.randomId[F] + now <- Timestamp.current[F] + } yield RJob.newJob( + id, + PdfConvTask.taskName, + collectiveOrSystem, + PdfConvTask.Args(ra.id), + s"Convert pdf ${ra.id.id}/${ra.name.getOrElse("-")}", + now, + collectiveOrSystem, + Priority.Low, + Some(PdfConvTask.taskName / ra.id) + ) + + val jobs = ras.traverse(mkJob) + Stream.evalUnChunk(jobs) + } +} diff --git a/modules/joex/src/main/scala/docspell/joex/pdfconv/PdfConvTask.scala b/modules/joex/src/main/scala/docspell/joex/pdfconv/PdfConvTask.scala new file mode 100644 index 00000000..07cc7c36 --- /dev/null +++ b/modules/joex/src/main/scala/docspell/joex/pdfconv/PdfConvTask.scala @@ -0,0 +1,155 @@ +package docspell.joex.pdfconv + +import cats.data.Kleisli +import cats.data.OptionT +import cats.effect._ +import cats.implicits._ +import fs2.Stream + +import docspell.common._ +import docspell.convert.ConversionResult +import docspell.convert.extern.OcrMyPdf +import docspell.joex.Config +import docspell.joex.scheduler.{Context, Task} +import docspell.store.records._ + +import bitpeace.FileMeta +import bitpeace.Mimetype +import bitpeace.MimetypeHint +import bitpeace.RangeDef +import io.circe.generic.semiauto._ +import io.circe.{Decoder, Encoder} + +/** Converts the given attachment file using ocrmypdf if it is a pdf + * and has not already been converted (the source file is the same as + * in the attachment). + */ +object PdfConvTask { + case class Args(attachId: Ident) + object Args { + implicit val jsonDecoder: Decoder[Args] = + deriveDecoder[Args] + implicit val jsonEncoder: Encoder[Args] = + deriveEncoder[Args] + } + + val taskName = Ident.unsafe("pdf-files-migration") + + def apply[F[_]: Sync: ContextShift](cfg: Config): Task[F, Args, Unit] = + Task { ctx => + for { + _ <- ctx.logger.info(s"Converting pdf file ${ctx.args} using ocrmypdf") + meta <- checkInputs(cfg, ctx) + _ <- meta.traverse(fm => convert(cfg, ctx, fm)) + } yield () + } + + def onCancel[F[_]: Sync]: Task[F, Args, Unit] = + Task.log(_.warn("Cancelling pdfconv task")) + + // --- Helper + + // check if file exists and if it is pdf and if source id is the same and if ocrmypdf is enabled + def checkInputs[F[_]: Sync](cfg: Config, ctx: Context[F, Args]): F[Option[FileMeta]] = { + val none: Option[FileMeta] = None + val checkSameFiles = + (for { + ra <- OptionT(ctx.store.transact(RAttachment.findById(ctx.args.attachId))) + isSame <- OptionT.liftF( + ctx.store.transact(RAttachmentSource.isSameFile(ra.id, ra.fileId)) + ) + } yield isSame).getOrElse(false) + val existsPdf = + for { + meta <- ctx.store.transact(RAttachment.findMeta(ctx.args.attachId)) + res = meta.filter(_.mimetype.matches(Mimetype.`application/pdf`)) + _ <- + if (res.isEmpty) + ctx.logger.info( + s"The attachment ${ctx.args.attachId} doesn't exist or is no pdf: $meta" + ) + else ().pure[F] + } yield res + + if (cfg.convert.ocrmypdf.enabled) + checkSameFiles.flatMap { + case true => existsPdf + case false => + ctx.logger.info( + s"The attachment ${ctx.args.attachId} already has been converted. Skipping." + ) *> + none.pure[F] + } + else none.pure[F] + } + + def convert[F[_]: Sync: ContextShift]( + cfg: Config, + ctx: Context[F, Args], + in: FileMeta + ): F[Unit] = { + val bp = ctx.store.bitpeace + val data = Stream + .emit(in) + .through(bp.fetchData2(RangeDef.all)) + + val storeResult: ConversionResult.Handler[F, Unit] = + Kleisli({ + case ConversionResult.SuccessPdf(file) => + storeToAttachment(ctx, in, file) + + case ConversionResult.SuccessPdfTxt(file, _) => + storeToAttachment(ctx, in, file) + + case ConversionResult.UnsupportedFormat(mime) => + ctx.logger.warn( + s"Unable to convert '${mime}' file ${ctx.args}: unsupported format." + ) + + case ConversionResult.InputMalformed(mime, reason) => + ctx.logger.warn(s"Unable to convert '${mime}' file ${ctx.args}: $reason") + + case ConversionResult.Failure(ex) => + ctx.logger.error(ex)(s"Failure converting file ${ctx.args}: ${ex.getMessage}") + }) + + def ocrMyPdf(lang: Language): F[Unit] = + OcrMyPdf.toPDF[F, Unit]( + cfg.convert.ocrmypdf, + lang, + in.chunksize, + ctx.blocker, + ctx.logger + )(data, storeResult) + + for { + lang <- getLanguage(ctx) + _ <- ocrMyPdf(lang) + } yield () + } + + def getLanguage[F[_]: Sync](ctx: Context[F, Args]): F[Language] = + (for { + coll <- OptionT(ctx.store.transact(RCollective.findByAttachment(ctx.args.attachId))) + lang = coll.language + } yield lang).getOrElse(Language.German) + + def storeToAttachment[F[_]: Sync]( + ctx: Context[F, Args], + meta: FileMeta, + newFile: Stream[F, Byte] + ): F[Unit] = { + val mimeHint = MimetypeHint.advertised(meta.mimetype.asString) + for { + time <- Timestamp.current[F] + fid <- Ident.randomId[F] + _ <- + ctx.store.bitpeace + .saveNew(newFile, meta.chunksize, mimeHint, Some(fid.id), time.value) + .compile + .lastOrError + _ <- ctx.store.transact(RAttachment.updateFileId(ctx.args.attachId, fid)) + } yield () + } + +} diff --git a/modules/joex/src/main/scala/docspell/joex/process/ConvertPdf.scala b/modules/joex/src/main/scala/docspell/joex/process/ConvertPdf.scala index ba75ec3a..572a18bb 100644 --- a/modules/joex/src/main/scala/docspell/joex/process/ConvertPdf.scala +++ b/modules/joex/src/main/scala/docspell/joex/process/ConvertPdf.scala @@ -126,11 +126,46 @@ object ConvertPdf { .compile .lastOrError .map(fm => Ident.unsafe(fm.id)) - .flatMap(fmId => - ctx.store - .transact(RAttachment.updateFileIdAndName(ra.id, fmId, newName)) - .map(_ => fmId) - ) + .flatMap(fmId => updateAttachment[F](ctx, ra, fmId, newName).map(_ => fmId)) .map(fmId => ra.copy(fileId = fmId, name = newName)) } + + private def updateAttachment[F[_]: Sync]( + ctx: Context[F, _], + ra: RAttachment, + fmId: Ident, + newName: Option[String] + ): F[Unit] = + for { + oldFile <- ctx.store.transact(RAttachment.findById(ra.id)) + _ <- + ctx.store + .transact(RAttachment.updateFileIdAndName(ra.id, fmId, newName)) + _ <- oldFile match { + case Some(raPrev) => + for { + sameFile <- + ctx.store + .transact(RAttachmentSource.isSameFile(ra.id, raPrev.fileId)) + _ <- + if (sameFile) ().pure[F] + else + ctx.logger.info("Deleting previous attachment file") *> + ctx.store.bitpeace + .delete(raPrev.fileId.id) + .compile + .drain + .attempt + .flatMap { + case Right(_) => ().pure[F] + case Left(ex) => + ctx.logger + .error(ex)(s"Cannot delete previous attachment file: ${raPrev}") + + } + } yield () + case None => + ().pure[F] + } + } yield () } diff --git a/modules/joex/src/main/scala/docspell/joex/process/ProcessItem.scala b/modules/joex/src/main/scala/docspell/joex/process/ProcessItem.scala index 139ec8f6..9b4d050f 100644 --- a/modules/joex/src/main/scala/docspell/joex/process/ProcessItem.scala +++ b/modules/joex/src/main/scala/docspell/joex/process/ProcessItem.scala @@ -17,16 +17,17 @@ object ProcessItem { )(item: ItemData): Task[F, ProcessItemArgs, ItemData] = ExtractArchive(item) .flatMap(Task.setProgress(20)) - .flatMap(ConvertPdf(cfg.convert, _)) - .flatMap(Task.setProgress(40)) - .flatMap(TextExtraction(cfg.extraction, fts)) - .flatMap(Task.setProgress(60)) - .flatMap(analysisOnly[F](cfg)) - .flatMap(Task.setProgress(80)) + .flatMap(processAttachments0(cfg, fts, (40, 60, 80))) .flatMap(LinkProposal[F]) .flatMap(SetGivenData[F](itemOps)) .flatMap(Task.setProgress(99)) + def processAttachments[F[_]: ConcurrentEffect: ContextShift]( + cfg: Config, + fts: FtsClient[F] + )(item: ItemData): Task[F, ProcessItemArgs, ItemData] = + processAttachments0[F](cfg, fts, (30, 60, 90))(item) + def analysisOnly[F[_]: Sync]( cfg: Config )(item: ItemData): Task[F, ProcessItemArgs, ItemData] = @@ -34,4 +35,16 @@ object ProcessItem { .flatMap(FindProposal[F](cfg.processing)) .flatMap(EvalProposals[F]) .flatMap(SaveProposals[F]) + + private def processAttachments0[F[_]: ConcurrentEffect: ContextShift]( + cfg: Config, + fts: FtsClient[F], + progress: (Int, Int, Int) + )(item: ItemData): Task[F, ProcessItemArgs, ItemData] = + ConvertPdf(cfg.convert, item) + .flatMap(Task.setProgress(progress._1)) + .flatMap(TextExtraction(cfg.extraction, fts)) + .flatMap(Task.setProgress(progress._2)) + .flatMap(analysisOnly[F](cfg)) + .flatMap(Task.setProgress(progress._3)) } diff --git a/modules/joex/src/main/scala/docspell/joex/process/ReProcessItem.scala b/modules/joex/src/main/scala/docspell/joex/process/ReProcessItem.scala new file mode 100644 index 00000000..8f5e11f2 --- /dev/null +++ b/modules/joex/src/main/scala/docspell/joex/process/ReProcessItem.scala @@ -0,0 +1,131 @@ +package docspell.joex.process + +import cats.data.OptionT +import cats.effect._ +import cats.implicits._ + +import docspell.common._ +import docspell.ftsclient.FtsClient +import docspell.joex.Config +import docspell.joex.scheduler.Context +import docspell.joex.scheduler.Task +import docspell.store.records.RAttachment +import docspell.store.records.RAttachmentSource +import docspell.store.records.RCollective +import docspell.store.records.RItem + +object ReProcessItem { + type Args = ReProcessItemArgs + + def apply[F[_]: ConcurrentEffect: ContextShift]( + cfg: Config, + fts: FtsClient[F] + ): Task[F, Args, Unit] = + loadItem[F] + .flatMap(safeProcess[F](cfg, fts)) + .map(_ => ()) + + def onCancel[F[_]: Sync: ContextShift]: Task[F, Args, Unit] = + logWarn("Now cancelling re-processing.") + + // --- Helpers + + private def contains[F[_]](ctx: Context[F, Args]): RAttachment => Boolean = { + val selection = ctx.args.attachments.toSet + if (selection.isEmpty) (_ => true) + else ra => selection.contains(ra.id) + } + + def loadItem[F[_]: Sync]: Task[F, Args, ItemData] = + Task { ctx => + (for { + item <- OptionT(ctx.store.transact(RItem.findById(ctx.args.itemId))) + attach <- OptionT.liftF(ctx.store.transact(RAttachment.findByItem(item.id))) + asrc <- + OptionT.liftF(ctx.store.transact(RAttachmentSource.findByItem(ctx.args.itemId))) + asrcMap = asrc.map(s => s.id -> s).toMap + // copy the original files over to attachments to run the default processing task + // the processing doesn't touch the original files, only RAttachments + attachSrc = + attach + .filter(contains(ctx)) + .flatMap(a => + asrcMap.get(a.id).map { src => + a.copy(fileId = src.fileId, name = src.name) + } + ) + } yield ItemData( + item, + attachSrc, + Vector.empty, + Vector.empty, + asrcMap.view.mapValues(_.fileId).toMap, + MetaProposalList.empty, + Nil + )).getOrElseF( + Sync[F].raiseError(new Exception(s"Item not found: ${ctx.args.itemId.id}")) + ) + } + + def processFiles[F[_]: ConcurrentEffect: ContextShift]( + cfg: Config, + fts: FtsClient[F], + data: ItemData + ): Task[F, Args, ItemData] = { + + val convertArgs: Language => Args => F[ProcessItemArgs] = + lang => + args => + ProcessItemArgs( + ProcessItemArgs.ProcessMeta( + data.item.cid, + args.itemId.some, + lang, + None, //direction + "", //source-id + None, //folder + Seq.empty + ), + Nil + ).pure[F] + + getLanguage[F].flatMap { lang => + ProcessItem + .processAttachments[F](cfg, fts)(data) + .contramap[Args](convertArgs(lang)) + } + } + + def getLanguage[F[_]: Sync]: Task[F, Args, Language] = + Task { ctx => + (for { + coll <- OptionT(ctx.store.transact(RCollective.findByItem(ctx.args.itemId))) + lang = coll.language + } yield lang).getOrElse(Language.German) + } + + def isLastRetry[F[_]: Sync]: Task[F, Args, Boolean] = + Task(_.isLastRetry) + + def safeProcess[F[_]: ConcurrentEffect: ContextShift]( + cfg: Config, + fts: FtsClient[F] + )(data: ItemData): Task[F, Args, ItemData] = + isLastRetry[F].flatMap { + case true => + processFiles[F](cfg, fts, data).attempt + .flatMap({ + case Right(d) => + Task.pure(d) + case Left(ex) => + logWarn[F]( + "Processing failed on last retry." + ).andThen(_ => Sync[F].raiseError(ex)) + }) + case false => + processFiles[F](cfg, fts, data) + } + + private def logWarn[F[_]](msg: => String): Task[F, Args, Unit] = + Task(_.logger.warn(msg)) +} diff --git a/modules/restapi/src/main/resources/docspell-openapi.yml b/modules/restapi/src/main/resources/docspell-openapi.yml index 7833b28e..94f84dd0 100644 --- a/modules/restapi/src/main/resources/docspell-openapi.yml +++ b/modules/restapi/src/main/resources/docspell-openapi.yml @@ -1213,6 +1213,33 @@ paths: schema: $ref: "#/components/schemas/BasicResult" + /sec/item/convertallpdfs: + post: + tags: [ Item ] + summary: Convert all non-converted pdfs. + description: | + Submits a job that will find all pdf files that have not been + converted and converts them using the ocrmypdf tool (if + enabled). This tool has been added in version 0.9.0 and so + older files can be "migrated" this way, or maybe after + enabling the tool. + + The task finds all files of the current collective and submits + task for each file to convert. These tasks are submitted with + a low priority so that normal processing can still proceed. + + The body of the request should be empty. + security: + - authTokenHeader: [] + responses: + 200: + description: Ok + content: + application/json: + schema: + $ref: "#/components/schemas/BasicResult" + + /sec/item/search: post: tags: [ Item ] @@ -1796,6 +1823,31 @@ paths: application/json: schema: $ref: "#/components/schemas/ItemProposals" + /sec/item/{itemId}/reprocess: + post: + tags: [ Item ] + summary: Start reprocessing the files of the item. + description: | + This submits a job that will re-process the files (either all + or the ones specified) of the item and replace the metadata. + security: + - authTokenHeader: [] + parameters: + - $ref: "#/components/parameters/id" + requestBody: + content: + application/json: + schema: + $ref: "#/components/schemas/IdList" + responses: + 200: + description: Ok + content: + application/json: + schema: + $ref: "#/components/schemas/BasicResult" + + /sec/item/{itemId}/attachment/movebefore: post: tags: [ Item ] @@ -2604,6 +2656,17 @@ paths: components: schemas: + IdList: + description: + A list of identifiers. + required: + - ids + properties: + ids: + type: array + items: + type: string + format: ident StringList: description: | A simple list of strings. diff --git a/modules/restserver/src/main/scala/docspell/restserver/routes/ItemRoutes.scala b/modules/restserver/src/main/scala/docspell/restserver/routes/ItemRoutes.scala index 8f51d79a..a033791d 100644 --- a/modules/restserver/src/main/scala/docspell/restserver/routes/ItemRoutes.scala +++ b/modules/restserver/src/main/scala/docspell/restserver/routes/ItemRoutes.scala @@ -31,6 +31,13 @@ object ItemRoutes { import dsl._ HttpRoutes.of { + case POST -> Root / "convertallpdfs" => + for { + res <- + backend.item.convertAllPdf(user.account.collective.some, user.account, true) + resp <- Ok(Conversions.basicResult(res, "Task submitted")) + } yield resp + case req @ POST -> Root / "search" => for { mask <- req.as[ItemSearch] @@ -279,6 +286,15 @@ object ItemRoutes { resp <- Ok(Conversions.basicResult(res, "Attachment moved.")) } yield resp + case req @ POST -> Root / Ident(id) / "reprocess" => + for { + data <- req.as[IdList] + ids = data.ids.flatMap(s => Ident.fromString(s).toOption) + _ <- logger.fdebug(s"Re-process item ${id.id}") + res <- backend.item.reprocess(id, ids, user.account, true) + resp <- Ok(Conversions.basicResult(res, "Re-process task submitted.")) + } yield resp + case DELETE -> Root / Ident(id) => for { n <- backend.item.deleteItem(id, user.account.collective) diff --git a/modules/restserver/src/main/scala/docspell/restserver/routes/JobQueueRoutes.scala b/modules/restserver/src/main/scala/docspell/restserver/routes/JobQueueRoutes.scala index fc605f74..4a34b219 100644 --- a/modules/restserver/src/main/scala/docspell/restserver/routes/JobQueueRoutes.scala +++ b/modules/restserver/src/main/scala/docspell/restserver/routes/JobQueueRoutes.scala @@ -21,7 +21,7 @@ object JobQueueRoutes { HttpRoutes.of { case GET -> Root / "state" => for { - js <- backend.job.queueState(user.account.collective, 200) + js <- backend.job.queueState(user.account.collective, 40) res = Conversions.mkJobQueueState(js) resp <- Ok(res) } yield resp diff --git a/modules/store/src/main/scala/docspell/store/queries/QJob.scala b/modules/store/src/main/scala/docspell/store/queries/QJob.scala index 99f94b67..f3521ed9 100644 --- a/modules/store/src/main/scala/docspell/store/queries/QJob.scala +++ b/modules/store/src/main/scala/docspell/store/queries/QJob.scala @@ -209,7 +209,8 @@ object QJob { store.transact(RJob.findFromIds(ids)) def queueStateSnapshot( - collective: Ident + collective: Ident, + max: Long ): Stream[ConnectionIO, (RJob, Vector[RJobLog])] = { val JC = RJob.Columns val waiting: Set[JobState] = Set(JobState.Waiting, JobState.Stuck, JobState.Scheduled) @@ -218,18 +219,34 @@ object QJob { def selectJobs(now: Timestamp): Stream[ConnectionIO, RJob] = { val refDate = now.minusHours(24) - val sql = selectSimple( + + val runningJobs = (selectSimple( + JC.all, + RJob.table, + and(JC.group.is(collective), JC.state.isOneOf(running.toSeq)) + ) ++ orderBy(JC.submitted.desc)).query[RJob].stream + + val waitingJobs = (selectSimple( JC.all, RJob.table, and( JC.group.is(collective), - or( - and(JC.state.isOneOf(done.toSeq), JC.submitted.isGt(refDate)), - JC.state.isOneOf((running ++ waiting).toSeq) - ) + JC.state.isOneOf(waiting.toSeq), + JC.submitted.isGt(refDate) ) - ) - (sql ++ orderBy(JC.submitted.desc)).query[RJob].stream + ) ++ orderBy(JC.submitted.desc)).query[RJob].stream.take(max) + + val doneJobs = (selectSimple( + JC.all, + RJob.table, + and( + JC.group.is(collective), + JC.state.isOneOf(done.toSeq), + JC.submitted.isGt(refDate) + ) + ) ++ orderBy(JC.submitted.desc)).query[RJob].stream.take(max) + + runningJobs ++ waitingJobs ++ doneJobs } def selectLogs(job: RJob): ConnectionIO[Vector[RJobLog]] = diff --git a/modules/store/src/main/scala/docspell/store/queue/JobQueue.scala b/modules/store/src/main/scala/docspell/store/queue/JobQueue.scala index f7d15ed5..127a45e1 100644 --- a/modules/store/src/main/scala/docspell/store/queue/JobQueue.scala +++ b/modules/store/src/main/scala/docspell/store/queue/JobQueue.scala @@ -28,6 +28,8 @@ trait JobQueue[F[_]] { def insertAll(jobs: Seq[RJob]): F[Unit] + def insertAllIfNew(jobs: Seq[RJob]): F[Unit] + def nextJob( prio: Ident => F[Priority], worker: Ident, @@ -81,5 +83,13 @@ object JobQueue { logger.error(ex)("Could not insert job. Skipping it.") }) + def insertAllIfNew(jobs: Seq[RJob]): F[Unit] = + jobs.toList + .traverse(j => insertIfNew(j).attempt) + .map(_.foreach { + case Right(()) => + case Left(ex) => + logger.error(ex)("Could not insert job. Skipping it.") + }) }) } diff --git a/modules/store/src/main/scala/docspell/store/records/RAttachment.scala b/modules/store/src/main/scala/docspell/store/records/RAttachment.scala index 61d676b6..fa6bb724 100644 --- a/modules/store/src/main/scala/docspell/store/records/RAttachment.scala +++ b/modules/store/src/main/scala/docspell/store/records/RAttachment.scala @@ -1,6 +1,7 @@ package docspell.store.records import cats.implicits._ +import fs2.Stream import docspell.common._ import docspell.store.impl.Implicits._ @@ -71,6 +72,16 @@ object RAttachment { commas(fileId.setTo(fId), name.setTo(fname)) ).update.run + def updateFileId( + attachId: Ident, + fId: Ident + ): ConnectionIO[Int] = + updateRow( + table, + id.is(attachId), + fileId.setTo(fId) + ).update.run + def updatePosition(attachId: Ident, pos: Int): ConnectionIO[Int] = updateRow(table, id.is(attachId), position.setTo(pos)).update.run @@ -187,4 +198,32 @@ object RAttachment { def findItemId(attachId: Ident): ConnectionIO[Option[Ident]] = selectSimple(Seq(itemId), table, id.is(attachId)).query[Ident].option + + def findNonConvertedPdf( + coll: Option[Ident], + chunkSize: Int + ): Stream[ConnectionIO, RAttachment] = { + val aId = Columns.id.prefix("a") + val aItem = Columns.itemId.prefix("a") + val aFile = Columns.fileId.prefix("a") + val sId = RAttachmentSource.Columns.id.prefix("s") + val sFile = RAttachmentSource.Columns.fileId.prefix("s") + val iId = RItem.Columns.id.prefix("i") + val iColl = RItem.Columns.cid.prefix("i") + val mId = RFileMeta.Columns.id.prefix("m") + val mType = RFileMeta.Columns.mimetype.prefix("m") + val pdfType = "application/pdf%" + + val from = table ++ fr"a INNER JOIN" ++ + RAttachmentSource.table ++ fr"s ON" ++ sId.is(aId) ++ fr"INNER JOIN" ++ + RItem.table ++ fr"i ON" ++ iId.is(aItem) ++ fr"INNER JOIN" ++ + RFileMeta.table ++ fr"m ON" ++ aFile.is(mId) + val where = coll match { + case Some(cid) => and(iColl.is(cid), aFile.is(sFile), mType.lowerLike(pdfType)) + case None => and(aFile.is(sFile), mType.lowerLike(pdfType)) + } + selectSimple(all.map(_.prefix("a")), from, where) + .query[RAttachment] + .streamWithChunkSize(chunkSize) + } } diff --git a/modules/store/src/main/scala/docspell/store/records/RAttachmentSource.scala b/modules/store/src/main/scala/docspell/store/records/RAttachmentSource.scala index 58b0a6c7..d732ecff 100644 --- a/modules/store/src/main/scala/docspell/store/records/RAttachmentSource.scala +++ b/modules/store/src/main/scala/docspell/store/records/RAttachmentSource.scala @@ -42,6 +42,12 @@ object RAttachmentSource { def findById(attachId: Ident): ConnectionIO[Option[RAttachmentSource]] = selectSimple(all, table, id.is(attachId)).query[RAttachmentSource].option + def isSameFile(attachId: Ident, file: Ident): ConnectionIO[Boolean] = + selectCount(id, table, and(id.is(attachId), fileId.is(file))) + .query[Int] + .unique + .map(_ > 0) + def delete(attachId: Ident): ConnectionIO[Int] = deleteFrom(table, id.is(attachId)).update.run @@ -64,6 +70,17 @@ object RAttachmentSource { selectSimple(all.map(_.prefix("a")), from, where).query[RAttachmentSource].option } + def findByItem(itemId: Ident): ConnectionIO[Vector[RAttachmentSource]] = { + val sId = Columns.id.prefix("s") + val aId = RAttachment.Columns.id.prefix("a") + val aItem = RAttachment.Columns.itemId.prefix("a") + + val from = table ++ fr"s INNER JOIN" ++ RAttachment.table ++ fr"a ON" ++ sId.is(aId) + selectSimple(all.map(_.prefix("s")), from, aItem.is(itemId)) + .query[RAttachmentSource] + .to[Vector] + } + def findByItemWithMeta( id: Ident ): ConnectionIO[Vector[(RAttachmentSource, FileMeta)]] = { diff --git a/modules/store/src/main/scala/docspell/store/records/RCollective.scala b/modules/store/src/main/scala/docspell/store/records/RCollective.scala index 9d27bd1e..fa40e374 100644 --- a/modules/store/src/main/scala/docspell/store/records/RCollective.scala +++ b/modules/store/src/main/scala/docspell/store/records/RCollective.scala @@ -75,6 +75,14 @@ object RCollective { sql.query[RCollective].option } + def findByItem(itemId: Ident): ConnectionIO[Option[RCollective]] = { + val iColl = RItem.Columns.cid.prefix("i") + val iId = RItem.Columns.id.prefix("i") + val cId = id.prefix("c") + val from = RItem.table ++ fr"i INNER JOIN" ++ table ++ fr"c ON" ++ iColl.is(cId) + selectSimple(all.map(_.prefix("c")), from, iId.is(itemId)).query[RCollective].option + } + def existsById(cid: Ident): ConnectionIO[Boolean] = { val sql = selectCount(id, table, id.is(cid)) sql.query[Int].unique.map(_ > 0) @@ -90,5 +98,19 @@ object RCollective { sql.query[RCollective].stream } + def findByAttachment(attachId: Ident): ConnectionIO[Option[RCollective]] = { + val iColl = RItem.Columns.cid.prefix("i") + val iId = RItem.Columns.id.prefix("i") + val aItem = RAttachment.Columns.itemId.prefix("a") + val aId = RAttachment.Columns.id.prefix("a") + val cId = Columns.id.prefix("c") + + val from = table ++ fr"c INNER JOIN" ++ + RItem.table ++ fr"i ON" ++ cId.is(iColl) ++ fr"INNER JOIN" ++ + RAttachment.table ++ fr"a ON" ++ aItem.is(iId) + + selectSimple(all.map(_.prefix("c")), from, aId.is(attachId)).query[RCollective].option + } + case class Settings(language: Language, integrationEnabled: Boolean) } diff --git a/modules/store/src/main/scala/docspell/store/records/RItem.scala b/modules/store/src/main/scala/docspell/store/records/RItem.scala index e961e8b2..a0025ddb 100644 --- a/modules/store/src/main/scala/docspell/store/records/RItem.scala +++ b/modules/store/src/main/scala/docspell/store/records/RItem.scala @@ -314,6 +314,9 @@ object RItem { def findByIdAndCollective(itemId: Ident, coll: Ident): ConnectionIO[Option[RItem]] = selectSimple(all, table, and(id.is(itemId), cid.is(coll))).query[RItem].option + def findById(itemId: Ident): ConnectionIO[Option[RItem]] = + selectSimple(all, table, id.is(itemId)).query[RItem].option + def checkByIdAndCollective(itemId: Ident, coll: Ident): ConnectionIO[Option[Ident]] = selectSimple(Seq(id), table, and(id.is(itemId), cid.is(coll))).query[Ident].option diff --git a/tools/convert-all-pdfs.sh b/tools/convert-all-pdfs.sh new file mode 100755 index 00000000..5e47e2e1 --- /dev/null +++ b/tools/convert-all-pdfs.sh @@ -0,0 +1,29 @@ +#!/usr/bin/env bash +# +# Simple script to authenticate with docspell and trigger the "convert +# all pdf" route that submits a task to convert all pdf files using +# ocrmypdf. + +set -e + +BASE_URL="${1:-http://localhost:7880}" +LOGIN_URL="$BASE_URL/api/v1/open/auth/login" +TRIGGER_URL="$BASE_URL/api/v1/sec/item/convertallpdfs" + +echo "Login to trigger converting all pdfs." +echo "Using url: $BASE_URL" +echo -n "Account: " +read USER +echo -n "Password: " +read -s PASS +echo + +auth=$(curl --fail -XPOST --silent --data-binary "{\"account\":\"$USER\", \"password\":\"$PASS\"}" "$LOGIN_URL") + +if [ "$(echo $auth | jq .success)" == "true" ]; then + echo "Login successful" + auth_token=$(echo $auth | jq -r .token) + curl --fail -XPOST -H "X-Docspell-Auth: $auth_token" "$TRIGGER_URL" +else + echo "Login failed." +fi diff --git a/website/site/content/docs/joex/_index.md b/website/site/content/docs/joex/_index.md index c9ac7fd7..bc0bd517 100644 --- a/website/site/content/docs/joex/_index.md +++ b/website/site/content/docs/joex/_index.md @@ -67,7 +67,7 @@ logged in. The relevant part of the config file regarding the scheduler is shown below with some explanations. -``` +``` conf docspell.joex { # other settings left out for brevity diff --git a/website/site/content/docs/tools/convert-all-pdf.md b/website/site/content/docs/tools/convert-all-pdf.md new file mode 100644 index 00000000..a0b91aea --- /dev/null +++ b/website/site/content/docs/tools/convert-all-pdf.md @@ -0,0 +1,46 @@ ++++ +title = "Convert All PDFs" +description = "Convert all PDF files using OcrMyPdf." +weight = 60 ++++ + +# convert-all-pdf.sh + +With version 0.9.0 there was support added for another external tool, +[OCRMyPdf](https://github.com/jbarlow83/OCRmyPDF), that can convert +PDF files such that they contain the OCR-ed text layer. This tool is +optional and can be disabled. + +In order to convert all previously processed files with this tool, +there is an +[endpoint](/openapi/docspell-openapi.html#api-Item-secItemConvertallpdfsPost) +that submits a task to convert all PDF files not already converted for +your collective. + +There is no UI part to trigger this route, so you need to use curl or +the script `convert-all-pdfs.sh` in the `tools/` directory. + + +# Usage + +``` +./convert-all-pdfs.sh [docspell-base-url] +``` + +For example, if docspell is at `http://localhost:7880`: + +``` +./convert-all-pdfs.sh http://localhost:7880 +``` + +The script asks for your account name and password. It then logs in +and triggers the said endpoint. After this you should see a few tasks +running. + +There will be one task per file to convert. All these tasks are +submitted with a low priority. So files uploaded through the webapp or +a [source](@/docs/webapp/uploading.md#anonymous-upload) with a high +priority, will be preferred as [configured in the job +executor](@/docs/joex/_index.md#scheduler-config). This is to not +disturb normal processing when many conversion tasks are being +executed. diff --git a/website/site/content/docs/webapp/sources-edit.png b/website/site/content/docs/webapp/sources-edit.png new file mode 100644 index 00000000..23804991 Binary files /dev/null and b/website/site/content/docs/webapp/sources-edit.png differ diff --git a/website/site/content/docs/webapp/uploading.md b/website/site/content/docs/webapp/uploading.md index 4f729435..a8f37381 100644 --- a/website/site/content/docs/webapp/uploading.md +++ b/website/site/content/docs/webapp/uploading.md @@ -29,6 +29,8 @@ scripts. For this the next variant exists. It is also possible to upload files without authentication. This should make tools that interact with docspell much easier to write. +The [Android Client App](@/docs/tools/android.md) uses these urls to +upload files. Go to "Collective Settings" and then to the "Source" tab. A *Source* identifies an endpoint where files can be uploaded anonymously. @@ -41,7 +43,7 @@ username is not visible. Example screenshot: -{{ figure(file="sources-form.png") }} +{{ figure(file="sources-edit.png") }} This example shows a source with name "test". Besides a description and a name that is only used for displaying purposes, a priority and a @@ -58,25 +60,26 @@ The source endpoint defines two urls: - `/app/upload/` - `/api/v1/open/upload/item/` +{{ figure(file="sources-form.png") }} + The first points to a web page where everyone could upload files into your account. You could give this url to people for sending files directly into your docspell. The second url is the API url, which accepts the requests to upload -files (it is used by the upload page, the first url). +files. This second url can be used with the [Android Client +App](@/docs/tools/android.md) to upload files. -For example, the api url can be used to upload files with curl: +Another example is to use curl for uploading files from the command +line:: ``` bash $ curl -XPOST -F file=@test.pdf http://192.168.1.95:7880/api/v1/open/upload/item/3H7hvJcDJuk-NrAW4zxsdfj-K6TMPyb6BGP-xKptVxUdqWa {"success":true,"message":"Files submitted."} ``` -You could add more `-F file=@/path/to/your/file.pdf` to upload -multiple files (note, the `@` is required by curl, so it knows that -the following is a file). There is a [script -provided](@/docs/tools/ds.md) that uses this to upload files from the -command line. +There is a [script provided](@/docs/tools/ds.md) that uses curl to +upload files from the command line more conveniently. When files are uploaded to an source endpoint, the items resulting from this uploads are marked with the name of the source. So you know