mirror of
https://github.com/TheAnachronism/docspell.git
synced 2025-06-05 22:55:58 +00:00
Add server limits to importing mails task
This commit is contained in:
parent
f2d67dc816
commit
31a1abf395
@ -81,38 +81,54 @@ object ScanMailboxTask {
|
|||||||
else ().pure[F]
|
else ().pure[F]
|
||||||
} yield inFolders
|
} yield inFolders
|
||||||
|
|
||||||
def processFolders(in: Seq[String]): Stream[F, ScanResult] = {
|
def processFolder(acc: ScanResult, name: String): F[ScanResult] =
|
||||||
val pass =
|
if (acc.noneLeft(name)) acc.pure[F]
|
||||||
for {
|
else
|
||||||
name <- Stream.emits(in).covary[F]
|
mailer
|
||||||
res <-
|
.run(impl.handleFolder(theEmil.access, upload, joex)(name))
|
||||||
Stream.eval(mailer.run(impl.handleFolder(theEmil.access, upload, joex)(name)))
|
.map(_ ++ acc)
|
||||||
} yield res
|
|
||||||
|
|
||||||
pass
|
Stream
|
||||||
.fold1(_ ++ _)
|
.eval(getInitialInput)
|
||||||
.flatMap { sr =>
|
.flatMap(Stream.emits)
|
||||||
if (sr.folders.isEmpty) Stream.emit(sr)
|
.repeat
|
||||||
else processFolders(sr.folders)
|
.evalScan(ScanResult.empty)(processFolder)
|
||||||
|
.takeThrough(result =>
|
||||||
|
result.processed < cfg.maxMails && result.someLeft(inFolders.size)
|
||||||
|
)
|
||||||
|
.lastOr(ScanResult.empty)
|
||||||
|
.evalMap { sr =>
|
||||||
|
if (sr.processed >= cfg.maxMails)
|
||||||
|
ctx.logger.warn(
|
||||||
|
s"Reached server maximum of ${cfg.maxMails} processed mails. Processed ${sr.processed} mails."
|
||||||
|
)
|
||||||
|
else
|
||||||
|
ctx.logger
|
||||||
|
.info(s"Stopped after processing ${sr.processed} mails")
|
||||||
}
|
}
|
||||||
.takeWhile(_.processed < cfg.maxMails)
|
.compile
|
||||||
|
.drain
|
||||||
}
|
}
|
||||||
|
|
||||||
Stream.eval(getInitialInput).flatMap(processFolders).compile.drain
|
case class ScanResult(folders: List[(String, Int)], processed: Int) {
|
||||||
|
|
||||||
|
def ++(sr: ScanResult): ScanResult = {
|
||||||
|
val fs = (folders ++ sr.folders).sortBy(_._2).distinctBy(_._1)
|
||||||
|
ScanResult(fs, processed + sr.processed)
|
||||||
}
|
}
|
||||||
|
|
||||||
case class ScanResult(folders: Seq[String], processed: Int, left: Int) {
|
def noneLeft(name: String): Boolean =
|
||||||
|
folders.find(_._1 == name).exists(_._2 <= 0)
|
||||||
|
|
||||||
|
def someLeft(inputFolders: Int) =
|
||||||
|
ScanResult.empty == this || folders.exists(_._2 > 0) || inputFolders > folders.size
|
||||||
|
|
||||||
/** Removes folders where nothing is left to process */
|
|
||||||
def ++(sr: ScanResult): ScanResult =
|
|
||||||
if (left == 0) ScanResult(sr.folders, processed + sr.processed, sr.left)
|
|
||||||
else if (sr.left == 0) ScanResult(folders, processed + sr.processed, left)
|
|
||||||
else ScanResult(folders ++ sr.folders, processed + sr.processed, left + sr.left)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
object ScanResult {
|
object ScanResult {
|
||||||
|
val empty = ScanResult(Nil, 0)
|
||||||
def apply(folder: String, processed: Int, left: Int): ScanResult =
|
def apply(folder: String, processed: Int, left: Int): ScanResult =
|
||||||
if (left <= 0) ScanResult(Seq.empty, processed, 0)
|
ScanResult(List(folder -> left), processed)
|
||||||
else ScanResult(Seq(folder), processed, left)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
final private class Impl[F[_]: Sync](cfg: Config.ScanMailbox, ctx: Context[F, Args]) {
|
final private class Impl[F[_]: Sync](cfg: Config.ScanMailbox, ctx: Context[F, Args]) {
|
||||||
@ -127,7 +143,7 @@ object ScanMailboxTask {
|
|||||||
headers <- Kleisli.liftF(filterMessageIds(search.mails))
|
headers <- Kleisli.liftF(filterMessageIds(search.mails))
|
||||||
_ <- headers.traverse(handleOne(a, upload))
|
_ <- headers.traverse(handleOne(a, upload))
|
||||||
_ <- Kleisli.liftF(joex.notifyAllNodes)
|
_ <- Kleisli.liftF(joex.notifyAllNodes)
|
||||||
} yield ScanResult(name, headers.size, search.count - search.mails.size)
|
} yield ScanResult(name, search.mails.size, search.count - search.mails.size)
|
||||||
|
|
||||||
def requireFolder[C](a: Access[F, C])(name: String): MailOp[F, C, MailFolder] =
|
def requireFolder[C](a: Access[F, C])(name: String): MailOp[F, C, MailFolder] =
|
||||||
if ("INBOX".equalsIgnoreCase(name)) a.getInbox
|
if ("INBOX".equalsIgnoreCase(name)) a.getInbox
|
||||||
@ -147,10 +163,17 @@ object ScanMailboxTask {
|
|||||||
|
|
||||||
for {
|
for {
|
||||||
_ <- Kleisli.liftF(
|
_ <- Kleisli.liftF(
|
||||||
ctx.logger.info(s"Searching next ${cfg.mailChunkSize} mails in ${folder.name}.")
|
ctx.logger.debug(
|
||||||
|
s"Searching next ${cfg.mailChunkSize} mails in ${folder.name}."
|
||||||
|
)
|
||||||
)
|
)
|
||||||
query <- Kleisli.liftF(q)
|
query <- Kleisli.liftF(q)
|
||||||
mails <- a.search(folder, cfg.mailChunkSize)(query)
|
mails <- a.search(folder, cfg.mailChunkSize)(query)
|
||||||
|
_ <- Kleisli.liftF(
|
||||||
|
ctx.logger.debug(
|
||||||
|
s"Found ${mails.count} mails in folder. Reading first ${mails.mails.size}"
|
||||||
|
)
|
||||||
|
)
|
||||||
} yield mails
|
} yield mails
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -166,12 +189,13 @@ object ScanMailboxTask {
|
|||||||
mails = headers.filterNot(mh => mh.messageId.forall(existing.contains))
|
mails = headers.filterNot(mh => mh.messageId.forall(existing.contains))
|
||||||
_ <- headers.size - mails.size match {
|
_ <- headers.size - mails.size match {
|
||||||
case 0 => ().pure[F]
|
case 0 => ().pure[F]
|
||||||
case n => ctx.logger.info(s"Excluded $n mails since items for them already exist.")
|
case n =>
|
||||||
|
ctx.logger.info(s"Excluded $n mails since items for them already exist.")
|
||||||
}
|
}
|
||||||
} yield mails
|
} yield mails
|
||||||
|
|
||||||
case None =>
|
case None =>
|
||||||
ctx.logger.info("No mails found") *> headers.pure[F]
|
headers.pure[F]
|
||||||
}
|
}
|
||||||
|
|
||||||
def getDirection(mh: MailHeader): F[Direction] = {
|
def getDirection(mh: MailHeader): F[Direction] = {
|
||||||
@ -260,20 +284,5 @@ object ScanMailboxTask {
|
|||||||
_ => postHandle(a)(mh)
|
_ => postHandle(a)(mh)
|
||||||
)
|
)
|
||||||
} yield ()
|
} yield ()
|
||||||
|
|
||||||
// limit number of folders
|
|
||||||
// limit number of mails to retrieve per folder
|
|
||||||
// per folder:
|
|
||||||
// fetch X mails
|
|
||||||
// check via msgId if already present; if not:
|
|
||||||
// load mail
|
|
||||||
// serialize to *bytes*
|
|
||||||
// store mail in queue
|
|
||||||
// move mail or delete or do nothing
|
|
||||||
// errors: log and keep going
|
|
||||||
// errors per folder fetch: fail the task
|
|
||||||
// notify joex after each batch
|
|
||||||
//
|
|
||||||
// no message id? make hash over complete mail or just import it
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user