diff --git a/build.sbt b/build.sbt
index 82ebe6c2..6ac36289 100644
--- a/build.sbt
+++ b/build.sbt
@@ -410,6 +410,36 @@ val store = project
)
.dependsOn(common, query.jvm, totp, files)
+val pubsubApi = project
+ .in(file("modules/pubsub/api"))
+ .disablePlugins(RevolverPlugin)
+ .settings(sharedSettings)
+ .settings(testSettingsMUnit)
+ .settings(
+ name := "docspell-pubsub-api",
+ addCompilerPlugin(Dependencies.kindProjectorPlugin),
+ libraryDependencies ++=
+ Dependencies.fs2
+ )
+ .dependsOn(common)
+
+val pubsubNaive = project
+ .in(file("modules/pubsub/naive"))
+ .disablePlugins(RevolverPlugin)
+ .settings(sharedSettings)
+ .settings(testSettingsMUnit)
+ .settings(
+ name := "docspell-pubsub-naive",
+ addCompilerPlugin(Dependencies.kindProjectorPlugin),
+ libraryDependencies ++=
+ Dependencies.fs2 ++
+ Dependencies.http4sCirce ++
+ Dependencies.http4sDsl ++
+ Dependencies.http4sClient ++
+ Dependencies.circe
+ )
+ .dependsOn(common, pubsubApi, store % "compile->compile;test->test")
+
val extract = project
.in(file("modules/extract"))
.disablePlugins(RevolverPlugin)
@@ -534,7 +564,7 @@ val backend = project
Dependencies.http4sClient ++
Dependencies.emil
)
- .dependsOn(store, joexapi, ftsclient, totp)
+ .dependsOn(store, joexapi, ftsclient, totp, pubsubApi)
val oidc = project
.in(file("modules/oidc"))
@@ -625,7 +655,8 @@ val joex = project
analysis,
joexapi,
restapi,
- ftssolr
+ ftssolr,
+ pubsubNaive
)
val restserver = project
@@ -689,7 +720,7 @@ val restserver = project
}
}
)
- .dependsOn(config, restapi, joexapi, backend, webapp, ftssolr, oidc)
+ .dependsOn(config, restapi, joexapi, backend, webapp, ftssolr, oidc, pubsubNaive)
// --- Website Documentation
@@ -781,7 +812,9 @@ val root = project
query.jvm,
query.js,
totp,
- oidc
+ oidc,
+ pubsubApi,
+ pubsubNaive
)
// --- Helpers
diff --git a/modules/backend/src/main/scala/docspell/backend/BackendApp.scala b/modules/backend/src/main/scala/docspell/backend/BackendApp.scala
index 4aea530f..cdc92fcc 100644
--- a/modules/backend/src/main/scala/docspell/backend/BackendApp.scala
+++ b/modules/backend/src/main/scala/docspell/backend/BackendApp.scala
@@ -6,8 +6,6 @@
package docspell.backend
-import scala.concurrent.ExecutionContext
-
import cats.effect._
import docspell.backend.auth.Login
@@ -15,15 +13,13 @@ import docspell.backend.fulltext.CreateIndex
import docspell.backend.ops._
import docspell.backend.signup.OSignup
import docspell.ftsclient.FtsClient
-import docspell.joexapi.client.JoexClient
+import docspell.pubsub.api.PubSubT
import docspell.store.Store
import docspell.store.queue.JobQueue
import docspell.store.usertask.UserTaskStore
import docspell.totp.Totp
import emil.javamail.{JavaMailEmil, Settings}
-import org.http4s.blaze.client.BlazeClientBuilder
-import org.http4s.client.Client
trait BackendApp[F[_]] {
@@ -49,6 +45,7 @@ trait BackendApp[F[_]] {
def clientSettings: OClientSettings[F]
def totp: OTotp[F]
def share: OShare[F]
+ def pubSub: PubSubT[F]
}
object BackendApp {
@@ -56,8 +53,8 @@ object BackendApp {
def create[F[_]: Async](
cfg: Config,
store: Store[F],
- httpClient: Client[F],
- ftsClient: FtsClient[F]
+ ftsClient: FtsClient[F],
+ pubSubT: PubSubT[F]
): Resource[F, BackendApp[F]] =
for {
utStore <- UserTaskStore(store)
@@ -65,7 +62,7 @@ object BackendApp {
totpImpl <- OTotp(store, Totp.default)
loginImpl <- Login[F](store, Totp.default)
signupImpl <- OSignup[F](store)
- joexImpl <- OJoex(JoexClient(httpClient), store)
+ joexImpl <- OJoex(pubSubT)
collImpl <- OCollective[F](store, utStore, queue, joexImpl)
sourceImpl <- OSource[F](store)
tagImpl <- OTag[F](store)
@@ -90,6 +87,7 @@ object BackendApp {
OShare(store, itemSearchImpl, simpleSearchImpl, javaEmil)
)
} yield new BackendApp[F] {
+ val pubSub = pubSubT
val login = loginImpl
val signup = signupImpl
val collective = collImpl
@@ -113,15 +111,4 @@ object BackendApp {
val totp = totpImpl
val share = shareImpl
}
-
- def apply[F[_]: Async](
- cfg: Config,
- connectEC: ExecutionContext
- )(ftsFactory: Client[F] => Resource[F, FtsClient[F]]): Resource[F, BackendApp[F]] =
- for {
- store <- Store.create(cfg.jdbc, cfg.files.chunkSize, connectEC)
- httpClient <- BlazeClientBuilder[F].resource
- ftsClient <- ftsFactory(httpClient)
- backend <- create(cfg, store, httpClient, ftsClient)
- } yield backend
}
diff --git a/modules/backend/src/main/scala/docspell/backend/msg/CancelJob.scala b/modules/backend/src/main/scala/docspell/backend/msg/CancelJob.scala
new file mode 100644
index 00000000..677f1fba
--- /dev/null
+++ b/modules/backend/src/main/scala/docspell/backend/msg/CancelJob.scala
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2020 Eike K. & Contributors
+ *
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+
+package docspell.backend.msg
+
+import docspell.common._
+import docspell.pubsub.api.{Topic, TypedTopic}
+
+import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder}
+import io.circe.{Decoder, Encoder}
+
+/** Message to request cancellation of a job. */
+final case class CancelJob(jobId: Ident, nodeId: Ident)
+
+object CancelJob {
+ implicit val jsonDecoder: Decoder[CancelJob] =
+ deriveDecoder[CancelJob]
+
+ implicit val jsonEncoder: Encoder[CancelJob] =
+ deriveEncoder[CancelJob]
+
+ val topic: TypedTopic[CancelJob] =
+ TypedTopic(Topic("job-cancel-request"))
+}
diff --git a/modules/backend/src/main/scala/docspell/backend/msg/JobDone.scala b/modules/backend/src/main/scala/docspell/backend/msg/JobDone.scala
new file mode 100644
index 00000000..9b09b8f1
--- /dev/null
+++ b/modules/backend/src/main/scala/docspell/backend/msg/JobDone.scala
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2020 Eike K. & Contributors
+ *
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+
+package docspell.backend.msg
+
+import docspell.common._
+import docspell.pubsub.api.{Topic, TypedTopic}
+
+import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder}
+import io.circe.{Decoder, Encoder}
+
+/** Message to notify about finished jobs. They have a final state. */
+final case class JobDone(jobId: Ident, task: Ident, args: String, state: JobState)
+object JobDone {
+ implicit val jsonDecoder: Decoder[JobDone] =
+ deriveDecoder[JobDone]
+
+ implicit val jsonEncoder: Encoder[JobDone] =
+ deriveEncoder[JobDone]
+
+ val topic: TypedTopic[JobDone] =
+ TypedTopic(Topic("job-finished"))
+}
diff --git a/modules/backend/src/main/scala/docspell/backend/msg/Topics.scala b/modules/backend/src/main/scala/docspell/backend/msg/Topics.scala
new file mode 100644
index 00000000..26c594f7
--- /dev/null
+++ b/modules/backend/src/main/scala/docspell/backend/msg/Topics.scala
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2020 Eike K. & Contributors
+ *
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+
+package docspell.backend.msg
+
+import cats.data.NonEmptyList
+
+import docspell.pubsub.api.{Topic, TypedTopic}
+
+/** All topics used in Docspell. */
+object Topics {
+
+ /** A generic notification to the job executors to look for new work. */
+ val jobsNotify: TypedTopic[Unit] =
+ TypedTopic[Unit](Topic("jobs-notify"))
+
+ /** A list of all topics. It is required to list every topic in use here! */
+ val all: NonEmptyList[TypedTopic[_]] =
+ NonEmptyList.of(JobDone.topic, CancelJob.topic, jobsNotify)
+}
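
Both servers hand exactly this list to the pubsub implementation at startup, so a topic missing from `all` never gets an internal fs2 topic and its subscribers see nothing. Below is a minimal wiring sketch under that assumption (an existing `Store[IO]` and http4s `Client[IO]`; the object and method names are placeholders made up for the example):

```scala
import cats.effect.IO
import docspell.backend.msg.Topics
import docspell.pubsub.naive.{NaivePubSub, PubSubConfig}
import docspell.store.Store
import org.http4s.client.Client

object PubSubWiring {
  // every topic from Topics.all is registered with the pubsub node at startup;
  // subscribing to a topic that is not in this list only logs a warning and yields no messages
  def startPubSub(cfg: PubSubConfig, store: Store[IO], client: Client[IO]) =
    NaivePubSub(cfg, store, client)(Topics.all.map(_.topic))
}
```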
diff --git a/modules/backend/src/main/scala/docspell/backend/ops/OJob.scala b/modules/backend/src/main/scala/docspell/backend/ops/OJob.scala
index 4ab45fb4..7f7a36fd 100644
--- a/modules/backend/src/main/scala/docspell/backend/ops/OJob.scala
+++ b/modules/backend/src/main/scala/docspell/backend/ops/OJob.scala
@@ -11,8 +11,7 @@ import cats.effect._
import cats.implicits._
import docspell.backend.ops.OJob.{CollectiveQueueState, JobCancelResult}
-import docspell.common.Priority
-import docspell.common.{Ident, JobState}
+import docspell.common._
import docspell.store.Store
import docspell.store.UpdateResult
import docspell.store.queries.QJob
@@ -55,6 +54,7 @@ object OJob {
joex: OJoex[F]
): Resource[F, OJob[F]] =
Resource.pure[F, OJob[F]](new OJob[F] {
+ private[this] val logger = Logger.log4s(org.log4s.getLogger(OJob.getClass))
def queueState(collective: Ident, maxResults: Int): F[CollectiveQueueState] =
store
@@ -77,11 +77,9 @@ object OJob {
job.worker match {
case Some(worker) =>
for {
- flag <- joex.cancelJob(job.id, worker)
- res <-
- if (flag) JobCancelResult.cancelRequested.pure[F]
- else remove(job)
- } yield res
+ _ <- logger.debug(s"Attempt to cancel job: ${job.id.id}")
+ _ <- joex.cancelJob(job.id, worker)
+ } yield JobCancelResult.cancelRequested
case None =>
remove(job)
}
diff --git a/modules/backend/src/main/scala/docspell/backend/ops/OJoex.scala b/modules/backend/src/main/scala/docspell/backend/ops/OJoex.scala
index d4aaf956..1ce43149 100644
--- a/modules/backend/src/main/scala/docspell/backend/ops/OJoex.scala
+++ b/modules/backend/src/main/scala/docspell/backend/ops/OJoex.scala
@@ -6,41 +6,27 @@
package docspell.backend.ops
-import cats.data.OptionT
import cats.effect._
-import cats.implicits._
-import docspell.common.{Ident, NodeType}
-import docspell.joexapi.client.JoexClient
-import docspell.store.Store
-import docspell.store.records.RNode
+import docspell.backend.msg.{CancelJob, Topics}
+import docspell.common.Ident
+import docspell.pubsub.api.PubSubT
trait OJoex[F[_]] {
def notifyAllNodes: F[Unit]
- def cancelJob(job: Ident, worker: Ident): F[Boolean]
-
+ def cancelJob(job: Ident, worker: Ident): F[Unit]
}
object OJoex {
-
- def apply[F[_]: Sync](client: JoexClient[F], store: Store[F]): Resource[F, OJoex[F]] =
+ def apply[F[_]](pubSub: PubSubT[F]): Resource[F, OJoex[F]] =
Resource.pure[F, OJoex[F]](new OJoex[F] {
+
def notifyAllNodes: F[Unit] =
- for {
- nodes <- store.transact(RNode.findAll(NodeType.Joex))
- _ <- nodes.toList.traverse(n => client.notifyJoexIgnoreErrors(n.url))
- } yield ()
+ pubSub.publish1IgnoreErrors(Topics.jobsNotify, ())
- def cancelJob(job: Ident, worker: Ident): F[Boolean] =
- (for {
- node <- OptionT(store.transact(RNode.findById(worker)))
- cancel <- OptionT.liftF(client.cancelJob(node.url, job))
- } yield cancel.success).getOrElse(false)
+ def cancelJob(job: Ident, worker: Ident): F[Unit] =
+ pubSub.publish1IgnoreErrors(CancelJob.topic, CancelJob(job, worker))
})
-
- def create[F[_]: Async](store: Store[F]): Resource[F, OJoex[F]] =
- JoexClient.resource.flatMap(client => apply(client, store))
-
}
diff --git a/modules/common/src/main/scala/docspell/common/Logger.scala b/modules/common/src/main/scala/docspell/common/Logger.scala
index 936c9d34..01265ef4 100644
--- a/modules/common/src/main/scala/docspell/common/Logger.scala
+++ b/modules/common/src/main/scala/docspell/common/Logger.scala
@@ -6,6 +6,7 @@
package docspell.common
+import cats.Applicative
import cats.effect.Sync
import fs2.Stream
@@ -45,6 +46,27 @@ trait Logger[F[_]] { self =>
object Logger {
+ def off[F[_]: Applicative]: Logger[F] =
+ new Logger[F] {
+ def trace(msg: => String): F[Unit] =
+ Applicative[F].pure(())
+
+ def debug(msg: => String): F[Unit] =
+ Applicative[F].pure(())
+
+ def info(msg: => String): F[Unit] =
+ Applicative[F].pure(())
+
+ def warn(msg: => String): F[Unit] =
+ Applicative[F].pure(())
+
+ def error(ex: Throwable)(msg: => String): F[Unit] =
+ Applicative[F].pure(())
+
+ def error(msg: => String): F[Unit] =
+ Applicative[F].pure(())
+ }
+
def log4s[F[_]: Sync](log: Log4sLogger): Logger[F] =
new Logger[F] {
def trace(msg: => String): F[Unit] =
diff --git a/modules/common/src/main/scala/docspell/common/syntax/StringSyntax.scala b/modules/common/src/main/scala/docspell/common/syntax/StringSyntax.scala
index 6fc06440..40d5bf80 100644
--- a/modules/common/src/main/scala/docspell/common/syntax/StringSyntax.scala
+++ b/modules/common/src/main/scala/docspell/common/syntax/StringSyntax.scala
@@ -12,9 +12,7 @@ import io.circe.Decoder
import io.circe.parser._
trait StringSyntax {
-
implicit class EvenMoreStringOps(s: String) {
-
def asNonBlank: Option[String] =
Option(s).filter(_.trim.nonEmpty)
@@ -24,5 +22,6 @@ trait StringSyntax {
value <- json.as[A]
} yield value
}
-
}
+
+object StringSyntax extends StringSyntax
diff --git a/modules/joex/src/main/scala/docspell/joex/Config.scala b/modules/joex/src/main/scala/docspell/joex/Config.scala
index 2554d9bf..81172ff2 100644
--- a/modules/joex/src/main/scala/docspell/joex/Config.scala
+++ b/modules/joex/src/main/scala/docspell/joex/Config.scala
@@ -20,6 +20,7 @@ import docspell.joex.analysis.RegexNerFile
import docspell.joex.hk.HouseKeepingConfig
import docspell.joex.scheduler.{PeriodicSchedulerConfig, SchedulerConfig}
import docspell.joex.updatecheck.UpdateCheckConfig
+import docspell.pubsub.naive.PubSubConfig
import docspell.store.JdbcConfig
case class Config(
@@ -39,7 +40,11 @@ case class Config(
mailDebug: Boolean,
fullTextSearch: Config.FullTextSearch,
updateCheck: UpdateCheckConfig
-)
+) {
+
+ def pubSubConfig: PubSubConfig =
+ PubSubConfig(appId, baseUrl / "internal" / "pubsub", 100)
+}
object Config {
case class Bind(address: String, port: Int)
diff --git a/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala b/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala
index 86c65efc..2ec86d11 100644
--- a/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala
+++ b/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala
@@ -6,14 +6,13 @@
package docspell.joex
-import scala.concurrent.ExecutionContext
-
import cats.effect._
import cats.implicits._
import fs2.concurrent.SignallingRef
import docspell.analysis.TextAnalyser
import docspell.backend.fulltext.CreateIndex
+import docspell.backend.msg.{CancelJob, Topics}
import docspell.backend.ops._
import docspell.common._
import docspell.ftsclient.FtsClient
@@ -34,6 +33,7 @@ import docspell.joex.scanmailbox._
import docspell.joex.scheduler._
import docspell.joex.updatecheck._
import docspell.joexapi.client.JoexClient
+import docspell.pubsub.api.{PubSub, PubSubT}
import docspell.store.Store
import docspell.store.queue._
import docspell.store.records.{REmptyTrashSetting, RJobLog}
@@ -41,20 +41,18 @@ import docspell.store.usertask.UserTaskScope
import docspell.store.usertask.UserTaskStore
import emil.javamail._
-import org.http4s.blaze.client.BlazeClientBuilder
import org.http4s.client.Client
final class JoexAppImpl[F[_]: Async](
cfg: Config,
- nodeOps: ONode[F],
store: Store[F],
queue: JobQueue[F],
+ pubSubT: PubSubT[F],
pstore: PeriodicTaskStore[F],
termSignal: SignallingRef[F, Boolean],
val scheduler: Scheduler[F],
val periodicScheduler: PeriodicScheduler[F]
) extends JoexApp[F] {
-
def init: F[Unit] = {
val run = scheduler.start.compile.drain
val prun = periodicScheduler.start.compile.drain
@@ -64,16 +62,23 @@ final class JoexAppImpl[F[_]: Async](
_ <- Async[F].start(prun)
_ <- scheduler.periodicAwake
_ <- periodicScheduler.periodicAwake
- _ <- nodeOps.register(cfg.appId, NodeType.Joex, cfg.baseUrl)
+ _ <- subscriptions
} yield ()
}
+ def subscriptions =
+ for {
+ _ <- Async[F].start(pubSubT.subscribeSink(Topics.jobsNotify) { _ =>
+ scheduler.notifyChange
+ })
+ _ <- Async[F].start(pubSubT.subscribeSink(CancelJob.topic) { msg =>
+ scheduler.requestCancel(msg.body.jobId).as(())
+ })
+ } yield ()
+
def findLogs(jobId: Ident): F[Vector[RJobLog]] =
store.transact(RJobLog.findLogs(jobId))
- def shutdown: F[Unit] =
- nodeOps.unregister(cfg.appId)
-
def initShutdown: F[Unit] =
periodicScheduler.shutdown *> scheduler.shutdown(false) *> termSignal.set(true)
@@ -116,16 +121,19 @@ object JoexAppImpl {
def create[F[_]: Async](
cfg: Config,
termSignal: SignallingRef[F, Boolean],
- connectEC: ExecutionContext
+ store: Store[F],
+ httpClient: Client[F],
+ pubSub: PubSub[F]
): Resource[F, JoexApp[F]] =
for {
- httpClient <- BlazeClientBuilder[F].resource
- client = JoexClient(httpClient)
- store <- Store.create(cfg.jdbc, cfg.files.chunkSize, connectEC)
queue <- JobQueue(store)
pstore <- PeriodicTaskStore.create(store)
- nodeOps <- ONode(store)
- joex <- OJoex(client, store)
+ client = JoexClient(httpClient)
+ pubSubT = PubSubT(
+ pubSub,
+ Logger.log4s(org.log4s.getLogger(s"joex-${cfg.appId.id}"))
+ )
+ joex <- OJoex(pubSubT)
upload <- OUpload(store, queue, joex)
fts <- createFtsClient(cfg)(httpClient)
createIndex <- CreateIndex.resource(fts, store)
@@ -138,6 +146,7 @@ object JoexAppImpl {
JavaMailEmil(Settings.defaultSettings.copy(debug = cfg.mailDebug))
sch <- SchedulerBuilder(cfg.scheduler, store)
.withQueue(queue)
+ .withPubSub(pubSubT)
.withTask(
JobTask.json(
ProcessItemArgs.taskName,
@@ -264,8 +273,8 @@ object JoexAppImpl {
pstore,
client
)
- app = new JoexAppImpl(cfg, nodeOps, store, queue, pstore, termSignal, sch, psch)
- appR <- Resource.make(app.init.map(_ => app))(_.shutdown)
+ app = new JoexAppImpl(cfg, store, queue, pubSubT, pstore, termSignal, sch, psch)
+ appR <- Resource.make(app.init.map(_ => app))(_.initShutdown)
} yield appR
private def createFtsClient[F[_]: Async](
diff --git a/modules/joex/src/main/scala/docspell/joex/JoexServer.scala b/modules/joex/src/main/scala/docspell/joex/JoexServer.scala
index 8c4773dc..cad75cbc 100644
--- a/modules/joex/src/main/scala/docspell/joex/JoexServer.scala
+++ b/modules/joex/src/main/scala/docspell/joex/JoexServer.scala
@@ -11,10 +11,14 @@ import cats.effect._
import fs2.Stream
import fs2.concurrent.SignallingRef
+import docspell.backend.msg.Topics
import docspell.common.Pools
import docspell.joex.routes._
+import docspell.pubsub.naive.NaivePubSub
+import docspell.store.Store
import org.http4s.HttpApp
+import org.http4s.blaze.client.BlazeClientBuilder
import org.http4s.blaze.server.BlazeServerBuilder
import org.http4s.implicits._
import org.http4s.server.Router
@@ -33,9 +37,19 @@ object JoexServer {
val app = for {
signal <- Resource.eval(SignallingRef[F, Boolean](false))
exitCode <- Resource.eval(Ref[F].of(ExitCode.Success))
- joexApp <- JoexAppImpl.create[F](cfg, signal, pools.connectEC)
+
+ store <- Store.create[F](
+ cfg.jdbc,
+ cfg.files.chunkSize,
+ pools.connectEC
+ )
+ httpClient <- BlazeClientBuilder[F].resource
+ pubSub <- NaivePubSub(cfg.pubSubConfig, store, httpClient)(Topics.all.map(_.topic))
+
+ joexApp <- JoexAppImpl.create[F](cfg, signal, store, httpClient, pubSub)
httpApp = Router(
+ "/internal/pubsub" -> pubSub.receiveRoute,
"/api/info" -> InfoRoutes(cfg),
"/api/v1" -> JoexRoutes(joexApp)
).orNotFound
diff --git a/modules/joex/src/main/scala/docspell/joex/scheduler/SchedulerBuilder.scala b/modules/joex/src/main/scala/docspell/joex/scheduler/SchedulerBuilder.scala
index 4f981a87..cc09f7da 100644
--- a/modules/joex/src/main/scala/docspell/joex/scheduler/SchedulerBuilder.scala
+++ b/modules/joex/src/main/scala/docspell/joex/scheduler/SchedulerBuilder.scala
@@ -11,6 +11,7 @@ import cats.effect.std.Semaphore
import cats.implicits._
import fs2.concurrent.SignallingRef
+import docspell.pubsub.api.PubSubT
import docspell.store.Store
import docspell.store.queue.JobQueue
@@ -19,7 +20,8 @@ case class SchedulerBuilder[F[_]: Async](
tasks: JobTaskRegistry[F],
store: Store[F],
queue: Resource[F, JobQueue[F]],
- logSink: LogSink[F]
+ logSink: LogSink[F],
+ pubSub: PubSubT[F]
) {
def withConfig(cfg: SchedulerConfig): SchedulerBuilder[F] =
@@ -32,7 +34,7 @@ case class SchedulerBuilder[F[_]: Async](
withTaskRegistry(tasks.withTask(task))
def withQueue(queue: Resource[F, JobQueue[F]]): SchedulerBuilder[F] =
- SchedulerBuilder[F](config, tasks, store, queue, logSink)
+ copy(queue = queue)
def withLogSink(sink: LogSink[F]): SchedulerBuilder[F] =
copy(logSink = sink)
@@ -40,6 +42,9 @@ case class SchedulerBuilder[F[_]: Async](
def withQueue(queue: JobQueue[F]): SchedulerBuilder[F] =
copy(queue = Resource.pure[F, JobQueue[F]](queue))
+ def withPubSub(pubSubT: PubSubT[F]): SchedulerBuilder[F] =
+ copy(pubSub = pubSubT)
+
def serve: Resource[F, Scheduler[F]] =
resource.evalMap(sch => Async[F].start(sch.start.compile.drain).map(_ => sch))
@@ -52,6 +57,7 @@ case class SchedulerBuilder[F[_]: Async](
} yield new SchedulerImpl[F](
config,
jq,
+ pubSub,
tasks,
store,
logSink,
@@ -76,7 +82,8 @@ object SchedulerBuilder {
JobTaskRegistry.empty[F],
store,
JobQueue(store),
- LogSink.db[F](store)
+ LogSink.db[F](store),
+ PubSubT.noop[F]
)
}
diff --git a/modules/joex/src/main/scala/docspell/joex/scheduler/SchedulerImpl.scala b/modules/joex/src/main/scala/docspell/joex/scheduler/SchedulerImpl.scala
index d01d6756..e38a282b 100644
--- a/modules/joex/src/main/scala/docspell/joex/scheduler/SchedulerImpl.scala
+++ b/modules/joex/src/main/scala/docspell/joex/scheduler/SchedulerImpl.scala
@@ -13,19 +13,22 @@ import cats.implicits._
import fs2.Stream
import fs2.concurrent.SignallingRef
+import docspell.backend.msg.JobDone
import docspell.common._
import docspell.common.syntax.all._
import docspell.joex.scheduler.SchedulerImpl._
+import docspell.pubsub.api.PubSubT
import docspell.store.Store
import docspell.store.queries.QJob
import docspell.store.queue.JobQueue
import docspell.store.records.RJob
-import org.log4s._
+import org.log4s.getLogger
final class SchedulerImpl[F[_]: Async](
val config: SchedulerConfig,
queue: JobQueue[F],
+ pubSub: PubSubT[F],
tasks: JobTaskRegistry[F],
store: Store[F],
logSink: LogSink[F],
@@ -55,20 +58,21 @@ final class SchedulerImpl[F[_]: Async](
state.get.flatMap(s => QJob.findAll(s.getRunning, store))
def requestCancel(jobId: Ident): F[Boolean] =
- state.get.flatMap(_.cancelRequest(jobId) match {
- case Some(ct) => ct.map(_ => true)
- case None =>
- (for {
- job <- OptionT(store.transact(RJob.findByIdAndWorker(jobId, config.name)))
- _ <- OptionT.liftF(
- if (job.isInProgress) executeCancel(job)
- else ().pure[F]
- )
- } yield true)
- .getOrElseF(
- logger.fwarn(s"Job ${jobId.id} not found, cannot cancel.").map(_ => false)
- )
- })
+ logger.finfo(s"Scheduler requested to cancel job: ${jobId.id}") *>
+ state.get.flatMap(_.cancelRequest(jobId) match {
+ case Some(ct) => ct.map(_ => true)
+ case None =>
+ (for {
+ job <- OptionT(store.transact(RJob.findByIdAndWorker(jobId, config.name)))
+ _ <- OptionT.liftF(
+ if (job.isInProgress) executeCancel(job)
+ else ().pure[F]
+ )
+ } yield true)
+ .getOrElseF(
+ logger.fwarn(s"Job ${jobId.id} not found, cannot cancel.").map(_ => false)
+ )
+ })
def notifyChange: F[Unit] =
waiter.update(b => !b)
@@ -198,6 +202,10 @@ final class SchedulerImpl[F[_]: Async](
)
_ <- state.modify(_.removeRunning(job))
_ <- QJob.setFinalState(job.id, finalState, store)
+ _ <- pubSub.publish1IgnoreErrors(
+ JobDone.topic,
+ JobDone(job.id, job.task, job.args, finalState)
+ )
} yield ()
def onStart(job: RJob): F[Unit] =
diff --git a/modules/pubsub/api/src/main/scala/docspell/pubsub/api/Message.scala b/modules/pubsub/api/src/main/scala/docspell/pubsub/api/Message.scala
new file mode 100644
index 00000000..c6d27c02
--- /dev/null
+++ b/modules/pubsub/api/src/main/scala/docspell/pubsub/api/Message.scala
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2020 Eike K. & Contributors
+ *
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+
+package docspell.pubsub.api
+
+import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder}
+import io.circe.{Decoder, Encoder, Json}
+
+final case class Message[A](head: MessageHead, body: A) {}
+
+object Message {
+ implicit val jsonDecoderJson: Decoder[Message[Json]] =
+ deriveDecoder[Message[Json]]
+
+ implicit val jsonEncoderJson: Encoder[Message[Json]] =
+ deriveEncoder[Message[Json]]
+
+ implicit def jsonDecoder[A](implicit da: Decoder[A]): Decoder[Message[A]] =
+ jsonDecoderJson.emap(mj =>
+ da.decodeJson(mj.body).map(b => mj.copy(body = b)).left.map(_.message)
+ )
+
+ implicit def jsonEncoder[A](implicit ea: Encoder[A]): Encoder[Message[A]] =
+ jsonEncoderJson.contramap(m => m.copy(body = ea(m.body)))
+}
diff --git a/modules/pubsub/api/src/main/scala/docspell/pubsub/api/MessageHead.scala b/modules/pubsub/api/src/main/scala/docspell/pubsub/api/MessageHead.scala
new file mode 100644
index 00000000..c604b52d
--- /dev/null
+++ b/modules/pubsub/api/src/main/scala/docspell/pubsub/api/MessageHead.scala
@@ -0,0 +1,22 @@
+/*
+ * Copyright 2020 Eike K. & Contributors
+ *
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+
+package docspell.pubsub.api
+
+import docspell.common.{Ident, Timestamp}
+
+import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder}
+import io.circe.{Decoder, Encoder}
+
+final case class MessageHead(id: Ident, send: Timestamp, topic: Topic)
+
+object MessageHead {
+ implicit val jsonDecoder: Decoder[MessageHead] =
+ deriveDecoder[MessageHead]
+
+ implicit val jsonEncoder: Encoder[MessageHead] =
+ deriveEncoder[MessageHead]
+}
diff --git a/modules/pubsub/api/src/main/scala/docspell/pubsub/api/PubSub.scala b/modules/pubsub/api/src/main/scala/docspell/pubsub/api/PubSub.scala
new file mode 100644
index 00000000..6a9eb9a5
--- /dev/null
+++ b/modules/pubsub/api/src/main/scala/docspell/pubsub/api/PubSub.scala
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2020 Eike K. & Contributors
+ *
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+
+package docspell.pubsub.api
+
+import cats.Applicative
+import cats.data.NonEmptyList
+import fs2.{Pipe, Stream}
+
+import docspell.common.{Ident, Timestamp}
+
+import io.circe.Json
+
+trait PubSub[F[_]] {
+ def publish1(topic: Topic, msg: Json): F[MessageHead]
+
+ def publish(topic: Topic): Pipe[F, Json, MessageHead]
+
+ def subscribe(topics: NonEmptyList[Topic]): Stream[F, Message[Json]]
+}
+object PubSub {
+ def noop[F[_]: Applicative]: PubSub[F] =
+ new PubSub[F] {
+ def publish1(topic: Topic, msg: Json): F[MessageHead] =
+ Applicative[F].pure(MessageHead(Ident.unsafe("0"), Timestamp.Epoch, topic))
+
+ def publish(topic: Topic): Pipe[F, Json, MessageHead] =
+ _ => Stream.empty
+
+ def subscribe(topics: NonEmptyList[Topic]): Stream[F, Message[Json]] =
+ Stream.empty
+ }
+}
diff --git a/modules/pubsub/api/src/main/scala/docspell/pubsub/api/PubSubT.scala b/modules/pubsub/api/src/main/scala/docspell/pubsub/api/PubSubT.scala
new file mode 100644
index 00000000..d07f5e41
--- /dev/null
+++ b/modules/pubsub/api/src/main/scala/docspell/pubsub/api/PubSubT.scala
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2020 Eike K. & Contributors
+ *
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+
+package docspell.pubsub.api
+
+import cats.data.NonEmptyList
+import cats.effect._
+import cats.implicits._
+import fs2.concurrent.SignallingRef
+import fs2.{Pipe, Stream}
+
+import docspell.common.Logger
+
+trait PubSubT[F[_]] {
+
+ def publish1[A](topic: TypedTopic[A], msg: A): F[MessageHead]
+
+ def publish1IgnoreErrors[A](topic: TypedTopic[A], msg: A): F[Unit]
+
+ def publish[A](topic: TypedTopic[A]): Pipe[F, A, MessageHead]
+
+ def subscribe[A](topic: TypedTopic[A]): Stream[F, Message[A]]
+
+ def subscribeSink[A](topic: TypedTopic[A])(handler: Message[A] => F[Unit]): F[F[Unit]]
+
+ def delegate: PubSub[F]
+
+ def withDelegate(delegate: PubSub[F]): PubSubT[F]
+}
+
+object PubSubT {
+ def noop[F[_]: Async]: PubSubT[F] =
+ PubSubT(PubSub.noop[F], Logger.off[F])
+
+ def apply[F[_]: Async](pubSub: PubSub[F], logger: Logger[F]): PubSubT[F] =
+ new PubSubT[F] {
+ def publish1[A](topic: TypedTopic[A], msg: A): F[MessageHead] =
+ pubSub.publish1(topic.topic, topic.codec(msg))
+
+ def publish1IgnoreErrors[A](topic: TypedTopic[A], msg: A): F[Unit] =
+ publish1(topic, msg).attempt.flatMap {
+ case Right(_) => ().pure[F]
+ case Left(ex) =>
+ logger.error(ex)(s"Error publishing to topic ${topic.topic.name}: $msg")
+ }
+
+ def publish[A](topic: TypedTopic[A]): Pipe[F, A, MessageHead] =
+ _.map(topic.codec.apply).through(pubSub.publish(topic.topic))
+
+ def subscribe[A](topic: TypedTopic[A]): Stream[F, Message[A]] =
+ pubSub
+ .subscribe(NonEmptyList.of(topic.topic))
+ .flatMap(m =>
+ m.body.as[A](topic.codec) match {
+ case Right(a) => Stream.emit(Message(m.head, a))
+ case Left(err) =>
+ logger.s
+ .error(err)(
+ s"Could not decode message to topic ${topic.name} to ${topic.msgClass}: ${m.body.noSpaces}"
+ )
+ .drain
+ }
+ )
+
+ def subscribeSink[A](
+ topic: TypedTopic[A]
+ )(handler: Message[A] => F[Unit]): F[F[Unit]] =
+ for {
+ halt <- SignallingRef.of[F, Boolean](false)
+ _ <- subscribe(topic)
+ .evalMap(handler)
+ .interruptWhen(halt)
+ .compile
+ .drain
+ } yield halt.set(true)
+
+ def delegate: PubSub[F] = pubSub
+
+ def withDelegate(newDelegate: PubSub[F]): PubSubT[F] =
+ PubSubT(newDelegate, logger)
+ }
+}
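
Since this file only defines the interface, a small usage sketch may help. It assumes an already constructed `PubSubT[IO]`; the `Ping` message type and its topic are made up for the example and are not part of Docspell:

```scala
import cats.effect.IO
import docspell.pubsub.api._
import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder}
import io.circe.{Decoder, Encoder}

object PubSubExample {
  // a hypothetical message type with the circe codecs that TypedTopic requires
  final case class Ping(text: String)
  object Ping {
    implicit val jsonDecoder: Decoder[Ping] = deriveDecoder[Ping]
    implicit val jsonEncoder: Encoder[Ping] = deriveEncoder[Ping]
  }

  val pingTopic: TypedTopic[Ping] = TypedTopic[Ping](Topic("ping"))

  def demo(pubSub: PubSubT[IO]): IO[Unit] =
    for {
      // run the subscription in the background, like JoexAppImpl does with its topics
      _ <- pubSub
        .subscribeSink(pingTopic)(msg => IO(println(s"received: ${msg.body.text}")))
        .start
      // publish a message; errors are only logged, not raised
      _ <- pubSub.publish1IgnoreErrors(pingTopic, Ping("hello"))
    } yield ()
}
```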
diff --git a/modules/pubsub/api/src/main/scala/docspell/pubsub/api/Topic.scala b/modules/pubsub/api/src/main/scala/docspell/pubsub/api/Topic.scala
new file mode 100644
index 00000000..ab9d664b
--- /dev/null
+++ b/modules/pubsub/api/src/main/scala/docspell/pubsub/api/Topic.scala
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2020 Eike K. & Contributors
+ *
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+
+package docspell.pubsub.api
+
+import io.circe.{Decoder, Encoder}
+
+final case class Topic private (topic: String) {
+ def name: String = topic
+}
+
+object Topic {
+ implicit val jsonDecoder: Decoder[Topic] =
+ Decoder.decodeString.map(Topic.apply)
+
+ implicit val jsonEncoder: Encoder[Topic] =
+ Encoder.encodeString.contramap(_.topic)
+
+ def apply(name: String): Topic = {
+ require(name.trim.nonEmpty)
+ new Topic(name)
+ }
+}
diff --git a/modules/pubsub/api/src/main/scala/docspell/pubsub/api/TypedTopic.scala b/modules/pubsub/api/src/main/scala/docspell/pubsub/api/TypedTopic.scala
new file mode 100644
index 00000000..542144ee
--- /dev/null
+++ b/modules/pubsub/api/src/main/scala/docspell/pubsub/api/TypedTopic.scala
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2020 Eike K. & Contributors
+ *
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+
+package docspell.pubsub.api
+
+import scala.reflect.{ClassTag, classTag}
+
+import io.circe.{Codec, Decoder, Encoder}
+
+final case class TypedTopic[A](topic: Topic, codec: Codec[A], msgClass: Class[_]) {
+ def name: String = topic.name
+
+ def withTopic(topic: Topic): TypedTopic[A] =
+ copy(topic = topic)
+
+ def withName(name: String): TypedTopic[A] =
+ withTopic(Topic(name))
+}
+
+object TypedTopic {
+
+ def apply[A: ClassTag](
+ topic: Topic
+ )(implicit dec: Decoder[A], enc: Encoder[A]): TypedTopic[A] =
+ TypedTopic(topic, Codec.from(dec, enc), classTag[A].runtimeClass)
+}
diff --git a/modules/pubsub/naive/src/main/scala/docspell/pubsub/naive/NaivePubSub.scala b/modules/pubsub/naive/src/main/scala/docspell/pubsub/naive/NaivePubSub.scala
new file mode 100644
index 00000000..16ad7096
--- /dev/null
+++ b/modules/pubsub/naive/src/main/scala/docspell/pubsub/naive/NaivePubSub.scala
@@ -0,0 +1,205 @@
+/*
+ * Copyright 2020 Eike K. & Contributors
+ *
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+
+package docspell.pubsub.naive
+
+import cats.data.NonEmptyList
+import cats.effect._
+import cats.implicits._
+import fs2.Pipe
+import fs2.Stream
+import fs2.concurrent.{Topic => Fs2Topic}
+
+import docspell.common._
+import docspell.pubsub.api._
+import docspell.pubsub.naive.NaivePubSub.State
+import docspell.store.Store
+import docspell.store.records.RPubSub
+
+import io.circe.Json
+import org.http4s.circe.CirceEntityCodec._
+import org.http4s.client.Client
+import org.http4s.client.dsl.Http4sClientDsl
+import org.http4s.dsl.Http4sDsl
+import org.http4s.{HttpRoutes, Uri}
+
+/** A pubsub implementation that can be used across machines, using a rather inefficient
+ * but simple protocol. It can therefore work with the current setup, i.e. it does not
+ * require adding another complex piece of software to the mix, like Kafka or RabbitMQ.
+ *
+ * However, the API should allow it to be used on top of such a tool. This implementation
+ * can be used in a personal setting, where there are only a few nodes.
+ *
+ * How it works: Each node has a set of local subscribers and an HTTP endpoint. If it
+ * publishes a message, it notifies all local subscribers and sends out a JSON message to
+ * all endpoints that are registered for this topic. If it receives a message through
+ * its endpoint, it notifies all local subscribers.
+ *
+ * It is built on the `Topic` class from fs2.concurrent. A map from the topic name to such
+ * a `Topic` instance is maintained. To work across machines, the database is used as a
+ * synchronization point. Each node must provide an HTTP API, and so its "callback" URL is
+ * added into the database, associated with a topic name.
+ *
+ * When publishing a message, it is first published to the internal fs2 topic. Then all
+ * URLs registered for this topic name are looked up in the database and the message is
+ * POSTed to each URL as JSON. The endpoint of each machine takes this message and
+ * publishes it to its own internal fs2.concurrent.Topic instance.
+ *
+ * Obviously, this doesn't scale well to lots of machines and messages. It should be good
+ * enough for personal use, where there are only a small number of machines and messages.
+ *
+ * The main use case for docspell is to communicate between the rest-server and job
+ * executor. It is for internal communication and all topics are known at compile time.
+ */
+final class NaivePubSub[F[_]: Async](
+ cfg: PubSubConfig,
+ state: Ref[F, State[F]],
+ store: Store[F],
+ client: Client[F]
+) extends PubSub[F] {
+ private val logger: Logger[F] = Logger.log4s(org.log4s.getLogger)
+
+ def withClient(client: Client[F]): NaivePubSub[F] =
+ new NaivePubSub[F](cfg, state, store, client)
+
+ def publish1(topic: Topic, msgBody: Json): F[MessageHead] =
+ for {
+ head <- mkMessageHead(topic)
+ msg = Message(head, msgBody)
+ _ <- logger.trace(s"Publishing: $msg")
+ // go through all local subscribers and publish to the fs2 topic
+ _ <- publishLocal(msg)
+ // get all remote subscribers from the database and send the message via http
+ _ <- publishRemote(msg)
+ } yield head
+
+ def publish(topic: Topic): Pipe[F, Json, MessageHead] =
+ ms => //TODO Do some optimization by grouping messages to the same topic
+ ms.evalMap(publish1(topic, _))
+
+ def subscribe(topics: NonEmptyList[Topic]): Stream[F, Message[Json]] =
+ (for {
+ _ <- logger.s.info(s"Adding subscriber for topics: $topics")
+ _ <- Stream.resource[F, Unit](addRemote(topics))
+ m <- Stream.eval(addLocal(topics))
+ } yield m).flatten
+
+  /** Receives messages from remote publishers and passes them to the local subscribers. */
+ def receiveRoute: HttpRoutes[F] = {
+ val dsl = new Http4sDsl[F] {}
+ import dsl._
+
+ HttpRoutes.of { case req @ POST -> Root =>
+ for {
+ data <- req.as[List[Message[Json]]]
+ _ <- logger.trace(s"Received external message(s): $data")
+ _ <- data.traverse(publishLocal)
+ resp <- Ok(())
+ } yield resp
+ }
+ }
+
+ // ---- private helpers
+
+ private def mkMessageHead(topic: Topic): F[MessageHead] =
+ for {
+ id <- Ident.randomId[F]
+ ts <- Timestamp.current[F]
+ head = MessageHead(id, ts, topic)
+ } yield head
+
+ private def addLocal(topics: NonEmptyList[Topic]): F[Stream[F, Message[Json]]] = {
+ val topicSet = topics.map(_.name).toList.toSet
+ for {
+ st <- state.get
+ tpc = st.topics.view.filterKeys(topicSet.contains)
+ _ <-
+ if (tpc.isEmpty)
+ logger.warn(s"Subscribing to 0 topics! Topics $topics were not initialized")
+ else ().pure[F]
+ data = tpc.values.toList.traverse(t => t.subscribe(cfg.subscriberQueueSize))
+ out = data.flatMap(msgs => Stream.emits(msgs))
+ } yield out
+ }
+
+ private def addRemote(topics: NonEmptyList[Topic]): Resource[F, Unit] = {
+ def subscribe: F[Unit] =
+ logger.trace(s"Incrementing counter for topics: $topics") *>
+ store.transact(RPubSub.increment(cfg.url, topics.map(_.name))).as(())
+
+ def unsubscribe: F[Unit] =
+ logger.trace(s"Decrementing counter for topics: $topics") *>
+ store.transact(RPubSub.decrement(cfg.url, topics.map(_.name))).as(())
+
+ Resource.make(subscribe)(_ => unsubscribe)
+ }
+
+ private def publishLocal(msg: Message[Json]): F[Unit] =
+ for {
+ st <- state.get
+ _ <- st.topics.get(msg.head.topic.name) match {
+ case Some(sub) =>
+ logger.trace(s"Publishing message to local topic: $msg") *>
+ sub.publish1(msg).as(())
+ case None =>
+ ().pure[F]
+ }
+ } yield ()
+
+ private def publishRemote(msg: Message[Json]): F[Unit] = {
+ val dsl = new Http4sDsl[F] with Http4sClientDsl[F] {}
+ import dsl._
+
+ for {
+ _ <- logger.trace(s"Find all nodes subscribed to topic ${msg.head.topic.name}")
+ urls <- store.transact(RPubSub.findSubs(msg.head.topic.name, cfg.nodeId))
+ _ <- logger.trace(s"Publishing to remote urls ${urls.map(_.asString)}: $msg")
+ reqs = urls
+ .map(u => Uri.unsafeFromString(u.asString))
+ .map(uri => POST(List(msg), uri))
+ resList <- reqs.traverse(req => client.status(req).attempt)
+ _ <- resList.traverse {
+ case Right(s) =>
+ if (s.isSuccess) ().pure[F]
+ else logger.warn(s"A node was not reached! Reason: $s, message: $msg")
+ case Left(ex) =>
+ logger.error(ex)(s"Error publishing ${msg.head.topic.name} message remotely")
+ }
+ } yield ()
+ }
+}
+
+object NaivePubSub {
+
+ def apply[F[_]: Async](
+ cfg: PubSubConfig,
+ store: Store[F],
+ client: Client[F]
+ )(topics: NonEmptyList[Topic]): Resource[F, NaivePubSub[F]] =
+ Resource.eval(for {
+ state <- Ref.ofEffect[F, State[F]](State.create[F](topics))
+ _ <- store.transact(RPubSub.initTopics(cfg.nodeId, cfg.url, topics.map(_.name)))
+ } yield new NaivePubSub[F](cfg, state, store, client))
+
+ def create[F[_]: Async](
+ cfg: PubSubConfig,
+ store: Store[F],
+ client: Client[F],
+ logger: Logger[F]
+ )(topics: NonEmptyList[Topic]): Resource[F, PubSubT[F]] =
+ apply[F](cfg, store, client)(topics).map(ps => PubSubT(ps, logger))
+
+ final case class State[F[_]](topics: Map[String, Fs2Topic[F, Message[Json]]]) {}
+
+ object State {
+ def empty[F[_]]: State[F] = State[F](Map.empty)
+ def create[F[_]: Async](topics: NonEmptyList[Topic]): F[State[F]] =
+ topics
+ .traverse(t => Fs2Topic[F, Message[Json]].map(fs2t => t.name -> fs2t))
+ .map(_.toList.toMap)
+ .map(State.apply)
+ }
+}
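
To make the protocol described above concrete: the body one node POSTs to another node's `/internal/pubsub` endpoint is a JSON array of `Message[Json]` values, which `receiveRoute` decodes and republishes locally. A rough sketch of building such a payload, using placeholder id and body values:

```scala
import docspell.common.{Ident, Timestamp}
import docspell.pubsub.api.{Message, MessageHead, Topic}
import io.circe.Json
import io.circe.syntax._

object WireFormatExample {
  // one message on the "jobs-notify" topic with an empty JSON body (placeholder values)
  val head: MessageHead =
    MessageHead(Ident.unsafe("msg-1"), Timestamp.Epoch, Topic("jobs-notify"))

  // the HTTP body is a JSON array of Message[Json]; receiveRoute decodes exactly this shape
  val payload: Json = List(Message(head, Json.obj())).asJson
}
```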
diff --git a/modules/pubsub/naive/src/main/scala/docspell/pubsub/naive/PubSubConfig.scala b/modules/pubsub/naive/src/main/scala/docspell/pubsub/naive/PubSubConfig.scala
new file mode 100644
index 00000000..b00131d6
--- /dev/null
+++ b/modules/pubsub/naive/src/main/scala/docspell/pubsub/naive/PubSubConfig.scala
@@ -0,0 +1,11 @@
+/*
+ * Copyright 2020 Eike K. & Contributors
+ *
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+
+package docspell.pubsub.naive
+
+import docspell.common.{Ident, LenientUri}
+
+case class PubSubConfig(nodeId: Ident, url: LenientUri, subscriberQueueSize: Int)
diff --git a/modules/pubsub/naive/src/test/scala/docspell/pubsub/naive/Fixtures.scala b/modules/pubsub/naive/src/test/scala/docspell/pubsub/naive/Fixtures.scala
new file mode 100644
index 00000000..99e863ce
--- /dev/null
+++ b/modules/pubsub/naive/src/test/scala/docspell/pubsub/naive/Fixtures.scala
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2020 Eike K. & Contributors
+ *
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+
+package docspell.pubsub.naive
+
+import cats.effect._
+
+import docspell.common._
+import docspell.pubsub.api._
+import docspell.store.{Store, StoreFixture}
+
+import munit.CatsEffectSuite
+import org.http4s.Response
+import org.http4s.client.Client
+
+trait Fixtures extends HttpClientOps { self: CatsEffectSuite =>
+
+ val pubsubEnv = ResourceFixture(Fixtures.envResource("node-1"))
+
+ val pubsubT = ResourceFixture {
+ Fixtures
+ .envResource("node-1")
+ .flatMap(_.pubSub)
+ .map(ps => PubSubT(ps, Fixtures.loggerIO))
+ }
+
+  def connectedPubsubs(env: Fixtures.Env) =
+ for {
+ // Create two pubsub instances connected to the same database
+ ps_1 <- env.withNodeId("node-1").pubSubT
+ ps_2 <- env.withNodeId("node-2").pubSubT
+
+ // both instances have a dummy client. now connect their clients to each other
+ ps1 = ps_1.withDelegate(ps_1.delegateT.withClient(httpClient(ps_2)))
+ ps2 = ps_2.withDelegate(ps_2.delegateT.withClient(httpClient(ps_1)))
+ } yield (ps1, ps2)
+
+ implicit final class StringId(s: String) {
+ def id: Ident = Ident.unsafe(s)
+ }
+}
+
+object Fixtures {
+ private val loggerIO: Logger[IO] = Logger.log4s(org.log4s.getLogger)
+
+ final case class Env(store: Store[IO], cfg: PubSubConfig) {
+ def pubSub: Resource[IO, NaivePubSub[IO]] = {
+ val dummyClient = Client[IO](_ => Resource.pure(Response.notFound[IO]))
+ NaivePubSub(cfg, store, dummyClient)(Topics.all.map(_.topic))
+ }
+ def pubSubT: Resource[IO, PubSubT[IO]] =
+ pubSub.map(PubSubT(_, loggerIO))
+
+ def withNodeId(nodeId: String): Env =
+ copy(cfg =
+ cfg.copy(
+ nodeId = Ident.unsafe(nodeId),
+ url = LenientUri.unsafe(s"http://$nodeId/")
+ )
+ )
+ }
+
+ def testConfig(nodeId: String) =
+ PubSubConfig(
+ Ident.unsafe(nodeId),
+ LenientUri.unsafe(s"http://$nodeId/"),
+ 0
+ )
+
+ def storeResource: Resource[IO, Store[IO]] =
+ for {
+ random <- Resource.eval(Ident.randomId[IO])
+ cfg = StoreFixture.memoryDB(random.id.take(12))
+ store <- StoreFixture.store(cfg)
+ _ <- Resource.eval(store.migrate)
+ } yield store
+
+ def envResource(nodeId: String): Resource[IO, Env] =
+ for {
+ store <- storeResource
+ } yield Env(store, testConfig(nodeId))
+}
diff --git a/modules/pubsub/naive/src/test/scala/docspell/pubsub/naive/HttpClientOps.scala b/modules/pubsub/naive/src/test/scala/docspell/pubsub/naive/HttpClientOps.scala
new file mode 100644
index 00000000..3659f88e
--- /dev/null
+++ b/modules/pubsub/naive/src/test/scala/docspell/pubsub/naive/HttpClientOps.scala
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2020 Eike K. & Contributors
+ *
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+
+package docspell.pubsub.naive
+
+import cats.effect._
+
+import docspell.common._
+import docspell.pubsub.api._
+
+import io.circe.Encoder
+import org.http4s.circe.CirceEntityCodec._
+import org.http4s.client.Client
+import org.http4s.client.dsl.io._
+import org.http4s.dsl.io._
+import org.http4s.{HttpApp, HttpRoutes, Uri}
+
+trait HttpClientOps {
+ def httpClient(routes: HttpRoutes[IO]): Client[IO] =
+ Client.fromHttpApp(HttpApp(routes.orNotFound.run))
+
+ def httpClient(ps: NaivePubSub[IO]): Client[IO] =
+ httpClient(ps.receiveRoute)
+
+ def httpClient(ps: PubSubT[IO]): Client[IO] =
+ httpClient(ps.delegateT)
+
+ implicit final class ClientOps(client: Client[IO]) {
+ val uri = Uri.unsafeFromString("http://localhost/")
+
+ def sendMessage[A: Encoder](topic: Topic, body: A): IO[Unit] = {
+ val encode: Encoder[List[Message[A]]] = implicitly[Encoder[List[Message[A]]]]
+
+ for {
+ id <- Ident.randomId[IO]
+ time <- Timestamp.current[IO]
+ mesg = List(Message(MessageHead(id, time, topic), body))
+ _ <- HttpClientOps.logger.debug(s"Sending message(s): $mesg")
+ _ <- client.expectOr[Unit](POST(encode(mesg), uri)) { resp =>
+ IO(new Exception(s"Unexpected response: $resp"))
+ }
+ } yield ()
+ }
+
+ def send[A](typedTopic: TypedTopic[A], body: A): IO[Unit] =
+ sendMessage(typedTopic.topic, body)(typedTopic.codec)
+ }
+
+ implicit final class PubSubTestOps(ps: PubSubT[IO]) {
+ def delegateT: NaivePubSub[IO] = ps.delegate.asInstanceOf[NaivePubSub[IO]]
+ }
+}
+
+object HttpClientOps {
+ private val logger: Logger[IO] = Logger.log4s(org.log4s.getLogger)
+}
diff --git a/modules/pubsub/naive/src/test/scala/docspell/pubsub/naive/NaivePubSubTest.scala b/modules/pubsub/naive/src/test/scala/docspell/pubsub/naive/NaivePubSubTest.scala
new file mode 100644
index 00000000..8d3094c2
--- /dev/null
+++ b/modules/pubsub/naive/src/test/scala/docspell/pubsub/naive/NaivePubSubTest.scala
@@ -0,0 +1,128 @@
+/*
+ * Copyright 2020 Eike K. & Contributors
+ *
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+
+package docspell.pubsub.naive
+
+import scala.concurrent.duration._
+
+import cats.effect._
+import cats.implicits._
+import fs2.concurrent.SignallingRef
+
+import docspell.common._
+import docspell.pubsub.api._
+import docspell.pubsub.naive.Topics._
+
+import munit.CatsEffectSuite
+
+class NaivePubSubTest extends CatsEffectSuite with Fixtures {
+ private[this] val logger = Logger.log4s[IO](org.log4s.getLogger)
+
+ def subscribe[A](ps: PubSubT[IO], topic: TypedTopic[A]) =
+ for {
+ received <- Ref.of[IO, Option[Message[A]]](None)
+ halt <- SignallingRef.of[IO, Boolean](false)
+ fiber <- Async[IO].start(
+ logger.debug(s"${Thread.currentThread()} Listening for messages...") *>
+ ps.subscribe(topic)
+ .evalMap(m =>
+ logger.debug(s"Handling message: $m") *>
+ received.set(Some(m)) *>
+ halt.set(true)
+ )
+ .interruptWhen(halt)
+ .compile
+ .drain
+ )
+ _ <- IO.sleep(500.millis)
+ } yield (received, halt, fiber)
+
+ pubsubT.test("local publish receives message") { ps =>
+ for {
+ res <- subscribe(ps, Topics.jobSubmitted)
+ (received, _, subFiber) = res
+ headSend <- ps.publish1(Topics.jobSubmitted, JobSubmittedMsg("hello".id))
+ outcome <- subFiber.join
+ msgRec <- received.get
+ _ = assert(outcome.isSuccess)
+ _ = assertEquals(msgRec.map(_.head), Option(headSend))
+ } yield ()
+ }
+
+ pubsubT.test("local publish to different topic doesn't receive") { ps =>
+ val otherTopic = Topics.jobSubmitted.withTopic(Topic("other-name"))
+ for {
+ res <- subscribe(ps, Topics.jobSubmitted)
+ (received, halt, subFiber) = res
+ _ <- ps.publish1(otherTopic, JobSubmittedMsg("hello".id))
+ _ <- IO.sleep(100.millis) //allow some time for receiving
+ _ <- halt.set(true)
+ outcome <- subFiber.join
+ _ = assert(outcome.isSuccess)
+ recMsg <- received.get
+ _ = assert(recMsg.isEmpty)
+ } yield ()
+ }
+
+ pubsubT.test("receive messages remotely") { ps =>
+ val msg = JobSubmittedMsg("hello-remote".id)
+ for {
+ res <- subscribe(ps, Topics.jobSubmitted)
+ (received, _, subFiber) = res
+ client = httpClient(ps.delegateT.receiveRoute)
+ _ <- client.send(Topics.jobSubmitted, msg)
+ outcome <- subFiber.join
+ msgRec <- received.get
+ _ = assert(outcome.isSuccess)
+ _ = assertEquals(msgRec.map(_.head.topic), Topics.jobSubmitted.topic.some)
+ _ = assertEquals(msgRec.map(_.body), msg.some)
+ } yield ()
+ }
+
+ pubsubEnv.test("send messages remotely") { env =>
+ val msg = JobSubmittedMsg("hello-remote".id)
+
+ // Create two pubsub instances connected to the same database
+    connectedPubsubs(env).use { case (ps1, ps2) =>
+ for {
+ // subscribe to ps1 and send via ps2
+ res <- subscribe(ps1, Topics.jobSubmitted)
+ (received, _, subFiber) = res
+ _ <- ps2.publish1(Topics.jobSubmitted, msg)
+ outcome <- subFiber.join
+ msgRec <- received.get
+
+ // check results
+ _ = assert(outcome.isSuccess)
+ _ = assertEquals(msgRec.map(_.head.topic), Topics.jobSubmitted.topic.some)
+ _ = assertEquals(msgRec.map(_.body), msg.some)
+ } yield ()
+ }
+ }
+
+ pubsubEnv.test("do not receive remote message from other topic") { env =>
+ val msg = JobCancelMsg("job-1".id)
+
+ // Create two pubsub instances connected to the same database
+    connectedPubsubs(env).use { case (ps1, ps2) =>
+ for {
+ // subscribe to ps1 and send via ps2
+ res <- subscribe(ps1, Topics.jobSubmitted)
+ (received, halt, subFiber) = res
+ _ <- ps2.publish1(Topics.jobCancel, msg)
+ _ <- IO.sleep(100.millis)
+ _ <- halt.set(true)
+ outcome <- subFiber.join
+ msgRec <- received.get
+
+ // check results
+ _ = assert(outcome.isSuccess)
+ _ = assertEquals(msgRec, None)
+ } yield ()
+ }
+
+ }
+}
diff --git a/modules/pubsub/naive/src/test/scala/docspell/pubsub/naive/Topics.scala b/modules/pubsub/naive/src/test/scala/docspell/pubsub/naive/Topics.scala
new file mode 100644
index 00000000..8388efa0
--- /dev/null
+++ b/modules/pubsub/naive/src/test/scala/docspell/pubsub/naive/Topics.scala
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2020 Eike K. & Contributors
+ *
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+
+package docspell.pubsub.naive
+
+import cats.data.NonEmptyList
+
+import docspell.common.Ident
+import docspell.pubsub.api._
+
+import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder}
+import io.circe.{Decoder, Encoder}
+
+object Topics {
+ val jobSubmitted: TypedTopic[JobSubmittedMsg] =
+ TypedTopic[JobSubmittedMsg](Topic("test-job-submitted"))
+
+ final case class JobSubmittedMsg(task: Ident)
+ object JobSubmittedMsg {
+ implicit val encode: Encoder[JobSubmittedMsg] = deriveEncoder[JobSubmittedMsg]
+ implicit val decode: Decoder[JobSubmittedMsg] = deriveDecoder[JobSubmittedMsg]
+ }
+
+ val jobCancel: TypedTopic[JobCancelMsg] =
+ TypedTopic[JobCancelMsg](Topic("test-job-done"))
+ final case class JobCancelMsg(id: Ident)
+ object JobCancelMsg {
+ implicit val encode: Encoder[JobCancelMsg] = deriveEncoder[JobCancelMsg]
+ implicit val decode: Decoder[JobCancelMsg] = deriveDecoder[JobCancelMsg]
+ }
+
+ def all: NonEmptyList[TypedTopic[_]] =
+ NonEmptyList.of(jobSubmitted, jobCancel)
+}
diff --git a/modules/restserver/src/main/scala/docspell/restserver/Config.scala b/modules/restserver/src/main/scala/docspell/restserver/Config.scala
index 4b64d7e7..589e5ff1 100644
--- a/modules/restserver/src/main/scala/docspell/restserver/Config.scala
+++ b/modules/restserver/src/main/scala/docspell/restserver/Config.scala
@@ -11,6 +11,7 @@ import docspell.backend.{Config => BackendConfig}
import docspell.common._
import docspell.ftssolr.SolrConfig
import docspell.oidc.ProviderConfig
+import docspell.pubsub.naive.PubSubConfig
import docspell.restserver.Config.OpenIdConfig
import docspell.restserver.auth.OpenId
@@ -33,6 +34,9 @@ case class Config(
) {
def openIdEnabled: Boolean =
openid.exists(_.enabled)
+
+ def pubSubConfig: PubSubConfig =
+ PubSubConfig(appId, baseUrl / "internal" / "pubsub", 100)
}
object Config {
diff --git a/modules/restserver/src/main/scala/docspell/restserver/Main.scala b/modules/restserver/src/main/scala/docspell/restserver/Main.scala
index 5907ff41..61844bdd 100644
--- a/modules/restserver/src/main/scala/docspell/restserver/Main.scala
+++ b/modules/restserver/src/main/scala/docspell/restserver/Main.scala
@@ -38,12 +38,6 @@ object Main extends IOApp {
pools = connectEC.map(Pools.apply)
rc <-
- pools.use(p =>
- RestServer
- .stream[IO](cfg, p)
- .compile
- .drain
- .as(ExitCode.Success)
- )
+ pools.use(p => RestServer.serve[IO](cfg, p))
} yield rc
}
diff --git a/modules/restserver/src/main/scala/docspell/restserver/RestApp.scala b/modules/restserver/src/main/scala/docspell/restserver/RestApp.scala
index 975ae73c..68383b0a 100644
--- a/modules/restserver/src/main/scala/docspell/restserver/RestApp.scala
+++ b/modules/restserver/src/main/scala/docspell/restserver/RestApp.scala
@@ -10,8 +10,6 @@ import docspell.backend.BackendApp
trait RestApp[F[_]] {
- def init: F[Unit]
-
def config: Config
def backend: BackendApp[F]
diff --git a/modules/restserver/src/main/scala/docspell/restserver/RestAppImpl.scala b/modules/restserver/src/main/scala/docspell/restserver/RestAppImpl.scala
index 74b6a303..65a5ad5e 100644
--- a/modules/restserver/src/main/scala/docspell/restserver/RestAppImpl.scala
+++ b/modules/restserver/src/main/scala/docspell/restserver/RestAppImpl.scala
@@ -6,41 +6,36 @@
package docspell.restserver
-import scala.concurrent.ExecutionContext
-
import cats.effect._
-import cats.implicits._
import docspell.backend.BackendApp
-import docspell.common.NodeType
+import docspell.common.Logger
import docspell.ftsclient.FtsClient
import docspell.ftssolr.SolrFtsClient
+import docspell.pubsub.api.{PubSub, PubSubT}
+import docspell.store.Store
import org.http4s.client.Client
final class RestAppImpl[F[_]](val config: Config, val backend: BackendApp[F])
- extends RestApp[F] {
-
- def init: F[Unit] =
- backend.node.register(config.appId, NodeType.Restserver, config.baseUrl)
-
- def shutdown: F[Unit] =
- backend.node.unregister(config.appId)
-}
+ extends RestApp[F] {}
object RestAppImpl {
def create[F[_]: Async](
cfg: Config,
- connectEC: ExecutionContext
- ): Resource[F, RestApp[F]] =
+ store: Store[F],
+ httpClient: Client[F],
+ pubSub: PubSub[F]
+ ): Resource[F, RestApp[F]] = {
+ val logger = Logger.log4s(org.log4s.getLogger(s"restserver-${cfg.appId.id}"))
for {
- backend <- BackendApp(cfg.backend, connectEC)(
- createFtsClient[F](cfg)
- )
+ ftsClient <- createFtsClient(cfg)(httpClient)
+ pubSubT = PubSubT(pubSub, logger)
+ backend <- BackendApp.create[F](cfg.backend, store, ftsClient, pubSubT)
app = new RestAppImpl[F](cfg, backend)
- appR <- Resource.make(app.init.map(_ => app))(_.shutdown)
- } yield appR
+ } yield app
+ }
private def createFtsClient[F[_]: Async](
cfg: Config
diff --git a/modules/restserver/src/main/scala/docspell/restserver/RestServer.scala b/modules/restserver/src/main/scala/docspell/restserver/RestServer.scala
index f64f0a93..707c7aeb 100644
--- a/modules/restserver/src/main/scala/docspell/restserver/RestServer.scala
+++ b/modules/restserver/src/main/scala/docspell/restserver/RestServer.scala
@@ -6,17 +6,25 @@
package docspell.restserver
+import scala.concurrent.duration._
+
import cats.effect._
import cats.implicits._
import fs2.Stream
+import fs2.concurrent.Topic
import docspell.backend.auth.{AuthToken, ShareToken}
+import docspell.backend.msg.Topics
import docspell.common._
import docspell.oidc.CodeFlowRoutes
+import docspell.pubsub.naive.NaivePubSub
import docspell.restserver.auth.OpenId
import docspell.restserver.http4s.EnvMiddleware
import docspell.restserver.routes._
import docspell.restserver.webapp._
+import docspell.restserver.ws.OutputEvent.KeepAlive
+import docspell.restserver.ws.{OutputEvent, WebSocketRoutes}
+import docspell.store.Store
import org.http4s._
import org.http4s.blaze.client.BlazeClientBuilder
@@ -27,55 +35,96 @@ import org.http4s.headers.Location
import org.http4s.implicits._
import org.http4s.server.Router
import org.http4s.server.middleware.Logger
+import org.http4s.server.websocket.WebSocketBuilder2
object RestServer {
- def stream[F[_]: Async](cfg: Config, pools: Pools): Stream[F, Nothing] = {
+ def serve[F[_]: Async](cfg: Config, pools: Pools): F[ExitCode] =
+ for {
+ wsTopic <- Topic[F, OutputEvent]
+ keepAlive = Stream
+ .awakeEvery[F](30.seconds)
+ .map(_ => KeepAlive)
+ .through(wsTopic.publish)
- val templates = TemplateRoutes[F](cfg)
- val app = for {
- restApp <- RestAppImpl.create[F](cfg, pools.connectEC)
+ server =
+ Stream
+ .resource(createApp(cfg, pools))
+ .flatMap { case (restApp, pubSub, httpClient) =>
+ Stream(
+ Subscriptions(wsTopic, restApp.backend.pubSub),
+ BlazeServerBuilder[F]
+ .bindHttp(cfg.bind.port, cfg.bind.address)
+ .withoutBanner
+ .withHttpWebSocketApp(
+ createHttpApp(cfg, httpClient, pubSub, restApp, wsTopic)
+ )
+ .serve
+ .drain
+ )
+ }
+
+ exit <-
+ (server ++ Stream(keepAlive)).parJoinUnbounded.compile.drain.as(ExitCode.Success)
+ } yield exit
+
+ def createApp[F[_]: Async](
+ cfg: Config,
+ pools: Pools
+ ): Resource[F, (RestApp[F], NaivePubSub[F], Client[F])] =
+ for {
httpClient <- BlazeClientBuilder[F].resource
- httpApp = Router(
- "/api/info" -> routes.InfoRoutes(),
- "/api/v1/open/" -> openRoutes(cfg, httpClient, restApp),
- "/api/v1/sec/" -> Authenticate(restApp.backend.login, cfg.auth) { token =>
- securedRoutes(cfg, restApp, token)
- },
- "/api/v1/admin" -> AdminAuth(cfg.adminEndpoint) {
- adminRoutes(cfg, restApp)
- },
- "/api/v1/share" -> ShareAuth(restApp.backend.share, cfg.auth) { token =>
- shareRoutes(cfg, restApp, token)
- },
- "/api/doc" -> templates.doc,
- "/app/assets" -> EnvMiddleware(WebjarRoutes.appRoutes[F]),
- "/app" -> EnvMiddleware(templates.app),
- "/sw.js" -> EnvMiddleware(templates.serviceWorker),
- "/" -> redirectTo("/app")
- ).orNotFound
-
- finalHttpApp = Logger.httpApp(logHeaders = false, logBody = false)(httpApp)
-
- } yield finalHttpApp
-
- Stream
- .resource(app)
- .flatMap(httpApp =>
- BlazeServerBuilder[F]
- .bindHttp(cfg.bind.port, cfg.bind.address)
- .withHttpApp(httpApp)
- .withoutBanner
- .serve
+ store <- Store.create[F](
+ cfg.backend.jdbc,
+ cfg.backend.files.chunkSize,
+ pools.connectEC
)
- }.drain
+ pubSub <- NaivePubSub(cfg.pubSubConfig, store, httpClient)(Topics.all.map(_.topic))
+ restApp <- RestAppImpl.create[F](cfg, store, httpClient, pubSub)
+ } yield (restApp, pubSub, httpClient)
+
+ def createHttpApp[F[_]: Async](
+ cfg: Config,
+ httpClient: Client[F],
+ pubSub: NaivePubSub[F],
+ restApp: RestApp[F],
+ topic: Topic[F, OutputEvent]
+ )(
+ wsB: WebSocketBuilder2[F]
+ ) = {
+ val templates = TemplateRoutes[F](cfg)
+ val httpApp = Router(
+ "/internal/pubsub" -> pubSub.receiveRoute,
+ "/api/info" -> routes.InfoRoutes(),
+ "/api/v1/open/" -> openRoutes(cfg, httpClient, restApp),
+ "/api/v1/sec/" -> Authenticate(restApp.backend.login, cfg.auth) { token =>
+ securedRoutes(cfg, restApp, wsB, topic, token)
+ },
+ "/api/v1/admin" -> AdminAuth(cfg.adminEndpoint) {
+ adminRoutes(cfg, restApp)
+ },
+ "/api/v1/share" -> ShareAuth(restApp.backend.share, cfg.auth) { token =>
+ shareRoutes(cfg, restApp, token)
+ },
+ "/api/doc" -> templates.doc,
+ "/app/assets" -> EnvMiddleware(WebjarRoutes.appRoutes[F]),
+ "/app" -> EnvMiddleware(templates.app),
+ "/sw.js" -> EnvMiddleware(templates.serviceWorker),
+ "/" -> redirectTo("/app")
+ ).orNotFound
+
+ Logger.httpApp(logHeaders = false, logBody = false)(httpApp)
+ }
def securedRoutes[F[_]: Async](
cfg: Config,
restApp: RestApp[F],
+ wsB: WebSocketBuilder2[F],
+ topic: Topic[F, OutputEvent],
token: AuthToken
): HttpRoutes[F] =
Router(
+ "ws" -> WebSocketRoutes(token, topic, wsB),
"auth" -> LoginRoutes.session(restApp.backend.login, cfg, token),
"tag" -> TagRoutes(restApp.backend, token),
"equipment" -> EquipmentRoutes(restApp.backend, token),
diff --git a/modules/restserver/src/main/scala/docspell/restserver/Subscriptions.scala b/modules/restserver/src/main/scala/docspell/restserver/Subscriptions.scala
new file mode 100644
index 00000000..a574f0f2
--- /dev/null
+++ b/modules/restserver/src/main/scala/docspell/restserver/Subscriptions.scala
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2020 Eike K. & Contributors
+ *
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+
+package docspell.restserver
+
+import fs2.Stream
+import fs2.concurrent.Topic
+
+import docspell.backend.msg.JobDone
+import docspell.common._
+import docspell.common.syntax.StringSyntax._
+import docspell.pubsub.api.PubSubT
+import docspell.restserver.ws.OutputEvent
+
+/** Subscribes to those docspell events that are forwarded to the websocket endpoints.
+ */
+object Subscriptions {
+
+ def apply[F[_]](
+ wsTopic: Topic[F, OutputEvent],
+ pubSub: PubSubT[F]
+ ): Stream[F, Nothing] =
+ jobDone(pubSub).through(wsTopic.publish)
+
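+ /** Emits an event for the item's collective whenever a process-item job finishes. */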
+ def jobDone[F[_]](pubSub: PubSubT[F]): Stream[F, OutputEvent] =
+ pubSub
+ .subscribe(JobDone.topic)
+ .filter(m => m.body.task == ProcessItemArgs.taskName)
+ .map(m => m.body.args.parseJsonAs[ProcessItemArgs])
+ .collect { case Right(a) => OutputEvent.ItemProcessed(a.meta.collective) }
+
+}
diff --git a/modules/restserver/src/main/scala/docspell/restserver/ws/InputMessage.scala b/modules/restserver/src/main/scala/docspell/restserver/ws/InputMessage.scala
new file mode 100644
index 00000000..4ea8dc02
--- /dev/null
+++ b/modules/restserver/src/main/scala/docspell/restserver/ws/InputMessage.scala
@@ -0,0 +1,11 @@
+/*
+ * Copyright 2020 Eike K. & Contributors
+ *
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+
+package docspell.restserver.ws
+
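+/** Placeholder for messages sent from websocket clients to the server. */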
+sealed trait InputMessage
+
+object InputMessage {}
diff --git a/modules/restserver/src/main/scala/docspell/restserver/ws/OutputEvent.scala b/modules/restserver/src/main/scala/docspell/restserver/ws/OutputEvent.scala
new file mode 100644
index 00000000..8567f3a7
--- /dev/null
+++ b/modules/restserver/src/main/scala/docspell/restserver/ws/OutputEvent.scala
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2020 Eike K. & Contributors
+ *
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+
+package docspell.restserver.ws
+
+import docspell.backend.auth.AuthToken
+import docspell.common._
+
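+/** Events that are pushed from the server to connected websocket clients. */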
+sealed trait OutputEvent {
+ def forCollective(token: AuthToken): Boolean
+ def encode: String
+}
+
+object OutputEvent {
+
+ case object KeepAlive extends OutputEvent {
+ def forCollective(token: AuthToken): Boolean = true
+ def encode: String = "keep-alive"
+ }
+
+ final case class ItemProcessed(collective: Ident) extends OutputEvent {
+ def forCollective(token: AuthToken): Boolean =
+ token.account.collective == collective
+
+ def encode: String =
+ "item-processed"
+ }
+
+}
diff --git a/modules/restserver/src/main/scala/docspell/restserver/ws/WebSocketRoutes.scala b/modules/restserver/src/main/scala/docspell/restserver/ws/WebSocketRoutes.scala
new file mode 100644
index 00000000..022420b2
--- /dev/null
+++ b/modules/restserver/src/main/scala/docspell/restserver/ws/WebSocketRoutes.scala
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2020 Eike K. & Contributors
+ *
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+
+package docspell.restserver.ws
+
+import cats.effect.Async
+import fs2.concurrent.Topic
+import fs2.{Pipe, Stream}
+
+import docspell.backend.auth.AuthToken
+
+import org.http4s.HttpRoutes
+import org.http4s.dsl.Http4sDsl
+import org.http4s.server.websocket.WebSocketBuilder2
+import org.http4s.websocket.WebSocketFrame
+import org.http4s.websocket.WebSocketFrame.Text
+
+object WebSocketRoutes {
+
+ def apply[F[_]: Async](
+ user: AuthToken,
+ topic: Topic[F, OutputEvent],
+ wsb: WebSocketBuilder2[F]
+ ): HttpRoutes[F] = {
+ val dsl = new Http4sDsl[F] {}
+ import dsl._
+
+ HttpRoutes.of { case GET -> Root =>
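+ // only forward events that concern the authenticated user's collective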
+ val toClient: Stream[F, WebSocketFrame.Text] =
+ topic
+ .subscribe(500)
+ .filter(_.forCollective(user))
+ .map(msg => Text(msg.encode))
+
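+ // incoming frames are ignored for now; the socket is only used for server push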
+ val toServer: Pipe[F, WebSocketFrame, Unit] =
+ _.map(_ => ())
+
+ wsb.build(toClient, toServer)
+ }
+ }
+}
diff --git a/modules/store/src/main/resources/db/migration/h2/V1.28.0__pubsub.sql b/modules/store/src/main/resources/db/migration/h2/V1.28.0__pubsub.sql
new file mode 100644
index 00000000..ae42b13c
--- /dev/null
+++ b/modules/store/src/main/resources/db/migration/h2/V1.28.0__pubsub.sql
@@ -0,0 +1,8 @@
+CREATE TABLE "pubsub" (
+ "id" varchar(254) not null primary key,
+ "node_id" varchar(254) not null,
+ "url" varchar(254) not null,
+ "topic" varchar(254) not null,
+ "counter" int not null,
+ unique("url", "topic")
+)
diff --git a/modules/store/src/main/resources/db/migration/mariadb/V1.28.0__pubsub.sql b/modules/store/src/main/resources/db/migration/mariadb/V1.28.0__pubsub.sql
new file mode 100644
index 00000000..2bd09016
--- /dev/null
+++ b/modules/store/src/main/resources/db/migration/mariadb/V1.28.0__pubsub.sql
@@ -0,0 +1,8 @@
+CREATE TABLE `pubsub` (
+ `id` varchar(254) not null primary key,
+ `node_id` varchar(254) not null,
+ `url` varchar(254) not null,
+ `topic` varchar(254) not null,
+ `counter` int not null,
+ unique(`url`, `topic`)
+)
diff --git a/modules/store/src/main/resources/db/migration/postgresql/V1.28.0__pubsub.sql b/modules/store/src/main/resources/db/migration/postgresql/V1.28.0__pubsub.sql
new file mode 100644
index 00000000..ae42b13c
--- /dev/null
+++ b/modules/store/src/main/resources/db/migration/postgresql/V1.28.0__pubsub.sql
@@ -0,0 +1,8 @@
+CREATE TABLE "pubsub" (
+ "id" varchar(254) not null primary key,
+ "node_id" varchar(254) not null,
+ "url" varchar(254) not null,
+ "topic" varchar(254) not null,
+ "counter" int not null,
+ unique("url", "topic")
+)
diff --git a/modules/store/src/main/scala/docspell/store/records/RPubSub.scala b/modules/store/src/main/scala/docspell/store/records/RPubSub.scala
new file mode 100644
index 00000000..6509aaea
--- /dev/null
+++ b/modules/store/src/main/scala/docspell/store/records/RPubSub.scala
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2020 Eike K. & Contributors
+ *
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+
+package docspell.store.records
+
+import cats.data.NonEmptyList
+import cats.implicits._
+
+import docspell.common._
+import docspell.store.qb.DSL._
+import docspell.store.qb.{Column, DML, TableDef}
+
+import doobie._
+import doobie.implicits._
+
+/** A table for supporting naive pubsub across nodes. */
+final case class RPubSub(
+ id: Ident,
+ nodeId: Ident,
+ url: LenientUri,
+ topic: String,
+ counter: Int
+)
+
+object RPubSub {
+ final case class Table(alias: Option[String]) extends TableDef {
+ val tableName: String = "pubsub"
+
+ val id = Column[Ident]("id", this)
+ val nodeId = Column[Ident]("node_id", this)
+ val url = Column[LenientUri]("url", this)
+ val topic = Column[String]("topic", this)
+ val counter = Column[Int]("counter", this)
+
+ val all: NonEmptyList[Column[_]] =
+ NonEmptyList.of(id, nodeId, url, topic, counter)
+ }
+ def as(alias: String): Table =
+ Table(Some(alias))
+
+ val T: Table = Table(None)
+
+ def insert(r: RPubSub): ConnectionIO[Int] =
+ DML.insert(T, T.all, sql"${r.id}, ${r.nodeId}, ${r.url}, ${r.topic}, ${r.counter}")
+
+ /** Insert all topics with counter = 0 */
+ def initTopics(
+ nodeId: Ident,
+ url: LenientUri,
+ topics: NonEmptyList[String]
+ ): ConnectionIO[Int] =
+ DML.delete(T, T.nodeId === nodeId) *>
+ topics.toList
+ .traverse(t =>
+ Ident
+ .randomId[ConnectionIO]
+ .flatMap(id => insert(RPubSub(id, nodeId, url, t, 0)))
+ )
+ .map(_.sum)
+
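+ /** Increase the subscription counter for the given topics at `url`. */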
+ def increment(url: LenientUri, topics: NonEmptyList[String]): ConnectionIO[Int] =
+ DML.update(
+ T,
+ T.url === url && T.topic.in(topics),
+ DML.set(
+ T.counter.increment(1)
+ )
+ )
+
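+ /** Decrease the subscription counter for the given topics at `url`. */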
+ def decrement(url: LenientUri, topics: NonEmptyList[String]): ConnectionIO[Int] =
+ DML.update(
+ T,
+ T.url === url && T.topic.in(topics),
+ DML.set(
+ T.counter.decrement(1)
+ )
+ )
+
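+ /** Find urls of other nodes that have at least one subscription to the topic. */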
+ def findSubs(topic: String, excludeNode: Ident): ConnectionIO[List[LenientUri]] =
+ run(
+ select(T.url),
+ from(T),
+ T.topic === topic && T.counter > 0 && T.nodeId <> excludeNode
+ )
+ .query[LenientUri]
+ .to[List]
+}
diff --git a/modules/store/src/test/resources/logback-test.xml b/modules/store/src/test/resources/logback-test.xml
index 9cf93b57..6d2ef8c0 100644
--- a/modules/store/src/test/resources/logback-test.xml
+++ b/modules/store/src/test/resources/logback-test.xml
@@ -3,7 +3,7 @@
true
- %highlight(%-5level) %cyan(%logger{15}) - %msg %n
+ %highlight(%-5level) [%t{10}] %cyan(%logger{15}) - %msg %n
diff --git a/modules/webapp/src/main/elm/App/Data.elm b/modules/webapp/src/main/elm/App/Data.elm
index c64c192a..f88a36cc 100644
--- a/modules/webapp/src/main/elm/App/Data.elm
+++ b/modules/webapp/src/main/elm/App/Data.elm
@@ -64,6 +64,7 @@ type alias Model =
, anonymousTheme : UiTheme
, anonymousUiLang : UiLanguage
, langMenuOpen : Bool
+ , showNewItemsArrived : Bool
}
@@ -126,6 +127,7 @@ init key url flags_ settings =
, anonymousTheme = Data.UiTheme.Light
, anonymousUiLang = Messages.UiLanguage.English
, langMenuOpen = False
+ , showNewItemsArrived = False
}
, Cmd.batch
[ Cmd.map UserSettingsMsg uc
@@ -190,6 +192,8 @@ type Msg
| SetLanguage UiLanguage
| ClientSettingsSaveResp UiSettings (Result Http.Error BasicResult)
| ReceiveBrowserSettings StoredUiSettings
+ | ReceiveWsMessage String
+ | ToggleShowNewItemsArrived
defaultPage : Flags -> Page
diff --git a/modules/webapp/src/main/elm/App/Update.elm b/modules/webapp/src/main/elm/App/Update.elm
index 47d7ad9b..60bfcd47 100644
--- a/modules/webapp/src/main/elm/App/Update.elm
+++ b/modules/webapp/src/main/elm/App/Update.elm
@@ -15,6 +15,7 @@ import App.Data exposing (..)
import Browser exposing (UrlRequest(..))
import Browser.Navigation as Nav
import Data.Flags
+import Data.ServerEvent exposing (ServerEvent(..))
import Data.UiSettings exposing (UiSettings)
import Data.UiTheme
import Messages exposing (Messages)
@@ -308,6 +309,33 @@ updateWithSub msg model =
in
updateUserSettings texts lm model
+ ReceiveWsMessage data ->
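+ -- decode the raw websocket payload into a typed ServerEvent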
+ let
+ se =
+ Data.ServerEvent.fromString data
+ in
+ case se of
+ Just ItemProcessed ->
+ let
+ newModel =
+ { model | showNewItemsArrived = True }
+ in
+ case model.page of
+ HomePage ->
+ updateHome texts Page.Home.Data.RefreshView newModel
+
+ _ ->
+ ( newModel, Cmd.none, Sub.none )
+
+ Nothing ->
+ ( model, Cmd.none, Sub.none )
+
+ ToggleShowNewItemsArrived ->
+ ( { model | showNewItemsArrived = not model.showNewItemsArrived }
+ , Cmd.none
+ , Sub.none
+ )
+
applyClientSettings : Messages -> Model -> UiSettings -> ( Model, Cmd Msg, Sub Msg )
applyClientSettings texts model settings =
diff --git a/modules/webapp/src/main/elm/App/View2.elm b/modules/webapp/src/main/elm/App/View2.elm
index 75159ed1..4e3d657c 100644
--- a/modules/webapp/src/main/elm/App/View2.elm
+++ b/modules/webapp/src/main/elm/App/View2.elm
@@ -71,7 +71,19 @@ topNavUser auth model =
, activeStyle = "hover:bg-blue-200 dark:hover:bg-bluegray-800 w-12"
}
, headerNavItem True model
- , div [ class "flex flex-grow justify-end" ]
+ , div [ class "flex flex-grow justify-center" ]
+ [ a
+ [ class S.infoMessageBase
+ , class "my-2 px-1 py-1 rounded-lg inline-block hover:opacity-50"
+ , classList [ ( "hidden", not model.showNewItemsArrived ) ]
+ , href "#"
+ , onClick ToggleShowNewItemsArrived
+ ]
+ [ i [ class "fa fa-exclamation-circle mr-1" ] []
+ , text texts.app.newItemsArrived
+ ]
+ ]
+ , div [ class "flex justify-end" ]
[ userMenu texts.app auth model
, dataMenu texts.app auth model
]
diff --git a/modules/webapp/src/main/elm/Data/ServerEvent.elm b/modules/webapp/src/main/elm/Data/ServerEvent.elm
new file mode 100644
index 00000000..c4d84b72
--- /dev/null
+++ b/modules/webapp/src/main/elm/Data/ServerEvent.elm
@@ -0,0 +1,22 @@
+{-
+ Copyright 2020 Eike K. & Contributors
+
+ SPDX-License-Identifier: AGPL-3.0-or-later
+-}
+
+
+module Data.ServerEvent exposing (ServerEvent(..), fromString)
+
+
+type ServerEvent
+ = ItemProcessed
+
+
+fromString : String -> Maybe ServerEvent
+fromString str =
+ case String.toLower str of
+ "item-processed" ->
+ Just ItemProcessed
+
+ _ ->
+ Nothing
diff --git a/modules/webapp/src/main/elm/Main.elm b/modules/webapp/src/main/elm/Main.elm
index a5b7eda3..be80a076 100644
--- a/modules/webapp/src/main/elm/Main.elm
+++ b/modules/webapp/src/main/elm/Main.elm
@@ -93,4 +93,5 @@ subscriptions model =
Sub.batch
[ model.subs
, Ports.receiveUiSettings ReceiveBrowserSettings
+ , Ports.receiveWsMessage ReceiveWsMessage
]
diff --git a/modules/webapp/src/main/elm/Messages/App.elm b/modules/webapp/src/main/elm/Messages/App.elm
index 0288e4a7..06d32a73 100644
--- a/modules/webapp/src/main/elm/Messages/App.elm
+++ b/modules/webapp/src/main/elm/Messages/App.elm
@@ -23,6 +23,7 @@ type alias Texts =
, processingQueue : String
, newInvites : String
, help : String
+ , newItemsArrived : String
}
@@ -38,6 +39,7 @@ gb =
, processingQueue = "Processing Queue"
, newInvites = "New Invites"
, help = "Help"
+ , newItemsArrived = "New items arrived!"
}
@@ -53,4 +55,5 @@ de =
, processingQueue = "Verarbeitung"
, newInvites = "Neue Einladung"
, help = "Hilfe (English)"
+ , newItemsArrived = "Neue Dokumente eingetroffen!"
}
diff --git a/modules/webapp/src/main/elm/Page/Home/Data.elm b/modules/webapp/src/main/elm/Page/Home/Data.elm
index 0a2deed7..7c1bef6a 100644
--- a/modules/webapp/src/main/elm/Page/Home/Data.elm
+++ b/modules/webapp/src/main/elm/Page/Home/Data.elm
@@ -229,6 +229,7 @@ type Msg
| PublishItemsMsg Comp.PublishItems.Msg
| TogglePublishCurrentQueryView
| PublishViewMsg Comp.PublishItems.Msg
+ | RefreshView
type SearchType
diff --git a/modules/webapp/src/main/elm/Page/Home/Update.elm b/modules/webapp/src/main/elm/Page/Home/Update.elm
index 27ddf0fd..998b2017 100644
--- a/modules/webapp/src/main/elm/Page/Home/Update.elm
+++ b/modules/webapp/src/main/elm/Page/Home/Update.elm
@@ -227,6 +227,22 @@ update mId key flags texts settings msg model =
else
doSearch param nm
+ RefreshView ->
+ let
+ param =
+ { flags = flags
+ , searchType = model.lastSearchType
+ , pageSize = settings.itemSearchPageSize
+ , offset = model.searchOffset
+ , scroll = False
+ }
+ in
+ if model.searchInProgress then
+ withSub ( model, Cmd.none )
+
+ else
+ doSearch param model
+
ToggleSearchMenu ->
let
nextView =
diff --git a/modules/webapp/src/main/elm/Ports.elm b/modules/webapp/src/main/elm/Ports.elm
index 233e6c63..c366bccc 100644
--- a/modules/webapp/src/main/elm/Ports.elm
+++ b/modules/webapp/src/main/elm/Ports.elm
@@ -11,6 +11,7 @@ port module Ports exposing
, printElement
, receiveCheckQueryResult
, receiveUiSettings
+ , receiveWsMessage
, removeAccount
, requestUiSettings
, setAccount
@@ -55,6 +56,15 @@ and calls the print dialog.
port printElement : String -> Cmd msg
+{-| Receives messages from the websocket.
+-}
+port receiveWsMessage : (String -> msg) -> Sub msg
+
+
+
+--- Higher level functions based on ports
+
+
setUiTheme : UiTheme -> Cmd msg
setUiTheme theme =
internalSetUiTheme (Data.UiTheme.toString theme)
diff --git a/modules/webapp/src/main/elm/Styles.elm b/modules/webapp/src/main/elm/Styles.elm
index 645d2324..e7cbdb6e 100644
--- a/modules/webapp/src/main/elm/Styles.elm
+++ b/modules/webapp/src/main/elm/Styles.elm
@@ -63,9 +63,14 @@ warnMessageColors =
" border-yellow-800 bg-yellow-50 text-yellow-800 dark:border-amber-200 dark:bg-amber-800 dark:text-amber-200 "
+infoMessageBase : String
+infoMessageBase =
+ " border border-blue-800 bg-blue-100 text-blue-800 dark:border-lightblue-200 dark:bg-lightblue-800 dark:text-lightblue-200 dark:bg-opacity-25 "
+
+
infoMessage : String
infoMessage =
- " border border-blue-800 bg-blue-100 text-blue-800 dark:border-lightblue-200 dark:bg-lightblue-800 dark:text-lightblue-200 dark:bg-opacity-25 px-2 py-2 rounded "
+ infoMessageBase ++ " px-2 py-2 rounded "
message : String
diff --git a/modules/webapp/src/main/webjar/docspell.js b/modules/webapp/src/main/webjar/docspell.js
index ef4ef03c..08aa6aed 100644
--- a/modules/webapp/src/main/webjar/docspell.js
+++ b/modules/webapp/src/main/webjar/docspell.js
@@ -12,9 +12,9 @@ function extend() {
var result = {};
for (var i = 0; i < arguments.length; i++) {
forEachIn(arguments[i],
- function(obj, key) {
- result[key] = obj;
- });
+ function(obj, key) {
+ result[key] = obj;
+ });
}
return result;
}
@@ -41,13 +41,18 @@ elmApp.ports.internalSetUiTheme.subscribe(function(themeName) {
});
elmApp.ports.setAccount.subscribe(function(authResult) {
- console.log("Add account from local storage");
+ console.log("Add account to local storage");
localStorage.setItem("account", JSON.stringify(authResult));
+
+ if (!dsWebSocket) {
+ initWS();
+ }
});
elmApp.ports.removeAccount.subscribe(function() {
console.log("Remove account from local storage");
localStorage.removeItem("account");
+ closeWS();
});
elmApp.ports.requestUiSettings.subscribe(function(args) {
@@ -135,3 +140,28 @@ elmApp.ports.printElement.subscribe(function(id) {
}
}
});
+
+
+var dsWebSocket = null;
+function closeWS() {
+ if (dsWebSocket) {
+ console.log("Closing websocket connection");
+ dsWebSocket.close(1000, "Done");
+ dsWebSocket = null;
+ }
+}
+function initWS() {
+ closeWS();
+ var protocol = (window.location.protocol === 'https:') ? 'wss:' : 'ws:';
+ var url = protocol + '//' + window.location.host + '/api/v1/sec/ws';
+ console.log("Initialize websocket at " + url);
+ dsWebSocket = new WebSocket(url);
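+ // forward everything except keep-alive pings to the Elm app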
+ dsWebSocket.addEventListener("message", function(event) {
+ if (event.data != "keep-alive" && event.data) {
+ elmApp.ports.receiveWsMessage.send(event.data);
+ }
+ });
+}
+// elmApp.ports.sendWsMessage.subscribe(function(msg) {
+// socket.send(msg);
+// });