Adopt modules to new collective table

This commit is contained in:
eikek
2022-07-13 23:37:46 +02:00
parent 77f22bb5ea
commit 26d7c91266
11 changed files with 113 additions and 64 deletions

View File

@ -0,0 +1,19 @@
package docspell.scheduler
import cats.Applicative
import docspell.common.AccountInfo
/** Strategy to find the user that submitted the job. This is used to emit events about
* starting/finishing jobs.
*
* If an account cannot be determined, no events can be send.
*/
trait FindJobOwner[F[_]] {
def apply(job: Job[_]): F[Option[AccountInfo]]
}
object FindJobOwner {
def none[F[_]: Applicative]: FindJobOwner[F] =
(_: Job[_]) => Applicative[F].pure(None)
}

View File

@ -17,7 +17,8 @@ import docspell.store.Store
case class JobStoreModuleBuilder[F[_]: Async](
store: Store[F],
pubsub: PubSubT[F],
eventSink: EventSink[F]
eventSink: EventSink[F],
findJobOwner: FindJobOwner[F]
) {
def withPubsub(ps: PubSubT[F]): JobStoreModuleBuilder[F] =
copy(pubsub = ps)
@ -25,8 +26,11 @@ case class JobStoreModuleBuilder[F[_]: Async](
def withEventSink(es: EventSink[F]): JobStoreModuleBuilder[F] =
copy(eventSink = es)
def withFindJobOwner(f: FindJobOwner[F]): JobStoreModuleBuilder[F] =
copy(findJobOwner = f)
def build: JobStoreModuleBuilder.Module[F] = {
val jobStore = JobStorePublish(store, pubsub, eventSink)
val jobStore = JobStorePublish(store, pubsub, eventSink, findJobOwner)
val periodicTaskStore = PeriodicTaskStore(store, jobStore)
val userTaskStore = UserTaskStoreImpl(store, periodicTaskStore)
new JobStoreModuleBuilder.Module(
@ -35,7 +39,8 @@ case class JobStoreModuleBuilder[F[_]: Async](
jobStore,
store,
eventSink,
pubsub
pubsub,
findJobOwner
)
}
}
@ -43,7 +48,12 @@ case class JobStoreModuleBuilder[F[_]: Async](
object JobStoreModuleBuilder {
def apply[F[_]: Async](store: Store[F]): JobStoreModuleBuilder[F] =
JobStoreModuleBuilder(store, PubSubT.noop[F], EventSink.silent[F])
JobStoreModuleBuilder(
store,
PubSubT.noop[F],
EventSink.silent[F],
FindJobOwner.none[F]
)
final class Module[F[_]](
val userTasks: UserTaskStore[F],
@ -51,6 +61,7 @@ object JobStoreModuleBuilder {
val jobs: JobStore[F],
val store: Store[F],
val eventSink: EventSink[F],
val pubSubT: PubSubT[F]
val pubSubT: PubSubT[F],
val findJobOwner: FindJobOwner[F]
) extends JobStoreModule[F] {}
}

View File

@ -6,9 +6,9 @@
package docspell.scheduler.impl
import cats.data.OptionT
import cats.effect._
import cats.implicits._
import docspell.common.{Ident, JobState}
import docspell.notification.api.{Event, EventSink}
import docspell.pubsub.api.PubSubT
@ -19,26 +19,31 @@ import docspell.store.Store
final class JobStorePublish[F[_]: Sync](
delegate: JobStore[F],
pubsub: PubSubT[F],
eventSink: EventSink[F]
eventSink: EventSink[F],
findJobOwner: FindJobOwner[F]
) extends JobStore[F] {
private def msg(job: Job[String]): JobSubmitted =
JobSubmitted(job.id, job.group, job.task, job.args)
private def event(job: Job[String]): Event.JobSubmitted =
Event.JobSubmitted(
job.id,
job.group,
job.task,
job.args,
JobState.waiting,
job.subject,
job.submitter
)
private def event(job: Job[String]): OptionT[F, Event.JobSubmitted] =
OptionT(findJobOwner(job))
.map(
Event.JobSubmitted(
_,
job.id,
job.group,
job.task,
job.args,
JobState.waiting,
job.subject,
job.submitter
)
)
private def publish(job: Job[String]): F[Unit] =
pubsub.publish1(JobSubmitted.topic, msg(job)).as(()) *>
eventSink.offer(event(job))
event(job).semiflatMap(eventSink.offer).value.void
private def notifyJoex: F[Unit] =
pubsub.publish1IgnoreErrors(JobsNotify(), ()).void
@ -82,7 +87,8 @@ object JobStorePublish {
def apply[F[_]: Async](
store: Store[F],
pubSub: PubSubT[F],
eventSink: EventSink[F]
eventSink: EventSink[F],
findJobOwner: FindJobOwner[F]
): JobStore[F] =
new JobStorePublish[F](JobStoreImpl(store), pubSub, eventSink)
new JobStorePublish[F](JobStoreImpl(store), pubSub, eventSink, findJobOwner)
}

View File

@ -23,7 +23,8 @@ case class SchedulerBuilder[F[_]: Async](
queue: JobQueue[F],
logSink: LogSink[F],
pubSub: PubSubT[F],
eventSink: EventSink[F]
eventSink: EventSink[F],
findJobOwner: FindJobOwner[F]
) {
def withConfig(cfg: SchedulerConfig): SchedulerBuilder[F] =
@ -32,7 +33,7 @@ case class SchedulerBuilder[F[_]: Async](
def withTaskRegistry(reg: JobTaskRegistry[F]): SchedulerBuilder[F] =
copy(tasks = reg)
def withTask[A](task: JobTask[F]): SchedulerBuilder[F] =
def withTask(task: JobTask[F]): SchedulerBuilder[F] =
withTaskRegistry(tasks.withTask(task))
def withLogSink(sink: LogSink[F]): SchedulerBuilder[F] =
@ -47,6 +48,9 @@ case class SchedulerBuilder[F[_]: Async](
def withEventSink(sink: EventSink[F]): SchedulerBuilder[F] =
copy(eventSink = sink)
def withFindJobOwner(f: FindJobOwner[F]): SchedulerBuilder[F] =
copy(findJobOwner = f)
def serve: Resource[F, Scheduler[F]] =
resource.evalMap(sch => Async[F].start(sch.start.compile.drain).map(_ => sch))
@ -60,6 +64,7 @@ case class SchedulerBuilder[F[_]: Async](
queue,
pubSub,
eventSink,
findJobOwner,
tasks,
store,
logSink,
@ -86,6 +91,7 @@ object SchedulerBuilder {
JobQueue(store),
LogSink.db[F](store),
PubSubT.noop[F],
EventSink.silent[F]
EventSink.silent[F],
FindJobOwner.none[F]
)
}

View File

@ -30,6 +30,7 @@ final class SchedulerImpl[F[_]: Async](
queue: JobQueue[F],
pubSub: PubSubT[F],
eventSink: EventSink[F],
findJobOwner: FindJobOwner[F],
tasks: JobTaskRegistry[F],
store: Store[F],
logSink: LogSink[F],
@ -68,20 +69,19 @@ final class SchedulerImpl[F[_]: Async](
def getRunning: F[Vector[Job[String]]] =
state.get
.flatMap(s => QJob.findAll(s.getRunning, store))
.map(
_.map(rj =>
Job(
rj.id,
rj.task,
rj.group,
rj.args,
rj.subject,
rj.submitter,
rj.priority,
rj.tracker
)
)
)
.map(_.map(convertJob))
private def convertJob(rj: RJob): Job[String] =
Job(
rj.id,
rj.task,
rj.group,
rj.args,
rj.subject,
rj.submitter,
rj.priority,
rj.tracker
)
def requestCancel(jobId: Ident): F[Boolean] =
logger.info(s"Scheduler requested to cancel job: ${jobId.id}") *>
@ -235,22 +235,29 @@ final class SchedulerImpl[F[_]: Async](
)
)
_ <- Sync[F].whenA(JobState.isDone(finishState))(
eventSink.offer(
Event.JobDone(
job.id,
job.group,
job.task,
job.args,
job.state,
job.subject,
job.submitter,
result.json.getOrElse(Json.Null),
result.message
)
)
makeJobDoneEvent(job, result)
.semiflatMap(eventSink.offer)
.value
)
} yield ()
private def makeJobDoneEvent(job: RJob, result: JobTaskResult) =
for {
acc <- OptionT(findJobOwner(convertJob(job)))
ev = Event.JobDone(
acc,
job.id,
job.group,
job.task,
job.args,
job.state,
job.subject,
job.submitter,
result.json.getOrElse(Json.Null),
result.message
)
} yield ev
def onStart(job: RJob): F[Unit] =
QJob.setRunning(
job.id,