mirror of
https://github.com/TheAnachronism/docspell.git
synced 2025-04-04 10:29:34 +00:00
Check file integrity
This commit is contained in:
parent
422c0905dc
commit
c1ce0769eb
@ -92,7 +92,7 @@ object BackendApp {
|
||||
)
|
||||
notifyImpl <- ONotification(store, notificationMod)
|
||||
bookmarksImpl <- OQueryBookmarks(store)
|
||||
fileRepoImpl <- OFileRepository(queue, joexImpl)
|
||||
fileRepoImpl <- OFileRepository(store, queue, joexImpl)
|
||||
} yield new BackendApp[F] {
|
||||
val pubSub = pubSubT
|
||||
val login = loginImpl
|
||||
|
@ -15,6 +15,26 @@ import docspell.notification.api.PeriodicQueryArgs
|
||||
import docspell.store.records.RJob
|
||||
|
||||
object JobFactory extends MailAddressCodec {
|
||||
/** Builds (but does not enqueue) a job record for the file integrity check task.
  *
  * A fresh random job id and the current timestamp are obtained inside `F`. The job
  * is created with `Priority.High` and uses the submitter's collective and user.
  *
  * @param args
  *   the task arguments stored with the job
  * @param submitter
  *   the account the job is created for; defaults to the docspell system account
  * @return
  *   the new job record
  */
def integrityCheck[F[_]: Sync](
    args: FileIntegrityCheckArgs,
    submitter: AccountId = DocspellSystem.account
): F[RJob] =
  Ident.randomId[F].flatMap { jobId =>
    Timestamp.current[F].map { submitted =>
      RJob.newJob(
        jobId,
        FileIntegrityCheckArgs.taskName,
        submitter.collective,
        args,
        s"Check integrity of files",
        submitted,
        submitter.user,
        Priority.High,
        // NOTE(review): the task name is passed again as the last argument,
        // presumably as the job's tracker id; confirm against RJob.newJob.
        Some(FileIntegrityCheckArgs.taskName)
      )
    }
  }
|
||||
|
||||
def fileCopy[F[_]: Sync](
|
||||
args: FileCopyTaskArgs,
|
||||
submitter: AccountId = DocspellSystem.account
|
||||
|
@ -6,27 +6,41 @@
|
||||
|
||||
package docspell.backend.ops
|
||||
|
||||
import cats.data.OptionT
|
||||
import cats.effect._
|
||||
import cats.implicits._
|
||||
|
||||
import docspell.backend.JobFactory
|
||||
import docspell.common.FileCopyTaskArgs
|
||||
import docspell.backend.ops.OFileRepository.IntegrityResult
|
||||
import docspell.common._
|
||||
import docspell.store.Store
|
||||
import docspell.store.queue.JobQueue
|
||||
import docspell.store.records.RJob
|
||||
|
||||
import scodec.bits.ByteVector
|
||||
|
||||
/** Operations concerning the file repository. */
trait OFileRepository[F[_]] {

  /** Inserts a job for cloning the file repository.
    *
    * @param notifyJoex
    *   when true, all joex nodes are notified after the insert attempt
    * @return
    *   the inserted job, or `None` if such a job is already running
    */
  def cloneFileRepository(
      args: FileCopyTaskArgs,
      notifyJoex: Boolean
  ): F[Option[RJob]]

  /** Inserts a job that checks the integrity of the files selected by `part`.
    *
    * @param notifyJoex
    *   when true, all joex nodes are notified after the insert attempt
    * @return
    *   the inserted job, or `None` if the queue did not accept it as a new job
    */
  def checkIntegrityAll(
      part: FileKeyPart,
      notifyJoex: Boolean
  ): F[Option[RJob]]

  /** Checks a single file by comparing an expected hash with a freshly computed one.
    *
    * @param key
    *   the file to check
    * @param hash
    *   the expected hash; when absent the checksum stored in the file's metadata is
    *   used instead
    * @return
    *   the check result, or `None` when no expected hash was given and no metadata
    *   could be found for `key`
    */
  def checkIntegrity(
      key: FileKey,
      hash: Option[ByteVector]
  ): F[Option[IntegrityResult]]
}
|
||||
|
||||
object OFileRepository {
|
||||
|
||||
/** Outcome of checking one file: `ok` is true when the expected and the freshly computed hash are equal; `key` identifies the checked file. */
case class IntegrityResult(ok: Boolean, key: FileKey)
|
||||
|
||||
def apply[F[_]: Async](
|
||||
store: Store[F],
|
||||
queue: JobQueue[F],
|
||||
joex: OJoex[F]
|
||||
): Resource[F, OFileRepository[F]] =
|
||||
Resource.pure(new OFileRepository[F] {
|
||||
private[this] val logger = docspell.logging.getLogger[F]
|
||||
|
||||
def cloneFileRepository(
|
||||
args: FileCopyTaskArgs,
|
||||
notifyJoex: Boolean
|
||||
@ -36,5 +50,43 @@ object OFileRepository {
|
||||
flag <- queue.insertIfNew(job)
|
||||
_ <- if (notifyJoex) joex.notifyAllNodes else ().pure[F]
|
||||
} yield Option.when(flag)(job)
|
||||
|
||||
/** Creates an integrity-check job for `part`, inserts it if it is new and, when
  * requested, notifies all joex nodes. Yields the job only if it was inserted.
  */
def checkIntegrityAll(part: FileKeyPart, notifyJoex: Boolean): F[Option[RJob]] =
  JobFactory.integrityCheck(FileIntegrityCheckArgs(part)).flatMap { job =>
    queue.insertIfNew(job).flatMap { inserted =>
      // Notification happens regardless of whether the job was newly inserted.
      val notifyNodes = if (notifyJoex) joex.notifyAllNodes else ().pure[F]
      notifyNodes.as(Option.when(inserted)(job))
    }
  }
|
||||
|
||||
/** Compares the expected hash of a file with the SHA-256 of its current bytes.
  *
  * The expected hash is `hash` if given, otherwise the checksum from the file's
  * stored metadata. If neither is available the result is `None` and the file's
  * bytes are not read.
  */
def checkIntegrity(
    key: FileKey,
    hash: Option[ByteVector]
): F[Option[IntegrityResult]] = {
  // Supplied hash wins; otherwise fall back to the stored checksum (may be absent).
  val expected: OptionT[F, ByteVector] =
    hash match {
      case Some(given) => OptionT.pure[F](given)
      case None        => OptionT(store.fileRepo.findMeta(key)).map(_.checksum)
    }

  // Streams the file through SHA-256 and collects the digest bytes.
  val computeActual: F[ByteVector] =
    logger.debugWith(s"Calculating new hash for $key")(_.data("fileKey", key)) *>
      store.fileRepo
        .getBytes(key)
        .through(fs2.hash.sha256)
        .compile
        .foldChunks(ByteVector.empty)(_ ++ _.toByteVector)

  def report(outcome: IntegrityResult): F[Unit] =
    if (outcome.ok) logger.debug(s"File hashes match for $key")
    else logger.warnWith(s"File hashes differ for: $key")(_.data("fileKey", key))

  val check: OptionT[F, IntegrityResult] =
    for {
      _ <- OptionT.liftF(
        logger.debugWith(s"Checking file $key")(_.data("fileKey", key))
      )
      expectedHash <- expected
      actualHash <- OptionT.liftF(computeActual)
      outcome = IntegrityResult(expectedHash == actualHash, key)
      _ <- OptionT.liftF(report(outcome))
    } yield outcome

  check.value
}
|
||||
})
|
||||
}
|
||||
|
@ -0,0 +1,22 @@
|
||||
/*
|
||||
* Copyright 2020 Eike K. & Contributors
|
||||
*
|
||||
* SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
*/
|
||||
|
||||
package docspell.common
|
||||
|
||||
import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder}
|
||||
import io.circe.{Decoder, Encoder}
|
||||
|
||||
/** Arguments for the file integrity check task.
  *
  * @param pattern
  *   selects the files that should be checked
  */
final case class FileIntegrityCheckArgs(pattern: FileKeyPart)

object FileIntegrityCheckArgs {

  /** Name of the task that consumes these arguments. */
  val taskName: Ident = Ident.unsafe("all-file-integrity-check")

  implicit val jsonEncoder: Encoder[FileIntegrityCheckArgs] =
    deriveEncoder[FileIntegrityCheckArgs]

  implicit val jsonDecoder: Decoder[FileIntegrityCheckArgs] =
    deriveDecoder[FileIntegrityCheckArgs]
}
|
@ -9,7 +9,10 @@ package docspell.common
|
||||
import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder}
|
||||
import io.circe.{Decoder, Encoder}
|
||||
|
||||
case class FileKey(collective: Ident, category: FileCategory, id: Ident)
|
||||
/** Identifies a file by its collective, its category and its own id. */
final case class FileKey(collective: Ident, category: FileCategory, id: Ident) {

  /** Renders the key as `collective/category/id`. */
  override def toString: String =
    List(collective.id, category.id.id, id.id).mkString("/")
}
|
||||
|
||||
object FileKey {
|
||||
|
||||
|
@ -0,0 +1,53 @@
|
||||
/*
|
||||
* Copyright 2020 Eike K. & Contributors
|
||||
*
|
||||
* SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
*/
|
||||
|
||||
package docspell.common
|
||||
|
||||
import cats.implicits._
|
||||
|
||||
import io.circe.syntax._
|
||||
import io.circe.{Decoder, DecodingFailure, Encoder}
|
||||
|
||||
/** A complete or partial file key, used to select a set of files.
  *
  * From least to most specific: [[FileKeyPart.Empty]], [[FileKeyPart.Collective]],
  * [[FileKeyPart.Category]] and [[FileKeyPart.Key]].
  */
sealed trait FileKeyPart

object FileKeyPart {

  /** Carries no restriction at all. */
  case object Empty extends FileKeyPart

  /** Restricts to a collective. */
  final case class Collective(collective: Ident) extends FileKeyPart

  /** Restricts to a category within a collective. */
  final case class Category(collective: Ident, category: FileCategory) extends FileKeyPart

  /** A complete file key. */
  final case class Key(key: FileKey) extends FileKeyPart

  /** Encodes each variant as a JSON value:
    *   - `Empty` via the `Unit` encoder (presumably an empty object `{}`, which is
    *     what the decoder below recognises as `Empty`)
    *   - `Collective` as an object with a `collective` field
    *   - `Category` as an object with `collective` and `category` fields
    *   - `Key` using the `FileKey` encoder
    */
  implicit val jsonEncoder: Encoder[FileKeyPart] =
    Encoder.instance {
      case Empty =>
        ().asJson
      case Collective(cid) =>
        Map("collective" -> cid.asJson).asJson
      case Category(cid, cat) =>
        Map("collective" -> cid.asJson, "category" -> cat.asJson).asJson
      case Key(key) =>
        key.asJson
    }

  /** Decodes the most specific variant the input allows.
    *
    * Candidates are tried from most to least specific: a complete `FileKey`, then
    * `collective` + `category`, then `collective` alone, then an object without any
    * keys. If none applies, decoding fails with a message describing the accepted
    * shapes.
    */
  implicit val jsonDecoder: Decoder[FileKeyPart] =
    Decoder.instance { cursor =>
      for {
        cid <- cursor.getOrElse[Option[Ident]]("collective")(None)
        cat <- cursor.getOrElse[Option[FileCategory]]("category")(None)
        // true only for a JSON object that has no keys
        emptyObj = cursor.keys.exists(_.isEmpty)

        c3 = cursor.as[FileKey].map(Key).toOption
        c2 = (cid, cat).mapN(Category)
        c1 = cid.map(Collective)
        c0 = Option.when(emptyObj)(Empty)

        c = c3.orElse(c2).orElse(c1).orElse(c0)
        res <- c.toRight(
          DecodingFailure(
            "Cannot decode FileKeyPart: expected an empty object, an object with " +
              "'collective' (and optionally 'category'), or a complete file key",
            cursor.history
          )
        )
      } yield res
    }
}
|
@ -20,7 +20,7 @@ import docspell.ftsclient.FtsClient
|
||||
import docspell.ftssolr.SolrFtsClient
|
||||
import docspell.joex.analysis.RegexNerFile
|
||||
import docspell.joex.emptytrash._
|
||||
import docspell.joex.filecopy.FileCopyTask
|
||||
import docspell.joex.filecopy.{FileCopyTask, FileIntegrityCheckTask}
|
||||
import docspell.joex.fts.{MigrationTask, ReIndexTask}
|
||||
import docspell.joex.hk._
|
||||
import docspell.joex.learn.LearnClassifierTask
|
||||
@ -151,6 +151,7 @@ object JoexAppImpl extends MailAddressCodec {
|
||||
regexNer <- RegexNerFile(cfg.textAnalysis.regexNerFileConfig, store)
|
||||
updateCheck <- UpdateCheck.resource(httpClient)
|
||||
notification <- ONotification(store, notificationMod)
|
||||
fileRepo <- OFileRepository(store, queue, joex)
|
||||
sch <- SchedulerBuilder(cfg.scheduler, store)
|
||||
.withQueue(queue)
|
||||
.withPubSub(pubSubT)
|
||||
@ -287,6 +288,13 @@ object JoexAppImpl extends MailAddressCodec {
|
||||
FileCopyTask.onCancel[F]
|
||||
)
|
||||
)
|
||||
.withTask(
|
||||
JobTask.json(
|
||||
FileIntegrityCheckArgs.taskName,
|
||||
FileIntegrityCheckTask[F](fileRepo),
|
||||
FileIntegrityCheckTask.onCancel[F]
|
||||
)
|
||||
)
|
||||
.resource
|
||||
psch <- PeriodicScheduler.create(
|
||||
cfg.periodicScheduler,
|
||||
|
@ -0,0 +1,71 @@
|
||||
/*
|
||||
* Copyright 2020 Eike K. & Contributors
|
||||
*
|
||||
* SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
*/
|
||||
|
||||
package docspell.joex.filecopy
|
||||
|
||||
import cats.Monoid
|
||||
import cats.effect._
|
||||
import cats.implicits._
|
||||
|
||||
import docspell.backend.ops.OFileRepository
|
||||
import docspell.backend.ops.OFileRepository.IntegrityResult
|
||||
import docspell.common.{FileIntegrityCheckArgs, FileKey}
|
||||
import docspell.joex.scheduler.Task
|
||||
import docspell.store.records.RFileMeta
|
||||
|
||||
import io.circe.Encoder
|
||||
import io.circe.generic.semiauto.deriveEncoder
|
||||
|
||||
/** Task that checks the integrity of all files selected by the task arguments. */
object FileIntegrityCheckTask {
  type Args = FileIntegrityCheckArgs

  /** Aggregated outcome of a run.
    *
    * @param ok
    *   number of files whose check succeeded
    * @param failedKeys
    *   keys of the files whose check failed
    */
  case class Result(ok: Int, failedKeys: Set[FileKey]) {
    override def toString: String =
      s"Result(ok=$ok, failed=${failedKeys.size}, keysFailed=$failedKeys)"
  }
  object Result {

    /** Result of checking no files. */
    val empty: Result = Result(0, Set.empty)

    /** Lifts a single check outcome into a `Result`. */
    def from(r: IntegrityResult): Result =
      if (r.ok) Result(1, Set.empty) else Result(0, Set(r.key))

    /** Combines results by adding the ok-counts and merging the failed keys. */
    implicit val monoid: Monoid[Result] =
      Monoid.instance(empty, (a, b) => Result(a.ok + b.ok, a.failedKeys ++ b.failedKeys))

    implicit val jsonEncoder: Encoder[Result] =
      deriveEncoder
  }

  /** Creates the task.
    *
    * File metadata matching `ctx.args.pattern` is streamed from the store with a
    * chunk size of 50; the size of every chunk is logged. Each file is checked via
    * `ops.checkIntegrity` against the checksum stored in its metadata, and the
    * individual outcomes are folded into a single [[Result]], which is logged at
    * the end and returned.
    *
    * @param ops
    *   provides the integrity check for a single file
    */
  def apply[F[_]: Sync](ops: OFileRepository[F]): Task[F, Args, Result] =
    Task { ctx =>
      ctx.store
        .transact(
          RFileMeta
            .findAll(ctx.args.pattern, 50)
        )
        .chunks
        .evalTap(c => ctx.logger.info(s"Checking next ${c.size} files…"))
        .unchunks
        .evalMap { meta =>
          // Check and classification are done in one step so that the file key is
          // still at hand when a missing file has to be reported.
          ops.checkIntegrity(meta.id, meta.checksum.some).flatMap {
            case Some(r) =>
              Result.from(r).pure[F]
            case None =>
              ctx.logger.error(s"File not found: ${meta.id}").as(Result.empty)
          }
        }
        .foldMonoid
        .compile
        .lastOrError
        .flatTap(result =>
          ctx.logger
            .infoWith(s"File check result: $result")(_.data("integrityCheck", result))
        )
    }

  /** Task run on cancellation; it only logs a warning. */
  def onCancel[F[_]]: Task[F, Args, Unit] =
    Task.log(_.warn(s"Cancelling ${FileIntegrityCheckArgs.taskName.id} task"))

}
|
@ -11,8 +11,8 @@ import cats.effect._
|
||||
import cats.implicits._
|
||||
|
||||
import docspell.backend.BackendApp
|
||||
import docspell.common.FileCopyTaskArgs
|
||||
import docspell.common.FileCopyTaskArgs.Selection
|
||||
import docspell.common.{FileCopyTaskArgs, FileIntegrityCheckArgs, FileKeyPart}
|
||||
import docspell.restapi.model._
|
||||
|
||||
import org.http4s._
|
||||
@ -27,20 +27,35 @@ object FileRepositoryRoutes {
|
||||
import dsl._
|
||||
val logger = docspell.logging.getLogger[F]
|
||||
|
||||
HttpRoutes.of { case req @ POST -> Root / "cloneFileRepository" =>
|
||||
for {
|
||||
input <- req.as[FileRepositoryCloneRequest]
|
||||
args = makeTaskArgs(input)
|
||||
job <- backend.fileRepository.cloneFileRepository(args, true)
|
||||
result = BasicResult(
|
||||
job.isDefined,
|
||||
job.fold(s"Job for '${FileCopyTaskArgs.taskName.id}' already running")(j =>
|
||||
s"Job for '${FileCopyTaskArgs.taskName.id}' submitted: ${j.id.id}"
|
||||
HttpRoutes.of {
|
||||
case req @ POST -> Root / "cloneFileRepository" =>
|
||||
for {
|
||||
input <- req.as[FileRepositoryCloneRequest]
|
||||
args = makeTaskArgs(input)
|
||||
job <- backend.fileRepository.cloneFileRepository(args, true)
|
||||
result = BasicResult(
|
||||
job.isDefined,
|
||||
job.fold(s"Job for '${FileCopyTaskArgs.taskName.id}' already running")(j =>
|
||||
s"Job for '${FileCopyTaskArgs.taskName.id}' submitted: ${j.id.id}"
|
||||
)
|
||||
)
|
||||
)
|
||||
_ <- logger.info(result.message)
|
||||
resp <- Ok(result)
|
||||
} yield resp
|
||||
_ <- logger.info(result.message)
|
||||
resp <- Ok(result)
|
||||
} yield resp
|
||||
|
||||
// Submits a job that checks the integrity of all files selected by the
// `FileKeyPart` in the request body and notifies the joex nodes. The response
// tells whether the job was submitted or is already running.
case req @ POST -> Root / "integrityCheckAll" =>
  for {
    input <- req.as[FileKeyPart]
    job <- backend.fileRepository.checkIntegrityAll(input, notifyJoex = true)
    result = BasicResult(
      job.isDefined,
      // Both messages must name the integrity-check task (not the file-copy task).
      job.fold(s"Job for '${FileIntegrityCheckArgs.taskName.id}' already running")(j =>
        s"Job for '${FileIntegrityCheckArgs.taskName.id}' submitted: ${j.id.id}"
      )
    )
    _ <- logger.info(result.message)
    resp <- Ok(result)
  } yield resp
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -26,6 +26,14 @@ object BinnyUtils {
|
||||
/** Maps a file key to a binary id of the form `collective/category/id`. */
def fileKeyToBinaryId(fk: FileKey): BinaryId = {
  val segments = List(fk.collective.id, fk.category.id.id, fk.id.id)
  BinaryId(segments.mkString("/"))
}
|
||||
|
||||
/** Turns a file key part into a pattern for matching binary ids.
  *
  *   - `Empty` yields no pattern
  *   - `Collective` yields `collective/%`
  *   - `Category` yields `collective/category/%`
  *   - `Key` yields the exact binary id of the file key
  *
  * NOTE(review): the segments are inserted as-is; if ids may contain `%` or `_`
  * and the pattern is used with SQL `LIKE`, those would act as wildcards — confirm.
  */
def fileKeyPartToPrefix(fkp: FileKeyPart): Option[String] = {
  // joins the segments with '/' and appends the trailing "/%" wildcard
  def below(segments: String*): String =
    segments.mkString("", "/", "/%")

  fkp match {
    case FileKeyPart.Empty =>
      Option.empty[String]
    case FileKeyPart.Collective(coll) =>
      Some(below(coll.id))
    case FileKeyPart.Category(coll, category) =>
      Some(below(coll.id, category.id.id))
    case FileKeyPart.Key(fileKey) =>
      Some(fileKeyToBinaryId(fileKey).id)
  }
}
|
||||
|
||||
def binaryIdToFileKey(bid: BinaryId): Either[String, FileKey] =
|
||||
bid.id.split('/').toList match {
|
||||
case cId :: catId :: fId :: Nil =>
|
||||
|
@ -8,8 +8,10 @@ package docspell.store.records
|
||||
|
||||
import cats.data.NonEmptyList
|
||||
import cats.implicits._
|
||||
import fs2.Stream
|
||||
|
||||
import docspell.common.{FileKey, _}
|
||||
import docspell.store.file.BinnyUtils
|
||||
import docspell.store.qb.DSL._
|
||||
import docspell.store.qb._
|
||||
|
||||
@ -44,6 +46,18 @@ object RFileMeta {
|
||||
/** Creates the table definition using the given alias. */
def as(alias: String): Table =
  Table(alias.some)
|
||||
|
||||
/** Streams all file metadata records whose id matches the given key part.
  *
  * The key part is converted to a pattern that is applied with `like` to the id
  * column (cast to a string). Without a pattern, no restriction is applied.
  *
  * @param chunkSize
  *   the chunk size used when streaming the query results
  */
def findAll(part: FileKeyPart, chunkSize: Int): Stream[ConnectionIO, RFileMeta] = {
  val where: Condition =
    BinnyUtils.fileKeyPartToPrefix(part) match {
      case Some(pattern) => T.id.cast[String].like(pattern)
      case None          => Condition.unit
    }

  val query = Select(select(T.all), from(T), where)
  query.build.query[RFileMeta].streamWithChunkSize(chunkSize)
}
|
||||
|
||||
/** Inserts the given record, providing a value for every column in `T.all`. */
def insert(r: RFileMeta): ConnectionIO[Int] = {
  val values = fr"${r.id},${r.created},${r.mimetype},${r.length},${r.checksum}"
  DML.insert(T, T.all, values)
}
|
||||
|
||||
|
@ -68,7 +68,7 @@ object StoreFixture {
|
||||
ds <- dataSource(jdbc)
|
||||
xa <- makeXA(ds)
|
||||
cfg = FileRepositoryConfig.Database(64 * 1024)
|
||||
fr = FileRepository[IO](xa, ds, cfg)
|
||||
fr = FileRepository[IO](xa, ds, cfg, true)
|
||||
store = new StoreImpl[IO](fr, jdbc, ds, xa)
|
||||
_ <- Resource.eval(store.migrate)
|
||||
} yield store
|
||||
|
Loading…
x
Reference in New Issue
Block a user