Fix previously published db migration

This commit is contained in:
eikek
2022-03-19 13:32:19 +01:00
parent dc268ba70d
commit ae845dfe3b
7 changed files with 187 additions and 31 deletions

View File

@ -37,7 +37,7 @@ final class StoreImpl[F[_]: Async](
FunctionK.lift(transact)
def migrate: F[Int] =
FlywayMigrate.run[F](jdbc).map(_.migrationsExecuted)
FlywayMigrate[F](jdbc, xa).run.map(_.migrationsExecuted)
def transact[A](prg: ConnectionIO[A]): F[A] =
prg.transact(xa)

View File

@ -7,37 +7,90 @@
package docspell.store.migrate
import cats.effect.Sync
import cats.implicits._
import docspell.store.JdbcConfig
import docspell.store.migrate.FlywayMigrate.MigrationKind
import doobie.implicits._
import doobie.util.transactor.Transactor
import org.flywaydb.core.Flyway
import org.flywaydb.core.api.output.MigrateResult
import org.log4s._
object FlywayMigrate {
private[this] val logger = getLogger
class FlywayMigrate[F[_]: Sync](jdbc: JdbcConfig, xa: Transactor[F]) {
private[this] val logger = docspell.logging.getLogger[F]
def run[F[_]: Sync](jdbc: JdbcConfig): F[MigrateResult] =
Sync[F].delay {
logger.info("Running db migrations...")
val locations = jdbc.dbmsName match {
case Some(dbtype) =>
List(s"classpath:db/migration/$dbtype", "classpath:db/migration/common")
case None =>
logger.warn(
s"Cannot read database name from jdbc url: ${jdbc.url}. Go with H2"
)
List("classpath:db/migration/h2", "classpath:db/migration/common")
}
private def createLocations(folder: String) =
jdbc.dbmsName match {
case Some(dbtype) =>
List(s"classpath:db/$folder/$dbtype", s"classpath:db/$folder/common")
case None =>
logger.warn(
s"Cannot read database name from jdbc url: ${jdbc.url}. Go with H2"
)
List(s"classpath:db/$folder/h2", s"classpath:db/$folder/common")
}
logger.info(s"Using migration locations: $locations")
val fw = Flyway
def createFlyway(kind: MigrationKind): F[Flyway] =
for {
locations <- Sync[F].pure(createLocations(kind.folder))
_ <- logger.info(s"Creating Flyway for: $locations")
fw = Flyway
.configure()
.table(kind.table)
.cleanDisabled(true)
.dataSource(jdbc.url.asString, jdbc.user, jdbc.password)
.locations(locations: _*)
.baselineOnMigrate(kind == MigrationKind.Fixups)
.load()
} yield fw
fw.migrate()
def run: F[MigrateResult] =
for {
_ <- runFixups
fw <- createFlyway(MigrationKind.Main)
_ <- logger.info(s"!!! Running main migrations")
result <- Sync[F].blocking(fw.migrate())
} yield result
// A hack to fix already published migrations
def runFixups: F[Unit] =
isSchemaEmpty.flatMap {
case true =>
().pure[F]
case false =>
for {
fw <- createFlyway(MigrationKind.Fixups)
_ <- logger.info(s"!!! Running fixup migrations")
_ <- Sync[F].blocking(fw.migrate())
} yield ()
}
private def isSchemaEmpty: F[Boolean] =
sql"select count(1) from flyway_schema_history"
.query[Int]
.unique
.attemptSql
.transact(xa)
.map(_.isLeft)
}
object FlywayMigrate {
def apply[F[_]: Sync](jdbcConfig: JdbcConfig, xa: Transactor[F]): FlywayMigrate[F] =
new FlywayMigrate[F](jdbcConfig, xa)
sealed trait MigrationKind {
def table: String
def folder: String
}
object MigrationKind {
case object Main extends MigrationKind {
val table = "flyway_schema_history"
val folder = "migration"
}
case object Fixups extends MigrationKind {
val table = "flyway_fixup_history"
val folder = "fixups"
}
}
}