mirror of
https://github.com/TheAnachronism/docspell.git
synced 2025-04-05 02:49:32 +00:00
commit
33c68373fb
41
build.sbt
41
build.sbt
@ -410,6 +410,36 @@ val store = project
|
||||
)
|
||||
.dependsOn(common, query.jvm, totp, files)
|
||||
|
||||
val pubsubApi = project
|
||||
.in(file("modules/pubsub/api"))
|
||||
.disablePlugins(RevolverPlugin)
|
||||
.settings(sharedSettings)
|
||||
.settings(testSettingsMUnit)
|
||||
.settings(
|
||||
name := "docspell-pubsub-api",
|
||||
addCompilerPlugin(Dependencies.kindProjectorPlugin),
|
||||
libraryDependencies ++=
|
||||
Dependencies.fs2
|
||||
)
|
||||
.dependsOn(common)
|
||||
|
||||
val pubsubNaive = project
|
||||
.in(file("modules/pubsub/naive"))
|
||||
.disablePlugins(RevolverPlugin)
|
||||
.settings(sharedSettings)
|
||||
.settings(testSettingsMUnit)
|
||||
.settings(
|
||||
name := "docspell-pubsub-naive",
|
||||
addCompilerPlugin(Dependencies.kindProjectorPlugin),
|
||||
libraryDependencies ++=
|
||||
Dependencies.fs2 ++
|
||||
Dependencies.http4sCirce ++
|
||||
Dependencies.http4sDsl ++
|
||||
Dependencies.http4sClient ++
|
||||
Dependencies.circe
|
||||
)
|
||||
.dependsOn(common, pubsubApi, store % "compile->compile;test->test")
|
||||
|
||||
val extract = project
|
||||
.in(file("modules/extract"))
|
||||
.disablePlugins(RevolverPlugin)
|
||||
@ -534,7 +564,7 @@ val backend = project
|
||||
Dependencies.http4sClient ++
|
||||
Dependencies.emil
|
||||
)
|
||||
.dependsOn(store, joexapi, ftsclient, totp)
|
||||
.dependsOn(store, joexapi, ftsclient, totp, pubsubApi)
|
||||
|
||||
val oidc = project
|
||||
.in(file("modules/oidc"))
|
||||
@ -625,7 +655,8 @@ val joex = project
|
||||
analysis,
|
||||
joexapi,
|
||||
restapi,
|
||||
ftssolr
|
||||
ftssolr,
|
||||
pubsubNaive
|
||||
)
|
||||
|
||||
val restserver = project
|
||||
@ -689,7 +720,7 @@ val restserver = project
|
||||
}
|
||||
}
|
||||
)
|
||||
.dependsOn(config, restapi, joexapi, backend, webapp, ftssolr, oidc)
|
||||
.dependsOn(config, restapi, joexapi, backend, webapp, ftssolr, oidc, pubsubNaive)
|
||||
|
||||
// --- Website Documentation
|
||||
|
||||
@ -781,7 +812,9 @@ val root = project
|
||||
query.jvm,
|
||||
query.js,
|
||||
totp,
|
||||
oidc
|
||||
oidc,
|
||||
pubsubApi,
|
||||
pubsubNaive
|
||||
)
|
||||
|
||||
// --- Helpers
|
||||
|
@ -6,8 +6,6 @@
|
||||
|
||||
package docspell.backend
|
||||
|
||||
import scala.concurrent.ExecutionContext
|
||||
|
||||
import cats.effect._
|
||||
|
||||
import docspell.backend.auth.Login
|
||||
@ -15,15 +13,13 @@ import docspell.backend.fulltext.CreateIndex
|
||||
import docspell.backend.ops._
|
||||
import docspell.backend.signup.OSignup
|
||||
import docspell.ftsclient.FtsClient
|
||||
import docspell.joexapi.client.JoexClient
|
||||
import docspell.pubsub.api.PubSubT
|
||||
import docspell.store.Store
|
||||
import docspell.store.queue.JobQueue
|
||||
import docspell.store.usertask.UserTaskStore
|
||||
import docspell.totp.Totp
|
||||
|
||||
import emil.javamail.{JavaMailEmil, Settings}
|
||||
import org.http4s.blaze.client.BlazeClientBuilder
|
||||
import org.http4s.client.Client
|
||||
|
||||
trait BackendApp[F[_]] {
|
||||
|
||||
@ -49,6 +45,7 @@ trait BackendApp[F[_]] {
|
||||
def clientSettings: OClientSettings[F]
|
||||
def totp: OTotp[F]
|
||||
def share: OShare[F]
|
||||
def pubSub: PubSubT[F]
|
||||
}
|
||||
|
||||
object BackendApp {
|
||||
@ -56,8 +53,8 @@ object BackendApp {
|
||||
def create[F[_]: Async](
|
||||
cfg: Config,
|
||||
store: Store[F],
|
||||
httpClient: Client[F],
|
||||
ftsClient: FtsClient[F]
|
||||
ftsClient: FtsClient[F],
|
||||
pubSubT: PubSubT[F]
|
||||
): Resource[F, BackendApp[F]] =
|
||||
for {
|
||||
utStore <- UserTaskStore(store)
|
||||
@ -65,7 +62,7 @@ object BackendApp {
|
||||
totpImpl <- OTotp(store, Totp.default)
|
||||
loginImpl <- Login[F](store, Totp.default)
|
||||
signupImpl <- OSignup[F](store)
|
||||
joexImpl <- OJoex(JoexClient(httpClient), store)
|
||||
joexImpl <- OJoex(pubSubT)
|
||||
collImpl <- OCollective[F](store, utStore, queue, joexImpl)
|
||||
sourceImpl <- OSource[F](store)
|
||||
tagImpl <- OTag[F](store)
|
||||
@ -90,6 +87,7 @@ object BackendApp {
|
||||
OShare(store, itemSearchImpl, simpleSearchImpl, javaEmil)
|
||||
)
|
||||
} yield new BackendApp[F] {
|
||||
val pubSub = pubSubT
|
||||
val login = loginImpl
|
||||
val signup = signupImpl
|
||||
val collective = collImpl
|
||||
@ -113,15 +111,4 @@ object BackendApp {
|
||||
val totp = totpImpl
|
||||
val share = shareImpl
|
||||
}
|
||||
|
||||
def apply[F[_]: Async](
|
||||
cfg: Config,
|
||||
connectEC: ExecutionContext
|
||||
)(ftsFactory: Client[F] => Resource[F, FtsClient[F]]): Resource[F, BackendApp[F]] =
|
||||
for {
|
||||
store <- Store.create(cfg.jdbc, cfg.files.chunkSize, connectEC)
|
||||
httpClient <- BlazeClientBuilder[F].resource
|
||||
ftsClient <- ftsFactory(httpClient)
|
||||
backend <- create(cfg, store, httpClient, ftsClient)
|
||||
} yield backend
|
||||
}
|
||||
|
@ -0,0 +1,27 @@
|
||||
/*
|
||||
* Copyright 2020 Eike K. & Contributors
|
||||
*
|
||||
* SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
*/
|
||||
|
||||
package docspell.backend.msg
|
||||
|
||||
import docspell.common._
|
||||
import docspell.pubsub.api.{Topic, TypedTopic}
|
||||
|
||||
import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder}
|
||||
import io.circe.{Decoder, Encoder}
|
||||
|
||||
/** Message to request to cancel a job. */
|
||||
final case class CancelJob(jobId: Ident, nodeId: Ident)
|
||||
|
||||
object CancelJob {
|
||||
implicit val jsonDecoder: Decoder[CancelJob] =
|
||||
deriveDecoder[CancelJob]
|
||||
|
||||
implicit val jsonEncoder: Encoder[CancelJob] =
|
||||
deriveEncoder[CancelJob]
|
||||
|
||||
val topic: TypedTopic[CancelJob] =
|
||||
TypedTopic(Topic("job-cancel-request"))
|
||||
}
|
@ -0,0 +1,26 @@
|
||||
/*
|
||||
* Copyright 2020 Eike K. & Contributors
|
||||
*
|
||||
* SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
*/
|
||||
|
||||
package docspell.backend.msg
|
||||
|
||||
import docspell.common._
|
||||
import docspell.pubsub.api.{Topic, TypedTopic}
|
||||
|
||||
import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder}
|
||||
import io.circe.{Decoder, Encoder}
|
||||
|
||||
/** Message to notify about finished jobs. They have a final state. */
|
||||
final case class JobDone(jobId: Ident, task: Ident, args: String, state: JobState)
|
||||
object JobDone {
|
||||
implicit val jsonDecoder: Decoder[JobDone] =
|
||||
deriveDecoder[JobDone]
|
||||
|
||||
implicit val jsonEncoder: Encoder[JobDone] =
|
||||
deriveEncoder[JobDone]
|
||||
|
||||
val topic: TypedTopic[JobDone] =
|
||||
TypedTopic(Topic("job-finished"))
|
||||
}
|
@ -0,0 +1,23 @@
|
||||
/*
|
||||
* Copyright 2020 Eike K. & Contributors
|
||||
*
|
||||
* SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
*/
|
||||
|
||||
package docspell.backend.msg
|
||||
|
||||
import cats.data.NonEmptyList
|
||||
|
||||
import docspell.pubsub.api.{Topic, TypedTopic}
|
||||
|
||||
/** All topics used in Docspell. */
|
||||
object Topics {
|
||||
|
||||
/** A generic notification to the job executors to look for new work. */
|
||||
val jobsNotify: TypedTopic[Unit] =
|
||||
TypedTopic[Unit](Topic("jobs-notify"))
|
||||
|
||||
/** A list of all topics. It is required to list every topic in use here! */
|
||||
val all: NonEmptyList[TypedTopic[_]] =
|
||||
NonEmptyList.of(JobDone.topic, CancelJob.topic, jobsNotify)
|
||||
}
|
@ -11,8 +11,7 @@ import cats.effect._
|
||||
import cats.implicits._
|
||||
|
||||
import docspell.backend.ops.OJob.{CollectiveQueueState, JobCancelResult}
|
||||
import docspell.common.Priority
|
||||
import docspell.common.{Ident, JobState}
|
||||
import docspell.common._
|
||||
import docspell.store.Store
|
||||
import docspell.store.UpdateResult
|
||||
import docspell.store.queries.QJob
|
||||
@ -55,6 +54,7 @@ object OJob {
|
||||
joex: OJoex[F]
|
||||
): Resource[F, OJob[F]] =
|
||||
Resource.pure[F, OJob[F]](new OJob[F] {
|
||||
private[this] val logger = Logger.log4s(org.log4s.getLogger(OJob.getClass))
|
||||
|
||||
def queueState(collective: Ident, maxResults: Int): F[CollectiveQueueState] =
|
||||
store
|
||||
@ -77,11 +77,9 @@ object OJob {
|
||||
job.worker match {
|
||||
case Some(worker) =>
|
||||
for {
|
||||
flag <- joex.cancelJob(job.id, worker)
|
||||
res <-
|
||||
if (flag) JobCancelResult.cancelRequested.pure[F]
|
||||
else remove(job)
|
||||
} yield res
|
||||
_ <- logger.debug(s"Attempt to cancel job: ${job.id.id}")
|
||||
_ <- joex.cancelJob(job.id, worker)
|
||||
} yield JobCancelResult.cancelRequested
|
||||
case None =>
|
||||
remove(job)
|
||||
}
|
||||
|
@ -6,41 +6,27 @@
|
||||
|
||||
package docspell.backend.ops
|
||||
|
||||
import cats.data.OptionT
|
||||
import cats.effect._
|
||||
import cats.implicits._
|
||||
|
||||
import docspell.common.{Ident, NodeType}
|
||||
import docspell.joexapi.client.JoexClient
|
||||
import docspell.store.Store
|
||||
import docspell.store.records.RNode
|
||||
import docspell.backend.msg.{CancelJob, Topics}
|
||||
import docspell.common.Ident
|
||||
import docspell.pubsub.api.PubSubT
|
||||
|
||||
trait OJoex[F[_]] {
|
||||
|
||||
def notifyAllNodes: F[Unit]
|
||||
|
||||
def cancelJob(job: Ident, worker: Ident): F[Boolean]
|
||||
|
||||
def cancelJob(job: Ident, worker: Ident): F[Unit]
|
||||
}
|
||||
|
||||
object OJoex {
|
||||
|
||||
def apply[F[_]: Sync](client: JoexClient[F], store: Store[F]): Resource[F, OJoex[F]] =
|
||||
def apply[F[_]](pubSub: PubSubT[F]): Resource[F, OJoex[F]] =
|
||||
Resource.pure[F, OJoex[F]](new OJoex[F] {
|
||||
|
||||
def notifyAllNodes: F[Unit] =
|
||||
for {
|
||||
nodes <- store.transact(RNode.findAll(NodeType.Joex))
|
||||
_ <- nodes.toList.traverse(n => client.notifyJoexIgnoreErrors(n.url))
|
||||
} yield ()
|
||||
pubSub.publish1IgnoreErrors(Topics.jobsNotify, ())
|
||||
|
||||
def cancelJob(job: Ident, worker: Ident): F[Boolean] =
|
||||
(for {
|
||||
node <- OptionT(store.transact(RNode.findById(worker)))
|
||||
cancel <- OptionT.liftF(client.cancelJob(node.url, job))
|
||||
} yield cancel.success).getOrElse(false)
|
||||
def cancelJob(job: Ident, worker: Ident): F[Unit] =
|
||||
pubSub.publish1IgnoreErrors(CancelJob.topic, CancelJob(job, worker))
|
||||
})
|
||||
|
||||
def create[F[_]: Async](store: Store[F]): Resource[F, OJoex[F]] =
|
||||
JoexClient.resource.flatMap(client => apply(client, store))
|
||||
|
||||
}
|
||||
|
@ -6,6 +6,7 @@
|
||||
|
||||
package docspell.common
|
||||
|
||||
import cats.Applicative
|
||||
import cats.effect.Sync
|
||||
import fs2.Stream
|
||||
|
||||
@ -45,6 +46,27 @@ trait Logger[F[_]] { self =>
|
||||
|
||||
object Logger {
|
||||
|
||||
def off[F[_]: Applicative]: Logger[F] =
|
||||
new Logger[F] {
|
||||
def trace(msg: => String): F[Unit] =
|
||||
Applicative[F].pure(())
|
||||
|
||||
def debug(msg: => String): F[Unit] =
|
||||
Applicative[F].pure(())
|
||||
|
||||
def info(msg: => String): F[Unit] =
|
||||
Applicative[F].pure(())
|
||||
|
||||
def warn(msg: => String): F[Unit] =
|
||||
Applicative[F].pure(())
|
||||
|
||||
def error(ex: Throwable)(msg: => String): F[Unit] =
|
||||
Applicative[F].pure(())
|
||||
|
||||
def error(msg: => String): F[Unit] =
|
||||
Applicative[F].pure(())
|
||||
}
|
||||
|
||||
def log4s[F[_]: Sync](log: Log4sLogger): Logger[F] =
|
||||
new Logger[F] {
|
||||
def trace(msg: => String): F[Unit] =
|
||||
|
@ -12,9 +12,7 @@ import io.circe.Decoder
|
||||
import io.circe.parser._
|
||||
|
||||
trait StringSyntax {
|
||||
|
||||
implicit class EvenMoreStringOps(s: String) {
|
||||
|
||||
def asNonBlank: Option[String] =
|
||||
Option(s).filter(_.trim.nonEmpty)
|
||||
|
||||
@ -24,5 +22,6 @@ trait StringSyntax {
|
||||
value <- json.as[A]
|
||||
} yield value
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
object StringSyntax extends StringSyntax
|
||||
|
@ -20,6 +20,7 @@ import docspell.joex.analysis.RegexNerFile
|
||||
import docspell.joex.hk.HouseKeepingConfig
|
||||
import docspell.joex.scheduler.{PeriodicSchedulerConfig, SchedulerConfig}
|
||||
import docspell.joex.updatecheck.UpdateCheckConfig
|
||||
import docspell.pubsub.naive.PubSubConfig
|
||||
import docspell.store.JdbcConfig
|
||||
|
||||
case class Config(
|
||||
@ -39,7 +40,11 @@ case class Config(
|
||||
mailDebug: Boolean,
|
||||
fullTextSearch: Config.FullTextSearch,
|
||||
updateCheck: UpdateCheckConfig
|
||||
)
|
||||
) {
|
||||
|
||||
def pubSubConfig: PubSubConfig =
|
||||
PubSubConfig(appId, baseUrl / "internal" / "pubsub", 100)
|
||||
}
|
||||
|
||||
object Config {
|
||||
case class Bind(address: String, port: Int)
|
||||
|
@ -6,14 +6,13 @@
|
||||
|
||||
package docspell.joex
|
||||
|
||||
import scala.concurrent.ExecutionContext
|
||||
|
||||
import cats.effect._
|
||||
import cats.implicits._
|
||||
import fs2.concurrent.SignallingRef
|
||||
|
||||
import docspell.analysis.TextAnalyser
|
||||
import docspell.backend.fulltext.CreateIndex
|
||||
import docspell.backend.msg.{CancelJob, Topics}
|
||||
import docspell.backend.ops._
|
||||
import docspell.common._
|
||||
import docspell.ftsclient.FtsClient
|
||||
@ -34,6 +33,7 @@ import docspell.joex.scanmailbox._
|
||||
import docspell.joex.scheduler._
|
||||
import docspell.joex.updatecheck._
|
||||
import docspell.joexapi.client.JoexClient
|
||||
import docspell.pubsub.api.{PubSub, PubSubT}
|
||||
import docspell.store.Store
|
||||
import docspell.store.queue._
|
||||
import docspell.store.records.{REmptyTrashSetting, RJobLog}
|
||||
@ -41,20 +41,18 @@ import docspell.store.usertask.UserTaskScope
|
||||
import docspell.store.usertask.UserTaskStore
|
||||
|
||||
import emil.javamail._
|
||||
import org.http4s.blaze.client.BlazeClientBuilder
|
||||
import org.http4s.client.Client
|
||||
|
||||
final class JoexAppImpl[F[_]: Async](
|
||||
cfg: Config,
|
||||
nodeOps: ONode[F],
|
||||
store: Store[F],
|
||||
queue: JobQueue[F],
|
||||
pubSubT: PubSubT[F],
|
||||
pstore: PeriodicTaskStore[F],
|
||||
termSignal: SignallingRef[F, Boolean],
|
||||
val scheduler: Scheduler[F],
|
||||
val periodicScheduler: PeriodicScheduler[F]
|
||||
) extends JoexApp[F] {
|
||||
|
||||
def init: F[Unit] = {
|
||||
val run = scheduler.start.compile.drain
|
||||
val prun = periodicScheduler.start.compile.drain
|
||||
@ -64,16 +62,23 @@ final class JoexAppImpl[F[_]: Async](
|
||||
_ <- Async[F].start(prun)
|
||||
_ <- scheduler.periodicAwake
|
||||
_ <- periodicScheduler.periodicAwake
|
||||
_ <- nodeOps.register(cfg.appId, NodeType.Joex, cfg.baseUrl)
|
||||
_ <- subscriptions
|
||||
} yield ()
|
||||
}
|
||||
|
||||
def subscriptions =
|
||||
for {
|
||||
_ <- Async[F].start(pubSubT.subscribeSink(Topics.jobsNotify) { _ =>
|
||||
scheduler.notifyChange
|
||||
})
|
||||
_ <- Async[F].start(pubSubT.subscribeSink(CancelJob.topic) { msg =>
|
||||
scheduler.requestCancel(msg.body.jobId).as(())
|
||||
})
|
||||
} yield ()
|
||||
|
||||
def findLogs(jobId: Ident): F[Vector[RJobLog]] =
|
||||
store.transact(RJobLog.findLogs(jobId))
|
||||
|
||||
def shutdown: F[Unit] =
|
||||
nodeOps.unregister(cfg.appId)
|
||||
|
||||
def initShutdown: F[Unit] =
|
||||
periodicScheduler.shutdown *> scheduler.shutdown(false) *> termSignal.set(true)
|
||||
|
||||
@ -116,16 +121,19 @@ object JoexAppImpl {
|
||||
def create[F[_]: Async](
|
||||
cfg: Config,
|
||||
termSignal: SignallingRef[F, Boolean],
|
||||
connectEC: ExecutionContext
|
||||
store: Store[F],
|
||||
httpClient: Client[F],
|
||||
pubSub: PubSub[F]
|
||||
): Resource[F, JoexApp[F]] =
|
||||
for {
|
||||
httpClient <- BlazeClientBuilder[F].resource
|
||||
client = JoexClient(httpClient)
|
||||
store <- Store.create(cfg.jdbc, cfg.files.chunkSize, connectEC)
|
||||
queue <- JobQueue(store)
|
||||
pstore <- PeriodicTaskStore.create(store)
|
||||
nodeOps <- ONode(store)
|
||||
joex <- OJoex(client, store)
|
||||
client = JoexClient(httpClient)
|
||||
pubSubT = PubSubT(
|
||||
pubSub,
|
||||
Logger.log4s(org.log4s.getLogger(s"joex-${cfg.appId.id}"))
|
||||
)
|
||||
joex <- OJoex(pubSubT)
|
||||
upload <- OUpload(store, queue, joex)
|
||||
fts <- createFtsClient(cfg)(httpClient)
|
||||
createIndex <- CreateIndex.resource(fts, store)
|
||||
@ -138,6 +146,7 @@ object JoexAppImpl {
|
||||
JavaMailEmil(Settings.defaultSettings.copy(debug = cfg.mailDebug))
|
||||
sch <- SchedulerBuilder(cfg.scheduler, store)
|
||||
.withQueue(queue)
|
||||
.withPubSub(pubSubT)
|
||||
.withTask(
|
||||
JobTask.json(
|
||||
ProcessItemArgs.taskName,
|
||||
@ -264,8 +273,8 @@ object JoexAppImpl {
|
||||
pstore,
|
||||
client
|
||||
)
|
||||
app = new JoexAppImpl(cfg, nodeOps, store, queue, pstore, termSignal, sch, psch)
|
||||
appR <- Resource.make(app.init.map(_ => app))(_.shutdown)
|
||||
app = new JoexAppImpl(cfg, store, queue, pubSubT, pstore, termSignal, sch, psch)
|
||||
appR <- Resource.make(app.init.map(_ => app))(_.initShutdown)
|
||||
} yield appR
|
||||
|
||||
private def createFtsClient[F[_]: Async](
|
||||
|
@ -11,10 +11,14 @@ import cats.effect._
|
||||
import fs2.Stream
|
||||
import fs2.concurrent.SignallingRef
|
||||
|
||||
import docspell.backend.msg.Topics
|
||||
import docspell.common.Pools
|
||||
import docspell.joex.routes._
|
||||
import docspell.pubsub.naive.NaivePubSub
|
||||
import docspell.store.Store
|
||||
|
||||
import org.http4s.HttpApp
|
||||
import org.http4s.blaze.client.BlazeClientBuilder
|
||||
import org.http4s.blaze.server.BlazeServerBuilder
|
||||
import org.http4s.implicits._
|
||||
import org.http4s.server.Router
|
||||
@ -33,9 +37,19 @@ object JoexServer {
|
||||
val app = for {
|
||||
signal <- Resource.eval(SignallingRef[F, Boolean](false))
|
||||
exitCode <- Resource.eval(Ref[F].of(ExitCode.Success))
|
||||
joexApp <- JoexAppImpl.create[F](cfg, signal, pools.connectEC)
|
||||
|
||||
store <- Store.create[F](
|
||||
cfg.jdbc,
|
||||
cfg.files.chunkSize,
|
||||
pools.connectEC
|
||||
)
|
||||
httpClient <- BlazeClientBuilder[F].resource
|
||||
pubSub <- NaivePubSub(cfg.pubSubConfig, store, httpClient)(Topics.all.map(_.topic))
|
||||
|
||||
joexApp <- JoexAppImpl.create[F](cfg, signal, store, httpClient, pubSub)
|
||||
|
||||
httpApp = Router(
|
||||
"/internal/pubsub" -> pubSub.receiveRoute,
|
||||
"/api/info" -> InfoRoutes(cfg),
|
||||
"/api/v1" -> JoexRoutes(joexApp)
|
||||
).orNotFound
|
||||
|
@ -11,6 +11,7 @@ import cats.effect.std.Semaphore
|
||||
import cats.implicits._
|
||||
import fs2.concurrent.SignallingRef
|
||||
|
||||
import docspell.pubsub.api.PubSubT
|
||||
import docspell.store.Store
|
||||
import docspell.store.queue.JobQueue
|
||||
|
||||
@ -19,7 +20,8 @@ case class SchedulerBuilder[F[_]: Async](
|
||||
tasks: JobTaskRegistry[F],
|
||||
store: Store[F],
|
||||
queue: Resource[F, JobQueue[F]],
|
||||
logSink: LogSink[F]
|
||||
logSink: LogSink[F],
|
||||
pubSub: PubSubT[F]
|
||||
) {
|
||||
|
||||
def withConfig(cfg: SchedulerConfig): SchedulerBuilder[F] =
|
||||
@ -32,7 +34,7 @@ case class SchedulerBuilder[F[_]: Async](
|
||||
withTaskRegistry(tasks.withTask(task))
|
||||
|
||||
def withQueue(queue: Resource[F, JobQueue[F]]): SchedulerBuilder[F] =
|
||||
SchedulerBuilder[F](config, tasks, store, queue, logSink)
|
||||
copy(queue = queue)
|
||||
|
||||
def withLogSink(sink: LogSink[F]): SchedulerBuilder[F] =
|
||||
copy(logSink = sink)
|
||||
@ -40,6 +42,9 @@ case class SchedulerBuilder[F[_]: Async](
|
||||
def withQueue(queue: JobQueue[F]): SchedulerBuilder[F] =
|
||||
copy(queue = Resource.pure[F, JobQueue[F]](queue))
|
||||
|
||||
def withPubSub(pubSubT: PubSubT[F]): SchedulerBuilder[F] =
|
||||
copy(pubSub = pubSubT)
|
||||
|
||||
def serve: Resource[F, Scheduler[F]] =
|
||||
resource.evalMap(sch => Async[F].start(sch.start.compile.drain).map(_ => sch))
|
||||
|
||||
@ -52,6 +57,7 @@ case class SchedulerBuilder[F[_]: Async](
|
||||
} yield new SchedulerImpl[F](
|
||||
config,
|
||||
jq,
|
||||
pubSub,
|
||||
tasks,
|
||||
store,
|
||||
logSink,
|
||||
@ -76,7 +82,8 @@ object SchedulerBuilder {
|
||||
JobTaskRegistry.empty[F],
|
||||
store,
|
||||
JobQueue(store),
|
||||
LogSink.db[F](store)
|
||||
LogSink.db[F](store),
|
||||
PubSubT.noop[F]
|
||||
)
|
||||
|
||||
}
|
||||
|
@ -13,19 +13,22 @@ import cats.implicits._
|
||||
import fs2.Stream
|
||||
import fs2.concurrent.SignallingRef
|
||||
|
||||
import docspell.backend.msg.JobDone
|
||||
import docspell.common._
|
||||
import docspell.common.syntax.all._
|
||||
import docspell.joex.scheduler.SchedulerImpl._
|
||||
import docspell.pubsub.api.PubSubT
|
||||
import docspell.store.Store
|
||||
import docspell.store.queries.QJob
|
||||
import docspell.store.queue.JobQueue
|
||||
import docspell.store.records.RJob
|
||||
|
||||
import org.log4s._
|
||||
import org.log4s.getLogger
|
||||
|
||||
final class SchedulerImpl[F[_]: Async](
|
||||
val config: SchedulerConfig,
|
||||
queue: JobQueue[F],
|
||||
pubSub: PubSubT[F],
|
||||
tasks: JobTaskRegistry[F],
|
||||
store: Store[F],
|
||||
logSink: LogSink[F],
|
||||
@ -55,20 +58,21 @@ final class SchedulerImpl[F[_]: Async](
|
||||
state.get.flatMap(s => QJob.findAll(s.getRunning, store))
|
||||
|
||||
def requestCancel(jobId: Ident): F[Boolean] =
|
||||
state.get.flatMap(_.cancelRequest(jobId) match {
|
||||
case Some(ct) => ct.map(_ => true)
|
||||
case None =>
|
||||
(for {
|
||||
job <- OptionT(store.transact(RJob.findByIdAndWorker(jobId, config.name)))
|
||||
_ <- OptionT.liftF(
|
||||
if (job.isInProgress) executeCancel(job)
|
||||
else ().pure[F]
|
||||
)
|
||||
} yield true)
|
||||
.getOrElseF(
|
||||
logger.fwarn(s"Job ${jobId.id} not found, cannot cancel.").map(_ => false)
|
||||
)
|
||||
})
|
||||
logger.finfo(s"Scheduler requested to cancel job: ${jobId.id}") *>
|
||||
state.get.flatMap(_.cancelRequest(jobId) match {
|
||||
case Some(ct) => ct.map(_ => true)
|
||||
case None =>
|
||||
(for {
|
||||
job <- OptionT(store.transact(RJob.findByIdAndWorker(jobId, config.name)))
|
||||
_ <- OptionT.liftF(
|
||||
if (job.isInProgress) executeCancel(job)
|
||||
else ().pure[F]
|
||||
)
|
||||
} yield true)
|
||||
.getOrElseF(
|
||||
logger.fwarn(s"Job ${jobId.id} not found, cannot cancel.").map(_ => false)
|
||||
)
|
||||
})
|
||||
|
||||
def notifyChange: F[Unit] =
|
||||
waiter.update(b => !b)
|
||||
@ -198,6 +202,10 @@ final class SchedulerImpl[F[_]: Async](
|
||||
)
|
||||
_ <- state.modify(_.removeRunning(job))
|
||||
_ <- QJob.setFinalState(job.id, finalState, store)
|
||||
_ <- pubSub.publish1IgnoreErrors(
|
||||
JobDone.topic,
|
||||
JobDone(job.id, job.task, job.args, finalState)
|
||||
)
|
||||
} yield ()
|
||||
|
||||
def onStart(job: RJob): F[Unit] =
|
||||
|
@ -0,0 +1,28 @@
|
||||
/*
|
||||
* Copyright 2020 Eike K. & Contributors
|
||||
*
|
||||
* SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
*/
|
||||
|
||||
package docspell.pubsub.api
|
||||
|
||||
import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder}
|
||||
import io.circe.{Decoder, Encoder, Json}
|
||||
|
||||
final case class Message[A](head: MessageHead, body: A) {}
|
||||
|
||||
object Message {
|
||||
implicit val jsonDecoderJson: Decoder[Message[Json]] =
|
||||
deriveDecoder[Message[Json]]
|
||||
|
||||
implicit val jsonEncoderJson: Encoder[Message[Json]] =
|
||||
deriveEncoder[Message[Json]]
|
||||
|
||||
implicit def jsonDecoder[A](implicit da: Decoder[A]): Decoder[Message[A]] =
|
||||
jsonDecoderJson.emap(mj =>
|
||||
da.decodeJson(mj.body).map(b => mj.copy(body = b)).left.map(_.message)
|
||||
)
|
||||
|
||||
implicit def jsonEncoder[A](implicit ea: Encoder[A]): Encoder[Message[A]] =
|
||||
jsonEncoderJson.contramap(m => m.copy(body = ea(m.body)))
|
||||
}
|
@ -0,0 +1,22 @@
|
||||
/*
|
||||
* Copyright 2020 Eike K. & Contributors
|
||||
*
|
||||
* SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
*/
|
||||
|
||||
package docspell.pubsub.api
|
||||
|
||||
import docspell.common.{Ident, Timestamp}
|
||||
|
||||
import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder}
|
||||
import io.circe.{Decoder, Encoder}
|
||||
|
||||
final case class MessageHead(id: Ident, send: Timestamp, topic: Topic)
|
||||
|
||||
object MessageHead {
|
||||
implicit val jsonDecoder: Decoder[MessageHead] =
|
||||
deriveDecoder[MessageHead]
|
||||
|
||||
implicit val jsonEncoder: Encoder[MessageHead] =
|
||||
deriveEncoder[MessageHead]
|
||||
}
|
@ -0,0 +1,36 @@
|
||||
/*
|
||||
* Copyright 2020 Eike K. & Contributors
|
||||
*
|
||||
* SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
*/
|
||||
|
||||
package docspell.pubsub.api
|
||||
|
||||
import cats.Applicative
|
||||
import cats.data.NonEmptyList
|
||||
import fs2.{Pipe, Stream}
|
||||
|
||||
import docspell.common.{Ident, Timestamp}
|
||||
|
||||
import io.circe.Json
|
||||
|
||||
trait PubSub[F[_]] {
|
||||
def publish1(topic: Topic, msg: Json): F[MessageHead]
|
||||
|
||||
def publish(topic: Topic): Pipe[F, Json, MessageHead]
|
||||
|
||||
def subscribe(topics: NonEmptyList[Topic]): Stream[F, Message[Json]]
|
||||
}
|
||||
object PubSub {
|
||||
def noop[F[_]: Applicative]: PubSub[F] =
|
||||
new PubSub[F] {
|
||||
def publish1(topic: Topic, msg: Json): F[MessageHead] =
|
||||
Applicative[F].pure(MessageHead(Ident.unsafe("0"), Timestamp.Epoch, topic))
|
||||
|
||||
def publish(topic: Topic): Pipe[F, Json, MessageHead] =
|
||||
_ => Stream.empty
|
||||
|
||||
def subscribe(topics: NonEmptyList[Topic]): Stream[F, Message[Json]] =
|
||||
Stream.empty
|
||||
}
|
||||
}
|
@ -0,0 +1,85 @@
|
||||
/*
|
||||
* Copyright 2020 Eike K. & Contributors
|
||||
*
|
||||
* SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
*/
|
||||
|
||||
package docspell.pubsub.api
|
||||
|
||||
import cats.data.NonEmptyList
|
||||
import cats.effect._
|
||||
import cats.implicits._
|
||||
import fs2.concurrent.SignallingRef
|
||||
import fs2.{Pipe, Stream}
|
||||
|
||||
import docspell.common.Logger
|
||||
|
||||
trait PubSubT[F[_]] {
|
||||
|
||||
def publish1[A](topic: TypedTopic[A], msg: A): F[MessageHead]
|
||||
|
||||
def publish1IgnoreErrors[A](topic: TypedTopic[A], msg: A): F[Unit]
|
||||
|
||||
def publish[A](topic: TypedTopic[A]): Pipe[F, A, MessageHead]
|
||||
|
||||
def subscribe[A](topic: TypedTopic[A]): Stream[F, Message[A]]
|
||||
|
||||
def subscribeSink[A](topic: TypedTopic[A])(handler: Message[A] => F[Unit]): F[F[Unit]]
|
||||
|
||||
def delegate: PubSub[F]
|
||||
|
||||
def withDelegate(delegate: PubSub[F]): PubSubT[F]
|
||||
}
|
||||
|
||||
object PubSubT {
|
||||
def noop[F[_]: Async]: PubSubT[F] =
|
||||
PubSubT(PubSub.noop[F], Logger.off[F])
|
||||
|
||||
def apply[F[_]: Async](pubSub: PubSub[F], logger: Logger[F]): PubSubT[F] =
|
||||
new PubSubT[F] {
|
||||
def publish1[A](topic: TypedTopic[A], msg: A): F[MessageHead] =
|
||||
pubSub.publish1(topic.topic, topic.codec(msg))
|
||||
|
||||
def publish1IgnoreErrors[A](topic: TypedTopic[A], msg: A): F[Unit] =
|
||||
publish1(topic, msg).attempt.flatMap {
|
||||
case Right(_) => ().pure[F]
|
||||
case Left(ex) =>
|
||||
logger.error(ex)(s"Error publishing to topic ${topic.topic.name}: $msg")
|
||||
}
|
||||
|
||||
def publish[A](topic: TypedTopic[A]): Pipe[F, A, MessageHead] =
|
||||
_.map(topic.codec.apply).through(pubSub.publish(topic.topic))
|
||||
|
||||
def subscribe[A](topic: TypedTopic[A]): Stream[F, Message[A]] =
|
||||
pubSub
|
||||
.subscribe(NonEmptyList.of(topic.topic))
|
||||
.flatMap(m =>
|
||||
m.body.as[A](topic.codec) match {
|
||||
case Right(a) => Stream.emit(Message(m.head, a))
|
||||
case Left(err) =>
|
||||
logger.s
|
||||
.error(err)(
|
||||
s"Could not decode message to topic ${topic.name} to ${topic.msgClass}: ${m.body.noSpaces}"
|
||||
)
|
||||
.drain
|
||||
}
|
||||
)
|
||||
|
||||
def subscribeSink[A](
|
||||
topic: TypedTopic[A]
|
||||
)(handler: Message[A] => F[Unit]): F[F[Unit]] =
|
||||
for {
|
||||
halt <- SignallingRef.of[F, Boolean](false)
|
||||
_ <- subscribe(topic)
|
||||
.evalMap(handler)
|
||||
.interruptWhen(halt)
|
||||
.compile
|
||||
.drain
|
||||
} yield halt.set(true)
|
||||
|
||||
def delegate: PubSub[F] = pubSub
|
||||
|
||||
def withDelegate(newDelegate: PubSub[F]): PubSubT[F] =
|
||||
PubSubT(newDelegate, logger)
|
||||
}
|
||||
}
|
@ -0,0 +1,26 @@
|
||||
/*
|
||||
* Copyright 2020 Eike K. & Contributors
|
||||
*
|
||||
* SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
*/
|
||||
|
||||
package docspell.pubsub.api
|
||||
|
||||
import io.circe.{Decoder, Encoder}
|
||||
|
||||
final case class Topic private (topic: String) {
|
||||
def name: String = topic
|
||||
}
|
||||
|
||||
object Topic {
|
||||
implicit val jsonDecoder: Decoder[Topic] =
|
||||
Decoder.decodeString.map(Topic.apply)
|
||||
|
||||
implicit val jsonEncoder: Encoder[Topic] =
|
||||
Encoder.encodeString.contramap(_.topic)
|
||||
|
||||
def apply(name: String): Topic = {
|
||||
require(name.trim.nonEmpty)
|
||||
new Topic(name)
|
||||
}
|
||||
}
|
@ -0,0 +1,29 @@
|
||||
/*
|
||||
* Copyright 2020 Eike K. & Contributors
|
||||
*
|
||||
* SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
*/
|
||||
|
||||
package docspell.pubsub.api
|
||||
|
||||
import scala.reflect.{ClassTag, classTag}
|
||||
|
||||
import io.circe.{Codec, Decoder, Encoder}
|
||||
|
||||
final case class TypedTopic[A](topic: Topic, codec: Codec[A], msgClass: Class[_]) {
|
||||
def name: String = topic.name
|
||||
|
||||
def withTopic(topic: Topic): TypedTopic[A] =
|
||||
copy(topic = topic)
|
||||
|
||||
def withName(name: String): TypedTopic[A] =
|
||||
withTopic(Topic(name))
|
||||
}
|
||||
|
||||
object TypedTopic {
|
||||
|
||||
def apply[A: ClassTag](
|
||||
topic: Topic
|
||||
)(implicit dec: Decoder[A], enc: Encoder[A]): TypedTopic[A] =
|
||||
TypedTopic(topic, Codec.from(dec, enc), classTag[A].runtimeClass)
|
||||
}
|
@ -0,0 +1,205 @@
|
||||
/*
|
||||
* Copyright 2020 Eike K. & Contributors
|
||||
*
|
||||
* SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
*/
|
||||
|
||||
package docspell.pubsub.naive
|
||||
|
||||
import cats.data.NonEmptyList
|
||||
import cats.effect._
|
||||
import cats.implicits._
|
||||
import fs2.Pipe
|
||||
import fs2.Stream
|
||||
import fs2.concurrent.{Topic => Fs2Topic}
|
||||
|
||||
import docspell.common._
|
||||
import docspell.pubsub.api._
|
||||
import docspell.pubsub.naive.NaivePubSub.State
|
||||
import docspell.store.Store
|
||||
import docspell.store.records.RPubSub
|
||||
|
||||
import io.circe.Json
|
||||
import org.http4s.circe.CirceEntityCodec._
|
||||
import org.http4s.client.Client
|
||||
import org.http4s.client.dsl.Http4sClientDsl
|
||||
import org.http4s.dsl.Http4sDsl
|
||||
import org.http4s.{HttpRoutes, Uri}
|
||||
|
||||
/** A pubsub implementation that can be used across machines, using a rather inefficient
|
||||
* but simple protocol. It can therefore work with the current setup, i.e. not requiring
|
||||
* to add another complex piece of software to the mix, like Kafka or RabbitMQ.
|
||||
*
|
||||
* However, the api should allow to be used on top of such a tool. This implementation
|
||||
* can be used in a personal setting, where there are only a few nodes.
|
||||
*
|
||||
* How it works: Each node has a set of local subscribers and a http endpoint. If it
|
||||
* publishes a message, it notifies all local subscribers and sends out a json message to
|
||||
* all endpoints that are registered for this topic. If it receives a messagen through
|
||||
* its endpoint, it notifies all local subscribers.
|
||||
*
|
||||
* It is build on the `Topic` class from fs2.concurrent. A map of the name to such a
|
||||
* `Topic` instance is maintained. To work across machines, the database is used as a
|
||||
* synchronization point. Each node must provide a http api and so its "callback" URL is
|
||||
* added into the database associated to a topic name.
|
||||
*
|
||||
* When publishing a message, the message can be published to the internal fs2 topic.
|
||||
* Then all URLs to this topic name are looked up in the database and the message is
|
||||
* POSTed to each URL as JSON. The endpoint of each machine takes this message and
|
||||
* publishes it to its own internal fs2.concurrent.Topic instance.
|
||||
*
|
||||
* Obviously, this doesn't scale well to lots of machines and messages. It should be good
|
||||
* enough for personal use, where there are only a small amount of machines and messages.
|
||||
*
|
||||
* The main use case for docspell is to communicate between the rest-server and job
|
||||
* executor. It is for internal communication and all topics are known at compile time.
|
||||
*/
|
||||
final class NaivePubSub[F[_]: Async](
|
||||
cfg: PubSubConfig,
|
||||
state: Ref[F, State[F]],
|
||||
store: Store[F],
|
||||
client: Client[F]
|
||||
) extends PubSub[F] {
|
||||
private val logger: Logger[F] = Logger.log4s(org.log4s.getLogger)
|
||||
|
||||
def withClient(client: Client[F]): NaivePubSub[F] =
|
||||
new NaivePubSub[F](cfg, state, store, client)
|
||||
|
||||
def publish1(topic: Topic, msgBody: Json): F[MessageHead] =
|
||||
for {
|
||||
head <- mkMessageHead(topic)
|
||||
msg = Message(head, msgBody)
|
||||
_ <- logger.trace(s"Publishing: $msg")
|
||||
// go through all local subscribers and publish to the fs2 topic
|
||||
_ <- publishLocal(msg)
|
||||
// get all remote subscribers from the database and send the message via http
|
||||
_ <- publishRemote(msg)
|
||||
} yield head
|
||||
|
||||
def publish(topic: Topic): Pipe[F, Json, MessageHead] =
|
||||
ms => //TODO Do some optimization by grouping messages to the same topic
|
||||
ms.evalMap(publish1(topic, _))
|
||||
|
||||
def subscribe(topics: NonEmptyList[Topic]): Stream[F, Message[Json]] =
|
||||
(for {
|
||||
_ <- logger.s.info(s"Adding subscriber for topics: $topics")
|
||||
_ <- Stream.resource[F, Unit](addRemote(topics))
|
||||
m <- Stream.eval(addLocal(topics))
|
||||
} yield m).flatten
|
||||
|
||||
/** Receive messages from remote publishers and passes them to the local subscribers. */
|
||||
def receiveRoute: HttpRoutes[F] = {
|
||||
val dsl = new Http4sDsl[F] {}
|
||||
import dsl._
|
||||
|
||||
HttpRoutes.of { case req @ POST -> Root =>
|
||||
for {
|
||||
data <- req.as[List[Message[Json]]]
|
||||
_ <- logger.trace(s"Received external message(s): $data")
|
||||
_ <- data.traverse(publishLocal)
|
||||
resp <- Ok(())
|
||||
} yield resp
|
||||
}
|
||||
}
|
||||
|
||||
// ---- private helpers
|
||||
|
||||
private def mkMessageHead(topic: Topic): F[MessageHead] =
|
||||
for {
|
||||
id <- Ident.randomId[F]
|
||||
ts <- Timestamp.current[F]
|
||||
head = MessageHead(id, ts, topic)
|
||||
} yield head
|
||||
|
||||
private def addLocal(topics: NonEmptyList[Topic]): F[Stream[F, Message[Json]]] = {
|
||||
val topicSet = topics.map(_.name).toList.toSet
|
||||
for {
|
||||
st <- state.get
|
||||
tpc = st.topics.view.filterKeys(topicSet.contains)
|
||||
_ <-
|
||||
if (tpc.isEmpty)
|
||||
logger.warn(s"Subscribing to 0 topics! Topics $topics were not initialized")
|
||||
else ().pure[F]
|
||||
data = tpc.values.toList.traverse(t => t.subscribe(cfg.subscriberQueueSize))
|
||||
out = data.flatMap(msgs => Stream.emits(msgs))
|
||||
} yield out
|
||||
}
|
||||
|
||||
private def addRemote(topics: NonEmptyList[Topic]): Resource[F, Unit] = {
|
||||
def subscribe: F[Unit] =
|
||||
logger.trace(s"Incrementing counter for topics: $topics") *>
|
||||
store.transact(RPubSub.increment(cfg.url, topics.map(_.name))).as(())
|
||||
|
||||
def unsubscribe: F[Unit] =
|
||||
logger.trace(s"Decrementing counter for topics: $topics") *>
|
||||
store.transact(RPubSub.decrement(cfg.url, topics.map(_.name))).as(())
|
||||
|
||||
Resource.make(subscribe)(_ => unsubscribe)
|
||||
}
|
||||
|
||||
private def publishLocal(msg: Message[Json]): F[Unit] =
|
||||
for {
|
||||
st <- state.get
|
||||
_ <- st.topics.get(msg.head.topic.name) match {
|
||||
case Some(sub) =>
|
||||
logger.trace(s"Publishing message to local topic: $msg") *>
|
||||
sub.publish1(msg).as(())
|
||||
case None =>
|
||||
().pure[F]
|
||||
}
|
||||
} yield ()
|
||||
|
||||
private def publishRemote(msg: Message[Json]): F[Unit] = {
|
||||
val dsl = new Http4sDsl[F] with Http4sClientDsl[F] {}
|
||||
import dsl._
|
||||
|
||||
for {
|
||||
_ <- logger.trace(s"Find all nodes subscribed to topic ${msg.head.topic.name}")
|
||||
urls <- store.transact(RPubSub.findSubs(msg.head.topic.name, cfg.nodeId))
|
||||
_ <- logger.trace(s"Publishing to remote urls ${urls.map(_.asString)}: $msg")
|
||||
reqs = urls
|
||||
.map(u => Uri.unsafeFromString(u.asString))
|
||||
.map(uri => POST(List(msg), uri))
|
||||
resList <- reqs.traverse(req => client.status(req).attempt)
|
||||
_ <- resList.traverse {
|
||||
case Right(s) =>
|
||||
if (s.isSuccess) ().pure[F]
|
||||
else logger.warn(s"A node was not reached! Reason: $s, message: $msg")
|
||||
case Left(ex) =>
|
||||
logger.error(ex)(s"Error publishing ${msg.head.topic.name} message remotely")
|
||||
}
|
||||
} yield ()
|
||||
}
|
||||
}
|
||||
|
||||
object NaivePubSub {
|
||||
|
||||
def apply[F[_]: Async](
|
||||
cfg: PubSubConfig,
|
||||
store: Store[F],
|
||||
client: Client[F]
|
||||
)(topics: NonEmptyList[Topic]): Resource[F, NaivePubSub[F]] =
|
||||
Resource.eval(for {
|
||||
state <- Ref.ofEffect[F, State[F]](State.create[F](topics))
|
||||
_ <- store.transact(RPubSub.initTopics(cfg.nodeId, cfg.url, topics.map(_.name)))
|
||||
} yield new NaivePubSub[F](cfg, state, store, client))
|
||||
|
||||
def create[F[_]: Async](
|
||||
cfg: PubSubConfig,
|
||||
store: Store[F],
|
||||
client: Client[F],
|
||||
logger: Logger[F]
|
||||
)(topics: NonEmptyList[Topic]): Resource[F, PubSubT[F]] =
|
||||
apply[F](cfg, store, client)(topics).map(ps => PubSubT(ps, logger))
|
||||
|
||||
final case class State[F[_]](topics: Map[String, Fs2Topic[F, Message[Json]]]) {}
|
||||
|
||||
object State {
|
||||
def empty[F[_]]: State[F] = State[F](Map.empty)
|
||||
def create[F[_]: Async](topics: NonEmptyList[Topic]): F[State[F]] =
|
||||
topics
|
||||
.traverse(t => Fs2Topic[F, Message[Json]].map(fs2t => t.name -> fs2t))
|
||||
.map(_.toList.toMap)
|
||||
.map(State.apply)
|
||||
}
|
||||
}
|
@ -0,0 +1,11 @@
|
||||
/*
|
||||
* Copyright 2020 Eike K. & Contributors
|
||||
*
|
||||
* SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
*/
|
||||
|
||||
package docspell.pubsub.naive
|
||||
|
||||
import docspell.common.{Ident, LenientUri}
|
||||
|
||||
case class PubSubConfig(nodeId: Ident, url: LenientUri, subscriberQueueSize: Int)
|
@ -0,0 +1,85 @@
|
||||
/*
|
||||
* Copyright 2020 Eike K. & Contributors
|
||||
*
|
||||
* SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
*/
|
||||
|
||||
package docspell.pubsub.naive
|
||||
|
||||
import cats.effect._
|
||||
|
||||
import docspell.common._
|
||||
import docspell.pubsub.api._
|
||||
import docspell.store.{Store, StoreFixture}
|
||||
|
||||
import munit.CatsEffectSuite
|
||||
import org.http4s.Response
|
||||
import org.http4s.client.Client
|
||||
|
||||
trait Fixtures extends HttpClientOps { self: CatsEffectSuite =>
|
||||
|
||||
val pubsubEnv = ResourceFixture(Fixtures.envResource("node-1"))
|
||||
|
||||
val pubsubT = ResourceFixture {
|
||||
Fixtures
|
||||
.envResource("node-1")
|
||||
.flatMap(_.pubSub)
|
||||
.map(ps => PubSubT(ps, Fixtures.loggerIO))
|
||||
}
|
||||
|
||||
def conntectedPubsubs(env: Fixtures.Env) =
|
||||
for {
|
||||
// Create two pubsub instances connected to the same database
|
||||
ps_1 <- env.withNodeId("node-1").pubSubT
|
||||
ps_2 <- env.withNodeId("node-2").pubSubT
|
||||
|
||||
// both instances have a dummy client. now connect their clients to each other
|
||||
ps1 = ps_1.withDelegate(ps_1.delegateT.withClient(httpClient(ps_2)))
|
||||
ps2 = ps_2.withDelegate(ps_2.delegateT.withClient(httpClient(ps_1)))
|
||||
} yield (ps1, ps2)
|
||||
|
||||
implicit final class StringId(s: String) {
|
||||
def id: Ident = Ident.unsafe(s)
|
||||
}
|
||||
}
|
||||
|
||||
object Fixtures {
|
||||
private val loggerIO: Logger[IO] = Logger.log4s(org.log4s.getLogger)
|
||||
|
||||
final case class Env(store: Store[IO], cfg: PubSubConfig) {
|
||||
def pubSub: Resource[IO, NaivePubSub[IO]] = {
|
||||
val dummyClient = Client[IO](_ => Resource.pure(Response.notFound[IO]))
|
||||
NaivePubSub(cfg, store, dummyClient)(Topics.all.map(_.topic))
|
||||
}
|
||||
def pubSubT: Resource[IO, PubSubT[IO]] =
|
||||
pubSub.map(PubSubT(_, loggerIO))
|
||||
|
||||
def withNodeId(nodeId: String): Env =
|
||||
copy(cfg =
|
||||
cfg.copy(
|
||||
nodeId = Ident.unsafe(nodeId),
|
||||
url = LenientUri.unsafe(s"http://$nodeId/")
|
||||
)
|
||||
)
|
||||
}
|
||||
|
||||
def testConfig(nodeId: String) =
|
||||
PubSubConfig(
|
||||
Ident.unsafe(nodeId),
|
||||
LenientUri.unsafe(s"http://$nodeId/"),
|
||||
0
|
||||
)
|
||||
|
||||
def storeResource: Resource[IO, Store[IO]] =
|
||||
for {
|
||||
random <- Resource.eval(Ident.randomId[IO])
|
||||
cfg = StoreFixture.memoryDB(random.id.take(12))
|
||||
store <- StoreFixture.store(cfg)
|
||||
_ <- Resource.eval(store.migrate)
|
||||
} yield store
|
||||
|
||||
def envResource(nodeId: String): Resource[IO, Env] =
|
||||
for {
|
||||
store <- storeResource
|
||||
} yield Env(store, testConfig(nodeId))
|
||||
}
|
@ -0,0 +1,59 @@
|
||||
/*
|
||||
* Copyright 2020 Eike K. & Contributors
|
||||
*
|
||||
* SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
*/
|
||||
|
||||
package docspell.pubsub.naive
|
||||
|
||||
import cats.effect._
|
||||
|
||||
import docspell.common._
|
||||
import docspell.pubsub.api._
|
||||
|
||||
import io.circe.Encoder
|
||||
import org.http4s.circe.CirceEntityCodec._
|
||||
import org.http4s.client.Client
|
||||
import org.http4s.client.dsl.io._
|
||||
import org.http4s.dsl.io._
|
||||
import org.http4s.{HttpApp, HttpRoutes, Uri}
|
||||
|
||||
trait HttpClientOps {
|
||||
def httpClient(routes: HttpRoutes[IO]): Client[IO] =
|
||||
Client.fromHttpApp(HttpApp(routes.orNotFound.run))
|
||||
|
||||
def httpClient(ps: NaivePubSub[IO]): Client[IO] =
|
||||
httpClient(ps.receiveRoute)
|
||||
|
||||
def httpClient(ps: PubSubT[IO]): Client[IO] =
|
||||
httpClient(ps.delegateT)
|
||||
|
||||
implicit final class ClientOps(client: Client[IO]) {
|
||||
val uri = Uri.unsafeFromString("http://localhost/")
|
||||
|
||||
def sendMessage[A: Encoder](topic: Topic, body: A): IO[Unit] = {
|
||||
val encode: Encoder[List[Message[A]]] = implicitly[Encoder[List[Message[A]]]]
|
||||
|
||||
for {
|
||||
id <- Ident.randomId[IO]
|
||||
time <- Timestamp.current[IO]
|
||||
mesg = List(Message(MessageHead(id, time, topic), body))
|
||||
_ <- HttpClientOps.logger.debug(s"Sending message(s): $mesg")
|
||||
_ <- client.expectOr[Unit](POST(encode(mesg), uri)) { resp =>
|
||||
IO(new Exception(s"Unexpected response: $resp"))
|
||||
}
|
||||
} yield ()
|
||||
}
|
||||
|
||||
def send[A](typedTopic: TypedTopic[A], body: A): IO[Unit] =
|
||||
sendMessage(typedTopic.topic, body)(typedTopic.codec)
|
||||
}
|
||||
|
||||
implicit final class PubSubTestOps(ps: PubSubT[IO]) {
|
||||
def delegateT: NaivePubSub[IO] = ps.delegate.asInstanceOf[NaivePubSub[IO]]
|
||||
}
|
||||
}
|
||||
|
||||
object HttpClientOps {
|
||||
private val logger: Logger[IO] = Logger.log4s(org.log4s.getLogger)
|
||||
}
|
@ -0,0 +1,128 @@
|
||||
/*
|
||||
* Copyright 2020 Eike K. & Contributors
|
||||
*
|
||||
* SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
*/
|
||||
|
||||
package docspell.pubsub.naive
|
||||
|
||||
import scala.concurrent.duration._
|
||||
|
||||
import cats.effect._
|
||||
import cats.implicits._
|
||||
import fs2.concurrent.SignallingRef
|
||||
|
||||
import docspell.common._
|
||||
import docspell.pubsub.api._
|
||||
import docspell.pubsub.naive.Topics._
|
||||
|
||||
import munit.CatsEffectSuite
|
||||
|
||||
class NaivePubSubTest extends CatsEffectSuite with Fixtures {
|
||||
private[this] val logger = Logger.log4s[IO](org.log4s.getLogger)
|
||||
|
||||
def subscribe[A](ps: PubSubT[IO], topic: TypedTopic[A]) =
|
||||
for {
|
||||
received <- Ref.of[IO, Option[Message[A]]](None)
|
||||
halt <- SignallingRef.of[IO, Boolean](false)
|
||||
fiber <- Async[IO].start(
|
||||
logger.debug(s"${Thread.currentThread()} Listening for messages...") *>
|
||||
ps.subscribe(topic)
|
||||
.evalMap(m =>
|
||||
logger.debug(s"Handling message: $m") *>
|
||||
received.set(Some(m)) *>
|
||||
halt.set(true)
|
||||
)
|
||||
.interruptWhen(halt)
|
||||
.compile
|
||||
.drain
|
||||
)
|
||||
_ <- IO.sleep(500.millis)
|
||||
} yield (received, halt, fiber)
|
||||
|
||||
pubsubT.test("local publish receives message") { ps =>
|
||||
for {
|
||||
res <- subscribe(ps, Topics.jobSubmitted)
|
||||
(received, _, subFiber) = res
|
||||
headSend <- ps.publish1(Topics.jobSubmitted, JobSubmittedMsg("hello".id))
|
||||
outcome <- subFiber.join
|
||||
msgRec <- received.get
|
||||
_ = assert(outcome.isSuccess)
|
||||
_ = assertEquals(msgRec.map(_.head), Option(headSend))
|
||||
} yield ()
|
||||
}
|
||||
|
||||
pubsubT.test("local publish to different topic doesn't receive") { ps =>
|
||||
val otherTopic = Topics.jobSubmitted.withTopic(Topic("other-name"))
|
||||
for {
|
||||
res <- subscribe(ps, Topics.jobSubmitted)
|
||||
(received, halt, subFiber) = res
|
||||
_ <- ps.publish1(otherTopic, JobSubmittedMsg("hello".id))
|
||||
_ <- IO.sleep(100.millis) //allow some time for receiving
|
||||
_ <- halt.set(true)
|
||||
outcome <- subFiber.join
|
||||
_ = assert(outcome.isSuccess)
|
||||
recMsg <- received.get
|
||||
_ = assert(recMsg.isEmpty)
|
||||
} yield ()
|
||||
}
|
||||
|
||||
pubsubT.test("receive messages remotely") { ps =>
|
||||
val msg = JobSubmittedMsg("hello-remote".id)
|
||||
for {
|
||||
res <- subscribe(ps, Topics.jobSubmitted)
|
||||
(received, _, subFiber) = res
|
||||
client = httpClient(ps.delegateT.receiveRoute)
|
||||
_ <- client.send(Topics.jobSubmitted, msg)
|
||||
outcome <- subFiber.join
|
||||
msgRec <- received.get
|
||||
_ = assert(outcome.isSuccess)
|
||||
_ = assertEquals(msgRec.map(_.head.topic), Topics.jobSubmitted.topic.some)
|
||||
_ = assertEquals(msgRec.map(_.body), msg.some)
|
||||
} yield ()
|
||||
}
|
||||
|
||||
pubsubEnv.test("send messages remotely") { env =>
|
||||
val msg = JobSubmittedMsg("hello-remote".id)
|
||||
|
||||
// Create two pubsub instances connected to the same database
|
||||
conntectedPubsubs(env).use { case (ps1, ps2) =>
|
||||
for {
|
||||
// subscribe to ps1 and send via ps2
|
||||
res <- subscribe(ps1, Topics.jobSubmitted)
|
||||
(received, _, subFiber) = res
|
||||
_ <- ps2.publish1(Topics.jobSubmitted, msg)
|
||||
outcome <- subFiber.join
|
||||
msgRec <- received.get
|
||||
|
||||
// check results
|
||||
_ = assert(outcome.isSuccess)
|
||||
_ = assertEquals(msgRec.map(_.head.topic), Topics.jobSubmitted.topic.some)
|
||||
_ = assertEquals(msgRec.map(_.body), msg.some)
|
||||
} yield ()
|
||||
}
|
||||
}
|
||||
|
||||
pubsubEnv.test("do not receive remote message from other topic") { env =>
|
||||
val msg = JobCancelMsg("job-1".id)
|
||||
|
||||
// Create two pubsub instances connected to the same database
|
||||
conntectedPubsubs(env).use { case (ps1, ps2) =>
|
||||
for {
|
||||
// subscribe to ps1 and send via ps2
|
||||
res <- subscribe(ps1, Topics.jobSubmitted)
|
||||
(received, halt, subFiber) = res
|
||||
_ <- ps2.publish1(Topics.jobCancel, msg)
|
||||
_ <- IO.sleep(100.millis)
|
||||
_ <- halt.set(true)
|
||||
outcome <- subFiber.join
|
||||
msgRec <- received.get
|
||||
|
||||
// check results
|
||||
_ = assert(outcome.isSuccess)
|
||||
_ = assertEquals(msgRec, None)
|
||||
} yield ()
|
||||
}
|
||||
|
||||
}
|
||||
}
|
@ -0,0 +1,37 @@
|
||||
/*
|
||||
* Copyright 2020 Eike K. & Contributors
|
||||
*
|
||||
* SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
*/
|
||||
|
||||
package docspell.pubsub.naive
|
||||
|
||||
import cats.data.NonEmptyList
|
||||
|
||||
import docspell.common.Ident
|
||||
import docspell.pubsub.api._
|
||||
|
||||
import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder}
|
||||
import io.circe.{Decoder, Encoder}
|
||||
|
||||
object Topics {
|
||||
val jobSubmitted: TypedTopic[JobSubmittedMsg] =
|
||||
TypedTopic[JobSubmittedMsg](Topic("test-job-submitted"))
|
||||
|
||||
final case class JobSubmittedMsg(task: Ident)
|
||||
object JobSubmittedMsg {
|
||||
implicit val encode: Encoder[JobSubmittedMsg] = deriveEncoder[JobSubmittedMsg]
|
||||
implicit val decode: Decoder[JobSubmittedMsg] = deriveDecoder[JobSubmittedMsg]
|
||||
}
|
||||
|
||||
val jobCancel: TypedTopic[JobCancelMsg] =
|
||||
TypedTopic[JobCancelMsg](Topic("test-job-done"))
|
||||
final case class JobCancelMsg(id: Ident)
|
||||
object JobCancelMsg {
|
||||
implicit val encode: Encoder[JobCancelMsg] = deriveEncoder[JobCancelMsg]
|
||||
implicit val decode: Decoder[JobCancelMsg] = deriveDecoder[JobCancelMsg]
|
||||
}
|
||||
|
||||
def all: NonEmptyList[TypedTopic[_]] =
|
||||
NonEmptyList.of(jobSubmitted, jobCancel)
|
||||
}
|
@ -11,6 +11,7 @@ import docspell.backend.{Config => BackendConfig}
|
||||
import docspell.common._
|
||||
import docspell.ftssolr.SolrConfig
|
||||
import docspell.oidc.ProviderConfig
|
||||
import docspell.pubsub.naive.PubSubConfig
|
||||
import docspell.restserver.Config.OpenIdConfig
|
||||
import docspell.restserver.auth.OpenId
|
||||
|
||||
@ -33,6 +34,9 @@ case class Config(
|
||||
) {
|
||||
def openIdEnabled: Boolean =
|
||||
openid.exists(_.enabled)
|
||||
|
||||
def pubSubConfig: PubSubConfig =
|
||||
PubSubConfig(appId, baseUrl / "internal" / "pubsub", 100)
|
||||
}
|
||||
|
||||
object Config {
|
||||
|
@ -38,12 +38,6 @@ object Main extends IOApp {
|
||||
|
||||
pools = connectEC.map(Pools.apply)
|
||||
rc <-
|
||||
pools.use(p =>
|
||||
RestServer
|
||||
.stream[IO](cfg, p)
|
||||
.compile
|
||||
.drain
|
||||
.as(ExitCode.Success)
|
||||
)
|
||||
pools.use(p => RestServer.serve[IO](cfg, p))
|
||||
} yield rc
|
||||
}
|
||||
|
@ -10,8 +10,6 @@ import docspell.backend.BackendApp
|
||||
|
||||
trait RestApp[F[_]] {
|
||||
|
||||
def init: F[Unit]
|
||||
|
||||
def config: Config
|
||||
|
||||
def backend: BackendApp[F]
|
||||
|
@ -6,41 +6,36 @@
|
||||
|
||||
package docspell.restserver
|
||||
|
||||
import scala.concurrent.ExecutionContext
|
||||
|
||||
import cats.effect._
|
||||
import cats.implicits._
|
||||
|
||||
import docspell.backend.BackendApp
|
||||
import docspell.common.NodeType
|
||||
import docspell.common.Logger
|
||||
import docspell.ftsclient.FtsClient
|
||||
import docspell.ftssolr.SolrFtsClient
|
||||
import docspell.pubsub.api.{PubSub, PubSubT}
|
||||
import docspell.store.Store
|
||||
|
||||
import org.http4s.client.Client
|
||||
|
||||
final class RestAppImpl[F[_]](val config: Config, val backend: BackendApp[F])
|
||||
extends RestApp[F] {
|
||||
|
||||
def init: F[Unit] =
|
||||
backend.node.register(config.appId, NodeType.Restserver, config.baseUrl)
|
||||
|
||||
def shutdown: F[Unit] =
|
||||
backend.node.unregister(config.appId)
|
||||
}
|
||||
extends RestApp[F] {}
|
||||
|
||||
object RestAppImpl {
|
||||
|
||||
def create[F[_]: Async](
|
||||
cfg: Config,
|
||||
connectEC: ExecutionContext
|
||||
): Resource[F, RestApp[F]] =
|
||||
store: Store[F],
|
||||
httpClient: Client[F],
|
||||
pubSub: PubSub[F]
|
||||
): Resource[F, RestApp[F]] = {
|
||||
val logger = Logger.log4s(org.log4s.getLogger(s"restserver-${cfg.appId.id}"))
|
||||
for {
|
||||
backend <- BackendApp(cfg.backend, connectEC)(
|
||||
createFtsClient[F](cfg)
|
||||
)
|
||||
ftsClient <- createFtsClient(cfg)(httpClient)
|
||||
pubSubT = PubSubT(pubSub, logger)
|
||||
backend <- BackendApp.create[F](cfg.backend, store, ftsClient, pubSubT)
|
||||
app = new RestAppImpl[F](cfg, backend)
|
||||
appR <- Resource.make(app.init.map(_ => app))(_.shutdown)
|
||||
} yield appR
|
||||
} yield app
|
||||
}
|
||||
|
||||
private def createFtsClient[F[_]: Async](
|
||||
cfg: Config
|
||||
|
@ -6,17 +6,25 @@
|
||||
|
||||
package docspell.restserver
|
||||
|
||||
import scala.concurrent.duration._
|
||||
|
||||
import cats.effect._
|
||||
import cats.implicits._
|
||||
import fs2.Stream
|
||||
import fs2.concurrent.Topic
|
||||
|
||||
import docspell.backend.auth.{AuthToken, ShareToken}
|
||||
import docspell.backend.msg.Topics
|
||||
import docspell.common._
|
||||
import docspell.oidc.CodeFlowRoutes
|
||||
import docspell.pubsub.naive.NaivePubSub
|
||||
import docspell.restserver.auth.OpenId
|
||||
import docspell.restserver.http4s.EnvMiddleware
|
||||
import docspell.restserver.routes._
|
||||
import docspell.restserver.webapp._
|
||||
import docspell.restserver.ws.OutputEvent.KeepAlive
|
||||
import docspell.restserver.ws.{OutputEvent, WebSocketRoutes}
|
||||
import docspell.store.Store
|
||||
|
||||
import org.http4s._
|
||||
import org.http4s.blaze.client.BlazeClientBuilder
|
||||
@ -27,55 +35,96 @@ import org.http4s.headers.Location
|
||||
import org.http4s.implicits._
|
||||
import org.http4s.server.Router
|
||||
import org.http4s.server.middleware.Logger
|
||||
import org.http4s.server.websocket.WebSocketBuilder2
|
||||
|
||||
object RestServer {
|
||||
|
||||
def stream[F[_]: Async](cfg: Config, pools: Pools): Stream[F, Nothing] = {
|
||||
def serve[F[_]: Async](cfg: Config, pools: Pools): F[ExitCode] =
|
||||
for {
|
||||
wsTopic <- Topic[F, OutputEvent]
|
||||
keepAlive = Stream
|
||||
.awakeEvery[F](30.seconds)
|
||||
.map(_ => KeepAlive)
|
||||
.through(wsTopic.publish)
|
||||
|
||||
val templates = TemplateRoutes[F](cfg)
|
||||
val app = for {
|
||||
restApp <- RestAppImpl.create[F](cfg, pools.connectEC)
|
||||
server =
|
||||
Stream
|
||||
.resource(createApp(cfg, pools))
|
||||
.flatMap { case (restApp, pubSub, httpClient) =>
|
||||
Stream(
|
||||
Subscriptions(wsTopic, restApp.backend.pubSub),
|
||||
BlazeServerBuilder[F]
|
||||
.bindHttp(cfg.bind.port, cfg.bind.address)
|
||||
.withoutBanner
|
||||
.withHttpWebSocketApp(
|
||||
createHttpApp(cfg, httpClient, pubSub, restApp, wsTopic)
|
||||
)
|
||||
.serve
|
||||
.drain
|
||||
)
|
||||
}
|
||||
|
||||
exit <-
|
||||
(server ++ Stream(keepAlive)).parJoinUnbounded.compile.drain.as(ExitCode.Success)
|
||||
} yield exit
|
||||
|
||||
def createApp[F[_]: Async](
|
||||
cfg: Config,
|
||||
pools: Pools
|
||||
): Resource[F, (RestApp[F], NaivePubSub[F], Client[F])] =
|
||||
for {
|
||||
httpClient <- BlazeClientBuilder[F].resource
|
||||
httpApp = Router(
|
||||
"/api/info" -> routes.InfoRoutes(),
|
||||
"/api/v1/open/" -> openRoutes(cfg, httpClient, restApp),
|
||||
"/api/v1/sec/" -> Authenticate(restApp.backend.login, cfg.auth) { token =>
|
||||
securedRoutes(cfg, restApp, token)
|
||||
},
|
||||
"/api/v1/admin" -> AdminAuth(cfg.adminEndpoint) {
|
||||
adminRoutes(cfg, restApp)
|
||||
},
|
||||
"/api/v1/share" -> ShareAuth(restApp.backend.share, cfg.auth) { token =>
|
||||
shareRoutes(cfg, restApp, token)
|
||||
},
|
||||
"/api/doc" -> templates.doc,
|
||||
"/app/assets" -> EnvMiddleware(WebjarRoutes.appRoutes[F]),
|
||||
"/app" -> EnvMiddleware(templates.app),
|
||||
"/sw.js" -> EnvMiddleware(templates.serviceWorker),
|
||||
"/" -> redirectTo("/app")
|
||||
).orNotFound
|
||||
|
||||
finalHttpApp = Logger.httpApp(logHeaders = false, logBody = false)(httpApp)
|
||||
|
||||
} yield finalHttpApp
|
||||
|
||||
Stream
|
||||
.resource(app)
|
||||
.flatMap(httpApp =>
|
||||
BlazeServerBuilder[F]
|
||||
.bindHttp(cfg.bind.port, cfg.bind.address)
|
||||
.withHttpApp(httpApp)
|
||||
.withoutBanner
|
||||
.serve
|
||||
store <- Store.create[F](
|
||||
cfg.backend.jdbc,
|
||||
cfg.backend.files.chunkSize,
|
||||
pools.connectEC
|
||||
)
|
||||
}.drain
|
||||
pubSub <- NaivePubSub(cfg.pubSubConfig, store, httpClient)(Topics.all.map(_.topic))
|
||||
restApp <- RestAppImpl.create[F](cfg, store, httpClient, pubSub)
|
||||
} yield (restApp, pubSub, httpClient)
|
||||
|
||||
def createHttpApp[F[_]: Async](
|
||||
cfg: Config,
|
||||
httpClient: Client[F],
|
||||
pubSub: NaivePubSub[F],
|
||||
restApp: RestApp[F],
|
||||
topic: Topic[F, OutputEvent]
|
||||
)(
|
||||
wsB: WebSocketBuilder2[F]
|
||||
) = {
|
||||
val templates = TemplateRoutes[F](cfg)
|
||||
val httpApp = Router(
|
||||
"/internal/pubsub" -> pubSub.receiveRoute,
|
||||
"/api/info" -> routes.InfoRoutes(),
|
||||
"/api/v1/open/" -> openRoutes(cfg, httpClient, restApp),
|
||||
"/api/v1/sec/" -> Authenticate(restApp.backend.login, cfg.auth) { token =>
|
||||
securedRoutes(cfg, restApp, wsB, topic, token)
|
||||
},
|
||||
"/api/v1/admin" -> AdminAuth(cfg.adminEndpoint) {
|
||||
adminRoutes(cfg, restApp)
|
||||
},
|
||||
"/api/v1/share" -> ShareAuth(restApp.backend.share, cfg.auth) { token =>
|
||||
shareRoutes(cfg, restApp, token)
|
||||
},
|
||||
"/api/doc" -> templates.doc,
|
||||
"/app/assets" -> EnvMiddleware(WebjarRoutes.appRoutes[F]),
|
||||
"/app" -> EnvMiddleware(templates.app),
|
||||
"/sw.js" -> EnvMiddleware(templates.serviceWorker),
|
||||
"/" -> redirectTo("/app")
|
||||
).orNotFound
|
||||
|
||||
Logger.httpApp(logHeaders = false, logBody = false)(httpApp)
|
||||
}
|
||||
|
||||
def securedRoutes[F[_]: Async](
|
||||
cfg: Config,
|
||||
restApp: RestApp[F],
|
||||
wsB: WebSocketBuilder2[F],
|
||||
topic: Topic[F, OutputEvent],
|
||||
token: AuthToken
|
||||
): HttpRoutes[F] =
|
||||
Router(
|
||||
"ws" -> WebSocketRoutes(token, topic, wsB),
|
||||
"auth" -> LoginRoutes.session(restApp.backend.login, cfg, token),
|
||||
"tag" -> TagRoutes(restApp.backend, token),
|
||||
"equipment" -> EquipmentRoutes(restApp.backend, token),
|
||||
|
@ -0,0 +1,35 @@
|
||||
/*
|
||||
* Copyright 2020 Eike K. & Contributors
|
||||
*
|
||||
* SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
*/
|
||||
|
||||
package docspell.restserver
|
||||
|
||||
import fs2.Stream
|
||||
import fs2.concurrent.Topic
|
||||
|
||||
import docspell.backend.msg.JobDone
|
||||
import docspell.common._
|
||||
import docspell.common.syntax.StringSyntax._
|
||||
import docspell.pubsub.api.PubSubT
|
||||
import docspell.restserver.ws.OutputEvent
|
||||
|
||||
/** Subscribes to those events from docspell that are forwarded to the websocket endpoints
|
||||
*/
|
||||
object Subscriptions {
|
||||
|
||||
def apply[F[_]](
|
||||
wsTopic: Topic[F, OutputEvent],
|
||||
pubSub: PubSubT[F]
|
||||
): Stream[F, Nothing] =
|
||||
jobDone(pubSub).through(wsTopic.publish)
|
||||
|
||||
def jobDone[F[_]](pubSub: PubSubT[F]): Stream[F, OutputEvent] =
|
||||
pubSub
|
||||
.subscribe(JobDone.topic)
|
||||
.filter(m => m.body.task == ProcessItemArgs.taskName)
|
||||
.map(m => m.body.args.parseJsonAs[ProcessItemArgs])
|
||||
.collect { case Right(a) => OutputEvent.ItemProcessed(a.meta.collective) }
|
||||
|
||||
}
|
@ -0,0 +1,11 @@
|
||||
/*
|
||||
* Copyright 2020 Eike K. & Contributors
|
||||
*
|
||||
* SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
*/
|
||||
|
||||
package docspell.restserver.ws
|
||||
|
||||
sealed trait InputMessage
|
||||
|
||||
object InputMessage {}
|
@ -0,0 +1,32 @@
|
||||
/*
|
||||
* Copyright 2020 Eike K. & Contributors
|
||||
*
|
||||
* SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
*/
|
||||
|
||||
package docspell.restserver.ws
|
||||
|
||||
import docspell.backend.auth.AuthToken
|
||||
import docspell.common._
|
||||
|
||||
sealed trait OutputEvent {
|
||||
def forCollective(token: AuthToken): Boolean
|
||||
def encode: String
|
||||
}
|
||||
|
||||
object OutputEvent {
|
||||
|
||||
case object KeepAlive extends OutputEvent {
|
||||
def forCollective(token: AuthToken): Boolean = true
|
||||
def encode: String = "keep-alive"
|
||||
}
|
||||
|
||||
final case class ItemProcessed(collective: Ident) extends OutputEvent {
|
||||
def forCollective(token: AuthToken): Boolean =
|
||||
token.account.collective == collective
|
||||
|
||||
def encode: String =
|
||||
"item-processed"
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,44 @@
|
||||
/*
|
||||
* Copyright 2020 Eike K. & Contributors
|
||||
*
|
||||
* SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
*/
|
||||
|
||||
package docspell.restserver.ws
|
||||
|
||||
import cats.effect.Async
|
||||
import fs2.concurrent.Topic
|
||||
import fs2.{Pipe, Stream}
|
||||
|
||||
import docspell.backend.auth.AuthToken
|
||||
|
||||
import org.http4s.HttpRoutes
|
||||
import org.http4s.dsl.Http4sDsl
|
||||
import org.http4s.server.websocket.WebSocketBuilder2
|
||||
import org.http4s.websocket.WebSocketFrame
|
||||
import org.http4s.websocket.WebSocketFrame.Text
|
||||
|
||||
object WebSocketRoutes {
|
||||
|
||||
def apply[F[_]: Async](
|
||||
user: AuthToken,
|
||||
topic: Topic[F, OutputEvent],
|
||||
wsb: WebSocketBuilder2[F]
|
||||
): HttpRoutes[F] = {
|
||||
val dsl = new Http4sDsl[F] {}
|
||||
import dsl._
|
||||
|
||||
HttpRoutes.of { case GET -> Root =>
|
||||
val toClient: Stream[F, WebSocketFrame.Text] =
|
||||
topic
|
||||
.subscribe(500)
|
||||
.filter(_.forCollective(user))
|
||||
.map(msg => Text(msg.encode))
|
||||
|
||||
val toServer: Pipe[F, WebSocketFrame, Unit] =
|
||||
_.map(_ => ())
|
||||
|
||||
wsb.build(toClient, toServer)
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,8 @@
|
||||
CREATE TABLE "pubsub" (
|
||||
"id" varchar(254) not null primary key,
|
||||
"node_id" varchar(254) not null,
|
||||
"url" varchar(254) not null,
|
||||
"topic" varchar(254) not null,
|
||||
"counter" int not null,
|
||||
unique("url", "topic")
|
||||
)
|
@ -0,0 +1,8 @@
|
||||
CREATE TABLE `pubsub` (
|
||||
`id` varchar(254) not null primary key,
|
||||
`node_id` varchar(254) not null,
|
||||
`url` varchar(254) not null,
|
||||
`topic` varchar(254) not null,
|
||||
`counter` int not null,
|
||||
unique(`url`, `topic`)
|
||||
)
|
@ -0,0 +1,8 @@
|
||||
CREATE TABLE "pubsub" (
|
||||
"id" varchar(254) not null primary key,
|
||||
"node_id" varchar(254) not null,
|
||||
"url" varchar(254) not null,
|
||||
"topic" varchar(254) not null,
|
||||
"counter" int not null,
|
||||
unique("url", "topic")
|
||||
)
|
@ -0,0 +1,90 @@
|
||||
/*
|
||||
* Copyright 2020 Eike K. & Contributors
|
||||
*
|
||||
* SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
*/
|
||||
|
||||
package docspell.store.records
|
||||
|
||||
import cats.data.NonEmptyList
|
||||
import cats.implicits._
|
||||
|
||||
import docspell.common._
|
||||
import docspell.store.qb.DSL._
|
||||
import docspell.store.qb.{Column, DML, TableDef}
|
||||
|
||||
import doobie._
|
||||
import doobie.implicits._
|
||||
|
||||
/** A table for supporting naive pubsub across nodes. */
|
||||
final case class RPubSub(
|
||||
id: Ident,
|
||||
nodeId: Ident,
|
||||
url: LenientUri,
|
||||
topic: String,
|
||||
counter: Int
|
||||
)
|
||||
|
||||
object RPubSub {
|
||||
final case class Table(alias: Option[String]) extends TableDef {
|
||||
val tableName: String = "pubsub"
|
||||
|
||||
val id = Column[Ident]("id", this)
|
||||
val nodeId = Column[Ident]("node_id", this)
|
||||
val url = Column[LenientUri]("url", this)
|
||||
val topic = Column[String]("topic", this)
|
||||
val counter = Column[Int]("counter", this)
|
||||
|
||||
val all: NonEmptyList[Column[_]] =
|
||||
NonEmptyList.of(id, nodeId, url, topic, counter)
|
||||
}
|
||||
def as(alias: String): Table =
|
||||
Table(Some(alias))
|
||||
|
||||
val T: Table = Table(None)
|
||||
|
||||
def insert(r: RPubSub): ConnectionIO[Int] =
|
||||
DML.insert(T, T.all, sql"${r.id}, ${r.nodeId}, ${r.url}, ${r.topic}, ${r.counter}")
|
||||
|
||||
/** Insert all topics with counter = 0 */
|
||||
def initTopics(
|
||||
nodeId: Ident,
|
||||
url: LenientUri,
|
||||
topics: NonEmptyList[String]
|
||||
): ConnectionIO[Int] =
|
||||
DML.delete(T, T.nodeId === nodeId) *>
|
||||
topics.toList
|
||||
.traverse(t =>
|
||||
Ident
|
||||
.randomId[ConnectionIO]
|
||||
.flatMap(id => insert(RPubSub(id, nodeId, url, t, 0)))
|
||||
)
|
||||
.map(_.sum)
|
||||
|
||||
def increment(url: LenientUri, topics: NonEmptyList[String]): ConnectionIO[Int] =
|
||||
DML.update(
|
||||
T,
|
||||
T.url === url && T.topic.in(topics),
|
||||
DML.set(
|
||||
T.counter.increment(1)
|
||||
)
|
||||
)
|
||||
|
||||
def decrement(url: LenientUri, topics: NonEmptyList[String]): ConnectionIO[Int] =
|
||||
DML.update(
|
||||
T,
|
||||
T.url === url && T.topic.in(topics),
|
||||
DML.set(
|
||||
T.counter.decrement(1)
|
||||
)
|
||||
)
|
||||
|
||||
def findSubs(topic: String, excludeNode: Ident): ConnectionIO[List[LenientUri]] =
|
||||
run(
|
||||
select(T.url),
|
||||
from(T),
|
||||
T.topic === topic && T.counter > 0 && T.nodeId <> excludeNode
|
||||
)
|
||||
.query[LenientUri]
|
||||
.to[List]
|
||||
}
|
@ -3,7 +3,7 @@
|
||||
<withJansi>true</withJansi>
|
||||
|
||||
<encoder>
|
||||
<pattern>%highlight(%-5level) %cyan(%logger{15}) - %msg %n</pattern>
|
||||
<pattern>%highlight(%-5level) [%t{10}] %cyan(%logger{15}) - %msg %n</pattern>
|
||||
</encoder>
|
||||
</appender>
|
||||
|
||||
|
@ -64,6 +64,7 @@ type alias Model =
|
||||
, anonymousTheme : UiTheme
|
||||
, anonymousUiLang : UiLanguage
|
||||
, langMenuOpen : Bool
|
||||
, showNewItemsArrived : Bool
|
||||
}
|
||||
|
||||
|
||||
@ -126,6 +127,7 @@ init key url flags_ settings =
|
||||
, anonymousTheme = Data.UiTheme.Light
|
||||
, anonymousUiLang = Messages.UiLanguage.English
|
||||
, langMenuOpen = False
|
||||
, showNewItemsArrived = False
|
||||
}
|
||||
, Cmd.batch
|
||||
[ Cmd.map UserSettingsMsg uc
|
||||
@ -190,6 +192,8 @@ type Msg
|
||||
| SetLanguage UiLanguage
|
||||
| ClientSettingsSaveResp UiSettings (Result Http.Error BasicResult)
|
||||
| ReceiveBrowserSettings StoredUiSettings
|
||||
| ReceiveWsMessage String
|
||||
| ToggleShowNewItemsArrived
|
||||
|
||||
|
||||
defaultPage : Flags -> Page
|
||||
|
@ -15,6 +15,7 @@ import App.Data exposing (..)
|
||||
import Browser exposing (UrlRequest(..))
|
||||
import Browser.Navigation as Nav
|
||||
import Data.Flags
|
||||
import Data.ServerEvent exposing (ServerEvent(..))
|
||||
import Data.UiSettings exposing (UiSettings)
|
||||
import Data.UiTheme
|
||||
import Messages exposing (Messages)
|
||||
@ -308,6 +309,33 @@ updateWithSub msg model =
|
||||
in
|
||||
updateUserSettings texts lm model
|
||||
|
||||
ReceiveWsMessage data ->
|
||||
let
|
||||
se =
|
||||
Data.ServerEvent.fromString data
|
||||
in
|
||||
case se of
|
||||
Just ItemProcessed ->
|
||||
let
|
||||
newModel =
|
||||
{ model | showNewItemsArrived = True }
|
||||
in
|
||||
case model.page of
|
||||
HomePage ->
|
||||
updateHome texts Page.Home.Data.RefreshView newModel
|
||||
|
||||
_ ->
|
||||
( newModel, Cmd.none, Sub.none )
|
||||
|
||||
Nothing ->
|
||||
( model, Cmd.none, Sub.none )
|
||||
|
||||
ToggleShowNewItemsArrived ->
|
||||
( { model | showNewItemsArrived = not model.showNewItemsArrived }
|
||||
, Cmd.none
|
||||
, Sub.none
|
||||
)
|
||||
|
||||
|
||||
applyClientSettings : Messages -> Model -> UiSettings -> ( Model, Cmd Msg, Sub Msg )
|
||||
applyClientSettings texts model settings =
|
||||
|
@ -71,7 +71,19 @@ topNavUser auth model =
|
||||
, activeStyle = "hover:bg-blue-200 dark:hover:bg-bluegray-800 w-12"
|
||||
}
|
||||
, headerNavItem True model
|
||||
, div [ class "flex flex-grow justify-end" ]
|
||||
, div [ class "flex flex-grow justify-center" ]
|
||||
[ a
|
||||
[ class S.infoMessageBase
|
||||
, class "my-2 px-1 py-1 rounded-lg inline-block hover:opacity-50"
|
||||
, classList [ ( "hidden", not model.showNewItemsArrived ) ]
|
||||
, href "#"
|
||||
, onClick ToggleShowNewItemsArrived
|
||||
]
|
||||
[ i [ class "fa fa-exclamation-circle mr-1" ] []
|
||||
, text texts.app.newItemsArrived
|
||||
]
|
||||
]
|
||||
, div [ class "flex justify-end" ]
|
||||
[ userMenu texts.app auth model
|
||||
, dataMenu texts.app auth model
|
||||
]
|
||||
|
22
modules/webapp/src/main/elm/Data/ServerEvent.elm
Normal file
22
modules/webapp/src/main/elm/Data/ServerEvent.elm
Normal file
@ -0,0 +1,22 @@
|
||||
{-
|
||||
Copyright 2020 Eike K. & Contributors
|
||||
|
||||
SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
-}
|
||||
|
||||
|
||||
module Data.ServerEvent exposing (ServerEvent(..), fromString)
|
||||
|
||||
|
||||
type ServerEvent
|
||||
= ItemProcessed
|
||||
|
||||
|
||||
fromString : String -> Maybe ServerEvent
|
||||
fromString str =
|
||||
case String.toLower str of
|
||||
"item-processed" ->
|
||||
Just ItemProcessed
|
||||
|
||||
_ ->
|
||||
Nothing
|
@ -93,4 +93,5 @@ subscriptions model =
|
||||
Sub.batch
|
||||
[ model.subs
|
||||
, Ports.receiveUiSettings ReceiveBrowserSettings
|
||||
, Ports.receiveWsMessage ReceiveWsMessage
|
||||
]
|
||||
|
@ -23,6 +23,7 @@ type alias Texts =
|
||||
, processingQueue : String
|
||||
, newInvites : String
|
||||
, help : String
|
||||
, newItemsArrived : String
|
||||
}
|
||||
|
||||
|
||||
@ -38,6 +39,7 @@ gb =
|
||||
, processingQueue = "Processing Queue"
|
||||
, newInvites = "New Invites"
|
||||
, help = "Help"
|
||||
, newItemsArrived = "New items arrived!"
|
||||
}
|
||||
|
||||
|
||||
@ -53,4 +55,5 @@ de =
|
||||
, processingQueue = "Verarbeitung"
|
||||
, newInvites = "Neue Einladung"
|
||||
, help = "Hilfe (English)"
|
||||
, newItemsArrived = "Neue Dokumente eingetroffen!"
|
||||
}
|
||||
|
@ -229,6 +229,7 @@ type Msg
|
||||
| PublishItemsMsg Comp.PublishItems.Msg
|
||||
| TogglePublishCurrentQueryView
|
||||
| PublishViewMsg Comp.PublishItems.Msg
|
||||
| RefreshView
|
||||
|
||||
|
||||
type SearchType
|
||||
|
@ -227,6 +227,22 @@ update mId key flags texts settings msg model =
|
||||
else
|
||||
doSearch param nm
|
||||
|
||||
RefreshView ->
|
||||
let
|
||||
param =
|
||||
{ flags = flags
|
||||
, searchType = model.lastSearchType
|
||||
, pageSize = settings.itemSearchPageSize
|
||||
, offset = model.searchOffset
|
||||
, scroll = False
|
||||
}
|
||||
in
|
||||
if model.searchInProgress then
|
||||
withSub ( model, Cmd.none )
|
||||
|
||||
else
|
||||
doSearch param model
|
||||
|
||||
ToggleSearchMenu ->
|
||||
let
|
||||
nextView =
|
||||
|
@ -11,6 +11,7 @@ port module Ports exposing
|
||||
, printElement
|
||||
, receiveCheckQueryResult
|
||||
, receiveUiSettings
|
||||
, receiveWsMessage
|
||||
, removeAccount
|
||||
, requestUiSettings
|
||||
, setAccount
|
||||
@ -55,6 +56,15 @@ and calls the print dialog.
|
||||
port printElement : String -> Cmd msg
|
||||
|
||||
|
||||
{-| Receives messages from the websocket.
|
||||
-}
|
||||
port receiveWsMessage : (String -> msg) -> Sub msg
|
||||
|
||||
|
||||
|
||||
--- Higher level functions based on ports
|
||||
|
||||
|
||||
setUiTheme : UiTheme -> Cmd msg
|
||||
setUiTheme theme =
|
||||
internalSetUiTheme (Data.UiTheme.toString theme)
|
||||
|
@ -63,9 +63,14 @@ warnMessageColors =
|
||||
" border-yellow-800 bg-yellow-50 text-yellow-800 dark:border-amber-200 dark:bg-amber-800 dark:text-amber-200 "
|
||||
|
||||
|
||||
infoMessageBase : String
|
||||
infoMessageBase =
|
||||
" border border-blue-800 bg-blue-100 text-blue-800 dark:border-lightblue-200 dark:bg-lightblue-800 dark:text-lightblue-200 dark:bg-opacity-25 "
|
||||
|
||||
|
||||
infoMessage : String
|
||||
infoMessage =
|
||||
" border border-blue-800 bg-blue-100 text-blue-800 dark:border-lightblue-200 dark:bg-lightblue-800 dark:text-lightblue-200 dark:bg-opacity-25 px-2 py-2 rounded "
|
||||
infoMessageBase ++ " px-2 py-2 rounded "
|
||||
|
||||
|
||||
message : String
|
||||
|
@ -12,9 +12,9 @@ function extend() {
|
||||
var result = {};
|
||||
for (var i = 0; i < arguments.length; i++) {
|
||||
forEachIn(arguments[i],
|
||||
function(obj, key) {
|
||||
result[key] = obj;
|
||||
});
|
||||
function(obj, key) {
|
||||
result[key] = obj;
|
||||
});
|
||||
}
|
||||
return result;
|
||||
}
|
||||
@ -41,13 +41,18 @@ elmApp.ports.internalSetUiTheme.subscribe(function(themeName) {
|
||||
});
|
||||
|
||||
elmApp.ports.setAccount.subscribe(function(authResult) {
|
||||
console.log("Add account from local storage");
|
||||
console.log("Add account to local storage");
|
||||
localStorage.setItem("account", JSON.stringify(authResult));
|
||||
|
||||
if (!dsWebSocket) {
|
||||
initWS();
|
||||
}
|
||||
});
|
||||
|
||||
elmApp.ports.removeAccount.subscribe(function() {
|
||||
console.log("Remove account from local storage");
|
||||
localStorage.removeItem("account");
|
||||
closeWS();
|
||||
});
|
||||
|
||||
elmApp.ports.requestUiSettings.subscribe(function(args) {
|
||||
@ -135,3 +140,28 @@ elmApp.ports.printElement.subscribe(function(id) {
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
|
||||
var dsWebSocket = null;
|
||||
function closeWS() {
|
||||
if (dsWebSocket) {
|
||||
console.log("Closing websocket connection");
|
||||
dsWebSocket.close(1000, "Done");
|
||||
dsWebSocket = null;
|
||||
}
|
||||
}
|
||||
function initWS() {
|
||||
closeWS();
|
||||
var protocol = (window.location.protocol === 'https:') ? 'wss:' : 'ws:';
|
||||
var url = protocol + '//' + window.location.host + '/api/v1/sec/ws';
|
||||
console.log("Initialize websocket at " + url);
|
||||
dsWebSocket = new WebSocket(url);
|
||||
dsWebSocket.addEventListener("message", function(event) {
|
||||
if (event.data != "keep-alive" && event.data) {
|
||||
elmApp.ports.receiveWsMessage.send(event.data);
|
||||
}
|
||||
});
|
||||
}
|
||||
// elmApp.ports.sendWsMessage.subscribe(function(msg) {
|
||||
// socket.send(msg);
|
||||
// });
|
||||
|
Loading…
x
Reference in New Issue
Block a user