Refactoring

This commit is contained in:
eikek
2022-05-31 22:42:35 +02:00
parent 78b19a0940
commit 66aab0c952
10 changed files with 244 additions and 132 deletions

View File

@ -0,0 +1,30 @@
/*
* Copyright 2020 Eike K. & Contributors
*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
package docspell.store.fts
import docspell.store.impl.DoobieMeta.jsonMeta
import doobie.Meta
import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder}
import io.circe.{Decoder, Encoder}
/** Highlighting context from a fulltext search.
*
* @param name
* the document name, either attachment name or "item"
* @param context
* lines with highlighting infos
*/
case class ContextEntry(name: String, context: List[String])

object ContextEntry {

  /** Circe decoder, derived semi-automatically from the case class fields. */
  implicit val jsonDecoder: Decoder[ContextEntry] =
    deriveDecoder[ContextEntry]

  /** Circe encoder, derived semi-automatically from the case class fields. */
  implicit val jsonEncoder: Encoder[ContextEntry] =
    deriveEncoder[ContextEntry]

  /** Doobie column mapping obtained from `jsonMeta`; presumably it relies on the JSON
    * codecs defined above (confirm against `DoobieMeta.jsonMeta`).
    */
  implicit val meta: Meta[ContextEntry] = jsonMeta[ContextEntry]
}

View File

@ -0,0 +1,88 @@
/*
* Copyright 2020 Eike K. & Contributors
*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
package docspell.store.fts
import cats.Foldable
import cats.data.NonEmptyList
import cats.effect.Sync
import cats.syntax.all._
import fs2.Pipe
import docspell.common._
import docspell.ftsclient.FtsResult
import docspell.store.Db
import docspell.store.qb.DSL._
import docspell.store.qb._
import doobie._
import doobie.implicits._
/** Temporary table used to store item ids fetched from fulltext search.
  *
  * @param id
  *   the id of the matching item
  * @param score
  *   the score of the match, if any
  * @param context
  *   the highlighting context of the match, if any
  */
case class RFtsResult(id: Ident, score: Option[Double], context: Option[ContextEntry])

object RFtsResult {
  private val cio: Sync[ConnectionIO] = Sync[ConnectionIO]

  /** Converts a single match of a fulltext search result into a table row.
    *
    * The context entry is named after the attachment for attachment matches and "item"
    * for item matches. It is only present when the result carries a non-empty highlight
    * entry for the match id.
    */
  def fromResult(result: FtsResult)(m: FtsResult.ItemMatch): RFtsResult = {
    val entryName = m.data match {
      case FtsResult.AttachmentData(_, attachName) => attachName
      case FtsResult.ItemData                      => "item"
    }
    val highlight =
      result.highlight
        .get(m.id)
        .filter(_.nonEmpty)
        .map(lines => ContextEntry(entryName, lines))

    RFtsResult(m.itemId, Some(m.score), highlight)
  }

  /** Delegates to `TempFtsOps.prepareTable` for the given database and table name. */
  def prepareTable(db: Db, name: String): Pipe[ConnectionIO, FtsResult, Table] =
    TempFtsOps.prepareTable(db, name)

  /** Query-builder definition of the temporary table, bound to a concrete database. */
  case class Table(tableName: String, alias: Option[String], dbms: Db) extends TableDef {
    val id: Column[Ident] = Column("id", this)
    val score: Column[Double] = Column("score", this)
    val context: Column[ContextEntry] = Column("context", this)

    val all: NonEmptyList[Column[_]] = NonEmptyList.of(id, score, context)

    /** Returns a copy of this table that uses the given alias. */
    def as(newAlias: String): Table = copy(alias = newAlias.some)

    /** Selects the database specific variant of the distinct CTE. */
    def distinctCte(name: String) =
      dbms.fold(
        TempFtsOps.distinctCtePg(this, name),
        TempFtsOps.distinctCteMaria(this, name),
        TempFtsOps.distinctCteH2(this, name)
      )

    /** A CTE named `name` that selects only the distinct ids of this table. */
    def distinctCteSimple(name: String) = {
      val renamed = copy(tableName = name)
      val distinctIds = Select(select(id), from(this)).distinct
      CteBind(renamed -> distinctIds)
    }

    /** Inserts all given rows via `TempFtsOps.insertBatch`. */
    def insertAll[F[_]: Foldable](rows: F[RFtsResult]): ConnectionIO[Int] =
      TempFtsOps.insertBatch(this, rows)

    /** Runs the drop statement built by `TempFtsOps.dropTable` for this table. */
    def dropTable: ConnectionIO[Int] =
      TempFtsOps.dropTable(Fragment.const0(tableName)).update.run

    /** Creates the indexes and afterwards runs the database specific follow-up step:
      * `analyzeTablePg` for the first `fold` branch, nothing for the other two.
      */
    def createIndex: ConnectionIO[Unit] = {
      val analyze = dbms.fold(TempFtsOps.analyzeTablePg(this), cio.unit, cio.unit)
      for {
        _ <- TempFtsOps.createIndex(this)
        _ <- analyze
      } yield ()
    }

    /** Inserts the matches of every incoming fulltext result, emitting the value
      * returned by `insertAll` for each of them.
      */
    def insert: Pipe[ConnectionIO, FtsResult, Int] =
      _.evalMap { res =>
        val rows = res.results.map(RFtsResult.fromResult(res))
        insertAll(rows)
      }
  }
}

