Notify job executors at the end of the task

This commit is contained in:
Eike Kettner 2020-05-20 19:44:45 +02:00
parent 31a1abf395
commit 2858d6b853

View File

@ -85,7 +85,7 @@ object ScanMailboxTask {
if (acc.noneLeft(name)) acc.pure[F] if (acc.noneLeft(name)) acc.pure[F]
else else
mailer mailer
.run(impl.handleFolder(theEmil.access, upload, joex)(name)) .run(impl.handleFolder(theEmil.access, upload)(name))
.map(_ ++ acc) .map(_ ++ acc)
Stream Stream
@ -98,13 +98,14 @@ object ScanMailboxTask {
) )
.lastOr(ScanResult.empty) .lastOr(ScanResult.empty)
.evalMap { sr => .evalMap { sr =>
if (sr.processed >= cfg.maxMails) joex.notifyAllNodes *>
ctx.logger.warn( (if (sr.processed >= cfg.maxMails)
s"Reached server maximum of ${cfg.maxMails} processed mails. Processed ${sr.processed} mails." ctx.logger.warn(
) s"Reached server maximum of ${cfg.maxMails} processed mails. Processed ${sr.processed} mails."
else )
ctx.logger else
.info(s"Stopped after processing ${sr.processed} mails") ctx.logger
.info(s"Stopped after processing ${sr.processed} mails"))
} }
.compile .compile
.drain .drain
@ -133,7 +134,7 @@ object ScanMailboxTask {
final private class Impl[F[_]: Sync](cfg: Config.ScanMailbox, ctx: Context[F, Args]) { final private class Impl[F[_]: Sync](cfg: Config.ScanMailbox, ctx: Context[F, Args]) {
def handleFolder[C](a: Access[F, C], upload: OUpload[F], joex: OJoex[F])( def handleFolder[C](a: Access[F, C], upload: OUpload[F])(
name: String name: String
): MailOp[F, C, ScanResult] = ): MailOp[F, C, ScanResult] =
for { for {
@ -142,7 +143,6 @@ object ScanMailboxTask {
search <- searchMails(a)(folder) search <- searchMails(a)(folder)
headers <- Kleisli.liftF(filterMessageIds(search.mails)) headers <- Kleisli.liftF(filterMessageIds(search.mails))
_ <- headers.traverse(handleOne(a, upload)) _ <- headers.traverse(handleOne(a, upload))
_ <- Kleisli.liftF(joex.notifyAllNodes)
} yield ScanResult(name, search.mails.size, search.count - search.mails.size) } yield ScanResult(name, search.mails.size, search.count - search.mails.size)
def requireFolder[C](a: Access[F, C])(name: String): MailOp[F, C, MailFolder] = def requireFolder[C](a: Access[F, C])(name: String): MailOp[F, C, MailFolder] =