diff --git a/modules/joex/src/main/scala/docspell/joex/learn/LearnClassifierTask.scala b/modules/joex/src/main/scala/docspell/joex/learn/LearnClassifierTask.scala index 843ee951..354a8e39 100644 --- a/modules/joex/src/main/scala/docspell/joex/learn/LearnClassifierTask.scala +++ b/modules/joex/src/main/scala/docspell/joex/learn/LearnClassifierTask.scala @@ -1,6 +1,5 @@ package docspell.joex.learn -import cats.data.Kleisli import cats.data.OptionT import cats.effect._ import cats.implicits._ @@ -24,6 +23,29 @@ object LearnClassifierTask { def apply[F[_]: Sync: ContextShift]( cfg: Config.TextAnalysis, analyser: TextAnalyser[F] + ): Task[F, Args, Unit] = + learnTags(cfg, analyser) + .flatMap(_ => learnItemEntities(cfg, analyser)) + + private def learnItemEntities[F[_]: Sync: ContextShift]( + cfg: Config.TextAnalysis, + analyser: TextAnalyser[F] + ): Task[F, Args, Unit] = + Task { ctx => + if (cfg.classification.enabled) + LearnItemEntities + .learnAll( + analyser, + ctx.args.collective, + cfg.classification.itemCount + ) + .run(ctx) + else ().pure[F] + } + + private def learnTags[F[_]: Sync: ContextShift]( + cfg: Config.TextAnalysis, + analyser: TextAnalyser[F] ): Task[F, Args, Unit] = Task { ctx => val learnTags = @@ -31,10 +53,11 @@ object LearnClassifierTask { sett <- findActiveSettings[F](ctx, cfg) maxItems = math.min(cfg.classification.itemCount, sett.itemCount) _ <- OptionT.liftF( - learnAllTagCategories(analyser)(ctx.args.collective, maxItems).run(ctx) + LearnTags + .learnAllTagCategories(analyser)(ctx.args.collective, maxItems) + .run(ctx) ) } yield () - // learn classifier models from active tag categories learnTags.getOrElseF(logInactiveWarning(ctx.logger)) *> // delete classifier model files for categories that have been removed @@ -45,39 +68,6 @@ object LearnClassifierTask { .map(_ => ()) } - def learnTagCategory[F[_]: Sync: ContextShift, A]( - analyser: TextAnalyser[F], - collective: Ident, - maxItems: Int - )( - category: String - ): Task[F, A, Unit] = - Task { ctx => - val data = SelectItems.forCategory(ctx, collective)(maxItems, category) - ctx.logger.info(s"Learn classifier for tag category: $category") *> - analyser.classifier.trainClassifier(ctx.logger, data)( - Kleisli( - StoreClassifierModel.handleModel( - ctx, - collective, - ClassifierName.tagCategory(category) - ) - ) - ) - } - - def learnAllTagCategories[F[_]: Sync: ContextShift, A](analyser: TextAnalyser[F])( - collective: Ident, - maxItems: Int - ): Task[F, A, Unit] = - Task { ctx => - for { - cats <- ctx.store.transact(RClassifierSetting.getActiveCategories(collective)) - task = learnTagCategory[F, A](analyser, collective, maxItems) _ - _ <- cats.map(task).traverse(_.run(ctx)) - } yield () - } - private def clearObsoleteTagModels[F[_]: Sync](ctx: Context[F, Args]): F[Unit] = for { list <- ctx.store.transact( diff --git a/modules/joex/src/main/scala/docspell/joex/learn/LearnItemEntities.scala b/modules/joex/src/main/scala/docspell/joex/learn/LearnItemEntities.scala new file mode 100644 index 00000000..1dc48975 --- /dev/null +++ b/modules/joex/src/main/scala/docspell/joex/learn/LearnItemEntities.scala @@ -0,0 +1,74 @@ +package docspell.joex.learn + +import cats.data.Kleisli +import cats.effect._ +import cats.implicits._ +import fs2.Stream + +import docspell.analysis.TextAnalyser +import docspell.analysis.classifier.TextClassifier.Data +import docspell.common._ +import docspell.joex.scheduler._ + +object LearnItemEntities { + def learnAll[F[_]: Sync: ContextShift, A]( + analyser: TextAnalyser[F], + collective: Ident, + maxItems: Int + ): Task[F, A, Unit] = + learnCorrOrg(analyser, collective, maxItems) + .flatMap(_ => learnCorrPerson[F, A](analyser, collective, maxItems)) + .flatMap(_ => learnConcPerson(analyser, collective, maxItems)) + .flatMap(_ => learnConcEquip(analyser, collective, maxItems)) + + def learnCorrOrg[F[_]: Sync: ContextShift, A]( + analyser: TextAnalyser[F], + collective: Ident, + maxItems: Int + ): Task[F, A, Unit] = + learn(analyser, collective)( + ClassifierName.correspondentOrg, + ctx => SelectItems.forCorrOrg(ctx.store, collective, maxItems) + ) + + def learnCorrPerson[F[_]: Sync: ContextShift, A]( + analyser: TextAnalyser[F], + collective: Ident, + maxItems: Int + ): Task[F, A, Unit] = + learn(analyser, collective)( + ClassifierName.correspondentPerson, + ctx => SelectItems.forCorrPerson(ctx.store, collective, maxItems) + ) + + def learnConcPerson[F[_]: Sync: ContextShift, A]( + analyser: TextAnalyser[F], + collective: Ident, + maxItems: Int + ): Task[F, A, Unit] = + learn(analyser, collective)( + ClassifierName.concernedPerson, + ctx => SelectItems.forConcPerson(ctx.store, collective, maxItems) + ) + + def learnConcEquip[F[_]: Sync: ContextShift, A]( + analyser: TextAnalyser[F], + collective: Ident, + maxItems: Int + ): Task[F, A, Unit] = + learn(analyser, collective)( + ClassifierName.concernedEquip, + ctx => SelectItems.forConcEquip(ctx.store, collective, maxItems) + ) + + private def learn[F[_]: Sync: ContextShift, A]( + analyser: TextAnalyser[F], + collective: Ident + )(cname: ClassifierName, data: Context[F, _] => Stream[F, Data]): Task[F, A, Unit] = + Task { ctx => + ctx.logger.info(s"Learn classifier ${cname.name}") *> + analyser.classifier.trainClassifier(ctx.logger, data(ctx))( + Kleisli(StoreClassifierModel.handleModel(ctx, collective, cname)) + ) + } +} diff --git a/modules/joex/src/main/scala/docspell/joex/learn/LearnTags.scala b/modules/joex/src/main/scala/docspell/joex/learn/LearnTags.scala new file mode 100644 index 00000000..b24eb28d --- /dev/null +++ b/modules/joex/src/main/scala/docspell/joex/learn/LearnTags.scala @@ -0,0 +1,46 @@ +package docspell.joex.learn + +import cats.data.Kleisli +import cats.effect._ +import cats.implicits._ + +import docspell.analysis.TextAnalyser +import docspell.common._ +import docspell.joex.scheduler._ +import docspell.store.records.RClassifierSetting + +object LearnTags { + + def learnTagCategory[F[_]: Sync: ContextShift, A]( + analyser: TextAnalyser[F], + collective: Ident, + maxItems: Int + )( + category: String + ): Task[F, A, Unit] = + Task { ctx => + val data = SelectItems.forCategory(ctx, collective)(maxItems, category) + ctx.logger.info(s"Learn classifier for tag category: $category") *> + analyser.classifier.trainClassifier(ctx.logger, data)( + Kleisli( + StoreClassifierModel.handleModel( + ctx, + collective, + ClassifierName.tagCategory(category) + ) + ) + ) + } + + def learnAllTagCategories[F[_]: Sync: ContextShift, A](analyser: TextAnalyser[F])( + collective: Ident, + maxItems: Int + ): Task[F, A, Unit] = + Task { ctx => + for { + cats <- ctx.store.transact(RClassifierSetting.getActiveCategories(collective)) + task = learnTagCategory[F, A](analyser, collective, maxItems) _ + _ <- cats.map(task).traverse(_.run(ctx)) + } yield () + } +} diff --git a/modules/joex/src/main/scala/docspell/joex/learn/SelectItems.scala b/modules/joex/src/main/scala/docspell/joex/learn/SelectItems.scala index e7c31d7b..c6dab2f0 100644 --- a/modules/joex/src/main/scala/docspell/joex/learn/SelectItems.scala +++ b/modules/joex/src/main/scala/docspell/joex/learn/SelectItems.scala @@ -1,13 +1,15 @@ package docspell.joex.learn -import fs2.Stream +import fs2.{Pipe, Stream} import docspell.analysis.classifier.TextClassifier.Data import docspell.common._ import docspell.joex.scheduler.Context import docspell.store.Store import docspell.store.qb.Batch -import docspell.store.queries.QItem +import docspell.store.queries.{QItem, TextAndTag} + +import doobie._ object SelectItems { val pageSep = LearnClassifierTask.pageSep @@ -25,15 +27,67 @@ object SelectItems { max: Int, category: String ): Stream[F, Data] = { - val limit = if (max <= 0) Batch.all else Batch.limit(max) val connStream = - for { - item <- QItem.findAllNewesFirst(collective, 10, limit) - tt <- Stream.eval( - QItem.resolveTextAndTag(collective, item, category, pageSep) - ) - } yield Data(tt.tag.map(_.name).getOrElse(noClass), item.id, tt.text.trim) - store.transact(connStream.filter(_.text.nonEmpty)) + allItems(collective, max) + .evalMap(item => QItem.resolveTextAndTag(collective, item, category, pageSep)) + .through(mkData) + store.transact(connStream) } + def forCorrOrg[F[_]]( + store: Store[F], + collective: Ident, + max: Int + ): Stream[F, Data] = { + val connStream = + allItems(collective, max) + .evalMap(item => QItem.resolveTextAndCorrOrg(collective, item, pageSep)) + .through(mkData) + store.transact(connStream) + } + + def forCorrPerson[F[_]]( + store: Store[F], + collective: Ident, + max: Int + ): Stream[F, Data] = { + val connStream = + allItems(collective, max) + .evalMap(item => QItem.resolveTextAndCorrPerson(collective, item, pageSep)) + .through(mkData) + store.transact(connStream) + } + + def forConcPerson[F[_]]( + store: Store[F], + collective: Ident, + max: Int + ): Stream[F, Data] = { + val connStream = + allItems(collective, max) + .evalMap(item => QItem.resolveTextAndConcPerson(collective, item, pageSep)) + .through(mkData) + store.transact(connStream) + } + + def forConcEquip[F[_]]( + store: Store[F], + collective: Ident, + max: Int + ): Stream[F, Data] = { + val connStream = + allItems(collective, max) + .evalMap(item => QItem.resolveTextAndConcEquip(collective, item, pageSep)) + .through(mkData) + store.transact(connStream) + } + + private def allItems(collective: Ident, max: Int): Stream[ConnectionIO, Ident] = { + val limit = if (max <= 0) Batch.all else Batch.limit(max) + QItem.findAllNewesFirst(collective, 10, limit) + } + + private def mkData[F[_]]: Pipe[F, TextAndTag, Data] = + _.map(tt => Data(tt.tag.map(_.name).getOrElse(noClass), tt.itemId.id, tt.text.trim)) + .filter(_.text.nonEmpty) } diff --git a/modules/store/src/main/scala/docspell/store/queries/QItem.scala b/modules/store/src/main/scala/docspell/store/queries/QItem.scala index b68afb22..7de59437 100644 --- a/modules/store/src/main/scala/docspell/store/queries/QItem.scala +++ b/modules/store/src/main/scala/docspell/store/queries/QItem.scala @@ -567,7 +567,7 @@ object QItem { val tagsTid = Column[Ident]("tid", tags) val tagsName = Column[String]("tname", tags) - val q = + readTextAndTag(collective, itemId, pageSep) { withCte( tags -> Select( select(ti.itemId.as(tagsItem), tag.tid.as(tagsTid), tag.name.as(tagsName)), @@ -584,18 +584,87 @@ object QItem { .leftJoin(tags, tagsItem === i.id), i.id === itemId && i.cid === collective && m.content.isNotNull && m.content <> "" ) - ).build + ) + } + } + def resolveTextAndCorrOrg( + collective: Ident, + itemId: Ident, + pageSep: String + ): ConnectionIO[TextAndTag] = + readTextAndTag(collective, itemId, pageSep) { + Select( + select(m.content, org.oid, org.name), + from(i) + .innerJoin(a, a.itemId === i.id) + .innerJoin(m, m.id === a.id) + .leftJoin(org, org.oid === i.corrOrg), + i.id === itemId && m.content.isNotNull && m.content <> "" + ) + } + + def resolveTextAndCorrPerson( + collective: Ident, + itemId: Ident, + pageSep: String + ): ConnectionIO[TextAndTag] = + readTextAndTag(collective, itemId, pageSep) { + Select( + select(m.content, pers0.pid, pers0.name), + from(i) + .innerJoin(a, a.itemId === i.id) + .innerJoin(m, m.id === a.id) + .leftJoin(pers0, pers0.pid === i.corrPerson), + i.id === itemId && m.content.isNotNull && m.content <> "" + ) + } + + def resolveTextAndConcPerson( + collective: Ident, + itemId: Ident, + pageSep: String + ): ConnectionIO[TextAndTag] = + readTextAndTag(collective, itemId, pageSep) { + Select( + select(m.content, pers0.pid, pers0.name), + from(i) + .innerJoin(a, a.itemId === i.id) + .innerJoin(m, m.id === a.id) + .leftJoin(pers0, pers0.pid === i.concPerson), + i.id === itemId && m.content.isNotNull && m.content <> "" + ) + } + + def resolveTextAndConcEquip( + collective: Ident, + itemId: Ident, + pageSep: String + ): ConnectionIO[TextAndTag] = + readTextAndTag(collective, itemId, pageSep) { + Select( + select(m.content, equip.eid, equip.name), + from(i) + .innerJoin(a, a.itemId === i.id) + .innerJoin(m, m.id === a.id) + .leftJoin(equip, equip.eid === i.concEquipment), + i.id === itemId && m.content.isNotNull && m.content <> "" + ) + } + + private def readTextAndTag(collective: Ident, itemId: Ident, pageSep: String)( + q: Select + ): ConnectionIO[TextAndTag] = for { _ <- logger.ftrace[ConnectionIO]( - s"query: $q (${itemId.id}, ${collective.id}, ${tagCategory})" + s"query: $q (${itemId.id}, ${collective.id})" ) - texts <- q.query[(String, Option[TextAndTag.TagName])].to[List] + texts <- q.build.query[(String, Option[TextAndTag.TagName])].to[List] _ <- logger.ftrace[ConnectionIO]( s"Got ${texts.size} text and tag entries for item ${itemId.id}" ) tag = texts.headOption.flatMap(_._2) txt = texts.map(_._1).mkString(pageSep) } yield TextAndTag(itemId, txt, tag) - } + }