From 422c0905dc20e8c0e389c2df255a8927598607ed Mon Sep 17 00:00:00 2001 From: eikek Date: Tue, 8 Mar 2022 00:11:40 +0100 Subject: [PATCH] Add task to copy files --- .../scala/docspell/backend/BackendApp.scala | 3 + .../main/scala/docspell/backend/Config.scala | 14 +- .../scala/docspell/backend/JobFactory.scala | 20 +++ .../backend/ops/OFileRepository.scala | 40 ++++++ .../main/scala/docspell/common/Banner.scala | 4 +- .../docspell/common/DocspellSystem.scala | 2 + .../docspell/common/FileCopyTaskArgs.scala | 56 ++++++++ .../docspell/common/FileStoreConfig.scala | 3 + .../main/scala/docspell/joex/ConfigFile.scala | 3 + .../scala/docspell/joex/JoexAppImpl.scala | 8 ++ .../main/scala/docspell/joex/JoexServer.scala | 2 +- .../src/main/scala/docspell/joex/Main.scala | 3 +- .../docspell/joex/filecopy/FileCopyTask.scala | 133 ++++++++++++++++++ .../logging/impl/ScribeConfigure.scala | 6 +- .../src/main/resources/docspell-openapi.yml | 41 ++++++ .../main/scala/docspell/restserver/Main.scala | 3 +- .../scala/docspell/restserver/RestApp.scala | 2 + .../docspell/restserver/RestAppImpl.scala | 5 +- .../docspell/restserver/RestServer.scala | 7 +- .../routes/FileRepositoryRoutes.scala | 54 +++++++ .../src/main/scala/docspell/store/Store.scala | 9 +- .../docspell/store/file/AttributeStore.scala | 104 +++++++++----- .../docspell/store/file/BinnyUtils.scala | 2 +- .../docspell/store/file/FileRepository.scala | 24 +++- .../store/file/FileRepositoryConfig.scala | 11 ++ .../store/file/FileRepositoryImpl.scala | 5 +- .../scala/docspell/store/impl/StoreImpl.scala | 11 +- .../scala/docspell/store/StoreFixture.scala | 2 +- 28 files changed, 512 insertions(+), 65 deletions(-) create mode 100644 modules/backend/src/main/scala/docspell/backend/ops/OFileRepository.scala create mode 100644 modules/common/src/main/scala/docspell/common/FileCopyTaskArgs.scala create mode 100644 modules/joex/src/main/scala/docspell/joex/filecopy/FileCopyTask.scala create mode 100644 modules/restserver/src/main/scala/docspell/restserver/routes/FileRepositoryRoutes.scala diff --git a/modules/backend/src/main/scala/docspell/backend/BackendApp.scala b/modules/backend/src/main/scala/docspell/backend/BackendApp.scala index fd217fb5..9c24f29c 100644 --- a/modules/backend/src/main/scala/docspell/backend/BackendApp.scala +++ b/modules/backend/src/main/scala/docspell/backend/BackendApp.scala @@ -50,6 +50,7 @@ trait BackendApp[F[_]] { def events: EventExchange[F] def notification: ONotification[F] def bookmarks: OQueryBookmarks[F] + def fileRepository: OFileRepository[F] } object BackendApp { @@ -91,6 +92,7 @@ object BackendApp { ) notifyImpl <- ONotification(store, notificationMod) bookmarksImpl <- OQueryBookmarks(store) + fileRepoImpl <- OFileRepository(queue, joexImpl) } yield new BackendApp[F] { val pubSub = pubSubT val login = loginImpl @@ -118,5 +120,6 @@ object BackendApp { val events = notificationMod val notification = notifyImpl val bookmarks = bookmarksImpl + val fileRepository = fileRepoImpl } } diff --git a/modules/backend/src/main/scala/docspell/backend/Config.scala b/modules/backend/src/main/scala/docspell/backend/Config.scala index e9cd3356..efccb8dc 100644 --- a/modules/backend/src/main/scala/docspell/backend/Config.scala +++ b/modules/backend/src/main/scala/docspell/backend/Config.scala @@ -42,15 +42,11 @@ object Config { def defaultStoreConfig: FileStoreConfig = enabledStores(defaultStore) - def toFileRepositoryConfig: FileRepositoryConfig = - defaultStoreConfig match { - case FileStoreConfig.DefaultDatabase(_) => - FileRepositoryConfig.Database(chunkSize) - case FileStoreConfig.S3(_, endpoint, accessKey, secretKey, bucket) => - FileRepositoryConfig.S3(endpoint, accessKey, secretKey, bucket, chunkSize) - case FileStoreConfig.FileSystem(_, directory) => - FileRepositoryConfig.Directory(directory, chunkSize) - } + def defaultFileRepositoryConfig: FileRepositoryConfig = + FileRepositoryConfig.fromFileStoreConfig(chunkSize, defaultStoreConfig) + + def getFileRepositoryConfig(id: Ident): Option[FileRepositoryConfig] = + stores.get(id).map(FileRepositoryConfig.fromFileStoreConfig(chunkSize, _)) def validate: ValidatedNec[String, Files] = { val storesEmpty = diff --git a/modules/backend/src/main/scala/docspell/backend/JobFactory.scala b/modules/backend/src/main/scala/docspell/backend/JobFactory.scala index 05320b31..3f43b280 100644 --- a/modules/backend/src/main/scala/docspell/backend/JobFactory.scala +++ b/modules/backend/src/main/scala/docspell/backend/JobFactory.scala @@ -15,6 +15,26 @@ import docspell.notification.api.PeriodicQueryArgs import docspell.store.records.RJob object JobFactory extends MailAddressCodec { + def fileCopy[F[_]: Sync]( + args: FileCopyTaskArgs, + submitter: AccountId = DocspellSystem.account + ): F[RJob] = + for { + id <- Ident.randomId[F] + now <- Timestamp.current[F] + job = RJob.newJob( + id, + FileCopyTaskArgs.taskName, + submitter.collective, + args, + s"Copying all files", + now, + submitter.user, + Priority.High, + Some(FileCopyTaskArgs.taskName) + ) + } yield job + def periodicQuery[F[_]: Sync](args: PeriodicQueryArgs, submitter: AccountId): F[RJob] = for { id <- Ident.randomId[F] diff --git a/modules/backend/src/main/scala/docspell/backend/ops/OFileRepository.scala b/modules/backend/src/main/scala/docspell/backend/ops/OFileRepository.scala new file mode 100644 index 00000000..270ab66e --- /dev/null +++ b/modules/backend/src/main/scala/docspell/backend/ops/OFileRepository.scala @@ -0,0 +1,40 @@ +/* + * Copyright 2020 Eike K. & Contributors + * + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +package docspell.backend.ops + +import cats.effect._ +import cats.implicits._ + +import docspell.backend.JobFactory +import docspell.common.FileCopyTaskArgs +import docspell.store.queue.JobQueue +import docspell.store.records.RJob + +trait OFileRepository[F[_]] { + + /** Inserts the job or return None if such a job already is running. */ + def cloneFileRepository(args: FileCopyTaskArgs, notifyJoex: Boolean): F[Option[RJob]] +} + +object OFileRepository { + + def apply[F[_]: Async]( + queue: JobQueue[F], + joex: OJoex[F] + ): Resource[F, OFileRepository[F]] = + Resource.pure(new OFileRepository[F] { + def cloneFileRepository( + args: FileCopyTaskArgs, + notifyJoex: Boolean + ): F[Option[RJob]] = + for { + job <- JobFactory.fileCopy(args) + flag <- queue.insertIfNew(job) + _ <- if (notifyJoex) joex.notifyAllNodes else ().pure[F] + } yield Option.when(flag)(job) + }) +} diff --git a/modules/common/src/main/scala/docspell/common/Banner.scala b/modules/common/src/main/scala/docspell/common/Banner.scala index 169c67cc..21a7f299 100644 --- a/modules/common/src/main/scala/docspell/common/Banner.scala +++ b/modules/common/src/main/scala/docspell/common/Banner.scala @@ -14,7 +14,8 @@ case class Banner( configFile: Option[String], appId: Ident, baseUrl: LenientUri, - ftsUrl: Option[LenientUri] + ftsUrl: Option[LenientUri], + fileStoreConfig: FileStoreConfig ) { private val banner = @@ -36,6 +37,7 @@ case class Banner( s"Database: ${jdbcUrl.asString}", s"Fts: ${ftsUrl.map(_.asString).getOrElse("-")}", s"Config: ${configFile.getOrElse("")}", + s"FileRepo: ${fileStoreConfig}", "" ) diff --git a/modules/common/src/main/scala/docspell/common/DocspellSystem.scala b/modules/common/src/main/scala/docspell/common/DocspellSystem.scala index 4ecfff2a..37319864 100644 --- a/modules/common/src/main/scala/docspell/common/DocspellSystem.scala +++ b/modules/common/src/main/scala/docspell/common/DocspellSystem.scala @@ -10,6 +10,8 @@ object DocspellSystem { val user = Ident.unsafe("docspell-system") val taskGroup = user + val account: AccountId = AccountId(taskGroup, user) + val migrationTaskTracker = Ident.unsafe("full-text-index-tracker") val allPreviewTaskTracker = Ident.unsafe("generate-all-previews") val allPageCountTaskTracker = Ident.unsafe("all-page-count-tracker") diff --git a/modules/common/src/main/scala/docspell/common/FileCopyTaskArgs.scala b/modules/common/src/main/scala/docspell/common/FileCopyTaskArgs.scala new file mode 100644 index 00000000..5026a775 --- /dev/null +++ b/modules/common/src/main/scala/docspell/common/FileCopyTaskArgs.scala @@ -0,0 +1,56 @@ +/* + * Copyright 2020 Eike K. & Contributors + * + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +package docspell.common + +import cats.data.NonEmptyList + +import docspell.common.FileCopyTaskArgs.Selection + +import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder} +import io.circe.syntax._ +import io.circe.{Decoder, Encoder} + +/** This is the input to the `FileCopyTask`. The task copies all files from on + * FileRepository to one ore more target repositories. + * + * If no `from` is given, the default file repository is used. For targets, a list of ids + * can be specified that must match a configured file store in the config file. When + * selecting "all", it means all enabled stores. + */ +final case class FileCopyTaskArgs(from: Option[Ident], to: Selection) + +object FileCopyTaskArgs { + val taskName = Ident.unsafe("copy-file-repositories") + + sealed trait Selection + + object Selection { + + case object All extends Selection + case class Stores(ids: NonEmptyList[Ident]) extends Selection + + implicit val jsonEncoder: Encoder[Selection] = + Encoder.instance { + case All => "!all".asJson + case Stores(ids) => ids.toList.asJson + } + + implicit val jsonDecoder: Decoder[Selection] = + Decoder.instance { cursor => + cursor.value.asString match { + case Some(s) if s.equalsIgnoreCase("!all") => Right(All) + case _ => cursor.value.as[NonEmptyList[Ident]].map(Stores.apply) + } + } + } + + implicit val jsonDecoder: Decoder[FileCopyTaskArgs] = + deriveDecoder + + implicit val jsonEncoder: Encoder[FileCopyTaskArgs] = + deriveEncoder +} diff --git a/modules/common/src/main/scala/docspell/common/FileStoreConfig.scala b/modules/common/src/main/scala/docspell/common/FileStoreConfig.scala index 80652217..cb9afa9d 100644 --- a/modules/common/src/main/scala/docspell/common/FileStoreConfig.scala +++ b/modules/common/src/main/scala/docspell/common/FileStoreConfig.scala @@ -32,5 +32,8 @@ object FileStoreConfig { bucket: String ) extends FileStoreConfig { val storeType = FileStoreType.S3 + + override def toString = + s"S3(enabled=$enabled, endpoint=$endpoint, bucket=$bucket, accessKey=$accessKey, secretKey=***)" } } diff --git a/modules/joex/src/main/scala/docspell/joex/ConfigFile.scala b/modules/joex/src/main/scala/docspell/joex/ConfigFile.scala index 32049b16..a17d94a3 100644 --- a/modules/joex/src/main/scala/docspell/joex/ConfigFile.scala +++ b/modules/joex/src/main/scala/docspell/joex/ConfigFile.scala @@ -19,6 +19,9 @@ import pureconfig.generic.auto._ import yamusca.imports._ object ConfigFile { + // IntelliJ is wrong, this is required + import Implicits._ + def loadConfig[F[_]: Async](args: List[String]): F[Config] = { val logger = docspell.logging.getLogger[F] ConfigFactory diff --git a/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala b/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala index ce28e8d3..7881a8f4 100644 --- a/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala +++ b/modules/joex/src/main/scala/docspell/joex/JoexAppImpl.scala @@ -20,6 +20,7 @@ import docspell.ftsclient.FtsClient import docspell.ftssolr.SolrFtsClient import docspell.joex.analysis.RegexNerFile import docspell.joex.emptytrash._ +import docspell.joex.filecopy.FileCopyTask import docspell.joex.fts.{MigrationTask, ReIndexTask} import docspell.joex.hk._ import docspell.joex.learn.LearnClassifierTask @@ -279,6 +280,13 @@ object JoexAppImpl extends MailAddressCodec { PeriodicDueItemsTask.onCancel[F] ) ) + .withTask( + JobTask.json( + FileCopyTaskArgs.taskName, + FileCopyTask[F](cfg), + FileCopyTask.onCancel[F] + ) + ) .resource psch <- PeriodicScheduler.create( cfg.periodicScheduler, diff --git a/modules/joex/src/main/scala/docspell/joex/JoexServer.scala b/modules/joex/src/main/scala/docspell/joex/JoexServer.scala index 29b7b9e8..a13d4b1f 100644 --- a/modules/joex/src/main/scala/docspell/joex/JoexServer.scala +++ b/modules/joex/src/main/scala/docspell/joex/JoexServer.scala @@ -41,7 +41,7 @@ object JoexServer { store <- Store.create[F]( cfg.jdbc, - cfg.files.toFileRepositoryConfig, + cfg.files.defaultFileRepositoryConfig, pools.connectEC ) settings <- Resource.eval(store.transact(RInternalSetting.create)) diff --git a/modules/joex/src/main/scala/docspell/joex/Main.scala b/modules/joex/src/main/scala/docspell/joex/Main.scala index 7866d6d1..a7607a5f 100644 --- a/modules/joex/src/main/scala/docspell/joex/Main.scala +++ b/modules/joex/src/main/scala/docspell/joex/Main.scala @@ -31,7 +31,8 @@ object Main extends IOApp { Option(System.getProperty("config.file")), cfg.appId, cfg.baseUrl, - Some(cfg.fullTextSearch.solr.url).filter(_ => cfg.fullTextSearch.enabled) + Some(cfg.fullTextSearch.solr.url).filter(_ => cfg.fullTextSearch.enabled), + cfg.files.defaultStoreConfig ) _ <- logger.info(s"\n${banner.render("***>")}") _ <- diff --git a/modules/joex/src/main/scala/docspell/joex/filecopy/FileCopyTask.scala b/modules/joex/src/main/scala/docspell/joex/filecopy/FileCopyTask.scala new file mode 100644 index 00000000..986c094b --- /dev/null +++ b/modules/joex/src/main/scala/docspell/joex/filecopy/FileCopyTask.scala @@ -0,0 +1,133 @@ +/* + * Copyright 2020 Eike K. & Contributors + * + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +package docspell.joex.filecopy + +import cats.data.NonEmptyList +import cats.effect._ +import cats.implicits._ + +import docspell.common.FileCopyTaskArgs.Selection +import docspell.common.{FileCopyTaskArgs, Ident} +import docspell.joex.Config +import docspell.joex.scheduler.Task +import docspell.logging.Logger +import docspell.store.file.{BinnyUtils, FileRepository, FileRepositoryConfig} + +import binny.CopyTool.Counter +import binny.{BinaryId, BinaryStore, CopyTool} +import io.circe.generic.semiauto.deriveCodec +import io.circe.{Codec, Decoder, Encoder} + +object FileCopyTask { + type Args = FileCopyTaskArgs + + case class CopyResult(success: Boolean, message: String, counter: List[Counter]) + object CopyResult { + def noSourceImpl: CopyResult = + CopyResult(false, "No source BinaryStore implementation found!", Nil) + + def noTargetImpl: CopyResult = + CopyResult(false, "No target BinaryStore implementation found!", Nil) + + def noSourceStore(id: Ident): CopyResult = + CopyResult( + false, + s"No source file repo found with id: ${id.id}. Make sure it is present in the config.", + Nil + ) + + def noTargetStore: CopyResult = + CopyResult(false, "No target file repositories defined", Nil) + + def success(counter: NonEmptyList[Counter]): CopyResult = + CopyResult(true, "Done", counter.toList) + + implicit val binaryIdCodec: Codec[BinaryId] = + Codec.from( + Decoder.decodeString.map(BinaryId.apply), + Encoder.encodeString.contramap(_.id) + ) + + implicit val counterEncoder: Codec[Counter] = + deriveCodec + implicit val jsonCodec: Codec[CopyResult] = + deriveCodec + } + + def onCancel[F[_]]: Task[F, Args, Unit] = + Task.log(_.warn(s"Cancelling ${FileCopyTaskArgs.taskName.id} task")) + + def apply[F[_]: Async](cfg: Config): Task[F, Args, CopyResult] = + Task { ctx => + val src = ctx.args.from + .map(id => + cfg.files.getFileRepositoryConfig(id).toRight(CopyResult.noSourceStore(id)) + ) + .getOrElse(Right(cfg.files.defaultFileRepositoryConfig)) + + val targets = ctx.args.to match { + case Selection.All => + cfg.files.enabledStores.values.toList + .map(FileRepositoryConfig.fromFileStoreConfig(cfg.files.chunkSize, _)) + case Selection.Stores(ids) => + ids.traverse(cfg.files.getFileRepositoryConfig).map(_.toList).getOrElse(Nil) + } + + // remove source from targets if present there + val data = + for { + srcConfig <- src + trgConfig <- NonEmptyList + .fromList(targets.filter(_ != srcConfig)) + .toRight(CopyResult.noTargetStore) + + srcRepo = ctx.store.createFileRepository(srcConfig, true) + targetRepos = trgConfig.map(ctx.store.createFileRepository(_, false)) + } yield (srcRepo, targetRepos) + + data match { + case Right((from, tos)) => + ctx.logger.info(s"Start copying all files from ") *> + copy(ctx.logger, from, tos).flatTap(r => + if (r.success) ctx.logger.info(s"Copying finished: ${r.counter}") + else ctx.logger.error(s"Copying failed: $r") + ) + + case Left(res) => + ctx.logger.error(s"Copying failed: $res") *> res.pure[F] + } + } + + def copy[F[_]: Async]( + logger: Logger[F], + from: FileRepository[F], + to: NonEmptyList[FileRepository[F]] + ): F[CopyResult] = + FileRepository.getDelegate(from) match { + case None => + CopyResult.noSourceImpl.pure[F] + + case Some((src, srcMeta)) => + to.traverse(FileRepository.getDelegate).map(_.map(_._1)) match { + case None => + CopyResult.noTargetImpl.pure[F] + + case Some(targets) => + val log = BinnyUtils.LoggerAdapter(logger) + val maxConcurrent = { + val nCores = Runtime.getRuntime.availableProcessors() + if (nCores > 2) nCores / 2 else 1 + } + + def copyTo(to: BinaryStore[F]) = + CopyTool.copyAll[F](log, src, srcMeta, to, 50, maxConcurrent) + + logger.info(s"Start copying ${from.config} -> ${to.map(_.config)}") *> + targets.traverse(copyTo).map(CopyResult.success) + } + } +} diff --git a/modules/logging/scribe/src/main/scala/docspell/logging/impl/ScribeConfigure.scala b/modules/logging/scribe/src/main/scala/docspell/logging/impl/ScribeConfigure.scala index 6bf3ad8d..81e6e662 100644 --- a/modules/logging/scribe/src/main/scala/docspell/logging/impl/ScribeConfigure.scala +++ b/modules/logging/scribe/src/main/scala/docspell/logging/impl/ScribeConfigure.scala @@ -22,10 +22,12 @@ object ScribeConfigure { Sync[F].delay { replaceJUL() val docspellLogger = scribe.Logger("docspell") - val flywayLogger = scribe.Logger("org.flywaydb") + unsafeConfigure(scribe.Logger.root, cfg.copy(minimumLevel = getRootMinimumLevel)) unsafeConfigure(docspellLogger, cfg) - unsafeConfigure(flywayLogger, cfg) + unsafeConfigure(scribe.Logger("org.flywaydb"), cfg) + unsafeConfigure(scribe.Logger("binny"), cfg) + unsafeConfigure(scribe.Logger("org.http4s"), cfg) } private[this] def getRootMinimumLevel: Level = diff --git a/modules/restapi/src/main/resources/docspell-openapi.yml b/modules/restapi/src/main/resources/docspell-openapi.yml index 50dc715d..14cc07e2 100644 --- a/modules/restapi/src/main/resources/docspell-openapi.yml +++ b/modules/restapi/src/main/resources/docspell-openapi.yml @@ -2487,6 +2487,35 @@ paths: schema: $ref: "#/components/schemas/BasicResult" + /admin/files/cloneFileRepository: + post: + operationId: "admin-files-cloneFileRepository" + tags: [Admin] + summary: Copy all files into a new repository + description: | + Submits a task that will copy all files of the application + (from the default file repository) into another file + repository as specified in the request. The request may define + ids of file repository configurations that must be present in + the config file. An empty list means to copy to all enabled + file repositories from te default file repository. + security: + - adminHeader: [] + requestBody: + content: + application/json: + schema: + $ref: "#/components/schemas/FileRepositoryCloneRequest" + responses: + 422: + description: BadRequest + 200: + description: Ok + content: + application/json: + schema: + $ref: "#/components/schemas/BasicResult" + /sec/source: get: operationId: "sec-source-get-all" @@ -5433,6 +5462,18 @@ paths: components: schemas: + FileRepositoryCloneRequest: + description: | + Clone the file repository to a new location. + required: + - targetRepositories + properties: + targetRepositories: + type: array + items: + type: string + format: ident + BookmarkedQuery: description: | A query bookmark. diff --git a/modules/restserver/src/main/scala/docspell/restserver/Main.scala b/modules/restserver/src/main/scala/docspell/restserver/Main.scala index c8a6a003..106052a1 100644 --- a/modules/restserver/src/main/scala/docspell/restserver/Main.scala +++ b/modules/restserver/src/main/scala/docspell/restserver/Main.scala @@ -28,7 +28,8 @@ object Main extends IOApp { Option(System.getProperty("config.file")), cfg.appId, cfg.baseUrl, - Some(cfg.fullTextSearch.solr.url).filter(_ => cfg.fullTextSearch.enabled) + Some(cfg.fullTextSearch.solr.url).filter(_ => cfg.fullTextSearch.enabled), + cfg.backend.files.defaultStoreConfig ) _ <- logger.info(s"\n${banner.render("***>")}") _ <- diff --git a/modules/restserver/src/main/scala/docspell/restserver/RestApp.scala b/modules/restserver/src/main/scala/docspell/restserver/RestApp.scala index 9ad2f34e..b7288f60 100644 --- a/modules/restserver/src/main/scala/docspell/restserver/RestApp.scala +++ b/modules/restserver/src/main/scala/docspell/restserver/RestApp.scala @@ -7,7 +7,9 @@ package docspell.restserver import fs2.Stream + import docspell.backend.BackendApp + import org.http4s.HttpRoutes import org.http4s.server.websocket.WebSocketBuilder2 diff --git a/modules/restserver/src/main/scala/docspell/restserver/RestAppImpl.scala b/modules/restserver/src/main/scala/docspell/restserver/RestAppImpl.scala index ea44065d..7ed9d2dc 100644 --- a/modules/restserver/src/main/scala/docspell/restserver/RestAppImpl.scala +++ b/modules/restserver/src/main/scala/docspell/restserver/RestAppImpl.scala @@ -9,6 +9,7 @@ package docspell.restserver import cats.effect._ import fs2.Stream import fs2.concurrent.Topic + import docspell.backend.BackendApp import docspell.backend.auth.{AuthToken, ShareToken} import docspell.ftsclient.FtsClient @@ -23,6 +24,7 @@ import docspell.restserver.routes._ import docspell.restserver.webapp.{TemplateRoutes, Templates, WebjarRoutes} import docspell.restserver.ws.{OutputEvent, WebSocketRoutes} import docspell.store.Store + import emil.javamail.JavaMailEmil import org.http4s.HttpRoutes import org.http4s.client.Client @@ -76,7 +78,8 @@ final class RestAppImpl[F[_]: Async]( "user/otp" -> TotpRoutes.admin(backend), "user" -> UserRoutes.admin(backend), "info" -> InfoRoutes.admin(config), - "attachments" -> AttachmentRoutes.admin(backend) + "attachments" -> AttachmentRoutes.admin(backend), + "files" -> FileRepositoryRoutes.admin(backend) ) def shareRoutes( diff --git a/modules/restserver/src/main/scala/docspell/restserver/RestServer.scala b/modules/restserver/src/main/scala/docspell/restserver/RestServer.scala index 4546fc34..a4d4fb6d 100644 --- a/modules/restserver/src/main/scala/docspell/restserver/RestServer.scala +++ b/modules/restserver/src/main/scala/docspell/restserver/RestServer.scala @@ -7,18 +7,21 @@ package docspell.restserver import scala.concurrent.duration._ + import cats.effect._ import cats.implicits._ import fs2.Stream import fs2.concurrent.Topic + import docspell.backend.msg.Topics import docspell.common._ import docspell.pubsub.naive.NaivePubSub import docspell.restserver.http4s.InternalHeader -import docspell.restserver.ws.OutputEvent.KeepAlive import docspell.restserver.ws.OutputEvent +import docspell.restserver.ws.OutputEvent.KeepAlive import docspell.store.Store import docspell.store.records.RInternalSetting + import org.http4s._ import org.http4s.blaze.client.BlazeClientBuilder import org.http4s.blaze.server.BlazeServerBuilder @@ -73,7 +76,7 @@ object RestServer { httpClient <- BlazeClientBuilder[F].resource store <- Store.create[F]( cfg.backend.jdbc, - cfg.backend.files.toFileRepositoryConfig, + cfg.backend.files.defaultFileRepositoryConfig, pools.connectEC ) setting <- Resource.eval(store.transact(RInternalSetting.create)) diff --git a/modules/restserver/src/main/scala/docspell/restserver/routes/FileRepositoryRoutes.scala b/modules/restserver/src/main/scala/docspell/restserver/routes/FileRepositoryRoutes.scala new file mode 100644 index 00000000..7a608283 --- /dev/null +++ b/modules/restserver/src/main/scala/docspell/restserver/routes/FileRepositoryRoutes.scala @@ -0,0 +1,54 @@ +/* + * Copyright 2020 Eike K. & Contributors + * + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +package docspell.restserver.routes + +import cats.data.NonEmptyList +import cats.effect._ +import cats.implicits._ + +import docspell.backend.BackendApp +import docspell.common.FileCopyTaskArgs +import docspell.common.FileCopyTaskArgs.Selection +import docspell.restapi.model._ + +import org.http4s._ +import org.http4s.circe.CirceEntityDecoder._ +import org.http4s.circe.CirceEntityEncoder._ +import org.http4s.dsl.Http4sDsl + +object FileRepositoryRoutes { + + def admin[F[_]: Async](backend: BackendApp[F]): HttpRoutes[F] = { + val dsl = Http4sDsl[F] + import dsl._ + val logger = docspell.logging.getLogger[F] + + HttpRoutes.of { case req @ POST -> Root / "cloneFileRepository" => + for { + input <- req.as[FileRepositoryCloneRequest] + args = makeTaskArgs(input) + job <- backend.fileRepository.cloneFileRepository(args, true) + result = BasicResult( + job.isDefined, + job.fold(s"Job for '${FileCopyTaskArgs.taskName.id}' already running")(j => + s"Job for '${FileCopyTaskArgs.taskName.id}' submitted: ${j.id.id}" + ) + ) + _ <- logger.info(result.message) + resp <- Ok(result) + } yield resp + } + } + + def makeTaskArgs(input: FileRepositoryCloneRequest): FileCopyTaskArgs = + NonEmptyList.fromList(input.targetRepositories) match { + case Some(nel) => + FileCopyTaskArgs(None, Selection.Stores(nel)) + case None => + FileCopyTaskArgs(None, Selection.All) + } +} diff --git a/modules/store/src/main/scala/docspell/store/Store.scala b/modules/store/src/main/scala/docspell/store/Store.scala index 24c80b98..b7f611e3 100644 --- a/modules/store/src/main/scala/docspell/store/Store.scala +++ b/modules/store/src/main/scala/docspell/store/Store.scala @@ -28,6 +28,11 @@ trait Store[F[_]] { def fileRepo: FileRepository[F] + def createFileRepository( + cfg: FileRepositoryConfig, + withAttributeStore: Boolean + ): FileRepository[F] + def add(insert: ConnectionIO[Int], exists: ConnectionIO[Boolean]): F[AddResult] } @@ -50,8 +55,8 @@ object Store { ds.setDriverClassName(jdbc.driverClass) } xa = HikariTransactor(ds, connectEC) - fr = FileRepository.apply(xa, ds, fileRepoConfig) - st = new StoreImpl[F](fr, jdbc, xa) + fr = FileRepository.apply(xa, ds, fileRepoConfig, true) + st = new StoreImpl[F](fr, jdbc, ds, xa) _ <- Resource.eval(st.migrate) } yield st } diff --git a/modules/store/src/main/scala/docspell/store/file/AttributeStore.scala b/modules/store/src/main/scala/docspell/store/file/AttributeStore.scala index 4814a9dc..127788b2 100644 --- a/modules/store/src/main/scala/docspell/store/file/AttributeStore.scala +++ b/modules/store/src/main/scala/docspell/store/file/AttributeStore.scala @@ -6,6 +6,7 @@ package docspell.store.file +import cats.Applicative import cats.data.OptionT import cats.effect._ import cats.implicits._ @@ -17,40 +18,71 @@ import binny._ import doobie._ import doobie.implicits._ -final private[file] class AttributeStore[F[_]: Sync](xa: Transactor[F]) - extends BinaryAttributeStore[F] { - def saveAttr(id: BinaryId, attrs: F[BinaryAttributes]): F[Unit] = - for { - now <- Timestamp.current[F] - a <- attrs - fileKey <- makeFileKey(id) - fm = RFileMeta( - fileKey, - now, - MimeType.parse(a.contentType.contentType).getOrElse(MimeType.octetStream), - ByteSize(a.length), - a.sha256 - ) - _ <- RFileMeta.insert(fm).transact(xa) - } yield () - - def deleteAttr(id: BinaryId): F[Boolean] = - makeFileKey(id).flatMap(fileKey => RFileMeta.delete(fileKey).transact(xa).map(_ > 0)) - - def findAttr(id: BinaryId): OptionT[F, BinaryAttributes] = - findMeta(id).map(fm => - BinaryAttributes( - fm.checksum, - SimpleContentType(fm.mimetype.asString), - fm.length.bytes - ) - ) - - def findMeta(id: BinaryId): OptionT[F, RFileMeta] = - OptionT(makeFileKey(id).flatMap(fileKey => RFileMeta.findById(fileKey).transact(xa))) - - private def makeFileKey(binaryId: BinaryId): F[FileKey] = - Sync[F] - .pure(BinnyUtils.binaryIdToFileKey(binaryId).left.map(new IllegalStateException(_))) - .rethrow +private[file] trait AttributeStore[F[_]] extends BinaryAttributeStore[F] { + def findMeta(id: BinaryId): OptionT[F, RFileMeta] +} + +private[file] object AttributeStore { + def empty[F[_]: Applicative]: AttributeStore[F] = + new AttributeStore[F] { + val delegate = BinaryAttributeStore.empty[F] + + def findMeta(id: BinaryId) = + OptionT.none + + def saveAttr(id: BinaryId, attrs: F[BinaryAttributes]) = + delegate.saveAttr(id, attrs) + + def deleteAttr(id: BinaryId) = + delegate.deleteAttr(id) + + def findAttr(id: BinaryId) = + delegate.findAttr(id) + } + + def apply[F[_]: Sync](xa: Transactor[F]): AttributeStore[F] = + new Impl[F](xa) + + final private class Impl[F[_]: Sync](xa: Transactor[F]) extends AttributeStore[F] { + def saveAttr(id: BinaryId, attrs: F[BinaryAttributes]): F[Unit] = + for { + now <- Timestamp.current[F] + a <- attrs + fileKey <- makeFileKey(id) + fm = RFileMeta( + fileKey, + now, + MimeType.parse(a.contentType.contentType).getOrElse(MimeType.octetStream), + ByteSize(a.length), + a.sha256 + ) + _ <- RFileMeta.insert(fm).transact(xa) + } yield () + + def deleteAttr(id: BinaryId): F[Boolean] = + makeFileKey(id).flatMap(fileKey => + RFileMeta.delete(fileKey).transact(xa).map(_ > 0) + ) + + def findAttr(id: BinaryId): OptionT[F, BinaryAttributes] = + findMeta(id).map(fm => + BinaryAttributes( + fm.checksum, + SimpleContentType(fm.mimetype.asString), + fm.length.bytes + ) + ) + + def findMeta(id: BinaryId): OptionT[F, RFileMeta] = + OptionT( + makeFileKey(id).flatMap(fileKey => RFileMeta.findById(fileKey).transact(xa)) + ) + + private def makeFileKey(binaryId: BinaryId): F[FileKey] = + Sync[F] + .pure( + BinnyUtils.binaryIdToFileKey(binaryId).left.map(new IllegalStateException(_)) + ) + .rethrow + } } diff --git a/modules/store/src/main/scala/docspell/store/file/BinnyUtils.scala b/modules/store/src/main/scala/docspell/store/file/BinnyUtils.scala index bdd09f48..a9f309e7 100644 --- a/modules/store/src/main/scala/docspell/store/file/BinnyUtils.scala +++ b/modules/store/src/main/scala/docspell/store/file/BinnyUtils.scala @@ -21,7 +21,7 @@ import binny.jdbc.{GenericJdbcStore, JdbcStoreConfig} import binny.minio.{MinioBinaryStore, MinioConfig, S3KeyMapping} import scodec.bits.ByteVector -private[store] object BinnyUtils { +object BinnyUtils { def fileKeyToBinaryId(fk: FileKey): BinaryId = BinaryId(s"${fk.collective.id}/${fk.category.id.id}/${fk.id.id}") diff --git a/modules/store/src/main/scala/docspell/store/file/FileRepository.scala b/modules/store/src/main/scala/docspell/store/file/FileRepository.scala index b8bef362..48b30b71 100644 --- a/modules/store/src/main/scala/docspell/store/file/FileRepository.scala +++ b/modules/store/src/main/scala/docspell/store/file/FileRepository.scala @@ -13,10 +13,12 @@ import fs2._ import docspell.common._ -import binny.{BinaryId, BinaryStore} +import binny.{BinaryAttributeStore, BinaryId, BinaryStore} import doobie.Transactor trait FileRepository[F[_]] { + def config: FileRepositoryConfig + def getBytes(key: FileKey): Stream[F, Byte] def findMeta(key: FileKey): F[Option[FileMetadata]] @@ -35,13 +37,27 @@ object FileRepository { def apply[F[_]: Async]( xa: Transactor[F], ds: DataSource, - cfg: FileRepositoryConfig + cfg: FileRepositoryConfig, + withAttributeStore: Boolean ): FileRepository[F] = { - val attrStore = new AttributeStore[F](xa) + val attrStore = + if (withAttributeStore) AttributeStore[F](xa) + else AttributeStore.empty[F] val log = docspell.logging.getLogger[F] val keyFun: FileKey => BinaryId = BinnyUtils.fileKeyToBinaryId val binStore: BinaryStore[F] = BinnyUtils.binaryStore(cfg, attrStore, ds, log) - new FileRepositoryImpl[F](binStore, attrStore, keyFun) + new FileRepositoryImpl[F](cfg, binStore, attrStore, keyFun) } + + def getDelegate[F[_]]( + repo: FileRepository[F] + ): Option[(BinaryStore[F], BinaryAttributeStore[F])] = + repo match { + case n: FileRepositoryImpl[F] => + Some((n.bs, n.attrStore)) + + case _ => + None + } } diff --git a/modules/store/src/main/scala/docspell/store/file/FileRepositoryConfig.scala b/modules/store/src/main/scala/docspell/store/file/FileRepositoryConfig.scala index 5c575e3d..41aec6da 100644 --- a/modules/store/src/main/scala/docspell/store/file/FileRepositoryConfig.scala +++ b/modules/store/src/main/scala/docspell/store/file/FileRepositoryConfig.scala @@ -8,6 +8,8 @@ package docspell.store.file import fs2.io.file.Path +import docspell.common.FileStoreConfig + sealed trait FileRepositoryConfig {} object FileRepositoryConfig { @@ -24,4 +26,13 @@ object FileRepositoryConfig { final case class Directory(path: Path, chunkSize: Int) extends FileRepositoryConfig + def fromFileStoreConfig(chunkSize: Int, cfg: FileStoreConfig): FileRepositoryConfig = + cfg match { + case FileStoreConfig.DefaultDatabase(_) => + FileRepositoryConfig.Database(chunkSize) + case FileStoreConfig.S3(_, endpoint, accessKey, secretKey, bucket) => + FileRepositoryConfig.S3(endpoint, accessKey, secretKey, bucket, chunkSize) + case FileStoreConfig.FileSystem(_, directory) => + FileRepositoryConfig.Directory(directory, chunkSize) + } } diff --git a/modules/store/src/main/scala/docspell/store/file/FileRepositoryImpl.scala b/modules/store/src/main/scala/docspell/store/file/FileRepositoryImpl.scala index 7f820244..e605d79c 100644 --- a/modules/store/src/main/scala/docspell/store/file/FileRepositoryImpl.scala +++ b/modules/store/src/main/scala/docspell/store/file/FileRepositoryImpl.scala @@ -16,8 +16,9 @@ import docspell.common._ import binny._ final class FileRepositoryImpl[F[_]: Sync]( - bs: BinaryStore[F], - attrStore: AttributeStore[F], + val config: FileRepositoryConfig, + val bs: BinaryStore[F], + val attrStore: AttributeStore[F], keyFun: FileKey => BinaryId ) extends FileRepository[F] { diff --git a/modules/store/src/main/scala/docspell/store/impl/StoreImpl.scala b/modules/store/src/main/scala/docspell/store/impl/StoreImpl.scala index 54505649..1c774956 100644 --- a/modules/store/src/main/scala/docspell/store/impl/StoreImpl.scala +++ b/modules/store/src/main/scala/docspell/store/impl/StoreImpl.scala @@ -6,12 +6,14 @@ package docspell.store.impl +import javax.sql.DataSource + import cats.arrow.FunctionK import cats.effect.Async import cats.implicits._ import cats.~> -import docspell.store.file.FileRepository +import docspell.store.file.{FileRepository, FileRepositoryConfig} import docspell.store.migrate.FlywayMigrate import docspell.store.{AddResult, JdbcConfig, Store} @@ -21,9 +23,16 @@ import doobie.implicits._ final class StoreImpl[F[_]: Async]( val fileRepo: FileRepository[F], jdbc: JdbcConfig, + ds: DataSource, xa: Transactor[F] ) extends Store[F] { + def createFileRepository( + cfg: FileRepositoryConfig, + withAttributeStore: Boolean + ): FileRepository[F] = + FileRepository(xa, ds, cfg, withAttributeStore) + def transform: ConnectionIO ~> F = FunctionK.lift(transact) diff --git a/modules/store/src/test/scala/docspell/store/StoreFixture.scala b/modules/store/src/test/scala/docspell/store/StoreFixture.scala index dc11ff41..a59871e3 100644 --- a/modules/store/src/test/scala/docspell/store/StoreFixture.scala +++ b/modules/store/src/test/scala/docspell/store/StoreFixture.scala @@ -69,7 +69,7 @@ object StoreFixture { xa <- makeXA(ds) cfg = FileRepositoryConfig.Database(64 * 1024) fr = FileRepository[IO](xa, ds, cfg) - store = new StoreImpl[IO](fr, jdbc, xa) + store = new StoreImpl[IO](fr, jdbc, ds, xa) _ <- Resource.eval(store.migrate) } yield store }