mirror of
https://github.com/TheAnachronism/docspell.git
synced 2025-06-22 02:18:26 +00:00
Hooking the new pubsub impl into the application
This commit is contained in:
@ -20,6 +20,7 @@ import docspell.joex.analysis.RegexNerFile
|
||||
import docspell.joex.hk.HouseKeepingConfig
|
||||
import docspell.joex.scheduler.{PeriodicSchedulerConfig, SchedulerConfig}
|
||||
import docspell.joex.updatecheck.UpdateCheckConfig
|
||||
import docspell.pubsub.naive.PubSubConfig
|
||||
import docspell.store.JdbcConfig
|
||||
|
||||
case class Config(
|
||||
@ -39,7 +40,11 @@ case class Config(
|
||||
mailDebug: Boolean,
|
||||
fullTextSearch: Config.FullTextSearch,
|
||||
updateCheck: UpdateCheckConfig
|
||||
)
|
||||
) {
|
||||
|
||||
def pubSubConfig: PubSubConfig =
|
||||
PubSubConfig(appId, baseUrl / "internal" / "pubsub", 100)
|
||||
}
|
||||
|
||||
object Config {
|
||||
case class Bind(address: String, port: Int)
|
||||
|
@ -6,14 +6,13 @@
|
||||
|
||||
package docspell.joex
|
||||
|
||||
import scala.concurrent.ExecutionContext
|
||||
|
||||
import cats.effect._
|
||||
import cats.implicits._
|
||||
import fs2.concurrent.SignallingRef
|
||||
|
||||
import docspell.analysis.TextAnalyser
|
||||
import docspell.backend.fulltext.CreateIndex
|
||||
import docspell.backend.msg.{CancelJob, Ping, Topics}
|
||||
import docspell.backend.ops._
|
||||
import docspell.common._
|
||||
import docspell.ftsclient.FtsClient
|
||||
@ -34,6 +33,7 @@ import docspell.joex.scanmailbox._
|
||||
import docspell.joex.scheduler._
|
||||
import docspell.joex.updatecheck._
|
||||
import docspell.joexapi.client.JoexClient
|
||||
import docspell.pubsub.api.{PubSub, PubSubT}
|
||||
import docspell.store.Store
|
||||
import docspell.store.queue._
|
||||
import docspell.store.records.{REmptyTrashSetting, RJobLog}
|
||||
@ -41,19 +41,20 @@ import docspell.store.usertask.UserTaskScope
|
||||
import docspell.store.usertask.UserTaskStore
|
||||
|
||||
import emil.javamail._
|
||||
import org.http4s.blaze.client.BlazeClientBuilder
|
||||
import org.http4s.client.Client
|
||||
|
||||
final class JoexAppImpl[F[_]: Async](
|
||||
cfg: Config,
|
||||
nodeOps: ONode[F],
|
||||
store: Store[F],
|
||||
queue: JobQueue[F],
|
||||
pubSubT: PubSubT[F],
|
||||
pstore: PeriodicTaskStore[F],
|
||||
termSignal: SignallingRef[F, Boolean],
|
||||
val scheduler: Scheduler[F],
|
||||
val periodicScheduler: PeriodicScheduler[F]
|
||||
) extends JoexApp[F] {
|
||||
private[this] val logger: Logger[F] =
|
||||
Logger.log4s(org.log4s.getLogger(s"Joex-${cfg.appId.id}"))
|
||||
|
||||
def init: F[Unit] = {
|
||||
val run = scheduler.start.compile.drain
|
||||
@ -64,16 +65,26 @@ final class JoexAppImpl[F[_]: Async](
|
||||
_ <- Async[F].start(prun)
|
||||
_ <- scheduler.periodicAwake
|
||||
_ <- periodicScheduler.periodicAwake
|
||||
_ <- nodeOps.register(cfg.appId, NodeType.Joex, cfg.baseUrl)
|
||||
_ <- subscriptions
|
||||
} yield ()
|
||||
}
|
||||
|
||||
def subscriptions =
|
||||
for {
|
||||
_ <- Async[F].start(pubSubT.subscribeSink(Ping.topic) { msg =>
|
||||
logger.info(s">>>> PING $msg")
|
||||
})
|
||||
_ <- Async[F].start(pubSubT.subscribeSink(Topics.jobsNotify) { _ =>
|
||||
scheduler.notifyChange
|
||||
})
|
||||
_ <- Async[F].start(pubSubT.subscribeSink(CancelJob.topic) { msg =>
|
||||
scheduler.requestCancel(msg.body.jobId).as(())
|
||||
})
|
||||
} yield ()
|
||||
|
||||
def findLogs(jobId: Ident): F[Vector[RJobLog]] =
|
||||
store.transact(RJobLog.findLogs(jobId))
|
||||
|
||||
def shutdown: F[Unit] =
|
||||
nodeOps.unregister(cfg.appId)
|
||||
|
||||
def initShutdown: F[Unit] =
|
||||
periodicScheduler.shutdown *> scheduler.shutdown(false) *> termSignal.set(true)
|
||||
|
||||
@ -116,16 +127,19 @@ object JoexAppImpl {
|
||||
def create[F[_]: Async](
|
||||
cfg: Config,
|
||||
termSignal: SignallingRef[F, Boolean],
|
||||
connectEC: ExecutionContext
|
||||
store: Store[F],
|
||||
httpClient: Client[F],
|
||||
pubSub: PubSub[F]
|
||||
): Resource[F, JoexApp[F]] =
|
||||
for {
|
||||
httpClient <- BlazeClientBuilder[F].resource
|
||||
client = JoexClient(httpClient)
|
||||
store <- Store.create(cfg.jdbc, cfg.files.chunkSize, connectEC)
|
||||
queue <- JobQueue(store)
|
||||
pstore <- PeriodicTaskStore.create(store)
|
||||
nodeOps <- ONode(store)
|
||||
joex <- OJoex(client, store)
|
||||
client = JoexClient(httpClient)
|
||||
pubSubT = PubSubT(
|
||||
pubSub,
|
||||
Logger.log4s(org.log4s.getLogger(s"joex-${cfg.appId.id}"))
|
||||
)
|
||||
joex <- OJoex(pubSubT)
|
||||
upload <- OUpload(store, queue, joex)
|
||||
fts <- createFtsClient(cfg)(httpClient)
|
||||
createIndex <- CreateIndex.resource(fts, store)
|
||||
@ -138,6 +152,7 @@ object JoexAppImpl {
|
||||
JavaMailEmil(Settings.defaultSettings.copy(debug = cfg.mailDebug))
|
||||
sch <- SchedulerBuilder(cfg.scheduler, store)
|
||||
.withQueue(queue)
|
||||
.withPubSub(pubSubT)
|
||||
.withTask(
|
||||
JobTask.json(
|
||||
ProcessItemArgs.taskName,
|
||||
@ -264,8 +279,8 @@ object JoexAppImpl {
|
||||
pstore,
|
||||
client
|
||||
)
|
||||
app = new JoexAppImpl(cfg, nodeOps, store, queue, pstore, termSignal, sch, psch)
|
||||
appR <- Resource.make(app.init.map(_ => app))(_.shutdown)
|
||||
app = new JoexAppImpl(cfg, store, queue, pubSubT, pstore, termSignal, sch, psch)
|
||||
appR <- Resource.make(app.init.map(_ => app))(_.initShutdown)
|
||||
} yield appR
|
||||
|
||||
private def createFtsClient[F[_]: Async](
|
||||
|
@ -11,10 +11,14 @@ import cats.effect._
|
||||
import fs2.Stream
|
||||
import fs2.concurrent.SignallingRef
|
||||
|
||||
import docspell.backend.msg.Topics
|
||||
import docspell.common.Pools
|
||||
import docspell.joex.routes._
|
||||
import docspell.pubsub.naive.NaivePubSub
|
||||
import docspell.store.Store
|
||||
|
||||
import org.http4s.HttpApp
|
||||
import org.http4s.blaze.client.BlazeClientBuilder
|
||||
import org.http4s.blaze.server.BlazeServerBuilder
|
||||
import org.http4s.implicits._
|
||||
import org.http4s.server.Router
|
||||
@ -33,9 +37,19 @@ object JoexServer {
|
||||
val app = for {
|
||||
signal <- Resource.eval(SignallingRef[F, Boolean](false))
|
||||
exitCode <- Resource.eval(Ref[F].of(ExitCode.Success))
|
||||
joexApp <- JoexAppImpl.create[F](cfg, signal, pools.connectEC)
|
||||
|
||||
store <- Store.create[F](
|
||||
cfg.jdbc,
|
||||
cfg.files.chunkSize,
|
||||
pools.connectEC
|
||||
)
|
||||
httpClient <- BlazeClientBuilder[F].resource
|
||||
pubSub <- NaivePubSub(cfg.pubSubConfig, store, httpClient)(Topics.all.map(_.topic))
|
||||
|
||||
joexApp <- JoexAppImpl.create[F](cfg, signal, store, httpClient, pubSub)
|
||||
|
||||
httpApp = Router(
|
||||
"/internal/pubsub" -> pubSub.receiveRoute,
|
||||
"/api/info" -> InfoRoutes(cfg),
|
||||
"/api/v1" -> JoexRoutes(joexApp)
|
||||
).orNotFound
|
||||
|
@ -11,6 +11,7 @@ import cats.effect.std.Semaphore
|
||||
import cats.implicits._
|
||||
import fs2.concurrent.SignallingRef
|
||||
|
||||
import docspell.pubsub.api.PubSubT
|
||||
import docspell.store.Store
|
||||
import docspell.store.queue.JobQueue
|
||||
|
||||
@ -19,7 +20,8 @@ case class SchedulerBuilder[F[_]: Async](
|
||||
tasks: JobTaskRegistry[F],
|
||||
store: Store[F],
|
||||
queue: Resource[F, JobQueue[F]],
|
||||
logSink: LogSink[F]
|
||||
logSink: LogSink[F],
|
||||
pubSub: PubSubT[F]
|
||||
) {
|
||||
|
||||
def withConfig(cfg: SchedulerConfig): SchedulerBuilder[F] =
|
||||
@ -32,7 +34,7 @@ case class SchedulerBuilder[F[_]: Async](
|
||||
withTaskRegistry(tasks.withTask(task))
|
||||
|
||||
def withQueue(queue: Resource[F, JobQueue[F]]): SchedulerBuilder[F] =
|
||||
SchedulerBuilder[F](config, tasks, store, queue, logSink)
|
||||
copy(queue = queue)
|
||||
|
||||
def withLogSink(sink: LogSink[F]): SchedulerBuilder[F] =
|
||||
copy(logSink = sink)
|
||||
@ -40,6 +42,9 @@ case class SchedulerBuilder[F[_]: Async](
|
||||
def withQueue(queue: JobQueue[F]): SchedulerBuilder[F] =
|
||||
copy(queue = Resource.pure[F, JobQueue[F]](queue))
|
||||
|
||||
def withPubSub(pubSubT: PubSubT[F]): SchedulerBuilder[F] =
|
||||
copy(pubSub = pubSubT)
|
||||
|
||||
def serve: Resource[F, Scheduler[F]] =
|
||||
resource.evalMap(sch => Async[F].start(sch.start.compile.drain).map(_ => sch))
|
||||
|
||||
@ -52,6 +57,7 @@ case class SchedulerBuilder[F[_]: Async](
|
||||
} yield new SchedulerImpl[F](
|
||||
config,
|
||||
jq,
|
||||
pubSub,
|
||||
tasks,
|
||||
store,
|
||||
logSink,
|
||||
@ -76,7 +82,8 @@ object SchedulerBuilder {
|
||||
JobTaskRegistry.empty[F],
|
||||
store,
|
||||
JobQueue(store),
|
||||
LogSink.db[F](store)
|
||||
LogSink.db[F](store),
|
||||
PubSubT.noop[F]
|
||||
)
|
||||
|
||||
}
|
||||
|
@ -13,19 +13,22 @@ import cats.implicits._
|
||||
import fs2.Stream
|
||||
import fs2.concurrent.SignallingRef
|
||||
|
||||
import docspell.backend.msg.JobDone
|
||||
import docspell.common._
|
||||
import docspell.common.syntax.all._
|
||||
import docspell.joex.scheduler.SchedulerImpl._
|
||||
import docspell.pubsub.api.PubSubT
|
||||
import docspell.store.Store
|
||||
import docspell.store.queries.QJob
|
||||
import docspell.store.queue.JobQueue
|
||||
import docspell.store.records.RJob
|
||||
|
||||
import org.log4s._
|
||||
import org.log4s.getLogger
|
||||
|
||||
final class SchedulerImpl[F[_]: Async](
|
||||
val config: SchedulerConfig,
|
||||
queue: JobQueue[F],
|
||||
pubSub: PubSubT[F],
|
||||
tasks: JobTaskRegistry[F],
|
||||
store: Store[F],
|
||||
logSink: LogSink[F],
|
||||
@ -55,20 +58,21 @@ final class SchedulerImpl[F[_]: Async](
|
||||
state.get.flatMap(s => QJob.findAll(s.getRunning, store))
|
||||
|
||||
def requestCancel(jobId: Ident): F[Boolean] =
|
||||
state.get.flatMap(_.cancelRequest(jobId) match {
|
||||
case Some(ct) => ct.map(_ => true)
|
||||
case None =>
|
||||
(for {
|
||||
job <- OptionT(store.transact(RJob.findByIdAndWorker(jobId, config.name)))
|
||||
_ <- OptionT.liftF(
|
||||
if (job.isInProgress) executeCancel(job)
|
||||
else ().pure[F]
|
||||
)
|
||||
} yield true)
|
||||
.getOrElseF(
|
||||
logger.fwarn(s"Job ${jobId.id} not found, cannot cancel.").map(_ => false)
|
||||
)
|
||||
})
|
||||
logger.finfo(s"Scheduler requested to cancel job: ${jobId.id}") *>
|
||||
state.get.flatMap(_.cancelRequest(jobId) match {
|
||||
case Some(ct) => ct.map(_ => true)
|
||||
case None =>
|
||||
(for {
|
||||
job <- OptionT(store.transact(RJob.findByIdAndWorker(jobId, config.name)))
|
||||
_ <- OptionT.liftF(
|
||||
if (job.isInProgress) executeCancel(job)
|
||||
else ().pure[F]
|
||||
)
|
||||
} yield true)
|
||||
.getOrElseF(
|
||||
logger.fwarn(s"Job ${jobId.id} not found, cannot cancel.").map(_ => false)
|
||||
)
|
||||
})
|
||||
|
||||
def notifyChange: F[Unit] =
|
||||
waiter.update(b => !b)
|
||||
@ -198,6 +202,10 @@ final class SchedulerImpl[F[_]: Async](
|
||||
)
|
||||
_ <- state.modify(_.removeRunning(job))
|
||||
_ <- QJob.setFinalState(job.id, finalState, store)
|
||||
_ <- pubSub.publish1IgnoreErrors(
|
||||
JobDone.topic,
|
||||
JobDone(job.id, job.task, job.args, finalState)
|
||||
)
|
||||
} yield ()
|
||||
|
||||
def onStart(job: RJob): F[Unit] =
|
||||
|
Reference in New Issue
Block a user