View File

@ -4,97 +4,25 @@
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
package docspell.store.impl
package docspell.store.fts
import cats.Foldable
import cats.data.NonEmptyList
import cats.effect._
import cats.syntax.all._
import fs2.{Pipe, Stream}
import docspell.common.{Duration, Ident}
import docspell.common.Duration
import docspell.ftsclient.FtsResult
import docspell.store.Db
import docspell.store.fts.RFtsResult.Table
import docspell.store.qb.DSL._
import docspell.store.qb._
import doobie._
import doobie.implicits._
import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder}
import io.circe.{Decoder, Encoder}
/** Temporary table used to store item ids fetched from fulltext search */
object TempFtsTable {
private[fts] object TempFtsOps {
private[this] val logger = docspell.logging.getLogger[ConnectionIO]
case class Row(id: Ident, score: Option[Double], context: Option[ContextEntry])
object Row {
def from(result: FtsResult)(m: FtsResult.ItemMatch): Row = {
val context = m.data match {
case FtsResult.AttachmentData(_, attachName) =>
result.highlight
.get(m.id)
.filter(_.nonEmpty)
.map(str => ContextEntry(attachName, str))
case FtsResult.ItemData =>
result.highlight
.get(m.id)
.filter(_.nonEmpty)
.map(str => ContextEntry("item", str))
}
Row(m.itemId, m.score.some, context)
}
}
case class ContextEntry(name: String, context: List[String])
object ContextEntry {
implicit val jsonDecoder: Decoder[ContextEntry] = deriveDecoder
implicit val jsonEncoder: Encoder[ContextEntry] = deriveEncoder
implicit val meta: Meta[ContextEntry] =
jsonMeta[ContextEntry]
}
case class Table(tableName: String, alias: Option[String], dbms: Db) extends TableDef {
val id: Column[Ident] = Column("id", this)
val score: Column[Double] = Column("score", this)
val context: Column[ContextEntry] = Column("context", this)
val all: NonEmptyList[Column[_]] = NonEmptyList.of(id, score, context)
def as(newAlias: String): Table = copy(alias = Some(newAlias))
def distinctCte(name: String) =
dbms.fold(
TempFtsTable.distinctCtePg(this, name),
TempFtsTable.distinctCteMaria(this, name),
TempFtsTable.distinctCteH2(this, name)
)
def distinctCteSimple(name: String) =
CteBind(copy(tableName = name) -> Select(select(id), from(this)).distinct)
def insertAll[F[_]: Foldable](rows: F[Row]): ConnectionIO[Int] =
insertBatch(this, rows)
def dropTable: ConnectionIO[Int] =
TempFtsTable.dropTable(Fragment.const0(tableName)).update.run
def createIndex: ConnectionIO[Unit] = {
val analyze = dbms.fold(
TempFtsTable.analyzeTablePg(this),
cio.unit,
cio.unit
)
TempFtsTable.createIndex(this) *> analyze
}
def insert: Pipe[ConnectionIO, FtsResult, Int] =
in => in.evalMap(res => insertAll(res.results.map(Row.from(res))))
}
def createTable(db: Db, name: String): ConnectionIO[Table] = {
val stmt = db.fold(
createTablePostgreSQL(Fragment.const(name)),
@ -119,7 +47,7 @@ object TempFtsTable {
)
} yield tt
private def dropTable(name: Fragment): Fragment =
/** Builds the `DROP TABLE IF EXISTS` statement for the table named by the given
  * fragment. The statement is only constructed here; running it is left to the caller.
  */
def dropTable(name: Fragment): Fragment =
sql"""DROP TABLE IF EXISTS $name"""
private def createTableH2(name: Fragment): ConnectionIO[Int] =
@ -144,7 +72,7 @@ object TempFtsTable {
| context text
|) ON COMMIT DROP;""".stripMargin.update.run
private def createIndex(table: Table): ConnectionIO[Unit] = {
def createIndex(table: Table): ConnectionIO[Unit] = {
val tableName = Fragment.const0(table.tableName)
val idIdxName = Fragment.const0(s"${table.tableName}_id_idx")
@ -156,21 +84,67 @@ object TempFtsTable {
sql"CREATE INDEX IF NOT EXISTS $scoreIdxName ON $tableName($score)".update.run.void
}
private def analyzeTablePg(table: Table): ConnectionIO[Unit] = {
/** Runs an `ANALYZE` statement on the given table and discards the update count.
  * Judging by its name this is meant for PostgreSQL only.
  */
def analyzeTablePg(table: Table): ConnectionIO[Unit] = {
// the table name is spliced into the statement text; it is not a bind parameter
val tableName = Fragment.const0(table.tableName)
sql"ANALYZE $tableName".update.run.void
}
private def insertBatch[F[_]: Foldable](table: Table, rows: F[Row]) = {
// // slowest (9 runs, 6000 rows each, ~170ms)
// def insertBatch2[F[_]: Foldable](table: Table, rows: F[RFtsResult]) = {
// val sql =
// s"""INSERT INTO ${table.tableName}
// | (${table.id.name}, ${table.score.name}, ${table.context.name})
// | VALUES (?, ?, ?)""".stripMargin
//
// Update[RFtsResult](sql).updateMany(rows)
// }
// // better (~115ms)
// def insertBatch3[F[_]: Foldable](
// table: Table,
// rows: F[RFtsResult]
// ): ConnectionIO[Int] = {
// val values = rows
// .foldl(List.empty[Fragment]) { (res, row) =>
// sql"(${row.id},${row.score},${row.context})" :: res
// }
//
// DML.insertMulti(table, table.all, values)
// }
// ~96ms
/** Inserts all rows with one multi-row `INSERT ... VALUES (?,?,?),(?,?,?),...` statement
  * that is prepared and filled directly on the JDBC connection.
  *
  * An empty `rows` yields 0 without touching the database, because a `VALUES` clause
  * without tuples is not valid SQL. The prepared statement is always closed, also when
  * setting a parameter or executing the statement fails.
  *
  * @param table
  *   the target table; its name and column names are spliced into the statement text
  * @param rows
  *   the rows to insert
  * @return
  *   the update count reported by the driver, or 0 if there was nothing to insert
  */
def insertBatch[F[_]: Foldable](
    table: Table,
    rows: F[RFtsResult]
): ConnectionIO[Int] = {
  // one "(?,?,?)" group per row
  val placeholders = rows.foldl(List.empty[String]) { (res, _) =>
    "(?,?,?)" :: res
  }

  if (placeholders.isEmpty) doobie.free.FC.pure(0)
  else {
    val values = placeholders.mkString(",")
    val sql =
      s"""INSERT INTO ${table.tableName}
         |  (${table.id.name}, ${table.score.name}, ${table.context.name})
         | VALUES $values""".stripMargin

    val encoder = io.circe.Encoder[ContextEntry]
    doobie.free.FC.raw { conn =>
      val pst = conn.prepareStatement(sql)
      try {
        // `index` is the number of parameters already set; JDBC parameters are 1-based
        rows.foldl(0) { (index, row) =>
          pst.setString(index + 1, row.id.id)
          row.score match {
            case Some(d) => pst.setDouble(index + 2, d)
            case None    => pst.setNull(index + 2, java.sql.Types.DOUBLE)
          }
          row.context match {
            case Some(c) => pst.setString(index + 3, encoder(c).noSpaces)
            case None    => pst.setNull(index + 3, java.sql.Types.VARCHAR)
          }
          index + 3
        }
        pst.executeUpdate()
      } finally pst.close()
    }
  }
}
private def distinctCtePg(table: Table, name: String): CteBind =
def distinctCtePg(table: Table, name: String): CteBind =
CteBind(
table.copy(tableName = name) ->
Select(
@ -183,7 +157,7 @@ object TempFtsTable {
).groupBy(table.id)
)
private def distinctCteMaria(table: Table, name: String): CteBind =
def distinctCteMaria(table: Table, name: String): CteBind =
CteBind(
table.copy(tableName = name) ->
Select(
@ -196,7 +170,7 @@ object TempFtsTable {
).groupBy(table.id)
)
private def distinctCteH2(table: Table, name: String): CteBind =
def distinctCteH2(table: Table, name: String): CteBind =
CteBind(
table.copy(tableName = name) ->
Select(
@ -208,6 +182,4 @@ object TempFtsTable {
from(table)
).groupBy(table.id)
)
private val cio: Sync[ConnectionIO] = Sync[ConnectionIO]
}

View File

@ -41,6 +41,17 @@ object DML extends DoobieMeta {
): ConnectionIO[Int] =
insertFragment(table, cols, values).update.run
/** Inserts several rows with a single `INSERT INTO ... (cols) VALUES ...` statement.
  *
  * @param table
  *   the target table
  * @param cols
  *   the columns to fill, rendered without table prefix
  * @param values
  *   one fragment per row, joined with commas; each fragment is presumably expected to
  *   render a parenthesised tuple matching `cols` (verify against callers)
  * @return
  *   the update count of the statement; 0 when `values` is empty, in which case no
  *   statement is issued (a plain `reduce` would throw on an empty sequence)
  */
def insertMulti(
    table: TableDef,
    cols: Nel[Column[_]],
    values: Seq[Fragment]
): ConnectionIO[Int] =
  values.reduceOption(_ ++ comma ++ _) match {
    case None =>
      doobie.free.FC.pure(0)

    case Some(rows) =>
      val columns = cols
        .map(SelectExprBuilder.columnNoPrefix)
        .reduceLeft(_ ++ comma ++ _)

      (fr"INSERT INTO ${FromExprBuilder.buildTable(table)} (" ++
        columns ++
        fr") VALUES $rows").update.run
  }
def insertFragment(
table: TableDef,
cols: Nel[Column[_]],

View File

@ -6,7 +6,7 @@
package docspell.store.queries
import docspell.store.impl.TempFtsTable
import docspell.store.fts.RFtsResult
import docspell.store.qb.DSL._
import docspell.store.qb._
import docspell.store.records.RItem
@ -16,7 +16,7 @@ trait FtsSupport {
implicit final class SelectOps(select: Select) {
def joinFtsIdOnly(
itemTable: RItem.Table,
ftsTable: Option[TempFtsTable.Table]
ftsTable: Option[RFtsResult.Table]
): Select =
ftsTable match {
case Some(ftst) =>
@ -30,7 +30,7 @@ trait FtsSupport {
def joinFtsDetails(
itemTable: RItem.Table,
ftsTable: Option[TempFtsTable.Table]
ftsTable: Option[RFtsResult.Table]
): Select =
ftsTable match {
case Some(ftst) =>
@ -44,7 +44,7 @@ trait FtsSupport {
def ftsCondition(
itemTable: RItem.Table,
ftsTable: Option[TempFtsTable.Table]
ftsTable: Option[RFtsResult.Table]
): Select =
ftsTable match {
case Some(ftst) =>
@ -55,7 +55,7 @@ trait FtsSupport {
}
}
def cteTable(ftsTable: TempFtsTable.Table) =
def cteTable(ftsTable: RFtsResult.Table) =
ftsTable.copy(tableName = "cte_fts")
}

View File

@ -7,7 +7,7 @@
package docspell.store.queries
import docspell.common._
import docspell.store.impl.TempFtsTable.ContextEntry
import docspell.store.fts.ContextEntry
case class ListItem(
id: Ident,

View File

@ -18,7 +18,7 @@ import docspell.common.{FileKey, IdRef, _}
import docspell.query.ItemQuery.Expr.ValidItemStates
import docspell.query.{ItemQuery, ItemQueryDsl}
import docspell.store.Store
import docspell.store.impl.TempFtsTable
import docspell.store.fts.RFtsResult
import docspell.store.qb.DSL._
import docspell.store.qb._
import docspell.store.qb.generator.{ItemQueryGenerator, Tables}
@ -46,7 +46,7 @@ object QItem extends FtsSupport {
private val ti = RTagItem.as("ti")
private val meta = RFileMeta.as("fmeta")
private def orderSelect(ftsOpt: Option[TempFtsTable.Table]): OrderSelect =
private def orderSelect(ftsOpt: Option[RFtsResult.Table]): OrderSelect =
new OrderSelect {
val item = i
val fts = ftsOpt
@ -59,7 +59,7 @@ object QItem extends FtsSupport {
today: LocalDate,
maxNoteLen: Int,
batch: Batch,
ftsTable: Option[TempFtsTable.Table]
ftsTable: Option[RFtsResult.Table]
) = {
val cteFts = ftsTable.map(cteTable)
val sql =
@ -170,7 +170,7 @@ object QItem extends FtsSupport {
q: Query.Fix,
today: LocalDate,
noteMaxLen: Int,
ftsTable: Option[TempFtsTable.Table]
ftsTable: Option[RFtsResult.Table]
): Select.Ordered = {
val coll = q.account.collective
@ -322,7 +322,7 @@ object QItem extends FtsSupport {
): Stream[ConnectionIO, ListItem] =
queryItems(q, today, maxNoteLen, batch, None)
def searchStats(today: LocalDate, ftsTable: Option[TempFtsTable.Table])(
def searchStats(today: LocalDate, ftsTable: Option[RFtsResult.Table])(
q: Query
): ConnectionIO[SearchSummary] =
for {
@ -349,7 +349,7 @@ object QItem extends FtsSupport {
def searchTagCategorySummary(
today: LocalDate,
ftsTable: Option[TempFtsTable.Table]
ftsTable: Option[RFtsResult.Table]
)(q: Query): ConnectionIO[List[CategoryCount]] = {
val tagFrom =
from(ti)
@ -374,7 +374,7 @@ object QItem extends FtsSupport {
} yield existing ++ other.map(n => CategoryCount(n.some, 0))
}
def searchTagSummary(today: LocalDate, ftsTable: Option[TempFtsTable.Table])(
def searchTagSummary(today: LocalDate, ftsTable: Option[RFtsResult.Table])(
q: Query
): ConnectionIO[List[TagCount]] = {
val tagFrom =
@ -401,7 +401,7 @@ object QItem extends FtsSupport {
} yield existing ++ other.map(TagCount(_, 0))
}
def searchCountSummary(today: LocalDate, ftsTable: Option[TempFtsTable.Table])(
def searchCountSummary(today: LocalDate, ftsTable: Option[RFtsResult.Table])(
q: Query
): ConnectionIO[Int] =
findItemsBase(q.fix, today, 0, None).unwrap
@ -412,22 +412,22 @@ object QItem extends FtsSupport {
.query[Int]
.unique
def searchCorrOrgSummary(today: LocalDate, ftsTable: Option[TempFtsTable.Table])(
def searchCorrOrgSummary(today: LocalDate, ftsTable: Option[RFtsResult.Table])(
q: Query
): ConnectionIO[List[IdRefCount]] =
searchIdRefSummary(org.oid, org.name, i.corrOrg, today, ftsTable)(q)
def searchCorrPersonSummary(today: LocalDate, ftsTable: Option[TempFtsTable.Table])(
def searchCorrPersonSummary(today: LocalDate, ftsTable: Option[RFtsResult.Table])(
q: Query
): ConnectionIO[List[IdRefCount]] =
searchIdRefSummary(pers0.pid, pers0.name, i.corrPerson, today, ftsTable)(q)
def searchConcPersonSummary(today: LocalDate, ftsTable: Option[TempFtsTable.Table])(
def searchConcPersonSummary(today: LocalDate, ftsTable: Option[RFtsResult.Table])(
q: Query
): ConnectionIO[List[IdRefCount]] =
searchIdRefSummary(pers1.pid, pers1.name, i.concPerson, today, ftsTable)(q)
def searchConcEquipSummary(today: LocalDate, ftsTable: Option[TempFtsTable.Table])(
def searchConcEquipSummary(today: LocalDate, ftsTable: Option[RFtsResult.Table])(
q: Query
): ConnectionIO[List[IdRefCount]] =
searchIdRefSummary(equip.eid, equip.name, i.concEquipment, today, ftsTable)(q)
@ -437,7 +437,7 @@ object QItem extends FtsSupport {
nameCol: Column[String],
fkCol: Column[Ident],
today: LocalDate,
ftsTable: Option[TempFtsTable.Table]
ftsTable: Option[RFtsResult.Table]
)(q: Query): ConnectionIO[List[IdRefCount]] =
findItemsBase(q.fix, today, 0, None).unwrap
.joinFtsIdOnly(i, ftsTable)
@ -450,7 +450,7 @@ object QItem extends FtsSupport {
.query[IdRefCount]
.to[List]
def searchFolderSummary(today: LocalDate, ftsTable: Option[TempFtsTable.Table])(
def searchFolderSummary(today: LocalDate, ftsTable: Option[RFtsResult.Table])(
q: Query
): ConnectionIO[List[FolderCount]] = {
val fu = RUser.as("fu")
@ -465,7 +465,7 @@ object QItem extends FtsSupport {
.to[List]
}
def searchFieldSummary(today: LocalDate, ftsTable: Option[TempFtsTable.Table])(
def searchFieldSummary(today: LocalDate, ftsTable: Option[RFtsResult.Table])(
q: Query
): ConnectionIO[List[FieldStats]] = {
val fieldJoin =

View File

@ -8,7 +8,7 @@ package docspell.store.queries
import docspell.common._
import docspell.query.ItemQuery
import docspell.store.impl.TempFtsTable
import docspell.store.fts.RFtsResult
import docspell.store.qb.DSL._
import docspell.store.qb.{Column, OrderBy}
import docspell.store.records.RItem
@ -33,7 +33,7 @@ case class Query(fix: Query.Fix, cond: Query.QueryCond) {
object Query {
trait OrderSelect {
def item: RItem.Table
def fts: Option[TempFtsTable.Table]
def fts: Option[RFtsResult.Table]
def byDefault: OrderBy =
OrderBy.desc(coalesce(item.itemDate.s, item.created.s).s)

View File

@ -4,7 +4,7 @@
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
package docspell.store.impl
package docspell.store.fts
import java.time.LocalDate
@ -18,7 +18,7 @@ import docspell.ftsclient.FtsResult
import docspell.ftsclient.FtsResult.{AttachmentData, ItemMatch}
import docspell.logging.Level
import docspell.store._
import docspell.store.impl.TempFtsTable.Row
import docspell.store.fts.RFtsResult
import docspell.store.qb.DSL._
import docspell.store.qb._
import docspell.store.queries.{QItem, Query}
@ -26,7 +26,7 @@ import docspell.store.records.{RCollective, RItem}
import doobie._
class TempFtsTableTest extends DatabaseTest {
class TempFtsOpsTest extends DatabaseTest {
private[this] val logger = docspell.logging.getLogger[IO]
override def rootMinimumLevel = Level.Info
@ -53,12 +53,11 @@ class TempFtsTableTest extends DatabaseTest {
_ <- prepareItems(maria)
_ <- prepareItems(h2)
_ <- assertQueryItem(pg, ftsResults(10, 10))
_ <- assertQueryItem(pg, ftsResults(3000, 500))
_ <- assertQueryItem(pg, ftsResults(3000, 500))
// _ <- assertQueryItem(pg, ftsResults(3000, 500))
_ <- assertQueryItem(maria, ftsResults(10, 10))
_ <- assertQueryItem(maria, ftsResults(3000, 500))
// _ <- assertQueryItem(maria, ftsResults(3000, 500))
_ <- assertQueryItem(h2, ftsResults(10, 10))
_ <- assertQueryItem(h2, ftsResults(3000, 500))
// _ <- assertQueryItem(h2, ftsResults(3000, 500))
} yield ()
}
@ -74,19 +73,19 @@ class TempFtsTableTest extends DatabaseTest {
def assertCreateTempTable(store: Store[IO]) = {
val insertRows =
List(
Row(id("abc-def"), None, None),
Row(id("abc-123"), Some(1.56), None),
Row(id("zyx-321"), None, None)
RFtsResult(id("abc-def"), None, None),
RFtsResult(id("abc-123"), Some(1.56), None),
RFtsResult(id("zyx-321"), None, None)
)
val create =
for {
table <- TempFtsTable.createTable(store.dbms, "tt")
table <- TempFtsOps.createTable(store.dbms, "tt")
n <- table.insertAll(insertRows)
_ <- table.createIndex
rows <- Select(select(table.all), from(table))
.orderBy(table.id)
.build
.query[Row]
.query[RFtsResult]
.to[List]
} yield (n, rows)
@ -106,7 +105,7 @@ class TempFtsTableTest extends DatabaseTest {
today <- IO(LocalDate.now())
account = DocspellSystem.account
tempTable = ftsResults
.through(TempFtsTable.prepareTable(store.dbms, "fts_result"))
.through(TempFtsOps.prepareTable(store.dbms, "fts_result"))
.compile
.lastOrError
q = Query(Query.Fix(account, None, None), Query.QueryExpr(None))