mirror of
https://github.com/TheAnachronism/docspell.git
synced 2025-04-04 10:29:34 +00:00
Store solr migration state in a solr document
This commit is contained in:
parent
c879fdf460
commit
5205ee0623
@ -17,12 +17,26 @@ import org.log4s.getLogger
|
||||
*/
|
||||
trait FtsClient[F[_]] {
|
||||
|
||||
/** Initialization tasks. This is called exactly once at the very
|
||||
* beginning when initializing the full-text index and then never
|
||||
* again (except when re-indexing everything). It may be used to
|
||||
* setup the database.
|
||||
/** Initialization tasks. This can be used to setup the fulltext
|
||||
* search engine. The implementation is expected to keep track of
|
||||
* run migrations, so that running these is idempotent. For
|
||||
* example, it may be run on each application start.
|
||||
*
|
||||
* Initialization may involve re-indexing all data, therefore it
|
||||
* must run outside the scope of this client. The migration may
|
||||
* include a task that applies any work and/or it can return a
|
||||
* result indicating that after this task a re-index is necessary.
|
||||
*/
|
||||
def initialize: List[FtsMigration[F]]
|
||||
def initialize: F[List[FtsMigration[F]]]
|
||||
|
||||
/** A list of initialization tasks that are meant to run when there
|
||||
* was no setup at all or when re-creating the index.
|
||||
*
|
||||
* This is not run on startup, but only when required, for example
|
||||
* when re-creating the entire index. These tasks don't need to
|
||||
* preserve the data in the index.
|
||||
*/
|
||||
def initializeNew: List[FtsMigration[F]]
|
||||
|
||||
/** Run a full-text search. */
|
||||
def search(q: FtsQuery): F[FtsResult]
|
||||
@ -116,7 +130,10 @@ object FtsClient {
|
||||
new FtsClient[F] {
|
||||
private[this] val logger = Logger.log4s[F](getLogger)
|
||||
|
||||
def initialize: List[FtsMigration[F]] =
|
||||
def initialize: F[List[FtsMigration[F]]] =
|
||||
Sync[F].pure(Nil)
|
||||
|
||||
def initializeNew: List[FtsMigration[F]] =
|
||||
Nil
|
||||
|
||||
def search(q: FtsQuery): F[FtsResult] =
|
||||
|
@ -53,6 +53,37 @@ trait JsonCodec {
|
||||
): Encoder[TextData] =
|
||||
Encoder(_.fold(ae.apply, ie.apply))
|
||||
|
||||
implicit def versionDocEncoder: Encoder[VersionDoc] =
|
||||
new Encoder[VersionDoc] {
|
||||
final def apply(d: VersionDoc): Json =
|
||||
Json.fromFields(
|
||||
List(
|
||||
(VersionDoc.Fields.id.name, d.id.asJson),
|
||||
(
|
||||
VersionDoc.Fields.currentVersion.name,
|
||||
Map("set" -> d.currentVersion.asJson).asJson
|
||||
)
|
||||
)
|
||||
)
|
||||
}
|
||||
|
||||
implicit def decoderVersionDoc: Decoder[VersionDoc] =
|
||||
new Decoder[VersionDoc] {
|
||||
final def apply(c: HCursor): Decoder.Result[VersionDoc] =
|
||||
for {
|
||||
id <- c.get[String](VersionDoc.Fields.id.name)
|
||||
version <- c.get[Int](VersionDoc.Fields.currentVersion.name)
|
||||
} yield VersionDoc(id, version)
|
||||
}
|
||||
|
||||
implicit def versionDocDecoder: Decoder[Option[VersionDoc]] =
|
||||
new Decoder[Option[VersionDoc]] {
|
||||
final def apply(c: HCursor): Decoder.Result[Option[VersionDoc]] =
|
||||
c.downField("response")
|
||||
.get[List[VersionDoc]]("docs")
|
||||
.map(_.headOption)
|
||||
}
|
||||
|
||||
implicit def docIdResultsDecoder: Decoder[DocIdResult] =
|
||||
new Decoder[DocIdResult] {
|
||||
final def apply(c: HCursor): Decoder.Result[DocIdResult] =
|
||||
|
@ -17,8 +17,11 @@ final class SolrFtsClient[F[_]: Effect](
|
||||
solrQuery: SolrQuery[F]
|
||||
) extends FtsClient[F] {
|
||||
|
||||
def initialize: List[FtsMigration[F]] =
|
||||
solrSetup.setupSchema
|
||||
def initialize: F[List[FtsMigration[F]]] =
|
||||
solrSetup.remainingSetup.map(_.map(_.value))
|
||||
|
||||
def initializeNew: List[FtsMigration[F]] =
|
||||
solrSetup.setupSchema.map(_.value)
|
||||
|
||||
def search(q: FtsQuery): F[FtsResult] =
|
||||
solrQuery.query(q)
|
||||
|
@ -0,0 +1,70 @@
|
||||
package docspell.ftssolr
|
||||
import cats.implicits._
|
||||
import cats.{Applicative, Functor}
|
||||
|
||||
import docspell.common._
|
||||
import docspell.ftsclient.FtsMigration
|
||||
|
||||
final case class SolrMigration[F[_]](value: FtsMigration[F], dataChangeOnly: Boolean) {
|
||||
def isSchemaChange: Boolean = !dataChangeOnly
|
||||
}
|
||||
|
||||
object SolrMigration {
|
||||
private val solrEngine = Ident.unsafe("solr")
|
||||
|
||||
def deleteData[F[_]: Functor](version: Int, solrUpdate: SolrUpdate[F]): SolrMigration[F] =
|
||||
apply(version, "Delete all data", solrUpdate.delete("*:*", Option(0)))
|
||||
|
||||
def writeVersion[F[_]: Functor](
|
||||
solrUpdate: SolrUpdate[F],
|
||||
doc: VersionDoc
|
||||
): SolrMigration[F] =
|
||||
apply(
|
||||
Int.MaxValue,
|
||||
s"Write current version: ${doc.currentVersion}",
|
||||
solrUpdate.updateVersionDoc(doc)
|
||||
)
|
||||
|
||||
def reIndexAll[F[_]: Applicative](
|
||||
versionNumber: Int,
|
||||
description: String
|
||||
): SolrMigration[F] =
|
||||
SolrMigration(
|
||||
FtsMigration(
|
||||
versionNumber,
|
||||
solrEngine,
|
||||
description,
|
||||
FtsMigration.Result.reIndexAll.pure[F]
|
||||
),
|
||||
true
|
||||
)
|
||||
|
||||
def indexAll[F[_]: Applicative](
|
||||
versionNumber: Int,
|
||||
description: String
|
||||
): SolrMigration[F] =
|
||||
SolrMigration(
|
||||
FtsMigration(
|
||||
versionNumber,
|
||||
solrEngine,
|
||||
description,
|
||||
FtsMigration.Result.indexAll.pure[F]
|
||||
),
|
||||
true
|
||||
)
|
||||
|
||||
def apply[F[_]: Functor](
|
||||
version: Int,
|
||||
description: String,
|
||||
task: F[Unit]
|
||||
): SolrMigration[F] =
|
||||
SolrMigration(
|
||||
FtsMigration(
|
||||
version,
|
||||
solrEngine,
|
||||
description,
|
||||
task.map(_ => FtsMigration.Result.workDone)
|
||||
),
|
||||
false
|
||||
)
|
||||
}
|
@ -17,6 +17,8 @@ trait SolrQuery[F[_]] {
|
||||
def query(q: QueryData): F[FtsResult]
|
||||
|
||||
def query(q: FtsQuery): F[FtsResult]
|
||||
|
||||
def findVersionDoc(id: String): F[Option[VersionDoc]]
|
||||
}
|
||||
|
||||
object SolrQuery {
|
||||
@ -54,6 +56,16 @@ object SolrQuery {
|
||||
)
|
||||
query(fq)
|
||||
}
|
||||
|
||||
def findVersionDoc(id: String): F[Option[VersionDoc]] = {
|
||||
val fields = List(
|
||||
Field.id,
|
||||
Field("current_version_i")
|
||||
)
|
||||
val query = QueryData(s"id:$id", "", 1, 0, fields, Map.empty)
|
||||
val req = Method.POST(query.asJson, url)
|
||||
client.expect[Option[VersionDoc]](req)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -4,7 +4,6 @@ import cats.effect._
|
||||
import cats.implicits._
|
||||
|
||||
import docspell.common._
|
||||
import docspell.ftsclient.FtsMigration
|
||||
|
||||
import _root_.io.circe._
|
||||
import _root_.io.circe.generic.semiauto._
|
||||
@ -16,12 +15,14 @@ import org.http4s.client.dsl.Http4sClientDsl
|
||||
|
||||
trait SolrSetup[F[_]] {
|
||||
|
||||
def setupSchema: List[FtsMigration[F]]
|
||||
def setupSchema: List[SolrMigration[F]]
|
||||
|
||||
def remainingSetup: F[List[SolrMigration[F]]]
|
||||
|
||||
}
|
||||
|
||||
object SolrSetup {
|
||||
private val solrEngine = Ident.unsafe("solr")
|
||||
private val versionDocId = "6d8f09f4-8d7e-4bc9-98b8-7c89223b36dd"
|
||||
|
||||
def apply[F[_]: ConcurrentEffect](cfg: SolrConfig, client: Client[F]): SolrSetup[F] = {
|
||||
val dsl = new Http4sClientDsl[F] {}
|
||||
@ -32,62 +33,75 @@ object SolrSetup {
|
||||
val url = (Uri.unsafeFromString(cfg.url.asString) / "schema")
|
||||
.withQueryParam("commitWithin", cfg.commitWithin.toString)
|
||||
|
||||
def setupSchema: List[FtsMigration[F]] =
|
||||
def remainingSetup: F[List[SolrMigration[F]]] =
|
||||
for {
|
||||
current <- SolrQuery(cfg, client).findVersionDoc(versionDocId)
|
||||
migs = current match {
|
||||
case None => setupSchema
|
||||
case Some(ver) =>
|
||||
val verDoc =
|
||||
VersionDoc(versionDocId, allMigrations.map(_.value.version).max)
|
||||
val solrUp = SolrUpdate(cfg, client)
|
||||
val remain = allMigrations.filter(v => v.value.version > ver.currentVersion)
|
||||
if (remain.isEmpty) remain
|
||||
else remain :+ SolrMigration.writeVersion(solrUp, verDoc)
|
||||
}
|
||||
} yield migs
|
||||
|
||||
def setupSchema: List[SolrMigration[F]] = {
|
||||
val verDoc = VersionDoc(versionDocId, allMigrations.map(_.value.version).max)
|
||||
val solrUp = SolrUpdate(cfg, client)
|
||||
val writeVersion = SolrMigration.writeVersion(solrUp, verDoc)
|
||||
val deleteAll = SolrMigration.deleteData(0, solrUp)
|
||||
val indexAll = SolrMigration.indexAll[F](Int.MaxValue, "Index all data")
|
||||
|
||||
deleteAll :: (allMigrations.filter(_.isSchemaChange) ::: List(indexAll, writeVersion))
|
||||
}
|
||||
|
||||
private def allMigrations: List[SolrMigration[F]] =
|
||||
List(
|
||||
FtsMigration[F](
|
||||
SolrMigration[F](
|
||||
1,
|
||||
solrEngine,
|
||||
"Initialize",
|
||||
setupCoreSchema.map(_ => FtsMigration.Result.workDone)
|
||||
setupCoreSchema
|
||||
),
|
||||
FtsMigration[F](
|
||||
3,
|
||||
solrEngine,
|
||||
SolrMigration[F](
|
||||
2,
|
||||
"Add folder field",
|
||||
addFolderField.map(_ => FtsMigration.Result.workDone)
|
||||
addFolderField
|
||||
),
|
||||
FtsMigration[F](
|
||||
SolrMigration.indexAll(3, "Index all from database after adding folder field"),
|
||||
SolrMigration[F](
|
||||
4,
|
||||
solrEngine,
|
||||
"Index all from database",
|
||||
FtsMigration.Result.indexAll.pure[F]
|
||||
),
|
||||
FtsMigration[F](
|
||||
5,
|
||||
solrEngine,
|
||||
"Add content_fr field",
|
||||
addContentField(Language.French).map(_ => FtsMigration.Result.workDone)
|
||||
addContentField(Language.French)
|
||||
),
|
||||
FtsMigration[F](
|
||||
SolrMigration
|
||||
.indexAll(5, "Index all from database after adding french content field"),
|
||||
SolrMigration[F](
|
||||
6,
|
||||
solrEngine,
|
||||
"Index all from database",
|
||||
FtsMigration.Result.indexAll.pure[F]
|
||||
),
|
||||
FtsMigration[F](
|
||||
7,
|
||||
solrEngine,
|
||||
"Add content_it field",
|
||||
addContentField(Language.Italian).map(_ => FtsMigration.Result.reIndexAll)
|
||||
addContentField(Language.Italian)
|
||||
),
|
||||
FtsMigration[F](
|
||||
SolrMigration.reIndexAll(7, "Re-Index after adding italian content field"),
|
||||
SolrMigration[F](
|
||||
8,
|
||||
solrEngine,
|
||||
"Add content_es field",
|
||||
addContentField(Language.Spanish).map(_ => FtsMigration.Result.reIndexAll)
|
||||
addContentField(Language.Spanish)
|
||||
),
|
||||
FtsMigration[F](
|
||||
9,
|
||||
solrEngine,
|
||||
"Add more content fields",
|
||||
addMoreContentFields.map(_ => FtsMigration.Result.reIndexAll)
|
||||
),
|
||||
FtsMigration[F](
|
||||
SolrMigration.reIndexAll(9, "Re-Index after adding spanish content field"),
|
||||
SolrMigration[F](
|
||||
10,
|
||||
solrEngine,
|
||||
"Add more content fields",
|
||||
addMoreContentFields
|
||||
),
|
||||
SolrMigration.reIndexAll(11, "Re-Index after adding more content fields"),
|
||||
SolrMigration[F](
|
||||
12,
|
||||
"Add latvian content field",
|
||||
addContentField(Language.Latvian).map(_ => FtsMigration.Result.reIndexAll)
|
||||
)
|
||||
addContentField(Language.Latvian)
|
||||
),
|
||||
SolrMigration.reIndexAll(13, "Re-Index after adding latvian content field")
|
||||
)
|
||||
|
||||
def addFolderField: F[Unit] =
|
||||
|
@ -23,6 +23,8 @@ trait SolrUpdate[F[_]] {
|
||||
|
||||
def updateFolder(itemId: Ident, collective: Ident, folder: Option[Ident]): F[Unit]
|
||||
|
||||
def updateVersionDoc(doc: VersionDoc): F[Unit]
|
||||
|
||||
def delete(q: String, commitWithin: Option[Int]): F[Unit]
|
||||
}
|
||||
|
||||
@ -48,6 +50,11 @@ object SolrUpdate {
|
||||
client.expect[Unit](req)
|
||||
}
|
||||
|
||||
def updateVersionDoc(doc: VersionDoc): F[Unit] = {
|
||||
val req = Method.POST(List(doc).asJson, url)
|
||||
client.expect[Unit](req)
|
||||
}
|
||||
|
||||
def updateFolder(
|
||||
itemId: Ident,
|
||||
collective: Ident,
|
||||
|
@ -0,0 +1,11 @@
|
||||
package docspell.ftssolr
|
||||
|
||||
final case class VersionDoc(id: String, currentVersion: Int)
|
||||
|
||||
object VersionDoc {
|
||||
|
||||
object Fields {
|
||||
val id = Field("id")
|
||||
val currentVersion = Field("current_version_i")
|
||||
}
|
||||
}
|
@ -18,7 +18,9 @@ object FtsWork {
|
||||
def reInitializeTasks[F[_]: Monad]: FtsWork[F] =
|
||||
FtsWork { ctx =>
|
||||
val migrations =
|
||||
ctx.fts.initialize.map(fm => fm.changeResult(_ => FtsMigration.Result.workDone))
|
||||
ctx.fts.initializeNew.map(fm =>
|
||||
fm.changeResult(_ => FtsMigration.Result.workDone)
|
||||
)
|
||||
|
||||
NonEmptyList.fromList(migrations) match {
|
||||
case Some(nel) =>
|
||||
|
@ -20,8 +20,10 @@ object MigrationTask {
|
||||
.log[F, Unit](_.info(s"Running full-text-index migrations now"))
|
||||
.flatMap(_ =>
|
||||
Task(ctx =>
|
||||
Migration[F](cfg, fts, ctx.store, ctx.logger)
|
||||
.run(migrationTasks[F](fts))
|
||||
for {
|
||||
migs <- migrationTasks[F](fts)
|
||||
res <- Migration[F](cfg, fts, ctx.store, ctx.logger).run(migs)
|
||||
} yield res
|
||||
)
|
||||
)
|
||||
|
||||
@ -44,7 +46,7 @@ object MigrationTask {
|
||||
Some(DocspellSystem.migrationTaskTracker)
|
||||
)
|
||||
|
||||
def migrationTasks[F[_]: Effect](fts: FtsClient[F]): List[Migration[F]] =
|
||||
fts.initialize.map(fm => Migration.from(fm))
|
||||
def migrationTasks[F[_]: Effect](fts: FtsClient[F]): F[List[Migration[F]]] =
|
||||
fts.initialize.map(_.map(fm => Migration.from(fm)))
|
||||
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user