From 5205ee0623ae0275ed14ccef8c7cf87e0739177f Mon Sep 17 00:00:00 2001
From: eikek <eike.kettner@posteo.de>
Date: Mon, 7 Jun 2021 16:46:43 +0200
Subject: [PATCH] Store solr migration state in a solr document

---
 .../scala/docspell/ftsclient/FtsClient.scala  | 29 ++++--
 .../scala/docspell/ftssolr/JsonCodec.scala    | 31 ++++++
 .../docspell/ftssolr/SolrFtsClient.scala      |  7 +-
 .../docspell/ftssolr/SolrMigration.scala      | 70 +++++++++++++
 .../scala/docspell/ftssolr/SolrQuery.scala    | 12 +++
 .../scala/docspell/ftssolr/SolrSetup.scala    | 98 +++++++++++--------
 .../scala/docspell/ftssolr/SolrUpdate.scala   |  7 ++
 .../scala/docspell/ftssolr/VersionDoc.scala   | 11 +++
 .../scala/docspell/joex/fts/FtsWork.scala     |  4 +-
 .../docspell/joex/fts/MigrationTask.scala     | 10 +-
 10 files changed, 224 insertions(+), 55 deletions(-)
 create mode 100644 modules/fts-solr/src/main/scala/docspell/ftssolr/SolrMigration.scala
 create mode 100644 modules/fts-solr/src/main/scala/docspell/ftssolr/VersionDoc.scala

