diff --git a/modules/fts-solr/src/main/scala/docspell/ftssolr/Fields.scala b/modules/fts-solr/src/main/scala/docspell/ftssolr/Fields.scala new file mode 100644 index 00000000..f9ecc354 --- /dev/null +++ b/modules/fts-solr/src/main/scala/docspell/ftssolr/Fields.scala @@ -0,0 +1,19 @@ +package docspell.ftssolr + +object Fields { + val discriminator = "discriminator" + val id = "id" + val itemId = "itemId" + val collectiveId = "collectiveId" + + object Attachment { + val attachmentId = "attachmentId" + val attachmentName = "attachmentName" + val content = "content" + } + + object Item { + val itemName = "itemName" + val itemNotes = "itemNotes" + } +} diff --git a/modules/fts-solr/src/main/scala/docspell/ftssolr/JsonCodec.scala b/modules/fts-solr/src/main/scala/docspell/ftssolr/JsonCodec.scala index d240e8e1..9ad35645 100644 --- a/modules/fts-solr/src/main/scala/docspell/ftssolr/JsonCodec.scala +++ b/modules/fts-solr/src/main/scala/docspell/ftssolr/JsonCodec.scala @@ -3,30 +3,32 @@ package docspell.ftssolr import docspell.common._ import docspell.ftsclient._ import io.circe._ +import Fields.{Item, Attachment} trait JsonCodec { implicit def attachmentEncoder: Encoder[TextData.Attachment] = new Encoder[TextData.Attachment] { final def apply(td: TextData.Attachment): Json = Json.obj( - ("id", Ident.encodeIdent(td.id)), - ("item", Ident.encodeIdent(td.item)), - ("collective", Ident.encodeIdent(td.collective)), - ("attachmentName", Json.fromString(td.name.getOrElse(""))), - ("content", Json.fromString(td.text.getOrElse(""))), - ("discriminator", Json.fromString("attachment")) + (Fields.id, Ident.encodeIdent(td.id)), + (Fields.itemId, Ident.encodeIdent(td.item)), + (Fields.collectiveId, Ident.encodeIdent(td.collective)), + (Attachment.attachmentId, Ident.encodeIdent(td.attachId)), + (Attachment.attachmentName, Json.fromString(td.name.getOrElse(""))), + (Attachment.content, Json.fromString(td.text.getOrElse(""))), + (Fields.discriminator, Json.fromString("attachment")) ) } implicit def itemEncoder: Encoder[TextData.Item] = new Encoder[TextData.Item] { final def apply(td: TextData.Item): Json = Json.obj( - ("id", Ident.encodeIdent(td.id)), - ("item", Ident.encodeIdent(td.item)), - ("collective", Ident.encodeIdent(td.collective)), - ("itemName", Json.fromString(td.name.getOrElse(""))), - ("itemNotes", Json.fromString(td.notes.getOrElse(""))), - ("discriminator", Json.fromString("item")) + (Fields.id, Ident.encodeIdent(td.id)), + (Fields.itemId, Ident.encodeIdent(td.item)), + (Fields.collectiveId, Ident.encodeIdent(td.collective)), + (Item.itemName, Json.fromString(td.name.getOrElse(""))), + (Item.itemNotes, Json.fromString(td.notes.getOrElse(""))), + (Fields.discriminator, Json.fromString("item")) ) } diff --git a/modules/fts-solr/src/main/scala/docspell/ftssolr/SolrConfig.scala b/modules/fts-solr/src/main/scala/docspell/ftssolr/SolrConfig.scala index 41fb5ad2..89ecb1ea 100644 --- a/modules/fts-solr/src/main/scala/docspell/ftssolr/SolrConfig.scala +++ b/modules/fts-solr/src/main/scala/docspell/ftssolr/SolrConfig.scala @@ -2,6 +2,6 @@ package docspell.ftssolr import docspell.common._ -final case class SolrConfig(url: LenientUri) +final case class SolrConfig(url: LenientUri, commitWithin: Int) object SolrConfig {} diff --git a/modules/fts-solr/src/main/scala/docspell/ftssolr/SolrFtsClient.scala b/modules/fts-solr/src/main/scala/docspell/ftssolr/SolrFtsClient.scala index f6281f1f..14eaa8d2 100644 --- a/modules/fts-solr/src/main/scala/docspell/ftssolr/SolrFtsClient.scala +++ b/modules/fts-solr/src/main/scala/docspell/ftssolr/SolrFtsClient.scala @@ -11,11 +11,12 @@ import docspell.ftsclient._ import docspell.ftsclient.FtsBasicResult._ final class SolrFtsClient[F[_]: Effect]( - solrUpdate: SolrUpdate[F] + solrUpdate: SolrUpdate[F], + solrSetup: SolrSetup[F] ) extends FtsClient[F] { def initialize: F[Unit] = - ().pure[F] + solrSetup.setupSchema def searchBasic(q: FtsQuery): Stream[F, FtsBasicResult] = Stream.emits( @@ -52,7 +53,7 @@ object SolrFtsClient { httpClient: Client[F] ): Resource[F, FtsClient[F]] = Resource.pure[F, FtsClient[F]]( - new SolrFtsClient(SolrUpdate(cfg, httpClient)) + new SolrFtsClient(SolrUpdate(cfg, httpClient), SolrSetup(cfg, httpClient)) ) } diff --git a/modules/fts-solr/src/main/scala/docspell/ftssolr/SolrSetup.scala b/modules/fts-solr/src/main/scala/docspell/ftssolr/SolrSetup.scala new file mode 100644 index 00000000..616dfaff --- /dev/null +++ b/modules/fts-solr/src/main/scala/docspell/ftssolr/SolrSetup.scala @@ -0,0 +1,102 @@ +package docspell.ftssolr + +import cats.effect._ +import org.http4s._ +import cats.implicits._ +import org.http4s.client.Client +import org.http4s.circe._ +import org.http4s.client.dsl.Http4sClientDsl +import org.log4s.getLogger +import _root_.io.circe.syntax._ +import _root_.io.circe._ +import _root_.io.circe.generic.semiauto._ + +import Fields.{Attachment, Item} + +trait SolrSetup[F[_]] { + + def setupSchema: F[Unit] + +} + +object SolrSetup { + private[this] val logger = getLogger + + def apply[F[_]: ConcurrentEffect](cfg: SolrConfig, client: Client[F]): SolrSetup[F] = { + val dsl = new Http4sClientDsl[F] {} + import dsl._ + + new SolrSetup[F] { + val url = (Uri.unsafeFromString(cfg.url.asString) / "schema") + .withQueryParam("commitWithin", cfg.commitWithin.toString) + + def setupSchema: F[Unit] = { + val cmds0 = + List( + Fields.id, + Fields.itemId, + Fields.collectiveId, + Fields.discriminator, + Attachment.attachmentId + ) + .traverse(addStringField) + val cmds1 = List( + Attachment.attachmentName, + Attachment.content, + Item.itemName, + Item.itemNotes + ) + .traverse(addTextField) + + cmds0 *> cmds1 *> ().pure[F] + } + + private def run(cmd: Json): F[Unit] = { + val req = Method.POST(cmd, url) + logger.debug(s"Running request $req: ${cmd.noSpaces}") + client.expect[String](req).map(r => logger.debug(s"Response: $r")) + } + + private def addStringField(name: String): F[Unit] = + run(DeleteField.command(DeleteField(name))).attempt *> + run(AddField.command(AddField.string(name))) + + private def addTextField(name: String): F[Unit] = + run(DeleteField.command(DeleteField(name))).attempt *> + run(AddField.command(AddField.text(name))) + + } + } + + // Schema Commands + + case class AddField( + name: String, + `type`: String, + stored: Boolean, + indexed: Boolean, + multiValued: Boolean + ) + object AddField { + implicit val encoder: Encoder[AddField] = + deriveEncoder[AddField] + + def command(body: AddField): Json = + Map("add-field" -> body.asJson).asJson + + def string(name: String): AddField = + AddField(name, "string", true, true, false) + + def text(name: String): AddField = + AddField(name, "text_general", true, true, false) + } + + case class DeleteField(name: String) + object DeleteField { + implicit val encoder: Encoder[DeleteField] = + deriveEncoder[DeleteField] + + def command(body: DeleteField): Json = + Map("delete-field" -> body.asJson).asJson + } +} diff --git a/modules/fts-solr/src/main/scala/docspell/ftssolr/SolrUpdate.scala b/modules/fts-solr/src/main/scala/docspell/ftssolr/SolrUpdate.scala index 7a73d45c..3b772cd2 100644 --- a/modules/fts-solr/src/main/scala/docspell/ftssolr/SolrUpdate.scala +++ b/modules/fts-solr/src/main/scala/docspell/ftssolr/SolrUpdate.scala @@ -29,7 +29,7 @@ object SolrUpdate { new SolrUpdate[F] { val url = (Uri.unsafeFromString(cfg.url.asString) / "update") - .withQueryParam("commitWithin", "1000") + .withQueryParam("commitWithin", cfg.commitWithin.toString) .withQueryParam("overwrite", "true") .withQueryParam("wt", "json") diff --git a/modules/joex/src/main/resources/reference.conf b/modules/joex/src/main/resources/reference.conf index efb9c5fc..3bcfd5e6 100644 --- a/modules/joex/src/main/resources/reference.conf +++ b/modules/joex/src/main/resources/reference.conf @@ -373,6 +373,7 @@ docspell.joex { } solr = { url = "http://localhost:8983/solr/docspell_core" + commit-within = 1000 } } } \ No newline at end of file diff --git a/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala b/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala index 05e5e940..c8a32fbf 100644 --- a/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala +++ b/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala @@ -56,7 +56,7 @@ final class JoexAppImpl[F[_]: ConcurrentEffect: ContextShift: Timer]( private def scheduleBackgroundTasks: F[Unit] = HouseKeepingTask .periodicTask[F](cfg.houseKeeping.schedule) - .flatMap(pstore.insert) *> IndexTask.job.flatMap(queue.insert) + .flatMap(pstore.insert) *> IndexTask.job.flatMap(queue.insertIfNew) } object JoexAppImpl { diff --git a/modules/joex/src/main/scala/docspell/joex/fts/IndexTask.scala b/modules/joex/src/main/scala/docspell/joex/fts/IndexTask.scala index 5be969d9..c7298943 100644 --- a/modules/joex/src/main/scala/docspell/joex/fts/IndexTask.scala +++ b/modules/joex/src/main/scala/docspell/joex/fts/IndexTask.scala @@ -42,7 +42,7 @@ object IndexTask { now, systemGroup, Priority.Low, - None + Some(taskName) ) } diff --git a/modules/joex/src/main/scala/docspell/joex/fts/Migration.scala b/modules/joex/src/main/scala/docspell/joex/fts/Migration.scala index 546198ad..88af2ef4 100644 --- a/modules/joex/src/main/scala/docspell/joex/fts/Migration.scala +++ b/modules/joex/src/main/scala/docspell/joex/fts/Migration.scala @@ -57,22 +57,23 @@ object Migration { _ <- OptionT.liftF(ctx.logger.info(s"Apply ${m.version}/${m.description}")) rec <- OptionT(insertRecord) res <- OptionT.liftF(m.task.run(ctx).attempt) - _ <- OptionT.liftF(res match { + ret <- OptionT.liftF(res match { case Right(()) => ().pure[F] case Left(ex) => ctx.logger.error(ex)( s"Applying index migration ${m.version}/${m.description} failed" ) *> - ctx.store.transact(RFtsMigration.deleteById(rec.id)) *> Effect[F].raiseError( - ex - ) + ctx.store.transact(RFtsMigration.deleteById(rec.id)) *> Effect[F] + .raiseError[Unit]( + ex + ) }) - } yield ()).getOrElseF( + } yield ret).getOrElseF( ctx.logger.info(s"Migration ${m.version}/${m.description} already applied.") ) } - def migrationTasks[F[_]]: List[Migration[F]] = + def migrationTasks[F[_]: Effect]: List[Migration[F]] = List( Migration[F](1, solrEngine, "initialize", Kleisli(ctx => ctx.fts.initialize)), Migration[F]( diff --git a/modules/restserver/src/main/resources/reference.conf b/modules/restserver/src/main/resources/reference.conf index c3a09ae2..3ac7e9e5 100644 --- a/modules/restserver/src/main/resources/reference.conf +++ b/modules/restserver/src/main/resources/reference.conf @@ -153,6 +153,7 @@ docspell.server { enabled = true solr = { url = "http://localhost:8983/solr/docspell_core" + commit-within = 1000 } } } diff --git a/modules/store/src/main/scala/docspell/store/queries/QPeriodicTask.scala b/modules/store/src/main/scala/docspell/store/queries/QPeriodicTask.scala index e52c27cc..21624f7a 100644 --- a/modules/store/src/main/scala/docspell/store/queries/QPeriodicTask.scala +++ b/modules/store/src/main/scala/docspell/store/queries/QPeriodicTask.scala @@ -51,15 +51,4 @@ object QPeriodicTask { selectSimple(RPeriodicTask.Columns.all, RPeriodicTask.table, where) ++ order sql.query[RPeriodicTask].streamWithChunkSize(2).take(1).compile.last } - - def findNonFinal(pid: Ident): ConnectionIO[Option[RJob]] = - selectSimple( - RJob.Columns.all, - RJob.table, - and( - RJob.Columns.tracker.is(pid), - RJob.Columns.state.isOneOf(JobState.all.diff(JobState.done).toSeq) - ) - ).query[RJob].option - } diff --git a/modules/store/src/main/scala/docspell/store/queue/JobQueue.scala b/modules/store/src/main/scala/docspell/store/queue/JobQueue.scala index e77fa755..2fa60d91 100644 --- a/modules/store/src/main/scala/docspell/store/queue/JobQueue.scala +++ b/modules/store/src/main/scala/docspell/store/queue/JobQueue.scala @@ -11,8 +11,19 @@ import org.log4s._ trait JobQueue[F[_]] { + /** Inserts the job into the queue to get picked up as soon as + * possible. The job must have a new unique id. + */ def insert(job: RJob): F[Unit] + /** Inserts the job into the queue only, if there is no job with the + * same tracker-id running at the moment. The job id must be a new + * unique id. + * + * If the job has no tracker defined, it is simply inserted. + */ + def insertIfNew(job: RJob): F[Unit] + def insertAll(jobs: Seq[RJob]): F[Unit] def nextJob( @@ -46,6 +57,19 @@ object JobQueue { else ().pure[F] } + def insertIfNew(job: RJob): F[Unit] = + for { + rj <- job.tracker match { + case Some(tid) => + store.transact(RJob.findNonFinalByTracker(tid)) + case None => + None.pure[F] + } + ret <- + if (rj.isDefined) ().pure[F] + else insert(job) + } yield ret + def insertAll(jobs: Seq[RJob]): F[Unit] = jobs.toList .traverse(j => insert(j).attempt) diff --git a/modules/store/src/main/scala/docspell/store/queue/PeriodicTaskStore.scala b/modules/store/src/main/scala/docspell/store/queue/PeriodicTaskStore.scala index c94458bb..16350602 100644 --- a/modules/store/src/main/scala/docspell/store/queue/PeriodicTaskStore.scala +++ b/modules/store/src/main/scala/docspell/store/queue/PeriodicTaskStore.scala @@ -100,7 +100,7 @@ object PeriodicTaskStore { } def findNonFinalJob(pjobId: Ident): F[Option[RJob]] = - store.transact(QPeriodicTask.findNonFinal(pjobId)) + store.transact(RJob.findNonFinalByTracker(pjobId)) def insert(task: RPeriodicTask): F[Unit] = { val update = store.transact(RPeriodicTask.update(task)) diff --git a/modules/store/src/main/scala/docspell/store/records/RJob.scala b/modules/store/src/main/scala/docspell/store/records/RJob.scala index 9a286743..9f607c5c 100644 --- a/modules/store/src/main/scala/docspell/store/records/RJob.scala +++ b/modules/store/src/main/scala/docspell/store/records/RJob.scala @@ -255,4 +255,12 @@ object RJob { .map(_ => 1) .compile .foldMonoid + + def findNonFinalByTracker(trackerId: Ident): ConnectionIO[Option[RJob]] = + selectSimple( + all, + table, + and(tracker.is(trackerId), state.isOneOf(JobState.all.diff(JobState.done).toSeq)) + ).query[RJob].option + }