diff --git a/modules/files/src/test/resources/large-file.pdf b/modules/files/src/test/resources/large-file.pdf new file mode 100644 index 00000000..a1d5fac1 Binary files /dev/null and b/modules/files/src/test/resources/large-file.pdf differ diff --git a/modules/joex/src/main/scala/docspell/joex/process/ConvertPdf.scala b/modules/joex/src/main/scala/docspell/joex/process/ConvertPdf.scala index 572a18bb..17cca3e0 100644 --- a/modules/joex/src/main/scala/docspell/joex/process/ConvertPdf.scala +++ b/modules/joex/src/main/scala/docspell/joex/process/ConvertPdf.scala @@ -38,10 +38,20 @@ object ConvertPdf { item: ItemData ): Task[F, ProcessItemArgs, ItemData] = Task { ctx => - def convert(ra: RAttachment) = - findMime(ctx)(ra).flatMap(m => - convertSafe(cfg, JsoupSanitizer.clean, ctx, item)(ra, m) - ) + def convert(ra: RAttachment): F[(RAttachment, Option[RAttachmentMeta])] = + isConverted(ctx)(ra).flatMap { + case true => + ctx.logger.info( + s"Conversion to pdf already done for attachment ${ra.name}." + ) *> + ctx.store + .transact(RAttachmentMeta.findById(ra.id)) + .map(rmOpt => (ra, rmOpt)) + case false => + findMime(ctx)(ra).flatMap(m => + convertSafe(cfg, JsoupSanitizer.clean, ctx, item)(ra, m) + ) + } for { ras <- item.attachments.traverse(convert) @@ -51,6 +61,11 @@ object ConvertPdf { } + def isConverted[F[_]: Sync](ctx: Context[F, ProcessItemArgs])( + ra: RAttachment + ): F[Boolean] = + ctx.store.transact(RAttachmentSource.isConverted(ra.id)) + def findMime[F[_]: Functor](ctx: Context[F, _])(ra: RAttachment): F[Mimetype] = OptionT(ctx.store.transact(RFileMeta.findById(ra.fileId))) .map(_.mimetype) diff --git a/modules/joex/src/main/scala/docspell/joex/process/CreateItem.scala b/modules/joex/src/main/scala/docspell/joex/process/CreateItem.scala index def2e6f4..9bc85a33 100644 --- a/modules/joex/src/main/scala/docspell/joex/process/CreateItem.scala +++ b/modules/joex/src/main/scala/docspell/joex/process/CreateItem.scala @@ -1,5 +1,6 @@ package docspell.joex.process +import cats.data.NonEmptyList import cats.data.OptionT import cats.effect.Sync import cats.implicits._ @@ -125,21 +126,31 @@ object CreateItem { for { cand <- ctx.store.transact(QItem.findByFileIds(fileMetaIds.toSeq)) _ <- - if (cand.nonEmpty) ctx.logger.warn("Found existing item with these files.") + if (cand.nonEmpty) + ctx.logger.warn(s"Found ${cand.size} existing item with these files.") else ().pure[F] ht <- cand.drop(1).traverse(ri => QItem.delete(ctx.store)(ri.id, ri.cid)) _ <- if (ht.sum > 0) ctx.logger.warn(s"Removed ${ht.sum} items with same attachments") else ().pure[F] - rms <- OptionT( - //load attachments but only those mentioned in the task's arguments - cand.headOption.traverse(ri => - ctx.store - .transact(RAttachment.findByItemAndCollective(ri.id, ri.cid)) - .map(_.filter(r => fileMetaIds.contains(r.fileId))) + rms <- OptionT + .fromOption[F](NonEmptyList.fromList(fileMetaIds.toList)) + .flatMap(fids => + OptionT( + //load attachments but only those mentioned in the task's arguments + cand.headOption.traverse(ri => + ctx.store + .transact(RAttachment.findByItemCollectiveSource(ri.id, ri.cid, fids)) + .flatTap(ats => + ctx.logger.debug( + s"Found ${ats.size} attachments. Use only those from task args: ${fileMetaIds}" + ) + ) + ) + ) ) - ).getOrElse(Vector.empty) + .getOrElse(Vector.empty) orig <- rms.traverse(a => ctx.store.transact(RAttachmentSource.findById(a.id)).map(s => (a, s)) ) diff --git a/modules/joex/src/main/scala/docspell/joex/process/DuplicateCheck.scala b/modules/joex/src/main/scala/docspell/joex/process/DuplicateCheck.scala index 069bf9f5..5b30d3ab 100644 --- a/modules/joex/src/main/scala/docspell/joex/process/DuplicateCheck.scala +++ b/modules/joex/src/main/scala/docspell/joex/process/DuplicateCheck.scala @@ -7,6 +7,7 @@ import docspell.common._ import docspell.joex.scheduler.{Context, Task} import docspell.store.queries.QItem import docspell.store.records.RFileMeta +import docspell.store.records.RJob import bitpeace.FileMeta import doobie._ @@ -17,7 +18,13 @@ object DuplicateCheck { def apply[F[_]: Sync]: Task[F, Args, Args] = Task { ctx => if (ctx.args.meta.skipDuplicate) - ctx.logger.debug("Checking for duplicate files") *> removeDuplicates(ctx) + for { + retries <- getRetryCount(ctx) + res <- + if (retries == 0) + ctx.logger.debug("Checking for duplicate files") *> removeDuplicates(ctx) + else ctx.args.pure[F] + } yield res else ctx.logger.debug("Not checking for duplicates") *> ctx.args.pure[F] } @@ -30,6 +37,9 @@ object DuplicateCheck { ctx.args.files.filterNot(f => ids.contains(f.fileMetaId.id)) ) + private def getRetryCount[F[_]: Sync](ctx: Context[F, Args]): F[Int] = + ctx.store.transact(RJob.getRetries(ctx.jobId)).map(_.getOrElse(0)) + private def deleteDuplicate[F[_]: Sync]( ctx: Context[F, Args] )(fd: FileMetaDupes): F[Unit] = { diff --git a/modules/joex/src/main/scala/docspell/joex/process/TextExtraction.scala b/modules/joex/src/main/scala/docspell/joex/process/TextExtraction.scala index 89bb1f61..112034a4 100644 --- a/modules/joex/src/main/scala/docspell/joex/process/TextExtraction.scala +++ b/modules/joex/src/main/scala/docspell/joex/process/TextExtraction.scala @@ -20,7 +20,9 @@ object TextExtraction { ): Task[F, ProcessItemArgs, ItemData] = Task { ctx => for { - _ <- ctx.logger.info("Starting text extraction") + _ <- ctx.logger.info( + s"Starting text extraction for ${item.attachments.size} files" + ) start <- Duration.stopTime[F] txt <- item.attachments.traverse( extractTextIfEmpty( @@ -31,9 +33,10 @@ object TextExtraction { item ) ) - _ <- ctx.logger.debug("Storing extracted texts") + _ <- ctx.logger.debug("Storing extracted texts …") _ <- txt.toList.traverse(res => ctx.store.transact(RAttachmentMeta.upsert(res.am))) + _ <- ctx.logger.debug(s"Extracted text stored.") idxItem = TextData.item( item.item.id, ctx.args.meta.collective, diff --git a/modules/store/src/main/resources/db/migration/mariadb/V1.9.2__fix_text_length.sql b/modules/store/src/main/resources/db/migration/mariadb/V1.9.2__fix_text_length.sql new file mode 100644 index 00000000..07f4460c --- /dev/null +++ b/modules/store/src/main/resources/db/migration/mariadb/V1.9.2__fix_text_length.sql @@ -0,0 +1,14 @@ +ALTER TABLE `attachmentmeta` +MODIFY COLUMN `content` longtext; + +ALTER TABLE `attachmentmeta` +MODIFY COLUMN `nerlabels` longtext; + +ALTER TABLE `attachmentmeta` +MODIFY COLUMN `itemproposals` longtext; + +ALTER TABLE `job` +MODIFY COLUMN `args` mediumtext; + +ALTER TABLE `joblog` +MODIFY COLUMN `message` mediumtext; diff --git a/modules/store/src/main/scala/docspell/store/impl/Column.scala b/modules/store/src/main/scala/docspell/store/impl/Column.scala index 578dd213..2357664b 100644 --- a/modules/store/src/main/scala/docspell/store/impl/Column.scala +++ b/modules/store/src/main/scala/docspell/store/impl/Column.scala @@ -44,6 +44,9 @@ case class Column(name: String, ns: String = "", alias: String = "") { def isNot[A: Put](value: A): Fragment = f ++ fr"<> $value" + def isNot(c: Column): Fragment = + f ++ fr"<>" ++ c.f + def isNull: Fragment = f ++ fr"is null" diff --git a/modules/store/src/main/scala/docspell/store/records/RAttachment.scala b/modules/store/src/main/scala/docspell/store/records/RAttachment.scala index 50c4bc51..334ac711 100644 --- a/modules/store/src/main/scala/docspell/store/records/RAttachment.scala +++ b/modules/store/src/main/scala/docspell/store/records/RAttachment.scala @@ -1,5 +1,6 @@ package docspell.store.records +import cats.data.NonEmptyList import cats.implicits._ import fs2.Stream @@ -158,6 +159,36 @@ object RAttachment { q.query[RAttachment].to[Vector] } + def findByItemCollectiveSource( + id: Ident, + coll: Ident, + fileIds: NonEmptyList[Ident] + ): ConnectionIO[Vector[RAttachment]] = { + + val iId = RItem.Columns.id.prefix("i") + val iColl = RItem.Columns.cid.prefix("i") + val aItem = Columns.itemId.prefix("a") + val aId = Columns.id.prefix("a") + val aFile = Columns.fileId.prefix("a") + val sId = RAttachmentSource.Columns.id.prefix("s") + val sFile = RAttachmentSource.Columns.fileId.prefix("s") + val rId = RAttachmentArchive.Columns.id.prefix("r") + val rFile = RAttachmentArchive.Columns.fileId.prefix("r") + + val from = table ++ fr"a INNER JOIN" ++ + RItem.table ++ fr"i ON" ++ iId.is(aItem) ++ fr"LEFT JOIN" ++ + RAttachmentSource.table ++ fr"s ON" ++ sId.is(aId) ++ fr"LEFT JOIN" ++ + RAttachmentArchive.table ++ fr"r ON" ++ rId.is(aId) + + val cond = and( + iId.is(id), + iColl.is(coll), + or(aFile.isIn(fileIds), sFile.isIn(fileIds), rFile.isIn(fileIds)) + ) + + selectSimple(all.map(_.prefix("a")), from, cond).query[RAttachment].to[Vector] + } + def findByItemAndCollectiveWithMeta( id: Ident, coll: Ident diff --git a/modules/store/src/main/scala/docspell/store/records/RAttachmentMeta.scala b/modules/store/src/main/scala/docspell/store/records/RAttachmentMeta.scala index 72223180..d1cb79ea 100644 --- a/modules/store/src/main/scala/docspell/store/records/RAttachmentMeta.scala +++ b/modules/store/src/main/scala/docspell/store/records/RAttachmentMeta.scala @@ -46,6 +46,9 @@ object RAttachmentMeta { def exists(attachId: Ident): ConnectionIO[Boolean] = selectCount(id, table, id.is(attachId)).query[Int].unique.map(_ > 0) + def findById(attachId: Ident): ConnectionIO[Option[RAttachmentMeta]] = + selectSimple(all, table, id.is(attachId)).query[RAttachmentMeta].option + def upsert(v: RAttachmentMeta): ConnectionIO[Int] = for { n0 <- update(v) diff --git a/modules/store/src/main/scala/docspell/store/records/RAttachmentSource.scala b/modules/store/src/main/scala/docspell/store/records/RAttachmentSource.scala index d732ecff..f67a805f 100644 --- a/modules/store/src/main/scala/docspell/store/records/RAttachmentSource.scala +++ b/modules/store/src/main/scala/docspell/store/records/RAttachmentSource.scala @@ -48,6 +48,21 @@ object RAttachmentSource { .unique .map(_ > 0) + def isConverted(attachId: Ident): ConnectionIO[Boolean] = { + val sId = Columns.id.prefix("s") + val sFile = Columns.fileId.prefix("s") + val aId = RAttachment.Columns.id.prefix("a") + val aFile = RAttachment.Columns.fileId.prefix("a") + + val from = table ++ fr"s INNER JOIN" ++ + RAttachment.table ++ fr"a ON" ++ aId.is(sId) + + selectCount(aId, from, and(aId.is(attachId), aFile.isNot(sFile))) + .query[Int] + .unique + .map(_ > 0) + } + def delete(attachId: Ident): ConnectionIO[Int] = deleteFrom(table, id.is(attachId)).update.run