Apply scalafmt to all files

This commit is contained in:
Eike Kettner
2019-12-30 21:44:13 +01:00
parent 57e274e2b0
commit fc3e22e399
133 changed files with 3003 additions and 2112 deletions

View File

@ -5,17 +5,23 @@ import docspell.joex.scheduler.SchedulerConfig
import docspell.store.JdbcConfig
import docspell.text.ocr.{Config => OcrConfig}
case class Config(appId: Ident
, baseUrl: LenientUri
, bind: Config.Bind
, jdbc: JdbcConfig
, scheduler: SchedulerConfig
, extraction: OcrConfig
case class Config(
appId: Ident,
baseUrl: LenientUri,
bind: Config.Bind,
jdbc: JdbcConfig,
scheduler: SchedulerConfig,
extraction: OcrConfig
)
object Config {
val postgres = JdbcConfig(LenientUri.unsafe("jdbc:postgresql://localhost:5432/docspelldev"), "dev", "dev")
val h2 = JdbcConfig(LenientUri.unsafe("jdbc:h2:./target/docspelldev.db;MODE=PostgreSQL;DATABASE_TO_LOWER=TRUE"), "sa", "")
val postgres =
JdbcConfig(LenientUri.unsafe("jdbc:postgresql://localhost:5432/docspelldev"), "dev", "dev")
val h2 = JdbcConfig(
LenientUri.unsafe("jdbc:h2:./target/docspelldev.db;MODE=PostgreSQL;DATABASE_TO_LOWER=TRUE"),
"sa",
""
)
case class Bind(address: String, port: Int)
}

View File

@ -11,7 +11,6 @@ object ConfigFile {
def loadConfig: Config =
ConfigSource.default.at("docspell.joex").loadOrThrow[Config]
object Implicits {
implicit val countingSchemeReader: ConfigReader[CountingScheme] =
ConfigReader[String].emap(reason(CountingScheme.readString))

View File

@ -12,11 +12,13 @@ import fs2.concurrent.SignallingRef
import scala.concurrent.ExecutionContext
final class JoexAppImpl[F[_]: ConcurrentEffect : ContextShift: Timer]( cfg: Config
, nodeOps: ONode[F]
, store: Store[F]
, termSignal: SignallingRef[F, Boolean]
, val scheduler: Scheduler[F]) extends JoexApp[F] {
final class JoexAppImpl[F[_]: ConcurrentEffect: ContextShift: Timer](
cfg: Config,
nodeOps: ONode[F],
store: Store[F],
termSignal: SignallingRef[F, Boolean],
val scheduler: Scheduler[F]
) extends JoexApp[F] {
def init: F[Unit] = {
val run = scheduler.start.compile.drain
@ -40,17 +42,25 @@ final class JoexAppImpl[F[_]: ConcurrentEffect : ContextShift: Timer]( cfg: Conf
object JoexAppImpl {
def create[F[_]: ConcurrentEffect : ContextShift: Timer](cfg: Config
, termSignal: SignallingRef[F, Boolean]
, connectEC: ExecutionContext
, blocker: Blocker): Resource[F, JoexApp[F]] =
def create[F[_]: ConcurrentEffect: ContextShift: Timer](
cfg: Config,
termSignal: SignallingRef[F, Boolean],
connectEC: ExecutionContext,
blocker: Blocker
): Resource[F, JoexApp[F]] =
for {
store <- Store.create(cfg.jdbc, connectEC, blocker)
store <- Store.create(cfg.jdbc, connectEC, blocker)
nodeOps <- ONode(store)
sch <- SchedulerBuilder(cfg.scheduler, blocker, store).
withTask(JobTask.json(ProcessItemArgs.taskName, ItemHandler[F](cfg.extraction), ItemHandler.onCancel[F])).
resource
app = new JoexAppImpl(cfg, nodeOps, store, termSignal, sch)
appR <- Resource.make(app.init.map(_ => app))(_.shutdown)
sch <- SchedulerBuilder(cfg.scheduler, blocker, store)
.withTask(
JobTask.json(
ProcessItemArgs.taskName,
ItemHandler[F](cfg.extraction),
ItemHandler.onCancel[F]
)
)
.resource
app = new JoexAppImpl(cfg, nodeOps, store, termSignal, sch)
appR <- Resource.make(app.init.map(_ => app))(_.shutdown)
} yield appR
}

View File

@ -15,20 +15,26 @@ import scala.concurrent.ExecutionContext
object JoexServer {
private case class App[F[_]](
httpApp: HttpApp[F],
termSig: SignallingRef[F, Boolean],
exitRef: Ref[F, ExitCode]
)
private case class App[F[_]](httpApp: HttpApp[F], termSig: SignallingRef[F, Boolean], exitRef: Ref[F, ExitCode])
def stream[F[_]: ConcurrentEffect : ContextShift](cfg: Config, connectEC: ExecutionContext, blocker: Blocker)
(implicit T: Timer[F]): Stream[F, Nothing] = {
def stream[F[_]: ConcurrentEffect: ContextShift](
cfg: Config,
connectEC: ExecutionContext,
blocker: Blocker
)(implicit T: Timer[F]): Stream[F, Nothing] = {
val app = for {
signal <- Resource.liftF(SignallingRef[F, Boolean](false))
signal <- Resource.liftF(SignallingRef[F, Boolean](false))
exitCode <- Resource.liftF(Ref[F].of(ExitCode.Success))
joexApp <- JoexAppImpl.create[F](cfg, signal, connectEC, blocker)
httpApp = Router(
"/api/info" -> InfoRoutes(),
"/api/v1" -> JoexRoutes(joexApp)
"/api/v1" -> JoexRoutes(joexApp)
).orNotFound
// With Middlewares in place
@ -36,14 +42,15 @@ object JoexServer {
} yield App(finalHttpApp, signal, exitCode)
Stream.resource(app).flatMap(app =>
BlazeServerBuilder[F].
bindHttp(cfg.bind.port, cfg.bind.address).
withHttpApp(app.httpApp).
withoutBanner.
serveWhile(app.termSig, app.exitRef)
)
Stream
.resource(app)
.flatMap(app =>
BlazeServerBuilder[F]
.bindHttp(cfg.bind.port, cfg.bind.address)
.withHttpApp(app.httpApp)
.withoutBanner
.serveWhile(app.termSig, app.exitRef)
)
}.drain
}

View File

@ -14,10 +14,12 @@ object Main extends IOApp {
private[this] val logger = getLogger
val blockingEc: ExecutionContext = ExecutionContext.fromExecutor(
Executors.newCachedThreadPool(ThreadFactories.ofName("docspell-joex-blocking")))
Executors.newCachedThreadPool(ThreadFactories.ofName("docspell-joex-blocking"))
)
val blocker = Blocker.liftExecutionContext(blockingEc)
val connectEC: ExecutionContext = ExecutionContext.fromExecutorService(
Executors.newFixedThreadPool(5, ThreadFactories.ofName("docspell-joex-dbconnect")))
Executors.newFixedThreadPool(5, ThreadFactories.ofName("docspell-joex-dbconnect"))
)
def run(args: List[String]) = {
args match {
@ -40,12 +42,15 @@ object Main extends IOApp {
}
val cfg = ConfigFile.loadConfig
val banner = Banner("JOEX"
, BuildInfo.version
, BuildInfo.gitHeadCommit
, cfg.jdbc.url
, Option(System.getProperty("config.file"))
, cfg.appId, cfg.baseUrl)
val banner = Banner(
"JOEX",
BuildInfo.version,
BuildInfo.gitHeadCommit,
cfg.jdbc.url,
Option(System.getProperty("config.file")),
cfg.appId,
cfg.baseUrl
)
logger.info(s"\n${banner.render("***>")}")
JoexServer.stream[IO](cfg, connectEC, blocker).compile.drain.as(ExitCode.Success)
}

View File

@ -16,56 +16,76 @@ object CreateItem {
def apply[F[_]: Sync]: Task[F, ProcessItemArgs, ItemData] =
findExisting[F].flatMap {
case Some(ri) => Task.pure(ri)
case None => createNew[F]
case None => createNew[F]
}
def createNew[F[_]: Sync]: Task[F, ProcessItemArgs, ItemData] =
Task { ctx =>
val validFiles = ctx.args.meta.validFileTypes.map(_.asString).toSet
def fileMetas(itemId: Ident, now: Timestamp) = Stream.emits(ctx.args.files).
flatMap(f => ctx.store.bitpeace.get(f.fileMetaId.id).map(fm => (f, fm))).
collect({ case (f, Some(fm)) if validFiles.contains(fm.mimetype.baseType) => f }).
zipWithIndex.
evalMap({ case (f, index) =>
Ident.randomId[F].map(id => RAttachment(id, itemId, f.fileMetaId, index.toInt, now, f.name))
}).
compile.toVector
def fileMetas(itemId: Ident, now: Timestamp) =
Stream
.emits(ctx.args.files)
.flatMap(f => ctx.store.bitpeace.get(f.fileMetaId.id).map(fm => (f, fm)))
.collect({ case (f, Some(fm)) if validFiles.contains(fm.mimetype.baseType) => f })
.zipWithIndex
.evalMap({
case (f, index) =>
Ident
.randomId[F]
.map(id => RAttachment(id, itemId, f.fileMetaId, index.toInt, now, f.name))
})
.compile
.toVector
val item = RItem.newItem[F](ctx.args.meta.collective
, ctx.args.makeSubject
, ctx.args.meta.sourceAbbrev
, ctx.args.meta.direction.getOrElse(Direction.Incoming)
, ItemState.Premature)
val item = RItem.newItem[F](
ctx.args.meta.collective,
ctx.args.makeSubject,
ctx.args.meta.sourceAbbrev,
ctx.args.meta.direction.getOrElse(Direction.Incoming),
ItemState.Premature
)
for {
_ <- ctx.logger.info(s"Creating new item with ${ctx.args.files.size} attachment(s)")
_ <- ctx.logger.info(s"Creating new item with ${ctx.args.files.size} attachment(s)")
time <- Duration.stopTime[F]
it <- item
n <- ctx.store.transact(RItem.insert(it))
_ <- if (n != 1) storeItemError[F](ctx) else ().pure[F]
fm <- fileMetas(it.id, it.created)
k <- fm.traverse(a => ctx.store.transact(RAttachment.insert(a)))
_ <- logDifferences(ctx, fm, k.sum)
dur <- time
_ <- ctx.logger.info(s"Creating item finished in ${dur.formatExact}")
it <- item
n <- ctx.store.transact(RItem.insert(it))
_ <- if (n != 1) storeItemError[F](ctx) else ().pure[F]
fm <- fileMetas(it.id, it.created)
k <- fm.traverse(a => ctx.store.transact(RAttachment.insert(a)))
_ <- logDifferences(ctx, fm, k.sum)
dur <- time
_ <- ctx.logger.info(s"Creating item finished in ${dur.formatExact}")
} yield ItemData(it, fm, Vector.empty, Vector.empty)
}
def findExisting[F[_]: Sync]: Task[F, ProcessItemArgs, Option[ItemData]] =
Task { ctx =>
for {
cand <- ctx.store.transact(QItem.findByFileIds(ctx.args.files.map(_.fileMetaId)))
_ <- if (cand.nonEmpty) ctx.logger.warn("Found existing item with these files.") else ().pure[F]
ht <- cand.drop(1).traverse(ri => QItem.delete(ctx.store)(ri.id, ri.cid))
_ <- if (ht.sum > 0) ctx.logger.warn(s"Removed ${ht.sum} items with same attachments") else ().pure[F]
rms <- cand.headOption.traverse(ri => ctx.store.transact(RAttachment.findByItemAndCollective(ri.id, ri.cid)))
} yield cand.headOption.map(ri => ItemData(ri, rms.getOrElse(Vector.empty), Vector.empty, Vector.empty))
cand <- ctx.store.transact(QItem.findByFileIds(ctx.args.files.map(_.fileMetaId)))
_ <- if (cand.nonEmpty) ctx.logger.warn("Found existing item with these files.")
else ().pure[F]
ht <- cand.drop(1).traverse(ri => QItem.delete(ctx.store)(ri.id, ri.cid))
_ <- if (ht.sum > 0) ctx.logger.warn(s"Removed ${ht.sum} items with same attachments")
else ().pure[F]
rms <- cand.headOption.traverse(ri =>
ctx.store.transact(RAttachment.findByItemAndCollective(ri.id, ri.cid))
)
} yield cand.headOption.map(ri =>
ItemData(ri, rms.getOrElse(Vector.empty), Vector.empty, Vector.empty)
)
}
private def logDifferences[F[_]: Sync](ctx: Context[F, ProcessItemArgs], saved: Vector[RAttachment], saveCount: Int): F[Unit] =
private def logDifferences[F[_]: Sync](
ctx: Context[F, ProcessItemArgs],
saved: Vector[RAttachment],
saveCount: Int
): F[Unit] =
if (ctx.args.files.size != saved.size) {
ctx.logger.warn(s"Not all given files (${ctx.args.files.size}) have been stored. Files retained: ${saved.size}; saveCount=$saveCount")
ctx.logger.warn(
s"Not all given files (${ctx.args.files.size}) have been stored. Files retained: ${saved.size}; saveCount=$saveCount"
)
} else {
().pure[F]
}

View File

@ -19,45 +19,65 @@ object FindProposal {
def apply[F[_]: Sync](data: ItemData): Task[F, ProcessItemArgs, ItemData] =
Task { ctx =>
val rmas = data.metas.map(rm =>
rm.copy(nerlabels = removeDuplicates(rm.nerlabels)))
val rmas = data.metas.map(rm => rm.copy(nerlabels = removeDuplicates(rm.nerlabels)))
ctx.logger.info("Starting find-proposal") *>
rmas.traverse(rm => processAttachment(rm, data.findDates(rm), ctx).map(ml => rm.copy(proposals = ml))).
flatMap(rmv => rmv.traverse(rm =>
ctx.logger.debug(s"Storing attachment proposals: ${rm.proposals}") *>
ctx.store.transact(RAttachmentMeta.updateProposals(rm.id, rm.proposals))).
map(_ => data.copy(metas = rmv)))
rmas
.traverse(rm =>
processAttachment(rm, data.findDates(rm), ctx).map(ml => rm.copy(proposals = ml))
)
.flatMap(rmv =>
rmv
.traverse(rm =>
ctx.logger.debug(s"Storing attachment proposals: ${rm.proposals}") *>
ctx.store.transact(RAttachmentMeta.updateProposals(rm.id, rm.proposals))
)
.map(_ => data.copy(metas = rmv))
)
}
def processAttachment[F[_]: Sync]( rm: RAttachmentMeta
, rd: Vector[NerDateLabel]
, ctx: Context[F, ProcessItemArgs]): F[MetaProposalList] = {
def processAttachment[F[_]: Sync](
rm: RAttachmentMeta,
rd: Vector[NerDateLabel],
ctx: Context[F, ProcessItemArgs]
): F[MetaProposalList] = {
val finder = Finder.searchExact(ctx).next(Finder.searchFuzzy(ctx))
List(finder.find(rm.nerlabels), makeDateProposal(rd)).
traverse(identity).map(MetaProposalList.flatten)
List(finder.find(rm.nerlabels), makeDateProposal(rd))
.traverse(identity)
.map(MetaProposalList.flatten)
}
def makeDateProposal[F[_]: Sync](dates: Vector[NerDateLabel]): F[MetaProposalList] = {
def makeDateProposal[F[_]: Sync](dates: Vector[NerDateLabel]): F[MetaProposalList] =
Timestamp.current[F].map { now =>
val latestFirst = dates.sortWith(_.date isAfter _.date)
val nowDate = now.value.atZone(ZoneId.of("GMT")).toLocalDate
val latestFirst = dates.sortWith((l1, l2) => l1.date.isAfter(l2.date))
val nowDate = now.value.atZone(ZoneId.of("GMT")).toLocalDate
val (after, before) = latestFirst.span(ndl => ndl.date.isAfter(nowDate))
val dueDates = MetaProposalList.fromSeq1(MetaProposalType.DueDate,
after.map(ndl => Candidate(IdRef(Ident.unsafe(ndl.date.toString), ndl.date.toString), Set(ndl.label))))
val itemDates = MetaProposalList.fromSeq1(MetaProposalType.DocDate,
before.map(ndl => Candidate(IdRef(Ident.unsafe(ndl.date.toString), ndl.date.toString), Set(ndl.label))))
val dueDates = MetaProposalList.fromSeq1(
MetaProposalType.DueDate,
after.map(ndl =>
Candidate(IdRef(Ident.unsafe(ndl.date.toString), ndl.date.toString), Set(ndl.label))
)
)
val itemDates = MetaProposalList.fromSeq1(
MetaProposalType.DocDate,
before.map(ndl =>
Candidate(IdRef(Ident.unsafe(ndl.date.toString), ndl.date.toString), Set(ndl.label))
)
)
MetaProposalList.flatten(Seq(dueDates, itemDates))
}
}
def removeDuplicates(labels: List[NerLabel]): List[NerLabel] =
labels.foldLeft((Set.empty[String], List.empty[NerLabel])) { case ((seen, result), el) =>
if (seen.contains(el.tag.name+el.label.toLowerCase)) (seen, result)
else (seen + (el.tag.name + el.label.toLowerCase), el :: result)
}._2.sortBy(_.startPosition)
labels
.foldLeft((Set.empty[String], List.empty[NerLabel])) {
case ((seen, result), el) =>
if (seen.contains(el.tag.name + el.label.toLowerCase)) (seen, result)
else (seen + (el.tag.name + el.label.toLowerCase), el :: result)
}
._2
.sortBy(_.startPosition)
trait Finder[F[_]] { self =>
def find(labels: Seq[NerLabel]): F[MetaProposalList]
@ -80,12 +100,14 @@ object FindProposal {
else f.map(ml1 => ml0.fillEmptyFrom(ml1))
})
def nextWhenEmpty(f: Finder[F], mt0: MetaProposalType, mts: MetaProposalType*)
(implicit F: FlatMap[F], F2: Applicative[F]): Finder[F] =
flatMap(res0 => {
def nextWhenEmpty(f: Finder[F], mt0: MetaProposalType, mts: MetaProposalType*)(
implicit F: FlatMap[F],
F2: Applicative[F]
): Finder[F] =
flatMap { res0 =>
if (res0.hasResults(mt0, mts: _*)) Finder.unit[F](res0)
else f.map(res1 => res0.fillEmptyFrom(res1))
})
}
}
object Finder {
@ -102,7 +124,11 @@ object FindProposal {
labels => labels.toList.traverse(nl => search(nl, false, ctx)).map(MetaProposalList.flatten)
}
private def search[F[_]: Sync](nt: NerLabel, exact: Boolean, ctx: Context[F, ProcessItemArgs]): F[MetaProposalList] = {
private def search[F[_]: Sync](
nt: NerLabel,
exact: Boolean,
ctx: Context[F, ProcessItemArgs]
): F[MetaProposalList] = {
val value =
if (exact) normalizeSearchValue(nt.label)
else s"%${normalizeSearchValue(nt.label)}%"
@ -110,70 +136,84 @@ object FindProposal {
if (exact) 2 else 5
if (value.length < minLength) {
ctx.logger.debug(s"Skipping too small value '$value' (original '${nt.label}').").map(_ => MetaProposalList.empty)
} else nt.tag match {
case NerTag.Organization =>
ctx.logger.debug(s"Looking for organizations: $value") *>
ctx.store.transact(ROrganization.findLike(ctx.args.meta.collective, value)).
map(MetaProposalList.from(MetaProposalType.CorrOrg, nt))
ctx.logger
.debug(s"Skipping too small value '$value' (original '${nt.label}').")
.map(_ => MetaProposalList.empty)
} else
nt.tag match {
case NerTag.Organization =>
ctx.logger.debug(s"Looking for organizations: $value") *>
ctx.store
.transact(ROrganization.findLike(ctx.args.meta.collective, value))
.map(MetaProposalList.from(MetaProposalType.CorrOrg, nt))
case NerTag.Person =>
val s1 = ctx.store.transact(RPerson.findLike(ctx.args.meta.collective, value, true)).
map(MetaProposalList.from(MetaProposalType.ConcPerson, nt))
val s2 = ctx.store.transact(RPerson.findLike(ctx.args.meta.collective, value, false)).
map(MetaProposalList.from(MetaProposalType.CorrPerson, nt))
ctx.logger.debug(s"Looking for persons: $value") *> (for {
ml0 <- s1
ml1 <- s2
} yield ml0 |+| ml1)
case NerTag.Person =>
val s1 = ctx.store
.transact(RPerson.findLike(ctx.args.meta.collective, value, true))
.map(MetaProposalList.from(MetaProposalType.ConcPerson, nt))
val s2 = ctx.store
.transact(RPerson.findLike(ctx.args.meta.collective, value, false))
.map(MetaProposalList.from(MetaProposalType.CorrPerson, nt))
ctx.logger.debug(s"Looking for persons: $value") *> (for {
ml0 <- s1
ml1 <- s2
} yield ml0 |+| ml1)
case NerTag.Location =>
ctx.logger.debug(s"NerTag 'Location' is currently not used. Ignoring value '$value'.").
map(_ => MetaProposalList.empty)
case NerTag.Location =>
ctx.logger
.debug(s"NerTag 'Location' is currently not used. Ignoring value '$value'.")
.map(_ => MetaProposalList.empty)
case NerTag.Misc =>
ctx.logger.debug(s"Looking for equipments: $value") *>
ctx.store.transact(REquipment.findLike(ctx.args.meta.collective, value)).
map(MetaProposalList.from(MetaProposalType.ConcEquip, nt))
case NerTag.Misc =>
ctx.logger.debug(s"Looking for equipments: $value") *>
ctx.store
.transact(REquipment.findLike(ctx.args.meta.collective, value))
.map(MetaProposalList.from(MetaProposalType.ConcEquip, nt))
case NerTag.Email =>
searchContact(nt, ContactKind.Email, value, ctx)
case NerTag.Email =>
searchContact(nt, ContactKind.Email, value, ctx)
case NerTag.Website =>
if (!exact) {
val searchString = Domain.domainFromUri(nt.label.toLowerCase).
toOption.
map(_.toPrimaryDomain.asString).
map(s => s"%$s%").
getOrElse(value)
searchContact(nt, ContactKind.Website, searchString, ctx)
} else {
searchContact(nt, ContactKind.Website, value, ctx)
}
case NerTag.Website =>
if (!exact) {
val searchString = Domain
.domainFromUri(nt.label.toLowerCase)
.toOption
.map(_.toPrimaryDomain.asString)
.map(s => s"%$s%")
.getOrElse(value)
searchContact(nt, ContactKind.Website, searchString, ctx)
} else {
searchContact(nt, ContactKind.Website, value, ctx)
}
case NerTag.Date =>
// There is no database search required for this tag
MetaProposalList.empty.pure[F]
}
case NerTag.Date =>
// There is no database search required for this tag
MetaProposalList.empty.pure[F]
}
}
private def searchContact[F[_]: Sync]( nt: NerLabel
, kind: ContactKind
, value: String
, ctx: Context[F, ProcessItemArgs]): F[MetaProposalList] = {
val orgs = ctx.store.transact(ROrganization.findLike(ctx.args.meta.collective, kind, value)).
map(MetaProposalList.from(MetaProposalType.CorrOrg, nt))
val corrP = ctx.store.transact(RPerson.findLike(ctx.args.meta.collective, kind, value, false)).
map(MetaProposalList.from(MetaProposalType.CorrPerson, nt))
val concP = ctx.store.transact(RPerson.findLike(ctx.args.meta.collective, kind, value, true)).
map(MetaProposalList.from(MetaProposalType.CorrPerson, nt))
private def searchContact[F[_]: Sync](
nt: NerLabel,
kind: ContactKind,
value: String,
ctx: Context[F, ProcessItemArgs]
): F[MetaProposalList] = {
val orgs = ctx.store
.transact(ROrganization.findLike(ctx.args.meta.collective, kind, value))
.map(MetaProposalList.from(MetaProposalType.CorrOrg, nt))
val corrP = ctx.store
.transact(RPerson.findLike(ctx.args.meta.collective, kind, value, false))
.map(MetaProposalList.from(MetaProposalType.CorrPerson, nt))
val concP = ctx.store
.transact(RPerson.findLike(ctx.args.meta.collective, kind, value, true))
.map(MetaProposalList.from(MetaProposalType.CorrPerson, nt))
ctx.logger.debug(s"Looking with $kind: $value") *>
List(orgs, corrP, concP).traverse(identity).map(MetaProposalList.flatten)
}
// The backslash *must* be stripped from search strings.
private [this] val invalidSearch =
private[this] val invalidSearch =
"…_[]^<>=&ſ/{}*?@#$|~`+%\"';\\".toSet
private def normalizeSearchValue(str: String): String =

View File

@ -4,10 +4,12 @@ import docspell.common.{Ident, NerDateLabel, NerLabel}
import docspell.joex.process.ItemData.AttachmentDates
import docspell.store.records.{RAttachment, RAttachmentMeta, RItem}
case class ItemData( item: RItem
, attachments: Vector[RAttachment]
, metas: Vector[RAttachmentMeta]
, dateLabels: Vector[AttachmentDates]) {
case class ItemData(
item: RItem,
attachments: Vector[RAttachment],
metas: Vector[RAttachmentMeta],
dateLabels: Vector[AttachmentDates]
) {
def findMeta(attachId: Ident): Option[RAttachmentMeta] =
metas.find(_.id == attachId)
@ -16,7 +18,6 @@ case class ItemData( item: RItem
dateLabels.find(m => m.rm.id == rm.id).map(_.dates).getOrElse(Vector.empty)
}
object ItemData {
case class AttachmentDates(rm: RAttachmentMeta, dates: Vector[NerDateLabel]) {
@ -24,4 +25,4 @@ object ItemData {
dates.map(dl => dl.label.copy(label = dl.date.toString))
}
}
}

View File

@ -10,14 +10,13 @@ import docspell.text.ocr.{Config => OcrConfig}
object ItemHandler {
def onCancel[F[_]: Sync: ContextShift]: Task[F, ProcessItemArgs, Unit] =
logWarn("Now cancelling. Deleting potentially created data.").
flatMap(_ => deleteByFileIds)
logWarn("Now cancelling. Deleting potentially created data.").flatMap(_ => deleteByFileIds)
def apply[F[_]: Sync: ContextShift](cfg: OcrConfig): Task[F, ProcessItemArgs, Unit] =
CreateItem[F].
flatMap(itemStateTask(ItemState.Processing)).
flatMap(safeProcess[F](cfg)).
map(_ => ())
CreateItem[F]
.flatMap(itemStateTask(ItemState.Processing))
.flatMap(safeProcess[F](cfg))
.map(_ => ())
def itemStateTask[F[_]: Sync, A](state: ItemState)(data: ItemData): Task[F, A, ItemData] =
Task { ctx =>
@ -26,26 +25,25 @@ object ItemHandler {
def isLastRetry[F[_]: Sync, A](ctx: Context[F, A]): F[Boolean] =
for {
current <- ctx.store.transact(RJob.getRetries(ctx.jobId))
last = ctx.config.retries == current.getOrElse(0)
current <- ctx.store.transact(RJob.getRetries(ctx.jobId))
last = ctx.config.retries == current.getOrElse(0)
} yield last
def safeProcess[F[_]: Sync: ContextShift](cfg: OcrConfig)(data: ItemData): Task[F, ProcessItemArgs, ItemData] =
def safeProcess[F[_]: Sync: ContextShift](
cfg: OcrConfig
)(data: ItemData): Task[F, ProcessItemArgs, ItemData] =
Task(isLastRetry[F, ProcessItemArgs] _).flatMap {
case true =>
ProcessItem[F](cfg)(data).
attempt.flatMap({
ProcessItem[F](cfg)(data).attempt.flatMap({
case Right(d) =>
Task.pure(d)
case Left(ex) =>
logWarn[F]("Processing failed on last retry. Creating item but without proposals.").
flatMap(_ => itemStateTask(ItemState.Created)(data)).
andThen(_ => Sync[F].raiseError(ex))
logWarn[F]("Processing failed on last retry. Creating item but without proposals.")
.flatMap(_ => itemStateTask(ItemState.Created)(data))
.andThen(_ => Sync[F].raiseError(ex))
})
case false =>
ProcessItem[F](cfg)(data).
flatMap(itemStateTask(ItemState.Created))
ProcessItem[F](cfg)(data).flatMap(itemStateTask(ItemState.Created))
}
def deleteByFileIds[F[_]: Sync: ContextShift]: Task[F, ProcessItemArgs, Unit] =

View File

@ -13,28 +13,40 @@ object LinkProposal {
val proposals = MetaProposalList.flatten(data.metas.map(_.proposals))
ctx.logger.info(s"Starting linking proposals") *>
MetaProposalType.all.
traverse(applyValue(data, proposals, ctx)).
map(result => ctx.logger.info(s"Results from proposal processing: $result")).
map(_ => data)
MetaProposalType.all
.traverse(applyValue(data, proposals, ctx))
.map(result => ctx.logger.info(s"Results from proposal processing: $result"))
.map(_ => data)
}
def applyValue[F[_]: Sync](data: ItemData, proposalList: MetaProposalList, ctx: Context[F, ProcessItemArgs])(mpt: MetaProposalType): F[Result] = {
def applyValue[F[_]: Sync](
data: ItemData,
proposalList: MetaProposalList,
ctx: Context[F, ProcessItemArgs]
)(mpt: MetaProposalType): F[Result] =
proposalList.find(mpt) match {
case None =>
Result.noneFound(mpt).pure[F]
case Some(a) if a.isSingleValue =>
ctx.logger.info(s"Found one candidate for ${a.proposalType}") *>
setItemMeta(data.item.id, ctx, a.proposalType, a.values.head.ref.id).
map(_ => Result.single(mpt))
setItemMeta(data.item.id, ctx, a.proposalType, a.values.head.ref.id).map(_ =>
Result.single(mpt)
)
case Some(a) =>
ctx.logger.info(s"Found many (${a.size}, ${a.values.map(_.ref.id.id)}) candidates for ${a.proposalType}. Setting first.") *>
setItemMeta(data.item.id, ctx, a.proposalType, a.values.head.ref.id).
map(_ => Result.multiple(mpt))
ctx.logger.info(
s"Found many (${a.size}, ${a.values.map(_.ref.id.id)}) candidates for ${a.proposalType}. Setting first."
) *>
setItemMeta(data.item.id, ctx, a.proposalType, a.values.head.ref.id).map(_ =>
Result.multiple(mpt)
)
}
}
def setItemMeta[F[_]: Sync](itemId: Ident, ctx: Context[F, ProcessItemArgs], mpt: MetaProposalType, value: Ident): F[Int] =
def setItemMeta[F[_]: Sync](
itemId: Ident,
ctx: Context[F, ProcessItemArgs],
mpt: MetaProposalType,
value: Ident
): F[Int] =
mpt match {
case MetaProposalType.CorrOrg =>
ctx.logger.debug(s"Updating item organization with: ${value.id}") *>
@ -54,18 +66,17 @@ object LinkProposal {
ctx.logger.debug(s"Not linking document date suggestion ${value.id}").map(_ => 0)
}
sealed trait Result {
def proposalType: MetaProposalType
}
object Result {
case class NoneFound(proposalType: MetaProposalType) extends Result
case class SingleResult(proposalType: MetaProposalType) extends Result
case class NoneFound(proposalType: MetaProposalType) extends Result
case class SingleResult(proposalType: MetaProposalType) extends Result
case class MultipleResult(proposalType: MetaProposalType) extends Result
def noneFound(proposalType: MetaProposalType): Result = NoneFound(proposalType)
def single(proposalType: MetaProposalType): Result = SingleResult(proposalType)
def multiple(proposalType: MetaProposalType): Result = MultipleResult(proposalType)
def single(proposalType: MetaProposalType): Result = SingleResult(proposalType)
def multiple(proposalType: MetaProposalType): Result = MultipleResult(proposalType)
}
}

View File

@ -7,13 +7,15 @@ import docspell.text.ocr.{Config => OcrConfig}
object ProcessItem {
def apply[F[_]: Sync: ContextShift](cfg: OcrConfig)(item: ItemData): Task[F, ProcessItemArgs, ItemData] =
TextExtraction(cfg, item).
flatMap(Task.setProgress(25)).
flatMap(TextAnalysis[F]).
flatMap(Task.setProgress(50)).
flatMap(FindProposal[F]).
flatMap(Task.setProgress(75)).
flatMap(LinkProposal[F]).
flatMap(Task.setProgress(99))
def apply[F[_]: Sync: ContextShift](
cfg: OcrConfig
)(item: ItemData): Task[F, ProcessItemArgs, ItemData] =
TextExtraction(cfg, item)
.flatMap(Task.setProgress(25))
.flatMap(TextAnalysis[F])
.flatMap(Task.setProgress(50))
.flatMap(FindProposal[F])
.flatMap(Task.setProgress(75))
.flatMap(LinkProposal[F])
.flatMap(Task.setProgress(99))
}

View File

@ -8,7 +8,7 @@ import docspell.joex.scheduler.Task
import org.log4s._
object TestTasks {
private [this] val logger = getLogger
private[this] val logger = getLogger
def success[F[_]]: Task[F, ProcessItemArgs, Unit] =
Task { ctx =>
@ -17,23 +17,23 @@ object TestTasks {
def failing[F[_]: Sync]: Task[F, ProcessItemArgs, Unit] =
Task { ctx =>
ctx.logger.info(s"Failing the task run :(").map(_ =>
sys.error("Oh, cannot extract gold from this document")
)
ctx.logger
.info(s"Failing the task run :(")
.map(_ => sys.error("Oh, cannot extract gold from this document"))
}
def longRunning[F[_]: Sync]: Task[F, ProcessItemArgs, Unit] =
Task { ctx =>
logger.fwarn(s"${Thread.currentThread()} From executing long running task") >>
ctx.logger.info(s"${Thread.currentThread()} Running task now: ${ctx.args}") >>
sleep(2400) >>
ctx.logger.debug("doing things") >>
sleep(2400) >>
ctx.logger.debug("doing more things") >>
sleep(2400) >>
ctx.logger.info("doing more things")
ctx.logger.info(s"${Thread.currentThread()} Running task now: ${ctx.args}") >>
sleep(2400) >>
ctx.logger.debug("doing things") >>
sleep(2400) >>
ctx.logger.debug("doing more things") >>
sleep(2400) >>
ctx.logger.info("doing more things")
}
private def sleep[F[_]:Sync](ms: Long): F[Unit] =
private def sleep[F[_]: Sync](ms: Long): F[Unit] =
Sync[F].delay(Thread.sleep(ms))
}

View File

@ -15,35 +15,42 @@ object TextAnalysis {
def apply[F[_]: Sync](item: ItemData): Task[F, ProcessItemArgs, ItemData] =
Task { ctx =>
for {
_ <- ctx.logger.info("Starting text analysis")
s <- Duration.stopTime[F]
t <- item.metas.toList.traverse(annotateAttachment[F](ctx.args.meta.language))
_ <- ctx.logger.debug(s"Storing tags: ${t.map(_._1.copy(content = None))}")
_ <- t.traverse(m => ctx.store.transact(RAttachmentMeta.updateLabels(m._1.id, m._1.nerlabels)))
e <- s
_ <- ctx.logger.info(s"Text-Analysis finished in ${e.formatExact}")
v = t.toVector
_ <- ctx.logger.info("Starting text analysis")
s <- Duration.stopTime[F]
t <- item.metas.toList.traverse(annotateAttachment[F](ctx.args.meta.language))
_ <- ctx.logger.debug(s"Storing tags: ${t.map(_._1.copy(content = None))}")
_ <- t.traverse(m =>
ctx.store.transact(RAttachmentMeta.updateLabels(m._1.id, m._1.nerlabels))
)
e <- s
_ <- ctx.logger.info(s"Text-Analysis finished in ${e.formatExact}")
v = t.toVector
} yield item.copy(metas = v.map(_._1), dateLabels = v.map(_._2))
}
def annotateAttachment[F[_]: Sync](lang: Language)(rm: RAttachmentMeta): F[(RAttachmentMeta, AttachmentDates)] =
def annotateAttachment[F[_]: Sync](
lang: Language
)(rm: RAttachmentMeta): F[(RAttachmentMeta, AttachmentDates)] =
for {
list0 <- stanfordNer[F](lang, rm)
list1 <- contactNer[F](rm)
dates <- dateNer[F](rm, lang)
} yield (rm.copy(nerlabels = (list0 ++ list1 ++ dates.toNerLabel).toList), dates)
def stanfordNer[F[_]: Sync](lang: Language, rm: RAttachmentMeta): F[Vector[NerLabel]] = Sync[F].delay {
rm.content.map(StanfordNerClassifier.nerAnnotate(lang)).getOrElse(Vector.empty)
}
def stanfordNer[F[_]: Sync](lang: Language, rm: RAttachmentMeta): F[Vector[NerLabel]] =
Sync[F].delay {
rm.content.map(StanfordNerClassifier.nerAnnotate(lang)).getOrElse(Vector.empty)
}
def contactNer[F[_]: Sync](rm: RAttachmentMeta): F[Vector[NerLabel]] = Sync[F].delay {
rm.content.map(Contact.annotate).getOrElse(Vector.empty)
}
def dateNer[F[_]: Sync](rm: RAttachmentMeta, lang: Language): F[AttachmentDates] = Sync[F].delay {
AttachmentDates(rm, rm.content.map(txt => DateFind.findDates(txt, lang).toVector).getOrElse(Vector.empty))
AttachmentDates(
rm,
rm.content.map(txt => DateFind.findDates(txt, lang).toVector).getOrElse(Vector.empty)
)
}
}

View File

@ -11,10 +11,13 @@ import docspell.text.ocr.{TextExtract, Config => OcrConfig}
object TextExtraction {
def apply[F[_]: Sync : ContextShift](cfg: OcrConfig, item: ItemData): Task[F, ProcessItemArgs, ItemData] =
def apply[F[_]: Sync: ContextShift](
cfg: OcrConfig,
item: ItemData
): Task[F, ProcessItemArgs, ItemData] =
Task { ctx =>
for {
_ <- ctx.logger.info("Starting text extraction")
_ <- ctx.logger.info("Starting text extraction")
start <- Duration.stopTime[F]
txt <- item.attachments.traverse(extractTextToMeta(ctx, cfg, ctx.args.meta.language))
_ <- ctx.logger.debug("Storing extracted texts")
@ -24,22 +27,33 @@ object TextExtraction {
} yield item.copy(metas = txt)
}
def extractTextToMeta[F[_]: Sync : ContextShift](ctx: Context[F, _], cfg: OcrConfig, lang: Language)(ra: RAttachment): F[RAttachmentMeta] =
def extractTextToMeta[F[_]: Sync: ContextShift](
ctx: Context[F, _],
cfg: OcrConfig,
lang: Language
)(ra: RAttachment): F[RAttachmentMeta] =
for {
_ <- ctx.logger.debug(s"Extracting text for attachment ${ra.name}")
dst <- Duration.stopTime[F]
txt <- extractText(cfg, lang, ctx.store, ctx.blocker)(ra)
meta = RAttachmentMeta.empty(ra.id).copy(content = txt.map(_.trim).filter(_.nonEmpty))
est <- dst
_ <- ctx.logger.debug(s"Extracting text for attachment ${ra.name} finished in ${est.formatExact}")
_ <- ctx.logger.debug(s"Extracting text for attachment ${ra.name}")
dst <- Duration.stopTime[F]
txt <- extractText(cfg, lang, ctx.store, ctx.blocker)(ra)
meta = RAttachmentMeta.empty(ra.id).copy(content = txt.map(_.trim).filter(_.nonEmpty))
est <- dst
_ <- ctx.logger.debug(
s"Extracting text for attachment ${ra.name} finished in ${est.formatExact}"
)
} yield meta
def extractText[F[_]: Sync : ContextShift](ocrConfig: OcrConfig, lang: Language, store: Store[F], blocker: Blocker)(ra: RAttachment): F[Option[String]] = {
val data = store.bitpeace.get(ra.fileId.id).
unNoneTerminate.
through(store.bitpeace.fetchData2(RangeDef.all))
def extractText[F[_]: Sync: ContextShift](
ocrConfig: OcrConfig,
lang: Language,
store: Store[F],
blocker: Blocker
)(ra: RAttachment): F[Option[String]] = {
val data = store.bitpeace
.get(ra.fileId.id)
.unNoneTerminate
.through(store.bitpeace.fetchData2(RangeDef.all))
TextExtract.extract(data, blocker, lang.iso3, ocrConfig).
compile.last
TextExtract.extract(data, blocker, lang.iso3, ocrConfig).compile.last
}
}

View File

@ -10,15 +10,19 @@ import org.http4s.dsl.Http4sDsl
object InfoRoutes {
def apply[F[_]: Sync](): HttpRoutes[F] = {
val dsl = new Http4sDsl[F]{}
val dsl = new Http4sDsl[F] {}
import dsl._
HttpRoutes.of[F] {
case GET -> (Root / "version") =>
Ok(VersionInfo(BuildInfo.version
, BuildInfo.builtAtMillis
, BuildInfo.builtAtString
, BuildInfo.gitHeadCommit.getOrElse("")
, BuildInfo.gitDescribedVersion.getOrElse("")))
Ok(
VersionInfo(
BuildInfo.version,
BuildInfo.builtAtMillis,
BuildInfo.builtAtString,
BuildInfo.gitHeadCommit.getOrElse(""),
BuildInfo.gitDescribedVersion.getOrElse("")
)
)
}
}
}

View File

@ -13,7 +13,7 @@ import org.http4s.dsl.Http4sDsl
object JoexRoutes {
def apply[F[_]: ConcurrentEffect: Timer](app: JoexApp[F]): HttpRoutes[F] = {
val dsl = new Http4sDsl[F]{}
val dsl = new Http4sDsl[F] {}
import dsl._
HttpRoutes.of[F] {
case POST -> Root / "notify" =>
@ -24,14 +24,16 @@ object JoexRoutes {
case GET -> Root / "running" =>
for {
jobs <- app.scheduler.getRunning
jj = jobs.map(mkJob)
resp <- Ok(JobList(jj.toList))
jobs <- app.scheduler.getRunning
jj = jobs.map(mkJob)
resp <- Ok(JobList(jj.toList))
} yield resp
case POST -> Root / "shutdownAndExit" =>
for {
_ <- ConcurrentEffect[F].start(Timer[F].sleep(Duration.seconds(1).toScala) *> app.initShutdown)
_ <- ConcurrentEffect[F].start(
Timer[F].sleep(Duration.seconds(1).toScala) *> app.initShutdown
)
resp <- Ok(BasicResult(true, "Shutdown initiated."))
} yield resp
@ -39,20 +41,28 @@ object JoexRoutes {
for {
optJob <- app.scheduler.getRunning.map(_.find(_.id == id))
optLog <- optJob.traverse(j => app.findLogs(j.id))
jAndL = for { job <- optJob; log <- optLog } yield mkJobLog(job, log)
jAndL = for { job <- optJob; log <- optLog } yield mkJobLog(job, log)
resp <- jAndL.map(Ok(_)).getOrElse(NotFound(BasicResult(false, "Not found")))
} yield resp
case POST -> Root / "job" / Ident(id) / "cancel" =>
for {
flag <- app.scheduler.requestCancel(id)
flag <- app.scheduler.requestCancel(id)
resp <- Ok(BasicResult(flag, if (flag) "Cancel request submitted" else "Job not found"))
} yield resp
}
}
def mkJob(j: RJob): Job =
Job(j.id, j.subject, j.submitted, j.priority, j.retries, j.progress, j.started.getOrElse(Timestamp.Epoch))
Job(
j.id,
j.subject,
j.submitted,
j.priority,
j.retries,
j.progress,
j.started.getOrElse(Timestamp.Epoch)
)
def mkJobLog(j: RJob, jl: Vector[RJobLog]): JobAndLog =
JobAndLog(mkJob(j), jl.map(r => JobLogEvent(r.created, r.level, r.message)).toList)

View File

@ -30,40 +30,45 @@ trait Context[F[_], A] { self =>
}
object Context {
private [this] val log = getLogger
private[this] val log = getLogger
def create[F[_]: Functor, A]( job: RJob
, arg: A
, config: SchedulerConfig
, log: Logger[F]
, store: Store[F]
, blocker: Blocker): Context[F, A] =
def create[F[_]: Functor, A](
job: RJob,
arg: A,
config: SchedulerConfig,
log: Logger[F],
store: Store[F],
blocker: Blocker
): Context[F, A] =
new ContextImpl(arg, log, store, blocker, config, job.id)
def apply[F[_]: Concurrent, A]( job: RJob
, arg: A
, config: SchedulerConfig
, logSink: LogSink[F]
, blocker: Blocker
, store: Store[F]): F[Context[F, A]] =
def apply[F[_]: Concurrent, A](
job: RJob,
arg: A,
config: SchedulerConfig,
logSink: LogSink[F],
blocker: Blocker,
store: Store[F]
): F[Context[F, A]] =
for {
_ <- log.ftrace("Creating logger for task run")
logger <- Logger(job.id, job.info, config.logBufferSize, logSink)
_ <- log.ftrace("Logger created, instantiating context")
ctx = create[F, A](job, arg, config, logger, store, blocker)
ctx = create[F, A](job, arg, config, logger, store, blocker)
} yield ctx
private final class ContextImpl[F[_]: Functor, A]( val args: A
, val logger: Logger[F]
, val store: Store[F]
, val blocker: Blocker
, val config: SchedulerConfig
, val jobId: Ident)
extends Context[F,A] {
final private class ContextImpl[F[_]: Functor, A](
val args: A,
val logger: Logger[F],
val store: Store[F],
val blocker: Blocker,
val config: SchedulerConfig,
val jobId: Ident
) extends Context[F, A] {
def setProgress(percent: Int): F[Unit] = {
val pval = math.min(100, math.max(0, percent))
store.transact(RJob.setProgress(jobId, pval)).map(_ => ())
}
def setProgress(percent: Int): F[Unit] = {
val pval = math.min(100, math.max(0, percent))
store.transact(RJob.setProgress(jobId, pval)).map(_ => ())
}
}
}
}

View File

@ -11,14 +11,13 @@ import docspell.common.Priority
*/
case class CountingScheme(high: Int, low: Int, counter: Int = 0) {
def nextPriority: (CountingScheme, Priority) = {
def nextPriority: (CountingScheme, Priority) =
if (counter <= 0) (increment, Priority.High)
else {
val rest = counter % (high + low)
if (rest < high) (increment, Priority.High)
else (increment, Priority.Low)
}
}
def increment: CountingScheme =
copy(counter = counter + 1)
@ -32,8 +31,7 @@ object CountingScheme {
def readString(str: String): Either[String, CountingScheme] =
str.split(',') match {
case Array(h, l) =>
Either.catchNonFatal(CountingScheme(h.toInt, l.toInt)).
left.map(_.getMessage)
Either.catchNonFatal(CountingScheme(h.toInt, l.toInt)).left.map(_.getMessage)
case _ =>
Left(s"Invalid counting scheme: $str")
}

View File

@ -20,13 +20,16 @@ case class JobTask[F[_]](name: Ident, task: Task[F, String, Unit], onCancel: Tas
object JobTask {
def json[F[_]: Sync, A](name: Ident, task: Task[F, A, Unit], onCancel: Task[F, A, Unit])
(implicit D: Decoder[A]): JobTask[F] = {
def json[F[_]: Sync, A](name: Ident, task: Task[F, A, Unit], onCancel: Task[F, A, Unit])(
implicit D: Decoder[A]
): JobTask[F] = {
val convert: String => F[A] =
str => str.parseJsonAs[A] match {
case Right(a) => a.pure[F]
case Left(ex) => Sync[F].raiseError(new Exception(s"Cannot parse task arguments: $str", ex))
}
str =>
str.parseJsonAs[A] match {
case Right(a) => a.pure[F]
case Left(ex) =>
Sync[F].raiseError(new Exception(s"Cannot parse task arguments: $str", ex))
}
JobTask(name, task.contramap(convert), onCancel.contramap(convert))
}

View File

@ -4,12 +4,14 @@ import cats.implicits._
import docspell.common._
import cats.effect.Sync
case class LogEvent( jobId: Ident
, jobInfo: String
, time: Timestamp
, level: LogLevel
, msg: String
, ex: Option[Throwable] = None) {
case class LogEvent(
jobId: Ident,
jobInfo: String,
time: Timestamp,
level: LogLevel,
msg: String,
ex: Option[Throwable] = None
) {
def logLine: String =
s">>> ${time.asString} $level $jobInfo: $msg"
@ -21,5 +23,4 @@ object LogEvent {
def create[F[_]: Sync](jobId: Ident, jobInfo: String, level: LogLevel, msg: String): F[LogEvent] =
Timestamp.current[F].map(now => LogEvent(jobId, jobInfo, now, level, msg))
}

View File

@ -44,12 +44,22 @@ object LogSink {
LogSink(_.evalMap(e => logInternal(e)))
def db[F[_]: Sync](store: Store[F]): LogSink[F] =
LogSink(_.evalMap(ev => for {
id <- Ident.randomId[F]
joblog = RJobLog(id, ev.jobId, ev.level, ev.time, ev.msg + ev.ex.map(th => ": "+ th.getMessage).getOrElse(""))
_ <- logInternal(ev)
_ <- store.transact(RJobLog.insert(joblog))
} yield ()))
LogSink(
_.evalMap(ev =>
for {
id <- Ident.randomId[F]
joblog = RJobLog(
id,
ev.jobId,
ev.level,
ev.time,
ev.msg + ev.ex.map(th => ": " + th.getMessage).getOrElse("")
)
_ <- logInternal(ev)
_ <- store.transact(RJobLog.insert(joblog))
} yield ()
)
)
def dbAndLog[F[_]: Concurrent](store: Store[F]): LogSink[F] = {
val s: Stream[F, Pipe[F, LogEvent, Unit]] =

View File

@ -33,17 +33,25 @@ object Logger {
LogEvent.create[F](jobId, jobInfo, LogLevel.Warn, msg).flatMap(q.enqueue1)
def error(ex: Throwable)(msg: => String): F[Unit] =
LogEvent.create[F](jobId, jobInfo, LogLevel.Error, msg).map(le => le.copy(ex = Some(ex))).flatMap(q.enqueue1)
LogEvent
.create[F](jobId, jobInfo, LogLevel.Error, msg)
.map(le => le.copy(ex = Some(ex)))
.flatMap(q.enqueue1)
def error(msg: => String): F[Unit] =
LogEvent.create[F](jobId, jobInfo, LogLevel.Error, msg).flatMap(q.enqueue1)
}
def apply[F[_]: Concurrent](jobId: Ident, jobInfo: String, bufferSize: Int, sink: LogSink[F]): F[Logger[F]] =
def apply[F[_]: Concurrent](
jobId: Ident,
jobInfo: String,
bufferSize: Int,
sink: LogSink[F]
): F[Logger[F]] =
for {
q <- Queue.circularBuffer[F, LogEvent](bufferSize)
log = create(jobId, jobInfo, q)
_ <- Concurrent[F].start(q.dequeue.through(sink.receive).compile.drain)
q <- Queue.circularBuffer[F, LogEvent](bufferSize)
log = create(jobId, jobInfo, q)
_ <- Concurrent[F].start(q.dequeue.through(sink.receive).compile.drain)
} yield log
}
}

View File

@ -7,13 +7,14 @@ import docspell.store.Store
import docspell.store.queue.JobQueue
import fs2.concurrent.SignallingRef
case class SchedulerBuilder[F[_]: ConcurrentEffect : ContextShift](
config: SchedulerConfig
, tasks: JobTaskRegistry[F]
, store: Store[F]
, blocker: Blocker
, queue: Resource[F, JobQueue[F]]
, logSink: LogSink[F]) {
case class SchedulerBuilder[F[_]: ConcurrentEffect: ContextShift](
config: SchedulerConfig,
tasks: JobTaskRegistry[F],
store: Store[F],
blocker: Blocker,
queue: Resource[F, JobQueue[F]],
logSink: LogSink[F]
) {
def withConfig(cfg: SchedulerConfig): SchedulerBuilder[F] =
copy(config = cfg)
@ -33,7 +34,6 @@ case class SchedulerBuilder[F[_]: ConcurrentEffect : ContextShift](
def withLogSink(sink: LogSink[F]): SchedulerBuilder[F] =
copy(logSink = sink)
def serve: Resource[F, Scheduler[F]] =
resource.evalMap(sch => ConcurrentEffect[F].start(sch.start.compile.drain).map(_ => sch))
@ -45,22 +45,25 @@ case class SchedulerBuilder[F[_]: ConcurrentEffect : ContextShift](
perms <- Resource.liftF(Semaphore(config.poolSize.toLong))
} yield new SchedulerImpl[F](config, blocker, jq, tasks, store, logSink, state, waiter, perms)
scheduler.evalTap(_.init).
map(s => s: Scheduler[F])
scheduler.evalTap(_.init).map(s => s: Scheduler[F])
}
}
object SchedulerBuilder {
def apply[F[_]: ConcurrentEffect : ContextShift]( config: SchedulerConfig
, blocker: Blocker
, store: Store[F]): SchedulerBuilder[F] =
new SchedulerBuilder[F](config
, JobTaskRegistry.empty[F]
, store
, blocker
, JobQueue(store)
, LogSink.db[F](store))
def apply[F[_]: ConcurrentEffect: ContextShift](
config: SchedulerConfig,
blocker: Blocker,
store: Store[F]
): SchedulerBuilder[F] =
new SchedulerBuilder[F](
config,
JobTaskRegistry.empty[F],
store,
blocker,
JobQueue(store),
LogSink.db[F](store)
)
}

View File

@ -2,24 +2,26 @@ package docspell.joex.scheduler
import docspell.common._
case class SchedulerConfig( name: Ident
, poolSize: Int
, countingScheme: CountingScheme
, retries: Int
, retryDelay: Duration
, logBufferSize: Int
, wakeupPeriod: Duration
)
case class SchedulerConfig(
name: Ident,
poolSize: Int,
countingScheme: CountingScheme,
retries: Int,
retryDelay: Duration,
logBufferSize: Int,
wakeupPeriod: Duration
)
object SchedulerConfig {
val default = SchedulerConfig(
name = Ident.unsafe("default-scheduler")
, poolSize = 2 // math.max(2, Runtime.getRuntime.availableProcessors / 2)
, countingScheme = CountingScheme(2, 1)
, retries = 5
, retryDelay = Duration.seconds(30)
, logBufferSize = 500
, wakeupPeriod = Duration.minutes(10)
name = Ident.unsafe("default-scheduler"),
poolSize = 2 // math.max(2, Runtime.getRuntime.availableProcessors / 2)
,
countingScheme = CountingScheme(2, 1),
retries = 5,
retryDelay = Duration.seconds(30),
logBufferSize = 500,
wakeupPeriod = Duration.minutes(10)
)
}

View File

@ -14,17 +14,19 @@ import SchedulerImpl._
import docspell.store.Store
import docspell.store.queries.QJob
final class SchedulerImpl[F[_]: ConcurrentEffect : ContextShift](val config: SchedulerConfig
, blocker: Blocker
, queue: JobQueue[F]
, tasks: JobTaskRegistry[F]
, store: Store[F]
, logSink: LogSink[F]
, state: SignallingRef[F, State[F]]
, waiter: SignallingRef[F, Boolean]
, permits: Semaphore[F]) extends Scheduler[F] {
final class SchedulerImpl[F[_]: ConcurrentEffect: ContextShift](
val config: SchedulerConfig,
blocker: Blocker,
queue: JobQueue[F],
tasks: JobTaskRegistry[F],
store: Store[F],
logSink: LogSink[F],
state: SignallingRef[F, State[F]],
waiter: SignallingRef[F, Boolean],
permits: Semaphore[F]
) extends Scheduler[F] {
private [this] val logger = getLogger
private[this] val logger = getLogger
/**
* On startup, get all jobs in state running from this scheduler
@ -34,8 +36,13 @@ final class SchedulerImpl[F[_]: ConcurrentEffect : ContextShift](val config: Sch
QJob.runningToWaiting(config.name, store)
def periodicAwake(implicit T: Timer[F]): F[Fiber[F, Unit]] =
ConcurrentEffect[F].start(Stream.awakeEvery[F](config.wakeupPeriod.toScala).
evalMap(_ => logger.fdebug("Periodic awake reached") *> notifyChange).compile.drain)
ConcurrentEffect[F].start(
Stream
.awakeEvery[F](config.wakeupPeriod.toScala)
.evalMap(_ => logger.fdebug("Periodic awake reached") *> notifyChange)
.compile
.drain
)
def getRunning: F[Vector[RJob]] =
state.get.flatMap(s => QJob.findAll(s.getRunning, store))
@ -43,7 +50,7 @@ final class SchedulerImpl[F[_]: ConcurrentEffect : ContextShift](val config: Sch
def requestCancel(jobId: Ident): F[Boolean] =
state.get.flatMap(_.cancelRequest(jobId) match {
case Some(ct) => ct.map(_ => true)
case None => logger.fwarn(s"Job ${jobId.id} not found, cannot cancel.").map(_ => false)
case None => logger.fwarn(s"Job ${jobId.id} not found, cannot cancel.").map(_ => false)
})
def notifyChange: F[Unit] =
@ -51,59 +58,72 @@ final class SchedulerImpl[F[_]: ConcurrentEffect : ContextShift](val config: Sch
def shutdown(cancelAll: Boolean): F[Unit] = {
val doCancel =
state.get.
flatMap(_.cancelTokens.values.toList.traverse(identity)).
map(_ => ())
state.get.flatMap(_.cancelTokens.values.toList.traverse(identity)).map(_ => ())
val runShutdown =
state.modify(_.requestShutdown) *> (if (cancelAll) doCancel else ().pure[F])
val wait = Stream.eval(runShutdown).
evalMap(_ => logger.finfo("Scheduler is shutting down now.")).
flatMap(_ => Stream.eval(state.get) ++ Stream.suspend(state.discrete.takeWhile(_.getRunning.nonEmpty))).
flatMap(state => {
val wait = Stream
.eval(runShutdown)
.evalMap(_ => logger.finfo("Scheduler is shutting down now."))
.flatMap(_ =>
Stream.eval(state.get) ++ Stream.suspend(state.discrete.takeWhile(_.getRunning.nonEmpty))
)
.flatMap { state =>
if (state.getRunning.isEmpty) Stream.eval(logger.finfo("No jobs running."))
else Stream.eval(logger.finfo(s"Waiting for ${state.getRunning.size} jobs to finish.")) ++
Stream.emit(state)
})
else
Stream.eval(logger.finfo(s"Waiting for ${state.getRunning.size} jobs to finish.")) ++
Stream.emit(state)
}
(wait.drain ++ Stream.emit(())).compile.lastOrError
}
def start: Stream[F, Nothing] =
logger.sinfo("Starting scheduler") ++
mainLoop
mainLoop
def mainLoop: Stream[F, Nothing] = {
val body: F[Boolean] =
for {
_ <- permits.available.flatMap(a => logger.fdebug(s"Try to acquire permit ($a free)"))
_ <- permits.acquire
_ <- logger.fdebug("New permit acquired")
down <- state.get.map(_.shutdownRequest)
rjob <- if (down) logger.finfo("") *> permits.release *> (None: Option[RJob]).pure[F]
else queue.nextJob(group => state.modify(_.nextPrio(group, config.countingScheme)), config.name, config.retryDelay)
_ <- logger.fdebug(s"Next job found: ${rjob.map(_.info)}")
_ <- rjob.map(execute).getOrElse(permits.release)
_ <- permits.available.flatMap(a => logger.fdebug(s"Try to acquire permit ($a free)"))
_ <- permits.acquire
_ <- logger.fdebug("New permit acquired")
down <- state.get.map(_.shutdownRequest)
rjob <- if (down) logger.finfo("") *> permits.release *> (None: Option[RJob]).pure[F]
else
queue.nextJob(
group => state.modify(_.nextPrio(group, config.countingScheme)),
config.name,
config.retryDelay
)
_ <- logger.fdebug(s"Next job found: ${rjob.map(_.info)}")
_ <- rjob.map(execute).getOrElse(permits.release)
} yield rjob.isDefined
Stream.eval(state.get.map(_.shutdownRequest)).
evalTap(if (_) logger.finfo[F]("Stopping main loop due to shutdown request.") else ().pure[F]).
flatMap(if (_) Stream.empty else Stream.eval(body)).
flatMap({
Stream
.eval(state.get.map(_.shutdownRequest))
.evalTap(
if (_) logger.finfo[F]("Stopping main loop due to shutdown request.")
else ().pure[F]
)
.flatMap(if (_) Stream.empty else Stream.eval(body))
.flatMap({
case true =>
mainLoop
case false =>
logger.sdebug(s"Waiting for notify") ++
waiter.discrete.take(2).drain ++
logger.sdebug(s"Notify signal, going into main loop") ++
mainLoop
waiter.discrete.take(2).drain ++
logger.sdebug(s"Notify signal, going into main loop") ++
mainLoop
})
}
def execute(job: RJob): F[Unit] = {
val task = for {
jobtask <- tasks.find(job.task).toRight(s"This executor cannot run tasks with name: ${job.task}")
jobtask <- tasks
.find(job.task)
.toRight(s"This executor cannot run tasks with name: ${job.task}")
} yield jobtask
task match {
@ -122,18 +142,25 @@ final class SchedulerImpl[F[_]: ConcurrentEffect : ContextShift](val config: Sch
def onFinish(job: RJob, finalState: JobState): F[Unit] =
for {
_ <- logger.fdebug(s"Job ${job.info} done $finalState. Releasing resources.")
_ <- permits.release *> permits.available.flatMap(a => logger.fdebug(s"Permit released ($a free)"))
_ <- state.modify(_.removeRunning(job))
_ <- QJob.setFinalState(job.id, finalState, store)
_ <- logger.fdebug(s"Job ${job.info} done $finalState. Releasing resources.")
_ <- permits.release *> permits.available.flatMap(a =>
logger.fdebug(s"Permit released ($a free)")
)
_ <- state.modify(_.removeRunning(job))
_ <- QJob.setFinalState(job.id, finalState, store)
} yield ()
def onStart(job: RJob): F[Unit] =
QJob.setRunning(job.id, config.name, store) //also increments retries if current state=stuck
def wrapTask(job: RJob, task: Task[F, String, Unit], ctx: Context[F, String]): Task[F, String, Unit] = {
task.mapF(fa => onStart(job) *> logger.fdebug("Starting task now") *> blocker.blockOn(fa)).
mapF(_.attempt.flatMap({
def wrapTask(
job: RJob,
task: Task[F, String, Unit],
ctx: Context[F, String]
): Task[F, String, Unit] =
task
.mapF(fa => onStart(job) *> logger.fdebug("Starting task now") *> blocker.blockOn(fa))
.mapF(_.attempt.flatMap({
case Right(()) =>
logger.info(s"Job execution successful: ${job.info}")
ctx.logger.info("Job execution successful") *>
@ -148,16 +175,18 @@ final class SchedulerImpl[F[_]: ConcurrentEffect : ContextShift](val config: Sch
QJob.exceedsRetries(job.id, config.retries, store).flatMap {
case true =>
logger.error(ex)(s"Job ${job.info} execution failed. Retries exceeded.")
ctx.logger.error(ex)(s"Job ${job.info} execution failed. Retries exceeded.").
map(_ => JobState.Failed: JobState)
ctx.logger
.error(ex)(s"Job ${job.info} execution failed. Retries exceeded.")
.map(_ => JobState.Failed: JobState)
case false =>
logger.error(ex)(s"Job ${job.info} execution failed. Retrying later.")
ctx.logger.error(ex)(s"Job ${job.info} execution failed. Retrying later.").
map(_ => JobState.Stuck: JobState)
ctx.logger
.error(ex)(s"Job ${job.info} execution failed. Retrying later.")
.map(_ => JobState.Stuck: JobState)
}
}
})).
mapF(_.attempt.flatMap {
}))
.mapF(_.attempt.flatMap {
case Right(jstate) =>
onFinish(job, jstate)
case Left(ex) =>
@ -165,14 +194,14 @@ final class SchedulerImpl[F[_]: ConcurrentEffect : ContextShift](val config: Sch
// we don't know the real outcome here…
// since tasks should be idempotent, set it to stuck. if above has failed, this might fail anyways
onFinish(job, JobState.Stuck)
})
}
})
def forkRun(job: RJob, code: F[Unit], onCancel: F[Unit], ctx: Context[F, String]): F[F[Unit]] = {
val bfa = blocker.blockOn(code)
logger.fdebug(s"Forking job ${job.info}") *>
ConcurrentEffect[F].start(bfa).
map(fiber =>
ConcurrentEffect[F]
.start(bfa)
.map(fiber =>
logger.fdebug(s"Cancelling job ${job.info}") *>
fiber.cancel *>
onCancel.attempt.map({
@ -184,7 +213,8 @@ final class SchedulerImpl[F[_]: ConcurrentEffect : ContextShift](val config: Sch
state.modify(_.markCancelled(job)) *>
onFinish(job, JobState.Cancelled) *>
ctx.logger.warn("Job has been cancelled.") *>
logger.fdebug(s"Job ${job.info} has been cancelled."))
logger.fdebug(s"Job ${job.info} has been cancelled.")
)
}
}
@ -193,10 +223,12 @@ object SchedulerImpl {
def emptyState[F[_]]: State[F] =
State(Map.empty, Set.empty, Map.empty, false)
case class State[F[_]]( counters: Map[Ident, CountingScheme]
, cancelled: Set[Ident]
, cancelTokens: Map[Ident, CancelToken[F]]
, shutdownRequest: Boolean) {
case class State[F[_]](
counters: Map[Ident, CountingScheme],
cancelled: Set[Ident],
cancelTokens: Map[Ident, CancelToken[F]],
shutdownRequest: Boolean
) {
def nextPrio(group: Ident, initial: CountingScheme): (State[F], Priority) = {
val (cs, prio) = counters.getOrElse(group, initial).nextPriority

View File

@ -24,11 +24,11 @@ trait Task[F[_], A, B] {
def mapF[C](f: F[B] => F[C]): Task[F, A, C] =
Task(Task.toKleisli(this).mapF(f))
def attempt(implicit F: ApplicativeError[F,Throwable]): Task[F, A, Either[Throwable, B]] =
def attempt(implicit F: ApplicativeError[F, Throwable]): Task[F, A, Either[Throwable, B]] =
mapF(_.attempt)
def contramap[C](f: C => F[A])(implicit F: FlatMap[F]): Task[F, C, B] = {
ctxc: Context[F, C] => f(ctxc.args).flatMap(a => run(ctxc.map(_ => a)))
def contramap[C](f: C => F[A])(implicit F: FlatMap[F]): Task[F, C, B] = { ctxc: Context[F, C] =>
f(ctxc.args).flatMap(a => run(ctxc.map(_ => a)))
}
}
@ -46,7 +46,6 @@ object Task {
def apply[F[_], A, B](k: Kleisli[F, Context[F, A], B]): Task[F, A, B] =
c => k.run(c)
def toKleisli[F[_], A, B](t: Task[F, A, B]): Kleisli[F, Context[F, A], B] =
Kleisli(t.run)

View File

@ -6,8 +6,8 @@ import minitest.SimpleTestSuite
object CountingSchemeSpec extends SimpleTestSuite {
test("counting") {
val cs = CountingScheme(2,1)
val list = List.iterate(cs.nextPriority, 6)(_._1.nextPriority).map(_._2)
val cs = CountingScheme(2, 1)
val list = List.iterate(cs.nextPriority, 6)(_._1.nextPriority).map(_._2)
val expect = List(Priority.High, Priority.High, Priority.Low)
assertEquals(list, expect ++ expect)
}