Add upload operation to task arguments

This commit is contained in:
Eike Kettner 2020-05-18 15:58:31 +02:00
parent 6e8582ea80
commit 852455c610
6 changed files with 57 additions and 9 deletions

View File

@ -48,7 +48,7 @@ object BackendApp {
equipImpl <- OEquipment[F](store)
orgImpl <- OOrganization(store)
joexImpl <- OJoex.create(httpClientEc, store)
uploadImpl <- OUpload(store, queue, cfg, joexImpl)
uploadImpl <- OUpload(store, queue, cfg.files, joexImpl)
nodeImpl <- ONode(store)
jobImpl <- OJob(store, joexImpl)
itemImpl <- OItem(store)

View File

@ -52,7 +52,7 @@ object OUpload {
def apply[F[_]: Sync](
store: Store[F],
queue: JobQueue[F],
cfg: Config,
cfg: Config.Files,
joex: OJoex[F]
): Resource[F, OUpload[F]] =
Resource.pure[F, OUpload[F]](new OUpload[F] {
@ -105,7 +105,7 @@ object OUpload {
private def saveFile(file: File[F]): F[Option[ProcessItemArgs.File]] =
logger.finfo(s"Receiving file $file") *>
store.bitpeace
.saveNew(file.data, cfg.files.chunkSize, MimetypeHint(file.name, None), None)
.saveNew(file.data, cfg.chunkSize, MimetypeHint(file.name, None), None)
.compile
.lastOrError
.map(fm => Ident.unsafe(fm.id))

View File

@ -308,4 +308,28 @@ docspell.joex {
working-dir = ${java.io.tmpdir}"/docspell-convert"
}
}
# The same section is also present in the rest-server config. It is
# used when submitting files into the job queue for processing.
#
# Currently, these settings may affect memory usage of all nodes, so
# it should be the same on all nodes.
files {
# Defines the chunk size (in bytes) used to store the files.
# This will affect the memory footprint when uploading and
# downloading files. At most this amount is loaded into RAM for
# down- and uploading.
#
# It also defines the chunk size used for the blobs inside the
# database.
chunk-size = 524288
# The file content types that are considered valid. Docspell
# will only pass these files to processing. The processing code
# itself has also checks for which files are supported and which
# not. This affects the uploading part and can be used to
# restrict file types that should be handed over to processing.
# By default all files are allowed.
valid-mime-types = [ ]
}
}

View File

@ -7,6 +7,7 @@ import docspell.store.JdbcConfig
import docspell.convert.ConvertConfig
import docspell.extract.ExtractConfig
import docspell.joex.hk.HouseKeepingConfig
import docspell.backend.Config.Files
case class Config(
appId: Ident,
@ -19,7 +20,8 @@ case class Config(
extraction: ExtractConfig,
textAnalysis: TextAnalysisConfig,
convert: ConvertConfig,
sendMail: MailSendConfig
sendMail: MailSendConfig,
files: Files
)
object Config {

View File

@ -4,6 +4,7 @@ import cats.implicits._
import cats.effect._
import emil.javamail._
import docspell.common._
import docspell.backend.ops._
import docspell.joex.hk._
import docspell.joex.notify._
import docspell.joex.scanmailbox._
@ -12,7 +13,6 @@ import docspell.joex.scheduler._
import docspell.joexapi.client.JoexClient
import docspell.store.Store
import docspell.store.queue._
import docspell.backend.ops.ONode
import docspell.store.records.RJobLog
import fs2.concurrent.SignallingRef
import scala.concurrent.ExecutionContext
@ -68,6 +68,8 @@ object JoexAppImpl {
queue <- JobQueue(store)
pstore <- PeriodicTaskStore.create(store)
nodeOps <- ONode(store)
joex <- OJoex(client, store)
upload <- OUpload(store, queue, cfg.files, joex)
javaEmil = JavaMailEmil(blocker)
sch <- SchedulerBuilder(cfg.scheduler, blocker, store)
.withQueue(queue)
@ -88,7 +90,7 @@ object JoexAppImpl {
.withTask(
JobTask.json(
ScanMailboxArgs.taskName,
ScanMailboxTask[F](javaEmil),
ScanMailboxTask[F](javaEmil, upload),
ScanMailboxTask.onCancel[F]
)
)

View File

@ -6,6 +6,7 @@ import emil._
//import emil.javamail.syntax._
import docspell.common._
import docspell.backend.ops.OUpload
import docspell.store.records._
import docspell.joex.scheduler.{Context, Task}
@ -13,7 +14,7 @@ object ScanMailboxTask {
val maxItems: Long = 7
type Args = ScanMailboxArgs
def apply[F[_]: Sync](emil: Emil[F]): Task[F, Args, Unit] =
def apply[F[_]: Sync](emil: Emil[F], upload: OUpload[F]): Task[F, Args, Unit] =
Task { ctx =>
for {
_ <- ctx.logger.info(
@ -26,7 +27,7 @@ object ScanMailboxTask {
_ <- ctx.logger.info(
s"Reading mails for user ${userId.id} from ${imapConn.id}/${folders}"
)
_ <- importMails(mailCfg, emil, ctx)
_ <- importMails(mailCfg, emil, upload, ctx)
} yield ()
}
@ -49,7 +50,26 @@ object ScanMailboxTask {
def importMails[F[_]: Sync](
cfg: RUserImap,
emil: Emil[F],
upload: OUpload[F],
ctx: Context[F, Args]
): F[Unit] =
Sync[F].delay(println(s"$emil $ctx $cfg"))
Sync[F].delay(println(s"$emil $ctx $cfg $upload"))
object Impl {
// limit number of folders
// limit number of mails to retrieve per folder
// per folder:
// fetch X mails
// check via msgId if already present; if not:
// load mail
// serialize to *bytes*
// store mail in queue
// move mail or delete or do nothing
// errors: log and keep going
// errors per folder fetch: fail the task
// notifiy joex after each batch
//
// no message id? make hash over complete mail or just import it
}
}