Add api docs and cleanup

This commit is contained in:
Eike Kettner 2020-08-13 20:52:43 +02:00
parent 081c4da903
commit 3986487f11
11 changed files with 155 additions and 73 deletions

View File

@ -61,7 +61,7 @@ object BackendApp {
uploadImpl <- OUpload(store, queue, cfg.files, joexImpl) uploadImpl <- OUpload(store, queue, cfg.files, joexImpl)
nodeImpl <- ONode(store) nodeImpl <- ONode(store)
jobImpl <- OJob(store, joexImpl) jobImpl <- OJob(store, joexImpl)
itemImpl <- OItem(store, ftsClient) itemImpl <- OItem(store, ftsClient, queue, joexImpl)
itemSearchImpl <- OItemSearch(store) itemSearchImpl <- OItemSearch(store)
fulltextImpl <- OFulltext(itemSearchImpl, ftsClient, store, queue, joexImpl) fulltextImpl <- OFulltext(itemSearchImpl, ftsClient, store, queue, joexImpl)
javaEmil = javaEmil =

View File

@ -25,15 +25,16 @@ object JobFactory {
now, now,
account.user, account.user,
prio, prio,
None collective
.map(c => c / ConvertAllPdfArgs.taskName)
.orElse(ConvertAllPdfArgs.taskName.some)
) )
} yield job } yield job
def reprocessItem[F[_]: Sync]( def reprocessItem[F[_]: Sync](
args: ReProcessItemArgs, args: ReProcessItemArgs,
account: AccountId, account: AccountId,
prio: Priority, prio: Priority
tracker: Option[Ident]
): F[RJob] = ): F[RJob] =
for { for {
id <- Ident.randomId[F] id <- Ident.randomId[F]
@ -47,7 +48,7 @@ object JobFactory {
now, now,
account.user, account.user,
prio, prio,
tracker Some(ReProcessItemArgs.taskName / args.itemId)
) )
} yield job } yield job

View File

