mirror of
https://github.com/TheAnachronism/docspell.git
synced 2025-06-22 02:18:26 +00:00
Extend config for external commands (#2536)
Allows to configure external commands and provide different arguments based on runtime values, like language. It extends the current config of a command to allow a `arg-mappings` section. An example for ocrmypdf: ```conf ocrmypdf = { enabled = true command = { program = "ocrmypdf" ### new arg-mappings arg-mappings = { "mylang" = { value = "{{lang}}" mappings = [ { matches = "deu" args = [ "-l", "deu", "--pdf-renderer", "sandwich" ] }, { matches = ".*" args = [ "-l", "{{lang}}" ] } ] } } #### end new arg-mappings args = [ ### will be replaced with corresponding args from "mylang" mapping "{{mylang}}", "--skip-text", "--deskew", "-j", "1", "{{infile}}", "{{outfile}}" ] timeout = "5 minutes" } working-dir = ${java.io.tmpdir}"/docspell-convert" } ``` The whole section will be first processed to replace all `{{…}}` patterns with corresponding values. Then `arg-mappings` will be looked at and the first match (value == matches) in its `mappings` array is used to replace its name in the arguments to the command.
This commit is contained in:
@ -1,212 +0,0 @@
|
||||
/*
|
||||
* Copyright 2020 Eike K. & Contributors
|
||||
*
|
||||
* SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
*/
|
||||
|
||||
package docspell.common
|
||||
|
||||
import java.io.InputStream
|
||||
import java.lang.ProcessBuilder.Redirect
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
import scala.jdk.CollectionConverters._
|
||||
|
||||
import cats.effect._
|
||||
import cats.implicits._
|
||||
import fs2.io.file.Path
|
||||
import fs2.{Stream, io, text}
|
||||
|
||||
import docspell.common.{exec => newExec}
|
||||
import docspell.logging.Logger
|
||||
|
||||
// better use `SysCmd` and `SysExec`
|
||||
object SystemCommand {
|
||||
|
||||
final case class Config(
|
||||
program: String,
|
||||
args: Seq[String],
|
||||
timeout: Duration,
|
||||
env: Map[String, String] = Map.empty
|
||||
) {
|
||||
|
||||
def toSysCmd = newExec
|
||||
.SysCmd(program, newExec.Args(args))
|
||||
.withTimeout(timeout)
|
||||
.addEnv(newExec.Env(env))
|
||||
|
||||
def mapArgs(f: String => String): Config =
|
||||
Config(program, args.map(f), timeout)
|
||||
|
||||
def replace(repl: Map[String, String]): Config =
|
||||
mapArgs(s =>
|
||||
repl.foldLeft(s) { case (res, (k, v)) =>
|
||||
res.replace(k, v)
|
||||
}
|
||||
)
|
||||
|
||||
def withEnv(key: String, value: String): Config =
|
||||
copy(env = env.updated(key, value))
|
||||
|
||||
def addEnv(moreEnv: Map[String, String]): Config =
|
||||
copy(env = env ++ moreEnv)
|
||||
|
||||
def appendArgs(extraArgs: Args): Config =
|
||||
copy(args = args ++ extraArgs.args)
|
||||
|
||||
def appendArgs(extraArgs: Seq[String]): Config =
|
||||
copy(args = args ++ extraArgs)
|
||||
|
||||
def toCmd: List[String] =
|
||||
program :: args.toList
|
||||
|
||||
lazy val cmdString: String =
|
||||
toCmd.mkString(" ")
|
||||
}
|
||||
|
||||
final case class Args(args: Vector[String]) extends Iterable[String] {
|
||||
override def iterator = args.iterator
|
||||
|
||||
def prepend(a: String): Args = Args(a +: args)
|
||||
|
||||
def prependWhen(flag: Boolean)(a: String): Args =
|
||||
prependOption(Option.when(flag)(a))
|
||||
|
||||
def prependOption(value: Option[String]): Args =
|
||||
value.map(prepend).getOrElse(this)
|
||||
|
||||
def append(a: String, as: String*): Args =
|
||||
Args(args ++ (a +: as.toVector))
|
||||
|
||||
def appendOption(value: Option[String]): Args =
|
||||
value.map(append(_)).getOrElse(this)
|
||||
|
||||
def appendOptionVal(first: String, second: Option[String]): Args =
|
||||
second.map(b => append(first, b)).getOrElse(this)
|
||||
|
||||
def appendWhen(flag: Boolean)(a: String, as: String*): Args =
|
||||
if (flag) append(a, as: _*) else this
|
||||
|
||||
def appendWhenNot(flag: Boolean)(a: String, as: String*): Args =
|
||||
if (!flag) append(a, as: _*) else this
|
||||
|
||||
def append(p: Path): Args =
|
||||
append(p.toString)
|
||||
|
||||
def append(as: Iterable[String]): Args =
|
||||
Args(args ++ as.toVector)
|
||||
}
|
||||
object Args {
|
||||
val empty: Args = Args()
|
||||
|
||||
def apply(as: String*): Args =
|
||||
Args(as.toVector)
|
||||
}
|
||||
|
||||
final case class Result(rc: Int, stdout: String, stderr: String)
|
||||
|
||||
def exec[F[_]: Sync](
|
||||
cmd: Config,
|
||||
logger: Logger[F],
|
||||
wd: Option[Path] = None,
|
||||
stdin: Stream[F, Byte] = Stream.empty
|
||||
): Stream[F, Result] =
|
||||
startProcess(cmd, wd, logger, stdin) { proc =>
|
||||
Stream.eval {
|
||||
for {
|
||||
_ <- writeToProcess(stdin, proc)
|
||||
term <- Sync[F].blocking(proc.waitFor(cmd.timeout.seconds, TimeUnit.SECONDS))
|
||||
_ <-
|
||||
if (term)
|
||||
logger.debug(s"Command `${cmd.cmdString}` finished: ${proc.exitValue}")
|
||||
else
|
||||
logger.warn(
|
||||
s"Command `${cmd.cmdString}` did not finish in ${cmd.timeout.formatExact}!"
|
||||
)
|
||||
_ <- if (!term) timeoutError(proc, cmd) else Sync[F].pure(())
|
||||
out <-
|
||||
if (term) inputStreamToString(proc.getInputStream)
|
||||
else Sync[F].pure("")
|
||||
err <-
|
||||
if (term) inputStreamToString(proc.getErrorStream)
|
||||
else Sync[F].pure("")
|
||||
} yield Result(proc.exitValue, out, err)
|
||||
}
|
||||
}
|
||||
|
||||
def execSuccess[F[_]: Sync](
|
||||
cmd: Config,
|
||||
logger: Logger[F],
|
||||
wd: Option[Path] = None,
|
||||
stdin: Stream[F, Byte] = Stream.empty
|
||||
): Stream[F, Result] =
|
||||
exec(cmd, logger, wd, stdin).flatMap { r =>
|
||||
if (r.rc != 0)
|
||||
Stream.raiseError[F](
|
||||
new Exception(
|
||||
s"Command `${cmd.cmdString}` returned non-zero exit code ${r.rc}. Stderr: ${r.stderr}"
|
||||
)
|
||||
)
|
||||
else Stream.emit(r)
|
||||
}
|
||||
|
||||
private def startProcess[F[_]: Sync, A](
|
||||
cmd: Config,
|
||||
wd: Option[Path],
|
||||
logger: Logger[F],
|
||||
stdin: Stream[F, Byte]
|
||||
)(
|
||||
f: Process => Stream[F, A]
|
||||
): Stream[F, A] = {
|
||||
val log = logger.debug(s"Running external command: ${cmd.cmdString}")
|
||||
val hasStdin = stdin.take(1).compile.last.map(_.isDefined)
|
||||
val proc = log *> hasStdin.flatMap(flag =>
|
||||
Sync[F].blocking {
|
||||
val pb = new ProcessBuilder(cmd.toCmd.asJava)
|
||||
.redirectInput(if (flag) Redirect.PIPE else Redirect.INHERIT)
|
||||
.redirectError(Redirect.PIPE)
|
||||
.redirectOutput(Redirect.PIPE)
|
||||
|
||||
val pbEnv = pb.environment()
|
||||
cmd.env.foreach { case (key, value) =>
|
||||
pbEnv.put(key, value)
|
||||
}
|
||||
wd.map(_.toNioPath.toFile).foreach(pb.directory)
|
||||
pb.start()
|
||||
}
|
||||
)
|
||||
Stream
|
||||
.bracket(proc)(p =>
|
||||
logger.debug(s"Closing process: `${cmd.cmdString}`").map(_ => p.destroy())
|
||||
)
|
||||
.flatMap(f)
|
||||
}
|
||||
|
||||
private def inputStreamToString[F[_]: Sync](in: InputStream): F[String] =
|
||||
io.readInputStream(Sync[F].pure(in), 16 * 1024, closeAfterUse = false)
|
||||
.through(text.utf8.decode)
|
||||
.chunks
|
||||
.map(_.toVector.mkString)
|
||||
.fold1(_ + _)
|
||||
.compile
|
||||
.last
|
||||
.map(_.getOrElse(""))
|
||||
|
||||
private def writeToProcess[F[_]: Sync](
|
||||
data: Stream[F, Byte],
|
||||
proc: Process
|
||||
): F[Unit] =
|
||||
data
|
||||
.through(io.writeOutputStream(Sync[F].blocking(proc.getOutputStream)))
|
||||
.compile
|
||||
.drain
|
||||
|
||||
private def timeoutError[F[_]: Sync](proc: Process, cmd: Config): F[Unit] =
|
||||
Sync[F].blocking(proc.destroyForcibly()).attempt *> {
|
||||
Sync[F].raiseError(
|
||||
new Exception(
|
||||
s"Command `${cmd.cmdString}` timed out (${cmd.timeout.formatExact})"
|
||||
)
|
||||
)
|
||||
}
|
||||
}
|
@ -17,6 +17,9 @@ case class Env(values: Map[String, String]) {
|
||||
def addAll(e: Env): Env =
|
||||
Env(values ++ e.values)
|
||||
|
||||
def modifyValue(f: String => String): Env =
|
||||
Env(values.view.mapValues(f).toMap)
|
||||
|
||||
def ++(e: Env) = addAll(e)
|
||||
|
||||
def foreach(f: (String, String) => Unit): Unit =
|
||||
|
@ -0,0 +1,89 @@
|
||||
/*
|
||||
* Copyright 2020 Eike K. & Contributors
|
||||
*
|
||||
* SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
*/
|
||||
|
||||
package docspell.common.exec
|
||||
|
||||
import docspell.common.Duration
|
||||
import docspell.common.Ident
|
||||
import docspell.common.exec.Env
|
||||
import docspell.common.exec.ExternalCommand.ArgMapping
|
||||
import docspell.common.exec.SysCmd
|
||||
|
||||
final case class ExternalCommand(
|
||||
program: String,
|
||||
args: Seq[String],
|
||||
timeout: Duration,
|
||||
env: Map[String, String] = Map.empty,
|
||||
argMappings: Map[Ident, ArgMapping] = Map.empty
|
||||
) {
|
||||
def withVars(vars: Map[String, String]): ExternalCommand.WithVars =
|
||||
ExternalCommand.WithVars(this, vars)
|
||||
|
||||
import ExternalCommand.pattern
|
||||
|
||||
def resolve(vars: Map[String, String]): SysCmd = {
|
||||
val replace = ExternalCommand.replaceString(vars) _
|
||||
val resolvedArgMappings =
|
||||
argMappings.view.mapValues(_.resolve(replace).firstMatch).toMap
|
||||
val resolvedArgs = args.map(replace).flatMap { arg =>
|
||||
resolvedArgMappings
|
||||
.find(e => pattern(e._1.id) == arg)
|
||||
.map(_._2)
|
||||
.getOrElse(List(arg))
|
||||
}
|
||||
|
||||
SysCmd(replace(program), resolvedArgs: _*)
|
||||
.withTimeout(timeout)
|
||||
.withEnv(_ => Env(env).modifyValue(replace))
|
||||
}
|
||||
}
|
||||
|
||||
object ExternalCommand {
|
||||
private val openPattern = "{{"
|
||||
private val closePattern = "}}"
|
||||
|
||||
private def pattern(s: String): String = s"${openPattern}${s}${closePattern}"
|
||||
|
||||
def apply(program: String, args: Seq[String], timeout: Duration): ExternalCommand =
|
||||
ExternalCommand(program, args, timeout, Map.empty, Map.empty)
|
||||
|
||||
final case class ArgMapping(
|
||||
value: String,
|
||||
mappings: List[ArgMatch]
|
||||
) {
|
||||
private[exec] def resolve(replace: String => String): ArgMapping =
|
||||
ArgMapping(replace(value), mappings.map(_.resolve(replace)))
|
||||
|
||||
def firstMatch: List[String] =
|
||||
mappings.find(am => value.matches(am.matches)).map(_.args).getOrElse(Nil)
|
||||
}
|
||||
|
||||
final case class ArgMatch(
|
||||
matches: String,
|
||||
args: List[String]
|
||||
) {
|
||||
private[exec] def resolve(replace: String => String): ArgMatch =
|
||||
ArgMatch(replace(matches), args.map(replace))
|
||||
}
|
||||
|
||||
private def replaceString(vars: Map[String, String])(in: String): String =
|
||||
vars.foldLeft(in) { case (result, (name, value)) =>
|
||||
val key = s"{{$name}}"
|
||||
result.replace(key, value)
|
||||
}
|
||||
|
||||
final case class WithVars(cmd: ExternalCommand, vars: Map[String, String]) {
|
||||
def resolved: SysCmd = cmd.resolve(vars)
|
||||
def append(more: (String, String)*): WithVars =
|
||||
WithVars(cmd, vars ++ more.toMap)
|
||||
|
||||
def withVar(key: String, value: String): WithVars =
|
||||
WithVars(cmd, vars.updated(key, value))
|
||||
|
||||
def withVarOption(key: String, value: Option[String]): WithVars =
|
||||
value.map(withVar(key, _)).getOrElse(this)
|
||||
}
|
||||
}
|
@ -38,6 +38,20 @@ trait SysExec[F[_]] {
|
||||
|
||||
def waitFor(timeout: Option[Duration] = None): F[Int]
|
||||
|
||||
/** Uses `waitFor` and throws when return code is non-zero. Logs stderr and stdout while
|
||||
* waiting.
|
||||
*/
|
||||
def runToSuccess(logger: Logger[F], timeout: Option[Duration] = None)(implicit
|
||||
F: Async[F]
|
||||
): F[Int]
|
||||
|
||||
/** Uses `waitFor` and throws when return code is non-zero. Logs stderr while waiting
|
||||
* and collects stdout once finished successfully.
|
||||
*/
|
||||
def runToSuccessStdout(logger: Logger[F], timeout: Option[Duration] = None)(implicit
|
||||
F: Async[F]
|
||||
): F[String]
|
||||
|
||||
/** Sends a signal to the process to terminate it immediately */
|
||||
def cancel: F[Unit]
|
||||
|
||||
@ -75,6 +89,12 @@ object SysExec {
|
||||
proc <- startProcess(logger, cmd, workdir, stdin)
|
||||
fibers <- Resource.eval(Ref.of[F, List[F[Unit]]](Nil))
|
||||
} yield new SysExec[F] {
|
||||
private lazy val basicName: String =
|
||||
cmd.program.lastIndexOf(java.io.File.separatorChar.toInt) match {
|
||||
case n if n > 0 => cmd.program.drop(n + 1)
|
||||
case _ => cmd.program.takeRight(16)
|
||||
}
|
||||
|
||||
def stdout: Stream[F, Byte] =
|
||||
fs2.io.readInputStream(
|
||||
Sync[F].blocking(proc.getInputStream),
|
||||
@ -107,6 +127,39 @@ object SysExec {
|
||||
)
|
||||
}
|
||||
|
||||
def runToSuccess(logger: Logger[F], timeout: Option[Duration])(implicit
|
||||
F: Async[F]
|
||||
): F[Int] =
|
||||
logOutputs(logger, basicName).use(_.waitFor(timeout).flatMap {
|
||||
case rc if rc == 0 => Sync[F].pure(0)
|
||||
case rc =>
|
||||
Sync[F].raiseError(
|
||||
new Exception(s"Command `${cmd.program}` returned non-zero exit code ${rc}")
|
||||
)
|
||||
})
|
||||
|
||||
def runToSuccessStdout(logger: Logger[F], timeout: Option[Duration])(implicit
|
||||
F: Async[F]
|
||||
): F[String] =
|
||||
F.background(
|
||||
stderrLines
|
||||
.through(line => Stream.eval(logger.debug(s"[$basicName (err)]: $line")))
|
||||
.compile
|
||||
.drain
|
||||
).use { f1 =>
|
||||
waitFor(timeout)
|
||||
.flatMap {
|
||||
case rc if rc == 0 => stdout.through(fs2.text.utf8.decode).compile.string
|
||||
case rc =>
|
||||
Sync[F].raiseError[String](
|
||||
new Exception(
|
||||
s"Command `${cmd.program}` returned non-zero exit code ${rc}"
|
||||
)
|
||||
)
|
||||
}
|
||||
.flatTap(_ => f1)
|
||||
}
|
||||
|
||||
def consumeOutputs(out: Pipe[F, String, Unit], err: Pipe[F, String, Unit])(implicit
|
||||
F: Async[F]
|
||||
): Resource[F, SysExec[F]] =
|
||||
|
@ -0,0 +1,74 @@
|
||||
/*
|
||||
* Copyright 2020 Eike K. & Contributors
|
||||
*
|
||||
* SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
*/
|
||||
|
||||
package docspell.common.exec
|
||||
|
||||
import docspell.common.Duration
|
||||
import docspell.common.Ident
|
||||
import docspell.common.exec.Args
|
||||
import docspell.common.exec.Env
|
||||
import docspell.common.exec.ExternalCommand._
|
||||
import docspell.common.exec.SysCmd
|
||||
|
||||
import munit.FunSuite
|
||||
|
||||
class ExternalCommandTest extends FunSuite {
|
||||
|
||||
test("resolve") {
|
||||
val cmd = ExternalCommand(
|
||||
program = "tesseract",
|
||||
args = "{{infile}}" :: "{{lang-spec}}" :: "out" :: "pdf" :: "txt" :: Nil,
|
||||
timeout = Duration.minutes(5),
|
||||
env = Map.empty,
|
||||
argMappings = Map(
|
||||
Ident.unsafe("lang-spec") -> ArgMapping(
|
||||
value = "{{lang}}",
|
||||
mappings = List(
|
||||
ArgMatch(
|
||||
matches = "jpn_vert",
|
||||
args = List("-l", "jpn_vert", "-c", "preserve_interword_spaces=1")
|
||||
),
|
||||
ArgMatch(
|
||||
matches = ".*",
|
||||
args = List("-l", "{{lang}}")
|
||||
)
|
||||
)
|
||||
)
|
||||
)
|
||||
)
|
||||
|
||||
val varsDe = Map("lang" -> "de", "encoding" -> "UTF_8", "infile" -> "text.jpg")
|
||||
assertEquals(
|
||||
cmd.resolve(varsDe),
|
||||
SysCmd(
|
||||
"tesseract",
|
||||
Args.of("text.jpg", "-l", "de", "out", "pdf", "txt"),
|
||||
Env.empty,
|
||||
Duration.minutes(5)
|
||||
)
|
||||
)
|
||||
|
||||
val varsJpnVert = varsDe.updated("lang", "jpn_vert")
|
||||
assertEquals(
|
||||
cmd.resolve(varsJpnVert),
|
||||
SysCmd(
|
||||
"tesseract",
|
||||
Args.of(
|
||||
"text.jpg",
|
||||
"-l",
|
||||
"jpn_vert",
|
||||
"-c",
|
||||
"preserve_interword_spaces=1",
|
||||
"out",
|
||||
"pdf",
|
||||
"txt"
|
||||
),
|
||||
Env.empty,
|
||||
Duration.minutes(5)
|
||||
)
|
||||
)
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user