Integrate periodic tasks

The first use case for periodic tasks is the cleanup of expired
invitation keys. This is part of a house-keeping periodic task.
Eike Kettner 2020-03-08 15:26:56 +01:00
parent 616c333fa5
commit 854a596da3
25 changed files with 388 additions and 108 deletions

View File

@@ -141,6 +141,7 @@ val common = project.in(file("modules/common")).
     Dependencies.fs2 ++
     Dependencies.circe ++
     Dependencies.loggingApi ++
+    Dependencies.calevCore ++
     Dependencies.pureconfig.map(_ % "optional")
   )

View File

@@ -16,6 +16,9 @@ case class Timestamp(value: Instant) {
   def minus(d: Duration): Timestamp =
     Timestamp(value.minusNanos(d.nanos))
+  def - (d: Duration): Timestamp =
+    minus(d)
   def minusHours(n: Long): Timestamp =
     Timestamp(value.minusSeconds(n * 60 * 60))
@@ -31,6 +34,9 @@ case class Timestamp(value: Instant) {
   def atUTC: ZonedDateTime = atZone(Timestamp.UTC)
   def asString: String = value.toString
+  def < (other: Timestamp): Boolean =
+    this.value.isBefore(other.value)
 }
 object Timestamp {

View File

@@ -4,6 +4,7 @@ import docspell.common._
 import _root_.pureconfig._
 import _root_.pureconfig.error.{CannotConvert, FailureReason}
 import scodec.bits.ByteVector
+import com.github.eikek.calev.CalEvent
 import scala.reflect.ClassTag
@@ -31,6 +32,10 @@ object Implicits {
       else ByteVector.encodeUtf8(str).left.map(ex => s"Invalid utf8 string: ${ex.getMessage}")
     })
+  implicit val caleventReader: ConfigReader[CalEvent] =
+    ConfigReader[String].emap(reason(CalEvent.parse))
   def reason[A: ClassTag](f: String => Either[String, A]): String => Either[FailureReason, A] =
     in =>
       f(in).left.map(str => CannotConvert(in, implicitly[ClassTag[A]].runtimeClass.toString, str))

View File

@@ -77,6 +77,21 @@ docspell.joex {
     wakeup-period = "10 minutes"
   }
+  # Docspell uses periodic house keeping tasks, like cleaning expired
+  # invites, that can be configured here.
+  house-keeping {
+    # When the house keeping task executes. Default is every 4 hours.
+    schedule = "*-*-* 0/4:00:00"
+    # This task removes invitation keys that have been created but not
+    # used. The timespan here must be greater than the `invite-time'
+    # setting in the rest server config file.
+    cleanup-invites = {
+      older-than = "30 days"
+    }
+  }
   # Configuration of text extraction
   extraction {
     # For PDF files it is first tried to read the text parts of the
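The `schedule` value in the house-keeping block above is a systemd-style calendar event and `older-than` a duration; both end up in the new `HouseKeepingConfig` added later in this commit. A minimal sketch of how such a value could be decoded, assuming pureconfig's `ConfigSource` API and generic derivation are available; the `ScheduleConfigExample` object and `Hk` case class are hypothetical stand-ins, and the reader mirrors the `caleventReader` added above:

```scala
import _root_.pureconfig._
import _root_.pureconfig.generic.auto._
import _root_.pureconfig.error.CannotConvert
import com.github.eikek.calev.CalEvent

object ScheduleConfigExample {
  // Same idea as the caleventReader in this commit: parse the calendar-event
  // string and report a failure as CannotConvert.
  implicit val calEventReader: ConfigReader[CalEvent] =
    ConfigReader[String].emap(s =>
      CalEvent.parse(s).left.map(err => CannotConvert(s, "CalEvent", err))
    )

  // Hypothetical stand-in for the house-keeping section of the config.
  final case class Hk(schedule: CalEvent)

  def main(args: Array[String]): Unit = {
    val loaded = ConfigSource.string("""schedule = "*-*-* 0/4:00:00" """).load[Hk]
    println(loaded) // Right(Hk(...)) when the calendar-event string parses
  }
}
```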

View File

@@ -5,6 +5,7 @@ import docspell.joex.scheduler.{PeriodicSchedulerConfig, SchedulerConfig}
 import docspell.store.JdbcConfig
 import docspell.convert.ConvertConfig
 import docspell.extract.ExtractConfig
+import docspell.joex.hk.HouseKeepingConfig
 case class Config(
     appId: Ident,
@@ -13,6 +14,7 @@ case class Config(
     jdbc: JdbcConfig,
     scheduler: SchedulerConfig,
     periodicScheduler: PeriodicSchedulerConfig,
+    houseKeeping: HouseKeepingConfig,
     extraction: ExtractConfig,
     convert: ConvertConfig
 )

View File

@@ -3,9 +3,10 @@ package docspell.joex
 import cats.implicits._
 import cats.effect._
 import docspell.common.{Ident, NodeType, ProcessItemArgs}
-import docspell.joex.background._
+import docspell.joex.hk._
 import docspell.joex.process.ItemHandler
 import docspell.joex.scheduler._
+import docspell.joexapi.client.JoexClient
 import docspell.store.Store
 import docspell.store.queue._
 import docspell.store.ops.ONode
@@ -18,6 +19,7 @@ final class JoexAppImpl[F[_]: ConcurrentEffect: ContextShift: Timer](
     cfg: Config,
     nodeOps: ONode[F],
     store: Store[F],
+    pstore: PeriodicTaskStore[F],
     termSignal: SignallingRef[F, Boolean],
     val scheduler: Scheduler[F],
     val periodicScheduler: PeriodicScheduler[F]
@@ -27,6 +29,7 @@ final class JoexAppImpl[F[_]: ConcurrentEffect: ContextShift: Timer](
     val run = scheduler.start.compile.drain
     val prun = periodicScheduler.start.compile.drain
     for {
+      _ <- HouseKeepingTask.submit(pstore, cfg.houseKeeping.schedule)
       _ <- ConcurrentEffect[F].start(run)
       _ <- ConcurrentEffect[F].start(prun)
       _ <- scheduler.periodicAwake
@@ -52,14 +55,15 @@ object JoexAppImpl {
       cfg: Config,
       termSignal: SignallingRef[F, Boolean],
       connectEC: ExecutionContext,
+      clientEC: ExecutionContext,
       blocker: Blocker
   ): Resource[F, JoexApp[F]] =
     for {
+      client <- JoexClient.resource(clientEC)
       store <- Store.create(cfg.jdbc, connectEC, blocker)
       queue <- JobQueue(store)
       pstore <- PeriodicTaskStore.create(store)
       nodeOps <- ONode(store)
-      psch <- PeriodicScheduler.create(cfg.periodicScheduler, queue, pstore, Timer[F])
       sch <- SchedulerBuilder(cfg.scheduler, blocker, store)
        .withQueue(queue)
        .withTask(
@@ -71,13 +75,14 @@ object JoexAppImpl {
        )
        .withTask(
          JobTask.json(
-            PeriodicTask.taskName,
-            PeriodicTask[F](cfg),
-            PeriodicTask.onCancel[F]
+            HouseKeepingTask.taskName,
+            HouseKeepingTask[F](cfg),
+            HouseKeepingTask.onCancel[F]
          )
        )
        .resource
-      app = new JoexAppImpl(cfg, nodeOps, store, termSignal, sch, psch)
+      psch <- PeriodicScheduler.create(cfg.periodicScheduler, sch, queue, pstore, client, Timer[F])
+      app = new JoexAppImpl(cfg, nodeOps, store, pstore, termSignal, sch, psch)
       appR <- Resource.make(app.init.map(_ => app))(_.shutdown)
     } yield appR
 }

View File

@@ -24,13 +24,14 @@ object JoexServer {
   def stream[F[_]: ConcurrentEffect: ContextShift](
       cfg: Config,
       connectEC: ExecutionContext,
+      clientEC: ExecutionContext,
      blocker: Blocker
  )(implicit T: Timer[F]): Stream[F, Nothing] = {
    val app = for {
      signal <- Resource.liftF(SignallingRef[F, Boolean](false))
      exitCode <- Resource.liftF(Ref[F].of(ExitCode.Success))
-      joexApp <- JoexAppImpl.create[F](cfg, signal, connectEC, blocker)
+      joexApp <- JoexAppImpl.create[F](cfg, signal, connectEC, clientEC, blocker)
      httpApp = Router(
        "/api/info" -> InfoRoutes(),

View File

@@ -13,10 +13,10 @@ import org.log4s._
 object Main extends IOApp {
   private[this] val logger = getLogger
-  val blockingEc: ExecutionContext = ExecutionContext.fromExecutor(
+  val blockingEC: ExecutionContext = ExecutionContext.fromExecutor(
     Executors.newCachedThreadPool(ThreadFactories.ofName("docspell-joex-blocking"))
   )
-  val blocker = Blocker.liftExecutionContext(blockingEc)
+  val blocker = Blocker.liftExecutionContext(blockingEC)
   val connectEC: ExecutionContext = ExecutionContext.fromExecutorService(
     Executors.newFixedThreadPool(5, ThreadFactories.ofName("docspell-joex-dbconnect"))
   )
@@ -52,6 +52,6 @@ object Main extends IOApp {
       cfg.baseUrl
     )
     logger.info(s"\n${banner.render("***>")}")
-    JoexServer.stream[IO](cfg, connectEC, blocker).compile.drain.as(ExitCode.Success)
+    JoexServer.stream[IO](cfg, connectEC, blockingEC, blocker).compile.drain.as(ExitCode.Success)
   }
 }

View File

@@ -0,0 +1,24 @@
package docspell.joex.hk

import cats.implicits._
import cats.effect._

import docspell.common._
import docspell.joex.Config
import docspell.joex.scheduler.Task
import docspell.store.records._

object CleanupInvitesTask {

  def apply[F[_]: Sync: ContextShift](cfg: Config): Task[F, Unit, Unit] =
    Task { ctx =>
      val threshold = cfg.houseKeeping.cleanupInvites.olderThan
      for {
        now <- Timestamp.current[F]
        ts = now - threshold
        _ <- ctx.logger.info(s"Cleanup invitations older than $ts")
        n <- ctx.store.transact(RInvitation.deleteOlderThan(ts))
        _ <- ctx.logger.info(s"Removed $n invitations")
      } yield ()
    }
}

View File

@@ -0,0 +1,17 @@
package docspell.joex.hk

import com.github.eikek.calev.CalEvent

import docspell.common._

import HouseKeepingConfig._

case class HouseKeepingConfig(
    schedule: CalEvent,
    cleanupInvites: CleanupInvites
)

object HouseKeepingConfig {

  case class CleanupInvites(olderThan: Duration)
}

View File

@@ -0,0 +1,50 @@
package docspell.joex.hk

import cats.implicits._
import cats.effect._
import com.github.eikek.calev._

import docspell.common._
import docspell.joex.Config
import docspell.joex.scheduler.Task
import docspell.store.queue._
import docspell.store.records._

object HouseKeepingTask {
  private val periodicId = Ident.unsafe("docspell-houskeeping")

  val systemGroup: Ident = Ident.unsafe("docspell-system")

  val taskName: Ident = Ident.unsafe("housekeeping")

  def apply[F[_]: Sync: ContextShift](cfg: Config): Task[F, Unit, Unit] =
    log[F](_.info(s"Running house-keeping task now"))
      .flatMap(_ => CleanupInvitesTask(cfg))

  def onCancel[F[_]: Sync: ContextShift]: Task[F, Unit, Unit] =
    Task(_.logger.warn("Cancelling background task"))

  def submit[F[_]: Sync](
      pstore: PeriodicTaskStore[F],
      ce: CalEvent
  ): F[Unit] = {
    val makeJob =
      RPeriodicTask.createJson(
        true,
        taskName,
        systemGroup,
        (),
        "Docspell house-keeping",
        systemGroup,
        Priority.Low,
        ce
      )

    for {
      job <- makeJob
      _ <- pstore.insert(job.copy(id = periodicId)).attempt
    } yield ()
  }

  private def log[F[_]](f: Logger[F] => F[Unit]): Task[F, Unit, Unit] =
    Task(ctx => f(ctx.logger))
}

View File

@@ -4,6 +4,7 @@ import fs2._
 import fs2.concurrent.SignallingRef
 import cats.effect._
+import docspell.joexapi.client.JoexClient
 import docspell.store.queue._
 /** A periodic scheduler takes care to submit periodic tasks to the
@@ -32,14 +33,16 @@ object PeriodicScheduler {
   def create[F[_]: ConcurrentEffect: ContextShift](
       cfg: PeriodicSchedulerConfig,
+      sch: Scheduler[F],
       queue: JobQueue[F],
       store: PeriodicTaskStore[F],
+      client: JoexClient[F],
       timer: Timer[F]
   ): Resource[F, PeriodicScheduler[F]] =
     for {
       waiter <- Resource.liftF(SignallingRef(true))
       state <- Resource.liftF(SignallingRef(PeriodicSchedulerImpl.emptyState[F]))
-      psch = new PeriodicSchedulerImpl[F](cfg, queue, store, waiter, state, timer)
+      psch = new PeriodicSchedulerImpl[F](cfg, sch, queue, store, client, waiter, state, timer)
       _ <- Resource.liftF(psch.init)
     } yield psch

View File

@@ -9,45 +9,18 @@ import com.github.eikek.fs2calev._
 import docspell.common._
 import docspell.common.syntax.all._
+import docspell.joexapi.client.JoexClient
 import docspell.store.queue._
 import docspell.store.records.RPeriodicTask
 import PeriodicSchedulerImpl.State
-/*
-  onStartUp:
-  - remove worker value from all of the current
-  Loop:
-  - get earliest pjob
-  - if none: stop
-  - if triggered:
-    - mark worker, restart loop on fail
-    - submit new job
-      - check for non-final jobs of that name
-        - if exist: log info
-        - if not exist: submit
-      - update next trigger (in both cases)
-    - remove worker
-    - restart loop
-  - if future
-    - schedule notify
-    - stop loop
-  onNotify:
-  - cancel current scheduled notify
-  - start Loop
-  onShutdown:
-  - nothing to do
- */
 final class PeriodicSchedulerImpl[F[_]: ConcurrentEffect: ContextShift](
     val config: PeriodicSchedulerConfig,
+    sch: Scheduler[F],
     queue: JobQueue[F],
     store: PeriodicTaskStore[F],
+    client: JoexClient[F],
     waiter: SignallingRef[F, Boolean],
     state: SignallingRef[F, State[F]],
     timer: Timer[F]
@@ -91,15 +64,18 @@ final class PeriodicSchedulerImpl[F[_]: ConcurrentEffect: ContextShift](
       _ <- logger.fdebug(s"Looking for next periodic task")
       go <- logThrow("Error getting next task")(
              store
-              .takeNext(config.name)
+              .takeNext(config.name, None)
               .use({
-                case Some(pj) =>
+                case Marked.Found(pj) =>
                   logger
                     .fdebug(s"Found periodic task '${pj.subject}/${pj.timer.asString}'") *>
                     (if (isTriggered(pj, now)) submitJob(pj)
-                     else scheduleNotify(pj)).map(_ => true)
-                case None =>
+                     else scheduleNotify(pj).map(_ => false))
+                case Marked.NotFound =>
                   logger.fdebug("No periodic task found") *> false.pure[F]
+                case Marked.NotMarkable =>
+                  logger.fdebug("Periodic job cannot be marked. Trying again.") *> true
+                    .pure[F]
               })
            )
     } yield go
@@ -123,32 +99,44 @@ final class PeriodicSchedulerImpl[F[_]: ConcurrentEffect: ContextShift](
   }
   def isTriggered(pj: RPeriodicTask, now: Timestamp): Boolean =
-    pj.timer.contains(now.value)
+    pj.nextrun < now
-  def submitJob(pj: RPeriodicTask): F[Unit] =
+  def submitJob(pj: RPeriodicTask): F[Boolean] =
     store
       .findNonFinalJob(pj.id)
       .flatMap({
         case Some(job) =>
           logger.finfo[F](
             s"There is already a job with non-final state '${job.state}' in the queue"
-          )
+          ) *> scheduleNotify(pj) *> false.pure[F]
         case None =>
           logger.finfo[F](s"Submitting job for periodic task '${pj.task.id}'") *>
-            pj.toJob.flatMap(queue.insert)
+            pj.toJob.flatMap(queue.insert) *> notifyJoex *> true.pure[F]
       })
+  def notifyJoex: F[Unit] =
+    sch.notifyChange *> store.findJoexNodes.flatMap(
+      _.traverse(n => client.notifyJoexIgnoreErrors(n.url)).map(_ => ())
+    )
   def scheduleNotify(pj: RPeriodicTask): F[Unit] =
-    ConcurrentEffect[F]
-      .start(
-        CalevFs2
-          .sleep[F](pj.timer)
-          .evalMap(_ => notifyChange)
-          .compile
-          .drain
-      )
-      .flatMap(fb => state.modify(_.setNotify(fb)))
+    Timestamp
+      .current[F]
+      .flatMap(now =>
+        logger.fdebug(
+          s"Scheduling next notify for timer ${pj.timer.asString} -> ${pj.timer.nextElapse(now.toUtcDateTime)}"
+        )
+      ) *>
+      ConcurrentEffect[F]
+        .start(
+          CalevFs2
+            .sleep[F](pj.timer)
+            .evalMap(_ => notifyChange)
+            .compile
+            .drain
+        )
+        .flatMap(fb => state.modify(_.setNotify(fb)))
   def cancelNotify: F[Unit] =
     state

View File

@@ -3,6 +3,7 @@ package docspell.joexapi.client
 import cats.implicits._
 import cats.effect._
 import docspell.common.{Ident, LenientUri}
+import docspell.common.syntax.all._
 import org.http4s.{Method, Request, Uri}
 import org.http4s.client.Client
 import org.http4s.client.blaze.BlazeClientBuilder
@@ -28,21 +29,24 @@ object JoexClient {
     new JoexClient[F] {
       def notifyJoex(base: LenientUri): F[Unit] = {
         val notifyUrl = base / "api" / "v1" / "notify"
         val req = Request[F](Method.POST, uri(notifyUrl))
-        client.expect[String](req).map(_ => ())
+        logger.fdebug(s"Notify joex at ${notifyUrl.asString}") *>
+          client.expect[String](req).map(_ => ())
       }
       def notifyJoexIgnoreErrors(base: LenientUri): F[Unit] =
         notifyJoex(base).attempt.map {
           case Right(()) => ()
           case Left(ex) =>
-            logger.warn(s"Notifying Joex instance '${base.asString}' failed: ${ex.getMessage}")
+            logger.warn(
+              s"Notifying Joex instance '${base.asString}' failed: ${ex.getMessage}"
+            )
             ()
         }
       def cancelJob(base: LenientUri, job: Ident): F[Unit] = {
         val cancelUrl = base / "api" / "v1" / "job" / job.id / "cancel"
         val req = Request[F](Method.POST, uri(cancelUrl))
         client.expect[String](req).map(_ => ())
       }

View File

@@ -16,3 +16,4 @@ title: ADRs
 - [0009 Convert Office Files](adr/0009_convert_office_docs)
 - [0010 Convert Image Files](adr/0010_convert_image_files)
 - [0011 Extract Text](adr/0011_extract_text)
+- [0012 Periodic Tasks](adr/0012_periodic_tasks)

View File

@@ -0,0 +1,102 @@
---
layout: docs
title: Periodic Tasks
---

# Periodic Tasks
## Context and Problem Statement

Currently there is a `Scheduler` that consumes tasks off a queue in
the database. This allows multiple job executors running in parallel,
racing for the next job to execute. This is meant for executing tasks
immediately, as long as there are enough resources.

What is missing is a component that maintains periodic tasks. The
reason for this is to have house-keeping tasks that run regularly and
clean up stale or unused data. Later, users should be able to create
periodic tasks, for example to read e-mails from an inbox.

The problem is, again, that it must work with multiple job executor
instances running at the same time. This is the same pattern as with
the `Scheduler`: it must be ensured that a periodic task is picked up
by only one instance at a time. Multiple job executors must not
schedule a periodic task more than once. If a periodic task takes
longer than the time between runs, it must wait for the next interval.
## Considered Options

1. Adding a `timer` and `nextrun` field to the current `job` table
2. Creating a separate table for periodic tasks

## Decision Outcome

Option 2: a separate table for periodic tasks.
For internal house-keeping tasks, it may suffice to reuse the existing
`job` queue by adding more fields such that a job may be considered
periodic. But this conflates what the `Scheduler` is doing now
(executing tasks as soon as possible while being bound to some
resources) with a completely different concern.

There will be a new `PeriodicScheduler` that works on a new table in
the database that represents periodic tasks. This table shares fields
with the `job` table so that `RJob` instances can be created from it.
This new component only takes care of periodically submitting jobs to
the job queue, such that the `Scheduler` will eventually pick them up
and run them.
```sql
CREATE TABLE "periodic_task" (
"id" varchar(254) not null primary key,
"enabled" boolean not null,
"task" varchar(254) not null,
"group_" varchar(254) not null,
"args" text not null,
"subject" varchar(254) not null,
"submitter" varchar(254) not null,
"priority" int not null,
"worker" varchar(254),
"marked" timestamp,
"timer" varchar(254) not null,
"nextrun" timestamp not null,
"created" timestamp not null
);
```
In preparation for other features, periodic tasks will be created by
users. It should be possible to disable and enable them. The next six
properties are needed to insert jobs into the `job` table. The
`worker` and `marked` fields are used to mark a periodic job as "being
worked on by a job executor".

The `timer` is the schedule, which is a
[systemd-like](https://man.cx/systemd.time#heading7) calendar event
string. It is parsed by [this library](https://github.com/eikek/calev).
The `nextrun` field stores the timestamp of the next time the task
would need to be executed. This is needed to query this table for the
task that is due next.
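
For illustration, here is a small sketch of evaluating such a timer
with the calev library. Only `CalEvent.parse` and `nextElapse` are
used, the same calls this commit itself relies on; the object name is
a hypothetical stand-in.

```scala
import java.time.ZonedDateTime
import com.github.eikek.calev.CalEvent

object TimerExample {
  def main(args: Array[String]): Unit =
    // The default house-keeping schedule from this commit: every 4 hours.
    CalEvent.parse("*-*-* 0/4:00:00") match {
      case Right(ce) =>
        // Next point in time after "now" at which the event elapses; this is
        // the kind of value stored in the `nextrun` column.
        println(ce.nextElapse(ZonedDateTime.now()))
      case Left(err) =>
        println(s"Invalid calendar event: $err")
    }
}
```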
The `PeriodicScheduler` works roughly like this (a simplified sketch
follows the list):

On startup:

- Remove stale worker values. If the process has been killed, there
  may be marked tasks which must be cleared now.

Main loop:

0. Cancel the currently scheduled notify (see 4. below)
1. Get the next (= earliest & enabled) periodic job
2. If none: stop
3. If triggered (= `nextrun <= 'now'`):
   - Mark the periodic task. On fail: goto 1.
   - Submit a new job into the job queue:
     - Update the `nextrun` field
     - Check for non-final jobs of that name. This is required to not
       run the same periodic task multiple times concurrently.
       - If one exists: goto 4.
       - If none exists: submit the job
   - Unmark the periodic task
4. If the next run is in the future:
   - Schedule a notify: notify self to run again the next time the
     task schedule triggers
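
Below is a deliberately simplified, in-memory model of this loop. All
names are hypothetical, the real implementation is
`PeriodicSchedulerImpl` in this commit, and the `worker`/`marked`
guard against other instances is reduced to a comment.

```scala
import java.time.Instant

final case class PTask(id: String, nextrun: Instant, enabled: Boolean = true)

final class LoopModel(var tasks: List[PTask]) {
  var queue: List[String] = Nil        // ids of submitted jobs
  var notifyAt: Option[Instant] = None // scheduled wake-up, if any

  def mainLoop(now: Instant): Unit = {
    notifyAt = None // 0. cancel the currently scheduled notify
    tasks.filter(_.enabled).sortBy(_.nextrun.toEpochMilli).headOption match { // 1. earliest & enabled
      case None => () // 2. no enabled task: stop
      case Some(t) if !t.nextrun.isAfter(now) => // 3. triggered
        // marking/unmarking is elided; in the real store this is the
        // worker/marked columns guarding against other executors
        if (!queue.contains(t.id)) queue = t.id :: queue // submit only if no non-final job exists
        // update nextrun (fixed 4h here; really computed from the timer)
        tasks = tasks.map(x => if (x.id == t.id) x.copy(nextrun = now.plusSeconds(4L * 3600L)) else x)
        mainLoop(now) // restart the loop
      case Some(t) => // 4. the next run is in the future
        notifyAt = Some(t.nextrun) // notify self when the timer triggers
    }
  }
}

// Example: one task due now, one due in ten minutes.
// val m = new LoopModel(List(PTask("hk", Instant.now()), PTask("mail", Instant.now().plusSeconds(600))))
// m.mainLoop(Instant.now()) // "hk" is submitted once, notifyAt is set to the "mail" trigger
```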

View File

@@ -1,3 +1,8 @@
+---
+layout: docs
+title: Short Title
+---
+
 # [short title of solved problem and solution]
 
 * Status: [proposed | rejected | accepted | deprecated | … | superseded by [ADR-0005](0005-example.md)] <!-- optional -->

View File

@@ -30,6 +30,9 @@ case class Column(name: String, ns: String = "", alias: String = "") {
   def is(c: Column): Fragment =
     f ++ fr"=" ++ c.f
+  def isNot[A: Put](value: A): Fragment =
+    f ++ fr"<> $value"
   def isNull: Fragment =
     f ++ fr"is null"

View File

@@ -1,26 +1,27 @@
 package docspell.store.queries
-//import cats.implicits._
 import docspell.common._
-//import docspell.common.syntax.all._
 import docspell.store.impl.Implicits._
 import docspell.store.records._
 import doobie._
 import doobie.implicits._
-//import org.log4s._
 object QPeriodicTask {
-  // private[this] val logger = getLogger
   def clearWorkers(name: Ident): ConnectionIO[Int] = {
     val worker = RPeriodicTask.Columns.worker
     updateRow(RPeriodicTask.table, worker.is(name), worker.setTo[Ident](None)).update.run
   }
-  def setWorker(pid: Ident, name: Ident): ConnectionIO[Int] = {
+  def setWorker(pid: Ident, name: Ident, ts: Timestamp): ConnectionIO[Int] = {
     val id = RPeriodicTask.Columns.id
     val worker = RPeriodicTask.Columns.worker
-    updateRow(RPeriodicTask.table, and(id.is(pid), worker.isNull), worker.setTo(name)).update.run
+    val marked = RPeriodicTask.Columns.marked
+    updateRow(
+      RPeriodicTask.table,
+      and(id.is(pid), worker.isNull),
+      commas(worker.setTo(name), marked.setTo(ts))
+    ).update.run
   }
   def unsetWorker(
@@ -37,10 +38,17 @@ object QPeriodicTask {
     ).update.run
   }
-  def findNext: ConnectionIO[Option[RPeriodicTask]] = {
-    val order = orderBy(RPeriodicTask.Columns.nextrun.f) ++ fr"ASC"
+  def findNext(excl: Option[Ident]): ConnectionIO[Option[RPeriodicTask]] = {
+    val enabled = RPeriodicTask.Columns.enabled
+    val pid = RPeriodicTask.Columns.id
+    val order = orderBy(RPeriodicTask.Columns.nextrun.f) ++ fr"ASC"
+    val where = excl match {
+      case Some(id) => and(pid.isNot(id), enabled.is(true))
+      case None => enabled.is(true)
+    }
     val sql =
-      selectSimple(RPeriodicTask.Columns.all, RPeriodicTask.table, Fragment.empty) ++ order
+      selectSimple(RPeriodicTask.Columns.all, RPeriodicTask.table, where) ++ order
     sql.query[RPeriodicTask].streamWithChunkSize(2).take(1).compile.last
   }

View File

@@ -29,7 +29,7 @@ object JobQueue {
       worker: Ident,
       retryPause: Duration
   ): F[Option[RJob]] =
-    logger.fdebug("Select next job") *> QJob.takeNextJob(store)(prio, worker, retryPause)
+    logger.ftrace("Select next job") *> QJob.takeNextJob(store)(prio, worker, retryPause)
   def insert(job: RJob): F[Unit] =
     store

View File

@@ -0,0 +1,17 @@
package docspell.store.queue

sealed trait Marked[+A] {}

object Marked {

  final case class Found[A](value: A) extends Marked[A]

  final case object NotFound extends Marked[Nothing]

  final case object NotMarkable extends Marked[Nothing]

  def found[A](v: A): Marked[A] = Found(v)

  def notFound[A]: Marked[A] = NotFound

  def notMarkable[A]: Marked[A] = NotMarkable
}

View File

@@ -2,7 +2,6 @@ package docspell.store.queue
 import cats.effect._
 import cats.implicits._
-import fs2.Stream
 import org.log4s.getLogger
 import com.github.eikek.fs2calev._
 import docspell.common._
@@ -20,7 +19,10 @@ trait PeriodicTaskStore[F[_]] {
     * care of unmarking the task after use and updating `nextRun` with
     * the next timestamp.
     */
-  def takeNext(worker: Ident): Resource[F, Option[RPeriodicTask]]
+  def takeNext(
+      worker: Ident,
+      excludeId: Option[Ident]
+  ): Resource[F, Marked[RPeriodicTask]]
   def clearMarks(name: Ident): F[Unit]
@@ -33,6 +35,8 @@ trait PeriodicTaskStore[F[_]] {
   /** Adds the task only if it not already exists.
     */
   def add(task: RPeriodicTask): F[AddResult]
+  def findJoexNodes: F[Vector[RNode]]
 }
 object PeriodicTaskStore {
@@ -40,38 +44,37 @@ object PeriodicTaskStore {
   def create[F[_]: Sync](store: Store[F]): Resource[F, PeriodicTaskStore[F]] =
     Resource.pure[F, PeriodicTaskStore[F]](new PeriodicTaskStore[F] {
-      println(s"$store")
-      def takeNext(worker: Ident): Resource[F, Option[RPeriodicTask]] = {
-        val chooseNext: F[Either[String, Option[RPeriodicTask]]] =
-          getNext.flatMap {
+      def takeNext(
+          worker: Ident,
+          excludeId: Option[Ident]
+      ): Resource[F, Marked[RPeriodicTask]] = {
+        val chooseNext: F[Marked[RPeriodicTask]] =
+          getNext(excludeId).flatMap {
             case Some(pj) =>
               mark(pj.id, worker).map {
-                case true => Right(Some(pj.copy(worker = worker.some)))
-                case false => Left("Cannot mark periodic task")
+                case true => Marked.found(pj.copy(worker = worker.some))
+                case false => Marked.notMarkable
              }
            case None =>
-              val result: Either[String, Option[RPeriodicTask]] =
-                Right(None)
-              result.pure[F]
+              Marked.notFound.pure[F]
          }
-        val get =
-          Stream.eval(chooseNext).repeat.take(10).find(_.isRight).compile.lastOrError
-        val r = Resource.make(get)({
-          case Right(Some(pj)) => unmark(pj)
-          case _ => ().pure[F]
+        Resource.make(chooseNext)({
+          case Marked.Found(pj) => unmark(pj)
+          case _ => ().pure[F]
        })
-        r.flatMap {
-          case Right(job) => Resource.pure(job)
-          case Left(err) => Resource.liftF(Sync[F].raiseError(new Exception(err)))
-        }
      }
-      def getNext: F[Option[RPeriodicTask]] =
-        store.transact(QPeriodicTask.findNext)
+      def getNext(excl: Option[Ident]): F[Option[RPeriodicTask]] =
+        store.transact(QPeriodicTask.findNext(excl))
       def mark(pid: Ident, name: Ident): F[Boolean] =
-        store.transact(QPeriodicTask.setWorker(pid, name)).map(_ > 0)
+        Timestamp
+          .current[F]
+          .flatMap(now =>
+            store.transact(QPeriodicTask.setWorker(pid, name, now)).map(_ > 0)
+          )
       def unmark(job: RPeriodicTask): F[Unit] =
         for {
@@ -98,16 +101,15 @@ object PeriodicTaskStore {
       def insert(task: RPeriodicTask): F[Unit] = {
         val update = store.transact(RPeriodicTask.update(task))
-        val insertAttempt = store.transact(RPeriodicTask.insert(task))
-          .attempt.map {
-            case Right(n) => n > 0
-            case Left(_) => false
-          }
+        val insertAttempt = store.transact(RPeriodicTask.insert(task)).attempt.map {
+          case Right(n) => n > 0
+          case Left(_) => false
+        }
         for {
           n1 <- update
           ins <- if (n1 == 0) insertAttempt else true.pure[F]
           _ <- if (ins) 1.pure[F] else update
         } yield ()
       }
@@ -116,5 +118,9 @@ object PeriodicTaskStore {
         val exists = RPeriodicTask.exists(task.id)
         store.add(insert, exists)
       }
+      def findJoexNodes: F[Vector[RNode]] =
+        store.transact(RNode.findAll(NodeType.Joex))
     })
 }

View File

@@ -46,4 +46,7 @@ object RInvitation {
       _ <- delete(invite)
     } yield inv > 0
   }
+  def deleteOlderThan(ts: Timestamp): ConnectionIO[Int] =
+    deleteFrom(table, created.isLt(ts)).update.run
 }

View File

@@ -8,6 +8,7 @@ import com.github.eikek.calev.CalEvent
 import docspell.common._
 import docspell.store.impl.Column
 import docspell.store.impl.Implicits._
+import io.circe.Encoder
 /** A periodic task is a special job description, that shares a few
   * properties of a `RJob`. It must provide all information to create
@@ -62,8 +63,6 @@ object RPeriodicTask {
       subject: String,
       submitter: Ident,
       priority: Priority,
-      worker: Option[Ident],
-      marked: Option[Timestamp],
       timer: CalEvent
   ): F[RPeriodicTask] =
     Ident
@@ -81,8 +80,8 @@ object RPeriodicTask {
           subject,
           submitter,
           priority,
-          worker,
-          marked,
+          None,
+          None,
          timer,
          timer
            .nextElapse(now.atZone(Timestamp.UTC))
@@ -94,6 +93,18 @@ object RPeriodicTask {
        }
      )
+  def createJson[F[_]: Sync, A](
+      enabled: Boolean,
+      task: Ident,
+      group: Ident,
+      args: A,
+      subject: String,
+      submitter: Ident,
+      priority: Priority,
+      timer: CalEvent
+  )(implicit E: Encoder[A]): F[RPeriodicTask] =
+    create[F](enabled, task, group, E(args).noSpaces, subject, submitter, priority, timer)
   val table = fr"periodic_task"
   object Columns {

View File

@@ -38,10 +38,13 @@ object Dependencies {
   val ViewerJSVersion = "0.5.8"
-  val calev = Seq(
+  val calevCore = Seq(
     "com.github.eikek" %% "calev-core" % CalevVersion,
+  )
+  val calevFs2 = Seq(
     "com.github.eikek" %% "calev-fs2" % CalevVersion
   )
+  val calev = calevFs2 ++ calevCore
   val jclOverSlf4j = Seq(
     "org.slf4j" % "jcl-over-slf4j" % Slf4jVersion