Refactor scheduler into api / impl

This commit is contained in:
eikek
2022-03-13 14:27:06 +01:00
parent 69765f05ff
commit 3a05dc56cb
50 changed files with 1076 additions and 867 deletions

View File

@ -6,14 +6,9 @@
package docspell.scheduler
import cats.effect._
import cats.implicits._
import cats.{Applicative, Functor}
import docspell.common._
import docspell.logging.Logger
import docspell.store.Store
import docspell.store.records.RJob
trait Context[F[_], A] { self =>
@ -29,54 +24,8 @@ trait Context[F[_], A] { self =>
def store: Store[F]
final def isLastRetry(implicit ev: Applicative[F]): F[Boolean] =
for {
current <- store.transact(RJob.getRetries(jobId))
last = config.retries == current.getOrElse(0)
} yield last
def isLastRetry: F[Boolean]
def map[C](f: A => C): Context[F, C]
def map[C](f: A => C)(implicit F: Functor[F]): Context[F, C] =
new Context.ContextImpl[F, C](f(args), logger, store, config, jobId)
}
object Context {
def create[F[_]: Async, A](
jobId: Ident,
arg: A,
config: SchedulerConfig,
log: Logger[F],
store: Store[F]
): Context[F, A] =
new ContextImpl(arg, log, store, config, jobId)
def apply[F[_]: Async, A](
job: RJob,
arg: A,
config: SchedulerConfig,
logSink: LogSink[F],
store: Store[F]
): F[Context[F, A]] = {
val log = docspell.logging.getLogger[F]
for {
_ <- log.trace("Creating logger for task run")
logger <- QueueLogger(job.id, job.info, config.logBufferSize, logSink)
_ <- log.trace("Logger created, instantiating context")
ctx = create[F, A](job.id, arg, config, logger, store)
} yield ctx
}
final private class ContextImpl[F[_]: Functor, A](
val args: A,
val logger: Logger[F],
val store: Store[F],
val config: SchedulerConfig,
val jobId: Ident
) extends Context[F, A] {
def setProgress(percent: Int): F[Unit] = {
val pval = math.min(100, math.max(0, percent))
store.transact(RJob.setProgress(jobId, pval)).map(_ => ())
}
}
}

View File

@ -0,0 +1,36 @@
package docspell.scheduler
import cats.effect.Sync
import cats.syntax.functor._
import docspell.common._
import io.circe.Encoder
/** A job to be submitted to the job queue, carrying arguments of type `A`.
  *
  * @param id unique id of this job
  * @param task the task identifier used to look up the code to run
  * @param group the group this job belongs to
  * @param args the task arguments
  * @param subject a human readable subject line
  * @param submitter who submitted the job
  * @param priority scheduling priority
  * @param tracker optional id used to detect duplicate submissions
  */
final case class Job[A](
    id: Ident,
    task: Ident,
    group: Ident,
    args: A,
    subject: String,
    submitter: Ident,
    priority: Priority,
    tracker: Option[Ident]
) {

  /** Returns this job with its arguments serialized to a compact JSON string. */
  def encode(implicit E: Encoder[A]): Job[String] = {
    val jsonArgs = E(args).noSpaces
    Job(id, task, group, jsonArgs, subject, submitter, priority, tracker)
  }
}
object Job {

  /** Creates a job with a freshly generated random id. */
  def createNew[F[_]: Sync, A](
      task: Ident,
      group: Ident,
      args: A,
      subject: String,
      submitter: Ident,
      priority: Priority,
      tracker: Option[Ident]
  ): F[Job[A]] =
    for {
      id <- Ident.randomId[F]
    } yield Job(id, task, group, args, subject, submitter, priority, tracker)
}

View File

@ -1,97 +0,0 @@
/*
* Copyright 2020 Eike K. & Contributors
*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
package docspell.scheduler
import cats.effect._
import cats.implicits._
import docspell.common._
import docspell.store.Store
import docspell.store.queries.QJob
import docspell.store.records.RJob
trait JobQueue[F[_]] {
/** Inserts the job into the queue to get picked up as soon as possible. The job must
* have a new unique id.
*/
def insert(job: RJob): F[Unit]
/** Inserts the job into the queue only, if there is no job with the same tracker-id
* running at the moment. The job id must be a new unique id.
*
* If the job has no tracker defined, it is simply inserted.
*/
def insertIfNew(job: RJob): F[Boolean]
def insertAll(jobs: Seq[RJob]): F[List[Boolean]]
def insertAllIfNew(jobs: Seq[RJob]): F[List[Boolean]]
def nextJob(
prio: Ident => F[Priority],
worker: Ident,
retryPause: Duration
): F[Option[RJob]]
}
object JobQueue {
private[scheduler] def create[F[_]: Async](store: Store[F]): Resource[F, JobQueue[F]] =
Resource.pure[F, JobQueue[F]](new JobQueue[F] {
private[this] val logger = docspell.logging.getLogger[F]
def nextJob(
prio: Ident => F[Priority],
worker: Ident,
retryPause: Duration
): F[Option[RJob]] =
logger
.trace("Select next job") *> QJob.takeNextJob(store)(prio, worker, retryPause)
def insert(job: RJob): F[Unit] =
store
.transact(RJob.insert(job))
.flatMap { n =>
if (n != 1)
Async[F]
.raiseError(new Exception(s"Inserting job failed. Update count: $n"))
else ().pure[F]
}
def insertIfNew(job: RJob): F[Boolean] =
for {
rj <- job.tracker match {
case Some(tid) =>
store.transact(RJob.findNonFinalByTracker(tid))
case None =>
None.pure[F]
}
ret <-
if (rj.isDefined) false.pure[F]
else insert(job).as(true)
} yield ret
def insertAll(jobs: Seq[RJob]): F[List[Boolean]] =
jobs.toList
.traverse(j => insert(j).attempt)
.flatMap(_.traverse {
case Right(()) => true.pure[F]
case Left(ex) =>
logger.error(ex)("Could not insert job. Skipping it.").as(false)
})
def insertAllIfNew(jobs: Seq[RJob]): F[List[Boolean]] =
jobs.toList
.traverse(j => insertIfNew(j).attempt)
.flatMap(_.traverse {
case Right(true) => true.pure[F]
case Right(false) => false.pure[F]
case Left(ex) =>
logger.error(ex)("Could not insert job. Skipping it.").as(false)
})
})
}

