diff --git a/build.sbt b/build.sbt index 7f395a82..510aabb6 100644 --- a/build.sbt +++ b/build.sbt @@ -540,7 +540,12 @@ val schedulerImpl = project .settings( name := "docspell-scheduler-impl" ) - .dependsOn(store, schedulerApi, notificationApi, pubsubApi) + .dependsOn( + store % "compile->compile;test->test", + schedulerApi, + notificationApi, + pubsubApi + ) val extract = project .in(file("modules/extract")) diff --git a/modules/backend/src/main/scala/docspell/backend/ops/OJob.scala b/modules/backend/src/main/scala/docspell/backend/ops/OJob.scala index e0c14552..a5d88d35 100644 --- a/modules/backend/src/main/scala/docspell/backend/ops/OJob.scala +++ b/modules/backend/src/main/scala/docspell/backend/ops/OJob.scala @@ -16,7 +16,7 @@ import docspell.pubsub.api.PubSubT import docspell.scheduler.msg.JobDone import docspell.store.Store import docspell.store.UpdateResult -import docspell.store.queries.QJob +import docspell.store.queries.QJobQueue import docspell.store.records.{RJob, RJobLog} trait OJob[F[_]] { @@ -64,7 +64,7 @@ object OJob { def queueState(collective: Ident, maxResults: Int): F[CollectiveQueueState] = store .transact( - QJob.queueStateSnapshot(collective, maxResults.toLong) + QJobQueue.queueStateSnapshot(collective, maxResults.toLong) ) .map(t => JobDetail(t._1, t._2)) .compile diff --git a/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/JobQueue.scala b/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/JobQueue.scala index 5317b7dc..fab38496 100644 --- a/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/JobQueue.scala +++ b/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/JobQueue.scala @@ -11,7 +11,6 @@ import cats.implicits._ import docspell.common._ import docspell.store.Store -import docspell.store.queries.QJob import docspell.store.records.RJob trait JobQueue[F[_]] { diff --git a/modules/store/src/main/scala/docspell/store/queries/QJob.scala b/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/QJob.scala similarity index 80% rename from modules/store/src/main/scala/docspell/store/queries/QJob.scala rename to modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/QJob.scala index a172540b..91c37804 100644 --- a/modules/store/src/main/scala/docspell/store/queries/QJob.scala +++ b/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/QJob.scala @@ -4,10 +4,9 @@ * SPDX-License-Identifier: AGPL-3.0-or-later */ -package docspell.store.queries +package docspell.scheduler.impl -import cats.data.NonEmptyList -import cats.effect._ +import cats.effect.Async import cats.implicits._ import fs2.Stream @@ -15,10 +14,9 @@ import docspell.common._ import docspell.store.Store import docspell.store.qb.DSL._ import docspell.store.qb._ -import docspell.store.records.{RJob, RJobGroupUse, RJobLog} +import docspell.store.records.{RJob, RJobGroupUse} -import doobie._ -import doobie.implicits._ +import doobie.ConnectionIO object QJob { private[this] val cioLogger = docspell.logging.getLogger[ConnectionIO] @@ -231,50 +229,4 @@ object QJob { def findAll[F[_]](ids: Seq[Ident], store: Store[F]): F[Vector[RJob]] = store.transact(RJob.findFromIds(ids)) - - def queueStateSnapshot( - collective: Ident, - max: Long - ): Stream[ConnectionIO, (RJob, Vector[RJobLog])] = { - val JC = RJob.T - val waiting = NonEmptyList.of(JobState.Waiting, JobState.Stuck, JobState.Scheduled) - val running = NonEmptyList.of(JobState.Running) - // val done = JobState.all.filterNot(js => ).diff(waiting).diff(running) - - def selectJobs(now: Timestamp): Stream[ConnectionIO, RJob] = { - val refDate = now.minusHours(24) - val runningJobs = Select( - select(JC.all), - from(JC), - JC.group === collective && JC.state.in(running) - ).orderBy(JC.submitted.desc).build.query[RJob].stream - - val waitingJobs = Select( - select(JC.all), - from(JC), - JC.group === collective && JC.state.in(waiting) && JC.submitted > refDate - ).orderBy(JC.submitted.desc).build.query[RJob].stream.take(max) - - val doneJobs = Select( - select(JC.all), - from(JC), - and( - JC.group === collective, - JC.state.in(JobState.done), - JC.submitted > refDate - ) - ).orderBy(JC.submitted.desc).build.query[RJob].stream.take(max) - - runningJobs ++ waitingJobs ++ doneJobs - } - - def selectLogs(job: RJob): ConnectionIO[Vector[RJobLog]] = - RJobLog.findLogs(job.id) - - for { - now <- Stream.eval(Timestamp.current[ConnectionIO]) - job <- selectJobs(now) - res <- Stream.eval(selectLogs(job)) - } yield (job, res) - } } diff --git a/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/SchedulerImpl.scala b/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/SchedulerImpl.scala index 955a3b59..bc87d0fa 100644 --- a/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/SchedulerImpl.scala +++ b/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/SchedulerImpl.scala @@ -21,7 +21,6 @@ import docspell.scheduler._ import docspell.scheduler.impl.SchedulerImpl._ import docspell.scheduler.msg.{CancelJob, JobDone, JobsNotify} import docspell.store.Store -import docspell.store.queries.QJob import docspell.store.records.RJob import io.circe.Json diff --git a/modules/store/src/test/scala/docspell/store/queries/QJobTest.scala b/modules/scheduler/impl/src/test/scala/docspell/scheduler/impl/QJobTest.scala similarity index 96% rename from modules/store/src/test/scala/docspell/store/queries/QJobTest.scala rename to modules/scheduler/impl/src/test/scala/docspell/scheduler/impl/QJobTest.scala index adf6facd..6c78a050 100644 --- a/modules/store/src/test/scala/docspell/store/queries/QJobTest.scala +++ b/modules/scheduler/impl/src/test/scala/docspell/scheduler/impl/QJobTest.scala @@ -4,7 +4,7 @@ * SPDX-License-Identifier: AGPL-3.0-or-later */ -package docspell.store.queries +package docspell.scheduler.impl import java.time.Instant import java.util.concurrent.atomic.AtomicLong @@ -14,8 +14,7 @@ import cats.implicits._ import docspell.common._ import docspell.logging.TestLoggingConfig import docspell.store.StoreFixture -import docspell.store.records.RJob -import docspell.store.records.RJobGroupUse +import docspell.store.records.{RJob, RJobGroupUse} import doobie.implicits._ import munit._ diff --git a/modules/store/src/main/scala/docspell/store/queries/QJobQueue.scala b/modules/store/src/main/scala/docspell/store/queries/QJobQueue.scala new file mode 100644 index 00000000..9ab45d2a --- /dev/null +++ b/modules/store/src/main/scala/docspell/store/queries/QJobQueue.scala @@ -0,0 +1,66 @@ +/* + * Copyright 2020 Eike K. & Contributors + * + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +package docspell.store.queries + +import cats.data.NonEmptyList +import fs2.Stream + +import docspell.common._ +import docspell.store.qb.DSL._ +import docspell.store.qb._ +import docspell.store.records.{RJob, RJobLog} + +import doobie.ConnectionIO + +object QJobQueue { + + def queueStateSnapshot( + collective: Ident, + max: Long + ): Stream[ConnectionIO, (RJob, Vector[RJobLog])] = { + val JC = RJob.T + val waiting = NonEmptyList.of(JobState.Waiting, JobState.Stuck, JobState.Scheduled) + val running = NonEmptyList.of(JobState.Running) + // val done = JobState.all.filterNot(js => ).diff(waiting).diff(running) + + def selectJobs(now: Timestamp): Stream[ConnectionIO, RJob] = { + val refDate = now.minusHours(24) + val runningJobs = Select( + select(JC.all), + from(JC), + JC.group === collective && JC.state.in(running) + ).orderBy(JC.submitted.desc).build.query[RJob].stream + + val waitingJobs = Select( + select(JC.all), + from(JC), + JC.group === collective && JC.state.in(waiting) && JC.submitted > refDate + ).orderBy(JC.submitted.desc).build.query[RJob].stream.take(max) + + val doneJobs = Select( + select(JC.all), + from(JC), + and( + JC.group === collective, + JC.state.in(JobState.done), + JC.submitted > refDate + ) + ).orderBy(JC.submitted.desc).build.query[RJob].stream.take(max) + + runningJobs ++ waitingJobs ++ doneJobs + } + + def selectLogs(job: RJob): ConnectionIO[Vector[RJobLog]] = + RJobLog.findLogs(job.id) + + for { + now <- Stream.eval(Timestamp.current[ConnectionIO]) + job <- selectJobs(now) + res <- Stream.eval(selectLogs(job)) + } yield (job, res) + } +}