From bddafa7d28e59b90637b09f77541c2cb2d5ead7f Mon Sep 17 00:00:00 2001 From: Eike Kettner Date: Sat, 9 Jan 2021 14:13:02 +0100 Subject: [PATCH] Fix looping over already seen mails when they are skipped When mails are skipped due to a filter, they must still enter the post-handling step. Otherwise they will be seen again on the next run. Issue: #551 --- .../joex/scanmailbox/ScanMailboxTask.scala | 97 +++++++++++-------- 1 file changed, 57 insertions(+), 40 deletions(-) diff --git a/modules/joex/src/main/scala/docspell/joex/scanmailbox/ScanMailboxTask.scala b/modules/joex/src/main/scala/docspell/joex/scanmailbox/ScanMailboxTask.scala index ed038926..71bf3902 100644 --- a/modules/joex/src/main/scala/docspell/joex/scanmailbox/ScanMailboxTask.scala +++ b/modules/joex/src/main/scala/docspell/joex/scanmailbox/ScanMailboxTask.scala @@ -135,15 +135,21 @@ object ScanMailboxTask { final private class Impl[F[_]: Sync](cfg: Config.ScanMailbox, ctx: Context[F, Args]) { + private def logOp[C](f: Logger[F] => F[Unit]): MailOp[F, C, Unit] = + MailOp(_ => f(ctx.logger)) + def handleFolder[C](a: Access[F, C], upload: OUpload[F])( name: String ): MailOp[F, C, ScanResult] = for { - _ <- Kleisli.liftF(ctx.logger.info(s"Processing folder $name")) - folder <- requireFolder(a)(name) - search <- searchMails(a)(folder) - headers <- Kleisli.liftF(filterSubjects(search.mails).flatMap(filterMessageIds)) - _ <- headers.traverse(handleOne(ctx.args, a, upload)) + _ <- Kleisli.liftF(ctx.logger.info(s"Processing folder $name")) + folder <- requireFolder(a)(name) + search <- searchMails(a)(folder) + items = search.mails.map(MailHeaderItem(_)) + headers <- Kleisli.liftF( + filterSubjects(items).flatMap(filterMessageIds) + ) + _ <- headers.traverse(handleOne(ctx.args, a, upload)) } yield ScanResult(name, search.mails.size, search.count - search.mails.size) def requireFolder[C](a: Access[F, C])(name: String): MailOp[F, C, MailFolder] = @@ -163,43 +169,39 @@ object ScanMailboxTask { } for { - _ <- Kleisli.liftF( - 
ctx.logger.debug( - s"Searching next ${cfg.mailBatchSize} mails in ${folder.name}." - ) + _ <- logOp( + _.debug(s"Searching next ${cfg.mailBatchSize} mails in ${folder.name}.") ) query <- Kleisli.liftF(q) mails <- a.search(folder, cfg.mailBatchSize)(query) - _ <- Kleisli.liftF( - ctx.logger.debug( + _ <- logOp( + _.debug( s"Found ${mails.count} mails in folder. Reading first ${mails.mails.size}" ) ) } yield mails } - def filterSubjects(headers: Vector[MailHeader]): F[Vector[MailHeader]] = + def filterSubjects(headers: Vector[MailHeaderItem]): F[Vector[MailHeaderItem]] = ctx.args.subjectFilter match { case Some(sf) => - def check(mh: MailHeader): F[Option[MailHeader]] = - if (sf.matches(caseSensitive = false)(mh.subject)) - ctx.logger.debug( - s"Including mail '${mh.subject}', it matches the filter." - ) *> Option(mh).pure[F] + def check(mh: MailHeaderItem): F[MailHeaderItem] = + if (mh.notProcess || sf.matches(caseSensitive = false)(mh.mh.subject)) + mh.pure[F] else ctx.logger.debug( - s"Excluding mail '${mh.subject}', it doesn't match the filter." - ) *> (None: Option[MailHeader]).pure[F] + s"Excluding mail '${mh.mh.subject}', it doesn't match the subject filter." + ) *> mh.skip.pure[F] ctx.logger.info( s"Filtering mails on subject using filter: ${sf.asString}" - ) *> headers.traverseFilter(check) + ) *> headers.traverse(check) case None => ctx.logger.debug("Not matching on subjects. 
No filter given") *> headers.pure[F] } - def filterMessageIds(headers: Vector[MailHeader]): F[Vector[MailHeader]] = - NonEmptyList.fromFoldable(headers.flatMap(_.messageId)) match { + def filterMessageIds[C](headers: Vector[MailHeaderItem]): F[Vector[MailHeaderItem]] = + NonEmptyList.fromFoldable(headers.flatMap(_.mh.messageId)) match { case Some(nl) => for { archives <- ctx.store.transact( @@ -207,12 +209,14 @@ object ScanMailboxTask { .findByMessageIdAndCollective(nl, ctx.args.account.collective) ) existing = archives.flatMap(_.messageId).toSet - mails = headers.filterNot(mh => mh.messageId.forall(existing.contains)) - _ <- headers.size - mails.size match { - case 0 => ().pure[F] - case n => - ctx.logger.info(s"Excluded $n mails since items for them already exist.") - } + mails <- headers + .traverse(mh => + if (mh.process && mh.mh.messageId.forall(existing.contains)) + ctx.logger.debug( + s"Excluding mail '${mh.mh.subject}' it has been imported in the past.'" + ) *> mh.skip.pure[F] + else mh.pure[F] + ) } yield mails case None => @@ -248,15 +252,21 @@ object ScanMailboxTask { def postHandle[C](a: Access[F, C])(mh: MailHeader): MailOp[F, C, Unit] = ctx.args.targetFolder match { case Some(tf) => - a.getOrCreateFolder(None, tf).flatMap(folder => a.moveMail(mh, folder)) + logOp(_.debug(s"Post handling mail: ${mh.subject} - moving to folder: $tf")) + .flatMap(_ => + a.getOrCreateFolder(None, tf).flatMap(folder => a.moveMail(mh, folder)) + ) case None if ctx.args.deleteMail => - a.deleteMail(mh).flatMapF { r => - if (r.count == 0) - ctx.logger.warn(s"Mail '${mh.subject}' could not be deleted") - else ().pure[F] - } + logOp(_.debug(s"Post handling mail: ${mh.subject} - deleting mail.")).flatMap( + _ => + a.deleteMail(mh).flatMapF { r => + if (r.count == 0) + ctx.logger.warn(s"Mail could not be deleted!") + else ().pure[F] + } + ) case None => - MailOp.pure(()) + logOp(_.debug(s"Post handling mail: ${mh.subject} - no handling defined!")) } def submitMail(upload: 
OUpload[F], args: Args)( @@ -292,13 +302,15 @@ object ScanMailboxTask { } def handleOne[C](args: Args, a: Access[F, C], upload: OUpload[F])( - mh: MailHeader + mh: MailHeaderItem ): MailOp[F, C, Unit] = for { - mail <- a.loadMail(mh) + mail <- a.loadMail(mh.mh) res <- mail match { - case Some(m) => + case Some(m) if mh.process => Kleisli.liftF(submitMail(upload, args)(m).attempt) + case Some(_) => + Kleisli.liftF(Either.right(mh).pure[F]) case None => MailOp.pure[F, C, Either[Throwable, OUpload.UploadResult]]( Either.left(new Exception(s"Mail not found")) @@ -307,10 +319,15 @@ object ScanMailboxTask { _ <- res.fold( ex => Kleisli.liftF( - ctx.logger.warn(s"Error submitting '${mh.subject}': ${ex.getMessage}") + ctx.logger.warn(s"Error submitting '${mh.mh.subject}': ${ex.getMessage}") ), - _ => postHandle(a)(mh) + _ => postHandle(a)(mh.mh) ) } yield () } + + case class MailHeaderItem(mh: MailHeader, process: Boolean = true) { + def skip = copy(process = false) + def notProcess = !process + } }