View File

@ -0,0 +1,21 @@
package docspell.scheduler
/** Persists jobs so they can be picked up for execution. Jobs are stored with their
  * arguments already encoded as a JSON string (`Job[String]`).
  */
trait JobStore[F[_]] {

  /** Inserts the job into the queue to get picked up as soon as possible. The job must
    * have a new unique id.
    */
  def insert(job: Job[String]): F[Unit]

  /** Inserts the job into the queue only, if there is no job with the same tracker-id
    * running at the moment. The job id must be a new unique id.
    *
    * If the job has no tracker defined, it is simply inserted.
    */
  def insertIfNew(job: Job[String]): F[Boolean]

  /** Inserts all jobs, yielding one flag per job — presumably whether the insert
    * succeeded (TODO confirm against the implementation).
    */
  def insertAll(jobs: Seq[Job[String]]): F[List[Boolean]]

  /** Inserts each job with the `insertIfNew` semantics, yielding one flag per job. */
  def insertAllIfNew(jobs: Seq[Job[String]]): F[List[Boolean]]
}

View File

@ -0,0 +1,9 @@
package docspell.scheduler
import docspell.scheduler.usertask.UserTaskStore
/** Groups the stores used for submitting work: user tasks and plain jobs. */
trait JobStoreModule[F[_]] {
  def userTasks: UserTaskStore[F]
  def jobs: JobStore[F]
}

View File

