From 2a0bf240887f724e0975c885f84f70ccabe02335 Mon Sep 17 00:00:00 2001 From: Eike Kettner Date: Fri, 19 Jun 2020 20:49:59 +0200 Subject: [PATCH] Set up Solr schema and index all data using a system task The task runs on application start. It sets the schema using Solr's Schema API and then indexes all data in the database. Each step is recorded so that it is not executed again on subsequent starts. --- .../main/scala/docspell/ftssolr/Fields.scala | 19 ++++ .../scala/docspell/ftssolr/JsonCodec.scala | 26 ++--- .../scala/docspell/ftssolr/SolrConfig.scala | 2 +- .../docspell/ftssolr/SolrFtsClient.scala | 7 +- .../scala/docspell/ftssolr/SolrSetup.scala | 102 ++++++++++++++++++ .../scala/docspell/ftssolr/SolrUpdate.scala | 2 +- .../joex/src/main/resources/reference.conf | 1 + .../scala/docspell/joex/JoexAppImpl.scala | 2 +- .../scala/docspell/joex/fts/IndexTask.scala | 2 +- .../scala/docspell/joex/fts/Migration.scala | 13 +-- .../src/main/resources/reference.conf | 1 + .../store/queries/QPeriodicTask.scala | 11 -- .../scala/docspell/store/queue/JobQueue.scala | 24 +++++ .../store/queue/PeriodicTaskStore.scala | 2 +- .../scala/docspell/store/records/RJob.scala | 8 ++ 15 files changed, 185 insertions(+), 37 deletions(-) create mode 100644 modules/fts-solr/src/main/scala/docspell/ftssolr/Fields.scala create mode 100644 modules/fts-solr/src/main/scala/docspell/ftssolr/SolrSetup.scala diff --git a/modules/fts-solr/src/main/scala/docspell/ftssolr/Fields.scala b/modules/fts-solr/src/main/scala/docspell/ftssolr/Fields.scala new file mode 100644 index 00000000..f9ecc354 --- /dev/null +++ b/modules/fts-solr/src/main/scala/docspell/ftssolr/Fields.scala @@ -0,0 +1,19 @@ +package docspell.ftssolr + +object Fields { + val discriminator = "discriminator" + val id = "id" + val itemId = "itemId" + val collectiveId = "collectiveId" + + object Attachment { + val attachmentId = "attachmentId" + val attachmentName = "attachmentName" + val content = "content" + } + + object Item { + val 
itemName = "itemName" + val itemNotes = "itemNotes" + } +} diff --git a/modules/fts-solr/src/main/scala/docspell/ftssolr/JsonCodec.scala b/modules/fts-solr/src/main/scala/docspell/ftssolr/JsonCodec.scala index d240e8e1..9ad35645 100644 --- a/modules/fts-solr/src/main/scala/docspell/ftssolr/JsonCodec.scala +++ b/modules/fts-solr/src/main/scala/docspell/ftssolr/JsonCodec.scala @@ -3,30 +3,32 @@ package docspell.ftssolr import docspell.common._ import docspell.ftsclient._ import io.circe._ +import Fields.{Item, Attachment} trait JsonCodec { implicit def attachmentEncoder: Encoder[TextData.Attachment] = new Encoder[TextData.Attachment] { final def apply(td: TextData.Attachment): Json = Json.obj( - ("id", Ident.encodeIdent(td.id)), - ("item", Ident.encodeIdent(td.item)), - ("collective", Ident.encodeIdent(td.collective)), - ("attachmentName", Json.fromString(td.name.getOrElse(""))), - ("content", Json.fromString(td.text.getOrElse(""))), - ("discriminator", Json.fromString("attachment")) + (Fields.id, Ident.encodeIdent(td.id)), + (Fields.itemId, Ident.encodeIdent(td.item)), + (Fields.collectiveId, Ident.encodeIdent(td.collective)), + (Attachment.attachmentId, Ident.encodeIdent(td.attachId)), + (Attachment.attachmentName, Json.fromString(td.name.getOrElse(""))), + (Attachment.content, Json.fromString(td.text.getOrElse(""))), + (Fields.discriminator, Json.fromString("attachment")) ) } implicit def itemEncoder: Encoder[TextData.Item] = new Encoder[TextData.Item] { final def apply(td: TextData.Item): Json = Json.obj( - ("id", Ident.encodeIdent(td.id)), - ("item", Ident.encodeIdent(td.item)), - ("collective", Ident.encodeIdent(td.collective)), - ("itemName", Json.fromString(td.name.getOrElse(""))), - ("itemNotes", Json.fromString(td.notes.getOrElse(""))), - ("discriminator", Json.fromString("item")) + (Fields.id, Ident.encodeIdent(td.id)), + (Fields.itemId, Ident.encodeIdent(td.item)), + (Fields.collectiveId, Ident.encodeIdent(td.collective)), + (Item.itemName, 
Json.fromString(td.name.getOrElse(""))), + (Item.itemNotes, Json.fromString(td.notes.getOrElse(""))), + (Fields.discriminator, Json.fromString("item")) ) } diff --git a/modules/fts-solr/src/main/scala/docspell/ftssolr/SolrConfig.scala b/modules/fts-solr/src/main/scala/docspell/ftssolr/SolrConfig.scala index 41fb5ad2..89ecb1ea 100644 --- a/modules/fts-solr/src/main/scala/docspell/ftssolr/SolrConfig.scala +++ b/modules/fts-solr/src/main/scala/docspell/ftssolr/SolrConfig.scala @@ -2,6 +2,6 @@ package docspell.ftssolr import docspell.common._ -final case class SolrConfig(url: LenientUri) +final case class SolrConfig(url: LenientUri, commitWithin: Int) object SolrConfig {} diff --git a/modules/fts-solr/src/main/scala/docspell/ftssolr/SolrFtsClient.scala b/modules/fts-solr/src/main/scala/docspell/ftssolr/SolrFtsClient.scala index f6281f1f..14eaa8d2 100644 --- a/modules/fts-solr/src/main/scala/docspell/ftssolr/SolrFtsClient.scala +++ b/modules/fts-solr/src/main/scala/docspell/ftssolr/SolrFtsClient.scala @@ -11,11 +11,12 @@ import docspell.ftsclient._ import docspell.ftsclient.FtsBasicResult._ final class SolrFtsClient[F[_]: Effect]( - solrUpdate: SolrUpdate[F] + solrUpdate: SolrUpdate[F], + solrSetup: SolrSetup[F] ) extends FtsClient[F] { def initialize: F[Unit] = - ().pure[F] + solrSetup.setupSchema def searchBasic(q: FtsQuery): Stream[F, FtsBasicResult] = Stream.emits( @@ -52,7 +53,7 @@ object SolrFtsClient { httpClient: Client[F] ): Resource[F, FtsClient[F]] = Resource.pure[F, FtsClient[F]]( - new SolrFtsClient(SolrUpdate(cfg, httpClient)) + new SolrFtsClient(SolrUpdate(cfg, httpClient), SolrSetup(cfg, httpClient)) ) } diff --git a/modules/fts-solr/src/main/scala/docspell/ftssolr/SolrSetup.scala b/modules/fts-solr/src/main/scala/docspell/ftssolr/SolrSetup.scala new file mode 100644 index 00000000..616dfaff --- /dev/null +++ b/modules/fts-solr/src/main/scala/docspell/ftssolr/SolrSetup.scala @@ -0,0 +1,102 @@ +package docspell.ftssolr + +import cats.effect._ +import 
org.http4s._ +import cats.implicits._ +import org.http4s.client.Client +import org.http4s.circe._ +import org.http4s.client.dsl.Http4sClientDsl +import org.log4s.getLogger +import _root_.io.circe.syntax._ +import _root_.io.circe._ +import _root_.io.circe.generic.semiauto._ + +import Fields.{Attachment, Item} + +trait SolrSetup[F[_]] { + + def setupSchema: F[Unit] + +} + +object SolrSetup { + private[this] val logger = getLogger + + def apply[F[_]: ConcurrentEffect](cfg: SolrConfig, client: Client[F]): SolrSetup[F] = { + val dsl = new Http4sClientDsl[F] {} + import dsl._ + + new SolrSetup[F] { + val url = (Uri.unsafeFromString(cfg.url.asString) / "schema") + .withQueryParam("commitWithin", cfg.commitWithin.toString) + + def setupSchema: F[Unit] = { + val cmds0 = + List( + Fields.id, + Fields.itemId, + Fields.collectiveId, + Fields.discriminator, + Attachment.attachmentId + ) + .traverse(addStringField) + val cmds1 = List( + Attachment.attachmentName, + Attachment.content, + Item.itemName, + Item.itemNotes + ) + .traverse(addTextField) + + cmds0 *> cmds1 *> ().pure[F] + } + + private def run(cmd: Json): F[Unit] = { + val req = Method.POST(cmd, url) + logger.debug(s"Running request $req: ${cmd.noSpaces}") + client.expect[String](req).map(r => logger.debug(s"Response: $r")) + } + + private def addStringField(name: String): F[Unit] = + run(DeleteField.command(DeleteField(name))).attempt *> + run(AddField.command(AddField.string(name))) + + private def addTextField(name: String): F[Unit] = + run(DeleteField.command(DeleteField(name))).attempt *> + run(AddField.command(AddField.text(name))) + + } + } + + // Schema Commands + + case class AddField( + name: String, + `type`: String, + stored: Boolean, + indexed: Boolean, + multiValued: Boolean + ) + object AddField { + implicit val encoder: Encoder[AddField] = + deriveEncoder[AddField] + + def command(body: AddField): Json = + Map("add-field" -> body.asJson).asJson + + def string(name: String): AddField = + AddField(name, 
"string", true, true, false) + + def text(name: String): AddField = + AddField(name, "text_general", true, true, false) + } + + case class DeleteField(name: String) + object DeleteField { + implicit val encoder: Encoder[DeleteField] = + deriveEncoder[DeleteField] + + def command(body: DeleteField): Json = + Map("delete-field" -> body.asJson).asJson + } +} diff --git a/modules/fts-solr/src/main/scala/docspell/ftssolr/SolrUpdate.scala b/modules/fts-solr/src/main/scala/docspell/ftssolr/SolrUpdate.scala index 7a73d45c..3b772cd2 100644 --- a/modules/fts-solr/src/main/scala/docspell/ftssolr/SolrUpdate.scala +++ b/modules/fts-solr/src/main/scala/docspell/ftssolr/SolrUpdate.scala @@ -29,7 +29,7 @@ object SolrUpdate { new SolrUpdate[F] { val url = (Uri.unsafeFromString(cfg.url.asString) / "update") - .withQueryParam("commitWithin", "1000") + .withQueryParam("commitWithin", cfg.commitWithin.toString) .withQueryParam("overwrite", "true") .withQueryParam("wt", "json") diff --git a/modules/joex/src/main/resources/reference.conf b/modules/joex/src/main/resources/reference.conf index efb9c5fc..3bcfd5e6 100644 --- a/modules/joex/src/main/resources/reference.conf +++ b/modules/joex/src/main/resources/reference.conf @@ -373,6 +373,7 @@ docspell.joex { } solr = { url = "http://localhost:8983/solr/docspell_core" + commit-within = 1000 } } } \ No newline at end of file diff --git a/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala b/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala index 05e5e940..c8a32fbf 100644 --- a/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala +++ b/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala @@ -56,7 +56,7 @@ final class JoexAppImpl[F[_]: ConcurrentEffect: ContextShift: Timer]( private def scheduleBackgroundTasks: F[Unit] = HouseKeepingTask .periodicTask[F](cfg.houseKeeping.schedule) - .flatMap(pstore.insert) *> IndexTask.job.flatMap(queue.insert) + .flatMap(pstore.insert) *> 
IndexTask.job.flatMap(queue.insertIfNew) } object JoexAppImpl { diff --git a/modules/joex/src/main/scala/docspell/joex/fts/IndexTask.scala b/modules/joex/src/main/scala/docspell/joex/fts/IndexTask.scala index 5be969d9..c7298943 100644 --- a/modules/joex/src/main/scala/docspell/joex/fts/IndexTask.scala +++ b/modules/joex/src/main/scala/docspell/joex/fts/IndexTask.scala @@ -42,7 +42,7 @@ object IndexTask { now, systemGroup, Priority.Low, - None + Some(taskName) ) } diff --git a/modules/joex/src/main/scala/docspell/joex/fts/Migration.scala b/modules/joex/src/main/scala/docspell/joex/fts/Migration.scala index 546198ad..88af2ef4 100644 --- a/modules/joex/src/main/scala/docspell/joex/fts/Migration.scala +++ b/modules/joex/src/main/scala/docspell/joex/fts/Migration.scala @@ -57,22 +57,23 @@ object Migration { _ <- OptionT.liftF(ctx.logger.info(s"Apply ${m.version}/${m.description}")) rec <- OptionT(insertRecord) res <- OptionT.liftF(m.task.run(ctx).attempt) - _ <- OptionT.liftF(res match { + ret <- OptionT.liftF(res match { case Right(()) => ().pure[F] case Left(ex) => ctx.logger.error(ex)( s"Applying index migration ${m.version}/${m.description} failed" ) *> - ctx.store.transact(RFtsMigration.deleteById(rec.id)) *> Effect[F].raiseError( - ex - ) + ctx.store.transact(RFtsMigration.deleteById(rec.id)) *> Effect[F] + .raiseError[Unit]( + ex + ) }) - } yield ()).getOrElseF( + } yield ret).getOrElseF( ctx.logger.info(s"Migration ${m.version}/${m.description} already applied.") ) } - def migrationTasks[F[_]]: List[Migration[F]] = + def migrationTasks[F[_]: Effect]: List[Migration[F]] = List( Migration[F](1, solrEngine, "initialize", Kleisli(ctx => ctx.fts.initialize)), Migration[F]( diff --git a/modules/restserver/src/main/resources/reference.conf b/modules/restserver/src/main/resources/reference.conf index c3a09ae2..3ac7e9e5 100644 --- a/modules/restserver/src/main/resources/reference.conf +++ b/modules/restserver/src/main/resources/reference.conf @@ -153,6 +153,7 @@ 
docspell.server { enabled = true solr = { url = "http://localhost:8983/solr/docspell_core" + commit-within = 1000 } } } diff --git a/modules/store/src/main/scala/docspell/store/queries/QPeriodicTask.scala b/modules/store/src/main/scala/docspell/store/queries/QPeriodicTask.scala index e52c27cc..21624f7a 100644 --- a/modules/store/src/main/scala/docspell/store/queries/QPeriodicTask.scala +++ b/modules/store/src/main/scala/docspell/store/queries/QPeriodicTask.scala @@ -51,15 +51,4 @@ object QPeriodicTask { selectSimple(RPeriodicTask.Columns.all, RPeriodicTask.table, where) ++ order sql.query[RPeriodicTask].streamWithChunkSize(2).take(1).compile.last } - - def findNonFinal(pid: Ident): ConnectionIO[Option[RJob]] = - selectSimple( - RJob.Columns.all, - RJob.table, - and( - RJob.Columns.tracker.is(pid), - RJob.Columns.state.isOneOf(JobState.all.diff(JobState.done).toSeq) - ) - ).query[RJob].option - } diff --git a/modules/store/src/main/scala/docspell/store/queue/JobQueue.scala b/modules/store/src/main/scala/docspell/store/queue/JobQueue.scala index e77fa755..2fa60d91 100644 --- a/modules/store/src/main/scala/docspell/store/queue/JobQueue.scala +++ b/modules/store/src/main/scala/docspell/store/queue/JobQueue.scala @@ -11,8 +11,19 @@ import org.log4s._ trait JobQueue[F[_]] { + /** Inserts the job into the queue to get picked up as soon as + * possible. The job must have a new unique id. + */ def insert(job: RJob): F[Unit] + /** Inserts the job into the queue only, if there is no job with the + * same tracker-id running at the moment. The job id must be a new + * unique id. + * + * If the job has no tracker defined, it is simply inserted. 
+ */ + def insertIfNew(job: RJob): F[Unit] + def insertAll(jobs: Seq[RJob]): F[Unit] def nextJob( @@ -46,6 +57,19 @@ object JobQueue { else ().pure[F] } + def insertIfNew(job: RJob): F[Unit] = + for { + rj <- job.tracker match { + case Some(tid) => + store.transact(RJob.findNonFinalByTracker(tid)) + case None => + None.pure[F] + } + ret <- + if (rj.isDefined) ().pure[F] + else insert(job) + } yield ret + def insertAll(jobs: Seq[RJob]): F[Unit] = jobs.toList .traverse(j => insert(j).attempt) diff --git a/modules/store/src/main/scala/docspell/store/queue/PeriodicTaskStore.scala b/modules/store/src/main/scala/docspell/store/queue/PeriodicTaskStore.scala index c94458bb..16350602 100644 --- a/modules/store/src/main/scala/docspell/store/queue/PeriodicTaskStore.scala +++ b/modules/store/src/main/scala/docspell/store/queue/PeriodicTaskStore.scala @@ -100,7 +100,7 @@ object PeriodicTaskStore { } def findNonFinalJob(pjobId: Ident): F[Option[RJob]] = - store.transact(QPeriodicTask.findNonFinal(pjobId)) + store.transact(RJob.findNonFinalByTracker(pjobId)) def insert(task: RPeriodicTask): F[Unit] = { val update = store.transact(RPeriodicTask.update(task)) diff --git a/modules/store/src/main/scala/docspell/store/records/RJob.scala b/modules/store/src/main/scala/docspell/store/records/RJob.scala index 9a286743..9f607c5c 100644 --- a/modules/store/src/main/scala/docspell/store/records/RJob.scala +++ b/modules/store/src/main/scala/docspell/store/records/RJob.scala @@ -255,4 +255,12 @@ object RJob { .map(_ => 1) .compile .foldMonoid + + def findNonFinalByTracker(trackerId: Ident): ConnectionIO[Option[RJob]] = + selectSimple( + all, + table, + and(tracker.is(trackerId), state.isOneOf(JobState.all.diff(JobState.done).toSeq)) + ).query[RJob].option + }