mirror of
https://github.com/TheAnachronism/docspell.git
synced 2025-04-05 02:49:32 +00:00
Merge pull request #1454 from eikek/migration-fix
Fix previously published db migration
This commit is contained in:
commit
71f88486c2
@ -0,0 +1,58 @@
|
||||
drop table if exists valid_file_ids;
|
||||
create table valid_file_ids (
|
||||
id bigserial primary key,
|
||||
file_id varchar(254) not null unique
|
||||
);
|
||||
|
||||
-- Source files
|
||||
insert into valid_file_ids (file_id)
|
||||
select rs.file_id
|
||||
from attachment_source rs
|
||||
;
|
||||
|
||||
-- Archive files
|
||||
insert into valid_file_ids (file_id)
|
||||
select distinct rs.file_id
|
||||
from attachment_archive rs
|
||||
;
|
||||
|
||||
-- Preview image
|
||||
insert into valid_file_ids (file_id)
|
||||
select ap.file_id
|
||||
from attachment_preview ap
|
||||
;
|
||||
|
||||
-- classifier
|
||||
insert into valid_file_ids (file_id)
|
||||
select file_id
|
||||
from classifier_model
|
||||
;
|
||||
|
||||
-- save obsolete files
|
||||
drop table if exists obsolete_files;
|
||||
create table obsolete_files(
|
||||
file_id varchar(254) not null
|
||||
);
|
||||
|
||||
with
|
||||
missing_ids as (
|
||||
select file_id from filemeta
|
||||
except
|
||||
select file_id as file_id from valid_file_ids)
|
||||
insert into obsolete_files (file_id)
|
||||
select file_id from filemeta
|
||||
where file_id in (select file_id from missing_ids)
|
||||
;
|
||||
|
||||
|
||||
-- remove orphaned chunks
|
||||
delete from "filechunk"
|
||||
where "file_id" in (
|
||||
select distinct file_id from filechunk
|
||||
where file_id not in (select file_id from valid_file_ids)
|
||||
and file_id not in (select file_id from obsolete_files)
|
||||
);
|
||||
|
||||
-- drop temp table
|
||||
drop table valid_file_ids;
|
||||
drop table obsolete_files;
|
@ -37,7 +37,7 @@ final class StoreImpl[F[_]: Async](
|
||||
FunctionK.lift(transact)
|
||||
|
||||
def migrate: F[Int] =
|
||||
FlywayMigrate.run[F](jdbc).map(_.migrationsExecuted)
|
||||
FlywayMigrate[F](jdbc, xa).run.map(_.migrationsExecuted)
|
||||
|
||||
def transact[A](prg: ConnectionIO[A]): F[A] =
|
||||
prg.transact(xa)
|
||||
|
@ -7,37 +7,90 @@
|
||||
package docspell.store.migrate
|
||||
|
||||
import cats.effect.Sync
|
||||
import cats.implicits._
|
||||
|
||||
import docspell.store.JdbcConfig
|
||||
import docspell.store.migrate.FlywayMigrate.MigrationKind
|
||||
|
||||
import doobie.implicits._
|
||||
import doobie.util.transactor.Transactor
|
||||
import org.flywaydb.core.Flyway
|
||||
import org.flywaydb.core.api.output.MigrateResult
|
||||
import org.log4s._
|
||||
|
||||
object FlywayMigrate {
|
||||
private[this] val logger = getLogger
|
||||
class FlywayMigrate[F[_]: Sync](jdbc: JdbcConfig, xa: Transactor[F]) {
|
||||
private[this] val logger = docspell.logging.getLogger[F]
|
||||
|
||||
def run[F[_]: Sync](jdbc: JdbcConfig): F[MigrateResult] =
|
||||
Sync[F].delay {
|
||||
logger.info("Running db migrations...")
|
||||
val locations = jdbc.dbmsName match {
|
||||
case Some(dbtype) =>
|
||||
List(s"classpath:db/migration/$dbtype", "classpath:db/migration/common")
|
||||
case None =>
|
||||
logger.warn(
|
||||
s"Cannot read database name from jdbc url: ${jdbc.url}. Go with H2"
|
||||
)
|
||||
List("classpath:db/migration/h2", "classpath:db/migration/common")
|
||||
}
|
||||
private def createLocations(folder: String) =
|
||||
jdbc.dbmsName match {
|
||||
case Some(dbtype) =>
|
||||
List(s"classpath:db/$folder/$dbtype", s"classpath:db/$folder/common")
|
||||
case None =>
|
||||
logger.warn(
|
||||
s"Cannot read database name from jdbc url: ${jdbc.url}. Go with H2"
|
||||
)
|
||||
List(s"classpath:db/$folder/h2", s"classpath:db/$folder/common")
|
||||
}
|
||||
|
||||
logger.info(s"Using migration locations: $locations")
|
||||
val fw = Flyway
|
||||
def createFlyway(kind: MigrationKind): F[Flyway] =
|
||||
for {
|
||||
locations <- Sync[F].pure(createLocations(kind.folder))
|
||||
_ <- logger.info(s"Creating Flyway for: $locations")
|
||||
fw = Flyway
|
||||
.configure()
|
||||
.table(kind.table)
|
||||
.cleanDisabled(true)
|
||||
.dataSource(jdbc.url.asString, jdbc.user, jdbc.password)
|
||||
.locations(locations: _*)
|
||||
.baselineOnMigrate(kind == MigrationKind.Fixups)
|
||||
.load()
|
||||
} yield fw
|
||||
|
||||
fw.migrate()
|
||||
def run: F[MigrateResult] =
|
||||
for {
|
||||
_ <- runFixups
|
||||
fw <- createFlyway(MigrationKind.Main)
|
||||
_ <- logger.info(s"!!! Running main migrations")
|
||||
result <- Sync[F].blocking(fw.migrate())
|
||||
} yield result
|
||||
|
||||
// A hack to fix already published migrations
|
||||
def runFixups: F[Unit] =
|
||||
isSchemaEmpty.flatMap {
|
||||
case true =>
|
||||
().pure[F]
|
||||
case false =>
|
||||
for {
|
||||
fw <- createFlyway(MigrationKind.Fixups)
|
||||
_ <- logger.info(s"!!! Running fixup migrations")
|
||||
_ <- Sync[F].blocking(fw.migrate())
|
||||
} yield ()
|
||||
}
|
||||
|
||||
private def isSchemaEmpty: F[Boolean] =
|
||||
sql"select count(1) from flyway_schema_history"
|
||||
.query[Int]
|
||||
.unique
|
||||
.attemptSql
|
||||
.transact(xa)
|
||||
.map(_.isLeft)
|
||||
}
|
||||
|
||||
object FlywayMigrate {
|
||||
def apply[F[_]: Sync](jdbcConfig: JdbcConfig, xa: Transactor[F]): FlywayMigrate[F] =
|
||||
new FlywayMigrate[F](jdbcConfig, xa)
|
||||
|
||||
sealed trait MigrationKind {
|
||||
def table: String
|
||||
def folder: String
|
||||
}
|
||||
object MigrationKind {
|
||||
case object Main extends MigrationKind {
|
||||
val table = "flyway_schema_history"
|
||||
val folder = "migration"
|
||||
}
|
||||
case object Fixups extends MigrationKind {
|
||||
val table = "flyway_fixup_history"
|
||||
val folder = "fixups"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -17,7 +17,9 @@ import docspell.store.migrate.FlywayMigrate
|
||||
|
||||
import doobie._
|
||||
import munit._
|
||||
import org.h2.jdbcx.JdbcConnectionPool
|
||||
import org.h2.jdbcx.{JdbcConnectionPool, JdbcDataSource}
|
||||
import org.mariadb.jdbc.MariaDbDataSource
|
||||
import org.postgresql.ds.PGConnectionPoolDataSource
|
||||
|
||||
trait StoreFixture extends CatsEffectFunFixtures { self: CatsEffectSuite =>
|
||||
|
||||
@ -26,7 +28,7 @@ trait StoreFixture extends CatsEffectFunFixtures { self: CatsEffectSuite =>
|
||||
for {
|
||||
ds <- StoreFixture.dataSource(cfg)
|
||||
xa <- StoreFixture.makeXA(ds)
|
||||
_ <- Resource.eval(FlywayMigrate.run[IO](cfg))
|
||||
_ <- Resource.eval(FlywayMigrate[IO](cfg, xa).run)
|
||||
} yield xa
|
||||
}
|
||||
|
||||
@ -52,7 +54,30 @@ object StoreFixture {
|
||||
|
||||
def dataSource(jdbc: JdbcConfig): Resource[IO, JdbcConnectionPool] = {
|
||||
def jdbcConnPool =
|
||||
JdbcConnectionPool.create(jdbc.url.asString, jdbc.user, jdbc.password)
|
||||
jdbc.dbmsName match {
|
||||
case Some("mariadb") =>
|
||||
val ds = new MariaDbDataSource()
|
||||
ds.setUrl(jdbc.url.asString)
|
||||
ds.setUser(jdbc.user)
|
||||
ds.setPassword(jdbc.password)
|
||||
JdbcConnectionPool.create(ds)
|
||||
|
||||
case Some("postgresql") =>
|
||||
val ds = new PGConnectionPoolDataSource()
|
||||
ds.setURL(jdbc.url.asString)
|
||||
ds.setUser(jdbc.user)
|
||||
ds.setPassword(jdbc.password)
|
||||
JdbcConnectionPool.create(ds)
|
||||
|
||||
case Some("h2") =>
|
||||
val ds = new JdbcDataSource()
|
||||
ds.setURL(jdbc.url.asString)
|
||||
ds.setUser(jdbc.user)
|
||||
ds.setPassword(jdbc.password)
|
||||
JdbcConnectionPool.create(ds)
|
||||
|
||||
case n => sys.error(s"Unknown db name: $n")
|
||||
}
|
||||
|
||||
Resource.make(IO(jdbcConnPool))(cp => IO(cp.dispose()))
|
||||
}
|
||||
|
@ -18,8 +18,15 @@ class H2MigrateTest extends FunSuite with TestLoggingConfig {
|
||||
|
||||
test("h2 empty schema migration") {
|
||||
val jdbc = StoreFixture.memoryDB("h2test")
|
||||
val result = FlywayMigrate.run[IO](jdbc).unsafeRunSync()
|
||||
assert(result.migrationsExecuted > 0)
|
||||
}
|
||||
val ds = StoreFixture.dataSource(jdbc)
|
||||
val result =
|
||||
ds.flatMap(StoreFixture.makeXA).use { xa =>
|
||||
FlywayMigrate[IO](jdbc, xa).run
|
||||
}
|
||||
|
||||
assert(result.unsafeRunSync().migrationsExecuted > 0)
|
||||
|
||||
// a second time to apply fixup migrations
|
||||
assert(result.unsafeRunSync().migrationsExecuted == 0)
|
||||
}
|
||||
}
|
||||
|
@ -11,7 +11,7 @@ import cats.effect.unsafe.implicits._
|
||||
|
||||
import docspell.common.LenientUri
|
||||
import docspell.logging.TestLoggingConfig
|
||||
import docspell.store.JdbcConfig
|
||||
import docspell.store.{JdbcConfig, StoreFixture}
|
||||
|
||||
import com.dimafeng.testcontainers.MariaDBContainer
|
||||
import com.dimafeng.testcontainers.munit.TestContainerForAll
|
||||
@ -30,8 +30,13 @@ class MariaDbMigrateTest
|
||||
withContainers { cnt =>
|
||||
val jdbc =
|
||||
JdbcConfig(LenientUri.unsafe(cnt.jdbcUrl), cnt.dbUsername, cnt.dbPassword)
|
||||
val result = FlywayMigrate.run[IO](jdbc).unsafeRunSync()
|
||||
assert(result.migrationsExecuted > 0)
|
||||
val ds = StoreFixture.dataSource(jdbc)
|
||||
val result = ds.flatMap(StoreFixture.makeXA).use { xa =>
|
||||
FlywayMigrate[IO](jdbc, xa).run
|
||||
}
|
||||
assert(result.unsafeRunSync().migrationsExecuted > 0)
|
||||
// a second time to apply fixup migrations
|
||||
assert(result.unsafeRunSync().migrationsExecuted == 0)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -11,7 +11,7 @@ import cats.effect.unsafe.implicits._
|
||||
|
||||
import docspell.common.LenientUri
|
||||
import docspell.logging.TestLoggingConfig
|
||||
import docspell.store.JdbcConfig
|
||||
import docspell.store.{JdbcConfig, StoreFixture}
|
||||
|
||||
import com.dimafeng.testcontainers.PostgreSQLContainer
|
||||
import com.dimafeng.testcontainers.munit.TestContainerForAll
|
||||
@ -30,8 +30,16 @@ class PostgresqlMigrateTest
|
||||
withContainers { cnt =>
|
||||
val jdbc =
|
||||
JdbcConfig(LenientUri.unsafe(cnt.jdbcUrl), cnt.username, cnt.password)
|
||||
val result = FlywayMigrate.run[IO](jdbc).unsafeRunSync()
|
||||
assert(result.migrationsExecuted > 0)
|
||||
|
||||
val ds = StoreFixture.dataSource(jdbc)
|
||||
val result =
|
||||
ds.flatMap(StoreFixture.makeXA).use { xa =>
|
||||
FlywayMigrate[IO](jdbc, xa).run
|
||||
}
|
||||
assert(result.unsafeRunSync().migrationsExecuted > 0)
|
||||
|
||||
// a second time to apply fixup migrations
|
||||
assert(result.unsafeRunSync().migrationsExecuted == 0)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user