mirror of
https://github.com/TheAnachronism/docspell.git
synced 2025-04-04 18:39:33 +00:00
Make internal endpoints available to nodes only
This commit is contained in:
parent
61c01ad79b
commit
7b8afe8371
modules
joex/src/main/scala/docspell/joex
pubsub/naive/src
main/scala/docspell/pubsub/naive
test/scala/docspell/pubsub/naive
restserver/src/main/scala/docspell/restserver
store/src/main
resources/db/migration
h2
mariadb
postgresql
scala/docspell/store/records
@ -18,6 +18,7 @@ import docspell.extract.ExtractConfig
|
||||
import docspell.ftssolr.SolrConfig
|
||||
import docspell.joex.analysis.RegexNerFile
|
||||
import docspell.joex.hk.HouseKeepingConfig
|
||||
import docspell.joex.routes.InternalHeader
|
||||
import docspell.joex.scheduler.{PeriodicSchedulerConfig, SchedulerConfig}
|
||||
import docspell.joex.updatecheck.UpdateCheckConfig
|
||||
import docspell.pubsub.naive.PubSubConfig
|
||||
@ -42,8 +43,13 @@ case class Config(
|
||||
updateCheck: UpdateCheckConfig
|
||||
) {
|
||||
|
||||
def pubSubConfig: PubSubConfig =
|
||||
PubSubConfig(appId, baseUrl / "internal" / "pubsub", 100)
|
||||
def pubSubConfig(headerValue: Ident): PubSubConfig =
|
||||
PubSubConfig(
|
||||
appId,
|
||||
baseUrl / "internal" / "pubsub",
|
||||
100,
|
||||
InternalHeader.header(headerValue.id)
|
||||
)
|
||||
}
|
||||
|
||||
object Config {
|
||||
|
@ -16,6 +16,7 @@ import docspell.common.Pools
|
||||
import docspell.joex.routes._
|
||||
import docspell.pubsub.naive.NaivePubSub
|
||||
import docspell.store.Store
|
||||
import docspell.store.records.RInternalSetting
|
||||
|
||||
import org.http4s.HttpApp
|
||||
import org.http4s.blaze.client.BlazeClientBuilder
|
||||
@ -43,13 +44,20 @@ object JoexServer {
|
||||
cfg.files.chunkSize,
|
||||
pools.connectEC
|
||||
)
|
||||
settings <- Resource.eval(store.transact(RInternalSetting.create))
|
||||
httpClient <- BlazeClientBuilder[F].resource
|
||||
pubSub <- NaivePubSub(cfg.pubSubConfig, store, httpClient)(Topics.all.map(_.topic))
|
||||
pubSub <- NaivePubSub(
|
||||
cfg.pubSubConfig(settings.internalRouteKey),
|
||||
store,
|
||||
httpClient
|
||||
)(Topics.all.map(_.topic))
|
||||
|
||||
joexApp <- JoexAppImpl.create[F](cfg, signal, store, httpClient, pubSub)
|
||||
|
||||
httpApp = Router(
|
||||
"/internal/pubsub" -> pubSub.receiveRoute,
|
||||
"/internal" -> InternalHeader(settings.internalRouteKey) {
|
||||
Router("pubsub" -> pubSub.receiveRoute)
|
||||
},
|
||||
"/api/info" -> InfoRoutes(cfg),
|
||||
"/api/v1" -> JoexRoutes(joexApp)
|
||||
).orNotFound
|
||||
|
@ -0,0 +1,59 @@
|
||||
/*
|
||||
* Copyright 2020 Eike K. & Contributors
|
||||
*
|
||||
* SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
*/
|
||||
|
||||
package docspell.joex.routes
|
||||
|
||||
import cats.data.{Kleisli, OptionT}
|
||||
import cats.effect.kernel.Async
|
||||
import cats.implicits._
|
||||
|
||||
import docspell.common.Ident
|
||||
|
||||
import org.http4s._
|
||||
import org.http4s.dsl.Http4sDsl
|
||||
import org.http4s.server.AuthMiddleware
|
||||
import org.typelevel.ci._
|
||||
|
||||
// duplicated in restserver project
|
||||
object InternalHeader {
|
||||
|
||||
private val headerName = ci"Docspell-Internal-Api"
|
||||
|
||||
def header(value: String): Header.Raw =
|
||||
Header.Raw(headerName, value)
|
||||
|
||||
def apply[F[_]: Async](key: Ident)(routes: HttpRoutes[F]): HttpRoutes[F] = {
|
||||
val dsl: Http4sDsl[F] = new Http4sDsl[F] {}
|
||||
import dsl._
|
||||
|
||||
val authUser = checkSecret[F](key)
|
||||
|
||||
val onFailure: AuthedRoutes[String, F] =
|
||||
Kleisli(req => OptionT.liftF(NotFound(req.context)))
|
||||
|
||||
val middleware: AuthMiddleware[F, Unit] =
|
||||
AuthMiddleware(authUser, onFailure)
|
||||
|
||||
middleware(AuthedRoutes(authReq => routes.run(authReq.req)))
|
||||
}
|
||||
|
||||
private def checkSecret[F[_]: Async](
|
||||
key: Ident
|
||||
): Kleisli[F, Request[F], Either[String, Unit]] =
|
||||
Kleisli(req =>
|
||||
extractSecret[F](req)
|
||||
.filter(compareSecret(key.id))
|
||||
.toRight("Secret invalid")
|
||||
.map(_ => ())
|
||||
.pure[F]
|
||||
)
|
||||
|
||||
private def extractSecret[F[_]](req: Request[F]): Option[String] =
|
||||
req.headers.get(headerName).map(_.head.value)
|
||||
|
||||
private def compareSecret(s1: String)(s2: String): Boolean =
|
||||
s1 == s2
|
||||
}
|
@ -159,7 +159,7 @@ final class NaivePubSub[F[_]: Async](
|
||||
_ <- logger.trace(s"Publishing to remote urls ${urls.map(_.asString)}: $msg")
|
||||
reqs = urls
|
||||
.map(u => Uri.unsafeFromString(u.asString))
|
||||
.map(uri => POST(List(msg), uri))
|
||||
.map(uri => POST(List(msg), uri).putHeaders(cfg.reqHeader))
|
||||
resList <- reqs.traverse(req => client.status(req).attempt)
|
||||
_ <- resList.traverse {
|
||||
case Right(s) =>
|
||||
|
@ -8,4 +8,11 @@ package docspell.pubsub.naive
|
||||
|
||||
import docspell.common.{Ident, LenientUri}
|
||||
|
||||
case class PubSubConfig(nodeId: Ident, url: LenientUri, subscriberQueueSize: Int)
|
||||
import org.http4s.Header
|
||||
|
||||
case class PubSubConfig(
|
||||
nodeId: Ident,
|
||||
url: LenientUri,
|
||||
subscriberQueueSize: Int,
|
||||
reqHeader: Header.Raw
|
||||
)
|
||||
|
@ -13,8 +13,9 @@ import docspell.pubsub.api._
|
||||
import docspell.store.{Store, StoreFixture}
|
||||
|
||||
import munit.CatsEffectSuite
|
||||
import org.http4s.Response
|
||||
import org.http4s.client.Client
|
||||
import org.http4s.{Header, Response}
|
||||
import org.typelevel.ci._
|
||||
|
||||
trait Fixtures extends HttpClientOps { self: CatsEffectSuite =>
|
||||
|
||||
@ -67,7 +68,8 @@ object Fixtures {
|
||||
PubSubConfig(
|
||||
Ident.unsafe(nodeId),
|
||||
LenientUri.unsafe(s"http://$nodeId/"),
|
||||
0
|
||||
0,
|
||||
Header.Raw(ci"Docspell-Internal", "abc")
|
||||
)
|
||||
|
||||
def storeResource: Resource[IO, Store[IO]] =
|
||||
|
@ -14,6 +14,7 @@ import docspell.oidc.ProviderConfig
|
||||
import docspell.pubsub.naive.PubSubConfig
|
||||
import docspell.restserver.Config.OpenIdConfig
|
||||
import docspell.restserver.auth.OpenId
|
||||
import docspell.restserver.http4s.InternalHeader
|
||||
|
||||
import com.comcast.ip4s.IpAddress
|
||||
|
||||
@ -35,8 +36,13 @@ case class Config(
|
||||
def openIdEnabled: Boolean =
|
||||
openid.exists(_.enabled)
|
||||
|
||||
def pubSubConfig: PubSubConfig =
|
||||
PubSubConfig(appId, baseUrl / "internal" / "pubsub", 100)
|
||||
def pubSubConfig(headerValue: Ident): PubSubConfig =
|
||||
PubSubConfig(
|
||||
appId,
|
||||
baseUrl / "internal" / "pubsub",
|
||||
100,
|
||||
InternalHeader.header(headerValue.id)
|
||||
)
|
||||
}
|
||||
|
||||
object Config {
|
||||
|
@ -19,12 +19,13 @@ import docspell.common._
|
||||
import docspell.oidc.CodeFlowRoutes
|
||||
import docspell.pubsub.naive.NaivePubSub
|
||||
import docspell.restserver.auth.OpenId
|
||||
import docspell.restserver.http4s.EnvMiddleware
|
||||
import docspell.restserver.http4s.{EnvMiddleware, InternalHeader}
|
||||
import docspell.restserver.routes._
|
||||
import docspell.restserver.webapp._
|
||||
import docspell.restserver.ws.OutputEvent.KeepAlive
|
||||
import docspell.restserver.ws.{OutputEvent, WebSocketRoutes}
|
||||
import docspell.store.Store
|
||||
import docspell.store.records.RInternalSetting
|
||||
|
||||
import org.http4s._
|
||||
import org.http4s.blaze.client.BlazeClientBuilder
|
||||
@ -50,14 +51,14 @@ object RestServer {
|
||||
server =
|
||||
Stream
|
||||
.resource(createApp(cfg, pools))
|
||||
.flatMap { case (restApp, pubSub, httpClient) =>
|
||||
.flatMap { case (restApp, pubSub, httpClient, setting) =>
|
||||
Stream(
|
||||
Subscriptions(wsTopic, restApp.backend.pubSub),
|
||||
BlazeServerBuilder[F]
|
||||
.bindHttp(cfg.bind.port, cfg.bind.address)
|
||||
.withoutBanner
|
||||
.withHttpWebSocketApp(
|
||||
createHttpApp(cfg, httpClient, pubSub, restApp, wsTopic)
|
||||
createHttpApp(cfg, setting, httpClient, pubSub, restApp, wsTopic)
|
||||
)
|
||||
.serve
|
||||
.drain
|
||||
@ -71,7 +72,7 @@ object RestServer {
|
||||
def createApp[F[_]: Async](
|
||||
cfg: Config,
|
||||
pools: Pools
|
||||
): Resource[F, (RestApp[F], NaivePubSub[F], Client[F])] =
|
||||
): Resource[F, (RestApp[F], NaivePubSub[F], Client[F], RInternalSetting)] =
|
||||
for {
|
||||
httpClient <- BlazeClientBuilder[F].resource
|
||||
store <- Store.create[F](
|
||||
@ -79,12 +80,18 @@ object RestServer {
|
||||
cfg.backend.files.chunkSize,
|
||||
pools.connectEC
|
||||
)
|
||||
pubSub <- NaivePubSub(cfg.pubSubConfig, store, httpClient)(Topics.all.map(_.topic))
|
||||
setting <- Resource.eval(store.transact(RInternalSetting.create))
|
||||
pubSub <- NaivePubSub(
|
||||
cfg.pubSubConfig(setting.internalRouteKey),
|
||||
store,
|
||||
httpClient
|
||||
)(Topics.all.map(_.topic))
|
||||
restApp <- RestAppImpl.create[F](cfg, store, httpClient, pubSub)
|
||||
} yield (restApp, pubSub, httpClient)
|
||||
} yield (restApp, pubSub, httpClient, setting)
|
||||
|
||||
def createHttpApp[F[_]: Async](
|
||||
cfg: Config,
|
||||
internSettings: RInternalSetting,
|
||||
httpClient: Client[F],
|
||||
pubSub: NaivePubSub[F],
|
||||
restApp: RestApp[F],
|
||||
@ -94,7 +101,9 @@ object RestServer {
|
||||
) = {
|
||||
val templates = TemplateRoutes[F](cfg)
|
||||
val httpApp = Router(
|
||||
"/internal/pubsub" -> pubSub.receiveRoute,
|
||||
"/internal" -> InternalHeader(internSettings.internalRouteKey) {
|
||||
internalRoutes(pubSub)
|
||||
},
|
||||
"/api/info" -> routes.InfoRoutes(),
|
||||
"/api/v1/open/" -> openRoutes(cfg, httpClient, restApp),
|
||||
"/api/v1/sec/" -> Authenticate(restApp.backend.login, cfg.auth) { token =>
|
||||
@ -116,6 +125,11 @@ object RestServer {
|
||||
Logger.httpApp(logHeaders = false, logBody = false)(httpApp)
|
||||
}
|
||||
|
||||
def internalRoutes[F[_]: Async](pubSub: NaivePubSub[F]): HttpRoutes[F] =
|
||||
Router(
|
||||
"pubsub" -> pubSub.receiveRoute
|
||||
)
|
||||
|
||||
def securedRoutes[F[_]: Async](
|
||||
cfg: Config,
|
||||
restApp: RestApp[F],
|
||||
|
@ -8,6 +8,7 @@ package docspell.restserver
|
||||
|
||||
import fs2.Stream
|
||||
import fs2.concurrent.Topic
|
||||
|
||||
import docspell.backend.msg.JobDone
|
||||
import docspell.common.ProcessItemArgs
|
||||
import docspell.pubsub.api.PubSubT
|
||||
|
@ -0,0 +1,59 @@
|
||||
/*
|
||||
* Copyright 2020 Eike K. & Contributors
|
||||
*
|
||||
* SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
*/
|
||||
|
||||
package docspell.restserver.http4s
|
||||
|
||||
import cats.data.{Kleisli, OptionT}
|
||||
import cats.effect.kernel.Async
|
||||
import cats.implicits._
|
||||
|
||||
import docspell.common.Ident
|
||||
|
||||
import org.http4s._
|
||||
import org.http4s.dsl.Http4sDsl
|
||||
import org.http4s.server.AuthMiddleware
|
||||
import org.typelevel.ci._
|
||||
|
||||
// duplicated in joex project
|
||||
object InternalHeader {
|
||||
|
||||
private val headerName = ci"Docspell-Internal-Api"
|
||||
|
||||
def header(value: String): Header.Raw =
|
||||
Header.Raw(headerName, value)
|
||||
|
||||
def apply[F[_]: Async](key: Ident)(routes: HttpRoutes[F]): HttpRoutes[F] = {
|
||||
val dsl: Http4sDsl[F] = new Http4sDsl[F] {}
|
||||
import dsl._
|
||||
|
||||
val authUser = checkSecret[F](key)
|
||||
|
||||
val onFailure: AuthedRoutes[String, F] =
|
||||
Kleisli(req => OptionT.liftF(NotFound(req.context)))
|
||||
|
||||
val middleware: AuthMiddleware[F, Unit] =
|
||||
AuthMiddleware(authUser, onFailure)
|
||||
|
||||
middleware(AuthedRoutes(authReq => routes.run(authReq.req)))
|
||||
}
|
||||
|
||||
private def checkSecret[F[_]: Async](
|
||||
key: Ident
|
||||
): Kleisli[F, Request[F], Either[String, Unit]] =
|
||||
Kleisli(req =>
|
||||
extractSecret[F](req)
|
||||
.filter(compareSecret(key.id))
|
||||
.toRight("Secret invalid")
|
||||
.map(_ => ())
|
||||
.pure[F]
|
||||
)
|
||||
|
||||
private def extractSecret[F[_]](req: Request[F]): Option[String] =
|
||||
req.headers.get(headerName).map(_.head.value)
|
||||
|
||||
private def compareSecret(s1: String)(s2: String): Boolean =
|
||||
s1 == s2
|
||||
}
|
@ -0,0 +1,4 @@
|
||||
CREATE TABLE "internal_setting" (
|
||||
"id" varchar(254) not null primary key,
|
||||
"internal_route_key" varchar(254) not null
|
||||
)
|
@ -0,0 +1,4 @@
|
||||
CREATE TABLE `internal_setting` (
|
||||
`id` varchar(254) not null primary key,
|
||||
`internal_route_key` varchar(254) not null
|
||||
)
|
@ -0,0 +1,4 @@
|
||||
CREATE TABLE "internal_setting" (
|
||||
"id" varchar(254) not null primary key,
|
||||
"internal_route_key" varchar(254) not null
|
||||
)
|
@ -0,0 +1,70 @@
|
||||
/*
|
||||
* Copyright 2020 Eike K. & Contributors
|
||||
*
|
||||
* SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
*/
|
||||
|
||||
package docspell.store.records
|
||||
|
||||
import cats.data.NonEmptyList
|
||||
import cats.effect.implicits._
|
||||
import cats.implicits._
|
||||
|
||||
import docspell.common._
|
||||
import docspell.store.qb.DSL._
|
||||
import docspell.store.qb._
|
||||
|
||||
import doobie._
|
||||
import doobie.implicits._
|
||||
|
||||
final case class RInternalSetting(
|
||||
id: Ident,
|
||||
internalRouteKey: Ident
|
||||
)
|
||||
|
||||
object RInternalSetting {
|
||||
final case class Table(alias: Option[String]) extends TableDef {
|
||||
val tableName = "internal_setting"
|
||||
|
||||
val id = Column[Ident]("id", this)
|
||||
val internalRouteKey = Column[Ident]("internal_route_key", this)
|
||||
|
||||
val all: NonEmptyList[Column[_]] =
|
||||
NonEmptyList.of(id, internalRouteKey)
|
||||
}
|
||||
|
||||
def as(alias: String): Table =
|
||||
Table(Some(alias))
|
||||
|
||||
val T = Table(None)
|
||||
|
||||
private[this] val currentId = Ident.unsafe("4835448a-ff3a-4c2b-ad48-d06bf0d5720a")
|
||||
|
||||
private def read: Query0[RInternalSetting] =
|
||||
Select(select(T.all), from(T), T.id === currentId).build
|
||||
.query[RInternalSetting]
|
||||
|
||||
private def insert: ConnectionIO[Int] =
|
||||
for {
|
||||
rkey <- Ident.randomId[ConnectionIO]
|
||||
r = RInternalSetting(currentId, rkey)
|
||||
n <- DML.insert(T, T.all, sql"${r.id},${r.internalRouteKey}")
|
||||
} yield n
|
||||
|
||||
def create: ConnectionIO[RInternalSetting] =
|
||||
for {
|
||||
s0 <- read.option
|
||||
s <- s0 match {
|
||||
case Some(a) => a.pure[ConnectionIO]
|
||||
case None =>
|
||||
insert.attemptSql *> withoutTransaction(read.unique)
|
||||
}
|
||||
} yield s
|
||||
|
||||
// https://tpolecat.github.io/doobie/docs/18-FAQ.html#how-do-i-run-something-outside-of-a-transaction-
|
||||
/** Take a program `p` and return an equivalent one that first commits any ongoing
|
||||
* transaction, runs `p` without transaction handling, then starts a new transaction.
|
||||
*/
|
||||
private def withoutTransaction[A](p: ConnectionIO[A]): ConnectionIO[A] =
|
||||
FC.setAutoCommit(true).bracket(_ => p)(_ => FC.setAutoCommit(false))
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user