Add task to index current database state

This commit is contained in:
Eike Kettner 2020-06-18 22:38:45 +02:00
parent 146d1b0562
commit 60c079f664
12 changed files with 317 additions and 8 deletions

View File

@ -1,6 +1,7 @@
package docspell.ftsclient
import fs2.Stream
import docspell.common._
/** The fts client is the interface for docspell to a fulltext search
* engine.
@ -12,7 +13,13 @@ import fs2.Stream
*/
trait FtsClient[F[_]] {
/** Optional operation to do some initialization tasks. This is called
* exactly once and then never again. It may be used to setup the
* database.
*/
def initialize: F[Unit]
def searchBasic(q: FtsQuery): Stream[F, FtsBasicResult]
def indexData(data: Stream[F, TextData]): F[Unit]
def indexData(logger: Logger[F], data: Stream[F, TextData]): F[Unit]
}

View File

@ -2,6 +2,7 @@ package docspell.ftssolr
import fs2.Stream
import cats.effect._
import cats.implicits._
import org.http4s.client.Client
import cats.data.NonEmptyList
@ -9,8 +10,15 @@ import docspell.common._
import docspell.ftsclient._
import docspell.ftsclient.FtsBasicResult._
final class SolrFtsClient[F[_]](cfg: SolrConfig, client: Client[F]) extends FtsClient[F] {
final class SolrFtsClient[F[_]: Effect](
cfg: SolrConfig,
client: Client[F]
) extends FtsClient[F] {
println(s"$client $cfg")
def initialize: F[Unit] =
().pure[F]
def searchBasic(q: FtsQuery): Stream[F, FtsBasicResult] =
Stream.emits(
Seq(
@ -25,8 +33,9 @@ final class SolrFtsClient[F[_]](cfg: SolrConfig, client: Client[F]) extends FtsC
)
)
def indexData(data: Stream[F, TextData]): F[Unit] =
???
def indexData(logger: Logger[F], data: Stream[F, TextData]): F[Unit] =
logger.info("Inserting lots of data into index")
}
object SolrFtsClient {

View File

@ -368,6 +368,9 @@ docspell.joex {
# Configuration of the full-text search engine.
full-text-search {
enabled = true
migration = {
index-all-chunk = 10
}
solr = {
url = "http://localhost:8983/solr/docspell_core"
}

View File

@ -37,5 +37,14 @@ object Config {
}
case class UserTasks(scanMailbox: ScanMailbox)
case class FullTextSearch(enabled: Boolean, solr: SolrConfig)
case class FullTextSearch(
enabled: Boolean,
migration: FullTextSearch.Migration,
solr: SolrConfig
)
object FullTextSearch {
final case class Migration(indexAllChunk: Int)
}
}

View File

@ -7,6 +7,7 @@ import docspell.common._
import docspell.backend.ops._
import docspell.joex.hk._
import docspell.joex.notify._
import docspell.joex.fts.IndexTask
import docspell.joex.scanmailbox._
import docspell.joex.process.ItemHandler
import docspell.joex.scheduler._
@ -23,6 +24,7 @@ final class JoexAppImpl[F[_]: ConcurrentEffect: ContextShift: Timer](
cfg: Config,
nodeOps: ONode[F],
store: Store[F],
queue: JobQueue[F],
pstore: PeriodicTaskStore[F],
termSignal: SignallingRef[F, Boolean],
val scheduler: Scheduler[F],
@ -52,7 +54,9 @@ final class JoexAppImpl[F[_]: ConcurrentEffect: ContextShift: Timer](
periodicScheduler.shutdown *> scheduler.shutdown(false) *> termSignal.set(true)
private def scheduleBackgroundTasks: F[Unit] =
HouseKeepingTask.periodicTask[F](cfg.houseKeeping.schedule).flatMap(pstore.insert)
HouseKeepingTask
.periodicTask[F](cfg.houseKeeping.schedule)
.flatMap(pstore.insert) *> IndexTask.job.flatMap(queue.insert)
}
object JoexAppImpl {
@ -99,6 +103,13 @@ object JoexAppImpl {
ScanMailboxTask.onCancel[F]
)
)
.withTask(
JobTask.json(
IndexTask.taskName,
IndexTask[F](cfg.fullTextSearch, fts),
IndexTask.onCancel[F]
)
)
.withTask(
JobTask.json(
HouseKeepingTask.taskName,
@ -115,7 +126,7 @@ object JoexAppImpl {
client,
Timer[F]
)
app = new JoexAppImpl(cfg, nodeOps, store, pstore, termSignal, sch, psch)
app = new JoexAppImpl(cfg, nodeOps, store, queue, pstore, termSignal, sch, psch)
appR <- Resource.make(app.init.map(_ => app))(_.shutdown)
} yield appR
}

View File

@ -0,0 +1,48 @@
package docspell.joex.fts
import cats.effect._
import cats.implicits._
import docspell.common._
import docspell.joex.Config
import docspell.joex.scheduler.Task
import docspell.ftsclient._
import docspell.store.records.RJob
import docspell.joex.hk.HouseKeepingTask
object IndexTask {
val taskName: Ident = Ident.unsafe("full-text-index")
val systemGroup = HouseKeepingTask.systemGroup
def apply[F[_]: ConcurrentEffect](
cfg: Config.FullTextSearch,
fts: FtsClient[F]
): Task[F, Unit, Unit] =
Task
.log[F, Unit](_.info(s"Running full-text-index task now"))
.flatMap(_ =>
Task(ctx =>
Migration[F](cfg, ctx.store, fts, ctx.logger)
.run(Migration.migrationTasks[F])
)
)
def onCancel[F[_]: Sync]: Task[F, Unit, Unit] =
Task.log[F, Unit](_.warn("Cancelling full-text-index task"))
def job[F[_]: Sync]: F[RJob] =
for {
id <- Ident.randomId[F]
now <- Timestamp.current[F]
} yield RJob.newJob(
id,
taskName,
systemGroup,
(),
"Create full-text index",
now,
systemGroup,
Priority.Low,
None
)
}

View File

@ -0,0 +1,110 @@
package docspell.joex.fts
import cats.effect._
import cats.implicits._
import cats.data.{Kleisli, OptionT}
import cats.Traverse
import docspell.common._
import docspell.joex.Config
import docspell.store.{AddResult, Store}
import docspell.store.records.RFtsMigration
import docspell.store.queries.{QAttachment, QItem}
import docspell.ftsclient._
object Migration {
private val solrEngine = Ident.unsafe("solr")
case class MigrateCtx[F[_]](
cfg: Config.FullTextSearch,
store: Store[F],
fts: FtsClient[F],
logger: Logger[F]
)
case class Migration[F[_]](
version: Int,
engine: Ident,
description: String,
task: Kleisli[F, MigrateCtx[F], Unit]
)
def apply[F[_]: Effect](
cfg: Config.FullTextSearch,
store: Store[F],
fts: FtsClient[F],
logger: Logger[F]
): Kleisli[F, List[Migration[F]], Unit] = {
val ctx = MigrateCtx(cfg, store, fts, logger)
Kleisli(migs => Traverse[List].sequence(migs.map(applySingle[F](ctx))).map(_ => ()))
}
def applySingle[F[_]: Effect](ctx: MigrateCtx[F])(m: Migration[F]): F[Unit] = {
val insertRecord: F[Option[RFtsMigration]] =
for {
rec <- RFtsMigration.create(m.version, m.engine, m.description)
res <- ctx.store.add(
RFtsMigration.insert(rec),
RFtsMigration.exists(m.version, m.engine)
)
ret <- res match {
case AddResult.Success => rec.some.pure[F]
case AddResult.EntityExists(_) => None.pure[F]
case AddResult.Failure(ex) => Effect[F].raiseError(ex)
}
} yield ret
(for {
_ <- OptionT.liftF(ctx.logger.info(s"Apply ${m.version}/${m.description}"))
rec <- OptionT(insertRecord)
res <- OptionT.liftF(m.task.run(ctx).attempt)
_ <- OptionT.liftF(res match {
case Right(()) => ().pure[F]
case Left(ex) =>
ctx.logger.error(ex)(
s"Applying index migration ${m.version}/${m.description} failed"
) *>
ctx.store.transact(RFtsMigration.deleteById(rec.id)) *> Effect[F].raiseError(
ex
)
})
} yield ()).getOrElseF(
ctx.logger.info(s"Migration ${m.version}/${m.description} already applied.")
)
}
def migrationTasks[F[_]]: List[Migration[F]] =
List(
Migration[F](1, solrEngine, "initialize", Kleisli(ctx => ctx.fts.initialize)),
Migration[F](
2,
solrEngine,
"Index all attachments from database",
Kleisli(ctx =>
ctx.fts.indexData(
ctx.logger,
ctx.store
.transact(
QAttachment.allAttachmentMetaAndName(ctx.cfg.migration.indexAllChunk)
)
.map(caa =>
TextData
.attachment(caa.item, caa.id, caa.collective, caa.name, caa.content)
)
)
)
),
Migration[F](
3,
solrEngine,
"Index all items from database",
Kleisli(ctx =>
ctx.fts.indexData(
ctx.logger,
ctx.store
.transact(QItem.allNameAndNotes(ctx.cfg.migration.indexAllChunk * 5))
.map(nn => TextData.item(nn.id, nn.collective, Option(nn.name), nn.notes))
)
)
)
)
}

View File

@ -32,7 +32,7 @@ object TextExtraction {
)
_ <- ctx.logger.debug("Storing extracted texts")
_ <- txt.toList.traverse(rm => ctx.store.transact(RAttachmentMeta.upsert(rm._1)))
_ <- fts.indexData(Stream.emits(txt.map(_._2)))
_ <- fts.indexData(ctx.logger, Stream.emits(txt.map(_._2)))
dur <- start
_ <- ctx.logger.info(s"Text extraction finished in ${dur.formatExact}")
} yield item.copy(metas = txt.map(_._1))

View File

@ -0,0 +1,10 @@
CREATE TABLE "fts_migration" (
"id" varchar(254) not null primary key,
"version" int not null,
"fts_engine" varchar(254) not null,
"description" varchar(254) not null,
"created" timestamp not null
);
CREATE UNIQE INDEX "fts_migration_version_engine_idx"
ON "fts_migration"("version", "fts_engine");

View File

@ -138,4 +138,30 @@ object QAttachment {
q.query[RAttachmentMeta].option
}
case class ContentAndName(
id: Ident,
item: Ident,
collective: Ident,
name: Option[String],
content: Option[String]
)
def allAttachmentMetaAndName(chunkSize: Int): Stream[ConnectionIO, ContentAndName] = {
val aId = RAttachment.Columns.id.prefix("a")
val aItem = RAttachment.Columns.itemId.prefix("a")
val aName = RAttachment.Columns.name.prefix("a")
val mId = RAttachmentMeta.Columns.id.prefix("m")
val mContent = RAttachmentMeta.Columns.content.prefix("m")
val iId = RItem.Columns.id.prefix("i")
val iColl = RItem.Columns.cid.prefix("i")
val cols = Seq(aId, aItem, iColl, aName, mContent)
val from = RAttachment.table ++ fr"a INNER JOIN" ++
RAttachmentMeta.table ++ fr"m ON" ++ aId.is(mId) ++
fr"INNER JOIN" ++ RItem.table ++ fr"i ON" ++ iId.is(aItem)
selectSimple(cols, from, Fragment.empty)
.query[ContentAndName]
.streamWithChunkSize(chunkSize)
}
}

View File

@ -469,4 +469,21 @@ object QItem {
prefix(suffix(value))
}
final case class NameAndNotes(
id: Ident,
collective: Ident,
name: String,
notes: Option[String]
)
def allNameAndNotes(chunkSize: Int): Stream[ConnectionIO, NameAndNotes] = {
val iId = RItem.Columns.id
val iColl = RItem.Columns.cid
val iName = RItem.Columns.name
val iNotes = RItem.Columns.notes
val cols = Seq(iId, iColl, iName, iNotes)
selectSimple(cols, RItem.table, Fragment.empty)
.query[NameAndNotes]
.streamWithChunkSize(chunkSize)
}
}

View File

@ -0,0 +1,59 @@
package docspell.store.records
import cats.implicits._
import cats.effect._
import doobie._
import doobie.implicits._
import docspell.common._
import docspell.store.impl._
import docspell.store.impl.Implicits._
final case class RFtsMigration(
id: Ident,
version: Int,
ftsEngine: Ident,
description: String,
created: Timestamp
)
object RFtsMigration {
def create[F[_]: Sync](
version: Int,
ftsEngine: Ident,
description: String
): F[RFtsMigration] =
for {
newId <- Ident.randomId[F]
now <- Timestamp.current[F]
} yield RFtsMigration(newId, version, ftsEngine, description, now)
val table = fr"fts_migration"
object Columns {
val id = Column("id")
val version = Column("version")
val ftsEngine = Column("fts_engine")
val description = Column("description")
val created = Column("created")
val all = List(id, version, ftsEngine, description, created)
}
import Columns._
def insert(v: RFtsMigration): ConnectionIO[Int] =
insertRow(
table,
all,
fr"${v.id},${v.version},${v.ftsEngine},${v.description},${v.created}"
).update.run
def exists(vers: Int, engine: Ident): ConnectionIO[Boolean] =
selectCount(id, table, and(version.is(vers), ftsEngine.is(engine)))
.query[Int]
.unique
.map(_ > 0)
def deleteById(rId: Ident): ConnectionIO[Int] =
deleteFrom(table, id.is(rId)).update.run
}