@ -4,10 +4,12 @@ import cats.data.OptionT
import cats.effect.{Effect, Resource} import cats.effect.{Effect, Resource}
import cats.implicits._ import cats.implicits._
import docspell.backend.JobFactory
import docspell.common._ import docspell.common._
import docspell.ftsclient.FtsClient import docspell.ftsclient.FtsClient
import docspell.store.UpdateResult import docspell.store.UpdateResult
import docspell.store.queries.{QAttachment, QItem} import docspell.store.queries.{QAttachment, QItem}
import docspell.store.queue.JobQueue
import docspell.store.records._ import docspell.store.records._
import docspell.store.{AddResult, Store} import docspell.store.{AddResult, Store}
@ -76,11 +78,38 @@ trait OItem[F[_]] {
name: Option[String], name: Option[String],
collective: Ident collective: Ident
): F[AddResult] ): F[AddResult]
/** Submits the item for re-processing. The list of attachment ids can
* be used to only re-process a subset of the item's attachments.
* If this list is empty, all attachments are reprocessed. This
* call only submits the job into the queue.
*/
def reprocess(
item: Ident,
attachments: List[Ident],
account: AccountId,
notifyJoex: Boolean
): F[UpdateResult]
/** Submits a task that finds all non-converted pdfs and triggers
* converting them using ocrmypdf. Each file is converted by a
* separate task.
*/
def convertAllPdf(
collective: Option[Ident],
account: AccountId,
notifyJoex: Boolean
): F[UpdateResult]
} }
object OItem { object OItem {
def apply[F[_]: Effect](store: Store[F], fts: FtsClient[F]): Resource[F, OItem[F]] = def apply[F[_]: Effect](
store: Store[F],
fts: FtsClient[F],
queue: JobQueue[F],
joex: OJoex[F]
): Resource[F, OItem[F]] =
for { for {
otag <- OTag(store) otag <- OTag(store)
oorg <- OOrganization(store) oorg <- OOrganization(store)
@ -400,6 +429,35 @@ object OItem {
) )
) )
def reprocess(
item: Ident,
attachments: List[Ident],
account: AccountId,
notifyJoex: Boolean
): F[UpdateResult] =
(for {
_ <- OptionT(
store.transact(RItem.findByIdAndCollective(item, account.collective))
)
args = ReProcessItemArgs(item, attachments)
job <- OptionT.liftF(
JobFactory.reprocessItem[F](args, account, Priority.Low)
)
_ <- OptionT.liftF(queue.insertIfNew(job))
_ <- OptionT.liftF(if (notifyJoex) joex.notifyAllNodes else ().pure[F])
} yield UpdateResult.success).getOrElse(UpdateResult.notFound)
def convertAllPdf(
collective: Option[Ident],
account: AccountId,
notifyJoex: Boolean
): F[UpdateResult] =
for {
job <- JobFactory.convertAllPdfs[F](collective, account, Priority.Low)
_ <- queue.insertIfNew(job)
_ <- if (notifyJoex) joex.notifyAllNodes else ().pure[F]
} yield UpdateResult.success
private def onSuccessIgnoreError(update: F[Unit])(ar: AddResult): F[Unit] = private def onSuccessIgnoreError(update: F[Unit])(ar: AddResult): F[Unit] =
ar match { ar match {
case AddResult.Success => case AddResult.Success =>

View File

@ -44,24 +44,6 @@ trait OUpload[F[_]] {
case Left(srcId) => case Left(srcId) =>
submit(data, srcId, notifyJoex, itemId) submit(data, srcId, notifyJoex, itemId)
} }
/** Submits the item for re-processing. The list of attachment ids can
* be used to only re-process a subset of the item's attachments.
* If this list is empty, all attachments are reprocessed. This
* call only submits the job into the queue.
*/
def reprocess(
item: Ident,
attachments: List[Ident],
account: AccountId,
notifyJoex: Boolean
): F[OUpload.UploadResult]
def convertAllPdf(
collective: Option[Ident],
account: AccountId,
notifyJoex: Boolean
): F[OUpload.UploadResult]
} }
object OUpload { object OUpload {
@ -177,31 +159,6 @@ object OUpload {
result <- OptionT.liftF(submit(updata, accId, notifyJoex, itemId)) result <- OptionT.liftF(submit(updata, accId, notifyJoex, itemId))
} yield result).getOrElse(UploadResult.noSource) } yield result).getOrElse(UploadResult.noSource)
def reprocess(
item: Ident,
attachments: List[Ident],
account: AccountId,
notifyJoex: Boolean
): F[UploadResult] =
(for {
_ <-
OptionT(store.transact(RItem.findByIdAndCollective(item, account.collective)))
args = ReProcessItemArgs(item, attachments)
job <-
OptionT.liftF(JobFactory.reprocessItem[F](args, account, Priority.Low, None))
res <- OptionT.liftF(submitJobs(notifyJoex)(Vector(job)))
} yield res).getOrElse(UploadResult.noItem)
def convertAllPdf(
collective: Option[Ident],
account: AccountId,
notifyJoex: Boolean
): F[OUpload.UploadResult] =
for {
job <- JobFactory.convertAllPdfs(collective, account, Priority.Low)
res <- submitJobs(notifyJoex)(Vector(job))
} yield res
private def submitJobs( private def submitJobs(
notifyJoex: Boolean notifyJoex: Boolean
)(jobs: Vector[RJob]): F[OUpload.UploadResult] = )(jobs: Vector[RJob]): F[OUpload.UploadResult] =

View File

@ -3,12 +3,24 @@ package docspell.common
import io.circe._ import io.circe._
import io.circe.generic.semiauto._ import io.circe.generic.semiauto._
/** Arguments for the task that finds all pdf files that have not been
* converted and submits for each a job that will convert the file
* using ocrmypdf.
*
* If the `collective` argument is present, then this task and the
* ones that are submitted by this task run in the realm of the
* collective (and only their files are considered). If it is empty,
* it is a system task and all files are considered.
*/
case class ConvertAllPdfArgs(collective: Option[Ident]) case class ConvertAllPdfArgs(collective: Option[Ident])
object ConvertAllPdfArgs { object ConvertAllPdfArgs {
val taskName = Ident.unsafe("submit-pdf-migration-tasks") val taskName = Ident.unsafe("submit-pdf-migration-tasks")
implicit val jsonDecoder: Decoder[ConvertAllPdfArgs] = implicit val jsonDecoder: Decoder[ConvertAllPdfArgs] =
deriveDecoder[ConvertAllPdfArgs] deriveDecoder[ConvertAllPdfArgs]
implicit val jsonEncoder: Encoder[ConvertAllPdfArgs] = implicit val jsonEncoder: Encoder[ConvertAllPdfArgs] =
deriveEncoder[ConvertAllPdfArgs] deriveEncoder[ConvertAllPdfArgs]
} }

View File

