diff --git a/build.sbt b/build.sbt index b47c36ce..6ac36289 100644 --- a/build.sbt +++ b/build.sbt @@ -564,7 +564,7 @@ val backend = project Dependencies.http4sClient ++ Dependencies.emil ) - .dependsOn(store, joexapi, ftsclient, totp) + .dependsOn(store, joexapi, ftsclient, totp, pubsubApi) val oidc = project .in(file("modules/oidc")) @@ -655,7 +655,8 @@ val joex = project analysis, joexapi, restapi, - ftssolr + ftssolr, + pubsubNaive ) val restserver = project @@ -719,7 +720,7 @@ val restserver = project } } ) - .dependsOn(config, restapi, joexapi, backend, webapp, ftssolr, oidc) + .dependsOn(config, restapi, joexapi, backend, webapp, ftssolr, oidc, pubsubNaive) // --- Website Documentation diff --git a/modules/backend/src/main/scala/docspell/backend/BackendApp.scala b/modules/backend/src/main/scala/docspell/backend/BackendApp.scala index 4aea530f..cdc92fcc 100644 --- a/modules/backend/src/main/scala/docspell/backend/BackendApp.scala +++ b/modules/backend/src/main/scala/docspell/backend/BackendApp.scala @@ -6,8 +6,6 @@ package docspell.backend -import scala.concurrent.ExecutionContext - import cats.effect._ import docspell.backend.auth.Login @@ -15,15 +13,13 @@ import docspell.backend.fulltext.CreateIndex import docspell.backend.ops._ import docspell.backend.signup.OSignup import docspell.ftsclient.FtsClient -import docspell.joexapi.client.JoexClient +import docspell.pubsub.api.PubSubT import docspell.store.Store import docspell.store.queue.JobQueue import docspell.store.usertask.UserTaskStore import docspell.totp.Totp import emil.javamail.{JavaMailEmil, Settings} -import org.http4s.blaze.client.BlazeClientBuilder -import org.http4s.client.Client trait BackendApp[F[_]] { @@ -49,6 +45,7 @@ trait BackendApp[F[_]] { def clientSettings: OClientSettings[F] def totp: OTotp[F] def share: OShare[F] + def pubSub: PubSubT[F] } object BackendApp { @@ -56,8 +53,8 @@ object BackendApp { def create[F[_]: Async]( cfg: Config, store: Store[F], - httpClient: Client[F], - ftsClient: FtsClient[F] + ftsClient: FtsClient[F], + pubSubT: PubSubT[F] ): Resource[F, BackendApp[F]] = for { utStore <- UserTaskStore(store) @@ -65,7 +62,7 @@ object BackendApp { totpImpl <- OTotp(store, Totp.default) loginImpl <- Login[F](store, Totp.default) signupImpl <- OSignup[F](store) - joexImpl <- OJoex(JoexClient(httpClient), store) + joexImpl <- OJoex(pubSubT) collImpl <- OCollective[F](store, utStore, queue, joexImpl) sourceImpl <- OSource[F](store) tagImpl <- OTag[F](store) @@ -90,6 +87,7 @@ object BackendApp { OShare(store, itemSearchImpl, simpleSearchImpl, javaEmil) ) } yield new BackendApp[F] { + val pubSub = pubSubT val login = loginImpl val signup = signupImpl val collective = collImpl @@ -113,15 +111,4 @@ object BackendApp { val totp = totpImpl val share = shareImpl } - - def apply[F[_]: Async]( - cfg: Config, - connectEC: ExecutionContext - )(ftsFactory: Client[F] => Resource[F, FtsClient[F]]): Resource[F, BackendApp[F]] = - for { - store <- Store.create(cfg.jdbc, cfg.files.chunkSize, connectEC) - httpClient <- BlazeClientBuilder[F].resource - ftsClient <- ftsFactory(httpClient) - backend <- create(cfg, store, httpClient, ftsClient) - } yield backend } diff --git a/modules/backend/src/main/scala/docspell/backend/msg/CancelJob.scala b/modules/backend/src/main/scala/docspell/backend/msg/CancelJob.scala new file mode 100644 index 00000000..677f1fba --- /dev/null +++ b/modules/backend/src/main/scala/docspell/backend/msg/CancelJob.scala @@ -0,0 +1,27 @@ +/* + * Copyright 2020 Eike K. & Contributors + * + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +package docspell.backend.msg + +import docspell.common._ +import docspell.pubsub.api.{Topic, TypedTopic} + +import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder} +import io.circe.{Decoder, Encoder} + +/** Message to request to cancel a job. */ +final case class CancelJob(jobId: Ident, nodeId: Ident) + +object CancelJob { + implicit val jsonDecoder: Decoder[CancelJob] = + deriveDecoder[CancelJob] + + implicit val jsonEncoder: Encoder[CancelJob] = + deriveEncoder[CancelJob] + + val topic: TypedTopic[CancelJob] = + TypedTopic(Topic("job-cancel-request")) +} diff --git a/modules/backend/src/main/scala/docspell/backend/msg/JobDone.scala b/modules/backend/src/main/scala/docspell/backend/msg/JobDone.scala new file mode 100644 index 00000000..9b09b8f1 --- /dev/null +++ b/modules/backend/src/main/scala/docspell/backend/msg/JobDone.scala @@ -0,0 +1,26 @@ +/* + * Copyright 2020 Eike K. & Contributors + * + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +package docspell.backend.msg + +import docspell.common._ +import docspell.pubsub.api.{Topic, TypedTopic} + +import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder} +import io.circe.{Decoder, Encoder} + +/** Message to notify about finished jobs. They have a final state. */ +final case class JobDone(jobId: Ident, task: Ident, args: String, state: JobState) +object JobDone { + implicit val jsonDecoder: Decoder[JobDone] = + deriveDecoder[JobDone] + + implicit val jsonEncoder: Encoder[JobDone] = + deriveEncoder[JobDone] + + val topic: TypedTopic[JobDone] = + TypedTopic(Topic("job-finished")) +} diff --git a/modules/backend/src/main/scala/docspell/backend/msg/Ping.scala b/modules/backend/src/main/scala/docspell/backend/msg/Ping.scala new file mode 100644 index 00000000..b300a76d --- /dev/null +++ b/modules/backend/src/main/scala/docspell/backend/msg/Ping.scala @@ -0,0 +1,31 @@ +/* + * Copyright 2020 Eike K. & Contributors + * + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +package docspell.backend.msg + +import java.util.concurrent.atomic.AtomicLong + +import docspell.pubsub.api.{Topic, TypedTopic} + +import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder} +import io.circe.{Decoder, Encoder} + +final case class Ping(sender: String, num: Long) + +object Ping { + implicit val jsonDecoder: Decoder[Ping] = + deriveDecoder[Ping] + + implicit val jsonEncoder: Encoder[Ping] = + deriveEncoder[Ping] + + private[this] val counter = new AtomicLong(0) + def next(sender: String): Ping = + Ping(sender, counter.getAndIncrement()) + + val topic: TypedTopic[Ping] = + TypedTopic[Ping](Topic("ping")) +} diff --git a/modules/backend/src/main/scala/docspell/backend/msg/Topics.scala b/modules/backend/src/main/scala/docspell/backend/msg/Topics.scala new file mode 100644 index 00000000..996ff7c6 --- /dev/null +++ b/modules/backend/src/main/scala/docspell/backend/msg/Topics.scala @@ -0,0 +1,23 @@ +/* + * Copyright 2020 Eike K. & Contributors + * + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +package docspell.backend.msg + +import cats.data.NonEmptyList + +import docspell.pubsub.api.{Topic, TypedTopic} + +/** All topics used in Docspell. */ +object Topics { + + /** A generic notification to the job executors to look for new work. */ + val jobsNotify: TypedTopic[Unit] = + TypedTopic[Unit](Topic("jobs-notify")) + + /** A list of all topics. It is required to list every topic in use here! */ + val all: NonEmptyList[TypedTopic[_]] = + NonEmptyList.of(Ping.topic, JobDone.topic, CancelJob.topic, jobsNotify) +} diff --git a/modules/backend/src/main/scala/docspell/backend/ops/OJob.scala b/modules/backend/src/main/scala/docspell/backend/ops/OJob.scala index 4ab45fb4..7f7a36fd 100644 --- a/modules/backend/src/main/scala/docspell/backend/ops/OJob.scala +++ b/modules/backend/src/main/scala/docspell/backend/ops/OJob.scala @@ -11,8 +11,7 @@ import cats.effect._ import cats.implicits._ import docspell.backend.ops.OJob.{CollectiveQueueState, JobCancelResult} -import docspell.common.Priority -import docspell.common.{Ident, JobState} +import docspell.common._ import docspell.store.Store import docspell.store.UpdateResult import docspell.store.queries.QJob @@ -55,6 +54,7 @@ object OJob { joex: OJoex[F] ): Resource[F, OJob[F]] = Resource.pure[F, OJob[F]](new OJob[F] { + private[this] val logger = Logger.log4s(org.log4s.getLogger(OJob.getClass)) def queueState(collective: Ident, maxResults: Int): F[CollectiveQueueState] = store @@ -77,11 +77,9 @@ object OJob { job.worker match { case Some(worker) => for { - flag <- joex.cancelJob(job.id, worker) - res <- - if (flag) JobCancelResult.cancelRequested.pure[F] - else remove(job) - } yield res + _ <- logger.debug(s"Attempt to cancel job: ${job.id.id}") + _ <- joex.cancelJob(job.id, worker) + } yield JobCancelResult.cancelRequested case None => remove(job) } diff --git a/modules/backend/src/main/scala/docspell/backend/ops/OJoex.scala b/modules/backend/src/main/scala/docspell/backend/ops/OJoex.scala index d4aaf956..1ce43149 100644 --- a/modules/backend/src/main/scala/docspell/backend/ops/OJoex.scala +++ b/modules/backend/src/main/scala/docspell/backend/ops/OJoex.scala @@ -6,41 +6,27 @@ package docspell.backend.ops -import cats.data.OptionT import cats.effect._ -import cats.implicits._ -import docspell.common.{Ident, NodeType} -import docspell.joexapi.client.JoexClient -import docspell.store.Store -import docspell.store.records.RNode +import docspell.backend.msg.{CancelJob, Topics} +import docspell.common.Ident +import docspell.pubsub.api.PubSubT trait OJoex[F[_]] { def notifyAllNodes: F[Unit] - def cancelJob(job: Ident, worker: Ident): F[Boolean] - + def cancelJob(job: Ident, worker: Ident): F[Unit] } object OJoex { - - def apply[F[_]: Sync](client: JoexClient[F], store: Store[F]): Resource[F, OJoex[F]] = + def apply[F[_]](pubSub: PubSubT[F]): Resource[F, OJoex[F]] = Resource.pure[F, OJoex[F]](new OJoex[F] { + def notifyAllNodes: F[Unit] = - for { - nodes <- store.transact(RNode.findAll(NodeType.Joex)) - _ <- nodes.toList.traverse(n => client.notifyJoexIgnoreErrors(n.url)) - } yield () + pubSub.publish1IgnoreErrors(Topics.jobsNotify, ()) - def cancelJob(job: Ident, worker: Ident): F[Boolean] = - (for { - node <- OptionT(store.transact(RNode.findById(worker))) - cancel <- OptionT.liftF(client.cancelJob(node.url, job)) - } yield cancel.success).getOrElse(false) + def cancelJob(job: Ident, worker: Ident): F[Unit] = + pubSub.publish1IgnoreErrors(CancelJob.topic, CancelJob(job, worker)) }) - - def create[F[_]: Async](store: Store[F]): Resource[F, OJoex[F]] = - JoexClient.resource.flatMap(client => apply(client, store)) - } diff --git a/modules/common/src/main/scala/docspell/common/Logger.scala b/modules/common/src/main/scala/docspell/common/Logger.scala index 936c9d34..01265ef4 100644 --- a/modules/common/src/main/scala/docspell/common/Logger.scala +++ b/modules/common/src/main/scala/docspell/common/Logger.scala @@ -6,6 +6,7 @@ package docspell.common +import cats.Applicative import cats.effect.Sync import fs2.Stream @@ -45,6 +46,27 @@ trait Logger[F[_]] { self => object Logger { + def off[F[_]: Applicative]: Logger[F] = + new Logger[F] { + def trace(msg: => String): F[Unit] = + Applicative[F].pure(()) + + def debug(msg: => String): F[Unit] = + Applicative[F].pure(()) + + def info(msg: => String): F[Unit] = + Applicative[F].pure(()) + + def warn(msg: => String): F[Unit] = + Applicative[F].pure(()) + + def error(ex: Throwable)(msg: => String): F[Unit] = + Applicative[F].pure(()) + + def error(msg: => String): F[Unit] = + Applicative[F].pure(()) + } + def log4s[F[_]: Sync](log: Log4sLogger): Logger[F] = new Logger[F] { def trace(msg: => String): F[Unit] = diff --git a/modules/joex/src/main/scala/docspell/joex/Config.scala b/modules/joex/src/main/scala/docspell/joex/Config.scala index 2554d9bf..81172ff2 100644 --- a/modules/joex/src/main/scala/docspell/joex/Config.scala +++ b/modules/joex/src/main/scala/docspell/joex/Config.scala @@ -20,6 +20,7 @@ import docspell.joex.analysis.RegexNerFile import docspell.joex.hk.HouseKeepingConfig import docspell.joex.scheduler.{PeriodicSchedulerConfig, SchedulerConfig} import docspell.joex.updatecheck.UpdateCheckConfig +import docspell.pubsub.naive.PubSubConfig import docspell.store.JdbcConfig case class Config( @@ -39,7 +40,11 @@ case class Config( mailDebug: Boolean, fullTextSearch: Config.FullTextSearch, updateCheck: UpdateCheckConfig -) +) { + + def pubSubConfig: PubSubConfig = + PubSubConfig(appId, baseUrl / "internal" / "pubsub", 100) +} object Config { case class Bind(address: String, port: Int) diff --git a/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala b/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala index 86c65efc..85359092 100644 --- a/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala +++ b/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala @@ -6,14 +6,13 @@ package docspell.joex -import scala.concurrent.ExecutionContext - import cats.effect._ import cats.implicits._ import fs2.concurrent.SignallingRef import docspell.analysis.TextAnalyser import docspell.backend.fulltext.CreateIndex +import docspell.backend.msg.{CancelJob, Ping, Topics} import docspell.backend.ops._ import docspell.common._ import docspell.ftsclient.FtsClient @@ -34,6 +33,7 @@ import docspell.joex.scanmailbox._ import docspell.joex.scheduler._ import docspell.joex.updatecheck._ import docspell.joexapi.client.JoexClient +import docspell.pubsub.api.{PubSub, PubSubT} import docspell.store.Store import docspell.store.queue._ import docspell.store.records.{REmptyTrashSetting, RJobLog} @@ -41,19 +41,20 @@ import docspell.store.usertask.UserTaskScope import docspell.store.usertask.UserTaskStore import emil.javamail._ -import org.http4s.blaze.client.BlazeClientBuilder import org.http4s.client.Client final class JoexAppImpl[F[_]: Async]( cfg: Config, - nodeOps: ONode[F], store: Store[F], queue: JobQueue[F], + pubSubT: PubSubT[F], pstore: PeriodicTaskStore[F], termSignal: SignallingRef[F, Boolean], val scheduler: Scheduler[F], val periodicScheduler: PeriodicScheduler[F] ) extends JoexApp[F] { + private[this] val logger: Logger[F] = + Logger.log4s(org.log4s.getLogger(s"Joex-${cfg.appId.id}")) def init: F[Unit] = { val run = scheduler.start.compile.drain @@ -64,16 +65,26 @@ final class JoexAppImpl[F[_]: Async]( _ <- Async[F].start(prun) _ <- scheduler.periodicAwake _ <- periodicScheduler.periodicAwake - _ <- nodeOps.register(cfg.appId, NodeType.Joex, cfg.baseUrl) + _ <- subscriptions } yield () } + def subscriptions = + for { + _ <- Async[F].start(pubSubT.subscribeSink(Ping.topic) { msg => + logger.info(s">>>> PING $msg") + }) + _ <- Async[F].start(pubSubT.subscribeSink(Topics.jobsNotify) { _ => + scheduler.notifyChange + }) + _ <- Async[F].start(pubSubT.subscribeSink(CancelJob.topic) { msg => + scheduler.requestCancel(msg.body.jobId).as(()) + }) + } yield () + def findLogs(jobId: Ident): F[Vector[RJobLog]] = store.transact(RJobLog.findLogs(jobId)) - def shutdown: F[Unit] = - nodeOps.unregister(cfg.appId) - def initShutdown: F[Unit] = periodicScheduler.shutdown *> scheduler.shutdown(false) *> termSignal.set(true) @@ -116,16 +127,19 @@ object JoexAppImpl { def create[F[_]: Async]( cfg: Config, termSignal: SignallingRef[F, Boolean], - connectEC: ExecutionContext + store: Store[F], + httpClient: Client[F], + pubSub: PubSub[F] ): Resource[F, JoexApp[F]] = for { - httpClient <- BlazeClientBuilder[F].resource - client = JoexClient(httpClient) - store <- Store.create(cfg.jdbc, cfg.files.chunkSize, connectEC) queue <- JobQueue(store) pstore <- PeriodicTaskStore.create(store) - nodeOps <- ONode(store) - joex <- OJoex(client, store) + client = JoexClient(httpClient) + pubSubT = PubSubT( + pubSub, + Logger.log4s(org.log4s.getLogger(s"joex-${cfg.appId.id}")) + ) + joex <- OJoex(pubSubT) upload <- OUpload(store, queue, joex) fts <- createFtsClient(cfg)(httpClient) createIndex <- CreateIndex.resource(fts, store) @@ -138,6 +152,7 @@ object JoexAppImpl { JavaMailEmil(Settings.defaultSettings.copy(debug = cfg.mailDebug)) sch <- SchedulerBuilder(cfg.scheduler, store) .withQueue(queue) + .withPubSub(pubSubT) .withTask( JobTask.json( ProcessItemArgs.taskName, @@ -264,8 +279,8 @@ object JoexAppImpl { pstore, client ) - app = new JoexAppImpl(cfg, nodeOps, store, queue, pstore, termSignal, sch, psch) - appR <- Resource.make(app.init.map(_ => app))(_.shutdown) + app = new JoexAppImpl(cfg, store, queue, pubSubT, pstore, termSignal, sch, psch) + appR <- Resource.make(app.init.map(_ => app))(_.initShutdown) } yield appR private def createFtsClient[F[_]: Async]( diff --git a/modules/joex/src/main/scala/docspell/joex/JoexServer.scala b/modules/joex/src/main/scala/docspell/joex/JoexServer.scala index 8c4773dc..cad75cbc 100644 --- a/modules/joex/src/main/scala/docspell/joex/JoexServer.scala +++ b/modules/joex/src/main/scala/docspell/joex/JoexServer.scala @@ -11,10 +11,14 @@ import cats.effect._ import fs2.Stream import fs2.concurrent.SignallingRef +import docspell.backend.msg.Topics import docspell.common.Pools import docspell.joex.routes._ +import docspell.pubsub.naive.NaivePubSub +import docspell.store.Store import org.http4s.HttpApp +import org.http4s.blaze.client.BlazeClientBuilder import org.http4s.blaze.server.BlazeServerBuilder import org.http4s.implicits._ import org.http4s.server.Router @@ -33,9 +37,19 @@ object JoexServer { val app = for { signal <- Resource.eval(SignallingRef[F, Boolean](false)) exitCode <- Resource.eval(Ref[F].of(ExitCode.Success)) - joexApp <- JoexAppImpl.create[F](cfg, signal, pools.connectEC) + + store <- Store.create[F]( + cfg.jdbc, + cfg.files.chunkSize, + pools.connectEC + ) + httpClient <- BlazeClientBuilder[F].resource + pubSub <- NaivePubSub(cfg.pubSubConfig, store, httpClient)(Topics.all.map(_.topic)) + + joexApp <- JoexAppImpl.create[F](cfg, signal, store, httpClient, pubSub) httpApp = Router( + "/internal/pubsub" -> pubSub.receiveRoute, "/api/info" -> InfoRoutes(cfg), "/api/v1" -> JoexRoutes(joexApp) ).orNotFound diff --git a/modules/joex/src/main/scala/docspell/joex/scheduler/SchedulerBuilder.scala b/modules/joex/src/main/scala/docspell/joex/scheduler/SchedulerBuilder.scala index 4f981a87..cc09f7da 100644 --- a/modules/joex/src/main/scala/docspell/joex/scheduler/SchedulerBuilder.scala +++ b/modules/joex/src/main/scala/docspell/joex/scheduler/SchedulerBuilder.scala @@ -11,6 +11,7 @@ import cats.effect.std.Semaphore import cats.implicits._ import fs2.concurrent.SignallingRef +import docspell.pubsub.api.PubSubT import docspell.store.Store import docspell.store.queue.JobQueue @@ -19,7 +20,8 @@ case class SchedulerBuilder[F[_]: Async]( tasks: JobTaskRegistry[F], store: Store[F], queue: Resource[F, JobQueue[F]], - logSink: LogSink[F] + logSink: LogSink[F], + pubSub: PubSubT[F] ) { def withConfig(cfg: SchedulerConfig): SchedulerBuilder[F] = @@ -32,7 +34,7 @@ case class SchedulerBuilder[F[_]: Async]( withTaskRegistry(tasks.withTask(task)) def withQueue(queue: Resource[F, JobQueue[F]]): SchedulerBuilder[F] = - SchedulerBuilder[F](config, tasks, store, queue, logSink) + copy(queue = queue) def withLogSink(sink: LogSink[F]): SchedulerBuilder[F] = copy(logSink = sink) @@ -40,6 +42,9 @@ case class SchedulerBuilder[F[_]: Async]( def withQueue(queue: JobQueue[F]): SchedulerBuilder[F] = copy(queue = Resource.pure[F, JobQueue[F]](queue)) + def withPubSub(pubSubT: PubSubT[F]): SchedulerBuilder[F] = + copy(pubSub = pubSubT) + def serve: Resource[F, Scheduler[F]] = resource.evalMap(sch => Async[F].start(sch.start.compile.drain).map(_ => sch)) @@ -52,6 +57,7 @@ case class SchedulerBuilder[F[_]: Async]( } yield new SchedulerImpl[F]( config, jq, + pubSub, tasks, store, logSink, @@ -76,7 +82,8 @@ object SchedulerBuilder { JobTaskRegistry.empty[F], store, JobQueue(store), - LogSink.db[F](store) + LogSink.db[F](store), + PubSubT.noop[F] ) } diff --git a/modules/joex/src/main/scala/docspell/joex/scheduler/SchedulerImpl.scala b/modules/joex/src/main/scala/docspell/joex/scheduler/SchedulerImpl.scala index d01d6756..e38a282b 100644 --- a/modules/joex/src/main/scala/docspell/joex/scheduler/SchedulerImpl.scala +++ b/modules/joex/src/main/scala/docspell/joex/scheduler/SchedulerImpl.scala @@ -13,19 +13,22 @@ import cats.implicits._ import fs2.Stream import fs2.concurrent.SignallingRef +import docspell.backend.msg.JobDone import docspell.common._ import docspell.common.syntax.all._ import docspell.joex.scheduler.SchedulerImpl._ +import docspell.pubsub.api.PubSubT import docspell.store.Store import docspell.store.queries.QJob import docspell.store.queue.JobQueue import docspell.store.records.RJob -import org.log4s._ +import org.log4s.getLogger final class SchedulerImpl[F[_]: Async]( val config: SchedulerConfig, queue: JobQueue[F], + pubSub: PubSubT[F], tasks: JobTaskRegistry[F], store: Store[F], logSink: LogSink[F], @@ -55,20 +58,21 @@ final class SchedulerImpl[F[_]: Async]( state.get.flatMap(s => QJob.findAll(s.getRunning, store)) def requestCancel(jobId: Ident): F[Boolean] = - state.get.flatMap(_.cancelRequest(jobId) match { - case Some(ct) => ct.map(_ => true) - case None => - (for { - job <- OptionT(store.transact(RJob.findByIdAndWorker(jobId, config.name))) - _ <- OptionT.liftF( - if (job.isInProgress) executeCancel(job) - else ().pure[F] - ) - } yield true) - .getOrElseF( - logger.fwarn(s"Job ${jobId.id} not found, cannot cancel.").map(_ => false) - ) - }) + logger.finfo(s"Scheduler requested to cancel job: ${jobId.id}") *> + state.get.flatMap(_.cancelRequest(jobId) match { + case Some(ct) => ct.map(_ => true) + case None => + (for { + job <- OptionT(store.transact(RJob.findByIdAndWorker(jobId, config.name))) + _ <- OptionT.liftF( + if (job.isInProgress) executeCancel(job) + else ().pure[F] + ) + } yield true) + .getOrElseF( + logger.fwarn(s"Job ${jobId.id} not found, cannot cancel.").map(_ => false) + ) + }) def notifyChange: F[Unit] = waiter.update(b => !b) @@ -198,6 +202,10 @@ final class SchedulerImpl[F[_]: Async]( ) _ <- state.modify(_.removeRunning(job)) _ <- QJob.setFinalState(job.id, finalState, store) + _ <- pubSub.publish1IgnoreErrors( + JobDone.topic, + JobDone(job.id, job.task, job.args, finalState) + ) } yield () def onStart(job: RJob): F[Unit] = diff --git a/modules/pubsub/api/src/main/scala/docspell/pubsub/api/PubSub.scala b/modules/pubsub/api/src/main/scala/docspell/pubsub/api/PubSub.scala index 4ae9ef6a..6a9eb9a5 100644 --- a/modules/pubsub/api/src/main/scala/docspell/pubsub/api/PubSub.scala +++ b/modules/pubsub/api/src/main/scala/docspell/pubsub/api/PubSub.scala @@ -6,9 +6,12 @@ package docspell.pubsub.api +import cats.Applicative import cats.data.NonEmptyList import fs2.{Pipe, Stream} +import docspell.common.{Ident, Timestamp} + import io.circe.Json trait PubSub[F[_]] { @@ -18,3 +21,16 @@ trait PubSub[F[_]] { def subscribe(topics: NonEmptyList[Topic]): Stream[F, Message[Json]] } +object PubSub { + def noop[F[_]: Applicative]: PubSub[F] = + new PubSub[F] { + def publish1(topic: Topic, msg: Json): F[MessageHead] = + Applicative[F].pure(MessageHead(Ident.unsafe("0"), Timestamp.Epoch, topic)) + + def publish(topic: Topic): Pipe[F, Json, MessageHead] = + _ => Stream.empty + + def subscribe(topics: NonEmptyList[Topic]): Stream[F, Message[Json]] = + Stream.empty + } +} diff --git a/modules/pubsub/api/src/main/scala/docspell/pubsub/api/PubSubT.scala b/modules/pubsub/api/src/main/scala/docspell/pubsub/api/PubSubT.scala index 9f28b798..d07f5e41 100644 --- a/modules/pubsub/api/src/main/scala/docspell/pubsub/api/PubSubT.scala +++ b/modules/pubsub/api/src/main/scala/docspell/pubsub/api/PubSubT.scala @@ -7,6 +7,9 @@ package docspell.pubsub.api import cats.data.NonEmptyList +import cats.effect._ +import cats.implicits._ +import fs2.concurrent.SignallingRef import fs2.{Pipe, Stream} import docspell.common.Logger @@ -15,22 +18,35 @@ trait PubSubT[F[_]] { def publish1[A](topic: TypedTopic[A], msg: A): F[MessageHead] + def publish1IgnoreErrors[A](topic: TypedTopic[A], msg: A): F[Unit] + def publish[A](topic: TypedTopic[A]): Pipe[F, A, MessageHead] def subscribe[A](topic: TypedTopic[A]): Stream[F, Message[A]] + def subscribeSink[A](topic: TypedTopic[A])(handler: Message[A] => F[Unit]): F[F[Unit]] + def delegate: PubSub[F] def withDelegate(delegate: PubSub[F]): PubSubT[F] } object PubSubT { + def noop[F[_]: Async]: PubSubT[F] = + PubSubT(PubSub.noop[F], Logger.off[F]) - def apply[F[_]](pubSub: PubSub[F], logger: Logger[F]): PubSubT[F] = + def apply[F[_]: Async](pubSub: PubSub[F], logger: Logger[F]): PubSubT[F] = new PubSubT[F] { def publish1[A](topic: TypedTopic[A], msg: A): F[MessageHead] = pubSub.publish1(topic.topic, topic.codec(msg)) + def publish1IgnoreErrors[A](topic: TypedTopic[A], msg: A): F[Unit] = + publish1(topic, msg).attempt.flatMap { + case Right(_) => ().pure[F] + case Left(ex) => + logger.error(ex)(s"Error publishing to topic ${topic.topic.name}: $msg") + } + def publish[A](topic: TypedTopic[A]): Pipe[F, A, MessageHead] = _.map(topic.codec.apply).through(pubSub.publish(topic.topic)) @@ -49,6 +65,18 @@ object PubSubT { } ) + def subscribeSink[A]( + topic: TypedTopic[A] + )(handler: Message[A] => F[Unit]): F[F[Unit]] = + for { + halt <- SignallingRef.of[F, Boolean](false) + _ <- subscribe(topic) + .evalMap(handler) + .interruptWhen(halt) + .compile + .drain + } yield halt.set(true) + def delegate: PubSub[F] = pubSub def withDelegate(newDelegate: PubSub[F]): PubSubT[F] = diff --git a/modules/pubsub/api/src/main/scala/docspell/pubsub/api/Topics.scala b/modules/pubsub/api/src/main/scala/docspell/pubsub/api/Topics.scala deleted file mode 100644 index a536607a..00000000 --- a/modules/pubsub/api/src/main/scala/docspell/pubsub/api/Topics.scala +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Copyright 2020 Eike K. & Contributors - * - * SPDX-License-Identifier: AGPL-3.0-or-later - */ - -package docspell.pubsub.api - -import cats.data.NonEmptyList - -import docspell.common.Ident - -import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder} -import io.circe.{Decoder, Encoder} - -/** All topics used in Docspell. */ -object Topics { - - /** Notify when a job has finished. */ - val jobDone: TypedTopic[JobDoneMsg] = TypedTopic[JobDoneMsg](Topic("job-done")) - - /** Notify when a job has been submitted. The job executor listens to these messages to - * wake up and do its work. - */ - val jobSubmitted: TypedTopic[JobSubmittedMsg] = - TypedTopic[JobSubmittedMsg](Topic("job-submitted")) - - /** Notify a node to cancel a job with the given id */ - val cancelJob: TypedTopic[CancelJobMsg] = - TypedTopic[CancelJobMsg](Topic("cancel-job")) - - val all: NonEmptyList[TypedTopic[_]] = NonEmptyList.of(jobDone, jobSubmitted, cancelJob) - - final case class JobSubmittedMsg(task: Ident) - object JobSubmittedMsg { - implicit val jsonDecoder: Decoder[JobSubmittedMsg] = - deriveDecoder[JobSubmittedMsg] - - implicit val jsonEncoder: Encoder[JobSubmittedMsg] = - deriveEncoder[JobSubmittedMsg] - } - - final case class JobDoneMsg(jobId: Ident, task: Ident) - object JobDoneMsg { - implicit val jsonDecoder: Decoder[JobDoneMsg] = - deriveDecoder[JobDoneMsg] - - implicit val jsonEncoder: Encoder[JobDoneMsg] = - deriveEncoder[JobDoneMsg] - } - - final case class CancelJobMsg(jobId: Ident, nodeId: Ident) - object CancelJobMsg { - implicit val jsonDecoder: Decoder[CancelJobMsg] = - deriveDecoder[CancelJobMsg] - - implicit val jsonEncoder: Encoder[CancelJobMsg] = - deriveEncoder[CancelJobMsg] - - } -} diff --git a/modules/pubsub/naive/src/main/scala/docspell/pubsub/naive/NaivePubSub.scala b/modules/pubsub/naive/src/main/scala/docspell/pubsub/naive/NaivePubSub.scala index fb7bb1cb..cb7370ad 100644 --- a/modules/pubsub/naive/src/main/scala/docspell/pubsub/naive/NaivePubSub.scala +++ b/modules/pubsub/naive/src/main/scala/docspell/pubsub/naive/NaivePubSub.scala @@ -155,23 +155,16 @@ final class NaivePubSub[F[_]: Async]( for { _ <- logger.trace(s"Find all nodes subscribed to topic ${msg.head.topic.name}") - urls <- store.transact(RPubSub.findSubs(msg.head.topic.name)) + urls <- store.transact(RPubSub.findSubs(msg.head.topic.name, cfg.nodeId)) _ <- logger.trace(s"Publishing to remote urls ${urls.map(_.asString)}: $msg") reqs = urls .map(u => Uri.unsafeFromString(u.asString)) .map(uri => POST(List(msg), uri)) - res <- reqs.traverse(req => client.status(req)).attempt - _ <- res match { + resList <- reqs.traverse(req => client.status(req).attempt) + _ <- resList.traverse { case Right(s) => - if (s.forall(_.isSuccess)) ().pure[F] - else if (s.size == urls.size) - logger.warn( - s"No nodes was be reached! Reason: $s, message: $msg" - ) - else - logger.warn( - s"Some nodes were not reached! Reason: $s, message: $msg" - ) + if (s.isSuccess) ().pure[F] + else logger.warn(s"A node was not reached! Reason: $s, message: $msg") case Left(ex) => logger.error(ex)(s"Error publishing ${msg.head.topic.name} message remotely") } diff --git a/modules/pubsub/naive/src/test/scala/docspell/pubsub/naive/NaivePubSubTest.scala b/modules/pubsub/naive/src/test/scala/docspell/pubsub/naive/NaivePubSubTest.scala index ece6f443..8d3094c2 100644 --- a/modules/pubsub/naive/src/test/scala/docspell/pubsub/naive/NaivePubSubTest.scala +++ b/modules/pubsub/naive/src/test/scala/docspell/pubsub/naive/NaivePubSubTest.scala @@ -13,8 +13,8 @@ import cats.implicits._ import fs2.concurrent.SignallingRef import docspell.common._ -import docspell.pubsub.api.Topics.{JobDoneMsg, JobSubmittedMsg} import docspell.pubsub.api._ +import docspell.pubsub.naive.Topics._ import munit.CatsEffectSuite @@ -104,7 +104,7 @@ class NaivePubSubTest extends CatsEffectSuite with Fixtures { } pubsubEnv.test("do not receive remote message from other topic") { env => - val msg = JobDoneMsg("job-1".id, "task-2".id) + val msg = JobCancelMsg("job-1".id) // Create two pubsub instances connected to the same database conntectedPubsubs(env).use { case (ps1, ps2) => @@ -112,7 +112,7 @@ class NaivePubSubTest extends CatsEffectSuite with Fixtures { // subscribe to ps1 and send via ps2 res <- subscribe(ps1, Topics.jobSubmitted) (received, halt, subFiber) = res - _ <- ps2.publish1(Topics.jobDone, msg) + _ <- ps2.publish1(Topics.jobCancel, msg) _ <- IO.sleep(100.millis) _ <- halt.set(true) outcome <- subFiber.join diff --git a/modules/pubsub/naive/src/test/scala/docspell/pubsub/naive/Topics.scala b/modules/pubsub/naive/src/test/scala/docspell/pubsub/naive/Topics.scala new file mode 100644 index 00000000..8388efa0 --- /dev/null +++ b/modules/pubsub/naive/src/test/scala/docspell/pubsub/naive/Topics.scala @@ -0,0 +1,37 @@ +/* + * Copyright 2020 Eike K. & Contributors + * + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +package docspell.pubsub.naive + +import cats.data.NonEmptyList + +import docspell.common.Ident +import docspell.pubsub.api._ + +import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder} +import io.circe.{Decoder, Encoder} + +object Topics { + val jobSubmitted: TypedTopic[JobSubmittedMsg] = + TypedTopic[JobSubmittedMsg](Topic("test-job-submitted")) + + final case class JobSubmittedMsg(task: Ident) + object JobSubmittedMsg { + implicit val encode: Encoder[JobSubmittedMsg] = deriveEncoder[JobSubmittedMsg] + implicit val decode: Decoder[JobSubmittedMsg] = deriveDecoder[JobSubmittedMsg] + } + + val jobCancel: TypedTopic[JobCancelMsg] = + TypedTopic[JobCancelMsg](Topic("test-job-done")) + final case class JobCancelMsg(id: Ident) + object JobCancelMsg { + implicit val encode: Encoder[JobCancelMsg] = deriveEncoder[JobCancelMsg] + implicit val decode: Decoder[JobCancelMsg] = deriveDecoder[JobCancelMsg] + } + + def all: NonEmptyList[TypedTopic[_]] = + NonEmptyList.of(jobSubmitted, jobCancel) +} diff --git a/modules/restserver/src/main/scala/docspell/restserver/Config.scala b/modules/restserver/src/main/scala/docspell/restserver/Config.scala index 4b64d7e7..589e5ff1 100644 --- a/modules/restserver/src/main/scala/docspell/restserver/Config.scala +++ b/modules/restserver/src/main/scala/docspell/restserver/Config.scala @@ -11,6 +11,7 @@ import docspell.backend.{Config => BackendConfig} import docspell.common._ import docspell.ftssolr.SolrConfig import docspell.oidc.ProviderConfig +import docspell.pubsub.naive.PubSubConfig import docspell.restserver.Config.OpenIdConfig import docspell.restserver.auth.OpenId @@ -33,6 +34,9 @@ case class Config( ) { def openIdEnabled: Boolean = openid.exists(_.enabled) + + def pubSubConfig: PubSubConfig = + PubSubConfig(appId, baseUrl / "internal" / "pubsub", 100) } object Config { diff --git a/modules/restserver/src/main/scala/docspell/restserver/RestApp.scala b/modules/restserver/src/main/scala/docspell/restserver/RestApp.scala index 975ae73c..68383b0a 100644 --- a/modules/restserver/src/main/scala/docspell/restserver/RestApp.scala +++ b/modules/restserver/src/main/scala/docspell/restserver/RestApp.scala @@ -10,8 +10,6 @@ import docspell.backend.BackendApp trait RestApp[F[_]] { - def init: F[Unit] - def config: Config def backend: BackendApp[F] diff --git a/modules/restserver/src/main/scala/docspell/restserver/RestAppImpl.scala b/modules/restserver/src/main/scala/docspell/restserver/RestAppImpl.scala index 74b6a303..313ec05e 100644 --- a/modules/restserver/src/main/scala/docspell/restserver/RestAppImpl.scala +++ b/modules/restserver/src/main/scala/docspell/restserver/RestAppImpl.scala @@ -6,41 +6,52 @@ package docspell.restserver -import scala.concurrent.ExecutionContext - import cats.effect._ import cats.implicits._ import docspell.backend.BackendApp -import docspell.common.NodeType +import docspell.backend.msg.{JobDone, Ping} +import docspell.common.Logger import docspell.ftsclient.FtsClient import docspell.ftssolr.SolrFtsClient +import docspell.pubsub.api.{PubSub, PubSubT} +import docspell.store.Store import org.http4s.client.Client final class RestAppImpl[F[_]](val config: Config, val backend: BackendApp[F]) - extends RestApp[F] { - - def init: F[Unit] = - backend.node.register(config.appId, NodeType.Restserver, config.baseUrl) - - def shutdown: F[Unit] = - backend.node.unregister(config.appId) -} + extends RestApp[F] {} object RestAppImpl { def create[F[_]: Async]( cfg: Config, - connectEC: ExecutionContext - ): Resource[F, RestApp[F]] = + store: Store[F], + httpClient: Client[F], + pubSub: PubSub[F] + ): Resource[F, RestApp[F]] = { + val logger = Logger.log4s(org.log4s.getLogger(s"restserver-${cfg.appId.id}")) for { - backend <- BackendApp(cfg.backend, connectEC)( - createFtsClient[F](cfg) - ) + ftsClient <- createFtsClient(cfg)(httpClient) + pubSubT = PubSubT(pubSub, logger) + backend <- BackendApp.create[F](cfg.backend, store, ftsClient, pubSubT) + _ <- Resource.eval(subscriptions(backend, logger)) app = new RestAppImpl[F](cfg, backend) - appR <- Resource.make(app.init.map(_ => app))(_.shutdown) - } yield appR + } yield app + } + + private def subscriptions[F[_]: Async]( + backend: BackendApp[F], + logger: Logger[F] + ): F[Unit] = + for { + _ <- Async[F].start(backend.pubSub.subscribeSink(Ping.topic) { msg => + logger.info(s">>>> PING $msg") + }) + _ <- Async[F].start(backend.pubSub.subscribeSink(JobDone.topic) { msg => + logger.info(s">>>> Job Done $msg") + }) + } yield () private def createFtsClient[F[_]: Async]( cfg: Config diff --git a/modules/restserver/src/main/scala/docspell/restserver/RestServer.scala b/modules/restserver/src/main/scala/docspell/restserver/RestServer.scala index f64f0a93..e5531fbe 100644 --- a/modules/restserver/src/main/scala/docspell/restserver/RestServer.scala +++ b/modules/restserver/src/main/scala/docspell/restserver/RestServer.scala @@ -11,12 +11,15 @@ import cats.implicits._ import fs2.Stream import docspell.backend.auth.{AuthToken, ShareToken} +import docspell.backend.msg.Topics import docspell.common._ import docspell.oidc.CodeFlowRoutes +import docspell.pubsub.naive.NaivePubSub import docspell.restserver.auth.OpenId import docspell.restserver.http4s.EnvMiddleware import docspell.restserver.routes._ import docspell.restserver.webapp._ +import docspell.store.Store import org.http4s._ import org.http4s.blaze.client.BlazeClientBuilder @@ -34,9 +37,17 @@ object RestServer { val templates = TemplateRoutes[F](cfg) val app = for { - restApp <- RestAppImpl.create[F](cfg, pools.connectEC) + store <- Store.create[F]( + cfg.backend.jdbc, + cfg.backend.files.chunkSize, + pools.connectEC + ) + httpClient <- BlazeClientBuilder[F].resource + pubSub <- NaivePubSub(cfg.pubSubConfig, store, httpClient)(Topics.all.map(_.topic)) + restApp <- RestAppImpl.create[F](cfg, store, httpClient, pubSub) httpClient <- BlazeClientBuilder[F].resource httpApp = Router( + "/internal/pubsub" -> pubSub.receiveRoute, "/api/info" -> routes.InfoRoutes(), "/api/v1/open/" -> openRoutes(cfg, httpClient, restApp), "/api/v1/sec/" -> Authenticate(restApp.backend.login, cfg.auth) { token => diff --git a/modules/store/src/main/scala/docspell/store/records/RPubSub.scala b/modules/store/src/main/scala/docspell/store/records/RPubSub.scala index f0e2ce6e..6509aaea 100644 --- a/modules/store/src/main/scala/docspell/store/records/RPubSub.scala +++ b/modules/store/src/main/scala/docspell/store/records/RPubSub.scala @@ -79,8 +79,12 @@ object RPubSub { ) ) - def findSubs(topic: String): ConnectionIO[List[LenientUri]] = - run(select(T.url), from(T), T.topic === topic && T.counter > 0) + def findSubs(topic: String, excludeNode: Ident): ConnectionIO[List[LenientUri]] = + run( + select(T.url), + from(T), + T.topic === topic && T.counter > 0 && T.nodeId <> excludeNode + ) .query[LenientUri] .to[List] }