From 8628a0a8b3d3eb2efc9a58a2f3c61b2fe5c1b190 Mon Sep 17 00:00:00 2001
From: Eike Kettner
Date: Mon, 24 Aug 2020 00:56:25 +0200
Subject: [PATCH] Allow configuring stanford-ner and cache based on collective

---
 .../docspell/analysis/TextAnalyser.scala      | 83 +++++++++--------
 .../docspell/analysis/nlp/PipelineCache.scala | 90 +++++++++++++++++++
 .../docspell/analysis/nlp/Properties.scala    | 14 +++
 .../analysis/nlp/StanfordNerClassifier.scala  | 50 +++++------
 .../analysis/nlp/StanfordSettings.scala       | 22 +++++
 .../src/main/scala/docspell/common/File.scala |  3 +
 .../scala/docspell/joex/JoexAppImpl.scala     | 22 ++---
 .../docspell/joex/process/ItemHandler.scala   | 14 +--
 .../docspell/joex/process/ProcessItem.scala   | 19 ++--
 .../docspell/joex/process/ReProcessItem.scala | 16 ++--
 .../docspell/joex/process/TextAnalysis.scala  | 55 ++++++------
 11 files changed, 271 insertions(+), 117 deletions(-)
 create mode 100644 modules/analysis/src/main/scala/docspell/analysis/nlp/PipelineCache.scala
 create mode 100644 modules/analysis/src/main/scala/docspell/analysis/nlp/StanfordSettings.scala

diff --git a/modules/analysis/src/main/scala/docspell/analysis/TextAnalyser.scala b/modules/analysis/src/main/scala/docspell/analysis/TextAnalyser.scala
index 443fd47d..75d07eef 100644
--- a/modules/analysis/src/main/scala/docspell/analysis/TextAnalyser.scala
+++ b/modules/analysis/src/main/scala/docspell/analysis/TextAnalyser.scala
@@ -5,12 +5,19 @@ import cats.implicits._
 
 import docspell.analysis.contact.Contact
 import docspell.analysis.date.DateFind
+import docspell.analysis.nlp.PipelineCache
 import docspell.analysis.nlp.StanfordNerClassifier
+import docspell.analysis.nlp.StanfordSettings
 import docspell.common._
 
 trait TextAnalyser[F[_]] {
 
-  def annotate(logger: Logger[F], lang: Language, text: String): F[TextAnalyser.Result]
+  def annotate(
+      logger: Logger[F],
+      settings: StanfordSettings,
+      cacheKey: Ident,
+      text: String
+  ): F[TextAnalyser.Result]
 
 }
 object TextAnalyser {
@@ -22,43 +29,47 @@ object TextAnalyser {
   }
 
   def create[F[_]: Sync](cfg: TextAnalysisConfig): Resource[F, TextAnalyser[F]] =
-    Resource.pure[F, TextAnalyser[F]](new TextAnalyser[F] {
-      def annotate(
-          logger: Logger[F],
-          lang: Language,
-          text: String
-      ): F[TextAnalyser.Result] =
-        for {
-          input <- textLimit(logger, text)
-          tags0 <- stanfordNer(lang, input)
-          tags1 <- contactNer(input)
-          dates <- dateNer(lang, input)
-          list = tags0 ++ tags1
-          spans = NerLabelSpan.build(list)
-        } yield Result(spans ++ list, dates)
+    Resource
+      .liftF(PipelineCache[F]())
+      .map(cache =>
+        new TextAnalyser[F] {
+          def annotate(
+              logger: Logger[F],
+              settings: StanfordSettings,
+              cacheKey: Ident,
+              text: String
+          ): F[TextAnalyser.Result] =
+            for {
+              input <- textLimit(logger, text)
+              tags0 <- stanfordNer(cacheKey, settings, input)
+              tags1 <- contactNer(input)
+              dates <- dateNer(settings.lang, input)
+              list = tags0 ++ tags1
+              spans = NerLabelSpan.build(list)
+            } yield Result(spans ++ list, dates)
 
-      private def textLimit(logger: Logger[F], text: String): F[String] =
-        if (text.length <= cfg.maxLength) text.pure[F]
-        else
-          logger.info(
-            s"The text to analyse is larger than limit (${text.length} > ${cfg.maxLength})." +
-              s" Analysing only first ${cfg.maxLength} characters."
-          ) *> text.take(cfg.maxLength).pure[F]
+          private def textLimit(logger: Logger[F], text: String): F[String] =
+            if (text.length <= cfg.maxLength) text.pure[F]
+            else
+              logger.info(
+                s"The text to analyse is larger than limit (${text.length} > ${cfg.maxLength})." +
+                  s" Analysing only first ${cfg.maxLength} characters."
+              ) *> text.take(cfg.maxLength).pure[F]
 
-      private def stanfordNer(lang: Language, text: String): F[Vector[NerLabel]] =
-        Sync[F].delay {
-          StanfordNerClassifier.nerAnnotate(lang)(text)
+          private def stanfordNer(key: Ident, settings: StanfordSettings, text: String)
+              : F[Vector[NerLabel]] =
+            StanfordNerClassifier.nerAnnotate[F](key.id, cache)(settings, text)
+
+          private def contactNer(text: String): F[Vector[NerLabel]] =
+            Sync[F].delay {
+              Contact.annotate(text)
+            }
+
+          private def dateNer(lang: Language, text: String): F[Vector[NerDateLabel]] =
+            Sync[F].delay {
+              DateFind.findDates(text, lang).toVector
+            }
         }
-
-      private def contactNer(text: String): F[Vector[NerLabel]] =
-        Sync[F].delay {
-          Contact.annotate(text)
-        }
-
-      private def dateNer(lang: Language, text: String): F[Vector[NerDateLabel]] =
-        Sync[F].delay {
-          DateFind.findDates(text, lang).toVector
-        }
-    })
+      )
 }
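
The signature change above is the whole API surface of this patch: callers no longer pass a bare Language but a full StanfordSettings value plus a cache key. A minimal sketch of a call site under the new API (imports elided; `collective` stands for the identifier that the processing code at the end of this patch uses as the key, and `annotateSketch` is an illustrative name, not part of the patch):

  def annotateSketch[F[_]: Sync](
      analyser: TextAnalyser[F],
      logger: Logger[F],
      collective: Ident,
      text: String
  ): F[TextAnalyser.Result] = {
    // language here is only an example value
    val settings = StanfordSettings(Language.German, highRecall = false, regexNer = None)
    analyser.annotate(logger, settings, collective, text)
  }
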
diff --git a/modules/analysis/src/main/scala/docspell/analysis/nlp/PipelineCache.scala b/modules/analysis/src/main/scala/docspell/analysis/nlp/PipelineCache.scala
new file mode 100644
index 00000000..9787563f
--- /dev/null
+++ b/modules/analysis/src/main/scala/docspell/analysis/nlp/PipelineCache.scala
@@ -0,0 +1,90 @@
+package docspell.analysis.nlp
+
+import cats.Applicative
+import cats.effect._
+import cats.effect.concurrent.Ref
+import cats.implicits._
+
+import docspell.common._
+
+import edu.stanford.nlp.pipeline.StanfordCoreNLP
+import org.log4s.getLogger
+
+/** Creating the StanfordCoreNLP pipeline is quite expensive as it
+  * involves IO and initializing large objects.
+  *
+  * Therefore, the instances are cached and shared; this is
+  * possible because they are thread-safe.
+  *
+  * **This is an internal API**
+  */
+trait PipelineCache[F[_]] {
+
+  def obtain(key: String, settings: StanfordSettings): F[StanfordCoreNLP]
+
+}
+
+object PipelineCache {
+  private[this] val logger = getLogger
+
+  def none[F[_]: Applicative]: PipelineCache[F] =
+    new PipelineCache[F] {
+      def obtain(ignored: String, settings: StanfordSettings): F[StanfordCoreNLP] =
+        makeClassifier(settings).pure[F]
+    }
+
+  def apply[F[_]: Sync](): F[PipelineCache[F]] =
+    Ref.of(Map.empty[String, Entry]).map(data => (new Impl[F](data): PipelineCache[F]))
+
+  final private class Impl[F[_]: Sync](data: Ref[F, Map[String, Entry]])
+      extends PipelineCache[F] {
+
+    def obtain(key: String, settings: StanfordSettings): F[StanfordCoreNLP] =
+      for {
+        id  <- makeSettingsId(settings)
+        nlp <- data.modify(cache => getOrCreate(key, id, cache, settings))
+      } yield nlp
+
+    private def getOrCreate(
+        key: String,
+        id: String,
+        cache: Map[String, Entry],
+        settings: StanfordSettings
+    ): (Map[String, Entry], StanfordCoreNLP) =
+      cache.get(key) match {
+        case Some(entry) =>
+          if (entry.id == id) (cache, entry.value)
+          else {
+            logger.info(
+              s"StanfordNLP settings changed for key $key. Creating new classifier"
+            )
+            val nlp = makeClassifier(settings)
+            val e   = Entry(id, nlp)
+            (cache.updated(key, e), nlp)
+          }
+
+        case None =>
+          val nlp = makeClassifier(settings)
+          val e   = Entry(id, nlp)
+          (cache.updated(key, e), nlp)
+      }
+
+    private def makeSettingsId(settings: StanfordSettings): F[String] = {
+      val base = settings.copy(regexNer = None).toString
+      val size: F[Long] =
+        settings.regexNer match {
+          case Some(p) =>
+            File.size(p)
+          case None =>
+            0L.pure[F]
+        }
+      size.map(len => s"$base-$len")
+    }
+
+  }
+  private def makeClassifier(settings: StanfordSettings): StanfordCoreNLP = {
+    logger.info(s"Creating ${settings.lang.name} Stanford NLP NER classifier...")
+    new StanfordCoreNLP(Properties.forSettings(settings))
+  }
+
+  private case class Entry(id: String, value: StanfordCoreNLP)
+}
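
PipelineCache boils down to an atomically updated map inside a Ref, plus an id derived from the settings (their toString, and the size of the regexNer file) so that changed settings evict the old classifier. The technique, reduced to a generic sketch (the names KeyedCache and mkValue are illustrative, not part of the patch):

  import cats.effect.concurrent.Ref

  final class KeyedCache[F[_], V](data: Ref[F, Map[String, V]], mkValue: String => V) {
    def obtain(key: String): F[V] =
      data.modify { m =>
        m.get(key) match {
          case Some(v) => (m, v)             // hit: map unchanged
          case None =>
            val v = mkValue(key)             // miss: build and store
            (m.updated(key, v), v)
        }
      }
  }

One caveat worth knowing: Ref.modify is a compare-and-set loop, so under contention the passed function can run more than once and a freshly built classifier may be discarded. The patch accepts this occasional double creation rather than serializing access with a semaphore.
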
diff --git a/modules/analysis/src/main/scala/docspell/analysis/nlp/Properties.scala b/modules/analysis/src/main/scala/docspell/analysis/nlp/Properties.scala
index 75ee7040..314f04fb 100644
--- a/modules/analysis/src/main/scala/docspell/analysis/nlp/Properties.scala
+++ b/modules/analysis/src/main/scala/docspell/analysis/nlp/Properties.scala
@@ -3,6 +3,7 @@ package docspell.analysis.nlp
 import java.util.{Properties => JProps}
 
 import docspell.analysis.nlp.Properties.Implicits._
+import docspell.common._
 
 object Properties {
 
@@ -13,6 +14,19 @@ object Properties {
     p
   }
 
+  def forSettings(settings: StanfordSettings): JProps = {
+    val regexNerFile = settings.regexNer
+      .map(p => p.normalize().toAbsolutePath().toString())
+    settings.lang match {
+      case Language.German =>
+        Properties.nerGerman(regexNerFile, settings.highRecall)
+      case Language.English =>
+        Properties.nerEnglish(regexNerFile)
+      case Language.French =>
+        Properties.nerFrench(regexNerFile, settings.highRecall)
+    }
+  }
+
   def nerGerman(regexNerMappingFile: Option[String], highRecall: Boolean): JProps =
     Properties(
       "annotators" -> "tokenize,ssplit,mwt,pos,lemma,ner",
diff --git a/modules/analysis/src/main/scala/docspell/analysis/nlp/StanfordNerClassifier.scala b/modules/analysis/src/main/scala/docspell/analysis/nlp/StanfordNerClassifier.scala
index 32c165f5..424396e5 100644
--- a/modules/analysis/src/main/scala/docspell/analysis/nlp/StanfordNerClassifier.scala
+++ b/modules/analysis/src/main/scala/docspell/analysis/nlp/StanfordNerClassifier.scala
@@ -1,45 +1,39 @@
 package docspell.analysis.nlp
 
-import java.util.{Properties => JProps}
-
 import scala.jdk.CollectionConverters._
 
+import cats.Applicative
+import cats.implicits._
+
 import docspell.common._
 
 import edu.stanford.nlp.pipeline.{CoreDocument, StanfordCoreNLP}
-import org.log4s.getLogger
 
 object StanfordNerClassifier {
-  private[this] val logger = getLogger
 
-  lazy val germanNerClassifier  = makeClassifier(Language.German)
-  lazy val englishNerClassifier = makeClassifier(Language.English)
-  lazy val frenchNerClassifier  = makeClassifier(Language.French)
+  /** Runs named entity recognition on the given `text`.
+    *
+    * This uses the classifier pipeline from stanford-nlp, see
+    * https://nlp.stanford.edu/software/CRF-NER.html. Creating these
+    * classifiers is quite expensive, as it involves loading large
+    * model files. The classifiers are thread-safe and can therefore
+    * be cached and shared. The `cacheKey` defines the "slot" where
+    * classifiers are stored and retrieved. If the `settings` change
+    * for a given `cacheKey`, a new classifier must be created; it
+    * then replaces the previous one.
+    */
+  def nerAnnotate[F[_]: Applicative](
+      cacheKey: String,
+      cache: PipelineCache[F]
+  )(settings: StanfordSettings, text: String): F[Vector[NerLabel]] =
+    cache
+      .obtain(cacheKey, settings)
+      .map(crf => runClassifier(crf, text))
 
-  def nerAnnotate(lang: Language)(text: String): Vector[NerLabel] = {
-    val nerClassifier = lang match {
-      case Language.English => englishNerClassifier
-      case Language.German  => germanNerClassifier
-      case Language.French  => frenchNerClassifier
-    }
+  def runClassifier(nerClassifier: StanfordCoreNLP, text: String): Vector[NerLabel] = {
     val doc = new CoreDocument(text)
     nerClassifier.annotate(doc)
-
     doc.tokens().asScala.collect(Function.unlift(LabelConverter.toNerLabel)).toVector
   }
 
-  private def makeClassifier(lang: Language): StanfordCoreNLP = {
-    logger.info(s"Creating ${lang.name} Stanford NLP NER classifier...")
-    new StanfordCoreNLP(classifierProperties(lang))
-  }
-
-  private def classifierProperties(lang: Language): JProps =
-    lang match {
-      case Language.German =>
-        Properties.nerGerman(None, false)
-      case Language.English =>
-        Properties.nerEnglish(None)
-      case Language.French =>
-        Properties.nerFrench(None, false)
-    }
 }
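
Since nerAnnotate is now effect-polymorphic, it can also be exercised without the shared cache: PipelineCache.none builds a fresh pipeline on every call, which is mainly useful in tests. A sketch, assuming cats-effect IO and with the remaining imports elided:

  import cats.effect.IO

  def labelsFor(settings: StanfordSettings, text: String): IO[Vector[NerLabel]] =
    StanfordNerClassifier.nerAnnotate[IO]("test-key", PipelineCache.none[IO])(settings, text)

In the production path the cache created in TextAnalyser.create is shared across calls, so the expensive model loading happens once per key.
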
diff --git a/modules/analysis/src/main/scala/docspell/analysis/nlp/StanfordSettings.scala b/modules/analysis/src/main/scala/docspell/analysis/nlp/StanfordSettings.scala
new file mode 100644
index 00000000..c2f6f98c
--- /dev/null
+++ b/modules/analysis/src/main/scala/docspell/analysis/nlp/StanfordSettings.scala
@@ -0,0 +1,22 @@
+package docspell.analysis.nlp
+
+import java.nio.file.Path
+
+import docspell.common._
+
+/** Settings for configuring the stanford NER pipeline.
+  *
+  * The language is mandatory; only the provided ones are supported.
+  * The `highRecall` option only applies to non-English languages.
+  * For non-English languages, the English classifier is run as a
+  * second classifier and, if `highRecall` is true, it is used to
+  * tag untagged tokens. This may lead to a lot of false positives,
+  * but since English is omnipresent in other languages, too,
+  * whether this is useful depends on the use case.
+  *
+  * The `regexNer` setting allows specifying a text file as described
+  * here: https://nlp.stanford.edu/software/regexner.html. This will
+  * be used as a last step to tag untagged tokens using the provided
+  * list of regexps.
+  */
+case class StanfordSettings(lang: Language, highRecall: Boolean, regexNer: Option[Path])
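
Together with PipelineCache.makeSettingsId, this type determines when a collective gets a new classifier: the id is the settings' toString (with regexNer blanked out) combined with the size of the regexNer file. For example (illustrative values):

  val base       = StanfordSettings(Language.German, highRecall = false, regexNer = None)
  val highRecall = base.copy(highRecall = true)   // different id => new classifier

Note that regexNer changes are detected only via file size, a cheap heuristic; an edit that keeps the file size identical would not trigger a rebuild.
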
diff --git a/modules/common/src/main/scala/docspell/common/File.scala b/modules/common/src/main/scala/docspell/common/File.scala
index e9596fa8..0efc552a 100644
--- a/modules/common/src/main/scala/docspell/common/File.scala
+++ b/modules/common/src/main/scala/docspell/common/File.scala
@@ -55,6 +55,9 @@ object File {
   def exists[F[_]: Sync](file: Path): F[Boolean] =
     Sync[F].delay(Files.exists(file))
 
+  def size[F[_]: Sync](file: Path): F[Long] =
+    Sync[F].delay(Files.size(file))
+
   def existsNonEmpty[F[_]: Sync](file: Path, minSize: Long = 0): F[Boolean] =
     Sync[F].delay(Files.exists(file) && Files.size(file) > minSize)
 
diff --git a/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala b/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala
index bc415446..dcea79df 100644
--- a/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala
+++ b/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala
@@ -6,6 +6,7 @@ import cats.effect._
 import cats.implicits._
 import fs2.concurrent.SignallingRef
 
+import docspell.analysis.TextAnalyser
 import docspell.backend.ops._
 import docspell.common._
 import docspell.ftsclient.FtsClient
@@ -80,14 +81,15 @@ object JoexAppImpl {
     for {
       httpClient <- BlazeClientBuilder[F](clientEC).resource
       client = JoexClient(httpClient)
-      store   <- Store.create(cfg.jdbc, connectEC, blocker)
-      queue   <- JobQueue(store)
-      pstore  <- PeriodicTaskStore.create(store)
-      nodeOps <- ONode(store)
-      joex    <- OJoex(client, store)
-      upload  <- OUpload(store, queue, cfg.files, joex)
-      fts     <- createFtsClient(cfg)(httpClient)
-      itemOps <- OItem(store, fts, queue, joex)
+      store    <- Store.create(cfg.jdbc, connectEC, blocker)
+      queue    <- JobQueue(store)
+      pstore   <- PeriodicTaskStore.create(store)
+      nodeOps  <- ONode(store)
+      joex     <- OJoex(client, store)
+      upload   <- OUpload(store, queue, cfg.files, joex)
+      fts      <- createFtsClient(cfg)(httpClient)
+      itemOps  <- OItem(store, fts, queue, joex)
+      analyser <- TextAnalyser.create[F](cfg.textAnalysis)
       javaEmil =
         JavaMailEmil(blocker, Settings.defaultSettings.copy(debug = cfg.mailDebug))
       sch <- SchedulerBuilder(cfg.scheduler, blocker, store)
@@ -95,14 +97,14 @@ object JoexAppImpl {
         .withTask(
           JobTask.json(
             ProcessItemArgs.taskName,
-            ItemHandler.newItem[F](cfg, itemOps, fts),
+            ItemHandler.newItem[F](cfg, itemOps, fts, analyser),
             ItemHandler.onCancel[F]
           )
         )
         .withTask(
           JobTask.json(
             ReProcessItemArgs.taskName,
-            ReProcessItem[F](cfg, fts),
+            ReProcessItem[F](cfg, fts, analyser),
             ReProcessItem.onCancel[F]
           )
        )
diff --git a/modules/joex/src/main/scala/docspell/joex/process/ItemHandler.scala b/modules/joex/src/main/scala/docspell/joex/process/ItemHandler.scala
index 4da8f779..240e7f54 100644
--- a/modules/joex/src/main/scala/docspell/joex/process/ItemHandler.scala
+++ b/modules/joex/src/main/scala/docspell/joex/process/ItemHandler.scala
@@ -5,6 +5,7 @@ import cats.effect._
 import cats.implicits._
 import fs2.Stream
 
+import docspell.analysis.TextAnalyser
 import docspell.backend.ops.OItem
 import docspell.common.{ItemState, ProcessItemArgs}
 import docspell.ftsclient.FtsClient
@@ -29,11 +30,12 @@ object ItemHandler {
   def newItem[F[_]: ConcurrentEffect: ContextShift](
       cfg: Config,
       itemOps: OItem[F],
-      fts: FtsClient[F]
+      fts: FtsClient[F],
+      analyser: TextAnalyser[F]
   ): Task[F, Args, Unit] =
     CreateItem[F]
       .flatMap(itemStateTask(ItemState.Processing))
-      .flatMap(safeProcess[F](cfg, itemOps, fts))
+      .flatMap(safeProcess[F](cfg, itemOps, fts, analyser))
       .map(_ => ())
 
   def itemStateTask[F[_]: Sync, A](
@@ -51,11 +53,12 @@ object ItemHandler {
   def safeProcess[F[_]: ConcurrentEffect: ContextShift](
       cfg: Config,
       itemOps: OItem[F],
-      fts: FtsClient[F]
+      fts: FtsClient[F],
+      analyser: TextAnalyser[F]
   )(data: ItemData): Task[F, Args, ItemData] =
     isLastRetry[F].flatMap {
       case true =>
-        ProcessItem[F](cfg, itemOps, fts)(data).attempt.flatMap({
+        ProcessItem[F](cfg, itemOps, fts, analyser)(data).attempt.flatMap({
           case Right(d) =>
             Task.pure(d)
           case Left(ex) =>
@@ -65,7 +68,8 @@ object ItemHandler {
             .andThen(_ => Sync[F].raiseError(ex))
         })
       case false =>
-        ProcessItem[F](cfg, itemOps, fts)(data).flatMap(itemStateTask(ItemState.Created))
+        ProcessItem[F](cfg, itemOps, fts, analyser)(data)
+          .flatMap(itemStateTask(ItemState.Created))
     }
 
   private def markItemCreated[F[_]: Sync]: Task[F, Args, Boolean] =
diff --git a/modules/joex/src/main/scala/docspell/joex/process/ProcessItem.scala b/modules/joex/src/main/scala/docspell/joex/process/ProcessItem.scala
index 9b4d050f..cd76e095 100644
--- a/modules/joex/src/main/scala/docspell/joex/process/ProcessItem.scala
+++ b/modules/joex/src/main/scala/docspell/joex/process/ProcessItem.scala
@@ -2,6 +2,7 @@ package docspell.joex.process
 
 import cats.effect._
 
+import docspell.analysis.TextAnalyser
 import docspell.backend.ops.OItem
 import docspell.common.ProcessItemArgs
 import docspell.ftsclient.FtsClient
@@ -13,25 +14,28 @@ object ProcessItem {
   def apply[F[_]: ConcurrentEffect: ContextShift](
       cfg: Config,
       itemOps: OItem[F],
-      fts: FtsClient[F]
+      fts: FtsClient[F],
+      analyser: TextAnalyser[F]
   )(item: ItemData): Task[F, ProcessItemArgs, ItemData] =
     ExtractArchive(item)
       .flatMap(Task.setProgress(20))
-      .flatMap(processAttachments0(cfg, fts, (40, 60, 80)))
+      .flatMap(processAttachments0(cfg, fts, analyser, (40, 60, 80)))
       .flatMap(LinkProposal[F])
       .flatMap(SetGivenData[F](itemOps))
       .flatMap(Task.setProgress(99))
 
   def processAttachments[F[_]: ConcurrentEffect: ContextShift](
       cfg: Config,
-      fts: FtsClient[F]
+      fts: FtsClient[F],
+      analyser: TextAnalyser[F]
   )(item: ItemData): Task[F, ProcessItemArgs, ItemData] =
-    processAttachments0[F](cfg, fts, (30, 60, 90))(item)
+    processAttachments0[F](cfg, fts, analyser, (30, 60, 90))(item)
 
   def analysisOnly[F[_]: Sync](
-      cfg: Config
+      cfg: Config,
+      analyser: TextAnalyser[F]
   )(item: ItemData): Task[F, ProcessItemArgs, ItemData] =
-    TextAnalysis[F](cfg.textAnalysis)(item)
+    TextAnalysis[F](analyser)(item)
       .flatMap(FindProposal[F](cfg.processing))
       .flatMap(EvalProposals[F])
       .flatMap(SaveProposals[F])
@@ -39,12 +43,13 @@ object ProcessItem {
   private def processAttachments0[F[_]: ConcurrentEffect: ContextShift](
       cfg: Config,
       fts: FtsClient[F],
+      analyser: TextAnalyser[F],
       progress: (Int, Int, Int)
   )(item: ItemData): Task[F, ProcessItemArgs, ItemData] =
     ConvertPdf(cfg.convert, item)
       .flatMap(Task.setProgress(progress._1))
       .flatMap(TextExtraction(cfg.extraction, fts))
       .flatMap(Task.setProgress(progress._2))
-      .flatMap(analysisOnly[F](cfg))
+      .flatMap(analysisOnly[F](cfg, analyser))
       .flatMap(Task.setProgress(progress._3))
 }
diff --git a/modules/joex/src/main/scala/docspell/joex/process/ReProcessItem.scala b/modules/joex/src/main/scala/docspell/joex/process/ReProcessItem.scala
index 8f5e11f2..53282539 100644
--- a/modules/joex/src/main/scala/docspell/joex/process/ReProcessItem.scala
+++ b/modules/joex/src/main/scala/docspell/joex/process/ReProcessItem.scala
@@ -4,6 +4,7 @@ import cats.data.OptionT
 import cats.effect._
 import cats.implicits._
 
+import docspell.analysis.TextAnalyser
 import docspell.common._
 import docspell.ftsclient.FtsClient
 import docspell.joex.Config
@@ -19,10 +20,11 @@ object ReProcessItem {
 
   def apply[F[_]: ConcurrentEffect: ContextShift](
       cfg: Config,
-      fts: FtsClient[F]
+      fts: FtsClient[F],
+      analyser: TextAnalyser[F]
   ): Task[F, Args, Unit] =
     loadItem[F]
-      .flatMap(safeProcess[F](cfg, fts))
+      .flatMap(safeProcess[F](cfg, fts, analyser))
       .map(_ => ())
 
   def onCancel[F[_]: Sync: ContextShift]: Task[F, Args, Unit] =
@@ -70,6 +72,7 @@ object ReProcessItem {
   def processFiles[F[_]: ConcurrentEffect: ContextShift](
       cfg: Config,
       fts: FtsClient[F],
+      analyser: TextAnalyser[F],
       data: ItemData
   ): Task[F, Args, ItemData] = {
 
@@ -91,7 +94,7 @@ object ReProcessItem {
 
     getLanguage[F].flatMap { lang =>
       ProcessItem
-        .processAttachments[F](cfg, fts)(data)
+        .processAttachments[F](cfg, fts, analyser)(data)
         .contramap[Args](convertArgs(lang))
     }
   }
@@ -109,11 +112,12 @@ object ReProcessItem {
 
   def safeProcess[F[_]: ConcurrentEffect: ContextShift](
       cfg: Config,
-      fts: FtsClient[F]
+      fts: FtsClient[F],
+      analyser: TextAnalyser[F]
   )(data: ItemData): Task[F, Args, ItemData] =
     isLastRetry[F].flatMap {
       case true =>
-        processFiles[F](cfg, fts, data).attempt
+        processFiles[F](cfg, fts, analyser, data).attempt
          .flatMap({
            case Right(d) =>
              Task.pure(d)
@@ -123,7 +127,7 @@ object ReProcessItem {
            ).andThen(_ => Sync[F].raiseError(ex))
          })
       case false =>
-        processFiles[F](cfg, fts, data)
+        processFiles[F](cfg, fts, analyser, data)
     }
 
   private def logWarn[F[_]](msg: => String): Task[F, Args, Unit] =
diff --git a/modules/joex/src/main/scala/docspell/joex/process/TextAnalysis.scala b/modules/joex/src/main/scala/docspell/joex/process/TextAnalysis.scala
index 5e31e2d9..625738ef 100644
--- a/modules/joex/src/main/scala/docspell/joex/process/TextAnalysis.scala
+++ b/modules/joex/src/main/scala/docspell/joex/process/TextAnalysis.scala
@@ -1,9 +1,10 @@
 package docspell.joex.process
 
-import cats.effect.Sync
+import cats.effect._
 import cats.implicits._
 
-import docspell.analysis.{TextAnalyser, TextAnalysisConfig}
+import docspell.analysis.TextAnalyser
+import docspell.analysis.nlp.StanfordSettings
 import docspell.common._
 import docspell.joex.process.ItemData.AttachmentDates
 import docspell.joex.scheduler.Task
@@ -12,36 +13,40 @@ import docspell.store.records.RAttachmentMeta
 object TextAnalysis {
 
   def apply[F[_]: Sync](
-      cfg: TextAnalysisConfig
+      analyser: TextAnalyser[F]
   )(item: ItemData): Task[F, ProcessItemArgs, ItemData] =
     Task { ctx =>
-      TextAnalyser.create[F](cfg).use { analyser =>
-        for {
-          _ <- ctx.logger.info("Starting text analysis")
-          s <- Duration.stopTime[F]
-          t <-
-            item.metas.toList
-              .traverse(
-                annotateAttachment[F](ctx.args.meta.language, ctx.logger, analyser)
-              )
-          _ <- ctx.logger.debug(s"Storing tags: ${t.map(_._1.copy(content = None))}")
-          _ <- t.traverse(m =>
-            ctx.store.transact(RAttachmentMeta.updateLabels(m._1.id, m._1.nerlabels))
-          )
-          e <- s
-          _ <- ctx.logger.info(s"Text-Analysis finished in ${e.formatExact}")
-          v = t.toVector
-        } yield item.copy(metas = v.map(_._1), dateLabels = v.map(_._2))
-      }
+      for {
+        _ <- ctx.logger.info("Starting text analysis")
+        s <- Duration.stopTime[F]
+        t <-
+          item.metas.toList
+            .traverse(
+              annotateAttachment[F](ctx.args, ctx.logger, analyser)
+            )
+        _ <- ctx.logger.debug(s"Storing tags: ${t.map(_._1.copy(content = None))}")
+        _ <- t.traverse(m =>
+          ctx.store.transact(RAttachmentMeta.updateLabels(m._1.id, m._1.nerlabels))
+        )
+        e <- s
+        _ <- ctx.logger.info(s"Text-Analysis finished in ${e.formatExact}")
+        v = t.toVector
+      } yield item.copy(metas = v.map(_._1), dateLabels = v.map(_._2))
     }
 
   def annotateAttachment[F[_]: Sync](
-      lang: Language,
+      args: ProcessItemArgs,
       logger: Logger[F],
       analyser: TextAnalyser[F]
-  )(rm: RAttachmentMeta): F[(RAttachmentMeta, AttachmentDates)] =
+  )(rm: RAttachmentMeta): F[(RAttachmentMeta, AttachmentDates)] = {
+    val settings = StanfordSettings(args.meta.language, false, None)
     for {
-      labels <- analyser.annotate(logger, lang, rm.content.getOrElse(""))
+      labels <- analyser.annotate(
+        logger,
+        settings,
+        args.meta.collective,
+        rm.content.getOrElse("")
+      )
     } yield (rm.copy(nerlabels = labels.all.toList), AttachmentDates(rm, labels.dates))
-
+  }
 }
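
With this last change the cache key is wired through: every attachment of a collective is annotated by a classifier cached under that collective's id, and the analyser now lives for the whole scheduler instead of being created per task. The settings themselves are still fixed at the call site (highRecall = false, no regexNer file); a per-collective lookup, which the commit title points toward, could later replace the constant. A hypothetical sketch of that follow-up (settingsFor is not part of this patch):

  def settingsFor(args: ProcessItemArgs): StanfordSettings =
    StanfordSettings(args.meta.language, highRecall = false, regexNer = None)

Swapping in values loaded per collective would then invalidate the cached classifier automatically, because PipelineCache compares the settings id on every obtain.
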