Set up Solr schema and index all data using a system task

The task runs on application start. It sets up the schema using Solr's
schema API and then indexes all data in the database. Each step is
recorded so that it is not executed again on subsequent starts.
Eike Kettner 2020-06-19 20:49:59 +02:00
parent 1f4220eccb
commit 2a0bf24088
15 changed files with 185 additions and 37 deletions
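
The pieces below fit together as follows (a sketch assembled from the diffs, not verbatim code): on startup, joex enqueues the system index task at most once; the task then runs each index migration that has not been recorded yet, and the first migration sets up the Solr schema.

    // sketch: startup flow across the files in this commit
    IndexTask.job.flatMap(queue.insertIfNew)   // JoexAppImpl: at most one live index job
    // the task walks Migration.migrationTasks; step 1 is the schema setup
    Migration[F](1, solrEngine, "initialize", Kleisli(ctx => ctx.fts.initialize))
    // each applied step is recorded in RFtsMigration; a failed step removes
    // its record so it runs again on the next start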

View File

@@ -0,0 +1,19 @@
package docspell.ftssolr

object Fields {
  val discriminator = "discriminator"
  val id = "id"
  val itemId = "itemId"
  val collectiveId = "collectiveId"

  object Attachment {
    val attachmentId = "attachmentId"
    val attachmentName = "attachmentName"
    val content = "content"
  }

  object Item {
    val itemName = "itemName"
    val itemNotes = "itemNotes"
  }
}

View File

@@ -3,30 +3,32 @@ package docspell.ftssolr

 import docspell.common._
 import docspell.ftsclient._
 import io.circe._
+import Fields.{Item, Attachment}

 trait JsonCodec {

   implicit def attachmentEncoder: Encoder[TextData.Attachment] =
     new Encoder[TextData.Attachment] {
       final def apply(td: TextData.Attachment): Json = Json.obj(
-        ("id", Ident.encodeIdent(td.id)),
-        ("item", Ident.encodeIdent(td.item)),
-        ("collective", Ident.encodeIdent(td.collective)),
-        ("attachmentName", Json.fromString(td.name.getOrElse(""))),
-        ("content", Json.fromString(td.text.getOrElse(""))),
-        ("discriminator", Json.fromString("attachment"))
+        (Fields.id, Ident.encodeIdent(td.id)),
+        (Fields.itemId, Ident.encodeIdent(td.item)),
+        (Fields.collectiveId, Ident.encodeIdent(td.collective)),
+        (Attachment.attachmentId, Ident.encodeIdent(td.attachId)),
+        (Attachment.attachmentName, Json.fromString(td.name.getOrElse(""))),
+        (Attachment.content, Json.fromString(td.text.getOrElse(""))),
+        (Fields.discriminator, Json.fromString("attachment"))
       )
     }

   implicit def itemEncoder: Encoder[TextData.Item] =
     new Encoder[TextData.Item] {
       final def apply(td: TextData.Item): Json = Json.obj(
-        ("id", Ident.encodeIdent(td.id)),
-        ("item", Ident.encodeIdent(td.item)),
-        ("collective", Ident.encodeIdent(td.collective)),
-        ("itemName", Json.fromString(td.name.getOrElse(""))),
-        ("itemNotes", Json.fromString(td.notes.getOrElse(""))),
-        ("discriminator", Json.fromString("item"))
+        (Fields.id, Ident.encodeIdent(td.id)),
+        (Fields.itemId, Ident.encodeIdent(td.item)),
+        (Fields.collectiveId, Ident.encodeIdent(td.collective)),
+        (Item.itemName, Json.fromString(td.name.getOrElse(""))),
+        (Item.itemNotes, Json.fromString(td.notes.getOrElse(""))),
+        (Fields.discriminator, Json.fromString("item"))
       )
     }
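
With the renamed keys and the new attachmentId field, an attachment document sent to Solr now has this shape (values invented for illustration):

    {
      "id": "doc-1",
      "itemId": "item-1",
      "collectiveId": "coll-1",
      "attachmentId": "att-1",
      "attachmentName": "scan.pdf",
      "content": "... extracted text ...",
      "discriminator": "attachment"
    }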

View File

@@ -2,6 +2,6 @@ package docspell.ftssolr

 import docspell.common._

-final case class SolrConfig(url: LenientUri)
+final case class SolrConfig(url: LenientUri, commitWithin: Int)

 object SolrConfig {}

View File

@@ -11,11 +11,12 @@ import docspell.ftsclient._
 import docspell.ftsclient.FtsBasicResult._

 final class SolrFtsClient[F[_]: Effect](
-    solrUpdate: SolrUpdate[F]
+    solrUpdate: SolrUpdate[F],
+    solrSetup: SolrSetup[F]
 ) extends FtsClient[F] {

   def initialize: F[Unit] =
-    ().pure[F]
+    solrSetup.setupSchema

   def searchBasic(q: FtsQuery): Stream[F, FtsBasicResult] =
     Stream.emits(
@@ -52,7 +53,7 @@ object SolrFtsClient {
       httpClient: Client[F]
   ): Resource[F, FtsClient[F]] =
     Resource.pure[F, FtsClient[F]](
-      new SolrFtsClient(SolrUpdate(cfg, httpClient))
+      new SolrFtsClient(SolrUpdate(cfg, httpClient), SolrSetup(cfg, httpClient))
     )
 }

View File

@@ -0,0 +1,102 @@
package docspell.ftssolr

import cats.effect._
import org.http4s._
import cats.implicits._
import org.http4s.client.Client
import org.http4s.circe._
import org.http4s.client.dsl.Http4sClientDsl
import org.log4s.getLogger
import _root_.io.circe.syntax._
import _root_.io.circe._
import _root_.io.circe.generic.semiauto._
import Fields.{Attachment, Item}

trait SolrSetup[F[_]] {
  def setupSchema: F[Unit]
}

object SolrSetup {
  private[this] val logger = getLogger

  def apply[F[_]: ConcurrentEffect](cfg: SolrConfig, client: Client[F]): SolrSetup[F] = {
    val dsl = new Http4sClientDsl[F] {}
    import dsl._

    new SolrSetup[F] {
      val url = (Uri.unsafeFromString(cfg.url.asString) / "schema")
        .withQueryParam("commitWithin", cfg.commitWithin.toString)

      def setupSchema: F[Unit] = {
        val cmds0 =
          List(
            Fields.id,
            Fields.itemId,
            Fields.collectiveId,
            Fields.discriminator,
            Attachment.attachmentId
          )
            .traverse(addStringField)
        val cmds1 = List(
          Attachment.attachmentName,
          Attachment.content,
          Item.itemName,
          Item.itemNotes
        )
          .traverse(addTextField)
        cmds0 *> cmds1 *> ().pure[F]
      }

      private def run(cmd: Json): F[Unit] = {
        val req = Method.POST(cmd, url)
        logger.debug(s"Running request $req: ${cmd.noSpaces}")
        client.expect[String](req).map(r => logger.debug(s"Response: $r"))
      }

      private def addStringField(name: String): F[Unit] =
        run(DeleteField.command(DeleteField(name))).attempt *>
          run(AddField.command(AddField.string(name)))

      private def addTextField(name: String): F[Unit] =
        run(DeleteField.command(DeleteField(name))).attempt *>
          run(AddField.command(AddField.text(name)))
    }
  }

  // Schema Commands

  case class AddField(
      name: String,
      `type`: String,
      stored: Boolean,
      indexed: Boolean,
      multiValued: Boolean
  )

  object AddField {
    implicit val encoder: Encoder[AddField] =
      deriveEncoder[AddField]

    def command(body: AddField): Json =
      Map("add-field" -> body.asJson).asJson

    def string(name: String): AddField =
      AddField(name, "string", true, true, false)

    def text(name: String): AddField =
      AddField(name, "text_general", true, true, false)
  }

  case class DeleteField(name: String)

  object DeleteField {
    implicit val encoder: Encoder[DeleteField] =
      deriveEncoder[DeleteField]

    def command(body: DeleteField): Json =
      Map("delete-field" -> body.asJson).asJson
  }
}
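
Note the delete-then-add pattern in addStringField and addTextField: the delete runs first with .attempt, so its failure (e.g. on a fresh core where the field does not exist yet) is swallowed, and the field is then created from scratch. Per the AddField and DeleteField encoders above, each field amounts to two POSTs against the core's /schema endpoint with bodies like (field name for illustration):

    {"delete-field": {"name": "content"}}
    {"add-field": {"name": "content", "type": "text_general",
                   "stored": true, "indexed": true, "multiValued": false}}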

View File

@@ -29,7 +29,7 @@ object SolrUpdate {
     new SolrUpdate[F] {
       val url = (Uri.unsafeFromString(cfg.url.asString) / "update")
-        .withQueryParam("commitWithin", "1000")
+        .withQueryParam("commitWithin", cfg.commitWithin.toString)
         .withQueryParam("overwrite", "true")
         .withQueryParam("wt", "json")

View File

@@ -373,6 +373,7 @@ docspell.joex {
   }
   solr = {
     url = "http://localhost:8983/solr/docspell_core"
+    commit-within = 1000
   }
 }
}

View File

@@ -56,7 +56,7 @@ final class JoexAppImpl[F[_]: ConcurrentEffect: ContextShift: Timer](
   private def scheduleBackgroundTasks: F[Unit] =
     HouseKeepingTask
       .periodicTask[F](cfg.houseKeeping.schedule)
-      .flatMap(pstore.insert) *> IndexTask.job.flatMap(queue.insert)
+      .flatMap(pstore.insert) *> IndexTask.job.flatMap(queue.insertIfNew)
 }

 object JoexAppImpl {

View File

@@ -42,7 +42,7 @@ object IndexTask {
       now,
       systemGroup,
       Priority.Low,
-      None
+      Some(taskName)
     )
 }

View File

@@ -57,22 +57,23 @@ object Migration {
         _   <- OptionT.liftF(ctx.logger.info(s"Apply ${m.version}/${m.description}"))
         rec <- OptionT(insertRecord)
         res <- OptionT.liftF(m.task.run(ctx).attempt)
-        _ <- OptionT.liftF(res match {
+        ret <- OptionT.liftF(res match {
           case Right(()) => ().pure[F]
           case Left(ex) =>
             ctx.logger.error(ex)(
               s"Applying index migration ${m.version}/${m.description} failed"
             ) *>
-              ctx.store.transact(RFtsMigration.deleteById(rec.id)) *> Effect[F].raiseError(
-                ex
-              )
+              ctx.store.transact(RFtsMigration.deleteById(rec.id)) *> Effect[F]
+                .raiseError[Unit](
+                  ex
+                )
         })
-      } yield ()).getOrElseF(
+      } yield ret).getOrElseF(
         ctx.logger.info(s"Migration ${m.version}/${m.description} already applied.")
       )
   }

-  def migrationTasks[F[_]]: List[Migration[F]] =
+  def migrationTasks[F[_]: Effect]: List[Migration[F]] =
     List(
       Migration[F](1, solrEngine, "initialize", Kleisli(ctx => ctx.fts.initialize)),
       Migration[F](

View File

@@ -153,6 +153,7 @@ docspell.server {
   enabled = true
   solr = {
     url = "http://localhost:8983/solr/docspell_core"
+    commit-within = 1000
   }
 }
}

View File

@@ -51,15 +51,4 @@ object QPeriodicTask {
       selectSimple(RPeriodicTask.Columns.all, RPeriodicTask.table, where) ++ order
     sql.query[RPeriodicTask].streamWithChunkSize(2).take(1).compile.last
   }
-
-  def findNonFinal(pid: Ident): ConnectionIO[Option[RJob]] =
-    selectSimple(
-      RJob.Columns.all,
-      RJob.table,
-      and(
-        RJob.Columns.tracker.is(pid),
-        RJob.Columns.state.isOneOf(JobState.all.diff(JobState.done).toSeq)
-      )
-    ).query[RJob].option
 }

View File

@@ -11,8 +11,19 @@ import org.log4s._

 trait JobQueue[F[_]] {

+  /** Inserts the job into the queue to get picked up as soon as
+    * possible. The job must have a new unique id.
+    */
   def insert(job: RJob): F[Unit]

+  /** Inserts the job into the queue only if there is no job with the
+    * same tracker-id currently running. The job id must be a new
+    * unique id.
+    *
+    * If the job has no tracker defined, it is simply inserted.
+    */
+  def insertIfNew(job: RJob): F[Unit]
+
   def insertAll(jobs: Seq[RJob]): F[Unit]

   def nextJob(
@@ -46,6 +57,19 @@ object JobQueue {
           else ().pure[F]
       }

+      def insertIfNew(job: RJob): F[Unit] =
+        for {
+          rj <- job.tracker match {
+            case Some(tid) =>
+              store.transact(RJob.findNonFinalByTracker(tid))
+            case None =>
+              None.pure[F]
+          }
+          ret <-
+            if (rj.isDefined) ().pure[F]
+            else insert(job)
+        } yield ret
+
       def insertAll(jobs: Seq[RJob]): F[Unit] =
         jobs.toList
           .traverse(j => insert(j).attempt)
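
"Non-final" here means any state not in JobState.done, i.e. the job is still queued or running; once the tracked index job reaches a done state, a new one can be enqueued again on the next start.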

View File

@@ -100,7 +100,7 @@ object PeriodicTaskStore {
   }

   def findNonFinalJob(pjobId: Ident): F[Option[RJob]] =
-    store.transact(QPeriodicTask.findNonFinal(pjobId))
+    store.transact(RJob.findNonFinalByTracker(pjobId))

   def insert(task: RPeriodicTask): F[Unit] = {
     val update = store.transact(RPeriodicTask.update(task))

View File

@@ -255,4 +255,12 @@ object RJob {
       .map(_ => 1)
       .compile
       .foldMonoid
+
+  def findNonFinalByTracker(trackerId: Ident): ConnectionIO[Option[RJob]] =
+    selectSimple(
+      all,
+      table,
+      and(tracker.is(trackerId), state.isOneOf(JobState.all.diff(JobState.done).toSeq))
+    ).query[RJob].option
 }