mirror of
https://github.com/TheAnachronism/docspell.git
synced 2025-06-21 18:08:25 +00:00
sbt scalafmtAll
This commit is contained in:
@ -7,13 +7,13 @@ import docspell.convert.ConvertConfig
|
||||
import docspell.extract.ExtractConfig
|
||||
|
||||
case class Config(
|
||||
appId: Ident,
|
||||
baseUrl: LenientUri,
|
||||
bind: Config.Bind,
|
||||
jdbc: JdbcConfig,
|
||||
scheduler: SchedulerConfig,
|
||||
extraction: ExtractConfig,
|
||||
convert: ConvertConfig
|
||||
appId: Ident,
|
||||
baseUrl: LenientUri,
|
||||
bind: Config.Bind,
|
||||
jdbc: JdbcConfig,
|
||||
scheduler: SchedulerConfig,
|
||||
extraction: ExtractConfig,
|
||||
convert: ConvertConfig
|
||||
)
|
||||
|
||||
object Config {
|
||||
|
@ -52,15 +52,15 @@ object JoexAppImpl {
|
||||
store <- Store.create(cfg.jdbc, connectEC, blocker)
|
||||
nodeOps <- ONode(store)
|
||||
sch <- SchedulerBuilder(cfg.scheduler, blocker, store)
|
||||
.withTask(
|
||||
JobTask.json(
|
||||
ProcessItemArgs.taskName,
|
||||
ItemHandler[F](cfg),
|
||||
ItemHandler.onCancel[F]
|
||||
)
|
||||
)
|
||||
.resource
|
||||
app = new JoexAppImpl(cfg, nodeOps, store, termSignal, sch)
|
||||
.withTask(
|
||||
JobTask.json(
|
||||
ProcessItemArgs.taskName,
|
||||
ItemHandler[F](cfg),
|
||||
ItemHandler.onCancel[F]
|
||||
)
|
||||
)
|
||||
.resource
|
||||
app = new JoexAppImpl(cfg, nodeOps, store, termSignal, sch)
|
||||
appR <- Resource.make(app.init.map(_ => app))(_.shutdown)
|
||||
} yield appR
|
||||
}
|
||||
|
@ -68,7 +68,9 @@ object ConvertPdf {
|
||||
.through(ctx.store.bitpeace.fetchData2(RangeDef.all))
|
||||
val handler = conversionHandler[F](ctx, cfg, ra, item)
|
||||
ctx.logger.info(s"Converting file ${ra.name} (${mime.asString}) into a PDF") *>
|
||||
conv.toPDF(DataType(MimeType(mime.primary, mime.sub)), ctx.args.meta.language, handler)(data)
|
||||
conv.toPDF(DataType(MimeType(mime.primary, mime.sub)), ctx.args.meta.language, handler)(
|
||||
data
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
@ -107,19 +109,21 @@ object ConvertPdf {
|
||||
})
|
||||
|
||||
private def storePDF[F[_]: Sync](
|
||||
ctx: Context[F, ProcessItemArgs],
|
||||
cfg: ConvertConfig,
|
||||
ra: RAttachment,
|
||||
pdf: Stream[F, Byte]
|
||||
) = {
|
||||
val hint = MimeTypeHint.advertised(MimeType.pdf).withName(ra.name.getOrElse("file.pdf"))
|
||||
ctx: Context[F, ProcessItemArgs],
|
||||
cfg: ConvertConfig,
|
||||
ra: RAttachment,
|
||||
pdf: Stream[F, Byte]
|
||||
) = {
|
||||
val hint = MimeTypeHint.advertised(MimeType.pdf).withName(ra.name.getOrElse("file.pdf"))
|
||||
val newName = ra.name.map(n => s"$n.pdf")
|
||||
ctx.store.bitpeace
|
||||
.saveNew(pdf, cfg.chunkSize, MimetypeHint(hint.filename, hint.advertised))
|
||||
.compile
|
||||
.lastOrError
|
||||
.map(fm => Ident.unsafe(fm.id))
|
||||
.flatMap(fmId => ctx.store.transact(RAttachment.updateFileIdAndName(ra.id, fmId, newName)).map(_ => fmId))
|
||||
.flatMap(fmId =>
|
||||
ctx.store.transact(RAttachment.updateFileIdAndName(ra.id, fmId, newName)).map(_ => fmId)
|
||||
)
|
||||
.map(fmId => ra.copy(fileId = fmId, name = newName))
|
||||
}
|
||||
}
|
||||
|
@ -25,7 +25,7 @@ object CreateItem {
|
||||
Task { ctx =>
|
||||
def isValidFile(fm: FileMeta) =
|
||||
ctx.args.meta.validFileTypes.isEmpty ||
|
||||
ctx.args.meta.validFileTypes.map(_.asString).toSet.contains(fm.mimetype.baseType)
|
||||
ctx.args.meta.validFileTypes.map(_.asString).toSet.contains(fm.mimetype.baseType)
|
||||
|
||||
def fileMetas(itemId: Ident, now: Timestamp) =
|
||||
Stream
|
||||
@ -77,18 +77,18 @@ object CreateItem {
|
||||
for {
|
||||
cand <- ctx.store.transact(QItem.findByFileIds(ctx.args.files.map(_.fileMetaId)))
|
||||
_ <- if (cand.nonEmpty) ctx.logger.warn("Found existing item with these files.")
|
||||
else ().pure[F]
|
||||
else ().pure[F]
|
||||
ht <- cand.drop(1).traverse(ri => QItem.delete(ctx.store)(ri.id, ri.cid))
|
||||
_ <- if (ht.sum > 0) ctx.logger.warn(s"Removed ${ht.sum} items with same attachments")
|
||||
else ().pure[F]
|
||||
else ().pure[F]
|
||||
rms <- OptionT(
|
||||
cand.headOption.traverse(ri =>
|
||||
ctx.store.transact(RAttachment.findByItemAndCollective(ri.id, ri.cid))
|
||||
)
|
||||
).getOrElse(Vector.empty)
|
||||
cand.headOption.traverse(ri =>
|
||||
ctx.store.transact(RAttachment.findByItemAndCollective(ri.id, ri.cid))
|
||||
)
|
||||
).getOrElse(Vector.empty)
|
||||
orig <- rms.traverse(a =>
|
||||
ctx.store.transact(RAttachmentSource.findById(a.id)).map(s => (a, s))
|
||||
)
|
||||
ctx.store.transact(RAttachmentSource.findById(a.id)).map(s => (a, s))
|
||||
)
|
||||
origMap = orig
|
||||
.map(originFileTuple)
|
||||
.toMap
|
||||
|
@ -95,10 +95,10 @@ object FindProposal {
|
||||
labels => self.find(labels).map(f)
|
||||
|
||||
def next(f: Finder[F])(implicit F: FlatMap[F], F3: Applicative[F]): Finder[F] =
|
||||
flatMap({ ml0 =>
|
||||
flatMap { ml0 =>
|
||||
if (ml0.hasResultsAll) Finder.unit[F](ml0)
|
||||
else f.map(ml1 => ml0.fillEmptyFrom(ml1))
|
||||
})
|
||||
}
|
||||
|
||||
def nextWhenEmpty(f: Finder[F], mt0: MetaProposalType, mts: MetaProposalType*)(
|
||||
implicit F: FlatMap[F],
|
||||
|
@ -19,14 +19,12 @@ object ItemHandler {
|
||||
.map(_ => ())
|
||||
|
||||
def itemStateTask[F[_]: Sync, A](state: ItemState)(data: ItemData): Task[F, A, ItemData] =
|
||||
Task { ctx =>
|
||||
ctx.store.transact(RItem.updateState(data.item.id, state)).map(_ => data)
|
||||
}
|
||||
Task(ctx => ctx.store.transact(RItem.updateState(data.item.id, state)).map(_ => data))
|
||||
|
||||
def isLastRetry[F[_]: Sync, A](ctx: Context[F, A]): F[Boolean] =
|
||||
for {
|
||||
current <- ctx.store.transact(RJob.getRetries(ctx.jobId))
|
||||
last = ctx.config.retries == current.getOrElse(0)
|
||||
last = ctx.config.retries == current.getOrElse(0)
|
||||
} yield last
|
||||
|
||||
def safeProcess[F[_]: Sync: ContextShift](
|
||||
|
@ -11,9 +11,7 @@ object TestTasks {
|
||||
private[this] val logger = getLogger
|
||||
|
||||
def success[F[_]]: Task[F, ProcessItemArgs, Unit] =
|
||||
Task { ctx =>
|
||||
ctx.logger.info(s"Running task now: ${ctx.args}")
|
||||
}
|
||||
Task(ctx => ctx.logger.info(s"Running task now: ${ctx.args}"))
|
||||
|
||||
def failing[F[_]: Sync]: Task[F, ProcessItemArgs, Unit] =
|
||||
Task { ctx =>
|
||||
|
@ -20,8 +20,8 @@ object TextAnalysis {
|
||||
t <- item.metas.toList.traverse(annotateAttachment[F](ctx.args.meta.language))
|
||||
_ <- ctx.logger.debug(s"Storing tags: ${t.map(_._1.copy(content = None))}")
|
||||
_ <- t.traverse(m =>
|
||||
ctx.store.transact(RAttachmentMeta.updateLabels(m._1.id, m._1.nerlabels))
|
||||
)
|
||||
ctx.store.transact(RAttachmentMeta.updateLabels(m._1.id, m._1.nerlabels))
|
||||
)
|
||||
e <- s
|
||||
_ <- ctx.logger.info(s"Text-Analysis finished in ${e.formatExact}")
|
||||
v = t.toVector
|
||||
|
@ -12,8 +12,8 @@ import docspell.store.records.{RAttachment, RAttachmentMeta, RFileMeta}
|
||||
object TextExtraction {
|
||||
|
||||
def apply[F[_]: Sync: ContextShift](
|
||||
cfg: ExtractConfig,
|
||||
item: ItemData
|
||||
cfg: ExtractConfig,
|
||||
item: ItemData
|
||||
): Task[F, ProcessItemArgs, ItemData] =
|
||||
Task { ctx =>
|
||||
for {
|
||||
@ -28,11 +28,11 @@ object TextExtraction {
|
||||
}
|
||||
|
||||
def extractTextIfEmpty[F[_]: Sync: ContextShift](
|
||||
ctx: Context[F, _],
|
||||
cfg: ExtractConfig,
|
||||
lang: Language,
|
||||
item: ItemData
|
||||
)(ra: RAttachment): F[RAttachmentMeta] = {
|
||||
ctx: Context[F, _],
|
||||
cfg: ExtractConfig,
|
||||
lang: Language,
|
||||
item: ItemData
|
||||
)(ra: RAttachment): F[RAttachmentMeta] = {
|
||||
val rm = item.findOrCreate(ra.id)
|
||||
rm.content match {
|
||||
case Some(_) =>
|
||||
@ -50,14 +50,14 @@ object TextExtraction {
|
||||
item: ItemData
|
||||
)(ra: RAttachment): F[RAttachmentMeta] =
|
||||
for {
|
||||
_ <- ctx.logger.debug(s"Extracting text for attachment ${stripAttachmentName(ra)}")
|
||||
dst <- Duration.stopTime[F]
|
||||
txt <- extractTextFallback(ctx, cfg, ra, lang)(filesToExtract(item, ra))
|
||||
_ <- ctx.logger.debug(s"Extracting text for attachment ${stripAttachmentName(ra)}")
|
||||
dst <- Duration.stopTime[F]
|
||||
txt <- extractTextFallback(ctx, cfg, ra, lang)(filesToExtract(item, ra))
|
||||
meta = item.changeMeta(ra.id, rm => rm.setContentIfEmpty(txt.map(_.trim).filter(_.nonEmpty)))
|
||||
est <- dst
|
||||
est <- dst
|
||||
_ <- ctx.logger.debug(
|
||||
s"Extracting text for attachment ${stripAttachmentName(ra)} finished in ${est.formatExact}"
|
||||
)
|
||||
s"Extracting text for attachment ${stripAttachmentName(ra)} finished in ${est.formatExact}"
|
||||
)
|
||||
} yield meta
|
||||
|
||||
def extractText[F[_]: Sync: ContextShift](
|
||||
@ -76,16 +76,15 @@ object TextExtraction {
|
||||
.getOrElse(Mimetype.`application/octet-stream`)
|
||||
|
||||
findMime
|
||||
.flatMap(mt =>
|
||||
extr.extractText(data, DataType(MimeType(mt.primary, mt.sub)), lang))
|
||||
.flatMap(mt => extr.extractText(data, DataType(MimeType(mt.primary, mt.sub)), lang))
|
||||
}
|
||||
|
||||
private def extractTextFallback[F[_]: Sync: ContextShift](
|
||||
ctx: Context[F, _],
|
||||
cfg: ExtractConfig,
|
||||
ra: RAttachment,
|
||||
lang: Language,
|
||||
)(fileIds: List[Ident]): F[Option[String]] = {
|
||||
ctx: Context[F, _],
|
||||
cfg: ExtractConfig,
|
||||
ra: RAttachment,
|
||||
lang: Language
|
||||
)(fileIds: List[Ident]): F[Option[String]] =
|
||||
fileIds match {
|
||||
case Nil =>
|
||||
ctx.logger.error(s"Cannot extract text").map(_ => None)
|
||||
@ -99,15 +98,18 @@ object TextExtraction {
|
||||
txt.some.pure[F]
|
||||
|
||||
case ExtractResult.UnsupportedFormat(mt) =>
|
||||
ctx.logger.warn(s"Cannot extract text from file ${stripAttachmentName(ra)}: unsupported format ${mt.asString}. Try with converted file.").
|
||||
flatMap(_ => extractTextFallback[F](ctx, cfg, ra, lang)(rest))
|
||||
ctx.logger
|
||||
.warn(
|
||||
s"Cannot extract text from file ${stripAttachmentName(ra)}: unsupported format ${mt.asString}. Try with converted file."
|
||||
)
|
||||
.flatMap(_ => extractTextFallback[F](ctx, cfg, ra, lang)(rest))
|
||||
|
||||
case ExtractResult.Failure(ex) =>
|
||||
ctx.logger.warn(s"Cannot extract text: ${ex.getMessage}. Try with converted file").
|
||||
flatMap(_ => extractTextFallback[F](ctx, cfg, ra, lang)(rest))
|
||||
ctx.logger
|
||||
.warn(s"Cannot extract text: ${ex.getMessage}. Try with converted file")
|
||||
.flatMap(_ => extractTextFallback[F](ctx, cfg, ra, lang)(rest))
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/** Returns the fileIds to extract text from. First, the source file
|
||||
* is tried. If that fails, the converted file is tried.
|
||||
@ -115,7 +117,7 @@ object TextExtraction {
|
||||
private def filesToExtract(item: ItemData, ra: RAttachment): List[Ident] =
|
||||
item.originFile.get(ra.id) match {
|
||||
case Some(sid) => List(sid, ra.fileId).distinct
|
||||
case None => List(ra.fileId)
|
||||
case None => List(ra.fileId)
|
||||
}
|
||||
|
||||
private def stripAttachmentName(ra: RAttachment): String =
|
||||
|
@ -25,15 +25,15 @@ object JoexRoutes {
|
||||
case GET -> Root / "running" =>
|
||||
for {
|
||||
jobs <- app.scheduler.getRunning
|
||||
jj = jobs.map(mkJob)
|
||||
jj = jobs.map(mkJob)
|
||||
resp <- Ok(JobList(jj.toList))
|
||||
} yield resp
|
||||
|
||||
case POST -> Root / "shutdownAndExit" =>
|
||||
for {
|
||||
_ <- ConcurrentEffect[F].start(
|
||||
Timer[F].sleep(Duration.seconds(1).toScala) *> app.initShutdown
|
||||
)
|
||||
Timer[F].sleep(Duration.seconds(1).toScala) *> app.initShutdown
|
||||
)
|
||||
resp <- Ok(BasicResult(true, "Shutdown initiated."))
|
||||
} yield resp
|
||||
|
||||
@ -41,8 +41,8 @@ object JoexRoutes {
|
||||
for {
|
||||
optJob <- app.scheduler.getRunning.map(_.find(_.id == id))
|
||||
optLog <- optJob.traverse(j => app.findLogs(j.id))
|
||||
jAndL = for { job <- optJob; log <- optLog } yield mkJobLog(job, log)
|
||||
resp <- jAndL.map(Ok(_)).getOrElse(NotFound(BasicResult(false, "Not found")))
|
||||
jAndL = for { job <- optJob; log <- optLog } yield mkJobLog(job, log)
|
||||
resp <- jAndL.map(Ok(_)).getOrElse(NotFound(BasicResult(false, "Not found")))
|
||||
} yield resp
|
||||
|
||||
case POST -> Root / "job" / Ident(id) / "cancel" =>
|
||||
|
@ -54,7 +54,7 @@ object Context {
|
||||
_ <- log.ftrace("Creating logger for task run")
|
||||
logger <- QueueLogger(job.id, job.info, config.logBufferSize, logSink)
|
||||
_ <- log.ftrace("Logger created, instantiating context")
|
||||
ctx = create[F, A](job, arg, config, logger, store, blocker)
|
||||
ctx = create[F, A](job, arg, config, logger, store, blocker)
|
||||
} yield ctx
|
||||
|
||||
final private class ContextImpl[F[_]: Functor, A](
|
||||
|
@ -38,9 +38,9 @@ object QueueLogger {
|
||||
sink: LogSink[F]
|
||||
): F[Logger[F]] =
|
||||
for {
|
||||
q <- Queue.circularBuffer[F, LogEvent](bufferSize)
|
||||
q <- Queue.circularBuffer[F, LogEvent](bufferSize)
|
||||
log = create(jobId, jobInfo, q)
|
||||
_ <- Concurrent[F].start(q.dequeue.through(sink.receive).compile.drain)
|
||||
_ <- Concurrent[F].start(q.dequeue.through(sink.receive).compile.drain)
|
||||
} yield log
|
||||
|
||||
}
|
||||
|
@ -91,12 +91,12 @@ final class SchedulerImpl[F[_]: ConcurrentEffect: ContextShift](
|
||||
_ <- logger.fdebug("New permit acquired")
|
||||
down <- state.get.map(_.shutdownRequest)
|
||||
rjob <- if (down) logger.finfo("") *> permits.release *> (None: Option[RJob]).pure[F]
|
||||
else
|
||||
queue.nextJob(
|
||||
group => state.modify(_.nextPrio(group, config.countingScheme)),
|
||||
config.name,
|
||||
config.retryDelay
|
||||
)
|
||||
else
|
||||
queue.nextJob(
|
||||
group => state.modify(_.nextPrio(group, config.countingScheme)),
|
||||
config.name,
|
||||
config.retryDelay
|
||||
)
|
||||
_ <- logger.fdebug(s"Next job found: ${rjob.map(_.info)}")
|
||||
_ <- rjob.map(execute).getOrElse(permits.release)
|
||||
} yield rjob.isDefined
|
||||
@ -122,8 +122,8 @@ final class SchedulerImpl[F[_]: ConcurrentEffect: ContextShift](
|
||||
def execute(job: RJob): F[Unit] = {
|
||||
val task = for {
|
||||
jobtask <- tasks
|
||||
.find(job.task)
|
||||
.toRight(s"This executor cannot run tasks with name: ${job.task}")
|
||||
.find(job.task)
|
||||
.toRight(s"This executor cannot run tasks with name: ${job.task}")
|
||||
} yield jobtask
|
||||
|
||||
task match {
|
||||
@ -144,8 +144,8 @@ final class SchedulerImpl[F[_]: ConcurrentEffect: ContextShift](
|
||||
for {
|
||||
_ <- logger.fdebug(s"Job ${job.info} done $finalState. Releasing resources.")
|
||||
_ <- permits.release *> permits.available.flatMap(a =>
|
||||
logger.fdebug(s"Permit released ($a free)")
|
||||
)
|
||||
logger.fdebug(s"Permit released ($a free)")
|
||||
)
|
||||
_ <- state.modify(_.removeRunning(job))
|
||||
_ <- QJob.setFinalState(job.id, finalState, store)
|
||||
} yield ()
|
||||
|
Reference in New Issue
Block a user