Add task to copy files

This commit is contained in:
eikek
2022-03-08 00:11:40 +01:00
parent e82b00c582
commit 422c0905dc
28 changed files with 512 additions and 65 deletions

View File

@ -50,6 +50,7 @@ trait BackendApp[F[_]] {
def events: EventExchange[F] def events: EventExchange[F]
def notification: ONotification[F] def notification: ONotification[F]
def bookmarks: OQueryBookmarks[F] def bookmarks: OQueryBookmarks[F]
def fileRepository: OFileRepository[F]
} }
object BackendApp { object BackendApp {
@ -91,6 +92,7 @@ object BackendApp {
) )
notifyImpl <- ONotification(store, notificationMod) notifyImpl <- ONotification(store, notificationMod)
bookmarksImpl <- OQueryBookmarks(store) bookmarksImpl <- OQueryBookmarks(store)
fileRepoImpl <- OFileRepository(queue, joexImpl)
} yield new BackendApp[F] { } yield new BackendApp[F] {
val pubSub = pubSubT val pubSub = pubSubT
val login = loginImpl val login = loginImpl
@ -118,5 +120,6 @@ object BackendApp {
val events = notificationMod val events = notificationMod
val notification = notifyImpl val notification = notifyImpl
val bookmarks = bookmarksImpl val bookmarks = bookmarksImpl
val fileRepository = fileRepoImpl
} }
} }

View File

