Merge pull request #228 from eikek/reprocess-item-files

Reprocess item files
mergify[bot]
2020-08-13 20:25:22 +00:00
committed by GitHub
26 changed files with 882 additions and 33 deletions

View File

@@ -61,7 +61,7 @@ object BackendApp {
       uploadImpl <- OUpload(store, queue, cfg.files, joexImpl)
       nodeImpl <- ONode(store)
       jobImpl <- OJob(store, joexImpl)
-      itemImpl <- OItem(store, ftsClient)
+      itemImpl <- OItem(store, ftsClient, queue, joexImpl)
       itemSearchImpl <- OItemSearch(store)
       fulltextImpl <- OFulltext(itemSearchImpl, ftsClient, store, queue, joexImpl)
       javaEmil =

View File

@@ -8,6 +8,50 @@ import docspell.store.records.RJob
 object JobFactory {

+  def convertAllPdfs[F[_]: Sync](
+      collective: Option[Ident],
+      account: AccountId,
+      prio: Priority
+  ): F[RJob] =
+    for {
+      id  <- Ident.randomId[F]
+      now <- Timestamp.current[F]
+      job = RJob.newJob(
+        id,
+        ConvertAllPdfArgs.taskName,
+        account.collective,
+        ConvertAllPdfArgs(collective),
+        s"Convert all pdfs not yet converted",
+        now,
+        account.user,
+        prio,
+        collective
+          .map(c => c / ConvertAllPdfArgs.taskName)
+          .orElse(ConvertAllPdfArgs.taskName.some)
+      )
+    } yield job
+
+  def reprocessItem[F[_]: Sync](
+      args: ReProcessItemArgs,
+      account: AccountId,
+      prio: Priority
+  ): F[RJob] =
+    for {
+      id  <- Ident.randomId[F]
+      now <- Timestamp.current[F]
+      job = RJob.newJob(
+        id,
+        ReProcessItemArgs.taskName,
+        account.collective,
+        args,
+        s"Re-process files of item ${args.itemId.id}",
+        now,
+        account.user,
+        prio,
+        Some(ReProcessItemArgs.taskName / args.itemId)
+      )
+    } yield job
+
   def processItem[F[_]: Sync](
       args: ProcessItemArgs,
       account: AccountId,

View File

@@ -4,10 +4,12 @@ import cats.data.OptionT
 import cats.effect.{Effect, Resource}
 import cats.implicits._

+import docspell.backend.JobFactory
 import docspell.common._
 import docspell.ftsclient.FtsClient
 import docspell.store.UpdateResult
 import docspell.store.queries.{QAttachment, QItem}
+import docspell.store.queue.JobQueue
 import docspell.store.records._
 import docspell.store.{AddResult, Store}
@@ -76,11 +78,38 @@
       name: Option[String],
       collective: Ident
   ): F[AddResult]

+  /** Submits the item for re-processing. The list of attachment ids
+    * can be used to re-process only a subset of the item's
+    * attachments. If the list is empty, all attachments are
+    * re-processed. This call only submits the job into the queue.
+    */
+  def reprocess(
+      item: Ident,
+      attachments: List[Ident],
+      account: AccountId,
+      notifyJoex: Boolean
+  ): F[UpdateResult]
+
+  /** Submits a task that finds all non-converted pdfs and submits a
+    * separate task for each of them to convert the file using
+    * ocrmypdf.
+    */
+  def convertAllPdf(
+      collective: Option[Ident],
+      account: AccountId,
+      notifyJoex: Boolean
+  ): F[UpdateResult]
 }

 object OItem {
-  def apply[F[_]: Effect](store: Store[F], fts: FtsClient[F]): Resource[F, OItem[F]] =
+  def apply[F[_]: Effect](
+      store: Store[F],
+      fts: FtsClient[F],
+      queue: JobQueue[F],
+      joex: OJoex[F]
+  ): Resource[F, OItem[F]] =
     for {
       otag <- OTag(store)
       oorg <- OOrganization(store)
@@ -400,6 +429,35 @@
           )
         )

+      def reprocess(
+          item: Ident,
+          attachments: List[Ident],
+          account: AccountId,
+          notifyJoex: Boolean
+      ): F[UpdateResult] =
+        (for {
+          _ <- OptionT(
+            store.transact(RItem.findByIdAndCollective(item, account.collective))
+          )
+          args = ReProcessItemArgs(item, attachments)
+          job <- OptionT.liftF(
+            JobFactory.reprocessItem[F](args, account, Priority.Low)
+          )
+          _ <- OptionT.liftF(queue.insertIfNew(job))
+          _ <- OptionT.liftF(if (notifyJoex) joex.notifyAllNodes else ().pure[F])
+        } yield UpdateResult.success).getOrElse(UpdateResult.notFound)
+
+      def convertAllPdf(
+          collective: Option[Ident],
+          account: AccountId,
+          notifyJoex: Boolean
+      ): F[UpdateResult] =
+        for {
+          job <- JobFactory.convertAllPdfs[F](collective, account, Priority.Low)
+          _ <- queue.insertIfNew(job)
+          _ <- if (notifyJoex) joex.notifyAllNodes else ().pure[F]
+        } yield UpdateResult.success
+
       private def onSuccessIgnoreError(update: F[Unit])(ar: AddResult): F[Unit] =
         ar match {
           case AddResult.Success =>
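Both new operations are submit-only: they enqueue a job (deduplicated by `insertIfNew` via the tracker ident built in `JobFactory`) and optionally wake up the job executor nodes. A minimal sketch of how a caller might use them, assuming a wired `OItem[F]` as produced by `OItem.apply` above and an authenticated account (names here are illustrative, not from the patch):

``` scala
import cats.effect.Sync
import cats.implicits._
import docspell.backend.ops.OItem
import docspell.common._

// Sketch only; `itemOps`, `account` and `itemId` come from the
// surrounding application wiring.
def resubmit[F[_]: Sync](itemOps: OItem[F], account: AccountId, itemId: Ident): F[Unit] =
  for {
    // an empty attachment list means: re-process all attachments of the item
    res1 <- itemOps.reprocess(itemId, Nil, account, true)
    // passing None instead of Some(...) would submit a system-wide task
    res2 <- itemOps.convertAllPdf(Some(account.collective), account, true)
  } yield () // res1/res2 are UpdateResult values (success or notFound)
```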

View File

@@ -48,7 +48,9 @@ object OJob {
       def queueState(collective: Ident, maxResults: Int): F[CollectiveQueueState] =
         store
-          .transact(QJob.queueStateSnapshot(collective).take(maxResults.toLong))
+          .transact(
+            QJob.queueStateSnapshot(collective, maxResults.toLong)
+          )
           .map(t => JobDetail(t._1, t._2))
           .compile
           .toVector

View File

@@ -0,0 +1,26 @@
+package docspell.common
+
+import io.circe._
+import io.circe.generic.semiauto._
+
+/** Arguments for the task that finds all pdf files that have not
+  * been converted and submits, for each one, a job that will convert
+  * the file using ocrmypdf.
+  *
+  * If the `collective` argument is present, this task and the tasks
+  * it submits run in the realm of that collective (and only its
+  * files are considered). If it is empty, it is a system task and
+  * all files are considered.
+  */
+case class ConvertAllPdfArgs(collective: Option[Ident])
+
+object ConvertAllPdfArgs {
+
+  val taskName = Ident.unsafe("submit-pdf-migration-tasks")
+
+  implicit val jsonDecoder: Decoder[ConvertAllPdfArgs] =
+    deriveDecoder[ConvertAllPdfArgs]
+
+  implicit val jsonEncoder: Encoder[ConvertAllPdfArgs] =
+    deriveEncoder[ConvertAllPdfArgs]
+}
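For illustration (not part of the patch), the derived codecs serialize the args as a one-field JSON object; this assumes `Ident` encodes as its plain string value, as it does elsewhere in docspell:

``` scala
import io.circe.syntax._
import docspell.common._

// hypothetical collective id, for illustration only
ConvertAllPdfArgs(Some(Ident.unsafe("my-collective"))).asJson.noSpaces
// => {"collective":"my-collective"}

ConvertAllPdfArgs(None).asJson.noSpaces
// => {"collective":null}   (the system-task variant)
```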

View File

@@ -0,0 +1,24 @@
+package docspell.common
+
+import io.circe.generic.semiauto._
+import io.circe.{Decoder, Encoder}
+
+/** Arguments when re-processing an item.
+  *
+  * The `itemId` must exist and point to some item. If the attachment
+  * list is non-empty, only those attachments are re-processed; they
+  * must belong to the given item. If the list is empty, all
+  * attachments are re-processed.
+  */
+case class ReProcessItemArgs(itemId: Ident, attachments: List[Ident])
+
+object ReProcessItemArgs {
+
+  val taskName: Ident = Ident.unsafe("re-process-item")
+
+  implicit val jsonEncoder: Encoder[ReProcessItemArgs] =
+    deriveEncoder[ReProcessItemArgs]
+
+  implicit val jsonDecoder: Decoder[ReProcessItemArgs] =
+    deriveDecoder[ReProcessItemArgs]
+}
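Worth noting (a sketch, not in the patch): `JobFactory.reprocessItem` above combines this task name with the item id into the job's tracker ident, which is what lets `insertIfNew` skip a duplicate submission while a job for the same item is still pending:

``` scala
import docspell.common._

// hypothetical item id, for illustration only
val args = ReProcessItemArgs(Ident.unsafe("item-123"), Nil) // Nil = all attachments

// the tracker ident the job is deduplicated on (see JobFactory.reprocessItem)
val tracker: Ident = ReProcessItemArgs.taskName / args.itemId
```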

View File

@@ -13,7 +13,10 @@ import docspell.ftssolr.SolrFtsClient
 import docspell.joex.fts.{MigrationTask, ReIndexTask}
 import docspell.joex.hk._
 import docspell.joex.notify._
+import docspell.joex.pdfconv.ConvertAllPdfTask
+import docspell.joex.pdfconv.PdfConvTask
 import docspell.joex.process.ItemHandler
+import docspell.joex.process.ReProcessItem
 import docspell.joex.scanmailbox._
 import docspell.joex.scheduler._
 import docspell.joexapi.client.JoexClient
@@ -84,7 +87,7 @@ object JoexAppImpl {
       joex <- OJoex(client, store)
       upload <- OUpload(store, queue, cfg.files, joex)
       fts <- createFtsClient(cfg)(httpClient)
-      itemOps <- OItem(store, fts)
+      itemOps <- OItem(store, fts, queue, joex)
       javaEmil =
         JavaMailEmil(blocker, Settings.defaultSettings.copy(debug = cfg.mailDebug))
       sch <- SchedulerBuilder(cfg.scheduler, blocker, store)
@@ -96,6 +99,13 @@
             ItemHandler.onCancel[F]
           )
         )
+        .withTask(
+          JobTask.json(
+            ReProcessItemArgs.taskName,
+            ReProcessItem[F](cfg, fts),
+            ReProcessItem.onCancel[F]
+          )
+        )
         .withTask(
           JobTask.json(
             NotifyDueItemsArgs.taskName,
@@ -131,6 +141,20 @@
             HouseKeepingTask.onCancel[F]
           )
         )
+        .withTask(
+          JobTask.json(
+            PdfConvTask.taskName,
+            PdfConvTask[F](cfg),
+            PdfConvTask.onCancel[F]
+          )
+        )
+        .withTask(
+          JobTask.json(
+            ConvertAllPdfArgs.taskName,
+            ConvertAllPdfTask[F](queue, joex),
+            ConvertAllPdfTask.onCancel[F]
+          )
+        )
         .resource
       psch <- PeriodicScheduler.create(
         cfg.periodicScheduler,

View File

@@ -0,0 +1,72 @@
+package docspell.joex.pdfconv
+
+import cats.effect._
+import cats.implicits._
+import fs2.{Chunk, Stream}
+
+import docspell.backend.ops.OJoex
+import docspell.common._
+import docspell.joex.scheduler.{Context, Task}
+import docspell.store.queue.JobQueue
+import docspell.store.records.RAttachment
+import docspell.store.records._
+
+/* A task to find all non-converted pdf files (of one collective, or
+ * all) and convert them using ocrmypdf, by submitting a job for each
+ * file found.
+ */
+object ConvertAllPdfTask {
+  type Args = ConvertAllPdfArgs
+
+  def apply[F[_]: Sync](queue: JobQueue[F], joex: OJoex[F]): Task[F, Args, Unit] =
+    Task { ctx =>
+      for {
+        _ <- ctx.logger.info("Converting pdfs using ocrmypdf")
+        n <- submitConversionJobs(ctx, queue)
+        _ <- ctx.logger.info(s"Submitted $n file conversion jobs")
+        _ <- joex.notifyAllNodes
+      } yield ()
+    }
+
+  def onCancel[F[_]: Sync]: Task[F, Args, Unit] =
+    Task.log(_.warn("Cancelling convert-old-pdf task"))
+
+  def submitConversionJobs[F[_]: Sync](
+      ctx: Context[F, Args],
+      queue: JobQueue[F]
+  ): F[Int] =
+    ctx.store
+      .transact(RAttachment.findNonConvertedPdf(ctx.args.collective, 50))
+      .chunks
+      .flatMap(createJobs[F](ctx))
+      .chunks
+      .evalMap(jobs => queue.insertAllIfNew(jobs.toVector).map(_ => jobs.size))
+      .evalTap(n => ctx.logger.debug(s"Submitted $n jobs …"))
+      .compile
+      .foldMonoid
+
+  private def createJobs[F[_]: Sync](
+      ctx: Context[F, Args]
+  )(ras: Chunk[RAttachment]): Stream[F, RJob] = {
+    val collectiveOrSystem = ctx.args.collective.getOrElse(DocspellSystem.taskGroup)
+
+    def mkJob(ra: RAttachment): F[RJob] =
+      for {
+        id  <- Ident.randomId[F]
+        now <- Timestamp.current[F]
+      } yield RJob.newJob(
+        id,
+        PdfConvTask.taskName,
+        collectiveOrSystem,
+        PdfConvTask.Args(ra.id),
+        s"Convert pdf ${ra.id.id}/${ra.name.getOrElse("-")}",
+        now,
+        collectiveOrSystem,
+        Priority.Low,
+        Some(PdfConvTask.taskName / ra.id)
+      )
+
+    val jobs = ras.traverse(mkJob)
+    Stream.evalUnChunk(jobs)
+  }
+}
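The double `.chunks` in `submitConversionJobs` is doing batching: attachments stream out of the database in chunks (of 50 here), one job is created per attachment, and jobs are then inserted per chunk rather than one by one. A stripped-down fs2 sketch of that same shape, with integers standing in for attachments and strings for jobs (all names hypothetical):

``` scala
import cats.effect.IO
import cats.implicits._
import fs2.Stream

// ten fake "attachments", streamed in chunks of three
val attachments: Stream[IO, Int] =
  Stream.emits((1 to 10).toList).covary[IO].chunkN(3).flatMap(Stream.chunk)

val submittedCount: IO[Int] =
  attachments.chunks
    // create one "job" per element, a chunk at a time
    .flatMap(c => Stream.evalUnChunk(c.traverse(n => IO.pure(s"job-$n"))))
    .chunks
    // insert each batch at once and yield the batch size
    .evalMap(batch => IO.pure(batch.size))
    .compile
    .foldMonoid // total number submitted, as logged by the real task
```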

View File

@@ -0,0 +1,155 @@
+package docspell.joex.pdfconv
+
+import cats.data.Kleisli
+import cats.data.OptionT
+import cats.effect._
+import cats.implicits._
+import fs2.Stream
+
+import docspell.common._
+import docspell.convert.ConversionResult
+import docspell.convert.extern.OcrMyPdf
+import docspell.joex.Config
+import docspell.joex.scheduler.{Context, Task}
+import docspell.store.records._
+
+import bitpeace.FileMeta
+import bitpeace.Mimetype
+import bitpeace.MimetypeHint
+import bitpeace.RangeDef
+import io.circe.generic.semiauto._
+import io.circe.{Decoder, Encoder}
+
+/** Converts the given attachment file using ocrmypdf, if it is a pdf
+  * and has not already been converted (i.e. the source file is still
+  * the same as the attachment's file).
+  */
+object PdfConvTask {
+  case class Args(attachId: Ident)
+  object Args {
+    implicit val jsonDecoder: Decoder[Args] =
+      deriveDecoder[Args]
+    implicit val jsonEncoder: Encoder[Args] =
+      deriveEncoder[Args]
+  }
+
+  val taskName = Ident.unsafe("pdf-files-migration")
+
+  def apply[F[_]: Sync: ContextShift](cfg: Config): Task[F, Args, Unit] =
+    Task { ctx =>
+      for {
+        _    <- ctx.logger.info(s"Converting pdf file ${ctx.args} using ocrmypdf")
+        meta <- checkInputs(cfg, ctx)
+        _    <- meta.traverse(fm => convert(cfg, ctx, fm))
+      } yield ()
+    }
+
+  def onCancel[F[_]: Sync]: Task[F, Args, Unit] =
+    Task.log(_.warn("Cancelling pdfconv task"))
+
+  // --- Helper
+
+  // Check that the file exists, that it is a pdf, that the source
+  // file is still the same and that ocrmypdf is enabled.
+  def checkInputs[F[_]: Sync](cfg: Config, ctx: Context[F, Args]): F[Option[FileMeta]] = {
+    val none: Option[FileMeta] = None
+
+    val checkSameFiles =
+      (for {
+        ra <- OptionT(ctx.store.transact(RAttachment.findById(ctx.args.attachId)))
+        isSame <- OptionT.liftF(
+          ctx.store.transact(RAttachmentSource.isSameFile(ra.id, ra.fileId))
+        )
+      } yield isSame).getOrElse(false)
+
+    val existsPdf =
+      for {
+        meta <- ctx.store.transact(RAttachment.findMeta(ctx.args.attachId))
+        res = meta.filter(_.mimetype.matches(Mimetype.`application/pdf`))
+        _ <-
+          if (res.isEmpty)
+            ctx.logger.info(
+              s"The attachment ${ctx.args.attachId} doesn't exist or is no pdf: $meta"
+            )
+          else ().pure[F]
+      } yield res
+
+    if (cfg.convert.ocrmypdf.enabled)
+      checkSameFiles.flatMap {
+        case true => existsPdf
+        case false =>
+          ctx.logger.info(
+            s"The attachment ${ctx.args.attachId} has already been converted. Skipping."
+          ) *>
+            none.pure[F]
+      }
+    else none.pure[F]
+  }
+
+  def convert[F[_]: Sync: ContextShift](
+      cfg: Config,
+      ctx: Context[F, Args],
+      in: FileMeta
+  ): F[Unit] = {
+    val bp = ctx.store.bitpeace
+    val data = Stream
+      .emit(in)
+      .through(bp.fetchData2(RangeDef.all))
+
+    val storeResult: ConversionResult.Handler[F, Unit] =
+      Kleisli({
+        case ConversionResult.SuccessPdf(file) =>
+          storeToAttachment(ctx, in, file)
+        case ConversionResult.SuccessPdfTxt(file, _) =>
+          storeToAttachment(ctx, in, file)
+        case ConversionResult.UnsupportedFormat(mime) =>
+          ctx.logger.warn(
+            s"Unable to convert '${mime}' file ${ctx.args}: unsupported format."
+          )
+        case ConversionResult.InputMalformed(mime, reason) =>
+          ctx.logger.warn(s"Unable to convert '${mime}' file ${ctx.args}: $reason")
+        case ConversionResult.Failure(ex) =>
+          ctx.logger.error(ex)(s"Failure converting file ${ctx.args}: ${ex.getMessage}")
+      })
+
+    def ocrMyPdf(lang: Language): F[Unit] =
+      OcrMyPdf.toPDF[F, Unit](
+        cfg.convert.ocrmypdf,
+        lang,
+        in.chunksize,
+        ctx.blocker,
+        ctx.logger
+      )(data, storeResult)
+
+    for {
+      lang <- getLanguage(ctx)
+      _    <- ocrMyPdf(lang)
+    } yield ()
+  }
+
+  def getLanguage[F[_]: Sync](ctx: Context[F, Args]): F[Language] =
+    (for {
+      coll <- OptionT(ctx.store.transact(RCollective.findByAttachment(ctx.args.attachId)))
+      lang = coll.language
+    } yield lang).getOrElse(Language.German)
+
+  def storeToAttachment[F[_]: Sync](
+      ctx: Context[F, Args],
+      meta: FileMeta,
+      newFile: Stream[F, Byte]
+  ): F[Unit] = {
+    val mimeHint = MimetypeHint.advertised(meta.mimetype.asString)
+    for {
+      time <- Timestamp.current[F]
+      fid  <- Ident.randomId[F]
+      _ <-
+        ctx.store.bitpeace
+          .saveNew(newFile, meta.chunksize, mimeHint, Some(fid.id), time.value)
+          .compile
+          .lastOrError
+      _ <- ctx.store.transact(RAttachment.updateFileId(ctx.args.attachId, fid))
+    } yield ()
+  }
+}

View File

@@ -126,11 +126,46 @@ object ConvertPdf {
       .compile
       .lastOrError
       .map(fm => Ident.unsafe(fm.id))
-      .flatMap(fmId =>
-        ctx.store
-          .transact(RAttachment.updateFileIdAndName(ra.id, fmId, newName))
-          .map(_ => fmId)
-      )
+      .flatMap(fmId => updateAttachment[F](ctx, ra, fmId, newName).map(_ => fmId))
       .map(fmId => ra.copy(fileId = fmId, name = newName))
   }
+
+  private def updateAttachment[F[_]: Sync](
+      ctx: Context[F, _],
+      ra: RAttachment,
+      fmId: Ident,
+      newName: Option[String]
+  ): F[Unit] =
+    for {
+      oldFile <- ctx.store.transact(RAttachment.findById(ra.id))
+      _ <-
+        ctx.store
+          .transact(RAttachment.updateFileIdAndName(ra.id, fmId, newName))
+      _ <- oldFile match {
+        case Some(raPrev) =>
+          for {
+            sameFile <-
+              ctx.store
+                .transact(RAttachmentSource.isSameFile(ra.id, raPrev.fileId))
+            _ <-
+              if (sameFile) ().pure[F]
+              else
+                ctx.logger.info("Deleting previous attachment file") *>
+                  ctx.store.bitpeace
+                    .delete(raPrev.fileId.id)
+                    .compile
+                    .drain
+                    .attempt
+                    .flatMap {
+                      case Right(_) => ().pure[F]
+                      case Left(ex) =>
+                        ctx.logger
+                          .error(ex)(s"Cannot delete previous attachment file: ${raPrev}")
+                    }
+          } yield ()
+        case None =>
+          ().pure[F]
+      }
+    } yield ()
 }

View File

@@ -17,16 +17,17 @@ object ProcessItem {
   )(item: ItemData): Task[F, ProcessItemArgs, ItemData] =
     ExtractArchive(item)
       .flatMap(Task.setProgress(20))
-      .flatMap(ConvertPdf(cfg.convert, _))
-      .flatMap(Task.setProgress(40))
-      .flatMap(TextExtraction(cfg.extraction, fts))
-      .flatMap(Task.setProgress(60))
-      .flatMap(analysisOnly[F](cfg))
-      .flatMap(Task.setProgress(80))
+      .flatMap(processAttachments0(cfg, fts, (40, 60, 80)))
       .flatMap(LinkProposal[F])
      .flatMap(SetGivenData[F](itemOps))
       .flatMap(Task.setProgress(99))

+  def processAttachments[F[_]: ConcurrentEffect: ContextShift](
+      cfg: Config,
+      fts: FtsClient[F]
+  )(item: ItemData): Task[F, ProcessItemArgs, ItemData] =
+    processAttachments0[F](cfg, fts, (30, 60, 90))(item)
+
   def analysisOnly[F[_]: Sync](
       cfg: Config
   )(item: ItemData): Task[F, ProcessItemArgs, ItemData] =
@@ -34,4 +35,16 @@ object ProcessItem {
       .flatMap(FindProposal[F](cfg.processing))
       .flatMap(EvalProposals[F])
       .flatMap(SaveProposals[F])
+
+  private def processAttachments0[F[_]: ConcurrentEffect: ContextShift](
+      cfg: Config,
+      fts: FtsClient[F],
+      progress: (Int, Int, Int)
+  )(item: ItemData): Task[F, ProcessItemArgs, ItemData] =
+    ConvertPdf(cfg.convert, item)
+      .flatMap(Task.setProgress(progress._1))
+      .flatMap(TextExtraction(cfg.extraction, fts))
+      .flatMap(Task.setProgress(progress._2))
+      .flatMap(analysisOnly[F](cfg))
+      .flatMap(Task.setProgress(progress._3))
 }

View File

@@ -0,0 +1,131 @@
+package docspell.joex.process
+
+import cats.data.OptionT
+import cats.effect._
+import cats.implicits._
+
+import docspell.common._
+import docspell.ftsclient.FtsClient
+import docspell.joex.Config
+import docspell.joex.scheduler.Context
+import docspell.joex.scheduler.Task
+import docspell.store.records.RAttachment
+import docspell.store.records.RAttachmentSource
+import docspell.store.records.RCollective
+import docspell.store.records.RItem
+
+object ReProcessItem {
+  type Args = ReProcessItemArgs
+
+  def apply[F[_]: ConcurrentEffect: ContextShift](
+      cfg: Config,
+      fts: FtsClient[F]
+  ): Task[F, Args, Unit] =
+    loadItem[F]
+      .flatMap(safeProcess[F](cfg, fts))
+      .map(_ => ())
+
+  def onCancel[F[_]: Sync: ContextShift]: Task[F, Args, Unit] =
+    logWarn("Now cancelling re-processing.")
+
+  // --- Helpers
+
+  private def contains[F[_]](ctx: Context[F, Args]): RAttachment => Boolean = {
+    val selection = ctx.args.attachments.toSet
+    if (selection.isEmpty) (_ => true)
+    else ra => selection.contains(ra.id)
+  }
+
+  def loadItem[F[_]: Sync]: Task[F, Args, ItemData] =
+    Task { ctx =>
+      (for {
+        item   <- OptionT(ctx.store.transact(RItem.findById(ctx.args.itemId)))
+        attach <- OptionT.liftF(ctx.store.transact(RAttachment.findByItem(item.id)))
+        asrc <-
+          OptionT.liftF(ctx.store.transact(RAttachmentSource.findByItem(ctx.args.itemId)))
+        asrcMap = asrc.map(s => s.id -> s).toMap
+        // Copy the original files over to the attachments, so the
+        // default processing task runs on the original files. The
+        // processing doesn't touch the original files, only RAttachments.
+        attachSrc =
+          attach
+            .filter(contains(ctx))
+            .flatMap(a =>
+              asrcMap.get(a.id).map { src =>
+                a.copy(fileId = src.fileId, name = src.name)
+              }
+            )
+      } yield ItemData(
+        item,
+        attachSrc,
+        Vector.empty,
+        Vector.empty,
+        asrcMap.view.mapValues(_.fileId).toMap,
+        MetaProposalList.empty,
+        Nil
+      )).getOrElseF(
+        Sync[F].raiseError(new Exception(s"Item not found: ${ctx.args.itemId.id}"))
+      )
+    }
+
+  def processFiles[F[_]: ConcurrentEffect: ContextShift](
+      cfg: Config,
+      fts: FtsClient[F],
+      data: ItemData
+  ): Task[F, Args, ItemData] = {
+
+    val convertArgs: Language => Args => F[ProcessItemArgs] =
+      lang =>
+        args =>
+          ProcessItemArgs(
+            ProcessItemArgs.ProcessMeta(
+              data.item.cid,
+              args.itemId.some,
+              lang,
+              None, //direction
+              "",   //source-id
+              None, //folder
+              Seq.empty
+            ),
+            Nil
+          ).pure[F]
+
+    getLanguage[F].flatMap { lang =>
+      ProcessItem
+        .processAttachments[F](cfg, fts)(data)
+        .contramap[Args](convertArgs(lang))
+    }
+  }
+
+  def getLanguage[F[_]: Sync]: Task[F, Args, Language] =
+    Task { ctx =>
+      (for {
+        coll <- OptionT(ctx.store.transact(RCollective.findByItem(ctx.args.itemId)))
+        lang = coll.language
+      } yield lang).getOrElse(Language.German)
+    }
+
+  def isLastRetry[F[_]: Sync]: Task[F, Args, Boolean] =
+    Task(_.isLastRetry)
+
+  def safeProcess[F[_]: ConcurrentEffect: ContextShift](
+      cfg: Config,
+      fts: FtsClient[F]
+  )(data: ItemData): Task[F, Args, ItemData] =
+    isLastRetry[F].flatMap {
+      case true =>
+        processFiles[F](cfg, fts, data).attempt
+          .flatMap({
+            case Right(d) =>
+              Task.pure(d)
+            case Left(ex) =>
+              logWarn[F](
+                "Processing failed on last retry."
+              ).andThen(_ => Sync[F].raiseError(ex))
+          })
+      case false =>
+        processFiles[F](cfg, fts, data)
+    }
+
+  private def logWarn[F[_]](msg: => String): Task[F, Args, Unit] =
+    Task(_.logger.warn(msg))
+}
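The pivotal line above is `.contramap[Args](convertArgs(lang))`: it reuses the regular processing pipeline, which expects `ProcessItemArgs`, by translating `ReProcessItemArgs` on the way in. Abstracting away docspell's actual `Task` type, contramap on a function-like type is just pre-composition. A toy model for intuition only, not docspell's implementation (which adapts the effectful `Context` and takes a `C => F[A]`):

``` scala
// Toy model: a "task" is a function from its arguments to a result.
final case class SimpleTask[A, B](run: A => B) {
  // adapt the task to accept a different argument type
  def contramap[C](f: C => A): SimpleTask[C, B] =
    SimpleTask(run.compose(f))
}

// a pipeline expecting the "full" argument type
val pipeline: SimpleTask[String, Int] = SimpleTask(_.length)

// the same pipeline, driven by a different argument type
val adapted: SimpleTask[List[String], Int] =
  pipeline.contramap[List[String]](_.mkString(","))
```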

View File

@@ -1213,6 +1213,33 @@ paths:
             schema:
               $ref: "#/components/schemas/BasicResult"

+  /sec/item/convertallpdfs:
+    post:
+      tags: [ Item ]
+      summary: Convert all non-converted pdfs.
+      description: |
+        Submits a job that will find all pdf files that have not been
+        converted and convert them using the ocrmypdf tool (if
+        enabled). Support for this tool was added in version 0.9.0,
+        so older files can be "migrated" this way, for example after
+        enabling the tool.
+
+        The task finds all files of the current collective and
+        submits a task for each file to convert. These tasks are
+        submitted with a low priority so that normal processing can
+        still proceed.
+
+        The body of the request should be empty.
+      security:
+        - authTokenHeader: []
+      responses:
+        200:
+          description: Ok
+          content:
+            application/json:
+              schema:
+                $ref: "#/components/schemas/BasicResult"
+
   /sec/item/search:
     post:
       tags: [ Item ]
@@ -1796,6 +1823,31 @@ paths:
             application/json:
               schema:
                 $ref: "#/components/schemas/ItemProposals"
+
+  /sec/item/{itemId}/reprocess:
+    post:
+      tags: [ Item ]
+      summary: Start re-processing the files of the item.
+      description: |
+        Submits a job that re-processes the files of the item (either
+        all of them or only the ones specified) and replaces the
+        extracted metadata.
+      security:
+        - authTokenHeader: []
+      parameters:
+        - $ref: "#/components/parameters/id"
+      requestBody:
+        content:
+          application/json:
+            schema:
+              $ref: "#/components/schemas/IdList"
+      responses:
+        200:
+          description: Ok
+          content:
+            application/json:
+              schema:
+                $ref: "#/components/schemas/BasicResult"
+
   /sec/item/{itemId}/attachment/movebefore:
     post:
       tags: [ Item ]
@@ -2604,6 +2656,17 @@
 components:
   schemas:
+    IdList:
+      description:
+        A list of identifiers.
+      required:
+        - ids
+      properties:
+        ids:
+          type: array
+          items:
+            type: string
+            format: ident
     StringList:
       description: |
         A simple list of strings.
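As with convertallpdfs, the reprocess route can be exercised directly over HTTP. A hedged sketch using Java's built-in HTTP client from Scala; the `X-Docspell-Auth` header is the one the tools script uses, and the token and item id are placeholders:

``` scala
import java.net.URI
import java.net.http.{HttpClient, HttpRequest, HttpResponse}

val token  = "<auth token from /api/v1/open/auth/login>" // placeholder
val itemId = "<some-item-id>"                            // placeholder

// an empty id list means: re-process all attachments of the item
val body = """{"ids": []}"""

val req = HttpRequest.newBuilder()
  .uri(URI.create(s"http://localhost:7880/api/v1/sec/item/$itemId/reprocess"))
  .header("X-Docspell-Auth", token)
  .header("Content-Type", "application/json")
  .POST(HttpRequest.BodyPublishers.ofString(body))
  .build()

val resp = HttpClient.newHttpClient().send(req, HttpResponse.BodyHandlers.ofString())
println(resp.body()) // a BasicResult, e.g. {"success":true,"message":"..."}
```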

View File

@@ -31,6 +31,13 @@ object ItemRoutes {
     import dsl._

     HttpRoutes.of {
+      case POST -> Root / "convertallpdfs" =>
+        for {
+          res <-
+            backend.item.convertAllPdf(user.account.collective.some, user.account, true)
+          resp <- Ok(Conversions.basicResult(res, "Task submitted"))
+        } yield resp
+
       case req @ POST -> Root / "search" =>
         for {
           mask <- req.as[ItemSearch]
@@ -279,6 +286,15 @@ object ItemRoutes {
           resp <- Ok(Conversions.basicResult(res, "Attachment moved."))
         } yield resp

+      case req @ POST -> Root / Ident(id) / "reprocess" =>
+        for {
+          data <- req.as[IdList]
+          ids = data.ids.flatMap(s => Ident.fromString(s).toOption)
+          _    <- logger.fdebug(s"Re-process item ${id.id}")
+          res  <- backend.item.reprocess(id, ids, user.account, true)
+          resp <- Ok(Conversions.basicResult(res, "Re-process task submitted."))
+        } yield resp
+
       case DELETE -> Root / Ident(id) =>
         for {
           n <- backend.item.deleteItem(id, user.account.collective)

View File

@@ -21,7 +21,7 @@ object JobQueueRoutes {
     HttpRoutes.of {
       case GET -> Root / "state" =>
         for {
-          js <- backend.job.queueState(user.account.collective, 200)
+          js <- backend.job.queueState(user.account.collective, 40)
           res = Conversions.mkJobQueueState(js)
           resp <- Ok(res)
         } yield resp

View File

@@ -209,7 +209,8 @@ object QJob {
     store.transact(RJob.findFromIds(ids))

   def queueStateSnapshot(
-      collective: Ident
+      collective: Ident,
+      max: Long
   ): Stream[ConnectionIO, (RJob, Vector[RJobLog])] = {
     val JC = RJob.Columns
     val waiting: Set[JobState] = Set(JobState.Waiting, JobState.Stuck, JobState.Scheduled)
@@ -218,18 +219,34 @@
     def selectJobs(now: Timestamp): Stream[ConnectionIO, RJob] = {
       val refDate = now.minusHours(24)
-      val sql = selectSimple(
-        JC.all,
-        RJob.table,
-        and(
-          JC.group.is(collective),
-          or(
-            and(JC.state.isOneOf(done.toSeq), JC.submitted.isGt(refDate)),
-            JC.state.isOneOf((running ++ waiting).toSeq)
-          )
-        )
-      )
-
-      (sql ++ orderBy(JC.submitted.desc)).query[RJob].stream
+      val runningJobs = (selectSimple(
+        JC.all,
+        RJob.table,
+        and(JC.group.is(collective), JC.state.isOneOf(running.toSeq))
+      ) ++ orderBy(JC.submitted.desc)).query[RJob].stream
+
+      val waitingJobs = (selectSimple(
+        JC.all,
+        RJob.table,
+        and(
+          JC.group.is(collective),
+          JC.state.isOneOf(waiting.toSeq),
+          JC.submitted.isGt(refDate)
+        )
+      ) ++ orderBy(JC.submitted.desc)).query[RJob].stream.take(max)
+
+      val doneJobs = (selectSimple(
+        JC.all,
+        RJob.table,
+        and(
+          JC.group.is(collective),
+          JC.state.isOneOf(done.toSeq),
+          JC.submitted.isGt(refDate)
+        )
+      ) ++ orderBy(JC.submitted.desc)).query[RJob].stream.take(max)
+
+      runningJobs ++ waitingJobs ++ doneJobs
     }

     def selectLogs(job: RJob): ConnectionIO[Vector[RJobLog]] =

View File

@@ -28,6 +28,8 @@ trait JobQueue[F[_]] {

   def insertAll(jobs: Seq[RJob]): F[Unit]

+  def insertAllIfNew(jobs: Seq[RJob]): F[Unit]
+
   def nextJob(
       prio: Ident => F[Priority],
       worker: Ident,
@@ -81,5 +83,13 @@ object JobQueue {
             logger.error(ex)("Could not insert job. Skipping it.")
          })

+      def insertAllIfNew(jobs: Seq[RJob]): F[Unit] =
+        jobs.toList
+          .traverse(j => insertIfNew(j).attempt)
+          .map(_.foreach {
+            case Right(()) =>
+            case Left(ex) =>
+              logger.error(ex)("Could not insert job. Skipping it.")
+          })
     })
 }

View File

@@ -1,6 +1,7 @@
 package docspell.store.records

 import cats.implicits._
+import fs2.Stream

 import docspell.common._
 import docspell.store.impl.Implicits._
@@ -71,6 +72,16 @@ object RAttachment {
       commas(fileId.setTo(fId), name.setTo(fname))
     ).update.run

+  def updateFileId(
+      attachId: Ident,
+      fId: Ident
+  ): ConnectionIO[Int] =
+    updateRow(
+      table,
+      id.is(attachId),
+      fileId.setTo(fId)
+    ).update.run
+
   def updatePosition(attachId: Ident, pos: Int): ConnectionIO[Int] =
     updateRow(table, id.is(attachId), position.setTo(pos)).update.run
@@ -187,4 +198,32 @@ object RAttachment {

   def findItemId(attachId: Ident): ConnectionIO[Option[Ident]] =
     selectSimple(Seq(itemId), table, id.is(attachId)).query[Ident].option
+
+  def findNonConvertedPdf(
+      coll: Option[Ident],
+      chunkSize: Int
+  ): Stream[ConnectionIO, RAttachment] = {
+    val aId = Columns.id.prefix("a")
+    val aItem = Columns.itemId.prefix("a")
+    val aFile = Columns.fileId.prefix("a")
+    val sId = RAttachmentSource.Columns.id.prefix("s")
+    val sFile = RAttachmentSource.Columns.fileId.prefix("s")
+    val iId = RItem.Columns.id.prefix("i")
+    val iColl = RItem.Columns.cid.prefix("i")
+    val mId = RFileMeta.Columns.id.prefix("m")
+    val mType = RFileMeta.Columns.mimetype.prefix("m")
+    val pdfType = "application/pdf%"
+
+    val from = table ++ fr"a INNER JOIN" ++
+      RAttachmentSource.table ++ fr"s ON" ++ sId.is(aId) ++ fr"INNER JOIN" ++
+      RItem.table ++ fr"i ON" ++ iId.is(aItem) ++ fr"INNER JOIN" ++
+      RFileMeta.table ++ fr"m ON" ++ aFile.is(mId)
+    val where = coll match {
+      case Some(cid) => and(iColl.is(cid), aFile.is(sFile), mType.lowerLike(pdfType))
+      case None      => and(aFile.is(sFile), mType.lowerLike(pdfType))
+    }
+    selectSimple(all.map(_.prefix("a")), from, where)
+      .query[RAttachment]
+      .streamWithChunkSize(chunkSize)
+  }
 }

View File

@@ -42,6 +42,12 @@ object RAttachmentSource {
   def findById(attachId: Ident): ConnectionIO[Option[RAttachmentSource]] =
     selectSimple(all, table, id.is(attachId)).query[RAttachmentSource].option

+  def isSameFile(attachId: Ident, file: Ident): ConnectionIO[Boolean] =
+    selectCount(id, table, and(id.is(attachId), fileId.is(file)))
+      .query[Int]
+      .unique
+      .map(_ > 0)
+
   def delete(attachId: Ident): ConnectionIO[Int] =
     deleteFrom(table, id.is(attachId)).update.run
@@ -64,6 +70,17 @@ object RAttachmentSource {
     selectSimple(all.map(_.prefix("a")), from, where).query[RAttachmentSource].option
   }

+  def findByItem(itemId: Ident): ConnectionIO[Vector[RAttachmentSource]] = {
+    val sId = Columns.id.prefix("s")
+    val aId = RAttachment.Columns.id.prefix("a")
+    val aItem = RAttachment.Columns.itemId.prefix("a")
+
+    val from = table ++ fr"s INNER JOIN" ++ RAttachment.table ++ fr"a ON" ++ sId.is(aId)
+    selectSimple(all.map(_.prefix("s")), from, aItem.is(itemId))
+      .query[RAttachmentSource]
+      .to[Vector]
+  }
+
   def findByItemWithMeta(
       id: Ident
   ): ConnectionIO[Vector[(RAttachmentSource, FileMeta)]] = {

View File

@@ -75,6 +75,14 @@ object RCollective {
     sql.query[RCollective].option
   }

+  def findByItem(itemId: Ident): ConnectionIO[Option[RCollective]] = {
+    val iColl = RItem.Columns.cid.prefix("i")
+    val iId = RItem.Columns.id.prefix("i")
+    val cId = id.prefix("c")
+    val from = RItem.table ++ fr"i INNER JOIN" ++ table ++ fr"c ON" ++ iColl.is(cId)
+    selectSimple(all.map(_.prefix("c")), from, iId.is(itemId)).query[RCollective].option
+  }
+
   def existsById(cid: Ident): ConnectionIO[Boolean] = {
     val sql = selectCount(id, table, id.is(cid))
     sql.query[Int].unique.map(_ > 0)
@@ -90,5 +98,19 @@ object RCollective {
     sql.query[RCollective].stream
   }

+  def findByAttachment(attachId: Ident): ConnectionIO[Option[RCollective]] = {
+    val iColl = RItem.Columns.cid.prefix("i")
+    val iId = RItem.Columns.id.prefix("i")
+    val aItem = RAttachment.Columns.itemId.prefix("a")
+    val aId = RAttachment.Columns.id.prefix("a")
+    val cId = Columns.id.prefix("c")
+
+    val from = table ++ fr"c INNER JOIN" ++
+      RItem.table ++ fr"i ON" ++ cId.is(iColl) ++ fr"INNER JOIN" ++
+      RAttachment.table ++ fr"a ON" ++ aItem.is(iId)
+
+    selectSimple(all.map(_.prefix("c")), from, aId.is(attachId)).query[RCollective].option
+  }
+
   case class Settings(language: Language, integrationEnabled: Boolean)
 }

View File

@@ -314,6 +314,9 @@ object RItem {
   def findByIdAndCollective(itemId: Ident, coll: Ident): ConnectionIO[Option[RItem]] =
     selectSimple(all, table, and(id.is(itemId), cid.is(coll))).query[RItem].option

+  def findById(itemId: Ident): ConnectionIO[Option[RItem]] =
+    selectSimple(all, table, id.is(itemId)).query[RItem].option
+
   def checkByIdAndCollective(itemId: Ident, coll: Ident): ConnectionIO[Option[Ident]] =
     selectSimple(Seq(id), table, and(id.is(itemId), cid.is(coll))).query[Ident].option

tools/convert-all-pdfs.sh (new executable file)
View File

@@ -0,0 +1,29 @@
+#!/usr/bin/env bash
+#
+# Simple script to authenticate with docspell and trigger the "convert
+# all pdfs" route that submits a task to convert all pdf files using
+# ocrmypdf.
+
+set -e
+
+BASE_URL="${1:-http://localhost:7880}"
+LOGIN_URL="$BASE_URL/api/v1/open/auth/login"
+TRIGGER_URL="$BASE_URL/api/v1/sec/item/convertallpdfs"
+
+echo "Login to trigger converting all pdfs."
+echo "Using url: $BASE_URL"
+echo -n "Account: "
+read USER
+echo -n "Password: "
+read -s PASS
+echo
+
+auth=$(curl --fail -XPOST --silent --data-binary "{\"account\":\"$USER\", \"password\":\"$PASS\"}" "$LOGIN_URL")
+
+if [ "$(echo $auth | jq .success)" == "true" ]; then
+    echo "Login successful"
+    auth_token=$(echo $auth | jq -r .token)
+    curl --fail -XPOST -H "X-Docspell-Auth: $auth_token" "$TRIGGER_URL"
+else
+    echo "Login failed."
+fi

View File

@@ -67,7 +67,7 @@ logged in.
 The relevant part of the config file regarding the scheduler is shown
 below with some explanations.

-```
+``` conf
 docspell.joex {
   # other settings left out for brevity

View File

@@ -0,0 +1,46 @@
++++
+title = "Convert All PDFs"
+description = "Convert all PDF files using OcrMyPdf."
+weight = 60
++++
+
+# convert-all-pdfs.sh
+
+Version 0.9.0 added support for another external tool,
+[OCRmyPDF](https://github.com/jbarlow83/OCRmyPDF), which converts PDF
+files so that they contain an OCR-ed text layer. The tool is optional
+and can be disabled.
+
+To convert all previously processed files with this tool, there is an
+[endpoint](/openapi/docspell-openapi.html#api-Item-secItemConvertallpdfsPost)
+that submits a task converting all not-yet-converted PDF files of
+your collective.
+
+There is no UI part to trigger this route, so you need to use curl or
+the script `convert-all-pdfs.sh` in the `tools/` directory.
+
+# Usage
+
+```
+./convert-all-pdfs.sh [docspell-base-url]
+```
+
+For example, if docspell is at `http://localhost:7880`:
+
+```
+./convert-all-pdfs.sh http://localhost:7880
+```
+
+The script asks for your account name and password. It then logs in
+and triggers the endpoint mentioned above. After that you should see
+a few tasks running.
+
+There is one task per file to convert. All these tasks are submitted
+with a low priority, so that files uploaded through the webapp or a
+[source](@/docs/webapp/uploading.md#anonymous-upload) with a high
+priority are preferred, as [configured in the job
+executor](@/docs/joex/_index.md#scheduler-config). This keeps many
+concurrent conversion tasks from disturbing normal processing.

Binary file not shown (new image added, 46 KiB: the sources-edit.png screenshot referenced in the docs below).

View File

@@ -29,6 +29,8 @@ scripts. For this the next variant exists.
 It is also possible to upload files without authentication. This
 should make tools that interact with docspell much easier to write.
+The [Android Client App](@/docs/tools/android.md) uses these urls to
+upload files.

 Go to "Collective Settings" and then to the "Source" tab. A *Source*
 identifies an endpoint where files can be uploaded anonymously.
@@ -41,7 +43,7 @@ username is not visible.
 Example screenshot:

-{{ figure(file="sources-form.png") }}
+{{ figure(file="sources-edit.png") }}

 This example shows a source with name "test". Besides a description
 and a name that is only used for displaying purposes, a priority and a
@@ -58,25 +60,26 @@ The source endpoint defines two urls:
 - `/app/upload/<id>`
 - `/api/v1/open/upload/item/<id>`

+{{ figure(file="sources-form.png") }}
+
 The first points to a web page where everyone could upload files into
 your account. You could give this url to people for sending files
 directly into your docspell.

 The second url is the API url, which accepts the requests to upload
-files (it is used by the upload page, the first url).
+files. This second url can be used with the [Android Client
+App](@/docs/tools/android.md) to upload files.

-For example, the api url can be used to upload files with curl:
+Another example is to use curl for uploading files from the command
+line:

 ``` bash
 $ curl -XPOST -F file=@test.pdf http://192.168.1.95:7880/api/v1/open/upload/item/3H7hvJcDJuk-NrAW4zxsdfj-K6TMPyb6BGP-xKptVxUdqWa
 {"success":true,"message":"Files submitted."}
 ```

-You could add more `-F file=@/path/to/your/file.pdf` to upload
-multiple files (note, the `@` is required by curl, so it knows that
-the following is a file). There is a [script
-provided](@/docs/tools/ds.md) that uses this to upload files from the
-command line.
+There is a [script provided](@/docs/tools/ds.md) that uses curl to
+upload files from the command line more conveniently.

 When files are uploaded to a source endpoint, the resulting items
 are marked with the name of the source, so you know