diff --git a/.scalafmt.conf b/.scalafmt.conf index ab23da00..ccaa37ac 100644 --- a/.scalafmt.conf +++ b/.scalafmt.conf @@ -3,7 +3,7 @@ version = "2.4.2" align = more #align.arrowEnumeratorGenerator = true -maxColumn = 100 +maxColumn = 90 rewrite.rules = [ AvoidInfix diff --git a/build.sbt b/build.sbt index 53a3c886..37aee634 100644 --- a/build.sbt +++ b/build.sbt @@ -194,7 +194,8 @@ val store = project.in(file("modules/store")). Dependencies.databases ++ Dependencies.flyway ++ Dependencies.loggingApi ++ - Dependencies.emil + Dependencies.emil ++ + Dependencies.calev ).dependsOn(common) val extract = project.in(file("modules/extract")). diff --git a/modules/common/src/main/scala/docspell/common/Hash.scala b/modules/common/src/main/scala/docspell/common/Hash.scala new file mode 100644 index 00000000..2737c3e7 --- /dev/null +++ b/modules/common/src/main/scala/docspell/common/Hash.scala @@ -0,0 +1,28 @@ +package docspell.common + +import scodec.bits.ByteVector +import java.nio.charset.StandardCharsets + +final class Hash(bytes: ByteVector) { + + private def digest(name: String): String = + bytes.digest(name).toHex.toLowerCase + + def sha256: String = + digest("SHA-256") + + def md5: String = + digest("MD5") + + def add(str: String): Hash = + new Hash(bytes ++ ByteVector.view(str.getBytes(StandardCharsets.UTF_8))) + + def add(id: Ident): Hash = + add(id.id) +} + +object Hash { + + def empty: Hash = new Hash(ByteVector.empty) + +} diff --git a/modules/common/src/main/scala/docspell/common/Timestamp.scala b/modules/common/src/main/scala/docspell/common/Timestamp.scala index 4bafba64..ee0735dc 100644 --- a/modules/common/src/main/scala/docspell/common/Timestamp.scala +++ b/modules/common/src/main/scala/docspell/common/Timestamp.scala @@ -4,6 +4,8 @@ import java.time.{Instant, LocalDate, ZoneId} import cats.effect.Sync import io.circe.{Decoder, Encoder} +import java.time.LocalDateTime +import java.time.ZonedDateTime case class Timestamp(value: Instant) { @@ -17,13 +19,20 @@ case class Timestamp(value: Instant) { def minusHours(n: Long): Timestamp = Timestamp(value.minusSeconds(n * 60 * 60)) - def toDate: LocalDate = - value.atZone(ZoneId.of("UTC")).toLocalDate + def toUtcDate: LocalDate = + value.atZone(Timestamp.UTC).toLocalDate + + def toUtcDateTime: LocalDateTime = + value.atZone(Timestamp.UTC).toLocalDateTime + + def atZone(zone: ZoneId): ZonedDateTime = + value.atZone(zone) def asString: String = value.toString } object Timestamp { + val UTC = ZoneId.of("UTC") val Epoch = Timestamp(Instant.EPOCH) diff --git a/modules/joex/src/main/resources/reference.conf b/modules/joex/src/main/resources/reference.conf index 9712f54d..ba056a83 100644 --- a/modules/joex/src/main/resources/reference.conf +++ b/modules/joex/src/main/resources/reference.conf @@ -64,6 +64,19 @@ docspell.joex { wakeup-period = "30 minutes" } + periodic-scheduler { + + # Each scheduler needs a unique name. This defaults to the node + # name, which must be unique, too. + name = ${docspell.joex.app-id} + + # A fallback to start looking for due periodic tasks regularily. + # Usually joex instances should be notified via REST calls if + # external processes change tasks. But these requests may get + # lost. + wakeup-period = "10 minutes" + } + # Configuration of text extraction extraction { # For PDF files it is first tried to read the text parts of the diff --git a/modules/joex/src/main/scala/docspell/joex/Config.scala b/modules/joex/src/main/scala/docspell/joex/Config.scala index 55790144..a3f18d60 100644 --- a/modules/joex/src/main/scala/docspell/joex/Config.scala +++ b/modules/joex/src/main/scala/docspell/joex/Config.scala @@ -1,7 +1,7 @@ package docspell.joex import docspell.common.{Ident, LenientUri} -import docspell.joex.scheduler.SchedulerConfig +import docspell.joex.scheduler.{PeriodicSchedulerConfig, SchedulerConfig} import docspell.store.JdbcConfig import docspell.convert.ConvertConfig import docspell.extract.ExtractConfig @@ -12,6 +12,7 @@ case class Config( bind: Config.Bind, jdbc: JdbcConfig, scheduler: SchedulerConfig, + periodicScheduler: PeriodicSchedulerConfig, extraction: ExtractConfig, convert: ConvertConfig ) diff --git a/modules/joex/src/main/scala/docspell/joex/JoexApp.scala b/modules/joex/src/main/scala/docspell/joex/JoexApp.scala index 0bcbba25..bbb052a6 100644 --- a/modules/joex/src/main/scala/docspell/joex/JoexApp.scala +++ b/modules/joex/src/main/scala/docspell/joex/JoexApp.scala @@ -1,7 +1,7 @@ package docspell.joex import docspell.common.Ident -import docspell.joex.scheduler.Scheduler +import docspell.joex.scheduler.{PeriodicScheduler, Scheduler} import docspell.store.records.RJobLog trait JoexApp[F[_]] { @@ -10,6 +10,8 @@ trait JoexApp[F[_]] { def scheduler: Scheduler[F] + def periodicScheduler: PeriodicScheduler[F] + def findLogs(jobId: Ident): F[Vector[RJobLog]] /** Shuts down the job executor. diff --git a/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala b/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala index 8e4fbccf..45045137 100644 --- a/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala +++ b/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala @@ -3,9 +3,11 @@ package docspell.joex import cats.implicits._ import cats.effect._ import docspell.common.{Ident, NodeType, ProcessItemArgs} +import docspell.joex.background._ import docspell.joex.process.ItemHandler -import docspell.joex.scheduler.{JobTask, Scheduler, SchedulerBuilder} +import docspell.joex.scheduler._ import docspell.store.Store +import docspell.store.queue._ import docspell.store.ops.ONode import docspell.store.records.RJobLog import fs2.concurrent.SignallingRef @@ -17,14 +19,18 @@ final class JoexAppImpl[F[_]: ConcurrentEffect: ContextShift: Timer]( nodeOps: ONode[F], store: Store[F], termSignal: SignallingRef[F, Boolean], - val scheduler: Scheduler[F] + val scheduler: Scheduler[F], + val periodicScheduler: PeriodicScheduler[F] ) extends JoexApp[F] { def init: F[Unit] = { - val run = scheduler.start.compile.drain + val run = scheduler.start.compile.drain + val prun = periodicScheduler.start.compile.drain for { _ <- ConcurrentEffect[F].start(run) + _ <- ConcurrentEffect[F].start(prun) _ <- scheduler.periodicAwake + _ <- periodicScheduler.periodicAwake _ <- nodeOps.register(cfg.appId, NodeType.Joex, cfg.baseUrl) } yield () } @@ -36,7 +42,7 @@ final class JoexAppImpl[F[_]: ConcurrentEffect: ContextShift: Timer]( nodeOps.unregister(cfg.appId) def initShutdown: F[Unit] = - scheduler.shutdown(false) *> termSignal.set(true) + periodicScheduler.shutdown *> scheduler.shutdown(false) *> termSignal.set(true) } @@ -50,8 +56,12 @@ object JoexAppImpl { ): Resource[F, JoexApp[F]] = for { store <- Store.create(cfg.jdbc, connectEC, blocker) + queue <- JobQueue(store) + pstore <- PeriodicTaskStore.create(store) nodeOps <- ONode(store) + psch <- PeriodicScheduler.create(cfg.periodicScheduler, queue, pstore, Timer[F]) sch <- SchedulerBuilder(cfg.scheduler, blocker, store) + .withQueue(queue) .withTask( JobTask.json( ProcessItemArgs.taskName, @@ -59,8 +69,15 @@ object JoexAppImpl { ItemHandler.onCancel[F] ) ) + .withTask( + JobTask.json( + PeriodicTask.taskName, + PeriodicTask[F](cfg), + PeriodicTask.onCancel[F] + ) + ) .resource - app = new JoexAppImpl(cfg, nodeOps, store, termSignal, sch) + app = new JoexAppImpl(cfg, nodeOps, store, termSignal, sch, psch) appR <- Resource.make(app.init.map(_ => app))(_.shutdown) } yield appR } diff --git a/modules/joex/src/main/scala/docspell/joex/routes/JoexRoutes.scala b/modules/joex/src/main/scala/docspell/joex/routes/JoexRoutes.scala index 2e32275c..1ca68274 100644 --- a/modules/joex/src/main/scala/docspell/joex/routes/JoexRoutes.scala +++ b/modules/joex/src/main/scala/docspell/joex/routes/JoexRoutes.scala @@ -19,7 +19,8 @@ object JoexRoutes { case POST -> Root / "notify" => for { _ <- app.scheduler.notifyChange - resp <- Ok(BasicResult(true, "Scheduler notified.")) + _ <- app.periodicScheduler.notifyChange + resp <- Ok(BasicResult(true, "Schedulers notified.")) } yield resp case GET -> Root / "running" => diff --git a/modules/joex/src/main/scala/docspell/joex/scheduler/PeriodicScheduler.scala b/modules/joex/src/main/scala/docspell/joex/scheduler/PeriodicScheduler.scala new file mode 100644 index 00000000..6789adfa --- /dev/null +++ b/modules/joex/src/main/scala/docspell/joex/scheduler/PeriodicScheduler.scala @@ -0,0 +1,46 @@ +package docspell.joex.scheduler + +import fs2._ +import fs2.concurrent.SignallingRef +import cats.effect._ + +import docspell.store.queue._ + +/** A periodic scheduler takes care to submit periodic tasks to the + * job queue. + * + * It is run in the background to regularily find a periodic task to + * execute. If the task is due, it will be submitted into the job + * queue where it will be picked up by the scheduler from some joex + * instance. If it is due in the future, a notification is scheduled + * to be received at that time so the task can be looked up again. + */ +trait PeriodicScheduler[F[_]] { + + def config: PeriodicSchedulerConfig + + def start: Stream[F, Nothing] + + def shutdown: F[Unit] + + def periodicAwake: F[Fiber[F, Unit]] + + def notifyChange: F[Unit] +} + +object PeriodicScheduler { + + def create[F[_]: ConcurrentEffect: ContextShift]( + cfg: PeriodicSchedulerConfig, + queue: JobQueue[F], + store: PeriodicTaskStore[F], + timer: Timer[F] + ): Resource[F, PeriodicScheduler[F]] = + for { + waiter <- Resource.liftF(SignallingRef(true)) + state <- Resource.liftF(SignallingRef(PeriodicSchedulerImpl.emptyState[F])) + psch = new PeriodicSchedulerImpl[F](cfg, queue, store, waiter, state, timer) + _ <- Resource.liftF(psch.init) + } yield psch + +} diff --git a/modules/joex/src/main/scala/docspell/joex/scheduler/PeriodicSchedulerConfig.scala b/modules/joex/src/main/scala/docspell/joex/scheduler/PeriodicSchedulerConfig.scala new file mode 100644 index 00000000..fc5872c6 --- /dev/null +++ b/modules/joex/src/main/scala/docspell/joex/scheduler/PeriodicSchedulerConfig.scala @@ -0,0 +1,8 @@ +package docspell.joex.scheduler + +import docspell.common._ + +case class PeriodicSchedulerConfig( + name: Ident, + wakeupPeriod: Duration +) diff --git a/modules/joex/src/main/scala/docspell/joex/scheduler/PeriodicSchedulerImpl.scala b/modules/joex/src/main/scala/docspell/joex/scheduler/PeriodicSchedulerImpl.scala new file mode 100644 index 00000000..421deacc --- /dev/null +++ b/modules/joex/src/main/scala/docspell/joex/scheduler/PeriodicSchedulerImpl.scala @@ -0,0 +1,194 @@ +package docspell.joex.scheduler + +import fs2._ +import fs2.concurrent.SignallingRef +import cats.effect._ +import cats.implicits._ +import org.log4s.getLogger +import com.github.eikek.fs2calev._ + +import docspell.common._ +import docspell.common.syntax.all._ +import docspell.store.queue._ +import docspell.store.records.RPeriodicTask + +import PeriodicSchedulerImpl.State + +/* +onStartUp: +- remove worker value from all of the current + +Loop: +- get earliest pjob +- if none: stop +- if triggered: + - mark worker, restart loop on fail + - submit new job + - check for non-final jobs of that name + - if exist: log info + - if not exist: submit + - update next trigger (in both cases) + - remove worker + - restart loop +- if future + - schedule notify + - stop loop + + +onNotify: +- cancel current scheduled notify +- start Loop + + +onShutdown: +- nothing to do + */ + +final class PeriodicSchedulerImpl[F[_]: ConcurrentEffect: ContextShift]( + val config: PeriodicSchedulerConfig, + queue: JobQueue[F], + store: PeriodicTaskStore[F], + waiter: SignallingRef[F, Boolean], + state: SignallingRef[F, State[F]], + timer: Timer[F] +) extends PeriodicScheduler[F] { + private[this] val logger = getLogger + implicit private val _timer: Timer[F] = timer + + def start: Stream[F, Nothing] = + logger.sinfo("Starting periodic scheduler") ++ + mainLoop + + def shutdown: F[Unit] = + state.modify(_.requestShutdown) + + def periodicAwake: F[Fiber[F, Unit]] = + ConcurrentEffect[F].start( + Stream + .awakeEvery[F](config.wakeupPeriod.toScala) + .evalMap(_ => logger.fdebug("Periodic awake reached") *> notifyChange) + .compile + .drain + ) + + def notifyChange: F[Unit] = + waiter.update(b => !b) + + // internal + + /** + * On startup, get all periodic jobs from this scheduler and remove + * the mark, so they get picked up again. + */ + def init: F[Unit] = + logError("Error clearing marks")(store.clearMarks(config.name)) + + def mainLoop: Stream[F, Nothing] = { + val body: F[Boolean] = + for { + _ <- logger.fdebug(s"Going into main loop") + now <- Timestamp.current[F] + _ <- logger.fdebug(s"Looking for next periodic task") + go <- logThrow("Error getting next task")( + store + .takeNext(config.name) + .use({ + case Some(pj) => + (if (isTriggered(pj, now)) submitJob(pj) + else scheduleNotify(pj)).map(_ => true) + case None => + false.pure[F] + }) + ) + } yield go + + Stream + .eval(state.get.map(_.shutdownRequest)) + .evalTap( + if (_) logger.finfo[F]("Stopping main loop due to shutdown request.") + else ().pure[F] + ) + .flatMap(if (_) Stream.empty else Stream.eval(cancelNotify *> body)) + .flatMap({ + case true => + mainLoop + case false => + logger.sdebug(s"Waiting for notify") ++ + waiter.discrete.take(2).drain ++ + logger.sdebug(s"Notify signal, going into main loop") ++ + mainLoop + }) + } + + def isTriggered(pj: RPeriodicTask, now: Timestamp): Boolean = + pj.timer.contains(now.value) + + def submitJob(pj: RPeriodicTask): F[Unit] = + store + .findNonFinalJob(pj.id) + .flatMap({ + case Some(job) => + logger.finfo[F]( + s"There is already a job with non-final state '${job.state}' in the queue" + ) + + case None => + logger.finfo[F](s"Submitting job for periodic task '${pj.task.id}'") *> + pj.toJob.flatMap(queue.insert) + }) + + def scheduleNotify(pj: RPeriodicTask): F[Unit] = + ConcurrentEffect[F] + .start( + CalevFs2 + .sleep[F](pj.timer) + .evalMap(_ => notifyChange) + .compile + .drain + ) + .flatMap(fb => state.modify(_.setNotify(fb))) + + def cancelNotify: F[Unit] = + state + .modify(_.clearNotify) + .flatMap({ + case Some(fb) => + fb.cancel + case None => + ().pure[F] + }) + + private def logError(msg: => String)(fa: F[Unit]): F[Unit] = + fa.attempt.flatMap { + case Right(_) => ().pure[F] + case Left(ex) => logger.ferror(ex)(msg).map(_ => ()) + } + + private def logThrow[A](msg: => String)(fa: F[A]): F[A] = + fa.attempt + .flatMap({ + case r @ Right(_) => (r: Either[Throwable, A]).pure[F] + case l @ Left(ex) => logger.ferror(ex)(msg).map(_ => (l: Either[Throwable, A])) + }) + .rethrow +} + +object PeriodicSchedulerImpl { + def emptyState[F[_]]: State[F] = + State(false, None) + + case class State[F[_]]( + shutdownRequest: Boolean, + scheduledNotify: Option[Fiber[F, Unit]] + ) { + def requestShutdown: (State[F], Unit) = + (copy(shutdownRequest = true), ()) + + def setNotify(fb: Fiber[F, Unit]): (State[F], Unit) = + (copy(scheduledNotify = Some(fb)), ()) + + def clearNotify: (State[F], Option[Fiber[F, Unit]]) = + (copy(scheduledNotify = None), scheduledNotify) + + } +} diff --git a/modules/joex/src/main/scala/docspell/joex/scheduler/SchedulerBuilder.scala b/modules/joex/src/main/scala/docspell/joex/scheduler/SchedulerBuilder.scala index d1faee33..d2879a93 100644 --- a/modules/joex/src/main/scala/docspell/joex/scheduler/SchedulerBuilder.scala +++ b/modules/joex/src/main/scala/docspell/joex/scheduler/SchedulerBuilder.scala @@ -34,6 +34,9 @@ case class SchedulerBuilder[F[_]: ConcurrentEffect: ContextShift]( def withLogSink(sink: LogSink[F]): SchedulerBuilder[F] = copy(logSink = sink) + def withQueue(queue: JobQueue[F]): SchedulerBuilder[F] = + copy(queue = Resource.pure[F, JobQueue[F]](queue)) + def serve: Resource[F, Scheduler[F]] = resource.evalMap(sch => ConcurrentEffect[F].start(sch.start.compile.drain).map(_ => sch)) diff --git a/modules/restserver/src/main/scala/docspell/restserver/conv/Conversions.scala b/modules/restserver/src/main/scala/docspell/restserver/conv/Conversions.scala index 123234d4..def5cc61 100644 --- a/modules/restserver/src/main/scala/docspell/restserver/conv/Conversions.scala +++ b/modules/restserver/src/main/scala/docspell/restserver/conv/Conversions.scala @@ -118,7 +118,7 @@ trait Conversions { ) def mkItemList(v: Vector[OItem.ListItem]): ItemLightList = { - val groups = v.groupBy(item => item.date.toDate.toString.substring(0, 7)) + val groups = v.groupBy(item => item.date.toUtcDate.toString.substring(0, 7)) def mkGroup(g: (String, Vector[OItem.ListItem])): ItemLightGroup = ItemLightGroup(g._1, g._2.map(mkItemLight).toList) diff --git a/modules/store/src/main/resources/db/migration/mariadb/V1.3.0__periodic_job.sql b/modules/store/src/main/resources/db/migration/mariadb/V1.3.0__periodic_job.sql new file mode 100644 index 00000000..c882b38a --- /dev/null +++ b/modules/store/src/main/resources/db/migration/mariadb/V1.3.0__periodic_job.sql @@ -0,0 +1,15 @@ +CREATE TABLE `periodic_task` ( + `id` varchar(254) not null primary key, + `enabled` boolean not null, + `task` varchar(254) not null, + `group_` varchar(254) not null, + `args` text not null, + `subject` varchar(254) not null, + `submitter` varchar(254) not null, + `priority` int not null, + `worker` varchar(254), + `marked` timestamp, + `timer` varchar(254) not null, + `nextrun` timestamp not null, + `created` timestamp not null +); diff --git a/modules/store/src/main/resources/db/migration/postgresql/V1.3.0__periodic_job.sql b/modules/store/src/main/resources/db/migration/postgresql/V1.3.0__periodic_job.sql new file mode 100644 index 00000000..57fc1178 --- /dev/null +++ b/modules/store/src/main/resources/db/migration/postgresql/V1.3.0__periodic_job.sql @@ -0,0 +1,13 @@ +CREATE TABLE "periodic_task" ( + "id" varchar(254) not null primary key, + "enabled" boolean not null, + "task" varchar(254) not null, + "group_" varchar(254) not null, + "args" text not null, + "subject" varchar(254) not null, + "submitter" varchar(254) not null, + "priority" int not null, + "worker" varchar(254), + "timer" varchar(254) not null, + "nextrun" timestamp not null +); diff --git a/modules/store/src/main/scala/docspell/store/impl/DoobieMeta.scala b/modules/store/src/main/scala/docspell/store/impl/DoobieMeta.scala index 62f058cd..8bc57809 100644 --- a/modules/store/src/main/scala/docspell/store/impl/DoobieMeta.scala +++ b/modules/store/src/main/scala/docspell/store/impl/DoobieMeta.scala @@ -7,6 +7,7 @@ import doobie._ import doobie.implicits.legacy.instant._ import doobie.util.log.Success import emil.{MailAddress, SSLType} +import com.github.eikek.calev.CalEvent import docspell.common._ import docspell.common.syntax.all._ @@ -98,6 +99,9 @@ trait DoobieMeta { Meta[String].imap(str => str.split(',').toList.map(_.trim).map(EmilUtil.unsafeReadMailAddress))( lma => lma.map(EmilUtil.mailAddressString).mkString(",") ) + + implicit val metaCalEvent: Meta[CalEvent] = + Meta[String].timap(CalEvent.unsafe)(_.asString) } object DoobieMeta extends DoobieMeta { diff --git a/modules/store/src/main/scala/docspell/store/queue/PeriodicTaskStore.scala b/modules/store/src/main/scala/docspell/store/queue/PeriodicTaskStore.scala new file mode 100644 index 00000000..ac9bce6b --- /dev/null +++ b/modules/store/src/main/scala/docspell/store/queue/PeriodicTaskStore.scala @@ -0,0 +1,77 @@ +package docspell.store.queue + +import cats.effect._ +import cats.implicits._ +import fs2.Stream +import docspell.common._ +import docspell.store.Store +import docspell.store.records._ + +trait PeriodicTaskStore[F[_]] { + + /** Get the free periodic task due next and reserve it to the given + * worker. + * + * If found, the task is returned and resource finalization takes + * care of unmarking the task after use and updating `nextRun` with + * the next timestamp. + */ + def takeNext(worker: Ident): Resource[F, Option[RPeriodicTask]] + + def clearMarks(name: Ident): F[Unit] + + def findNonFinalJob(pjobId: Ident): F[Option[RJob]] + + def insert(task: RPeriodicTask): F[Unit] +} + +object PeriodicTaskStore { + + def create[F[_]: Sync](store: Store[F]): Resource[F, PeriodicTaskStore[F]] = + Resource.pure[F, PeriodicTaskStore[F]](new PeriodicTaskStore[F] { + println(s"$store") + + def takeNext(worker: Ident): Resource[F, Option[RPeriodicTask]] = { + val chooseNext: F[Either[String, Option[RPeriodicTask]]] = + getNext.flatMap { + case Some(pj) => + mark(pj.id, worker).map { + case true => Right(Some(pj.copy(worker = worker.some))) + case false => Left("Cannot mark periodic task") + } + case None => + val result: Either[String, Option[RPeriodicTask]] = + Right(None) + result.pure[F] + } + val get = + Stream.eval(chooseNext).repeat.take(10).find(_.isRight).compile.lastOrError + val r = Resource.make(get)({ + case Right(Some(pj)) => unmark(pj) + case _ => ().pure[F] + }) + r.flatMap { + case Right(job) => Resource.pure(job) + case Left(err) => Resource.liftF(Sync[F].raiseError(new Exception(err))) + } + } + + def getNext: F[Option[RPeriodicTask]] = + Sync[F].raiseError(new Exception("not implemented")) + + def mark(pid: Ident, name: Ident): F[Boolean] = + Sync[F].raiseError(new Exception(s"not implemented $pid $name")) + + def unmark(job: RPeriodicTask): F[Unit] = + Sync[F].raiseError(new Exception(s"not implemented $job")) + + def clearMarks(name: Ident): F[Unit] = + Sync[F].raiseError(new Exception("not implemented")) + + def findNonFinalJob(pjobId: Ident): F[Option[RJob]] = + Sync[F].raiseError(new Exception("not implemented")) + + def insert(task: RPeriodicTask): F[Unit] = + Sync[F].raiseError(new Exception("not implemented")) + }) +} diff --git a/modules/store/src/main/scala/docspell/store/records/RPeriodicTask.scala b/modules/store/src/main/scala/docspell/store/records/RPeriodicTask.scala new file mode 100644 index 00000000..92b868d4 --- /dev/null +++ b/modules/store/src/main/scala/docspell/store/records/RPeriodicTask.scala @@ -0,0 +1,141 @@ +package docspell.store.records + +import cats.effect._ +import cats.implicits._ +import doobie._ +import doobie.implicits._ +import com.github.eikek.calev.CalEvent +import docspell.common._ +import docspell.store.impl.Column +import docspell.store.impl.Implicits._ + +/** A periodic task is a special job description, that shares a few + * properties of a `RJob`. It must provide all information to create + * a `RJob` value eventually. + */ +case class RPeriodicTask( + id: Ident, + enabled: Boolean, + task: Ident, + group: Ident, + args: String, + subject: String, + submitter: Ident, + priority: Priority, + worker: Option[Ident], + marked: Option[Timestamp], + timer: CalEvent, + nextrun: Timestamp, + created: Timestamp +) { + + def toJob[F[_]: Sync]: F[RJob] = + for { + now <- Timestamp.current[F] + jid <- Ident.randomId[F] + } yield RJob( + jid, + task, + group, + args, + subject, + now, + submitter, + priority, + JobState.Waiting, + 0, + 0, + Some(id), + None, + None, + None + ) +} + +object RPeriodicTask { + + def create[F[_]: Sync]( + enabled: Boolean, + task: Ident, + group: Ident, + args: String, + subject: String, + submitter: Ident, + priority: Priority, + worker: Option[Ident], + marked: Option[Timestamp], + timer: CalEvent + ): F[RPeriodicTask] = + Ident + .randomId[F] + .flatMap(id => + Timestamp + .current[F] + .map { now => + RPeriodicTask( + id, + enabled, + task, + group, + args, + subject, + submitter, + priority, + worker, + marked, + timer, + timer + .nextElapse(now.atZone(Timestamp.UTC)) + .map(_.toInstant) + .map(Timestamp.apply) + .getOrElse(Timestamp.Epoch), + now + ) + } + ) + + val table = fr"periodic_task" + + object Columns { + val id = Column("id") + val enabled = Column("enabled") + val task = Column("task") + val group = Column("group_") + val args = Column("args") + val subject = Column("subject") + val submitter = Column("submitter") + val priority = Column("priority") + val worker = Column("worker") + val marked = Column("marked") + val timer = Column("timer") + val nextrun = Column("nextrun") + val created = Column("created") + val all = List( + id, + enabled, + task, + group, + args, + subject, + submitter, + priority, + worker, + marked, + timer, + nextrun, + created + ) + } + + import Columns._ + + def insert(v: RPeriodicTask): ConnectionIO[Int] = { + val sql = insertRow( + table, + all, + fr"${v.id},${v.enabled},${v.task},${v.group},${v.args},${v.subject},${v.submitter},${v.priority},${v.worker},${v.marked},${v.timer},${v.nextrun},${v.created}" + ) + sql.update.run + } + +} diff --git a/project/Dependencies.scala b/project/Dependencies.scala index d4636c5d..dea1cff3 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -7,6 +7,7 @@ object Dependencies { val BcryptVersion = "0.4" val BetterMonadicForVersion = "0.3.1" val BitpeaceVersion = "0.4.3" + val CalevVersion = "0.1.0" val CirceVersion = "0.13.0" val DoobieVersion = "0.8.8" val EmilVersion = "0.2.0" @@ -37,6 +38,11 @@ object Dependencies { val ViewerJSVersion = "0.5.8" + val calev = Seq( + "com.github.eikek" %% "calev-core" % CalevVersion, + "com.github.eikek" %% "calev-fs2" % CalevVersion + ) + val jclOverSlf4j = Seq( "org.slf4j" % "jcl-over-slf4j" % Slf4jVersion )