mirror of
https://github.com/TheAnachronism/docspell.git
synced 2025-06-22 02:18:26 +00:00
Prepare for new search logic with feature toggle
This commit is contained in:
@ -0,0 +1,213 @@
|
||||
/*
|
||||
* Copyright 2020 Eike K. & Contributors
|
||||
*
|
||||
* SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
*/
|
||||
|
||||
package docspell.store.impl
|
||||
|
||||
import cats.Foldable
|
||||
import cats.data.NonEmptyList
|
||||
import cats.effect._
|
||||
import cats.syntax.all._
|
||||
import fs2.{Pipe, Stream}
|
||||
|
||||
import docspell.common.{Duration, Ident}
|
||||
import docspell.ftsclient.FtsResult
|
||||
import docspell.store.Db
|
||||
import docspell.store.qb.DSL._
|
||||
import docspell.store.qb._
|
||||
|
||||
import doobie._
|
||||
import doobie.implicits._
|
||||
import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder}
|
||||
import io.circe.{Decoder, Encoder}
|
||||
|
||||
/** Temporary table used to store item ids fetched from fulltext search */
|
||||
object TempFtsTable {
|
||||
private[this] val logger = docspell.logging.getLogger[ConnectionIO]
|
||||
|
||||
case class Row(id: Ident, score: Option[Double], context: Option[ContextEntry])
|
||||
object Row {
|
||||
def from(result: FtsResult)(m: FtsResult.ItemMatch): Row = {
|
||||
val context = m.data match {
|
||||
case FtsResult.AttachmentData(_, attachName) =>
|
||||
result.highlight
|
||||
.get(m.id)
|
||||
.filter(_.nonEmpty)
|
||||
.map(str => ContextEntry(attachName, str))
|
||||
|
||||
case FtsResult.ItemData =>
|
||||
result.highlight
|
||||
.get(m.id)
|
||||
.filter(_.nonEmpty)
|
||||
.map(str => ContextEntry("item", str))
|
||||
}
|
||||
Row(m.itemId, m.score.some, context)
|
||||
}
|
||||
}
|
||||
|
||||
case class ContextEntry(name: String, context: List[String])
|
||||
object ContextEntry {
|
||||
implicit val jsonDecoder: Decoder[ContextEntry] = deriveDecoder
|
||||
implicit val jsonEncoder: Encoder[ContextEntry] = deriveEncoder
|
||||
|
||||
implicit val meta: Meta[ContextEntry] =
|
||||
jsonMeta[ContextEntry]
|
||||
}
|
||||
|
||||
case class Table(tableName: String, alias: Option[String], dbms: Db) extends TableDef {
|
||||
val id: Column[Ident] = Column("id", this)
|
||||
val score: Column[Double] = Column("score", this)
|
||||
val context: Column[ContextEntry] = Column("context", this)
|
||||
|
||||
val all: NonEmptyList[Column[_]] = NonEmptyList.of(id, score, context)
|
||||
|
||||
def as(newAlias: String): Table = copy(alias = Some(newAlias))
|
||||
|
||||
def distinctCte(name: String) =
|
||||
dbms.fold(
|
||||
TempFtsTable.distinctCtePg(this, name),
|
||||
TempFtsTable.distinctCteMaria(this, name),
|
||||
TempFtsTable.distinctCteH2(this, name)
|
||||
)
|
||||
|
||||
def distinctCteSimple(name: String) =
|
||||
CteBind(copy(tableName = name) -> Select(select(id), from(this)).distinct)
|
||||
|
||||
def insertAll[F[_]: Foldable](rows: F[Row]): ConnectionIO[Int] =
|
||||
insertBatch(this, rows)
|
||||
|
||||
def dropTable: ConnectionIO[Int] =
|
||||
TempFtsTable.dropTable(Fragment.const0(tableName)).update.run
|
||||
|
||||
def createIndex: ConnectionIO[Unit] = {
|
||||
val analyze = dbms.fold(
|
||||
TempFtsTable.analyzeTablePg(this),
|
||||
cio.unit,
|
||||
cio.unit
|
||||
)
|
||||
|
||||
TempFtsTable.createIndex(this) *> analyze
|
||||
}
|
||||
|
||||
def insert: Pipe[ConnectionIO, FtsResult, Int] =
|
||||
in => in.evalMap(res => insertAll(res.results.map(Row.from(res))))
|
||||
}
|
||||
|
||||
def createTable(db: Db, name: String): ConnectionIO[Table] = {
|
||||
val stmt = db.fold(
|
||||
createTablePostgreSQL(Fragment.const(name)),
|
||||
createTableMariaDB(Fragment.const0(name)),
|
||||
createTableH2(Fragment.const0(name))
|
||||
)
|
||||
stmt.as(Table(name, None, db))
|
||||
}
|
||||
|
||||
def prepareTable(db: Db, name: String): Pipe[ConnectionIO, FtsResult, Table] =
|
||||
in =>
|
||||
for {
|
||||
timed <- Stream.eval(Duration.stopTime[ConnectionIO])
|
||||
tt <- Stream.eval(createTable(db, name))
|
||||
n <- in.through(tt.insert).foldMonoid
|
||||
_ <- Stream.eval(tt.createIndex)
|
||||
duration <- Stream.eval(timed)
|
||||
_ <- Stream.eval(
|
||||
logger.info(
|
||||
s"Creating temporary fts table ($n elements) took: ${duration.formatExact}"
|
||||
)
|
||||
)
|
||||
} yield tt
|
||||
|
||||
private def dropTable(name: Fragment): Fragment =
|
||||
sql"""DROP TABLE IF EXISTS $name"""
|
||||
|
||||
private def createTableH2(name: Fragment): ConnectionIO[Int] =
|
||||
sql"""${dropTable(name)}; CREATE LOCAL TEMPORARY TABLE $name (
|
||||
| id varchar not null,
|
||||
| score double precision,
|
||||
| context text
|
||||
|);""".stripMargin.update.run
|
||||
|
||||
private def createTableMariaDB(name: Fragment): ConnectionIO[Int] =
|
||||
dropTable(name).update.run *>
|
||||
sql"""CREATE TEMPORARY TABLE $name (
|
||||
| id varchar(254) not null,
|
||||
| score double,
|
||||
| context mediumtext
|
||||
|)""".stripMargin.update.run
|
||||
|
||||
private def createTablePostgreSQL(name: Fragment): ConnectionIO[Int] =
|
||||
sql"""CREATE TEMPORARY TABLE IF NOT EXISTS $name (
|
||||
| id varchar not null,
|
||||
| score double precision,
|
||||
| context text
|
||||
|) ON COMMIT DROP;""".stripMargin.update.run
|
||||
|
||||
private def createIndex(table: Table): ConnectionIO[Unit] = {
|
||||
val tableName = Fragment.const0(table.tableName)
|
||||
|
||||
val idIdxName = Fragment.const0(s"${table.tableName}_id_idx")
|
||||
val id = Fragment.const0(table.id.name)
|
||||
val scoreIdxName = Fragment.const0(s"${table.tableName}_score_idx")
|
||||
val score = Fragment.const0(table.score.name)
|
||||
|
||||
sql"CREATE INDEX IF NOT EXISTS $idIdxName ON $tableName($id)".update.run.void *>
|
||||
sql"CREATE INDEX IF NOT EXISTS $scoreIdxName ON $tableName($score)".update.run.void
|
||||
}
|
||||
|
||||
private def analyzeTablePg(table: Table): ConnectionIO[Unit] = {
|
||||
val tableName = Fragment.const0(table.tableName)
|
||||
sql"ANALYZE $tableName".update.run.void
|
||||
}
|
||||
|
||||
private def insertBatch[F[_]: Foldable](table: Table, rows: F[Row]) = {
|
||||
val sql =
|
||||
s"""INSERT INTO ${table.tableName}
|
||||
| (${table.id.name}, ${table.score.name}, ${table.context.name})
|
||||
| VALUES (?, ?, ?)""".stripMargin
|
||||
|
||||
Update[Row](sql).updateMany(rows)
|
||||
}
|
||||
|
||||
private def distinctCtePg(table: Table, name: String): CteBind =
|
||||
CteBind(
|
||||
table.copy(tableName = name) ->
|
||||
Select(
|
||||
select(
|
||||
table.id.s,
|
||||
max(table.score).as(table.score.name),
|
||||
rawFunction("string_agg", table.context.s, lit("','")).as(table.context.name)
|
||||
),
|
||||
from(table)
|
||||
).groupBy(table.id)
|
||||
)
|
||||
|
||||
private def distinctCteMaria(table: Table, name: String): CteBind =
|
||||
CteBind(
|
||||
table.copy(tableName = name) ->
|
||||
Select(
|
||||
select(
|
||||
table.id.s,
|
||||
max(table.score).as(table.score.name),
|
||||
rawFunction("group_concat", table.context.s).as(table.context.name)
|
||||
),
|
||||
from(table)
|
||||
).groupBy(table.id)
|
||||
)
|
||||
|
||||
private def distinctCteH2(table: Table, name: String): CteBind =
|
||||
CteBind(
|
||||
table.copy(tableName = name) ->
|
||||
Select(
|
||||
select(
|
||||
table.id.s,
|
||||
max(table.score).as(table.score.name),
|
||||
rawFunction("listagg", table.context.s, lit("','")).as(table.context.name)
|
||||
),
|
||||
from(table)
|
||||
).groupBy(table.id)
|
||||
)
|
||||
|
||||
private val cio: Sync[ConnectionIO] = Sync[ConnectionIO]
|
||||
}
|
@ -1,85 +0,0 @@
|
||||
package docspell.store.impl
|
||||
|
||||
import cats.Foldable
|
||||
import cats.data.NonEmptyList
|
||||
import cats.effect._
|
||||
import cats.syntax.all._
|
||||
import docspell.common.Ident
|
||||
import docspell.store.Db
|
||||
import docspell.store.qb.{Column, TableDef}
|
||||
import docspell.store.impl.DoobieMeta._
|
||||
import doobie._
|
||||
import doobie.implicits._
|
||||
|
||||
/** Temporary table used to store item ids fetched from fulltext search */
|
||||
object TempIdTable {
|
||||
case class Row(id: Ident)
|
||||
case class Table(tableName: String, alias: Option[String], dbms: Db) extends TableDef {
|
||||
val id: Column[Ident] = Column("id", this)
|
||||
|
||||
val all: NonEmptyList[Column[_]] = NonEmptyList.of(id)
|
||||
|
||||
def as(newAlias: String): Table = copy(alias = Some(newAlias))
|
||||
|
||||
def insertAll[F[_]: Foldable](rows: F[Row]): ConnectionIO[Int] =
|
||||
insertBatch(this, rows)
|
||||
|
||||
def dropTable: ConnectionIO[Int] =
|
||||
TempIdTable.dropTable(Fragment.const0(tableName)).update.run
|
||||
|
||||
def createIndex: ConnectionIO[Unit] = {
|
||||
val analyze = dbms.fold(
|
||||
TempIdTable.analyzeTablePg(this),
|
||||
Sync[ConnectionIO].unit,
|
||||
Sync[ConnectionIO].unit
|
||||
)
|
||||
|
||||
TempIdTable.createIndex(this) *> analyze
|
||||
}
|
||||
}
|
||||
|
||||
def createTable(db: Db, name: String): ConnectionIO[Table] = {
|
||||
val stmt = db.fold(
|
||||
createTablePostgreSQL(Fragment.const(name)),
|
||||
createTableMariaDB(Fragment.const0(name)),
|
||||
createTableH2(Fragment.const0(name))
|
||||
)
|
||||
stmt.as(Table(name, None, db))
|
||||
}
|
||||
|
||||
private def dropTable(name: Fragment): Fragment =
|
||||
sql"""DROP TABLE IF EXISTS $name"""
|
||||
|
||||
private def createTableH2(name: Fragment): ConnectionIO[Int] =
|
||||
sql"""${dropTable(name)}; CREATE LOCAL TEMPORARY TABLE $name (
|
||||
| id varchar not null
|
||||
|);""".stripMargin.update.run
|
||||
|
||||
private def createTableMariaDB(name: Fragment): ConnectionIO[Int] =
|
||||
dropTable(name).update.run *>
|
||||
sql"CREATE TEMPORARY TABLE $name (id varchar(254) not null);".update.run
|
||||
|
||||
private def createTablePostgreSQL(name: Fragment): ConnectionIO[Int] =
|
||||
sql"""CREATE TEMPORARY TABLE IF NOT EXISTS $name (
|
||||
| id varchar not null
|
||||
|) ON COMMIT DROP;""".stripMargin.update.run
|
||||
|
||||
private def createIndex(table: Table): ConnectionIO[Unit] = {
|
||||
val idxName = Fragment.const0(s"${table.tableName}_id_idx")
|
||||
val tableName = Fragment.const0(table.tableName)
|
||||
val col = Fragment.const0(table.id.name)
|
||||
sql"""CREATE INDEX IF NOT EXISTS $idxName ON $tableName($col);""".update.run.void
|
||||
}
|
||||
|
||||
private def analyzeTablePg(table: Table): ConnectionIO[Unit] = {
|
||||
val tableName = Fragment.const0(table.tableName)
|
||||
sql"ANALYZE $tableName".update.run.void
|
||||
}
|
||||
|
||||
private def insertBatch[F[_]: Foldable](table: Table, rows: F[Row]) = {
|
||||
val sql =
|
||||
s"INSERT INTO ${table.tableName} (${table.id.name}) VALUES (?)"
|
||||
|
||||
Update[Row](sql).updateMany(rows)
|
||||
}
|
||||
}
|
@ -43,6 +43,8 @@ object DBFunction {
|
||||
|
||||
case class Concat(exprs: NonEmptyList[SelectExpr]) extends DBFunction
|
||||
|
||||
case class Raw(name: String, exprs: NonEmptyList[SelectExpr]) extends DBFunction
|
||||
|
||||
sealed trait Operator
|
||||
object Operator {
|
||||
case object Plus extends Operator
|
||||
|
@ -122,6 +122,9 @@ trait DSL extends DoobieMeta {
|
||||
def concat(expr: SelectExpr, exprs: SelectExpr*): DBFunction =
|
||||
DBFunction.Concat(Nel.of(expr, exprs: _*))
|
||||
|
||||
def rawFunction(name: String, expr: SelectExpr, more: SelectExpr*): DBFunction =
|
||||
DBFunction.Raw(name, Nel.of(expr, more: _*))
|
||||
|
||||
def const[A](value: A)(implicit P: Put[A]): SelectExpr.SelectConstant[A] =
|
||||
SelectExpr.SelectConstant(value, None)
|
||||
|
||||
|
@ -61,6 +61,11 @@ object DBFunctionBuilder extends CommonBuilder {
|
||||
|
||||
case DBFunction.Sum(expr) =>
|
||||
sql"SUM(" ++ SelectExprBuilder.build(expr) ++ fr")"
|
||||
|
||||
case DBFunction.Raw(name, exprs) =>
|
||||
val n = Fragment.const0(name)
|
||||
val inner = exprs.map(SelectExprBuilder.build).toList.reduce(_ ++ comma ++ _)
|
||||
sql"$n($inner)"
|
||||
}
|
||||
|
||||
def buildOperator(op: DBFunction.Operator): Fragment =
|
||||
|
@ -0,0 +1,50 @@
|
||||
/*
|
||||
* Copyright 2020 Eike K. & Contributors
|
||||
*
|
||||
* SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
*/
|
||||
|
||||
package docspell.store.queries
|
||||
|
||||
import docspell.store.impl.TempFtsTable
|
||||
import docspell.store.qb.DSL._
|
||||
import docspell.store.qb._
|
||||
import docspell.store.records.RItem
|
||||
|
||||
trait FtsSupport {
|
||||
|
||||
implicit final class SelectOps(select: Select) {
|
||||
def joinFtsIdOnly(
|
||||
itemTable: RItem.Table,
|
||||
ftsTable: Option[TempFtsTable.Table]
|
||||
): Select =
|
||||
ftsTable match {
|
||||
case Some(ftst) =>
|
||||
val tt = cteTable(ftst)
|
||||
select
|
||||
.appendCte(ftst.distinctCteSimple(tt.tableName))
|
||||
.changeFrom(_.innerJoin(tt, itemTable.id === tt.id))
|
||||
case None =>
|
||||
select
|
||||
}
|
||||
|
||||
def joinFtsDetails(
|
||||
itemTable: RItem.Table,
|
||||
ftsTable: Option[TempFtsTable.Table]
|
||||
): Select =
|
||||
ftsTable match {
|
||||
case Some(ftst) =>
|
||||
val tt = cteTable(ftst)
|
||||
select
|
||||
.appendCte(ftst.distinctCte(tt.tableName))
|
||||
.changeFrom(_.innerJoin(tt, itemTable.id === tt.id))
|
||||
case None =>
|
||||
select
|
||||
}
|
||||
}
|
||||
|
||||
def cteTable(ftsTable: TempFtsTable.Table) =
|
||||
ftsTable.copy(tableName = "cte_fts")
|
||||
}
|
||||
|
||||
object FtsSupport extends FtsSupport
|
@ -7,6 +7,7 @@
|
||||
package docspell.store.queries
|
||||
|
||||
import docspell.common._
|
||||
import docspell.store.impl.TempFtsTable.ContextEntry
|
||||
|
||||
case class ListItem(
|
||||
id: Ident,
|
||||
@ -22,5 +23,21 @@ case class ListItem(
|
||||
concPerson: Option[IdRef],
|
||||
concEquip: Option[IdRef],
|
||||
folder: Option[IdRef],
|
||||
notes: Option[String]
|
||||
)
|
||||
notes: Option[String],
|
||||
context: Option[String]
|
||||
) {
|
||||
|
||||
def decodeContext: Option[Either[String, List[ContextEntry]]] =
|
||||
context.map(_.trim).filter(_.nonEmpty).map { str =>
|
||||
// This is a bit… well. The common denominator for the dbms used is string aggregation
|
||||
// when combining multiple matches. So the `ContextEntry` objects are concatenated and
|
||||
// separated by comma. TemplateFtsTable ensures than the single entries are all json
|
||||
// objects.
|
||||
val jsonStr = s"[ $str ]"
|
||||
io.circe.parser
|
||||
.decode[List[Option[ContextEntry]]](jsonStr)
|
||||
.left
|
||||
.map(_.getMessage)
|
||||
.map(_.flatten)
|
||||
}
|
||||
}
|
||||
|
@ -18,15 +18,17 @@ import docspell.common.{FileKey, IdRef, _}
|
||||
import docspell.query.ItemQuery.Expr.ValidItemStates
|
||||
import docspell.query.{ItemQuery, ItemQueryDsl}
|
||||
import docspell.store.Store
|
||||
import docspell.store.impl.TempFtsTable
|
||||
import docspell.store.qb.DSL._
|
||||
import docspell.store.qb._
|
||||
import docspell.store.qb.generator.{ItemQueryGenerator, Tables}
|
||||
import docspell.store.queries.Query.OrderSelect
|
||||
import docspell.store.records._
|
||||
|
||||
import doobie.implicits._
|
||||
import doobie.{Query => _, _}
|
||||
|
||||
object QItem {
|
||||
object QItem extends FtsSupport {
|
||||
private[this] val logger = docspell.logging.getLogger[ConnectionIO]
|
||||
|
||||
private val equip = REquipment.as("e")
|
||||
@ -44,6 +46,35 @@ object QItem {
|
||||
private val ti = RTagItem.as("ti")
|
||||
private val meta = RFileMeta.as("fmeta")
|
||||
|
||||
private def orderSelect(ftsOpt: Option[TempFtsTable.Table]): OrderSelect =
|
||||
new OrderSelect {
|
||||
val item = i
|
||||
val fts = ftsOpt
|
||||
}
|
||||
|
||||
private val emptyString: SelectExpr = const("")
|
||||
|
||||
def queryItems(
|
||||
q: Query,
|
||||
today: LocalDate,
|
||||
maxNoteLen: Int,
|
||||
batch: Batch,
|
||||
ftsTable: Option[TempFtsTable.Table]
|
||||
) = {
|
||||
val cteFts = ftsTable.map(cteTable)
|
||||
val sql =
|
||||
findItemsBase(q.fix, today, maxNoteLen, cteFts)
|
||||
.changeWhere(c => c && queryCondition(today, q.fix.account.collective, q.cond))
|
||||
.joinFtsDetails(i, ftsTable)
|
||||
.limit(batch)
|
||||
.build
|
||||
|
||||
logger.stream.debug(s"List $batch items: $sql").drain ++
|
||||
sql.query[ListItem].stream
|
||||
}
|
||||
|
||||
// ----
|
||||
|
||||
def countAttachmentsAndItems(items: Nel[Ident]): ConnectionIO[Int] =
|
||||
Select(count(a.id).s, from(a), a.itemId.in(items)).build
|
||||
.query[Int]
|
||||
@ -115,7 +146,12 @@ object QItem {
|
||||
ItemQuery.Expr.and(ValidItemStates, ItemQueryDsl.Q.itemIdsIn(nel.map(_.id)))
|
||||
val account = AccountId(collective, Ident.unsafe(""))
|
||||
|
||||
findItemsBase(Query.Fix(account, Some(expr), None), LocalDate.EPOCH, 0).build
|
||||
findItemsBase(
|
||||
Query.Fix(account, Some(expr), None),
|
||||
LocalDate.EPOCH,
|
||||
0,
|
||||
None
|
||||
).build
|
||||
.query[ListItem]
|
||||
.to[Vector]
|
||||
}
|
||||
@ -130,7 +166,12 @@ object QItem {
|
||||
cv.itemId === itemId
|
||||
).build.query[ItemFieldValue].to[Vector]
|
||||
|
||||
private def findItemsBase(q: Query.Fix, today: LocalDate, noteMaxLen: Int): Select = {
|
||||
private def findItemsBase(
|
||||
q: Query.Fix,
|
||||
today: LocalDate,
|
||||
noteMaxLen: Int,
|
||||
ftsTable: Option[TempFtsTable.Table]
|
||||
): Select.Ordered = {
|
||||
val coll = q.account.collective
|
||||
|
||||
Select(
|
||||
@ -154,8 +195,9 @@ object QItem {
|
||||
f.id.s,
|
||||
f.name.s,
|
||||
substring(i.notes.s, 1, noteMaxLen).s,
|
||||
q.orderAsc
|
||||
.map(of => coalesce(of(i).s, i.created.s).s)
|
||||
ftsTable.map(_.context.s).getOrElse(emptyString),
|
||||
q.order
|
||||
.map(f => f(orderSelect(ftsTable)).expr)
|
||||
.getOrElse(i.created.s)
|
||||
),
|
||||
from(i)
|
||||
@ -172,8 +214,8 @@ object QItem {
|
||||
)
|
||||
)
|
||||
).orderBy(
|
||||
q.orderAsc
|
||||
.map(of => OrderBy.asc(coalesce(of(i).s, i.created.s).s))
|
||||
q.order
|
||||
.map(of => of(orderSelect(ftsTable)))
|
||||
.getOrElse(OrderBy.desc(coalesce(i.itemDate.s, i.created.s).s))
|
||||
)
|
||||
}
|
||||
@ -184,7 +226,7 @@ object QItem {
|
||||
today: LocalDate,
|
||||
maxFiles: Int
|
||||
): Select =
|
||||
findItemsBase(q.fix, today, 0)
|
||||
findItemsBase(q.fix, today, 0, None)
|
||||
.changeFrom(_.innerJoin(a, a.itemId === i.id).innerJoin(as, a.id === as.id))
|
||||
.changeFrom(from =>
|
||||
ftype match {
|
||||
@ -277,26 +319,22 @@ object QItem {
|
||||
today: LocalDate,
|
||||
maxNoteLen: Int,
|
||||
batch: Batch
|
||||
): Stream[ConnectionIO, ListItem] = {
|
||||
val sql = findItemsBase(q.fix, today, maxNoteLen)
|
||||
.changeWhere(c => c && queryCondition(today, q.fix.account.collective, q.cond))
|
||||
.limit(batch)
|
||||
.build
|
||||
logger.stream.trace(s"List $batch items: $sql").drain ++
|
||||
sql.query[ListItem].stream
|
||||
}
|
||||
): Stream[ConnectionIO, ListItem] =
|
||||
queryItems(q, today, maxNoteLen, batch, None)
|
||||
|
||||
def searchStats(today: LocalDate)(q: Query): ConnectionIO[SearchSummary] =
|
||||
def searchStats(today: LocalDate, ftsTable: Option[TempFtsTable.Table])(
|
||||
q: Query
|
||||
): ConnectionIO[SearchSummary] =
|
||||
for {
|
||||
count <- searchCountSummary(today)(q)
|
||||
tags <- searchTagSummary(today)(q)
|
||||
cats <- searchTagCategorySummary(today)(q)
|
||||
fields <- searchFieldSummary(today)(q)
|
||||
folders <- searchFolderSummary(today)(q)
|
||||
orgs <- searchCorrOrgSummary(today)(q)
|
||||
corrPers <- searchCorrPersonSummary(today)(q)
|
||||
concPers <- searchConcPersonSummary(today)(q)
|
||||
concEquip <- searchConcEquipSummary(today)(q)
|
||||
count <- searchCountSummary(today, ftsTable)(q)
|
||||
tags <- searchTagSummary(today, ftsTable)(q)
|
||||
cats <- searchTagCategorySummary(today, ftsTable)(q)
|
||||
fields <- searchFieldSummary(today, ftsTable)(q)
|
||||
folders <- searchFolderSummary(today, ftsTable)(q)
|
||||
orgs <- searchCorrOrgSummary(today, ftsTable)(q)
|
||||
corrPers <- searchCorrPersonSummary(today, ftsTable)(q)
|
||||
concPers <- searchConcPersonSummary(today, ftsTable)(q)
|
||||
concEquip <- searchConcEquipSummary(today, ftsTable)(q)
|
||||
} yield SearchSummary(
|
||||
count,
|
||||
tags,
|
||||
@ -310,7 +348,8 @@ object QItem {
|
||||
)
|
||||
|
||||
def searchTagCategorySummary(
|
||||
today: LocalDate
|
||||
today: LocalDate,
|
||||
ftsTable: Option[TempFtsTable.Table]
|
||||
)(q: Query): ConnectionIO[List[CategoryCount]] = {
|
||||
val tagFrom =
|
||||
from(ti)
|
||||
@ -318,7 +357,8 @@ object QItem {
|
||||
.innerJoin(i, i.id === ti.itemId)
|
||||
|
||||
val catCloud =
|
||||
findItemsBase(q.fix, today, 0).unwrap
|
||||
findItemsBase(q.fix, today, 0, None).unwrap
|
||||
.joinFtsIdOnly(i, ftsTable)
|
||||
.withSelect(select(tag.category).append(countDistinct(i.id).as("num")))
|
||||
.changeFrom(_.prepend(tagFrom))
|
||||
.changeWhere(c => c && queryCondition(today, q.fix.account.collective, q.cond))
|
||||
@ -334,14 +374,17 @@ object QItem {
|
||||
} yield existing ++ other.map(n => CategoryCount(n.some, 0))
|
||||
}
|
||||
|
||||
def searchTagSummary(today: LocalDate)(q: Query): ConnectionIO[List[TagCount]] = {
|
||||
def searchTagSummary(today: LocalDate, ftsTable: Option[TempFtsTable.Table])(
|
||||
q: Query
|
||||
): ConnectionIO[List[TagCount]] = {
|
||||
val tagFrom =
|
||||
from(ti)
|
||||
.innerJoin(tag, tag.tid === ti.tagId)
|
||||
.innerJoin(i, i.id === ti.itemId)
|
||||
|
||||
val tagCloud =
|
||||
findItemsBase(q.fix, today, 0).unwrap
|
||||
findItemsBase(q.fix, today, 0, None).unwrap
|
||||
.joinFtsIdOnly(i, ftsTable)
|
||||
.withSelect(select(tag.all).append(countDistinct(i.id).as("num")))
|
||||
.changeFrom(_.prepend(tagFrom))
|
||||
.changeWhere(c => c && queryCondition(today, q.fix.account.collective, q.cond))
|
||||
@ -358,39 +401,46 @@ object QItem {
|
||||
} yield existing ++ other.map(TagCount(_, 0))
|
||||
}
|
||||
|
||||
def searchCountSummary(today: LocalDate)(q: Query): ConnectionIO[Int] =
|
||||
findItemsBase(q.fix, today, 0).unwrap
|
||||
def searchCountSummary(today: LocalDate, ftsTable: Option[TempFtsTable.Table])(
|
||||
q: Query
|
||||
): ConnectionIO[Int] =
|
||||
findItemsBase(q.fix, today, 0, None).unwrap
|
||||
.joinFtsIdOnly(i, ftsTable)
|
||||
.withSelect(Nel.of(count(i.id).as("num")))
|
||||
.changeWhere(c => c && queryCondition(today, q.fix.account.collective, q.cond))
|
||||
.build
|
||||
.query[Int]
|
||||
.unique
|
||||
|
||||
def searchCorrOrgSummary(today: LocalDate)(q: Query): ConnectionIO[List[IdRefCount]] =
|
||||
searchIdRefSummary(org.oid, org.name, i.corrOrg, today)(q)
|
||||
|
||||
def searchCorrPersonSummary(today: LocalDate)(
|
||||
def searchCorrOrgSummary(today: LocalDate, ftsTable: Option[TempFtsTable.Table])(
|
||||
q: Query
|
||||
): ConnectionIO[List[IdRefCount]] =
|
||||
searchIdRefSummary(pers0.pid, pers0.name, i.corrPerson, today)(q)
|
||||
searchIdRefSummary(org.oid, org.name, i.corrOrg, today, ftsTable)(q)
|
||||
|
||||
def searchConcPersonSummary(today: LocalDate)(
|
||||
def searchCorrPersonSummary(today: LocalDate, ftsTable: Option[TempFtsTable.Table])(
|
||||
q: Query
|
||||
): ConnectionIO[List[IdRefCount]] =
|
||||
searchIdRefSummary(pers1.pid, pers1.name, i.concPerson, today)(q)
|
||||
searchIdRefSummary(pers0.pid, pers0.name, i.corrPerson, today, ftsTable)(q)
|
||||
|
||||
def searchConcEquipSummary(today: LocalDate)(
|
||||
def searchConcPersonSummary(today: LocalDate, ftsTable: Option[TempFtsTable.Table])(
|
||||
q: Query
|
||||
): ConnectionIO[List[IdRefCount]] =
|
||||
searchIdRefSummary(equip.eid, equip.name, i.concEquipment, today)(q)
|
||||
searchIdRefSummary(pers1.pid, pers1.name, i.concPerson, today, ftsTable)(q)
|
||||
|
||||
def searchConcEquipSummary(today: LocalDate, ftsTable: Option[TempFtsTable.Table])(
|
||||
q: Query
|
||||
): ConnectionIO[List[IdRefCount]] =
|
||||
searchIdRefSummary(equip.eid, equip.name, i.concEquipment, today, ftsTable)(q)
|
||||
|
||||
private def searchIdRefSummary(
|
||||
idCol: Column[Ident],
|
||||
nameCol: Column[String],
|
||||
fkCol: Column[Ident],
|
||||
today: LocalDate
|
||||
today: LocalDate,
|
||||
ftsTable: Option[TempFtsTable.Table]
|
||||
)(q: Query): ConnectionIO[List[IdRefCount]] =
|
||||
findItemsBase(q.fix, today, 0).unwrap
|
||||
findItemsBase(q.fix, today, 0, None).unwrap
|
||||
.joinFtsIdOnly(i, ftsTable)
|
||||
.withSelect(select(idCol, nameCol).append(count(idCol).as("num")))
|
||||
.changeWhere(c =>
|
||||
c && fkCol.isNotNull && queryCondition(today, q.fix.account.collective, q.cond)
|
||||
@ -400,9 +450,12 @@ object QItem {
|
||||
.query[IdRefCount]
|
||||
.to[List]
|
||||
|
||||
def searchFolderSummary(today: LocalDate)(q: Query): ConnectionIO[List[FolderCount]] = {
|
||||
def searchFolderSummary(today: LocalDate, ftsTable: Option[TempFtsTable.Table])(
|
||||
q: Query
|
||||
): ConnectionIO[List[FolderCount]] = {
|
||||
val fu = RUser.as("fu")
|
||||
findItemsBase(q.fix, today, 0).unwrap
|
||||
findItemsBase(q.fix, today, 0, None).unwrap
|
||||
.joinFtsIdOnly(i, ftsTable)
|
||||
.withSelect(select(f.id, f.name, f.owner, fu.login).append(count(i.id).as("num")))
|
||||
.changeFrom(_.innerJoin(fu, fu.uid === f.owner))
|
||||
.changeWhere(c => c && queryCondition(today, q.fix.account.collective, q.cond))
|
||||
@ -412,14 +465,17 @@ object QItem {
|
||||
.to[List]
|
||||
}
|
||||
|
||||
def searchFieldSummary(today: LocalDate)(q: Query): ConnectionIO[List[FieldStats]] = {
|
||||
def searchFieldSummary(today: LocalDate, ftsTable: Option[TempFtsTable.Table])(
|
||||
q: Query
|
||||
): ConnectionIO[List[FieldStats]] = {
|
||||
val fieldJoin =
|
||||
from(cv)
|
||||
.innerJoin(cf, cf.id === cv.field)
|
||||
.innerJoin(i, i.id === cv.itemId)
|
||||
|
||||
val base =
|
||||
findItemsBase(q.fix, today, 0).unwrap
|
||||
findItemsBase(q.fix, today, 0, None).unwrap
|
||||
.joinFtsIdOnly(i, ftsTable)
|
||||
.changeFrom(_.prepend(fieldJoin))
|
||||
.changeWhere(c => c && queryCondition(today, q.fix.account.collective, q.cond))
|
||||
.groupBy(GroupBy(cf.all))
|
||||
@ -498,7 +554,7 @@ object QItem {
|
||||
)
|
||||
)
|
||||
|
||||
val from = findItemsBase(q.fix, today, maxNoteLen)
|
||||
val from = findItemsBase(q.fix, today, maxNoteLen, None)
|
||||
.appendCte(cte)
|
||||
.appendSelect(Tids.weight.s)
|
||||
.changeFrom(_.innerJoin(Tids, Tids.itemId === i.id))
|
||||
|
@ -8,7 +8,9 @@ package docspell.store.queries
|
||||
|
||||
import docspell.common._
|
||||
import docspell.query.ItemQuery
|
||||
import docspell.store.qb.Column
|
||||
import docspell.store.impl.TempFtsTable
|
||||
import docspell.store.qb.DSL._
|
||||
import docspell.store.qb.{Column, OrderBy}
|
||||
import docspell.store.records.RItem
|
||||
|
||||
case class Query(fix: Query.Fix, cond: Query.QueryCond) {
|
||||
@ -16,7 +18,7 @@ case class Query(fix: Query.Fix, cond: Query.QueryCond) {
|
||||
copy(cond = f(cond))
|
||||
|
||||
def withOrder(orderAsc: RItem.Table => Column[_]): Query =
|
||||
withFix(_.copy(orderAsc = Some(orderAsc)))
|
||||
withFix(_.copy(order = Some(_.byItemColumnAsc(orderAsc))))
|
||||
|
||||
def withFix(f: Query.Fix => Query.Fix): Query =
|
||||
copy(fix = f(fix))
|
||||
@ -29,6 +31,19 @@ case class Query(fix: Query.Fix, cond: Query.QueryCond) {
|
||||
}
|
||||
|
||||
object Query {
|
||||
trait OrderSelect {
|
||||
def item: RItem.Table
|
||||
def fts: Option[TempFtsTable.Table]
|
||||
|
||||
def byDefault: OrderBy =
|
||||
OrderBy.desc(coalesce(item.itemDate.s, item.created.s).s)
|
||||
|
||||
def byItemColumnAsc(f: RItem.Table => Column[_]): OrderBy =
|
||||
OrderBy.asc(coalesce(f(item).s, item.created.s).s)
|
||||
|
||||
def byScore: OrderBy =
|
||||
fts.map(t => OrderBy.desc(t.score.s)).getOrElse(byDefault)
|
||||
}
|
||||
|
||||
def apply(fix: Fix): Query =
|
||||
Query(fix, QueryExpr(None))
|
||||
@ -36,7 +51,7 @@ object Query {
|
||||
case class Fix(
|
||||
account: AccountId,
|
||||
query: Option[ItemQuery.Expr],
|
||||
orderAsc: Option[RItem.Table => Column[_]]
|
||||
order: Option[OrderSelect => OrderBy]
|
||||
) {
|
||||
|
||||
def isEmpty: Boolean =
|
||||
|
@ -1,19 +1,27 @@
|
||||
/*
|
||||
* Copyright 2020 Eike K. & Contributors
|
||||
*
|
||||
* SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
*/
|
||||
|
||||
package docspell.store
|
||||
|
||||
import java.util.UUID
|
||||
|
||||
import cats.effect._
|
||||
|
||||
import docspell.common._
|
||||
import docspell.logging.TestLoggingConfig
|
||||
|
||||
import com.dimafeng.testcontainers.munit.fixtures.TestContainersFixtures
|
||||
import com.dimafeng.testcontainers.{
|
||||
JdbcDatabaseContainer,
|
||||
MariaDBContainer,
|
||||
PostgreSQLContainer
|
||||
}
|
||||
import com.dimafeng.testcontainers.munit.fixtures.TestContainersFixtures
|
||||
import docspell.common._
|
||||
import docspell.logging.TestLoggingConfig
|
||||
import doobie._
|
||||
import munit.CatsEffectSuite
|
||||
import org.testcontainers.utility.DockerImageName
|
||||
import doobie._
|
||||
|
||||
import java.util.UUID
|
||||
|
||||
trait DatabaseTest
|
||||
extends CatsEffectSuite
|
||||
|
@ -1,3 +1,9 @@
|
||||
/*
|
||||
* Copyright 2020 Eike K. & Contributors
|
||||
*
|
||||
* SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
*/
|
||||
|
||||
package docspell.store
|
||||
|
||||
import cats.effect._
|
||||
|
@ -0,0 +1,200 @@
|
||||
/*
|
||||
* Copyright 2020 Eike K. & Contributors
|
||||
*
|
||||
* SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
*/
|
||||
|
||||
package docspell.store.impl
|
||||
|
||||
import java.time.LocalDate
|
||||
|
||||
import cats.effect.IO
|
||||
import cats.syntax.option._
|
||||
import cats.syntax.traverse._
|
||||
import fs2.Stream
|
||||
|
||||
import docspell.common._
|
||||
import docspell.ftsclient.FtsResult
|
||||
import docspell.ftsclient.FtsResult.{AttachmentData, ItemMatch}
|
||||
import docspell.logging.Level
|
||||
import docspell.store._
|
||||
import docspell.store.impl.TempFtsTable.Row
|
||||
import docspell.store.qb.DSL._
|
||||
import docspell.store.qb._
|
||||
import docspell.store.queries.{QItem, Query}
|
||||
import docspell.store.records.{RCollective, RItem}
|
||||
|
||||
import doobie._
|
||||
|
||||
class TempFtsTableTest extends DatabaseTest {
|
||||
private[this] val logger = docspell.logging.getLogger[IO]
|
||||
override def rootMinimumLevel = Level.Info
|
||||
|
||||
override def munitFixtures = postgresAll ++ mariaDbAll ++ h2All
|
||||
|
||||
def id(str: String): Ident = Ident.unsafe(str)
|
||||
|
||||
def stores: (Store[IO], Store[IO], Store[IO]) =
|
||||
(pgStore(), mariaStore(), h2Store())
|
||||
|
||||
test("create temporary table") {
|
||||
val (pg, maria, h2) = stores
|
||||
for {
|
||||
_ <- assertCreateTempTable(pg)
|
||||
_ <- assertCreateTempTable(maria)
|
||||
_ <- assertCreateTempTable(h2)
|
||||
} yield ()
|
||||
}
|
||||
|
||||
test("query items sql") {
|
||||
val (pg, maria, h2) = stores
|
||||
for {
|
||||
_ <- prepareItems(pg)
|
||||
_ <- prepareItems(maria)
|
||||
_ <- prepareItems(h2)
|
||||
_ <- assertQueryItem(pg, ftsResults(10, 10))
|
||||
_ <- assertQueryItem(pg, ftsResults(3000, 500))
|
||||
_ <- assertQueryItem(pg, ftsResults(3000, 500))
|
||||
_ <- assertQueryItem(maria, ftsResults(10, 10))
|
||||
_ <- assertQueryItem(maria, ftsResults(3000, 500))
|
||||
_ <- assertQueryItem(h2, ftsResults(10, 10))
|
||||
_ <- assertQueryItem(h2, ftsResults(3000, 500))
|
||||
} yield ()
|
||||
}
|
||||
|
||||
def prepareItems(store: Store[IO]) =
|
||||
for {
|
||||
_ <- store.transact(RCollective.insert(makeCollective(DocspellSystem.user)))
|
||||
items = (0 until 200)
|
||||
.map(makeItem(_, DocspellSystem.user))
|
||||
.toList
|
||||
_ <- items.traverse(i => store.transact(RItem.insert(i)))
|
||||
} yield ()
|
||||
|
||||
def assertCreateTempTable(store: Store[IO]) = {
|
||||
val insertRows =
|
||||
List(
|
||||
Row(id("abc-def"), None, None),
|
||||
Row(id("abc-123"), Some(1.56), None),
|
||||
Row(id("zyx-321"), None, None)
|
||||
)
|
||||
val create =
|
||||
for {
|
||||
table <- TempFtsTable.createTable(store.dbms, "tt")
|
||||
n <- table.insertAll(insertRows)
|
||||
_ <- table.createIndex
|
||||
rows <- Select(select(table.all), from(table))
|
||||
.orderBy(table.id)
|
||||
.build
|
||||
.query[Row]
|
||||
.to[List]
|
||||
} yield (n, rows)
|
||||
|
||||
val verify =
|
||||
store.transact(create).map { case (inserted, rows) =>
|
||||
if (store.dbms != Db.MariaDB) {
|
||||
assertEquals(inserted, 3)
|
||||
}
|
||||
assertEquals(rows, insertRows.sortBy(_.id))
|
||||
}
|
||||
|
||||
verify *> verify
|
||||
}
|
||||
|
||||
def assertQueryItem(store: Store[IO], ftsResults: Stream[ConnectionIO, FtsResult]) =
|
||||
for {
|
||||
today <- IO(LocalDate.now())
|
||||
account = DocspellSystem.account
|
||||
tempTable = ftsResults
|
||||
.through(TempFtsTable.prepareTable(store.dbms, "fts_result"))
|
||||
.compile
|
||||
.lastOrError
|
||||
q = Query(Query.Fix(account, None, None), Query.QueryExpr(None))
|
||||
timed <- Duration.stopTime[IO]
|
||||
items <- store
|
||||
.transact(
|
||||
tempTable.flatMap(t =>
|
||||
QItem
|
||||
.queryItems(q, today, 0, Batch.limit(10), t.some)
|
||||
.compile
|
||||
.to(List)
|
||||
)
|
||||
)
|
||||
duration <- timed
|
||||
_ <- logger.info(s"Join took: ${duration.formatExact}")
|
||||
|
||||
} yield {
|
||||
assert(items.nonEmpty)
|
||||
assert(items.head.context.isDefined)
|
||||
}
|
||||
|
||||
def ftsResult(start: Int, end: Int): FtsResult = {
|
||||
def matchData(n: Int): List[ItemMatch] =
|
||||
List(
|
||||
ItemMatch(
|
||||
id(s"m$n"),
|
||||
id(s"item-$n"),
|
||||
DocspellSystem.user,
|
||||
math.random(),
|
||||
FtsResult.ItemData
|
||||
),
|
||||
ItemMatch(
|
||||
id(s"m$n-1"),
|
||||
id(s"item-$n"),
|
||||
DocspellSystem.user,
|
||||
math.random(),
|
||||
AttachmentData(id(s"item-$n-attach-1"), "attachment.pdf")
|
||||
)
|
||||
)
|
||||
|
||||
val hl =
|
||||
(start until end)
|
||||
.flatMap(n =>
|
||||
List(
|
||||
id(s"m$n-1") -> List("this *a test* please"),
|
||||
id(s"m$n") -> List("only **items** here")
|
||||
)
|
||||
)
|
||||
.toMap
|
||||
|
||||
FtsResult.empty
|
||||
.copy(
|
||||
count = end,
|
||||
highlight = hl,
|
||||
results = (start until end).toList.flatMap(matchData)
|
||||
)
|
||||
}
|
||||
|
||||
def ftsResults(len: Int, chunkSize: Int): Stream[ConnectionIO, FtsResult] = {
|
||||
val chunks = len / chunkSize
|
||||
Stream.range(0, chunks).map { n =>
|
||||
val start = n * chunkSize
|
||||
val end = start + chunkSize
|
||||
ftsResult(start, end)
|
||||
}
|
||||
}
|
||||
|
||||
def makeCollective(cid: Ident): RCollective =
|
||||
RCollective(cid, CollectiveState.Active, Language.English, true, Timestamp.Epoch)
|
||||
|
||||
def makeItem(n: Int, cid: Ident): RItem =
|
||||
RItem(
|
||||
id(s"item-$n"),
|
||||
cid,
|
||||
s"item $n",
|
||||
None,
|
||||
"test",
|
||||
Direction.Incoming,
|
||||
ItemState.Created,
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
Timestamp.Epoch,
|
||||
Timestamp.Epoch,
|
||||
None,
|
||||
None
|
||||
)
|
||||
}
|
@ -1,55 +0,0 @@
|
||||
package docspell.store.impl
|
||||
|
||||
import cats.effect.IO
|
||||
import docspell.common.Ident
|
||||
import docspell.store._
|
||||
import docspell.store.impl.TempIdTable.Row
|
||||
import docspell.store.qb._
|
||||
import docspell.store.qb.DSL._
|
||||
|
||||
class TempIdTableTest extends DatabaseTest {
|
||||
|
||||
override def munitFixtures = postgresAll ++ mariaDbAll ++ h2All
|
||||
|
||||
def id(str: String): Ident = Ident.unsafe(str)
|
||||
|
||||
test("create temporary table postgres") {
|
||||
val store = pgStore()
|
||||
assertCreateTempTable(store)
|
||||
}
|
||||
|
||||
test("create temporary table mariadb") {
|
||||
val store = mariaStore()
|
||||
assertCreateTempTable(store)
|
||||
}
|
||||
|
||||
test("create temporary table h2") {
|
||||
val store = h2Store()
|
||||
assertCreateTempTable(store)
|
||||
}
|
||||
|
||||
def assertCreateTempTable(store: Store[IO]) = {
|
||||
val insertRows = List(Row(id("abc-def")), Row(id("abc-123")), Row(id("zyx-321")))
|
||||
val create =
|
||||
for {
|
||||
table <- TempIdTable.createTable(store.dbms, "tt")
|
||||
n <- table.insertAll(insertRows)
|
||||
_ <- table.createIndex
|
||||
rows <- Select(select(table.all), from(table))
|
||||
.orderBy(table.id)
|
||||
.build
|
||||
.query[Row]
|
||||
.to[List]
|
||||
} yield (n, rows)
|
||||
|
||||
val verify =
|
||||
store.transact(create).map { case (inserted, rows) =>
|
||||
if (store.dbms != Db.MariaDB) {
|
||||
assertEquals(inserted, 3)
|
||||
}
|
||||
assertEquals(rows, insertRows.sortBy(_.id))
|
||||
}
|
||||
|
||||
verify *> verify
|
||||
}
|
||||
}
|
@ -7,7 +7,9 @@
|
||||
package docspell.store.migrate
|
||||
|
||||
import cats.effect._
|
||||
|
||||
import docspell.store.{DatabaseTest, SchemaMigrateConfig, StoreFixture}
|
||||
|
||||
import org.flywaydb.core.api.output.MigrateResult
|
||||
|
||||
class MigrateTest extends DatabaseTest {
|
||||
|
Reference in New Issue
Block a user