@ -87,7 +87,7 @@ object JoexAppImpl {
joex <- OJoex(client, store) joex <- OJoex(client, store)
upload <- OUpload(store, queue, cfg.files, joex) upload <- OUpload(store, queue, cfg.files, joex)
fts <- createFtsClient(cfg)(httpClient) fts <- createFtsClient(cfg)(httpClient)
itemOps <- OItem(store, fts) itemOps <- OItem(store, fts, queue, joex)
javaEmil = javaEmil =
JavaMailEmil(blocker, Settings.defaultSettings.copy(debug = cfg.mailDebug)) JavaMailEmil(blocker, Settings.defaultSettings.copy(debug = cfg.mailDebug))
sch <- SchedulerBuilder(cfg.scheduler, blocker, store) sch <- SchedulerBuilder(cfg.scheduler, blocker, store)

View File

@ -11,15 +11,19 @@ import docspell.store.queue.JobQueue
import docspell.store.records.RAttachment import docspell.store.records.RAttachment
import docspell.store.records._ import docspell.store.records._
/* A task to find all non-converted pdf files (of a collective, or
* all) and converting them using ocrmypdf by submitting a job for
* each found file.
*/
object ConvertAllPdfTask { object ConvertAllPdfTask {
type Args = ConvertAllPdfArgs type Args = ConvertAllPdfArgs
def apply[F[_]: Sync](queue: JobQueue[F], joex: OJoex[F]): Task[F, Args, Unit] = def apply[F[_]: Sync](queue: JobQueue[F], joex: OJoex[F]): Task[F, Args, Unit] =
Task { ctx => Task { ctx =>
for { for {
_ <- ctx.logger.info("Converting older pdfs using ocrmypdf") _ <- ctx.logger.info("Converting pdfs using ocrmypdf")
n <- submitConversionJobs(ctx, queue) n <- submitConversionJobs(ctx, queue)
_ <- ctx.logger.info(s"Submitted $n jobs for file conversion") _ <- ctx.logger.info(s"Submitted $n file conversion jobs")
_ <- joex.notifyAllNodes _ <- joex.notifyAllNodes
} yield () } yield ()
} }
@ -36,7 +40,7 @@ object ConvertAllPdfTask {
.chunks .chunks
.flatMap(createJobs[F](ctx)) .flatMap(createJobs[F](ctx))
.chunks .chunks
.evalMap(jobs => queue.insertAll(jobs.toVector).map(_ => jobs.size)) .evalMap(jobs => queue.insertAllIfNew(jobs.toVector).map(_ => jobs.size))
.evalTap(n => ctx.logger.debug(s"Submitted $n jobs …")) .evalTap(n => ctx.logger.debug(s"Submitted $n jobs …"))
.compile .compile
.foldMonoid .foldMonoid
@ -59,7 +63,7 @@ object ConvertAllPdfTask {
now, now,
collectiveOrSystem, collectiveOrSystem,
Priority.Low, Priority.Low,
Some(ra.id) Some(PdfConvTask.taskName / ra.id)
) )
val jobs = ras.traverse(mkJob) val jobs = ras.traverse(mkJob)

View File

@ -17,12 +17,7 @@ object ProcessItem {
)(item: ItemData): Task[F, ProcessItemArgs, ItemData] = )(item: ItemData): Task[F, ProcessItemArgs, ItemData] =
ExtractArchive(item) ExtractArchive(item)
.flatMap(Task.setProgress(20)) .flatMap(Task.setProgress(20))
.flatMap(ConvertPdf(cfg.convert, _)) .flatMap(processAttachments0(cfg, fts, (40, 60, 80)))
.flatMap(Task.setProgress(40))
.flatMap(TextExtraction(cfg.extraction, fts))
.flatMap(Task.setProgress(60))
.flatMap(analysisOnly[F](cfg))
.flatMap(Task.setProgress(80))
.flatMap(LinkProposal[F]) .flatMap(LinkProposal[F])
.flatMap(SetGivenData[F](itemOps)) .flatMap(SetGivenData[F](itemOps))
.flatMap(Task.setProgress(99)) .flatMap(Task.setProgress(99))
@ -31,12 +26,7 @@ object ProcessItem {
cfg: Config, cfg: Config,
fts: FtsClient[F] fts: FtsClient[F]
)(item: ItemData): Task[F, ProcessItemArgs, ItemData] = )(item: ItemData): Task[F, ProcessItemArgs, ItemData] =
ConvertPdf(cfg.convert, item) processAttachments0[F](cfg, fts, (30, 60, 90))(item)
.flatMap(Task.setProgress(30))
.flatMap(TextExtraction(cfg.extraction, fts))
.flatMap(Task.setProgress(60))
.flatMap(analysisOnly[F](cfg))
.flatMap(Task.setProgress(90))
def analysisOnly[F[_]: Sync]( def analysisOnly[F[_]: Sync](
cfg: Config cfg: Config
@ -45,4 +35,16 @@ object ProcessItem {
.flatMap(FindProposal[F](cfg.processing)) .flatMap(FindProposal[F](cfg.processing))
.flatMap(EvalProposals[F]) .flatMap(EvalProposals[F])
.flatMap(SaveProposals[F]) .flatMap(SaveProposals[F])
private def processAttachments0[F[_]: ConcurrentEffect: ContextShift](
cfg: Config,
fts: FtsClient[F],
progress: (Int, Int, Int)
)(item: ItemData): Task[F, ProcessItemArgs, ItemData] =
ConvertPdf(cfg.convert, item)
.flatMap(Task.setProgress(progress._1))
.flatMap(TextExtraction(cfg.extraction, fts))
.flatMap(Task.setProgress(progress._2))
.flatMap(analysisOnly[F](cfg))
.flatMap(Task.setProgress(progress._3))
} }

View File

@ -1213,6 +1213,33 @@ paths:
schema: schema:
$ref: "#/components/schemas/BasicResult" $ref: "#/components/schemas/BasicResult"
/sec/item/convertallpdfs:
post:
tags: [ Item ]
summary: Convert all non-converted pdfs.
description: |
Submits a job that will find all pdf files that have not been
converted and converts them using the ocrmypdf tool (if
enabled). This tool has been added in version 0.9.0 and so
older files can be "migrated" this way, or maybe after
enabling the tool.
The task finds all files of the current collective and submits
task for each file to convert. These tasks are submitted with
a low priority so that normal processing can still proceed.
The body of the request should be empty.
security:
- authTokenHeader: []
responses:
200:
description: Ok
content:
application/json:
schema:
$ref: "#/components/schemas/BasicResult"
/sec/item/search: /sec/item/search:
post: post:
tags: [ Item ] tags: [ Item ]
@ -1811,7 +1838,7 @@ paths:
content: content:
application/json: application/json:
schema: schema:
$ref: "#/components/schemas/StringList" $ref: "#/components/schemas/IdList"
responses: responses:
200: 200:
description: Ok description: Ok
@ -2629,6 +2656,17 @@ paths:
components: components:
schemas: schemas:
IdList:
description:
A list of identifiers.
required:
- ids
properties:
ids:
type: array
items:
type: string
format: ident
StringList: StringList:
description: | description: |
A simple list of strings. A simple list of strings.

View File

@ -34,8 +34,8 @@ object ItemRoutes {
case POST -> Root / "convertallpdfs" => case POST -> Root / "convertallpdfs" =>
for { for {
res <- res <-
backend.upload.convertAllPdf(user.account.collective.some, user.account, true) backend.item.convertAllPdf(user.account.collective.some, user.account, true)
resp <- Ok(Conversions.basicResult(res)) resp <- Ok(Conversions.basicResult(res, "Task submitted"))
} yield resp } yield resp
case req @ POST -> Root / "search" => case req @ POST -> Root / "search" =>
@ -288,11 +288,11 @@ object ItemRoutes {
case req @ POST -> Root / Ident(id) / "reprocess" => case req @ POST -> Root / Ident(id) / "reprocess" =>
for { for {
data <- req.as[StringList] data <- req.as[IdList]
ids = data.items.flatMap(s => Ident.fromString(s).toOption) ids = data.ids.flatMap(s => Ident.fromString(s).toOption)
_ <- logger.fdebug(s"Re-process item ${id.id}") _ <- logger.fdebug(s"Re-process item ${id.id}")
res <- backend.upload.reprocess(id, ids, user.account, true) res <- backend.item.reprocess(id, ids, user.account, true)
resp <- Ok(Conversions.basicResult(res)) resp <- Ok(Conversions.basicResult(res, "Re-process task submitted."))
} yield resp } yield resp
case DELETE -> Root / Ident(id) => case DELETE -> Root / Ident(id) =>

View File

@ -28,6 +28,8 @@ trait JobQueue[F[_]] {
def insertAll(jobs: Seq[RJob]): F[Unit] def insertAll(jobs: Seq[RJob]): F[Unit]
def insertAllIfNew(jobs: Seq[RJob]): F[Unit]
def nextJob( def nextJob(
prio: Ident => F[Priority], prio: Ident => F[Priority],
worker: Ident, worker: Ident,
@ -81,5 +83,13 @@ object JobQueue {
logger.error(ex)("Could not insert job. Skipping it.") logger.error(ex)("Could not insert job. Skipping it.")
}) })
def insertAllIfNew(jobs: Seq[RJob]): F[Unit] =
jobs.toList
.traverse(j => insertIfNew(j).attempt)
.map(_.foreach {
case Right(()) =>
case Left(ex) =>
logger.error(ex)("Could not insert job. Skipping it.")
})
}) })
} }