diff --git a/modules/backend/src/main/scala/docspell/backend/JobFactory.scala b/modules/backend/src/main/scala/docspell/backend/JobFactory.scala
index fdb0d860..56ac2566 100644
--- a/modules/backend/src/main/scala/docspell/backend/JobFactory.scala
+++ b/modules/backend/src/main/scala/docspell/backend/JobFactory.scala
@@ -8,6 +8,29 @@ import docspell.store.records.RJob
 
 object JobFactory {
 
+  /** Creates a low-priority job that determines the page count of one
+    * attachment. Without an account, the docspell system group and user are used.
+    */
+  def makePageCount[F[_]: Sync](
+      args: MakePageCountArgs,
+      account: Option[AccountId]
+  ): F[RJob] =
+    for {
+      id  <- Ident.randomId[F]
+      now <- Timestamp.current[F]
+      job = RJob.newJob(
+        id,
+        MakePageCountArgs.taskName,
+        account.map(_.collective).getOrElse(DocspellSystem.taskGroup),
+        args,
+        s"Find page-count metadata for ${args.attachment.id}",
+        now,
+        account.map(_.user).getOrElse(DocspellSystem.user),
+        Priority.Low,
+        Some(MakePageCountArgs.taskName / args.attachment)
+      )
+    } yield job
+
   def makePreview[F[_]: Sync](
       args: MakePreviewArgs,
       account: Option[AccountId]
diff --git a/modules/common/src/main/scala/docspell/common/DocspellSystem.scala b/modules/common/src/main/scala/docspell/common/DocspellSystem.scala
index ad410281..def2ade2 100644
--- a/modules/common/src/main/scala/docspell/common/DocspellSystem.scala
+++ b/modules/common/src/main/scala/docspell/common/DocspellSystem.scala
@@ -2,8 +2,9 @@ package docspell.common
 
 object DocspellSystem {
 
-  val user                  = Ident.unsafe("docspell-system")
-  val taskGroup             = user
-  val migrationTaskTracker  = Ident.unsafe("full-text-index-tracker")
-  val allPreviewTaskTracker = Ident.unsafe("generate-all-previews")
+  val user                    = Ident.unsafe("docspell-system")
+  val taskGroup               = user
+  val migrationTaskTracker    = Ident.unsafe("full-text-index-tracker")
+  val allPreviewTaskTracker   = Ident.unsafe("generate-all-previews")
+  val allPageCountTaskTracker = Ident.unsafe("all-page-count-tracker")
 }
diff --git a/modules/common/src/main/scala/docspell/common/MakePageCountArgs.scala b/modules/common/src/main/scala/docspell/common/MakePageCountArgs.scala
new file mode 100644
index 00000000..ed955213
--- /dev/null
+++ b/modules/common/src/main/scala/docspell/common/MakePageCountArgs.scala
@@ -0,0 +1,24 @@
+package docspell.common
+
+import io.circe.generic.semiauto._
+import io.circe.{Decoder, Encoder}
+
+/** Arguments for the `MakePageCountTask` that reads the number of
+  * pages for an attachment and stores it into the meta data of the
+  * attachment.
+  */
+case class MakePageCountArgs(
+    attachment: Ident
+)
+
+object MakePageCountArgs {
+
+  val taskName = Ident.unsafe("make-page-count")
+
+  implicit val jsonEncoder: Encoder[MakePageCountArgs] =
+    deriveEncoder[MakePageCountArgs]
+
+  implicit val jsonDecoder: Decoder[MakePageCountArgs] =
+    deriveDecoder[MakePageCountArgs]
+
+}
diff --git a/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala b/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala
index 2b9b96c5..51fed2bc 100644
--- a/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala
+++ b/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala
@@ -16,6 +16,7 @@ import docspell.joex.fts.{MigrationTask, ReIndexTask}
 import docspell.joex.hk._
 import docspell.joex.learn.LearnClassifierTask
 import docspell.joex.notify._
+import docspell.joex.pagecount._
 import docspell.joex.pdfconv.ConvertAllPdfTask
 import docspell.joex.pdfconv.PdfConvTask
 import docspell.joex.preview._
@@ -72,7 +73,8 @@ final class JoexAppImpl[F[_]: ConcurrentEffect: ContextShift: Timer](
     MigrationTask.job.flatMap(queue.insertIfNew) *>
       AllPreviewsTask
         .job(MakePreviewArgs.StoreMode.WhenMissing, None)
-        .flatMap(queue.insertIfNew)
+        .flatMap(queue.insertIfNew) *>
+      AllPageCountTask.job.flatMap(queue.insertIfNew)
 }
 
 object JoexAppImpl {
@@ -185,6 +187,20 @@ object JoexAppImpl {
           AllPreviewsTask.onCancel[F]
         )
       )
+      .withTask(
+        JobTask.json(
+          MakePageCountArgs.taskName,
+          MakePageCountTask[F](),
+          MakePageCountTask.onCancel[F]
+        )
+      )
+      .withTask(
+        JobTask.json(
+          AllPageCountTask.taskName,
+          AllPageCountTask[F](queue, joex),
+          AllPageCountTask.onCancel[F]
+        )
+      )
       .resource
     psch <- PeriodicScheduler.create(
       cfg.periodicScheduler,
diff --git a/modules/joex/src/main/scala/docspell/joex/pagecount/AllPageCountTask.scala b/modules/joex/src/main/scala/docspell/joex/pagecount/AllPageCountTask.scala
new file mode 100644
index 00000000..43a93146
--- /dev/null
+++ b/modules/joex/src/main/scala/docspell/joex/pagecount/AllPageCountTask.scala
@@ -0,0 +1,84 @@
+package docspell.joex.pagecount
+
+import cats.effect._
+import cats.implicits._
+import fs2.{Chunk, Stream}
+
+import docspell.backend.JobFactory
+import docspell.backend.ops.OJoex
+import docspell.common._
+import docspell.joex.scheduler.Context
+import docspell.joex.scheduler.Task
+import docspell.store.queue.JobQueue
+import docspell.store.records.RAttachment
+import docspell.store.records.RJob
+
+/** Task that finds all attachments without a stored page count and
+  * submits one page-count job (see `JobFactory.makePageCount`) for each.
+  */
+object AllPageCountTask {
+
+  val taskName = Ident.unsafe("all-page-count")
+  type Args    = Unit
+
+  /** Submits the page-count jobs and then notifies all joex nodes. */
+  def apply[F[_]: Sync](queue: JobQueue[F], joex: OJoex[F]): Task[F, Args, Unit] =
+    Task { ctx =>
+      for {
+        _ <- ctx.logger.info("Determining page count for attachments")
+        n <- submitConversionJobs(ctx, queue)
+        _ <- ctx.logger.info(s"Submitted $n jobs")
+        _ <- joex.notifyAllNodes
+      } yield ()
+    }
+
+  /** Only logs a warning when this task is cancelled. */
+  def onCancel[F[_]: Sync]: Task[F, Args, Unit] =
+    Task.log(_.warn("Cancelling all-page-count task"))
+
+  /** Streams the attachments lacking a page count, hands one job per
+    * attachment to the queue and returns the number of jobs handed over.
+    */
+  def submitConversionJobs[F[_]: Sync](
+      ctx: Context[F, Args],
+      queue: JobQueue[F]
+  ): F[Int] =
+    ctx.store
+      .transact(findAttachments)
+      .chunks
+      .flatMap(createJobs[F])
+      .chunks
+      .evalMap(jobs => queue.insertAllIfNew(jobs.toVector).map(_ => jobs.size))
+      .evalTap(n => ctx.logger.debug(s"Submitted $n jobs …"))
+      .compile
+      .foldMonoid
+
+  private def findAttachments[F[_]] =
+    RAttachment.findAllWithoutPageCount(50)
+
+  private def createJobs[F[_]: Sync](ras: Chunk[RAttachment]): Stream[F, RJob] = {
+    def mkJob(ra: RAttachment): F[RJob] =
+      JobFactory.makePageCount(MakePageCountArgs(ra.id), None)
+
+    val jobs = ras.traverse(mkJob)
+    Stream.evalUnChunk(jobs)
+  }
+
+  /** Creates the low-priority system job that runs this task. */
+  def job[F[_]: Sync]: F[RJob] =
+    for {
+      id  <- Ident.randomId[F]
+      now <- Timestamp.current[F]
+    } yield RJob.newJob(
+      id,
+      AllPageCountTask.taskName,
+      DocspellSystem.taskGroup,
+      (),
+      "Create all page-counts",
+      now,
+      DocspellSystem.user,
+      Priority.Low,
+      Some(DocspellSystem.allPageCountTaskTracker)
+    )
+
+}
diff --git a/modules/joex/src/main/scala/docspell/joex/pagecount/PageCountTask.scala b/modules/joex/src/main/scala/docspell/joex/pagecount/PageCountTask.scala
new file mode 100644
index 00000000..7e97186c
--- /dev/null
+++ b/modules/joex/src/main/scala/docspell/joex/pagecount/PageCountTask.scala
@@ -0,0 +1,62 @@
+package docspell.joex.pagecount
+
+import cats.effect._
+import cats.implicits._
+
+import docspell.common._
+import docspell.joex.process.AttachmentPageCount
+import docspell.joex.scheduler.Context
+import docspell.joex.scheduler.Task
+import docspell.store.records.RAttachment
+import docspell.store.records.RAttachmentMeta
+
+/** Task that reads the page count of a single attachment via
+  * `AttachmentPageCount.createPageCount`, unless a count is already stored.
+  */
+object MakePageCountTask {
+
+  type Args = MakePageCountArgs
+
+  /** Skips the attachment if a positive page count is already stored. */
+  def apply[F[_]: Sync](): Task[F, Args, Unit] =
+    Task { ctx =>
+      for {
+        exists <- pageCountExists(ctx)
+        _ <-
+          if (exists)
+            ctx.logger.info(
+              s"PageCount already exists for attachment ${ctx.args.attachment}. Skipping."
+            )
+          else
+            ctx.logger.info(
+              s"Reading page-count for attachment ${ctx.args.attachment}"
+            ) *> generatePageCount(ctx)
+      } yield ()
+    }
+
+  /** Only logs a warning when this task is cancelled. */
+  def onCancel[F[_]: Sync]: Task[F, Args, Unit] =
+    Task.log(_.warn("Cancelling make-page-count task"))
+
+  /** Looks up the attachment and creates its page count; warns if it is missing. */
+  private def generatePageCount[F[_]: Sync](
+      ctx: Context[F, Args]
+  ): F[Unit] =
+    for {
+      ra <- ctx.store.transact(RAttachment.findById(ctx.args.attachment))
+      _ <- ra
+        .map(AttachmentPageCount.createPageCount(ctx))
+        .getOrElse(
+          ctx.logger.warn(s"No attachment found with id: ${ctx.args.attachment}")
+        )
+    } yield ()
+
+  /** True if the attachment's meta data holds a page count greater than zero. */
+  private def pageCountExists[F[_]: Sync](ctx: Context[F, Args]): F[Boolean] =
+    ctx.store.transact(
+      RAttachmentMeta
+        .findById(ctx.args.attachment)
+        .map(_.flatMap(_.pages).exists(_ > 0))
+    )
+
+}
diff --git a/modules/joex/src/main/scala/docspell/joex/process/AttachmentPageCount.scala b/modules/joex/src/main/scala/docspell/joex/process/AttachmentPageCount.scala
index c1dbe7e4..f3cf7b0e 100644
--- a/modules/joex/src/main/scala/docspell/joex/process/AttachmentPageCount.scala
+++ b/modules/joex/src/main/scala/docspell/joex/process/AttachmentPageCount.scala
@@ -50,14 +50,16 @@ object AttachmentPageCount {
         case MimeType.PdfMatch(_) =>
           PdfboxExtract.getMetaData(loadFile(ctx)(ra)).flatMap {
             case Right(md) =>
-              updatePageCount(ctx, md, ra).map(_.some)
+              ctx.logger.debug(s"Found number of pages: ${md.pageCount}") *>
+                updatePageCount(ctx, md, ra).map(_.some)
             case Left(ex) =>
               ctx.logger.warn(s"Error obtaining pages count: ${ex.getMessage}") *>
                 (None: Option[PdfMetaData]).pure[F]
           }
 
-        case _ =>
-          (None: Option[PdfMetaData]).pure[F]
+        case mt =>
+          ctx.logger.warn(s"Not a pdf file, but ${mt.asString}, cannot get page count.") *>
+            (None: Option[PdfMetaData]).pure[F]
       }
 
   private def updatePageCount[F[_]: Sync](
@@ -65,8 +67,23 @@ object AttachmentPageCount {
       md: PdfMetaData,
       ra: RAttachment
   ): F[PdfMetaData] =
-    ctx.store.transact(RAttachmentMeta.updatePageCount(ra.id, md.pageCount.some)) *> md
-      .pure[F]
+    for {
+      _ <- ctx.logger.debug(
+        s"Update attachment ${ra.id.id} with page count ${md.pageCount.some}"
+      )
+      n <- ctx.store.transact(RAttachmentMeta.updatePageCount(ra.id, md.pageCount.some))
+      m <-
+        if (n == 0)
+          ctx.logger.warn(
+            s"No attachmentmeta record exists for ${ra.id.id}. Creating new."
+          ) *> ctx.store.transact(
+            RAttachmentMeta.insert(
+              RAttachmentMeta(ra.id, None, Nil, MetaProposalList.empty, md.pageCount.some)
+            )
+          )
+        else 0.pure[F]
+      _ <- ctx.logger.debug(s"Stored page count (${n + m}).")
+    } yield md
 
   def findMime[F[_]: Functor](ctx: Context[F, _])(ra: RAttachment): F[MimeType] =
     OptionT(ctx.store.transact(RFileMeta.findById(ra.fileId)))
diff --git a/modules/restserver/src/main/scala/docspell/restserver/conv/Conversions.scala b/modules/restserver/src/main/scala/docspell/restserver/conv/Conversions.scala
index fc0ccb75..aba61555 100644
--- a/modules/restserver/src/main/scala/docspell/restserver/conv/Conversions.scala
+++ b/modules/restserver/src/main/scala/docspell/restserver/conv/Conversions.scala
@@ -15,6 +15,7 @@ import docspell.common.syntax.all._
 import docspell.ftsclient.FtsResult
 import docspell.restapi.model._
 import docspell.restserver.conv.Conversions._
+import docspell.store.queries.QItem
 import docspell.store.records._
 import docspell.store.{AddResult, UpdateResult}
 
@@ -22,7 +23,6 @@ import bitpeace.FileMeta
 import org.http4s.headers.`Content-Type`
 import org.http4s.multipart.Multipart
 import org.log4s.Logger
-import docspell.store.queries.QItem
 
 trait Conversions {
 
diff --git a/modules/store/src/main/scala/docspell/store/records/RAttachment.scala b/modules/store/src/main/scala/docspell/store/records/RAttachment.scala
index fa1453b6..b23c4146 100644
--- a/modules/store/src/main/scala/docspell/store/records/RAttachment.scala
+++ b/modules/store/src/main/scala/docspell/store/records/RAttachment.scala
@@ -255,6 +255,24 @@ object RAttachment {
     }
   }
 
+  /** Streams all attachments whose meta data has no page count; the left
+    * outer join also yields attachments without any meta data record.
+    */
+  def findAllWithoutPageCount(chunkSize: Int): Stream[ConnectionIO, RAttachment] = {
+    val aId    = Columns.id.prefix("a")
+    val mId    = RAttachmentMeta.Columns.id.prefix("m")
+    val mPages = RAttachmentMeta.Columns.pages.prefix("m")
+
+    val cols = all.map(_.prefix("a"))
+    val join = table ++ fr"a LEFT OUTER JOIN" ++
+      RAttachmentMeta.table ++ fr"m ON" ++ aId.is(mId)
+    val cond = mPages.isNull
+
+    selectSimple(cols, join, cond)
+      .query[RAttachment]
+      .streamWithChunkSize(chunkSize)
+  }
+
   def findWithoutPreview(
       coll: Option[Ident],
       chunkSize: Int