Create a simple client for joex in its api module

This client can be used within the backend app and later in other
modules. The `OJoex` object is replaced with a better implementation
where the http client is initialized once on app start.
This commit is contained in:
Eike Kettner 2020-03-03 23:07:49 +01:00
parent 42c59179b8
commit 9b28858d06
6 changed files with 97 additions and 65 deletions
build.sbt
modules
backend/src/main/scala/docspell/backend
joexapi/src/main/scala/docspell/joexapi/client

@ -260,7 +260,8 @@ val joexapi = project.in(file("modules/joexapi")).
settings( settings(
name := "docspell-joexapi", name := "docspell-joexapi",
libraryDependencies ++= libraryDependencies ++=
Dependencies.circe, Dependencies.circe ++
Dependencies.http4sClient,
openapiTargetLanguage := Language.Scala, openapiTargetLanguage := Language.Scala,
openapiPackage := Pkg("docspell.joexapi.model"), openapiPackage := Pkg("docspell.joexapi.model"),
openapiSpec := (Compile/resourceDirectory).value/"joex-openapi.yml" openapiSpec := (Compile/resourceDirectory).value/"joex-openapi.yml"
@ -302,7 +303,7 @@ val backend = project.in(file("modules/backend")).
Dependencies.bcrypt ++ Dependencies.bcrypt ++
Dependencies.http4sClient ++ Dependencies.http4sClient ++
Dependencies.emil Dependencies.emil
).dependsOn(store) ).dependsOn(store, joexapi)
val webapp = project.in(file("modules/webapp")). val webapp = project.in(file("modules/webapp")).
disablePlugins(RevolverPlugin). disablePlugins(RevolverPlugin).

