diff --git a/modules/common/src/main/scala/docspell/common/Binary.scala b/modules/common/src/main/scala/docspell/common/Binary.scala
new file mode 100644
index 00000000..34f2059c
--- /dev/null
+++ b/modules/common/src/main/scala/docspell/common/Binary.scala
@@ -0,0 +1,30 @@
+package docspell.common
+
+import fs2.Stream
+
+/** A named stream of bytes together with its mime type. */
+final case class Binary[F[_]](name: String, mime: MimeType, data: Stream[F, Byte]) {
+
+  /** Returns a copy of this binary with the given mime type. */
+  def withMime(mime: MimeType): Binary[F] =
+    copy(mime = mime)
+}
+
+object Binary {
+
+  /** Creates a binary with mime type `MimeType.octetStream`. */
+  def apply[F[_]](name: String, data: Stream[F, Byte]): Binary[F] =
+    Binary[F](name, MimeType.octetStream, data)
+
+  /** Creates a binary from the UTF-8 bytes of `content`, typed as `MimeType.octetStream`. */
+  def utf8[F[_]](name: String, content: String): Binary[F] =
+    Binary[F](name, MimeType.octetStream, Stream.emit(content).through(fs2.text.utf8Encode))
+
+  /** Creates a UTF-8 encoded binary of `content` with mime type `MimeType.plain`. */
+  def text[F[_]](name: String, content: String): Binary[F] =
+    utf8(name, content).withMime(MimeType.plain)
+
+  /** Creates a UTF-8 encoded binary of `content` with mime type `MimeType.html`. */
+  def html[F[_]](name: String, content: String): Binary[F] =
+    utf8(name, content).withMime(MimeType.html)
+}
diff --git a/modules/files/src/main/scala/docspell/files/Zip.scala b/modules/files/src/main/scala/docspell/files/Zip.scala
index 55d4cef9..fd786fa9 100644
--- a/modules/files/src/main/scala/docspell/files/Zip.scala
+++ b/modules/files/src/main/scala/docspell/files/Zip.scala
@@ -6,27 +6,26 @@ import fs2.{Pipe, Stream}
 import java.io.InputStream
 import java.util.zip.ZipInputStream
 import java.nio.file.Paths
+import docspell.common.Binary
 
 object Zip {
 
-  case class Entry[F[_]](name: String, data: Stream[F, Byte])
-
   def unzipP[F[_]: ConcurrentEffect: ContextShift](
       chunkSize: Int,
       blocker: Blocker
-  ): Pipe[F, Byte, Entry[F]] =
+  ): Pipe[F, Byte, Binary[F]] =
     s => unzip[F](chunkSize, blocker)(s)
 
   def unzip[F[_]: ConcurrentEffect: ContextShift](chunkSize: Int, blocker: Blocker)(
       data: Stream[F, Byte]
-  ): Stream[F, Entry[F]] =
+  ): Stream[F, Binary[F]] =
     data.through(fs2.io.toInputStream[F]).flatMap(in => unzipJava(in, chunkSize, blocker))
 
   def unzipJava[F[_]: Sync: ContextShift](
       in: InputStream,
       chunkSize: Int,
       blocker: Blocker
-  ): Stream[F, Entry[F]] = {
+  ): Stream[F, Binary[F]] = {
     val zin = new ZipInputStream(in)
 
     val nextEntry = Resource.make(Sync[F].delay(Option(zin.getNextEntry))) {
@@ -42,7 +41,7 @@ object Zip {
         val name = Paths.get(ze.getName()).getFileName.toString
         val data =
           fs2.io.readInputStream[F]((zin: InputStream).pure[F], chunkSize, blocker, false)
-        Entry(name, data)
+        Binary(name, data)
       }
   }
 }
