diff --git a/modules/backend/src/main/scala/docspell/backend/msg/JobDone.scala b/modules/backend/src/main/scala/docspell/backend/msg/JobDone.scala deleted file mode 100644 index 0ac701c3..00000000 --- a/modules/backend/src/main/scala/docspell/backend/msg/JobDone.scala +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Copyright 2020 Eike K. & Contributors - * - * SPDX-License-Identifier: AGPL-3.0-or-later - */ - -package docspell.backend.msg - -import docspell.common._ -import docspell.pubsub.api.{Topic, TypedTopic} - -import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder} -import io.circe.{Decoder, Encoder} - -/** Message to notify about finished jobs. They have a final state. */ -final case class JobDone( - jobId: Ident, - group: Ident, - task: Ident, - args: String, - state: JobState -) -object JobDone { - implicit val jsonDecoder: Decoder[JobDone] = - deriveDecoder[JobDone] - - implicit val jsonEncoder: Encoder[JobDone] = - deriveEncoder[JobDone] - - val topic: TypedTopic[JobDone] = - TypedTopic(Topic("job-finished")) -} diff --git a/modules/backend/src/main/scala/docspell/backend/msg/Topics.scala b/modules/backend/src/main/scala/docspell/backend/msg/Topics.scala index ae53d9f6..bda187d1 100644 --- a/modules/backend/src/main/scala/docspell/backend/msg/Topics.scala +++ b/modules/backend/src/main/scala/docspell/backend/msg/Topics.scala @@ -7,8 +7,8 @@ package docspell.backend.msg import cats.data.NonEmptyList - import docspell.pubsub.api.{Topic, TypedTopic} +import docspell.scheduler.msg.JobDone /** All topics used in Docspell. */ object Topics { diff --git a/modules/joex/src/main/scala/docspell/joex/Config.scala b/modules/joex/src/main/scala/docspell/joex/Config.scala index 549b24ca..0d0945d6 100644 --- a/modules/joex/src/main/scala/docspell/joex/Config.scala +++ b/modules/joex/src/main/scala/docspell/joex/Config.scala @@ -19,7 +19,7 @@ import docspell.ftssolr.SolrConfig import docspell.joex.analysis.RegexNerFile import docspell.joex.hk.HouseKeepingConfig import docspell.joex.routes.InternalHeader -import docspell.joex.scheduler.{PeriodicSchedulerConfig, SchedulerConfig} +import docspell.scheduler.{PeriodicSchedulerConfig, SchedulerConfig} import docspell.joex.updatecheck.UpdateCheckConfig import docspell.logging.LogConfig import docspell.pubsub.naive.PubSubConfig diff --git a/modules/joex/src/main/scala/docspell/joex/ConfigFile.scala b/modules/joex/src/main/scala/docspell/joex/ConfigFile.scala index a17d94a3..ec5e70da 100644 --- a/modules/joex/src/main/scala/docspell/joex/ConfigFile.scala +++ b/modules/joex/src/main/scala/docspell/joex/ConfigFile.scala @@ -10,7 +10,7 @@ import cats.effect.Async import docspell.config.Implicits._ import docspell.config.{ConfigFactory, Validation} -import docspell.joex.scheduler.CountingScheme +import docspell.scheduler.CountingScheme import emil.MailAddress import emil.javamail.syntax._ diff --git a/modules/joex/src/main/scala/docspell/joex/JoexApp.scala b/modules/joex/src/main/scala/docspell/joex/JoexApp.scala index 4f096f02..7fe53412 100644 --- a/modules/joex/src/main/scala/docspell/joex/JoexApp.scala +++ b/modules/joex/src/main/scala/docspell/joex/JoexApp.scala @@ -7,7 +7,7 @@ package docspell.joex import docspell.common.Ident -import docspell.joex.scheduler.{PeriodicScheduler, Scheduler} +import docspell.scheduler.{PeriodicScheduler, Scheduler} import docspell.store.records.RJobLog trait JoexApp[F[_]] { diff --git a/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala b/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala index ce34e43c..7ba81536 100644 --- a/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala +++ b/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala @@ -9,7 +9,6 @@ package docspell.joex import cats.effect._ import cats.implicits._ import fs2.concurrent.SignallingRef - import docspell.analysis.TextAnalyser import docspell.backend.MailAddressCodec import docspell.backend.fulltext.CreateIndex @@ -32,7 +31,8 @@ import docspell.joex.preview._ import docspell.joex.process.ItemHandler import docspell.joex.process.ReProcessItem import docspell.joex.scanmailbox._ -import docspell.joex.scheduler._ +import docspell.scheduler._ +import docspell.scheduler.impl.{PeriodicSchedulerBuilder, SchedulerBuilder} import docspell.joex.updatecheck._ import docspell.notification.api.NotificationModule import docspell.notification.impl.NotificationModuleImpl @@ -42,7 +42,6 @@ import docspell.store.queue._ import docspell.store.records.{REmptyTrashSetting, RJobLog} import docspell.store.usertask.UserTaskScope import docspell.store.usertask.UserTaskStore - import emil.javamail._ import org.http4s.client.Client @@ -296,12 +295,12 @@ object JoexAppImpl extends MailAddressCodec { ) ) .resource - psch <- PeriodicScheduler.create( + psch <- PeriodicSchedulerBuilder.build( cfg.periodicScheduler, sch, queue, pstore, - joex + joex.notifyAllNodes ) app = new JoexAppImpl( cfg, diff --git a/modules/joex/src/main/scala/docspell/joex/emptytrash/EmptyTrashTask.scala b/modules/joex/src/main/scala/docspell/joex/emptytrash/EmptyTrashTask.scala index 5b1cbd3f..28a834c7 100644 --- a/modules/joex/src/main/scala/docspell/joex/emptytrash/EmptyTrashTask.scala +++ b/modules/joex/src/main/scala/docspell/joex/emptytrash/EmptyTrashTask.scala @@ -12,7 +12,7 @@ import fs2.Stream import docspell.backend.ops.{OItem, OItemSearch} import docspell.common._ -import docspell.joex.scheduler._ +import docspell.scheduler._ import docspell.store.records.RItem import docspell.store.usertask.UserTask diff --git a/modules/joex/src/main/scala/docspell/joex/filecopy/FileCopyTask.scala b/modules/joex/src/main/scala/docspell/joex/filecopy/FileCopyTask.scala index 67be2980..2f201363 100644 --- a/modules/joex/src/main/scala/docspell/joex/filecopy/FileCopyTask.scala +++ b/modules/joex/src/main/scala/docspell/joex/filecopy/FileCopyTask.scala @@ -12,7 +12,7 @@ import cats.implicits._ import docspell.common.FileCopyTaskArgs.Selection import docspell.common.{FileCopyTaskArgs, Ident} import docspell.joex.Config -import docspell.joex.scheduler.{JobTaskResultEncoder, Task} +import docspell.scheduler.{JobTaskResultEncoder, Task} import docspell.logging.Logger import docspell.store.file.{BinnyUtils, FileRepository, FileRepositoryConfig} import binny.CopyTool.Counter diff --git a/modules/joex/src/main/scala/docspell/joex/filecopy/FileIntegrityCheckTask.scala b/modules/joex/src/main/scala/docspell/joex/filecopy/FileIntegrityCheckTask.scala index 0b64f131..abd4d01b 100644 --- a/modules/joex/src/main/scala/docspell/joex/filecopy/FileIntegrityCheckTask.scala +++ b/modules/joex/src/main/scala/docspell/joex/filecopy/FileIntegrityCheckTask.scala @@ -12,7 +12,7 @@ import cats.implicits._ import docspell.backend.ops.OFileRepository import docspell.backend.ops.OFileRepository.IntegrityResult import docspell.common.{FileIntegrityCheckArgs, FileKey} -import docspell.joex.scheduler.{JobTaskResultEncoder, Task} +import docspell.scheduler.{JobTaskResultEncoder, Task} import docspell.store.records.RFileMeta import io.circe.Encoder import io.circe.generic.semiauto.deriveEncoder diff --git a/modules/joex/src/main/scala/docspell/joex/fts/FtsContext.scala b/modules/joex/src/main/scala/docspell/joex/fts/FtsContext.scala index 8e7f133b..41008a1d 100644 --- a/modules/joex/src/main/scala/docspell/joex/fts/FtsContext.scala +++ b/modules/joex/src/main/scala/docspell/joex/fts/FtsContext.scala @@ -9,7 +9,7 @@ package docspell.joex.fts import docspell.backend.fulltext.CreateIndex import docspell.ftsclient.FtsClient import docspell.joex.Config -import docspell.joex.scheduler.Context +import docspell.scheduler.Context import docspell.logging.Logger import docspell.store.Store diff --git a/modules/joex/src/main/scala/docspell/joex/fts/FtsWork.scala b/modules/joex/src/main/scala/docspell/joex/fts/FtsWork.scala index 1184c0a3..1a0a256f 100644 --- a/modules/joex/src/main/scala/docspell/joex/fts/FtsWork.scala +++ b/modules/joex/src/main/scala/docspell/joex/fts/FtsWork.scala @@ -14,7 +14,7 @@ import docspell.backend.fulltext.CreateIndex import docspell.common._ import docspell.ftsclient._ import docspell.joex.Config -import docspell.joex.scheduler.Context +import docspell.scheduler.Context import docspell.logging.Logger object FtsWork { diff --git a/modules/joex/src/main/scala/docspell/joex/fts/MigrationTask.scala b/modules/joex/src/main/scala/docspell/joex/fts/MigrationTask.scala index dee2fc1f..707b457b 100644 --- a/modules/joex/src/main/scala/docspell/joex/fts/MigrationTask.scala +++ b/modules/joex/src/main/scala/docspell/joex/fts/MigrationTask.scala @@ -13,7 +13,7 @@ import docspell.backend.fulltext.CreateIndex import docspell.common._ import docspell.ftsclient._ import docspell.joex.Config -import docspell.joex.scheduler.Task +import docspell.scheduler.Task import docspell.store.records.RJob object MigrationTask { diff --git a/modules/joex/src/main/scala/docspell/joex/fts/ReIndexTask.scala b/modules/joex/src/main/scala/docspell/joex/fts/ReIndexTask.scala index 7c7f81e0..d9c0e832 100644 --- a/modules/joex/src/main/scala/docspell/joex/fts/ReIndexTask.scala +++ b/modules/joex/src/main/scala/docspell/joex/fts/ReIndexTask.scala @@ -14,7 +14,7 @@ import docspell.common._ import docspell.ftsclient._ import docspell.joex.Config import docspell.joex.fts.FtsWork.syntax._ -import docspell.joex.scheduler.Task +import docspell.scheduler.Task object ReIndexTask { type Args = ReIndexTaskArgs diff --git a/modules/joex/src/main/scala/docspell/joex/hk/CheckNodesTask.scala b/modules/joex/src/main/scala/docspell/joex/hk/CheckNodesTask.scala index 09651893..aff4a5d3 100644 --- a/modules/joex/src/main/scala/docspell/joex/hk/CheckNodesTask.scala +++ b/modules/joex/src/main/scala/docspell/joex/hk/CheckNodesTask.scala @@ -10,7 +10,7 @@ import cats.effect._ import cats.implicits._ import docspell.common._ -import docspell.joex.scheduler.{Context, Task} +import docspell.scheduler.{Context, Task} import docspell.logging.Logger import docspell.store.records._ diff --git a/modules/joex/src/main/scala/docspell/joex/hk/CleanupInvitesTask.scala b/modules/joex/src/main/scala/docspell/joex/hk/CleanupInvitesTask.scala index de23eb71..e0cf916c 100644 --- a/modules/joex/src/main/scala/docspell/joex/hk/CleanupInvitesTask.scala +++ b/modules/joex/src/main/scala/docspell/joex/hk/CleanupInvitesTask.scala @@ -10,7 +10,7 @@ import cats.effect._ import cats.implicits._ import docspell.common._ -import docspell.joex.scheduler.Task +import docspell.scheduler.Task import docspell.store.records._ object CleanupInvitesTask { diff --git a/modules/joex/src/main/scala/docspell/joex/hk/CleanupJobsTask.scala b/modules/joex/src/main/scala/docspell/joex/hk/CleanupJobsTask.scala index 67a1e1a2..104ae306 100644 --- a/modules/joex/src/main/scala/docspell/joex/hk/CleanupJobsTask.scala +++ b/modules/joex/src/main/scala/docspell/joex/hk/CleanupJobsTask.scala @@ -11,7 +11,7 @@ import cats.implicits._ import fs2.Stream import docspell.common._ -import docspell.joex.scheduler.Task +import docspell.scheduler.Task import docspell.store.Store import docspell.store.records._ diff --git a/modules/joex/src/main/scala/docspell/joex/hk/CleanupRememberMeTask.scala b/modules/joex/src/main/scala/docspell/joex/hk/CleanupRememberMeTask.scala index 120b8cc8..e1040e1c 100644 --- a/modules/joex/src/main/scala/docspell/joex/hk/CleanupRememberMeTask.scala +++ b/modules/joex/src/main/scala/docspell/joex/hk/CleanupRememberMeTask.scala @@ -10,7 +10,7 @@ import cats.effect._ import cats.implicits._ import docspell.common._ -import docspell.joex.scheduler.Task +import docspell.scheduler.Task import docspell.store.records._ object CleanupRememberMeTask { diff --git a/modules/joex/src/main/scala/docspell/joex/hk/HouseKeepingTask.scala b/modules/joex/src/main/scala/docspell/joex/hk/HouseKeepingTask.scala index c0c3d084..206b7c29 100644 --- a/modules/joex/src/main/scala/docspell/joex/hk/HouseKeepingTask.scala +++ b/modules/joex/src/main/scala/docspell/joex/hk/HouseKeepingTask.scala @@ -13,7 +13,7 @@ import docspell.backend.ops.OFileRepository import docspell.common._ import docspell.joex.Config import docspell.joex.filecopy.FileIntegrityCheckTask -import docspell.joex.scheduler.{JobTaskResultEncoder, Task} +import docspell.scheduler.{JobTaskResultEncoder, Task} import docspell.store.records._ import docspell.store.usertask.UserTaskScope diff --git a/modules/joex/src/main/scala/docspell/joex/hk/IntegrityCheckTask.scala b/modules/joex/src/main/scala/docspell/joex/hk/IntegrityCheckTask.scala index 42bb3333..b9b32ccb 100644 --- a/modules/joex/src/main/scala/docspell/joex/hk/IntegrityCheckTask.scala +++ b/modules/joex/src/main/scala/docspell/joex/hk/IntegrityCheckTask.scala @@ -12,7 +12,7 @@ import cats.implicits._ import docspell.backend.ops.OFileRepository import docspell.common._ import docspell.joex.filecopy.FileIntegrityCheckTask -import docspell.joex.scheduler.Task +import docspell.scheduler.Task object IntegrityCheckTask { diff --git a/modules/joex/src/main/scala/docspell/joex/learn/LearnClassifierTask.scala b/modules/joex/src/main/scala/docspell/joex/learn/LearnClassifierTask.scala index 129afc5a..60fb01e3 100644 --- a/modules/joex/src/main/scala/docspell/joex/learn/LearnClassifierTask.scala +++ b/modules/joex/src/main/scala/docspell/joex/learn/LearnClassifierTask.scala @@ -14,7 +14,7 @@ import docspell.analysis.TextAnalyser import docspell.backend.ops.OCollective import docspell.common._ import docspell.joex.Config -import docspell.joex.scheduler._ +import docspell.scheduler._ import docspell.logging.Logger import docspell.store.records.{RClassifierModel, RClassifierSetting} diff --git a/modules/joex/src/main/scala/docspell/joex/learn/LearnItemEntities.scala b/modules/joex/src/main/scala/docspell/joex/learn/LearnItemEntities.scala index 9fc1b502..cf8e7fe5 100644 --- a/modules/joex/src/main/scala/docspell/joex/learn/LearnItemEntities.scala +++ b/modules/joex/src/main/scala/docspell/joex/learn/LearnItemEntities.scala @@ -14,7 +14,7 @@ import fs2.Stream import docspell.analysis.TextAnalyser import docspell.analysis.classifier.TextClassifier.Data import docspell.common._ -import docspell.joex.scheduler._ +import docspell.scheduler._ object LearnItemEntities { def learnAll[F[_]: Async, A]( diff --git a/modules/joex/src/main/scala/docspell/joex/learn/LearnTags.scala b/modules/joex/src/main/scala/docspell/joex/learn/LearnTags.scala index 732a38af..f1460c19 100644 --- a/modules/joex/src/main/scala/docspell/joex/learn/LearnTags.scala +++ b/modules/joex/src/main/scala/docspell/joex/learn/LearnTags.scala @@ -12,7 +12,7 @@ import cats.implicits._ import docspell.analysis.TextAnalyser import docspell.common._ -import docspell.joex.scheduler._ +import docspell.scheduler._ import docspell.store.records.RClassifierSetting object LearnTags { diff --git a/modules/joex/src/main/scala/docspell/joex/learn/SelectItems.scala b/modules/joex/src/main/scala/docspell/joex/learn/SelectItems.scala index fa99354d..b25f28ce 100644 --- a/modules/joex/src/main/scala/docspell/joex/learn/SelectItems.scala +++ b/modules/joex/src/main/scala/docspell/joex/learn/SelectItems.scala @@ -10,7 +10,7 @@ import fs2.{Pipe, Stream} import docspell.analysis.classifier.TextClassifier.Data import docspell.common._ -import docspell.joex.scheduler.Context +import docspell.scheduler.Context import docspell.store.Store import docspell.store.qb.Batch import docspell.store.queries.{QItem, TextAndTag} diff --git a/modules/joex/src/main/scala/docspell/joex/learn/StoreClassifierModel.scala b/modules/joex/src/main/scala/docspell/joex/learn/StoreClassifierModel.scala index e0e7eabc..8c4feb9b 100644 --- a/modules/joex/src/main/scala/docspell/joex/learn/StoreClassifierModel.scala +++ b/modules/joex/src/main/scala/docspell/joex/learn/StoreClassifierModel.scala @@ -12,7 +12,7 @@ import fs2.io.file.Files import docspell.analysis.classifier.ClassifierModel import docspell.common._ -import docspell.joex.scheduler._ +import docspell.scheduler._ import docspell.logging.Logger import docspell.store.Store import docspell.store.records.RClassifierModel diff --git a/modules/joex/src/main/scala/docspell/joex/notify/PeriodicDueItemsTask.scala b/modules/joex/src/main/scala/docspell/joex/notify/PeriodicDueItemsTask.scala index 8ef5bc33..69361b69 100644 --- a/modules/joex/src/main/scala/docspell/joex/notify/PeriodicDueItemsTask.scala +++ b/modules/joex/src/main/scala/docspell/joex/notify/PeriodicDueItemsTask.scala @@ -12,8 +12,8 @@ import cats.implicits._ import docspell.backend.ops.ONotification import docspell.common._ -import docspell.joex.scheduler.Context -import docspell.joex.scheduler.Task +import docspell.scheduler.Context +import docspell.scheduler.Task import docspell.notification.api.EventContext import docspell.notification.api.NotificationChannel import docspell.notification.api.PeriodicDueItemsArgs diff --git a/modules/joex/src/main/scala/docspell/joex/notify/PeriodicQueryTask.scala b/modules/joex/src/main/scala/docspell/joex/notify/PeriodicQueryTask.scala index 088f65ed..8ed45a86 100644 --- a/modules/joex/src/main/scala/docspell/joex/notify/PeriodicQueryTask.scala +++ b/modules/joex/src/main/scala/docspell/joex/notify/PeriodicQueryTask.scala @@ -13,8 +13,8 @@ import cats.implicits._ import docspell.backend.ops.ONotification import docspell.common._ -import docspell.joex.scheduler.Context -import docspell.joex.scheduler.Task +import docspell.scheduler.Context +import docspell.scheduler.Task import docspell.notification.api.EventContext import docspell.notification.api.NotificationChannel import docspell.notification.api.PeriodicQueryArgs diff --git a/modules/joex/src/main/scala/docspell/joex/pagecount/AllPageCountTask.scala b/modules/joex/src/main/scala/docspell/joex/pagecount/AllPageCountTask.scala index c882d582..99b315b7 100644 --- a/modules/joex/src/main/scala/docspell/joex/pagecount/AllPageCountTask.scala +++ b/modules/joex/src/main/scala/docspell/joex/pagecount/AllPageCountTask.scala @@ -13,8 +13,8 @@ import fs2.{Chunk, Stream} import docspell.backend.JobFactory import docspell.backend.ops.OJoex import docspell.common._ -import docspell.joex.scheduler.Context -import docspell.joex.scheduler.Task +import docspell.scheduler.Context +import docspell.scheduler.Task import docspell.store.queue.JobQueue import docspell.store.records.RAttachment import docspell.store.records.RJob diff --git a/modules/joex/src/main/scala/docspell/joex/pagecount/PageCountTask.scala b/modules/joex/src/main/scala/docspell/joex/pagecount/PageCountTask.scala index fcbaaa79..5f4e73fa 100644 --- a/modules/joex/src/main/scala/docspell/joex/pagecount/PageCountTask.scala +++ b/modules/joex/src/main/scala/docspell/joex/pagecount/PageCountTask.scala @@ -11,8 +11,8 @@ import cats.implicits._ import docspell.common._ import docspell.joex.process.AttachmentPageCount -import docspell.joex.scheduler.Context -import docspell.joex.scheduler.Task +import docspell.scheduler.Context +import docspell.scheduler.Task import docspell.store.records.RAttachment import docspell.store.records.RAttachmentMeta diff --git a/modules/joex/src/main/scala/docspell/joex/pdfconv/ConvertAllPdfTask.scala b/modules/joex/src/main/scala/docspell/joex/pdfconv/ConvertAllPdfTask.scala index 1df9fcf4..e4def9ac 100644 --- a/modules/joex/src/main/scala/docspell/joex/pdfconv/ConvertAllPdfTask.scala +++ b/modules/joex/src/main/scala/docspell/joex/pdfconv/ConvertAllPdfTask.scala @@ -12,7 +12,7 @@ import fs2.{Chunk, Stream} import docspell.backend.ops.OJoex import docspell.common._ -import docspell.joex.scheduler.{Context, Task} +import docspell.scheduler.{Context, Task} import docspell.store.queue.JobQueue import docspell.store.records.RAttachment import docspell.store.records._ diff --git a/modules/joex/src/main/scala/docspell/joex/pdfconv/PdfConvTask.scala b/modules/joex/src/main/scala/docspell/joex/pdfconv/PdfConvTask.scala index ca82615a..47843a0b 100644 --- a/modules/joex/src/main/scala/docspell/joex/pdfconv/PdfConvTask.scala +++ b/modules/joex/src/main/scala/docspell/joex/pdfconv/PdfConvTask.scala @@ -16,7 +16,7 @@ import docspell.common._ import docspell.convert.ConversionResult import docspell.convert.extern.OcrMyPdf import docspell.joex.Config -import docspell.joex.scheduler.{Context, Task} +import docspell.scheduler.{Context, Task} import docspell.store.records._ import io.circe.generic.semiauto._ diff --git a/modules/joex/src/main/scala/docspell/joex/preview/AllPreviewsTask.scala b/modules/joex/src/main/scala/docspell/joex/preview/AllPreviewsTask.scala index 4021b67a..15616ed5 100644 --- a/modules/joex/src/main/scala/docspell/joex/preview/AllPreviewsTask.scala +++ b/modules/joex/src/main/scala/docspell/joex/preview/AllPreviewsTask.scala @@ -14,8 +14,8 @@ import docspell.backend.JobFactory import docspell.backend.ops.OJoex import docspell.common.MakePreviewArgs.StoreMode import docspell.common._ -import docspell.joex.scheduler.Context -import docspell.joex.scheduler.Task +import docspell.scheduler.Context +import docspell.scheduler.Task import docspell.store.queue.JobQueue import docspell.store.records.RAttachment import docspell.store.records.RJob diff --git a/modules/joex/src/main/scala/docspell/joex/preview/MakePreviewTask.scala b/modules/joex/src/main/scala/docspell/joex/preview/MakePreviewTask.scala index 14bf18f3..d20a6c75 100644 --- a/modules/joex/src/main/scala/docspell/joex/preview/MakePreviewTask.scala +++ b/modules/joex/src/main/scala/docspell/joex/preview/MakePreviewTask.scala @@ -13,8 +13,8 @@ import docspell.common._ import docspell.extract.pdfbox.PdfboxPreview import docspell.extract.pdfbox.PreviewConfig import docspell.joex.process.AttachmentPreview -import docspell.joex.scheduler.Context -import docspell.joex.scheduler.Task +import docspell.scheduler.Context +import docspell.scheduler.Task import docspell.store.records.RAttachment import docspell.store.records.RAttachmentPreview diff --git a/modules/joex/src/main/scala/docspell/joex/process/AttachmentPageCount.scala b/modules/joex/src/main/scala/docspell/joex/process/AttachmentPageCount.scala index cce9400d..f6444311 100644 --- a/modules/joex/src/main/scala/docspell/joex/process/AttachmentPageCount.scala +++ b/modules/joex/src/main/scala/docspell/joex/process/AttachmentPageCount.scala @@ -15,7 +15,7 @@ import fs2.Stream import docspell.common._ import docspell.extract.pdfbox.PdfMetaData import docspell.extract.pdfbox.PdfboxExtract -import docspell.joex.scheduler._ +import docspell.scheduler._ import docspell.store.records.RAttachment import docspell.store.records._ diff --git a/modules/joex/src/main/scala/docspell/joex/process/AttachmentPreview.scala b/modules/joex/src/main/scala/docspell/joex/process/AttachmentPreview.scala index 2a55775d..b15253da 100644 --- a/modules/joex/src/main/scala/docspell/joex/process/AttachmentPreview.scala +++ b/modules/joex/src/main/scala/docspell/joex/process/AttachmentPreview.scala @@ -15,7 +15,7 @@ import fs2.Stream import docspell.common._ import docspell.extract.pdfbox.PdfboxPreview import docspell.extract.pdfbox.PreviewConfig -import docspell.joex.scheduler._ +import docspell.scheduler._ import docspell.store.queries.QAttachment import docspell.store.records.RAttachment import docspell.store.records._ diff --git a/modules/joex/src/main/scala/docspell/joex/process/ConvertPdf.scala b/modules/joex/src/main/scala/docspell/joex/process/ConvertPdf.scala index 73754127..a7855808 100644 --- a/modules/joex/src/main/scala/docspell/joex/process/ConvertPdf.scala +++ b/modules/joex/src/main/scala/docspell/joex/process/ConvertPdf.scala @@ -17,7 +17,7 @@ import docspell.convert.ConversionResult.Handler import docspell.convert.SanitizeHtml import docspell.convert._ import docspell.joex.extract.JsoupSanitizer -import docspell.joex.scheduler._ +import docspell.scheduler._ import docspell.store.records._ /** Goes through all attachments and creates a PDF version of it where supported. diff --git a/modules/joex/src/main/scala/docspell/joex/process/CreateItem.scala b/modules/joex/src/main/scala/docspell/joex/process/CreateItem.scala index 092c8495..56fd9b2f 100644 --- a/modules/joex/src/main/scala/docspell/joex/process/CreateItem.scala +++ b/modules/joex/src/main/scala/docspell/joex/process/CreateItem.scala @@ -13,7 +13,7 @@ import cats.implicits._ import fs2.Stream import docspell.common._ -import docspell.joex.scheduler.{Context, Task} +import docspell.scheduler.{Context, Task} import docspell.store.file.FileMetadata import docspell.store.queries.QItem import docspell.store.records._ diff --git a/modules/joex/src/main/scala/docspell/joex/process/CrossCheckProposals.scala b/modules/joex/src/main/scala/docspell/joex/process/CrossCheckProposals.scala index 3cd9d3ad..e13b5da8 100644 --- a/modules/joex/src/main/scala/docspell/joex/process/CrossCheckProposals.scala +++ b/modules/joex/src/main/scala/docspell/joex/process/CrossCheckProposals.scala @@ -12,7 +12,7 @@ import cats.effect.Sync import cats.implicits._ import docspell.common._ -import docspell.joex.scheduler.Task +import docspell.scheduler.Task import docspell.logging.Logger /** After candidates have been determined, the set is reduced by doing some cross checks. diff --git a/modules/joex/src/main/scala/docspell/joex/process/DuplicateCheck.scala b/modules/joex/src/main/scala/docspell/joex/process/DuplicateCheck.scala index 41fcd0e3..6b07e949 100644 --- a/modules/joex/src/main/scala/docspell/joex/process/DuplicateCheck.scala +++ b/modules/joex/src/main/scala/docspell/joex/process/DuplicateCheck.scala @@ -10,7 +10,7 @@ import cats.effect._ import cats.implicits._ import docspell.common._ -import docspell.joex.scheduler.{Context, Task} +import docspell.scheduler.{Context, Task} import docspell.store.queries.QItem import docspell.store.records.RFileMeta import docspell.store.records.RJob diff --git a/modules/joex/src/main/scala/docspell/joex/process/EvalProposals.scala b/modules/joex/src/main/scala/docspell/joex/process/EvalProposals.scala index 38c129b1..2a9092a1 100644 --- a/modules/joex/src/main/scala/docspell/joex/process/EvalProposals.scala +++ b/modules/joex/src/main/scala/docspell/joex/process/EvalProposals.scala @@ -12,7 +12,7 @@ import cats.effect.Sync import cats.implicits._ import docspell.common._ -import docspell.joex.scheduler.{Context, Task} +import docspell.scheduler.{Context, Task} import docspell.store.records.{RAttachmentMeta, RPerson} /** Calculate weights for candidates that adds the most likely candidate a lower number. diff --git a/modules/joex/src/main/scala/docspell/joex/process/ExtractArchive.scala b/modules/joex/src/main/scala/docspell/joex/process/ExtractArchive.scala index 17f90b59..b3fb3bbf 100644 --- a/modules/joex/src/main/scala/docspell/joex/process/ExtractArchive.scala +++ b/modules/joex/src/main/scala/docspell/joex/process/ExtractArchive.scala @@ -18,7 +18,7 @@ import fs2.Stream import docspell.common._ import docspell.files.Zip import docspell.joex.mail._ -import docspell.joex.scheduler._ +import docspell.scheduler._ import docspell.store.records._ import emil.Mail diff --git a/modules/joex/src/main/scala/docspell/joex/process/FindProposal.scala b/modules/joex/src/main/scala/docspell/joex/process/FindProposal.scala index a6898f51..a4af8e48 100644 --- a/modules/joex/src/main/scala/docspell/joex/process/FindProposal.scala +++ b/modules/joex/src/main/scala/docspell/joex/process/FindProposal.scala @@ -16,7 +16,7 @@ import docspell.analysis.contact._ import docspell.common.MetaProposal.Candidate import docspell.common._ import docspell.joex.Config -import docspell.joex.scheduler.{Context, Task} +import docspell.scheduler.{Context, Task} import docspell.store.records._ /** Super simple approach to find corresponding meta data to an item by looking up values diff --git a/modules/joex/src/main/scala/docspell/joex/process/ItemData.scala b/modules/joex/src/main/scala/docspell/joex/process/ItemData.scala index 3b5ac5ab..c96c0189 100644 --- a/modules/joex/src/main/scala/docspell/joex/process/ItemData.scala +++ b/modules/joex/src/main/scala/docspell/joex/process/ItemData.scala @@ -8,7 +8,7 @@ package docspell.joex.process import docspell.common._ import docspell.joex.process.ItemData.AttachmentDates -import docspell.joex.scheduler.JobTaskResultEncoder +import docspell.scheduler.JobTaskResultEncoder import docspell.store.records.{RAttachment, RAttachmentMeta, RItem} import io.circe.syntax.EncoderOps diff --git a/modules/joex/src/main/scala/docspell/joex/process/ItemHandler.scala b/modules/joex/src/main/scala/docspell/joex/process/ItemHandler.scala index 5da1e6d6..21627f56 100644 --- a/modules/joex/src/main/scala/docspell/joex/process/ItemHandler.scala +++ b/modules/joex/src/main/scala/docspell/joex/process/ItemHandler.scala @@ -17,7 +17,7 @@ import docspell.common.{ItemState, ProcessItemArgs} import docspell.ftsclient.FtsClient import docspell.joex.Config import docspell.joex.analysis.RegexNerFile -import docspell.joex.scheduler.Task +import docspell.scheduler.Task import docspell.store.queries.QItem import docspell.store.records.RItem diff --git a/modules/joex/src/main/scala/docspell/joex/process/LinkProposal.scala b/modules/joex/src/main/scala/docspell/joex/process/LinkProposal.scala index 7f5418fc..2e85e450 100644 --- a/modules/joex/src/main/scala/docspell/joex/process/LinkProposal.scala +++ b/modules/joex/src/main/scala/docspell/joex/process/LinkProposal.scala @@ -11,7 +11,7 @@ import cats.effect.Sync import cats.implicits._ import docspell.common._ -import docspell.joex.scheduler.{Context, Task} +import docspell.scheduler.{Context, Task} import docspell.store.records.RItem object LinkProposal { diff --git a/modules/joex/src/main/scala/docspell/joex/process/ProcessItem.scala b/modules/joex/src/main/scala/docspell/joex/process/ProcessItem.scala index 735d13be..5918698c 100644 --- a/modules/joex/src/main/scala/docspell/joex/process/ProcessItem.scala +++ b/modules/joex/src/main/scala/docspell/joex/process/ProcessItem.scala @@ -15,7 +15,7 @@ import docspell.common.ProcessItemArgs import docspell.ftsclient.FtsClient import docspell.joex.Config import docspell.joex.analysis.RegexNerFile -import docspell.joex.scheduler.Task +import docspell.scheduler.Task object ProcessItem { diff --git a/modules/joex/src/main/scala/docspell/joex/process/ReProcessItem.scala b/modules/joex/src/main/scala/docspell/joex/process/ReProcessItem.scala index 15aa939d..834832a5 100644 --- a/modules/joex/src/main/scala/docspell/joex/process/ReProcessItem.scala +++ b/modules/joex/src/main/scala/docspell/joex/process/ReProcessItem.scala @@ -16,8 +16,8 @@ import docspell.common._ import docspell.ftsclient.FtsClient import docspell.joex.Config import docspell.joex.analysis.RegexNerFile -import docspell.joex.scheduler.Context -import docspell.joex.scheduler.Task +import docspell.scheduler.Context +import docspell.scheduler.Task import docspell.store.queries.QItem import docspell.store.records.RAttachment import docspell.store.records.RAttachmentSource diff --git a/modules/joex/src/main/scala/docspell/joex/process/RemoveEmptyItem.scala b/modules/joex/src/main/scala/docspell/joex/process/RemoveEmptyItem.scala index fadcb949..3d961937 100644 --- a/modules/joex/src/main/scala/docspell/joex/process/RemoveEmptyItem.scala +++ b/modules/joex/src/main/scala/docspell/joex/process/RemoveEmptyItem.scala @@ -11,7 +11,7 @@ import cats.implicits._ import docspell.backend.ops.OItem import docspell.common._ -import docspell.joex.scheduler.Task +import docspell.scheduler.Task object RemoveEmptyItem { diff --git a/modules/joex/src/main/scala/docspell/joex/process/SaveProposals.scala b/modules/joex/src/main/scala/docspell/joex/process/SaveProposals.scala index 18cbb933..cdc8627a 100644 --- a/modules/joex/src/main/scala/docspell/joex/process/SaveProposals.scala +++ b/modules/joex/src/main/scala/docspell/joex/process/SaveProposals.scala @@ -10,7 +10,7 @@ import cats.effect.Sync import cats.implicits._ import docspell.common._ -import docspell.joex.scheduler.{Context, Task} +import docspell.scheduler.{Context, Task} import docspell.store.AddResult import docspell.store.records._ diff --git a/modules/joex/src/main/scala/docspell/joex/process/SetGivenData.scala b/modules/joex/src/main/scala/docspell/joex/process/SetGivenData.scala index 54283101..0734d294 100644 --- a/modules/joex/src/main/scala/docspell/joex/process/SetGivenData.scala +++ b/modules/joex/src/main/scala/docspell/joex/process/SetGivenData.scala @@ -11,7 +11,7 @@ import cats.implicits._ import docspell.backend.ops.OItem import docspell.common._ -import docspell.joex.scheduler.Task +import docspell.scheduler.Task import docspell.store.UpdateResult object SetGivenData { diff --git a/modules/joex/src/main/scala/docspell/joex/process/TextAnalysis.scala b/modules/joex/src/main/scala/docspell/joex/process/TextAnalysis.scala index 00e410da..aced45f7 100644 --- a/modules/joex/src/main/scala/docspell/joex/process/TextAnalysis.scala +++ b/modules/joex/src/main/scala/docspell/joex/process/TextAnalysis.scala @@ -18,8 +18,8 @@ import docspell.joex.Config import docspell.joex.analysis.RegexNerFile import docspell.joex.learn.{ClassifierName, Classify, LearnClassifierTask} import docspell.joex.process.ItemData.AttachmentDates -import docspell.joex.scheduler.Context -import docspell.joex.scheduler.Task +import docspell.scheduler.Context +import docspell.scheduler.Task import docspell.store.records.{RAttachmentMeta, RClassifierSetting} object TextAnalysis { diff --git a/modules/joex/src/main/scala/docspell/joex/process/TextExtraction.scala b/modules/joex/src/main/scala/docspell/joex/process/TextExtraction.scala index 3f6be877..eaa02683 100644 --- a/modules/joex/src/main/scala/docspell/joex/process/TextExtraction.scala +++ b/modules/joex/src/main/scala/docspell/joex/process/TextExtraction.scala @@ -13,7 +13,7 @@ import cats.implicits._ import docspell.common._ import docspell.extract.{ExtractConfig, ExtractResult, Extraction} import docspell.ftsclient.{FtsClient, TextData} -import docspell.joex.scheduler.{Context, Task} +import docspell.scheduler.{Context, Task} import docspell.store.records.{RAttachment, RAttachmentMeta, RFileMeta} object TextExtraction { diff --git a/modules/joex/src/main/scala/docspell/joex/scanmailbox/ScanMailboxTask.scala b/modules/joex/src/main/scala/docspell/joex/scanmailbox/ScanMailboxTask.scala index 869362e3..363420b0 100644 --- a/modules/joex/src/main/scala/docspell/joex/scanmailbox/ScanMailboxTask.scala +++ b/modules/joex/src/main/scala/docspell/joex/scanmailbox/ScanMailboxTask.scala @@ -16,7 +16,7 @@ import fs2._ import docspell.backend.ops.{OJoex, OUpload} import docspell.common._ import docspell.joex.Config -import docspell.joex.scheduler.{Context, Task} +import docspell.scheduler.{Context, Task} import docspell.logging.Logger import docspell.store.queries.QOrganization import docspell.store.records._ diff --git a/modules/joex/src/main/scala/docspell/joex/scheduler/Context.scala b/modules/joex/src/main/scala/docspell/joex/scheduler/Context.scala deleted file mode 100644 index 2fb2a529..00000000 --- a/modules/joex/src/main/scala/docspell/joex/scheduler/Context.scala +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Copyright 2020 Eike K. & Contributors - * - * SPDX-License-Identifier: AGPL-3.0-or-later - */ - -package docspell.joex.scheduler - -import cats.effect._ -import cats.implicits._ -import cats.{Applicative, Functor} - -import docspell.common._ -import docspell.logging.Logger -import docspell.store.Store -import docspell.store.records.RJob - -trait Context[F[_], A] { self => - - def jobId: Ident - - def args: A - - def config: SchedulerConfig - - def logger: Logger[F] - - def setProgress(percent: Int): F[Unit] - - def store: Store[F] - - final def isLastRetry(implicit ev: Applicative[F]): F[Boolean] = - for { - current <- store.transact(RJob.getRetries(jobId)) - last = config.retries == current.getOrElse(0) - } yield last - - def map[C](f: A => C)(implicit F: Functor[F]): Context[F, C] = - new Context.ContextImpl[F, C](f(args), logger, store, config, jobId) -} - -object Context { - - def create[F[_]: Async, A]( - jobId: Ident, - arg: A, - config: SchedulerConfig, - log: Logger[F], - store: Store[F] - ): Context[F, A] = - new ContextImpl(arg, log, store, config, jobId) - - def apply[F[_]: Async, A]( - job: RJob, - arg: A, - config: SchedulerConfig, - logSink: LogSink[F], - store: Store[F] - ): F[Context[F, A]] = { - val log = docspell.logging.getLogger[F] - for { - _ <- log.trace("Creating logger for task run") - logger <- QueueLogger(job.id, job.info, config.logBufferSize, logSink) - _ <- log.trace("Logger created, instantiating context") - ctx = create[F, A](job.id, arg, config, logger, store) - } yield ctx - } - - final private class ContextImpl[F[_]: Functor, A]( - val args: A, - val logger: Logger[F], - val store: Store[F], - val config: SchedulerConfig, - val jobId: Ident - ) extends Context[F, A] { - - def setProgress(percent: Int): F[Unit] = { - val pval = math.min(100, math.max(0, percent)) - store.transact(RJob.setProgress(jobId, pval)).map(_ => ()) - } - } -} diff --git a/modules/joex/src/main/scala/docspell/joex/scheduler/CountingScheme.scala b/modules/joex/src/main/scala/docspell/joex/scheduler/CountingScheme.scala deleted file mode 100644 index 68890f57..00000000 --- a/modules/joex/src/main/scala/docspell/joex/scheduler/CountingScheme.scala +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Copyright 2020 Eike K. & Contributors - * - * SPDX-License-Identifier: AGPL-3.0-or-later - */ - -package docspell.joex.scheduler - -import cats.implicits._ - -import docspell.common.Priority - -/** A counting scheme to indicate a ratio between scheduling high and low priority jobs. - * - * For example high=4, low=1 means: ”schedule 4 high priority jobs and then 1 low - * priority job“. - */ -case class CountingScheme(high: Int, low: Int, counter: Int = 0) { - - def nextPriority: (CountingScheme, Priority) = - if (counter <= 0) (increment, Priority.High) - else { - val rest = counter % (high + low) - if (rest < high) (increment, Priority.High) - else (increment, Priority.Low) - } - - def increment: CountingScheme = - copy(counter = counter + 1) -} - -object CountingScheme { - - def writeString(cs: CountingScheme): String = - s"${cs.high},${cs.low}" - - def readString(str: String): Either[String, CountingScheme] = - str.split(',') match { - case Array(h, l) => - Either.catchNonFatal(CountingScheme(h.toInt, l.toInt)).left.map(_.getMessage) - case _ => - Left(s"Invalid counting scheme: $str") - } -} diff --git a/modules/joex/src/main/scala/docspell/joex/scheduler/JobTask.scala b/modules/joex/src/main/scala/docspell/joex/scheduler/JobTask.scala deleted file mode 100644 index aec269b2..00000000 --- a/modules/joex/src/main/scala/docspell/joex/scheduler/JobTask.scala +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Copyright 2020 Eike K. & Contributors - * - * SPDX-License-Identifier: AGPL-3.0-or-later - */ - -package docspell.joex.scheduler - -import cats.effect.Sync -import cats.implicits._ - -import docspell.common.Ident -import docspell.common.syntax.all._ - -import io.circe.Decoder - -/** Binds a Task to a name. This is required to lookup the code based on the taskName in - * the RJob data and to execute it given the arguments that have to be read from a - * string. - * - * Since the scheduler only has a string for the task argument, this only works for Task - * impls that accept a string. There is a convenience constructor that uses circe to - * decode json into some type A. - */ -case class JobTask[F[_]]( - name: Ident, - task: Task[F, String, JobTaskResult], - onCancel: Task[F, String, Unit] -) - -object JobTask { - - def json[F[_]: Sync, A, B]( - name: Ident, - task: Task[F, A, B], - onCancel: Task[F, A, Unit] - )(implicit - D: Decoder[A], - E: JobTaskResultEncoder[B] - ): JobTask[F] = { - val convert: String => F[A] = - str => - str.parseJsonAs[A] match { - case Right(a) => a.pure[F] - case Left(ex) => - Sync[F].raiseError(new Exception(s"Cannot parse task arguments: $str", ex)) - } - - JobTask(name, task.contramap(convert).map(E.encode), onCancel.contramap(convert)) - } -} diff --git a/modules/joex/src/main/scala/docspell/joex/scheduler/JobTaskRegistry.scala b/modules/joex/src/main/scala/docspell/joex/scheduler/JobTaskRegistry.scala deleted file mode 100644 index 0efb37b7..00000000 --- a/modules/joex/src/main/scala/docspell/joex/scheduler/JobTaskRegistry.scala +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Copyright 2020 Eike K. & Contributors - * - * SPDX-License-Identifier: AGPL-3.0-or-later - */ - -package docspell.joex.scheduler - -import docspell.common.Ident - -/** This is a mapping from some identifier to a task. This is used by the scheduler to - * lookup an implementation using the taskName field of the RJob database record. - */ -final class JobTaskRegistry[F[_]](tasks: Map[Ident, JobTask[F]]) { - - def withTask(task: JobTask[F]): JobTaskRegistry[F] = - JobTaskRegistry(tasks.updated(task.name, task)) - - def find(taskName: Ident): Option[JobTask[F]] = - tasks.get(taskName) -} - -object JobTaskRegistry { - - def apply[F[_]](map: Map[Ident, JobTask[F]]): JobTaskRegistry[F] = - new JobTaskRegistry[F](map) - - def empty[F[_]]: JobTaskRegistry[F] = apply(Map.empty) - -} diff --git a/modules/joex/src/main/scala/docspell/joex/scheduler/JobTaskResult.scala b/modules/joex/src/main/scala/docspell/joex/scheduler/JobTaskResult.scala deleted file mode 100644 index d735565e..00000000 --- a/modules/joex/src/main/scala/docspell/joex/scheduler/JobTaskResult.scala +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Copyright 2020 Eike K. & Contributors - * - * SPDX-License-Identifier: AGPL-3.0-or-later - */ - -package docspell.joex.scheduler - -import io.circe.Json - -final case class JobTaskResult(message: Option[String], json: Option[Json]) { - - def withMessage(m: String): JobTaskResult = - copy(message = Some(m)) - - def withJson(json: Json): JobTaskResult = - copy(json = Some(json)) -} - -object JobTaskResult { - - val empty: JobTaskResult = JobTaskResult(None, None) - - def message(msg: String): JobTaskResult = JobTaskResult(Some(msg), None) - - def json(json: Json): JobTaskResult = JobTaskResult(None, Some(json)) -} diff --git a/modules/joex/src/main/scala/docspell/joex/scheduler/JobTaskResultEncoder.scala b/modules/joex/src/main/scala/docspell/joex/scheduler/JobTaskResultEncoder.scala deleted file mode 100644 index 205d01ce..00000000 --- a/modules/joex/src/main/scala/docspell/joex/scheduler/JobTaskResultEncoder.scala +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Copyright 2020 Eike K. & Contributors - * - * SPDX-License-Identifier: AGPL-3.0-or-later - */ - -package docspell.joex.scheduler - -import docspell.joex.scheduler.JobTaskResultEncoder.instance - -import io.circe.Encoder - -trait JobTaskResultEncoder[A] { self => - def encode(a: A): JobTaskResult - - final def contramap[B](f: B => A): JobTaskResultEncoder[B] = - JobTaskResultEncoder.instance(b => self.encode(f(b))) - - final def map(f: JobTaskResult => JobTaskResult): JobTaskResultEncoder[A] = - instance(a => f(self.encode(a))) - - final def modify(f: (A, JobTaskResult) => JobTaskResult): JobTaskResultEncoder[A] = - instance(a => f(a, self.encode(a))) - - final def withMessage(f: A => String): JobTaskResultEncoder[A] = - modify((a, r) => r.withMessage(f(a))) -} - -object JobTaskResultEncoder { - - def apply[A](implicit v: JobTaskResultEncoder[A]): JobTaskResultEncoder[A] = v - - def instance[A](f: A => JobTaskResult): JobTaskResultEncoder[A] = - (a: A) => f(a) - - def fromJson[A: Encoder]: JobTaskResultEncoder[A] = - instance(a => JobTaskResult.json(Encoder[A].apply(a))) - - implicit val unitJobTaskResultEncoder: JobTaskResultEncoder[Unit] = - instance(_ => JobTaskResult.empty) - - implicit def optionJobTaskResultEncoder[A](implicit - ea: JobTaskResultEncoder[A] - ): JobTaskResultEncoder[Option[A]] = - instance { - case Some(a) => ea.encode(a) - case None => JobTaskResult.empty - } -} diff --git a/modules/joex/src/main/scala/docspell/joex/scheduler/LogEvent.scala b/modules/joex/src/main/scala/docspell/joex/scheduler/LogEvent.scala deleted file mode 100644 index 8edc966d..00000000 --- a/modules/joex/src/main/scala/docspell/joex/scheduler/LogEvent.scala +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Copyright 2020 Eike K. & Contributors - * - * SPDX-License-Identifier: AGPL-3.0-or-later - */ - -package docspell.joex.scheduler - -import cats.effect.Sync -import cats.implicits._ - -import docspell.common._ - -case class LogEvent( - jobId: Ident, - jobInfo: String, - time: Timestamp, - level: LogLevel, - msg: String, - ex: Option[Throwable] = None -) { - - def logLine: String = - s">>> ${time.asString} $level $jobInfo: $msg" - -} - -object LogEvent { - - def create[F[_]: Sync]( - jobId: Ident, - jobInfo: String, - level: LogLevel, - msg: String - ): F[LogEvent] = - Timestamp.current[F].map(now => LogEvent(jobId, jobInfo, now, level, msg)) - -} diff --git a/modules/joex/src/main/scala/docspell/joex/scheduler/LogSink.scala b/modules/joex/src/main/scala/docspell/joex/scheduler/LogSink.scala deleted file mode 100644 index bf01a050..00000000 --- a/modules/joex/src/main/scala/docspell/joex/scheduler/LogSink.scala +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Copyright 2020 Eike K. & Contributors - * - * SPDX-License-Identifier: AGPL-3.0-or-later - */ - -package docspell.joex.scheduler - -import cats.effect._ -import cats.implicits._ -import fs2.Pipe - -import docspell.common._ -import docspell.logging -import docspell.store.Store -import docspell.store.records.RJobLog - -trait LogSink[F[_]] { - - def receive: Pipe[F, LogEvent, Unit] - -} - -object LogSink { - - def apply[F[_]](sink: Pipe[F, LogEvent, Unit]): LogSink[F] = - new LogSink[F] { - val receive = sink - } - - def logInternal[F[_]: Sync](e: LogEvent): F[Unit] = { - val logger = docspell.logging.getLogger[F] - val addData: logging.LogEvent => logging.LogEvent = - _.data("jobId", e.jobId).data("jobInfo", e.jobInfo) - - e.level match { - case LogLevel.Info => - logger.infoWith(e.logLine)(addData) - case LogLevel.Debug => - logger.debugWith(e.logLine)(addData) - case LogLevel.Warn => - logger.warnWith(e.logLine)(addData) - case LogLevel.Error => - e.ex match { - case Some(exc) => - logger.errorWith(e.logLine)(addData.andThen(_.addError(exc))) - case None => - logger.errorWith(e.logLine)(addData) - } - } - } - - def printer[F[_]: Sync]: LogSink[F] = - LogSink(_.evalMap(e => logInternal(e))) - - def db[F[_]: Async](store: Store[F]): LogSink[F] = - LogSink( - _.evalMap(ev => - for { - id <- Ident.randomId[F] - joblog = RJobLog( - id, - ev.jobId, - ev.level, - ev.time, - ev.msg + ev.ex.map(th => ": " + th.getMessage).getOrElse("") - ) - _ <- logInternal(ev) - _ <- store.transact(RJobLog.insert(joblog)) - } yield () - ) - ) - - def dbAndLog[F[_]: Async](store: Store[F]): LogSink[F] = - LogSink(_.broadcastThrough(printer[F].receive, db[F](store).receive)) -} diff --git a/modules/joex/src/main/scala/docspell/joex/scheduler/PeriodicScheduler.scala b/modules/joex/src/main/scala/docspell/joex/scheduler/PeriodicScheduler.scala deleted file mode 100644 index 13f61705..00000000 --- a/modules/joex/src/main/scala/docspell/joex/scheduler/PeriodicScheduler.scala +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Copyright 2020 Eike K. & Contributors - * - * SPDX-License-Identifier: AGPL-3.0-or-later - */ - -package docspell.joex.scheduler - -import cats.effect._ -import fs2._ -import fs2.concurrent.SignallingRef - -import docspell.backend.ops.OJoex -import docspell.store.queue._ - -/** A periodic scheduler takes care to submit periodic tasks to the job queue. - * - * It is run in the background to regularily find a periodic task to execute. If the task - * is due, it will be submitted into the job queue where it will be picked up by the - * scheduler from some joex instance. If it is due in the future, a notification is - * scheduled to be received at that time so the task can be looked up again. - */ -trait PeriodicScheduler[F[_]] { - - def config: PeriodicSchedulerConfig - - def start: Stream[F, Nothing] - - def shutdown: F[Unit] - - def periodicAwake: F[Fiber[F, Throwable, Unit]] - - def notifyChange: F[Unit] -} - -object PeriodicScheduler { - - def create[F[_]: Async]( - cfg: PeriodicSchedulerConfig, - sch: Scheduler[F], - queue: JobQueue[F], - store: PeriodicTaskStore[F], - joex: OJoex[F] - ): Resource[F, PeriodicScheduler[F]] = - for { - waiter <- Resource.eval(SignallingRef(true)) - state <- Resource.eval(SignallingRef(PeriodicSchedulerImpl.emptyState[F])) - psch = new PeriodicSchedulerImpl[F]( - cfg, - sch, - queue, - store, - joex, - waiter, - state - ) - _ <- Resource.eval(psch.init) - } yield psch - -} diff --git a/modules/joex/src/main/scala/docspell/joex/scheduler/PeriodicSchedulerConfig.scala b/modules/joex/src/main/scala/docspell/joex/scheduler/PeriodicSchedulerConfig.scala deleted file mode 100644 index c960f4d0..00000000 --- a/modules/joex/src/main/scala/docspell/joex/scheduler/PeriodicSchedulerConfig.scala +++ /dev/null @@ -1,14 +0,0 @@ -/* - * Copyright 2020 Eike K. & Contributors - * - * SPDX-License-Identifier: AGPL-3.0-or-later - */ - -package docspell.joex.scheduler - -import docspell.common._ - -case class PeriodicSchedulerConfig( - name: Ident, - wakeupPeriod: Duration -) diff --git a/modules/joex/src/main/scala/docspell/joex/scheduler/PeriodicSchedulerImpl.scala b/modules/joex/src/main/scala/docspell/joex/scheduler/PeriodicSchedulerImpl.scala deleted file mode 100644 index 39761a74..00000000 --- a/modules/joex/src/main/scala/docspell/joex/scheduler/PeriodicSchedulerImpl.scala +++ /dev/null @@ -1,182 +0,0 @@ -/* - * Copyright 2020 Eike K. & Contributors - * - * SPDX-License-Identifier: AGPL-3.0-or-later - */ - -package docspell.joex.scheduler - -import cats.effect._ -import cats.implicits._ -import fs2._ -import fs2.concurrent.SignallingRef - -import docspell.backend.ops.OJoex -import docspell.common._ -import docspell.joex.scheduler.PeriodicSchedulerImpl.State -import docspell.store.queue._ -import docspell.store.records.RPeriodicTask - -import eu.timepit.fs2cron.calev.CalevScheduler - -final class PeriodicSchedulerImpl[F[_]: Async]( - val config: PeriodicSchedulerConfig, - sch: Scheduler[F], - queue: JobQueue[F], - store: PeriodicTaskStore[F], - joex: OJoex[F], - waiter: SignallingRef[F, Boolean], - state: SignallingRef[F, State[F]] -) extends PeriodicScheduler[F] { - private[this] val logger = docspell.logging.getLogger[F] - - def start: Stream[F, Nothing] = - logger.stream.info("Starting periodic scheduler").drain ++ - mainLoop - - def shutdown: F[Unit] = - state.modify(_.requestShutdown) - - def periodicAwake: F[Fiber[F, Throwable, Unit]] = - Async[F].start( - Stream - .awakeEvery[F](config.wakeupPeriod.toScala) - .evalMap(_ => logger.debug("Periodic awake reached") *> notifyChange) - .compile - .drain - ) - - def notifyChange: F[Unit] = - waiter.update(b => !b) - - // internal - - /** On startup, get all periodic jobs from this scheduler and remove the mark, so they - * get picked up again. - */ - def init: F[Unit] = - logError("Error clearing marks")(store.clearMarks(config.name)) - - def mainLoop: Stream[F, Nothing] = { - val body: F[Boolean] = - for { - _ <- logger.debug(s"Going into main loop") - now <- Timestamp.current[F] - _ <- logger.debug(s"Looking for next periodic task") - go <- logThrow("Error getting next task")( - store - .takeNext(config.name, None) - .use { - case Marked.Found(pj) => - logger - .debug(s"Found periodic task '${pj.subject}/${pj.timer.asString}'") *> - (if (isTriggered(pj, now)) submitJob(pj) - else scheduleNotify(pj).map(_ => false)) - case Marked.NotFound => - logger.debug("No periodic task found") *> false.pure[F] - case Marked.NotMarkable => - logger.debug("Periodic job cannot be marked. Trying again.") *> true - .pure[F] - } - ) - } yield go - - Stream - .eval(state.get.map(_.shutdownRequest)) - .evalTap( - if (_) logger.info("Stopping main loop due to shutdown request.") - else ().pure[F] - ) - .flatMap(if (_) Stream.empty else Stream.eval(cancelNotify *> body)) - .flatMap { - case true => - mainLoop - case false => - logger.stream.debug(s"Waiting for notify").drain ++ - waiter.discrete.take(2).drain ++ - logger.stream.debug(s"Notify signal, going into main loop").drain ++ - mainLoop - } - } - - def isTriggered(pj: RPeriodicTask, now: Timestamp): Boolean = - pj.nextrun < now - - def submitJob(pj: RPeriodicTask): F[Boolean] = - store - .findNonFinalJob(pj.id) - .flatMap { - case Some(job) => - logger.info( - s"There is already a job with non-final state '${job.state}' in the queue" - ) *> scheduleNotify(pj) *> false.pure[F] - - case None => - logger.info(s"Submitting job for periodic task '${pj.task.id}'") *> - pj.toJob.flatMap(queue.insert) *> notifyJoex *> true.pure[F] - } - - def notifyJoex: F[Unit] = - sch.notifyChange *> joex.notifyAllNodes - - def scheduleNotify(pj: RPeriodicTask): F[Unit] = - Timestamp - .current[F] - .flatMap(now => - logger.debug( - s"Scheduling next notify for timer ${pj.timer.asString} -> ${pj.timer.nextElapse(now.toUtcDateTime)}" - ) - ) *> - Async[F] - .start( - CalevScheduler - .utc[F] - .sleep(pj.timer) - .evalMap(_ => notifyChange) - .compile - .drain - ) - .flatMap(fb => state.modify(_.setNotify(fb))) - - def cancelNotify: F[Unit] = - state - .modify(_.clearNotify) - .flatMap { - case Some(fb) => - fb.cancel - case None => - ().pure[F] - } - - private def logError(msg: => String)(fa: F[Unit]): F[Unit] = - fa.attempt.flatMap { - case Right(_) => ().pure[F] - case Left(ex) => logger.error(ex)(msg).map(_ => ()) - } - - private def logThrow[A](msg: => String)(fa: F[A]): F[A] = - fa.attempt.flatMap { - case r @ Right(_) => (r: Either[Throwable, A]).pure[F] - case l @ Left(ex) => logger.error(ex)(msg).map(_ => (l: Either[Throwable, A])) - }.rethrow -} - -object PeriodicSchedulerImpl { - def emptyState[F[_]]: State[F] = - State(false, None) - - case class State[F[_]]( - shutdownRequest: Boolean, - scheduledNotify: Option[Fiber[F, Throwable, Unit]] - ) { - def requestShutdown: (State[F], Unit) = - (copy(shutdownRequest = true), ()) - - def setNotify(fb: Fiber[F, Throwable, Unit]): (State[F], Unit) = - (copy(scheduledNotify = Some(fb)), ()) - - def clearNotify: (State[F], Option[Fiber[F, Throwable, Unit]]) = - (copy(scheduledNotify = None), scheduledNotify) - - } -} diff --git a/modules/joex/src/main/scala/docspell/joex/scheduler/QueueLogger.scala b/modules/joex/src/main/scala/docspell/joex/scheduler/QueueLogger.scala deleted file mode 100644 index 357a1e83..00000000 --- a/modules/joex/src/main/scala/docspell/joex/scheduler/QueueLogger.scala +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Copyright 2020 Eike K. & Contributors - * - * SPDX-License-Identifier: AGPL-3.0-or-later - */ - -package docspell.joex.scheduler - -import cats.effect._ -import cats.effect.std.Queue -import cats.implicits._ -import fs2.Stream - -import docspell.common._ -import docspell.logging -import docspell.logging.{Level, Logger} - -object QueueLogger { - - def create[F[_]: Sync]( - jobId: Ident, - jobInfo: String, - q: Queue[F, LogEvent] - ): Logger[F] = - new Logger[F] { - - def log(logEvent: logging.LogEvent) = - LogEvent - .create[F](jobId, jobInfo, level2Level(logEvent.level), logEvent.msg()) - .flatMap { ev => - val event = - logEvent.findErrors.headOption - .map(ex => ev.copy(ex = Some(ex))) - .getOrElse(ev) - - q.offer(event) - } - - def asUnsafe = Logger.off - } - - def apply[F[_]: Async]( - jobId: Ident, - jobInfo: String, - bufferSize: Int, - sink: LogSink[F] - ): F[Logger[F]] = - for { - q <- Queue.circularBuffer[F, LogEvent](bufferSize) - log = create(jobId, jobInfo, q) - _ <- Async[F].start( - Stream.fromQueueUnterminated(q).through(sink.receive).compile.drain - ) - } yield log - - private def level2Level(level: Level): LogLevel = - LogLevel.fromLevel(level) -} diff --git a/modules/joex/src/main/scala/docspell/joex/scheduler/Scheduler.scala b/modules/joex/src/main/scala/docspell/joex/scheduler/Scheduler.scala deleted file mode 100644 index 730b616b..00000000 --- a/modules/joex/src/main/scala/docspell/joex/scheduler/Scheduler.scala +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Copyright 2020 Eike K. & Contributors - * - * SPDX-License-Identifier: AGPL-3.0-or-later - */ - -package docspell.joex.scheduler - -import cats.effect._ -import fs2.Stream - -import docspell.common.Ident -import docspell.store.records.RJob - -trait Scheduler[F[_]] { - - def config: SchedulerConfig - - def getRunning: F[Vector[RJob]] - - def requestCancel(jobId: Ident): F[Boolean] - - def notifyChange: F[Unit] - - def start: Stream[F, Nothing] - - /** Requests to shutdown the scheduler. - * - * The scheduler will not take any new jobs from the queue. If there are still running - * jobs, it waits for them to complete. when the cancelAll flag is set to true, it - * cancels all running jobs. - * - * The returned F[Unit] can be evaluated to wait for all that to complete. - */ - def shutdown(cancelAll: Boolean): F[Unit] - - def periodicAwake: F[Fiber[F, Throwable, Unit]] -} diff --git a/modules/joex/src/main/scala/docspell/joex/scheduler/SchedulerBuilder.scala b/modules/joex/src/main/scala/docspell/joex/scheduler/SchedulerBuilder.scala deleted file mode 100644 index cc5cef12..00000000 --- a/modules/joex/src/main/scala/docspell/joex/scheduler/SchedulerBuilder.scala +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Copyright 2020 Eike K. & Contributors - * - * SPDX-License-Identifier: AGPL-3.0-or-later - */ - -package docspell.joex.scheduler - -import cats.effect._ -import cats.effect.std.Semaphore -import cats.implicits._ -import fs2.concurrent.SignallingRef - -import docspell.notification.api.EventSink -import docspell.pubsub.api.PubSubT -import docspell.store.Store -import docspell.store.queue.JobQueue - -case class SchedulerBuilder[F[_]: Async]( - config: SchedulerConfig, - tasks: JobTaskRegistry[F], - store: Store[F], - queue: Resource[F, JobQueue[F]], - logSink: LogSink[F], - pubSub: PubSubT[F], - eventSink: EventSink[F] -) { - - def withConfig(cfg: SchedulerConfig): SchedulerBuilder[F] = - copy(config = cfg) - - def withTaskRegistry(reg: JobTaskRegistry[F]): SchedulerBuilder[F] = - copy(tasks = reg) - - def withTask[A](task: JobTask[F]): SchedulerBuilder[F] = - withTaskRegistry(tasks.withTask(task)) - - def withQueue(queue: Resource[F, JobQueue[F]]): SchedulerBuilder[F] = - copy(queue = queue) - - def withLogSink(sink: LogSink[F]): SchedulerBuilder[F] = - copy(logSink = sink) - - def withQueue(queue: JobQueue[F]): SchedulerBuilder[F] = - copy(queue = Resource.pure[F, JobQueue[F]](queue)) - - def withPubSub(pubSubT: PubSubT[F]): SchedulerBuilder[F] = - copy(pubSub = pubSubT) - - def withEventSink(sink: EventSink[F]): SchedulerBuilder[F] = - copy(eventSink = sink) - - def serve: Resource[F, Scheduler[F]] = - resource.evalMap(sch => Async[F].start(sch.start.compile.drain).map(_ => sch)) - - def resource: Resource[F, Scheduler[F]] = { - val scheduler: Resource[F, SchedulerImpl[F]] = for { - jq <- queue - waiter <- Resource.eval(SignallingRef(true)) - state <- Resource.eval(SignallingRef(SchedulerImpl.emptyState[F])) - perms <- Resource.eval(Semaphore(config.poolSize.toLong)) - } yield new SchedulerImpl[F]( - config, - jq, - pubSub, - eventSink, - tasks, - store, - logSink, - state, - waiter, - perms - ) - - scheduler.evalTap(_.init).map(s => s: Scheduler[F]) - } - -} - -object SchedulerBuilder { - - def apply[F[_]: Async]( - config: SchedulerConfig, - store: Store[F] - ): SchedulerBuilder[F] = - new SchedulerBuilder[F]( - config, - JobTaskRegistry.empty[F], - store, - JobQueue(store), - LogSink.db[F](store), - PubSubT.noop[F], - EventSink.silent[F] - ) - -} diff --git a/modules/joex/src/main/scala/docspell/joex/scheduler/SchedulerConfig.scala b/modules/joex/src/main/scala/docspell/joex/scheduler/SchedulerConfig.scala deleted file mode 100644 index fc67dbe7..00000000 --- a/modules/joex/src/main/scala/docspell/joex/scheduler/SchedulerConfig.scala +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Copyright 2020 Eike K. & Contributors - * - * SPDX-License-Identifier: AGPL-3.0-or-later - */ - -package docspell.joex.scheduler - -import docspell.common._ - -case class SchedulerConfig( - name: Ident, - poolSize: Int, - countingScheme: CountingScheme, - retries: Int, - retryDelay: Duration, - logBufferSize: Int, - wakeupPeriod: Duration -) - -object SchedulerConfig { - - val default = SchedulerConfig( - name = Ident.unsafe("default-scheduler"), - poolSize = 2 // math.max(2, Runtime.getRuntime.availableProcessors / 2) - , - countingScheme = CountingScheme(2, 1), - retries = 5, - retryDelay = Duration.seconds(30), - logBufferSize = 500, - wakeupPeriod = Duration.minutes(10) - ) -} diff --git a/modules/joex/src/main/scala/docspell/joex/scheduler/SchedulerImpl.scala b/modules/joex/src/main/scala/docspell/joex/scheduler/SchedulerImpl.scala deleted file mode 100644 index a4cc030a..00000000 --- a/modules/joex/src/main/scala/docspell/joex/scheduler/SchedulerImpl.scala +++ /dev/null @@ -1,351 +0,0 @@ -/* - * Copyright 2020 Eike K. & Contributors - * - * SPDX-License-Identifier: AGPL-3.0-or-later - */ - -package docspell.joex.scheduler - -import cats.data.OptionT -import cats.effect._ -import cats.effect.std.Semaphore -import cats.implicits._ -import fs2.Stream -import fs2.concurrent.SignallingRef - -import docspell.backend.msg.JobDone -import docspell.common._ -import docspell.joex.scheduler.SchedulerImpl._ -import docspell.notification.api.Event -import docspell.notification.api.EventSink -import docspell.pubsub.api.PubSubT -import docspell.store.Store -import docspell.store.queries.QJob -import docspell.store.queue.JobQueue -import docspell.store.records.RJob - -import io.circe.Json - -final class SchedulerImpl[F[_]: Async]( - val config: SchedulerConfig, - queue: JobQueue[F], - pubSub: PubSubT[F], - eventSink: EventSink[F], - tasks: JobTaskRegistry[F], - store: Store[F], - logSink: LogSink[F], - state: SignallingRef[F, State[F]], - waiter: SignallingRef[F, Boolean], - permits: Semaphore[F] -) extends Scheduler[F] { - - private[this] val logger = docspell.logging.getLogger[F] - - /** On startup, get all jobs in state running from this scheduler and put them into - * waiting state, so they get picked up again. - */ - def init: F[Unit] = - QJob.runningToWaiting(config.name, store) - - def periodicAwake: F[Fiber[F, Throwable, Unit]] = - Async[F].start( - Stream - .awakeEvery[F](config.wakeupPeriod.toScala) - .evalMap(_ => logger.debug("Periodic awake reached") *> notifyChange) - .compile - .drain - ) - - def getRunning: F[Vector[RJob]] = - state.get.flatMap(s => QJob.findAll(s.getRunning, store)) - - def requestCancel(jobId: Ident): F[Boolean] = - logger.info(s"Scheduler requested to cancel job: ${jobId.id}") *> - state.get.flatMap(_.cancelRequest(jobId) match { - case Some(ct) => ct.map(_ => true) - case None => - (for { - job <- OptionT(store.transact(RJob.findByIdAndWorker(jobId, config.name))) - _ <- OptionT.liftF( - if (job.isInProgress) executeCancel(job) - else ().pure[F] - ) - } yield true) - .getOrElseF( - logger.warn(s"Job ${jobId.id} not found, cannot cancel.").map(_ => false) - ) - }) - - def notifyChange: F[Unit] = - waiter.update(b => !b) - - def shutdown(cancelAll: Boolean): F[Unit] = { - val doCancel = - state.get.flatMap(_.cancelTokens.values.toList.traverse(identity)).map(_ => ()) - - val runShutdown = - state.modify(_.requestShutdown) *> (if (cancelAll) doCancel else ().pure[F]) - - val wait = Stream - .eval(runShutdown) - .evalMap(_ => logger.info("Scheduler is shutting down now.")) - .flatMap(_ => - Stream.eval(state.get) ++ Stream - .suspend(state.discrete.takeWhile(_.getRunning.nonEmpty)) - ) - .flatMap { state => - if (state.getRunning.isEmpty) Stream.eval(logger.info("No jobs running.")) - else - Stream.eval( - logger.info(s"Waiting for ${state.getRunning.size} jobs to finish.") - ) ++ - Stream.emit(state) - } - - (wait.drain ++ Stream.emit(())).compile.lastOrError - } - - def start: Stream[F, Nothing] = - logger.stream.info("Starting scheduler").drain ++ - mainLoop - - def mainLoop: Stream[F, Nothing] = { - val body: F[Boolean] = - for { - _ <- permits.available.flatMap(a => - logger.debug(s"Try to acquire permit ($a free)") - ) - _ <- permits.acquire - _ <- logger.debug("New permit acquired") - down <- state.get.map(_.shutdownRequest) - rjob <- - if (down) - logger.info("") *> permits.release *> (None: Option[RJob]).pure[F] - else - queue.nextJob( - group => state.modify(_.nextPrio(group, config.countingScheme)), - config.name, - config.retryDelay - ) - _ <- logger.debug(s"Next job found: ${rjob.map(_.info)}") - _ <- rjob.map(execute).getOrElse(permits.release) - } yield rjob.isDefined - - Stream - .eval(state.get.map(_.shutdownRequest)) - .evalTap( - if (_) logger.info("Stopping main loop due to shutdown request.") - else ().pure[F] - ) - .flatMap(if (_) Stream.empty else Stream.eval(body)) - .flatMap { - case true => - mainLoop - case false => - logger.stream.debug(s"Waiting for notify").drain ++ - waiter.discrete.take(2).drain ++ - logger.stream.debug(s"Notify signal, going into main loop").drain ++ - mainLoop - } - } - - private def executeCancel(job: RJob): F[Unit] = { - val task = for { - jobtask <- - tasks - .find(job.task) - .toRight(s"This executor cannot run tasks with name: ${job.task}") - } yield jobtask - - task match { - case Left(err) => - logger.error(s"Unable to run cancellation task for job ${job.info}: $err") - case Right(t) => - for { - _ <- - logger.debug(s"Creating context for job ${job.info} to run cancellation $t") - ctx <- Context[F, String](job, job.args, config, logSink, store) - _ <- t.onCancel.run(ctx) - _ <- state.modify(_.markCancelled(job)) - _ <- onFinish(job, JobTaskResult.empty, JobState.Cancelled) - _ <- ctx.logger.warn("Job has been cancelled.") - _ <- logger.debug(s"Job ${job.info} has been cancelled.") - } yield () - } - } - - def execute(job: RJob): F[Unit] = { - val task = for { - jobtask <- - tasks - .find(job.task) - .toRight(s"This executor cannot run tasks with name: ${job.task}") - } yield jobtask - - task match { - case Left(err) => - logger.error(s"Unable to start a task for job ${job.info}: $err") - case Right(t) => - for { - _ <- logger.debug(s"Creating context for job ${job.info} to run $t") - ctx <- Context[F, String](job, job.args, config, logSink, store) - jot = wrapTask(job, t.task, ctx) - tok <- forkRun(job, jot.run(ctx), t.onCancel.run(ctx), ctx) - _ <- state.modify(_.addRunning(job, tok)) - } yield () - } - } - - def onFinish(job: RJob, result: JobTaskResult, finishState: JobState): F[Unit] = - for { - _ <- logger.debug(s"Job ${job.info} done $finishState. Releasing resources.") - _ <- permits.release *> permits.available.flatMap(a => - logger.debug(s"Permit released ($a free)") - ) - _ <- state.modify(_.removeRunning(job)) - _ <- QJob.setFinalState(job.id, finishState, store) - _ <- Sync[F].whenA(JobState.isDone(finishState))( - pubSub.publish1IgnoreErrors( - JobDone.topic, - JobDone(job.id, job.group, job.task, job.args, finishState) - ) - ) - _ <- Sync[F].whenA(JobState.isDone(finishState))( - eventSink.offer( - Event.JobDone( - job.id, - job.group, - job.task, - job.args, - job.state, - job.subject, - job.submitter, - result.json.getOrElse(Json.Null), - result.message - ) - ) - ) - } yield () - - def onStart(job: RJob): F[Unit] = - QJob.setRunning( - job.id, - config.name, - store - ) // also increments retries if current state=stuck - - def wrapTask( - job: RJob, - task: Task[F, String, JobTaskResult], - ctx: Context[F, String] - ): Task[F, String, Unit] = - task - .mapF(fa => onStart(job) *> logger.debug("Starting task now") *> fa) - .mapF(_.attempt.flatMap { - case Right(result) => - logger.info(s"Job execution successful: ${job.info}") - ctx.logger.info("Job execution successful") *> - (JobState.Success: JobState, result).pure[F] - case Left(ex) => - state.get.map(_.wasCancelled(job)).flatMap { - case true => - logger.error(ex)(s"Job ${job.info} execution failed (cancel = true)") - ctx.logger.error(ex)("Job execution failed (cancel = true)") *> - (JobState.Cancelled: JobState, JobTaskResult.empty).pure[F] - case false => - QJob.exceedsRetries(job.id, config.retries, store).flatMap { - case true => - logger.error(ex)(s"Job ${job.info} execution failed. Retries exceeded.") - ctx.logger - .error(ex)(s"Job ${job.info} execution failed. Retries exceeded.") - .map(_ => (JobState.Failed: JobState, JobTaskResult.empty)) - case false => - logger.error(ex)(s"Job ${job.info} execution failed. Retrying later.") - ctx.logger - .error(ex)(s"Job ${job.info} execution failed. Retrying later.") - .map(_ => (JobState.Stuck: JobState, JobTaskResult.empty)) - } - } - }) - .mapF(_.attempt.flatMap { - case Right((jstate, result)) => - onFinish(job, result, jstate) - case Left(ex) => - logger.error(ex)(s"Error happened during post-processing of ${job.info}!") - // we don't know the real outcome here… - // since tasks should be idempotent, set it to stuck. if above has failed, this might fail anyways - onFinish(job, JobTaskResult.empty, JobState.Stuck) - }) - - def forkRun( - job: RJob, - code: F[Unit], - onCancel: F[Unit], - ctx: Context[F, String] - ): F[F[Unit]] = - logger.debug(s"Forking job ${job.info}") *> - Async[F] - .start(code) - .map(fiber => - logger.debug(s"Cancelling job ${job.info}") *> - fiber.cancel *> - onCancel.attempt.map { - case Right(_) => () - case Left(ex) => - logger.error(ex)(s"Task's cancelling code failed. Job ${job.info}.") - () - } *> - state.modify(_.markCancelled(job)) *> - onFinish(job, JobTaskResult.empty, JobState.Cancelled) *> - ctx.logger.warn("Job has been cancelled.") *> - logger.debug(s"Job ${job.info} has been cancelled.") - ) -} - -object SchedulerImpl { - - type CancelToken[F[_]] = F[Unit] - - def emptyState[F[_]]: State[F] = - State(Map.empty, Set.empty, Map.empty, false) - - case class State[F[_]]( - counters: Map[Ident, CountingScheme], - cancelled: Set[Ident], - cancelTokens: Map[Ident, CancelToken[F]], - shutdownRequest: Boolean - ) { - - def nextPrio(group: Ident, initial: CountingScheme): (State[F], Priority) = { - val (cs, prio) = counters.getOrElse(group, initial).nextPriority - (copy(counters = counters.updated(group, cs)), prio) - } - - def addRunning(job: RJob, token: CancelToken[F]): (State[F], Unit) = - ( - State(counters, cancelled, cancelTokens.updated(job.id, token), shutdownRequest), - () - ) - - def removeRunning(job: RJob): (State[F], Unit) = - ( - copy(cancelled = cancelled - job.id, cancelTokens = cancelTokens.removed(job.id)), - () - ) - - def markCancelled(job: RJob): (State[F], Unit) = - (copy(cancelled = cancelled + job.id), ()) - - def wasCancelled(job: RJob): Boolean = - cancelled.contains(job.id) - - def cancelRequest(id: Ident): Option[F[Unit]] = - cancelTokens.get(id) - - def getRunning: Seq[Ident] = - cancelTokens.keys.toSeq - - def requestShutdown: (State[F], Unit) = - (copy(shutdownRequest = true), ()) - } -} diff --git a/modules/joex/src/main/scala/docspell/joex/scheduler/Task.scala b/modules/joex/src/main/scala/docspell/joex/scheduler/Task.scala deleted file mode 100644 index 4ba4d84d..00000000 --- a/modules/joex/src/main/scala/docspell/joex/scheduler/Task.scala +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Copyright 2020 Eike K. & Contributors - * - * SPDX-License-Identifier: AGPL-3.0-or-later - */ - -package docspell.joex.scheduler - -import cats._ -import cats.data.Kleisli -import cats.effect.Sync -import cats.implicits._ - -import docspell.logging.Logger - -/** The code that is executed by the scheduler */ -trait Task[F[_], A, B] { - - def run(ctx: Context[F, A]): F[B] - - def andThen[C](f: B => F[C])(implicit F: FlatMap[F]): Task[F, A, C] = - Task(Task.toKleisli(this).andThen(f)) - - def mapF[C](f: F[B] => F[C]): Task[F, A, C] = - Task(Task.toKleisli(this).mapF(f)) - - def attempt(implicit - F: ApplicativeError[F, Throwable] - ): Task[F, A, Either[Throwable, B]] = - mapF(_.attempt) - - def contramap[C](f: C => F[A])(implicit F: FlatMap[F]): Task[F, C, B] = { - ctxc: Context[F, C] => f(ctxc.args).flatMap(a => run(ctxc.map(_ => a))) - } -} - -object Task { - - def pure[F[_]: Applicative, A, B](b: B): Task[F, A, B] = - Task(_ => b.pure[F]) - - def of[F[_], A, B](b: F[B]): Task[F, A, B] = - Task(_ => b) - - def apply[F[_], A, B](f: Context[F, A] => F[B]): Task[F, A, B] = - (ctx: Context[F, A]) => f(ctx) - - def apply[F[_], A, B](k: Kleisli[F, Context[F, A], B]): Task[F, A, B] = - c => k.run(c) - - def toKleisli[F[_], A, B](t: Task[F, A, B]): Kleisli[F, Context[F, A], B] = - Kleisli(t.run) - - def setProgress[F[_]: Sync, A, B](n: Int)(data: B): Task[F, A, B] = - Task(_.setProgress(n).map(_ => data)) - - def log[F[_], A](f: Logger[F] => F[Unit]): Task[F, A, Unit] = - Task(ctx => f(ctx.logger)) - - implicit def taskMonad[F[_]: Monad, T]: Monad[Task[F, T, *]] = - new Monad[Task[F, T, *]] { - def pure[A](x: A) = Task(_ => Monad[F].pure(x)) - def flatMap[A, B](fa: Task[F, T, A])(f: A => Task[F, T, B]) = - Task(Task.toKleisli(fa).flatMap(a => Task.toKleisli(f(a)))) - - def tailRecM[A, B](a: A)(f: A => Task[F, T, Either[A, B]]) = { - val monadK = Monad[Kleisli[F, Context[F, T], *]] - val r = monadK.tailRecM(a)(x => Task.toKleisli(f(x))) - Task(r) - } - } -} diff --git a/modules/joex/src/main/scala/docspell/joex/updatecheck/UpdateCheckTask.scala b/modules/joex/src/main/scala/docspell/joex/updatecheck/UpdateCheckTask.scala index c89b9fde..7a82f65b 100644 --- a/modules/joex/src/main/scala/docspell/joex/updatecheck/UpdateCheckTask.scala +++ b/modules/joex/src/main/scala/docspell/joex/updatecheck/UpdateCheckTask.scala @@ -11,8 +11,8 @@ import cats.effect._ import cats.implicits._ import docspell.common._ -import docspell.joex.scheduler.Context -import docspell.joex.scheduler.Task +import docspell.scheduler.Context +import docspell.scheduler.Task import docspell.store.records.RPeriodicTask import docspell.store.records.RUserEmail import docspell.store.usertask.UserTask