mirror of
https://github.com/TheAnachronism/docspell.git
synced 2025-06-05 22:55:58 +00:00
Add support for eml (rfc822 email) files
This commit is contained in:
parent
4ed7a137f7
commit
6b1156182c
24
modules/common/src/main/scala/docspell/common/Binary.scala
Normal file
24
modules/common/src/main/scala/docspell/common/Binary.scala
Normal file
@ -0,0 +1,24 @@
|
|||||||
|
package docspell.common
|
||||||
|
|
||||||
|
import fs2.Stream
|
||||||
|
|
||||||
|
final case class Binary[F[_]](name: String, mime: MimeType, data: Stream[F, Byte]) {
|
||||||
|
|
||||||
|
def withMime(mime: MimeType): Binary[F] =
|
||||||
|
copy(mime = mime)
|
||||||
|
}
|
||||||
|
|
||||||
|
object Binary {
|
||||||
|
|
||||||
|
def apply[F[_]](name: String, data: Stream[F, Byte]): Binary[F] =
|
||||||
|
Binary[F](name, MimeType.octetStream, data)
|
||||||
|
|
||||||
|
def utf8[F[_]](name: String, content: String): Binary[F] =
|
||||||
|
Binary[F](name, MimeType.octetStream, Stream.emit(content).through(fs2.text.utf8Encode))
|
||||||
|
|
||||||
|
def text[F[_]](name: String, content: String): Binary[F] =
|
||||||
|
utf8(name, content).withMime(MimeType.plain)
|
||||||
|
|
||||||
|
def html[F[_]](name: String, content: String): Binary[F] =
|
||||||
|
utf8(name, content).withMime(MimeType.html)
|
||||||
|
}
|
@ -6,27 +6,26 @@ import fs2.{Pipe, Stream}
|
|||||||
import java.io.InputStream
|
import java.io.InputStream
|
||||||
import java.util.zip.ZipInputStream
|
import java.util.zip.ZipInputStream
|
||||||
import java.nio.file.Paths
|
import java.nio.file.Paths
|
||||||
|
import docspell.common.Binary
|
||||||
|
|
||||||
object Zip {
|
object Zip {
|
||||||
|
|
||||||
case class Entry[F[_]](name: String, data: Stream[F, Byte])
|
|
||||||
|
|
||||||
def unzipP[F[_]: ConcurrentEffect: ContextShift](
|
def unzipP[F[_]: ConcurrentEffect: ContextShift](
|
||||||
chunkSize: Int,
|
chunkSize: Int,
|
||||||
blocker: Blocker
|
blocker: Blocker
|
||||||
): Pipe[F, Byte, Entry[F]] =
|
): Pipe[F, Byte, Binary[F]] =
|
||||||
s => unzip[F](chunkSize, blocker)(s)
|
s => unzip[F](chunkSize, blocker)(s)
|
||||||
|
|
||||||
def unzip[F[_]: ConcurrentEffect: ContextShift](chunkSize: Int, blocker: Blocker)(
|
def unzip[F[_]: ConcurrentEffect: ContextShift](chunkSize: Int, blocker: Blocker)(
|
||||||
data: Stream[F, Byte]
|
data: Stream[F, Byte]
|
||||||
): Stream[F, Entry[F]] =
|
): Stream[F, Binary[F]] =
|
||||||
data.through(fs2.io.toInputStream[F]).flatMap(in => unzipJava(in, chunkSize, blocker))
|
data.through(fs2.io.toInputStream[F]).flatMap(in => unzipJava(in, chunkSize, blocker))
|
||||||
|
|
||||||
def unzipJava[F[_]: Sync: ContextShift](
|
def unzipJava[F[_]: Sync: ContextShift](
|
||||||
in: InputStream,
|
in: InputStream,
|
||||||
chunkSize: Int,
|
chunkSize: Int,
|
||||||
blocker: Blocker
|
blocker: Blocker
|
||||||
): Stream[F, Entry[F]] = {
|
): Stream[F, Binary[F]] = {
|
||||||
val zin = new ZipInputStream(in)
|
val zin = new ZipInputStream(in)
|
||||||
|
|
||||||
val nextEntry = Resource.make(Sync[F].delay(Option(zin.getNextEntry))) {
|
val nextEntry = Resource.make(Sync[F].delay(Option(zin.getNextEntry))) {
|
||||||
@ -42,7 +41,7 @@ object Zip {
|
|||||||
val name = Paths.get(ze.getName()).getFileName.toString
|
val name = Paths.get(ze.getName()).getFileName.toString
|
||||||
val data =
|
val data =
|
||||||
fs2.io.readInputStream[F]((zin: InputStream).pure[F], chunkSize, blocker, false)
|
fs2.io.readInputStream[F]((zin: InputStream).pure[F], chunkSize, blocker, false)
|
||||||
Entry(name, data)
|
Binary(name, data)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,62 @@
|
|||||||
|
package docspell.joex.mail
|
||||||
|
|
||||||
|
import cats.effect._
|
||||||
|
import cats.implicits._
|
||||||
|
import fs2.{Pipe, Stream}
|
||||||
|
import emil.{MimeType => _, _}
|
||||||
|
import emil.javamail.syntax._
|
||||||
|
import cats.Applicative
|
||||||
|
|
||||||
|
import docspell.common._
|
||||||
|
|
||||||
|
object ReadMail {
|
||||||
|
|
||||||
|
def read[F[_]: Sync](str: String): F[Mail[F]] =
|
||||||
|
Mail.deserialize(str)
|
||||||
|
|
||||||
|
def readBytesP[F[_]: Sync](logger: Logger[F]): Pipe[F, Byte, Binary[F]] =
|
||||||
|
s =>
|
||||||
|
Stream.eval(logger.debug(s"Converting e-mail into its parts")) >>
|
||||||
|
bytesToMail(s).flatMap(mailToEntries[F](logger))
|
||||||
|
|
||||||
|
def bytesToMail[F[_]: Sync](data: Stream[F, Byte]): Stream[F, Mail[F]] =
|
||||||
|
data.through(fs2.text.utf8Decode).foldMonoid.evalMap(read[F])
|
||||||
|
|
||||||
|
def mailToEntries[F[_]: Applicative](
|
||||||
|
logger: Logger[F]
|
||||||
|
)(mail: Mail[F]): Stream[F, Binary[F]] = {
|
||||||
|
val bodyEntry: F[Option[Binary[F]]] = mail.body.fold(
|
||||||
|
_ => (None: Option[Binary[F]]).pure[F],
|
||||||
|
txt => txt.text.map(c => Binary.text[F]("mail.txt", c).some),
|
||||||
|
html => html.html.map(c => Binary.html[F]("mail.html", c).some),
|
||||||
|
both => both.html.map(c => Binary.html[F]("mail.html", c).some)
|
||||||
|
)
|
||||||
|
|
||||||
|
Stream.eval(
|
||||||
|
logger.debug(
|
||||||
|
s"E-mail has ${mail.attachments.size} attachments and ${bodyType(mail.body)}"
|
||||||
|
)
|
||||||
|
) >>
|
||||||
|
(Stream
|
||||||
|
.eval(bodyEntry)
|
||||||
|
.flatMap(e => Stream.emits(e.toSeq)) ++
|
||||||
|
Stream
|
||||||
|
.emits(mail.attachments.all)
|
||||||
|
.map(a =>
|
||||||
|
Binary(a.filename.getOrElse("noname"), a.mimeType.toDocspell, a.content)
|
||||||
|
))
|
||||||
|
}
|
||||||
|
|
||||||
|
implicit class MimeTypeConv(m: emil.MimeType) {
|
||||||
|
def toDocspell: MimeType =
|
||||||
|
MimeType(m.primary, m.sub)
|
||||||
|
}
|
||||||
|
|
||||||
|
private def bodyType[F[_]](body: MailBody[F]): String =
|
||||||
|
body.fold(
|
||||||
|
_ => "empty-body",
|
||||||
|
_ => "text-body",
|
||||||
|
_ => "html-body",
|
||||||
|
_ => "text-and-html-body"
|
||||||
|
)
|
||||||
|
}
|
@ -7,6 +7,7 @@ import cats.effect._
|
|||||||
import cats.implicits._
|
import cats.implicits._
|
||||||
import fs2.Stream
|
import fs2.Stream
|
||||||
import docspell.common._
|
import docspell.common._
|
||||||
|
import docspell.joex.mail._
|
||||||
import docspell.joex.scheduler._
|
import docspell.joex.scheduler._
|
||||||
import docspell.store.records._
|
import docspell.store.records._
|
||||||
import docspell.files.Zip
|
import docspell.files.Zip
|
||||||
@ -74,6 +75,11 @@ object ExtractArchive {
|
|||||||
extractZip(ctx, archive)(ra)
|
extractZip(ctx, archive)(ra)
|
||||||
.flatTap(_ => cleanupParents(ctx, ra, archive))
|
.flatTap(_ => cleanupParents(ctx, ra, archive))
|
||||||
|
|
||||||
|
case Mimetype("message", "rfc822", _) =>
|
||||||
|
ctx.logger.info(s"Reading e-mail ${ra.name.getOrElse("<noname>")}") *>
|
||||||
|
extractMail(ctx, archive)(ra)
|
||||||
|
.flatTap(_ => cleanupParents(ctx, ra, archive))
|
||||||
|
|
||||||
case _ =>
|
case _ =>
|
||||||
ctx.logger.debug(s"Not an archive: ${mime.asString}") *>
|
ctx.logger.debug(s"Not an archive: ${mime.asString}") *>
|
||||||
Extracted.noArchive(ra).pure[F]
|
Extracted.noArchive(ra).pure[F]
|
||||||
@ -114,8 +120,37 @@ object ExtractArchive {
|
|||||||
|
|
||||||
zipData
|
zipData
|
||||||
.through(Zip.unzipP[F](8192, ctx.blocker))
|
.through(Zip.unzipP[F](8192, ctx.blocker))
|
||||||
.flatMap { entry =>
|
.flatMap(handleEntry(ctx, ra, archive))
|
||||||
val mimeHint = MimetypeHint.filename(entry.name)
|
.foldMonoid
|
||||||
|
.compile
|
||||||
|
.lastOrError
|
||||||
|
}
|
||||||
|
|
||||||
|
def extractMail[F[_]: Sync](
|
||||||
|
ctx: Context[F, _],
|
||||||
|
archive: Option[RAttachmentArchive]
|
||||||
|
)(ra: RAttachment): F[Extracted] = {
|
||||||
|
val email = ctx.store.bitpeace
|
||||||
|
.get(ra.fileId.id)
|
||||||
|
.unNoneTerminate
|
||||||
|
.through(ctx.store.bitpeace.fetchData2(RangeDef.all))
|
||||||
|
|
||||||
|
email
|
||||||
|
.through(ReadMail.readBytesP[F](ctx.logger))
|
||||||
|
.flatMap(handleEntry(ctx, ra, archive))
|
||||||
|
.foldMonoid
|
||||||
|
.compile
|
||||||
|
.lastOrError
|
||||||
|
}
|
||||||
|
|
||||||
|
def handleEntry[F[_]: Sync](
|
||||||
|
ctx: Context[F, _],
|
||||||
|
ra: RAttachment,
|
||||||
|
archive: Option[RAttachmentArchive]
|
||||||
|
)(
|
||||||
|
entry: Binary[F]
|
||||||
|
): Stream[F, Extracted] = {
|
||||||
|
val mimeHint = MimetypeHint.filename(entry.name).withAdvertised(entry.mime.asString)
|
||||||
val fileMeta = ctx.store.bitpeace.saveNew(entry.data, 8192, mimeHint)
|
val fileMeta = ctx.store.bitpeace.saveNew(entry.data, 8192, mimeHint)
|
||||||
Stream.eval(ctx.logger.debug(s"Extracted ${entry.name}. Storing as attachment.")) >>
|
Stream.eval(ctx.logger.debug(s"Extracted ${entry.name}. Storing as attachment.")) >>
|
||||||
fileMeta.evalMap { fm =>
|
fileMeta.evalMap { fm =>
|
||||||
@ -132,10 +167,7 @@ object ExtractArchive {
|
|||||||
Extracted.of(nra, aa)
|
Extracted.of(nra, aa)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
.foldMonoid
|
|
||||||
.compile
|
|
||||||
.lastOrError
|
|
||||||
}
|
}
|
||||||
|
|
||||||
def storeAttachment[F[_]: Sync](ctx: Context[F, _])(ra: RAttachment): F[Int] = {
|
def storeAttachment[F[_]: Sync](ctx: Context[F, _])(ra: RAttachment): F[Int] = {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user