diff --git a/build.sbt b/build.sbt index aa74d056..9eb54d39 100644 --- a/build.sbt +++ b/build.sbt @@ -519,6 +519,29 @@ val pubsubNaive = project ) .dependsOn(common, pubsubApi, store % "compile->compile;test->test") +val schedulerApi = project + .in(file("modules/scheduler/api")) + .disablePlugins(RevolverPlugin) + .settings(sharedSettings) + .withTestSettingsDependsOn(loggingScribe) + .settings( + name := "docspell-scheduler-api", + libraryDependencies ++= + Dependencies.fs2Core ++ + Dependencies.circeCore + ) + .dependsOn(loggingApi, common, store, pubsubApi) + +val schedulerImpl = project + .in(file("modules/scheduler/impl")) + .disablePlugins(RevolverPlugin) + .settings(sharedSettings) + .withTestSettingsDependsOn(loggingScribe) + .settings( + name := "docspell-scheduler-impl" + ) + .dependsOn(schedulerApi, notificationApi, pubsubApi) + val extract = project .in(file("modules/extract")) .disablePlugins(RevolverPlugin) @@ -641,7 +664,16 @@ val backend = project Dependencies.http4sClient ++ Dependencies.emil ) - .dependsOn(store, notificationApi, joexapi, ftsclient, totp, pubsubApi, loggingApi) + .dependsOn( + store, + notificationApi, + joexapi, + ftsclient, + totp, + pubsubApi, + loggingApi, + schedulerApi + ) val oidc = project .in(file("modules/oidc")) @@ -732,7 +764,8 @@ val joex = project restapi, ftssolr, pubsubNaive, - notificationImpl + notificationImpl, + schedulerImpl ) val restserver = project @@ -902,7 +935,9 @@ val root = project pubsubApi, pubsubNaive, notificationApi, - notificationImpl + notificationImpl, + schedulerApi, + schedulerImpl ) // --- Helpers diff --git a/modules/scheduler/api/src/main/scala/docspell/scheduler/Context.scala b/modules/scheduler/api/src/main/scala/docspell/scheduler/Context.scala new file mode 100644 index 00000000..d5380f94 --- /dev/null +++ b/modules/scheduler/api/src/main/scala/docspell/scheduler/Context.scala @@ -0,0 +1,82 @@ +/* + * Copyright 2020 Eike K. 
& Contributors + * + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +package docspell.scheduler + +import cats.effect._ +import cats.implicits._ +import cats.{Applicative, Functor} + +import docspell.common._ +import docspell.logging.Logger +import docspell.store.Store +import docspell.store.records.RJob + +trait Context[F[_], A] { self => + + def jobId: Ident + + def args: A + + def config: SchedulerConfig + + def logger: Logger[F] + + def setProgress(percent: Int): F[Unit] + + def store: Store[F] + + final def isLastRetry(implicit ev: Applicative[F]): F[Boolean] = + for { + current <- store.transact(RJob.getRetries(jobId)) + last = config.retries == current.getOrElse(0) + } yield last + + def map[C](f: A => C)(implicit F: Functor[F]): Context[F, C] = + new Context.ContextImpl[F, C](f(args), logger, store, config, jobId) +} + +object Context { + + def create[F[_]: Async, A]( + jobId: Ident, + arg: A, + config: SchedulerConfig, + log: Logger[F], + store: Store[F] + ): Context[F, A] = + new ContextImpl(arg, log, store, config, jobId) + + def apply[F[_]: Async, A]( + job: RJob, + arg: A, + config: SchedulerConfig, + logSink: LogSink[F], + store: Store[F] + ): F[Context[F, A]] = { + val log = docspell.logging.getLogger[F] + for { + _ <- log.trace("Creating logger for task run") + logger <- QueueLogger(job.id, job.info, config.logBufferSize, logSink) + _ <- log.trace("Logger created, instantiating context") + ctx = create[F, A](job.id, arg, config, logger, store) + } yield ctx + } + + final private class ContextImpl[F[_]: Functor, A]( + val args: A, + val logger: Logger[F], + val store: Store[F], + val config: SchedulerConfig, + val jobId: Ident + ) extends Context[F, A] { + + def setProgress(percent: Int): F[Unit] = { + val pval = math.min(100, math.max(0, percent)) + store.transact(RJob.setProgress(jobId, pval)).map(_ => ()) + } + } +} diff --git a/modules/scheduler/api/src/main/scala/docspell/scheduler/CountingScheme.scala b/modules/scheduler/api/src/main/scala/docspell/scheduler/CountingScheme.scala new file mode 100644 index 00000000..86aecd1a --- /dev/null +++ b/modules/scheduler/api/src/main/scala/docspell/scheduler/CountingScheme.scala @@ -0,0 +1,44 @@ +/* + * Copyright 2020 Eike K. & Contributors + * + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +package docspell.scheduler + +import cats.implicits._ + +import docspell.common.Priority + +/** A counting scheme to indicate a ratio between scheduling high and low priority jobs. + * + * For example high=4, low=1 means: ”schedule 4 high priority jobs and then 1 low + * priority job“. 
+ */ +case class CountingScheme(high: Int, low: Int, counter: Int = 0) { + + def nextPriority: (CountingScheme, Priority) = + if (counter <= 0) (increment, Priority.High) + else { + val rest = counter % (high + low) + if (rest < high) (increment, Priority.High) + else (increment, Priority.Low) + } + + def increment: CountingScheme = + copy(counter = counter + 1) +} + +object CountingScheme { + + def writeString(cs: CountingScheme): String = + s"${cs.high},${cs.low}" + + def readString(str: String): Either[String, CountingScheme] = + str.split(',') match { + case Array(h, l) => + Either.catchNonFatal(CountingScheme(h.toInt, l.toInt)).left.map(_.getMessage) + case _ => + Left(s"Invalid counting scheme: $str") + } +} diff --git a/modules/scheduler/api/src/main/scala/docspell/scheduler/JobTask.scala b/modules/scheduler/api/src/main/scala/docspell/scheduler/JobTask.scala new file mode 100644 index 00000000..e4c5074f --- /dev/null +++ b/modules/scheduler/api/src/main/scala/docspell/scheduler/JobTask.scala @@ -0,0 +1,51 @@ +/* + * Copyright 2020 Eike K. & Contributors + * + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +package docspell.scheduler + +import cats.effect.Sync +import cats.implicits._ + +import docspell.common.Ident +import docspell.common.syntax.all._ + +import io.circe.Decoder + +/** Binds a Task to a name. This is required to lookup the code based on the taskName in + * the RJob data and to execute it given the arguments that have to be read from a + * string. + * + * Since the scheduler only has a string for the task argument, this only works for Task + * impls that accept a string. There is a convenience constructor that uses circe to + * decode json into some type A. + */ +case class JobTask[F[_]]( + name: Ident, + task: Task[F, String, JobTaskResult], + onCancel: Task[F, String, Unit] +) + +object JobTask { + + def json[F[_]: Sync, A, B]( + name: Ident, + task: Task[F, A, B], + onCancel: Task[F, A, Unit] + )(implicit + D: Decoder[A], + E: JobTaskResultEncoder[B] + ): JobTask[F] = { + val convert: String => F[A] = + str => + str.parseJsonAs[A] match { + case Right(a) => a.pure[F] + case Left(ex) => + Sync[F].raiseError(new Exception(s"Cannot parse task arguments: $str", ex)) + } + + JobTask(name, task.contramap(convert).map(E.encode), onCancel.contramap(convert)) + } +} diff --git a/modules/scheduler/api/src/main/scala/docspell/scheduler/JobTaskRegistry.scala b/modules/scheduler/api/src/main/scala/docspell/scheduler/JobTaskRegistry.scala new file mode 100644 index 00000000..baa12150 --- /dev/null +++ b/modules/scheduler/api/src/main/scala/docspell/scheduler/JobTaskRegistry.scala @@ -0,0 +1,30 @@ +/* + * Copyright 2020 Eike K. & Contributors + * + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +package docspell.scheduler + +import docspell.common.Ident + +/** This is a mapping from some identifier to a task. This is used by the scheduler to + * lookup an implementation using the taskName field of the RJob database record. 
+ */ +final class JobTaskRegistry[F[_]](tasks: Map[Ident, JobTask[F]]) { + + def withTask(task: JobTask[F]): JobTaskRegistry[F] = + JobTaskRegistry(tasks.updated(task.name, task)) + + def find(taskName: Ident): Option[JobTask[F]] = + tasks.get(taskName) +} + +object JobTaskRegistry { + + def apply[F[_]](map: Map[Ident, JobTask[F]]): JobTaskRegistry[F] = + new JobTaskRegistry[F](map) + + def empty[F[_]]: JobTaskRegistry[F] = apply(Map.empty) + +} diff --git a/modules/scheduler/api/src/main/scala/docspell/scheduler/JobTaskResult.scala b/modules/scheduler/api/src/main/scala/docspell/scheduler/JobTaskResult.scala new file mode 100644 index 00000000..c121fc47 --- /dev/null +++ b/modules/scheduler/api/src/main/scala/docspell/scheduler/JobTaskResult.scala @@ -0,0 +1,27 @@ +/* + * Copyright 2020 Eike K. & Contributors + * + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +package docspell.scheduler + +import io.circe.Json + +final case class JobTaskResult(message: Option[String], json: Option[Json]) { + + def withMessage(m: String): JobTaskResult = + copy(message = Some(m)) + + def withJson(json: Json): JobTaskResult = + copy(json = Some(json)) +} + +object JobTaskResult { + + val empty: JobTaskResult = JobTaskResult(None, None) + + def message(msg: String): JobTaskResult = JobTaskResult(Some(msg), None) + + def json(json: Json): JobTaskResult = JobTaskResult(None, Some(json)) +} diff --git a/modules/scheduler/api/src/main/scala/docspell/scheduler/JobTaskResultEncoder.scala b/modules/scheduler/api/src/main/scala/docspell/scheduler/JobTaskResultEncoder.scala new file mode 100644 index 00000000..a0ad96a5 --- /dev/null +++ b/modules/scheduler/api/src/main/scala/docspell/scheduler/JobTaskResultEncoder.scala @@ -0,0 +1,49 @@ +/* + * Copyright 2020 Eike K. & Contributors + * + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +package docspell.scheduler + +import docspell.scheduler.JobTaskResultEncoder.instance + +import io.circe.Encoder + +trait JobTaskResultEncoder[A] { self => + def encode(a: A): JobTaskResult + + final def contramap[B](f: B => A): JobTaskResultEncoder[B] = + JobTaskResultEncoder.instance(b => self.encode(f(b))) + + final def map(f: JobTaskResult => JobTaskResult): JobTaskResultEncoder[A] = + instance(a => f(self.encode(a))) + + final def modify(f: (A, JobTaskResult) => JobTaskResult): JobTaskResultEncoder[A] = + instance(a => f(a, self.encode(a))) + + final def withMessage(f: A => String): JobTaskResultEncoder[A] = + modify((a, r) => r.withMessage(f(a))) +} + +object JobTaskResultEncoder { + + def apply[A](implicit v: JobTaskResultEncoder[A]): JobTaskResultEncoder[A] = v + + def instance[A](f: A => JobTaskResult): JobTaskResultEncoder[A] = + (a: A) => f(a) + + def fromJson[A: Encoder]: JobTaskResultEncoder[A] = + instance(a => JobTaskResult.json(Encoder[A].apply(a))) + + implicit val unitJobTaskResultEncoder: JobTaskResultEncoder[Unit] = + instance(_ => JobTaskResult.empty) + + implicit def optionJobTaskResultEncoder[A](implicit + ea: JobTaskResultEncoder[A] + ): JobTaskResultEncoder[Option[A]] = + instance { + case Some(a) => ea.encode(a) + case None => JobTaskResult.empty + } +} diff --git a/modules/scheduler/api/src/main/scala/docspell/scheduler/LogEvent.scala b/modules/scheduler/api/src/main/scala/docspell/scheduler/LogEvent.scala new file mode 100644 index 00000000..29a91631 --- /dev/null +++ b/modules/scheduler/api/src/main/scala/docspell/scheduler/LogEvent.scala @@ -0,0 +1,38 @@ +/* + * Copyright 2020 Eike K. 
& Contributors + * + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +package docspell.scheduler + +import cats.effect.Sync +import cats.implicits._ + +import docspell.common._ + +case class LogEvent( + jobId: Ident, + jobInfo: String, + time: Timestamp, + level: LogLevel, + msg: String, + ex: Option[Throwable] = None +) { + + def logLine: String = + s">>> ${time.asString} $level $jobInfo: $msg" + +} + +object LogEvent { + + def create[F[_]: Sync]( + jobId: Ident, + jobInfo: String, + level: LogLevel, + msg: String + ): F[LogEvent] = + Timestamp.current[F].map(now => LogEvent(jobId, jobInfo, now, level, msg)) + +} diff --git a/modules/scheduler/api/src/main/scala/docspell/scheduler/LogSink.scala b/modules/scheduler/api/src/main/scala/docspell/scheduler/LogSink.scala new file mode 100644 index 00000000..6bf34f05 --- /dev/null +++ b/modules/scheduler/api/src/main/scala/docspell/scheduler/LogSink.scala @@ -0,0 +1,76 @@ +/* + * Copyright 2020 Eike K. & Contributors + * + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +package docspell.scheduler + +import cats.effect._ +import cats.implicits._ +import fs2.Pipe + +import docspell.common._ +import docspell.logging +import docspell.store.Store +import docspell.store.records.RJobLog + +trait LogSink[F[_]] { + + def receive: Pipe[F, LogEvent, Unit] + +} + +object LogSink { + + def apply[F[_]](sink: Pipe[F, LogEvent, Unit]): LogSink[F] = + new LogSink[F] { + val receive = sink + } + + def logInternal[F[_]: Sync](e: LogEvent): F[Unit] = { + val logger = docspell.logging.getLogger[F] + val addData: logging.LogEvent => logging.LogEvent = + _.data("jobId", e.jobId).data("jobInfo", e.jobInfo) + + e.level match { + case LogLevel.Info => + logger.infoWith(e.logLine)(addData) + case LogLevel.Debug => + logger.debugWith(e.logLine)(addData) + case LogLevel.Warn => + logger.warnWith(e.logLine)(addData) + case LogLevel.Error => + e.ex match { + case Some(exc) => + logger.errorWith(e.logLine)(addData.andThen(_.addError(exc))) + case None => + logger.errorWith(e.logLine)(addData) + } + } + } + + def printer[F[_]: Sync]: LogSink[F] = + LogSink(_.evalMap(e => logInternal(e))) + + def db[F[_]: Async](store: Store[F]): LogSink[F] = + LogSink( + _.evalMap(ev => + for { + id <- Ident.randomId[F] + joblog = RJobLog( + id, + ev.jobId, + ev.level, + ev.time, + ev.msg + ev.ex.map(th => ": " + th.getMessage).getOrElse("") + ) + _ <- logInternal(ev) + _ <- store.transact(RJobLog.insert(joblog)) + } yield () + ) + ) + + def dbAndLog[F[_]: Async](store: Store[F]): LogSink[F] = + LogSink(_.broadcastThrough(printer[F].receive, db[F](store).receive)) +} diff --git a/modules/scheduler/api/src/main/scala/docspell/scheduler/PeriodicScheduler.scala b/modules/scheduler/api/src/main/scala/docspell/scheduler/PeriodicScheduler.scala new file mode 100644 index 00000000..284a9465 --- /dev/null +++ b/modules/scheduler/api/src/main/scala/docspell/scheduler/PeriodicScheduler.scala @@ -0,0 +1,32 @@ +/* + * Copyright 2020 Eike K. & Contributors + * + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +package docspell.scheduler + +import cats.effect._ +import fs2._ + +/** A periodic scheduler takes care to submit periodic tasks to the job queue. + * + * It is run in the background to regularly find a periodic task to execute. If the task + * is due, it will be submitted into the job queue where it will be picked up by the + * scheduler from some joex instance. If it is due in the future, a notification is + * scheduled to be received at that time so the task can be looked up again. 
+ */ +trait PeriodicScheduler[F[_]] { + + def config: PeriodicSchedulerConfig + + def start: Stream[F, Nothing] + + def shutdown: F[Unit] + + def periodicAwake: F[Fiber[F, Throwable, Unit]] + + def notifyChange: F[Unit] +} + +object PeriodicScheduler {} diff --git a/modules/scheduler/api/src/main/scala/docspell/scheduler/PeriodicSchedulerConfig.scala b/modules/scheduler/api/src/main/scala/docspell/scheduler/PeriodicSchedulerConfig.scala new file mode 100644 index 00000000..7445e9e3 --- /dev/null +++ b/modules/scheduler/api/src/main/scala/docspell/scheduler/PeriodicSchedulerConfig.scala @@ -0,0 +1,14 @@ +/* + * Copyright 2020 Eike K. & Contributors + * + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +package docspell.scheduler + +import docspell.common._ + +case class PeriodicSchedulerConfig( + name: Ident, + wakeupPeriod: Duration +) diff --git a/modules/scheduler/api/src/main/scala/docspell/scheduler/QueueLogger.scala b/modules/scheduler/api/src/main/scala/docspell/scheduler/QueueLogger.scala new file mode 100644 index 00000000..b0f17d23 --- /dev/null +++ b/modules/scheduler/api/src/main/scala/docspell/scheduler/QueueLogger.scala @@ -0,0 +1,58 @@ +/* + * Copyright 2020 Eike K. & Contributors + * + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +package docspell.scheduler + +import cats.effect._ +import cats.effect.std.Queue +import cats.implicits._ +import fs2.Stream + +import docspell.common._ +import docspell.logging +import docspell.logging.{Level, Logger} + +object QueueLogger { + + def create[F[_]: Sync]( + jobId: Ident, + jobInfo: String, + q: Queue[F, LogEvent] + ): Logger[F] = + new Logger[F] { + + def log(logEvent: logging.LogEvent) = + LogEvent + .create[F](jobId, jobInfo, level2Level(logEvent.level), logEvent.msg()) + .flatMap { ev => + val event = + logEvent.findErrors.headOption + .map(ex => ev.copy(ex = Some(ex))) + .getOrElse(ev) + + q.offer(event) + } + + def asUnsafe = Logger.off + } + + def apply[F[_]: Async]( + jobId: Ident, + jobInfo: String, + bufferSize: Int, + sink: LogSink[F] + ): F[Logger[F]] = + for { + q <- Queue.circularBuffer[F, LogEvent](bufferSize) + log = create(jobId, jobInfo, q) + _ <- Async[F].start( + Stream.fromQueueUnterminated(q).through(sink.receive).compile.drain + ) + } yield log + + private def level2Level(level: Level): LogLevel = + LogLevel.fromLevel(level) +} diff --git a/modules/scheduler/api/src/main/scala/docspell/scheduler/Scheduler.scala b/modules/scheduler/api/src/main/scala/docspell/scheduler/Scheduler.scala new file mode 100644 index 00000000..720214c7 --- /dev/null +++ b/modules/scheduler/api/src/main/scala/docspell/scheduler/Scheduler.scala @@ -0,0 +1,38 @@ +/* + * Copyright 2020 Eike K. & Contributors + * + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +package docspell.scheduler + +import cats.effect._ +import fs2.Stream + +import docspell.common.Ident +import docspell.store.records.RJob + +trait Scheduler[F[_]] { + + def config: SchedulerConfig + + def getRunning: F[Vector[RJob]] + + def requestCancel(jobId: Ident): F[Boolean] + + def notifyChange: F[Unit] + + def start: Stream[F, Nothing] + + /** Requests to shutdown the scheduler. + * + * The scheduler will not take any new jobs from the queue. If there are still running + * jobs, it waits for them to complete. when the cancelAll flag is set to true, it + * cancels all running jobs. + * + * The returned F[Unit] can be evaluated to wait for all that to complete. 
+ */ + def shutdown(cancelAll: Boolean): F[Unit] + + def periodicAwake: F[Fiber[F, Throwable, Unit]] +} diff --git a/modules/scheduler/api/src/main/scala/docspell/scheduler/SchedulerConfig.scala b/modules/scheduler/api/src/main/scala/docspell/scheduler/SchedulerConfig.scala new file mode 100644 index 00000000..fbbc4f3c --- /dev/null +++ b/modules/scheduler/api/src/main/scala/docspell/scheduler/SchedulerConfig.scala @@ -0,0 +1,33 @@ +/* + * Copyright 2020 Eike K. & Contributors + * + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +package docspell.scheduler + +import docspell.common._ + +case class SchedulerConfig( + name: Ident, + poolSize: Int, + countingScheme: CountingScheme, + retries: Int, + retryDelay: Duration, + logBufferSize: Int, + wakeupPeriod: Duration +) + +object SchedulerConfig { + + val default = SchedulerConfig( + name = Ident.unsafe("default-scheduler"), + poolSize = 2 // math.max(2, Runtime.getRuntime.availableProcessors / 2) + , + countingScheme = CountingScheme(2, 1), + retries = 5, + retryDelay = Duration.seconds(30), + logBufferSize = 500, + wakeupPeriod = Duration.minutes(10) + ) +} diff --git a/modules/scheduler/api/src/main/scala/docspell/scheduler/Task.scala b/modules/scheduler/api/src/main/scala/docspell/scheduler/Task.scala new file mode 100644 index 00000000..d6868a08 --- /dev/null +++ b/modules/scheduler/api/src/main/scala/docspell/scheduler/Task.scala @@ -0,0 +1,72 @@ +/* + * Copyright 2020 Eike K. & Contributors + * + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +package docspell.scheduler + +import cats._ +import cats.data.Kleisli +import cats.effect.Sync +import cats.implicits._ + +import docspell.logging.Logger + +/** The code that is executed by the scheduler */ +trait Task[F[_], A, B] { + + def run(ctx: Context[F, A]): F[B] + + def andThen[C](f: B => F[C])(implicit F: FlatMap[F]): Task[F, A, C] = + Task(Task.toKleisli(this).andThen(f)) + + def mapF[C](f: F[B] => F[C]): Task[F, A, C] = + Task(Task.toKleisli(this).mapF(f)) + + def attempt(implicit + F: ApplicativeError[F, Throwable] + ): Task[F, A, Either[Throwable, B]] = + mapF(_.attempt) + + def contramap[C](f: C => F[A])(implicit F: FlatMap[F]): Task[F, C, B] = { + ctxc: Context[F, C] => f(ctxc.args).flatMap(a => run(ctxc.map(_ => a))) + } +} + +object Task { + + def pure[F[_]: Applicative, A, B](b: B): Task[F, A, B] = + Task(_ => b.pure[F]) + + def of[F[_], A, B](b: F[B]): Task[F, A, B] = + Task(_ => b) + + def apply[F[_], A, B](f: Context[F, A] => F[B]): Task[F, A, B] = + (ctx: Context[F, A]) => f(ctx) + + def apply[F[_], A, B](k: Kleisli[F, Context[F, A], B]): Task[F, A, B] = + c => k.run(c) + + def toKleisli[F[_], A, B](t: Task[F, A, B]): Kleisli[F, Context[F, A], B] = + Kleisli(t.run) + + def setProgress[F[_]: Sync, A, B](n: Int)(data: B): Task[F, A, B] = + Task(_.setProgress(n).map(_ => data)) + + def log[F[_], A](f: Logger[F] => F[Unit]): Task[F, A, Unit] = + Task(ctx => f(ctx.logger)) + + implicit def taskMonad[F[_]: Monad, T]: Monad[Task[F, T, *]] = + new Monad[Task[F, T, *]] { + def pure[A](x: A) = Task(_ => Monad[F].pure(x)) + def flatMap[A, B](fa: Task[F, T, A])(f: A => Task[F, T, B]) = + Task(Task.toKleisli(fa).flatMap(a => Task.toKleisli(f(a)))) + + def tailRecM[A, B](a: A)(f: A => Task[F, T, Either[A, B]]) = { + val monadK = Monad[Kleisli[F, Context[F, T], *]] + val r = monadK.tailRecM(a)(x => Task.toKleisli(f(x))) + Task(r) + } + } +} diff --git a/modules/scheduler/api/src/main/scala/docspell/scheduler/msg/JobDone.scala 
b/modules/scheduler/api/src/main/scala/docspell/scheduler/msg/JobDone.scala new file mode 100644 index 00000000..61eff3f9 --- /dev/null +++ b/modules/scheduler/api/src/main/scala/docspell/scheduler/msg/JobDone.scala @@ -0,0 +1,32 @@ +/* + * Copyright 2020 Eike K. & Contributors + * + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +package docspell.scheduler.msg + +import docspell.common._ +import docspell.pubsub.api.{Topic, TypedTopic} + +import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder} +import io.circe.{Decoder, Encoder} + +/** Message to notify about finished jobs. They have a final state. */ +final case class JobDone( + jobId: Ident, + group: Ident, + task: Ident, + args: String, + state: JobState +) +object JobDone { + implicit val jsonDecoder: Decoder[JobDone] = + deriveDecoder[JobDone] + + implicit val jsonEncoder: Encoder[JobDone] = + deriveEncoder[JobDone] + + val topic: TypedTopic[JobDone] = + TypedTopic(Topic("job-finished")) +} diff --git a/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/PeriodicSchedulerBuilder.scala b/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/PeriodicSchedulerBuilder.scala new file mode 100644 index 00000000..5fbaada2 --- /dev/null +++ b/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/PeriodicSchedulerBuilder.scala @@ -0,0 +1,31 @@ +package docspell.scheduler.impl + +import cats.effect._ +import docspell.scheduler._ +import docspell.store.queue.{JobQueue, PeriodicTaskStore} +import fs2.concurrent.SignallingRef + +object PeriodicSchedulerBuilder { + + def build[F[_]: Async]( + cfg: PeriodicSchedulerConfig, + sch: Scheduler[F], + queue: JobQueue[F], + store: PeriodicTaskStore[F], + notifyJoex: F[Unit] + ): Resource[F, PeriodicScheduler[F]] = + for { + waiter <- Resource.eval(SignallingRef(true)) + state <- Resource.eval(SignallingRef(PeriodicSchedulerImpl.emptyState[F])) + psch = new PeriodicSchedulerImpl[F]( + cfg, + sch, + queue, + store, + notifyJoex, + waiter, + state + ) + _ <- Resource.eval(psch.init) + } yield psch +} diff --git a/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/PeriodicSchedulerImpl.scala b/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/PeriodicSchedulerImpl.scala new file mode 100644 index 00000000..efdefeaf --- /dev/null +++ b/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/PeriodicSchedulerImpl.scala @@ -0,0 +1,182 @@ +/* + * Copyright 2020 Eike K. 
& Contributors + * + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +package docspell.scheduler.impl + +import cats.effect._ +import cats.implicits._ +import fs2._ +import fs2.concurrent.SignallingRef + +import docspell.common._ +import docspell.scheduler._ +import docspell.scheduler.impl.PeriodicSchedulerImpl.State +import docspell.store.queue._ +import docspell.store.records.RPeriodicTask + +import eu.timepit.fs2cron.calev.CalevScheduler + +final class PeriodicSchedulerImpl[F[_]: Async]( + val config: PeriodicSchedulerConfig, + sch: Scheduler[F], + queue: JobQueue[F], + store: PeriodicTaskStore[F], + joexNotifyAll: F[Unit], + waiter: SignallingRef[F, Boolean], + state: SignallingRef[F, State[F]] +) extends PeriodicScheduler[F] { + private[this] val logger = docspell.logging.getLogger[F] + + def start: Stream[F, Nothing] = + logger.stream.info("Starting periodic scheduler").drain ++ + mainLoop + + def shutdown: F[Unit] = + state.modify(_.requestShutdown) + + def periodicAwake: F[Fiber[F, Throwable, Unit]] = + Async[F].start( + Stream + .awakeEvery[F](config.wakeupPeriod.toScala) + .evalMap(_ => logger.debug("Periodic awake reached") *> notifyChange) + .compile + .drain + ) + + def notifyChange: F[Unit] = + waiter.update(b => !b) + + // internal + + /** On startup, get all periodic jobs from this scheduler and remove the mark, so they + * get picked up again. + */ + def init: F[Unit] = + logError("Error clearing marks")(store.clearMarks(config.name)) + + def mainLoop: Stream[F, Nothing] = { + val body: F[Boolean] = + for { + _ <- logger.debug(s"Going into main loop") + now <- Timestamp.current[F] + _ <- logger.debug(s"Looking for next periodic task") + go <- logThrow("Error getting next task")( + store + .takeNext(config.name, None) + .use { + case Marked.Found(pj) => + logger + .debug(s"Found periodic task '${pj.subject}/${pj.timer.asString}'") *> + (if (isTriggered(pj, now)) submitJob(pj) + else scheduleNotify(pj).map(_ => false)) + case Marked.NotFound => + logger.debug("No periodic task found") *> false.pure[F] + case Marked.NotMarkable => + logger.debug("Periodic job cannot be marked. 
Trying again.") *> true + .pure[F] + } + ) + } yield go + + Stream + .eval(state.get.map(_.shutdownRequest)) + .evalTap( + if (_) logger.info("Stopping main loop due to shutdown request.") + else ().pure[F] + ) + .flatMap(if (_) Stream.empty else Stream.eval(cancelNotify *> body)) + .flatMap { + case true => + mainLoop + case false => + logger.stream.debug(s"Waiting for notify").drain ++ + waiter.discrete.take(2).drain ++ + logger.stream.debug(s"Notify signal, going into main loop").drain ++ + mainLoop + } + } + + def isTriggered(pj: RPeriodicTask, now: Timestamp): Boolean = + pj.nextrun < now + + def submitJob(pj: RPeriodicTask): F[Boolean] = + store + .findNonFinalJob(pj.id) + .flatMap { + case Some(job) => + logger.info( + s"There is already a job with non-final state '${job.state}' in the queue" + ) *> scheduleNotify(pj) *> false.pure[F] + + case None => + logger.info(s"Submitting job for periodic task '${pj.task.id}'") *> + pj.toJob.flatMap(queue.insert) *> notifyJoex *> true.pure[F] + } + + def notifyJoex: F[Unit] = + sch.notifyChange *> joexNotifyAll + + def scheduleNotify(pj: RPeriodicTask): F[Unit] = + Timestamp + .current[F] + .flatMap(now => + logger.debug( + s"Scheduling next notify for timer ${pj.timer.asString} -> ${pj.timer.nextElapse(now.toUtcDateTime)}" + ) + ) *> + Async[F] + .start( + CalevScheduler + .utc[F] + .sleep(pj.timer) + .evalMap(_ => notifyChange) + .compile + .drain + ) + .flatMap(fb => state.modify(_.setNotify(fb))) + + def cancelNotify: F[Unit] = + state + .modify(_.clearNotify) + .flatMap { + case Some(fb) => + fb.cancel + case None => + ().pure[F] + } + + private def logError(msg: => String)(fa: F[Unit]): F[Unit] = + fa.attempt.flatMap { + case Right(_) => ().pure[F] + case Left(ex) => logger.error(ex)(msg).map(_ => ()) + } + + private def logThrow[A](msg: => String)(fa: F[A]): F[A] = + fa.attempt.flatMap { + case r @ Right(_) => (r: Either[Throwable, A]).pure[F] + case l @ Left(ex) => logger.error(ex)(msg).map(_ => (l: Either[Throwable, A])) + }.rethrow +} + +object PeriodicSchedulerImpl { + def emptyState[F[_]]: State[F] = + State(false, None) + + case class State[F[_]]( + shutdownRequest: Boolean, + scheduledNotify: Option[Fiber[F, Throwable, Unit]] + ) { + def requestShutdown: (State[F], Unit) = + (copy(shutdownRequest = true), ()) + + def setNotify(fb: Fiber[F, Throwable, Unit]): (State[F], Unit) = + (copy(scheduledNotify = Some(fb)), ()) + + def clearNotify: (State[F], Option[Fiber[F, Throwable, Unit]]) = + (copy(scheduledNotify = None), scheduledNotify) + + } +} diff --git a/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/SchedulerBuilder.scala b/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/SchedulerBuilder.scala new file mode 100644 index 00000000..4e51231b --- /dev/null +++ b/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/SchedulerBuilder.scala @@ -0,0 +1,97 @@ +/* + * Copyright 2020 Eike K. 
& Contributors + * + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +package docspell.scheduler.impl + +import cats.effect._ +import cats.effect.std.Semaphore +import cats.implicits._ +import fs2.concurrent.SignallingRef + +import docspell.scheduler._ +import docspell.notification.api.EventSink +import docspell.pubsub.api.PubSubT +import docspell.store.Store +import docspell.store.queue.JobQueue + +case class SchedulerBuilder[F[_]: Async]( + config: SchedulerConfig, + tasks: JobTaskRegistry[F], + store: Store[F], + queue: Resource[F, JobQueue[F]], + logSink: LogSink[F], + pubSub: PubSubT[F], + eventSink: EventSink[F] +) { + + def withConfig(cfg: SchedulerConfig): SchedulerBuilder[F] = + copy(config = cfg) + + def withTaskRegistry(reg: JobTaskRegistry[F]): SchedulerBuilder[F] = + copy(tasks = reg) + + def withTask[A](task: JobTask[F]): SchedulerBuilder[F] = + withTaskRegistry(tasks.withTask(task)) + + def withQueue(queue: Resource[F, JobQueue[F]]): SchedulerBuilder[F] = + copy(queue = queue) + + def withLogSink(sink: LogSink[F]): SchedulerBuilder[F] = + copy(logSink = sink) + + def withQueue(queue: JobQueue[F]): SchedulerBuilder[F] = + copy(queue = Resource.pure[F, JobQueue[F]](queue)) + + def withPubSub(pubSubT: PubSubT[F]): SchedulerBuilder[F] = + copy(pubSub = pubSubT) + + def withEventSink(sink: EventSink[F]): SchedulerBuilder[F] = + copy(eventSink = sink) + + def serve: Resource[F, Scheduler[F]] = + resource.evalMap(sch => Async[F].start(sch.start.compile.drain).map(_ => sch)) + + def resource: Resource[F, Scheduler[F]] = { + val scheduler: Resource[F, SchedulerImpl[F]] = for { + jq <- queue + waiter <- Resource.eval(SignallingRef(true)) + state <- Resource.eval(SignallingRef(SchedulerImpl.emptyState[F])) + perms <- Resource.eval(Semaphore(config.poolSize.toLong)) + } yield new SchedulerImpl[F]( + config, + jq, + pubSub, + eventSink, + tasks, + store, + logSink, + state, + waiter, + perms + ) + + scheduler.evalTap(_.init).map(s => s: Scheduler[F]) + } + +} + +object SchedulerBuilder { + + def apply[F[_]: Async]( + config: SchedulerConfig, + store: Store[F] + ): SchedulerBuilder[F] = + new SchedulerBuilder[F]( + config, + JobTaskRegistry.empty[F], + store, + JobQueue(store), + LogSink.db[F](store), + PubSubT.noop[F], + EventSink.silent[F] + ) + +} diff --git a/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/SchedulerImpl.scala b/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/SchedulerImpl.scala new file mode 100644 index 00000000..d0de5991 --- /dev/null +++ b/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/SchedulerImpl.scala @@ -0,0 +1,352 @@ +/* + * Copyright 2020 Eike K. 
& Contributors + * + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +package docspell.scheduler.impl + +import cats.data.OptionT +import cats.effect._ +import cats.effect.std.Semaphore +import cats.implicits._ +import fs2.Stream +import fs2.concurrent.SignallingRef + +import docspell.scheduler.msg.JobDone +import docspell.common._ +import docspell.scheduler._ +import docspell.scheduler.impl.SchedulerImpl._ +import docspell.notification.api.Event +import docspell.notification.api.EventSink +import docspell.pubsub.api.PubSubT +import docspell.store.Store +import docspell.store.queries.QJob +import docspell.store.queue.JobQueue +import docspell.store.records.RJob + +import io.circe.Json + +final class SchedulerImpl[F[_]: Async]( + val config: SchedulerConfig, + queue: JobQueue[F], + pubSub: PubSubT[F], + eventSink: EventSink[F], + tasks: JobTaskRegistry[F], + store: Store[F], + logSink: LogSink[F], + state: SignallingRef[F, State[F]], + waiter: SignallingRef[F, Boolean], + permits: Semaphore[F] +) extends Scheduler[F] { + + private[this] val logger = docspell.logging.getLogger[F] + + /** On startup, get all jobs in state running from this scheduler and put them into + * waiting state, so they get picked up again. + */ + def init: F[Unit] = + QJob.runningToWaiting(config.name, store) + + def periodicAwake: F[Fiber[F, Throwable, Unit]] = + Async[F].start( + Stream + .awakeEvery[F](config.wakeupPeriod.toScala) + .evalMap(_ => logger.debug("Periodic awake reached") *> notifyChange) + .compile + .drain + ) + + def getRunning: F[Vector[RJob]] = + state.get.flatMap(s => QJob.findAll(s.getRunning, store)) + + def requestCancel(jobId: Ident): F[Boolean] = + logger.info(s"Scheduler requested to cancel job: ${jobId.id}") *> + state.get.flatMap(_.cancelRequest(jobId) match { + case Some(ct) => ct.map(_ => true) + case None => + (for { + job <- OptionT(store.transact(RJob.findByIdAndWorker(jobId, config.name))) + _ <- OptionT.liftF( + if (job.isInProgress) executeCancel(job) + else ().pure[F] + ) + } yield true) + .getOrElseF( + logger.warn(s"Job ${jobId.id} not found, cannot cancel.").map(_ => false) + ) + }) + + def notifyChange: F[Unit] = + waiter.update(b => !b) + + def shutdown(cancelAll: Boolean): F[Unit] = { + val doCancel = + state.get.flatMap(_.cancelTokens.values.toList.traverse(identity)).map(_ => ()) + + val runShutdown = + state.modify(_.requestShutdown) *> (if (cancelAll) doCancel else ().pure[F]) + + val wait = Stream + .eval(runShutdown) + .evalMap(_ => logger.info("Scheduler is shutting down now.")) + .flatMap(_ => + Stream.eval(state.get) ++ Stream + .suspend(state.discrete.takeWhile(_.getRunning.nonEmpty)) + ) + .flatMap { state => + if (state.getRunning.isEmpty) Stream.eval(logger.info("No jobs running.")) + else + Stream.eval( + logger.info(s"Waiting for ${state.getRunning.size} jobs to finish.") + ) ++ + Stream.emit(state) + } + + (wait.drain ++ Stream.emit(())).compile.lastOrError + } + + def start: Stream[F, Nothing] = + logger.stream.info("Starting scheduler").drain ++ + mainLoop + + def mainLoop: Stream[F, Nothing] = { + val body: F[Boolean] = + for { + _ <- permits.available.flatMap(a => + logger.debug(s"Try to acquire permit ($a free)") + ) + _ <- permits.acquire + _ <- logger.debug("New permit acquired") + down <- state.get.map(_.shutdownRequest) + rjob <- + if (down) + logger.info("") *> permits.release *> (None: Option[RJob]).pure[F] + else + queue.nextJob( + group => state.modify(_.nextPrio(group, config.countingScheme)), + config.name, + config.retryDelay + ) + _ <- 
logger.debug(s"Next job found: ${rjob.map(_.info)}") + _ <- rjob.map(execute).getOrElse(permits.release) + } yield rjob.isDefined + + Stream + .eval(state.get.map(_.shutdownRequest)) + .evalTap( + if (_) logger.info("Stopping main loop due to shutdown request.") + else ().pure[F] + ) + .flatMap(if (_) Stream.empty else Stream.eval(body)) + .flatMap { + case true => + mainLoop + case false => + logger.stream.debug(s"Waiting for notify").drain ++ + waiter.discrete.take(2).drain ++ + logger.stream.debug(s"Notify signal, going into main loop").drain ++ + mainLoop + } + } + + private def executeCancel(job: RJob): F[Unit] = { + val task = for { + jobtask <- + tasks + .find(job.task) + .toRight(s"This executor cannot run tasks with name: ${job.task}") + } yield jobtask + + task match { + case Left(err) => + logger.error(s"Unable to run cancellation task for job ${job.info}: $err") + case Right(t) => + for { + _ <- + logger.debug(s"Creating context for job ${job.info} to run cancellation $t") + ctx <- Context[F, String](job, job.args, config, logSink, store) + _ <- t.onCancel.run(ctx) + _ <- state.modify(_.markCancelled(job)) + _ <- onFinish(job, JobTaskResult.empty, JobState.Cancelled) + _ <- ctx.logger.warn("Job has been cancelled.") + _ <- logger.debug(s"Job ${job.info} has been cancelled.") + } yield () + } + } + + def execute(job: RJob): F[Unit] = { + val task = for { + jobtask <- + tasks + .find(job.task) + .toRight(s"This executor cannot run tasks with name: ${job.task}") + } yield jobtask + + task match { + case Left(err) => + logger.error(s"Unable to start a task for job ${job.info}: $err") + case Right(t) => + for { + _ <- logger.debug(s"Creating context for job ${job.info} to run $t") + ctx <- Context[F, String](job, job.args, config, logSink, store) + jot = wrapTask(job, t.task, ctx) + tok <- forkRun(job, jot.run(ctx), t.onCancel.run(ctx), ctx) + _ <- state.modify(_.addRunning(job, tok)) + } yield () + } + } + + def onFinish(job: RJob, result: JobTaskResult, finishState: JobState): F[Unit] = + for { + _ <- logger.debug(s"Job ${job.info} done $finishState. 
Releasing resources.") + _ <- permits.release *> permits.available.flatMap(a => + logger.debug(s"Permit released ($a free)") + ) + _ <- state.modify(_.removeRunning(job)) + _ <- QJob.setFinalState(job.id, finishState, store) + _ <- Sync[F].whenA(JobState.isDone(finishState))( + pubSub.publish1IgnoreErrors( + JobDone.topic, + JobDone(job.id, job.group, job.task, job.args, finishState) + ) + ) + _ <- Sync[F].whenA(JobState.isDone(finishState))( + eventSink.offer( + Event.JobDone( + job.id, + job.group, + job.task, + job.args, + job.state, + job.subject, + job.submitter, + result.json.getOrElse(Json.Null), + result.message + ) + ) + ) + } yield () + + def onStart(job: RJob): F[Unit] = + QJob.setRunning( + job.id, + config.name, + store + ) // also increments retries if current state=stuck + + def wrapTask( + job: RJob, + task: Task[F, String, JobTaskResult], + ctx: Context[F, String] + ): Task[F, String, Unit] = + task + .mapF(fa => onStart(job) *> logger.debug("Starting task now") *> fa) + .mapF(_.attempt.flatMap { + case Right(result) => + logger.info(s"Job execution successful: ${job.info}") + ctx.logger.info("Job execution successful") *> + (JobState.Success: JobState, result).pure[F] + case Left(ex) => + state.get.map(_.wasCancelled(job)).flatMap { + case true => + logger.error(ex)(s"Job ${job.info} execution failed (cancel = true)") + ctx.logger.error(ex)("Job execution failed (cancel = true)") *> + (JobState.Cancelled: JobState, JobTaskResult.empty).pure[F] + case false => + QJob.exceedsRetries(job.id, config.retries, store).flatMap { + case true => + logger.error(ex)(s"Job ${job.info} execution failed. Retries exceeded.") + ctx.logger + .error(ex)(s"Job ${job.info} execution failed. Retries exceeded.") + .map(_ => (JobState.Failed: JobState, JobTaskResult.empty)) + case false => + logger.error(ex)(s"Job ${job.info} execution failed. Retrying later.") + ctx.logger + .error(ex)(s"Job ${job.info} execution failed. Retrying later.") + .map(_ => (JobState.Stuck: JobState, JobTaskResult.empty)) + } + } + }) + .mapF(_.attempt.flatMap { + case Right((jstate, result)) => + onFinish(job, result, jstate) + case Left(ex) => + logger.error(ex)(s"Error happened during post-processing of ${job.info}!") + // we don't know the real outcome here… + // since tasks should be idempotent, set it to stuck. if above has failed, this might fail anyways + onFinish(job, JobTaskResult.empty, JobState.Stuck) + }) + + def forkRun( + job: RJob, + code: F[Unit], + onCancel: F[Unit], + ctx: Context[F, String] + ): F[F[Unit]] = + logger.debug(s"Forking job ${job.info}") *> + Async[F] + .start(code) + .map(fiber => + logger.debug(s"Cancelling job ${job.info}") *> + fiber.cancel *> + onCancel.attempt.map { + case Right(_) => () + case Left(ex) => + logger.error(ex)(s"Task's cancelling code failed. 
Job ${job.info}.") + () + } *> + state.modify(_.markCancelled(job)) *> + onFinish(job, JobTaskResult.empty, JobState.Cancelled) *> + ctx.logger.warn("Job has been cancelled.") *> + logger.debug(s"Job ${job.info} has been cancelled.") + ) +} + +object SchedulerImpl { + + type CancelToken[F[_]] = F[Unit] + + def emptyState[F[_]]: State[F] = + State(Map.empty, Set.empty, Map.empty, false) + + case class State[F[_]]( + counters: Map[Ident, CountingScheme], + cancelled: Set[Ident], + cancelTokens: Map[Ident, CancelToken[F]], + shutdownRequest: Boolean + ) { + + def nextPrio(group: Ident, initial: CountingScheme): (State[F], Priority) = { + val (cs, prio) = counters.getOrElse(group, initial).nextPriority + (copy(counters = counters.updated(group, cs)), prio) + } + + def addRunning(job: RJob, token: CancelToken[F]): (State[F], Unit) = + ( + State(counters, cancelled, cancelTokens.updated(job.id, token), shutdownRequest), + () + ) + + def removeRunning(job: RJob): (State[F], Unit) = + ( + copy(cancelled = cancelled - job.id, cancelTokens = cancelTokens.removed(job.id)), + () + ) + + def markCancelled(job: RJob): (State[F], Unit) = + (copy(cancelled = cancelled + job.id), ()) + + def wasCancelled(job: RJob): Boolean = + cancelled.contains(job.id) + + def cancelRequest(id: Ident): Option[F[Unit]] = + cancelTokens.get(id) + + def getRunning: Seq[Ident] = + cancelTokens.keys.toSeq + + def requestShutdown: (State[F], Unit) = + (copy(shutdownRequest = true), ()) + } +}