From a77f67d73ab4ef5c850d388404e14212c545afe9 Mon Sep 17 00:00:00 2001
From: Eike Kettner
Date: Wed, 13 Jan 2021 22:59:02 +0100
Subject: [PATCH] Make pipeline cache generic to be used with BasicCRFAnnotator
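
The cache no longer hard-codes StanfordCoreNLP: obtain now yields a
generic A, and callers pass in both the function that creates a
pipeline from StanfordNerSettings and the release action to run when
the cache is cleared. The Stanford wiring moves into
StanfordNerAnnotator and is supplied at construction time, as done in
TextAnalyser below:

    PipelineCache(cfg.clearStanfordPipelineInterval)(
      StanfordNerAnnotator.makePipeline,
      StanfordNerAnnotator.clearPipelineCaches[F]
    )

A CRF-based annotator can then reuse the same cache by supplying its
own pair of functions. The following is only an illustrative sketch;
the names on the CRF side are hypothetical and not part of this patch:

    PipelineCache(cfg.clearStanfordPipelineInterval)(
      settings => makeCrfAnnotator(settings), // assumed factory
      clearCrfCaches[F]                       // assumed release action
    )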
---
 .../docspell/analysis/TextAnalyser.scala      |  7 +-
 .../docspell/analysis/nlp/PipelineCache.scala | 91 ++++++++++---------
 .../analysis/nlp/StanfordNerAnnotator.scala   | 14 ++-
 3 files changed, 68 insertions(+), 44 deletions(-)

diff --git a/modules/analysis/src/main/scala/docspell/analysis/TextAnalyser.scala b/modules/analysis/src/main/scala/docspell/analysis/TextAnalyser.scala
index b67347ae..6c8e6cff 100644
--- a/modules/analysis/src/main/scala/docspell/analysis/TextAnalyser.scala
+++ b/modules/analysis/src/main/scala/docspell/analysis/TextAnalyser.scala
@@ -33,7 +33,12 @@
       blocker: Blocker
   ): Resource[F, TextAnalyser[F]] =
     Resource
-      .liftF(PipelineCache[F](cfg.clearStanfordPipelineInterval))
+      .liftF(
+        PipelineCache(cfg.clearStanfordPipelineInterval)(
+          StanfordNerAnnotator.makePipeline,
+          StanfordNerAnnotator.clearPipelineCaches[F]
+        )
+      )
       .map(cache =>
         new TextAnalyser[F] {
           def annotate(
diff --git a/modules/analysis/src/main/scala/docspell/analysis/nlp/PipelineCache.scala b/modules/analysis/src/main/scala/docspell/analysis/nlp/PipelineCache.scala
index 9cc3f2d7..61598f9a 100644
--- a/modules/analysis/src/main/scala/docspell/analysis/nlp/PipelineCache.scala
+++ b/modules/analysis/src/main/scala/docspell/analysis/nlp/PipelineCache.scala
@@ -1,15 +1,12 @@
 package docspell.analysis.nlp
 
 import scala.concurrent.duration.{Duration => _, _}
-
 import cats.Applicative
+import cats.data.Kleisli
 import cats.effect._
 import cats.effect.concurrent.Ref
 import cats.implicits._
-
 import docspell.common._
-
-import edu.stanford.nlp.pipeline.StanfordCoreNLP
 import org.log4s.getLogger
 
 /** Creating the StanfordCoreNLP pipeline is quite expensive as it
@@ -19,48 +16,57 @@ import org.log4s.getLogger
   *
   * **This is an internal API**
   */
-trait PipelineCache[F[_]] {
+trait PipelineCache[F[_], A] {
 
-  def obtain(key: String, settings: StanfordNerSettings): Resource[F, StanfordCoreNLP]
+  def obtain(key: String, settings: StanfordNerSettings): Resource[F, A]
 
 }
 
 object PipelineCache {
   private[this] val logger = getLogger
 
-  def none[F[_]: Applicative]: PipelineCache[F] =
-    new PipelineCache[F] {
+  def none[F[_]: Applicative, A](
+      creator: Kleisli[F, StanfordNerSettings, A]
+  ): PipelineCache[F, A] =
+    new PipelineCache[F, A] {
       def obtain(
           ignored: String,
           settings: StanfordNerSettings
-      ): Resource[F, StanfordCoreNLP] =
-        Resource.liftF(makeClassifier(settings).pure[F])
+      ): Resource[F, A] =
+        Resource.liftF(creator.run(settings))
     }
 
-  def apply[F[_]: Concurrent: Timer](clearInterval: Duration): F[PipelineCache[F]] =
+  def apply[F[_]: Concurrent: Timer, A](clearInterval: Duration)(
+      creator: StanfordNerSettings => A,
+      release: F[Unit]
+  ): F[PipelineCache[F, A]] =
     for {
-      data       <- Ref.of(Map.empty[String, Entry])
-      cacheClear <- CacheClearing.create(data, clearInterval)
-    } yield new Impl[F](data, cacheClear)
+      data       <- Ref.of(Map.empty[String, Entry[A]])
+      cacheClear <- CacheClearing.create(data, clearInterval, release)
+    } yield new Impl[F, A](data, creator, cacheClear)
 
-  final private class Impl[F[_]: Sync](
-      data: Ref[F, Map[String, Entry]],
+  final private class Impl[F[_]: Sync, A](
+      data: Ref[F, Map[String, Entry[A]]],
+      creator: StanfordNerSettings => A,
       cacheClear: CacheClearing[F]
-  ) extends PipelineCache[F] {
+  ) extends PipelineCache[F, A] {
 
-    def obtain(key: String, settings: StanfordNerSettings): Resource[F, StanfordCoreNLP] =
+    def obtain(key: String, settings: StanfordNerSettings): Resource[F, A] =
       for {
-        _   <- cacheClear.withCache
-        id  <- Resource.liftF(makeSettingsId(settings))
-        nlp <- Resource.liftF(data.modify(cache => getOrCreate(key, id, cache, settings)))
+        _ <- cacheClear.withCache
+        id <- Resource.liftF(makeSettingsId(settings))
+        nlp <- Resource.liftF(
+          data.modify(cache => getOrCreate(key, id, cache, settings, creator))
+        )
       } yield nlp
 
     private def getOrCreate(
         key: String,
         id: String,
-        cache: Map[String, Entry],
-        settings: StanfordNerSettings
-    ): (Map[String, Entry], StanfordCoreNLP) =
+        cache: Map[String, Entry[A]],
+        settings: StanfordNerSettings,
+        creator: StanfordNerSettings => A
+    ): (Map[String, Entry[A]], A) =
       cache.get(key) match {
        case Some(entry) =>
          if (entry.id == id) (cache, entry.value)
@@ -68,13 +74,13 @@ object PipelineCache {
           logger.info(
             s"StanfordNLP settings changed for key $key. Creating new classifier"
           )
-          val nlp = makeClassifier(settings)
+          val nlp = creator(settings)
           val e   = Entry(id, nlp)
           (cache.updated(key, e), nlp)
         }
 
       case None =>
-        val nlp = makeClassifier(settings)
+        val nlp = creator(settings)
         val e   = Entry(id, nlp)
         (cache.updated(key, e), nlp)
     }
@@ -104,9 +110,10 @@
          Resource.pure[F, Unit](())
      }
 
-    def create[F[_]: Concurrent: Timer](
-        data: Ref[F, Map[String, Entry]],
-        interval: Duration
+    def create[F[_]: Concurrent: Timer, A](
+        data: Ref[F, Map[String, Entry[A]]],
+        interval: Duration,
+        release: F[Unit]
     ): F[CacheClearing[F]] =
       for {
         counter  <- Ref.of(0L)
@@ -121,16 +128,23 @@
            log
              .info(s"Clearing StanfordNLP cache after $interval idle time")
              .map(_ =>
-              new CacheClearingImpl[F](data, counter, cleaning, interval.toScala)
+              new CacheClearingImpl[F, A](
+                data,
+                counter,
+                cleaning,
+                interval.toScala,
+                release
+              )
             )
       } yield result
   }
 
-  final private class CacheClearingImpl[F[_]](
-      data: Ref[F, Map[String, Entry]],
+  final private class CacheClearingImpl[F[_], A](
+      data: Ref[F, Map[String, Entry[A]]],
       counter: Ref[F, Long],
       cleaningFiber: Ref[F, Option[Fiber[F, Unit]]],
-      clearInterval: FiniteDuration
+      clearInterval: FiniteDuration,
+      release: F[Unit]
   )(implicit T: Timer[F], F: Concurrent[F])
       extends CacheClearing[F] {
     private[this] val log = Logger.log4s[F](logger)
@@ -158,17 +172,10 @@
 
     def clearAll: F[Unit] =
       log.info("Clearing stanford nlp cache now!") *>
-        data.set(Map.empty) *> Sync[F].delay {
-          // turns out that everything is cached in a static map
-          StanfordCoreNLP.clearAnnotatorPool()
+        data.set(Map.empty) *> release *> Sync[F].delay {
           System.gc();
         }
   }
 
-  private def makeClassifier(settings: StanfordNerSettings): StanfordCoreNLP = {
-    logger.info(s"Creating ${settings.lang.name} Stanford NLP NER classifier...")
-    new StanfordCoreNLP(Properties.forSettings(settings))
-  }
-
-  private case class Entry(id: String, value: StanfordCoreNLP)
+  private case class Entry[A](id: String, value: A)
 }
diff --git a/modules/analysis/src/main/scala/docspell/analysis/nlp/StanfordNerAnnotator.scala b/modules/analysis/src/main/scala/docspell/analysis/nlp/StanfordNerAnnotator.scala
index df9fa431..37b54b40 100644
--- a/modules/analysis/src/main/scala/docspell/analysis/nlp/StanfordNerAnnotator.scala
+++ b/modules/analysis/src/main/scala/docspell/analysis/nlp/StanfordNerAnnotator.scala
@@ -8,8 +8,10 @@ import cats.effect._
 import docspell.common._
 
 import edu.stanford.nlp.pipeline.{CoreDocument, StanfordCoreNLP}
+import org.log4s.getLogger
 
 object StanfordNerAnnotator {
+  private[this] val logger = getLogger
 
   /** Runs named entity recognition on the given `text`.
     *
@@ -24,7 +26,7 @@
     */
   def nerAnnotate[F[_]: BracketThrow](
      cacheKey: String,
-      cache: PipelineCache[F]
+      cache: PipelineCache[F, StanfordCoreNLP]
   )(settings: StanfordNerSettings, text: String): F[Vector[NerLabel]] =
     cache
       .obtain(cacheKey, settings)
@@ -36,4 +38,14 @@
     doc.tokens().asScala.collect(Function.unlift(LabelConverter.toNerLabel)).toVector
   }
 
+  def makePipeline(settings: StanfordNerSettings): StanfordCoreNLP = {
+    logger.info(s"Creating ${settings.lang.name} Stanford NLP NER classifier...")
+    new StanfordCoreNLP(Properties.forSettings(settings))
+  }
+
+  def clearPipelineCaches[F[_]: Sync]: F[Unit] =
+    Sync[F].delay {
+      // turns out that everything is cached in a static map
+      StanfordCoreNLP.clearAnnotatorPool()
+    }
 }
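
Note (not part of the commit): a minimal usage sketch of how the pieces
above fit together, assuming cats-effect implicits (ContextShift/Timer)
are in scope and that settings: StanfordNerSettings, clearInterval:
Duration and text: String already exist; the cache key "collective-1"
is a made-up example value:

    val labels: IO[Vector[NerLabel]] =
      PipelineCache(clearInterval)(
        StanfordNerAnnotator.makePipeline,
        StanfordNerAnnotator.clearPipelineCaches[IO]
      ).flatMap { cache =>
        StanfordNerAnnotator.nerAnnotate[IO]("collective-1", cache)(settings, text)
      }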