Initial impl of import from mailbox user task

This commit is contained in:
Eike Kettner 2020-05-19 07:39:02 +02:00
parent 451a09dda0
commit f2d67dc816
8 changed files with 292 additions and 11 deletions

View File

@ -96,6 +96,7 @@ object MimeType {
val tiff = image("tiff") val tiff = image("tiff")
val html = text("html") val html = text("html")
val plain = text("plain") val plain = text("plain")
// Mime type `message/rfc822`; used for e-mails stored as `.eml` files.
val eml = MimeType("message", "rfc822", Map.empty)
object PdfMatch { object PdfMatch {
def unapply(mt: MimeType): Option[MimeType] = def unapply(mt: MimeType): Option[MimeType] =

View File

@ -90,6 +90,28 @@ docspell.joex {
wakeup-period = "10 minutes" wakeup-period = "10 minutes"
} }
# Configuration for the user-tasks.
user-tasks {
# Allows to import e-mails by scanning a mailbox.
scan-mailbox {
# A limit on how many folders to scan through. If a user
# configures more than this, only the first folders up to this
# limit are scanned and a warning is logged.
max-folders = 50
# How many mails (headers only) to retrieve in one chunk.
mail-chunk-size = 100
# A limit on how many mails to process in one job run. This only
# serves to avoid one user/collective occupying too many resources.
#
# If more than this number of mails is encountered, a warning is
# logged.
max-mails = 1000
}
}
# Docspell uses periodic house keeping tasks, like cleaning expired # Docspell uses periodic house keeping tasks, like cleaning expired
# invites, that can be configured here. # invites, that can be configured here.
house-keeping { house-keeping {

View File

@ -16,6 +16,7 @@ case class Config(
jdbc: JdbcConfig, jdbc: JdbcConfig,
scheduler: SchedulerConfig, scheduler: SchedulerConfig,
periodicScheduler: PeriodicSchedulerConfig, periodicScheduler: PeriodicSchedulerConfig,
userTasks: Config.UserTasks,
houseKeeping: HouseKeepingConfig, houseKeeping: HouseKeepingConfig,
extraction: ExtractConfig, extraction: ExtractConfig,
textAnalysis: TextAnalysisConfig, textAnalysis: TextAnalysisConfig,
@ -26,4 +27,7 @@ case class Config(
object Config { object Config {
case class Bind(address: String, port: Int) case class Bind(address: String, port: Int)
/** Limits applied to the scan-mailbox user task.
  *
  * @param maxFolders    maximum number of folders to scan through
  * @param mailChunkSize number of mails (headers only) to retrieve in one chunk
  * @param maxMails      maximum number of mails to process in one job run
  */
case class ScanMailbox(
    maxFolders: Int,
    mailChunkSize: Int,
    maxMails: Int
)

/** Configuration for the user tasks.
  *
  * @param scanMailbox settings for importing e-mails by scanning a mailbox
  */
case class UserTasks(
    scanMailbox: ScanMailbox
)
} }

View File

@ -90,7 +90,7 @@ object JoexAppImpl {
.withTask( .withTask(
JobTask.json( JobTask.json(
ScanMailboxArgs.taskName, ScanMailboxArgs.taskName,
ScanMailboxTask[F](javaEmil, upload), ScanMailboxTask[F](cfg.userTasks.scanMailbox, javaEmil, upload, joex),
ScanMailboxTask.onCancel[F] ScanMailboxTask.onCancel[F]
) )
) )

View File

@ -1,20 +1,32 @@
package docspell.joex.scanmailbox package docspell.joex.scanmailbox
import fs2._
import cats.implicits._ import cats.implicits._
import cats.effect._ import cats.effect._
import emil._ import emil.{MimeType => _, _}
//import emil.javamail.syntax._ import emil.javamail.syntax._
import emil.SearchQuery.{All, ReceivedDate}
import docspell.common._ import docspell.common._
import docspell.backend.ops.OUpload import docspell.backend.ops.{OJoex, OUpload}
import docspell.store.records._ import docspell.store.records._
import docspell.joex.Config
import docspell.joex.scheduler.{Context, Task} import docspell.joex.scheduler.{Context, Task}
import docspell.store.queries.QOrganization
import cats.data.Kleisli
import cats.data.NonEmptyList
import cats.data.OptionT
object ScanMailboxTask { object ScanMailboxTask {
val maxItems: Long = 7 val maxItems: Long = 7
type Args = ScanMailboxArgs type Args = ScanMailboxArgs
def apply[F[_]: Sync](emil: Emil[F], upload: OUpload[F]): Task[F, Args, Unit] = def apply[F[_]: Sync](
cfg: Config.ScanMailbox,
emil: Emil[F],
upload: OUpload[F],
joex: OJoex[F]
): Task[F, Args, Unit] =
Task { ctx => Task { ctx =>
for { for {
_ <- ctx.logger.info( _ <- ctx.logger.info(
@ -27,7 +39,7 @@ object ScanMailboxTask {
_ <- ctx.logger.info( _ <- ctx.logger.info(
s"Reading mails for user ${userId.id} from ${imapConn.id}/${folders}" s"Reading mails for user ${userId.id} from ${imapConn.id}/${folders}"
) )
_ <- importMails(mailCfg, emil, upload, ctx) _ <- importMails(cfg, mailCfg, emil, upload, joex, ctx)
} yield () } yield ()
} }
@ -48,14 +60,206 @@ object ScanMailboxTask {
} }
def importMails[F[_]: Sync]( def importMails[F[_]: Sync](
cfg: RUserImap, cfg: Config.ScanMailbox,
emil: Emil[F], mailCfg: RUserImap,
theEmil: Emil[F],
upload: OUpload[F], upload: OUpload[F],
joex: OJoex[F],
ctx: Context[F, Args] ctx: Context[F, Args]
): F[Unit] = ): F[Unit] = {
Sync[F].delay(println(s"$emil $ctx $cfg $upload")) val mailer = theEmil(mailCfg.toMailConfig)
val impl = new Impl[F](cfg, ctx)
val inFolders = ctx.args.folders.take(cfg.maxFolders)
object Impl { val getInitialInput =
for {
_ <-
if (inFolders.size != ctx.args.folders.size)
ctx.logger.warn(
s"More than ${cfg.maxFolders} submitted. Only first ${cfg.maxFolders} will be scanned."
)
else ().pure[F]
} yield inFolders
/** Scans the given folders and re-scans those that still report mails
  * left, one pass after another.
  *
  * Another pass is started only while all of the following hold:
  *  - at least one folder has mails left,
  *  - the number of mails processed over all passes is below
  *    `cfg.maxMails` (checked after each pass, so the last pass may
  *    exceed the limit),
  *  - the previous pass processed at least one mail; a pass that
  *    processed nothing would only repeat the same searches.
  *
  * The single emitted `ScanResult` carries the total number of
  * processed mails and the folders that still have mails left.
  */
def processFolders(in: Seq[String]): Stream[F, ScanResult] = {
  // One pass: handle every folder once, one after another.
  def scanOnce(names: Seq[String]): Stream[F, ScanResult] =
    for {
      name <- Stream.emits(names).covary[F]
      res <-
        Stream.eval(mailer.run(impl.handleFolder(theEmil.access, upload, joex)(name)))
    } yield res

  // `processedBefore` is the number of mails handled by earlier passes;
  // it must be threaded through, as each pass starts counting at zero.
  def go(names: Seq[String], processedBefore: Int): Stream[F, ScanResult] =
    scanOnce(names)
      .fold1(_ ++ _)
      .flatMap { sr =>
        val total = sr.copy(processed = processedBefore + sr.processed)
        if (total.folders.isEmpty) Stream.emit(total)
        else if (total.processed >= cfg.maxMails) {
          val remaining = total.folders.mkString(", ")
          Stream
            .eval(
              ctx.logger.warn(
                s"Reached the limit of ${cfg.maxMails} mails per run. There are mails left in: $remaining"
              )
            )
            .map(_ => total)
        } else if (sr.processed == 0) Stream.emit(total)
        else go(total.folders, total.processed)
      }

  go(in, 0)
}
Stream.eval(getInitialInput).flatMap(processFolders).compile.drain
}
/** Outcome of scanning one or more folders.
  *
  * @param folders   names of the folders that still have mails left
  * @param processed number of mails that were processed
  * @param left      number of mails left to process
  */
case class ScanResult(folders: Seq[String], processed: Int, left: Int) {

  /** Combines two results: the processed counts are always summed,
    * while folders (and their `left` count) are only carried over from
    * the side(s) where something is left to process.
    */
  def ++(sr: ScanResult): ScanResult = {
    val processedTotal = processed + sr.processed
    (left, sr.left) match {
      case (0, _) => ScanResult(sr.folders, processedTotal, sr.left)
      case (_, 0) => ScanResult(folders, processedTotal, left)
      case _      => ScanResult(folders ++ sr.folders, processedTotal, left + sr.left)
    }
  }
}

object ScanResult {

  /** Creates the result for a single folder. The folder is only
    * recorded if mails are left in it; a non-positive `left` is
    * normalised to `0`.
    */
  def apply(folder: String, processed: Int, left: Int): ScanResult = {
    val nothingLeft = left <= 0
    val remainingFolders: Seq[String] = if (nothingLeft) Seq.empty else Seq(folder)
    new ScanResult(remainingFolders, processed, if (nothingLeft) 0 else left)
  }
}
final private class Impl[F[_]: Sync](cfg: Config.ScanMailbox, ctx: Context[F, Args]) {
/** Processes one chunk of mails of the folder `name`: looks up the
  * folder, searches it, drops mails filtered out by
  * `filterMessageIds`, hands every remaining mail to `handleOne` and
  * finally calls `joex.notifyAllNodes`.
  *
  * The result holds the number of mails handed to `handleOne` and the
  * difference between the search's total count and the number of
  * mails it returned.
  */
def handleFolder[C](a: Access[F, C], upload: OUpload[F], joex: OJoex[F])(
    name: String
): MailOp[F, C, ScanResult] =
  for {
    _          <- Kleisli.liftF(ctx.logger.info(s"Processing folder $name"))
    mailFolder <- requireFolder(a)(name)
    found      <- searchMails(a)(mailFolder)
    fresh      <- Kleisli.liftF(filterMessageIds(found.mails))
    _          <- fresh.traverse(handleOne(a, upload))
    _          <- Kleisli.liftF(joex.notifyAllNodes)
    remaining = found.count - found.mails.size
  } yield ScanResult(name, fresh.size, remaining)
/** Resolves a folder by name. The name `INBOX` (compared ignoring
  * case) yields the inbox; any other name is looked up without a
  * parent folder and the operation fails with an exception if no such
  * folder is found.
  */
def requireFolder[C](a: Access[F, C])(name: String): MailOp[F, C, MailFolder] =
  name match {
    case inbox if "INBOX".equalsIgnoreCase(inbox) =>
      a.getInbox
    case other => //TODO resolve sub-folders
      a.findFolder(None, other)
        .map(found => found.toRight(new Exception(s"Folder '$other' not found")))
        .mapF(_.rethrow)
  }
/** Searches `folder` for at most `cfg.mailChunkSize` mails.
  *
  * If the task arguments define `receivedSince`, only mails whose
  * received date is not older than that duration (relative to the
  * current time) are searched; otherwise all mails match.
  */
def searchMails[C](
    a: Access[F, C]
)(folder: MailFolder): MailOp[F, C, SearchResult[MailHeader]] = {
  val chunkSize = cfg.mailChunkSize

  // The query depends on the current time, hence it is an effect.
  val buildQuery = ctx.args.receivedSince match {
    case Some(maxAge) =>
      Timestamp.current[F].map(now => ReceivedDate >= now.minus(maxAge).value)
    case None => All.pure[F]
  }

  for {
    _ <- Kleisli.liftF(
      ctx.logger.info(s"Searching next ${chunkSize} mails in ${folder.name}.")
    )
    searchQuery <- Kleisli.liftF(buildQuery)
    found       <- a.search(folder, chunkSize)(searchQuery)
  } yield found
}
/** Removes those mails from `headers` for which an attachment archive
  * with the same message id already exists in the collective.
  *
  * Mails without a message id cannot be matched against existing
  * archives and are therefore always kept. If no mail carries a
  * message id at all, the input is returned unchanged without
  * querying the database.
  */
def filterMessageIds(headers: Vector[MailHeader]): F[Vector[MailHeader]] =
  NonEmptyList.fromFoldable(headers.flatMap(_.messageId)) match {
    case Some(nl) =>
      for {
        archives <- ctx.store.transact(
          RAttachmentArchive
            .findByMessageIdAndCollective(nl, ctx.args.account.collective)
        )
        existing = archives.flatMap(_.messageId).toSet
        // `exists`, not `forall`: `forall` is true for a missing message
        // id and would drop such mails as "already existing".
        mails = headers.filterNot(mh => mh.messageId.exists(existing.contains))
        _ <- headers.size - mails.size match {
          case 0 => ().pure[F]
          case n => ctx.logger.info(s"Excluded $n mails since items for them already exist.")
        }
      } yield mails
    case None =>
      // Either there are no mails at all or none of them has a message id.
      val msg =
        if (headers.isEmpty) "No mails found"
        else "No mail has a message id. Cannot check for existing items."
      ctx.logger.info(msg) *> headers.pure[F]
  }
/** Determines the direction to use for a mail.
  *
  * A direction given in the task arguments wins. Otherwise the mail
  * is `Outgoing` if a person of the collective is found via
  * `QOrganization.findPersonByContact` for the sender address (e-mail
  * contacts, `concerning = true`). In all other cases it is
  * `Incoming`.
  */
def getDirection(mh: MailHeader): F[Direction] = {
  // Looks up at most one matching person for the given sender address.
  def knownSender(address: String): F[Option[RPerson]] =
    ctx.store.transact(
      QOrganization
        .findPersonByContact(
          ctx.args.account.collective,
          address,
          Some(ContactKind.Email),
          Some(true)
        )
        .take(1)
        .compile
        .last
    )

  val bySender: OptionT[F, Direction] =
    OptionT
      .fromOption[F](mh.from)
      .flatMapF(sender => knownSender(sender.address))
      .map(_ => Direction.Outgoing)

  val configured = OptionT.fromOption[F](ctx.args.direction)

  configured
    .orElse(bySender)
    .getOrElse(Direction.Incoming)
}
/** Applies the configured follow-up action to a mail: moves it to the
  * target folder (created if necessary) when one is configured, else
  * deletes it when `deleteMail` is set, else does nothing. A delete
  * that affected no mail is logged as a warning.
  */
def postHandle[C](a: Access[F, C])(mh: MailHeader): MailOp[F, C, Unit] =
  (ctx.args.targetFolder, ctx.args.deleteMail) match {
    case (Some(target), _) =>
      a.getOrCreateFolder(None, target).flatMap(folder => a.moveMail(mh, folder))

    case (None, true) =>
      a.deleteMail(mh).flatMapF { result =>
        if (result.count == 0)
          ctx.logger.warn(s"Mail '${mh.subject}' could not be deleted")
        else ().pure[F]
      }

    case (None, false) =>
      MailOp.pure(())
  }
/** Submits the given mail as a single `.eml` file upload for the
  * account of the task arguments. The file is named after the mail's
  * subject, the direction comes from `getDirection` and the upload is
  * submitted with low priority.
  */
def submitMail(upload: OUpload[F])(mail: Mail[F]): F[OUpload.UploadResult] = {
  val subject = mail.header.subject

  val emlFile = OUpload.File(
    Some(subject + ".eml"),
    Some(MimeType.eml),
    mail.toByteStream
  )

  // Builds the upload request once the direction is known.
  def uploadData(dir: Direction) =
    OUpload.UploadData(
      multiple = false,
      meta = OUpload.UploadMeta(
        Some(dir),
        s"mailbox-${ctx.args.account.user.id}",
        Seq.empty
      ),
      files = Vector(emlFile),
      priority = Priority.Low,
      tracker = None
    )

  for {
    _      <- ctx.logger.debug(s"Submitting mail '$subject'")
    dir    <- getDirection(mail.header)
    result <- upload.submit(uploadData(dir), ctx.args.account, false)
  } yield result
}
/** Loads the mail for the given header and submits it. On success the
  * follow-up action of `postHandle` is applied; a mail that cannot be
  * loaded or a failing submission is only logged as a warning, so the
  * operation itself does not fail because of it.
  */
def handleOne[C](a: Access[F, C], upload: OUpload[F])(
    mh: MailHeader
): MailOp[F, C, Unit] = {
  // Failures are reported, not raised.
  val reportFailure: Throwable => MailOp[F, C, Unit] =
    ex =>
      Kleisli.liftF(
        ctx.logger.warn(s"Error submitting '${mh.subject}': ${ex.getMessage}")
      )

  for {
    loaded <- a.loadMail(mh)
    outcome <- loaded match {
      case Some(m) =>
        Kleisli.liftF(submitMail(upload)(m).attempt)
      case None =>
        MailOp.pure[F, C, Either[Throwable, OUpload.UploadResult]](
          Either.left(new Exception("Mail not found"))
        )
    }
    _ <- outcome.fold(reportFailure, _ => postHandle(a)(mh))
  } yield ()
}
// limit number of folders // limit number of folders
// limit number of mails to retrieve per folder // limit number of mails to retrieve per folder

View File

@ -2,6 +2,7 @@ package docspell.store.impl
import doobie._, doobie.implicits._ import doobie._, doobie.implicits._
import docspell.store.impl.DoobieSyntax._ import docspell.store.impl.DoobieSyntax._
import cats.data.NonEmptyList
case class Column(name: String, ns: String = "", alias: String = "") { case class Column(name: String, ns: String = "", alias: String = "") {
@ -46,6 +47,9 @@ case class Column(name: String, ns: String = "", alias: String = "") {
def isIn(values: Seq[Fragment]): Fragment = def isIn(values: Seq[Fragment]): Fragment =
f ++ fr"IN (" ++ commas(values) ++ fr")" f ++ fr"IN (" ++ commas(values) ++ fr")"
/** Creates an `IN (...)` condition on this column from a non-empty
  * list of values; each value is interpolated into its own fragment.
  */
def isIn[A: Put](values: NonEmptyList[A]): Fragment = {
  val fragments = values.toList.map(value => sql"$value")
  isIn(fragments)
}
def isIn(frag: Fragment): Fragment = def isIn(frag: Fragment): Fragment =
f ++ fr"IN (" ++ frag ++ fr")" f ++ fr"IN (" ++ frag ++ fr")"

View File

@ -86,6 +86,31 @@ object QOrganization {
}) })
} }
/** Streams the distinct persons of a collective that have a contact
  * whose value contains `value` (compared in lower case).
  *
  * @param coll       the collective the persons must belong to
  * @param value      the text to look for inside contact values
  * @param ck         if defined, only contacts of this kind are considered
  * @param concerning if defined, only persons with this `concerning` flag match
  */
def findPersonByContact(
    coll: Ident,
    value: String,
    ck: Option[ContactKind],
    concerning: Option[Boolean]
): Stream[ConnectionIO, RPerson] = {
  // person columns (alias "p")
  val personColl       = PC.cid.prefix("p")
  val personConcerning = PC.concerning.prefix("p")
  val personId         = PC.pid.prefix("p")
  // contact columns (alias "c")
  val contactPerson = RContact.Columns.personId.prefix("c")
  val contactValue  = RContact.Columns.value.prefix("c")
  val contactKind   = RContact.Columns.kind.prefix("c")

  val joined = RPerson.table ++ fr"p INNER JOIN" ++
    RContact.table ++ fr"c ON" ++ contactPerson.is(personId)

  val mandatory = Seq(
    contactValue.lowerLike(s"%${value.toLowerCase}%"),
    personColl.is(coll)
  )
  // the optional filters only take part when they are defined
  val optional =
    concerning.map(personConcerning.is(_)).toSeq ++ ck.map(contactKind.is(_)).toSeq

  selectDistinct(PC.all.map(_.prefix("p")), joined, and(mandatory ++ optional))
    .query[RPerson]
    .stream
}
def addOrg[F[_]]( def addOrg[F[_]](
org: ROrganization, org: ROrganization,
contacts: Seq[RContact], contacts: Seq[RContact],

View File

@ -6,6 +6,7 @@ import doobie.implicits._
import docspell.common._ import docspell.common._
import docspell.store.impl._ import docspell.store.impl._
import docspell.store.impl.Implicits._ import docspell.store.impl.Implicits._
import cats.data.NonEmptyList
/** The archive file of some attachment. The `id` is shared with the /** The archive file of some attachment. The `id` is shared with the
* attachment, to create a 0..1-1 relationship. * attachment, to create a 0..1-1 relationship.
@ -72,6 +73,26 @@ object RAttachmentArchive {
selectSimple(all.map(_.prefix("a")), from, where).query[RAttachmentArchive].option selectSimple(all.map(_.prefix("a")), from, where).query[RAttachmentArchive].option
} }
/** Finds all attachment archives whose message id is one of
  * `messageIds` and whose item belongs to the given collective. The
  * archive is joined with its attachment and the attachment's item to
  * get at the collective.
  */
def findByMessageIdAndCollective(
    messageIds: NonEmptyList[String],
    collective: Ident
): ConnectionIO[Vector[RAttachmentArchive]] = {
  // archive columns (alias "a")
  val archiveId    = Columns.id.prefix("a")
  val archiveMsgId = Columns.messageId.prefix("a")
  // attachment columns (alias "b")
  val attachId   = RAttachment.Columns.id.prefix("b")
  val attachItem = RAttachment.Columns.itemId.prefix("b")
  // item columns (alias "i")
  val itemId   = RItem.Columns.id.prefix("i")
  val itemColl = RItem.Columns.cid.prefix("i")

  val joined = table ++ fr"a INNER JOIN" ++
    RAttachment.table ++ fr"b ON" ++ archiveId.is(attachId) ++
    fr"INNER JOIN" ++ RItem.table ++ fr"i ON" ++ attachItem.is(itemId)

  val condition = and(archiveMsgId.isIn(messageIds), itemColl.is(collective))

  selectSimple(all.map(_.prefix("a")), joined, condition)
    .query[RAttachmentArchive]
    .to[Vector]
}
def findByItemWithMeta( def findByItemWithMeta(
id: Ident id: Ident
): ConnectionIO[Vector[(RAttachmentArchive, FileMeta)]] = { ): ConnectionIO[Vector[(RAttachmentArchive, FileMeta)]] = {