Reformat with scalafmt 3.0.0

Scala Steward
2021-08-19 08:50:30 +02:00
parent 5a2a0295ef
commit e4fecefaea
127 changed files with 558 additions and 658 deletions


@@ -22,10 +22,9 @@ trait JoexApp[F[_]] {
/** Shuts down the job executor.
*
* It will immediately stop taking new jobs, waiting for currently
* running jobs to complete normally (i.e. running jobs are not
* canceled). After this completed, the webserver stops and the
* main loop will exit.
* It will immediately stop taking new jobs, waiting for currently running jobs to
* complete normally (i.e. running jobs are not canceled). After this completed, the
* webserver stops and the main loop will exit.
*/
def initShutdown: F[Unit]
}
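
Editor's note: this first hunk shows the change behind most of the 127 touched files. scalafmt 3.0.0 rewraps Scaladoc text to fill lines up to the configured maximum width (presumably via the docstrings.wrap behavior that the 3.x line enables by default; the project's .scalafmt.conf is not part of this excerpt), replacing the old hand-wrapped, roughly 70-column layout. A minimal before/after sketch using the comment text from this hunk, with placeholder traits as carriers:

// Hand-wrapped layout, as scalafmt 2 left it:
/** It will immediately stop taking new jobs, waiting for currently
  * running jobs to complete normally (i.e. running jobs are not
  * canceled).
  */
trait Before

// Rewrapped by scalafmt 3 up to maxColumn:
/** It will immediately stop taking new jobs, waiting for currently running jobs to
  * complete normally (i.e. running jobs are not canceled).
  */
trait After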


@@ -97,7 +97,7 @@ object NerFile {
private def sanitizeRegex(str: String): String =
str.trim.toLowerCase.foldLeft("") { (res, ch) =>
if (invalidChars.contains(ch)) s"${res}\\$ch"
if (invalidChars.contains(ch)) s"$res\\$ch"
else s"$res$ch"
}
}
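
Editor's note: the one-line change above recurs all through the commit. Redundant braces around a bare identifier in a string interpolation are dropped, so ${res} becomes $res. This matches scalafmt's RedundantBraces rewrite (whether it is switched on explicitly in the project's configuration is not visible from this diff). A small sketch showing the two forms are equivalent, and where braces must stay:

object InterpolationBraces extends App {
  val res = "a+b"
  val ch = '?'
  // Redundant: braces around a bare identifier change nothing.
  assert(s"${res}\\$ch" == s"$res\\$ch") // both render as a+b\?
  // Required: any expression beyond a bare identifier keeps ${...}.
  println(s"${res.length}") // prints 3
}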


@@ -22,8 +22,7 @@ import docspell.store.records.RPerson
import io.circe.syntax._
import org.log4s.getLogger
/** Maintains a custom regex-ner file per collective for stanford's
* regexner annotator.
/** Maintains a custom regex-ner file per collective for stanford's regexner annotator.
*/
trait RegexNerFile[F[_]] {
@@ -64,7 +63,7 @@ object RegexNerFile {
val dur = Duration.between(nf.creation, now)
if (dur > cfg.minTime)
logger.fdebug(
s"Cache time elapsed (${dur} > ${cfg.minTime}). Check for new state."
s"Cache time elapsed ($dur > ${cfg.minTime}). Check for new state."
) *> updateFile(
collective,
now,
@@ -141,7 +140,7 @@ object RegexNerFile {
)
for {
_ <- logger.finfo(s"Generating custom NER file for collective '${collective.id}'")
_ <- logger.finfo(s"Generating custom NER file for collective '${collective.id}'")
names <- store.transact(QCollective.allNames(collective, cfg.maxEntries))
nerFile = NerFile(collective, lastUpdate, now)
_ <- update(nerFile, NerFile.mkNerConfig(names))


@@ -52,7 +52,7 @@ object EmptyTrashTask {
s"Starting removing all soft-deleted items older than ${maxDate.asString}"
)
nDeleted <- deleteAll(ctx.args, maxDate, itemOps, itemSearchOps, ctx)
_ <- ctx.logger.info(s"Finished deleting ${nDeleted} items")
_ <- ctx.logger.info(s"Finished deleting $nDeleted items")
} yield ()
}


@@ -101,7 +101,7 @@ object FtsWork {
def recoverWith(
other: FtsWork[F]
)(implicit ev: ApplicativeError[F, Throwable]): FtsWork[F] =
Kleisli(ctx => mt.run(ctx).onError({ case _ => other.run(ctx) }))
Kleisli(ctx => mt.run(ctx).onError { case _ => other.run(ctx) })
def forContext(
cfg: Config.FullTextSearch,
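
Editor's note: another recurring rewrite, visible in recoverWith above and in many flatMap, evalMap and Kleisli call sites below. The parentheses-plus-braces combination around a lambda or partial-function argument is collapsed, so f({ case ... }) becomes f { case ... }. Both spellings are the same method call; a compilable sketch with invented names:

object PartialFunctionBraces extends App {
  def handle(f: PartialFunction[Int, String]): String = f(1)
  // Parenthesized form, as the code looked before:
  val before = handle({ case n => s"got $n" })
  // Brace-only form, as scalafmt 3 rewrites it:
  val after = handle { case n => s"got $n" }
  assert(before == after)
}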


@@ -19,9 +19,8 @@ import docspell.store.Store
/** Migrating the index from the previous version to this version.
*
* The migration asks the fulltext search client for a list of
* migration tasks to run. It may be empty when there is no migration
* required.
* The migration asks the fulltext search client for a list of migration tasks to run. It
* may be empty when there is no migration required.
*/
case class Migration[F[_]](
version: Int,


@@ -10,8 +10,7 @@ import cats.data.Kleisli
package object fts {
/** Some work that must be done to advance the schema of the fulltext
* index.
/** Some work that must be done to advance the schema of the fulltext index.
*/
type FtsWork[F[_]] = Kleisli[F, FtsContext[F], Unit]


@@ -23,7 +23,7 @@ object ClassifierName {
private val categoryPrefix = "tagcategory-"
def tagCategory(cat: String): ClassifierName =
apply(s"${categoryPrefix}${cat}")
apply(s"$categoryPrefix$cat")
val concernedPerson: ClassifierName =
apply("concernedperson")
@@ -56,7 +56,7 @@ object ClassifierName {
def findOrphanTagModels[F[_]](coll: Ident): ConnectionIO[List[RClassifierModel]] =
for {
cats <- RClassifierSetting.getActiveCategories(coll)
allModels = RClassifierModel.findAllByQuery(coll, s"${categoryPrefix}%")
allModels = RClassifierModel.findAllByQuery(coll, s"$categoryPrefix%")
result <- NonEmptyList.fromList(cats) match {
case Some(nel) =>
allModels.flatMap(all =>


@@ -47,7 +47,7 @@ object Classify {
.flatMap(_ => classifier.classify(logger, ClassifierModel(modelFile), text))
}).filter(_ != LearnClassifierTask.noClass)
.flatTapNone(logger.debug("Guessed: <none>"))
_ <- OptionT.liftF(logger.debug(s"Guessed: ${cls}"))
_ <- OptionT.liftF(logger.debug(s"Guessed: $cls"))
} yield cls).value
}


@@ -40,7 +40,7 @@ object NotifyDueItemsTask {
_ <- createMail(cfg, mailCfg, ctx)
.semiflatMap { mail =>
for {
_ <- ctx.logger.info(s"Sending notification mail to ${ctx.args.recipients}")
_ <- ctx.logger.info(s"Sending notification mail to ${ctx.args.recipients}")
res <- emil(mailCfg.toMailConfig).send(mail).map(_.head)
_ <- ctx.logger.info(s"Sent mail with id: $res")
} yield ()


@@ -26,9 +26,8 @@ import bitpeace.RangeDef
import io.circe.generic.semiauto._
import io.circe.{Decoder, Encoder}
/** Converts the given attachment file using ocrmypdf if it is a pdf
* and has not already been converted (the source file is the same as
* in the attachment).
/** Converts the given attachment file using ocrmypdf if it is a pdf and has not already
* been converted (the source file is the same as in the attachment).
*/
object PdfConvTask {
case class Args(attachId: Ident)
@@ -100,7 +99,7 @@ object PdfConvTask {
.through(bp.fetchData2(RangeDef.all))
val storeResult: ConversionResult.Handler[F, Unit] =
Kleisli({
Kleisli {
case ConversionResult.SuccessPdf(file) =>
storeToAttachment(ctx, in, file)
@@ -109,15 +108,15 @@ object PdfConvTask {
case ConversionResult.UnsupportedFormat(mime) =>
ctx.logger.warn(
s"Unable to convert '${mime}' file ${ctx.args}: unsupported format."
s"Unable to convert '$mime' file ${ctx.args}: unsupported format."
)
case ConversionResult.InputMalformed(mime, reason) =>
ctx.logger.warn(s"Unable to convert '${mime}' file ${ctx.args}: $reason")
ctx.logger.warn(s"Unable to convert '$mime' file ${ctx.args}: $reason")
case ConversionResult.Failure(ex) =>
Sync[F].raiseError(ex)
})
}
def ocrMyPdf(lang: Language): F[Unit] =
OcrMyPdf.toPDF[F, Unit](


@@ -22,9 +22,8 @@ import docspell.store.syntax.MimeTypes._
import bitpeace.{Mimetype, RangeDef}
/** Goes through all attachments that must be already converted into a
* pdf. If it is a pdf, the number of pages are retrieved and stored
* in the attachment metadata.
/** Goes through all attachments that must be already converted into a pdf. If it is a
* pdf, the number of pages are retrieved and stored in the attachment metadata.
*/
object AttachmentPageCount {


@@ -24,9 +24,9 @@ import docspell.store.syntax.MimeTypes._
import bitpeace.{Mimetype, MimetypeHint, RangeDef}
/** Goes through all attachments that must be already converted into a
* pdf. If it is a pdf, the first page is converted into a small
* preview png image and linked to the attachment.
/** Goes through all attachments that must be already converted into a pdf. If it is a
* pdf, the first page is converted into a small preview png image and linked to the
* attachment.
*/
object AttachmentPreview {


@@ -23,19 +23,16 @@ import docspell.store.syntax.MimeTypes._
import bitpeace.{Mimetype, MimetypeHint, RangeDef}
/** Goes through all attachments and creates a PDF version of it where
* supported.
/** Goes through all attachments and creates a PDF version of it where supported.
*
* The `attachment` record is updated with the PDF version while the
* original file has been stored in the `attachment_source` record.
* The `attachment` record is updated with the PDF version while the original file has
* been stored in the `attachment_source` record.
*
* If pdf conversion is not possible or if the input is already a
* pdf, both files are identical. That is, the `file_id`s point to
* the same file. Since the name of an attachment may be changed by
* the user, the `attachment_origin` record keeps that, too.
* If pdf conversion is not possible or if the input is already a pdf, both files are
* identical. That is, the `file_id`s point to the same file. Since the name of an
* attachment may be changed by the user, the `attachment_origin` record keeps that, too.
*
* This step assumes an existing premature item, it traverses its
* attachments.
* This step assumes an existing premature item, it traverses its attachments.
*/
object ConvertPdf {
@@ -104,7 +101,7 @@ object ConvertPdf {
ra: RAttachment,
item: ItemData
): Handler[F, (RAttachment, Option[RAttachmentMeta])] =
Kleisli({
Kleisli {
case ConversionResult.SuccessPdf(pdf) =>
ctx.logger.info(s"Conversion to pdf successful. Saving file.") *>
storePDF(ctx, cfg, ra, pdf)
@@ -142,7 +139,7 @@ object ConvertPdf {
ctx.logger
.error(s"PDF conversion failed: ${ex.getMessage}. Go without PDF file") *>
(ra, None: Option[RAttachmentMeta]).pure[F]
})
}
private def storePDF[F[_]: Sync](
ctx: Context[F, ProcessItemArgs],
@@ -196,7 +193,7 @@ object ConvertPdf {
case Right(_) => ().pure[F]
case Left(ex) =>
ctx.logger
.error(ex)(s"Cannot delete previous attachment file: ${raPrev}")
.error(ex)(s"Cannot delete previous attachment file: $raPrev")
}
} yield ()


@@ -45,9 +45,9 @@ object CreateItem {
Stream
.emits(ctx.args.files)
.flatMap(f => ctx.store.bitpeace.get(f.fileMetaId.id).map(fm => (f, fm)))
.collect({ case (f, Some(fm)) if isValidFile(fm) => f })
.collect { case (f, Some(fm)) if isValidFile(fm) => f }
.zipWithIndex
.evalMap({ case (f, index) =>
.evalMap { case (f, index) =>
Ident
.randomId[F]
.map(id =>
@@ -60,7 +60,7 @@
f.name
)
)
})
}
}
.compile
.toVector
@@ -152,7 +152,7 @@ object CreateItem {
.transact(RAttachment.findByItemCollectiveSource(ri.id, ri.cid, fids))
.flatTap(ats =>
ctx.logger.debug(
s"Found ${ats.size} attachments. Use only those from task args: ${fileMetaIds}"
s"Found ${ats.size} attachments. Use only those from task args: $fileMetaIds"
)
)
)


@@ -14,12 +14,10 @@ import cats.implicits._
import docspell.common._
import docspell.joex.scheduler.Task
/** After candidates have been determined, the set is reduced by doing
* some cross checks. For example: if a organization is suggested as
* correspondent, the correspondent person must be linked to that
* organization. So this *removes all* person candidates that are not
* linked to the first organization candidate (which will be linked
* to the item).
/** After candidates have been determined, the set is reduced by doing some cross checks.
* For example: if a organization is suggested as correspondent, the correspondent person
* must be linked to that organization. So this *removes all* person candidates that are
* not linked to the first organization candidate (which will be linked to the item).
*/
object CrossCheckProposals {


@@ -52,7 +52,7 @@ object DuplicateCheck {
val fname = ctx.args.files.find(_.fileMetaId.id == fd.fm.id).flatMap(_.name)
if (fd.exists)
ctx.logger
.info(s"Deleting duplicate file ${fname}!") *> ctx.store.bitpeace
.info(s"Deleting duplicate file $fname!") *> ctx.store.bitpeace
.delete(fd.fm.id)
.compile
.drain


@@ -15,8 +15,7 @@ import docspell.common._
import docspell.joex.scheduler.{Context, Task}
import docspell.store.records.{RAttachmentMeta, RPerson}
/** Calculate weights for candidates that adds the most likely
* candidate a lower number.
/** Calculate weights for candidates that adds the most likely candidate a lower number.
*/
object EvalProposals {


@@ -25,16 +25,13 @@ import docspell.store.syntax.MimeTypes._
import bitpeace.{Mimetype, MimetypeHint, RangeDef}
import emil.Mail
/** Goes through all attachments and extracts archive files, like zip
* files. The process is recursive, until all archives have been
* extracted.
/** Goes through all attachments and extracts archive files, like zip files. The process
* is recursive, until all archives have been extracted.
*
* The archive file is stored as a `attachment_archive` record that
* references all its elements. If there are inner archive, only the
* outer archive file is preserved.
* The archive file is stored as a `attachment_archive` record that references all its
* elements. If there are inner archive, only the outer archive file is preserved.
*
* This step assumes an existing premature item, it traverses its
* attachments.
* This step assumes an existing premature item, it traverses its attachments.
*/
object ExtractArchive {
@@ -78,11 +75,10 @@ object ExtractArchive {
)
}
/** After all files have been extracted, the `extract' contains the
* whole (combined) result. This fixes positions of the attachments
* such that the elements of an archive are "spliced" into the
* attachment list at the position of the archive. If there is no
* archive, positions don't need to be fixed.
/** After all files have been extracted, the `extract' contains the whole (combined)
* result. This fixes positions of the attachments such that the elements of an archive
* are "spliced" into the attachment list at the position of the archive. If there is
* no archive, positions don't need to be fixed.
*/
private def fixPositions(extract: Extracted): Extracted =
if (extract.archives.isEmpty) extract
@@ -203,8 +199,8 @@ object ExtractArchive {
tentry: (Binary[F], Long)
): Stream[F, Extracted] = {
val (entry, subPos) = tentry
val mimeHint = MimetypeHint.filename(entry.name).withAdvertised(entry.mime.asString)
val fileMeta = ctx.store.bitpeace.saveNew(entry.data, 8192, mimeHint)
val mimeHint = MimetypeHint.filename(entry.name).withAdvertised(entry.mime.asString)
val fileMeta = ctx.store.bitpeace.saveNew(entry.data, 8192, mimeHint)
Stream.eval(ctx.logger.debug(s"Extracted ${entry.name}. Storing as attachment.")) >>
fileMeta.evalMap { fm =>
Ident.randomId.map { id =>
@@ -267,7 +263,7 @@ object ExtractArchive {
val sorted = nel.sorted
val offset = sorted.head.first
val pos =
sorted.zipWithIndex.map({ case (p, i) => p.id -> (i + offset) }).toList.toMap
sorted.zipWithIndex.map { case (p, i) => p.id -> (i + offset) }.toList.toMap
val nf =
files.map(f => pos.get(f.id).map(n => f.copy(position = n)).getOrElse(f))
copy(files = nf)
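
Editor's note: the seemingly identical mimeHint/fileMeta pairs above are a whitespace-only change. The old code padded the = signs into an aligned column, and scalafmt 3 collapses that padding to single spaces (val alignment is off under the default align preset); the padding itself does not survive this page's rendering. The doubled logger lines in several other hunks are the same phenomenon, with only the continuation indent changing. Illustrated with placeholder values:

object AlignmentExample {
  // Before, with `=` padded into a column:
  //   val mime     = MimetypeHint.filename(name)
  //   val fileMeta = bitpeace.saveNew(data)
  // After, with single spaces only:
  val mime = "application/pdf"
  val fileMeta = "file-meta-id"
}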


@@ -19,8 +19,8 @@ import docspell.joex.Config
import docspell.joex.scheduler.{Context, Task}
import docspell.store.records._
/** Super simple approach to find corresponding meta data to an item
* by looking up values from NER in the users address book.
/** Super simple approach to find corresponding meta data to an item by looking up values
* from NER in the users address book.
*/
object FindProposal {
type Args = ProcessItemArgs


@@ -12,18 +12,23 @@ import docspell.store.records.{RAttachment, RAttachmentMeta, RItem}
/** Data that is carried across all processing tasks.
*
* @param item the stored item record
* @param attachments the attachments belonging to the item
* @param metas the meta data to each attachment; depending on the
* state of processing, this may be empty
* @param dateLabels a separate list of found dates
* @param originFile a mapping from an attachment id to a filemeta-id
* containng the source or origin file
* @param givenMeta meta data to this item that was not "guessed"
* from an attachment but given and thus is always correct
* @param classifyProposals these are proposals that were obtained by
* a trained classifier. There are no ner-tags, it will only provide a
* single label
* @param item
* the stored item record
* @param attachments
* the attachments belonging to the item
* @param metas
* the meta data to each attachment; depending on the state of processing, this may be
* empty
* @param dateLabels
* a separate list of found dates
* @param originFile
* a mapping from an attachment id to a filemeta-id containng the source or origin file
* @param givenMeta
* meta data to this item that was not "guessed" from an attachment but given and thus
* is always correct
* @param classifyProposals
* these are proposals that were obtained by a trained classifier. There are no
* ner-tags, it will only provide a single label
*/
case class ItemData(
item: RItem,
@@ -31,7 +36,7 @@ case class ItemData(
metas: Vector[RAttachmentMeta],
dateLabels: Vector[AttachmentDates],
originFile: Map[Ident, Ident], // maps RAttachment.id -> FileMeta.id
givenMeta: MetaProposalList, // given meta data not associated to a specific attachment
givenMeta: MetaProposalList, // given meta data not associated to a specific attachment
// a list of tags (names or ids) attached to the item if they exist
tags: List[String],
// proposals obtained from the classifier
@@ -39,9 +44,8 @@ case class ItemData(
classifyTags: List[String]
) {
/** sort by weight; order of equal weights is not important, just
* choose one others are then suggestions
* doc-date is only set when given explicitely, not from "guessing"
/** sort by weight; order of equal weights is not important, just choose one others are
* then suggestions doc-date is only set when given explicitely, not from "guessing"
*/
def finalProposals: MetaProposalList =
MetaProposalList
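
Editor's note: the ItemData hunk above also shows scalafmt 3's new Scaladoc tag layout. Each @param name sits on its own line with its description on the following line, indented and wrapped at full width, instead of the old hanging-indent style. Sketched on a hypothetical method:

object ScaladocTagStyle {

  /** Looks up an item.
    *
    * @param id
    *   the identifier of the stored item record
    * @param includeDeleted
    *   whether soft-deleted items are returned as well
    */
  def lookup(id: String, includeDeleted: Boolean): Option[String] = None
}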


@@ -77,7 +77,7 @@ object ItemHandler {
)(data: ItemData): Task[F, Args, ItemData] =
isLastRetry[F].flatMap {
case true =>
ProcessItem[F](cfg, itemOps, fts, analyser, regexNer)(data).attempt.flatMap({
ProcessItem[F](cfg, itemOps, fts, analyser, regexNer)(data).attempt.flatMap {
case Right(d) =>
Task.pure(d)
case Left(ex) =>
@@ -85,7 +85,7 @@
"Processing failed on last retry. Creating item but without proposals."
).flatMap(_ => itemStateTask(ItemState.Created)(data))
.andThen(_ => Sync[F].raiseError(ex))
})
}
case false =>
ProcessItem[F](cfg, itemOps, fts, analyser, regexNer)(data)
.flatMap(itemStateTask(ItemState.Created))


@@ -57,7 +57,7 @@ object LinkProposal {
case Some(a) =>
val ids = a.values.map(_.ref.id.id)
ctx.logger.info(
s"Found many (${a.size}, ${ids}) candidates for ${a.proposalType}. Setting first."
s"Found many (${a.size}, $ids) candidates for ${a.proposalType}. Setting first."
) *>
setItemMeta(data.item.id, ctx, a.proposalType, a.values.head.ref.id).map(_ =>
Result.multiple(mpt)


@@ -149,14 +149,14 @@ object ReProcessItem {
isLastRetry[F].flatMap {
case true =>
processFiles[F](cfg, fts, itemOps, analyser, regexNer, data).attempt
.flatMap({
.flatMap {
case Right(d) =>
Task.pure(d)
case Left(ex) =>
logWarn[F](
"Processing failed on last retry."
).andThen(_ => Sync[F].raiseError(ex))
})
}
case false =>
processFiles[F](cfg, fts, itemOps, analyser, regexNer, data)
}


@@ -67,7 +67,7 @@ object SetGivenData {
val tags =
(ctx.args.meta.tags.getOrElse(Nil) ++ data.tags ++ data.classifyTags).distinct
for {
_ <- ctx.logger.info(s"Set tags from given data: ${tags}")
_ <- ctx.logger.info(s"Set tags from given data: $tags")
e <- ops.linkTags(itemId, tags, collective).attempt
_ <- e.fold(
ex => ctx.logger.warn(s"Error setting tags: ${ex.getMessage}"),


@@ -106,7 +106,7 @@ object TextExtraction {
item: ItemData
)(ra: RAttachment): F[(RAttachmentMeta, List[String])] =
for {
_ <- ctx.logger.debug(s"Extracting text for attachment ${stripAttachmentName(ra)}")
_ <- ctx.logger.debug(s"Extracting text for attachment ${stripAttachmentName(ra)}")
dst <- Duration.stopTime[F]
fids <- filesToExtract(ctx)(item, ra)
res <- extractTextFallback(ctx, cfg, ra, lang)(fids)
@@ -158,7 +158,7 @@ object TextExtraction {
val extr = Extraction.create[F](ctx.logger, cfg)
extractText[F](ctx, extr, lang)(id)
.flatMap({
.flatMap {
case res @ ExtractResult.Success(_, _) =>
res.some.pure[F]
@@ -173,15 +173,14 @@ object TextExtraction {
ctx.logger
.warn(s"Cannot extract text: ${ex.getMessage}. Try with converted file")
.flatMap(_ => extractTextFallback[F](ctx, cfg, ra, lang)(rest))
})
}
}
/** Returns the fileIds to extract text from. First, the source file
* is tried. If that fails, the converted file is tried.
/** Returns the fileIds to extract text from. First, the source file is tried. If that
* fails, the converted file is tried.
*
* If the source file is a PDF, then use the converted file. This
* may then already contain the text if ocrmypdf is enabled. If it
* is disabled, both files are the same.
* If the source file is a PDF, then use the converted file. This may then already
* contain the text if ocrmypdf is enabled. If it is disabled, both files are the same.
*/
private def filesToExtract[F[_]: Sync](ctx: Context[F, _])(
item: ItemData,


@@ -50,7 +50,10 @@ object JoexRoutes {
for {
optJob <- app.scheduler.getRunning.map(_.find(_.id == id))
optLog <- optJob.traverse(j => app.findLogs(j.id))
jAndL = for { job <- optJob; log <- optLog } yield mkJobLog(job, log)
jAndL = for {
job <- optJob
log <- optLog
} yield mkJobLog(job, log)
resp <- jAndL.map(Ok(_)).getOrElse(NotFound(BasicResult(false, "Not found")))
} yield resp
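
Editor's note: here scalafmt 3 breaks a for-comprehension with multiple generators out of its single-line form, putting each binding on its own line. The same transformation on a self-contained example:

object ForComprehensionLayout extends App {
  val optJob: Option[String] = Some("job-1")
  val optLog: Option[String] = Some("log-1")
  // Before: val jAndL = for { job <- optJob; log <- optLog } yield (job, log)
  val jAndL = for {
    job <- optJob
    log <- optLog
  } yield (job, log)
  println(jAndL) // Some((job-1,log-1))
}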


@@ -46,7 +46,7 @@ object ScanMailboxTask {
userId = ctx.args.account.user
imapConn = ctx.args.imapConnection
_ <- ctx.logger.info(
s"Reading mails for user ${userId.id} from ${imapConn.id}/${folders}"
s"Reading mails for user ${userId.id} from ${imapConn.id}/$folders"
)
_ <- importMails(cfg, mailCfg, emil, upload, joex, ctx)
} yield ()


@@ -10,11 +10,10 @@ import cats.implicits._
import docspell.common.Priority
/** A counting scheme to indicate a ratio between scheduling high and
* low priority jobs.
/** A counting scheme to indicate a ratio between scheduling high and low priority jobs.
*
* For example high=4, low=1 means: ”schedule 4 high priority jobs
* and then 1 low priority job“.
* For example high=4, low=1 means: ”schedule 4 high priority jobs and then 1 low
* priority job“.
*/
case class CountingScheme(high: Int, low: Int, counter: Int = 0) {


@@ -14,14 +14,13 @@ import docspell.common.syntax.all._
import io.circe.Decoder
/** Binds a Task to a name. This is required to lookup the code based
* on the taskName in the RJob data and to execute it given the
* arguments that have to be read from a string.
/** Binds a Task to a name. This is required to lookup the code based on the taskName in
* the RJob data and to execute it given the arguments that have to be read from a
* string.
*
* Since the scheduler only has a string for the task argument, this
* only works for Task impls that accept a string. There is a
* convenience constructor that uses circe to decode json into some
* type A.
* Since the scheduler only has a string for the task argument, this only works for Task
* impls that accept a string. There is a convenience constructor that uses circe to
* decode json into some type A.
*/
case class JobTask[F[_]](
name: Ident,


@@ -8,9 +8,8 @@ package docspell.joex.scheduler
import docspell.common.Ident
/** This is a mapping from some identifier to a task. This is used by
* the scheduler to lookup an implementation using the taskName field
* of the RJob database record.
/** This is a mapping from some identifier to a task. This is used by the scheduler to
* lookup an implementation using the taskName field of the RJob database record.
*/
final class JobTaskRegistry[F[_]](tasks: Map[Ident, JobTask[F]]) {


@@ -13,14 +13,12 @@ import fs2.concurrent.SignallingRef
import docspell.joexapi.client.JoexClient
import docspell.store.queue._
/** A periodic scheduler takes care to submit periodic tasks to the
* job queue.
/** A periodic scheduler takes care to submit periodic tasks to the job queue.
*
* It is run in the background to regularily find a periodic task to
* execute. If the task is due, it will be submitted into the job
* queue where it will be picked up by the scheduler from some joex
* instance. If it is due in the future, a notification is scheduled
* to be received at that time so the task can be looked up again.
* It is run in the background to regularily find a periodic task to execute. If the task
* is due, it will be submitted into the job queue where it will be picked up by the
* scheduler from some joex instance. If it is due in the future, a notification is
* scheduled to be received at that time so the task can be looked up again.
*/
trait PeriodicScheduler[F[_]] {


@@ -53,8 +53,8 @@ final class PeriodicSchedulerImpl[F[_]: Async](
// internal
/** On startup, get all periodic jobs from this scheduler and remove
* the mark, so they get picked up again.
/** On startup, get all periodic jobs from this scheduler and remove the mark, so they
* get picked up again.
*/
def init: F[Unit] =
logError("Error clearing marks")(store.clearMarks(config.name))
@@ -68,7 +68,7 @@ final class PeriodicSchedulerImpl[F[_]: Async](
go <- logThrow("Error getting next task")(
store
.takeNext(config.name, None)
.use({
.use {
case Marked.Found(pj) =>
logger
.fdebug(s"Found periodic task '${pj.subject}/${pj.timer.asString}'") *>
@@ -79,7 +79,7 @@ final class PeriodicSchedulerImpl[F[_]: Async](
case Marked.NotMarkable =>
logger.fdebug("Periodic job cannot be marked. Trying again.") *> true
.pure[F]
})
}
)
} yield go
@@ -90,7 +90,7 @@ final class PeriodicSchedulerImpl[F[_]: Async](
else ().pure[F]
)
.flatMap(if (_) Stream.empty else Stream.eval(cancelNotify *> body))
.flatMap({
.flatMap {
case true =>
mainLoop
case false =>
@@ -98,7 +98,7 @@ final class PeriodicSchedulerImpl[F[_]: Async](
waiter.discrete.take(2).drain ++
logger.sdebug(s"Notify signal, going into main loop") ++
mainLoop
})
}
}
def isTriggered(pj: RPeriodicTask, now: Timestamp): Boolean =
@@ -107,7 +107,7 @@ final class PeriodicSchedulerImpl[F[_]: Async](
def submitJob(pj: RPeriodicTask): F[Boolean] =
store
.findNonFinalJob(pj.id)
.flatMap({
.flatMap {
case Some(job) =>
logger.finfo[F](
s"There is already a job with non-final state '${job.state}' in the queue"
@@ -116,7 +116,7 @@ final class PeriodicSchedulerImpl[F[_]: Async](
case None =>
logger.finfo[F](s"Submitting job for periodic task '${pj.task.id}'") *>
pj.toJob.flatMap(queue.insert) *> notifyJoex *> true.pure[F]
})
}
def notifyJoex: F[Unit] =
sch.notifyChange *> store.findJoexNodes.flatMap(
@@ -145,12 +145,12 @@ final class PeriodicSchedulerImpl[F[_]: Async](
def cancelNotify: F[Unit] =
state
.modify(_.clearNotify)
.flatMap({
.flatMap {
case Some(fb) =>
fb.cancel
case None =>
().pure[F]
})
}
private def logError(msg: => String)(fa: F[Unit]): F[Unit] =
fa.attempt.flatMap {
@@ -159,12 +159,10 @@ final class PeriodicSchedulerImpl[F[_]: Async](
}
private def logThrow[A](msg: => String)(fa: F[A]): F[A] =
fa.attempt
.flatMap({
case r @ Right(_) => (r: Either[Throwable, A]).pure[F]
case l @ Left(ex) => logger.ferror(ex)(msg).map(_ => (l: Either[Throwable, A]))
})
.rethrow
fa.attempt.flatMap {
case r @ Right(_) => (r: Either[Throwable, A]).pure[F]
case l @ Left(ex) => logger.ferror(ex)(msg).map(_ => (l: Either[Throwable, A]))
}.rethrow
}
object PeriodicSchedulerImpl {


@@ -26,13 +26,11 @@ trait Scheduler[F[_]] {
/** Requests to shutdown the scheduler.
*
* The scheduler will not take any new jobs from the queue. If
* there are still running jobs, it waits for them to complete.
* when the cancelAll flag is set to true, it cancels all running
* jobs.
* The scheduler will not take any new jobs from the queue. If there are still running
* jobs, it waits for them to complete. when the cancelAll flag is set to true, it
* cancels all running jobs.
*
* The returned F[Unit] can be evaluated to wait for all that to
* complete.
* The returned F[Unit] can be evaluated to wait for all that to complete.
*/
def shutdown(cancelAll: Boolean): F[Unit]


@@ -36,8 +36,8 @@ final class SchedulerImpl[F[_]: Async](
private[this] val logger = getLogger
/** On startup, get all jobs in state running from this scheduler
* and put them into waiting state, so they get picked up again.
/** On startup, get all jobs in state running from this scheduler and put them into
* waiting state, so they get picked up again.
*/
def init: F[Unit] =
QJob.runningToWaiting(config.name, store)
@@ -132,7 +132,7 @@ final class SchedulerImpl[F[_]: Async](
else ().pure[F]
)
.flatMap(if (_) Stream.empty else Stream.eval(body))
.flatMap({
.flatMap {
case true =>
mainLoop
case false =>
@@ -140,7 +140,7 @@ final class SchedulerImpl[F[_]: Async](
waiter.discrete.take(2).drain ++
logger.sdebug(s"Notify signal, going into main loop") ++
mainLoop
})
}
}
private def executeCancel(job: RJob): F[Unit] = {
@@ -214,7 +214,7 @@ final class SchedulerImpl[F[_]: Async](
): Task[F, String, Unit] =
task
.mapF(fa => onStart(job) *> logger.fdebug("Starting task now") *> fa)
.mapF(_.attempt.flatMap({
.mapF(_.attempt.flatMap {
case Right(()) =>
logger.info(s"Job execution successful: ${job.info}")
ctx.logger.info("Job execution successful") *>
@@ -239,7 +239,7 @@ final class SchedulerImpl[F[_]: Async](
.map(_ => JobState.Stuck: JobState)
}
}
}))
})
.mapF(_.attempt.flatMap {
case Right(jstate) =>
onFinish(job, jstate)
@@ -262,12 +262,12 @@ final class SchedulerImpl[F[_]: Async](
.map(fiber =>
logger.fdebug(s"Cancelling job ${job.info}") *>
fiber.cancel *>
onCancel.attempt.map({
onCancel.attempt.map {
case Right(_) => ()
case Left(ex) =>
logger.error(ex)(s"Task's cancelling code failed. Job ${job.info}.")
()
}) *>
} *>
state.modify(_.markCancelled(job)) *>
onFinish(job, JobState.Cancelled) *>
ctx.logger.warn("Job has been cancelled.") *>