Improve glob and filter archive entries
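Note: this commit threads a `Glob` file filter from the job arguments (`ProcessItemArgs`, and the scan-mailbox arguments) into mail reading and archive extraction, so only mail attachments and zip entries whose names match the filter are kept, and items that end up with no attachments are removed. The snippet below is a minimal, self-contained stand-in for the `Glob` surface the diff relies on (`matches`, `asString`, `Glob.all`); it is not docspell's implementation, only an illustration of the matching semantics, and it supports `*` wildcards only.

// Simplified stand-in for docspell's Glob, showing only the surface used in this
// commit: matches(name), asString, and the match-everything Glob.all fallback.
object GlobSketch {

  final case class Glob(pattern: String) {
    // '*' matches any run of characters; everything else is matched literally.
    private val regex =
      pattern.split("\\*", -1).map(java.util.regex.Pattern.quote).mkString(".*").r

    def matches(name: String): Boolean =
      regex.pattern.matcher(name).matches()

    def asString: String = pattern
  }

  object Glob {
    val all: Glob = Glob("*") // fallback used when no fileFilter is configured
  }

  def main(args: Array[String]): Unit = {
    val glob  = Glob("*.pdf")
    val names = List("invoice.pdf", "photo.png")
    println(names.filter(glob.matches))   // List(invoice.pdf)
    println(Glob.all.matches("anything")) // true
  }
}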
@@ -16,9 +16,10 @@ import emil.{MimeType => _, _}
 object ReadMail {
 
   def readBytesP[F[_]: ConcurrentEffect: ContextShift](
-      logger: Logger[F]
+      logger: Logger[F],
+      glob: Glob
   ): Pipe[F, Byte, Binary[F]] =
-    _.through(bytesToMail(logger)).flatMap(mailToEntries[F](logger))
+    _.through(bytesToMail(logger)).flatMap(mailToEntries[F](logger, glob))
 
   def bytesToMail[F[_]: Sync](logger: Logger[F]): Pipe[F, Byte, Mail[F]] =
     s =>
@@ -26,7 +27,8 @@ object ReadMail {
         s.through(Mail.readBytes[F])
 
   def mailToEntries[F[_]: ConcurrentEffect: ContextShift](
-      logger: Logger[F]
+      logger: Logger[F],
+      glob: Glob
   )(mail: Mail[F]): Stream[F, Binary[F]] = {
     val bodyEntry: F[Option[Binary[F]]] =
       if (mail.body.isEmpty) (None: Option[Binary[F]]).pure[F]
@@ -48,10 +50,12 @@ object ReadMail {
     ) >>
       (Stream
         .eval(bodyEntry)
-        .flatMap(e => Stream.emits(e.toSeq)) ++
+        .flatMap(e => Stream.emits(e.toSeq))
+        .filter(a => glob.matches(a.name)) ++
         Stream
           .eval(TnefExtract.replace(mail))
           .flatMap(m => Stream.emits(m.attachments.all))
+          .filter(a => a.filename.exists(glob.matches))
          .map(a =>
            Binary(a.filename.getOrElse("noname"), a.mimeType.toLocal, a.content)
          ))
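Worth noting in the hunk above: the mail body entry always carries a name and is matched directly, while attachments may have no filename and are only kept when a filename is present and matches. A small illustration of that difference with plain values (the case classes and helpers here are illustrative, not docspell types):

object MailFilterSketch {
  // Illustration of the two filters added in mailToEntries.
  final case class BodyEntry(name: String)               // body Binary: always named
  final case class Attachment(filename: Option[String])  // attachments may be unnamed

  def keepBodies(entries: List[BodyEntry], matches: String => Boolean): List[BodyEntry] =
    entries.filter(e => matches(e.name))                 // like .filter(a => glob.matches(a.name))

  def keepAttachments(atts: List[Attachment], matches: String => Boolean): List[Attachment] =
    atts.filter(a => a.filename.exists(matches))         // attachments without a filename are dropped
}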
@@ -95,12 +95,12 @@ object ExtractArchive {
       case MimeType.ZipMatch(_) if ra.name.exists(_.endsWith(".zip")) =>
         ctx.logger.info(s"Extracting zip archive ${ra.name.getOrElse("<noname>")}.") *>
           extractZip(ctx, archive)(ra, pos)
-            .flatTap(_ => cleanupParents(ctx, ra, archive))
+            .flatMap(cleanupParents(ctx, ra, archive))
 
       case MimeType.EmailMatch(_) =>
         ctx.logger.info(s"Reading e-mail ${ra.name.getOrElse("<noname>")}") *>
           extractMail(ctx, archive)(ra, pos)
-            .flatTap(_ => cleanupParents(ctx, ra, archive))
+            .flatMap(cleanupParents(ctx, ra, archive))
 
       case _ =>
         ctx.logger.debug(s"Not an archive: ${mime.asString}") *>
@@ -111,7 +111,7 @@ object ExtractArchive {
       ctx: Context[F, _],
       ra: RAttachment,
       archive: Option[RAttachmentArchive]
-  ): F[Unit] =
+  )(extracted: Extracted): F[Extracted] =
     archive match {
       case Some(_) =>
         for {
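The two hunks above (together with the next one) change `cleanupParents` from a fire-and-forget side effect (`flatTap`, result ignored) into a step that also rewrites the extraction result: in the branch without an archive record, the deleted attachment is dropped from `Extracted.files`, so the consumed file no longer shows up as an attachment of the item. A reduced sketch of that before/after difference (types and names are illustrative, not the actual code):

object CleanupSketch {
  final case class FileRef(id: String)
  final case class Extracted(files: List[FileRef])

  // Before: cleanup runs for its side effects, the result passes through unchanged.
  def cleanupOld(parentId: String): Unit = ()            // delete DB rows, ignore outcome
  def before(parentId: String, e: Extracted): Extracted = { cleanupOld(parentId); e }

  // After: cleanup also drops the consumed parent file from the result.
  def cleanupNew(parentId: String)(e: Extracted): Extracted =
    e.copy(files = e.files.filter(_.id != parentId))
}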
@@ -121,36 +121,37 @@ object ExtractArchive {
           _ <- ctx.store.transact(RAttachmentArchive.delete(ra.id))
           _ <- ctx.store.transact(RAttachment.delete(ra.id))
           _ <- ctx.store.bitpeace.delete(ra.fileId.id).compile.drain
-        } yield ()
+        } yield extracted
       case None =>
         for {
           _ <- ctx.logger.debug(
                 s"Extracted attachment ${ra.name}. Remove it from the item."
               )
           _ <- ctx.store.transact(RAttachment.delete(ra.id))
-        } yield ()
+        } yield extracted.copy(files = extracted.files.filter(_.id != ra.id))
     }
 
   def extractZip[F[_]: ConcurrentEffect: ContextShift](
-      ctx: Context[F, _],
+      ctx: Context[F, ProcessItemArgs],
       archive: Option[RAttachmentArchive]
   )(ra: RAttachment, pos: Int): F[Extracted] = {
     val zipData = ctx.store.bitpeace
       .get(ra.fileId.id)
       .unNoneTerminate
       .through(ctx.store.bitpeace.fetchData2(RangeDef.all))
-
-    zipData
-      .through(Zip.unzipP[F](8192, ctx.blocker))
-      .zipWithIndex
-      .flatMap(handleEntry(ctx, ra, pos, archive, None))
-      .foldMonoid
-      .compile
-      .lastOrError
+    val glob = ctx.args.meta.fileFilter.getOrElse(Glob.all)
+    ctx.logger.debug(s"Filtering zip entries with '${glob.asString}'") *>
+      zipData
+        .through(Zip.unzipP[F](8192, ctx.blocker, glob))
+        .zipWithIndex
+        .flatMap(handleEntry(ctx, ra, pos, archive, None))
+        .foldMonoid
+        .compile
+        .lastOrError
   }
 
   def extractMail[F[_]: ConcurrentEffect: ContextShift](
-      ctx: Context[F, _],
+      ctx: Context[F, ProcessItemArgs],
       archive: Option[RAttachmentArchive]
   )(ra: RAttachment, pos: Int): F[Extracted] = {
     val email: Stream[F, Byte] = ctx.store.bitpeace
@@ -158,24 +159,26 @@ object ExtractArchive {
       .unNoneTerminate
       .through(ctx.store.bitpeace.fetchData2(RangeDef.all))
 
-    email
-      .through(ReadMail.bytesToMail[F](ctx.logger))
-      .flatMap { mail =>
-        val mId = mail.header.messageId
-        val givenMeta =
-          for {
-            _ <- ctx.logger.debug(s"Use mail date for item date: ${mail.header.date}")
-            s <- Sync[F].delay(extractMailMeta(mail))
-          } yield s
+    val glob = ctx.args.meta.fileFilter.getOrElse(Glob.all)
+    ctx.logger.debug(s"Filtering email attachments with '${glob.asString}'") *>
+      email
+        .through(ReadMail.bytesToMail[F](ctx.logger))
+        .flatMap { mail =>
+          val mId = mail.header.messageId
+          val givenMeta =
+            for {
+              _ <- ctx.logger.debug(s"Use mail date for item date: ${mail.header.date}")
+              s <- Sync[F].delay(extractMailMeta(mail))
+            } yield s
 
-        ReadMail
-          .mailToEntries(ctx.logger)(mail)
-          .zipWithIndex
-          .flatMap(handleEntry(ctx, ra, pos, archive, mId)) ++ Stream.eval(givenMeta)
-      }
-      .foldMonoid
-      .compile
-      .lastOrError
+          ReadMail
+            .mailToEntries(ctx.logger, glob)(mail)
+            .zipWithIndex
+            .flatMap(handleEntry(ctx, ra, pos, archive, mId)) ++ Stream.eval(givenMeta)
+        }
+        .foldMonoid
+        .compile
+        .lastOrError
   }
 
   def extractMailMeta[F[_]](mail: Mail[F]): Extracted =
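Both `extractZip` and `extractMail` resolve the filter the same way: a configured `fileFilter` from the job arguments is used as-is, and an absent one degrades to `Glob.all`, i.e. no filtering (the same fallback appears in the ScanMailboxTask hunk further down). A minimal sketch of that defaulting, using a plain predicate in place of `Glob` (illustrative only):

object FilterDefaultSketch {
  type Matcher = String => Boolean

  val matchAll: Matcher = _ => true // stands in for Glob.all

  // Mirrors ctx.args.meta.fileFilter.getOrElse(Glob.all)
  def effectiveFilter(fileFilter: Option[Matcher]): Matcher =
    fileFilter.getOrElse(matchAll)

  // effectiveFilter(None)("report.pdf")                                  == true
  // effectiveFilter(Some((n: String) => n.endsWith(".pdf")))("image.png") == false
}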
@@ -239,6 +242,9 @@ object ExtractArchive {
         positions ++ e.positions
       )
 
+    def filterNames(filter: Glob): Extracted =
+      copy(files = files.filter(ra => filter.matches(ra.name.getOrElse(""))))
+
     def setMeta(m: MetaProposal): Extracted =
       setMeta(MetaProposalList.of(m))
 
@@ -25,6 +25,7 @@ object ProcessItem {
       .flatMap(LinkProposal[F])
       .flatMap(SetGivenData[F](itemOps))
       .flatMap(Task.setProgress(99))
+      .flatMap(RemoveEmptyItem(itemOps))
 
   def processAttachments[F[_]: ConcurrentEffect: ContextShift](
       cfg: Config,
@@ -0,0 +1,26 @@
+package docspell.joex.process
+
+import cats.effect._
+import cats.implicits._
+
+import docspell.backend.ops.OItem
+import docspell.common._
+import docspell.joex.scheduler.Task
+
+object RemoveEmptyItem {
+
+  def apply[F[_]: Sync](
+      ops: OItem[F]
+  )(data: ItemData): Task[F, ProcessItemArgs, ItemData] =
+    if (data.item.state.isInvalid && data.attachments.isEmpty)
+      Task { ctx =>
+        for {
+          _ <- ctx.logger.warn(s"Removing item as it doesn't have any attachments!")
+          n <- ops.deleteItem(data.item.id, data.item.cid)
+          _ <- ctx.logger.warn(s"Removed item ($n). No item has been created!")
+        } yield data
+      }
+    else
+      Task.pure(data)
+
+}
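The new `RemoveEmptyItem` step above is hooked into the processing chain via `.flatMap(RemoveEmptyItem(itemOps))` in `ProcessItem` (see the earlier hunk): if the glob filter left an invalid item with no attachments, the item is deleted, otherwise the data passes through unchanged. The decision itself reduces to a guard like the following (a condensed, dependency-free sketch, not the actual task code):

object RemoveEmptyItemSketch {
  // Condensed decision logic, without the Task/OItem plumbing.
  final case class ItemSketch(isInvalid: Boolean, attachmentCount: Int)

  def shouldRemove(item: ItemSketch): Boolean =
    item.isInvalid && item.attachmentCount == 0 // mirrors state.isInvalid && attachments.isEmpty
}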
@@ -256,7 +256,7 @@ object ScanMailboxTask {
             args.itemFolder,
             Seq.empty,
             true,
-            args.fileFilter,
+            args.fileFilter.getOrElse(Glob.all),
             args.tags.getOrElse(Nil)
           )
           data = OUpload.UploadData(