diff --git a/build.sbt b/build.sbt index 9eb54d39..91c21a17 100644 --- a/build.sbt +++ b/build.sbt @@ -530,7 +530,7 @@ val schedulerApi = project Dependencies.fs2Core ++ Dependencies.circeCore ) - .dependsOn(loggingApi, common, store, pubsubApi) + .dependsOn(loggingApi, common, store, notificationApi, pubsubApi) val schedulerImpl = project .in(file("modules/scheduler/impl")) diff --git a/modules/backend/src/main/scala/docspell/backend/BackendApp.scala b/modules/backend/src/main/scala/docspell/backend/BackendApp.scala index f3142016..64ce4b9b 100644 --- a/modules/backend/src/main/scala/docspell/backend/BackendApp.scala +++ b/modules/backend/src/main/scala/docspell/backend/BackendApp.scala @@ -7,19 +7,17 @@ package docspell.backend import cats.effect._ - import docspell.backend.auth.Login import docspell.backend.fulltext.CreateIndex -import docspell.backend.msg.JobQueuePublish import docspell.backend.ops._ import docspell.backend.signup.OSignup import docspell.ftsclient.FtsClient import docspell.notification.api.{EventExchange, NotificationModule} import docspell.pubsub.api.PubSubT +import docspell.scheduler.msg.JobQueuePublish import docspell.store.Store import docspell.store.usertask.UserTaskStore import docspell.totp.Totp - import emil.Emil trait BackendApp[F[_]] { diff --git a/modules/backend/src/main/scala/docspell/backend/msg/Topics.scala b/modules/backend/src/main/scala/docspell/backend/msg/Topics.scala index bda187d1..3a887097 100644 --- a/modules/backend/src/main/scala/docspell/backend/msg/Topics.scala +++ b/modules/backend/src/main/scala/docspell/backend/msg/Topics.scala @@ -7,17 +7,19 @@ package docspell.backend.msg import cats.data.NonEmptyList -import docspell.pubsub.api.{Topic, TypedTopic} -import docspell.scheduler.msg.JobDone +import docspell.pubsub.api.TypedTopic +import docspell.scheduler.msg._ /** All topics used in Docspell. */ object Topics { - /** A generic notification to the job executors to look for new work. */ - val jobsNotify: TypedTopic[Unit] = - TypedTopic[Unit](Topic("jobs-notify")) - /** A list of all topics. It is required to list every topic in use here! */ val all: NonEmptyList[TypedTopic[_]] = - NonEmptyList.of(JobDone.topic, CancelJob.topic, jobsNotify, JobSubmitted.topic) + NonEmptyList.of( + JobDone.topic, + CancelJob.topic, + JobsNotify(), + JobSubmitted.topic, + PeriodicTaskNotify() + ) } diff --git a/modules/backend/src/main/scala/docspell/backend/ops/OCollective.scala b/modules/backend/src/main/scala/docspell/backend/ops/OCollective.scala index 907bfcef..f72ba217 100644 --- a/modules/backend/src/main/scala/docspell/backend/ops/OCollective.scala +++ b/modules/backend/src/main/scala/docspell/backend/ops/OCollective.scala @@ -9,19 +9,17 @@ package docspell.backend.ops import cats.effect.{Async, Resource} import cats.implicits._ import fs2.Stream - import docspell.backend.JobFactory import docspell.backend.PasswordCrypt import docspell.backend.ops.OCollective._ import docspell.common._ import docspell.store.UpdateResult import docspell.store.queries.{QCollective, QUser} -import docspell.store.queue.JobQueue import docspell.store.records._ import docspell.store.usertask.{UserTask, UserTaskScope, UserTaskStore} import docspell.store.{AddResult, Store} - import com.github.eikek.calev._ +import docspell.scheduler.JobQueue trait OCollective[F[_]] { diff --git a/modules/backend/src/main/scala/docspell/backend/ops/OFileRepository.scala b/modules/backend/src/main/scala/docspell/backend/ops/OFileRepository.scala index aec12823..12ea8191 100644 --- a/modules/backend/src/main/scala/docspell/backend/ops/OFileRepository.scala +++ b/modules/backend/src/main/scala/docspell/backend/ops/OFileRepository.scala @@ -9,14 +9,12 @@ package docspell.backend.ops import cats.data.OptionT import cats.effect._ import cats.implicits._ - import docspell.backend.JobFactory import docspell.backend.ops.OFileRepository.IntegrityResult import docspell.common._ +import docspell.scheduler.JobQueue import docspell.store.Store -import docspell.store.queue.JobQueue import docspell.store.records.RJob - import scodec.bits.ByteVector trait OFileRepository[F[_]] { diff --git a/modules/backend/src/main/scala/docspell/backend/ops/OFulltext.scala b/modules/backend/src/main/scala/docspell/backend/ops/OFulltext.scala index 9d057f1c..88b9c3e3 100644 --- a/modules/backend/src/main/scala/docspell/backend/ops/OFulltext.scala +++ b/modules/backend/src/main/scala/docspell/backend/ops/OFulltext.scala @@ -10,15 +10,14 @@ import cats.data.NonEmptyList import cats.effect._ import cats.implicits._ import fs2.Stream - import docspell.backend.JobFactory import docspell.backend.ops.OItemSearch._ import docspell.common._ import docspell.ftsclient._ import docspell.query.ItemQuery._ import docspell.query.ItemQueryDsl._ +import docspell.scheduler.JobQueue import docspell.store.queries.{QFolder, QItem, SelectedItem} -import docspell.store.queue.JobQueue import docspell.store.records.RJob import docspell.store.{Store, qb} diff --git a/modules/backend/src/main/scala/docspell/backend/ops/OItem.scala b/modules/backend/src/main/scala/docspell/backend/ops/OItem.scala index f625cdfe..41a48e7a 100644 --- a/modules/backend/src/main/scala/docspell/backend/ops/OItem.scala +++ b/modules/backend/src/main/scala/docspell/backend/ops/OItem.scala @@ -9,7 +9,6 @@ package docspell.backend.ops import cats.data.{NonEmptyList => Nel, OptionT} import cats.effect.{Async, Resource} import cats.implicits._ - import docspell.backend.AttachedEvent import docspell.backend.JobFactory import docspell.backend.fulltext.CreateIndex @@ -18,11 +17,10 @@ import docspell.common._ import docspell.ftsclient.FtsClient import docspell.logging.Logger import docspell.notification.api.Event +import docspell.scheduler.JobQueue import docspell.store.queries.{QAttachment, QItem, QMoveAttachment} -import docspell.store.queue.JobQueue import docspell.store.records._ import docspell.store.{AddResult, Store, UpdateResult} - import doobie.implicits._ trait OItem[F[_]] { diff --git a/modules/backend/src/main/scala/docspell/backend/ops/OJob.scala b/modules/backend/src/main/scala/docspell/backend/ops/OJob.scala index b4b0b7ae..ad1cc8c8 100644 --- a/modules/backend/src/main/scala/docspell/backend/ops/OJob.scala +++ b/modules/backend/src/main/scala/docspell/backend/ops/OJob.scala @@ -9,11 +9,10 @@ package docspell.backend.ops import cats.data.OptionT import cats.effect._ import cats.implicits._ - -import docspell.backend.msg.JobDone import docspell.backend.ops.OJob.{CollectiveQueueState, JobCancelResult} import docspell.common._ import docspell.pubsub.api.PubSubT +import docspell.scheduler.msg.JobDone import docspell.store.Store import docspell.store.UpdateResult import docspell.store.queries.QJob diff --git a/modules/backend/src/main/scala/docspell/backend/ops/OJoex.scala b/modules/backend/src/main/scala/docspell/backend/ops/OJoex.scala index e51e8bd6..f82285d9 100644 --- a/modules/backend/src/main/scala/docspell/backend/ops/OJoex.scala +++ b/modules/backend/src/main/scala/docspell/backend/ops/OJoex.scala @@ -9,15 +9,16 @@ package docspell.backend.ops import cats.Applicative import cats.effect._ import cats.implicits._ - -import docspell.backend.msg.{CancelJob, Topics} import docspell.common.Ident import docspell.pubsub.api.PubSubT +import docspell.scheduler.msg.{CancelJob, JobsNotify, PeriodicTaskNotify} trait OJoex[F[_]] { def notifyAllNodes: F[Unit] + def notifyPeriodicTasks: F[Unit] + def cancelJob(job: Ident, worker: Ident): F[Unit] } @@ -26,7 +27,10 @@ object OJoex { Resource.pure[F, OJoex[F]](new OJoex[F] { def notifyAllNodes: F[Unit] = - pubSub.publish1IgnoreErrors(Topics.jobsNotify, ()).as(()) + pubSub.publish1IgnoreErrors(JobsNotify(), ()).void + + def notifyPeriodicTasks: F[Unit] = + pubSub.publish1IgnoreErrors(PeriodicTaskNotify(), ()).void def cancelJob(job: Ident, worker: Ident): F[Unit] = pubSub.publish1IgnoreErrors(CancelJob.topic, CancelJob(job, worker)).as(()) diff --git a/modules/backend/src/main/scala/docspell/backend/ops/OUpload.scala b/modules/backend/src/main/scala/docspell/backend/ops/OUpload.scala index dbda65b2..4e1e2dd4 100644 --- a/modules/backend/src/main/scala/docspell/backend/ops/OUpload.scala +++ b/modules/backend/src/main/scala/docspell/backend/ops/OUpload.scala @@ -11,11 +11,10 @@ import cats.data.{EitherT, OptionT} import cats.effect._ import cats.implicits._ import fs2.Stream - import docspell.backend.JobFactory import docspell.common._ +import docspell.scheduler.JobQueue import docspell.store.Store -import docspell.store.queue.JobQueue import docspell.store.records._ trait OUpload[F[_]] { diff --git a/modules/backend/src/main/scala/docspell/backend/ops/OUserTask.scala b/modules/backend/src/main/scala/docspell/backend/ops/OUserTask.scala index 8f8d0ab3..3c5ade36 100644 --- a/modules/backend/src/main/scala/docspell/backend/ops/OUserTask.scala +++ b/modules/backend/src/main/scala/docspell/backend/ops/OUserTask.scala @@ -10,14 +10,12 @@ import cats.data.{NonEmptyList, OptionT} import cats.effect._ import cats.implicits._ import fs2.Stream - import docspell.common._ import docspell.notification.api.{ChannelRef, PeriodicDueItemsArgs, PeriodicQueryArgs} +import docspell.scheduler.JobQueue import docspell.store.Store -import docspell.store.queue.JobQueue import docspell.store.records.RNotificationChannel import docspell.store.usertask._ - import io.circe.Encoder trait OUserTask[F[_]] { @@ -98,7 +96,7 @@ object OUserTask { ptask <- task.encode.toPeriodicTask(scope, subject) job <- ptask.toJob _ <- queue.insert(job) - _ <- joex.notifyAllNodes + _ <- joex.notifyPeriodicTasks } yield () def getScanMailbox(scope: UserTaskScope): Stream[F, UserTask[ScanMailboxArgs]] = @@ -124,7 +122,7 @@ object OUserTask { ): F[Unit] = for { _ <- taskStore.updateTask[ScanMailboxArgs](scope, subject, task) - _ <- joex.notifyAllNodes + _ <- joex.notifyPeriodicTasks } yield () def getNotifyDueItems( @@ -153,7 +151,7 @@ object OUserTask { ): F[Unit] = for { _ <- taskStore.updateTask[PeriodicDueItemsArgs](scope, subject, task) - _ <- joex.notifyAllNodes + _ <- joex.notifyPeriodicTasks } yield () def getPeriodicQuery(scope: UserTaskScope): Stream[F, UserTask[PeriodicQueryArgs]] = @@ -180,7 +178,7 @@ object OUserTask { ): F[Unit] = for { _ <- taskStore.updateTask[PeriodicQueryArgs](scope, subject, task) - _ <- joex.notifyAllNodes + _ <- joex.notifyPeriodicTasks } yield () // When retrieving arguments containing channel references, we must update diff --git a/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala b/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala index 7ba81536..3cb650c0 100644 --- a/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala +++ b/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala @@ -12,7 +12,6 @@ import fs2.concurrent.SignallingRef import docspell.analysis.TextAnalyser import docspell.backend.MailAddressCodec import docspell.backend.fulltext.CreateIndex -import docspell.backend.msg.{CancelJob, JobQueuePublish, Topics} import docspell.backend.ops._ import docspell.common._ import docspell.ftsclient.FtsClient @@ -32,13 +31,17 @@ import docspell.joex.process.ItemHandler import docspell.joex.process.ReProcessItem import docspell.joex.scanmailbox._ import docspell.scheduler._ -import docspell.scheduler.impl.{PeriodicSchedulerBuilder, SchedulerBuilder} +import docspell.scheduler.impl.{ + PeriodicSchedulerBuilder, + PeriodicTaskStore, + SchedulerBuilder +} import docspell.joex.updatecheck._ import docspell.notification.api.NotificationModule import docspell.notification.impl.NotificationModuleImpl import docspell.pubsub.api.{PubSub, PubSubT} +import docspell.scheduler.msg.JobQueuePublish import docspell.store.Store -import docspell.store.queue._ import docspell.store.records.{REmptyTrashSetting, RJobLog} import docspell.store.usertask.UserTaskScope import docspell.store.usertask.UserTaskStore @@ -49,7 +52,6 @@ final class JoexAppImpl[F[_]: Async]( cfg: Config, store: Store[F], queue: JobQueue[F], - pubSubT: PubSubT[F], pstore: PeriodicTaskStore[F], termSignal: SignallingRef[F, Boolean], notificationMod: NotificationModule[F], @@ -67,20 +69,11 @@ final class JoexAppImpl[F[_]: Async]( _ <- Async[F].start(eventConsume) _ <- scheduler.periodicAwake _ <- periodicScheduler.periodicAwake - _ <- subscriptions + _ <- scheduler.startSubscriptions + _ <- periodicScheduler.startSubscriptions } yield () } - def subscriptions = - for { - _ <- Async[F].start(pubSubT.subscribeSink(Topics.jobsNotify) { _ => - scheduler.notifyChange - }) - _ <- Async[F].start(pubSubT.subscribeSink(CancelJob.topic) { msg => - scheduler.requestCancel(msg.body.jobId).as(()) - }) - } yield () - def findLogs(jobId: Ident): F[Vector[RJobLog]] = store.transact(RJobLog.findLogs(jobId)) @@ -300,13 +293,12 @@ object JoexAppImpl extends MailAddressCodec { sch, queue, pstore, - joex.notifyAllNodes + pubSubT ) app = new JoexAppImpl( cfg, store, queue, - pubSubT, pstore, termSignal, notificationMod, diff --git a/modules/joex/src/main/scala/docspell/joex/pagecount/AllPageCountTask.scala b/modules/joex/src/main/scala/docspell/joex/pagecount/AllPageCountTask.scala index 99b315b7..0fb9e299 100644 --- a/modules/joex/src/main/scala/docspell/joex/pagecount/AllPageCountTask.scala +++ b/modules/joex/src/main/scala/docspell/joex/pagecount/AllPageCountTask.scala @@ -9,13 +9,10 @@ package docspell.joex.pagecount import cats.effect._ import cats.implicits._ import fs2.{Chunk, Stream} - import docspell.backend.JobFactory import docspell.backend.ops.OJoex import docspell.common._ -import docspell.scheduler.Context -import docspell.scheduler.Task -import docspell.store.queue.JobQueue +import docspell.scheduler.{Context, JobQueue, Task} import docspell.store.records.RAttachment import docspell.store.records.RJob diff --git a/modules/joex/src/main/scala/docspell/joex/pdfconv/ConvertAllPdfTask.scala b/modules/joex/src/main/scala/docspell/joex/pdfconv/ConvertAllPdfTask.scala index e4def9ac..a0865156 100644 --- a/modules/joex/src/main/scala/docspell/joex/pdfconv/ConvertAllPdfTask.scala +++ b/modules/joex/src/main/scala/docspell/joex/pdfconv/ConvertAllPdfTask.scala @@ -9,11 +9,9 @@ package docspell.joex.pdfconv import cats.effect._ import cats.implicits._ import fs2.{Chunk, Stream} - import docspell.backend.ops.OJoex import docspell.common._ -import docspell.scheduler.{Context, Task} -import docspell.store.queue.JobQueue +import docspell.scheduler.{Context, JobQueue, Task} import docspell.store.records.RAttachment import docspell.store.records._ diff --git a/modules/joex/src/main/scala/docspell/joex/preview/AllPreviewsTask.scala b/modules/joex/src/main/scala/docspell/joex/preview/AllPreviewsTask.scala index 15616ed5..849b4f7a 100644 --- a/modules/joex/src/main/scala/docspell/joex/preview/AllPreviewsTask.scala +++ b/modules/joex/src/main/scala/docspell/joex/preview/AllPreviewsTask.scala @@ -9,14 +9,11 @@ package docspell.joex.preview import cats.effect._ import cats.implicits._ import fs2.{Chunk, Stream} - import docspell.backend.JobFactory import docspell.backend.ops.OJoex import docspell.common.MakePreviewArgs.StoreMode import docspell.common._ -import docspell.scheduler.Context -import docspell.scheduler.Task -import docspell.store.queue.JobQueue +import docspell.scheduler.{Context, JobQueue, Task} import docspell.store.records.RAttachment import docspell.store.records.RJob diff --git a/modules/restserver/src/main/scala/docspell/restserver/Subscriptions.scala b/modules/restserver/src/main/scala/docspell/restserver/Subscriptions.scala index faecac11..e1cafa20 100644 --- a/modules/restserver/src/main/scala/docspell/restserver/Subscriptions.scala +++ b/modules/restserver/src/main/scala/docspell/restserver/Subscriptions.scala @@ -9,10 +9,9 @@ package docspell.restserver import cats.effect.Async import fs2.Stream import fs2.concurrent.Topic - -import docspell.backend.msg.{JobDone, JobSubmitted} import docspell.pubsub.api.PubSubT import docspell.restserver.ws.OutputEvent +import docspell.scheduler.msg.{JobDone, JobSubmitted} /** Subscribes to those events from docspell that are forwarded to the websocket endpoints */ diff --git a/modules/store/src/main/scala/docspell/store/queue/JobQueue.scala b/modules/scheduler/api/src/main/scala/docspell/scheduler/JobQueue.scala similarity index 95% rename from modules/store/src/main/scala/docspell/store/queue/JobQueue.scala rename to modules/scheduler/api/src/main/scala/docspell/scheduler/JobQueue.scala index 9b777086..dd927bf9 100644 --- a/modules/store/src/main/scala/docspell/store/queue/JobQueue.scala +++ b/modules/scheduler/api/src/main/scala/docspell/scheduler/JobQueue.scala @@ -4,11 +4,10 @@ * SPDX-License-Identifier: AGPL-3.0-or-later */ -package docspell.store.queue +package docspell.scheduler import cats.effect._ import cats.implicits._ - import docspell.common._ import docspell.store.Store import docspell.store.queries.QJob @@ -40,7 +39,7 @@ trait JobQueue[F[_]] { } object JobQueue { - def apply[F[_]: Async](store: Store[F]): Resource[F, JobQueue[F]] = + private[scheduler] def create[F[_]: Async](store: Store[F]): Resource[F, JobQueue[F]] = Resource.pure[F, JobQueue[F]](new JobQueue[F] { private[this] val logger = docspell.logging.getLogger[F] diff --git a/modules/scheduler/api/src/main/scala/docspell/scheduler/PeriodicScheduler.scala b/modules/scheduler/api/src/main/scala/docspell/scheduler/PeriodicScheduler.scala index 284a9465..3a633a88 100644 --- a/modules/scheduler/api/src/main/scala/docspell/scheduler/PeriodicScheduler.scala +++ b/modules/scheduler/api/src/main/scala/docspell/scheduler/PeriodicScheduler.scala @@ -27,6 +27,7 @@ trait PeriodicScheduler[F[_]] { def periodicAwake: F[Fiber[F, Throwable, Unit]] def notifyChange: F[Unit] -} -object PeriodicScheduler {} + /** Starts listening for notify messages in the background. */ + def startSubscriptions: F[Unit] +} diff --git a/modules/scheduler/api/src/main/scala/docspell/scheduler/Scheduler.scala b/modules/scheduler/api/src/main/scala/docspell/scheduler/Scheduler.scala index 720214c7..0e44244b 100644 --- a/modules/scheduler/api/src/main/scala/docspell/scheduler/Scheduler.scala +++ b/modules/scheduler/api/src/main/scala/docspell/scheduler/Scheduler.scala @@ -22,6 +22,10 @@ trait Scheduler[F[_]] { def notifyChange: F[Unit] + /** Starts reacting on notify and cancel messages. */ + def startSubscriptions: F[Unit] + + /** Starts the schedulers main loop. */ def start: Stream[F, Nothing] /** Requests to shutdown the scheduler. diff --git a/modules/backend/src/main/scala/docspell/backend/msg/CancelJob.scala b/modules/scheduler/api/src/main/scala/docspell/scheduler/msg/CancelJob.scala similarity index 95% rename from modules/backend/src/main/scala/docspell/backend/msg/CancelJob.scala rename to modules/scheduler/api/src/main/scala/docspell/scheduler/msg/CancelJob.scala index 677f1fba..3ec0c891 100644 --- a/modules/backend/src/main/scala/docspell/backend/msg/CancelJob.scala +++ b/modules/scheduler/api/src/main/scala/docspell/scheduler/msg/CancelJob.scala @@ -4,11 +4,10 @@ * SPDX-License-Identifier: AGPL-3.0-or-later */ -package docspell.backend.msg +package docspell.scheduler.msg import docspell.common._ import docspell.pubsub.api.{Topic, TypedTopic} - import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder} import io.circe.{Decoder, Encoder} diff --git a/modules/backend/src/main/scala/docspell/backend/msg/JobQueuePublish.scala b/modules/scheduler/api/src/main/scala/docspell/scheduler/msg/JobQueuePublish.scala similarity index 89% rename from modules/backend/src/main/scala/docspell/backend/msg/JobQueuePublish.scala rename to modules/scheduler/api/src/main/scala/docspell/scheduler/msg/JobQueuePublish.scala index d4011943..7c867a22 100644 --- a/modules/backend/src/main/scala/docspell/backend/msg/JobQueuePublish.scala +++ b/modules/scheduler/api/src/main/scala/docspell/scheduler/msg/JobQueuePublish.scala @@ -4,17 +4,15 @@ * SPDX-License-Identifier: AGPL-3.0-or-later */ -package docspell.backend.msg +package docspell.scheduler.msg import cats.effect._ import cats.implicits._ - import docspell.common.{Duration, Ident, Priority} -import docspell.notification.api.Event -import docspell.notification.api.EventSink +import docspell.notification.api.{Event, EventSink} import docspell.pubsub.api.PubSubT +import docspell.scheduler.JobQueue import docspell.store.Store -import docspell.store.queue.JobQueue import docspell.store.records.RJob final class JobQueuePublish[F[_]: Sync]( @@ -76,5 +74,5 @@ object JobQueuePublish { pubSub: PubSubT[F], eventSink: EventSink[F] ): Resource[F, JobQueue[F]] = - JobQueue(store).map(q => new JobQueuePublish[F](q, pubSub, eventSink)) + JobQueue.create(store).map(q => new JobQueuePublish[F](q, pubSub, eventSink)) } diff --git a/modules/backend/src/main/scala/docspell/backend/msg/JobSubmitted.scala b/modules/scheduler/api/src/main/scala/docspell/scheduler/msg/JobSubmitted.scala similarity index 95% rename from modules/backend/src/main/scala/docspell/backend/msg/JobSubmitted.scala rename to modules/scheduler/api/src/main/scala/docspell/scheduler/msg/JobSubmitted.scala index 2cecdde8..c6ef40b2 100644 --- a/modules/backend/src/main/scala/docspell/backend/msg/JobSubmitted.scala +++ b/modules/scheduler/api/src/main/scala/docspell/scheduler/msg/JobSubmitted.scala @@ -4,10 +4,10 @@ * SPDX-License-Identifier: AGPL-3.0-or-later */ -package docspell.backend.msg +package docspell.scheduler.msg + import docspell.common._ import docspell.pubsub.api.{Topic, TypedTopic} - import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder} import io.circe.{Decoder, Encoder} diff --git a/modules/scheduler/api/src/main/scala/docspell/scheduler/msg/JobsNotify.scala b/modules/scheduler/api/src/main/scala/docspell/scheduler/msg/JobsNotify.scala new file mode 100644 index 00000000..8bc9fc8d --- /dev/null +++ b/modules/scheduler/api/src/main/scala/docspell/scheduler/msg/JobsNotify.scala @@ -0,0 +1,9 @@ +package docspell.scheduler.msg + +import docspell.pubsub.api.{Topic, TypedTopic} + +/** A generic notification to the job executors to look for new work. */ +object JobsNotify { + def apply(): TypedTopic[Unit] = + TypedTopic[Unit](Topic("jobs-notify")) +} diff --git a/modules/scheduler/api/src/main/scala/docspell/scheduler/msg/PeriodicTaskNotify.scala b/modules/scheduler/api/src/main/scala/docspell/scheduler/msg/PeriodicTaskNotify.scala new file mode 100644 index 00000000..d8f5157e --- /dev/null +++ b/modules/scheduler/api/src/main/scala/docspell/scheduler/msg/PeriodicTaskNotify.scala @@ -0,0 +1,9 @@ +package docspell.scheduler.msg + +import docspell.pubsub.api.{Topic, TypedTopic} + +/** A generic notification to the periodic task scheduler to look for new work. */ +object PeriodicTaskNotify { + def apply(): TypedTopic[Unit] = + TypedTopic[Unit](Topic("periodic-task-notify")) +} diff --git a/modules/store/src/main/scala/docspell/store/queue/Marked.scala b/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/Marked.scala similarity index 93% rename from modules/store/src/main/scala/docspell/store/queue/Marked.scala rename to modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/Marked.scala index af7f32db..27e9eb7d 100644 --- a/modules/store/src/main/scala/docspell/store/queue/Marked.scala +++ b/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/Marked.scala @@ -4,7 +4,7 @@ * SPDX-License-Identifier: AGPL-3.0-or-later */ -package docspell.store.queue +package docspell.scheduler.impl sealed trait Marked[+A] {} diff --git a/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/PeriodicSchedulerBuilder.scala b/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/PeriodicSchedulerBuilder.scala index 5fbaada2..f4d1d7ec 100644 --- a/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/PeriodicSchedulerBuilder.scala +++ b/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/PeriodicSchedulerBuilder.scala @@ -1,8 +1,8 @@ package docspell.scheduler.impl import cats.effect._ +import docspell.pubsub.api.PubSubT import docspell.scheduler._ -import docspell.store.queue.{JobQueue, PeriodicTaskStore} import fs2.concurrent.SignallingRef object PeriodicSchedulerBuilder { @@ -12,7 +12,7 @@ object PeriodicSchedulerBuilder { sch: Scheduler[F], queue: JobQueue[F], store: PeriodicTaskStore[F], - notifyJoex: F[Unit] + pubsub: PubSubT[F] ): Resource[F, PeriodicScheduler[F]] = for { waiter <- Resource.eval(SignallingRef(true)) @@ -22,7 +22,7 @@ object PeriodicSchedulerBuilder { sch, queue, store, - notifyJoex, + pubsub, waiter, state ) diff --git a/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/PeriodicSchedulerImpl.scala b/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/PeriodicSchedulerImpl.scala index efdefeaf..fd1a3e0d 100644 --- a/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/PeriodicSchedulerImpl.scala +++ b/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/PeriodicSchedulerImpl.scala @@ -10,13 +10,12 @@ import cats.effect._ import cats.implicits._ import fs2._ import fs2.concurrent.SignallingRef - import docspell.common._ +import docspell.pubsub.api.PubSubT import docspell.scheduler._ import docspell.scheduler.impl.PeriodicSchedulerImpl.State -import docspell.store.queue._ +import docspell.scheduler.msg.{JobsNotify, PeriodicTaskNotify} import docspell.store.records.RPeriodicTask - import eu.timepit.fs2cron.calev.CalevScheduler final class PeriodicSchedulerImpl[F[_]: Async]( @@ -24,7 +23,7 @@ final class PeriodicSchedulerImpl[F[_]: Async]( sch: Scheduler[F], queue: JobQueue[F], store: PeriodicTaskStore[F], - joexNotifyAll: F[Unit], + pubSub: PubSubT[F], waiter: SignallingRef[F, Boolean], state: SignallingRef[F, State[F]] ) extends PeriodicScheduler[F] { @@ -49,6 +48,13 @@ final class PeriodicSchedulerImpl[F[_]: Async]( def notifyChange: F[Unit] = waiter.update(b => !b) + def startSubscriptions: F[Unit] = + for { + _ <- Async[F].start(pubSub.subscribeSink(PeriodicTaskNotify()) { _ => + logger.info("Notify periodic scheduler from message") *> notifyChange + }) + } yield () + // internal /** On startup, get all periodic jobs from this scheduler and remove the mark, so they @@ -117,7 +123,7 @@ final class PeriodicSchedulerImpl[F[_]: Async]( } def notifyJoex: F[Unit] = - sch.notifyChange *> joexNotifyAll + sch.notifyChange *> pubSub.publish1IgnoreErrors(JobsNotify(), ()).void def scheduleNotify(pj: RPeriodicTask): F[Unit] = Timestamp diff --git a/modules/store/src/main/scala/docspell/store/queue/PeriodicTaskStore.scala b/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/PeriodicTaskStore.scala similarity index 97% rename from modules/store/src/main/scala/docspell/store/queue/PeriodicTaskStore.scala rename to modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/PeriodicTaskStore.scala index f1fad91f..54846b1f 100644 --- a/modules/store/src/main/scala/docspell/store/queue/PeriodicTaskStore.scala +++ b/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/PeriodicTaskStore.scala @@ -4,11 +4,10 @@ * SPDX-License-Identifier: AGPL-3.0-or-later */ -package docspell.store.queue +package docspell.scheduler.impl import cats.effect._ import cats.implicits._ - import docspell.common._ import docspell.store.queries.QPeriodicTask import docspell.store.records._ @@ -57,7 +56,7 @@ object PeriodicTaskStore { case false => Marked.notMarkable } case None => - Marked.notFound.pure[F] + Marked.notFound[RPeriodicTask].pure[F] } Resource.make(chooseNext) { diff --git a/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/SchedulerBuilder.scala b/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/SchedulerBuilder.scala index 4e51231b..6cd3cc57 100644 --- a/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/SchedulerBuilder.scala +++ b/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/SchedulerBuilder.scala @@ -10,12 +10,10 @@ import cats.effect._ import cats.effect.std.Semaphore import cats.implicits._ import fs2.concurrent.SignallingRef - -import docspell.scheduler._ +import docspell.scheduler.{JobQueue, _} import docspell.notification.api.EventSink import docspell.pubsub.api.PubSubT import docspell.store.Store -import docspell.store.queue.JobQueue case class SchedulerBuilder[F[_]: Async]( config: SchedulerConfig, @@ -88,7 +86,7 @@ object SchedulerBuilder { config, JobTaskRegistry.empty[F], store, - JobQueue(store), + JobQueue.create(store), LogSink.db[F](store), PubSubT.noop[F], EventSink.silent[F] diff --git a/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/SchedulerImpl.scala b/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/SchedulerImpl.scala index d0de5991..859cbde2 100644 --- a/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/SchedulerImpl.scala +++ b/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/SchedulerImpl.scala @@ -12,19 +12,16 @@ import cats.effect.std.Semaphore import cats.implicits._ import fs2.Stream import fs2.concurrent.SignallingRef - -import docspell.scheduler.msg.JobDone +import docspell.scheduler.msg.{CancelJob, JobDone, JobsNotify} import docspell.common._ -import docspell.scheduler._ +import docspell.scheduler.{JobQueue, _} import docspell.scheduler.impl.SchedulerImpl._ import docspell.notification.api.Event import docspell.notification.api.EventSink import docspell.pubsub.api.PubSubT import docspell.store.Store import docspell.store.queries.QJob -import docspell.store.queue.JobQueue import docspell.store.records.RJob - import io.circe.Json final class SchedulerImpl[F[_]: Async]( @@ -42,6 +39,16 @@ final class SchedulerImpl[F[_]: Async]( private[this] val logger = docspell.logging.getLogger[F] + def startSubscriptions = + for { + _ <- Async[F].start(pubSub.subscribeSink(JobsNotify()) { _ => + notifyChange + }) + _ <- Async[F].start(pubSub.subscribeSink(CancelJob.topic) { msg => + requestCancel(msg.body.jobId).void + }) + } yield () + /** On startup, get all jobs in state running from this scheduler and put them into * waiting state, so they get picked up again. */ diff --git a/modules/store/src/main/scala/docspell/store/usertask/UserTaskStore.scala b/modules/store/src/main/scala/docspell/store/usertask/UserTaskStore.scala index 96a08351..e19fa035 100644 --- a/modules/store/src/main/scala/docspell/store/usertask/UserTaskStore.scala +++ b/modules/store/src/main/scala/docspell/store/usertask/UserTaskStore.scala @@ -89,7 +89,7 @@ trait UserTaskStore[F[_]] { implicit E: Encoder[A] ): F[UserTask[String]] - /** Delete all tasks of the given user that have name `name'. */ + /** Delete all tasks of the given user that have name `name`. */ def deleteAll(scope: UserTaskScope, name: Ident): F[Int] }