Adapt code to use the moved scheduler API

This commit is contained in:
eikek
2022-03-12 14:15:36 +01:00
parent 0ce3abb3ff
commit aafd908906
70 changed files with 64 additions and 1398 deletions

View File

@ -1,32 +0,0 @@
/*
* Copyright 2020 Eike K. & Contributors
*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
package docspell.backend.msg
import docspell.common._
import docspell.pubsub.api.{Topic, TypedTopic}
import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder}
import io.circe.{Decoder, Encoder}
/** Message to notify about finished jobs. They have a final state. */
final case class JobDone(
jobId: Ident, // id of the job that finished
group: Ident, // collective/group the job belongs to
task: Ident, // name of the task that was executed
args: String, // raw (serialized) task arguments
state: JobState // final state of the job (it is final per the type's contract above)
)
object JobDone {

  /** Topic used to publish/subscribe `JobDone` messages. */
  val topic: TypedTopic[JobDone] =
    TypedTopic(Topic("job-finished"))

  /** JSON codec instances, derived semi-automatically via circe. */
  implicit val jsonDecoder: Decoder[JobDone] =
    deriveDecoder[JobDone]

  implicit val jsonEncoder: Encoder[JobDone] =
    deriveEncoder[JobDone]
}

View File

@ -7,8 +7,8 @@
package docspell.backend.msg package docspell.backend.msg
import cats.data.NonEmptyList import cats.data.NonEmptyList
import docspell.pubsub.api.{Topic, TypedTopic} import docspell.pubsub.api.{Topic, TypedTopic}
import docspell.scheduler.msg.JobDone
/** All topics used in Docspell. */ /** All topics used in Docspell. */
object Topics { object Topics {

View File

@ -19,7 +19,7 @@ import docspell.ftssolr.SolrConfig
import docspell.joex.analysis.RegexNerFile import docspell.joex.analysis.RegexNerFile
import docspell.joex.hk.HouseKeepingConfig import docspell.joex.hk.HouseKeepingConfig
import docspell.joex.routes.InternalHeader import docspell.joex.routes.InternalHeader
import docspell.joex.scheduler.{PeriodicSchedulerConfig, SchedulerConfig} import docspell.scheduler.{PeriodicSchedulerConfig, SchedulerConfig}
import docspell.joex.updatecheck.UpdateCheckConfig import docspell.joex.updatecheck.UpdateCheckConfig
import docspell.logging.LogConfig import docspell.logging.LogConfig
import docspell.pubsub.naive.PubSubConfig import docspell.pubsub.naive.PubSubConfig

View File

@ -10,7 +10,7 @@ import cats.effect.Async
import docspell.config.Implicits._ import docspell.config.Implicits._
import docspell.config.{ConfigFactory, Validation} import docspell.config.{ConfigFactory, Validation}
import docspell.joex.scheduler.CountingScheme import docspell.scheduler.CountingScheme
import emil.MailAddress import emil.MailAddress
import emil.javamail.syntax._ import emil.javamail.syntax._

View File

@ -7,7 +7,7 @@
package docspell.joex package docspell.joex
import docspell.common.Ident import docspell.common.Ident
import docspell.joex.scheduler.{PeriodicScheduler, Scheduler} import docspell.scheduler.{PeriodicScheduler, Scheduler}
import docspell.store.records.RJobLog import docspell.store.records.RJobLog
trait JoexApp[F[_]] { trait JoexApp[F[_]] {

View File

@ -9,7 +9,6 @@ package docspell.joex
import cats.effect._ import cats.effect._
import cats.implicits._ import cats.implicits._
import fs2.concurrent.SignallingRef import fs2.concurrent.SignallingRef
import docspell.analysis.TextAnalyser import docspell.analysis.TextAnalyser
import docspell.backend.MailAddressCodec import docspell.backend.MailAddressCodec
import docspell.backend.fulltext.CreateIndex import docspell.backend.fulltext.CreateIndex
@ -32,7 +31,8 @@ import docspell.joex.preview._
import docspell.joex.process.ItemHandler import docspell.joex.process.ItemHandler
import docspell.joex.process.ReProcessItem import docspell.joex.process.ReProcessItem
import docspell.joex.scanmailbox._ import docspell.joex.scanmailbox._
import docspell.joex.scheduler._ import docspell.scheduler._
import docspell.scheduler.impl.{PeriodicSchedulerBuilder, SchedulerBuilder}
import docspell.joex.updatecheck._ import docspell.joex.updatecheck._
import docspell.notification.api.NotificationModule import docspell.notification.api.NotificationModule
import docspell.notification.impl.NotificationModuleImpl import docspell.notification.impl.NotificationModuleImpl
@ -42,7 +42,6 @@ import docspell.store.queue._
import docspell.store.records.{REmptyTrashSetting, RJobLog} import docspell.store.records.{REmptyTrashSetting, RJobLog}
import docspell.store.usertask.UserTaskScope import docspell.store.usertask.UserTaskScope
import docspell.store.usertask.UserTaskStore import docspell.store.usertask.UserTaskStore
import emil.javamail._ import emil.javamail._
import org.http4s.client.Client import org.http4s.client.Client
@ -296,12 +295,12 @@ object JoexAppImpl extends MailAddressCodec {
) )
) )
.resource .resource
psch <- PeriodicScheduler.create( psch <- PeriodicSchedulerBuilder.build(
cfg.periodicScheduler, cfg.periodicScheduler,
sch, sch,
queue, queue,
pstore, pstore,
joex joex.notifyAllNodes
) )
app = new JoexAppImpl( app = new JoexAppImpl(
cfg, cfg,

View File

@ -12,7 +12,7 @@ import fs2.Stream
import docspell.backend.ops.{OItem, OItemSearch} import docspell.backend.ops.{OItem, OItemSearch}
import docspell.common._ import docspell.common._
import docspell.joex.scheduler._ import docspell.scheduler._
import docspell.store.records.RItem import docspell.store.records.RItem
import docspell.store.usertask.UserTask import docspell.store.usertask.UserTask

View File

@ -12,7 +12,7 @@ import cats.implicits._
import docspell.common.FileCopyTaskArgs.Selection import docspell.common.FileCopyTaskArgs.Selection
import docspell.common.{FileCopyTaskArgs, Ident} import docspell.common.{FileCopyTaskArgs, Ident}
import docspell.joex.Config import docspell.joex.Config
import docspell.joex.scheduler.{JobTaskResultEncoder, Task} import docspell.scheduler.{JobTaskResultEncoder, Task}
import docspell.logging.Logger import docspell.logging.Logger
import docspell.store.file.{BinnyUtils, FileRepository, FileRepositoryConfig} import docspell.store.file.{BinnyUtils, FileRepository, FileRepositoryConfig}
import binny.CopyTool.Counter import binny.CopyTool.Counter

View File

@ -12,7 +12,7 @@ import cats.implicits._
import docspell.backend.ops.OFileRepository import docspell.backend.ops.OFileRepository
import docspell.backend.ops.OFileRepository.IntegrityResult import docspell.backend.ops.OFileRepository.IntegrityResult
import docspell.common.{FileIntegrityCheckArgs, FileKey} import docspell.common.{FileIntegrityCheckArgs, FileKey}
import docspell.joex.scheduler.{JobTaskResultEncoder, Task} import docspell.scheduler.{JobTaskResultEncoder, Task}
import docspell.store.records.RFileMeta import docspell.store.records.RFileMeta
import io.circe.Encoder import io.circe.Encoder
import io.circe.generic.semiauto.deriveEncoder import io.circe.generic.semiauto.deriveEncoder

View File

@ -9,7 +9,7 @@ package docspell.joex.fts
import docspell.backend.fulltext.CreateIndex import docspell.backend.fulltext.CreateIndex
import docspell.ftsclient.FtsClient import docspell.ftsclient.FtsClient
import docspell.joex.Config import docspell.joex.Config
import docspell.joex.scheduler.Context import docspell.scheduler.Context
import docspell.logging.Logger import docspell.logging.Logger
import docspell.store.Store import docspell.store.Store

View File

@ -14,7 +14,7 @@ import docspell.backend.fulltext.CreateIndex
import docspell.common._ import docspell.common._
import docspell.ftsclient._ import docspell.ftsclient._
import docspell.joex.Config import docspell.joex.Config
import docspell.joex.scheduler.Context import docspell.scheduler.Context
import docspell.logging.Logger import docspell.logging.Logger
object FtsWork { object FtsWork {

View File

@ -13,7 +13,7 @@ import docspell.backend.fulltext.CreateIndex
import docspell.common._ import docspell.common._
import docspell.ftsclient._ import docspell.ftsclient._
import docspell.joex.Config import docspell.joex.Config
import docspell.joex.scheduler.Task import docspell.scheduler.Task
import docspell.store.records.RJob import docspell.store.records.RJob
object MigrationTask { object MigrationTask {

View File

@ -14,7 +14,7 @@ import docspell.common._
import docspell.ftsclient._ import docspell.ftsclient._
import docspell.joex.Config import docspell.joex.Config
import docspell.joex.fts.FtsWork.syntax._ import docspell.joex.fts.FtsWork.syntax._
import docspell.joex.scheduler.Task import docspell.scheduler.Task
object ReIndexTask { object ReIndexTask {
type Args = ReIndexTaskArgs type Args = ReIndexTaskArgs

View File

@ -10,7 +10,7 @@ import cats.effect._
import cats.implicits._ import cats.implicits._
import docspell.common._ import docspell.common._
import docspell.joex.scheduler.{Context, Task} import docspell.scheduler.{Context, Task}
import docspell.logging.Logger import docspell.logging.Logger
import docspell.store.records._ import docspell.store.records._

View File

@ -10,7 +10,7 @@ import cats.effect._
import cats.implicits._ import cats.implicits._
import docspell.common._ import docspell.common._
import docspell.joex.scheduler.Task import docspell.scheduler.Task
import docspell.store.records._ import docspell.store.records._
object CleanupInvitesTask { object CleanupInvitesTask {

View File

@ -11,7 +11,7 @@ import cats.implicits._
import fs2.Stream import fs2.Stream
import docspell.common._ import docspell.common._
import docspell.joex.scheduler.Task import docspell.scheduler.Task
import docspell.store.Store import docspell.store.Store
import docspell.store.records._ import docspell.store.records._

View File

@ -10,7 +10,7 @@ import cats.effect._
import cats.implicits._ import cats.implicits._
import docspell.common._ import docspell.common._
import docspell.joex.scheduler.Task import docspell.scheduler.Task
import docspell.store.records._ import docspell.store.records._
object CleanupRememberMeTask { object CleanupRememberMeTask {

View File

@ -13,7 +13,7 @@ import docspell.backend.ops.OFileRepository
import docspell.common._ import docspell.common._
import docspell.joex.Config import docspell.joex.Config
import docspell.joex.filecopy.FileIntegrityCheckTask import docspell.joex.filecopy.FileIntegrityCheckTask
import docspell.joex.scheduler.{JobTaskResultEncoder, Task} import docspell.scheduler.{JobTaskResultEncoder, Task}
import docspell.store.records._ import docspell.store.records._
import docspell.store.usertask.UserTaskScope import docspell.store.usertask.UserTaskScope

View File

@ -12,7 +12,7 @@ import cats.implicits._
import docspell.backend.ops.OFileRepository import docspell.backend.ops.OFileRepository
import docspell.common._ import docspell.common._
import docspell.joex.filecopy.FileIntegrityCheckTask import docspell.joex.filecopy.FileIntegrityCheckTask
import docspell.joex.scheduler.Task import docspell.scheduler.Task
object IntegrityCheckTask { object IntegrityCheckTask {

View File

@ -14,7 +14,7 @@ import docspell.analysis.TextAnalyser
import docspell.backend.ops.OCollective import docspell.backend.ops.OCollective
import docspell.common._ import docspell.common._
import docspell.joex.Config import docspell.joex.Config
import docspell.joex.scheduler._ import docspell.scheduler._
import docspell.logging.Logger import docspell.logging.Logger
import docspell.store.records.{RClassifierModel, RClassifierSetting} import docspell.store.records.{RClassifierModel, RClassifierSetting}

View File

@ -14,7 +14,7 @@ import fs2.Stream
import docspell.analysis.TextAnalyser import docspell.analysis.TextAnalyser
import docspell.analysis.classifier.TextClassifier.Data import docspell.analysis.classifier.TextClassifier.Data
import docspell.common._ import docspell.common._
import docspell.joex.scheduler._ import docspell.scheduler._
object LearnItemEntities { object LearnItemEntities {
def learnAll[F[_]: Async, A]( def learnAll[F[_]: Async, A](

View File

@ -12,7 +12,7 @@ import cats.implicits._
import docspell.analysis.TextAnalyser import docspell.analysis.TextAnalyser
import docspell.common._ import docspell.common._
import docspell.joex.scheduler._ import docspell.scheduler._
import docspell.store.records.RClassifierSetting import docspell.store.records.RClassifierSetting
object LearnTags { object LearnTags {

View File

@ -10,7 +10,7 @@ import fs2.{Pipe, Stream}
import docspell.analysis.classifier.TextClassifier.Data import docspell.analysis.classifier.TextClassifier.Data
import docspell.common._ import docspell.common._
import docspell.joex.scheduler.Context import docspell.scheduler.Context
import docspell.store.Store import docspell.store.Store
import docspell.store.qb.Batch import docspell.store.qb.Batch
import docspell.store.queries.{QItem, TextAndTag} import docspell.store.queries.{QItem, TextAndTag}

View File

@ -12,7 +12,7 @@ import fs2.io.file.Files
import docspell.analysis.classifier.ClassifierModel import docspell.analysis.classifier.ClassifierModel
import docspell.common._ import docspell.common._
import docspell.joex.scheduler._ import docspell.scheduler._
import docspell.logging.Logger import docspell.logging.Logger
import docspell.store.Store import docspell.store.Store
import docspell.store.records.RClassifierModel import docspell.store.records.RClassifierModel

View File

@ -12,8 +12,8 @@ import cats.implicits._
import docspell.backend.ops.ONotification import docspell.backend.ops.ONotification
import docspell.common._ import docspell.common._
import docspell.joex.scheduler.Context import docspell.scheduler.Context
import docspell.joex.scheduler.Task import docspell.scheduler.Task
import docspell.notification.api.EventContext import docspell.notification.api.EventContext
import docspell.notification.api.NotificationChannel import docspell.notification.api.NotificationChannel
import docspell.notification.api.PeriodicDueItemsArgs import docspell.notification.api.PeriodicDueItemsArgs

View File

@ -13,8 +13,8 @@ import cats.implicits._
import docspell.backend.ops.ONotification import docspell.backend.ops.ONotification
import docspell.common._ import docspell.common._
import docspell.joex.scheduler.Context import docspell.scheduler.Context
import docspell.joex.scheduler.Task import docspell.scheduler.Task
import docspell.notification.api.EventContext import docspell.notification.api.EventContext
import docspell.notification.api.NotificationChannel import docspell.notification.api.NotificationChannel
import docspell.notification.api.PeriodicQueryArgs import docspell.notification.api.PeriodicQueryArgs

View File

@ -13,8 +13,8 @@ import fs2.{Chunk, Stream}
import docspell.backend.JobFactory import docspell.backend.JobFactory
import docspell.backend.ops.OJoex import docspell.backend.ops.OJoex
import docspell.common._ import docspell.common._
import docspell.joex.scheduler.Context import docspell.scheduler.Context
import docspell.joex.scheduler.Task import docspell.scheduler.Task
import docspell.store.queue.JobQueue import docspell.store.queue.JobQueue
import docspell.store.records.RAttachment import docspell.store.records.RAttachment
import docspell.store.records.RJob import docspell.store.records.RJob

View File

@ -11,8 +11,8 @@ import cats.implicits._
import docspell.common._ import docspell.common._
import docspell.joex.process.AttachmentPageCount import docspell.joex.process.AttachmentPageCount
import docspell.joex.scheduler.Context import docspell.scheduler.Context
import docspell.joex.scheduler.Task import docspell.scheduler.Task
import docspell.store.records.RAttachment import docspell.store.records.RAttachment
import docspell.store.records.RAttachmentMeta import docspell.store.records.RAttachmentMeta

View File

@ -12,7 +12,7 @@ import fs2.{Chunk, Stream}
import docspell.backend.ops.OJoex import docspell.backend.ops.OJoex
import docspell.common._ import docspell.common._
import docspell.joex.scheduler.{Context, Task} import docspell.scheduler.{Context, Task}
import docspell.store.queue.JobQueue import docspell.store.queue.JobQueue
import docspell.store.records.RAttachment import docspell.store.records.RAttachment
import docspell.store.records._ import docspell.store.records._

View File

@ -16,7 +16,7 @@ import docspell.common._
import docspell.convert.ConversionResult import docspell.convert.ConversionResult
import docspell.convert.extern.OcrMyPdf import docspell.convert.extern.OcrMyPdf
import docspell.joex.Config import docspell.joex.Config
import docspell.joex.scheduler.{Context, Task} import docspell.scheduler.{Context, Task}
import docspell.store.records._ import docspell.store.records._
import io.circe.generic.semiauto._ import io.circe.generic.semiauto._

View File

@ -14,8 +14,8 @@ import docspell.backend.JobFactory
import docspell.backend.ops.OJoex import docspell.backend.ops.OJoex
import docspell.common.MakePreviewArgs.StoreMode import docspell.common.MakePreviewArgs.StoreMode
import docspell.common._ import docspell.common._
import docspell.joex.scheduler.Context import docspell.scheduler.Context
import docspell.joex.scheduler.Task import docspell.scheduler.Task
import docspell.store.queue.JobQueue import docspell.store.queue.JobQueue
import docspell.store.records.RAttachment import docspell.store.records.RAttachment
import docspell.store.records.RJob import docspell.store.records.RJob

View File

@ -13,8 +13,8 @@ import docspell.common._
import docspell.extract.pdfbox.PdfboxPreview import docspell.extract.pdfbox.PdfboxPreview
import docspell.extract.pdfbox.PreviewConfig import docspell.extract.pdfbox.PreviewConfig
import docspell.joex.process.AttachmentPreview import docspell.joex.process.AttachmentPreview
import docspell.joex.scheduler.Context import docspell.scheduler.Context
import docspell.joex.scheduler.Task import docspell.scheduler.Task
import docspell.store.records.RAttachment import docspell.store.records.RAttachment
import docspell.store.records.RAttachmentPreview import docspell.store.records.RAttachmentPreview

View File

@ -15,7 +15,7 @@ import fs2.Stream
import docspell.common._ import docspell.common._
import docspell.extract.pdfbox.PdfMetaData import docspell.extract.pdfbox.PdfMetaData
import docspell.extract.pdfbox.PdfboxExtract import docspell.extract.pdfbox.PdfboxExtract
import docspell.joex.scheduler._ import docspell.scheduler._
import docspell.store.records.RAttachment import docspell.store.records.RAttachment
import docspell.store.records._ import docspell.store.records._

View File

@ -15,7 +15,7 @@ import fs2.Stream
import docspell.common._ import docspell.common._
import docspell.extract.pdfbox.PdfboxPreview import docspell.extract.pdfbox.PdfboxPreview
import docspell.extract.pdfbox.PreviewConfig import docspell.extract.pdfbox.PreviewConfig
import docspell.joex.scheduler._ import docspell.scheduler._
import docspell.store.queries.QAttachment import docspell.store.queries.QAttachment
import docspell.store.records.RAttachment import docspell.store.records.RAttachment
import docspell.store.records._ import docspell.store.records._

View File

@ -17,7 +17,7 @@ import docspell.convert.ConversionResult.Handler
import docspell.convert.SanitizeHtml import docspell.convert.SanitizeHtml
import docspell.convert._ import docspell.convert._
import docspell.joex.extract.JsoupSanitizer import docspell.joex.extract.JsoupSanitizer
import docspell.joex.scheduler._ import docspell.scheduler._
import docspell.store.records._ import docspell.store.records._
/** Goes through all attachments and creates a PDF version of it where supported. /** Goes through all attachments and creates a PDF version of it where supported.

View File

@ -13,7 +13,7 @@ import cats.implicits._
import fs2.Stream import fs2.Stream
import docspell.common._ import docspell.common._
import docspell.joex.scheduler.{Context, Task} import docspell.scheduler.{Context, Task}
import docspell.store.file.FileMetadata import docspell.store.file.FileMetadata
import docspell.store.queries.QItem import docspell.store.queries.QItem
import docspell.store.records._ import docspell.store.records._

View File

@ -12,7 +12,7 @@ import cats.effect.Sync
import cats.implicits._ import cats.implicits._
import docspell.common._ import docspell.common._
import docspell.joex.scheduler.Task import docspell.scheduler.Task
import docspell.logging.Logger import docspell.logging.Logger
/** After candidates have been determined, the set is reduced by doing some cross checks. /** After candidates have been determined, the set is reduced by doing some cross checks.

View File

@ -10,7 +10,7 @@ import cats.effect._
import cats.implicits._ import cats.implicits._
import docspell.common._ import docspell.common._
import docspell.joex.scheduler.{Context, Task} import docspell.scheduler.{Context, Task}
import docspell.store.queries.QItem import docspell.store.queries.QItem
import docspell.store.records.RFileMeta import docspell.store.records.RFileMeta
import docspell.store.records.RJob import docspell.store.records.RJob

View File

@ -12,7 +12,7 @@ import cats.effect.Sync
import cats.implicits._ import cats.implicits._
import docspell.common._ import docspell.common._
import docspell.joex.scheduler.{Context, Task} import docspell.scheduler.{Context, Task}
import docspell.store.records.{RAttachmentMeta, RPerson} import docspell.store.records.{RAttachmentMeta, RPerson}
/** Calculate weights for candidates that adds the most likely candidate a lower number. /** Calculate weights for candidates that adds the most likely candidate a lower number.

View File

@ -18,7 +18,7 @@ import fs2.Stream
import docspell.common._ import docspell.common._
import docspell.files.Zip import docspell.files.Zip
import docspell.joex.mail._ import docspell.joex.mail._
import docspell.joex.scheduler._ import docspell.scheduler._
import docspell.store.records._ import docspell.store.records._
import emil.Mail import emil.Mail

View File

@ -16,7 +16,7 @@ import docspell.analysis.contact._
import docspell.common.MetaProposal.Candidate import docspell.common.MetaProposal.Candidate
import docspell.common._ import docspell.common._
import docspell.joex.Config import docspell.joex.Config
import docspell.joex.scheduler.{Context, Task} import docspell.scheduler.{Context, Task}
import docspell.store.records._ import docspell.store.records._
/** Super simple approach to find corresponding meta data to an item by looking up values /** Super simple approach to find corresponding meta data to an item by looking up values

View File

@ -8,7 +8,7 @@ package docspell.joex.process
import docspell.common._ import docspell.common._
import docspell.joex.process.ItemData.AttachmentDates import docspell.joex.process.ItemData.AttachmentDates
import docspell.joex.scheduler.JobTaskResultEncoder import docspell.scheduler.JobTaskResultEncoder
import docspell.store.records.{RAttachment, RAttachmentMeta, RItem} import docspell.store.records.{RAttachment, RAttachmentMeta, RItem}
import io.circe.syntax.EncoderOps import io.circe.syntax.EncoderOps

View File

@ -17,7 +17,7 @@ import docspell.common.{ItemState, ProcessItemArgs}
import docspell.ftsclient.FtsClient import docspell.ftsclient.FtsClient
import docspell.joex.Config import docspell.joex.Config
import docspell.joex.analysis.RegexNerFile import docspell.joex.analysis.RegexNerFile
import docspell.joex.scheduler.Task import docspell.scheduler.Task
import docspell.store.queries.QItem import docspell.store.queries.QItem
import docspell.store.records.RItem import docspell.store.records.RItem

View File

@ -11,7 +11,7 @@ import cats.effect.Sync
import cats.implicits._ import cats.implicits._
import docspell.common._ import docspell.common._
import docspell.joex.scheduler.{Context, Task} import docspell.scheduler.{Context, Task}
import docspell.store.records.RItem import docspell.store.records.RItem
object LinkProposal { object LinkProposal {

View File

@ -15,7 +15,7 @@ import docspell.common.ProcessItemArgs
import docspell.ftsclient.FtsClient import docspell.ftsclient.FtsClient
import docspell.joex.Config import docspell.joex.Config
import docspell.joex.analysis.RegexNerFile import docspell.joex.analysis.RegexNerFile
import docspell.joex.scheduler.Task import docspell.scheduler.Task
object ProcessItem { object ProcessItem {

View File

@ -16,8 +16,8 @@ import docspell.common._
import docspell.ftsclient.FtsClient import docspell.ftsclient.FtsClient
import docspell.joex.Config import docspell.joex.Config
import docspell.joex.analysis.RegexNerFile import docspell.joex.analysis.RegexNerFile
import docspell.joex.scheduler.Context import docspell.scheduler.Context
import docspell.joex.scheduler.Task import docspell.scheduler.Task
import docspell.store.queries.QItem import docspell.store.queries.QItem
import docspell.store.records.RAttachment import docspell.store.records.RAttachment
import docspell.store.records.RAttachmentSource import docspell.store.records.RAttachmentSource

View File

@ -11,7 +11,7 @@ import cats.implicits._
import docspell.backend.ops.OItem import docspell.backend.ops.OItem
import docspell.common._ import docspell.common._
import docspell.joex.scheduler.Task import docspell.scheduler.Task
object RemoveEmptyItem { object RemoveEmptyItem {

View File

@ -10,7 +10,7 @@ import cats.effect.Sync
import cats.implicits._ import cats.implicits._
import docspell.common._ import docspell.common._
import docspell.joex.scheduler.{Context, Task} import docspell.scheduler.{Context, Task}
import docspell.store.AddResult import docspell.store.AddResult
import docspell.store.records._ import docspell.store.records._

View File

@ -11,7 +11,7 @@ import cats.implicits._
import docspell.backend.ops.OItem import docspell.backend.ops.OItem
import docspell.common._ import docspell.common._
import docspell.joex.scheduler.Task import docspell.scheduler.Task
import docspell.store.UpdateResult import docspell.store.UpdateResult
object SetGivenData { object SetGivenData {

View File

@ -18,8 +18,8 @@ import docspell.joex.Config
import docspell.joex.analysis.RegexNerFile import docspell.joex.analysis.RegexNerFile
import docspell.joex.learn.{ClassifierName, Classify, LearnClassifierTask} import docspell.joex.learn.{ClassifierName, Classify, LearnClassifierTask}
import docspell.joex.process.ItemData.AttachmentDates import docspell.joex.process.ItemData.AttachmentDates
import docspell.joex.scheduler.Context import docspell.scheduler.Context
import docspell.joex.scheduler.Task import docspell.scheduler.Task
import docspell.store.records.{RAttachmentMeta, RClassifierSetting} import docspell.store.records.{RAttachmentMeta, RClassifierSetting}
object TextAnalysis { object TextAnalysis {

View File

@ -13,7 +13,7 @@ import cats.implicits._
import docspell.common._ import docspell.common._
import docspell.extract.{ExtractConfig, ExtractResult, Extraction} import docspell.extract.{ExtractConfig, ExtractResult, Extraction}
import docspell.ftsclient.{FtsClient, TextData} import docspell.ftsclient.{FtsClient, TextData}
import docspell.joex.scheduler.{Context, Task} import docspell.scheduler.{Context, Task}
import docspell.store.records.{RAttachment, RAttachmentMeta, RFileMeta} import docspell.store.records.{RAttachment, RAttachmentMeta, RFileMeta}
object TextExtraction { object TextExtraction {

View File

@ -16,7 +16,7 @@ import fs2._
import docspell.backend.ops.{OJoex, OUpload} import docspell.backend.ops.{OJoex, OUpload}
import docspell.common._ import docspell.common._
import docspell.joex.Config import docspell.joex.Config
import docspell.joex.scheduler.{Context, Task} import docspell.scheduler.{Context, Task}
import docspell.logging.Logger import docspell.logging.Logger
import docspell.store.queries.QOrganization import docspell.store.queries.QOrganization
import docspell.store.records._ import docspell.store.records._

View File

@ -1,82 +0,0 @@
/*
* Copyright 2020 Eike K. & Contributors
*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
package docspell.joex.scheduler
import cats.effect._
import cats.implicits._
import cats.{Applicative, Functor}
import docspell.common._
import docspell.logging.Logger
import docspell.store.Store
import docspell.store.records.RJob
/** Per-run environment handed to a task: job id, decoded arguments, scheduler
  * configuration, a logger and the store.
  */
trait Context[F[_], A] { self =>

  def jobId: Ident

  def args: A

  def config: SchedulerConfig

  def logger: Logger[F]

  /** Persist the given progress percentage for this job. */
  def setProgress(percent: Int): F[Unit]

  def store: Store[F]

  /** True when the number of retries recorded for this job equals the
    * configured maximum, i.e. this run is the last attempt.
    */
  final def isLastRetry(implicit ev: Applicative[F]): F[Boolean] =
    store
      .transact(RJob.getRetries(jobId))
      .map(retries => config.retries == retries.getOrElse(0))

  /** Derive a context with transformed arguments; everything else is shared. */
  def map[C](f: A => C)(implicit F: Functor[F]): Context[F, C] =
    new Context.ContextImpl[F, C](f(args), logger, store, config, jobId)
}
object Context {

  /** Directly assemble a context from its parts. */
  def create[F[_]: Async, A](
      jobId: Ident,
      arg: A,
      config: SchedulerConfig,
      log: Logger[F],
      store: Store[F]
  ): Context[F, A] =
    new ContextImpl(arg, log, store, config, jobId)

  /** Build a context for the given job, wiring up a queue-backed logger that
    * forwards task log output to the provided sink.
    */
  def apply[F[_]: Async, A](
      job: RJob,
      arg: A,
      config: SchedulerConfig,
      logSink: LogSink[F],
      store: Store[F]
  ): F[Context[F, A]] = {
    val log = docspell.logging.getLogger[F]
    log.trace("Creating logger for task run") *>
      QueueLogger(job.id, job.info, config.logBufferSize, logSink).flatMap { logger =>
        log
          .trace("Logger created, instantiating context")
          .as(create[F, A](job.id, arg, config, logger, store))
      }
  }

  final private class ContextImpl[F[_]: Functor, A](
      val args: A,
      val logger: Logger[F],
      val store: Store[F],
      val config: SchedulerConfig,
      val jobId: Ident
  ) extends Context[F, A] {

    /** Clamp the percentage into [0, 100] and persist it for this job. */
    def setProgress(percent: Int): F[Unit] = {
      val clamped = percent.max(0).min(100)
      store.transact(RJob.setProgress(jobId, clamped)).void
    }
  }
}

View File

@ -1,44 +0,0 @@
/*
* Copyright 2020 Eike K. & Contributors
*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
package docspell.joex.scheduler
import cats.implicits._
import docspell.common.Priority
/** A counting scheme to indicate a ratio between scheduling high and low priority jobs.
*
* For example high=4, low=1 means: ”schedule 4 high priority jobs and then 1 low
* priority job“.
*/
case class CountingScheme(high: Int, low: Int, counter: Int = 0) {

  /** Pick the priority for the next job and return the advanced scheme.
    * Positions 0 until `high` inside a cycle of length `high + low` yield
    * high priority, the remaining positions low priority.
    */
  def nextPriority: (CountingScheme, Priority) =
    if (counter <= 0) (increment, Priority.High)
    else {
      val pos = counter % (high + low)
      val prio = if (pos < high) Priority.High else Priority.Low
      (increment, prio)
    }

  /** Advance the internal counter by one step. */
  def increment: CountingScheme =
    copy(counter = counter + 1)
}
object CountingScheme {

  /** Render a scheme as "high,low" (the counter is not persisted). */
  def writeString(cs: CountingScheme): String =
    s"${cs.high},${cs.low}"

  /** Parse a "high,low" string; returns a Left with a message on malformed
    * input or non-numeric components.
    */
  def readString(str: String): Either[String, CountingScheme] =
    str.split(',').toList match {
      case h :: l :: Nil =>
        Either
          .catchNonFatal(CountingScheme(h.toInt, l.toInt))
          .left
          .map(_.getMessage)
      case _ =>
        Left(s"Invalid counting scheme: $str")
    }
}

View File

@ -1,51 +0,0 @@
/*
* Copyright 2020 Eike K. & Contributors
*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
package docspell.joex.scheduler
import cats.effect.Sync
import cats.implicits._
import docspell.common.Ident
import docspell.common.syntax.all._
import io.circe.Decoder
/** Binds a Task to a name. This is required to lookup the code based on the taskName in
* the RJob data and to execute it given the arguments that have to be read from a
* string.
*
* Since the scheduler only has a string for the task argument, this only works for Task
* impls that accept a string. There is a convenience constructor that uses circe to
* decode json into some type A.
*/
case class JobTask[F[_]](
name: Ident, // identifier used to look this task up from RJob.taskName
task: Task[F, String, JobTaskResult], // the work to run, fed the raw argument string
onCancel: Task[F, String, Unit] // cleanup to run when the job is cancelled
)
object JobTask {

  /** Convenience constructor that decodes the raw argument string as JSON
    * into `A` before handing it to the task, and encodes the task's result
    * via the given `JobTaskResultEncoder`.
    */
  def json[F[_]: Sync, A, B](
      name: Ident,
      task: Task[F, A, B],
      onCancel: Task[F, A, Unit]
  )(implicit
      D: Decoder[A],
      E: JobTaskResultEncoder[B]
  ): JobTask[F] = {
    // Decode the raw argument string, raising a descriptive error on failure.
    def parseArgs(str: String): F[A] =
      str
        .parseJsonAs[A]
        .fold(
          ex =>
            Sync[F].raiseError[A](new Exception(s"Cannot parse task arguments: $str", ex)),
          a => a.pure[F]
        )

    JobTask(name, task.contramap(parseArgs _).map(E.encode), onCancel.contramap(parseArgs _))
  }
}

View File

@ -1,30 +0,0 @@
/*
* Copyright 2020 Eike K. & Contributors
*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
package docspell.joex.scheduler
import docspell.common.Ident
/** This is a mapping from some identifier to a task. This is used by the scheduler to
* lookup an implementation using the taskName field of the RJob database record.
*/
final class JobTaskRegistry[F[_]](tasks: Map[Ident, JobTask[F]]) {

  /** Register a task under its name, replacing any existing binding. */
  def withTask(task: JobTask[F]): JobTaskRegistry[F] =
    new JobTaskRegistry[F](tasks + (task.name -> task))

  /** Look up the task bound to the given name, if any. */
  def find(taskName: Ident): Option[JobTask[F]] =
    tasks.get(taskName)
}
object JobTaskRegistry {

  /** Wraps an existing name-to-task map into a registry. */
  def apply[F[_]](map: Map[Ident, JobTask[F]]): JobTaskRegistry[F] =
    new JobTaskRegistry[F](map)

  /** A registry containing no tasks. */
  def empty[F[_]]: JobTaskRegistry[F] =
    new JobTaskRegistry[F](Map.empty)
}

View File

@ -1,27 +0,0 @@
/*
* Copyright 2020 Eike K. & Contributors
*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
package docspell.joex.scheduler
import io.circe.Json
final case class JobTaskResult(message: Option[String], json: Option[Json]) {

  /** Returns this result with the message replaced. */
  def withMessage(m: String): JobTaskResult =
    JobTaskResult(Some(m), json)

  /** Returns this result with the JSON data replaced. */
  def withJson(json: Json): JobTaskResult =
    JobTaskResult(message, Some(json))
}
object JobTaskResult {

  /** A result carrying neither a message nor JSON data. */
  val empty: JobTaskResult = JobTaskResult(None, None)

  /** A result consisting only of a human readable message. */
  def message(msg: String): JobTaskResult = empty.copy(message = Some(msg))

  /** A result consisting only of structured JSON data. */
  def json(json: Json): JobTaskResult = empty.copy(json = Some(json))
}

View File

@ -1,49 +0,0 @@
/*
* Copyright 2020 Eike K. & Contributors
*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
package docspell.joex.scheduler
import docspell.joex.scheduler.JobTaskResultEncoder.instance
import io.circe.Encoder
trait JobTaskResultEncoder[A] { self =>

  /** Turns a value into a task result. */
  def encode(a: A): JobTaskResult

  /** Adapts this encoder to values of type `B` via `f`. */
  final def contramap[B](f: B => A): JobTaskResultEncoder[B] =
    instance(value => self.encode(f(value)))

  /** Post-processes the encoded result. */
  final def map(f: JobTaskResult => JobTaskResult): JobTaskResultEncoder[A] =
    instance(value => f(self.encode(value)))

  /** Post-processes the encoded result with access to the source value. */
  final def modify(f: (A, JobTaskResult) => JobTaskResult): JobTaskResultEncoder[A] =
    instance(value => f(value, self.encode(value)))

  /** Attaches a message derived from the source value to the result. */
  final def withMessage(f: A => String): JobTaskResultEncoder[A] =
    modify((value, result) => result.withMessage(f(value)))
}
object JobTaskResultEncoder {

  /** Summons the implicit encoder for `A`. */
  def apply[A](implicit v: JobTaskResultEncoder[A]): JobTaskResultEncoder[A] = v

  /** Creates an encoder from a plain function. */
  def instance[A](f: A => JobTaskResult): JobTaskResultEncoder[A] =
    new JobTaskResultEncoder[A] {
      def encode(a: A): JobTaskResult = f(a)
    }

  /** Encodes a value as its circe JSON representation. */
  def fromJson[A: Encoder]: JobTaskResultEncoder[A] =
    instance(a => JobTaskResult.json(Encoder[A].apply(a)))

  /** Unit carries no information; encode to the empty result. */
  implicit val unitJobTaskResultEncoder: JobTaskResultEncoder[Unit] =
    instance(_ => JobTaskResult.empty)

  /** `None` maps to the empty result, `Some` delegates to the inner encoder. */
  implicit def optionJobTaskResultEncoder[A](implicit
      ea: JobTaskResultEncoder[A]
  ): JobTaskResultEncoder[Option[A]] =
    instance(_.fold(JobTaskResult.empty)(ea.encode))
}

View File

@ -1,38 +0,0 @@
/*
* Copyright 2020 Eike K. & Contributors
*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
package docspell.joex.scheduler
import cats.effect.Sync
import cats.implicits._
import docspell.common._
case class LogEvent(
    jobId: Ident,
    jobInfo: String,
    time: Timestamp,
    level: LogLevel,
    msg: String,
    ex: Option[Throwable] = None
) {

  /** Renders this event as a single human readable line. */
  def logLine: String =
    ">>> " + time.asString + " " + level + " " + jobInfo + ": " + msg
}
object LogEvent {

  /** Builds a log event stamped with the current time. */
  def create[F[_]: Sync](
      jobId: Ident,
      jobInfo: String,
      level: LogLevel,
      msg: String
  ): F[LogEvent] =
    for {
      now <- Timestamp.current[F]
    } yield LogEvent(jobId, jobInfo, now, level, msg)
}

View File

@ -1,76 +0,0 @@
/*
* Copyright 2020 Eike K. & Contributors
*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
package docspell.joex.scheduler
import cats.effect._
import cats.implicits._
import fs2.Pipe
import docspell.common._
import docspell.logging
import docspell.store.Store
import docspell.store.records.RJobLog
// A sink consuming job log events, e.g. printing or persisting them.
trait LogSink[F[_]] {

  // Consumes a stream of log events, producing the logging side effects.
  def receive: Pipe[F, LogEvent, Unit]
}
object LogSink {

  /** Wraps a Pipe into a LogSink. */
  def apply[F[_]](sink: Pipe[F, LogEvent, Unit]): LogSink[F] =
    new LogSink[F] {
      val receive = sink
    }

  /** Forwards a job log event to the application logger, attaching the job id
    * and info as structured data.
    */
  def logInternal[F[_]: Sync](e: LogEvent): F[Unit] = {
    val logger = docspell.logging.getLogger[F]
    val addData: logging.LogEvent => logging.LogEvent =
      _.data("jobId", e.jobId).data("jobInfo", e.jobInfo)

    e.level match {
      case LogLevel.Info =>
        logger.infoWith(e.logLine)(addData)
      case LogLevel.Debug =>
        logger.debugWith(e.logLine)(addData)
      case LogLevel.Warn =>
        logger.warnWith(e.logLine)(addData)
      case LogLevel.Error =>
        // attach the throwable only when one is present
        e.ex match {
          case Some(exc) =>
            logger.errorWith(e.logLine)(addData.andThen(_.addError(exc)))
          case None =>
            logger.errorWith(e.logLine)(addData)
        }
    }
  }

  /** A sink writing only to the application logger. */
  def printer[F[_]: Sync]: LogSink[F] =
    LogSink(_.evalMap(e => logInternal(e)))

  /** A sink writing each event to the job-log table and also to the
    * application logger.
    */
  def db[F[_]: Async](store: Store[F]): LogSink[F] =
    LogSink(
      _.evalMap(ev =>
        for {
          id <- Ident.randomId[F]
          joblog = RJobLog(
            id,
            ev.jobId,
            ev.level,
            ev.time,
            // append the exception message, if any, to the stored text
            ev.msg + ev.ex.map(th => ": " + th.getMessage).getOrElse("")
          )
          _ <- logInternal(ev)
          _ <- store.transact(RJobLog.insert(joblog))
        } yield ()
      )
    )

  /** Broadcasts each event to both the printing and the database sink. */
  def dbAndLog[F[_]: Async](store: Store[F]): LogSink[F] =
    LogSink(_.broadcastThrough(printer[F].receive, db[F](store).receive))
}

View File

@ -1,60 +0,0 @@
/*
* Copyright 2020 Eike K. & Contributors
*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
package docspell.joex.scheduler
import cats.effect._
import fs2._
import fs2.concurrent.SignallingRef
import docspell.backend.ops.OJoex
import docspell.store.queue._
/** A periodic scheduler takes care to submit periodic tasks to the job queue.
*
* It is run in the background to regularily find a periodic task to execute. If the task
* is due, it will be submitted into the job queue where it will be picked up by the
* scheduler from some joex instance. If it is due in the future, a notification is
* scheduled to be received at that time so the task can be looked up again.
*/
trait PeriodicScheduler[F[_]] {

  // The configuration used to create this scheduler.
  def config: PeriodicSchedulerConfig

  // The main loop; evaluate the stream to run the scheduler.
  def start: Stream[F, Nothing]

  // Requests the main loop to stop.
  def shutdown: F[Unit]

  // Forks a background fiber that periodically triggers `notifyChange`.
  def periodicAwake: F[Fiber[F, Throwable, Unit]]

  // Wakes up the main loop to look for due tasks immediately.
  def notifyChange: F[Unit]
}
object PeriodicScheduler {

  /** Wires up a PeriodicSchedulerImpl; `init` runs as part of resource
    * acquisition to clear stale marks.
    */
  def create[F[_]: Async](
      cfg: PeriodicSchedulerConfig,
      sch: Scheduler[F],
      queue: JobQueue[F],
      store: PeriodicTaskStore[F],
      joex: OJoex[F]
  ): Resource[F, PeriodicScheduler[F]] =
    for {
      // `waiter` toggles to wake the main loop; `state` tracks the shutdown
      // flag and the pending notify fiber
      waiter <- Resource.eval(SignallingRef(true))
      state <- Resource.eval(SignallingRef(PeriodicSchedulerImpl.emptyState[F]))
      psch = new PeriodicSchedulerImpl[F](
        cfg,
        sch,
        queue,
        store,
        joex,
        waiter,
        state
      )
      _ <- Resource.eval(psch.init)
    } yield psch
}

View File

@ -1,14 +0,0 @@
/*
* Copyright 2020 Eike K. & Contributors
*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
package docspell.joex.scheduler
import docspell.common._
// Settings for the periodic scheduler.
case class PeriodicSchedulerConfig(
    name: Ident, // identifies this scheduler instance, e.g. when marking tasks
    wakeupPeriod: Duration // fallback interval for waking the main loop
)

View File

@ -1,182 +0,0 @@
/*
* Copyright 2020 Eike K. & Contributors
*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
package docspell.joex.scheduler
import cats.effect._
import cats.implicits._
import fs2._
import fs2.concurrent.SignallingRef
import docspell.backend.ops.OJoex
import docspell.common._
import docspell.joex.scheduler.PeriodicSchedulerImpl.State
import docspell.store.queue._
import docspell.store.records.RPeriodicTask
import eu.timepit.fs2cron.calev.CalevScheduler
final class PeriodicSchedulerImpl[F[_]: Async](
    val config: PeriodicSchedulerConfig,
    sch: Scheduler[F],
    queue: JobQueue[F],
    store: PeriodicTaskStore[F],
    joex: OJoex[F],
    waiter: SignallingRef[F, Boolean], // toggled to wake up the main loop
    state: SignallingRef[F, State[F]] // shutdown flag + pending notify fiber
) extends PeriodicScheduler[F] {
  private[this] val logger = docspell.logging.getLogger[F]

  def start: Stream[F, Nothing] =
    logger.stream.info("Starting periodic scheduler").drain ++
      mainLoop

  def shutdown: F[Unit] =
    state.modify(_.requestShutdown)

  // Background fiber waking the loop every `wakeupPeriod` as a safety net.
  def periodicAwake: F[Fiber[F, Throwable, Unit]] =
    Async[F].start(
      Stream
        .awakeEvery[F](config.wakeupPeriod.toScala)
        .evalMap(_ => logger.debug("Periodic awake reached") *> notifyChange)
        .compile
        .drain
    )

  // Any toggle of the boolean signal wakes waiters on `waiter.discrete`.
  def notifyChange: F[Unit] =
    waiter.update(b => !b)

  // internal

  /** On startup, get all periodic jobs from this scheduler and remove the mark, so they
    * get picked up again.
    */
  def init: F[Unit] =
    logError("Error clearing marks")(store.clearMarks(config.name))

  def mainLoop: Stream[F, Nothing] = {
    // One iteration: take the next marked task; submit it if due, otherwise
    // schedule a wake-up at its next trigger time. Yields true to loop again
    // immediately, false to wait for a notification first.
    val body: F[Boolean] =
      for {
        _ <- logger.debug(s"Going into main loop")
        now <- Timestamp.current[F]
        _ <- logger.debug(s"Looking for next periodic task")
        go <- logThrow("Error getting next task")(
          store
            .takeNext(config.name, None)
            .use {
              case Marked.Found(pj) =>
                logger
                  .debug(s"Found periodic task '${pj.subject}/${pj.timer.asString}'") *>
                  (if (isTriggered(pj, now)) submitJob(pj)
                   else scheduleNotify(pj).map(_ => false))
              case Marked.NotFound =>
                logger.debug("No periodic task found") *> false.pure[F]
              case Marked.NotMarkable =>
                logger.debug("Periodic job cannot be marked. Trying again.") *> true
                  .pure[F]
            }
        )
      } yield go

    Stream
      .eval(state.get.map(_.shutdownRequest))
      .evalTap(
        if (_) logger.info("Stopping main loop due to shutdown request.")
        else ().pure[F]
      )
      .flatMap(if (_) Stream.empty else Stream.eval(cancelNotify *> body))
      .flatMap {
        case true =>
          mainLoop
        case false =>
          // `take(2)` waits for the next change of the boolean signal
          logger.stream.debug(s"Waiting for notify").drain ++
            waiter.discrete.take(2).drain ++
            logger.stream.debug(s"Notify signal, going into main loop").drain ++
            mainLoop
      }
  }

  def isTriggered(pj: RPeriodicTask, now: Timestamp): Boolean =
    pj.nextrun < now

  // Submits the periodic task as a job unless a previous run is still pending;
  // in that case a notify is scheduled for the next trigger instead.
  def submitJob(pj: RPeriodicTask): F[Boolean] =
    store
      .findNonFinalJob(pj.id)
      .flatMap {
        case Some(job) =>
          logger.info(
            s"There is already a job with non-final state '${job.state}' in the queue"
          ) *> scheduleNotify(pj) *> false.pure[F]
        case None =>
          logger.info(s"Submitting job for periodic task '${pj.task.id}'") *>
            pj.toJob.flatMap(queue.insert) *> notifyJoex *> true.pure[F]
      }

  def notifyJoex: F[Unit] =
    sch.notifyChange *> joex.notifyAllNodes

  // Forks a fiber that sleeps until the timer's next elapse and then wakes the
  // main loop; the fiber is remembered in `state` so it can be cancelled.
  def scheduleNotify(pj: RPeriodicTask): F[Unit] =
    Timestamp
      .current[F]
      .flatMap(now =>
        logger.debug(
          s"Scheduling next notify for timer ${pj.timer.asString} -> ${pj.timer.nextElapse(now.toUtcDateTime)}"
        )
      ) *>
      Async[F]
        .start(
          CalevScheduler
            .utc[F]
            .sleep(pj.timer)
            .evalMap(_ => notifyChange)
            .compile
            .drain
        )
        .flatMap(fb => state.modify(_.setNotify(fb)))

  // Cancels any pending notify fiber recorded in the state.
  def cancelNotify: F[Unit] =
    state
      .modify(_.clearNotify)
      .flatMap {
        case Some(fb) =>
          fb.cancel
        case None =>
          ().pure[F]
      }

  // Logs and swallows errors of `fa`.
  private def logError(msg: => String)(fa: F[Unit]): F[Unit] =
    fa.attempt.flatMap {
      case Right(_) => ().pure[F]
      case Left(ex) => logger.error(ex)(msg).map(_ => ())
    }

  // Logs errors of `fa` but re-raises them.
  private def logThrow[A](msg: => String)(fa: F[A]): F[A] =
    fa.attempt.flatMap {
      case r @ Right(_) => (r: Either[Throwable, A]).pure[F]
      case l @ Left(ex) => logger.error(ex)(msg).map(_ => (l: Either[Throwable, A]))
    }.rethrow
}
object PeriodicSchedulerImpl {

  /** Initial state: not shutting down, no notification fiber pending. */
  def emptyState[F[_]]: State[F] =
    State(shutdownRequest = false, scheduledNotify = None)

  /** Scheduler state kept in a `SignallingRef`, updated by replacement. */
  case class State[F[_]](
      shutdownRequest: Boolean,
      scheduledNotify: Option[Fiber[F, Throwable, Unit]]
  ) {

    /** Flags the scheduler to stop its main loop. */
    def requestShutdown: (State[F], Unit) =
      copy(shutdownRequest = true) -> (())

    /** Remembers the fiber that will fire the next wake-up notification. */
    def setNotify(fb: Fiber[F, Throwable, Unit]): (State[F], Unit) =
      copy(scheduledNotify = Some(fb)) -> (())

    /** Drops the pending notification fiber, handing it back for cancellation. */
    def clearNotify: (State[F], Option[Fiber[F, Throwable, Unit]]) =
      copy(scheduledNotify = None) -> scheduledNotify
  }
}

View File

@ -1,58 +0,0 @@
/*
* Copyright 2020 Eike K. & Contributors
*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
package docspell.joex.scheduler
import cats.effect._
import cats.effect.std.Queue
import cats.implicits._
import fs2.Stream
import docspell.common._
import docspell.logging
import docspell.logging.{Level, Logger}
object QueueLogger {
def create[F[_]: Sync](
jobId: Ident,
jobInfo: String,
q: Queue[F, LogEvent]
): Logger[F] =
new Logger[F] {
def log(logEvent: logging.LogEvent) =
LogEvent
.create[F](jobId, jobInfo, level2Level(logEvent.level), logEvent.msg())
.flatMap { ev =>
val event =
logEvent.findErrors.headOption
.map(ex => ev.copy(ex = Some(ex)))
.getOrElse(ev)
q.offer(event)
}
def asUnsafe = Logger.off
}
def apply[F[_]: Async](
jobId: Ident,
jobInfo: String,
bufferSize: Int,
sink: LogSink[F]
): F[Logger[F]] =
for {
q <- Queue.circularBuffer[F, LogEvent](bufferSize)
log = create(jobId, jobInfo, q)
_ <- Async[F].start(
Stream.fromQueueUnterminated(q).through(sink.receive).compile.drain
)
} yield log
private def level2Level(level: Level): LogLevel =
LogLevel.fromLevel(level)
}

View File

@ -1,38 +0,0 @@
/*
* Copyright 2020 Eike K. & Contributors
*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
package docspell.joex.scheduler
import cats.effect._
import fs2.Stream
import docspell.common.Ident
import docspell.store.records.RJob
trait Scheduler[F[_]] {

  // The configuration used to create this scheduler.
  def config: SchedulerConfig

  // Jobs currently executed by this scheduler instance.
  def getRunning: F[Vector[RJob]]

  // Requests to cancel the given job; yields whether the job was known here.
  def requestCancel(jobId: Ident): F[Boolean]

  // Wakes up the main loop to look for the next job immediately.
  def notifyChange: F[Unit]

  // The main loop; evaluate the stream to run the scheduler.
  def start: Stream[F, Nothing]

  /** Requests to shutdown the scheduler.
    *
    * The scheduler will not take any new jobs from the queue. If there are still running
    * jobs, it waits for them to complete. when the cancelAll flag is set to true, it
    * cancels all running jobs.
    *
    * The returned F[Unit] can be evaluated to wait for all that to complete.
    */
  def shutdown(cancelAll: Boolean): F[Unit]

  // Forks a background fiber that periodically triggers `notifyChange`.
  def periodicAwake: F[Fiber[F, Throwable, Unit]]
}

View File

@ -1,96 +0,0 @@
/*
* Copyright 2020 Eike K. & Contributors
*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
package docspell.joex.scheduler
import cats.effect._
import cats.effect.std.Semaphore
import cats.implicits._
import fs2.concurrent.SignallingRef
import docspell.notification.api.EventSink
import docspell.pubsub.api.PubSubT
import docspell.store.Store
import docspell.store.queue.JobQueue
// Immutable builder assembling a Scheduler from its collaborators.
case class SchedulerBuilder[F[_]: Async](
    config: SchedulerConfig,
    tasks: JobTaskRegistry[F],
    store: Store[F],
    queue: Resource[F, JobQueue[F]],
    logSink: LogSink[F],
    pubSub: PubSubT[F],
    eventSink: EventSink[F]
) {

  def withConfig(cfg: SchedulerConfig): SchedulerBuilder[F] =
    copy(config = cfg)

  def withTaskRegistry(reg: JobTaskRegistry[F]): SchedulerBuilder[F] =
    copy(tasks = reg)

  def withTask[A](task: JobTask[F]): SchedulerBuilder[F] =
    withTaskRegistry(tasks.withTask(task))

  def withQueue(queue: Resource[F, JobQueue[F]]): SchedulerBuilder[F] =
    copy(queue = queue)

  def withLogSink(sink: LogSink[F]): SchedulerBuilder[F] =
    copy(logSink = sink)

  // overload for an already-acquired queue
  def withQueue(queue: JobQueue[F]): SchedulerBuilder[F] =
    copy(queue = Resource.pure[F, JobQueue[F]](queue))

  def withPubSub(pubSubT: PubSubT[F]): SchedulerBuilder[F] =
    copy(pubSub = pubSubT)

  def withEventSink(sink: EventSink[F]): SchedulerBuilder[F] =
    copy(eventSink = sink)

  // Builds the scheduler and immediately starts its main loop in a fiber.
  def serve: Resource[F, Scheduler[F]] =
    resource.evalMap(sch => Async[F].start(sch.start.compile.drain).map(_ => sch))

  // Builds the scheduler (running `init` on acquisition) without starting it.
  def resource: Resource[F, Scheduler[F]] = {
    val scheduler: Resource[F, SchedulerImpl[F]] = for {
      jq <- queue
      waiter <- Resource.eval(SignallingRef(true))
      state <- Resource.eval(SignallingRef(SchedulerImpl.emptyState[F]))
      // bounds the number of concurrently running jobs
      perms <- Resource.eval(Semaphore(config.poolSize.toLong))
    } yield new SchedulerImpl[F](
      config,
      jq,
      pubSub,
      eventSink,
      tasks,
      store,
      logSink,
      state,
      waiter,
      perms
    )

    scheduler.evalTap(_.init).map(s => s: Scheduler[F])
  }
}
object SchedulerBuilder {

  /** A builder with defaults: empty task registry, database-backed queue and
    * log sink, no-op pub/sub and a silent event sink.
    */
  def apply[F[_]: Async](
      config: SchedulerConfig,
      store: Store[F]
  ): SchedulerBuilder[F] =
    new SchedulerBuilder[F](
      config,
      JobTaskRegistry.empty[F],
      store,
      JobQueue(store),
      LogSink.db[F](store),
      PubSubT.noop[F],
      EventSink.silent[F]
    )
}

View File

@ -1,33 +0,0 @@
/*
* Copyright 2020 Eike K. & Contributors
*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
package docspell.joex.scheduler
import docspell.common._
// Settings for the job executor.
case class SchedulerConfig(
    name: Ident, // identifies this scheduler instance (worker name on jobs)
    poolSize: Int, // maximum number of jobs run concurrently
    countingScheme: CountingScheme, // ratio of high- vs low-priority jobs picked
    retries: Int, // how often a stuck job is retried before failing
    retryDelay: Duration, // base delay before retrying a stuck job
    logBufferSize: Int, // size of the in-memory buffer for job log events
    wakeupPeriod: Duration // fallback interval for waking the main loop
)
object SchedulerConfig {

  /** Conservative defaults. The pool size could alternatively be derived as
    * math.max(2, Runtime.getRuntime.availableProcessors / 2).
    */
  val default = SchedulerConfig(
    name = Ident.unsafe("default-scheduler"),
    poolSize = 2,
    countingScheme = CountingScheme(2, 1),
    retries = 5,
    retryDelay = Duration.seconds(30),
    logBufferSize = 500,
    wakeupPeriod = Duration.minutes(10)
  )
}

View File

@ -1,351 +0,0 @@
/*
* Copyright 2020 Eike K. & Contributors
*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
package docspell.joex.scheduler
import cats.data.OptionT
import cats.effect._
import cats.effect.std.Semaphore
import cats.implicits._
import fs2.Stream
import fs2.concurrent.SignallingRef
import docspell.backend.msg.JobDone
import docspell.common._
import docspell.joex.scheduler.SchedulerImpl._
import docspell.notification.api.Event
import docspell.notification.api.EventSink
import docspell.pubsub.api.PubSubT
import docspell.store.Store
import docspell.store.queries.QJob
import docspell.store.queue.JobQueue
import docspell.store.records.RJob
import io.circe.Json
final class SchedulerImpl[F[_]: Async](
    val config: SchedulerConfig,
    queue: JobQueue[F],
    pubSub: PubSubT[F],
    eventSink: EventSink[F],
    tasks: JobTaskRegistry[F],
    store: Store[F],
    logSink: LogSink[F],
    state: SignallingRef[F, State[F]], // counters, cancel tokens, shutdown flag
    waiter: SignallingRef[F, Boolean], // toggled to wake up the main loop
    permits: Semaphore[F] // bounds the number of concurrently running jobs
) extends Scheduler[F] {
  private[this] val logger = docspell.logging.getLogger[F]

  /** On startup, get all jobs in state running from this scheduler and put them into
    * waiting state, so they get picked up again.
    */
  def init: F[Unit] =
    QJob.runningToWaiting(config.name, store)

  // Background fiber waking the loop every `wakeupPeriod` as a safety net.
  def periodicAwake: F[Fiber[F, Throwable, Unit]] =
    Async[F].start(
      Stream
        .awakeEvery[F](config.wakeupPeriod.toScala)
        .evalMap(_ => logger.debug("Periodic awake reached") *> notifyChange)
        .compile
        .drain
    )

  def getRunning: F[Vector[RJob]] =
    state.get.flatMap(s => QJob.findAll(s.getRunning, store))

  // Cancels via the stored token if the job runs here; otherwise runs the
  // cancellation routine directly when this worker owns the job record.
  def requestCancel(jobId: Ident): F[Boolean] =
    logger.info(s"Scheduler requested to cancel job: ${jobId.id}") *>
      state.get.flatMap(_.cancelRequest(jobId) match {
        case Some(ct) => ct.map(_ => true)
        case None =>
          (for {
            job <- OptionT(store.transact(RJob.findByIdAndWorker(jobId, config.name)))
            _ <- OptionT.liftF(
              if (job.isInProgress) executeCancel(job)
              else ().pure[F]
            )
          } yield true)
            .getOrElseF(
              logger.warn(s"Job ${jobId.id} not found, cannot cancel.").map(_ => false)
            )
      })

  // Any toggle of the boolean signal wakes waiters on `waiter.discrete`.
  def notifyChange: F[Unit] =
    waiter.update(b => !b)

  def shutdown(cancelAll: Boolean): F[Unit] = {
    // evaluates every stored cancel token
    val doCancel =
      state.get.flatMap(_.cancelTokens.values.toList.traverse(identity)).map(_ => ())

    val runShutdown =
      state.modify(_.requestShutdown) *> (if (cancelAll) doCancel else ().pure[F])

    // waits until the running-set observed via the state signal becomes empty
    val wait = Stream
      .eval(runShutdown)
      .evalMap(_ => logger.info("Scheduler is shutting down now."))
      .flatMap(_ =>
        Stream.eval(state.get) ++ Stream
          .suspend(state.discrete.takeWhile(_.getRunning.nonEmpty))
      )
      .flatMap { state =>
        if (state.getRunning.isEmpty) Stream.eval(logger.info("No jobs running."))
        else
          Stream.eval(
            logger.info(s"Waiting for ${state.getRunning.size} jobs to finish.")
          ) ++
            Stream.emit(state)
      }

    (wait.drain ++ Stream.emit(())).compile.lastOrError
  }

  def start: Stream[F, Nothing] =
    logger.stream.info("Starting scheduler").drain ++
      mainLoop

  def mainLoop: Stream[F, Nothing] = {
    // One iteration: acquire a permit, fetch the next job and fork it.
    // Yields true when a job was found (loop again immediately), false to
    // wait for a notification first.
    val body: F[Boolean] =
      for {
        _ <- permits.available.flatMap(a =>
          logger.debug(s"Try to acquire permit ($a free)")
        )
        _ <- permits.acquire
        _ <- logger.debug("New permit acquired")
        down <- state.get.map(_.shutdownRequest)
        rjob <-
          if (down)
            // NOTE(review): empty log message — presumably meant to mention the
            // shutdown request; confirm intent
            logger.info("") *> permits.release *> (None: Option[RJob]).pure[F]
          else
            queue.nextJob(
              group => state.modify(_.nextPrio(group, config.countingScheme)),
              config.name,
              config.retryDelay
            )
        _ <- logger.debug(s"Next job found: ${rjob.map(_.info)}")
        // `execute` forks; the permit is released in onFinish, or right away
        // when no job was found
        _ <- rjob.map(execute).getOrElse(permits.release)
      } yield rjob.isDefined

    Stream
      .eval(state.get.map(_.shutdownRequest))
      .evalTap(
        if (_) logger.info("Stopping main loop due to shutdown request.")
        else ().pure[F]
      )
      .flatMap(if (_) Stream.empty else Stream.eval(body))
      .flatMap {
        case true =>
          mainLoop
        case false =>
          // `take(2)` waits for the next change of the boolean signal
          logger.stream.debug(s"Waiting for notify").drain ++
            waiter.discrete.take(2).drain ++
            logger.stream.debug(s"Notify signal, going into main loop").drain ++
            mainLoop
      }
  }

  // Runs the task's cancellation routine and records the final state.
  private def executeCancel(job: RJob): F[Unit] = {
    val task = for {
      jobtask <-
        tasks
          .find(job.task)
          .toRight(s"This executor cannot run tasks with name: ${job.task}")
    } yield jobtask

    task match {
      case Left(err) =>
        logger.error(s"Unable to run cancellation task for job ${job.info}: $err")
      case Right(t) =>
        for {
          _ <-
            logger.debug(s"Creating context for job ${job.info} to run cancellation $t")
          ctx <- Context[F, String](job, job.args, config, logSink, store)
          _ <- t.onCancel.run(ctx)
          _ <- state.modify(_.markCancelled(job))
          _ <- onFinish(job, JobTaskResult.empty, JobState.Cancelled)
          _ <- ctx.logger.warn("Job has been cancelled.")
          _ <- logger.debug(s"Job ${job.info} has been cancelled.")
        } yield ()
    }
  }

  // Looks up the task implementation, forks its execution and stores the
  // returned cancel token.
  def execute(job: RJob): F[Unit] = {
    val task = for {
      jobtask <-
        tasks
          .find(job.task)
          .toRight(s"This executor cannot run tasks with name: ${job.task}")
    } yield jobtask

    task match {
      case Left(err) =>
        logger.error(s"Unable to start a task for job ${job.info}: $err")
      case Right(t) =>
        for {
          _ <- logger.debug(s"Creating context for job ${job.info} to run $t")
          ctx <- Context[F, String](job, job.args, config, logSink, store)
          jot = wrapTask(job, t.task, ctx)
          tok <- forkRun(job, jot.run(ctx), t.onCancel.run(ctx), ctx)
          _ <- state.modify(_.addRunning(job, tok))
        } yield ()
    }
  }

  // Releases the permit, persists the final job state and publishes the
  // job-done message/event for jobs that reached a done state.
  def onFinish(job: RJob, result: JobTaskResult, finishState: JobState): F[Unit] =
    for {
      _ <- logger.debug(s"Job ${job.info} done $finishState. Releasing resources.")
      _ <- permits.release *> permits.available.flatMap(a =>
        logger.debug(s"Permit released ($a free)")
      )
      _ <- state.modify(_.removeRunning(job))
      _ <- QJob.setFinalState(job.id, finishState, store)
      _ <- Sync[F].whenA(JobState.isDone(finishState))(
        pubSub.publish1IgnoreErrors(
          JobDone.topic,
          JobDone(job.id, job.group, job.task, job.args, finishState)
        )
      )
      _ <- Sync[F].whenA(JobState.isDone(finishState))(
        eventSink.offer(
          Event.JobDone(
            job.id,
            job.group,
            job.task,
            job.args,
            job.state,
            job.subject,
            job.submitter,
            result.json.getOrElse(Json.Null),
            result.message
          )
        )
      )
    } yield ()

  def onStart(job: RJob): F[Unit] =
    QJob.setRunning(
      job.id,
      config.name,
      store
    ) // also increments retries if current state=stuck

  // Wraps the task with start bookkeeping, error/cancel classification and
  // final-state handling.
  // NOTE(review): the statement-position `logger.*` calls below produce F
  // values that are discarded (never run); likely only the ctx.logger lines
  // take effect — confirm intent.
  def wrapTask(
      job: RJob,
      task: Task[F, String, JobTaskResult],
      ctx: Context[F, String]
  ): Task[F, String, Unit] =
    task
      .mapF(fa => onStart(job) *> logger.debug("Starting task now") *> fa)
      .mapF(_.attempt.flatMap {
        case Right(result) =>
          logger.info(s"Job execution successful: ${job.info}")
          ctx.logger.info("Job execution successful") *>
            (JobState.Success: JobState, result).pure[F]
        case Left(ex) =>
          state.get.map(_.wasCancelled(job)).flatMap {
            case true =>
              logger.error(ex)(s"Job ${job.info} execution failed (cancel = true)")
              ctx.logger.error(ex)("Job execution failed (cancel = true)") *>
                (JobState.Cancelled: JobState, JobTaskResult.empty).pure[F]
            case false =>
              QJob.exceedsRetries(job.id, config.retries, store).flatMap {
                case true =>
                  logger.error(ex)(s"Job ${job.info} execution failed. Retries exceeded.")
                  ctx.logger
                    .error(ex)(s"Job ${job.info} execution failed. Retries exceeded.")
                    .map(_ => (JobState.Failed: JobState, JobTaskResult.empty))
                case false =>
                  logger.error(ex)(s"Job ${job.info} execution failed. Retrying later.")
                  ctx.logger
                    .error(ex)(s"Job ${job.info} execution failed. Retrying later.")
                    .map(_ => (JobState.Stuck: JobState, JobTaskResult.empty))
              }
          }
      })
      .mapF(_.attempt.flatMap {
        case Right((jstate, result)) =>
          onFinish(job, result, jstate)
        case Left(ex) =>
          logger.error(ex)(s"Error happened during post-processing of ${job.info}!")
          // we don't know the real outcome here…
          // since tasks should be idempotent, set it to stuck. if above has failed, this might fail anyways
          onFinish(job, JobTaskResult.empty, JobState.Stuck)
      })

  // Starts the job in a fiber; the returned F[Unit] is the cancel token that
  // cancels the fiber, runs onCancel and records the Cancelled state.
  def forkRun(
      job: RJob,
      code: F[Unit],
      onCancel: F[Unit],
      ctx: Context[F, String]
  ): F[F[Unit]] =
    logger.debug(s"Forking job ${job.info}") *>
      Async[F]
        .start(code)
        .map(fiber =>
          logger.debug(s"Cancelling job ${job.info}") *>
            fiber.cancel *>
            onCancel.attempt.map {
              case Right(_) => ()
              case Left(ex) =>
                logger.error(ex)(s"Task's cancelling code failed. Job ${job.info}.")
                ()
            } *>
            state.modify(_.markCancelled(job)) *>
            onFinish(job, JobTaskResult.empty, JobState.Cancelled) *>
            ctx.logger.warn("Job has been cancelled.") *>
            logger.debug(s"Job ${job.info} has been cancelled.")
        )
}
object SchedulerImpl {

  /** A handle that, when evaluated, cancels a running job. */
  type CancelToken[F[_]] = F[Unit]

  /** Initial state: no counters, nothing cancelled or running, no shutdown. */
  def emptyState[F[_]]: State[F] =
    State(Map.empty, Set.empty, Map.empty, shutdownRequest = false)

  /** Scheduler state kept in a `SignallingRef`, updated by replacement. */
  case class State[F[_]](
      counters: Map[Ident, CountingScheme],
      cancelled: Set[Ident],
      cancelTokens: Map[Ident, CancelToken[F]],
      shutdownRequest: Boolean
  ) {

    /** Advances the per-group counting scheme and yields the next priority. */
    def nextPrio(group: Ident, initial: CountingScheme): (State[F], Priority) = {
      val (scheme, prio) = counters.getOrElse(group, initial).nextPriority
      (copy(counters = counters.updated(group, scheme)), prio)
    }

    /** Registers the cancel token of a job that just started. */
    def addRunning(job: RJob, token: CancelToken[F]): (State[F], Unit) =
      (copy(cancelTokens = cancelTokens.updated(job.id, token)), ())

    /** Forgets a finished job: its token and any cancelled mark. */
    def removeRunning(job: RJob): (State[F], Unit) =
      (copy(cancelled = cancelled - job.id, cancelTokens = cancelTokens - job.id), ())

    /** Remembers that a job was cancelled, so a later failure is classified
      * as `Cancelled` rather than a retryable error.
      */
    def markCancelled(job: RJob): (State[F], Unit) =
      (copy(cancelled = cancelled + job.id), ())

    def wasCancelled(job: RJob): Boolean =
      cancelled(job.id)

    def cancelRequest(id: Ident): Option[F[Unit]] =
      cancelTokens.get(id)

    /** Ids of jobs currently running here (those with a cancel token). */
    def getRunning: Seq[Ident] =
      cancelTokens.keys.toSeq

    def requestShutdown: (State[F], Unit) =
      (copy(shutdownRequest = true), ())
  }
}

View File

@ -1,72 +0,0 @@
/*
* Copyright 2020 Eike K. & Contributors
*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
package docspell.joex.scheduler
import cats._
import cats.data.Kleisli
import cats.effect.Sync
import cats.implicits._
import docspell.logging.Logger
/** The code that is executed by the scheduler */
trait Task[F[_], A, B] {

  // Executes the task against the given context.
  def run(ctx: Context[F, A]): F[B]

  // Sequences an effectful function after this task's result.
  def andThen[C](f: B => F[C])(implicit F: FlatMap[F]): Task[F, A, C] =
    Task(Task.toKleisli(this).andThen(f))

  // Transforms the whole effect, e.g. to add error handling.
  def mapF[C](f: F[B] => F[C]): Task[F, A, C] =
    Task(Task.toKleisli(this).mapF(f))

  // Materializes errors into the result value.
  def attempt(implicit
      F: ApplicativeError[F, Throwable]
  ): Task[F, A, Either[Throwable, B]] =
    mapF(_.attempt)

  // Adapts the argument type via an effectful conversion (SAM lambda).
  def contramap[C](f: C => F[A])(implicit F: FlatMap[F]): Task[F, C, B] = {
    ctxc: Context[F, C] => f(ctxc.args).flatMap(a => run(ctxc.map(_ => a)))
  }
}
object Task {

  /** A task ignoring its context, yielding a pure value. */
  def pure[F[_]: Applicative, A, B](b: B): Task[F, A, B] =
    Task(_ => b.pure[F])

  /** A task ignoring its context, running the given effect. */
  def of[F[_], A, B](b: F[B]): Task[F, A, B] =
    Task(_ => b)

  // SAM conversion from a plain function on contexts.
  def apply[F[_], A, B](f: Context[F, A] => F[B]): Task[F, A, B] =
    (ctx: Context[F, A]) => f(ctx)

  // Overload accepting a Kleisli arrow.
  def apply[F[_], A, B](k: Kleisli[F, Context[F, A], B]): Task[F, A, B] =
    c => k.run(c)

  def toKleisli[F[_], A, B](t: Task[F, A, B]): Kleisli[F, Context[F, A], B] =
    Kleisli(t.run)

  /** Sets the job progress to `n` and passes `data` through. */
  def setProgress[F[_]: Sync, A, B](n: Int)(data: B): Task[F, A, B] =
    Task(_.setProgress(n).map(_ => data))

  /** A task running only a logging action. */
  def log[F[_], A](f: Logger[F] => F[Unit]): Task[F, A, Unit] =
    Task(ctx => f(ctx.logger))

  // Monad instance delegating to the Kleisli monad (stack-safe tailRecM).
  implicit def taskMonad[F[_]: Monad, T]: Monad[Task[F, T, *]] =
    new Monad[Task[F, T, *]] {
      def pure[A](x: A) = Task(_ => Monad[F].pure(x))

      def flatMap[A, B](fa: Task[F, T, A])(f: A => Task[F, T, B]) =
        Task(Task.toKleisli(fa).flatMap(a => Task.toKleisli(f(a))))

      def tailRecM[A, B](a: A)(f: A => Task[F, T, Either[A, B]]) = {
        val monadK = Monad[Kleisli[F, Context[F, T], *]]
        val r = monadK.tailRecM(a)(x => Task.toKleisli(f(x)))
        Task(r)
      }
    }
}

View File

@ -11,8 +11,8 @@ import cats.effect._
import cats.implicits._ import cats.implicits._
import docspell.common._ import docspell.common._
import docspell.joex.scheduler.Context import docspell.scheduler.Context
import docspell.joex.scheduler.Task import docspell.scheduler.Task
import docspell.store.records.RPeriodicTask import docspell.store.records.RPeriodicTask
import docspell.store.records.RUserEmail import docspell.store.records.RUserEmail
import docspell.store.usertask.UserTask import docspell.store.usertask.UserTask