mirror of
				https://github.com/TheAnachronism/docspell.git
				synced 2025-10-31 09:30:12 +00:00 
			
		
		
		
	Properly initialize thread pools
This commit is contained in:
		| @@ -2,6 +2,8 @@ package docspell.common | ||||
|  | ||||
| import java.util.concurrent.atomic.AtomicLong | ||||
| import java.util.concurrent.{Executors, ThreadFactory} | ||||
| import cats.effect._ | ||||
| import scala.concurrent._ | ||||
|  | ||||
| object ThreadFactories { | ||||
|  | ||||
| @@ -17,4 +19,16 @@ object ThreadFactories { | ||||
|       } | ||||
|     } | ||||
|  | ||||
|   def executorResource[F[_]: Sync]( | ||||
|       c: => ExecutionContextExecutorService | ||||
|   ): Resource[F, ExecutionContextExecutorService] = | ||||
|     Resource.make(Sync[F].delay(c))(ec => Sync[F].delay(ec.shutdown)) | ||||
|  | ||||
|   def cached[F[_]: Sync](tf: ThreadFactory): Resource[F, ExecutionContextExecutorService] = | ||||
|     executorResource( | ||||
|       ExecutionContext.fromExecutorService(Executors.newCachedThreadPool(tf)) | ||||
|     ) | ||||
|  | ||||
|   def fixed[F[_]: Sync](n: Int, tf: ThreadFactory): Resource[F, ExecutionContextExecutorService] = | ||||
|     executorResource(ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(n, tf))) | ||||
| } | ||||
|   | ||||
| @@ -3,9 +3,8 @@ package docspell.joex | ||||
| import cats.effect.{Blocker, ExitCode, IO, IOApp} | ||||
| import cats.implicits._ | ||||
|  | ||||
| import scala.concurrent.ExecutionContext | ||||
| import java.util.concurrent.Executors | ||||
| import java.nio.file.{Files, Paths} | ||||
| import scala.concurrent.ExecutionContext | ||||
|  | ||||
| import docspell.common.{Banner, ThreadFactories} | ||||
| import org.log4s._ | ||||
| @@ -13,13 +12,8 @@ import org.log4s._ | ||||
| object Main extends IOApp { | ||||
|   private[this] val logger = getLogger | ||||
|  | ||||
|   val blockingEC: ExecutionContext = ExecutionContext.fromExecutor( | ||||
|     Executors.newCachedThreadPool(ThreadFactories.ofName("docspell-joex-blocking")) | ||||
|   ) | ||||
|   val blocker = Blocker.liftExecutionContext(blockingEC) | ||||
|   val connectEC: ExecutionContext = ExecutionContext.fromExecutorService( | ||||
|     Executors.newFixedThreadPool(5, ThreadFactories.ofName("docspell-joex-dbconnect")) | ||||
|   ) | ||||
|   val blockingEC = ThreadFactories.cached[IO](ThreadFactories.ofName("docspell-joex-blocking")) | ||||
|   val connectEC = ThreadFactories.fixed[IO](5, ThreadFactories.ofName("docspell-joex-dbconnect")) | ||||
|  | ||||
|   def run(args: List[String]) = { | ||||
|     args match { | ||||
| @@ -52,6 +46,15 @@ object Main extends IOApp { | ||||
|       cfg.baseUrl | ||||
|     ) | ||||
|     logger.info(s"\n${banner.render("***>")}") | ||||
|     JoexServer.stream[IO](cfg, connectEC, blockingEC, blocker).compile.drain.as(ExitCode.Success) | ||||
|     val pools = for { | ||||
|       cec <- connectEC | ||||
|       bec <- blockingEC | ||||
|       blocker = Blocker.liftExecutorService(bec) | ||||
|     } yield Pools(cec, bec, blocker) | ||||
|     pools.use(p => | ||||
|       JoexServer.stream[IO](cfg, p.connectEC, p.clientEC, p.blocker).compile.drain.as(ExitCode.Success) | ||||
|     ) | ||||
|   } | ||||
|  | ||||
|   case class Pools(connectEC: ExecutionContext, clientEC: ExecutionContext, blocker: Blocker) | ||||
| } | ||||
|   | ||||
| @@ -4,7 +4,6 @@ import cats.effect._ | ||||
| import cats.implicits._ | ||||
|  | ||||
| import scala.concurrent.ExecutionContext | ||||
| import java.util.concurrent.Executors | ||||
| import java.nio.file.{Files, Paths} | ||||
|  | ||||
| import docspell.common.{Banner, ThreadFactories} | ||||
| @@ -13,14 +12,8 @@ import org.log4s._ | ||||
| object Main extends IOApp { | ||||
|   private[this] val logger = getLogger | ||||
|  | ||||
|   val blockingEc: ExecutionContext = ExecutionContext.fromExecutor( | ||||
|     Executors.newCachedThreadPool(ThreadFactories.ofName("docspell-restserver-blocking")) | ||||
|   ) | ||||
|   val blocker = Blocker.liftExecutionContext(blockingEc) | ||||
|  | ||||
|   val connectEC: ExecutionContext = ExecutionContext.fromExecutorService( | ||||
|     Executors.newFixedThreadPool(5, ThreadFactories.ofName("docspell-dbconnect")) | ||||
|   ) | ||||
|   val blockingEC = ThreadFactories.cached[IO](ThreadFactories.ofName("docspell-restserver-blocking")) | ||||
|   val connectEC = ThreadFactories.fixed[IO](5, ThreadFactories.ofName("docspell-dbconnect")) | ||||
|  | ||||
|   def run(args: List[String]) = { | ||||
|     args match { | ||||
| @@ -52,7 +45,17 @@ object Main extends IOApp { | ||||
|       cfg.appId, | ||||
|       cfg.baseUrl | ||||
|     ) | ||||
|     val pools = for { | ||||
|       cec <- connectEC | ||||
|       bec <- blockingEC | ||||
|       blocker = Blocker.liftExecutorService(bec) | ||||
|     } yield Pools(cec, bec, blocker) | ||||
|  | ||||
|     logger.info(s"\n${banner.render("***>")}") | ||||
|     RestServer.stream[IO](cfg, connectEC, blockingEc, blocker).compile.drain.as(ExitCode.Success) | ||||
|     pools.use(p => | ||||
|       RestServer.stream[IO](cfg, p.connectEC, p.clientEC, p.blocker).compile.drain.as(ExitCode.Success) | ||||
|     ) | ||||
|   } | ||||
|  | ||||
|   case class Pools(connectEC: ExecutionContext, clientEC: ExecutionContext, blocker: Blocker) | ||||
| } | ||||
|   | ||||
		Reference in New Issue
	
	Block a user