From 07e9a9767e15d2781247e42fc466d28644da9a7f Mon Sep 17 00:00:00 2001 From: Eike Kettner Date: Wed, 12 Aug 2020 22:26:44 +0200 Subject: [PATCH] Add a task to re-process files of an item --- .../scala/docspell/backend/JobFactory.scala | 22 +++ .../scala/docspell/backend/ops/OUpload.scala | 28 ++++ .../docspell/common/ReProcessItemArgs.scala | 24 ++++ .../scala/docspell/joex/JoexAppImpl.scala | 8 ++ .../docspell/joex/process/ConvertPdf.scala | 45 +++++- .../docspell/joex/process/ProcessItem.scala | 11 ++ .../docspell/joex/process/ReProcessItem.scala | 131 ++++++++++++++++++ .../src/main/resources/docspell-openapi.yml | 25 ++++ .../restserver/routes/ItemRoutes.scala | 9 ++ .../docspell/store/records/RAttachment.scala | 10 ++ .../store/records/RAttachmentSource.scala | 17 +++ .../docspell/store/records/RCollective.scala | 22 +++ .../scala/docspell/store/records/RItem.scala | 3 + 13 files changed, 350 insertions(+), 5 deletions(-) create mode 100644 modules/common/src/main/scala/docspell/common/ReProcessItemArgs.scala create mode 100644 modules/joex/src/main/scala/docspell/joex/process/ReProcessItem.scala diff --git a/modules/backend/src/main/scala/docspell/backend/JobFactory.scala b/modules/backend/src/main/scala/docspell/backend/JobFactory.scala index d7d8fe91..96399ffa 100644 --- a/modules/backend/src/main/scala/docspell/backend/JobFactory.scala +++ b/modules/backend/src/main/scala/docspell/backend/JobFactory.scala @@ -8,6 +8,28 @@ import docspell.store.records.RJob object JobFactory { + def reprocessItem[F[_]: Sync]( + args: ReProcessItemArgs, + account: AccountId, + prio: Priority, + tracker: Option[Ident] + ): F[RJob] = + for { + id <- Ident.randomId[F] + now <- Timestamp.current[F] + job = RJob.newJob( + id, + ReProcessItemArgs.taskName, + account.collective, + args, + s"Re-process files of item ${args.itemId.id}", + now, + account.user, + prio, + tracker + ) + } yield job + def processItem[F[_]: Sync]( args: ProcessItemArgs, account: AccountId, diff --git a/modules/backend/src/main/scala/docspell/backend/ops/OUpload.scala b/modules/backend/src/main/scala/docspell/backend/ops/OUpload.scala index a9145f72..a6fbce49 100644 --- a/modules/backend/src/main/scala/docspell/backend/ops/OUpload.scala +++ b/modules/backend/src/main/scala/docspell/backend/ops/OUpload.scala @@ -44,6 +44,19 @@ trait OUpload[F[_]] { case Left(srcId) => submit(data, srcId, notifyJoex, itemId) } + + /** Submits the item for re-processing. The list of attachment ids can + * be used to only re-process a subset of the item's attachments. + * If this list is empty, all attachments are reprocessed. This + * call only submits the job into the queue. + */ + def reprocess( + item: Ident, + attachments: List[Ident], + account: AccountId, + notifyJoex: Boolean + ): F[OUpload.UploadResult] + } object OUpload { @@ -159,6 +172,21 @@ object OUpload { result <- OptionT.liftF(submit(updata, accId, notifyJoex, itemId)) } yield result).getOrElse(UploadResult.noSource) + def reprocess( + item: Ident, + attachments: List[Ident], + account: AccountId, + notifyJoex: Boolean + ): F[UploadResult] = + (for { + _ <- + OptionT(store.transact(RItem.findByIdAndCollective(item, account.collective))) + args = ReProcessItemArgs(item, attachments) + job <- + OptionT.liftF(JobFactory.reprocessItem[F](args, account, Priority.Low, None)) + res <- OptionT.liftF(submitJobs(notifyJoex)(Vector(job))) + } yield res).getOrElse(UploadResult.noItem) + private def submitJobs( notifyJoex: Boolean )(jobs: Vector[RJob]): F[OUpload.UploadResult] = diff --git a/modules/common/src/main/scala/docspell/common/ReProcessItemArgs.scala b/modules/common/src/main/scala/docspell/common/ReProcessItemArgs.scala new file mode 100644 index 00000000..f8afdd58 --- /dev/null +++ b/modules/common/src/main/scala/docspell/common/ReProcessItemArgs.scala @@ -0,0 +1,24 @@ +package docspell.common + +import io.circe.generic.semiauto._ +import io.circe.{Decoder, Encoder} + +/** Arguments when re-processing an item. + * + * The `itemId` must exist and point to some item. If the attachment + * list is non-empty, only those attachments are re-processed. They + * must belong to the given item. If the list is empty, then all + * attachments are re-processed. + */ +case class ReProcessItemArgs(itemId: Ident, attachments: List[Ident]) + +object ReProcessItemArgs { + + val taskName: Ident = Ident.unsafe("re-process-item") + + implicit val jsonEncoder: Encoder[ReProcessItemArgs] = + deriveEncoder[ReProcessItemArgs] + + implicit val jsonDecoder: Decoder[ReProcessItemArgs] = + deriveDecoder[ReProcessItemArgs] +} diff --git a/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala b/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala index 965659b7..9882455c 100644 --- a/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala +++ b/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala @@ -14,6 +14,7 @@ import docspell.joex.fts.{MigrationTask, ReIndexTask} import docspell.joex.hk._ import docspell.joex.notify._ import docspell.joex.process.ItemHandler +import docspell.joex.process.ReProcessItem import docspell.joex.scanmailbox._ import docspell.joex.scheduler._ import docspell.joexapi.client.JoexClient @@ -96,6 +97,13 @@ object JoexAppImpl { ItemHandler.onCancel[F] ) ) + .withTask( + JobTask.json( + ReProcessItemArgs.taskName, + ReProcessItem[F](cfg, fts), + ReProcessItem.onCancel[F] + ) + ) .withTask( JobTask.json( NotifyDueItemsArgs.taskName, diff --git a/modules/joex/src/main/scala/docspell/joex/process/ConvertPdf.scala b/modules/joex/src/main/scala/docspell/joex/process/ConvertPdf.scala index ba75ec3a..572a18bb 100644 --- a/modules/joex/src/main/scala/docspell/joex/process/ConvertPdf.scala +++ b/modules/joex/src/main/scala/docspell/joex/process/ConvertPdf.scala @@ -126,11 +126,46 @@ object ConvertPdf { .compile .lastOrError .map(fm => Ident.unsafe(fm.id)) - .flatMap(fmId => - ctx.store - .transact(RAttachment.updateFileIdAndName(ra.id, fmId, newName)) - .map(_ => fmId) - ) + .flatMap(fmId => updateAttachment[F](ctx, ra, fmId, newName).map(_ => fmId)) .map(fmId => ra.copy(fileId = fmId, name = newName)) } + + private def updateAttachment[F[_]: Sync]( + ctx: Context[F, _], + ra: RAttachment, + fmId: Ident, + newName: Option[String] + ): F[Unit] = + for { + oldFile <- ctx.store.transact(RAttachment.findById(ra.id)) + _ <- + ctx.store + .transact(RAttachment.updateFileIdAndName(ra.id, fmId, newName)) + _ <- oldFile match { + case Some(raPrev) => + for { + sameFile <- + ctx.store + .transact(RAttachmentSource.isSameFile(ra.id, raPrev.fileId)) + _ <- + if (sameFile) ().pure[F] + else + ctx.logger.info("Deleting previous attachment file") *> + ctx.store.bitpeace + .delete(raPrev.fileId.id) + .compile + .drain + .attempt + .flatMap { + case Right(_) => ().pure[F] + case Left(ex) => + ctx.logger + .error(ex)(s"Cannot delete previous attachment file: ${raPrev}") + + } + } yield () + case None => + ().pure[F] + } + } yield () } diff --git a/modules/joex/src/main/scala/docspell/joex/process/ProcessItem.scala b/modules/joex/src/main/scala/docspell/joex/process/ProcessItem.scala index 139ec8f6..72eefa39 100644 --- a/modules/joex/src/main/scala/docspell/joex/process/ProcessItem.scala +++ b/modules/joex/src/main/scala/docspell/joex/process/ProcessItem.scala @@ -27,6 +27,17 @@ object ProcessItem { .flatMap(SetGivenData[F](itemOps)) .flatMap(Task.setProgress(99)) + def processAttachments[F[_]: ConcurrentEffect: ContextShift]( + cfg: Config, + fts: FtsClient[F] + )(item: ItemData): Task[F, ProcessItemArgs, ItemData] = + ConvertPdf(cfg.convert, item) + .flatMap(Task.setProgress(30)) + .flatMap(TextExtraction(cfg.extraction, fts)) + .flatMap(Task.setProgress(60)) + .flatMap(analysisOnly[F](cfg)) + .flatMap(Task.setProgress(90)) + def analysisOnly[F[_]: Sync]( cfg: Config )(item: ItemData): Task[F, ProcessItemArgs, ItemData] = diff --git a/modules/joex/src/main/scala/docspell/joex/process/ReProcessItem.scala b/modules/joex/src/main/scala/docspell/joex/process/ReProcessItem.scala new file mode 100644 index 00000000..8f5e11f2 --- /dev/null +++ b/modules/joex/src/main/scala/docspell/joex/process/ReProcessItem.scala @@ -0,0 +1,131 @@ +package docspell.joex.process + +import cats.data.OptionT +import cats.effect._ +import cats.implicits._ + +import docspell.common._ +import docspell.ftsclient.FtsClient +import docspell.joex.Config +import docspell.joex.scheduler.Context +import docspell.joex.scheduler.Task +import docspell.store.records.RAttachment +import docspell.store.records.RAttachmentSource +import docspell.store.records.RCollective +import docspell.store.records.RItem + +object ReProcessItem { + type Args = ReProcessItemArgs + + def apply[F[_]: ConcurrentEffect: ContextShift]( + cfg: Config, + fts: FtsClient[F] + ): Task[F, Args, Unit] = + loadItem[F] + .flatMap(safeProcess[F](cfg, fts)) + .map(_ => ()) + + def onCancel[F[_]: Sync: ContextShift]: Task[F, Args, Unit] = + logWarn("Now cancelling re-processing.") + + // --- Helpers + + private def contains[F[_]](ctx: Context[F, Args]): RAttachment => Boolean = { + val selection = ctx.args.attachments.toSet + if (selection.isEmpty) (_ => true) + else ra => selection.contains(ra.id) + } + + def loadItem[F[_]: Sync]: Task[F, Args, ItemData] = + Task { ctx => + (for { + item <- OptionT(ctx.store.transact(RItem.findById(ctx.args.itemId))) + attach <- OptionT.liftF(ctx.store.transact(RAttachment.findByItem(item.id))) + asrc <- + OptionT.liftF(ctx.store.transact(RAttachmentSource.findByItem(ctx.args.itemId))) + asrcMap = asrc.map(s => s.id -> s).toMap + // copy the original files over to attachments to run the default processing task + // the processing doesn't touch the original files, only RAttachments + attachSrc = + attach + .filter(contains(ctx)) + .flatMap(a => + asrcMap.get(a.id).map { src => + a.copy(fileId = src.fileId, name = src.name) + } + ) + } yield ItemData( + item, + attachSrc, + Vector.empty, + Vector.empty, + asrcMap.view.mapValues(_.fileId).toMap, + MetaProposalList.empty, + Nil + )).getOrElseF( + Sync[F].raiseError(new Exception(s"Item not found: ${ctx.args.itemId.id}")) + ) + } + + def processFiles[F[_]: ConcurrentEffect: ContextShift]( + cfg: Config, + fts: FtsClient[F], + data: ItemData + ): Task[F, Args, ItemData] = { + + val convertArgs: Language => Args => F[ProcessItemArgs] = + lang => + args => + ProcessItemArgs( + ProcessItemArgs.ProcessMeta( + data.item.cid, + args.itemId.some, + lang, + None, //direction + "", //source-id + None, //folder + Seq.empty + ), + Nil + ).pure[F] + + getLanguage[F].flatMap { lang => + ProcessItem + .processAttachments[F](cfg, fts)(data) + .contramap[Args](convertArgs(lang)) + } + } + + def getLanguage[F[_]: Sync]: Task[F, Args, Language] = + Task { ctx => + (for { + coll <- OptionT(ctx.store.transact(RCollective.findByItem(ctx.args.itemId))) + lang = coll.language + } yield lang).getOrElse(Language.German) + } + + def isLastRetry[F[_]: Sync]: Task[F, Args, Boolean] = + Task(_.isLastRetry) + + def safeProcess[F[_]: ConcurrentEffect: ContextShift]( + cfg: Config, + fts: FtsClient[F] + )(data: ItemData): Task[F, Args, ItemData] = + isLastRetry[F].flatMap { + case true => + processFiles[F](cfg, fts, data).attempt + .flatMap({ + case Right(d) => + Task.pure(d) + case Left(ex) => + logWarn[F]( + "Processing failed on last retry." + ).andThen(_ => Sync[F].raiseError(ex)) + }) + case false => + processFiles[F](cfg, fts, data) + } + + private def logWarn[F[_]](msg: => String): Task[F, Args, Unit] = + Task(_.logger.warn(msg)) +} diff --git a/modules/restapi/src/main/resources/docspell-openapi.yml b/modules/restapi/src/main/resources/docspell-openapi.yml index 7833b28e..c8831ed4 100644 --- a/modules/restapi/src/main/resources/docspell-openapi.yml +++ b/modules/restapi/src/main/resources/docspell-openapi.yml @@ -1796,6 +1796,31 @@ paths: application/json: schema: $ref: "#/components/schemas/ItemProposals" + /sec/item/{itemId}/reprocess: + post: + tags: [ Item ] + summary: Start reprocessing the files of the item. + description: | + This submits a job that will re-process the files (either all + or the ones specified) of the item and replace the metadata. + security: + - authTokenHeader: [] + parameters: + - $ref: "#/components/parameters/id" + requestBody: + content: + application/json: + schema: + $ref: "#/components/schemas/StringList" + responses: + 200: + description: Ok + content: + application/json: + schema: + $ref: "#/components/schemas/BasicResult" + + /sec/item/{itemId}/attachment/movebefore: post: tags: [ Item ] diff --git a/modules/restserver/src/main/scala/docspell/restserver/routes/ItemRoutes.scala b/modules/restserver/src/main/scala/docspell/restserver/routes/ItemRoutes.scala index 8f51d79a..d932d407 100644 --- a/modules/restserver/src/main/scala/docspell/restserver/routes/ItemRoutes.scala +++ b/modules/restserver/src/main/scala/docspell/restserver/routes/ItemRoutes.scala @@ -279,6 +279,15 @@ object ItemRoutes { resp <- Ok(Conversions.basicResult(res, "Attachment moved.")) } yield resp + case req @ POST -> Root / Ident(id) / "reprocess" => + for { + data <- req.as[StringList] + ids = data.items.flatMap(s => Ident.fromString(s).toOption) + _ <- logger.fdebug(s"Re-process item ${id.id}") + res <- backend.upload.reprocess(id, ids, user.account, true) + resp <- Ok(Conversions.basicResult(res)) + } yield resp + case DELETE -> Root / Ident(id) => for { n <- backend.item.deleteItem(id, user.account.collective) diff --git a/modules/store/src/main/scala/docspell/store/records/RAttachment.scala b/modules/store/src/main/scala/docspell/store/records/RAttachment.scala index 61d676b6..8c93de42 100644 --- a/modules/store/src/main/scala/docspell/store/records/RAttachment.scala +++ b/modules/store/src/main/scala/docspell/store/records/RAttachment.scala @@ -71,6 +71,16 @@ object RAttachment { commas(fileId.setTo(fId), name.setTo(fname)) ).update.run + def updateFileId( + attachId: Ident, + fId: Ident + ): ConnectionIO[Int] = + updateRow( + table, + id.is(attachId), + fileId.setTo(fId) + ).update.run + def updatePosition(attachId: Ident, pos: Int): ConnectionIO[Int] = updateRow(table, id.is(attachId), position.setTo(pos)).update.run diff --git a/modules/store/src/main/scala/docspell/store/records/RAttachmentSource.scala b/modules/store/src/main/scala/docspell/store/records/RAttachmentSource.scala index 58b0a6c7..d732ecff 100644 --- a/modules/store/src/main/scala/docspell/store/records/RAttachmentSource.scala +++ b/modules/store/src/main/scala/docspell/store/records/RAttachmentSource.scala @@ -42,6 +42,12 @@ object RAttachmentSource { def findById(attachId: Ident): ConnectionIO[Option[RAttachmentSource]] = selectSimple(all, table, id.is(attachId)).query[RAttachmentSource].option + def isSameFile(attachId: Ident, file: Ident): ConnectionIO[Boolean] = + selectCount(id, table, and(id.is(attachId), fileId.is(file))) + .query[Int] + .unique + .map(_ > 0) + def delete(attachId: Ident): ConnectionIO[Int] = deleteFrom(table, id.is(attachId)).update.run @@ -64,6 +70,17 @@ object RAttachmentSource { selectSimple(all.map(_.prefix("a")), from, where).query[RAttachmentSource].option } + def findByItem(itemId: Ident): ConnectionIO[Vector[RAttachmentSource]] = { + val sId = Columns.id.prefix("s") + val aId = RAttachment.Columns.id.prefix("a") + val aItem = RAttachment.Columns.itemId.prefix("a") + + val from = table ++ fr"s INNER JOIN" ++ RAttachment.table ++ fr"a ON" ++ sId.is(aId) + selectSimple(all.map(_.prefix("s")), from, aItem.is(itemId)) + .query[RAttachmentSource] + .to[Vector] + } + def findByItemWithMeta( id: Ident ): ConnectionIO[Vector[(RAttachmentSource, FileMeta)]] = { diff --git a/modules/store/src/main/scala/docspell/store/records/RCollective.scala b/modules/store/src/main/scala/docspell/store/records/RCollective.scala index 9d27bd1e..22115d5e 100644 --- a/modules/store/src/main/scala/docspell/store/records/RCollective.scala +++ b/modules/store/src/main/scala/docspell/store/records/RCollective.scala @@ -75,6 +75,14 @@ object RCollective { sql.query[RCollective].option } + def findByItem(itemId: Ident): ConnectionIO[Option[RCollective]] = { + val iColl = RItem.Columns.cid.prefix("i") + val iId = RItem.Columns.id.prefix("i") + val cId = id.prefix("c") + val from = RItem.table ++ fr"i INNER JOIN" ++ table ++ fr"c ON" ++ iColl.is(cId) + selectSimple(all.map(_.prefix("c")), from, iId.is(itemId)).query[RCollective].option + } + def existsById(cid: Ident): ConnectionIO[Boolean] = { val sql = selectCount(id, table, id.is(cid)) sql.query[Int].unique.map(_ > 0) @@ -90,5 +98,19 @@ object RCollective { sql.query[RCollective].stream } + def findByAttachment(attachId: Ident): ConnectionIO[Option[RCollective]] = { + val iColl = RItem.Columns.cid.prefix("i") + val iId = RItem.Columns.id.prefix("i") + val aItem = RAttachment.Columns.itemId.prefix("a") + val aId = RAttachment.Columns.id.prefix("a") + val cId = Columns.id.prefix("c") + + val from = table ++ fr"c INNER JOIN" ++ + RItem.table ++ fr"i ON" ++ cId.is(iColl) ++ fr"INNER JOIN" ++ + RAttachment.table ++ fr"a ON" ++ aItem.is(iId) + + selectSimple(all, from, aId.is(attachId)).query[RCollective].option + } + case class Settings(language: Language, integrationEnabled: Boolean) } diff --git a/modules/store/src/main/scala/docspell/store/records/RItem.scala b/modules/store/src/main/scala/docspell/store/records/RItem.scala index e961e8b2..a0025ddb 100644 --- a/modules/store/src/main/scala/docspell/store/records/RItem.scala +++ b/modules/store/src/main/scala/docspell/store/records/RItem.scala @@ -314,6 +314,9 @@ object RItem { def findByIdAndCollective(itemId: Ident, coll: Ident): ConnectionIO[Option[RItem]] = selectSimple(all, table, and(id.is(itemId), cid.is(coll))).query[RItem].option + def findById(itemId: Ident): ConnectionIO[Option[RItem]] = + selectSimple(all, table, id.is(itemId)).query[RItem].option + def checkByIdAndCollective(itemId: Ident, coll: Ident): ConnectionIO[Option[Ident]] = selectSimple(Seq(id), table, and(id.is(itemId), cid.is(coll))).query[Ident].option