From 4694433e38b656c383ba658066750c8ba3bb9904 Mon Sep 17 00:00:00 2001 From: Eike Kettner Date: Sun, 24 May 2020 14:58:18 +0200 Subject: [PATCH] Fix attachment positions It worked for new items, because the implicit offset was 0. when adding archives to existing items, there are already attachments and the new attachments are added to the end. This won't work if files are added concurrently, because there is no quick and reliable way to determine the offset then. --- .../docspell/joex/process/CreateItem.scala | 2 +- .../joex/process/ExtractArchive.scala | 118 +++++++++++++----- .../scala/docspell/store/impl/Column.scala | 2 + .../docspell/store/records/RAttachment.scala | 6 +- 4 files changed, 95 insertions(+), 33 deletions(-) diff --git a/modules/joex/src/main/scala/docspell/joex/process/CreateItem.scala b/modules/joex/src/main/scala/docspell/joex/process/CreateItem.scala index 8ef30826..0a06b421 100644 --- a/modules/joex/src/main/scala/docspell/joex/process/CreateItem.scala +++ b/modules/joex/src/main/scala/docspell/joex/process/CreateItem.scala @@ -32,7 +32,7 @@ object CreateItem { def fileMetas(itemId: Ident, now: Timestamp) = Stream - .eval(ctx.store.transact(RAttachment.countOnItem(itemId))) + .eval(ctx.store.transact(RAttachment.nextPosition(itemId))) .flatMap { offset => Stream .emits(ctx.args.files) diff --git a/modules/joex/src/main/scala/docspell/joex/process/ExtractArchive.scala b/modules/joex/src/main/scala/docspell/joex/process/ExtractArchive.scala index d3a156ff..06cbba72 100644 --- a/modules/joex/src/main/scala/docspell/joex/process/ExtractArchive.scala +++ b/modules/joex/src/main/scala/docspell/joex/process/ExtractArchive.scala @@ -13,6 +13,8 @@ import docspell.store.records._ import docspell.files.Zip import cats.kernel.Monoid import emil.Mail +import cats.kernel.Order +import cats.data.NonEmptyList /** Goes through all attachments and extracts archive files, like zip * files. The process is recursive, until all archives have been @@ -46,24 +48,37 @@ object ExtractArchive { archive: Option[RAttachmentArchive] ): Task[F, ProcessItemArgs, (Option[RAttachmentArchive], ItemData)] = Task { ctx => - def extract(ra: RAttachment) = - findMime(ctx)(ra).flatMap(m => extractSafe(ctx, archive)(ra, m)) + def extract(ra: RAttachment, pos: Int): F[Extracted] = + findMime(ctx)(ra).flatMap(m => extractSafe(ctx, archive)(ra, pos, m)) for { - ras <- item.attachments.traverse(extract) - lastPos <- ctx.store.transact(RAttachment.countOnItem(item.item.id)) - nra = - ras.flatMap(_.files).zipWithIndex.map(t => t._1.copy(position = lastPos + t._2)) - _ <- nra.traverse(storeAttachment(ctx)) - naa = ras.flatMap(_.archives) + lastPos <- ctx.store.transact(RAttachment.nextPosition(item.item.id)) + extracts <- + item.attachments.zipWithIndex + .traverse(t => extract(t._1, lastPos + t._2)) + .map(Monoid[Extracted].combineAll) + .map(fixPositions) + nra = extracts.files + _ <- extracts.files.traverse(storeAttachment(ctx)) + naa = extracts.archives _ <- naa.traverse(storeArchive(ctx)) } yield naa.headOption -> item.copy( attachments = nra, originFile = item.originFile ++ nra.map(a => a.id -> a.fileId).toMap, - givenMeta = item.givenMeta.fillEmptyFrom(Monoid[Extracted].combineAll(ras).meta) + givenMeta = item.givenMeta.fillEmptyFrom(extracts.meta) ) } + /** After all files have been extracted, the `extract' contains the + * whole (combined) result. This fixes positions of the attachments + * such that the elements of an archive are "spliced" into the + * attachment list at the position of the archive. If there is no + * archive, positions don't need to be fixed. + */ + private def fixPositions(extract: Extracted): Extracted = + if (extract.archives.isEmpty) extract + else extract.updatePositions + def findMime[F[_]: Functor](ctx: Context[F, _])(ra: RAttachment): F[Mimetype] = OptionT(ctx.store.transact(RFileMeta.findById(ra.fileId))) .map(_.mimetype) @@ -72,21 +87,21 @@ object ExtractArchive { def extractSafe[F[_]: ConcurrentEffect: ContextShift]( ctx: Context[F, ProcessItemArgs], archive: Option[RAttachmentArchive] - )(ra: RAttachment, mime: Mimetype): F[Extracted] = + )(ra: RAttachment, pos: Int, mime: Mimetype): F[Extracted] = mime match { case Mimetype("application", "zip", _) if ra.name.exists(_.endsWith(".zip")) => ctx.logger.info(s"Extracting zip archive ${ra.name.getOrElse("")}.") *> - extractZip(ctx, archive)(ra) + extractZip(ctx, archive)(ra, pos) .flatTap(_ => cleanupParents(ctx, ra, archive)) case Mimetype("message", "rfc822", _) => ctx.logger.info(s"Reading e-mail ${ra.name.getOrElse("")}") *> - extractMail(ctx, archive)(ra) + extractMail(ctx, archive)(ra, pos) .flatTap(_ => cleanupParents(ctx, ra, archive)) case _ => ctx.logger.debug(s"Not an archive: ${mime.asString}") *> - Extracted.noArchive(ra).pure[F] + Extracted.noArchive(ra, pos, 0).pure[F] } def cleanupParents[F[_]: Sync]( @@ -116,7 +131,7 @@ object ExtractArchive { def extractZip[F[_]: ConcurrentEffect: ContextShift]( ctx: Context[F, _], archive: Option[RAttachmentArchive] - )(ra: RAttachment): F[Extracted] = { + )(ra: RAttachment, pos: Int): F[Extracted] = { val zipData = ctx.store.bitpeace .get(ra.fileId.id) .unNoneTerminate @@ -124,7 +139,8 @@ object ExtractArchive { zipData .through(Zip.unzipP[F](8192, ctx.blocker)) - .flatMap(handleEntry(ctx, ra, archive, None)) + .zipWithIndex + .flatMap(handleEntry(ctx, ra, pos, archive, None)) .foldMonoid .compile .lastOrError @@ -133,7 +149,7 @@ object ExtractArchive { def extractMail[F[_]: ConcurrentEffect: ContextShift]( ctx: Context[F, _], archive: Option[RAttachmentArchive] - )(ra: RAttachment): F[Extracted] = { + )(ra: RAttachment, pos: Int): F[Extracted] = { val email: Stream[F, Byte] = ctx.store.bitpeace .get(ra.fileId.id) .unNoneTerminate @@ -151,7 +167,8 @@ object ExtractArchive { ReadMail .mailToEntries(ctx.logger)(mail) - .flatMap(handleEntry(ctx, ra, archive, mId)) ++ Stream.eval(givenMeta) + .zipWithIndex + .flatMap(handleEntry(ctx, ra, pos, archive, mId)) ++ Stream.eval(givenMeta) } .foldMonoid .compile @@ -167,13 +184,15 @@ object ExtractArchive { def handleEntry[F[_]: Sync]( ctx: Context[F, _], ra: RAttachment, + pos: Int, archive: Option[RAttachmentArchive], messageId: Option[String] )( - entry: Binary[F] + tentry: (Binary[F], Long) ): Stream[F, Extracted] = { - val mimeHint = MimetypeHint.filename(entry.name).withAdvertised(entry.mime.asString) - val fileMeta = ctx.store.bitpeace.saveNew(entry.data, 8192, mimeHint) + val (entry, subPos) = tentry + val mimeHint = MimetypeHint.filename(entry.name).withAdvertised(entry.mime.asString) + val fileMeta = ctx.store.bitpeace.saveNew(entry.data, 8192, mimeHint) Stream.eval(ctx.logger.debug(s"Extracted ${entry.name}. Storing as attachment.")) >> fileMeta.evalMap { fm => Ident.randomId.map { id => @@ -181,12 +200,12 @@ object ExtractArchive { id, ra.itemId, Ident.unsafe(fm.id), - 0, //position is updated afterwards + pos, ra.created, Option(entry.name).map(_.trim).filter(_.nonEmpty) ) val aa = archive.getOrElse(RAttachmentArchive.of(ra, messageId)).copy(id = id) - Extracted.of(nra, aa) + Extracted.of(nra, aa, pos, subPos.toInt) } } @@ -206,28 +225,67 @@ object ExtractArchive { case class Extracted( files: Vector[RAttachment], archives: Vector[RAttachmentArchive], - meta: MetaProposalList + meta: MetaProposalList, + positions: List[Extracted.Pos] ) { def ++(e: Extracted) = - Extracted(files ++ e.files, archives ++ e.archives, meta.fillEmptyFrom(e.meta)) + Extracted( + files ++ e.files, + archives ++ e.archives, + meta.fillEmptyFrom(e.meta), + positions ++ e.positions + ) def setMeta(m: MetaProposal): Extracted = setMeta(MetaProposalList.of(m)) def setMeta(ml: MetaProposalList): Extracted = - Extracted(files, archives, meta.fillEmptyFrom(ml)) + Extracted(files, archives, meta.fillEmptyFrom(ml), positions) + + def updatePositions: Extracted = + NonEmptyList.fromList(positions) match { + case None => + this + case Some(nel) => + val sorted = nel.sorted + println(s"---------------------------- $sorted ") + val offset = sorted.head.first + val pos = + sorted.zipWithIndex.map({ case (p, i) => p.id -> (i + offset) }).toList.toMap + val nf = + files.map(f => pos.get(f.id).map(n => f.copy(position = n)).getOrElse(f)) + copy(files = nf) + } } object Extracted { - val empty = Extracted(Vector.empty, Vector.empty, MetaProposalList.empty) + val empty = + Extracted(Vector.empty, Vector.empty, MetaProposalList.empty, Nil) - def noArchive(ra: RAttachment): Extracted = - Extracted(Vector(ra), Vector.empty, MetaProposalList.empty) + def noArchive(ra: RAttachment, pos: Int, subPos: Int): Extracted = + Extracted( + Vector(ra), + Vector.empty, + MetaProposalList.empty, + List(Pos(ra.id, pos, subPos)) + ) - def of(ra: RAttachment, aa: RAttachmentArchive): Extracted = - Extracted(Vector(ra), Vector(aa), MetaProposalList.empty) + def of(ra: RAttachment, aa: RAttachmentArchive, pos: Int, subPos: Int): Extracted = + Extracted( + Vector(ra), + Vector(aa), + MetaProposalList.empty, + List(Pos(ra.id, pos, subPos)) + ) implicit val extractedMonoid: Monoid[Extracted] = Monoid.instance(empty, _ ++ _) + + case class Pos(id: Ident, first: Int, second: Int) + + object Pos { + implicit val ordering: Order[Pos] = + Order.whenEqual(Order.by(_.first), Order.by(_.second)) + } } } diff --git a/modules/store/src/main/scala/docspell/store/impl/Column.scala b/modules/store/src/main/scala/docspell/store/impl/Column.scala index d84ed3cf..0cdc0be3 100644 --- a/modules/store/src/main/scala/docspell/store/impl/Column.scala +++ b/modules/store/src/main/scala/docspell/store/impl/Column.scala @@ -101,4 +101,6 @@ case class Column(name: String, ns: String = "", alias: String = "") { def asc: Fragment = f ++ fr"asc" + def max: Fragment = + fr"MAX(" ++ f ++ fr")" } diff --git a/modules/store/src/main/scala/docspell/store/records/RAttachment.scala b/modules/store/src/main/scala/docspell/store/records/RAttachment.scala index dbb5dc16..b997bb5e 100644 --- a/modules/store/src/main/scala/docspell/store/records/RAttachment.scala +++ b/modules/store/src/main/scala/docspell/store/records/RAttachment.scala @@ -38,8 +38,10 @@ object RAttachment { fr"${v.id},${v.itemId},${v.fileId.id},${v.position},${v.created},${v.name}" ).update.run - def countOnItem(id: Ident): ConnectionIO[Int] = - selectCount(itemId, table, itemId.is(id)).query[Int].unique + def nextPosition(id: Ident): ConnectionIO[Int] = + for { + max <- selectSimple(position.max, table, itemId.is(id)).query[Option[Int]].unique + } yield max.map(_ + 1).getOrElse(0) def updateFileIdAndName( attachId: Ident,