mirror of
https://github.com/TheAnachronism/docspell.git
synced 2025-04-04 10:29:34 +00:00
Add a task to convert all pdfs that have not been converted
This commit is contained in:
parent
07e9a9767e
commit
41ea071555
@ -0,0 +1,14 @@
|
||||
package docspell.common
|
||||
|
||||
import io.circe._
|
||||
import io.circe.generic.semiauto._
|
||||
|
||||
/** Arguments for the task that submits a pdf conversion job for each
  * pdf attachment that has not been converted yet.
  *
  * @param collective if defined, only attachments of this collective
  *   are considered; if empty, no collective restriction is applied
  */
case class ConvertAllPdfArgs(collective: Option[Ident])

object ConvertAllPdfArgs {

  /** Name under which the task consuming these arguments is registered. */
  val taskName: Ident = Ident.unsafe("submit-pdf-migration-tasks")

  /** JSON encoder derived from the fields of the case class. */
  implicit val jsonEncoder: Encoder[ConvertAllPdfArgs] =
    deriveEncoder[ConvertAllPdfArgs]

  /** JSON decoder derived from the fields of the case class. */
  implicit val jsonDecoder: Decoder[ConvertAllPdfArgs] =
    deriveDecoder[ConvertAllPdfArgs]
}
|
@ -13,6 +13,8 @@ import docspell.ftssolr.SolrFtsClient
|
||||
import docspell.joex.fts.{MigrationTask, ReIndexTask}
|
||||
import docspell.joex.hk._
|
||||
import docspell.joex.notify._
|
||||
import docspell.joex.pdfconv.ConvertAllPdfTask
|
||||
import docspell.joex.pdfconv.PdfConvTask
|
||||
import docspell.joex.process.ItemHandler
|
||||
import docspell.joex.process.ReProcessItem
|
||||
import docspell.joex.scanmailbox._
|
||||
@ -139,6 +141,20 @@ object JoexAppImpl {
|
||||
HouseKeepingTask.onCancel[F]
|
||||
)
|
||||
)
|
||||
.withTask(
|
||||
JobTask.json(
|
||||
PdfConvTask.taskName,
|
||||
PdfConvTask[F](cfg),
|
||||
PdfConvTask.onCancel[F]
|
||||
)
|
||||
)
|
||||
.withTask(
|
||||
JobTask.json(
|
||||
ConvertAllPdfArgs.taskName,
|
||||
ConvertAllPdfTask[F](queue, joex),
|
||||
ConvertAllPdfTask.onCancel[F]
|
||||
)
|
||||
)
|
||||
.resource
|
||||
psch <- PeriodicScheduler.create(
|
||||
cfg.periodicScheduler,
|
||||
|
@ -0,0 +1,68 @@
|
||||
package docspell.joex.pdfconv
|
||||
|
||||
import cats.effect._
|
||||
import cats.implicits._
|
||||
import fs2.{Chunk, Stream}
|
||||
|
||||
import docspell.backend.ops.OJoex
|
||||
import docspell.common._
|
||||
import docspell.joex.scheduler.{Context, Task}
|
||||
import docspell.store.queue.JobQueue
|
||||
import docspell.store.records.RAttachment
|
||||
import docspell.store.records._
|
||||
|
||||
/** Task that looks up all pdf attachments that have not been converted
  * and submits one [[PdfConvTask]] job for each of them.
  */
object ConvertAllPdfTask {
  type Args = ConvertAllPdfArgs

  /** Chunk size handed to the attachment query. */
  private val fetchChunkSize = 50

  /** Creates the task. It logs the start, submits the conversion jobs,
    * logs how many jobs were submitted and finally notifies all joex
    * nodes.
    */
  def apply[F[_]: Sync](queue: JobQueue[F], joex: OJoex[F]): Task[F, Args, Unit] =
    Task { ctx =>
      ctx.logger.info("Converting older pdfs using ocrmypdf") >>
        submitConversionJobs(ctx, queue).flatMap { submitted =>
          ctx.logger.info(s"Submitted $submitted jobs for file conversion") >>
            joex.notifyAllNodes.void
        }
    }

  /** Task to run on cancellation; it only logs a warning. */
  def onCancel[F[_]: Sync]: Task[F, Args, Unit] =
    Task.log(_.warn("Cancelling convert-old-pdf task"))

  /** Streams the non-converted pdf attachments, creates a conversion
    * job for each one and inserts the jobs chunk-wise into the queue.
    *
    * @return the total number of jobs that were inserted
    */
  def submitConversionJobs[F[_]: Sync](
      ctx: Context[F, Args],
      queue: JobQueue[F]
  ): F[Int] = {
    val attachments =
      ctx.store.transact(
        RAttachment.findNonConvertedPdf(ctx.args.collective, fetchChunkSize)
      )

    // one element per inserted chunk, holding the number of jobs in it
    val insertedCounts =
      attachments.chunks
        .flatMap(createJobs[F](ctx))
        .chunks
        .evalMap(batch => queue.insertAll(batch.toVector).as(batch.size))
        .evalTap(count => ctx.logger.debug(s"Submitted $count jobs …"))

    insertedCounts.compile.foldMonoid
  }

  /** Turns a chunk of attachments into a stream of [[PdfConvTask]] jobs,
    * one job per attachment.
    */
  private def createJobs[F[_]: Sync](
      ctx: Context[F, Args]
  )(ras: Chunk[RAttachment]): Stream[F, RJob] = {
    // the collective from the arguments, or the system task group if absent
    val jobOwner = ctx.args.collective.getOrElse(DocspellSystem.taskGroup)

    def mkJob(ra: RAttachment): F[RJob] =
      Ident.randomId[F].flatMap { jobId =>
        Timestamp.current[F].map { submitted =>
          RJob.newJob(
            jobId,
            PdfConvTask.taskName,
            jobOwner,
            PdfConvTask.Args(ra.id),
            s"Convert pdf ${ra.id.id}/${ra.name.getOrElse("-")}",
            submitted,
            jobOwner,
            Priority.Low,
            Some(ra.id)
          )
        }
      }

    Stream.evalUnChunk(ras.traverse(mkJob))
  }
}
|
@ -0,0 +1,155 @@
|
||||
package docspell.joex.pdfconv
|
||||
|
||||
import cats.data.Kleisli
|
||||
import cats.data.OptionT
|
||||
import cats.effect._
|
||||
import cats.implicits._
|
||||
import fs2.Stream
|
||||
|
||||
import docspell.common._
|
||||
import docspell.convert.ConversionResult
|
||||
import docspell.convert.extern.OcrMyPdf
|
||||
import docspell.joex.Config
|
||||
import docspell.joex.scheduler.{Context, Task}
|
||||
import docspell.store.records._
|
||||
|
||||
import bitpeace.FileMeta
|
||||
import bitpeace.Mimetype
|
||||
import bitpeace.MimetypeHint
|
||||
import bitpeace.RangeDef
|
||||
import io.circe.generic.semiauto._
|
||||
import io.circe.{Decoder, Encoder}
|
||||
|
||||
/** Converts the given attachment file using ocrmypdf if it is a pdf
  * and has not already been converted (the source file is the same as
  * in the attachment).
  */
object PdfConvTask {

  /** Arguments of this task.
    *
    * @param attachId id of the attachment whose file should be converted
    */
  case class Args(attachId: Ident)
  object Args {
    implicit val jsonDecoder: Decoder[Args] =
      deriveDecoder[Args]
    implicit val jsonEncoder: Encoder[Args] =
      deriveEncoder[Args]
  }

  /** Name under which this task is registered. */
  val taskName = Ident.unsafe("pdf-files-migration")

  /** Creates the task: it checks the inputs and, only if they yield a
    * file meta, runs the conversion on that file.
    */
  def apply[F[_]: Sync: ContextShift](cfg: Config): Task[F, Args, Unit] =
    Task { ctx =>
      for {
        _    <- ctx.logger.info(s"Converting pdf file ${ctx.args} using ocrmypdf")
        meta <- checkInputs(cfg, ctx)
        _    <- meta.traverse(fm => convert(cfg, ctx, fm))
      } yield ()
    }

  /** Task to run on cancellation; it only logs a warning. */
  def onCancel[F[_]: Sync]: Task[F, Args, Unit] =
    Task.log(_.warn("Cancelling pdfconv task"))

  // --- Helper

  /** Decides whether the attachment should be converted.
    *
    * The file meta is returned only if ocrmypdf is enabled in the
    * config, the attachment exists, its file is still the same as the
    * file of its source (as reported by `RAttachmentSource.isSameFile`)
    * and the mimetype of the file matches `application/pdf`. In every
    * other case the reason is logged and `None` is returned.
    */
  def checkInputs[F[_]: Sync](cfg: Config, ctx: Context[F, Args]): F[Option[FileMeta]] = {
    val skip: F[Option[FileMeta]] = Option.empty[FileMeta].pure[F]

    // None: the attachment was not found.
    // Some(b): whether attachment and attachment source refer to the same file.
    val sameFileAsSource: F[Option[Boolean]] =
      (for {
        ra <- OptionT(ctx.store.transact(RAttachment.findById(ctx.args.attachId)))
        isSame <- OptionT.liftF(
          ctx.store.transact(RAttachmentSource.isSameFile(ra.id, ra.fileId))
        )
      } yield isSame).value

    val existsPdf =
      for {
        meta <- ctx.store.transact(RAttachment.findMeta(ctx.args.attachId))
        res = meta.filter(_.mimetype.matches(Mimetype.`application/pdf`))
        _ <-
          if (res.isEmpty)
            ctx.logger.info(
              s"The attachment ${ctx.args.attachId} doesn't exist or is no pdf: $meta"
            )
          else ().pure[F]
      } yield res

    if (cfg.convert.ocrmypdf.enabled)
      sameFileAsSource.flatMap {
        case Some(true) => existsPdf
        case Some(false) =>
          ctx.logger.info(
            s"The attachment ${ctx.args.attachId} already has been converted. Skipping."
          ) *> skip
        case None =>
          // a missing attachment must not be reported as "already converted"
          ctx.logger.info(
            s"The attachment ${ctx.args.attachId} doesn't exist. Skipping."
          ) *> skip
      }
    else
      // say why nothing happens instead of skipping silently
      ctx.logger.info(
        s"Conversion via ocrmypdf is disabled. Skipping attachment ${ctx.args.attachId}."
      ) *> skip
  }

  /** Runs ocrmypdf on the data of the given file, using the language
    * obtained from [[getLanguage]].
    *
    * A resulting pdf is stored via [[storeToAttachment]]; for all other
    * conversion results the handler only writes a log message.
    */
  def convert[F[_]: Sync: ContextShift](
      cfg: Config,
      ctx: Context[F, Args],
      in: FileMeta
  ): F[Unit] = {
    val bp = ctx.store.bitpeace
    val data = Stream
      .emit(in)
      .through(bp.fetchData2(RangeDef.all))

    val storeResult: ConversionResult.Handler[F, Unit] =
      Kleisli({
        case ConversionResult.SuccessPdf(file) =>
          storeToAttachment(ctx, in, file)

        case ConversionResult.SuccessPdfTxt(file, _) =>
          // only the pdf is stored, the second component of the result is not used
          storeToAttachment(ctx, in, file)

        case ConversionResult.UnsupportedFormat(mime) =>
          ctx.logger.warn(
            s"Unable to convert '${mime}' file ${ctx.args}: unsupported format."
          )

        case ConversionResult.InputMalformed(mime, reason) =>
          ctx.logger.warn(s"Unable to convert '${mime}' file ${ctx.args}: $reason")

        case ConversionResult.Failure(ex) =>
          ctx.logger.error(ex)(s"Failure converting file ${ctx.args}: ${ex.getMessage}")
      })

    def ocrMyPdf(lang: Language): F[Unit] =
      OcrMyPdf.toPDF[F, Unit](
        cfg.convert.ocrmypdf,
        lang,
        in.chunksize,
        ctx.blocker,
        ctx.logger
      )(data, storeResult)

    for {
      lang <- getLanguage(ctx)
      _    <- ocrMyPdf(lang)
    } yield ()
  }

  /** Looks up the collective of the attachment and returns its
    * language; falls back to `Language.German` if no collective is
    * found.
    */
  def getLanguage[F[_]: Sync](ctx: Context[F, Args]): F[Language] =
    (for {
      coll <- OptionT(ctx.store.transact(RCollective.findByAttachment(ctx.args.attachId)))
      lang = coll.language
    } yield lang).getOrElse(Language.German)

  /** Saves `newFile` through bitpeace under a fresh random id, using
    * the chunk size and (as a hint) the mimetype of `meta`, and then
    * updates the attachment to refer to the new file id.
    */
  def storeToAttachment[F[_]: Sync](
      ctx: Context[F, Args],
      meta: FileMeta,
      newFile: Stream[F, Byte]
  ): F[Unit] = {
    val mimeHint = MimetypeHint.advertised(meta.mimetype.asString)
    for {
      time <- Timestamp.current[F]
      fid  <- Ident.randomId[F]
      _ <-
        ctx.store.bitpeace
          .saveNew(newFile, meta.chunksize, mimeHint, Some(fid.id), time.value)
          .compile
          .lastOrError
      _ <- ctx.store.transact(RAttachment.updateFileId(ctx.args.attachId, fid))
    } yield ()
  }

}
|
@ -1,6 +1,7 @@
|
||||
package docspell.store.records
|
||||
|
||||
import cats.implicits._
|
||||
import fs2.Stream
|
||||
|
||||
import docspell.common._
|
||||
import docspell.store.impl.Implicits._
|
||||
@ -197,4 +198,32 @@ object RAttachment {
|
||||
|
||||
def findItemId(attachId: Ident): ConnectionIO[Option[Ident]] =
|
||||
selectSimple(Seq(itemId), table, id.is(attachId)).query[Ident].option
|
||||
|
||||
/** Streams all attachments whose file is the same as the file of their
  * attachment source and whose file meta has a mimetype that matches
  * `application/pdf%` (compared with `lowerLike`).
  *
  * @param coll if defined, only attachments of items belonging to this
  *   collective are selected; otherwise no collective filter is applied
  * @param chunkSize passed to `streamWithChunkSize` of the query
  */
def findNonConvertedPdf(
    coll: Option[Ident],
    chunkSize: Int
): Stream[ConnectionIO, RAttachment] = {
  // attachment columns, alias "a"
  val aId   = Columns.id.prefix("a")
  val aItem = Columns.itemId.prefix("a")
  val aFile = Columns.fileId.prefix("a")
  // attachment source columns, alias "s"
  val sId   = RAttachmentSource.Columns.id.prefix("s")
  val sFile = RAttachmentSource.Columns.fileId.prefix("s")
  // item columns, alias "i"
  val iId   = RItem.Columns.id.prefix("i")
  val iColl = RItem.Columns.cid.prefix("i")
  // file meta columns, alias "m"
  val mId   = RFileMeta.Columns.id.prefix("m")
  val mType = RFileMeta.Columns.mimetype.prefix("m")

  val pdfPattern = "application/pdf%"

  val joined = table ++ fr"a INNER JOIN" ++
    RAttachmentSource.table ++ fr"s ON" ++ sId.is(aId) ++ fr"INNER JOIN" ++
    RItem.table ++ fr"i ON" ++ iId.is(aItem) ++ fr"INNER JOIN" ++
    RFileMeta.table ++ fr"m ON" ++ aFile.is(mId)

  // conditions shared by both variants of the where clause
  val sameFileAsSource = aFile.is(sFile)
  val isPdf            = mType.lowerLike(pdfPattern)

  val condition =
    coll.fold(and(sameFileAsSource, isPdf)) { cid =>
      and(iColl.is(cid), sameFileAsSource, isPdf)
    }

  selectSimple(all.map(_.prefix("a")), joined, condition)
    .query[RAttachment]
    .streamWithChunkSize(chunkSize)
}
|
||||
}
|
||||
|
@ -109,7 +109,7 @@ object RCollective {
|
||||
RItem.table ++ fr"i ON" ++ cId.is(iColl) ++ fr"INNER JOIN" ++
|
||||
RAttachment.table ++ fr"a ON" ++ aItem.is(iId)
|
||||
|
||||
selectSimple(all, from, aId.is(attachId)).query[RCollective].option
|
||||
selectSimple(all.map(_.prefix("c")), from, aId.is(attachId)).query[RCollective].option
|
||||
}
|
||||
|
||||
/** Pairs a language with a flag named `integrationEnabled`.
  * NOTE(review): presumably the editable settings of a collective — confirm against callers.
  */
case class Settings(language: Language, integrationEnabled: Boolean)
|
||||
|
Loading…
x
Reference in New Issue
Block a user