diff --git a/modules/backend/src/main/scala/docspell/backend/ops/OUpload.scala b/modules/backend/src/main/scala/docspell/backend/ops/OUpload.scala index bc2c688e..f787a17a 100644 --- a/modules/backend/src/main/scala/docspell/backend/ops/OUpload.scala +++ b/modules/backend/src/main/scala/docspell/backend/ops/OUpload.scala @@ -1,8 +1,9 @@ package docspell.backend.ops import bitpeace.MimetypeHint -import cats.implicits._ +import cats.data.OptionT import cats.effect._ +import cats.implicits._ import docspell.backend.Config import fs2.Stream import docspell.common._ @@ -17,13 +18,15 @@ trait OUpload[F[_]] { def submit( data: OUpload.UploadData[F], account: AccountId, - notifyJoex: Boolean + notifyJoex: Boolean, + itemId: Option[Ident] ): F[OUpload.UploadResult] def submit( data: OUpload.UploadData[F], sourceId: Ident, - notifyJoex: Boolean + notifyJoex: Boolean, + itemId: Option[Ident] ): F[OUpload.UploadResult] } @@ -68,7 +71,8 @@ object OUpload { def submit( data: OUpload.UploadData[F], account: AccountId, - notifyJoex: Boolean + notifyJoex: Boolean, + itemId: Option[Ident] ): F[OUpload.UploadResult] = for { files <- data.files.traverse(saveFile).map(_.flatten) @@ -76,6 +80,7 @@ object OUpload { lang <- store.transact(RCollective.findLanguage(account.collective)) meta = ProcessItemArgs.ProcessMeta( account.collective, + itemId, lang.getOrElse(Language.German), data.meta.direction, data.meta.sourceAbbrev, @@ -95,18 +100,18 @@ object OUpload { def submit( data: OUpload.UploadData[F], sourceId: Ident, - notifyJoex: Boolean + notifyJoex: Boolean, + itemId: Option[Ident] ): F[OUpload.UploadResult] = - for { - sOpt <- - store - .transact(RSource.find(sourceId)) - .map(_.toRight(UploadResult.NoSource)) - abbrev = sOpt.map(_.abbrev).toOption.getOrElse(data.meta.sourceAbbrev) - updata = data.copy(meta = data.meta.copy(sourceAbbrev = abbrev)) - accId = sOpt.map(source => AccountId(source.cid, source.sid)) - result <- accId.traverse(acc => submit(updata, acc, notifyJoex)) - } yield result.fold(identity, identity) + (for { + src <- OptionT(store.transact(RSource.find(sourceId))) + updata = data.copy( + meta = data.meta.copy(sourceAbbrev = src.abbrev), + priority = src.priority + ) + accId = AccountId(src.cid, src.sid) + result <- OptionT.liftF(submit(updata, accId, notifyJoex, itemId)) + } yield result).getOrElse(UploadResult.NoSource) private def submitJobs( notifyJoex: Boolean diff --git a/modules/common/src/main/scala/docspell/common/ProcessItemArgs.scala b/modules/common/src/main/scala/docspell/common/ProcessItemArgs.scala index 73e7a951..d170eae0 100644 --- a/modules/common/src/main/scala/docspell/common/ProcessItemArgs.scala +++ b/modules/common/src/main/scala/docspell/common/ProcessItemArgs.scala @@ -4,6 +4,14 @@ import io.circe._, io.circe.generic.semiauto._ import docspell.common.syntax.all._ import ProcessItemArgs._ +/** Arguments to the process-item task. + * + * This task is run for each new file to create a new item from it or + * to add this file as an attachment to an existing item. + * + * If the `itemId' is set to some value, the item is tried to load to + * ammend with the given files. Otherwise a new item is created. + */ case class ProcessItemArgs(meta: ProcessMeta, files: List[File]) { def makeSubject: String = @@ -22,6 +30,7 @@ object ProcessItemArgs { case class ProcessMeta( collective: Ident, + itemId: Option[Ident], language: Language, direction: Option[Direction], sourceAbbrev: String, diff --git a/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala b/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala index d07ca841..f2d3cd91 100644 --- a/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala +++ b/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala @@ -77,7 +77,7 @@ object JoexAppImpl { .withTask( JobTask.json( ProcessItemArgs.taskName, - ItemHandler[F](cfg), + ItemHandler.newItem[F](cfg), ItemHandler.onCancel[F] ) ) diff --git a/modules/joex/src/main/scala/docspell/joex/process/CreateItem.scala b/modules/joex/src/main/scala/docspell/joex/process/CreateItem.scala index 595c0b1b..30737bba 100644 --- a/modules/joex/src/main/scala/docspell/joex/process/CreateItem.scala +++ b/modules/joex/src/main/scala/docspell/joex/process/CreateItem.scala @@ -32,44 +32,75 @@ object CreateItem { def fileMetas(itemId: Ident, now: Timestamp) = Stream - .emits(ctx.args.files) - .flatMap(f => ctx.store.bitpeace.get(f.fileMetaId.id).map(fm => (f, fm))) - .collect({ case (f, Some(fm)) if isValidFile(fm) => f }) - .zipWithIndex - .evalMap({ - case (f, index) => - Ident - .randomId[F] - .map(id => - RAttachment(id, itemId, f.fileMetaId, index.toInt, now, f.name) - ) - }) + .eval(ctx.store.transact(RAttachment.countOnItem(itemId))) + .flatMap { offset => + Stream + .emits(ctx.args.files) + .flatMap(f => ctx.store.bitpeace.get(f.fileMetaId.id).map(fm => (f, fm))) + .collect({ case (f, Some(fm)) if isValidFile(fm) => f }) + .zipWithIndex + .evalMap({ + case (f, index) => + Ident + .randomId[F] + .map(id => + RAttachment( + id, + itemId, + f.fileMetaId, + index.toInt + offset, + now, + f.name + ) + ) + }) + } .compile .toVector - val item = RItem.newItem[F]( - ctx.args.meta.collective, - ctx.args.makeSubject, - ctx.args.meta.sourceAbbrev, - ctx.args.meta.direction.getOrElse(Direction.Incoming), - ItemState.Premature - ) + val loadItemOrInsertNew = + ctx.args.meta.itemId match { + case Some(id) => + (for { + _ <- OptionT.liftF( + ctx.logger.info( + s"Loading item with id ${id.id} to ammend" + ) + ) + item <- OptionT( + ctx.store + .transact(RItem.findByIdAndCollective(id, ctx.args.meta.collective)) + ) + } yield (1, item)) + .getOrElseF(Sync[F].raiseError(new Exception(s"Item not found."))) + case None => + for { + _ <- ctx.logger.info( + s"Creating new item with ${ctx.args.files.size} attachment(s)" + ) + item <- RItem.newItem[F]( + ctx.args.meta.collective, + ctx.args.makeSubject, + ctx.args.meta.sourceAbbrev, + ctx.args.meta.direction.getOrElse(Direction.Incoming), + ItemState.Premature + ) + n <- ctx.store.transact(RItem.insert(item)) + } yield (n, item) + } for { - _ <- ctx.logger.info( - s"Creating new item with ${ctx.args.files.size} attachment(s)" - ) time <- Duration.stopTime[F] - it <- item - n <- ctx.store.transact(RItem.insert(it)) - _ <- if (n != 1) storeItemError[F](ctx) else ().pure[F] - fm <- fileMetas(it.id, it.created) + it <- loadItemOrInsertNew + _ <- if (it._1 != 1) storeItemError[F](ctx) else ().pure[F] + now <- Timestamp.current[F] + fm <- fileMetas(it._2.id, now) k <- fm.traverse(insertAttachment(ctx)) _ <- logDifferences(ctx, fm, k.sum) dur <- time _ <- ctx.logger.info(s"Creating item finished in ${dur.formatExact}") } yield ItemData( - it, + it._2, fm, Vector.empty, Vector.empty, @@ -86,7 +117,7 @@ object CreateItem { } yield n) } - def findExisting[F[_]: Sync]: Task[F, ProcessItemArgs, Option[ItemData]] = + private def findExisting[F[_]: Sync]: Task[F, ProcessItemArgs, Option[ItemData]] = Task { ctx => for { cand <- ctx.store.transact(QItem.findByFileIds(ctx.args.files.map(_.fileMetaId))) diff --git a/modules/joex/src/main/scala/docspell/joex/process/ExtractArchive.scala b/modules/joex/src/main/scala/docspell/joex/process/ExtractArchive.scala index ddb184ab..d3a156ff 100644 --- a/modules/joex/src/main/scala/docspell/joex/process/ExtractArchive.scala +++ b/modules/joex/src/main/scala/docspell/joex/process/ExtractArchive.scala @@ -50,8 +50,10 @@ object ExtractArchive { findMime(ctx)(ra).flatMap(m => extractSafe(ctx, archive)(ra, m)) for { - ras <- item.attachments.traverse(extract) - nra = ras.flatMap(_.files).zipWithIndex.map(t => t._1.copy(position = t._2)) + ras <- item.attachments.traverse(extract) + lastPos <- ctx.store.transact(RAttachment.countOnItem(item.item.id)) + nra = + ras.flatMap(_.files).zipWithIndex.map(t => t._1.copy(position = lastPos + t._2)) _ <- nra.traverse(storeAttachment(ctx)) naa = ras.flatMap(_.archives) _ <- naa.traverse(storeArchive(ctx)) diff --git a/modules/joex/src/main/scala/docspell/joex/process/ItemHandler.scala b/modules/joex/src/main/scala/docspell/joex/process/ItemHandler.scala index 4334ece4..dbc0f70a 100644 --- a/modules/joex/src/main/scala/docspell/joex/process/ItemHandler.scala +++ b/modules/joex/src/main/scala/docspell/joex/process/ItemHandler.scala @@ -2,19 +2,20 @@ package docspell.joex.process import cats.implicits._ import cats.effect._ +import fs2.Stream import docspell.common.{ItemState, ProcessItemArgs} import docspell.joex.Config -import docspell.joex.scheduler.{Context, Task} +import docspell.joex.scheduler.Task import docspell.store.queries.QItem -import docspell.store.records.{RItem, RJob} +import docspell.store.records.RItem object ItemHandler { def onCancel[F[_]: Sync: ContextShift]: Task[F, ProcessItemArgs, Unit] = logWarn("Now cancelling. Deleting potentially created data.").flatMap(_ => - deleteByFileIds + deleteByFileIds.flatMap(_ => deleteFiles) ) - def apply[F[_]: ConcurrentEffect: ContextShift]( + def newItem[F[_]: ConcurrentEffect: ContextShift]( cfg: Config ): Task[F, ProcessItemArgs, Unit] = CreateItem[F] @@ -31,16 +32,13 @@ object ItemHandler { .map(_ => data) ) - def isLastRetry[F[_]: Sync, A](ctx: Context[F, A]): F[Boolean] = - for { - current <- ctx.store.transact(RJob.getRetries(ctx.jobId)) - last = ctx.config.retries == current.getOrElse(0) - } yield last + def isLastRetry[F[_]: Sync]: Task[F, ProcessItemArgs, Boolean] = + Task(_.isLastRetry) def safeProcess[F[_]: ConcurrentEffect: ContextShift]( cfg: Config )(data: ItemData): Task[F, ProcessItemArgs, ItemData] = - Task(isLastRetry[F, ProcessItemArgs] _).flatMap { + isLastRetry[F].flatMap { case true => ProcessItem[F](cfg)(data).attempt.flatMap({ case Right(d) => @@ -64,6 +62,15 @@ object ItemHandler { } yield () } + private def deleteFiles[F[_]: Sync]: Task[F, ProcessItemArgs, Unit] = + Task(ctx => + Stream + .emits(ctx.args.files.map(_.fileMetaId.id)) + .flatMap(id => ctx.store.bitpeace.delete(id).attempt.drain) + .compile + .drain + ) + private def logWarn[F[_]](msg: => String): Task[F, ProcessItemArgs, Unit] = Task(_.logger.warn(msg)) } diff --git a/modules/joex/src/main/scala/docspell/joex/scanmailbox/ScanMailboxTask.scala b/modules/joex/src/main/scala/docspell/joex/scanmailbox/ScanMailboxTask.scala index 670b2fec..8d59b481 100644 --- a/modules/joex/src/main/scala/docspell/joex/scanmailbox/ScanMailboxTask.scala +++ b/modules/joex/src/main/scala/docspell/joex/scanmailbox/ScanMailboxTask.scala @@ -259,7 +259,7 @@ object ScanMailboxTask { priority = Priority.Low, tracker = None ) - res <- upload.submit(data, ctx.args.account, false) + res <- upload.submit(data, ctx.args.account, false, None) } yield res } diff --git a/modules/joex/src/main/scala/docspell/joex/scheduler/Context.scala b/modules/joex/src/main/scala/docspell/joex/scheduler/Context.scala index ca16c1a8..d8a98906 100644 --- a/modules/joex/src/main/scala/docspell/joex/scheduler/Context.scala +++ b/modules/joex/src/main/scala/docspell/joex/scheduler/Context.scala @@ -1,7 +1,7 @@ package docspell.joex.scheduler -import cats.Functor -import cats.effect.{Blocker, Concurrent} +import cats.{Applicative, Functor} +import cats.effect._ import cats.implicits._ import docspell.common._ import docspell.store.Store @@ -23,6 +23,12 @@ trait Context[F[_], A] { self => def store: Store[F] + final def isLastRetry(implicit ev: Applicative[F]): F[Boolean] = + for { + current <- store.transact(RJob.getRetries(jobId)) + last = config.retries == current.getOrElse(0) + } yield last + def blocker: Blocker def map[C](f: A => C)(implicit F: Functor[F]): Context[F, C] = diff --git a/modules/restserver/src/main/scala/docspell/restserver/routes/IntegrationEndpointRoutes.scala b/modules/restserver/src/main/scala/docspell/restserver/routes/IntegrationEndpointRoutes.scala index 6f0361d4..9433fa01 100644 --- a/modules/restserver/src/main/scala/docspell/restserver/routes/IntegrationEndpointRoutes.scala +++ b/modules/restserver/src/main/scala/docspell/restserver/routes/IntegrationEndpointRoutes.scala @@ -80,7 +80,7 @@ object IntegrationEndpointRoutes { cfg.backend.files.validMimeTypes ) account = AccountId(coll, Ident.unsafe("docspell-system")) - result <- backend.upload.submit(updata, account, true) + result <- backend.upload.submit(updata, account, true, None) res <- Ok(basicResult(result)) } yield res } diff --git a/modules/restserver/src/main/scala/docspell/restserver/routes/UploadRoutes.scala b/modules/restserver/src/main/scala/docspell/restserver/routes/UploadRoutes.scala index d48d5159..77dfb427 100644 --- a/modules/restserver/src/main/scala/docspell/restserver/routes/UploadRoutes.scala +++ b/modules/restserver/src/main/scala/docspell/restserver/routes/UploadRoutes.scala @@ -36,7 +36,7 @@ object UploadRoutes { Priority.High, cfg.backend.files.validMimeTypes ) - result <- backend.upload.submit(updata, user.account, true) + result <- backend.upload.submit(updata, user.account, true, None) res <- Ok(basicResult(result)) } yield res } @@ -56,7 +56,7 @@ object UploadRoutes { Priority.Low, cfg.backend.files.validMimeTypes ) - result <- backend.upload.submit(updata, id, true) + result <- backend.upload.submit(updata, id, true, None) res <- Ok(basicResult(result)) } yield res } diff --git a/modules/store/src/main/scala/docspell/store/records/RAttachment.scala b/modules/store/src/main/scala/docspell/store/records/RAttachment.scala index a63dd4d5..dbb5dc16 100644 --- a/modules/store/src/main/scala/docspell/store/records/RAttachment.scala +++ b/modules/store/src/main/scala/docspell/store/records/RAttachment.scala @@ -38,6 +38,9 @@ object RAttachment { fr"${v.id},${v.itemId},${v.fileId.id},${v.position},${v.created},${v.name}" ).update.run + def countOnItem(id: Ident): ConnectionIO[Int] = + selectCount(itemId, table, itemId.is(id)).query[Int].unique + def updateFileIdAndName( attachId: Ident, fId: Ident, diff --git a/modules/store/src/main/scala/docspell/store/records/RItem.scala b/modules/store/src/main/scala/docspell/store/records/RItem.scala index 05f06e9f..6d300e62 100644 --- a/modules/store/src/main/scala/docspell/store/records/RItem.scala +++ b/modules/store/src/main/scala/docspell/store/records/RItem.scala @@ -290,4 +290,7 @@ object RItem { def existsById(itemId: Ident): ConnectionIO[Boolean] = selectCount(id, table, id.is(itemId)).query[Int].unique.map(_ > 0) + + def findByIdAndCollective(itemId: Ident, coll: Ident): ConnectionIO[Option[RItem]] = + selectSimple(all, table, and(id.is(itemId), cid.is(coll))).query[RItem].option }