mirror of
https://github.com/TheAnachronism/docspell.git
synced 2025-02-15 20:33:26 +00:00
parent
2ce6536d0b
commit
a9b0c0e086
@ -0,0 +1,41 @@
|
||||
/*
|
||||
* Copyright 2020 Eike K. & Contributors
|
||||
*
|
||||
* SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
*/
|
||||
|
||||
package docspell.common.util
|
||||
|
||||
import cats.effect._
|
||||
import cats.syntax.all._
|
||||
import fs2._
|
||||
import fs2.concurrent.{Signal, SignallingRef}
|
||||
|
||||
object ResourceUse {
|
||||
def apply[F[_]: Concurrent, A](
|
||||
resource: Resource[F, A]
|
||||
): Implicits.UseSyntax[F, A] =
|
||||
new Implicits.UseSyntax(resource)
|
||||
|
||||
object Implicits {
|
||||
implicit final class UseSyntax[F[_]: Concurrent, A](resource: Resource[F, A]) {
|
||||
|
||||
/** Evaluates `resource` endlessly or until the signal turns `true`. */
|
||||
def useUntil(
|
||||
signal: Signal[F, Boolean],
|
||||
returnValue: Ref[F, ExitCode]
|
||||
): F[ExitCode] = {
|
||||
val server = Stream.resource(resource)
|
||||
val blockUntilTrue = signal.discrete.takeWhile(_ == false).drain
|
||||
val exit = fs2.Stream.eval(returnValue.get)
|
||||
(server *> (blockUntilTrue ++ exit)).compile.lastOrError
|
||||
}
|
||||
|
||||
def useForever(implicit ev: Async[F]): F[ExitCode] = for {
|
||||
termSignal <- SignallingRef.of[F, Boolean](false)
|
||||
exitValue <- Ref.of(ExitCode.Success)
|
||||
rc <- useUntil(termSignal, exitValue)
|
||||
} yield rc
|
||||
}
|
||||
}
|
||||
}
|
@ -14,6 +14,7 @@ import fs2.io.net.Network
|
||||
|
||||
import docspell.backend.msg.Topics
|
||||
import docspell.common.Pools
|
||||
import docspell.common.util.ResourceUse.Implicits._
|
||||
import docspell.joex.routes._
|
||||
import docspell.pubsub.naive.NaivePubSub
|
||||
import docspell.store.Store
|
||||
@ -74,15 +75,14 @@ object JoexServer {
|
||||
|
||||
Stream
|
||||
.resource(app)
|
||||
.flatMap { app =>
|
||||
Stream.resource {
|
||||
EmberServerBuilder
|
||||
.default[F]
|
||||
.withHost(cfg.bind.address)
|
||||
.withPort(cfg.bind.port)
|
||||
.withHttpApp(app.httpApp)
|
||||
.build
|
||||
}
|
||||
.evalMap { app =>
|
||||
EmberServerBuilder
|
||||
.default[F]
|
||||
.withHost(cfg.bind.address)
|
||||
.withPort(cfg.bind.port)
|
||||
.withHttpApp(app.httpApp)
|
||||
.build
|
||||
.useUntil(app.termSig, app.exitRef)
|
||||
}
|
||||
}.drain
|
||||
}
|
||||
|
@ -58,24 +58,16 @@ object RestServer {
|
||||
Stream(
|
||||
restApp.subscriptions,
|
||||
restApp.eventConsume(maxConcurrent = 2),
|
||||
Stream.resource {
|
||||
if (cfg.serverOptions.enableHttp2)
|
||||
EmberServerBuilder
|
||||
.default[F]
|
||||
.withHost(cfg.bind.address)
|
||||
.withPort(cfg.bind.port)
|
||||
.withMaxConnections(cfg.serverOptions.maxConnections)
|
||||
.withHttpWebSocketApp(createHttpApp(setting, pubSub, restApp))
|
||||
.withHttp2
|
||||
.build
|
||||
else
|
||||
EmberServerBuilder
|
||||
.default[F]
|
||||
.withHost(cfg.bind.address)
|
||||
.withPort(cfg.bind.port)
|
||||
.withMaxConnections(cfg.serverOptions.maxConnections)
|
||||
.withHttpWebSocketApp(createHttpApp(setting, pubSub, restApp))
|
||||
.build
|
||||
Stream.eval {
|
||||
EmberServerBuilder
|
||||
.default[F]
|
||||
.withHost(cfg.bind.address)
|
||||
.withPort(cfg.bind.port)
|
||||
.withMaxConnections(cfg.serverOptions.maxConnections)
|
||||
.withHttpWebSocketApp(createHttpApp(setting, pubSub, restApp))
|
||||
.toggleHttp2(cfg.serverOptions.enableHttp2)
|
||||
.build
|
||||
.useForever
|
||||
}
|
||||
)
|
||||
}
|
||||
@ -164,4 +156,9 @@ object RestServer {
|
||||
).pure[F]
|
||||
}
|
||||
}
|
||||
|
||||
implicit final class EmberServerBuilderExt[F[_]](self: EmberServerBuilder[F]) {
|
||||
def toggleHttp2(flag: Boolean) =
|
||||
if (flag) self.withHttp2 else self.withoutHttp2
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user