From 709848244c7306d46f2bed0cc53069814107c4a6 Mon Sep 17 00:00:00 2001 From: Eike Kettner <eike.kettner@posteo.de> Date: Sun, 8 Nov 2020 23:46:02 +0100 Subject: [PATCH] Create tasks to generate all previews There is a task to generate preview images per attachment. It can either add them (if not present yet) or overwrite them (e.g. some config has changed). There is a task that selects all attachments without previews and submits a task for each one to create its preview. This is submitted on start automatically to generate previews for all existing attachments. --- .../docspell/common/AllPreviewsArgs.scala | 26 ++++++ .../docspell/common/DocspellSystem.scala | 8 +- .../docspell/common/MakePreviewArgs.scala | 53 ++++++++++++ .../scala/docspell/joex/JoexAppImpl.scala | 20 ++++- .../joex/preview/AllPreviewsTask.scala | 86 +++++++++++++++++++ .../joex/preview/MakePreviewTask.scala | 57 ++++++++++++ .../joex/process/AttachmentPreview.scala | 10 ++- .../docspell/store/queries/QAttachment.scala | 16 ++++ .../docspell/store/records/RAttachment.scala | 32 +++++++ 9 files changed, 299 insertions(+), 9 deletions(-) create mode 100644 modules/common/src/main/scala/docspell/common/AllPreviewsArgs.scala create mode 100644 modules/common/src/main/scala/docspell/common/MakePreviewArgs.scala create mode 100644 modules/joex/src/main/scala/docspell/joex/preview/AllPreviewsTask.scala create mode 100644 modules/joex/src/main/scala/docspell/joex/preview/MakePreviewTask.scala diff --git a/modules/common/src/main/scala/docspell/common/AllPreviewsArgs.scala b/modules/common/src/main/scala/docspell/common/AllPreviewsArgs.scala new file mode 100644 index 00000000..b4ee054f --- /dev/null +++ b/modules/common/src/main/scala/docspell/common/AllPreviewsArgs.scala @@ -0,0 +1,26 @@ +package docspell.common + +import io.circe.generic.semiauto._ +import io.circe.{Decoder, Encoder} + +/** Arguments for the `AllPreviewsTask` that submits tasks to + * generate a preview image for attachments. 
+ * + * It can replace the current preview image or only generate one, if + * it is missing. If no collective is specified, it considers all + * attachments. + */ +case class AllPreviewsArgs( + collective: Option[Ident], + storeMode: MakePreviewArgs.StoreMode +) + +object AllPreviewsArgs { + + val taskName = Ident.unsafe("all-previews") + + implicit val jsonEncoder: Encoder[AllPreviewsArgs] = + deriveEncoder[AllPreviewsArgs] + implicit val jsonDecoder: Decoder[AllPreviewsArgs] = + deriveDecoder[AllPreviewsArgs] +} diff --git a/modules/common/src/main/scala/docspell/common/DocspellSystem.scala b/modules/common/src/main/scala/docspell/common/DocspellSystem.scala index 52cbb717..ad410281 100644 --- a/modules/common/src/main/scala/docspell/common/DocspellSystem.scala +++ b/modules/common/src/main/scala/docspell/common/DocspellSystem.scala @@ -2,8 +2,8 @@ package docspell.common object DocspellSystem { - val user = Ident.unsafe("docspell-system") - val taskGroup = user - val migrationTaskTracker = Ident.unsafe("full-text-index-tracker") - + val user = Ident.unsafe("docspell-system") + val taskGroup = user + val migrationTaskTracker = Ident.unsafe("full-text-index-tracker") + val allPreviewTaskTracker = Ident.unsafe("generate-all-previews") } diff --git a/modules/common/src/main/scala/docspell/common/MakePreviewArgs.scala b/modules/common/src/main/scala/docspell/common/MakePreviewArgs.scala new file mode 100644 index 00000000..711c3fea --- /dev/null +++ b/modules/common/src/main/scala/docspell/common/MakePreviewArgs.scala @@ -0,0 +1,53 @@ +package docspell.common + +import io.circe.generic.semiauto._ +import io.circe.{Decoder, Encoder} + +/** Arguments for the `MakePreviewTask` that generates a preview image + * for an attachment. + * + * It can replace the current preview image or only generate one, if + * it is missing. 
+ */ +case class MakePreviewArgs( + attachment: Ident, + store: MakePreviewArgs.StoreMode +) + +object MakePreviewArgs { + + val taskName = Ident.unsafe("make-preview") + + sealed trait StoreMode extends Product { + final def name: String = + productPrefix.toLowerCase() + } + object StoreMode { + + /** Replace any preview file that may already exist. */ + case object Replace extends StoreMode + + /** Only create a preview image, if it is missing. */ + case object WhenMissing extends StoreMode + + def fromString(str: String): Either[String, StoreMode] = + Option(str).map(_.trim.toLowerCase()) match { + case Some("replace") => Right(Replace) + case Some("whenmissing") => Right(WhenMissing) + case _ => Left(s"Invalid store mode: $str") + } + + implicit val jsonEncoder: Encoder[StoreMode] = + Encoder.encodeString.contramap(_.name) + + implicit val jsonDecoder: Decoder[StoreMode] = + Decoder.decodeString.emap(fromString) + } + + implicit val jsonEncoder: Encoder[MakePreviewArgs] = + deriveEncoder[MakePreviewArgs] + + implicit val jsonDecoder: Decoder[MakePreviewArgs] = + deriveDecoder[MakePreviewArgs] + +} diff --git a/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala b/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala index 7c3f57fc..4362d93a 100644 --- a/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala +++ b/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala @@ -18,6 +18,7 @@ import docspell.joex.learn.LearnClassifierTask import docspell.joex.notify._ import docspell.joex.pdfconv.ConvertAllPdfTask import docspell.joex.pdfconv.PdfConvTask +import docspell.joex.preview._ import docspell.joex.process.ItemHandler import docspell.joex.process.ReProcessItem import docspell.joex.scanmailbox._ @@ -68,7 +69,10 @@ final class JoexAppImpl[F[_]: ConcurrentEffect: ContextShift: Timer]( HouseKeepingTask .periodicTask[F](cfg.houseKeeping.schedule) .flatMap(pstore.insert) *> - MigrationTask.job.flatMap(queue.insertIfNew) + 
MigrationTask.job.flatMap(queue.insertIfNew) *> + AllPreviewsTask + .job(MakePreviewArgs.StoreMode.WhenMissing, None) + .flatMap(queue.insertIfNew) } object JoexAppImpl { @@ -167,6 +171,20 @@ object JoexAppImpl { LearnClassifierTask.onCancel[F] ) ) + .withTask( + JobTask.json( + MakePreviewArgs.taskName, + MakePreviewTask[F](cfg.convert), + MakePreviewTask.onCancel[F] + ) + ) + .withTask( + JobTask.json( + AllPreviewsArgs.taskName, + AllPreviewsTask[F](queue, joex), + AllPreviewsTask.onCancel[F] + ) + ) .resource psch <- PeriodicScheduler.create( cfg.periodicScheduler, diff --git a/modules/joex/src/main/scala/docspell/joex/preview/AllPreviewsTask.scala b/modules/joex/src/main/scala/docspell/joex/preview/AllPreviewsTask.scala new file mode 100644 index 00000000..31e6d636 --- /dev/null +++ b/modules/joex/src/main/scala/docspell/joex/preview/AllPreviewsTask.scala @@ -0,0 +1,86 @@ +package docspell.joex.preview + +import fs2.{Chunk, Stream} +import docspell.common._ +import cats.effect._ +import cats.implicits._ +import docspell.store.queue.JobQueue +import docspell.backend.ops.OJoex +import docspell.joex.scheduler.Task +import docspell.joex.scheduler.Context +import docspell.store.records.RAttachment +import docspell.store.records.RJob + +object AllPreviewsTask { + + type Args = AllPreviewsArgs + + def apply[F[_]: Sync](queue: JobQueue[F], joex: OJoex[F]): Task[F, Args, Unit] = + Task { ctx => + for { + _ <- ctx.logger.info("Generating previews for attachments") + n <- submitConversionJobs(ctx, queue) + _ <- ctx.logger.info(s"Submitted $n jobs") + _ <- joex.notifyAllNodes + } yield () + } + + def onCancel[F[_]: Sync]: Task[F, Args, Unit] = + Task.log(_.warn("Cancelling all-previews task")) + + def submitConversionJobs[F[_]: Sync]( + ctx: Context[F, Args], + queue: JobQueue[F] + ): F[Int] = + ctx.store + .transact(RAttachment.findWithoutPreview(ctx.args.collective, 50)) + .chunks + .flatMap(createJobs[F](ctx)) + .chunks + .evalMap(jobs => 
queue.insertAllIfNew(jobs.toVector).map(_ => jobs.size)) + .evalTap(n => ctx.logger.debug(s"Submitted $n jobs …")) + .compile + .foldMonoid + + private def createJobs[F[_]: Sync]( + ctx: Context[F, Args] + )(ras: Chunk[RAttachment]): Stream[F, RJob] = { + val collectiveOrSystem = ctx.args.collective.getOrElse(DocspellSystem.taskGroup) + + def mkJob(ra: RAttachment): F[RJob] = + for { + id <- Ident.randomId[F] + now <- Timestamp.current[F] + } yield RJob.newJob( + id, + MakePreviewArgs.taskName, + collectiveOrSystem, + MakePreviewArgs(ra.id, ctx.args.storeMode), + s"Create preview ${ra.id.id}/${ra.name.getOrElse("-")}", + now, + collectiveOrSystem, + Priority.Low, + Some(MakePreviewArgs.taskName / ra.id) + ) + + val jobs = ras.traverse(mkJob) + Stream.evalUnChunk(jobs) + } + + def job[F[_]: Sync](storeMode: MakePreviewArgs.StoreMode, cid: Option[Ident]): F[RJob] = + for { + id <- Ident.randomId[F] + now <- Timestamp.current[F] + } yield RJob.newJob( + id, + AllPreviewsArgs.taskName, + cid.getOrElse(DocspellSystem.taskGroup), + AllPreviewsArgs(cid, storeMode), + "Create preview images", + now, + DocspellSystem.taskGroup, + Priority.Low, + Some(DocspellSystem.allPreviewTaskTracker) + ) + +} diff --git a/modules/joex/src/main/scala/docspell/joex/preview/MakePreviewTask.scala b/modules/joex/src/main/scala/docspell/joex/preview/MakePreviewTask.scala new file mode 100644 index 00000000..9da04e33 --- /dev/null +++ b/modules/joex/src/main/scala/docspell/joex/preview/MakePreviewTask.scala @@ -0,0 +1,57 @@ +package docspell.joex.preview + +import cats.implicits._ +import cats.effect._ +import docspell.common._ +import docspell.joex.scheduler.Task +import docspell.store.records.RAttachmentPreview +import docspell.joex.scheduler.Context +import docspell.joex.process.AttachmentPreview +import docspell.convert.ConvertConfig +import docspell.extract.pdfbox.PdfboxPreview +import docspell.store.records.RAttachment + +object MakePreviewTask { + + type Args = MakePreviewArgs + + def 
apply[F[_]: Sync](cfg: ConvertConfig): Task[F, Args, Unit] = + Task { ctx => + for { + exists <- previewExists(ctx) + preview <- PdfboxPreview(30) + _ <- + if (exists) + ctx.logger.info( + s"Preview already exists for attachment ${ctx.args.attachment}. Skipping." + ) + else + ctx.logger.info( + s"Generating preview image for attachment ${ctx.args.attachment}" + ) *> generatePreview(ctx, preview, cfg) + } yield () + } + + def onCancel[F[_]: Sync]: Task[F, Args, Unit] = + Task.log(_.warn("Cancelling make-preview task")) + + private def generatePreview[F[_]: Sync]( + ctx: Context[F, Args], + preview: PdfboxPreview[F], + cfg: ConvertConfig + ): F[Unit] = + for { + ra <- ctx.store.transact(RAttachment.findById(ctx.args.attachment)) + _ <- ra + .map(AttachmentPreview.createPreview(ctx, preview, cfg.chunkSize)) + .getOrElse(().pure[F]) + } yield () + + private def previewExists[F[_]: Sync](ctx: Context[F, Args]): F[Boolean] = + if (ctx.args.store == MakePreviewArgs.StoreMode.WhenMissing) + ctx.store.transact( + RAttachmentPreview.findById(ctx.args.attachment).map(_.isDefined) + ) + else + false.pure[F] +} diff --git a/modules/joex/src/main/scala/docspell/joex/process/AttachmentPreview.scala b/modules/joex/src/main/scala/docspell/joex/process/AttachmentPreview.scala index d18c270d..26db6b03 100644 --- a/modules/joex/src/main/scala/docspell/joex/process/AttachmentPreview.scala +++ b/modules/joex/src/main/scala/docspell/joex/process/AttachmentPreview.scala @@ -15,6 +15,7 @@ import docspell.store.records._ import docspell.store.syntax.MimeTypes._ import bitpeace.{Mimetype, MimetypeHint, RangeDef} +import docspell.store.queries.QAttachment /** Goes through all attachments that must be already converted into a * pdf. 
If it is a pdf, the first page is converted into a small @@ -31,14 +32,14 @@ object AttachmentPreview { s"Creating preview images for ${item.attachments.size} files…" ) preview <- PdfboxPreview(24) - _ <- item.attachments.traverse(createPreview(ctx, preview, cfg)) + _ <- item.attachments.traverse(createPreview(ctx, preview, cfg.chunkSize)) } yield item } def createPreview[F[_]: Sync]( ctx: Context[F, _], preview: PdfboxPreview[F], - cfg: ConvertConfig + chunkSize: Int )( ra: RAttachment ): F[Option[RAttachmentPreview]] = @@ -46,7 +47,7 @@ object AttachmentPreview { case MimeType.PdfMatch(_) => preview.previewPNG(loadFile(ctx)(ra)).flatMap { case Some(out) => - createRecord(ctx, out, ra, cfg.chunkSize).map(_.some) + createRecord(ctx, out, ra, chunkSize).map(_.some) case None => (None: Option[RAttachmentPreview]).pure[F] } @@ -55,7 +56,7 @@ object AttachmentPreview { (None: Option[RAttachmentPreview]).pure[F] } - def createRecord[F[_]: Sync]( + private def createRecord[F[_]: Sync]( ctx: Context[F, _], png: Stream[F, Byte], ra: RAttachment, @@ -75,6 +76,7 @@ object AttachmentPreview { .lastOrError now <- Timestamp.current[F] rp = RAttachmentPreview(ra.id, Ident.unsafe(fileMeta.id), name.map(_.fullName), now) + _ <- QAttachment.deletePreview(ctx.store)(ra.id) _ <- ctx.store.transact(RAttachmentPreview.insert(rp)) } yield rp } diff --git a/modules/store/src/main/scala/docspell/store/queries/QAttachment.scala b/modules/store/src/main/scala/docspell/store/queries/QAttachment.scala index ca5260aa..9fbe7401 100644 --- a/modules/store/src/main/scala/docspell/store/queries/QAttachment.scala +++ b/modules/store/src/main/scala/docspell/store/queries/QAttachment.scala @@ -17,6 +17,22 @@ import doobie.implicits._ object QAttachment { private[this] val logger = org.log4s.getLogger + def deletePreview[F[_]: Sync](store: Store[F])(attachId: Ident): F[Int] = { + val findPreview = + for { + rp <- RAttachmentPreview.findById(attachId) + } yield rp.toSeq + + Stream + 
.evalSeq(store.transact(findPreview)) + .map(_.fileId.id) + .flatMap(store.bitpeace.delete) + .map(flag => if (flag) 1 else 0) + .evalMap(_ => store.transact(RAttachmentPreview.delete(attachId))) + .compile + .foldMonoid + } + /** Deletes an attachment, its related source and meta data records. * It will only delete an related archive file, if this is the last * attachment in that archive. diff --git a/modules/store/src/main/scala/docspell/store/records/RAttachment.scala b/modules/store/src/main/scala/docspell/store/records/RAttachment.scala index 4e8d3d40..8be0fdb6 100644 --- a/modules/store/src/main/scala/docspell/store/records/RAttachment.scala +++ b/modules/store/src/main/scala/docspell/store/records/RAttachment.scala @@ -231,6 +231,38 @@ object RAttachment { def findItemId(attachId: Ident): ConnectionIO[Option[Ident]] = selectSimple(Seq(itemId), table, id.is(attachId)).query[Ident].option + def findWithoutPreview( + coll: Option[Ident], + chunkSize: Int + ): Stream[ConnectionIO, RAttachment] = { + val aId = Columns.id.prefix("a") + val aItem = Columns.itemId.prefix("a") + val pId = RAttachmentPreview.Columns.id.prefix("p") + val iId = RItem.Columns.id.prefix("i") + val iColl = RItem.Columns.cid.prefix("i") + + val cols = all.map(_.prefix("a")) + val baseJoin = + table ++ fr"a LEFT OUTER JOIN" ++ + RAttachmentPreview.table ++ fr"p ON" ++ pId.is(aId) + + val baseCond = + Seq(pId.isNull) + + coll match { + case Some(cid) => + val join = baseJoin ++ fr"INNER JOIN" ++ RItem.table ++ fr"i ON" ++ iId.is(aItem) + val cond = and(baseCond ++ Seq(iColl.is(cid))) + selectSimple(cols, join, cond) + .query[RAttachment] + .streamWithChunkSize(chunkSize) + case None => + selectSimple(cols, baseJoin, and(baseCond)) + .query[RAttachment] + .streamWithChunkSize(chunkSize) + } + } + def findNonConvertedPdf( coll: Option[Ident], chunkSize: Int