diff --git a/build.sbt b/build.sbt index 5ebd8526..53a3c886 100644 --- a/build.sbt +++ b/build.sbt @@ -260,7 +260,8 @@ val joexapi = project.in(file("modules/joexapi")). settings( name := "docspell-joexapi", libraryDependencies ++= - Dependencies.circe, + Dependencies.circe ++ + Dependencies.http4sClient, openapiTargetLanguage := Language.Scala, openapiPackage := Pkg("docspell.joexapi.model"), openapiSpec := (Compile/resourceDirectory).value/"joex-openapi.yml" @@ -302,7 +303,7 @@ val backend = project.in(file("modules/backend")). Dependencies.bcrypt ++ Dependencies.http4sClient ++ Dependencies.emil - ).dependsOn(store) + ).dependsOn(store, joexapi) val webapp = project.in(file("modules/webapp")). disablePlugins(RevolverPlugin). diff --git a/modules/backend/src/main/scala/docspell/backend/BackendApp.scala b/modules/backend/src/main/scala/docspell/backend/BackendApp.scala index 18e7ef37..39b1e0b7 100644 --- a/modules/backend/src/main/scala/docspell/backend/BackendApp.scala +++ b/modules/backend/src/main/scala/docspell/backend/BackendApp.scala @@ -25,6 +25,7 @@ trait BackendApp[F[_]] { def job: OJob[F] def item: OItem[F] def mail: OMail[F] + def joex: OJoex[F] } object BackendApp { @@ -44,9 +45,10 @@ object BackendApp { tagImpl <- OTag[F](store) equipImpl <- OEquipment[F](store) orgImpl <- OOrganization(store) - uploadImpl <- OUpload(store, queue, cfg, httpClientEc) + joexImpl <- OJoex.create(httpClientEc, store) + uploadImpl <- OUpload(store, queue, cfg, joexImpl) nodeImpl <- ONode(store) - jobImpl <- OJob(store, httpClientEc) + jobImpl <- OJob(store, joexImpl) itemImpl <- OItem(store) mailImpl <- OMail(store, JavaMailEmil(blocker)) } yield new BackendApp[F] { @@ -62,6 +64,7 @@ object BackendApp { val job = jobImpl val item = itemImpl val mail = mailImpl + val joex = joexImpl } def apply[F[_]: ConcurrentEffect: ContextShift]( diff --git a/modules/backend/src/main/scala/docspell/backend/ops/OJob.scala b/modules/backend/src/main/scala/docspell/backend/ops/OJob.scala index 98c675c2..21cdfb1e 100644 --- a/modules/backend/src/main/scala/docspell/backend/ops/OJob.scala +++ b/modules/backend/src/main/scala/docspell/backend/ops/OJob.scala @@ -1,15 +1,13 @@ package docspell.backend.ops import cats.implicits._ -import cats.effect.{ConcurrentEffect, Resource} +import cats.effect._ import docspell.backend.ops.OJob.{CollectiveQueueState, JobCancelResult} import docspell.common.{Ident, JobState} import docspell.store.Store import docspell.store.queries.QJob import docspell.store.records.{RJob, RJobLog} -import scala.concurrent.ExecutionContext - trait OJob[F[_]] { def queueState(collective: Ident, maxResults: Int): F[CollectiveQueueState] @@ -36,9 +34,9 @@ object OJob { jobs.filter(_.job.state == JobState.Running) } - def apply[F[_]: ConcurrentEffect]( + def apply[F[_]: Sync]( store: Store[F], - clientEC: ExecutionContext + joex: OJoex[F] ): Resource[F, OJob[F]] = Resource.pure[F, OJob[F]](new OJob[F] { @@ -70,8 +68,7 @@ object OJob { } def tryCancel(job: RJob, worker: Ident): F[JobCancelResult] = - OJoex - .cancelJob(job.id, worker, store, clientEC) + joex.cancelJob(job.id, worker) .map(flag => if (flag) JobCancelResult.CancelRequested else JobCancelResult.JobNotFound) for { diff --git a/modules/backend/src/main/scala/docspell/backend/ops/OJoex.scala b/modules/backend/src/main/scala/docspell/backend/ops/OJoex.scala index 151b8485..e9283dc4 100644 --- a/modules/backend/src/main/scala/docspell/backend/ops/OJoex.scala +++ b/modules/backend/src/main/scala/docspell/backend/ops/OJoex.scala @@ -1,62 +1,40 @@ package docspell.backend.ops import cats.implicits._ -import cats.effect.ConcurrentEffect +import cats.effect._ import docspell.common.{Ident, NodeType} +import docspell.joexapi.client.JoexClient import docspell.store.Store import docspell.store.records.RNode -import org.http4s.client.blaze.BlazeClientBuilder -import org.http4s.Method._ -import org.http4s.{Request, Uri} import scala.concurrent.ExecutionContext -import org.log4s._ + +trait OJoex[F[_]] { + + def notifyAllNodes: F[Unit] + + def cancelJob(job: Ident, worker: Ident): F[Boolean] + +} object OJoex { - private[this] val logger = getLogger - def notifyAll[F[_]: ConcurrentEffect]( - store: Store[F], - clientExecutionContext: ExecutionContext - ): F[Unit] = - for { - nodes <- store.transact(RNode.findAll(NodeType.Joex)) - _ <- nodes.toList.traverse(notifyJoex[F](clientExecutionContext)) - } yield () + def apply[F[_]: Sync](client: JoexClient[F], store: Store[F]): Resource[F, OJoex[F]] = + Resource.pure[F, OJoex[F]](new OJoex[F] { + def notifyAllNodes: F[Unit] = + for { + nodes <- store.transact(RNode.findAll(NodeType.Joex)) + _ <- nodes.toList.traverse(n => client.notifyJoexIgnoreErrors(n.url)) + } yield () - def cancelJob[F[_]: ConcurrentEffect]( - jobId: Ident, - worker: Ident, - store: Store[F], - clientEc: ExecutionContext - ): F[Boolean] = - for { - node <- store.transact(RNode.findById(worker)) - cancel <- node.traverse(joexCancel(clientEc)(_, jobId)) - } yield cancel.getOrElse(false) + def cancelJob(job: Ident, worker: Ident): F[Boolean] = + for { + node <- store.transact(RNode.findById(worker)) + cancel <- node.traverse(n => client.cancelJob(n.url, job)) + } yield cancel.isDefined + }) - private def joexCancel[F[_]: ConcurrentEffect]( - ec: ExecutionContext - )(node: RNode, job: Ident): F[Boolean] = { - val notifyUrl = node.url / "api" / "v1" / "job" / job.id / "cancel" - BlazeClientBuilder[F](ec).resource.use { client => - val req = Request[F](POST, Uri.unsafeFromString(notifyUrl.asString)) - client.expect[String](req).map(_ => true) - } - } + def create[F[_]: ConcurrentEffect](ec: ExecutionContext, store: Store[F]): Resource[F, OJoex[F]] = + JoexClient.resource(ec).flatMap(client => apply(client, store)) - private def notifyJoex[F[_]: ConcurrentEffect](ec: ExecutionContext)(node: RNode): F[Unit] = { - val notifyUrl = node.url / "api" / "v1" / "notify" - val execute = BlazeClientBuilder[F](ec).resource.use { client => - val req = Request[F](POST, Uri.unsafeFromString(notifyUrl.asString)) - client.expect[String](req).map(_ => ()) - } - execute.attempt.map { - case Right(_) => - () - case Left(_) => - logger.warn(s"Notifying Joex instance '${node.id.id}/${node.url.asString}' failed.") - () - } - } } diff --git a/modules/backend/src/main/scala/docspell/backend/ops/OUpload.scala b/modules/backend/src/main/scala/docspell/backend/ops/OUpload.scala index 37cf4a2d..1f20ff30 100644 --- a/modules/backend/src/main/scala/docspell/backend/ops/OUpload.scala +++ b/modules/backend/src/main/scala/docspell/backend/ops/OUpload.scala @@ -2,7 +2,7 @@ package docspell.backend.ops import bitpeace.MimetypeHint import cats.implicits._ -import cats.effect.{ConcurrentEffect, Effect, Resource} +import cats.effect._ import docspell.backend.Config import fs2.Stream import docspell.common._ @@ -12,8 +12,6 @@ import docspell.store.queue.JobQueue import docspell.store.records.{RCollective, RJob, RSource} import org.log4s._ -import scala.concurrent.ExecutionContext - trait OUpload[F[_]] { def submit(data: OUpload.UploadData[F], account: AccountId): F[OUpload.UploadResult] @@ -51,11 +49,11 @@ object OUpload { case object NoSource extends UploadResult } - def apply[F[_]: ConcurrentEffect]( + def apply[F[_]: Sync]( store: Store[F], queue: JobQueue[F], cfg: Config, - httpClientEC: ExecutionContext + joex: OJoex[F] ): Resource[F, OUpload[F]] = Resource.pure[F, OUpload[F]](new OUpload[F] { @@ -92,7 +90,7 @@ object OUpload { for { _ <- logger.fdebug(s"Storing jobs: $jobs") _ <- queue.insertAll(jobs) - _ <- OJoex.notifyAll(store, httpClientEC) + _ <- joex.notifyAllNodes } yield UploadResult.Success private def saveFile(file: File[F]): F[Option[ProcessItemArgs.File]] = @@ -109,7 +107,7 @@ object OUpload { }, id => Some(ProcessItemArgs.File(file.name, id)))) private def checkFileList(files: Seq[ProcessItemArgs.File]): F[Either[UploadResult, Unit]] = - Effect[F].pure(if (files.isEmpty) Left(UploadResult.NoFiles) else Right(())) + Sync[F].pure(if (files.isEmpty) Left(UploadResult.NoFiles) else Right(())) private def makeJobs( args: Vector[ProcessItemArgs], diff --git a/modules/joexapi/src/main/scala/docspell/joexapi/client/JoexClient.scala b/modules/joexapi/src/main/scala/docspell/joexapi/client/JoexClient.scala new file mode 100644 index 00000000..964d4691 --- /dev/null +++ b/modules/joexapi/src/main/scala/docspell/joexapi/client/JoexClient.scala @@ -0,0 +1,55 @@ +package docspell.joexapi.client + +import cats.implicits._ +import cats.effect._ +import docspell.common.{Ident, LenientUri} +import org.http4s.{Method, Request, Uri} +import org.http4s.client.Client +import org.http4s.client.blaze.BlazeClientBuilder +import scala.concurrent.ExecutionContext + +import org.log4s.getLogger + +trait JoexClient[F[_]] { + + def notifyJoex(base: LenientUri): F[Unit] + + def notifyJoexIgnoreErrors(base: LenientUri): F[Unit] + + def cancelJob(base: LenientUri, job: Ident): F[Unit] + +} + +object JoexClient { + + private[this] val logger = getLogger + + def apply[F[_]: Sync](client: Client[F]): JoexClient[F] = + new JoexClient[F] { + def notifyJoex(base: LenientUri): F[Unit] = { + val notifyUrl = base / "api" / "v1" / "notify" + val req = Request[F](Method.POST, uri(notifyUrl)) + client.expect[String](req).map(_ => ()) + } + + def notifyJoexIgnoreErrors(base: LenientUri): F[Unit] = + notifyJoex(base).attempt.map { + case Right(()) => () + case Left(ex) => + logger.warn(s"Notifying Joex instance '${base.asString}' failed: ${ex.getMessage}") + () + } + + def cancelJob(base: LenientUri, job: Ident): F[Unit] = { + val cancelUrl = base / "api" / "v1" / "job" / job.id / "cancel" + val req = Request[F](Method.POST, uri(cancelUrl)) + client.expect[String](req).map(_ => ()) + } + + private def uri(u: LenientUri): Uri = + Uri.unsafeFromString(u.asString) + } + + def resource[F[_]: ConcurrentEffect](ec: ExecutionContext): Resource[F, JoexClient[F]] = + BlazeClientBuilder[F](ec).resource.map(apply[F]) +} \ No newline at end of file