Make idle interval when clearing nlp cache configurable

This commit is contained in:
Eike Kettner 2021-01-06 00:35:58 +01:00
parent 73a9572835
commit a670bbb6c2
6 changed files with 109 additions and 64 deletions

View File

@ -31,9 +31,11 @@ object TextAnalyser {
labels ++ dates.map(dl => dl.label.copy(label = dl.date.toString))
}
def create[F[_]: Concurrent: Timer](cfg: TextAnalysisConfig): Resource[F, TextAnalyser[F]] =
def create[F[_]: Concurrent: Timer](
cfg: TextAnalysisConfig
): Resource[F, TextAnalyser[F]] =
Resource
.liftF(PipelineCache[F]())
.liftF(PipelineCache[F](cfg.clearStanfordPipelineInterval))
.map(cache =>
new TextAnalyser[F] {
def annotate(

View File

@ -1,8 +1,10 @@
package docspell.analysis
import docspell.analysis.nlp.TextClassifierConfig
import docspell.common._
/** Settings for the text analysis module.
  *
  * @param maxLength
  *   presumably an upper bound on the amount of text that is analysed; its
  *   use is not visible in this file -- TODO confirm against the callers
  * @param clearStanfordPipelineInterval
  *   idle interval that `TextAnalyser.create` hands to the Stanford
  *   `PipelineCache`; the cache treats a non-positive duration as "never
  *   clear"
  * @param classifier
  *   settings of type `TextClassifierConfig`
  */
case class TextAnalysisConfig(
maxLength: Int,
clearStanfordPipelineInterval: Duration,
classifier: TextClassifierConfig
)

View File

@ -1,5 +1,7 @@
package docspell.analysis.nlp
import scala.concurrent.duration.{Duration => _, _}
import cats.Applicative
import cats.effect._
import cats.effect.concurrent.Ref
@ -9,8 +11,6 @@ import docspell.common._
import edu.stanford.nlp.pipeline.StanfordCoreNLP
import org.log4s.getLogger
import scala.concurrent.duration._
import cats.data.OptionT
/** Creating the StanfordCoreNLP pipeline is quite expensive as it
* involves IO and initializing large objects.
@ -21,7 +21,7 @@ import cats.data.OptionT
*/
trait PipelineCache[F[_]] {
def obtain(key: String, settings: StanfordNerSettings): F[StanfordCoreNLP]
def obtain(key: String, settings: StanfordNerSettings): Resource[F, StanfordCoreNLP]
}
@ -30,68 +30,31 @@ object PipelineCache {
def none[F[_]: Applicative]: PipelineCache[F] =
new PipelineCache[F] {
def obtain(ignored: String, settings: StanfordNerSettings): F[StanfordCoreNLP] =
makeClassifier(settings).pure[F]
def obtain(
ignored: String,
settings: StanfordNerSettings
): Resource[F, StanfordCoreNLP] =
Resource.liftF(makeClassifier(settings).pure[F])
}
def apply[F[_]: Concurrent: Timer](): F[PipelineCache[F]] =
def apply[F[_]: Concurrent: Timer](clearInterval: Duration): F[PipelineCache[F]] =
for {
data <- Ref.of(Map.empty[String, Entry])
counter <- Ref.of(Long.MinValue)
cleaning <- Ref.of(false)
} yield new Impl[F](data, counter, cleaning): PipelineCache[F]
data <- Ref.of(Map.empty[String, Entry])
cacheClear <- CacheClearing.create(data, clearInterval)
} yield new Impl[F](data, cacheClear)
final private class Impl[F[_]](
final private class Impl[F[_]: Sync](
data: Ref[F, Map[String, Entry]],
counter: Ref[F, Long],
cleaningProgress: Ref[F, Boolean]
)(implicit T: Timer[F], F: Concurrent[F])
extends PipelineCache[F] {
cacheClear: CacheClearing[F]
) extends PipelineCache[F] {
private[this] val clearInterval = 1.minute
private[this] val log = Logger.log4s(logger)
def obtain(key: String, settings: StanfordNerSettings): F[StanfordCoreNLP] =
def obtain(key: String, settings: StanfordNerSettings): Resource[F, StanfordCoreNLP] =
for {
id <- makeSettingsId(settings)
nlp <- data.modify(cache => getOrCreate(key, id, cache, settings))
_ <- scheduleClearPipeline
_ <- cacheClear.withCache
id <- Resource.liftF(makeSettingsId(settings))
nlp <- Resource.liftF(data.modify(cache => getOrCreate(key, id, cache, settings)))
} yield nlp
private def scheduleClearPipeline: F[Unit] =
(for {
cnt <- OptionT(counter.tryModify(n => (n + 1, n + 1)))
free <- OptionT.liftF(cleaningProgress.access.flatMap { case (b, setter) =>
if (b) false.pure[F]
else setter(true)
})
_ <- OptionT.liftF(
if (free)
F.start(
T.sleep(clearInterval) *> cleaningProgress.set(false) *> clearStale(cnt)
)
else ().pure[F]
)
} yield ()).getOrElse(())
private def clearStale(n: Long): F[Unit] =
log.debug("Attempting to clear stanford nlp pipeline cache to free memory") *>
counter.get.flatMap(x =>
if (x == n) clearAll
else
log.debug(
"Don't clear yet, as it has been used in between"
) *> scheduleClearPipeline
)
private def clearAll: F[Unit] =
log.info("Clearing stanford nlp pipeline cache now!") *>
data.set(Map.empty) *> Sync[F].delay {
// turns out that everything is cached in a static map
StanfordCoreNLP.clearAnnotatorPool()
System.gc();
}
private def getOrCreate(
key: String,
id: String,
@ -130,6 +93,77 @@ object PipelineCache {
}
/** Strategy deciding when the cached Stanford pipelines are dropped.
  *
  * Callers wrap every use of the cache in [[withCache]] so that an
  * implementation can tell when the cache is in use and when it is idle.
  */
trait CacheClearing[F[_]] {
/** A resource that is held for as long as the cache is being used. */
def withCache: Resource[F, Unit]
}
object CacheClearing {

  /** A strategy that never clears the cache: `withCache` is an empty
    * resource without any acquire or release action.
    */
  def none[F[_]: Applicative]: CacheClearing[F] =
    new CacheClearing[F] {
      def withCache: Resource[F, Unit] =
        Resource.pure[F, Unit](())
    }

  /** Builds the clearing strategy for the given cache contents.
    *
    * A positive `interval` yields a strategy that clears `data` after that
    * amount of idle time; any other value disables clearing. The decision
    * is logged at info level in both cases.
    */
  def create[F[_]: Concurrent: Timer](
      data: Ref[F, Map[String, Entry]],
      interval: Duration
  ): F[CacheClearing[F]] =
    for {
      usages  <- Ref.of[F, Long](0L)
      pending <- Ref.of[F, Option[Fiber[F, Unit]]](None)
      log = Logger.log4s[F](logger)
      strategy <-
        if (interval.millis > 0)
          log
            .info(s"Clearing StanfordNLP cache after $interval idle time")
            .map { _ =>
              val impl: CacheClearing[F] =
                new CacheClearingImpl[F](data, usages, pending, interval.toScala)
              impl
            }
        else
          log
            .info("Disable clearing StanfordNLP cache, due to config setting")
            .map { _ =>
              val disabled: CacheClearing[F] = none[F]
              disabled
            }
    } yield strategy
}
/** Clears the pipeline cache once it has been idle for `clearInterval`.
  *
  * `counter` tracks how many users currently hold the cache via
  * [[withCache]]. When the last user releases it, a fiber is started that
  * sleeps for `clearInterval` and then empties the cache. That fiber is
  * remembered in `cleaningFiber` so it can be cancelled when the cache is
  * used again before the interval has elapsed.
  *
  * @param data          the cached pipelines, emptied by [[clearAll]]
  * @param counter       number of users currently holding the cache
  * @param cleaningFiber the pending "clear later" fiber, if one is scheduled
  * @param clearInterval idle time to wait before clearing
  */
final private class CacheClearingImpl[F[_]](
    data: Ref[F, Map[String, Entry]],
    counter: Ref[F, Long],
    cleaningFiber: Ref[F, Option[Fiber[F, Unit]]],
    clearInterval: FiniteDuration
)(implicit T: Timer[F], F: Concurrent[F])
    extends CacheClearing[F] {

  private[this] val log = Logger.log4s[F](logger)

  /** Marks the cache as in use while the resource is held.
    *
    * Acquiring registers the user and cancels a pending clear, so the
    * cache cannot be emptied underneath a running user. Without this a
    * fiber scheduled during an earlier idle period would still fire after
    * `clearInterval`. Releasing unregisters the user and, if it was the
    * last one, schedules a new clear.
    */
  def withCache: Resource[F, Unit] =
    Resource.make(counter.update(_ + 1) *> cancelClear)(_ =>
      counter.updateAndGet(_ - 1).flatMap(n => scheduleClearPipeline(n))
    )

  /** Reacts to a release: `cnt` is the number of users left afterwards.
    *
    * While users remain, only a pending clear is cancelled. Otherwise any
    * pending clear is replaced by a freshly started one, which restarts
    * the idle timer.
    */
  def scheduleClearPipeline(cnt: Long): F[Unit] =
    if (cnt > 0) cancelClear
    else cancelClear *> clearAllLater.flatMap(fiber => cleaningFiber.set(fiber.some))

  /** Cancels the pending clear fiber, if there is one, and logs that. */
  private def cancelClear: F[Unit] =
    cleaningFiber.getAndSet(None).flatMap {
      case Some(fiber) => fiber.cancel *> logDontClear
      case None        => ().pure[F]
    }

  /** Starts a fiber that clears the cache after `clearInterval`. */
  private def clearAllLater: F[Fiber[F, Unit]] =
    F.start(T.sleep(clearInterval) *> clearAll)

  private def logDontClear: F[Unit] =
    log.info("Cancel stanford cache clearing, as it has been used in between.")

  /** Empties the cache and asks the JVM to reclaim the memory. */
  def clearAll: F[Unit] =
    log.info("Clearing stanford nlp cache now!") *>
      data.set(Map.empty) *> F.delay {
        // turns out that everything is cached in a static map
        StanfordCoreNLP.clearAnnotatorPool()
        System.gc();
      }
}
private def makeClassifier(settings: StanfordNerSettings): StanfordCoreNLP = {
logger.info(s"Creating ${settings.lang.name} Stanford NLP NER classifier...")

View File

@ -1,12 +1,10 @@
package docspell.analysis.nlp
import scala.jdk.CollectionConverters._
import cats.Applicative
import cats.implicits._
import scala.jdk.CollectionConverters._
import cats.effect._
import docspell.common._
import edu.stanford.nlp.pipeline.{CoreDocument, StanfordCoreNLP}
object StanfordNerClassifier {
@ -22,13 +20,13 @@ object StanfordNerClassifier {
* a new classifier must be created. It will then replace the
* previous one.
*/
def nerAnnotate[F[_]: Applicative](
def nerAnnotate[F[_]: BracketThrow](
cacheKey: String,
cache: PipelineCache[F]
)(settings: StanfordNerSettings, text: String): F[Vector[NerLabel]] =
cache
.obtain(cacheKey, settings)
.map(crf => runClassifier(crf, text))
.use(crf => Applicative[F].pure(runClassifier(crf, text)))
def runClassifier(nerClassifier: StanfordCoreNLP, text: String): Vector[NerLabel] = {
val doc = new CoreDocument(text)

View File

@ -276,6 +276,13 @@ docspell.joex {
# files.
working-dir = ${java.io.tmpdir}"/docspell-analysis"
# The StanfordCoreNLP library caches language models, which
# requires quite a lot of memory. If this interval is set to a
# positive duration, the cache is cleared after this amount of
# idle time. Set it to 0 to disable clearing if you have enough
# memory; processing will then be faster.
clear-stanford-nlp-interval = "15 minutes"
regex-ner {
# Whether to enable custom NER annotation. This uses the address
# book of a collective as input for NER tagging (to automatically

View File

@ -60,6 +60,7 @@ object Config {
case class TextAnalysis(
maxLength: Int,
workingDir: Path,
clearStanfordNlpInterval: Duration,
regexNer: RegexNer,
classification: Classification
) {
@ -67,6 +68,7 @@ object Config {
def textAnalysisConfig: TextAnalysisConfig =
TextAnalysisConfig(
maxLength,
clearStanfordNlpInterval,
TextClassifierConfig(
workingDir,
NonEmptyList