mirror of
https://github.com/TheAnachronism/docspell.git
synced 2025-06-06 15:15:58 +00:00
Refactor re-index task
This commit is contained in:
parent
3ee0846e19
commit
ac7d00c28f
@ -29,12 +29,11 @@ trait FtsClient[F[_]] {
|
|||||||
*/
|
*/
|
||||||
def initialize: F[List[FtsMigration[F]]]
|
def initialize: F[List[FtsMigration[F]]]
|
||||||
|
|
||||||
/** A list of initialization tasks that are meant to run when there
|
/** A list of initialization tasks that can be run when re-creating
|
||||||
* was no setup at all or when re-creating the index.
|
* the index.
|
||||||
*
|
*
|
||||||
* This is not run on startup, but only when required, for example
|
* This is not run on startup, but only when required, for example
|
||||||
* when re-creating the entire index. These tasks don't need to
|
* when re-creating the entire index.
|
||||||
* preserve the data in the index.
|
|
||||||
*/
|
*/
|
||||||
def initializeNew: List[FtsMigration[F]]
|
def initializeNew: List[FtsMigration[F]]
|
||||||
|
|
||||||
|
@ -12,7 +12,10 @@ final case class SolrMigration[F[_]](value: FtsMigration[F], dataChangeOnly: Boo
|
|||||||
object SolrMigration {
|
object SolrMigration {
|
||||||
private val solrEngine = Ident.unsafe("solr")
|
private val solrEngine = Ident.unsafe("solr")
|
||||||
|
|
||||||
def deleteData[F[_]: Functor](version: Int, solrUpdate: SolrUpdate[F]): SolrMigration[F] =
|
def deleteData[F[_]: Functor](
|
||||||
|
version: Int,
|
||||||
|
solrUpdate: SolrUpdate[F]
|
||||||
|
): SolrMigration[F] =
|
||||||
apply(version, "Delete all data", solrUpdate.delete("*:*", Option(0)))
|
apply(version, "Delete all data", solrUpdate.delete("*:*", Option(0)))
|
||||||
|
|
||||||
def writeVersion[F[_]: Functor](
|
def writeVersion[F[_]: Functor](
|
||||||
|
@ -49,13 +49,14 @@ object SolrSetup {
|
|||||||
} yield migs
|
} yield migs
|
||||||
|
|
||||||
def setupSchema: List[SolrMigration[F]] = {
|
def setupSchema: List[SolrMigration[F]] = {
|
||||||
val verDoc = VersionDoc(versionDocId, allMigrations.map(_.value.version).max)
|
val verDoc = VersionDoc(versionDocId, allMigrations.map(_.value.version).max)
|
||||||
val solrUp = SolrUpdate(cfg, client)
|
val solrUp = SolrUpdate(cfg, client)
|
||||||
val writeVersion = SolrMigration.writeVersion(solrUp, verDoc)
|
val writeVersion = SolrMigration.writeVersion(solrUp, verDoc)
|
||||||
val deleteAll = SolrMigration.deleteData(0, solrUp)
|
val deleteAll = SolrMigration.deleteData(0, solrUp)
|
||||||
val indexAll = SolrMigration.indexAll[F](Int.MaxValue, "Index all data")
|
val indexAll = SolrMigration.indexAll[F](Int.MaxValue, "Index all data")
|
||||||
|
|
||||||
deleteAll :: (allMigrations.filter(_.isSchemaChange) ::: List(indexAll, writeVersion))
|
deleteAll :: (allMigrations
|
||||||
|
.filter(_.isSchemaChange) ::: List(indexAll, writeVersion))
|
||||||
}
|
}
|
||||||
|
|
||||||
private def allMigrations: List[SolrMigration[F]] =
|
private def allMigrations: List[SolrMigration[F]] =
|
||||||
|
@ -11,22 +11,23 @@ import docspell.joex.scheduler.Context
|
|||||||
import docspell.store.queries.{QAttachment, QItem}
|
import docspell.store.queries.{QAttachment, QItem}
|
||||||
|
|
||||||
object FtsWork {
|
object FtsWork {
|
||||||
|
import syntax._
|
||||||
|
|
||||||
def apply[F[_]](f: FtsContext[F] => F[Unit]): FtsWork[F] =
|
def apply[F[_]](f: FtsContext[F] => F[Unit]): FtsWork[F] =
|
||||||
Kleisli(f)
|
Kleisli(f)
|
||||||
|
|
||||||
/** Runs all migration tasks unconditionally and inserts all data as last step. */
|
/** Runs migration tasks to re-create the index. */
|
||||||
def reInitializeTasks[F[_]: Monad]: FtsWork[F] =
|
def reInitializeTasks[F[_]: Monad]: FtsWork[F] =
|
||||||
FtsWork { ctx =>
|
FtsWork { ctx =>
|
||||||
val migrations =
|
val migrations = ctx.fts.initializeNew
|
||||||
ctx.fts.initializeNew.map(fm =>
|
|
||||||
fm.changeResult(_ => FtsMigration.Result.workDone)
|
|
||||||
)
|
|
||||||
|
|
||||||
NonEmptyList.fromList(migrations) match {
|
NonEmptyList.fromList(migrations) match {
|
||||||
case Some(nel) =>
|
case Some(nel) =>
|
||||||
nel
|
nel
|
||||||
.map(fm => from[F](fm.task))
|
.map(fm =>
|
||||||
.append(insertAll[F](None))
|
log[F](_.debug(s"Apply (${fm.engine.id}): ${fm.description}")) ++ from[F](
|
||||||
|
fm.task
|
||||||
|
)
|
||||||
|
)
|
||||||
.reduce(semigroup[F])
|
.reduce(semigroup[F])
|
||||||
.run(ctx)
|
.run(ctx)
|
||||||
case None =>
|
case None =>
|
||||||
@ -34,8 +35,6 @@ object FtsWork {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
*/
|
|
||||||
def from[F[_]: FlatMap: Applicative](t: F[FtsMigration.Result]): FtsWork[F] =
|
def from[F[_]: FlatMap: Applicative](t: F[FtsMigration.Result]): FtsWork[F] =
|
||||||
Kleisli.liftF(t).flatMap(transformResult[F])
|
Kleisli.liftF(t).flatMap(transformResult[F])
|
||||||
|
|
||||||
@ -67,16 +66,20 @@ object FtsWork {
|
|||||||
def log[F[_]](f: Logger[F] => F[Unit]): FtsWork[F] =
|
def log[F[_]](f: Logger[F] => F[Unit]): FtsWork[F] =
|
||||||
FtsWork(ctx => f(ctx.logger))
|
FtsWork(ctx => f(ctx.logger))
|
||||||
|
|
||||||
def clearIndex[F[_]](coll: Option[Ident]): FtsWork[F] =
|
def clearIndex[F[_]: FlatMap](coll: Option[Ident]): FtsWork[F] =
|
||||||
coll match {
|
coll match {
|
||||||
case Some(cid) =>
|
case Some(cid) =>
|
||||||
FtsWork(ctx => ctx.fts.clear(ctx.logger, cid))
|
log[F](_.debug(s"Clearing index data for collective '${cid.id}'")) ++ FtsWork(
|
||||||
|
ctx => ctx.fts.clear(ctx.logger, cid)
|
||||||
|
)
|
||||||
case None =>
|
case None =>
|
||||||
FtsWork(ctx => ctx.fts.clearAll(ctx.logger))
|
log[F](_.debug("Clearing all index data!")) ++ FtsWork(ctx =>
|
||||||
|
ctx.fts.clearAll(ctx.logger)
|
||||||
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
def insertAll[F[_]: FlatMap](coll: Option[Ident]): FtsWork[F] =
|
def insertAll[F[_]: FlatMap](coll: Option[Ident]): FtsWork[F] =
|
||||||
FtsWork
|
log[F](_.info("Inserting all data to index")) ++ FtsWork
|
||||||
.all(
|
.all(
|
||||||
FtsWork(ctx =>
|
FtsWork(ctx =>
|
||||||
ctx.fts.indexData(
|
ctx.fts.indexData(
|
||||||
|
@ -41,10 +41,9 @@ object Migration {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
def applySingle[F[_]: Effect](ctx: FtsContext[F])(m: Migration[F]): F[Unit] = {
|
def applySingle[F[_]: Effect](ctx: FtsContext[F])(m: Migration[F]): F[Unit] =
|
||||||
for {
|
for {
|
||||||
_ <- ctx.logger.info(s"Apply ${m.version}/${m.description}")
|
_ <- ctx.logger.info(s"Apply ${m.version}/${m.description}")
|
||||||
_ <- m.task.run(ctx)
|
_ <- m.task.run(ctx)
|
||||||
} yield ()
|
} yield ()
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
@ -40,12 +40,7 @@ object ReIndexTask {
|
|||||||
FtsWork.insertAll[F](collective)
|
FtsWork.insertAll[F](collective)
|
||||||
|
|
||||||
case None =>
|
case None =>
|
||||||
FtsWork
|
FtsWork.log[F](_.info("Running re-create index")) ++
|
||||||
.clearIndex(None)
|
|
||||||
.recoverWith(
|
|
||||||
FtsWork.log[F](_.info("Clearing data failed. Continue re-indexing."))
|
|
||||||
) ++
|
|
||||||
FtsWork.log[F](_.info("Running index initialize")) ++
|
|
||||||
FtsWork.reInitializeTasks[F]
|
FtsWork.reInitializeTasks[F]
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user