mirror of
https://github.com/TheAnachronism/docspell.git
synced 2025-03-28 01:35:06 +00:00
Merge pull request #2378 from eikek/2376-processing-bin-files
Fix potential infinite loop
This commit is contained in:
commit
b181c57424
@ -12,8 +12,8 @@ import java.nio.charset.StandardCharsets
|
||||
import cats.data.OptionT
|
||||
import cats.effect._
|
||||
import cats.syntax.all._
|
||||
import fs2._
|
||||
import fs2.io.file.{Files, Path}
|
||||
import fs2.{Chunk, Pipe, Stream}
|
||||
|
||||
import docspell.logging.Logger
|
||||
|
||||
@ -64,11 +64,11 @@ object Binary {
|
||||
/** Builds a `Binary` called `name` whose data is the in-memory `content`,
  * emitted as a single chunk, and whose mime type is `MimeType.html` tagged
  * with the charset `cs`.
  */
def html[F[_]](name: String, content: ByteVector, cs: Charset): Binary[F] =
|
||||
Binary(name, MimeType.html.withCharset(cs), Stream.chunk(Chunk.byteVector(content)))
|
||||
|
||||
def decode[F[_]](cs: Charset): Pipe[F, Byte, String] =
|
||||
def decode[F[_]: RaiseThrowable](cs: Charset): Pipe[F, Byte, String] =
|
||||
if (cs == StandardCharsets.UTF_8)
|
||||
fs2.text.utf8.decode
|
||||
else
|
||||
util.decode[F](cs)
|
||||
fs2.text.decodeWithCharset(cs)
|
||||
|
||||
/** Reads the complete byte stream into one `ByteVector` by appending its
  * chunks in order. An empty stream yields `ByteVector.empty`. The whole
  * content is held in memory at once.
  */
def loadAllBytes[F[_]: Sync](data: Stream[F, Byte]): F[ByteVector] =
|
||||
data.chunks.map(_.toByteVector).compile.fold(ByteVector.empty)((r, e) => r ++ e)
|
||||
@ -104,49 +104,4 @@ object Binary {
|
||||
}
|
||||
.drain
|
||||
.as(targetDir)
|
||||
|
||||
// This is a copy from org.http4s.util
|
||||
// Http4s is licensed under the Apache License 2.0
|
||||
// Charset-generic byte-to-string stream decoder built on java.nio's
// `CharsetDecoder`.
private object util {
|
||||
import fs2._
|
||||
import java.nio._
|
||||
|
||||
// The three bytes EF BB BF, i.e. the UTF-8 byte order mark.
private val utf8Bom: Chunk[Byte] = Chunk(0xef.toByte, 0xbb.toByte, 0xbf.toByte)
|
||||
|
||||
/** Returns a pipe that decodes a byte stream into strings using `charset`.
  *
  * The input is pulled in pieces of at most `charBufferSize * avgBytesPerChar`
  * bytes. Each piece is fed to the decoder, and whatever the decoder leaves
  * unconsumed is pushed back onto the stream for the next iteration. When
  * the input is exhausted the decoder is told so and flushed.
  */
def decode[F[_]](charset: Charset): Pipe[F, Byte, String] = {
|
||||
// Created once per call to `decode`, outside the returned function, so the
// same decoder instance serves every stream the pipe is applied to.
val decoder = charset.newDecoder
|
||||
// Rounded-up bound used to size the output buffer for a piece of input.
val maxCharsPerByte = math.ceil(decoder.maxCharsPerByte().toDouble).toInt
|
||||
// Rounded-up estimate used to decide how many bytes to pull per iteration.
val avgBytesPerChar = math.ceil(1.0 / decoder.averageCharsPerByte().toDouble).toInt
|
||||
val charBufferSize = 128
|
||||
|
||||
_.repeatPull[String] {
|
||||
_.unconsN(charBufferSize * avgBytesPerChar, allowFewer = true).flatMap {
|
||||
// No more input: signal end-of-input with an empty buffer and flush. The
// output buffer holds a single char.
case None =>
|
||||
val charBuffer = CharBuffer.allocate(1)
|
||||
decoder.decode(ByteBuffer.allocate(0), charBuffer, true)
|
||||
decoder.flush(charBuffer)
|
||||
val outputString = charBuffer.flip().toString
|
||||
if (outputString.isEmpty) Pull.done.as(None)
|
||||
else Pull.output1(outputString).as(None)
|
||||
case Some((chunk, stream)) =>
|
||||
if (chunk.nonEmpty) {
|
||||
// The BOM check runs on every non-empty piece pulled here, for any
// `charset`, not only on the first piece of a UTF-8 stream.
val chunkWithoutBom = skipByteOrderMark(chunk)
|
||||
val bytes = chunkWithoutBom.toArray
|
||||
val byteBuffer = ByteBuffer.wrap(bytes)
|
||||
val charBuffer = CharBuffer.allocate(bytes.length * maxCharsPerByte)
|
||||
// The `CoderResult` returned by `decode` is discarded.
decoder.decode(byteBuffer, charBuffer, false)
|
||||
// Bytes between the buffer's position and its limit were not consumed; they
// are put back in front of the remaining stream.
// NOTE(review): if the decoder consumes nothing from a piece (e.g. it stops
// on input it cannot decode, or a trailing incomplete sequence is all that
// is left), the same bytes look like they are pushed back and pulled again
// without progress - confirm; this appears to be a non-terminating case.
val nextStream = stream.consChunk(Chunk.byteBuffer(byteBuffer.slice()))
|
||||
Pull.output1(charBuffer.flip().toString).as(Some(nextStream))
|
||||
} else
|
||||
// Empty piece: emit nothing and continue with the rest of the stream.
Pull.output(Chunk.empty[String]).as(Some(stream))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/** Drops the first three bytes of `chunk` when they equal the UTF-8 byte
  * order mark; otherwise returns `chunk` unchanged. The type parameter `F`
  * is not used by the signature or the body.
  */
private def skipByteOrderMark[F[_]](chunk: Chunk[Byte]): Chunk[Byte] =
|
||||
if (chunk.size >= 3 && chunk.take(3) == utf8Bom)
|
||||
chunk.drop(3)
|
||||
else chunk
|
||||
|
||||
}
|
||||
}
|
||||
|
@ -20,21 +20,20 @@ object ResourceUse {
|
||||
object Implicits {
|
||||
implicit final class UseSyntax[F[_]: Concurrent, A](resource: Resource[F, A]) {
|
||||
|
||||
/** Evaluates `resource` endlessly or until the signal turns `true`. */
|
||||
def useUntil(
|
||||
def useWhile(
|
||||
signal: Signal[F, Boolean],
|
||||
returnValue: Ref[F, ExitCode]
|
||||
): F[ExitCode] = {
|
||||
): Stream[F, ExitCode] = {
|
||||
val server = Stream.resource(resource)
|
||||
val blockUntilTrue = signal.discrete.takeWhile(_ == false).drain
|
||||
val exit = fs2.Stream.eval(returnValue.get)
|
||||
(server *> (blockUntilTrue ++ exit)).compile.lastOrError
|
||||
val exit = Stream.eval(returnValue.get)
|
||||
server *> (blockUntilTrue ++ exit)
|
||||
}
|
||||
|
||||
def useForever(implicit ev: Async[F]): F[ExitCode] = for {
|
||||
termSignal <- SignallingRef.of[F, Boolean](false)
|
||||
exitValue <- Ref.of(ExitCode.Success)
|
||||
rc <- useUntil(termSignal, exitValue)
|
||||
def useForever(implicit ev: Async[F]): Stream[F, ExitCode] = for {
|
||||
termSignal <- Stream.eval(SignallingRef.of[F, Boolean](false))
|
||||
exitValue <- Stream.eval(Ref.of(ExitCode.Success))
|
||||
rc <- useWhile(termSignal, exitValue)
|
||||
} yield rc
|
||||
}
|
||||
}
|
||||
|
@ -52,7 +52,7 @@ object Markdown {
|
||||
cfg: MarkdownConfig,
|
||||
cs: Charset
|
||||
): F[String] =
|
||||
data.through(Binary.decode(cs)).compile.foldMonoid.map(str => toHtml(str, cfg))
|
||||
data.through(Binary.decode(cs)).compile.string.map(str => toHtml(str, cfg))
|
||||
|
||||
private def wrapHtml(body: String, cfg: MarkdownConfig): String =
|
||||
s"""<!DOCTYPE html>
|
||||
|
@ -75,14 +75,14 @@ object JoexServer {
|
||||
|
||||
Stream
|
||||
.resource(app)
|
||||
.evalMap { app =>
|
||||
.flatMap { app =>
|
||||
EmberServerBuilder
|
||||
.default[F]
|
||||
.withHost(cfg.bind.address)
|
||||
.withPort(cfg.bind.port)
|
||||
.withHttpApp(app.httpApp)
|
||||
.build
|
||||
.useUntil(app.termSig, app.exitRef)
|
||||
.useWhile(app.termSig, app.exitRef)
|
||||
}
|
||||
}.drain
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user