From b08e88cd694c53794bc1631ad203f482160a7e3b Mon Sep 17 00:00:00 2001 From: Eike Kettner Date: Tue, 5 Jan 2021 20:54:53 +0100 Subject: [PATCH 1/4] Add (inofficial) routes to get system information --- build.sbt | 8 +- .../main/scala/docspell/common/ByteSize.scala | 69 ++++++++++++ .../main/scala/docspell/common/JvmInfo.scala | 103 ++++++++++++++++++ .../main/scala/docspell/joex/JoexServer.scala | 2 +- .../docspell/joex/routes/InfoRoutes.scala | 28 +++-- .../docspell/restserver/RestServer.scala | 3 +- .../restserver/routes/InfoRoutes.scala | 13 ++- 7 files changed, 210 insertions(+), 16 deletions(-) create mode 100644 modules/common/src/main/scala/docspell/common/ByteSize.scala create mode 100644 modules/common/src/main/scala/docspell/common/JvmInfo.scala diff --git a/build.sbt b/build.sbt index 52eed4f0..5eb05301 100644 --- a/build.sbt +++ b/build.sbt @@ -446,7 +446,9 @@ val joex = project addCompilerPlugin(Dependencies.betterMonadicFor), buildInfoPackage := "docspell.joex", reStart / javaOptions ++= Seq( - s"-Dconfig.file=${(LocalRootProject / baseDirectory).value / "local" / "dev.conf"}" + s"-Dconfig.file=${(LocalRootProject / baseDirectory).value / "local" / "dev.conf"}", + "-Xmx1596M", + "-XX:+UseG1GC" ), Revolver.enableDebugging(port = 5051, suspend = false) ) @@ -494,7 +496,9 @@ val restserver = project (Compile / resourceDirectory).value.getParentFile / "templates" ), reStart / javaOptions ++= Seq( - s"-Dconfig.file=${(LocalRootProject / baseDirectory).value / "local" / "dev.conf"}" + s"-Dconfig.file=${(LocalRootProject / baseDirectory).value / "local" / "dev.conf"}", + "-Xmx150M", + "-XX:+UseG1GC" ), Revolver.enableDebugging(port = 5050, suspend = false) ) diff --git a/modules/common/src/main/scala/docspell/common/ByteSize.scala b/modules/common/src/main/scala/docspell/common/ByteSize.scala new file mode 100644 index 00000000..3f6e4ccc --- /dev/null +++ b/modules/common/src/main/scala/docspell/common/ByteSize.scala @@ -0,0 +1,69 @@ +package docspell.common + +import io.circe.Decoder +import io.circe.Encoder + +final case class ByteSize(bytes: Long) { + + def toHuman: String = + ByteSize.bytesToHuman(bytes) + + def <=(other: ByteSize) = + bytes <= other.bytes + + def >=(other: ByteSize) = + bytes >= other.bytes + + def >(other: ByteSize) = + bytes > other.bytes + + def -(other: ByteSize) = + ByteSize(bytes - other.bytes) + + def +(other: ByteSize) = + ByteSize(bytes + other.bytes) +} + +object ByteSize { + + val zero = ByteSize(0L) + + def bytesToHuman(bytes: Long): String = + if (math.abs(bytes) < 1024 && bytes != Long.MinValue) s"${bytes}B" + else { + val k = bytes / 1024.0 + if (math.abs(k) < 1024) f"$k%.02fK" + else { + val m = k / 1024.0 + if (math.abs(m) < 1024) f"$m%.02fM" + else f"${m / 1024.0}%.02fG" + } + } + + def parse(str: String): Either[String, ByteSize] = + str.toLongOption + .map(ByteSize.apply) + .toRight(s"Not a valid size string: $str") + .orElse(span(str.toLowerCase) match { + case (num, "k") => + Right(ByteSize(math.round(num.toDouble * 1024))) + case (num, "m") => + Right(ByteSize(math.round(num.toDouble * 1024 * 1024))) + case (num, "g") => + Right(ByteSize(math.round(num.toDouble * 1024 * 1024 * 1024))) + case _ => + Left(s"Invalid byte string: $str") + }) + + private def span(str: String): (String, String) = + if (str.isEmpty) ("", "") + else (str.init, str.last.toString) + + def unsafe(str: String): ByteSize = + parse(str).fold(sys.error, identity) + + implicit val jsonDecoder: Decoder[ByteSize] = + Decoder.decodeLong.map(ByteSize.apply) + implicit val jsonEncoder: Encoder[ByteSize] = + Encoder.encodeLong.contramap(_.bytes) +} diff --git a/modules/common/src/main/scala/docspell/common/JvmInfo.scala b/modules/common/src/main/scala/docspell/common/JvmInfo.scala new file mode 100644 index 00000000..35852c98 --- /dev/null +++ b/modules/common/src/main/scala/docspell/common/JvmInfo.scala @@ -0,0 +1,103 @@ +package docspell.common + +import java.time.Instant + +import scala.jdk.CollectionConverters._ + +import cats.effect._ +import cats.implicits._ + +import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder} +import io.circe.{Decoder, Encoder} + +case class JvmInfo( + id: Ident, + pidHost: String, + ncpu: Int, + inputArgs: List[String], + libraryPath: String, + specVendor: String, + specVersion: String, + startTime: Timestamp, + uptime: Duration, + vmName: String, + vmVendor: String, + vmVersion: String, + heapUsage: JvmInfo.MemoryUsage, + props: Map[String, String] +) + +object JvmInfo { + + def create[F[_]: Sync](id: Ident): F[JvmInfo] = + MemoryUsage.createHeap[F].flatMap { mu => + Sync[F].delay { + val rmb = management.ManagementFactory.getRuntimeMXBean() + val rt = Runtime.getRuntime() + JvmInfo( + id, + pidHost = rmb.getName(), + ncpu = rt.availableProcessors(), + inputArgs = rmb.getInputArguments().asScala.toList, + libraryPath = rmb.getLibraryPath(), + specVendor = rmb.getSpecVendor(), + specVersion = rmb.getSpecVersion(), + startTime = Timestamp(Instant.ofEpochMilli(rmb.getStartTime())), + uptime = Duration.millis(rmb.getUptime()), + vmName = rmb.getVmName(), + vmVendor = rmb.getVmVendor(), + vmVersion = rmb.getVmVersion(), + heapUsage = mu, + props = rmb.getSystemProperties().asScala.toMap + ) + } + } + + case class MemoryUsage( + init: Long, + used: Long, + comitted: Long, + max: Long, + free: Long, + description: String + ) + + object MemoryUsage { + + def apply(init: Long, used: Long, comitted: Long, max: Long): MemoryUsage = { + def str(n: Long) = ByteSize(n).toHuman + + val free = max - used + + val descr = + s"init=${str(init)}, used=${str(used)}, comitted=${str(comitted)}, max=${str(max)}, free=${str(free)}" + MemoryUsage(init, used, comitted, max, free, descr) + } + + val empty = MemoryUsage(0, 0, 0, 0) + + def createHeap[F[_]: Sync]: F[MemoryUsage] = + Sync[F].delay { + val mxb = management.ManagementFactory.getMemoryMXBean() + val heap = mxb.getHeapMemoryUsage() + MemoryUsage( + init = math.max(0, heap.getInit()), + used = math.max(0, heap.getUsed()), + comitted = math.max(0, heap.getCommitted()), + max = math.max(0, heap.getMax()) + ) + } + + implicit val jsonEncoder: Encoder[MemoryUsage] = + deriveEncoder[MemoryUsage] + + implicit val jsonDecoder: Decoder[MemoryUsage] = + deriveDecoder[MemoryUsage] + } + + implicit val jsonEncoder: Encoder[JvmInfo] = + deriveEncoder[JvmInfo] + + implicit val jsonDecoder: Decoder[JvmInfo] = + deriveDecoder[JvmInfo] +} diff --git a/modules/joex/src/main/scala/docspell/joex/JoexServer.scala b/modules/joex/src/main/scala/docspell/joex/JoexServer.scala index f103e76f..9296ee30 100644 --- a/modules/joex/src/main/scala/docspell/joex/JoexServer.scala +++ b/modules/joex/src/main/scala/docspell/joex/JoexServer.scala @@ -35,7 +35,7 @@ object JoexServer { .create[F](cfg, signal, pools.connectEC, pools.httpClientEC, pools.blocker) httpApp = Router( - "/api/info" -> InfoRoutes(), + "/api/info" -> InfoRoutes(cfg), "/api/v1" -> JoexRoutes(joexApp) ).orNotFound diff --git a/modules/joex/src/main/scala/docspell/joex/routes/InfoRoutes.scala b/modules/joex/src/main/scala/docspell/joex/routes/InfoRoutes.scala index 694da0f2..093d9806 100644 --- a/modules/joex/src/main/scala/docspell/joex/routes/InfoRoutes.scala +++ b/modules/joex/src/main/scala/docspell/joex/routes/InfoRoutes.scala @@ -1,8 +1,10 @@ package docspell.joex.routes import cats.effect.Sync +import cats.implicits._ -import docspell.joex.BuildInfo +import docspell.common.JvmInfo +import docspell.joex.{BuildInfo, Config} import docspell.joexapi.model.VersionInfo import org.http4s.HttpRoutes @@ -11,19 +13,23 @@ import org.http4s.dsl.Http4sDsl object InfoRoutes { - def apply[F[_]: Sync](): HttpRoutes[F] = { + def apply[F[_]: Sync](cfg: Config): HttpRoutes[F] = { val dsl = new Http4sDsl[F] {} import dsl._ - HttpRoutes.of[F] { case GET -> (Root / "version") => - Ok( - VersionInfo( - BuildInfo.version, - BuildInfo.builtAtMillis, - BuildInfo.builtAtString, - BuildInfo.gitHeadCommit.getOrElse(""), - BuildInfo.gitDescribedVersion.getOrElse("") + HttpRoutes.of[F] { + case GET -> Root / "version" => + Ok( + VersionInfo( + BuildInfo.version, + BuildInfo.builtAtMillis, + BuildInfo.builtAtString, + BuildInfo.gitHeadCommit.getOrElse(""), + BuildInfo.gitDescribedVersion.getOrElse("") + ) ) - ) + + case GET -> Root / "system" => + JvmInfo.create[F](cfg.appId).flatMap(Ok(_)) } } } diff --git a/modules/restserver/src/main/scala/docspell/restserver/RestServer.scala b/modules/restserver/src/main/scala/docspell/restserver/RestServer.scala index 1f582c47..557bed9e 100644 --- a/modules/restserver/src/main/scala/docspell/restserver/RestServer.scala +++ b/modules/restserver/src/main/scala/docspell/restserver/RestServer.scala @@ -104,7 +104,8 @@ object RestServer { def adminRoutes[F[_]: Effect](cfg: Config, restApp: RestApp[F]): HttpRoutes[F] = Router( "fts" -> FullTextIndexRoutes.admin(cfg, restApp.backend), - "user" -> UserRoutes.admin(restApp.backend) + "user" -> UserRoutes.admin(restApp.backend), + "info" -> InfoRoutes.admin(cfg) ) def redirectTo[F[_]: Effect](path: String): HttpRoutes[F] = { diff --git a/modules/restserver/src/main/scala/docspell/restserver/routes/InfoRoutes.scala b/modules/restserver/src/main/scala/docspell/restserver/routes/InfoRoutes.scala index c8a01926..9102df6e 100644 --- a/modules/restserver/src/main/scala/docspell/restserver/routes/InfoRoutes.scala +++ b/modules/restserver/src/main/scala/docspell/restserver/routes/InfoRoutes.scala @@ -1,9 +1,11 @@ package docspell.restserver.routes import cats.effect.Sync +import cats.implicits._ +import docspell.common._ import docspell.restapi.model.VersionInfo -import docspell.restserver.BuildInfo +import docspell.restserver.{BuildInfo, Config} import org.http4s.HttpRoutes import org.http4s.circe.CirceEntityEncoder._ @@ -26,4 +28,13 @@ object InfoRoutes { ) } } + + def admin[F[_]: Sync](cfg: Config): HttpRoutes[F] = { + val dsl = new Http4sDsl[F] {} + import dsl._ + HttpRoutes.of[F] { case GET -> Root / "system" => + JvmInfo.create[F](cfg.appId).flatMap(Ok(_)) + } + + } } From 73a95728358c7c1990ff8805fd35bbacd8990577 Mon Sep 17 00:00:00 2001 From: Eike Kettner Date: Tue, 5 Jan 2021 23:56:20 +0100 Subject: [PATCH 2/4] Poc for clearing stanford pipeline after some idle time --- .../docspell/analysis/TextAnalyser.scala | 2 +- .../docspell/analysis/nlp/PipelineCache.scala | 56 ++++++++++++++++++- 2 files changed, 54 insertions(+), 4 deletions(-) diff --git a/modules/analysis/src/main/scala/docspell/analysis/TextAnalyser.scala b/modules/analysis/src/main/scala/docspell/analysis/TextAnalyser.scala index 44f7203b..dffd4fd8 100644 --- a/modules/analysis/src/main/scala/docspell/analysis/TextAnalyser.scala +++ b/modules/analysis/src/main/scala/docspell/analysis/TextAnalyser.scala @@ -31,7 +31,7 @@ object TextAnalyser { labels ++ dates.map(dl => dl.label.copy(label = dl.date.toString)) } - def create[F[_]: Sync](cfg: TextAnalysisConfig): Resource[F, TextAnalyser[F]] = + def create[F[_]: Concurrent: Timer](cfg: TextAnalysisConfig): Resource[F, TextAnalyser[F]] = Resource .liftF(PipelineCache[F]()) .map(cache => diff --git a/modules/analysis/src/main/scala/docspell/analysis/nlp/PipelineCache.scala b/modules/analysis/src/main/scala/docspell/analysis/nlp/PipelineCache.scala index 88e13ee3..b5d48ee4 100644 --- a/modules/analysis/src/main/scala/docspell/analysis/nlp/PipelineCache.scala +++ b/modules/analysis/src/main/scala/docspell/analysis/nlp/PipelineCache.scala @@ -9,6 +9,8 @@ import docspell.common._ import edu.stanford.nlp.pipeline.StanfordCoreNLP import org.log4s.getLogger +import scala.concurrent.duration._ +import cats.data.OptionT /** Creating the StanfordCoreNLP pipeline is quite expensive as it * involves IO and initializing large objects. @@ -32,18 +34,64 @@ object PipelineCache { makeClassifier(settings).pure[F] } - def apply[F[_]: Sync](): F[PipelineCache[F]] = - Ref.of(Map.empty[String, Entry]).map(data => (new Impl[F](data): PipelineCache[F])) + def apply[F[_]: Concurrent: Timer](): F[PipelineCache[F]] = + for { + data <- Ref.of(Map.empty[String, Entry]) + counter <- Ref.of(Long.MinValue) + cleaning <- Ref.of(false) + } yield new Impl[F](data, counter, cleaning): PipelineCache[F] - final private class Impl[F[_]: Sync](data: Ref[F, Map[String, Entry]]) + final private class Impl[F[_]]( + data: Ref[F, Map[String, Entry]], + counter: Ref[F, Long], + cleaningProgress: Ref[F, Boolean] + )(implicit T: Timer[F], F: Concurrent[F]) extends PipelineCache[F] { + private[this] val clearInterval = 1.minute + private[this] val log = Logger.log4s(logger) + def obtain(key: String, settings: StanfordNerSettings): F[StanfordCoreNLP] = for { id <- makeSettingsId(settings) nlp <- data.modify(cache => getOrCreate(key, id, cache, settings)) + _ <- scheduleClearPipeline } yield nlp + private def scheduleClearPipeline: F[Unit] = + (for { + cnt <- OptionT(counter.tryModify(n => (n + 1, n + 1))) + free <- OptionT.liftF(cleaningProgress.access.flatMap { case (b, setter) => + if (b) false.pure[F] + else setter(true) + }) + _ <- OptionT.liftF( + if (free) + F.start( + T.sleep(clearInterval) *> cleaningProgress.set(false) *> clearStale(cnt) + ) + else ().pure[F] + ) + } yield ()).getOrElse(()) + + private def clearStale(n: Long): F[Unit] = + log.debug("Attempting to clear stanford nlp pipeline cache to free memory") *> + counter.get.flatMap(x => + if (x == n) clearAll + else + log.debug( + "Don't clear yet, as it has been used in between" + ) *> scheduleClearPipeline + ) + + private def clearAll: F[Unit] = + log.info("Clearing stanford nlp pipeline cache now!") *> + data.set(Map.empty) *> Sync[F].delay { + // turns out that everything is cached in a static map + StanfordCoreNLP.clearAnnotatorPool() + System.gc(); + } + private def getOrCreate( key: String, id: String, @@ -81,6 +129,8 @@ object PipelineCache { } } + + private def makeClassifier(settings: StanfordNerSettings): StanfordCoreNLP = { logger.info(s"Creating ${settings.lang.name} Stanford NLP NER classifier...") new StanfordCoreNLP(Properties.forSettings(settings)) From a670bbb6c2d963bbcd2a858bd41b0d40d6ad5a2a Mon Sep 17 00:00:00 2001 From: Eike Kettner Date: Wed, 6 Jan 2021 00:35:58 +0100 Subject: [PATCH 3/4] Make idle interval when clearing nlp cache configurable --- .../docspell/analysis/TextAnalyser.scala | 6 +- .../analysis/TextAnalysisConfig.scala | 2 + .../docspell/analysis/nlp/PipelineCache.scala | 146 +++++++++++------- .../analysis/nlp/StanfordNerClassifier.scala | 10 +- .../joex/src/main/resources/reference.conf | 7 + .../src/main/scala/docspell/joex/Config.scala | 2 + 6 files changed, 109 insertions(+), 64 deletions(-) diff --git a/modules/analysis/src/main/scala/docspell/analysis/TextAnalyser.scala b/modules/analysis/src/main/scala/docspell/analysis/TextAnalyser.scala index dffd4fd8..8ec4854e 100644 --- a/modules/analysis/src/main/scala/docspell/analysis/TextAnalyser.scala +++ b/modules/analysis/src/main/scala/docspell/analysis/TextAnalyser.scala @@ -31,9 +31,11 @@ object TextAnalyser { labels ++ dates.map(dl => dl.label.copy(label = dl.date.toString)) } - def create[F[_]: Concurrent: Timer](cfg: TextAnalysisConfig): Resource[F, TextAnalyser[F]] = + def create[F[_]: Concurrent: Timer]( + cfg: TextAnalysisConfig + ): Resource[F, TextAnalyser[F]] = Resource - .liftF(PipelineCache[F]()) + .liftF(PipelineCache[F](cfg.clearStanfordPipelineInterval)) .map(cache => new TextAnalyser[F] { def annotate( diff --git a/modules/analysis/src/main/scala/docspell/analysis/TextAnalysisConfig.scala b/modules/analysis/src/main/scala/docspell/analysis/TextAnalysisConfig.scala index 596a6247..cb6e1d39 100644 --- a/modules/analysis/src/main/scala/docspell/analysis/TextAnalysisConfig.scala +++ b/modules/analysis/src/main/scala/docspell/analysis/TextAnalysisConfig.scala @@ -1,8 +1,10 @@ package docspell.analysis import docspell.analysis.nlp.TextClassifierConfig +import docspell.common._ case class TextAnalysisConfig( maxLength: Int, + clearStanfordPipelineInterval: Duration, classifier: TextClassifierConfig ) diff --git a/modules/analysis/src/main/scala/docspell/analysis/nlp/PipelineCache.scala b/modules/analysis/src/main/scala/docspell/analysis/nlp/PipelineCache.scala index b5d48ee4..663fbcbf 100644 --- a/modules/analysis/src/main/scala/docspell/analysis/nlp/PipelineCache.scala +++ b/modules/analysis/src/main/scala/docspell/analysis/nlp/PipelineCache.scala @@ -1,5 +1,7 @@ package docspell.analysis.nlp +import scala.concurrent.duration.{Duration => _, _} + import cats.Applicative import cats.effect._ import cats.effect.concurrent.Ref @@ -9,8 +11,6 @@ import docspell.common._ import edu.stanford.nlp.pipeline.StanfordCoreNLP import org.log4s.getLogger -import scala.concurrent.duration._ -import cats.data.OptionT /** Creating the StanfordCoreNLP pipeline is quite expensive as it * involves IO and initializing large objects. @@ -21,7 +21,7 @@ import cats.data.OptionT */ trait PipelineCache[F[_]] { - def obtain(key: String, settings: StanfordNerSettings): F[StanfordCoreNLP] + def obtain(key: String, settings: StanfordNerSettings): Resource[F, StanfordCoreNLP] } @@ -30,68 +30,31 @@ object PipelineCache { def none[F[_]: Applicative]: PipelineCache[F] = new PipelineCache[F] { - def obtain(ignored: String, settings: StanfordNerSettings): F[StanfordCoreNLP] = - makeClassifier(settings).pure[F] + def obtain( + ignored: String, + settings: StanfordNerSettings + ): Resource[F, StanfordCoreNLP] = + Resource.liftF(makeClassifier(settings).pure[F]) } - def apply[F[_]: Concurrent: Timer](): F[PipelineCache[F]] = + def apply[F[_]: Concurrent: Timer](clearInterval: Duration): F[PipelineCache[F]] = for { - data <- Ref.of(Map.empty[String, Entry]) - counter <- Ref.of(Long.MinValue) - cleaning <- Ref.of(false) - } yield new Impl[F](data, counter, cleaning): PipelineCache[F] + data <- Ref.of(Map.empty[String, Entry]) + cacheClear <- CacheClearing.create(data, clearInterval) + } yield new Impl[F](data, cacheClear) - final private class Impl[F[_]]( + final private class Impl[F[_]: Sync]( data: Ref[F, Map[String, Entry]], - counter: Ref[F, Long], - cleaningProgress: Ref[F, Boolean] - )(implicit T: Timer[F], F: Concurrent[F]) - extends PipelineCache[F] { + cacheClear: CacheClearing[F] + ) extends PipelineCache[F] { - private[this] val clearInterval = 1.minute - private[this] val log = Logger.log4s(logger) - - def obtain(key: String, settings: StanfordNerSettings): F[StanfordCoreNLP] = + def obtain(key: String, settings: StanfordNerSettings): Resource[F, StanfordCoreNLP] = for { - id <- makeSettingsId(settings) - nlp <- data.modify(cache => getOrCreate(key, id, cache, settings)) - _ <- scheduleClearPipeline + _ <- cacheClear.withCache + id <- Resource.liftF(makeSettingsId(settings)) + nlp <- Resource.liftF(data.modify(cache => getOrCreate(key, id, cache, settings))) } yield nlp - private def scheduleClearPipeline: F[Unit] = - (for { - cnt <- OptionT(counter.tryModify(n => (n + 1, n + 1))) - free <- OptionT.liftF(cleaningProgress.access.flatMap { case (b, setter) => - if (b) false.pure[F] - else setter(true) - }) - _ <- OptionT.liftF( - if (free) - F.start( - T.sleep(clearInterval) *> cleaningProgress.set(false) *> clearStale(cnt) - ) - else ().pure[F] - ) - } yield ()).getOrElse(()) - - private def clearStale(n: Long): F[Unit] = - log.debug("Attempting to clear stanford nlp pipeline cache to free memory") *> - counter.get.flatMap(x => - if (x == n) clearAll - else - log.debug( - "Don't clear yet, as it has been used in between" - ) *> scheduleClearPipeline - ) - - private def clearAll: F[Unit] = - log.info("Clearing stanford nlp pipeline cache now!") *> - data.set(Map.empty) *> Sync[F].delay { - // turns out that everything is cached in a static map - StanfordCoreNLP.clearAnnotatorPool() - System.gc(); - } - private def getOrCreate( key: String, id: String, @@ -130,6 +93,77 @@ object PipelineCache { } + trait CacheClearing[F[_]] { + def withCache: Resource[F, Unit] + } + + object CacheClearing { + def none[F[_]: Applicative]: CacheClearing[F] = + new CacheClearing[F] { + def withCache: Resource[F, Unit] = + Resource.pure[F, Unit](()) + } + + def create[F[_]: Concurrent: Timer]( + data: Ref[F, Map[String, Entry]], + interval: Duration + ): F[CacheClearing[F]] = + for { + counter <- Ref.of(0L) + cleaning <- Ref.of(None: Option[Fiber[F, Unit]]) + log = Logger.log4s(logger) + result <- + if (interval.millis <= 0) + log + .info("Disable clearing StanfordNLP cache, due to config setting") + .map(_ => none[F]) + else + log + .info(s"Clearing StanfordNLP cache after $interval idle time") + .map(_ => + new CacheClearingImpl[F](data, counter, cleaning, interval.toScala) + ) + } yield result + } + + final private class CacheClearingImpl[F[_]]( + data: Ref[F, Map[String, Entry]], + counter: Ref[F, Long], + cleaningFiber: Ref[F, Option[Fiber[F, Unit]]], + clearInterval: FiniteDuration + )(implicit T: Timer[F], F: Concurrent[F]) + extends CacheClearing[F] { + private[this] val log = Logger.log4s[F](logger) + + def withCache: Resource[F, Unit] = + Resource.make(counter.update(_ + 1))(_ => + counter.updateAndGet(_ - 1).flatMap(n => scheduleClearPipeline(n)) + ) + + def scheduleClearPipeline(cnt: Long): F[Unit] = + if (cnt > 0) cancelClear + else cancelClear *> clearAllLater.flatMap(fiber => cleaningFiber.set(fiber.some)) + + private def cancelClear: F[Unit] = + cleaningFiber.getAndSet(None).flatMap { + case Some(fiber) => fiber.cancel *> logDontClear + case None => ().pure[F] + } + + private def clearAllLater: F[Fiber[F, Unit]] = + F.start(T.sleep(clearInterval) *> clearAll) + + private def logDontClear: F[Unit] = + log.info("Cancel stanford cache clearing, as it has been used in between.") + + def clearAll: F[Unit] = + log.info("Clearing stanford nlp cache now!") *> + data.set(Map.empty) *> Sync[F].delay { + // turns out that everything is cached in a static map + StanfordCoreNLP.clearAnnotatorPool() + System.gc(); + } + } private def makeClassifier(settings: StanfordNerSettings): StanfordCoreNLP = { logger.info(s"Creating ${settings.lang.name} Stanford NLP NER classifier...") diff --git a/modules/analysis/src/main/scala/docspell/analysis/nlp/StanfordNerClassifier.scala b/modules/analysis/src/main/scala/docspell/analysis/nlp/StanfordNerClassifier.scala index 383a07ea..b82b1c95 100644 --- a/modules/analysis/src/main/scala/docspell/analysis/nlp/StanfordNerClassifier.scala +++ b/modules/analysis/src/main/scala/docspell/analysis/nlp/StanfordNerClassifier.scala @@ -1,12 +1,10 @@ package docspell.analysis.nlp -import scala.jdk.CollectionConverters._ - import cats.Applicative -import cats.implicits._ +import scala.jdk.CollectionConverters._ +import cats.effect._ import docspell.common._ - import edu.stanford.nlp.pipeline.{CoreDocument, StanfordCoreNLP} object StanfordNerClassifier { @@ -22,13 +20,13 @@ object StanfordNerClassifier { * a new classifier must be created. It will then replace the * previous one. */ - def nerAnnotate[F[_]: Applicative]( + def nerAnnotate[F[_]: BracketThrow]( cacheKey: String, cache: PipelineCache[F] )(settings: StanfordNerSettings, text: String): F[Vector[NerLabel]] = cache .obtain(cacheKey, settings) - .map(crf => runClassifier(crf, text)) + .use(crf => Applicative[F].pure(runClassifier(crf, text))) def runClassifier(nerClassifier: StanfordCoreNLP, text: String): Vector[NerLabel] = { val doc = new CoreDocument(text) diff --git a/modules/joex/src/main/resources/reference.conf b/modules/joex/src/main/resources/reference.conf index 8437982d..9561be58 100644 --- a/modules/joex/src/main/resources/reference.conf +++ b/modules/joex/src/main/resources/reference.conf @@ -276,6 +276,13 @@ docspell.joex { # files. working-dir = ${java.io.tmpdir}"/docspell-analysis" + # The StanfordCoreNLP library caches language models which + # requires quite some amount of memory. Setting this interval to a + # positive duration, the cache is cleared after this amount of + # idle time. Set it to 0 to disable it if you have enough memory, + # processing will be faster. + clear-stanford-nlp-interval = "15 minutes" + regex-ner { # Whether to enable custom NER annotation. This uses the address # book of a collective as input for NER tagging (to automatically diff --git a/modules/joex/src/main/scala/docspell/joex/Config.scala b/modules/joex/src/main/scala/docspell/joex/Config.scala index cbbb4a33..601d0049 100644 --- a/modules/joex/src/main/scala/docspell/joex/Config.scala +++ b/modules/joex/src/main/scala/docspell/joex/Config.scala @@ -60,6 +60,7 @@ object Config { case class TextAnalysis( maxLength: Int, workingDir: Path, + clearStanfordNlpInterval: Duration, regexNer: RegexNer, classification: Classification ) { @@ -67,6 +68,7 @@ object Config { def textAnalysisConfig: TextAnalysisConfig = TextAnalysisConfig( maxLength, + clearStanfordNlpInterval, TextClassifierConfig( workingDir, NonEmptyList From a563ba33e7b7a80ab05d8bfc08fabe77a1d8c18d Mon Sep 17 00:00:00 2001 From: Eike Kettner Date: Wed, 6 Jan 2021 23:06:13 +0100 Subject: [PATCH 4/4] Add new joex option to nix module --- nix/module-joex.nix | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/nix/module-joex.nix b/nix/module-joex.nix index a0590f81..373a6aed 100644 --- a/nix/module-joex.nix +++ b/nix/module-joex.nix @@ -118,6 +118,7 @@ let ]; }; working-dir = "/tmp/docspell-analysis"; + clear-stanford-nlp-interval = "15 minutes"; }; processing = { max-due-date-years = 10; @@ -771,6 +772,14 @@ in { files. ''; }; + clear-stanford-nlp-interval = mkOption { + type = types.str; + default = defaults.text-analysis.clear-stanford-nlp-interval; + description = '' + Idle time after which the NLP caches are cleared to free + memory. If <= 0 clearing the cache is disabled. + ''; + }; regex-ner = mkOption { type = types.submodule({