mirror of
				https://github.com/TheAnachronism/docspell.git
				synced 2025-10-31 17:50:11 +00:00 
			
		
		
		
	Poc for clearing stanford pipeline after some idle time
This commit is contained in:
		| @@ -31,7 +31,7 @@ object TextAnalyser { | |||||||
|       labels ++ dates.map(dl => dl.label.copy(label = dl.date.toString)) |       labels ++ dates.map(dl => dl.label.copy(label = dl.date.toString)) | ||||||
|   } |   } | ||||||
|  |  | ||||||
|   def create[F[_]: Sync](cfg: TextAnalysisConfig): Resource[F, TextAnalyser[F]] = |   def create[F[_]: Concurrent: Timer](cfg: TextAnalysisConfig): Resource[F, TextAnalyser[F]] = | ||||||
|     Resource |     Resource | ||||||
|       .liftF(PipelineCache[F]()) |       .liftF(PipelineCache[F]()) | ||||||
|       .map(cache => |       .map(cache => | ||||||
|   | |||||||
| @@ -9,6 +9,8 @@ import docspell.common._ | |||||||
|  |  | ||||||
| import edu.stanford.nlp.pipeline.StanfordCoreNLP | import edu.stanford.nlp.pipeline.StanfordCoreNLP | ||||||
| import org.log4s.getLogger | import org.log4s.getLogger | ||||||
|  | import scala.concurrent.duration._ | ||||||
|  | import cats.data.OptionT | ||||||
|  |  | ||||||
| /** Creating the StanfordCoreNLP pipeline is quite expensive as it | /** Creating the StanfordCoreNLP pipeline is quite expensive as it | ||||||
|   * involves IO and initializing large objects. |   * involves IO and initializing large objects. | ||||||
| @@ -32,18 +34,64 @@ object PipelineCache { | |||||||
|         makeClassifier(settings).pure[F] |         makeClassifier(settings).pure[F] | ||||||
|     } |     } | ||||||
|  |  | ||||||
|   def apply[F[_]: Sync](): F[PipelineCache[F]] = |   def apply[F[_]: Concurrent: Timer](): F[PipelineCache[F]] = | ||||||
|     Ref.of(Map.empty[String, Entry]).map(data => (new Impl[F](data): PipelineCache[F])) |     for { | ||||||
|  |       data     <- Ref.of(Map.empty[String, Entry]) | ||||||
|  |       counter  <- Ref.of(Long.MinValue) | ||||||
|  |       cleaning <- Ref.of(false) | ||||||
|  |     } yield new Impl[F](data, counter, cleaning): PipelineCache[F] | ||||||
|  |  | ||||||
|   final private class Impl[F[_]: Sync](data: Ref[F, Map[String, Entry]]) |   final private class Impl[F[_]]( | ||||||
|  |       data: Ref[F, Map[String, Entry]], | ||||||
|  |       counter: Ref[F, Long], | ||||||
|  |       cleaningProgress: Ref[F, Boolean] | ||||||
|  |   )(implicit T: Timer[F], F: Concurrent[F]) | ||||||
|       extends PipelineCache[F] { |       extends PipelineCache[F] { | ||||||
|  |  | ||||||
|  |     private[this] val clearInterval = 1.minute | ||||||
|  |     private[this] val log           = Logger.log4s(logger) | ||||||
|  |  | ||||||
|     def obtain(key: String, settings: StanfordNerSettings): F[StanfordCoreNLP] = |     def obtain(key: String, settings: StanfordNerSettings): F[StanfordCoreNLP] = | ||||||
|       for { |       for { | ||||||
|         id  <- makeSettingsId(settings) |         id  <- makeSettingsId(settings) | ||||||
|         nlp <- data.modify(cache => getOrCreate(key, id, cache, settings)) |         nlp <- data.modify(cache => getOrCreate(key, id, cache, settings)) | ||||||
|  |         _   <- scheduleClearPipeline | ||||||
|       } yield nlp |       } yield nlp | ||||||
|  |  | ||||||
|  |     private def scheduleClearPipeline: F[Unit] = | ||||||
|  |       (for { | ||||||
|  |         cnt <- OptionT(counter.tryModify(n => (n + 1, n + 1))) | ||||||
|  |         free <- OptionT.liftF(cleaningProgress.access.flatMap { case (b, setter) => | ||||||
|  |           if (b) false.pure[F] | ||||||
|  |           else setter(true) | ||||||
|  |         }) | ||||||
|  |         _ <- OptionT.liftF( | ||||||
|  |           if (free) | ||||||
|  |             F.start( | ||||||
|  |               T.sleep(clearInterval) *> cleaningProgress.set(false) *> clearStale(cnt) | ||||||
|  |             ) | ||||||
|  |           else ().pure[F] | ||||||
|  |         ) | ||||||
|  |       } yield ()).getOrElse(()) | ||||||
|  |  | ||||||
|  |     private def clearStale(n: Long): F[Unit] = | ||||||
|  |       log.debug("Attempting to clear stanford nlp pipeline cache to free memory") *> | ||||||
|  |         counter.get.flatMap(x => | ||||||
|  |           if (x == n) clearAll | ||||||
|  |           else | ||||||
|  |             log.debug( | ||||||
|  |               "Don't clear yet, as it has been used in between" | ||||||
|  |             ) *> scheduleClearPipeline | ||||||
|  |         ) | ||||||
|  |  | ||||||
|  |     private def clearAll: F[Unit] = | ||||||
|  |       log.info("Clearing stanford nlp pipeline cache now!") *> | ||||||
|  |         data.set(Map.empty) *> Sync[F].delay { | ||||||
|  |           // turns out that everything is cached in a static map | ||||||
|  |           StanfordCoreNLP.clearAnnotatorPool() | ||||||
|  |           System.gc(); | ||||||
|  |         } | ||||||
|  |  | ||||||
|     private def getOrCreate( |     private def getOrCreate( | ||||||
|         key: String, |         key: String, | ||||||
|         id: String, |         id: String, | ||||||
| @@ -81,6 +129,8 @@ object PipelineCache { | |||||||
|     } |     } | ||||||
|  |  | ||||||
|   } |   } | ||||||
|  |  | ||||||
|  |  | ||||||
|   private def makeClassifier(settings: StanfordNerSettings): StanfordCoreNLP = { |   private def makeClassifier(settings: StanfordNerSettings): StanfordCoreNLP = { | ||||||
|     logger.info(s"Creating ${settings.lang.name} Stanford NLP NER classifier...") |     logger.info(s"Creating ${settings.lang.name} Stanford NLP NER classifier...") | ||||||
|     new StanfordCoreNLP(Properties.forSettings(settings)) |     new StanfordCoreNLP(Properties.forSettings(settings)) | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user