From 73a95728358c7c1990ff8805fd35bbacd8990577 Mon Sep 17 00:00:00 2001 From: Eike Kettner Date: Tue, 5 Jan 2021 23:56:20 +0100 Subject: [PATCH] PoC for clearing Stanford pipeline after some idle time --- .../docspell/analysis/TextAnalyser.scala | 2 +- .../docspell/analysis/nlp/PipelineCache.scala | 56 ++++++++++++++++++- 2 files changed, 54 insertions(+), 4 deletions(-) diff --git a/modules/analysis/src/main/scala/docspell/analysis/TextAnalyser.scala b/modules/analysis/src/main/scala/docspell/analysis/TextAnalyser.scala index 44f7203b..dffd4fd8 100644 --- a/modules/analysis/src/main/scala/docspell/analysis/TextAnalyser.scala +++ b/modules/analysis/src/main/scala/docspell/analysis/TextAnalyser.scala @@ -31,7 +31,7 @@ object TextAnalyser { labels ++ dates.map(dl => dl.label.copy(label = dl.date.toString)) } - def create[F[_]: Sync](cfg: TextAnalysisConfig): Resource[F, TextAnalyser[F]] = + def create[F[_]: Concurrent: Timer](cfg: TextAnalysisConfig): Resource[F, TextAnalyser[F]] = Resource .liftF(PipelineCache[F]()) .map(cache => diff --git a/modules/analysis/src/main/scala/docspell/analysis/nlp/PipelineCache.scala b/modules/analysis/src/main/scala/docspell/analysis/nlp/PipelineCache.scala index 88e13ee3..b5d48ee4 100644 --- a/modules/analysis/src/main/scala/docspell/analysis/nlp/PipelineCache.scala +++ b/modules/analysis/src/main/scala/docspell/analysis/nlp/PipelineCache.scala @@ -9,6 +9,8 @@ import docspell.common._ import edu.stanford.nlp.pipeline.StanfordCoreNLP import org.log4s.getLogger +import scala.concurrent.duration._ +import cats.data.OptionT /** Creating the StanfordCoreNLP pipeline is quite expensive as it * involves IO and initializing large objects. 
@@ -32,18 +34,64 @@ object PipelineCache { makeClassifier(settings).pure[F] } - def apply[F[_]: Sync](): F[PipelineCache[F]] = - Ref.of(Map.empty[String, Entry]).map(data => (new Impl[F](data): PipelineCache[F])) + def apply[F[_]: Concurrent: Timer](): F[PipelineCache[F]] = + for { + data <- Ref.of(Map.empty[String, Entry]) + counter <- Ref.of(Long.MinValue) + cleaning <- Ref.of(false) + } yield new Impl[F](data, counter, cleaning): PipelineCache[F] - final private class Impl[F[_]: Sync](data: Ref[F, Map[String, Entry]]) + final private class Impl[F[_]]( + data: Ref[F, Map[String, Entry]], + counter: Ref[F, Long], + cleaningProgress: Ref[F, Boolean] + )(implicit T: Timer[F], F: Concurrent[F]) extends PipelineCache[F] { + private[this] val clearInterval = 1.minute + private[this] val log = Logger.log4s(logger) + def obtain(key: String, settings: StanfordNerSettings): F[StanfordCoreNLP] = for { id <- makeSettingsId(settings) nlp <- data.modify(cache => getOrCreate(key, id, cache, settings)) + _ <- scheduleClearPipeline } yield nlp + private def scheduleClearPipeline: F[Unit] = + (for { + cnt <- OptionT(counter.tryModify(n => (n + 1, n + 1))) + free <- OptionT.liftF(cleaningProgress.access.flatMap { case (b, setter) => + if (b) false.pure[F] + else setter(true) + }) + _ <- OptionT.liftF( + if (free) + F.start( + T.sleep(clearInterval) *> cleaningProgress.set(false) *> clearStale(cnt) + ) + else ().pure[F] + ) + } yield ()).getOrElse(()) + + private def clearStale(n: Long): F[Unit] = + log.debug("Attempting to clear stanford nlp pipeline cache to free memory") *> + counter.get.flatMap(x => + if (x == n) clearAll + else + log.debug( + "Don't clear yet, as it has been used in between" + ) *> scheduleClearPipeline + ) + + private def clearAll: F[Unit] = + log.info("Clearing stanford nlp pipeline cache now!") *> + data.set(Map.empty) *> Sync[F].delay { + // turns out that everything is cached in a static map + StanfordCoreNLP.clearAnnotatorPool() + System.gc(); + } + 
private def getOrCreate( key: String, id: String, @@ -81,6 +129,8 @@ object PipelineCache { } } + + private def makeClassifier(settings: StanfordNerSettings): StanfordCoreNLP = { logger.info(s"Creating ${settings.lang.name} Stanford NLP NER classifier...") new StanfordCoreNLP(Properties.forSettings(settings))