Add a folder-id to item processing

This allows to define a folder when uploading files. All generated
items are associated to this folder on creation.
This commit is contained in:
Eike Kettner
2020-07-14 21:25:44 +02:00
parent ec7f027b4e
commit 5b01c93711
14 changed files with 126 additions and 24 deletions

View File

@ -84,6 +84,7 @@ object JoexAppImpl {
joex <- OJoex(client, store)
upload <- OUpload(store, queue, cfg.files, joex)
fts <- createFtsClient(cfg)(httpClient)
itemOps <- OItem(store, fts)
javaEmil =
JavaMailEmil(blocker, Settings.defaultSettings.copy(debug = cfg.mailDebug))
sch <- SchedulerBuilder(cfg.scheduler, blocker, store)
@ -91,7 +92,7 @@ object JoexAppImpl {
.withTask(
JobTask.json(
ProcessItemArgs.taskName,
ItemHandler.newItem[F](cfg, fts),
ItemHandler.newItem[F](cfg, itemOps, fts),
ItemHandler.onCancel[F]
)
)

View File

@ -5,6 +5,7 @@ import cats.effect._
import cats.implicits._
import fs2.Stream
import docspell.backend.ops.OItem
import docspell.common.{ItemState, ProcessItemArgs}
import docspell.ftsclient.FtsClient
import docspell.joex.Config
@ -27,11 +28,12 @@ object ItemHandler {
def newItem[F[_]: ConcurrentEffect: ContextShift](
cfg: Config,
itemOps: OItem[F],
fts: FtsClient[F]
): Task[F, Args, Unit] =
CreateItem[F]
.flatMap(itemStateTask(ItemState.Processing))
.flatMap(safeProcess[F](cfg, fts))
.flatMap(safeProcess[F](cfg, itemOps, fts))
.map(_ => ())
def itemStateTask[F[_]: Sync, A](
@ -48,11 +50,12 @@ object ItemHandler {
def safeProcess[F[_]: ConcurrentEffect: ContextShift](
cfg: Config,
itemOps: OItem[F],
fts: FtsClient[F]
)(data: ItemData): Task[F, Args, ItemData] =
isLastRetry[F].flatMap {
case true =>
ProcessItem[F](cfg, fts)(data).attempt.flatMap({
ProcessItem[F](cfg, itemOps, fts)(data).attempt.flatMap({
case Right(d) =>
Task.pure(d)
case Left(ex) =>
@ -62,7 +65,7 @@ object ItemHandler {
.andThen(_ => Sync[F].raiseError(ex))
})
case false =>
ProcessItem[F](cfg, fts)(data).flatMap(itemStateTask(ItemState.Created))
ProcessItem[F](cfg, itemOps, fts)(data).flatMap(itemStateTask(ItemState.Created))
}
private def markItemCreated[F[_]: Sync]: Task[F, Args, Boolean] =

View File

@ -2,6 +2,7 @@ package docspell.joex.process
import cats.effect._
import docspell.backend.ops.OItem
import docspell.common.ProcessItemArgs
import docspell.ftsclient.FtsClient
import docspell.joex.Config
@ -11,6 +12,7 @@ object ProcessItem {
def apply[F[_]: ConcurrentEffect: ContextShift](
cfg: Config,
itemOps: OItem[F],
fts: FtsClient[F]
)(item: ItemData): Task[F, ProcessItemArgs, ItemData] =
ExtractArchive(item)
@ -22,6 +24,7 @@ object ProcessItem {
.flatMap(analysisOnly[F](cfg))
.flatMap(Task.setProgress(80))
.flatMap(LinkProposal[F])
.flatMap(SetGivenData[F](itemOps))
.flatMap(Task.setProgress(99))
def analysisOnly[F[_]: Sync](

View File

@ -0,0 +1,35 @@
package docspell.joex.process
import cats.effect._
import cats.implicits._
import docspell.backend.ops.OItem
import docspell.common._
import docspell.joex.scheduler.Task
object SetGivenData {
def apply[F[_]: Sync](
ops: OItem[F]
)(data: ItemData): Task[F, ProcessItemArgs, ItemData] =
if (data.item.state.isValid)
Task
.log[F, ProcessItemArgs](_.debug(s"Not setting data on existing item"))
.map(_ => data)
else
Task { ctx =>
val itemId = data.item.id
val folderId = ctx.args.meta.folderId
val collective = ctx.args.meta.collective
for {
_ <- ctx.logger.info("Starting setting given data")
_ <- ctx.logger.debug(s"Set item folder: '${folderId.map(_.id)}'")
e <- ops.setFolder(itemId, folderId, collective).attempt
_ <- e.fold(
ex => ctx.logger.warn(s"Error setting folder: ${ex.getMessage}"),
_ => ().pure[F]
)
} yield data
}
}

View File

@ -143,7 +143,7 @@ object ScanMailboxTask {
folder <- requireFolder(a)(name)
search <- searchMails(a)(folder)
headers <- Kleisli.liftF(filterMessageIds(search.mails))
_ <- headers.traverse(handleOne(a, upload))
_ <- headers.traverse(handleOne(ctx.args, a, upload))
} yield ScanResult(name, search.mails.size, search.count - search.mails.size)
def requireFolder[C](a: Access[F, C])(name: String): MailOp[F, C, MailFolder] =
@ -239,7 +239,9 @@ object ScanMailboxTask {
MailOp.pure(())
}
def submitMail(upload: OUpload[F])(mail: Mail[F]): F[OUpload.UploadResult] = {
def submitMail(upload: OUpload[F], args: Args)(
mail: Mail[F]
): F[OUpload.UploadResult] = {
val file = OUpload.File(
Some(mail.header.subject + ".eml"),
Some(MimeType.emls.head),
@ -251,6 +253,7 @@ object ScanMailboxTask {
meta = OUpload.UploadMeta(
Some(dir),
s"mailbox-${ctx.args.account.user.id}",
args.itemFolder,
Seq.empty
)
data = OUpload.UploadData(
@ -264,14 +267,14 @@ object ScanMailboxTask {
} yield res
}
def handleOne[C](a: Access[F, C], upload: OUpload[F])(
def handleOne[C](args: Args, a: Access[F, C], upload: OUpload[F])(
mh: MailHeader
): MailOp[F, C, Unit] =
for {
mail <- a.loadMail(mh)
res <- mail match {
case Some(m) =>
Kleisli.liftF(submitMail(upload)(m).attempt)
Kleisli.liftF(submitMail(upload, args)(m).attempt)
case None =>
MailOp.pure[F, C, Either[Throwable, OUpload.UploadResult]](
Either.left(new Exception(s"Mail not found"))