@ -43,7 +43,7 @@ object JobTask {
str.parseJsonAs[A] match {
case Right(a) => a.pure[F]
case Left(ex) =>
Sync[F].raiseError(new Exception(s"Cannot parse task arguments: $str", ex))
Sync[F].raiseError(new Exception(s"Cannot parse task arguments: '$str'", ex))
}
JobTask(name, task.contramap(convert).map(E.encode), onCancel.contramap(convert))

View File

@ -12,3 +12,8 @@ case class PeriodicSchedulerConfig(
name: Ident,
wakeupPeriod: Duration
)
object PeriodicSchedulerConfig {

  /** A default configuration using the given id and a ten-minute wakeup period. */
  def default(id: Ident): PeriodicSchedulerConfig =
    PeriodicSchedulerConfig(name = id, wakeupPeriod = Duration.minutes(10))
}

View File

@ -20,11 +20,10 @@ case class SchedulerConfig(
object SchedulerConfig {
val default = SchedulerConfig(
name = Ident.unsafe("default-scheduler"),
poolSize = 2 // math.max(2, Runtime.getRuntime.availableProcessors / 2)
,
countingScheme = CountingScheme(2, 1),
def default(id: Ident) = SchedulerConfig(
name = id,
poolSize = 1,
countingScheme = CountingScheme(3, 1),
retries = 5,
retryDelay = Duration.seconds(30),
logBufferSize = 500,

View File

@ -0,0 +1,6 @@
package docspell.scheduler
/** Groups the two schedulers that are created together: the job scheduler and the
  * periodic-task scheduler.
  */
trait SchedulerModule[F[_]] {
  def scheduler: Scheduler[F]
  def periodicScheduler: PeriodicScheduler[F]
}

View File

@ -6,15 +6,11 @@
package docspell.scheduler.usertask
import cats.effect._
import cats.implicits._
import com.github.eikek.calev.CalEvent
import docspell.common._
import docspell.common.syntax.all._
import docspell.store.records.RPeriodicTask
import io.circe.{Decoder, Encoder}
import io.circe.Encoder
case class UserTask[A](
final case class UserTask[A](
id: Ident,
name: Ident,
enabled: Boolean,
@ -32,33 +28,3 @@ case class UserTask[A](
def mapArgs[B](f: A => B): UserTask[B] =
withArgs(f(args))
}
object UserTask {
implicit final class UserTaskCodec(ut: UserTask[String]) {
def decode[A](implicit D: Decoder[A]): Either[String, UserTask[A]] =
ut.args
.parseJsonAs[A]
.left
.map(_.getMessage)
.map(a => ut.copy(args = a))
def toPeriodicTask[F[_]: Sync](
scope: UserTaskScope,
subject: Option[String]
): F[RPeriodicTask] =
QUserTask
.create[F](
ut.enabled,
scope,
ut.name,
ut.args,
subject.getOrElse(s"${scope.fold(_.user.id, _.id)}: ${ut.name.id}"),
Priority.Low,
ut.timer,
ut.summary
)
.map(r => r.copy(id = ut.id))
}
}

View File

@ -20,7 +20,7 @@ sealed trait UserTaskScope { self: Product =>
/** Maps to the account or uses the collective for both parts if the scope is collective
* wide.
*/
private[usertask] def toAccountId: AccountId =
private[scheduler] def toAccountId: AccountId =
AccountId(collective, fold(_.user, identity))
}
@ -49,4 +49,7 @@ object UserTaskScope {
def apply(collective: Ident): UserTaskScope =
UserTaskScope.collective(collective)
def system: UserTaskScope =
collective(DocspellSystem.taskGroup)
}

View File

@ -7,10 +7,7 @@
package docspell.scheduler.usertask
import cats.data.OptionT
import cats.effect._
import cats.implicits._
import docspell.common._
import docspell.store.{AddResult, Store}
import fs2.Stream
import io.circe._
@ -88,96 +85,11 @@ trait UserTaskStore[F[_]] {
/** Delete all tasks of the given user that have name `name`. */
def deleteAll(scope: UserTaskScope, name: Ident): F[Int]
}
object UserTaskStore {
def apply[F[_]: Async](store: Store[F]): Resource[F, UserTaskStore[F]] =
Resource.pure[F, UserTaskStore[F]](new UserTaskStore[F] {
def getAll(scope: UserTaskScope): Stream[F, UserTask[String]] =
store.transact(QUserTask.findAll(scope.toAccountId))
def getByNameRaw(scope: UserTaskScope, name: Ident): Stream[F, UserTask[String]] =
store.transact(QUserTask.findByName(scope.toAccountId, name))
def getByIdRaw(scope: UserTaskScope, id: Ident): OptionT[F, UserTask[String]] =
OptionT(store.transact(QUserTask.findById(scope.toAccountId, id)))
def getByName[A](scope: UserTaskScope, name: Ident)(implicit
D: Decoder[A]
): Stream[F, UserTask[A]] =
getByNameRaw(scope, name).flatMap(_.decode match {
case Right(ua) => Stream.emit(ua)
case Left(err) => Stream.raiseError[F](new Exception(err))
})
def updateTask[A](scope: UserTaskScope, subject: Option[String], ut: UserTask[A])(
implicit E: Encoder[A]
): F[Int] = {
val exists = QUserTask.exists(ut.id)
val insert = QUserTask.insert(scope, subject, ut.encode)
store.add(insert, exists).flatMap {
case AddResult.Success =>
1.pure[F]
case AddResult.EntityExists(_) =>
store.transact(QUserTask.update(scope, subject, ut.encode))
case AddResult.Failure(ex) =>
Async[F].raiseError(ex)
}
}
def deleteTask(scope: UserTaskScope, id: Ident): F[Int] =
store.transact(QUserTask.delete(scope.toAccountId, id))
def getOneByNameRaw(
scope: UserTaskScope,
name: Ident
): OptionT[F, UserTask[String]] =
OptionT(
getByNameRaw(scope, name)
.take(2)
.compile
.toList
.flatMap {
case Nil => (None: Option[UserTask[String]]).pure[F]
case ut :: Nil => ut.some.pure[F]
case _ => Async[F].raiseError(new Exception("More than one result found"))
}
)
def getOneByName[A](scope: UserTaskScope, name: Ident)(implicit
D: Decoder[A]
): OptionT[F, UserTask[A]] =
getOneByNameRaw(scope, name)
.semiflatMap(_.decode match {
case Right(ua) => ua.pure[F]
case Left(err) => Async[F].raiseError(new Exception(err))
})
def updateOneTask[A](
scope: UserTaskScope,
subject: Option[String],
ut: UserTask[A]
)(implicit
E: Encoder[A]
): F[UserTask[String]] =
getByNameRaw(scope, ut.name).compile.toList.flatMap {
case a :: rest =>
val task = ut.copy(id = a.id).encode
for {
_ <- store.transact(QUserTask.update(scope, subject, task))
_ <- store.transact(
rest.traverse(t => QUserTask.delete(scope.toAccountId, t.id))
)
} yield task
case Nil =>
val task = ut.encode
store.transact(QUserTask.insert(scope, subject, task)).map(_ => task)
}
def deleteAll(scope: UserTaskScope, name: Ident): F[Int] =
store.transact(QUserTask.deleteAll(scope.toAccountId, name))
})
/** Discards the schedule and immediately submits the task to the job executor's queue.
* It will not update the corresponding periodic task.
*/
def executeNow[A](scope: UserTaskScope, subject: Option[String], task: UserTask[A])(
implicit E: Encoder[A]
): F[Unit]
}

View File

@ -0,0 +1,21 @@
/*
* Copyright 2020 Eike K. & Contributors
*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
package docspell.scheduler
import docspell.common.Priority
import munit._
/** Checks that `CountingScheme(2, 1)` produces priorities in the repeating
  * pattern high, high, low.
  */
class CountingSchemeSpec extends FunSuite {

  test("counting") {
    val cs = CountingScheme(2, 1)
    // Step the scheme six times; each step yields the next scheme state and a priority.
    val list = List.iterate(cs.nextPriority, 6)(_._1.nextPriority).map(_._2)
    val expect = List(Priority.High, Priority.High, Priority.Low)
    assertEquals(list, expect ++ expect)
  }
}

View File

@ -0,0 +1,61 @@
package docspell.scheduler.impl
import cats._
import cats.syntax.all._
import cats.effect._
import docspell.common._
import docspell.logging.Logger
import docspell.scheduler._
import docspell.store.Store
import docspell.store.records.RJob
/** Default `Context` implementation, backed by the given `Store`. */
class ContextImpl[F[_]: Functor, A](
    val args: A,
    val logger: Logger[F],
    val store: Store[F],
    val config: SchedulerConfig,
    val jobId: Ident
) extends Context[F, A] {

  /** Persists the job's progress; the value is clamped into the range 0..100. */
  def setProgress(percent: Int): F[Unit] = {
    val bounded = percent.max(0).min(100)
    store.transact(RJob.setProgress(jobId, bounded)).void
  }

  /** True when the retries recorded for this job equal the configured maximum. */
  def isLastRetry: F[Boolean] =
    store
      .transact(RJob.getRetries(jobId))
      .map(retries => config.retries == retries.getOrElse(0))

  def map[C](f: A => C) =
    new ContextImpl[F, C](f(args), logger, store, config, jobId)
}
object ContextImpl {

  /** Builds a context directly from its parts, with an already-created logger. */
  def create[F[_]: Async, A](
      jobId: Ident,
      arg: A,
      config: SchedulerConfig,
      log: Logger[F],
      store: Store[F]
  ): Context[F, A] =
    new ContextImpl(arg, log, store, config, jobId)

  /** Creates a context for the given job, wiring up a queue-backed logger that sends
    * job log events into `logSink`.
    */
  def apply[F[_]: Async, A](
      job: RJob,
      arg: A,
      config: SchedulerConfig,
      logSink: LogSink[F],
      store: Store[F]
  ): F[Context[F, A]] = {
    val selfLogger = docspell.logging.getLogger[F]
    selfLogger.trace("Creating logger for task run") *>
      QueueLogger(job.id, job.info, config.logBufferSize, logSink).flatMap { jobLogger =>
        selfLogger.trace("Logger created, instantiating context") *>
          Async[F].pure(create[F, A](job.id, arg, config, jobLogger, store))
      }
  }
}

View File

@ -0,0 +1,37 @@
/*
* Copyright 2020 Eike K. & Contributors
*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
package docspell.scheduler.impl
import cats.effect._
import cats.implicits._
import docspell.common._
import docspell.store.Store
import docspell.store.queries.QJob
import docspell.store.records.RJob
/** Fetching the next job to execute from the queue. */
trait JobQueue[F[_]] {

  /** Returns the next job ready to run, if any. `prio` supplies a priority for a given
    * group id; `retryPause` is presumably the minimum wait before a failed job becomes
    * eligible again — TODO confirm against `QJob.takeNextJob`.
    */
  def nextJob(
      prio: Ident => F[Priority],
      worker: Ident,
      retryPause: Duration
  ): F[Option[RJob]]
}
object JobQueue {

  /** Queue implementation delegating to `QJob.takeNextJob`. */
  private class Impl[F[_]: Async](store: Store[F]) extends JobQueue[F] {
    private[this] val logger = docspell.logging.getLogger[F]

    def nextJob(
        prio: Ident => F[Priority],
        worker: Ident,
        retryPause: Duration
    ): F[Option[RJob]] =
      logger.trace("Select next job") *>
        QJob.takeNextJob(store)(prio, worker, retryPause)
  }

  private[scheduler] def apply[F[_]: Async](store: Store[F]): JobQueue[F] =
    new Impl[F](store)
}

View File

@ -0,0 +1,87 @@
package docspell.scheduler.impl
import cats.effect.Sync
import cats.syntax.all._
import docspell.common.Timestamp
import docspell.scheduler._
import docspell.store.Store
import docspell.store.records.RJob
/** `JobStore` implementation backed by the `RJob` table. */
final class JobStoreImpl[F[_]: Sync](store: Store[F]) extends JobStore[F] {
  private[this] val logger = docspell.logging.getLogger[F]

  /** Inserts the job using the current time as submission timestamp. */
  def insert(job: Job[String]): F[Unit] =
    Timestamp.current[F].flatMap(insert0(job, _))

  /** Inserts the job with an explicit submission timestamp; fails when the row count
    * after the insert is not exactly one.
    */
  def insert0(job: Job[String], submitted: Timestamp): F[Unit] =
    store.transact(RJob.insert(toRecord(job, submitted))).flatMap { n =>
      if (n == 1) ().pure[F]
      else
        Sync[F].raiseError(new Exception(s"Inserting job failed. Update count: $n"))
    }

  /** Inserts only when no non-final job with the same tracker exists. */
  def insertIfNew(job: Job[String]): F[Boolean] =
    for {
      now <- Timestamp.current[F]
      res <- insertIfNew0(job, now)
    } yield res

  def insertIfNew0(job: Job[String], submitted: Timestamp): F[Boolean] = {
    // Look up a running/waiting job with the same tracker, if a tracker is set.
    val existing = job.tracker match {
      case Some(tid) => store.transact(RJob.findNonFinalByTracker(tid))
      case None      => None.pure[F]
    }
    existing.flatMap {
      case Some(_) => false.pure[F]
      case None    => insert0(job, submitted).as(true)
    }
  }

  def insertAll(jobs: Seq[Job[String]]): F[List[Boolean]] =
    submitMany(jobs)(insert0)(_ => true)

  def insertAllIfNew(jobs: Seq[Job[String]]): F[List[Boolean]] =
    submitMany(jobs)(insertIfNew0)(identity)

  /** Runs `submit` for every job with a shared timestamp; failures are logged and
    * reported as `false` instead of aborting the whole batch.
    */
  private def submitMany[B](jobs: Seq[Job[String]])(
      submit: (Job[String], Timestamp) => F[B]
  )(wasInserted: B => Boolean): F[List[Boolean]] =
    Timestamp.current[F].flatMap { now =>
      jobs.toList
        .traverse(j => submit(j, now).attempt)
        .flatMap(_.traverse {
          case Right(b) => wasInserted(b).pure[F]
          case Left(ex) =>
            logger.error(ex)("Could not insert job. Skipping it.").as(false)
        })
    }

  /** Converts the job into a new `RJob` row with the given submission time. */
  def toRecord(job: Job[String], timestamp: Timestamp): RJob =
    RJob.newJob(
      job.id,
      job.task,
      job.group,
      job.args,
      job.subject,
      timestamp,
      job.submitter,
      job.priority,
      job.tracker
    )
}
object JobStoreImpl {

  /** Creates a `JobStore` backed by the given store. */
  def apply[F[_]: Sync](store: Store[F]): JobStore[F] =
    new JobStoreImpl[F](store)
}

View File

@ -0,0 +1,50 @@
package docspell.scheduler.impl
import cats.effect.Async
import docspell.notification.api.EventSink
import docspell.pubsub.api.PubSubT
import docspell.scheduler._
import docspell.scheduler.usertask.UserTaskStore
import docspell.store.Store
/** Collects the dependencies for building a `JobStoreModule`; pubsub and event sink
  * can be swapped in via the `with*` methods before calling `build`.
  */
case class JobStoreModuleBuilder[F[_]: Async](
    store: Store[F],
    pubsub: PubSubT[F],
    eventSink: EventSink[F]
) {
  def withPubsub(ps: PubSubT[F]): JobStoreModuleBuilder[F] =
    copy(pubsub = ps)

  def withEventSink(es: EventSink[F]): JobStoreModuleBuilder[F] =
    copy(eventSink = es)

  /** Wires a publishing job store, the periodic task store and the user task store
    * into a single module.
    */
  def build: JobStoreModuleBuilder.Module[F] = {
    val jobStore = JobStorePublish(store, pubsub, eventSink)
    val periodicTaskStore = PeriodicTaskStore(store, jobStore)
    val userTaskStore = UserTaskStoreImpl(store, periodicTaskStore)
    new JobStoreModuleBuilder.Module(
      userTaskStore,
      periodicTaskStore,
      jobStore,
      store,
      eventSink,
      pubsub
    )
  }
}
object JobStoreModuleBuilder {

  /** A builder starting with a no-op pubsub and a silent event sink. */
  def apply[F[_]: Async](store: Store[F]): JobStoreModuleBuilder[F] =
    JobStoreModuleBuilder(store, PubSubT.noop[F], EventSink.silent[F])

  /** The assembled module, exposing the stores and infrastructure created by the
    * builder.
    */
  final class Module[F[_]](
      val userTasks: UserTaskStore[F],
      val periodicTaskStore: PeriodicTaskStore[F],
      val jobs: JobStore[F],
      val store: Store[F],
      val eventSink: EventSink[F],
      val pubSubT: PubSubT[F]
  ) extends JobStoreModule[F] {}
}

View File

@ -4,51 +4,52 @@
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
package docspell.scheduler.msg
package docspell.scheduler.impl
import cats.effect._
import cats.implicits._
import docspell.common.{Duration, Ident, Priority}
import docspell.common.JobState
import docspell.notification.api.{Event, EventSink}
import docspell.pubsub.api.PubSubT
import docspell.scheduler.JobQueue
import docspell.scheduler._
import docspell.scheduler.msg.JobSubmitted
import docspell.store.Store
import docspell.store.records.RJob
final class JobQueuePublish[F[_]: Sync](
delegate: JobQueue[F],
final class JobStorePublish[F[_]: Sync](
delegate: JobStore[F],
pubsub: PubSubT[F],
eventSink: EventSink[F]
) extends JobQueue[F] {
) extends JobStore[F] {
private def msg(job: RJob): JobSubmitted =
private def msg(job: Job[String]): JobSubmitted =
JobSubmitted(job.id, job.group, job.task, job.args)
private def event(job: RJob): Event.JobSubmitted =
private def event(job: Job[String]): Event.JobSubmitted =
Event.JobSubmitted(
job.id,
job.group,
job.task,
job.args,
job.state,
JobState.waiting,
job.subject,
job.submitter
)
private def publish(job: RJob): F[Unit] =
private def publish(job: Job[String]): F[Unit] =
pubsub.publish1(JobSubmitted.topic, msg(job)).as(()) *>
eventSink.offer(event(job))
def insert(job: RJob) =
def insert(job: Job[String]) =
delegate.insert(job).flatTap(_ => publish(job))
def insertIfNew(job: RJob) =
def insertIfNew(job: Job[String]) =
delegate.insertIfNew(job).flatTap {
case true => publish(job)
case false => ().pure[F]
}
def insertAll(jobs: Seq[RJob]) =
def insertAll(jobs: Seq[Job[String]]) =
delegate.insertAll(jobs).flatTap { results =>
results.zip(jobs).traverse { case (res, job) =>
if (res) publish(job)
@ -56,23 +57,20 @@ final class JobQueuePublish[F[_]: Sync](
}
}
def insertAllIfNew(jobs: Seq[RJob]) =
def insertAllIfNew(jobs: Seq[Job[String]]) =
delegate.insertAllIfNew(jobs).flatTap { results =>
results.zip(jobs).traverse { case (res, job) =>
if (res) publish(job)
else ().pure[F]
}
}
def nextJob(prio: Ident => F[Priority], worker: Ident, retryPause: Duration) =
delegate.nextJob(prio, worker, retryPause)
}
object JobQueuePublish {
object JobStorePublish {
def apply[F[_]: Async](
store: Store[F],
pubSub: PubSubT[F],
eventSink: EventSink[F]
): Resource[F, JobQueue[F]] =
JobQueue.create(store).map(q => new JobQueuePublish[F](q, pubSub, eventSink))
): JobStore[F] =
new JobStorePublish[F](JobStoreImpl(store), pubSub, eventSink)
}

View File

@ -4,16 +4,16 @@
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
package docspell.scheduler
package docspell.scheduler.impl
import cats.effect._
import cats.implicits._
import fs2.Pipe
import docspell.common._
import docspell.logging
import docspell.scheduler.LogEvent
import docspell.store.Store
import docspell.store.records.RJobLog
import fs2.Pipe
trait LogSink[F[_]] {

View File

@ -7,10 +7,8 @@ import fs2.concurrent.SignallingRef
object PeriodicSchedulerBuilder {
def build[F[_]: Async](
def resource[F[_]: Async](
cfg: PeriodicSchedulerConfig,
sch: Scheduler[F],
queue: JobQueue[F],
store: PeriodicTaskStore[F],
pubsub: PubSubT[F]
): Resource[F, PeriodicScheduler[F]] =
@ -19,8 +17,6 @@ object PeriodicSchedulerBuilder {
state <- Resource.eval(SignallingRef(PeriodicSchedulerImpl.emptyState[F]))
psch = new PeriodicSchedulerImpl[F](
cfg,
sch,
queue,
store,
pubsub,
waiter,

View File

@ -20,8 +20,6 @@ import eu.timepit.fs2cron.calev.CalevScheduler
final class PeriodicSchedulerImpl[F[_]: Async](
val config: PeriodicSchedulerConfig,
sch: Scheduler[F],
queue: JobQueue[F],
store: PeriodicTaskStore[F],
pubSub: PubSubT[F],
waiter: SignallingRef[F, Boolean],
@ -119,11 +117,11 @@ final class PeriodicSchedulerImpl[F[_]: Async](
case None =>
logger.info(s"Submitting job for periodic task '${pj.task.id}'") *>
pj.toJob.flatMap(queue.insert) *> notifyJoex *> true.pure[F]
store.submit(pj) *> notifyJoex *> true.pure[F]
}
def notifyJoex: F[Unit] =
sch.notifyChange *> pubSub.publish1IgnoreErrors(JobsNotify(), ()).void
pubSub.publish1IgnoreErrors(JobsNotify(), ()).void
def scheduleNotify(pj: RPeriodicTask): F[Unit] =
Timestamp

View File

@ -9,6 +9,7 @@ package docspell.scheduler.impl
import cats.effect._
import cats.implicits._
import docspell.common._
import docspell.scheduler.{Job, JobStore}
import docspell.store.queries.QPeriodicTask
import docspell.store.records._
import docspell.store.{AddResult, Store}
@ -37,12 +38,18 @@ trait PeriodicTaskStore[F[_]] {
/** Find all joex nodes as registered in the database. */
def findJoexNodes: F[Vector[RNode]]
/** Creates a job from the given task and submits it into the job queue */
def submit(task: RPeriodicTask): F[Unit]
}
object PeriodicTaskStore {
def create[F[_]: Sync](store: Store[F]): Resource[F, PeriodicTaskStore[F]] =
Resource.pure[F, PeriodicTaskStore[F]](new PeriodicTaskStore[F] {
def apply[F[_]: Sync](
store: Store[F],
jobStore: JobStore[F]
): PeriodicTaskStore[F] =
new PeriodicTaskStore[F] {
private[this] val logger = docspell.logging.getLogger[F]
def takeNext(
worker: Ident,
@ -116,5 +123,22 @@ object PeriodicTaskStore {
def findJoexNodes: F[Vector[RNode]] =
store.transact(RNode.findAll(NodeType.Joex))
})
def submit(task: RPeriodicTask) =
makeJob(task).flatMap(jobStore.insert)
def makeJob(rt: RPeriodicTask): F[Job[String]] =
Ident.randomId[F].map { id =>
Job(
id,
rt.task,
rt.group,
rt.args,
rt.subject,
rt.submitter,
rt.priority,
Some(id)
)
}
}
}

View File

@ -1,19 +1,48 @@
package docspell.scheduler.usertask
package docspell.scheduler.impl
import cats.implicits._
import cats.effect.Sync
import cats.syntax.all._
import com.github.eikek.calev.CalEvent
import docspell.common._
import docspell.common.{AccountId, Ident, Priority, Timestamp}
import docspell.scheduler.usertask.{UserTask, UserTaskScope}
import docspell.store.qb.DML
import docspell.store.qb.DSL._
import docspell.store.records.RPeriodicTask
import doobie.ConnectionIO
import fs2.Stream
import io.circe.Encoder
import io.circe.{Decoder, Encoder}
import doobie._
object QUserTask {
private val RT = RPeriodicTask.T
implicit final class UserTaskCodec(ut: UserTask[String]) {
import docspell.common.syntax.all._
def decode[A](implicit D: Decoder[A]): Either[String, UserTask[A]] =
ut.args
.parseJsonAs[A]
.left
.map(_.getMessage)
.map(a => ut.copy(args = a))
def toPeriodicTask[F[_]: Sync](
scope: UserTaskScope,
subject: Option[String]
): F[RPeriodicTask] =
QUserTask
.create[F](
ut.enabled,
scope,
ut.name,
ut.args,
subject.getOrElse(s"${scope.fold(_.user.id, _.id)}: ${ut.name.id}"),
Priority.Low,
ut.timer,
ut.summary
)
.map(r => r.copy(id = ut.id))
}
def findAll(account: AccountId): Stream[ConnectionIO, UserTask[String]] =
run(
select(RT.all),

View File

@ -1,19 +1,13 @@
/*
* Copyright 2020 Eike K. & Contributors
*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
package docspell.scheduler.impl
package docspell.scheduler
import cats.effect._
import cats.syntax.all._
import cats.effect.std.Queue
import cats.implicits._
import fs2.Stream
import docspell.common._
import cats.effect.{Async, Sync}
import docspell.common.{Ident, LogLevel}
import docspell.logging
import docspell.logging.{Level, Logger}
import docspell.scheduler.LogEvent
import fs2.Stream
object QueueLogger {

View File

@ -10,7 +10,7 @@ import cats.effect._
import cats.effect.std.Semaphore
import cats.implicits._
import fs2.concurrent.SignallingRef
import docspell.scheduler.{JobQueue, _}
import docspell.scheduler._
import docspell.notification.api.EventSink
import docspell.pubsub.api.PubSubT
import docspell.store.Store
@ -19,7 +19,7 @@ case class SchedulerBuilder[F[_]: Async](
config: SchedulerConfig,
tasks: JobTaskRegistry[F],
store: Store[F],
queue: Resource[F, JobQueue[F]],
queue: JobQueue[F],
logSink: LogSink[F],
pubSub: PubSubT[F],
eventSink: EventSink[F]
@ -34,14 +34,11 @@ case class SchedulerBuilder[F[_]: Async](
def withTask[A](task: JobTask[F]): SchedulerBuilder[F] =
withTaskRegistry(tasks.withTask(task))
def withQueue(queue: Resource[F, JobQueue[F]]): SchedulerBuilder[F] =
copy(queue = queue)
def withLogSink(sink: LogSink[F]): SchedulerBuilder[F] =
copy(logSink = sink)
def withQueue(queue: JobQueue[F]): SchedulerBuilder[F] =
copy(queue = Resource.pure[F, JobQueue[F]](queue))
copy(queue = queue)
def withPubSub(pubSubT: PubSubT[F]): SchedulerBuilder[F] =
copy(pubSub = pubSubT)
@ -53,14 +50,13 @@ case class SchedulerBuilder[F[_]: Async](
resource.evalMap(sch => Async[F].start(sch.start.compile.drain).map(_ => sch))
def resource: Resource[F, Scheduler[F]] = {
val scheduler: Resource[F, SchedulerImpl[F]] = for {
jq <- queue
waiter <- Resource.eval(SignallingRef(true))
state <- Resource.eval(SignallingRef(SchedulerImpl.emptyState[F]))
perms <- Resource.eval(Semaphore(config.poolSize.toLong))
val scheduler: F[SchedulerImpl[F]] = for {
waiter <- SignallingRef(true)
state <- SignallingRef(SchedulerImpl.emptyState[F])
perms <- Semaphore(config.poolSize.toLong)
} yield new SchedulerImpl[F](
config,
jq,
queue,
pubSub,
eventSink,
tasks,
@ -71,7 +67,7 @@ case class SchedulerBuilder[F[_]: Async](
perms
)
scheduler.evalTap(_.init).map(s => s: Scheduler[F])
Resource.eval(scheduler.flatTap(_.init)).map(s => s: Scheduler[F])
}
}
@ -86,10 +82,9 @@ object SchedulerBuilder {
config,
JobTaskRegistry.empty[F],
store,
JobQueue.create(store),
JobQueue(store),
LogSink.db[F](store),
PubSubT.noop[F],
EventSink.silent[F]
)
}

View File

@ -14,7 +14,7 @@ import fs2.Stream
import fs2.concurrent.SignallingRef
import docspell.scheduler.msg.{CancelJob, JobDone, JobsNotify}
import docspell.common._
import docspell.scheduler.{JobQueue, _}
import docspell.scheduler._
import docspell.scheduler.impl.SchedulerImpl._
import docspell.notification.api.Event
import docspell.notification.api.EventSink
@ -172,7 +172,7 @@ final class SchedulerImpl[F[_]: Async](
for {
_ <-
logger.debug(s"Creating context for job ${job.info} to run cancellation $t")
ctx <- Context[F, String](job, job.args, config, logSink, store)
ctx <- ContextImpl[F, String](job, job.args, config, logSink, store)
_ <- t.onCancel.run(ctx)
_ <- state.modify(_.markCancelled(job))
_ <- onFinish(job, JobTaskResult.empty, JobState.Cancelled)
@ -196,7 +196,7 @@ final class SchedulerImpl[F[_]: Async](
case Right(t) =>
for {
_ <- logger.debug(s"Creating context for job ${job.info} to run $t")
ctx <- Context[F, String](job, job.args, config, logSink, store)
ctx <- ContextImpl[F, String](job, job.args, config, logSink, store)
jot = wrapTask(job, t.task, ctx)
tok <- forkRun(job, jot.run(ctx), t.onCancel.run(ctx), ctx)
_ <- state.modify(_.addRunning(job, tok))

View File

@ -0,0 +1,68 @@
package docspell.scheduler.impl
import cats.effect._
import docspell.common.Ident
import docspell.scheduler.{
JobTaskRegistry,
PeriodicSchedulerConfig,
SchedulerConfig,
SchedulerModule
}
/** Builds a `SchedulerModule`: a scheduler and a periodic scheduler sharing the stores
  * of the given `jobStoreModule`.
  */
case class SchedulerModuleBuilder[F[_]: Async] private (
    periodicSchedulerConfig: PeriodicSchedulerConfig,
    schedulerBuilder: SchedulerBuilder[F],
    jobStoreModule: JobStoreModuleBuilder.Module[F]
) {

  // applies a transformation to the wrapped scheduler builder
  private def configureScheduler(
      f: SchedulerBuilder[F] => SchedulerBuilder[F]
  ): SchedulerModuleBuilder[F] =
    copy(schedulerBuilder = f(schedulerBuilder))

  def withTaskRegistry(reg: JobTaskRegistry[F]): SchedulerModuleBuilder[F] =
    configureScheduler(_.withTaskRegistry(reg))

  def withSchedulerConfig(cfg: SchedulerConfig): SchedulerModuleBuilder[F] =
    configureScheduler(_.withConfig(cfg))

  def withPeriodicSchedulerConfig(
      cfg: PeriodicSchedulerConfig
  ): SchedulerModuleBuilder[F] =
    copy(periodicSchedulerConfig = cfg)

  /** Creates both schedulers; the queue, pubsub and event sink are taken from the job
    * store module.
    */
  def resource: Resource[F, SchedulerModule[F]] = {
    val queue = JobQueue(jobStoreModule.store)
    for {
      schedulerR <- schedulerBuilder
        .withPubSub(jobStoreModule.pubSubT)
        .withEventSink(jobStoreModule.eventSink)
        .withQueue(queue)
        .resource
      periodicTaskSchedulerR <-
        PeriodicSchedulerBuilder.resource(
          periodicSchedulerConfig,
          jobStoreModule.periodicTaskStore,
          jobStoreModule.pubSubT
        )
    } yield new SchedulerModule[F] {
      val scheduler = schedulerR
      val periodicScheduler = periodicTaskSchedulerR
    }
  }
}
object SchedulerModuleBuilder {

  /** A builder with default scheduler configurations.
    *
    * NOTE(review): the node id is hard-coded to "default-node-id"; callers in
    * multi-node setups should override it via `withSchedulerConfig` /
    * `withPeriodicSchedulerConfig`.
    */
  def apply[F[_]: Async](
      jobStoreModule: JobStoreModuleBuilder.Module[F]
  ): SchedulerModuleBuilder[F] = {
    val id = Ident.unsafe("default-node-id")
    new SchedulerModuleBuilder(
      PeriodicSchedulerConfig.default(id),
      SchedulerBuilder(SchedulerConfig.default(id), jobStoreModule.store),
      jobStoreModule
    )
  }
}

View File

@ -0,0 +1,117 @@
package docspell.scheduler.impl
import cats.effect._
import docspell.scheduler.usertask.UserTaskStore
import cats.data.OptionT
import cats.implicits._
import docspell.common._
import docspell.scheduler.usertask._
import docspell.store.{AddResult, Store}
import fs2.Stream
import io.circe._
import QUserTask.UserTaskCodec
/** Default `UserTaskStore` implementation based on `QUserTask` queries. `executeNow`
  * converts the task into a periodic task and hands it to the periodic task store for
  * immediate submission.
  */
final class UserTaskStoreImpl[F[_]: Sync](
    store: Store[F],
    periodicTaskStore: PeriodicTaskStore[F]
) extends UserTaskStore[F] {

  def getAll(scope: UserTaskScope): Stream[F, UserTask[String]] =
    store.transact(QUserTask.findAll(scope.toAccountId))

  def getByNameRaw(scope: UserTaskScope, name: Ident): Stream[F, UserTask[String]] =
    store.transact(QUserTask.findByName(scope.toAccountId, name))

  def getByIdRaw(scope: UserTaskScope, id: Ident): OptionT[F, UserTask[String]] =
    OptionT(store.transact(QUserTask.findById(scope.toAccountId, id)))

  /** Like `getByNameRaw`, but decodes the arguments; a decode error fails the stream. */
  def getByName[A](scope: UserTaskScope, name: Ident)(implicit
      D: Decoder[A]
  ): Stream[F, UserTask[A]] =
    getByNameRaw(scope, name).flatMap(_.decode match {
      case Right(ua) => Stream.emit(ua)
      case Left(err) => Stream.raiseError[F](new Exception(err))
    })

  /** Inserts the task or, when its id already exists, updates it. Returns the number
    * of affected rows (1 for the insert case).
    */
  def updateTask[A](scope: UserTaskScope, subject: Option[String], ut: UserTask[A])(
      implicit E: Encoder[A]
  ): F[Int] = {
    val exists = QUserTask.exists(ut.id)
    val insert = QUserTask.insert(scope, subject, ut.encode)
    store.add(insert, exists).flatMap {
      case AddResult.Success =>
        1.pure[F]
      case AddResult.EntityExists(_) =>
        store.transact(QUserTask.update(scope, subject, ut.encode))
      case AddResult.Failure(ex) =>
        Sync[F].raiseError(ex)
    }
  }

  def deleteTask(scope: UserTaskScope, id: Ident): F[Int] =
    store.transact(QUserTask.delete(scope.toAccountId, id))

  /** Expects at most one task with the given name; raises when several are found. */
  def getOneByNameRaw(
      scope: UserTaskScope,
      name: Ident
  ): OptionT[F, UserTask[String]] =
    OptionT(
      getByNameRaw(scope, name)
        .take(2) // two results are enough to detect ambiguity
        .compile
        .toList
        .flatMap {
          case Nil => (None: Option[UserTask[String]]).pure[F]
          case ut :: Nil => ut.some.pure[F]
          case _ => Sync[F].raiseError(new Exception("More than one result found"))
        }
    )

  /** Like `getOneByNameRaw`, but decodes the arguments. */
  def getOneByName[A](scope: UserTaskScope, name: Ident)(implicit
      D: Decoder[A]
  ): OptionT[F, UserTask[A]] =
    getOneByNameRaw(scope, name)
      .semiflatMap(_.decode match {
        case Right(ua) => ua.pure[F]
        case Left(err) => Sync[F].raiseError(new Exception(err))
      })

  /** Replaces the task with the given name: updates the first match (keeping its id)
    * and deletes any further duplicates, or inserts when none exists. Returns the
    * stored (encoded) task.
    */
  def updateOneTask[A](
      scope: UserTaskScope,
      subject: Option[String],
      ut: UserTask[A]
  )(implicit
      E: Encoder[A]
  ): F[UserTask[String]] =
    getByNameRaw(scope, ut.name).compile.toList.flatMap {
      case a :: rest =>
        val task = ut.copy(id = a.id).encode
        for {
          _ <- store.transact(QUserTask.update(scope, subject, task))
          _ <- store.transact(
            rest.traverse(t => QUserTask.delete(scope.toAccountId, t.id))
          )
        } yield task
      case Nil =>
        val task = ut.encode
        store.transact(QUserTask.insert(scope, subject, task)).map(_ => task)
    }

  def deleteAll(scope: UserTaskScope, name: Ident): F[Int] =
    store.transact(QUserTask.deleteAll(scope.toAccountId, name))

  /** Discards the schedule and submits the task for immediate execution; the stored
    * periodic task is not updated.
    */
  def executeNow[A](scope: UserTaskScope, subject: Option[String], task: UserTask[A])(
      implicit E: Encoder[A]
  ): F[Unit] =
    for {
      ptask <- task.encode.toPeriodicTask(scope, subject)
      _ <- periodicTaskStore.submit(ptask)
    } yield ()
}
object UserTaskStoreImpl {

  /** Creates a `UserTaskStore` on top of the given store and periodic task store. */
  def apply[F[_]: Sync](
      store: Store[F],
      periodicTaskStore: PeriodicTaskStore[F]
  ): UserTaskStore[F] =
    new UserTaskStoreImpl[F](store, periodicTaskStore)
}