Publish joex from within the job store

Reduces friction when using the job store.
This commit is contained in:
eikek 2022-03-20 11:40:44 +01:00
parent b84bbbd750
commit 0b3fe8eb5c
19 changed files with 71 additions and 122 deletions

View File

@ -78,18 +78,17 @@ object BackendApp {
tagImpl <- OTag[F](store) tagImpl <- OTag[F](store)
equipImpl <- OEquipment[F](store) equipImpl <- OEquipment[F](store)
orgImpl <- OOrganization(store) orgImpl <- OOrganization(store)
uploadImpl <- OUpload(store, schedulerModule.jobs, joexImpl) uploadImpl <- OUpload(store, schedulerModule.jobs)
nodeImpl <- ONode(store) nodeImpl <- ONode(store)
jobImpl <- OJob(store, joexImpl, pubSubT) jobImpl <- OJob(store, joexImpl, pubSubT)
createIndex <- CreateIndex.resource(ftsClient, store) createIndex <- CreateIndex.resource(ftsClient, store)
itemImpl <- OItem(store, ftsClient, createIndex, schedulerModule.jobs, joexImpl) itemImpl <- OItem(store, ftsClient, createIndex, schedulerModule.jobs)
itemSearchImpl <- OItemSearch(store) itemSearchImpl <- OItemSearch(store)
fulltextImpl <- OFulltext( fulltextImpl <- OFulltext(
itemSearchImpl, itemSearchImpl,
ftsClient, ftsClient,
store, store,
schedulerModule.jobs, schedulerModule.jobs
joexImpl
) )
mailImpl <- OMail(store, javaEmil) mailImpl <- OMail(store, javaEmil)
userTaskImpl <- OUserTask( userTaskImpl <- OUserTask(
@ -106,7 +105,7 @@ object BackendApp {
) )
notifyImpl <- ONotification(store, notificationMod) notifyImpl <- ONotification(store, notificationMod)
bookmarksImpl <- OQueryBookmarks(store) bookmarksImpl <- OQueryBookmarks(store)
fileRepoImpl <- OFileRepository(store, schedulerModule.jobs, joexImpl) fileRepoImpl <- OFileRepository(store, schedulerModule.jobs)
itemLinkImpl <- Resource.pure(OItemLink(store, itemSearchImpl)) itemLinkImpl <- Resource.pure(OItemLink(store, itemSearchImpl))
} yield new BackendApp[F] { } yield new BackendApp[F] {
val pubSub = pubSubT val pubSub = pubSubT

View File

@ -78,8 +78,7 @@ trait OCollective[F[_]] {
*/ */
def generatePreviews( def generatePreviews(
storeMode: MakePreviewArgs.StoreMode, storeMode: MakePreviewArgs.StoreMode,
account: AccountId, account: AccountId
notifyJoex: Boolean
): F[UpdateResult] ): F[UpdateResult]
} }
@ -206,7 +205,6 @@ object OCollective {
) )
_ <- uts _ <- uts
.executeNow(UserTaskScope(collective), args.makeSubject.some, ut) .executeNow(UserTaskScope(collective), args.makeSubject.some, ut)
_ <- joex.notifyAllNodes
} yield () } yield ()
def startEmptyTrash(args: EmptyTrashArgs): F[Unit] = def startEmptyTrash(args: EmptyTrashArgs): F[Unit] =
@ -222,7 +220,6 @@ object OCollective {
) )
_ <- uts _ <- uts
.executeNow(UserTaskScope(args.collective), args.makeSubject.some, ut) .executeNow(UserTaskScope(args.collective), args.makeSubject.some, ut)
_ <- joex.notifyAllNodes
} yield () } yield ()
def findSettings(collective: Ident): F[Option[OCollective.Settings]] = def findSettings(collective: Ident): F[Option[OCollective.Settings]] =
@ -313,8 +310,7 @@ object OCollective {
def generatePreviews( def generatePreviews(
storeMode: MakePreviewArgs.StoreMode, storeMode: MakePreviewArgs.StoreMode,
account: AccountId, account: AccountId
notifyJoex: Boolean
): F[UpdateResult] = ): F[UpdateResult] =
for { for {
job <- JobFactory.allPreviews[F]( job <- JobFactory.allPreviews[F](
@ -322,7 +318,6 @@ object OCollective {
Some(account.user) Some(account.user)
) )
_ <- jobStore.insertIfNew(job.encode) _ <- jobStore.insertIfNew(job.encode)
_ <- if (notifyJoex) joex.notifyAllNodes else ().pure[F]
} yield UpdateResult.success } yield UpdateResult.success
}) })

