mirror of
				https://github.com/TheAnachronism/docspell.git
				synced 2025-11-03 18:00:11 +00:00 
			
		
		
		
	Log structured details during job execution
This commit is contained in:
		@@ -59,7 +59,14 @@ object ContextImpl {
 | 
			
		||||
    val log = docspell.logging.getLogger[F]
 | 
			
		||||
    for {
 | 
			
		||||
      _ <- log.trace("Creating logger for task run")
 | 
			
		||||
      logger <- QueueLogger(job.id, job.info, config.logBufferSize, logSink)
 | 
			
		||||
      logger <- QueueLogger(
 | 
			
		||||
        job.id,
 | 
			
		||||
        job.task,
 | 
			
		||||
        job.group,
 | 
			
		||||
        job.info,
 | 
			
		||||
        config.logBufferSize,
 | 
			
		||||
        logSink
 | 
			
		||||
      )
 | 
			
		||||
      _ <- log.trace("Logger created, instantiating context")
 | 
			
		||||
      ctx = create[F, A](job.id, arg, config, logger, store)
 | 
			
		||||
    } yield ctx
 | 
			
		||||
 
 | 
			
		||||
@@ -4,7 +4,7 @@
 | 
			
		||||
 * SPDX-License-Identifier: AGPL-3.0-or-later
 | 
			
		||||
 */
 | 
			
		||||
 | 
			
		||||
package docspell.scheduler
 | 
			
		||||
package docspell.scheduler.impl
 | 
			
		||||
 | 
			
		||||
import cats.effect.Sync
 | 
			
		||||
import cats.implicits._
 | 
			
		||||
@@ -13,6 +13,8 @@ import docspell.common._
 | 
			
		||||
 | 
			
		||||
case class LogEvent(
 | 
			
		||||
    jobId: Ident,
 | 
			
		||||
    taskName: Ident,
 | 
			
		||||
    group: Ident,
 | 
			
		||||
    jobInfo: String,
 | 
			
		||||
    time: Timestamp,
 | 
			
		||||
    level: LogLevel,
 | 
			
		||||
@@ -29,10 +31,14 @@ object LogEvent {
 | 
			
		||||
 | 
			
		||||
  def create[F[_]: Sync](
 | 
			
		||||
      jobId: Ident,
 | 
			
		||||
      taskName: Ident,
 | 
			
		||||
      group: Ident,
 | 
			
		||||
      jobInfo: String,
 | 
			
		||||
      level: LogLevel,
 | 
			
		||||
      msg: String
 | 
			
		||||
  ): F[LogEvent] =
 | 
			
		||||
    Timestamp.current[F].map(now => LogEvent(jobId, jobInfo, now, level, msg))
 | 
			
		||||
    Timestamp
 | 
			
		||||
      .current[F]
 | 
			
		||||
      .map(now => LogEvent(jobId, taskName, group, jobInfo, now, level, msg))
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
@@ -12,7 +12,6 @@ import fs2.Pipe
 | 
			
		||||
 | 
			
		||||
import docspell.common._
 | 
			
		||||
import docspell.logging
 | 
			
		||||
import docspell.scheduler.LogEvent
 | 
			
		||||
import docspell.store.Store
 | 
			
		||||
import docspell.store.records.RJobLog
 | 
			
		||||
 | 
			
		||||
@@ -32,7 +31,10 @@ object LogSink {
 | 
			
		||||
  def logInternal[F[_]: Sync](e: LogEvent): F[Unit] = {
 | 
			
		||||
    val logger = docspell.logging.getLogger[F]
 | 
			
		||||
    val addData: logging.LogEvent => logging.LogEvent =
 | 
			
		||||
      _.data("jobId", e.jobId).data("jobInfo", e.jobInfo)
 | 
			
		||||
      _.data("jobId", e.jobId)
 | 
			
		||||
        .data("task", e.taskName)
 | 
			
		||||
        .data("group", e.group)
 | 
			
		||||
        .data("jobInfo", e.jobInfo)
 | 
			
		||||
 | 
			
		||||
    e.level match {
 | 
			
		||||
      case LogLevel.Info =>
 | 
			
		||||
 
 | 
			
		||||
@@ -14,12 +14,13 @@ import fs2.Stream
 | 
			
		||||
import docspell.common.{Ident, LogLevel}
 | 
			
		||||
import docspell.logging
 | 
			
		||||
import docspell.logging.{Level, Logger}
 | 
			
		||||
import docspell.scheduler.LogEvent
 | 
			
		||||
 | 
			
		||||
object QueueLogger {
 | 
			
		||||
 | 
			
		||||
  def create[F[_]: Sync](
 | 
			
		||||
      jobId: Ident,
 | 
			
		||||
      taskName: Ident,
 | 
			
		||||
      group: Ident,
 | 
			
		||||
      jobInfo: String,
 | 
			
		||||
      q: Queue[F, LogEvent]
 | 
			
		||||
  ): Logger[F] =
 | 
			
		||||
@@ -27,7 +28,14 @@ object QueueLogger {
 | 
			
		||||
 | 
			
		||||
      def log(logEvent: logging.LogEvent) =
 | 
			
		||||
        LogEvent
 | 
			
		||||
          .create[F](jobId, jobInfo, level2Level(logEvent.level), logEvent.msg())
 | 
			
		||||
          .create[F](
 | 
			
		||||
            jobId,
 | 
			
		||||
            taskName,
 | 
			
		||||
            group,
 | 
			
		||||
            jobInfo,
 | 
			
		||||
            level2Level(logEvent.level),
 | 
			
		||||
            logEvent.msg()
 | 
			
		||||
          )
 | 
			
		||||
          .flatMap { ev =>
 | 
			
		||||
            val event =
 | 
			
		||||
              logEvent.findErrors.headOption
 | 
			
		||||
@@ -42,13 +50,15 @@ object QueueLogger {
 | 
			
		||||
 | 
			
		||||
  def apply[F[_]: Async](
 | 
			
		||||
      jobId: Ident,
 | 
			
		||||
      taskName: Ident,
 | 
			
		||||
      group: Ident,
 | 
			
		||||
      jobInfo: String,
 | 
			
		||||
      bufferSize: Int,
 | 
			
		||||
      sink: LogSink[F]
 | 
			
		||||
  ): F[Logger[F]] =
 | 
			
		||||
    for {
 | 
			
		||||
      q <- Queue.circularBuffer[F, LogEvent](bufferSize)
 | 
			
		||||
      log = create(jobId, jobInfo, q)
 | 
			
		||||
      log = create(jobId, taskName, group, jobInfo, q)
 | 
			
		||||
      _ <- Async[F].start(
 | 
			
		||||
        Stream.fromQueueUnterminated(q).through(sink.receive).compile.drain
 | 
			
		||||
      )
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user