Upload zip file contents as independent files

eikek
2022-03-20 11:40:25 +01:00
parent 06c3561a8e
commit b84bbbd750
15 changed files with 330 additions and 23 deletions
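
In short: uploads gain a `flattenArchives` flag. A hedged sketch of the intended fan-out, using hypothetical file names (the exact behavior is spelled out in the task's doc comment and the OpenAPI change below):

// Hypothetical upload: one plain PDF plus one zip holding two files.
val upload = List("a.pdf", "docs.zip") // docs.zip contains x.pdf and y.pdf
// flattenArchives = false (default): a.pdf -> item 1; docs.zip -> item 2
//   (x.pdf and y.pdf become two attachments of item 2)
// flattenArchives = true:  a.pdf -> item 1; x.pdf -> item 2; y.pdf -> item 3
//   (docs.zip only transports the files and is not kept)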

View File

@@ -131,6 +131,22 @@ object JobFactory extends MailAddressCodec {
Some(ReProcessItemArgs.taskName / args.itemId)
)
def multiUpload[F[_]: Sync](
args: ProcessItemArgs,
account: AccountId,
prio: Priority,
tracker: Option[Ident]
): F[Job[ProcessItemArgs]] =
Job.createNew(
ProcessItemArgs.multiUploadTaskName,
account.collective,
args,
args.makeSubject,
account.user,
prio,
tracker
)
def processItem[F[_]: Sync](
args: ProcessItemArgs,
account: AccountId,
@@ -148,11 +164,11 @@ object JobFactory extends MailAddressCodec {
)
def processItems[F[_]: Sync](
-args: Vector[ProcessItemArgs],
+args: List[ProcessItemArgs],
account: AccountId,
prio: Priority,
tracker: Option[Ident]
-): F[Vector[Job[ProcessItemArgs]]] = {
+): F[List[Job[ProcessItemArgs]]] = {
def create(arg: ProcessItemArgs): F[Job[ProcessItemArgs]] =
Job.createNew(
ProcessItemArgs.taskName,
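
A hedged sketch of calling the new factory method; the values in scope are assumed, and `OUpload.makeJobs` below does essentially this:

import cats.effect.IO
import docspell.backend.JobFactory
import docspell.common._
import docspell.scheduler.Job

// Sketch: one job carrying all files; the joex task later fans
// zip contents out into further process-item jobs.
def submitAll(args: ProcessItemArgs, account: AccountId): IO[Job[ProcessItemArgs]] =
  JobFactory.multiUpload[IO](args, account, Priority.Low, tracker = None)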

View File

@@ -68,7 +68,8 @@ object OUpload {
fileFilter: Glob,
tags: List[String],
language: Option[Language],
-attachmentsOnly: Option[Boolean]
+attachmentsOnly: Option[Boolean],
+flattenArchives: Option[Boolean]
)
case class UploadData[F[_]](
@@ -146,12 +147,10 @@ object OUpload {
false,
data.meta.attachmentsOnly
)
-args =
-if (data.multiple) files.map(f => ProcessItemArgs(meta, List(f)))
-else Vector(ProcessItemArgs(meta, files.toList))
-jobs <- right(makeJobs(args, account, data.priority, data.tracker))
+args = ProcessItemArgs(meta, files.toList)
+jobs <- right(makeJobs(data, args, account))
_ <- right(logger.debug(s"Storing jobs: $jobs"))
-res <- right(submitJobs(notifyJoex)(jobs))
+res <- right(submitJobs(notifyJoex)(jobs.map(_.encode)))
_ <- right(
store.transact(
RSource.incrementCounter(data.meta.sourceAbbrev, account.collective)
@@ -187,7 +186,7 @@ object OUpload {
private def submitJobs(
notifyJoex: Boolean
-)(jobs: Vector[Job[String]]): F[OUpload.UploadResult] =
+)(jobs: List[Job[String]]): F[OUpload.UploadResult] =
for {
_ <- logger.debug(s"Storing jobs: $jobs")
_ <- jobStore.insertAll(jobs)
@@ -240,13 +239,24 @@ object OUpload {
else right(().pure[F])
private def makeJobs(
-args: Vector[ProcessItemArgs],
-account: AccountId,
-prio: Priority,
-tracker: Option[Ident]
-): F[Vector[Job[String]]] =
-JobFactory
-.processItems[F](args, account, prio, tracker)
-.map(_.map(_.encode))
+data: UploadData[F],
+args: ProcessItemArgs,
+account: AccountId
+): F[List[Job[ProcessItemArgs]]] =
+if (data.meta.flattenArchives.getOrElse(false))
+JobFactory
+.multiUpload(args, account, data.priority, data.tracker)
+.map(List(_))
+else if (data.multiple)
+JobFactory.processItems(
+args.files.map(f => args.copy(files = List(f))),
+account,
+data.priority,
+data.tracker
+)
+else
+JobFactory
+.processItem[F](args, account, data.priority, data.tracker)
+.map(List(_))
})
}
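
The three-way dispatch in `makeJobs` boils down to how uploaded files are grouped into jobs; a standalone sketch with simplified types:

// Sketch: which groups of files end up in which jobs.
def groupFiles[A](flattenArchives: Boolean, multiple: Boolean, files: List[A]): List[List[A]] =
  if (flattenArchives) List(files) // one multi-upload job; zips fan out later in joex
  else if (multiple) files.map(List(_)) // one process-item job (= one item) per file
  else List(files) // one process-item job with all files as attachments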

View File

@@ -40,6 +40,8 @@ object ProcessItemArgs {
val taskName = Ident.unsafe("process-item")
val multiUploadTaskName = Ident.unsafe("multi-upload-process")
case class ProcessMeta(
collective: Ident,
itemId: Option[Ident],

View File

@@ -20,6 +20,7 @@ import docspell.joex.filecopy.{FileCopyTask, FileIntegrityCheckTask}
import docspell.joex.fts.{MigrationTask, ReIndexTask}
import docspell.joex.hk.HouseKeepingTask
import docspell.joex.learn.LearnClassifierTask
import docspell.joex.multiupload.MultiUploadArchiveTask
import docspell.joex.notify.{PeriodicDueItemsTask, PeriodicQueryTask}
import docspell.joex.pagecount.{AllPageCountTask, MakePageCountTask}
import docspell.joex.pdfconv.{ConvertAllPdfTask, PdfConvTask}
@@ -64,6 +65,13 @@ final class JoexTasks[F[_]: Async](
ItemHandler.onCancel[F](store)
)
)
.withTask(
JobTask.json(
ProcessItemArgs.multiUploadTaskName,
MultiUploadArchiveTask[F](store, jobStoreModule.jobs),
MultiUploadArchiveTask.onCancel[F](store)
)
)
.withTask(
JobTask.json(
ReProcessItemArgs.taskName,
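
This registration only takes effect because the same identifier is used on both ends: `JobFactory.multiUpload` enqueues jobs under `ProcessItemArgs.multiUploadTaskName`, and the scheduler resolves the handler by that name. A minimal sketch of the idea, string-keyed purely for illustration:

// Sketch: queued jobs are dispatched to handlers by task name.
val handlers: Map[String, String => Unit] =
  Map("multi-upload-process" -> (args => println(s"run MultiUploadArchiveTask($args)")))

def dispatch(taskName: String, args: String): Unit =
  handlers.get(taskName).foreach(_.apply(args))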

View File

@@ -0,0 +1,143 @@
/*
* Copyright 2020 Eike K. & Contributors
*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
package docspell.joex.multiupload
import cats.Monoid
import cats.data.OptionT
import cats.effect._
import cats.implicits._
import fs2.Stream
import docspell.backend.JobFactory
import docspell.common._
import docspell.files.Zip
import docspell.logging.Logger
import docspell.scheduler._
import docspell.store.Store
/** Task to submit multiple files at once. By default, one file in an upload results in
* one item. Zip files are extracted, but their inner files are considered to be one item
* with (perhaps) multiple attachments.
*
* In contrast, this task extracts zip files (not recursively) and submits each extracted
* file to be processed separately. Non-zip files are submitted as is. If a zip file
* contains other zip files, each inner zip results in one item; only the outer zip
* file is extracted here.
*
* Note: the outer zip file only acts as a container to transport multiple files and is
* NOT kept in docspell!
*/
object MultiUploadArchiveTask {
type Args = ProcessItemArgs
def apply[F[_]: Async](store: Store[F], jobStore: JobStore[F]): Task[F, Args, Result] =
Task { ctx =>
ctx.args.files
.traverse { file =>
isZipFile(store)(file).flatMap {
case true =>
ctx.logger.info(s"Extracting zip file ${file.name}") *>
extractZip(store, ctx.args)(file)
.evalTap(entry =>
ctx.logger.debug(
s"Create job for entry: ${entry.files.flatMap(_.name)}"
)
)
.evalMap(makeJob[F](ctx, jobStore))
.compile
.toList
.map(Jobs.extracted(file))
case false =>
makeJob(ctx, jobStore)(ctx.args.copy(files = List(file))).map(Jobs.normal)
}
}
.map(_.combineAll)
.flatTap(jobs => jobStore.insertAll(jobs.jobs))
.flatTap(deleteZips(store, ctx.logger))
.map(_.result)
.flatTap(result =>
ctx.logger.info(
s"Submitted ${result.submittedFiles}, extracted ${result.extractedZips} zips."
)
)
}
def onCancel[F[_]: Sync](store: Store[F]): Task[F, ProcessItemArgs, Unit] =
Task { ctx =>
for {
_ <- ctx.logger.warn("Cancelling multi-upload task, deleting uploaded files.")
_ <- ctx.args.files.map(_.fileMetaId).traverse(store.fileRepo.delete).void
} yield ()
}
private def deleteZips[F[_]: Sync](store: Store[F], logger: Logger[F])(
jobs: Jobs
): F[Unit] =
logger.info(s"Deleting ${jobs.zips.size} extracted zip fies.") *>
jobs.zips.map(_.fileMetaId).traverse(store.fileRepo.delete).void
private def makeJob[F[_]: Sync](ctx: Context[F, Args], jobStore: JobStore[F])(
args: ProcessItemArgs
): F[Job[String]] =
for {
currentJob <- jobStore.findById(ctx.jobId)
prio = currentJob.map(_.priority).getOrElse(Priority.Low)
submitter = currentJob.map(_.submitter).getOrElse(DocspellSystem.user)
job <- JobFactory.processItem(
args,
AccountId(ctx.args.meta.collective, submitter),
prio,
None
)
} yield job.encode
private def isZipFile[F[_]: Sync](
store: Store[F]
)(file: ProcessItemArgs.File): F[Boolean] =
OptionT(store.fileRepo.findMeta(file.fileMetaId))
.map(_.mimetype.matches(MimeType.zip))
.getOrElse(false)
private def extractZip[F[_]: Async](
store: Store[F],
args: Args
)(file: ProcessItemArgs.File): Stream[F, ProcessItemArgs] =
store.fileRepo
.getBytes(file.fileMetaId)
.through(Zip.unzipP[F](8192, args.meta.fileFilter.getOrElse(Glob.all)))
.flatMap { entry =>
val hint = MimeTypeHint(entry.name.some, entry.mime.asString.some)
entry.data
.through(
store.fileRepo.save(args.meta.collective, FileCategory.AttachmentSource, hint)
)
.map(key =>
args.copy(files = ProcessItemArgs.File(entry.name.some, key) :: Nil)
)
}
case class Jobs(
result: Result,
jobs: List[Job[String]],
zips: List[ProcessItemArgs.File]
)
object Jobs {
def extracted(zip: ProcessItemArgs.File)(jobs: List[Job[String]]): Jobs =
Jobs(Result(jobs.size, 1), jobs, List(zip))
def normal(job: Job[String]): Jobs =
Jobs(Result.notExtracted, List(job), Nil)
val empty: Jobs = Jobs(Result.empty, Nil, Nil)
implicit val jobsMonoid: Monoid[Jobs] =
Monoid.instance(
empty,
(a, b) => Jobs(a.result.combine(b.result), a.jobs ::: b.jobs, a.zips ::: b.zips)
)
}
}
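
To see how the `Jobs` monoid folds per-file outcomes, consider an upload of one PDF plus one zip containing two entries (all values hypothetical):

import cats.syntax.semigroup._

val fromPdf = Jobs.normal(pdfJob) // Result(1, 0)
val fromZip = Jobs.extracted(zipFile)(List(jobA, jobB)) // Result(2, 1)
val all = fromPdf |+| fromZip
// all.result == Result(submittedFiles = 3, extractedZips = 1)
// all.jobs holds the three jobs to insert; all.zips == List(zipFile),
// so only the zip container is deleted afterwards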

View File

@@ -0,0 +1,36 @@
/*
* Copyright 2020 Eike K. & Contributors
*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
package docspell.joex.multiupload
import cats.Monoid
import docspell.scheduler.JobTaskResultEncoder
import io.circe.Encoder
import io.circe.generic.semiauto.deriveEncoder
case class Result(submittedFiles: Int, extractedZips: Int)
object Result {
val empty: Result = Result(0, 0)
def notExtracted: Result = Result(1, 0)
implicit val resultMonoid: Monoid[Result] =
Monoid.instance(
empty,
(a, b) =>
Result(a.submittedFiles + b.submittedFiles, a.extractedZips + b.extractedZips)
)
implicit val jsonEncoder: Encoder[Result] =
deriveEncoder
implicit val taskResultEncoder: JobTaskResultEncoder[Result] =
JobTaskResultEncoder.fromJson[Result].withMessage { result =>
s"Submitted ${result.submittedFiles} files, extracted ${result.extractedZips} zip files."
}
}
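
The `withMessage` hook is what turns the JSON result into the human-readable line attached to the finished job; for instance (values hypothetical):

val r = Result(submittedFiles = 3, extractedZips = 1)
// jsonEncoder:       {"submittedFiles":3,"extractedZips":1}
// taskResultEncoder: "Submitted 3 files, extracted 1 zip files."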

View File

@@ -327,7 +327,8 @@ object ScanMailboxTask {
args.fileFilter.getOrElse(Glob.all),
args.tags.getOrElse(Nil),
args.language,
-args.attachmentsOnly
+args.attachmentsOnly,
+None
)
data = OUpload.UploadData(
multiple = false,

View File

@@ -7514,6 +7514,15 @@ components:
If `true` (the default) each file in the upload request
results in a separate item. If it is set to `false`, then
all files in the request are put into a single item.
flattenArchives:
type: boolean
default: false
description: |
This implies `multiple = true`. When `true`, every zip
file is treated as a container of independent files,
each of which results in a separate item. When `false`,
each zip file results in one item with the zip's
contents as its attachments.
direction:
type: string
format: direction
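
For illustration, the relevant part of an `ItemUploadMeta` request body enabling the new behavior might look as follows (field names from this spec; assembled with circe purely for the sketch):

import io.circe.Json

val meta = Json.obj(
  "multiple" -> Json.fromBoolean(true),
  "flattenArchives" -> Json.fromBoolean(true)
)
// meta.noSpaces == {"multiple":true,"flattenArchives":true}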

View File

@@ -354,7 +354,8 @@ trait Conversions {
m.fileFilter.getOrElse(Glob.all),
m.tags.map(_.items).getOrElse(Nil),
m.language,
-m.attachmentsOnly
+m.attachmentsOnly,
+m.flattenArchives
)
)
)
@@ -371,6 +372,7 @@ trait Conversions {
Glob.all,
Nil,
None,
None,
None
)
)

View File

@@ -6,6 +6,8 @@
package docspell.scheduler
import docspell.common.Ident
trait JobStore[F[_]] {
/** Inserts the job into the queue to get picked up as soon as possible. The job must
@@ -24,4 +26,5 @@ trait JobStore[F[_]] {
def insertAllIfNew(jobs: Seq[Job[String]]): F[List[Boolean]]
def findById(jobId: Ident): F[Option[Job[String]]]
}
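
The new `findById` is what lets the joex task inherit priority and submitter from its enclosing job (see `MultiUploadArchiveTask.makeJob` above); a hedged usage sketch:

import cats.Functor
import cats.syntax.functor._
import docspell.common.{Ident, Priority}

// Sketch: fall back to a low priority if the enclosing job is gone.
def currentPriority[F[_]: Functor](jobStore: JobStore[F], jobId: Ident): F[Priority] =
  jobStore.findById(jobId).map(_.map(_.priority).getOrElse(Priority.Low))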

View File

@@ -6,10 +6,11 @@
package docspell.scheduler.impl
import cats.data.OptionT
import cats.effect.Sync
import cats.syntax.all._
-import docspell.common.Timestamp
+import docspell.common.{Ident, Timestamp}
import docspell.scheduler._
import docspell.store.Store
import docspell.store.records.RJob
@@ -72,6 +73,14 @@ final class JobStoreImpl[F[_]: Sync](store: Store[F]) extends JobStore[F] {
})
}
def findById(jobId: Ident) =
OptionT(store.transact(RJob.findById(jobId)))
.map(toJob)
.value
def toJob(r: RJob): Job[String] =
Job(r.id, r.task, r.group, r.args, r.subject, r.submitter, r.priority, r.tracker)
def toRecord(job: Job[String], timestamp: Timestamp): RJob =
RJob.newJob(
job.id,

View File

@@ -9,7 +9,7 @@ package docspell.scheduler.impl
import cats.effect._
import cats.implicits._
-import docspell.common.JobState
+import docspell.common.{Ident, JobState}
import docspell.notification.api.{Event, EventSink}
import docspell.pubsub.api.PubSubT
import docspell.scheduler._
@@ -64,6 +64,9 @@ final class JobStorePublish[F[_]: Sync](
else ().pure[F]
}
}
def findById(jobId: Ident) =
delegate.findById(jobId)
}
object JobStorePublish {

View File

@@ -43,6 +43,7 @@ type alias Model =
, skipDuplicates : Bool
, languageModel : Comp.FixedDropdown.Model Language
, language : Maybe Language
, flattenArchives : Bool
}
@@ -56,6 +57,7 @@ type Msg
| DropzoneMsg Comp.Dropzone.Msg
| ToggleSkipDuplicates
| LanguageMsg (Comp.FixedDropdown.Msg Language)
| ToggleFlattenArchives
init : Model
@@ -71,6 +73,7 @@ init =
, languageModel =
Comp.FixedDropdown.init Data.Language.all
, language = Nothing
, flattenArchives = False
}
@@ -132,11 +135,44 @@ update sourceId flags msg model =
( { model | incoming = not model.incoming }, Cmd.none, Sub.none )
ToggleSingleItem ->
-( { model | singleItem = not model.singleItem }, Cmd.none, Sub.none )
+let
+newFlag =
+not model.singleItem
+in
+( { model
+| singleItem = newFlag
+, flattenArchives =
+if newFlag then
+False
+else
+model.flattenArchives
+}
+, Cmd.none
+, Sub.none
+)
ToggleSkipDuplicates ->
( { model | skipDuplicates = not model.skipDuplicates }, Cmd.none, Sub.none )
ToggleFlattenArchives ->
let
newFlag =
not model.flattenArchives
in
( { model
| flattenArchives = newFlag
, singleItem =
if newFlag then
False
else
model.singleItem
}
, Cmd.none
, Sub.none
)
SubmitUpload ->
let
emptyMeta =
@@ -153,6 +189,7 @@ update sourceId flags msg model =
else
Just "outgoing"
, language = Maybe.map Data.Language.toIso3 model.language
, flattenArchives = Just model.flattenArchives
}
fileids =
@@ -403,6 +440,20 @@ renderForm texts model =
]
]
]
, div [ class "flex flex-col mb-3" ]
[ label [ class "inline-flex items-center" ]
[ input
[ type_ "checkbox"
, checked model.flattenArchives
, onCheck (\_ -> ToggleFlattenArchives)
, class Styles.checkboxInput
]
[]
, span [ class "ml-2" ]
[ text texts.flattenArchives
]
]
]
, div [ class "flex flex-col mb-3" ]
[ label [ class "inline-flex items-center" ]
[ input

View File

@@ -35,6 +35,7 @@ type alias Texts =
}
, selectedFiles : String
, languageLabel : Language -> String
, flattenArchives : String
}
@@ -65,6 +66,7 @@ gb =
}
, selectedFiles = "Selected Files"
, languageLabel = Messages.Data.Language.gb
, flattenArchives = "Extract zip file contents into separate items, in contrast to a single document with multiple attachments."
}
@@ -95,6 +97,7 @@ de =
}
, selectedFiles = "Ausgewählte Dateien"
, languageLabel = Messages.Data.Language.de
, flattenArchives = "ZIP Dateien in separate Dokumente entpacken, anstatt ein Dokument mit mehreren Anhängen."
}
@@ -125,4 +128,5 @@ fr =
}
, selectedFiles = "Fichiers séléctionnés"
, languageLabel = Messages.Data.Language.fr
, flattenArchives = "Décompresser les fichiers ZIP dans des documents séparés au lieu de créer un document avec plusieurs pièces jointes."
}