mirror of
				https://github.com/TheAnachronism/docspell.git
				synced 2025-10-31 17:50:11 +00:00 
			
		
		
		
	Fix finding attachments for retries
When a task is retried, the attachments to process again must also be looked up via their source and archive files, not only via the attachment's own file id.
This commit is contained in:
		| @@ -1,5 +1,6 @@ | ||||
| package docspell.joex.process | ||||
|  | ||||
| import cats.data.NonEmptyList | ||||
| import cats.data.OptionT | ||||
| import cats.effect.Sync | ||||
| import cats.implicits._ | ||||
| @@ -125,21 +126,31 @@ object CreateItem { | ||||
|       for { | ||||
|         cand <- ctx.store.transact(QItem.findByFileIds(fileMetaIds.toSeq)) | ||||
|         _ <- | ||||
|           if (cand.nonEmpty) ctx.logger.warn("Found existing item with these files.") | ||||
|           if (cand.nonEmpty) | ||||
|             ctx.logger.warn(s"Found ${cand.size} existing item with these files.") | ||||
|           else ().pure[F] | ||||
|         ht <- cand.drop(1).traverse(ri => QItem.delete(ctx.store)(ri.id, ri.cid)) | ||||
|         _ <- | ||||
|           if (ht.sum > 0) | ||||
|             ctx.logger.warn(s"Removed ${ht.sum} items with same attachments") | ||||
|           else ().pure[F] | ||||
|         rms <- OptionT( | ||||
|           //load attachments but only those mentioned in the task's arguments | ||||
|           cand.headOption.traverse(ri => | ||||
|             ctx.store | ||||
|               .transact(RAttachment.findByItemAndCollective(ri.id, ri.cid)) | ||||
|               .map(_.filter(r => fileMetaIds.contains(r.fileId))) | ||||
|         rms <- OptionT | ||||
|           .fromOption[F](NonEmptyList.fromList(fileMetaIds.toList)) | ||||
|           .flatMap(fids => | ||||
|             OptionT( | ||||
|               //load attachments but only those mentioned in the task's arguments | ||||
|               cand.headOption.traverse(ri => | ||||
|                 ctx.store | ||||
|                   .transact(RAttachment.findByItemCollectiveSource(ri.id, ri.cid, fids)) | ||||
|                   .flatTap(ats => | ||||
|                     ctx.logger.debug( | ||||
|                       s"Found ${ats.size} attachments. Use only those from task args: ${fileMetaIds}" | ||||
|                     ) | ||||
|                   ) | ||||
|               ) | ||||
|             ) | ||||
|           ) | ||||
|         ).getOrElse(Vector.empty) | ||||
|           .getOrElse(Vector.empty) | ||||
|         orig <- rms.traverse(a => | ||||
|           ctx.store.transact(RAttachmentSource.findById(a.id)).map(s => (a, s)) | ||||
|         ) | ||||
|   | ||||
| @@ -20,7 +20,9 @@ object TextExtraction { | ||||
|   ): Task[F, ProcessItemArgs, ItemData] = | ||||
|     Task { ctx => | ||||
|       for { | ||||
|         _     <- ctx.logger.info("Starting text extraction") | ||||
|         _ <- ctx.logger.info( | ||||
|           s"Starting text extraction for ${item.attachments.size} files" | ||||
|         ) | ||||
|         start <- Duration.stopTime[F] | ||||
|         txt <- item.attachments.traverse( | ||||
|           extractTextIfEmpty( | ||||
| @@ -31,9 +33,10 @@ object TextExtraction { | ||||
|             item | ||||
|           ) | ||||
|         ) | ||||
|         _ <- ctx.logger.debug("Storing extracted texts") | ||||
|         _ <- ctx.logger.debug("Storing extracted texts …") | ||||
|         _ <- | ||||
|           txt.toList.traverse(res => ctx.store.transact(RAttachmentMeta.upsert(res.am))) | ||||
|         _ <- ctx.logger.debug(s"Extracted text stored.") | ||||
|         idxItem = TextData.item( | ||||
|           item.item.id, | ||||
|           ctx.args.meta.collective, | ||||
|   | ||||
| @@ -1,5 +1,6 @@ | ||||
| package docspell.store.records | ||||
|  | ||||
| import cats.data.NonEmptyList | ||||
| import cats.implicits._ | ||||
| import fs2.Stream | ||||
|  | ||||
| @@ -158,6 +159,36 @@ object RAttachment { | ||||
|     q.query[RAttachment].to[Vector] | ||||
|   } | ||||
|  | ||||
|   def findByItemCollectiveSource( | ||||
|       id: Ident, | ||||
|       coll: Ident, | ||||
|       fileIds: NonEmptyList[Ident] | ||||
|   ): ConnectionIO[Vector[RAttachment]] = { | ||||
| /* Loads the attachments of item `id` in collective `coll` whose own file id, or the file id of the joined source or archive row, is one of `fileIds`. The columns below carry the query aliases: a = this record's `table`, i = RItem, s = RAttachmentSource, r = RAttachmentArchive. */ | ||||
|     val iId   = RItem.Columns.id.prefix("i") | ||||
|     val iColl = RItem.Columns.cid.prefix("i") | ||||
|     val aItem = Columns.itemId.prefix("a") | ||||
|     val aId   = Columns.id.prefix("a") | ||||
|     val aFile = Columns.fileId.prefix("a") | ||||
|     val sId   = RAttachmentSource.Columns.id.prefix("s") | ||||
|     val sFile = RAttachmentSource.Columns.fileId.prefix("s") | ||||
|     val rId   = RAttachmentArchive.Columns.id.prefix("r") | ||||
|     val rFile = RAttachmentArchive.Columns.fileId.prefix("r") | ||||
| /* The item is inner-joined on the attachment's item id; source and archive are LEFT-joined on the attachment id, so an attachment without a source or archive row still remains a candidate. */ | ||||
|     val from = table ++ fr"a INNER JOIN" ++ | ||||
|       RItem.table ++ fr"i ON" ++ iId.is(aItem) ++ fr"LEFT JOIN" ++ | ||||
|       RAttachmentSource.table ++ fr"s ON" ++ sId.is(aId) ++ fr"LEFT JOIN" ++ | ||||
|       RAttachmentArchive.table ++ fr"r ON" ++ rId.is(aId) | ||||
| /* Restrict to the given item and collective; a row qualifies when any one of the three file ids (attachment, source, archive) is contained in `fileIds`. */ | ||||
|     val cond = and( | ||||
|       iId.is(id), | ||||
|       iColl.is(coll), | ||||
|       or(aFile.isIn(fileIds), sFile.isIn(fileIds), rFile.isIn(fileIds)) | ||||
|     ) | ||||
| /* Only the columns in `all`, prefixed with alias `a`, are selected; the rows are read as RAttachment and collected into a Vector. */ | ||||
|     selectSimple(all.map(_.prefix("a")), from, cond).query[RAttachment].to[Vector] | ||||
|   } | ||||
|  | ||||
|   def findByItemAndCollectiveWithMeta( | ||||
|       id: Ident, | ||||
|       coll: Ident | ||||
|   | ||||
		Reference in New Issue
	
	Block a user