From 2858d6b8535604e95aee0f34ca7cb64c086d05a4 Mon Sep 17 00:00:00 2001 From: Eike Kettner Date: Wed, 20 May 2020 19:44:45 +0200 Subject: [PATCH] Notify job executors at the end of the task --- .../joex/scanmailbox/ScanMailboxTask.scala | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/modules/joex/src/main/scala/docspell/joex/scanmailbox/ScanMailboxTask.scala b/modules/joex/src/main/scala/docspell/joex/scanmailbox/ScanMailboxTask.scala index 572f46a8..8af6b6a7 100644 --- a/modules/joex/src/main/scala/docspell/joex/scanmailbox/ScanMailboxTask.scala +++ b/modules/joex/src/main/scala/docspell/joex/scanmailbox/ScanMailboxTask.scala @@ -85,7 +85,7 @@ object ScanMailboxTask { if (acc.noneLeft(name)) acc.pure[F] else mailer - .run(impl.handleFolder(theEmil.access, upload, joex)(name)) + .run(impl.handleFolder(theEmil.access, upload)(name)) .map(_ ++ acc) Stream @@ -98,13 +98,14 @@ object ScanMailboxTask { ) .lastOr(ScanResult.empty) .evalMap { sr => - if (sr.processed >= cfg.maxMails) - ctx.logger.warn( - s"Reached server maximum of ${cfg.maxMails} processed mails. Processed ${sr.processed} mails." - ) - else - ctx.logger - .info(s"Stopped after processing ${sr.processed} mails") + joex.notifyAllNodes *> + (if (sr.processed >= cfg.maxMails) + ctx.logger.warn( + s"Reached server maximum of ${cfg.maxMails} processed mails. Processed ${sr.processed} mails." + ) + else + ctx.logger + .info(s"Stopped after processing ${sr.processed} mails")) } .compile .drain @@ -133,7 +134,7 @@ object ScanMailboxTask { final private class Impl[F[_]: Sync](cfg: Config.ScanMailbox, ctx: Context[F, Args]) { - def handleFolder[C](a: Access[F, C], upload: OUpload[F], joex: OJoex[F])( + def handleFolder[C](a: Access[F, C], upload: OUpload[F])( name: String ): MailOp[F, C, ScanResult] = for { @@ -142,7 +143,6 @@ object ScanMailboxTask { search <- searchMails(a)(folder) headers <- Kleisli.liftF(filterMessageIds(search.mails)) _ <- headers.traverse(handleOne(a, upload)) - _ <- Kleisli.liftF(joex.notifyAllNodes) } yield ScanResult(name, search.mails.size, search.count - search.mails.size) def requireFolder[C](a: Access[F, C])(name: String): MailOp[F, C, MailFolder] =