From 31a1abf3950212fa78dd0f20aab6b52285050f90 Mon Sep 17 00:00:00 2001 From: Eike Kettner Date: Wed, 20 May 2020 00:17:52 +0200 Subject: [PATCH] Add server limits to importing mails task --- .../joex/scanmailbox/ScanMailboxTask.scala | 97 ++++++++++--------- 1 file changed, 53 insertions(+), 44 deletions(-) diff --git a/modules/joex/src/main/scala/docspell/joex/scanmailbox/ScanMailboxTask.scala b/modules/joex/src/main/scala/docspell/joex/scanmailbox/ScanMailboxTask.scala index e0956ef8..572f46a8 100644 --- a/modules/joex/src/main/scala/docspell/joex/scanmailbox/ScanMailboxTask.scala +++ b/modules/joex/src/main/scala/docspell/joex/scanmailbox/ScanMailboxTask.scala @@ -81,38 +81,54 @@ object ScanMailboxTask { else ().pure[F] } yield inFolders - def processFolders(in: Seq[String]): Stream[F, ScanResult] = { - val pass = - for { - name <- Stream.emits(in).covary[F] - res <- - Stream.eval(mailer.run(impl.handleFolder(theEmil.access, upload, joex)(name))) - } yield res + def processFolder(acc: ScanResult, name: String): F[ScanResult] = + if (acc.noneLeft(name)) acc.pure[F] + else + mailer + .run(impl.handleFolder(theEmil.access, upload, joex)(name)) + .map(_ ++ acc) - pass - .fold1(_ ++ _) - .flatMap { sr => - if (sr.folders.isEmpty) Stream.emit(sr) - else processFolders(sr.folders) - } - .takeWhile(_.processed < cfg.maxMails) + Stream + .eval(getInitialInput) + .flatMap(Stream.emits) + .repeat + .evalScan(ScanResult.empty)(processFolder) + .takeThrough(result => + result.processed < cfg.maxMails && result.someLeft(inFolders.size) + ) + .lastOr(ScanResult.empty) + .evalMap { sr => + if (sr.processed >= cfg.maxMails) + ctx.logger.warn( + s"Reached server maximum of ${cfg.maxMails} processed mails. Processed ${sr.processed} mails." + ) + else + ctx.logger + .info(s"Stopped after processing ${sr.processed} mails") + } + .compile + .drain + } + + case class ScanResult(folders: List[(String, Int)], processed: Int) { + + def ++(sr: ScanResult): ScanResult = { + val fs = (folders ++ sr.folders).sortBy(_._2).distinctBy(_._1) + ScanResult(fs, processed + sr.processed) } - Stream.eval(getInitialInput).flatMap(processFolders).compile.drain + def noneLeft(name: String): Boolean = + folders.find(_._1 == name).exists(_._2 <= 0) + + def someLeft(inputFolders: Int) = + ScanResult.empty == this || folders.exists(_._2 > 0) || inputFolders > folders.size + } - case class ScanResult(folders: Seq[String], processed: Int, left: Int) { - - /** Removes folders where nothing is left to process */ - def ++(sr: ScanResult): ScanResult = - if (left == 0) ScanResult(sr.folders, processed + sr.processed, sr.left) - else if (sr.left == 0) ScanResult(folders, processed + sr.processed, left) - else ScanResult(folders ++ sr.folders, processed + sr.processed, left + sr.left) - } object ScanResult { + val empty = ScanResult(Nil, 0) def apply(folder: String, processed: Int, left: Int): ScanResult = - if (left <= 0) ScanResult(Seq.empty, processed, 0) - else ScanResult(Seq(folder), processed, left) + ScanResult(List(folder -> left), processed) } final private class Impl[F[_]: Sync](cfg: Config.ScanMailbox, ctx: Context[F, Args]) { @@ -127,7 +143,7 @@ object ScanMailboxTask { headers <- Kleisli.liftF(filterMessageIds(search.mails)) _ <- headers.traverse(handleOne(a, upload)) _ <- Kleisli.liftF(joex.notifyAllNodes) - } yield ScanResult(name, headers.size, search.count - search.mails.size) + } yield ScanResult(name, search.mails.size, search.count - search.mails.size) def requireFolder[C](a: Access[F, C])(name: String): MailOp[F, C, MailFolder] = if ("INBOX".equalsIgnoreCase(name)) a.getInbox @@ -147,10 +163,17 @@ object ScanMailboxTask { for { _ <- Kleisli.liftF( - ctx.logger.info(s"Searching next ${cfg.mailChunkSize} mails in ${folder.name}.") + ctx.logger.debug( + s"Searching next ${cfg.mailChunkSize} mails in ${folder.name}." + ) ) query <- Kleisli.liftF(q) mails <- a.search(folder, cfg.mailChunkSize)(query) + _ <- Kleisli.liftF( + ctx.logger.debug( + s"Found ${mails.count} mails in folder. Reading first ${mails.mails.size}" + ) + ) } yield mails } @@ -166,12 +189,13 @@ object ScanMailboxTask { mails = headers.filterNot(mh => mh.messageId.forall(existing.contains)) _ <- headers.size - mails.size match { case 0 => ().pure[F] - case n => ctx.logger.info(s"Excluded $n mails since items for them already exist.") + case n => + ctx.logger.info(s"Excluded $n mails since items for them already exist.") } } yield mails case None => - ctx.logger.info("No mails found") *> headers.pure[F] + headers.pure[F] } def getDirection(mh: MailHeader): F[Direction] = { @@ -260,20 +284,5 @@ object ScanMailboxTask { _ => postHandle(a)(mh) ) } yield () - - // limit number of folders - // limit number of mails to retrieve per folder - // per folder: - // fetch X mails - // check via msgId if already present; if not: - // load mail - // serialize to *bytes* - // store mail in queue - // move mail or delete or do nothing - // errors: log and keep going - // errors per folder fetch: fail the task - // notifiy joex after each batch - // - // no message id? make hash over complete mail or just import it } }