diff --git a/modules/backend/src/main/scala/docspell/backend/JobFactory.scala b/modules/backend/src/main/scala/docspell/backend/JobFactory.scala
index 9a79f104..d54b0f39 100644
--- a/modules/backend/src/main/scala/docspell/backend/JobFactory.scala
+++ b/modules/backend/src/main/scala/docspell/backend/JobFactory.scala
@@ -131,6 +131,22 @@ object JobFactory extends MailAddressCodec {
       Some(ReProcessItemArgs.taskName / args.itemId)
     )
 
+  def multiUpload[F[_]: Sync](
+      args: ProcessItemArgs,
+      account: AccountId,
+      prio: Priority,
+      tracker: Option[Ident]
+  ): F[Job[ProcessItemArgs]] =
+    Job.createNew(
+      ProcessItemArgs.multiUploadTaskName,
+      account.collective,
+      args,
+      args.makeSubject,
+      account.user,
+      prio,
+      tracker
+    )
+
   def processItem[F[_]: Sync](
       args: ProcessItemArgs,
       account: AccountId,
@@ -148,11 +164,11 @@ object JobFactory extends MailAddressCodec {
     )
 
   def processItems[F[_]: Sync](
-      args: Vector[ProcessItemArgs],
+      args: List[ProcessItemArgs],
       account: AccountId,
       prio: Priority,
       tracker: Option[Ident]
-  ): F[Vector[Job[ProcessItemArgs]]] = {
+  ): F[List[Job[ProcessItemArgs]]] = {
     def create(arg: ProcessItemArgs): F[Job[ProcessItemArgs]] =
       Job.createNew(
         ProcessItemArgs.taskName,
diff --git a/modules/backend/src/main/scala/docspell/backend/ops/OUpload.scala b/modules/backend/src/main/scala/docspell/backend/ops/OUpload.scala
index ca523cf8..8857ff83 100644
--- a/modules/backend/src/main/scala/docspell/backend/ops/OUpload.scala
+++ b/modules/backend/src/main/scala/docspell/backend/ops/OUpload.scala
@@ -68,7 +68,8 @@ object OUpload {
       fileFilter: Glob,
       tags: List[String],
       language: Option[Language],
-      attachmentsOnly: Option[Boolean]
+      attachmentsOnly: Option[Boolean],
+      flattenArchives: Option[Boolean]
   )
 
   case class UploadData[F[_]](
@@ -146,12 +147,10 @@ object OUpload {
             false,
             data.meta.attachmentsOnly
           )
-          args =
-            if (data.multiple) files.map(f => ProcessItemArgs(meta, List(f)))
-            else Vector(ProcessItemArgs(meta, files.toList))
-          jobs <- right(makeJobs(args, account, data.priority, data.tracker))
+          args = ProcessItemArgs(meta, files.toList)
+          jobs <- right(makeJobs(data, args, account))
           _ <- right(logger.debug(s"Storing jobs: $jobs"))
-          res <- right(submitJobs(notifyJoex)(jobs))
+          res <- right(submitJobs(notifyJoex)(jobs.map(_.encode)))
           _ <- right(
             store.transact(
               RSource.incrementCounter(data.meta.sourceAbbrev, account.collective)
@@ -187,7 +186,7 @@ object OUpload {
 
     private def submitJobs(
         notifyJoex: Boolean
-    )(jobs: Vector[Job[String]]): F[OUpload.UploadResult] =
+    )(jobs: List[Job[String]]): F[OUpload.UploadResult] =
       for {
         _ <- logger.debug(s"Storing jobs: $jobs")
         _ <- jobStore.insertAll(jobs)
@@ -240,13 +239,24 @@ object OUpload {
       else right(().pure[F])
 
     private def makeJobs(
-        args: Vector[ProcessItemArgs],
-        account: AccountId,
-        prio: Priority,
-        tracker: Option[Ident]
-    ): F[Vector[Job[String]]] =
-      JobFactory
-        .processItems[F](args, account, prio, tracker)
-        .map(_.map(_.encode))
+        data: UploadData[F],
+        args: ProcessItemArgs,
+        account: AccountId
+    ): F[List[Job[ProcessItemArgs]]] =
+      if (data.meta.flattenArchives.getOrElse(false))
+        JobFactory
+          .multiUpload(args, account, data.priority, data.tracker)
+          .map(List(_))
+      else if (data.multiple)
+        JobFactory.processItems(
+          args.files.map(f => args.copy(files = List(f))),
+          account,
+          data.priority,
+          data.tracker
+        )
+      else
+        JobFactory
+          .processItem[F](args, account, data.priority, data.tracker)
+          .map(List(_))
   })
 }
diff --git a/modules/common/src/main/scala/docspell/common/ProcessItemArgs.scala b/modules/common/src/main/scala/docspell/common/ProcessItemArgs.scala
index 046b2255..8398eaf0 100644
--- a/modules/common/src/main/scala/docspell/common/ProcessItemArgs.scala
+++ b/modules/common/src/main/scala/docspell/common/ProcessItemArgs.scala
@@ -40,6 +40,8 @@ object ProcessItemArgs {
 
   val taskName = Ident.unsafe("process-item")
 
+  val multiUploadTaskName = Ident.unsafe("multi-upload-process")
+
   case class ProcessMeta(
       collective: Ident,
       itemId: Option[Ident],
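The reworked `makeJobs` in `OUpload` dispatches between three submission modes. A condensed sketch of that decision, using illustrative types rather than the actual docspell signatures:

```scala
// Illustrative model only -- these are not the docspell types.
sealed trait Submission
final case class MultiUploadJob(files: List[String]) extends Submission // joex extracts zips
final case class OneJobPerFile(files: List[String]) extends Submission  // one item per file
final case class SingleItemJob(files: List[String]) extends Submission  // one item, many attachments

// Mirrors the if/else chain in makeJobs: flattenArchives wins over multiple.
def decide(flattenArchives: Boolean, multiple: Boolean, files: List[String]): Submission =
  if (flattenArchives) MultiUploadJob(files)
  else if (multiple) OneJobPerFile(files)
  else SingleItemJob(files)
```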
diff --git a/modules/joex/src/main/scala/docspell/joex/JoexTasks.scala b/modules/joex/src/main/scala/docspell/joex/JoexTasks.scala
index 8d2f0a1f..b9c242fe 100644
--- a/modules/joex/src/main/scala/docspell/joex/JoexTasks.scala
+++ b/modules/joex/src/main/scala/docspell/joex/JoexTasks.scala
@@ -20,6 +20,7 @@ import docspell.joex.filecopy.{FileCopyTask, FileIntegrityCheckTask}
 import docspell.joex.fts.{MigrationTask, ReIndexTask}
 import docspell.joex.hk.HouseKeepingTask
 import docspell.joex.learn.LearnClassifierTask
+import docspell.joex.multiupload.MultiUploadArchiveTask
 import docspell.joex.notify.{PeriodicDueItemsTask, PeriodicQueryTask}
 import docspell.joex.pagecount.{AllPageCountTask, MakePageCountTask}
 import docspell.joex.pdfconv.{ConvertAllPdfTask, PdfConvTask}
@@ -64,6 +65,13 @@ final class JoexTasks[F[_]: Async](
           ItemHandler.onCancel[F](store)
         )
       )
+      .withTask(
+        JobTask.json(
+          ProcessItemArgs.multiUploadTaskName,
+          MultiUploadArchiveTask[F](store, jobStoreModule.jobs),
+          MultiUploadArchiveTask.onCancel[F](store)
+        )
+      )
       .withTask(
         JobTask.json(
           ReProcessItemArgs.taskName,
diff --git a/modules/joex/src/main/scala/docspell/joex/multiupload/MultiUploadArchiveTask.scala b/modules/joex/src/main/scala/docspell/joex/multiupload/MultiUploadArchiveTask.scala
new file mode 100644
index 00000000..6fc5f634
--- /dev/null
+++ b/modules/joex/src/main/scala/docspell/joex/multiupload/MultiUploadArchiveTask.scala
@@ -0,0 +1,143 @@
+/*
+ * Copyright 2020 Eike K. & Contributors
+ *
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+
+package docspell.joex.multiupload
+
+import cats.Monoid
+import cats.data.OptionT
+import cats.effect._
+import cats.implicits._
+import fs2.Stream
+
+import docspell.backend.JobFactory
+import docspell.common._
+import docspell.files.Zip
+import docspell.logging.Logger
+import docspell.scheduler._
+import docspell.store.Store
+
+/** Task to submit multiple files at once. By default, one file in an upload results in
+  * one item. Zip files are extracted, but their inner files are considered to be one
+  * item with (perhaps) multiple attachments.
+  *
+  * In contrast, this task extracts ZIP files (not recursively) and submits each
+  * extracted file to be processed separately. Non-zip files are submitted as is. If a
+  * zip file contains other zip files, each inner zip file results in one item; only
+  * the outer zip file is extracted here.
+  *
+  * Note: the outer zip file only acts as a container to transport multiple files and
+  * is NOT kept in docspell!
+  */
+object MultiUploadArchiveTask {
+  type Args = ProcessItemArgs
+
+  def apply[F[_]: Async](store: Store[F], jobStore: JobStore[F]): Task[F, Args, Result] =
+    Task { ctx =>
+      ctx.args.files
+        .traverse { file =>
+          isZipFile(store)(file).flatMap {
+            case true =>
+              ctx.logger.info(s"Extracting zip file ${file.name}") *>
+                extractZip(store, ctx.args)(file)
+                  .evalTap(entry =>
+                    ctx.logger.debug(
+                      s"Create job for entry: ${entry.files.flatMap(_.name)}"
+                    )
+                  )
+                  .evalMap(makeJob[F](ctx, jobStore))
+                  .compile
+                  .toList
+                  .map(Jobs.extracted(file))
+
+            case false =>
+              makeJob(ctx, jobStore)(ctx.args.copy(files = List(file))).map(Jobs.normal)
+          }
+        }
+        .map(_.combineAll)
+        .flatTap(jobs => jobStore.insertAll(jobs.jobs))
+        .flatTap(deleteZips(store, ctx.logger))
+        .map(_.result)
+        .flatTap(result =>
+          ctx.logger.info(
+            s"Submitted ${result.submittedFiles} files, extracted ${result.extractedZips} zips."
+          )
+        )
+    }
+
+  def onCancel[F[_]: Sync](store: Store[F]): Task[F, ProcessItemArgs, Unit] =
+    Task { ctx =>
+      for {
+        _ <- ctx.logger.warn("Cancelling multi-upload task, deleting uploaded files.")
+        _ <- ctx.args.files.map(_.fileMetaId).traverse(store.fileRepo.delete).void
+      } yield ()
+    }
+
+  private def deleteZips[F[_]: Sync](store: Store[F], logger: Logger[F])(
+      jobs: Jobs
+  ): F[Unit] =
+    logger.info(s"Deleting ${jobs.zips.size} extracted zip files.") *>
+      jobs.zips.map(_.fileMetaId).traverse(store.fileRepo.delete).void
+
+  private def makeJob[F[_]: Sync](ctx: Context[F, Args], jobStore: JobStore[F])(
+      args: ProcessItemArgs
+  ): F[Job[String]] =
+    for {
+      currentJob <- jobStore.findById(ctx.jobId)
+      prio = currentJob.map(_.priority).getOrElse(Priority.Low)
+      submitter = currentJob.map(_.submitter).getOrElse(DocspellSystem.user)
+      job <- JobFactory.processItem(
+        args,
+        AccountId(ctx.args.meta.collective, submitter),
+        prio,
+        None
+      )
+    } yield job.encode
+
+  private def isZipFile[F[_]: Sync](
+      store: Store[F]
+  )(file: ProcessItemArgs.File): F[Boolean] =
+    OptionT(store.fileRepo.findMeta(file.fileMetaId))
+      .map(_.mimetype.matches(MimeType.zip))
+      .getOrElse(false)
+
+  private def extractZip[F[_]: Async](
+      store: Store[F],
+      args: Args
+  )(file: ProcessItemArgs.File): Stream[F, ProcessItemArgs] =
+    store.fileRepo
+      .getBytes(file.fileMetaId)
+      .through(Zip.unzipP[F](8192, args.meta.fileFilter.getOrElse(Glob.all)))
+      .flatMap { entry =>
+        val hint = MimeTypeHint(entry.name.some, entry.mime.asString.some)
+        entry.data
+          .through(
+            store.fileRepo.save(args.meta.collective, FileCategory.AttachmentSource, hint)
+          )
+          .map(key =>
+            args.copy(files = ProcessItemArgs.File(entry.name.some, key) :: Nil)
+          )
+      }
+
+  case class Jobs(
+      result: Result,
+      jobs: List[Job[String]],
+      zips: List[ProcessItemArgs.File]
+  )
+  object Jobs {
+    def extracted(zip: ProcessItemArgs.File)(jobs: List[Job[String]]): Jobs =
+      Jobs(Result(jobs.size, 1), jobs, List(zip))
+
+    def normal(job: Job[String]): Jobs =
+      Jobs(Result.notExtracted, List(job), Nil)
+
+    val empty: Jobs = Jobs(Result.empty, Nil, Nil)
+    implicit val jobsMonoid: Monoid[Jobs] =
+      Monoid.instance(
+        empty,
+        (a, b) => Jobs(a.result.combine(b.result), a.jobs ::: b.jobs, a.zips ::: b.zips)
+      )
+  }
+}
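The task above has a traverse-then-fold shape: each incoming file yields a small `Jobs` summary (one job for a plain file, several for an extracted zip), and a `Monoid` folds all summaries into one. A minimal, self-contained sketch of that pattern with plain cats -- the names are illustrative, not the docspell API:

```scala
import cats.Monoid
import cats.syntax.all._

object FanOutSketch extends App {
  // Per-file summary, mirroring the shape of `Jobs` above.
  final case class Summary(submitted: Int, zipsExtracted: Int)

  implicit val summaryMonoid: Monoid[Summary] =
    Monoid.instance(
      Summary(0, 0),
      (a, b) => Summary(a.submitted + b.submitted, a.zipsExtracted + b.zipsExtracted)
    )

  // A zip fans out into one submission per entry; any other file yields one.
  def submit(name: String, zipEntries: Int): Summary =
    if (name.endsWith(".zip")) Summary(zipEntries, 1) else Summary(1, 0)

  val total = List(("a.pdf", 0), ("b.zip", 3), ("c.png", 0))
    .map((submit _).tupled)
    .combineAll

  println(total) // Summary(5,1)
}
```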
diff --git a/modules/joex/src/main/scala/docspell/joex/multiupload/Result.scala b/modules/joex/src/main/scala/docspell/joex/multiupload/Result.scala
new file mode 100644
index 00000000..fbe5dc4f
--- /dev/null
+++ b/modules/joex/src/main/scala/docspell/joex/multiupload/Result.scala
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2020 Eike K. & Contributors
+ *
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+
+package docspell.joex.multiupload
+
+import cats.Monoid
+
+import docspell.scheduler.JobTaskResultEncoder
+
+import io.circe.Encoder
+import io.circe.generic.semiauto.deriveEncoder
+
+case class Result(submittedFiles: Int, extractedZips: Int)
+
+object Result {
+  val empty: Result = Result(0, 0)
+  def notExtracted: Result = Result(1, 0)
+
+  implicit val resultMonoid: Monoid[Result] =
+    Monoid.instance(
+      empty,
+      (a, b) =>
+        Result(a.submittedFiles + b.submittedFiles, a.extractedZips + b.extractedZips)
+    )
+
+  implicit val jsonEncoder: Encoder[Result] =
+    deriveEncoder
+
+  implicit val taskResultEncoder: JobTaskResultEncoder[Result] =
+    JobTaskResultEncoder.fromJson[Result].withMessage { result =>
+      s"Submitted ${result.submittedFiles} files, extracted ${result.extractedZips} zip files."
+    }
+}
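Because `Result` is a `Monoid`, the per-file counts add up associatively no matter how the submissions are grouped. A quick sanity check of the instance above (assuming cats' semigroup syntax in scope):

```scala
import cats.syntax.semigroup._

val zipped = Result(submittedFiles = 3, extractedZips = 1)
val plain  = Result(submittedFiles = 1, extractedZips = 0)

assert((zipped |+| plain) == Result(4, 1))
// taskResultEncoder renders this as:
// "Submitted 4 files, extracted 1 zip files."
```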
diff --git a/modules/joex/src/main/scala/docspell/joex/scanmailbox/ScanMailboxTask.scala b/modules/joex/src/main/scala/docspell/joex/scanmailbox/ScanMailboxTask.scala
index fcd2e33f..4f2fc0ec 100644
--- a/modules/joex/src/main/scala/docspell/joex/scanmailbox/ScanMailboxTask.scala
+++ b/modules/joex/src/main/scala/docspell/joex/scanmailbox/ScanMailboxTask.scala
@@ -327,7 +327,8 @@ object ScanMailboxTask {
           args.fileFilter.getOrElse(Glob.all),
           args.tags.getOrElse(Nil),
           args.language,
-          args.attachmentsOnly
+          args.attachmentsOnly,
+          None
         )
         data = OUpload.UploadData(
           multiple = false,
diff --git a/modules/restapi/src/main/resources/docspell-openapi.yml b/modules/restapi/src/main/resources/docspell-openapi.yml
index c64c34ca..fb68b68b 100644
--- a/modules/restapi/src/main/resources/docspell-openapi.yml
+++ b/modules/restapi/src/main/resources/docspell-openapi.yml
@@ -7514,6 +7514,15 @@ components:
             If `true` (the default) each file in the upload request
            results in a separate item. If it is set to `false`, then
            all files in the request are put into a single item.
+        flattenArchives:
+          type: boolean
+          default: false
+          description: |
+            When `true`, this implies `multiple = true`: every ZIP
+            file is treated as a container of independent files,
+            each resulting in a separate item. When `false`, each
+            ZIP file results in one item with its contents as the
+            attachments.
         direction:
           type: string
           format: direction
diff --git a/modules/restserver/src/main/scala/docspell/restserver/conv/Conversions.scala b/modules/restserver/src/main/scala/docspell/restserver/conv/Conversions.scala
index d449fea4..26c95dd2 100644
--- a/modules/restserver/src/main/scala/docspell/restserver/conv/Conversions.scala
+++ b/modules/restserver/src/main/scala/docspell/restserver/conv/Conversions.scala
@@ -354,7 +354,8 @@ trait Conversions {
             m.fileFilter.getOrElse(Glob.all),
             m.tags.map(_.items).getOrElse(Nil),
             m.language,
-            m.attachmentsOnly
+            m.attachmentsOnly,
+            m.flattenArchives
           )
         )
       )
@@ -371,6 +372,7 @@ trait Conversions {
           Glob.all,
           Nil,
           None,
+          None,
           None
         )
       )
diff --git a/modules/scheduler/api/src/main/scala/docspell/scheduler/JobStore.scala b/modules/scheduler/api/src/main/scala/docspell/scheduler/JobStore.scala
index de9eee9d..d8cae78a 100644
--- a/modules/scheduler/api/src/main/scala/docspell/scheduler/JobStore.scala
+++ b/modules/scheduler/api/src/main/scala/docspell/scheduler/JobStore.scala
@@ -6,6 +6,8 @@
 
 package docspell.scheduler
 
+import docspell.common.Ident
+
 trait JobStore[F[_]] {
 
   /** Inserts the job into the queue to get picked up as soon as possible. The job must
@@ -24,4 +26,5 @@ trait JobStore[F[_]] {
 
   def insertAllIfNew(jobs: Seq[Job[String]]): F[List[Boolean]]
 
+  def findById(jobId: Ident): F[Option[Job[String]]]
 }
diff --git a/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/JobStoreImpl.scala b/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/JobStoreImpl.scala
index 38b0b067..d150a2eb 100644
--- a/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/JobStoreImpl.scala
+++ b/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/JobStoreImpl.scala
@@ -6,10 +6,11 @@
 
 package docspell.scheduler.impl
 
+import cats.data.OptionT
 import cats.effect.Sync
 import cats.syntax.all._
 
-import docspell.common.Timestamp
+import docspell.common.{Ident, Timestamp}
 import docspell.scheduler._
 import docspell.store.Store
 import docspell.store.records.RJob
@@ -72,6 +73,14 @@ final class JobStoreImpl[F[_]: Sync](store: Store[F]) extends JobStore[F] {
     })
   }
 
+  def findById(jobId: Ident) =
+    OptionT(store.transact(RJob.findById(jobId)))
+      .map(toJob)
+      .value
+
+  def toJob(r: RJob): Job[String] =
+    Job(r.id, r.task, r.group, r.args, r.subject, r.submitter, r.priority, r.tracker)
+
   def toRecord(job: Job[String], timestamp: Timestamp): RJob =
     RJob.newJob(
       job.id,
diff --git a/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/JobStorePublish.scala b/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/JobStorePublish.scala
index f9ced949..e984e6e5 100644
--- a/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/JobStorePublish.scala
+++ b/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/JobStorePublish.scala
@@ -9,7 +9,7 @@ package docspell.scheduler.impl
 import cats.effect._
 import cats.implicits._
 
-import docspell.common.JobState
+import docspell.common.{Ident, JobState}
 import docspell.notification.api.{Event, EventSink}
 import docspell.pubsub.api.PubSubT
 import docspell.scheduler._
@@ -64,6 +64,9 @@ final class JobStorePublish[F[_]: Sync](
         else ().pure[F]
       }
     }
+
+  def findById(jobId: Ident) =
+    delegate.findById(jobId)
 }
 
 object JobStorePublish {
diff --git a/modules/webapp/src/main/elm/Comp/UploadForm.elm b/modules/webapp/src/main/elm/Comp/UploadForm.elm
index 0dd01e37..e94fb422 100644
--- a/modules/webapp/src/main/elm/Comp/UploadForm.elm
+++ b/modules/webapp/src/main/elm/Comp/UploadForm.elm
@@ -43,6 +43,7 @@ type alias Model =
     , skipDuplicates : Bool
     , languageModel : Comp.FixedDropdown.Model Language
     , language : Maybe Language
+    , flattenArchives : Bool
     }
 
 
@@ -56,6 +57,7 @@ type Msg
    | DropzoneMsg Comp.Dropzone.Msg
    | ToggleSkipDuplicates
    | LanguageMsg (Comp.FixedDropdown.Msg Language)
+    | ToggleFlattenArchives
 
 
 init : Model
@@ -71,6 +73,7 @@ init =
     , languageModel =
         Comp.FixedDropdown.init Data.Language.all
     , language = Nothing
+    , flattenArchives = False
     }
 
 
@@ -132,11 +135,44 @@ update sourceId flags msg model =
             ( { model | incoming = not model.incoming }, Cmd.none, Sub.none )
 
         ToggleSingleItem ->
-            ( { model | singleItem = not model.singleItem }, Cmd.none, Sub.none )
+            let
+                newFlag =
+                    not model.singleItem
+            in
+            ( { model
+                | singleItem = newFlag
+                , flattenArchives =
+                    if newFlag then
+                        False
+
+                    else
+                        model.flattenArchives
+              }
+            , Cmd.none
+            , Sub.none
+            )
 
         ToggleSkipDuplicates ->
             ( { model | skipDuplicates = not model.skipDuplicates }, Cmd.none, Sub.none )
 
+        ToggleFlattenArchives ->
+            let
+                newFlag =
+                    not model.flattenArchives
+            in
+            ( { model
+                | flattenArchives = newFlag
+                , singleItem =
+                    if newFlag then
+                        False
+
+                    else
+                        model.singleItem
+              }
+            , Cmd.none
+            , Sub.none
+            )
+
         SubmitUpload ->
             let
                 emptyMeta =
@@ -153,6 +189,7 @@ update sourceId flags msg model =
                     else
                         Just "outgoing"
                 , language = Maybe.map Data.Language.toIso3 model.language
+                , flattenArchives = Just model.flattenArchives
                 }
 
             fileids =
@@ -403,6 +440,20 @@ renderForm texts model =
                     ]
                 ]
             ]
+        , div [ class "flex flex-col mb-3" ]
+            [ label [ class "inline-flex items-center" ]
+                [ input
+                    [ type_ "checkbox"
+                    , checked model.flattenArchives
+                    , onCheck (\_ -> ToggleFlattenArchives)
+                    , class Styles.checkboxInput
+                    ]
+                    []
+                , span [ class "ml-2" ]
+                    [ text texts.flattenArchives
+                    ]
+                ]
+            ]
         , div [ class "flex flex-col mb-3" ]
             [ label [ class "inline-flex items-center" ]
                 [ input
diff --git a/modules/webapp/src/main/elm/Messages/Comp/UploadForm.elm b/modules/webapp/src/main/elm/Messages/Comp/UploadForm.elm
index 385fddfe..fdf4505d 100644
--- a/modules/webapp/src/main/elm/Messages/Comp/UploadForm.elm
+++ b/modules/webapp/src/main/elm/Messages/Comp/UploadForm.elm
@@ -35,6 +35,7 @@ type alias Texts =
         }
     , selectedFiles : String
     , languageLabel : Language -> String
+    , flattenArchives : String
     }
 
 
@@ -65,6 +66,7 @@ gb =
         }
     , selectedFiles = "Selected Files"
     , languageLabel = Messages.Data.Language.gb
+    , flattenArchives = "Extract zip file contents into separate items, in contrast to a single document with multiple attachments."
     }
 
 
@@ -95,6 +97,7 @@ de =
         }
     , selectedFiles = "Ausgewählte Dateien"
     , languageLabel = Messages.Data.Language.de
+    , flattenArchives = "ZIP Dateien in separate Dokumente entpacken, anstatt ein Dokument mit mehreren Anhängen."
     }
 
 
@@ -125,4 +128,5 @@ fr =
         }
     , selectedFiles = "Fichiers séléctionnés"
    , languageLabel = Messages.Data.Language.fr
+    , flattenArchives = "Décompresser les fichiers ZIP dans des documents séparés au lieu de créer un document avec plusieurs pièces jointes."
     }
diff --git a/website/site/content/docs/api/upload.md b/website/site/content/docs/api/upload.md
index c6687b4c..4ff8886f 100644
--- a/website/site/content/docs/api/upload.md
+++ b/website/site/content/docs/api/upload.md
@@ -53,6 +53,7 @@ specified via a JSON structure in a part with name `meta`:
 , fileFilter: Maybe String
 , language: Maybe String
 , attachmentsOnly: Maybe Bool
+, flattenArchives: Maybe Bool
 }
 ```
@@ -95,7 +96,16 @@ specified via a JSON structure in a part with name `meta`:
   `*.eml`). If this is `true`, then the e-mail body is discarded
   and only the attachments are imported. An e-mail without any
   attachments is therefore skipped.
-
+- `flattenArchives` is a flag that controls how zip files are
+  treated. When it is `false` (the default), one zip file results in
+  one item and its contents become the attachments. If you rather
+  want the contents to be treated as independent files, set it to
+  `true`: each entry in the zip file is then submitted as a separate
+  processing job. Note that with `true` the zip file acts only as a
+  container and carries no other useful information; it is *NOT*
+  kept in docspell, only its contents are. Also note that uploaded
+  zip files are extracted only once (not recursively), so any zip
+  files inside them are treated as normal files.
 
 
 # Endpoints
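For reference, a `meta` part that opts into the new behavior could look like this -- an illustrative payload assembled from the fields documented above, not output from the server (all fields are optional):

```json
{ "multiple": true
, "fileFilter": "*.pdf"
, "language": "eng"
, "attachmentsOnly": false
, "flattenArchives": true
}
```

With `flattenArchives: true`, a zip among the uploaded files is extracted on the joex side and each entry becomes its own item; the other files are submitted unchanged.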