@ -42,15 +42,11 @@ object Config {
def defaultStoreConfig: FileStoreConfig = def defaultStoreConfig: FileStoreConfig =
enabledStores(defaultStore) enabledStores(defaultStore)
def toFileRepositoryConfig: FileRepositoryConfig = def defaultFileRepositoryConfig: FileRepositoryConfig =
defaultStoreConfig match { FileRepositoryConfig.fromFileStoreConfig(chunkSize, defaultStoreConfig)
case FileStoreConfig.DefaultDatabase(_) =>
FileRepositoryConfig.Database(chunkSize) def getFileRepositoryConfig(id: Ident): Option[FileRepositoryConfig] =
case FileStoreConfig.S3(_, endpoint, accessKey, secretKey, bucket) => stores.get(id).map(FileRepositoryConfig.fromFileStoreConfig(chunkSize, _))
FileRepositoryConfig.S3(endpoint, accessKey, secretKey, bucket, chunkSize)
case FileStoreConfig.FileSystem(_, directory) =>
FileRepositoryConfig.Directory(directory, chunkSize)
}
def validate: ValidatedNec[String, Files] = { def validate: ValidatedNec[String, Files] = {
val storesEmpty = val storesEmpty =

View File

@ -15,6 +15,26 @@ import docspell.notification.api.PeriodicQueryArgs
import docspell.store.records.RJob import docspell.store.records.RJob
object JobFactory extends MailAddressCodec { object JobFactory extends MailAddressCodec {
/** Builds (but does not submit) the job that runs the file copy task.
  *
  * The job gets a random id, the current time as submission time and
  * `Priority.High`. `FileCopyTaskArgs.taskName` is used both as the task name and
  * as the last argument to `RJob.newJob` (presumably the job tracker — confirm
  * against `RJob.newJob`).
  *
  * @param args
  *   the arguments handed to the task
  * @param submitter
  *   the account the job is created for; defaults to the docspell system account
  */
def fileCopy[F[_]: Sync](
    args: FileCopyTaskArgs,
    submitter: AccountId = DocspellSystem.account
): F[RJob] =
  Ident.randomId[F].flatMap { jobId =>
    Timestamp.current[F].map { submittedAt =>
      RJob.newJob(
        jobId,
        FileCopyTaskArgs.taskName,
        submitter.collective,
        args,
        "Copying all files",
        submittedAt,
        submitter.user,
        Priority.High,
        Some(FileCopyTaskArgs.taskName)
      )
    }
  }
def periodicQuery[F[_]: Sync](args: PeriodicQueryArgs, submitter: AccountId): F[RJob] = def periodicQuery[F[_]: Sync](args: PeriodicQueryArgs, submitter: AccountId): F[RJob] =
for { for {
id <- Ident.randomId[F] id <- Ident.randomId[F]

View File

@ -0,0 +1,40 @@
/*
* Copyright 2020 Eike K. & Contributors
*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
package docspell.backend.ops
import cats.effect._
import cats.implicits._
import docspell.backend.JobFactory
import docspell.common.FileCopyTaskArgs
import docspell.store.queue.JobQueue
import docspell.store.records.RJob
trait OFileRepository[F[_]] {
/** Creates a file copy job from `args` and inserts it into the job queue, unless
  * the queue reports that such a job is not new.
  *
  * @param args
  *   the arguments for the file copy task
  * @param notifyJoex
  *   whether to notify all job executor nodes after the insert attempt
  * @return
  *   the inserted job, or `None` if no new job was inserted
  */
def cloneFileRepository(args: FileCopyTaskArgs, notifyJoex: Boolean): F[Option[RJob]]
}
object OFileRepository {

  /** Creates an [[OFileRepository]] that submits file copy jobs to the given queue
    * and optionally notifies the job executors through `joex`.
    */
  def apply[F[_]: Async](
      queue: JobQueue[F],
      joex: OJoex[F]
  ): Resource[F, OFileRepository[F]] = {
    val impl: OFileRepository[F] = new OFileRepository[F] {
      def cloneFileRepository(
          args: FileCopyTaskArgs,
          notifyJoex: Boolean
      ): F[Option[RJob]] =
        JobFactory.fileCopy[F](args).flatMap { job =>
          queue.insertIfNew(job).flatMap { inserted =>
            // The notification is sent regardless of whether the job was inserted.
            val notification = if (notifyJoex) joex.notifyAllNodes else ().pure[F]
            notification.map(_ => Option.when(inserted)(job))
          }
        }
    }
    Resource.pure[F, OFileRepository[F]](impl)
  }
}

View File

@ -14,7 +14,8 @@ case class Banner(
configFile: Option[String], configFile: Option[String],
appId: Ident, appId: Ident,
baseUrl: LenientUri, baseUrl: LenientUri,
ftsUrl: Option[LenientUri] ftsUrl: Option[LenientUri],
fileStoreConfig: FileStoreConfig
) { ) {
private val banner = private val banner =
@ -36,6 +37,7 @@ case class Banner(
s"Database: ${jdbcUrl.asString}", s"Database: ${jdbcUrl.asString}",
s"Fts: ${ftsUrl.map(_.asString).getOrElse("-")}", s"Fts: ${ftsUrl.map(_.asString).getOrElse("-")}",
s"Config: ${configFile.getOrElse("")}", s"Config: ${configFile.getOrElse("")}",
s"FileRepo: ${fileStoreConfig}",
"" ""
) )

View File

@ -10,6 +10,8 @@ object DocspellSystem {
val user = Ident.unsafe("docspell-system") val user = Ident.unsafe("docspell-system")
val taskGroup = user val taskGroup = user
val account: AccountId = AccountId(taskGroup, user)
val migrationTaskTracker = Ident.unsafe("full-text-index-tracker") val migrationTaskTracker = Ident.unsafe("full-text-index-tracker")
val allPreviewTaskTracker = Ident.unsafe("generate-all-previews") val allPreviewTaskTracker = Ident.unsafe("generate-all-previews")
val allPageCountTaskTracker = Ident.unsafe("all-page-count-tracker") val allPageCountTaskTracker = Ident.unsafe("all-page-count-tracker")

View File

@ -0,0 +1,56 @@
/*
* Copyright 2020 Eike K. & Contributors
*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
package docspell.common
import cats.data.NonEmptyList
import docspell.common.FileCopyTaskArgs.Selection
import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder}
import io.circe.syntax._
import io.circe.{Decoder, Encoder}
/** This is the input to the `FileCopyTask`. The task copies all files from one
  * FileRepository to one or more target repositories.
  *
  * If no `from` is given, the default file repository is used. For targets, a list of
  * ids can be specified that must match a configured file store in the config file.
  * When selecting "all", it means all enabled stores.
  */
final case class FileCopyTaskArgs(from: Option[Ident], to: Selection)
object FileCopyTaskArgs {

  /** Name under which the file copy task is registered and submitted. */
  val taskName = Ident.unsafe("copy-file-repositories")

  /** Selects the target file stores of a copy task. */
  sealed trait Selection

  object Selection {

    /** Select all stores. Encoded in JSON as the string `"!all"`. */
    case object All extends Selection

    /** Select the stores with the given ids. Encoded in JSON as an array of ids. */
    case class Stores(ids: NonEmptyList[Ident]) extends Selection

    implicit val jsonEncoder: Encoder[Selection] =
      Encoder.instance { selection =>
        selection match {
          case Stores(ids) => ids.toList.asJson
          case All         => "!all".asJson
        }
      }

    implicit val jsonDecoder: Decoder[Selection] =
      Decoder.instance { cursor =>
        // The marker string is matched case-insensitively; anything else must
        // decode as a non-empty list of ids.
        val selectsAll = cursor.value.asString.exists(_.equalsIgnoreCase("!all"))
        if (selectsAll) Right(All)
        else cursor.value.as[NonEmptyList[Ident]].map(Stores.apply)
      }
  }

  implicit val jsonDecoder: Decoder[FileCopyTaskArgs] =
    deriveDecoder[FileCopyTaskArgs]

  implicit val jsonEncoder: Encoder[FileCopyTaskArgs] =
    deriveEncoder[FileCopyTaskArgs]
}

View File

@ -32,5 +32,8 @@ object FileStoreConfig {
bucket: String bucket: String
) extends FileStoreConfig { ) extends FileStoreConfig {
val storeType = FileStoreType.S3 val storeType = FileStoreType.S3
/** Renders this config with the secret key masked, so that the value can be
  * included in log output without exposing the secret.
  */
override def toString: String =
  s"S3(enabled=$enabled, endpoint=$endpoint, bucket=$bucket, accessKey=$accessKey, secretKey=***)"
} }
} }

View File

@ -19,6 +19,9 @@ import pureconfig.generic.auto._
import yamusca.imports._ import yamusca.imports._
object ConfigFile { object ConfigFile {
// IntelliJ is wrong, this is required
import Implicits._
def loadConfig[F[_]: Async](args: List[String]): F[Config] = { def loadConfig[F[_]: Async](args: List[String]): F[Config] = {
val logger = docspell.logging.getLogger[F] val logger = docspell.logging.getLogger[F]
ConfigFactory ConfigFactory

View File

@ -20,6 +20,7 @@ import docspell.ftsclient.FtsClient
import docspell.ftssolr.SolrFtsClient import docspell.ftssolr.SolrFtsClient
import docspell.joex.analysis.RegexNerFile import docspell.joex.analysis.RegexNerFile
import docspell.joex.emptytrash._ import docspell.joex.emptytrash._
import docspell.joex.filecopy.FileCopyTask
import docspell.joex.fts.{MigrationTask, ReIndexTask} import docspell.joex.fts.{MigrationTask, ReIndexTask}
import docspell.joex.hk._ import docspell.joex.hk._
import docspell.joex.learn.LearnClassifierTask import docspell.joex.learn.LearnClassifierTask
@ -279,6 +280,13 @@ object JoexAppImpl extends MailAddressCodec {
PeriodicDueItemsTask.onCancel[F] PeriodicDueItemsTask.onCancel[F]
) )
) )
.withTask(
JobTask.json(
FileCopyTaskArgs.taskName,
FileCopyTask[F](cfg),
FileCopyTask.onCancel[F]
)
)
.resource .resource
psch <- PeriodicScheduler.create( psch <- PeriodicScheduler.create(
cfg.periodicScheduler, cfg.periodicScheduler,

View File

@ -41,7 +41,7 @@ object JoexServer {
store <- Store.create[F]( store <- Store.create[F](
cfg.jdbc, cfg.jdbc,
cfg.files.toFileRepositoryConfig, cfg.files.defaultFileRepositoryConfig,
pools.connectEC pools.connectEC
) )
settings <- Resource.eval(store.transact(RInternalSetting.create)) settings <- Resource.eval(store.transact(RInternalSetting.create))

View File

@ -31,7 +31,8 @@ object Main extends IOApp {
Option(System.getProperty("config.file")), Option(System.getProperty("config.file")),
cfg.appId, cfg.appId,
cfg.baseUrl, cfg.baseUrl,
Some(cfg.fullTextSearch.solr.url).filter(_ => cfg.fullTextSearch.enabled) Some(cfg.fullTextSearch.solr.url).filter(_ => cfg.fullTextSearch.enabled),
cfg.files.defaultStoreConfig
) )
_ <- logger.info(s"\n${banner.render("***>")}") _ <- logger.info(s"\n${banner.render("***>")}")
_ <- _ <-

View File

@ -0,0 +1,133 @@
/*
* Copyright 2020 Eike K. & Contributors
*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
package docspell.joex.filecopy
import cats.data.NonEmptyList
import cats.effect._
import cats.implicits._
import docspell.common.FileCopyTaskArgs.Selection
import docspell.common.{FileCopyTaskArgs, Ident}
import docspell.joex.Config
import docspell.joex.scheduler.Task
import docspell.logging.Logger
import docspell.store.file.{BinnyUtils, FileRepository, FileRepositoryConfig}
import binny.CopyTool.Counter
import binny.{BinaryId, BinaryStore, CopyTool}
import io.circe.generic.semiauto.deriveCodec
import io.circe.{Codec, Decoder, Encoder}
object FileCopyTask {
type Args = FileCopyTaskArgs
/** Result of a file copy task.
  *
  * @param success
  *   whether the copy was carried out
  * @param message
  *   a description of the outcome
  * @param counter
  *   one counter per target that was copied to; empty for failures
  */
case class CopyResult(success: Boolean, message: String, counter: List[Counter])

object CopyResult {

  /** A failed result carrying only a reason and no counters. */
  private def failure(reason: String): CopyResult =
    CopyResult(false, reason, Nil)

  def noSourceImpl: CopyResult =
    failure("No source BinaryStore implementation found!")

  def noTargetImpl: CopyResult =
    failure("No target BinaryStore implementation found!")

  def noSourceStore(id: Ident): CopyResult =
    failure(
      s"No source file repo found with id: ${id.id}. Make sure it is present in the config."
    )

  def noTargetStore: CopyResult =
    failure("No target file repositories defined")

  def success(counter: NonEmptyList[Counter]): CopyResult =
    CopyResult(true, "Done", counter.toList)

  // A BinaryId is represented in JSON by its plain string id.
  implicit val binaryIdCodec: Codec[BinaryId] = {
    val decoder: Decoder[BinaryId] = Decoder.decodeString.map(id => BinaryId(id))
    val encoder: Encoder[BinaryId] = Encoder.encodeString.contramap(binaryId => binaryId.id)
    Codec.from(decoder, encoder)
  }

  implicit val counterEncoder: Codec[Counter] =
    deriveCodec[Counter]

  implicit val jsonCodec: Codec[CopyResult] =
    deriveCodec[CopyResult]
}
/** The task to run when a file copy job is cancelled; it only logs a warning. */
def onCancel[F[_]]: Task[F, Args, Unit] =
  Task.log(logger => logger.warn(s"Cancelling ${FileCopyTaskArgs.taskName.id} task"))
/** Creates the task that copies all files from a source file repository into one or
  * more target repositories.
  *
  * The source is the store given by `from`, or the default file repository if `from`
  * is absent. Targets are either all enabled stores or the stores with the given
  * ids. The source is removed from the targets if it is contained there.
  *
  * The task does not fail the effect for configuration problems; it logs an error
  * and returns an unsuccessful [[CopyResult]] when
  *   - the source id is not present in the config,
  *   - one or more target ids are not present in the config,
  *   - no target remains after removing the source.
  *
  * @param cfg
  *   the joex configuration whose `files` section defines the available stores
  */
def apply[F[_]: Async](cfg: Config): Task[F, Args, CopyResult] =
  Task { ctx =>
    val src: Either[CopyResult, FileRepositoryConfig] =
      ctx.args.from match {
        case Some(id) =>
          cfg.files.getFileRepositoryConfig(id).toRight(CopyResult.noSourceStore(id))
        case None =>
          Right(cfg.files.defaultFileRepositoryConfig)
      }

    val targets: Either[CopyResult, List[FileRepositoryConfig]] =
      ctx.args.to match {
        case Selection.All =>
          Right(
            cfg.files.enabledStores.values.toList
              .map(FileRepositoryConfig.fromFileStoreConfig(cfg.files.chunkSize, _))
          )
        case Selection.Stores(ids) =>
          // Report unknown ids explicitly instead of silently dropping all targets.
          val (unknown, known) = ids.toList.partitionMap(id =>
            cfg.files.getFileRepositoryConfig(id).toRight(id)
          )
          if (unknown.isEmpty) Right(known)
          else
            Left(
              CopyResult(
                false,
                s"No target file repo found with id(s): ${unknown.map(_.id).mkString(", ")}. Make sure they are present in the config.",
                Nil
              )
            )
      }

    val data =
      for {
        srcConfig <- src
        candidates <- targets
        // remove source from targets if present there
        trgConfig <- NonEmptyList
          .fromList(candidates.filter(_ != srcConfig))
          .toRight(CopyResult.noTargetStore)

        srcRepo = ctx.store.createFileRepository(srcConfig, true)
        targetRepos = trgConfig.map(ctx.store.createFileRepository(_, false))
      } yield (srcRepo, targetRepos)

    data match {
      case Right((from, tos)) =>
        ctx.logger.info(s"Start copying all files from ${from.config}") *>
          copy(ctx.logger, from, tos).flatTap(r =>
            if (r.success) ctx.logger.info(s"Copying finished: ${r.counter}")
            else ctx.logger.error(s"Copying failed: $r")
          )

      case Left(res) =>
        ctx.logger.error(s"Copying failed: $res") *> res.pure[F]
    }
  }
/** Copies all files from `from` into every repository in `to`.
  *
  * Both the source and all targets must expose an underlying `BinaryStore` via
  * `FileRepository.getDelegate`; otherwise an unsuccessful result is returned and
  * nothing is copied. Targets are processed one after another.
  *
  * @return
  *   a successful result with one counter per target, or `noSourceImpl` /
  *   `noTargetImpl` if a delegate is missing
  */
def copy[F[_]: Async](
    logger: Logger[F],
    from: FileRepository[F],
    to: NonEmptyList[FileRepository[F]]
): F[CopyResult] =
  FileRepository.getDelegate(from).fold(CopyResult.noSourceImpl.pure[F]) {
    case (src, srcMeta) =>
      val delegates =
        to.traverse(repo => FileRepository.getDelegate(repo).map(_._1))

      delegates.fold(CopyResult.noTargetImpl.pure[F]) { targets =>
        val binnyLogger = BinnyUtils.LoggerAdapter(logger)
        // Use half of the available cores, but at least one.
        val cores = Runtime.getRuntime.availableProcessors()
        val maxConcurrent = if (cores > 2) cores / 2 else 1

        logger.info(s"Start copying ${from.config} -> ${to.map(_.config)}") *>
          targets
            .traverse(sink =>
              CopyTool.copyAll[F](binnyLogger, src, srcMeta, sink, 50, maxConcurrent)
            )
            .map(CopyResult.success)
      }
  }
}

View File

@ -22,10 +22,12 @@ object ScribeConfigure {
Sync[F].delay { Sync[F].delay {
replaceJUL() replaceJUL()
val docspellLogger = scribe.Logger("docspell") val docspellLogger = scribe.Logger("docspell")
val flywayLogger = scribe.Logger("org.flywaydb")
unsafeConfigure(scribe.Logger.root, cfg.copy(minimumLevel = getRootMinimumLevel)) unsafeConfigure(scribe.Logger.root, cfg.copy(minimumLevel = getRootMinimumLevel))
unsafeConfigure(docspellLogger, cfg) unsafeConfigure(docspellLogger, cfg)
unsafeConfigure(flywayLogger, cfg) unsafeConfigure(scribe.Logger("org.flywaydb"), cfg)
unsafeConfigure(scribe.Logger("binny"), cfg)
unsafeConfigure(scribe.Logger("org.http4s"), cfg)
} }
private[this] def getRootMinimumLevel: Level = private[this] def getRootMinimumLevel: Level =

View File

@ -2487,6 +2487,35 @@ paths:
schema: schema:
$ref: "#/components/schemas/BasicResult" $ref: "#/components/schemas/BasicResult"
/admin/files/cloneFileRepository:
post:
operationId: "admin-files-cloneFileRepository"
tags: [Admin]
summary: Copy all files into a new repository
description: |
Submits a task that will copy all files of the application
(from the default file repository) into another file
repository as specified in the request. The request may define
ids of file repository configurations that must be present in
the config file. An empty list means to copy to all enabled
file repositories from the default file repository.
security:
- adminHeader: []
requestBody:
content:
application/json:
schema:
$ref: "#/components/schemas/FileRepositoryCloneRequest"
responses:
422:
description: BadRequest
200:
description: Ok
content:
application/json:
schema:
$ref: "#/components/schemas/BasicResult"
/sec/source: /sec/source:
get: get:
operationId: "sec-source-get-all" operationId: "sec-source-get-all"
@ -5433,6 +5462,18 @@ paths:
components: components:
schemas: schemas:
FileRepositoryCloneRequest:
description: |
Clone the file repository to a new location.
required:
- targetRepositories
properties:
targetRepositories:
type: array
items:
type: string
format: ident
BookmarkedQuery: BookmarkedQuery:
description: | description: |
A query bookmark. A query bookmark.

View File

@ -28,7 +28,8 @@ object Main extends IOApp {
Option(System.getProperty("config.file")), Option(System.getProperty("config.file")),
cfg.appId, cfg.appId,
cfg.baseUrl, cfg.baseUrl,
Some(cfg.fullTextSearch.solr.url).filter(_ => cfg.fullTextSearch.enabled) Some(cfg.fullTextSearch.solr.url).filter(_ => cfg.fullTextSearch.enabled),
cfg.backend.files.defaultStoreConfig
) )
_ <- logger.info(s"\n${banner.render("***>")}") _ <- logger.info(s"\n${banner.render("***>")}")
_ <- _ <-

View File

@ -7,7 +7,9 @@
package docspell.restserver package docspell.restserver
import fs2.Stream import fs2.Stream
import docspell.backend.BackendApp import docspell.backend.BackendApp
import org.http4s.HttpRoutes import org.http4s.HttpRoutes
import org.http4s.server.websocket.WebSocketBuilder2 import org.http4s.server.websocket.WebSocketBuilder2

View File

@ -9,6 +9,7 @@ package docspell.restserver
import cats.effect._ import cats.effect._
import fs2.Stream import fs2.Stream
import fs2.concurrent.Topic import fs2.concurrent.Topic
import docspell.backend.BackendApp import docspell.backend.BackendApp
import docspell.backend.auth.{AuthToken, ShareToken} import docspell.backend.auth.{AuthToken, ShareToken}
import docspell.ftsclient.FtsClient import docspell.ftsclient.FtsClient
@ -23,6 +24,7 @@ import docspell.restserver.routes._
import docspell.restserver.webapp.{TemplateRoutes, Templates, WebjarRoutes} import docspell.restserver.webapp.{TemplateRoutes, Templates, WebjarRoutes}
import docspell.restserver.ws.{OutputEvent, WebSocketRoutes} import docspell.restserver.ws.{OutputEvent, WebSocketRoutes}
import docspell.store.Store import docspell.store.Store
import emil.javamail.JavaMailEmil import emil.javamail.JavaMailEmil
import org.http4s.HttpRoutes import org.http4s.HttpRoutes
import org.http4s.client.Client import org.http4s.client.Client
@ -76,7 +78,8 @@ final class RestAppImpl[F[_]: Async](
"user/otp" -> TotpRoutes.admin(backend), "user/otp" -> TotpRoutes.admin(backend),
"user" -> UserRoutes.admin(backend), "user" -> UserRoutes.admin(backend),
"info" -> InfoRoutes.admin(config), "info" -> InfoRoutes.admin(config),
"attachments" -> AttachmentRoutes.admin(backend) "attachments" -> AttachmentRoutes.admin(backend),
"files" -> FileRepositoryRoutes.admin(backend)
) )
def shareRoutes( def shareRoutes(

View File

@ -7,18 +7,21 @@
package docspell.restserver package docspell.restserver
import scala.concurrent.duration._ import scala.concurrent.duration._
import cats.effect._ import cats.effect._
import cats.implicits._ import cats.implicits._
import fs2.Stream import fs2.Stream
import fs2.concurrent.Topic import fs2.concurrent.Topic
import docspell.backend.msg.Topics import docspell.backend.msg.Topics
import docspell.common._ import docspell.common._
import docspell.pubsub.naive.NaivePubSub import docspell.pubsub.naive.NaivePubSub
import docspell.restserver.http4s.InternalHeader import docspell.restserver.http4s.InternalHeader
import docspell.restserver.ws.OutputEvent.KeepAlive
import docspell.restserver.ws.OutputEvent import docspell.restserver.ws.OutputEvent
import docspell.restserver.ws.OutputEvent.KeepAlive
import docspell.store.Store import docspell.store.Store
import docspell.store.records.RInternalSetting import docspell.store.records.RInternalSetting
import org.http4s._ import org.http4s._
import org.http4s.blaze.client.BlazeClientBuilder import org.http4s.blaze.client.BlazeClientBuilder
import org.http4s.blaze.server.BlazeServerBuilder import org.http4s.blaze.server.BlazeServerBuilder
@ -73,7 +76,7 @@ object RestServer {
httpClient <- BlazeClientBuilder[F].resource httpClient <- BlazeClientBuilder[F].resource
store <- Store.create[F]( store <- Store.create[F](
cfg.backend.jdbc, cfg.backend.jdbc,
cfg.backend.files.toFileRepositoryConfig, cfg.backend.files.defaultFileRepositoryConfig,
pools.connectEC pools.connectEC
) )
setting <- Resource.eval(store.transact(RInternalSetting.create)) setting <- Resource.eval(store.transact(RInternalSetting.create))

View File

@ -0,0 +1,54 @@
/*
* Copyright 2020 Eike K. & Contributors
*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
package docspell.restserver.routes
import cats.data.NonEmptyList
import cats.effect._
import cats.implicits._
import docspell.backend.BackendApp
import docspell.common.FileCopyTaskArgs
import docspell.common.FileCopyTaskArgs.Selection
import docspell.restapi.model._
import org.http4s._
import org.http4s.circe.CirceEntityDecoder._
import org.http4s.circe.CirceEntityEncoder._
import org.http4s.dsl.Http4sDsl
object FileRepositoryRoutes {
/** Admin routes for file repositories.
  *
  * `POST cloneFileRepository` reads a `FileRepositoryCloneRequest`, submits the
  * file copy job (notifying the job executors) and answers with a `BasicResult`
  * telling whether a new job was submitted.
  */
def admin[F[_]: Async](backend: BackendApp[F]): HttpRoutes[F] = {
  val dsl = Http4sDsl[F]
  import dsl._
  val logger = docspell.logging.getLogger[F]

  HttpRoutes.of { case req @ POST -> Root / "cloneFileRepository" =>
    for {
      input <- req.as[FileRepositoryCloneRequest]
      job <- backend.fileRepository.cloneFileRepository(makeTaskArgs(input), true)
      message = job match {
        case Some(j) =>
          s"Job for '${FileCopyTaskArgs.taskName.id}' submitted: ${j.id.id}"
        case None =>
          s"Job for '${FileCopyTaskArgs.taskName.id}' already running"
      }
      result = BasicResult(job.isDefined, message)
      _ <- logger.info(result.message)
      resp <- Ok(result)
    } yield resp
  }
}
/** Converts the request into task arguments. No source is set; an empty list of
  * target repositories selects all stores, otherwise the given ids are used.
  */
def makeTaskArgs(input: FileRepositoryCloneRequest): FileCopyTaskArgs = {
  val selection: Selection =
    NonEmptyList
      .fromList(input.targetRepositories)
      .fold[Selection](Selection.All)(ids => Selection.Stores(ids))
  FileCopyTaskArgs(None, selection)
}
}

View File

@ -28,6 +28,11 @@ trait Store[F[_]] {
def fileRepo: FileRepository[F] def fileRepo: FileRepository[F]
/** Creates a [[FileRepository]] for the given configuration, in addition to the
  * default one available via `fileRepo`.
  *
  * @param cfg
  *   the configuration of the repository to create
  * @param withAttributeStore
  *   handed to `FileRepository.apply`, where `false` selects the empty attribute
  *   store instead of the database-backed one
  */
def createFileRepository(
cfg: FileRepositoryConfig,
withAttributeStore: Boolean
): FileRepository[F]
def add(insert: ConnectionIO[Int], exists: ConnectionIO[Boolean]): F[AddResult] def add(insert: ConnectionIO[Int], exists: ConnectionIO[Boolean]): F[AddResult]
} }
@ -50,8 +55,8 @@ object Store {
ds.setDriverClassName(jdbc.driverClass) ds.setDriverClassName(jdbc.driverClass)
} }
xa = HikariTransactor(ds, connectEC) xa = HikariTransactor(ds, connectEC)
fr = FileRepository.apply(xa, ds, fileRepoConfig) fr = FileRepository.apply(xa, ds, fileRepoConfig, true)
st = new StoreImpl[F](fr, jdbc, xa) st = new StoreImpl[F](fr, jdbc, ds, xa)
_ <- Resource.eval(st.migrate) _ <- Resource.eval(st.migrate)
} yield st } yield st
} }

View File

@ -6,6 +6,7 @@
package docspell.store.file package docspell.store.file
import cats.Applicative
import cats.data.OptionT import cats.data.OptionT
import cats.effect._ import cats.effect._
import cats.implicits._ import cats.implicits._
@ -17,40 +18,71 @@ import binny._
import doobie._ import doobie._
import doobie.implicits._ import doobie.implicits._
final private[file] class AttributeStore[F[_]: Sync](xa: Transactor[F]) private[file] trait AttributeStore[F[_]] extends BinaryAttributeStore[F] {
extends BinaryAttributeStore[F] { def findMeta(id: BinaryId): OptionT[F, RFileMeta]
def saveAttr(id: BinaryId, attrs: F[BinaryAttributes]): F[Unit] = }
for {
now <- Timestamp.current[F] private[file] object AttributeStore {
a <- attrs def empty[F[_]: Applicative]: AttributeStore[F] =
fileKey <- makeFileKey(id) new AttributeStore[F] {
fm = RFileMeta( val delegate = BinaryAttributeStore.empty[F]
fileKey,
now, def findMeta(id: BinaryId) =
MimeType.parse(a.contentType.contentType).getOrElse(MimeType.octetStream), OptionT.none
ByteSize(a.length),
a.sha256 def saveAttr(id: BinaryId, attrs: F[BinaryAttributes]) =
) delegate.saveAttr(id, attrs)
_ <- RFileMeta.insert(fm).transact(xa)
} yield () def deleteAttr(id: BinaryId) =
delegate.deleteAttr(id)
def deleteAttr(id: BinaryId): F[Boolean] =
makeFileKey(id).flatMap(fileKey => RFileMeta.delete(fileKey).transact(xa).map(_ > 0)) def findAttr(id: BinaryId) =
delegate.findAttr(id)
def findAttr(id: BinaryId): OptionT[F, BinaryAttributes] = }
findMeta(id).map(fm =>
BinaryAttributes( def apply[F[_]: Sync](xa: Transactor[F]): AttributeStore[F] =
fm.checksum, new Impl[F](xa)
SimpleContentType(fm.mimetype.asString),
fm.length.bytes final private class Impl[F[_]: Sync](xa: Transactor[F]) extends AttributeStore[F] {
) def saveAttr(id: BinaryId, attrs: F[BinaryAttributes]): F[Unit] =
) for {
now <- Timestamp.current[F]
def findMeta(id: BinaryId): OptionT[F, RFileMeta] = a <- attrs
OptionT(makeFileKey(id).flatMap(fileKey => RFileMeta.findById(fileKey).transact(xa))) fileKey <- makeFileKey(id)
fm = RFileMeta(
private def makeFileKey(binaryId: BinaryId): F[FileKey] = fileKey,
Sync[F] now,
.pure(BinnyUtils.binaryIdToFileKey(binaryId).left.map(new IllegalStateException(_))) MimeType.parse(a.contentType.contentType).getOrElse(MimeType.octetStream),
.rethrow ByteSize(a.length),
a.sha256
)
_ <- RFileMeta.insert(fm).transact(xa)
} yield ()
def deleteAttr(id: BinaryId): F[Boolean] =
makeFileKey(id).flatMap(fileKey =>
RFileMeta.delete(fileKey).transact(xa).map(_ > 0)
)
def findAttr(id: BinaryId): OptionT[F, BinaryAttributes] =
findMeta(id).map(fm =>
BinaryAttributes(
fm.checksum,
SimpleContentType(fm.mimetype.asString),
fm.length.bytes
)
)
def findMeta(id: BinaryId): OptionT[F, RFileMeta] =
OptionT(
makeFileKey(id).flatMap(fileKey => RFileMeta.findById(fileKey).transact(xa))
)
private def makeFileKey(binaryId: BinaryId): F[FileKey] =
Sync[F]
.pure(
BinnyUtils.binaryIdToFileKey(binaryId).left.map(new IllegalStateException(_))
)
.rethrow
}
} }

View File

@ -21,7 +21,7 @@ import binny.jdbc.{GenericJdbcStore, JdbcStoreConfig}
import binny.minio.{MinioBinaryStore, MinioConfig, S3KeyMapping} import binny.minio.{MinioBinaryStore, MinioConfig, S3KeyMapping}
import scodec.bits.ByteVector import scodec.bits.ByteVector
private[store] object BinnyUtils { object BinnyUtils {
def fileKeyToBinaryId(fk: FileKey): BinaryId = def fileKeyToBinaryId(fk: FileKey): BinaryId =
BinaryId(s"${fk.collective.id}/${fk.category.id.id}/${fk.id.id}") BinaryId(s"${fk.collective.id}/${fk.category.id.id}/${fk.id.id}")

View File

@ -13,10 +13,12 @@ import fs2._
import docspell.common._ import docspell.common._
import binny.{BinaryId, BinaryStore} import binny.{BinaryAttributeStore, BinaryId, BinaryStore}
import doobie.Transactor import doobie.Transactor
trait FileRepository[F[_]] { trait FileRepository[F[_]] {
def config: FileRepositoryConfig
def getBytes(key: FileKey): Stream[F, Byte] def getBytes(key: FileKey): Stream[F, Byte]
def findMeta(key: FileKey): F[Option[FileMetadata]] def findMeta(key: FileKey): F[Option[FileMetadata]]
@ -35,13 +37,27 @@ object FileRepository {
def apply[F[_]: Async]( def apply[F[_]: Async](
xa: Transactor[F], xa: Transactor[F],
ds: DataSource, ds: DataSource,
cfg: FileRepositoryConfig cfg: FileRepositoryConfig,
withAttributeStore: Boolean
): FileRepository[F] = { ): FileRepository[F] = {
val attrStore = new AttributeStore[F](xa) val attrStore =
if (withAttributeStore) AttributeStore[F](xa)
else AttributeStore.empty[F]
val log = docspell.logging.getLogger[F] val log = docspell.logging.getLogger[F]
val keyFun: FileKey => BinaryId = BinnyUtils.fileKeyToBinaryId val keyFun: FileKey => BinaryId = BinnyUtils.fileKeyToBinaryId
val binStore: BinaryStore[F] = BinnyUtils.binaryStore(cfg, attrStore, ds, log) val binStore: BinaryStore[F] = BinnyUtils.binaryStore(cfg, attrStore, ds, log)
new FileRepositoryImpl[F](binStore, attrStore, keyFun) new FileRepositoryImpl[F](cfg, binStore, attrStore, keyFun)
} }
/** Returns the underlying binary store and attribute store of `repo`, if `repo` is
  * a [[FileRepositoryImpl]]; `None` for any other implementation.
  */
def getDelegate[F[_]](
    repo: FileRepository[F]
): Option[(BinaryStore[F], BinaryAttributeStore[F])] =
  PartialFunction.condOpt(repo) { case impl: FileRepositoryImpl[F] =>
    (impl.bs, impl.attrStore)
  }
} }

View File

@ -8,6 +8,8 @@ package docspell.store.file
import fs2.io.file.Path import fs2.io.file.Path
import docspell.common.FileStoreConfig
sealed trait FileRepositoryConfig {} sealed trait FileRepositoryConfig {}
object FileRepositoryConfig { object FileRepositoryConfig {
@ -24,4 +26,13 @@ object FileRepositoryConfig {
final case class Directory(path: Path, chunkSize: Int) extends FileRepositoryConfig final case class Directory(path: Path, chunkSize: Int) extends FileRepositoryConfig
/** Converts a file store definition from the config file into the corresponding
  * [[FileRepositoryConfig]], attaching the given chunk size. The first field of
  * each store definition is ignored.
  */
def fromFileStoreConfig(chunkSize: Int, cfg: FileStoreConfig): FileRepositoryConfig =
  cfg match {
    case FileStoreConfig.S3(_, url, key, secret, bucketName) =>
      FileRepositoryConfig.S3(url, key, secret, bucketName, chunkSize)
    case FileStoreConfig.FileSystem(_, baseDir) =>
      FileRepositoryConfig.Directory(baseDir, chunkSize)
    case FileStoreConfig.DefaultDatabase(_) =>
      FileRepositoryConfig.Database(chunkSize)
  }
} }

View File

@ -16,8 +16,9 @@ import docspell.common._
import binny._ import binny._
final class FileRepositoryImpl[F[_]: Sync]( final class FileRepositoryImpl[F[_]: Sync](
bs: BinaryStore[F], val config: FileRepositoryConfig,
attrStore: AttributeStore[F], val bs: BinaryStore[F],
val attrStore: AttributeStore[F],
keyFun: FileKey => BinaryId keyFun: FileKey => BinaryId
) extends FileRepository[F] { ) extends FileRepository[F] {

View File

@ -6,12 +6,14 @@
package docspell.store.impl package docspell.store.impl
import javax.sql.DataSource
import cats.arrow.FunctionK import cats.arrow.FunctionK
import cats.effect.Async import cats.effect.Async
import cats.implicits._ import cats.implicits._
import cats.~> import cats.~>
import docspell.store.file.FileRepository import docspell.store.file.{FileRepository, FileRepositoryConfig}
import docspell.store.migrate.FlywayMigrate import docspell.store.migrate.FlywayMigrate
import docspell.store.{AddResult, JdbcConfig, Store} import docspell.store.{AddResult, JdbcConfig, Store}
@ -21,9 +23,16 @@ import doobie.implicits._
final class StoreImpl[F[_]: Async]( final class StoreImpl[F[_]: Async](
val fileRepo: FileRepository[F], val fileRepo: FileRepository[F],
jdbc: JdbcConfig, jdbc: JdbcConfig,
ds: DataSource,
xa: Transactor[F] xa: Transactor[F]
) extends Store[F] { ) extends Store[F] {
/** Creates a file repository for `cfg` that uses this store's transactor and
  * datasource.
  */
def createFileRepository(
    cfg: FileRepositoryConfig,
    withAttributeStore: Boolean
): FileRepository[F] =
  FileRepository.apply[F](xa, ds, cfg, withAttributeStore)
def transform: ConnectionIO ~> F = def transform: ConnectionIO ~> F =
FunctionK.lift(transact) FunctionK.lift(transact)

View File

@ -69,7 +69,7 @@ object StoreFixture {
xa <- makeXA(ds) xa <- makeXA(ds)
cfg = FileRepositoryConfig.Database(64 * 1024) cfg = FileRepositoryConfig.Database(64 * 1024)
fr = FileRepository[IO](xa, ds, cfg) fr = FileRepository[IO](xa, ds, cfg)
store = new StoreImpl[IO](fr, jdbc, xa) store = new StoreImpl[IO](fr, jdbc, ds, xa)
_ <- Resource.eval(store.migrate) _ <- Resource.eval(store.migrate)
} yield store } yield store
} }