diff --git a/modules/fts-client/src/main/scala/docspell/ftsclient/FtsClient.scala b/modules/fts-client/src/main/scala/docspell/ftsclient/FtsClient.scala
index dcf2d88f..2619eec6 100644
--- a/modules/fts-client/src/main/scala/docspell/ftsclient/FtsClient.scala
+++ b/modules/fts-client/src/main/scala/docspell/ftsclient/FtsClient.scala
@@ -17,12 +17,26 @@ import org.log4s.getLogger
   */
 trait FtsClient[F[_]] {
 
-  /** Initialization tasks. This is called exactly once at the very
-    * beginning when initializing the full-text index and then never
-    * again (except when re-indexing everything). It may be used to
-    * setup the database.
+  /** Initialization tasks. This can be used to setup the fulltext
+    * search engine. The implementation is expected to keep track of
+    * run migrations, so that running these is idempotent. For
+    * example, it may be run on each application start.
+    *
+    * Initialization may involve re-indexing all data, therefore it
+    * must run outside the scope of this client. The migration may
+    * include a task that applies any work and/or it can return a
+    * result indicating that after this task a re-index is necessary.
     */
-  def initialize: List[FtsMigration[F]]
+  def initialize: F[List[FtsMigration[F]]]
+
+  /** A list of initialization tasks that are meant to run when there
+    * was no setup at all or when re-creating the index.
+    *
+    * This is not run on startup, but only when required, for example
+    * when re-creating the entire index. These tasks don't need to
+    * preserve the data in the index.
+    */
+  def initializeNew: List[FtsMigration[F]]
 
   /** Run a full-text search. */
   def search(q: FtsQuery): F[FtsResult]
@@ -116,7 +130,10 @@ object FtsClient {
     new FtsClient[F] {
       private[this] val logger = Logger.log4s[F](getLogger)
 
-      def initialize: List[FtsMigration[F]] =
+      def initialize: F[List[FtsMigration[F]]] =
+        Sync[F].pure(Nil)
+
+      def initializeNew: List[FtsMigration[F]] =
         Nil
 
       def search(q: FtsQuery): F[FtsResult] =
diff --git a/modules/fts-solr/src/main/scala/docspell/ftssolr/JsonCodec.scala b/modules/fts-solr/src/main/scala/docspell/ftssolr/JsonCodec.scala
index 4c639668..a7b5d8a9 100644
--- a/modules/fts-solr/src/main/scala/docspell/ftssolr/JsonCodec.scala
+++ b/modules/fts-solr/src/main/scala/docspell/ftssolr/JsonCodec.scala
@@ -53,6 +53,37 @@ trait JsonCodec {
   ): Encoder[TextData] =
     Encoder(_.fold(ae.apply, ie.apply))
 
+  implicit def versionDocEncoder: Encoder[VersionDoc] =
+    new Encoder[VersionDoc] {
+      final def apply(d: VersionDoc): Json =
+        Json.fromFields(
+          List(
+            (VersionDoc.Fields.id.name, d.id.asJson),
+            (
+              VersionDoc.Fields.currentVersion.name,
+              Map("set" -> d.currentVersion.asJson).asJson
+            )
+          )
+        )
+    }
+
+  implicit def decoderVersionDoc: Decoder[VersionDoc] =
+    new Decoder[VersionDoc] {
+      final def apply(c: HCursor): Decoder.Result[VersionDoc] =
+        for {
+          id      <- c.get[String](VersionDoc.Fields.id.name)
+          version <- c.get[Int](VersionDoc.Fields.currentVersion.name)
+        } yield VersionDoc(id, version)
+    }
+
+  implicit def versionDocDecoder: Decoder[Option[VersionDoc]] =
+    new Decoder[Option[VersionDoc]] {
+      final def apply(c: HCursor): Decoder.Result[Option[VersionDoc]] =
+        c.downField("response")
+          .get[List[VersionDoc]]("docs")
+          .map(_.headOption)
+    }
+
   implicit def docIdResultsDecoder: Decoder[DocIdResult] =
     new Decoder[DocIdResult] {
       final def apply(c: HCursor): Decoder.Result[DocIdResult] =
diff --git a/modules/fts-solr/src/main/scala/docspell/ftssolr/SolrFtsClient.scala b/modules/fts-solr/src/main/scala/docspell/ftssolr/SolrFtsClient.scala
index f8f7fd3b..b1c7e90d 100644
--- a/modules/fts-solr/src/main/scala/docspell/ftssolr/SolrFtsClient.scala
+++ b/modules/fts-solr/src/main/scala/docspell/ftssolr/SolrFtsClient.scala
@@ -17,8 +17,11 @@ final class SolrFtsClient[F[_]: Effect](
     solrQuery: SolrQuery[F]
 ) extends FtsClient[F] {
 
-  def initialize: List[FtsMigration[F]] =
-    solrSetup.setupSchema
+  def initialize: F[List[FtsMigration[F]]] =
+    solrSetup.remainingSetup.map(_.map(_.value))
+
+  def initializeNew: List[FtsMigration[F]] =
+    solrSetup.setupSchema.map(_.value)
 
   def search(q: FtsQuery): F[FtsResult] =
     solrQuery.query(q)
diff --git a/modules/fts-solr/src/main/scala/docspell/ftssolr/SolrMigration.scala b/modules/fts-solr/src/main/scala/docspell/ftssolr/SolrMigration.scala
new file mode 100644
index 00000000..f1d02a66
--- /dev/null
+++ b/modules/fts-solr/src/main/scala/docspell/ftssolr/SolrMigration.scala
@@ -0,0 +1,70 @@
+package docspell.ftssolr
+import cats.implicits._
+import cats.{Applicative, Functor}
+
+import docspell.common._
+import docspell.ftsclient.FtsMigration
+
+final case class SolrMigration[F[_]](value: FtsMigration[F], dataChangeOnly: Boolean) {
+  def isSchemaChange: Boolean = !dataChangeOnly
+}
+
+object SolrMigration {
+  private val solrEngine = Ident.unsafe("solr")
+
+  def deleteData[F[_]: Functor](version: Int, solrUpdate: SolrUpdate[F]): SolrMigration[F] =
+    apply(version, "Delete all data", solrUpdate.delete("*:*", Option(0)))
+
+  def writeVersion[F[_]: Functor](
+      solrUpdate: SolrUpdate[F],
+      doc: VersionDoc
+  ): SolrMigration[F] =
+    apply(
+      Int.MaxValue,
+      s"Write current version: ${doc.currentVersion}",
+      solrUpdate.updateVersionDoc(doc)
+    )
+
+  def reIndexAll[F[_]: Applicative](
+      versionNumber: Int,
+      description: String
+  ): SolrMigration[F] =
+    SolrMigration(
+      FtsMigration(
+        versionNumber,
+        solrEngine,
+        description,
+        FtsMigration.Result.reIndexAll.pure[F]
+      ),
+      true
+    )
+
+  def indexAll[F[_]: Applicative](
+      versionNumber: Int,
+      description: String
+  ): SolrMigration[F] =
+    SolrMigration(
+      FtsMigration(
+        versionNumber,
+        solrEngine,
+        description,
+        FtsMigration.Result.indexAll.pure[F]
+      ),
+      true
+    )
+
+  def apply[F[_]: Functor](
+      version: Int,
+      description: String,
+      task: F[Unit]
+  ): SolrMigration[F] =
+    SolrMigration(
+      FtsMigration(
+        version,
+        solrEngine,
+        description,
+        task.map(_ => FtsMigration.Result.workDone)
+      ),
+      false
+    )
+}
diff --git a/modules/fts-solr/src/main/scala/docspell/ftssolr/SolrQuery.scala b/modules/fts-solr/src/main/scala/docspell/ftssolr/SolrQuery.scala
index ae286220..11c08954 100644
--- a/modules/fts-solr/src/main/scala/docspell/ftssolr/SolrQuery.scala
+++ b/modules/fts-solr/src/main/scala/docspell/ftssolr/SolrQuery.scala
@@ -17,6 +17,8 @@ trait SolrQuery[F[_]] {
   def query(q: QueryData): F[FtsResult]
 
   def query(q: FtsQuery): F[FtsResult]
+
+  def findVersionDoc(id: String): F[Option[VersionDoc]]
 }
 
 object SolrQuery {
@@ -54,6 +56,16 @@ object SolrQuery {
         )
         query(fq)
       }
+
+      def findVersionDoc(id: String): F[Option[VersionDoc]] = {
+        val fields = List(
+          Field.id,
+          Field("current_version_i")
+        )
+        val query = QueryData(s"id:$id", "", 1, 0, fields, Map.empty)
+        val req   = Method.POST(query.asJson, url)
+        client.expect[Option[VersionDoc]](req)
+      }
     }
   }
 }
diff --git a/modules/fts-solr/src/main/scala/docspell/ftssolr/SolrSetup.scala b/modules/fts-solr/src/main/scala/docspell/ftssolr/SolrSetup.scala
index 95bcb3a7..2ed54e68 100644
--- a/modules/fts-solr/src/main/scala/docspell/ftssolr/SolrSetup.scala
+++ b/modules/fts-solr/src/main/scala/docspell/ftssolr/SolrSetup.scala
@@ -4,7 +4,6 @@ import cats.effect._
 import cats.implicits._
 
 import docspell.common._
-import docspell.ftsclient.FtsMigration
 
 import _root_.io.circe._
 import _root_.io.circe.generic.semiauto._
@@ -16,12 +15,14 @@ import org.http4s.client.dsl.Http4sClientDsl
 
 trait SolrSetup[F[_]] {
 
-  def setupSchema: List[FtsMigration[F]]
+  def setupSchema: List[SolrMigration[F]]
+
+  def remainingSetup: F[List[SolrMigration[F]]]
 
 }
 
 object SolrSetup {
-  private val solrEngine = Ident.unsafe("solr")
+  private val versionDocId = "6d8f09f4-8d7e-4bc9-98b8-7c89223b36dd"
 
   def apply[F[_]: ConcurrentEffect](cfg: SolrConfig, client: Client[F]): SolrSetup[F] = {
     val dsl = new Http4sClientDsl[F] {}
@@ -32,62 +33,75 @@ object SolrSetup {
       val url = (Uri.unsafeFromString(cfg.url.asString) / "schema")
         .withQueryParam("commitWithin", cfg.commitWithin.toString)
 
-      def setupSchema: List[FtsMigration[F]] =
+      def remainingSetup: F[List[SolrMigration[F]]] =
+        for {
+          current <- SolrQuery(cfg, client).findVersionDoc(versionDocId)
+          migs = current match {
+            case None => setupSchema
+            case Some(ver) =>
+              val verDoc =
+                VersionDoc(versionDocId, allMigrations.map(_.value.version).max)
+              val solrUp = SolrUpdate(cfg, client)
+              val remain = allMigrations.filter(v => v.value.version > ver.currentVersion)
+              if (remain.isEmpty) remain
+              else remain :+ SolrMigration.writeVersion(solrUp, verDoc)
+          }
+        } yield migs
+
+      def setupSchema: List[SolrMigration[F]] = {
+        val verDoc = VersionDoc(versionDocId, allMigrations.map(_.value.version).max)
+        val solrUp = SolrUpdate(cfg, client)
+        val writeVersion = SolrMigration.writeVersion(solrUp, verDoc)
+        val deleteAll = SolrMigration.deleteData(0, solrUp)
+        val indexAll = SolrMigration.indexAll[F](Int.MaxValue, "Index all data")
+
+        deleteAll :: (allMigrations.filter(_.isSchemaChange) ::: List(indexAll, writeVersion))
+      }
+
+      private def allMigrations: List[SolrMigration[F]] =
         List(
-          FtsMigration[F](
+          SolrMigration[F](
             1,
-            solrEngine,
             "Initialize",
-            setupCoreSchema.map(_ => FtsMigration.Result.workDone)
+            setupCoreSchema
           ),
-          FtsMigration[F](
-            3,
-            solrEngine,
+          SolrMigration[F](
+            2,
             "Add folder field",
-            addFolderField.map(_ => FtsMigration.Result.workDone)
+            addFolderField
           ),
-          FtsMigration[F](
+          SolrMigration.indexAll(3, "Index all from database after adding folder field"),
+          SolrMigration[F](
             4,
-            solrEngine,
-            "Index all from database",
-            FtsMigration.Result.indexAll.pure[F]
-          ),
-          FtsMigration[F](
-            5,
-            solrEngine,
             "Add content_fr field",
-            addContentField(Language.French).map(_ => FtsMigration.Result.workDone)
+            addContentField(Language.French)
           ),
-          FtsMigration[F](
+          SolrMigration
+            .indexAll(5, "Index all from database after adding french content field"),
+          SolrMigration[F](
             6,
-            solrEngine,
-            "Index all from database",
-            FtsMigration.Result.indexAll.pure[F]
-          ),
-          FtsMigration[F](
-            7,
-            solrEngine,
             "Add content_it field",
-            addContentField(Language.Italian).map(_ => FtsMigration.Result.reIndexAll)
+            addContentField(Language.Italian)
           ),
-          FtsMigration[F](
+          SolrMigration.reIndexAll(7, "Re-Index after adding italian content field"),
+          SolrMigration[F](
             8,
-            solrEngine,
             "Add content_es field",
-            addContentField(Language.Spanish).map(_ => FtsMigration.Result.reIndexAll)
+            addContentField(Language.Spanish)
           ),
-          FtsMigration[F](
-            9,
-            solrEngine,
-            "Add more content fields",
-            addMoreContentFields.map(_ => FtsMigration.Result.reIndexAll)
-          ),
-          FtsMigration[F](
+          SolrMigration.reIndexAll(9, "Re-Index after adding spanish content field"),
+          SolrMigration[F](
             10,
-            solrEngine,
+            "Add more content fields",
+            addMoreContentFields
+          ),
+          SolrMigration.reIndexAll(11, "Re-Index after adding more content fields"),
+          SolrMigration[F](
+            12,
             "Add latvian content field",
-            addContentField(Language.Latvian).map(_ => FtsMigration.Result.reIndexAll)
-          )
+            addContentField(Language.Latvian)
+          ),
+          SolrMigration.reIndexAll(13, "Re-Index after adding latvian content field")
         )
 
       def addFolderField: F[Unit] =
diff --git a/modules/fts-solr/src/main/scala/docspell/ftssolr/SolrUpdate.scala b/modules/fts-solr/src/main/scala/docspell/ftssolr/SolrUpdate.scala
index b5b5e642..7fa7db41 100644
--- a/modules/fts-solr/src/main/scala/docspell/ftssolr/SolrUpdate.scala
+++ b/modules/fts-solr/src/main/scala/docspell/ftssolr/SolrUpdate.scala
@@ -23,6 +23,8 @@ trait SolrUpdate[F[_]] {
 
   def updateFolder(itemId: Ident, collective: Ident, folder: Option[Ident]): F[Unit]
 
+  def updateVersionDoc(doc: VersionDoc): F[Unit]
+
   def delete(q: String, commitWithin: Option[Int]): F[Unit]
 }
 
@@ -48,6 +50,11 @@ object SolrUpdate {
         client.expect[Unit](req)
       }
 
+      def updateVersionDoc(doc: VersionDoc): F[Unit] = {
+        val req = Method.POST(List(doc).asJson, url)
+        client.expect[Unit](req)
+      }
+
       def updateFolder(
           itemId: Ident,
           collective: Ident,
diff --git a/modules/fts-solr/src/main/scala/docspell/ftssolr/VersionDoc.scala b/modules/fts-solr/src/main/scala/docspell/ftssolr/VersionDoc.scala
new file mode 100644
index 00000000..4d733340
--- /dev/null
+++ b/modules/fts-solr/src/main/scala/docspell/ftssolr/VersionDoc.scala
@@ -0,0 +1,11 @@
+package docspell.ftssolr
+
+final case class VersionDoc(id: String, currentVersion: Int)
+
+object VersionDoc {
+
+  object Fields {
+    val id             = Field("id")
+    val currentVersion = Field("current_version_i")
+  }
+}
diff --git a/modules/joex/src/main/scala/docspell/joex/fts/FtsWork.scala b/modules/joex/src/main/scala/docspell/joex/fts/FtsWork.scala
index 7ddfa99d..d50b0acc 100644
--- a/modules/joex/src/main/scala/docspell/joex/fts/FtsWork.scala
+++ b/modules/joex/src/main/scala/docspell/joex/fts/FtsWork.scala
@@ -18,7 +18,9 @@ object FtsWork {
   def reInitializeTasks[F[_]: Monad]: FtsWork[F] =
     FtsWork { ctx =>
       val migrations =
-        ctx.fts.initialize.map(fm => fm.changeResult(_ => FtsMigration.Result.workDone))
+        ctx.fts.initializeNew.map(fm =>
+          fm.changeResult(_ => FtsMigration.Result.workDone)
+        )
 
       NonEmptyList.fromList(migrations) match {
         case Some(nel) =>
diff --git a/modules/joex/src/main/scala/docspell/joex/fts/MigrationTask.scala b/modules/joex/src/main/scala/docspell/joex/fts/MigrationTask.scala
index c887b1a1..d8c4e4db 100644
--- a/modules/joex/src/main/scala/docspell/joex/fts/MigrationTask.scala
+++ b/modules/joex/src/main/scala/docspell/joex/fts/MigrationTask.scala
@@ -20,8 +20,10 @@ object MigrationTask {
       .log[F, Unit](_.info(s"Running full-text-index migrations now"))
       .flatMap(_ =>
         Task(ctx =>
-          Migration[F](cfg, fts, ctx.store, ctx.logger)
-            .run(migrationTasks[F](fts))
+          for {
+            migs <- migrationTasks[F](fts)
+            res  <- Migration[F](cfg, fts, ctx.store, ctx.logger).run(migs)
+          } yield res
         )
       )
 
@@ -44,7 +46,7 @@ object MigrationTask {
       Some(DocspellSystem.migrationTaskTracker)
     )
 
-  def migrationTasks[F[_]: Effect](fts: FtsClient[F]): List[Migration[F]] =
-    fts.initialize.map(fm => Migration.from(fm))
+  def migrationTasks[F[_]: Effect](fts: FtsClient[F]): F[List[Migration[F]]] =
+    fts.initialize.map(_.map(fm => Migration.from(fm)))
 
 }