mirror of
https://github.com/TheAnachronism/docspell.git
synced 2025-06-21 18:08:25 +00:00
Initial version.
Features: - Upload PDF files let them analyze - Manage meta data and items - See processing in webapp
This commit is contained in:
14
modules/joex/src/main/resources/logback.xml
Normal file
14
modules/joex/src/main/resources/logback.xml
Normal file
@ -0,0 +1,14 @@
|
||||
<configuration>
|
||||
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
|
||||
<withJansi>true</withJansi>
|
||||
|
||||
<encoder>
|
||||
<pattern>[%thread] %highlight(%-5level) %cyan(%logger{15}) - %msg %n</pattern>
|
||||
</encoder>
|
||||
</appender>
|
||||
|
||||
<logger name="docspell" level="debug" />
|
||||
<root level="INFO">
|
||||
<appender-ref ref="STDOUT" />
|
||||
</root>
|
||||
</configuration>
|
@ -1,3 +1,131 @@
|
||||
docspell.joex {
|
||||
|
||||
# This is the id of this node. If you run more than one server, you
|
||||
# have to make sure to provide unique ids per node.
|
||||
app-id = "joex1"
|
||||
|
||||
|
||||
# This is the base URL this application is deployed to. This is used
|
||||
# to register this joex instance such that docspell rest servers can
|
||||
# reach them
|
||||
base-url = "http://localhost:7878"
|
||||
|
||||
# Where the REST server binds to.
|
||||
#
|
||||
# JOEX provides a very simple REST interface to inspect its state.
|
||||
bind {
|
||||
address = "localhost"
|
||||
port = 7878
|
||||
}
|
||||
|
||||
# The database connection.
|
||||
#
|
||||
# By default a H2 file-based database is configured. You can provide
|
||||
# a postgresql or mariadb connection here. When using H2 use the
|
||||
# PostgreSQL compatibility mode and AUTO_SERVER feature.
|
||||
#
|
||||
# It must be the same connection as the rest server is using.
|
||||
jdbc {
|
||||
url = "jdbc:h2://"${java.io.tmpdir}"/docspell-demo.db;MODE=PostgreSQL;DATABASE_TO_LOWER=TRUE;AUTO_SERVER=TRUE"
|
||||
user = "sa"
|
||||
password = ""
|
||||
}
|
||||
|
||||
# Configuration for the job scheduler.
|
||||
scheduler {
|
||||
|
||||
# Each scheduler needs a unique name. This defaults to the node
|
||||
# name, which must be unique, too.
|
||||
name = ${docspell.joex.app-id}
|
||||
|
||||
# Number of processing allowed in parallel.
|
||||
pool-size = 2
|
||||
|
||||
# A counting scheme determines the ratio of how high- and low-prio
|
||||
# jobs are run. For example: 4,1 means run 4 high prio jobs, then
|
||||
# 1 low prio and then start over.
|
||||
counting-scheme = "4,1"
|
||||
|
||||
# How often a failed job should be retried until it enters failed
|
||||
# state. If a job fails, it becomes "stuck" and will be retried
|
||||
# after a delay.
|
||||
retries = 5
|
||||
|
||||
# The delay until the next try is performed for a failed job. This
|
||||
# delay is increased exponentially with the number of retries.
|
||||
retry-delay = "1 minute"
|
||||
|
||||
# The queue size of log statements from a job.
|
||||
log-buffer-size = 500
|
||||
|
||||
# If no job is left in the queue, the scheduler will wait until a
|
||||
# notify is requested (using the REST interface). To also retry
|
||||
# stuck jobs, it will notify itself periodically.
|
||||
wakeup-period = "30 minutes"
|
||||
}
|
||||
|
||||
# Configuration of text extraction
|
||||
#
|
||||
# Extracting text currently only work for image and pdf files. It
|
||||
# will first runs ghostscript to create a gray image from a
|
||||
# pdf. Then unpaper is run to optimize the image for the upcoming
|
||||
# ocr, which will be done by tesseract. All these programs must be
|
||||
# available in your PATH or the absolute path can be specified
|
||||
# below.
|
||||
extraction {
|
||||
allowed-content-types = [ "application/pdf", "image/jpeg", "image/png" ]
|
||||
|
||||
# Defines what pages to process. If a PDF with 600 pages is
|
||||
# submitted, it is probably not necessary to scan through all of
|
||||
# them. This would take a long time and occupy resources for no
|
||||
# value. The first few pages should suffice. The default is first
|
||||
# 10 pages.
|
||||
#
|
||||
# If you want all pages being processed, set this number to -1.
|
||||
#
|
||||
# Note: if you change the ghostscript command below, be aware that
|
||||
# this setting (if not -1) will add another parameter to the
|
||||
# beginning of the command.
|
||||
page-range {
|
||||
begin = 10
|
||||
}
|
||||
|
||||
# The ghostscript command.
|
||||
ghostscript {
|
||||
command {
|
||||
program = "gs"
|
||||
args = [ "-dNOPAUSE"
|
||||
, "-dBATCH"
|
||||
, "-dSAFER"
|
||||
, "-sDEVICE=tiffscaled8"
|
||||
, "-sOutputFile={{outfile}}"
|
||||
, "{{infile}}"
|
||||
]
|
||||
timeout = "5 minutes"
|
||||
}
|
||||
working-dir = ${java.io.tmpdir}"/docspell-extraction"
|
||||
}
|
||||
|
||||
# The unpaper command.
|
||||
unpaper {
|
||||
command {
|
||||
program = "unpaper"
|
||||
args = [ "{{infile}}", "{{outfile}}" ]
|
||||
timeout = "5 minutes"
|
||||
}
|
||||
}
|
||||
|
||||
# The tesseract command.
|
||||
tesseract {
|
||||
command {
|
||||
program = "tesseract"
|
||||
args = ["{{file}}"
|
||||
, "stdout"
|
||||
, "-l"
|
||||
, "{{lang}}"
|
||||
]
|
||||
timeout = "5 minutes"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -1,18 +1,21 @@
|
||||
package docspell.joex
|
||||
|
||||
import docspell.common.{Ident, LenientUri}
|
||||
import docspell.joex.scheduler.SchedulerConfig
|
||||
import docspell.store.JdbcConfig
|
||||
import docspell.text.ocr.{Config => OcrConfig}
|
||||
|
||||
case class Config(id: String
|
||||
case class Config(appId: Ident
|
||||
, baseUrl: LenientUri
|
||||
, bind: Config.Bind
|
||||
, jdbc: JdbcConfig
|
||||
, scheduler: SchedulerConfig
|
||||
, extraction: OcrConfig
|
||||
)
|
||||
|
||||
object Config {
|
||||
|
||||
|
||||
val default: Config =
|
||||
Config("testid", Config.Bind("localhost", 7878), JdbcConfig("", "", ""))
|
||||
|
||||
val postgres = JdbcConfig(LenientUri.unsafe("jdbc:postgresql://localhost:5432/docspelldev"), "dev", "dev")
|
||||
val h2 = JdbcConfig(LenientUri.unsafe("jdbc:h2:./target/docspelldev.db;MODE=PostgreSQL;DATABASE_TO_LOWER=TRUE"), "sa", "")
|
||||
|
||||
case class Bind(address: String, port: Int)
|
||||
}
|
||||
|
20
modules/joex/src/main/scala/docspell/joex/ConfigFile.scala
Normal file
20
modules/joex/src/main/scala/docspell/joex/ConfigFile.scala
Normal file
@ -0,0 +1,20 @@
|
||||
package docspell.joex
|
||||
|
||||
import docspell.common.pureconfig.Implicits._
|
||||
import _root_.pureconfig._
|
||||
import _root_.pureconfig.generic.auto._
|
||||
import docspell.joex.scheduler.CountingScheme
|
||||
|
||||
object ConfigFile {
|
||||
import Implicits._
|
||||
|
||||
def loadConfig: Config =
|
||||
ConfigSource.default.at("docspell.joex").loadOrThrow[Config]
|
||||
|
||||
|
||||
object Implicits {
|
||||
implicit val countingSchemeReader: ConfigReader[CountingScheme] =
|
||||
ConfigReader[String].emap(reason(CountingScheme.readString))
|
||||
|
||||
}
|
||||
}
|
@ -1,6 +1,23 @@
|
||||
package docspell.joex
|
||||
|
||||
import docspell.common.Ident
|
||||
import docspell.joex.scheduler.Scheduler
|
||||
import docspell.store.records.RJobLog
|
||||
|
||||
trait JoexApp[F[_]] {
|
||||
|
||||
def init: F[Unit]
|
||||
|
||||
def scheduler: Scheduler[F]
|
||||
|
||||
def findLogs(jobId: Ident): F[Vector[RJobLog]]
|
||||
|
||||
/** Shuts down the job executor.
|
||||
*
|
||||
* It will immediately stop taking new jobs, waiting for currently
|
||||
* running jobs to complete normally (i.e. running jobs are not
|
||||
* canceled). After this completed, the webserver stops and the
|
||||
* main loop will exit.
|
||||
*/
|
||||
def initShutdown: F[Unit]
|
||||
}
|
||||
|
@ -1,16 +1,56 @@
|
||||
package docspell.joex
|
||||
|
||||
import cats.implicits._
|
||||
import cats.effect._
|
||||
import docspell.common.{Ident, NodeType, ProcessItemArgs}
|
||||
import docspell.joex.process.ItemHandler
|
||||
import docspell.joex.scheduler.{JobTask, Scheduler, SchedulerBuilder}
|
||||
import docspell.store.Store
|
||||
import docspell.store.ops.ONode
|
||||
import docspell.store.records.RJobLog
|
||||
import fs2.concurrent.SignallingRef
|
||||
|
||||
final class JoexAppImpl[F[_]: Sync](cfg: Config) extends JoexApp[F] {
|
||||
import scala.concurrent.ExecutionContext
|
||||
|
||||
def init: F[Unit] =
|
||||
Sync[F].pure(())
|
||||
final class JoexAppImpl[F[_]: ConcurrentEffect : ContextShift: Timer]( cfg: Config
|
||||
, nodeOps: ONode[F]
|
||||
, store: Store[F]
|
||||
, termSignal: SignallingRef[F, Boolean]
|
||||
, val scheduler: Scheduler[F]) extends JoexApp[F] {
|
||||
|
||||
def init: F[Unit] = {
|
||||
val run = scheduler.start.compile.drain
|
||||
for {
|
||||
_ <- ConcurrentEffect[F].start(run)
|
||||
_ <- scheduler.periodicAwake
|
||||
_ <- nodeOps.register(cfg.appId, NodeType.Joex, cfg.baseUrl)
|
||||
} yield ()
|
||||
}
|
||||
|
||||
def findLogs(jobId: Ident): F[Vector[RJobLog]] =
|
||||
store.transact(RJobLog.findLogs(jobId))
|
||||
|
||||
def shutdown: F[Unit] =
|
||||
nodeOps.unregister(cfg.appId)
|
||||
|
||||
def initShutdown: F[Unit] =
|
||||
scheduler.shutdown(false) *> termSignal.set(true)
|
||||
|
||||
}
|
||||
|
||||
object JoexAppImpl {
|
||||
|
||||
def create[F[_]: Sync](cfg: Config): Resource[F, JoexApp[F]] =
|
||||
Resource.liftF(Sync[F].pure(new JoexAppImpl(cfg)))
|
||||
def create[F[_]: ConcurrentEffect : ContextShift: Timer](cfg: Config
|
||||
, termSignal: SignallingRef[F, Boolean]
|
||||
, connectEC: ExecutionContext
|
||||
, blocker: Blocker): Resource[F, JoexApp[F]] =
|
||||
for {
|
||||
store <- Store.create(cfg.jdbc, connectEC, blocker)
|
||||
nodeOps <- ONode(store)
|
||||
sch <- SchedulerBuilder(cfg.scheduler, blocker, store).
|
||||
withTask(JobTask.json(ProcessItemArgs.taskName, ItemHandler[F](cfg.extraction), ItemHandler.onCancel[F])).
|
||||
resource
|
||||
app = new JoexAppImpl(cfg, nodeOps, store, termSignal, sch)
|
||||
appR <- Resource.make(app.init.map(_ => app))(_.shutdown)
|
||||
} yield appR
|
||||
}
|
||||
|
@ -1,37 +1,48 @@
|
||||
package docspell.joex
|
||||
|
||||
import cats.effect._
|
||||
import cats.effect.concurrent.Ref
|
||||
import docspell.joex.routes._
|
||||
import org.http4s.server.blaze.BlazeServerBuilder
|
||||
import org.http4s.implicits._
|
||||
import fs2.Stream
|
||||
|
||||
import fs2.concurrent.SignallingRef
|
||||
import org.http4s.HttpApp
|
||||
import org.http4s.server.middleware.Logger
|
||||
import org.http4s.server.Router
|
||||
|
||||
import scala.concurrent.ExecutionContext
|
||||
|
||||
object JoexServer {
|
||||
|
||||
def stream[F[_]: ConcurrentEffect](cfg: Config)
|
||||
|
||||
private case class App[F[_]](httpApp: HttpApp[F], termSig: SignallingRef[F, Boolean], exitRef: Ref[F, ExitCode])
|
||||
|
||||
def stream[F[_]: ConcurrentEffect : ContextShift](cfg: Config, connectEC: ExecutionContext, blocker: Blocker)
|
||||
(implicit T: Timer[F]): Stream[F, Nothing] = {
|
||||
|
||||
val app = for {
|
||||
joexApp <- JoexAppImpl.create[F](cfg)
|
||||
_ <- Resource.liftF(joexApp.init)
|
||||
signal <- Resource.liftF(SignallingRef[F, Boolean](false))
|
||||
exitCode <- Resource.liftF(Ref[F].of(ExitCode.Success))
|
||||
joexApp <- JoexAppImpl.create[F](cfg, signal, connectEC, blocker)
|
||||
|
||||
httpApp = Router(
|
||||
"/api/info" -> InfoRoutes(cfg)
|
||||
"/api/info" -> InfoRoutes(cfg),
|
||||
"/api/v1" -> JoexRoutes(cfg, joexApp)
|
||||
).orNotFound
|
||||
|
||||
// With Middlewares in place
|
||||
finalHttpApp = Logger.httpApp(false, false)(httpApp)
|
||||
|
||||
} yield finalHttpApp
|
||||
} yield App(finalHttpApp, signal, exitCode)
|
||||
|
||||
|
||||
Stream.resource(app).flatMap(httpApp =>
|
||||
BlazeServerBuilder[F]
|
||||
.bindHttp(cfg.bind.port, cfg.bind.address)
|
||||
.withHttpApp(httpApp)
|
||||
.serve
|
||||
Stream.resource(app).flatMap(app =>
|
||||
BlazeServerBuilder[F].
|
||||
bindHttp(cfg.bind.port, cfg.bind.address).
|
||||
withHttpApp(app.httpApp).
|
||||
withoutBanner.
|
||||
serveWhile(app.termSig, app.exitRef)
|
||||
)
|
||||
|
||||
}.drain
|
||||
|
@ -1,16 +1,23 @@
|
||||
package docspell.joex
|
||||
|
||||
import cats.effect.{ExitCode, IO, IOApp}
|
||||
import cats.effect.{Blocker, ExitCode, IO, IOApp}
|
||||
import cats.implicits._
|
||||
|
||||
import scala.concurrent.ExecutionContext
|
||||
import java.util.concurrent.Executors
|
||||
import java.nio.file.{Files, Paths}
|
||||
|
||||
import docspell.common.{Banner, ThreadFactories}
|
||||
import org.log4s._
|
||||
|
||||
object Main extends IOApp {
|
||||
private[this] val logger = getLogger
|
||||
|
||||
val blockingEc: ExecutionContext = ExecutionContext.fromExecutor(Executors.newCachedThreadPool)
|
||||
val blockingEc: ExecutionContext = ExecutionContext.fromExecutor(
|
||||
Executors.newCachedThreadPool(ThreadFactories.ofName("docspell-joex-blocking")))
|
||||
val blocker = Blocker.liftExecutionContext(blockingEc)
|
||||
val connectEC: ExecutionContext = ExecutionContext.fromExecutorService(
|
||||
Executors.newFixedThreadPool(5, ThreadFactories.ofName("docspell-joex-dbconnect")))
|
||||
|
||||
def run(args: List[String]) = {
|
||||
args match {
|
||||
@ -32,7 +39,14 @@ object Main extends IOApp {
|
||||
}
|
||||
}
|
||||
|
||||
val cfg = Config.default
|
||||
JoexServer.stream[IO](cfg).compile.drain.as(ExitCode.Success)
|
||||
val cfg = ConfigFile.loadConfig
|
||||
val banner = Banner("JOEX"
|
||||
, BuildInfo.version
|
||||
, BuildInfo.gitHeadCommit
|
||||
, cfg.jdbc.url
|
||||
, Option(System.getProperty("config.file"))
|
||||
, cfg.appId, cfg.baseUrl)
|
||||
logger.info(s"\n${banner.render("***>")}")
|
||||
JoexServer.stream[IO](cfg, connectEC, blocker).compile.drain.as(ExitCode.Success)
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,73 @@
|
||||
package docspell.joex.process
|
||||
|
||||
import cats.implicits._
|
||||
import cats.effect.Sync
|
||||
import fs2.Stream
|
||||
import docspell.common._
|
||||
import docspell.joex.scheduler.{Context, Task}
|
||||
import docspell.store.queries.QItem
|
||||
import docspell.store.records.{RAttachment, RItem}
|
||||
|
||||
/**
|
||||
* Task that creates the item.
|
||||
*/
|
||||
object CreateItem {
|
||||
|
||||
def apply[F[_]: Sync]: Task[F, ProcessItemArgs, ItemData] =
|
||||
findExisting[F].flatMap {
|
||||
case Some(ri) => Task.pure(ri)
|
||||
case None => createNew[F]
|
||||
}
|
||||
|
||||
def createNew[F[_]: Sync]: Task[F, ProcessItemArgs, ItemData] =
|
||||
Task { ctx =>
|
||||
val validFiles = ctx.args.meta.validFileTypes.map(_.asString).toSet
|
||||
|
||||
def fileMetas(itemId: Ident, now: Timestamp) = Stream.emits(ctx.args.files).
|
||||
flatMap(f => ctx.store.bitpeace.get(f.fileMetaId.id).map(fm => (f, fm))).
|
||||
collect({ case (f, Some(fm)) if validFiles.contains(fm.mimetype.baseType) => (f, fm) }).
|
||||
zipWithIndex.
|
||||
evalMap({ case ((f, fm), index) =>
|
||||
Ident.randomId[F].map(id => RAttachment(id, itemId, f.fileMetaId, index.toInt, now, f.name))
|
||||
}).
|
||||
compile.toVector
|
||||
|
||||
val item = RItem.newItem[F](ctx.args.meta.collective
|
||||
, ctx.args.makeSubject
|
||||
, ctx.args.meta.sourceAbbrev
|
||||
, ctx.args.meta.direction.getOrElse(Direction.Incoming)
|
||||
, ItemState.Premature)
|
||||
|
||||
for {
|
||||
_ <- ctx.logger.info(s"Creating new item with ${ctx.args.files.size} attachment(s)")
|
||||
time <- Duration.stopTime[F]
|
||||
it <- item
|
||||
n <- ctx.store.transact(RItem.insert(it))
|
||||
_ <- if (n != 1) storeItemError[F](ctx) else ().pure[F]
|
||||
fm <- fileMetas(it.id, it.created)
|
||||
k <- fm.traverse(a => ctx.store.transact(RAttachment.insert(a)))
|
||||
_ <- logDifferences(ctx, fm, k.sum)
|
||||
dur <- time
|
||||
_ <- ctx.logger.info(s"Creating item finished in ${dur.formatExact}")
|
||||
} yield ItemData(it, fm, Vector.empty, Vector.empty)
|
||||
}
|
||||
|
||||
def findExisting[F[_]: Sync]: Task[F, ProcessItemArgs, Option[ItemData]] =
|
||||
Task { ctx =>
|
||||
for {
|
||||
cand <- ctx.store.transact(QItem.findByFileIds(ctx.args.files.map(_.fileMetaId)))
|
||||
_ <- if (cand.nonEmpty) ctx.logger.warn("Found existing item with these files.") else ().pure[F]
|
||||
ht <- cand.drop(1).traverse(ri => QItem.delete(ctx.store)(ri.id, ri.cid))
|
||||
_ <- if (ht.sum > 0) ctx.logger.warn(s"Removed ${ht.sum} items with same attachments") else ().pure[F]
|
||||
rms <- cand.headOption.traverse(ri => ctx.store.transact(RAttachment.findByItemAndCollective(ri.id, ri.cid)))
|
||||
} yield cand.headOption.map(ri => ItemData(ri, rms.getOrElse(Vector.empty), Vector.empty, Vector.empty))
|
||||
}
|
||||
|
||||
private def logDifferences[F[_]: Sync](ctx: Context[F, ProcessItemArgs], saved: Vector[RAttachment], saveCount: Int): F[Unit] =
|
||||
ctx.logger.info("TODO log diffs")
|
||||
|
||||
private def storeItemError[F[_]: Sync](ctx: Context[F, ProcessItemArgs]): F[Unit] = {
|
||||
val msg = "Inserting item failed. DB returned 0 update count!"
|
||||
ctx.logger.error(msg) *> Sync[F].raiseError(new Exception(msg))
|
||||
}
|
||||
}
|
@ -0,0 +1,181 @@
|
||||
package docspell.joex.process
|
||||
|
||||
import java.time.ZoneId
|
||||
|
||||
import cats.{Applicative, FlatMap}
|
||||
import cats.implicits._
|
||||
import cats.effect.Sync
|
||||
import docspell.common.MetaProposal.Candidate
|
||||
import docspell.common._
|
||||
import docspell.joex.scheduler.{Context, Task}
|
||||
import docspell.store.records.{RAttachmentMeta, REquipment, ROrganization, RPerson}
|
||||
import docspell.text.contact.Domain
|
||||
|
||||
/** Super simple approach to find corresponding meta data to an item
|
||||
* by looking up values from NER in the users address book.
|
||||
*
|
||||
*/
|
||||
object FindProposal {
|
||||
|
||||
def apply[F[_]: Sync](data: ItemData): Task[F, ProcessItemArgs, ItemData] =
|
||||
Task { ctx =>
|
||||
val rmas = data.metas.map(rm =>
|
||||
rm.copy(nerlabels = removeDuplicates(rm.nerlabels)))
|
||||
|
||||
ctx.logger.info("Starting find-proposal") *>
|
||||
rmas.traverse(rm => processAttachment(rm, data.findDates(rm), ctx).map(ml => rm.copy(proposals = ml))).
|
||||
flatMap(rmv => rmv.traverse(rm =>
|
||||
ctx.logger.debug(s"Storing attachment proposals: ${rm.proposals}") *>
|
||||
ctx.store.transact(RAttachmentMeta.updateProposals(rm.id, rm.proposals))).
|
||||
map(_ => data.copy(metas = rmv)))
|
||||
}
|
||||
|
||||
def processAttachment[F[_]: Sync]( rm: RAttachmentMeta
|
||||
, rd: Vector[NerDateLabel]
|
||||
, ctx: Context[F, ProcessItemArgs]): F[MetaProposalList] = {
|
||||
val finder = Finder.searchExact(ctx).next(Finder.searchFuzzy(ctx))
|
||||
List(finder.find(rm.nerlabels), makeDateProposal(rd)).
|
||||
traverse(identity).map(MetaProposalList.flatten)
|
||||
}
|
||||
|
||||
def makeDateProposal[F[_]: Sync](dates: Vector[NerDateLabel]): F[MetaProposalList] = {
|
||||
Timestamp.current[F].map { now =>
|
||||
val latestFirst = dates.sortWith(_.date isAfter _.date)
|
||||
val nowDate = now.value.atZone(ZoneId.of("GMT")).toLocalDate
|
||||
val (after, before) = latestFirst.span(ndl => ndl.date.isAfter(nowDate))
|
||||
|
||||
val dueDates = MetaProposalList.fromSeq1(MetaProposalType.DueDate,
|
||||
after.map(ndl => Candidate(IdRef(Ident.unsafe(ndl.date.toString), ndl.date.toString), Set(ndl.label))))
|
||||
val itemDates = MetaProposalList.fromSeq1(MetaProposalType.DocDate,
|
||||
before.map(ndl => Candidate(IdRef(Ident.unsafe(ndl.date.toString), ndl.date.toString), Set(ndl.label))))
|
||||
|
||||
MetaProposalList.flatten(Seq(dueDates, itemDates))
|
||||
}
|
||||
}
|
||||
|
||||
def removeDuplicates(labels: List[NerLabel]): List[NerLabel] =
|
||||
labels.foldLeft((Set.empty[String], List.empty[NerLabel])) { case ((seen, result), el) =>
|
||||
if (seen.contains(el.tag.name+el.label.toLowerCase)) (seen, result)
|
||||
else (seen + (el.tag.name + el.label.toLowerCase), el :: result)
|
||||
}._2.sortBy(_.startPosition)
|
||||
|
||||
trait Finder[F[_]] { self =>
|
||||
def find(labels: Seq[NerLabel]): F[MetaProposalList]
|
||||
|
||||
def contraMap(f: Seq[NerLabel] => Seq[NerLabel]): Finder[F] =
|
||||
labels => self.find(f(labels))
|
||||
|
||||
def filterLabels(f: NerLabel => Boolean): Finder[F] =
|
||||
contraMap(_.filter(f))
|
||||
|
||||
def flatMap(f: MetaProposalList => Finder[F])(implicit F: FlatMap[F]): Finder[F] =
|
||||
labels => self.find(labels).flatMap(ml => f(ml).find(labels))
|
||||
|
||||
def map(f: MetaProposalList => MetaProposalList)(implicit F: Applicative[F]): Finder[F] =
|
||||
labels => self.find(labels).map(f)
|
||||
|
||||
def next(f: Finder[F])(implicit F: FlatMap[F], F3: Applicative[F]): Finder[F] =
|
||||
flatMap({ ml0 =>
|
||||
if (ml0.hasResultsAll) Finder.unit[F](ml0)
|
||||
else f.map(ml1 => ml0.fillEmptyFrom(ml1))
|
||||
})
|
||||
|
||||
def nextWhenEmpty(f: Finder[F], mt0: MetaProposalType, mts: MetaProposalType*)
|
||||
(implicit F: FlatMap[F], F2: Applicative[F]): Finder[F] =
|
||||
flatMap(res0 => {
|
||||
if (res0.hasResults(mt0, mts: _*)) Finder.unit[F](res0)
|
||||
else f.map(res1 => res0.fillEmptyFrom(res1))
|
||||
})
|
||||
}
|
||||
|
||||
object Finder {
|
||||
def none[F[_]: Applicative]: Finder[F] =
|
||||
_ => MetaProposalList.empty.pure[F]
|
||||
|
||||
def unit[F[_]: Applicative](value: MetaProposalList): Finder[F] =
|
||||
_ => value.pure[F]
|
||||
|
||||
def searchExact[F[_]: Sync](ctx: Context[F, ProcessItemArgs]): Finder[F] =
|
||||
labels => labels.toList.traverse(nl => search(nl, true, ctx)).map(MetaProposalList.flatten)
|
||||
|
||||
def searchFuzzy[F[_]: Sync](ctx: Context[F, ProcessItemArgs]): Finder[F] =
|
||||
labels => labels.toList.traverse(nl => search(nl, false, ctx)).map(MetaProposalList.flatten)
|
||||
}
|
||||
|
||||
private def search[F[_]: Sync](nt: NerLabel, exact: Boolean, ctx: Context[F, ProcessItemArgs]): F[MetaProposalList] = {
|
||||
val value =
|
||||
if (exact) normalizeSearchValue(nt.label)
|
||||
else s"%${normalizeSearchValue(nt.label)}%"
|
||||
val minLength =
|
||||
if (exact) 2 else 5
|
||||
|
||||
if (value.length < minLength) {
|
||||
ctx.logger.debug(s"Skipping too small value '$value' (original '${nt.label}').").map(_ => MetaProposalList.empty)
|
||||
} else nt.tag match {
|
||||
case NerTag.Organization =>
|
||||
ctx.logger.debug(s"Looking for organizations: $value") *>
|
||||
ctx.store.transact(ROrganization.findLike(ctx.args.meta.collective, value)).
|
||||
map(MetaProposalList.from(MetaProposalType.CorrOrg, nt))
|
||||
|
||||
case NerTag.Person =>
|
||||
val s1 = ctx.store.transact(RPerson.findLike(ctx.args.meta.collective, value, true)).
|
||||
map(MetaProposalList.from(MetaProposalType.ConcPerson, nt))
|
||||
val s2 = ctx.store.transact(RPerson.findLike(ctx.args.meta.collective, value, false)).
|
||||
map(MetaProposalList.from(MetaProposalType.CorrPerson, nt))
|
||||
ctx.logger.debug(s"Looking for persons: $value") *> (for {
|
||||
ml0 <- s1
|
||||
ml1 <- s2
|
||||
} yield ml0 |+| ml1)
|
||||
|
||||
case NerTag.Location =>
|
||||
ctx.logger.debug(s"NerTag 'Location' is currently not used. Ignoring value '$value'.").
|
||||
map(_ => MetaProposalList.empty)
|
||||
|
||||
case NerTag.Misc =>
|
||||
ctx.logger.debug(s"Looking for equipments: $value") *>
|
||||
ctx.store.transact(REquipment.findLike(ctx.args.meta.collective, value)).
|
||||
map(MetaProposalList.from(MetaProposalType.ConcEquip, nt))
|
||||
|
||||
case NerTag.Email =>
|
||||
searchContact(nt, ContactKind.Email, value, ctx)
|
||||
|
||||
case NerTag.Website =>
|
||||
if (!exact) {
|
||||
val searchString = Domain.domainFromUri(nt.label.toLowerCase).
|
||||
toOption.
|
||||
map(_.toPrimaryDomain.asString).
|
||||
map(s => s"%$s%").
|
||||
getOrElse(value)
|
||||
searchContact(nt, ContactKind.Website, searchString, ctx)
|
||||
} else {
|
||||
searchContact(nt, ContactKind.Website, value, ctx)
|
||||
}
|
||||
|
||||
case NerTag.Date =>
|
||||
// There is no database search required for this tag
|
||||
MetaProposalList.empty.pure[F]
|
||||
}
|
||||
}
|
||||
|
||||
private def searchContact[F[_]: Sync]( nt: NerLabel
|
||||
, kind: ContactKind
|
||||
, value: String
|
||||
, ctx: Context[F, ProcessItemArgs]): F[MetaProposalList] = {
|
||||
val orgs = ctx.store.transact(ROrganization.findLike(ctx.args.meta.collective, kind, value)).
|
||||
map(MetaProposalList.from(MetaProposalType.CorrOrg, nt))
|
||||
val corrP = ctx.store.transact(RPerson.findLike(ctx.args.meta.collective, kind, value, false)).
|
||||
map(MetaProposalList.from(MetaProposalType.CorrPerson, nt))
|
||||
val concP = ctx.store.transact(RPerson.findLike(ctx.args.meta.collective, kind, value, true)).
|
||||
map(MetaProposalList.from(MetaProposalType.CorrPerson, nt))
|
||||
|
||||
ctx.logger.debug(s"Looking with $kind: $value") *>
|
||||
List(orgs, corrP, concP).traverse(identity).map(MetaProposalList.flatten)
|
||||
}
|
||||
|
||||
// The backslash *must* be stripped from search strings.
|
||||
private [this] val invalidSearch =
|
||||
"…_[]^<>=&ſ/{}*?@#$|~`+%\"';\\".toSet
|
||||
|
||||
private def normalizeSearchValue(str: String): String =
|
||||
str.toLowerCase.filter(c => !invalidSearch.contains(c))
|
||||
}
|
@ -0,0 +1,27 @@
|
||||
package docspell.joex.process
|
||||
|
||||
import docspell.common.{Ident, NerDateLabel, NerLabel}
|
||||
import docspell.joex.process.ItemData.AttachmentDates
|
||||
import docspell.store.records.{RAttachment, RAttachmentMeta, RItem}
|
||||
|
||||
case class ItemData( item: RItem
|
||||
, attachments: Vector[RAttachment]
|
||||
, metas: Vector[RAttachmentMeta]
|
||||
, dateLabels: Vector[AttachmentDates]) {
|
||||
|
||||
def findMeta(attachId: Ident): Option[RAttachmentMeta] =
|
||||
metas.find(_.id == attachId)
|
||||
|
||||
def findDates(rm: RAttachmentMeta): Vector[NerDateLabel] =
|
||||
dateLabels.find(m => m.rm.id == rm.id).map(_.dates).getOrElse(Vector.empty)
|
||||
}
|
||||
|
||||
|
||||
object ItemData {
|
||||
|
||||
case class AttachmentDates(rm: RAttachmentMeta, dates: Vector[NerDateLabel]) {
|
||||
def toNerLabel: Vector[NerLabel] =
|
||||
dates.map(dl => dl.label.copy(label = dl.date.toString))
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,62 @@
|
||||
package docspell.joex.process
|
||||
|
||||
import cats.implicits._
|
||||
import cats.effect.{ContextShift, Sync}
|
||||
import docspell.common.{ItemState, ProcessItemArgs}
|
||||
import docspell.joex.scheduler.{Context, Task}
|
||||
import docspell.store.queries.QItem
|
||||
import docspell.store.records.{RItem, RJob}
|
||||
import docspell.text.ocr.{Config => OcrConfig}
|
||||
|
||||
object ItemHandler {
|
||||
def onCancel[F[_]: Sync: ContextShift]: Task[F, ProcessItemArgs, Unit] =
|
||||
logWarn("Now cancelling. Deleting potentially created data.").
|
||||
flatMap(_ => deleteByFileIds)
|
||||
|
||||
def apply[F[_]: Sync: ContextShift](cfg: OcrConfig): Task[F, ProcessItemArgs, Unit] =
|
||||
CreateItem[F].
|
||||
flatMap(itemStateTask(ItemState.Processing)).
|
||||
flatMap(safeProcess[F](cfg)).
|
||||
map(_ => ())
|
||||
|
||||
def itemStateTask[F[_]: Sync, A](state: ItemState)(data: ItemData): Task[F, A, ItemData] =
|
||||
Task { ctx =>
|
||||
ctx.store.transact(RItem.updateState(data.item.id, state)).map(_ => data)
|
||||
}
|
||||
|
||||
def isLastRetry[F[_]: Sync, A](ctx: Context[F, A]): F[Boolean] =
|
||||
for {
|
||||
current <- ctx.store.transact(RJob.getRetries(ctx.jobId))
|
||||
last = ctx.config.retries == current.getOrElse(0)
|
||||
} yield last
|
||||
|
||||
|
||||
def safeProcess[F[_]: Sync: ContextShift](cfg: OcrConfig)(data: ItemData): Task[F, ProcessItemArgs, ItemData] =
|
||||
Task(isLastRetry[F, ProcessItemArgs] _).flatMap {
|
||||
case true =>
|
||||
ProcessItem[F](cfg)(data).
|
||||
attempt.flatMap({
|
||||
case Right(d) =>
|
||||
Task.pure(d)
|
||||
case Left(ex) =>
|
||||
logWarn[F]("Processing failed on last retry. Creating item but without proposals.").
|
||||
flatMap(_ => itemStateTask(ItemState.Created)(data)).
|
||||
andThen(_ => Sync[F].raiseError(ex))
|
||||
})
|
||||
case false =>
|
||||
ProcessItem[F](cfg)(data).
|
||||
flatMap(itemStateTask(ItemState.Created))
|
||||
}
|
||||
|
||||
def deleteByFileIds[F[_]: Sync: ContextShift]: Task[F, ProcessItemArgs, Unit] =
|
||||
Task { ctx =>
|
||||
for {
|
||||
items <- ctx.store.transact(QItem.findByFileIds(ctx.args.files.map(_.fileMetaId)))
|
||||
_ <- ctx.logger.info(s"Deleting items ${items.map(_.id.id)}")
|
||||
_ <- items.traverse(i => QItem.delete(ctx.store)(i.id, ctx.args.meta.collective))
|
||||
} yield ()
|
||||
}
|
||||
|
||||
private def logWarn[F[_]](msg: => String): Task[F, ProcessItemArgs, Unit] =
|
||||
Task(_.logger.warn(msg))
|
||||
}
|
@ -0,0 +1,71 @@
|
||||
package docspell.joex.process
|
||||
|
||||
import cats.implicits._
|
||||
import cats.effect.Sync
|
||||
import docspell.common._
|
||||
import docspell.joex.scheduler.{Context, Task}
|
||||
import docspell.store.records.RItem
|
||||
|
||||
object LinkProposal {
|
||||
|
||||
def apply[F[_]: Sync](data: ItemData): Task[F, ProcessItemArgs, ItemData] =
|
||||
Task { ctx =>
|
||||
val proposals = MetaProposalList.flatten(data.metas.map(_.proposals))
|
||||
|
||||
ctx.logger.info(s"Starting linking proposals") *>
|
||||
MetaProposalType.all.
|
||||
traverse(applyValue(data, proposals, ctx)).
|
||||
map(result => ctx.logger.info(s"Results from proposal processing: $result")).
|
||||
map(_ => data)
|
||||
}
|
||||
|
||||
def applyValue[F[_]: Sync](data: ItemData, proposalList: MetaProposalList, ctx: Context[F, ProcessItemArgs])(mpt: MetaProposalType): F[Result] = {
|
||||
proposalList.find(mpt) match {
|
||||
case None =>
|
||||
Result.noneFound(mpt).pure[F]
|
||||
case Some(a) if a.isSingleValue =>
|
||||
ctx.logger.info(s"Found one candidate for ${a.proposalType}") *>
|
||||
setItemMeta(data.item.id, ctx, a.proposalType, a.values.head.ref.id).
|
||||
map(_ => Result.single(mpt))
|
||||
case Some(a) =>
|
||||
ctx.logger.info(s"Found many (${a.size}, ${a.values.map(_.ref.id.id)}) candidates for ${a.proposalType}. Setting first.") *>
|
||||
setItemMeta(data.item.id, ctx, a.proposalType, a.values.head.ref.id).
|
||||
map(_ => Result.multiple(mpt))
|
||||
}
|
||||
}
|
||||
|
||||
def setItemMeta[F[_]: Sync](itemId: Ident, ctx: Context[F, ProcessItemArgs], mpt: MetaProposalType, value: Ident): F[Int] =
|
||||
mpt match {
|
||||
case MetaProposalType.CorrOrg =>
|
||||
ctx.logger.debug(s"Updating item organization with: ${value.id}") *>
|
||||
ctx.store.transact(RItem.updateCorrOrg(itemId, ctx.args.meta.collective, Some(value)))
|
||||
case MetaProposalType.ConcPerson =>
|
||||
ctx.logger.debug(s"Updating item concerning person with: $value") *>
|
||||
ctx.store.transact(RItem.updateConcPerson(itemId, ctx.args.meta.collective, Some(value)))
|
||||
case MetaProposalType.CorrPerson =>
|
||||
ctx.logger.debug(s"Updating item correspondent person with: $value") *>
|
||||
ctx.store.transact(RItem.updateCorrPerson(itemId, ctx.args.meta.collective, Some(value)))
|
||||
case MetaProposalType.ConcEquip =>
|
||||
ctx.logger.debug(s"Updating item concerning equipment with: $value") *>
|
||||
ctx.store.transact(RItem.updateConcEquip(itemId, ctx.args.meta.collective, Some(value)))
|
||||
case MetaProposalType.DocDate =>
|
||||
ctx.logger.debug(s"Not linking document date suggestion ${value.id}").map(_ => 0)
|
||||
case MetaProposalType.DueDate =>
|
||||
ctx.logger.debug(s"Not linking document date suggestion ${value.id}").map(_ => 0)
|
||||
}
|
||||
|
||||
|
||||
sealed trait Result {
|
||||
def proposalType: MetaProposalType
|
||||
}
|
||||
object Result {
|
||||
|
||||
case class NoneFound(proposalType: MetaProposalType) extends Result
|
||||
case class SingleResult(proposalType: MetaProposalType) extends Result
|
||||
case class MultipleResult(proposalType: MetaProposalType) extends Result
|
||||
|
||||
def noneFound(proposalType: MetaProposalType): Result = NoneFound(proposalType)
|
||||
def single(proposalType: MetaProposalType): Result = SingleResult(proposalType)
|
||||
def multiple(proposalType: MetaProposalType): Result = MultipleResult(proposalType)
|
||||
}
|
||||
}
|
@ -0,0 +1,19 @@
|
||||
package docspell.joex.process
|
||||
|
||||
import cats.effect.{ContextShift, Sync}
|
||||
import docspell.common.ProcessItemArgs
|
||||
import docspell.joex.scheduler.Task
|
||||
import docspell.text.ocr.{Config => OcrConfig}
|
||||
|
||||
object ProcessItem {
|
||||
|
||||
def apply[F[_]: Sync: ContextShift](cfg: OcrConfig)(item: ItemData): Task[F, ProcessItemArgs, ItemData] =
|
||||
TextExtraction(cfg, item).
|
||||
flatMap(Task.setProgress(25)).
|
||||
flatMap(TextAnalysis[F]).
|
||||
flatMap(Task.setProgress(50)).
|
||||
flatMap(FindProposal[F]).
|
||||
flatMap(Task.setProgress(75)).
|
||||
flatMap(LinkProposal[F]).
|
||||
flatMap(Task.setProgress(99))
|
||||
}
|
@ -0,0 +1,39 @@
|
||||
package docspell.joex.process
|
||||
|
||||
import cats.implicits._
|
||||
import cats.effect.Sync
|
||||
import docspell.common.ProcessItemArgs
|
||||
import docspell.common.syntax.all._
|
||||
import docspell.joex.scheduler.Task
|
||||
import org.log4s._
|
||||
|
||||
object TestTasks {
|
||||
private [this] val logger = getLogger
|
||||
|
||||
def success[F[_]]: Task[F, ProcessItemArgs, Unit] =
|
||||
Task { ctx =>
|
||||
ctx.logger.info(s"Running task now: ${ctx.args}")
|
||||
}
|
||||
|
||||
def failing[F[_]: Sync]: Task[F, ProcessItemArgs, Unit] =
|
||||
Task { ctx =>
|
||||
ctx.logger.info(s"Failing the task run :(").map(_ =>
|
||||
sys.error("Oh, cannot extract gold from this document")
|
||||
)
|
||||
}
|
||||
|
||||
def longRunning[F[_]: Sync]: Task[F, ProcessItemArgs, Unit] =
|
||||
Task { ctx =>
|
||||
logger.fwarn(s"${Thread.currentThread()} From executing long running task") >>
|
||||
ctx.logger.info(s"${Thread.currentThread()} Running task now: ${ctx.args}") >>
|
||||
sleep(2400) >>
|
||||
ctx.logger.debug("doing things") >>
|
||||
sleep(2400) >>
|
||||
ctx.logger.debug("doing more things") >>
|
||||
sleep(2400) >>
|
||||
ctx.logger.info("doing more things")
|
||||
}
|
||||
|
||||
private def sleep[F[_]:Sync](ms: Long): F[Unit] =
|
||||
Sync[F].delay(Thread.sleep(ms))
|
||||
}
|
@ -0,0 +1,49 @@
|
||||
package docspell.joex.process
|
||||
|
||||
import cats.implicits._
|
||||
import cats.effect.Sync
|
||||
import docspell.common.{Duration, Language, NerLabel, ProcessItemArgs}
|
||||
import docspell.joex.process.ItemData.AttachmentDates
|
||||
import docspell.joex.scheduler.Task
|
||||
import docspell.store.records.RAttachmentMeta
|
||||
import docspell.text.contact.Contact
|
||||
import docspell.text.date.DateFind
|
||||
import docspell.text.nlp.StanfordNerClassifier
|
||||
|
||||
object TextAnalysis {
|
||||
|
||||
def apply[F[_]: Sync](item: ItemData): Task[F, ProcessItemArgs, ItemData] =
|
||||
Task { ctx =>
|
||||
for {
|
||||
_ <- ctx.logger.info("Starting text analysis")
|
||||
s <- Duration.stopTime[F]
|
||||
t <- item.metas.toList.traverse(annotateAttachment[F](ctx.args.meta.language))
|
||||
_ <- ctx.logger.debug(s"Storing tags: ${t.map(_._1.copy(content = None))}")
|
||||
_ <- t.traverse(m => ctx.store.transact(RAttachmentMeta.updateLabels(m._1.id, m._1.nerlabels)))
|
||||
e <- s
|
||||
_ <- ctx.logger.info(s"Text-Analysis finished in ${e.formatExact}")
|
||||
v = t.toVector
|
||||
} yield item.copy(metas = v.map(_._1), dateLabels = v.map(_._2))
|
||||
}
|
||||
|
||||
def annotateAttachment[F[_]: Sync](lang: Language)(rm: RAttachmentMeta): F[(RAttachmentMeta, AttachmentDates)] =
|
||||
for {
|
||||
list0 <- stanfordNer[F](lang, rm)
|
||||
list1 <- contactNer[F](rm)
|
||||
dates <- dateNer[F](rm, lang)
|
||||
} yield (rm.copy(nerlabels = (list0 ++ list1 ++ dates.toNerLabel).toList), dates)
|
||||
|
||||
def stanfordNer[F[_]: Sync](lang: Language, rm: RAttachmentMeta): F[Vector[NerLabel]] = Sync[F].delay {
|
||||
rm.content.map(StanfordNerClassifier.nerAnnotate(lang)).getOrElse(Vector.empty)
|
||||
}
|
||||
|
||||
def contactNer[F[_]: Sync](rm: RAttachmentMeta): F[Vector[NerLabel]] = Sync[F].delay {
|
||||
rm.content.map(Contact.annotate).getOrElse(Vector.empty)
|
||||
}
|
||||
|
||||
def dateNer[F[_]: Sync](rm: RAttachmentMeta, lang: Language): F[AttachmentDates] = Sync[F].delay {
|
||||
AttachmentDates(rm, rm.content.map(txt => DateFind.findDates(txt, lang).toVector).getOrElse(Vector.empty))
|
||||
}
|
||||
|
||||
|
||||
}
|
@ -0,0 +1,45 @@
|
||||
package docspell.joex.process
|
||||
|
||||
import bitpeace.RangeDef
|
||||
import cats.implicits._
|
||||
import cats.effect.{Blocker, ContextShift, Sync}
|
||||
import docspell.common.{Duration, Language, ProcessItemArgs}
|
||||
import docspell.joex.scheduler.{Context, Task}
|
||||
import docspell.store.Store
|
||||
import docspell.store.records.{RAttachment, RAttachmentMeta}
|
||||
import docspell.text.ocr.{TextExtract, Config => OcrConfig}
|
||||
|
||||
object TextExtraction {
|
||||
|
||||
def apply[F[_]: Sync : ContextShift](cfg: OcrConfig, item: ItemData): Task[F, ProcessItemArgs, ItemData] =
|
||||
Task { ctx =>
|
||||
for {
|
||||
_ <- ctx.logger.info("Starting text extraction")
|
||||
start <- Duration.stopTime[F]
|
||||
txt <- item.attachments.traverse(extractTextToMeta(ctx, cfg, ctx.args.meta.language))
|
||||
_ <- ctx.logger.debug("Storing extracted texts")
|
||||
_ <- txt.toList.traverse(rm => ctx.store.transact(RAttachmentMeta.upsert(rm)))
|
||||
dur <- start
|
||||
_ <- ctx.logger.info(s"Text extraction finished in ${dur.formatExact}")
|
||||
} yield item.copy(metas = txt)
|
||||
}
|
||||
|
||||
def extractTextToMeta[F[_]: Sync : ContextShift](ctx: Context[F, _], cfg: OcrConfig, lang: Language)(ra: RAttachment): F[RAttachmentMeta] =
|
||||
for {
|
||||
_ <- ctx.logger.debug(s"Extracting text for attachment ${ra.name}")
|
||||
dst <- Duration.stopTime[F]
|
||||
txt <- extractText(cfg, lang, ctx.store, ctx.blocker)(ra)
|
||||
meta = RAttachmentMeta.empty(ra.id).copy(content = txt.map(_.trim).filter(_.nonEmpty))
|
||||
est <- dst
|
||||
_ <- ctx.logger.debug(s"Extracting text for attachment ${ra.name} finished in ${est.formatExact}")
|
||||
} yield meta
|
||||
|
||||
def extractText[F[_]: Sync : ContextShift](ocrConfig: OcrConfig, lang: Language, store: Store[F], blocker: Blocker)(ra: RAttachment): F[Option[String]] = {
|
||||
val data = store.bitpeace.get(ra.fileId.id).
|
||||
unNoneTerminate.
|
||||
through(store.bitpeace.fetchData2(RangeDef.all))
|
||||
|
||||
TextExtract.extract(data, blocker, lang.iso3, ocrConfig).
|
||||
compile.last
|
||||
}
|
||||
}
|
@ -1,14 +1,12 @@
|
||||
package docspell.joex
|
||||
package docspell.joex.routes
|
||||
|
||||
import cats.effect._
|
||||
import org.http4s._
|
||||
import cats.effect.Sync
|
||||
import docspell.joex.{BuildInfo, Config}
|
||||
import docspell.joexapi.model.VersionInfo
|
||||
import org.http4s.HttpRoutes
|
||||
import org.http4s.dsl.Http4sDsl
|
||||
import org.http4s.circe.CirceEntityEncoder._
|
||||
|
||||
import docspell.joexapi.model._
|
||||
import docspell.joex.BuildInfo
|
||||
|
||||
object InfoRoutes {
|
||||
|
||||
def apply[F[_]: Sync](cfg: Config): HttpRoutes[F] = {
|
@ -0,0 +1,59 @@
|
||||
package docspell.joex.routes
|
||||
|
||||
import cats.implicits._
|
||||
import cats.effect._
|
||||
import docspell.common.{Duration, Ident, Timestamp}
|
||||
import docspell.joex.{Config, JoexApp}
|
||||
import docspell.joexapi.model._
|
||||
import docspell.store.records.{RJob, RJobLog}
|
||||
import org.http4s.HttpRoutes
|
||||
import org.http4s.dsl.Http4sDsl
|
||||
import org.http4s.circe.CirceEntityEncoder._
|
||||
|
||||
object JoexRoutes {
|
||||
|
||||
def apply[F[_]: ConcurrentEffect: Timer](cfg: Config, app: JoexApp[F]): HttpRoutes[F] = {
|
||||
val dsl = new Http4sDsl[F]{}
|
||||
import dsl._
|
||||
HttpRoutes.of[F] {
|
||||
case POST -> Root / "notify" =>
|
||||
for {
|
||||
_ <- app.scheduler.notifyChange
|
||||
resp <- Ok(BasicResult(true, "Scheduler notified."))
|
||||
} yield resp
|
||||
|
||||
case GET -> Root / "running" =>
|
||||
for {
|
||||
jobs <- app.scheduler.getRunning
|
||||
jj = jobs.map(mkJob)
|
||||
resp <- Ok(JobList(jj.toList))
|
||||
} yield resp
|
||||
|
||||
case POST -> Root / "shutdownAndExit" =>
|
||||
for {
|
||||
_ <- ConcurrentEffect[F].start(Timer[F].sleep(Duration.seconds(1).toScala) *> app.initShutdown)
|
||||
resp <- Ok(BasicResult(true, "Shutdown initiated."))
|
||||
} yield resp
|
||||
|
||||
case GET -> Root / "job" / Ident(id) =>
|
||||
for {
|
||||
optJob <- app.scheduler.getRunning.map(_.find(_.id == id))
|
||||
optLog <- optJob.traverse(j => app.findLogs(j.id))
|
||||
jAndL = for { job <- optJob; log <- optLog } yield mkJobLog(job, log)
|
||||
resp <- jAndL.map(Ok(_)).getOrElse(NotFound(BasicResult(false, "Not found")))
|
||||
} yield resp
|
||||
|
||||
case POST -> Root / "job" / Ident(id) / "cancel" =>
|
||||
for {
|
||||
flag <- app.scheduler.requestCancel(id)
|
||||
resp <- Ok(BasicResult(flag, if (flag) "Cancel request submitted" else "Job not found"))
|
||||
} yield resp
|
||||
}
|
||||
}
|
||||
|
||||
def mkJob(j: RJob): Job =
|
||||
Job(j.id, j.subject, j.submitted, j.priority, j.retries, j.progress, j.started.getOrElse(Timestamp.Epoch))
|
||||
|
||||
def mkJobLog(j: RJob, jl: Vector[RJobLog]): JobAndLog =
|
||||
JobAndLog(mkJob(j), jl.map(r => JobLogEvent(r.created, r.level, r.message)).toList)
|
||||
}
|
@ -0,0 +1,69 @@
|
||||
package docspell.joex.scheduler
|
||||
|
||||
import cats.Functor
|
||||
import cats.effect.{Blocker, Concurrent}
|
||||
import cats.implicits._
|
||||
import docspell.common.Ident
|
||||
import docspell.store.Store
|
||||
import docspell.store.records.RJob
|
||||
import docspell.common.syntax.all._
|
||||
import org.log4s.{Logger => _, _}
|
||||
|
||||
trait Context[F[_], A] { self =>
|
||||
|
||||
def jobId: Ident
|
||||
|
||||
def args: A
|
||||
|
||||
def config: SchedulerConfig
|
||||
|
||||
def logger: Logger[F]
|
||||
|
||||
def setProgress(percent: Int): F[Unit]
|
||||
|
||||
def store: Store[F]
|
||||
|
||||
def blocker: Blocker
|
||||
|
||||
def map[C](f: A => C)(implicit F: Functor[F]): Context[F, C] =
|
||||
new Context.ContextImpl[F, C](f(args), logger, store, blocker, config, jobId)
|
||||
}
|
||||
|
||||
object Context {
|
||||
private [this] val log = getLogger
|
||||
|
||||
def create[F[_]: Functor, A]( job: RJob
|
||||
, arg: A
|
||||
, config: SchedulerConfig
|
||||
, log: Logger[F]
|
||||
, store: Store[F]
|
||||
, blocker: Blocker): Context[F, A] =
|
||||
new ContextImpl(arg, log, store, blocker, config, job.id)
|
||||
|
||||
def apply[F[_]: Concurrent, A]( job: RJob
|
||||
, arg: A
|
||||
, config: SchedulerConfig
|
||||
, logSink: LogSink[F]
|
||||
, blocker: Blocker
|
||||
, store: Store[F]): F[Context[F, A]] =
|
||||
for {
|
||||
_ <- log.ftrace("Creating logger for task run")
|
||||
logger <- Logger(job.id, job.info, config.logBufferSize, logSink)
|
||||
_ <- log.ftrace("Logger created, instantiating context")
|
||||
ctx = create[F, A](job, arg, config, logger, store, blocker)
|
||||
} yield ctx
|
||||
|
||||
private final class ContextImpl[F[_]: Functor, A]( val args: A
|
||||
, val logger: Logger[F]
|
||||
, val store: Store[F]
|
||||
, val blocker: Blocker
|
||||
, val config: SchedulerConfig
|
||||
, val jobId: Ident)
|
||||
extends Context[F,A] {
|
||||
|
||||
def setProgress(percent: Int): F[Unit] = {
|
||||
val pval = math.min(100, math.max(0, percent))
|
||||
store.transact(RJob.setProgress(jobId, pval)).map(_ => ())
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,40 @@
|
||||
package docspell.joex.scheduler
|
||||
|
||||
import cats.implicits._
|
||||
import docspell.common.Priority
|
||||
|
||||
/** A counting scheme to indicate a ratio between scheduling high and
|
||||
* low priority jobs.
|
||||
*
|
||||
* For example high=4, low=1 means: ”schedule 4 high priority jobs
|
||||
* and then 1 low priority job“.
|
||||
*/
|
||||
case class CountingScheme(high: Int, low: Int, counter: Int = 0) {
|
||||
|
||||
def nextPriority: (CountingScheme, Priority) = {
|
||||
if (counter <= 0) (increment, Priority.High)
|
||||
else {
|
||||
val rest = counter % (high + low)
|
||||
if (rest < high) (increment, Priority.High)
|
||||
else (increment, Priority.Low)
|
||||
}
|
||||
}
|
||||
|
||||
def increment: CountingScheme =
|
||||
copy(counter = counter + 1)
|
||||
}
|
||||
|
||||
object CountingScheme {
|
||||
|
||||
def writeString(cs: CountingScheme): String =
|
||||
s"${cs.high},${cs.low}"
|
||||
|
||||
def readString(str: String): Either[String, CountingScheme] =
|
||||
str.split(',') match {
|
||||
case Array(h, l) =>
|
||||
Either.catchNonFatal(CountingScheme(h.toInt, l.toInt)).
|
||||
left.map(_.getMessage)
|
||||
case _ =>
|
||||
Left(s"Invalid counting scheme: $str")
|
||||
}
|
||||
}
|
@ -0,0 +1,33 @@
|
||||
package docspell.joex.scheduler
|
||||
|
||||
import cats.implicits._
|
||||
import cats.effect.Sync
|
||||
import docspell.common.Ident
|
||||
import docspell.common.syntax.all._
|
||||
import io.circe.Decoder
|
||||
|
||||
/**
|
||||
* Binds a Task to a name. This is required to lookup the code based
|
||||
* on the taskName in the RJob data and to execute it given the
|
||||
* arguments that have to be read from a string.
|
||||
*
|
||||
* Since the scheduler only has a string for the task argument, this
|
||||
* only works for Task impls that accept a string. There is a
|
||||
* convenience constructor that uses circe to decode json into some
|
||||
* type A.
|
||||
*/
|
||||
case class JobTask[F[_]](name: Ident, task: Task[F, String, Unit], onCancel: Task[F, String, Unit])
|
||||
|
||||
object JobTask {
|
||||
|
||||
def json[F[_]: Sync, A](name: Ident, task: Task[F, A, Unit], onCancel: Task[F, A, Unit])
|
||||
(implicit D: Decoder[A]): JobTask[F] = {
|
||||
val convert: String => F[A] =
|
||||
str => str.parseJsonAs[A] match {
|
||||
case Right(a) => a.pure[F]
|
||||
case Left(ex) => Sync[F].raiseError(new Exception(s"Cannot parse task arguments: $str", ex))
|
||||
}
|
||||
|
||||
JobTask(name, task.contramap(convert), onCancel.contramap(convert))
|
||||
}
|
||||
}
|
@ -0,0 +1,26 @@
|
||||
package docspell.joex.scheduler
|
||||
|
||||
import docspell.common.Ident
|
||||
|
||||
/**
|
||||
* This is a mapping from some identifier to a task. This is used by
|
||||
* the scheduler to lookup an implementation using the taskName field
|
||||
* of the RJob database record.
|
||||
*/
|
||||
final class JobTaskRegistry[F[_]](tasks: Map[Ident, JobTask[F]]) {
|
||||
|
||||
def withTask(task: JobTask[F]): JobTaskRegistry[F] =
|
||||
JobTaskRegistry(tasks.updated(task.name, task))
|
||||
|
||||
def find(taskName: Ident): Option[JobTask[F]] =
|
||||
tasks.get(taskName)
|
||||
}
|
||||
|
||||
object JobTaskRegistry {
|
||||
|
||||
def apply[F[_]](map: Map[Ident, JobTask[F]]): JobTaskRegistry[F] =
|
||||
new JobTaskRegistry[F](map)
|
||||
|
||||
def empty[F[_]]: JobTaskRegistry[F] = apply(Map.empty)
|
||||
|
||||
}
|
@ -0,0 +1,25 @@
|
||||
package docspell.joex.scheduler
|
||||
|
||||
import cats.implicits._
|
||||
import docspell.common._
|
||||
import cats.effect.Sync
|
||||
|
||||
case class LogEvent( jobId: Ident
|
||||
, jobInfo: String
|
||||
, time: Timestamp
|
||||
, level: LogLevel
|
||||
, msg: String
|
||||
, ex: Option[Throwable] = None) {
|
||||
|
||||
def logLine: String =
|
||||
s">>> ${time.asString} $level $jobInfo: $msg"
|
||||
|
||||
}
|
||||
|
||||
object LogEvent {
|
||||
|
||||
def create[F[_]: Sync](jobId: Ident, jobInfo: String, level: LogLevel, msg: String): F[LogEvent] =
|
||||
Timestamp.current[F].map(now => LogEvent(jobId, jobInfo, now, level, msg))
|
||||
|
||||
|
||||
}
|
@ -0,0 +1,59 @@
|
||||
package docspell.joex.scheduler
|
||||
|
||||
import cats.implicits._
|
||||
import cats.effect.{Concurrent, Sync}
|
||||
import fs2.{Pipe, Stream}
|
||||
import org.log4s.{LogLevel => _, _}
|
||||
import docspell.common._
|
||||
import docspell.common.syntax.all._
|
||||
import docspell.store.Store
|
||||
import docspell.store.records.RJobLog
|
||||
|
||||
trait LogSink[F[_]] {
|
||||
|
||||
def receive: Pipe[F, LogEvent, Unit]
|
||||
|
||||
}
|
||||
|
||||
object LogSink {
|
||||
private[this] val logger = getLogger
|
||||
|
||||
def apply[F[_]](sink: Pipe[F, LogEvent, Unit]): LogSink[F] =
|
||||
new LogSink[F] {
|
||||
val receive = sink
|
||||
}
|
||||
|
||||
def logInternal[F[_]: Sync](e: LogEvent): F[Unit] =
|
||||
e.level match {
|
||||
case LogLevel.Info =>
|
||||
logger.finfo(e.logLine)
|
||||
case LogLevel.Debug =>
|
||||
logger.fdebug(e.logLine)
|
||||
case LogLevel.Warn =>
|
||||
logger.fwarn(e.logLine)
|
||||
case LogLevel.Error =>
|
||||
e.ex match {
|
||||
case Some(exc) =>
|
||||
logger.ferror(exc)(e.logLine)
|
||||
case None =>
|
||||
logger.ferror(e.logLine)
|
||||
}
|
||||
}
|
||||
|
||||
def printer[F[_]: Sync]: LogSink[F] =
|
||||
LogSink(_.evalMap(e => logInternal(e)))
|
||||
|
||||
def db[F[_]: Sync](store: Store[F]): LogSink[F] =
|
||||
LogSink(_.evalMap(ev => for {
|
||||
id <- Ident.randomId[F]
|
||||
joblog = RJobLog(id, ev.jobId, ev.level, ev.time, ev.msg + ev.ex.map(th => ": "+ th.getMessage).getOrElse(""))
|
||||
_ <- logInternal(ev)
|
||||
_ <- store.transact(RJobLog.insert(joblog))
|
||||
} yield ()))
|
||||
|
||||
def dbAndLog[F[_]: Concurrent](store: Store[F]): LogSink[F] = {
|
||||
val s: Stream[F, Pipe[F, LogEvent, Unit]] =
|
||||
Stream.emits(Seq(printer[F].receive, db[F](store).receive))
|
||||
LogSink(Pipe.join(s))
|
||||
}
|
||||
}
|
@ -0,0 +1,49 @@
|
||||
package docspell.joex.scheduler
|
||||
|
||||
import cats.implicits._
|
||||
import cats.effect.{Concurrent, Sync}
|
||||
import docspell.common._
|
||||
import fs2.concurrent.Queue
|
||||
|
||||
trait Logger[F[_]] {
|
||||
|
||||
def trace(msg: => String): F[Unit]
|
||||
def debug(msg: => String): F[Unit]
|
||||
def info(msg: => String): F[Unit]
|
||||
def warn(msg: => String): F[Unit]
|
||||
def error(ex: Throwable)(msg: => String): F[Unit]
|
||||
def error(msg: => String): F[Unit]
|
||||
|
||||
}
|
||||
|
||||
object Logger {
|
||||
|
||||
def create[F[_]: Sync](jobId: Ident, jobInfo: String, q: Queue[F, LogEvent]): Logger[F] =
|
||||
new Logger[F] {
|
||||
def trace(msg: => String): F[Unit] =
|
||||
LogEvent.create[F](jobId, jobInfo, LogLevel.Debug, msg).flatMap(q.enqueue1)
|
||||
|
||||
def debug(msg: => String): F[Unit] =
|
||||
LogEvent.create[F](jobId, jobInfo, LogLevel.Debug, msg).flatMap(q.enqueue1)
|
||||
|
||||
def info(msg: => String): F[Unit] =
|
||||
LogEvent.create[F](jobId, jobInfo, LogLevel.Info, msg).flatMap(q.enqueue1)
|
||||
|
||||
def warn(msg: => String): F[Unit] =
|
||||
LogEvent.create[F](jobId, jobInfo, LogLevel.Warn, msg).flatMap(q.enqueue1)
|
||||
|
||||
def error(ex: Throwable)(msg: => String): F[Unit] =
|
||||
LogEvent.create[F](jobId, jobInfo, LogLevel.Error, msg).map(le => le.copy(ex = Some(ex))).flatMap(q.enqueue1)
|
||||
|
||||
def error(msg: => String): F[Unit] =
|
||||
LogEvent.create[F](jobId, jobInfo, LogLevel.Error, msg).flatMap(q.enqueue1)
|
||||
}
|
||||
|
||||
def apply[F[_]: Concurrent](jobId: Ident, jobInfo: String, bufferSize: Int, sink: LogSink[F]): F[Logger[F]] =
|
||||
for {
|
||||
q <- Queue.circularBuffer[F, LogEvent](bufferSize)
|
||||
log = create(jobId, jobInfo, q)
|
||||
fib <- Concurrent[F].start(q.dequeue.through(sink.receive).compile.drain)
|
||||
} yield log
|
||||
|
||||
}
|
@ -0,0 +1,33 @@
|
||||
package docspell.joex.scheduler
|
||||
|
||||
import cats.effect.{Fiber, Timer}
|
||||
import fs2.Stream
|
||||
import docspell.common.Ident
|
||||
import docspell.store.records.RJob
|
||||
|
||||
trait Scheduler[F[_]] {
|
||||
|
||||
def config: SchedulerConfig
|
||||
|
||||
def getRunning: F[Vector[RJob]]
|
||||
|
||||
def requestCancel(jobId: Ident): F[Boolean]
|
||||
|
||||
def notifyChange: F[Unit]
|
||||
|
||||
def start: Stream[F, Nothing]
|
||||
|
||||
/** Requests to shutdown the scheduler.
|
||||
*
|
||||
* The scheduler will not take any new jobs from the queue. If
|
||||
* there are still running jobs, it waits for them to complete.
|
||||
* when the cancelAll flag is set to true, it cancels all running
|
||||
* jobs.
|
||||
*
|
||||
* The returned F[Unit] can be evaluated to wait for all that to
|
||||
* complete.
|
||||
*/
|
||||
def shutdown(cancelAll: Boolean): F[Unit]
|
||||
|
||||
def periodicAwake(implicit T: Timer[F]): F[Fiber[F, Unit]]
|
||||
}
|
@ -0,0 +1,66 @@
|
||||
package docspell.joex.scheduler
|
||||
|
||||
import cats.implicits._
|
||||
import cats.effect.concurrent.Semaphore
|
||||
import cats.effect.{Blocker, ConcurrentEffect, ContextShift, Resource}
|
||||
import docspell.store.Store
|
||||
import docspell.store.queue.JobQueue
|
||||
import fs2.concurrent.SignallingRef
|
||||
|
||||
case class SchedulerBuilder[F[_]: ConcurrentEffect : ContextShift](
|
||||
config: SchedulerConfig
|
||||
, tasks: JobTaskRegistry[F]
|
||||
, store: Store[F]
|
||||
, blocker: Blocker
|
||||
, queue: Resource[F, JobQueue[F]]
|
||||
, logSink: LogSink[F]) {
|
||||
|
||||
def withConfig(cfg: SchedulerConfig): SchedulerBuilder[F] =
|
||||
copy(config = cfg)
|
||||
|
||||
def withTaskRegistry(reg: JobTaskRegistry[F]): SchedulerBuilder[F] =
|
||||
copy(tasks = reg)
|
||||
|
||||
def withTask[A](task: JobTask[F]): SchedulerBuilder[F] =
|
||||
withTaskRegistry(tasks.withTask(task))
|
||||
|
||||
def withQueue(queue: Resource[F, JobQueue[F]]): SchedulerBuilder[F] =
|
||||
SchedulerBuilder[F](config, tasks, store, blocker, queue, logSink)
|
||||
|
||||
def withBlocker(blocker: Blocker): SchedulerBuilder[F] =
|
||||
copy(blocker = blocker)
|
||||
|
||||
def withLogSink(sink: LogSink[F]): SchedulerBuilder[F] =
|
||||
copy(logSink = sink)
|
||||
|
||||
|
||||
def serve: Resource[F, Scheduler[F]] =
|
||||
resource.evalMap(sch => ConcurrentEffect[F].start(sch.start.compile.drain).map(_ => sch))
|
||||
|
||||
def resource: Resource[F, Scheduler[F]] = {
|
||||
val scheduler = for {
|
||||
jq <- queue
|
||||
waiter <- Resource.liftF(SignallingRef(true))
|
||||
state <- Resource.liftF(SignallingRef(SchedulerImpl.emptyState[F]))
|
||||
perms <- Resource.liftF(Semaphore(config.poolSize.toLong))
|
||||
} yield new SchedulerImpl[F](config, blocker, jq, tasks, store, logSink, state, waiter, perms)
|
||||
|
||||
scheduler.evalTap(_.init).
|
||||
map(s => s: Scheduler[F])
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
object SchedulerBuilder {
|
||||
|
||||
def apply[F[_]: ConcurrentEffect : ContextShift]( config: SchedulerConfig
|
||||
, blocker: Blocker
|
||||
, store: Store[F]): SchedulerBuilder[F] =
|
||||
new SchedulerBuilder[F](config
|
||||
, JobTaskRegistry.empty[F]
|
||||
, store
|
||||
, blocker
|
||||
, JobQueue(store)
|
||||
, LogSink.db[F](store))
|
||||
|
||||
}
|
@ -0,0 +1,25 @@
|
||||
package docspell.joex.scheduler
|
||||
|
||||
import docspell.common._
|
||||
|
||||
case class SchedulerConfig( name: Ident
|
||||
, poolSize: Int
|
||||
, countingScheme: CountingScheme
|
||||
, retries: Int
|
||||
, retryDelay: Duration
|
||||
, logBufferSize: Int
|
||||
, wakeupPeriod: Duration
|
||||
)
|
||||
|
||||
object SchedulerConfig {
|
||||
|
||||
val default = SchedulerConfig(
|
||||
name = Ident.unsafe("default-scheduler")
|
||||
, poolSize = 2 // math.max(2, Runtime.getRuntime.availableProcessors / 2)
|
||||
, countingScheme = CountingScheme(2, 1)
|
||||
, retries = 5
|
||||
, retryDelay = Duration.seconds(30)
|
||||
, logBufferSize = 500
|
||||
, wakeupPeriod = Duration.minutes(10)
|
||||
)
|
||||
}
|
@ -0,0 +1,227 @@
|
||||
package docspell.joex.scheduler
|
||||
|
||||
import fs2.Stream
|
||||
import cats.implicits._
|
||||
import cats.effect.concurrent.Semaphore
|
||||
import docspell.common._
|
||||
import docspell.common.syntax.all._
|
||||
import docspell.store.queue.JobQueue
|
||||
import docspell.store.records.RJob
|
||||
import fs2.concurrent.SignallingRef
|
||||
import cats.effect._
|
||||
import org.log4s._
|
||||
import SchedulerImpl._
|
||||
import docspell.store.Store
|
||||
import docspell.store.queries.QJob
|
||||
|
||||
final class SchedulerImpl[F[_]: ConcurrentEffect : ContextShift](val config: SchedulerConfig
|
||||
, blocker: Blocker
|
||||
, queue: JobQueue[F]
|
||||
, tasks: JobTaskRegistry[F]
|
||||
, store: Store[F]
|
||||
, logSink: LogSink[F]
|
||||
, state: SignallingRef[F, State[F]]
|
||||
, waiter: SignallingRef[F, Boolean]
|
||||
, permits: Semaphore[F]) extends Scheduler[F] {
|
||||
|
||||
private [this] val logger = getLogger
|
||||
|
||||
/**
|
||||
* On startup, get all jobs in state running from this scheduler
|
||||
* and put them into waiting state, so they get picked up again.
|
||||
*/
|
||||
def init: F[Unit] =
|
||||
QJob.runningToWaiting(config.name, store)
|
||||
|
||||
def periodicAwake(implicit T: Timer[F]): F[Fiber[F, Unit]] =
|
||||
ConcurrentEffect[F].start(Stream.awakeEvery[F](config.wakeupPeriod.toScala).
|
||||
evalMap(_ => logger.fdebug("Periodic awake reached") *> notifyChange).compile.drain)
|
||||
|
||||
def getRunning: F[Vector[RJob]] =
|
||||
state.get.flatMap(s => QJob.findAll(s.getRunning, store))
|
||||
|
||||
def requestCancel(jobId: Ident): F[Boolean] =
|
||||
state.get.flatMap(_.cancelRequest(jobId) match {
|
||||
case Some(ct) => ct.map(_ => true)
|
||||
case None => logger.fwarn(s"Job ${jobId.id} not found, cannot cancel.").map(_ => false)
|
||||
})
|
||||
|
||||
def notifyChange: F[Unit] =
|
||||
waiter.update(b => !b)
|
||||
|
||||
def shutdown(cancelAll: Boolean): F[Unit] = {
|
||||
val doCancel =
|
||||
state.get.
|
||||
flatMap(_.cancelTokens.values.toList.traverse(identity)).
|
||||
map(_ => ())
|
||||
|
||||
val runShutdown =
|
||||
state.modify(_.requestShutdown) *> (if (cancelAll) doCancel else ().pure[F])
|
||||
|
||||
val wait = Stream.eval(runShutdown).
|
||||
evalMap(_ => logger.finfo("Scheduler is shutting down now.")).
|
||||
flatMap(_ => Stream.eval(state.get) ++ Stream.suspend(state.discrete.takeWhile(_.getRunning.nonEmpty))).
|
||||
flatMap(state => {
|
||||
if (state.getRunning.isEmpty) Stream.eval(logger.finfo("No jobs running."))
|
||||
else Stream.eval(logger.finfo(s"Waiting for ${state.getRunning.size} jobs to finish.")) ++
|
||||
Stream.emit(state)
|
||||
})
|
||||
|
||||
(wait.drain ++ Stream.emit(())).compile.lastOrError
|
||||
}
|
||||
|
||||
def start: Stream[F, Nothing] =
|
||||
logger.sinfo("Starting scheduler") ++
|
||||
mainLoop
|
||||
|
||||
def mainLoop: Stream[F, Nothing] = {
|
||||
val body: F[Boolean] =
|
||||
for {
|
||||
_ <- permits.available.flatMap(a => logger.fdebug(s"Try to acquire permit ($a free)"))
|
||||
_ <- permits.acquire
|
||||
_ <- logger.fdebug("New permit acquired")
|
||||
down <- state.get.map(_.shutdownRequest)
|
||||
rjob <- if (down) logger.finfo("") *> permits.release *> (None: Option[RJob]).pure[F]
|
||||
else queue.nextJob(group => state.modify(_.nextPrio(group, config.countingScheme)), config.name, config.retryDelay)
|
||||
_ <- logger.fdebug(s"Next job found: ${rjob.map(_.info)}")
|
||||
_ <- rjob.map(execute).getOrElse(permits.release)
|
||||
} yield rjob.isDefined
|
||||
|
||||
Stream.eval(state.get.map(_.shutdownRequest)).
|
||||
evalTap(if (_) logger.finfo[F]("Stopping main loop due to shutdown request.") else ().pure[F]).
|
||||
flatMap(if (_) Stream.empty else Stream.eval(body)).
|
||||
flatMap({
|
||||
case true =>
|
||||
mainLoop
|
||||
case false =>
|
||||
logger.sdebug(s"Waiting for notify") ++
|
||||
waiter.discrete.take(2).drain ++
|
||||
logger.sdebug(s"Notify signal, going into main loop") ++
|
||||
mainLoop
|
||||
})
|
||||
}
|
||||
|
||||
def execute(job: RJob): F[Unit] = {
|
||||
val task = for {
|
||||
jobtask <- tasks.find(job.task).toRight(s"This executor cannot run tasks with name: ${job.task}")
|
||||
} yield jobtask
|
||||
|
||||
task match {
|
||||
case Left(err) =>
|
||||
logger.ferror(s"Unable to start a task for job ${job.info}: $err")
|
||||
case Right(t) =>
|
||||
for {
|
||||
_ <- logger.fdebug(s"Creating context for job ${job.info} to run $t")
|
||||
ctx <- Context[F, String](job, job.args, config, logSink, blocker, store)
|
||||
jot = wrapTask(job, t.task, ctx)
|
||||
tok <- forkRun(job, jot.run(ctx), t.onCancel.run(ctx), ctx)
|
||||
_ <- state.modify(_.addRunning(job, tok))
|
||||
} yield ()
|
||||
}
|
||||
}
|
||||
|
||||
def onFinish(job: RJob, finalState: JobState): F[Unit] =
|
||||
for {
|
||||
_ <- logger.fdebug(s"Job ${job.info} done $finalState. Releasing resources.")
|
||||
_ <- permits.release *> permits.available.flatMap(a => logger.fdebug(s"Permit released ($a free)"))
|
||||
_ <- state.modify(_.removeRunning(job))
|
||||
_ <- QJob.setFinalState(job.id, finalState, store)
|
||||
} yield ()
|
||||
|
||||
def onStart(job: RJob): F[Unit] =
|
||||
QJob.setRunning(job.id, config.name, store) //also increments retries if current state=stuck
|
||||
|
||||
def wrapTask(job: RJob, task: Task[F, String, Unit], ctx: Context[F, String]): Task[F, String, Unit] = {
|
||||
task.mapF(fa => onStart(job) *> logger.fdebug("Starting task now") *> blocker.blockOn(fa)).
|
||||
mapF(_.attempt.flatMap({
|
||||
case Right(()) =>
|
||||
logger.info(s"Job execution successful: ${job.info}")
|
||||
ctx.logger.info("Job execution successful") *>
|
||||
(JobState.Success: JobState).pure[F]
|
||||
case Left(ex) =>
|
||||
state.get.map(_.wasCancelled(job)).flatMap {
|
||||
case true =>
|
||||
logger.error(ex)(s"Job ${job.info} execution failed (cancel = true)")
|
||||
ctx.logger.error(ex)("Job execution failed (cancel = true)") *>
|
||||
(JobState.Cancelled: JobState).pure[F]
|
||||
case false =>
|
||||
QJob.exceedsRetries(job.id, config.retries, store).flatMap {
|
||||
case true =>
|
||||
logger.error(ex)(s"Job ${job.info} execution failed. Retries exceeded.")
|
||||
ctx.logger.error(ex)(s"Job ${job.info} execution failed. Retries exceeded.").
|
||||
map(_ => JobState.Failed: JobState)
|
||||
case false =>
|
||||
logger.error(ex)(s"Job ${job.info} execution failed. Retrying later.")
|
||||
ctx.logger.error(ex)(s"Job ${job.info} execution failed. Retrying later.").
|
||||
map(_ => JobState.Stuck: JobState)
|
||||
}
|
||||
}
|
||||
})).
|
||||
mapF(_.attempt.flatMap {
|
||||
case Right(jstate) =>
|
||||
onFinish(job, jstate)
|
||||
case Left(ex) =>
|
||||
logger.error(ex)(s"Error happened during post-processing of ${job.info}!")
|
||||
// we don't know the real outcome here…
|
||||
// since tasks should be idempotent, set it to stuck. if above has failed, this might fail anyways
|
||||
onFinish(job, JobState.Stuck)
|
||||
})
|
||||
}
|
||||
|
||||
def forkRun(job: RJob, code: F[Unit], onCancel: F[Unit], ctx: Context[F, String]): F[F[Unit]] = {
|
||||
val bfa = blocker.blockOn(code)
|
||||
logger.fdebug(s"Forking job ${job.info}") *>
|
||||
ConcurrentEffect[F].start(bfa).
|
||||
map(fiber =>
|
||||
logger.fdebug(s"Cancelling job ${job.info}") *>
|
||||
fiber.cancel *>
|
||||
onCancel.attempt.map({
|
||||
case Right(_) => ()
|
||||
case Left(ex) =>
|
||||
logger.error(ex)(s"Task's cancelling code failed. Job ${job.info}.")
|
||||
()
|
||||
}) *>
|
||||
state.modify(_.markCancelled(job)) *>
|
||||
onFinish(job, JobState.Cancelled) *>
|
||||
ctx.logger.warn("Job has been cancelled.") *>
|
||||
logger.fdebug(s"Job ${job.info} has been cancelled."))
|
||||
}
|
||||
}
|
||||
|
||||
object SchedulerImpl {
|
||||
|
||||
def emptyState[F[_]]: State[F] =
|
||||
State(Map.empty, Set.empty, Map.empty, false)
|
||||
|
||||
case class State[F[_]]( counters: Map[Ident, CountingScheme]
|
||||
, cancelled: Set[Ident]
|
||||
, cancelTokens: Map[Ident, CancelToken[F]]
|
||||
, shutdownRequest: Boolean) {
|
||||
|
||||
def nextPrio(group: Ident, initial: CountingScheme): (State[F], Priority) = {
|
||||
val (cs, prio) = counters.getOrElse(group, initial).nextPriority
|
||||
(copy(counters = counters.updated(group, cs)), prio)
|
||||
}
|
||||
|
||||
def addRunning(job: RJob, token: CancelToken[F]): (State[F], Unit) =
|
||||
(State(counters, cancelled, cancelTokens.updated(job.id, token), shutdownRequest), ())
|
||||
|
||||
def removeRunning(job: RJob): (State[F], Unit) =
|
||||
(copy(cancelled = cancelled - job.id, cancelTokens = cancelTokens.removed(job.id)), ())
|
||||
|
||||
def markCancelled(job: RJob): (State[F], Unit) =
|
||||
(copy(cancelled = cancelled + job.id), ())
|
||||
|
||||
def wasCancelled(job: RJob): Boolean =
|
||||
cancelled.contains(job.id)
|
||||
|
||||
def cancelRequest(id: Ident): Option[F[Unit]] =
|
||||
cancelTokens.get(id)
|
||||
|
||||
def getRunning: Seq[Ident] =
|
||||
cancelTokens.keys.toSeq
|
||||
|
||||
def requestShutdown: (State[F], Unit) =
|
||||
(copy(shutdownRequest = true), ())
|
||||
}
|
||||
}
|
@ -0,0 +1,55 @@
|
||||
package docspell.joex.scheduler
|
||||
|
||||
import cats.implicits._
|
||||
import cats.{Applicative, ApplicativeError, FlatMap, Functor}
|
||||
import cats.data.Kleisli
|
||||
import cats.effect.Sync
|
||||
|
||||
/**
|
||||
* The code that is executed by the scheduler
|
||||
*/
|
||||
trait Task[F[_], A, B] {
|
||||
|
||||
def run(ctx: Context[F, A]): F[B]
|
||||
|
||||
def map[C](f: B => C)(implicit F: Functor[F]): Task[F, A, C] =
|
||||
Task(Task.toKleisli(this).map(f))
|
||||
|
||||
def flatMap[C](f: B => Task[F, A, C])(implicit F: FlatMap[F]): Task[F, A, C] =
|
||||
Task(Task.toKleisli(this).flatMap(a => Task.toKleisli(f(a))))
|
||||
|
||||
def andThen[C](f: B => F[C])(implicit F: FlatMap[F]): Task[F, A, C] =
|
||||
Task(Task.toKleisli(this).andThen(f))
|
||||
|
||||
def mapF[C](f: F[B] => F[C]): Task[F, A, C] =
|
||||
Task(Task.toKleisli(this).mapF(f))
|
||||
|
||||
def attempt(implicit F: ApplicativeError[F,Throwable]): Task[F, A, Either[Throwable, B]] =
|
||||
mapF(_.attempt)
|
||||
|
||||
def contramap[C](f: C => F[A])(implicit F: FlatMap[F]): Task[F, C, B] = {
|
||||
ctxc: Context[F, C] => f(ctxc.args).flatMap(a => run(ctxc.map(_ => a)))
|
||||
}
|
||||
}
|
||||
|
||||
object Task {
|
||||
|
||||
def pure[F[_]: Applicative, A, B](b: B): Task[F, A, B] =
|
||||
Task(_ => b.pure[F])
|
||||
|
||||
def of[F[_], A, B](b: F[B]): Task[F, A, B] =
|
||||
Task(_ => b)
|
||||
|
||||
def apply[F[_], A, B](f: Context[F, A] => F[B]): Task[F, A, B] =
|
||||
(ctx: Context[F, A]) => f(ctx)
|
||||
|
||||
def apply[F[_], A, B](k: Kleisli[F, Context[F, A], B]): Task[F, A, B] =
|
||||
c => k.run(c)
|
||||
|
||||
|
||||
def toKleisli[F[_], A, B](t: Task[F, A, B]): Kleisli[F, Context[F, A], B] =
|
||||
Kleisli(t.run)
|
||||
|
||||
def setProgress[F[_]: Sync, A, B](n: Int)(data: B): Task[F, A, B] =
|
||||
Task(_.setProgress(n).map(_ => data))
|
||||
}
|
@ -0,0 +1,15 @@
|
||||
package docspell.joex.scheduler
|
||||
|
||||
import docspell.common.Priority
|
||||
import minitest.SimpleTestSuite
|
||||
|
||||
object CountingSchemeSpec extends SimpleTestSuite {
|
||||
|
||||
test("counting") {
|
||||
val cs = CountingScheme(2,1)
|
||||
val list = List.iterate(cs.nextPriority, 6)(_._1.nextPriority).map(_._2)
|
||||
val expect = List(Priority.High, Priority.High, Priority.Low)
|
||||
assertEquals(list, expect ++ expect)
|
||||
}
|
||||
|
||||
}
|
Reference in New Issue
Block a user