From 3a05dc56cbc61113caa06dd4c65cf712a1388c0d Mon Sep 17 00:00:00 2001 From: eikek Date: Sun, 13 Mar 2022 14:27:06 +0100 Subject: [PATCH] Refactor scheduler into api / impl --- build.sbt | 5 +- .../scala/docspell/backend/BackendApp.scala | 33 +- .../scala/docspell/backend/JobFactory.scala | 285 ++++++---------- .../docspell/backend/ops/OCollective.scala | 22 +- .../backend/ops/OFileRepository.scala | 26 +- .../docspell/backend/ops/OFulltext.scala | 12 +- .../scala/docspell/backend/ops/OItem.scala | 17 +- .../scala/docspell/backend/ops/OUpload.scala | 14 +- .../docspell/backend/ops/OUserTask.scala | 8 +- .../docspell/common/syntax/StringSyntax.scala | 9 +- .../scala/docspell/joex/JoexAppImpl.scala | 320 +++++++++--------- .../docspell/joex/fts/MigrationTask.scala | 31 +- .../docspell/joex/hk/HouseKeepingTask.scala | 25 +- .../joex/pagecount/AllPageCountTask.scala | 44 ++- .../joex/pdfconv/ConvertAllPdfTask.scala | 24 +- .../joex/preview/AllPreviewsTask.scala | 24 +- .../docspell/joex/process/ItemHandler.scala | 2 +- .../docspell/joex/process/ReProcessItem.scala | 2 +- .../joex/updatecheck/UpdateCheckTask.scala | 12 +- .../docspell/restserver/RestAppImpl.scala | 9 +- .../scala/docspell/scheduler/Context.scala | 57 +--- .../main/scala/docspell/scheduler/Job.scala | 36 ++ .../scala/docspell/scheduler/JobQueue.scala | 97 ------ .../scala/docspell/scheduler/JobStore.scala | 21 ++ .../docspell/scheduler/JobStoreModule.scala | 9 + .../scala/docspell/scheduler/JobTask.scala | 2 +- .../scheduler/PeriodicSchedulerConfig.scala | 5 + .../docspell/scheduler/SchedulerConfig.scala | 9 +- .../docspell/scheduler/SchedulerModule.scala | 6 + .../scheduler/usertask/UserTask.scala | 38 +-- .../scheduler/usertask/UserTaskScope.scala | 5 +- .../scheduler/usertask/UserTaskStore.scala | 102 +----- .../scheduler/CountingSchemeSpec.scala | 3 +- .../docspell/scheduler/impl/ContextImpl.scala | 61 ++++ .../docspell/scheduler/impl/JobQueue.scala | 37 ++ .../scheduler/impl/JobStoreImpl.scala | 87 +++++ .../impl/JobStoreModuleBuilder.scala | 50 +++ .../scheduler/impl/JobStorePublish.scala} | 40 ++- .../docspell/scheduler/impl}/LogSink.scala | 6 +- .../impl/PeriodicSchedulerBuilder.scala | 6 +- .../impl/PeriodicSchedulerImpl.scala | 6 +- .../scheduler/impl/PeriodicTaskStore.scala | 30 +- .../docspell/scheduler/impl}/QUserTask.scala | 39 ++- .../scheduler/impl}/QueueLogger.scala | 18 +- .../scheduler/impl/SchedulerBuilder.scala | 25 +- .../scheduler/impl/SchedulerImpl.scala | 6 +- .../impl/SchedulerModuleBuilder.scala | 68 ++++ .../scheduler/impl/UserTaskStoreImpl.scala | 117 +++++++ .../scala/docspell/store/records/RJob.scala | 31 +- .../docspell/store/queries/QJobTest.scala | 2 +- 50 files changed, 1076 insertions(+), 867 deletions(-) create mode 100644 modules/scheduler/api/src/main/scala/docspell/scheduler/Job.scala delete mode 100644 modules/scheduler/api/src/main/scala/docspell/scheduler/JobQueue.scala create mode 100644 modules/scheduler/api/src/main/scala/docspell/scheduler/JobStore.scala create mode 100644 modules/scheduler/api/src/main/scala/docspell/scheduler/JobStoreModule.scala create mode 100644 modules/scheduler/api/src/main/scala/docspell/scheduler/SchedulerModule.scala rename modules/{joex/src/test/scala/docspell/joex => scheduler/api/src/test/scala/docspell}/scheduler/CountingSchemeSpec.scala (92%) create mode 100644 modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/ContextImpl.scala create mode 100644 modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/JobQueue.scala create mode 100644 modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/JobStoreImpl.scala create mode 100644 modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/JobStoreModuleBuilder.scala rename modules/scheduler/{api/src/main/scala/docspell/scheduler/msg/JobQueuePublish.scala => impl/src/main/scala/docspell/scheduler/impl/JobStorePublish.scala} (60%) rename modules/scheduler/{api/src/main/scala/docspell/scheduler => impl/src/main/scala/docspell/scheduler/impl}/LogSink.scala (96%) rename modules/scheduler/{api/src/main/scala/docspell/scheduler/usertask => impl/src/main/scala/docspell/scheduler/impl}/QUserTask.scala (79%) rename modules/scheduler/{api/src/main/scala/docspell/scheduler => impl/src/main/scala/docspell/scheduler/impl}/QueueLogger.scala (85%) create mode 100644 modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/SchedulerModuleBuilder.scala create mode 100644 modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/UserTaskStoreImpl.scala diff --git a/build.sbt b/build.sbt index 91c21a17..d8ffd775 100644 --- a/build.sbt +++ b/build.sbt @@ -530,7 +530,7 @@ val schedulerApi = project Dependencies.fs2Core ++ Dependencies.circeCore ) - .dependsOn(loggingApi, common, store, notificationApi, pubsubApi) + .dependsOn(loggingApi, common, store, pubsubApi) val schedulerImpl = project .in(file("modules/scheduler/impl")) @@ -837,7 +837,8 @@ val restserver = project ftssolr, oidc, pubsubNaive, - notificationImpl + notificationImpl, + schedulerImpl ) // --- Website Documentation diff --git a/modules/backend/src/main/scala/docspell/backend/BackendApp.scala b/modules/backend/src/main/scala/docspell/backend/BackendApp.scala index c4758559..ebe361ac 100644 --- a/modules/backend/src/main/scala/docspell/backend/BackendApp.scala +++ b/modules/backend/src/main/scala/docspell/backend/BackendApp.scala @@ -14,8 +14,7 @@ import docspell.backend.signup.OSignup import docspell.ftsclient.FtsClient import docspell.notification.api.{EventExchange, NotificationModule} import docspell.pubsub.api.PubSubT -import docspell.scheduler.msg.JobQueuePublish -import docspell.scheduler.usertask.UserTaskStore +import docspell.scheduler.JobStoreModule import docspell.store.Store import docspell.totp.Totp import emil.Emil @@ -58,29 +57,43 @@ object BackendApp { javaEmil: Emil[F], ftsClient: FtsClient[F], pubSubT: PubSubT[F], + schedulerModule: JobStoreModule[F], notificationMod: NotificationModule[F] ): Resource[F, BackendApp[F]] = for { - utStore <- UserTaskStore(store) - queue <- JobQueuePublish(store, pubSubT, notificationMod) totpImpl <- OTotp(store, Totp.default) loginImpl <- Login[F](store, Totp.default) signupImpl <- OSignup[F](store) joexImpl <- OJoex(pubSubT) - collImpl <- OCollective[F](store, utStore, queue, joexImpl) + collImpl <- OCollective[F]( + store, + schedulerModule.userTasks, + schedulerModule.jobs, + joexImpl + ) sourceImpl <- OSource[F](store) tagImpl <- OTag[F](store) equipImpl <- OEquipment[F](store) orgImpl <- OOrganization(store) - uploadImpl <- OUpload(store, queue, joexImpl) + uploadImpl <- OUpload(store, schedulerModule.jobs, joexImpl) nodeImpl <- ONode(store) jobImpl <- OJob(store, joexImpl, pubSubT) createIndex <- CreateIndex.resource(ftsClient, store) - itemImpl <- OItem(store, ftsClient, createIndex, queue, joexImpl) + itemImpl <- OItem(store, ftsClient, createIndex, schedulerModule.jobs, joexImpl) itemSearchImpl <- OItemSearch(store) - fulltextImpl <- OFulltext(itemSearchImpl, ftsClient, store, queue, joexImpl) + fulltextImpl <- OFulltext( + itemSearchImpl, + ftsClient, + store, + schedulerModule.jobs, + joexImpl + ) mailImpl <- OMail(store, javaEmil) - userTaskImpl <- OUserTask(utStore, store, queue, joexImpl) + userTaskImpl <- OUserTask( + schedulerModule.userTasks, + store, + joexImpl + ) folderImpl <- OFolder(store) customFieldsImpl <- OCustomFields(store) simpleSearchImpl = OSimpleSearch(fulltextImpl, itemSearchImpl) @@ -90,7 +103,7 @@ object BackendApp { ) notifyImpl <- ONotification(store, notificationMod) bookmarksImpl <- OQueryBookmarks(store) - fileRepoImpl <- OFileRepository(store, queue, joexImpl) + fileRepoImpl <- OFileRepository(store, schedulerModule.jobs, joexImpl) } yield new BackendApp[F] { val pubSub = pubSubT val login = loginImpl diff --git a/modules/backend/src/main/scala/docspell/backend/JobFactory.scala b/modules/backend/src/main/scala/docspell/backend/JobFactory.scala index f41911f4..d18356fa 100644 --- a/modules/backend/src/main/scala/docspell/backend/JobFactory.scala +++ b/modules/backend/src/main/scala/docspell/backend/JobFactory.scala @@ -8,124 +8,91 @@ package docspell.backend import cats.effect._ import cats.implicits._ - import docspell.backend.MailAddressCodec import docspell.common._ import docspell.notification.api.PeriodicQueryArgs -import docspell.store.records.RJob +import docspell.scheduler.Job object JobFactory extends MailAddressCodec { def integrityCheck[F[_]: Sync]( args: FileIntegrityCheckArgs, submitter: AccountId = DocspellSystem.account - ): F[RJob] = - for { - id <- Ident.randomId[F] - now <- Timestamp.current[F] - job = RJob.newJob( - id, - FileIntegrityCheckArgs.taskName, - submitter.collective, - args, - s"Check integrity of files", - now, - submitter.user, - Priority.High, - Some(FileIntegrityCheckArgs.taskName) - ) - } yield job + ): F[Job[FileIntegrityCheckArgs]] = + Job.createNew( + FileIntegrityCheckArgs.taskName, + submitter.collective, + args, + s"Check integrity of files", + submitter.user, + Priority.High, + Some(FileIntegrityCheckArgs.taskName) + ) def fileCopy[F[_]: Sync]( args: FileCopyTaskArgs, submitter: AccountId = DocspellSystem.account - ): F[RJob] = - for { - id <- Ident.randomId[F] - now <- Timestamp.current[F] - job = RJob.newJob( - id, - FileCopyTaskArgs.taskName, - submitter.collective, - args, - s"Copying all files", - now, - submitter.user, - Priority.High, - Some(FileCopyTaskArgs.taskName) - ) - } yield job + ): F[Job[FileCopyTaskArgs]] = + Job.createNew( + FileCopyTaskArgs.taskName, + submitter.collective, + args, + "Copying all files", + submitter.user, + Priority.High, + Some(FileCopyTaskArgs.taskName) + ) - def periodicQuery[F[_]: Sync](args: PeriodicQueryArgs, submitter: AccountId): F[RJob] = - for { - id <- Ident.randomId[F] - now <- Timestamp.current[F] - job = RJob.newJob( - id, - PeriodicQueryArgs.taskName, - submitter.collective, - args, - s"Running periodic query, notify via ${args.channels.map(_.channelType)}", - now, - submitter.user, - Priority.Low, - None - ) - } yield job + def periodicQuery[F[_]: Sync]( + args: PeriodicQueryArgs, + submitter: AccountId + ): F[Job[PeriodicQueryArgs]] = + Job.createNew( + PeriodicQueryArgs.taskName, + submitter.collective, + args, + s"Running periodic query, notify via ${args.channels.map(_.channelType)}", + submitter.user, + Priority.Low, + None + ) def makePageCount[F[_]: Sync]( args: MakePageCountArgs, account: Option[AccountId] - ): F[RJob] = - for { - id <- Ident.randomId[F] - now <- Timestamp.current[F] - job = RJob.newJob( - id, - MakePageCountArgs.taskName, - account.map(_.collective).getOrElse(DocspellSystem.taskGroup), - args, - s"Find page-count metadata for ${args.attachment.id}", - now, - account.map(_.user).getOrElse(DocspellSystem.user), - Priority.Low, - Some(MakePageCountArgs.taskName / args.attachment) - ) - } yield job + ): F[Job[MakePageCountArgs]] = + Job.createNew( + MakePageCountArgs.taskName, + account.map(_.collective).getOrElse(DocspellSystem.taskGroup), + args, + s"Find page-count metadata for ${args.attachment.id}", + account.map(_.user).getOrElse(DocspellSystem.user), + Priority.Low, + Some(MakePageCountArgs.taskName / args.attachment) + ) def makePreview[F[_]: Sync]( args: MakePreviewArgs, account: Option[AccountId] - ): F[RJob] = - for { - id <- Ident.randomId[F] - now <- Timestamp.current[F] - job = RJob.newJob( - id, - MakePreviewArgs.taskName, - account.map(_.collective).getOrElse(DocspellSystem.taskGroup), - args, - s"Generate preview image", - now, - account.map(_.user).getOrElse(DocspellSystem.user), - Priority.Low, - Some(MakePreviewArgs.taskName / args.attachment) - ) - } yield job + ): F[Job[MakePreviewArgs]] = + Job.createNew( + MakePreviewArgs.taskName, + account.map(_.collective).getOrElse(DocspellSystem.taskGroup), + args, + s"Generate preview image", + account.map(_.user).getOrElse(DocspellSystem.user), + Priority.Low, + Some(MakePreviewArgs.taskName / args.attachment) + ) def allPreviews[F[_]: Sync]( args: AllPreviewsArgs, submitter: Option[Ident] - ): F[RJob] = - for { - id <- Ident.randomId[F] - now <- Timestamp.current[F] - } yield RJob.newJob( - id, + ): F[Job[AllPreviewsArgs]] = + Job.createNew( AllPreviewsArgs.taskName, args.collective.getOrElse(DocspellSystem.taskGroup), args, "Create preview images", - now, submitter.getOrElse(DocspellSystem.user), Priority.Low, Some(DocspellSystem.allPreviewTaskTracker) @@ -135,127 +102,91 @@ object JobFactory extends MailAddressCodec { collective: Option[Ident], submitter: Option[Ident], prio: Priority - ): F[RJob] = - for { - id <- Ident.randomId[F] - now <- Timestamp.current[F] - job = RJob.newJob( - id, - ConvertAllPdfArgs.taskName, - collective.getOrElse(DocspellSystem.taskGroup), - ConvertAllPdfArgs(collective), - s"Convert all pdfs not yet converted", - now, - submitter.getOrElse(DocspellSystem.user), - prio, - collective - .map(c => c / ConvertAllPdfArgs.taskName) - .orElse(ConvertAllPdfArgs.taskName.some) - ) - } yield job + ): F[Job[ConvertAllPdfArgs]] = + Job.createNew( + ConvertAllPdfArgs.taskName, + collective.getOrElse(DocspellSystem.taskGroup), + ConvertAllPdfArgs(collective), + s"Convert all pdfs not yet converted", + submitter.getOrElse(DocspellSystem.user), + prio, + collective + .map(c => c / ConvertAllPdfArgs.taskName) + .orElse(ConvertAllPdfArgs.taskName.some) + ) def reprocessItem[F[_]: Sync]( args: ReProcessItemArgs, account: AccountId, prio: Priority - ): F[RJob] = - for { - id <- Ident.randomId[F] - now <- Timestamp.current[F] - job = RJob.newJob( - id, - ReProcessItemArgs.taskName, - account.collective, - args, - s"Re-process files of item ${args.itemId.id}", - now, - account.user, - prio, - Some(ReProcessItemArgs.taskName / args.itemId) - ) - } yield job + ): F[Job[ReProcessItemArgs]] = + Job.createNew( + ReProcessItemArgs.taskName, + account.collective, + args, + s"Re-process files of item ${args.itemId.id}", + account.user, + prio, + Some(ReProcessItemArgs.taskName / args.itemId) + ) def processItem[F[_]: Sync]( args: ProcessItemArgs, account: AccountId, prio: Priority, tracker: Option[Ident] - ): F[RJob] = - for { - id <- Ident.randomId[F] - now <- Timestamp.current[F] - job = RJob.newJob( - id, - ProcessItemArgs.taskName, - account.collective, - args, - args.makeSubject, - now, - account.user, - prio, - tracker - ) - } yield job + ): F[Job[ProcessItemArgs]] = + Job.createNew( + ProcessItemArgs.taskName, + account.collective, + args, + args.makeSubject, + account.user, + prio, + tracker + ) def processItems[F[_]: Sync]( args: Vector[ProcessItemArgs], account: AccountId, prio: Priority, tracker: Option[Ident] - ): F[Vector[RJob]] = { - def create(now: Timestamp, arg: ProcessItemArgs): F[RJob] = - Ident - .randomId[F] - .map(id => - RJob.newJob( - id, - ProcessItemArgs.taskName, - account.collective, - arg, - arg.makeSubject, - now, - account.user, - prio, - tracker - ) - ) + ): F[Vector[Job[ProcessItemArgs]]] = { + def create(arg: ProcessItemArgs): F[Job[ProcessItemArgs]] = + Job.createNew( + ProcessItemArgs.taskName, + account.collective, + arg, + arg.makeSubject, + account.user, + prio, + tracker + ) - for { - now <- Timestamp.current[F] - jobs <- args.traverse(a => create(now, a)) - } yield jobs + args.traverse(create) } - def reIndexAll[F[_]: Sync]: F[RJob] = - for { - id <- Ident.randomId[F] - now <- Timestamp.current[F] - } yield RJob.newJob( - id, + def reIndexAll[F[_]: Sync]: F[Job[ReIndexTaskArgs]] = + Job.createNew( ReIndexTaskArgs.taskName, DocspellSystem.taskGroup, ReIndexTaskArgs(None), - s"Recreate full-text index", - now, + "Recreate full-text index", DocspellSystem.taskGroup, Priority.Low, Some(DocspellSystem.migrationTaskTracker) ) - def reIndex[F[_]: Sync](account: AccountId): F[RJob] = - for { - id <- Ident.randomId[F] - now <- Timestamp.current[F] - args = ReIndexTaskArgs(Some(account.collective)) - } yield RJob.newJob( - id, + def reIndex[F[_]: Sync](account: AccountId): F[Job[ReIndexTaskArgs]] = { + val args = ReIndexTaskArgs(Some(account.collective)) + Job.createNew( ReIndexTaskArgs.taskName, account.collective, args, - s"Recreate full-text index", - now, + "Recreate full-text index", account.user, Priority.Low, Some(ReIndexTaskArgs.tracker(args)) ) + } } diff --git a/modules/backend/src/main/scala/docspell/backend/ops/OCollective.scala b/modules/backend/src/main/scala/docspell/backend/ops/OCollective.scala index ada2bb52..09f66ef4 100644 --- a/modules/backend/src/main/scala/docspell/backend/ops/OCollective.scala +++ b/modules/backend/src/main/scala/docspell/backend/ops/OCollective.scala @@ -18,7 +18,7 @@ import docspell.store.queries.{QCollective, QUser} import docspell.store.records._ import docspell.store.{AddResult, Store} import com.github.eikek.calev._ -import docspell.scheduler.JobQueue +import docspell.scheduler.JobStore import docspell.scheduler.usertask.{UserTask, UserTaskScope, UserTaskStore} trait OCollective[F[_]] { @@ -131,7 +131,7 @@ object OCollective { def apply[F[_]: Async]( store: Store[F], uts: UserTaskStore[F], - queue: JobQueue[F], + jobStore: JobStore[F], joex: OJoex[F] ): Resource[F, OCollective[F]] = Resource.pure[F, OCollective[F]](new OCollective[F] { @@ -194,32 +194,32 @@ object OCollective { for { id <- Ident.randomId[F] args = LearnClassifierArgs(collective) - ut <- UserTask( + ut = UserTask( id, LearnClassifierArgs.taskName, true, CalEvent(WeekdayComponent.All, DateEvent.All, TimeEvent.All), None, args - ).encode.toPeriodicTask(UserTaskScope(collective), args.makeSubject.some) - job <- ut.toJob - _ <- queue.insert(job) + ) + _ <- uts + .updateOneTask(UserTaskScope(collective), args.makeSubject.some, ut) _ <- joex.notifyAllNodes } yield () def startEmptyTrash(args: EmptyTrashArgs): F[Unit] = for { id <- Ident.randomId[F] - ut <- UserTask( + ut = UserTask( id, EmptyTrashArgs.taskName, true, CalEvent(WeekdayComponent.All, DateEvent.All, TimeEvent.All), None, args - ).encode.toPeriodicTask(UserTaskScope(args.collective), args.makeSubject.some) - job <- ut.toJob - _ <- queue.insert(job) + ) + _ <- uts + .updateOneTask(UserTaskScope(args.collective), args.makeSubject.some, ut) _ <- joex.notifyAllNodes } yield () @@ -319,7 +319,7 @@ object OCollective { AllPreviewsArgs(Some(account.collective), storeMode), Some(account.user) ) - _ <- queue.insertIfNew(job) + _ <- jobStore.insertIfNew(job.encode) _ <- if (notifyJoex) joex.notifyAllNodes else ().pure[F] } yield UpdateResult.success diff --git a/modules/backend/src/main/scala/docspell/backend/ops/OFileRepository.scala b/modules/backend/src/main/scala/docspell/backend/ops/OFileRepository.scala index 12ea8191..b46c075f 100644 --- a/modules/backend/src/main/scala/docspell/backend/ops/OFileRepository.scala +++ b/modules/backend/src/main/scala/docspell/backend/ops/OFileRepository.scala @@ -12,17 +12,22 @@ import cats.implicits._ import docspell.backend.JobFactory import docspell.backend.ops.OFileRepository.IntegrityResult import docspell.common._ -import docspell.scheduler.JobQueue +import docspell.scheduler.{Job, JobStore} import docspell.store.Store -import docspell.store.records.RJob import scodec.bits.ByteVector trait OFileRepository[F[_]] { /** Inserts the job or return None if such a job already is running. */ - def cloneFileRepository(args: FileCopyTaskArgs, notifyJoex: Boolean): F[Option[RJob]] + def cloneFileRepository( + args: FileCopyTaskArgs, + notifyJoex: Boolean + ): F[Option[Job[FileCopyTaskArgs]]] - def checkIntegrityAll(part: FileKeyPart, notifyJoex: Boolean): F[Option[RJob]] + def checkIntegrityAll( + part: FileKeyPart, + notifyJoex: Boolean + ): F[Option[Job[FileIntegrityCheckArgs]]] def checkIntegrity(key: FileKey, hash: Option[ByteVector]): F[Option[IntegrityResult]] } @@ -33,7 +38,7 @@ object OFileRepository { def apply[F[_]: Async]( store: Store[F], - queue: JobQueue[F], + jobStore: JobStore[F], joex: OJoex[F] ): Resource[F, OFileRepository[F]] = Resource.pure(new OFileRepository[F] { @@ -42,17 +47,20 @@ object OFileRepository { def cloneFileRepository( args: FileCopyTaskArgs, notifyJoex: Boolean - ): F[Option[RJob]] = + ): F[Option[Job[FileCopyTaskArgs]]] = for { job <- JobFactory.fileCopy(args) - flag <- queue.insertIfNew(job) + flag <- jobStore.insertIfNew(job.encode) _ <- if (notifyJoex) joex.notifyAllNodes else ().pure[F] } yield Option.when(flag)(job) - def checkIntegrityAll(part: FileKeyPart, notifyJoex: Boolean): F[Option[RJob]] = + def checkIntegrityAll( + part: FileKeyPart, + notifyJoex: Boolean + ): F[Option[Job[FileIntegrityCheckArgs]]] = for { job <- JobFactory.integrityCheck(FileIntegrityCheckArgs(part)) - flag <- queue.insertIfNew(job) + flag <- jobStore.insertIfNew(job.encode) _ <- if (notifyJoex) joex.notifyAllNodes else ().pure[F] } yield Option.when(flag)(job) diff --git a/modules/backend/src/main/scala/docspell/backend/ops/OFulltext.scala b/modules/backend/src/main/scala/docspell/backend/ops/OFulltext.scala index 88b9c3e3..8fcdb9cd 100644 --- a/modules/backend/src/main/scala/docspell/backend/ops/OFulltext.scala +++ b/modules/backend/src/main/scala/docspell/backend/ops/OFulltext.scala @@ -16,7 +16,7 @@ import docspell.common._ import docspell.ftsclient._ import docspell.query.ItemQuery._ import docspell.query.ItemQueryDsl._ -import docspell.scheduler.JobQueue +import docspell.scheduler.JobStore import docspell.store.queries.{QFolder, QItem, SelectedItem} import docspell.store.records.RJob import docspell.store.{Store, qb} @@ -80,7 +80,7 @@ object OFulltext { itemSearch: OItemSearch[F], fts: FtsClient[F], store: Store[F], - queue: JobQueue[F], + jobStore: JobStore[F], joex: OJoex[F] ): Resource[F, OFulltext[F]] = Resource.pure[F, OFulltext[F]](new OFulltext[F] { @@ -89,7 +89,7 @@ object OFulltext { for { _ <- logger.info(s"Re-index all.") job <- JobFactory.reIndexAll[F] - _ <- queue.insertIfNew(job) *> joex.notifyAllNodes + _ <- jobStore.insertIfNew(job.encode) *> joex.notifyAllNodes } yield () def reindexCollective(account: AccountId): F[Unit] = @@ -101,7 +101,7 @@ object OFulltext { job <- JobFactory.reIndex(account) _ <- if (exist.isDefined) ().pure[F] - else queue.insertIfNew(job) *> joex.notifyAllNodes + else jobStore.insertIfNew(job.encode) *> joex.notifyAllNodes } yield () def findIndexOnly(maxNoteLen: Int)( @@ -323,9 +323,7 @@ object OFulltext { def apply[A](implicit ev: ItemId[A]): ItemId[A] = ev def from[A](f: A => Ident): ItemId[A] = - new ItemId[A] { - def itemId(a: A) = f(a) - } + (a: A) => f(a) implicit val listItemId: ItemId[ListItem] = ItemId.from(_.id) diff --git a/modules/backend/src/main/scala/docspell/backend/ops/OItem.scala b/modules/backend/src/main/scala/docspell/backend/ops/OItem.scala index 41a48e7a..3a8b08ea 100644 --- a/modules/backend/src/main/scala/docspell/backend/ops/OItem.scala +++ b/modules/backend/src/main/scala/docspell/backend/ops/OItem.scala @@ -17,7 +17,7 @@ import docspell.common._ import docspell.ftsclient.FtsClient import docspell.logging.Logger import docspell.notification.api.Event -import docspell.scheduler.JobQueue +import docspell.scheduler.JobStore import docspell.store.queries.{QAttachment, QItem, QMoveAttachment} import docspell.store.records._ import docspell.store.{AddResult, Store, UpdateResult} @@ -226,7 +226,7 @@ object OItem { store: Store[F], fts: FtsClient[F], createIndex: CreateIndex[F], - queue: JobQueue[F], + jobStore: JobStore[F], joex: OJoex[F] ): Resource[F, OItem[F]] = for { @@ -286,7 +286,7 @@ object OItem { ) ev = Event.TagsChanged.partial( itemIds, - added.toList.flatten.map(_.id).toList, + added.toList.flatten.map(_.id), Nil ) } yield AttachedEvent(UpdateResult.success)(ev)) @@ -761,7 +761,7 @@ object OItem { job <- OptionT.liftF( JobFactory.reprocessItem[F](args, account, Priority.Low) ) - _ <- OptionT.liftF(queue.insertIfNew(job)) + _ <- OptionT.liftF(jobStore.insertIfNew(job.encode)) _ <- OptionT.liftF(if (notifyJoex) joex.notifyAllNodes else ().pure[F]) } yield UpdateResult.success).getOrElse(UpdateResult.notFound) @@ -775,7 +775,8 @@ object OItem { jobs <- items .map(item => ReProcessItemArgs(item, Nil)) .traverse(arg => JobFactory.reprocessItem[F](arg, account, Priority.Low)) - _ <- queue.insertAllIfNew(jobs) + .map(_.map(_.encode)) + _ <- jobStore.insertAllIfNew(jobs) _ <- if (notifyJoex) joex.notifyAllNodes else ().pure[F] } yield items.size) @@ -786,7 +787,7 @@ object OItem { ): F[UpdateResult] = for { job <- JobFactory.convertAllPdfs[F](collective, submitter, Priority.Low) - _ <- queue.insertIfNew(job) + _ <- jobStore.insertIfNew(job.encode) _ <- if (notifyJoex) joex.notifyAllNodes else ().pure[F] } yield UpdateResult.success @@ -797,7 +798,7 @@ object OItem { ): F[UpdateResult] = for { job <- JobFactory.makePreview[F](args, account.some) - _ <- queue.insertIfNew(job) + _ <- jobStore.insertIfNew(job.encode) _ <- if (notifyJoex) joex.notifyAllNodes else ().pure[F] } yield UpdateResult.success @@ -807,7 +808,7 @@ object OItem { ): F[UpdateResult] = for { job <- JobFactory.allPreviews[F](AllPreviewsArgs(None, storeMode), None) - _ <- queue.insertIfNew(job) + _ <- jobStore.insertIfNew(job.encode) _ <- if (notifyJoex) joex.notifyAllNodes else ().pure[F] } yield UpdateResult.success diff --git a/modules/backend/src/main/scala/docspell/backend/ops/OUpload.scala b/modules/backend/src/main/scala/docspell/backend/ops/OUpload.scala index 4e1e2dd4..674aa5a4 100644 --- a/modules/backend/src/main/scala/docspell/backend/ops/OUpload.scala +++ b/modules/backend/src/main/scala/docspell/backend/ops/OUpload.scala @@ -13,7 +13,7 @@ import cats.implicits._ import fs2.Stream import docspell.backend.JobFactory import docspell.common._ -import docspell.scheduler.JobQueue +import docspell.scheduler.{Job, JobStore} import docspell.store.Store import docspell.store.records._ @@ -107,7 +107,7 @@ object OUpload { def apply[F[_]: Sync]( store: Store[F], - queue: JobQueue[F], + jobStore: JobStore[F], joex: OJoex[F] ): Resource[F, OUpload[F]] = Resource.pure[F, OUpload[F]](new OUpload[F] { @@ -186,10 +186,10 @@ object OUpload { private def submitJobs( notifyJoex: Boolean - )(jobs: Vector[RJob]): F[OUpload.UploadResult] = + )(jobs: Vector[Job[String]]): F[OUpload.UploadResult] = for { _ <- logger.debug(s"Storing jobs: $jobs") - _ <- queue.insertAll(jobs) + _ <- jobStore.insertAll(jobs) _ <- if (notifyJoex) joex.notifyAllNodes else ().pure[F] } yield UploadResult.Success @@ -243,7 +243,9 @@ object OUpload { account: AccountId, prio: Priority, tracker: Option[Ident] - ): F[Vector[RJob]] = - JobFactory.processItems[F](args, account, prio, tracker) + ): F[Vector[Job[String]]] = + JobFactory + .processItems[F](args, account, prio, tracker) + .map(_.map(_.encode)) }) } diff --git a/modules/backend/src/main/scala/docspell/backend/ops/OUserTask.scala b/modules/backend/src/main/scala/docspell/backend/ops/OUserTask.scala index 87b333e4..f1171bd7 100644 --- a/modules/backend/src/main/scala/docspell/backend/ops/OUserTask.scala +++ b/modules/backend/src/main/scala/docspell/backend/ops/OUserTask.scala @@ -12,7 +12,6 @@ import cats.implicits._ import fs2.Stream import docspell.common._ import docspell.notification.api.{ChannelRef, PeriodicDueItemsArgs, PeriodicQueryArgs} -import docspell.scheduler.JobQueue import docspell.scheduler.usertask.{UserTask, UserTaskScope, UserTaskStore} import docspell.store.Store import docspell.store.records.RNotificationChannel @@ -84,7 +83,6 @@ object OUserTask { def apply[F[_]: Async]( taskStore: UserTaskStore[F], store: Store[F], - queue: JobQueue[F], joex: OJoex[F] ): Resource[F, OUserTask[F]] = Resource.pure[F, OUserTask[F]](new OUserTask[F] { @@ -93,10 +91,8 @@ object OUserTask { implicit E: Encoder[A] ): F[Unit] = for { - ptask <- task.encode.toPeriodicTask(scope, subject) - job <- ptask.toJob - _ <- queue.insert(job) - _ <- joex.notifyPeriodicTasks + _ <- taskStore.executeNow(scope, subject, task) + _ <- joex.notifyAllNodes } yield () def getScanMailbox(scope: UserTaskScope): Stream[F, UserTask[ScanMailboxArgs]] = diff --git a/modules/common/src/main/scala/docspell/common/syntax/StringSyntax.scala b/modules/common/src/main/scala/docspell/common/syntax/StringSyntax.scala index 40d5bf80..5e0038da 100644 --- a/modules/common/src/main/scala/docspell/common/syntax/StringSyntax.scala +++ b/modules/common/src/main/scala/docspell/common/syntax/StringSyntax.scala @@ -6,10 +6,8 @@ package docspell.common.syntax -import cats.implicits._ - import io.circe.Decoder -import io.circe.parser._ +import io.circe.parser trait StringSyntax { implicit class EvenMoreStringOps(s: String) { @@ -18,9 +16,8 @@ trait StringSyntax { def parseJsonAs[A](implicit d: Decoder[A]): Either[Throwable, A] = for { - json <- parse(s).leftMap(_.underlying) - value <- json.as[A] - } yield value + json <- parser.decode[A](s) + } yield json } } diff --git a/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala b/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala index fd357ebf..ec9d5682 100644 --- a/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala +++ b/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala @@ -31,16 +31,11 @@ import docspell.joex.process.ItemHandler import docspell.joex.process.ReProcessItem import docspell.joex.scanmailbox._ import docspell.scheduler._ -import docspell.scheduler.impl.{ - PeriodicSchedulerBuilder, - PeriodicTaskStore, - SchedulerBuilder -} +import docspell.scheduler.impl.{JobStoreModuleBuilder, SchedulerModuleBuilder} import docspell.joex.updatecheck._ import docspell.notification.api.NotificationModule import docspell.notification.impl.NotificationModuleImpl import docspell.pubsub.api.{PubSub, PubSubT} -import docspell.scheduler.msg.JobQueuePublish import docspell.scheduler.usertask.{UserTaskScope, UserTaskStore} import docspell.store.Store import docspell.store.records.{REmptyTrashSetting, RJobLog} @@ -50,8 +45,8 @@ import org.http4s.client.Client final class JoexAppImpl[F[_]: Async]( cfg: Config, store: Store[F], - queue: JobQueue[F], - pstore: PeriodicTaskStore[F], + uts: UserTaskStore[F], + jobStore: JobStore[F], termSignal: SignallingRef[F, Boolean], notificationMod: NotificationModule[F], val scheduler: Scheduler[F], @@ -82,32 +77,30 @@ final class JoexAppImpl[F[_]: Async]( private def scheduleBackgroundTasks: F[Unit] = HouseKeepingTask .periodicTask[F](cfg.houseKeeping.schedule) - .flatMap(pstore.insert) *> + .flatMap(t => uts.updateTask(UserTaskScope.system, t.summary, t)) *> scheduleEmptyTrashTasks *> UpdateCheckTask .periodicTask(cfg.updateCheck) - .flatMap(pstore.insert) *> - MigrationTask.job.flatMap(queue.insertIfNew) *> + .flatMap(t => uts.updateTask(UserTaskScope.system, t.summary, t)) *> + MigrationTask.job.flatMap(jobStore.insertIfNew) *> AllPreviewsTask .job(MakePreviewArgs.StoreMode.WhenMissing, None) - .flatMap(queue.insertIfNew) *> - AllPageCountTask.job.flatMap(queue.insertIfNew).as(()) + .flatMap(jobStore.insertIfNew) *> + AllPageCountTask.job.flatMap(jobStore.insertIfNew).void private def scheduleEmptyTrashTasks: F[Unit] = store .transact( REmptyTrashSetting.findForAllCollectives(OCollective.EmptyTrash.default, 50) ) - .evalMap(es => - UserTaskStore(store).use { uts => - val args = EmptyTrashArgs(es.cid, es.minAge) - uts.updateOneTask( - UserTaskScope(args.collective), - args.makeSubject.some, - EmptyTrashTask.userTask(args, es.schedule) - ) - } - ) + .evalMap { es => + val args = EmptyTrashArgs(es.cid, es.minAge) + uts.updateOneTask( + UserTaskScope(args.collective), + args.makeSubject.some, + EmptyTrashTask.userTask(args, es.schedule) + ) + } .compile .drain @@ -123,186 +116,186 @@ object JoexAppImpl extends MailAddressCodec { pubSub: PubSub[F] ): Resource[F, JoexApp[F]] = for { - pstore <- PeriodicTaskStore.create(store) - joexLogger = docspell.logging.getLogger[F](s"joex-${cfg.appId.id}") + joexLogger <- Resource.pure(docspell.logging.getLogger[F](s"joex-${cfg.appId.id}")) pubSubT = PubSubT(pubSub, joexLogger) javaEmil = JavaMailEmil(Settings.defaultSettings.copy(debug = cfg.mailDebug)) notificationMod <- Resource.eval( NotificationModuleImpl[F](store, javaEmil, httpClient, 200) ) - queue <- JobQueuePublish(store, pubSubT, notificationMod) + + jobStoreModule = JobStoreModuleBuilder(store) + .withPubsub(pubSubT) + .withEventSink(notificationMod) + .build + joex <- OJoex(pubSubT) - upload <- OUpload(store, queue, joex) + upload <- OUpload(store, jobStoreModule.jobs, joex) fts <- createFtsClient(cfg)(httpClient) createIndex <- CreateIndex.resource(fts, store) - itemOps <- OItem(store, fts, createIndex, queue, joex) + itemOps <- OItem(store, fts, createIndex, jobStoreModule.jobs, joex) itemSearchOps <- OItemSearch(store) analyser <- TextAnalyser.create[F](cfg.textAnalysis.textAnalysisConfig) regexNer <- RegexNerFile(cfg.textAnalysis.regexNerFileConfig, store) updateCheck <- UpdateCheck.resource(httpClient) notification <- ONotification(store, notificationMod) - fileRepo <- OFileRepository(store, queue, joex) - sch <- SchedulerBuilder(cfg.scheduler, store) - .withQueue(queue) - .withPubSub(pubSubT) - .withEventSink(notificationMod) - .withTask( - JobTask.json( - ProcessItemArgs.taskName, - ItemHandler.newItem[F](cfg, itemOps, fts, analyser, regexNer), - ItemHandler.onCancel[F] + fileRepo <- OFileRepository(store, jobStoreModule.jobs, joex) + + schedulerModule <- SchedulerModuleBuilder(jobStoreModule) + .withSchedulerConfig(cfg.scheduler) + .withPeriodicSchedulerConfig(cfg.periodicScheduler) + .withTaskRegistry(JobTaskRegistry + .empty[F] + .withTask( + JobTask.json( + ProcessItemArgs.taskName, + ItemHandler.newItem[F](cfg, itemOps, fts, analyser, regexNer), + ItemHandler.onCancel[F] + ) ) - ) - .withTask( - JobTask.json( - ReProcessItemArgs.taskName, - ReProcessItem[F](cfg, fts, itemOps, analyser, regexNer), - ReProcessItem.onCancel[F] + .withTask( + JobTask.json( + ReProcessItemArgs.taskName, + ReProcessItem[F](cfg, fts, itemOps, analyser, regexNer), + ReProcessItem.onCancel[F] + ) ) - ) - .withTask( - JobTask.json( - ScanMailboxArgs.taskName, - ScanMailboxTask[F](cfg.userTasks.scanMailbox, javaEmil, upload, joex), - ScanMailboxTask.onCancel[F] + .withTask( + JobTask.json( + ScanMailboxArgs.taskName, + ScanMailboxTask[F](cfg.userTasks.scanMailbox, javaEmil, upload, joex), + ScanMailboxTask.onCancel[F] + ) ) - ) - .withTask( - JobTask.json( - MigrationTask.taskName, - MigrationTask[F](cfg.fullTextSearch, fts, createIndex), - MigrationTask.onCancel[F] + .withTask( + JobTask.json( + MigrationTask.taskName, + MigrationTask[F](cfg.fullTextSearch, fts, createIndex), + MigrationTask.onCancel[F] + ) ) - ) - .withTask( - JobTask.json( - ReIndexTask.taskName, - ReIndexTask[F](cfg.fullTextSearch, fts, createIndex), - ReIndexTask.onCancel[F] + .withTask( + JobTask.json( + ReIndexTask.taskName, + ReIndexTask[F](cfg.fullTextSearch, fts, createIndex), + ReIndexTask.onCancel[F] + ) ) - ) - .withTask( - JobTask.json( - HouseKeepingTask.taskName, - HouseKeepingTask[F](cfg, fileRepo), - HouseKeepingTask.onCancel[F] + .withTask( + JobTask.json( + HouseKeepingTask.taskName, + HouseKeepingTask[F](cfg, fileRepo), + HouseKeepingTask.onCancel[F] + ) ) - ) - .withTask( - JobTask.json( - PdfConvTask.taskName, - PdfConvTask[F](cfg), - PdfConvTask.onCancel[F] + .withTask( + JobTask.json( + PdfConvTask.taskName, + PdfConvTask[F](cfg), + PdfConvTask.onCancel[F] + ) ) - ) - .withTask( - JobTask.json( - ConvertAllPdfArgs.taskName, - ConvertAllPdfTask[F](queue, joex), - ConvertAllPdfTask.onCancel[F] + .withTask( + JobTask.json( + ConvertAllPdfArgs.taskName, + ConvertAllPdfTask[F](jobStoreModule.jobs, joex), + ConvertAllPdfTask.onCancel[F] + ) ) - ) - .withTask( - JobTask.json( - LearnClassifierArgs.taskName, - LearnClassifierTask[F](cfg.textAnalysis, analyser), - LearnClassifierTask.onCancel[F] + .withTask( + JobTask.json( + LearnClassifierArgs.taskName, + LearnClassifierTask[F](cfg.textAnalysis, analyser), + LearnClassifierTask.onCancel[F] + ) ) - ) - .withTask( - JobTask.json( - MakePreviewArgs.taskName, - MakePreviewTask[F](cfg.extraction.preview), - MakePreviewTask.onCancel[F] + .withTask( + JobTask.json( + MakePreviewArgs.taskName, + MakePreviewTask[F](cfg.extraction.preview), + MakePreviewTask.onCancel[F] + ) ) - ) - .withTask( - JobTask.json( - AllPreviewsArgs.taskName, - AllPreviewsTask[F](queue, joex), - AllPreviewsTask.onCancel[F] + .withTask( + JobTask.json( + AllPreviewsArgs.taskName, + AllPreviewsTask[F](jobStoreModule.jobs, joex), + AllPreviewsTask.onCancel[F] + ) ) - ) - .withTask( - JobTask.json( - MakePageCountArgs.taskName, - MakePageCountTask[F](), - MakePageCountTask.onCancel[F] + .withTask( + JobTask.json( + MakePageCountArgs.taskName, + MakePageCountTask[F](), + MakePageCountTask.onCancel[F] + ) ) - ) - .withTask( - JobTask.json( - AllPageCountTask.taskName, - AllPageCountTask[F](queue, joex), - AllPageCountTask.onCancel[F] + .withTask( + JobTask.json( + AllPageCountTask.taskName, + AllPageCountTask[F](jobStoreModule.jobs, joex), + AllPageCountTask.onCancel[F] + ) ) - ) - .withTask( - JobTask.json( - EmptyTrashArgs.taskName, - EmptyTrashTask[F](itemOps, itemSearchOps), - EmptyTrashTask.onCancel[F] + .withTask( + JobTask.json( + EmptyTrashArgs.taskName, + EmptyTrashTask[F](itemOps, itemSearchOps), + EmptyTrashTask.onCancel[F] + ) ) - ) - .withTask( - JobTask.json( - UpdateCheckTask.taskName, - UpdateCheckTask[F]( - cfg.updateCheck, - cfg.sendMail, - javaEmil, - updateCheck, - ThisVersion.default - ), - UpdateCheckTask.onCancel[F] + .withTask( + JobTask.json( + UpdateCheckTask.taskName, + UpdateCheckTask[F]( + cfg.updateCheck, + cfg.sendMail, + javaEmil, + updateCheck, + ThisVersion.default + ), + UpdateCheckTask.onCancel[F] + ) ) - ) - .withTask( - JobTask.json( - PeriodicQueryTask.taskName, - PeriodicQueryTask[F](notification), - PeriodicQueryTask.onCancel[F] + .withTask( + JobTask.json( + PeriodicQueryTask.taskName, + PeriodicQueryTask[F](notification), + PeriodicQueryTask.onCancel[F] + ) ) - ) - .withTask( - JobTask.json( - PeriodicDueItemsTask.taskName, - PeriodicDueItemsTask[F](notification), - PeriodicDueItemsTask.onCancel[F] + .withTask( + JobTask.json( + PeriodicDueItemsTask.taskName, + PeriodicDueItemsTask[F](notification), + PeriodicDueItemsTask.onCancel[F] + ) ) - ) - .withTask( - JobTask.json( - FileCopyTaskArgs.taskName, - FileCopyTask[F](cfg), - FileCopyTask.onCancel[F] + .withTask( + JobTask.json( + FileCopyTaskArgs.taskName, + FileCopyTask[F](cfg), + FileCopyTask.onCancel[F] + ) ) - ) - .withTask( - JobTask.json( - FileIntegrityCheckArgs.taskName, - FileIntegrityCheckTask[F](fileRepo), - FileIntegrityCheckTask.onCancel[F] + .withTask( + JobTask.json( + FileIntegrityCheckArgs.taskName, + FileIntegrityCheckTask[F](fileRepo), + FileIntegrityCheckTask.onCancel[F] + ) ) ) .resource - psch <- PeriodicSchedulerBuilder.build( - cfg.periodicScheduler, - sch, - queue, - pstore, - pubSubT - ) app = new JoexAppImpl( cfg, store, - queue, - pstore, + jobStoreModule.userTasks, + jobStoreModule.jobs, termSignal, notificationMod, - sch, - psch + schedulerModule.scheduler, + schedulerModule.periodicScheduler ) appR <- Resource.make(app.init.map(_ => app))(_.initShutdown) } yield appR @@ -312,4 +305,5 @@ object JoexAppImpl extends MailAddressCodec { )(client: Client[F]): Resource[F, FtsClient[F]] = if (cfg.fullTextSearch.enabled) SolrFtsClient(cfg.fullTextSearch.solr, client) else Resource.pure[F, FtsClient[F]](FtsClient.none[F]) + } diff --git a/modules/joex/src/main/scala/docspell/joex/fts/MigrationTask.scala b/modules/joex/src/main/scala/docspell/joex/fts/MigrationTask.scala index 707b457b..bd0e6fb9 100644 --- a/modules/joex/src/main/scala/docspell/joex/fts/MigrationTask.scala +++ b/modules/joex/src/main/scala/docspell/joex/fts/MigrationTask.scala @@ -8,13 +8,11 @@ package docspell.joex.fts import cats.effect._ import cats.implicits._ - import docspell.backend.fulltext.CreateIndex import docspell.common._ import docspell.ftsclient._ import docspell.joex.Config -import docspell.scheduler.Task -import docspell.store.records.RJob +import docspell.scheduler.{Job, Task} object MigrationTask { val taskName = Ident.unsafe("full-text-index") @@ -38,21 +36,18 @@ object MigrationTask { def onCancel[F[_]]: Task[F, Unit, Unit] = Task.log[F, Unit](_.warn("Cancelling full-text-index task")) - def job[F[_]: Sync]: F[RJob] = - for { - id <- Ident.randomId[F] - now <- Timestamp.current[F] - } yield RJob.newJob( - id, - taskName, - DocspellSystem.taskGroup, - (), - "Create full-text index", - now, - DocspellSystem.taskGroup, - Priority.Low, - Some(DocspellSystem.migrationTaskTracker) - ) + def job[F[_]: Sync]: F[Job[String]] = + Job + .createNew( + taskName, + DocspellSystem.taskGroup, + (), + "Create full-text index", + DocspellSystem.taskGroup, + Priority.Low, + Some(DocspellSystem.migrationTaskTracker) + ) + .map(_.encode) def migrationTasks[F[_]: Async](fts: FtsClient[F]): F[List[Migration[F]]] = fts.initialize.map(_.map(fm => Migration.from(fm))) diff --git a/modules/joex/src/main/scala/docspell/joex/hk/HouseKeepingTask.scala b/modules/joex/src/main/scala/docspell/joex/hk/HouseKeepingTask.scala index 8515eae6..ff025d31 100644 --- a/modules/joex/src/main/scala/docspell/joex/hk/HouseKeepingTask.scala +++ b/modules/joex/src/main/scala/docspell/joex/hk/HouseKeepingTask.scala @@ -13,9 +13,8 @@ import docspell.common._ import docspell.joex.Config import docspell.joex.filecopy.FileIntegrityCheckTask import docspell.scheduler.{JobTaskResultEncoder, Task} -import docspell.store.records._ import com.github.eikek.calev._ -import docspell.scheduler.usertask.{QUserTask, UserTaskScope} +import docspell.scheduler.usertask.UserTask import io.circe.Encoder import io.circe.generic.semiauto.deriveEncoder @@ -45,19 +44,15 @@ object HouseKeepingTask { def onCancel[F[_]]: Task[F, Unit, Unit] = Task.log[F, Unit](_.warn("Cancelling house-keeping task")) - def periodicTask[F[_]: Sync](ce: CalEvent): F[RPeriodicTask] = - QUserTask - .createJson( - true, - UserTaskScope(DocspellSystem.taskGroup), - taskName, - (), - "Docspell house-keeping", - Priority.Low, - ce, - None - ) - .map(_.copy(id = periodicId)) + def periodicTask[F[_]: Sync](ce: CalEvent): F[UserTask[Unit]] = + UserTask( + periodicId, + taskName, + true, + ce, + "Docspell house-keeping".some, + () + ).pure[F] case class Result( checkNodes: CleanupResult, diff --git a/modules/joex/src/main/scala/docspell/joex/pagecount/AllPageCountTask.scala b/modules/joex/src/main/scala/docspell/joex/pagecount/AllPageCountTask.scala index 0fb9e299..5b8448d9 100644 --- a/modules/joex/src/main/scala/docspell/joex/pagecount/AllPageCountTask.scala +++ b/modules/joex/src/main/scala/docspell/joex/pagecount/AllPageCountTask.scala @@ -12,20 +12,19 @@ import fs2.{Chunk, Stream} import docspell.backend.JobFactory import docspell.backend.ops.OJoex import docspell.common._ -import docspell.scheduler.{Context, JobQueue, Task} +import docspell.scheduler.{Context, Job, JobStore, Task} import docspell.store.records.RAttachment -import docspell.store.records.RJob object AllPageCountTask { val taskName = Ident.unsafe("all-page-count") type Args = Unit - def apply[F[_]: Sync](queue: JobQueue[F], joex: OJoex[F]): Task[F, Args, Unit] = + def apply[F[_]: Sync](jobStore: JobStore[F], joex: OJoex[F]): Task[F, Args, Unit] = Task { ctx => for { _ <- ctx.logger.info("Generating previews for attachments") - n <- submitConversionJobs(ctx, queue) + n <- submitConversionJobs(ctx, jobStore) _ <- ctx.logger.info(s"Submitted $n jobs") _ <- joex.notifyAllNodes } yield () @@ -36,14 +35,14 @@ object AllPageCountTask { def submitConversionJobs[F[_]: Sync]( ctx: Context[F, Args], - queue: JobQueue[F] + jobStore: JobStore[F] ): F[Int] = ctx.store .transact(findAttachments) .chunks .flatMap(createJobs[F]) .chunks - .evalMap(jobs => queue.insertAllIfNew(jobs.toVector).map(_ => jobs.size)) + .evalMap(jobs => jobStore.insertAllIfNew(jobs.toVector).map(_ => jobs.size)) .evalTap(n => ctx.logger.debug(s"Submitted $n jobs …")) .compile .foldMonoid @@ -51,28 +50,25 @@ object AllPageCountTask { private def findAttachments[F[_]] = RAttachment.findAllWithoutPageCount(50) - private def createJobs[F[_]: Sync](ras: Chunk[RAttachment]): Stream[F, RJob] = { - def mkJob(ra: RAttachment): F[RJob] = + private def createJobs[F[_]: Sync](ras: Chunk[RAttachment]): Stream[F, Job[String]] = { + def mkJob(ra: RAttachment): F[Job[MakePageCountArgs]] = JobFactory.makePageCount(MakePageCountArgs(ra.id), None) val jobs = ras.traverse(mkJob) - Stream.evalUnChunk(jobs) + Stream.evalUnChunk(jobs).map(_.encode) } - def job[F[_]: Sync]: F[RJob] = - for { - id <- Ident.randomId[F] - now <- Timestamp.current[F] - } yield RJob.newJob( - id, - AllPageCountTask.taskName, - DocspellSystem.taskGroup, - (), - "Create all page-counts", - now, - DocspellSystem.taskGroup, - Priority.Low, - Some(DocspellSystem.allPageCountTaskTracker) - ) + def job[F[_]: Sync]: F[Job[String]] = + Job + .createNew( + AllPageCountTask.taskName, + DocspellSystem.taskGroup, + (), + "Create all page-counts", + DocspellSystem.taskGroup, + Priority.Low, + Some(DocspellSystem.allPageCountTaskTracker) + ) + .map(_.encode) } diff --git a/modules/joex/src/main/scala/docspell/joex/pdfconv/ConvertAllPdfTask.scala b/modules/joex/src/main/scala/docspell/joex/pdfconv/ConvertAllPdfTask.scala index a0865156..f1b30c5a 100644 --- a/modules/joex/src/main/scala/docspell/joex/pdfconv/ConvertAllPdfTask.scala +++ b/modules/joex/src/main/scala/docspell/joex/pdfconv/ConvertAllPdfTask.scala @@ -11,9 +11,8 @@ import cats.implicits._ import fs2.{Chunk, Stream} import docspell.backend.ops.OJoex import docspell.common._ -import docspell.scheduler.{Context, JobQueue, Task} +import docspell.scheduler.{Context, Job, JobStore, Task} import docspell.store.records.RAttachment -import docspell.store.records._ /* A task to find all non-converted pdf files (of a collective, or * all) and converting them using ocrmypdf by submitting a job for @@ -22,11 +21,11 @@ import docspell.store.records._ object ConvertAllPdfTask { type Args = ConvertAllPdfArgs - def apply[F[_]: Sync](queue: JobQueue[F], joex: OJoex[F]): Task[F, Args, Unit] = + def apply[F[_]: Sync](jobStore: JobStore[F], joex: OJoex[F]): Task[F, Args, Unit] = Task { ctx => for { _ <- ctx.logger.info("Converting pdfs using ocrmypdf") - n <- submitConversionJobs(ctx, queue) + n <- submitConversionJobs(ctx, jobStore) _ <- ctx.logger.info(s"Submitted $n file conversion jobs") _ <- joex.notifyAllNodes } yield () @@ -37,40 +36,35 @@ object ConvertAllPdfTask { def submitConversionJobs[F[_]: Sync]( ctx: Context[F, Args], - queue: JobQueue[F] + jobStore: JobStore[F] ): F[Int] = ctx.store .transact(RAttachment.findNonConvertedPdf(ctx.args.collective, 50)) .chunks .flatMap(createJobs[F](ctx)) .chunks - .evalMap(jobs => queue.insertAllIfNew(jobs.toVector).map(_ => jobs.size)) + .evalMap(jobs => jobStore.insertAllIfNew(jobs.toVector).map(_ => jobs.size)) .evalTap(n => ctx.logger.debug(s"Submitted $n jobs …")) .compile .foldMonoid private def createJobs[F[_]: Sync]( ctx: Context[F, Args] - )(ras: Chunk[RAttachment]): Stream[F, RJob] = { + )(ras: Chunk[RAttachment]): Stream[F, Job[String]] = { val collectiveOrSystem = ctx.args.collective.getOrElse(DocspellSystem.taskGroup) - def mkJob(ra: RAttachment): F[RJob] = - for { - id <- Ident.randomId[F] - now <- Timestamp.current[F] - } yield RJob.newJob( - id, + def mkJob(ra: RAttachment): F[Job[PdfConvTask.Args]] = + Job.createNew( PdfConvTask.taskName, collectiveOrSystem, PdfConvTask.Args(ra.id), s"Convert pdf ${ra.id.id}/${ra.name.getOrElse("-")}", - now, collectiveOrSystem, Priority.Low, Some(PdfConvTask.taskName / ra.id) ) val jobs = ras.traverse(mkJob) - Stream.evalUnChunk(jobs) + Stream.evalUnChunk(jobs).map(_.encode) } } diff --git a/modules/joex/src/main/scala/docspell/joex/preview/AllPreviewsTask.scala b/modules/joex/src/main/scala/docspell/joex/preview/AllPreviewsTask.scala index 849b4f7a..100ace8f 100644 --- a/modules/joex/src/main/scala/docspell/joex/preview/AllPreviewsTask.scala +++ b/modules/joex/src/main/scala/docspell/joex/preview/AllPreviewsTask.scala @@ -13,19 +13,18 @@ import docspell.backend.JobFactory import docspell.backend.ops.OJoex import docspell.common.MakePreviewArgs.StoreMode import docspell.common._ -import docspell.scheduler.{Context, JobQueue, Task} +import docspell.scheduler.{Context, Job, JobStore, Task} import docspell.store.records.RAttachment -import docspell.store.records.RJob object AllPreviewsTask { type Args = AllPreviewsArgs - def apply[F[_]: Sync](queue: JobQueue[F], joex: OJoex[F]): Task[F, Args, Unit] = + def apply[F[_]: Sync](jobStore: JobStore[F], joex: OJoex[F]): Task[F, Args, Unit] = Task { ctx => for { _ <- ctx.logger.info("Generating previews for attachments") - n <- submitConversionJobs(ctx, queue) + n <- submitConversionJobs(ctx, jobStore) _ <- ctx.logger.info(s"Submitted $n jobs") _ <- joex.notifyAllNodes } yield () @@ -36,14 +35,16 @@ object AllPreviewsTask { def submitConversionJobs[F[_]: Sync]( ctx: Context[F, Args], - queue: JobQueue[F] + jobStore: JobStore[F] ): F[Int] = ctx.store .transact(findAttachments(ctx)) .chunks .flatMap(createJobs[F](ctx)) .chunks - .evalMap(jobs => queue.insertAllIfNew(jobs.toVector).map(_ => jobs.size)) + .evalMap(jobs => + jobStore.insertAllIfNew(jobs.map(_.encode).toVector).map(_ => jobs.size) + ) .evalTap(n => ctx.logger.debug(s"Submitted $n jobs …")) .compile .foldMonoid @@ -58,13 +59,13 @@ object AllPreviewsTask { private def createJobs[F[_]: Sync]( ctx: Context[F, Args] - )(ras: Chunk[RAttachment]): Stream[F, RJob] = { + )(ras: Chunk[RAttachment]): Stream[F, Job[MakePreviewArgs]] = { val collectiveOrSystem = { val cid = ctx.args.collective.getOrElse(DocspellSystem.taskGroup) AccountId(cid, DocspellSystem.user) } - def mkJob(ra: RAttachment): F[RJob] = + def mkJob(ra: RAttachment): F[Job[MakePreviewArgs]] = JobFactory.makePreview( MakePreviewArgs(ra.id, ctx.args.storeMode), collectiveOrSystem.some @@ -74,7 +75,10 @@ object AllPreviewsTask { Stream.evalUnChunk(jobs) } - def job[F[_]: Sync](storeMode: MakePreviewArgs.StoreMode, cid: Option[Ident]): F[RJob] = - JobFactory.allPreviews(AllPreviewsArgs(cid, storeMode), None) + def job[F[_]: Sync]( + storeMode: MakePreviewArgs.StoreMode, + cid: Option[Ident] + ): F[Job[String]] = + JobFactory.allPreviews(AllPreviewsArgs(cid, storeMode), None).map(_.encode) } diff --git a/modules/joex/src/main/scala/docspell/joex/process/ItemHandler.scala b/modules/joex/src/main/scala/docspell/joex/process/ItemHandler.scala index 21627f56..b3a5ae7b 100644 --- a/modules/joex/src/main/scala/docspell/joex/process/ItemHandler.scala +++ b/modules/joex/src/main/scala/docspell/joex/process/ItemHandler.scala @@ -65,7 +65,7 @@ object ItemHandler { .map(_ => data) ) - def isLastRetry[F[_]: Sync]: Task[F, Args, Boolean] = + def isLastRetry[F[_]]: Task[F, Args, Boolean] = Task(_.isLastRetry) def safeProcess[F[_]: Async]( diff --git a/modules/joex/src/main/scala/docspell/joex/process/ReProcessItem.scala b/modules/joex/src/main/scala/docspell/joex/process/ReProcessItem.scala index 834832a5..97091b75 100644 --- a/modules/joex/src/main/scala/docspell/joex/process/ReProcessItem.scala +++ b/modules/joex/src/main/scala/docspell/joex/process/ReProcessItem.scala @@ -141,7 +141,7 @@ object ReProcessItem { lang1.orElse(lang2).getOrElse(Language.German) } - def isLastRetry[F[_]: Sync]: Task[F, Args, Boolean] = + def isLastRetry[F[_]]: Task[F, Args, Boolean] = Task(_.isLastRetry) def safeProcess[F[_]: Async]( diff --git a/modules/joex/src/main/scala/docspell/joex/updatecheck/UpdateCheckTask.scala b/modules/joex/src/main/scala/docspell/joex/updatecheck/UpdateCheckTask.scala index 94db11b5..6da9e345 100644 --- a/modules/joex/src/main/scala/docspell/joex/updatecheck/UpdateCheckTask.scala +++ b/modules/joex/src/main/scala/docspell/joex/updatecheck/UpdateCheckTask.scala @@ -12,8 +12,7 @@ import cats.implicits._ import docspell.common._ import docspell.scheduler.Context import docspell.scheduler.Task -import docspell.scheduler.usertask.{UserTask, UserTaskScope} -import docspell.store.records.RPeriodicTask +import docspell.scheduler.usertask.UserTask import docspell.store.records.RUserEmail import emil._ @@ -25,18 +24,15 @@ object UpdateCheckTask { def onCancel[F[_]]: Task[F, Args, Unit] = Task.log(_.warn("Cancelling update-check task")) - def periodicTask[F[_]: Sync](cfg: UpdateCheckConfig): F[RPeriodicTask] = + def periodicTask[F[_]: Sync](cfg: UpdateCheckConfig): F[UserTask[Unit]] = UserTask( Ident.unsafe("docspell-update-check"), taskName, cfg.enabled, cfg.schedule, - None, + "Docspell Update Check".some, () - ).encode.toPeriodicTask( - UserTaskScope(cfg.senderAccount.collective), - "Docspell Update Check".some - ) + ).pure[F] def apply[F[_]: Async]( cfg: UpdateCheckConfig, diff --git a/modules/restserver/src/main/scala/docspell/restserver/RestAppImpl.scala b/modules/restserver/src/main/scala/docspell/restserver/RestAppImpl.scala index 7ed9d2dc..0ff2957b 100644 --- a/modules/restserver/src/main/scala/docspell/restserver/RestAppImpl.scala +++ b/modules/restserver/src/main/scala/docspell/restserver/RestAppImpl.scala @@ -9,7 +9,6 @@ package docspell.restserver import cats.effect._ import fs2.Stream import fs2.concurrent.Topic - import docspell.backend.BackendApp import docspell.backend.auth.{AuthToken, ShareToken} import docspell.ftsclient.FtsClient @@ -23,8 +22,8 @@ import docspell.restserver.http4s.EnvMiddleware import docspell.restserver.routes._ import docspell.restserver.webapp.{TemplateRoutes, Templates, WebjarRoutes} import docspell.restserver.ws.{OutputEvent, WebSocketRoutes} +import docspell.scheduler.impl.JobStoreModuleBuilder import docspell.store.Store - import emil.javamail.JavaMailEmil import org.http4s.HttpRoutes import org.http4s.client.Client @@ -167,8 +166,12 @@ object RestAppImpl { notificationMod <- Resource.eval( NotificationModuleImpl[F](store, javaEmil, httpClient, 200) ) + schedulerMod = JobStoreModuleBuilder(store) + .withPubsub(pubSubT) + .withEventSink(notificationMod) + .build backend <- BackendApp - .create[F](store, javaEmil, ftsClient, pubSubT, notificationMod) + .create[F](store, javaEmil, ftsClient, pubSubT, schedulerMod, notificationMod) app = new RestAppImpl[F]( cfg, diff --git a/modules/scheduler/api/src/main/scala/docspell/scheduler/Context.scala b/modules/scheduler/api/src/main/scala/docspell/scheduler/Context.scala index d5380f94..abefafa5 100644 --- a/modules/scheduler/api/src/main/scala/docspell/scheduler/Context.scala +++ b/modules/scheduler/api/src/main/scala/docspell/scheduler/Context.scala @@ -6,14 +6,9 @@ package docspell.scheduler -import cats.effect._ -import cats.implicits._ -import cats.{Applicative, Functor} - import docspell.common._ import docspell.logging.Logger import docspell.store.Store -import docspell.store.records.RJob trait Context[F[_], A] { self => @@ -29,54 +24,8 @@ trait Context[F[_], A] { self => def store: Store[F] - final def isLastRetry(implicit ev: Applicative[F]): F[Boolean] = - for { - current <- store.transact(RJob.getRetries(jobId)) - last = config.retries == current.getOrElse(0) - } yield last + def isLastRetry: F[Boolean] + + def map[C](f: A => C): Context[F, C] - def map[C](f: A => C)(implicit F: Functor[F]): Context[F, C] = - new Context.ContextImpl[F, C](f(args), logger, store, config, jobId) -} - -object Context { - - def create[F[_]: Async, A]( - jobId: Ident, - arg: A, - config: SchedulerConfig, - log: Logger[F], - store: Store[F] - ): Context[F, A] = - new ContextImpl(arg, log, store, config, jobId) - - def apply[F[_]: Async, A]( - job: RJob, - arg: A, - config: SchedulerConfig, - logSink: LogSink[F], - store: Store[F] - ): F[Context[F, A]] = { - val log = docspell.logging.getLogger[F] - for { - _ <- log.trace("Creating logger for task run") - logger <- QueueLogger(job.id, job.info, config.logBufferSize, logSink) - _ <- log.trace("Logger created, instantiating context") - ctx = create[F, A](job.id, arg, config, logger, store) - } yield ctx - } - - final private class ContextImpl[F[_]: Functor, A]( - val args: A, - val logger: Logger[F], - val store: Store[F], - val config: SchedulerConfig, - val jobId: Ident - ) extends Context[F, A] { - - def setProgress(percent: Int): F[Unit] = { - val pval = math.min(100, math.max(0, percent)) - store.transact(RJob.setProgress(jobId, pval)).map(_ => ()) - } - } } diff --git a/modules/scheduler/api/src/main/scala/docspell/scheduler/Job.scala b/modules/scheduler/api/src/main/scala/docspell/scheduler/Job.scala new file mode 100644 index 00000000..9dc08b5c --- /dev/null +++ b/modules/scheduler/api/src/main/scala/docspell/scheduler/Job.scala @@ -0,0 +1,36 @@ +package docspell.scheduler + +import cats.effect.Sync +import cats.syntax.functor._ +import docspell.common._ +import io.circe.Encoder + +final case class Job[A]( + id: Ident, + task: Ident, + group: Ident, + args: A, + subject: String, + submitter: Ident, + priority: Priority, + tracker: Option[Ident] +) { + + def encode(implicit E: Encoder[A]): Job[String] = + Job(id, task, group, E.apply(args).noSpaces, subject, submitter, priority, tracker) +} + +object Job { + def createNew[F[_]: Sync, A]( + task: Ident, + group: Ident, + args: A, + subject: String, + submitter: Ident, + priority: Priority, + tracker: Option[Ident] + ): F[Job[A]] = + Ident.randomId[F].map { id => + Job(id, task, group, args, subject, submitter, priority, tracker) + } +} diff --git a/modules/scheduler/api/src/main/scala/docspell/scheduler/JobQueue.scala b/modules/scheduler/api/src/main/scala/docspell/scheduler/JobQueue.scala deleted file mode 100644 index dd927bf9..00000000 --- a/modules/scheduler/api/src/main/scala/docspell/scheduler/JobQueue.scala +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Copyright 2020 Eike K. & Contributors - * - * SPDX-License-Identifier: AGPL-3.0-or-later - */ - -package docspell.scheduler - -import cats.effect._ -import cats.implicits._ -import docspell.common._ -import docspell.store.Store -import docspell.store.queries.QJob -import docspell.store.records.RJob - -trait JobQueue[F[_]] { - - /** Inserts the job into the queue to get picked up as soon as possible. The job must - * have a new unique id. - */ - def insert(job: RJob): F[Unit] - - /** Inserts the job into the queue only, if there is no job with the same tracker-id - * running at the moment. The job id must be a new unique id. - * - * If the job has no tracker defined, it is simply inserted. - */ - def insertIfNew(job: RJob): F[Boolean] - - def insertAll(jobs: Seq[RJob]): F[List[Boolean]] - - def insertAllIfNew(jobs: Seq[RJob]): F[List[Boolean]] - - def nextJob( - prio: Ident => F[Priority], - worker: Ident, - retryPause: Duration - ): F[Option[RJob]] -} - -object JobQueue { - private[scheduler] def create[F[_]: Async](store: Store[F]): Resource[F, JobQueue[F]] = - Resource.pure[F, JobQueue[F]](new JobQueue[F] { - private[this] val logger = docspell.logging.getLogger[F] - - def nextJob( - prio: Ident => F[Priority], - worker: Ident, - retryPause: Duration - ): F[Option[RJob]] = - logger - .trace("Select next job") *> QJob.takeNextJob(store)(prio, worker, retryPause) - - def insert(job: RJob): F[Unit] = - store - .transact(RJob.insert(job)) - .flatMap { n => - if (n != 1) - Async[F] - .raiseError(new Exception(s"Inserting job failed. Update count: $n")) - else ().pure[F] - } - - def insertIfNew(job: RJob): F[Boolean] = - for { - rj <- job.tracker match { - case Some(tid) => - store.transact(RJob.findNonFinalByTracker(tid)) - case None => - None.pure[F] - } - ret <- - if (rj.isDefined) false.pure[F] - else insert(job).as(true) - } yield ret - - def insertAll(jobs: Seq[RJob]): F[List[Boolean]] = - jobs.toList - .traverse(j => insert(j).attempt) - .flatMap(_.traverse { - case Right(()) => true.pure[F] - case Left(ex) => - logger.error(ex)("Could not insert job. Skipping it.").as(false) - - }) - - def insertAllIfNew(jobs: Seq[RJob]): F[List[Boolean]] = - jobs.toList - .traverse(j => insertIfNew(j).attempt) - .flatMap(_.traverse { - case Right(true) => true.pure[F] - case Right(false) => false.pure[F] - case Left(ex) => - logger.error(ex)("Could not insert job. Skipping it.").as(false) - }) - }) -} diff --git a/modules/scheduler/api/src/main/scala/docspell/scheduler/JobStore.scala b/modules/scheduler/api/src/main/scala/docspell/scheduler/JobStore.scala new file mode 100644 index 00000000..8ca887f9 --- /dev/null +++ b/modules/scheduler/api/src/main/scala/docspell/scheduler/JobStore.scala @@ -0,0 +1,21 @@ +package docspell.scheduler + +trait JobStore[F[_]] { + + /** Inserts the job into the queue to get picked up as soon as possible. The job must + * have a new unique id. + */ + def insert(job: Job[String]): F[Unit] + + /** Inserts the job into the queue only, if there is no job with the same tracker-id + * running at the moment. The job id must be a new unique id. + * + * If the job has no tracker defined, it is simply inserted. + */ + def insertIfNew(job: Job[String]): F[Boolean] + + def insertAll(jobs: Seq[Job[String]]): F[List[Boolean]] + + def insertAllIfNew(jobs: Seq[Job[String]]): F[List[Boolean]] + +} diff --git a/modules/scheduler/api/src/main/scala/docspell/scheduler/JobStoreModule.scala b/modules/scheduler/api/src/main/scala/docspell/scheduler/JobStoreModule.scala new file mode 100644 index 00000000..2d170127 --- /dev/null +++ b/modules/scheduler/api/src/main/scala/docspell/scheduler/JobStoreModule.scala @@ -0,0 +1,9 @@ +package docspell.scheduler + +import docspell.scheduler.usertask.UserTaskStore + +trait JobStoreModule[F[_]] { + + def userTasks: UserTaskStore[F] + def jobs: JobStore[F] +} diff --git a/modules/scheduler/api/src/main/scala/docspell/scheduler/JobTask.scala b/modules/scheduler/api/src/main/scala/docspell/scheduler/JobTask.scala index e4c5074f..a7198378 100644 --- a/modules/scheduler/api/src/main/scala/docspell/scheduler/JobTask.scala +++ b/modules/scheduler/api/src/main/scala/docspell/scheduler/JobTask.scala @@ -43,7 +43,7 @@ object JobTask { str.parseJsonAs[A] match { case Right(a) => a.pure[F] case Left(ex) => - Sync[F].raiseError(new Exception(s"Cannot parse task arguments: $str", ex)) + Sync[F].raiseError(new Exception(s"Cannot parse task arguments: '$str'", ex)) } JobTask(name, task.contramap(convert).map(E.encode), onCancel.contramap(convert)) diff --git a/modules/scheduler/api/src/main/scala/docspell/scheduler/PeriodicSchedulerConfig.scala b/modules/scheduler/api/src/main/scala/docspell/scheduler/PeriodicSchedulerConfig.scala index 7445e9e3..6fa82069 100644 --- a/modules/scheduler/api/src/main/scala/docspell/scheduler/PeriodicSchedulerConfig.scala +++ b/modules/scheduler/api/src/main/scala/docspell/scheduler/PeriodicSchedulerConfig.scala @@ -12,3 +12,8 @@ case class PeriodicSchedulerConfig( name: Ident, wakeupPeriod: Duration ) + +object PeriodicSchedulerConfig { + def default(id: Ident): PeriodicSchedulerConfig = + PeriodicSchedulerConfig(id, Duration.minutes(10)) +} diff --git a/modules/scheduler/api/src/main/scala/docspell/scheduler/SchedulerConfig.scala b/modules/scheduler/api/src/main/scala/docspell/scheduler/SchedulerConfig.scala index fbbc4f3c..25fad892 100644 --- a/modules/scheduler/api/src/main/scala/docspell/scheduler/SchedulerConfig.scala +++ b/modules/scheduler/api/src/main/scala/docspell/scheduler/SchedulerConfig.scala @@ -20,11 +20,10 @@ case class SchedulerConfig( object SchedulerConfig { - val default = SchedulerConfig( - name = Ident.unsafe("default-scheduler"), - poolSize = 2 // math.max(2, Runtime.getRuntime.availableProcessors / 2) - , - countingScheme = CountingScheme(2, 1), + def default(id: Ident) = SchedulerConfig( + name = id, + poolSize = 1, + countingScheme = CountingScheme(3, 1), retries = 5, retryDelay = Duration.seconds(30), logBufferSize = 500, diff --git a/modules/scheduler/api/src/main/scala/docspell/scheduler/SchedulerModule.scala b/modules/scheduler/api/src/main/scala/docspell/scheduler/SchedulerModule.scala new file mode 100644 index 00000000..d422f200 --- /dev/null +++ b/modules/scheduler/api/src/main/scala/docspell/scheduler/SchedulerModule.scala @@ -0,0 +1,6 @@ +package docspell.scheduler + +trait SchedulerModule[F[_]] { + def scheduler: Scheduler[F] + def periodicScheduler: PeriodicScheduler[F] +} diff --git a/modules/scheduler/api/src/main/scala/docspell/scheduler/usertask/UserTask.scala b/modules/scheduler/api/src/main/scala/docspell/scheduler/usertask/UserTask.scala index 1d2dcd65..f700da5c 100644 --- a/modules/scheduler/api/src/main/scala/docspell/scheduler/usertask/UserTask.scala +++ b/modules/scheduler/api/src/main/scala/docspell/scheduler/usertask/UserTask.scala @@ -6,15 +6,11 @@ package docspell.scheduler.usertask -import cats.effect._ -import cats.implicits._ import com.github.eikek.calev.CalEvent import docspell.common._ -import docspell.common.syntax.all._ -import docspell.store.records.RPeriodicTask -import io.circe.{Decoder, Encoder} +import io.circe.Encoder -case class UserTask[A]( +final case class UserTask[A]( id: Ident, name: Ident, enabled: Boolean, @@ -32,33 +28,3 @@ case class UserTask[A]( def mapArgs[B](f: A => B): UserTask[B] = withArgs(f(args)) } - -object UserTask { - - implicit final class UserTaskCodec(ut: UserTask[String]) { - - def decode[A](implicit D: Decoder[A]): Either[String, UserTask[A]] = - ut.args - .parseJsonAs[A] - .left - .map(_.getMessage) - .map(a => ut.copy(args = a)) - - def toPeriodicTask[F[_]: Sync]( - scope: UserTaskScope, - subject: Option[String] - ): F[RPeriodicTask] = - QUserTask - .create[F]( - ut.enabled, - scope, - ut.name, - ut.args, - subject.getOrElse(s"${scope.fold(_.user.id, _.id)}: ${ut.name.id}"), - Priority.Low, - ut.timer, - ut.summary - ) - .map(r => r.copy(id = ut.id)) - } -} diff --git a/modules/scheduler/api/src/main/scala/docspell/scheduler/usertask/UserTaskScope.scala b/modules/scheduler/api/src/main/scala/docspell/scheduler/usertask/UserTaskScope.scala index 45ba0311..236c7ee6 100644 --- a/modules/scheduler/api/src/main/scala/docspell/scheduler/usertask/UserTaskScope.scala +++ b/modules/scheduler/api/src/main/scala/docspell/scheduler/usertask/UserTaskScope.scala @@ -20,7 +20,7 @@ sealed trait UserTaskScope { self: Product => /** Maps to the account or uses the collective for both parts if the scope is collective * wide. */ - private[usertask] def toAccountId: AccountId = + private[scheduler] def toAccountId: AccountId = AccountId(collective, fold(_.user, identity)) } @@ -49,4 +49,7 @@ object UserTaskScope { def apply(collective: Ident): UserTaskScope = UserTaskScope.collective(collective) + + def system: UserTaskScope = + collective(DocspellSystem.taskGroup) } diff --git a/modules/scheduler/api/src/main/scala/docspell/scheduler/usertask/UserTaskStore.scala b/modules/scheduler/api/src/main/scala/docspell/scheduler/usertask/UserTaskStore.scala index 0570531d..2a3dcf0e 100644 --- a/modules/scheduler/api/src/main/scala/docspell/scheduler/usertask/UserTaskStore.scala +++ b/modules/scheduler/api/src/main/scala/docspell/scheduler/usertask/UserTaskStore.scala @@ -7,10 +7,7 @@ package docspell.scheduler.usertask import cats.data.OptionT -import cats.effect._ -import cats.implicits._ import docspell.common._ -import docspell.store.{AddResult, Store} import fs2.Stream import io.circe._ @@ -88,96 +85,11 @@ trait UserTaskStore[F[_]] { /** Delete all tasks of the given user that have name `name`. */ def deleteAll(scope: UserTaskScope, name: Ident): F[Int] -} - -object UserTaskStore { - - def apply[F[_]: Async](store: Store[F]): Resource[F, UserTaskStore[F]] = - Resource.pure[F, UserTaskStore[F]](new UserTaskStore[F] { - - def getAll(scope: UserTaskScope): Stream[F, UserTask[String]] = - store.transact(QUserTask.findAll(scope.toAccountId)) - - def getByNameRaw(scope: UserTaskScope, name: Ident): Stream[F, UserTask[String]] = - store.transact(QUserTask.findByName(scope.toAccountId, name)) - - def getByIdRaw(scope: UserTaskScope, id: Ident): OptionT[F, UserTask[String]] = - OptionT(store.transact(QUserTask.findById(scope.toAccountId, id))) - - def getByName[A](scope: UserTaskScope, name: Ident)(implicit - D: Decoder[A] - ): Stream[F, UserTask[A]] = - getByNameRaw(scope, name).flatMap(_.decode match { - case Right(ua) => Stream.emit(ua) - case Left(err) => Stream.raiseError[F](new Exception(err)) - }) - - def updateTask[A](scope: UserTaskScope, subject: Option[String], ut: UserTask[A])( - implicit E: Encoder[A] - ): F[Int] = { - val exists = QUserTask.exists(ut.id) - val insert = QUserTask.insert(scope, subject, ut.encode) - store.add(insert, exists).flatMap { - case AddResult.Success => - 1.pure[F] - case AddResult.EntityExists(_) => - store.transact(QUserTask.update(scope, subject, ut.encode)) - case AddResult.Failure(ex) => - Async[F].raiseError(ex) - } - } - - def deleteTask(scope: UserTaskScope, id: Ident): F[Int] = - store.transact(QUserTask.delete(scope.toAccountId, id)) - - def getOneByNameRaw( - scope: UserTaskScope, - name: Ident - ): OptionT[F, UserTask[String]] = - OptionT( - getByNameRaw(scope, name) - .take(2) - .compile - .toList - .flatMap { - case Nil => (None: Option[UserTask[String]]).pure[F] - case ut :: Nil => ut.some.pure[F] - case _ => Async[F].raiseError(new Exception("More than one result found")) - } - ) - - def getOneByName[A](scope: UserTaskScope, name: Ident)(implicit - D: Decoder[A] - ): OptionT[F, UserTask[A]] = - getOneByNameRaw(scope, name) - .semiflatMap(_.decode match { - case Right(ua) => ua.pure[F] - case Left(err) => Async[F].raiseError(new Exception(err)) - }) - - def updateOneTask[A]( - scope: UserTaskScope, - subject: Option[String], - ut: UserTask[A] - )(implicit - E: Encoder[A] - ): F[UserTask[String]] = - getByNameRaw(scope, ut.name).compile.toList.flatMap { - case a :: rest => - val task = ut.copy(id = a.id).encode - for { - _ <- store.transact(QUserTask.update(scope, subject, task)) - _ <- store.transact( - rest.traverse(t => QUserTask.delete(scope.toAccountId, t.id)) - ) - } yield task - case Nil => - val task = ut.encode - store.transact(QUserTask.insert(scope, subject, task)).map(_ => task) - } - - def deleteAll(scope: UserTaskScope, name: Ident): F[Int] = - store.transact(QUserTask.deleteAll(scope.toAccountId, name)) - }) - + + /** Discards the schedule and immediately submits the task to the job executor's queue. + * It will not update the corresponding periodic task. + */ + def executeNow[A](scope: UserTaskScope, subject: Option[String], task: UserTask[A])( + implicit E: Encoder[A] + ): F[Unit] } diff --git a/modules/joex/src/test/scala/docspell/joex/scheduler/CountingSchemeSpec.scala b/modules/scheduler/api/src/test/scala/docspell/scheduler/CountingSchemeSpec.scala similarity index 92% rename from modules/joex/src/test/scala/docspell/joex/scheduler/CountingSchemeSpec.scala rename to modules/scheduler/api/src/test/scala/docspell/scheduler/CountingSchemeSpec.scala index cf819f2f..1339d220 100644 --- a/modules/joex/src/test/scala/docspell/joex/scheduler/CountingSchemeSpec.scala +++ b/modules/scheduler/api/src/test/scala/docspell/scheduler/CountingSchemeSpec.scala @@ -4,10 +4,9 @@ * SPDX-License-Identifier: AGPL-3.0-or-later */ -package docspell.joex.scheduler +package docspell.scheduler import docspell.common.Priority - import munit._ class CountingSchemeSpec extends FunSuite { diff --git a/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/ContextImpl.scala b/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/ContextImpl.scala new file mode 100644 index 00000000..45647f03 --- /dev/null +++ b/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/ContextImpl.scala @@ -0,0 +1,61 @@ +package docspell.scheduler.impl + +import cats._ +import cats.syntax.all._ +import cats.effect._ + +import docspell.common._ +import docspell.logging.Logger +import docspell.scheduler._ +import docspell.store.Store +import docspell.store.records.RJob + +class ContextImpl[F[_]: Functor, A]( + val args: A, + val logger: Logger[F], + val store: Store[F], + val config: SchedulerConfig, + val jobId: Ident +) extends Context[F, A] { + + def setProgress(percent: Int): F[Unit] = { + val pval = math.min(100, math.max(0, percent)) + store.transact(RJob.setProgress(jobId, pval)).map(_ => ()) + } + + def isLastRetry: F[Boolean] = + for { + current <- store.transact(RJob.getRetries(jobId)) + last = config.retries == current.getOrElse(0) + } yield last + + def map[C](f: A => C) = + new ContextImpl[F, C](f(args), logger, store, config, jobId) +} + +object ContextImpl { + def create[F[_]: Async, A]( + jobId: Ident, + arg: A, + config: SchedulerConfig, + log: Logger[F], + store: Store[F] + ): Context[F, A] = + new ContextImpl(arg, log, store, config, jobId) + + def apply[F[_]: Async, A]( + job: RJob, + arg: A, + config: SchedulerConfig, + logSink: LogSink[F], + store: Store[F] + ): F[Context[F, A]] = { + val log = docspell.logging.getLogger[F] + for { + _ <- log.trace("Creating logger for task run") + logger <- QueueLogger(job.id, job.info, config.logBufferSize, logSink) + _ <- log.trace("Logger created, instantiating context") + ctx = create[F, A](job.id, arg, config, logger, store) + } yield ctx + } +} diff --git a/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/JobQueue.scala b/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/JobQueue.scala new file mode 100644 index 00000000..0c190723 --- /dev/null +++ b/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/JobQueue.scala @@ -0,0 +1,37 @@ +/* + * Copyright 2020 Eike K. & Contributors + * + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +package docspell.scheduler.impl + +import cats.effect._ +import cats.implicits._ +import docspell.common._ +import docspell.store.Store +import docspell.store.queries.QJob +import docspell.store.records.RJob + +trait JobQueue[F[_]] { + def nextJob( + prio: Ident => F[Priority], + worker: Ident, + retryPause: Duration + ): F[Option[RJob]] +} + +object JobQueue { + private[scheduler] def apply[F[_]: Async](store: Store[F]): JobQueue[F] = + new JobQueue[F] { + private[this] val logger = docspell.logging.getLogger[F] + + def nextJob( + prio: Ident => F[Priority], + worker: Ident, + retryPause: Duration + ): F[Option[RJob]] = + logger + .trace("Select next job") *> QJob.takeNextJob(store)(prio, worker, retryPause) + } +} diff --git a/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/JobStoreImpl.scala b/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/JobStoreImpl.scala new file mode 100644 index 00000000..f2507301 --- /dev/null +++ b/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/JobStoreImpl.scala @@ -0,0 +1,87 @@ +package docspell.scheduler.impl + +import cats.effect.Sync +import cats.syntax.all._ + +import docspell.common.Timestamp +import docspell.scheduler._ + +import docspell.store.Store +import docspell.store.records.RJob + +final class JobStoreImpl[F[_]: Sync](store: Store[F]) extends JobStore[F] { + private[this] val logger = docspell.logging.getLogger[F] + + def insert(job: Job[String]): F[Unit] = + for { + now <- Timestamp.current[F] + _ <- insert0(job, now) + } yield () + + def insert0(job: Job[String], submitted: Timestamp): F[Unit] = + store + .transact(RJob.insert(toRecord(job, submitted))) + .flatMap { n => + if (n != 1) + Sync[F] + .raiseError(new Exception(s"Inserting job failed. Update count: $n")) + else ().pure[F] + } + + def insertIfNew(job: Job[String]): F[Boolean] = + Timestamp.current[F].flatMap(now => insertIfNew0(job, now)) + + def insertIfNew0(job: Job[String], submitted: Timestamp): F[Boolean] = + for { + rj <- job.tracker match { + case Some(tid) => + store.transact(RJob.findNonFinalByTracker(tid)) + case None => + None.pure[F] + } + ret <- + if (rj.isDefined) false.pure[F] + else insert0(job, submitted).as(true) + } yield ret + + def insertAll(jobs: Seq[Job[String]]): F[List[Boolean]] = + Timestamp.current[F].flatMap { now => + jobs.toList + .traverse(j => insert0(j, now).attempt) + .flatMap(_.traverse { + case Right(()) => true.pure[F] + case Left(ex) => + logger.error(ex)("Could not insert job. Skipping it.").as(false) + }) + } + + def insertAllIfNew(jobs: Seq[Job[String]]) = + Timestamp.current[F].flatMap { now => + jobs.toList + .traverse(j => insertIfNew0(j, now).attempt) + .flatMap(_.traverse { + case Right(true) => true.pure[F] + case Right(false) => false.pure[F] + case Left(ex) => + logger.error(ex)("Could not insert job. Skipping it.").as(false) + }) + } + + def toRecord(job: Job[String], timestamp: Timestamp): RJob = + RJob.newJob( + job.id, + job.task, + job.group, + job.args, + job.subject, + timestamp, + job.submitter, + job.priority, + job.tracker + ) +} + +object JobStoreImpl { + def apply[F[_]: Sync](store: Store[F]): JobStore[F] = + new JobStoreImpl[F](store) +} diff --git a/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/JobStoreModuleBuilder.scala b/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/JobStoreModuleBuilder.scala new file mode 100644 index 00000000..e5762935 --- /dev/null +++ b/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/JobStoreModuleBuilder.scala @@ -0,0 +1,50 @@ +package docspell.scheduler.impl + +import cats.effect.Async + +import docspell.notification.api.EventSink +import docspell.pubsub.api.PubSubT +import docspell.scheduler._ +import docspell.scheduler.usertask.UserTaskStore +import docspell.store.Store + +case class JobStoreModuleBuilder[F[_]: Async]( + store: Store[F], + pubsub: PubSubT[F], + eventSink: EventSink[F] +) { + def withPubsub(ps: PubSubT[F]): JobStoreModuleBuilder[F] = + copy(pubsub = ps) + + def withEventSink(es: EventSink[F]): JobStoreModuleBuilder[F] = + copy(eventSink = es) + + def build: JobStoreModuleBuilder.Module[F] = { + val jobStore = JobStorePublish(store, pubsub, eventSink) + val periodicTaskStore = PeriodicTaskStore(store, jobStore) + val userTaskStore = UserTaskStoreImpl(store, periodicTaskStore) + new JobStoreModuleBuilder.Module( + userTaskStore, + periodicTaskStore, + jobStore, + store, + eventSink, + pubsub + ) + } +} + +object JobStoreModuleBuilder { + + def apply[F[_]: Async](store: Store[F]): JobStoreModuleBuilder[F] = + JobStoreModuleBuilder(store, PubSubT.noop[F], EventSink.silent[F]) + + final class Module[F[_]]( + val userTasks: UserTaskStore[F], + val periodicTaskStore: PeriodicTaskStore[F], + val jobs: JobStore[F], + val store: Store[F], + val eventSink: EventSink[F], + val pubSubT: PubSubT[F] + ) extends JobStoreModule[F] {} +} diff --git a/modules/scheduler/api/src/main/scala/docspell/scheduler/msg/JobQueuePublish.scala b/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/JobStorePublish.scala similarity index 60% rename from modules/scheduler/api/src/main/scala/docspell/scheduler/msg/JobQueuePublish.scala rename to modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/JobStorePublish.scala index 7c867a22..f9ced949 100644 --- a/modules/scheduler/api/src/main/scala/docspell/scheduler/msg/JobQueuePublish.scala +++ b/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/JobStorePublish.scala @@ -4,51 +4,52 @@ * SPDX-License-Identifier: AGPL-3.0-or-later */ -package docspell.scheduler.msg +package docspell.scheduler.impl import cats.effect._ import cats.implicits._ -import docspell.common.{Duration, Ident, Priority} + +import docspell.common.JobState import docspell.notification.api.{Event, EventSink} import docspell.pubsub.api.PubSubT -import docspell.scheduler.JobQueue +import docspell.scheduler._ +import docspell.scheduler.msg.JobSubmitted import docspell.store.Store -import docspell.store.records.RJob -final class JobQueuePublish[F[_]: Sync]( - delegate: JobQueue[F], +final class JobStorePublish[F[_]: Sync]( + delegate: JobStore[F], pubsub: PubSubT[F], eventSink: EventSink[F] -) extends JobQueue[F] { +) extends JobStore[F] { - private def msg(job: RJob): JobSubmitted = + private def msg(job: Job[String]): JobSubmitted = JobSubmitted(job.id, job.group, job.task, job.args) - private def event(job: RJob): Event.JobSubmitted = + private def event(job: Job[String]): Event.JobSubmitted = Event.JobSubmitted( job.id, job.group, job.task, job.args, - job.state, + JobState.waiting, job.subject, job.submitter ) - private def publish(job: RJob): F[Unit] = + private def publish(job: Job[String]): F[Unit] = pubsub.publish1(JobSubmitted.topic, msg(job)).as(()) *> eventSink.offer(event(job)) - def insert(job: RJob) = + def insert(job: Job[String]) = delegate.insert(job).flatTap(_ => publish(job)) - def insertIfNew(job: RJob) = + def insertIfNew(job: Job[String]) = delegate.insertIfNew(job).flatTap { case true => publish(job) case false => ().pure[F] } - def insertAll(jobs: Seq[RJob]) = + def insertAll(jobs: Seq[Job[String]]) = delegate.insertAll(jobs).flatTap { results => results.zip(jobs).traverse { case (res, job) => if (res) publish(job) @@ -56,23 +57,20 @@ final class JobQueuePublish[F[_]: Sync]( } } - def insertAllIfNew(jobs: Seq[RJob]) = + def insertAllIfNew(jobs: Seq[Job[String]]) = delegate.insertAllIfNew(jobs).flatTap { results => results.zip(jobs).traverse { case (res, job) => if (res) publish(job) else ().pure[F] } } - - def nextJob(prio: Ident => F[Priority], worker: Ident, retryPause: Duration) = - delegate.nextJob(prio, worker, retryPause) } -object JobQueuePublish { +object JobStorePublish { def apply[F[_]: Async]( store: Store[F], pubSub: PubSubT[F], eventSink: EventSink[F] - ): Resource[F, JobQueue[F]] = - JobQueue.create(store).map(q => new JobQueuePublish[F](q, pubSub, eventSink)) + ): JobStore[F] = + new JobStorePublish[F](JobStoreImpl(store), pubSub, eventSink) } diff --git a/modules/scheduler/api/src/main/scala/docspell/scheduler/LogSink.scala b/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/LogSink.scala similarity index 96% rename from modules/scheduler/api/src/main/scala/docspell/scheduler/LogSink.scala rename to modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/LogSink.scala index 6bf34f05..8094b90e 100644 --- a/modules/scheduler/api/src/main/scala/docspell/scheduler/LogSink.scala +++ b/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/LogSink.scala @@ -4,16 +4,16 @@ * SPDX-License-Identifier: AGPL-3.0-or-later */ -package docspell.scheduler +package docspell.scheduler.impl import cats.effect._ import cats.implicits._ -import fs2.Pipe - import docspell.common._ import docspell.logging +import docspell.scheduler.LogEvent import docspell.store.Store import docspell.store.records.RJobLog +import fs2.Pipe trait LogSink[F[_]] { diff --git a/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/PeriodicSchedulerBuilder.scala b/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/PeriodicSchedulerBuilder.scala index f4d1d7ec..a3566f13 100644 --- a/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/PeriodicSchedulerBuilder.scala +++ b/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/PeriodicSchedulerBuilder.scala @@ -7,10 +7,8 @@ import fs2.concurrent.SignallingRef object PeriodicSchedulerBuilder { - def build[F[_]: Async]( + def resource[F[_]: Async]( cfg: PeriodicSchedulerConfig, - sch: Scheduler[F], - queue: JobQueue[F], store: PeriodicTaskStore[F], pubsub: PubSubT[F] ): Resource[F, PeriodicScheduler[F]] = @@ -19,8 +17,6 @@ object PeriodicSchedulerBuilder { state <- Resource.eval(SignallingRef(PeriodicSchedulerImpl.emptyState[F])) psch = new PeriodicSchedulerImpl[F]( cfg, - sch, - queue, store, pubsub, waiter, diff --git a/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/PeriodicSchedulerImpl.scala b/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/PeriodicSchedulerImpl.scala index fd1a3e0d..43d89324 100644 --- a/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/PeriodicSchedulerImpl.scala +++ b/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/PeriodicSchedulerImpl.scala @@ -20,8 +20,6 @@ import eu.timepit.fs2cron.calev.CalevScheduler final class PeriodicSchedulerImpl[F[_]: Async]( val config: PeriodicSchedulerConfig, - sch: Scheduler[F], - queue: JobQueue[F], store: PeriodicTaskStore[F], pubSub: PubSubT[F], waiter: SignallingRef[F, Boolean], @@ -119,11 +117,11 @@ final class PeriodicSchedulerImpl[F[_]: Async]( case None => logger.info(s"Submitting job for periodic task '${pj.task.id}'") *> - pj.toJob.flatMap(queue.insert) *> notifyJoex *> true.pure[F] + store.submit(pj) *> notifyJoex *> true.pure[F] } def notifyJoex: F[Unit] = - sch.notifyChange *> pubSub.publish1IgnoreErrors(JobsNotify(), ()).void + pubSub.publish1IgnoreErrors(JobsNotify(), ()).void def scheduleNotify(pj: RPeriodicTask): F[Unit] = Timestamp diff --git a/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/PeriodicTaskStore.scala b/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/PeriodicTaskStore.scala index 54846b1f..1f1ecb52 100644 --- a/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/PeriodicTaskStore.scala +++ b/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/PeriodicTaskStore.scala @@ -9,6 +9,7 @@ package docspell.scheduler.impl import cats.effect._ import cats.implicits._ import docspell.common._ +import docspell.scheduler.{Job, JobStore} import docspell.store.queries.QPeriodicTask import docspell.store.records._ import docspell.store.{AddResult, Store} @@ -37,12 +38,18 @@ trait PeriodicTaskStore[F[_]] { /** Find all joex nodes as registered in the database. */ def findJoexNodes: F[Vector[RNode]] + + /** Creates a job from the given task and submits it into the job queue */ + def submit(task: RPeriodicTask): F[Unit] } object PeriodicTaskStore { - def create[F[_]: Sync](store: Store[F]): Resource[F, PeriodicTaskStore[F]] = - Resource.pure[F, PeriodicTaskStore[F]](new PeriodicTaskStore[F] { + def apply[F[_]: Sync]( + store: Store[F], + jobStore: JobStore[F] + ): PeriodicTaskStore[F] = + new PeriodicTaskStore[F] { private[this] val logger = docspell.logging.getLogger[F] def takeNext( worker: Ident, @@ -116,5 +123,22 @@ object PeriodicTaskStore { def findJoexNodes: F[Vector[RNode]] = store.transact(RNode.findAll(NodeType.Joex)) - }) + def submit(task: RPeriodicTask) = + makeJob(task).flatMap(jobStore.insert) + + def makeJob(rt: RPeriodicTask): F[Job[String]] = + Ident.randomId[F].map { id => + Job( + id, + rt.task, + rt.group, + rt.args, + rt.subject, + rt.submitter, + rt.priority, + Some(id) + ) + } + + } } diff --git a/modules/scheduler/api/src/main/scala/docspell/scheduler/usertask/QUserTask.scala b/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/QUserTask.scala similarity index 79% rename from modules/scheduler/api/src/main/scala/docspell/scheduler/usertask/QUserTask.scala rename to modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/QUserTask.scala index f724dfe9..a1ea1632 100644 --- a/modules/scheduler/api/src/main/scala/docspell/scheduler/usertask/QUserTask.scala +++ b/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/QUserTask.scala @@ -1,19 +1,48 @@ -package docspell.scheduler.usertask +package docspell.scheduler.impl -import cats.implicits._ import cats.effect.Sync +import cats.syntax.all._ import com.github.eikek.calev.CalEvent -import docspell.common._ +import docspell.common.{AccountId, Ident, Priority, Timestamp} +import docspell.scheduler.usertask.{UserTask, UserTaskScope} import docspell.store.qb.DML import docspell.store.qb.DSL._ import docspell.store.records.RPeriodicTask -import doobie.ConnectionIO import fs2.Stream -import io.circe.Encoder +import io.circe.{Decoder, Encoder} +import doobie._ object QUserTask { private val RT = RPeriodicTask.T + implicit final class UserTaskCodec(ut: UserTask[String]) { + import docspell.common.syntax.all._ + + def decode[A](implicit D: Decoder[A]): Either[String, UserTask[A]] = + ut.args + .parseJsonAs[A] + .left + .map(_.getMessage) + .map(a => ut.copy(args = a)) + + def toPeriodicTask[F[_]: Sync]( + scope: UserTaskScope, + subject: Option[String] + ): F[RPeriodicTask] = + QUserTask + .create[F]( + ut.enabled, + scope, + ut.name, + ut.args, + subject.getOrElse(s"${scope.fold(_.user.id, _.id)}: ${ut.name.id}"), + Priority.Low, + ut.timer, + ut.summary + ) + .map(r => r.copy(id = ut.id)) + } + def findAll(account: AccountId): Stream[ConnectionIO, UserTask[String]] = run( select(RT.all), diff --git a/modules/scheduler/api/src/main/scala/docspell/scheduler/QueueLogger.scala b/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/QueueLogger.scala similarity index 85% rename from modules/scheduler/api/src/main/scala/docspell/scheduler/QueueLogger.scala rename to modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/QueueLogger.scala index b0f17d23..08a176ec 100644 --- a/modules/scheduler/api/src/main/scala/docspell/scheduler/QueueLogger.scala +++ b/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/QueueLogger.scala @@ -1,19 +1,13 @@ -/* - * Copyright 2020 Eike K. & Contributors - * - * SPDX-License-Identifier: AGPL-3.0-or-later - */ +package docspell.scheduler.impl -package docspell.scheduler - -import cats.effect._ +import cats.syntax.all._ import cats.effect.std.Queue -import cats.implicits._ -import fs2.Stream - -import docspell.common._ +import cats.effect.{Async, Sync} +import docspell.common.{Ident, LogLevel} import docspell.logging import docspell.logging.{Level, Logger} +import docspell.scheduler.LogEvent +import fs2.Stream object QueueLogger { diff --git a/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/SchedulerBuilder.scala b/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/SchedulerBuilder.scala index 6cd3cc57..e2cf3286 100644 --- a/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/SchedulerBuilder.scala +++ b/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/SchedulerBuilder.scala @@ -10,7 +10,7 @@ import cats.effect._ import cats.effect.std.Semaphore import cats.implicits._ import fs2.concurrent.SignallingRef -import docspell.scheduler.{JobQueue, _} +import docspell.scheduler._ import docspell.notification.api.EventSink import docspell.pubsub.api.PubSubT import docspell.store.Store @@ -19,7 +19,7 @@ case class SchedulerBuilder[F[_]: Async]( config: SchedulerConfig, tasks: JobTaskRegistry[F], store: Store[F], - queue: Resource[F, JobQueue[F]], + queue: JobQueue[F], logSink: LogSink[F], pubSub: PubSubT[F], eventSink: EventSink[F] @@ -34,14 +34,11 @@ case class SchedulerBuilder[F[_]: Async]( def withTask[A](task: JobTask[F]): SchedulerBuilder[F] = withTaskRegistry(tasks.withTask(task)) - def withQueue(queue: Resource[F, JobQueue[F]]): SchedulerBuilder[F] = - copy(queue = queue) - def withLogSink(sink: LogSink[F]): SchedulerBuilder[F] = copy(logSink = sink) def withQueue(queue: JobQueue[F]): SchedulerBuilder[F] = - copy(queue = Resource.pure[F, JobQueue[F]](queue)) + copy(queue = queue) def withPubSub(pubSubT: PubSubT[F]): SchedulerBuilder[F] = copy(pubSub = pubSubT) @@ -53,14 +50,13 @@ case class SchedulerBuilder[F[_]: Async]( resource.evalMap(sch => Async[F].start(sch.start.compile.drain).map(_ => sch)) def resource: Resource[F, Scheduler[F]] = { - val scheduler: Resource[F, SchedulerImpl[F]] = for { - jq <- queue - waiter <- Resource.eval(SignallingRef(true)) - state <- Resource.eval(SignallingRef(SchedulerImpl.emptyState[F])) - perms <- Resource.eval(Semaphore(config.poolSize.toLong)) + val scheduler: F[SchedulerImpl[F]] = for { + waiter <- SignallingRef(true) + state <- SignallingRef(SchedulerImpl.emptyState[F]) + perms <- Semaphore(config.poolSize.toLong) } yield new SchedulerImpl[F]( config, - jq, + queue, pubSub, eventSink, tasks, @@ -71,7 +67,7 @@ case class SchedulerBuilder[F[_]: Async]( perms ) - scheduler.evalTap(_.init).map(s => s: Scheduler[F]) + Resource.eval(scheduler.flatTap(_.init)).map(s => s: Scheduler[F]) } } @@ -86,10 +82,9 @@ object SchedulerBuilder { config, JobTaskRegistry.empty[F], store, - JobQueue.create(store), + JobQueue(store), LogSink.db[F](store), PubSubT.noop[F], EventSink.silent[F] ) - } diff --git a/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/SchedulerImpl.scala b/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/SchedulerImpl.scala index 859cbde2..4aad8901 100644 --- a/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/SchedulerImpl.scala +++ b/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/SchedulerImpl.scala @@ -14,7 +14,7 @@ import fs2.Stream import fs2.concurrent.SignallingRef import docspell.scheduler.msg.{CancelJob, JobDone, JobsNotify} import docspell.common._ -import docspell.scheduler.{JobQueue, _} +import docspell.scheduler._ import docspell.scheduler.impl.SchedulerImpl._ import docspell.notification.api.Event import docspell.notification.api.EventSink @@ -172,7 +172,7 @@ final class SchedulerImpl[F[_]: Async]( for { _ <- logger.debug(s"Creating context for job ${job.info} to run cancellation $t") - ctx <- Context[F, String](job, job.args, config, logSink, store) + ctx <- ContextImpl[F, String](job, job.args, config, logSink, store) _ <- t.onCancel.run(ctx) _ <- state.modify(_.markCancelled(job)) _ <- onFinish(job, JobTaskResult.empty, JobState.Cancelled) @@ -196,7 +196,7 @@ final class SchedulerImpl[F[_]: Async]( case Right(t) => for { _ <- logger.debug(s"Creating context for job ${job.info} to run $t") - ctx <- Context[F, String](job, job.args, config, logSink, store) + ctx <- ContextImpl[F, String](job, job.args, config, logSink, store) jot = wrapTask(job, t.task, ctx) tok <- forkRun(job, jot.run(ctx), t.onCancel.run(ctx), ctx) _ <- state.modify(_.addRunning(job, tok)) diff --git a/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/SchedulerModuleBuilder.scala b/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/SchedulerModuleBuilder.scala new file mode 100644 index 00000000..09e6c6a1 --- /dev/null +++ b/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/SchedulerModuleBuilder.scala @@ -0,0 +1,68 @@ +package docspell.scheduler.impl + +import cats.effect._ +import docspell.common.Ident +import docspell.scheduler.{ + JobTaskRegistry, + PeriodicSchedulerConfig, + SchedulerConfig, + SchedulerModule +} + +case class SchedulerModuleBuilder[F[_]: Async] private ( + periodicSchedulerConfig: PeriodicSchedulerConfig, + schedulerBuilder: SchedulerBuilder[F], + jobStoreModule: JobStoreModuleBuilder.Module[F] +) { + + private def configureScheduler( + f: SchedulerBuilder[F] => SchedulerBuilder[F] + ): SchedulerModuleBuilder[F] = + copy(schedulerBuilder = f(schedulerBuilder)) + + def withTaskRegistry(reg: JobTaskRegistry[F]): SchedulerModuleBuilder[F] = + configureScheduler(_.withTaskRegistry(reg)) + + def withSchedulerConfig(cfg: SchedulerConfig): SchedulerModuleBuilder[F] = + configureScheduler(_.withConfig(cfg)) + + def withPeriodicSchedulerConfig( + cfg: PeriodicSchedulerConfig + ): SchedulerModuleBuilder[F] = + copy(periodicSchedulerConfig = cfg) + + def resource: Resource[F, SchedulerModule[F]] = { + val queue = JobQueue(jobStoreModule.store) + for { + schedulerR <- schedulerBuilder + .withPubSub(jobStoreModule.pubSubT) + .withEventSink(jobStoreModule.eventSink) + .withQueue(queue) + .resource + + periodicTaskSchedulerR <- + PeriodicSchedulerBuilder.resource( + periodicSchedulerConfig, + jobStoreModule.periodicTaskStore, + jobStoreModule.pubSubT + ) + } yield new SchedulerModule[F] { + val scheduler = schedulerR + val periodicScheduler = periodicTaskSchedulerR + } + } +} + +object SchedulerModuleBuilder { + + def apply[F[_]: Async]( + jobStoreModule: JobStoreModuleBuilder.Module[F] + ): SchedulerModuleBuilder[F] = { + val id = Ident.unsafe("default-node-id") + new SchedulerModuleBuilder( + PeriodicSchedulerConfig.default(id), + SchedulerBuilder(SchedulerConfig.default(id), jobStoreModule.store), + jobStoreModule + ) + } +} diff --git a/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/UserTaskStoreImpl.scala b/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/UserTaskStoreImpl.scala new file mode 100644 index 00000000..2f90617e --- /dev/null +++ b/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/UserTaskStoreImpl.scala @@ -0,0 +1,117 @@ +package docspell.scheduler.impl + +import cats.effect._ +import docspell.scheduler.usertask.UserTaskStore +import cats.data.OptionT +import cats.implicits._ +import docspell.common._ +import docspell.scheduler.usertask._ +import docspell.store.{AddResult, Store} +import fs2.Stream +import io.circe._ +import QUserTask.UserTaskCodec + +final class UserTaskStoreImpl[F[_]: Sync]( + store: Store[F], + periodicTaskStore: PeriodicTaskStore[F] +) extends UserTaskStore[F] { + def getAll(scope: UserTaskScope): Stream[F, UserTask[String]] = + store.transact(QUserTask.findAll(scope.toAccountId)) + + def getByNameRaw(scope: UserTaskScope, name: Ident): Stream[F, UserTask[String]] = + store.transact(QUserTask.findByName(scope.toAccountId, name)) + + def getByIdRaw(scope: UserTaskScope, id: Ident): OptionT[F, UserTask[String]] = + OptionT(store.transact(QUserTask.findById(scope.toAccountId, id))) + + def getByName[A](scope: UserTaskScope, name: Ident)(implicit + D: Decoder[A] + ): Stream[F, UserTask[A]] = + getByNameRaw(scope, name).flatMap(_.decode match { + case Right(ua) => Stream.emit(ua) + case Left(err) => Stream.raiseError[F](new Exception(err)) + }) + + def updateTask[A](scope: UserTaskScope, subject: Option[String], ut: UserTask[A])( + implicit E: Encoder[A] + ): F[Int] = { + val exists = QUserTask.exists(ut.id) + val insert = QUserTask.insert(scope, subject, ut.encode) + store.add(insert, exists).flatMap { + case AddResult.Success => + 1.pure[F] + case AddResult.EntityExists(_) => + store.transact(QUserTask.update(scope, subject, ut.encode)) + case AddResult.Failure(ex) => + Sync[F].raiseError(ex) + } + } + + def deleteTask(scope: UserTaskScope, id: Ident): F[Int] = + store.transact(QUserTask.delete(scope.toAccountId, id)) + + def getOneByNameRaw( + scope: UserTaskScope, + name: Ident + ): OptionT[F, UserTask[String]] = + OptionT( + getByNameRaw(scope, name) + .take(2) + .compile + .toList + .flatMap { + case Nil => (None: Option[UserTask[String]]).pure[F] + case ut :: Nil => ut.some.pure[F] + case _ => Sync[F].raiseError(new Exception("More than one result found")) + } + ) + + def getOneByName[A](scope: UserTaskScope, name: Ident)(implicit + D: Decoder[A] + ): OptionT[F, UserTask[A]] = + getOneByNameRaw(scope, name) + .semiflatMap(_.decode match { + case Right(ua) => ua.pure[F] + case Left(err) => Sync[F].raiseError(new Exception(err)) + }) + + def updateOneTask[A]( + scope: UserTaskScope, + subject: Option[String], + ut: UserTask[A] + )(implicit + E: Encoder[A] + ): F[UserTask[String]] = + getByNameRaw(scope, ut.name).compile.toList.flatMap { + case a :: rest => + val task = ut.copy(id = a.id).encode + for { + _ <- store.transact(QUserTask.update(scope, subject, task)) + _ <- store.transact( + rest.traverse(t => QUserTask.delete(scope.toAccountId, t.id)) + ) + } yield task + case Nil => + val task = ut.encode + store.transact(QUserTask.insert(scope, subject, task)).map(_ => task) + } + + def deleteAll(scope: UserTaskScope, name: Ident): F[Int] = + store.transact(QUserTask.deleteAll(scope.toAccountId, name)) + + def executeNow[A](scope: UserTaskScope, subject: Option[String], task: UserTask[A])( + implicit E: Encoder[A] + ): F[Unit] = + for { + ptask <- task.encode.toPeriodicTask(scope, subject) + _ <- periodicTaskStore.submit(ptask) + } yield () +} + +object UserTaskStoreImpl { + def apply[F[_]: Sync]( + store: Store[F], + periodicTaskStore: PeriodicTaskStore[F] + ): UserTaskStore[F] = + new UserTaskStoreImpl[F](store, periodicTaskStore) +} diff --git a/modules/store/src/main/scala/docspell/store/records/RJob.scala b/modules/store/src/main/scala/docspell/store/records/RJob.scala index 89852623..4804232b 100644 --- a/modules/store/src/main/scala/docspell/store/records/RJob.scala +++ b/modules/store/src/main/scala/docspell/store/records/RJob.scala @@ -48,22 +48,22 @@ case class RJob( object RJob { - def newJob[A]( + def newJob( id: Ident, task: Ident, group: Ident, - args: A, + args: String, subject: String, submitted: Timestamp, submitter: Ident, priority: Priority, tracker: Option[Ident] - )(implicit E: Encoder[A]): RJob = + ): RJob = RJob( id, task, group, - E(args).noSpaces, + args, subject, submitted, submitter, @@ -77,6 +77,29 @@ object RJob { None ) + def fromJson[A]( + id: Ident, + task: Ident, + group: Ident, + args: A, + subject: String, + submitted: Timestamp, + submitter: Ident, + priority: Priority, + tracker: Option[Ident] + )(implicit E: Encoder[A]): RJob = + newJob( + id, + task, + group, + E(args).noSpaces, + subject, + submitted, + submitter, + priority, + tracker + ) + final case class Table(alias: Option[String]) extends TableDef { val tableName = "job" diff --git a/modules/store/src/test/scala/docspell/store/queries/QJobTest.scala b/modules/store/src/test/scala/docspell/store/queries/QJobTest.scala index 8c60f240..adf6facd 100644 --- a/modules/store/src/test/scala/docspell/store/queries/QJobTest.scala +++ b/modules/store/src/test/scala/docspell/store/queries/QJobTest.scala @@ -30,7 +30,7 @@ class QJobTest extends CatsEffectSuite with StoreFixture with TestLoggingConfig private val group2 = Ident.unsafe("group2") def createJob(group: Ident): RJob = - RJob.newJob[Unit]( + RJob.fromJson[Unit]( Ident.unsafe(s"job-${c.incrementAndGet()}"), Ident.unsafe("task"), group,