diff --git a/modules/common/src/main/scala/docspell/common/ConvertAllPdfArgs.scala b/modules/common/src/main/scala/docspell/common/ConvertAllPdfArgs.scala new file mode 100644 index 00000000..d4ae5ba7 --- /dev/null +++ b/modules/common/src/main/scala/docspell/common/ConvertAllPdfArgs.scala @@ -0,0 +1,14 @@ +package docspell.common + +import io.circe._ +import io.circe.generic.semiauto._ + +case class ConvertAllPdfArgs(collective: Option[Ident]) + +object ConvertAllPdfArgs { + val taskName = Ident.unsafe("submit-pdf-migration-tasks") + implicit val jsonDecoder: Decoder[ConvertAllPdfArgs] = + deriveDecoder[ConvertAllPdfArgs] + implicit val jsonEncoder: Encoder[ConvertAllPdfArgs] = + deriveEncoder[ConvertAllPdfArgs] +} diff --git a/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala b/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala index 9882455c..f07e089e 100644 --- a/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala +++ b/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala @@ -13,6 +13,8 @@ import docspell.ftssolr.SolrFtsClient import docspell.joex.fts.{MigrationTask, ReIndexTask} import docspell.joex.hk._ import docspell.joex.notify._ +import docspell.joex.pdfconv.ConvertAllPdfTask +import docspell.joex.pdfconv.PdfConvTask import docspell.joex.process.ItemHandler import docspell.joex.process.ReProcessItem import docspell.joex.scanmailbox._ @@ -139,6 +141,20 @@ object JoexAppImpl { HouseKeepingTask.onCancel[F] ) ) + .withTask( + JobTask.json( + PdfConvTask.taskName, + PdfConvTask[F](cfg), + PdfConvTask.onCancel[F] + ) + ) + .withTask( + JobTask.json( + ConvertAllPdfArgs.taskName, + ConvertAllPdfTask[F](queue, joex), + ConvertAllPdfTask.onCancel[F] + ) + ) .resource psch <- PeriodicScheduler.create( cfg.periodicScheduler, diff --git a/modules/joex/src/main/scala/docspell/joex/pdfconv/ConvertAllPdfTask.scala b/modules/joex/src/main/scala/docspell/joex/pdfconv/ConvertAllPdfTask.scala new file mode 100644 index 00000000..c40d0783 --- /dev/null +++ b/modules/joex/src/main/scala/docspell/joex/pdfconv/ConvertAllPdfTask.scala @@ -0,0 +1,68 @@ +package docspell.joex.pdfconv + +import cats.effect._ +import cats.implicits._ +import fs2.{Chunk, Stream} + +import docspell.backend.ops.OJoex +import docspell.common._ +import docspell.joex.scheduler.{Context, Task} +import docspell.store.queue.JobQueue +import docspell.store.records.RAttachment +import docspell.store.records._ + +object ConvertAllPdfTask { + type Args = ConvertAllPdfArgs + + def apply[F[_]: Sync](queue: JobQueue[F], joex: OJoex[F]): Task[F, Args, Unit] = + Task { ctx => + for { + _ <- ctx.logger.info("Converting older pdfs using ocrmypdf") + n <- submitConversionJobs(ctx, queue) + _ <- ctx.logger.info(s"Submitted $n jobs for file conversion") + _ <- joex.notifyAllNodes + } yield () + } + + def onCancel[F[_]: Sync]: Task[F, Args, Unit] = + Task.log(_.warn("Cancelling convert-old-pdf task")) + + def submitConversionJobs[F[_]: Sync]( + ctx: Context[F, Args], + queue: JobQueue[F] + ): F[Int] = + ctx.store + .transact(RAttachment.findNonConvertedPdf(ctx.args.collective, 50)) + .chunks + .flatMap(createJobs[F](ctx)) + .chunks + .evalMap(jobs => queue.insertAll(jobs.toVector).map(_ => jobs.size)) + .evalTap(n => ctx.logger.debug(s"Submitted $n jobs …")) + .compile + .foldMonoid + + private def createJobs[F[_]: Sync]( + ctx: Context[F, Args] + )(ras: Chunk[RAttachment]): Stream[F, RJob] = { + val collectiveOrSystem = ctx.args.collective.getOrElse(DocspellSystem.taskGroup) + + def mkJob(ra: RAttachment): F[RJob] = + for { + id <- Ident.randomId[F] + now <- Timestamp.current[F] + } yield RJob.newJob( + id, + PdfConvTask.taskName, + collectiveOrSystem, + PdfConvTask.Args(ra.id), + s"Convert pdf ${ra.id.id}/${ra.name.getOrElse("-")}", + now, + collectiveOrSystem, + Priority.Low, + Some(ra.id) + ) + + val jobs = ras.traverse(mkJob) + Stream.evalUnChunk(jobs) + } +} diff --git a/modules/joex/src/main/scala/docspell/joex/pdfconv/PdfConvTask.scala b/modules/joex/src/main/scala/docspell/joex/pdfconv/PdfConvTask.scala new file mode 100644 index 00000000..07cc7c36 --- /dev/null +++ b/modules/joex/src/main/scala/docspell/joex/pdfconv/PdfConvTask.scala @@ -0,0 +1,155 @@ +package docspell.joex.pdfconv + +import cats.data.Kleisli +import cats.data.OptionT +import cats.effect._ +import cats.implicits._ +import fs2.Stream + +import docspell.common._ +import docspell.convert.ConversionResult +import docspell.convert.extern.OcrMyPdf +import docspell.joex.Config +import docspell.joex.scheduler.{Context, Task} +import docspell.store.records._ + +import bitpeace.FileMeta +import bitpeace.Mimetype +import bitpeace.MimetypeHint +import bitpeace.RangeDef +import io.circe.generic.semiauto._ +import io.circe.{Decoder, Encoder} + +/** Converts the given attachment file using ocrmypdf if it is a pdf + * and has not already been converted (the source file is the same as + * in the attachment). + */ +object PdfConvTask { + case class Args(attachId: Ident) + object Args { + implicit val jsonDecoder: Decoder[Args] = + deriveDecoder[Args] + implicit val jsonEncoder: Encoder[Args] = + deriveEncoder[Args] + } + + val taskName = Ident.unsafe("pdf-files-migration") + + def apply[F[_]: Sync: ContextShift](cfg: Config): Task[F, Args, Unit] = + Task { ctx => + for { + _ <- ctx.logger.info(s"Converting pdf file ${ctx.args} using ocrmypdf") + meta <- checkInputs(cfg, ctx) + _ <- meta.traverse(fm => convert(cfg, ctx, fm)) + } yield () + } + + def onCancel[F[_]: Sync]: Task[F, Args, Unit] = + Task.log(_.warn("Cancelling pdfconv task")) + + // --- Helper + + // check if file exists and if it is pdf and if source id is the same and if ocrmypdf is enabled + def checkInputs[F[_]: Sync](cfg: Config, ctx: Context[F, Args]): F[Option[FileMeta]] = { + val none: Option[FileMeta] = None + val checkSameFiles = + (for { + ra <- OptionT(ctx.store.transact(RAttachment.findById(ctx.args.attachId))) + isSame <- OptionT.liftF( + ctx.store.transact(RAttachmentSource.isSameFile(ra.id, ra.fileId)) + ) + } yield isSame).getOrElse(false) + val existsPdf = + for { + meta <- ctx.store.transact(RAttachment.findMeta(ctx.args.attachId)) + res = meta.filter(_.mimetype.matches(Mimetype.`application/pdf`)) + _ <- + if (res.isEmpty) + ctx.logger.info( + s"The attachment ${ctx.args.attachId} doesn't exist or is no pdf: $meta" + ) + else ().pure[F] + } yield res + + if (cfg.convert.ocrmypdf.enabled) + checkSameFiles.flatMap { + case true => existsPdf + case false => + ctx.logger.info( + s"The attachment ${ctx.args.attachId} already has been converted. Skipping." + ) *> + none.pure[F] + } + else none.pure[F] + } + + def convert[F[_]: Sync: ContextShift]( + cfg: Config, + ctx: Context[F, Args], + in: FileMeta + ): F[Unit] = { + val bp = ctx.store.bitpeace + val data = Stream + .emit(in) + .through(bp.fetchData2(RangeDef.all)) + + val storeResult: ConversionResult.Handler[F, Unit] = + Kleisli({ + case ConversionResult.SuccessPdf(file) => + storeToAttachment(ctx, in, file) + + case ConversionResult.SuccessPdfTxt(file, _) => + storeToAttachment(ctx, in, file) + + case ConversionResult.UnsupportedFormat(mime) => + ctx.logger.warn( + s"Unable to convert '${mime}' file ${ctx.args}: unsupported format." + ) + + case ConversionResult.InputMalformed(mime, reason) => + ctx.logger.warn(s"Unable to convert '${mime}' file ${ctx.args}: $reason") + + case ConversionResult.Failure(ex) => + ctx.logger.error(ex)(s"Failure converting file ${ctx.args}: ${ex.getMessage}") + }) + + def ocrMyPdf(lang: Language): F[Unit] = + OcrMyPdf.toPDF[F, Unit]( + cfg.convert.ocrmypdf, + lang, + in.chunksize, + ctx.blocker, + ctx.logger + )(data, storeResult) + + for { + lang <- getLanguage(ctx) + _ <- ocrMyPdf(lang) + } yield () + } + + def getLanguage[F[_]: Sync](ctx: Context[F, Args]): F[Language] = + (for { + coll <- OptionT(ctx.store.transact(RCollective.findByAttachment(ctx.args.attachId))) + lang = coll.language + } yield lang).getOrElse(Language.German) + + def storeToAttachment[F[_]: Sync]( + ctx: Context[F, Args], + meta: FileMeta, + newFile: Stream[F, Byte] + ): F[Unit] = { + val mimeHint = MimetypeHint.advertised(meta.mimetype.asString) + for { + time <- Timestamp.current[F] + fid <- Ident.randomId[F] + _ <- + ctx.store.bitpeace + .saveNew(newFile, meta.chunksize, mimeHint, Some(fid.id), time.value) + .compile + .lastOrError + _ <- ctx.store.transact(RAttachment.updateFileId(ctx.args.attachId, fid)) + } yield () + } + +} diff --git a/modules/store/src/main/scala/docspell/store/records/RAttachment.scala b/modules/store/src/main/scala/docspell/store/records/RAttachment.scala index 8c93de42..fa6bb724 100644 --- a/modules/store/src/main/scala/docspell/store/records/RAttachment.scala +++ b/modules/store/src/main/scala/docspell/store/records/RAttachment.scala @@ -1,6 +1,7 @@ package docspell.store.records import cats.implicits._ +import fs2.Stream import docspell.common._ import docspell.store.impl.Implicits._ @@ -197,4 +198,32 @@ object RAttachment { def findItemId(attachId: Ident): ConnectionIO[Option[Ident]] = selectSimple(Seq(itemId), table, id.is(attachId)).query[Ident].option + + def findNonConvertedPdf( + coll: Option[Ident], + chunkSize: Int + ): Stream[ConnectionIO, RAttachment] = { + val aId = Columns.id.prefix("a") + val aItem = Columns.itemId.prefix("a") + val aFile = Columns.fileId.prefix("a") + val sId = RAttachmentSource.Columns.id.prefix("s") + val sFile = RAttachmentSource.Columns.fileId.prefix("s") + val iId = RItem.Columns.id.prefix("i") + val iColl = RItem.Columns.cid.prefix("i") + val mId = RFileMeta.Columns.id.prefix("m") + val mType = RFileMeta.Columns.mimetype.prefix("m") + val pdfType = "application/pdf%" + + val from = table ++ fr"a INNER JOIN" ++ + RAttachmentSource.table ++ fr"s ON" ++ sId.is(aId) ++ fr"INNER JOIN" ++ + RItem.table ++ fr"i ON" ++ iId.is(aItem) ++ fr"INNER JOIN" ++ + RFileMeta.table ++ fr"m ON" ++ aFile.is(mId) + val where = coll match { + case Some(cid) => and(iColl.is(cid), aFile.is(sFile), mType.lowerLike(pdfType)) + case None => and(aFile.is(sFile), mType.lowerLike(pdfType)) + } + selectSimple(all.map(_.prefix("a")), from, where) + .query[RAttachment] + .streamWithChunkSize(chunkSize) + } } diff --git a/modules/store/src/main/scala/docspell/store/records/RCollective.scala b/modules/store/src/main/scala/docspell/store/records/RCollective.scala index 22115d5e..fa40e374 100644 --- a/modules/store/src/main/scala/docspell/store/records/RCollective.scala +++ b/modules/store/src/main/scala/docspell/store/records/RCollective.scala @@ -109,7 +109,7 @@ object RCollective { RItem.table ++ fr"i ON" ++ cId.is(iColl) ++ fr"INNER JOIN" ++ RAttachment.table ++ fr"a ON" ++ aItem.is(iId) - selectSimple(all, from, aId.is(attachId)).query[RCollective].option + selectSimple(all.map(_.prefix("c")), from, aId.is(attachId)).query[RCollective].option } case class Settings(language: Language, integrationEnabled: Boolean)