From 0b3fe8eb5c2afff1110cfec1c49a52fd518e4434 Mon Sep 17 00:00:00 2001 From: eikek Date: Sun, 20 Mar 2022 11:40:44 +0100 Subject: [PATCH] Publish joex from within the job store Reduces friction when using the job store. --- .../scala/docspell/backend/BackendApp.scala | 9 ++--- .../docspell/backend/ops/OCollective.scala | 9 +---- .../backend/ops/OFileRepository.scala | 23 +++-------- .../docspell/backend/ops/OFulltext.scala | 7 ++-- .../scala/docspell/backend/ops/OItem.scala | 40 +++++-------------- .../scala/docspell/backend/ops/OUpload.scala | 21 +++------- .../docspell/backend/ops/OUserTask.scala | 5 +-- .../main/scala/docspell/joex/JoexTasks.scala | 12 +++--- .../joex/pagecount/AllPageCountTask.scala | 5 +-- .../joex/pdfconv/ConvertAllPdfTask.scala | 3 -- .../joex/preview/AllPreviewsTask.scala | 3 -- .../joex/scanmailbox/ScanMailboxTask.scala | 2 +- .../restserver/routes/AttachmentRoutes.scala | 7 ++-- .../routes/FileRepositoryRoutes.scala | 4 +- .../routes/IntegrationEndpointRoutes.scala | 2 +- .../restserver/routes/ItemMultiRoutes.scala | 2 +- .../restserver/routes/ItemRoutes.scala | 2 +- .../restserver/routes/UploadRoutes.scala | 2 +- .../scheduler/impl/JobStorePublish.scala | 35 ++++++++++------ 19 files changed, 71 insertions(+), 122 deletions(-) diff --git a/modules/backend/src/main/scala/docspell/backend/BackendApp.scala b/modules/backend/src/main/scala/docspell/backend/BackendApp.scala index 2413b37c..09611031 100644 --- a/modules/backend/src/main/scala/docspell/backend/BackendApp.scala +++ b/modules/backend/src/main/scala/docspell/backend/BackendApp.scala @@ -78,18 +78,17 @@ object BackendApp { tagImpl <- OTag[F](store) equipImpl <- OEquipment[F](store) orgImpl <- OOrganization(store) - uploadImpl <- OUpload(store, schedulerModule.jobs, joexImpl) + uploadImpl <- OUpload(store, schedulerModule.jobs) nodeImpl <- ONode(store) jobImpl <- OJob(store, joexImpl, pubSubT) createIndex <- CreateIndex.resource(ftsClient, store) - itemImpl <- OItem(store, ftsClient, createIndex, schedulerModule.jobs, joexImpl) + itemImpl <- OItem(store, ftsClient, createIndex, schedulerModule.jobs) itemSearchImpl <- OItemSearch(store) fulltextImpl <- OFulltext( itemSearchImpl, ftsClient, store, - schedulerModule.jobs, - joexImpl + schedulerModule.jobs ) mailImpl <- OMail(store, javaEmil) userTaskImpl <- OUserTask( @@ -106,7 +105,7 @@ object BackendApp { ) notifyImpl <- ONotification(store, notificationMod) bookmarksImpl <- OQueryBookmarks(store) - fileRepoImpl <- OFileRepository(store, schedulerModule.jobs, joexImpl) + fileRepoImpl <- OFileRepository(store, schedulerModule.jobs) itemLinkImpl <- Resource.pure(OItemLink(store, itemSearchImpl)) } yield new BackendApp[F] { val pubSub = pubSubT diff --git a/modules/backend/src/main/scala/docspell/backend/ops/OCollective.scala b/modules/backend/src/main/scala/docspell/backend/ops/OCollective.scala index bd4451e3..dfcf587d 100644 --- a/modules/backend/src/main/scala/docspell/backend/ops/OCollective.scala +++ b/modules/backend/src/main/scala/docspell/backend/ops/OCollective.scala @@ -78,8 +78,7 @@ trait OCollective[F[_]] { */ def generatePreviews( storeMode: MakePreviewArgs.StoreMode, - account: AccountId, - notifyJoex: Boolean + account: AccountId ): F[UpdateResult] } @@ -206,7 +205,6 @@ object OCollective { ) _ <- uts .executeNow(UserTaskScope(collective), args.makeSubject.some, ut) - _ <- joex.notifyAllNodes } yield () def startEmptyTrash(args: EmptyTrashArgs): F[Unit] = @@ -222,7 +220,6 @@ object OCollective { ) _ <- uts .executeNow(UserTaskScope(args.collective), args.makeSubject.some, ut) - _ <- joex.notifyAllNodes } yield () def findSettings(collective: Ident): F[Option[OCollective.Settings]] = @@ -313,8 +310,7 @@ object OCollective { def generatePreviews( storeMode: MakePreviewArgs.StoreMode, - account: AccountId, - notifyJoex: Boolean + account: AccountId ): F[UpdateResult] = for { job <- JobFactory.allPreviews[F]( @@ -322,7 +318,6 @@ object OCollective { Some(account.user) ) _ <- jobStore.insertIfNew(job.encode) - _ <- if (notifyJoex) joex.notifyAllNodes else ().pure[F] } yield UpdateResult.success }) diff --git a/modules/backend/src/main/scala/docspell/backend/ops/OFileRepository.scala b/modules/backend/src/main/scala/docspell/backend/ops/OFileRepository.scala index 0e2d9cdb..fc2dc045 100644 --- a/modules/backend/src/main/scala/docspell/backend/ops/OFileRepository.scala +++ b/modules/backend/src/main/scala/docspell/backend/ops/OFileRepository.scala @@ -21,15 +21,9 @@ import scodec.bits.ByteVector trait OFileRepository[F[_]] { /** Inserts the job or return None if such a job already is running. */ - def cloneFileRepository( - args: FileCopyTaskArgs, - notifyJoex: Boolean - ): F[Option[Job[FileCopyTaskArgs]]] + def cloneFileRepository(args: FileCopyTaskArgs): F[Option[Job[FileCopyTaskArgs]]] - def checkIntegrityAll( - part: FileKeyPart, - notifyJoex: Boolean - ): F[Option[Job[FileIntegrityCheckArgs]]] + def checkIntegrityAll(part: FileKeyPart): F[Option[Job[FileIntegrityCheckArgs]]] def checkIntegrity(key: FileKey, hash: Option[ByteVector]): F[Option[IntegrityResult]] } @@ -40,30 +34,23 @@ object OFileRepository { def apply[F[_]: Async]( store: Store[F], - jobStore: JobStore[F], - joex: OJoex[F] + jobStore: JobStore[F] ): Resource[F, OFileRepository[F]] = Resource.pure(new OFileRepository[F] { private[this] val logger = docspell.logging.getLogger[F] - def cloneFileRepository( - args: FileCopyTaskArgs, - notifyJoex: Boolean - ): F[Option[Job[FileCopyTaskArgs]]] = + def cloneFileRepository(args: FileCopyTaskArgs): F[Option[Job[FileCopyTaskArgs]]] = for { job <- JobFactory.fileCopy(args) flag <- jobStore.insertIfNew(job.encode) - _ <- if (notifyJoex) joex.notifyAllNodes else ().pure[F] } yield Option.when(flag)(job) def checkIntegrityAll( - part: FileKeyPart, - notifyJoex: Boolean + part: FileKeyPart ): F[Option[Job[FileIntegrityCheckArgs]]] = for { job <- JobFactory.integrityCheck(FileIntegrityCheckArgs(part)) flag <- jobStore.insertIfNew(job.encode) - _ <- if (notifyJoex) joex.notifyAllNodes else ().pure[F] } yield Option.when(flag)(job) def checkIntegrity( diff --git a/modules/backend/src/main/scala/docspell/backend/ops/OFulltext.scala b/modules/backend/src/main/scala/docspell/backend/ops/OFulltext.scala index 39452ea5..1ae010fd 100644 --- a/modules/backend/src/main/scala/docspell/backend/ops/OFulltext.scala +++ b/modules/backend/src/main/scala/docspell/backend/ops/OFulltext.scala @@ -81,8 +81,7 @@ object OFulltext { itemSearch: OItemSearch[F], fts: FtsClient[F], store: Store[F], - jobStore: JobStore[F], - joex: OJoex[F] + jobStore: JobStore[F] ): Resource[F, OFulltext[F]] = Resource.pure[F, OFulltext[F]](new OFulltext[F] { val logger = docspell.logging.getLogger[F] @@ -90,7 +89,7 @@ object OFulltext { for { _ <- logger.info(s"Re-index all.") job <- JobFactory.reIndexAll[F] - _ <- jobStore.insertIfNew(job.encode) *> joex.notifyAllNodes + _ <- jobStore.insertIfNew(job.encode) } yield () def reindexCollective(account: AccountId): F[Unit] = @@ -102,7 +101,7 @@ object OFulltext { job <- JobFactory.reIndex(account) _ <- if (exist.isDefined) ().pure[F] - else jobStore.insertIfNew(job.encode) *> joex.notifyAllNodes + else jobStore.insertIfNew(job.encode) } yield () def findIndexOnly(maxNoteLen: Int)( diff --git a/modules/backend/src/main/scala/docspell/backend/ops/OItem.scala b/modules/backend/src/main/scala/docspell/backend/ops/OItem.scala index 04643348..a6e3b314 100644 --- a/modules/backend/src/main/scala/docspell/backend/ops/OItem.scala +++ b/modules/backend/src/main/scala/docspell/backend/ops/OItem.scala @@ -183,14 +183,12 @@ trait OItem[F[_]] { def reprocess( item: Ident, attachments: List[Ident], - account: AccountId, - notifyJoex: Boolean + account: AccountId ): F[UpdateResult] def reprocessAll( items: Nel[Ident], - account: AccountId, - notifyJoex: Boolean + account: AccountId ): F[UpdateResult] /** Submits a task that finds all non-converted pdfs and triggers converting them using @@ -198,22 +196,17 @@ trait OItem[F[_]] { */ def convertAllPdf( collective: Option[Ident], - submitter: Option[Ident], - notifyJoex: Boolean + submitter: Option[Ident] ): F[UpdateResult] /** Submits a task that (re)generates the preview image for an attachment. */ def generatePreview( args: MakePreviewArgs, - account: AccountId, - notifyJoex: Boolean + account: AccountId ): F[UpdateResult] /** Submits a task that (re)generates the preview images for all attachments. */ - def generateAllPreviews( - storeMode: MakePreviewArgs.StoreMode, - notifyJoex: Boolean - ): F[UpdateResult] + def generateAllPreviews(storeMode: MakePreviewArgs.StoreMode): F[UpdateResult] /** Merges a list of items into one item. The remaining items are deleted. */ def merge( @@ -228,8 +221,7 @@ object OItem { store: Store[F], fts: FtsClient[F], createIndex: CreateIndex[F], - jobStore: JobStore[F], - joex: OJoex[F] + jobStore: JobStore[F] ): Resource[F, OItem[F]] = for { otag <- OTag(store) @@ -752,8 +744,7 @@ object OItem { def reprocess( item: Ident, attachments: List[Ident], - account: AccountId, - notifyJoex: Boolean + account: AccountId ): F[UpdateResult] = (for { _ <- OptionT( @@ -764,13 +755,11 @@ object OItem { JobFactory.reprocessItem[F](args, account, Priority.Low) ) _ <- OptionT.liftF(jobStore.insertIfNew(job.encode)) - _ <- OptionT.liftF(if (notifyJoex) joex.notifyAllNodes else ().pure[F]) } yield UpdateResult.success).getOrElse(UpdateResult.notFound) def reprocessAll( items: Nel[Ident], - account: AccountId, - notifyJoex: Boolean + account: AccountId ): F[UpdateResult] = UpdateResult.fromUpdate(for { items <- store.transact(RItem.filterItems(items, account.collective)) @@ -779,39 +768,32 @@ object OItem { .traverse(arg => JobFactory.reprocessItem[F](arg, account, Priority.Low)) .map(_.map(_.encode)) _ <- jobStore.insertAllIfNew(jobs) - _ <- if (notifyJoex) joex.notifyAllNodes else ().pure[F] } yield items.size) def convertAllPdf( collective: Option[Ident], - submitter: Option[Ident], - notifyJoex: Boolean + submitter: Option[Ident] ): F[UpdateResult] = for { job <- JobFactory.convertAllPdfs[F](collective, submitter, Priority.Low) _ <- jobStore.insertIfNew(job.encode) - _ <- if (notifyJoex) joex.notifyAllNodes else ().pure[F] } yield UpdateResult.success def generatePreview( args: MakePreviewArgs, - account: AccountId, - notifyJoex: Boolean + account: AccountId ): F[UpdateResult] = for { job <- JobFactory.makePreview[F](args, account.some) _ <- jobStore.insertIfNew(job.encode) - _ <- if (notifyJoex) joex.notifyAllNodes else ().pure[F] } yield UpdateResult.success def generateAllPreviews( - storeMode: MakePreviewArgs.StoreMode, - notifyJoex: Boolean + storeMode: MakePreviewArgs.StoreMode ): F[UpdateResult] = for { job <- JobFactory.allPreviews[F](AllPreviewsArgs(None, storeMode), None) _ <- jobStore.insertIfNew(job.encode) - _ <- if (notifyJoex) joex.notifyAllNodes else ().pure[F] } yield UpdateResult.success private def onSuccessIgnoreError(update: F[Unit])(ar: UpdateResult): F[Unit] = diff --git a/modules/backend/src/main/scala/docspell/backend/ops/OUpload.scala b/modules/backend/src/main/scala/docspell/backend/ops/OUpload.scala index 8857ff83..9d49e65d 100644 --- a/modules/backend/src/main/scala/docspell/backend/ops/OUpload.scala +++ b/modules/backend/src/main/scala/docspell/backend/ops/OUpload.scala @@ -23,7 +23,6 @@ trait OUpload[F[_]] { def submit( data: OUpload.UploadData[F], account: AccountId, - notifyJoex: Boolean, itemId: Option[Ident] ): F[OUpload.UploadResult] @@ -34,21 +33,19 @@ trait OUpload[F[_]] { def submit( data: OUpload.UploadData[F], sourceId: Ident, - notifyJoex: Boolean, itemId: Option[Ident] ): F[OUpload.UploadResult] final def submitEither( data: OUpload.UploadData[F], accOrSrc: Either[Ident, AccountId], - notifyJoex: Boolean, itemId: Option[Ident] ): F[OUpload.UploadResult] = accOrSrc match { case Right(acc) => - submit(data, acc, notifyJoex, itemId) + submit(data, acc, itemId) case Left(srcId) => - submit(data, srcId, notifyJoex, itemId) + submit(data, srcId, itemId) } } @@ -109,15 +106,13 @@ object OUpload { def apply[F[_]: Sync]( store: Store[F], - jobStore: JobStore[F], - joex: OJoex[F] + jobStore: JobStore[F] ): Resource[F, OUpload[F]] = Resource.pure[F, OUpload[F]](new OUpload[F] { private[this] val logger = docspell.logging.getLogger[F] def submit( data: OUpload.UploadData[F], account: AccountId, - notifyJoex: Boolean, itemId: Option[Ident] ): F[OUpload.UploadResult] = (for { @@ -150,7 +145,7 @@ object OUpload { args = ProcessItemArgs(meta, files.toList) jobs <- right(makeJobs(data, args, account)) _ <- right(logger.debug(s"Storing jobs: $jobs")) - res <- right(submitJobs(notifyJoex)(jobs.map(_.encode))) + res <- right(submitJobs(jobs.map(_.encode))) _ <- right( store.transact( RSource.incrementCounter(data.meta.sourceAbbrev, account.collective) @@ -161,7 +156,6 @@ object OUpload { def submit( data: OUpload.UploadData[F], sourceId: Ident, - notifyJoex: Boolean, itemId: Option[Ident] ): F[OUpload.UploadResult] = (for { @@ -181,16 +175,13 @@ object OUpload { priority = src.source.priority ) accId = AccountId(src.source.cid, src.source.sid) - result <- OptionT.liftF(submit(updata, accId, notifyJoex, itemId)) + result <- OptionT.liftF(submit(updata, accId, itemId)) } yield result).getOrElse(UploadResult.noSource) - private def submitJobs( - notifyJoex: Boolean - )(jobs: List[Job[String]]): F[OUpload.UploadResult] = + private def submitJobs(jobs: List[Job[String]]): F[OUpload.UploadResult] = for { _ <- logger.debug(s"Storing jobs: $jobs") _ <- jobStore.insertAll(jobs) - _ <- if (notifyJoex) joex.notifyAllNodes else ().pure[F] } yield UploadResult.Success /** Saves the file into the database. */ diff --git a/modules/backend/src/main/scala/docspell/backend/ops/OUserTask.scala b/modules/backend/src/main/scala/docspell/backend/ops/OUserTask.scala index 7fc80b7d..9265effe 100644 --- a/modules/backend/src/main/scala/docspell/backend/ops/OUserTask.scala +++ b/modules/backend/src/main/scala/docspell/backend/ops/OUserTask.scala @@ -92,10 +92,7 @@ object OUserTask { def executeNow[A](scope: UserTaskScope, subject: Option[String], task: UserTask[A])( implicit E: Encoder[A] ): F[Unit] = - for { - _ <- taskStore.executeNow(scope, subject, task) - _ <- joex.notifyAllNodes - } yield () + taskStore.executeNow(scope, subject, task) def getScanMailbox(scope: UserTaskScope): Stream[F, UserTask[ScanMailboxArgs]] = taskStore diff --git a/modules/joex/src/main/scala/docspell/joex/JoexTasks.scala b/modules/joex/src/main/scala/docspell/joex/JoexTasks.scala index b9c242fe..334943f7 100644 --- a/modules/joex/src/main/scala/docspell/joex/JoexTasks.scala +++ b/modules/joex/src/main/scala/docspell/joex/JoexTasks.scala @@ -117,7 +117,7 @@ final class JoexTasks[F[_]: Async]( .withTask( JobTask.json( ConvertAllPdfArgs.taskName, - ConvertAllPdfTask[F](jobStoreModule.jobs, joex, store), + ConvertAllPdfTask[F](jobStoreModule.jobs, store), ConvertAllPdfTask.onCancel[F] ) ) @@ -138,7 +138,7 @@ final class JoexTasks[F[_]: Async]( .withTask( JobTask.json( AllPreviewsArgs.taskName, - AllPreviewsTask[F](jobStoreModule.jobs, joex, store), + AllPreviewsTask[F](jobStoreModule.jobs, store), AllPreviewsTask.onCancel[F] ) ) @@ -152,7 +152,7 @@ final class JoexTasks[F[_]: Async]( .withTask( JobTask.json( AllPageCountTask.taskName, - AllPageCountTask[F](store, jobStoreModule.jobs, joex), + AllPageCountTask[F](store, jobStoreModule.jobs), AllPageCountTask.onCancel[F] ) ) @@ -220,16 +220,16 @@ object JoexTasks { for { joex <- OJoex(pubSub) store = jobStoreModule.store - upload <- OUpload(store, jobStoreModule.jobs, joex) + upload <- OUpload(store, jobStoreModule.jobs) fts <- createFtsClient(cfg)(httpClient) createIndex <- CreateIndex.resource(fts, store) - itemOps <- OItem(store, fts, createIndex, jobStoreModule.jobs, joex) + itemOps <- OItem(store, fts, createIndex, jobStoreModule.jobs) itemSearchOps <- OItemSearch(store) analyser <- TextAnalyser.create[F](cfg.textAnalysis.textAnalysisConfig) regexNer <- RegexNerFile(cfg.textAnalysis.regexNerFileConfig, store) updateCheck <- UpdateCheck.resource(httpClient) notification <- ONotification(store, notificationModule) - fileRepo <- OFileRepository(store, jobStoreModule.jobs, joex) + fileRepo <- OFileRepository(store, jobStoreModule.jobs) } yield new JoexTasks[F]( cfg, store, diff --git a/modules/joex/src/main/scala/docspell/joex/pagecount/AllPageCountTask.scala b/modules/joex/src/main/scala/docspell/joex/pagecount/AllPageCountTask.scala index f6f91346..1620b87f 100644 --- a/modules/joex/src/main/scala/docspell/joex/pagecount/AllPageCountTask.scala +++ b/modules/joex/src/main/scala/docspell/joex/pagecount/AllPageCountTask.scala @@ -11,7 +11,6 @@ import cats.implicits._ import fs2.{Chunk, Stream} import docspell.backend.JobFactory -import docspell.backend.ops.OJoex import docspell.common._ import docspell.scheduler._ import docspell.store.Store @@ -24,15 +23,13 @@ object AllPageCountTask { def apply[F[_]: Sync]( store: Store[F], - jobStore: JobStore[F], - joex: OJoex[F] + jobStore: JobStore[F] ): Task[F, Args, Unit] = Task { ctx => for { _ <- ctx.logger.info("Generating previews for attachments") n <- submitConversionJobs(ctx, store, jobStore) _ <- ctx.logger.info(s"Submitted $n jobs") - _ <- joex.notifyAllNodes } yield () } diff --git a/modules/joex/src/main/scala/docspell/joex/pdfconv/ConvertAllPdfTask.scala b/modules/joex/src/main/scala/docspell/joex/pdfconv/ConvertAllPdfTask.scala index 84d3687d..607da1f9 100644 --- a/modules/joex/src/main/scala/docspell/joex/pdfconv/ConvertAllPdfTask.scala +++ b/modules/joex/src/main/scala/docspell/joex/pdfconv/ConvertAllPdfTask.scala @@ -10,7 +10,6 @@ import cats.effect._ import cats.implicits._ import fs2.{Chunk, Stream} -import docspell.backend.ops.OJoex import docspell.common._ import docspell.scheduler._ import docspell.store.Store @@ -25,7 +24,6 @@ object ConvertAllPdfTask { def apply[F[_]: Sync]( jobStore: JobStore[F], - joex: OJoex[F], store: Store[F] ): Task[F, Args, Unit] = Task { ctx => @@ -33,7 +31,6 @@ object ConvertAllPdfTask { _ <- ctx.logger.info("Converting pdfs using ocrmypdf") n <- submitConversionJobs(ctx, store, jobStore) _ <- ctx.logger.info(s"Submitted $n file conversion jobs") - _ <- joex.notifyAllNodes } yield () } diff --git a/modules/joex/src/main/scala/docspell/joex/preview/AllPreviewsTask.scala b/modules/joex/src/main/scala/docspell/joex/preview/AllPreviewsTask.scala index b0988305..c50d64a1 100644 --- a/modules/joex/src/main/scala/docspell/joex/preview/AllPreviewsTask.scala +++ b/modules/joex/src/main/scala/docspell/joex/preview/AllPreviewsTask.scala @@ -11,7 +11,6 @@ import cats.implicits._ import fs2.{Chunk, Stream} import docspell.backend.JobFactory -import docspell.backend.ops.OJoex import docspell.common.MakePreviewArgs.StoreMode import docspell.common._ import docspell.scheduler._ @@ -24,7 +23,6 @@ object AllPreviewsTask { def apply[F[_]: Sync]( jobStore: JobStore[F], - joex: OJoex[F], store: Store[F] ): Task[F, Args, Unit] = Task { ctx => @@ -32,7 +30,6 @@ object AllPreviewsTask { _ <- ctx.logger.info("Generating previews for attachments") n <- submitConversionJobs(ctx, store, jobStore) _ <- ctx.logger.info(s"Submitted $n jobs") - _ <- joex.notifyAllNodes } yield () } diff --git a/modules/joex/src/main/scala/docspell/joex/scanmailbox/ScanMailboxTask.scala b/modules/joex/src/main/scala/docspell/joex/scanmailbox/ScanMailboxTask.scala index 4f2fc0ec..cd0fb02b 100644 --- a/modules/joex/src/main/scala/docspell/joex/scanmailbox/ScanMailboxTask.scala +++ b/modules/joex/src/main/scala/docspell/joex/scanmailbox/ScanMailboxTask.scala @@ -337,7 +337,7 @@ object ScanMailboxTask { priority = Priority.Low, tracker = None ) - res <- upload.submit(data, ctx.args.account, false, None) + res <- upload.submit(data, ctx.args.account, None) } yield res } diff --git a/modules/restserver/src/main/scala/docspell/restserver/routes/AttachmentRoutes.scala b/modules/restserver/src/main/scala/docspell/restserver/routes/AttachmentRoutes.scala index fb7409b7..9d2264e0 100644 --- a/modules/restserver/src/main/scala/docspell/restserver/routes/AttachmentRoutes.scala +++ b/modules/restserver/src/main/scala/docspell/restserver/routes/AttachmentRoutes.scala @@ -121,8 +121,7 @@ object AttachmentRoutes { for { res <- backend.item.generatePreview( MakePreviewArgs.replace(id), - user.account, - true + user.account ) resp <- Ok( Conversions.basicResult(res, "Generating preview image task submitted.") @@ -169,7 +168,7 @@ object AttachmentRoutes { HttpRoutes.of { case POST -> Root / "generatePreviews" => for { - res <- backend.item.generateAllPreviews(MakePreviewArgs.StoreMode.Replace, true) + res <- backend.item.generateAllPreviews(MakePreviewArgs.StoreMode.Replace) resp <- Ok( Conversions.basicResult(res, "Generate all previews task submitted.") ) @@ -178,7 +177,7 @@ object AttachmentRoutes { case POST -> Root / "convertallpdfs" => for { res <- - backend.item.convertAllPdf(None, None, true) + backend.item.convertAllPdf(None, None) resp <- Ok(Conversions.basicResult(res, "Convert all PDFs task submitted")) } yield resp } diff --git a/modules/restserver/src/main/scala/docspell/restserver/routes/FileRepositoryRoutes.scala b/modules/restserver/src/main/scala/docspell/restserver/routes/FileRepositoryRoutes.scala index 3d4592c2..f89d687e 100644 --- a/modules/restserver/src/main/scala/docspell/restserver/routes/FileRepositoryRoutes.scala +++ b/modules/restserver/src/main/scala/docspell/restserver/routes/FileRepositoryRoutes.scala @@ -32,7 +32,7 @@ object FileRepositoryRoutes { for { input <- req.as[FileRepositoryCloneRequest] args = makeTaskArgs(input) - job <- backend.fileRepository.cloneFileRepository(args, true) + job <- backend.fileRepository.cloneFileRepository(args) result = BasicResult( job.isDefined, job.fold(s"Job for '${FileCopyTaskArgs.taskName.id}' already running")(j => @@ -46,7 +46,7 @@ object FileRepositoryRoutes { case req @ POST -> Root / "integrityCheck" => for { input <- req.as[FileKeyPart] - job <- backend.fileRepository.checkIntegrityAll(input, true) + job <- backend.fileRepository.checkIntegrityAll(input) result = BasicResult( job.isDefined, job.fold(s"Job for '${FileCopyTaskArgs.taskName.id}' already running")(j => diff --git a/modules/restserver/src/main/scala/docspell/restserver/routes/IntegrationEndpointRoutes.scala b/modules/restserver/src/main/scala/docspell/restserver/routes/IntegrationEndpointRoutes.scala index 8d05d7f4..3777f18a 100644 --- a/modules/restserver/src/main/scala/docspell/restserver/routes/IntegrationEndpointRoutes.scala +++ b/modules/restserver/src/main/scala/docspell/restserver/routes/IntegrationEndpointRoutes.scala @@ -111,7 +111,7 @@ object IntegrationEndpointRoutes { cfg.backend.files.validMimeTypes ) account = AccountId(coll, DocspellSystem.user) - result <- backend.upload.submit(updata, account, true, None) + result <- backend.upload.submit(updata, account, None) res <- Ok(basicResult(result)) } yield res } diff --git a/modules/restserver/src/main/scala/docspell/restserver/routes/ItemMultiRoutes.scala b/modules/restserver/src/main/scala/docspell/restserver/routes/ItemMultiRoutes.scala index 373ac8af..a713f160 100644 --- a/modules/restserver/src/main/scala/docspell/restserver/routes/ItemMultiRoutes.scala +++ b/modules/restserver/src/main/scala/docspell/restserver/routes/ItemMultiRoutes.scala @@ -181,7 +181,7 @@ object ItemMultiRoutes extends NonEmptyListSupport with MultiIdSupport { for { json <- req.as[IdList] items <- requireNonEmpty(json.ids) - res <- backend.item.reprocessAll(items, user.account, true) + res <- backend.item.reprocessAll(items, user.account) resp <- Ok(Conversions.basicResult(res, "Re-process task(s) submitted.")) } yield resp diff --git a/modules/restserver/src/main/scala/docspell/restserver/routes/ItemRoutes.scala b/modules/restserver/src/main/scala/docspell/restserver/routes/ItemRoutes.scala index 7d46e83e..d830fc6c 100644 --- a/modules/restserver/src/main/scala/docspell/restserver/routes/ItemRoutes.scala +++ b/modules/restserver/src/main/scala/docspell/restserver/routes/ItemRoutes.scala @@ -388,7 +388,7 @@ object ItemRoutes { for { data <- req.as[IdList] _ <- logger.debug(s"Re-process item ${id.id}") - res <- backend.item.reprocess(id, data.ids, user.account, true) + res <- backend.item.reprocess(id, data.ids, user.account) resp <- Ok(Conversions.basicResult(res, "Re-process task submitted.")) } yield resp diff --git a/modules/restserver/src/main/scala/docspell/restserver/routes/UploadRoutes.scala b/modules/restserver/src/main/scala/docspell/restserver/routes/UploadRoutes.scala index 8a18cd6c..3fe1c379 100644 --- a/modules/restserver/src/main/scala/docspell/restserver/routes/UploadRoutes.scala +++ b/modules/restserver/src/main/scala/docspell/restserver/routes/UploadRoutes.scala @@ -96,7 +96,7 @@ object UploadRoutes { prio, cfg.backend.files.validMimeTypes ) - result <- backend.upload.submitEither(updata, accOrSrc, true, itemId) + result <- backend.upload.submitEither(updata, accOrSrc, itemId) res <- Ok(basicResult(result)) } yield res } diff --git a/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/JobStorePublish.scala b/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/JobStorePublish.scala index e984e6e5..3d88e322 100644 --- a/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/JobStorePublish.scala +++ b/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/JobStorePublish.scala @@ -13,7 +13,7 @@ import docspell.common.{Ident, JobState} import docspell.notification.api.{Event, EventSink} import docspell.pubsub.api.PubSubT import docspell.scheduler._ -import docspell.scheduler.msg.JobSubmitted +import docspell.scheduler.msg.{JobSubmitted, JobsNotify} import docspell.store.Store final class JobStorePublish[F[_]: Sync]( @@ -40,30 +40,39 @@ final class JobStorePublish[F[_]: Sync]( pubsub.publish1(JobSubmitted.topic, msg(job)).as(()) *> eventSink.offer(event(job)) + private def notifyJoex: F[Unit] = + pubsub.publish1IgnoreErrors(JobsNotify(), ()).void + def insert(job: Job[String]) = - delegate.insert(job).flatTap(_ => publish(job)) + delegate.insert(job).flatTap(_ => publish(job) *> notifyJoex) def insertIfNew(job: Job[String]) = delegate.insertIfNew(job).flatTap { - case true => publish(job) + case true => publish(job) *> notifyJoex case false => ().pure[F] } def insertAll(jobs: Seq[Job[String]]) = - delegate.insertAll(jobs).flatTap { results => - results.zip(jobs).traverse { case (res, job) => - if (res) publish(job) - else ().pure[F] + delegate + .insertAll(jobs) + .flatTap { results => + results.zip(jobs).traverse { case (res, job) => + if (res) publish(job) + else ().pure[F] + } } - } + .flatTap(_ => notifyJoex) def insertAllIfNew(jobs: Seq[Job[String]]) = - delegate.insertAllIfNew(jobs).flatTap { results => - results.zip(jobs).traverse { case (res, job) => - if (res) publish(job) - else ().pure[F] + delegate + .insertAllIfNew(jobs) + .flatTap { results => + results.zip(jobs).traverse { case (res, job) => + if (res) publish(job) + else ().pure[F] + } } - } + .flatTap(_ => notifyJoex) def findById(jobId: Ident) = delegate.findById(jobId)