diff --git a/build.sbt b/build.sbt index 7a837d9c..cba02caf 100644 --- a/build.sbt +++ b/build.sbt @@ -619,6 +619,20 @@ val ftssolr = project ) .dependsOn(common, ftsclient) +val ftspsql = project + .in(file("modules/fts-psql")) + .disablePlugins(RevolverPlugin) + .settings(sharedSettings) + .withTestSettings + .settings( + name := "docspell-fts-psql", + libraryDependencies ++= + Dependencies.doobie ++ + Dependencies.postgres ++ + Dependencies.flyway + ) + .dependsOn(common, ftsclient, store % "compile->test;test->test") + val restapi = project .in(file("modules/restapi")) .disablePlugins(RevolverPlugin) @@ -769,6 +783,7 @@ val joex = project joexapi, restapi, ftssolr, + ftspsql, pubsubNaive, notificationImpl, schedulerImpl @@ -841,6 +856,7 @@ val restserver = project backend, webapp, ftssolr, + ftspsql, oidc, pubsubNaive, notificationImpl, @@ -926,6 +942,7 @@ val root = project analysis, ftsclient, ftssolr, + ftspsql, files, store, joexapi, diff --git a/modules/fts-psql/src/main/resources/db/psqlfts/V2.0.0__initial_schema.sql b/modules/fts-psql/src/main/resources/db/psqlfts/V2.0.0__initial_schema.sql new file mode 100644 index 00000000..cad4ef9a --- /dev/null +++ b/modules/fts-psql/src/main/resources/db/psqlfts/V2.0.0__initial_schema.sql @@ -0,0 +1,27 @@ +create table "ftspsql_search"( + "id" varchar(254) not null primary key, + "item_id" varchar(254) not null, + "collective" varchar(254) not null, + "lang" varchar(254) not null, + "attach_id" varchar(254), + "folder_id" varchar(254), + "updated_at" timestamptz not null default current_timestamp, + --- content columns + "attach_name" text, + "attach_content" text, + "item_name" text, + "item_notes" text, + --- index column + "fts_config" regconfig not null, + "text_index" tsvector + generated always as ( + setweight(to_tsvector("fts_config", coalesce("attach_name", '')), 'B') || + setweight(to_tsvector("fts_config", coalesce("item_name", '')), 'B') || + setweight(to_tsvector("fts_config", coalesce("attach_content", '')), 'C') || + setweight(to_tsvector("fts_config", coalesce("item_notes", '')), 'C')) stored +); + +create index "ftspsql_search_ftsidx" on "ftspsql_search" using GIN ("text_index"); +create index "ftpsql_search_item_idx" on "ftspsql_search"("item_id"); +create index "ftpsql_search_attach_idx" on "ftspsql_search"("attach_id"); +create index "ftpsql_search_folder_idx" on "ftspsql_search"("folder_id"); diff --git a/modules/fts-psql/src/main/scala/docspell/ftspsql/DbMigration.scala b/modules/fts-psql/src/main/scala/docspell/ftspsql/DbMigration.scala new file mode 100644 index 00000000..b5ce5fd3 --- /dev/null +++ b/modules/fts-psql/src/main/scala/docspell/ftspsql/DbMigration.scala @@ -0,0 +1,36 @@ +package docspell.ftspsql + +import cats.effect._ +import cats.implicits._ +import org.flywaydb.core.Flyway +import org.flywaydb.core.api.output.MigrateResult + +final class DbMigration[F[_]: Sync](cfg: PsqlConfig) { + private[this] val logger = docspell.logging.getLogger[F] + private val location: String = "classpath:db/psqlfts" + + def run: F[MigrateResult] = + for { + fw <- createFlyway + _ <- logger.info(s"Running FTS migrations") + result <- Sync[F].blocking(fw.migrate()) + } yield result + + def createFlyway: F[Flyway] = + for { + _ <- logger.info(s"Creating Flyway for: $location") + fw = Flyway + .configure() + .table("flyway_fts_history") + .cleanDisabled(true) + .dataSource(cfg.url.asString, cfg.user, cfg.password.pass) + .locations(location) + .baselineOnMigrate(true) + .load() + } yield fw +} + +object DbMigration { + def apply[F[_]: Sync](cfg: PsqlConfig): DbMigration[F] = + new DbMigration[F](cfg) +} diff --git a/modules/fts-psql/src/main/scala/docspell/ftspsql/DoobieMeta.scala b/modules/fts-psql/src/main/scala/docspell/ftspsql/DoobieMeta.scala new file mode 100644 index 00000000..60302f37 --- /dev/null +++ b/modules/fts-psql/src/main/scala/docspell/ftspsql/DoobieMeta.scala @@ -0,0 +1,26 @@ +package docspell.ftspsql + +import docspell.common._ +import doobie._ +import doobie.util.log.Success + +trait DoobieMeta { + + implicit val sqlLogging: LogHandler = LogHandler { + case e @ Success(_, _, _, _) => + DoobieMeta.logger.trace("SQL " + e) + case e => + DoobieMeta.logger.error(s"SQL Failure: $e") + } + + implicit val metaIdent: Meta[Ident] = + Meta[String].timap(Ident.unsafe)(_.id) + + implicit val metaLanguage: Meta[Language] = + Meta[String].timap(Language.unsafe)(_.iso3) + +} + +object DoobieMeta { + private val logger = org.log4s.getLogger +} diff --git a/modules/fts-psql/src/main/scala/docspell/ftspsql/FtsRecord.scala b/modules/fts-psql/src/main/scala/docspell/ftspsql/FtsRecord.scala new file mode 100644 index 00000000..2036923c --- /dev/null +++ b/modules/fts-psql/src/main/scala/docspell/ftspsql/FtsRecord.scala @@ -0,0 +1,58 @@ +package docspell.ftspsql + +import cats.syntax.all._ +import docspell.common.{Ident, Language} +import docspell.ftsclient.TextData + +final case class FtsRecord( + id: String, + itemId: Ident, + collective: Ident, + language: Language, + attachId: Option[Ident], + folderId: Option[Ident], + attachName: Option[String], + attachContent: Option[String], + itemName: Option[String], + itemNotes: Option[String] +) + +object FtsRecord { + def fromTextData(td: TextData): FtsRecord = + td match { + case TextData.Attachment( + item, + attachId, + collective, + folder, + language, + name, + text + ) => + FtsRecord( + td.id.id, + item, + collective, + language, + attachId.some, + folder, + name, + text, + None, + None + ) + case TextData.Item(item, collective, folder, name, notes, language) => + FtsRecord( + td.id.id, + item, + collective, + language, + None, + folder, + None, + None, + name, + notes + ) + } +} diff --git a/modules/fts-psql/src/main/scala/docspell/ftspsql/FtsRepository.scala b/modules/fts-psql/src/main/scala/docspell/ftspsql/FtsRepository.scala new file mode 100644 index 00000000..251bcdc9 --- /dev/null +++ b/modules/fts-psql/src/main/scala/docspell/ftspsql/FtsRepository.scala @@ -0,0 +1,178 @@ +package docspell.ftspsql + +import cats.data.NonEmptyList +import docspell.common._ +import docspell.ftsclient.FtsQuery +import doobie._ +import doobie.implicits._ +import fs2.Chunk + +object FtsRepository extends DoobieMeta { + val table = fr"ftspsql_search" + + def searchSummary(q: FtsQuery): ConnectionIO[SearchSummary] = { + val selectRank = mkSelectRank + val query = mkQueryPart(q) + + sql"""select count(id), max($selectRank) + |from $table, $query + |where ${mkCondition(q)} AND query @@ text_index + |""".stripMargin + .query[SearchSummary] + .unique + } + + def search( + q: FtsQuery, + withHighlighting: Boolean + ): ConnectionIO[Vector[SearchResult]] = { + val selectRank = mkSelectRank + + val hlOption = + s"startsel=${q.highlight.pre},stopsel=${q.highlight.post}" + + val selectHl = + if (!withHighlighting) fr"null as highlight" + else + fr"""ts_headline( + | fts_config, + | coalesce(attach_name, '') || + | ' ' || coalesce(attach_content, '') || + | ' ' || coalesce(item_name, '') || + | ' ' || coalesce(item_notes, ''), query, $hlOption) as highlight""".stripMargin + + val select = + fr"id, item_id, collective, lang, attach_id, folder_id, attach_name, item_name, $selectRank as rank, $selectHl" + + val query = mkQueryPart(q) + + sql"""select $select + |from $table, $query + |where ${mkCondition(q)} AND query @@ text_index + |order by rank desc + |limit ${q.limit} + |offset ${q.offset} + |""".stripMargin + .query[SearchResult] + .to[Vector] + } + + private def mkCondition(q: FtsQuery): Fragment = { + val coll = fr"collective = ${q.collective}" + val items = + NonEmptyList.fromList(q.items.toList).map { nel => + val ids = nel.map(id => fr"$id").reduceLeft(_ ++ fr"," ++ _) + fr"item_id in ($ids)" + } + + val folders = + NonEmptyList.fromList(q.folders.toList).map { nel => + val ids = nel.map(id => fr"$id").reduceLeft(_ ++ fr"," ++ _) + fr"folder_id in ($ids)" + } + + List(items, folders).flatten.foldLeft(coll)(_ ++ fr"AND" ++ _) + } + + private def mkQueryPart(q: FtsQuery): Fragment = + fr"websearch_to_tsquery(fts_config, ${q.q}) query" + + private def mkSelectRank: Fragment = + fr"ts_rank_cd(text_index, query, 4)" + + def replaceChunk(r: Chunk[FtsRecord]): ConnectionIO[Int] = + r.traverse(replace).map(_.foldLeft(0)(_ + _)) + + def replace(r: FtsRecord): ConnectionIO[Int] = + (fr"INSERT INTO $table (id,item_id,collective,lang,attach_id,folder_id,attach_name,attach_content,item_name,item_notes,fts_config) VALUES (" ++ + commas( + sql"${r.id}", + sql"${r.itemId}", + sql"${r.collective}", + sql"${r.language}", + sql"${r.attachId}", + sql"${r.folderId}", + sql"${r.attachName}", + sql"${r.attachContent}", + sql"${r.itemName}", + sql"${r.itemNotes}", + sql"${pgConfig(r.language)}::regconfig" + ) ++ sql") on conflict (id) do update set " ++ commas( + sql"lang = ${r.language}", + sql"folder_id = ${r.folderId}", + sql"attach_name = ${r.attachName}", + sql"attach_content = ${r.attachContent}", + sql"item_name = ${r.itemName}", + sql"item_notes = ${r.itemNotes}", + sql"fts_config = ${pgConfig(r.language)}::regconfig" + )).update.run + + def update(r: FtsRecord): ConnectionIO[Int] = + (fr"UPDATE $table SET" ++ commas( + sql"lang = ${r.language}", + sql"folder_id = ${r.folderId}", + sql"attach_name = ${r.attachName}", + sql"attach_content = ${r.attachContent}", + sql"item_name = ${r.itemName}", + sql"item_notes = ${r.itemNotes}", + sql"fts_config = ${pgConfig(r.language)}::regconfig" + ) ++ fr"WHERE id = ${r.id}").update.run + + def updateChunk(r: Chunk[FtsRecord]): ConnectionIO[Int] = + r.traverse(update).map(_.foldLeft(0)(_ + _)) + + def updateFolder( + itemId: Ident, + collective: Ident, + folder: Option[Ident] + ): ConnectionIO[Int] = + (sql"UPDATE $table" ++ + fr"SET folder_id = $folder" ++ + fr"WHERE item_id = $itemId AND collective = $collective").update.run + + def deleteByItemId(itemId: Ident): ConnectionIO[Int] = + sql"DELETE FROM $table WHERE item_id = $itemId".update.run + + def deleteByAttachId(attachId: Ident): ConnectionIO[Int] = + sql"DELETE FROM $table WHERE attach_id = $attachId".update.run + + def deleteAll: ConnectionIO[Int] = + sql"DELETE FROM $table".update.run + + def delete(collective: Ident): ConnectionIO[Int] = + sql"DELETE FROM $table WHERE collective = $collective".update.run + + def resetAll: ConnectionIO[Int] = { + val dropFlyway = sql"DROP TABLE IF EXISTS flyway_fts_history".update.run + val dropSearch = sql"DROP TABLE IF EXISTS $table".update.run + for { + a <- dropFlyway + b <- dropSearch + } yield a + b + } + + private def commas(fr: Fragment, frn: Fragment*): Fragment = + frn.foldLeft(fr)(_ ++ fr"," ++ _) + + def pgConfig(language: Language): String = + language match { + case Language.English => "english" + case Language.German => "german" + case Language.French => "french" + case Language.Italian => "italian" + case Language.Spanish => "spanish" + case Language.Hungarian => "hungarian" + case Language.Portuguese => "portuguese" + case Language.Czech => "simple" // ? + case Language.Danish => "danish" + case Language.Finnish => "finnish" + case Language.Norwegian => "norwegian" + case Language.Swedish => "swedish" + case Language.Russian => "russian" + case Language.Romanian => "romanian" + case Language.Dutch => "dutch" + case Language.Latvian => "lithuanian" // ? + case Language.Japanese => "simple" + case Language.Hebrew => "simple" + } +} diff --git a/modules/fts-psql/src/main/scala/docspell/ftspsql/PsqlConfig.scala b/modules/fts-psql/src/main/scala/docspell/ftspsql/PsqlConfig.scala new file mode 100644 index 00000000..136f919f --- /dev/null +++ b/modules/fts-psql/src/main/scala/docspell/ftspsql/PsqlConfig.scala @@ -0,0 +1,5 @@ +package docspell.ftspsql + +import docspell.common.{LenientUri, Password} + +case class PsqlConfig(url: LenientUri, user: String, password: Password) diff --git a/modules/fts-psql/src/main/scala/docspell/ftspsql/PsqlFtsClient.scala b/modules/fts-psql/src/main/scala/docspell/ftspsql/PsqlFtsClient.scala new file mode 100644 index 00000000..f16f170d --- /dev/null +++ b/modules/fts-psql/src/main/scala/docspell/ftspsql/PsqlFtsClient.scala @@ -0,0 +1,131 @@ +package docspell.ftspsql + +import cats.effect._ +import cats.implicits._ +import com.zaxxer.hikari.HikariDataSource +import docspell.common._ +import docspell.ftsclient._ +import docspell.logging.Logger +import doobie._ +import doobie.hikari.HikariTransactor +import doobie.implicits._ +import fs2.Stream + +import scala.concurrent.ExecutionContext + +final class PsqlFtsClient[F[_]: Sync](cfg: PsqlConfig, xa: Transactor[F]) + extends FtsClient[F] { + val engine = Ident.unsafe("postgres") + + def initialize: F[List[FtsMigration[F]]] = + Sync[F].pure( + List( + FtsMigration( + 0, + engine, + "initialize", + DbMigration[F](cfg).run.as(FtsMigration.Result.WorkDone) + ) + ) + ) + + def initializeNew: List[FtsMigration[F]] = + List( + FtsMigration( + 10, + engine, + "reset", + FtsRepository.resetAll.transact(xa).as(FtsMigration.Result.workDone) + ), + FtsMigration( + 20, + engine, + "schema", + DbMigration[F](cfg).run.as(FtsMigration.Result.workDone) + ), + FtsMigration(20, engine, "index all", FtsMigration.Result.indexAll.pure[F]) + ) + + def search(q: FtsQuery): F[FtsResult] = + for { + startNanos <- Sync[F].delay(System.nanoTime()) + summary <- FtsRepository.searchSummary(q).transact(xa) + results <- FtsRepository.search(q, true).transact(xa) + endNanos <- Sync[F].delay(System.nanoTime()) + duration = Duration.nanos(endNanos - startNanos) + res = SearchResult + .toFtsResult(summary, results) + .copy(qtime = duration) + } yield res + + def indexData(logger: Logger[F], data: Stream[F, TextData]): F[Unit] = + data + .map(FtsRecord.fromTextData) + .chunkN(50) + .evalMap(chunk => + logger.debug(s"Update fts index with ${chunk.size} records") *> FtsRepository + .replaceChunk(chunk) + .transact(xa) + ) + .compile + .drain + + def updateIndex(logger: Logger[F], data: Stream[F, TextData]): F[Unit] = + data + .map(FtsRecord.fromTextData) + .chunkN(50) + .evalMap(chunk => FtsRepository.updateChunk(chunk).transact(xa)) + .compile + .drain + + def updateFolder( + logger: Logger[F], + itemId: Ident, + collective: Ident, + folder: Option[Ident] + ): F[Unit] = + logger.debug(s"Update folder '${folder + .map(_.id)}' in fts for collective ${collective.id} and item ${itemId.id}") *> + FtsRepository.updateFolder(itemId, collective, folder).transact(xa).void + + def removeItem(logger: Logger[F], itemId: Ident): F[Unit] = + logger.debug(s"Removing item from fts index: ${itemId.id}") *> + FtsRepository.deleteByItemId(itemId).transact(xa).void + + def removeAttachment(logger: Logger[F], attachId: Ident): F[Unit] = + logger.debug(s"Removing attachment from fts index: ${attachId.id}") *> + FtsRepository.deleteByAttachId(attachId).transact(xa).void + + def clearAll(logger: Logger[F]): F[Unit] = + logger.info(s"Deleting complete FTS index") *> + FtsRepository.deleteAll.transact(xa).void + + def clear(logger: Logger[F], collective: Ident): F[Unit] = + logger.info(s"Deleting index for collective ${collective.id}") *> + FtsRepository.delete(collective).transact(xa).void +} + +object PsqlFtsClient { + def apply[F[_]: Async]( + cfg: PsqlConfig, + connectEC: ExecutionContext + ): Resource[F, PsqlFtsClient[F]] = { + val acquire = Sync[F].delay(new HikariDataSource()) + val free: HikariDataSource => F[Unit] = ds => Sync[F].delay(ds.close()) + + for { + ds <- Resource.make(acquire)(free) + _ = Resource.pure { + ds.setJdbcUrl(cfg.url.asString) + ds.setUsername(cfg.user) + ds.setPassword(cfg.password.pass) + ds.setDriverClassName("org.postgresql.Driver") + } + xa = HikariTransactor[F](ds, connectEC) + + pc = new PsqlFtsClient[F](cfg, xa) + // _ <- Resource.eval(st.migrate) + } yield pc + } + +} diff --git a/modules/fts-psql/src/main/scala/docspell/ftspsql/SearchResult.scala b/modules/fts-psql/src/main/scala/docspell/ftspsql/SearchResult.scala new file mode 100644 index 00000000..8d2fdab6 --- /dev/null +++ b/modules/fts-psql/src/main/scala/docspell/ftspsql/SearchResult.scala @@ -0,0 +1,47 @@ +package docspell.ftspsql + +import docspell.common._ +import docspell.ftsclient.FtsResult.{ItemMatch, MatchData} +import docspell.ftsclient.FtsResult + +final case class SearchResult( + id: Ident, + itemId: Ident, + collective: Ident, + language: Language, + attachId: Option[Ident], + folderId: Option[Ident], + attachName: Option[String], + itemName: Option[String], + rank: Double, + highlight: Option[String] +) + +object SearchResult { + + def toFtsResult(summary: SearchSummary, results: Vector[SearchResult]): FtsResult = { + def mkEntry(r: SearchResult): (ItemMatch, (Ident, List[String])) = { + def create(md: MatchData) = ItemMatch(r.id, r.itemId, r.collective, r.rank, md) + + val itemMatch = + r.attachId match { + case Some(aId) => + create(FtsResult.AttachmentData(aId, r.attachName.getOrElse(""))) + case None => + create(FtsResult.ItemData) + } + + (itemMatch, r.id -> r.highlight.toList) + } + + val (items, hl) = results.map(mkEntry).unzip + + FtsResult( + Duration.zero, + summary.count.toInt, + summary.maxScore, + hl.toMap, + items.toList + ) + } +} diff --git a/modules/fts-psql/src/main/scala/docspell/ftspsql/SearchSummary.scala b/modules/fts-psql/src/main/scala/docspell/ftspsql/SearchSummary.scala new file mode 100644 index 00000000..3e4e838c --- /dev/null +++ b/modules/fts-psql/src/main/scala/docspell/ftspsql/SearchSummary.scala @@ -0,0 +1,3 @@ +package docspell.ftspsql + +case class SearchSummary(count: Long, maxScore: Double) diff --git a/modules/fts-psql/src/test/scala/docspell/ftspsql/MigrationTest.scala b/modules/fts-psql/src/test/scala/docspell/ftspsql/MigrationTest.scala new file mode 100644 index 00000000..b21c9368 --- /dev/null +++ b/modules/fts-psql/src/test/scala/docspell/ftspsql/MigrationTest.scala @@ -0,0 +1,31 @@ +package docspell.ftspsql + +import cats.effect._ +import cats.effect.unsafe.implicits._ +import docspell.logging.{Level, LogConfig} +//import cats.implicits._ +import com.dimafeng.testcontainers.PostgreSQLContainer +import com.dimafeng.testcontainers.munit.TestContainerForAll +import docspell.common._ +import docspell.logging.TestLoggingConfig +import munit.FunSuite +import org.testcontainers.utility.DockerImageName + +class MigrationTest extends FunSuite with TestContainerForAll with TestLoggingConfig { + override val containerDef: PostgreSQLContainer.Def = + PostgreSQLContainer.Def(DockerImageName.parse("postgres:14")) + + override def docspellLogConfig: LogConfig = + LogConfig(Level.Debug, LogConfig.Format.Fancy) + + override def rootMinimumLevel = Level.Warn + + test("create schema") { + withContainers { cnt => + val jdbc = + PsqlConfig(LenientUri.unsafe(cnt.jdbcUrl), cnt.username, Password(cnt.password)) + + new DbMigration[IO](jdbc).run.void.unsafeRunSync() + } + } +} diff --git a/modules/fts-solr/src/main/scala/docspell/ftssolr/JsonCodec.scala b/modules/fts-solr/src/main/scala/docspell/ftssolr/JsonCodec.scala index 31c093d9..0d972749 100644 --- a/modules/fts-solr/src/main/scala/docspell/ftssolr/JsonCodec.scala +++ b/modules/fts-solr/src/main/scala/docspell/ftssolr/JsonCodec.scala @@ -22,7 +22,7 @@ trait JsonCodec { new Encoder[TextData.Attachment] { final def apply(td: TextData.Attachment): Json = { val cnt = - (Field.contentField(td.lang).name, Json.fromString(td.text.getOrElse(""))) + (Field.contentField(td.language).name, Json.fromString(td.text.getOrElse(""))) Json.fromFields( cnt :: List( @@ -165,7 +165,7 @@ trait JsonCodec { val setter = List( td.name.map(n => (Field.attachmentName.name, Map("set" -> n.asJson).asJson)), td.text.map(txt => - (Field.contentField(td.lang).name, Map("set" -> txt.asJson).asJson) + (Field.contentField(td.language).name, Map("set" -> txt.asJson).asJson) ) ).flatten Json.fromFields( diff --git a/modules/joex/src/main/scala/docspell/joex/JoexTasks.scala b/modules/joex/src/main/scala/docspell/joex/JoexTasks.scala index 334943f7..c6ab41f4 100644 --- a/modules/joex/src/main/scala/docspell/joex/JoexTasks.scala +++ b/modules/joex/src/main/scala/docspell/joex/JoexTasks.scala @@ -7,13 +7,13 @@ package docspell.joex import cats.effect.{Async, Resource} - import docspell.analysis.TextAnalyser import docspell.backend.fulltext.CreateIndex import docspell.backend.ops._ import docspell.common._ import docspell.ftsclient.FtsClient -import docspell.ftssolr.SolrFtsClient +import docspell.ftspsql.{PsqlConfig, PsqlFtsClient} +//import docspell.ftssolr.SolrFtsClient import docspell.joex.analysis.RegexNerFile import docspell.joex.emptytrash.EmptyTrashTask import docspell.joex.filecopy.{FileCopyTask, FileIntegrityCheckTask} @@ -33,7 +33,6 @@ import docspell.pubsub.api.PubSubT import docspell.scheduler.impl.JobStoreModuleBuilder import docspell.scheduler.{JobStoreModule, JobTask, JobTaskRegistry} import docspell.store.Store - import emil.Emil import org.http4s.client.Client @@ -221,7 +220,7 @@ object JoexTasks { joex <- OJoex(pubSub) store = jobStoreModule.store upload <- OUpload(store, jobStoreModule.jobs) - fts <- createFtsClient(cfg)(httpClient) + fts <- createFtsClient(cfg, store) createIndex <- CreateIndex.resource(fts, store) itemOps <- OItem(store, fts, createIndex, jobStoreModule.jobs) itemSearchOps <- OItemSearch(store) @@ -249,8 +248,17 @@ object JoexTasks { ) private def createFtsClient[F[_]: Async]( - cfg: Config - )(client: Client[F]): Resource[F, FtsClient[F]] = - if (cfg.fullTextSearch.enabled) SolrFtsClient(cfg.fullTextSearch.solr, client) + cfg: Config, + store: Store[F] /*, + client: Client[F] */ + ): Resource[F, FtsClient[F]] = + // if (cfg.fullTextSearch.enabled) SolrFtsClient(cfg.fullTextSearch.solr, client) + if (cfg.fullTextSearch.enabled) + Resource.pure[F, FtsClient[F]]( + new PsqlFtsClient[F]( + PsqlConfig(cfg.jdbc.url, cfg.jdbc.user, Password(cfg.jdbc.password)), + store.transactor + ) + ) else Resource.pure[F, FtsClient[F]](FtsClient.none[F]) } diff --git a/modules/restserver/src/main/scala/docspell/restserver/RestAppImpl.scala b/modules/restserver/src/main/scala/docspell/restserver/RestAppImpl.scala index 7e6c7025..6016afb6 100644 --- a/modules/restserver/src/main/scala/docspell/restserver/RestAppImpl.scala +++ b/modules/restserver/src/main/scala/docspell/restserver/RestAppImpl.scala @@ -9,11 +9,12 @@ package docspell.restserver import cats.effect._ import fs2.Stream import fs2.concurrent.Topic - import docspell.backend.BackendApp import docspell.backend.auth.{AuthToken, ShareToken} +import docspell.common.Password import docspell.ftsclient.FtsClient -import docspell.ftssolr.SolrFtsClient +import docspell.ftspsql.{PsqlConfig, PsqlFtsClient} +//import docspell.ftssolr.SolrFtsClient import docspell.notification.api.NotificationModule import docspell.notification.impl.NotificationModuleImpl import docspell.oidc.CodeFlowRoutes @@ -25,7 +26,6 @@ import docspell.restserver.webapp.{TemplateRoutes, Templates, WebjarRoutes} import docspell.restserver.ws.{OutputEvent, WebSocketRoutes} import docspell.scheduler.impl.JobStoreModuleBuilder import docspell.store.Store - import emil.javamail.JavaMailEmil import org.http4s.HttpRoutes import org.http4s.client.Client @@ -163,7 +163,7 @@ object RestAppImpl { val logger = docspell.logging.getLogger[F](s"restserver-${cfg.appId.id}") for { - ftsClient <- createFtsClient(cfg)(httpClient) + ftsClient <- createFtsClient(cfg, store) pubSubT = PubSubT(pubSub, logger) javaEmil = JavaMailEmil(cfg.backend.mailSettings) notificationMod <- Resource.eval( @@ -188,8 +188,21 @@ object RestAppImpl { } private def createFtsClient[F[_]: Async]( - cfg: Config - )(client: Client[F]): Resource[F, FtsClient[F]] = - if (cfg.fullTextSearch.enabled) SolrFtsClient(cfg.fullTextSearch.solr, client) + cfg: Config, + store: Store[F] /*, client: Client[F] */ + ): Resource[F, FtsClient[F]] = + // if (cfg.fullTextSearch.enabled) SolrFtsClient(cfg.fullTextSearch.solr, client) + if (cfg.fullTextSearch.enabled) + Resource.pure[F, FtsClient[F]]( + new PsqlFtsClient[F]( + PsqlConfig( + cfg.backend.jdbc.url, + cfg.backend.jdbc.user, + Password(cfg.backend.jdbc.password) + ), + store.transactor + ) + ) else Resource.pure[F, FtsClient[F]](FtsClient.none[F]) + } diff --git a/modules/store/src/main/scala/docspell/store/Store.scala b/modules/store/src/main/scala/docspell/store/Store.scala index b7f611e3..d41e83d2 100644 --- a/modules/store/src/main/scala/docspell/store/Store.scala +++ b/modules/store/src/main/scala/docspell/store/Store.scala @@ -34,6 +34,8 @@ trait Store[F[_]] { ): FileRepository[F] def add(insert: ConnectionIO[Int], exists: ConnectionIO[Boolean]): F[AddResult] + + def transactor: Transactor[F] } object Store { diff --git a/modules/store/src/main/scala/docspell/store/impl/StoreImpl.scala b/modules/store/src/main/scala/docspell/store/impl/StoreImpl.scala index 87703a8b..d68ef6e3 100644 --- a/modules/store/src/main/scala/docspell/store/impl/StoreImpl.scala +++ b/modules/store/src/main/scala/docspell/store/impl/StoreImpl.scala @@ -24,8 +24,9 @@ final class StoreImpl[F[_]: Async]( val fileRepo: FileRepository[F], jdbc: JdbcConfig, ds: DataSource, - xa: Transactor[F] + val transactor: Transactor[F] ) extends Store[F] { + private[this] val xa = transactor def createFileRepository( cfg: FileRepositoryConfig, diff --git a/modules/store/src/test/scala/docspell/store/migrate/PostgresqlMigrateTest.scala b/modules/store/src/test/scala/docspell/store/migrate/PostgresqlMigrateTest.scala index 1ba69f55..235b240e 100644 --- a/modules/store/src/test/scala/docspell/store/migrate/PostgresqlMigrateTest.scala +++ b/modules/store/src/test/scala/docspell/store/migrate/PostgresqlMigrateTest.scala @@ -23,7 +23,7 @@ class PostgresqlMigrateTest with TestContainerForAll with TestLoggingConfig { override val containerDef: PostgreSQLContainer.Def = - PostgreSQLContainer.Def(DockerImageName.parse("postgres:13")) + PostgreSQLContainer.Def(DockerImageName.parse("postgres:14")) test("postgres empty schema migration") { assume(Docker.existsUnsafe, "docker doesn't exist!")