mirror of
https://github.com/TheAnachronism/docspell.git
synced 2025-06-23 19:08:26 +00:00
@ -0,0 +1,17 @@
|
||||
/*
|
||||
* Copyright 2020 Eike K. & Contributors
|
||||
*
|
||||
* SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
*/
|
||||
|
||||
package docspell.store
|
||||
|
||||
case class SchemaMigrateConfig(
|
||||
runMainMigrations: Boolean,
|
||||
runFixupMigrations: Boolean,
|
||||
repairSchema: Boolean
|
||||
)
|
||||
|
||||
object SchemaMigrateConfig {
|
||||
val defaults = SchemaMigrateConfig(true, true, false)
|
||||
}
|
@ -42,6 +42,7 @@ object Store {
|
||||
|
||||
def create[F[_]: Async](
|
||||
jdbc: JdbcConfig,
|
||||
schemaCfg: SchemaMigrateConfig,
|
||||
fileRepoConfig: FileRepositoryConfig,
|
||||
connectEC: ExecutionContext
|
||||
): Resource[F, Store[F]] = {
|
||||
@ -58,7 +59,7 @@ object Store {
|
||||
}
|
||||
xa = HikariTransactor(ds, connectEC)
|
||||
fr = FileRepository.apply(xa, ds, fileRepoConfig, true)
|
||||
st = new StoreImpl[F](fr, jdbc, ds, xa)
|
||||
st = new StoreImpl[F](fr, jdbc, schemaCfg, ds, xa)
|
||||
_ <- Resource.eval(st.migrate)
|
||||
} yield st
|
||||
}
|
||||
|
@ -13,9 +13,9 @@ import cats.effect.Async
|
||||
import cats.implicits._
|
||||
import cats.~>
|
||||
|
||||
import docspell.store._
|
||||
import docspell.store.file.{FileRepository, FileRepositoryConfig}
|
||||
import docspell.store.migrate.FlywayMigrate
|
||||
import docspell.store.{AddResult, JdbcConfig, Store}
|
||||
|
||||
import doobie._
|
||||
import doobie.implicits._
|
||||
@ -23,6 +23,7 @@ import doobie.implicits._
|
||||
final class StoreImpl[F[_]: Async](
|
||||
val fileRepo: FileRepository[F],
|
||||
jdbc: JdbcConfig,
|
||||
schemaCfg: SchemaMigrateConfig,
|
||||
ds: DataSource,
|
||||
val transactor: Transactor[F]
|
||||
) extends Store[F] {
|
||||
@ -38,7 +39,7 @@ final class StoreImpl[F[_]: Async](
|
||||
FunctionK.lift(transact)
|
||||
|
||||
def migrate: F[Int] =
|
||||
FlywayMigrate[F](jdbc, xa).run.map(_.migrationsExecuted)
|
||||
FlywayMigrate[F](jdbc, schemaCfg, xa).run.map(_.migrationsExecuted)
|
||||
|
||||
def transact[A](prg: ConnectionIO[A]): F[A] =
|
||||
prg.transact(xa)
|
||||
|
@ -10,15 +10,19 @@ import cats.data.OptionT
|
||||
import cats.effect.Sync
|
||||
import cats.implicits._
|
||||
|
||||
import docspell.store.JdbcConfig
|
||||
import docspell.store.migrate.FlywayMigrate.MigrationKind
|
||||
import docspell.store.{JdbcConfig, SchemaMigrateConfig}
|
||||
|
||||
import doobie.implicits._
|
||||
import doobie.util.transactor.Transactor
|
||||
import org.flywaydb.core.Flyway
|
||||
import org.flywaydb.core.api.output.MigrateResult
|
||||
|
||||
class FlywayMigrate[F[_]: Sync](jdbc: JdbcConfig, xa: Transactor[F]) {
|
||||
class FlywayMigrate[F[_]: Sync](
|
||||
jdbc: JdbcConfig,
|
||||
cfg: SchemaMigrateConfig,
|
||||
xa: Transactor[F]
|
||||
) {
|
||||
private[this] val logger = docspell.logging.getLogger[F]
|
||||
|
||||
private def createLocations(folder: String) =
|
||||
@ -49,28 +53,46 @@ class FlywayMigrate[F[_]: Sync](jdbc: JdbcConfig, xa: Transactor[F]) {
|
||||
def run: F[MigrateResult] =
|
||||
for {
|
||||
_ <- runFixups
|
||||
fw <- createFlyway(MigrationKind.Main)
|
||||
_ <- logger.info(s"!!! Running main migrations")
|
||||
result <- Sync[F].blocking(fw.migrate())
|
||||
result <- runMain
|
||||
} yield result
|
||||
|
||||
def runMain: F[MigrateResult] =
|
||||
if (!cfg.runMainMigrations)
|
||||
logger
|
||||
.info("Running main migrations is disabled!")
|
||||
.as(new MigrateResult("", "", ""))
|
||||
else
|
||||
for {
|
||||
fw <- createFlyway(MigrationKind.Main)
|
||||
_ <- logger.info(s"!!! Running main migrations (repair=${cfg.repairSchema})")
|
||||
_ <- if (cfg.repairSchema) Sync[F].blocking(fw.repair()).void else ().pure[F]
|
||||
result <- Sync[F].blocking(fw.migrate())
|
||||
} yield result
|
||||
|
||||
// A hack to fix already published migrations
|
||||
def runFixups: F[Unit] =
|
||||
isSchemaEmpty.flatMap {
|
||||
case true =>
|
||||
().pure[F]
|
||||
case false =>
|
||||
(for {
|
||||
current <- OptionT(getSchemaVersion)
|
||||
_ <- OptionT
|
||||
.fromOption[F](versionComponents(current))
|
||||
.filter(v => v._1 >= 1 && v._2 >= 32)
|
||||
fw <- OptionT.liftF(createFlyway(MigrationKind.Fixups))
|
||||
_ <- OptionT.liftF(logger.info(s"!!! Running fixup migrations"))
|
||||
_ <- OptionT.liftF(Sync[F].blocking(fw.migrate()))
|
||||
} yield ())
|
||||
.getOrElseF(logger.info(s"Fixup migrations not applied."))
|
||||
}
|
||||
if (!cfg.runFixupMigrations) logger.info(s"Running fixup migrations is disabled!")
|
||||
else
|
||||
isSchemaEmpty.flatMap {
|
||||
case true =>
|
||||
().pure[F]
|
||||
case false =>
|
||||
(for {
|
||||
current <- OptionT(getSchemaVersion)
|
||||
_ <- OptionT
|
||||
.fromOption[F](versionComponents(current))
|
||||
.filter(v => v._1 >= 1 && v._2 >= 32)
|
||||
fw <- OptionT.liftF(createFlyway(MigrationKind.Fixups))
|
||||
_ <- OptionT.liftF(
|
||||
logger.info(s"!!! Running fixup migrations (repair=${cfg.repairSchema})")
|
||||
)
|
||||
_ <-
|
||||
if (cfg.repairSchema) OptionT.liftF(Sync[F].blocking(fw.repair()).void)
|
||||
else OptionT.pure[F](())
|
||||
_ <- OptionT.liftF(Sync[F].blocking(fw.migrate()))
|
||||
} yield ())
|
||||
.getOrElseF(logger.info(s"Fixup migrations not applied."))
|
||||
}
|
||||
|
||||
private def isSchemaEmpty: F[Boolean] =
|
||||
sql"select count(1) from flyway_schema_history"
|
||||
@ -95,8 +117,12 @@ class FlywayMigrate[F[_]: Sync](jdbc: JdbcConfig, xa: Transactor[F]) {
|
||||
}
|
||||
|
||||
object FlywayMigrate {
|
||||
def apply[F[_]: Sync](jdbcConfig: JdbcConfig, xa: Transactor[F]): FlywayMigrate[F] =
|
||||
new FlywayMigrate[F](jdbcConfig, xa)
|
||||
def apply[F[_]: Sync](
|
||||
jdbcConfig: JdbcConfig,
|
||||
schemaCfg: SchemaMigrateConfig,
|
||||
xa: Transactor[F]
|
||||
): FlywayMigrate[F] =
|
||||
new FlywayMigrate[F](jdbcConfig, schemaCfg, xa)
|
||||
|
||||
sealed trait MigrationKind {
|
||||
def table: String
|
||||
|
Reference in New Issue
Block a user