Merge pull request #1459 from eikek/flatten-zip-uploads

Flatten zip uploads
This commit is contained in:
mergify[bot] 2022-03-20 11:05:48 +00:00 committed by GitHub
commit 880b1fcf94
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
32 changed files with 425 additions and 145 deletions

View File

@ -78,18 +78,17 @@ object BackendApp {
tagImpl <- OTag[F](store)
equipImpl <- OEquipment[F](store)
orgImpl <- OOrganization(store)
uploadImpl <- OUpload(store, schedulerModule.jobs, joexImpl)
uploadImpl <- OUpload(store, schedulerModule.jobs)
nodeImpl <- ONode(store)
jobImpl <- OJob(store, joexImpl, pubSubT)
createIndex <- CreateIndex.resource(ftsClient, store)
itemImpl <- OItem(store, ftsClient, createIndex, schedulerModule.jobs, joexImpl)
itemImpl <- OItem(store, ftsClient, createIndex, schedulerModule.jobs)
itemSearchImpl <- OItemSearch(store)
fulltextImpl <- OFulltext(
itemSearchImpl,
ftsClient,
store,
schedulerModule.jobs,
joexImpl
schedulerModule.jobs
)
mailImpl <- OMail(store, javaEmil)
userTaskImpl <- OUserTask(
@ -106,7 +105,7 @@ object BackendApp {
)
notifyImpl <- ONotification(store, notificationMod)
bookmarksImpl <- OQueryBookmarks(store)
fileRepoImpl <- OFileRepository(store, schedulerModule.jobs, joexImpl)
fileRepoImpl <- OFileRepository(store, schedulerModule.jobs)
itemLinkImpl <- Resource.pure(OItemLink(store, itemSearchImpl))
} yield new BackendApp[F] {
val pubSub = pubSubT

View File

@ -131,6 +131,22 @@ object JobFactory extends MailAddressCodec {
Some(ReProcessItemArgs.taskName / args.itemId)
)
/** Creates a job for the multi-upload task, which treats uploaded zip files as
  * containers and submits each extracted entry as a separate processing job.
  *
  * @param args    the uploaded files and processing metadata
  * @param account the collective and user submitting the upload
  * @param prio    queue priority for the created job
  * @param tracker optional id used to track progress of this upload
  */
def multiUpload[F[_]: Sync](
    args: ProcessItemArgs,
    account: AccountId,
    prio: Priority,
    tracker: Option[Ident]
): F[Job[ProcessItemArgs]] =
  Job.createNew(
    ProcessItemArgs.multiUploadTaskName,
    account.collective,
    args,
    args.makeSubject,
    account.user,
    prio,
    tracker
  )
def processItem[F[_]: Sync](
args: ProcessItemArgs,
account: AccountId,
@ -148,11 +164,11 @@ object JobFactory extends MailAddressCodec {
)
def processItems[F[_]: Sync](
args: Vector[ProcessItemArgs],
args: List[ProcessItemArgs],
account: AccountId,
prio: Priority,
tracker: Option[Ident]
): F[Vector[Job[ProcessItemArgs]]] = {
): F[List[Job[ProcessItemArgs]]] = {
def create(arg: ProcessItemArgs): F[Job[ProcessItemArgs]] =
Job.createNew(
ProcessItemArgs.taskName,

View File

@ -78,8 +78,7 @@ trait OCollective[F[_]] {
*/
def generatePreviews(
storeMode: MakePreviewArgs.StoreMode,
account: AccountId,
notifyJoex: Boolean
account: AccountId
): F[UpdateResult]
}
@ -206,7 +205,6 @@ object OCollective {
)
_ <- uts
.executeNow(UserTaskScope(collective), args.makeSubject.some, ut)
_ <- joex.notifyAllNodes
} yield ()
def startEmptyTrash(args: EmptyTrashArgs): F[Unit] =
@ -222,7 +220,6 @@ object OCollective {
)
_ <- uts
.executeNow(UserTaskScope(args.collective), args.makeSubject.some, ut)
_ <- joex.notifyAllNodes
} yield ()
def findSettings(collective: Ident): F[Option[OCollective.Settings]] =
@ -313,8 +310,7 @@ object OCollective {
def generatePreviews(
storeMode: MakePreviewArgs.StoreMode,
account: AccountId,
notifyJoex: Boolean
account: AccountId
): F[UpdateResult] =
for {
job <- JobFactory.allPreviews[F](
@ -322,7 +318,6 @@ object OCollective {
Some(account.user)
)
_ <- jobStore.insertIfNew(job.encode)
_ <- if (notifyJoex) joex.notifyAllNodes else ().pure[F]
} yield UpdateResult.success
})

View File

@ -21,15 +21,9 @@ import scodec.bits.ByteVector
trait OFileRepository[F[_]] {
/** Inserts the job or return None if such a job already is running. */
def cloneFileRepository(
args: FileCopyTaskArgs,
notifyJoex: Boolean
): F[Option[Job[FileCopyTaskArgs]]]
def cloneFileRepository(args: FileCopyTaskArgs): F[Option[Job[FileCopyTaskArgs]]]
def checkIntegrityAll(
part: FileKeyPart,
notifyJoex: Boolean
): F[Option[Job[FileIntegrityCheckArgs]]]
def checkIntegrityAll(part: FileKeyPart): F[Option[Job[FileIntegrityCheckArgs]]]
def checkIntegrity(key: FileKey, hash: Option[ByteVector]): F[Option[IntegrityResult]]
}
@ -40,30 +34,23 @@ object OFileRepository {
def apply[F[_]: Async](
store: Store[F],
jobStore: JobStore[F],
joex: OJoex[F]
jobStore: JobStore[F]
): Resource[F, OFileRepository[F]] =
Resource.pure(new OFileRepository[F] {
private[this] val logger = docspell.logging.getLogger[F]
def cloneFileRepository(
args: FileCopyTaskArgs,
notifyJoex: Boolean
): F[Option[Job[FileCopyTaskArgs]]] =
def cloneFileRepository(args: FileCopyTaskArgs): F[Option[Job[FileCopyTaskArgs]]] =
for {
job <- JobFactory.fileCopy(args)
flag <- jobStore.insertIfNew(job.encode)
_ <- if (notifyJoex) joex.notifyAllNodes else ().pure[F]
} yield Option.when(flag)(job)
def checkIntegrityAll(
part: FileKeyPart,
notifyJoex: Boolean
part: FileKeyPart
): F[Option[Job[FileIntegrityCheckArgs]]] =
for {
job <- JobFactory.integrityCheck(FileIntegrityCheckArgs(part))
flag <- jobStore.insertIfNew(job.encode)
_ <- if (notifyJoex) joex.notifyAllNodes else ().pure[F]
} yield Option.when(flag)(job)
def checkIntegrity(

View File

@ -81,8 +81,7 @@ object OFulltext {
itemSearch: OItemSearch[F],
fts: FtsClient[F],
store: Store[F],
jobStore: JobStore[F],
joex: OJoex[F]
jobStore: JobStore[F]
): Resource[F, OFulltext[F]] =
Resource.pure[F, OFulltext[F]](new OFulltext[F] {
val logger = docspell.logging.getLogger[F]
@ -90,7 +89,7 @@ object OFulltext {
for {
_ <- logger.info(s"Re-index all.")
job <- JobFactory.reIndexAll[F]
_ <- jobStore.insertIfNew(job.encode) *> joex.notifyAllNodes
_ <- jobStore.insertIfNew(job.encode)
} yield ()
def reindexCollective(account: AccountId): F[Unit] =
@ -102,7 +101,7 @@ object OFulltext {
job <- JobFactory.reIndex(account)
_ <-
if (exist.isDefined) ().pure[F]
else jobStore.insertIfNew(job.encode) *> joex.notifyAllNodes
else jobStore.insertIfNew(job.encode)
} yield ()
def findIndexOnly(maxNoteLen: Int)(

View File

@ -183,14 +183,12 @@ trait OItem[F[_]] {
def reprocess(
item: Ident,
attachments: List[Ident],
account: AccountId,
notifyJoex: Boolean
account: AccountId
): F[UpdateResult]
def reprocessAll(
items: Nel[Ident],
account: AccountId,
notifyJoex: Boolean
account: AccountId
): F[UpdateResult]
/** Submits a task that finds all non-converted pdfs and triggers converting them using
@ -198,22 +196,17 @@ trait OItem[F[_]] {
*/
def convertAllPdf(
collective: Option[Ident],
submitter: Option[Ident],
notifyJoex: Boolean
submitter: Option[Ident]
): F[UpdateResult]
/** Submits a task that (re)generates the preview image for an attachment. */
def generatePreview(
args: MakePreviewArgs,
account: AccountId,
notifyJoex: Boolean
account: AccountId
): F[UpdateResult]
/** Submits a task that (re)generates the preview images for all attachments. */
def generateAllPreviews(
storeMode: MakePreviewArgs.StoreMode,
notifyJoex: Boolean
): F[UpdateResult]
def generateAllPreviews(storeMode: MakePreviewArgs.StoreMode): F[UpdateResult]
/** Merges a list of items into one item. The remaining items are deleted. */
def merge(
@ -228,8 +221,7 @@ object OItem {
store: Store[F],
fts: FtsClient[F],
createIndex: CreateIndex[F],
jobStore: JobStore[F],
joex: OJoex[F]
jobStore: JobStore[F]
): Resource[F, OItem[F]] =
for {
otag <- OTag(store)
@ -752,8 +744,7 @@ object OItem {
def reprocess(
item: Ident,
attachments: List[Ident],
account: AccountId,
notifyJoex: Boolean
account: AccountId
): F[UpdateResult] =
(for {
_ <- OptionT(
@ -764,13 +755,11 @@ object OItem {
JobFactory.reprocessItem[F](args, account, Priority.Low)
)
_ <- OptionT.liftF(jobStore.insertIfNew(job.encode))
_ <- OptionT.liftF(if (notifyJoex) joex.notifyAllNodes else ().pure[F])
} yield UpdateResult.success).getOrElse(UpdateResult.notFound)
def reprocessAll(
items: Nel[Ident],
account: AccountId,
notifyJoex: Boolean
account: AccountId
): F[UpdateResult] =
UpdateResult.fromUpdate(for {
items <- store.transact(RItem.filterItems(items, account.collective))
@ -779,39 +768,32 @@ object OItem {
.traverse(arg => JobFactory.reprocessItem[F](arg, account, Priority.Low))
.map(_.map(_.encode))
_ <- jobStore.insertAllIfNew(jobs)
_ <- if (notifyJoex) joex.notifyAllNodes else ().pure[F]
} yield items.size)
def convertAllPdf(
collective: Option[Ident],
submitter: Option[Ident],
notifyJoex: Boolean
submitter: Option[Ident]
): F[UpdateResult] =
for {
job <- JobFactory.convertAllPdfs[F](collective, submitter, Priority.Low)
_ <- jobStore.insertIfNew(job.encode)
_ <- if (notifyJoex) joex.notifyAllNodes else ().pure[F]
} yield UpdateResult.success
def generatePreview(
args: MakePreviewArgs,
account: AccountId,
notifyJoex: Boolean
account: AccountId
): F[UpdateResult] =
for {
job <- JobFactory.makePreview[F](args, account.some)
_ <- jobStore.insertIfNew(job.encode)
_ <- if (notifyJoex) joex.notifyAllNodes else ().pure[F]
} yield UpdateResult.success
def generateAllPreviews(
storeMode: MakePreviewArgs.StoreMode,
notifyJoex: Boolean
storeMode: MakePreviewArgs.StoreMode
): F[UpdateResult] =
for {
job <- JobFactory.allPreviews[F](AllPreviewsArgs(None, storeMode), None)
_ <- jobStore.insertIfNew(job.encode)
_ <- if (notifyJoex) joex.notifyAllNodes else ().pure[F]
} yield UpdateResult.success
private def onSuccessIgnoreError(update: F[Unit])(ar: UpdateResult): F[Unit] =

View File

@ -23,7 +23,6 @@ trait OUpload[F[_]] {
def submit(
data: OUpload.UploadData[F],
account: AccountId,
notifyJoex: Boolean,
itemId: Option[Ident]
): F[OUpload.UploadResult]
@ -34,21 +33,19 @@ trait OUpload[F[_]] {
def submit(
data: OUpload.UploadData[F],
sourceId: Ident,
notifyJoex: Boolean,
itemId: Option[Ident]
): F[OUpload.UploadResult]
final def submitEither(
data: OUpload.UploadData[F],
accOrSrc: Either[Ident, AccountId],
notifyJoex: Boolean,
itemId: Option[Ident]
): F[OUpload.UploadResult] =
accOrSrc match {
case Right(acc) =>
submit(data, acc, notifyJoex, itemId)
submit(data, acc, itemId)
case Left(srcId) =>
submit(data, srcId, notifyJoex, itemId)
submit(data, srcId, itemId)
}
}
@ -68,7 +65,8 @@ object OUpload {
fileFilter: Glob,
tags: List[String],
language: Option[Language],
attachmentsOnly: Option[Boolean]
attachmentsOnly: Option[Boolean],
flattenArchives: Option[Boolean]
)
case class UploadData[F[_]](
@ -108,15 +106,13 @@ object OUpload {
def apply[F[_]: Sync](
store: Store[F],
jobStore: JobStore[F],
joex: OJoex[F]
jobStore: JobStore[F]
): Resource[F, OUpload[F]] =
Resource.pure[F, OUpload[F]](new OUpload[F] {
private[this] val logger = docspell.logging.getLogger[F]
def submit(
data: OUpload.UploadData[F],
account: AccountId,
notifyJoex: Boolean,
itemId: Option[Ident]
): F[OUpload.UploadResult] =
(for {
@ -146,12 +142,10 @@ object OUpload {
false,
data.meta.attachmentsOnly
)
args =
if (data.multiple) files.map(f => ProcessItemArgs(meta, List(f)))
else Vector(ProcessItemArgs(meta, files.toList))
jobs <- right(makeJobs(args, account, data.priority, data.tracker))
args = ProcessItemArgs(meta, files.toList)
jobs <- right(makeJobs(data, args, account))
_ <- right(logger.debug(s"Storing jobs: $jobs"))
res <- right(submitJobs(notifyJoex)(jobs))
res <- right(submitJobs(jobs.map(_.encode)))
_ <- right(
store.transact(
RSource.incrementCounter(data.meta.sourceAbbrev, account.collective)
@ -162,7 +156,6 @@ object OUpload {
def submit(
data: OUpload.UploadData[F],
sourceId: Ident,
notifyJoex: Boolean,
itemId: Option[Ident]
): F[OUpload.UploadResult] =
(for {
@ -182,16 +175,13 @@ object OUpload {
priority = src.source.priority
)
accId = AccountId(src.source.cid, src.source.sid)
result <- OptionT.liftF(submit(updata, accId, notifyJoex, itemId))
result <- OptionT.liftF(submit(updata, accId, itemId))
} yield result).getOrElse(UploadResult.noSource)
private def submitJobs(
notifyJoex: Boolean
)(jobs: Vector[Job[String]]): F[OUpload.UploadResult] =
private def submitJobs(jobs: List[Job[String]]): F[OUpload.UploadResult] =
for {
_ <- logger.debug(s"Storing jobs: $jobs")
_ <- jobStore.insertAll(jobs)
_ <- if (notifyJoex) joex.notifyAllNodes else ().pure[F]
} yield UploadResult.Success
/** Saves the file into the database. */
@ -240,13 +230,24 @@ object OUpload {
else right(().pure[F])
private def makeJobs(
args: Vector[ProcessItemArgs],
account: AccountId,
prio: Priority,
tracker: Option[Ident]
): F[Vector[Job[String]]] =
data: UploadData[F],
args: ProcessItemArgs,
account: AccountId
): F[List[Job[ProcessItemArgs]]] =
if (data.meta.flattenArchives.getOrElse(false))
JobFactory
.processItems[F](args, account, prio, tracker)
.map(_.map(_.encode))
.multiUpload(args, account, data.priority, data.tracker)
.map(List(_))
else if (data.multiple)
JobFactory.processItems(
args.files.map(f => args.copy(files = List(f))),
account,
data.priority,
data.tracker
)
else
JobFactory
.processItem[F](args, account, data.priority, data.tracker)
.map(List(_))
})
}

View File

@ -92,10 +92,7 @@ object OUserTask {
def executeNow[A](scope: UserTaskScope, subject: Option[String], task: UserTask[A])(
implicit E: Encoder[A]
): F[Unit] =
for {
_ <- taskStore.executeNow(scope, subject, task)
_ <- joex.notifyAllNodes
} yield ()
taskStore.executeNow(scope, subject, task)
def getScanMailbox(scope: UserTaskScope): Stream[F, UserTask[ScanMailboxArgs]] =
taskStore

View File

@ -40,6 +40,8 @@ object ProcessItemArgs {
val taskName = Ident.unsafe("process-item")
val multiUploadTaskName = Ident.unsafe("multi-upload-process")
case class ProcessMeta(
collective: Ident,
itemId: Option[Ident],

View File

@ -20,6 +20,7 @@ import docspell.joex.filecopy.{FileCopyTask, FileIntegrityCheckTask}
import docspell.joex.fts.{MigrationTask, ReIndexTask}
import docspell.joex.hk.HouseKeepingTask
import docspell.joex.learn.LearnClassifierTask
import docspell.joex.multiupload.MultiUploadArchiveTask
import docspell.joex.notify.{PeriodicDueItemsTask, PeriodicQueryTask}
import docspell.joex.pagecount.{AllPageCountTask, MakePageCountTask}
import docspell.joex.pdfconv.{ConvertAllPdfTask, PdfConvTask}
@ -64,6 +65,13 @@ final class JoexTasks[F[_]: Async](
ItemHandler.onCancel[F](store)
)
)
.withTask(
JobTask.json(
ProcessItemArgs.multiUploadTaskName,
MultiUploadArchiveTask[F](store, jobStoreModule.jobs),
MultiUploadArchiveTask.onCancel[F](store)
)
)
.withTask(
JobTask.json(
ReProcessItemArgs.taskName,
@ -109,7 +117,7 @@ final class JoexTasks[F[_]: Async](
.withTask(
JobTask.json(
ConvertAllPdfArgs.taskName,
ConvertAllPdfTask[F](jobStoreModule.jobs, joex, store),
ConvertAllPdfTask[F](jobStoreModule.jobs, store),
ConvertAllPdfTask.onCancel[F]
)
)
@ -130,7 +138,7 @@ final class JoexTasks[F[_]: Async](
.withTask(
JobTask.json(
AllPreviewsArgs.taskName,
AllPreviewsTask[F](jobStoreModule.jobs, joex, store),
AllPreviewsTask[F](jobStoreModule.jobs, store),
AllPreviewsTask.onCancel[F]
)
)
@ -144,7 +152,7 @@ final class JoexTasks[F[_]: Async](
.withTask(
JobTask.json(
AllPageCountTask.taskName,
AllPageCountTask[F](store, jobStoreModule.jobs, joex),
AllPageCountTask[F](store, jobStoreModule.jobs),
AllPageCountTask.onCancel[F]
)
)
@ -212,16 +220,16 @@ object JoexTasks {
for {
joex <- OJoex(pubSub)
store = jobStoreModule.store
upload <- OUpload(store, jobStoreModule.jobs, joex)
upload <- OUpload(store, jobStoreModule.jobs)
fts <- createFtsClient(cfg)(httpClient)
createIndex <- CreateIndex.resource(fts, store)
itemOps <- OItem(store, fts, createIndex, jobStoreModule.jobs, joex)
itemOps <- OItem(store, fts, createIndex, jobStoreModule.jobs)
itemSearchOps <- OItemSearch(store)
analyser <- TextAnalyser.create[F](cfg.textAnalysis.textAnalysisConfig)
regexNer <- RegexNerFile(cfg.textAnalysis.regexNerFileConfig, store)
updateCheck <- UpdateCheck.resource(httpClient)
notification <- ONotification(store, notificationModule)
fileRepo <- OFileRepository(store, jobStoreModule.jobs, joex)
fileRepo <- OFileRepository(store, jobStoreModule.jobs)
} yield new JoexTasks[F](
cfg,
store,

View File

@ -39,7 +39,7 @@ object EmptyTrashTask {
def apply[F[_]: Async](
itemOps: OItem[F],
itemSearchOps: OItemSearch[F]
): Task[F, Args, Unit] =
): Task[F, Args, Result] =
Task { ctx =>
for {
now <- Timestamp.current[F]
@ -49,7 +49,7 @@ object EmptyTrashTask {
)
nDeleted <- deleteAll(ctx.args, maxDate, itemOps, itemSearchOps, ctx)
_ <- ctx.logger.info(s"Finished deleting $nDeleted items")
} yield ()
} yield Result(nDeleted)
}
private def deleteAll[F[_]: Async](

View File

@ -0,0 +1,24 @@
/*
* Copyright 2020 Eike K. & Contributors
*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
package docspell.joex.emptytrash
import docspell.scheduler.JobTaskResultEncoder
import io.circe.Encoder
import io.circe.generic.semiauto.deriveEncoder
/** Outcome of the empty-trash task: the number of items that were removed. */
final case class Result(removed: Int)

object Result {

  /** JSON representation used to persist the task result. */
  implicit val jsonEncoder: Encoder[Result] =
    deriveEncoder[Result]

  /** Renders the result with a human readable message for the job log. */
  implicit val jobTaskResultEncoder: JobTaskResultEncoder[Result] =
    JobTaskResultEncoder.fromJson[Result].withMessage { r =>
      s"Deleted ${r.removed} items."
    }
}

View File

@ -0,0 +1,143 @@
/*
* Copyright 2020 Eike K. & Contributors
*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
package docspell.joex.multiupload
import cats.Monoid
import cats.data.OptionT
import cats.effect._
import cats.implicits._
import fs2.Stream
import docspell.backend.JobFactory
import docspell.common._
import docspell.files.Zip
import docspell.logging.Logger
import docspell.scheduler._
import docspell.store.Store
/** Task to submit multiple files at once. By default, one file in an upload results in
  * one item. Zip files are extracted, but its inner files are considered to be one item
  * with (perhaps) multiple attachments.
  *
  * In contrast, this task extracts ZIP files (not recursively) and submits each extracted
  * file to be processed separately. Non-zip files are submitted as is. If zip files
  * contain other zip files, these inner zip files will result in one item each, only the
  * outer zip file is extracted here.
  *
  * Note: the outer zip file only acts as a container to transport multiple files and is
  * NOT kept in docspell!
  */
object MultiUploadArchiveTask {
  type Args = ProcessItemArgs

  /** Splits the upload into independent processing jobs: every zip file is extracted
    * and each entry submitted as its own job; non-zip files are submitted unchanged.
    * The zip container files themselves are deleted after submission.
    */
  def apply[F[_]: Async](store: Store[F], jobStore: JobStore[F]): Task[F, Args, Result] =
    Task { ctx =>
      ctx.args.files
        .traverse { file =>
          isZipFile(store)(file).flatMap {
            case true =>
              ctx.logger.info(s"Extracting zip file ${file.name}") *>
                extractZip(store, ctx.args)(file)
                  .evalTap(entry =>
                    ctx.logger.debug(
                      s"Create job for entry: ${entry.files.flatMap(_.name)}"
                    )
                  )
                  .evalMap(makeJob[F](ctx, jobStore))
                  .compile
                  .toList
                  .map(Jobs.extracted(file))
            case false =>
              makeJob(ctx, jobStore)(ctx.args.copy(files = List(file))).map(Jobs.normal)
          }
        }
        .map(_.combineAll)
        // submit all created jobs in one batch
        .flatTap(jobs => jobStore.insertAll(jobs.jobs))
        // container zips are not kept in docspell, only their extracted contents
        .flatTap(deleteZips(store, ctx.logger))
        .map(_.result)
        .flatTap(result =>
          ctx.logger.info(
            s"Submitted ${result.submittedFiles}, extracted ${result.extractedZips} zips."
          )
        )
    }

  /** Removes all files of the upload when the task is cancelled. */
  def onCancel[F[_]: Sync](store: Store[F]): Task[F, ProcessItemArgs, Unit] =
    Task { ctx =>
      for {
        _ <- ctx.logger.warn("Cancelling multi-upload task, deleting uploaded files.")
        _ <- ctx.args.files.map(_.fileMetaId).traverse(store.fileRepo.delete).void
      } yield ()
    }

  /** Deletes the zip container files that were extracted by this task run. */
  private def deleteZips[F[_]: Sync](store: Store[F], logger: Logger[F])(
      jobs: Jobs
  ): F[Unit] =
    // fix: log message previously read "zip fies"
    logger.info(s"Deleting ${jobs.zips.size} extracted zip files.") *>
      jobs.zips.map(_.fileMetaId).traverse(store.fileRepo.delete).void

  /** Creates one process-item job, inheriting priority and submitter from the
    * currently running multi-upload job when it can be found.
    */
  private def makeJob[F[_]: Sync](ctx: Context[F, Args], jobStore: JobStore[F])(
      args: ProcessItemArgs
  ): F[Job[String]] =
    for {
      currentJob <- jobStore.findById(ctx.jobId)
      prio = currentJob.map(_.priority).getOrElse(Priority.Low)
      submitter = currentJob.map(_.submitter).getOrElse(DocspellSystem.user)
      job <- JobFactory.processItem(
        args,
        AccountId(ctx.args.meta.collective, submitter),
        prio,
        None
      )
    } yield job.encode

  /** Checks the stored mime type of the file; only files detected as zip are
    * extracted, everything else is submitted as-is.
    */
  private def isZipFile[F[_]: Sync](
      store: Store[F]
  )(file: ProcessItemArgs.File): F[Boolean] =
    OptionT(store.fileRepo.findMeta(file.fileMetaId))
      .map(_.mimetype.matches(MimeType.zip))
      .getOrElse(false)

  /** Extracts a zip file (not recursively), stores every entry passing the upload's
    * file-filter glob and yields one ProcessItemArgs per stored entry.
    */
  private def extractZip[F[_]: Async](
      store: Store[F],
      args: Args
  )(file: ProcessItemArgs.File): Stream[F, ProcessItemArgs] =
    store.fileRepo
      .getBytes(file.fileMetaId)
      .through(Zip.unzipP[F](8192, args.meta.fileFilter.getOrElse(Glob.all)))
      .flatMap { entry =>
        val hint = MimeTypeHint(entry.name.some, entry.mime.asString.some)
        entry.data
          .through(
            store.fileRepo.save(args.meta.collective, FileCategory.AttachmentSource, hint)
          )
          .map(key =>
            args.copy(files = ProcessItemArgs.File(entry.name.some, key) :: Nil)
          )
      }

  /** Accumulator for one traversal step: the running result, the jobs created so far
    * and the zip container files that must be deleted afterwards.
    */
  case class Jobs(
      result: Result,
      jobs: List[Job[String]],
      zips: List[ProcessItemArgs.File]
  )

  object Jobs {
    /** Jobs created from the entries of one extracted zip file. */
    def extracted(zip: ProcessItemArgs.File)(jobs: List[Job[String]]): Jobs =
      Jobs(Result(jobs.size, 1), jobs, List(zip))

    /** A single non-zip file submitted without extraction. */
    def normal(job: Job[String]): Jobs =
      Jobs(Result.notExtracted, List(job), Nil)

    val empty: Jobs = Jobs(Result.empty, Nil, Nil)

    implicit val jobsMonoid: Monoid[Jobs] =
      Monoid.instance(
        empty,
        (a, b) => Jobs(a.result.combine(b.result), a.jobs ::: b.jobs, a.zips ::: b.zips)
      )
  }
}

View File

@ -0,0 +1,36 @@
/*
* Copyright 2020 Eike K. & Contributors
*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
package docspell.joex.multiupload
import cats.Monoid
import docspell.scheduler.JobTaskResultEncoder
import io.circe.Encoder
import io.circe.generic.semiauto.deriveEncoder
/** Outcome of the multi-upload task: how many files were submitted for processing and
  * how many zip containers were extracted along the way.
  */
case class Result(submittedFiles: Int, extractedZips: Int)

object Result {

  /** Neutral element: nothing submitted, nothing extracted. */
  val empty: Result = Result(0, 0)

  /** One file submitted as-is, without extracting a zip container. Constant value,
    * so a `val` (consistent with `empty`) instead of recomputing via `def`.
    */
  val notExtracted: Result = Result(1, 0)

  /** Results of individual files/zips are combined by summing both counters. */
  implicit val resultMonoid: Monoid[Result] =
    Monoid.instance(
      empty,
      (a, b) =>
        Result(a.submittedFiles + b.submittedFiles, a.extractedZips + b.extractedZips)
    )

  /** JSON representation used to persist the task result. */
  implicit val jsonEncoder: Encoder[Result] =
    deriveEncoder

  /** Renders the result with a human readable message for the job log. */
  implicit val taskResultEncoder: JobTaskResultEncoder[Result] =
    JobTaskResultEncoder.fromJson[Result].withMessage { result =>
      s"Submitted ${result.submittedFiles} files, extracted ${result.extractedZips} zip files."
    }
}

View File

@ -11,7 +11,6 @@ import cats.implicits._
import fs2.{Chunk, Stream}
import docspell.backend.JobFactory
import docspell.backend.ops.OJoex
import docspell.common._
import docspell.scheduler._
import docspell.store.Store
@ -24,15 +23,13 @@ object AllPageCountTask {
def apply[F[_]: Sync](
store: Store[F],
jobStore: JobStore[F],
joex: OJoex[F]
jobStore: JobStore[F]
): Task[F, Args, Unit] =
Task { ctx =>
for {
_ <- ctx.logger.info("Generating previews for attachments")
n <- submitConversionJobs(ctx, store, jobStore)
_ <- ctx.logger.info(s"Submitted $n jobs")
_ <- joex.notifyAllNodes
} yield ()
}

View File

@ -10,7 +10,6 @@ import cats.effect._
import cats.implicits._
import fs2.{Chunk, Stream}
import docspell.backend.ops.OJoex
import docspell.common._
import docspell.scheduler._
import docspell.store.Store
@ -25,7 +24,6 @@ object ConvertAllPdfTask {
def apply[F[_]: Sync](
jobStore: JobStore[F],
joex: OJoex[F],
store: Store[F]
): Task[F, Args, Unit] =
Task { ctx =>
@ -33,7 +31,6 @@ object ConvertAllPdfTask {
_ <- ctx.logger.info("Converting pdfs using ocrmypdf")
n <- submitConversionJobs(ctx, store, jobStore)
_ <- ctx.logger.info(s"Submitted $n file conversion jobs")
_ <- joex.notifyAllNodes
} yield ()
}

View File

@ -11,7 +11,6 @@ import cats.implicits._
import fs2.{Chunk, Stream}
import docspell.backend.JobFactory
import docspell.backend.ops.OJoex
import docspell.common.MakePreviewArgs.StoreMode
import docspell.common._
import docspell.scheduler._
@ -24,7 +23,6 @@ object AllPreviewsTask {
def apply[F[_]: Sync](
jobStore: JobStore[F],
joex: OJoex[F],
store: Store[F]
): Task[F, Args, Unit] =
Task { ctx =>
@ -32,7 +30,6 @@ object AllPreviewsTask {
_ <- ctx.logger.info("Generating previews for attachments")
n <- submitConversionJobs(ctx, store, jobStore)
_ <- ctx.logger.info(s"Submitted $n jobs")
_ <- joex.notifyAllNodes
} yield ()
}

View File

@ -327,7 +327,8 @@ object ScanMailboxTask {
args.fileFilter.getOrElse(Glob.all),
args.tags.getOrElse(Nil),
args.language,
args.attachmentsOnly
args.attachmentsOnly,
None
)
data = OUpload.UploadData(
multiple = false,
@ -336,7 +337,7 @@ object ScanMailboxTask {
priority = Priority.Low,
tracker = None
)
res <- upload.submit(data, ctx.args.account, false, None)
res <- upload.submit(data, ctx.args.account, None)
} yield res
}

View File

@ -7514,6 +7514,15 @@ components:
If `true` (the default) each file in the upload request
results in a separate item. If it is set to `false`, then
all files in the request are put into a single item.
flattenArchives:
type: boolean
default: false
description: |
This implies `multiple = true` and will (when `true`)
treat every ZIP file as a container of independent files
that will result in separate items. When this is `false`
then each zip file will result in one item with its
contents being the attachments.
direction:
type: string
format: direction

View File

@ -354,7 +354,8 @@ trait Conversions {
m.fileFilter.getOrElse(Glob.all),
m.tags.map(_.items).getOrElse(Nil),
m.language,
m.attachmentsOnly
m.attachmentsOnly,
m.flattenArchives
)
)
)
@ -371,6 +372,7 @@ trait Conversions {
Glob.all,
Nil,
None,
None,
None
)
)

View File

@ -121,8 +121,7 @@ object AttachmentRoutes {
for {
res <- backend.item.generatePreview(
MakePreviewArgs.replace(id),
user.account,
true
user.account
)
resp <- Ok(
Conversions.basicResult(res, "Generating preview image task submitted.")
@ -169,7 +168,7 @@ object AttachmentRoutes {
HttpRoutes.of {
case POST -> Root / "generatePreviews" =>
for {
res <- backend.item.generateAllPreviews(MakePreviewArgs.StoreMode.Replace, true)
res <- backend.item.generateAllPreviews(MakePreviewArgs.StoreMode.Replace)
resp <- Ok(
Conversions.basicResult(res, "Generate all previews task submitted.")
)
@ -178,7 +177,7 @@ object AttachmentRoutes {
case POST -> Root / "convertallpdfs" =>
for {
res <-
backend.item.convertAllPdf(None, None, true)
backend.item.convertAllPdf(None, None)
resp <- Ok(Conversions.basicResult(res, "Convert all PDFs task submitted"))
} yield resp
}

View File

@ -32,7 +32,7 @@ object FileRepositoryRoutes {
for {
input <- req.as[FileRepositoryCloneRequest]
args = makeTaskArgs(input)
job <- backend.fileRepository.cloneFileRepository(args, true)
job <- backend.fileRepository.cloneFileRepository(args)
result = BasicResult(
job.isDefined,
job.fold(s"Job for '${FileCopyTaskArgs.taskName.id}' already running")(j =>
@ -46,7 +46,7 @@ object FileRepositoryRoutes {
case req @ POST -> Root / "integrityCheck" =>
for {
input <- req.as[FileKeyPart]
job <- backend.fileRepository.checkIntegrityAll(input, true)
job <- backend.fileRepository.checkIntegrityAll(input)
result = BasicResult(
job.isDefined,
job.fold(s"Job for '${FileCopyTaskArgs.taskName.id}' already running")(j =>

View File

@ -111,7 +111,7 @@ object IntegrationEndpointRoutes {
cfg.backend.files.validMimeTypes
)
account = AccountId(coll, DocspellSystem.user)
result <- backend.upload.submit(updata, account, true, None)
result <- backend.upload.submit(updata, account, None)
res <- Ok(basicResult(result))
} yield res
}

View File

@ -181,7 +181,7 @@ object ItemMultiRoutes extends NonEmptyListSupport with MultiIdSupport {
for {
json <- req.as[IdList]
items <- requireNonEmpty(json.ids)
res <- backend.item.reprocessAll(items, user.account, true)
res <- backend.item.reprocessAll(items, user.account)
resp <- Ok(Conversions.basicResult(res, "Re-process task(s) submitted."))
} yield resp

View File

@ -388,7 +388,7 @@ object ItemRoutes {
for {
data <- req.as[IdList]
_ <- logger.debug(s"Re-process item ${id.id}")
res <- backend.item.reprocess(id, data.ids, user.account, true)
res <- backend.item.reprocess(id, data.ids, user.account)
resp <- Ok(Conversions.basicResult(res, "Re-process task submitted."))
} yield resp

View File

@ -96,7 +96,7 @@ object UploadRoutes {
prio,
cfg.backend.files.validMimeTypes
)
result <- backend.upload.submitEither(updata, accOrSrc, true, itemId)
result <- backend.upload.submitEither(updata, accOrSrc, itemId)
res <- Ok(basicResult(result))
} yield res
}

View File

@ -6,6 +6,8 @@
package docspell.scheduler
import docspell.common.Ident
trait JobStore[F[_]] {
/** Inserts the job into the queue to get picked up as soon as possible. The job must
@ -24,4 +26,5 @@ trait JobStore[F[_]] {
def insertAllIfNew(jobs: Seq[Job[String]]): F[List[Boolean]]
def findById(jobId: Ident): F[Option[Job[String]]]
}

View File

@ -6,10 +6,11 @@
package docspell.scheduler.impl
import cats.data.OptionT
import cats.effect.Sync
import cats.syntax.all._
import docspell.common.Timestamp
import docspell.common.{Ident, Timestamp}
import docspell.scheduler._
import docspell.store.Store
import docspell.store.records.RJob
@ -72,6 +73,14 @@ final class JobStoreImpl[F[_]: Sync](store: Store[F]) extends JobStore[F] {
})
}
/** Looks up a job by its id, mapping the stored record to the scheduler model. */
def findById(jobId: Ident) =
OptionT(store.transact(RJob.findById(jobId)))
.map(toJob)
.value

/** Converts a stored job record (RJob) into the scheduler's Job model. */
def toJob(r: RJob): Job[String] =
Job(r.id, r.task, r.group, r.args, r.subject, r.submitter, r.priority, r.tracker)
def toRecord(job: Job[String], timestamp: Timestamp): RJob =
RJob.newJob(
job.id,

View File

@ -9,11 +9,11 @@ package docspell.scheduler.impl
import cats.effect._
import cats.implicits._
import docspell.common.JobState
import docspell.common.{Ident, JobState}
import docspell.notification.api.{Event, EventSink}
import docspell.pubsub.api.PubSubT
import docspell.scheduler._
import docspell.scheduler.msg.JobSubmitted
import docspell.scheduler.msg.{JobSubmitted, JobsNotify}
import docspell.store.Store
final class JobStorePublish[F[_]: Sync](
@ -40,30 +40,42 @@ final class JobStorePublish[F[_]: Sync](
pubsub.publish1(JobSubmitted.topic, msg(job)).as(()) *>
eventSink.offer(event(job))
private def notifyJoex: F[Unit] =
pubsub.publish1IgnoreErrors(JobsNotify(), ()).void
def insert(job: Job[String]) =
delegate.insert(job).flatTap(_ => publish(job))
delegate.insert(job).flatTap(_ => publish(job) *> notifyJoex)
def insertIfNew(job: Job[String]) =
delegate.insertIfNew(job).flatTap {
case true => publish(job)
case true => publish(job) *> notifyJoex
case false => ().pure[F]
}
def insertAll(jobs: Seq[Job[String]]) =
delegate.insertAll(jobs).flatTap { results =>
delegate
.insertAll(jobs)
.flatTap { results =>
results.zip(jobs).traverse { case (res, job) =>
if (res) publish(job)
else ().pure[F]
}
}
.flatTap(_ => notifyJoex)
def insertAllIfNew(jobs: Seq[Job[String]]) =
delegate.insertAllIfNew(jobs).flatTap { results =>
delegate
.insertAllIfNew(jobs)
.flatTap { results =>
results.zip(jobs).traverse { case (res, job) =>
if (res) publish(job)
else ().pure[F]
}
}
.flatTap(_ => notifyJoex)
def findById(jobId: Ident) =
delegate.findById(jobId)
}
object JobStorePublish {

View File

@ -43,6 +43,7 @@ type alias Model =
, skipDuplicates : Bool
, languageModel : Comp.FixedDropdown.Model Language
, language : Maybe Language
, flattenArchives : Bool
}
@ -56,6 +57,7 @@ type Msg
| DropzoneMsg Comp.Dropzone.Msg
| ToggleSkipDuplicates
| LanguageMsg (Comp.FixedDropdown.Msg Language)
| ToggleFlattenArchives
init : Model
@ -71,6 +73,7 @@ init =
, languageModel =
Comp.FixedDropdown.init Data.Language.all
, language = Nothing
, flattenArchives = False
}
@ -132,11 +135,44 @@ update sourceId flags msg model =
( { model | incoming = not model.incoming }, Cmd.none, Sub.none )
ToggleSingleItem ->
( { model | singleItem = not model.singleItem }, Cmd.none, Sub.none )
let
newFlag =
not model.singleItem
in
( { model
| singleItem = newFlag
, flattenArchives =
if newFlag then
False
else
model.flattenArchives
}
, Cmd.none
, Sub.none
)
ToggleSkipDuplicates ->
( { model | skipDuplicates = not model.skipDuplicates }, Cmd.none, Sub.none )
ToggleFlattenArchives ->
let
newFlag =
not model.flattenArchives
in
( { model
| flattenArchives = newFlag
, singleItem =
if newFlag then
False
else
model.singleItem
}
, Cmd.none
, Sub.none
)
SubmitUpload ->
let
emptyMeta =
@ -153,6 +189,7 @@ update sourceId flags msg model =
else
Just "outgoing"
, language = Maybe.map Data.Language.toIso3 model.language
, flattenArchives = Just model.flattenArchives
}
fileids =
@ -403,6 +440,20 @@ renderForm texts model =
]
]
]
, div [ class "flex flex-col mb-3" ]
[ label [ class "inline-flex items-center" ]
[ input
[ type_ "checkbox"
, checked model.flattenArchives
, onCheck (\_ -> ToggleFlattenArchives)
, class Styles.checkboxInput
]
[]
, span [ class "ml-2" ]
[ text texts.flattenArchives
]
]
]
, div [ class "flex flex-col mb-3" ]
[ label [ class "inline-flex items-center" ]
[ input

View File

@ -35,6 +35,7 @@ type alias Texts =
}
, selectedFiles : String
, languageLabel : Language -> String
, flattenArchives : String
}
@ -65,6 +66,7 @@ gb =
}
, selectedFiles = "Selected Files"
, languageLabel = Messages.Data.Language.gb
, flattenArchives = "Extract zip file contents into separate items, in contrast to a single document with multiple attachments."
}
@ -95,6 +97,7 @@ de =
}
, selectedFiles = "Ausgewählte Dateien"
, languageLabel = Messages.Data.Language.de
, flattenArchives = "ZIP Dateien in separate Dokumente entpacken, anstatt ein Dokument mit mehreren Anhängen."
}
@ -125,4 +128,5 @@ fr =
}
, selectedFiles = "Fichiers séléctionnés"
, languageLabel = Messages.Data.Language.fr
, flattenArchives = "Décompresser les fichiers ZIP dans des documents séparés au lieu de créer un document avec plusieurs pièces jointes."
}

View File

@ -53,6 +53,7 @@ specified via a JSON structure in a part with name `meta`:
, fileFilter: Maybe String
, language: Maybe String
, attachmentsOnly: Maybe Bool
, flattenArchives: Maybe Bool
}
```
@ -95,7 +96,16 @@ specified via a JSON structure in a part with name `meta`:
`*.eml`). If this is `true`, then the e-mail body is discarded and
only the attachments are imported. An e-mail without any attachments
is therefore skipped.
- `flattenArchives` is a flag to control how zip files are treated. When
this is `false` (the default), then one zip file results in one item
and its contents are the attachments. If you rather want the
contents to be treated as independent files, then set this to
`true`. This will submit each entry in the zip file as a separate
processing job. Note: when this is `true` the zip file is just a
container and doesn't contain other useful information and therefore
is *NOT* kept in docspell, only its contents are. Also note that
only the uploaded zip files are extracted once (not recursively), so
if it contains other zip files, they are treated as normal.
# Endpoints