diff --git a/modules/joex/src/main/scala/docspell/joex/mail/ReadMail.scala b/modules/joex/src/main/scala/docspell/joex/mail/ReadMail.scala
new file mode 100644
index 00000000..2f4f8b54
--- /dev/null
+++ b/modules/joex/src/main/scala/docspell/joex/mail/ReadMail.scala
@@ -0,0 +1,74 @@
+package docspell.joex.mail
+
+import cats.effect._
+import cats.implicits._
+import fs2.{Pipe, Stream}
+import emil.{MimeType => _, _}
+import emil.javamail.syntax._
+import cats.Applicative
+
+import docspell.common._
+
+/** Splits an e-mail into its body and attachments, each represented as a `Binary`. */
+object ReadMail {
+
+  /** Parses the given string into a `Mail` using `Mail.deserialize`. */
+  def read[F[_]: Sync](str: String): F[Mail[F]] =
+    Mail.deserialize(str)
+
+  /** A pipe that reads the incoming bytes as one e-mail and emits its parts. */
+  def readBytesP[F[_]: Sync](logger: Logger[F]): Pipe[F, Byte, Binary[F]] =
+    s =>
+      Stream.eval(logger.debug("Converting e-mail into its parts")) >>
+        bytesToMail(s).flatMap(mailToEntries[F](logger))
+
+  /** Decodes all bytes as UTF-8 into a single string and parses it with `read`. */
+  def bytesToMail[F[_]: Sync](data: Stream[F, Byte]): Stream[F, Mail[F]] =
+    data.through(fs2.text.utf8Decode).foldMonoid.evalMap(read[F])
+
+  /** Emits the body entry of the mail, if any, followed by all attachments.
+    *
+    * A text body is emitted as `mail.txt` and an html body as `mail.html`;
+    * if both are present only the html part is emitted. Attachments
+    * without a filename are named `noname`.
+    */
+  def mailToEntries[F[_]: Applicative](
+      logger: Logger[F]
+  )(mail: Mail[F]): Stream[F, Binary[F]] = {
+    val bodyEntry: F[Option[Binary[F]]] = mail.body.fold(
+      _ => (None: Option[Binary[F]]).pure[F],
+      txt => txt.text.map(c => Binary.text[F]("mail.txt", c).some),
+      html => html.html.map(c => Binary.html[F]("mail.html", c).some),
+      both => both.html.map(c => Binary.html[F]("mail.html", c).some)
+    )
+
+    Stream.eval(
+      logger.debug(
+        s"E-mail has ${mail.attachments.size} attachments and ${bodyType(mail.body)}"
+      )
+    ) >>
+      (Stream
+        .eval(bodyEntry)
+        .flatMap(e => Stream.emits(e.toSeq)) ++
+        Stream
+          .emits(mail.attachments.all)
+          .map(a =>
+            Binary(a.filename.getOrElse("noname"), a.mimeType.toDocspell, a.content)
+          ))
+  }
+
+  /** Adds `toDocspell` to emil's mime type to convert it into docspell's `MimeType`. */
+  implicit class MimeTypeConv(m: emil.MimeType) {
+    def toDocspell: MimeType =
+      MimeType(m.primary, m.sub)
+  }
+
+  /** A short description of the kind of body, used for logging. */
+  private def bodyType[F[_]](body: MailBody[F]): String =
+    body.fold(
+      _ => "empty-body",
+      _ => "text-body",
+      _ => "html-body",
+      _ => "text-and-html-body"
+    )
+}
diff --git a/modules/joex/src/main/scala/docspell/joex/process/ExtractArchive.scala b/modules/joex/src/main/scala/docspell/joex/process/ExtractArchive.scala
index 22b39f34..62ea43cf 100644
--- a/modules/joex/src/main/scala/docspell/joex/process/ExtractArchive.scala
+++ b/modules/joex/src/main/scala/docspell/joex/process/ExtractArchive.scala
@@ -7,6 +7,7 @@ import cats.effect._
 import cats.implicits._
 import fs2.Stream
 import docspell.common._
+import docspell.joex.mail._
 import docspell.joex.scheduler._
 import docspell.store.records._
 import docspell.files.Zip
