diff --git a/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/ContextImpl.scala b/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/ContextImpl.scala index d801a169..59016b9f 100644 --- a/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/ContextImpl.scala +++ b/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/ContextImpl.scala @@ -59,7 +59,14 @@ object ContextImpl { val log = docspell.logging.getLogger[F] for { _ <- log.trace("Creating logger for task run") - logger <- QueueLogger(job.id, job.info, config.logBufferSize, logSink) + logger <- QueueLogger( + job.id, + job.task, + job.group, + job.info, + config.logBufferSize, + logSink + ) _ <- log.trace("Logger created, instantiating context") ctx = create[F, A](job.id, arg, config, logger, store) } yield ctx diff --git a/modules/scheduler/api/src/main/scala/docspell/scheduler/LogEvent.scala b/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/LogEvent.scala similarity index 71% rename from modules/scheduler/api/src/main/scala/docspell/scheduler/LogEvent.scala rename to modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/LogEvent.scala index 29a91631..1744e330 100644 --- a/modules/scheduler/api/src/main/scala/docspell/scheduler/LogEvent.scala +++ b/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/LogEvent.scala @@ -4,7 +4,7 @@ * SPDX-License-Identifier: AGPL-3.0-or-later */ -package docspell.scheduler +package docspell.scheduler.impl import cats.effect.Sync import cats.implicits._ @@ -13,6 +13,8 @@ import docspell.common._ case class LogEvent( jobId: Ident, + taskName: Ident, + group: Ident, jobInfo: String, time: Timestamp, level: LogLevel, @@ -29,10 +31,14 @@ object LogEvent { def create[F[_]: Sync]( jobId: Ident, + taskName: Ident, + group: Ident, jobInfo: String, level: LogLevel, msg: String ): F[LogEvent] = - Timestamp.current[F].map(now => LogEvent(jobId, jobInfo, now, level, msg)) + Timestamp + .current[F] + .map(now => LogEvent(jobId, taskName, group, jobInfo, now, level, msg)) } diff --git a/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/LogSink.scala b/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/LogSink.scala index c59b45ce..e133a1df 100644 --- a/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/LogSink.scala +++ b/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/LogSink.scala @@ -12,7 +12,6 @@ import fs2.Pipe import docspell.common._ import docspell.logging -import docspell.scheduler.LogEvent import docspell.store.Store import docspell.store.records.RJobLog @@ -32,7 +31,10 @@ object LogSink { def logInternal[F[_]: Sync](e: LogEvent): F[Unit] = { val logger = docspell.logging.getLogger[F] val addData: logging.LogEvent => logging.LogEvent = - _.data("jobId", e.jobId).data("jobInfo", e.jobInfo) + _.data("jobId", e.jobId) + .data("task", e.taskName) + .data("group", e.group) + .data("jobInfo", e.jobInfo) e.level match { case LogLevel.Info => diff --git a/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/QueueLogger.scala b/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/QueueLogger.scala index 8a4a0824..3a3992ff 100644 --- a/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/QueueLogger.scala +++ b/modules/scheduler/impl/src/main/scala/docspell/scheduler/impl/QueueLogger.scala @@ -14,12 +14,13 @@ import fs2.Stream import docspell.common.{Ident, LogLevel} import docspell.logging import docspell.logging.{Level, Logger} -import docspell.scheduler.LogEvent object QueueLogger { def create[F[_]: Sync]( jobId: Ident, + taskName: Ident, + group: Ident, jobInfo: String, q: Queue[F, LogEvent] ): Logger[F] = @@ -27,7 +28,14 @@ object QueueLogger { def log(logEvent: logging.LogEvent) = LogEvent - .create[F](jobId, jobInfo, level2Level(logEvent.level), logEvent.msg()) + .create[F]( + jobId, + taskName, + group, + jobInfo, + level2Level(logEvent.level), + logEvent.msg() + ) .flatMap { ev => val event = logEvent.findErrors.headOption @@ -42,13 +50,15 @@ object QueueLogger { def apply[F[_]: Async]( jobId: Ident, + taskName: Ident, + group: Ident, jobInfo: String, bufferSize: Int, sink: LogSink[F] ): F[Logger[F]] = for { q <- Queue.circularBuffer[F, LogEvent](bufferSize) - log = create(jobId, jobInfo, q) + log = create(jobId, taskName, group, jobInfo, q) _ <- Async[F].start( Stream.fromQueueUnterminated(q).through(sink.receive).compile.drain )