Add a file repository to better organize files

Docspell now uses a new API for accessing files.

Issue: #1379
This commit is contained in:
eikek
2022-02-13 12:08:01 +01:00
parent 3dcb113cef
commit 553b1fa249
40 changed files with 451 additions and 232 deletions

View File

@ -31,7 +31,7 @@ object Classify {
_ <- OptionT.liftF(logger.info(s"Guessing label for ${cname.name}"))
model <- OptionT(store.transact(RClassifierModel.findByName(coll, cname.name)))
.flatTapNone(logger.debug("No classifier model found."))
modelData = store.fileStore.getBytes(model.fileId)
modelData = store.fileRepo.getBytes(model.fileId)
cls <- OptionT(File.withTempDir(workingDir, "classify").use { dir =>
val modelFile = dir.resolve("model.ser.gz")
modelData

View File

@ -91,7 +91,7 @@ object LearnClassifierTask {
n <- ctx.store.transact(RClassifierModel.deleteAll(list.map(_.id)))
_ <- list
.map(_.fileId)
.traverse(id => ctx.store.fileStore.delete(id))
.traverse(id => ctx.store.fileRepo.delete(id))
_ <- ctx.logger.debug(s"Deleted $n model files.")
} yield ()

View File

@ -42,7 +42,12 @@ object StoreClassifierModel {
_ <- logger.debug(s"Storing new trained model for: ${modelName.name}")
fileData = Files[F].readAll(trainedModel.model)
newFileId <-
fileData.through(store.fileStore.save(MimeTypeHint.none)).compile.lastOrError
fileData
.through(
store.fileRepo.save(collective, FileCategory.Classifier, MimeTypeHint.none)
)
.compile
.lastOrError
_ <- store.transact(
RClassifierModel.updateFile(collective, modelName.name, newFileId)
)
@ -50,7 +55,7 @@ object StoreClassifierModel {
_ <- oldFile match {
case Some(fid) =>
logger.debug(s"Deleting old model file ${fid.id}") *>
store.fileStore.delete(fid)
store.fileRepo.delete(fid)
case None => ().pure[F]
}
} yield ()

View File

@ -92,7 +92,7 @@ object PdfConvTask {
ctx: Context[F, Args],
in: RFileMeta
): F[Unit] = {
val fs = ctx.store.fileStore
val fs = ctx.store.fileRepo
val data = fs.getBytes(in.id)
val storeResult: ConversionResult.Handler[F, Unit] =
@ -141,11 +141,15 @@ object PdfConvTask {
newFile: Stream[F, Byte]
): F[Unit] = {
val mimeHint = MimeTypeHint.advertised(meta.mimetype)
val collective = meta.id.collective
val cat = FileCategory.AttachmentConvert
for {
fid <-
newFile.through(ctx.store.fileStore.save(mimeHint)).compile.lastOrError
newFile
.through(ctx.store.fileRepo.save(collective, cat, mimeHint))
.compile
.lastOrError
_ <- ctx.store.transact(RAttachment.updateFileId(ctx.args.attachId, fid))
} yield ()
}
}

View File

@ -100,5 +100,5 @@ object AttachmentPageCount {
.getOrElse(MimeType.octetStream)
def loadFile[F[_]](ctx: Context[F, _])(ra: RAttachment): Stream[F, Byte] =
ctx.store.fileStore.getBytes(ra.fileId)
ctx.store.fileRepo.getBytes(ra.fileId)
}

View File

@ -59,7 +59,7 @@ object AttachmentPreview {
preview.previewPNG(loadFile(ctx)(ra)).flatMap {
case Some(out) =>
ctx.logger.debug("Preview generated, saving to database…") *>
createRecord(ctx, out, ra).map(_.some)
createRecord(ctx, ra.fileId.collective, out, ra).map(_.some)
case None =>
ctx.logger
.info(s"Preview could not be generated. Maybe the pdf has no pages?") *>
@ -73,6 +73,7 @@ object AttachmentPreview {
private def createRecord[F[_]: Sync](
ctx: Context[F, _],
collective: Ident,
png: Stream[F, Byte],
ra: RAttachment
): F[RAttachmentPreview] = {
@ -82,7 +83,11 @@ object AttachmentPreview {
for {
fileId <- png
.through(
ctx.store.fileStore.save(MimeTypeHint(name.map(_.fullName), Some("image/png")))
ctx.store.fileRepo.save(
collective,
FileCategory.PreviewImage,
MimeTypeHint(name.map(_.fullName), Some("image/png"))
)
)
.compile
.lastOrError
@ -99,5 +104,5 @@ object AttachmentPreview {
.getOrElse(MimeType.octetStream)
def loadFile[F[_]](ctx: Context[F, _])(ra: RAttachment): Stream[F, Byte] =
ctx.store.fileStore.getBytes(ra.fileId)
ctx.store.fileRepo.getBytes(ra.fileId)
}

View File

@ -32,11 +32,12 @@ import docspell.store.records._
* This step assumes an existing premature item, it traverses its attachments.
*/
object ConvertPdf {
type Args = ProcessItemArgs
def apply[F[_]: Async](
cfg: ConvertConfig,
item: ItemData
): Task[F, ProcessItemArgs, ItemData] =
): Task[F, Args, ItemData] =
Task { ctx =>
def convert(ra: RAttachment): F[(RAttachment, Option[RAttachmentMeta])] =
isConverted(ctx)(ra).flatMap {
@ -61,7 +62,7 @@ object ConvertPdf {
}
def isConverted[F[_]](ctx: Context[F, ProcessItemArgs])(
def isConverted[F[_]](ctx: Context[F, Args])(
ra: RAttachment
): F[Boolean] =
ctx.store.transact(RAttachmentSource.isConverted(ra.id))
@ -74,14 +75,14 @@ object ConvertPdf {
def convertSafe[F[_]: Async](
cfg: ConvertConfig,
sanitizeHtml: SanitizeHtml,
ctx: Context[F, ProcessItemArgs],
ctx: Context[F, Args],
item: ItemData
)(ra: RAttachment, mime: MimeType): F[(RAttachment, Option[RAttachmentMeta])] =
loadCollectivePasswords(ctx).flatMap(collPass =>
Conversion.create[F](cfg, sanitizeHtml, collPass, ctx.logger).use { conv =>
mime match {
case mt =>
val data = ctx.store.fileStore.getBytes(ra.fileId)
val data = ctx.store.fileRepo.getBytes(ra.fileId)
val handler = conversionHandler[F](ctx, cfg, ra, item)
ctx.logger
.info(s"Converting file ${ra.name} (${mime.asString}) into a PDF") *>
@ -93,14 +94,14 @@ object ConvertPdf {
)
private def loadCollectivePasswords[F[_]: Async](
ctx: Context[F, ProcessItemArgs]
ctx: Context[F, Args]
): F[List[Password]] =
ctx.store
.transact(RCollectivePassword.findAll(ctx.args.meta.collective))
.map(_.map(_.password).distinct)
private def conversionHandler[F[_]: Sync](
ctx: Context[F, ProcessItemArgs],
ctx: Context[F, Args],
cfg: ConvertConfig,
ra: RAttachment,
item: ItemData
@ -146,7 +147,7 @@ object ConvertPdf {
}
private def storePDF[F[_]: Sync](
ctx: Context[F, ProcessItemArgs],
ctx: Context[F, Args],
cfg: ConvertConfig,
ra: RAttachment,
pdf: Stream[F, Byte]
@ -160,7 +161,13 @@ object ConvertPdf {
.map(_.fullName)
pdf
.through(ctx.store.fileStore.save(MimeTypeHint(hint.filename, hint.advertised)))
.through(
ctx.store.fileRepo.save(
ctx.args.meta.collective,
FileCategory.AttachmentConvert,
MimeTypeHint(hint.filename, hint.advertised)
)
)
.compile
.lastOrError
.flatMap(fmId => updateAttachment[F](ctx, ra, fmId, newName).map(_ => fmId))
@ -170,7 +177,7 @@ object ConvertPdf {
private def updateAttachment[F[_]: Sync](
ctx: Context[F, _],
ra: RAttachment,
fmId: Ident,
fmId: FileKey,
newName: Option[String]
): F[Unit] =
for {
@ -188,7 +195,7 @@ object ConvertPdf {
if (sameFile) ().pure[F]
else
ctx.logger.info("Deleting previous attachment file") *>
ctx.store.fileStore
ctx.store.fileRepo
.delete(raPrev.fileId)
.attempt
.flatMap {

View File

@ -14,6 +14,7 @@ import fs2.Stream
import docspell.common._
import docspell.joex.scheduler.{Context, Task}
import docspell.store.file.FileMetadata
import docspell.store.queries.QItem
import docspell.store.records._
@ -28,7 +29,7 @@ object CreateItem {
def createNew[F[_]: Sync]: Task[F, ProcessItemArgs, ItemData] =
Task { ctx =>
def isValidFile(fm: RFileMeta) =
def isValidFile(fm: FileMetadata) =
ctx.args.meta.validFileTypes.isEmpty ||
ctx.args.meta.validFileTypes.toSet
.contains(fm.mimetype)
@ -39,9 +40,7 @@ object CreateItem {
.flatMap { offset =>
Stream
.emits(ctx.args.files)
.evalMap(f =>
ctx.store.fileStore.findMeta(f.fileMetaId).value.map(fm => (f, fm))
)
.evalMap(f => ctx.store.fileRepo.findMeta(f.fileMetaId).map(fm => (f, fm)))
.collect { case (f, Some(fm)) if isValidFile(fm) => f }
.zipWithIndex
.evalMap { case (f, index) =>
@ -198,6 +197,6 @@ object CreateItem {
// TODO if no source is present, it must be saved!
private def originFileTuple(
t: (RAttachment, Option[RAttachmentSource])
): (Ident, Ident) =
): (Ident, FileKey) =
t._2.map(s => s.id -> s.fileId).getOrElse(t._1.id -> t._1.fileId)
}

View File

@ -51,7 +51,7 @@ object DuplicateCheck {
val fname = ctx.args.files.find(_.fileMetaId == fd.fm.id).flatMap(_.name)
if (fd.exists)
ctx.logger
.info(s"Deleting duplicate file $fname!") *> ctx.store.fileStore
.info(s"Deleting duplicate file $fname!") *> ctx.store.fileRepo
.delete(fd.fm.id)
else ().pure[F]
}

View File

@ -32,16 +32,17 @@ import emil.Mail
* This step assumes an existing premature item, it traverses its attachments.
*/
object ExtractArchive {
type Args = ProcessItemArgs
def apply[F[_]: Async](
item: ItemData
): Task[F, ProcessItemArgs, ItemData] =
): Task[F, Args, ItemData] =
multiPass(item, None).map(_._2)
def multiPass[F[_]: Async](
item: ItemData,
archive: Option[RAttachmentArchive]
): Task[F, ProcessItemArgs, (Option[RAttachmentArchive], ItemData)] =
): Task[F, Args, (Option[RAttachmentArchive], ItemData)] =
singlePass(item, archive).flatMap { t =>
if (t._1.isEmpty) Task.pure(t)
else multiPass(t._2, t._1)
@ -50,7 +51,7 @@ object ExtractArchive {
def singlePass[F[_]: Async](
item: ItemData,
archive: Option[RAttachmentArchive]
): Task[F, ProcessItemArgs, (Option[RAttachmentArchive], ItemData)] =
): Task[F, Args, (Option[RAttachmentArchive], ItemData)] =
Task { ctx =>
def extract(ra: RAttachment, pos: Int): F[Extracted] =
findMime(ctx)(ra).flatMap(m => extractSafe(ctx, archive)(ra, pos, m))
@ -88,7 +89,7 @@ object ExtractArchive {
.getOrElse(MimeType.octetStream)
def extractSafe[F[_]: Async](
ctx: Context[F, ProcessItemArgs],
ctx: Context[F, Args],
archive: Option[RAttachmentArchive]
)(ra: RAttachment, pos: Int, mime: MimeType): F[Extracted] =
mime match {
@ -120,7 +121,7 @@ object ExtractArchive {
)
_ <- ctx.store.transact(RAttachmentArchive.delete(ra.id))
_ <- ctx.store.transact(RAttachment.delete(ra.id))
_ <- ctx.store.fileStore.delete(ra.fileId)
_ <- ctx.store.fileRepo.delete(ra.fileId)
} yield extracted
case None =>
for {
@ -132,10 +133,10 @@ object ExtractArchive {
}
def extractZip[F[_]: Async](
ctx: Context[F, ProcessItemArgs],
ctx: Context[F, Args],
archive: Option[RAttachmentArchive]
)(ra: RAttachment, pos: Int): F[Extracted] = {
val zipData = ctx.store.fileStore.getBytes(ra.fileId)
val zipData = ctx.store.fileRepo.getBytes(ra.fileId)
val glob = ctx.args.meta.fileFilter.getOrElse(Glob.all)
ctx.logger.debug(s"Filtering zip entries with '${glob.asString}'") *>
zipData
@ -148,10 +149,10 @@ object ExtractArchive {
}
def extractMail[F[_]: Async](
ctx: Context[F, ProcessItemArgs],
ctx: Context[F, Args],
archive: Option[RAttachmentArchive]
)(ra: RAttachment, pos: Int): F[Extracted] = {
val email: Stream[F, Byte] = ctx.store.fileStore.getBytes(ra.fileId)
val email: Stream[F, Byte] = ctx.store.fileRepo.getBytes(ra.fileId)
val glob = ctx.args.meta.fileFilter.getOrElse(Glob.all)
val attachOnly = ctx.args.meta.attachmentsOnly.getOrElse(false)
@ -183,7 +184,7 @@ object ExtractArchive {
.getOrElse(Extracted.empty)
def handleEntry[F[_]: Sync](
ctx: Context[F, _],
ctx: Context[F, Args],
ra: RAttachment,
pos: Int,
archive: Option[RAttachmentArchive],
@ -193,7 +194,10 @@ object ExtractArchive {
): Stream[F, Extracted] = {
val (entry, subPos) = tentry
val mimeHint = MimeTypeHint.filename(entry.name).withAdvertised(entry.mime.asString)
val fileId = entry.data.through(ctx.store.fileStore.save(mimeHint))
val fileId = entry.data.through(
ctx.store.fileRepo
.save(ctx.args.meta.collective, FileCategory.AttachmentSource, mimeHint)
)
Stream.eval(ctx.logger.debug(s"Extracted ${entry.name}. Storing as attachment.")) >>
fileId.evalMap { fid =>

View File

@ -38,7 +38,7 @@ case class ItemData(
attachments: Vector[RAttachment],
metas: Vector[RAttachmentMeta],
dateLabels: Vector[AttachmentDates],
originFile: Map[Ident, Ident], // maps RAttachment.id -> FileMeta.id
originFile: Map[Ident, FileKey], // maps RAttachment.id -> FileMeta.id
givenMeta: MetaProposalList, // given meta data not associated to a specific attachment
// a list of tags (names or ids) attached to the item if they exist
tags: List[String],

View File

@ -133,7 +133,7 @@ object ItemHandler {
ctx.logger.info("Deleting input files …") *>
Stream
.emits(ctx.args.files.map(_.fileMetaId))
.evalMap(id => ctx.store.fileStore.delete(id).attempt)
.evalMap(id => ctx.store.fileRepo.delete(id).attempt)
.compile
.drain
)

View File

@ -126,11 +126,11 @@ object TextExtraction {
ctx: Context[F, _],
extr: Extraction[F],
lang: Language
)(fileId: Ident): F[ExtractResult] = {
val data = ctx.store.fileStore.getBytes(fileId)
)(fileId: FileKey): F[ExtractResult] = {
val data = ctx.store.fileRepo.getBytes(fileId)
def findMime: F[MimeType] =
OptionT(ctx.store.transact(RFileMeta.findById(fileId)))
OptionT(ctx.store.fileRepo.findMeta(fileId))
.map(_.mimetype)
.getOrElse(MimeType.octetStream)
@ -143,7 +143,7 @@ object TextExtraction {
cfg: ExtractConfig,
ra: RAttachment,
lang: Language
)(fileIds: List[Ident]): F[Option[ExtractResult.Success]] =
)(fileIds: List[FileKey]): F[Option[ExtractResult.Success]] =
fileIds match {
case Nil =>
ctx.logger.error(s"Cannot extract text").map(_ => None)
@ -179,7 +179,7 @@ object TextExtraction {
private def filesToExtract[F[_]: Sync](ctx: Context[F, _])(
item: ItemData,
ra: RAttachment
): F[List[Ident]] =
): F[List[FileKey]] =
item.originFile.get(ra.id) match {
case Some(sid) =>
ctx.store.transact(RFileMeta.findMime(sid)).map {