diff --git a/.scalafmt.conf b/.scalafmt.conf index ab23da00..ccaa37ac 100644 --- a/.scalafmt.conf +++ b/.scalafmt.conf @@ -3,7 +3,7 @@ version = "2.4.2" align = more #align.arrowEnumeratorGenerator = true -maxColumn = 100 +maxColumn = 90 rewrite.rules = [ AvoidInfix diff --git a/build.sbt b/build.sbt index 5ebd8526..68b11ccc 100644 --- a/build.sbt +++ b/build.sbt @@ -141,6 +141,7 @@ val common = project.in(file("modules/common")). Dependencies.fs2 ++ Dependencies.circe ++ Dependencies.loggingApi ++ + Dependencies.calevCore ++ Dependencies.pureconfig.map(_ % "optional") ) @@ -194,7 +195,8 @@ val store = project.in(file("modules/store")). Dependencies.databases ++ Dependencies.flyway ++ Dependencies.loggingApi ++ - Dependencies.emil + Dependencies.emil ++ + Dependencies.calev ).dependsOn(common) val extract = project.in(file("modules/extract")). @@ -260,7 +262,8 @@ val joexapi = project.in(file("modules/joexapi")). settings( name := "docspell-joexapi", libraryDependencies ++= - Dependencies.circe, + Dependencies.circe ++ + Dependencies.http4sClient, openapiTargetLanguage := Language.Scala, openapiPackage := Pkg("docspell.joexapi.model"), openapiSpec := (Compile/resourceDirectory).value/"joex-openapi.yml" @@ -302,7 +305,7 @@ val backend = project.in(file("modules/backend")). Dependencies.bcrypt ++ Dependencies.http4sClient ++ Dependencies.emil - ).dependsOn(store) + ).dependsOn(store, joexapi) val webapp = project.in(file("modules/webapp")). disablePlugins(RevolverPlugin). diff --git a/modules/backend/src/main/scala/docspell/backend/BackendApp.scala b/modules/backend/src/main/scala/docspell/backend/BackendApp.scala index 18e7ef37..39b1e0b7 100644 --- a/modules/backend/src/main/scala/docspell/backend/BackendApp.scala +++ b/modules/backend/src/main/scala/docspell/backend/BackendApp.scala @@ -25,6 +25,7 @@ trait BackendApp[F[_]] { def job: OJob[F] def item: OItem[F] def mail: OMail[F] + def joex: OJoex[F] } object BackendApp { @@ -44,9 +45,10 @@ object BackendApp { tagImpl <- OTag[F](store) equipImpl <- OEquipment[F](store) orgImpl <- OOrganization(store) - uploadImpl <- OUpload(store, queue, cfg, httpClientEc) + joexImpl <- OJoex.create(httpClientEc, store) + uploadImpl <- OUpload(store, queue, cfg, joexImpl) nodeImpl <- ONode(store) - jobImpl <- OJob(store, httpClientEc) + jobImpl <- OJob(store, joexImpl) itemImpl <- OItem(store) mailImpl <- OMail(store, JavaMailEmil(blocker)) } yield new BackendApp[F] { @@ -62,6 +64,7 @@ object BackendApp { val job = jobImpl val item = itemImpl val mail = mailImpl + val joex = joexImpl } def apply[F[_]: ConcurrentEffect: ContextShift]( diff --git a/modules/backend/src/main/scala/docspell/backend/ops/OJob.scala b/modules/backend/src/main/scala/docspell/backend/ops/OJob.scala index 98c675c2..21cdfb1e 100644 --- a/modules/backend/src/main/scala/docspell/backend/ops/OJob.scala +++ b/modules/backend/src/main/scala/docspell/backend/ops/OJob.scala @@ -1,15 +1,13 @@ package docspell.backend.ops import cats.implicits._ -import cats.effect.{ConcurrentEffect, Resource} +import cats.effect._ import docspell.backend.ops.OJob.{CollectiveQueueState, JobCancelResult} import docspell.common.{Ident, JobState} import docspell.store.Store import docspell.store.queries.QJob import docspell.store.records.{RJob, RJobLog} -import scala.concurrent.ExecutionContext - trait OJob[F[_]] { def queueState(collective: Ident, maxResults: Int): F[CollectiveQueueState] @@ -36,9 +34,9 @@ object OJob { jobs.filter(_.job.state == JobState.Running) } - def apply[F[_]: ConcurrentEffect]( + def apply[F[_]: Sync]( store: Store[F], - clientEC: ExecutionContext + joex: OJoex[F] ): Resource[F, OJob[F]] = Resource.pure[F, OJob[F]](new OJob[F] { @@ -70,8 +68,7 @@ object OJob { } def tryCancel(job: RJob, worker: Ident): F[JobCancelResult] = - OJoex - .cancelJob(job.id, worker, store, clientEC) + joex.cancelJob(job.id, worker) .map(flag => if (flag) JobCancelResult.CancelRequested else JobCancelResult.JobNotFound) for { diff --git a/modules/backend/src/main/scala/docspell/backend/ops/OJoex.scala b/modules/backend/src/main/scala/docspell/backend/ops/OJoex.scala index 151b8485..e9283dc4 100644 --- a/modules/backend/src/main/scala/docspell/backend/ops/OJoex.scala +++ b/modules/backend/src/main/scala/docspell/backend/ops/OJoex.scala @@ -1,62 +1,40 @@ package docspell.backend.ops import cats.implicits._ -import cats.effect.ConcurrentEffect +import cats.effect._ import docspell.common.{Ident, NodeType} +import docspell.joexapi.client.JoexClient import docspell.store.Store import docspell.store.records.RNode -import org.http4s.client.blaze.BlazeClientBuilder -import org.http4s.Method._ -import org.http4s.{Request, Uri} import scala.concurrent.ExecutionContext -import org.log4s._ + +trait OJoex[F[_]] { + + def notifyAllNodes: F[Unit] + + def cancelJob(job: Ident, worker: Ident): F[Boolean] + +} object OJoex { - private[this] val logger = getLogger - def notifyAll[F[_]: ConcurrentEffect]( - store: Store[F], - clientExecutionContext: ExecutionContext - ): F[Unit] = - for { - nodes <- store.transact(RNode.findAll(NodeType.Joex)) - _ <- nodes.toList.traverse(notifyJoex[F](clientExecutionContext)) - } yield () + def apply[F[_]: Sync](client: JoexClient[F], store: Store[F]): Resource[F, OJoex[F]] = + Resource.pure[F, OJoex[F]](new OJoex[F] { + def notifyAllNodes: F[Unit] = + for { + nodes <- store.transact(RNode.findAll(NodeType.Joex)) + _ <- nodes.toList.traverse(n => client.notifyJoexIgnoreErrors(n.url)) + } yield () - def cancelJob[F[_]: ConcurrentEffect]( - jobId: Ident, - worker: Ident, - store: Store[F], - clientEc: ExecutionContext - ): F[Boolean] = - for { - node <- store.transact(RNode.findById(worker)) - cancel <- node.traverse(joexCancel(clientEc)(_, jobId)) - } yield cancel.getOrElse(false) + def cancelJob(job: Ident, worker: Ident): F[Boolean] = + for { + node <- store.transact(RNode.findById(worker)) + cancel <- node.traverse(n => client.cancelJob(n.url, job)) + } yield cancel.isDefined + }) - private def joexCancel[F[_]: ConcurrentEffect]( - ec: ExecutionContext - )(node: RNode, job: Ident): F[Boolean] = { - val notifyUrl = node.url / "api" / "v1" / "job" / job.id / "cancel" - BlazeClientBuilder[F](ec).resource.use { client => - val req = Request[F](POST, Uri.unsafeFromString(notifyUrl.asString)) - client.expect[String](req).map(_ => true) - } - } + def create[F[_]: ConcurrentEffect](ec: ExecutionContext, store: Store[F]): Resource[F, OJoex[F]] = + JoexClient.resource(ec).flatMap(client => apply(client, store)) - private def notifyJoex[F[_]: ConcurrentEffect](ec: ExecutionContext)(node: RNode): F[Unit] = { - val notifyUrl = node.url / "api" / "v1" / "notify" - val execute = BlazeClientBuilder[F](ec).resource.use { client => - val req = Request[F](POST, Uri.unsafeFromString(notifyUrl.asString)) - client.expect[String](req).map(_ => ()) - } - execute.attempt.map { - case Right(_) => - () - case Left(_) => - logger.warn(s"Notifying Joex instance '${node.id.id}/${node.url.asString}' failed.") - () - } - } } diff --git a/modules/backend/src/main/scala/docspell/backend/ops/OUpload.scala b/modules/backend/src/main/scala/docspell/backend/ops/OUpload.scala index 37cf4a2d..1f20ff30 100644 --- a/modules/backend/src/main/scala/docspell/backend/ops/OUpload.scala +++ b/modules/backend/src/main/scala/docspell/backend/ops/OUpload.scala @@ -2,7 +2,7 @@ package docspell.backend.ops import bitpeace.MimetypeHint import cats.implicits._ -import cats.effect.{ConcurrentEffect, Effect, Resource} +import cats.effect._ import docspell.backend.Config import fs2.Stream import docspell.common._ @@ -12,8 +12,6 @@ import docspell.store.queue.JobQueue import docspell.store.records.{RCollective, RJob, RSource} import org.log4s._ -import scala.concurrent.ExecutionContext - trait OUpload[F[_]] { def submit(data: OUpload.UploadData[F], account: AccountId): F[OUpload.UploadResult] @@ -51,11 +49,11 @@ object OUpload { case object NoSource extends UploadResult } - def apply[F[_]: ConcurrentEffect]( + def apply[F[_]: Sync]( store: Store[F], queue: JobQueue[F], cfg: Config, - httpClientEC: ExecutionContext + joex: OJoex[F] ): Resource[F, OUpload[F]] = Resource.pure[F, OUpload[F]](new OUpload[F] { @@ -92,7 +90,7 @@ object OUpload { for { _ <- logger.fdebug(s"Storing jobs: $jobs") _ <- queue.insertAll(jobs) - _ <- OJoex.notifyAll(store, httpClientEC) + _ <- joex.notifyAllNodes } yield UploadResult.Success private def saveFile(file: File[F]): F[Option[ProcessItemArgs.File]] = @@ -109,7 +107,7 @@ object OUpload { }, id => Some(ProcessItemArgs.File(file.name, id)))) private def checkFileList(files: Seq[ProcessItemArgs.File]): F[Either[UploadResult, Unit]] = - Effect[F].pure(if (files.isEmpty) Left(UploadResult.NoFiles) else Right(())) + Sync[F].pure(if (files.isEmpty) Left(UploadResult.NoFiles) else Right(())) private def makeJobs( args: Vector[ProcessItemArgs], diff --git a/modules/common/src/main/scala/docspell/common/Hash.scala b/modules/common/src/main/scala/docspell/common/Hash.scala new file mode 100644 index 00000000..2737c3e7 --- /dev/null +++ b/modules/common/src/main/scala/docspell/common/Hash.scala @@ -0,0 +1,28 @@ +package docspell.common + +import scodec.bits.ByteVector +import java.nio.charset.StandardCharsets + +final class Hash(bytes: ByteVector) { + + private def digest(name: String): String = + bytes.digest(name).toHex.toLowerCase + + def sha256: String = + digest("SHA-256") + + def md5: String = + digest("MD5") + + def add(str: String): Hash = + new Hash(bytes ++ ByteVector.view(str.getBytes(StandardCharsets.UTF_8))) + + def add(id: Ident): Hash = + add(id.id) +} + +object Hash { + + def empty: Hash = new Hash(ByteVector.empty) + +} diff --git a/modules/common/src/main/scala/docspell/common/Timestamp.scala b/modules/common/src/main/scala/docspell/common/Timestamp.scala index 4bafba64..d5298c56 100644 --- a/modules/common/src/main/scala/docspell/common/Timestamp.scala +++ b/modules/common/src/main/scala/docspell/common/Timestamp.scala @@ -4,6 +4,8 @@ import java.time.{Instant, LocalDate, ZoneId} import cats.effect.Sync import io.circe.{Decoder, Encoder} +import java.time.LocalDateTime +import java.time.ZonedDateTime case class Timestamp(value: Instant) { @@ -14,22 +16,40 @@ case class Timestamp(value: Instant) { def minus(d: Duration): Timestamp = Timestamp(value.minusNanos(d.nanos)) + def - (d: Duration): Timestamp = + minus(d) + def minusHours(n: Long): Timestamp = Timestamp(value.minusSeconds(n * 60 * 60)) - def toDate: LocalDate = - value.atZone(ZoneId.of("UTC")).toLocalDate + def toUtcDate: LocalDate = + value.atZone(Timestamp.UTC).toLocalDate + + def toUtcDateTime: LocalDateTime = + value.atZone(Timestamp.UTC).toLocalDateTime + + def atZone(zone: ZoneId): ZonedDateTime = + value.atZone(zone) + + def atUTC: ZonedDateTime = atZone(Timestamp.UTC) def asString: String = value.toString + + def < (other: Timestamp): Boolean = + this.value.isBefore(other.value) } object Timestamp { + val UTC = ZoneId.of("UTC") val Epoch = Timestamp(Instant.EPOCH) def current[F[_]: Sync]: F[Timestamp] = Sync[F].delay(Timestamp(Instant.now)) + def from(zd: ZonedDateTime): Timestamp = + Timestamp(zd.toInstant) + implicit val encodeTimestamp: Encoder[Timestamp] = BaseJsonCodecs.encodeInstantEpoch.contramap(_.value) diff --git a/modules/common/src/main/scala/docspell/common/pureconfig/Implicits.scala b/modules/common/src/main/scala/docspell/common/pureconfig/Implicits.scala index 7fc880ab..ceee5629 100644 --- a/modules/common/src/main/scala/docspell/common/pureconfig/Implicits.scala +++ b/modules/common/src/main/scala/docspell/common/pureconfig/Implicits.scala @@ -4,6 +4,7 @@ import docspell.common._ import _root_.pureconfig._ import _root_.pureconfig.error.{CannotConvert, FailureReason} import scodec.bits.ByteVector +import com.github.eikek.calev.CalEvent import scala.reflect.ClassTag @@ -31,6 +32,10 @@ object Implicits { else ByteVector.encodeUtf8(str).left.map(ex => s"Invalid utf8 string: ${ex.getMessage}") }) + implicit val caleventReader: ConfigReader[CalEvent] = + ConfigReader[String].emap(reason(CalEvent.parse)) + + def reason[A: ClassTag](f: String => Either[String, A]): String => Either[FailureReason, A] = in => f(in).left.map(str => CannotConvert(in, implicitly[ClassTag[A]].runtimeClass.toString, str)) diff --git a/modules/joex/src/main/resources/reference.conf b/modules/joex/src/main/resources/reference.conf index 9712f54d..cd345cfb 100644 --- a/modules/joex/src/main/resources/reference.conf +++ b/modules/joex/src/main/resources/reference.conf @@ -64,6 +64,58 @@ docspell.joex { wakeup-period = "30 minutes" } + periodic-scheduler { + + # Each scheduler needs a unique name. This defaults to the node + # name, which must be unique, too. + name = ${docspell.joex.app-id} + + # A fallback to start looking for due periodic tasks regularily. + # Usually joex instances should be notified via REST calls if + # external processes change tasks. But these requests may get + # lost. + wakeup-period = "10 minutes" + } + + # Docspell uses periodic house keeping tasks, like cleaning expired + # invites, that can be configured here. + house-keeping { + + # When the house keeping tasks execute. Default is to run every + # week. + schedule = "Sun *-*-* 00:00:00" + + # This task removes invitation keys that have been created but not + # used. The timespan here must be greater than the `invite-time' + # setting in the rest server config file. + cleanup-invites = { + + # Whether this task is enabled. + enabled = true + + # The minimum age of invites to be deleted. + older-than = "30 days" + } + + # Jobs store their log output in the database. Normally this data + # is only interesting for some period of time. The processing logs + # of old files can be removed eventually. + cleanup-jobs = { + + # Whether this task is enabled. + enabled = true + + # The minimum age of jobs to delete. It is matched against the + # `finished' timestamp. + older-than = "30 days" + + # This defines how many jobs are deleted in one transaction. + # Since the data to delete may get large, it can be configured + # whether more or less memory should be used. + delete-batch = "100" + } + } + # Configuration of text extraction extraction { # For PDF files it is first tried to read the text parts of the diff --git a/modules/joex/src/main/scala/docspell/joex/Config.scala b/modules/joex/src/main/scala/docspell/joex/Config.scala index 55790144..1d678766 100644 --- a/modules/joex/src/main/scala/docspell/joex/Config.scala +++ b/modules/joex/src/main/scala/docspell/joex/Config.scala @@ -1,10 +1,11 @@ package docspell.joex import docspell.common.{Ident, LenientUri} -import docspell.joex.scheduler.SchedulerConfig +import docspell.joex.scheduler.{PeriodicSchedulerConfig, SchedulerConfig} import docspell.store.JdbcConfig import docspell.convert.ConvertConfig import docspell.extract.ExtractConfig +import docspell.joex.hk.HouseKeepingConfig case class Config( appId: Ident, @@ -12,6 +13,8 @@ case class Config( bind: Config.Bind, jdbc: JdbcConfig, scheduler: SchedulerConfig, + periodicScheduler: PeriodicSchedulerConfig, + houseKeeping: HouseKeepingConfig, extraction: ExtractConfig, convert: ConvertConfig ) diff --git a/modules/joex/src/main/scala/docspell/joex/JoexApp.scala b/modules/joex/src/main/scala/docspell/joex/JoexApp.scala index 0bcbba25..bbb052a6 100644 --- a/modules/joex/src/main/scala/docspell/joex/JoexApp.scala +++ b/modules/joex/src/main/scala/docspell/joex/JoexApp.scala @@ -1,7 +1,7 @@ package docspell.joex import docspell.common.Ident -import docspell.joex.scheduler.Scheduler +import docspell.joex.scheduler.{PeriodicScheduler, Scheduler} import docspell.store.records.RJobLog trait JoexApp[F[_]] { @@ -10,6 +10,8 @@ trait JoexApp[F[_]] { def scheduler: Scheduler[F] + def periodicScheduler: PeriodicScheduler[F] + def findLogs(jobId: Ident): F[Vector[RJobLog]] /** Shuts down the job executor. diff --git a/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala b/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala index 8e4fbccf..f613b736 100644 --- a/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala +++ b/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala @@ -3,9 +3,12 @@ package docspell.joex import cats.implicits._ import cats.effect._ import docspell.common.{Ident, NodeType, ProcessItemArgs} +import docspell.joex.hk._ import docspell.joex.process.ItemHandler -import docspell.joex.scheduler.{JobTask, Scheduler, SchedulerBuilder} +import docspell.joex.scheduler._ +import docspell.joexapi.client.JoexClient import docspell.store.Store +import docspell.store.queue._ import docspell.store.ops.ONode import docspell.store.records.RJobLog import fs2.concurrent.SignallingRef @@ -16,15 +19,21 @@ final class JoexAppImpl[F[_]: ConcurrentEffect: ContextShift: Timer]( cfg: Config, nodeOps: ONode[F], store: Store[F], + pstore: PeriodicTaskStore[F], termSignal: SignallingRef[F, Boolean], - val scheduler: Scheduler[F] + val scheduler: Scheduler[F], + val periodicScheduler: PeriodicScheduler[F] ) extends JoexApp[F] { def init: F[Unit] = { - val run = scheduler.start.compile.drain + val run = scheduler.start.compile.drain + val prun = periodicScheduler.start.compile.drain for { + _ <- scheduleBackgroundTasks _ <- ConcurrentEffect[F].start(run) + _ <- ConcurrentEffect[F].start(prun) _ <- scheduler.periodicAwake + _ <- periodicScheduler.periodicAwake _ <- nodeOps.register(cfg.appId, NodeType.Joex, cfg.baseUrl) } yield () } @@ -36,8 +45,10 @@ final class JoexAppImpl[F[_]: ConcurrentEffect: ContextShift: Timer]( nodeOps.unregister(cfg.appId) def initShutdown: F[Unit] = - scheduler.shutdown(false) *> termSignal.set(true) + periodicScheduler.shutdown *> scheduler.shutdown(false) *> termSignal.set(true) + private def scheduleBackgroundTasks: F[Unit] = + HouseKeepingTask.periodicTask[F](cfg.houseKeeping.schedule).flatMap(pstore.insert) } object JoexAppImpl { @@ -46,12 +57,17 @@ object JoexAppImpl { cfg: Config, termSignal: SignallingRef[F, Boolean], connectEC: ExecutionContext, + clientEC: ExecutionContext, blocker: Blocker ): Resource[F, JoexApp[F]] = for { + client <- JoexClient.resource(clientEC) store <- Store.create(cfg.jdbc, connectEC, blocker) + queue <- JobQueue(store) + pstore <- PeriodicTaskStore.create(store) nodeOps <- ONode(store) sch <- SchedulerBuilder(cfg.scheduler, blocker, store) + .withQueue(queue) .withTask( JobTask.json( ProcessItemArgs.taskName, @@ -59,8 +75,23 @@ object JoexAppImpl { ItemHandler.onCancel[F] ) ) + .withTask( + JobTask.json( + HouseKeepingTask.taskName, + HouseKeepingTask[F](cfg), + HouseKeepingTask.onCancel[F] + ) + ) .resource - app = new JoexAppImpl(cfg, nodeOps, store, termSignal, sch) + psch <- PeriodicScheduler.create( + cfg.periodicScheduler, + sch, + queue, + pstore, + client, + Timer[F] + ) + app = new JoexAppImpl(cfg, nodeOps, store, pstore, termSignal, sch, psch) appR <- Resource.make(app.init.map(_ => app))(_.shutdown) } yield appR } diff --git a/modules/joex/src/main/scala/docspell/joex/JoexServer.scala b/modules/joex/src/main/scala/docspell/joex/JoexServer.scala index 00919923..c5966549 100644 --- a/modules/joex/src/main/scala/docspell/joex/JoexServer.scala +++ b/modules/joex/src/main/scala/docspell/joex/JoexServer.scala @@ -24,13 +24,14 @@ object JoexServer { def stream[F[_]: ConcurrentEffect: ContextShift]( cfg: Config, connectEC: ExecutionContext, + clientEC: ExecutionContext, blocker: Blocker )(implicit T: Timer[F]): Stream[F, Nothing] = { val app = for { signal <- Resource.liftF(SignallingRef[F, Boolean](false)) exitCode <- Resource.liftF(Ref[F].of(ExitCode.Success)) - joexApp <- JoexAppImpl.create[F](cfg, signal, connectEC, blocker) + joexApp <- JoexAppImpl.create[F](cfg, signal, connectEC, clientEC, blocker) httpApp = Router( "/api/info" -> InfoRoutes(), diff --git a/modules/joex/src/main/scala/docspell/joex/Main.scala b/modules/joex/src/main/scala/docspell/joex/Main.scala index a800acdb..4c1f7319 100644 --- a/modules/joex/src/main/scala/docspell/joex/Main.scala +++ b/modules/joex/src/main/scala/docspell/joex/Main.scala @@ -13,10 +13,10 @@ import org.log4s._ object Main extends IOApp { private[this] val logger = getLogger - val blockingEc: ExecutionContext = ExecutionContext.fromExecutor( + val blockingEC: ExecutionContext = ExecutionContext.fromExecutor( Executors.newCachedThreadPool(ThreadFactories.ofName("docspell-joex-blocking")) ) - val blocker = Blocker.liftExecutionContext(blockingEc) + val blocker = Blocker.liftExecutionContext(blockingEC) val connectEC: ExecutionContext = ExecutionContext.fromExecutorService( Executors.newFixedThreadPool(5, ThreadFactories.ofName("docspell-joex-dbconnect")) ) @@ -52,6 +52,6 @@ object Main extends IOApp { cfg.baseUrl ) logger.info(s"\n${banner.render("***>")}") - JoexServer.stream[IO](cfg, connectEC, blocker).compile.drain.as(ExitCode.Success) + JoexServer.stream[IO](cfg, connectEC, blockingEC, blocker).compile.drain.as(ExitCode.Success) } } diff --git a/modules/joex/src/main/scala/docspell/joex/hk/CleanupInvitesTask.scala b/modules/joex/src/main/scala/docspell/joex/hk/CleanupInvitesTask.scala new file mode 100644 index 00000000..345d488d --- /dev/null +++ b/modules/joex/src/main/scala/docspell/joex/hk/CleanupInvitesTask.scala @@ -0,0 +1,26 @@ +package docspell.joex.hk + +import cats.implicits._ +import cats.effect._ + +import docspell.common._ +import docspell.joex.scheduler.Task +import docspell.store.records._ + +object CleanupInvitesTask { + + def apply[F[_]: Sync](cfg: HouseKeepingConfig.CleanupInvites): Task[F, Unit, Unit] = + Task { ctx => + if (cfg.enabled) { + for { + now <- Timestamp.current[F] + ts = now - cfg.olderThan + _ <- ctx.logger.info(s"Cleanup invitations older than $ts") + n <- ctx.store.transact(RInvitation.deleteOlderThan(ts)) + _ <- ctx.logger.info(s"Removed $n invitations") + } yield () + } else { + ctx.logger.info("CleanupInvites task is disabled in the configuration") + } + } +} diff --git a/modules/joex/src/main/scala/docspell/joex/hk/CleanupJobsTask.scala b/modules/joex/src/main/scala/docspell/joex/hk/CleanupJobsTask.scala new file mode 100644 index 00000000..43105814 --- /dev/null +++ b/modules/joex/src/main/scala/docspell/joex/hk/CleanupJobsTask.scala @@ -0,0 +1,36 @@ +package docspell.joex.hk + +import cats.implicits._ +import cats.effect._ +import fs2.Stream + +import docspell.common._ +import docspell.joex.scheduler.Task +import docspell.store.Store +import docspell.store.records._ + +object CleanupJobsTask { + + def apply[F[_]: Sync](cfg: HouseKeepingConfig.CleanupJobs): Task[F, Unit, Unit] = + Task { ctx => + if (cfg.enabled) { + for { + now <- Timestamp.current[F] + ts = now - cfg.olderThan + _ <- ctx.logger.info(s"Cleanup jobs older than $ts") + n <- deleteDoneJobs(ctx.store, ts, cfg.deleteBatch) + _ <- ctx.logger.info(s"Removed $n jobs") + } yield () + } else { + ctx.logger.info("CleanupJobs task is disabled in the configuration") + } + } + + def deleteDoneJobs[F[_]: Sync](store: Store[F], ts: Timestamp, batch: Int): F[Int] = + Stream + .eval(store.transact(RJob.deleteDoneAndOlderThan(ts, batch))) + .repeat + .takeWhile(_ > 0) + .compile + .foldMonoid +} diff --git a/modules/joex/src/main/scala/docspell/joex/hk/HouseKeepingConfig.scala b/modules/joex/src/main/scala/docspell/joex/hk/HouseKeepingConfig.scala new file mode 100644 index 00000000..217d7d88 --- /dev/null +++ b/modules/joex/src/main/scala/docspell/joex/hk/HouseKeepingConfig.scala @@ -0,0 +1,20 @@ +package docspell.joex.hk + +import com.github.eikek.calev.CalEvent +import docspell.common._ + +import HouseKeepingConfig._ + +case class HouseKeepingConfig( + schedule: CalEvent, + cleanupInvites: CleanupInvites, + cleanupJobs: CleanupJobs +) + +object HouseKeepingConfig { + + case class CleanupInvites(enabled: Boolean, olderThan: Duration) + + case class CleanupJobs(enabled: Boolean, olderThan: Duration, deleteBatch: Int) + +} diff --git a/modules/joex/src/main/scala/docspell/joex/hk/HouseKeepingTask.scala b/modules/joex/src/main/scala/docspell/joex/hk/HouseKeepingTask.scala new file mode 100644 index 00000000..1d4d558d --- /dev/null +++ b/modules/joex/src/main/scala/docspell/joex/hk/HouseKeepingTask.scala @@ -0,0 +1,39 @@ +package docspell.joex.hk + +import cats.implicits._ +import cats.effect._ +import com.github.eikek.calev._ + +import docspell.common._ +import docspell.joex.Config +import docspell.joex.scheduler.Task +import docspell.store.records._ + +object HouseKeepingTask { + private val periodicId = Ident.unsafe("docspell-houskeeping") + val systemGroup: Ident = Ident.unsafe("docspell-system") + + val taskName: Ident = Ident.unsafe("housekeeping") + + def apply[F[_]: Sync](cfg: Config): Task[F, Unit, Unit] = + Task.log[F](_.info(s"Running house-keeping task now")) + .flatMap(_ => CleanupInvitesTask(cfg.houseKeeping.cleanupInvites)) + .flatMap(_ => CleanupJobsTask(cfg.houseKeeping.cleanupJobs)) + + def onCancel[F[_]: Sync]: Task[F, Unit, Unit] = + Task.log(_.warn("Cancelling house-keeping task")) + + def periodicTask[F[_]: Sync](ce: CalEvent): F[RPeriodicTask] = + RPeriodicTask + .createJson( + true, + taskName, + systemGroup, + (), + "Docspell house-keeping", + systemGroup, + Priority.Low, + ce + ) + .map(_.copy(id = periodicId)) +} diff --git a/modules/joex/src/main/scala/docspell/joex/routes/JoexRoutes.scala b/modules/joex/src/main/scala/docspell/joex/routes/JoexRoutes.scala index 2e32275c..1ca68274 100644 --- a/modules/joex/src/main/scala/docspell/joex/routes/JoexRoutes.scala +++ b/modules/joex/src/main/scala/docspell/joex/routes/JoexRoutes.scala @@ -19,7 +19,8 @@ object JoexRoutes { case POST -> Root / "notify" => for { _ <- app.scheduler.notifyChange - resp <- Ok(BasicResult(true, "Scheduler notified.")) + _ <- app.periodicScheduler.notifyChange + resp <- Ok(BasicResult(true, "Schedulers notified.")) } yield resp case GET -> Root / "running" => diff --git a/modules/joex/src/main/scala/docspell/joex/scheduler/PeriodicScheduler.scala b/modules/joex/src/main/scala/docspell/joex/scheduler/PeriodicScheduler.scala new file mode 100644 index 00000000..62d0c345 --- /dev/null +++ b/modules/joex/src/main/scala/docspell/joex/scheduler/PeriodicScheduler.scala @@ -0,0 +1,49 @@ +package docspell.joex.scheduler + +import fs2._ +import fs2.concurrent.SignallingRef +import cats.effect._ + +import docspell.joexapi.client.JoexClient +import docspell.store.queue._ + +/** A periodic scheduler takes care to submit periodic tasks to the + * job queue. + * + * It is run in the background to regularily find a periodic task to + * execute. If the task is due, it will be submitted into the job + * queue where it will be picked up by the scheduler from some joex + * instance. If it is due in the future, a notification is scheduled + * to be received at that time so the task can be looked up again. + */ +trait PeriodicScheduler[F[_]] { + + def config: PeriodicSchedulerConfig + + def start: Stream[F, Nothing] + + def shutdown: F[Unit] + + def periodicAwake: F[Fiber[F, Unit]] + + def notifyChange: F[Unit] +} + +object PeriodicScheduler { + + def create[F[_]: ConcurrentEffect: ContextShift]( + cfg: PeriodicSchedulerConfig, + sch: Scheduler[F], + queue: JobQueue[F], + store: PeriodicTaskStore[F], + client: JoexClient[F], + timer: Timer[F] + ): Resource[F, PeriodicScheduler[F]] = + for { + waiter <- Resource.liftF(SignallingRef(true)) + state <- Resource.liftF(SignallingRef(PeriodicSchedulerImpl.emptyState[F])) + psch = new PeriodicSchedulerImpl[F](cfg, sch, queue, store, client, waiter, state, timer) + _ <- Resource.liftF(psch.init) + } yield psch + +} diff --git a/modules/joex/src/main/scala/docspell/joex/scheduler/PeriodicSchedulerConfig.scala b/modules/joex/src/main/scala/docspell/joex/scheduler/PeriodicSchedulerConfig.scala new file mode 100644 index 00000000..fc5872c6 --- /dev/null +++ b/modules/joex/src/main/scala/docspell/joex/scheduler/PeriodicSchedulerConfig.scala @@ -0,0 +1,8 @@ +package docspell.joex.scheduler + +import docspell.common._ + +case class PeriodicSchedulerConfig( + name: Ident, + wakeupPeriod: Duration +) diff --git a/modules/joex/src/main/scala/docspell/joex/scheduler/PeriodicSchedulerImpl.scala b/modules/joex/src/main/scala/docspell/joex/scheduler/PeriodicSchedulerImpl.scala new file mode 100644 index 00000000..f8de3c8c --- /dev/null +++ b/modules/joex/src/main/scala/docspell/joex/scheduler/PeriodicSchedulerImpl.scala @@ -0,0 +1,184 @@ +package docspell.joex.scheduler + +import fs2._ +import fs2.concurrent.SignallingRef +import cats.effect._ +import cats.implicits._ +import org.log4s.getLogger +import com.github.eikek.fs2calev._ + +import docspell.common._ +import docspell.common.syntax.all._ +import docspell.joexapi.client.JoexClient +import docspell.store.queue._ +import docspell.store.records.RPeriodicTask + +import PeriodicSchedulerImpl.State + +final class PeriodicSchedulerImpl[F[_]: ConcurrentEffect: ContextShift]( + val config: PeriodicSchedulerConfig, + sch: Scheduler[F], + queue: JobQueue[F], + store: PeriodicTaskStore[F], + client: JoexClient[F], + waiter: SignallingRef[F, Boolean], + state: SignallingRef[F, State[F]], + timer: Timer[F] +) extends PeriodicScheduler[F] { + private[this] val logger = getLogger + implicit private val _timer: Timer[F] = timer + + def start: Stream[F, Nothing] = + logger.sinfo("Starting periodic scheduler") ++ + mainLoop + + def shutdown: F[Unit] = + state.modify(_.requestShutdown) + + def periodicAwake: F[Fiber[F, Unit]] = + ConcurrentEffect[F].start( + Stream + .awakeEvery[F](config.wakeupPeriod.toScala) + .evalMap(_ => logger.fdebug("Periodic awake reached") *> notifyChange) + .compile + .drain + ) + + def notifyChange: F[Unit] = + waiter.update(b => !b) + + // internal + + /** + * On startup, get all periodic jobs from this scheduler and remove + * the mark, so they get picked up again. + */ + def init: F[Unit] = + logError("Error clearing marks")(store.clearMarks(config.name)) + + def mainLoop: Stream[F, Nothing] = { + val body: F[Boolean] = + for { + _ <- logger.fdebug(s"Going into main loop") + now <- Timestamp.current[F] + _ <- logger.fdebug(s"Looking for next periodic task") + go <- logThrow("Error getting next task")( + store + .takeNext(config.name, None) + .use({ + case Marked.Found(pj) => + logger + .fdebug(s"Found periodic task '${pj.subject}/${pj.timer.asString}'") *> + (if (isTriggered(pj, now)) submitJob(pj) + else scheduleNotify(pj).map(_ => false)) + case Marked.NotFound => + logger.fdebug("No periodic task found") *> false.pure[F] + case Marked.NotMarkable => + logger.fdebug("Periodic job cannot be marked. Trying again.") *> true + .pure[F] + }) + ) + } yield go + + Stream + .eval(state.get.map(_.shutdownRequest)) + .evalTap( + if (_) logger.finfo[F]("Stopping main loop due to shutdown request.") + else ().pure[F] + ) + .flatMap(if (_) Stream.empty else Stream.eval(cancelNotify *> body)) + .flatMap({ + case true => + mainLoop + case false => + logger.sdebug(s"Waiting for notify") ++ + waiter.discrete.take(2).drain ++ + logger.sdebug(s"Notify signal, going into main loop") ++ + mainLoop + }) + } + + def isTriggered(pj: RPeriodicTask, now: Timestamp): Boolean = + pj.nextrun < now + + def submitJob(pj: RPeriodicTask): F[Boolean] = + store + .findNonFinalJob(pj.id) + .flatMap({ + case Some(job) => + logger.finfo[F]( + s"There is already a job with non-final state '${job.state}' in the queue" + ) *> scheduleNotify(pj) *> false.pure[F] + + case None => + logger.finfo[F](s"Submitting job for periodic task '${pj.task.id}'") *> + pj.toJob.flatMap(queue.insert) *> notifyJoex *> true.pure[F] + }) + + def notifyJoex: F[Unit] = + sch.notifyChange *> store.findJoexNodes.flatMap( + _.traverse(n => client.notifyJoexIgnoreErrors(n.url)).map(_ => ()) + ) + + def scheduleNotify(pj: RPeriodicTask): F[Unit] = + Timestamp + .current[F] + .flatMap(now => + logger.fdebug( + s"Scheduling next notify for timer ${pj.timer.asString} -> ${pj.timer.nextElapse(now.toUtcDateTime)}" + ) + ) *> + ConcurrentEffect[F] + .start( + CalevFs2 + .sleep[F](pj.timer) + .evalMap(_ => notifyChange) + .compile + .drain + ) + .flatMap(fb => state.modify(_.setNotify(fb))) + + def cancelNotify: F[Unit] = + state + .modify(_.clearNotify) + .flatMap({ + case Some(fb) => + fb.cancel + case None => + ().pure[F] + }) + + private def logError(msg: => String)(fa: F[Unit]): F[Unit] = + fa.attempt.flatMap { + case Right(_) => ().pure[F] + case Left(ex) => logger.ferror(ex)(msg).map(_ => ()) + } + + private def logThrow[A](msg: => String)(fa: F[A]): F[A] = + fa.attempt + .flatMap({ + case r @ Right(_) => (r: Either[Throwable, A]).pure[F] + case l @ Left(ex) => logger.ferror(ex)(msg).map(_ => (l: Either[Throwable, A])) + }) + .rethrow +} + +object PeriodicSchedulerImpl { + def emptyState[F[_]]: State[F] = + State(false, None) + + case class State[F[_]]( + shutdownRequest: Boolean, + scheduledNotify: Option[Fiber[F, Unit]] + ) { + def requestShutdown: (State[F], Unit) = + (copy(shutdownRequest = true), ()) + + def setNotify(fb: Fiber[F, Unit]): (State[F], Unit) = + (copy(scheduledNotify = Some(fb)), ()) + + def clearNotify: (State[F], Option[Fiber[F, Unit]]) = + (copy(scheduledNotify = None), scheduledNotify) + + } +} diff --git a/modules/joex/src/main/scala/docspell/joex/scheduler/SchedulerBuilder.scala b/modules/joex/src/main/scala/docspell/joex/scheduler/SchedulerBuilder.scala index d1faee33..d2879a93 100644 --- a/modules/joex/src/main/scala/docspell/joex/scheduler/SchedulerBuilder.scala +++ b/modules/joex/src/main/scala/docspell/joex/scheduler/SchedulerBuilder.scala @@ -34,6 +34,9 @@ case class SchedulerBuilder[F[_]: ConcurrentEffect: ContextShift]( def withLogSink(sink: LogSink[F]): SchedulerBuilder[F] = copy(logSink = sink) + def withQueue(queue: JobQueue[F]): SchedulerBuilder[F] = + copy(queue = Resource.pure[F, JobQueue[F]](queue)) + def serve: Resource[F, Scheduler[F]] = resource.evalMap(sch => ConcurrentEffect[F].start(sch.start.compile.drain).map(_ => sch)) diff --git a/modules/joex/src/main/scala/docspell/joex/scheduler/Task.scala b/modules/joex/src/main/scala/docspell/joex/scheduler/Task.scala index 8ac59a1b..ae35fab2 100644 --- a/modules/joex/src/main/scala/docspell/joex/scheduler/Task.scala +++ b/modules/joex/src/main/scala/docspell/joex/scheduler/Task.scala @@ -4,6 +4,7 @@ import cats.implicits._ import cats.{Applicative, ApplicativeError, FlatMap, Functor} import cats.data.Kleisli import cats.effect.Sync +import docspell.common.Logger /** * The code that is executed by the scheduler @@ -51,4 +52,7 @@ object Task { def setProgress[F[_]: Sync, A, B](n: Int)(data: B): Task[F, A, B] = Task(_.setProgress(n).map(_ => data)) + + def log[F[_]](f: Logger[F] => F[Unit]): Task[F, Unit, Unit] = + Task(ctx => f(ctx.logger)) } diff --git a/modules/joexapi/src/main/scala/docspell/joexapi/client/JoexClient.scala b/modules/joexapi/src/main/scala/docspell/joexapi/client/JoexClient.scala new file mode 100644 index 00000000..02a4c2cf --- /dev/null +++ b/modules/joexapi/src/main/scala/docspell/joexapi/client/JoexClient.scala @@ -0,0 +1,59 @@ +package docspell.joexapi.client + +import cats.implicits._ +import cats.effect._ +import docspell.common.{Ident, LenientUri} +import docspell.common.syntax.all._ +import org.http4s.{Method, Request, Uri} +import org.http4s.client.Client +import org.http4s.client.blaze.BlazeClientBuilder +import scala.concurrent.ExecutionContext + +import org.log4s.getLogger + +trait JoexClient[F[_]] { + + def notifyJoex(base: LenientUri): F[Unit] + + def notifyJoexIgnoreErrors(base: LenientUri): F[Unit] + + def cancelJob(base: LenientUri, job: Ident): F[Unit] + +} + +object JoexClient { + + private[this] val logger = getLogger + + def apply[F[_]: Sync](client: Client[F]): JoexClient[F] = + new JoexClient[F] { + def notifyJoex(base: LenientUri): F[Unit] = { + val notifyUrl = base / "api" / "v1" / "notify" + val req = Request[F](Method.POST, uri(notifyUrl)) + logger.fdebug(s"Notify joex at ${notifyUrl.asString}") *> + client.expect[String](req).map(_ => ()) + } + + def notifyJoexIgnoreErrors(base: LenientUri): F[Unit] = + notifyJoex(base).attempt.map { + case Right(()) => () + case Left(ex) => + logger.warn( + s"Notifying Joex instance '${base.asString}' failed: ${ex.getMessage}" + ) + () + } + + def cancelJob(base: LenientUri, job: Ident): F[Unit] = { + val cancelUrl = base / "api" / "v1" / "job" / job.id / "cancel" + val req = Request[F](Method.POST, uri(cancelUrl)) + client.expect[String](req).map(_ => ()) + } + + private def uri(u: LenientUri): Uri = + Uri.unsafeFromString(u.asString) + } + + def resource[F[_]: ConcurrentEffect](ec: ExecutionContext): Resource[F, JoexClient[F]] = + BlazeClientBuilder[F](ec).resource.map(apply[F]) +} diff --git a/modules/microsite/docs/dev/adr.md b/modules/microsite/docs/dev/adr.md index 285571da..20c149b0 100644 --- a/modules/microsite/docs/dev/adr.md +++ b/modules/microsite/docs/dev/adr.md @@ -16,3 +16,4 @@ title: ADRs - [0009 Convert Office Files](adr/0009_convert_office_docs) - [0010 Convert Image Files](adr/0010_convert_image_files) - [0011 Extract Text](adr/0011_extract_text) +- [0012 Periodic Tasks](adr/0012_periodic_tasks) diff --git a/modules/microsite/docs/dev/adr/0012_periodic_tasks.md b/modules/microsite/docs/dev/adr/0012_periodic_tasks.md new file mode 100644 index 00000000..5edd559c --- /dev/null +++ b/modules/microsite/docs/dev/adr/0012_periodic_tasks.md @@ -0,0 +1,102 @@ +--- +layout: docs +title: Periodic Tasks +--- + +# Periodic Tasks + +## Context and Problem Statement + +Currently there is a `Scheduler` that consumes tasks off a queue in +the database. This allows multiple job executors running in parallel +racing for the next job to execute. This is for executing tasks +immediately – as long as there are enough resource. + +What is missing, is a component that maintains periodic tasks. The +reason for this is to have house keeping tasks that run regularily and +clean up stale or unused data. Later, users should be able to create +periodic tasks, for example to read e-mails from an inbox. + +The problem is again, that it must work with multiple job executor +instances running at the same time. This is the same pattern as with +the `Scheduler`: it must be ensured that only one task is used at a +time. Multiple job exectuors must not schedule a perdiodic task more +than once. If a periodic tasks takes longer than the time between +runs, it must wait for the next interval. + + +## Considered Options + +1. Adding a `timer` and `nextrun` field to the current `job` table +2. Creating a separate table for periodic tasks + +## Decision Outcome + +The 2. option. + +For internal housekeeping tasks, it may suffice to reuse the existing +`job` queue by adding more fields such that a job may be considered +periodic. But this conflates with what the `Scheduler` is doing now +(executing tasks as soon as possible while being bound to some +resources) with a completely different subject. + +There will be a new `PeriodicScheduler` that works on a new table in +the database that is representing periodic tasks. This table will +share fields with the `job` table to be able to create `RJob` +instances. This new component is only taking care of periodically +submitting jobs to the job queue such that the `Scheduler` will +eventually pick it up and run it. + +```sql +CREATE TABLE "periodic_task" ( + "id" varchar(254) not null primary key, + "enabled" boolean not null, + "task" varchar(254) not null, + "group_" varchar(254) not null, + "args" text not null, + "subject" varchar(254) not null, + "submitter" varchar(254) not null, + "priority" int not null, + "worker" varchar(254), + "marked" timestamp, + "timer" varchar(254) not null, + "nextrun" timestamp not null, + "created" timestamp not null +); +``` + +Preparing for other features, periodic tasks will be created by users. +It should be possible to disable/enable them. The next 6 properties +are needed to insert jobs into the `job` table. The `worker` field +(and `marked`) are used to mark a periodic job as "being worked on by +a job executor". + +The `timer` is the schedule, which is a +[systemd-like](https://man.cx/systemd.time#heading7) calendar event +string. This is parsed by [this +library](https://github.com/eikek/calev). The `nextrun` field will +store the timestamp of the next time the task would need to be +executed. This is needed to query this table for the newest task. + +The `PeriodicScheduler` works roughly like this: + +On startup: +- Remove stale worker values. If the process has been killed, there + may be marked tasks which must be cleared now. + +Main-Loop: +0. Cancel current scheduled notify (see 4. below) +1. get next (= earliest & enabled) periodic job +2. if none: stop +3. if triggered (= `nextrun <= 'now'`): + - Mark periodic task. On fail: goto 1. + - Submit new job into the jobqueue: + - Update `nextrun` field + - Check for non-final jobs of that name. This is required to not + run the same periodic task multiple times concurrently. + - if exist: goto 4. + - if not exist: submit job + - Unmark periodic task +4. if future + - schedule notify: notify self to run again next time the task + schedule triggers diff --git a/modules/microsite/docs/dev/adr/template.md b/modules/microsite/docs/dev/adr/template.md index 25696bbe..d531e39e 100644 --- a/modules/microsite/docs/dev/adr/template.md +++ b/modules/microsite/docs/dev/adr/template.md @@ -1,3 +1,8 @@ +--- +layout: docs +title: Short Title +--- + # [short title of solved problem and solution] * Status: [proposed | rejected | accepted | deprecated | … | superseded by [ADR-0005](0005-example.md)] diff --git a/modules/restserver/src/main/scala/docspell/restserver/conv/Conversions.scala b/modules/restserver/src/main/scala/docspell/restserver/conv/Conversions.scala index 123234d4..def5cc61 100644 --- a/modules/restserver/src/main/scala/docspell/restserver/conv/Conversions.scala +++ b/modules/restserver/src/main/scala/docspell/restserver/conv/Conversions.scala @@ -118,7 +118,7 @@ trait Conversions { ) def mkItemList(v: Vector[OItem.ListItem]): ItemLightList = { - val groups = v.groupBy(item => item.date.toDate.toString.substring(0, 7)) + val groups = v.groupBy(item => item.date.toUtcDate.toString.substring(0, 7)) def mkGroup(g: (String, Vector[OItem.ListItem])): ItemLightGroup = ItemLightGroup(g._1, g._2.map(mkItemLight).toList) diff --git a/modules/store/src/main/resources/db/migration/mariadb/V1.3.0__periodic_job.sql b/modules/store/src/main/resources/db/migration/mariadb/V1.3.0__periodic_job.sql new file mode 100644 index 00000000..782d9592 --- /dev/null +++ b/modules/store/src/main/resources/db/migration/mariadb/V1.3.0__periodic_job.sql @@ -0,0 +1,18 @@ +CREATE TABLE `periodic_task` ( + `id` varchar(254) not null primary key, + `enabled` boolean not null, + `task` varchar(254) not null, + `group_` varchar(254) not null, + `args` text not null, + `subject` varchar(254) not null, + `submitter` varchar(254) not null, + `priority` int not null, + `worker` varchar(254), + `marked` timestamp, + `timer` varchar(254) not null, + `nextrun` timestamp not null, + `created` timestamp not null +); + +CREATE INDEX `periodic_task_nextrun_idx` ON `periodic_task`(`nextrun`); +CREATE INDEX `periodic_task_worker_idx` ON `periodic_task`(`worker`); diff --git a/modules/store/src/main/resources/db/migration/postgresql/V1.3.0__periodic_job.sql b/modules/store/src/main/resources/db/migration/postgresql/V1.3.0__periodic_job.sql new file mode 100644 index 00000000..215b9dd7 --- /dev/null +++ b/modules/store/src/main/resources/db/migration/postgresql/V1.3.0__periodic_job.sql @@ -0,0 +1,18 @@ +CREATE TABLE "periodic_task" ( + "id" varchar(254) not null primary key, + "enabled" boolean not null, + "task" varchar(254) not null, + "group_" varchar(254) not null, + "args" text not null, + "subject" varchar(254) not null, + "submitter" varchar(254) not null, + "priority" int not null, + "worker" varchar(254), + "marked" timestamp, + "timer" varchar(254) not null, + "nextrun" timestamp not null, + "created" timestamp not null +); + +CREATE INDEX "periodic_task_nextrun_idx" ON "periodic_task"("nextrun"); +CREATE INDEX "periodic_task_worker_idx" ON "periodic_task"("worker"); diff --git a/modules/store/src/main/scala/docspell/store/impl/Column.scala b/modules/store/src/main/scala/docspell/store/impl/Column.scala index 9d697bf4..a42a09a3 100644 --- a/modules/store/src/main/scala/docspell/store/impl/Column.scala +++ b/modules/store/src/main/scala/docspell/store/impl/Column.scala @@ -30,6 +30,9 @@ case class Column(name: String, ns: String = "", alias: String = "") { def is(c: Column): Fragment = f ++ fr"=" ++ c.f + def isNot[A: Put](value: A): Fragment = + f ++ fr"<> $value" + def isNull: Fragment = f ++ fr"is null" diff --git a/modules/store/src/main/scala/docspell/store/impl/DoobieMeta.scala b/modules/store/src/main/scala/docspell/store/impl/DoobieMeta.scala index 62f058cd..8bc57809 100644 --- a/modules/store/src/main/scala/docspell/store/impl/DoobieMeta.scala +++ b/modules/store/src/main/scala/docspell/store/impl/DoobieMeta.scala @@ -7,6 +7,7 @@ import doobie._ import doobie.implicits.legacy.instant._ import doobie.util.log.Success import emil.{MailAddress, SSLType} +import com.github.eikek.calev.CalEvent import docspell.common._ import docspell.common.syntax.all._ @@ -98,6 +99,9 @@ trait DoobieMeta { Meta[String].imap(str => str.split(',').toList.map(_.trim).map(EmilUtil.unsafeReadMailAddress))( lma => lma.map(EmilUtil.mailAddressString).mkString(",") ) + + implicit val metaCalEvent: Meta[CalEvent] = + Meta[String].timap(CalEvent.unsafe)(_.asString) } object DoobieMeta extends DoobieMeta { diff --git a/modules/store/src/main/scala/docspell/store/queries/QPeriodicTask.scala b/modules/store/src/main/scala/docspell/store/queries/QPeriodicTask.scala new file mode 100644 index 00000000..e52c27cc --- /dev/null +++ b/modules/store/src/main/scala/docspell/store/queries/QPeriodicTask.scala @@ -0,0 +1,65 @@ +package docspell.store.queries + +import docspell.common._ +import docspell.store.impl.Implicits._ +import docspell.store.records._ +import doobie._ +import doobie.implicits._ + +object QPeriodicTask { + + def clearWorkers(name: Ident): ConnectionIO[Int] = { + val worker = RPeriodicTask.Columns.worker + updateRow(RPeriodicTask.table, worker.is(name), worker.setTo[Ident](None)).update.run + } + + def setWorker(pid: Ident, name: Ident, ts: Timestamp): ConnectionIO[Int] = { + val id = RPeriodicTask.Columns.id + val worker = RPeriodicTask.Columns.worker + val marked = RPeriodicTask.Columns.marked + updateRow( + RPeriodicTask.table, + and(id.is(pid), worker.isNull), + commas(worker.setTo(name), marked.setTo(ts)) + ).update.run + } + + def unsetWorker( + pid: Ident, + nextRun: Option[Timestamp] + ): ConnectionIO[Int] = { + val id = RPeriodicTask.Columns.id + val worker = RPeriodicTask.Columns.worker + val next = RPeriodicTask.Columns.nextrun + updateRow( + RPeriodicTask.table, + id.is(pid), + commas(worker.setTo[Ident](None), next.setTo(nextRun)) + ).update.run + } + + def findNext(excl: Option[Ident]): ConnectionIO[Option[RPeriodicTask]] = { + val enabled = RPeriodicTask.Columns.enabled + val pid = RPeriodicTask.Columns.id + val order = orderBy(RPeriodicTask.Columns.nextrun.f) ++ fr"ASC" + + val where = excl match { + case Some(id) => and(pid.isNot(id), enabled.is(true)) + case None => enabled.is(true) + } + val sql = + selectSimple(RPeriodicTask.Columns.all, RPeriodicTask.table, where) ++ order + sql.query[RPeriodicTask].streamWithChunkSize(2).take(1).compile.last + } + + def findNonFinal(pid: Ident): ConnectionIO[Option[RJob]] = + selectSimple( + RJob.Columns.all, + RJob.table, + and( + RJob.Columns.tracker.is(pid), + RJob.Columns.state.isOneOf(JobState.all.diff(JobState.done).toSeq) + ) + ).query[RJob].option + +} diff --git a/modules/store/src/main/scala/docspell/store/queue/JobQueue.scala b/modules/store/src/main/scala/docspell/store/queue/JobQueue.scala index f0a9d722..1eaf2cb7 100644 --- a/modules/store/src/main/scala/docspell/store/queue/JobQueue.scala +++ b/modules/store/src/main/scala/docspell/store/queue/JobQueue.scala @@ -29,7 +29,7 @@ object JobQueue { worker: Ident, retryPause: Duration ): F[Option[RJob]] = - logger.fdebug("Select next job") *> QJob.takeNextJob(store)(prio, worker, retryPause) + logger.ftrace("Select next job") *> QJob.takeNextJob(store)(prio, worker, retryPause) def insert(job: RJob): F[Unit] = store diff --git a/modules/store/src/main/scala/docspell/store/queue/Marked.scala b/modules/store/src/main/scala/docspell/store/queue/Marked.scala new file mode 100644 index 00000000..2b634198 --- /dev/null +++ b/modules/store/src/main/scala/docspell/store/queue/Marked.scala @@ -0,0 +1,17 @@ +package docspell.store.queue + +sealed trait Marked[+A] {} + +object Marked { + + final case class Found[A](value: A) extends Marked[A] + + final case object NotFound extends Marked[Nothing] + + final case object NotMarkable extends Marked[Nothing] + + + def found[A](v: A): Marked[A] = Found(v) + def notFound[A]: Marked[A] = NotFound + def notMarkable[A]: Marked[A] = NotMarkable +} diff --git a/modules/store/src/main/scala/docspell/store/queue/PeriodicTaskStore.scala b/modules/store/src/main/scala/docspell/store/queue/PeriodicTaskStore.scala new file mode 100644 index 00000000..787ec6cf --- /dev/null +++ b/modules/store/src/main/scala/docspell/store/queue/PeriodicTaskStore.scala @@ -0,0 +1,126 @@ +package docspell.store.queue + +import cats.effect._ +import cats.implicits._ +import org.log4s.getLogger +import com.github.eikek.fs2calev._ +import docspell.common._ +import docspell.common.syntax.all._ +import docspell.store.{AddResult, Store} +import docspell.store.records._ +import docspell.store.queries.QPeriodicTask + +trait PeriodicTaskStore[F[_]] { + + /** Get the free periodic task due next and reserve it to the given + * worker. + * + * If found, the task is returned and resource finalization takes + * care of unmarking the task after use and updating `nextRun` with + * the next timestamp. + */ + def takeNext( + worker: Ident, + excludeId: Option[Ident] + ): Resource[F, Marked[RPeriodicTask]] + + def clearMarks(name: Ident): F[Unit] + + def findNonFinalJob(pjobId: Ident): F[Option[RJob]] + + /** Insert a task or update if it already exists. + */ + def insert(task: RPeriodicTask): F[Unit] + + /** Adds the task only if it not already exists. + */ + def add(task: RPeriodicTask): F[AddResult] + + def findJoexNodes: F[Vector[RNode]] +} + +object PeriodicTaskStore { + private[this] val logger = getLogger + + def create[F[_]: Sync](store: Store[F]): Resource[F, PeriodicTaskStore[F]] = + Resource.pure[F, PeriodicTaskStore[F]](new PeriodicTaskStore[F] { + + def takeNext( + worker: Ident, + excludeId: Option[Ident] + ): Resource[F, Marked[RPeriodicTask]] = { + val chooseNext: F[Marked[RPeriodicTask]] = + getNext(excludeId).flatMap { + case Some(pj) => + mark(pj.id, worker).map { + case true => Marked.found(pj.copy(worker = worker.some)) + case false => Marked.notMarkable + } + case None => + Marked.notFound.pure[F] + } + + Resource.make(chooseNext)({ + case Marked.Found(pj) => unmark(pj) + case _ => ().pure[F] + }) + } + + def getNext(excl: Option[Ident]): F[Option[RPeriodicTask]] = + store.transact(QPeriodicTask.findNext(excl)) + + def mark(pid: Ident, name: Ident): F[Boolean] = + Timestamp + .current[F] + .flatMap(now => + store.transact(QPeriodicTask.setWorker(pid, name, now)).map(_ > 0) + ) + + def unmark(job: RPeriodicTask): F[Unit] = + for { + now <- Timestamp.current[F] + nextRun <- CalevFs2 + .nextElapses[F](now.atUTC)(job.timer) + .take(1) + .compile + .last + .map(_.map(Timestamp.from)) + _ <- store.transact(QPeriodicTask.unsetWorker(job.id, nextRun)) + } yield () + + def clearMarks(name: Ident): F[Unit] = + store + .transact(QPeriodicTask.clearWorkers(name)) + .flatMap { n => + if (n > 0) logger.finfo(s"Clearing $n periodic tasks from worker ${name.id}") + else ().pure[F] + } + + def findNonFinalJob(pjobId: Ident): F[Option[RJob]] = + store.transact(QPeriodicTask.findNonFinal(pjobId)) + + def insert(task: RPeriodicTask): F[Unit] = { + val update = store.transact(RPeriodicTask.update(task)) + val insertAttempt = store.transact(RPeriodicTask.insert(task)).attempt.map { + case Right(n) => n > 0 + case Left(_) => false + } + + for { + n1 <- update + ins <- if (n1 == 0) insertAttempt else true.pure[F] + _ <- if (ins) 1.pure[F] else update + } yield () + } + + def add(task: RPeriodicTask): F[AddResult] = { + val insert = RPeriodicTask.insert(task) + val exists = RPeriodicTask.exists(task.id) + store.add(insert, exists) + } + + def findJoexNodes: F[Vector[RNode]] = + store.transact(RNode.findAll(NodeType.Joex)) + + }) +} diff --git a/modules/store/src/main/scala/docspell/store/records/RInvitation.scala b/modules/store/src/main/scala/docspell/store/records/RInvitation.scala index 98d48a51..37764d0c 100644 --- a/modules/store/src/main/scala/docspell/store/records/RInvitation.scala +++ b/modules/store/src/main/scala/docspell/store/records/RInvitation.scala @@ -46,4 +46,7 @@ object RInvitation { _ <- delete(invite) } yield inv > 0 } + + def deleteOlderThan(ts: Timestamp): ConnectionIO[Int] = + deleteFrom(table, created.isLt(ts)).update.run } diff --git a/modules/store/src/main/scala/docspell/store/records/RJob.scala b/modules/store/src/main/scala/docspell/store/records/RJob.scala index 87520852..9a286743 100644 --- a/modules/store/src/main/scala/docspell/store/records/RJob.scala +++ b/modules/store/src/main/scala/docspell/store/records/RJob.scala @@ -1,12 +1,15 @@ package docspell.store.records import cats.effect.Sync +import cats.implicits._ +import fs2.Stream import doobie._ import doobie.implicits._ +import io.circe.Encoder + import docspell.common._ import docspell.store.impl.Column import docspell.store.impl.Implicits._ -import io.circe.Encoder case class RJob( id: Ident, @@ -227,7 +230,8 @@ object RJob { } def selectGroupInState(states: Seq[JobState]): ConnectionIO[Vector[Ident]] = { - val sql = selectDistinct(List(group), table, state.isOneOf(states)) ++ orderBy(group.f) + val sql = + selectDistinct(List(group), table, state.isOneOf(states)) ++ orderBy(group.f) sql.query[Ident].to[Vector] } @@ -236,4 +240,19 @@ object RJob { n0 <- RJobLog.deleteAll(jobId) n1 <- deleteFrom(table, id.is(jobId)).update.run } yield n0 + n1 + + def findIdsDoneAndOlderThan(ts: Timestamp): Stream[ConnectionIO, Ident] = + selectSimple( + Seq(id), + table, + and(state.isOneOf(JobState.done.toSeq), or(finished.isNull, finished.isLt(ts))) + ).query[Ident].stream + + def deleteDoneAndOlderThan(ts: Timestamp, batch: Int): ConnectionIO[Int] = + findIdsDoneAndOlderThan(ts) + .take(batch.toLong) + .evalMap(delete) + .map(_ => 1) + .compile + .foldMonoid } diff --git a/modules/store/src/main/scala/docspell/store/records/RPeriodicTask.scala b/modules/store/src/main/scala/docspell/store/records/RPeriodicTask.scala new file mode 100644 index 00000000..a510904b --- /dev/null +++ b/modules/store/src/main/scala/docspell/store/records/RPeriodicTask.scala @@ -0,0 +1,176 @@ +package docspell.store.records + +import cats.effect._ +import cats.implicits._ +import doobie._ +import doobie.implicits._ +import com.github.eikek.calev.CalEvent +import docspell.common._ +import docspell.store.impl.Column +import docspell.store.impl.Implicits._ +import io.circe.Encoder + +/** A periodic task is a special job description, that shares a few + * properties of a `RJob`. It must provide all information to create + * a `RJob` value eventually. + */ +case class RPeriodicTask( + id: Ident, + enabled: Boolean, + task: Ident, + group: Ident, + args: String, + subject: String, + submitter: Ident, + priority: Priority, + worker: Option[Ident], + marked: Option[Timestamp], + timer: CalEvent, + nextrun: Timestamp, + created: Timestamp +) { + + def toJob[F[_]: Sync]: F[RJob] = + for { + now <- Timestamp.current[F] + jid <- Ident.randomId[F] + } yield RJob( + jid, + task, + group, + args, + subject, + now, + submitter, + priority, + JobState.Waiting, + 0, + 0, + Some(id), + None, + None, + None + ) +} + +object RPeriodicTask { + + def create[F[_]: Sync]( + enabled: Boolean, + task: Ident, + group: Ident, + args: String, + subject: String, + submitter: Ident, + priority: Priority, + timer: CalEvent + ): F[RPeriodicTask] = + Ident + .randomId[F] + .flatMap(id => + Timestamp + .current[F] + .map { now => + RPeriodicTask( + id, + enabled, + task, + group, + args, + subject, + submitter, + priority, + None, + None, + timer, + timer + .nextElapse(now.atZone(Timestamp.UTC)) + .map(_.toInstant) + .map(Timestamp.apply) + .getOrElse(Timestamp.Epoch), + now + ) + } + ) + + def createJson[F[_]: Sync, A]( + enabled: Boolean, + task: Ident, + group: Ident, + args: A, + subject: String, + submitter: Ident, + priority: Priority, + timer: CalEvent + )(implicit E: Encoder[A]): F[RPeriodicTask] = + create[F](enabled, task, group, E(args).noSpaces, subject, submitter, priority, timer) + + val table = fr"periodic_task" + + object Columns { + val id = Column("id") + val enabled = Column("enabled") + val task = Column("task") + val group = Column("group_") + val args = Column("args") + val subject = Column("subject") + val submitter = Column("submitter") + val priority = Column("priority") + val worker = Column("worker") + val marked = Column("marked") + val timer = Column("timer") + val nextrun = Column("nextrun") + val created = Column("created") + val all = List( + id, + enabled, + task, + group, + args, + subject, + submitter, + priority, + worker, + marked, + timer, + nextrun, + created + ) + } + + import Columns._ + + def insert(v: RPeriodicTask): ConnectionIO[Int] = { + val sql = insertRow( + table, + all, + fr"${v.id},${v.enabled},${v.task},${v.group},${v.args}," ++ + fr"${v.subject},${v.submitter},${v.priority},${v.worker}," ++ + fr"${v.marked},${v.timer},${v.nextrun},${v.created}" + ) + sql.update.run + } + + def update(v: RPeriodicTask): ConnectionIO[Int] = { + val sql = updateRow( + table, + id.is(v.id), + commas( + enabled.setTo(v.enabled), + group.setTo(v.group), + args.setTo(v.args), + subject.setTo(v.subject), + submitter.setTo(v.submitter), + priority.setTo(v.priority), + worker.setTo(v.worker), + marked.setTo(v.marked), + timer.setTo(v.timer), + nextrun.setTo(v.nextrun) + ) + ) + sql.update.run + } + + def exists(pid: Ident): ConnectionIO[Boolean] = + selectCount(id, table, id.is(pid)).query[Int].unique.map(_ > 0) +} diff --git a/project/Dependencies.scala b/project/Dependencies.scala index c0dc23d8..32f5ac26 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -7,6 +7,7 @@ object Dependencies { val BcryptVersion = "0.4" val BetterMonadicForVersion = "0.3.1" val BitpeaceVersion = "0.4.3" + val CalevVersion = "0.1.0" val CirceVersion = "0.13.0" val DoobieVersion = "0.8.8" val EmilVersion = "0.2.0" @@ -37,6 +38,14 @@ object Dependencies { val ViewerJSVersion = "0.5.8" + val calevCore = Seq( + "com.github.eikek" %% "calev-core" % CalevVersion, + ) + val calevFs2 = Seq( + "com.github.eikek" %% "calev-fs2" % CalevVersion + ) + val calev = calevFs2 ++ calevCore + val jclOverSlf4j = Seq( "org.slf4j" % "jcl-over-slf4j" % Slf4jVersion )