diff --git a/modules/backend/src/main/scala/docspell/backend/BackendApp.scala b/modules/backend/src/main/scala/docspell/backend/BackendApp.scala
index 9c24f29c..f3142016 100644
--- a/modules/backend/src/main/scala/docspell/backend/BackendApp.scala
+++ b/modules/backend/src/main/scala/docspell/backend/BackendApp.scala
@@ -92,7 +92,7 @@ object BackendApp {
       )
       notifyImpl <- ONotification(store, notificationMod)
       bookmarksImpl <- OQueryBookmarks(store)
-      fileRepoImpl <- OFileRepository(queue, joexImpl)
+      fileRepoImpl <- OFileRepository(store, queue, joexImpl)
     } yield new BackendApp[F] {
       val pubSub = pubSubT
       val login = loginImpl
diff --git a/modules/backend/src/main/scala/docspell/backend/JobFactory.scala b/modules/backend/src/main/scala/docspell/backend/JobFactory.scala
index 3f43b280..f41911f4 100644
--- a/modules/backend/src/main/scala/docspell/backend/JobFactory.scala
+++ b/modules/backend/src/main/scala/docspell/backend/JobFactory.scala
@@ -15,6 +15,26 @@ import docspell.notification.api.PeriodicQueryArgs
 import docspell.store.records.RJob
 
 object JobFactory extends MailAddressCodec {
+  def integrityCheck[F[_]: Sync](
+      args: FileIntegrityCheckArgs,
+      submitter: AccountId = DocspellSystem.account
+  ): F[RJob] =
+    for {
+      id <- Ident.randomId[F]
+      now <- Timestamp.current[F]
+      job = RJob.newJob(
+        id,
+        FileIntegrityCheckArgs.taskName,
+        submitter.collective,
+        args,
+        s"Check integrity of files",
+        now,
+        submitter.user,
+        Priority.High,
+        Some(FileIntegrityCheckArgs.taskName)
+      )
+    } yield job
+
   def fileCopy[F[_]: Sync](
       args: FileCopyTaskArgs,
       submitter: AccountId = DocspellSystem.account
diff --git a/modules/backend/src/main/scala/docspell/backend/ops/OFileRepository.scala b/modules/backend/src/main/scala/docspell/backend/ops/OFileRepository.scala
index 270ab66e..aec12823 100644
--- a/modules/backend/src/main/scala/docspell/backend/ops/OFileRepository.scala
+++ b/modules/backend/src/main/scala/docspell/backend/ops/OFileRepository.scala
@@ -6,27 +6,41 @@
 
 package docspell.backend.ops
 
+import cats.data.OptionT
 import cats.effect._
 import cats.implicits._
 
 import docspell.backend.JobFactory
-import docspell.common.FileCopyTaskArgs
+import docspell.backend.ops.OFileRepository.IntegrityResult
+import docspell.common._
+import docspell.store.Store
 import docspell.store.queue.JobQueue
 import docspell.store.records.RJob
 
+import scodec.bits.ByteVector
+
 trait OFileRepository[F[_]] {
 
   /** Inserts the job or return None if such a job already is running. */
   def cloneFileRepository(args: FileCopyTaskArgs, notifyJoex: Boolean): F[Option[RJob]]
+
+  def checkIntegrityAll(part: FileKeyPart, notifyJoex: Boolean): F[Option[RJob]]
+
+  def checkIntegrity(key: FileKey, hash: Option[ByteVector]): F[Option[IntegrityResult]]
 }
 
 object OFileRepository {
 
+  case class IntegrityResult(ok: Boolean, key: FileKey)
+
   def apply[F[_]: Async](
+      store: Store[F],
       queue: JobQueue[F],
       joex: OJoex[F]
   ): Resource[F, OFileRepository[F]] =
     Resource.pure(new OFileRepository[F] {
+      private[this] val logger = docspell.logging.getLogger[F]
+
       def cloneFileRepository(
           args: FileCopyTaskArgs,
           notifyJoex: Boolean
@@ -36,5 +50,43 @@ object OFileRepository {
         flag <- queue.insertIfNew(job)
         _ <- if (notifyJoex) joex.notifyAllNodes else ().pure[F]
       } yield Option.when(flag)(job)
+
+      def checkIntegrityAll(part: FileKeyPart, notifyJoex: Boolean): F[Option[RJob]] =
+        for {
+          job <- JobFactory.integrityCheck(FileIntegrityCheckArgs(part))
+          flag <- queue.insertIfNew(job)
+          _ <- if (notifyJoex) joex.notifyAllNodes else ().pure[F]
+        } yield Option.when(flag)(job)
+
+      def checkIntegrity(
+          key: FileKey,
+          hash: Option[ByteVector]
+      ): F[Option[IntegrityResult]] =
+        (for {
+          _ <- OptionT.liftF(
+            logger.debugWith(s"Checking file $key")(_.data("fileKey", key))
+          )
+          expectedHash <-
+            hash.fold(OptionT(store.fileRepo.findMeta(key)).map(_.checksum))(h =>
+              OptionT.pure[F](h)
+            )
+
+          actualHash <-
+            OptionT.liftF(
+              logger.debugWith(s"Calculating new hash for $key")(
+                _.data("fileKey", key)
+              ) *>
+                store.fileRepo
+                  .getBytes(key)
+                  .through(fs2.hash.sha256)
+                  .compile
+                  .foldChunks(ByteVector.empty)(_ ++ _.toByteVector)
+            )
+          res = IntegrityResult(expectedHash == actualHash, key)
+          _ <- OptionT.liftF {
+            if (res.ok) logger.debug(s"File hashes match for $key")
+            else logger.warnWith(s"File hashes differ for: $key")(_.data("fileKey", key))
+          }
+        } yield res).value
     })
 }
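The new checkIntegrity re-hashes the stored bytes and compares the digest with the recorded checksum. A minimal standalone sketch of that comparison (not part of the patch; it assumes only cats-effect, fs2 and scodec-bits, and `verify` plus the sample data are made up):

import cats.effect.{IO, IOApp}
import fs2.Stream
import scodec.bits.ByteVector

// Sketch: re-hash a byte stream the same way checkIntegrity does and compare
// it to an expected checksum. `bytes` stands in for store.fileRepo.getBytes(key)
// and `expected` for the checksum recorded in RFileMeta.
object Sha256CheckSketch extends IOApp.Simple {
  def verify(bytes: Stream[IO, Byte], expected: ByteVector): IO[Boolean] =
    bytes
      .through(fs2.hash.sha256)
      .compile
      .foldChunks(ByteVector.empty)(_ ++ _.toByteVector)
      .map(_ == expected)

  val run: IO[Unit] = {
    val data = "hello".getBytes("UTF-8")
    val expected = ByteVector.view(
      java.security.MessageDigest.getInstance("SHA-256").digest(data)
    )
    verify(Stream.emits(data.toIndexedSeq).covary[IO], expected)
      .flatMap(ok => IO.println(s"hash matches: $ok"))
  }
}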
diff --git a/modules/common/src/main/scala/docspell/common/FileIntegrityCheckArgs.scala b/modules/common/src/main/scala/docspell/common/FileIntegrityCheckArgs.scala
new file mode 100644
index 00000000..671596f3
--- /dev/null
+++ b/modules/common/src/main/scala/docspell/common/FileIntegrityCheckArgs.scala
@@ -0,0 +1,22 @@
+/*
+ * Copyright 2020 Eike K. & Contributors
+ *
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+
+package docspell.common
+
+import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder}
+import io.circe.{Decoder, Encoder}
+
+final case class FileIntegrityCheckArgs(pattern: FileKeyPart) {}
+
+object FileIntegrityCheckArgs {
+  val taskName: Ident = Ident.unsafe("all-file-integrity-check")
+
+  implicit val jsonDecoder: Decoder[FileIntegrityCheckArgs] =
+    deriveDecoder
+
+  implicit val jsonEncoder: Encoder[FileIntegrityCheckArgs] =
+    deriveEncoder
+}
diff --git a/modules/common/src/main/scala/docspell/common/FileKey.scala b/modules/common/src/main/scala/docspell/common/FileKey.scala
index 17dc009e..46eb8409 100644
--- a/modules/common/src/main/scala/docspell/common/FileKey.scala
+++ b/modules/common/src/main/scala/docspell/common/FileKey.scala
@@ -9,7 +9,10 @@ package docspell.common
 import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder}
 import io.circe.{Decoder, Encoder}
 
-case class FileKey(collective: Ident, category: FileCategory, id: Ident)
+final case class FileKey(collective: Ident, category: FileCategory, id: Ident) {
+  override def toString =
+    s"${collective.id}/${category.id.id}/${id.id}"
+}
 
 object FileKey {
diff --git a/modules/common/src/main/scala/docspell/common/FileKeyPart.scala b/modules/common/src/main/scala/docspell/common/FileKeyPart.scala
new file mode 100644
index 00000000..fab01ab0
--- /dev/null
+++ b/modules/common/src/main/scala/docspell/common/FileKeyPart.scala
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2020 Eike K. & Contributors
+ *
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+
+package docspell.common
+
+import cats.implicits._
+
+import io.circe.syntax._
+import io.circe.{Decoder, DecodingFailure, Encoder}
+
+sealed trait FileKeyPart {}
+
+object FileKeyPart {
+
+  case object Empty extends FileKeyPart
+
+  final case class Collective(collective: Ident) extends FileKeyPart
+
+  final case class Category(collective: Ident, category: FileCategory) extends FileKeyPart
+
+  final case class Key(key: FileKey) extends FileKeyPart
+
+  implicit val jsonEncoder: Encoder[FileKeyPart] =
+    Encoder.instance {
+      case Empty => ().asJson
+      case Collective(cid) =>
+        Map("collective" -> cid.asJson).asJson
+      case Category(cid, cat) =>
+        Map("collective" -> cid.asJson, "category" -> cat.asJson).asJson
+      case Key(key) =>
+        key.asJson
+    }
+
+  implicit val jsonDecoder: Decoder[FileKeyPart] =
+    Decoder.instance { cursor =>
+      for {
+        cid <- cursor.getOrElse[Option[Ident]]("collective")(None)
+        cat <- cursor.getOrElse[Option[FileCategory]]("category")(None)
+        emptyObj = cursor.keys.exists(_.isEmpty)
+
+        c3 = cursor.as[FileKey].map(Key).toOption
+        c2 = (cid, cat).mapN(Category)
+        c1 = cid.map(Collective)
+        c0 = Option.when(emptyObj)(Empty)
+
+        c = c3.orElse(c2).orElse(c1).orElse(c0)
+        res <- c.toRight(DecodingFailure("", cursor.history))
+      } yield res
+    }
+}
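FileKeyPart also serves as the JSON request body for the integrity-check task; the encoder yields a different shape per variant and the decoder tries the most specific one first (Key, then Category, then Collective, then Empty). An illustrative sketch of the shapes (not part of the patch; the identifiers are made up, it assumes circe's default Unit encoder renders Empty as an empty object and that FileCategory.AttachmentSource is a valid category):

import docspell.common._

import io.circe.syntax._

// Sketch: render each FileKeyPart variant to JSON. Exact category spelling in
// the output depends on FileCategory's encoder.
object FileKeyPartJsonSketch {
  val cid = Ident.unsafe("collective1")
  val key = FileKey(cid, FileCategory.AttachmentSource, Ident.unsafe("abc123"))

  val examples: List[FileKeyPart] = List(
    FileKeyPart.Empty,                       // {}
    FileKeyPart.Collective(cid),             // {"collective":"collective1"}
    FileKeyPart.Category(cid, key.category), // {"collective":"collective1","category":...}
    FileKeyPart.Key(key)                     // {"collective":...,"category":...,"id":"abc123"}
  )

  def main(args: Array[String]): Unit =
    examples.foreach(p => println(p.asJson.noSpaces))
}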
diff --git a/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala b/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala
index 7881a8f4..fc668f0f 100644
--- a/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala
+++ b/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala
@@ -20,7 +20,7 @@ import docspell.ftsclient.FtsClient
 import docspell.ftssolr.SolrFtsClient
 import docspell.joex.analysis.RegexNerFile
 import docspell.joex.emptytrash._
-import docspell.joex.filecopy.FileCopyTask
+import docspell.joex.filecopy.{FileCopyTask, FileIntegrityCheckTask}
 import docspell.joex.fts.{MigrationTask, ReIndexTask}
 import docspell.joex.hk._
 import docspell.joex.learn.LearnClassifierTask
@@ -151,6 +151,7 @@ object JoexAppImpl extends MailAddressCodec {
       regexNer <- RegexNerFile(cfg.textAnalysis.regexNerFileConfig, store)
       updateCheck <- UpdateCheck.resource(httpClient)
       notification <- ONotification(store, notificationMod)
+      fileRepo <- OFileRepository(store, queue, joex)
       sch <- SchedulerBuilder(cfg.scheduler, store)
         .withQueue(queue)
         .withPubSub(pubSubT)
@@ -287,6 +288,13 @@ object JoexAppImpl extends MailAddressCodec {
           FileCopyTask.onCancel[F]
         )
       )
+      .withTask(
+        JobTask.json(
+          FileIntegrityCheckArgs.taskName,
+          FileIntegrityCheckTask[F](fileRepo),
+          FileIntegrityCheckTask.onCancel[F]
+        )
+      )
       .resource
     psch <- PeriodicScheduler.create(
       cfg.periodicScheduler,
diff --git a/modules/joex/src/main/scala/docspell/joex/filecopy/FileIntegrityCheckTask.scala b/modules/joex/src/main/scala/docspell/joex/filecopy/FileIntegrityCheckTask.scala
new file mode 100644
index 00000000..17ee1386
--- /dev/null
+++ b/modules/joex/src/main/scala/docspell/joex/filecopy/FileIntegrityCheckTask.scala
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2020 Eike K. & Contributors
+ *
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+
+package docspell.joex.filecopy
+
+import cats.Monoid
+import cats.effect._
+import cats.implicits._
+
+import docspell.backend.ops.OFileRepository
+import docspell.backend.ops.OFileRepository.IntegrityResult
+import docspell.common.{FileIntegrityCheckArgs, FileKey}
+import docspell.joex.scheduler.Task
+import docspell.store.records.RFileMeta
+
+import io.circe.Encoder
+import io.circe.generic.semiauto.deriveEncoder
+
+object FileIntegrityCheckTask {
+  type Args = FileIntegrityCheckArgs
+
+  case class Result(ok: Int, failedKeys: Set[FileKey]) {
+    override def toString: String =
+      s"Result(ok=$ok, failed=${failedKeys.size}, keysFailed=$failedKeys)"
+  }
+  object Result {
+    val empty = Result(0, Set.empty)
+
+    def from(r: IntegrityResult): Result =
+      if (r.ok) Result(1, Set.empty) else Result(0, Set(r.key))
+
+    implicit val monoid: Monoid[Result] =
+      Monoid.instance(empty, (a, b) => Result(a.ok + b.ok, a.failedKeys ++ b.failedKeys))
+
+    implicit val jsonEncoder: Encoder[Result] =
+      deriveEncoder
+  }
+
+  def apply[F[_]: Sync](ops: OFileRepository[F]): Task[F, Args, Result] =
+    Task { ctx =>
+      ctx.store
+        .transact(
+          RFileMeta
+            .findAll(ctx.args.pattern, 50)
+        )
+        .chunks
+        .evalTap(c => ctx.logger.info(s"Checking next ${c.size} files…"))
+        .unchunks
+        .evalMap(meta => ops.checkIntegrity(meta.id, meta.checksum.some))
+        .evalMap {
+          case Some(r) =>
+            Result.from(r).pure[F]
+          case None =>
+            ctx.logger.error(s"File not found").as(Result.empty)
+        }
+        .foldMonoid
+        .compile
+        .lastOrError
+        .flatTap(result =>
+          ctx.logger
+            .infoWith(s"File check result: $result")(_.data("integrityCheck", result))
+        )
+    }
+
+  def onCancel[F[_]]: Task[F, Args, Unit] =
+    Task.log(_.warn(s"Cancelling ${FileIntegrityCheckArgs.taskName.id} task"))
+
+}
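The task folds one Result per file into a single summary via the Monoid instance: ok counts add up and failed keys are unioned. A small sketch of that aggregation outside the stream (not part of the patch; the keys are made up):

import cats.Monoid

import docspell.common.{FileCategory, FileKey, Ident}
import docspell.joex.filecopy.FileIntegrityCheckTask.Result

// Sketch: combine per-file results the same way the stream's .foldMonoid does.
object ResultFoldSketch {
  private def key(n: String): FileKey =
    FileKey(Ident.unsafe("collective1"), FileCategory.AttachmentSource, Ident.unsafe(n))

  def main(args: Array[String]): Unit = {
    val perFile = List(Result(1, Set.empty), Result(1, Set.empty), Result(0, Set(key("f3"))))
    val total = Monoid[Result].combineAll(perFile)
    println(total) // Result(ok=2, failed=1, keysFailed=Set(collective1/.../f3))
  }
}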
diff --git a/modules/restserver/src/main/scala/docspell/restserver/routes/FileRepositoryRoutes.scala b/modules/restserver/src/main/scala/docspell/restserver/routes/FileRepositoryRoutes.scala
index 7a608283..e35c020c 100644
--- a/modules/restserver/src/main/scala/docspell/restserver/routes/FileRepositoryRoutes.scala
+++ b/modules/restserver/src/main/scala/docspell/restserver/routes/FileRepositoryRoutes.scala
@@ -11,8 +11,8 @@ import cats.effect._
 import cats.implicits._
 
 import docspell.backend.BackendApp
-import docspell.common.FileCopyTaskArgs
 import docspell.common.FileCopyTaskArgs.Selection
+import docspell.common.{FileCopyTaskArgs, FileIntegrityCheckArgs, FileKeyPart}
 import docspell.restapi.model._
 
 import org.http4s._
@@ -27,20 +27,35 @@ object FileRepositoryRoutes {
     import dsl._
     val logger = docspell.logging.getLogger[F]
 
-    HttpRoutes.of { case req @ POST -> Root / "cloneFileRepository" =>
-      for {
-        input <- req.as[FileRepositoryCloneRequest]
-        args = makeTaskArgs(input)
-        job <- backend.fileRepository.cloneFileRepository(args, true)
-        result = BasicResult(
-          job.isDefined,
-          job.fold(s"Job for '${FileCopyTaskArgs.taskName.id}' already running")(j =>
-            s"Job for '${FileCopyTaskArgs.taskName.id}' submitted: ${j.id.id}"
+    HttpRoutes.of {
+      case req @ POST -> Root / "cloneFileRepository" =>
+        for {
+          input <- req.as[FileRepositoryCloneRequest]
+          args = makeTaskArgs(input)
+          job <- backend.fileRepository.cloneFileRepository(args, true)
+          result = BasicResult(
+            job.isDefined,
+            job.fold(s"Job for '${FileCopyTaskArgs.taskName.id}' already running")(j =>
+              s"Job for '${FileCopyTaskArgs.taskName.id}' submitted: ${j.id.id}"
+            )
           )
-        )
-        _ <- logger.info(result.message)
-        resp <- Ok(result)
-      } yield resp
+          _ <- logger.info(result.message)
+          resp <- Ok(result)
+        } yield resp
+
+      case req @ POST -> Root / "integrityCheckAll" =>
+        for {
+          input <- req.as[FileKeyPart]
+          job <- backend.fileRepository.checkIntegrityAll(input, true)
+          result = BasicResult(
+            job.isDefined,
+            job.fold(s"Job for '${FileIntegrityCheckArgs.taskName.id}' already running")(j =>
+              s"Job for '${FileIntegrityCheckArgs.taskName.id}' submitted: ${j.id.id}"
+            )
+          )
+          _ <- logger.info(result.message)
+          resp <- Ok(result)
+        } yield resp
     }
   }
diff --git a/modules/store/src/main/scala/docspell/store/file/BinnyUtils.scala b/modules/store/src/main/scala/docspell/store/file/BinnyUtils.scala
index a9f309e7..e87d9d3d 100644
--- a/modules/store/src/main/scala/docspell/store/file/BinnyUtils.scala
+++ b/modules/store/src/main/scala/docspell/store/file/BinnyUtils.scala
@@ -26,6 +26,14 @@ object BinnyUtils {
   def fileKeyToBinaryId(fk: FileKey): BinaryId =
     BinaryId(s"${fk.collective.id}/${fk.category.id.id}/${fk.id.id}")
 
+  def fileKeyPartToPrefix(fkp: FileKeyPart): Option[String] =
+    fkp match {
+      case FileKeyPart.Empty              => None
+      case FileKeyPart.Collective(cid)    => Some(s"${cid.id}/%")
+      case FileKeyPart.Category(cid, cat) => Some(s"${cid.id}/${cat.id.id}/%")
+      case FileKeyPart.Key(key)           => Some(fileKeyToBinaryId(key).id)
+    }
+
   def binaryIdToFileKey(bid: BinaryId): Either[String, FileKey] =
     bid.id.split('/').toList match {
       case cId :: catId :: fId :: Nil =>
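fileKeyPartToPrefix turns a FileKeyPart into a SQL LIKE pattern over the binary id (collective/category/id), which RFileMeta.findAll in the next hunk applies to the meta id column. An illustrative sketch of the produced patterns (not part of the patch; the identifiers and the category are made up):

import docspell.common._
import docspell.store.file.BinnyUtils

// Sketch: the LIKE patterns produced for each FileKeyPart variant.
object PrefixSketch {
  val cid = Ident.unsafe("collective1")
  val key = FileKey(cid, FileCategory.AttachmentSource, Ident.unsafe("abc123"))

  def main(args: Array[String]): Unit = {
    println(BinnyUtils.fileKeyPartToPrefix(FileKeyPart.Empty))                       // None
    println(BinnyUtils.fileKeyPartToPrefix(FileKeyPart.Collective(cid)))             // Some(collective1/%)
    println(BinnyUtils.fileKeyPartToPrefix(FileKeyPart.Category(cid, key.category))) // Some(collective1/<category>/%)
    println(BinnyUtils.fileKeyPartToPrefix(FileKeyPart.Key(key)))                    // Some(collective1/<category>/abc123)
  }
}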
diff --git a/modules/store/src/main/scala/docspell/store/records/RFileMeta.scala b/modules/store/src/main/scala/docspell/store/records/RFileMeta.scala
index 8263e875..fb26272d 100644
--- a/modules/store/src/main/scala/docspell/store/records/RFileMeta.scala
+++ b/modules/store/src/main/scala/docspell/store/records/RFileMeta.scala
@@ -8,8 +8,10 @@ package docspell.store.records
 
 import cats.data.NonEmptyList
 import cats.implicits._
+import fs2.Stream
 
 import docspell.common.{FileKey, _}
+import docspell.store.file.BinnyUtils
 import docspell.store.qb.DSL._
 import docspell.store.qb._
 
@@ -44,6 +46,18 @@ object RFileMeta {
   def as(alias: String): Table =
     Table(Some(alias))
 
+  def findAll(part: FileKeyPart, chunkSize: Int): Stream[ConnectionIO, RFileMeta] = {
+    val cond = BinnyUtils
+      .fileKeyPartToPrefix(part)
+      .map(prefix => T.id.cast[String].like(prefix))
+
+    Select(
+      select(T.all),
+      from(T),
+      cond.getOrElse(Condition.unit)
+    ).build.query[RFileMeta].streamWithChunkSize(chunkSize)
+  }
+
   def insert(r: RFileMeta): ConnectionIO[Int] =
     DML.insert(T, T.all, fr"${r.id},${r.created},${r.mimetype},${r.length},${r.checksum}")
diff --git a/modules/store/src/test/scala/docspell/store/StoreFixture.scala b/modules/store/src/test/scala/docspell/store/StoreFixture.scala
index a59871e3..2933b51f 100644
--- a/modules/store/src/test/scala/docspell/store/StoreFixture.scala
+++ b/modules/store/src/test/scala/docspell/store/StoreFixture.scala
@@ -68,7 +68,7 @@
       ds <- dataSource(jdbc)
       xa <- makeXA(ds)
       cfg = FileRepositoryConfig.Database(64 * 1024)
-      fr = FileRepository[IO](xa, ds, cfg)
+      fr = FileRepository[IO](xa, ds, cfg, true)
       store = new StoreImpl[IO](fr, jdbc, ds, xa)
       _ <- Resource.eval(store.migrate)
     } yield store