diff --git a/modules/common/src/main/scala/docspell/common/Pools.scala b/modules/common/src/main/scala/docspell/common/Pools.scala new file mode 100644 index 00000000..fd30b9fb --- /dev/null +++ b/modules/common/src/main/scala/docspell/common/Pools.scala @@ -0,0 +1,13 @@ +package docspell.common + +import cats.effect._ +import scala.concurrent.ExecutionContext + +/** Captures thread pools to use in an application. + */ +case class Pools( + connectEC: ExecutionContext, + httpClientEC: ExecutionContext, + blocker: Blocker, + restEC: ExecutionContext +) diff --git a/modules/common/src/main/scala/docspell/common/ThreadFactories.scala b/modules/common/src/main/scala/docspell/common/ThreadFactories.scala index 57f225c5..e3e4cc9d 100644 --- a/modules/common/src/main/scala/docspell/common/ThreadFactories.scala +++ b/modules/common/src/main/scala/docspell/common/ThreadFactories.scala @@ -4,6 +4,9 @@ import java.util.concurrent.atomic.AtomicLong import java.util.concurrent.{Executors, ThreadFactory} import cats.effect._ import scala.concurrent._ +import java.util.concurrent.ForkJoinPool +import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory +import java.util.concurrent.ForkJoinWorkerThread object ThreadFactories { @@ -19,6 +22,18 @@ object ThreadFactories { } } + def ofNameFJ(prefix: String): ForkJoinWorkerThreadFactory = + new ForkJoinWorkerThreadFactory { + val tf = ForkJoinPool.defaultForkJoinWorkerThreadFactory + val counter = new AtomicLong(0) + + def newThread(pool: ForkJoinPool): ForkJoinWorkerThread = { + val t = tf.newThread(pool) + t.setName(s"$prefix-${counter.getAndIncrement()}") + t + } + } + def executorResource[F[_]: Sync]( c: => ExecutionContextExecutorService ): Resource[F, ExecutionContextExecutorService] = @@ -38,4 +53,19 @@ object ThreadFactories { executorResource( ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(n, tf)) ) + + def workSteal[F[_]: Sync]( + n: Int, + tf: ForkJoinWorkerThreadFactory + ): Resource[F, ExecutionContextExecutorService] = + executorResource( + ExecutionContext.fromExecutorService( + new ForkJoinPool(n, tf, null, true) + ) + ) + + def workSteal[F[_]: Sync]( + tf: ForkJoinWorkerThreadFactory + ): Resource[F, ExecutionContextExecutorService] = + workSteal[F](Runtime.getRuntime().availableProcessors() + 1, tf) } diff --git a/modules/joex/src/main/scala/docspell/joex/JoexServer.scala b/modules/joex/src/main/scala/docspell/joex/JoexServer.scala index c5966549..58884c9e 100644 --- a/modules/joex/src/main/scala/docspell/joex/JoexServer.scala +++ b/modules/joex/src/main/scala/docspell/joex/JoexServer.scala @@ -2,6 +2,7 @@ package docspell.joex import cats.effect._ import cats.effect.concurrent.Ref +import docspell.common.Pools import docspell.joex.routes._ import org.http4s.server.blaze.BlazeServerBuilder import org.http4s.implicits._ @@ -11,8 +12,6 @@ import org.http4s.HttpApp import org.http4s.server.middleware.Logger import org.http4s.server.Router -import scala.concurrent.ExecutionContext - object JoexServer { private case class App[F[_]]( @@ -23,15 +22,14 @@ object JoexServer { def stream[F[_]: ConcurrentEffect: ContextShift]( cfg: Config, - connectEC: ExecutionContext, - clientEC: ExecutionContext, - blocker: Blocker + pools: Pools )(implicit T: Timer[F]): Stream[F, Nothing] = { val app = for { signal <- Resource.liftF(SignallingRef[F, Boolean](false)) exitCode <- Resource.liftF(Ref[F].of(ExitCode.Success)) - joexApp <- JoexAppImpl.create[F](cfg, signal, connectEC, clientEC, blocker) + joexApp <- JoexAppImpl + .create[F](cfg, signal, pools.connectEC, pools.httpClientEC, pools.blocker) httpApp = Router( "/api/info" -> InfoRoutes(), @@ -46,7 +44,7 @@ object JoexServer { Stream .resource(app) .flatMap(app => - BlazeServerBuilder[F] + BlazeServerBuilder[F](pools.restEC) .bindHttp(cfg.bind.port, cfg.bind.address) .withHttpApp(app.httpApp) .withoutBanner diff --git a/modules/joex/src/main/scala/docspell/joex/Main.scala b/modules/joex/src/main/scala/docspell/joex/Main.scala index e4f234b6..2a324a3d 100644 --- a/modules/joex/src/main/scala/docspell/joex/Main.scala +++ b/modules/joex/src/main/scala/docspell/joex/Main.scala @@ -4,9 +4,8 @@ import cats.effect.{Blocker, ExitCode, IO, IOApp} import cats.implicits._ import java.nio.file.{Files, Paths} -import scala.concurrent.ExecutionContext -import docspell.common.{Banner, ThreadFactories} +import docspell.common.{Banner, Pools, ThreadFactories} import org.log4s._ object Main extends IOApp { @@ -16,6 +15,8 @@ object Main extends IOApp { ThreadFactories.cached[IO](ThreadFactories.ofName("docspell-joex-blocking")) val connectEC = ThreadFactories.fixed[IO](5, ThreadFactories.ofName("docspell-joex-dbconnect")) + val restserverEC = + ThreadFactories.workSteal[IO](ThreadFactories.ofNameFJ("docspell-joex-server")) def run(args: List[String]) = { args match { @@ -52,19 +53,14 @@ object Main extends IOApp { cec <- connectEC bec <- blockingEC blocker = Blocker.liftExecutorService(bec) - } yield Pools(cec, bec, blocker) + rec <- restserverEC + } yield Pools(cec, bec, blocker, rec) pools.use(p => JoexServer - .stream[IO](cfg, p.connectEC, p.clientEC, p.blocker) + .stream[IO](cfg, p) .compile .drain .as(ExitCode.Success) ) } - - case class Pools( - connectEC: ExecutionContext, - clientEC: ExecutionContext, - blocker: Blocker - ) } diff --git a/modules/restserver/src/main/scala/docspell/restserver/Main.scala b/modules/restserver/src/main/scala/docspell/restserver/Main.scala index 99ed9bb3..eceeba3e 100644 --- a/modules/restserver/src/main/scala/docspell/restserver/Main.scala +++ b/modules/restserver/src/main/scala/docspell/restserver/Main.scala @@ -3,10 +3,9 @@ package docspell.restserver import cats.effect._ import cats.implicits._ -import scala.concurrent.ExecutionContext import java.nio.file.{Files, Paths} -import docspell.common.{Banner, ThreadFactories} +import docspell.common.{Banner, Pools, ThreadFactories} import org.log4s._ object Main extends IOApp { @@ -16,6 +15,8 @@ object Main extends IOApp { ThreadFactories.cached[IO](ThreadFactories.ofName("docspell-restserver-blocking")) val connectEC = ThreadFactories.fixed[IO](5, ThreadFactories.ofName("docspell-dbconnect")) + val restserverEC = + ThreadFactories.workSteal[IO](ThreadFactories.ofNameFJ("docspell-restserver")) def run(args: List[String]) = { args match { @@ -51,21 +52,16 @@ object Main extends IOApp { cec <- connectEC bec <- blockingEC blocker = Blocker.liftExecutorService(bec) - } yield Pools(cec, bec, blocker) + rec <- restserverEC + } yield Pools(cec, bec, blocker, rec) logger.info(s"\n${banner.render("***>")}") pools.use(p => RestServer - .stream[IO](cfg, p.connectEC, p.clientEC, p.blocker) + .stream[IO](cfg, p) .compile .drain .as(ExitCode.Success) ) } - - case class Pools( - connectEC: ExecutionContext, - clientEC: ExecutionContext, - blocker: Blocker - ) } diff --git a/modules/restserver/src/main/scala/docspell/restserver/RestServer.scala b/modules/restserver/src/main/scala/docspell/restserver/RestServer.scala index 53ee45e3..2abbf53d 100644 --- a/modules/restserver/src/main/scala/docspell/restserver/RestServer.scala +++ b/modules/restserver/src/main/scala/docspell/restserver/RestServer.scala @@ -2,6 +2,7 @@ package docspell.restserver import cats.effect._ import cats.data.{Kleisli, OptionT} +import docspell.common.Pools import docspell.backend.auth.AuthToken import docspell.restserver.routes._ import docspell.restserver.webapp._ @@ -13,20 +14,17 @@ import org.http4s.server.Router import org.http4s.server.blaze.BlazeServerBuilder import org.http4s.server.middleware.Logger -import scala.concurrent.ExecutionContext - object RestServer { def stream[F[_]: ConcurrentEffect]( cfg: Config, - connectEC: ExecutionContext, - httpClientEc: ExecutionContext, - blocker: Blocker + pools: Pools )(implicit T: Timer[F], CS: ContextShift[F]): Stream[F, Nothing] = { - val templates = TemplateRoutes[F](blocker, cfg) + val templates = TemplateRoutes[F](pools.blocker, cfg) val app = for { - restApp <- RestAppImpl.create[F](cfg, connectEC, httpClientEc, blocker) + restApp <- RestAppImpl + .create[F](cfg, pools.connectEC, pools.httpClientEC, pools.blocker) httpApp = Router( "/api/info" -> routes.InfoRoutes(), "/api/v1/open/" -> openRoutes(cfg, restApp), @@ -34,7 +32,7 @@ object RestServer { securedRoutes(cfg, restApp, token) }, "/api/doc" -> templates.doc, - "/app/assets" -> WebjarRoutes.appRoutes[F](blocker), + "/app/assets" -> WebjarRoutes.appRoutes[F](pools.blocker), "/app" -> templates.app, "/" -> redirectTo("/app") ).orNotFound @@ -46,7 +44,7 @@ object RestServer { Stream .resource(app) .flatMap(httpApp => - BlazeServerBuilder[F] + BlazeServerBuilder[F](pools.restEC) .bindHttp(cfg.bind.port, cfg.bind.address) .withHttpApp(httpApp) .withoutBanner diff --git a/project/Dependencies.scala b/project/Dependencies.scala index aff9894a..7a38ea5e 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -16,7 +16,7 @@ object Dependencies { val FlywayVersion = "6.4.0" val Fs2Version = "2.3.0" val H2Version = "1.4.200" - val Http4sVersion = "0.21.3" + val Http4sVersion = "0.21.4" val Icu4jVersion = "67.1" val JsoupVersion = "1.13.1" val KindProjectorVersion = "0.10.3"