Merge pull request #57 from eikek/feature/archives

Feature/archives
eikek 2020-03-19 23:11:06 +01:00 committed by GitHub
commit 7c4e4bb076
22 changed files with 691 additions and 64 deletions

View File

@ -1,5 +1,22 @@
# Changelog
## v0.4.0
*unknown*
- Support for archive files. Archives are files that contain other
  files, like zip files. Docspell now extracts archives and adds their
  contents to an item. The extraction process is recursive, so there
  may be zip files inside zip files. File types supported:
  - `zip` every file inside is added to one item as an attachment
  - `eml` (RFC 822 e-mail files) E-mails are considered archives, since
    they may contain multiple files (body and attachments).
- Periodic Tasks framework: Docspell can now run tasks periodically
based on a schedule. This is not yet exposed to the user, but there
are some system cleanup jobs to start with.
- Improvements to the text analysis. For my test files there was an
  increase in accuracy of about 10%.
## v0.3.0
*Mar. 1, 2020*

View File

@ -0,0 +1,24 @@
package docspell.common

import fs2.Stream

final case class Binary[F[_]](name: String, mime: MimeType, data: Stream[F, Byte]) {

  def withMime(mime: MimeType): Binary[F] =
    copy(mime = mime)
}

object Binary {

  def apply[F[_]](name: String, data: Stream[F, Byte]): Binary[F] =
    Binary[F](name, MimeType.octetStream, data)

  def utf8[F[_]](name: String, content: String): Binary[F] =
    Binary[F](
      name,
      MimeType.octetStream,
      Stream.emit(content).through(fs2.text.utf8Encode)
    )

  def text[F[_]](name: String, content: String): Binary[F] =
    utf8(name, content).withMime(MimeType.plain)

  def html[F[_]](name: String, content: String): Binary[F] =
    utf8(name, content).withMime(MimeType.html)
}
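A minimal usage sketch of these constructors (assuming `cats.effect.IO` for the effect type):

```scala
import cats.effect.IO
import fs2.Stream

// text/plain and text/html entries, UTF-8 encoded
val note: Binary[IO] = Binary.text[IO]("note.txt", "hello world")
val page: Binary[IO] = Binary.html[IO]("page.html", "<p>hello</p>")

// arbitrary bytes default to application/octet-stream
val raw: Binary[IO] =
  Binary("blob.bin", Stream.emits("data".getBytes.toSeq).covary[IO])
```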

View File

@ -0,0 +1,47 @@
package docspell.files

import cats.effect._
import cats.implicits._
import fs2.{Pipe, Stream}
import java.io.InputStream
import java.util.zip.ZipInputStream
import java.nio.file.Paths

import docspell.common.Binary

object Zip {

  def unzipP[F[_]: ConcurrentEffect: ContextShift](
      chunkSize: Int,
      blocker: Blocker
  ): Pipe[F, Byte, Binary[F]] =
    s => unzip[F](chunkSize, blocker)(s)

  def unzip[F[_]: ConcurrentEffect: ContextShift](chunkSize: Int, blocker: Blocker)(
      data: Stream[F, Byte]
  ): Stream[F, Binary[F]] =
    data.through(fs2.io.toInputStream[F]).flatMap(in => unzipJava(in, chunkSize, blocker))

  def unzipJava[F[_]: Sync: ContextShift](
      in: InputStream,
      chunkSize: Int,
      blocker: Blocker
  ): Stream[F, Binary[F]] = {
    val zin = new ZipInputStream(in)

    // Advance to the next zip entry; close the current one on release.
    val nextEntry = Resource.make(Sync[F].delay(Option(zin.getNextEntry))) {
      case Some(_) => Sync[F].delay(zin.closeEntry())
      case None    => ().pure[F]
    }

    Stream
      .resource(nextEntry)
      .repeat
      .unNoneTerminate
      .map { ze =>
        // Use only the file name, dropping any directory components.
        val name = Paths.get(ze.getName()).getFileName.toString
        val data =
          fs2.io.readInputStream[F]((zin: InputStream).pure[F], chunkSize, blocker, false)
        Binary(name, data)
      }
  }
}

Binary file not shown.

View File

@ -0,0 +1,30 @@
package docspell.files

import minitest._
import cats.effect._
import cats.implicits._
import scala.concurrent.ExecutionContext

object ZipTest extends SimpleTestSuite {
  val blocker     = Blocker.liftExecutionContext(ExecutionContext.global)
  implicit val CS = IO.contextShift(ExecutionContext.global)

  test("unzip") {
    val zipFile = ExampleFiles.letters_zip.readURL[IO](8192, blocker)
    val uncomp  = zipFile.through(Zip.unzip(8192, blocker))

    uncomp
      .evalMap { entry =>
        val x = entry.data.map(_ => 1).foldMonoid.compile.lastOrError
        x.map { size =>
          if (entry.name.endsWith(".pdf")) {
            assertEquals(entry.name, "letter-de.pdf")
            assertEquals(size, 34815)
          } else {
            assertEquals(entry.name, "letter-en.txt")
            assertEquals(size, 1131)
          }
        }
      }
      .compile
      .drain
      .unsafeRunSync
  }
}

View File

@ -0,0 +1,62 @@
package docspell.joex.mail

import cats.effect._
import cats.implicits._
import fs2.{Pipe, Stream}
import emil.{MimeType => _, _}
import emil.javamail.syntax._
import cats.Applicative

import docspell.common._

object ReadMail {

  def read[F[_]: Sync](str: String): F[Mail[F]] =
    Mail.deserialize(str)

  def readBytesP[F[_]: Sync](logger: Logger[F]): Pipe[F, Byte, Binary[F]] =
    s =>
      Stream.eval(logger.debug(s"Converting e-mail into its parts")) >>
        bytesToMail(s).flatMap(mailToEntries[F](logger))

  def bytesToMail[F[_]: Sync](data: Stream[F, Byte]): Stream[F, Mail[F]] =
    data.through(fs2.text.utf8Decode).foldMonoid.evalMap(read[F])

  def mailToEntries[F[_]: Applicative](
      logger: Logger[F]
  )(mail: Mail[F]): Stream[F, Binary[F]] = {
    // The mail body becomes at most one entry: the plain-text part when
    // that is all there is, otherwise the html part.
    val bodyEntry: F[Option[Binary[F]]] = mail.body.fold(
      _ => (None: Option[Binary[F]]).pure[F],
      txt => txt.text.map(c => Binary.text[F]("mail.txt", c).some),
      html => html.html.map(c => Binary.html[F]("mail.html", c).some),
      both => both.html.map(c => Binary.html[F]("mail.html", c).some)
    )

    Stream.eval(
      logger.debug(
        s"E-mail has ${mail.attachments.size} attachments and ${bodyType(mail.body)}"
      )
    ) >>
      (Stream
        .eval(bodyEntry)
        .flatMap(e => Stream.emits(e.toSeq)) ++
        Stream
          .emits(mail.attachments.all)
          .map(a =>
            Binary(a.filename.getOrElse("noname"), a.mimeType.toDocspell, a.content)
          ))
  }

  implicit class MimeTypeConv(m: emil.MimeType) {
    def toDocspell: MimeType =
      MimeType(m.primary, m.sub)
  }

  private def bodyType[F[_]](body: MailBody[F]): String =
    body.fold(
      _ => "empty-body",
      _ => "text-body",
      _ => "html-body",
      _ => "text-and-html-body"
    )
}
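A usage sketch (the wrapper function and its name are assumptions; a `Logger[IO]` must be available):

```scala
import cats.effect.IO
import fs2.Stream
import docspell.common.{Binary, Logger}

// Splits a raw RFC 822 message into entries: at most one body part
// (mail.txt or mail.html), followed by one Binary per attachment.
def mailParts(raw: String, logger: Logger[IO]): Stream[IO, Binary[IO]] =
  Stream
    .emits(raw.getBytes("UTF-8").toSeq)
    .covary[IO]
    .through(ReadMail.readBytesP[IO](logger))
```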

View File

@ -64,7 +64,7 @@ object CreateItem {
} yield ItemData(it, fm, Vector.empty, Vector.empty, fm.map(a => a.id -> a.fileId).toMap)
}
def insertAttachment[F[_]: Sync](ctx: Context[F, ProcessItemArgs])(ra: RAttachment): F[Int] = {
def insertAttachment[F[_]: Sync](ctx: Context[F, _])(ra: RAttachment): F[Int] = {
val rs = RAttachmentSource.of(ra)
ctx.store.transact(for {
n <- RAttachment.insert(ra)

View File

@ -0,0 +1,201 @@
package docspell.joex.process

import bitpeace.{Mimetype, MimetypeHint, RangeDef}
import cats.Functor
import cats.data.OptionT
import cats.effect._
import cats.implicits._
import fs2.Stream

import docspell.common._
import docspell.joex.mail._
import docspell.joex.scheduler._
import docspell.store.records._
import docspell.files.Zip
import cats.kernel.Monoid

/** Goes through all attachments and extracts archive files, like zip
  * files. The process is recursive, until all archives have been
  * extracted.
  *
  * The archive file is stored as an `attachment_archive` record that
  * references all its elements. If there are inner archives, only the
  * outer archive file is preserved.
  *
  * This step assumes an existing premature item; it traverses its
  * attachments.
  */
object ExtractArchive {

  def apply[F[_]: ConcurrentEffect: ContextShift](
      item: ItemData
  ): Task[F, ProcessItemArgs, ItemData] =
    multiPass(item, None).map(_._2)

  /** Repeats single passes until a pass detects no archive. */
  def multiPass[F[_]: ConcurrentEffect: ContextShift](
      item: ItemData,
      archive: Option[RAttachmentArchive]
  ): Task[F, ProcessItemArgs, (Option[RAttachmentArchive], ItemData)] =
    singlePass(item, archive).flatMap { t =>
      if (t._1 == None) Task.pure(t)
      else multiPass(t._2, t._1)
    }

  def singlePass[F[_]: ConcurrentEffect: ContextShift](
      item: ItemData,
      archive: Option[RAttachmentArchive]
  ): Task[F, ProcessItemArgs, (Option[RAttachmentArchive], ItemData)] =
    Task { ctx =>
      def extract(ra: RAttachment) =
        findMime(ctx)(ra).flatMap(m => extractSafe(ctx, archive)(ra, m))

      for {
        ras <- item.attachments.traverse(extract)
        nra = ras.flatMap(_.files).zipWithIndex.map(t => t._1.copy(position = t._2))
        _ <- nra.traverse(storeAttachment(ctx))
        naa = ras.flatMap(_.archives)
        _ <- naa.traverse(storeArchive(ctx))
      } yield naa.headOption -> item.copy(
        attachments = nra,
        originFile = item.originFile ++ nra.map(a => a.id -> a.fileId).toMap
      )
    }

  def findMime[F[_]: Functor](ctx: Context[F, _])(ra: RAttachment): F[Mimetype] =
    OptionT(ctx.store.transact(RFileMeta.findById(ra.fileId)))
      .map(_.mimetype)
      .getOrElse(Mimetype.`application/octet-stream`)

  def extractSafe[F[_]: ConcurrentEffect: ContextShift](
      ctx: Context[F, ProcessItemArgs],
      archive: Option[RAttachmentArchive]
  )(ra: RAttachment, mime: Mimetype): F[Extracted] =
    mime match {
      case Mimetype.`application/zip` if ra.name.exists(_.endsWith(".zip")) =>
        ctx.logger.info(s"Extracting zip archive ${ra.name.getOrElse("<noname>")}.") *>
          extractZip(ctx, archive)(ra)
            .flatTap(_ => cleanupParents(ctx, ra, archive))
      case Mimetype("message", "rfc822", _) =>
        ctx.logger.info(s"Reading e-mail ${ra.name.getOrElse("<noname>")}") *>
          extractMail(ctx, archive)(ra)
            .flatTap(_ => cleanupParents(ctx, ra, archive))
      case _ =>
        ctx.logger.debug(s"Not an archive: ${mime.asString}") *>
          Extracted.noArchive(ra).pure[F]
    }

  def cleanupParents[F[_]: Sync](
      ctx: Context[F, _],
      ra: RAttachment,
      archive: Option[RAttachmentArchive]
  ): F[Unit] =
    archive match {
      case Some(_) =>
        for {
          _ <- ctx.logger.debug(
            s"Extracted inner attachment ${ra.name}. Remove it completely."
          )
          _ <- ctx.store.transact(RAttachmentArchive.delete(ra.id))
          _ <- ctx.store.transact(RAttachment.delete(ra.id))
          _ <- ctx.store.bitpeace.delete(ra.fileId.id).compile.drain
        } yield ()
      case None =>
        for {
          _ <- ctx.logger.debug(
            s"Extracted attachment ${ra.name}. Remove it from the item."
          )
          _ <- ctx.store.transact(RAttachment.delete(ra.id))
        } yield ()
    }

  def extractZip[F[_]: ConcurrentEffect: ContextShift](
      ctx: Context[F, _],
      archive: Option[RAttachmentArchive]
  )(ra: RAttachment): F[Extracted] = {
    val zipData = ctx.store.bitpeace
      .get(ra.fileId.id)
      .unNoneTerminate
      .through(ctx.store.bitpeace.fetchData2(RangeDef.all))

    zipData
      .through(Zip.unzipP[F](8192, ctx.blocker))
      .flatMap(handleEntry(ctx, ra, archive))
      .foldMonoid
      .compile
      .lastOrError
  }

  def extractMail[F[_]: Sync](
      ctx: Context[F, _],
      archive: Option[RAttachmentArchive]
  )(ra: RAttachment): F[Extracted] = {
    val email = ctx.store.bitpeace
      .get(ra.fileId.id)
      .unNoneTerminate
      .through(ctx.store.bitpeace.fetchData2(RangeDef.all))

    email
      .through(ReadMail.readBytesP[F](ctx.logger))
      .flatMap(handleEntry(ctx, ra, archive))
      .foldMonoid
      .compile
      .lastOrError
  }

  def handleEntry[F[_]: Sync](
      ctx: Context[F, _],
      ra: RAttachment,
      archive: Option[RAttachmentArchive]
  )(
      entry: Binary[F]
  ): Stream[F, Extracted] = {
    val mimeHint = MimetypeHint.filename(entry.name).withAdvertised(entry.mime.asString)
    val fileMeta = ctx.store.bitpeace.saveNew(entry.data, 8192, mimeHint)

    Stream.eval(ctx.logger.debug(s"Extracted ${entry.name}. Storing as attachment.")) >>
      fileMeta.evalMap { fm =>
        Ident.randomId.map { id =>
          val nra = RAttachment(
            id,
            ra.itemId,
            Ident.unsafe(fm.id),
            0, //position is updated afterwards
            ra.created,
            Option(entry.name).map(_.trim).filter(_.nonEmpty)
          )
          val aa = archive.getOrElse(RAttachmentArchive.of(ra)).copy(id = id)
          Extracted.of(nra, aa)
        }
      }
  }

  def storeAttachment[F[_]: Sync](ctx: Context[F, _])(ra: RAttachment): F[Int] = {
    val insert = CreateItem.insertAttachment(ctx)(ra)
    for {
      n1 <- ctx.store.transact(RAttachment.updatePosition(ra.id, ra.position))
      n2 <- if (n1 > 0) 0.pure[F] else insert
    } yield n1 + n2
  }

  def storeArchive[F[_]: Sync](ctx: Context[F, _])(aa: RAttachmentArchive): F[Int] =
    ctx.store.transact(RAttachmentArchive.insert(aa))

  case class Extracted(files: Vector[RAttachment], archives: Vector[RAttachmentArchive]) {
    def ++(e: Extracted) =
      Extracted(files ++ e.files, archives ++ e.archives)
  }

  object Extracted {
    val empty = Extracted(Vector.empty, Vector.empty)

    def noArchive(ra: RAttachment): Extracted =
      Extracted(Vector(ra), Vector.empty)

    def of(ra: RAttachment, aa: RAttachmentArchive): Extracted =
      Extracted(Vector(ra), Vector(aa))

    implicit val extractedMonoid: Monoid[Extracted] =
      Monoid.instance(empty, _ ++ _)
  }
}

View File

@ -1,7 +1,7 @@
package docspell.joex.process
import cats.implicits._
import cats.effect.{ContextShift, Sync}
import cats.effect._
import docspell.common.{ItemState, ProcessItemArgs}
import docspell.joex.Config
import docspell.joex.scheduler.{Context, Task}
@ -12,7 +12,7 @@ object ItemHandler {
def onCancel[F[_]: Sync: ContextShift]: Task[F, ProcessItemArgs, Unit] =
logWarn("Now cancelling. Deleting potentially created data.").flatMap(_ => deleteByFileIds)
def apply[F[_]: Sync: ContextShift](cfg: Config): Task[F, ProcessItemArgs, Unit] =
def apply[F[_]: ConcurrentEffect: ContextShift](cfg: Config): Task[F, ProcessItemArgs, Unit] =
CreateItem[F]
.flatMap(itemStateTask(ItemState.Processing))
.flatMap(safeProcess[F](cfg))
@ -27,7 +27,7 @@ object ItemHandler {
last = ctx.config.retries == current.getOrElse(0)
} yield last
def safeProcess[F[_]: Sync: ContextShift](
def safeProcess[F[_]: ConcurrentEffect: ContextShift](
cfg: Config
)(data: ItemData): Task[F, ProcessItemArgs, ItemData] =
Task(isLastRetry[F, ProcessItemArgs] _).flatMap {

View File

@ -1,23 +1,20 @@
package docspell.joex.process
import cats.effect.{ContextShift, Sync}
import cats.effect._
import docspell.common.ProcessItemArgs
import docspell.joex.scheduler.Task
import docspell.joex.Config
object ProcessItem {
def apply[F[_]: Sync: ContextShift](
def apply[F[_]: ConcurrentEffect: ContextShift](
cfg: Config
)(item: ItemData): Task[F, ProcessItemArgs, ItemData] =
ConvertPdf(cfg.convert, item)
ExtractArchive(item)
.flatMap(ConvertPdf(cfg.convert, _))
.flatMap(TextExtraction(cfg.extraction, _))
.flatMap(Task.setProgress(25))
.flatMap(TextAnalysis[F])
.flatMap(Task.setProgress(50))
.flatMap(FindProposal[F])
.flatMap(EvalProposals[F])
.flatMap(SaveProposals[F])
.flatMap(analysisOnly[F])
.flatMap(Task.setProgress(75))
.flatMap(LinkProposal[F])
.flatMap(Task.setProgress(99))

View File

@ -15,7 +15,8 @@ immediately as long as there are enough resources.
What is missing is a component that maintains periodic tasks. The
reason for this is to have housekeeping tasks that run regularly and
clean up stale or unused data. Later, users should be able to create
periodic tasks, for example to read e-mails from an inbox.
periodic tasks, for example to read e-mails from an inbox or to be
notified of due items.
The problem, again, is that it must work with multiple job executor
instances running at the same time. This is the same pattern as with
@ -38,14 +39,16 @@ For internal housekeeping tasks, it may suffice to reuse the existing
`job` queue by adding more fields such that a job may be considered
periodic. But this conflates what the `Scheduler` is doing now
(executing tasks as soon as possible while being bound to some
resources) with a completely different subject.
resource limits) with a completely different subject.
There will be a new `PeriodicScheduler` that works on a new table in
the database that represents periodic tasks. This table will
share fields with the `job` table to be able to create `RJob`
instances. This new component is only taking care of periodically
submitting jobs to the job queue such that the `Scheduler` will
eventually pick it up and run it.
share fields with the `job` table to be able to create `RJob` records.
This new component only takes care of periodically submitting jobs to
the job queue such that the `Scheduler` will eventually pick them up
and run them. If the tasks cannot run (for example due to resource
limitations), the periodic scheduler can do nothing but wait and try
again next time.
```sql
CREATE TABLE "periodic_task" (
@ -65,11 +68,11 @@ CREATE TABLE "periodic_task" (
);
```
Preparing for other features, periodic tasks will be created by users.
It should be possible to disable/enable them. The next 6 properties
are needed to insert jobs into the `job` table. The `worker` field
(and `marked`) are used to mark a periodic job as "being worked on by
a job executor".
Preparing for other features, at some point periodic tasks will be
created by users. It should be possible to disable/enable them. The
next 6 properties are needed to insert jobs into the `job` table. The
`worker` and `marked` fields are used to mark a periodic job as
"being worked on by a job executor".
The `timer` is the schedule, which is a
[systemd-like](https://man.cx/systemd.time#heading7) calendar event
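To make this concrete, here is a minimal sketch of one pass of such a scheduler. Every name in it (`PeriodicTaskStore`, `tryMark`, `toJob`) is an illustrative assumption, not the final API:

```scala
import cats.effect.Sync
import cats.implicits._

// Hypothetical interfaces, for illustration only.
trait Job
trait PeriodicTask { def toJob: Job }
trait PeriodicTaskStore[F[_]] {
  def findDue: F[List[PeriodicTask]]                          // tasks whose `timer` elapsed
  def tryMark(task: PeriodicTask, worker: String): F[Boolean] // set `worker`/`marked` atomically
}
trait JobQueue[F[_]] { def insert(job: Job): F[Unit] }

// One pass: submit each due task at most once, even when several
// job executors run this loop concurrently.
def submitDue[F[_]: Sync](
    store: PeriodicTaskStore[F],
    queue: JobQueue[F],
    worker: String
): F[Unit] =
  store.findDue.flatMap(_.traverse_ { task =>
    store.tryMark(task, worker).flatMap {
      case true  => queue.insert(task.toJob) // this executor won the race
      case false => ().pure[F]               // another executor marked it first
    }
  })
```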

View File

@ -0,0 +1,44 @@
---
layout: docs
title: Archive Files
---
# {{ page.title }}
## Context and Problem Statement
Docspell should have support for files that contain the actual files
that matter, like zip files and other containers. It should extract
their contents automatically.
Since docspell should never drop or modify user data, the archive file
must be present in the database. And it must be possible to download
the file unmodified.
On the other hand, the files inside an archive need to be analysed
for text and converted to pdf files.
## Decision Outcome
There is currently a table `attachment_source` which holds references
to "original" files. These are the files as uploaded by the user,
before converted to pdf. Archive files add a subtlety to this: in case
of an archive, an `attachment_source` is the original (non-archive)
file inside an archive.
The archive file itself will be stored in a separate table `attachment_archive`.
Example: uploading a `files.zip` ZIP file containing `report.jpg`:
- `attachment_source`: report.jpg
- `attachment`: report.pdf
- `attachment_archive`: files.zip
Archives may contain other archives. In that case the inner archives
are not saved. The archive file is extracted recursively, until no
known archive file is found.
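This recursion is implemented in `ExtractArchive` in this commit; condensed, with comments, it looks like this:

```scala
// Condensed from ExtractArchive (this commit): one pass extracts every
// archive among the item's attachments; repeat until a pass finds none.
def multiPass[F[_]: ConcurrentEffect: ContextShift](
    item: ItemData,
    archive: Option[RAttachmentArchive]
): Task[F, ProcessItemArgs, (Option[RAttachmentArchive], ItemData)] =
  singlePass(item, archive).flatMap { t =>
    if (t._1 == None) Task.pure(t) // no archive found: done
    else multiPass(t._2, t._1)     // found one: scan the newly added files
  }
```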
## Initial Support
Initial support is implemented for ZIP and EML (e-mail) files.

View File

@ -25,6 +25,15 @@ compete on getting the next job from the queue. After a job finishes
and no job is waiting in the queue, joex will sleep until notified
again. It will also periodically notify itself as a fallback.
## Task vs Job
Just for the sake of this document, a task denotes the code that has
to be executed, or the thing that has to be done. A task becomes a
job once it is submitted into the queue, from where it will eventually
be picked up and executed. A job maintains state and other bookkeeping
data, while a task is just code.
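As a toy illustration only (these are not docspell's actual types; the real `Task` in this commit is `Task[F, Args, A]`):

```scala
// Toy types, for illustration: a task is just code, while a job wraps
// a submitted task with queue-managed state.
final case class TaskCode[F[_], A](run: A => F[Unit])

final case class QueuedJob[A](
    id: String,
    args: A,       // input for the task, stored with the queue entry
    state: String, // e.g. "waiting", "running", "done", "failed"
    retries: Int
)
```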
## Scheduler and Queue
The scheduler is the part that runs and monitors the long running
@ -115,6 +124,15 @@ reach a joex component. This periodic wakeup is just to ensure that
jobs are eventually run.
## Periodic Tasks
The job executor can execute tasks periodically. These tasks are
stored in the database such that they can be submitted into the job
queue. Multiple job executors can run at once, but only one ever works
on a given task, so a periodic task is never submitted twice. It is
also not submitted if a previous run has not finished yet.
## Starting on demand
The job executor and rest server can be started multiple times. This
@ -129,6 +147,7 @@ all have unique `app-id`s.
Once the files have been processed you can stop the additional
executors.
## Shutting down
If a job executor is sleeping and not executing any jobs, you can just

View File

@ -28,6 +28,9 @@ title: Features and Limitations
- Images (jpg, png, tiff)
- HTML
- text/* (treated as Markdown)
- zip
- [eml](https://en.wikipedia.org/wiki/Email#Filename_extensions)
(e-mail files in plain text MIME)
- Tools:
- Watch a folder: watch folders for changes and send files to docspell
- Firefox plugin: right click on a link and send the file to docspell

View File

@ -0,0 +1,8 @@
CREATE TABLE `attachment_archive` (
`id` varchar(254) not null primary key,
`file_id` varchar(254) not null,
`filename` varchar(254),
`created` timestamp not null,
foreign key (`file_id`) references `filemeta`(`id`),
foreign key (`id`) references `attachment`(`attachid`)
);

View File

@ -0,0 +1,8 @@
CREATE TABLE "attachment_archive" (
"id" varchar(254) not null primary key,
"file_id" varchar(254) not null,
"filename" varchar(254),
"created" timestamp not null,
foreign key ("file_id") references "filemeta"("id"),
foreign key ("id") references "attachment"("attachid")
);

View File

@ -13,10 +13,10 @@ object FlywayMigrate {
val locations = jdbc.dbmsName match {
case Some(dbtype) =>
val name = if (dbtype == "h2") "postgresql" else dbtype
List("classpath:db/migration/common", s"classpath:db/migration/${name}")
List(s"classpath:db/migration/${name}")
case None =>
logger.warn(s"Cannot read database name from jdbc url: ${jdbc.url}. Go with H2")
List("classpath:db/migration/common", "classpath:db/h2")
logger.warn(s"Cannot read database name from jdbc url: ${jdbc.url}. Go with PostgreSQL")
List("classpath:db/postgresql")
}
logger.info(s"Using migration locations: $locations")

View File

@ -3,14 +3,17 @@ package docspell.store.queries
import fs2.Stream
import cats.implicits._
import cats.effect.Sync
import cats.data.OptionT
import doobie._
import doobie.implicits._
import docspell.common.{Ident, MetaProposalList}
import docspell.store.Store
import docspell.store.impl.Implicits._
import docspell.store.records.{RAttachment, RAttachmentMeta, RAttachmentSource, RItem}
import docspell.store.records._
import docspell.common.syntax.all._
object QAttachment {
private[this] val logger = org.log4s.getLogger
def deleteById[F[_]: Sync](store: Store[F])(attachId: Ident, coll: Ident): F[Int] =
for {
@ -20,9 +23,12 @@ object QAttachment {
rsFile <- store
.transact(RAttachmentSource.findByIdAndCollective(attachId, coll))
.map(_.map(_.fileId))
aaFile <- store
.transact(RAttachmentArchive.findByIdAndCollective(attachId, coll))
.map(_.map(_.fileId))
n <- store.transact(RAttachment.delete(attachId))
f <- Stream
.emits(raFile.toSeq ++ rsFile.toSeq)
.emits(raFile.toSeq ++ rsFile.toSeq ++ aaFile.toSeq)
.map(_.id)
.flatMap(store.bitpeace.delete)
.map(flag => if (flag) 1 else 0)
@ -32,20 +38,45 @@ object QAttachment {
def deleteAttachment[F[_]: Sync](store: Store[F])(ra: RAttachment): F[Int] =
for {
_ <- logger.fdebug[F](s"Deleting attachment: ${ra.id.id}")
s <- store.transact(RAttachmentSource.findById(ra.id))
n <- store.transact(RAttachment.delete(ra.id))
_ <- logger.fdebug[F](
s"Deleted $n meta records (source, meta, archive). Deleting binaries now."
)
f <- Stream
.emits(ra.fileId.id +: s.map(_.fileId.id).toSeq)
.emits(ra.fileId.id +: (s.map(_.fileId.id).toSeq))
.flatMap(store.bitpeace.delete)
.map(flag => if (flag) 1 else 0)
.compile
.foldMonoid
} yield n + f
def deleteItemAttachments[F[_]: Sync](store: Store[F])(itemId: Ident, coll: Ident): F[Int] =
def deleteArchive[F[_]: Sync](store: Store[F])(attachId: Ident): F[Int] = {
(for {
aa <- OptionT(store.transact(RAttachmentArchive.findById(attachId)))
n <- OptionT.liftF(store.transact(RAttachmentArchive.deleteAll(aa.fileId)))
_ <- OptionT.liftF(
Stream
.emit(aa.fileId.id)
.flatMap(store.bitpeace.delete)
.compile
.drain
)
} yield n).getOrElse(0)
}
def deleteItemAttachments[F[_]: Sync](
store: Store[F]
)(itemId: Ident, coll: Ident): F[Int] =
for {
ras <- store.transact(RAttachment.findByItemAndCollective(itemId, coll))
ns <- ras.traverse(deleteAttachment[F](store))
_ <- logger.finfo[F](
s"Have ${ras.size} attachments to delete. Must first delete archive entries"
)
a <- ras.traverse(a => deleteArchive(store)(a.id))
_ <- logger.fdebug[F](s"Deleted ${a.sum} archive entries")
ns <- ras.traverse(deleteAttachment[F](store))
} yield ns.sum
def getMetaProposals(itemId: Ident, coll: Ident): ConnectionIO[MetaProposalList] = {
@ -56,8 +87,12 @@ object QAttachment {
val q = fr"SELECT" ++ MC.proposals
.prefix("m")
.f ++ fr"FROM" ++ RAttachmentMeta.table ++ fr"m" ++
fr"INNER JOIN" ++ RAttachment.table ++ fr"a ON" ++ AC.id.prefix("a").is(MC.id.prefix("m")) ++
fr"INNER JOIN" ++ RItem.table ++ fr"i ON" ++ AC.itemId.prefix("a").is(IC.id.prefix("i")) ++
fr"INNER JOIN" ++ RAttachment.table ++ fr"a ON" ++ AC.id
.prefix("a")
.is(MC.id.prefix("m")) ++
fr"INNER JOIN" ++ RItem.table ++ fr"i ON" ++ AC.itemId
.prefix("a")
.is(IC.id.prefix("i")) ++
fr"WHERE" ++ and(AC.itemId.prefix("a").is(itemId), IC.cid.prefix("i").is(coll))
for {
@ -73,14 +108,18 @@ object QAttachment {
val MC = RAttachmentMeta.Columns
val IC = RItem.Columns
val q = fr"SELECT" ++ commas(MC.all.map(_.prefix("m").f)) ++ fr"FROM" ++ RItem.table ++ fr"i" ++
fr"INNER JOIN" ++ RAttachment.table ++ fr"a ON" ++ IC.id
.prefix("i")
.is(AC.itemId.prefix("a")) ++
fr"INNER JOIN" ++ RAttachmentMeta.table ++ fr"m ON" ++ AC.id
.prefix("a")
.is(MC.id.prefix("m")) ++
fr"WHERE" ++ and(AC.id.prefix("a").is(attachId), IC.cid.prefix("i").is(collective))
val q =
fr"SELECT" ++ commas(MC.all.map(_.prefix("m").f)) ++ fr"FROM" ++ RItem.table ++ fr"i" ++
fr"INNER JOIN" ++ RAttachment.table ++ fr"a ON" ++ IC.id
.prefix("i")
.is(AC.itemId.prefix("a")) ++
fr"INNER JOIN" ++ RAttachmentMeta.table ++ fr"m ON" ++ AC.id
.prefix("a")
.is(MC.id.prefix("m")) ++
fr"WHERE" ++ and(
AC.id.prefix("a").is(attachId),
IC.cid.prefix("i").is(collective)
)
q.query[RAttachmentMeta].option
}

View File

@ -39,6 +39,9 @@ object QCollective {
union distinct
select a.file_id,m.length from attachment_source a
inner join filemeta m on m.id = a.file_id where a.id in (select aid from attachs)
union distinct
select a.file_id,m.length from attachment_archive a
inner join filemeta m on m.id = a.file_id where a.id in (select aid from attachs)
) as t""".query[Option[Long]].unique
val q3 = fr"SELECT" ++ commas(

View File

@ -40,7 +40,11 @@ object QItem {
val ICC = List(RItem.Columns.id, RItem.Columns.name).map(_.prefix("ref"))
val cq =
selectSimple(IC ++ OC ++ P0C ++ P1C ++ EC ++ ICC, RItem.table ++ fr"i", Fragment.empty) ++
selectSimple(
IC ++ OC ++ P0C ++ P1C ++ EC ++ ICC,
RItem.table ++ fr"i",
Fragment.empty
) ++
fr"LEFT JOIN" ++ ROrganization.table ++ fr"o ON" ++ RItem.Columns.corrOrg
.prefix("i")
.is(ROrganization.Columns.oid.prefix("o")) ++
@ -179,7 +183,11 @@ object QItem {
// inclusive tags are AND-ed
val tagSelectsIncl = q.tagsInclude
.map(tid =>
selectSimple(List(RTagItem.Columns.itemId), RTagItem.table, RTagItem.Columns.tagId.is(tid))
selectSimple(
List(RTagItem.Columns.itemId),
RTagItem.table,
RTagItem.Columns.tagId.is(tid)
)
)
.map(f => sql"(" ++ f ++ sql") ")
@ -207,21 +215,28 @@ object QItem {
REquipment.Columns.eid.prefix("e1").isOrDiscard(q.concEquip),
if (q.tagsInclude.isEmpty) Fragment.empty
else
IC.id.prefix("i") ++ sql" IN (" ++ tagSelectsIncl.reduce(_ ++ fr"INTERSECT" ++ _) ++ sql")",
IC.id.prefix("i") ++ sql" IN (" ++ tagSelectsIncl
.reduce(_ ++ fr"INTERSECT" ++ _) ++ sql")",
if (q.tagsExclude.isEmpty) Fragment.empty
else IC.id.prefix("i").f ++ sql" NOT IN (" ++ tagSelectsExcl ++ sql")",
q.dateFrom
.map(d => coalesce(IC.itemDate.prefix("i").f, IC.created.prefix("i").f) ++ fr">= $d")
.map(d =>
coalesce(IC.itemDate.prefix("i").f, IC.created.prefix("i").f) ++ fr">= $d"
)
.getOrElse(Fragment.empty),
q.dateTo
.map(d => coalesce(IC.itemDate.prefix("i").f, IC.created.prefix("i").f) ++ fr"<= $d")
.map(d =>
coalesce(IC.itemDate.prefix("i").f, IC.created.prefix("i").f) ++ fr"<= $d"
)
.getOrElse(Fragment.empty),
q.dueDateFrom.map(d => IC.dueDate.prefix("i").isGt(d)).getOrElse(Fragment.empty),
q.dueDateTo.map(d => IC.dueDate.prefix("i").isLt(d)).getOrElse(Fragment.empty)
)
val order = orderBy(coalesce(IC.itemDate.prefix("i").f, IC.created.prefix("i").f) ++ fr"DESC")
val frag = query ++ fr"WHERE" ++ cond ++ order
val order = orderBy(
coalesce(IC.itemDate.prefix("i").f, IC.created.prefix("i").f) ++ fr"DESC"
)
val frag = query ++ fr"WHERE" ++ cond ++ order
logger.trace(s"List items: $frag")
frag.query[ListItem].stream
}
@ -247,25 +262,39 @@ object QItem {
}
def findByChecksum(checksum: String, collective: Ident): ConnectionIO[Vector[RItem]] = {
val IC = RItem.Columns.all.map(_.prefix("i"))
val aItem = RAttachment.Columns.itemId.prefix("a")
val aId = RAttachment.Columns.id.prefix("a")
val aFileId = RAttachment.Columns.fileId.prefix("a")
val iId = RItem.Columns.id.prefix("i")
val iColl = RItem.Columns.cid.prefix("i")
val sId = RAttachmentSource.Columns.id.prefix("s")
val sFileId = RAttachmentSource.Columns.fileId.prefix("s")
val m1Id = RFileMeta.Columns.id.prefix("m1")
val m2Id = RFileMeta.Columns.id.prefix("m2")
val IC = RItem.Columns.all.map(_.prefix("i"))
val aItem = RAttachment.Columns.itemId.prefix("a")
val aId = RAttachment.Columns.id.prefix("a")
val aFileId = RAttachment.Columns.fileId.prefix("a")
val iId = RItem.Columns.id.prefix("i")
val iColl = RItem.Columns.cid.prefix("i")
val sId = RAttachmentSource.Columns.id.prefix("s")
val sFileId = RAttachmentSource.Columns.fileId.prefix("s")
val rId = RAttachmentArchive.Columns.id.prefix("r")
val rFileId = RAttachmentArchive.Columns.fileId.prefix("r")
val m1Id = RFileMeta.Columns.id.prefix("m1")
val m2Id = RFileMeta.Columns.id.prefix("m2")
val m3Id = RFileMeta.Columns.id.prefix("m3")
val m1Checksum = RFileMeta.Columns.checksum.prefix("m1")
val m2Checksum = RFileMeta.Columns.checksum.prefix("m2")
val m3Checksum = RFileMeta.Columns.checksum.prefix("m3")
val from = RItem.table ++ fr"i INNER JOIN" ++ RAttachment.table ++ fr"a ON" ++ aItem.is(iId) ++
fr"INNER JOIN" ++ RAttachmentSource.table ++ fr"s ON" ++ aId.is(sId) ++
fr"INNER JOIN" ++ RFileMeta.table ++ fr"m1 ON" ++ m1Id.is(aFileId) ++
fr"INNER JOIN" ++ RFileMeta.table ++ fr"m2 ON" ++ m2Id.is(sFileId)
selectSimple(IC, from, and(or(m1Checksum.is(checksum), m2Checksum.is(checksum)), iColl.is(collective)))
.query[RItem]
val from =
RItem.table ++ fr"i INNER JOIN" ++ RAttachment.table ++ fr"a ON" ++ aItem.is(iId) ++
fr"INNER JOIN" ++ RAttachmentSource.table ++ fr"s ON" ++ aId.is(sId) ++
fr"INNER JOIN" ++ RFileMeta.table ++ fr"m1 ON" ++ m1Id.is(aFileId) ++
fr"INNER JOIN" ++ RFileMeta.table ++ fr"m2 ON" ++ m2Id.is(sFileId) ++
fr"LEFT OUTER JOIN" ++ RAttachmentArchive.table ++ fr"r ON" ++ aId.is(rId) ++
fr"INNER JOIN" ++ RFileMeta.table ++ fr"m3 ON" ++ m3Id.is(rFileId)
selectSimple(
IC,
from,
and(
or(m1Checksum.is(checksum), m2Checksum.is(checksum), m3Checksum.is(checksum)),
iColl.is(collective)
)
).query[RItem]
.to[Vector]
}

View File

@ -41,6 +41,9 @@ object RAttachment {
def updateFileIdAndName(attachId: Ident, fId: Ident, fname: Option[String]): ConnectionIO[Int] =
updateRow(table, id.is(attachId), commas(fileId.setTo(fId), name.setTo(fname))).update.run
def updatePosition(attachId: Ident, pos: Int): ConnectionIO[Int] =
updateRow(table, id.is(attachId), position.setTo(pos)).update.run
def findById(attachId: Ident): ConnectionIO[Option[RAttachment]] =
selectSimple(all, table, id.is(attachId)).query[RAttachment].option

View File

@ -0,0 +1,90 @@
package docspell.store.records

import bitpeace.FileMeta
import doobie._
import doobie.implicits._

import docspell.common._
import docspell.store.impl._
import docspell.store.impl.Implicits._

/** The archive file of some attachment. The `id` is shared with the
  * attachment, to create a 0..1-1 relationship.
  */
case class RAttachmentArchive(
    id: Ident, //same as RAttachment.id
    fileId: Ident,
    name: Option[String],
    created: Timestamp
)

object RAttachmentArchive {

  val table = fr"attachment_archive"

  object Columns {
    val id      = Column("id")
    val fileId  = Column("file_id")
    val name    = Column("filename")
    val created = Column("created")

    val all = List(id, fileId, name, created)
  }

  import Columns._

  def of(ra: RAttachment): RAttachmentArchive =
    RAttachmentArchive(ra.id, ra.fileId, ra.name, ra.created)

  def insert(v: RAttachmentArchive): ConnectionIO[Int] =
    insertRow(table, all, fr"${v.id},${v.fileId},${v.name},${v.created}").update.run

  def findById(attachId: Ident): ConnectionIO[Option[RAttachmentArchive]] =
    selectSimple(all, table, id.is(attachId)).query[RAttachmentArchive].option

  def delete(attachId: Ident): ConnectionIO[Int] =
    deleteFrom(table, id.is(attachId)).update.run

  def deleteAll(fId: Ident): ConnectionIO[Int] =
    deleteFrom(table, fileId.is(fId)).update.run

  def findByIdAndCollective(
      attachId: Ident,
      collective: Ident
  ): ConnectionIO[Option[RAttachmentArchive]] = {
    val bId   = RAttachment.Columns.id.prefix("b")
    val aId   = Columns.id.prefix("a")
    val bItem = RAttachment.Columns.itemId.prefix("b")
    val iId   = RItem.Columns.id.prefix("i")
    val iColl = RItem.Columns.cid.prefix("i")

    val from = table ++ fr"a INNER JOIN" ++
      RAttachment.table ++ fr"b ON" ++ aId.is(bId) ++
      fr"INNER JOIN" ++ RItem.table ++ fr"i ON" ++ bItem.is(iId)

    val where = and(aId.is(attachId), bId.is(attachId), iColl.is(collective))

    selectSimple(all.map(_.prefix("a")), from, where).query[RAttachmentArchive].option
  }

  def findByItemWithMeta(id: Ident): ConnectionIO[Vector[(RAttachmentArchive, FileMeta)]] = {
    import bitpeace.sql._

    val aId       = Columns.id.prefix("a")
    val afileMeta = fileId.prefix("a")
    val bPos      = RAttachment.Columns.position.prefix("b")
    val bId       = RAttachment.Columns.id.prefix("b")
    val bItem     = RAttachment.Columns.itemId.prefix("b")
    val mId       = RFileMeta.Columns.id.prefix("m")

    val cols = all.map(_.prefix("a")) ++ RFileMeta.Columns.all.map(_.prefix("m"))
    val from = table ++ fr"a INNER JOIN" ++
      RFileMeta.table ++ fr"m ON" ++ afileMeta.is(mId) ++ fr"INNER JOIN" ++
      RAttachment.table ++ fr"b ON" ++ aId.is(bId)
    val where = bItem.is(id)

    (selectSimple(cols, from, where) ++ orderBy(bPos.asc))
      .query[(RAttachmentArchive, FileMeta)]
      .to[Vector]
  }
}
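For context, a small sketch of how this record is used (assuming a configured `Store[IO]`, as elsewhere in this commit):

```scala
import cats.effect.IO
import docspell.store.Store
import docspell.store.records.{RAttachment, RAttachmentArchive}

// Record the archive an attachment was extracted from. The shared id
// creates the 0..1-1 relationship described above.
def recordArchive(store: Store[IO])(ra: RAttachment): IO[Int] =
  store.transact(RAttachmentArchive.insert(RAttachmentArchive.of(ra)))
```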