scalafmtAll

This commit is contained in:
Eike Kettner
2020-03-26 18:26:00 +01:00
parent 09ea724c13
commit 9656ba62f4
91 changed files with 871 additions and 295 deletions

View File

@ -12,8 +12,10 @@ import org.log4s._
object Main extends IOApp {
private[this] val logger = getLogger
val blockingEC = ThreadFactories.cached[IO](ThreadFactories.ofName("docspell-joex-blocking"))
val connectEC = ThreadFactories.fixed[IO](5, ThreadFactories.ofName("docspell-joex-dbconnect"))
val blockingEC =
ThreadFactories.cached[IO](ThreadFactories.ofName("docspell-joex-blocking"))
val connectEC =
ThreadFactories.fixed[IO](5, ThreadFactories.ofName("docspell-joex-dbconnect"))
def run(args: List[String]) = {
args match {
@ -52,9 +54,17 @@ object Main extends IOApp {
blocker = Blocker.liftExecutorService(bec)
} yield Pools(cec, bec, blocker)
pools.use(p =>
JoexServer.stream[IO](cfg, p.connectEC, p.clientEC, p.blocker).compile.drain.as(ExitCode.Success)
JoexServer
.stream[IO](cfg, p.connectEC, p.clientEC, p.blocker)
.compile
.drain
.as(ExitCode.Success)
)
}
case class Pools(connectEC: ExecutionContext, clientEC: ExecutionContext, blocker: Blocker)
case class Pools(
connectEC: ExecutionContext,
clientEC: ExecutionContext,
blocker: Blocker
)
}

View File

@ -6,9 +6,9 @@ import docspell.common._
import HouseKeepingConfig._
case class HouseKeepingConfig(
schedule: CalEvent,
cleanupInvites: CleanupInvites,
cleanupJobs: CleanupJobs
schedule: CalEvent,
cleanupInvites: CleanupInvites,
cleanupJobs: CleanupJobs
)
object HouseKeepingConfig {

View File

@ -16,7 +16,8 @@ object HouseKeepingTask {
val taskName: Ident = Ident.unsafe("housekeeping")
def apply[F[_]: Sync](cfg: Config): Task[F, Unit, Unit] =
Task.log[F](_.info(s"Running house-keeping task now"))
Task
.log[F](_.info(s"Running house-keeping task now"))
.flatMap(_ => CleanupInvitesTask(cfg.houseKeeping.cleanupInvites))
.flatMap(_ => CleanupJobsTask(cfg.houseKeeping.cleanupJobs))

View File

@ -25,7 +25,10 @@ object CreateItem {
Task { ctx =>
def isValidFile(fm: FileMeta) =
ctx.args.meta.validFileTypes.isEmpty ||
ctx.args.meta.validFileTypes.map(_.asString).toSet.contains(fm.mimetype.baseType)
ctx.args.meta.validFileTypes
.map(_.asString)
.toSet
.contains(fm.mimetype.baseType)
def fileMetas(itemId: Ident, now: Timestamp) =
Stream
@ -37,7 +40,9 @@ object CreateItem {
case (f, index) =>
Ident
.randomId[F]
.map(id => RAttachment(id, itemId, f.fileMetaId, index.toInt, now, f.name))
.map(id =>
RAttachment(id, itemId, f.fileMetaId, index.toInt, now, f.name)
)
})
.compile
.toVector
@ -51,7 +56,9 @@ object CreateItem {
)
for {
_ <- ctx.logger.info(s"Creating new item with ${ctx.args.files.size} attachment(s)")
_ <- ctx.logger.info(
s"Creating new item with ${ctx.args.files.size} attachment(s)"
)
time <- Duration.stopTime[F]
it <- item
n <- ctx.store.transact(RItem.insert(it))
@ -61,7 +68,13 @@ object CreateItem {
_ <- logDifferences(ctx, fm, k.sum)
dur <- time
_ <- ctx.logger.info(s"Creating item finished in ${dur.formatExact}")
} yield ItemData(it, fm, Vector.empty, Vector.empty, fm.map(a => a.id -> a.fileId).toMap)
} yield ItemData(
it,
fm,
Vector.empty,
Vector.empty,
fm.map(a => a.id -> a.fileId).toMap
)
}
def insertAttachment[F[_]: Sync](ctx: Context[F, _])(ra: RAttachment): F[Int] = {
@ -79,7 +92,8 @@ object CreateItem {
_ <- if (cand.nonEmpty) ctx.logger.warn("Found existing item with these files.")
else ().pure[F]
ht <- cand.drop(1).traverse(ri => QItem.delete(ctx.store)(ri.id, ri.cid))
_ <- if (ht.sum > 0) ctx.logger.warn(s"Removed ${ht.sum} items with same attachments")
_ <- if (ht.sum > 0)
ctx.logger.warn(s"Removed ${ht.sum} items with same attachments")
else ().pure[F]
rms <- OptionT(
cand.headOption.traverse(ri =>
@ -92,7 +106,9 @@ object CreateItem {
origMap = orig
.map(originFileTuple)
.toMap
} yield cand.headOption.map(ri => ItemData(ri, rms, Vector.empty, Vector.empty, origMap))
} yield cand.headOption.map(ri =>
ItemData(ri, rms, Vector.empty, Vector.empty, origMap)
)
}
private def logDifferences[F[_]: Sync](
@ -114,6 +130,8 @@ object CreateItem {
}
//TODO if no source is present, it must be saved!
private def originFileTuple(t: (RAttachment, Option[RAttachmentSource])): (Ident, Ident) =
private def originFileTuple(
t: (RAttachment, Option[RAttachmentSource])
): (Ident, Ident) =
t._2.map(s => s.id -> s.fileId).getOrElse(t._1.id -> t._1.fileId)
}

View File

@ -24,7 +24,10 @@ case class ItemData(
copy(metas = next)
}
def changeMeta(attachId: Ident, f: RAttachmentMeta => RAttachmentMeta): RAttachmentMeta =
def changeMeta(
attachId: Ident,
f: RAttachmentMeta => RAttachmentMeta
): RAttachmentMeta =
f(findOrCreate(attachId))
def findOrCreate(attachId: Ident): RAttachmentMeta =

View File

@ -10,15 +10,21 @@ import docspell.store.records.{RItem, RJob}
object ItemHandler {
def onCancel[F[_]: Sync: ContextShift]: Task[F, ProcessItemArgs, Unit] =
logWarn("Now cancelling. Deleting potentially created data.").flatMap(_ => deleteByFileIds)
logWarn("Now cancelling. Deleting potentially created data.").flatMap(_ =>
deleteByFileIds
)
def apply[F[_]: ConcurrentEffect: ContextShift](cfg: Config): Task[F, ProcessItemArgs, Unit] =
def apply[F[_]: ConcurrentEffect: ContextShift](
cfg: Config
): Task[F, ProcessItemArgs, Unit] =
CreateItem[F]
.flatMap(itemStateTask(ItemState.Processing))
.flatMap(safeProcess[F](cfg))
.map(_ => ())
def itemStateTask[F[_]: Sync, A](state: ItemState)(data: ItemData): Task[F, A, ItemData] =
def itemStateTask[F[_]: Sync, A](
state: ItemState
)(data: ItemData): Task[F, A, ItemData] =
Task(ctx => ctx.store.transact(RItem.updateState(data.item.id, state)).map(_ => data))
def isLastRetry[F[_]: Sync, A](ctx: Context[F, A]): F[Boolean] =
@ -36,8 +42,9 @@ object ItemHandler {
case Right(d) =>
Task.pure(d)
case Left(ex) =>
logWarn[F]("Processing failed on last retry. Creating item but without proposals.")
.flatMap(_ => itemStateTask(ItemState.Created)(data))
logWarn[F](
"Processing failed on last retry. Creating item but without proposals."
).flatMap(_ => itemStateTask(ItemState.Created)(data))
.andThen(_ => Sync[F].raiseError(ex))
})
case false =>

View File

@ -34,7 +34,7 @@ object TextAnalysis {
for {
list0 <- stanfordNer[F](lang, rm)
list1 <- contactNer[F](rm)
list = list0 ++ list1
list = list0 ++ list1
spans = NerLabelSpan.build(list.toSeq)
dates <- dateNer[F](rm, lang)
} yield (rm.copy(nerlabels = (spans ++ list ++ dates.toNerLabel).toList), dates)
@ -48,11 +48,14 @@ object TextAnalysis {
rm.content.map(Contact.annotate).getOrElse(Vector.empty)
}
def dateNer[F[_]: Sync](rm: RAttachmentMeta, lang: Language): F[AttachmentDates] = Sync[F].delay {
AttachmentDates(
rm,
rm.content.map(txt => DateFind.findDates(txt, lang).toVector).getOrElse(Vector.empty)
)
}
def dateNer[F[_]: Sync](rm: RAttachmentMeta, lang: Language): F[AttachmentDates] =
Sync[F].delay {
AttachmentDates(
rm,
rm.content
.map(txt => DateFind.findDates(txt, lang).toVector)
.getOrElse(Vector.empty)
)
}
}

View File

@ -19,11 +19,13 @@ object TextExtraction {
for {
_ <- ctx.logger.info("Starting text extraction")
start <- Duration.stopTime[F]
txt <- item.attachments.traverse(extractTextIfEmpty(ctx, cfg, ctx.args.meta.language, item))
_ <- ctx.logger.debug("Storing extracted texts")
_ <- txt.toList.traverse(rm => ctx.store.transact(RAttachmentMeta.upsert(rm)))
dur <- start
_ <- ctx.logger.info(s"Text extraction finished in ${dur.formatExact}")
txt <- item.attachments.traverse(
extractTextIfEmpty(ctx, cfg, ctx.args.meta.language, item)
)
_ <- ctx.logger.debug("Storing extracted texts")
_ <- txt.toList.traverse(rm => ctx.store.transact(RAttachmentMeta.upsert(rm)))
dur <- start
_ <- ctx.logger.info(s"Text extraction finished in ${dur.formatExact}")
} yield item.copy(metas = txt)
}
@ -53,7 +55,10 @@ object TextExtraction {
_ <- ctx.logger.debug(s"Extracting text for attachment ${stripAttachmentName(ra)}")
dst <- Duration.stopTime[F]
txt <- extractTextFallback(ctx, cfg, ra, lang)(filesToExtract(item, ra))
meta = item.changeMeta(ra.id, rm => rm.setContentIfEmpty(txt.map(_.trim).filter(_.nonEmpty)))
meta = item.changeMeta(
ra.id,
rm => rm.setContentIfEmpty(txt.map(_.trim).filter(_.nonEmpty))
)
est <- dst
_ <- ctx.logger.debug(
s"Extracting text for attachment ${stripAttachmentName(ra)} finished in ${est.formatExact}"
@ -76,7 +81,9 @@ object TextExtraction {
.getOrElse(Mimetype.`application/octet-stream`)
findMime
.flatMap(mt => extr.extractText(data, DataType(MimeType(mt.primary, mt.sub, mt.params)), lang))
.flatMap(mt =>
extr.extractText(data, DataType(MimeType(mt.primary, mt.sub, mt.params)), lang)
)
}
private def extractTextFallback[F[_]: Sync: ContextShift](

View File

@ -49,7 +49,9 @@ object JoexRoutes {
case POST -> Root / "job" / Ident(id) / "cancel" =>
for {
flag <- app.scheduler.requestCancel(id)
resp <- Ok(BasicResult(flag, if (flag) "Cancel request submitted" else "Job not found"))
resp <- Ok(
BasicResult(flag, if (flag) "Cancel request submitted" else "Job not found")
)
} yield resp
}
}

View File

@ -16,11 +16,19 @@ import io.circe.Decoder
* convenience constructor that uses circe to decode json into some
* type A.
*/
case class JobTask[F[_]](name: Ident, task: Task[F, String, Unit], onCancel: Task[F, String, Unit])
case class JobTask[F[_]](
name: Ident,
task: Task[F, String, Unit],
onCancel: Task[F, String, Unit]
)
object JobTask {
def json[F[_]: Sync, A](name: Ident, task: Task[F, A, Unit], onCancel: Task[F, A, Unit])(
def json[F[_]: Sync, A](
name: Ident,
task: Task[F, A, Unit],
onCancel: Task[F, A, Unit]
)(
implicit D: Decoder[A]
): JobTask[F] = {
val convert: String => F[A] =

View File

@ -20,7 +20,12 @@ case class LogEvent(
object LogEvent {
def create[F[_]: Sync](jobId: Ident, jobInfo: String, level: LogLevel, msg: String): F[LogEvent] =
def create[F[_]: Sync](
jobId: Ident,
jobInfo: String,
level: LogLevel,
msg: String
): F[LogEvent] =
Timestamp.current[F].map(now => LogEvent(jobId, jobInfo, now, level, msg))
}

View File

@ -42,7 +42,16 @@ object PeriodicScheduler {
for {
waiter <- Resource.liftF(SignallingRef(true))
state <- Resource.liftF(SignallingRef(PeriodicSchedulerImpl.emptyState[F]))
psch = new PeriodicSchedulerImpl[F](cfg, sch, queue, store, client, waiter, state, timer)
psch = new PeriodicSchedulerImpl[F](
cfg,
sch,
queue,
store,
client,
waiter,
state,
timer
)
_ <- Resource.liftF(psch.init)
} yield psch

View File

@ -3,6 +3,6 @@ package docspell.joex.scheduler
import docspell.common._
case class PeriodicSchedulerConfig(
name: Ident,
wakeupPeriod: Duration
name: Ident,
wakeupPeriod: Duration
)

View File

@ -7,7 +7,11 @@ import fs2.concurrent.Queue
object QueueLogger {
def create[F[_]: Sync](jobId: Ident, jobInfo: String, q: Queue[F, LogEvent]): Logger[F] =
def create[F[_]: Sync](
jobId: Ident,
jobInfo: String,
q: Queue[F, LogEvent]
): Logger[F] =
new Logger[F] {
def trace(msg: => String): F[Unit] =
LogEvent.create[F](jobId, jobInfo, LogLevel.Debug, msg).flatMap(q.enqueue1)

View File

@ -38,7 +38,9 @@ case class SchedulerBuilder[F[_]: ConcurrentEffect: ContextShift](
copy(queue = Resource.pure[F, JobQueue[F]](queue))
def serve: Resource[F, Scheduler[F]] =
resource.evalMap(sch => ConcurrentEffect[F].start(sch.start.compile.drain).map(_ => sch))
resource.evalMap(sch =>
ConcurrentEffect[F].start(sch.start.compile.drain).map(_ => sch)
)
def resource: Resource[F, Scheduler[F]] = {
val scheduler = for {
@ -46,7 +48,17 @@ case class SchedulerBuilder[F[_]: ConcurrentEffect: ContextShift](
waiter <- Resource.liftF(SignallingRef(true))
state <- Resource.liftF(SignallingRef(SchedulerImpl.emptyState[F]))
perms <- Resource.liftF(Semaphore(config.poolSize.toLong))
} yield new SchedulerImpl[F](config, blocker, jq, tasks, store, logSink, state, waiter, perms)
} yield new SchedulerImpl[F](
config,
blocker,
jq,
tasks,
store,
logSink,
state,
waiter,
perms
)
scheduler.evalTap(_.init).map(s => s: Scheduler[F])
}

View File

@ -50,7 +50,8 @@ final class SchedulerImpl[F[_]: ConcurrentEffect: ContextShift](
def requestCancel(jobId: Ident): F[Boolean] =
state.get.flatMap(_.cancelRequest(jobId) match {
case Some(ct) => ct.map(_ => true)
case None => logger.fwarn(s"Job ${jobId.id} not found, cannot cancel.").map(_ => false)
case None =>
logger.fwarn(s"Job ${jobId.id} not found, cannot cancel.").map(_ => false)
})
def notifyChange: F[Unit] =
@ -67,12 +68,15 @@ final class SchedulerImpl[F[_]: ConcurrentEffect: ContextShift](
.eval(runShutdown)
.evalMap(_ => logger.finfo("Scheduler is shutting down now."))
.flatMap(_ =>
Stream.eval(state.get) ++ Stream.suspend(state.discrete.takeWhile(_.getRunning.nonEmpty))
Stream.eval(state.get) ++ Stream
.suspend(state.discrete.takeWhile(_.getRunning.nonEmpty))
)
.flatMap { state =>
if (state.getRunning.isEmpty) Stream.eval(logger.finfo("No jobs running."))
else
Stream.eval(logger.finfo(s"Waiting for ${state.getRunning.size} jobs to finish.")) ++
Stream.eval(
logger.finfo(s"Waiting for ${state.getRunning.size} jobs to finish.")
) ++
Stream.emit(state)
}
@ -86,11 +90,14 @@ final class SchedulerImpl[F[_]: ConcurrentEffect: ContextShift](
def mainLoop: Stream[F, Nothing] = {
val body: F[Boolean] =
for {
_ <- permits.available.flatMap(a => logger.fdebug(s"Try to acquire permit ($a free)"))
_ <- permits.available.flatMap(a =>
logger.fdebug(s"Try to acquire permit ($a free)")
)
_ <- permits.acquire
_ <- logger.fdebug("New permit acquired")
down <- state.get.map(_.shutdownRequest)
rjob <- if (down) logger.finfo("") *> permits.release *> (None: Option[RJob]).pure[F]
rjob <- if (down)
logger.finfo("") *> permits.release *> (None: Option[RJob]).pure[F]
else
queue.nextJob(
group => state.modify(_.nextPrio(group, config.countingScheme)),
@ -151,7 +158,11 @@ final class SchedulerImpl[F[_]: ConcurrentEffect: ContextShift](
} yield ()
def onStart(job: RJob): F[Unit] =
QJob.setRunning(job.id, config.name, store) //also increments retries if current state=stuck
QJob.setRunning(
job.id,
config.name,
store
) //also increments retries if current state=stuck
def wrapTask(
job: RJob,
@ -159,7 +170,9 @@ final class SchedulerImpl[F[_]: ConcurrentEffect: ContextShift](
ctx: Context[F, String]
): Task[F, String, Unit] =
task
.mapF(fa => onStart(job) *> logger.fdebug("Starting task now") *> blocker.blockOn(fa))
.mapF(fa =>
onStart(job) *> logger.fdebug("Starting task now") *> blocker.blockOn(fa)
)
.mapF(_.attempt.flatMap({
case Right(()) =>
logger.info(s"Job execution successful: ${job.info}")
@ -196,7 +209,12 @@ final class SchedulerImpl[F[_]: ConcurrentEffect: ContextShift](
onFinish(job, JobState.Stuck)
})
def forkRun(job: RJob, code: F[Unit], onCancel: F[Unit], ctx: Context[F, String]): F[F[Unit]] = {
def forkRun(
job: RJob,
code: F[Unit],
onCancel: F[Unit],
ctx: Context[F, String]
): F[F[Unit]] = {
val bfa = blocker.blockOn(code)
logger.fdebug(s"Forking job ${job.info}") *>
ConcurrentEffect[F]
@ -236,10 +254,16 @@ object SchedulerImpl {
}
def addRunning(job: RJob, token: CancelToken[F]): (State[F], Unit) =
(State(counters, cancelled, cancelTokens.updated(job.id, token), shutdownRequest), ())
(
State(counters, cancelled, cancelTokens.updated(job.id, token), shutdownRequest),
()
)
def removeRunning(job: RJob): (State[F], Unit) =
(copy(cancelled = cancelled - job.id, cancelTokens = cancelTokens.removed(job.id)), ())
(
copy(cancelled = cancelled - job.id, cancelTokens = cancelTokens.removed(job.id)),
()
)
def markCancelled(job: RJob): (State[F], Unit) =
(copy(cancelled = cancelled + job.id), ())

View File

@ -25,11 +25,13 @@ trait Task[F[_], A, B] {
def mapF[C](f: F[B] => F[C]): Task[F, A, C] =
Task(Task.toKleisli(this).mapF(f))
def attempt(implicit F: ApplicativeError[F, Throwable]): Task[F, A, Either[Throwable, B]] =
def attempt(
implicit F: ApplicativeError[F, Throwable]
): Task[F, A, Either[Throwable, B]] =
mapF(_.attempt)
def contramap[C](f: C => F[A])(implicit F: FlatMap[F]): Task[F, C, B] = { ctxc: Context[F, C] =>
f(ctxc.args).flatMap(a => run(ctxc.map(_ => a)))
def contramap[C](f: C => F[A])(implicit F: FlatMap[F]): Task[F, C, B] = {
ctxc: Context[F, C] => f(ctxc.args).flatMap(a => run(ctxc.map(_ => a)))
}
}