Fix looping over already seen mails when they are skipped

When skipping mails due to a filter, it must still enter the
post-handling step. Otherwise it will be seen again on next run.

Issue: #551
This commit is contained in:
Eike Kettner 2021-01-09 14:13:02 +01:00
parent d712f8303d
commit bddafa7d28

View File

@ -135,15 +135,21 @@ object ScanMailboxTask {
final private class Impl[F[_]: Sync](cfg: Config.ScanMailbox, ctx: Context[F, Args]) { final private class Impl[F[_]: Sync](cfg: Config.ScanMailbox, ctx: Context[F, Args]) {
private def logOp[C](f: Logger[F] => F[Unit]): MailOp[F, C, Unit] =
MailOp(_ => f(ctx.logger))
def handleFolder[C](a: Access[F, C], upload: OUpload[F])( def handleFolder[C](a: Access[F, C], upload: OUpload[F])(
name: String name: String
): MailOp[F, C, ScanResult] = ): MailOp[F, C, ScanResult] =
for { for {
_ <- Kleisli.liftF(ctx.logger.info(s"Processing folder $name")) _ <- Kleisli.liftF(ctx.logger.info(s"Processing folder $name"))
folder <- requireFolder(a)(name) folder <- requireFolder(a)(name)
search <- searchMails(a)(folder) search <- searchMails(a)(folder)
headers <- Kleisli.liftF(filterSubjects(search.mails).flatMap(filterMessageIds)) items = search.mails.map(MailHeaderItem(_))
_ <- headers.traverse(handleOne(ctx.args, a, upload)) headers <- Kleisli.liftF(
filterSubjects(items).flatMap(filterMessageIds)
)
_ <- headers.traverse(handleOne(ctx.args, a, upload))
} yield ScanResult(name, search.mails.size, search.count - search.mails.size) } yield ScanResult(name, search.mails.size, search.count - search.mails.size)
def requireFolder[C](a: Access[F, C])(name: String): MailOp[F, C, MailFolder] = def requireFolder[C](a: Access[F, C])(name: String): MailOp[F, C, MailFolder] =
@ -163,43 +169,39 @@ object ScanMailboxTask {
} }
for { for {
_ <- Kleisli.liftF( _ <- logOp(
ctx.logger.debug( _.debug(s"Searching next ${cfg.mailBatchSize} mails in ${folder.name}.")
s"Searching next ${cfg.mailBatchSize} mails in ${folder.name}."
)
) )
query <- Kleisli.liftF(q) query <- Kleisli.liftF(q)
mails <- a.search(folder, cfg.mailBatchSize)(query) mails <- a.search(folder, cfg.mailBatchSize)(query)
_ <- Kleisli.liftF( _ <- logOp(
ctx.logger.debug( _.debug(
s"Found ${mails.count} mails in folder. Reading first ${mails.mails.size}" s"Found ${mails.count} mails in folder. Reading first ${mails.mails.size}"
) )
) )
} yield mails } yield mails
} }
def filterSubjects(headers: Vector[MailHeader]): F[Vector[MailHeader]] = def filterSubjects(headers: Vector[MailHeaderItem]): F[Vector[MailHeaderItem]] =
ctx.args.subjectFilter match { ctx.args.subjectFilter match {
case Some(sf) => case Some(sf) =>
def check(mh: MailHeader): F[Option[MailHeader]] = def check(mh: MailHeaderItem): F[MailHeaderItem] =
if (sf.matches(caseSensitive = false)(mh.subject)) if (mh.notProcess || sf.matches(caseSensitive = false)(mh.mh.subject))
ctx.logger.debug( mh.pure[F]
s"Including mail '${mh.subject}', it matches the filter."
) *> Option(mh).pure[F]
else else
ctx.logger.debug( ctx.logger.debug(
s"Excluding mail '${mh.subject}', it doesn't match the filter." s"Excluding mail '${mh.mh.subject}', it doesn't match the subject filter."
) *> (None: Option[MailHeader]).pure[F] ) *> mh.skip.pure[F]
ctx.logger.info( ctx.logger.info(
s"Filtering mails on subject using filter: ${sf.asString}" s"Filtering mails on subject using filter: ${sf.asString}"
) *> headers.traverseFilter(check) ) *> headers.traverse(check)
case None => case None =>
ctx.logger.debug("Not matching on subjects. No filter given") *> headers.pure[F] ctx.logger.debug("Not matching on subjects. No filter given") *> headers.pure[F]
} }
def filterMessageIds(headers: Vector[MailHeader]): F[Vector[MailHeader]] = def filterMessageIds[C](headers: Vector[MailHeaderItem]): F[Vector[MailHeaderItem]] =
NonEmptyList.fromFoldable(headers.flatMap(_.messageId)) match { NonEmptyList.fromFoldable(headers.flatMap(_.mh.messageId)) match {
case Some(nl) => case Some(nl) =>
for { for {
archives <- ctx.store.transact( archives <- ctx.store.transact(
@ -207,12 +209,14 @@ object ScanMailboxTask {
.findByMessageIdAndCollective(nl, ctx.args.account.collective) .findByMessageIdAndCollective(nl, ctx.args.account.collective)
) )
existing = archives.flatMap(_.messageId).toSet existing = archives.flatMap(_.messageId).toSet
mails = headers.filterNot(mh => mh.messageId.forall(existing.contains)) mails <- headers
_ <- headers.size - mails.size match { .traverse(mh =>
case 0 => ().pure[F] if (mh.process && mh.mh.messageId.forall(existing.contains))
case n => ctx.logger.debug(
ctx.logger.info(s"Excluded $n mails since items for them already exist.") s"Excluding mail '${mh.mh.subject}' it has been imported in the past.'"
} ) *> mh.skip.pure[F]
else mh.pure[F]
)
} yield mails } yield mails
case None => case None =>
@ -248,15 +252,21 @@ object ScanMailboxTask {
def postHandle[C](a: Access[F, C])(mh: MailHeader): MailOp[F, C, Unit] = def postHandle[C](a: Access[F, C])(mh: MailHeader): MailOp[F, C, Unit] =
ctx.args.targetFolder match { ctx.args.targetFolder match {
case Some(tf) => case Some(tf) =>
a.getOrCreateFolder(None, tf).flatMap(folder => a.moveMail(mh, folder)) logOp(_.debug(s"Post handling mail: ${mh.subject} - moving to folder: $tf"))
.flatMap(_ =>
a.getOrCreateFolder(None, tf).flatMap(folder => a.moveMail(mh, folder))
)
case None if ctx.args.deleteMail => case None if ctx.args.deleteMail =>
a.deleteMail(mh).flatMapF { r => logOp(_.debug(s"Post handling mail: ${mh.subject} - deleting mail.")).flatMap(
if (r.count == 0) _ =>
ctx.logger.warn(s"Mail '${mh.subject}' could not be deleted") a.deleteMail(mh).flatMapF { r =>
else ().pure[F] if (r.count == 0)
} ctx.logger.warn(s"Mail could not be deleted!")
else ().pure[F]
}
)
case None => case None =>
MailOp.pure(()) logOp(_.debug(s"Post handling mail: ${mh.subject} - no handling defined!"))
} }
def submitMail(upload: OUpload[F], args: Args)( def submitMail(upload: OUpload[F], args: Args)(
@ -292,13 +302,15 @@ object ScanMailboxTask {
} }
def handleOne[C](args: Args, a: Access[F, C], upload: OUpload[F])( def handleOne[C](args: Args, a: Access[F, C], upload: OUpload[F])(
mh: MailHeader mh: MailHeaderItem
): MailOp[F, C, Unit] = ): MailOp[F, C, Unit] =
for { for {
mail <- a.loadMail(mh) mail <- a.loadMail(mh.mh)
res <- mail match { res <- mail match {
case Some(m) => case Some(m) if mh.process =>
Kleisli.liftF(submitMail(upload, args)(m).attempt) Kleisli.liftF(submitMail(upload, args)(m).attempt)
case Some(_) =>
Kleisli.liftF(Either.right(mh).pure[F])
case None => case None =>
MailOp.pure[F, C, Either[Throwable, OUpload.UploadResult]]( MailOp.pure[F, C, Either[Throwable, OUpload.UploadResult]](
Either.left(new Exception(s"Mail not found")) Either.left(new Exception(s"Mail not found"))
@ -307,10 +319,15 @@ object ScanMailboxTask {
_ <- res.fold( _ <- res.fold(
ex => ex =>
Kleisli.liftF( Kleisli.liftF(
ctx.logger.warn(s"Error submitting '${mh.subject}': ${ex.getMessage}") ctx.logger.warn(s"Error submitting '${mh.mh.subject}': ${ex.getMessage}")
), ),
_ => postHandle(a)(mh) _ => postHandle(a)(mh.mh)
) )
} yield () } yield ()
} }
case class MailHeaderItem(mh: MailHeader, process: Boolean = true) {
def skip = copy(process = false)
def notProcess = !process
}
} }