mirror of
https://github.com/TheAnachronism/docspell.git
synced 2025-06-21 09:58:26 +00:00
Index exsiting data in solr
This commit is contained in:
@ -10,6 +10,11 @@ sealed trait TextData {
|
||||
|
||||
def collective: Ident
|
||||
|
||||
final def fold[A](f: TextData.Attachment => A, g: TextData.Item => A): A =
|
||||
this match {
|
||||
case a: TextData.Attachment => f(a)
|
||||
case a: TextData.Item => g(a)
|
||||
}
|
||||
}
|
||||
|
||||
object TextData {
|
||||
|
@ -0,0 +1,41 @@
|
||||
package docspell.ftssolr
|
||||
|
||||
import docspell.common._
|
||||
import docspell.ftsclient._
|
||||
import io.circe._
|
||||
|
||||
trait JsonCodec {
|
||||
|
||||
implicit def attachmentEncoder: Encoder[TextData.Attachment] =
|
||||
new Encoder[TextData.Attachment] {
|
||||
final def apply(td: TextData.Attachment): Json = Json.obj(
|
||||
("id", Ident.encodeIdent(td.id)),
|
||||
("item", Ident.encodeIdent(td.item)),
|
||||
("collective", Ident.encodeIdent(td.collective)),
|
||||
("attachmentName", Json.fromString(td.name.getOrElse(""))),
|
||||
("content", Json.fromString(td.text.getOrElse(""))),
|
||||
("discriminator", Json.fromString("attachment"))
|
||||
)
|
||||
}
|
||||
|
||||
implicit def itemEncoder: Encoder[TextData.Item] =
|
||||
new Encoder[TextData.Item] {
|
||||
final def apply(td: TextData.Item): Json = Json.obj(
|
||||
("id", Ident.encodeIdent(td.id)),
|
||||
("item", Ident.encodeIdent(td.item)),
|
||||
("collective", Ident.encodeIdent(td.collective)),
|
||||
("itemName", Json.fromString(td.name.getOrElse(""))),
|
||||
("itemNotes", Json.fromString(td.notes.getOrElse(""))),
|
||||
("discriminator", Json.fromString("item"))
|
||||
)
|
||||
}
|
||||
|
||||
|
||||
implicit def textDataEncoder(implicit
|
||||
ae: Encoder[TextData.Attachment],
|
||||
ie: Encoder[TextData.Item]
|
||||
): Encoder[TextData] =
|
||||
Encoder(_.fold(ae.apply, ie.apply))
|
||||
}
|
||||
|
||||
object JsonCodec extends JsonCodec
|
@ -11,10 +11,8 @@ import docspell.ftsclient._
|
||||
import docspell.ftsclient.FtsBasicResult._
|
||||
|
||||
final class SolrFtsClient[F[_]: Effect](
|
||||
cfg: SolrConfig,
|
||||
client: Client[F]
|
||||
solrUpdate: SolrUpdate[F]
|
||||
) extends FtsClient[F] {
|
||||
println(s"$client $cfg")
|
||||
|
||||
def initialize: F[Unit] =
|
||||
().pure[F]
|
||||
@ -34,7 +32,16 @@ final class SolrFtsClient[F[_]: Effect](
|
||||
)
|
||||
|
||||
def indexData(logger: Logger[F], data: Stream[F, TextData]): F[Unit] =
|
||||
logger.info("Inserting lots of data into index")
|
||||
(for {
|
||||
_ <- Stream.eval(logger.debug("Inserting data into index"))
|
||||
chunks <- data.chunks
|
||||
res <- Stream.eval(solrUpdate.many(chunks.toList).attempt)
|
||||
_ <- res match {
|
||||
case Right(()) => Stream.emit(())
|
||||
case Left(ex) =>
|
||||
Stream.eval(logger.error(ex)("Error inserting chunk of data into index"))
|
||||
}
|
||||
} yield ()).compile.drain
|
||||
|
||||
}
|
||||
|
||||
@ -44,6 +51,8 @@ object SolrFtsClient {
|
||||
cfg: SolrConfig,
|
||||
httpClient: Client[F]
|
||||
): Resource[F, FtsClient[F]] =
|
||||
Resource.pure[F, FtsClient[F]](new SolrFtsClient(cfg, httpClient))
|
||||
Resource.pure[F, FtsClient[F]](
|
||||
new SolrFtsClient(SolrUpdate(cfg, httpClient))
|
||||
)
|
||||
|
||||
}
|
||||
|
@ -0,0 +1,49 @@
|
||||
package docspell.ftssolr
|
||||
|
||||
import cats.effect._
|
||||
import org.http4s._
|
||||
import cats.implicits._
|
||||
import org.http4s.client.Client
|
||||
import org.http4s.circe._
|
||||
import org.http4s.client.dsl.Http4sClientDsl
|
||||
import _root_.io.circe.syntax._
|
||||
import org.log4s.getLogger
|
||||
|
||||
import docspell.ftsclient._
|
||||
import JsonCodec._
|
||||
|
||||
trait SolrUpdate[F[_]] {
|
||||
|
||||
def single(td: TextData): F[Unit]
|
||||
|
||||
def many(tds: List[TextData]): F[Unit]
|
||||
|
||||
}
|
||||
|
||||
object SolrUpdate {
|
||||
private[this] val logger = getLogger
|
||||
|
||||
def apply[F[_]: ConcurrentEffect](cfg: SolrConfig, client: Client[F]): SolrUpdate[F] = {
|
||||
val dsl = new Http4sClientDsl[F] {}
|
||||
import dsl._
|
||||
|
||||
new SolrUpdate[F] {
|
||||
val url = (Uri.unsafeFromString(cfg.url.asString) / "update")
|
||||
.withQueryParam("commitWithin", "1000")
|
||||
.withQueryParam("overwrite", "true")
|
||||
.withQueryParam("wt", "json")
|
||||
|
||||
def single(td: TextData): F[Unit] = {
|
||||
val req = Method.POST(td.asJson, url)
|
||||
logger.debug(s"Running request $req")
|
||||
client.expect[String](req).map(r => logger.debug(s"Response: $r"))
|
||||
}
|
||||
|
||||
def many(tds: List[TextData]): F[Unit] = {
|
||||
val req = Method.POST(tds.asJson, url)
|
||||
logger.debug(s"Running request $req")
|
||||
client.expect[String](req).map(r => logger.debug(s"Response: $r"))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -6,5 +6,5 @@ CREATE TABLE "fts_migration" (
|
||||
"created" timestamp not null
|
||||
);
|
||||
|
||||
CREATE UNIQE INDEX "fts_migration_version_engine_idx"
|
||||
CREATE UNIQUE INDEX "fts_migration_version_engine_idx"
|
||||
ON "fts_migration"("version", "fts_engine");
|
||||
|
Reference in New Issue
Block a user