Merge pull request #45 from eikek/feature/background-tasks

Feature/background tasks
This commit is contained in:
eikek 2020-03-09 20:59:46 +01:00 committed by GitHub
commit a07a6ff376
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
42 changed files with 1201 additions and 85 deletions

View File

@ -3,7 +3,7 @@ version = "2.4.2"
align = more align = more
#align.arrowEnumeratorGenerator = true #align.arrowEnumeratorGenerator = true
maxColumn = 100 maxColumn = 90
rewrite.rules = [ rewrite.rules = [
AvoidInfix AvoidInfix

View File

@ -141,6 +141,7 @@ val common = project.in(file("modules/common")).
Dependencies.fs2 ++ Dependencies.fs2 ++
Dependencies.circe ++ Dependencies.circe ++
Dependencies.loggingApi ++ Dependencies.loggingApi ++
Dependencies.calevCore ++
Dependencies.pureconfig.map(_ % "optional") Dependencies.pureconfig.map(_ % "optional")
) )
@ -194,7 +195,8 @@ val store = project.in(file("modules/store")).
Dependencies.databases ++ Dependencies.databases ++
Dependencies.flyway ++ Dependencies.flyway ++
Dependencies.loggingApi ++ Dependencies.loggingApi ++
Dependencies.emil Dependencies.emil ++
Dependencies.calev
).dependsOn(common) ).dependsOn(common)
val extract = project.in(file("modules/extract")). val extract = project.in(file("modules/extract")).
@ -260,7 +262,8 @@ val joexapi = project.in(file("modules/joexapi")).
settings( settings(
name := "docspell-joexapi", name := "docspell-joexapi",
libraryDependencies ++= libraryDependencies ++=
Dependencies.circe, Dependencies.circe ++
Dependencies.http4sClient,
openapiTargetLanguage := Language.Scala, openapiTargetLanguage := Language.Scala,
openapiPackage := Pkg("docspell.joexapi.model"), openapiPackage := Pkg("docspell.joexapi.model"),
openapiSpec := (Compile/resourceDirectory).value/"joex-openapi.yml" openapiSpec := (Compile/resourceDirectory).value/"joex-openapi.yml"
@ -302,7 +305,7 @@ val backend = project.in(file("modules/backend")).
Dependencies.bcrypt ++ Dependencies.bcrypt ++
Dependencies.http4sClient ++ Dependencies.http4sClient ++
Dependencies.emil Dependencies.emil
).dependsOn(store) ).dependsOn(store, joexapi)
val webapp = project.in(file("modules/webapp")). val webapp = project.in(file("modules/webapp")).
disablePlugins(RevolverPlugin). disablePlugins(RevolverPlugin).

View File

