diff --git a/modules/common/src/main/scala/docspell/common/ThreadFactories.scala b/modules/common/src/main/scala/docspell/common/ThreadFactories.scala index c1ab24df..4fb367bc 100644 --- a/modules/common/src/main/scala/docspell/common/ThreadFactories.scala +++ b/modules/common/src/main/scala/docspell/common/ThreadFactories.scala @@ -2,6 +2,8 @@ package docspell.common import java.util.concurrent.atomic.AtomicLong import java.util.concurrent.{Executors, ThreadFactory} +import cats.effect._ +import scala.concurrent._ object ThreadFactories { @@ -17,4 +19,16 @@ object ThreadFactories { } } + def executorResource[F[_]: Sync]( + c: => ExecutionContextExecutorService + ): Resource[F, ExecutionContextExecutorService] = + Resource.make(Sync[F].delay(c))(ec => Sync[F].delay(ec.shutdown)) + + def cached[F[_]: Sync](tf: ThreadFactory): Resource[F, ExecutionContextExecutorService] = + executorResource( + ExecutionContext.fromExecutorService(Executors.newCachedThreadPool(tf)) + ) + + def fixed[F[_]: Sync](n: Int, tf: ThreadFactory): Resource[F, ExecutionContextExecutorService] = + executorResource(ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(n, tf))) } diff --git a/modules/joex/src/main/scala/docspell/joex/Main.scala b/modules/joex/src/main/scala/docspell/joex/Main.scala index 4c1f7319..40bdb0b7 100644 --- a/modules/joex/src/main/scala/docspell/joex/Main.scala +++ b/modules/joex/src/main/scala/docspell/joex/Main.scala @@ -3,9 +3,8 @@ package docspell.joex import cats.effect.{Blocker, ExitCode, IO, IOApp} import cats.implicits._ -import scala.concurrent.ExecutionContext -import java.util.concurrent.Executors import java.nio.file.{Files, Paths} +import scala.concurrent.ExecutionContext import docspell.common.{Banner, ThreadFactories} import org.log4s._ @@ -13,13 +12,8 @@ import org.log4s._ object Main extends IOApp { private[this] val logger = getLogger - val blockingEC: ExecutionContext = ExecutionContext.fromExecutor( - Executors.newCachedThreadPool(ThreadFactories.ofName("docspell-joex-blocking")) - ) - val blocker = Blocker.liftExecutionContext(blockingEC) - val connectEC: ExecutionContext = ExecutionContext.fromExecutorService( - Executors.newFixedThreadPool(5, ThreadFactories.ofName("docspell-joex-dbconnect")) - ) + val blockingEC = ThreadFactories.cached[IO](ThreadFactories.ofName("docspell-joex-blocking")) + val connectEC = ThreadFactories.fixed[IO](5, ThreadFactories.ofName("docspell-joex-dbconnect")) def run(args: List[String]) = { args match { @@ -52,6 +46,15 @@ object Main extends IOApp { cfg.baseUrl ) logger.info(s"\n${banner.render("***>")}") - JoexServer.stream[IO](cfg, connectEC, blockingEC, blocker).compile.drain.as(ExitCode.Success) + val pools = for { + cec <- connectEC + bec <- blockingEC + blocker = Blocker.liftExecutorService(bec) + } yield Pools(cec, bec, blocker) + pools.use(p => + JoexServer.stream[IO](cfg, p.connectEC, p.clientEC, p.blocker).compile.drain.as(ExitCode.Success) + ) } + + case class Pools(connectEC: ExecutionContext, clientEC: ExecutionContext, blocker: Blocker) } diff --git a/modules/restserver/src/main/scala/docspell/restserver/Main.scala b/modules/restserver/src/main/scala/docspell/restserver/Main.scala index 1c9d3d45..d48ef0b6 100644 --- a/modules/restserver/src/main/scala/docspell/restserver/Main.scala +++ b/modules/restserver/src/main/scala/docspell/restserver/Main.scala @@ -4,7 +4,6 @@ import cats.effect._ import cats.implicits._ import scala.concurrent.ExecutionContext -import java.util.concurrent.Executors import java.nio.file.{Files, Paths} import docspell.common.{Banner, ThreadFactories} @@ -13,14 +12,8 @@ import org.log4s._ object Main extends IOApp { private[this] val logger = getLogger - val blockingEc: ExecutionContext = ExecutionContext.fromExecutor( - Executors.newCachedThreadPool(ThreadFactories.ofName("docspell-restserver-blocking")) - ) - val blocker = Blocker.liftExecutionContext(blockingEc) - - val connectEC: ExecutionContext = ExecutionContext.fromExecutorService( - Executors.newFixedThreadPool(5, ThreadFactories.ofName("docspell-dbconnect")) - ) + val blockingEC = ThreadFactories.cached[IO](ThreadFactories.ofName("docspell-restserver-blocking")) + val connectEC = ThreadFactories.fixed[IO](5, ThreadFactories.ofName("docspell-dbconnect")) def run(args: List[String]) = { args match { @@ -52,7 +45,17 @@ object Main extends IOApp { cfg.appId, cfg.baseUrl ) + val pools = for { + cec <- connectEC + bec <- blockingEC + blocker = Blocker.liftExecutorService(bec) + } yield Pools(cec, bec, blocker) + logger.info(s"\n${banner.render("***>")}") - RestServer.stream[IO](cfg, connectEC, blockingEc, blocker).compile.drain.as(ExitCode.Success) + pools.use(p => + RestServer.stream[IO](cfg, p.connectEC, p.clientEC, p.blocker).compile.drain.as(ExitCode.Success) + ) } + + case class Pools(connectEC: ExecutionContext, clientEC: ExecutionContext, blocker: Blocker) }