Fix attachment positions

It worked for new items, because the implicit offset was 0. When
adding archives to existing items, there are already attachments and
the new attachments are added to the end. This won't work if files are
added concurrently, because there is no quick and reliable way to
determine the offset then.
This commit is contained in:
Eike Kettner 2020-05-24 14:58:18 +02:00
parent 1dde43e092
commit 4694433e38
4 changed files with 95 additions and 33 deletions

View File

@ -32,7 +32,7 @@ object CreateItem {
def fileMetas(itemId: Ident, now: Timestamp) = def fileMetas(itemId: Ident, now: Timestamp) =
Stream Stream
.eval(ctx.store.transact(RAttachment.countOnItem(itemId))) .eval(ctx.store.transact(RAttachment.nextPosition(itemId)))
.flatMap { offset => .flatMap { offset =>
Stream Stream
.emits(ctx.args.files) .emits(ctx.args.files)

View File

@ -13,6 +13,8 @@ import docspell.store.records._
import docspell.files.Zip import docspell.files.Zip
import cats.kernel.Monoid import cats.kernel.Monoid
import emil.Mail import emil.Mail
import cats.kernel.Order
import cats.data.NonEmptyList
/** Goes through all attachments and extracts archive files, like zip /** Goes through all attachments and extracts archive files, like zip
* files. The process is recursive, until all archives have been * files. The process is recursive, until all archives have been
@ -46,24 +48,37 @@ object ExtractArchive {
archive: Option[RAttachmentArchive] archive: Option[RAttachmentArchive]
): Task[F, ProcessItemArgs, (Option[RAttachmentArchive], ItemData)] = ): Task[F, ProcessItemArgs, (Option[RAttachmentArchive], ItemData)] =
Task { ctx => Task { ctx =>
def extract(ra: RAttachment) = def extract(ra: RAttachment, pos: Int): F[Extracted] =
findMime(ctx)(ra).flatMap(m => extractSafe(ctx, archive)(ra, m)) findMime(ctx)(ra).flatMap(m => extractSafe(ctx, archive)(ra, pos, m))
for { for {
ras <- item.attachments.traverse(extract) lastPos <- ctx.store.transact(RAttachment.nextPosition(item.item.id))
lastPos <- ctx.store.transact(RAttachment.countOnItem(item.item.id)) extracts <-
nra = item.attachments.zipWithIndex
ras.flatMap(_.files).zipWithIndex.map(t => t._1.copy(position = lastPos + t._2)) .traverse(t => extract(t._1, lastPos + t._2))
_ <- nra.traverse(storeAttachment(ctx)) .map(Monoid[Extracted].combineAll)
naa = ras.flatMap(_.archives) .map(fixPositions)
nra = extracts.files
_ <- extracts.files.traverse(storeAttachment(ctx))
naa = extracts.archives
_ <- naa.traverse(storeArchive(ctx)) _ <- naa.traverse(storeArchive(ctx))
} yield naa.headOption -> item.copy( } yield naa.headOption -> item.copy(
attachments = nra, attachments = nra,
originFile = item.originFile ++ nra.map(a => a.id -> a.fileId).toMap, originFile = item.originFile ++ nra.map(a => a.id -> a.fileId).toMap,
givenMeta = item.givenMeta.fillEmptyFrom(Monoid[Extracted].combineAll(ras).meta) givenMeta = item.givenMeta.fillEmptyFrom(extracts.meta)
) )
} }
/** Once every file has been extracted, `extract` holds the complete
  * (combined) result. When it contains at least one archive, the
  * attachment positions are recomputed so that the entries of an
  * archive are "spliced" into the attachment list at the position of
  * that archive. Without any archive the positions are already
  * correct and the value is returned unchanged.
  */
private def fixPositions(extract: Extracted): Extracted =
  if (extract.archives.nonEmpty) extract.updatePositions
  else extract
def findMime[F[_]: Functor](ctx: Context[F, _])(ra: RAttachment): F[Mimetype] = def findMime[F[_]: Functor](ctx: Context[F, _])(ra: RAttachment): F[Mimetype] =
OptionT(ctx.store.transact(RFileMeta.findById(ra.fileId))) OptionT(ctx.store.transact(RFileMeta.findById(ra.fileId)))
.map(_.mimetype) .map(_.mimetype)
@ -72,21 +87,21 @@ object ExtractArchive {
def extractSafe[F[_]: ConcurrentEffect: ContextShift]( def extractSafe[F[_]: ConcurrentEffect: ContextShift](
ctx: Context[F, ProcessItemArgs], ctx: Context[F, ProcessItemArgs],
archive: Option[RAttachmentArchive] archive: Option[RAttachmentArchive]
)(ra: RAttachment, mime: Mimetype): F[Extracted] = )(ra: RAttachment, pos: Int, mime: Mimetype): F[Extracted] =
mime match { mime match {
case Mimetype("application", "zip", _) if ra.name.exists(_.endsWith(".zip")) => case Mimetype("application", "zip", _) if ra.name.exists(_.endsWith(".zip")) =>
ctx.logger.info(s"Extracting zip archive ${ra.name.getOrElse("<noname>")}.") *> ctx.logger.info(s"Extracting zip archive ${ra.name.getOrElse("<noname>")}.") *>
extractZip(ctx, archive)(ra) extractZip(ctx, archive)(ra, pos)
.flatTap(_ => cleanupParents(ctx, ra, archive)) .flatTap(_ => cleanupParents(ctx, ra, archive))
case Mimetype("message", "rfc822", _) => case Mimetype("message", "rfc822", _) =>
ctx.logger.info(s"Reading e-mail ${ra.name.getOrElse("<noname>")}") *> ctx.logger.info(s"Reading e-mail ${ra.name.getOrElse("<noname>")}") *>
extractMail(ctx, archive)(ra) extractMail(ctx, archive)(ra, pos)
.flatTap(_ => cleanupParents(ctx, ra, archive)) .flatTap(_ => cleanupParents(ctx, ra, archive))
case _ => case _ =>
ctx.logger.debug(s"Not an archive: ${mime.asString}") *> ctx.logger.debug(s"Not an archive: ${mime.asString}") *>
Extracted.noArchive(ra).pure[F] Extracted.noArchive(ra, pos, 0).pure[F]
} }
def cleanupParents[F[_]: Sync]( def cleanupParents[F[_]: Sync](
@ -116,7 +131,7 @@ object ExtractArchive {
def extractZip[F[_]: ConcurrentEffect: ContextShift]( def extractZip[F[_]: ConcurrentEffect: ContextShift](
ctx: Context[F, _], ctx: Context[F, _],
archive: Option[RAttachmentArchive] archive: Option[RAttachmentArchive]
)(ra: RAttachment): F[Extracted] = { )(ra: RAttachment, pos: Int): F[Extracted] = {
val zipData = ctx.store.bitpeace val zipData = ctx.store.bitpeace
.get(ra.fileId.id) .get(ra.fileId.id)
.unNoneTerminate .unNoneTerminate
@ -124,7 +139,8 @@ object ExtractArchive {
zipData zipData
.through(Zip.unzipP[F](8192, ctx.blocker)) .through(Zip.unzipP[F](8192, ctx.blocker))
.flatMap(handleEntry(ctx, ra, archive, None)) .zipWithIndex
.flatMap(handleEntry(ctx, ra, pos, archive, None))
.foldMonoid .foldMonoid
.compile .compile
.lastOrError .lastOrError
@ -133,7 +149,7 @@ object ExtractArchive {
def extractMail[F[_]: ConcurrentEffect: ContextShift]( def extractMail[F[_]: ConcurrentEffect: ContextShift](
ctx: Context[F, _], ctx: Context[F, _],
archive: Option[RAttachmentArchive] archive: Option[RAttachmentArchive]
)(ra: RAttachment): F[Extracted] = { )(ra: RAttachment, pos: Int): F[Extracted] = {
val email: Stream[F, Byte] = ctx.store.bitpeace val email: Stream[F, Byte] = ctx.store.bitpeace
.get(ra.fileId.id) .get(ra.fileId.id)
.unNoneTerminate .unNoneTerminate
@ -151,7 +167,8 @@ object ExtractArchive {
ReadMail ReadMail
.mailToEntries(ctx.logger)(mail) .mailToEntries(ctx.logger)(mail)
.flatMap(handleEntry(ctx, ra, archive, mId)) ++ Stream.eval(givenMeta) .zipWithIndex
.flatMap(handleEntry(ctx, ra, pos, archive, mId)) ++ Stream.eval(givenMeta)
} }
.foldMonoid .foldMonoid
.compile .compile
@ -167,13 +184,15 @@ object ExtractArchive {
def handleEntry[F[_]: Sync]( def handleEntry[F[_]: Sync](
ctx: Context[F, _], ctx: Context[F, _],
ra: RAttachment, ra: RAttachment,
pos: Int,
archive: Option[RAttachmentArchive], archive: Option[RAttachmentArchive],
messageId: Option[String] messageId: Option[String]
)( )(
entry: Binary[F] tentry: (Binary[F], Long)
): Stream[F, Extracted] = { ): Stream[F, Extracted] = {
val mimeHint = MimetypeHint.filename(entry.name).withAdvertised(entry.mime.asString) val (entry, subPos) = tentry
val fileMeta = ctx.store.bitpeace.saveNew(entry.data, 8192, mimeHint) val mimeHint = MimetypeHint.filename(entry.name).withAdvertised(entry.mime.asString)
val fileMeta = ctx.store.bitpeace.saveNew(entry.data, 8192, mimeHint)
Stream.eval(ctx.logger.debug(s"Extracted ${entry.name}. Storing as attachment.")) >> Stream.eval(ctx.logger.debug(s"Extracted ${entry.name}. Storing as attachment.")) >>
fileMeta.evalMap { fm => fileMeta.evalMap { fm =>
Ident.randomId.map { id => Ident.randomId.map { id =>
@ -181,12 +200,12 @@ object ExtractArchive {
id, id,
ra.itemId, ra.itemId,
Ident.unsafe(fm.id), Ident.unsafe(fm.id),
0, //position is updated afterwards pos,
ra.created, ra.created,
Option(entry.name).map(_.trim).filter(_.nonEmpty) Option(entry.name).map(_.trim).filter(_.nonEmpty)
) )
val aa = archive.getOrElse(RAttachmentArchive.of(ra, messageId)).copy(id = id) val aa = archive.getOrElse(RAttachmentArchive.of(ra, messageId)).copy(id = id)
Extracted.of(nra, aa) Extracted.of(nra, aa, pos, subPos.toInt)
} }
} }
@ -206,28 +225,67 @@ object ExtractArchive {
case class Extracted( case class Extracted(
files: Vector[RAttachment], files: Vector[RAttachment],
archives: Vector[RAttachmentArchive], archives: Vector[RAttachmentArchive],
meta: MetaProposalList meta: MetaProposalList,
positions: List[Extracted.Pos]
) { ) {
def ++(e: Extracted) = def ++(e: Extracted) =
Extracted(files ++ e.files, archives ++ e.archives, meta.fillEmptyFrom(e.meta)) Extracted(
files ++ e.files,
archives ++ e.archives,
meta.fillEmptyFrom(e.meta),
positions ++ e.positions
)
def setMeta(m: MetaProposal): Extracted = def setMeta(m: MetaProposal): Extracted =
setMeta(MetaProposalList.of(m)) setMeta(MetaProposalList.of(m))
def setMeta(ml: MetaProposalList): Extracted = def setMeta(ml: MetaProposalList): Extracted =
Extracted(files, archives, meta.fillEmptyFrom(ml)) Extracted(files, archives, meta.fillEmptyFrom(ml), positions)
/** Renumbers the attachments in `files` using the collected `positions`.
  *
  * The recorded positions are sorted by the `Order[Pos]` instance and
  * every entry is assigned a consecutive number, starting at the
  * `first` value of the smallest entry. Files that have no recorded
  * position keep their current one. If no positions were recorded at
  * all, this instance is returned as is.
  */
def updatePositions: Extracted =
  NonEmptyList.fromList(positions) match {
    case None =>
      this
    case Some(nel) =>
      val sorted = nel.sorted
      // Renumbering starts at the smallest recorded position.
      val offset = sorted.head.first
      val pos =
        sorted.zipWithIndex.map({ case (p, i) => p.id -> (i + offset) }).toList.toMap
      val nf =
        files.map(f => pos.get(f.id).map(n => f.copy(position = n)).getOrElse(f))
      copy(files = nf)
  }
} }
object Extracted { object Extracted {
val empty = Extracted(Vector.empty, Vector.empty, MetaProposalList.empty) val empty =
Extracted(Vector.empty, Vector.empty, MetaProposalList.empty, Nil)
def noArchive(ra: RAttachment): Extracted = def noArchive(ra: RAttachment, pos: Int, subPos: Int): Extracted =
Extracted(Vector(ra), Vector.empty, MetaProposalList.empty) Extracted(
Vector(ra),
Vector.empty,
MetaProposalList.empty,
List(Pos(ra.id, pos, subPos))
)
def of(ra: RAttachment, aa: RAttachmentArchive): Extracted = def of(ra: RAttachment, aa: RAttachmentArchive, pos: Int, subPos: Int): Extracted =
Extracted(Vector(ra), Vector(aa), MetaProposalList.empty) Extracted(
Vector(ra),
Vector(aa),
MetaProposalList.empty,
List(Pos(ra.id, pos, subPos))
)
implicit val extractedMonoid: Monoid[Extracted] = implicit val extractedMonoid: Monoid[Extracted] =
Monoid.instance(empty, _ ++ _) Monoid.instance(empty, _ ++ _)
/** Sort key for one attachment: `id` identifies the attachment,
  * `first` is the primary and `second` the secondary sort component.
  */
case class Pos(id: Ident, first: Int, second: Int)

object Pos {

  /** Orders by `first`; ties are resolved by `second`. */
  implicit val ordering: Order[Pos] = {
    val byFirst: Order[Pos]  = Order.by[Pos, Int](_.first)
    val bySecond: Order[Pos] = Order.by[Pos, Int](_.second)
    Order.whenEqual(byFirst, bySecond)
  }
}
} }
} }

View File

@ -101,4 +101,6 @@ case class Column(name: String, ns: String = "", alias: String = "") {
def asc: Fragment = def asc: Fragment =
f ++ fr"asc" f ++ fr"asc"
/** Wraps this column's fragment `f` in a SQL `MAX( … )` call. */
def max: Fragment = {
  val open  = fr"MAX("
  val close = fr")"
  open ++ f ++ close
}
} }

View File

@ -38,8 +38,10 @@ object RAttachment {
fr"${v.id},${v.itemId},${v.fileId.id},${v.position},${v.created},${v.name}" fr"${v.id},${v.itemId},${v.fileId.id},${v.position},${v.created},${v.name}"
).update.run ).update.run
def countOnItem(id: Ident): ConnectionIO[Int] = def nextPosition(id: Ident): ConnectionIO[Int] =
selectCount(itemId, table, itemId.is(id)).query[Int].unique for {
max <- selectSimple(position.max, table, itemId.is(id)).query[Option[Int]].unique
} yield max.map(_ + 1).getOrElse(0)
def updateFileIdAndName( def updateFileIdAndName(
attachId: Ident, attachId: Ident,