@ -25,6 +25,7 @@ trait BackendApp[F[_]] {
def job: OJob[F] def job: OJob[F]
def item: OItem[F] def item: OItem[F]
def mail: OMail[F] def mail: OMail[F]
def joex: OJoex[F]
} }
object BackendApp { object BackendApp {
@ -44,9 +45,10 @@ object BackendApp {
tagImpl <- OTag[F](store) tagImpl <- OTag[F](store)
equipImpl <- OEquipment[F](store) equipImpl <- OEquipment[F](store)
orgImpl <- OOrganization(store) orgImpl <- OOrganization(store)
uploadImpl <- OUpload(store, queue, cfg, httpClientEc) joexImpl <- OJoex.create(httpClientEc, store)
uploadImpl <- OUpload(store, queue, cfg, joexImpl)
nodeImpl <- ONode(store) nodeImpl <- ONode(store)
jobImpl <- OJob(store, httpClientEc) jobImpl <- OJob(store, joexImpl)
itemImpl <- OItem(store) itemImpl <- OItem(store)
mailImpl <- OMail(store, JavaMailEmil(blocker)) mailImpl <- OMail(store, JavaMailEmil(blocker))
} yield new BackendApp[F] { } yield new BackendApp[F] {
@ -62,6 +64,7 @@ object BackendApp {
val job = jobImpl val job = jobImpl
val item = itemImpl val item = itemImpl
val mail = mailImpl val mail = mailImpl
val joex = joexImpl
} }
def apply[F[_]: ConcurrentEffect: ContextShift]( def apply[F[_]: ConcurrentEffect: ContextShift](

View File

@ -1,15 +1,13 @@
package docspell.backend.ops package docspell.backend.ops
import cats.implicits._ import cats.implicits._
import cats.effect.{ConcurrentEffect, Resource} import cats.effect._
import docspell.backend.ops.OJob.{CollectiveQueueState, JobCancelResult} import docspell.backend.ops.OJob.{CollectiveQueueState, JobCancelResult}
import docspell.common.{Ident, JobState} import docspell.common.{Ident, JobState}
import docspell.store.Store import docspell.store.Store
import docspell.store.queries.QJob import docspell.store.queries.QJob
import docspell.store.records.{RJob, RJobLog} import docspell.store.records.{RJob, RJobLog}
import scala.concurrent.ExecutionContext
trait OJob[F[_]] { trait OJob[F[_]] {
def queueState(collective: Ident, maxResults: Int): F[CollectiveQueueState] def queueState(collective: Ident, maxResults: Int): F[CollectiveQueueState]
@ -36,9 +34,9 @@ object OJob {
jobs.filter(_.job.state == JobState.Running) jobs.filter(_.job.state == JobState.Running)
} }
def apply[F[_]: ConcurrentEffect]( def apply[F[_]: Sync](
store: Store[F], store: Store[F],
clientEC: ExecutionContext joex: OJoex[F]
): Resource[F, OJob[F]] = ): Resource[F, OJob[F]] =
Resource.pure[F, OJob[F]](new OJob[F] { Resource.pure[F, OJob[F]](new OJob[F] {
@ -70,8 +68,7 @@ object OJob {
} }
def tryCancel(job: RJob, worker: Ident): F[JobCancelResult] = def tryCancel(job: RJob, worker: Ident): F[JobCancelResult] =
OJoex joex.cancelJob(job.id, worker)
.cancelJob(job.id, worker, store, clientEC)
.map(flag => if (flag) JobCancelResult.CancelRequested else JobCancelResult.JobNotFound) .map(flag => if (flag) JobCancelResult.CancelRequested else JobCancelResult.JobNotFound)
for { for {

View File

@ -1,62 +1,40 @@
package docspell.backend.ops package docspell.backend.ops
import cats.implicits._ import cats.implicits._
import cats.effect.ConcurrentEffect import cats.effect._
import docspell.common.{Ident, NodeType} import docspell.common.{Ident, NodeType}
import docspell.joexapi.client.JoexClient
import docspell.store.Store import docspell.store.Store
import docspell.store.records.RNode import docspell.store.records.RNode
import org.http4s.client.blaze.BlazeClientBuilder
import org.http4s.Method._
import org.http4s.{Request, Uri}
import scala.concurrent.ExecutionContext import scala.concurrent.ExecutionContext
import org.log4s._
trait OJoex[F[_]] {
def notifyAllNodes: F[Unit]
def cancelJob(job: Ident, worker: Ident): F[Boolean]
}
object OJoex { object OJoex {
private[this] val logger = getLogger
def notifyAll[F[_]: ConcurrentEffect]( def apply[F[_]: Sync](client: JoexClient[F], store: Store[F]): Resource[F, OJoex[F]] =
store: Store[F], Resource.pure[F, OJoex[F]](new OJoex[F] {
clientExecutionContext: ExecutionContext def notifyAllNodes: F[Unit] =
): F[Unit] = for {
for { nodes <- store.transact(RNode.findAll(NodeType.Joex))
nodes <- store.transact(RNode.findAll(NodeType.Joex)) _ <- nodes.toList.traverse(n => client.notifyJoexIgnoreErrors(n.url))
_ <- nodes.toList.traverse(notifyJoex[F](clientExecutionContext)) } yield ()
} yield ()
def cancelJob[F[_]: ConcurrentEffect]( def cancelJob(job: Ident, worker: Ident): F[Boolean] =
jobId: Ident, for {
worker: Ident, node <- store.transact(RNode.findById(worker))
store: Store[F], cancel <- node.traverse(n => client.cancelJob(n.url, job))
clientEc: ExecutionContext } yield cancel.isDefined
): F[Boolean] = })
for {
node <- store.transact(RNode.findById(worker))
cancel <- node.traverse(joexCancel(clientEc)(_, jobId))
} yield cancel.getOrElse(false)
private def joexCancel[F[_]: ConcurrentEffect]( def create[F[_]: ConcurrentEffect](ec: ExecutionContext, store: Store[F]): Resource[F, OJoex[F]] =
ec: ExecutionContext JoexClient.resource(ec).flatMap(client => apply(client, store))
)(node: RNode, job: Ident): F[Boolean] = {
val notifyUrl = node.url / "api" / "v1" / "job" / job.id / "cancel"
BlazeClientBuilder[F](ec).resource.use { client =>
val req = Request[F](POST, Uri.unsafeFromString(notifyUrl.asString))
client.expect[String](req).map(_ => true)
}
}
private def notifyJoex[F[_]: ConcurrentEffect](ec: ExecutionContext)(node: RNode): F[Unit] = {
val notifyUrl = node.url / "api" / "v1" / "notify"
val execute = BlazeClientBuilder[F](ec).resource.use { client =>
val req = Request[F](POST, Uri.unsafeFromString(notifyUrl.asString))
client.expect[String](req).map(_ => ())
}
execute.attempt.map {
case Right(_) =>
()
case Left(_) =>
logger.warn(s"Notifying Joex instance '${node.id.id}/${node.url.asString}' failed.")
()
}
}
} }

View File

@ -2,7 +2,7 @@ package docspell.backend.ops
import bitpeace.MimetypeHint import bitpeace.MimetypeHint
import cats.implicits._ import cats.implicits._
import cats.effect.{ConcurrentEffect, Effect, Resource} import cats.effect._
import docspell.backend.Config import docspell.backend.Config
import fs2.Stream import fs2.Stream
import docspell.common._ import docspell.common._
@ -12,8 +12,6 @@ import docspell.store.queue.JobQueue
import docspell.store.records.{RCollective, RJob, RSource} import docspell.store.records.{RCollective, RJob, RSource}
import org.log4s._ import org.log4s._
import scala.concurrent.ExecutionContext
trait OUpload[F[_]] { trait OUpload[F[_]] {
def submit(data: OUpload.UploadData[F], account: AccountId): F[OUpload.UploadResult] def submit(data: OUpload.UploadData[F], account: AccountId): F[OUpload.UploadResult]
@ -51,11 +49,11 @@ object OUpload {
case object NoSource extends UploadResult case object NoSource extends UploadResult
} }
def apply[F[_]: ConcurrentEffect]( def apply[F[_]: Sync](
store: Store[F], store: Store[F],
queue: JobQueue[F], queue: JobQueue[F],
cfg: Config, cfg: Config,
httpClientEC: ExecutionContext joex: OJoex[F]
): Resource[F, OUpload[F]] = ): Resource[F, OUpload[F]] =
Resource.pure[F, OUpload[F]](new OUpload[F] { Resource.pure[F, OUpload[F]](new OUpload[F] {
@ -92,7 +90,7 @@ object OUpload {
for { for {
_ <- logger.fdebug(s"Storing jobs: $jobs") _ <- logger.fdebug(s"Storing jobs: $jobs")
_ <- queue.insertAll(jobs) _ <- queue.insertAll(jobs)
_ <- OJoex.notifyAll(store, httpClientEC) _ <- joex.notifyAllNodes
} yield UploadResult.Success } yield UploadResult.Success
private def saveFile(file: File[F]): F[Option[ProcessItemArgs.File]] = private def saveFile(file: File[F]): F[Option[ProcessItemArgs.File]] =
@ -109,7 +107,7 @@ object OUpload {
}, id => Some(ProcessItemArgs.File(file.name, id)))) }, id => Some(ProcessItemArgs.File(file.name, id))))
private def checkFileList(files: Seq[ProcessItemArgs.File]): F[Either[UploadResult, Unit]] = private def checkFileList(files: Seq[ProcessItemArgs.File]): F[Either[UploadResult, Unit]] =
Effect[F].pure(if (files.isEmpty) Left(UploadResult.NoFiles) else Right(())) Sync[F].pure(if (files.isEmpty) Left(UploadResult.NoFiles) else Right(()))
private def makeJobs( private def makeJobs(
args: Vector[ProcessItemArgs], args: Vector[ProcessItemArgs],

View File

@ -0,0 +1,28 @@
package docspell.common
import scodec.bits.ByteVector
import java.nio.charset.StandardCharsets
/** Collects bytes and renders digests over everything collected so far.
  *
  * Instances are immutable: every `add` yields a new `Hash` whose content
  * is the previous bytes directly followed by the added value. No separator
  * is inserted between added values.
  */
final class Hash(bytes: ByteVector) {

  /** Hex rendering (lower-cased) of the digest produced by the named algorithm. */
  private def digest(name: String): String = {
    val hashed = bytes.digest(name)
    hashed.toHex.toLowerCase
  }

  /** SHA-256 digest of the collected bytes as a hex string. */
  def sha256: String = digest("SHA-256")

  /** MD5 digest of the collected bytes as a hex string. */
  def md5: String = digest("MD5")

  /** Appends the UTF-8 encoding of `str`. */
  def add(str: String): Hash = {
    val utf8 = ByteVector.view(str.getBytes(StandardCharsets.UTF_8))
    new Hash(bytes ++ utf8)
  }

  /** Appends the string form of the given identifier. */
  def add(id: Ident): Hash = add(id.id)
}

object Hash {

  /** A hash over no bytes at all; the starting point for `add`. */
  def empty: Hash = new Hash(ByteVector.empty)
}

View File

@ -4,6 +4,8 @@ import java.time.{Instant, LocalDate, ZoneId}
import cats.effect.Sync import cats.effect.Sync
import io.circe.{Decoder, Encoder} import io.circe.{Decoder, Encoder}
import java.time.LocalDateTime
import java.time.ZonedDateTime
case class Timestamp(value: Instant) { case class Timestamp(value: Instant) {
@ -14,22 +16,40 @@ case class Timestamp(value: Instant) {
def minus(d: Duration): Timestamp = def minus(d: Duration): Timestamp =
Timestamp(value.minusNanos(d.nanos)) Timestamp(value.minusNanos(d.nanos))
def - (d: Duration): Timestamp =
minus(d)
def minusHours(n: Long): Timestamp = def minusHours(n: Long): Timestamp =
Timestamp(value.minusSeconds(n * 60 * 60)) Timestamp(value.minusSeconds(n * 60 * 60))
def toDate: LocalDate = def toUtcDate: LocalDate =
value.atZone(ZoneId.of("UTC")).toLocalDate value.atZone(Timestamp.UTC).toLocalDate
def toUtcDateTime: LocalDateTime =
value.atZone(Timestamp.UTC).toLocalDateTime
def atZone(zone: ZoneId): ZonedDateTime =
value.atZone(zone)
def atUTC: ZonedDateTime = atZone(Timestamp.UTC)
def asString: String = value.toString def asString: String = value.toString
def < (other: Timestamp): Boolean =
this.value.isBefore(other.value)
} }
object Timestamp { object Timestamp {
val UTC = ZoneId.of("UTC")
val Epoch = Timestamp(Instant.EPOCH) val Epoch = Timestamp(Instant.EPOCH)
def current[F[_]: Sync]: F[Timestamp] = def current[F[_]: Sync]: F[Timestamp] =
Sync[F].delay(Timestamp(Instant.now)) Sync[F].delay(Timestamp(Instant.now))
def from(zd: ZonedDateTime): Timestamp =
Timestamp(zd.toInstant)
implicit val encodeTimestamp: Encoder[Timestamp] = implicit val encodeTimestamp: Encoder[Timestamp] =
BaseJsonCodecs.encodeInstantEpoch.contramap(_.value) BaseJsonCodecs.encodeInstantEpoch.contramap(_.value)

View File

@ -4,6 +4,7 @@ import docspell.common._
import _root_.pureconfig._ import _root_.pureconfig._
import _root_.pureconfig.error.{CannotConvert, FailureReason} import _root_.pureconfig.error.{CannotConvert, FailureReason}
import scodec.bits.ByteVector import scodec.bits.ByteVector
import com.github.eikek.calev.CalEvent
import scala.reflect.ClassTag import scala.reflect.ClassTag
@ -31,6 +32,10 @@ object Implicits {
else ByteVector.encodeUtf8(str).left.map(ex => s"Invalid utf8 string: ${ex.getMessage}") else ByteVector.encodeUtf8(str).left.map(ex => s"Invalid utf8 string: ${ex.getMessage}")
}) })
implicit val caleventReader: ConfigReader[CalEvent] =
ConfigReader[String].emap(reason(CalEvent.parse))
def reason[A: ClassTag](f: String => Either[String, A]): String => Either[FailureReason, A] = def reason[A: ClassTag](f: String => Either[String, A]): String => Either[FailureReason, A] =
in => in =>
f(in).left.map(str => CannotConvert(in, implicitly[ClassTag[A]].runtimeClass.toString, str)) f(in).left.map(str => CannotConvert(in, implicitly[ClassTag[A]].runtimeClass.toString, str))

View File

@ -64,6 +64,58 @@ docspell.joex {
wakeup-period = "30 minutes" wakeup-period = "30 minutes"
} }
periodic-scheduler {
# Each scheduler needs a unique name. This defaults to the node
# name, which must be unique, too.
name = ${docspell.joex.app-id}
# A fallback to start looking for due periodic tasks regularly.
# Usually joex instances should be notified via REST calls if
# external processes change tasks. But these requests may get
# lost.
wakeup-period = "10 minutes"
}
# Docspell uses periodic house keeping tasks, like cleaning expired
# invites, that can be configured here.
house-keeping {
# When the house keeping tasks execute. Default is to run every
# week.
schedule = "Sun *-*-* 00:00:00"
# This task removes invitation keys that have been created but not
# used. The timespan here must be greater than the `invite-time'
# setting in the rest server config file.
cleanup-invites = {
# Whether this task is enabled.
enabled = true
# The minimum age of invites to be deleted.
older-than = "30 days"
}
# Jobs store their log output in the database. Normally this data
# is only interesting for some period of time. The processing logs
# of old files can be removed eventually.
cleanup-jobs = {
# Whether this task is enabled.
enabled = true
# The minimum age of jobs to delete. It is matched against the
# `finished' timestamp.
older-than = "30 days"
# This defines how many jobs are deleted in one transaction.
# Since the data to delete may get large, it can be configured
# whether more or less memory should be used.
delete-batch = "100"
}
}
# Configuration of text extraction # Configuration of text extraction
extraction { extraction {
# For PDF files it is first tried to read the text parts of the # For PDF files it is first tried to read the text parts of the

View File

@ -1,10 +1,11 @@
package docspell.joex package docspell.joex
import docspell.common.{Ident, LenientUri} import docspell.common.{Ident, LenientUri}
import docspell.joex.scheduler.SchedulerConfig import docspell.joex.scheduler.{PeriodicSchedulerConfig, SchedulerConfig}
import docspell.store.JdbcConfig import docspell.store.JdbcConfig
import docspell.convert.ConvertConfig import docspell.convert.ConvertConfig
import docspell.extract.ExtractConfig import docspell.extract.ExtractConfig
import docspell.joex.hk.HouseKeepingConfig
case class Config( case class Config(
appId: Ident, appId: Ident,
@ -12,6 +13,8 @@ case class Config(
bind: Config.Bind, bind: Config.Bind,
jdbc: JdbcConfig, jdbc: JdbcConfig,
scheduler: SchedulerConfig, scheduler: SchedulerConfig,
periodicScheduler: PeriodicSchedulerConfig,
houseKeeping: HouseKeepingConfig,
extraction: ExtractConfig, extraction: ExtractConfig,
convert: ConvertConfig convert: ConvertConfig
) )

View File

@ -1,7 +1,7 @@
package docspell.joex package docspell.joex
import docspell.common.Ident import docspell.common.Ident
import docspell.joex.scheduler.Scheduler import docspell.joex.scheduler.{PeriodicScheduler, Scheduler}
import docspell.store.records.RJobLog import docspell.store.records.RJobLog
trait JoexApp[F[_]] { trait JoexApp[F[_]] {
@ -10,6 +10,8 @@ trait JoexApp[F[_]] {
def scheduler: Scheduler[F] def scheduler: Scheduler[F]
def periodicScheduler: PeriodicScheduler[F]
def findLogs(jobId: Ident): F[Vector[RJobLog]] def findLogs(jobId: Ident): F[Vector[RJobLog]]
/** Shuts down the job executor. /** Shuts down the job executor.

View File

@ -3,9 +3,12 @@ package docspell.joex
import cats.implicits._ import cats.implicits._
import cats.effect._ import cats.effect._
import docspell.common.{Ident, NodeType, ProcessItemArgs} import docspell.common.{Ident, NodeType, ProcessItemArgs}
import docspell.joex.hk._
import docspell.joex.process.ItemHandler import docspell.joex.process.ItemHandler
import docspell.joex.scheduler.{JobTask, Scheduler, SchedulerBuilder} import docspell.joex.scheduler._
import docspell.joexapi.client.JoexClient
import docspell.store.Store import docspell.store.Store
import docspell.store.queue._
import docspell.store.ops.ONode import docspell.store.ops.ONode
import docspell.store.records.RJobLog import docspell.store.records.RJobLog
import fs2.concurrent.SignallingRef import fs2.concurrent.SignallingRef
@ -16,15 +19,21 @@ final class JoexAppImpl[F[_]: ConcurrentEffect: ContextShift: Timer](
cfg: Config, cfg: Config,
nodeOps: ONode[F], nodeOps: ONode[F],
store: Store[F], store: Store[F],
pstore: PeriodicTaskStore[F],
termSignal: SignallingRef[F, Boolean], termSignal: SignallingRef[F, Boolean],
val scheduler: Scheduler[F] val scheduler: Scheduler[F],
val periodicScheduler: PeriodicScheduler[F]
) extends JoexApp[F] { ) extends JoexApp[F] {
def init: F[Unit] = { def init: F[Unit] = {
val run = scheduler.start.compile.drain val run = scheduler.start.compile.drain
val prun = periodicScheduler.start.compile.drain
for { for {
_ <- scheduleBackgroundTasks
_ <- ConcurrentEffect[F].start(run) _ <- ConcurrentEffect[F].start(run)
_ <- ConcurrentEffect[F].start(prun)
_ <- scheduler.periodicAwake _ <- scheduler.periodicAwake
_ <- periodicScheduler.periodicAwake
_ <- nodeOps.register(cfg.appId, NodeType.Joex, cfg.baseUrl) _ <- nodeOps.register(cfg.appId, NodeType.Joex, cfg.baseUrl)
} yield () } yield ()
} }
@ -36,8 +45,10 @@ final class JoexAppImpl[F[_]: ConcurrentEffect: ContextShift: Timer](
nodeOps.unregister(cfg.appId) nodeOps.unregister(cfg.appId)
def initShutdown: F[Unit] = def initShutdown: F[Unit] =
scheduler.shutdown(false) *> termSignal.set(true) periodicScheduler.shutdown *> scheduler.shutdown(false) *> termSignal.set(true)
private def scheduleBackgroundTasks: F[Unit] =
HouseKeepingTask.periodicTask[F](cfg.houseKeeping.schedule).flatMap(pstore.insert)
} }
object JoexAppImpl { object JoexAppImpl {
@ -46,12 +57,17 @@ object JoexAppImpl {
cfg: Config, cfg: Config,
termSignal: SignallingRef[F, Boolean], termSignal: SignallingRef[F, Boolean],
connectEC: ExecutionContext, connectEC: ExecutionContext,
clientEC: ExecutionContext,
blocker: Blocker blocker: Blocker
): Resource[F, JoexApp[F]] = ): Resource[F, JoexApp[F]] =
for { for {
client <- JoexClient.resource(clientEC)
store <- Store.create(cfg.jdbc, connectEC, blocker) store <- Store.create(cfg.jdbc, connectEC, blocker)
queue <- JobQueue(store)
pstore <- PeriodicTaskStore.create(store)
nodeOps <- ONode(store) nodeOps <- ONode(store)
sch <- SchedulerBuilder(cfg.scheduler, blocker, store) sch <- SchedulerBuilder(cfg.scheduler, blocker, store)
.withQueue(queue)
.withTask( .withTask(
JobTask.json( JobTask.json(
ProcessItemArgs.taskName, ProcessItemArgs.taskName,
@ -59,8 +75,23 @@ object JoexAppImpl {
ItemHandler.onCancel[F] ItemHandler.onCancel[F]
) )
) )
.withTask(
JobTask.json(
HouseKeepingTask.taskName,
HouseKeepingTask[F](cfg),
HouseKeepingTask.onCancel[F]
)
)
.resource .resource
app = new JoexAppImpl(cfg, nodeOps, store, termSignal, sch) psch <- PeriodicScheduler.create(
cfg.periodicScheduler,
sch,
queue,
pstore,
client,
Timer[F]
)
app = new JoexAppImpl(cfg, nodeOps, store, pstore, termSignal, sch, psch)
appR <- Resource.make(app.init.map(_ => app))(_.shutdown) appR <- Resource.make(app.init.map(_ => app))(_.shutdown)
} yield appR } yield appR
} }

View File

@ -24,13 +24,14 @@ object JoexServer {
def stream[F[_]: ConcurrentEffect: ContextShift]( def stream[F[_]: ConcurrentEffect: ContextShift](
cfg: Config, cfg: Config,
connectEC: ExecutionContext, connectEC: ExecutionContext,
clientEC: ExecutionContext,
blocker: Blocker blocker: Blocker
)(implicit T: Timer[F]): Stream[F, Nothing] = { )(implicit T: Timer[F]): Stream[F, Nothing] = {
val app = for { val app = for {
signal <- Resource.liftF(SignallingRef[F, Boolean](false)) signal <- Resource.liftF(SignallingRef[F, Boolean](false))
exitCode <- Resource.liftF(Ref[F].of(ExitCode.Success)) exitCode <- Resource.liftF(Ref[F].of(ExitCode.Success))
joexApp <- JoexAppImpl.create[F](cfg, signal, connectEC, blocker) joexApp <- JoexAppImpl.create[F](cfg, signal, connectEC, clientEC, blocker)
httpApp = Router( httpApp = Router(
"/api/info" -> InfoRoutes(), "/api/info" -> InfoRoutes(),

View File

@ -13,10 +13,10 @@ import org.log4s._
object Main extends IOApp { object Main extends IOApp {
private[this] val logger = getLogger private[this] val logger = getLogger
val blockingEc: ExecutionContext = ExecutionContext.fromExecutor( val blockingEC: ExecutionContext = ExecutionContext.fromExecutor(
Executors.newCachedThreadPool(ThreadFactories.ofName("docspell-joex-blocking")) Executors.newCachedThreadPool(ThreadFactories.ofName("docspell-joex-blocking"))
) )
val blocker = Blocker.liftExecutionContext(blockingEc) val blocker = Blocker.liftExecutionContext(blockingEC)
val connectEC: ExecutionContext = ExecutionContext.fromExecutorService( val connectEC: ExecutionContext = ExecutionContext.fromExecutorService(
Executors.newFixedThreadPool(5, ThreadFactories.ofName("docspell-joex-dbconnect")) Executors.newFixedThreadPool(5, ThreadFactories.ofName("docspell-joex-dbconnect"))
) )
@ -52,6 +52,6 @@ object Main extends IOApp {
cfg.baseUrl cfg.baseUrl
) )
logger.info(s"\n${banner.render("***>")}") logger.info(s"\n${banner.render("***>")}")
JoexServer.stream[IO](cfg, connectEC, blocker).compile.drain.as(ExitCode.Success) JoexServer.stream[IO](cfg, connectEC, blockingEC, blocker).compile.drain.as(ExitCode.Success)
} }
} }

View File

@ -0,0 +1,26 @@
package docspell.joex.hk
import cats.implicits._
import cats.effect._
import docspell.common._
import docspell.joex.scheduler.Task
import docspell.store.records._
/** House-keeping step that deletes invitations older than the configured age. */
object CleanupInvitesTask {

  /** Builds the task. When disabled in `cfg`, the task only logs that fact;
    * otherwise it deletes all invitations older than `now - cfg.olderThan`
    * and logs how many were removed.
    */
  def apply[F[_]: Sync](cfg: HouseKeepingConfig.CleanupInvites): Task[F, Unit, Unit] =
    Task { ctx =>
      if (!cfg.enabled)
        ctx.logger.info("CleanupInvites task is disabled in the configuration")
      else
        Timestamp.current[F].flatMap { now =>
          val ts = now - cfg.olderThan
          ctx.logger
            .info(s"Cleanup invitations older than $ts")
            .flatMap(_ => ctx.store.transact(RInvitation.deleteOlderThan(ts)))
            .flatMap(n => ctx.logger.info(s"Removed $n invitations"))
        }
    }
}

View File

@ -0,0 +1,36 @@
package docspell.joex.hk
import cats.implicits._
import cats.effect._
import fs2.Stream
import docspell.common._
import docspell.joex.scheduler.Task
import docspell.store.Store
import docspell.store.records._
/** House-keeping step that removes done jobs older than the configured age. */
object CleanupJobsTask {

  /** Builds the task. When disabled in `cfg`, the task only logs that fact;
    * otherwise it deletes jobs older than `now - cfg.olderThan` in batches
    * of `cfg.deleteBatch` and logs the total number removed.
    */
  def apply[F[_]: Sync](cfg: HouseKeepingConfig.CleanupJobs): Task[F, Unit, Unit] =
    Task { ctx =>
      if (!cfg.enabled)
        ctx.logger.info("CleanupJobs task is disabled in the configuration")
      else
        Timestamp.current[F].flatMap { now =>
          val ts = now - cfg.olderThan
          ctx.logger
            .info(s"Cleanup jobs older than $ts")
            .flatMap(_ => deleteDoneJobs(ctx.store, ts, cfg.deleteBatch))
            .flatMap(n => ctx.logger.info(s"Removed $n jobs"))
        }
    }

  /** Repeats the batched delete, one transaction per batch, until a batch
    * reports no deleted rows; returns the sum of all reported counts.
    */
  def deleteDoneJobs[F[_]: Sync](store: Store[F], ts: Timestamp, batch: Int): F[Int] = {
    val deleteBatch = store.transact(RJob.deleteDoneAndOlderThan(ts, batch))
    Stream
      .repeatEval(deleteBatch)
      .takeWhile(_ > 0)
      .compile
      .foldMonoid
  }
}

View File

@ -0,0 +1,20 @@
package docspell.joex.hk
import com.github.eikek.calev.CalEvent
import docspell.common._
import HouseKeepingConfig._
/** Settings of the periodic house-keeping job.
 *
 * `schedule` is the calendar event handed to `HouseKeepingTask.periodicTask`
 * when the job is registered; the two nested configs control the individual
 * cleanup steps.
 */
case class HouseKeepingConfig(
schedule: CalEvent,
cleanupInvites: CleanupInvites,
cleanupJobs: CleanupJobs
)
object HouseKeepingConfig {
// `olderThan` is subtracted from the current time by CleanupInvitesTask to
// get the cut-off passed to the delete query; `enabled = false` makes the
// task only log that it is disabled.
case class CleanupInvites(enabled: Boolean, olderThan: Duration)
// Same as above for jobs; `deleteBatch` is forwarded by CleanupJobsTask as
// the batch argument of each delete transaction.
case class CleanupJobs(enabled: Boolean, olderThan: Duration, deleteBatch: Int)
}

View File

@ -0,0 +1,39 @@
package docspell.joex.hk
import cats.implicits._
import cats.effect._
import com.github.eikek.calev._
import docspell.common._
import docspell.joex.Config
import docspell.joex.scheduler.Task
import docspell.store.records._
/** The house-keeping job: its identifiers, the task that is executed and
  * the periodic-task record used to schedule it.
  */
object HouseKeepingTask {
  // Fixed id assigned to the record created by `periodicTask`.
  private val periodicId = Ident.unsafe("docspell-houskeeping")

  val systemGroup: Ident = Ident.unsafe("docspell-system")
  val taskName: Ident    = Ident.unsafe("housekeeping")

  /** Logs a start message, then runs the invite cleanup followed by the job cleanup. */
  def apply[F[_]: Sync](cfg: Config): Task[F, Unit, Unit] = {
    val hk = cfg.houseKeeping
    Task
      .log[F](_.info("Running house-keeping task now"))
      .flatMap(_ => CleanupInvitesTask(hk.cleanupInvites))
      .flatMap(_ => CleanupJobsTask(hk.cleanupJobs))
  }

  /** Task run on cancellation; it only logs a warning. */
  def onCancel[F[_]: Sync]: Task[F, Unit, Unit] =
    Task.log(_.warn("Cancelling house-keeping task"))

  /** Creates the periodic-task record for the given schedule and replaces
    * its id with the fixed `periodicId`.
    */
  def periodicTask[F[_]: Sync](ce: CalEvent): F[RPeriodicTask] = {
    val subject = "Docspell house-keeping"
    RPeriodicTask
      .createJson(true, taskName, systemGroup, (), subject, systemGroup, Priority.Low, ce)
      .map(record => record.copy(id = periodicId))
  }
}

View File

@ -19,7 +19,8 @@ object JoexRoutes {
case POST -> Root / "notify" => case POST -> Root / "notify" =>
for { for {
_ <- app.scheduler.notifyChange _ <- app.scheduler.notifyChange
resp <- Ok(BasicResult(true, "Scheduler notified.")) _ <- app.periodicScheduler.notifyChange
resp <- Ok(BasicResult(true, "Schedulers notified."))
} yield resp } yield resp
case GET -> Root / "running" => case GET -> Root / "running" =>

View File

@ -0,0 +1,49 @@
package docspell.joex.scheduler
import fs2._
import fs2.concurrent.SignallingRef
import cats.effect._
import docspell.joexapi.client.JoexClient
import docspell.store.queue._
/** A periodic scheduler takes care to submit periodic tasks to the
 * job queue.
 *
 * It is run in the background to regularly find a periodic task to
 * execute. If the task is due, it will be submitted into the job
 * queue where it will be picked up by the scheduler from some joex
 * instance. If it is due in the future, a notification is scheduled
 * to be received at that time so the task can be looked up again.
 */
trait PeriodicScheduler[F[_]] {
/** The configuration this scheduler was created with. */
def config: PeriodicSchedulerConfig
/** The scheduler's main loop as a stream; it must be run (compiled and
 * drained) by the caller for the scheduler to do any work.
 */
def start: Stream[F, Nothing]
/** Requests the main loop to stop. */
def shutdown: F[Unit]
/** Starts a background fiber that calls `notifyChange` at a fixed
 * interval (the configured wakeup period in `PeriodicSchedulerImpl`)
 * and returns that fiber.
 */
def periodicAwake: F[Fiber[F, Unit]]
/** Wakes the main loop so it looks for the next periodic task again. */
def notifyChange: F[Unit]
}
object PeriodicScheduler {

  /** Creates a periodic scheduler.
    *
    * Allocates the two signals the implementation needs (the wake-up
    * toggle, initially `true`, and the empty state), builds the
    * implementation and runs its `init` before handing it out. The
    * returned scheduler is not started.
    */
  def create[F[_]: ConcurrentEffect: ContextShift](
      cfg: PeriodicSchedulerConfig,
      sch: Scheduler[F],
      queue: JobQueue[F],
      store: PeriodicTaskStore[F],
      client: JoexClient[F],
      timer: Timer[F]
  ): Resource[F, PeriodicScheduler[F]] =
    Resource.liftF(SignallingRef[F, Boolean](true)).flatMap { wakeUp =>
      Resource
        .liftF(SignallingRef[F, PeriodicSchedulerImpl.State[F]](PeriodicSchedulerImpl.emptyState[F]))
        .flatMap { stateRef =>
          val impl =
            new PeriodicSchedulerImpl[F](cfg, sch, queue, store, client, wakeUp, stateRef, timer)
          Resource.liftF(impl.init).map(_ => (impl: PeriodicScheduler[F]))
        }
    }
}

View File

@ -0,0 +1,8 @@
package docspell.joex.scheduler
import docspell.common._
/** Configuration of the periodic scheduler.
 *
 * `name` identifies this scheduler towards the periodic task store (it is
 * passed to `clearMarks` and `takeNext`); `wakeupPeriod` is the interval of
 * the fallback wake-up started by `periodicAwake`.
 */
case class PeriodicSchedulerConfig(
name: Ident,
wakeupPeriod: Duration
)

View File

@ -0,0 +1,184 @@
package docspell.joex.scheduler
import fs2._
import fs2.concurrent.SignallingRef
import cats.effect._
import cats.implicits._
import org.log4s.getLogger
import com.github.eikek.fs2calev._
import docspell.common._
import docspell.common.syntax.all._
import docspell.joexapi.client.JoexClient
import docspell.store.queue._
import docspell.store.records.RPeriodicTask
import PeriodicSchedulerImpl.State
/** Implementation of [[PeriodicScheduler]].
 *
 * The main loop takes the next periodic task from the store. A due task is
 * turned into a job and inserted into the queue; for a task that is not yet
 * due, a background fiber is started that triggers `notifyChange` once the
 * task's timer elapses. Between iterations the loop waits on the `waiter`
 * signal, which is toggled by `notifyChange`.
 *
 * `state` holds the shutdown flag and the fiber of the currently scheduled
 * notification, if any.
 */
final class PeriodicSchedulerImpl[F[_]: ConcurrentEffect: ContextShift](
val config: PeriodicSchedulerConfig,
sch: Scheduler[F],
queue: JobQueue[F],
store: PeriodicTaskStore[F],
client: JoexClient[F],
waiter: SignallingRef[F, Boolean],
state: SignallingRef[F, State[F]],
timer: Timer[F]
) extends PeriodicScheduler[F] {
private[this] val logger = getLogger
// Re-exposes the constructor argument as an implicit so that the timer-based
// calls below (awakeEvery, CalevFs2.sleep) can pick it up.
implicit private val _timer: Timer[F] = timer
/** Logs a start message and then runs the main loop. */
def start: Stream[F, Nothing] =
logger.sinfo("Starting periodic scheduler") ++
mainLoop
/** Sets the shutdown flag. The flag is read at the beginning of each
 * main-loop iteration; this method does not wake a waiting loop itself.
 */
def shutdown: F[Unit] =
state.modify(_.requestShutdown)
/** Starts a fiber that calls `notifyChange` every `config.wakeupPeriod`. */
def periodicAwake: F[Fiber[F, Unit]] =
ConcurrentEffect[F].start(
Stream
.awakeEvery[F](config.wakeupPeriod.toScala)
.evalMap(_ => logger.fdebug("Periodic awake reached") *> notifyChange)
.compile
.drain
)
/** Flips the boolean held by `waiter`; the value itself carries no meaning,
 * only the change is observed by the main loop.
 */
def notifyChange: F[Unit] =
waiter.update(b => !b)
// internal
/**
 * On startup, get all periodic jobs from this scheduler and remove
 * the mark, so they get picked up again.
 *
 * Errors are logged and not propagated.
 */
def init: F[Unit] =
logError("Error clearing marks")(store.clearMarks(config.name))
/** One iteration: unless shutdown was requested, cancel a previously
 * scheduled notification, run `body` and then either loop immediately
 * (`body` returned true) or wait for a notification before looping.
 */
def mainLoop: Stream[F, Nothing] = {
// Yields true when the loop should run again right away, false when it
// should wait for the next notification.
val body: F[Boolean] =
for {
_ <- logger.fdebug(s"Going into main loop")
now <- Timestamp.current[F]
_ <- logger.fdebug(s"Looking for next periodic task")
go <- logThrow("Error getting next task")(
store
.takeNext(config.name, None)
.use({
case Marked.Found(pj) =>
logger
.fdebug(s"Found periodic task '${pj.subject}/${pj.timer.asString}'") *>
(if (isTriggered(pj, now)) submitJob(pj)
else scheduleNotify(pj).map(_ => false))
case Marked.NotFound =>
logger.fdebug("No periodic task found") *> false.pure[F]
case Marked.NotMarkable =>
logger.fdebug("Periodic job cannot be marked. Trying again.") *> true
.pure[F]
})
)
} yield go
Stream
.eval(state.get.map(_.shutdownRequest))
.evalTap(
if (_) logger.finfo[F]("Stopping main loop due to shutdown request.")
else ().pure[F]
)
// On shutdown the stream ends here, so the recursion below is not reached.
.flatMap(if (_) Stream.empty else Stream.eval(cancelNotify *> body))
.flatMap({
case true =>
mainLoop
case false =>
// NOTE(review): presumably `discrete` emits the current value first, so
// `take(2)` completes after one subsequent change - confirm against fs2.
logger.sdebug(s"Waiting for notify") ++
waiter.discrete.take(2).drain ++
logger.sdebug(s"Notify signal, going into main loop") ++
mainLoop
})
}
/** A task is due when its next run time lies strictly before `now`. */
def isTriggered(pj: RPeriodicTask, now: Timestamp): Boolean =
pj.nextrun < now
/** Inserts a job for the periodic task and notifies the joex nodes, unless
 * a job of this task in a non-final state already exists; in that case
 * only a notification for the task's timer is scheduled.
 *
 * Returns true when a job was submitted, false otherwise.
 */
def submitJob(pj: RPeriodicTask): F[Boolean] =
store
.findNonFinalJob(pj.id)
.flatMap({
case Some(job) =>
logger.finfo[F](
s"There is already a job with non-final state '${job.state}' in the queue"
) *> scheduleNotify(pj) *> false.pure[F]
case None =>
logger.finfo[F](s"Submitting job for periodic task '${pj.task.id}'") *>
pj.toJob.flatMap(queue.insert) *> notifyJoex *> true.pure[F]
})
/** Notifies the local scheduler and then every joex node found in the
 * store, using the client's error-ignoring notify call.
 */
def notifyJoex: F[Unit] =
sch.notifyChange *> store.findJoexNodes.flatMap(
_.traverse(n => client.notifyJoexIgnoreErrors(n.url)).map(_ => ())
)
/** Starts a fiber that sleeps according to the task's timer and then calls
 * `notifyChange`; the fiber is stored in `state` so `cancelNotify` can
 * cancel it.
 */
def scheduleNotify(pj: RPeriodicTask): F[Unit] =
Timestamp
.current[F]
.flatMap(now =>
logger.fdebug(
s"Scheduling next notify for timer ${pj.timer.asString} -> ${pj.timer.nextElapse(now.toUtcDateTime)}"
)
) *>
ConcurrentEffect[F]
.start(
CalevFs2
.sleep[F](pj.timer)
.evalMap(_ => notifyChange)
.compile
.drain
)
.flatMap(fb => state.modify(_.setNotify(fb)))
/** Removes the stored notification fiber from `state` and cancels it, if
 * there is one.
 */
def cancelNotify: F[Unit] =
state
.modify(_.clearNotify)
.flatMap({
case Some(fb) =>
fb.cancel
case None =>
().pure[F]
})
// Runs `fa`; a failure is logged with `msg` and then swallowed.
private def logError(msg: => String)(fa: F[Unit]): F[Unit] =
fa.attempt.flatMap {
case Right(_) => ().pure[F]
case Left(ex) => logger.ferror(ex)(msg).map(_ => ())
}
// Runs `fa`; a failure is logged with `msg` and then raised again.
private def logThrow[A](msg: => String)(fa: F[A]): F[A] =
fa.attempt
.flatMap({
case r @ Right(_) => (r: Either[Throwable, A]).pure[F]
case l @ Left(ex) => logger.ferror(ex)(msg).map(_ => (l: Either[Throwable, A]))
})
.rethrow
}
object PeriodicSchedulerImpl {

  /** Mutable-by-replacement state of the periodic scheduler.
    *
    * Each transition returns the pair expected by `Ref.modify`: the new
    * state and the value handed back to the caller.
    *
    * @param shutdownRequest whether a shutdown has been requested
    * @param scheduledNotify the fiber of the pending notification, if any
    */
  case class State[F[_]](
      shutdownRequest: Boolean,
      scheduledNotify: Option[Fiber[F, Unit]]
  ) {

    /** Marks the state as shutting down. */
    def requestShutdown: (State[F], Unit) = {
      val next = copy(shutdownRequest = true)
      (next, ())
    }

    /** Remembers `fb` as the pending notification, replacing any previous one. */
    def setNotify(fb: Fiber[F, Unit]): (State[F], Unit) = {
      val next = copy(scheduledNotify = Some(fb))
      (next, ())
    }

    /** Forgets the pending notification and returns the one that was stored. */
    def clearNotify: (State[F], Option[Fiber[F, Unit]]) = {
      val previous = scheduledNotify
      (copy(scheduledNotify = None), previous)
    }
  }

  /** Initial state: no shutdown requested, no notification scheduled. */
  def emptyState[F[_]]: State[F] =
    State[F](shutdownRequest = false, scheduledNotify = None)
}

View File

@ -34,6 +34,9 @@ case class SchedulerBuilder[F[_]: ConcurrentEffect: ContextShift](
def withLogSink(sink: LogSink[F]): SchedulerBuilder[F] = def withLogSink(sink: LogSink[F]): SchedulerBuilder[F] =
copy(logSink = sink) copy(logSink = sink)
/** Returns a copy of this builder that uses the given, already
  * existing job queue (lifted into a `Resource` via `Resource.pure`).
  */
def withQueue(queue: JobQueue[F]): SchedulerBuilder[F] = {
  val provided = Resource.pure[F, JobQueue[F]](queue)
  copy(queue = provided)
}
def serve: Resource[F, Scheduler[F]] = def serve: Resource[F, Scheduler[F]] =
resource.evalMap(sch => ConcurrentEffect[F].start(sch.start.compile.drain).map(_ => sch)) resource.evalMap(sch => ConcurrentEffect[F].start(sch.start.compile.drain).map(_ => sch))

View File

@ -4,6 +4,7 @@ import cats.implicits._
import cats.{Applicative, ApplicativeError, FlatMap, Functor} import cats.{Applicative, ApplicativeError, FlatMap, Functor}
import cats.data.Kleisli import cats.data.Kleisli
import cats.effect.Sync import cats.effect.Sync
import docspell.common.Logger
/** /**
* The code that is executed by the scheduler * The code that is executed by the scheduler
@ -51,4 +52,7 @@ object Task {
def setProgress[F[_]: Sync, A, B](n: Int)(data: B): Task[F, A, B] = def setProgress[F[_]: Sync, A, B](n: Int)(data: B): Task[F, A, B] =
Task(_.setProgress(n).map(_ => data)) Task(_.setProgress(n).map(_ => data))
/** Creates a task that hands the logger of its context to `f` and
  * runs the effect returned by `f`.
  */
def log[F[_]](f: Logger[F] => F[Unit]): Task[F, Unit, Unit] =
Task(ctx => f(ctx.logger))
} }

View File

@ -0,0 +1,59 @@
package docspell.joexapi.client
import cats.implicits._
import cats.effect._
import docspell.common.{Ident, LenientUri}
import docspell.common.syntax.all._
import org.http4s.{Method, Request, Uri}
import org.http4s.client.Client
import org.http4s.client.blaze.BlazeClientBuilder
import scala.concurrent.ExecutionContext
import org.log4s.getLogger
/** A client for calling the REST api of a joex node.
  *
  * Every method takes the base url of the joex node to talk to.
  */
trait JoexClient[F[_]] {
/** Sends a POST request to `api/v1/notify` below `base`. */
def notifyJoex(base: LenientUri): F[Unit]
/** Same as `notifyJoex`, but a failing request is only logged as a
  * warning and does not fail the resulting effect.
  */
def notifyJoexIgnoreErrors(base: LenientUri): F[Unit]
/** Sends a POST request to `api/v1/job/{job}/cancel` below `base`. */
def cancelJob(base: LenientUri, job: Ident): F[Unit]
}
object JoexClient {
  private[this] val logger = getLogger

  /** Creates a `JoexClient` on top of the given http client.
    *
    * Converting the lenient url into a http4s `Uri` and logging are
    * suspended in `F`. A url that http4s cannot parse therefore results
    * in a failed effect instead of an exception thrown while the effect
    * is being constructed, which allows `notifyJoexIgnoreErrors` to
    * really ignore it.
    */
  def apply[F[_]: Sync](client: Client[F]): JoexClient[F] =
    new JoexClient[F] {

      /** POST to `api/v1/notify`; the response body is read and discarded. */
      def notifyJoex(base: LenientUri): F[Unit] = {
        val notifyUrl = base / "api" / "v1" / "notify"
        for {
          target <- uri(notifyUrl)
          _      <- logger.fdebug[F](s"Notify joex at ${notifyUrl.asString}")
          _      <- client.expect[String](Request[F](Method.POST, target))
        } yield ()
      }

      /** Like `notifyJoex`, but failures are logged as warning and then
        * swallowed, so the resulting effect succeeds.
        */
      def notifyJoexIgnoreErrors(base: LenientUri): F[Unit] =
        notifyJoex(base).handleErrorWith { ex =>
          // suspend the log call so it is part of the effect
          Sync[F].delay(
            logger.warn(
              s"Notifying Joex instance '${base.asString}' failed: ${ex.getMessage}"
            )
          )
        }

      /** POST to `api/v1/job/{id}/cancel`; the response body is read and discarded. */
      def cancelJob(base: LenientUri, job: Ident): F[Unit] = {
        val cancelUrl = base / "api" / "v1" / "job" / job.id / "cancel"
        for {
          target <- uri(cancelUrl)
          _      <- client.expect[String](Request[F](Method.POST, target))
        } yield ()
      }

      // `Uri.unsafeFromString` throws on invalid input, hence the `delay`.
      private def uri(u: LenientUri): F[Uri] =
        Sync[F].delay(Uri.unsafeFromString(u.asString))
    }

  /** Creates a `JoexClient` backed by a blaze http client that runs on
    * the given execution context.
    */
  def resource[F[_]: ConcurrentEffect](ec: ExecutionContext): Resource[F, JoexClient[F]] =
    BlazeClientBuilder[F](ec).resource.map(apply[F])
}

View File

@ -16,3 +16,4 @@ title: ADRs
- [0009 Convert Office Files](adr/0009_convert_office_docs) - [0009 Convert Office Files](adr/0009_convert_office_docs)
- [0010 Convert Image Files](adr/0010_convert_image_files) - [0010 Convert Image Files](adr/0010_convert_image_files)
- [0011 Extract Text](adr/0011_extract_text) - [0011 Extract Text](adr/0011_extract_text)
- [0012 Periodic Tasks](adr/0012_periodic_tasks)

View File

@ -0,0 +1,102 @@
---
layout: docs
title: Periodic Tasks
---
# Periodic Tasks
## Context and Problem Statement
Currently there is a `Scheduler` that consumes tasks off a queue in
the database. This allows multiple job executors running in parallel
racing for the next job to execute. This is for executing tasks
immediately as long as there are enough resources.
What is missing is a component that maintains periodic tasks. The
reason for this is to have housekeeping tasks that run regularly and
clean up stale or unused data. Later, users should be able to create
periodic tasks, for example to read e-mails from an inbox.
The problem is again, that it must work with multiple job executor
instances running at the same time. This is the same pattern as with
the `Scheduler`: it must be ensured that only one task is used at a
time. Multiple job executors must not schedule a periodic task more
than once. If a periodic task takes longer than the time between
runs, it must wait for the next interval.
## Considered Options
1. Adding a `timer` and `nextrun` field to the current `job` table
2. Creating a separate table for periodic tasks
## Decision Outcome
The 2. option.
For internal housekeeping tasks, it may suffice to reuse the existing
`job` queue by adding more fields such that a job may be considered
periodic. But this conflates what the `Scheduler` is doing now
(executing tasks as soon as possible while being bound to some
resources) with a completely different subject.
There will be a new `PeriodicScheduler` that works on a new table in
the database that is representing periodic tasks. This table will
share fields with the `job` table to be able to create `RJob`
instances. This new component is only taking care of periodically
submitting jobs to the job queue such that the `Scheduler` will
eventually pick it up and run it.
```sql
CREATE TABLE "periodic_task" (
"id" varchar(254) not null primary key,
"enabled" boolean not null,
"task" varchar(254) not null,
"group_" varchar(254) not null,
"args" text not null,
"subject" varchar(254) not null,
"submitter" varchar(254) not null,
"priority" int not null,
"worker" varchar(254),
"marked" timestamp,
"timer" varchar(254) not null,
"nextrun" timestamp not null,
"created" timestamp not null
);
```
Preparing for other features, periodic tasks will be created by users.
It should be possible to disable/enable them. The next 6 properties
are needed to insert jobs into the `job` table. The `worker` field
(and `marked`) are used to mark a periodic job as "being worked on by
a job executor".
The `timer` is the schedule, which is a
[systemd-like](https://man.cx/systemd.time#heading7) calendar event
string. This is parsed by [this
library](https://github.com/eikek/calev). The `nextrun` field will
store the timestamp of the next time the task would need to be
executed. This is needed to query this table for the task that is
due next.
The `PeriodicScheduler` works roughly like this:
On startup:
- Remove stale worker values. If the process has been killed, there
may be marked tasks which must be cleared now.
Main-Loop:
0. Cancel current scheduled notify (see 4. below)
1. get next (= earliest & enabled) periodic job
2. if none: stop
3. if triggered (= `nextrun <= 'now'`):
- Mark periodic task. On fail: goto 1.
- Submit new job into the jobqueue:
- Update `nextrun` field
- Check for non-final jobs of that name. This is required to not
run the same periodic task multiple times concurrently.
- if exist: goto 4.
- if not exist: submit job
- Unmark periodic task
4. if future
- schedule notify: notify self to run again next time the task
schedule triggers

View File

@ -1,3 +1,8 @@
---
layout: docs
title: Short Title
---
# [short title of solved problem and solution] # [short title of solved problem and solution]
* Status: [proposed | rejected | accepted | deprecated | … | superseded by [ADR-0005](0005-example.md)] <!-- optional --> * Status: [proposed | rejected | accepted | deprecated | … | superseded by [ADR-0005](0005-example.md)] <!-- optional -->

View File

@ -118,7 +118,7 @@ trait Conversions {
) )
def mkItemList(v: Vector[OItem.ListItem]): ItemLightList = { def mkItemList(v: Vector[OItem.ListItem]): ItemLightList = {
val groups = v.groupBy(item => item.date.toDate.toString.substring(0, 7)) val groups = v.groupBy(item => item.date.toUtcDate.toString.substring(0, 7))
def mkGroup(g: (String, Vector[OItem.ListItem])): ItemLightGroup = def mkGroup(g: (String, Vector[OItem.ListItem])): ItemLightGroup =
ItemLightGroup(g._1, g._2.map(mkItemLight).toList) ItemLightGroup(g._1, g._2.map(mkItemLight).toList)

View File

@ -0,0 +1,18 @@
-- Periodic tasks: descriptions of jobs that are submitted repeatedly.
-- `worker` and `marked` are set while a worker has reserved the task,
-- `timer` holds the calendar event string of the schedule and
-- `nextrun` the timestamp of the next due execution.
CREATE TABLE `periodic_task` (
`id` varchar(254) not null primary key,
`enabled` boolean not null,
`task` varchar(254) not null,
`group_` varchar(254) not null,
`args` text not null,
`subject` varchar(254) not null,
`submitter` varchar(254) not null,
`priority` int not null,
`worker` varchar(254),
`marked` timestamp,
`timer` varchar(254) not null,
`nextrun` timestamp not null,
`created` timestamp not null
);
-- The next task is selected ordered by `nextrun`.
CREATE INDEX `periodic_task_nextrun_idx` ON `periodic_task`(`nextrun`);
-- Marks are set and cleared by filtering on `worker`.
CREATE INDEX `periodic_task_worker_idx` ON `periodic_task`(`worker`);

View File

@ -0,0 +1,18 @@
-- Periodic tasks: descriptions of jobs that are submitted repeatedly.
-- "worker" and "marked" are set while a worker has reserved the task,
-- "timer" holds the calendar event string of the schedule and
-- "nextrun" the timestamp of the next due execution.
CREATE TABLE "periodic_task" (
"id" varchar(254) not null primary key,
"enabled" boolean not null,
"task" varchar(254) not null,
"group_" varchar(254) not null,
"args" text not null,
"subject" varchar(254) not null,
"submitter" varchar(254) not null,
"priority" int not null,
"worker" varchar(254),
"marked" timestamp,
"timer" varchar(254) not null,
"nextrun" timestamp not null,
"created" timestamp not null
);
-- The next task is selected ordered by "nextrun".
CREATE INDEX "periodic_task_nextrun_idx" ON "periodic_task"("nextrun");
-- Marks are set and cleared by filtering on "worker".
CREATE INDEX "periodic_task_worker_idx" ON "periodic_task"("worker");

View File

@ -30,6 +30,9 @@ case class Column(name: String, ns: String = "", alias: String = "") {
def is(c: Column): Fragment = def is(c: Column): Fragment =
f ++ fr"=" ++ c.f f ++ fr"=" ++ c.f
/** Creates the condition `<column> <> value`, with `value` passed as
  * an interpolated fragment argument.
  */
def isNot[A: Put](value: A): Fragment =
f ++ fr"<> $value"
def isNull: Fragment = def isNull: Fragment =
f ++ fr"is null" f ++ fr"is null"

View File

@ -7,6 +7,7 @@ import doobie._
import doobie.implicits.legacy.instant._ import doobie.implicits.legacy.instant._
import doobie.util.log.Success import doobie.util.log.Success
import emil.{MailAddress, SSLType} import emil.{MailAddress, SSLType}
import com.github.eikek.calev.CalEvent
import docspell.common._ import docspell.common._
import docspell.common.syntax.all._ import docspell.common.syntax.all._
@ -98,6 +99,9 @@ trait DoobieMeta {
Meta[String].imap(str => str.split(',').toList.map(_.trim).map(EmilUtil.unsafeReadMailAddress))( Meta[String].imap(str => str.split(',').toList.map(_.trim).map(EmilUtil.unsafeReadMailAddress))(
lma => lma.map(EmilUtil.mailAddressString).mkString(",") lma => lma.map(EmilUtil.mailAddressString).mkString(",")
) )
/** Maps a `CalEvent` to a string column using its `asString`
  * representation; reading goes through `CalEvent.unsafe`.
  *
  * NOTE(review): `CalEvent.unsafe` presumably throws for strings that
  * are not valid calendar events -- confirm against the calev docs.
  */
implicit val metaCalEvent: Meta[CalEvent] =
Meta[String].timap(CalEvent.unsafe)(_.asString)
} }
object DoobieMeta extends DoobieMeta { object DoobieMeta extends DoobieMeta {

View File

@ -0,0 +1,65 @@
package docspell.store.queries
import docspell.common._
import docspell.store.impl.Implicits._
import docspell.store.records._
import doobie._
import doobie.implicits._
/** Queries around the `periodic_task` table. */
object QPeriodicTask {

  /** Removes the worker mark from all periodic tasks that are marked
    * by the worker `name`. Returns the number of updated rows.
    */
  def clearWorkers(name: Ident): ConnectionIO[Int] = {
    val worker = RPeriodicTask.Columns.worker
    updateRow(RPeriodicTask.table, worker.is(name), worker.setTo[Ident](None)).update.run
  }

  /** Marks the periodic task `pid` for the worker `name` and stores
    * `ts` as the mark timestamp.
    *
    * Only a task whose `worker` column is null is updated, so the
    * returned row count is 0 if the task is already marked (or does not
    * exist).
    */
  def setWorker(pid: Ident, name: Ident, ts: Timestamp): ConnectionIO[Int] = {
    val id     = RPeriodicTask.Columns.id
    val worker = RPeriodicTask.Columns.worker
    val marked = RPeriodicTask.Columns.marked
    updateRow(
      RPeriodicTask.table,
      and(id.is(pid), worker.isNull),
      commas(worker.setTo(name), marked.setTo(ts))
    ).update.run
  }

  /** Clears the worker of the periodic task `pid` and sets its
    * `nextrun` column to `nextRun`.
    *
    * NOTE(review): `nextRun` is optional while the `nextrun` column is
    * declared `not null` in the migrations. Passing `None` presumably
    * tries to write NULL and makes the update fail -- confirm how a
    * timer without a next elapse should be handled.
    */
  def unsetWorker(
      pid: Ident,
      nextRun: Option[Timestamp]
  ): ConnectionIO[Int] = {
    val id     = RPeriodicTask.Columns.id
    val worker = RPeriodicTask.Columns.worker
    val next   = RPeriodicTask.Columns.nextrun
    updateRow(
      RPeriodicTask.table,
      id.is(pid),
      commas(worker.setTo[Ident](None), next.setTo(nextRun))
    ).update.run
  }

  /** Finds the enabled periodic task with the smallest `nextrun`
    * value, optionally excluding the task with id `excl`.
    */
  def findNext(excl: Option[Ident]): ConnectionIO[Option[RPeriodicTask]] = {
    val enabled = RPeriodicTask.Columns.enabled
    val pid     = RPeriodicTask.Columns.id
    val order   = orderBy(RPeriodicTask.Columns.nextrun.f) ++ fr"ASC"

    val where = excl match {
      case Some(id) => and(pid.isNot(id), enabled.is(true))
      case None     => enabled.is(true)
    }
    val sql =
      selectSimple(RPeriodicTask.Columns.all, RPeriodicTask.table, where) ++ order
    sql.query[RPeriodicTask].streamWithChunkSize(2).take(1).compile.last
  }

  /** Finds a job that has the periodic task `pid` as its tracker and
    * is in a state that is not contained in `JobState.done`.
    *
    * Only the first matching row is read. Unlike `.option`, this does
    * not fail when more than one such job exists; callers only need to
    * know whether there is any.
    */
  def findNonFinal(pid: Ident): ConnectionIO[Option[RJob]] = {
    val nonFinalStates = JobState.all.diff(JobState.done).toSeq
    val sql = selectSimple(
      RJob.Columns.all,
      RJob.table,
      and(
        RJob.Columns.tracker.is(pid),
        RJob.Columns.state.isOneOf(nonFinalStates)
      )
    )
    sql.query[RJob].streamWithChunkSize(2).take(1).compile.last
  }
}

View File

@ -29,7 +29,7 @@ object JobQueue {
worker: Ident, worker: Ident,
retryPause: Duration retryPause: Duration
): F[Option[RJob]] = ): F[Option[RJob]] =
logger.fdebug("Select next job") *> QJob.takeNextJob(store)(prio, worker, retryPause) logger.ftrace("Select next job") *> QJob.takeNextJob(store)(prio, worker, retryPause)
def insert(job: RJob): F[Unit] = def insert(job: RJob): F[Unit] =
store store

View File

@ -0,0 +1,17 @@
package docspell.store.queue
/** The outcome of looking up an element and trying to mark it. */
sealed trait Marked[+A]

object Marked {

  /** Creates the `Found` case, typed as `Marked`. */
  def found[A](v: A): Marked[A] = Found(v)

  /** The `NotFound` case, typed as `Marked[A]`. */
  def notFound[A]: Marked[A] = NotFound

  /** The `NotMarkable` case, typed as `Marked[A]`. */
  def notMarkable[A]: Marked[A] = NotMarkable

  /** An element was found and marked. */
  final case class Found[A](value: A) extends Marked[A]

  /** There was no element to mark. */
  final case object NotFound extends Marked[Nothing]

  /** An element was found, but marking it did not succeed. */
  final case object NotMarkable extends Marked[Nothing]
}
}

View File

@ -0,0 +1,126 @@
package docspell.store.queue
import cats.effect._
import cats.implicits._
import org.log4s.getLogger
import com.github.eikek.fs2calev._
import docspell.common._
import docspell.common.syntax.all._
import docspell.store.{AddResult, Store}
import docspell.store.records._
import docspell.store.queries.QPeriodicTask
/** Access to the periodic tasks in the database. */
trait PeriodicTaskStore[F[_]] {
/** Get the free periodic task due next and reserve it to the given
  * worker.
  *
  * If found, the task is returned and resource finalization takes
  * care of unmarking the task after use and updating `nextRun` with
  * the next timestamp.
  *
  * The task with id `excludeId` is not considered, if given.
  */
def takeNext(
worker: Ident,
excludeId: Option[Ident]
): Resource[F, Marked[RPeriodicTask]]
/** Removes the marks of the worker `name` from all periodic tasks. */
def clearMarks(name: Ident): F[Unit]
/** Finds a job belonging to the periodic task `pjobId` that is not
  * in a final state.
  */
def findNonFinalJob(pjobId: Ident): F[Option[RJob]]
/** Insert a task or update if it already exists.
  */
def insert(task: RPeriodicTask): F[Unit]
/** Adds the task only if it does not already exist.
  */
def add(task: RPeriodicTask): F[AddResult]
/** Returns all nodes of type `NodeType.Joex`. */
def findJoexNodes: F[Vector[RNode]]
}
object PeriodicTaskStore {
private[this] val logger = getLogger
/** Creates a `PeriodicTaskStore` that runs its queries through the
  * given `Store`.
  */
def create[F[_]: Sync](store: Store[F]): Resource[F, PeriodicTaskStore[F]] =
Resource.pure[F, PeriodicTaskStore[F]](new PeriodicTaskStore[F] {
def takeNext(
worker: Ident,
excludeId: Option[Ident]
): Resource[F, Marked[RPeriodicTask]] = {
// Acquire: look up the next task and try to mark it for `worker`.
// `NotMarkable` means a task was found but the mark update touched
// no row.
val chooseNext: F[Marked[RPeriodicTask]] =
getNext(excludeId).flatMap {
case Some(pj) =>
mark(pj.id, worker).map {
case true => Marked.found(pj.copy(worker = worker.some))
case false => Marked.notMarkable
}
case None =>
Marked.notFound.pure[F]
}
// Release: only a successfully marked task needs to be unmarked.
Resource.make(chooseNext)({
case Marked.Found(pj) => unmark(pj)
case _ => ().pure[F]
})
}
/** Loads the next enabled task, not considering `excl`. */
def getNext(excl: Option[Ident]): F[Option[RPeriodicTask]] =
store.transact(QPeriodicTask.findNext(excl))
/** Tries to mark task `pid` for worker `name` using the current time;
  * true if a row was updated.
  */
def mark(pid: Ident, name: Ident): F[Boolean] =
Timestamp
.current[F]
.flatMap(now =>
store.transact(QPeriodicTask.setWorker(pid, name, now)).map(_ > 0)
)
/** Clears the worker of the task and stores the first elapse of its
  * timer after the current time as next run.
  *
  * NOTE(review): `nextRun` is `None` if the timer yields no further
  * elapse; the `nextrun` column is declared `not null` -- confirm that
  * this case is handled as intended.
  */
def unmark(job: RPeriodicTask): F[Unit] =
for {
now <- Timestamp.current[F]
nextRun <- CalevFs2
.nextElapses[F](now.atUTC)(job.timer)
.take(1)
.compile
.last
.map(_.map(Timestamp.from))
_ <- store.transact(QPeriodicTask.unsetWorker(job.id, nextRun))
} yield ()
def clearMarks(name: Ident): F[Unit] =
store
.transact(QPeriodicTask.clearWorkers(name))
.flatMap { n =>
// only log when something has actually been cleared
if (n > 0) logger.finfo(s"Clearing $n periodic tasks from worker ${name.id}")
else ().pure[F]
}
def findNonFinalJob(pjobId: Ident): F[Option[RJob]] =
store.transact(QPeriodicTask.findNonFinal(pjobId))
def insert(task: RPeriodicTask): F[Unit] = {
val update = store.transact(RPeriodicTask.update(task))
// Any error of the insert is swallowed and reported as `false`.
val insertAttempt = store.transact(RPeriodicTask.insert(task)).attempt.map {
case Right(n) => n > 0
case Left(_) => false
}
// 1. try to update; 2. if no row was updated, try to insert;
// 3. if the insert did not succeed either, run the update once more.
// The row count of the last update is not checked.
for {
n1 <- update
ins <- if (n1 == 0) insertAttempt else true.pure[F]
_ <- if (ins) 1.pure[F] else update
} yield ()
}
def add(task: RPeriodicTask): F[AddResult] = {
val insert = RPeriodicTask.insert(task)
val exists = RPeriodicTask.exists(task.id)
store.add(insert, exists)
}
def findJoexNodes: F[Vector[RNode]] =
store.transact(RNode.findAll(NodeType.Joex))
})
}

View File

@ -46,4 +46,7 @@ object RInvitation {
_ <- delete(invite) _ <- delete(invite)
} yield inv > 0 } yield inv > 0
} }
/** Deletes all invitations whose `created` value is less than `ts`.
  * Returns the number of deleted rows.
  */
def deleteOlderThan(ts: Timestamp): ConnectionIO[Int] = {
  val expired = created.isLt(ts)
  deleteFrom(table, expired).update.run
}
} }

View File

@ -1,12 +1,15 @@
package docspell.store.records package docspell.store.records
import cats.effect.Sync import cats.effect.Sync
import cats.implicits._
import fs2.Stream
import doobie._ import doobie._
import doobie.implicits._ import doobie.implicits._
import io.circe.Encoder
import docspell.common._ import docspell.common._
import docspell.store.impl.Column import docspell.store.impl.Column
import docspell.store.impl.Implicits._ import docspell.store.impl.Implicits._
import io.circe.Encoder
case class RJob( case class RJob(
id: Ident, id: Ident,
@ -227,7 +230,8 @@ object RJob {
} }
def selectGroupInState(states: Seq[JobState]): ConnectionIO[Vector[Ident]] = { def selectGroupInState(states: Seq[JobState]): ConnectionIO[Vector[Ident]] = {
val sql = selectDistinct(List(group), table, state.isOneOf(states)) ++ orderBy(group.f) val sql =
selectDistinct(List(group), table, state.isOneOf(states)) ++ orderBy(group.f)
sql.query[Ident].to[Vector] sql.query[Ident].to[Vector]
} }
@ -236,4 +240,19 @@ object RJob {
n0 <- RJobLog.deleteAll(jobId) n0 <- RJobLog.deleteAll(jobId)
n1 <- deleteFrom(table, id.is(jobId)).update.run n1 <- deleteFrom(table, id.is(jobId)).update.run
} yield n0 + n1 } yield n0 + n1
/** Streams the ids of all jobs that are in one of the `JobState.done`
  * states and whose `finished` value is either null or less than `ts`.
  */
def findIdsDoneAndOlderThan(ts: Timestamp): Stream[ConnectionIO, Ident] = {
  val isDone     = state.isOneOf(JobState.done.toSeq)
  val isOutdated = or(finished.isNull, finished.isLt(ts))
  selectSimple(Seq(id), table, and(isDone, isOutdated)).query[Ident].stream
}
/** Deletes at most `batch` jobs selected by `findIdsDoneAndOlderThan`.
  *
  * The result is the number of jobs passed to `delete`, not the
  * number of rows removed by it.
  */
def deleteDoneAndOlderThan(ts: Timestamp, batch: Int): ConnectionIO[Int] =
  findIdsDoneAndOlderThan(ts)
    .take(batch.toLong)
    .evalMap(delete)
    .compile
    .fold(0)((count, _) => count + 1)
} }

View File

@ -0,0 +1,176 @@
package docspell.store.records
import cats.effect._
import cats.implicits._
import doobie._
import doobie.implicits._
import com.github.eikek.calev.CalEvent
import docspell.common._
import docspell.store.impl.Column
import docspell.store.impl.Implicits._
import io.circe.Encoder
/** Describes a job that should be submitted periodically.
  *
  * It shares some properties with `RJob` and carries everything that
  * is needed to create a `RJob` value from it (see `toJob`).
  */
case class RPeriodicTask(
    id: Ident,
    enabled: Boolean,
    task: Ident,
    group: Ident,
    args: String,
    subject: String,
    submitter: Ident,
    priority: Priority,
    worker: Option[Ident],
    marked: Option[Timestamp],
    timer: CalEvent,
    nextrun: Timestamp,
    created: Timestamp
) {

  /** Creates a new job from this task: it gets a fresh random id, the
    * current time and the state `JobState.Waiting`. The id of this
    * periodic task is passed along as `Some(id)`.
    */
  def toJob[F[_]: Sync]: F[RJob] =
    Timestamp.current[F].flatMap { now =>
      Ident.randomId[F].map { jid =>
        RJob(
          jid,
          task,
          group,
          args,
          subject,
          now,
          submitter,
          priority,
          JobState.Waiting,
          0,
          0,
          Some(id),
          None,
          None,
          None
        )
      }
    }
}
object RPeriodicTask {
/** Creates a new periodic task with a random id.
  *
  * `created` is the current time, `worker` and `marked` are empty.
  * `nextrun` is the next elapse of `timer` after the current time (in
  * UTC); if the timer has no next elapse, `Timestamp.Epoch` is used.
  */
def create[F[_]: Sync](
enabled: Boolean,
task: Ident,
group: Ident,
args: String,
subject: String,
submitter: Ident,
priority: Priority,
timer: CalEvent
): F[RPeriodicTask] =
Ident
.randomId[F]
.flatMap(id =>
Timestamp
.current[F]
.map { now =>
RPeriodicTask(
id,
enabled,
task,
group,
args,
subject,
submitter,
priority,
None,
None,
timer,
timer
.nextElapse(now.atZone(Timestamp.UTC))
.map(_.toInstant)
.map(Timestamp.apply)
.getOrElse(Timestamp.Epoch),
now
)
}
)
/** Same as `create`, but the arguments are given as a value that is
  * encoded to a JSON string without spaces using the encoder `E`.
  */
def createJson[F[_]: Sync, A](
enabled: Boolean,
task: Ident,
group: Ident,
args: A,
subject: String,
submitter: Ident,
priority: Priority,
timer: CalEvent
)(implicit E: Encoder[A]): F[RPeriodicTask] =
create[F](enabled, task, group, E(args).noSpaces, subject, submitter, priority, timer)
val table = fr"periodic_task"
/** The columns of the `periodic_task` table. */
object Columns {
val id = Column("id")
val enabled = Column("enabled")
val task = Column("task")
val group = Column("group_")
val args = Column("args")
val subject = Column("subject")
val submitter = Column("submitter")
val priority = Column("priority")
val worker = Column("worker")
val marked = Column("marked")
val timer = Column("timer")
val nextrun = Column("nextrun")
val created = Column("created")
// The order must match the order of the values in `insert`.
val all = List(
id,
enabled,
task,
group,
args,
subject,
submitter,
priority,
worker,
marked,
timer,
nextrun,
created
)
}
import Columns._
/** Inserts the given task as a new row; the values are listed in the
  * same order as `Columns.all`.
  */
def insert(v: RPeriodicTask): ConnectionIO[Int] = {
val sql = insertRow(
table,
all,
fr"${v.id},${v.enabled},${v.task},${v.group},${v.args}," ++
fr"${v.subject},${v.submitter},${v.priority},${v.worker}," ++
fr"${v.marked},${v.timer},${v.nextrun},${v.created}"
)
sql.update.run
}
/** Updates the row with the id of the given task.
  *
  * NOTE(review): the `task` and `created` columns are not part of the
  * update; for `task` it is not clear whether this is intended --
  * confirm.
  */
def update(v: RPeriodicTask): ConnectionIO[Int] = {
val sql = updateRow(
table,
id.is(v.id),
commas(
enabled.setTo(v.enabled),
group.setTo(v.group),
args.setTo(v.args),
subject.setTo(v.subject),
submitter.setTo(v.submitter),
priority.setTo(v.priority),
worker.setTo(v.worker),
marked.setTo(v.marked),
timer.setTo(v.timer),
nextrun.setTo(v.nextrun)
)
)
sql.update.run
}
/** Checks whether a periodic task with the given id exists. */
def exists(pid: Ident): ConnectionIO[Boolean] =
selectCount(id, table, id.is(pid)).query[Int].unique.map(_ > 0)
}

View File

@ -7,6 +7,7 @@ object Dependencies {
val BcryptVersion = "0.4" val BcryptVersion = "0.4"
val BetterMonadicForVersion = "0.3.1" val BetterMonadicForVersion = "0.3.1"
val BitpeaceVersion = "0.4.3" val BitpeaceVersion = "0.4.3"
val CalevVersion = "0.1.0"
val CirceVersion = "0.13.0" val CirceVersion = "0.13.0"
val DoobieVersion = "0.8.8" val DoobieVersion = "0.8.8"
val EmilVersion = "0.2.0" val EmilVersion = "0.2.0"
@ -37,6 +38,14 @@ object Dependencies {
val ViewerJSVersion = "0.5.8" val ViewerJSVersion = "0.5.8"
// calev: calendar events; `calev` combines the fs2 and the core module
val calevCore = Seq("com.github.eikek" %% "calev-core" % CalevVersion)
val calevFs2  = Seq("com.github.eikek" %% "calev-fs2" % CalevVersion)
val calev     = calevFs2 ++ calevCore
val jclOverSlf4j = Seq( val jclOverSlf4j = Seq(
"org.slf4j" % "jcl-over-slf4j" % Slf4jVersion "org.slf4j" % "jcl-over-slf4j" % Slf4jVersion
) )