Allow to specify an item id to amend files to existing items

This commit is contained in:
Eike Kettner 2020-05-23 17:16:03 +02:00
parent 25d089da6c
commit f4949446e3
12 changed files with 128 additions and 62 deletions

View File

@ -1,8 +1,9 @@
package docspell.backend.ops package docspell.backend.ops
import bitpeace.MimetypeHint import bitpeace.MimetypeHint
import cats.implicits._ import cats.data.OptionT
import cats.effect._ import cats.effect._
import cats.implicits._
import docspell.backend.Config import docspell.backend.Config
import fs2.Stream import fs2.Stream
import docspell.common._ import docspell.common._
@ -17,13 +18,15 @@ trait OUpload[F[_]] {
def submit( def submit(
data: OUpload.UploadData[F], data: OUpload.UploadData[F],
account: AccountId, account: AccountId,
notifyJoex: Boolean notifyJoex: Boolean,
itemId: Option[Ident]
): F[OUpload.UploadResult] ): F[OUpload.UploadResult]
def submit( def submit(
data: OUpload.UploadData[F], data: OUpload.UploadData[F],
sourceId: Ident, sourceId: Ident,
notifyJoex: Boolean notifyJoex: Boolean,
itemId: Option[Ident]
): F[OUpload.UploadResult] ): F[OUpload.UploadResult]
} }
@ -68,7 +71,8 @@ object OUpload {
def submit( def submit(
data: OUpload.UploadData[F], data: OUpload.UploadData[F],
account: AccountId, account: AccountId,
notifyJoex: Boolean notifyJoex: Boolean,
itemId: Option[Ident]
): F[OUpload.UploadResult] = ): F[OUpload.UploadResult] =
for { for {
files <- data.files.traverse(saveFile).map(_.flatten) files <- data.files.traverse(saveFile).map(_.flatten)
@ -76,6 +80,7 @@ object OUpload {
lang <- store.transact(RCollective.findLanguage(account.collective)) lang <- store.transact(RCollective.findLanguage(account.collective))
meta = ProcessItemArgs.ProcessMeta( meta = ProcessItemArgs.ProcessMeta(
account.collective, account.collective,
itemId,
lang.getOrElse(Language.German), lang.getOrElse(Language.German),
data.meta.direction, data.meta.direction,
data.meta.sourceAbbrev, data.meta.sourceAbbrev,
@ -95,18 +100,18 @@ object OUpload {
def submit( def submit(
data: OUpload.UploadData[F], data: OUpload.UploadData[F],
sourceId: Ident, sourceId: Ident,
notifyJoex: Boolean notifyJoex: Boolean,
itemId: Option[Ident]
): F[OUpload.UploadResult] = ): F[OUpload.UploadResult] =
for { (for {
sOpt <- src <- OptionT(store.transact(RSource.find(sourceId)))
store updata = data.copy(
.transact(RSource.find(sourceId)) meta = data.meta.copy(sourceAbbrev = src.abbrev),
.map(_.toRight(UploadResult.NoSource)) priority = src.priority
abbrev = sOpt.map(_.abbrev).toOption.getOrElse(data.meta.sourceAbbrev) )
updata = data.copy(meta = data.meta.copy(sourceAbbrev = abbrev)) accId = AccountId(src.cid, src.sid)
accId = sOpt.map(source => AccountId(source.cid, source.sid)) result <- OptionT.liftF(submit(updata, accId, notifyJoex, itemId))
result <- accId.traverse(acc => submit(updata, acc, notifyJoex)) } yield result).getOrElse(UploadResult.NoSource)
} yield result.fold(identity, identity)
private def submitJobs( private def submitJobs(
notifyJoex: Boolean notifyJoex: Boolean

View File

@ -4,6 +4,14 @@ import io.circe._, io.circe.generic.semiauto._
import docspell.common.syntax.all._ import docspell.common.syntax.all._
import ProcessItemArgs._ import ProcessItemArgs._
/** Arguments to the process-item task.
*
* This task is run for each new file to create a new item from it or
* to add this file as an attachment to an existing item.
*
* If the `itemId' is set to some value, the item is tried to load to
* ammend with the given files. Otherwise a new item is created.
*/
case class ProcessItemArgs(meta: ProcessMeta, files: List[File]) { case class ProcessItemArgs(meta: ProcessMeta, files: List[File]) {
def makeSubject: String = def makeSubject: String =
@ -22,6 +30,7 @@ object ProcessItemArgs {
case class ProcessMeta( case class ProcessMeta(
collective: Ident, collective: Ident,
itemId: Option[Ident],
language: Language, language: Language,
direction: Option[Direction], direction: Option[Direction],
sourceAbbrev: String, sourceAbbrev: String,

View File

@ -77,7 +77,7 @@ object JoexAppImpl {
.withTask( .withTask(
JobTask.json( JobTask.json(
ProcessItemArgs.taskName, ProcessItemArgs.taskName,
ItemHandler[F](cfg), ItemHandler.newItem[F](cfg),
ItemHandler.onCancel[F] ItemHandler.onCancel[F]
) )
) )

View File

@ -32,44 +32,75 @@ object CreateItem {
def fileMetas(itemId: Ident, now: Timestamp) = def fileMetas(itemId: Ident, now: Timestamp) =
Stream Stream
.emits(ctx.args.files) .eval(ctx.store.transact(RAttachment.countOnItem(itemId)))
.flatMap(f => ctx.store.bitpeace.get(f.fileMetaId.id).map(fm => (f, fm))) .flatMap { offset =>
.collect({ case (f, Some(fm)) if isValidFile(fm) => f }) Stream
.zipWithIndex .emits(ctx.args.files)
.evalMap({ .flatMap(f => ctx.store.bitpeace.get(f.fileMetaId.id).map(fm => (f, fm)))
case (f, index) => .collect({ case (f, Some(fm)) if isValidFile(fm) => f })
Ident .zipWithIndex
.randomId[F] .evalMap({
.map(id => case (f, index) =>
RAttachment(id, itemId, f.fileMetaId, index.toInt, now, f.name) Ident
) .randomId[F]
}) .map(id =>
RAttachment(
id,
itemId,
f.fileMetaId,
index.toInt + offset,
now,
f.name
)
)
})
}
.compile .compile
.toVector .toVector
val item = RItem.newItem[F]( val loadItemOrInsertNew =
ctx.args.meta.collective, ctx.args.meta.itemId match {
ctx.args.makeSubject, case Some(id) =>
ctx.args.meta.sourceAbbrev, (for {
ctx.args.meta.direction.getOrElse(Direction.Incoming), _ <- OptionT.liftF(
ItemState.Premature ctx.logger.info(
) s"Loading item with id ${id.id} to ammend"
)
)
item <- OptionT(
ctx.store
.transact(RItem.findByIdAndCollective(id, ctx.args.meta.collective))
)
} yield (1, item))
.getOrElseF(Sync[F].raiseError(new Exception(s"Item not found.")))
case None =>
for {
_ <- ctx.logger.info(
s"Creating new item with ${ctx.args.files.size} attachment(s)"
)
item <- RItem.newItem[F](
ctx.args.meta.collective,
ctx.args.makeSubject,
ctx.args.meta.sourceAbbrev,
ctx.args.meta.direction.getOrElse(Direction.Incoming),
ItemState.Premature
)
n <- ctx.store.transact(RItem.insert(item))
} yield (n, item)
}
for { for {
_ <- ctx.logger.info(
s"Creating new item with ${ctx.args.files.size} attachment(s)"
)
time <- Duration.stopTime[F] time <- Duration.stopTime[F]
it <- item it <- loadItemOrInsertNew
n <- ctx.store.transact(RItem.insert(it)) _ <- if (it._1 != 1) storeItemError[F](ctx) else ().pure[F]
_ <- if (n != 1) storeItemError[F](ctx) else ().pure[F] now <- Timestamp.current[F]
fm <- fileMetas(it.id, it.created) fm <- fileMetas(it._2.id, now)
k <- fm.traverse(insertAttachment(ctx)) k <- fm.traverse(insertAttachment(ctx))
_ <- logDifferences(ctx, fm, k.sum) _ <- logDifferences(ctx, fm, k.sum)
dur <- time dur <- time
_ <- ctx.logger.info(s"Creating item finished in ${dur.formatExact}") _ <- ctx.logger.info(s"Creating item finished in ${dur.formatExact}")
} yield ItemData( } yield ItemData(
it, it._2,
fm, fm,
Vector.empty, Vector.empty,
Vector.empty, Vector.empty,
@ -86,7 +117,7 @@ object CreateItem {
} yield n) } yield n)
} }
def findExisting[F[_]: Sync]: Task[F, ProcessItemArgs, Option[ItemData]] = private def findExisting[F[_]: Sync]: Task[F, ProcessItemArgs, Option[ItemData]] =
Task { ctx => Task { ctx =>
for { for {
cand <- ctx.store.transact(QItem.findByFileIds(ctx.args.files.map(_.fileMetaId))) cand <- ctx.store.transact(QItem.findByFileIds(ctx.args.files.map(_.fileMetaId)))

View File

@ -50,8 +50,10 @@ object ExtractArchive {
findMime(ctx)(ra).flatMap(m => extractSafe(ctx, archive)(ra, m)) findMime(ctx)(ra).flatMap(m => extractSafe(ctx, archive)(ra, m))
for { for {
ras <- item.attachments.traverse(extract) ras <- item.attachments.traverse(extract)
nra = ras.flatMap(_.files).zipWithIndex.map(t => t._1.copy(position = t._2)) lastPos <- ctx.store.transact(RAttachment.countOnItem(item.item.id))
nra =
ras.flatMap(_.files).zipWithIndex.map(t => t._1.copy(position = lastPos + t._2))
_ <- nra.traverse(storeAttachment(ctx)) _ <- nra.traverse(storeAttachment(ctx))
naa = ras.flatMap(_.archives) naa = ras.flatMap(_.archives)
_ <- naa.traverse(storeArchive(ctx)) _ <- naa.traverse(storeArchive(ctx))

View File

@ -2,19 +2,20 @@ package docspell.joex.process
import cats.implicits._ import cats.implicits._
import cats.effect._ import cats.effect._
import fs2.Stream
import docspell.common.{ItemState, ProcessItemArgs} import docspell.common.{ItemState, ProcessItemArgs}
import docspell.joex.Config import docspell.joex.Config
import docspell.joex.scheduler.{Context, Task} import docspell.joex.scheduler.Task
import docspell.store.queries.QItem import docspell.store.queries.QItem
import docspell.store.records.{RItem, RJob} import docspell.store.records.RItem
object ItemHandler { object ItemHandler {
def onCancel[F[_]: Sync: ContextShift]: Task[F, ProcessItemArgs, Unit] = def onCancel[F[_]: Sync: ContextShift]: Task[F, ProcessItemArgs, Unit] =
logWarn("Now cancelling. Deleting potentially created data.").flatMap(_ => logWarn("Now cancelling. Deleting potentially created data.").flatMap(_ =>
deleteByFileIds deleteByFileIds.flatMap(_ => deleteFiles)
) )
def apply[F[_]: ConcurrentEffect: ContextShift]( def newItem[F[_]: ConcurrentEffect: ContextShift](
cfg: Config cfg: Config
): Task[F, ProcessItemArgs, Unit] = ): Task[F, ProcessItemArgs, Unit] =
CreateItem[F] CreateItem[F]
@ -31,16 +32,13 @@ object ItemHandler {
.map(_ => data) .map(_ => data)
) )
def isLastRetry[F[_]: Sync, A](ctx: Context[F, A]): F[Boolean] = def isLastRetry[F[_]: Sync]: Task[F, ProcessItemArgs, Boolean] =
for { Task(_.isLastRetry)
current <- ctx.store.transact(RJob.getRetries(ctx.jobId))
last = ctx.config.retries == current.getOrElse(0)
} yield last
def safeProcess[F[_]: ConcurrentEffect: ContextShift]( def safeProcess[F[_]: ConcurrentEffect: ContextShift](
cfg: Config cfg: Config
)(data: ItemData): Task[F, ProcessItemArgs, ItemData] = )(data: ItemData): Task[F, ProcessItemArgs, ItemData] =
Task(isLastRetry[F, ProcessItemArgs] _).flatMap { isLastRetry[F].flatMap {
case true => case true =>
ProcessItem[F](cfg)(data).attempt.flatMap({ ProcessItem[F](cfg)(data).attempt.flatMap({
case Right(d) => case Right(d) =>
@ -64,6 +62,15 @@ object ItemHandler {
} yield () } yield ()
} }
private def deleteFiles[F[_]: Sync]: Task[F, ProcessItemArgs, Unit] =
Task(ctx =>
Stream
.emits(ctx.args.files.map(_.fileMetaId.id))
.flatMap(id => ctx.store.bitpeace.delete(id).attempt.drain)
.compile
.drain
)
private def logWarn[F[_]](msg: => String): Task[F, ProcessItemArgs, Unit] = private def logWarn[F[_]](msg: => String): Task[F, ProcessItemArgs, Unit] =
Task(_.logger.warn(msg)) Task(_.logger.warn(msg))
} }

View File

@ -259,7 +259,7 @@ object ScanMailboxTask {
priority = Priority.Low, priority = Priority.Low,
tracker = None tracker = None
) )
res <- upload.submit(data, ctx.args.account, false) res <- upload.submit(data, ctx.args.account, false, None)
} yield res } yield res
} }

View File

@ -1,7 +1,7 @@
package docspell.joex.scheduler package docspell.joex.scheduler
import cats.Functor import cats.{Applicative, Functor}
import cats.effect.{Blocker, Concurrent} import cats.effect._
import cats.implicits._ import cats.implicits._
import docspell.common._ import docspell.common._
import docspell.store.Store import docspell.store.Store
@ -23,6 +23,12 @@ trait Context[F[_], A] { self =>
def store: Store[F] def store: Store[F]
final def isLastRetry(implicit ev: Applicative[F]): F[Boolean] =
for {
current <- store.transact(RJob.getRetries(jobId))
last = config.retries == current.getOrElse(0)
} yield last
def blocker: Blocker def blocker: Blocker
def map[C](f: A => C)(implicit F: Functor[F]): Context[F, C] = def map[C](f: A => C)(implicit F: Functor[F]): Context[F, C] =

View File

@ -80,7 +80,7 @@ object IntegrationEndpointRoutes {
cfg.backend.files.validMimeTypes cfg.backend.files.validMimeTypes
) )
account = AccountId(coll, Ident.unsafe("docspell-system")) account = AccountId(coll, Ident.unsafe("docspell-system"))
result <- backend.upload.submit(updata, account, true) result <- backend.upload.submit(updata, account, true, None)
res <- Ok(basicResult(result)) res <- Ok(basicResult(result))
} yield res } yield res
} }

View File

@ -36,7 +36,7 @@ object UploadRoutes {
Priority.High, Priority.High,
cfg.backend.files.validMimeTypes cfg.backend.files.validMimeTypes
) )
result <- backend.upload.submit(updata, user.account, true) result <- backend.upload.submit(updata, user.account, true, None)
res <- Ok(basicResult(result)) res <- Ok(basicResult(result))
} yield res } yield res
} }
@ -56,7 +56,7 @@ object UploadRoutes {
Priority.Low, Priority.Low,
cfg.backend.files.validMimeTypes cfg.backend.files.validMimeTypes
) )
result <- backend.upload.submit(updata, id, true) result <- backend.upload.submit(updata, id, true, None)
res <- Ok(basicResult(result)) res <- Ok(basicResult(result))
} yield res } yield res
} }

View File

@ -38,6 +38,9 @@ object RAttachment {
fr"${v.id},${v.itemId},${v.fileId.id},${v.position},${v.created},${v.name}" fr"${v.id},${v.itemId},${v.fileId.id},${v.position},${v.created},${v.name}"
).update.run ).update.run
def countOnItem(id: Ident): ConnectionIO[Int] =
selectCount(itemId, table, itemId.is(id)).query[Int].unique
def updateFileIdAndName( def updateFileIdAndName(
attachId: Ident, attachId: Ident,
fId: Ident, fId: Ident,

View File

@ -290,4 +290,7 @@ object RItem {
def existsById(itemId: Ident): ConnectionIO[Boolean] = def existsById(itemId: Ident): ConnectionIO[Boolean] =
selectCount(id, table, id.is(itemId)).query[Int].unique.map(_ > 0) selectCount(id, table, id.is(itemId)).query[Int].unique.map(_ > 0)
def findByIdAndCollective(itemId: Ident, coll: Ident): ConnectionIO[Option[RItem]] =
selectSimple(all, table, and(id.is(itemId), cid.is(coll))).query[RItem].option
} }