Make pipeline cache generic to be used with BasicCRFAnnotator

This commit is contained in:
Eike Kettner 2021-01-13 22:59:02 +01:00
parent 4462ebae0f
commit a77f67d73a
3 changed files with 68 additions and 44 deletions

View File

@ -33,7 +33,12 @@ object TextAnalyser {
blocker: Blocker blocker: Blocker
): Resource[F, TextAnalyser[F]] = ): Resource[F, TextAnalyser[F]] =
Resource Resource
.liftF(PipelineCache[F](cfg.clearStanfordPipelineInterval)) .liftF(
PipelineCache(cfg.clearStanfordPipelineInterval)(
StanfordNerAnnotator.makePipeline,
StanfordNerAnnotator.clearPipelineCaches[F]
)
)
.map(cache => .map(cache =>
new TextAnalyser[F] { new TextAnalyser[F] {
def annotate( def annotate(

View File

@ -1,15 +1,12 @@
package docspell.analysis.nlp package docspell.analysis.nlp
import scala.concurrent.duration.{Duration => _, _} import scala.concurrent.duration.{Duration => _, _}
import cats.Applicative import cats.Applicative
import cats.data.Kleisli
import cats.effect._ import cats.effect._
import cats.effect.concurrent.Ref import cats.effect.concurrent.Ref
import cats.implicits._ import cats.implicits._
import docspell.common._ import docspell.common._
import edu.stanford.nlp.pipeline.StanfordCoreNLP
import org.log4s.getLogger import org.log4s.getLogger
/** Creating the StanfordCoreNLP pipeline is quite expensive as it /** Creating the StanfordCoreNLP pipeline is quite expensive as it
@ -19,48 +16,57 @@ import org.log4s.getLogger
* *
* **This is an internal API** * **This is an internal API**
*/ */
trait PipelineCache[F[_]] { trait PipelineCache[F[_], A] {
def obtain(key: String, settings: StanfordNerSettings): Resource[F, StanfordCoreNLP] def obtain(key: String, settings: StanfordNerSettings): Resource[F, A]
} }
object PipelineCache { object PipelineCache {
private[this] val logger = getLogger private[this] val logger = getLogger
def none[F[_]: Applicative]: PipelineCache[F] = def none[F[_]: Applicative, A](
new PipelineCache[F] { creator: Kleisli[F, StanfordNerSettings, A]
): PipelineCache[F, A] =
new PipelineCache[F, A] {
def obtain( def obtain(
ignored: String, ignored: String,
settings: StanfordNerSettings settings: StanfordNerSettings
): Resource[F, StanfordCoreNLP] = ): Resource[F, A] =
Resource.liftF(makeClassifier(settings).pure[F]) Resource.liftF(creator.run(settings))
} }
def apply[F[_]: Concurrent: Timer](clearInterval: Duration): F[PipelineCache[F]] = def apply[F[_]: Concurrent: Timer, A](clearInterval: Duration)(
creator: StanfordNerSettings => A,
release: F[Unit]
): F[PipelineCache[F, A]] =
for { for {
data <- Ref.of(Map.empty[String, Entry]) data <- Ref.of(Map.empty[String, Entry[A]])
cacheClear <- CacheClearing.create(data, clearInterval) cacheClear <- CacheClearing.create(data, clearInterval, release)
} yield new Impl[F](data, cacheClear) } yield new Impl[F, A](data, creator, cacheClear)
final private class Impl[F[_]: Sync]( final private class Impl[F[_]: Sync, A](
data: Ref[F, Map[String, Entry]], data: Ref[F, Map[String, Entry[A]]],
creator: StanfordNerSettings => A,
cacheClear: CacheClearing[F] cacheClear: CacheClearing[F]
) extends PipelineCache[F] { ) extends PipelineCache[F, A] {
def obtain(key: String, settings: StanfordNerSettings): Resource[F, StanfordCoreNLP] = def obtain(key: String, settings: StanfordNerSettings): Resource[F, A] =
for { for {
_ <- cacheClear.withCache _ <- cacheClear.withCache
id <- Resource.liftF(makeSettingsId(settings)) id <- Resource.liftF(makeSettingsId(settings))
nlp <- Resource.liftF(data.modify(cache => getOrCreate(key, id, cache, settings))) nlp <- Resource.liftF(
data.modify(cache => getOrCreate(key, id, cache, settings, creator))
)
} yield nlp } yield nlp
private def getOrCreate( private def getOrCreate(
key: String, key: String,
id: String, id: String,
cache: Map[String, Entry], cache: Map[String, Entry[A]],
settings: StanfordNerSettings settings: StanfordNerSettings,
): (Map[String, Entry], StanfordCoreNLP) = creator: StanfordNerSettings => A
): (Map[String, Entry[A]], A) =
cache.get(key) match { cache.get(key) match {
case Some(entry) => case Some(entry) =>
if (entry.id == id) (cache, entry.value) if (entry.id == id) (cache, entry.value)
@ -68,13 +74,13 @@ object PipelineCache {
logger.info( logger.info(
s"StanfordNLP settings changed for key $key. Creating new classifier" s"StanfordNLP settings changed for key $key. Creating new classifier"
) )
val nlp = makeClassifier(settings) val nlp = creator(settings)
val e = Entry(id, nlp) val e = Entry(id, nlp)
(cache.updated(key, e), nlp) (cache.updated(key, e), nlp)
} }
case None => case None =>
val nlp = makeClassifier(settings) val nlp = creator(settings)
val e = Entry(id, nlp) val e = Entry(id, nlp)
(cache.updated(key, e), nlp) (cache.updated(key, e), nlp)
} }
@ -104,9 +110,10 @@ object PipelineCache {
Resource.pure[F, Unit](()) Resource.pure[F, Unit](())
} }
def create[F[_]: Concurrent: Timer]( def create[F[_]: Concurrent: Timer, A](
data: Ref[F, Map[String, Entry]], data: Ref[F, Map[String, Entry[A]]],
interval: Duration interval: Duration,
release: F[Unit]
): F[CacheClearing[F]] = ): F[CacheClearing[F]] =
for { for {
counter <- Ref.of(0L) counter <- Ref.of(0L)
@ -121,16 +128,23 @@ object PipelineCache {
log log
.info(s"Clearing StanfordNLP cache after $interval idle time") .info(s"Clearing StanfordNLP cache after $interval idle time")
.map(_ => .map(_ =>
new CacheClearingImpl[F](data, counter, cleaning, interval.toScala) new CacheClearingImpl[F, A](
data,
counter,
cleaning,
interval.toScala,
release
)
) )
} yield result } yield result
} }
final private class CacheClearingImpl[F[_]]( final private class CacheClearingImpl[F[_], A](
data: Ref[F, Map[String, Entry]], data: Ref[F, Map[String, Entry[A]]],
counter: Ref[F, Long], counter: Ref[F, Long],
cleaningFiber: Ref[F, Option[Fiber[F, Unit]]], cleaningFiber: Ref[F, Option[Fiber[F, Unit]]],
clearInterval: FiniteDuration clearInterval: FiniteDuration,
release: F[Unit]
)(implicit T: Timer[F], F: Concurrent[F]) )(implicit T: Timer[F], F: Concurrent[F])
extends CacheClearing[F] { extends CacheClearing[F] {
private[this] val log = Logger.log4s[F](logger) private[this] val log = Logger.log4s[F](logger)
@ -158,17 +172,10 @@ object PipelineCache {
def clearAll: F[Unit] = def clearAll: F[Unit] =
log.info("Clearing stanford nlp cache now!") *> log.info("Clearing stanford nlp cache now!") *>
data.set(Map.empty) *> Sync[F].delay { data.set(Map.empty) *> release *> Sync[F].delay {
// turns out that everything is cached in a static map
StanfordCoreNLP.clearAnnotatorPool()
System.gc(); System.gc();
} }
} }
private def makeClassifier(settings: StanfordNerSettings): StanfordCoreNLP = { private case class Entry[A](id: String, value: A)
logger.info(s"Creating ${settings.lang.name} Stanford NLP NER classifier...")
new StanfordCoreNLP(Properties.forSettings(settings))
}
private case class Entry(id: String, value: StanfordCoreNLP)
} }

View File

@ -8,8 +8,10 @@ import cats.effect._
import docspell.common._ import docspell.common._
import edu.stanford.nlp.pipeline.{CoreDocument, StanfordCoreNLP} import edu.stanford.nlp.pipeline.{CoreDocument, StanfordCoreNLP}
import org.log4s.getLogger
object StanfordNerAnnotator { object StanfordNerAnnotator {
private[this] val logger = getLogger
/** Runs named entity recognition on the given `text`. /** Runs named entity recognition on the given `text`.
* *
@ -24,7 +26,7 @@ object StanfordNerAnnotator {
*/ */
def nerAnnotate[F[_]: BracketThrow]( def nerAnnotate[F[_]: BracketThrow](
cacheKey: String, cacheKey: String,
cache: PipelineCache[F] cache: PipelineCache[F, StanfordCoreNLP]
)(settings: StanfordNerSettings, text: String): F[Vector[NerLabel]] = )(settings: StanfordNerSettings, text: String): F[Vector[NerLabel]] =
cache cache
.obtain(cacheKey, settings) .obtain(cacheKey, settings)
@ -36,4 +38,14 @@ object StanfordNerAnnotator {
doc.tokens().asScala.collect(Function.unlift(LabelConverter.toNerLabel)).toVector doc.tokens().asScala.collect(Function.unlift(LabelConverter.toNerLabel)).toVector
} }
def makePipeline(settings: StanfordNerSettings): StanfordCoreNLP = {
logger.info(s"Creating ${settings.lang.name} Stanford NLP NER classifier...")
new StanfordCoreNLP(Properties.forSettings(settings))
}
def clearPipelineCaches[F[_]: Sync]: F[Unit] =
Sync[F].delay {
// turns out that everything is cached in a static map
StanfordCoreNLP.clearAnnotatorPool()
}
} }