Add startup task to find page counts of existing files

This commit is contained in:
Eike Kettner 2020-11-09 20:34:34 +01:00
parent de00b46e5d
commit 29455d638c
9 changed files with 234 additions and 11 deletions

View File

@ -8,6 +8,26 @@ import docspell.store.records.RJob
object JobFactory {
/** Builds a low-priority job that determines the page count of a single
  * attachment.
  *
  * When no account is given, the job is filed under the docspell system
  * group and submitted by the docspell system user.
  *
  * @param args    identifies the attachment to inspect
  * @param account the account on whose behalf the job runs, if any
  * @return the new job record; it is only created here, not inserted
  */
def makePageCount[F[_]: Sync](
    args: MakePageCountArgs,
    account: Option[AccountId]
): F[RJob] =
  for {
    jobId     <- Ident.randomId[F]
    submitted <- Timestamp.current[F]
  } yield {
    val group     = account.fold(DocspellSystem.taskGroup)(_.collective)
    val submitter = account.fold(DocspellSystem.user)(_.user)
    // The tracker combines task name and attachment id.
    val tracker = MakePageCountArgs.taskName / args.attachment
    RJob.newJob(
      jobId,
      MakePageCountArgs.taskName,
      group,
      args,
      s"Find page-count metadata for ${args.attachment.id}",
      submitted,
      submitter,
      Priority.Low,
      Some(tracker)
    )
  }
def makePreview[F[_]: Sync](
args: MakePreviewArgs,
account: Option[AccountId]

View File

@ -2,8 +2,9 @@ package docspell.common
/** Well-known identifiers used for work that docspell submits itself,
  * i.e. jobs that are not created on behalf of a user account.
  *
  * NOTE(review): this excerpt looks like a diff hunk that shows both the
  * previous and the updated `val` lines, which is why `user`, `taskGroup`,
  * `migrationTaskTracker` and `allPreviewTaskTracker` appear twice —
  * confirm against the actual file.
  */
object DocspellSystem {
// Presumably the pre-change side of the hunk (see note above).
val user = Ident.unsafe("docspell-system")
val taskGroup = user
val migrationTaskTracker = Ident.unsafe("full-text-index-tracker")
val allPreviewTaskTracker = Ident.unsafe("generate-all-previews")
// Presumably the post-change side: same values, realigned, plus the new
// tracker id for the "all page counts" job.
val user = Ident.unsafe("docspell-system")
// The system task group is the same identifier as the system user.
val taskGroup = user
val migrationTaskTracker = Ident.unsafe("full-text-index-tracker")
val allPreviewTaskTracker = Ident.unsafe("generate-all-previews")
val allPageCountTaskTracker = Ident.unsafe("all-page-count-tracker")
}

View File

@ -0,0 +1,24 @@
package docspell.common
import io.circe.generic.semiauto._
import io.circe.{Decoder, Encoder}
/** Job arguments for the `MakePageCountTask`.
  *
  * The task reads the number of pages of the given attachment and stores
  * it into the meta data of that attachment.
  *
  * @param attachment id of the attachment to inspect
  */
case class MakePageCountArgs(attachment: Ident)
/** Task name and JSON codecs for [[MakePageCountArgs]]. */
object MakePageCountArgs {

  /** Name under which the page-count task is registered. */
  val taskName = Ident.unsafe("make-page-count")

  // The derivation target is taken from the declared type of each val.
  implicit val jsonDecoder: Decoder[MakePageCountArgs] = deriveDecoder

  implicit val jsonEncoder: Encoder[MakePageCountArgs] = deriveEncoder
}

View File

@ -16,6 +16,7 @@ import docspell.joex.fts.{MigrationTask, ReIndexTask}
import docspell.joex.hk._
import docspell.joex.learn.LearnClassifierTask
import docspell.joex.notify._
import docspell.joex.pagecount._
import docspell.joex.pdfconv.ConvertAllPdfTask
import docspell.joex.pdfconv.PdfConvTask
import docspell.joex.preview._
@ -72,7 +73,8 @@ final class JoexAppImpl[F[_]: ConcurrentEffect: ContextShift: Timer](
MigrationTask.job.flatMap(queue.insertIfNew) *>
AllPreviewsTask
.job(MakePreviewArgs.StoreMode.WhenMissing, None)
.flatMap(queue.insertIfNew)
.flatMap(queue.insertIfNew) *>
AllPageCountTask.job.flatMap(queue.insertIfNew)
}
object JoexAppImpl {
@ -185,6 +187,20 @@ object JoexAppImpl {
AllPreviewsTask.onCancel[F]
)
)
.withTask(
JobTask.json(
MakePageCountArgs.taskName,
MakePageCountTask[F](),
MakePageCountTask.onCancel[F]
)
)
.withTask(
JobTask.json(
AllPageCountTask.taskName,
AllPageCountTask[F](queue, joex),
AllPageCountTask.onCancel[F]
)
)
.resource
psch <- PeriodicScheduler.create(
cfg.periodicScheduler,

View File

@ -0,0 +1,75 @@
package docspell.joex.pagecount
import cats.effect._
import cats.implicits._
import fs2.{Chunk, Stream}
import docspell.backend.JobFactory
import docspell.backend.ops.OJoex
import docspell.common._
import docspell.joex.scheduler.Context
import docspell.joex.scheduler.Task
import docspell.store.queue.JobQueue
import docspell.store.records.RAttachment
import docspell.store.records.RJob
/** Task that finds attachments without a stored page count and submits
  * one `make-page-count` job for each of them.
  */
object AllPageCountTask {

  /** Name under which this task is registered. */
  val taskName = Ident.unsafe("all-page-count")

  /** This task takes no arguments. */
  type Args = Unit

  /** Creates the task: submits the page-count jobs, logs how many were
    * submitted and then notifies all job executor nodes.
    *
    * @param queue the queue the created jobs are inserted into
    * @param joex  used to notify all nodes after the jobs are submitted
    */
  def apply[F[_]: Sync](queue: JobQueue[F], joex: OJoex[F]): Task[F, Args, Unit] =
    Task { ctx =>
      for {
        // Previously logged "Generating previews…", copied from the
        // previews task.
        _ <- ctx.logger.info("Finding page counts for attachments")
        n <- submitConversionJobs(ctx, queue)
        _ <- ctx.logger.info(s"Submitted $n jobs")
        _ <- joex.notifyAllNodes
      } yield ()
    }

  /** Task to run when this task is cancelled; only logs a warning. */
  def onCancel[F[_]: Sync]: Task[F, Args, Unit] =
    Task.log(_.warn("Cancelling all-page-count task"))

  /** Streams all attachments without a page count, creates a job for each
    * and inserts the jobs chunk-wise into the queue (only those that are
    * new).
    *
    * @return the total number of jobs handed to the queue
    */
  def submitConversionJobs[F[_]: Sync](
      ctx: Context[F, Args],
      queue: JobQueue[F]
  ): F[Int] =
    ctx.store
      .transact(findAttachments)
      .chunks
      .flatMap(createJobs[F])
      .chunks
      .evalMap(jobs => queue.insertAllIfNew(jobs.toVector).map(_ => jobs.size))
      .evalTap(n => ctx.logger.debug(s"Submitted $n jobs …"))
      .compile
      .foldMonoid

  /** Attachments lacking a page count, fetched in chunks of 50. */
  private def findAttachments =
    RAttachment.findAllWithoutPageCount(50)

  /** Turns a chunk of attachments into a stream of page-count jobs. */
  private def createJobs[F[_]: Sync](ras: Chunk[RAttachment]): Stream[F, RJob] = {
    def mkJob(ra: RAttachment): F[RJob] =
      JobFactory.makePageCount(MakePageCountArgs(ra.id), None)

    val jobs = ras.traverse(mkJob)
    Stream.evalUnChunk(jobs)
  }

  /** The job record that triggers this task. It carries a fixed tracker
    * id, so it can be inserted with `insertIfNew` semantics.
    */
  def job[F[_]: Sync]: F[RJob] =
    for {
      id  <- Ident.randomId[F]
      now <- Timestamp.current[F]
    } yield RJob.newJob(
      id,
      AllPageCountTask.taskName,
      DocspellSystem.taskGroup,
      (),
      "Create all page-counts",
      now,
      // Submitter: the system user (same identifier as the task group).
      DocspellSystem.user,
      Priority.Low,
      Some(DocspellSystem.allPageCountTaskTracker)
    )
}

View File

@ -0,0 +1,55 @@
package docspell.joex.pagecount
import cats.effect._
import cats.implicits._
import docspell.common._
import docspell.joex.process.AttachmentPageCount
import docspell.joex.scheduler.Context
import docspell.joex.scheduler.Task
import docspell.store.records.RAttachment
import docspell.store.records.RAttachmentMeta
/** Task that determines the page count of one attachment and stores it,
  * unless a positive page count is already recorded for it.
  */
object MakePageCountTask {

  type Args = MakePageCountArgs

  /** Creates the task. It first checks for an existing page count and
    * only reads the file if none is present.
    */
  def apply[F[_]: Sync](): Task[F, Args, Unit] =
    Task { ctx =>
      pageCountExists(ctx).ifM(
        ctx.logger.info(
          s"PageCount already exists for attachment ${ctx.args.attachment}. Skipping."
        ),
        ctx.logger.info(
          s"Reading page-count for attachment ${ctx.args.attachment}"
        ) *> generatePageCount(ctx)
      )
    }

  /** Task to run on cancellation; only logs a warning. */
  def onCancel[F[_]: Sync]: Task[F, Args, Unit] =
    Task.log(_.warn("Cancelling make-page-count task"))

  /** Loads the attachment and delegates to `AttachmentPageCount`; logs a
    * warning when the attachment cannot be found.
    */
  private def generatePageCount[F[_]: Sync](
      ctx: Context[F, Args]
  ): F[Unit] =
    ctx.store.transact(RAttachment.findById(ctx.args.attachment)).flatMap {
      case Some(attach) =>
        // The resulting meta data is not needed here.
        AttachmentPageCount.createPageCount(ctx)(attach).void
      case None =>
        ctx.logger.warn(s"No attachment found with id: ${ctx.args.attachment}")
    }

  /** True if the attachment's meta data holds a page count greater than 0. */
  private def pageCountExists[F[_]: Sync](ctx: Context[F, Args]): F[Boolean] = {
    val lookup = RAttachmentMeta.findById(ctx.args.attachment)
    ctx.store
      .transact(lookup)
      .map(meta => meta.flatMap(_.pages).exists(_ > 0))
  }
}

View File

@ -50,14 +50,16 @@ object AttachmentPageCount {
case MimeType.PdfMatch(_) =>
PdfboxExtract.getMetaData(loadFile(ctx)(ra)).flatMap {
case Right(md) =>
updatePageCount(ctx, md, ra).map(_.some)
ctx.logger.debug(s"Found number of pages: ${md.pageCount}") *>
updatePageCount(ctx, md, ra).map(_.some)
case Left(ex) =>
ctx.logger.warn(s"Error obtaining pages count: ${ex.getMessage}") *>
(None: Option[PdfMetaData]).pure[F]
}
case _ =>
(None: Option[PdfMetaData]).pure[F]
case mt =>
ctx.logger.warn(s"Not a pdf file, but ${mt.asString}, cannot get page count.") *>
(None: Option[PdfMetaData]).pure[F]
}
private def updatePageCount[F[_]: Sync](
@ -65,8 +67,23 @@ object AttachmentPageCount {
md: PdfMetaData,
ra: RAttachment
): F[PdfMetaData] =
ctx.store.transact(RAttachmentMeta.updatePageCount(ra.id, md.pageCount.some)) *> md
.pure[F]
for {
_ <- ctx.logger.debug(
s"Update attachment ${ra.id.id} with page count ${md.pageCount.some}"
)
n <- ctx.store.transact(RAttachmentMeta.updatePageCount(ra.id, md.pageCount.some))
m <-
if (n == 0)
ctx.logger.warn(
s"No attachmentmeta record exists for ${ra.id.id}. Creating new."
) *> ctx.store.transact(
RAttachmentMeta.insert(
RAttachmentMeta(ra.id, None, Nil, MetaProposalList.empty, md.pageCount.some)
)
)
else 0.pure[F]
_ <- ctx.logger.debug(s"Stored page count (${n + m}).")
} yield md
def findMime[F[_]: Functor](ctx: Context[F, _])(ra: RAttachment): F[MimeType] =
OptionT(ctx.store.transact(RFileMeta.findById(ra.fileId)))

View File

@ -15,6 +15,7 @@ import docspell.common.syntax.all._
import docspell.ftsclient.FtsResult
import docspell.restapi.model._
import docspell.restserver.conv.Conversions._
import docspell.store.queries.QItem
import docspell.store.records._
import docspell.store.{AddResult, UpdateResult}
@ -22,7 +23,6 @@ import bitpeace.FileMeta
import org.http4s.headers.`Content-Type`
import org.http4s.multipart.Multipart
import org.log4s.Logger
import docspell.store.queries.QItem
trait Conversions {

View File

@ -255,6 +255,21 @@ object RAttachment {
}
}
/** Streams all attachments whose joined attachment-meta row has no page
  * count. Because of the left outer join this includes attachments that
  * have no meta row at all.
  *
  * @param chunkSize chunk size passed to the streaming query
  */
def findAllWithoutPageCount(chunkSize: Int): Stream[ConnectionIO, RAttachment] = {
  // Select every attachment column, qualified with the table alias "a".
  val attachCols = all.map(_.prefix("a"))

  // a.id = m.id
  val sameId = Columns.id.prefix("a").is(RAttachmentMeta.Columns.id.prefix("m"))
  val from = table ++ fr"a LEFT OUTER JOIN" ++
    RAttachmentMeta.table ++ fr"m ON" ++ sameId

  val pagesMissing = RAttachmentMeta.Columns.pages.prefix("m").isNull

  selectSimple(attachCols, from, pagesMissing)
    .query[RAttachment]
    .streamWithChunkSize(chunkSize)
}
def findWithoutPreview(
coll: Option[Ident],
chunkSize: Int