Use collective data with NER annotation

Eike Kettner
2020-08-24 23:25:57 +02:00
parent de5b33c40d
commit 3473cbb773
12 changed files with 413 additions and 76 deletions

View File

@@ -248,6 +248,29 @@ docspell.joex {
# should suffice. Default is 10000, which is about 2-3 pages
# (just a rough guess, of course).
max-length = 10000
# A working directory for the analyser to store temporary/working
# files.
working-dir = ${java.io.tmpdir}"/docspell-analysis"
regex-ner {
# Whether to enable custom NER annotation. This uses the address
# book of a collective as input for NER tagging (to automatically
# find correspondent and concerned entities). If the address book
# is large, this can be quite memory intensive and also makes text
# analysis slower. But it greatly improves accuracy. If this is
# false, NER tagging uses only statistical models (which also work
# quite well).
#
# This setting might be moved to the collective settings in the
# future.
enabled = true
# The NER annotation uses a file of patterns that is derived from
# a collective's address book. This setting controls how long that
# file is kept before checking the address book for changes.
file-cache-time = "1 minute"
}
}
# Configuration for converting files into PDFs.
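
Note: for illustration, a hedged sketch of the pattern rows the derived NER file contains. The tab-separated layout (pattern, tag, override tags, weight) follows the NerFile.Pattern.toRow helper added later in this commit; the entries "Acme AG" and "Jane Doe" are made-up address book names.

// Hypothetical rows as NerFile.mkNerConfig would emit them for an address
// book with organization "Acme AG" (weight 3) and person "Jane Doe" (weight 2).
// Each name yields the whole phrase first, then its single tokens:
val exampleNerRows: String =
  List(
    "(?i)acme (?i)ag\tORGANIZATION\tLOCATION,PERSON,MISC\t3",
    "(?i)acme\tORGANIZATION\tLOCATION,PERSON,MISC\t3",
    "(?i)ag\tORGANIZATION\tLOCATION,PERSON,MISC\t3",
    "(?i)jane (?i)doe\tPERSON\tLOCATION,MISC\t2",
    "(?i)jane\tPERSON\tLOCATION,MISC\t2",
    "(?i)doe\tPERSON\tLOCATION,MISC\t2"
  ).mkString("\n")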

View File

@@ -1,11 +1,14 @@
package docspell.joex
import java.nio.file.Path
import docspell.analysis.TextAnalysisConfig
import docspell.backend.Config.Files
import docspell.common._
import docspell.convert.ConvertConfig
import docspell.extract.ExtractConfig
import docspell.ftssolr.SolrConfig
import docspell.joex.analysis.RegexNerFile
import docspell.joex.hk.HouseKeepingConfig
import docspell.joex.scheduler.{PeriodicSchedulerConfig, SchedulerConfig}
import docspell.store.JdbcConfig
@@ -20,7 +23,7 @@ case class Config(
userTasks: Config.UserTasks,
houseKeeping: HouseKeepingConfig,
extraction: ExtractConfig,
textAnalysis: TextAnalysisConfig,
textAnalysis: Config.TextAnalysis,
convert: ConvertConfig,
sendMail: MailSendConfig,
files: Files,
@@ -50,4 +53,19 @@ object Config {
}
case class Processing(maxDueDateYears: Int)
case class TextAnalysis(
maxLength: Int,
workingDir: Path,
regexNer: RegexNer
) {
def textAnalysisConfig: TextAnalysisConfig =
TextAnalysisConfig(maxLength)
def regexNerFileConfig: RegexNerFile.Config =
RegexNerFile.Config(regexNer.enabled, workingDir, regexNer.fileCacheTime)
}
case class RegexNer(enabled: Boolean, fileCacheTime: Duration)
}
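
Note: a hedged, hand-built value mirroring the defaults from the config hunk at the top of this commit; Duration.minutes is assumed to be the docspell.common factory corresponding to HOCON's "1 minute".

import java.nio.file.Paths
import docspell.common.Duration
import docspell.joex.Config

// Sketch of the default values, constructed by hand rather than loaded:
val exampleDefaults = Config.TextAnalysis(
  maxLength = 10000,
  workingDir = Paths.get(sys.props("java.io.tmpdir"), "docspell-analysis"),
  regexNer = Config.RegexNer(enabled = true, fileCacheTime = Duration.minutes(1))
)
// exampleDefaults.regexNerFileConfig then carries enabled, workingDir and
// fileCacheTime over to RegexNerFile.Config(enabled, directory, minTime).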

View File

@@ -11,6 +11,7 @@ import docspell.backend.ops._
import docspell.common._
import docspell.ftsclient.FtsClient
import docspell.ftssolr.SolrFtsClient
import docspell.joex.analysis.RegexNerFile
import docspell.joex.fts.{MigrationTask, ReIndexTask}
import docspell.joex.hk._
import docspell.joex.notify._
@@ -89,7 +90,8 @@ object JoexAppImpl {
upload <- OUpload(store, queue, cfg.files, joex)
fts <- createFtsClient(cfg)(httpClient)
itemOps <- OItem(store, fts, queue, joex)
analyser <- TextAnalyser.create[F](cfg.textAnalysis)
analyser <- TextAnalyser.create[F](cfg.textAnalysis.textAnalysisConfig)
regexNer <- RegexNerFile(cfg.textAnalysis.regexNerFileConfig, blocker, store)
javaEmil =
JavaMailEmil(blocker, Settings.defaultSettings.copy(debug = cfg.mailDebug))
sch <- SchedulerBuilder(cfg.scheduler, blocker, store)
@@ -97,14 +99,14 @@ object JoexAppImpl {
.withTask(
JobTask.json(
ProcessItemArgs.taskName,
ItemHandler.newItem[F](cfg, itemOps, fts, analyser),
ItemHandler.newItem[F](cfg, itemOps, fts, analyser, regexNer),
ItemHandler.onCancel[F]
)
)
.withTask(
JobTask.json(
ReProcessItemArgs.taskName,
ReProcessItem[F](cfg, fts, analyser),
ReProcessItem[F](cfg, fts, analyser, regexNer),
ReProcessItem.onCancel[F]
)
)

View File

@@ -0,0 +1,99 @@
package docspell.joex.analysis
import java.nio.file.Path
import cats.effect._
import cats.implicits._
import docspell.analysis.split.TextSplitter
import docspell.common._
import docspell.store.queries.QCollective
import io.circe.generic.semiauto._
import io.circe.{Decoder, Encoder}
case class NerFile(collective: Ident, updated: Timestamp, creation: Timestamp) {
def nerFilePath(directory: Path): Path =
NerFile.nerFilePath(directory, collective)
def jsonFilePath(directory: Path): Path =
NerFile.jsonFilePath(directory, collective)
}
object NerFile {
implicit val jsonDecoder: Decoder[NerFile] =
deriveDecoder[NerFile]
implicit val jsonEncoder: Encoder[NerFile] =
deriveEncoder[NerFile]
private def nerFilePath(directory: Path, collective: Ident): Path =
directory.resolve(s"${collective.id}.txt")
private def jsonFilePath(directory: Path, collective: Ident): Path =
directory.resolve(s"${collective.id}.json")
def find[F[_]: Sync: ContextShift](
collective: Ident,
directory: Path,
blocker: Blocker
): F[Option[NerFile]] = {
val file = jsonFilePath(directory, collective)
File.existsNonEmpty[F](file).flatMap {
case true =>
File
.readJson[F, NerFile](file, blocker)
.map(_.some)
case false =>
(None: Option[NerFile]).pure[F]
}
}
def mkNerConfig(names: QCollective.Names): String = {
val orgs = names.org
.flatMap(Pattern(3))
.distinct
.map(_.toRow("ORGANIZATION", "LOCATION,PERSON,MISC"))
val pers =
names.pers
.flatMap(Pattern(2))
.distinct
.map(_.toRow("PERSON", "LOCATION,MISC"))
val equips =
names.equip
.flatMap(Pattern(1))
.distinct
.map(_.toRow("MISC", "LOCATION"))
(orgs ++ pers ++ equips).mkString("\n")
}
case class Pattern(value: String, weight: Int) {
def toRow(tag: String, overrideTags: String): String =
s"$value\t$tag\t$overrideTags\t$weight"
}
object Pattern {
def apply(weight: Int)(str: String): Vector[Pattern] = {
val delims = " \t\n\r".toSet
val words =
TextSplitter
.split(str, delims)
.map(_.toLower.value.trim)
.filter(_.nonEmpty)
.toVector
.map(w => s"(?i)${w}")
val tokens =
TextSplitter
.splitToken(str, delims)
.map(_.toLower.value.trim)
.filter(_.nonEmpty)
.toVector
.take(3)
.map(w => s"(?i)${w}")
tokens.map(t => Pattern(t, weight)).prepended(Pattern(words.mkString(" "), weight))
}
}
}
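
Note: a quick, hypothetical illustration of Pattern.apply above; the organization name is invented and the exact token stream depends on TextSplitter.

import docspell.joex.analysis.NerFile.Pattern

// The whole phrase comes first (prepended), then up to three single tokens:
val rows = Pattern(3)("Acme Holding")
  .map(_.toRow("ORGANIZATION", "LOCATION,PERSON,MISC"))
// (?i)acme (?i)holding   ORGANIZATION   LOCATION,PERSON,MISC   3
// (?i)acme               ORGANIZATION   LOCATION,PERSON,MISC   3
// (?i)holding            ORGANIZATION   LOCATION,PERSON,MISC   3
// (columns are tab-separated in the real output)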

View File

@@ -0,0 +1,164 @@
package docspell.joex.analysis
import java.nio.file.Path
import cats.effect._
import cats.effect.concurrent.Semaphore
import cats.implicits._
import docspell.common._
import docspell.common.syntax.all._
import docspell.store.Store
import docspell.store.queries.QCollective
import docspell.store.records.REquipment
import docspell.store.records.ROrganization
import docspell.store.records.RPerson
import io.circe.syntax._
import org.log4s.getLogger
/** Maintains a custom regex-ner file per collective for Stanford's
  * regexner annotator.
  */
trait RegexNerFile[F[_]] {
def makeFile(collective: Ident): F[Option[Path]]
}
object RegexNerFile {
private[this] val logger = getLogger
case class Config(enabled: Boolean, directory: Path, minTime: Duration)
def apply[F[_]: Concurrent: ContextShift](
cfg: Config,
blocker: Blocker,
store: Store[F]
): Resource[F, RegexNerFile[F]] =
for {
dir <- File.withTempDir[F](cfg.directory, "regexner-")
writer <- Resource.liftF(Semaphore(1))
} yield new Impl[F](cfg.copy(directory = dir), blocker, store, writer)
final private class Impl[F[_]: Concurrent: ContextShift](
cfg: Config,
blocker: Blocker,
store: Store[F],
writer: Semaphore[F] //TODO allow parallelism per collective
) extends RegexNerFile[F] {
def makeFile(collective: Ident): F[Option[Path]] =
if (cfg.enabled) doMakeFile(collective)
else (None: Option[Path]).pure[F]
def doMakeFile(collective: Ident): F[Option[Path]] =
for {
now <- Timestamp.current[F]
existing <- NerFile.find[F](collective, cfg.directory, blocker)
result <- existing match {
case Some(nf) =>
val dur = Duration.between(nf.creation, now)
if (dur > cfg.minTime)
logger.fdebug(
s"Cache time elapsed (${dur} > ${cfg.minTime}). Check for new state."
) *> updateFile(
collective,
now,
Some(nf)
)
else nf.nerFilePath(cfg.directory).some.pure[F]
case None =>
updateFile(collective, now, None)
}
} yield result
private def updateFile(
collective: Ident,
now: Timestamp,
current: Option[NerFile]
): F[Option[Path]] =
for {
lastUpdate <- store.transact(Sql.latestUpdate(collective))
result <- lastUpdate match {
case None =>
(None: Option[Path]).pure[F]
case Some(lup) =>
current match {
case Some(cur) =>
val nerf =
if (cur.updated == lup)
logger.fdebug(s"No state change detected.") *> updateTimestamp(
cur,
now
) *> cur.pure[F]
else
logger.fdebug(
s"There have been state changes for collective '${collective.id}'. Reload NER file."
) *> createFile(lup, collective, now)
nerf.map(_.nerFilePath(cfg.directory).some)
case None =>
createFile(lup, collective, now)
.map(_.nerFilePath(cfg.directory).some)
}
}
} yield result
private def updateTimestamp(nf: NerFile, now: Timestamp): F[Unit] =
writer.withPermit(for {
file <- Sync[F].pure(nf.jsonFilePath(cfg.directory))
_ <- File.mkDir(file.getParent)
_ <- File.writeString(file, nf.copy(creation = now).asJson.spaces2)
} yield ())
private def createFile(
lastUpdate: Timestamp,
collective: Ident,
now: Timestamp
): F[NerFile] = {
def update(nf: NerFile, text: String): F[Unit] =
writer.withPermit(for {
jsonFile <- Sync[F].pure(nf.jsonFilePath(cfg.directory))
_ <- logger.fdebug(s"Writing custom NER file for collective '${collective.id}'")
_ <- File.mkDir(jsonFile.getParent)
_ <- File.writeString(nf.nerFilePath(cfg.directory), text)
_ <- File.writeString(jsonFile, nf.asJson.spaces2)
} yield ())
for {
_ <- logger.finfo(s"Generating custom NER file for collective '${collective.id}'")
names <- store.transact(QCollective.allNames(collective))
nerFile = NerFile(collective, lastUpdate, now)
_ <- update(nerFile, NerFile.mkNerConfig(names))
} yield nerFile
}
}
object Sql {
import doobie._
import doobie.implicits._
import docspell.store.impl.Implicits._
import docspell.store.impl.Column
def latestUpdate(collective: Ident): ConnectionIO[Option[Timestamp]] = {
def max(col: Column, table: Fragment, cidCol: Column): Fragment =
selectSimple(col.max ++ fr"as t", table, cidCol.is(collective))
val sql =
List(
max(
ROrganization.Columns.updated,
ROrganization.table,
ROrganization.Columns.cid
),
max(RPerson.Columns.updated, RPerson.table, RPerson.Columns.cid),
max(REquipment.Columns.updated, REquipment.table, REquipment.Columns.cid)
)
.reduce(_ ++ fr"UNION ALL" ++ _)
selectSimple(fr"MAX(t)", fr"(" ++ sql ++ fr") as x", Fragment.empty)
.query[Timestamp]
.option
}
}
}
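
Note: illustrative only, the rough shape of the query Sql.latestUpdate builds; the table and column names here are guesses, the real fragments come from ROrganization, RPerson and REquipment.

// Sketch of the effective SQL: the newest `updated` timestamp across the
// three address book tables of one collective.
val latestUpdateSqlSketch =
  """SELECT MAX(t) FROM (
    |  SELECT MAX(updated) AS t FROM organization WHERE cid = ?
    |  UNION ALL
    |  SELECT MAX(updated) AS t FROM person WHERE cid = ?
    |  UNION ALL
    |  SELECT MAX(updated) AS t FROM equipment WHERE cid = ?
    |) AS x""".stripMargin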

View File

@@ -10,6 +10,7 @@ import docspell.backend.ops.OItem
import docspell.common.{ItemState, ProcessItemArgs}
import docspell.ftsclient.FtsClient
import docspell.joex.Config
import docspell.joex.analysis.RegexNerFile
import docspell.joex.scheduler.Task
import docspell.store.queries.QItem
import docspell.store.records.RItem
@@ -31,11 +32,12 @@ object ItemHandler {
cfg: Config,
itemOps: OItem[F],
fts: FtsClient[F],
analyser: TextAnalyser[F]
analyser: TextAnalyser[F],
regexNer: RegexNerFile[F]
): Task[F, Args, Unit] =
CreateItem[F]
.flatMap(itemStateTask(ItemState.Processing))
.flatMap(safeProcess[F](cfg, itemOps, fts, analyser))
.flatMap(safeProcess[F](cfg, itemOps, fts, analyser, regexNer))
.map(_ => ())
def itemStateTask[F[_]: Sync, A](
@@ -54,11 +56,12 @@ object ItemHandler {
cfg: Config,
itemOps: OItem[F],
fts: FtsClient[F],
analyser: TextAnalyser[F]
analyser: TextAnalyser[F],
regexNer: RegexNerFile[F]
)(data: ItemData): Task[F, Args, ItemData] =
isLastRetry[F].flatMap {
case true =>
ProcessItem[F](cfg, itemOps, fts, analyser)(data).attempt.flatMap({
ProcessItem[F](cfg, itemOps, fts, analyser, regexNer)(data).attempt.flatMap({
case Right(d) =>
Task.pure(d)
case Left(ex) =>
@@ -68,7 +71,7 @@ object ItemHandler {
.andThen(_ => Sync[F].raiseError(ex))
})
case false =>
ProcessItem[F](cfg, itemOps, fts, analyser)(data)
ProcessItem[F](cfg, itemOps, fts, analyser, regexNer)(data)
.flatMap(itemStateTask(ItemState.Created))
}

View File

@@ -7,6 +7,7 @@ import docspell.backend.ops.OItem
import docspell.common.ProcessItemArgs
import docspell.ftsclient.FtsClient
import docspell.joex.Config
import docspell.joex.analysis.RegexNerFile
import docspell.joex.scheduler.Task
object ProcessItem {
@@ -15,11 +16,12 @@ object ProcessItem {
cfg: Config,
itemOps: OItem[F],
fts: FtsClient[F],
analyser: TextAnalyser[F]
analyser: TextAnalyser[F],
regexNer: RegexNerFile[F]
)(item: ItemData): Task[F, ProcessItemArgs, ItemData] =
ExtractArchive(item)
.flatMap(Task.setProgress(20))
.flatMap(processAttachments0(cfg, fts, analyser, (40, 60, 80)))
.flatMap(processAttachments0(cfg, fts, analyser, regexNer, (40, 60, 80)))
.flatMap(LinkProposal[F])
.flatMap(SetGivenData[F](itemOps))
.flatMap(Task.setProgress(99))
@@ -27,15 +29,17 @@ object ProcessItem {
def processAttachments[F[_]: ConcurrentEffect: ContextShift](
cfg: Config,
fts: FtsClient[F],
analyser: TextAnalyser[F]
analyser: TextAnalyser[F],
regexNer: RegexNerFile[F]
)(item: ItemData): Task[F, ProcessItemArgs, ItemData] =
processAttachments0[F](cfg, fts, analyser, (30, 60, 90))(item)
processAttachments0[F](cfg, fts, analyser, regexNer, (30, 60, 90))(item)
def analysisOnly[F[_]: Sync](
cfg: Config,
analyser: TextAnalyser[F]
analyser: TextAnalyser[F],
regexNer: RegexNerFile[F]
)(item: ItemData): Task[F, ProcessItemArgs, ItemData] =
TextAnalysis[F](analyser)(item)
TextAnalysis[F](analyser, regexNer)(item)
.flatMap(FindProposal[F](cfg.processing))
.flatMap(EvalProposals[F])
.flatMap(SaveProposals[F])
@@ -44,12 +48,13 @@ object ProcessItem {
cfg: Config,
fts: FtsClient[F],
analyser: TextAnalyser[F],
regexNer: RegexNerFile[F],
progress: (Int, Int, Int)
)(item: ItemData): Task[F, ProcessItemArgs, ItemData] =
ConvertPdf(cfg.convert, item)
.flatMap(Task.setProgress(progress._1))
.flatMap(TextExtraction(cfg.extraction, fts))
.flatMap(Task.setProgress(progress._2))
.flatMap(analysisOnly[F](cfg, analyser))
.flatMap(analysisOnly[F](cfg, analyser, regexNer))
.flatMap(Task.setProgress(progress._3))
}

View File

@@ -8,6 +8,7 @@ import docspell.analysis.TextAnalyser
import docspell.common._
import docspell.ftsclient.FtsClient
import docspell.joex.Config
import docspell.joex.analysis.RegexNerFile
import docspell.joex.scheduler.Context
import docspell.joex.scheduler.Task
import docspell.store.records.RAttachment
@@ -21,10 +22,11 @@ object ReProcessItem {
def apply[F[_]: ConcurrentEffect: ContextShift](
cfg: Config,
fts: FtsClient[F],
analyser: TextAnalyser[F]
analyser: TextAnalyser[F],
regexNer: RegexNerFile[F]
): Task[F, Args, Unit] =
loadItem[F]
.flatMap(safeProcess[F](cfg, fts, analyser))
.flatMap(safeProcess[F](cfg, fts, analyser, regexNer))
.map(_ => ())
def onCancel[F[_]: Sync: ContextShift]: Task[F, Args, Unit] =
@@ -73,6 +75,7 @@ object ReProcessItem {
cfg: Config,
fts: FtsClient[F],
analyser: TextAnalyser[F],
regexNer: RegexNerFile[F],
data: ItemData
): Task[F, Args, ItemData] = {
@@ -94,7 +97,7 @@ object ReProcessItem {
getLanguage[F].flatMap { lang =>
ProcessItem
.processAttachments[F](cfg, fts, analyser)(data)
.processAttachments[F](cfg, fts, analyser, regexNer)(data)
.contramap[Args](convertArgs(lang))
}
}
@@ -113,11 +116,12 @@ object ReProcessItem {
def safeProcess[F[_]: ConcurrentEffect: ContextShift](
cfg: Config,
fts: FtsClient[F],
analyser: TextAnalyser[F]
analyser: TextAnalyser[F],
regexNer: RegexNerFile[F]
)(data: ItemData): Task[F, Args, ItemData] =
isLastRetry[F].flatMap {
case true =>
processFiles[F](cfg, fts, analyser, data).attempt
processFiles[F](cfg, fts, analyser, regexNer, data).attempt
.flatMap({
case Right(d) =>
Task.pure(d)
@@ -127,7 +131,7 @@ object ReProcessItem {
).andThen(_ => Sync[F].raiseError(ex))
})
case false =>
processFiles[F](cfg, fts, analyser, data)
processFiles[F](cfg, fts, analyser, regexNer, data)
}
private def logWarn[F[_]](msg: => String): Task[F, Args, Unit] =

View File

@@ -1,24 +1,22 @@
package docspell.joex.process
import java.nio.file.Paths
import cats.effect._
import cats.implicits._
import docspell.analysis.TextAnalyser
import docspell.analysis.nlp.StanfordSettings
import docspell.analysis.split.TextSplitter
import docspell.common._
import docspell.joex.analysis.RegexNerFile
import docspell.joex.process.ItemData.AttachmentDates
import docspell.joex.scheduler.Context
import docspell.joex.scheduler.Task
import docspell.store.queries.QCollective
import docspell.store.records.RAttachmentMeta
object TextAnalysis {
def apply[F[_]: Sync](
analyser: TextAnalyser[F]
analyser: TextAnalyser[F],
nerFile: RegexNerFile[F]
)(item: ItemData): Task[F, ProcessItemArgs, ItemData] =
Task { ctx =>
for {
@@ -27,7 +25,7 @@ object TextAnalysis {
t <-
item.metas.toList
.traverse(
annotateAttachment[F](ctx, analyser)
annotateAttachment[F](ctx, analyser, nerFile)
)
_ <- ctx.logger.debug(s"Storing tags: ${t.map(_._1.copy(content = None))}")
_ <- t.traverse(m =>
@@ -41,63 +39,19 @@ object TextAnalysis {
def annotateAttachment[F[_]: Sync](
ctx: Context[F, ProcessItemArgs],
analyser: TextAnalyser[F]
analyser: TextAnalyser[F],
nerFile: RegexNerFile[F]
)(rm: RAttachmentMeta): F[(RAttachmentMeta, AttachmentDates)] = {
val settings = StanfordSettings(ctx.args.meta.language, false, None)
for {
names <- ctx.store.transact(QCollective.allNames(ctx.args.meta.collective))
temp <- File.mkTempFile(Paths.get("."), "textanalysis")
_ <- File.writeString(temp, mkNerConfig(names))
sett = settings.copy(regexNer = Some(temp))
customNer <- nerFile.makeFile(ctx.args.meta.collective)
sett = settings.copy(regexNer = customNer)
labels <- analyser.annotate(
ctx.logger,
sett,
ctx.args.meta.collective,
rm.content.getOrElse("")
)
_ <- File.deleteFile(temp)
} yield (rm.copy(nerlabels = labels.all.toList), AttachmentDates(rm, labels.dates))
}
def mkNerConfig(names: QCollective.Names): String = {
val orgs = names.org
.flatMap(Pattern(3))
.distinct
.map(_.toRow("ORGANIZATION", "LOCATION,PERSON,MISC"))
val pers =
names.pers
.flatMap(Pattern(2))
.distinct
.map(_.toRow("PERSON", "LOCATION,MISC"))
val equips =
names.equip
.flatMap(Pattern(1))
.distinct
.map(_.toRow("MISC", "LOCATION"))
(orgs ++ pers ++ equips).mkString("\n")
}
case class Pattern(value: String, weight: Int) {
def toRow(tag: String, overrideTags: String): String =
s"$value\t$tag\t$overrideTags\t$weight"
}
object Pattern {
def apply(weight: Int)(str: String): Vector[Pattern] = {
val delims = " \t\n\r".toSet
val words =
TextSplitter.split(str, delims).toVector.map(w => s"(?i)${w.toLower.value}")
val tokens =
TextSplitter
.splitToken(str, delims)
.toVector
.take(3)
.map(w => s"(?i)${w.toLower.value}")
tokens.map(t => Pattern(t, weight)).prepended(Pattern(words.mkString(" "), weight))
}
}
}
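
Note: a minimal sketch of the new wiring, assuming a Sync effect and the identifiers from the hunk above; the body is simplified. The per-collective file now comes from the RegexNerFile cache instead of being rebuilt from the address book for every attachment.

import cats.effect.Sync
import cats.implicits._
import docspell.analysis.nlp.StanfordSettings
import docspell.common.Ident
import docspell.joex.analysis.RegexNerFile

// Hypothetical helper, not part of the commit:
def withCustomNer[F[_]: Sync](
    nerFile: RegexNerFile[F],
    collective: Ident,
    settings: StanfordSettings
): F[StanfordSettings] =
  nerFile
    .makeFile(collective) // None when regex-ner is disabled
    .map(optFile => settings.copy(regexNer = optFile))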