View File

@ -21,15 +21,9 @@ import scodec.bits.ByteVector
trait OFileRepository[F[_]] { trait OFileRepository[F[_]] {
/** Inserts the job or return None if such a job already is running. */ /** Inserts the job or return None if such a job already is running. */
def cloneFileRepository( def cloneFileRepository(args: FileCopyTaskArgs): F[Option[Job[FileCopyTaskArgs]]]
args: FileCopyTaskArgs,
notifyJoex: Boolean
): F[Option[Job[FileCopyTaskArgs]]]
def checkIntegrityAll( def checkIntegrityAll(part: FileKeyPart): F[Option[Job[FileIntegrityCheckArgs]]]
part: FileKeyPart,
notifyJoex: Boolean
): F[Option[Job[FileIntegrityCheckArgs]]]
def checkIntegrity(key: FileKey, hash: Option[ByteVector]): F[Option[IntegrityResult]] def checkIntegrity(key: FileKey, hash: Option[ByteVector]): F[Option[IntegrityResult]]
} }
@ -40,30 +34,23 @@ object OFileRepository {
def apply[F[_]: Async]( def apply[F[_]: Async](
store: Store[F], store: Store[F],
jobStore: JobStore[F], jobStore: JobStore[F]
joex: OJoex[F]
): Resource[F, OFileRepository[F]] = ): Resource[F, OFileRepository[F]] =
Resource.pure(new OFileRepository[F] { Resource.pure(new OFileRepository[F] {
private[this] val logger = docspell.logging.getLogger[F] private[this] val logger = docspell.logging.getLogger[F]
def cloneFileRepository( def cloneFileRepository(args: FileCopyTaskArgs): F[Option[Job[FileCopyTaskArgs]]] =
args: FileCopyTaskArgs,
notifyJoex: Boolean
): F[Option[Job[FileCopyTaskArgs]]] =
for { for {
job <- JobFactory.fileCopy(args) job <- JobFactory.fileCopy(args)
flag <- jobStore.insertIfNew(job.encode) flag <- jobStore.insertIfNew(job.encode)
_ <- if (notifyJoex) joex.notifyAllNodes else ().pure[F]
} yield Option.when(flag)(job) } yield Option.when(flag)(job)
def checkIntegrityAll( def checkIntegrityAll(
part: FileKeyPart, part: FileKeyPart
notifyJoex: Boolean
): F[Option[Job[FileIntegrityCheckArgs]]] = ): F[Option[Job[FileIntegrityCheckArgs]]] =
for { for {
job <- JobFactory.integrityCheck(FileIntegrityCheckArgs(part)) job <- JobFactory.integrityCheck(FileIntegrityCheckArgs(part))
flag <- jobStore.insertIfNew(job.encode) flag <- jobStore.insertIfNew(job.encode)
_ <- if (notifyJoex) joex.notifyAllNodes else ().pure[F]
} yield Option.when(flag)(job) } yield Option.when(flag)(job)
def checkIntegrity( def checkIntegrity(

View File

@ -81,8 +81,7 @@ object OFulltext {
itemSearch: OItemSearch[F], itemSearch: OItemSearch[F],
fts: FtsClient[F], fts: FtsClient[F],
store: Store[F], store: Store[F],
jobStore: JobStore[F], jobStore: JobStore[F]
joex: OJoex[F]
): Resource[F, OFulltext[F]] = ): Resource[F, OFulltext[F]] =
Resource.pure[F, OFulltext[F]](new OFulltext[F] { Resource.pure[F, OFulltext[F]](new OFulltext[F] {
val logger = docspell.logging.getLogger[F] val logger = docspell.logging.getLogger[F]
@ -90,7 +89,7 @@ object OFulltext {
for { for {
_ <- logger.info(s"Re-index all.") _ <- logger.info(s"Re-index all.")
job <- JobFactory.reIndexAll[F] job <- JobFactory.reIndexAll[F]
_ <- jobStore.insertIfNew(job.encode) *> joex.notifyAllNodes _ <- jobStore.insertIfNew(job.encode)
} yield () } yield ()
def reindexCollective(account: AccountId): F[Unit] = def reindexCollective(account: AccountId): F[Unit] =
@ -102,7 +101,7 @@ object OFulltext {
job <- JobFactory.reIndex(account) job <- JobFactory.reIndex(account)
_ <- _ <-
if (exist.isDefined) ().pure[F] if (exist.isDefined) ().pure[F]
else jobStore.insertIfNew(job.encode) *> joex.notifyAllNodes else jobStore.insertIfNew(job.encode)
} yield () } yield ()
def findIndexOnly(maxNoteLen: Int)( def findIndexOnly(maxNoteLen: Int)(

View File

@ -183,14 +183,12 @@ trait OItem[F[_]] {
def reprocess( def reprocess(
item: Ident, item: Ident,
attachments: List[Ident], attachments: List[Ident],
account: AccountId, account: AccountId
notifyJoex: Boolean
): F[UpdateResult] ): F[UpdateResult]
def reprocessAll( def reprocessAll(
items: Nel[Ident], items: Nel[Ident],
account: AccountId, account: AccountId
notifyJoex: Boolean
): F[UpdateResult] ): F[UpdateResult]
/** Submits a task that finds all non-converted pdfs and triggers converting them using /** Submits a task that finds all non-converted pdfs and triggers converting them using
@ -198,22 +196,17 @@ trait OItem[F[_]] {
*/ */
def convertAllPdf( def convertAllPdf(
collective: Option[Ident], collective: Option[Ident],
submitter: Option[Ident], submitter: Option[Ident]
notifyJoex: Boolean
): F[UpdateResult] ): F[UpdateResult]
/** Submits a task that (re)generates the preview image for an attachment. */ /** Submits a task that (re)generates the preview image for an attachment. */
def generatePreview( def generatePreview(
args: MakePreviewArgs, args: MakePreviewArgs,
account: AccountId, account: AccountId
notifyJoex: Boolean
): F[UpdateResult] ): F[UpdateResult]
/** Submits a task that (re)generates the preview images for all attachments. */ /** Submits a task that (re)generates the preview images for all attachments. */
def generateAllPreviews( def generateAllPreviews(storeMode: MakePreviewArgs.StoreMode): F[UpdateResult]
storeMode: MakePreviewArgs.StoreMode,
notifyJoex: Boolean
): F[UpdateResult]
/** Merges a list of items into one item. The remaining items are deleted. */ /** Merges a list of items into one item. The remaining items are deleted. */
def merge( def merge(
@ -228,8 +221,7 @@ object OItem {
store: Store[F], store: Store[F],
fts: FtsClient[F], fts: FtsClient[F],
createIndex: CreateIndex[F], createIndex: CreateIndex[F],
jobStore: JobStore[F], jobStore: JobStore[F]
joex: OJoex[F]
): Resource[F, OItem[F]] = ): Resource[F, OItem[F]] =
for { for {
otag <- OTag(store) otag <- OTag(store)
@ -752,8 +744,7 @@ object OItem {
def reprocess( def reprocess(
item: Ident, item: Ident,
attachments: List[Ident], attachments: List[Ident],
account: AccountId, account: AccountId
notifyJoex: Boolean
): F[UpdateResult] = ): F[UpdateResult] =
(for { (for {
_ <- OptionT( _ <- OptionT(
@ -764,13 +755,11 @@ object OItem {
JobFactory.reprocessItem[F](args, account, Priority.Low) JobFactory.reprocessItem[F](args, account, Priority.Low)
) )
_ <- OptionT.liftF(jobStore.insertIfNew(job.encode)) _ <- OptionT.liftF(jobStore.insertIfNew(job.encode))
_ <- OptionT.liftF(if (notifyJoex) joex.notifyAllNodes else ().pure[F])
} yield UpdateResult.success).getOrElse(UpdateResult.notFound) } yield UpdateResult.success).getOrElse(UpdateResult.notFound)
def reprocessAll( def reprocessAll(
items: Nel[Ident], items: Nel[Ident],
account: AccountId, account: AccountId
notifyJoex: Boolean
): F[UpdateResult] = ): F[UpdateResult] =
UpdateResult.fromUpdate(for { UpdateResult.fromUpdate(for {
items <- store.transact(RItem.filterItems(items, account.collective)) items <- store.transact(RItem.filterItems(items, account.collective))
@ -779,39 +768,32 @@ object OItem {
.traverse(arg => JobFactory.reprocessItem[F](arg, account, Priority.Low)) .traverse(arg => JobFactory.reprocessItem[F](arg, account, Priority.Low))
.map(_.map(_.encode)) .map(_.map(_.encode))
_ <- jobStore.insertAllIfNew(jobs) _ <- jobStore.insertAllIfNew(jobs)
_ <- if (notifyJoex) joex.notifyAllNodes else ().pure[F]
} yield items.size) } yield items.size)
def convertAllPdf( def convertAllPdf(
collective: Option[Ident], collective: Option[Ident],
submitter: Option[Ident], submitter: Option[Ident]
notifyJoex: Boolean
): F[UpdateResult] = ): F[UpdateResult] =
for { for {
job <- JobFactory.convertAllPdfs[F](collective, submitter, Priority.Low) job <- JobFactory.convertAllPdfs[F](collective, submitter, Priority.Low)
_ <- jobStore.insertIfNew(job.encode) _ <- jobStore.insertIfNew(job.encode)
_ <- if (notifyJoex) joex.notifyAllNodes else ().pure[F]
} yield UpdateResult.success } yield UpdateResult.success
def generatePreview( def generatePreview(
args: MakePreviewArgs, args: MakePreviewArgs,
account: AccountId, account: AccountId
notifyJoex: Boolean
): F[UpdateResult] = ): F[UpdateResult] =
for { for {
job <- JobFactory.makePreview[F](args, account.some) job <- JobFactory.makePreview[F](args, account.some)
_ <- jobStore.insertIfNew(job.encode) _ <- jobStore.insertIfNew(job.encode)
_ <- if (notifyJoex) joex.notifyAllNodes else ().pure[F]
} yield UpdateResult.success } yield UpdateResult.success
def generateAllPreviews( def generateAllPreviews(
storeMode: MakePreviewArgs.StoreMode, storeMode: MakePreviewArgs.StoreMode
notifyJoex: Boolean
): F[UpdateResult] = ): F[UpdateResult] =
for { for {
job <- JobFactory.allPreviews[F](AllPreviewsArgs(None, storeMode), None) job <- JobFactory.allPreviews[F](AllPreviewsArgs(None, storeMode), None)
_ <- jobStore.insertIfNew(job.encode) _ <- jobStore.insertIfNew(job.encode)
_ <- if (notifyJoex) joex.notifyAllNodes else ().pure[F]
} yield UpdateResult.success } yield UpdateResult.success
private def onSuccessIgnoreError(update: F[Unit])(ar: UpdateResult): F[Unit] = private def onSuccessIgnoreError(update: F[Unit])(ar: UpdateResult): F[Unit] =

View File

@ -23,7 +23,6 @@ trait OUpload[F[_]] {
def submit( def submit(
data: OUpload.UploadData[F], data: OUpload.UploadData[F],
account: AccountId, account: AccountId,
notifyJoex: Boolean,
itemId: Option[Ident] itemId: Option[Ident]
): F[OUpload.UploadResult] ): F[OUpload.UploadResult]
@ -34,21 +33,19 @@ trait OUpload[F[_]] {
def submit( def submit(
data: OUpload.UploadData[F], data: OUpload.UploadData[F],
sourceId: Ident, sourceId: Ident,
notifyJoex: Boolean,
itemId: Option[Ident] itemId: Option[Ident]
): F[OUpload.UploadResult] ): F[OUpload.UploadResult]
final def submitEither( final def submitEither(
data: OUpload.UploadData[F], data: OUpload.UploadData[F],
accOrSrc: Either[Ident, AccountId], accOrSrc: Either[Ident, AccountId],
notifyJoex: Boolean,
itemId: Option[Ident] itemId: Option[Ident]
): F[OUpload.UploadResult] = ): F[OUpload.UploadResult] =
accOrSrc match { accOrSrc match {
case Right(acc) => case Right(acc) =>
submit(data, acc, notifyJoex, itemId) submit(data, acc, itemId)
case Left(srcId) => case Left(srcId) =>
submit(data, srcId, notifyJoex, itemId) submit(data, srcId, itemId)
} }
} }
@ -109,15 +106,13 @@ object OUpload {
def apply[F[_]: Sync]( def apply[F[_]: Sync](
store: Store[F], store: Store[F],
jobStore: JobStore[F], jobStore: JobStore[F]
joex: OJoex[F]
): Resource[F, OUpload[F]] = ): Resource[F, OUpload[F]] =
Resource.pure[F, OUpload[F]](new OUpload[F] { Resource.pure[F, OUpload[F]](new OUpload[F] {
private[this] val logger = docspell.logging.getLogger[F] private[this] val logger = docspell.logging.getLogger[F]
def submit( def submit(
data: OUpload.UploadData[F], data: OUpload.UploadData[F],
account: AccountId, account: AccountId,
notifyJoex: Boolean,
itemId: Option[Ident] itemId: Option[Ident]
): F[OUpload.UploadResult] = ): F[OUpload.UploadResult] =
(for { (for {
@ -150,7 +145,7 @@ object OUpload {
args = ProcessItemArgs(meta, files.toList) args = ProcessItemArgs(meta, files.toList)
jobs <- right(makeJobs(data, args, account)) jobs <- right(makeJobs(data, args, account))
_ <- right(logger.debug(s"Storing jobs: $jobs")) _ <- right(logger.debug(s"Storing jobs: $jobs"))
res <- right(submitJobs(notifyJoex)(jobs.map(_.encode))) res <- right(submitJobs(jobs.map(_.encode)))
_ <- right( _ <- right(
store.transact( store.transact(
RSource.incrementCounter(data.meta.sourceAbbrev, account.collective) RSource.incrementCounter(data.meta.sourceAbbrev, account.collective)
@ -161,7 +156,6 @@ object OUpload {
def submit( def submit(
data: OUpload.UploadData[F], data: OUpload.UploadData[F],
sourceId: Ident, sourceId: Ident,
notifyJoex: Boolean,
itemId: Option[Ident] itemId: Option[Ident]
): F[OUpload.UploadResult] = ): F[OUpload.UploadResult] =
(for { (for {
@ -181,16 +175,13 @@ object OUpload {
priority = src.source.priority priority = src.source.priority
) )
accId = AccountId(src.source.cid, src.source.sid) accId = AccountId(src.source.cid, src.source.sid)
result <- OptionT.liftF(submit(updata, accId, notifyJoex, itemId)) result <- OptionT.liftF(submit(updata, accId, itemId))
} yield result).getOrElse(UploadResult.noSource) } yield result).getOrElse(UploadResult.noSource)
private def submitJobs( private def submitJobs(jobs: List[Job[String]]): F[OUpload.UploadResult] =
notifyJoex: Boolean
)(jobs: List[Job[String]]): F[OUpload.UploadResult] =
for { for {
_ <- logger.debug(s"Storing jobs: $jobs") _ <- logger.debug(s"Storing jobs: $jobs")
_ <- jobStore.insertAll(jobs) _ <- jobStore.insertAll(jobs)
_ <- if (notifyJoex) joex.notifyAllNodes else ().pure[F]
} yield UploadResult.Success } yield UploadResult.Success
/** Saves the file into the database. */ /** Saves the file into the database. */

View File

@ -92,10 +92,7 @@ object OUserTask {
def executeNow[A](scope: UserTaskScope, subject: Option[String], task: UserTask[A])( def executeNow[A](scope: UserTaskScope, subject: Option[String], task: UserTask[A])(
implicit E: Encoder[A] implicit E: Encoder[A]
): F[Unit] = ): F[Unit] =
for { taskStore.executeNow(scope, subject, task)
_ <- taskStore.executeNow(scope, subject, task)
_ <- joex.notifyAllNodes
} yield ()
def getScanMailbox(scope: UserTaskScope): Stream[F, UserTask[ScanMailboxArgs]] = def getScanMailbox(scope: UserTaskScope): Stream[F, UserTask[ScanMailboxArgs]] =
taskStore taskStore

View File

@ -117,7 +117,7 @@ final class JoexTasks[F[_]: Async](
.withTask( .withTask(
JobTask.json( JobTask.json(
ConvertAllPdfArgs.taskName, ConvertAllPdfArgs.taskName,
ConvertAllPdfTask[F](jobStoreModule.jobs, joex, store), ConvertAllPdfTask[F](jobStoreModule.jobs, store),
ConvertAllPdfTask.onCancel[F] ConvertAllPdfTask.onCancel[F]
) )
) )
@ -138,7 +138,7 @@ final class JoexTasks[F[_]: Async](
.withTask( .withTask(
JobTask.json( JobTask.json(
AllPreviewsArgs.taskName, AllPreviewsArgs.taskName,
AllPreviewsTask[F](jobStoreModule.jobs, joex, store), AllPreviewsTask[F](jobStoreModule.jobs, store),
AllPreviewsTask.onCancel[F] AllPreviewsTask.onCancel[F]
) )
) )
@ -152,7 +152,7 @@ final class JoexTasks[F[_]: Async](
.withTask( .withTask(
JobTask.json( JobTask.json(
AllPageCountTask.taskName, AllPageCountTask.taskName,
AllPageCountTask[F](store, jobStoreModule.jobs, joex), AllPageCountTask[F](store, jobStoreModule.jobs),
AllPageCountTask.onCancel[F] AllPageCountTask.onCancel[F]
) )
) )
@ -220,16 +220,16 @@ object JoexTasks {
for { for {
joex <- OJoex(pubSub) joex <- OJoex(pubSub)
store = jobStoreModule.store store = jobStoreModule.store
upload <- OUpload(store, jobStoreModule.jobs, joex) upload <- OUpload(store, jobStoreModule.jobs)
fts <- createFtsClient(cfg)(httpClient) fts <- createFtsClient(cfg)(httpClient)
createIndex <- CreateIndex.resource(fts, store) createIndex <- CreateIndex.resource(fts, store)
itemOps <- OItem(store, fts, createIndex, jobStoreModule.jobs, joex) itemOps <- OItem(store, fts, createIndex, jobStoreModule.jobs)
itemSearchOps <- OItemSearch(store) itemSearchOps <- OItemSearch(store)
analyser <- TextAnalyser.create[F](cfg.textAnalysis.textAnalysisConfig) analyser <- TextAnalyser.create[F](cfg.textAnalysis.textAnalysisConfig)
regexNer <- RegexNerFile(cfg.textAnalysis.regexNerFileConfig, store) regexNer <- RegexNerFile(cfg.textAnalysis.regexNerFileConfig, store)
updateCheck <- UpdateCheck.resource(httpClient) updateCheck <- UpdateCheck.resource(httpClient)
notification <- ONotification(store, notificationModule) notification <- ONotification(store, notificationModule)
fileRepo <- OFileRepository(store, jobStoreModule.jobs, joex) fileRepo <- OFileRepository(store, jobStoreModule.jobs)
} yield new JoexTasks[F]( } yield new JoexTasks[F](
cfg, cfg,
store, store,

View File

@ -11,7 +11,6 @@ import cats.implicits._
import fs2.{Chunk, Stream} import fs2.{Chunk, Stream}
import docspell.backend.JobFactory import docspell.backend.JobFactory
import docspell.backend.ops.OJoex
import docspell.common._ import docspell.common._
import docspell.scheduler._ import docspell.scheduler._
import docspell.store.Store import docspell.store.Store
@ -24,15 +23,13 @@ object AllPageCountTask {
def apply[F[_]: Sync]( def apply[F[_]: Sync](
store: Store[F], store: Store[F],
jobStore: JobStore[F], jobStore: JobStore[F]
joex: OJoex[F]
): Task[F, Args, Unit] = ): Task[F, Args, Unit] =
Task { ctx => Task { ctx =>
for { for {
_ <- ctx.logger.info("Generating previews for attachments") _ <- ctx.logger.info("Generating previews for attachments")
n <- submitConversionJobs(ctx, store, jobStore) n <- submitConversionJobs(ctx, store, jobStore)
_ <- ctx.logger.info(s"Submitted $n jobs") _ <- ctx.logger.info(s"Submitted $n jobs")
_ <- joex.notifyAllNodes
} yield () } yield ()
} }

View File

@ -10,7 +10,6 @@ import cats.effect._
import cats.implicits._ import cats.implicits._
import fs2.{Chunk, Stream} import fs2.{Chunk, Stream}
import docspell.backend.ops.OJoex
import docspell.common._ import docspell.common._
import docspell.scheduler._ import docspell.scheduler._
import docspell.store.Store import docspell.store.Store
@ -25,7 +24,6 @@ object ConvertAllPdfTask {
def apply[F[_]: Sync]( def apply[F[_]: Sync](
jobStore: JobStore[F], jobStore: JobStore[F],
joex: OJoex[F],
store: Store[F] store: Store[F]
): Task[F, Args, Unit] = ): Task[F, Args, Unit] =
Task { ctx => Task { ctx =>
@ -33,7 +31,6 @@ object ConvertAllPdfTask {
_ <- ctx.logger.info("Converting pdfs using ocrmypdf") _ <- ctx.logger.info("Converting pdfs using ocrmypdf")
n <- submitConversionJobs(ctx, store, jobStore) n <- submitConversionJobs(ctx, store, jobStore)
_ <- ctx.logger.info(s"Submitted $n file conversion jobs") _ <- ctx.logger.info(s"Submitted $n file conversion jobs")
_ <- joex.notifyAllNodes
} yield () } yield ()
} }

View File

@ -11,7 +11,6 @@ import cats.implicits._
import fs2.{Chunk, Stream} import fs2.{Chunk, Stream}
import docspell.backend.JobFactory import docspell.backend.JobFactory
import docspell.backend.ops.OJoex
import docspell.common.MakePreviewArgs.StoreMode import docspell.common.MakePreviewArgs.StoreMode
import docspell.common._ import docspell.common._
import docspell.scheduler._ import docspell.scheduler._
@ -24,7 +23,6 @@ object AllPreviewsTask {
def apply[F[_]: Sync]( def apply[F[_]: Sync](
jobStore: JobStore[F], jobStore: JobStore[F],
joex: OJoex[F],
store: Store[F] store: Store[F]
): Task[F, Args, Unit] = ): Task[F, Args, Unit] =
Task { ctx => Task { ctx =>
@ -32,7 +30,6 @@ object AllPreviewsTask {
_ <- ctx.logger.info("Generating previews for attachments") _ <- ctx.logger.info("Generating previews for attachments")
n <- submitConversionJobs(ctx, store, jobStore) n <- submitConversionJobs(ctx, store, jobStore)
_ <- ctx.logger.info(s"Submitted $n jobs") _ <- ctx.logger.info(s"Submitted $n jobs")
_ <- joex.notifyAllNodes
} yield () } yield ()
} }

View File

@ -337,7 +337,7 @@ object ScanMailboxTask {
priority = Priority.Low, priority = Priority.Low,
tracker = None tracker = None
) )
res <- upload.submit(data, ctx.args.account, false, None) res <- upload.submit(data, ctx.args.account, None)
} yield res } yield res
} }

View File

@ -121,8 +121,7 @@ object AttachmentRoutes {
for { for {
res <- backend.item.generatePreview( res <- backend.item.generatePreview(
MakePreviewArgs.replace(id), MakePreviewArgs.replace(id),
user.account, user.account
true
) )
resp <- Ok( resp <- Ok(
Conversions.basicResult(res, "Generating preview image task submitted.") Conversions.basicResult(res, "Generating preview image task submitted.")
@ -169,7 +168,7 @@ object AttachmentRoutes {
HttpRoutes.of { HttpRoutes.of {
case POST -> Root / "generatePreviews" => case POST -> Root / "generatePreviews" =>
for { for {
res <- backend.item.generateAllPreviews(MakePreviewArgs.StoreMode.Replace, true) res <- backend.item.generateAllPreviews(MakePreviewArgs.StoreMode.Replace)
resp <- Ok( resp <- Ok(
Conversions.basicResult(res, "Generate all previews task submitted.") Conversions.basicResult(res, "Generate all previews task submitted.")
) )
@ -178,7 +177,7 @@ object AttachmentRoutes {
case POST -> Root / "convertallpdfs" => case POST -> Root / "convertallpdfs" =>
for { for {
res <- res <-
backend.item.convertAllPdf(None, None, true) backend.item.convertAllPdf(None, None)
resp <- Ok(Conversions.basicResult(res, "Convert all PDFs task submitted")) resp <- Ok(Conversions.basicResult(res, "Convert all PDFs task submitted"))
} yield resp } yield resp
} }

View File

@ -32,7 +32,7 @@ object FileRepositoryRoutes {
for { for {
input <- req.as[FileRepositoryCloneRequest] input <- req.as[FileRepositoryCloneRequest]
args = makeTaskArgs(input) args = makeTaskArgs(input)
job <- backend.fileRepository.cloneFileRepository(args, true) job <- backend.fileRepository.cloneFileRepository(args)
result = BasicResult( result = BasicResult(
job.isDefined, job.isDefined,
job.fold(s"Job for '${FileCopyTaskArgs.taskName.id}' already running")(j => job.fold(s"Job for '${FileCopyTaskArgs.taskName.id}' already running")(j =>
@ -46,7 +46,7 @@ object FileRepositoryRoutes {
case req @ POST -> Root / "integrityCheck" => case req @ POST -> Root / "integrityCheck" =>
for { for {
input <- req.as[FileKeyPart] input <- req.as[FileKeyPart]
job <- backend.fileRepository.checkIntegrityAll(input, true) job <- backend.fileRepository.checkIntegrityAll(input)
result = BasicResult( result = BasicResult(
job.isDefined, job.isDefined,
job.fold(s"Job for '${FileCopyTaskArgs.taskName.id}' already running")(j => job.fold(s"Job for '${FileCopyTaskArgs.taskName.id}' already running")(j =>

View File

@ -111,7 +111,7 @@ object IntegrationEndpointRoutes {
cfg.backend.files.validMimeTypes cfg.backend.files.validMimeTypes
) )
account = AccountId(coll, DocspellSystem.user) account = AccountId(coll, DocspellSystem.user)
result <- backend.upload.submit(updata, account, true, None) result <- backend.upload.submit(updata, account, None)
res <- Ok(basicResult(result)) res <- Ok(basicResult(result))
} yield res } yield res
} }

View File

@ -181,7 +181,7 @@ object ItemMultiRoutes extends NonEmptyListSupport with MultiIdSupport {
for { for {
json <- req.as[IdList] json <- req.as[IdList]
items <- requireNonEmpty(json.ids) items <- requireNonEmpty(json.ids)
res <- backend.item.reprocessAll(items, user.account, true) res <- backend.item.reprocessAll(items, user.account)
resp <- Ok(Conversions.basicResult(res, "Re-process task(s) submitted.")) resp <- Ok(Conversions.basicResult(res, "Re-process task(s) submitted."))
} yield resp } yield resp

View File

@ -388,7 +388,7 @@ object ItemRoutes {
for { for {
data <- req.as[IdList] data <- req.as[IdList]
_ <- logger.debug(s"Re-process item ${id.id}") _ <- logger.debug(s"Re-process item ${id.id}")
res <- backend.item.reprocess(id, data.ids, user.account, true) res <- backend.item.reprocess(id, data.ids, user.account)
resp <- Ok(Conversions.basicResult(res, "Re-process task submitted.")) resp <- Ok(Conversions.basicResult(res, "Re-process task submitted."))
} yield resp } yield resp

View File

@ -96,7 +96,7 @@ object UploadRoutes {
prio, prio,
cfg.backend.files.validMimeTypes cfg.backend.files.validMimeTypes
) )
result <- backend.upload.submitEither(updata, accOrSrc, true, itemId) result <- backend.upload.submitEither(updata, accOrSrc, itemId)
res <- Ok(basicResult(result)) res <- Ok(basicResult(result))
} yield res } yield res
} }

View File

@ -13,7 +13,7 @@ import docspell.common.{Ident, JobState}
import docspell.notification.api.{Event, EventSink} import docspell.notification.api.{Event, EventSink}
import docspell.pubsub.api.PubSubT import docspell.pubsub.api.PubSubT
import docspell.scheduler._ import docspell.scheduler._
import docspell.scheduler.msg.JobSubmitted import docspell.scheduler.msg.{JobSubmitted, JobsNotify}
import docspell.store.Store import docspell.store.Store
final class JobStorePublish[F[_]: Sync]( final class JobStorePublish[F[_]: Sync](
@ -40,30 +40,39 @@ final class JobStorePublish[F[_]: Sync](
pubsub.publish1(JobSubmitted.topic, msg(job)).as(()) *> pubsub.publish1(JobSubmitted.topic, msg(job)).as(()) *>
eventSink.offer(event(job)) eventSink.offer(event(job))
private def notifyJoex: F[Unit] =
pubsub.publish1IgnoreErrors(JobsNotify(), ()).void
def insert(job: Job[String]) = def insert(job: Job[String]) =
delegate.insert(job).flatTap(_ => publish(job)) delegate.insert(job).flatTap(_ => publish(job) *> notifyJoex)
def insertIfNew(job: Job[String]) = def insertIfNew(job: Job[String]) =
delegate.insertIfNew(job).flatTap { delegate.insertIfNew(job).flatTap {
case true => publish(job) case true => publish(job) *> notifyJoex
case false => ().pure[F] case false => ().pure[F]
} }
def insertAll(jobs: Seq[Job[String]]) = def insertAll(jobs: Seq[Job[String]]) =
delegate.insertAll(jobs).flatTap { results => delegate
results.zip(jobs).traverse { case (res, job) => .insertAll(jobs)
if (res) publish(job) .flatTap { results =>
else ().pure[F] results.zip(jobs).traverse { case (res, job) =>
if (res) publish(job)
else ().pure[F]
}
} }
} .flatTap(_ => notifyJoex)
def insertAllIfNew(jobs: Seq[Job[String]]) = def insertAllIfNew(jobs: Seq[Job[String]]) =
delegate.insertAllIfNew(jobs).flatTap { results => delegate
results.zip(jobs).traverse { case (res, job) => .insertAllIfNew(jobs)
if (res) publish(job) .flatTap { results =>
else ().pure[F] results.zip(jobs).traverse { case (res, job) =>
if (res) publish(job)
else ().pure[F]
}
} }
} .flatTap(_ => notifyJoex)
def findById(jobId: Ident) = def findById(jobId: Ident) =
delegate.findById(jobId) delegate.findById(jobId)