Add a task to re-process files of an item

This commit is contained in:
Eike Kettner 2020-08-12 22:26:44 +02:00
parent 8e15478e3c
commit 07e9a9767e
13 changed files with 350 additions and 5 deletions

View File

@ -8,6 +8,28 @@ import docspell.store.records.RJob
object JobFactory {
/** Builds the job record for re-processing the files of an item.
  *
  * A fresh random job id and the current timestamp are obtained
  * first; the job is then created for the task
  * `ReProcessItemArgs.taskName` in the collective of the given
  * account, with the account's user as submitter.
  *
  * @param args the re-process arguments stored with the job
  * @param account supplies the collective and the user of the job
  * @param prio the priority given to the job
  * @param tracker optional tracker id passed on to the job
  * @return the new job record; nothing is stored or submitted here
  */
def reprocessItem[F[_]: Sync](
    args: ReProcessItemArgs,
    account: AccountId,
    prio: Priority,
    tracker: Option[Ident]
): F[RJob] = {
  val subject = s"Re-process files of item ${args.itemId.id}"
  Ident.randomId[F].flatMap { jobId =>
    Timestamp.current[F].map { submitted =>
      RJob.newJob(
        jobId,
        ReProcessItemArgs.taskName,
        account.collective,
        args,
        subject,
        submitted,
        account.user,
        prio,
        tracker
      )
    }
  }
}
def processItem[F[_]: Sync](
args: ProcessItemArgs,
account: AccountId,

View File

@ -44,6 +44,19 @@ trait OUpload[F[_]] {
case Left(srcId) =>
submit(data, srcId, notifyJoex, itemId)
}
/** Submits the item for re-processing.
  *
  * The list of attachment ids can be used to only re-process a subset
  * of the item's attachments. If this list is empty, all attachments
  * are reprocessed. This call only submits the job into the queue.
  *
  * @param item the id of the item whose files are re-processed
  * @param attachments ids of the attachments to re-process; empty
  *   means all attachments of the item
  * @param account the account submitting the job
  * @param notifyJoex handed on to the job submission; presumably
  *   controls whether the job executor is notified (TODO confirm)
  * @return the result of submitting the job
  */
def reprocess(
item: Ident,
attachments: List[Ident],
account: AccountId,
notifyJoex: Boolean
): F[OUpload.UploadResult]
}
object OUpload {
@ -159,6 +172,21 @@ object OUpload {
result <- OptionT.liftF(submit(updata, accId, notifyJoex, itemId))
} yield result).getOrElse(UploadResult.noSource)
/** Queues a low-priority job that re-processes the given item.
  *
  * The item is looked up by id within the collective of `account`.
  * If it is found, a re-process job is created and submitted; if
  * not, `UploadResult.noItem` is returned and nothing is submitted.
  */
def reprocess(
    item: Ident,
    attachments: List[Ident],
    account: AccountId,
    notifyJoex: Boolean
): F[UploadResult] =
  OptionT(store.transact(RItem.findByIdAndCollective(item, account.collective)))
    .flatMap { _ =>
      val reprocessArgs = ReProcessItemArgs(item, attachments)
      OptionT.liftF(
        JobFactory.reprocessItem[F](reprocessArgs, account, Priority.Low, None)
      )
    }
    .flatMap(job => OptionT.liftF(submitJobs(notifyJoex)(Vector(job))))
    .getOrElse(UploadResult.noItem)
private def submitJobs(
notifyJoex: Boolean
)(jobs: Vector[RJob]): F[OUpload.UploadResult] =

View File

@ -0,0 +1,24 @@
package docspell.common
import io.circe.generic.semiauto._
import io.circe.{Decoder, Encoder}
/** Arguments when re-processing an item.
  *
  * The `itemId` must exist and point to some item. If the attachment
  * list is non-empty, only those attachments are re-processed. They
  * must belong to the given item. If the list is empty, then all
  * attachments are re-processed.
  *
  * @param itemId the id of the item to re-process
  * @param attachments the ids of the attachments to re-process; an
  *   empty list selects all attachments of the item
  */
case class ReProcessItemArgs(itemId: Ident, attachments: List[Ident])
object ReProcessItemArgs {
/** The name identifying the re-process task. */
val taskName: Ident = Ident.unsafe("re-process-item")
// JSON codecs are derived from the case class, so the JSON field
// names follow the constructor parameter names.
implicit val jsonEncoder: Encoder[ReProcessItemArgs] =
deriveEncoder[ReProcessItemArgs]
implicit val jsonDecoder: Decoder[ReProcessItemArgs] =
deriveDecoder[ReProcessItemArgs]
}

View File

@ -14,6 +14,7 @@ import docspell.joex.fts.{MigrationTask, ReIndexTask}
import docspell.joex.hk._
import docspell.joex.notify._
import docspell.joex.process.ItemHandler
import docspell.joex.process.ReProcessItem
import docspell.joex.scanmailbox._
import docspell.joex.scheduler._
import docspell.joexapi.client.JoexClient
@ -96,6 +97,13 @@ object JoexAppImpl {
ItemHandler.onCancel[F]
)
)
.withTask(
JobTask.json(
ReProcessItemArgs.taskName,
ReProcessItem[F](cfg, fts),
ReProcessItem.onCancel[F]
)
)
.withTask(
JobTask.json(
NotifyDueItemsArgs.taskName,

View File

@ -126,11 +126,46 @@ object ConvertPdf {
.compile
.lastOrError
.map(fm => Ident.unsafe(fm.id))
.flatMap(fmId =>
ctx.store
.transact(RAttachment.updateFileIdAndName(ra.id, fmId, newName))
.map(_ => fmId)
)
.flatMap(fmId => updateAttachment[F](ctx, ra, fmId, newName).map(_ => fmId))
.map(fmId => ra.copy(fileId = fmId, name = newName))
}
/** Points the attachment at the new file and removes the file it
  * referenced before.
  *
  * The current attachment record is read first, then file id and
  * name are updated. If a previous record existed and its file is not
  * the one recorded as the attachment's source, that previous file is
  * deleted. A failing delete is only logged as an error and does not
  * fail this operation.
  *
  * @param ctx provides the store and the logger
  * @param ra the attachment to update
  * @param fmId the id of the new file
  * @param newName the new name to set on the attachment
  */
private def updateAttachment[F[_]: Sync](
    ctx: Context[F, _],
    ra: RAttachment,
    fmId: Ident,
    newName: Option[String]
): F[Unit] = {
  val relink =
    ctx.store.transact(RAttachment.updateFileIdAndName(ra.id, fmId, newName))

  ctx.store.transact(RAttachment.findById(ra.id)).flatMap { previous =>
    relink.flatMap { _ =>
      previous.fold(().pure[F]) { raPrev =>
        ctx.store
          .transact(RAttachmentSource.isSameFile(ra.id, raPrev.fileId))
          .flatMap { sameFile =>
            // the source file must survive; only a different file is removed
            if (sameFile) ().pure[F]
            else
              ctx.logger.info("Deleting previous attachment file") *>
                ctx.store.bitpeace
                  .delete(raPrev.fileId.id)
                  .compile
                  .drain
                  .attempt
                  .flatMap {
                    case Left(ex) =>
                      ctx.logger
                        .error(ex)(s"Cannot delete previous attachment file: ${raPrev}")
                    case Right(_) =>
                      ().pure[F]
                  }
          }
      }
    }
  }
}
}

View File

@ -27,6 +27,17 @@ object ProcessItem {
.flatMap(SetGivenData[F](itemOps))
.flatMap(Task.setProgress(99))
/** Runs the attachment related processing steps on the given item
  * data: pdf conversion, text extraction and analysis, in that order.
  * The progress is set to 30, 60 and 90 after the respective step.
  */
def processAttachments[F[_]: ConcurrentEffect: ContextShift](
    cfg: Config,
    fts: FtsClient[F]
)(item: ItemData): Task[F, ProcessItemArgs, ItemData] = {
  val converted =
    ConvertPdf(cfg.convert, item)
      .flatMap(Task.setProgress(30))
  val extracted =
    converted
      .flatMap(TextExtraction(cfg.extraction, fts))
      .flatMap(Task.setProgress(60))

  extracted
    .flatMap(analysisOnly[F](cfg))
    .flatMap(Task.setProgress(90))
}
def analysisOnly[F[_]: Sync](
cfg: Config
)(item: ItemData): Task[F, ProcessItemArgs, ItemData] =

View File

@ -0,0 +1,131 @@
package docspell.joex.process
import cats.data.OptionT
import cats.effect._
import cats.implicits._
import docspell.common._
import docspell.ftsclient.FtsClient
import docspell.joex.Config
import docspell.joex.scheduler.Context
import docspell.joex.scheduler.Task
import docspell.store.records.RAttachment
import docspell.store.records.RAttachmentSource
import docspell.store.records.RCollective
import docspell.store.records.RItem
/** Task that re-processes the files of an existing item.
  *
  * The item is loaded together with its attachments, the original
  * (source) files are put in place of the attachment files and the
  * attachment processing of `ProcessItem` is run on the result.
  */
object ReProcessItem {
  type Args = ReProcessItemArgs

  /** The complete task: load the item, process its files and discard
    * the resulting item data.
    */
  def apply[F[_]: ConcurrentEffect: ContextShift](
      cfg: Config,
      fts: FtsClient[F]
  ): Task[F, Args, Unit] =
    loadItem[F]
      .flatMap(data => safeProcess[F](cfg, fts)(data))
      .map(_ => ())

  /** Task to run when the job is cancelled; it only logs a warning. */
  def onCancel[F[_]: Sync: ContextShift]: Task[F, Args, Unit] =
    logWarn[F]("Now cancelling re-processing.")

  // --- Helpers

  /** Predicate selecting the attachments named in the task arguments.
    * An empty selection accepts every attachment.
    */
  private def contains[F[_]](ctx: Context[F, Args]): RAttachment => Boolean = {
    val wanted = ctx.args.attachments.toSet
    attachment => wanted.isEmpty || wanted.contains(attachment.id)
  }

  /** Loads the item with its attachments and attachment sources.
    *
    * Only selected attachments that have a source record are kept;
    * each one gets the file id and name of its source. The task fails
    * with an exception if the item cannot be found.
    */
  def loadItem[F[_]: Sync]: Task[F, Args, ItemData] =
    Task { ctx =>
      val itemId     = ctx.args.itemId
      val isSelected = contains(ctx)

      val loaded =
        for {
          item        <- OptionT(ctx.store.transact(RItem.findById(itemId)))
          attachments <- OptionT.liftF(ctx.store.transact(RAttachment.findByItem(item.id)))
          sources <-
            OptionT.liftF(ctx.store.transact(RAttachmentSource.findByItem(itemId)))
        } yield {
          val sourceById = sources.map(s => s.id -> s).toMap
          // copy the original files over to attachments to run the default processing task
          // the processing doesn't touch the original files, only RAttachments
          val fromSource =
            attachments
              .filter(isSelected)
              .flatMap(a =>
                sourceById.get(a.id).map { src =>
                  a.copy(fileId = src.fileId, name = src.name)
                }
              )
          ItemData(
            item,
            fromSource,
            Vector.empty,
            Vector.empty,
            sourceById.view.mapValues(_.fileId).toMap,
            MetaProposalList.empty,
            Nil
          )
        }

      loaded.getOrElseF(
        Sync[F].raiseError(new Exception(s"Item not found: ${ctx.args.itemId.id}"))
      )
    }

  /** Runs `ProcessItem.processAttachments` on the given data.
    *
    * That task expects `ProcessItemArgs`; they are created from the
    * re-process arguments, the collective of the loaded item and the
    * language returned by `getLanguage`.
    */
  def processFiles[F[_]: ConcurrentEffect: ContextShift](
      cfg: Config,
      fts: FtsClient[F],
      data: ItemData
  ): Task[F, Args, ItemData] = {
    def toProcessArgs(lang: Language, args: Args): F[ProcessItemArgs] = {
      val meta = ProcessItemArgs.ProcessMeta(
        data.item.cid,
        args.itemId.some,
        lang,
        None, //direction
        "", //source-id
        None, //folder
        Seq.empty
      )
      ProcessItemArgs(meta, Nil).pure[F]
    }

    getLanguage[F].flatMap { lang =>
      ProcessItem
        .processAttachments[F](cfg, fts)(data)
        .contramap[Args]((args: Args) => toProcessArgs(lang, args))
    }
  }

  /** Looks up the language of the collective the item belongs to. If
    * no collective is found, `Language.German` is used.
    */
  def getLanguage[F[_]: Sync]: Task[F, Args, Language] =
    Task { ctx =>
      OptionT(ctx.store.transact(RCollective.findByItem(ctx.args.itemId)))
        .map(collective => collective.language)
        .getOrElse(Language.German)
    }

  /** Reads the last-retry flag from the task context. */
  def isLastRetry[F[_]: Sync]: Task[F, Args, Boolean] =
    Task(ctx => ctx.isLastRetry)

  /** Processes the files of the item. On the last retry a failure is
    * logged as a warning before the error is raised again; on earlier
    * attempts the processing runs without this extra handling.
    */
  def safeProcess[F[_]: ConcurrentEffect: ContextShift](
      cfg: Config,
      fts: FtsClient[F]
  )(data: ItemData): Task[F, Args, ItemData] =
    isLastRetry[F].flatMap { lastRetry =>
      if (lastRetry)
        processFiles[F](cfg, fts, data).attempt
          .flatMap({
            case Left(ex) =>
              logWarn[F](
                "Processing failed on last retry."
              ).andThen(_ => Sync[F].raiseError(ex))
            case Right(result) =>
              Task.pure(result)
          })
      else
        processFiles[F](cfg, fts, data)
    }

  /** Task that logs the message as a warning via the context's logger. */
  private def logWarn[F[_]](msg: => String): Task[F, Args, Unit] =
    Task(ctx => ctx.logger.warn(msg))
}

View File

@ -1796,6 +1796,31 @@ paths:
application/json:
schema:
$ref: "#/components/schemas/ItemProposals"
/sec/item/{itemId}/reprocess:
post:
tags: [ Item ]
summary: Start reprocessing the files of the item.
description: |
This submits a job that will re-process the files of the item
and replace the metadata. The request body is a list of
attachment ids: if it is empty, all files of the item are
re-processed, otherwise only the listed ones.
security:
- authTokenHeader: []
parameters:
- $ref: "#/components/parameters/id"
requestBody:
content:
application/json:
schema:
$ref: "#/components/schemas/StringList"
responses:
200:
description: Ok
content:
application/json:
schema:
$ref: "#/components/schemas/BasicResult"
/sec/item/{itemId}/attachment/movebefore:
post:
tags: [ Item ]

View File

@ -279,6 +279,15 @@ object ItemRoutes {
resp <- Ok(Conversions.basicResult(res, "Attachment moved."))
} yield resp
// Submits a job that re-processes the files of the item. The request
// body lists the attachment ids to re-process; an empty list selects
// all attachments of the item.
case req @ POST -> Root / Ident(id) / "reprocess" =>
  for {
    data <- req.as[StringList]
    // Keep every raw string next to its parse result. Dropping invalid
    // entries silently could leave an empty list, which would re-process
    // ALL attachments instead of the requested ones.
    parsed  = data.items.map(s => s -> Ident.fromString(s).toOption)
    ids     = parsed.collect { case (_, Some(attachId)) => attachId }
    invalid = parsed.collect { case (raw, None) => raw }
    listing = invalid.mkString(", ")
    _ <- logger.fdebug(s"Re-process item ${id.id}")
    resp <-
      if (invalid.nonEmpty)
        // NOTE(review): assumes `BasicResult(success, message)` of the REST
        // model is in scope in this file — confirm. The response stays a 200
        // with a BasicResult, as declared for this route in the OpenAPI spec.
        Ok(BasicResult(false, s"Invalid attachment ids: $listing"))
      else
        backend.upload
          .reprocess(id, ids, user.account, true)
          .flatMap(res => Ok(Conversions.basicResult(res)))
  } yield resp
case DELETE -> Root / Ident(id) =>
for {
n <- backend.item.deleteItem(id, user.account.collective)

View File

@ -71,6 +71,16 @@ object RAttachment {
commas(fileId.setTo(fId), name.setTo(fname))
).update.run
/** Sets the file id of the attachment with the given id.
  *
  * @param attachId the id of the attachment row to update
  * @param fId the file id to store in that row
  * @return the `Int` result of running the update statement
  */
def updateFileId(attachId: Ident, fId: Ident): ConnectionIO[Int] = {
  val newFile = fileId.setTo(fId)
  updateRow(table, id.is(attachId), newFile).update.run
}
/** Sets the position of the attachment with the given id.
  *
  * @param attachId the id of the attachment row to update
  * @param pos the position value to store in that row
  * @return the `Int` result of running the update statement
  */
def updatePosition(attachId: Ident, pos: Int): ConnectionIO[Int] = {
  val newPosition = position.setTo(pos)
  updateRow(table, id.is(attachId), newPosition).update.run
}

View File

@ -42,6 +42,12 @@ object RAttachmentSource {
/** Looks up the source record stored under the given attachment id.
  *
  * @return the record, or `None` if there is no row with this id
  */
def findById(attachId: Ident): ConnectionIO[Option[RAttachmentSource]] = {
  val byId = id.is(attachId)
  selectSimple(all, table, byId)
    .query[RAttachmentSource]
    .option
}
/** Checks whether the source record of the given attachment refers
  * to the given file.
  *
  * @return true if at least one row matches both the attachment id
  *   and the file id
  */
def isSameFile(attachId: Ident, file: Ident): ConnectionIO[Boolean] = {
  val sameIdAndFile = and(id.is(attachId), fileId.is(file))
  selectCount(id, table, sameIdAndFile)
    .query[Int]
    .unique
    .map(count => count > 0)
}
/** Deletes the source record stored under the given attachment id.
  *
  * @return the `Int` result of running the delete statement
  */
def delete(attachId: Ident): ConnectionIO[Int] = {
  val byId = id.is(attachId)
  deleteFrom(table, byId).update.run
}
@ -64,6 +70,17 @@ object RAttachmentSource {
selectSimple(all.map(_.prefix("a")), from, where).query[RAttachmentSource].option
}
/** Finds the source records of all attachments of the given item.
  *
  * The source table (alias `s`) is joined with the attachment table
  * (alias `a`) on their ids and the result is restricted to the
  * attachments of the item.
  */
def findByItem(itemId: Ident): ConnectionIO[Vector[RAttachmentSource]] = {
  val sourceId     = Columns.id.prefix("s")
  val attachId     = RAttachment.Columns.id.prefix("a")
  val attachItem   = RAttachment.Columns.itemId.prefix("a")
  val sourceCols   = all.map(_.prefix("s"))
  val joinedTables =
    table ++ fr"s INNER JOIN" ++ RAttachment.table ++ fr"a ON" ++ sourceId.is(attachId)

  selectSimple(sourceCols, joinedTables, attachItem.is(itemId))
    .query[RAttachmentSource]
    .to[Vector]
}
def findByItemWithMeta(
id: Ident
): ConnectionIO[Vector[(RAttachmentSource, FileMeta)]] = {

View File

@ -75,6 +75,14 @@ object RCollective {
sql.query[RCollective].option
}
/** Finds the collective the given item belongs to.
  *
  * The item table (alias `i`) is joined with the collective table
  * (alias `c`) on the item's collective id; the collective columns
  * are selected for the item with the given id.
  */
def findByItem(itemId: Ident): ConnectionIO[Option[RCollective]] = {
  val itemCollective = RItem.Columns.cid.prefix("i")
  val itemIdCol      = RItem.Columns.id.prefix("i")
  val collectiveId   = id.prefix("c")
  val collectiveCols = all.map(_.prefix("c"))
  val joinedTables =
    RItem.table ++ fr"i INNER JOIN" ++ table ++ fr"c ON" ++ itemCollective.is(collectiveId)

  selectSimple(collectiveCols, joinedTables, itemIdCol.is(itemId))
    .query[RCollective]
    .option
}
def existsById(cid: Ident): ConnectionIO[Boolean] = {
val sql = selectCount(id, table, id.is(cid))
sql.query[Int].unique.map(_ > 0)
@ -90,5 +98,19 @@ object RCollective {
sql.query[RCollective].stream
}
/** Finds the collective that owns the given attachment.
  *
  * The collective table (alias `c`) is joined with the item table
  * (alias `i`) and the attachment table (alias `a`); the result is
  * restricted to the attachment with the given id.
  *
  * The selected columns are qualified with the alias `c`, as
  * `findByItem` does. Selecting them unqualified from a three-table
  * join risks ambiguous column references, since the joined tables
  * may carry equally named columns (the item table has a
  * collective-id column as well).
  *
  * @param attachId the id of the attachment
  * @return the collective, or `None` if no row matches
  */
def findByAttachment(attachId: Ident): ConnectionIO[Option[RCollective]] = {
  val iColl = RItem.Columns.cid.prefix("i")
  val iId   = RItem.Columns.id.prefix("i")
  val aItem = RAttachment.Columns.itemId.prefix("a")
  val aId   = RAttachment.Columns.id.prefix("a")
  val cId   = Columns.id.prefix("c")

  val from = table ++ fr"c INNER JOIN" ++
    RItem.table ++ fr"i ON" ++ cId.is(iColl) ++ fr"INNER JOIN" ++
    RAttachment.table ++ fr"a ON" ++ aItem.is(iId)

  selectSimple(all.map(_.prefix("c")), from, aId.is(attachId))
    .query[RCollective]
    .option
}
/** Bundles two values of a collective: its language and the flag
  * telling whether the integration is enabled.
  */
case class Settings(
    language: Language,
    integrationEnabled: Boolean
)
}

View File

@ -314,6 +314,9 @@ object RItem {
/** Looks up an item by its id, restricted to the given collective.
  *
  * @return the item, or `None` if no row matches both the id and the
  *   collective
  */
def findByIdAndCollective(itemId: Ident, coll: Ident): ConnectionIO[Option[RItem]] = {
  val inCollective = and(id.is(itemId), cid.is(coll))
  selectSimple(all, table, inCollective)
    .query[RItem]
    .option
}
/** Looks up an item by its id only; the collective is not checked.
  *
  * @return the item, or `None` if there is no row with this id
  */
def findById(itemId: Ident): ConnectionIO[Option[RItem]] = {
  val byId = id.is(itemId)
  selectSimple(all, table, byId)
    .query[RItem]
    .option
}
/** Checks that an item with the given id exists in the given
  * collective, selecting only the id column.
  *
  * @return the item id if a matching row exists, `None` otherwise
  */
def checkByIdAndCollective(itemId: Ident, coll: Ident): ConnectionIO[Option[Ident]] = {
  val inCollective = and(id.is(itemId), cid.is(coll))
  selectSimple(Seq(id), table, inCollective)
    .query[Ident]
    .option
}