@ -25,6 +25,7 @@ trait BackendApp[F[_]] {
def job: OJob[F] def job: OJob[F]
def item: OItem[F] def item: OItem[F]
def mail: OMail[F] def mail: OMail[F]
def joex: OJoex[F]
} }
object BackendApp { object BackendApp {
@ -44,9 +45,10 @@ object BackendApp {
tagImpl <- OTag[F](store) tagImpl <- OTag[F](store)
equipImpl <- OEquipment[F](store) equipImpl <- OEquipment[F](store)
orgImpl <- OOrganization(store) orgImpl <- OOrganization(store)
uploadImpl <- OUpload(store, queue, cfg, httpClientEc) joexImpl <- OJoex.create(httpClientEc, store)
uploadImpl <- OUpload(store, queue, cfg, joexImpl)
nodeImpl <- ONode(store) nodeImpl <- ONode(store)
jobImpl <- OJob(store, httpClientEc) jobImpl <- OJob(store, joexImpl)
itemImpl <- OItem(store) itemImpl <- OItem(store)
mailImpl <- OMail(store, JavaMailEmil(blocker)) mailImpl <- OMail(store, JavaMailEmil(blocker))
} yield new BackendApp[F] { } yield new BackendApp[F] {
@ -62,6 +64,7 @@ object BackendApp {
val job = jobImpl val job = jobImpl
val item = itemImpl val item = itemImpl
val mail = mailImpl val mail = mailImpl
val joex = joexImpl
} }
def apply[F[_]: ConcurrentEffect: ContextShift]( def apply[F[_]: ConcurrentEffect: ContextShift](

@ -1,15 +1,13 @@
package docspell.backend.ops package docspell.backend.ops
import cats.implicits._ import cats.implicits._
import cats.effect.{ConcurrentEffect, Resource} import cats.effect._
import docspell.backend.ops.OJob.{CollectiveQueueState, JobCancelResult} import docspell.backend.ops.OJob.{CollectiveQueueState, JobCancelResult}
import docspell.common.{Ident, JobState} import docspell.common.{Ident, JobState}
import docspell.store.Store import docspell.store.Store
import docspell.store.queries.QJob import docspell.store.queries.QJob
import docspell.store.records.{RJob, RJobLog} import docspell.store.records.{RJob, RJobLog}
import scala.concurrent.ExecutionContext
trait OJob[F[_]] { trait OJob[F[_]] {
def queueState(collective: Ident, maxResults: Int): F[CollectiveQueueState] def queueState(collective: Ident, maxResults: Int): F[CollectiveQueueState]
@ -36,9 +34,9 @@ object OJob {
jobs.filter(_.job.state == JobState.Running) jobs.filter(_.job.state == JobState.Running)
} }
def apply[F[_]: ConcurrentEffect]( def apply[F[_]: Sync](
store: Store[F], store: Store[F],
clientEC: ExecutionContext joex: OJoex[F]
): Resource[F, OJob[F]] = ): Resource[F, OJob[F]] =
Resource.pure[F, OJob[F]](new OJob[F] { Resource.pure[F, OJob[F]](new OJob[F] {
@ -70,8 +68,7 @@ object OJob {
} }
def tryCancel(job: RJob, worker: Ident): F[JobCancelResult] = def tryCancel(job: RJob, worker: Ident): F[JobCancelResult] =
OJoex joex.cancelJob(job.id, worker)
.cancelJob(job.id, worker, store, clientEC)
.map(flag => if (flag) JobCancelResult.CancelRequested else JobCancelResult.JobNotFound) .map(flag => if (flag) JobCancelResult.CancelRequested else JobCancelResult.JobNotFound)
for { for {

@ -1,62 +1,40 @@
package docspell.backend.ops package docspell.backend.ops
import cats.implicits._ import cats.implicits._
import cats.effect.ConcurrentEffect import cats.effect._
import docspell.common.{Ident, NodeType} import docspell.common.{Ident, NodeType}
import docspell.joexapi.client.JoexClient
import docspell.store.Store import docspell.store.Store
import docspell.store.records.RNode import docspell.store.records.RNode
import org.http4s.client.blaze.BlazeClientBuilder
import org.http4s.Method._
import org.http4s.{Request, Uri}
import scala.concurrent.ExecutionContext import scala.concurrent.ExecutionContext
import org.log4s._
trait OJoex[F[_]] {
def notifyAllNodes: F[Unit]
def cancelJob(job: Ident, worker: Ident): F[Boolean]
}
object OJoex { object OJoex {
private[this] val logger = getLogger
def notifyAll[F[_]: ConcurrentEffect]( def apply[F[_]: Sync](client: JoexClient[F], store: Store[F]): Resource[F, OJoex[F]] =
store: Store[F], Resource.pure[F, OJoex[F]](new OJoex[F] {
clientExecutionContext: ExecutionContext def notifyAllNodes: F[Unit] =
): F[Unit] = for {
for { nodes <- store.transact(RNode.findAll(NodeType.Joex))
nodes <- store.transact(RNode.findAll(NodeType.Joex)) _ <- nodes.toList.traverse(n => client.notifyJoexIgnoreErrors(n.url))
_ <- nodes.toList.traverse(notifyJoex[F](clientExecutionContext)) } yield ()
} yield ()
def cancelJob[F[_]: ConcurrentEffect]( def cancelJob(job: Ident, worker: Ident): F[Boolean] =
jobId: Ident, for {
worker: Ident, node <- store.transact(RNode.findById(worker))
store: Store[F], cancel <- node.traverse(n => client.cancelJob(n.url, job))
clientEc: ExecutionContext } yield cancel.isDefined
): F[Boolean] = })
for {
node <- store.transact(RNode.findById(worker))
cancel <- node.traverse(joexCancel(clientEc)(_, jobId))
} yield cancel.getOrElse(false)
private def joexCancel[F[_]: ConcurrentEffect]( def create[F[_]: ConcurrentEffect](ec: ExecutionContext, store: Store[F]): Resource[F, OJoex[F]] =
ec: ExecutionContext JoexClient.resource(ec).flatMap(client => apply(client, store))
)(node: RNode, job: Ident): F[Boolean] = {
val notifyUrl = node.url / "api" / "v1" / "job" / job.id / "cancel"
BlazeClientBuilder[F](ec).resource.use { client =>
val req = Request[F](POST, Uri.unsafeFromString(notifyUrl.asString))
client.expect[String](req).map(_ => true)
}
}
private def notifyJoex[F[_]: ConcurrentEffect](ec: ExecutionContext)(node: RNode): F[Unit] = {
val notifyUrl = node.url / "api" / "v1" / "notify"
val execute = BlazeClientBuilder[F](ec).resource.use { client =>
val req = Request[F](POST, Uri.unsafeFromString(notifyUrl.asString))
client.expect[String](req).map(_ => ())
}
execute.attempt.map {
case Right(_) =>
()
case Left(_) =>
logger.warn(s"Notifying Joex instance '${node.id.id}/${node.url.asString}' failed.")
()
}
}
} }

@ -2,7 +2,7 @@ package docspell.backend.ops
import bitpeace.MimetypeHint import bitpeace.MimetypeHint
import cats.implicits._ import cats.implicits._
import cats.effect.{ConcurrentEffect, Effect, Resource} import cats.effect._
import docspell.backend.Config import docspell.backend.Config
import fs2.Stream import fs2.Stream
import docspell.common._ import docspell.common._
@ -12,8 +12,6 @@ import docspell.store.queue.JobQueue
import docspell.store.records.{RCollective, RJob, RSource} import docspell.store.records.{RCollective, RJob, RSource}
import org.log4s._ import org.log4s._
import scala.concurrent.ExecutionContext
trait OUpload[F[_]] { trait OUpload[F[_]] {
def submit(data: OUpload.UploadData[F], account: AccountId): F[OUpload.UploadResult] def submit(data: OUpload.UploadData[F], account: AccountId): F[OUpload.UploadResult]
@ -51,11 +49,11 @@ object OUpload {
case object NoSource extends UploadResult case object NoSource extends UploadResult
} }
def apply[F[_]: ConcurrentEffect]( def apply[F[_]: Sync](
store: Store[F], store: Store[F],
queue: JobQueue[F], queue: JobQueue[F],
cfg: Config, cfg: Config,
httpClientEC: ExecutionContext joex: OJoex[F]
): Resource[F, OUpload[F]] = ): Resource[F, OUpload[F]] =
Resource.pure[F, OUpload[F]](new OUpload[F] { Resource.pure[F, OUpload[F]](new OUpload[F] {
@ -92,7 +90,7 @@ object OUpload {
for { for {
_ <- logger.fdebug(s"Storing jobs: $jobs") _ <- logger.fdebug(s"Storing jobs: $jobs")
_ <- queue.insertAll(jobs) _ <- queue.insertAll(jobs)
_ <- OJoex.notifyAll(store, httpClientEC) _ <- joex.notifyAllNodes
} yield UploadResult.Success } yield UploadResult.Success
private def saveFile(file: File[F]): F[Option[ProcessItemArgs.File]] = private def saveFile(file: File[F]): F[Option[ProcessItemArgs.File]] =
@ -109,7 +107,7 @@ object OUpload {
}, id => Some(ProcessItemArgs.File(file.name, id)))) }, id => Some(ProcessItemArgs.File(file.name, id))))
private def checkFileList(files: Seq[ProcessItemArgs.File]): F[Either[UploadResult, Unit]] = private def checkFileList(files: Seq[ProcessItemArgs.File]): F[Either[UploadResult, Unit]] =
Effect[F].pure(if (files.isEmpty) Left(UploadResult.NoFiles) else Right(())) Sync[F].pure(if (files.isEmpty) Left(UploadResult.NoFiles) else Right(()))
private def makeJobs( private def makeJobs(
args: Vector[ProcessItemArgs], args: Vector[ProcessItemArgs],

@ -0,0 +1,55 @@
package docspell.joexapi.client
import cats.implicits._
import cats.effect._
import docspell.common.{Ident, LenientUri}
import org.http4s.{Method, Request, Uri}
import org.http4s.client.Client
import org.http4s.client.blaze.BlazeClientBuilder
import scala.concurrent.ExecutionContext
import org.log4s.getLogger
trait JoexClient[F[_]] {
def notifyJoex(base: LenientUri): F[Unit]
def notifyJoexIgnoreErrors(base: LenientUri): F[Unit]
def cancelJob(base: LenientUri, job: Ident): F[Unit]
}
object JoexClient {
private[this] val logger = getLogger
def apply[F[_]: Sync](client: Client[F]): JoexClient[F] =
new JoexClient[F] {
def notifyJoex(base: LenientUri): F[Unit] = {
val notifyUrl = base / "api" / "v1" / "notify"
val req = Request[F](Method.POST, uri(notifyUrl))
client.expect[String](req).map(_ => ())
}
def notifyJoexIgnoreErrors(base: LenientUri): F[Unit] =
notifyJoex(base).attempt.map {
case Right(()) => ()
case Left(ex) =>
logger.warn(s"Notifying Joex instance '${base.asString}' failed: ${ex.getMessage}")
()
}
def cancelJob(base: LenientUri, job: Ident): F[Unit] = {
val cancelUrl = base / "api" / "v1" / "job" / job.id / "cancel"
val req = Request[F](Method.POST, uri(cancelUrl))
client.expect[String](req).map(_ => ())
}
private def uri(u: LenientUri): Uri =
Uri.unsafeFromString(u.asString)
}
def resource[F[_]: ConcurrentEffect](ec: ExecutionContext): Resource[F, JoexClient[F]] =
BlazeClientBuilder[F](ec).resource.map(apply[F])
}