Learn classifiers for item entities

Learns classifiers for concerned and correspondent entities. This can
be used as an alternative to or after nlp.
This commit is contained in:
Eike Kettner 2021-01-19 20:54:47 +01:00
parent a6f29153c4
commit 99dcaae66b
5 changed files with 284 additions and 51 deletions

View File

@ -1,6 +1,5 @@
package docspell.joex.learn
import cats.data.Kleisli
import cats.data.OptionT
import cats.effect._
import cats.implicits._
@ -24,6 +23,29 @@ object LearnClassifierTask {
def apply[F[_]: Sync: ContextShift](
cfg: Config.TextAnalysis,
analyser: TextAnalyser[F]
): Task[F, Args, Unit] =
learnTags(cfg, analyser)
.flatMap(_ => learnItemEntities(cfg, analyser))
private def learnItemEntities[F[_]: Sync: ContextShift](
cfg: Config.TextAnalysis,
analyser: TextAnalyser[F]
): Task[F, Args, Unit] =
Task { ctx =>
if (cfg.classification.enabled)
LearnItemEntities
.learnAll(
analyser,
ctx.args.collective,
cfg.classification.itemCount
)
.run(ctx)
else ().pure[F]
}
private def learnTags[F[_]: Sync: ContextShift](
cfg: Config.TextAnalysis,
analyser: TextAnalyser[F]
): Task[F, Args, Unit] =
Task { ctx =>
val learnTags =
@ -31,10 +53,11 @@ object LearnClassifierTask {
sett <- findActiveSettings[F](ctx, cfg)
maxItems = math.min(cfg.classification.itemCount, sett.itemCount)
_ <- OptionT.liftF(
learnAllTagCategories(analyser)(ctx.args.collective, maxItems).run(ctx)
LearnTags
.learnAllTagCategories(analyser)(ctx.args.collective, maxItems)
.run(ctx)
)
} yield ()
// learn classifier models from active tag categories
learnTags.getOrElseF(logInactiveWarning(ctx.logger)) *>
// delete classifier model files for categories that have been removed
@ -45,39 +68,6 @@ object LearnClassifierTask {
.map(_ => ())
}
def learnTagCategory[F[_]: Sync: ContextShift, A](
analyser: TextAnalyser[F],
collective: Ident,
maxItems: Int
)(
category: String
): Task[F, A, Unit] =
Task { ctx =>
val data = SelectItems.forCategory(ctx, collective)(maxItems, category)
ctx.logger.info(s"Learn classifier for tag category: $category") *>
analyser.classifier.trainClassifier(ctx.logger, data)(
Kleisli(
StoreClassifierModel.handleModel(
ctx,
collective,
ClassifierName.tagCategory(category)
)
)
)
}
def learnAllTagCategories[F[_]: Sync: ContextShift, A](analyser: TextAnalyser[F])(
collective: Ident,
maxItems: Int
): Task[F, A, Unit] =
Task { ctx =>
for {
cats <- ctx.store.transact(RClassifierSetting.getActiveCategories(collective))
task = learnTagCategory[F, A](analyser, collective, maxItems) _
_ <- cats.map(task).traverse(_.run(ctx))
} yield ()
}
private def clearObsoleteTagModels[F[_]: Sync](ctx: Context[F, Args]): F[Unit] =
for {
list <- ctx.store.transact(

View File

@ -0,0 +1,74 @@
package docspell.joex.learn
import cats.data.Kleisli
import cats.effect._
import cats.implicits._
import fs2.Stream
import docspell.analysis.TextAnalyser
import docspell.analysis.classifier.TextClassifier.Data
import docspell.common._
import docspell.joex.scheduler._
object LearnItemEntities {
def learnAll[F[_]: Sync: ContextShift, A](
analyser: TextAnalyser[F],
collective: Ident,
maxItems: Int
): Task[F, A, Unit] =
learnCorrOrg(analyser, collective, maxItems)
.flatMap(_ => learnCorrPerson[F, A](analyser, collective, maxItems))
.flatMap(_ => learnConcPerson(analyser, collective, maxItems))
.flatMap(_ => learnConcEquip(analyser, collective, maxItems))
def learnCorrOrg[F[_]: Sync: ContextShift, A](
analyser: TextAnalyser[F],
collective: Ident,
maxItems: Int
): Task[F, A, Unit] =
learn(analyser, collective)(
ClassifierName.correspondentOrg,
ctx => SelectItems.forCorrOrg(ctx.store, collective, maxItems)
)
def learnCorrPerson[F[_]: Sync: ContextShift, A](
analyser: TextAnalyser[F],
collective: Ident,
maxItems: Int
): Task[F, A, Unit] =
learn(analyser, collective)(
ClassifierName.correspondentPerson,
ctx => SelectItems.forCorrPerson(ctx.store, collective, maxItems)
)
def learnConcPerson[F[_]: Sync: ContextShift, A](
analyser: TextAnalyser[F],
collective: Ident,
maxItems: Int
): Task[F, A, Unit] =
learn(analyser, collective)(
ClassifierName.concernedPerson,
ctx => SelectItems.forConcPerson(ctx.store, collective, maxItems)
)
def learnConcEquip[F[_]: Sync: ContextShift, A](
analyser: TextAnalyser[F],
collective: Ident,
maxItems: Int
): Task[F, A, Unit] =
learn(analyser, collective)(
ClassifierName.concernedEquip,
ctx => SelectItems.forConcEquip(ctx.store, collective, maxItems)
)
private def learn[F[_]: Sync: ContextShift, A](
analyser: TextAnalyser[F],
collective: Ident
)(cname: ClassifierName, data: Context[F, _] => Stream[F, Data]): Task[F, A, Unit] =
Task { ctx =>
ctx.logger.info(s"Learn classifier ${cname.name}") *>
analyser.classifier.trainClassifier(ctx.logger, data(ctx))(
Kleisli(StoreClassifierModel.handleModel(ctx, collective, cname))
)
}
}

View File

@ -0,0 +1,46 @@
package docspell.joex.learn
import cats.data.Kleisli
import cats.effect._
import cats.implicits._
import docspell.analysis.TextAnalyser
import docspell.common._
import docspell.joex.scheduler._
import docspell.store.records.RClassifierSetting
object LearnTags {
def learnTagCategory[F[_]: Sync: ContextShift, A](
analyser: TextAnalyser[F],
collective: Ident,
maxItems: Int
)(
category: String
): Task[F, A, Unit] =
Task { ctx =>
val data = SelectItems.forCategory(ctx, collective)(maxItems, category)
ctx.logger.info(s"Learn classifier for tag category: $category") *>
analyser.classifier.trainClassifier(ctx.logger, data)(
Kleisli(
StoreClassifierModel.handleModel(
ctx,
collective,
ClassifierName.tagCategory(category)
)
)
)
}
def learnAllTagCategories[F[_]: Sync: ContextShift, A](analyser: TextAnalyser[F])(
collective: Ident,
maxItems: Int
): Task[F, A, Unit] =
Task { ctx =>
for {
cats <- ctx.store.transact(RClassifierSetting.getActiveCategories(collective))
task = learnTagCategory[F, A](analyser, collective, maxItems) _
_ <- cats.map(task).traverse(_.run(ctx))
} yield ()
}
}

View File

@ -1,13 +1,15 @@
package docspell.joex.learn
import fs2.Stream
import fs2.{Pipe, Stream}
import docspell.analysis.classifier.TextClassifier.Data
import docspell.common._
import docspell.joex.scheduler.Context
import docspell.store.Store
import docspell.store.qb.Batch
import docspell.store.queries.QItem
import docspell.store.queries.{QItem, TextAndTag}
import doobie._
object SelectItems {
val pageSep = LearnClassifierTask.pageSep
@ -25,15 +27,67 @@ object SelectItems {
max: Int,
category: String
): Stream[F, Data] = {
val limit = if (max <= 0) Batch.all else Batch.limit(max)
val connStream =
for {
item <- QItem.findAllNewesFirst(collective, 10, limit)
tt <- Stream.eval(
QItem.resolveTextAndTag(collective, item, category, pageSep)
)
} yield Data(tt.tag.map(_.name).getOrElse(noClass), item.id, tt.text.trim)
store.transact(connStream.filter(_.text.nonEmpty))
allItems(collective, max)
.evalMap(item => QItem.resolveTextAndTag(collective, item, category, pageSep))
.through(mkData)
store.transact(connStream)
}
def forCorrOrg[F[_]](
store: Store[F],
collective: Ident,
max: Int
): Stream[F, Data] = {
val connStream =
allItems(collective, max)
.evalMap(item => QItem.resolveTextAndCorrOrg(collective, item, pageSep))
.through(mkData)
store.transact(connStream)
}
def forCorrPerson[F[_]](
store: Store[F],
collective: Ident,
max: Int
): Stream[F, Data] = {
val connStream =
allItems(collective, max)
.evalMap(item => QItem.resolveTextAndCorrPerson(collective, item, pageSep))
.through(mkData)
store.transact(connStream)
}
def forConcPerson[F[_]](
store: Store[F],
collective: Ident,
max: Int
): Stream[F, Data] = {
val connStream =
allItems(collective, max)
.evalMap(item => QItem.resolveTextAndConcPerson(collective, item, pageSep))
.through(mkData)
store.transact(connStream)
}
def forConcEquip[F[_]](
store: Store[F],
collective: Ident,
max: Int
): Stream[F, Data] = {
val connStream =
allItems(collective, max)
.evalMap(item => QItem.resolveTextAndConcEquip(collective, item, pageSep))
.through(mkData)
store.transact(connStream)
}
private def allItems(collective: Ident, max: Int): Stream[ConnectionIO, Ident] = {
val limit = if (max <= 0) Batch.all else Batch.limit(max)
QItem.findAllNewesFirst(collective, 10, limit)
}
private def mkData[F[_]]: Pipe[F, TextAndTag, Data] =
_.map(tt => Data(tt.tag.map(_.name).getOrElse(noClass), tt.itemId.id, tt.text.trim))
.filter(_.text.nonEmpty)
}

View File

@ -567,7 +567,7 @@ object QItem {
val tagsTid = Column[Ident]("tid", tags)
val tagsName = Column[String]("tname", tags)
val q =
readTextAndTag(collective, itemId, pageSep) {
withCte(
tags -> Select(
select(ti.itemId.as(tagsItem), tag.tid.as(tagsTid), tag.name.as(tagsName)),
@ -584,18 +584,87 @@ object QItem {
.leftJoin(tags, tagsItem === i.id),
i.id === itemId && i.cid === collective && m.content.isNotNull && m.content <> ""
)
).build
)
}
}
def resolveTextAndCorrOrg(
collective: Ident,
itemId: Ident,
pageSep: String
): ConnectionIO[TextAndTag] =
readTextAndTag(collective, itemId, pageSep) {
Select(
select(m.content, org.oid, org.name),
from(i)
.innerJoin(a, a.itemId === i.id)
.innerJoin(m, m.id === a.id)
.leftJoin(org, org.oid === i.corrOrg),
i.id === itemId && m.content.isNotNull && m.content <> ""
)
}
def resolveTextAndCorrPerson(
collective: Ident,
itemId: Ident,
pageSep: String
): ConnectionIO[TextAndTag] =
readTextAndTag(collective, itemId, pageSep) {
Select(
select(m.content, pers0.pid, pers0.name),
from(i)
.innerJoin(a, a.itemId === i.id)
.innerJoin(m, m.id === a.id)
.leftJoin(pers0, pers0.pid === i.corrPerson),
i.id === itemId && m.content.isNotNull && m.content <> ""
)
}
def resolveTextAndConcPerson(
collective: Ident,
itemId: Ident,
pageSep: String
): ConnectionIO[TextAndTag] =
readTextAndTag(collective, itemId, pageSep) {
Select(
select(m.content, pers0.pid, pers0.name),
from(i)
.innerJoin(a, a.itemId === i.id)
.innerJoin(m, m.id === a.id)
.leftJoin(pers0, pers0.pid === i.concPerson),
i.id === itemId && m.content.isNotNull && m.content <> ""
)
}
def resolveTextAndConcEquip(
collective: Ident,
itemId: Ident,
pageSep: String
): ConnectionIO[TextAndTag] =
readTextAndTag(collective, itemId, pageSep) {
Select(
select(m.content, equip.eid, equip.name),
from(i)
.innerJoin(a, a.itemId === i.id)
.innerJoin(m, m.id === a.id)
.leftJoin(equip, equip.eid === i.concEquipment),
i.id === itemId && m.content.isNotNull && m.content <> ""
)
}
private def readTextAndTag(collective: Ident, itemId: Ident, pageSep: String)(
q: Select
): ConnectionIO[TextAndTag] =
for {
_ <- logger.ftrace[ConnectionIO](
s"query: $q (${itemId.id}, ${collective.id}, ${tagCategory})"
s"query: $q (${itemId.id}, ${collective.id})"
)
texts <- q.query[(String, Option[TextAndTag.TagName])].to[List]
texts <- q.build.query[(String, Option[TextAndTag.TagName])].to[List]
_ <- logger.ftrace[ConnectionIO](
s"Got ${texts.size} text and tag entries for item ${itemId.id}"
)
tag = texts.headOption.flatMap(_._2)
txt = texts.map(_._1).mkString(pageSep)
} yield TextAndTag(itemId, txt, tag)
}
}