mirror of
https://github.com/TheAnachronism/docspell.git
synced 2025-04-05 02:49:32 +00:00
Merge pull request #1987 from eikek/notify-jobs
Fix notifications for collective scoped tasks
This commit is contained in:
commit
aad73824d7
@ -6,6 +6,8 @@
|
|||||||
|
|
||||||
package docspell.backend.joex
|
package docspell.backend.joex
|
||||||
|
|
||||||
|
import cats.syntax.all._
|
||||||
|
|
||||||
import docspell.common.AccountId
|
import docspell.common.AccountId
|
||||||
import docspell.scheduler.FindJobOwner
|
import docspell.scheduler.FindJobOwner
|
||||||
import docspell.store.Store
|
import docspell.store.Store
|
||||||
@ -15,9 +17,11 @@ import docspell.store.queries.QLogin
|
|||||||
* login.
|
* login.
|
||||||
*/
|
*/
|
||||||
object FindJobOwnerAccount {
|
object FindJobOwnerAccount {
|
||||||
def apply[F[_]](store: Store[F]): FindJobOwner[F] =
|
def apply[F[_]: cats.effect.Sync](store: Store[F]): FindJobOwner[F] =
|
||||||
FindJobOwner.of { job =>
|
FindJobOwner.of { job =>
|
||||||
|
val logger = docspell.logging.getLogger[F]
|
||||||
val accountId = AccountId(job.group, job.submitter)
|
val accountId = AccountId(job.group, job.submitter)
|
||||||
store.transact(QLogin.findAccount(accountId))
|
logger.debug(s"Searching for account of ids: $accountId ") *>
|
||||||
|
store.transact(QLogin.findAccount(accountId))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -30,6 +30,7 @@ object EventNotify {
|
|||||||
Kleisli { event =>
|
Kleisli { event =>
|
||||||
(for {
|
(for {
|
||||||
hooks <- OptionT.liftF(store.transact(QNotification.findChannelsForEvent(event)))
|
hooks <- OptionT.liftF(store.transact(QNotification.findChannelsForEvent(event)))
|
||||||
|
_ <- OptionT.liftF(logger.trace(s"Found hooks: $hooks for event: $event"))
|
||||||
evctx <- DbEventContext.apply.run(event).mapK(store.transform)
|
evctx <- DbEventContext.apply.run(event).mapK(store.transform)
|
||||||
channels = hooks
|
channels = hooks
|
||||||
.filter(hc =>
|
.filter(hc =>
|
||||||
|
@ -136,15 +136,14 @@ final class SchedulerImpl[F[_]: Async](
|
|||||||
def mainLoop: Stream[F, Nothing] = {
|
def mainLoop: Stream[F, Nothing] = {
|
||||||
val body: F[Boolean] =
|
val body: F[Boolean] =
|
||||||
for {
|
for {
|
||||||
_ <- permits.available.flatMap(a =>
|
_ <- permits.available.flatTap(a =>
|
||||||
logger.debug(s"Try to acquire permit ($a free)")
|
logger.debug(s"Try to acquire permit ($a free)")
|
||||||
)
|
)
|
||||||
_ <- permits.acquire
|
_ <- permits.acquire
|
||||||
_ <- logger.debug("New permit acquired")
|
_ <- logger.debug("New permit acquired")
|
||||||
down <- state.get.map(_.shutdownRequest)
|
down <- state.get.map(_.shutdownRequest)
|
||||||
rjob <-
|
rjob <-
|
||||||
if (down)
|
if (down) permits.release.as(Option.empty[RJob])
|
||||||
logger.info("") *> permits.release *> (None: Option[RJob]).pure[F]
|
|
||||||
else
|
else
|
||||||
queue.nextJob(
|
queue.nextJob(
|
||||||
group => state.modify(_.nextPrio(group, config.countingScheme)),
|
group => state.modify(_.nextPrio(group, config.countingScheme)),
|
||||||
@ -220,7 +219,7 @@ final class SchedulerImpl[F[_]: Async](
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
def onFinish(job: RJob, result: JobTaskResult, finishState: JobState): F[Unit] =
|
private def onFinish(job: RJob, result: JobTaskResult, finishState: JobState): F[Unit] =
|
||||||
for {
|
for {
|
||||||
_ <- logger.debug(s"Job ${job.info} done $finishState. Releasing resources.")
|
_ <- logger.debug(s"Job ${job.info} done $finishState. Releasing resources.")
|
||||||
_ <- permits.release *> permits.available.flatMap(a =>
|
_ <- permits.release *> permits.available.flatMap(a =>
|
||||||
@ -229,13 +228,13 @@ final class SchedulerImpl[F[_]: Async](
|
|||||||
_ <- state.modify(_.removeRunning(job))
|
_ <- state.modify(_.removeRunning(job))
|
||||||
_ <- QJob.setFinalState(job.id, finishState, store)
|
_ <- QJob.setFinalState(job.id, finishState, store)
|
||||||
_ <- Sync[F].whenA(JobState.isDone(finishState))(
|
_ <- Sync[F].whenA(JobState.isDone(finishState))(
|
||||||
pubSub.publish1IgnoreErrors(
|
logger.trace("Publishing JobDone event") *> pubSub.publish1IgnoreErrors(
|
||||||
JobDone.topic,
|
JobDone.topic,
|
||||||
JobDone(job.id, job.group, job.task, job.args, finishState, result.json)
|
JobDone(job.id, job.group, job.task, job.args, finishState, result.json)
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
_ <- Sync[F].whenA(JobState.isDone(finishState))(
|
_ <- Sync[F].whenA(JobState.isDone(finishState))(
|
||||||
makeJobDoneEvent(job, result)
|
logger.trace("Sending JobDone to event sink") *> makeJobDoneEvent(job, result)
|
||||||
.semiflatMap(eventSink.offer)
|
.semiflatMap(eventSink.offer)
|
||||||
.value
|
.value
|
||||||
)
|
)
|
||||||
@ -243,7 +242,9 @@ final class SchedulerImpl[F[_]: Async](
|
|||||||
|
|
||||||
private def makeJobDoneEvent(job: RJob, result: JobTaskResult) =
|
private def makeJobDoneEvent(job: RJob, result: JobTaskResult) =
|
||||||
for {
|
for {
|
||||||
acc <- OptionT(findJobOwner(convertJob(job)))
|
acc <- OptionT(findJobOwner(convertJob(job))).flatTransform(acc =>
|
||||||
|
logger.debug(s"Found job owner $acc for job $job").as(acc)
|
||||||
|
)
|
||||||
ev = Event.JobDone(
|
ev = Event.JobDone(
|
||||||
acc,
|
acc,
|
||||||
job.id,
|
job.id,
|
||||||
|
@ -61,13 +61,25 @@ object QLogin {
|
|||||||
/** Finds the account given a combination of login/user-id and coll-id/coll-name pair.
|
/** Finds the account given a combination of login/user-id and coll-id/coll-name pair.
|
||||||
*/
|
*/
|
||||||
def findAccount(acc: AccountId): ConnectionIO[Option[AccountInfo]] = {
|
def findAccount(acc: AccountId): ConnectionIO[Option[AccountInfo]] = {
|
||||||
|
// collective may be given as id or name and it is possible to have two collective
|
||||||
|
// ids given. In that (not so nice) case, we need to lookup the collective name and
|
||||||
|
// match it against the user. This kind-of edge case is currently used when a user task
|
||||||
|
// of scope UserTaskScope.collective is run.
|
||||||
|
|
||||||
val collIdOpt = acc.collective.id.toLongOption.map(CollectiveId(_))
|
val collIdOpt = acc.collective.id.toLongOption.map(CollectiveId(_))
|
||||||
findUser0((ut, ct) =>
|
collIdOpt match {
|
||||||
(ut.login === acc.user || ut.uid === acc.user) && collIdOpt
|
case Some(cid) if acc.user == acc.collective =>
|
||||||
.map(id => ct.id === id)
|
findUser0((ut, ct) => ct.id === cid && ut.login === ct.name)
|
||||||
.getOrElse(ct.name === acc.collective)
|
.map(_.map(_.account))
|
||||||
)
|
|
||||||
.map(_.map(_.account))
|
case _ =>
|
||||||
|
findUser0((ut, ct) =>
|
||||||
|
(ut.login === acc.user || ut.uid === acc.user) && collIdOpt
|
||||||
|
.map(id => ct.id === id)
|
||||||
|
.getOrElse(ct.name === acc.collective)
|
||||||
|
)
|
||||||
|
.map(_.map(_.account))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
def findByRememberMe(
|
def findByRememberMe(
|
||||||
|
Loading…
x
Reference in New Issue
Block a user