mirror of https://github.com/TheAnachronism/docspell.git
Adapt to the new logging API
@@ -18,6 +18,17 @@ docspell.joex {
     port = 7878
   }

+  # Configures logging
+  logging {
+    # The format for the log messages. Can be one of:
+    # Json, Logfmt, Fancy or Plain
+    format = "Json"
+
+    # The minimum level to log. From lowest to highest:
+    # Trace, Debug, Info, Warn, Error
+    minimumLevel = "Debug"
+  }
+
   # The database connection.
   #
   # It must be the same connection as the rest server is using.
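The new logging block maps onto a LogConfig value that is threaded into the application config below. As a rough orientation only, a LogConfig matching these keys could look like the following Scala sketch; the shape is inferred from the comments above, the real definition lives in the docspell-logging module and may differ:

    import docspell.logging.Level

    // Hypothetical sketch only: field names inferred from the config keys above.
    // The real docspell.logging.LogConfig may differ.
    final case class LogConfig(format: LogConfig.Format, minimumLevel: Level)

    object LogConfig {
      sealed trait Format
      object Format {
        case object Json extends Format
        case object Logfmt extends Format
        case object Fancy extends Format
        case object Plain extends Format
      }
    }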
@@ -21,12 +21,14 @@ import docspell.joex.hk.HouseKeepingConfig
 import docspell.joex.routes.InternalHeader
 import docspell.joex.scheduler.{PeriodicSchedulerConfig, SchedulerConfig}
 import docspell.joex.updatecheck.UpdateCheckConfig
+import docspell.logging.LogConfig
 import docspell.pubsub.naive.PubSubConfig
 import docspell.store.JdbcConfig

 case class Config(
     appId: Ident,
     baseUrl: LenientUri,
+    logging: LogConfig,
     bind: Config.Bind,
     jdbc: JdbcConfig,
     scheduler: SchedulerConfig,
@@ -8,7 +8,6 @@ package docspell.joex

 import cats.effect.Async

-import docspell.common.Logger
 import docspell.config.Implicits._
 import docspell.config.{ConfigFactory, Validation}
 import docspell.joex.scheduler.CountingScheme
@@ -23,7 +22,7 @@ object ConfigFile {
   import Implicits._

   def loadConfig[F[_]: Async](args: List[String]): F[Config] = {
-    val logger = Logger.log4s[F](org.log4s.getLogger)
+    val logger = docspell.logging.getLogger[F]
     ConfigFactory
       .default[F, Config](logger, "docspell.joex")(args, validate)
   }
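This is the pattern the whole commit follows: instead of wrapping an org.log4s logger for the effect type, a Logger[F] is obtained directly from the new logging package. Side by side, as a sketch (getLogger presumably derives a logger name from the call site when none is given):

    import cats.effect.IO

    // before: wrap a JVM log4s logger into the effect type
    // val logger = Logger.log4s[IO](org.log4s.getLogger)

    // after: one call against the new API, no log4s import required
    val logger: docspell.logging.Logger[IO] = docspell.logging.getLogger[IO]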
@@ -132,10 +132,8 @@ object JoexAppImpl extends MailAddressCodec {
   ): Resource[F, JoexApp[F]] =
     for {
       pstore <- PeriodicTaskStore.create(store)
-      pubSubT = PubSubT(
-        pubSub,
-        Logger.log4s(org.log4s.getLogger(s"joex-${cfg.appId.id}"))
-      )
+      joexLogger = docspell.logging.getLogger[F](s"joex-${cfg.appId.id}")
+      pubSubT = PubSubT(pubSub, joexLogger)
       javaEmil =
         JavaMailEmil(Settings.defaultSettings.copy(debug = cfg.mailDebug))
       notificationMod <- Resource.eval(
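Besides the no-argument form, getLogger also takes an explicit name, used here so all messages of one joex node share a joex-<id> logger name and several nodes can be told apart. In isolation:

    import cats.effect.IO

    // name the logger after the node id; appId is a stand-in for cfg.appId.id
    val appId = "joex1"
    val joexLogger: docspell.logging.Logger[IO] =
      docspell.logging.getLogger[IO](s"joex-$appId")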
@@ -9,12 +9,12 @@ package docspell.joex
 import cats.effect._

 import docspell.common._
-
-import org.log4s.getLogger
+import docspell.logging.Logger
+import docspell.logging.impl.ScribeConfigure

 object Main extends IOApp {

-  private val logger: Logger[IO] = Logger.log4s[IO](getLogger)
+  private val logger: Logger[IO] = docspell.logging.getLogger[IO]

   private val connectEC =
     ThreadFactories.fixed[IO](5, ThreadFactories.ofName("docspell-joex-dbconnect"))
@@ -22,6 +22,7 @@ object Main extends IOApp {
   def run(args: List[String]): IO[ExitCode] =
     for {
       cfg <- ConfigFile.loadConfig[IO](args)
+      _ <- ScribeConfigure.configure[IO](cfg.logging)
       banner = Banner(
         "JOEX",
         BuildInfo.version,
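The order in run is deliberate: the config is loaded first, then ScribeConfigure.configure applies cfg.logging to the underlying scribe backend, so everything logged afterwards honours the configured format and minimum level. A condensed sketch of that startup sequence (startApp is a hypothetical placeholder for the remaining boot code):

    import cats.effect.{ExitCode, IO, IOApp}
    import docspell.joex.{Config, ConfigFile}
    import docspell.logging.impl.ScribeConfigure

    object MainSketch extends IOApp {
      private val logger = docspell.logging.getLogger[IO]

      // hypothetical stand-in for the rest of the joex startup
      def startApp(cfg: Config): IO[ExitCode] = IO.pure(ExitCode.Success)

      def run(args: List[String]): IO[ExitCode] =
        for {
          cfg <- ConfigFile.loadConfig[IO](args)          // read and validate config
          _ <- ScribeConfigure.configure[IO](cfg.logging) // apply format + minimum level
          _ <- logger.info("Logging configured")          // honours cfg.logging from here on
          ec <- startApp(cfg)
        } yield ec
    }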
@@ -12,7 +12,6 @@ import cats.implicits._
 import fs2.io.file.Path

 import docspell.common._
-import docspell.common.syntax.all._
 import docspell.store.Store
 import docspell.store.queries.QCollective
 import docspell.store.records.REquipment
@@ -20,7 +19,6 @@ import docspell.store.records.ROrganization
 import docspell.store.records.RPerson

 import io.circe.syntax._
-import org.log4s.getLogger

 /** Maintains a custom regex-ner file per collective for stanford's regexner annotator. */
 trait RegexNerFile[F[_]] {
@@ -30,7 +28,6 @@ trait RegexNerFile[F[_]] {
 }

 object RegexNerFile {
-  private[this] val logger = getLogger

   case class Config(maxEntries: Int, directory: Path, minTime: Duration)

@@ -49,6 +46,8 @@ object RegexNerFile {
       writer: Semaphore[F] // TODO allow parallelism per collective
   ) extends RegexNerFile[F] {

+    private[this] val logger = docspell.logging.getLogger[F]
+
     def makeFile(collective: Ident): F[Option[Path]] =
       if (cfg.maxEntries > 0) doMakeFile(collective)
       else (None: Option[Path]).pure[F]
@@ -61,7 +60,7 @@ object RegexNerFile {
         case Some(nf) =>
           val dur = Duration.between(nf.creation, now)
           if (dur > cfg.minTime)
-            logger.fdebug(
+            logger.debug(
              s"Cache time elapsed ($dur > ${cfg.minTime}). Check for new state."
            ) *> updateFile(
              collective,
@@ -89,12 +88,12 @@ object RegexNerFile {
       case Some(cur) =>
         val nerf =
           if (cur.updated == lup)
-            logger.fdebug(s"No state change detected.") *> updateTimestamp(
+            logger.debug(s"No state change detected.") *> updateTimestamp(
              cur,
              now
            ) *> cur.pure[F]
          else
-            logger.fdebug(
+            logger.debug(
              s"There have been state changes for collective '${collective.id}'. Reload NER file."
            ) *> createFile(lup, collective, now)
        nerf.map(_.nerFilePath(cfg.directory).some)
@@ -126,7 +125,7 @@ object RegexNerFile {
     writer.permit.use(_ =>
       for {
         jsonFile <- Sync[F].pure(nf.jsonFilePath(cfg.directory))
-        _ <- logger.fdebug(
+        _ <- logger.debug(
           s"Writing custom NER file for collective '${collective.id}'"
         )
         _ <- jsonFile.parent match {
@@ -139,7 +138,7 @@ object RegexNerFile {
       )

     for {
-      _ <- logger.finfo(s"Generating custom NER file for collective '${collective.id}'")
+      _ <- logger.info(s"Generating custom NER file for collective '${collective.id}'")
       names <- store.transact(QCollective.allNames(collective, cfg.maxEntries))
       nerFile = NerFile(collective, lastUpdate, now)
       _ <- update(nerFile, NerFile.mkNerConfig(names))
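Alongside the logger swap comes a method rename: the old docspell.common.Logger exposed its effectful calls under f-prefixed names (and stream variants under s-prefixes), while on the new docspell.logging.Logger the plain names are already effectful. The mapping applied throughout this commit:

    // old docspell.common.Logger     ->  new docspell.logging.Logger
    // logger.ftrace(msg)             ->  logger.trace(msg)
    // logger.fdebug(msg)             ->  logger.debug(msg)
    // logger.finfo(msg)              ->  logger.info(msg)
    // logger.fwarn(msg)              ->  logger.warn(msg)
    // logger.ferror(msg)             ->  logger.error(msg)
    // logger.ferror(ex)(msg)         ->  logger.error(ex)(msg)
    // logger.sinfo(msg)              ->  logger.stream.info(msg).drain
    // logger.sdebug(msg)             ->  logger.stream.debug(msg).drain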
@@ -7,10 +7,10 @@
 package docspell.joex.fts

 import docspell.backend.fulltext.CreateIndex
-import docspell.common.Logger
 import docspell.ftsclient.FtsClient
 import docspell.joex.Config
 import docspell.joex.scheduler.Context
+import docspell.logging.Logger
 import docspell.store.Store

 case class FtsContext[F[_]](
@@ -15,6 +15,7 @@ import docspell.common._
 import docspell.ftsclient._
 import docspell.joex.Config
 import docspell.joex.scheduler.Context
+import docspell.logging.Logger

 object FtsWork {
   import syntax._
@@ -15,6 +15,7 @@ import docspell.backend.fulltext.CreateIndex
 import docspell.common._
 import docspell.ftsclient._
 import docspell.joex.Config
+import docspell.logging.Logger
 import docspell.store.Store

 /** Migrating the index from the previous version to this version.
@@ -11,6 +11,7 @@ import cats.implicits._

 import docspell.common._
 import docspell.joex.scheduler.{Context, Task}
+import docspell.logging.Logger
 import docspell.store.records._

 import org.http4s.blaze.client.BlazeClientBuilder
@@ -14,6 +14,7 @@ import fs2.io.file.Path

 import docspell.analysis.classifier.{ClassifierModel, TextClassifier}
 import docspell.common._
+import docspell.logging.Logger
 import docspell.store.Store
 import docspell.store.records.RClassifierModel

@@ -15,6 +15,7 @@ import docspell.backend.ops.OCollective
 import docspell.common._
 import docspell.joex.Config
 import docspell.joex.scheduler._
+import docspell.logging.Logger
 import docspell.store.records.{RClassifierModel, RClassifierSetting}

 object LearnClassifierTask {
@@ -13,6 +13,7 @@ import fs2.io.file.Files
 import docspell.analysis.classifier.ClassifierModel
 import docspell.common._
 import docspell.joex.scheduler._
+import docspell.logging.Logger
 import docspell.store.Store
 import docspell.store.records.RClassifierModel

@@ -11,6 +11,7 @@ import cats.implicits._
 import fs2.{Pipe, Stream}

 import docspell.common._
+import docspell.logging.Logger
 import docspell.store.syntax.MimeTypes._

 import emil.javamail.syntax._
@@ -12,6 +12,7 @@ import cats.implicits._

 import docspell.backend.ops.ONotification
 import docspell.common._
+import docspell.logging.Logger
 import docspell.notification.api.ChannelRef
 import docspell.notification.api.Event
 import docspell.notification.api.EventContext
@@ -13,6 +13,7 @@ import cats.implicits._

 import docspell.common._
 import docspell.joex.scheduler.Task
+import docspell.logging.Logger

 /** After candidates have been determined, the set is reduced by doing some cross checks.
   * For example: if a organization is suggested as correspondent, the correspondent person
@@ -49,7 +49,7 @@ object ReProcessItem {

   private def contains[F[_]](ctx: Context[F, Args]): RAttachment => Boolean = {
     val selection = ctx.args.attachments.toSet
-    if (selection.isEmpty) (_ => true)
+    if (selection.isEmpty) _ => true
     else ra => selection.contains(ra.id)
   }

@@ -1,45 +0,0 @@
-/*
- * Copyright 2020 Eike K. & Contributors
- *
- * SPDX-License-Identifier: AGPL-3.0-or-later
- */
-
-package docspell.joex.process
-
-import cats.effect.Sync
-import cats.implicits._
-
-import docspell.common.ProcessItemArgs
-import docspell.common.syntax.all._
-import docspell.joex.scheduler.Task
-
-import org.log4s._
-
-object TestTasks {
-  private[this] val logger = getLogger
-
-  def success[F[_]]: Task[F, ProcessItemArgs, Unit] =
-    Task(ctx => ctx.logger.info(s"Running task now: ${ctx.args}"))
-
-  def failing[F[_]: Sync]: Task[F, ProcessItemArgs, Unit] =
-    Task { ctx =>
-      ctx.logger
-        .info(s"Failing the task run :(")
-        .map(_ => sys.error("Oh, cannot extract gold from this document"))
-    }
-
-  def longRunning[F[_]: Sync]: Task[F, ProcessItemArgs, Unit] =
-    Task { ctx =>
-      logger.fwarn(s"${Thread.currentThread()} From executing long running task") >>
-        ctx.logger.info(s"${Thread.currentThread()} Running task now: ${ctx.args}") >>
-        sleep(2400) >>
-        ctx.logger.debug("doing things") >>
-        sleep(2400) >>
-        ctx.logger.debug("doing more things") >>
-        sleep(2400) >>
-        ctx.logger.info("doing more things")
-    }
-
-  private def sleep[F[_]: Sync](ms: Long): F[Unit] =
-    Sync[F].delay(Thread.sleep(ms))
-}
@@ -17,6 +17,7 @@ import docspell.backend.ops.{OJoex, OUpload}
 import docspell.common._
 import docspell.joex.Config
 import docspell.joex.scheduler.{Context, Task}
+import docspell.logging.Logger
 import docspell.store.queries.QOrganization
 import docspell.store.records._

@@ -11,12 +11,10 @@ import cats.implicits._
 import cats.{Applicative, Functor}

 import docspell.common._
-import docspell.common.syntax.all._
+import docspell.logging.Logger
 import docspell.store.Store
 import docspell.store.records.RJob

-import org.log4s.{Logger => _, _}
-
 trait Context[F[_], A] { self =>

   def jobId: Ident
@@ -42,7 +40,6 @@ trait Context[F[_], A] { self =>
 }

 object Context {
-  private[this] val log = getLogger

   def create[F[_]: Async, A](
       jobId: Ident,
@@ -59,13 +56,15 @@ object Context {
       config: SchedulerConfig,
       logSink: LogSink[F],
       store: Store[F]
-  ): F[Context[F, A]] =
+  ): F[Context[F, A]] = {
+    val log = docspell.logging.getLogger[F]
     for {
-      _ <- log.ftrace("Creating logger for task run")
+      _ <- log.trace("Creating logger for task run")
       logger <- QueueLogger(job.id, job.info, config.logBufferSize, logSink)
-      _ <- log.ftrace("Logger created, instantiating context")
+      _ <- log.trace("Logger created, instantiating context")
       ctx = create[F, A](job.id, arg, config, logger, store)
     } yield ctx
+  }

   final private class ContextImpl[F[_]: Functor, A](
       val args: A,
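A recurring detail in these hunks: the old code kept one private log4s logger on the companion object, but the new logger is parameterized by the effect, so it is now created inside the method where F is in scope. Reduced to the essentials (the names OldStyle and NewStyle are illustrative only):

    import cats.effect.Sync

    object OldStyle {
      // effect-agnostic log4s logger, usable from any method
      private[this] val log = org.log4s.getLogger
    }

    object NewStyle {
      // the logger needs F, so summon it where F is known
      def create[F[_]: Sync]: F[Unit] = {
        val log = docspell.logging.getLogger[F]
        log.trace("Creating logger for task run")
      }
    }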
@@ -11,12 +11,9 @@ import cats.implicits._
 import fs2.Pipe

 import docspell.common._
-import docspell.common.syntax.all._
 import docspell.store.Store
 import docspell.store.records.RJobLog

-import org.log4s.{LogLevel => _, _}
-
 trait LogSink[F[_]] {

   def receive: Pipe[F, LogEvent, Unit]
@@ -24,29 +21,30 @@ trait LogSink[F[_]] {
 }

 object LogSink {
-  private[this] val logger = getLogger

   def apply[F[_]](sink: Pipe[F, LogEvent, Unit]): LogSink[F] =
     new LogSink[F] {
       val receive = sink
     }

-  def logInternal[F[_]: Sync](e: LogEvent): F[Unit] =
+  def logInternal[F[_]: Sync](e: LogEvent): F[Unit] = {
+    val logger = docspell.logging.getLogger[F]
     e.level match {
       case LogLevel.Info =>
-        logger.finfo(e.logLine)
+        logger.info(e.logLine)
       case LogLevel.Debug =>
-        logger.fdebug(e.logLine)
+        logger.debug(e.logLine)
       case LogLevel.Warn =>
-        logger.fwarn(e.logLine)
+        logger.warn(e.logLine)
       case LogLevel.Error =>
         e.ex match {
           case Some(exc) =>
-            logger.ferror(exc)(e.logLine)
+            logger.error(exc)(e.logLine)
           case None =>
-            logger.ferror(e.logLine)
+            logger.error(e.logLine)
         }
     }
+  }

   def printer[F[_]: Sync]: LogSink[F] =
     LogSink(_.evalMap(e => logInternal(e)))
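LogSink wraps an fs2 Pipe[F, LogEvent, Unit], so the printing variant above can be attached to any stream of job log events. A usage sketch (the empty events stream is a stand-in for the queue drained by the scheduler, and the scheduler's LogEvent type is assumed in scope):

    import cats.effect.IO
    import fs2.Stream

    // stand-in for the stream of job log events produced via QueueLogger
    val events: Stream[IO, LogEvent] = Stream.empty

    // push every event through the sink; printer logs each event internally
    val logged: Stream[IO, Unit] = events.through(LogSink.printer[IO].receive)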
@@ -13,13 +13,11 @@ import fs2.concurrent.SignallingRef

 import docspell.backend.ops.OJoex
 import docspell.common._
-import docspell.common.syntax.all._
 import docspell.joex.scheduler.PeriodicSchedulerImpl.State
 import docspell.store.queue._
 import docspell.store.records.RPeriodicTask

 import eu.timepit.fs2cron.calev.CalevScheduler
-import org.log4s.getLogger

 final class PeriodicSchedulerImpl[F[_]: Async](
     val config: PeriodicSchedulerConfig,
@@ -30,10 +28,10 @@ final class PeriodicSchedulerImpl[F[_]: Async](
     waiter: SignallingRef[F, Boolean],
     state: SignallingRef[F, State[F]]
 ) extends PeriodicScheduler[F] {
-  private[this] val logger = getLogger
+  private[this] val logger = docspell.logging.getLogger[F]

   def start: Stream[F, Nothing] =
-    logger.sinfo("Starting periodic scheduler") ++
+    logger.stream.info("Starting periodic scheduler").drain ++
       mainLoop

   def shutdown: F[Unit] =
@@ -43,7 +41,7 @@ final class PeriodicSchedulerImpl[F[_]: Async](
     Async[F].start(
       Stream
         .awakeEvery[F](config.wakeupPeriod.toScala)
-        .evalMap(_ => logger.fdebug("Periodic awake reached") *> notifyChange)
+        .evalMap(_ => logger.debug("Periodic awake reached") *> notifyChange)
         .compile
         .drain
     )
@@ -62,22 +60,22 @@ final class PeriodicSchedulerImpl[F[_]: Async](
   def mainLoop: Stream[F, Nothing] = {
     val body: F[Boolean] =
       for {
-        _ <- logger.fdebug(s"Going into main loop")
+        _ <- logger.debug(s"Going into main loop")
         now <- Timestamp.current[F]
-        _ <- logger.fdebug(s"Looking for next periodic task")
+        _ <- logger.debug(s"Looking for next periodic task")
         go <- logThrow("Error getting next task")(
           store
             .takeNext(config.name, None)
             .use {
               case Marked.Found(pj) =>
                 logger
-                  .fdebug(s"Found periodic task '${pj.subject}/${pj.timer.asString}'") *>
+                  .debug(s"Found periodic task '${pj.subject}/${pj.timer.asString}'") *>
                   (if (isTriggered(pj, now)) submitJob(pj)
                    else scheduleNotify(pj).map(_ => false))
               case Marked.NotFound =>
-                logger.fdebug("No periodic task found") *> false.pure[F]
+                logger.debug("No periodic task found") *> false.pure[F]
               case Marked.NotMarkable =>
-                logger.fdebug("Periodic job cannot be marked. Trying again.") *> true
+                logger.debug("Periodic job cannot be marked. Trying again.") *> true
                   .pure[F]
             }
         )
@@ -86,7 +84,7 @@ final class PeriodicSchedulerImpl[F[_]: Async](
     Stream
       .eval(state.get.map(_.shutdownRequest))
       .evalTap(
-        if (_) logger.finfo[F]("Stopping main loop due to shutdown request.")
+        if (_) logger.info("Stopping main loop due to shutdown request.")
         else ().pure[F]
       )
       .flatMap(if (_) Stream.empty else Stream.eval(cancelNotify *> body))
@@ -94,9 +92,9 @@ final class PeriodicSchedulerImpl[F[_]: Async](
       case true =>
         mainLoop
       case false =>
-        logger.sdebug(s"Waiting for notify") ++
+        logger.stream.debug(s"Waiting for notify").drain ++
           waiter.discrete.take(2).drain ++
-          logger.sdebug(s"Notify signal, going into main loop") ++
+          logger.stream.debug(s"Notify signal, going into main loop").drain ++
           mainLoop
     }
   }
@@ -109,12 +107,12 @@ final class PeriodicSchedulerImpl[F[_]: Async](
       .findNonFinalJob(pj.id)
       .flatMap {
         case Some(job) =>
-          logger.finfo[F](
+          logger.info(
            s"There is already a job with non-final state '${job.state}' in the queue"
          ) *> scheduleNotify(pj) *> false.pure[F]

        case None =>
-          logger.finfo[F](s"Submitting job for periodic task '${pj.task.id}'") *>
+          logger.info(s"Submitting job for periodic task '${pj.task.id}'") *>
            pj.toJob.flatMap(queue.insert) *> notifyJoex *> true.pure[F]
      }

@@ -125,7 +123,7 @@ final class PeriodicSchedulerImpl[F[_]: Async](
     Timestamp
       .current[F]
       .flatMap(now =>
-        logger.fdebug(
+        logger.debug(
           s"Scheduling next notify for timer ${pj.timer.asString} -> ${pj.timer.nextElapse(now.toUtcDateTime)}"
         )
       ) *>
@@ -153,13 +151,13 @@ final class PeriodicSchedulerImpl[F[_]: Async](
   private def logError(msg: => String)(fa: F[Unit]): F[Unit] =
     fa.attempt.flatMap {
       case Right(_) => ().pure[F]
-      case Left(ex) => logger.ferror(ex)(msg).map(_ => ())
+      case Left(ex) => logger.error(ex)(msg).map(_ => ())
     }

   private def logThrow[A](msg: => String)(fa: F[A]): F[A] =
     fa.attempt.flatMap {
       case r @ Right(_) => (r: Either[Throwable, A]).pure[F]
-      case l @ Left(ex) => logger.ferror(ex)(msg).map(_ => (l: Either[Throwable, A]))
+      case l @ Left(ex) => logger.error(ex)(msg).map(_ => (l: Either[Throwable, A]))
     }.rethrow
 }
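The sinfo/sdebug helpers are replaced by the logger.stream accessor; the trailing .drain discards the emitted element, presumably because the stream variants emit a Unit while start and mainLoop are typed Stream[F, Nothing]. In isolation:

    import cats.effect.IO
    import fs2.Stream

    val logger = docspell.logging.getLogger[IO]
    def mainLoop: Stream[IO, Nothing] = Stream.empty

    // .drain turns Stream[IO, Unit] into Stream[IO, Nothing], so the log
    // line can prefix the main loop without changing the stream's type
    def start: Stream[IO, Nothing] =
      logger.stream.info("Starting periodic scheduler").drain ++ mainLoop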
@@ -12,6 +12,8 @@ import cats.implicits._
 import fs2.Stream

 import docspell.common._
+import docspell.logging
+import docspell.logging.{Level, Logger}

 object QueueLogger {

@@ -21,26 +23,20 @@ object QueueLogger {
       q: Queue[F, LogEvent]
   ): Logger[F] =
     new Logger[F] {
-      def trace(msg: => String): F[Unit] =
-        LogEvent.create[F](jobId, jobInfo, LogLevel.Debug, msg).flatMap(q.offer)
-
-      def debug(msg: => String): F[Unit] =
-        LogEvent.create[F](jobId, jobInfo, LogLevel.Debug, msg).flatMap(q.offer)
-
-      def info(msg: => String): F[Unit] =
-        LogEvent.create[F](jobId, jobInfo, LogLevel.Info, msg).flatMap(q.offer)
-
-      def warn(msg: => String): F[Unit] =
-        LogEvent.create[F](jobId, jobInfo, LogLevel.Warn, msg).flatMap(q.offer)
-
-      def error(ex: Throwable)(msg: => String): F[Unit] =
-        LogEvent
-          .create[F](jobId, jobInfo, LogLevel.Error, msg)
-          .map(le => le.copy(ex = Some(ex)))
-          .flatMap(q.offer)
-
-      def error(msg: => String): F[Unit] =
-        LogEvent.create[F](jobId, jobInfo, LogLevel.Error, msg).flatMap(q.offer)
+      def log(logEvent: logging.LogEvent) =
+        LogEvent
+          .create[F](jobId, jobInfo, level2Level(logEvent.level), logEvent.msg())
+          .flatMap { ev =>
+            val event =
+              logEvent.findErrors.headOption
+                .map(ex => ev.copy(ex = Some(ex)))
+                .getOrElse(ev)
+
+            q.offer(event)
+          }
+
+      def asUnsafe = Logger.off
     }

   def apply[F[_]: Async](
@@ -57,4 +53,6 @@ object QueueLogger {
       )
     } yield log

+  private def level2Level(level: Level): LogLevel =
+    LogLevel.fromLevel(level)
 }
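This hunk shows the shape of the new Logger interface: an implementation supplies a single log(LogEvent) method (plus asUnsafe), and the per-level methods are derived from it, instead of overriding trace through error one by one as before. A minimal sketch of another implementation under that assumption:

    import cats.effect.Sync
    import docspell.logging

    // assumption: Logger[F] is implementable via log() and asUnsafe, as above
    def stdoutLogger[F[_]: Sync]: logging.Logger[F] =
      new logging.Logger[F] {
        def log(ev: logging.LogEvent): F[Unit] =
          Sync[F].delay(println(s"[${ev.level}] ${ev.msg()}"))

        // no meaningful unsafe variant here; fall back to the no-op logger
        def asUnsafe = logging.Logger.off
      }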
@@ -15,7 +15,6 @@ import fs2.concurrent.SignallingRef

 import docspell.backend.msg.JobDone
 import docspell.common._
-import docspell.common.syntax.all._
 import docspell.joex.scheduler.SchedulerImpl._
 import docspell.notification.api.Event
 import docspell.notification.api.EventSink
@@ -26,7 +25,6 @@ import docspell.store.queue.JobQueue
 import docspell.store.records.RJob

 import io.circe.Json
-import org.log4s.getLogger

 final class SchedulerImpl[F[_]: Async](
     val config: SchedulerConfig,
@@ -41,7 +39,7 @@ final class SchedulerImpl[F[_]: Async](
     permits: Semaphore[F]
 ) extends Scheduler[F] {

-  private[this] val logger = getLogger
+  private[this] val logger = docspell.logging.getLogger[F]

   /** On startup, get all jobs in state running from this scheduler and put them into
     * waiting state, so they get picked up again.
@@ -53,7 +51,7 @@ final class SchedulerImpl[F[_]: Async](
     Async[F].start(
       Stream
         .awakeEvery[F](config.wakeupPeriod.toScala)
-        .evalMap(_ => logger.fdebug("Periodic awake reached") *> notifyChange)
+        .evalMap(_ => logger.debug("Periodic awake reached") *> notifyChange)
         .compile
         .drain
     )
@@ -62,7 +60,7 @@ final class SchedulerImpl[F[_]: Async](
     state.get.flatMap(s => QJob.findAll(s.getRunning, store))

   def requestCancel(jobId: Ident): F[Boolean] =
-    logger.finfo(s"Scheduler requested to cancel job: ${jobId.id}") *>
+    logger.info(s"Scheduler requested to cancel job: ${jobId.id}") *>
       state.get.flatMap(_.cancelRequest(jobId) match {
         case Some(ct) => ct.map(_ => true)
         case None =>
@@ -74,7 +72,7 @@ final class SchedulerImpl[F[_]: Async](
             )
           } yield true)
             .getOrElseF(
-              logger.fwarn(s"Job ${jobId.id} not found, cannot cancel.").map(_ => false)
+              logger.warn(s"Job ${jobId.id} not found, cannot cancel.").map(_ => false)
             )
       })

@@ -90,16 +88,16 @@ final class SchedulerImpl[F[_]: Async](

     val wait = Stream
       .eval(runShutdown)
-      .evalMap(_ => logger.finfo("Scheduler is shutting down now."))
+      .evalMap(_ => logger.info("Scheduler is shutting down now."))
       .flatMap(_ =>
         Stream.eval(state.get) ++ Stream
           .suspend(state.discrete.takeWhile(_.getRunning.nonEmpty))
       )
       .flatMap { state =>
-        if (state.getRunning.isEmpty) Stream.eval(logger.finfo("No jobs running."))
+        if (state.getRunning.isEmpty) Stream.eval(logger.info("No jobs running."))
         else
           Stream.eval(
-            logger.finfo(s"Waiting for ${state.getRunning.size} jobs to finish.")
+            logger.info(s"Waiting for ${state.getRunning.size} jobs to finish.")
           ) ++
             Stream.emit(state)
       }
@@ -108,35 +106,35 @@ final class SchedulerImpl[F[_]: Async](
   }

   def start: Stream[F, Nothing] =
-    logger.sinfo("Starting scheduler") ++
+    logger.stream.info("Starting scheduler").drain ++
       mainLoop

   def mainLoop: Stream[F, Nothing] = {
     val body: F[Boolean] =
       for {
         _ <- permits.available.flatMap(a =>
-          logger.fdebug(s"Try to acquire permit ($a free)")
+          logger.debug(s"Try to acquire permit ($a free)")
         )
         _ <- permits.acquire
-        _ <- logger.fdebug("New permit acquired")
+        _ <- logger.debug("New permit acquired")
         down <- state.get.map(_.shutdownRequest)
         rjob <-
           if (down)
-            logger.finfo("") *> permits.release *> (None: Option[RJob]).pure[F]
+            logger.info("") *> permits.release *> (None: Option[RJob]).pure[F]
           else
             queue.nextJob(
               group => state.modify(_.nextPrio(group, config.countingScheme)),
               config.name,
               config.retryDelay
             )
-        _ <- logger.fdebug(s"Next job found: ${rjob.map(_.info)}")
+        _ <- logger.debug(s"Next job found: ${rjob.map(_.info)}")
         _ <- rjob.map(execute).getOrElse(permits.release)
       } yield rjob.isDefined

     Stream
       .eval(state.get.map(_.shutdownRequest))
       .evalTap(
-        if (_) logger.finfo[F]("Stopping main loop due to shutdown request.")
+        if (_) logger.info("Stopping main loop due to shutdown request.")
         else ().pure[F]
       )
       .flatMap(if (_) Stream.empty else Stream.eval(body))
@@ -144,9 +142,9 @@ final class SchedulerImpl[F[_]: Async](
       case true =>
         mainLoop
       case false =>
-        logger.sdebug(s"Waiting for notify") ++
+        logger.stream.debug(s"Waiting for notify").drain ++
           waiter.discrete.take(2).drain ++
-          logger.sdebug(s"Notify signal, going into main loop") ++
+          logger.stream.debug(s"Notify signal, going into main loop").drain ++
           mainLoop
     }
   }
@@ -161,17 +159,17 @@ final class SchedulerImpl[F[_]: Async](

     task match {
       case Left(err) =>
-        logger.ferror(s"Unable to run cancellation task for job ${job.info}: $err")
+        logger.error(s"Unable to run cancellation task for job ${job.info}: $err")
       case Right(t) =>
         for {
           _ <-
-            logger.fdebug(s"Creating context for job ${job.info} to run cancellation $t")
+            logger.debug(s"Creating context for job ${job.info} to run cancellation $t")
           ctx <- Context[F, String](job, job.args, config, logSink, store)
           _ <- t.onCancel.run(ctx)
           _ <- state.modify(_.markCancelled(job))
           _ <- onFinish(job, Json.Null, JobState.Cancelled)
           _ <- ctx.logger.warn("Job has been cancelled.")
-          _ <- logger.fdebug(s"Job ${job.info} has been cancelled.")
+          _ <- logger.debug(s"Job ${job.info} has been cancelled.")
         } yield ()
     }
   }
@@ -186,10 +184,10 @@ final class SchedulerImpl[F[_]: Async](

     task match {
       case Left(err) =>
-        logger.ferror(s"Unable to start a task for job ${job.info}: $err")
+        logger.error(s"Unable to start a task for job ${job.info}: $err")
       case Right(t) =>
         for {
-          _ <- logger.fdebug(s"Creating context for job ${job.info} to run $t")
+          _ <- logger.debug(s"Creating context for job ${job.info} to run $t")
           ctx <- Context[F, String](job, job.args, config, logSink, store)
           jot = wrapTask(job, t.task, ctx)
           tok <- forkRun(job, jot.run(ctx), t.onCancel.run(ctx), ctx)
@@ -200,9 +198,9 @@ final class SchedulerImpl[F[_]: Async](

   def onFinish(job: RJob, result: Json, finishState: JobState): F[Unit] =
     for {
-      _ <- logger.fdebug(s"Job ${job.info} done $finishState. Releasing resources.")
+      _ <- logger.debug(s"Job ${job.info} done $finishState. Releasing resources.")
       _ <- permits.release *> permits.available.flatMap(a =>
-        logger.fdebug(s"Permit released ($a free)")
+        logger.debug(s"Permit released ($a free)")
       )
       _ <- state.modify(_.removeRunning(job))
       _ <- QJob.setFinalState(job.id, finishState, store)
@@ -241,7 +239,7 @@ final class SchedulerImpl[F[_]: Async](
       ctx: Context[F, String]
   ): Task[F, String, Unit] =
     task
-      .mapF(fa => onStart(job) *> logger.fdebug("Starting task now") *> fa)
+      .mapF(fa => onStart(job) *> logger.debug("Starting task now") *> fa)
       .mapF(_.attempt.flatMap {
         case Right(result) =>
           logger.info(s"Job execution successful: ${job.info}")
@@ -284,11 +282,11 @@ final class SchedulerImpl[F[_]: Async](
       onCancel: F[Unit],
       ctx: Context[F, String]
   ): F[F[Unit]] =
-    logger.fdebug(s"Forking job ${job.info}") *>
+    logger.debug(s"Forking job ${job.info}") *>
       Async[F]
         .start(code)
         .map(fiber =>
-          logger.fdebug(s"Cancelling job ${job.info}") *>
+          logger.debug(s"Cancelling job ${job.info}") *>
            fiber.cancel *>
            onCancel.attempt.map {
              case Right(_) => ()
@@ -299,7 +297,7 @@ final class SchedulerImpl[F[_]: Async](
            state.modify(_.markCancelled(job)) *>
              onFinish(job, Json.Null, JobState.Cancelled) *>
              ctx.logger.warn("Job has been cancelled.") *>
-              logger.fdebug(s"Job ${job.info} has been cancelled.")
+              logger.debug(s"Job ${job.info} has been cancelled.")
          )
      }

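One detail worth noting in these scheduler hunks: the old finfo/fdebug calls were syntax extensions over a raw log4s logger and occasionally needed the effect spelled out, as in logger.finfo[F](...); the new logger already carries the effect type, so plain logger.info(...) infers it. Reduced to a sketch:

    import cats.effect.IO

    // the logger is a Logger[IO] up front...
    val logger: docspell.logging.Logger[IO] = docspell.logging.getLogger[IO]

    // ...so no explicit type application is needed at the call site
    val stop: IO[Unit] = logger.info("Stopping main loop due to shutdown request.")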
@@ -11,7 +11,7 @@ import cats.data.Kleisli
 import cats.effect.Sync
 import cats.implicits._

-import docspell.common.Logger
+import docspell.logging.Logger

 /** The code that is executed by the scheduler */
 trait Task[F[_], A, B] {