	Allow configuring stanford-ner and cache based on collective
		| @@ -5,12 +5,19 @@ import cats.implicits._ | ||||
|  | ||||
| import docspell.analysis.contact.Contact | ||||
| import docspell.analysis.date.DateFind | ||||
| import docspell.analysis.nlp.PipelineCache | ||||
| import docspell.analysis.nlp.StanfordNerClassifier | ||||
| import docspell.analysis.nlp.StanfordSettings | ||||
| import docspell.common._ | ||||
|  | ||||
| trait TextAnalyser[F[_]] { | ||||
|  | ||||
|   def annotate(logger: Logger[F], lang: Language, text: String): F[TextAnalyser.Result] | ||||
|   def annotate( | ||||
|       logger: Logger[F], | ||||
|       settings: StanfordSettings, | ||||
|       cacheKey: Ident, | ||||
|       text: String | ||||
|   ): F[TextAnalyser.Result] | ||||
|  | ||||
| } | ||||
| object TextAnalyser { | ||||
| @@ -22,43 +29,47 @@ object TextAnalyser { | ||||
|   } | ||||
|  | ||||
|   def create[F[_]: Sync](cfg: TextAnalysisConfig): Resource[F, TextAnalyser[F]] = | ||||
|     Resource.pure[F, TextAnalyser[F]](new TextAnalyser[F] { | ||||
|       def annotate( | ||||
|           logger: Logger[F], | ||||
|           lang: Language, | ||||
|           text: String | ||||
|       ): F[TextAnalyser.Result] = | ||||
|         for { | ||||
|           input <- textLimit(logger, text) | ||||
|           tags0 <- stanfordNer(lang, input) | ||||
|           tags1 <- contactNer(input) | ||||
|           dates <- dateNer(lang, input) | ||||
|           list  = tags0 ++ tags1 | ||||
|           spans = NerLabelSpan.build(list) | ||||
|         } yield Result(spans ++ list, dates) | ||||
|     Resource | ||||
|       .liftF(PipelineCache[F]()) | ||||
|       .map(cache => | ||||
|         new TextAnalyser[F] { | ||||
|           def annotate( | ||||
|               logger: Logger[F], | ||||
|               settings: StanfordSettings, | ||||
|               cacheKey: Ident, | ||||
|               text: String | ||||
|           ): F[TextAnalyser.Result] = | ||||
|             for { | ||||
|               input <- textLimit(logger, text) | ||||
|               tags0 <- stanfordNer(cacheKey, settings, input) | ||||
|               tags1 <- contactNer(input) | ||||
|               dates <- dateNer(settings.lang, input) | ||||
|               list  = tags0 ++ tags1 | ||||
|               spans = NerLabelSpan.build(list) | ||||
|             } yield Result(spans ++ list, dates) | ||||
|  | ||||
|       private def textLimit(logger: Logger[F], text: String): F[String] = | ||||
|         if (text.length <= cfg.maxLength) text.pure[F] | ||||
|         else | ||||
|           logger.info( | ||||
|             s"The text to analyse is larger than limit (${text.length} > ${cfg.maxLength})." + | ||||
|               s" Analysing only first ${cfg.maxLength} characters." | ||||
|           ) *> text.take(cfg.maxLength).pure[F] | ||||
|           private def textLimit(logger: Logger[F], text: String): F[String] = | ||||
|             if (text.length <= cfg.maxLength) text.pure[F] | ||||
|             else | ||||
|               logger.info( | ||||
|                 s"The text to analyse is larger than limit (${text.length} > ${cfg.maxLength})." + | ||||
|                   s" Analysing only first ${cfg.maxLength} characters." | ||||
|               ) *> text.take(cfg.maxLength).pure[F] | ||||
|  | ||||
|       private def stanfordNer(lang: Language, text: String): F[Vector[NerLabel]] = | ||||
|         Sync[F].delay { | ||||
|           StanfordNerClassifier.nerAnnotate(lang)(text) | ||||
|           private def stanfordNer(key: Ident, settings: StanfordSettings, text: String) | ||||
|               : F[Vector[NerLabel]] = | ||||
|             StanfordNerClassifier.nerAnnotate[F](key.id, cache)(settings, text) | ||||
|  | ||||
|           private def contactNer(text: String): F[Vector[NerLabel]] = | ||||
|             Sync[F].delay { | ||||
|               Contact.annotate(text) | ||||
|             } | ||||
|  | ||||
|           private def dateNer(lang: Language, text: String): F[Vector[NerDateLabel]] = | ||||
|             Sync[F].delay { | ||||
|               DateFind.findDates(text, lang).toVector | ||||
|             } | ||||
|         } | ||||
|  | ||||
|       private def contactNer(text: String): F[Vector[NerLabel]] = | ||||
|         Sync[F].delay { | ||||
|           Contact.annotate(text) | ||||
|         } | ||||
|  | ||||
|       private def dateNer(lang: Language, text: String): F[Vector[NerDateLabel]] = | ||||
|         Sync[F].delay { | ||||
|           DateFind.findDates(text, lang).toVector | ||||
|         } | ||||
|     }) | ||||
|       ) | ||||
|  | ||||
| } | ||||
|   | ||||
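The annotate entry point now takes the Stanford settings and a cache key instead of only a language; later in this commit the key is supplied from the collective id. A minimal usage sketch under that assumption (the effect is fixed to cats-effect IO; logger, config, collective id and text are placeholders):

    import cats.effect.IO
    import docspell.analysis.{TextAnalyser, TextAnalysisConfig}
    import docspell.analysis.nlp.StanfordSettings
    import docspell.common._

    def annotateForCollective(
        cfg: TextAnalysisConfig,
        logger: Logger[IO],
        collective: Ident,
        text: String
    ): IO[TextAnalyser.Result] =
      TextAnalyser.create[IO](cfg).use { analyser =>
        // language and flags come from the processing args in the real code
        val settings = StanfordSettings(Language.German, highRecall = false, regexNer = None)
        analyser.annotate(logger, settings, collective, text)
      }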
| @@ -0,0 +1,90 @@ | ||||
| package docspell.analysis.nlp | ||||
|  | ||||
| import cats.Applicative | ||||
| import cats.effect._ | ||||
| import cats.effect.concurrent.Ref | ||||
| import cats.implicits._ | ||||
|  | ||||
| import docspell.common._ | ||||
|  | ||||
| import edu.stanford.nlp.pipeline.StanfordCoreNLP | ||||
| import org.log4s.getLogger | ||||
|  | ||||
| /** Creating the StanfordCoreNLP pipeline is quite expensive as it | ||||
|   * involves IO and initializing large objects. | ||||
|   * | ||||
|   * Since the instances are thread-safe, they are cached and reused. | ||||
|   * | ||||
|   * **This is an internal API** | ||||
|   */ | ||||
| trait PipelineCache[F[_]] { | ||||
|  | ||||
|   def obtain(key: String, settings: StanfordSettings): F[StanfordCoreNLP] | ||||
|  | ||||
| } | ||||
|  | ||||
| object PipelineCache { | ||||
|   private[this] val logger = getLogger | ||||
|  | ||||
|   def none[F[_]: Applicative]: PipelineCache[F] = | ||||
|     new PipelineCache[F] { | ||||
|       def obtain(ignored: String, settings: StanfordSettings): F[StanfordCoreNLP] = | ||||
|         makeClassifier(settings).pure[F] | ||||
|     } | ||||
|  | ||||
|   def apply[F[_]: Sync](): F[PipelineCache[F]] = | ||||
|     Ref.of(Map.empty[String, Entry]).map(data => (new Impl[F](data): PipelineCache[F])) | ||||
|  | ||||
|   final private class Impl[F[_]: Sync](data: Ref[F, Map[String, Entry]]) | ||||
|       extends PipelineCache[F] { | ||||
|  | ||||
|     def obtain(key: String, settings: StanfordSettings): F[StanfordCoreNLP] = | ||||
|       for { | ||||
|         id  <- makeSettingsId(settings) | ||||
|         nlp <- data.modify(cache => getOrCreate(key, id, cache, settings)) | ||||
|       } yield nlp | ||||
|  | ||||
|     private def getOrCreate( | ||||
|         key: String, | ||||
|         id: String, | ||||
|         cache: Map[String, Entry], | ||||
|         settings: StanfordSettings | ||||
|     ): (Map[String, Entry], StanfordCoreNLP) = | ||||
|       cache.get(key) match { | ||||
|         case Some(entry) => | ||||
|           if (entry.id == id) (cache, entry.value) | ||||
|           else { | ||||
|             logger.info( | ||||
|               s"StanfordNLP settings changed for key $key. Creating new classifier" | ||||
|             ) | ||||
|             val nlp = makeClassifier(settings) | ||||
|             val e   = Entry(id, nlp) | ||||
|             (cache.updated(key, e), nlp) | ||||
|           } | ||||
|  | ||||
|         case None => | ||||
|           val nlp = makeClassifier(settings) | ||||
|           val e   = Entry(id, nlp) | ||||
|           (cache.updated(key, e), nlp) | ||||
|       } | ||||
|  | ||||
|     private def makeSettingsId(settings: StanfordSettings): F[String] = { | ||||
|       val base = settings.copy(regexNer = None).toString | ||||
|       val size: F[Long] = | ||||
|         settings.regexNer match { | ||||
|           case Some(p) => | ||||
|             File.size(p) | ||||
|           case None => | ||||
|             0L.pure[F] | ||||
|         } | ||||
|       size.map(len => s"$base-$len") | ||||
|     } | ||||
|  | ||||
|   } | ||||
|   private def makeClassifier(settings: StanfordSettings): StanfordCoreNLP = { | ||||
|     logger.info(s"Creating ${settings.lang.name} Stanford NLP NER classifier...") | ||||
|     new StanfordCoreNLP(Properties.forSettings(settings)) | ||||
|   } | ||||
|  | ||||
|   private case class Entry(id: String, value: StanfordCoreNLP) | ||||
| } | ||||
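As the scaladoc above notes, building a StanfordCoreNLP pipeline is expensive, so the cache keeps one entry per key and rebuilds it only when the derived settings id changes. A rough usage sketch of this internal API (the key string stands in for a collective id):

    import cats.effect.IO
    import docspell.analysis.nlp.{PipelineCache, StanfordSettings}
    import docspell.common.Language

    val program: IO[Unit] =
      for {
        cache <- PipelineCache[IO]()
        settings = StanfordSettings(Language.English, highRecall = false, regexNer = None)
        _ <- cache.obtain("collective-1", settings)  // first call creates the pipeline
        _ <- cache.obtain("collective-1", settings)  // same settings id: cached instance is reused
        _ <- cache.obtain("collective-1", settings.copy(highRecall = true)) // id changed: pipeline is rebuilt
      } yield ()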
| @@ -3,6 +3,7 @@ package docspell.analysis.nlp | ||||
| import java.util.{Properties => JProps} | ||||
|  | ||||
| import docspell.analysis.nlp.Properties.Implicits._ | ||||
| import docspell.common._ | ||||
|  | ||||
| object Properties { | ||||
|  | ||||
| @@ -13,6 +14,19 @@ object Properties { | ||||
|     p | ||||
|   } | ||||
|  | ||||
|   def forSettings(settings: StanfordSettings): JProps = { | ||||
|     val regexNerFile = settings.regexNer | ||||
|       .map(p => p.normalize().toAbsolutePath().toString()) | ||||
|     settings.lang match { | ||||
|       case Language.German => | ||||
|         Properties.nerGerman(regexNerFile, settings.highRecall) | ||||
|       case Language.English => | ||||
|         Properties.nerEnglish(regexNerFile) | ||||
|       case Language.French => | ||||
|         Properties.nerFrench(regexNerFile, settings.highRecall) | ||||
|     } | ||||
|   } | ||||
|  | ||||
|   def nerGerman(regexNerMappingFile: Option[String], highRecall: Boolean): JProps = | ||||
|     Properties( | ||||
|       "annotators"                  -> "tokenize,ssplit,mwt,pos,lemma,ner", | ||||
|   | ||||
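forSettings only dispatches on the language and passes the normalized regexner path through; a hedged sketch of the equivalent direct calls (the mapping file path is a placeholder):

    import docspell.analysis.nlp.Properties

    // same as Properties.forSettings(StanfordSettings(Language.German, highRecall = true, regexNer = None))
    val germanProps = Properties.nerGerman(None, true)

    // English ignores highRecall; a regexner mapping file is passed as an absolute path (hypothetical)
    val englishProps = Properties.nerEnglish(Some("/opt/docspell/ner-mappings.txt"))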
| @@ -1,45 +1,39 @@ | ||||
| package docspell.analysis.nlp | ||||
|  | ||||
| import java.util.{Properties => JProps} | ||||
|  | ||||
| import scala.jdk.CollectionConverters._ | ||||
|  | ||||
| import cats.Applicative | ||||
| import cats.implicits._ | ||||
|  | ||||
| import docspell.common._ | ||||
|  | ||||
| import edu.stanford.nlp.pipeline.{CoreDocument, StanfordCoreNLP} | ||||
| import org.log4s.getLogger | ||||
|  | ||||
| object StanfordNerClassifier { | ||||
|   private[this] val logger = getLogger | ||||
|  | ||||
|   lazy val germanNerClassifier  = makeClassifier(Language.German) | ||||
|   lazy val englishNerClassifier = makeClassifier(Language.English) | ||||
|   lazy val frenchNerClassifier  = makeClassifier(Language.French) | ||||
|   /** Runs named entity recognition on the given `text`. | ||||
|     * | ||||
|     * This uses the classifier pipeline from stanford-nlp, see | ||||
|     * https://nlp.stanford.edu/software/CRF-NER.html. Creating these | ||||
|     * classifiers is quite expensive, as it involves loading large | ||||
|     * model files. The classifiers are thread-safe, so they are cached. | ||||
|     * The `cacheKey` defines the "slot" where classifiers are stored | ||||
|     * and retrieved. If for a given `cacheKey` the `settings` change, | ||||
|     * a new classifier must be created. It will then replace the | ||||
|     * previous one. | ||||
|     */ | ||||
|   def nerAnnotate[F[_]: Applicative]( | ||||
|       cacheKey: String, | ||||
|       cache: PipelineCache[F] | ||||
|   )(settings: StanfordSettings, text: String): F[Vector[NerLabel]] = | ||||
|     cache | ||||
|       .obtain(cacheKey, settings) | ||||
|       .map(crf => runClassifier(crf, text)) | ||||
|  | ||||
|   def nerAnnotate(lang: Language)(text: String): Vector[NerLabel] = { | ||||
|     val nerClassifier = lang match { | ||||
|       case Language.English => englishNerClassifier | ||||
|       case Language.German  => germanNerClassifier | ||||
|       case Language.French  => frenchNerClassifier | ||||
|     } | ||||
|   def runClassifier(nerClassifier: StanfordCoreNLP, text: String): Vector[NerLabel] = { | ||||
|     val doc = new CoreDocument(text) | ||||
|     nerClassifier.annotate(doc) | ||||
|  | ||||
|     doc.tokens().asScala.collect(Function.unlift(LabelConverter.toNerLabel)).toVector | ||||
|   } | ||||
|  | ||||
|   private def makeClassifier(lang: Language): StanfordCoreNLP = { | ||||
|     logger.info(s"Creating ${lang.name} Stanford NLP NER classifier...") | ||||
|     new StanfordCoreNLP(classifierProperties(lang)) | ||||
|   } | ||||
|  | ||||
|   private def classifierProperties(lang: Language): JProps = | ||||
|     lang match { | ||||
|       case Language.German => | ||||
|         Properties.nerGerman(None, false) | ||||
|       case Language.English => | ||||
|         Properties.nerEnglish(None) | ||||
|       case Language.French => | ||||
|         Properties.nerFrench(None, false) | ||||
|     } | ||||
| } | ||||
|   | ||||
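The scaladoc explains the cache-slot semantics of `cacheKey`; a minimal sketch of calling the new nerAnnotate directly (key and text are placeholders):

    import cats.effect.IO
    import docspell.analysis.nlp.{PipelineCache, StanfordNerClassifier, StanfordSettings}
    import docspell.common._

    val labels: IO[Vector[NerLabel]] =
      PipelineCache[IO]().flatMap { cache =>
        val settings = StanfordSettings(Language.English, highRecall = false, regexNer = None)
        StanfordNerClassifier
          .nerAnnotate[IO]("some-collective", cache)(settings, "Acme Corp sent invoice 42 on 2020-08-01.")
      }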
| @@ -0,0 +1,22 @@ | ||||
| package docspell.analysis.nlp | ||||
|  | ||||
| import java.nio.file.Path | ||||
|  | ||||
| import docspell.common._ | ||||
|  | ||||
| /** Settings for configuring the stanford NER pipeline. | ||||
|   * | ||||
|   * The language is mandatory; only the provided languages are | ||||
|   * supported. The `highRecall` flag only applies to non-English | ||||
|   * languages: for these, the English classifier is run as a second | ||||
|   * classifier, and if `highRecall` is true its results are used to | ||||
|   * tag tokens left untagged by the first pass. This may produce | ||||
|   * many false positives, but since English terms are common in | ||||
|   * texts of other languages, whether this is useful depends on the | ||||
|   * use case. | ||||
|   * | ||||
|   * The `regexNer` option allows specifying a text file as described | ||||
|   * at https://nlp.stanford.edu/software/regexner.html. It is used as | ||||
|   * a last step to tag remaining untagged tokens with the provided | ||||
|   * list of regexps. | ||||
|   */ | ||||
| case class StanfordSettings(lang: Language, highRecall: Boolean, regexNer: Option[Path]) | ||||
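A sketch of constructing these settings, e.g. German with an additional regexner mapping file (the path is hypothetical):

    import java.nio.file.Paths
    import docspell.analysis.nlp.StanfordSettings
    import docspell.common.Language

    val settings = StanfordSettings(
      lang = Language.German,
      highRecall = false,                                           // don't let the extra English pass tag tokens
      regexNer = Some(Paths.get("/opt/docspell/ner-mappings.txt"))  // hypothetical mapping file
    )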
| @@ -55,6 +55,9 @@ object File { | ||||
|   def exists[F[_]: Sync](file: Path): F[Boolean] = | ||||
|     Sync[F].delay(Files.exists(file)) | ||||
|  | ||||
|   def size[F[_]: Sync](file: Path): F[Long] = | ||||
|     Sync[F].delay(Files.size(file)) | ||||
|  | ||||
|   def existsNonEmpty[F[_]: Sync](file: Path, minSize: Long = 0): F[Boolean] = | ||||
|     Sync[F].delay(Files.exists(file) && Files.size(file) > minSize) | ||||
|  | ||||
|   | ||||
| @@ -6,6 +6,7 @@ import cats.effect._ | ||||
| import cats.implicits._ | ||||
| import fs2.concurrent.SignallingRef | ||||
|  | ||||
| import docspell.analysis.TextAnalyser | ||||
| import docspell.backend.ops._ | ||||
| import docspell.common._ | ||||
| import docspell.ftsclient.FtsClient | ||||
| @@ -80,14 +81,15 @@ object JoexAppImpl { | ||||
|     for { | ||||
|       httpClient <- BlazeClientBuilder[F](clientEC).resource | ||||
|       client = JoexClient(httpClient) | ||||
|       store   <- Store.create(cfg.jdbc, connectEC, blocker) | ||||
|       queue   <- JobQueue(store) | ||||
|       pstore  <- PeriodicTaskStore.create(store) | ||||
|       nodeOps <- ONode(store) | ||||
|       joex    <- OJoex(client, store) | ||||
|       upload  <- OUpload(store, queue, cfg.files, joex) | ||||
|       fts     <- createFtsClient(cfg)(httpClient) | ||||
|       itemOps <- OItem(store, fts, queue, joex) | ||||
|       store    <- Store.create(cfg.jdbc, connectEC, blocker) | ||||
|       queue    <- JobQueue(store) | ||||
|       pstore   <- PeriodicTaskStore.create(store) | ||||
|       nodeOps  <- ONode(store) | ||||
|       joex     <- OJoex(client, store) | ||||
|       upload   <- OUpload(store, queue, cfg.files, joex) | ||||
|       fts      <- createFtsClient(cfg)(httpClient) | ||||
|       itemOps  <- OItem(store, fts, queue, joex) | ||||
|       analyser <- TextAnalyser.create[F](cfg.textAnalysis) | ||||
|       javaEmil = | ||||
|         JavaMailEmil(blocker, Settings.defaultSettings.copy(debug = cfg.mailDebug)) | ||||
|       sch <- SchedulerBuilder(cfg.scheduler, blocker, store) | ||||
| @@ -95,14 +97,14 @@ object JoexAppImpl { | ||||
|         .withTask( | ||||
|           JobTask.json( | ||||
|             ProcessItemArgs.taskName, | ||||
|             ItemHandler.newItem[F](cfg, itemOps, fts), | ||||
|             ItemHandler.newItem[F](cfg, itemOps, fts, analyser), | ||||
|             ItemHandler.onCancel[F] | ||||
|           ) | ||||
|         ) | ||||
|         .withTask( | ||||
|           JobTask.json( | ||||
|             ReProcessItemArgs.taskName, | ||||
|             ReProcessItem[F](cfg, fts), | ||||
|             ReProcessItem[F](cfg, fts, analyser), | ||||
|             ReProcessItem.onCancel[F] | ||||
|           ) | ||||
|         ) | ||||
|   | ||||
| @@ -5,6 +5,7 @@ import cats.effect._ | ||||
| import cats.implicits._ | ||||
| import fs2.Stream | ||||
|  | ||||
| import docspell.analysis.TextAnalyser | ||||
| import docspell.backend.ops.OItem | ||||
| import docspell.common.{ItemState, ProcessItemArgs} | ||||
| import docspell.ftsclient.FtsClient | ||||
| @@ -29,11 +30,12 @@ object ItemHandler { | ||||
|   def newItem[F[_]: ConcurrentEffect: ContextShift]( | ||||
|       cfg: Config, | ||||
|       itemOps: OItem[F], | ||||
|       fts: FtsClient[F] | ||||
|       fts: FtsClient[F], | ||||
|       analyser: TextAnalyser[F] | ||||
|   ): Task[F, Args, Unit] = | ||||
|     CreateItem[F] | ||||
|       .flatMap(itemStateTask(ItemState.Processing)) | ||||
|       .flatMap(safeProcess[F](cfg, itemOps, fts)) | ||||
|       .flatMap(safeProcess[F](cfg, itemOps, fts, analyser)) | ||||
|       .map(_ => ()) | ||||
|  | ||||
|   def itemStateTask[F[_]: Sync, A]( | ||||
| @@ -51,11 +53,12 @@ object ItemHandler { | ||||
|   def safeProcess[F[_]: ConcurrentEffect: ContextShift]( | ||||
|       cfg: Config, | ||||
|       itemOps: OItem[F], | ||||
|       fts: FtsClient[F] | ||||
|       fts: FtsClient[F], | ||||
|       analyser: TextAnalyser[F] | ||||
|   )(data: ItemData): Task[F, Args, ItemData] = | ||||
|     isLastRetry[F].flatMap { | ||||
|       case true => | ||||
|         ProcessItem[F](cfg, itemOps, fts)(data).attempt.flatMap({ | ||||
|         ProcessItem[F](cfg, itemOps, fts, analyser)(data).attempt.flatMap({ | ||||
|           case Right(d) => | ||||
|             Task.pure(d) | ||||
|           case Left(ex) => | ||||
| @@ -65,7 +68,8 @@ object ItemHandler { | ||||
|               .andThen(_ => Sync[F].raiseError(ex)) | ||||
|         }) | ||||
|       case false => | ||||
|         ProcessItem[F](cfg, itemOps, fts)(data).flatMap(itemStateTask(ItemState.Created)) | ||||
|         ProcessItem[F](cfg, itemOps, fts, analyser)(data) | ||||
|           .flatMap(itemStateTask(ItemState.Created)) | ||||
|     } | ||||
|  | ||||
|   private def markItemCreated[F[_]: Sync]: Task[F, Args, Boolean] = | ||||
|   | ||||
| @@ -2,6 +2,7 @@ package docspell.joex.process | ||||
|  | ||||
| import cats.effect._ | ||||
|  | ||||
| import docspell.analysis.TextAnalyser | ||||
| import docspell.backend.ops.OItem | ||||
| import docspell.common.ProcessItemArgs | ||||
| import docspell.ftsclient.FtsClient | ||||
| @@ -13,25 +14,28 @@ object ProcessItem { | ||||
|   def apply[F[_]: ConcurrentEffect: ContextShift]( | ||||
|       cfg: Config, | ||||
|       itemOps: OItem[F], | ||||
|       fts: FtsClient[F] | ||||
|       fts: FtsClient[F], | ||||
|       analyser: TextAnalyser[F] | ||||
|   )(item: ItemData): Task[F, ProcessItemArgs, ItemData] = | ||||
|     ExtractArchive(item) | ||||
|       .flatMap(Task.setProgress(20)) | ||||
|       .flatMap(processAttachments0(cfg, fts, (40, 60, 80))) | ||||
|       .flatMap(processAttachments0(cfg, fts, analyser, (40, 60, 80))) | ||||
|       .flatMap(LinkProposal[F]) | ||||
|       .flatMap(SetGivenData[F](itemOps)) | ||||
|       .flatMap(Task.setProgress(99)) | ||||
|  | ||||
|   def processAttachments[F[_]: ConcurrentEffect: ContextShift]( | ||||
|       cfg: Config, | ||||
|       fts: FtsClient[F] | ||||
|       fts: FtsClient[F], | ||||
|       analyser: TextAnalyser[F] | ||||
|   )(item: ItemData): Task[F, ProcessItemArgs, ItemData] = | ||||
|     processAttachments0[F](cfg, fts, (30, 60, 90))(item) | ||||
|     processAttachments0[F](cfg, fts, analyser, (30, 60, 90))(item) | ||||
|  | ||||
|   def analysisOnly[F[_]: Sync]( | ||||
|       cfg: Config | ||||
|       cfg: Config, | ||||
|       analyser: TextAnalyser[F] | ||||
|   )(item: ItemData): Task[F, ProcessItemArgs, ItemData] = | ||||
|     TextAnalysis[F](cfg.textAnalysis)(item) | ||||
|     TextAnalysis[F](analyser)(item) | ||||
|       .flatMap(FindProposal[F](cfg.processing)) | ||||
|       .flatMap(EvalProposals[F]) | ||||
|       .flatMap(SaveProposals[F]) | ||||
| @@ -39,12 +43,13 @@ object ProcessItem { | ||||
|   private def processAttachments0[F[_]: ConcurrentEffect: ContextShift]( | ||||
|       cfg: Config, | ||||
|       fts: FtsClient[F], | ||||
|       analyser: TextAnalyser[F], | ||||
|       progress: (Int, Int, Int) | ||||
|   )(item: ItemData): Task[F, ProcessItemArgs, ItemData] = | ||||
|     ConvertPdf(cfg.convert, item) | ||||
|       .flatMap(Task.setProgress(progress._1)) | ||||
|       .flatMap(TextExtraction(cfg.extraction, fts)) | ||||
|       .flatMap(Task.setProgress(progress._2)) | ||||
|       .flatMap(analysisOnly[F](cfg)) | ||||
|       .flatMap(analysisOnly[F](cfg, analyser)) | ||||
|       .flatMap(Task.setProgress(progress._3)) | ||||
| } | ||||
|   | ||||
| @@ -4,6 +4,7 @@ import cats.data.OptionT | ||||
| import cats.effect._ | ||||
| import cats.implicits._ | ||||
|  | ||||
| import docspell.analysis.TextAnalyser | ||||
| import docspell.common._ | ||||
| import docspell.ftsclient.FtsClient | ||||
| import docspell.joex.Config | ||||
| @@ -19,10 +20,11 @@ object ReProcessItem { | ||||
|  | ||||
|   def apply[F[_]: ConcurrentEffect: ContextShift]( | ||||
|       cfg: Config, | ||||
|       fts: FtsClient[F] | ||||
|       fts: FtsClient[F], | ||||
|       analyser: TextAnalyser[F] | ||||
|   ): Task[F, Args, Unit] = | ||||
|     loadItem[F] | ||||
|       .flatMap(safeProcess[F](cfg, fts)) | ||||
|       .flatMap(safeProcess[F](cfg, fts, analyser)) | ||||
|       .map(_ => ()) | ||||
|  | ||||
|   def onCancel[F[_]: Sync: ContextShift]: Task[F, Args, Unit] = | ||||
| @@ -70,6 +72,7 @@ object ReProcessItem { | ||||
|   def processFiles[F[_]: ConcurrentEffect: ContextShift]( | ||||
|       cfg: Config, | ||||
|       fts: FtsClient[F], | ||||
|       analyser: TextAnalyser[F], | ||||
|       data: ItemData | ||||
|   ): Task[F, Args, ItemData] = { | ||||
|  | ||||
| @@ -91,7 +94,7 @@ object ReProcessItem { | ||||
|  | ||||
|     getLanguage[F].flatMap { lang => | ||||
|       ProcessItem | ||||
|         .processAttachments[F](cfg, fts)(data) | ||||
|         .processAttachments[F](cfg, fts, analyser)(data) | ||||
|         .contramap[Args](convertArgs(lang)) | ||||
|     } | ||||
|   } | ||||
| @@ -109,11 +112,12 @@ object ReProcessItem { | ||||
|  | ||||
|   def safeProcess[F[_]: ConcurrentEffect: ContextShift]( | ||||
|       cfg: Config, | ||||
|       fts: FtsClient[F] | ||||
|       fts: FtsClient[F], | ||||
|       analyser: TextAnalyser[F] | ||||
|   )(data: ItemData): Task[F, Args, ItemData] = | ||||
|     isLastRetry[F].flatMap { | ||||
|       case true => | ||||
|         processFiles[F](cfg, fts, data).attempt | ||||
|         processFiles[F](cfg, fts, analyser, data).attempt | ||||
|           .flatMap({ | ||||
|             case Right(d) => | ||||
|               Task.pure(d) | ||||
| @@ -123,7 +127,7 @@ object ReProcessItem { | ||||
|               ).andThen(_ => Sync[F].raiseError(ex)) | ||||
|           }) | ||||
|       case false => | ||||
|         processFiles[F](cfg, fts, data) | ||||
|         processFiles[F](cfg, fts, analyser, data) | ||||
|     } | ||||
|  | ||||
|   private def logWarn[F[_]](msg: => String): Task[F, Args, Unit] = | ||||
|   | ||||
| @@ -1,9 +1,10 @@ | ||||
| package docspell.joex.process | ||||
|  | ||||
| import cats.effect.Sync | ||||
| import cats.effect._ | ||||
| import cats.implicits._ | ||||
|  | ||||
| import docspell.analysis.{TextAnalyser, TextAnalysisConfig} | ||||
| import docspell.analysis.TextAnalyser | ||||
| import docspell.analysis.nlp.StanfordSettings | ||||
| import docspell.common._ | ||||
| import docspell.joex.process.ItemData.AttachmentDates | ||||
| import docspell.joex.scheduler.Task | ||||
| @@ -12,36 +13,40 @@ import docspell.store.records.RAttachmentMeta | ||||
| object TextAnalysis { | ||||
|  | ||||
|   def apply[F[_]: Sync]( | ||||
|       cfg: TextAnalysisConfig | ||||
|       analyser: TextAnalyser[F] | ||||
|   )(item: ItemData): Task[F, ProcessItemArgs, ItemData] = | ||||
|     Task { ctx => | ||||
|       TextAnalyser.create[F](cfg).use { analyser => | ||||
|         for { | ||||
|           _ <- ctx.logger.info("Starting text analysis") | ||||
|           s <- Duration.stopTime[F] | ||||
|           t <- | ||||
|             item.metas.toList | ||||
|               .traverse( | ||||
|                 annotateAttachment[F](ctx.args.meta.language, ctx.logger, analyser) | ||||
|               ) | ||||
|           _ <- ctx.logger.debug(s"Storing tags: ${t.map(_._1.copy(content = None))}") | ||||
|           _ <- t.traverse(m => | ||||
|             ctx.store.transact(RAttachmentMeta.updateLabels(m._1.id, m._1.nerlabels)) | ||||
|           ) | ||||
|           e <- s | ||||
|           _ <- ctx.logger.info(s"Text-Analysis finished in ${e.formatExact}") | ||||
|           v = t.toVector | ||||
|         } yield item.copy(metas = v.map(_._1), dateLabels = v.map(_._2)) | ||||
|       } | ||||
|       for { | ||||
|         _ <- ctx.logger.info("Starting text analysis") | ||||
|         s <- Duration.stopTime[F] | ||||
|         t <- | ||||
|           item.metas.toList | ||||
|             .traverse( | ||||
|               annotateAttachment[F](ctx.args, ctx.logger, analyser) | ||||
|             ) | ||||
|         _ <- ctx.logger.debug(s"Storing tags: ${t.map(_._1.copy(content = None))}") | ||||
|         _ <- t.traverse(m => | ||||
|           ctx.store.transact(RAttachmentMeta.updateLabels(m._1.id, m._1.nerlabels)) | ||||
|         ) | ||||
|         e <- s | ||||
|         _ <- ctx.logger.info(s"Text-Analysis finished in ${e.formatExact}") | ||||
|         v = t.toVector | ||||
|       } yield item.copy(metas = v.map(_._1), dateLabels = v.map(_._2)) | ||||
|     } | ||||
|  | ||||
|   def annotateAttachment[F[_]: Sync]( | ||||
|       lang: Language, | ||||
|       args: ProcessItemArgs, | ||||
|       logger: Logger[F], | ||||
|       analyser: TextAnalyser[F] | ||||
|   )(rm: RAttachmentMeta): F[(RAttachmentMeta, AttachmentDates)] = | ||||
|   )(rm: RAttachmentMeta): F[(RAttachmentMeta, AttachmentDates)] = { | ||||
|     val settings = StanfordSettings(args.meta.language, false, None) | ||||
|     for { | ||||
|       labels <- analyser.annotate(logger, lang, rm.content.getOrElse("")) | ||||
|       labels <- analyser.annotate( | ||||
|         logger, | ||||
|         settings, | ||||
|         args.meta.collective, | ||||
|         rm.content.getOrElse("") | ||||
|       ) | ||||
|     } yield (rm.copy(nerlabels = labels.all.toList), AttachmentDates(rm, labels.dates)) | ||||
|  | ||||
|   } | ||||
| } | ||||
|   | ||||