From 26d7c91266e79e1d47a31ae04070dd65d03d9d0a Mon Sep 17 00:00:00 2001 From: eikek Date: Wed, 13 Jul 2022 23:37:46 +0200 Subject: [PATCH] Adopt modules to new collective table --- .../notification/api/EventContext.scala | 2 +- .../notification/impl/context/BasicData.scala | 2 +- .../impl/context/DeleteFieldValueCtx.scala | 4 +- .../impl/context/ItemSelectionCtx.scala | 10 +-- .../impl/context/SetFieldValueCtx.scala | 4 +- .../impl/context/TagsChangedCtx.scala | 6 +- .../docspell/scheduler/FindJobOwner.scala | 19 ++++++ .../impl/JobStoreModuleBuilder.scala | 21 +++++-- .../scheduler/impl/JobStorePublish.scala | 36 ++++++----- .../scheduler/impl/SchedulerBuilder.scala | 12 +++- .../scheduler/impl/SchedulerImpl.scala | 61 +++++++++++-------- 11 files changed, 113 insertions(+), 64 deletions(-) create mode 100644 modules/scheduler/api/src/main/scala/docspell/scheduler/FindJobOwner.scala diff --git a/modules/notification/api/src/main/scala/docspell/notification/api/EventContext.scala b/modules/notification/api/src/main/scala/docspell/notification/api/EventContext.scala index c51497ad..61fd0964 100644 --- a/modules/notification/api/src/main/scala/docspell/notification/api/EventContext.scala +++ b/modules/notification/api/src/main/scala/docspell/notification/api/EventContext.scala @@ -26,7 +26,7 @@ trait EventContext { "account" -> Json.obj( "collective" -> event.account.collective.asJson, "user" -> event.account.login.asJson, - "login" -> event.account.asJson + "login" -> event.account.asAccountId.asJson ), "content" -> content ) diff --git a/modules/notification/impl/src/main/scala/docspell/notification/impl/context/BasicData.scala b/modules/notification/impl/src/main/scala/docspell/notification/impl/context/BasicData.scala index 9c552969..94f15cdf 100644 --- a/modules/notification/impl/src/main/scala/docspell/notification/impl/context/BasicData.scala +++ b/modules/notification/impl/src/main/scala/docspell/notification/impl/context/BasicData.scala @@ -70,7 +70,7 @@ object BasicData { def find( itemIds: NonEmptyList[Ident], - account: AccountId, + account: AccountInfo, now: Timestamp ): ConnectionIO[Vector[Item]] = { import ItemQueryDsl._ diff --git a/modules/notification/impl/src/main/scala/docspell/notification/impl/context/DeleteFieldValueCtx.scala b/modules/notification/impl/src/main/scala/docspell/notification/impl/context/DeleteFieldValueCtx.scala index b204f823..0f87c4fb 100644 --- a/modules/notification/impl/src/main/scala/docspell/notification/impl/context/DeleteFieldValueCtx.scala +++ b/modules/notification/impl/src/main/scala/docspell/notification/impl/context/DeleteFieldValueCtx.scala @@ -46,7 +46,7 @@ object DeleteFieldValueCtx { for { now <- OptionT.liftF(Timestamp.current[ConnectionIO]) items <- OptionT.liftF(Item.find(ev.items, ev.account, now)) - field <- OptionT(RCustomField.findById(ev.field, ev.account.collective)) + field <- OptionT(RCustomField.findById(ev.field, ev.account.collectiveId)) msg = DeleteFieldValueCtx( ev, Data( @@ -71,7 +71,7 @@ object DeleteFieldValueCtx { ) final case class Data( - account: AccountId, + account: AccountInfo, items: List[Item], field: Field, itemUrl: Option[String] diff --git a/modules/notification/impl/src/main/scala/docspell/notification/impl/context/ItemSelectionCtx.scala b/modules/notification/impl/src/main/scala/docspell/notification/impl/context/ItemSelectionCtx.scala index 994ab1b9..fba4a38e 100644 --- a/modules/notification/impl/src/main/scala/docspell/notification/impl/context/ItemSelectionCtx.scala +++ b/modules/notification/impl/src/main/scala/docspell/notification/impl/context/ItemSelectionCtx.scala @@ -61,7 +61,7 @@ object ItemSelectionCtx { items.toList, ev.itemUrl, ev.more, - ev.account.user.id + ev.account.login.id ) ) } yield msg @@ -73,12 +73,12 @@ object ItemSelectionCtx { items <- ev.items.traverse(Item.sample[F]) } yield ItemSelectionCtx( ev, - Data(ev.account, items.toList, ev.itemUrl, ev.more, ev.account.user.id) + Data(ev.account, items.toList, ev.itemUrl, ev.more, ev.account.login.id) ) ) final case class Data( - account: AccountId, + account: AccountInfo, items: List[Item], itemUrl: Option[String], more: Boolean, @@ -89,7 +89,7 @@ object ItemSelectionCtx { io.circe.generic.semiauto.deriveEncoder def create( - account: AccountId, + account: AccountInfo, items: Vector[ListItem], baseUrl: Option[LenientUri], more: Boolean, @@ -100,7 +100,7 @@ object ItemSelectionCtx { items.map(Item(now)).toList, baseUrl.map(_.asString), more, - account.user.id + account.login.id ) } diff --git a/modules/notification/impl/src/main/scala/docspell/notification/impl/context/SetFieldValueCtx.scala b/modules/notification/impl/src/main/scala/docspell/notification/impl/context/SetFieldValueCtx.scala index ebb8dc70..ced15467 100644 --- a/modules/notification/impl/src/main/scala/docspell/notification/impl/context/SetFieldValueCtx.scala +++ b/modules/notification/impl/src/main/scala/docspell/notification/impl/context/SetFieldValueCtx.scala @@ -44,7 +44,7 @@ object SetFieldValueCtx { for { now <- OptionT.liftF(Timestamp.current[ConnectionIO]) items <- OptionT.liftF(Item.find(ev.items, ev.account, now)) - field <- OptionT(RCustomField.findById(ev.field, ev.account.collective)) + field <- OptionT(RCustomField.findById(ev.field, ev.account.collectiveId)) msg = SetFieldValueCtx( ev, Data( @@ -70,7 +70,7 @@ object SetFieldValueCtx { ) final case class Data( - account: AccountId, + account: AccountInfo, items: List[Item], field: Field, value: String, diff --git a/modules/notification/impl/src/main/scala/docspell/notification/impl/context/TagsChangedCtx.scala b/modules/notification/impl/src/main/scala/docspell/notification/impl/context/TagsChangedCtx.scala index bd7aa8d4..f674ecf1 100644 --- a/modules/notification/impl/src/main/scala/docspell/notification/impl/context/TagsChangedCtx.scala +++ b/modules/notification/impl/src/main/scala/docspell/notification/impl/context/TagsChangedCtx.scala @@ -39,8 +39,8 @@ object TagsChangedCtx { def apply: Factory = EventContext.factory(ev => for { - tagsAdded <- RTag.findAllByNameOrId(ev.added, ev.account.collective) - tagsRemov <- RTag.findAllByNameOrId(ev.removed, ev.account.collective) + tagsAdded <- RTag.findAllByNameOrId(ev.added, ev.account.collectiveId) + tagsRemov <- RTag.findAllByNameOrId(ev.removed, ev.account.collectiveId) now <- Timestamp.current[ConnectionIO] items <- Item.find(ev.items, ev.account, now) msg = TagsChangedCtx( @@ -69,7 +69,7 @@ object TagsChangedCtx { ) final case class Data( - account: AccountId, + account: AccountInfo, items: List[Item], added: List[Tag], removed: List[Tag], diff --git a/modules/scheduler/api/src/main/scala/docspell/scheduler/FindJobOwner.scala b/modules/scheduler/api/src/main/scala/docspell/scheduler/FindJobOwner.scala new file mode 100644 index 00000000..66578f53 --- /dev/null +++ b/modules/scheduler/api/src/main/scala/docspell/scheduler/FindJobOwner.scala @@ -0,0 +1,19 @@ +package docspell.scheduler + +import cats.Applicative +import docspell.common.AccountInfo + +/** Strategy to find the user that submitted the job. This is used to emit events about + * starting/finishing jobs. + * + * If an account cannot be determined, no events can be send. + */ +trait FindJobOwner[F[_]] { + def apply(job: Job[_]): F[Option[AccountInfo]] +} + +object FindJobOwner { + + def none[F[_]: Applicative]: FindJobOwner[F] = + (_: Job[_]) => Applicative[F].pure(None) +} diff --git a/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/JobStoreModuleBuilder.scala b/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/JobStoreModuleBuilder.scala index 0cab5e30..b62ae5cd 100644 --- a/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/JobStoreModuleBuilder.scala +++ b/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/JobStoreModuleBuilder.scala @@ -17,7 +17,8 @@ import docspell.store.Store case class JobStoreModuleBuilder[F[_]: Async]( store: Store[F], pubsub: PubSubT[F], - eventSink: EventSink[F] + eventSink: EventSink[F], + findJobOwner: FindJobOwner[F] ) { def withPubsub(ps: PubSubT[F]): JobStoreModuleBuilder[F] = copy(pubsub = ps) @@ -25,8 +26,11 @@ case class JobStoreModuleBuilder[F[_]: Async]( def withEventSink(es: EventSink[F]): JobStoreModuleBuilder[F] = copy(eventSink = es) + def withFindJobOwner(f: FindJobOwner[F]): JobStoreModuleBuilder[F] = + copy(findJobOwner = f) + def build: JobStoreModuleBuilder.Module[F] = { - val jobStore = JobStorePublish(store, pubsub, eventSink) + val jobStore = JobStorePublish(store, pubsub, eventSink, findJobOwner) val periodicTaskStore = PeriodicTaskStore(store, jobStore) val userTaskStore = UserTaskStoreImpl(store, periodicTaskStore) new JobStoreModuleBuilder.Module( @@ -35,7 +39,8 @@ case class JobStoreModuleBuilder[F[_]: Async]( jobStore, store, eventSink, - pubsub + pubsub, + findJobOwner ) } } @@ -43,7 +48,12 @@ case class JobStoreModuleBuilder[F[_]: Async]( object JobStoreModuleBuilder { def apply[F[_]: Async](store: Store[F]): JobStoreModuleBuilder[F] = - JobStoreModuleBuilder(store, PubSubT.noop[F], EventSink.silent[F]) + JobStoreModuleBuilder( + store, + PubSubT.noop[F], + EventSink.silent[F], + FindJobOwner.none[F] + ) final class Module[F[_]]( val userTasks: UserTaskStore[F], @@ -51,6 +61,7 @@ object JobStoreModuleBuilder { val jobs: JobStore[F], val store: Store[F], val eventSink: EventSink[F], - val pubSubT: PubSubT[F] + val pubSubT: PubSubT[F], + val findJobOwner: FindJobOwner[F] ) extends JobStoreModule[F] {} } diff --git a/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/JobStorePublish.scala b/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/JobStorePublish.scala index 3d88e322..4bb82b99 100644 --- a/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/JobStorePublish.scala +++ b/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/JobStorePublish.scala @@ -6,9 +6,9 @@ package docspell.scheduler.impl +import cats.data.OptionT import cats.effect._ import cats.implicits._ - import docspell.common.{Ident, JobState} import docspell.notification.api.{Event, EventSink} import docspell.pubsub.api.PubSubT @@ -19,26 +19,31 @@ import docspell.store.Store final class JobStorePublish[F[_]: Sync]( delegate: JobStore[F], pubsub: PubSubT[F], - eventSink: EventSink[F] + eventSink: EventSink[F], + findJobOwner: FindJobOwner[F] ) extends JobStore[F] { private def msg(job: Job[String]): JobSubmitted = JobSubmitted(job.id, job.group, job.task, job.args) - private def event(job: Job[String]): Event.JobSubmitted = - Event.JobSubmitted( - job.id, - job.group, - job.task, - job.args, - JobState.waiting, - job.subject, - job.submitter - ) + private def event(job: Job[String]): OptionT[F, Event.JobSubmitted] = + OptionT(findJobOwner(job)) + .map( + Event.JobSubmitted( + _, + job.id, + job.group, + job.task, + job.args, + JobState.waiting, + job.subject, + job.submitter + ) + ) private def publish(job: Job[String]): F[Unit] = pubsub.publish1(JobSubmitted.topic, msg(job)).as(()) *> - eventSink.offer(event(job)) + event(job).semiflatMap(eventSink.offer).value.void private def notifyJoex: F[Unit] = pubsub.publish1IgnoreErrors(JobsNotify(), ()).void @@ -82,7 +87,8 @@ object JobStorePublish { def apply[F[_]: Async]( store: Store[F], pubSub: PubSubT[F], - eventSink: EventSink[F] + eventSink: EventSink[F], + findJobOwner: FindJobOwner[F] ): JobStore[F] = - new JobStorePublish[F](JobStoreImpl(store), pubSub, eventSink) + new JobStorePublish[F](JobStoreImpl(store), pubSub, eventSink, findJobOwner) } diff --git a/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/SchedulerBuilder.scala b/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/SchedulerBuilder.scala index 6a6e0c2a..f2def0e1 100644 --- a/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/SchedulerBuilder.scala +++ b/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/SchedulerBuilder.scala @@ -23,7 +23,8 @@ case class SchedulerBuilder[F[_]: Async]( queue: JobQueue[F], logSink: LogSink[F], pubSub: PubSubT[F], - eventSink: EventSink[F] + eventSink: EventSink[F], + findJobOwner: FindJobOwner[F] ) { def withConfig(cfg: SchedulerConfig): SchedulerBuilder[F] = @@ -32,7 +33,7 @@ case class SchedulerBuilder[F[_]: Async]( def withTaskRegistry(reg: JobTaskRegistry[F]): SchedulerBuilder[F] = copy(tasks = reg) - def withTask[A](task: JobTask[F]): SchedulerBuilder[F] = + def withTask(task: JobTask[F]): SchedulerBuilder[F] = withTaskRegistry(tasks.withTask(task)) def withLogSink(sink: LogSink[F]): SchedulerBuilder[F] = @@ -47,6 +48,9 @@ case class SchedulerBuilder[F[_]: Async]( def withEventSink(sink: EventSink[F]): SchedulerBuilder[F] = copy(eventSink = sink) + def withFindJobOwner(f: FindJobOwner[F]): SchedulerBuilder[F] = + copy(findJobOwner = f) + def serve: Resource[F, Scheduler[F]] = resource.evalMap(sch => Async[F].start(sch.start.compile.drain).map(_ => sch)) @@ -60,6 +64,7 @@ case class SchedulerBuilder[F[_]: Async]( queue, pubSub, eventSink, + findJobOwner, tasks, store, logSink, @@ -86,6 +91,7 @@ object SchedulerBuilder { JobQueue(store), LogSink.db[F](store), PubSubT.noop[F], - EventSink.silent[F] + EventSink.silent[F], + FindJobOwner.none[F] ) } diff --git a/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/SchedulerImpl.scala b/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/SchedulerImpl.scala index db6fc5f1..d39b26d7 100644 --- a/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/SchedulerImpl.scala +++ b/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/SchedulerImpl.scala @@ -30,6 +30,7 @@ final class SchedulerImpl[F[_]: Async]( queue: JobQueue[F], pubSub: PubSubT[F], eventSink: EventSink[F], + findJobOwner: FindJobOwner[F], tasks: JobTaskRegistry[F], store: Store[F], logSink: LogSink[F], @@ -68,20 +69,19 @@ final class SchedulerImpl[F[_]: Async]( def getRunning: F[Vector[Job[String]]] = state.get .flatMap(s => QJob.findAll(s.getRunning, store)) - .map( - _.map(rj => - Job( - rj.id, - rj.task, - rj.group, - rj.args, - rj.subject, - rj.submitter, - rj.priority, - rj.tracker - ) - ) - ) + .map(_.map(convertJob)) + + private def convertJob(rj: RJob): Job[String] = + Job( + rj.id, + rj.task, + rj.group, + rj.args, + rj.subject, + rj.submitter, + rj.priority, + rj.tracker + ) def requestCancel(jobId: Ident): F[Boolean] = logger.info(s"Scheduler requested to cancel job: ${jobId.id}") *> @@ -235,22 +235,29 @@ final class SchedulerImpl[F[_]: Async]( ) ) _ <- Sync[F].whenA(JobState.isDone(finishState))( - eventSink.offer( - Event.JobDone( - job.id, - job.group, - job.task, - job.args, - job.state, - job.subject, - job.submitter, - result.json.getOrElse(Json.Null), - result.message - ) - ) + makeJobDoneEvent(job, result) + .semiflatMap(eventSink.offer) + .value ) } yield () + private def makeJobDoneEvent(job: RJob, result: JobTaskResult) = + for { + acc <- OptionT(findJobOwner(convertJob(job))) + ev = Event.JobDone( + acc, + job.id, + job.group, + job.task, + job.args, + job.state, + job.subject, + job.submitter, + result.json.getOrElse(Json.Null), + result.message + ) + } yield ev + def onStart(job: RJob): F[Unit] = QJob.setRunning( job.id,