Improve performance of zip/unzip

Adds tests and includes some cleanup
This commit is contained in:
eikek
2022-06-18 16:37:38 +02:00
parent 483dbf5d2b
commit 6cef9d4f07
24 changed files with 711 additions and 293 deletions

View File

@ -9,9 +9,14 @@ package docspell.common
import java.nio.charset.Charset
import java.nio.charset.StandardCharsets
import cats.data.OptionT
import cats.effect._
import cats.syntax.all._
import fs2.io.file.{Files, Path}
import fs2.{Chunk, Pipe, Stream}
import docspell.logging.Logger
import scodec.bits.ByteVector
final case class Binary[F[_]](name: String, mime: MimeType, data: Stream[F, Byte]) {
@ -34,6 +39,9 @@ final case class Binary[F[_]](name: String, mime: MimeType, data: Stream[F, Byte
object Binary {
/** Creates a `Binary` from a file on disk.
  *
  * The binary is named after the last segment of `file` and its data is the stream of
  * the file's bytes; construction is delegated to the `(name, data)` overload.
  */
def apply[F[_]: Async](file: Path): Binary[F] = {
  val name = file.fileName.toString
  val bytes = Files[F].readAll(file)
  Binary(name, bytes)
}
/** Creates a `Binary` with the given name and data, using `MimeType.octetStream` as
  * its mime type.
  */
def apply[F[_]](name: String, data: Stream[F, Byte]): Binary[F] =
  new Binary[F](name, MimeType.octetStream, data)
@ -65,6 +73,38 @@ object Binary {
/** Reads the complete stream into memory and returns its bytes as one `ByteVector`. */
def loadAllBytes[F[_]: Sync](data: Stream[F, Byte]): F[ByteVector] =
  data.compile.foldChunks(ByteVector.empty)((acc, chunk) => acc ++ chunk.toByteVector)
/** A pipe turning every incoming path into a `Binary` backed by that file. */
def toBinary[F[_]: Async]: Pipe[F, Path, Binary[F]] =
  paths => paths.map(path => Binary[F](path))
/** Save one or more binaries to a target directory.
  *
  * Binaries whose name ends with `/` are skipped. Every other binary is written to
  * `targetDir / name`; a missing parent directory is created first. After all binaries
  * have been written, `targetDir` is emitted as the single element of the resulting
  * stream.
  *
  * @param logger
  *   receives a trace message for every file written
  * @param targetDir
  *   the directory to write into
  */
def saveTo[F[_]: Async](
    logger: Logger[F],
    targetDir: Path
): Pipe[F, Binary[F], Path] =
  binaries => {
    val writeAll =
      binaries
        .filter(e => !e.name.endsWith("/"))
        .evalMap { entry =>
          // NOTE(review): the name is resolved against targetDir unchecked; a name
          // containing `..` could end up outside of targetDir - confirm that callers
          // only pass trusted names.
          val out = targetDir / entry.name
          val createParent =
            OptionT
              .fromOption[F](out.parent)
              .flatMapF(parent =>
                Files[F]
                  .exists(parent)
                  .map(flag => Option.when(!flag)(parent))
              )
              .semiflatMap(p => Files[F].createDirectories(p))
              .getOrElse(())

          logger.trace(s"Copy ${entry.name} -> $out") *>
            createParent *>
            entry.data.through(Files[F].writeAll(out)).compile.drain
        }
        .drain

    // A drained stream has no elements, so mapping over it (`.drain.as(targetDir)`)
    // would never emit anything. Append the directory explicitly instead.
    writeAll ++ Stream.emit(targetDir)
  }
// This is a copy from org.http4s.util
// Http4s is licensed under the Apache License 2.0
private object util {

View File

@ -6,17 +6,49 @@
package docspell.common.syntax
import fs2.io.file.Path
import java.nio.file.{Files => NioFiles}
import cats.effect._
import fs2.Stream
import fs2.io.file.{Files, Path}
import docspell.common.syntax.stream._
import io.circe.Encoder
import io.circe.syntax._
/** Extension methods for `fs2.io.file.Path`. */
trait FileSyntax {

  implicit final class PathOps(self: Path) {

    /** The absolute form of this path. */
    def absolutePath: Path =
      self.absolute

    /** The absolute form of this path rendered as a string. */
    def absolutePathAsString: String =
      absolutePath.toString

    /** The last segment of this path as a string. */
    def name: String = self.fileName.toString

    /** The extension of this path without the leading dot. */
    def extension: String = self.extName.stripPrefix(".")

    /** Removes the first `n` name elements of this path.
      *
      * Delegates to `java.nio.file.Path.subpath(n, nameCount)`, which throws an
      * `IllegalArgumentException` unless `0 <= n < nameCount`.
      */
    def dropLeft(n: Int): Path = {
      val nio = self.toNioPath
      Path.fromNioPath(nio.subpath(n, nio.getNameCount))
    }

    /** Reads the whole file into a string using `java.nio.file.Files.readString`. */
    def readString[F[_]: Sync]: F[String] = Sync[F].blocking(
      NioFiles.readString(self.toNioPath)
    )

    /** The SHA-256 checksum of the file contents as a hex string. */
    def sha256Hex[F[_]: Files: Sync]: F[String] =
      Files[F].readAll(self).sha256Hex

    /** The contents of the file as a stream of bytes. */
    def readAll[F[_]: Files]: Stream[F, Byte] =
      Files[F].readAll(self)

    /** Encodes `value` as compact JSON (no spaces) and writes it UTF-8 encoded to this
      * file.
      */
    def writeJson[A: Encoder, F[_]: Files: Sync](value: A): F[Unit] =
      Stream
        .emit(value.asJson.noSpaces)
        .through(fs2.text.utf8.encode)
        .through(Files[F].writeAll(self))
        .compile
        .drain
  }
}

View File

@ -12,6 +12,7 @@ import fs2.Stream
import io.circe._
import io.circe.parser._
import scodec.bits.ByteVector
trait StreamSyntax {
implicit class StringStreamOps[F[_]](s: Stream[F, String]) {
@ -24,4 +25,14 @@ trait StreamSyntax {
} yield value
)
}
/** Extension methods for streams of bytes. */
implicit final class ByteStreamSyntax[F[_]](self: Stream[F, Byte]) {

  /** Runs the stream through SHA-256 and returns the digest as a hex string. */
  def sha256Hex(implicit F: Sync[F]): F[String] = {
    val digest = self.through(fs2.hash.sha256)
    digest.chunks
      .map(_.toByteVector)
      .compile
      .fold(ByteVector.empty)((acc, bytes) => acc ++ bytes)
      .map(_.toHex)
  }
}
}
/** Object form of the `StreamSyntax` trait, so its members can be imported directly. */
object StreamSyntax extends StreamSyntax

View File

@ -8,6 +8,11 @@ package docspell.common
/** Entry point for the syntax extensions of this package. Each `val` aliases one syntax
  * object so that a single group can be imported (e.g. `syntax.file._`), while `all`
  * mixes every syntax trait together for a single wildcard import.
  */
package object syntax {
val either = EitherSyntax
val stream = StreamSyntax
val string = StringSyntax
val file = FileSyntax
// Combines all syntax traits listed above.
object all extends EitherSyntax with StreamSyntax with StringSyntax with FileSyntax
}

View File

@ -0,0 +1,109 @@
/*
* Copyright 2020 Eike K. & Contributors
*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
package docspell.common.util
import cats.effect._
import cats.syntax.all._
import cats.{Applicative, Monad}
import fs2.Stream
import fs2.io.file.{Files, Path, PosixPermissions}
import docspell.logging.Logger
/** Utility functions for directories. */
object Directory {
/** Creates the directory `dir` with permissions `777` and returns `dir`. */
def create[F[_]: Files: Applicative](dir: Path): F[Path] = {
  val permissions = PosixPermissions.fromOctal("777")
  Files[F].createDirectories(dir, permissions).map(_ => dir)
}
/** Creates `dir` and then every directory in `dirs`, in the given order. */
def createAll[F[_]: Files: Applicative](dir: Path, dirs: Path*): F[Unit] = {
  val all = (dir +: dirs).toList
  all.traverse_(d => Files[F].createDirectories(d))
}
/** Returns `true` if `dir` is a directory that contains at least one entry.
  *
  * The directory is only listed after it has been confirmed to be a directory, so a
  * regular file or a missing path yields `false` instead of a failed effect.
  */
def nonEmpty[F[_]: Files: Sync](dir: Path): F[Boolean] =
  Files[F]
    .isDirectory(dir)
    .ifM(
      // taking one element is enough to know whether anything is in there
      Files[F].list(dir).take(1).compile.last.map(_.isDefined),
      false.pure[F]
    )
/** The negation of `nonEmpty`: `true` whenever `nonEmpty(dir)` is `false`. */
def isEmpty[F[_]: Files: Sync](dir: Path): F[Boolean] =
  nonEmpty(dir).map(!_)
/** Creates `parent` if necessary and then a temporary directory inside of it, managed
  * as a `Resource`.
  */
def temp[F[_]: Files](parent: Path, prefix: String): Resource[F, Path] =
  Resource
    .eval(Files[F].createDirectories(parent))
    .flatMap(_ => mkTemp(parent, prefix))
/** Creates `parent` if necessary and then a temporary directory inside of it. In
  * contrast to `temp`, the directory is returned as a plain value and not as a
  * `Resource`.
  */
def createTemp[F[_]: Files: Monad](
    parent: Path,
    prefix: String
): F[Path] =
  Files[F]
    .createDirectories(parent)
    .flatMap(_ => mkTemp_(parent, prefix))
/** A temporary directory below `parent` with permissions `777`, as a `Resource`. */
private def mkTemp[F[_]: Files](parent: Path, prefix: String): Resource[F, Path] = {
  val permissions = PosixPermissions.fromOctal("777")
  Files[F].tempDirectory(Some(parent), prefix, permissions)
}
/** Creates a temporary directory below `parent` with permissions `777`. */
private def mkTemp_[F[_]: Files](parent: Path, prefix: String): F[Path] = {
  val permissions = PosixPermissions.fromOctal("777")
  Files[F].createTempDirectory(Some(parent), prefix, permissions)
}
/** Repeatedly applies `unwrapSingle1` to `dir` until it reports that nothing was
  * unwrapped: as long as `dir` contains exactly one non-empty directory, the contents
  * of that directory are moved up into `dir` and the directory is deleted.
  *
  * @return
  *   `true` if at least one directory was unwrapped, `false` otherwise
  */
def unwrapSingle[F[_]: Sync: Files](logger: Logger[F], dir: Path): F[Boolean] = {
  val step = unwrapSingle1(logger, dir)
  Stream
    .repeatEval(step)
    .takeWhile(unwrapped => unwrapped)
    .compile
    .last
    // only `true` values pass `takeWhile`; no element means nothing was unwrapped
    .map(_.getOrElse(false))
}
/** Performs a single unwrap step: if `dir` contains exactly one entry and that entry is
  * a non-empty directory, its children are moved into `dir` and the then empty
  * directory is deleted.
  *
  * @return
  *   `true` if the contents were moved, `false` if `dir` was left untouched
  */
def unwrapSingle1[F[_]: Sync: Files](
    logger: Logger[F],
    dir: Path
): F[Boolean] = {
  // moves all children of `single` one level up and removes `single` afterwards
  def moveUp(single: Path): F[Boolean] = {
    val moveChildren =
      Files[F]
        .list(single)
        .filter(_ != dir)
        .evalTap(child => logger.trace(s"Move $child -> ${dir / child.fileName}"))
        .evalMap(child => Files[F].move(child, dir / child.fileName))
        .compile
        .drain

    moveChildren.flatMap(_ => Files[F].delete(single)).map(_ => true)
  }

  // two entries are enough to distinguish "exactly one" from "more than one"
  Files[F].list(dir).take(2).compile.toList.flatMap {
    case List(single) =>
      nonEmpty(single).ifM(moveUp(single), false.pure[F])
    case _ =>
      false.pure[F]
  }
}
}

View File

@ -0,0 +1,43 @@
/*
* Copyright 2020 Eike K. & Contributors
*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
package docspell.common.util
import cats.effect._
import fs2.io.file.Path
import fs2.{Pipe, Stream}
import docspell.common.Glob
import docspell.logging.Logger
/** Creating and extracting zip archives as fs2 pipes. Instances are obtained via
  * `Zip.apply`. All `chunkSize` parameters default to `Zip.defaultChunkSize`.
  */
trait Zip[F[_]] {
/** Creates a zip archive from pairs of entry name and entry content and emits the bytes
  * of the archive.
  */
def zip(chunkSize: Int = Zip.defaultChunkSize): Pipe[F, (String, Stream[F, Byte]), Byte]
/** Like `zip`, but the content of each entry is read from the given file. */
def zipFiles(chunkSize: Int = Zip.defaultChunkSize): Pipe[F, (String, Path), Byte]
/** Extracts the zip archive given as a stream of bytes and emits the paths of the
  * extracted files. Only entries matching `glob` are extracted.
  *
  * With `ZipImpl`, a `targetDir` of `None` means that a temporary directory is
  * allocated as a stream resource, so the emitted paths should be consumed while the
  * stream is running.
  */
def unzip(
chunkSize: Int = Zip.defaultChunkSize,
glob: Glob = Glob.all,
targetDir: Option[Path] = None
): Pipe[F, Byte, Path]
/** Like `unzip`, but for a stream of zip files on disk. The target directory is chosen
  * per archive by applying `targetDir` to the path of the archive.
  */
def unzipFiles(
chunkSize: Int = Zip.defaultChunkSize,
glob: Glob = Glob.all,
targetDir: Path => Option[Path] = _ => None
): Pipe[F, Path, Path]
}
object Zip {

  /** The chunk size used when none is given: 64 KiB. */
  val defaultChunkSize: Int = 64 * 1024

  /** Creates a `Zip` instance.
    *
    * @param logger
    *   optional logger that is handed to the implementation
    * @param tempDir
    *   optional directory that is handed to the implementation for its temporary files
    */
  def apply[F[_]: Async](
      logger: Option[Logger[F]] = None,
      tempDir: Option[Path] = None
  ): Zip[F] = {
    val impl = new ZipImpl[F](logger, tempDir)
    impl
  }
}

View File

@ -0,0 +1,233 @@
/*
* Copyright 2020 Eike K. & Contributors
*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
package docspell.common.util
import java.io.BufferedInputStream
import java.nio.charset.StandardCharsets
import java.util.zip.{ZipEntry, ZipFile, ZipOutputStream}
import scala.jdk.CollectionConverters._
import scala.util.Using
import scala.util.Using.Releasable
import cats.effect._
import cats.syntax.all._
import fs2.io.file.{Files, Path}
import fs2.{Chunk, Pipe, Stream}
import docspell.common.Glob
import docspell.logging.Logger
/** `Zip` implementation on top of `java.util.zip`.
  *
  * @param log
  *   optional logger; without one, `Logger.offF` is used
  * @param tempDir
  *   optional parent directory for temporary files and directories
  */
final private class ZipImpl[F[_]: Async](
    log: Option[Logger[F]],
    tempDir: Option[Path]
) extends Zip[F] {
  private[this] val logger = log.getOrElse(docspell.logging.Logger.offF[F])

  /** A temporary directory used as extraction target if no target is given. */
  private val createTempDir: Resource[F, Path] =
    Files[F].tempDirectory(tempDir, "docspell-zip-", None)

  def zip(chunkSize: Int): Pipe[F, (String, Stream[F, Byte]), Byte] =
    entries => {
      val uniquelyNamed = entries.through(ZipImpl.deduplicate)
      ZipImpl.zipJava(logger, chunkSize, uniquelyNamed)
    }

  def zipFiles(chunkSize: Int): Pipe[F, (String, Path), Byte] =
    entries => {
      val uniquelyNamed = entries.through(ZipImpl.deduplicate)
      ZipImpl.zipJavaPath(logger, chunkSize, uniquelyNamed)
    }

  def unzip(
      chunkSize: Int,
      glob: Glob,
      targetDir: Option[Path]
  ): Pipe[F, Byte, Path] = { bytes =>
    // the archive is first stored in a temporary file and then extracted from there
    val archiveFile = Files[F].tempFile(tempDir, "", ".zip", None)
    Stream
      .resource(archiveFile)
      .evalTap(file => bytes.through(Files[F].writeAll(file)).compile.drain)
      .through(unzipFiles(chunkSize, glob, _ => targetDir))
  }

  def unzipFiles(
      chunkSize: Int,
      glob: Glob,
      targetDir: Path => Option[Path]
  ): Pipe[F, Path, Path] =
    archives =>
      archives.flatMap { archive =>
        val outputDir: Stream[F, Path] =
          targetDir(archive) match {
            case Some(dir) => Stream.emit(dir)
            case None      => Stream.resource(createTempDir)
          }

        outputDir.flatMap { dir =>
          Stream
            .eval(Sync[F].blocking(ZipImpl.unzipZipFile(archive, dir, glob)))
            .flatMap(extracted => Stream.chunk(extracted))
        }
      }
}
object ZipImpl {
/** Allows a `ZipFile` to be managed by `scala.util.Using`; releasing closes the file. */
implicit val zipFileReleasable: Releasable[ZipFile] =
  new Releasable[ZipFile] {
    def release(resource: ZipFile): Unit = resource.close()
  }
/** Extracts the entries of the zip file `zip` into the directory `target`.
  *
  * Entries whose name ends with `/` and entries not matching `glob` are skipped.
  * Missing parent directories of an extracted file are created.
  *
  * Entry names are untrusted input: a name such as `../../x` or an absolute name would
  * resolve to a location outside of `target` ("zip slip"). Such entries are rejected
  * with a `java.util.zip.ZipException` before anything is written for them.
  *
  * @param zip
  *   the zip file to read
  * @param target
  *   the directory to extract into
  * @param glob
  *   only entries matching this glob are extracted
  * @return
  *   the paths of the extracted files, each being `target / entryName`
  */
private def unzipZipFile(zip: Path, target: Path, glob: Glob): Chunk[Path] = {
  val targetRoot = target.toNioPath.toAbsolutePath.normalize

  Using.resource(new ZipFile(zip.toNioPath.toFile, StandardCharsets.UTF_8)) { zf =>
    Chunk.iterator(
      zf.entries()
        .asScala
        .filter(ze => !ze.getName.endsWith("/"))
        .filter(ze => glob.matchFilenameOrPath(ze.getName))
        .map { ze =>
          val out = target / ze.getName

          // compare normalized absolute paths, so that `..` segments and absolute
          // entry names cannot escape the target directory
          val resolved = out.toNioPath.toAbsolutePath.normalize
          if (!resolved.startsWith(targetRoot))
            throw new java.util.zip.ZipException(
              s"Zip entry '${ze.getName}' would be extracted outside of $target"
            )

          out.parent.map(_.toNioPath).foreach { p =>
            java.nio.file.Files.createDirectories(p)
          }
          Using.resource(java.nio.file.Files.newOutputStream(out.toNioPath)) { fout =>
            zf.getInputStream(ze).transferTo(fout)
            out
          }
        }
    )
  }
}
// private def unzipZipStream(
// zip: InputStream,
// target: Path,
// glob: Glob
// ): List[Path] =
// Using.resource(new ZipInputStream(zip, StandardCharsets.UTF_8)) { zf =>
// @annotation.tailrec
// def go(entry: Option[ZipEntry], result: List[Path]): List[Path] =
// entry match {
// case Some(ze) if glob.matchFilenameOrPath(ze.getName) =>
// val out = target / ze.getName
// Using.resource(java.nio.file.Files.newOutputStream(out.toNioPath)) { fout =>
// zf.transferTo(fout)
// }
// zf.closeEntry()
// go(Option(zf.getNextEntry), out :: result)
// case Some(_) =>
// zf.closeEntry()
// go(Option(zf.getNextEntry), result)
// case None =>
// result
// }
//
// go(Option(zf.getNextEntry), Nil)
// }
// private def unzipStream2[F[_]: Async](
// in: InputStream,
// chunkSize: Int,
// glob: Glob
// ): Stream[F, Binary[F]] = {
// val zin = new ZipInputStream(in)
//
// val nextEntry = Resource.make(Sync[F].delay(Option(zin.getNextEntry))) {
// case Some(_) => Sync[F].delay(zin.closeEntry())
// case None => ().pure[F]
// }
//
// Stream
// .resource(nextEntry)
// .repeat
// .unNoneTerminate
// .filter(ze => glob.matchFilenameOrPath(ze.getName))
// .map { ze =>
// val name = ze.getName
// val data =
// fs2.io.readInputStream[F]((zin: InputStream).pure[F], chunkSize, false)
// Binary(name, data)
// }
// }
/** Makes the names (first tuple element) of the incoming entries unique.
  *
  * The first occurrence of a name is kept. A name that was already emitted gets a
  * counter suffix `_<n>`, inserted before the last `.` of the name if there is one
  * beyond the first character, and appended otherwise. The counter is increased until
  * the resulting name is unused.
  */
private def deduplicate[F[_]: Sync, A]: Pipe[F, (String, A), (String, A)] = {
  def makeName(name: String, count: Int): String = {
    val dot = name.lastIndexOf('.')
    if (count <= 0) name
    else if (dot > 0) s"${name.substring(0, dot)}_$count${name.substring(dot)}"
    else s"${name}_$count"
  }

  // returns the first unused candidate together with the updated set of used names
  @annotation.tailrec
  def unique(
      current: Set[String],
      name: String,
      counter: Int
  ): (Set[String], String) = {
    val candidate = makeName(name, counter)
    if (!current.contains(candidate)) (current + candidate, candidate)
    else unique(current, name, counter + 1)
  }

  in =>
    for {
      seen <- Stream.eval(Ref.of[F, Set[String]](Set.empty[String]))
      element <- in
      uniqueName <- Stream.eval(seen.modify(names => unique(names, element._1, 0)))
    } yield (uniqueName, element._2)
}
/** Creates a zip archive from the given entries and returns its bytes as a stream.
  *
  * The archive is written via a `ZipOutputStream` into the output stream provided by
  * `fs2.io.readOutputStream`. Every entry is opened with `putNextEntry`, filled with
  * the bytes of its stream and closed with `closeEntry`; the zip stream is closed when
  * the entries stream finalizes.
  *
  * `putNextEntry`, `closeEntry` and `close` all write to the underlying stream, so they
  * are run with `Sync.blocking` rather than `Sync.delay`, like the writes of the entry
  * data.
  *
  * @param logger
  *   receives a debug message for each entry
  * @param chunkSize
  *   chunk size passed to `readOutputStream`
  * @param entries
  *   pairs of entry name and entry content
  */
private def zipJava[F[_]: Async](
    logger: Logger[F],
    chunkSize: Int,
    entries: Stream[F, (String, Stream[F, Byte])]
): Stream[F, Byte] =
  fs2.io.readOutputStream(chunkSize) { out =>
    val zip = new ZipOutputStream(out, StandardCharsets.UTF_8)
    val writeEntries =
      entries.evalMap { case (name, bytes) =>
        val javaOut =
          bytes.through(
            // the zip stream must stay open for the following entries
            fs2.io.writeOutputStream[F](Sync[F].pure(zip), closeAfterUse = false)
          )
        val nextEntry =
          logger.debug(s"Adding $name to zip file…") *>
            Sync[F].blocking(zip.putNextEntry(new ZipEntry(name)))
        Resource
          .make(nextEntry)(_ => Sync[F].blocking(zip.closeEntry()))
          .use(_ => javaOut.compile.drain)
      }
    val closeStream = Sync[F].blocking(zip.close())

    writeEntries.onFinalize(closeStream).compile.drain
  }
/** Creates a zip archive from the given files and returns its bytes as a stream.
  *
  * Works like `zipJava`, but the content of each entry is copied from a file using a
  * buffered input stream. The input stream is managed by `Using.resource`, so it is
  * closed even if copying fails. All operations writing to the zip stream
  * (`putNextEntry`, the copy, `closeEntry`, `close`) are run with `Sync.blocking`.
  *
  * @param logger
  *   receives a debug message for each entry
  * @param chunkSize
  *   chunk size passed to `readOutputStream`, also used as the buffer size for reading
  *   the files
  * @param entries
  *   pairs of entry name and the file providing the entry content
  */
private def zipJavaPath[F[_]: Async](
    logger: Logger[F],
    chunkSize: Int,
    entries: Stream[F, (String, Path)]
): Stream[F, Byte] =
  fs2.io.readOutputStream(chunkSize) { out =>
    val zip = new ZipOutputStream(out, StandardCharsets.UTF_8)
    val writeEntries =
      entries.evalMap { case (name, file) =>
        val javaOut = Sync[F].blocking {
          Using.resource(
            new BufferedInputStream(
              java.nio.file.Files.newInputStream(file.toNioPath),
              chunkSize
            )
          ) { fin =>
            fin.transferTo(zip)
            ()
          }
        }
        val nextEntry =
          logger.debug(s"Adding $name to zip file…") *>
            Sync[F].blocking(zip.putNextEntry(new ZipEntry(name)))
        Resource
          .make(nextEntry)(_ => Sync[F].blocking(zip.closeEntry()))
          .use(_ => javaOut)
      }
    val closeStream = Sync[F].blocking(zip.close())

    writeEntries.onFinalize(closeStream).compile.drain
  }
}

View File

@ -0,0 +1,114 @@
/*
* Copyright 2020 Eike K. & Contributors
*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
package docspell.common.util
import cats.effect._
import cats.syntax.all._
import fs2.io.file.{Files, Path}
import docspell.logging.TestLoggingConfig
import munit.CatsEffectSuite
/** Tests for `Directory.unwrapSingle` and `Directory.unwrapSingle1`. Every test runs
  * in a fresh temporary directory below `target`.
  */
class DirectoryTest extends CatsEffectSuite with TestLoggingConfig {
  val logger = docspell.logging.getLogger[IO]
  val tempDir = ResourceFixture(
    Files[IO].tempDirectory(Path("target").some, "directory-test-", None)
  )

  tempDir.test("unwrap directory when non empty") { dir =>
    for {
      _ <- createDirectoryTree(dir, List("test/file1", "test/file2"))
      r <- Directory.unwrapSingle1(logger, dir)
      files <- listRelative(dir)
    } yield assertUnwrapped(r, files)
  }

  tempDir.test("unwrap directory when not empty repeat") { dir =>
    for {
      _ <- createDirectoryTree(dir, List("test/file1", "test/file2"))
      r <- Directory.unwrapSingle(logger, dir)
      files <- listRelative(dir)
    } yield assertUnwrapped(r, files)
  }

  tempDir.test("unwrap nested directory") { dir =>
    for {
      _ <- createDirectoryTree(dir, List("test0/test1/file1", "test0/test1/file2"))
      r <- Directory.unwrapSingle(logger, dir)
      files <- listRelative(dir)
    } yield assertUnwrapped(r, files)
  }

  tempDir.test("do nothing on empty directory") { dir =>
    assertNothingUnwrapped(dir)
  }

  tempDir.test("do nothing when directory contains more than one entry") { dir =>
    createDirectoryTree(dir, List("test1/file1", "file2")) *>
      assertNothingUnwrapped(dir)
  }

  tempDir.test("do nothing when directory contains more than one entry (2)") { dir =>
    createDirectoryTree(dir, List("file1", "file2")) *>
      assertNothingUnwrapped(dir)
  }

  /** Creates an empty file for every entry below `dir`, including parent directories. */
  def createDirectoryTree(dir: Path, entries: List[String]): IO[Unit] =
    entries.traverse_ { name =>
      val out = dir / name
      val mkParent = out.parent.fold(IO.unit)(p => Files[IO].createDirectories(p))
      mkParent *> Files[IO].createFile(out)
    }

  /** The direct children of `dir`, as paths relative to `dir`. */
  private def listRelative(dir: Path): IO[Vector[String]] =
    Files[IO]
      .list(dir)
      .map(file => dir.relativize(file).toString)
      .compile
      .toVector

  /** Asserts a successful unwrap that left exactly `file1` and `file2` in the directory. */
  private def assertUnwrapped(result: Boolean, files: Vector[String]): Unit = {
    assert(result)
    assertEquals(files.sorted, Vector("file1", "file2"))
  }

  /** Runs both unwrap variants on `dir` and asserts that both report `false`. */
  private def assertNothingUnwrapped(dir: Path): IO[Unit] =
    for {
      r1 <- Directory.unwrapSingle1[IO](logger, dir)
      r2 <- Directory.unwrapSingle[IO](logger, dir)
    } yield {
      assert(!r1)
      assert(!r2)
    }
}