From f2d67dc816ad2109b9912e9712f07334901fa06d Mon Sep 17 00:00:00 2001 From: Eike Kettner Date: Tue, 19 May 2020 07:39:02 +0200 Subject: [PATCH] Initial impl of import from mailbox user task --- .../main/scala/docspell/common/MimeType.scala | 1 + .../joex/src/main/resources/reference.conf | 22 ++ .../src/main/scala/docspell/joex/Config.scala | 4 + .../scala/docspell/joex/JoexAppImpl.scala | 2 +- .../joex/scanmailbox/ScanMailboxTask.scala | 224 +++++++++++++++++- .../scala/docspell/store/impl/Column.scala | 4 + .../store/queries/QOrganization.scala | 25 ++ .../store/records/RAttachmentArchive.scala | 21 ++ 8 files changed, 292 insertions(+), 11 deletions(-) diff --git a/modules/common/src/main/scala/docspell/common/MimeType.scala b/modules/common/src/main/scala/docspell/common/MimeType.scala index c312d370..f5230196 100644 --- a/modules/common/src/main/scala/docspell/common/MimeType.scala +++ b/modules/common/src/main/scala/docspell/common/MimeType.scala @@ -96,6 +96,7 @@ object MimeType { val tiff = image("tiff") val html = text("html") val plain = text("plain") + val eml = MimeType("message", "rfc822", Map.empty) object PdfMatch { def unapply(mt: MimeType): Option[MimeType] = diff --git a/modules/joex/src/main/resources/reference.conf b/modules/joex/src/main/resources/reference.conf index fc9f7892..e4b1a38c 100644 --- a/modules/joex/src/main/resources/reference.conf +++ b/modules/joex/src/main/resources/reference.conf @@ -90,6 +90,28 @@ docspell.joex { wakeup-period = "10 minutes" } + # Configuration for the user-tasks. + user-tasks { + # Allows importing e-mails by scanning a mailbox. + scan-mailbox { + # A limit of how many folders to scan through. If a user + # configures more than this, only up to this many folders are + # scanned and a warning is logged. + max-folders = 50 + + # How many mails (headers only) to retrieve in one chunk. + mail-chunk-size = 100 + + # A limit on how many mails to process in one job run. 
This is + # only to avoid resource allocation to one user/collective. + # + # If more than this number of mails is encountered, a warning is + # logged. + max-mails = 1000 + } + } + + # Docspell uses periodic house keeping tasks, like cleaning expired # invites, that can be configured here. house-keeping { diff --git a/modules/joex/src/main/scala/docspell/joex/Config.scala b/modules/joex/src/main/scala/docspell/joex/Config.scala index 58a9cb31..fb6df973 100644 --- a/modules/joex/src/main/scala/docspell/joex/Config.scala +++ b/modules/joex/src/main/scala/docspell/joex/Config.scala @@ -16,6 +16,7 @@ case class Config( jdbc: JdbcConfig, scheduler: SchedulerConfig, periodicScheduler: PeriodicSchedulerConfig, + userTasks: Config.UserTasks, houseKeeping: HouseKeepingConfig, extraction: ExtractConfig, textAnalysis: TextAnalysisConfig, @@ -26,4 +27,7 @@ case class Config( object Config { case class Bind(address: String, port: Int) + + case class ScanMailbox(maxFolders: Int, mailChunkSize: Int, maxMails: Int) + case class UserTasks(scanMailbox: ScanMailbox) } diff --git a/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala b/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala index 354c81cd..ea11a908 100644 --- a/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala +++ b/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala @@ -90,7 +90,7 @@ object JoexAppImpl { .withTask( JobTask.json( ScanMailboxArgs.taskName, - ScanMailboxTask[F](javaEmil, upload), + ScanMailboxTask[F](cfg.userTasks.scanMailbox, javaEmil, upload, joex), ScanMailboxTask.onCancel[F] ) ) diff --git a/modules/joex/src/main/scala/docspell/joex/scanmailbox/ScanMailboxTask.scala b/modules/joex/src/main/scala/docspell/joex/scanmailbox/ScanMailboxTask.scala index c016515b..e0956ef8 100644 --- a/modules/joex/src/main/scala/docspell/joex/scanmailbox/ScanMailboxTask.scala +++ b/modules/joex/src/main/scala/docspell/joex/scanmailbox/ScanMailboxTask.scala @@ -1,20 +1,32 @@ package 
docspell.joex.scanmailbox +import fs2._ import cats.implicits._ import cats.effect._ -import emil._ -//import emil.javamail.syntax._ +import emil.{MimeType => _, _} +import emil.javamail.syntax._ +import emil.SearchQuery.{All, ReceivedDate} import docspell.common._ -import docspell.backend.ops.OUpload +import docspell.backend.ops.{OJoex, OUpload} import docspell.store.records._ +import docspell.joex.Config import docspell.joex.scheduler.{Context, Task} +import docspell.store.queries.QOrganization +import cats.data.Kleisli +import cats.data.NonEmptyList +import cats.data.OptionT object ScanMailboxTask { val maxItems: Long = 7 type Args = ScanMailboxArgs - def apply[F[_]: Sync](emil: Emil[F], upload: OUpload[F]): Task[F, Args, Unit] = + def apply[F[_]: Sync]( + cfg: Config.ScanMailbox, + emil: Emil[F], + upload: OUpload[F], + joex: OJoex[F] + ): Task[F, Args, Unit] = Task { ctx => for { _ <- ctx.logger.info( @@ -27,7 +39,7 @@ object ScanMailboxTask { _ <- ctx.logger.info( s"Reading mails for user ${userId.id} from ${imapConn.id}/${folders}" ) - _ <- importMails(mailCfg, emil, upload, ctx) + _ <- importMails(cfg, mailCfg, emil, upload, joex, ctx) } yield () } @@ -48,14 +60,206 @@ object ScanMailboxTask { } def importMails[F[_]: Sync]( - cfg: RUserImap, - emil: Emil[F], + cfg: Config.ScanMailbox, + mailCfg: RUserImap, + theEmil: Emil[F], upload: OUpload[F], + joex: OJoex[F], ctx: Context[F, Args] - ): F[Unit] = - Sync[F].delay(println(s"$emil $ctx $cfg $upload")) + ): F[Unit] = { + val mailer = theEmil(mailCfg.toMailConfig) + val impl = new Impl[F](cfg, ctx) + val inFolders = ctx.args.folders.take(cfg.maxFolders) - object Impl { + val getInitialInput = + for { + _ <- + if (inFolders.size != ctx.args.folders.size) + ctx.logger.warn( + s"More than ${cfg.maxFolders} submitted. Only first ${cfg.maxFolders} will be scanned." 
+ ) + else ().pure[F] + } yield inFolders + + def processFolders(in: Seq[String]): Stream[F, ScanResult] = { + val pass = + for { + name <- Stream.emits(in).covary[F] + res <- + Stream.eval(mailer.run(impl.handleFolder(theEmil.access, upload, joex)(name))) + } yield res + + pass + .fold1(_ ++ _) + .flatMap { sr => + if (sr.folders.isEmpty) Stream.emit(sr) + else processFolders(sr.folders) + } + .takeWhile(_.processed < cfg.maxMails) + } + + Stream.eval(getInitialInput).flatMap(processFolders).compile.drain + } + + case class ScanResult(folders: Seq[String], processed: Int, left: Int) { + + /** Removes folders where nothing is left to process */ + def ++(sr: ScanResult): ScanResult = + if (left == 0) ScanResult(sr.folders, processed + sr.processed, sr.left) + else if (sr.left == 0) ScanResult(folders, processed + sr.processed, left) + else ScanResult(folders ++ sr.folders, processed + sr.processed, left + sr.left) + } + object ScanResult { + def apply(folder: String, processed: Int, left: Int): ScanResult = + if (left <= 0) ScanResult(Seq.empty, processed, 0) + else ScanResult(Seq(folder), processed, left) + } + + final private class Impl[F[_]: Sync](cfg: Config.ScanMailbox, ctx: Context[F, Args]) { + + def handleFolder[C](a: Access[F, C], upload: OUpload[F], joex: OJoex[F])( + name: String + ): MailOp[F, C, ScanResult] = + for { + _ <- Kleisli.liftF(ctx.logger.info(s"Processing folder $name")) + folder <- requireFolder(a)(name) + search <- searchMails(a)(folder) + headers <- Kleisli.liftF(filterMessageIds(search.mails)) + _ <- headers.traverse(handleOne(a, upload)) + _ <- Kleisli.liftF(joex.notifyAllNodes) + } yield ScanResult(name, headers.size, search.count - search.mails.size) + + def requireFolder[C](a: Access[F, C])(name: String): MailOp[F, C, MailFolder] = + if ("INBOX".equalsIgnoreCase(name)) a.getInbox + else //TODO resolve sub-folders + a.findFolder(None, name) + .map(_.toRight(new Exception(s"Folder '$name' not found"))) + .mapF(_.rethrow) + + def 
searchMails[C]( + a: Access[F, C] + )(folder: MailFolder): MailOp[F, C, SearchResult[MailHeader]] = { + val q = ctx.args.receivedSince match { + case Some(d) => + Timestamp.current[F].map(now => ReceivedDate >= now.minus(d).value) + case None => All.pure[F] + } + + for { + _ <- Kleisli.liftF( + ctx.logger.info(s"Searching next ${cfg.mailChunkSize} mails in ${folder.name}.") + ) + query <- Kleisli.liftF(q) + mails <- a.search(folder, cfg.mailChunkSize)(query) + } yield mails + } + + def filterMessageIds(headers: Vector[MailHeader]): F[Vector[MailHeader]] = + NonEmptyList.fromFoldable(headers.flatMap(_.messageId)) match { + case Some(nl) => + for { + archives <- ctx.store.transact( + RAttachmentArchive + .findByMessageIdAndCollective(nl, ctx.args.account.collective) + ) + existing = archives.flatMap(_.messageId).toSet + mails = headers.filterNot(mh => mh.messageId.forall(existing.contains)) + _ <- headers.size - mails.size match { + case 0 => ().pure[F] + case n => ctx.logger.info(s"Excluded $n mails since items for them already exist.") + } + } yield mails + + case None => + ctx.logger.info("No mails found") *> headers.pure[F] + } + + def getDirection(mh: MailHeader): F[Direction] = { + val out: OptionT[F, Direction] = + for { + from <- OptionT.fromOption[F](mh.from) + _ <- OptionT( + ctx.store.transact( + QOrganization + .findPersonByContact( + ctx.args.account.collective, + from.address, + Some(ContactKind.Email), + Some(true) + ) + .take(1) + .compile + .last + ) + ) + } yield Direction.Outgoing + + OptionT + .fromOption[F](ctx.args.direction) + .orElse(out) + .getOrElse(Direction.Incoming) + } + + def postHandle[C](a: Access[F, C])(mh: MailHeader): MailOp[F, C, Unit] = + ctx.args.targetFolder match { + case Some(tf) => + a.getOrCreateFolder(None, tf).flatMap(folder => a.moveMail(mh, folder)) + case None if ctx.args.deleteMail => + a.deleteMail(mh).flatMapF { r => + if (r.count == 0) + ctx.logger.warn(s"Mail '${mh.subject}' could not be deleted") + else 
().pure[F] + } + case None => + MailOp.pure(()) + } + + def submitMail(upload: OUpload[F])(mail: Mail[F]): F[OUpload.UploadResult] = { + val file = OUpload.File( + Some(mail.header.subject + ".eml"), + Some(MimeType.eml), + mail.toByteStream + ) + for { + _ <- ctx.logger.debug(s"Submitting mail '${mail.header.subject}'") + dir <- getDirection(mail.header) + meta = OUpload.UploadMeta( + Some(dir), + s"mailbox-${ctx.args.account.user.id}", + Seq.empty + ) + data = OUpload.UploadData( + multiple = false, + meta = meta, + files = Vector(file), + priority = Priority.Low, + tracker = None + ) + res <- upload.submit(data, ctx.args.account, false) + } yield res + } + + def handleOne[C](a: Access[F, C], upload: OUpload[F])( + mh: MailHeader + ): MailOp[F, C, Unit] = + for { + mail <- a.loadMail(mh) + res <- mail match { + case Some(m) => + Kleisli.liftF(submitMail(upload)(m).attempt) + case None => + MailOp.pure[F, C, Either[Throwable, OUpload.UploadResult]]( + Either.left(new Exception(s"Mail not found")) + ) + } + _ <- res.fold( + ex => + Kleisli.liftF( + ctx.logger.warn(s"Error submitting '${mh.subject}': ${ex.getMessage}") + ), + _ => postHandle(a)(mh) + ) + } yield () // limit number of folders // limit number of mails to retrieve per folder diff --git a/modules/store/src/main/scala/docspell/store/impl/Column.scala b/modules/store/src/main/scala/docspell/store/impl/Column.scala index c2f19abd..d84ed3cf 100644 --- a/modules/store/src/main/scala/docspell/store/impl/Column.scala +++ b/modules/store/src/main/scala/docspell/store/impl/Column.scala @@ -2,6 +2,7 @@ package docspell.store.impl import doobie._, doobie.implicits._ import docspell.store.impl.DoobieSyntax._ +import cats.data.NonEmptyList case class Column(name: String, ns: String = "", alias: String = "") { @@ -46,6 +47,9 @@ case class Column(name: String, ns: String = "", alias: String = "") { def isIn(values: Seq[Fragment]): Fragment = f ++ fr"IN (" ++ commas(values) ++ fr")" + def isIn[A: Put](values: 
NonEmptyList[A]): Fragment = + isIn(values.map(a => sql"$a").toList) + def isIn(frag: Fragment): Fragment = f ++ fr"IN (" ++ frag ++ fr")" diff --git a/modules/store/src/main/scala/docspell/store/queries/QOrganization.scala b/modules/store/src/main/scala/docspell/store/queries/QOrganization.scala index e9e00631..5fd63b15 100644 --- a/modules/store/src/main/scala/docspell/store/queries/QOrganization.scala +++ b/modules/store/src/main/scala/docspell/store/queries/QOrganization.scala @@ -86,6 +86,31 @@ object QOrganization { }) } + def findPersonByContact( + coll: Ident, + value: String, + ck: Option[ContactKind], + concerning: Option[Boolean] + ): Stream[ConnectionIO, RPerson] = { + val pColl = PC.cid.prefix("p") + val pConc = PC.concerning.prefix("p") + val pId = PC.pid.prefix("p") + val cPers = RContact.Columns.personId.prefix("c") + val cVal = RContact.Columns.value.prefix("c") + val cKind = RContact.Columns.kind.prefix("c") + + val from = RPerson.table ++ fr"p INNER JOIN" ++ + RContact.table ++ fr"c ON" ++ cPers.is(pId) + val q = Seq( + cVal.lowerLike(s"%${value.toLowerCase}%"), + pColl.is(coll) + ) ++ concerning.map(pConc.is(_)).toSeq ++ ck.map(cKind.is(_)).toSeq + + selectDistinct(PC.all.map(_.prefix("p")), from, and(q)) + .query[RPerson] + .stream + } + def addOrg[F[_]]( org: ROrganization, contacts: Seq[RContact], diff --git a/modules/store/src/main/scala/docspell/store/records/RAttachmentArchive.scala b/modules/store/src/main/scala/docspell/store/records/RAttachmentArchive.scala index 04dd38b0..09ccbab7 100644 --- a/modules/store/src/main/scala/docspell/store/records/RAttachmentArchive.scala +++ b/modules/store/src/main/scala/docspell/store/records/RAttachmentArchive.scala @@ -6,6 +6,7 @@ import doobie.implicits._ import docspell.common._ import docspell.store.impl._ import docspell.store.impl.Implicits._ +import cats.data.NonEmptyList /** The archive file of some attachment. The `id` is shared with the * attachment, to create a 0..1-1 relationship. 
@@ -72,6 +73,26 @@ object RAttachmentArchive { selectSimple(all.map(_.prefix("a")), from, where).query[RAttachmentArchive].option } + def findByMessageIdAndCollective( + messageIds: NonEmptyList[String], + collective: Ident + ): ConnectionIO[Vector[RAttachmentArchive]] = { + val bId = RAttachment.Columns.id.prefix("b") + val bItem = RAttachment.Columns.itemId.prefix("b") + val aMsgId = Columns.messageId.prefix("a") + val aId = Columns.id.prefix("a") + val iId = RItem.Columns.id.prefix("i") + val iColl = RItem.Columns.cid.prefix("i") + + val from = table ++ fr"a INNER JOIN" ++ + RAttachment.table ++ fr"b ON" ++ aId.is(bId) ++ + fr"INNER JOIN" ++ RItem.table ++ fr"i ON" ++ bItem.is(iId) + + val where = and(aMsgId.isIn(messageIds), iColl.is(collective)) + + selectSimple(all.map(_.prefix("a")), from, where).query[RAttachmentArchive].to[Vector] + } + def findByItemWithMeta( id: Ident ): ConnectionIO[Vector[(RAttachmentArchive, FileMeta)]] = {