diff --git a/modules/fts-client/src/main/scala/docspell/ftsclient/FtsClient.scala b/modules/fts-client/src/main/scala/docspell/ftsclient/FtsClient.scala index e63e53dd..b4db64c0 100644 --- a/modules/fts-client/src/main/scala/docspell/ftsclient/FtsClient.scala +++ b/modules/fts-client/src/main/scala/docspell/ftsclient/FtsClient.scala @@ -1,6 +1,7 @@ package docspell.ftsclient import fs2.Stream +import docspell.common._ /** The fts client is the interface for docspell to a fulltext search * engine. @@ -12,7 +13,13 @@ import fs2.Stream */ trait FtsClient[F[_]] { + /** Optional operation to do some initialization tasks. This is called + * exactly once and then never again. It may be used to setup the + * database. + */ + def initialize: F[Unit] + def searchBasic(q: FtsQuery): Stream[F, FtsBasicResult] - def indexData(data: Stream[F, TextData]): F[Unit] + def indexData(logger: Logger[F], data: Stream[F, TextData]): F[Unit] } diff --git a/modules/fts-solr/src/main/scala/docspell/ftssolr/SolrFtsClient.scala b/modules/fts-solr/src/main/scala/docspell/ftssolr/SolrFtsClient.scala index 857a0196..6fc50c3e 100644 --- a/modules/fts-solr/src/main/scala/docspell/ftssolr/SolrFtsClient.scala +++ b/modules/fts-solr/src/main/scala/docspell/ftssolr/SolrFtsClient.scala @@ -2,6 +2,7 @@ package docspell.ftssolr import fs2.Stream import cats.effect._ +import cats.implicits._ import org.http4s.client.Client import cats.data.NonEmptyList @@ -9,8 +10,15 @@ import docspell.common._ import docspell.ftsclient._ import docspell.ftsclient.FtsBasicResult._ -final class SolrFtsClient[F[_]](cfg: SolrConfig, client: Client[F]) extends FtsClient[F] { +final class SolrFtsClient[F[_]: Effect]( + cfg: SolrConfig, + client: Client[F] +) extends FtsClient[F] { println(s"$client $cfg") + + def initialize: F[Unit] = + ().pure[F] + def searchBasic(q: FtsQuery): Stream[F, FtsBasicResult] = Stream.emits( Seq( @@ -25,8 +33,9 @@ final class SolrFtsClient[F[_]](cfg: SolrConfig, client: Client[F]) extends FtsC ) ) 
- def indexData(data: Stream[F, TextData]): F[Unit] = - ??? + def indexData(logger: Logger[F], data: Stream[F, TextData]): F[Unit] = + logger.info("Inserting lots of data into index") + } object SolrFtsClient { diff --git a/modules/joex/src/main/resources/reference.conf b/modules/joex/src/main/resources/reference.conf index d330771f..efb9c5fc 100644 --- a/modules/joex/src/main/resources/reference.conf +++ b/modules/joex/src/main/resources/reference.conf @@ -368,6 +368,9 @@ docspell.joex { # Configuration of the full-text search engine. full-text-search { enabled = true + migration = { + index-all-chunk = 10 + } solr = { url = "http://localhost:8983/solr/docspell_core" } diff --git a/modules/joex/src/main/scala/docspell/joex/Config.scala b/modules/joex/src/main/scala/docspell/joex/Config.scala index ca3597eb..c9c54528 100644 --- a/modules/joex/src/main/scala/docspell/joex/Config.scala +++ b/modules/joex/src/main/scala/docspell/joex/Config.scala @@ -37,5 +37,14 @@ object Config { } case class UserTasks(scanMailbox: ScanMailbox) - case class FullTextSearch(enabled: Boolean, solr: SolrConfig) + case class FullTextSearch( + enabled: Boolean, + migration: FullTextSearch.Migration, + solr: SolrConfig + ) + + object FullTextSearch { + + final case class Migration(indexAllChunk: Int) + } } diff --git a/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala b/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala index d4812636..05e5e940 100644 --- a/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala +++ b/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala @@ -7,6 +7,7 @@ import docspell.common._ import docspell.backend.ops._ import docspell.joex.hk._ import docspell.joex.notify._ +import docspell.joex.fts.IndexTask import docspell.joex.scanmailbox._ import docspell.joex.process.ItemHandler import docspell.joex.scheduler._ @@ -23,6 +24,7 @@ final class JoexAppImpl[F[_]: ConcurrentEffect: ContextShift: Timer]( cfg: Config, nodeOps: ONode[F], store: 
Store[F], + queue: JobQueue[F], pstore: PeriodicTaskStore[F], termSignal: SignallingRef[F, Boolean], val scheduler: Scheduler[F], @@ -52,7 +54,9 @@ final class JoexAppImpl[F[_]: ConcurrentEffect: ContextShift: Timer]( periodicScheduler.shutdown *> scheduler.shutdown(false) *> termSignal.set(true) private def scheduleBackgroundTasks: F[Unit] = - HouseKeepingTask.periodicTask[F](cfg.houseKeeping.schedule).flatMap(pstore.insert) + HouseKeepingTask + .periodicTask[F](cfg.houseKeeping.schedule) + .flatMap(pstore.insert) *> IndexTask.job.flatMap(queue.insert) } object JoexAppImpl { @@ -99,6 +103,13 @@ object JoexAppImpl { ScanMailboxTask.onCancel[F] ) ) + .withTask( + JobTask.json( + IndexTask.taskName, + IndexTask[F](cfg.fullTextSearch, fts), + IndexTask.onCancel[F] + ) + ) .withTask( JobTask.json( HouseKeepingTask.taskName, @@ -115,7 +126,7 @@ object JoexAppImpl { client, Timer[F] ) - app = new JoexAppImpl(cfg, nodeOps, store, pstore, termSignal, sch, psch) + app = new JoexAppImpl(cfg, nodeOps, store, queue, pstore, termSignal, sch, psch) appR <- Resource.make(app.init.map(_ => app))(_.shutdown) } yield appR } diff --git a/modules/joex/src/main/scala/docspell/joex/fts/IndexTask.scala b/modules/joex/src/main/scala/docspell/joex/fts/IndexTask.scala new file mode 100644 index 00000000..5be969d9 --- /dev/null +++ b/modules/joex/src/main/scala/docspell/joex/fts/IndexTask.scala @@ -0,0 +1,48 @@ +package docspell.joex.fts + +import cats.effect._ +import cats.implicits._ +import docspell.common._ +import docspell.joex.Config +import docspell.joex.scheduler.Task +import docspell.ftsclient._ +import docspell.store.records.RJob +import docspell.joex.hk.HouseKeepingTask + +object IndexTask { + val taskName: Ident = Ident.unsafe("full-text-index") + val systemGroup = HouseKeepingTask.systemGroup + + def apply[F[_]: ConcurrentEffect]( + cfg: Config.FullTextSearch, + fts: FtsClient[F] + ): Task[F, Unit, Unit] = + Task + .log[F, Unit](_.info(s"Running full-text-index task now")) + 
.flatMap(_ => + Task(ctx => + Migration[F](cfg, ctx.store, fts, ctx.logger) + .run(Migration.migrationTasks[F]) + ) + ) + + def onCancel[F[_]: Sync]: Task[F, Unit, Unit] = + Task.log[F, Unit](_.warn("Cancelling full-text-index task")) + + def job[F[_]: Sync]: F[RJob] = + for { + id <- Ident.randomId[F] + now <- Timestamp.current[F] + } yield RJob.newJob( + id, + taskName, + systemGroup, + (), + "Create full-text index", + now, + systemGroup, + Priority.Low, + None + ) + +} diff --git a/modules/joex/src/main/scala/docspell/joex/fts/Migration.scala b/modules/joex/src/main/scala/docspell/joex/fts/Migration.scala new file mode 100644 index 00000000..546198ad --- /dev/null +++ b/modules/joex/src/main/scala/docspell/joex/fts/Migration.scala @@ -0,0 +1,110 @@ +package docspell.joex.fts + +import cats.effect._ +import cats.implicits._ +import cats.data.{Kleisli, OptionT} +import cats.Traverse +import docspell.common._ +import docspell.joex.Config +import docspell.store.{AddResult, Store} +import docspell.store.records.RFtsMigration +import docspell.store.queries.{QAttachment, QItem} +import docspell.ftsclient._ + +object Migration { + private val solrEngine = Ident.unsafe("solr") + + case class MigrateCtx[F[_]]( + cfg: Config.FullTextSearch, + store: Store[F], + fts: FtsClient[F], + logger: Logger[F] + ) + + case class Migration[F[_]]( + version: Int, + engine: Ident, + description: String, + task: Kleisli[F, MigrateCtx[F], Unit] + ) + + def apply[F[_]: Effect]( + cfg: Config.FullTextSearch, + store: Store[F], + fts: FtsClient[F], + logger: Logger[F] + ): Kleisli[F, List[Migration[F]], Unit] = { + val ctx = MigrateCtx(cfg, store, fts, logger) + Kleisli(migs => Traverse[List].sequence(migs.map(applySingle[F](ctx))).map(_ => ())) + } + + def applySingle[F[_]: Effect](ctx: MigrateCtx[F])(m: Migration[F]): F[Unit] = { + val insertRecord: F[Option[RFtsMigration]] = + for { + rec <- RFtsMigration.create(m.version, m.engine, m.description) + res <- ctx.store.add( + 
RFtsMigration.insert(rec), + RFtsMigration.exists(m.version, m.engine) + ) + ret <- res match { + case AddResult.Success => rec.some.pure[F] + case AddResult.EntityExists(_) => None.pure[F] + case AddResult.Failure(ex) => Effect[F].raiseError(ex) + } + } yield ret + + (for { + _ <- OptionT.liftF(ctx.logger.info(s"Apply ${m.version}/${m.description}")) + rec <- OptionT(insertRecord) + res <- OptionT.liftF(m.task.run(ctx).attempt) + _ <- OptionT.liftF(res match { + case Right(()) => ().pure[F] + case Left(ex) => + ctx.logger.error(ex)( + s"Applying index migration ${m.version}/${m.description} failed" + ) *> + ctx.store.transact(RFtsMigration.deleteById(rec.id)) *> Effect[F].raiseError( + ex + ) + }) + } yield ()).getOrElseF( + ctx.logger.info(s"Migration ${m.version}/${m.description} already applied.") + ) + } + + def migrationTasks[F[_]]: List[Migration[F]] = + List( + Migration[F](1, solrEngine, "initialize", Kleisli(ctx => ctx.fts.initialize)), + Migration[F]( + 2, + solrEngine, + "Index all attachments from database", + Kleisli(ctx => + ctx.fts.indexData( + ctx.logger, + ctx.store + .transact( + QAttachment.allAttachmentMetaAndName(ctx.cfg.migration.indexAllChunk) + ) + .map(caa => + TextData + .attachment(caa.item, caa.id, caa.collective, caa.name, caa.content) + ) + ) + ) + ), + Migration[F]( + 3, + solrEngine, + "Index all items from database", + Kleisli(ctx => + ctx.fts.indexData( + ctx.logger, + ctx.store + .transact(QItem.allNameAndNotes(ctx.cfg.migration.indexAllChunk * 5)) + .map(nn => TextData.item(nn.id, nn.collective, Option(nn.name), nn.notes)) + ) + ) + ) + ) +} diff --git a/modules/joex/src/main/scala/docspell/joex/process/TextExtraction.scala b/modules/joex/src/main/scala/docspell/joex/process/TextExtraction.scala index 5f87e891..543da0ee 100644 --- a/modules/joex/src/main/scala/docspell/joex/process/TextExtraction.scala +++ b/modules/joex/src/main/scala/docspell/joex/process/TextExtraction.scala @@ -32,7 +32,7 @@ object TextExtraction { ) _ <- 
ctx.logger.debug("Storing extracted texts") _ <- txt.toList.traverse(rm => ctx.store.transact(RAttachmentMeta.upsert(rm._1))) - _ <- fts.indexData(Stream.emits(txt.map(_._2))) + _ <- fts.indexData(ctx.logger, Stream.emits(txt.map(_._2))) dur <- start _ <- ctx.logger.info(s"Text extraction finished in ${dur.formatExact}") } yield item.copy(metas = txt.map(_._1)) diff --git a/modules/store/src/main/resources/db/migration/postgresql/V1.7.0__fts-migration.sql b/modules/store/src/main/resources/db/migration/postgresql/V1.7.0__fts-migration.sql new file mode 100644 index 00000000..993a14ca --- /dev/null +++ b/modules/store/src/main/resources/db/migration/postgresql/V1.7.0__fts-migration.sql @@ -0,0 +1,10 @@ +CREATE TABLE "fts_migration" ( + "id" varchar(254) not null primary key, + "version" int not null, + "fts_engine" varchar(254) not null, + "description" varchar(254) not null, + "created" timestamp not null +); + +CREATE UNIQUE INDEX "fts_migration_version_engine_idx" +ON "fts_migration"("version", "fts_engine"); diff --git a/modules/store/src/main/scala/docspell/store/queries/QAttachment.scala b/modules/store/src/main/scala/docspell/store/queries/QAttachment.scala index 56b2fffe..a09f58ff 100644 --- a/modules/store/src/main/scala/docspell/store/queries/QAttachment.scala +++ b/modules/store/src/main/scala/docspell/store/queries/QAttachment.scala @@ -138,4 +138,30 @@ object QAttachment { q.query[RAttachmentMeta].option } + + case class ContentAndName( + id: Ident, + item: Ident, + collective: Ident, + name: Option[String], + content: Option[String] + ) + def allAttachmentMetaAndName(chunkSize: Int): Stream[ConnectionIO, ContentAndName] = { + val aId = RAttachment.Columns.id.prefix("a") + val aItem = RAttachment.Columns.itemId.prefix("a") + val aName = RAttachment.Columns.name.prefix("a") + val mId = RAttachmentMeta.Columns.id.prefix("m") + val mContent = RAttachmentMeta.Columns.content.prefix("m") + val iId = RItem.Columns.id.prefix("i") + val iColl = 
RItem.Columns.cid.prefix("i") + + val cols = Seq(aId, aItem, iColl, aName, mContent) + val from = RAttachment.table ++ fr"a INNER JOIN" ++ + RAttachmentMeta.table ++ fr"m ON" ++ aId.is(mId) ++ + fr"INNER JOIN" ++ RItem.table ++ fr"i ON" ++ iId.is(aItem) + + selectSimple(cols, from, Fragment.empty) + .query[ContentAndName] + .streamWithChunkSize(chunkSize) + } } diff --git a/modules/store/src/main/scala/docspell/store/queries/QItem.scala b/modules/store/src/main/scala/docspell/store/queries/QItem.scala index 8485c46d..2d8970bd 100644 --- a/modules/store/src/main/scala/docspell/store/queries/QItem.scala +++ b/modules/store/src/main/scala/docspell/store/queries/QItem.scala @@ -469,4 +469,21 @@ object QItem { prefix(suffix(value)) } + final case class NameAndNotes( + id: Ident, + collective: Ident, + name: String, + notes: Option[String] + ) + def allNameAndNotes(chunkSize: Int): Stream[ConnectionIO, NameAndNotes] = { + val iId = RItem.Columns.id + val iColl = RItem.Columns.cid + val iName = RItem.Columns.name + val iNotes = RItem.Columns.notes + + val cols = Seq(iId, iColl, iName, iNotes) + selectSimple(cols, RItem.table, Fragment.empty) + .query[NameAndNotes] + .streamWithChunkSize(chunkSize) + } } diff --git a/modules/store/src/main/scala/docspell/store/records/RFtsMigration.scala b/modules/store/src/main/scala/docspell/store/records/RFtsMigration.scala new file mode 100644 index 00000000..ab2e40c0 --- /dev/null +++ b/modules/store/src/main/scala/docspell/store/records/RFtsMigration.scala @@ -0,0 +1,59 @@ +package docspell.store.records + +import cats.implicits._ +import cats.effect._ +import doobie._ +import doobie.implicits._ +import docspell.common._ +import docspell.store.impl._ +import docspell.store.impl.Implicits._ + +final case class RFtsMigration( + id: Ident, + version: Int, + ftsEngine: Ident, + description: String, + created: Timestamp +) + +object RFtsMigration { + + def create[F[_]: Sync]( + version: Int, + ftsEngine: Ident, + description: String + ): 
F[RFtsMigration] = + for { + newId <- Ident.randomId[F] + now <- Timestamp.current[F] + } yield RFtsMigration(newId, version, ftsEngine, description, now) + + val table = fr"fts_migration" + + object Columns { + val id = Column("id") + val version = Column("version") + val ftsEngine = Column("fts_engine") + val description = Column("description") + val created = Column("created") + + val all = List(id, version, ftsEngine, description, created) + } + import Columns._ + + def insert(v: RFtsMigration): ConnectionIO[Int] = + insertRow( + table, + all, + fr"${v.id},${v.version},${v.ftsEngine},${v.description},${v.created}" + ).update.run + + def exists(vers: Int, engine: Ident): ConnectionIO[Boolean] = + selectCount(id, table, and(version.is(vers), ftsEngine.is(engine))) + .query[Int] + .unique + .map(_ > 0) + + def deleteById(rId: Ident): ConnectionIO[Int] = + deleteFrom(table, id.is(rId)).update.run +}