Add backend operations for re-creating the full-text index

This commit is contained in:
Eike Kettner
2020-06-21 15:46:51 +02:00
parent 14ea4091c4
commit 0d8b03fc61
13 changed files with 237 additions and 58 deletions

View File

@ -61,7 +61,7 @@ object BackendApp {
jobImpl <- OJob(store, joexImpl) jobImpl <- OJob(store, joexImpl)
itemImpl <- OItem(store, solrFts) itemImpl <- OItem(store, solrFts)
itemSearchImpl <- OItemSearch(store) itemSearchImpl <- OItemSearch(store)
fulltextImpl <- OFulltext(itemSearchImpl, solrFts) fulltextImpl <- OFulltext(itemSearchImpl, solrFts, store, queue)
javaEmil = javaEmil =
JavaMailEmil(blocker, Settings.defaultSettings.copy(debug = cfg.mailDebug)) JavaMailEmil(blocker, Settings.defaultSettings.copy(debug = cfg.mailDebug))
mailImpl <- OMail(store, javaEmil) mailImpl <- OMail(store, javaEmil)

View File

@ -0,0 +1,90 @@
package docspell.backend
import cats.effect._
import cats.implicits._
import docspell.common._
import docspell.store.records.RJob
object JobFactory {
def processItem[F[_]: Sync](
args: ProcessItemArgs,
account: AccountId,
prio: Priority,
tracker: Option[Ident]
): F[RJob] =
for {
id <- Ident.randomId[F]
now <- Timestamp.current[F]
job = RJob.newJob(
id,
ProcessItemArgs.taskName,
account.collective,
args,
args.makeSubject,
now,
account.user,
prio,
tracker
)
} yield job
def processItems[F[_]: Sync](
args: Vector[ProcessItemArgs],
account: AccountId,
prio: Priority,
tracker: Option[Ident]
): F[Vector[RJob]] = {
def create(id: Ident, now: Timestamp, arg: ProcessItemArgs): RJob =
RJob.newJob(
id,
ProcessItemArgs.taskName,
account.collective,
arg,
arg.makeSubject,
now,
account.user,
prio,
tracker
)
for {
id <- Ident.randomId[F]
now <- Timestamp.current[F]
jobs = args.map(a => create(id, now, a))
} yield jobs
}
def reIndexAll[F[_]: Sync]: F[RJob] =
for {
id <- Ident.randomId[F]
now <- Timestamp.current[F]
} yield RJob.newJob(
id,
ReIndexTaskArgs.taskName,
DocspellSystem.taskGroup,
ReIndexTaskArgs(None),
s"Recreate full-text index",
now,
DocspellSystem.taskGroup,
Priority.Low,
Some(DocspellSystem.migrationTaskTracker)
)
def reIndex[F[_]: Sync](account: AccountId): F[RJob] =
for {
id <- Ident.randomId[F]
now <- Timestamp.current[F]
args = ReIndexTaskArgs(Some(account.collective))
} yield RJob.newJob(
id,
ReIndexTaskArgs.taskName,
account.collective,
args,
s"Recreate full-text index",
now,
account.user,
Priority.Low,
Some(ReIndexTaskArgs.tracker(args))
)
}

View File

@ -5,6 +5,10 @@ import cats.implicits._
import fs2.Stream import fs2.Stream
import docspell.common._ import docspell.common._
import docspell.ftsclient._ import docspell.ftsclient._
import docspell.backend.JobFactory
import docspell.store.Store
import docspell.store.records.RJob
import docspell.store.queue.JobQueue
import OItemSearch.{Batch, ListItem, ListItemWithTags, Query} import OItemSearch.{Batch, ListItem, ListItemWithTags, Query}
trait OFulltext[F[_]] { trait OFulltext[F[_]] {
@ -22,7 +26,7 @@ trait OFulltext[F[_]] {
/** Clears the full-text index for the given collective and starts a /** Clears the full-text index for the given collective and starts a
* task indexing all their data. * task indexing all their data.
*/ */
def reindexCollective(collective: Ident): F[Unit] def reindexCollective(account: AccountId): F[Unit]
} }
object OFulltext { object OFulltext {
@ -32,12 +36,27 @@ object OFulltext {
def apply[F[_]: Effect]( def apply[F[_]: Effect](
itemSearch: OItemSearch[F], itemSearch: OItemSearch[F],
fts: FtsClient[F] fts: FtsClient[F],
store: Store[F],
queue: JobQueue[F]
): Resource[F, OFulltext[F]] = ): Resource[F, OFulltext[F]] =
Resource.pure[F, OFulltext[F]](new OFulltext[F] { Resource.pure[F, OFulltext[F]](new OFulltext[F] {
def reindexAll: F[Unit] = ??? def reindexAll: F[Unit] =
for {
job <- JobFactory.reIndexAll[F]
_ <- queue.insertIfNew(job)
} yield ()
def reindexCollective(collective: Ident): F[Unit] = ??? def reindexCollective(account: AccountId): F[Unit] =
for {
exist <- store.transact(
RJob.findNonFinalByTracker(DocspellSystem.migrationTaskTracker)
)
job <- JobFactory.reIndex(account)
_ <-
if (exist.isDefined) ().pure[F]
else queue.insertIfNew(job)
} yield ()
def findItems(q: Query, ftsQ: String, batch: Batch): F[Vector[ListItem]] = def findItems(q: Query, ftsQ: String, batch: Batch): F[Vector[ListItem]] =
findItemsFts(q, ftsQ, batch, itemSearch.findItems) findItemsFts(q, ftsQ, batch, itemSearch.findItems)

View File

@ -5,7 +5,7 @@ import cats.Functor
import cats.data.{EitherT, OptionT} import cats.data.{EitherT, OptionT}
import cats.effect._ import cats.effect._
import cats.implicits._ import cats.implicits._
import docspell.backend.Config import docspell.backend.{Config, JobFactory}
import fs2.Stream import fs2.Stream
import docspell.common._ import docspell.common._
import docspell.common.syntax.all._ import docspell.common.syntax.all._
@ -203,26 +203,7 @@ object OUpload {
account: AccountId, account: AccountId,
prio: Priority, prio: Priority,
tracker: Option[Ident] tracker: Option[Ident]
): F[Vector[RJob]] = { ): F[Vector[RJob]] =
def create(id: Ident, now: Timestamp, arg: ProcessItemArgs): RJob = JobFactory.processItems[F](args, account, prio, tracker)
RJob.newJob(
id,
ProcessItemArgs.taskName,
account.collective,
arg,
arg.makeSubject,
now,
account.user,
prio,
tracker
)
for {
id <- Ident.randomId[F]
now <- Timestamp.current[F]
jobs = args.map(a => create(id, now, a))
} yield jobs
}
}) })
} }

View File

@ -0,0 +1,8 @@
package docspell.common
object DocspellSystem {
val taskGroup = Ident.unsafe("docspell-system")
val migrationTaskTracker = Ident.unsafe("full-text-index-tracker")
}

View File

@ -0,0 +1,24 @@
package docspell.common
import io.circe._
import io.circe.generic.semiauto._
final case class ReIndexTaskArgs(collective: Option[Ident])
object ReIndexTaskArgs {
val taskName = Ident.unsafe("full-text-reindex")
def tracker(args: ReIndexTaskArgs): Ident =
args.collective match {
case Some(cid) =>
cid / DocspellSystem.migrationTaskTracker
case None =>
DocspellSystem.migrationTaskTracker
}
implicit val jsonEncoder: Encoder[ReIndexTaskArgs] =
deriveEncoder[ReIndexTaskArgs]
implicit val jsonDecoder: Decoder[ReIndexTaskArgs] =
deriveDecoder[ReIndexTaskArgs]
}

View File

@ -7,7 +7,7 @@ import docspell.common._
import docspell.backend.ops._ import docspell.backend.ops._
import docspell.joex.hk._ import docspell.joex.hk._
import docspell.joex.notify._ import docspell.joex.notify._
import docspell.joex.fts.MigrationTask import docspell.joex.fts.{MigrationTask, ReIndexTask}
import docspell.joex.scanmailbox._ import docspell.joex.scanmailbox._
import docspell.joex.process.ItemHandler import docspell.joex.process.ItemHandler
import docspell.joex.scheduler._ import docspell.joex.scheduler._
@ -111,6 +111,13 @@ object JoexAppImpl {
MigrationTask.onCancel[F] MigrationTask.onCancel[F]
) )
) )
.withTask(
JobTask.json(
ReIndexTask.taskName,
ReIndexTask[F](cfg.fullTextSearch, fts),
ReIndexTask.onCancel[F]
)
)
.withTask( .withTask(
JobTask.json( JobTask.json(
HouseKeepingTask.taskName, HouseKeepingTask.taskName,

View File

@ -3,10 +3,11 @@ package docspell.joex.fts
import cats.effect._ import cats.effect._
import cats.data.{Kleisli, NonEmptyList} import cats.data.{Kleisli, NonEmptyList}
import cats.{FlatMap, Semigroup} import cats.{FlatMap, Semigroup}
import docspell.store.queries.{QAttachment, QItem} import docspell.common._
import docspell.ftsclient._ import docspell.ftsclient._
import docspell.joex.scheduler.Context import docspell.joex.scheduler.Context
import docspell.joex.Config import docspell.joex.Config
import docspell.store.queries.{QAttachment, QItem}
object FtsWork { object FtsWork {
def apply[F[_]](f: FtsContext[F] => F[Unit]): FtsWork[F] = def apply[F[_]](f: FtsContext[F] => F[Unit]): FtsWork[F] =
@ -21,23 +22,20 @@ object FtsWork {
implicit def semigroup[F[_]: FlatMap]: Semigroup[FtsWork[F]] = implicit def semigroup[F[_]: FlatMap]: Semigroup[FtsWork[F]] =
Semigroup.instance((mt1, mt2) => mt1.flatMap(_ => mt2)) Semigroup.instance((mt1, mt2) => mt1.flatMap(_ => mt2))
implicit final class FtsWorkOps[F[_]](mt: FtsWork[F]) {
def ++(mn: FtsWork[F])(implicit ev: FlatMap[F]): FtsWork[F] =
all(mt, mn)
def forContext(
cfg: Config.FullTextSearch,
fts: FtsClient[F]
): Kleisli[F, Context[F, _], Unit] =
mt.local(ctx => FtsContext(cfg, fts, ctx))
}
// some tasks // some tasks
def initialize[F[_]]: FtsWork[F] = def initialize[F[_]]: FtsWork[F] =
FtsWork(_.fts.initialize) FtsWork(_.fts.initialize)
def insertAll[F[_]: Effect]: FtsWork[F] = def clearIndex[F[_]](coll: Option[Ident]): FtsWork[F] =
coll match {
case Some(cid) =>
FtsWork(ctx => ctx.fts.clear(ctx.logger, cid))
case None =>
FtsWork(ctx => ctx.fts.clearAll(ctx.logger))
}
def insertAll[F[_]: Effect](coll: Option[Ident]): FtsWork[F] =
FtsWork FtsWork
.all( .all(
FtsWork(ctx => FtsWork(ctx =>
@ -45,7 +43,8 @@ object FtsWork {
ctx.logger, ctx.logger,
ctx.store ctx.store
.transact( .transact(
QAttachment.allAttachmentMetaAndName(ctx.cfg.migration.indexAllChunk) QAttachment
.allAttachmentMetaAndName(coll, ctx.cfg.migration.indexAllChunk)
) )
.map(caa => .map(caa =>
TextData TextData
@ -64,9 +63,22 @@ object FtsWork {
ctx.fts.indexData( ctx.fts.indexData(
ctx.logger, ctx.logger,
ctx.store ctx.store
.transact(QItem.allNameAndNotes(ctx.cfg.migration.indexAllChunk * 5)) .transact(QItem.allNameAndNotes(coll, ctx.cfg.migration.indexAllChunk * 5))
.map(nn => TextData.item(nn.id, nn.collective, Option(nn.name), nn.notes)) .map(nn => TextData.item(nn.id, nn.collective, Option(nn.name), nn.notes))
) )
) )
) )
object syntax {
implicit final class FtsWorkOps[F[_]](mt: FtsWork[F]) {
def ++(mn: FtsWork[F])(implicit ev: FlatMap[F]): FtsWork[F] =
all(mt, mn)
def forContext(
cfg: Config.FullTextSearch,
fts: FtsClient[F]
): Kleisli[F, Context[F, _], Unit] =
mt.local(ctx => FtsContext(cfg, fts, ctx))
}
}
} }

View File

@ -7,12 +7,9 @@ import docspell.joex.Config
import docspell.joex.scheduler.Task import docspell.joex.scheduler.Task
import docspell.ftsclient._ import docspell.ftsclient._
import docspell.store.records.RJob import docspell.store.records.RJob
import docspell.joex.hk.HouseKeepingTask
object MigrationTask { object MigrationTask {
val taskName = Ident.unsafe("full-text-index") val taskName = Ident.unsafe("full-text-index")
val tracker = Ident.unsafe("full-text-index-tracker")
val systemGroup = HouseKeepingTask.systemGroup
def apply[F[_]: ConcurrentEffect]( def apply[F[_]: ConcurrentEffect](
cfg: Config.FullTextSearch, cfg: Config.FullTextSearch,
@ -37,20 +34,20 @@ object MigrationTask {
} yield RJob.newJob( } yield RJob.newJob(
id, id,
taskName, taskName,
systemGroup, DocspellSystem.taskGroup,
(), (),
"Create full-text index", "Create full-text index",
now, now,
systemGroup, DocspellSystem.taskGroup,
Priority.Low, Priority.Low,
Some(tracker) Some(DocspellSystem.migrationTaskTracker)
) )
private val solrEngine = Ident.unsafe("solr") private val solrEngine = Ident.unsafe("solr")
def migrationTasks[F[_]: Effect]: List[Migration[F]] = def migrationTasks[F[_]: Effect]: List[Migration[F]] =
List( List(
Migration[F](1, solrEngine, "initialize", FtsWork.initialize[F]), Migration[F](1, solrEngine, "initialize", FtsWork.initialize[F]),
Migration[F](2, solrEngine, "Index all from database", FtsWork.insertAll[F]) Migration[F](2, solrEngine, "Index all from database", FtsWork.insertAll[F](None))
) )
} }

View File

@ -0,0 +1,33 @@
package docspell.joex.fts
import cats.effect._
import docspell.common._
import docspell.joex.Config
import docspell.joex.scheduler.Task
import docspell.ftsclient._
import FtsWork.syntax._
object ReIndexTask {
type Args = ReIndexTaskArgs
val taskName = ReIndexTaskArgs.taskName
val tracker = DocspellSystem.migrationTaskTracker
def apply[F[_]: ConcurrentEffect](
cfg: Config.FullTextSearch,
fts: FtsClient[F]
): Task[F, Args, Unit] =
Task
.log[F, Args](_.info(s"Running full-text re-index now"))
.flatMap(_ =>
Task(ctx =>
(FtsWork.clearIndex(ctx.args.collective) ++ FtsWork.insertAll[F](
ctx.args.collective
)).forContext(cfg, fts).run(ctx)
)
)
def onCancel[F[_]: Sync]: Task[F, Args, Unit] =
Task.log[F, Args](_.warn("Cancelling full-text re-index task"))
}

View File

@ -11,7 +11,6 @@ import docspell.store.records._
object HouseKeepingTask { object HouseKeepingTask {
private val periodicId = Ident.unsafe("docspell-houskeeping") private val periodicId = Ident.unsafe("docspell-houskeeping")
val systemGroup: Ident = Ident.unsafe("docspell-system")
val taskName: Ident = Ident.unsafe("housekeeping") val taskName: Ident = Ident.unsafe("housekeeping")
@ -29,10 +28,10 @@ object HouseKeepingTask {
.createJson( .createJson(
true, true,
taskName, taskName,
systemGroup, DocspellSystem.taskGroup,
(), (),
"Docspell house-keeping", "Docspell house-keeping",
systemGroup, DocspellSystem.taskGroup,
Priority.Low, Priority.Low,
ce ce
) )

View File

@ -147,7 +147,10 @@ object QAttachment {
name: Option[String], name: Option[String],
content: Option[String] content: Option[String]
) )
def allAttachmentMetaAndName(chunkSize: Int): Stream[ConnectionIO, ContentAndName] = { def allAttachmentMetaAndName(
coll: Option[Ident],
chunkSize: Int
): Stream[ConnectionIO, ContentAndName] = {
val aId = RAttachment.Columns.id.prefix("a") val aId = RAttachment.Columns.id.prefix("a")
val aItem = RAttachment.Columns.itemId.prefix("a") val aItem = RAttachment.Columns.itemId.prefix("a")
val aName = RAttachment.Columns.name.prefix("a") val aName = RAttachment.Columns.name.prefix("a")
@ -164,7 +167,9 @@ object QAttachment {
fr"INNER JOIN" ++ RItem.table ++ fr"i ON" ++ iId.is(aItem) ++ fr"INNER JOIN" ++ RItem.table ++ fr"i ON" ++ iId.is(aItem) ++
fr"INNER JOIN" ++ RCollective.table ++ fr"c ON" ++ cId.is(iColl) fr"INNER JOIN" ++ RCollective.table ++ fr"c ON" ++ cId.is(iColl)
selectSimple(cols, from, Fragment.empty) val where = coll.map(cid => iColl.is(cid)).getOrElse(Fragment.empty)
selectSimple(cols, from, where)
.query[ContentAndName] .query[ContentAndName]
.streamWithChunkSize(chunkSize) .streamWithChunkSize(chunkSize)
} }

View File

@ -475,14 +475,18 @@ object QItem {
name: String, name: String,
notes: Option[String] notes: Option[String]
) )
def allNameAndNotes(chunkSize: Int): Stream[ConnectionIO, NameAndNotes] = { def allNameAndNotes(
coll: Option[Ident],
chunkSize: Int
): Stream[ConnectionIO, NameAndNotes] = {
val iId = RItem.Columns.id val iId = RItem.Columns.id
val iColl = RItem.Columns.cid val iColl = RItem.Columns.cid
val iName = RItem.Columns.name val iName = RItem.Columns.name
val iNotes = RItem.Columns.notes val iNotes = RItem.Columns.notes
val cols = Seq(iId, iColl, iName, iNotes) val cols = Seq(iId, iColl, iName, iNotes)
selectSimple(cols, RItem.table, Fragment.empty) val where = coll.map(cid => iColl.is(cid)).getOrElse(Fragment.empty)
selectSimple(cols, RItem.table, where)
.query[NameAndNotes] .query[NameAndNotes]
.streamWithChunkSize(chunkSize) .streamWithChunkSize(chunkSize)
} }