diff --git a/build.sbt b/build.sbt index d8ffd775..7f395a82 100644 --- a/build.sbt +++ b/build.sbt @@ -530,7 +530,7 @@ val schedulerApi = project Dependencies.fs2Core ++ Dependencies.circeCore ) - .dependsOn(loggingApi, common, store, pubsubApi) + .dependsOn(loggingApi, common, pubsubApi) val schedulerImpl = project .in(file("modules/scheduler/impl")) @@ -540,7 +540,7 @@ val schedulerImpl = project .settings( name := "docspell-scheduler-impl" ) - .dependsOn(schedulerApi, notificationApi, pubsubApi) + .dependsOn(store, schedulerApi, notificationApi, pubsubApi) val extract = project .in(file("modules/extract")) diff --git a/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala b/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala index ec9d5682..df0f53f0 100644 --- a/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala +++ b/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala @@ -149,91 +149,91 @@ object JoexAppImpl extends MailAddressCodec { .withTask( JobTask.json( ProcessItemArgs.taskName, - ItemHandler.newItem[F](cfg, itemOps, fts, analyser, regexNer), - ItemHandler.onCancel[F] + ItemHandler.newItem[F](cfg,store, itemOps, fts, analyser, regexNer), + ItemHandler.onCancel[F](store) ) ) .withTask( JobTask.json( ReProcessItemArgs.taskName, - ReProcessItem[F](cfg, fts, itemOps, analyser, regexNer), + ReProcessItem[F](cfg, fts, itemOps, analyser, regexNer, store), ReProcessItem.onCancel[F] ) ) .withTask( JobTask.json( ScanMailboxArgs.taskName, - ScanMailboxTask[F](cfg.userTasks.scanMailbox, javaEmil, upload, joex), + ScanMailboxTask[F](cfg.userTasks.scanMailbox, store, javaEmil, upload, joex), ScanMailboxTask.onCancel[F] ) ) .withTask( JobTask.json( MigrationTask.taskName, - MigrationTask[F](cfg.fullTextSearch, fts, createIndex), + MigrationTask[F](cfg.fullTextSearch, store, fts, createIndex), MigrationTask.onCancel[F] ) ) .withTask( JobTask.json( ReIndexTask.taskName, - ReIndexTask[F](cfg.fullTextSearch, fts, createIndex), + ReIndexTask[F](cfg.fullTextSearch, store, fts, createIndex), ReIndexTask.onCancel[F] ) ) .withTask( JobTask.json( HouseKeepingTask.taskName, - HouseKeepingTask[F](cfg, fileRepo), + HouseKeepingTask[F](cfg, store, fileRepo), HouseKeepingTask.onCancel[F] ) ) .withTask( JobTask.json( PdfConvTask.taskName, - PdfConvTask[F](cfg), + PdfConvTask[F](cfg, store), PdfConvTask.onCancel[F] ) ) .withTask( JobTask.json( ConvertAllPdfArgs.taskName, - ConvertAllPdfTask[F](jobStoreModule.jobs, joex), + ConvertAllPdfTask[F](jobStoreModule.jobs, joex, store), ConvertAllPdfTask.onCancel[F] ) ) .withTask( JobTask.json( LearnClassifierArgs.taskName, - LearnClassifierTask[F](cfg.textAnalysis, analyser), + LearnClassifierTask[F](cfg.textAnalysis, store, analyser), LearnClassifierTask.onCancel[F] ) ) .withTask( JobTask.json( MakePreviewArgs.taskName, - MakePreviewTask[F](cfg.extraction.preview), + MakePreviewTask[F](cfg.extraction.preview, store), MakePreviewTask.onCancel[F] ) ) .withTask( JobTask.json( AllPreviewsArgs.taskName, - AllPreviewsTask[F](jobStoreModule.jobs, joex), + AllPreviewsTask[F](jobStoreModule.jobs, joex, store), AllPreviewsTask.onCancel[F] ) ) .withTask( JobTask.json( MakePageCountArgs.taskName, - MakePageCountTask[F](), + MakePageCountTask[F](store), MakePageCountTask.onCancel[F] ) ) .withTask( JobTask.json( AllPageCountTask.taskName, - AllPageCountTask[F](jobStoreModule.jobs, joex), + AllPageCountTask[F](store, jobStoreModule.jobs, joex), AllPageCountTask.onCancel[F] ) ) @@ -250,6 +250,7 @@ object JoexAppImpl extends MailAddressCodec { UpdateCheckTask[F]( cfg.updateCheck, cfg.sendMail, + store, javaEmil, updateCheck, ThisVersion.default @@ -260,28 +261,28 @@ object JoexAppImpl extends MailAddressCodec { .withTask( JobTask.json( PeriodicQueryTask.taskName, - PeriodicQueryTask[F](notification), + PeriodicQueryTask[F](store, notification), PeriodicQueryTask.onCancel[F] ) ) .withTask( JobTask.json( PeriodicDueItemsTask.taskName, - PeriodicDueItemsTask[F](notification), + PeriodicDueItemsTask[F](store, notification), PeriodicDueItemsTask.onCancel[F] ) ) .withTask( JobTask.json( FileCopyTaskArgs.taskName, - FileCopyTask[F](cfg), + FileCopyTask[F](cfg, store), FileCopyTask.onCancel[F] ) ) .withTask( JobTask.json( FileIntegrityCheckArgs.taskName, - FileIntegrityCheckTask[F](fileRepo), + FileIntegrityCheckTask[F](fileRepo, store), FileIntegrityCheckTask.onCancel[F] ) ) diff --git a/modules/joex/src/main/scala/docspell/joex/filecopy/FileCopyTask.scala b/modules/joex/src/main/scala/docspell/joex/filecopy/FileCopyTask.scala index 2f201363..8c89a157 100644 --- a/modules/joex/src/main/scala/docspell/joex/filecopy/FileCopyTask.scala +++ b/modules/joex/src/main/scala/docspell/joex/filecopy/FileCopyTask.scala @@ -17,6 +17,7 @@ import docspell.logging.Logger import docspell.store.file.{BinnyUtils, FileRepository, FileRepositoryConfig} import binny.CopyTool.Counter import binny.{BinaryId, BinaryStore, CopyTool} +import docspell.store.Store import io.circe.generic.semiauto.deriveCodec import io.circe.{Codec, Decoder, Encoder} @@ -69,7 +70,7 @@ object FileCopyTask { def onCancel[F[_]]: Task[F, Args, Unit] = Task.log(_.warn(s"Cancelling ${FileCopyTaskArgs.taskName.id} task")) - def apply[F[_]: Async](cfg: Config): Task[F, Args, CopyResult] = + def apply[F[_]: Async](cfg: Config, store: Store[F]): Task[F, Args, CopyResult] = Task { ctx => val src = ctx.args.from .map(id => @@ -93,8 +94,8 @@ object FileCopyTask { .fromList(targets.filter(_ != srcConfig)) .toRight(CopyResult.noTargetStore) - srcRepo = ctx.store.createFileRepository(srcConfig, true) - targetRepos = trgConfig.map(ctx.store.createFileRepository(_, false)) + srcRepo = store.createFileRepository(srcConfig, true) + targetRepos = trgConfig.map(store.createFileRepository(_, false)) } yield (srcRepo, targetRepos) data match { diff --git a/modules/joex/src/main/scala/docspell/joex/filecopy/FileIntegrityCheckTask.scala b/modules/joex/src/main/scala/docspell/joex/filecopy/FileIntegrityCheckTask.scala index abd4d01b..1d407453 100644 --- a/modules/joex/src/main/scala/docspell/joex/filecopy/FileIntegrityCheckTask.scala +++ b/modules/joex/src/main/scala/docspell/joex/filecopy/FileIntegrityCheckTask.scala @@ -13,6 +13,7 @@ import docspell.backend.ops.OFileRepository import docspell.backend.ops.OFileRepository.IntegrityResult import docspell.common.{FileIntegrityCheckArgs, FileKey} import docspell.scheduler.{JobTaskResultEncoder, Task} +import docspell.store.Store import docspell.store.records.RFileMeta import io.circe.Encoder import io.circe.generic.semiauto.deriveEncoder @@ -54,9 +55,9 @@ object FileIntegrityCheckTask { } } - def apply[F[_]: Sync](ops: OFileRepository[F]): Task[F, Args, Result] = + def apply[F[_]: Sync](ops: OFileRepository[F], store: Store[F]): Task[F, Args, Result] = Task { ctx => - ctx.store + store .transact( RFileMeta .findAll(ctx.args.pattern, 50) diff --git a/modules/joex/src/main/scala/docspell/joex/fts/FtsContext.scala b/modules/joex/src/main/scala/docspell/joex/fts/FtsContext.scala index 41008a1d..2e046d89 100644 --- a/modules/joex/src/main/scala/docspell/joex/fts/FtsContext.scala +++ b/modules/joex/src/main/scala/docspell/joex/fts/FtsContext.scala @@ -9,25 +9,13 @@ package docspell.joex.fts import docspell.backend.fulltext.CreateIndex import docspell.ftsclient.FtsClient import docspell.joex.Config -import docspell.scheduler.Context import docspell.logging.Logger import docspell.store.Store -case class FtsContext[F[_]]( +final case class FtsContext[F[_]]( cfg: Config.FullTextSearch, store: Store[F], fulltext: CreateIndex[F], fts: FtsClient[F], logger: Logger[F] ) - -object FtsContext { - - def apply[F[_]]( - cfg: Config.FullTextSearch, - fts: FtsClient[F], - fulltext: CreateIndex[F], - ctx: Context[F, _] - ): FtsContext[F] = - FtsContext(cfg, ctx.store, fulltext, fts, ctx.logger) -} diff --git a/modules/joex/src/main/scala/docspell/joex/fts/FtsWork.scala b/modules/joex/src/main/scala/docspell/joex/fts/FtsWork.scala index 1a0a256f..8379098f 100644 --- a/modules/joex/src/main/scala/docspell/joex/fts/FtsWork.scala +++ b/modules/joex/src/main/scala/docspell/joex/fts/FtsWork.scala @@ -9,13 +9,13 @@ package docspell.joex.fts import cats._ import cats.data.{Kleisli, NonEmptyList} import cats.implicits._ - import docspell.backend.fulltext.CreateIndex import docspell.common._ import docspell.ftsclient._ import docspell.joex.Config import docspell.scheduler.Context import docspell.logging.Logger +import docspell.store.Store object FtsWork { import syntax._ @@ -106,10 +106,11 @@ object FtsWork { def forContext( cfg: Config.FullTextSearch, + store: Store[F], fts: FtsClient[F], fulltext: CreateIndex[F] ): Kleisli[F, Context[F, _], Unit] = - mt.local(ctx => FtsContext(cfg, fts, fulltext, ctx)) + mt.local(ctx => FtsContext(cfg, store, fulltext, fts, ctx.logger)) } } } diff --git a/modules/joex/src/main/scala/docspell/joex/fts/MigrationTask.scala b/modules/joex/src/main/scala/docspell/joex/fts/MigrationTask.scala index bd0e6fb9..6430b332 100644 --- a/modules/joex/src/main/scala/docspell/joex/fts/MigrationTask.scala +++ b/modules/joex/src/main/scala/docspell/joex/fts/MigrationTask.scala @@ -13,12 +13,14 @@ import docspell.common._ import docspell.ftsclient._ import docspell.joex.Config import docspell.scheduler.{Job, Task} +import docspell.store.Store object MigrationTask { val taskName = Ident.unsafe("full-text-index") def apply[F[_]: Async]( cfg: Config.FullTextSearch, + store: Store[F], fts: FtsClient[F], createIndex: CreateIndex[F] ): Task[F, Unit, Unit] = @@ -28,7 +30,7 @@ object MigrationTask { Task(ctx => for { migs <- migrationTasks[F](fts) - res <- Migration[F](cfg, fts, ctx.store, createIndex, ctx.logger).run(migs) + res <- Migration[F](cfg, fts, store, createIndex, ctx.logger).run(migs) } yield res ) ) diff --git a/modules/joex/src/main/scala/docspell/joex/fts/ReIndexTask.scala b/modules/joex/src/main/scala/docspell/joex/fts/ReIndexTask.scala index d9c0e832..ca1dd228 100644 --- a/modules/joex/src/main/scala/docspell/joex/fts/ReIndexTask.scala +++ b/modules/joex/src/main/scala/docspell/joex/fts/ReIndexTask.scala @@ -8,13 +8,13 @@ package docspell.joex.fts import cats.effect._ import cats.implicits._ - import docspell.backend.fulltext.CreateIndex import docspell.common._ import docspell.ftsclient._ import docspell.joex.Config import docspell.joex.fts.FtsWork.syntax._ import docspell.scheduler.Task +import docspell.store.Store object ReIndexTask { type Args = ReIndexTaskArgs @@ -24,6 +24,7 @@ object ReIndexTask { def apply[F[_]: Async]( cfg: Config.FullTextSearch, + store: Store[F], fts: FtsClient[F], fulltext: CreateIndex[F] ): Task[F, Args, Unit] = @@ -31,7 +32,7 @@ object ReIndexTask { .log[F, Args](_.info(s"Running full-text re-index now")) .flatMap(_ => Task(ctx => - clearData[F](ctx.args.collective).forContext(cfg, fts, fulltext).run(ctx) + clearData[F](ctx.args.collective).forContext(cfg, store, fts, fulltext).run(ctx) ) ) diff --git a/modules/joex/src/main/scala/docspell/joex/hk/CheckNodesTask.scala b/modules/joex/src/main/scala/docspell/joex/hk/CheckNodesTask.scala index aff4a5d3..1d202b52 100644 --- a/modules/joex/src/main/scala/docspell/joex/hk/CheckNodesTask.scala +++ b/modules/joex/src/main/scala/docspell/joex/hk/CheckNodesTask.scala @@ -8,18 +8,18 @@ package docspell.joex.hk import cats.effect._ import cats.implicits._ - import docspell.common._ -import docspell.scheduler.{Context, Task} +import docspell.scheduler.Task import docspell.logging.Logger +import docspell.store.Store import docspell.store.records._ - import org.http4s.blaze.client.BlazeClientBuilder import org.http4s.client.Client object CheckNodesTask { def apply[F[_]: Async]( - cfg: HouseKeepingConfig.CheckNodes + cfg: HouseKeepingConfig.CheckNodes, + store: Store[F] ): Task[F, Unit, CleanupResult] = Task { ctx => if (cfg.enabled) @@ -27,12 +27,12 @@ object CheckNodesTask { _ <- ctx.logger.info("Check nodes reachability") ec = scala.concurrent.ExecutionContext.global _ <- BlazeClientBuilder[F].withExecutionContext(ec).resource.use { client => - checkNodes(ctx, client) + checkNodes(ctx.logger, store, client) } _ <- ctx.logger.info( s"Remove nodes not found more than ${cfg.minNotFound} times" ) - n <- removeNodes(ctx, cfg) + n <- removeNodes(store, cfg) _ <- ctx.logger.info(s"Removed $n nodes") } yield CleanupResult.of(n) else @@ -41,14 +41,18 @@ object CheckNodesTask { } - def checkNodes[F[_]: Async](ctx: Context[F, _], client: Client[F]): F[Unit] = - ctx.store + def checkNodes[F[_]: Async]( + logger: Logger[F], + store: Store[F], + client: Client[F] + ): F[Unit] = + store .transact(RNode.streamAll) .evalMap(node => - checkNode(ctx.logger, client)(node.url) + checkNode(logger, client)(node.url) .flatMap(seen => - if (seen) ctx.store.transact(RNode.resetNotFound(node.id)) - else ctx.store.transact(RNode.incrementNotFound(node.id)) + if (seen) store.transact(RNode.resetNotFound(node.id)) + else store.transact(RNode.incrementNotFound(node.id)) ) ) .compile @@ -68,9 +72,9 @@ object CheckNodesTask { } def removeNodes[F[_]]( - ctx: Context[F, _], + store: Store[F], cfg: HouseKeepingConfig.CheckNodes ): F[Int] = - ctx.store.transact(RNode.deleteNotFound(cfg.minNotFound)) + store.transact(RNode.deleteNotFound(cfg.minNotFound)) } diff --git a/modules/joex/src/main/scala/docspell/joex/hk/CleanupInvitesTask.scala b/modules/joex/src/main/scala/docspell/joex/hk/CleanupInvitesTask.scala index e0cf916c..2dbed981 100644 --- a/modules/joex/src/main/scala/docspell/joex/hk/CleanupInvitesTask.scala +++ b/modules/joex/src/main/scala/docspell/joex/hk/CleanupInvitesTask.scala @@ -8,15 +8,16 @@ package docspell.joex.hk import cats.effect._ import cats.implicits._ - import docspell.common._ import docspell.scheduler.Task +import docspell.store.Store import docspell.store.records._ object CleanupInvitesTask { def apply[F[_]: Sync]( - cfg: HouseKeepingConfig.CleanupInvites + cfg: HouseKeepingConfig.CleanupInvites, + store: Store[F] ): Task[F, Unit, CleanupResult] = Task { ctx => if (cfg.enabled) @@ -24,7 +25,7 @@ object CleanupInvitesTask { now <- Timestamp.current[F] ts = now - cfg.olderThan _ <- ctx.logger.info(s"Cleanup invitations older than $ts") - n <- ctx.store.transact(RInvitation.deleteOlderThan(ts)) + n <- store.transact(RInvitation.deleteOlderThan(ts)) _ <- ctx.logger.info(s"Removed $n invitations") } yield CleanupResult.of(n) else diff --git a/modules/joex/src/main/scala/docspell/joex/hk/CleanupJobsTask.scala b/modules/joex/src/main/scala/docspell/joex/hk/CleanupJobsTask.scala index 104ae306..0a4ee43b 100644 --- a/modules/joex/src/main/scala/docspell/joex/hk/CleanupJobsTask.scala +++ b/modules/joex/src/main/scala/docspell/joex/hk/CleanupJobsTask.scala @@ -18,7 +18,8 @@ import docspell.store.records._ object CleanupJobsTask { def apply[F[_]: Sync]( - cfg: HouseKeepingConfig.CleanupJobs + cfg: HouseKeepingConfig.CleanupJobs, + store: Store[F] ): Task[F, Unit, CleanupResult] = Task { ctx => if (cfg.enabled) @@ -26,7 +27,7 @@ object CleanupJobsTask { now <- Timestamp.current[F] ts = now - cfg.olderThan _ <- ctx.logger.info(s"Cleanup jobs older than $ts") - n <- deleteDoneJobs(ctx.store, ts, cfg.deleteBatch) + n <- deleteDoneJobs(store, ts, cfg.deleteBatch) _ <- ctx.logger.info(s"Removed $n jobs") } yield CleanupResult.of(n) else diff --git a/modules/joex/src/main/scala/docspell/joex/hk/CleanupRememberMeTask.scala b/modules/joex/src/main/scala/docspell/joex/hk/CleanupRememberMeTask.scala index e1040e1c..b3d909ab 100644 --- a/modules/joex/src/main/scala/docspell/joex/hk/CleanupRememberMeTask.scala +++ b/modules/joex/src/main/scala/docspell/joex/hk/CleanupRememberMeTask.scala @@ -8,14 +8,15 @@ package docspell.joex.hk import cats.effect._ import cats.implicits._ - import docspell.common._ import docspell.scheduler.Task +import docspell.store.Store import docspell.store.records._ object CleanupRememberMeTask { def apply[F[_]: Sync]( - cfg: HouseKeepingConfig.CleanupRememberMe + cfg: HouseKeepingConfig.CleanupRememberMe, + store: Store[F] ): Task[F, Unit, CleanupResult] = Task { ctx => if (cfg.enabled) @@ -23,7 +24,7 @@ object CleanupRememberMeTask { now <- Timestamp.current[F] ts = now - cfg.olderThan _ <- ctx.logger.info(s"Cleanup remember-me tokens older than $ts") - n <- ctx.store.transact(RRememberMe.deleteOlderThan(ts)) + n <- store.transact(RRememberMe.deleteOlderThan(ts)) _ <- ctx.logger.info(s"Removed $n tokens") } yield CleanupResult.of(n) else diff --git a/modules/joex/src/main/scala/docspell/joex/hk/HouseKeepingTask.scala b/modules/joex/src/main/scala/docspell/joex/hk/HouseKeepingTask.scala index ff025d31..5e87ce17 100644 --- a/modules/joex/src/main/scala/docspell/joex/hk/HouseKeepingTask.scala +++ b/modules/joex/src/main/scala/docspell/joex/hk/HouseKeepingTask.scala @@ -15,6 +15,7 @@ import docspell.joex.filecopy.FileIntegrityCheckTask import docspell.scheduler.{JobTaskResultEncoder, Task} import com.github.eikek.calev._ import docspell.scheduler.usertask.UserTask +import docspell.store.Store import io.circe.Encoder import io.circe.generic.semiauto.deriveEncoder @@ -25,15 +26,16 @@ object HouseKeepingTask { def apply[F[_]: Async]( cfg: Config, + store: Store[F], fileRepo: OFileRepository[F] ): Task[F, Unit, Result] = { val combined = ( - CheckNodesTask(cfg.houseKeeping.checkNodes), - CleanupInvitesTask(cfg.houseKeeping.cleanupInvites), - CleanupJobsTask(cfg.houseKeeping.cleanupJobs), - CleanupRememberMeTask(cfg.houseKeeping.cleanupRememberMe), - IntegrityCheckTask(cfg.houseKeeping.integrityCheck, fileRepo) + CheckNodesTask(cfg.houseKeeping.checkNodes, store), + CleanupInvitesTask(cfg.houseKeeping.cleanupInvites, store), + CleanupJobsTask(cfg.houseKeeping.cleanupJobs, store), + CleanupRememberMeTask(cfg.houseKeeping.cleanupRememberMe, store), + IntegrityCheckTask(cfg.houseKeeping.integrityCheck, store, fileRepo) ).mapN(Result.apply) Task diff --git a/modules/joex/src/main/scala/docspell/joex/hk/IntegrityCheckTask.scala b/modules/joex/src/main/scala/docspell/joex/hk/IntegrityCheckTask.scala index b9b32ccb..06fca014 100644 --- a/modules/joex/src/main/scala/docspell/joex/hk/IntegrityCheckTask.scala +++ b/modules/joex/src/main/scala/docspell/joex/hk/IntegrityCheckTask.scala @@ -8,21 +8,22 @@ package docspell.joex.hk import cats.effect._ import cats.implicits._ - import docspell.backend.ops.OFileRepository import docspell.common._ import docspell.joex.filecopy.FileIntegrityCheckTask import docspell.scheduler.Task +import docspell.store.Store object IntegrityCheckTask { def apply[F[_]: Sync]( cfg: HouseKeepingConfig.IntegrityCheck, + store: Store[F], fileRepo: OFileRepository[F] ): Task[F, Unit, FileIntegrityCheckTask.Result] = Task { ctx => if (cfg.enabled) - FileIntegrityCheckTask(fileRepo).run( + FileIntegrityCheckTask(fileRepo, store).run( ctx.map(_ => FileIntegrityCheckArgs(FileKeyPart.Empty)) ) else diff --git a/modules/joex/src/main/scala/docspell/joex/learn/LearnClassifierTask.scala b/modules/joex/src/main/scala/docspell/joex/learn/LearnClassifierTask.scala index 60fb01e3..ce49077f 100644 --- a/modules/joex/src/main/scala/docspell/joex/learn/LearnClassifierTask.scala +++ b/modules/joex/src/main/scala/docspell/joex/learn/LearnClassifierTask.scala @@ -9,13 +9,13 @@ package docspell.joex.learn import cats.data.OptionT import cats.effect._ import cats.implicits._ - import docspell.analysis.TextAnalyser import docspell.backend.ops.OCollective import docspell.common._ import docspell.joex.Config import docspell.scheduler._ import docspell.logging.Logger +import docspell.store.Store import docspell.store.records.{RClassifierModel, RClassifierSetting} object LearnClassifierTask { @@ -29,14 +29,16 @@ object LearnClassifierTask { def apply[F[_]: Async]( cfg: Config.TextAnalysis, + store: Store[F], analyser: TextAnalyser[F] ): Task[F, Args, Unit] = - learnTags(cfg, analyser) - .flatMap(_ => learnItemEntities(cfg, analyser)) + learnTags(cfg, store, analyser) + .flatMap(_ => learnItemEntities(cfg, store, analyser)) .flatMap(_ => Task(_ => Sync[F].delay(System.gc()))) private def learnItemEntities[F[_]: Async]( cfg: Config.TextAnalysis, + store: Store[F], analyser: TextAnalyser[F] ): Task[F, Args, Unit] = Task { ctx => @@ -44,6 +46,7 @@ object LearnClassifierTask { LearnItemEntities .learnAll( analyser, + store, ctx.args.collective, cfg.classification.itemCount, cfg.maxLength @@ -54,16 +57,17 @@ object LearnClassifierTask { private def learnTags[F[_]: Async]( cfg: Config.TextAnalysis, + store: Store[F], analyser: TextAnalyser[F] ): Task[F, Args, Unit] = Task { ctx => val learnTags = for { - sett <- findActiveSettings[F](ctx, cfg) + sett <- findActiveSettings[F](ctx, store, cfg) maxItems = cfg.classification.itemCountOrWhenLower(sett.itemCount) _ <- OptionT.liftF( LearnTags - .learnAllTagCategories(analyser)( + .learnAllTagCategories(analyser, store)( ctx.args.collective, maxItems, cfg.maxLength @@ -74,34 +78,38 @@ object LearnClassifierTask { // learn classifier models from active tag categories learnTags.getOrElseF(logInactiveWarning(ctx.logger)) *> // delete classifier model files for categories that have been removed - clearObsoleteTagModels(ctx) *> + clearObsoleteTagModels(ctx, store) *> // when tags are deleted, categories may get removed. fix the json array - ctx.store + store .transact(RClassifierSetting.fixCategoryList(ctx.args.collective)) .map(_ => ()) } - private def clearObsoleteTagModels[F[_]: Sync](ctx: Context[F, Args]): F[Unit] = + private def clearObsoleteTagModels[F[_]: Sync]( + ctx: Context[F, Args], + store: Store[F] + ): F[Unit] = for { - list <- ctx.store.transact( + list <- store.transact( ClassifierName.findOrphanTagModels(ctx.args.collective) ) _ <- ctx.logger.info( s"Found ${list.size} obsolete model files that are deleted now." ) - n <- ctx.store.transact(RClassifierModel.deleteAll(list.map(_.id))) + n <- store.transact(RClassifierModel.deleteAll(list.map(_.id))) _ <- list .map(_.fileId) - .traverse(id => ctx.store.fileRepo.delete(id)) + .traverse(id => store.fileRepo.delete(id)) _ <- ctx.logger.debug(s"Deleted $n model files.") } yield () private def findActiveSettings[F[_]: Sync]( ctx: Context[F, Args], + store: Store[F], cfg: Config.TextAnalysis ): OptionT[F, OCollective.Classifier] = if (cfg.classification.enabled) - OptionT(ctx.store.transact(RClassifierSetting.findById(ctx.args.collective))) + OptionT(store.transact(RClassifierSetting.findById(ctx.args.collective))) .filter(_.autoTagEnabled) .map(OCollective.Classifier.fromRecord) else diff --git a/modules/joex/src/main/scala/docspell/joex/learn/LearnItemEntities.scala b/modules/joex/src/main/scala/docspell/joex/learn/LearnItemEntities.scala index cf8e7fe5..7d59afef 100644 --- a/modules/joex/src/main/scala/docspell/joex/learn/LearnItemEntities.scala +++ b/modules/joex/src/main/scala/docspell/joex/learn/LearnItemEntities.scala @@ -10,76 +10,84 @@ import cats.data.Kleisli import cats.effect._ import cats.implicits._ import fs2.Stream - import docspell.analysis.TextAnalyser import docspell.analysis.classifier.TextClassifier.Data import docspell.common._ import docspell.scheduler._ +import docspell.store.Store object LearnItemEntities { def learnAll[F[_]: Async, A]( analyser: TextAnalyser[F], + store: Store[F], collective: Ident, maxItems: Int, maxTextLen: Int ): Task[F, A, Unit] = - learnCorrOrg[F, A](analyser, collective, maxItems, maxTextLen) - .flatMap(_ => learnCorrPerson[F, A](analyser, collective, maxItems, maxTextLen)) - .flatMap(_ => learnConcPerson(analyser, collective, maxItems, maxTextLen)) - .flatMap(_ => learnConcEquip(analyser, collective, maxItems, maxTextLen)) + learnCorrOrg[F, A](analyser, store, collective, maxItems, maxTextLen) + .flatMap(_ => + learnCorrPerson[F, A](analyser, store, collective, maxItems, maxTextLen) + ) + .flatMap(_ => learnConcPerson(analyser, store, collective, maxItems, maxTextLen)) + .flatMap(_ => learnConcEquip(analyser, store, collective, maxItems, maxTextLen)) def learnCorrOrg[F[_]: Async, A]( analyser: TextAnalyser[F], + store: Store[F], collective: Ident, maxItems: Int, maxTextLen: Int ): Task[F, A, Unit] = - learn(analyser, collective)( + learn(store, analyser, collective)( ClassifierName.correspondentOrg, - ctx => SelectItems.forCorrOrg(ctx.store, collective, maxItems, maxTextLen) + _ => SelectItems.forCorrOrg(store, collective, maxItems, maxTextLen) ) def learnCorrPerson[F[_]: Async, A]( analyser: TextAnalyser[F], + store: Store[F], collective: Ident, maxItems: Int, maxTextLen: Int ): Task[F, A, Unit] = - learn(analyser, collective)( + learn(store, analyser, collective)( ClassifierName.correspondentPerson, - ctx => SelectItems.forCorrPerson(ctx.store, collective, maxItems, maxTextLen) + _ => SelectItems.forCorrPerson(store, collective, maxItems, maxTextLen) ) def learnConcPerson[F[_]: Async, A]( analyser: TextAnalyser[F], + store: Store[F], collective: Ident, maxItems: Int, maxTextLen: Int ): Task[F, A, Unit] = - learn(analyser, collective)( + learn(store, analyser, collective)( ClassifierName.concernedPerson, - ctx => SelectItems.forConcPerson(ctx.store, collective, maxItems, maxTextLen) + _ => SelectItems.forConcPerson(store, collective, maxItems, maxTextLen) ) def learnConcEquip[F[_]: Async, A]( analyser: TextAnalyser[F], + store: Store[F], collective: Ident, maxItems: Int, maxTextLen: Int ): Task[F, A, Unit] = - learn(analyser, collective)( + learn(store, analyser, collective)( ClassifierName.concernedEquip, - ctx => SelectItems.forConcEquip(ctx.store, collective, maxItems, maxTextLen) + _ => SelectItems.forConcEquip(store, collective, maxItems, maxTextLen) ) private def learn[F[_]: Async, A]( + store: Store[F], analyser: TextAnalyser[F], collective: Ident )(cname: ClassifierName, data: Context[F, _] => Stream[F, Data]): Task[F, A, Unit] = Task { ctx => ctx.logger.info(s"Learn classifier ${cname.name}") *> analyser.classifier.trainClassifier(ctx.logger, data(ctx))( - Kleisli(StoreClassifierModel.handleModel(ctx, collective, cname)) + Kleisli(StoreClassifierModel.handleModel(store, ctx.logger, collective, cname)) ) } } diff --git a/modules/joex/src/main/scala/docspell/joex/learn/LearnTags.scala b/modules/joex/src/main/scala/docspell/joex/learn/LearnTags.scala index f1460c19..54eb5a88 100644 --- a/modules/joex/src/main/scala/docspell/joex/learn/LearnTags.scala +++ b/modules/joex/src/main/scala/docspell/joex/learn/LearnTags.scala @@ -9,16 +9,17 @@ package docspell.joex.learn import cats.data.Kleisli import cats.effect._ import cats.implicits._ - import docspell.analysis.TextAnalyser import docspell.common._ import docspell.scheduler._ +import docspell.store.Store import docspell.store.records.RClassifierSetting object LearnTags { def learnTagCategory[F[_]: Async, A]( analyser: TextAnalyser[F], + store: Store[F], collective: Ident, maxItems: Int, maxTextLen: Int @@ -26,12 +27,14 @@ object LearnTags { category: String ): Task[F, A, Unit] = Task { ctx => - val data = SelectItems.forCategory(ctx, collective)(maxItems, category, maxTextLen) + val data = + SelectItems.forCategory(store, collective)(maxItems, category, maxTextLen) ctx.logger.info(s"Learn classifier for tag category: $category") *> analyser.classifier.trainClassifier(ctx.logger, data)( Kleisli( StoreClassifierModel.handleModel( - ctx, + store, + ctx.logger, collective, ClassifierName.tagCategory(category) ) @@ -39,15 +42,15 @@ object LearnTags { ) } - def learnAllTagCategories[F[_]: Async, A](analyser: TextAnalyser[F])( + def learnAllTagCategories[F[_]: Async, A](analyser: TextAnalyser[F], store: Store[F])( collective: Ident, maxItems: Int, maxTextLen: Int ): Task[F, A, Unit] = Task { ctx => for { - cats <- ctx.store.transact(RClassifierSetting.getActiveCategories(collective)) - task = learnTagCategory[F, A](analyser, collective, maxItems, maxTextLen) _ + cats <- store.transact(RClassifierSetting.getActiveCategories(collective)) + task = learnTagCategory[F, A](analyser, store, collective, maxItems, maxTextLen) _ _ <- cats.map(task).traverse(_.run(ctx)) } yield () } diff --git a/modules/joex/src/main/scala/docspell/joex/learn/SelectItems.scala b/modules/joex/src/main/scala/docspell/joex/learn/SelectItems.scala index b25f28ce..a54d7f7e 100644 --- a/modules/joex/src/main/scala/docspell/joex/learn/SelectItems.scala +++ b/modules/joex/src/main/scala/docspell/joex/learn/SelectItems.scala @@ -10,7 +10,6 @@ import fs2.{Pipe, Stream} import docspell.analysis.classifier.TextClassifier.Data import docspell.common._ -import docspell.scheduler.Context import docspell.store.Store import docspell.store.qb.Batch import docspell.store.queries.{QItem, TextAndTag} @@ -21,16 +20,7 @@ object SelectItems { val pageSep = LearnClassifierTask.pageSep val noClass = LearnClassifierTask.noClass - def forCategory[F[_]](ctx: Context[F, _], collective: Ident)( - maxItems: Int, - category: String, - maxTextLen: Int - ): Stream[F, Data] = - forCategory(ctx.store, collective, maxItems, category, maxTextLen) - - def forCategory[F[_]]( - store: Store[F], - collective: Ident, + def forCategory[F[_]](store: Store[F], collective: Ident)( maxItems: Int, category: String, maxTextLen: Int diff --git a/modules/joex/src/main/scala/docspell/joex/learn/StoreClassifierModel.scala b/modules/joex/src/main/scala/docspell/joex/learn/StoreClassifierModel.scala index 8c4feb9b..9d5aafe9 100644 --- a/modules/joex/src/main/scala/docspell/joex/learn/StoreClassifierModel.scala +++ b/modules/joex/src/main/scala/docspell/joex/learn/StoreClassifierModel.scala @@ -12,7 +12,6 @@ import fs2.io.file.Files import docspell.analysis.classifier.ClassifierModel import docspell.common._ -import docspell.scheduler._ import docspell.logging.Logger import docspell.store.Store import docspell.store.records.RClassifierModel @@ -20,21 +19,12 @@ import docspell.store.records.RClassifierModel object StoreClassifierModel { def handleModel[F[_]: Async]( - ctx: Context[F, _], + store: Store[F], + logger: Logger[F], collective: Ident, modelName: ClassifierName )( trainedModel: ClassifierModel - ): F[Unit] = - handleModel(ctx.store, ctx.logger)(collective, modelName, trainedModel) - - def handleModel[F[_]: Async]( - store: Store[F], - logger: Logger[F] - )( - collective: Ident, - modelName: ClassifierName, - trainedModel: ClassifierModel ): F[Unit] = for { oldFile <- store.transact( diff --git a/modules/joex/src/main/scala/docspell/joex/mail/ReadMail.scala b/modules/joex/src/main/scala/docspell/joex/mail/ReadMail.scala index 9b2db148..66450864 100644 --- a/modules/joex/src/main/scala/docspell/joex/mail/ReadMail.scala +++ b/modules/joex/src/main/scala/docspell/joex/mail/ReadMail.scala @@ -70,7 +70,7 @@ object ReadMail { HtmlBodyViewConfig.default.copy( textToHtml = MarkdownBody.makeHtml(markdownCfg) ) - ).map(makeHtmlBinary[F] _).map(b => Some(b)) + ).map(makeHtmlBinary[F]).map(b => Some(b)) } for { diff --git a/modules/joex/src/main/scala/docspell/joex/notify/PeriodicDueItemsTask.scala b/modules/joex/src/main/scala/docspell/joex/notify/PeriodicDueItemsTask.scala index 69361b69..1969a1aa 100644 --- a/modules/joex/src/main/scala/docspell/joex/notify/PeriodicDueItemsTask.scala +++ b/modules/joex/src/main/scala/docspell/joex/notify/PeriodicDueItemsTask.scala @@ -9,7 +9,6 @@ package docspell.joex.notify import cats.data.NonEmptyList import cats.effect._ import cats.implicits._ - import docspell.backend.ops.ONotification import docspell.common._ import docspell.scheduler.Context @@ -20,6 +19,7 @@ import docspell.notification.api.PeriodicDueItemsArgs import docspell.query.Date import docspell.query.ItemQuery._ import docspell.query.ItemQueryDsl._ +import docspell.store.Store import docspell.store.qb.Batch import docspell.store.queries.ListItem import docspell.store.queries.{QItem, Query} @@ -32,11 +32,14 @@ object PeriodicDueItemsTask { def onCancel[F[_]]: Task[F, Args, Unit] = Task.log(_.warn(s"Cancelling ${taskName.id} task")) - def apply[F[_]: Sync](notificationOps: ONotification[F]): Task[F, Args, Unit] = + def apply[F[_]: Sync]( + store: Store[F], + notificationOps: ONotification[F] + ): Task[F, Args, Unit] = Task { ctx => val limit = 7 Timestamp.current[F].flatMap { now => - withItems(ctx, limit, now) { items => + withItems(ctx, store, limit, now) { items => withEventContext(ctx, items, limit, now) { eventCtx => withChannel(ctx, notificationOps) { channels => notificationOps.sendMessage(ctx.logger, eventCtx, channels) @@ -51,7 +54,12 @@ object PeriodicDueItemsTask { ): F[Unit] = TaskOperations.withChannel(ctx.logger, ctx.args.channels, ctx.args.account, ops)(cont) - def withItems[F[_]: Sync](ctx: Context[F, Args], limit: Int, now: Timestamp)( + def withItems[F[_]: Sync]( + ctx: Context[F, Args], + store: Store[F], + limit: Int, + now: Timestamp + )( cont: Vector[ListItem] => F[Unit] ): F[Unit] = { val rightDate = Date((now + Duration.days(ctx.args.remindDays.toLong)).toMillis) @@ -77,7 +85,7 @@ object PeriodicDueItemsTask { for { res <- - ctx.store + store .transact( QItem .findItems(q, now.toUtcDate, 0, Batch.limit(limit)) diff --git a/modules/joex/src/main/scala/docspell/joex/notify/PeriodicQueryTask.scala b/modules/joex/src/main/scala/docspell/joex/notify/PeriodicQueryTask.scala index 8ed45a86..3b08edca 100644 --- a/modules/joex/src/main/scala/docspell/joex/notify/PeriodicQueryTask.scala +++ b/modules/joex/src/main/scala/docspell/joex/notify/PeriodicQueryTask.scala @@ -10,7 +10,6 @@ import cats.data.OptionT import cats.data.{NonEmptyList => Nel} import cats.effect._ import cats.implicits._ - import docspell.backend.ops.ONotification import docspell.common._ import docspell.scheduler.Context @@ -22,6 +21,7 @@ import docspell.query.ItemQuery import docspell.query.ItemQuery.Expr import docspell.query.ItemQuery.Expr.AndExpr import docspell.query.ItemQueryParser +import docspell.store.Store import docspell.store.qb.Batch import docspell.store.queries.ListItem import docspell.store.queries.{QItem, Query} @@ -36,11 +36,14 @@ object PeriodicQueryTask { def onCancel[F[_]]: Task[F, Args, Unit] = Task.log(_.warn(s"Cancelling ${taskName.id} task")) - def apply[F[_]: Sync](notificationOps: ONotification[F]): Task[F, Args, Unit] = + def apply[F[_]: Sync]( + store: Store[F], + notificationOps: ONotification[F] + ): Task[F, Args, Unit] = Task { ctx => val limit = 7 Timestamp.current[F].flatMap { now => - withItems(ctx, limit, now) { items => + withItems(ctx, store, limit, now) { items => withEventContext(ctx, items, limit, now) { eventCtx => withChannel(ctx, notificationOps) { channels => notificationOps.sendMessage(ctx.logger, eventCtx, channels) @@ -58,9 +61,11 @@ object PeriodicQueryTask { private def queryString(q: ItemQuery.Expr) = ItemQueryParser.asString(q) - def withQuery[F[_]: Sync](ctx: Context[F, Args])(cont: Query => F[Unit]): F[Unit] = { + def withQuery[F[_]: Sync](ctx: Context[F, Args], store: Store[F])( + cont: Query => F[Unit] + ): F[Unit] = { def fromBookmark(id: String) = - ctx.store + store .transact(RQueryBookmark.findByNameOrId(ctx.args.account, id)) .map(_.map(_.query)) .flatTap(q => @@ -68,7 +73,7 @@ object PeriodicQueryTask { ) def fromShare(id: String) = - ctx.store + store .transact(RShare.findOneByCollective(ctx.args.account.collective, Some(true), id)) .map(_.map(_.query)) .flatTap(q => @@ -120,11 +125,16 @@ object PeriodicQueryTask { } } - def withItems[F[_]: Sync](ctx: Context[F, Args], limit: Int, now: Timestamp)( + def withItems[F[_]: Sync]( + ctx: Context[F, Args], + store: Store[F], + limit: Int, + now: Timestamp + )( cont: Vector[ListItem] => F[Unit] ): F[Unit] = - withQuery(ctx) { query => - val items = ctx.store + withQuery(ctx, store) { query => + val items = store .transact(QItem.findItems(query, now.toUtcDate, 0, Batch.limit(limit))) .compile .to(Vector) diff --git a/modules/joex/src/main/scala/docspell/joex/pagecount/AllPageCountTask.scala b/modules/joex/src/main/scala/docspell/joex/pagecount/AllPageCountTask.scala index 5b8448d9..6ff6b578 100644 --- a/modules/joex/src/main/scala/docspell/joex/pagecount/AllPageCountTask.scala +++ b/modules/joex/src/main/scala/docspell/joex/pagecount/AllPageCountTask.scala @@ -13,6 +13,7 @@ import docspell.backend.JobFactory import docspell.backend.ops.OJoex import docspell.common._ import docspell.scheduler.{Context, Job, JobStore, Task} +import docspell.store.Store import docspell.store.records.RAttachment object AllPageCountTask { @@ -20,11 +21,15 @@ object AllPageCountTask { val taskName = Ident.unsafe("all-page-count") type Args = Unit - def apply[F[_]: Sync](jobStore: JobStore[F], joex: OJoex[F]): Task[F, Args, Unit] = + def apply[F[_]: Sync]( + store: Store[F], + jobStore: JobStore[F], + joex: OJoex[F] + ): Task[F, Args, Unit] = Task { ctx => for { _ <- ctx.logger.info("Generating previews for attachments") - n <- submitConversionJobs(ctx, jobStore) + n <- submitConversionJobs(ctx, store, jobStore) _ <- ctx.logger.info(s"Submitted $n jobs") _ <- joex.notifyAllNodes } yield () @@ -35,9 +40,10 @@ object AllPageCountTask { def submitConversionJobs[F[_]: Sync]( ctx: Context[F, Args], + store: Store[F], jobStore: JobStore[F] ): F[Int] = - ctx.store + store .transact(findAttachments) .chunks .flatMap(createJobs[F]) diff --git a/modules/joex/src/main/scala/docspell/joex/pagecount/PageCountTask.scala b/modules/joex/src/main/scala/docspell/joex/pagecount/MakePageCountTask.scala similarity index 71% rename from modules/joex/src/main/scala/docspell/joex/pagecount/PageCountTask.scala rename to modules/joex/src/main/scala/docspell/joex/pagecount/MakePageCountTask.scala index 5f4e73fa..bcb5fd77 100644 --- a/modules/joex/src/main/scala/docspell/joex/pagecount/PageCountTask.scala +++ b/modules/joex/src/main/scala/docspell/joex/pagecount/MakePageCountTask.scala @@ -8,11 +8,11 @@ package docspell.joex.pagecount import cats.effect._ import cats.implicits._ - import docspell.common._ import docspell.joex.process.AttachmentPageCount import docspell.scheduler.Context import docspell.scheduler.Task +import docspell.store.Store import docspell.store.records.RAttachment import docspell.store.records.RAttachmentMeta @@ -20,10 +20,10 @@ object MakePageCountTask { type Args = MakePageCountArgs - def apply[F[_]: Sync](): Task[F, Args, Unit] = + def apply[F[_]: Sync](store: Store[F]): Task[F, Args, Unit] = Task { ctx => for { - exists <- pageCountExists(ctx) + exists <- pageCountExists(ctx, store) _ <- if (exists) ctx.logger.info( @@ -32,7 +32,7 @@ object MakePageCountTask { else ctx.logger.info( s"Reading page-count for attachment ${ctx.args.attachment}" - ) *> generatePageCount(ctx) + ) *> generatePageCount(ctx, store) } yield () } @@ -40,19 +40,20 @@ object MakePageCountTask { Task.log(_.warn("Cancelling make-page-count task")) private def generatePageCount[F[_]: Sync]( - ctx: Context[F, Args] + ctx: Context[F, Args], + store: Store[F] ): F[Unit] = for { - ra <- ctx.store.transact(RAttachment.findById(ctx.args.attachment)) + ra <- store.transact(RAttachment.findById(ctx.args.attachment)) _ <- ra - .map(AttachmentPageCount.createPageCount(ctx)) + .map(AttachmentPageCount.createPageCount(ctx, store)) .getOrElse( ctx.logger.warn(s"No attachment found with id: ${ctx.args.attachment}") ) } yield () - private def pageCountExists[F[_]](ctx: Context[F, Args]): F[Boolean] = - ctx.store.transact( + private def pageCountExists[F[_]](ctx: Context[F, Args], store: Store[F]): F[Boolean] = + store.transact( RAttachmentMeta .findPageCountById(ctx.args.attachment) .map(_.exists(_ > 0)) diff --git a/modules/joex/src/main/scala/docspell/joex/pdfconv/ConvertAllPdfTask.scala b/modules/joex/src/main/scala/docspell/joex/pdfconv/ConvertAllPdfTask.scala index f1b30c5a..47b66323 100644 --- a/modules/joex/src/main/scala/docspell/joex/pdfconv/ConvertAllPdfTask.scala +++ b/modules/joex/src/main/scala/docspell/joex/pdfconv/ConvertAllPdfTask.scala @@ -12,6 +12,7 @@ import fs2.{Chunk, Stream} import docspell.backend.ops.OJoex import docspell.common._ import docspell.scheduler.{Context, Job, JobStore, Task} +import docspell.store.Store import docspell.store.records.RAttachment /* A task to find all non-converted pdf files (of a collective, or @@ -21,11 +22,15 @@ import docspell.store.records.RAttachment object ConvertAllPdfTask { type Args = ConvertAllPdfArgs - def apply[F[_]: Sync](jobStore: JobStore[F], joex: OJoex[F]): Task[F, Args, Unit] = + def apply[F[_]: Sync]( + jobStore: JobStore[F], + joex: OJoex[F], + store: Store[F] + ): Task[F, Args, Unit] = Task { ctx => for { _ <- ctx.logger.info("Converting pdfs using ocrmypdf") - n <- submitConversionJobs(ctx, jobStore) + n <- submitConversionJobs(ctx, store, jobStore) _ <- ctx.logger.info(s"Submitted $n file conversion jobs") _ <- joex.notifyAllNodes } yield () @@ -36,9 +41,10 @@ object ConvertAllPdfTask { def submitConversionJobs[F[_]: Sync]( ctx: Context[F, Args], + store: Store[F], jobStore: JobStore[F] ): F[Int] = - ctx.store + store .transact(RAttachment.findNonConvertedPdf(ctx.args.collective, 50)) .chunks .flatMap(createJobs[F](ctx)) diff --git a/modules/joex/src/main/scala/docspell/joex/pdfconv/PdfConvTask.scala b/modules/joex/src/main/scala/docspell/joex/pdfconv/PdfConvTask.scala index 47843a0b..7bf6fd5c 100644 --- a/modules/joex/src/main/scala/docspell/joex/pdfconv/PdfConvTask.scala +++ b/modules/joex/src/main/scala/docspell/joex/pdfconv/PdfConvTask.scala @@ -11,14 +11,13 @@ import cats.data.OptionT import cats.effect._ import cats.implicits._ import fs2.Stream - import docspell.common._ import docspell.convert.ConversionResult import docspell.convert.extern.OcrMyPdf import docspell.joex.Config import docspell.scheduler.{Context, Task} +import docspell.store.Store import docspell.store.records._ - import io.circe.generic.semiauto._ import io.circe.{Decoder, Encoder} @@ -36,12 +35,12 @@ object PdfConvTask { val taskName = Ident.unsafe("pdf-files-migration") - def apply[F[_]: Async](cfg: Config): Task[F, Args, Unit] = + def apply[F[_]: Async](cfg: Config, store: Store[F]): Task[F, Args, Unit] = Task { ctx => for { _ <- ctx.logger.info(s"Converting pdf file ${ctx.args} using ocrmypdf") - meta <- checkInputs(cfg, ctx) - _ <- meta.traverse(fm => convert(cfg, ctx, fm)) + meta <- checkInputs(cfg, ctx, store) + _ <- meta.traverse(fm => convert(cfg, ctx, store, fm)) } yield () } @@ -53,19 +52,20 @@ object PdfConvTask { // check if file exists and if it is pdf and if source id is the same and if ocrmypdf is enabled def checkInputs[F[_]: Sync]( cfg: Config, - ctx: Context[F, Args] + ctx: Context[F, Args], + store: Store[F] ): F[Option[RFileMeta]] = { val none: Option[RFileMeta] = None val checkSameFiles = (for { - ra <- OptionT(ctx.store.transact(RAttachment.findById(ctx.args.attachId))) + ra <- OptionT(store.transact(RAttachment.findById(ctx.args.attachId))) isSame <- OptionT.liftF( - ctx.store.transact(RAttachmentSource.isSameFile(ra.id, ra.fileId)) + store.transact(RAttachmentSource.isSameFile(ra.id, ra.fileId)) ) } yield isSame).getOrElse(false) val existsPdf = for { - meta <- ctx.store.transact(RAttachment.findMeta(ctx.args.attachId)) + meta <- store.transact(RAttachment.findMeta(ctx.args.attachId)) res = meta.filter(_.mimetype.matches(MimeType.pdf)) _ <- if (res.isEmpty) @@ -90,18 +90,19 @@ object PdfConvTask { def convert[F[_]: Async]( cfg: Config, ctx: Context[F, Args], + store: Store[F], in: RFileMeta ): F[Unit] = { - val fs = ctx.store.fileRepo + val fs = store.fileRepo val data = fs.getBytes(in.id) val storeResult: ConversionResult.Handler[F, Unit] = Kleisli { case ConversionResult.SuccessPdf(file) => - storeToAttachment(ctx, in, file) + storeToAttachment(ctx, store, in, file) case ConversionResult.SuccessPdfTxt(file, _) => - storeToAttachment(ctx, in, file) + storeToAttachment(ctx, store, in, file) case ConversionResult.UnsupportedFormat(mime) => ctx.logger.warn( @@ -124,19 +125,20 @@ object PdfConvTask { )(data, storeResult) for { - lang <- getLanguage(ctx) + lang <- getLanguage(ctx, store) _ <- ocrMyPdf(lang) } yield () } - def getLanguage[F[_]: Sync](ctx: Context[F, Args]): F[Language] = + def getLanguage[F[_]: Sync](ctx: Context[F, Args], store: Store[F]): F[Language] = (for { - coll <- OptionT(ctx.store.transact(RCollective.findByAttachment(ctx.args.attachId))) + coll <- OptionT(store.transact(RCollective.findByAttachment(ctx.args.attachId))) lang = coll.language } yield lang).getOrElse(Language.German) def storeToAttachment[F[_]: Sync]( ctx: Context[F, Args], + store: Store[F], meta: RFileMeta, newFile: Stream[F, Byte] ): F[Unit] = { @@ -146,10 +148,10 @@ object PdfConvTask { for { fid <- newFile - .through(ctx.store.fileRepo.save(collective, cat, mimeHint)) + .through(store.fileRepo.save(collective, cat, mimeHint)) .compile .lastOrError - _ <- ctx.store.transact(RAttachment.updateFileId(ctx.args.attachId, fid)) + _ <- store.transact(RAttachment.updateFileId(ctx.args.attachId, fid)) } yield () } } diff --git a/modules/joex/src/main/scala/docspell/joex/preview/AllPreviewsTask.scala b/modules/joex/src/main/scala/docspell/joex/preview/AllPreviewsTask.scala index 100ace8f..dcf66d56 100644 --- a/modules/joex/src/main/scala/docspell/joex/preview/AllPreviewsTask.scala +++ b/modules/joex/src/main/scala/docspell/joex/preview/AllPreviewsTask.scala @@ -14,17 +14,22 @@ import docspell.backend.ops.OJoex import docspell.common.MakePreviewArgs.StoreMode import docspell.common._ import docspell.scheduler.{Context, Job, JobStore, Task} +import docspell.store.Store import docspell.store.records.RAttachment object AllPreviewsTask { type Args = AllPreviewsArgs - def apply[F[_]: Sync](jobStore: JobStore[F], joex: OJoex[F]): Task[F, Args, Unit] = + def apply[F[_]: Sync]( + jobStore: JobStore[F], + joex: OJoex[F], + store: Store[F] + ): Task[F, Args, Unit] = Task { ctx => for { _ <- ctx.logger.info("Generating previews for attachments") - n <- submitConversionJobs(ctx, jobStore) + n <- submitConversionJobs(ctx, store, jobStore) _ <- ctx.logger.info(s"Submitted $n jobs") _ <- joex.notifyAllNodes } yield () @@ -35,9 +40,10 @@ object AllPreviewsTask { def submitConversionJobs[F[_]: Sync]( ctx: Context[F, Args], + store: Store[F], jobStore: JobStore[F] ): F[Int] = - ctx.store + store .transact(findAttachments(ctx)) .chunks .flatMap(createJobs[F](ctx)) diff --git a/modules/joex/src/main/scala/docspell/joex/preview/MakePreviewTask.scala b/modules/joex/src/main/scala/docspell/joex/preview/MakePreviewTask.scala index d20a6c75..afd541ea 100644 --- a/modules/joex/src/main/scala/docspell/joex/preview/MakePreviewTask.scala +++ b/modules/joex/src/main/scala/docspell/joex/preview/MakePreviewTask.scala @@ -8,13 +8,13 @@ package docspell.joex.preview import cats.effect._ import cats.implicits._ - import docspell.common._ import docspell.extract.pdfbox.PdfboxPreview import docspell.extract.pdfbox.PreviewConfig import docspell.joex.process.AttachmentPreview import docspell.scheduler.Context import docspell.scheduler.Task +import docspell.store.Store import docspell.store.records.RAttachment import docspell.store.records.RAttachmentPreview @@ -22,10 +22,10 @@ object MakePreviewTask { type Args = MakePreviewArgs - def apply[F[_]: Sync](pcfg: PreviewConfig): Task[F, Args, Unit] = + def apply[F[_]: Sync](pcfg: PreviewConfig, store: Store[F]): Task[F, Args, Unit] = Task { ctx => for { - exists <- previewExists(ctx) + exists <- previewExists(ctx, store) preview <- PdfboxPreview(pcfg) _ <- if (exists) @@ -35,7 +35,7 @@ object MakePreviewTask { else ctx.logger.info( s"Generating preview image for attachment ${ctx.args.attachment}" - ) *> generatePreview(ctx, preview) + ) *> generatePreview(ctx, store, preview) } yield () } @@ -44,20 +44,24 @@ object MakePreviewTask { private def generatePreview[F[_]: Sync]( ctx: Context[F, Args], + store: Store[F], preview: PdfboxPreview[F] ): F[Unit] = for { - ra <- ctx.store.transact(RAttachment.findById(ctx.args.attachment)) + ra <- store.transact(RAttachment.findById(ctx.args.attachment)) _ <- ra - .map(AttachmentPreview.createPreview(ctx, preview)) + .map(AttachmentPreview.createPreview(ctx, store, preview)) .getOrElse( ctx.logger.error(s"No attachment found with id: ${ctx.args.attachment}") ) } yield () - private def previewExists[F[_]: Sync](ctx: Context[F, Args]): F[Boolean] = + private def previewExists[F[_]: Sync]( + ctx: Context[F, Args], + store: Store[F] + ): F[Boolean] = if (ctx.args.store == MakePreviewArgs.StoreMode.WhenMissing) - ctx.store.transact( + store.transact( RAttachmentPreview.findById(ctx.args.attachment).map(_.isDefined) ) else diff --git a/modules/joex/src/main/scala/docspell/joex/process/AttachmentPageCount.scala b/modules/joex/src/main/scala/docspell/joex/process/AttachmentPageCount.scala index f6444311..1e216897 100644 --- a/modules/joex/src/main/scala/docspell/joex/process/AttachmentPageCount.scala +++ b/modules/joex/src/main/scala/docspell/joex/process/AttachmentPageCount.scala @@ -11,11 +11,11 @@ import cats.data.OptionT import cats.effect._ import cats.implicits._ import fs2.Stream - import docspell.common._ import docspell.extract.pdfbox.PdfMetaData import docspell.extract.pdfbox.PdfboxExtract import docspell.scheduler._ +import docspell.store.Store import docspell.store.records.RAttachment import docspell.store.records._ @@ -24,7 +24,7 @@ import docspell.store.records._ */ object AttachmentPageCount { - def apply[F[_]: Sync]()( + def apply[F[_]: Sync](store: Store[F])( item: ItemData ): Task[F, ProcessItemArgs, ItemData] = Task { ctx => @@ -33,7 +33,7 @@ object AttachmentPageCount { s"Retrieving page count for ${item.attachments.size} files…" ) _ <- item.attachments - .traverse(createPageCount(ctx)) + .traverse(createPageCount(ctx, store)) .attempt .flatMap { case Right(_) => ().pure[F] @@ -46,14 +46,15 @@ object AttachmentPageCount { } def createPageCount[F[_]: Sync]( - ctx: Context[F, _] + ctx: Context[F, _], + store: Store[F] )(ra: RAttachment): F[Option[PdfMetaData]] = - findMime[F](ctx)(ra).flatMap { + findMime[F](store)(ra).flatMap { case MimeType.PdfMatch(_) => - PdfboxExtract.getMetaData(loadFile(ctx)(ra)).flatMap { + PdfboxExtract.getMetaData(loadFile(store)(ra)).flatMap { case Right(md) => ctx.logger.debug(s"Found number of pages: ${md.pageCount}") *> - updatePageCount(ctx, md, ra).map(_.some) + updatePageCount(ctx, store, md, ra).map(_.some) case Left(ex) => ctx.logger.warn(s"Error obtaining pages count: ${ex.getMessage}") *> (None: Option[PdfMetaData]).pure[F] @@ -66,6 +67,7 @@ object AttachmentPageCount { private def updatePageCount[F[_]: Sync]( ctx: Context[F, _], + store: Store[F], md: PdfMetaData, ra: RAttachment ): F[PdfMetaData] = @@ -73,12 +75,12 @@ object AttachmentPageCount { _ <- ctx.logger.debug( s"Update attachment ${ra.id.id} with page count ${md.pageCount.some}" ) - n <- ctx.store.transact(RAttachmentMeta.updatePageCount(ra.id, md.pageCount.some)) + n <- store.transact(RAttachmentMeta.updatePageCount(ra.id, md.pageCount.some)) m <- if (n == 0) ctx.logger.warn( s"No attachmentmeta record exists for ${ra.id.id}. Creating new." - ) *> ctx.store.transact( + ) *> store.transact( RAttachmentMeta.insert( RAttachmentMeta( ra.id, @@ -94,11 +96,11 @@ object AttachmentPageCount { _ <- ctx.logger.debug(s"Stored page count (${n + m}).") } yield md - def findMime[F[_]: Functor](ctx: Context[F, _])(ra: RAttachment): F[MimeType] = - OptionT(ctx.store.transact(RFileMeta.findById(ra.fileId))) + def findMime[F[_]: Functor](store: Store[F])(ra: RAttachment): F[MimeType] = + OptionT(store.transact(RFileMeta.findById(ra.fileId))) .map(_.mimetype) .getOrElse(MimeType.octetStream) - def loadFile[F[_]](ctx: Context[F, _])(ra: RAttachment): Stream[F, Byte] = - ctx.store.fileRepo.getBytes(ra.fileId) + def loadFile[F[_]](store: Store[F])(ra: RAttachment): Stream[F, Byte] = + store.fileRepo.getBytes(ra.fileId) } diff --git a/modules/joex/src/main/scala/docspell/joex/process/AttachmentPreview.scala b/modules/joex/src/main/scala/docspell/joex/process/AttachmentPreview.scala index b15253da..db7c5f14 100644 --- a/modules/joex/src/main/scala/docspell/joex/process/AttachmentPreview.scala +++ b/modules/joex/src/main/scala/docspell/joex/process/AttachmentPreview.scala @@ -11,11 +11,11 @@ import cats.data.OptionT import cats.effect._ import cats.implicits._ import fs2.Stream - import docspell.common._ import docspell.extract.pdfbox.PdfboxPreview import docspell.extract.pdfbox.PreviewConfig import docspell.scheduler._ +import docspell.store.Store import docspell.store.queries.QAttachment import docspell.store.records.RAttachment import docspell.store.records._ @@ -26,7 +26,7 @@ import docspell.store.records._ */ object AttachmentPreview { - def apply[F[_]: Sync](pcfg: PreviewConfig)( + def apply[F[_]: Sync](pcfg: PreviewConfig, store: Store[F])( item: ItemData ): Task[F, ProcessItemArgs, ItemData] = Task { ctx => @@ -36,7 +36,7 @@ object AttachmentPreview { ) preview <- PdfboxPreview(pcfg) _ <- item.attachments - .traverse(createPreview(ctx, preview)) + .traverse(createPreview(ctx, store, preview)) .attempt .flatMap { case Right(_) => ().pure[F] @@ -50,16 +50,17 @@ object AttachmentPreview { def createPreview[F[_]: Sync]( ctx: Context[F, _], + store: Store[F], preview: PdfboxPreview[F] )( ra: RAttachment ): F[Option[RAttachmentPreview]] = - findMime[F](ctx)(ra).flatMap { + findMime[F](store)(ra).flatMap { case MimeType.PdfMatch(_) => - preview.previewPNG(loadFile(ctx)(ra)).flatMap { + preview.previewPNG(loadFile(store)(ra)).flatMap { case Some(out) => ctx.logger.debug("Preview generated, saving to database…") *> - createRecord(ctx, ra.fileId.collective, out, ra).map(_.some) + createRecord(store, ra.fileId.collective, out, ra).map(_.some) case None => ctx.logger .info(s"Preview could not be generated. Maybe the pdf has no pages?") *> @@ -72,7 +73,7 @@ object AttachmentPreview { } private def createRecord[F[_]: Sync]( - ctx: Context[F, _], + store: Store[F], collective: Ident, png: Stream[F, Byte], ra: RAttachment @@ -83,7 +84,7 @@ object AttachmentPreview { for { fileId <- png .through( - ctx.store.fileRepo.save( + store.fileRepo.save( collective, FileCategory.PreviewImage, MimeTypeHint(name.map(_.fullName), Some("image/png")) @@ -93,16 +94,16 @@ object AttachmentPreview { .lastOrError now <- Timestamp.current[F] rp = RAttachmentPreview(ra.id, fileId, name.map(_.fullName), now) - _ <- QAttachment.deletePreview(ctx.store)(ra.id) - _ <- ctx.store.transact(RAttachmentPreview.insert(rp)) + _ <- QAttachment.deletePreview(store)(ra.id) + _ <- store.transact(RAttachmentPreview.insert(rp)) } yield rp } - def findMime[F[_]: Functor](ctx: Context[F, _])(ra: RAttachment): F[MimeType] = - OptionT(ctx.store.transact(RFileMeta.findById(ra.fileId))) + def findMime[F[_]: Functor](store: Store[F])(ra: RAttachment): F[MimeType] = + OptionT(store.transact(RFileMeta.findById(ra.fileId))) .map(_.mimetype) .getOrElse(MimeType.octetStream) - def loadFile[F[_]](ctx: Context[F, _])(ra: RAttachment): Stream[F, Byte] = - ctx.store.fileRepo.getBytes(ra.fileId) + def loadFile[F[_]](store: Store[F])(ra: RAttachment): Stream[F, Byte] = + store.fileRepo.getBytes(ra.fileId) } diff --git a/modules/joex/src/main/scala/docspell/joex/process/ConvertPdf.scala b/modules/joex/src/main/scala/docspell/joex/process/ConvertPdf.scala index a7855808..1d4b9196 100644 --- a/modules/joex/src/main/scala/docspell/joex/process/ConvertPdf.scala +++ b/modules/joex/src/main/scala/docspell/joex/process/ConvertPdf.scala @@ -11,13 +11,13 @@ import cats.data.{Kleisli, OptionT} import cats.effect._ import cats.implicits._ import fs2.Stream - import docspell.common._ import docspell.convert.ConversionResult.Handler import docspell.convert.SanitizeHtml import docspell.convert._ import docspell.joex.extract.JsoupSanitizer import docspell.scheduler._ +import docspell.store.Store import docspell.store.records._ /** Goes through all attachments and creates a PDF version of it where supported. @@ -36,21 +36,22 @@ object ConvertPdf { def apply[F[_]: Async]( cfg: ConvertConfig, + store: Store[F], item: ItemData ): Task[F, Args, ItemData] = Task { ctx => def convert(ra: RAttachment): F[(RAttachment, Option[RAttachmentMeta])] = - isConverted(ctx)(ra).flatMap { + isConverted(store)(ra).flatMap { case true if ctx.args.isNormalProcessing => ctx.logger.info( s"Conversion to pdf already done for attachment ${ra.name}." ) *> - ctx.store + store .transact(RAttachmentMeta.findById(ra.id)) .map(rmOpt => (ra, rmOpt)) case _ => - findMime(ctx)(ra).flatMap(m => - convertSafe(cfg, JsoupSanitizer.clean, ctx, item)(ra, m) + findMime(store)(ra).flatMap(m => + convertSafe(cfg, JsoupSanitizer.clean, ctx, store, item)(ra, m) ) } @@ -62,13 +63,15 @@ object ConvertPdf { } - def isConverted[F[_]](ctx: Context[F, Args])( + def isConverted[F[_]](store: Store[F])( ra: RAttachment ): F[Boolean] = - ctx.store.transact(RAttachmentSource.isConverted(ra.id)) + store.transact(RAttachmentSource.isConverted(ra.id)) - def findMime[F[_]: Functor](ctx: Context[F, _])(ra: RAttachment): F[MimeType] = - OptionT(ctx.store.transact(RFileMeta.findById(ra.fileId))) + def findMime[F[_]: Functor](store: Store[F])( + ra: RAttachment + ): F[MimeType] = + OptionT(store.transact(RFileMeta.findById(ra.fileId))) .map(_.mimetype) .getOrElse(MimeType.octetStream) @@ -76,14 +79,15 @@ object ConvertPdf { cfg: ConvertConfig, sanitizeHtml: SanitizeHtml, ctx: Context[F, Args], + store: Store[F], item: ItemData )(ra: RAttachment, mime: MimeType): F[(RAttachment, Option[RAttachmentMeta])] = - loadCollectivePasswords(ctx).flatMap(collPass => + loadCollectivePasswords(ctx, store).flatMap(collPass => Conversion.create[F](cfg, sanitizeHtml, collPass, ctx.logger).use { conv => mime match { case mt => - val data = ctx.store.fileRepo.getBytes(ra.fileId) - val handler = conversionHandler[F](ctx, cfg, ra, item) + val data = store.fileRepo.getBytes(ra.fileId) + val handler = conversionHandler[F](ctx, store, cfg, ra, item) ctx.logger .info(s"Converting file ${ra.name} (${mime.asString}) into a PDF") *> conv.toPDF(DataType(mt), ctx.args.meta.language, handler)( @@ -94,14 +98,16 @@ object ConvertPdf { ) private def loadCollectivePasswords[F[_]: Async]( - ctx: Context[F, Args] + ctx: Context[F, Args], + store: Store[F] ): F[List[Password]] = - ctx.store + store .transact(RCollectivePassword.findAll(ctx.args.meta.collective)) .map(_.map(_.password).distinct) private def conversionHandler[F[_]: Sync]( ctx: Context[F, Args], + store: Store[F], cfg: ConvertConfig, ra: RAttachment, item: ItemData @@ -109,12 +115,12 @@ object ConvertPdf { Kleisli { case ConversionResult.SuccessPdf(pdf) => ctx.logger.info(s"Conversion to pdf successful. Saving file.") *> - storePDF(ctx, cfg, ra, pdf) + storePDF(ctx, store, cfg, ra, pdf) .map(r => (r, None)) case ConversionResult.SuccessPdfTxt(pdf, txt) => ctx.logger.info(s"Conversion to pdf+txt successful. Saving file.") *> - storePDF(ctx, cfg, ra, pdf) + storePDF(ctx, store, cfg, ra, pdf) .flatMap(r => txt.map(t => ( @@ -148,6 +154,7 @@ object ConvertPdf { private def storePDF[F[_]: Sync]( ctx: Context[F, Args], + store: Store[F], cfg: ConvertConfig, ra: RAttachment, pdf: Stream[F, Byte] @@ -162,7 +169,7 @@ object ConvertPdf { pdf .through( - ctx.store.fileRepo.save( + store.fileRepo.save( ctx.args.meta.collective, FileCategory.AttachmentConvert, MimeTypeHint(hint.filename, hint.advertised) @@ -170,32 +177,33 @@ object ConvertPdf { ) .compile .lastOrError - .flatMap(fmId => updateAttachment[F](ctx, ra, fmId, newName).map(_ => fmId)) + .flatMap(fmId => updateAttachment[F](ctx, store, ra, fmId, newName).map(_ => fmId)) .map(fmId => ra.copy(fileId = fmId, name = newName)) } private def updateAttachment[F[_]: Sync]( ctx: Context[F, _], + store: Store[F], ra: RAttachment, fmId: FileKey, newName: Option[String] ): F[Unit] = for { - oldFile <- ctx.store.transact(RAttachment.findById(ra.id)) + oldFile <- store.transact(RAttachment.findById(ra.id)) _ <- - ctx.store + store .transact(RAttachment.updateFileIdAndName(ra.id, fmId, newName)) _ <- oldFile match { case Some(raPrev) => for { sameFile <- - ctx.store + store .transact(RAttachmentSource.isSameFile(ra.id, raPrev.fileId)) _ <- if (sameFile) ().pure[F] else ctx.logger.info("Deleting previous attachment file") *> - ctx.store.fileRepo + store.fileRepo .delete(raPrev.fileId) .attempt .flatMap { diff --git a/modules/joex/src/main/scala/docspell/joex/process/CreateItem.scala b/modules/joex/src/main/scala/docspell/joex/process/CreateItem.scala index 56fd9b2f..7bd299bb 100644 --- a/modules/joex/src/main/scala/docspell/joex/process/CreateItem.scala +++ b/modules/joex/src/main/scala/docspell/joex/process/CreateItem.scala @@ -11,9 +11,9 @@ import cats.data.OptionT import cats.effect.Sync import cats.implicits._ import fs2.Stream - import docspell.common._ import docspell.scheduler.{Context, Task} +import docspell.store.Store import docspell.store.file.FileMetadata import docspell.store.queries.QItem import docspell.store.records._ @@ -21,13 +21,13 @@ import docspell.store.records._ /** Task that creates the item. */ object CreateItem { - def apply[F[_]: Sync]: Task[F, ProcessItemArgs, ItemData] = - findExisting[F].flatMap { + def apply[F[_]: Sync](store: Store[F]): Task[F, ProcessItemArgs, ItemData] = + findExisting[F](store).flatMap { case Some(ri) => Task.pure(ri) - case None => createNew[F] + case None => createNew[F](store) } - def createNew[F[_]: Sync]: Task[F, ProcessItemArgs, ItemData] = + def createNew[F[_]: Sync](store: Store[F]): Task[F, ProcessItemArgs, ItemData] = Task { ctx => def isValidFile(fm: FileMetadata) = ctx.args.meta.validFileTypes.isEmpty || @@ -36,11 +36,11 @@ object CreateItem { def fileMetas(itemId: Ident, now: Timestamp) = Stream - .eval(ctx.store.transact(RAttachment.nextPosition(itemId))) + .eval(store.transact(RAttachment.nextPosition(itemId))) .flatMap { offset => Stream .emits(ctx.args.files) - .evalMap(f => ctx.store.fileRepo.findMeta(f.fileMetaId).map(fm => (f, fm))) + .evalMap(f => store.fileRepo.findMeta(f.fileMetaId).map(fm => (f, fm))) .collect { case (f, Some(fm)) if isValidFile(fm) => f } .zipWithIndex .evalMap { case (f, index) => @@ -67,11 +67,11 @@ object CreateItem { (for { _ <- OptionT.liftF( ctx.logger.info( - s"Loading item with id ${id.id} to ammend" + s"Loading item with id ${id.id} to amend" ) ) item <- OptionT( - ctx.store + store .transact(RItem.findByIdAndCollective(id, ctx.args.meta.collective)) ) } yield (1, item)) @@ -88,7 +88,7 @@ object CreateItem { ctx.args.meta.direction.getOrElse(Direction.Incoming), ItemState.Premature ) - n <- ctx.store.transact(RItem.insert(item)) + n <- store.transact(RItem.insert(item)) } yield (n, item) } @@ -98,7 +98,7 @@ object CreateItem { _ <- if (it._1 != 1) storeItemError[F](ctx) else ().pure[F] now <- Timestamp.current[F] fm <- fileMetas(it._2.id, now) - k <- fm.traverse(insertAttachment(ctx)) + k <- fm.traverse(insertAttachment(store)) _ <- logDifferences(ctx, fm, k.sum) dur <- time _ <- ctx.logger.info(s"Creating item finished in ${dur.formatExact}") @@ -115,25 +115,27 @@ object CreateItem { ) } - def insertAttachment[F[_]](ctx: Context[F, _])(ra: RAttachment): F[Int] = { + def insertAttachment[F[_]](store: Store[F])(ra: RAttachment): F[Int] = { val rs = RAttachmentSource.of(ra) - ctx.store.transact(for { + store.transact(for { n <- RAttachment.insert(ra) _ <- RAttachmentSource.insert(rs) } yield n) } - private def findExisting[F[_]: Sync]: Task[F, ProcessItemArgs, Option[ItemData]] = + private def findExisting[F[_]: Sync]( + store: Store[F] + ): Task[F, ProcessItemArgs, Option[ItemData]] = Task { ctx => val states = ItemState.invalidStates val fileMetaIds = ctx.args.files.map(_.fileMetaId).toSet for { - cand <- ctx.store.transact(QItem.findByFileIds(fileMetaIds.toSeq, states)) + cand <- store.transact(QItem.findByFileIds(fileMetaIds.toSeq, states)) _ <- if (cand.nonEmpty) ctx.logger.warn(s"Found ${cand.size} existing item with these files.") else ().pure[F] - ht <- cand.drop(1).traverse(ri => QItem.delete(ctx.store)(ri.id, ri.cid)) + ht <- cand.drop(1).traverse(ri => QItem.delete(store)(ri.id, ri.cid)) _ <- if (ht.sum > 0) ctx.logger.warn(s"Removed ${ht.sum} items with same attachments") @@ -144,7 +146,7 @@ object CreateItem { OptionT( // load attachments but only those mentioned in the task's arguments cand.headOption.traverse(ri => - ctx.store + store .transact(RAttachment.findByItemCollectiveSource(ri.id, ri.cid, fids)) .flatTap(ats => ctx.logger.debug( @@ -156,7 +158,7 @@ object CreateItem { ) .getOrElse(Vector.empty) orig <- rms.traverse(a => - ctx.store.transact(RAttachmentSource.findById(a.id)).map(s => (a, s)) + store.transact(RAttachmentSource.findById(a.id)).map(s => (a, s)) ) origMap = orig diff --git a/modules/joex/src/main/scala/docspell/joex/process/CrossCheckProposals.scala b/modules/joex/src/main/scala/docspell/joex/process/CrossCheckProposals.scala index e13b5da8..74066434 100644 --- a/modules/joex/src/main/scala/docspell/joex/process/CrossCheckProposals.scala +++ b/modules/joex/src/main/scala/docspell/joex/process/CrossCheckProposals.scala @@ -10,10 +10,10 @@ import cats.data.NonEmptyList import cats.data.OptionT import cats.effect.Sync import cats.implicits._ - import docspell.common._ import docspell.scheduler.Task import docspell.logging.Logger +import docspell.store.Store /** After candidates have been determined, the set is reduced by doing some cross checks. * For example: if a organization is suggested as correspondent, the correspondent person @@ -22,13 +22,15 @@ import docspell.logging.Logger */ object CrossCheckProposals { - def apply[F[_]: Sync](data: ItemData): Task[F, ProcessItemArgs, ItemData] = + def apply[F[_]: Sync]( + store: Store[F] + )(data: ItemData): Task[F, ProcessItemArgs, ItemData] = Task { ctx => val proposals = data.finalProposals val corrOrg = proposals.find(MetaProposalType.CorrOrg) (for { orgRef <- OptionT.fromOption[F](corrOrg) - persRefs <- OptionT.liftF(EvalProposals.findOrganizationRelation(data, ctx)) + persRefs <- OptionT.liftF(EvalProposals.findOrganizationRelation(data, store)) clProps <- OptionT.liftF( personOrgCheck[F](ctx.logger, data.classifyProposals, persRefs)(orgRef) ) @@ -53,7 +55,7 @@ object CrossCheckProposals { mpl.find(MetaProposalType.CorrPerson) match { case Some(ppl) => val list = ppl.values.filter(c => - persRefs.get(c.ref.id).exists(_.organization == Some(orgId)) + persRefs.get(c.ref.id).exists(_.organization.contains(orgId)) ) if (ppl.values.toList == list) mpl.pure[F] diff --git a/modules/joex/src/main/scala/docspell/joex/process/DuplicateCheck.scala b/modules/joex/src/main/scala/docspell/joex/process/DuplicateCheck.scala index 6b07e949..44b01592 100644 --- a/modules/joex/src/main/scala/docspell/joex/process/DuplicateCheck.scala +++ b/modules/joex/src/main/scala/docspell/joex/process/DuplicateCheck.scala @@ -8,58 +8,63 @@ package docspell.joex.process import cats.effect._ import cats.implicits._ - import docspell.common._ import docspell.scheduler.{Context, Task} +import docspell.store.Store import docspell.store.queries.QItem import docspell.store.records.RFileMeta import docspell.store.records.RJob - import doobie._ object DuplicateCheck { type Args = ProcessItemArgs - def apply[F[_]: Sync]: Task[F, Args, Args] = + def apply[F[_]: Sync](store: Store[F]): Task[F, Args, Args] = Task { ctx => if (ctx.args.meta.skipDuplicate) for { - retries <- getRetryCount(ctx) + retries <- getRetryCount(ctx, store) res <- if (retries == 0) - ctx.logger.debug("Checking for duplicate files") *> removeDuplicates(ctx) + ctx.logger + .debug("Checking for duplicate files") *> removeDuplicates(ctx, store) else ctx.args.pure[F] } yield res else ctx.logger.debug("Not checking for duplicates") *> ctx.args.pure[F] } - def removeDuplicates[F[_]: Sync](ctx: Context[F, Args]): F[ProcessItemArgs] = + def removeDuplicates[F[_]: Sync]( + ctx: Context[F, Args], + store: Store[F] + ): F[ProcessItemArgs] = for { - fileMetas <- findDuplicates(ctx) - _ <- fileMetas.traverse(deleteDuplicate(ctx)) + fileMetas <- findDuplicates(ctx, store) + _ <- fileMetas.traverse(deleteDuplicate(ctx, store)) ids = fileMetas.filter(_.exists).map(_.fm.id).toSet } yield ctx.args.copy(files = ctx.args.files.filterNot(f => ids.contains(f.fileMetaId)) ) - private def getRetryCount[F[_]: Sync](ctx: Context[F, Args]): F[Int] = - ctx.store.transact(RJob.getRetries(ctx.jobId)).map(_.getOrElse(0)) + private def getRetryCount[F[_]: Sync](ctx: Context[F, _], store: Store[F]): F[Int] = + store.transact(RJob.getRetries(ctx.jobId)).map(_.getOrElse(0)) private def deleteDuplicate[F[_]: Sync]( - ctx: Context[F, Args] + ctx: Context[F, Args], + store: Store[F] )(fd: FileMetaDupes): F[Unit] = { val fname = ctx.args.files.find(_.fileMetaId == fd.fm.id).flatMap(_.name) if (fd.exists) ctx.logger - .info(s"Deleting duplicate file $fname!") *> ctx.store.fileRepo + .info(s"Deleting duplicate file $fname!") *> store.fileRepo .delete(fd.fm.id) else ().pure[F] } private def findDuplicates[F[_]]( - ctx: Context[F, Args] + ctx: Context[F, Args], + store: Store[F] ): F[Vector[FileMetaDupes]] = - ctx.store.transact(for { + store.transact(for { fileMetas <- RFileMeta.findByIds(ctx.args.files.map(_.fileMetaId)) dupes <- fileMetas.traverse(checkDuplicate(ctx)) } yield dupes) diff --git a/modules/joex/src/main/scala/docspell/joex/process/EvalProposals.scala b/modules/joex/src/main/scala/docspell/joex/process/EvalProposals.scala index 2a9092a1..327c0d08 100644 --- a/modules/joex/src/main/scala/docspell/joex/process/EvalProposals.scala +++ b/modules/joex/src/main/scala/docspell/joex/process/EvalProposals.scala @@ -7,30 +7,31 @@ package docspell.joex.process import java.time.{LocalDate, Period} - import cats.effect.Sync import cats.implicits._ - import docspell.common._ -import docspell.scheduler.{Context, Task} +import docspell.scheduler.Task +import docspell.store.Store import docspell.store.records.{RAttachmentMeta, RPerson} /** Calculate weights for candidates that adds the most likely candidate a lower number. */ object EvalProposals { - def apply[F[_]: Sync](data: ItemData): Task[F, ProcessItemArgs, ItemData] = - Task { ctx => + def apply[F[_]: Sync]( + store: Store[F] + )(data: ItemData): Task[F, ProcessItemArgs, ItemData] = + Task { _ => for { now <- Timestamp.current[F] - personRefs <- findOrganizationRelation[F](data, ctx) + personRefs <- findOrganizationRelation[F](data, store) metas = data.metas.map(calcCandidateWeight(now.toUtcDate, personRefs)) } yield data.copy(metas = metas) } def findOrganizationRelation[F[_]: Sync]( data: ItemData, - ctx: Context[F, _] + store: Store[F] ): F[Map[Ident, PersonRef]] = { val corrPersIds = data.metas .map(_.proposals) @@ -38,7 +39,7 @@ object EvalProposals { .flatMap(_.find(MetaProposalType.CorrPerson)) .flatMap(_.values.toList.map(_.ref.id)) .toSet - ctx.store + store .transact(RPerson.findOrganization(corrPersIds)) .map(_.map(p => (p.id, p)).toMap) } diff --git a/modules/joex/src/main/scala/docspell/joex/process/ExtractArchive.scala b/modules/joex/src/main/scala/docspell/joex/process/ExtractArchive.scala index b3fb3bbf..c4b02630 100644 --- a/modules/joex/src/main/scala/docspell/joex/process/ExtractArchive.scala +++ b/modules/joex/src/main/scala/docspell/joex/process/ExtractArchive.scala @@ -14,13 +14,12 @@ import cats.implicits._ import cats.kernel.Monoid import cats.kernel.Order import fs2.Stream - import docspell.common._ import docspell.files.Zip import docspell.joex.mail._ import docspell.scheduler._ +import docspell.store.Store import docspell.store.records._ - import emil.Mail /** Goes through all attachments and extracts archive files, like zip files. The process @@ -34,39 +33,41 @@ import emil.Mail object ExtractArchive { type Args = ProcessItemArgs - def apply[F[_]: Async]( + def apply[F[_]: Async](store: Store[F])( item: ItemData ): Task[F, Args, ItemData] = - multiPass(item, None).map(_._2) + multiPass(store, item, None).map(_._2) def multiPass[F[_]: Async]( + store: Store[F], item: ItemData, archive: Option[RAttachmentArchive] ): Task[F, Args, (Option[RAttachmentArchive], ItemData)] = - singlePass(item, archive).flatMap { t => + singlePass(store, item, archive).flatMap { t => if (t._1.isEmpty) Task.pure(t) - else multiPass(t._2, t._1) + else multiPass(store, t._2, t._1) } def singlePass[F[_]: Async]( + store: Store[F], item: ItemData, archive: Option[RAttachmentArchive] ): Task[F, Args, (Option[RAttachmentArchive], ItemData)] = Task { ctx => def extract(ra: RAttachment, pos: Int): F[Extracted] = - findMime(ctx)(ra).flatMap(m => extractSafe(ctx, archive)(ra, pos, m)) + findMime(store)(ra).flatMap(m => extractSafe(ctx, store, archive)(ra, pos, m)) for { - lastPos <- ctx.store.transact(RAttachment.nextPosition(item.item.id)) + lastPos <- store.transact(RAttachment.nextPosition(item.item.id)) extracts <- item.attachments.zipWithIndex .traverse(t => extract(t._1, lastPos + t._2)) .map(Monoid[Extracted].combineAll) .map(fixPositions) nra = extracts.files - _ <- extracts.files.traverse(storeAttachment(ctx)) + _ <- extracts.files.traverse(storeAttachment(store)) naa = extracts.archives - _ <- naa.traverse(storeArchive(ctx)) + _ <- naa.traverse(storeArchive(store)) } yield naa.headOption -> item.copy( attachments = nra, originFile = item.originFile ++ nra.map(a => a.id -> a.fileId).toMap, @@ -83,25 +84,26 @@ object ExtractArchive { if (extract.archives.isEmpty) extract else extract.updatePositions - def findMime[F[_]: Functor](ctx: Context[F, _])(ra: RAttachment): F[MimeType] = - OptionT(ctx.store.transact(RFileMeta.findById(ra.fileId))) + def findMime[F[_]: Functor](store: Store[F])(ra: RAttachment): F[MimeType] = + OptionT(store.transact(RFileMeta.findById(ra.fileId))) .map(_.mimetype) .getOrElse(MimeType.octetStream) def extractSafe[F[_]: Async]( ctx: Context[F, Args], + store: Store[F], archive: Option[RAttachmentArchive] )(ra: RAttachment, pos: Int, mime: MimeType): F[Extracted] = mime match { case MimeType.ZipMatch(_) if ra.name.exists(_.toLowerCase.endsWith(".zip")) => ctx.logger.info(s"Extracting zip archive ${ra.name.getOrElse("")}.") *> - extractZip(ctx, archive)(ra, pos) - .flatMap(cleanupParents(ctx, ra, archive)) + extractZip(ctx, store, archive)(ra, pos) + .flatMap(cleanupParents(ctx, store, ra, archive)) case MimeType.EmailMatch(_) => ctx.logger.info(s"Reading e-mail ${ra.name.getOrElse("")}") *> - extractMail(ctx, archive)(ra, pos) - .flatMap(cleanupParents(ctx, ra, archive)) + extractMail(ctx, store, archive)(ra, pos) + .flatMap(cleanupParents(ctx, store, ra, archive)) case _ => ctx.logger.debug(s"Not an archive: ${mime.asString}") *> @@ -110,6 +112,7 @@ object ExtractArchive { def cleanupParents[F[_]: Sync]( ctx: Context[F, _], + store: Store[F], ra: RAttachment, archive: Option[RAttachmentArchive] )(extracted: Extracted): F[Extracted] = @@ -119,30 +122,31 @@ object ExtractArchive { _ <- ctx.logger.debug( s"Extracted inner attachment ${ra.name}. Remove it completely." ) - _ <- ctx.store.transact(RAttachmentArchive.delete(ra.id)) - _ <- ctx.store.transact(RAttachment.delete(ra.id)) - _ <- ctx.store.fileRepo.delete(ra.fileId) + _ <- store.transact(RAttachmentArchive.delete(ra.id)) + _ <- store.transact(RAttachment.delete(ra.id)) + _ <- store.fileRepo.delete(ra.fileId) } yield extracted case None => for { _ <- ctx.logger.debug( s"Extracted attachment ${ra.name}. Remove it from the item." ) - _ <- ctx.store.transact(RAttachment.delete(ra.id)) + _ <- store.transact(RAttachment.delete(ra.id)) } yield extracted.copy(files = extracted.files.filter(_.id != ra.id)) } def extractZip[F[_]: Async]( ctx: Context[F, Args], + store: Store[F], archive: Option[RAttachmentArchive] )(ra: RAttachment, pos: Int): F[Extracted] = { - val zipData = ctx.store.fileRepo.getBytes(ra.fileId) + val zipData = store.fileRepo.getBytes(ra.fileId) val glob = ctx.args.meta.fileFilter.getOrElse(Glob.all) ctx.logger.debug(s"Filtering zip entries with '${glob.asString}'") *> zipData .through(Zip.unzipP[F](8192, glob)) .zipWithIndex - .flatMap(handleEntry(ctx, ra, pos, archive, None)) + .flatMap(handleEntry(ctx, store, ra, pos, archive, None)) .foldMonoid .compile .lastOrError @@ -150,9 +154,10 @@ object ExtractArchive { def extractMail[F[_]: Async]( ctx: Context[F, Args], + store: Store[F], archive: Option[RAttachmentArchive] )(ra: RAttachment, pos: Int): F[Extracted] = { - val email: Stream[F, Byte] = ctx.store.fileRepo.getBytes(ra.fileId) + val email: Stream[F, Byte] = store.fileRepo.getBytes(ra.fileId) val glob = ctx.args.meta.fileFilter.getOrElse(Glob.all) val attachOnly = ctx.args.meta.attachmentsOnly.getOrElse(false) @@ -170,7 +175,9 @@ object ExtractArchive { ReadMail .mailToEntries(ctx.logger, glob, attachOnly)(mail) .zipWithIndex - .flatMap(handleEntry(ctx, ra, pos, archive, mId)) ++ Stream.eval(givenMeta) + .flatMap(handleEntry(ctx, store, ra, pos, archive, mId)) ++ Stream.eval( + givenMeta + ) } .foldMonoid .compile @@ -185,6 +192,7 @@ object ExtractArchive { def handleEntry[F[_]: Sync]( ctx: Context[F, Args], + store: Store[F], ra: RAttachment, pos: Int, archive: Option[RAttachmentArchive], @@ -195,7 +203,7 @@ object ExtractArchive { val (entry, subPos) = tentry val mimeHint = MimeTypeHint.filename(entry.name).withAdvertised(entry.mime.asString) val fileId = entry.data.through( - ctx.store.fileRepo + store.fileRepo .save(ctx.args.meta.collective, FileCategory.AttachmentSource, mimeHint) ) @@ -217,16 +225,16 @@ object ExtractArchive { } - def storeAttachment[F[_]: Sync](ctx: Context[F, _])(ra: RAttachment): F[Int] = { - val insert = CreateItem.insertAttachment(ctx)(ra) + def storeAttachment[F[_]: Sync](store: Store[F])(ra: RAttachment): F[Int] = { + val insert = CreateItem.insertAttachment(store)(ra) for { - n1 <- ctx.store.transact(RAttachment.updatePosition(ra.id, ra.position)) + n1 <- store.transact(RAttachment.updatePosition(ra.id, ra.position)) n2 <- if (n1 > 0) 0.pure[F] else insert } yield n1 + n2 } - def storeArchive[F[_]](ctx: Context[F, _])(aa: RAttachmentArchive): F[Int] = - ctx.store.transact(RAttachmentArchive.insert(aa)) + def storeArchive[F[_]](store: Store[F])(aa: RAttachmentArchive): F[Int] = + store.transact(RAttachmentArchive.insert(aa)) case class Extracted( files: Vector[RAttachment], diff --git a/modules/joex/src/main/scala/docspell/joex/process/FindProposal.scala b/modules/joex/src/main/scala/docspell/joex/process/FindProposal.scala index a4af8e48..80beb187 100644 --- a/modules/joex/src/main/scala/docspell/joex/process/FindProposal.scala +++ b/modules/joex/src/main/scala/docspell/joex/process/FindProposal.scala @@ -7,16 +7,15 @@ package docspell.joex.process import java.time.ZoneId - import cats.effect.Sync import cats.implicits._ import cats.{Applicative, FlatMap} - import docspell.analysis.contact._ import docspell.common.MetaProposal.Candidate import docspell.common._ import docspell.joex.Config import docspell.scheduler.{Context, Task} +import docspell.store.Store import docspell.store.records._ /** Super simple approach to find corresponding meta data to an item by looking up values @@ -26,7 +25,8 @@ object FindProposal { type Args = ProcessItemArgs def apply[F[_]: Sync]( - cfg: Config.TextAnalysis + cfg: Config.TextAnalysis, + store: Store[F] )(data: ItemData): Task[F, Args, ItemData] = Task { ctx => val rmas = data.metas.map(rm => rm.copy(nerlabels = removeDuplicates(rm.nerlabels))) @@ -34,15 +34,16 @@ object FindProposal { _ <- ctx.logger.info("Starting find-proposal") rmv <- rmas .traverse(rm => - processAttachment(cfg, rm, data.findDates(rm), ctx) + processAttachment(cfg, rm, data.findDates(rm), ctx, store) .map(ml => rm.copy(proposals = ml)) ) - clp <- lookupClassifierProposals(ctx, data.classifyProposals) + clp <- lookupClassifierProposals(ctx, store, data.classifyProposals) } yield data.copy(metas = rmv, classifyProposals = clp) } def lookupClassifierProposals[F[_]: Sync]( ctx: Context[F, Args], + store: Store[F], mpList: MetaProposalList ): F[MetaProposalList] = { val coll = ctx.args.meta.collective @@ -50,7 +51,7 @@ object FindProposal { def lookup(mp: MetaProposal): F[Option[IdRef]] = mp.proposalType match { case MetaProposalType.CorrOrg => - ctx.store + store .transact( ROrganization .findLike(coll, mp.values.head.ref.name.toLowerCase, OrgUse.notDisabled) @@ -60,7 +61,7 @@ object FindProposal { ctx.logger.debug(s"Found classifier organization for $mp: $oref") ) case MetaProposalType.CorrPerson => - ctx.store + store .transact( RPerson .findLike( @@ -74,7 +75,7 @@ object FindProposal { ctx.logger.debug(s"Found classifier corr-person for $mp: $oref") ) case MetaProposalType.ConcPerson => - ctx.store + store .transact( RPerson .findLike( @@ -88,7 +89,7 @@ object FindProposal { ctx.logger.debug(s"Found classifier conc-person for $mp: $oref") ) case MetaProposalType.ConcEquip => - ctx.store + store .transact( REquipment .findLike( @@ -123,9 +124,10 @@ object FindProposal { cfg: Config.TextAnalysis, rm: RAttachmentMeta, rd: Vector[NerDateLabel], - ctx: Context[F, ProcessItemArgs] + ctx: Context[F, Args], + store: Store[F] ): F[MetaProposalList] = { - val finder = Finder.searchExact(ctx).next(Finder.searchFuzzy(ctx)) + val finder = Finder.searchExact(ctx, store).next(Finder.searchFuzzy(ctx, store)) List(finder.find(rm.nerlabels), makeDateProposal(cfg, rd)) .traverse(identity) .map(MetaProposalList.flatten) @@ -215,19 +217,24 @@ object FindProposal { def unit[F[_]: Applicative](value: MetaProposalList): Finder[F] = _ => value.pure[F] - def searchExact[F[_]: Sync](ctx: Context[F, ProcessItemArgs]): Finder[F] = + def searchExact[F[_]: Sync](ctx: Context[F, Args], store: Store[F]): Finder[F] = labels => - labels.toList.traverse(nl => search(nl, true, ctx)).map(MetaProposalList.flatten) + labels.toList + .traverse(nl => search(nl, true, ctx, store)) + .map(MetaProposalList.flatten) - def searchFuzzy[F[_]: Sync](ctx: Context[F, ProcessItemArgs]): Finder[F] = + def searchFuzzy[F[_]: Sync](ctx: Context[F, Args], store: Store[F]): Finder[F] = labels => - labels.toList.traverse(nl => search(nl, false, ctx)).map(MetaProposalList.flatten) + labels.toList + .traverse(nl => search(nl, false, ctx, store)) + .map(MetaProposalList.flatten) } private def search[F[_]: Sync]( nt: NerLabel, exact: Boolean, - ctx: Context[F, ProcessItemArgs] + ctx: Context[F, ProcessItemArgs], + store: Store[F] ): F[MetaProposalList] = { val value = if (exact) normalizeSearchValue(nt.label) @@ -243,7 +250,7 @@ object FindProposal { nt.tag match { case NerTag.Organization => ctx.logger.debug(s"Looking for organizations: $value") *> - ctx.store + store .transact( ROrganization .findLike(ctx.args.meta.collective, value, OrgUse.notDisabled) @@ -251,20 +258,20 @@ object FindProposal { .map(MetaProposalList.from(MetaProposalType.CorrOrg, nt)) case NerTag.Person => - val s1 = ctx.store + val s1 = store .transact( RPerson .findLike(ctx.args.meta.collective, value, PersonUse.concerningAndBoth) ) .map(MetaProposalList.from(MetaProposalType.ConcPerson, nt)) - val s2 = ctx.store + val s2 = store .transact( RPerson .findLike(ctx.args.meta.collective, value, PersonUse.correspondentAndBoth) ) .map(MetaProposalList.from(MetaProposalType.CorrPerson, nt)) val s3 = - ctx.store + store .transact( ROrganization .findLike(ctx.args.meta.collective, value, OrgUse.notDisabled) @@ -283,7 +290,7 @@ object FindProposal { case NerTag.Misc => ctx.logger.debug(s"Looking for equipments: $value") *> - ctx.store + store .transact( REquipment .findLike(ctx.args.meta.collective, value, EquipmentUse.notDisabled) @@ -291,7 +298,7 @@ object FindProposal { .map(MetaProposalList.from(MetaProposalType.ConcEquip, nt)) case NerTag.Email => - searchContact(nt, ContactKind.Email, value, ctx) + searchContact(nt, ContactKind.Email, value, ctx, store) case NerTag.Website => if (!exact) { @@ -301,9 +308,9 @@ object FindProposal { .map(_.toPrimaryDomain.asString) .map(s => s"%$s%") .getOrElse(value) - searchContact(nt, ContactKind.Website, searchString, ctx) + searchContact(nt, ContactKind.Website, searchString, ctx, store) } else - searchContact(nt, ContactKind.Website, value, ctx) + searchContact(nt, ContactKind.Website, value, ctx, store) case NerTag.Date => // There is no database search required for this tag @@ -315,18 +322,19 @@ object FindProposal { nt: NerLabel, kind: ContactKind, value: String, - ctx: Context[F, ProcessItemArgs] + ctx: Context[F, ProcessItemArgs], + store: Store[F] ): F[MetaProposalList] = { - val orgs = ctx.store + val orgs = store .transact(ROrganization.findLike(ctx.args.meta.collective, kind, value)) .map(MetaProposalList.from(MetaProposalType.CorrOrg, nt)) - val corrP = ctx.store + val corrP = store .transact( RPerson .findLike(ctx.args.meta.collective, kind, value, PersonUse.correspondentAndBoth) ) .map(MetaProposalList.from(MetaProposalType.CorrPerson, nt)) - val concP = ctx.store + val concP = store .transact( RPerson .findLike(ctx.args.meta.collective, kind, value, PersonUse.concerningAndBoth) diff --git a/modules/joex/src/main/scala/docspell/joex/process/ItemHandler.scala b/modules/joex/src/main/scala/docspell/joex/process/ItemHandler.scala index b3a5ae7b..3122da9f 100644 --- a/modules/joex/src/main/scala/docspell/joex/process/ItemHandler.scala +++ b/modules/joex/src/main/scala/docspell/joex/process/ItemHandler.scala @@ -10,7 +10,6 @@ import cats.data.OptionT import cats.effect._ import cats.implicits._ import fs2.Stream - import docspell.analysis.TextAnalyser import docspell.backend.ops.OItem import docspell.common.{ItemState, ProcessItemArgs} @@ -18,49 +17,51 @@ import docspell.ftsclient.FtsClient import docspell.joex.Config import docspell.joex.analysis.RegexNerFile import docspell.scheduler.Task +import docspell.store.Store import docspell.store.queries.QItem import docspell.store.records.RItem object ItemHandler { type Args = ProcessItemArgs - def onCancel[F[_]: Sync]: Task[F, Args, Unit] = + def onCancel[F[_]: Sync](store: Store[F]): Task[F, Args, Unit] = logWarn[F]("Now cancelling.").flatMap(_ => - markItemCreated.flatMap { + markItemCreated(store).flatMap { case true => Task.pure(()) case false => - deleteByFileIds[F].flatMap(_ => deleteFiles) + deleteByFileIds[F](store).flatMap(_ => deleteFiles(store)) } ) def newItem[F[_]: Async]( cfg: Config, + store: Store[F], itemOps: OItem[F], fts: FtsClient[F], analyser: TextAnalyser[F], regexNer: RegexNerFile[F] ): Task[F, Args, Option[ItemData]] = logBeginning[F].flatMap(_ => - DuplicateCheck[F] + DuplicateCheck[F](store) .flatMap(args => if (args.files.isEmpty) logNoFiles[F].map(_ => None) else { val create: Task[F, Args, ItemData] = - CreateItem[F].contramap(_ => args.pure[F]) + CreateItem[F](store).contramap(_ => args.pure[F]) create - .flatMap(itemStateTask(ItemState.Processing)) - .flatMap(safeProcess[F](cfg, itemOps, fts, analyser, regexNer)) + .flatMap(itemStateTask(store, ItemState.Processing)) + .flatMap(safeProcess[F](cfg, store, itemOps, fts, analyser, regexNer)) .map(_.some) } ) ) - def itemStateTask[F[_]: Sync, A]( - state: ItemState - )(data: ItemData): Task[F, A, ItemData] = - Task(ctx => - ctx.store + def itemStateTask[F[_]: Sync, A](store: Store[F], state: ItemState)( + data: ItemData + ): Task[F, A, ItemData] = + Task(_ => + store .transact(RItem.updateState(data.item.id, state, ItemState.invalidStates)) .map(_ => data) ) @@ -70,6 +71,7 @@ object ItemHandler { def safeProcess[F[_]: Async]( cfg: Config, + store: Store[F], itemOps: OItem[F], fts: FtsClient[F], analyser: TextAnalyser[F], @@ -77,30 +79,31 @@ object ItemHandler { )(data: ItemData): Task[F, Args, ItemData] = isLastRetry[F].flatMap { case true => - ProcessItem[F](cfg, itemOps, fts, analyser, regexNer)(data).attempt.flatMap { - case Right(d) => - Task.pure(d) - case Left(ex) => - logWarn[F]( - "Processing failed on last retry. Creating item but without proposals." - ).flatMap(_ => itemStateTask(ItemState.Created)(data)) - .andThen(_ => Sync[F].raiseError(ex)) - } + ProcessItem[F](cfg, itemOps, fts, analyser, regexNer, store)(data).attempt + .flatMap { + case Right(d) => + Task.pure(d) + case Left(ex) => + logWarn[F]( + "Processing failed on last retry. Creating item but without proposals." + ).flatMap(_ => itemStateTask(store, ItemState.Created)(data)) + .andThen(_ => Sync[F].raiseError(ex)) + } case false => - ProcessItem[F](cfg, itemOps, fts, analyser, regexNer)(data) - .flatMap(itemStateTask(ItemState.Created)) + ProcessItem[F](cfg, itemOps, fts, analyser, regexNer, store)(data) + .flatMap(itemStateTask(store, ItemState.Created)) } - private def markItemCreated[F[_]: Sync]: Task[F, Args, Boolean] = + private def markItemCreated[F[_]: Sync](store: Store[F]): Task[F, Args, Boolean] = Task { ctx => val fileMetaIds = ctx.args.files.map(_.fileMetaId).toSet (for { - item <- OptionT(ctx.store.transact(QItem.findOneByFileIds(fileMetaIds.toSeq))) + item <- OptionT(store.transact(QItem.findOneByFileIds(fileMetaIds.toSeq))) _ <- OptionT.liftF( ctx.logger.info("Processing cancelled. Marking item as created anyways.") ) _ <- OptionT.liftF( - ctx.store + store .transact( RItem.updateState(item.id, ItemState.Created, ItemState.invalidStates) ) @@ -111,11 +114,11 @@ object ItemHandler { ) } - private def deleteByFileIds[F[_]: Sync]: Task[F, Args, Unit] = + private def deleteByFileIds[F[_]: Sync](store: Store[F]): Task[F, Args, Unit] = Task { ctx => val states = ItemState.invalidStates for { - items <- ctx.store.transact( + items <- store.transact( QItem.findByFileIds(ctx.args.files.map(_.fileMetaId), states) ) _ <- @@ -124,16 +127,16 @@ object ItemHandler { ctx.logger.info( s"No items found for file ids ${ctx.args.files.map(_.fileMetaId)}" ) - _ <- items.traverse(i => QItem.delete(ctx.store)(i.id, ctx.args.meta.collective)) + _ <- items.traverse(i => QItem.delete(store)(i.id, ctx.args.meta.collective)) } yield () } - private def deleteFiles[F[_]: Sync]: Task[F, Args, Unit] = + private def deleteFiles[F[_]: Sync](store: Store[F]): Task[F, Args, Unit] = Task(ctx => ctx.logger.info("Deleting input files …") *> Stream .emits(ctx.args.files.map(_.fileMetaId)) - .evalMap(id => ctx.store.fileRepo.delete(id).attempt) + .evalMap(id => store.fileRepo.delete(id).attempt) .compile .drain ) diff --git a/modules/joex/src/main/scala/docspell/joex/process/LinkProposal.scala b/modules/joex/src/main/scala/docspell/joex/process/LinkProposal.scala index 2e85e450..71598b64 100644 --- a/modules/joex/src/main/scala/docspell/joex/process/LinkProposal.scala +++ b/modules/joex/src/main/scala/docspell/joex/process/LinkProposal.scala @@ -9,22 +9,26 @@ package docspell.joex.process import cats.data.NonEmptyList import cats.effect.Sync import cats.implicits._ - import docspell.common._ import docspell.scheduler.{Context, Task} +import docspell.store.Store import docspell.store.records.RItem object LinkProposal { - def onlyNew[F[_]: Sync](data: ItemData): Task[F, ProcessItemArgs, ItemData] = + def onlyNew[F[_]: Sync]( + store: Store[F] + )(data: ItemData): Task[F, ProcessItemArgs, ItemData] = if (data.item.state.isValid) Task .log[F, ProcessItemArgs](_.debug(s"Not linking proposals on existing item")) .map(_ => data) else - LinkProposal[F](data) + LinkProposal[F](store)(data) - def apply[F[_]: Sync](data: ItemData): Task[F, ProcessItemArgs, ItemData] = + def apply[F[_]: Sync]( + store: Store[F] + )(data: ItemData): Task[F, ProcessItemArgs, ItemData] = if (data.item.state == ItemState.Confirmed) Task .log[F, ProcessItemArgs](_.debug(s"Not linking proposals on confirmed item")) @@ -35,7 +39,7 @@ object LinkProposal { ctx.logger.info(s"Starting linking proposals") *> MetaProposalType.all - .traverse(applyValue(data, proposals, ctx)) + .traverse(applyValue(data, proposals, ctx, store)) .map(result => ctx.logger.info(s"Results from proposal processing: $result")) .map(_ => data) } @@ -43,7 +47,8 @@ object LinkProposal { def applyValue[F[_]: Sync]( data: ItemData, proposalList: MetaProposalList, - ctx: Context[F, ProcessItemArgs] + ctx: Context[F, ProcessItemArgs], + store: Store[F] )(mpt: MetaProposalType): F[Result] = data.givenMeta.find(mpt).orElse(proposalList.find(mpt)) match { case None => @@ -51,29 +56,30 @@ object LinkProposal { Result.noneFound(mpt).pure[F] case Some(a) if a.isSingleValue => ctx.logger.info(s"Found one candidate for ${a.proposalType}") *> - setItemMeta(data.item.id, ctx, a.proposalType, a.values.head.ref.id).map(_ => - Result.single(mpt) + setItemMeta(data.item.id, ctx, store, a.proposalType, a.values.head.ref.id).map( + _ => Result.single(mpt) ) case Some(a) => val ids = a.values.map(_.ref.id.id) ctx.logger.info( s"Found many (${a.size}, $ids) candidates for ${a.proposalType}. Setting first." ) *> - setItemMeta(data.item.id, ctx, a.proposalType, a.values.head.ref.id).map(_ => - Result.multiple(mpt) + setItemMeta(data.item.id, ctx, store, a.proposalType, a.values.head.ref.id).map( + _ => Result.multiple(mpt) ) } def setItemMeta[F[_]: Sync]( itemId: Ident, ctx: Context[F, ProcessItemArgs], + store: Store[F], mpt: MetaProposalType, value: Ident ): F[Int] = mpt match { case MetaProposalType.CorrOrg => ctx.logger.debug(s"Updating item organization with: ${value.id}") *> - ctx.store.transact( + store.transact( RItem.updateCorrOrg( NonEmptyList.of(itemId), ctx.args.meta.collective, @@ -82,7 +88,7 @@ object LinkProposal { ) case MetaProposalType.ConcPerson => ctx.logger.debug(s"Updating item concerning person with: $value") *> - ctx.store.transact( + store.transact( RItem.updateConcPerson( NonEmptyList.of(itemId), ctx.args.meta.collective, @@ -91,7 +97,7 @@ object LinkProposal { ) case MetaProposalType.CorrPerson => ctx.logger.debug(s"Updating item correspondent person with: $value") *> - ctx.store.transact( + store.transact( RItem.updateCorrPerson( NonEmptyList.of(itemId), ctx.args.meta.collective, @@ -100,7 +106,7 @@ object LinkProposal { ) case MetaProposalType.ConcEquip => ctx.logger.debug(s"Updating item concerning equipment with: $value") *> - ctx.store.transact( + store.transact( RItem.updateConcEquip( NonEmptyList.of(itemId), ctx.args.meta.collective, @@ -112,7 +118,7 @@ object LinkProposal { case Some(ld) => val ts = Timestamp.from(ld.atStartOfDay(Timestamp.UTC)) ctx.logger.debug(s"Updating item date ${value.id}") *> - ctx.store.transact( + store.transact( RItem.updateDate( NonEmptyList.of(itemId), ctx.args.meta.collective, @@ -128,7 +134,7 @@ object LinkProposal { case Some(ld) => val ts = Timestamp.from(ld.atStartOfDay(Timestamp.UTC)) ctx.logger.debug(s"Updating item due-date suggestion ${value.id}") *> - ctx.store.transact( + store.transact( RItem.updateDueDate( NonEmptyList.of(itemId), ctx.args.meta.collective, diff --git a/modules/joex/src/main/scala/docspell/joex/process/ProcessItem.scala b/modules/joex/src/main/scala/docspell/joex/process/ProcessItem.scala index 5918698c..819be5c7 100644 --- a/modules/joex/src/main/scala/docspell/joex/process/ProcessItem.scala +++ b/modules/joex/src/main/scala/docspell/joex/process/ProcessItem.scala @@ -8,7 +8,6 @@ package docspell.joex.process import cats.effect._ import cats.implicits._ - import docspell.analysis.TextAnalyser import docspell.backend.ops.OItem import docspell.common.ProcessItemArgs @@ -16,6 +15,7 @@ import docspell.ftsclient.FtsClient import docspell.joex.Config import docspell.joex.analysis.RegexNerFile import docspell.scheduler.Task +import docspell.store.Store object ProcessItem { @@ -24,12 +24,13 @@ object ProcessItem { itemOps: OItem[F], fts: FtsClient[F], analyser: TextAnalyser[F], - regexNer: RegexNerFile[F] + regexNer: RegexNerFile[F], + store: Store[F] )(item: ItemData): Task[F, ProcessItemArgs, ItemData] = - ExtractArchive(item) + ExtractArchive(store)(item) .flatMap(Task.setProgress(20)) - .flatMap(processAttachments0(cfg, fts, analyser, regexNer, (40, 60, 80))) - .flatMap(LinkProposal.onlyNew[F]) + .flatMap(processAttachments0(cfg, fts, analyser, regexNer, store, (40, 60, 80))) + .flatMap(LinkProposal.onlyNew[F](store)) .flatMap(SetGivenData.onlyNew[F](itemOps)) .flatMap(Task.setProgress(99)) .flatMap(RemoveEmptyItem(itemOps)) @@ -38,34 +39,37 @@ object ProcessItem { cfg: Config, fts: FtsClient[F], analyser: TextAnalyser[F], - regexNer: RegexNerFile[F] + regexNer: RegexNerFile[F], + store: Store[F] )(item: ItemData): Task[F, ProcessItemArgs, ItemData] = - processAttachments0[F](cfg, fts, analyser, regexNer, (30, 60, 90))(item) + processAttachments0[F](cfg, fts, analyser, regexNer, store, (30, 60, 90))(item) def analysisOnly[F[_]: Async]( cfg: Config, analyser: TextAnalyser[F], - regexNer: RegexNerFile[F] + regexNer: RegexNerFile[F], + store: Store[F] )(item: ItemData): Task[F, ProcessItemArgs, ItemData] = - TextAnalysis[F](cfg.textAnalysis, analyser, regexNer)(item) - .flatMap(FindProposal[F](cfg.textAnalysis)) - .flatMap(EvalProposals[F]) - .flatMap(CrossCheckProposals[F]) - .flatMap(SaveProposals[F]) + TextAnalysis[F](cfg.textAnalysis, analyser, regexNer, store)(item) + .flatMap(FindProposal[F](cfg.textAnalysis, store)) + .flatMap(EvalProposals[F](store)) + .flatMap(CrossCheckProposals[F](store)) + .flatMap(SaveProposals[F](store)) private def processAttachments0[F[_]: Async]( cfg: Config, fts: FtsClient[F], analyser: TextAnalyser[F], regexNer: RegexNerFile[F], + store: Store[F], progress: (Int, Int, Int) )(item: ItemData): Task[F, ProcessItemArgs, ItemData] = - ConvertPdf(cfg.convert, item) + ConvertPdf(cfg.convert, store, item) .flatMap(Task.setProgress(progress._1)) - .flatMap(TextExtraction(cfg.extraction, fts)) - .flatMap(AttachmentPreview(cfg.extraction.preview)) - .flatMap(AttachmentPageCount()) + .flatMap(TextExtraction(cfg.extraction, fts, store)) + .flatMap(AttachmentPreview(cfg.extraction.preview, store)) + .flatMap(AttachmentPageCount(store)) .flatMap(Task.setProgress(progress._2)) - .flatMap(analysisOnly[F](cfg, analyser, regexNer)) + .flatMap(analysisOnly[F](cfg, analyser, regexNer, store)) .flatMap(Task.setProgress(progress._3)) } diff --git a/modules/joex/src/main/scala/docspell/joex/process/ReProcessItem.scala b/modules/joex/src/main/scala/docspell/joex/process/ReProcessItem.scala index 97091b75..cc840710 100644 --- a/modules/joex/src/main/scala/docspell/joex/process/ReProcessItem.scala +++ b/modules/joex/src/main/scala/docspell/joex/process/ReProcessItem.scala @@ -9,7 +9,6 @@ package docspell.joex.process import cats.data.OptionT import cats.effect._ import cats.implicits._ - import docspell.analysis.TextAnalyser import docspell.backend.ops.OItem import docspell.common._ @@ -18,6 +17,7 @@ import docspell.joex.Config import docspell.joex.analysis.RegexNerFile import docspell.scheduler.Context import docspell.scheduler.Task +import docspell.store.Store import docspell.store.queries.QItem import docspell.store.records.RAttachment import docspell.store.records.RAttachmentSource @@ -32,13 +32,14 @@ object ReProcessItem { fts: FtsClient[F], itemOps: OItem[F], analyser: TextAnalyser[F], - regexNer: RegexNerFile[F] + regexNer: RegexNerFile[F], + store: Store[F] ): Task[F, Args, Unit] = Task .log[F, Args](_.info("===== Start reprocessing ======")) .flatMap(_ => - loadItem[F] - .flatMap(safeProcess[F](cfg, fts, itemOps, analyser, regexNer)) + loadItem[F](store) + .flatMap(safeProcess[F](cfg, fts, itemOps, analyser, regexNer, store)) .map(_ => ()) ) @@ -53,13 +54,13 @@ object ReProcessItem { else ra => selection.contains(ra.id) } - def loadItem[F[_]: Sync]: Task[F, Args, ItemData] = + def loadItem[F[_]: Sync](store: Store[F]): Task[F, Args, ItemData] = Task { ctx => (for { - item <- OptionT(ctx.store.transact(RItem.findById(ctx.args.itemId))) - attach <- OptionT.liftF(ctx.store.transact(RAttachment.findByItem(item.id))) + item <- OptionT(store.transact(RItem.findById(ctx.args.itemId))) + attach <- OptionT.liftF(store.transact(RAttachment.findByItem(item.id))) asrc <- - OptionT.liftF(ctx.store.transact(RAttachmentSource.findByItem(ctx.args.itemId))) + OptionT.liftF(store.transact(RAttachmentSource.findByItem(ctx.args.itemId))) asrcMap = asrc.map(s => s.id -> s).toMap // copy the original files over to attachments to run the default processing task // the processing doesn't touch the original files, only RAttachments @@ -97,6 +98,7 @@ object ReProcessItem { itemOps: OItem[F], analyser: TextAnalyser[F], regexNer: RegexNerFile[F], + store: Store[F], data: ItemData ): Task[F, Args, ItemData] = { @@ -121,21 +123,21 @@ object ReProcessItem { Nil ).pure[F] - getLanguage[F].flatMap { lang => + getLanguage[F](store).flatMap { lang => ProcessItem - .processAttachments[F](cfg, fts, analyser, regexNer)(data) - .flatMap(LinkProposal[F]) + .processAttachments[F](cfg, fts, analyser, regexNer, store)(data) + .flatMap(LinkProposal[F](store)) .flatMap(SetGivenData[F](itemOps)) .contramap[Args](convertArgs(lang)) } } - def getLanguage[F[_]: Sync]: Task[F, Args, Language] = + def getLanguage[F[_]: Sync](store: Store[F]): Task[F, Args, Language] = Task { ctx => val lang1 = OptionT( - ctx.store.transact(QItem.getItemLanguage(ctx.args.itemId)).map(_.headOption) + store.transact(QItem.getItemLanguage(ctx.args.itemId)).map(_.headOption) ) - val lang2 = OptionT(ctx.store.transact(RCollective.findByItem(ctx.args.itemId))) + val lang2 = OptionT(store.transact(RCollective.findByItem(ctx.args.itemId))) .map(_.language) lang1.orElse(lang2).getOrElse(Language.German) @@ -149,11 +151,12 @@ object ReProcessItem { fts: FtsClient[F], itemOps: OItem[F], analyser: TextAnalyser[F], - regexNer: RegexNerFile[F] + regexNer: RegexNerFile[F], + store: Store[F] )(data: ItemData): Task[F, Args, ItemData] = isLastRetry[F].flatMap { case true => - processFiles[F](cfg, fts, itemOps, analyser, regexNer, data).attempt + processFiles[F](cfg, fts, itemOps, analyser, regexNer, store, data).attempt .flatMap { case Right(d) => Task.pure(d) @@ -163,7 +166,7 @@ object ReProcessItem { ).andThen(_ => Sync[F].raiseError(ex)) } case false => - processFiles[F](cfg, fts, itemOps, analyser, regexNer, data) + processFiles[F](cfg, fts, itemOps, analyser, regexNer, store, data) } private def logWarn[F[_]](msg: => String): Task[F, Args, Unit] = diff --git a/modules/joex/src/main/scala/docspell/joex/process/SaveProposals.scala b/modules/joex/src/main/scala/docspell/joex/process/SaveProposals.scala index cdc8627a..fdb0c7a7 100644 --- a/modules/joex/src/main/scala/docspell/joex/process/SaveProposals.scala +++ b/modules/joex/src/main/scala/docspell/joex/process/SaveProposals.scala @@ -8,17 +8,16 @@ package docspell.joex.process import cats.effect.Sync import cats.implicits._ - import docspell.common._ import docspell.scheduler.{Context, Task} -import docspell.store.AddResult +import docspell.store.{AddResult, Store} import docspell.store.records._ /** Saves the proposals in the database */ object SaveProposals { type Args = ProcessItemArgs - def apply[F[_]: Sync](data: ItemData): Task[F, Args, ItemData] = + def apply[F[_]: Sync](store: Store[F])(data: ItemData): Task[F, Args, ItemData] = Task { ctx => for { _ <- ctx.logger.info("Storing proposals") @@ -26,20 +25,24 @@ object SaveProposals { .traverse(rm => ctx.logger.debug( s"Storing attachment proposals: ${rm.proposals}" - ) *> ctx.store.transact(RAttachmentMeta.updateProposals(rm.id, rm.proposals)) + ) *> store.transact(RAttachmentMeta.updateProposals(rm.id, rm.proposals)) ) _ <- if (data.classifyProposals.isEmpty && data.classifyTags.isEmpty) 0.pure[F] - else saveItemProposal(ctx, data) + else saveItemProposal(ctx, store, data) } yield data } - def saveItemProposal[F[_]: Sync](ctx: Context[F, Args], data: ItemData): F[Unit] = { + def saveItemProposal[F[_]: Sync]( + ctx: Context[F, Args], + store: Store[F], + data: ItemData + ): F[Unit] = { def upsert(v: RItemProposal): F[Int] = - ctx.store.add(RItemProposal.insert(v), RItemProposal.exists(v.itemId)).flatMap { + store.add(RItemProposal.insert(v), RItemProposal.exists(v.itemId)).flatMap { case AddResult.Success => 1.pure[F] case AddResult.EntityExists(_) => - ctx.store.transact(RItemProposal.update(v)) + store.transact(RItemProposal.update(v)) case AddResult.Failure(ex) => ctx.logger.warn(s"Could not store item proposals: ${ex.getMessage}") *> 0 .pure[F] @@ -47,7 +50,7 @@ object SaveProposals { for { _ <- ctx.logger.debug(s"Storing classifier proposals: ${data.classifyProposals}") - tags <- ctx.store.transact( + tags <- store.transact( RTag.findAllByNameOrId(data.classifyTags, ctx.args.meta.collective) ) tagRefs = tags.map(t => IdRef(t.tagId, t.name)) diff --git a/modules/joex/src/main/scala/docspell/joex/process/TextAnalysis.scala b/modules/joex/src/main/scala/docspell/joex/process/TextAnalysis.scala index aced45f7..774040dd 100644 --- a/modules/joex/src/main/scala/docspell/joex/process/TextAnalysis.scala +++ b/modules/joex/src/main/scala/docspell/joex/process/TextAnalysis.scala @@ -9,7 +9,6 @@ package docspell.joex.process import cats.Traverse import cats.effect._ import cats.implicits._ - import docspell.analysis.classifier.TextClassifier import docspell.analysis.{NlpSettings, TextAnalyser} import docspell.common.MetaProposal.Candidate @@ -20,6 +19,7 @@ import docspell.joex.learn.{ClassifierName, Classify, LearnClassifierTask} import docspell.joex.process.ItemData.AttachmentDates import docspell.scheduler.Context import docspell.scheduler.Task +import docspell.store.Store import docspell.store.records.{RAttachmentMeta, RClassifierSetting} object TextAnalysis { @@ -28,7 +28,8 @@ object TextAnalysis { def apply[F[_]: Async]( cfg: Config.TextAnalysis, analyser: TextAnalyser[F], - nerFile: RegexNerFile[F] + nerFile: RegexNerFile[F], + store: Store[F] )(item: ItemData): Task[F, Args, ItemData] = Task { ctx => for { @@ -41,18 +42,19 @@ object TextAnalysis { ) _ <- ctx.logger.debug(s"Storing tags: ${t.map(_._1.copy(content = None))}") _ <- t.traverse(m => - ctx.store.transact(RAttachmentMeta.updateLabels(m._1.id, m._1.nerlabels)) + store.transact(RAttachmentMeta.updateLabels(m._1.id, m._1.nerlabels)) ) v = t.toVector - autoTagEnabled <- getActiveAutoTag(ctx, cfg) + autoTagEnabled <- getActiveAutoTag(ctx, store, cfg) tag <- - if (autoTagEnabled) predictTags(ctx, cfg, item.metas, analyser.classifier) + if (autoTagEnabled) + predictTags(ctx, store, cfg, item.metas, analyser.classifier) else List.empty[String].pure[F] classProposals <- if (cfg.classification.enabled) - predictItemEntities(ctx, cfg, item.metas, analyser.classifier) + predictItemEntities(ctx, store, cfg, item.metas, analyser.classifier) else MetaProposalList.empty.pure[F] e <- s @@ -86,16 +88,17 @@ object TextAnalysis { def predictTags[F[_]: Async]( ctx: Context[F, Args], + store: Store[F], cfg: Config.TextAnalysis, metas: Vector[RAttachmentMeta], classifier: TextClassifier[F] ): F[List[String]] = { val text = metas.flatMap(_.content).mkString(LearnClassifierTask.pageSep) val classifyWith: ClassifierName => F[Option[String]] = - makeClassify(ctx, cfg, classifier)(text) + makeClassify(ctx, store, cfg, classifier)(text) for { - names <- ctx.store.transact( + names <- store.transact( ClassifierName.findTagClassifiers(ctx.args.meta.collective) ) _ <- ctx.logger.debug(s"Guessing tags for ${names.size} categories") @@ -105,6 +108,7 @@ object TextAnalysis { def predictItemEntities[F[_]: Async]( ctx: Context[F, Args], + store: Store[F], cfg: Config.TextAnalysis, metas: Vector[RAttachmentMeta], classifier: TextClassifier[F] @@ -116,7 +120,7 @@ object TextAnalysis { mtype: MetaProposalType ): F[Option[MetaProposal]] = for { - label <- makeClassify(ctx, cfg, classifier)(text).apply(cname) + label <- makeClassify(ctx, store, cfg, classifier)(text).apply(cname) } yield label.map(str => MetaProposal(mtype, Candidate(IdRef(Ident.unsafe(""), str), Set.empty)) ) @@ -136,13 +140,14 @@ object TextAnalysis { private def makeClassify[F[_]: Async]( ctx: Context[F, Args], + store: Store[F], cfg: Config.TextAnalysis, classifier: TextClassifier[F] )(text: String): ClassifierName => F[Option[String]] = Classify[F]( ctx.logger, cfg.workingDir, - ctx.store, + store, classifier, ctx.args.meta.collective, text @@ -150,10 +155,11 @@ object TextAnalysis { private def getActiveAutoTag[F[_]: Sync]( ctx: Context[F, Args], + store: Store[F], cfg: Config.TextAnalysis ): F[Boolean] = if (cfg.classification.enabled) - ctx.store + store .transact(RClassifierSetting.findById(ctx.args.meta.collective)) .map(_.exists(_.autoTagEnabled)) .flatTap(enabled => diff --git a/modules/joex/src/main/scala/docspell/joex/process/TextExtraction.scala b/modules/joex/src/main/scala/docspell/joex/process/TextExtraction.scala index eaa02683..757ee54f 100644 --- a/modules/joex/src/main/scala/docspell/joex/process/TextExtraction.scala +++ b/modules/joex/src/main/scala/docspell/joex/process/TextExtraction.scala @@ -9,16 +9,16 @@ package docspell.joex.process import cats.data.OptionT import cats.effect._ import cats.implicits._ - import docspell.common._ import docspell.extract.{ExtractConfig, ExtractResult, Extraction} import docspell.ftsclient.{FtsClient, TextData} import docspell.scheduler.{Context, Task} +import docspell.store.Store import docspell.store.records.{RAttachment, RAttachmentMeta, RFileMeta} object TextExtraction { - def apply[F[_]: Async](cfg: ExtractConfig, fts: FtsClient[F])( + def apply[F[_]: Async](cfg: ExtractConfig, fts: FtsClient[F], store: Store[F])( item: ItemData ): Task[F, ProcessItemArgs, ItemData] = Task { ctx => @@ -30,6 +30,7 @@ object TextExtraction { txt <- item.attachments.traverse( extractTextIfEmpty( ctx, + store, cfg, ctx.args.meta.language, ctx.args.meta.collective, @@ -38,7 +39,7 @@ object TextExtraction { ) _ <- ctx.logger.debug("Storing extracted texts …") _ <- - txt.toList.traverse(res => ctx.store.transact(RAttachmentMeta.upsert(res.am))) + txt.toList.traverse(res => store.transact(RAttachmentMeta.upsert(res.am))) _ <- ctx.logger.debug(s"Extracted text stored.") idxItem = TextData.item( item.item.id, @@ -65,6 +66,7 @@ object TextExtraction { def extractTextIfEmpty[F[_]: Async]( ctx: Context[F, ProcessItemArgs], + store: Store[F], cfg: ExtractConfig, lang: Language, collective: Ident, @@ -91,13 +93,14 @@ object TextExtraction { ctx.logger.info("TextExtraction skipped, since text is already available.") *> makeTextData((rm, Nil)).pure[F] case _ => - extractTextToMeta[F](ctx, cfg, lang, item)(ra) + extractTextToMeta[F](ctx, store, cfg, lang, item)(ra) .map(makeTextData) } } def extractTextToMeta[F[_]: Async]( ctx: Context[F, _], + store: Store[F], cfg: ExtractConfig, lang: Language, item: ItemData @@ -105,8 +108,8 @@ object TextExtraction { for { _ <- ctx.logger.debug(s"Extracting text for attachment ${stripAttachmentName(ra)}") dst <- Duration.stopTime[F] - fids <- filesToExtract(ctx)(item, ra) - res <- extractTextFallback(ctx, cfg, ra, lang)(fids) + fids <- filesToExtract(store)(item, ra) + res <- extractTextFallback(ctx, store, cfg, ra, lang)(fids) meta = item.changeMeta( ra.id, lang, @@ -123,14 +126,14 @@ object TextExtraction { } yield (meta, tags) def extractText[F[_]: Sync]( - ctx: Context[F, _], + store: Store[F], extr: Extraction[F], lang: Language )(fileId: FileKey): F[ExtractResult] = { - val data = ctx.store.fileRepo.getBytes(fileId) + val data = store.fileRepo.getBytes(fileId) def findMime: F[MimeType] = - OptionT(ctx.store.fileRepo.findMeta(fileId)) + OptionT(store.fileRepo.findMeta(fileId)) .map(_.mimetype) .getOrElse(MimeType.octetStream) @@ -140,6 +143,7 @@ object TextExtraction { private def extractTextFallback[F[_]: Async]( ctx: Context[F, _], + store: Store[F], cfg: ExtractConfig, ra: RAttachment, lang: Language @@ -151,7 +155,7 @@ object TextExtraction { case id :: rest => val extr = Extraction.create[F](ctx.logger, cfg) - extractText[F](ctx, extr, lang)(id) + extractText[F](store, extr, lang)(id) .flatMap { case res @ ExtractResult.Success(_, _) => res.some.pure[F] @@ -161,12 +165,12 @@ object TextExtraction { .warn( s"Cannot extract text from file ${stripAttachmentName(ra)}: unsupported format ${mt.asString}. Try with converted file." ) - .flatMap(_ => extractTextFallback[F](ctx, cfg, ra, lang)(rest)) + .flatMap(_ => extractTextFallback[F](ctx, store, cfg, ra, lang)(rest)) case ExtractResult.Failure(ex) => ctx.logger .warn(s"Cannot extract text: ${ex.getMessage}. Try with converted file") - .flatMap(_ => extractTextFallback[F](ctx, cfg, ra, lang)(rest)) + .flatMap(_ => extractTextFallback[F](ctx, store, cfg, ra, lang)(rest)) } } @@ -176,13 +180,13 @@ object TextExtraction { * If the source file is a PDF, then use the converted file. This may then already * contain the text if ocrmypdf is enabled. If it is disabled, both files are the same. */ - private def filesToExtract[F[_]: Sync](ctx: Context[F, _])( + private def filesToExtract[F[_]: Sync](store: Store[F])( item: ItemData, ra: RAttachment ): F[List[FileKey]] = item.originFile.get(ra.id) match { case Some(sid) => - ctx.store.transact(RFileMeta.findMime(sid)).map { + store.transact(RFileMeta.findMime(sid)).map { case Some(MimeType.PdfMatch(_)) => List(ra.fileId) case _ => diff --git a/modules/joex/src/main/scala/docspell/joex/routes/JoexRoutes.scala b/modules/joex/src/main/scala/docspell/joex/routes/JoexRoutes.scala index 1a582b98..6810f3d9 100644 --- a/modules/joex/src/main/scala/docspell/joex/routes/JoexRoutes.scala +++ b/modules/joex/src/main/scala/docspell/joex/routes/JoexRoutes.scala @@ -12,7 +12,7 @@ import cats.implicits._ import docspell.common.{Duration, Ident, Timestamp} import docspell.joex.JoexApp import docspell.joexapi.model._ -import docspell.store.records.{RJob, RJobLog} +import docspell.store.records.RJobLog import org.http4s.HttpRoutes import org.http4s.circe.CirceEntityEncoder._ @@ -67,17 +67,19 @@ object JoexRoutes { } } - def mkJob(j: RJob): Job = + // TODO !! + + def mkJob(j: docspell.scheduler.Job[String]): Job = Job( j.id, j.subject, - j.submitted, + Timestamp.Epoch, j.priority, - j.retries, - j.progress, - j.started.getOrElse(Timestamp.Epoch) + -1, + -1, + Timestamp.Epoch ) - def mkJobLog(j: RJob, jl: Vector[RJobLog]): JobAndLog = + def mkJobLog(j: docspell.scheduler.Job[String], jl: Vector[RJobLog]): JobAndLog = JobAndLog(mkJob(j), jl.map(r => JobLogEvent(r.created, r.level, r.message)).toList) } diff --git a/modules/joex/src/main/scala/docspell/joex/scanmailbox/ScanMailboxTask.scala b/modules/joex/src/main/scala/docspell/joex/scanmailbox/ScanMailboxTask.scala index 363420b0..5a91003b 100644 --- a/modules/joex/src/main/scala/docspell/joex/scanmailbox/ScanMailboxTask.scala +++ b/modules/joex/src/main/scala/docspell/joex/scanmailbox/ScanMailboxTask.scala @@ -12,7 +12,6 @@ import cats.data.OptionT import cats.effect._ import cats.implicits._ import fs2._ - import docspell.backend.ops.{OJoex, OUpload} import docspell.common._ import docspell.joex.Config @@ -20,8 +19,8 @@ import docspell.scheduler.{Context, Task} import docspell.logging.Logger import docspell.store.queries.QOrganization import docspell.store.records._ - import _root_.io.circe.syntax._ +import docspell.store.Store import emil.SearchQuery.{All, ReceivedDate} import emil.javamail.syntax._ import emil.{MimeType => _, _} @@ -32,6 +31,7 @@ object ScanMailboxTask { def apply[F[_]: Sync]( cfg: Config.ScanMailbox, + store: Store[F], emil: Emil[F], upload: OUpload[F], joex: OJoex[F] @@ -42,22 +42,22 @@ object ScanMailboxTask { s"=== Start importing mails for user ${ctx.args.account.user.id}" ) _ <- ctx.logger.debug(s"Settings: ${ctx.args.asJson.noSpaces}") - mailCfg <- getMailSettings(ctx) + mailCfg <- getMailSettings(ctx, store) folders = ctx.args.folders.mkString(", ") userId = ctx.args.account.user imapConn = ctx.args.imapConnection _ <- ctx.logger.info( s"Reading mails for user ${userId.id} from ${imapConn.id}/$folders" ) - _ <- importMails(cfg, mailCfg, emil, upload, joex, ctx) + _ <- importMails(cfg, mailCfg, emil, upload, joex, ctx, store) } yield () } def onCancel[F[_]]: Task[F, ScanMailboxArgs, Unit] = Task.log(_.warn("Cancelling scan-mailbox task")) - def getMailSettings[F[_]: Sync](ctx: Context[F, Args]): F[RUserImap] = - ctx.store + def getMailSettings[F[_]: Sync](ctx: Context[F, Args], store: Store[F]): F[RUserImap] = + store .transact(RUserImap.getByName(ctx.args.account, ctx.args.imapConnection)) .flatMap { case Some(c) => c.pure[F] @@ -75,10 +75,11 @@ object ScanMailboxTask { theEmil: Emil[F], upload: OUpload[F], joex: OJoex[F], - ctx: Context[F, Args] + ctx: Context[F, Args], + store: Store[F] ): F[Unit] = { val mailer = theEmil(mailCfg.toMailConfig) - val impl = new Impl[F](cfg, ctx) + val impl = new Impl[F](cfg, ctx, store) val inFolders = ctx.args.folders.take(cfg.maxFolders) val getInitialInput = @@ -142,7 +143,11 @@ object ScanMailboxTask { ScanResult(List(folder -> left), processed) } - final private class Impl[F[_]: Sync](cfg: Config.ScanMailbox, ctx: Context[F, Args]) { + final private class Impl[F[_]: Sync]( + cfg: Config.ScanMailbox, + ctx: Context[F, Args], + store: Store[F] + ) { private def logOp[C](f: Logger[F] => F[Unit]): MailOp[F, C, Unit] = MailOp(_ => f(ctx.logger)) @@ -213,7 +218,7 @@ object ScanMailboxTask { NonEmptyList.fromFoldable(headers.flatMap(_.mh.messageId)) match { case Some(nl) => for { - archives <- ctx.store.transact( + archives <- store.transact( RAttachmentArchive .findByMessageIdAndCollective(nl, ctx.args.account.collective) ) @@ -237,7 +242,7 @@ object ScanMailboxTask { for { from <- OptionT.fromOption[F](mh.from) _ <- OptionT( - ctx.store.transact( + store.transact( QOrganization .findPersonByContact( ctx.args.account.collective, diff --git a/modules/joex/src/main/scala/docspell/joex/updatecheck/UpdateCheckTask.scala b/modules/joex/src/main/scala/docspell/joex/updatecheck/UpdateCheckTask.scala index 6da9e345..128bc76b 100644 --- a/modules/joex/src/main/scala/docspell/joex/updatecheck/UpdateCheckTask.scala +++ b/modules/joex/src/main/scala/docspell/joex/updatecheck/UpdateCheckTask.scala @@ -10,9 +10,9 @@ import cats.data.OptionT import cats.effect._ import cats.implicits._ import docspell.common._ -import docspell.scheduler.Context import docspell.scheduler.Task import docspell.scheduler.usertask.UserTask +import docspell.store.Store import docspell.store.records.RUserEmail import emil._ @@ -37,6 +37,7 @@ object UpdateCheckTask { def apply[F[_]: Async]( cfg: UpdateCheckConfig, sendCfg: MailSendConfig, + store: Store[F], emil: Emil[F], updateCheck: UpdateCheck[F], thisVersion: ThisVersion @@ -50,7 +51,7 @@ object UpdateCheckTask { _ <- ctx.logger.debug( s"Get SMTP connection for ${cfg.senderAccount.asString} and ${cfg.smtpId}" ) - smtpCfg <- findConnection(ctx, cfg) + smtpCfg <- findConnection(store, cfg) _ <- ctx.logger.debug("Checking for latest release at GitHub") latest <- updateCheck.latestRelease _ <- ctx.logger.debug(s"Got latest release: $latest.") @@ -77,10 +78,10 @@ object UpdateCheckTask { Task.pure(()) def findConnection[F[_]: Sync]( - ctx: Context[F, _], + store: Store[F], cfg: UpdateCheckConfig ): F[RUserEmail] = - OptionT(ctx.store.transact(RUserEmail.getByName(cfg.senderAccount, cfg.smtpId))) + OptionT(store.transact(RUserEmail.getByName(cfg.senderAccount, cfg.smtpId))) .getOrElseF( Sync[F].raiseError( new Exception( diff --git a/modules/scheduler/api/src/main/scala/docspell/scheduler/Context.scala b/modules/scheduler/api/src/main/scala/docspell/scheduler/Context.scala index abefafa5..0594dcf8 100644 --- a/modules/scheduler/api/src/main/scala/docspell/scheduler/Context.scala +++ b/modules/scheduler/api/src/main/scala/docspell/scheduler/Context.scala @@ -8,7 +8,6 @@ package docspell.scheduler import docspell.common._ import docspell.logging.Logger -import docspell.store.Store trait Context[F[_], A] { self => @@ -22,8 +21,6 @@ trait Context[F[_], A] { self => def setProgress(percent: Int): F[Unit] - def store: Store[F] - def isLastRetry: F[Boolean] def map[C](f: A => C): Context[F, C] diff --git a/modules/scheduler/api/src/main/scala/docspell/scheduler/Scheduler.scala b/modules/scheduler/api/src/main/scala/docspell/scheduler/Scheduler.scala index 0e44244b..e3f1193e 100644 --- a/modules/scheduler/api/src/main/scala/docspell/scheduler/Scheduler.scala +++ b/modules/scheduler/api/src/main/scala/docspell/scheduler/Scheduler.scala @@ -10,13 +10,12 @@ import cats.effect._ import fs2.Stream import docspell.common.Ident -import docspell.store.records.RJob trait Scheduler[F[_]] { def config: SchedulerConfig - def getRunning: F[Vector[RJob]] + def getRunning: F[Vector[Job[String]]] def requestCancel(jobId: Ident): F[Boolean] diff --git a/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/ContextImpl.scala b/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/ContextImpl.scala index 45647f03..69d2b34b 100644 --- a/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/ContextImpl.scala +++ b/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/ContextImpl.scala @@ -13,7 +13,7 @@ import docspell.store.records.RJob class ContextImpl[F[_]: Functor, A]( val args: A, val logger: Logger[F], - val store: Store[F], + store: Store[F], val config: SchedulerConfig, val jobId: Ident ) extends Context[F, A] { diff --git a/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/SchedulerImpl.scala b/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/SchedulerImpl.scala index 4aad8901..0f063599 100644 --- a/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/SchedulerImpl.scala +++ b/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/SchedulerImpl.scala @@ -64,8 +64,23 @@ final class SchedulerImpl[F[_]: Async]( .drain ) - def getRunning: F[Vector[RJob]] = - state.get.flatMap(s => QJob.findAll(s.getRunning, store)) + def getRunning: F[Vector[Job[String]]] = + state.get + .flatMap(s => QJob.findAll(s.getRunning, store)) + .map( + _.map(rj => + Job( + rj.id, + rj.task, + rj.group, + rj.args, + rj.subject, + rj.submitter, + rj.priority, + rj.tracker + ) + ) + ) def requestCancel(jobId: Ident): F[Boolean] = logger.info(s"Scheduler requested to cancel job: ${jobId.id}") *>