diff --git a/modules/analysis/src/main/scala/docspell/analysis/TextAnalyser.scala b/modules/analysis/src/main/scala/docspell/analysis/TextAnalyser.scala index dffd4fd8..8ec4854e 100644 --- a/modules/analysis/src/main/scala/docspell/analysis/TextAnalyser.scala +++ b/modules/analysis/src/main/scala/docspell/analysis/TextAnalyser.scala @@ -31,9 +31,11 @@ object TextAnalyser { labels ++ dates.map(dl => dl.label.copy(label = dl.date.toString)) } - def create[F[_]: Concurrent: Timer](cfg: TextAnalysisConfig): Resource[F, TextAnalyser[F]] = + def create[F[_]: Concurrent: Timer]( + cfg: TextAnalysisConfig + ): Resource[F, TextAnalyser[F]] = Resource - .liftF(PipelineCache[F]()) + .liftF(PipelineCache[F](cfg.clearStanfordPipelineInterval)) .map(cache => new TextAnalyser[F] { def annotate( diff --git a/modules/analysis/src/main/scala/docspell/analysis/TextAnalysisConfig.scala b/modules/analysis/src/main/scala/docspell/analysis/TextAnalysisConfig.scala index 596a6247..cb6e1d39 100644 --- a/modules/analysis/src/main/scala/docspell/analysis/TextAnalysisConfig.scala +++ b/modules/analysis/src/main/scala/docspell/analysis/TextAnalysisConfig.scala @@ -1,8 +1,10 @@ package docspell.analysis import docspell.analysis.nlp.TextClassifierConfig +import docspell.common._ case class TextAnalysisConfig( maxLength: Int, + clearStanfordPipelineInterval: Duration, classifier: TextClassifierConfig ) diff --git a/modules/analysis/src/main/scala/docspell/analysis/nlp/PipelineCache.scala b/modules/analysis/src/main/scala/docspell/analysis/nlp/PipelineCache.scala index b5d48ee4..663fbcbf 100644 --- a/modules/analysis/src/main/scala/docspell/analysis/nlp/PipelineCache.scala +++ b/modules/analysis/src/main/scala/docspell/analysis/nlp/PipelineCache.scala @@ -1,5 +1,7 @@ package docspell.analysis.nlp +import scala.concurrent.duration.{Duration => _, _} + import cats.Applicative import cats.effect._ import cats.effect.concurrent.Ref @@ -9,8 +11,6 @@ import docspell.common._ import edu.stanford.nlp.pipeline.StanfordCoreNLP import org.log4s.getLogger -import scala.concurrent.duration._ -import cats.data.OptionT /** Creating the StanfordCoreNLP pipeline is quite expensive as it * involves IO and initializing large objects. @@ -21,7 +21,7 @@ import cats.data.OptionT */ trait PipelineCache[F[_]] { - def obtain(key: String, settings: StanfordNerSettings): F[StanfordCoreNLP] + def obtain(key: String, settings: StanfordNerSettings): Resource[F, StanfordCoreNLP] } @@ -30,68 +30,31 @@ object PipelineCache { def none[F[_]: Applicative]: PipelineCache[F] = new PipelineCache[F] { - def obtain(ignored: String, settings: StanfordNerSettings): F[StanfordCoreNLP] = - makeClassifier(settings).pure[F] + def obtain( + ignored: String, + settings: StanfordNerSettings + ): Resource[F, StanfordCoreNLP] = + Resource.liftF(makeClassifier(settings).pure[F]) } - def apply[F[_]: Concurrent: Timer](): F[PipelineCache[F]] = + def apply[F[_]: Concurrent: Timer](clearInterval: Duration): F[PipelineCache[F]] = for { - data <- Ref.of(Map.empty[String, Entry]) - counter <- Ref.of(Long.MinValue) - cleaning <- Ref.of(false) - } yield new Impl[F](data, counter, cleaning): PipelineCache[F] + data <- Ref.of(Map.empty[String, Entry]) + cacheClear <- CacheClearing.create(data, clearInterval) + } yield new Impl[F](data, cacheClear) - final private class Impl[F[_]]( + final private class Impl[F[_]: Sync]( data: Ref[F, Map[String, Entry]], - counter: Ref[F, Long], - cleaningProgress: Ref[F, Boolean] - )(implicit T: Timer[F], F: Concurrent[F]) - extends PipelineCache[F] { + cacheClear: CacheClearing[F] + ) extends PipelineCache[F] { - private[this] val clearInterval = 1.minute - private[this] val log = Logger.log4s(logger) - - def obtain(key: String, settings: StanfordNerSettings): F[StanfordCoreNLP] = + def obtain(key: String, settings: StanfordNerSettings): Resource[F, StanfordCoreNLP] = for { - id <- makeSettingsId(settings) - nlp <- data.modify(cache => getOrCreate(key, id, cache, settings)) - _ <- scheduleClearPipeline + _ <- cacheClear.withCache + id <- Resource.liftF(makeSettingsId(settings)) + nlp <- Resource.liftF(data.modify(cache => getOrCreate(key, id, cache, settings))) } yield nlp - private def scheduleClearPipeline: F[Unit] = - (for { - cnt <- OptionT(counter.tryModify(n => (n + 1, n + 1))) - free <- OptionT.liftF(cleaningProgress.access.flatMap { case (b, setter) => - if (b) false.pure[F] - else setter(true) - }) - _ <- OptionT.liftF( - if (free) - F.start( - T.sleep(clearInterval) *> cleaningProgress.set(false) *> clearStale(cnt) - ) - else ().pure[F] - ) - } yield ()).getOrElse(()) - - private def clearStale(n: Long): F[Unit] = - log.debug("Attempting to clear stanford nlp pipeline cache to free memory") *> - counter.get.flatMap(x => - if (x == n) clearAll - else - log.debug( - "Don't clear yet, as it has been used in between" - ) *> scheduleClearPipeline - ) - - private def clearAll: F[Unit] = - log.info("Clearing stanford nlp pipeline cache now!") *> - data.set(Map.empty) *> Sync[F].delay { - // turns out that everything is cached in a static map - StanfordCoreNLP.clearAnnotatorPool() - System.gc(); - } - private def getOrCreate( key: String, id: String, @@ -130,6 +93,77 @@ object PipelineCache { } + trait CacheClearing[F[_]] { + def withCache: Resource[F, Unit] + } + + object CacheClearing { + def none[F[_]: Applicative]: CacheClearing[F] = + new CacheClearing[F] { + def withCache: Resource[F, Unit] = + Resource.pure[F, Unit](()) + } + + def create[F[_]: Concurrent: Timer]( + data: Ref[F, Map[String, Entry]], + interval: Duration + ): F[CacheClearing[F]] = + for { + counter <- Ref.of(0L) + cleaning <- Ref.of(None: Option[Fiber[F, Unit]]) + log = Logger.log4s(logger) + result <- + if (interval.millis <= 0) + log + .info("Disable clearing StanfordNLP cache, due to config setting") + .map(_ => none[F]) + else + log + .info(s"Clearing StanfordNLP cache after $interval idle time") + .map(_ => + new CacheClearingImpl[F](data, counter, cleaning, interval.toScala) + ) + } yield result + } + + final private class CacheClearingImpl[F[_]]( + data: Ref[F, Map[String, Entry]], + counter: Ref[F, Long], + cleaningFiber: Ref[F, Option[Fiber[F, Unit]]], + clearInterval: FiniteDuration + )(implicit T: Timer[F], F: Concurrent[F]) + extends CacheClearing[F] { + private[this] val log = Logger.log4s[F](logger) + + def withCache: Resource[F, Unit] = + Resource.make(counter.update(_ + 1))(_ => + counter.updateAndGet(_ - 1).flatMap(n => scheduleClearPipeline(n)) + ) + + def scheduleClearPipeline(cnt: Long): F[Unit] = + if (cnt > 0) cancelClear + else cancelClear *> clearAllLater.flatMap(fiber => cleaningFiber.set(fiber.some)) + + private def cancelClear: F[Unit] = + cleaningFiber.getAndSet(None).flatMap { + case Some(fiber) => fiber.cancel *> logDontClear + case None => ().pure[F] + } + + private def clearAllLater: F[Fiber[F, Unit]] = + F.start(T.sleep(clearInterval) *> clearAll) + + private def logDontClear: F[Unit] = + log.info("Cancel stanford cache clearing, as it has been used in between.") + + def clearAll: F[Unit] = + log.info("Clearing stanford nlp cache now!") *> + data.set(Map.empty) *> Sync[F].delay { + // turns out that everything is cached in a static map + StanfordCoreNLP.clearAnnotatorPool() + System.gc(); + } + } private def makeClassifier(settings: StanfordNerSettings): StanfordCoreNLP = { logger.info(s"Creating ${settings.lang.name} Stanford NLP NER classifier...") diff --git a/modules/analysis/src/main/scala/docspell/analysis/nlp/StanfordNerClassifier.scala b/modules/analysis/src/main/scala/docspell/analysis/nlp/StanfordNerClassifier.scala index 383a07ea..b82b1c95 100644 --- a/modules/analysis/src/main/scala/docspell/analysis/nlp/StanfordNerClassifier.scala +++ b/modules/analysis/src/main/scala/docspell/analysis/nlp/StanfordNerClassifier.scala @@ -1,12 +1,10 @@ package docspell.analysis.nlp -import scala.jdk.CollectionConverters._ - import cats.Applicative -import cats.implicits._ +import scala.jdk.CollectionConverters._ +import cats.effect._ import docspell.common._ - import edu.stanford.nlp.pipeline.{CoreDocument, StanfordCoreNLP} object StanfordNerClassifier { @@ -22,13 +20,13 @@ object StanfordNerClassifier { * a new classifier must be created. It will then replace the * previous one. */ - def nerAnnotate[F[_]: Applicative]( + def nerAnnotate[F[_]: BracketThrow]( cacheKey: String, cache: PipelineCache[F] )(settings: StanfordNerSettings, text: String): F[Vector[NerLabel]] = cache .obtain(cacheKey, settings) - .map(crf => runClassifier(crf, text)) + .use(crf => Applicative[F].pure(runClassifier(crf, text))) def runClassifier(nerClassifier: StanfordCoreNLP, text: String): Vector[NerLabel] = { val doc = new CoreDocument(text) diff --git a/modules/joex/src/main/resources/reference.conf b/modules/joex/src/main/resources/reference.conf index 8437982d..9561be58 100644 --- a/modules/joex/src/main/resources/reference.conf +++ b/modules/joex/src/main/resources/reference.conf @@ -276,6 +276,13 @@ docspell.joex { # files. working-dir = ${java.io.tmpdir}"/docspell-analysis" + # The StanfordCoreNLP library caches language models which + # requires quite some amount of memory. Setting this interval to a + # positive duration, the cache is cleared after this amount of + # idle time. Set it to 0 to disable it if you have enough memory, + # processing will be faster. + clear-stanford-nlp-interval = "15 minutes" + regex-ner { # Whether to enable custom NER annotation. This uses the address # book of a collective as input for NER tagging (to automatically diff --git a/modules/joex/src/main/scala/docspell/joex/Config.scala b/modules/joex/src/main/scala/docspell/joex/Config.scala index cbbb4a33..601d0049 100644 --- a/modules/joex/src/main/scala/docspell/joex/Config.scala +++ b/modules/joex/src/main/scala/docspell/joex/Config.scala @@ -60,6 +60,7 @@ object Config { case class TextAnalysis( maxLength: Int, workingDir: Path, + clearStanfordNlpInterval: Duration, regexNer: RegexNer, classification: Classification ) { @@ -67,6 +68,7 @@ object Config { def textAnalysisConfig: TextAnalysisConfig = TextAnalysisConfig( maxLength, + clearStanfordNlpInterval, TextClassifierConfig( workingDir, NonEmptyList