diff --git a/modules/backend/src/main/scala/docspell/backend/BackendApp.scala b/modules/backend/src/main/scala/docspell/backend/BackendApp.scala index 331d79d4..f86bf39e 100644 --- a/modules/backend/src/main/scala/docspell/backend/BackendApp.scala +++ b/modules/backend/src/main/scala/docspell/backend/BackendApp.scala @@ -48,7 +48,7 @@ object BackendApp { equipImpl <- OEquipment[F](store) orgImpl <- OOrganization(store) joexImpl <- OJoex.create(httpClientEc, store) - uploadImpl <- OUpload(store, queue, cfg, joexImpl) + uploadImpl <- OUpload(store, queue, cfg.files, joexImpl) nodeImpl <- ONode(store) jobImpl <- OJob(store, joexImpl) itemImpl <- OItem(store) diff --git a/modules/backend/src/main/scala/docspell/backend/ops/OUpload.scala b/modules/backend/src/main/scala/docspell/backend/ops/OUpload.scala index a99fd305..3130e635 100644 --- a/modules/backend/src/main/scala/docspell/backend/ops/OUpload.scala +++ b/modules/backend/src/main/scala/docspell/backend/ops/OUpload.scala @@ -52,7 +52,7 @@ object OUpload { def apply[F[_]: Sync]( store: Store[F], queue: JobQueue[F], - cfg: Config, + cfg: Config.Files, joex: OJoex[F] ): Resource[F, OUpload[F]] = Resource.pure[F, OUpload[F]](new OUpload[F] { @@ -105,7 +105,7 @@ object OUpload { private def saveFile(file: File[F]): F[Option[ProcessItemArgs.File]] = logger.finfo(s"Receiving file $file") *> store.bitpeace - .saveNew(file.data, cfg.files.chunkSize, MimetypeHint(file.name, None), None) + .saveNew(file.data, cfg.chunkSize, MimetypeHint(file.name, None), None) .compile .lastOrError .map(fm => Ident.unsafe(fm.id)) diff --git a/modules/joex/src/main/resources/reference.conf b/modules/joex/src/main/resources/reference.conf index ff90f24f..fc9f7892 100644 --- a/modules/joex/src/main/resources/reference.conf +++ b/modules/joex/src/main/resources/reference.conf @@ -308,4 +308,28 @@ docspell.joex { working-dir = ${java.io.tmpdir}"/docspell-convert" } } + + # The same section is also present in the rest-server config. It is + # used when submitting files into the job queue for processing. + # + # Currently, these settings may affect memory usage of all nodes, so + # it should be the same on all nodes. + files { + # Defines the chunk size (in bytes) used to store the files. + # This will affect the memory footprint when uploading and + # downloading files. At most this amount is loaded into RAM for + # down- and uploading. + # + # It also defines the chunk size used for the blobs inside the + # database. + chunk-size = 524288 + + # The file content types that are considered valid. Docspell + # will only pass these files to processing. The processing code + # itself has also checks for which files are supported and which + # not. This affects the uploading part and can be used to + # restrict file types that should be handed over to processing. + # By default all files are allowed. + valid-mime-types = [ ] + } } \ No newline at end of file diff --git a/modules/joex/src/main/scala/docspell/joex/Config.scala b/modules/joex/src/main/scala/docspell/joex/Config.scala index 98280392..58a9cb31 100644 --- a/modules/joex/src/main/scala/docspell/joex/Config.scala +++ b/modules/joex/src/main/scala/docspell/joex/Config.scala @@ -7,6 +7,7 @@ import docspell.store.JdbcConfig import docspell.convert.ConvertConfig import docspell.extract.ExtractConfig import docspell.joex.hk.HouseKeepingConfig +import docspell.backend.Config.Files case class Config( appId: Ident, @@ -19,7 +20,8 @@ case class Config( extraction: ExtractConfig, textAnalysis: TextAnalysisConfig, convert: ConvertConfig, - sendMail: MailSendConfig + sendMail: MailSendConfig, + files: Files ) object Config { diff --git a/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala b/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala index ac6ff7c7..354c81cd 100644 --- a/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala +++ b/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala @@ -4,6 +4,7 @@ import cats.implicits._ import cats.effect._ import emil.javamail._ import docspell.common._ +import docspell.backend.ops._ import docspell.joex.hk._ import docspell.joex.notify._ import docspell.joex.scanmailbox._ @@ -12,7 +13,6 @@ import docspell.joex.scheduler._ import docspell.joexapi.client.JoexClient import docspell.store.Store import docspell.store.queue._ -import docspell.backend.ops.ONode import docspell.store.records.RJobLog import fs2.concurrent.SignallingRef import scala.concurrent.ExecutionContext @@ -68,6 +68,8 @@ object JoexAppImpl { queue <- JobQueue(store) pstore <- PeriodicTaskStore.create(store) nodeOps <- ONode(store) + joex <- OJoex(client, store) + upload <- OUpload(store, queue, cfg.files, joex) javaEmil = JavaMailEmil(blocker) sch <- SchedulerBuilder(cfg.scheduler, blocker, store) .withQueue(queue) @@ -88,7 +90,7 @@ object JoexAppImpl { .withTask( JobTask.json( ScanMailboxArgs.taskName, - ScanMailboxTask[F](javaEmil), + ScanMailboxTask[F](javaEmil, upload), ScanMailboxTask.onCancel[F] ) ) diff --git a/modules/joex/src/main/scala/docspell/joex/scanmailbox/ScanMailboxTask.scala b/modules/joex/src/main/scala/docspell/joex/scanmailbox/ScanMailboxTask.scala index 45dd1cf2..c016515b 100644 --- a/modules/joex/src/main/scala/docspell/joex/scanmailbox/ScanMailboxTask.scala +++ b/modules/joex/src/main/scala/docspell/joex/scanmailbox/ScanMailboxTask.scala @@ -6,6 +6,7 @@ import emil._ //import emil.javamail.syntax._ import docspell.common._ +import docspell.backend.ops.OUpload import docspell.store.records._ import docspell.joex.scheduler.{Context, Task} @@ -13,7 +14,7 @@ object ScanMailboxTask { val maxItems: Long = 7 type Args = ScanMailboxArgs - def apply[F[_]: Sync](emil: Emil[F]): Task[F, Args, Unit] = + def apply[F[_]: Sync](emil: Emil[F], upload: OUpload[F]): Task[F, Args, Unit] = Task { ctx => for { _ <- ctx.logger.info( @@ -26,7 +27,7 @@ object ScanMailboxTask { _ <- ctx.logger.info( s"Reading mails for user ${userId.id} from ${imapConn.id}/${folders}" ) - _ <- importMails(mailCfg, emil, ctx) + _ <- importMails(mailCfg, emil, upload, ctx) } yield () } @@ -49,7 +50,26 @@ object ScanMailboxTask { def importMails[F[_]: Sync]( cfg: RUserImap, emil: Emil[F], + upload: OUpload[F], ctx: Context[F, Args] ): F[Unit] = - Sync[F].delay(println(s"$emil $ctx $cfg")) + Sync[F].delay(println(s"$emil $ctx $cfg $upload")) + + object Impl { + + // limit number of folders + // limit number of mails to retrieve per folder + // per folder: + // fetch X mails + // check via msgId if already present; if not: + // load mail + // serialize to *bytes* + // store mail in queue + // move mail or delete or do nothing + // errors: log and keep going + // errors per folder fetch: fail the task + // notifiy joex after each batch + // + // no message id? make hash over complete mail or just import it + } }