@@ -74,6 +75,11 @@ object ExtractArchive {
           extractZip(ctx, archive)(ra)
             .flatTap(_ => cleanupParents(ctx, ra, archive))
 
+      case Mimetype("message", "rfc822", _) =>
+        ctx.logger.info(s"Reading e-mail ${ra.name.getOrElse("")}") *>
+          extractMail(ctx, archive)(ra)
+            .flatTap(_ => cleanupParents(ctx, ra, archive))
+
       case _ =>
         ctx.logger.debug(s"Not an archive: ${mime.asString}") *>
           Extracted.noArchive(ra).pure[F]
@@ -114,30 +120,63 @@ object ExtractArchive {
 
     zipData
       .through(Zip.unzipP[F](8192, ctx.blocker))
-      .flatMap { entry =>
-        val mimeHint = MimetypeHint.filename(entry.name)
-        val fileMeta = ctx.store.bitpeace.saveNew(entry.data, 8192, mimeHint)
-        Stream.eval(ctx.logger.debug(s"Extracted ${entry.name}. Storing as attachment.")) >>
-          fileMeta.evalMap { fm =>
-            Ident.randomId.map { id =>
-              val nra = RAttachment(
-                id,
-                ra.itemId,
-                Ident.unsafe(fm.id),
-                0, //position is updated afterwards
-                ra.created,
-                Option(entry.name).map(_.trim).filter(_.nonEmpty)
-              )
-              val aa = archive.getOrElse(RAttachmentArchive.of(ra)).copy(id = id)
-              Extracted.of(nra, aa)
-            }
-          }
-      }
+      .flatMap(handleEntry(ctx, ra, archive))
       .foldMonoid
       .compile
       .lastOrError
   }
 
+  /** Reads the file behind `ra` as an e-mail, passes its body and every
+    * attachment to `handleEntry` and merges the results into a single
+    * `Extracted`.
+    */
+  def extractMail[F[_]: Sync](
+      ctx: Context[F, _],
+      archive: Option[RAttachmentArchive]
+  )(ra: RAttachment): F[Extracted] = {
+    val email = ctx.store.bitpeace
+      .get(ra.fileId.id)
+      .unNoneTerminate
+      .through(ctx.store.bitpeace.fetchData2(RangeDef.all))
+
+    email
+      .through(ReadMail.readBytesP[F](ctx.logger))
+      .flatMap(handleEntry(ctx, ra, archive))
+      .foldMonoid
+      .compile
+      .lastOrError
+  }
+
+  /** Stores the data of `entry` as a new file and emits an `Extracted` made
+    * of a new attachment (random id, same item as `ra`) and its archive
+    * record. The entry's name and mime type are passed on as mime type hints.
+    */
+  def handleEntry[F[_]: Sync](
+      ctx: Context[F, _],
+      ra: RAttachment,
+      archive: Option[RAttachmentArchive]
+  )(
+      entry: Binary[F]
+  ): Stream[F, Extracted] = {
+    val mimeHint = MimetypeHint.filename(entry.name).withAdvertised(entry.mime.asString)
+    val fileMeta = ctx.store.bitpeace.saveNew(entry.data, 8192, mimeHint)
+    Stream.eval(ctx.logger.debug(s"Extracted ${entry.name}. Storing as attachment.")) >>
+      fileMeta.evalMap { fm =>
+        Ident.randomId.map { id =>
+          val nra = RAttachment(
+            id,
+            ra.itemId,
+            Ident.unsafe(fm.id),
+            0, //position is updated afterwards
+            ra.created,
+            Option(entry.name).map(_.trim).filter(_.nonEmpty)
+          )
+          val aa = archive.getOrElse(RAttachmentArchive.of(ra)).copy(id = id)
+          Extracted.of(nra, aa)
+        }
+      }
+  }
+
   def storeAttachment[F[_]: Sync](ctx: Context[F, _])(ra: RAttachment): F[Int] = {
     val insert = CreateItem.insertAttachment(ctx)(ra)
     for {