Working PoC of a PostgreSQL-based full-text search backend

eikek 2022-03-20 21:44:32 +01:00
parent b2add008ed
commit 029335e607
17 changed files with 601 additions and 18 deletions

View File

@@ -619,6 +619,20 @@ val ftssolr = project
)
.dependsOn(common, ftsclient)
val ftspsql = project
.in(file("modules/fts-psql"))
.disablePlugins(RevolverPlugin)
.settings(sharedSettings)
.withTestSettings
.settings(
name := "docspell-fts-psql",
libraryDependencies ++=
Dependencies.doobie ++
Dependencies.postgres ++
Dependencies.flyway
)
.dependsOn(common, ftsclient, store % "compile->test;test->test")
val restapi = project
.in(file("modules/restapi"))
.disablePlugins(RevolverPlugin)
@@ -769,6 +783,7 @@ val joex = project
joexapi,
restapi,
ftssolr,
ftspsql,
pubsubNaive,
notificationImpl,
schedulerImpl
@@ -841,6 +856,7 @@ val restserver = project
backend,
webapp,
ftssolr,
ftspsql,
oidc,
pubsubNaive,
notificationImpl,
@@ -926,6 +942,7 @@ val root = project
analysis,
ftsclient,
ftssolr,
ftspsql,
files,
store,
joexapi,

View File

@@ -0,0 +1,27 @@
create table "ftspsql_search"(
"id" varchar(254) not null primary key,
"item_id" varchar(254) not null,
"collective" varchar(254) not null,
"lang" varchar(254) not null,
"attach_id" varchar(254),
"folder_id" varchar(254),
"updated_at" timestamptz not null default current_timestamp,
--- content columns
"attach_name" text,
"attach_content" text,
"item_name" text,
"item_notes" text,
--- index column
"fts_config" regconfig not null,
"text_index" tsvector
generated always as (
setweight(to_tsvector("fts_config", coalesce("attach_name", '')), 'B') ||
setweight(to_tsvector("fts_config", coalesce("item_name", '')), 'B') ||
setweight(to_tsvector("fts_config", coalesce("attach_content", '')), 'C') ||
setweight(to_tsvector("fts_config", coalesce("item_notes", '')), 'C')) stored
);
create index "ftspsql_search_ftsidx" on "ftspsql_search" using GIN ("text_index");
create index "ftpsql_search_item_idx" on "ftspsql_search"("item_id");
create index "ftpsql_search_attach_idx" on "ftspsql_search"("attach_id");
create index "ftpsql_search_folder_idx" on "ftspsql_search"("folder_id");

View File

@@ -0,0 +1,36 @@
package docspell.ftspsql
import cats.effect._
import cats.implicits._
import org.flywaydb.core.Flyway
import org.flywaydb.core.api.output.MigrateResult
final class DbMigration[F[_]: Sync](cfg: PsqlConfig) {
private[this] val logger = docspell.logging.getLogger[F]
private val location: String = "classpath:db/psqlfts"
def run: F[MigrateResult] =
for {
fw <- createFlyway
_ <- logger.info(s"Running FTS migrations")
result <- Sync[F].blocking(fw.migrate())
} yield result
def createFlyway: F[Flyway] =
for {
_ <- logger.info(s"Creating Flyway for: $location")
fw = Flyway
.configure()
.table("flyway_fts_history")
.cleanDisabled(true)
.dataSource(cfg.url.asString, cfg.user, cfg.password.pass)
.locations(location)
.baselineOnMigrate(true)
.load()
} yield fw
}
object DbMigration {
def apply[F[_]: Sync](cfg: PsqlConfig): DbMigration[F] =
new DbMigration[F](cfg)
}
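A hedged usage sketch (the JDBC URL and credentials are placeholders): the migration applies the SQL above through Flyway, recorded in flyway_fts_history so re-running it is a no-op:
import cats.effect.IO
val cfg = PsqlConfig(
  LenientUri.unsafe("jdbc:postgresql://localhost/docspell"), // placeholder
  "dbuser",
  Password("dbpass")
)
DbMigration[IO](cfg).run // idempotent; Flyway skips versions already applied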

View File

@@ -0,0 +1,26 @@
package docspell.ftspsql
import docspell.common._
import doobie._
import doobie.util.log.Success
trait DoobieMeta {
implicit val sqlLogging: LogHandler = LogHandler {
case e @ Success(_, _, _, _) =>
DoobieMeta.logger.trace("SQL " + e)
case e =>
DoobieMeta.logger.error(s"SQL Failure: $e")
}
implicit val metaIdent: Meta[Ident] =
Meta[String].timap(Ident.unsafe)(_.id)
implicit val metaLanguage: Meta[Language] =
Meta[String].timap(Language.unsafe)(_.iso3)
}
object DoobieMeta {
private val logger = org.log4s.getLogger
}

View File

@@ -0,0 +1,58 @@
package docspell.ftspsql
import cats.syntax.all._
import docspell.common.{Ident, Language}
import docspell.ftsclient.TextData
final case class FtsRecord(
id: String,
itemId: Ident,
collective: Ident,
language: Language,
attachId: Option[Ident],
folderId: Option[Ident],
attachName: Option[String],
attachContent: Option[String],
itemName: Option[String],
itemNotes: Option[String]
)
object FtsRecord {
def fromTextData(td: TextData): FtsRecord =
td match {
case TextData.Attachment(
item,
attachId,
collective,
folder,
language,
name,
text
) =>
FtsRecord(
td.id.id,
item,
collective,
language,
attachId.some,
folder,
name,
text,
None,
None
)
case TextData.Item(item, collective, folder, name, notes, language) =>
FtsRecord(
td.id.id,
item,
collective,
language,
None,
folder,
None,
None,
name,
notes
)
}
}
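For illustration, a hedged sketch of the mapping, assuming TextData.Item is a case class with the constructor order used in the match above (the identifier values are made up):
// item-level data carries no attachment, so only item_name/item_notes are set
val td = TextData.Item(itemId, collective, None, Some("Invoice March"), None, Language.English)
FtsRecord.fromTextData(td)
// == FtsRecord(td.id.id, itemId, collective, Language.English,
//      None, None, None, None, Some("Invoice March"), None)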

View File

@@ -0,0 +1,178 @@
package docspell.ftspsql
import cats.data.NonEmptyList
import docspell.common._
import docspell.ftsclient.FtsQuery
import doobie._
import doobie.implicits._
import fs2.Chunk
object FtsRepository extends DoobieMeta {
val table = fr"ftspsql_search"
def searchSummary(q: FtsQuery): ConnectionIO[SearchSummary] = {
val selectRank = mkSelectRank
val query = mkQueryPart(q)
sql"""select count(id), max($selectRank)
|from $table, $query
|where ${mkCondition(q)} AND query @@ text_index
|""".stripMargin
.query[SearchSummary]
.unique
}
def search(
q: FtsQuery,
withHighlighting: Boolean
): ConnectionIO[Vector[SearchResult]] = {
val selectRank = mkSelectRank
val hlOption =
s"startsel=${q.highlight.pre},stopsel=${q.highlight.post}"
val selectHl =
if (!withHighlighting) fr"null as highlight"
else
fr"""ts_headline(
| fts_config,
| coalesce(attach_name, '') ||
| ' ' || coalesce(attach_content, '') ||
| ' ' || coalesce(item_name, '') ||
| ' ' || coalesce(item_notes, ''), query, $hlOption) as highlight""".stripMargin
val select =
fr"id, item_id, collective, lang, attach_id, folder_id, attach_name, item_name, $selectRank as rank, $selectHl"
val query = mkQueryPart(q)
sql"""select $select
|from $table, $query
|where ${mkCondition(q)} AND query @@ text_index
|order by rank desc
|limit ${q.limit}
|offset ${q.offset}
|""".stripMargin
.query[SearchResult]
.to[Vector]
}
private def mkCondition(q: FtsQuery): Fragment = {
val coll = fr"collective = ${q.collective}"
val items =
NonEmptyList.fromList(q.items.toList).map { nel =>
val ids = nel.map(id => fr"$id").reduceLeft(_ ++ fr"," ++ _)
fr"item_id in ($ids)"
}
val folders =
NonEmptyList.fromList(q.folders.toList).map { nel =>
val ids = nel.map(id => fr"$id").reduceLeft(_ ++ fr"," ++ _)
fr"folder_id in ($ids)"
}
List(items, folders).flatten.foldLeft(coll)(_ ++ fr"AND" ++ _)
}
private def mkQueryPart(q: FtsQuery): Fragment =
fr"websearch_to_tsquery(fts_config, ${q.q}) query"
private def mkSelectRank: Fragment =
fr"ts_rank_cd(text_index, query, 4)"
def replaceChunk(r: Chunk[FtsRecord]): ConnectionIO[Int] =
r.traverse(replace).map(_.foldLeft(0)(_ + _))
def replace(r: FtsRecord): ConnectionIO[Int] =
(fr"INSERT INTO $table (id,item_id,collective,lang,attach_id,folder_id,attach_name,attach_content,item_name,item_notes,fts_config) VALUES (" ++
commas(
sql"${r.id}",
sql"${r.itemId}",
sql"${r.collective}",
sql"${r.language}",
sql"${r.attachId}",
sql"${r.folderId}",
sql"${r.attachName}",
sql"${r.attachContent}",
sql"${r.itemName}",
sql"${r.itemNotes}",
sql"${pgConfig(r.language)}::regconfig"
) ++ sql") on conflict (id) do update set " ++ commas(
sql"lang = ${r.language}",
sql"folder_id = ${r.folderId}",
sql"attach_name = ${r.attachName}",
sql"attach_content = ${r.attachContent}",
sql"item_name = ${r.itemName}",
sql"item_notes = ${r.itemNotes}",
sql"fts_config = ${pgConfig(r.language)}::regconfig"
)).update.run
def update(r: FtsRecord): ConnectionIO[Int] =
(fr"UPDATE $table SET" ++ commas(
sql"lang = ${r.language}",
sql"folder_id = ${r.folderId}",
sql"attach_name = ${r.attachName}",
sql"attach_content = ${r.attachContent}",
sql"item_name = ${r.itemName}",
sql"item_notes = ${r.itemNotes}",
sql"fts_config = ${pgConfig(r.language)}::regconfig"
) ++ fr"WHERE id = ${r.id}").update.run
def updateChunk(r: Chunk[FtsRecord]): ConnectionIO[Int] =
r.traverse(update).map(_.foldLeft(0)(_ + _))
def updateFolder(
itemId: Ident,
collective: Ident,
folder: Option[Ident]
): ConnectionIO[Int] =
(sql"UPDATE $table" ++
fr"SET folder_id = $folder" ++
fr"WHERE item_id = $itemId AND collective = $collective").update.run
def deleteByItemId(itemId: Ident): ConnectionIO[Int] =
sql"DELETE FROM $table WHERE item_id = $itemId".update.run
def deleteByAttachId(attachId: Ident): ConnectionIO[Int] =
sql"DELETE FROM $table WHERE attach_id = $attachId".update.run
def deleteAll: ConnectionIO[Int] =
sql"DELETE FROM $table".update.run
def delete(collective: Ident): ConnectionIO[Int] =
sql"DELETE FROM $table WHERE collective = $collective".update.run
def resetAll: ConnectionIO[Int] = {
val dropFlyway = sql"DROP TABLE IF EXISTS flyway_fts_history".update.run
val dropSearch = sql"DROP TABLE IF EXISTS $table".update.run
for {
a <- dropFlyway
b <- dropSearch
} yield a + b
}
private def commas(fr: Fragment, frn: Fragment*): Fragment =
frn.foldLeft(fr)(_ ++ fr"," ++ _)
def pgConfig(language: Language): String =
language match {
case Language.English => "english"
case Language.German => "german"
case Language.French => "french"
case Language.Italian => "italian"
case Language.Spanish => "spanish"
case Language.Hungarian => "hungarian"
case Language.Portuguese => "portuguese"
case Language.Czech => "simple" // no built-in czech config; fall back to simple
case Language.Danish => "danish"
case Language.Finnish => "finnish"
case Language.Norwegian => "norwegian"
case Language.Swedish => "swedish"
case Language.Russian => "russian"
case Language.Romanian => "romanian"
case Language.Dutch => "dutch"
case Language.Latvian => "lithuanian" // no built-in latvian config; nearest available, questionable
case Language.Japanese => "simple"
case Language.Hebrew => "simple"
}
}
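Summary and result queries compose; a hedged sketch of running both in one transaction (PsqlFtsClient below issues them as two separate transactions, which is equivalent here since both only read):
def searchWithSummary(q: FtsQuery): ConnectionIO[(SearchSummary, Vector[SearchResult])] =
  for {
    summary <- FtsRepository.searchSummary(q)
    hits <- FtsRepository.search(q, withHighlighting = true)
  } yield (summary, hits)
// run with: searchWithSummary(q).transact(xa)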

View File

@@ -0,0 +1,5 @@
package docspell.ftspsql
import docspell.common.{LenientUri, Password}
case class PsqlConfig(url: LenientUri, user: String, password: Password)

View File

@@ -0,0 +1,131 @@
package docspell.ftspsql
import cats.effect._
import cats.implicits._
import com.zaxxer.hikari.HikariDataSource
import docspell.common._
import docspell.ftsclient._
import docspell.logging.Logger
import doobie._
import doobie.hikari.HikariTransactor
import doobie.implicits._
import fs2.Stream
import scala.concurrent.ExecutionContext
final class PsqlFtsClient[F[_]: Sync](cfg: PsqlConfig, xa: Transactor[F])
extends FtsClient[F] {
val engine = Ident.unsafe("postgres")
def initialize: F[List[FtsMigration[F]]] =
Sync[F].pure(
List(
FtsMigration(
0,
engine,
"initialize",
DbMigration[F](cfg).run.as(FtsMigration.Result.WorkDone)
)
)
)
def initializeNew: List[FtsMigration[F]] =
List(
FtsMigration(
10,
engine,
"reset",
FtsRepository.resetAll.transact(xa).as(FtsMigration.Result.workDone)
),
FtsMigration(
20,
engine,
"schema",
DbMigration[F](cfg).run.as(FtsMigration.Result.workDone)
),
FtsMigration(30, engine, "index all", FtsMigration.Result.indexAll.pure[F])
)
def search(q: FtsQuery): F[FtsResult] =
for {
startNanos <- Sync[F].delay(System.nanoTime())
summary <- FtsRepository.searchSummary(q).transact(xa)
results <- FtsRepository.search(q, true).transact(xa)
endNanos <- Sync[F].delay(System.nanoTime())
duration = Duration.nanos(endNanos - startNanos)
res = SearchResult
.toFtsResult(summary, results)
.copy(qtime = duration)
} yield res
def indexData(logger: Logger[F], data: Stream[F, TextData]): F[Unit] =
data
.map(FtsRecord.fromTextData)
.chunkN(50)
.evalMap(chunk =>
logger.debug(s"Update fts index with ${chunk.size} records") *> FtsRepository
.replaceChunk(chunk)
.transact(xa)
)
.compile
.drain
def updateIndex(logger: Logger[F], data: Stream[F, TextData]): F[Unit] =
data
.map(FtsRecord.fromTextData)
.chunkN(50)
.evalMap(chunk => FtsRepository.updateChunk(chunk).transact(xa))
.compile
.drain
def updateFolder(
logger: Logger[F],
itemId: Ident,
collective: Ident,
folder: Option[Ident]
): F[Unit] =
logger.debug(s"Update folder '${folder
.map(_.id)}' in fts for collective ${collective.id} and item ${itemId.id}") *>
FtsRepository.updateFolder(itemId, collective, folder).transact(xa).void
def removeItem(logger: Logger[F], itemId: Ident): F[Unit] =
logger.debug(s"Removing item from fts index: ${itemId.id}") *>
FtsRepository.deleteByItemId(itemId).transact(xa).void
def removeAttachment(logger: Logger[F], attachId: Ident): F[Unit] =
logger.debug(s"Removing attachment from fts index: ${attachId.id}") *>
FtsRepository.deleteByAttachId(attachId).transact(xa).void
def clearAll(logger: Logger[F]): F[Unit] =
logger.info(s"Deleting complete FTS index") *>
FtsRepository.deleteAll.transact(xa).void
def clear(logger: Logger[F], collective: Ident): F[Unit] =
logger.info(s"Deleting index for collective ${collective.id}") *>
FtsRepository.delete(collective).transact(xa).void
}
object PsqlFtsClient {
def apply[F[_]: Async](
cfg: PsqlConfig,
connectEC: ExecutionContext
): Resource[F, PsqlFtsClient[F]] = {
val acquire = Sync[F].delay(new HikariDataSource())
val free: HikariDataSource => F[Unit] = ds => Sync[F].delay(ds.close())
for {
ds <- Resource.make(acquire)(free)
// configure the data source as an effect before building the transactor
_ <- Resource.eval(Sync[F].delay {
ds.setJdbcUrl(cfg.url.asString)
ds.setUsername(cfg.user)
ds.setPassword(cfg.password.pass)
ds.setDriverClassName("org.postgresql.Driver")
})
xa = HikariTransactor[F](ds, connectEC)
pc = new PsqlFtsClient[F](cfg, xa)
// _ <- Resource.eval(st.migrate)
} yield pc
}
}
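A wiring sketch (connection values are placeholders; ExecutionContext.global stands in for whatever connect pool the application supplies):
import cats.effect.IO
import scala.concurrent.ExecutionContext
val cfg = PsqlConfig(
  LenientUri.unsafe("jdbc:postgresql://localhost/docspell"),
  "dbuser",
  Password("dbpass")
)
PsqlFtsClient[IO](cfg, ExecutionContext.global).use { fts =>
  val logger = docspell.logging.getLogger[IO]
  fts.clearAll(logger) // e.g. wipe the index before a full re-index
}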

View File

@@ -0,0 +1,47 @@
package docspell.ftspsql
import docspell.common._
import docspell.ftsclient.FtsResult.{ItemMatch, MatchData}
import docspell.ftsclient.FtsResult
final case class SearchResult(
id: Ident,
itemId: Ident,
collective: Ident,
language: Language,
attachId: Option[Ident],
folderId: Option[Ident],
attachName: Option[String],
itemName: Option[String],
rank: Double,
highlight: Option[String]
)
object SearchResult {
def toFtsResult(summary: SearchSummary, results: Vector[SearchResult]): FtsResult = {
def mkEntry(r: SearchResult): (ItemMatch, (Ident, List[String])) = {
def create(md: MatchData) = ItemMatch(r.id, r.itemId, r.collective, r.rank, md)
val itemMatch =
r.attachId match {
case Some(aId) =>
create(FtsResult.AttachmentData(aId, r.attachName.getOrElse("")))
case None =>
create(FtsResult.ItemData)
}
(itemMatch, r.id -> r.highlight.toList)
}
val (items, hl) = results.map(mkEntry).unzip
FtsResult(
Duration.zero,
summary.count.toInt,
summary.maxScore,
hl.toMap,
items.toList
)
}
}

View File

@@ -0,0 +1,3 @@
package docspell.ftspsql
case class SearchSummary(count: Long, maxScore: Double)

View File

@@ -0,0 +1,31 @@
package docspell.ftspsql
import cats.effect._
import cats.effect.unsafe.implicits._
import docspell.logging.{Level, LogConfig}
//import cats.implicits._
import com.dimafeng.testcontainers.PostgreSQLContainer
import com.dimafeng.testcontainers.munit.TestContainerForAll
import docspell.common._
import docspell.logging.TestLoggingConfig
import munit.FunSuite
import org.testcontainers.utility.DockerImageName
class MigrationTest extends FunSuite with TestContainerForAll with TestLoggingConfig {
override val containerDef: PostgreSQLContainer.Def =
PostgreSQLContainer.Def(DockerImageName.parse("postgres:14"))
override def docspellLogConfig: LogConfig =
LogConfig(Level.Debug, LogConfig.Format.Fancy)
override def rootMinimumLevel = Level.Warn
test("create schema") {
withContainers { cnt =>
val jdbc =
PsqlConfig(LenientUri.unsafe(cnt.jdbcUrl), cnt.username, Password(cnt.password))
new DbMigration[IO](jdbc).run.void.unsafeRunSync()
}
}
}

View File

@@ -22,7 +22,7 @@ trait JsonCodec {
new Encoder[TextData.Attachment] {
final def apply(td: TextData.Attachment): Json = {
val cnt =
- (Field.contentField(td.lang).name, Json.fromString(td.text.getOrElse("")))
+ (Field.contentField(td.language).name, Json.fromString(td.text.getOrElse("")))
Json.fromFields(
cnt :: List(
@@ -165,7 +165,7 @@
val setter = List(
td.name.map(n => (Field.attachmentName.name, Map("set" -> n.asJson).asJson)),
td.text.map(txt =>
- (Field.contentField(td.lang).name, Map("set" -> txt.asJson).asJson)
+ (Field.contentField(td.language).name, Map("set" -> txt.asJson).asJson)
)
).flatten
Json.fromFields(

View File

@@ -7,13 +7,13 @@
package docspell.joex
import cats.effect.{Async, Resource}
import docspell.analysis.TextAnalyser
import docspell.backend.fulltext.CreateIndex
import docspell.backend.ops._
import docspell.common._
import docspell.ftsclient.FtsClient
- import docspell.ftssolr.SolrFtsClient
+ import docspell.ftspsql.{PsqlConfig, PsqlFtsClient}
+ //import docspell.ftssolr.SolrFtsClient
import docspell.joex.analysis.RegexNerFile
import docspell.joex.emptytrash.EmptyTrashTask
import docspell.joex.filecopy.{FileCopyTask, FileIntegrityCheckTask}
@@ -33,7 +33,6 @@ import docspell.pubsub.api.PubSubT
import docspell.scheduler.impl.JobStoreModuleBuilder
import docspell.scheduler.{JobStoreModule, JobTask, JobTaskRegistry}
import docspell.store.Store
import emil.Emil
import org.http4s.client.Client
@@ -221,7 +220,7 @@ object JoexTasks {
joex <- OJoex(pubSub)
store = jobStoreModule.store
upload <- OUpload(store, jobStoreModule.jobs)
- fts <- createFtsClient(cfg)(httpClient)
+ fts <- createFtsClient(cfg, store)
createIndex <- CreateIndex.resource(fts, store)
itemOps <- OItem(store, fts, createIndex, jobStoreModule.jobs)
itemSearchOps <- OItemSearch(store)
@@ -249,8 +248,17 @@
)
private def createFtsClient[F[_]: Async](
- cfg: Config
- )(client: Client[F]): Resource[F, FtsClient[F]] =
- if (cfg.fullTextSearch.enabled) SolrFtsClient(cfg.fullTextSearch.solr, client)
+ cfg: Config,
+ store: Store[F] /*,
+ client: Client[F] */
+ ): Resource[F, FtsClient[F]] =
+ // if (cfg.fullTextSearch.enabled) SolrFtsClient(cfg.fullTextSearch.solr, client)
+ if (cfg.fullTextSearch.enabled)
+ Resource.pure[F, FtsClient[F]](
+ new PsqlFtsClient[F](
+ PsqlConfig(cfg.jdbc.url, cfg.jdbc.user, Password(cfg.jdbc.password)),
+ store.transactor
+ )
+ )
else Resource.pure[F, FtsClient[F]](FtsClient.none[F])
}

View File

@@ -9,11 +9,12 @@ package docspell.restserver
import cats.effect._
import fs2.Stream
import fs2.concurrent.Topic
import docspell.backend.BackendApp
import docspell.backend.auth.{AuthToken, ShareToken}
import docspell.common.Password
import docspell.ftsclient.FtsClient
- import docspell.ftssolr.SolrFtsClient
+ import docspell.ftspsql.{PsqlConfig, PsqlFtsClient}
+ //import docspell.ftssolr.SolrFtsClient
import docspell.notification.api.NotificationModule
import docspell.notification.impl.NotificationModuleImpl
import docspell.oidc.CodeFlowRoutes
@@ -25,7 +26,6 @@ import docspell.restserver.webapp.{TemplateRoutes, Templates, WebjarRoutes}
import docspell.restserver.ws.{OutputEvent, WebSocketRoutes}
import docspell.scheduler.impl.JobStoreModuleBuilder
import docspell.store.Store
import emil.javamail.JavaMailEmil
import org.http4s.HttpRoutes
import org.http4s.client.Client
@@ -163,7 +163,7 @@
val logger = docspell.logging.getLogger[F](s"restserver-${cfg.appId.id}")
for {
- ftsClient <- createFtsClient(cfg)(httpClient)
+ ftsClient <- createFtsClient(cfg, store)
pubSubT = PubSubT(pubSub, logger)
javaEmil = JavaMailEmil(cfg.backend.mailSettings)
notificationMod <- Resource.eval(
@@ -188,8 +188,21 @@
}
private def createFtsClient[F[_]: Async](
- cfg: Config
- )(client: Client[F]): Resource[F, FtsClient[F]] =
- if (cfg.fullTextSearch.enabled) SolrFtsClient(cfg.fullTextSearch.solr, client)
+ cfg: Config,
+ store: Store[F] /*, client: Client[F] */
+ ): Resource[F, FtsClient[F]] =
+ // if (cfg.fullTextSearch.enabled) SolrFtsClient(cfg.fullTextSearch.solr, client)
+ if (cfg.fullTextSearch.enabled)
+ Resource.pure[F, FtsClient[F]](
+ new PsqlFtsClient[F](
+ PsqlConfig(
+ cfg.backend.jdbc.url,
+ cfg.backend.jdbc.user,
+ Password(cfg.backend.jdbc.password)
+ ),
+ store.transactor
+ )
+ )
else Resource.pure[F, FtsClient[F]](FtsClient.none[F])
}

View File

@@ -34,6 +34,8 @@ trait Store[F[_]] {
): FileRepository[F]
def add(insert: ConnectionIO[Int], exists: ConnectionIO[Boolean]): F[AddResult]
+ def transactor: Transactor[F]
}
object Store {

View File

@@ -24,8 +24,9 @@ final class StoreImpl[F[_]: Async](
val fileRepo: FileRepository[F],
jdbc: JdbcConfig,
ds: DataSource,
- xa: Transactor[F]
+ val transactor: Transactor[F]
) extends Store[F] {
+ private[this] val xa = transactor
def createFileRepository(
cfg: FileRepositoryConfig,

View File

@@ -23,7 +23,7 @@ class PostgresqlMigrateTest
with TestContainerForAll
with TestLoggingConfig {
override val containerDef: PostgreSQLContainer.Def =
PostgreSQLContainer.Def(DockerImageName.parse("postgres:13"))
PostgreSQLContainer.Def(DockerImageName.parse("postgres:14"))
test("postgres empty schema migration") {
assume(Docker.existsUnsafe, "docker doesn't exist!")