Merge pull request #547 from eikek/joex-memory

Joex memory
mergify[bot] 2021-01-06 23:09:01 +00:00 committed by GitHub
commit b28cf74a72
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 332 additions and 34 deletions

View File: build.sbt

@@ -446,7 +446,9 @@ val joex = project
     addCompilerPlugin(Dependencies.betterMonadicFor),
     buildInfoPackage := "docspell.joex",
     reStart / javaOptions ++= Seq(
-      s"-Dconfig.file=${(LocalRootProject / baseDirectory).value / "local" / "dev.conf"}"
+      s"-Dconfig.file=${(LocalRootProject / baseDirectory).value / "local" / "dev.conf"}",
+      "-Xmx1596M",
+      "-XX:+UseG1GC"
     ),
     Revolver.enableDebugging(port = 5051, suspend = false)
   )
@@ -494,7 +496,9 @@ val restserver = project
       (Compile / resourceDirectory).value.getParentFile / "templates"
     ),
     reStart / javaOptions ++= Seq(
-      s"-Dconfig.file=${(LocalRootProject / baseDirectory).value / "local" / "dev.conf"}"
+      s"-Dconfig.file=${(LocalRootProject / baseDirectory).value / "local" / "dev.conf"}",
+      "-Xmx150M",
+      "-XX:+UseG1GC"
    ),
    Revolver.enableDebugging(port = 5050, suspend = false)
  )
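These flags only affect `reStart` (sbt-revolver) development runs: joex gets a 1596M heap with G1, while the restserver is deliberately capped at 150M to keep its memory footprint honest during development. A developer who needs different limits can override them without touching build.sbt. A minimal sketch, assuming a git-ignored local.sbt next to build.sbt (a hypothetical file, not part of this commit); sbt loads all *.sbt files of a build together, so the `joex` project val is in scope, and a later -Xmx on the resulting command line wins:

    // local.sbt (hypothetical, git-ignored): per-developer override of the
    // dev-mode JVM options configured in build.sbt above.
    joex / reStart / javaOptions ++= Seq(
      "-Xmx2G",                         // raise the joex dev heap beyond 1596M
      "-XX:+HeapDumpOnOutOfMemoryError" // keep a heap dump when tuning limits
    )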

View File: docspell/analysis/TextAnalyser.scala

@@ -31,9 +31,11 @@ object TextAnalyser {
       labels ++ dates.map(dl => dl.label.copy(label = dl.date.toString))
     }

-  def create[F[_]: Sync](cfg: TextAnalysisConfig): Resource[F, TextAnalyser[F]] =
+  def create[F[_]: Concurrent: Timer](
+      cfg: TextAnalysisConfig
+  ): Resource[F, TextAnalyser[F]] =
     Resource
-      .liftF(PipelineCache[F]())
+      .liftF(PipelineCache[F](cfg.clearStanfordPipelineInterval))
       .map(cache =>
         new TextAnalyser[F] {
           def annotate(

View File: docspell/analysis/TextAnalysisConfig.scala

@@ -1,8 +1,10 @@
 package docspell.analysis

 import docspell.analysis.nlp.TextClassifierConfig
+import docspell.common._

 case class TextAnalysisConfig(
     maxLength: Int,
+    clearStanfordPipelineInterval: Duration,
     classifier: TextClassifierConfig
 )

View File: docspell/analysis/nlp/PipelineCache.scala

@@ -1,5 +1,7 @@
 package docspell.analysis.nlp

+import scala.concurrent.duration.{Duration => _, _}
+
 import cats.Applicative
 import cats.effect._
 import cats.effect.concurrent.Ref
@@ -19,7 +21,7 @@ import org.log4s.getLogger
   */
 trait PipelineCache[F[_]] {

-  def obtain(key: String, settings: StanfordNerSettings): F[StanfordCoreNLP]
+  def obtain(key: String, settings: StanfordNerSettings): Resource[F, StanfordCoreNLP]

 }
@@ -28,20 +30,29 @@ object PipelineCache {
   def none[F[_]: Applicative]: PipelineCache[F] =
     new PipelineCache[F] {
-      def obtain(ignored: String, settings: StanfordNerSettings): F[StanfordCoreNLP] =
-        makeClassifier(settings).pure[F]
+      def obtain(
+          ignored: String,
+          settings: StanfordNerSettings
+      ): Resource[F, StanfordCoreNLP] =
+        Resource.liftF(makeClassifier(settings).pure[F])
     }

-  def apply[F[_]: Sync](): F[PipelineCache[F]] =
-    Ref.of(Map.empty[String, Entry]).map(data => (new Impl[F](data): PipelineCache[F]))
-
-  final private class Impl[F[_]: Sync](data: Ref[F, Map[String, Entry]])
-      extends PipelineCache[F] {
-
-    def obtain(key: String, settings: StanfordNerSettings): F[StanfordCoreNLP] =
+  def apply[F[_]: Concurrent: Timer](clearInterval: Duration): F[PipelineCache[F]] =
     for {
-      id  <- makeSettingsId(settings)
-      nlp <- data.modify(cache => getOrCreate(key, id, cache, settings))
+      data       <- Ref.of(Map.empty[String, Entry])
+      cacheClear <- CacheClearing.create(data, clearInterval)
+    } yield new Impl[F](data, cacheClear)
+
+  final private class Impl[F[_]: Sync](
+      data: Ref[F, Map[String, Entry]],
+      cacheClear: CacheClearing[F]
+  ) extends PipelineCache[F] {
+
+    def obtain(key: String, settings: StanfordNerSettings): Resource[F, StanfordCoreNLP] =
+      for {
+        _   <- cacheClear.withCache
+        id  <- Resource.liftF(makeSettingsId(settings))
+        nlp <- Resource.liftF(data.modify(cache => getOrCreate(key, id, cache, settings)))
       } yield nlp

     private def getOrCreate(
@@ -81,6 +92,79 @@ object PipelineCache {
     }
   }

+  trait CacheClearing[F[_]] {
+    def withCache: Resource[F, Unit]
+  }
+
+  object CacheClearing {
+    def none[F[_]: Applicative]: CacheClearing[F] =
+      new CacheClearing[F] {
+        def withCache: Resource[F, Unit] =
+          Resource.pure[F, Unit](())
+      }
+
+    def create[F[_]: Concurrent: Timer](
+        data: Ref[F, Map[String, Entry]],
+        interval: Duration
+    ): F[CacheClearing[F]] =
+      for {
+        counter  <- Ref.of(0L)
+        cleaning <- Ref.of(None: Option[Fiber[F, Unit]])
+        log = Logger.log4s(logger)
+        result <-
+          if (interval.millis <= 0)
+            log
+              .info("Disable clearing StanfordNLP cache, due to config setting")
+              .map(_ => none[F])
+          else
+            log
+              .info(s"Clearing StanfordNLP cache after $interval idle time")
+              .map(_ =>
+                new CacheClearingImpl[F](data, counter, cleaning, interval.toScala)
+              )
+      } yield result
+  }
+
+  final private class CacheClearingImpl[F[_]](
+      data: Ref[F, Map[String, Entry]],
+      counter: Ref[F, Long],
+      cleaningFiber: Ref[F, Option[Fiber[F, Unit]]],
+      clearInterval: FiniteDuration
+  )(implicit T: Timer[F], F: Concurrent[F])
+      extends CacheClearing[F] {
+
+    private[this] val log = Logger.log4s[F](logger)
+
+    def withCache: Resource[F, Unit] =
+      Resource.make(counter.update(_ + 1))(_ =>
+        counter.updateAndGet(_ - 1).flatMap(n => scheduleClearPipeline(n))
+      )
+
+    def scheduleClearPipeline(cnt: Long): F[Unit] =
+      if (cnt > 0) cancelClear
+      else cancelClear *> clearAllLater.flatMap(fiber => cleaningFiber.set(fiber.some))
+
+    private def cancelClear: F[Unit] =
+      cleaningFiber.getAndSet(None).flatMap {
+        case Some(fiber) => fiber.cancel *> logDontClear
+        case None        => ().pure[F]
+      }
+
+    private def clearAllLater: F[Fiber[F, Unit]] =
+      F.start(T.sleep(clearInterval) *> clearAll)
+
+    private def logDontClear: F[Unit] =
+      log.info("Cancel stanford cache clearing, as it has been used in between.")
+
+    def clearAll: F[Unit] =
+      log.info("Clearing stanford nlp cache now!") *>
+        data.set(Map.empty) *> Sync[F].delay {
+          // turns out that everything is cached in a static map
+          StanfordCoreNLP.clearAnnotatorPool()
+          System.gc();
+        }
+  }
+
   private def makeClassifier(settings: StanfordNerSettings): StanfordCoreNLP = {
     logger.info(s"Creating ${settings.lang.name} Stanford NLP NER classifier...")
     new StanfordCoreNLP(Properties.forSettings(settings))
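CacheClearingImpl is plain reference counting: withCache bumps a counter on acquire; on release the counter is decremented, and once it reaches zero a fiber is started that runs clearAll after the configured idle time. Any use in between cancels the pending fiber. The same pattern in isolation, as a minimal cats-effect 2 sketch (IdleClear and all its member names are illustrative, not part of this commit):

    // Generic idle-triggered cleanup, mirroring the counter/fiber scheme above.
    import scala.concurrent.duration._
    import cats.effect._
    import cats.effect.concurrent.Ref
    import cats.implicits._

    final class IdleClear[F[_]](
        users: Ref[F, Long],
        pending: Ref[F, Option[Fiber[F, Unit]]],
        idle: FiniteDuration,
        clear: F[Unit]
    )(implicit F: Concurrent[F], T: Timer[F]) {

      // Bracket each use: count up on acquire; on release, schedule the
      // clearing fiber only once nobody is using the cache anymore.
      def use: Resource[F, Unit] =
        Resource.make(users.update(_ + 1))(_ =>
          users.updateAndGet(_ - 1).flatMap {
            case 0L =>
              cancel *> F.start(T.sleep(idle) *> clear)
                .flatMap(fiber => pending.set(Some(fiber)))
            case _ => cancel
          }
        )

      // Cancel a scheduled clear, if any; called whenever activity resumes.
      private def cancel: F[Unit] =
        pending.getAndSet(None).flatMap(_.traverse_(_.cancel))
    }

    object IdleClear {
      def create[F[_]: Concurrent: Timer](idle: FiniteDuration, clear: F[Unit]): F[IdleClear[F]] =
        (Ref.of[F, Long](0L), Ref.of[F, Option[Fiber[F, Unit]]](None))
          .mapN(new IdleClear(_, _, idle, clear))
    }

As in the commit, cancel-then-reschedule keeps at most one pending clearing fiber. The System.gc() call in clearAll is only a hint to the JVM; it is StanfordCoreNLP.clearAnnotatorPool() that actually drops the static references to the language models.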

View File: docspell/analysis/nlp/StanfordNerClassifier.scala

@@ -1,12 +1,10 @@
 package docspell.analysis.nlp

+import scala.jdk.CollectionConverters._
+
 import cats.Applicative
 import cats.implicits._
-
-import scala.jdk.CollectionConverters._
+import cats.effect._

 import docspell.common._
 import edu.stanford.nlp.pipeline.{CoreDocument, StanfordCoreNLP}

 object StanfordNerClassifier {
@@ -22,13 +20,13 @@ object StanfordNerClassifier {
    * a new classifier must be created. It will then replace the
    * previous one.
    */
-  def nerAnnotate[F[_]: Applicative](
+  def nerAnnotate[F[_]: BracketThrow](
       cacheKey: String,
       cache: PipelineCache[F]
   )(settings: StanfordNerSettings, text: String): F[Vector[NerLabel]] =
     cache
       .obtain(cacheKey, settings)
-      .map(crf => runClassifier(crf, text))
+      .use(crf => Applicative[F].pure(runClassifier(crf, text)))

   def runClassifier(nerClassifier: StanfordCoreNLP, text: String): Vector[NerLabel] = {
     val doc = new CoreDocument(text)
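Switching obtain to Resource means each annotation run brackets the cache: the clearing counter goes up on acquire and down when use completes, so the idle timer can never fire while a pipeline is mid-annotation. A usage sketch (the IO instantiation and the cache key value are assumptions, not from the commit):

    // Annotate one text through the cache; assumes a PipelineCache[IO]
    // and StanfordNerSettings provided by the surrounding application.
    import cats.effect.IO
    import docspell.common.NerLabel

    def annotateOnce(
        cache: PipelineCache[IO],
        settings: StanfordNerSettings,
        text: String
    ): IO[Vector[NerLabel]] =
      StanfordNerClassifier.nerAnnotate[IO]("collective-id", cache)(settings, text)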

View File: docspell/common/ByteSize.scala

@@ -0,0 +1,69 @@
+package docspell.common
+
+import io.circe.Decoder
+import io.circe.Encoder
+
+final case class ByteSize(bytes: Long) {
+
+  def toHuman: String =
+    ByteSize.bytesToHuman(bytes)
+
+  def <=(other: ByteSize) =
+    bytes <= other.bytes
+
+  def >=(other: ByteSize) =
+    bytes >= other.bytes
+
+  def >(other: ByteSize) =
+    bytes > other.bytes
+
+  def -(other: ByteSize) =
+    ByteSize(bytes - other.bytes)
+
+  def +(other: ByteSize) =
+    ByteSize(bytes + other.bytes)
+}
+
+object ByteSize {
+
+  val zero = ByteSize(0L)
+
+  def bytesToHuman(bytes: Long): String =
+    if (math.abs(bytes) < 1024 && bytes != Long.MinValue) s"${bytes}B"
+    else {
+      val k = bytes / 1024.0
+      if (math.abs(k) < 1024) f"$k%.02fK"
+      else {
+        val m = k / 1024.0
+        if (math.abs(m) < 1024) f"$m%.02fM"
+        else f"${m / 1024.0}%.02fG"
+      }
+    }
+
+  def parse(str: String): Either[String, ByteSize] =
+    str.toLongOption
+      .map(ByteSize.apply)
+      .toRight(s"Not a valid size string: $str")
+      .orElse(span(str.toLowerCase) match {
+        case (num, "k") =>
+          Right(ByteSize(math.round(num.toDouble * 1024)))
+        case (num, "m") =>
+          Right(ByteSize(math.round(num.toDouble * 1024 * 1024)))
+        case (num, "g") =>
+          Right(ByteSize(math.round(num.toDouble * 1024 * 1024 * 1024)))
+        case _ =>
+          Left(s"Invalid byte string: $str")
+      })
+
+  private def span(str: String): (String, String) =
+    if (str.isEmpty) ("", "")
+    else (str.init, str.last.toString)
+
+  def unsafe(str: String): ByteSize =
+    parse(str).fold(sys.error, identity)
+
+  implicit val jsonDecoder: Decoder[ByteSize] =
+    Decoder.decodeLong.map(ByteSize.apply)
+
+  implicit val jsonEncoder: Encoder[ByteSize] =
+    Encoder.encodeLong.contramap(_.bytes)
+}
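The suffixes are binary (powers of 1024), and plain integers are taken as raw bytes. A few illustrative evaluations, derived directly from the definitions above:

    // Expected results, reasoning from parse/toHuman as defined above.
    ByteSize.parse("1024")    // Right(ByteSize(1024)), a plain Long string
    ByteSize.parse("1.5m")    // Right(ByteSize(1572864)), 1.5 * 1024 * 1024 rounded
    ByteSize.parse("8g")      // Right(ByteSize(8589934592L)), 8 * 1024^3
    ByteSize.parse("oops")    // Left("Invalid byte string: oops")
    ByteSize(1572864).toHuman // "1.50M"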

View File: docspell/common/JvmInfo.scala

@@ -0,0 +1,103 @@
+package docspell.common
+
+import java.time.Instant
+
+import scala.jdk.CollectionConverters._
+
+import cats.effect._
+import cats.implicits._
+
+import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder}
+import io.circe.{Decoder, Encoder}
+
+case class JvmInfo(
+    id: Ident,
+    pidHost: String,
+    ncpu: Int,
+    inputArgs: List[String],
+    libraryPath: String,
+    specVendor: String,
+    specVersion: String,
+    startTime: Timestamp,
+    uptime: Duration,
+    vmName: String,
+    vmVendor: String,
+    vmVersion: String,
+    heapUsage: JvmInfo.MemoryUsage,
+    props: Map[String, String]
+)
+
+object JvmInfo {
+
+  def create[F[_]: Sync](id: Ident): F[JvmInfo] =
+    MemoryUsage.createHeap[F].flatMap { mu =>
+      Sync[F].delay {
+        val rmb = management.ManagementFactory.getRuntimeMXBean()
+        val rt  = Runtime.getRuntime()
+        JvmInfo(
+          id,
+          pidHost = rmb.getName(),
+          ncpu = rt.availableProcessors(),
+          inputArgs = rmb.getInputArguments().asScala.toList,
+          libraryPath = rmb.getLibraryPath(),
+          specVendor = rmb.getSpecVendor(),
+          specVersion = rmb.getSpecVersion(),
+          startTime = Timestamp(Instant.ofEpochMilli(rmb.getStartTime())),
+          uptime = Duration.millis(rmb.getUptime()),
+          vmName = rmb.getVmName(),
+          vmVendor = rmb.getVmVendor(),
+          vmVersion = rmb.getVmVersion(),
+          heapUsage = mu,
+          props = rmb.getSystemProperties().asScala.toMap
+        )
+      }
+    }
+
+  case class MemoryUsage(
+      init: Long,
+      used: Long,
+      comitted: Long,
+      max: Long,
+      free: Long,
+      description: String
+  )
+
+  object MemoryUsage {
+
+    def apply(init: Long, used: Long, comitted: Long, max: Long): MemoryUsage = {
+      def str(n: Long) = ByteSize(n).toHuman
+      val free = max - used
+      val descr =
+        s"init=${str(init)}, used=${str(used)}, comitted=${str(comitted)}, max=${str(max)}, free=${str(free)}"
+      MemoryUsage(init, used, comitted, max, free, descr)
+    }
+
+    val empty = MemoryUsage(0, 0, 0, 0)
+
+    def createHeap[F[_]: Sync]: F[MemoryUsage] =
+      Sync[F].delay {
+        val mxb  = management.ManagementFactory.getMemoryMXBean()
+        val heap = mxb.getHeapMemoryUsage()
+        MemoryUsage(
+          init = math.max(0, heap.getInit()),
+          used = math.max(0, heap.getUsed()),
+          comitted = math.max(0, heap.getCommitted()),
+          max = math.max(0, heap.getMax())
+        )
+      }
+
+    implicit val jsonEncoder: Encoder[MemoryUsage] =
+      deriveEncoder[MemoryUsage]
+    implicit val jsonDecoder: Decoder[MemoryUsage] =
+      deriveDecoder[MemoryUsage]
+  }
+
+  implicit val jsonEncoder: Encoder[JvmInfo] =
+    deriveEncoder[JvmInfo]
+  implicit val jsonDecoder: Decoder[JvmInfo] =
+    deriveDecoder[JvmInfo]
+}
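JvmInfo.create only reads the runtime and memory MX beans, so it is cheap enough to compute per request, which is how the new /system routes below use it. A quick sketch of inspecting the heap numbers directly (IO and the instance id are assumptions; Ident.unsafe is used here purely for brevity):

    // Print current heap usage as reported by the MemoryMXBean.
    import cats.effect.IO
    import docspell.common._

    val report: IO[String] =
      JvmInfo.create[IO](Ident.unsafe("joex-local")).map(_.heapUsage.description)
    // e.g. "init=256.00M, used=312.45M, comitted=512.00M, max=1.56G, free=1.25G"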

View File: joex default configuration (docspell.joex)

@@ -276,6 +276,13 @@ docspell.joex {
       # files.
       working-dir = ${java.io.tmpdir}"/docspell-analysis"

+      # The StanfordCoreNLP library caches language models, which
+      # requires quite some amount of memory. If this is set to a
+      # positive duration, the cache is cleared after this amount of
+      # idle time. Set it to 0 to disable it if you have enough
+      # memory; processing will then be faster.
+      clear-stanford-nlp-interval = "15 minutes"
+
       regex-ner {
         # Whether to enable custom NER annotation. This uses the address
         # book of a collective as input for NER tagging (to automatically

View File: docspell/joex/Config.scala

@@ -60,6 +60,7 @@ object Config {
   case class TextAnalysis(
       maxLength: Int,
       workingDir: Path,
+      clearStanfordNlpInterval: Duration,
       regexNer: RegexNer,
       classification: Classification
   ) {
@@ -67,6 +68,7 @@ object Config {
     def textAnalysisConfig: TextAnalysisConfig =
       TextAnalysisConfig(
         maxLength,
+        clearStanfordNlpInterval,
         TextClassifierConfig(
           workingDir,
           NonEmptyList

View File: docspell/joex/JoexServer.scala

@@ -35,7 +35,7 @@ object JoexServer {
         .create[F](cfg, signal, pools.connectEC, pools.httpClientEC, pools.blocker)

       httpApp = Router(
-        "/api/info" -> InfoRoutes(),
+        "/api/info" -> InfoRoutes(cfg),
         "/api/v1"   -> JoexRoutes(joexApp)
       ).orNotFound

View File: docspell/joex/routes/InfoRoutes.scala

@@ -1,8 +1,10 @@
 package docspell.joex.routes

 import cats.effect.Sync
+import cats.implicits._

-import docspell.joex.BuildInfo
+import docspell.common.JvmInfo
+import docspell.joex.{BuildInfo, Config}
 import docspell.joexapi.model.VersionInfo

 import org.http4s.HttpRoutes
@@ -11,10 +13,11 @@ import org.http4s.dsl.Http4sDsl

 object InfoRoutes {

-  def apply[F[_]: Sync](): HttpRoutes[F] = {
+  def apply[F[_]: Sync](cfg: Config): HttpRoutes[F] = {
     val dsl = new Http4sDsl[F] {}
     import dsl._
-    HttpRoutes.of[F] { case GET -> (Root / "version") =>
+    HttpRoutes.of[F] {
+      case GET -> Root / "version" =>
       Ok(
         VersionInfo(
           BuildInfo.version,
@@ -24,6 +27,9 @@ object InfoRoutes {
           BuildInfo.gitDescribedVersion.getOrElse("")
         )
       )
+      case GET -> Root / "system" =>
+        JvmInfo.create[F](cfg.appId).flatMap(Ok(_))
     }
   }
 }
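Both servers now expose the same payload: joex serves it under /api/info/system (shown here), while the restserver mounts it under its admin routes (next files). A hedged client sketch for the joex endpoint (http4s 0.21-style blaze client; the host and the default joex port 7878 are assumptions about the local setup):

    // Fetch the new system endpoint from a locally running joex.
    import scala.concurrent.ExecutionContext
    import cats.effect.{ContextShift, IO}
    import org.http4s.client.blaze.BlazeClientBuilder

    def fetchSystemInfo(implicit cs: ContextShift[IO]): IO[String] =
      BlazeClientBuilder[IO](ExecutionContext.global).resource.use { client =>
        client.expect[String]("http://localhost:7878/api/info/system")
      }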

View File: docspell/restserver/RestServer.scala

@@ -104,7 +104,8 @@ object RestServer {
   def adminRoutes[F[_]: Effect](cfg: Config, restApp: RestApp[F]): HttpRoutes[F] =
     Router(
       "fts"  -> FullTextIndexRoutes.admin(cfg, restApp.backend),
-      "user" -> UserRoutes.admin(restApp.backend)
+      "user" -> UserRoutes.admin(restApp.backend),
+      "info" -> InfoRoutes.admin(cfg)
     )

   def redirectTo[F[_]: Effect](path: String): HttpRoutes[F] = {

View File: docspell/restserver/routes/InfoRoutes.scala

@@ -1,9 +1,11 @@
 package docspell.restserver.routes

 import cats.effect.Sync
+import cats.implicits._

+import docspell.common._
 import docspell.restapi.model.VersionInfo
-import docspell.restserver.BuildInfo
+import docspell.restserver.{BuildInfo, Config}

 import org.http4s.HttpRoutes
 import org.http4s.circe.CirceEntityEncoder._
@@ -26,4 +28,13 @@ object InfoRoutes {
       )
     }
   }
+
+  def admin[F[_]: Sync](cfg: Config): HttpRoutes[F] = {
+    val dsl = new Http4sDsl[F] {}
+    import dsl._
+    HttpRoutes.of[F] { case GET -> Root / "system" =>
+      JvmInfo.create[F](cfg.appId).flatMap(Ok(_))
+    }
+  }
 }

View File: NixOS module (joex)

@@ -118,6 +118,7 @@ let
         ];
       };
       working-dir = "/tmp/docspell-analysis";
+      clear-stanford-nlp-interval = "15 minutes";
     };
     processing = {
       max-due-date-years = 10;
@@ -771,6 +772,14 @@ in {
                 files.
               '';
             };
+            clear-stanford-nlp-interval = mkOption {
+              type = types.str;
+              default = defaults.text-analysis.clear-stanford-nlp-interval;
+              description = ''
+                Idle time after which the NLP caches are cleared to free
+                memory. If <= 0, clearing the cache is disabled.
+              '';
+            };
             regex-ner = mkOption {
               type = types.submodule({