mirror of https://github.com/TheAnachronism/docspell.git (synced 2025-10-31 09:30:12 +00:00)

build.sbt (40 changed lines)
							| @@ -321,25 +321,6 @@ val loggingApi = project | ||||
|         Dependencies.sourcecode | ||||
|   ) | ||||
|  | ||||
| // Base module, everything depends on this – including restapi and | ||||
| // joexapi modules. This should aim to have least possible | ||||
| // dependencies | ||||
| val common = project | ||||
|   .in(file("modules/common")) | ||||
|   .disablePlugins(RevolverPlugin) | ||||
|   .settings(sharedSettings) | ||||
|   .withTestSettings | ||||
|   .settings( | ||||
|     name := "docspell-common", | ||||
|     libraryDependencies ++= | ||||
|       Dependencies.fs2 ++ | ||||
|         Dependencies.circe ++ | ||||
|         Dependencies.circeGenericExtra ++ | ||||
|         Dependencies.calevCore ++ | ||||
|         Dependencies.calevCirce | ||||
|   ) | ||||
|   .dependsOn(loggingApi) | ||||
|  | ||||
| val loggingScribe = project | ||||
|   .in(file("modules/logging/scribe")) | ||||
|   .disablePlugins(RevolverPlugin) | ||||
| @@ -355,6 +336,25 @@ val loggingScribe = project | ||||
|   ) | ||||
|   .dependsOn(loggingApi) | ||||
|  | ||||
| // Base module, everything depends on this – including restapi and | ||||
| // joexapi modules. This should aim to have least possible | ||||
| // dependencies | ||||
| val common = project | ||||
|   .in(file("modules/common")) | ||||
|   .disablePlugins(RevolverPlugin) | ||||
|   .settings(sharedSettings) | ||||
|   .withTestSettingsDependsOn(loggingScribe) | ||||
|   .settings( | ||||
|     name := "docspell-common", | ||||
|     libraryDependencies ++= | ||||
|       Dependencies.fs2 ++ | ||||
|         Dependencies.circe ++ | ||||
|         Dependencies.circeGenericExtra ++ | ||||
|         Dependencies.calevCore ++ | ||||
|         Dependencies.calevCirce | ||||
|   ) | ||||
|   .dependsOn(loggingApi) | ||||
|  | ||||
| // Some example files for testing | ||||
| // https://file-examples.com/index.php/sample-documents-download/sample-doc-download/ | ||||
| val files = project | ||||
| @@ -393,7 +393,7 @@ ${lines.map(_._1).mkString(",\n")} | ||||
|       Seq(target) | ||||
|     }.taskValue | ||||
|   ) | ||||
|   .dependsOn(common) | ||||
|   .dependsOn(common, loggingScribe) | ||||
|  | ||||
| val query = | ||||
|   crossProject(JSPlatform, JVMPlatform) | ||||
|   | ||||
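Note on the build.sbt reordering: definitions in an sbt build file cannot forward-reference later vals, so `common` had to move below `loggingScribe` once its test settings started referencing it via `withTestSettingsDependsOn`. A minimal sketch of the constraint (project names are illustrative):

    // hypothetical build.sbt fragment
    val a = project.in(file("modules/a"))

    val b = project
      .in(file("modules/b"))
      .dependsOn(a) // fine: `a` is defined above; the reverse order would not compile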
| @@ -12,7 +12,8 @@ import fs2.Stream | ||||
| import fs2.io.file.{Files, Path} | ||||
|  | ||||
| import docspell.common._ | ||||
| import docspell.files.Zip | ||||
| import docspell.common.syntax.file._ | ||||
| import docspell.common.util.{Directory, Zip} | ||||
|  | ||||
| final case class AddonArchive(url: LenientUri, name: String, version: String) { | ||||
|   def nameAndVersion: String = | ||||
| @@ -36,8 +37,8 @@ final case class AddonArchive(url: LenientUri, name: String, version: String) { | ||||
|         case false => | ||||
|           Files[F].createDirectories(target) *> | ||||
|             reader(url) | ||||
|               .through(Zip.unzip(8192, glob)) | ||||
|               .through(Zip.saveTo(logger, target, moveUp = true)) | ||||
|               .through(Zip[F](logger.some).unzip(glob = glob, targetDir = target.some)) | ||||
|               .evalTap(_ => Directory.unwrapSingle[F](logger, target)) | ||||
|               .compile | ||||
|               .drain | ||||
|               .as(target) | ||||
| @@ -72,12 +73,13 @@ object AddonArchive { | ||||
|       archive: Either[Path, Stream[F, Byte]] | ||||
|   ): F[(Boolean, Boolean)] = { | ||||
|     val files = Files[F] | ||||
|     val logger = docspell.logging.getLogger[F] | ||||
|     def forPath(path: Path): F[(Boolean, Boolean)] = | ||||
|       (files.exists(path / "Dockerfile"), files.exists(path / "flake.nix")).tupled | ||||
|  | ||||
|     def forZip(data: Stream[F, Byte]): F[(Boolean, Boolean)] = | ||||
|       data | ||||
|         .through(Zip.unzip(8192, Glob("Dockerfile|flake.nix"))) | ||||
|         .through(Zip[F](logger.some).unzip(glob = Glob("Dockerfile|flake.nix"))) | ||||
|         .collect { | ||||
|           case bin if bin.name == "Dockerfile" => (true, false) | ||||
|           case bin if bin.name == "flake.nix"  => (false, true) | ||||
|   | ||||
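The unzip pipeline changes shape here: the old code emitted `Binary` entries and persisted them with `Zip.saveTo(..., moveUp = true)`, while the new `Zip` interface (added in `docspell.common.util` below) writes entries straight into a target directory and emits the extracted `Path`s; `Directory.unwrapSingle` then takes over the old `moveUp` behaviour. A sketch of the new flow, using only APIs from this commit:

    import cats.effect.IO
    import cats.syntax.option._
    import fs2.Stream
    import fs2.io.file.Path
    import docspell.common.Glob
    import docspell.common.util.{Directory, Zip}
    import docspell.logging.Logger

    def extract(zipBytes: Stream[IO, Byte], target: Path, logger: Logger[IO]): IO[Path] =
      zipBytes
        .through(Zip[IO](logger.some).unzip(glob = Glob.all, targetDir = target.some))
        .evalTap(_ => Directory.unwrapSingle[IO](logger, target)) // replaces moveUp = true
        .compile
        .drain
        .as(target)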
| @@ -14,6 +14,7 @@ import fs2.io.file._ | ||||
|  | ||||
| import docspell.common.UrlReader | ||||
| import docspell.common.exec.Env | ||||
| import docspell.common.util.Directory | ||||
| import docspell.logging.Logger | ||||
|  | ||||
| trait AddonExecutor[F[_]] { | ||||
|   | ||||
| @@ -15,7 +15,8 @@ import fs2.Stream | ||||
| import fs2.io.file.{Files, Path} | ||||
|  | ||||
| import docspell.common.Glob | ||||
| import docspell.files.Zip | ||||
| import docspell.common.syntax.file._ | ||||
| import docspell.common.util.Zip | ||||
|  | ||||
| import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder} | ||||
| import io.circe.yaml.{parser => YamlParser} | ||||
| @@ -153,6 +154,12 @@ object AddonMeta { | ||||
|       .map(fromJsonString) | ||||
|       .rethrow | ||||
|  | ||||
|   def fromJsonFile[F[_]: Sync](file: Path): F[AddonMeta] = | ||||
|     Sync[F] | ||||
|       .blocking(java.nio.file.Files.readString(file.toNioPath)) | ||||
|       .map(fromJsonString) | ||||
|       .rethrow | ||||
|  | ||||
|   def fromYamlString(str: String): Either[Throwable, AddonMeta] = | ||||
|     YamlParser.parse(str).flatMap(_.as[AddonMeta]) | ||||
|  | ||||
| @@ -164,6 +171,13 @@ object AddonMeta { | ||||
|       .map(fromYamlString) | ||||
|       .rethrow | ||||
|  | ||||
|   def fromYamlFile[F[_]: Sync](file: Path): F[AddonMeta] = | ||||
|     Sync[F] | ||||
|       .blocking(YamlParser.parse(java.nio.file.Files.newBufferedReader(file.toNioPath))) | ||||
|       .rethrow | ||||
|       .map(_.as[AddonMeta]) | ||||
|       .rethrow | ||||
|  | ||||
|   def findInDirectory[F[_]: Sync: Files](dir: Path): F[AddonMeta] = { | ||||
|     val logger = docspell.logging.getLogger[F] | ||||
|     val jsonFile = dir / "docspell-addon.json" | ||||
| @@ -194,18 +208,22 @@ object AddonMeta { | ||||
|   } | ||||
|  | ||||
|   def findInZip[F[_]: Async](zipFile: Stream[F, Byte]): F[AddonMeta] = { | ||||
|     val logger = docspell.logging.getLogger[F] | ||||
|     val fail: F[AddonMeta] = Async[F].raiseError( | ||||
|       new FileNotFoundException( | ||||
|         s"No docspell-addon.{yaml|json} file found in zip!" | ||||
|       ) | ||||
|     ) | ||||
|     zipFile | ||||
|       .through(Zip.unzip(8192, Glob("docspell-addon.*|**/docspell-addon.*"))) | ||||
|       .filter(bin => !bin.name.endsWith("/")) | ||||
|       .through( | ||||
|         Zip[F](logger.some).unzip(glob = Glob("docspell-addon.*|**/docspell-addon.*")) | ||||
|       ) | ||||
|       .filter(file => !file.name.endsWith("/")) | ||||
|       .flatMap { bin => | ||||
|         if (bin.extensionIn(Set("json"))) Stream.eval(AddonMeta.fromJsonBytes(bin.data)) | ||||
|         else if (bin.extensionIn(Set("yaml", "yml"))) | ||||
|           Stream.eval(AddonMeta.fromYamlBytes(bin.data)) | ||||
|         val ext = bin.extension | ||||
|         if (ext.equalsIgnoreCase("json")) Stream.eval(AddonMeta.fromJsonFile(bin)) | ||||
|         else if (Set("yaml", "yml").contains(ext.toLowerCase)) | ||||
|           Stream.eval(AddonMeta.fromYamlFile(bin)) | ||||
|         else Stream.empty | ||||
|       } | ||||
|       .take(1) | ||||
|   | ||||
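Since `unzip` now yields `Path`s instead of in-memory `Binary` entries, the meta file is read from disk; `bin` in the updated `findInZip` is a `Path`, and `extension` comes from the `PathOps` syntax added in this commit. Usage of the new readers, as a sketch:

    import cats.effect.IO
    import fs2.io.file.Path
    import docspell.addons.AddonMeta

    val fromJson: IO[AddonMeta] = AddonMeta.fromJsonFile[IO](Path("addon/docspell-addon.json"))
    val fromYaml: IO[AddonMeta] = AddonMeta.fromYamlFile[IO](Path("addon/docspell-addon.yaml"))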
| @@ -10,6 +10,7 @@ import cats.effect.Resource | ||||
| import fs2.io.file.{Files, Path} | ||||
|  | ||||
| import docspell.common.exec.Env | ||||
| import docspell.common.util.Directory | ||||
|  | ||||
| case class InputEnv( | ||||
|     addons: List[AddonRef], | ||||
|   | ||||
| @@ -13,7 +13,7 @@ import fs2.io.file.{Files, Path, PosixPermissions} | ||||
|  | ||||
| import docspell.addons.out.AddonOutput | ||||
| import docspell.common.LenientUri | ||||
| import docspell.files.Zip | ||||
| import docspell.common.util.Zip | ||||
|  | ||||
| import io.circe.syntax._ | ||||
|  | ||||
| @@ -59,9 +59,9 @@ object AddonGenerator { | ||||
|   private def createZip(dir: Path, files: List[Path]) = | ||||
|     Stream | ||||
|       .emits(files) | ||||
|       .map(f => (f.fileName.toString, Files[IO].readAll(f))) | ||||
|       .map(f => (f.fileName.toString, f)) | ||||
|       .covary[IO] | ||||
|       .through(Zip.zip[IO](logger, 8192)) | ||||
|       .through(Zip[IO](logger.some).zipFiles()) | ||||
|       .through(Files[IO].writeAll(dir / "addon.zip")) | ||||
|       .compile | ||||
|       .drain | ||||
|   | ||||
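`zipFiles` consumes `(entryName, Path)` pairs and opens each file only when its entry is written, instead of pairing names with already-constructed byte streams. The generator change above, as a standalone sketch:

    import cats.effect.IO
    import cats.syntax.option._
    import fs2.Stream
    import fs2.io.file.{Files, Path}
    import docspell.common.util.Zip
    import docspell.logging.Logger

    def zipTo(files: List[Path], out: Path, logger: Logger[IO]): IO[Unit] =
      Stream
        .emits(files)
        .map(f => (f.fileName.toString, f)) // entry name -> file on disk
        .covary[IO]
        .through(Zip[IO](logger.some).zipFiles())
        .through(Files[IO].writeAll(out))
        .compile
        .drain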
| @@ -7,9 +7,10 @@ | ||||
| package docspell.addons | ||||
|  | ||||
| import cats.effect._ | ||||
| import cats.syntax.all._ | ||||
|  | ||||
| import docspell.common.Glob | ||||
| import docspell.files.Zip | ||||
| import docspell.common.util.{Directory, Zip} | ||||
| import docspell.logging.TestLoggingConfig | ||||
|  | ||||
| import munit._ | ||||
| @@ -26,8 +27,8 @@ class AddonMetaTest extends CatsEffectSuite with TestLoggingConfig with Fixtures | ||||
|     for { | ||||
|       _ <- dummyAddonUrl | ||||
|         .readURL[IO](8192) | ||||
|         .through(Zip.unzip(8192, Glob.all)) | ||||
|         .through(Zip.saveTo(logger, dir, moveUp = true)) | ||||
|         .through(Zip[IO]().unzip(8192, Glob.all, dir.some)) | ||||
|         .evalTap(_ => Directory.unwrapSingle(logger, dir)) | ||||
|         .compile | ||||
|         .drain | ||||
|       meta <- AddonMeta.findInDirectory[IO](dir) | ||||
|   | ||||
| @@ -16,6 +16,7 @@ import docspell.backend.ops.OAttachment | ||||
| import docspell.common._ | ||||
| import docspell.common.bc.BackendCommandRunner | ||||
| import docspell.common.exec.Env | ||||
| import docspell.common.util.Directory | ||||
| import docspell.logging.Logger | ||||
| import docspell.scheduler.JobStore | ||||
| import docspell.store.Store | ||||
|   | ||||
| @@ -17,7 +17,7 @@ import docspell.backend.JobFactory | ||||
| import docspell.backend.ops.OAttachment | ||||
| import docspell.common._ | ||||
| import docspell.common.bc.BackendCommandRunner | ||||
| import docspell.files.FileSupport | ||||
| import docspell.common.syntax.file._ | ||||
| import docspell.logging.Logger | ||||
| import docspell.scheduler.JobStore | ||||
| import docspell.store.Store | ||||
| @@ -28,7 +28,7 @@ final private[joex] class AddonPostProcess[F[_]: Sync: Files]( | ||||
|     store: Store[F], | ||||
|     attachOps: OAttachment[F], | ||||
|     jobStore: JobStore[F] | ||||
| ) extends FileSupport { | ||||
| ) { | ||||
|  | ||||
|   def onResult( | ||||
|       logger: Logger[F], | ||||
| @@ -105,7 +105,7 @@ final private[joex] class AddonPostProcess[F[_]: Sync: Files]( | ||||
|         .getOrElse(Vector.empty) | ||||
|       _ <- textFiles.traverse_ { case (key, file) => | ||||
|         withAttach(logger, key, attachs) { ra => | ||||
|           setText(collective, ra, file.readText) | ||||
|           setText(collective, ra, file.readString) | ||||
|         } | ||||
|       } | ||||
|       _ <- pdfFiles.traverse_ { case (key, file) => | ||||
|   | ||||
| @@ -95,7 +95,13 @@ final class AddonValidate[F[_]: Async]( | ||||
|           ) | ||||
|         else rightUnitT | ||||
|  | ||||
|       joexSupport <- EitherT.liftF(joexOps.getAddonSupport) | ||||
|       joexSupport <- EitherT(joexOps.getAddonSupport.attempt).leftMap { ex => | ||||
|         logger.asUnsafe.warn(ex)(s"Joex validation failed!") | ||||
|         AddonUnsupported( | ||||
|           s"Joex validation failed due to an error: ${ex.getMessage}", | ||||
|           Nil | ||||
|         ) | ||||
|       } | ||||
|       addonRunners <- EitherT.liftF(meta.enabledTypes(addonData)) | ||||
|       _ <- EitherT.liftF( | ||||
|         logger.info( | ||||
|   | ||||
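The pattern used here converts an exception raised by the effect into a domain-level validation error rather than aborting the whole `EitherT` program. In general form (a sketch; `ValidationError` is an illustrative stand-in):

    import cats.data.EitherT
    import cats.effect.IO

    final case class ValidationError(msg: String) // illustrative error type

    def guarded[A](fa: IO[A]): EitherT[IO, ValidationError, A] =
      EitherT(fa.attempt).leftMap(ex => ValidationError(s"call failed: ${ex.getMessage}"))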
| @@ -9,9 +9,14 @@ package docspell.common | ||||
| import java.nio.charset.Charset | ||||
| import java.nio.charset.StandardCharsets | ||||
|  | ||||
| import cats.data.OptionT | ||||
| import cats.effect._ | ||||
| import cats.syntax.all._ | ||||
| import fs2.io.file.{Files, Path} | ||||
| import fs2.{Chunk, Pipe, Stream} | ||||
|  | ||||
| import docspell.logging.Logger | ||||
|  | ||||
| import scodec.bits.ByteVector | ||||
|  | ||||
| final case class Binary[F[_]](name: String, mime: MimeType, data: Stream[F, Byte]) { | ||||
| @@ -34,6 +39,9 @@ final case class Binary[F[_]](name: String, mime: MimeType, data: Stream[F, Byte | ||||
|  | ||||
| object Binary { | ||||
|  | ||||
|   def apply[F[_]: Async](file: Path): Binary[F] = | ||||
|     Binary(file.fileName.toString, Files[F].readAll(file)) | ||||
|  | ||||
|   def apply[F[_]](name: String, data: Stream[F, Byte]): Binary[F] = | ||||
|     Binary[F](name, MimeType.octetStream, data) | ||||
|  | ||||
| @@ -65,6 +73,38 @@ object Binary { | ||||
|   def loadAllBytes[F[_]: Sync](data: Stream[F, Byte]): F[ByteVector] = | ||||
|     data.chunks.map(_.toByteVector).compile.fold(ByteVector.empty)((r, e) => r ++ e) | ||||
|  | ||||
|   /** Convert paths into `Binary`s */ | ||||
|   def toBinary[F[_]: Async]: Pipe[F, Path, Binary[F]] = | ||||
|     _.map(Binary[F](_)) | ||||
|  | ||||
|   /** Save one or more binaries to a target directory. */ | ||||
|   def saveTo[F[_]: Async]( | ||||
|       logger: Logger[F], | ||||
|       targetDir: Path | ||||
|   ): Pipe[F, Binary[F], Path] = | ||||
|     binaries => | ||||
|       binaries | ||||
|         .filter(e => !e.name.endsWith("/")) | ||||
|         .evalMap { entry => | ||||
|           val out = targetDir / entry.name | ||||
|           val createParent = | ||||
|             OptionT | ||||
|               .fromOption[F](out.parent) | ||||
|               .flatMapF(parent => | ||||
|                 Files[F] | ||||
|                   .exists(parent) | ||||
|                   .map(flag => Option.when(!flag)(parent)) | ||||
|               ) | ||||
|               .semiflatMap(p => Files[F].createDirectories(p)) | ||||
|               .getOrElse(()) | ||||
|  | ||||
|           logger.trace(s"Copy ${entry.name} -> $out") *> | ||||
|             createParent *> | ||||
|             entry.data.through(Files[F].writeAll(out)).compile.drain | ||||
|         } | ||||
|         .drain | ||||
|         .as(targetDir) | ||||
|  | ||||
|   // This is a copy from org.http4s.util | ||||
|   // Http4s is licensed under the Apache License 2.0 | ||||
|   private object util { | ||||
|   | ||||
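`Binary.saveTo` is essentially the old `Zip.saveTo` without the `moveUp` concern (that now lives in `Directory.unwrapSingle`), and `toBinary` adapts `Path`-emitting pipelines back to `Binary` streams where callers still need them. A usage sketch:

    import cats.effect.IO
    import fs2.Stream
    import fs2.io.file.Path
    import docspell.common.Binary
    import docspell.logging.Logger

    def copyAll(bins: Stream[IO, Binary[IO]], target: Path, logger: Logger[IO]): IO[Path] =
      bins.through(Binary.saveTo[IO](logger, target)).compile.drain.as(target)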
| @@ -6,17 +6,49 @@ | ||||
|  | ||||
| package docspell.common.syntax | ||||
|  | ||||
| import fs2.io.file.Path | ||||
| import java.nio.file.{Files => NioFiles} | ||||
|  | ||||
| import cats.effect._ | ||||
| import fs2.Stream | ||||
| import fs2.io.file.{Files, Path} | ||||
|  | ||||
| import docspell.common.syntax.stream._ | ||||
|  | ||||
| import io.circe.Encoder | ||||
| import io.circe.syntax._ | ||||
|  | ||||
| trait FileSyntax { | ||||
|  | ||||
|   implicit final class PathOps(p: Path) { | ||||
|   implicit final class PathOps(self: Path) { | ||||
|  | ||||
|     def absolutePath: Path = | ||||
|       p.absolute | ||||
|       self.absolute | ||||
|  | ||||
|     def absolutePathAsString: String = | ||||
|       absolutePath.toString | ||||
|  | ||||
|     def name: String = self.fileName.toString | ||||
|     def extension: String = self.extName.stripPrefix(".") | ||||
|     def dropLeft(n: Int): Path = | ||||
|       Path.fromNioPath(self.toNioPath.subpath(n, self.toNioPath.getNameCount)) | ||||
|  | ||||
|     def readString[F[_]: Sync]: F[String] = Sync[F].blocking( | ||||
|       NioFiles.readString(self.toNioPath) | ||||
|     ) | ||||
|  | ||||
|     def sha256Hex[F[_]: Files: Sync]: F[String] = | ||||
|       Files[F].readAll(self).sha256Hex | ||||
|  | ||||
|     def readAll[F[_]: Files]: Stream[F, Byte] = | ||||
|       Files[F].readAll(self) | ||||
|  | ||||
|     def writeJson[A: Encoder, F[_]: Files: Sync](value: A): F[Unit] = | ||||
|       Stream | ||||
|         .emit(value.asJson.noSpaces) | ||||
|         .through(fs2.text.utf8.encode) | ||||
|         .through(Files[F].writeAll(self)) | ||||
|         .compile | ||||
|         .drain | ||||
|   } | ||||
| } | ||||
|  | ||||
|   | ||||
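The enriched `PathOps` now carries the file helpers that previously lived in `docspell.files.FileSupport`. Importing `docspell.common.syntax.file._` (or `syntax.all._`) brings them in; for example:

    import cats.effect.IO
    import fs2.io.file.Path
    import docspell.common.syntax.file._

    val p = Path("modules/common/data/report.json") // illustrative path
    val ext: String = p.extension        // "json"
    val text: IO[String] = p.readString[IO]
    val digest: IO[String] = p.sha256Hex[IO]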
| @@ -12,6 +12,7 @@ import fs2.Stream | ||||
|  | ||||
| import io.circe._ | ||||
| import io.circe.parser._ | ||||
| import scodec.bits.ByteVector | ||||
|  | ||||
| trait StreamSyntax { | ||||
|   implicit class StringStreamOps[F[_]](s: Stream[F, String]) { | ||||
| @@ -24,4 +25,14 @@ trait StreamSyntax { | ||||
|           } yield value | ||||
|         ) | ||||
|   } | ||||
|  | ||||
|   implicit final class ByteStreamSyntax[F[_]](self: Stream[F, Byte]) { | ||||
|     def sha256Hex(implicit F: Sync[F]): F[String] = | ||||
|       self | ||||
|         .through(fs2.hash.sha256) | ||||
|         .compile | ||||
|         .foldChunks(ByteVector.empty)(_ ++ _.toByteVector) | ||||
|         .map(_.toHex) | ||||
|   } | ||||
| } | ||||
| object StreamSyntax extends StreamSyntax | ||||
|   | ||||
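`sha256Hex` folds the hashed chunks into a `ByteVector` and hex-encodes the result, so any byte stream can be fingerprinted without materializing it first. For example:

    import cats.effect.IO
    import fs2.Stream
    import docspell.common.syntax.all._

    val digest: IO[String] =
      Stream.emits("hello".getBytes).covary[IO].sha256Hex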
| @@ -8,6 +8,11 @@ package docspell.common | ||||
|  | ||||
| package object syntax { | ||||
|  | ||||
|   val either = EitherSyntax | ||||
|   val stream = StreamSyntax | ||||
|   val string = StringSyntax | ||||
|   val file = FileSyntax | ||||
|  | ||||
|   object all extends EitherSyntax with StreamSyntax with StringSyntax with FileSyntax | ||||
|  | ||||
| } | ||||
|   | ||||
| @@ -4,13 +4,17 @@ | ||||
|  * SPDX-License-Identifier: AGPL-3.0-or-later | ||||
|  */ | ||||
|  | ||||
| package docspell.addons | ||||
| package docspell.common.util | ||||
|  | ||||
| import cats.effect._ | ||||
| import cats.syntax.all._ | ||||
| import cats.{Applicative, Monad} | ||||
| import fs2.Stream | ||||
| import fs2.io.file.{Files, Path, PosixPermissions} | ||||
|  | ||||
| import docspell.logging.Logger | ||||
|  | ||||
| /** Utility functions for directories. */ | ||||
| object Directory { | ||||
|  | ||||
|   def create[F[_]: Files: Applicative](dir: Path): F[Path] = | ||||
| @@ -36,17 +40,6 @@ object Directory { | ||||
|       d <- mkTemp(parent, prefix) | ||||
|     } yield d | ||||
|  | ||||
|   def temp2[F[_]: Files]( | ||||
|       parent: Path, | ||||
|       prefix1: String, | ||||
|       prefix2: String | ||||
|   ): Resource[F, (Path, Path)] = | ||||
|     for { | ||||
|       _ <- Resource.eval(Files[F].createDirectories(parent)) | ||||
|       a <- mkTemp(parent, prefix1) | ||||
|       b <- mkTemp(parent, prefix2) | ||||
|     } yield (a, b) | ||||
|  | ||||
|   def createTemp[F[_]: Files: Monad]( | ||||
|       parent: Path, | ||||
|       prefix: String | ||||
| @@ -71,4 +64,46 @@ object Directory { | ||||
|         prefix, | ||||
|         PosixPermissions.fromOctal("777") | ||||
|       ) | ||||
|  | ||||
|   /** If `dir` contains only a single non-empty directory, then its contents are moved out | ||||
|     * of it and the directory is deleted. This is applied repeatedly until the condition | ||||
|     * doesn't apply anymore (there are multiple entries in the directory or none). | ||||
|     */ | ||||
|   def unwrapSingle[F[_]: Sync: Files](logger: Logger[F], dir: Path): F[Boolean] = | ||||
|     Stream | ||||
|       .repeatEval(unwrapSingle1(logger, dir)) | ||||
|       .takeWhile(identity) | ||||
|       .compile | ||||
|       .fold(false)(_ || _) | ||||
|  | ||||
|   def unwrapSingle1[F[_]: Sync: Files]( | ||||
|       logger: Logger[F], | ||||
|       dir: Path | ||||
|   ): F[Boolean] = | ||||
|     Files[F] | ||||
|       .list(dir) | ||||
|       .take(2) | ||||
|       .compile | ||||
|       .toList | ||||
|       .flatMap { | ||||
|         case subdir :: Nil => | ||||
|           nonEmpty(subdir) | ||||
|             .flatMap { | ||||
|               case false => false.pure[F] | ||||
|               case true => | ||||
|                 for { | ||||
|                   _ <- Files[F] | ||||
|                     .list(subdir) | ||||
|                     .filter(p => p != dir) | ||||
|                     .evalTap(c => logger.trace(s"Move $c -> ${dir / c.fileName}")) | ||||
|                     .evalMap(child => Files[F].move(child, dir / child.fileName)) | ||||
|                     .compile | ||||
|                     .drain | ||||
|                   _ <- Files[F].delete(subdir) | ||||
|                 } yield true | ||||
|             } | ||||
|  | ||||
|         case _ => | ||||
|           false.pure[F] | ||||
|       } | ||||
| } | ||||

modules/common/src/main/scala/docspell/common/util/Zip.scala (new file, 43 lines)
| @@ -0,0 +1,43 @@ | ||||
| /* | ||||
|  * Copyright 2020 Eike K. & Contributors | ||||
|  * | ||||
|  * SPDX-License-Identifier: AGPL-3.0-or-later | ||||
|  */ | ||||
|  | ||||
| package docspell.common.util | ||||
|  | ||||
| import cats.effect._ | ||||
| import fs2.io.file.Path | ||||
| import fs2.{Pipe, Stream} | ||||
|  | ||||
| import docspell.common.Glob | ||||
| import docspell.logging.Logger | ||||
|  | ||||
| trait Zip[F[_]] { | ||||
|  | ||||
|   def zip(chunkSize: Int = Zip.defaultChunkSize): Pipe[F, (String, Stream[F, Byte]), Byte] | ||||
|  | ||||
|   def zipFiles(chunkSize: Int = Zip.defaultChunkSize): Pipe[F, (String, Path), Byte] | ||||
|  | ||||
|   def unzip( | ||||
|       chunkSize: Int = Zip.defaultChunkSize, | ||||
|       glob: Glob = Glob.all, | ||||
|       targetDir: Option[Path] = None | ||||
|   ): Pipe[F, Byte, Path] | ||||
|  | ||||
|   def unzipFiles( | ||||
|       chunkSize: Int = Zip.defaultChunkSize, | ||||
|       glob: Glob = Glob.all, | ||||
|       targetDir: Path => Option[Path] = _ => None | ||||
|   ): Pipe[F, Path, Path] | ||||
| } | ||||
|  | ||||
| object Zip { | ||||
|   val defaultChunkSize = 64 * 1024 | ||||
|  | ||||
|   def apply[F[_]: Async]( | ||||
|       logger: Option[Logger[F]] = None, | ||||
|       tempDir: Option[Path] = None | ||||
|   ): Zip[F] = | ||||
|     new ZipImpl[F](logger, tempDir) | ||||
| } | ||||
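The new interface makes the logger and temp directory construction-time concerns (`Zip[F](logger, tempDir)`) while the per-call knobs (`chunkSize`, `glob`, `targetDir`) stay as defaulted parameters. Typical use, as a sketch:

    import cats.effect.IO
    import cats.syntax.option._
    import fs2.Stream
    import fs2.io.file.Path
    import docspell.common.Glob
    import docspell.common.util.Zip

    // extract only *.json entries into ./out, emitting the extracted paths
    def extractJson(zipBytes: Stream[IO, Byte]): Stream[IO, Path] =
      zipBytes.through(Zip[IO]().unzip(glob = Glob("*.json"), targetDir = Path("out").some))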
modules/common/src/main/scala/docspell/common/util/ZipImpl.scala (new file, 233 lines)
| @@ -0,0 +1,233 @@ | ||||
| /* | ||||
|  * Copyright 2020 Eike K. & Contributors | ||||
|  * | ||||
|  * SPDX-License-Identifier: AGPL-3.0-or-later | ||||
|  */ | ||||
|  | ||||
| package docspell.common.util | ||||
|  | ||||
| import java.io.BufferedInputStream | ||||
| import java.nio.charset.StandardCharsets | ||||
| import java.util.zip.{ZipEntry, ZipFile, ZipOutputStream} | ||||
|  | ||||
| import scala.jdk.CollectionConverters._ | ||||
| import scala.util.Using | ||||
| import scala.util.Using.Releasable | ||||
|  | ||||
| import cats.effect._ | ||||
| import cats.syntax.all._ | ||||
| import fs2.io.file.{Files, Path} | ||||
| import fs2.{Chunk, Pipe, Stream} | ||||
|  | ||||
| import docspell.common.Glob | ||||
| import docspell.logging.Logger | ||||
|  | ||||
| final private class ZipImpl[F[_]: Async]( | ||||
|     log: Option[Logger[F]], | ||||
|     tempDir: Option[Path] | ||||
| ) extends Zip[F] { | ||||
|   private[this] val logger = log.getOrElse(docspell.logging.Logger.offF[F]) | ||||
|  | ||||
|   private val createTempDir: Resource[F, Path] = | ||||
|     Files[F].tempDirectory(tempDir, "docspell-zip-", None) | ||||
|  | ||||
|   def zip(chunkSize: Int): Pipe[F, (String, Stream[F, Byte]), Byte] = | ||||
|     in => ZipImpl.zipJava(logger, chunkSize, in.through(ZipImpl.deduplicate)) | ||||
|  | ||||
|   def zipFiles(chunkSize: Int): Pipe[F, (String, Path), Byte] = | ||||
|     in => ZipImpl.zipJavaPath(logger, chunkSize, in.through(ZipImpl.deduplicate)) | ||||
|  | ||||
|   def unzip( | ||||
|       chunkSize: Int, | ||||
|       glob: Glob, | ||||
|       targetDir: Option[Path] | ||||
|   ): Pipe[F, Byte, Path] = { input => | ||||
|     Stream | ||||
|       .resource(Files[F].tempFile(tempDir, "", ".zip", None)) | ||||
|       .evalTap(tempFile => input.through(Files[F].writeAll(tempFile)).compile.drain) | ||||
|       .through(unzipFiles(chunkSize, glob, _ => targetDir)) | ||||
|   } | ||||
|  | ||||
|   def unzipFiles( | ||||
|       chunkSize: Int, | ||||
|       glob: Glob, | ||||
|       targetDir: Path => Option[Path] | ||||
|   ): Pipe[F, Path, Path] = | ||||
|     input => | ||||
|       for { | ||||
|         zipArchive <- input | ||||
|         tempDir <- targetDir(zipArchive) | ||||
|           .map(Stream.emit) | ||||
|           .getOrElse(Stream.resource(createTempDir)) | ||||
|         entries <- Stream.eval(Sync[F].blocking { | ||||
|           ZipImpl.unzipZipFile(zipArchive, tempDir, glob) | ||||
|         }) | ||||
|         e <- Stream.chunk(entries) | ||||
|       } yield e | ||||
| } | ||||
|  | ||||
| object ZipImpl { | ||||
|   implicit val zipFileReleasable: Releasable[ZipFile] = | ||||
|     (resource: ZipFile) => resource.close() | ||||
|  | ||||
|   private def unzipZipFile(zip: Path, target: Path, glob: Glob): Chunk[Path] = | ||||
|     Using.resource(new ZipFile(zip.toNioPath.toFile, StandardCharsets.UTF_8)) { zf => | ||||
|       Chunk.iterator( | ||||
|         zf.entries() | ||||
|           .asScala | ||||
|           .filter(ze => !ze.getName.endsWith("/")) | ||||
|           .filter(ze => glob.matchFilenameOrPath(ze.getName)) | ||||
|           .map { ze => | ||||
|             val out = target / ze.getName | ||||
|             out.parent.map(_.toNioPath).foreach { p => | ||||
|               java.nio.file.Files.createDirectories(p) | ||||
|             } | ||||
|             Using.resource(java.nio.file.Files.newOutputStream(out.toNioPath)) { fout => | ||||
|               zf.getInputStream(ze).transferTo(fout) | ||||
|               out | ||||
|             } | ||||
|           } | ||||
|       ) | ||||
|     } | ||||
|  | ||||
| //  private def unzipZipStream( | ||||
| //      zip: InputStream, | ||||
| //      target: Path, | ||||
| //      glob: Glob | ||||
| //  ): List[Path] = | ||||
| //    Using.resource(new ZipInputStream(zip, StandardCharsets.UTF_8)) { zf => | ||||
| //      @annotation.tailrec | ||||
| //      def go(entry: Option[ZipEntry], result: List[Path]): List[Path] = | ||||
| //        entry match { | ||||
| //          case Some(ze) if glob.matchFilenameOrPath(ze.getName) => | ||||
| //            val out = target / ze.getName | ||||
| //            Using.resource(java.nio.file.Files.newOutputStream(out.toNioPath)) { fout => | ||||
| //              zf.transferTo(fout) | ||||
| //            } | ||||
| //            zf.closeEntry() | ||||
| //            go(Option(zf.getNextEntry), out :: result) | ||||
| //          case Some(_) => | ||||
| //            zf.closeEntry() | ||||
| //            go(Option(zf.getNextEntry), result) | ||||
| //          case None => | ||||
| //            result | ||||
| //        } | ||||
| // | ||||
| //      go(Option(zf.getNextEntry), Nil) | ||||
| //    } | ||||
|  | ||||
| //  private def unzipStream2[F[_]: Async]( | ||||
| //      in: InputStream, | ||||
| //      chunkSize: Int, | ||||
| //      glob: Glob | ||||
| //  ): Stream[F, Binary[F]] = { | ||||
| //    val zin = new ZipInputStream(in) | ||||
| // | ||||
| //    val nextEntry = Resource.make(Sync[F].delay(Option(zin.getNextEntry))) { | ||||
| //      case Some(_) => Sync[F].delay(zin.closeEntry()) | ||||
| //      case None    => ().pure[F] | ||||
| //    } | ||||
| // | ||||
| //    Stream | ||||
| //      .resource(nextEntry) | ||||
| //      .repeat | ||||
| //      .unNoneTerminate | ||||
| //      .filter(ze => glob.matchFilenameOrPath(ze.getName)) | ||||
| //      .map { ze => | ||||
| //        val name = ze.getName | ||||
| //        val data = | ||||
| //          fs2.io.readInputStream[F]((zin: InputStream).pure[F], chunkSize, false) | ||||
| //        Binary(name, data) | ||||
| //      } | ||||
| //  } | ||||
|  | ||||
|   private def deduplicate[F[_]: Sync, A]: Pipe[F, (String, A), (String, A)] = { | ||||
|     def makeName(name: String, count: Int): String = | ||||
|       if (count <= 0) name | ||||
|       else | ||||
|         name.lastIndexOf('.') match { | ||||
|           case n if n > 0 => | ||||
|             s"${name.substring(0, n)}_$count${name.substring(n)}" | ||||
|           case _ => | ||||
|             s"${name}_$count" | ||||
|         } | ||||
|  | ||||
|     @annotation.tailrec | ||||
|     def unique( | ||||
|         current: Set[String], | ||||
|         name: String, | ||||
|         counter: Int | ||||
|     ): (Set[String], String) = { | ||||
|       val nextName = makeName(name, counter) | ||||
|       if (current.contains(nextName)) | ||||
|         unique(current, name, counter + 1) | ||||
|       else (current + nextName, nextName) | ||||
|     } | ||||
|  | ||||
|     in => | ||||
|       Stream | ||||
|         .eval(Ref.of[F, Set[String]](Set.empty[String])) | ||||
|         .flatMap { ref => | ||||
|           in.evalMap { element => | ||||
|             ref | ||||
|               .modify(names => unique(names, element._1, 0)) | ||||
|               .map(n => (n, element._2)) | ||||
|           } | ||||
|         } | ||||
|   } | ||||
|  | ||||
|   private def zipJava[F[_]: Async]( | ||||
|       logger: Logger[F], | ||||
|       chunkSize: Int, | ||||
|       entries: Stream[F, (String, Stream[F, Byte])] | ||||
|   ): Stream[F, Byte] = | ||||
|     fs2.io.readOutputStream(chunkSize) { out => | ||||
|       val zip = new ZipOutputStream(out, StandardCharsets.UTF_8) | ||||
|       val writeEntries = | ||||
|         entries.evalMap { case (name, bytes) => | ||||
|           val javaOut = | ||||
|             bytes.through( | ||||
|               fs2.io.writeOutputStream[F](Sync[F].pure(zip), closeAfterUse = false) | ||||
|             ) | ||||
|           val nextEntry = | ||||
|             logger.debug(s"Adding $name to zip file…") *> | ||||
|               Sync[F].delay(zip.putNextEntry(new ZipEntry(name))) | ||||
|           Resource | ||||
|             .make(nextEntry)(_ => Sync[F].delay(zip.closeEntry())) | ||||
|             .use(_ => javaOut.compile.drain) | ||||
|         } | ||||
|       val closeStream = Sync[F].delay(zip.close()) | ||||
|  | ||||
|       writeEntries.onFinalize(closeStream).compile.drain | ||||
|     } | ||||
|  | ||||
|   private def zipJavaPath[F[_]: Async]( | ||||
|       logger: Logger[F], | ||||
|       chunkSize: Int, | ||||
|       entries: Stream[F, (String, Path)] | ||||
|   ): Stream[F, Byte] = | ||||
|     fs2.io.readOutputStream(chunkSize) { out => | ||||
|       val zip = new ZipOutputStream(out, StandardCharsets.UTF_8) | ||||
|       val writeEntries = | ||||
|         entries.evalMap { case (name, file) => | ||||
|           val javaOut = Sync[F].blocking { | ||||
|             val fin = new BufferedInputStream( | ||||
|               java.nio.file.Files.newInputStream(file.toNioPath), | ||||
|               chunkSize | ||||
|             ) | ||||
|             fin.transferTo(zip) | ||||
|             fin.close() | ||||
|           } | ||||
|  | ||||
|           val nextEntry = | ||||
|             logger.debug(s"Adding $name to zip file…") *> | ||||
|               Sync[F].delay(zip.putNextEntry(new ZipEntry(name))) | ||||
|           Resource | ||||
|             .make(nextEntry)(_ => Sync[F].delay(zip.closeEntry())) | ||||
|             .use(_ => javaOut) | ||||
|         } | ||||
|       val closeStream = Sync[F].delay(zip.close()) | ||||
|  | ||||
|       writeEntries.onFinalize(closeStream).compile.drain | ||||
|     } | ||||
| } | ||||
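`deduplicate` keeps a running set of seen names and rewrites collisions by appending a counter before the extension, so duplicate entry names inside one archive cannot overwrite each other on disk. The renaming rule, restated standalone (copied from the implementation above):

    // makeName("a.txt", 0)  == "a.txt"
    // makeName("a.txt", 1)  == "a_1.txt"
    // makeName("README", 2) == "README_2"
    def makeName(name: String, count: Int): String =
      if (count <= 0) name
      else
        name.lastIndexOf('.') match {
          case n if n > 0 => s"${name.substring(0, n)}_$count${name.substring(n)}"
          case _          => s"${name}_$count"
        }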
| @@ -0,0 +1,114 @@ | ||||
| /* | ||||
|  * Copyright 2020 Eike K. & Contributors | ||||
|  * | ||||
|  * SPDX-License-Identifier: AGPL-3.0-or-later | ||||
|  */ | ||||
|  | ||||
| package docspell.common.util | ||||
|  | ||||
| import cats.effect._ | ||||
| import cats.syntax.all._ | ||||
| import fs2.io.file.{Files, Path} | ||||
|  | ||||
| import docspell.logging.TestLoggingConfig | ||||
|  | ||||
| import munit.CatsEffectSuite | ||||
|  | ||||
| class DirectoryTest extends CatsEffectSuite with TestLoggingConfig { | ||||
|   val logger = docspell.logging.getLogger[IO] | ||||
|   val tempDir = ResourceFixture( | ||||
|     Files[IO].tempDirectory(Path("target").some, "directory-test-", None) | ||||
|   ) | ||||
|  | ||||
|   tempDir.test("unwrap directory when non empty") { dir => | ||||
|     for { | ||||
|       _ <- createDirectoryTree(dir, List("test/file1", "test/file2")) | ||||
|       r <- Directory.unwrapSingle1(logger, dir) | ||||
|       files <- Files[IO] | ||||
|         .list(dir) | ||||
|         .map(file => dir.relativize(file).toString) | ||||
|         .compile | ||||
|         .toVector | ||||
|       _ = { | ||||
|         assert(r) | ||||
|         assertEquals(files.sorted, Vector("file1", "file2")) | ||||
|       } | ||||
|     } yield () | ||||
|   } | ||||
|  | ||||
|   tempDir.test("unwrap directory when not empty repeat") { dir => | ||||
|     for { | ||||
|       _ <- createDirectoryTree(dir, List("test/file1", "test/file2")) | ||||
|       r <- Directory.unwrapSingle(logger, dir) | ||||
|       files <- Files[IO] | ||||
|         .list(dir) | ||||
|         .map(file => dir.relativize(file).toString) | ||||
|         .compile | ||||
|         .toVector | ||||
|       _ = { | ||||
|         assert(r) | ||||
|         assertEquals(files.sorted, Vector("file1", "file2")) | ||||
|       } | ||||
|     } yield () | ||||
|   } | ||||
|  | ||||
|   tempDir.test("unwrap nested directory") { dir => | ||||
|     for { | ||||
|       _ <- createDirectoryTree(dir, List("test0/test1/file1", "test0/test1/file2")) | ||||
|       r <- Directory.unwrapSingle(logger, dir) | ||||
|       files <- Files[IO] | ||||
|         .list(dir) | ||||
|         .map(file => dir.relativize(file).toString) | ||||
|         .compile | ||||
|         .toVector | ||||
|       _ = { | ||||
|         assert(r) | ||||
|         assertEquals(files.sorted, Vector("file1", "file2")) | ||||
|       } | ||||
|     } yield () | ||||
|   } | ||||
|  | ||||
|   tempDir.test("do nothing on empty directory") { dir => | ||||
|     for { | ||||
|       r1 <- Directory.unwrapSingle1[IO](logger, dir) | ||||
|       r2 <- Directory.unwrapSingle[IO](logger, dir) | ||||
|       _ = { | ||||
|         assert(!r1) | ||||
|         assert(!r2) | ||||
|       } | ||||
|     } yield () | ||||
|   } | ||||
|  | ||||
|   tempDir.test("do nothing when directory contains more than one entry") { dir => | ||||
|     for { | ||||
|       _ <- createDirectoryTree(dir, List("test1/file1", "file2")) | ||||
|       r1 <- Directory.unwrapSingle1[IO](logger, dir) | ||||
|       r2 <- Directory.unwrapSingle[IO](logger, dir) | ||||
|       _ = { | ||||
|         assert(!r1) | ||||
|         assert(!r2) | ||||
|       } | ||||
|     } yield () | ||||
|   } | ||||
|  | ||||
|   tempDir.test("do nothing when directory contains more than one entry (2)") { dir => | ||||
|     for { | ||||
|       _ <- createDirectoryTree(dir, List("file1", "file2")) | ||||
|       r1 <- Directory.unwrapSingle1[IO](logger, dir) | ||||
|       r2 <- Directory.unwrapSingle[IO](logger, dir) | ||||
|       _ = { | ||||
|         assert(!r1) | ||||
|         assert(!r2) | ||||
|       } | ||||
|     } yield () | ||||
|   } | ||||
|  | ||||
|   def createDirectoryTree(dir: Path, entries: List[String]): IO[Unit] = | ||||
|     entries.traverse_ { name => | ||||
|       val out = dir / name | ||||
|       out.parent | ||||
|         .map(p => Files[IO].createDirectories(p)) | ||||
|         .getOrElse(IO.unit) *> | ||||
|         Files[IO].createFile(out) | ||||
|     } | ||||
| } | ||||
| @@ -7,19 +7,16 @@ | ||||
| package docspell.files | ||||
|  | ||||
| import cats.data.OptionT | ||||
| import cats.effect.Sync | ||||
| import cats.effect.{Async, Sync} | ||||
| import cats.syntax.all._ | ||||
| import fs2.Stream | ||||
| import fs2.Pipe | ||||
| import fs2.io.file.{Files, Path} | ||||
|  | ||||
| import docspell.common.{MimeType, MimeTypeHint} | ||||
|  | ||||
| import io.circe.Encoder | ||||
| import io.circe.syntax._ | ||||
| import docspell.common.{Binary, MimeType, MimeTypeHint} | ||||
|  | ||||
| trait FileSupport { | ||||
|   implicit final class FileOps[F[_]: Files: Sync](self: Path) { | ||||
|     def detectMime: F[Option[MimeType]] = | ||||
|   implicit final class FileOps(self: Path) { | ||||
|     def detectMime[F[_]: Files: Sync]: F[Option[MimeType]] = | ||||
|       Files[F].isReadable(self).flatMap { flag => | ||||
|         OptionT | ||||
|           .whenF(flag) { | ||||
| @@ -32,30 +29,18 @@ trait FileSupport { | ||||
|           .value | ||||
|       } | ||||
|  | ||||
|     def asTextFile(alt: MimeType => F[Unit]): F[Option[Path]] = | ||||
|       OptionT(detectMime).flatMapF { mime => | ||||
|         if (mime.matches(MimeType.text("plain"))) self.some.pure[F] | ||||
|         else alt(mime).as(None: Option[Path]) | ||||
|       }.value | ||||
|  | ||||
|     def readText: F[String] = | ||||
|       Files[F] | ||||
|         .readAll(self) | ||||
|         .through(fs2.text.utf8.decode) | ||||
|         .compile | ||||
|         .string | ||||
|  | ||||
|     def readAll: Stream[F, Byte] = | ||||
|       Files[F].readAll(self) | ||||
|  | ||||
|     def writeJson[A: Encoder](value: A): F[Unit] = | ||||
|       Stream | ||||
|         .emit(value.asJson.noSpaces) | ||||
|         .through(fs2.text.utf8.encode) | ||||
|         .through(Files[F].writeAll(self)) | ||||
|         .compile | ||||
|         .drain | ||||
|     def mimeType[F[_]: Files: Sync]: F[MimeType] = | ||||
|       detectMime.map(_.getOrElse(MimeType.octetStream)) | ||||
|   } | ||||
|  | ||||
|   def detectMime[F[_]: Sync]: Pipe[F, Binary[F], Binary[F]] = | ||||
|     _.evalMap { bin => | ||||
|       val hint = MimeTypeHint.filename(bin.name).withAdvertised(bin.mime.asString) | ||||
|       TikaMimetype.detect[F](bin.data, hint).map(mt => bin.copy(mime = mt)) | ||||
|     } | ||||
|  | ||||
|   def toBinaryWithMime[F[_]: Async]: Pipe[F, Path, Binary[F]] = | ||||
|     _.evalMap(file => file.mimeType.map(mt => Binary(file).copy(mime = mt))) | ||||
| } | ||||
|  | ||||
| object FileSupport extends FileSupport | ||||
|   | ||||
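`FileSupport` shrinks to MIME detection: per-`Path` via the `FileOps` syntax and stream-wide via the two pipes. A sketch of the new pipe in use:

    import cats.effect.IO
    import fs2.Stream
    import fs2.io.file.Path
    import docspell.common.Binary
    import docspell.files.FileSupport._

    def binariesWithMime(files: Stream[IO, Path]): Stream[IO, Binary[IO]] =
      files.through(toBinaryWithMime[IO])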
| @@ -1,180 +0,0 @@ | ||||
| /* | ||||
|  * Copyright 2020 Eike K. & Contributors | ||||
|  * | ||||
|  * SPDX-License-Identifier: AGPL-3.0-or-later | ||||
|  */ | ||||
|  | ||||
| package docspell.files | ||||
|  | ||||
| import java.io.InputStream | ||||
| import java.nio.charset.StandardCharsets | ||||
| import java.util.zip.{ZipEntry, ZipInputStream, ZipOutputStream} | ||||
|  | ||||
| import cats.data.OptionT | ||||
| import cats.effect._ | ||||
| import cats.implicits._ | ||||
| import fs2.io.file.{Files, Path} | ||||
| import fs2.{Pipe, Stream} | ||||
|  | ||||
| import docspell.common.Binary | ||||
| import docspell.common.Glob | ||||
| import docspell.logging.Logger | ||||
|  | ||||
| object Zip { | ||||
|  | ||||
|   def zip[F[_]: Async]( | ||||
|       logger: Logger[F], | ||||
|       chunkSize: Int | ||||
|   ): Pipe[F, (String, Stream[F, Byte]), Byte] = | ||||
|     in => zipJava(logger, chunkSize, in.through(deduplicate)) | ||||
|  | ||||
|   def unzip[F[_]: Async]( | ||||
|       chunkSize: Int, | ||||
|       glob: Glob | ||||
|   ): Pipe[F, Byte, Binary[F]] = | ||||
|     s => unzipStream[F](chunkSize, glob)(s) | ||||
|  | ||||
|   def unzipStream[F[_]: Async](chunkSize: Int, glob: Glob)( | ||||
|       data: Stream[F, Byte] | ||||
|   ): Stream[F, Binary[F]] = | ||||
|     data | ||||
|       .through(fs2.io.toInputStream[F]) | ||||
|       .flatMap(in => unzipJava(in, chunkSize, glob)) | ||||
|  | ||||
|   def saveTo[F[_]: Async]( | ||||
|       logger: Logger[F], | ||||
|       targetDir: Path, | ||||
|       moveUp: Boolean | ||||
|   ): Pipe[F, Binary[F], Path] = | ||||
|     binaries => | ||||
|       binaries | ||||
|         .filter(e => !e.name.endsWith("/")) | ||||
|         .evalMap { entry => | ||||
|           val out = targetDir / entry.name | ||||
|           val createParent = | ||||
|             OptionT | ||||
|               .fromOption[F](out.parent) | ||||
|               .flatMapF(parent => | ||||
|                 Files[F] | ||||
|                   .exists(parent) | ||||
|                   .map(flag => Option.when(!flag)(parent)) | ||||
|               ) | ||||
|               .semiflatMap(p => Files[F].createDirectories(p)) | ||||
|               .getOrElse(()) | ||||
|  | ||||
|           logger.trace(s"Unzip ${entry.name} -> $out") *> | ||||
|             createParent *> | ||||
|             entry.data.through(Files[F].writeAll(out)).compile.drain | ||||
|         } | ||||
|         .drain ++ Stream | ||||
|         .eval(if (moveUp) moveContentsUp(logger)(targetDir) else ().pure[F]) | ||||
|         .as(targetDir) | ||||
|  | ||||
|   private def moveContentsUp[F[_]: Sync: Files](logger: Logger[F])(dir: Path): F[Unit] = | ||||
|     Files[F] | ||||
|       .list(dir) | ||||
|       .take(2) | ||||
|       .compile | ||||
|       .toList | ||||
|       .flatMap { | ||||
|         case subdir :: Nil => | ||||
|           Files[F].isDirectory(subdir).flatMap { | ||||
|             case false => ().pure[F] | ||||
|             case true => | ||||
|               Files[F] | ||||
|                 .list(subdir) | ||||
|                 .filter(p => p != dir) | ||||
|                 .evalTap(c => logger.trace(s"Move $c -> ${dir / c.fileName}")) | ||||
|                 .evalMap(child => Files[F].move(child, dir / child.fileName)) | ||||
|                 .compile | ||||
|                 .drain | ||||
|           } | ||||
|  | ||||
|         case _ => | ||||
|           ().pure[F] | ||||
|       } | ||||
|  | ||||
|   def unzipJava[F[_]: Async]( | ||||
|       in: InputStream, | ||||
|       chunkSize: Int, | ||||
|       glob: Glob | ||||
|   ): Stream[F, Binary[F]] = { | ||||
|     val zin = new ZipInputStream(in) | ||||
|  | ||||
|     val nextEntry = Resource.make(Sync[F].delay(Option(zin.getNextEntry))) { | ||||
|       case Some(_) => Sync[F].delay(zin.closeEntry()) | ||||
|       case None    => ().pure[F] | ||||
|     } | ||||
|  | ||||
|     Stream | ||||
|       .resource(nextEntry) | ||||
|       .repeat | ||||
|       .unNoneTerminate | ||||
|       .filter(ze => glob.matchFilenameOrPath(ze.getName())) | ||||
|       .map { ze => | ||||
|         val name = ze.getName() | ||||
|         val data = | ||||
|           fs2.io.readInputStream[F]((zin: InputStream).pure[F], chunkSize, false) | ||||
|         Binary(name, data) | ||||
|       } | ||||
|   } | ||||
|  | ||||
|   private def deduplicate[F[_]: Sync, A]: Pipe[F, (String, A), (String, A)] = { | ||||
|     def makeName(name: String, count: Int): String = | ||||
|       if (count <= 0) name | ||||
|       else | ||||
|         name.lastIndexOf('.') match { | ||||
|           case n if n > 0 => | ||||
|             s"${name.substring(0, n)}_$count${name.substring(n)}" | ||||
|           case _ => | ||||
|             s"${name}_$count" | ||||
|         } | ||||
|  | ||||
|     def unique( | ||||
|         current: Set[String], | ||||
|         name: String, | ||||
|         counter: Int | ||||
|     ): (Set[String], String) = { | ||||
|       val nextName = makeName(name, counter) | ||||
|       if (current.contains(nextName)) | ||||
|         unique(current, name, counter + 1) | ||||
|       else (current + nextName, nextName) | ||||
|     } | ||||
|  | ||||
|     in => | ||||
|       Stream | ||||
|         .eval(Ref.of[F, Set[String]](Set.empty[String])) | ||||
|         .flatMap { ref => | ||||
|           in.evalMap { element => | ||||
|             ref | ||||
|               .modify(names => unique(names, element._1, 0)) | ||||
|               .map(n => (n, element._2)) | ||||
|           } | ||||
|         } | ||||
|   } | ||||
|  | ||||
|   def zipJava[F[_]: Async]( | ||||
|       logger: Logger[F], | ||||
|       chunkSize: Int, | ||||
|       entries: Stream[F, (String, Stream[F, Byte])] | ||||
|   ): Stream[F, Byte] = | ||||
|     fs2.io.readOutputStream(chunkSize) { out => | ||||
|       val zip = new ZipOutputStream(out, StandardCharsets.UTF_8) | ||||
|       val writeEntries = | ||||
|         entries.evalMap { case (name, bytes) => | ||||
|           val javaOut = | ||||
|             bytes.through( | ||||
|               fs2.io.writeOutputStream[F](Sync[F].pure(zip), closeAfterUse = false) | ||||
|             ) | ||||
|           val nextEntry = | ||||
|             logger.debug(s"Adding $name to zip file…") *> | ||||
|               Sync[F].delay(zip.putNextEntry(new ZipEntry(name))) | ||||
|           Resource | ||||
|             .make(nextEntry)(_ => Sync[F].delay(zip.closeEntry())) | ||||
|             .use(_ => javaOut.compile.drain) | ||||
|         } | ||||
|       val closeStream = Sync[F].delay(zip.close()) | ||||
|  | ||||
|       writeEntries.onFinalize(closeStream).compile.drain | ||||
|     } | ||||
| } | ||||
| @@ -7,10 +7,12 @@ | ||||
| package docspell.files | ||||
|  | ||||
| import cats.effect._ | ||||
| import cats.implicits._ | ||||
| import cats.syntax.option._ | ||||
| import fs2.Stream | ||||
| import fs2.io.file.{Files, Path} | ||||
|  | ||||
| import docspell.common.Glob | ||||
| import docspell.common.syntax.file._ | ||||
| import docspell.common.util.Zip | ||||
| import docspell.logging.TestLoggingConfig | ||||
|  | ||||
| import munit._ | ||||
| @@ -21,29 +23,101 @@ class ZipTest extends CatsEffectSuite with TestLoggingConfig { | ||||
|     Files[IO].tempDirectory(Path("target").some, "zip-test-", None) | ||||
|   ) | ||||
|  | ||||
|   test("unzip") { | ||||
|   tempDir.test("unzip") { dir => | ||||
|     val zipFile = ExampleFiles.letters_zip.readURL[IO](8192) | ||||
|     val unzip = zipFile.through(Zip.unzip(8192, Glob.all)) | ||||
|     val unzip: Stream[IO, Path] = zipFile | ||||
|       .through(Zip[IO](logger.some, dir.some).unzip(8192)) | ||||
|  | ||||
|     unzip | ||||
|       .evalMap { entry => | ||||
|         val x = entry.data.map(_ => 1).foldMonoid.compile.lastOrError | ||||
|         x.map { size => | ||||
|           if (entry.name.endsWith(".pdf")) { | ||||
|             assertEquals(entry.name, "letter-de.pdf") | ||||
|             assertEquals(size, 34815) | ||||
|           } else { | ||||
|             assertEquals(entry.name, "letter-en.txt") | ||||
|             assertEquals(size, 1131) | ||||
|           } | ||||
|     (for { | ||||
|       file <- unzip | ||||
|       length <- Stream.eval(Files[IO].size(file)) | ||||
|       sha <- Stream.eval(file.sha256Hex[IO]) | ||||
|       _ = { | ||||
|         if (file.name == "letter-de.pdf") { | ||||
|           assertEquals(length, 34815L) | ||||
|           assertEquals( | ||||
|             sha, | ||||
|             "299c15429ce327099c322b36caaec56e7a6034106531c5d1b3fd085467a8d495" | ||||
|           ) | ||||
|         } else { | ||||
|           assertEquals(file.name, "letter-en.txt") | ||||
|           assertEquals(length, 1131L) | ||||
|           assertEquals( | ||||
|             sha, | ||||
|             "55eca47c65084126d7c3bbce941cadff0f642a7287ff8e0f3fc9c2c33a4bb7f0" | ||||
|           ) | ||||
|         } | ||||
|       } | ||||
|     } yield ()).compile.drain | ||||
|   } | ||||
|  | ||||
|   tempDir.test("unzip directories and files") { dir => | ||||
|     val zipFile = ExampleFiles.zip_dirs_zip.readURL[IO](8192) | ||||
|     val unzip: Stream[IO, Path] = zipFile | ||||
|       .through(Zip[IO](logger.some, dir.some).unzip(8192)) | ||||
|  | ||||
|     val entries = | ||||
|       for { | ||||
|         file <- unzip | ||||
|         sha <- Stream.eval(file.sha256Hex[IO]) | ||||
|       } yield (file.name, file, sha) | ||||
|  | ||||
|     val expectedSha = | ||||
|       "5891b5b522d5df086d0ff0b110fbd9d21bb4fc7163af34d08286a2e846f6be03" | ||||
|  | ||||
|     entries | ||||
|       .map { | ||||
|         case ("file1.txt", file, realSha) => | ||||
|           assertEquals(realSha, expectedSha) | ||||
|           val relFile = dir.relativize(file).dropLeft(1) | ||||
|           assertEquals(relFile.toString, "file1.txt") | ||||
|  | ||||
|         case ("file2.txt", file, realSha) => | ||||
|           assertEquals(realSha, expectedSha) | ||||
|           val relFile = dir.relativize(file).dropLeft(1) | ||||
|           assertEquals(relFile.toString, "dir1/file2.txt") | ||||
|  | ||||
|         case ("file3.txt", file, realSha) => | ||||
|           assertEquals(realSha, expectedSha) | ||||
|           val relFile = dir.relativize(file).dropLeft(1) | ||||
|           assertEquals(relFile.toString, "dir1/dir11/file3.txt") | ||||
|  | ||||
|         case ("file4.txt", file, realSha) => | ||||
|           assertEquals(realSha, expectedSha) | ||||
|           val relFile = dir.relativize(file).dropLeft(1) | ||||
|           assertEquals(relFile.toString, "dir2/file4.txt") | ||||
|  | ||||
|         case (name, _, _) => | ||||
|           fail(s"Unexpected file: $name") | ||||
|       } | ||||
|       .compile | ||||
|       .drain | ||||
|   } | ||||
|  | ||||
|   tempDir.test("unzipTo directory tree") { _ => | ||||
|     // val zipFile = ExampleFiles.zip_dirs_zip.readURL[IO](8192) | ||||
|     // zipFile.through(Zip.unzip(G)) | ||||
|   } | ||||
| //  tempDir.test("test runtime") { _ => | ||||
| //    val archive = Path("./local/large-archive.zip") | ||||
| // | ||||
| //    for { | ||||
| // | ||||
| //      timer1 <- Duration.stopTime[IO] | ||||
| //      es1 <- Files[IO] | ||||
| //        .readAll(archive) | ||||
| //        .through(Zip[IO]().unzip(64 * 1024)) | ||||
| //        .compile | ||||
| //        .toVector | ||||
| //      duration1 <- timer1 | ||||
| // | ||||
| //      timer2 <- Duration.stopTime[IO] | ||||
| //      es2 <- fs2.Stream | ||||
| //        .emit(archive) | ||||
| //        .covary[IO] | ||||
| //        .through(Zip[IO]().unzipFiles(64 * 1024)) | ||||
| //        .compile | ||||
| //        .toVector | ||||
| //      duration2 <- timer2 | ||||
| // | ||||
| //      _ <- IO.println(s">>>>1. ${duration1.formatExact}, entries: $es1") | ||||
| //      _ <- IO.println(s">>>>2. ${duration2.formatExact}, entries: $es2") | ||||
| //    } yield () | ||||
| //  } | ||||
| } | ||||
|   | ||||
| @@ -15,14 +15,14 @@ import docspell.addons.{AddonTriggerType, InputEnv, Middleware} | ||||
| import docspell.backend.joex.AddonOps.ExecResult | ||||
| import docspell.backend.joex.{AddonOps, LoggerExtension} | ||||
| import docspell.common._ | ||||
| import docspell.files.FileSupport | ||||
| import docspell.common.syntax.file._ | ||||
| import docspell.joex.process.ItemData | ||||
| import docspell.logging.Logger | ||||
| import docspell.scheduler.Task | ||||
| import docspell.store.Store | ||||
| import docspell.store.queries.QAttachment | ||||
|  | ||||
| object GenericItemAddonTask extends LoggerExtension with FileSupport { | ||||
| object GenericItemAddonTask extends LoggerExtension { | ||||
|  | ||||
|   private val itemSubdir = "item" | ||||
|   private val itemDataJson = s"$itemSubdir/item-data.json" | ||||
|   | ||||
| @@ -16,7 +16,7 @@ import docspell.backend.ops.ODownloadAll | ||||
| import docspell.backend.ops.ODownloadAll.model.DownloadSummary | ||||
| import docspell.backend.task.DownloadZipArgs | ||||
| import docspell.common._ | ||||
| import docspell.files.Zip | ||||
| import docspell.common.util.Zip | ||||
| import docspell.scheduler.Task | ||||
| import docspell.store.Store | ||||
| import docspell.store.queries.{ItemFileMeta, QItem} | ||||
| @@ -50,7 +50,7 @@ object DownloadZipTask { | ||||
|  | ||||
|       val storeZipFile = | ||||
|         allFiles | ||||
|           .through(Zip.zip(ctx.logger, chunkSize)) | ||||
|           .through(Zip[F](ctx.logger.some).zip(chunkSize)) | ||||
|           .through( | ||||
|             store.fileRepo.save( | ||||
|               ctx.args.accountId.collective, | ||||
|   | ||||
| @@ -14,7 +14,7 @@ import fs2.Stream | ||||
|  | ||||
| import docspell.backend.JobFactory | ||||
| import docspell.common._ | ||||
| import docspell.files.Zip | ||||
| import docspell.common.util.Zip | ||||
| import docspell.logging.Logger | ||||
| import docspell.scheduler._ | ||||
| import docspell.store.Store | ||||
| @@ -44,7 +44,7 @@ object MultiUploadArchiveTask { | ||||
|                 extractZip(store, ctx.args)(file) | ||||
|                   .evalTap(entry => | ||||
|                     ctx.logger.debug( | ||||
|                       s"Create job for entry: ${entry.files.flatMap(_.name)}" | ||||
|                       s"Create job for entry: ${entry.files.flatMap(_.name).mkString(", ")}" | ||||
|                     ) | ||||
|                   ) | ||||
|                   .evalMap(makeJob[F](ctx, jobStore)) | ||||
| @@ -109,7 +109,8 @@ object MultiUploadArchiveTask { | ||||
|   )(file: ProcessItemArgs.File): Stream[F, ProcessItemArgs] = | ||||
|     store.fileRepo | ||||
|       .getBytes(file.fileMetaId) | ||||
|       .through(Zip.unzip[F](8192, args.meta.fileFilter.getOrElse(Glob.all))) | ||||
|       .through(Zip[F]().unzip(glob = args.meta.fileFilter.getOrElse(Glob.all))) | ||||
|       .through(Binary.toBinary[F]) | ||||
|       .flatMap { entry => | ||||
|         val hint = MimeTypeHint(entry.name.some, entry.mime.asString.some) | ||||
|         entry.data | ||||
|   | ||||
| @@ -16,7 +16,7 @@ import cats.kernel.Order | ||||
| import fs2.Stream | ||||
|  | ||||
| import docspell.common._ | ||||
| import docspell.files.Zip | ||||
| import docspell.common.util.Zip | ||||
| import docspell.joex.mail._ | ||||
| import docspell.scheduler._ | ||||
| import docspell.store.Store | ||||
| @@ -146,7 +146,8 @@ object ExtractArchive { | ||||
|     val glob = ctx.args.meta.fileFilter.getOrElse(Glob.all) | ||||
|     ctx.logger.debug(s"Filtering zip entries with '${glob.asString}'") *> | ||||
|       zipData | ||||
|         .through(Zip.unzip[F](8192, glob)) | ||||
|         .through(Zip[F](ctx.logger.some).unzip(glob = glob)) | ||||
|         .through(Binary.toBinary[F]) | ||||
|         .zipWithIndex | ||||
|         .flatMap(handleEntry(ctx, store, ra, pos, archive, None)) | ||||
|         .foldMonoid | ||||
|   | ||||