mirror of
https://github.com/TheAnachronism/docspell.git
synced 2025-06-21 18:08:25 +00:00
Introducing fts client into codebase
This commit is contained in:
@ -364,4 +364,12 @@ docspell.joex {
|
||||
# By default all files are allowed.
|
||||
valid-mime-types = [ ]
|
||||
}
|
||||
|
||||
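# Configuration of the full-text search engine. `enabled` is the
# on/off switch for the feature and `solr.url` is the URL of the
# Solr core that receives the extracted text.
#
# NOTE(review): in the joex code of this change the extracted text
# is sent to the fts client without consulting `enabled` -- confirm
# that the flag is honored somewhere before relying on it.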
|
||||
full-text-search {
|
||||
enabled = true
|
||||
solr = {
|
||||
url = "http://localhost:8983/solr/docspell_core"
|
||||
}
|
||||
}
|
||||
}
|
@ -8,6 +8,7 @@ import docspell.convert.ConvertConfig
|
||||
import docspell.extract.ExtractConfig
|
||||
import docspell.joex.hk.HouseKeepingConfig
|
||||
import docspell.backend.Config.Files
|
||||
import docspell.ftssolr.SolrConfig
|
||||
|
||||
case class Config(
|
||||
appId: Ident,
|
||||
@ -23,7 +24,8 @@ case class Config(
|
||||
convert: ConvertConfig,
|
||||
sendMail: MailSendConfig,
|
||||
files: Files,
|
||||
mailDebug: Boolean
|
||||
mailDebug: Boolean,
|
||||
fullTextSearch: Config.FullTextSearch
|
||||
)
|
||||
|
||||
object Config {
|
||||
@ -34,4 +36,6 @@ object Config {
|
||||
math.min(mailChunkSize, maxMails)
|
||||
}
|
||||
/** Groups user-task settings; its only member is the [[ScanMailbox]] value. */
case class UserTasks(scanMailbox: ScanMailbox)
|
||||
|
||||
/** Settings of the `full-text-search` config section.
  *
  * @param enabled the `enabled` flag of the section
  * @param solr    the Solr settings (the `solr` sub-section), handed to the
  *                Solr fts client when the joex app is created
  */
case class FullTextSearch(enabled: Boolean, solr: SolrConfig)
|
||||
}
|
||||
|
@ -14,8 +14,10 @@ import docspell.joexapi.client.JoexClient
|
||||
import docspell.store.Store
|
||||
import docspell.store.queue._
|
||||
import docspell.store.records.RJobLog
|
||||
import docspell.ftssolr.SolrFtsClient
|
||||
import fs2.concurrent.SignallingRef
|
||||
import scala.concurrent.ExecutionContext
|
||||
import org.http4s.client.blaze.BlazeClientBuilder
|
||||
|
||||
final class JoexAppImpl[F[_]: ConcurrentEffect: ContextShift: Timer](
|
||||
cfg: Config,
|
||||
@ -63,13 +65,15 @@ object JoexAppImpl {
|
||||
blocker: Blocker
|
||||
): Resource[F, JoexApp[F]] =
|
||||
for {
|
||||
client <- JoexClient.resource(clientEC)
|
||||
httpClient <- BlazeClientBuilder[F](clientEC).resource
|
||||
client = JoexClient(httpClient)
|
||||
store <- Store.create(cfg.jdbc, connectEC, blocker)
|
||||
queue <- JobQueue(store)
|
||||
pstore <- PeriodicTaskStore.create(store)
|
||||
nodeOps <- ONode(store)
|
||||
joex <- OJoex(client, store)
|
||||
upload <- OUpload(store, queue, cfg.files, joex)
|
||||
fts <- SolrFtsClient(cfg.fullTextSearch.solr, httpClient)
|
||||
javaEmil =
|
||||
JavaMailEmil(blocker, Settings.defaultSettings.copy(debug = cfg.mailDebug))
|
||||
sch <- SchedulerBuilder(cfg.scheduler, blocker, store)
|
||||
@ -77,7 +81,7 @@ object JoexAppImpl {
|
||||
.withTask(
|
||||
JobTask.json(
|
||||
ProcessItemArgs.taskName,
|
||||
ItemHandler.newItem[F](cfg),
|
||||
ItemHandler.newItem[F](cfg, fts),
|
||||
ItemHandler.onCancel[F]
|
||||
)
|
||||
)
|
||||
|
@ -8,6 +8,7 @@ import docspell.joex.Config
|
||||
import docspell.joex.scheduler.Task
|
||||
import docspell.store.queries.QItem
|
||||
import docspell.store.records.RItem
|
||||
import docspell.ftsclient.FtsClient
|
||||
|
||||
object ItemHandler {
|
||||
def onCancel[F[_]: Sync: ContextShift]: Task[F, ProcessItemArgs, Unit] =
|
||||
@ -16,11 +17,11 @@ object ItemHandler {
|
||||
)
|
||||
|
||||
def newItem[F[_]: ConcurrentEffect: ContextShift](
|
||||
cfg: Config
|
||||
cfg: Config, fts: FtsClient[F]
|
||||
): Task[F, ProcessItemArgs, Unit] =
|
||||
CreateItem[F]
|
||||
.flatMap(itemStateTask(ItemState.Processing))
|
||||
.flatMap(safeProcess[F](cfg))
|
||||
.flatMap(safeProcess[F](cfg, fts))
|
||||
.map(_ => ())
|
||||
|
||||
def itemStateTask[F[_]: Sync, A](
|
||||
@ -36,11 +37,11 @@ object ItemHandler {
|
||||
Task(_.isLastRetry)
|
||||
|
||||
def safeProcess[F[_]: ConcurrentEffect: ContextShift](
|
||||
cfg: Config
|
||||
cfg: Config, fts: FtsClient[F]
|
||||
)(data: ItemData): Task[F, ProcessItemArgs, ItemData] =
|
||||
isLastRetry[F].flatMap {
|
||||
case true =>
|
||||
ProcessItem[F](cfg)(data).attempt.flatMap({
|
||||
ProcessItem[F](cfg, fts)(data).attempt.flatMap({
|
||||
case Right(d) =>
|
||||
Task.pure(d)
|
||||
case Left(ex) =>
|
||||
@ -50,7 +51,7 @@ object ItemHandler {
|
||||
.andThen(_ => Sync[F].raiseError(ex))
|
||||
})
|
||||
case false =>
|
||||
ProcessItem[F](cfg)(data).flatMap(itemStateTask(ItemState.Created))
|
||||
ProcessItem[F](cfg, fts)(data).flatMap(itemStateTask(ItemState.Created))
|
||||
}
|
||||
|
||||
def deleteByFileIds[F[_]: Sync: ContextShift]: Task[F, ProcessItemArgs, Unit] =
|
||||
|
@ -5,17 +5,18 @@ import docspell.common.ProcessItemArgs
|
||||
import docspell.analysis.TextAnalysisConfig
|
||||
import docspell.joex.scheduler.Task
|
||||
import docspell.joex.Config
|
||||
import docspell.ftsclient.FtsClient
|
||||
|
||||
object ProcessItem {
|
||||
|
||||
def apply[F[_]: ConcurrentEffect: ContextShift](
|
||||
cfg: Config
|
||||
cfg: Config, fts: FtsClient[F]
|
||||
)(item: ItemData): Task[F, ProcessItemArgs, ItemData] =
|
||||
ExtractArchive(item)
|
||||
.flatMap(Task.setProgress(20))
|
||||
.flatMap(ConvertPdf(cfg.convert, _))
|
||||
.flatMap(Task.setProgress(40))
|
||||
.flatMap(TextExtraction(cfg.extraction, _))
|
||||
.flatMap(TextExtraction(cfg.extraction, fts))
|
||||
.flatMap(Task.setProgress(60))
|
||||
.flatMap(analysisOnly[F](cfg.textAnalysis))
|
||||
.flatMap(Task.setProgress(80))
|
||||
|
@ -1,19 +1,20 @@
|
||||
package docspell.joex.process
|
||||
|
||||
import fs2.Stream
|
||||
import bitpeace.{Mimetype, RangeDef}
|
||||
import cats.data.OptionT
|
||||
import cats.implicits._
|
||||
import cats.effect.{ContextShift, Sync}
|
||||
import cats.effect._
|
||||
import docspell.common._
|
||||
import docspell.extract.{ExtractConfig, ExtractResult, Extraction}
|
||||
import docspell.joex.scheduler.{Context, Task}
|
||||
import docspell.store.records.{RAttachment, RAttachmentMeta, RFileMeta}
|
||||
import docspell.store.syntax.MimeTypes._
|
||||
import docspell.ftsclient.{FtsClient, TextData}
|
||||
|
||||
object TextExtraction {
|
||||
|
||||
def apply[F[_]: Sync: ContextShift](
|
||||
cfg: ExtractConfig,
|
||||
def apply[F[_]: ConcurrentEffect: ContextShift](cfg: ExtractConfig, fts: FtsClient[F])(
|
||||
item: ItemData
|
||||
): Task[F, ProcessItemArgs, ItemData] =
|
||||
Task { ctx =>
|
||||
@ -23,8 +24,20 @@ object TextExtraction {
|
||||
txt <- item.attachments.traverse(
|
||||
extractTextIfEmpty(ctx, cfg, ctx.args.meta.language, item)
|
||||
)
|
||||
_ <- ctx.logger.debug("Storing extracted texts")
|
||||
_ <- txt.toList.traverse(rm => ctx.store.transact(RAttachmentMeta.upsert(rm)))
|
||||
_ <- ctx.logger.debug("Storing extracted texts")
|
||||
_ <- txt.toList.traverse(rm => ctx.store.transact(RAttachmentMeta.upsert(rm)))
|
||||
_ <- fts.indexData(
|
||||
Stream
|
||||
.emits(txt)
|
||||
.map(a =>
|
||||
TextData(
|
||||
item.item.id,
|
||||
a.id,
|
||||
ctx.args.meta.collective,
|
||||
a.content.getOrElse("")
|
||||
)
|
||||
)
|
||||
)
|
||||
dur <- start
|
||||
_ <- ctx.logger.info(s"Text extraction finished in ${dur.formatExact}")
|
||||
} yield item.copy(metas = txt)
|
||||
|
Reference in New Issue
Block a user