From 14ea4091c468a801ada771296932adc33b8a5865 Mon Sep 17 00:00:00 2001 From: Eike Kettner Date: Sun, 21 Jun 2020 13:15:02 +0200 Subject: [PATCH] Renaming things --- .../docspell/backend/ops/OFulltext.scala | 13 +++ .../scala/docspell/ftsclient/FtsClient.scala | 8 ++ .../docspell/ftssolr/SolrFtsClient.scala | 14 ++- .../scala/docspell/ftssolr/SolrUpdate.scala | 20 +++- .../scala/docspell/joex/JoexAppImpl.scala | 11 ++- .../scala/docspell/joex/fts/FtsContext.scala | 24 +++++ .../scala/docspell/joex/fts/FtsWork.scala | 72 ++++++++++++++ .../scala/docspell/joex/fts/IndexTask.scala | 55 ----------- .../scala/docspell/joex/fts/MigrateCtx.scala | 13 --- .../scala/docspell/joex/fts/Migration.scala | 8 +- .../docspell/joex/fts/MigrationTask.scala | 94 +++++++++---------- .../scala/docspell/joex/fts/package.scala | 2 +- 12 files changed, 206 insertions(+), 128 deletions(-) create mode 100644 modules/joex/src/main/scala/docspell/joex/fts/FtsContext.scala create mode 100644 modules/joex/src/main/scala/docspell/joex/fts/FtsWork.scala delete mode 100644 modules/joex/src/main/scala/docspell/joex/fts/IndexTask.scala delete mode 100644 modules/joex/src/main/scala/docspell/joex/fts/MigrateCtx.scala diff --git a/modules/backend/src/main/scala/docspell/backend/ops/OFulltext.scala b/modules/backend/src/main/scala/docspell/backend/ops/OFulltext.scala index 41669cbb..c96004b4 100644 --- a/modules/backend/src/main/scala/docspell/backend/ops/OFulltext.scala +++ b/modules/backend/src/main/scala/docspell/backend/ops/OFulltext.scala @@ -3,6 +3,7 @@ package docspell.backend.ops import cats.effect._ import cats.implicits._ import fs2.Stream +import docspell.common._ import docspell.ftsclient._ import OItemSearch.{Batch, ListItem, ListItemWithTags, Query} @@ -13,6 +14,15 @@ trait OFulltext[F[_]] { /** Same as `findItems` but does more queries per item to find all tags. */ def findItemsWithTags(q: Query, fts: String, batch: Batch): F[Vector[ListItemWithTags]] + /** Clears the full-text index completely and launches a task that + * indexes all data. + */ + def reindexAll: F[Unit] + + /** Clears the full-text index for the given collective and starts a + * task indexing all their data. + */ + def reindexCollective(collective: Ident): F[Unit] } object OFulltext { @@ -25,6 +35,9 @@ object OFulltext { fts: FtsClient[F] ): Resource[F, OFulltext[F]] = Resource.pure[F, OFulltext[F]](new OFulltext[F] { + def reindexAll: F[Unit] = ??? + + def reindexCollective(collective: Ident): F[Unit] = ??? def findItems(q: Query, ftsQ: String, batch: Batch): F[Vector[ListItem]] = findItemsFts(q, ftsQ, batch, itemSearch.findItems) diff --git a/modules/fts-client/src/main/scala/docspell/ftsclient/FtsClient.scala b/modules/fts-client/src/main/scala/docspell/ftsclient/FtsClient.scala index 9b9ee16f..a1bec931 100644 --- a/modules/fts-client/src/main/scala/docspell/ftsclient/FtsClient.scala +++ b/modules/fts-client/src/main/scala/docspell/ftsclient/FtsClient.scala @@ -18,6 +18,7 @@ trait FtsClient[F[_]] { */ def initialize: F[Unit] + /** Run a full-text search. */ def search(q: FtsQuery): F[FtsResult] def searchAll(q: FtsQuery): Stream[F, FtsResult] = @@ -81,4 +82,11 @@ trait FtsClient[F[_]] { None ) ) + + /** Clears the index – removes everything. */ + def clearAll(logger: Logger[F]): F[Unit] + + /** Clears the index from all data belonging to the given collective. */ + def clear(logger: Logger[F], collective: Ident): F[Unit] + } diff --git a/modules/fts-solr/src/main/scala/docspell/ftssolr/SolrFtsClient.scala b/modules/fts-solr/src/main/scala/docspell/ftssolr/SolrFtsClient.scala index f5fae6bc..cd269bef 100644 --- a/modules/fts-solr/src/main/scala/docspell/ftssolr/SolrFtsClient.scala +++ b/modules/fts-solr/src/main/scala/docspell/ftssolr/SolrFtsClient.scala @@ -26,8 +26,10 @@ final class SolrFtsClient[F[_]: Effect]( def updateIndex(logger: Logger[F], data: Stream[F, TextData]): F[Unit] = modifyIndex(logger, data)(solrUpdate.update) - def modifyIndex(logger: Logger[F], data: Stream[F, TextData])(f: List[TextData] => F[Unit]): F[Unit] = - (for { + def modifyIndex(logger: Logger[F], data: Stream[F, TextData])( + f: List[TextData] => F[Unit] + ): F[Unit] = + (for { _ <- Stream.eval(logger.debug("Updating SOLR index")) chunks <- data.chunks res <- Stream.eval(f(chunks.toList).attempt) @@ -37,6 +39,14 @@ final class SolrFtsClient[F[_]: Effect]( Stream.eval(logger.error(ex)("Error updating with chunk of data")) } } yield ()).compile.drain + + def clearAll(logger: Logger[F]): F[Unit] = + logger.info("Deleting complete full-text index!") *> + solrUpdate.delete("*:*") + + def clear(logger: Logger[F], collective: Ident): F[Unit] = + logger.info(s"Deleting full-text index for collective ${collective.id}") *> + solrUpdate.delete(s"${Field.collectiveId.name}:${collective.id}") } object SolrFtsClient { diff --git a/modules/fts-solr/src/main/scala/docspell/ftssolr/SolrUpdate.scala b/modules/fts-solr/src/main/scala/docspell/ftssolr/SolrUpdate.scala index c8de4d09..a2ed4146 100644 --- a/modules/fts-solr/src/main/scala/docspell/ftssolr/SolrUpdate.scala +++ b/modules/fts-solr/src/main/scala/docspell/ftssolr/SolrUpdate.scala @@ -6,6 +6,7 @@ import cats.implicits._ import org.http4s.client.Client import org.http4s.circe._ import org.http4s.client.dsl.Http4sClientDsl +import _root_.io.circe._ import _root_.io.circe.syntax._ import org.log4s.getLogger @@ -18,6 +19,7 @@ trait SolrUpdate[F[_]] { def update(tds: List[TextData]): F[Unit] + def delete(q: String): F[Unit] } object SolrUpdate { @@ -44,6 +46,11 @@ object SolrUpdate { client.expect[String](req).map(r => logger.debug(s"Req: $req Response: $r")) } + def delete(q: String): F[Unit] = { + val req = Method.POST(Delete(q).asJson, url) + client.expect[String](req).map(r => logger.debug(s"Req: $req Response: $r")) + } + private val minOneChange: TextData => Boolean = _ match { case td: TextData.Attachment => @@ -52,5 +59,16 @@ object SolrUpdate { td.name.isDefined || td.notes.isDefined } } - } + } + + case class Delete(query: String) + object Delete { + implicit val jsonEncoder: Encoder[Delete] = + new Encoder[Delete] { + def apply(d: Delete): Json = + Json.obj( + ("delete", Json.obj("query" -> d.query.asJson)) + ) + } + } } diff --git a/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala b/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala index c8a32fbf..d896dbd5 100644 --- a/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala +++ b/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala @@ -7,7 +7,7 @@ import docspell.common._ import docspell.backend.ops._ import docspell.joex.hk._ import docspell.joex.notify._ -import docspell.joex.fts.IndexTask +import docspell.joex.fts.MigrationTask import docspell.joex.scanmailbox._ import docspell.joex.process.ItemHandler import docspell.joex.scheduler._ @@ -56,7 +56,8 @@ final class JoexAppImpl[F[_]: ConcurrentEffect: ContextShift: Timer]( private def scheduleBackgroundTasks: F[Unit] = HouseKeepingTask .periodicTask[F](cfg.houseKeeping.schedule) - .flatMap(pstore.insert) *> IndexTask.job.flatMap(queue.insertIfNew) + .flatMap(pstore.insert) *> + MigrationTask.job.flatMap(queue.insertIfNew) } object JoexAppImpl { @@ -105,9 +106,9 @@ object JoexAppImpl { ) .withTask( JobTask.json( - IndexTask.taskName, - IndexTask[F](cfg.fullTextSearch, fts), - IndexTask.onCancel[F] + MigrationTask.taskName, + MigrationTask[F](cfg.fullTextSearch, fts), + MigrationTask.onCancel[F] ) ) .withTask( diff --git a/modules/joex/src/main/scala/docspell/joex/fts/FtsContext.scala b/modules/joex/src/main/scala/docspell/joex/fts/FtsContext.scala new file mode 100644 index 00000000..ac1267e6 --- /dev/null +++ b/modules/joex/src/main/scala/docspell/joex/fts/FtsContext.scala @@ -0,0 +1,24 @@ +package docspell.joex.fts + +import docspell.common.Logger +import docspell.joex.Config +import docspell.joex.scheduler.Context +import docspell.store.Store +import docspell.ftsclient.FtsClient + +case class FtsContext[F[_]]( + cfg: Config.FullTextSearch, + store: Store[F], + fts: FtsClient[F], + logger: Logger[F] +) + +object FtsContext { + + def apply[F[_]]( + cfg: Config.FullTextSearch, + fts: FtsClient[F], + ctx: Context[F, _] + ): FtsContext[F] = + FtsContext(cfg, ctx.store, fts, ctx.logger) +} diff --git a/modules/joex/src/main/scala/docspell/joex/fts/FtsWork.scala b/modules/joex/src/main/scala/docspell/joex/fts/FtsWork.scala new file mode 100644 index 00000000..4840511c --- /dev/null +++ b/modules/joex/src/main/scala/docspell/joex/fts/FtsWork.scala @@ -0,0 +1,72 @@ +package docspell.joex.fts + +import cats.effect._ +import cats.data.{Kleisli, NonEmptyList} +import cats.{FlatMap, Semigroup} +import docspell.store.queries.{QAttachment, QItem} +import docspell.ftsclient._ +import docspell.joex.scheduler.Context +import docspell.joex.Config + +object FtsWork { + def apply[F[_]](f: FtsContext[F] => F[Unit]): FtsWork[F] = + Kleisli(f) + + def all[F[_]: FlatMap]( + m0: FtsWork[F], + mn: FtsWork[F]* + ): FtsWork[F] = + NonEmptyList.of(m0, mn: _*).reduce(semigroup[F]) + + implicit def semigroup[F[_]: FlatMap]: Semigroup[FtsWork[F]] = + Semigroup.instance((mt1, mt2) => mt1.flatMap(_ => mt2)) + + implicit final class FtsWorkOps[F[_]](mt: FtsWork[F]) { + def ++(mn: FtsWork[F])(implicit ev: FlatMap[F]): FtsWork[F] = + all(mt, mn) + + def forContext( + cfg: Config.FullTextSearch, + fts: FtsClient[F] + ): Kleisli[F, Context[F, _], Unit] = + mt.local(ctx => FtsContext(cfg, fts, ctx)) + } + + // some tasks + + def initialize[F[_]]: FtsWork[F] = + FtsWork(_.fts.initialize) + + def insertAll[F[_]: Effect]: FtsWork[F] = + FtsWork + .all( + FtsWork(ctx => + ctx.fts.indexData( + ctx.logger, + ctx.store + .transact( + QAttachment.allAttachmentMetaAndName(ctx.cfg.migration.indexAllChunk) + ) + .map(caa => + TextData + .attachment( + caa.item, + caa.id, + caa.collective, + caa.lang, + caa.name, + caa.content + ) + ) + ) + ), + FtsWork(ctx => + ctx.fts.indexData( + ctx.logger, + ctx.store + .transact(QItem.allNameAndNotes(ctx.cfg.migration.indexAllChunk * 5)) + .map(nn => TextData.item(nn.id, nn.collective, Option(nn.name), nn.notes)) + ) + ) + ) +} diff --git a/modules/joex/src/main/scala/docspell/joex/fts/IndexTask.scala b/modules/joex/src/main/scala/docspell/joex/fts/IndexTask.scala deleted file mode 100644 index 70e01631..00000000 --- a/modules/joex/src/main/scala/docspell/joex/fts/IndexTask.scala +++ /dev/null @@ -1,55 +0,0 @@ -package docspell.joex.fts - -import cats.effect._ -import cats.implicits._ -import docspell.common._ -import docspell.joex.Config -import docspell.joex.scheduler.Task -import docspell.ftsclient._ -import docspell.store.records.RJob -import docspell.joex.hk.HouseKeepingTask - -object IndexTask { - val taskName: Ident = Ident.unsafe("full-text-index") - val systemGroup = HouseKeepingTask.systemGroup - - def apply[F[_]: ConcurrentEffect]( - cfg: Config.FullTextSearch, - fts: FtsClient[F] - ): Task[F, Unit, Unit] = - Task - .log[F, Unit](_.info(s"Running full-text-index task now")) - .flatMap(_ => - Task(ctx => - Migration[F](cfg, ctx.store, fts, ctx.logger) - .run(migrationTasks[F]) - ) - ) - - def onCancel[F[_]: Sync]: Task[F, Unit, Unit] = - Task.log[F, Unit](_.warn("Cancelling full-text-index task")) - - def job[F[_]: Sync]: F[RJob] = - for { - id <- Ident.randomId[F] - now <- Timestamp.current[F] - } yield RJob.newJob( - id, - taskName, - systemGroup, - (), - "Create full-text index", - now, - systemGroup, - Priority.Low, - Some(taskName) - ) - - private val solrEngine = Ident.unsafe("solr") - def migrationTasks[F[_]: Effect]: List[Migration[F]] = - List( - Migration[F](1, solrEngine, "initialize", MigrationTask[F](ctx => ctx.fts.initialize)), - Migration[F](2, solrEngine, "Index all from database", MigrationTask.insertAll[F]) - ) - -} diff --git a/modules/joex/src/main/scala/docspell/joex/fts/MigrateCtx.scala b/modules/joex/src/main/scala/docspell/joex/fts/MigrateCtx.scala deleted file mode 100644 index 99d5d367..00000000 --- a/modules/joex/src/main/scala/docspell/joex/fts/MigrateCtx.scala +++ /dev/null @@ -1,13 +0,0 @@ -package docspell.joex.fts - -import docspell.common.Logger -import docspell.joex.Config -import docspell.store.Store -import docspell.ftsclient.FtsClient - -case class MigrateCtx[F[_]]( - cfg: Config.FullTextSearch, - store: Store[F], - fts: FtsClient[F], - logger: Logger[F] -) diff --git a/modules/joex/src/main/scala/docspell/joex/fts/Migration.scala b/modules/joex/src/main/scala/docspell/joex/fts/Migration.scala index 0c50eba8..cfc63940 100644 --- a/modules/joex/src/main/scala/docspell/joex/fts/Migration.scala +++ b/modules/joex/src/main/scala/docspell/joex/fts/Migration.scala @@ -14,22 +14,22 @@ case class Migration[F[_]]( version: Int, engine: Ident, description: String, - task: MigrationTask[F] + task: FtsWork[F] ) object Migration { def apply[F[_]: Effect]( cfg: Config.FullTextSearch, - store: Store[F], fts: FtsClient[F], + store: Store[F], logger: Logger[F] ): Kleisli[F, List[Migration[F]], Unit] = { - val ctx = MigrateCtx(cfg, store, fts, logger) + val ctx = FtsContext(cfg, store, fts, logger) Kleisli(migs => Traverse[List].sequence(migs.map(applySingle[F](ctx))).map(_ => ())) } - def applySingle[F[_]: Effect](ctx: MigrateCtx[F])(m: Migration[F]): F[Unit] = { + def applySingle[F[_]: Effect](ctx: FtsContext[F])(m: Migration[F]): F[Unit] = { val insertRecord: F[Option[RFtsMigration]] = for { rec <- RFtsMigration.create(m.version, m.engine, m.description) diff --git a/modules/joex/src/main/scala/docspell/joex/fts/MigrationTask.scala b/modules/joex/src/main/scala/docspell/joex/fts/MigrationTask.scala index a2a6a671..46f58b48 100644 --- a/modules/joex/src/main/scala/docspell/joex/fts/MigrationTask.scala +++ b/modules/joex/src/main/scala/docspell/joex/fts/MigrationTask.scala @@ -1,56 +1,56 @@ package docspell.joex.fts import cats.effect._ -import cats.data.{Kleisli, NonEmptyList} -import cats.{FlatMap, Semigroup} -import docspell.store.queries.{QAttachment, QItem} -import docspell.ftsclient.TextData +import cats.implicits._ +import docspell.common._ +import docspell.joex.Config +import docspell.joex.scheduler.Task +import docspell.ftsclient._ +import docspell.store.records.RJob +import docspell.joex.hk.HouseKeepingTask object MigrationTask { - def apply[F[_]](f: MigrateCtx[F] => F[Unit]): MigrationTask[F] = - Kleisli(f) + val taskName = Ident.unsafe("full-text-index") + val tracker = Ident.unsafe("full-text-index-tracker") + val systemGroup = HouseKeepingTask.systemGroup - def all[F[_]: FlatMap]( - m0: MigrationTask[F], - mn: MigrationTask[F]* - ): MigrationTask[F] = - NonEmptyList.of(m0, mn: _*).reduce(semigroup[F]) - - implicit def semigroup[F[_]: FlatMap]: Semigroup[MigrationTask[F]] = - Semigroup.instance((mt1, mt2) => mt1.flatMap(_ => mt2)) - - // some tasks - - def insertAll[F[_]: Effect]: MigrationTask[F] = - MigrationTask - .all( - MigrationTask(ctx => - ctx.fts.indexData( - ctx.logger, - ctx.store - .transact( - QAttachment.allAttachmentMetaAndName(ctx.cfg.migration.indexAllChunk) - ) - .map(caa => - TextData - .attachment( - caa.item, - caa.id, - caa.collective, - caa.lang, - caa.name, - caa.content - ) - ) - ) - ), - MigrationTask(ctx => - ctx.fts.indexData( - ctx.logger, - ctx.store - .transact(QItem.allNameAndNotes(ctx.cfg.migration.indexAllChunk * 5)) - .map(nn => TextData.item(nn.id, nn.collective, Option(nn.name), nn.notes)) - ) + def apply[F[_]: ConcurrentEffect]( + cfg: Config.FullTextSearch, + fts: FtsClient[F] + ): Task[F, Unit, Unit] = + Task + .log[F, Unit](_.info(s"Running full-text-index migrations now")) + .flatMap(_ => + Task(ctx => + Migration[F](cfg, fts, ctx.store, ctx.logger) + .run(migrationTasks[F]) ) ) + + def onCancel[F[_]: Sync]: Task[F, Unit, Unit] = + Task.log[F, Unit](_.warn("Cancelling full-text-index task")) + + def job[F[_]: Sync]: F[RJob] = + for { + id <- Ident.randomId[F] + now <- Timestamp.current[F] + } yield RJob.newJob( + id, + taskName, + systemGroup, + (), + "Create full-text index", + now, + systemGroup, + Priority.Low, + Some(tracker) + ) + + private val solrEngine = Ident.unsafe("solr") + def migrationTasks[F[_]: Effect]: List[Migration[F]] = + List( + Migration[F](1, solrEngine, "initialize", FtsWork.initialize[F]), + Migration[F](2, solrEngine, "Index all from database", FtsWork.insertAll[F]) + ) + } diff --git a/modules/joex/src/main/scala/docspell/joex/fts/package.scala b/modules/joex/src/main/scala/docspell/joex/fts/package.scala index 80b2ef26..784754ab 100644 --- a/modules/joex/src/main/scala/docspell/joex/fts/package.scala +++ b/modules/joex/src/main/scala/docspell/joex/fts/package.scala @@ -4,6 +4,6 @@ import cats.data.Kleisli package object fts { - type MigrationTask[F[_]] = Kleisli[F, MigrateCtx[F], Unit] + type FtsWork[F[_]] = Kleisli[F, FtsContext[F], Unit] }