From 09ea724c135fe281a8b66a9e101bd39722adc9dd Mon Sep 17 00:00:00 2001 From: Eike Kettner Date: Wed, 25 Mar 2020 21:58:54 +0100 Subject: [PATCH] Store message-id of eml files --- .../scala/docspell/joex/mail/ReadMail.scala | 30 +++++++++---------- .../joex/process/ExtractArchive.scala | 18 +++++++---- .../mariadb/V1.4.0__attachment_archive.sql | 4 +++ .../postgresql/V1.4.0__attachment_archive.sql | 4 +++ .../store/records/RAttachmentArchive.scala | 14 +++++---- 5 files changed, 44 insertions(+), 26 deletions(-) diff --git a/modules/joex/src/main/scala/docspell/joex/mail/ReadMail.scala b/modules/joex/src/main/scala/docspell/joex/mail/ReadMail.scala index 27539543..4528fa0a 100644 --- a/modules/joex/src/main/scala/docspell/joex/mail/ReadMail.scala +++ b/modules/joex/src/main/scala/docspell/joex/mail/ReadMail.scala @@ -20,12 +20,12 @@ object ReadMail { def readBytesP[F[_]: ConcurrentEffect: ContextShift]( logger: Logger[F] ): Pipe[F, Byte, Binary[F]] = - s => - Stream.eval(logger.debug(s"Converting e-mail into its parts")) >> - bytesToMail(s).flatMap(mailToEntries[F](logger)) + _.through(bytesToMail(logger)).flatMap(mailToEntries[F](logger)) - def bytesToMail[F[_]: Sync](data: Stream[F, Byte]): Stream[F, Mail[F]] = - data.through(Binary.decode(StandardCharsets.US_ASCII)).foldMonoid.evalMap(read[F]) + def bytesToMail[F[_]: Sync](logger: Logger[F]): Pipe[F, Byte, Mail[F]] = + s => + Stream.eval(logger.debug(s"Converting e-mail file...")) >> + s.through(Binary.decode(StandardCharsets.US_ASCII)).foldMonoid.evalMap(read[F]) def mailToEntries[F[_]: ConcurrentEffect: ContextShift]( logger: Logger[F] @@ -59,20 +59,20 @@ object ReadMail { } private def fixHtml(cnt: BodyContent): BodyContent = { - val str = cnt.asString.trim.toLowerCase + val str = cnt.asString.trim.toLowerCase val head = htmlHeader(cnt.charsetOrUtf8) if (str.startsWith(" - BodyContent(head + s + htmlHeaderEnd) - case BodyContent.ByteContent(bv, cs) => - val begin = ByteVector.view(head.getBytes(cnt.charsetOrUtf8)) - val end = ByteVector.view(htmlHeaderEnd.getBytes(cnt.charsetOrUtf8)) - BodyContent(begin ++ bv ++ end, cs) - } + else + cnt match { + case BodyContent.StringContent(s) => + BodyContent(head + s + htmlHeaderEnd) + case BodyContent.ByteContent(bv, cs) => + val begin = ByteVector.view(head.getBytes(cnt.charsetOrUtf8)) + val end = ByteVector.view(htmlHeaderEnd.getBytes(cnt.charsetOrUtf8)) + BodyContent(begin ++ bv ++ end, cs) + } } - implicit class MimeTypeConv(m: emil.MimeType) { def toDocspell: MimeType = MimeType(m.primary, m.sub, m.params) diff --git a/modules/joex/src/main/scala/docspell/joex/process/ExtractArchive.scala b/modules/joex/src/main/scala/docspell/joex/process/ExtractArchive.scala index 3671aa86..4429203b 100644 --- a/modules/joex/src/main/scala/docspell/joex/process/ExtractArchive.scala +++ b/modules/joex/src/main/scala/docspell/joex/process/ExtractArchive.scala @@ -120,7 +120,7 @@ object ExtractArchive { zipData .through(Zip.unzipP[F](8192, ctx.blocker)) - .flatMap(handleEntry(ctx, ra, archive)) + .flatMap(handleEntry(ctx, ra, archive, None)) .foldMonoid .compile .lastOrError @@ -130,14 +130,19 @@ object ExtractArchive { ctx: Context[F, _], archive: Option[RAttachmentArchive] )(ra: RAttachment): F[Extracted] = { - val email = ctx.store.bitpeace + val email: Stream[F, Byte] = ctx.store.bitpeace .get(ra.fileId.id) .unNoneTerminate .through(ctx.store.bitpeace.fetchData2(RangeDef.all)) email - .through(ReadMail.readBytesP[F](ctx.logger)) - .flatMap(handleEntry(ctx, ra, archive)) + .through(ReadMail.bytesToMail[F](ctx.logger)) + .flatMap { mail => + val mId = mail.header.messageId + ReadMail + .mailToEntries(ctx.logger)(mail) + .flatMap(handleEntry(ctx, ra, archive, mId)) + } .foldMonoid .compile .lastOrError @@ -146,7 +151,8 @@ object ExtractArchive { def handleEntry[F[_]: Sync]( ctx: Context[F, _], ra: RAttachment, - archive: Option[RAttachmentArchive] + archive: Option[RAttachmentArchive], + messageId: Option[String] )( entry: Binary[F] ): Stream[F, Extracted] = { @@ -163,7 +169,7 @@ object ExtractArchive { ra.created, Option(entry.name).map(_.trim).filter(_.nonEmpty) ) - val aa = archive.getOrElse(RAttachmentArchive.of(ra)).copy(id = id) + val aa = archive.getOrElse(RAttachmentArchive.of(ra, messageId)).copy(id = id) Extracted.of(nra, aa) } } diff --git a/modules/store/src/main/resources/db/migration/mariadb/V1.4.0__attachment_archive.sql b/modules/store/src/main/resources/db/migration/mariadb/V1.4.0__attachment_archive.sql index 4218c815..a38b61b2 100644 --- a/modules/store/src/main/resources/db/migration/mariadb/V1.4.0__attachment_archive.sql +++ b/modules/store/src/main/resources/db/migration/mariadb/V1.4.0__attachment_archive.sql @@ -2,7 +2,11 @@ CREATE TABLE `attachment_archive` ( `id` varchar(254) not null primary key, `file_id` varchar(254) not null, `filename` varchar(254), + `message_id` varchar(254), `created` timestamp not null, foreign key (`file_id`) references `filemeta`(`id`), foreign key (`id`) references `attachment`(`attachid`) ); + +CREATE INDEX `attachment_archive_message_id_idx` +ON `attachment_archive`(`message_id`); diff --git a/modules/store/src/main/resources/db/migration/postgresql/V1.4.0__attachment_archive.sql b/modules/store/src/main/resources/db/migration/postgresql/V1.4.0__attachment_archive.sql index 26dc7a56..15ecf1f2 100644 --- a/modules/store/src/main/resources/db/migration/postgresql/V1.4.0__attachment_archive.sql +++ b/modules/store/src/main/resources/db/migration/postgresql/V1.4.0__attachment_archive.sql @@ -2,7 +2,11 @@ CREATE TABLE "attachment_archive" ( "id" varchar(254) not null primary key, "file_id" varchar(254) not null, "filename" varchar(254), + "message_id" varchar(254), "created" timestamp not null, foreign key ("file_id") references "filemeta"("id"), foreign key ("id") references "attachment"("attachid") ); + +CREATE INDEX "attachment_archive_message_id_idx" +ON "attachment_archive"("message_id"); diff --git a/modules/store/src/main/scala/docspell/store/records/RAttachmentArchive.scala b/modules/store/src/main/scala/docspell/store/records/RAttachmentArchive.scala index ee4d891f..8e2e9e93 100644 --- a/modules/store/src/main/scala/docspell/store/records/RAttachmentArchive.scala +++ b/modules/store/src/main/scala/docspell/store/records/RAttachmentArchive.scala @@ -14,6 +14,7 @@ case class RAttachmentArchive( id: Ident, //same as RAttachment.id fileId: Ident, name: Option[String], + messageId: Option[String], created: Timestamp ) @@ -25,18 +26,19 @@ object RAttachmentArchive { val id = Column("id") val fileId = Column("file_id") val name = Column("filename") + val messageId = Column("message_id") val created = Column("created") - val all = List(id, fileId, name, created) + val all = List(id, fileId, name, messageId, created) } import Columns._ - def of(ra: RAttachment): RAttachmentArchive = - RAttachmentArchive(ra.id, ra.fileId, ra.name, ra.created) + def of(ra: RAttachment, mId: Option[String]): RAttachmentArchive = + RAttachmentArchive(ra.id, ra.fileId, ra.name, mId, ra.created) def insert(v: RAttachmentArchive): ConnectionIO[Int] = - insertRow(table, all, fr"${v.id},${v.fileId},${v.name},${v.created}").update.run + insertRow(table, all, fr"${v.id},${v.fileId},${v.name},${v.messageId},${v.created}").update.run def findById(attachId: Ident): ConnectionIO[Option[RAttachmentArchive]] = selectSimple(all, table, id.is(attachId)).query[RAttachmentArchive].option @@ -66,7 +68,9 @@ object RAttachmentArchive { selectSimple(all.map(_.prefix("a")), from, where).query[RAttachmentArchive].option } - def findByItemWithMeta(id: Ident): ConnectionIO[Vector[(RAttachmentArchive, FileMeta)]] = { + def findByItemWithMeta( + id: Ident + ): ConnectionIO[Vector[(RAttachmentArchive, FileMeta)]] = { import bitpeace.sql._ val aId = Columns.id.prefix("a")