Merge pull request #437 from eikek/upload-improvements

Upload improvements
This commit is contained in:
mergify[bot]
2020-11-12 22:58:08 +00:00
committed by GitHub
35 changed files with 1081 additions and 191 deletions

View File

@ -16,9 +16,10 @@ import emil.{MimeType => _, _}
object ReadMail {
def readBytesP[F[_]: ConcurrentEffect: ContextShift](
logger: Logger[F]
logger: Logger[F],
glob: Glob
): Pipe[F, Byte, Binary[F]] =
_.through(bytesToMail(logger)).flatMap(mailToEntries[F](logger))
_.through(bytesToMail(logger)).flatMap(mailToEntries[F](logger, glob))
def bytesToMail[F[_]: Sync](logger: Logger[F]): Pipe[F, Byte, Mail[F]] =
s =>
@ -26,7 +27,8 @@ object ReadMail {
s.through(Mail.readBytes[F])
def mailToEntries[F[_]: ConcurrentEffect: ContextShift](
logger: Logger[F]
logger: Logger[F],
glob: Glob
)(mail: Mail[F]): Stream[F, Binary[F]] = {
val bodyEntry: F[Option[Binary[F]]] =
if (mail.body.isEmpty) (None: Option[Binary[F]]).pure[F]
@ -48,10 +50,12 @@ object ReadMail {
) >>
(Stream
.eval(bodyEntry)
.flatMap(e => Stream.emits(e.toSeq)) ++
.flatMap(e => Stream.emits(e.toSeq))
.filter(a => glob.matches(a.name)) ++
Stream
.eval(TnefExtract.replace(mail))
.flatMap(m => Stream.emits(m.attachments.all))
.filter(a => a.filename.exists(glob.matches))
.map(a =>
Binary(a.filename.getOrElse("noname"), a.mimeType.toLocal, a.content)
))

View File

@ -95,12 +95,12 @@ object ExtractArchive {
case MimeType.ZipMatch(_) if ra.name.exists(_.endsWith(".zip")) =>
ctx.logger.info(s"Extracting zip archive ${ra.name.getOrElse("<noname>")}.") *>
extractZip(ctx, archive)(ra, pos)
.flatTap(_ => cleanupParents(ctx, ra, archive))
.flatMap(cleanupParents(ctx, ra, archive))
case MimeType.EmailMatch(_) =>
ctx.logger.info(s"Reading e-mail ${ra.name.getOrElse("<noname>")}") *>
extractMail(ctx, archive)(ra, pos)
.flatTap(_ => cleanupParents(ctx, ra, archive))
.flatMap(cleanupParents(ctx, ra, archive))
case _ =>
ctx.logger.debug(s"Not an archive: ${mime.asString}") *>
@ -111,7 +111,7 @@ object ExtractArchive {
ctx: Context[F, _],
ra: RAttachment,
archive: Option[RAttachmentArchive]
): F[Unit] =
)(extracted: Extracted): F[Extracted] =
archive match {
case Some(_) =>
for {
@ -121,36 +121,37 @@ object ExtractArchive {
_ <- ctx.store.transact(RAttachmentArchive.delete(ra.id))
_ <- ctx.store.transact(RAttachment.delete(ra.id))
_ <- ctx.store.bitpeace.delete(ra.fileId.id).compile.drain
} yield ()
} yield extracted
case None =>
for {
_ <- ctx.logger.debug(
s"Extracted attachment ${ra.name}. Remove it from the item."
)
_ <- ctx.store.transact(RAttachment.delete(ra.id))
} yield ()
} yield extracted.copy(files = extracted.files.filter(_.id != ra.id))
}
def extractZip[F[_]: ConcurrentEffect: ContextShift](
ctx: Context[F, _],
ctx: Context[F, ProcessItemArgs],
archive: Option[RAttachmentArchive]
)(ra: RAttachment, pos: Int): F[Extracted] = {
val zipData = ctx.store.bitpeace
.get(ra.fileId.id)
.unNoneTerminate
.through(ctx.store.bitpeace.fetchData2(RangeDef.all))
zipData
.through(Zip.unzipP[F](8192, ctx.blocker))
.zipWithIndex
.flatMap(handleEntry(ctx, ra, pos, archive, None))
.foldMonoid
.compile
.lastOrError
val glob = ctx.args.meta.fileFilter.getOrElse(Glob.all)
ctx.logger.debug(s"Filtering zip entries with '${glob.asString}'") *>
zipData
.through(Zip.unzipP[F](8192, ctx.blocker, glob))
.zipWithIndex
.flatMap(handleEntry(ctx, ra, pos, archive, None))
.foldMonoid
.compile
.lastOrError
}
def extractMail[F[_]: ConcurrentEffect: ContextShift](
ctx: Context[F, _],
ctx: Context[F, ProcessItemArgs],
archive: Option[RAttachmentArchive]
)(ra: RAttachment, pos: Int): F[Extracted] = {
val email: Stream[F, Byte] = ctx.store.bitpeace
@ -158,24 +159,26 @@ object ExtractArchive {
.unNoneTerminate
.through(ctx.store.bitpeace.fetchData2(RangeDef.all))
email
.through(ReadMail.bytesToMail[F](ctx.logger))
.flatMap { mail =>
val mId = mail.header.messageId
val givenMeta =
for {
_ <- ctx.logger.debug(s"Use mail date for item date: ${mail.header.date}")
s <- Sync[F].delay(extractMailMeta(mail))
} yield s
val glob = ctx.args.meta.fileFilter.getOrElse(Glob.all)
ctx.logger.debug(s"Filtering email attachments with '${glob.asString}'") *>
email
.through(ReadMail.bytesToMail[F](ctx.logger))
.flatMap { mail =>
val mId = mail.header.messageId
val givenMeta =
for {
_ <- ctx.logger.debug(s"Use mail date for item date: ${mail.header.date}")
s <- Sync[F].delay(extractMailMeta(mail))
} yield s
ReadMail
.mailToEntries(ctx.logger)(mail)
.zipWithIndex
.flatMap(handleEntry(ctx, ra, pos, archive, mId)) ++ Stream.eval(givenMeta)
}
.foldMonoid
.compile
.lastOrError
ReadMail
.mailToEntries(ctx.logger, glob)(mail)
.zipWithIndex
.flatMap(handleEntry(ctx, ra, pos, archive, mId)) ++ Stream.eval(givenMeta)
}
.foldMonoid
.compile
.lastOrError
}
def extractMailMeta[F[_]](mail: Mail[F]): Extracted =
@ -239,6 +242,9 @@ object ExtractArchive {
positions ++ e.positions
)
def filterNames(filter: Glob): Extracted =
copy(files = files.filter(ra => filter.matches(ra.name.getOrElse(""))))
def setMeta(m: MetaProposal): Extracted =
setMeta(MetaProposalList.of(m))

View File

@ -25,6 +25,7 @@ object ProcessItem {
.flatMap(LinkProposal[F])
.flatMap(SetGivenData[F](itemOps))
.flatMap(Task.setProgress(99))
.flatMap(RemoveEmptyItem(itemOps))
def processAttachments[F[_]: ConcurrentEffect: ContextShift](
cfg: Config,

View File

@ -91,7 +91,9 @@ object ReProcessItem {
"", //source-id
None, //folder
Seq.empty,
false
false,
None,
None
),
Nil
).pure[F]

View File

@ -0,0 +1,26 @@
package docspell.joex.process
import cats.effect._
import cats.implicits._
import docspell.backend.ops.OItem
import docspell.common._
import docspell.joex.scheduler.Task
object RemoveEmptyItem {
def apply[F[_]: Sync](
ops: OItem[F]
)(data: ItemData): Task[F, ProcessItemArgs, ItemData] =
if (data.item.state.isInvalid && data.attachments.isEmpty)
Task { ctx =>
for {
_ <- ctx.logger.warn(s"Removing item as it doesn't have any attachments!")
n <- ops.deleteItem(data.item.id, data.item.cid)
_ <- ctx.logger.warn(s"Removed item ($n). No item has been created!")
} yield data
}
else
Task.pure(data)
}

View File

@ -45,9 +45,10 @@ object SetGivenData {
Task { ctx =>
val itemId = data.item.id
val collective = ctx.args.meta.collective
val tags = (ctx.args.meta.tags.getOrElse(Nil) ++ data.tags).distinct
for {
_ <- ctx.logger.info(s"Set tags from given data: ${data.tags}")
e <- ops.linkTags(itemId, data.tags, collective).attempt
_ <- ctx.logger.info(s"Set tags from given data: ${tags}")
e <- ops.linkTags(itemId, tags, collective).attempt
_ <- e.fold(
ex => ctx.logger.warn(s"Error setting tags: ${ex.getMessage}"),
_ => ().pure[F]

View File

@ -47,7 +47,9 @@ object TextExtraction {
_ <- fts.indexData(ctx.logger, (idxItem +: txt.map(_.td)).toSeq: _*)
dur <- start
_ <- ctx.logger.info(s"Text extraction finished in ${dur.formatExact}")
} yield item.copy(metas = txt.map(_.am), tags = txt.flatMap(_.tags).distinct.toList)
} yield item
.copy(metas = txt.map(_.am))
.appendTags(txt.flatMap(_.tags).distinct.toList)
}
// -- helpers

View File

@ -255,7 +255,9 @@ object ScanMailboxTask {
s"mailbox-${ctx.args.account.user.id}",
args.itemFolder,
Seq.empty,
true
true,
args.fileFilter.getOrElse(Glob.all),
args.tags.getOrElse(Nil)
)
data = OUpload.UploadData(
multiple = false,