From 6cef9d4f075897f1855f5b105880e536200050a8 Mon Sep 17 00:00:00 2001 From: eikek Date: Sat, 18 Jun 2022 16:37:38 +0200 Subject: [PATCH] Improve performance of zip/unzip Adds tests and includes some cleanup --- build.sbt | 40 +-- .../scala/docspell/addons/AddonArchive.scala | 10 +- .../scala/docspell/addons/AddonExecutor.scala | 1 + .../scala/docspell/addons/AddonMeta.scala | 30 ++- .../main/scala/docspell/addons/InputEnv.scala | 1 + .../docspell/addons/AddonGenerator.scala | 6 +- .../scala/docspell/addons/AddonMetaTest.scala | 7 +- .../docspell/backend/joex/AddonOps.scala | 1 + .../backend/joex/AddonPostProcess.scala | 6 +- .../main/scala/docspell/common/Binary.scala | 40 +++ .../docspell/common/syntax/FileSyntax.scala | 38 ++- .../docspell/common/syntax/StreamSyntax.scala | 11 + .../docspell/common/syntax/package.scala | 5 + .../docspell/common/util}/Directory.scala | 59 ++++- .../main/scala/docspell/common/util/Zip.scala | 43 ++++ .../scala/docspell/common/util/ZipImpl.scala | 233 ++++++++++++++++++ .../docspell/common/util/DirectoryTest.scala | 114 +++++++++ .../scala/docspell/files/FileSupport.scala | 47 ++-- .../src/main/scala/docspell/files/Zip.scala | 180 -------------- .../test/scala/docspell/files/ZipTest.scala | 112 +++++++-- .../joex/addon/GenericItemAddonTask.scala | 4 +- .../joex/download/DownloadZipTask.scala | 4 +- .../multiupload/MultiUploadArchiveTask.scala | 7 +- .../joex/process/ExtractArchive.scala | 5 +- 24 files changed, 711 insertions(+), 293 deletions(-) rename modules/{addonlib/src/main/scala/docspell/addons => common/src/main/scala/docspell/common/util}/Directory.scala (53%) create mode 100644 modules/common/src/main/scala/docspell/common/util/Zip.scala create mode 100644 modules/common/src/main/scala/docspell/common/util/ZipImpl.scala create mode 100644 modules/common/src/test/scala/docspell/common/util/DirectoryTest.scala delete mode 100644 modules/files/src/main/scala/docspell/files/Zip.scala diff --git a/build.sbt b/build.sbt index 632fc718..d5f5dd0e 100644 --- a/build.sbt +++ b/build.sbt @@ -321,25 +321,6 @@ val loggingApi = project Dependencies.sourcecode ) -// Base module, everything depends on this – including restapi and -// joexapi modules. This should aim to have least possible -// dependencies -val common = project - .in(file("modules/common")) - .disablePlugins(RevolverPlugin) - .settings(sharedSettings) - .withTestSettings - .settings( - name := "docspell-common", - libraryDependencies ++= - Dependencies.fs2 ++ - Dependencies.circe ++ - Dependencies.circeGenericExtra ++ - Dependencies.calevCore ++ - Dependencies.calevCirce - ) - .dependsOn(loggingApi) - val loggingScribe = project .in(file("modules/logging/scribe")) .disablePlugins(RevolverPlugin) @@ -355,6 +336,25 @@ val loggingScribe = project ) .dependsOn(loggingApi) +// Base module, everything depends on this – including restapi and +// joexapi modules. This should aim to have least possible +// dependencies +val common = project + .in(file("modules/common")) + .disablePlugins(RevolverPlugin) + .settings(sharedSettings) + .withTestSettingsDependsOn(loggingScribe) + .settings( + name := "docspell-common", + libraryDependencies ++= + Dependencies.fs2 ++ + Dependencies.circe ++ + Dependencies.circeGenericExtra ++ + Dependencies.calevCore ++ + Dependencies.calevCirce + ) + .dependsOn(loggingApi) + // Some example files for testing // https://file-examples.com/index.php/sample-documents-download/sample-doc-download/ val files = project @@ -393,7 +393,7 @@ ${lines.map(_._1).mkString(",\n")} Seq(target) }.taskValue ) - .dependsOn(common) + .dependsOn(common, loggingScribe) val query = crossProject(JSPlatform, JVMPlatform) diff --git a/modules/addonlib/src/main/scala/docspell/addons/AddonArchive.scala b/modules/addonlib/src/main/scala/docspell/addons/AddonArchive.scala index 3c2d6051..e12d8354 100644 --- a/modules/addonlib/src/main/scala/docspell/addons/AddonArchive.scala +++ b/modules/addonlib/src/main/scala/docspell/addons/AddonArchive.scala @@ -12,7 +12,8 @@ import fs2.Stream import fs2.io.file.{Files, Path} import docspell.common._ -import docspell.files.Zip +import docspell.common.syntax.file._ +import docspell.common.util.{Directory, Zip} final case class AddonArchive(url: LenientUri, name: String, version: String) { def nameAndVersion: String = @@ -36,8 +37,8 @@ final case class AddonArchive(url: LenientUri, name: String, version: String) { case false => Files[F].createDirectories(target) *> reader(url) - .through(Zip.unzip(8192, glob)) - .through(Zip.saveTo(logger, target, moveUp = true)) + .through(Zip[F](logger.some).unzip(glob = glob, targetDir = target.some)) + .evalTap(_ => Directory.unwrapSingle[F](logger, target)) .compile .drain .as(target) @@ -72,12 +73,13 @@ object AddonArchive { archive: Either[Path, Stream[F, Byte]] ): F[(Boolean, Boolean)] = { val files = Files[F] + val logger = docspell.logging.getLogger[F] def forPath(path: Path): F[(Boolean, Boolean)] = (files.exists(path / "Dockerfile"), files.exists(path / "flake.nix")).tupled def forZip(data: Stream[F, Byte]): F[(Boolean, Boolean)] = data - .through(Zip.unzip(8192, Glob("Dockerfile|flake.nix"))) + .through(Zip[F](logger.some).unzip(glob = Glob("Dockerfile|flake.nix"))) .collect { case bin if bin.name == "Dockerfile" => (true, false) case bin if bin.name == "flake.nix" => (false, true) diff --git a/modules/addonlib/src/main/scala/docspell/addons/AddonExecutor.scala b/modules/addonlib/src/main/scala/docspell/addons/AddonExecutor.scala index 68715cf3..79a496ca 100644 --- a/modules/addonlib/src/main/scala/docspell/addons/AddonExecutor.scala +++ b/modules/addonlib/src/main/scala/docspell/addons/AddonExecutor.scala @@ -14,6 +14,7 @@ import fs2.io.file._ import docspell.common.UrlReader import docspell.common.exec.Env +import docspell.common.util.Directory import docspell.logging.Logger trait AddonExecutor[F[_]] { diff --git a/modules/addonlib/src/main/scala/docspell/addons/AddonMeta.scala b/modules/addonlib/src/main/scala/docspell/addons/AddonMeta.scala index 22e2aa7c..e68917d6 100644 --- a/modules/addonlib/src/main/scala/docspell/addons/AddonMeta.scala +++ b/modules/addonlib/src/main/scala/docspell/addons/AddonMeta.scala @@ -15,7 +15,8 @@ import fs2.Stream import fs2.io.file.{Files, Path} import docspell.common.Glob -import docspell.files.Zip +import docspell.common.syntax.file._ +import docspell.common.util.Zip import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder} import io.circe.yaml.{parser => YamlParser} @@ -153,6 +154,12 @@ object AddonMeta { .map(fromJsonString) .rethrow + def fromJsonFile[F[_]: Sync](file: Path): F[AddonMeta] = + Sync[F] + .blocking(java.nio.file.Files.readString(file.toNioPath)) + .map(fromJsonString) + .rethrow + def fromYamlString(str: String): Either[Throwable, AddonMeta] = YamlParser.parse(str).flatMap(_.as[AddonMeta]) @@ -164,6 +171,13 @@ object AddonMeta { .map(fromYamlString) .rethrow + def fromYamlFile[F[_]: Sync](file: Path): F[AddonMeta] = + Sync[F] + .blocking(YamlParser.parse(java.nio.file.Files.newBufferedReader(file.toNioPath))) + .rethrow + .map(_.as[AddonMeta]) + .rethrow + def findInDirectory[F[_]: Sync: Files](dir: Path): F[AddonMeta] = { val logger = docspell.logging.getLogger[F] val jsonFile = dir / "docspell-addon.json" @@ -194,18 +208,22 @@ object AddonMeta { } def findInZip[F[_]: Async](zipFile: Stream[F, Byte]): F[AddonMeta] = { + val logger = docspell.logging.getLogger[F] val fail: F[AddonMeta] = Async[F].raiseError( new FileNotFoundException( s"No docspell-addon.{yaml|json} file found in zip!" ) ) zipFile - .through(Zip.unzip(8192, Glob("docspell-addon.*|**/docspell-addon.*"))) - .filter(bin => !bin.name.endsWith("/")) + .through( + Zip[F](logger.some).unzip(glob = Glob("docspell-addon.*|**/docspell-addon.*")) + ) + .filter(file => !file.name.endsWith("/")) .flatMap { bin => - if (bin.extensionIn(Set("json"))) Stream.eval(AddonMeta.fromJsonBytes(bin.data)) - else if (bin.extensionIn(Set("yaml", "yml"))) - Stream.eval(AddonMeta.fromYamlBytes(bin.data)) + val ext = bin.extension + if (ext.equalsIgnoreCase("json")) Stream.eval(AddonMeta.fromJsonFile(bin)) + else if (Set("yaml", "yml").contains(ext.toLowerCase)) + Stream.eval(AddonMeta.fromYamlFile(bin)) else Stream.empty } .take(1) diff --git a/modules/addonlib/src/main/scala/docspell/addons/InputEnv.scala b/modules/addonlib/src/main/scala/docspell/addons/InputEnv.scala index bd5fb7cc..3163c03c 100644 --- a/modules/addonlib/src/main/scala/docspell/addons/InputEnv.scala +++ b/modules/addonlib/src/main/scala/docspell/addons/InputEnv.scala @@ -10,6 +10,7 @@ import cats.effect.Resource import fs2.io.file.{Files, Path} import docspell.common.exec.Env +import docspell.common.util.Directory case class InputEnv( addons: List[AddonRef], diff --git a/modules/addonlib/src/test/scala/docspell/addons/AddonGenerator.scala b/modules/addonlib/src/test/scala/docspell/addons/AddonGenerator.scala index 11a546b9..63c162ee 100644 --- a/modules/addonlib/src/test/scala/docspell/addons/AddonGenerator.scala +++ b/modules/addonlib/src/test/scala/docspell/addons/AddonGenerator.scala @@ -13,7 +13,7 @@ import fs2.io.file.{Files, Path, PosixPermissions} import docspell.addons.out.AddonOutput import docspell.common.LenientUri -import docspell.files.Zip +import docspell.common.util.Zip import io.circe.syntax._ @@ -59,9 +59,9 @@ object AddonGenerator { private def createZip(dir: Path, files: List[Path]) = Stream .emits(files) - .map(f => (f.fileName.toString, Files[IO].readAll(f))) + .map(f => (f.fileName.toString, f)) .covary[IO] - .through(Zip.zip[IO](logger, 8192)) + .through(Zip[IO](logger.some).zipFiles()) .through(Files[IO].writeAll(dir / "addon.zip")) .compile .drain diff --git a/modules/addonlib/src/test/scala/docspell/addons/AddonMetaTest.scala b/modules/addonlib/src/test/scala/docspell/addons/AddonMetaTest.scala index 9403d02c..38e37b38 100644 --- a/modules/addonlib/src/test/scala/docspell/addons/AddonMetaTest.scala +++ b/modules/addonlib/src/test/scala/docspell/addons/AddonMetaTest.scala @@ -7,9 +7,10 @@ package docspell.addons import cats.effect._ +import cats.syntax.all._ import docspell.common.Glob -import docspell.files.Zip +import docspell.common.util.{Directory, Zip} import docspell.logging.TestLoggingConfig import munit._ @@ -26,8 +27,8 @@ class AddonMetaTest extends CatsEffectSuite with TestLoggingConfig with Fixtures for { _ <- dummyAddonUrl .readURL[IO](8192) - .through(Zip.unzip(8192, Glob.all)) - .through(Zip.saveTo(logger, dir, moveUp = true)) + .through(Zip[IO]().unzip(8192, Glob.all, dir.some)) + .evalTap(_ => Directory.unwrapSingle(logger, dir)) .compile .drain meta <- AddonMeta.findInDirectory[IO](dir) diff --git a/modules/backend/src/main/scala/docspell/backend/joex/AddonOps.scala b/modules/backend/src/main/scala/docspell/backend/joex/AddonOps.scala index f69dd2a7..16d42d3b 100644 --- a/modules/backend/src/main/scala/docspell/backend/joex/AddonOps.scala +++ b/modules/backend/src/main/scala/docspell/backend/joex/AddonOps.scala @@ -16,6 +16,7 @@ import docspell.backend.ops.OAttachment import docspell.common._ import docspell.common.bc.BackendCommandRunner import docspell.common.exec.Env +import docspell.common.util.Directory import docspell.logging.Logger import docspell.scheduler.JobStore import docspell.store.Store diff --git a/modules/backend/src/main/scala/docspell/backend/joex/AddonPostProcess.scala b/modules/backend/src/main/scala/docspell/backend/joex/AddonPostProcess.scala index 172904eb..dddaee5f 100644 --- a/modules/backend/src/main/scala/docspell/backend/joex/AddonPostProcess.scala +++ b/modules/backend/src/main/scala/docspell/backend/joex/AddonPostProcess.scala @@ -17,7 +17,7 @@ import docspell.backend.JobFactory import docspell.backend.ops.OAttachment import docspell.common._ import docspell.common.bc.BackendCommandRunner -import docspell.files.FileSupport +import docspell.common.syntax.file._ import docspell.logging.Logger import docspell.scheduler.JobStore import docspell.store.Store @@ -28,7 +28,7 @@ final private[joex] class AddonPostProcess[F[_]: Sync: Files]( store: Store[F], attachOps: OAttachment[F], jobStore: JobStore[F] -) extends FileSupport { +) { def onResult( logger: Logger[F], @@ -105,7 +105,7 @@ final private[joex] class AddonPostProcess[F[_]: Sync: Files]( .getOrElse(Vector.empty) _ <- textFiles.traverse_ { case (key, file) => withAttach(logger, key, attachs) { ra => - setText(collective, ra, file.readText) + setText(collective, ra, file.readString) } } _ <- pdfFiles.traverse_ { case (key, file) => diff --git a/modules/common/src/main/scala/docspell/common/Binary.scala b/modules/common/src/main/scala/docspell/common/Binary.scala index fa36ed1c..409ee6d4 100644 --- a/modules/common/src/main/scala/docspell/common/Binary.scala +++ b/modules/common/src/main/scala/docspell/common/Binary.scala @@ -9,9 +9,14 @@ package docspell.common import java.nio.charset.Charset import java.nio.charset.StandardCharsets +import cats.data.OptionT import cats.effect._ +import cats.syntax.all._ +import fs2.io.file.{Files, Path} import fs2.{Chunk, Pipe, Stream} +import docspell.logging.Logger + import scodec.bits.ByteVector final case class Binary[F[_]](name: String, mime: MimeType, data: Stream[F, Byte]) { @@ -34,6 +39,9 @@ final case class Binary[F[_]](name: String, mime: MimeType, data: Stream[F, Byte object Binary { + def apply[F[_]: Async](file: Path): Binary[F] = + Binary(file.fileName.toString, Files[F].readAll(file)) + def apply[F[_]](name: String, data: Stream[F, Byte]): Binary[F] = Binary[F](name, MimeType.octetStream, data) @@ -65,6 +73,38 @@ object Binary { def loadAllBytes[F[_]: Sync](data: Stream[F, Byte]): F[ByteVector] = data.chunks.map(_.toByteVector).compile.fold(ByteVector.empty)((r, e) => r ++ e) + /** Convert paths into `Binary`s */ + def toBinary[F[_]: Async]: Pipe[F, Path, Binary[F]] = + _.map(Binary[F](_)) + + /** Save one or more binaries to a target directory. */ + def saveTo[F[_]: Async]( + logger: Logger[F], + targetDir: Path + ): Pipe[F, Binary[F], Path] = + binaries => + binaries + .filter(e => !e.name.endsWith("/")) + .evalMap { entry => + val out = targetDir / entry.name + val createParent = + OptionT + .fromOption[F](out.parent) + .flatMapF(parent => + Files[F] + .exists(parent) + .map(flag => Option.when(!flag)(parent)) + ) + .semiflatMap(p => Files[F].createDirectories(p)) + .getOrElse(()) + + logger.trace(s"Copy ${entry.name} -> $out") *> + createParent *> + entry.data.through(Files[F].writeAll(out)).compile.drain + } + .drain + .as(targetDir) + // This is a copy from org.http4s.util // Http4s is licensed under the Apache License 2.0 private object util { diff --git a/modules/common/src/main/scala/docspell/common/syntax/FileSyntax.scala b/modules/common/src/main/scala/docspell/common/syntax/FileSyntax.scala index 728475e4..e075666a 100644 --- a/modules/common/src/main/scala/docspell/common/syntax/FileSyntax.scala +++ b/modules/common/src/main/scala/docspell/common/syntax/FileSyntax.scala @@ -6,17 +6,49 @@ package docspell.common.syntax -import fs2.io.file.Path +import java.nio.file.{Files => NioFiles} + +import cats.effect._ +import fs2.Stream +import fs2.io.file.{Files, Path} + +import docspell.common.syntax.stream._ + +import io.circe.Encoder +import io.circe.syntax._ trait FileSyntax { - implicit final class PathOps(p: Path) { + implicit final class PathOps(self: Path) { def absolutePath: Path = - p.absolute + self.absolute def absolutePathAsString: String = absolutePath.toString + + def name: String = self.fileName.toString + def extension: String = self.extName.stripPrefix(".") + def dropLeft(n: Int): Path = + Path.fromNioPath(self.toNioPath.subpath(n, self.toNioPath.getNameCount)) + + def readString[F[_]: Sync]: F[String] = Sync[F].blocking( + NioFiles.readString(self.toNioPath) + ) + + def sha256Hex[F[_]: Files: Sync]: F[String] = + Files[F].readAll(self).sha256Hex + + def readAll[F[_]: Files]: Stream[F, Byte] = + Files[F].readAll(self) + + def writeJson[A: Encoder, F[_]: Files: Sync](value: A): F[Unit] = + Stream + .emit(value.asJson.noSpaces) + .through(fs2.text.utf8.encode) + .through(Files[F].writeAll(self)) + .compile + .drain } } diff --git a/modules/common/src/main/scala/docspell/common/syntax/StreamSyntax.scala b/modules/common/src/main/scala/docspell/common/syntax/StreamSyntax.scala index 8e1a4d48..18faefca 100644 --- a/modules/common/src/main/scala/docspell/common/syntax/StreamSyntax.scala +++ b/modules/common/src/main/scala/docspell/common/syntax/StreamSyntax.scala @@ -12,6 +12,7 @@ import fs2.Stream import io.circe._ import io.circe.parser._ +import scodec.bits.ByteVector trait StreamSyntax { implicit class StringStreamOps[F[_]](s: Stream[F, String]) { @@ -24,4 +25,14 @@ trait StreamSyntax { } yield value ) } + + implicit final class ByteStreamSyntax[F[_]](self: Stream[F, Byte]) { + def sha256Hex(implicit F: Sync[F]): F[String] = + self + .through(fs2.hash.sha256) + .compile + .foldChunks(ByteVector.empty)(_ ++ _.toByteVector) + .map(_.toHex) + } } +object StreamSyntax extends StreamSyntax diff --git a/modules/common/src/main/scala/docspell/common/syntax/package.scala b/modules/common/src/main/scala/docspell/common/syntax/package.scala index a295982d..7d79b701 100644 --- a/modules/common/src/main/scala/docspell/common/syntax/package.scala +++ b/modules/common/src/main/scala/docspell/common/syntax/package.scala @@ -8,6 +8,11 @@ package docspell.common package object syntax { + val either = EitherSyntax + val stream = StreamSyntax + val string = StringSyntax + val file = FileSyntax + object all extends EitherSyntax with StreamSyntax with StringSyntax with FileSyntax } diff --git a/modules/addonlib/src/main/scala/docspell/addons/Directory.scala b/modules/common/src/main/scala/docspell/common/util/Directory.scala similarity index 53% rename from modules/addonlib/src/main/scala/docspell/addons/Directory.scala rename to modules/common/src/main/scala/docspell/common/util/Directory.scala index 3b72ac4c..fcde4ebb 100644 --- a/modules/addonlib/src/main/scala/docspell/addons/Directory.scala +++ b/modules/common/src/main/scala/docspell/common/util/Directory.scala @@ -4,13 +4,17 @@ * SPDX-License-Identifier: AGPL-3.0-or-later */ -package docspell.addons +package docspell.common.util import cats.effect._ import cats.syntax.all._ import cats.{Applicative, Monad} +import fs2.Stream import fs2.io.file.{Files, Path, PosixPermissions} +import docspell.logging.Logger + +/** Utility functions for directories. */ object Directory { def create[F[_]: Files: Applicative](dir: Path): F[Path] = @@ -36,17 +40,6 @@ object Directory { d <- mkTemp(parent, prefix) } yield d - def temp2[F[_]: Files]( - parent: Path, - prefix1: String, - prefix2: String - ): Resource[F, (Path, Path)] = - for { - _ <- Resource.eval(Files[F].createDirectories(parent)) - a <- mkTemp(parent, prefix1) - b <- mkTemp(parent, prefix2) - } yield (a, b) - def createTemp[F[_]: Files: Monad]( parent: Path, prefix: String @@ -71,4 +64,46 @@ object Directory { prefix, PosixPermissions.fromOctal("777") ) + + /** If `dir` contains only a single non-empty directory, then its contents are moved out + * of it and the directory is deleted. This is applied repeatedly until the condition + * doesn't apply anymore (there are multiple entries in the directory or none). + */ + def unwrapSingle[F[_]: Sync: Files](logger: Logger[F], dir: Path): F[Boolean] = + Stream + .repeatEval(unwrapSingle1(logger, dir)) + .takeWhile(identity) + .compile + .fold(false)(_ || _) + + def unwrapSingle1[F[_]: Sync: Files]( + logger: Logger[F], + dir: Path + ): F[Boolean] = + Files[F] + .list(dir) + .take(2) + .compile + .toList + .flatMap { + case subdir :: Nil => + nonEmpty(subdir) + .flatMap { + case false => false.pure[F] + case true => + for { + _ <- Files[F] + .list(subdir) + .filter(p => p != dir) + .evalTap(c => logger.trace(s"Move $c -> ${dir / c.fileName}")) + .evalMap(child => Files[F].move(child, dir / child.fileName)) + .compile + .drain + _ <- Files[F].delete(subdir) + } yield true + } + + case _ => + false.pure[F] + } } diff --git a/modules/common/src/main/scala/docspell/common/util/Zip.scala b/modules/common/src/main/scala/docspell/common/util/Zip.scala new file mode 100644 index 00000000..335ed0b5 --- /dev/null +++ b/modules/common/src/main/scala/docspell/common/util/Zip.scala @@ -0,0 +1,43 @@ +/* + * Copyright 2020 Eike K. & Contributors + * + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +package docspell.common.util + +import cats.effect._ +import fs2.io.file.Path +import fs2.{Pipe, Stream} + +import docspell.common.Glob +import docspell.logging.Logger + +trait Zip[F[_]] { + + def zip(chunkSize: Int = Zip.defaultChunkSize): Pipe[F, (String, Stream[F, Byte]), Byte] + + def zipFiles(chunkSize: Int = Zip.defaultChunkSize): Pipe[F, (String, Path), Byte] + + def unzip( + chunkSize: Int = Zip.defaultChunkSize, + glob: Glob = Glob.all, + targetDir: Option[Path] = None + ): Pipe[F, Byte, Path] + + def unzipFiles( + chunkSize: Int = Zip.defaultChunkSize, + glob: Glob = Glob.all, + targetDir: Path => Option[Path] = _ => None + ): Pipe[F, Path, Path] +} + +object Zip { + val defaultChunkSize = 64 * 1024 + + def apply[F[_]: Async]( + logger: Option[Logger[F]] = None, + tempDir: Option[Path] = None + ): Zip[F] = + new ZipImpl[F](logger, tempDir) +} diff --git a/modules/common/src/main/scala/docspell/common/util/ZipImpl.scala b/modules/common/src/main/scala/docspell/common/util/ZipImpl.scala new file mode 100644 index 00000000..be65eee2 --- /dev/null +++ b/modules/common/src/main/scala/docspell/common/util/ZipImpl.scala @@ -0,0 +1,233 @@ +/* + * Copyright 2020 Eike K. & Contributors + * + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +package docspell.common.util + +import java.io.BufferedInputStream +import java.nio.charset.StandardCharsets +import java.util.zip.{ZipEntry, ZipFile, ZipOutputStream} + +import scala.jdk.CollectionConverters._ +import scala.util.Using +import scala.util.Using.Releasable + +import cats.effect._ +import cats.syntax.all._ +import fs2.io.file.{Files, Path} +import fs2.{Chunk, Pipe, Stream} + +import docspell.common.Glob +import docspell.logging.Logger + +final private class ZipImpl[F[_]: Async]( + log: Option[Logger[F]], + tempDir: Option[Path] +) extends Zip[F] { + private[this] val logger = log.getOrElse(docspell.logging.Logger.offF[F]) + + private val createTempDir: Resource[F, Path] = + Files[F].tempDirectory(tempDir, "docspell-zip-", None) + + def zip(chunkSize: Int): Pipe[F, (String, Stream[F, Byte]), Byte] = + in => ZipImpl.zipJava(logger, chunkSize, in.through(ZipImpl.deduplicate)) + + def zipFiles(chunkSize: Int): Pipe[F, (String, Path), Byte] = + in => ZipImpl.zipJavaPath(logger, chunkSize, in.through(ZipImpl.deduplicate)) + + def unzip( + chunkSize: Int, + glob: Glob, + targetDir: Option[Path] + ): Pipe[F, Byte, Path] = { input => + Stream + .resource(Files[F].tempFile(tempDir, "", ".zip", None)) + .evalTap(tempFile => input.through(Files[F].writeAll(tempFile)).compile.drain) + .through(unzipFiles(chunkSize, glob, _ => targetDir)) + } + + def unzipFiles( + chunkSize: Int, + glob: Glob, + targetDir: Path => Option[Path] + ): Pipe[F, Path, Path] = + input => + for { + zipArchive <- input + tempDir <- targetDir(zipArchive) + .map(Stream.emit) + .getOrElse(Stream.resource(createTempDir)) + entries <- Stream.eval(Sync[F].blocking { + ZipImpl.unzipZipFile(zipArchive, tempDir, glob) + }) + e <- Stream.chunk(entries) + } yield e +} + +object ZipImpl { + implicit val zipFileReleasable: Releasable[ZipFile] = + (resource: ZipFile) => resource.close() + + private def unzipZipFile(zip: Path, target: Path, glob: Glob): Chunk[Path] = + Using.resource(new ZipFile(zip.toNioPath.toFile, StandardCharsets.UTF_8)) { zf => + Chunk.iterator( + zf.entries() + .asScala + .filter(ze => !ze.getName.endsWith("/")) + .filter(ze => glob.matchFilenameOrPath(ze.getName)) + .map { ze => + val out = target / ze.getName + out.parent.map(_.toNioPath).foreach { p => + java.nio.file.Files.createDirectories(p) + } + Using.resource(java.nio.file.Files.newOutputStream(out.toNioPath)) { fout => + zf.getInputStream(ze).transferTo(fout) + out + } + } + ) + } + +// private def unzipZipStream( +// zip: InputStream, +// target: Path, +// glob: Glob +// ): List[Path] = +// Using.resource(new ZipInputStream(zip, StandardCharsets.UTF_8)) { zf => +// @annotation.tailrec +// def go(entry: Option[ZipEntry], result: List[Path]): List[Path] = +// entry match { +// case Some(ze) if glob.matchFilenameOrPath(ze.getName) => +// val out = target / ze.getName +// Using.resource(java.nio.file.Files.newOutputStream(out.toNioPath)) { fout => +// zf.transferTo(fout) +// } +// zf.closeEntry() +// go(Option(zf.getNextEntry), out :: result) +// case Some(_) => +// zf.closeEntry() +// go(Option(zf.getNextEntry), result) +// case None => +// result +// } +// +// go(Option(zf.getNextEntry), Nil) +// } + +// private def unzipStream2[F[_]: Async]( +// in: InputStream, +// chunkSize: Int, +// glob: Glob +// ): Stream[F, Binary[F]] = { +// val zin = new ZipInputStream(in) +// +// val nextEntry = Resource.make(Sync[F].delay(Option(zin.getNextEntry))) { +// case Some(_) => Sync[F].delay(zin.closeEntry()) +// case None => ().pure[F] +// } +// +// Stream +// .resource(nextEntry) +// .repeat +// .unNoneTerminate +// .filter(ze => glob.matchFilenameOrPath(ze.getName)) +// .map { ze => +// val name = ze.getName +// val data = +// fs2.io.readInputStream[F]((zin: InputStream).pure[F], chunkSize, false) +// Binary(name, data) +// } +// } + + private def deduplicate[F[_]: Sync, A]: Pipe[F, (String, A), (String, A)] = { + def makeName(name: String, count: Int): String = + if (count <= 0) name + else + name.lastIndexOf('.') match { + case n if n > 0 => + s"${name.substring(0, n)}_$count${name.substring(n)}" + case _ => + s"${name}_$count" + } + + @annotation.tailrec + def unique( + current: Set[String], + name: String, + counter: Int + ): (Set[String], String) = { + val nextName = makeName(name, counter) + if (current.contains(nextName)) + unique(current, name, counter + 1) + else (current + nextName, nextName) + } + + in => + Stream + .eval(Ref.of[F, Set[String]](Set.empty[String])) + .flatMap { ref => + in.evalMap { element => + ref + .modify(names => unique(names, element._1, 0)) + .map(n => (n, element._2)) + } + } + } + + private def zipJava[F[_]: Async]( + logger: Logger[F], + chunkSize: Int, + entries: Stream[F, (String, Stream[F, Byte])] + ): Stream[F, Byte] = + fs2.io.readOutputStream(chunkSize) { out => + val zip = new ZipOutputStream(out, StandardCharsets.UTF_8) + val writeEntries = + entries.evalMap { case (name, bytes) => + val javaOut = + bytes.through( + fs2.io.writeOutputStream[F](Sync[F].pure(zip), closeAfterUse = false) + ) + val nextEntry = + logger.debug(s"Adding $name to zip file…") *> + Sync[F].delay(zip.putNextEntry(new ZipEntry(name))) + Resource + .make(nextEntry)(_ => Sync[F].delay(zip.closeEntry())) + .use(_ => javaOut.compile.drain) + } + val closeStream = Sync[F].delay(zip.close()) + + writeEntries.onFinalize(closeStream).compile.drain + } + + private def zipJavaPath[F[_]: Async]( + logger: Logger[F], + chunkSize: Int, + entries: Stream[F, (String, Path)] + ): Stream[F, Byte] = + fs2.io.readOutputStream(chunkSize) { out => + val zip = new ZipOutputStream(out, StandardCharsets.UTF_8) + val writeEntries = + entries.evalMap { case (name, file) => + val javaOut = Sync[F].blocking { + val fin = new BufferedInputStream( + java.nio.file.Files.newInputStream(file.toNioPath), + chunkSize + ) + fin.transferTo(zip) + fin.close() + } + + val nextEntry = + logger.debug(s"Adding $name to zip file…") *> + Sync[F].delay(zip.putNextEntry(new ZipEntry(name))) + Resource + .make(nextEntry)(_ => Sync[F].delay(zip.closeEntry())) + .use(_ => javaOut) + } + val closeStream = Sync[F].delay(zip.close()) + + writeEntries.onFinalize(closeStream).compile.drain + } +} diff --git a/modules/common/src/test/scala/docspell/common/util/DirectoryTest.scala b/modules/common/src/test/scala/docspell/common/util/DirectoryTest.scala new file mode 100644 index 00000000..a760dcc1 --- /dev/null +++ b/modules/common/src/test/scala/docspell/common/util/DirectoryTest.scala @@ -0,0 +1,114 @@ +/* + * Copyright 2020 Eike K. & Contributors + * + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +package docspell.common.util + +import cats.effect._ +import cats.syntax.all._ +import fs2.io.file.{Files, Path} + +import docspell.logging.TestLoggingConfig + +import munit.CatsEffectSuite + +class DirectoryTest extends CatsEffectSuite with TestLoggingConfig { + val logger = docspell.logging.getLogger[IO] + val tempDir = ResourceFixture( + Files[IO].tempDirectory(Path("target").some, "directory-test-", None) + ) + + tempDir.test("unwrap directory when non empty") { dir => + for { + _ <- createDirectoryTree(dir, List("test/file1", "test/file2")) + r <- Directory.unwrapSingle1(logger, dir) + files <- Files[IO] + .list(dir) + .map(file => dir.relativize(file).toString) + .compile + .toVector + _ = { + assert(r) + assertEquals(files.sorted, Vector("file1", "file2")) + } + } yield () + } + + tempDir.test("unwrap directory when not empty repeat") { dir => + for { + _ <- createDirectoryTree(dir, List("test/file1", "test/file2")) + r <- Directory.unwrapSingle(logger, dir) + files <- Files[IO] + .list(dir) + .map(file => dir.relativize(file).toString) + .compile + .toVector + _ = { + assert(r) + assertEquals(files.sorted, Vector("file1", "file2")) + } + } yield () + } + + tempDir.test("unwrap nested directory") { dir => + for { + _ <- createDirectoryTree(dir, List("test0/test1/file1", "test0/test1/file2")) + r <- Directory.unwrapSingle(logger, dir) + files <- Files[IO] + .list(dir) + .map(file => dir.relativize(file).toString) + .compile + .toVector + _ = { + assert(r) + assertEquals(files.sorted, Vector("file1", "file2")) + } + } yield () + } + + tempDir.test("do nothing on empty directory") { dir => + for { + r1 <- Directory.unwrapSingle1[IO](logger, dir) + r2 <- Directory.unwrapSingle[IO](logger, dir) + _ = { + assert(!r1) + assert(!r2) + } + } yield () + } + + tempDir.test("do nothing when directory contains more than one entry") { dir => + for { + _ <- createDirectoryTree(dir, List("test1/file1", "file2")) + r1 <- Directory.unwrapSingle1[IO](logger, dir) + r2 <- Directory.unwrapSingle[IO](logger, dir) + _ = { + assert(!r1) + assert(!r2) + } + } yield () + } + + tempDir.test("do nothing when directory contains more than one entry (2)") { dir => + for { + _ <- createDirectoryTree(dir, List("file1", "file2")) + r1 <- Directory.unwrapSingle1[IO](logger, dir) + r2 <- Directory.unwrapSingle[IO](logger, dir) + _ = { + assert(!r1) + assert(!r2) + } + } yield () + } + + def createDirectoryTree(dir: Path, entries: List[String]): IO[Unit] = + entries.traverse_ { name => + val out = dir / name + out.parent + .map(p => Files[IO].createDirectories(p)) + .getOrElse(IO.unit) *> + Files[IO].createFile(out) + } +} diff --git a/modules/files/src/main/scala/docspell/files/FileSupport.scala b/modules/files/src/main/scala/docspell/files/FileSupport.scala index 5219ac37..bdd19197 100644 --- a/modules/files/src/main/scala/docspell/files/FileSupport.scala +++ b/modules/files/src/main/scala/docspell/files/FileSupport.scala @@ -7,19 +7,16 @@ package docspell.files import cats.data.OptionT -import cats.effect.Sync +import cats.effect.{Async, Sync} import cats.syntax.all._ -import fs2.Stream +import fs2.Pipe import fs2.io.file.{Files, Path} -import docspell.common.{MimeType, MimeTypeHint} - -import io.circe.Encoder -import io.circe.syntax._ +import docspell.common.{Binary, MimeType, MimeTypeHint} trait FileSupport { - implicit final class FileOps[F[_]: Files: Sync](self: Path) { - def detectMime: F[Option[MimeType]] = + implicit final class FileOps(self: Path) { + def detectMime[F[_]: Files: Sync]: F[Option[MimeType]] = Files[F].isReadable(self).flatMap { flag => OptionT .whenF(flag) { @@ -32,30 +29,18 @@ trait FileSupport { .value } - def asTextFile(alt: MimeType => F[Unit]): F[Option[Path]] = - OptionT(detectMime).flatMapF { mime => - if (mime.matches(MimeType.text("plain"))) self.some.pure[F] - else alt(mime).as(None: Option[Path]) - }.value - - def readText: F[String] = - Files[F] - .readAll(self) - .through(fs2.text.utf8.decode) - .compile - .string - - def readAll: Stream[F, Byte] = - Files[F].readAll(self) - - def writeJson[A: Encoder](value: A): F[Unit] = - Stream - .emit(value.asJson.noSpaces) - .through(fs2.text.utf8.encode) - .through(Files[F].writeAll(self)) - .compile - .drain + def mimeType[F[_]: Files: Sync]: F[MimeType] = + detectMime.map(_.getOrElse(MimeType.octetStream)) } + + def detectMime[F[_]: Sync]: Pipe[F, Binary[F], Binary[F]] = + _.evalMap { bin => + val hint = MimeTypeHint.filename(bin.name).withAdvertised(bin.mime.asString) + TikaMimetype.detect[F](bin.data, hint).map(mt => bin.copy(mime = mt)) + } + + def toBinaryWithMime[F[_]: Async]: Pipe[F, Path, Binary[F]] = + _.evalMap(file => file.mimeType.map(mt => Binary(file).copy(mime = mt))) } object FileSupport extends FileSupport diff --git a/modules/files/src/main/scala/docspell/files/Zip.scala b/modules/files/src/main/scala/docspell/files/Zip.scala deleted file mode 100644 index b8d1dfd0..00000000 --- a/modules/files/src/main/scala/docspell/files/Zip.scala +++ /dev/null @@ -1,180 +0,0 @@ -/* - * Copyright 2020 Eike K. & Contributors - * - * SPDX-License-Identifier: AGPL-3.0-or-later - */ - -package docspell.files - -import java.io.InputStream -import java.nio.charset.StandardCharsets -import java.util.zip.{ZipEntry, ZipInputStream, ZipOutputStream} - -import cats.data.OptionT -import cats.effect._ -import cats.implicits._ -import fs2.io.file.{Files, Path} -import fs2.{Pipe, Stream} - -import docspell.common.Binary -import docspell.common.Glob -import docspell.logging.Logger - -object Zip { - - def zip[F[_]: Async]( - logger: Logger[F], - chunkSize: Int - ): Pipe[F, (String, Stream[F, Byte]), Byte] = - in => zipJava(logger, chunkSize, in.through(deduplicate)) - - def unzip[F[_]: Async]( - chunkSize: Int, - glob: Glob - ): Pipe[F, Byte, Binary[F]] = - s => unzipStream[F](chunkSize, glob)(s) - - def unzipStream[F[_]: Async](chunkSize: Int, glob: Glob)( - data: Stream[F, Byte] - ): Stream[F, Binary[F]] = - data - .through(fs2.io.toInputStream[F]) - .flatMap(in => unzipJava(in, chunkSize, glob)) - - def saveTo[F[_]: Async]( - logger: Logger[F], - targetDir: Path, - moveUp: Boolean - ): Pipe[F, Binary[F], Path] = - binaries => - binaries - .filter(e => !e.name.endsWith("/")) - .evalMap { entry => - val out = targetDir / entry.name - val createParent = - OptionT - .fromOption[F](out.parent) - .flatMapF(parent => - Files[F] - .exists(parent) - .map(flag => Option.when(!flag)(parent)) - ) - .semiflatMap(p => Files[F].createDirectories(p)) - .getOrElse(()) - - logger.trace(s"Unzip ${entry.name} -> $out") *> - createParent *> - entry.data.through(Files[F].writeAll(out)).compile.drain - } - .drain ++ Stream - .eval(if (moveUp) moveContentsUp(logger)(targetDir) else ().pure[F]) - .as(targetDir) - - private def moveContentsUp[F[_]: Sync: Files](logger: Logger[F])(dir: Path): F[Unit] = - Files[F] - .list(dir) - .take(2) - .compile - .toList - .flatMap { - case subdir :: Nil => - Files[F].isDirectory(subdir).flatMap { - case false => ().pure[F] - case true => - Files[F] - .list(subdir) - .filter(p => p != dir) - .evalTap(c => logger.trace(s"Move $c -> ${dir / c.fileName}")) - .evalMap(child => Files[F].move(child, dir / child.fileName)) - .compile - .drain - } - - case _ => - ().pure[F] - } - - def unzipJava[F[_]: Async]( - in: InputStream, - chunkSize: Int, - glob: Glob - ): Stream[F, Binary[F]] = { - val zin = new ZipInputStream(in) - - val nextEntry = Resource.make(Sync[F].delay(Option(zin.getNextEntry))) { - case Some(_) => Sync[F].delay(zin.closeEntry()) - case None => ().pure[F] - } - - Stream - .resource(nextEntry) - .repeat - .unNoneTerminate - .filter(ze => glob.matchFilenameOrPath(ze.getName())) - .map { ze => - val name = ze.getName() - val data = - fs2.io.readInputStream[F]((zin: InputStream).pure[F], chunkSize, false) - Binary(name, data) - } - } - - private def deduplicate[F[_]: Sync, A]: Pipe[F, (String, A), (String, A)] = { - def makeName(name: String, count: Int): String = - if (count <= 0) name - else - name.lastIndexOf('.') match { - case n if n > 0 => - s"${name.substring(0, n)}_$count${name.substring(n)}" - case _ => - s"${name}_$count" - } - - def unique( - current: Set[String], - name: String, - counter: Int - ): (Set[String], String) = { - val nextName = makeName(name, counter) - if (current.contains(nextName)) - unique(current, name, counter + 1) - else (current + nextName, nextName) - } - - in => - Stream - .eval(Ref.of[F, Set[String]](Set.empty[String])) - .flatMap { ref => - in.evalMap { element => - ref - .modify(names => unique(names, element._1, 0)) - .map(n => (n, element._2)) - } - } - } - - def zipJava[F[_]: Async]( - logger: Logger[F], - chunkSize: Int, - entries: Stream[F, (String, Stream[F, Byte])] - ): Stream[F, Byte] = - fs2.io.readOutputStream(chunkSize) { out => - val zip = new ZipOutputStream(out, StandardCharsets.UTF_8) - val writeEntries = - entries.evalMap { case (name, bytes) => - val javaOut = - bytes.through( - fs2.io.writeOutputStream[F](Sync[F].pure(zip), closeAfterUse = false) - ) - val nextEntry = - logger.debug(s"Adding $name to zip file…") *> - Sync[F].delay(zip.putNextEntry(new ZipEntry(name))) - Resource - .make(nextEntry)(_ => Sync[F].delay(zip.closeEntry())) - .use(_ => javaOut.compile.drain) - } - val closeStream = Sync[F].delay(zip.close()) - - writeEntries.onFinalize(closeStream).compile.drain - } -} diff --git a/modules/files/src/test/scala/docspell/files/ZipTest.scala b/modules/files/src/test/scala/docspell/files/ZipTest.scala index 9230ac8f..45d86d0e 100644 --- a/modules/files/src/test/scala/docspell/files/ZipTest.scala +++ b/modules/files/src/test/scala/docspell/files/ZipTest.scala @@ -7,10 +7,12 @@ package docspell.files import cats.effect._ -import cats.implicits._ +import cats.syntax.option._ +import fs2.Stream import fs2.io.file.{Files, Path} -import docspell.common.Glob +import docspell.common.syntax.file._ +import docspell.common.util.Zip import docspell.logging.TestLoggingConfig import munit._ @@ -21,29 +23,101 @@ class ZipTest extends CatsEffectSuite with TestLoggingConfig { Files[IO].tempDirectory(Path("target").some, "zip-test-", None) ) - test("unzip") { + tempDir.test("unzip") { dir => val zipFile = ExampleFiles.letters_zip.readURL[IO](8192) - val unzip = zipFile.through(Zip.unzip(8192, Glob.all)) + val unzip: Stream[IO, Path] = zipFile + .through(Zip[IO](logger.some, dir.some).unzip(8192)) - unzip - .evalMap { entry => - val x = entry.data.map(_ => 1).foldMonoid.compile.lastOrError - x.map { size => - if (entry.name.endsWith(".pdf")) { - assertEquals(entry.name, "letter-de.pdf") - assertEquals(size, 34815) - } else { - assertEquals(entry.name, "letter-en.txt") - assertEquals(size, 1131) - } + (for { + file <- unzip + length <- Stream.eval(Files[IO].size(file)) + sha <- Stream.eval(file.sha256Hex[IO]) + _ = { + if (file.name == "letter-de.pdf") { + assertEquals(length, 34815L) + assertEquals( + sha, + "299c15429ce327099c322b36caaec56e7a6034106531c5d1b3fd085467a8d495" + ) + } else { + assertEquals(file.name, "letter-en.txt") + assertEquals(length, 1131L) + assertEquals( + sha, + "55eca47c65084126d7c3bbce941cadff0f642a7287ff8e0f3fc9c2c33a4bb7f0" + ) } } + } yield ()).compile.drain + } + + tempDir.test("unzip directories and files") { dir => + val zipFile = ExampleFiles.zip_dirs_zip.readURL[IO](8192) + val unzip: Stream[IO, Path] = zipFile + .through(Zip[IO](logger.some, dir.some).unzip(8192)) + + val entries = + for { + file <- unzip + sha <- Stream.eval(file.sha256Hex[IO]) + } yield (file.name, file, sha) + + val expectedSha = + "5891b5b522d5df086d0ff0b110fbd9d21bb4fc7163af34d08286a2e846f6be03" + + entries + .map { + case ("file1.txt", file, realSha) => + assertEquals(realSha, expectedSha) + val relFile = dir.relativize(file).dropLeft(1) + assertEquals(relFile.toString, "file1.txt") + + case ("file2.txt", file, realSha) => + assertEquals(realSha, expectedSha) + val relFile = dir.relativize(file).dropLeft(1) + assertEquals(relFile.toString, "dir1/file2.txt") + + case ("file3.txt", file, realSha) => + assertEquals(realSha, expectedSha) + val relFile = dir.relativize(file).dropLeft(1) + assertEquals(relFile.toString, "dir1/dir11/file3.txt") + + case ("file4.txt", file, realSha) => + assertEquals(realSha, expectedSha) + val relFile = dir.relativize(file).dropLeft(1) + assertEquals(relFile.toString, "dir2/file4.txt") + + case (name, _, _) => + fail(s"Unexpected file: $name") + } .compile .drain } - tempDir.test("unzipTo directory tree") { _ => - // val zipFile = ExampleFiles.zip_dirs_zip.readURL[IO](8192) - // zipFile.through(Zip.unzip(G)) - } +// tempDir.test("test runtime") { _ => +// val archive = Path("./local/large-archive.zip") +// +// for { +// +// timer1 <- Duration.stopTime[IO] +// es1 <- Files[IO] +// .readAll(archive) +// .through(Zip[IO]().unzip(64 * 1024)) +// .compile +// .toVector +// duration1 <- timer1 +// +// timer2 <- Duration.stopTime[IO] +// es2 <- fs2.Stream +// .emit(archive) +// .covary[IO] +// .through(Zip[IO]().unzipFiles(64 * 1024)) +// .compile +// .toVector +// duration2 <- timer2 +// +// _ <- IO.println(s">>>>1. ${duration1.formatExact}, entries: $es1") +// _ <- IO.println(s">>>>2. ${duration2.formatExact}, entries: $es2") +// } yield () +// } } diff --git a/modules/joex/src/main/scala/docspell/joex/addon/GenericItemAddonTask.scala b/modules/joex/src/main/scala/docspell/joex/addon/GenericItemAddonTask.scala index 50bbf41b..68b2d775 100644 --- a/modules/joex/src/main/scala/docspell/joex/addon/GenericItemAddonTask.scala +++ b/modules/joex/src/main/scala/docspell/joex/addon/GenericItemAddonTask.scala @@ -15,14 +15,14 @@ import docspell.addons.{AddonTriggerType, InputEnv, Middleware} import docspell.backend.joex.AddonOps.ExecResult import docspell.backend.joex.{AddonOps, LoggerExtension} import docspell.common._ -import docspell.files.FileSupport +import docspell.common.syntax.file._ import docspell.joex.process.ItemData import docspell.logging.Logger import docspell.scheduler.Task import docspell.store.Store import docspell.store.queries.QAttachment -object GenericItemAddonTask extends LoggerExtension with FileSupport { +object GenericItemAddonTask extends LoggerExtension { private val itemSubdir = "item" private val itemDataJson = s"$itemSubdir/item-data.json" diff --git a/modules/joex/src/main/scala/docspell/joex/download/DownloadZipTask.scala b/modules/joex/src/main/scala/docspell/joex/download/DownloadZipTask.scala index 125d1c75..00d747d9 100644 --- a/modules/joex/src/main/scala/docspell/joex/download/DownloadZipTask.scala +++ b/modules/joex/src/main/scala/docspell/joex/download/DownloadZipTask.scala @@ -16,7 +16,7 @@ import docspell.backend.ops.ODownloadAll import docspell.backend.ops.ODownloadAll.model.DownloadSummary import docspell.backend.task.DownloadZipArgs import docspell.common._ -import docspell.files.Zip +import docspell.common.util.Zip import docspell.scheduler.Task import docspell.store.Store import docspell.store.queries.{ItemFileMeta, QItem} @@ -50,7 +50,7 @@ object DownloadZipTask { val storeZipFile = allFiles - .through(Zip.zip(ctx.logger, chunkSize)) + .through(Zip[F](ctx.logger.some).zip(chunkSize)) .through( store.fileRepo.save( ctx.args.accountId.collective, diff --git a/modules/joex/src/main/scala/docspell/joex/multiupload/MultiUploadArchiveTask.scala b/modules/joex/src/main/scala/docspell/joex/multiupload/MultiUploadArchiveTask.scala index 0470fcdc..988dd1a0 100644 --- a/modules/joex/src/main/scala/docspell/joex/multiupload/MultiUploadArchiveTask.scala +++ b/modules/joex/src/main/scala/docspell/joex/multiupload/MultiUploadArchiveTask.scala @@ -14,7 +14,7 @@ import fs2.Stream import docspell.backend.JobFactory import docspell.common._ -import docspell.files.Zip +import docspell.common.util.Zip import docspell.logging.Logger import docspell.scheduler._ import docspell.store.Store @@ -44,7 +44,7 @@ object MultiUploadArchiveTask { extractZip(store, ctx.args)(file) .evalTap(entry => ctx.logger.debug( - s"Create job for entry: ${entry.files.flatMap(_.name)}" + s"Create job for entry: ${entry.files.flatMap(_.name).mkString(", ")}" ) ) .evalMap(makeJob[F](ctx, jobStore)) @@ -109,7 +109,8 @@ object MultiUploadArchiveTask { )(file: ProcessItemArgs.File): Stream[F, ProcessItemArgs] = store.fileRepo .getBytes(file.fileMetaId) - .through(Zip.unzip[F](8192, args.meta.fileFilter.getOrElse(Glob.all))) + .through(Zip[F]().unzip(glob = args.meta.fileFilter.getOrElse(Glob.all))) + .through(Binary.toBinary[F]) .flatMap { entry => val hint = MimeTypeHint(entry.name.some, entry.mime.asString.some) entry.data diff --git a/modules/joex/src/main/scala/docspell/joex/process/ExtractArchive.scala b/modules/joex/src/main/scala/docspell/joex/process/ExtractArchive.scala index 27cbe414..410e31bb 100644 --- a/modules/joex/src/main/scala/docspell/joex/process/ExtractArchive.scala +++ b/modules/joex/src/main/scala/docspell/joex/process/ExtractArchive.scala @@ -16,7 +16,7 @@ import cats.kernel.Order import fs2.Stream import docspell.common._ -import docspell.files.Zip +import docspell.common.util.Zip import docspell.joex.mail._ import docspell.scheduler._ import docspell.store.Store @@ -146,7 +146,8 @@ object ExtractArchive { val glob = ctx.args.meta.fileFilter.getOrElse(Glob.all) ctx.logger.debug(s"Filtering zip entries with '${glob.asString}'") *> zipData - .through(Zip.unzip[F](8192, glob)) + .through(Zip[F](ctx.logger.some).unzip(glob = glob)) + .through(Binary.toBinary[F]) .zipWithIndex .flatMap(handleEntry(ctx, store, ra, pos, archive, None)) .foldMonoid