Refactor scala base db migrations. Add marker trait for task args

This commit is contained in:
eikek 2022-08-05 14:27:34 +02:00
parent 66265d8455
commit ae4c49027f
28 changed files with 188 additions and 162 deletions

View File

@ -12,6 +12,7 @@ import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder}
import io.circe.{Decoder, Encoder}
final case class DownloadZipArgs(account: AccountInfo, req: DownloadRequest)
extends TaskArguments
object DownloadZipArgs {
val taskName: Ident = Ident.unsafe("download-query-zip")

View File

@ -18,7 +18,7 @@ import io.circe.{Decoder, Encoder}
case class AllPreviewsArgs(
collective: Option[CollectiveId],
storeMode: MakePreviewArgs.StoreMode
)
) extends TaskArguments
object AllPreviewsArgs {

View File

@ -16,7 +16,7 @@ import io.circe.generic.semiauto._
* submitted by this task run in the realm of the collective (and only their files are
* considered). If it is empty, it is a system task and all files are considered.
*/
case class ConvertAllPdfArgs(collective: Option[CollectiveId])
case class ConvertAllPdfArgs(collective: Option[CollectiveId]) extends TaskArguments
object ConvertAllPdfArgs {

View File

@ -20,7 +20,7 @@ import io.circe.generic.semiauto._
case class EmptyTrashArgs(
collective: CollectiveId,
minAge: Duration
) {
) extends TaskArguments {
def makeSubject: String =
s"Empty Trash: Remove older than ${minAge.toJava}"

View File

@ -22,6 +22,7 @@ import io.circe.{Decoder, Encoder}
* selecting "all", it means all enabled stores.
*/
final case class FileCopyTaskArgs(from: Option[Ident], to: Selection)
extends TaskArguments
object FileCopyTaskArgs {
val taskName = Ident.unsafe("copy-file-repositories")

View File

@ -9,7 +9,7 @@ package docspell.common
import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder}
import io.circe.{Decoder, Encoder}
final case class FileIntegrityCheckArgs(pattern: FileKeyPart) {}
final case class FileIntegrityCheckArgs(pattern: FileKeyPart) extends TaskArguments
object FileIntegrityCheckArgs {
val taskName: Ident = Ident.unsafe("all-file-integrity-check")

View File

@ -18,7 +18,7 @@ final case class ItemAddonTaskArgs(
collective: CollectiveId,
itemId: Ident,
addonRunConfigs: Set[Ident]
)
) extends TaskArguments
object ItemAddonTaskArgs {
val taskName: Ident = Ident.unsafe("addon-existing-item")

View File

@ -19,11 +19,10 @@ import io.circe.generic.semiauto._
*/
case class LearnClassifierArgs(
collectiveId: CollectiveId
) {
) extends TaskArguments {
def makeSubject: String =
"Learn tags"
}
object LearnClassifierArgs {

View File

@ -14,7 +14,7 @@ import io.circe.{Decoder, Encoder}
*/
case class MakePageCountArgs(
attachment: Ident
)
) extends TaskArguments
object MakePageCountArgs {

View File

@ -16,7 +16,7 @@ import io.circe.{Decoder, Encoder}
case class MakePreviewArgs(
attachment: Ident,
store: MakePreviewArgs.StoreMode
)
) extends TaskArguments
object MakePreviewArgs {

View File

@ -22,7 +22,7 @@ import io.circe.generic.semiauto._
*
* It is also re-used by the 'ReProcessItem' task.
*/
case class ProcessItemArgs(meta: ProcessMeta, files: List[File]) {
case class ProcessItemArgs(meta: ProcessMeta, files: List[File]) extends TaskArguments {
def makeSubject: String =
files.flatMap(_.name) match {

View File

@ -9,7 +9,7 @@ package docspell.common
import io.circe._
import io.circe.generic.semiauto._
final case class ReIndexTaskArgs(collective: Option[CollectiveId])
final case class ReIndexTaskArgs(collective: Option[CollectiveId]) extends TaskArguments
object ReIndexTaskArgs {
val taskName = Ident.unsafe("full-text-reindex")

View File

@ -16,6 +16,7 @@ import io.circe.{Decoder, Encoder}
* list is empty, then all attachments are re-processed.
*/
case class ReProcessItemArgs(itemId: Ident, attachments: List[Ident])
extends TaskArguments
object ReProcessItemArgs {

View File

@ -49,7 +49,7 @@ case class ScanMailboxArgs(
postHandleAll: Option[Boolean],
// Exclude the mail body when importing
attachmentsOnly: Option[Boolean]
)
) extends TaskArguments
object ScanMailboxArgs {

View File

@ -10,6 +10,7 @@ import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder}
import io.circe.{Decoder, Encoder}
final case class ScheduledAddonTaskArgs(collective: CollectiveId, addonTaskId: Ident)
extends TaskArguments
object ScheduledAddonTaskArgs {
val taskName: Ident = Ident.unsafe("addon-scheduled-task")

View File

@ -0,0 +1,8 @@
package docspell.common
/** A marker trait for task arguments.
*
* Arguments for tasks are stored as a JSON blob in the database. Changes in structure
* requires a corresponding database migration.
*/
trait TaskArguments

View File

@ -28,7 +28,7 @@ final case class PeriodicDueItemsArgs(
tagsInclude: List[Ident],
tagsExclude: List[Ident],
baseUrl: Option[LenientUri]
)
) extends TaskArguments
object PeriodicDueItemsArgs {
val taskName = Ident.unsafe("periodic-due-items-notify2")

View File

@ -20,7 +20,7 @@ final case class PeriodicQueryArgs(
bookmark: Option[String],
baseUrl: Option[LenientUri],
contentStart: Option[String]
)
) extends TaskArguments
object PeriodicQueryArgs {
val taskName = Ident.unsafe("periodic-query-notify2")

View File

@ -0,0 +1,14 @@
package db.migration.common
import emil.MailAddress
import emil.javamail.syntax._
import io.circe.{Decoder, Encoder}
trait JsonCodecs {
implicit val jsonEncoder: Encoder[MailAddress] =
Encoder.encodeString.contramap(_.asUnicodeString)
implicit val jsonDecoder: Decoder[MailAddress] =
Decoder.decodeString.emap(MailAddress.parse)
}

View File

@ -1,21 +1,13 @@
/*
* Copyright 2020 Eike K. & Contributors
*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
package db.migration
package db.migration.common
import cats.data.{NonEmptyList, OptionT}
import cats.effect.{IO, Sync}
import cats.implicits._
import docspell.common._
import docspell.common.syntax.StringSyntax._
import docspell.notification.api._
import docspell.store.queries.QLogin
import docspell.store.records._
import db.migration.data.{
PeriodicDueItemsArgs => PeriodicDueItemsArgsLegacy,
PeriodicQueryArgs => PeriodicQueryArgsLegacy,
@ -23,31 +15,15 @@ import db.migration.data.{
}
import doobie._
import doobie.implicits._
import doobie.util.transactor.Strategy
import emil.MailAddress
import emil.javamail.syntax._
import io.circe.syntax._
import io.circe.{Decoder, Encoder}
import org.flywaydb.core.api.migration.Context
trait MigrationTasks {
object MigrateDueItemTasks extends TransactorSupport with JsonCodecs {
def logger: org.log4s.Logger
val logger = docspell.logging.getLogger[IO]
implicit val jsonEncoder: Encoder[MailAddress] =
Encoder.encodeString.contramap(_.asUnicodeString)
implicit val jsonDecoder: Decoder[MailAddress] =
Decoder.decodeString.emap(MailAddress.parse)
def migrateDueItemTasks: ConnectionIO[Unit] =
for {
tasks <- RPeriodicTask.findByTask(NotifyDueItemsArgs.taskName)
_ <- Sync[ConnectionIO].delay(
logger.info(s"Starting to migrate ${tasks.size} user tasks")
)
_ <- tasks.traverse(migrateDueItemTask1)
_ <- RPeriodicTask.setEnabledByTask(NotifyDueItemsArgs.taskName, false)
} yield ()
def run(ctx: Context) =
migratePeriodicItemTasks.transact(mkTransactor(ctx))
def migratePeriodicItemTasks: ConnectionIO[Unit] =
for {
@ -137,75 +113,4 @@ trait MigrationTasks {
ref = r.asRef
} yield ref)
.getOrElseF(Sync[ConnectionIO].raiseError(new Exception("User not found!")))
private def migrateDueItemTask1(old: RPeriodicTask): ConnectionIO[Int] = {
val converted = old.args
.parseJsonAs[NotifyDueItemsArgs]
.leftMap(_.getMessage())
.map(convertArgs)
converted match {
case Right(args) =>
val task = args
.semiflatMap(a =>
RPeriodicTask
.updateTask(
old.id,
PeriodicDueItemsArgsLegacy.taskName,
a.asJson.noSpaces
)
)
.getOrElse(0)
Sync[ConnectionIO].delay(logger.info(s"Converting user task: $old")) *> task
case Left(err) =>
logger.error(s"Error converting user task: $old. $err")
0.pure[ConnectionIO]
}
}
private def convertArgs(
old: NotifyDueItemsArgs
): OptionT[ConnectionIO, PeriodicDueItemsArgsLegacy] = {
val recs = old.recipients
.map(MailAddress.parse)
.flatMap {
case Right(m) => Some(m)
case Left(err) =>
logger.warn(s"Cannot read mail address: $err. Skip this while migrating.")
None
}
for {
userData <- OptionT(QLogin.findAccount(old.account))
userId = userData.userId
id <- OptionT.liftF(Ident.randomId[ConnectionIO])
now <- OptionT.liftF(Timestamp.current[ConnectionIO])
chName = Some("migrate notify items")
ch = RNotificationChannelMail(
id,
userId,
chName,
old.smtpConnection,
recs,
now
)
_ <- OptionT.liftF(RNotificationChannelMail.insert(ch))
args = PeriodicDueItemsArgsLegacy(
old.account,
NonEmptyList.of(ChannelRef(ch.id, ChannelType.Mail, chName)),
old.remindDays,
old.daysBack,
old.tagsInclude,
old.tagsExclude,
old.itemDetailUrl
)
} yield args
}
def mkTransactor(ctx: Context): Transactor[IO] = {
val xa = Transactor.fromConnection[IO](ctx.getConnection)
Transactor.strategy.set(xa, Strategy.void) // transactions are handled by flyway
}
}

View File

@ -0,0 +1,102 @@
package db.migration.common
import cats.data.{NonEmptyList, OptionT}
import cats.effect.{IO, Sync}
import cats.implicits._
import docspell.common._
import docspell.common.syntax.StringSyntax._
import docspell.notification.api._
import docspell.store.queries.QLogin
import docspell.store.records._
import db.migration.data.{PeriodicDueItemsArgs => PeriodicDueItemsArgsLegacy, _}
import doobie._
import doobie.implicits._
import emil.MailAddress
import emil.javamail.syntax._
import io.circe.syntax._
import org.flywaydb.core.api.migration.Context
object MigrateNotifyTasks extends TransactorSupport {
val logger = docspell.logging.getLogger[IO]
def run(ctx: Context): IO[Unit] =
migrateDueItemTasks.transact(mkTransactor(ctx))
def migrateDueItemTasks: ConnectionIO[Unit] =
for {
tasks <- RPeriodicTask.findByTask(NotifyDueItemsArgs.taskName)
_ <- Sync[ConnectionIO].delay(
logger.info(s"Starting to migrate ${tasks.size} user tasks")
)
_ <- tasks.traverse(migrateDueItemTask1)
_ <- RPeriodicTask.setEnabledByTask(NotifyDueItemsArgs.taskName, false)
} yield ()
private def migrateDueItemTask1(old: RPeriodicTask): ConnectionIO[Int] = {
val converted = old.args
.parseJsonAs[NotifyDueItemsArgs]
.leftMap(_.getMessage())
.map(convertArgs)
converted match {
case Right(args) =>
val task = args
.semiflatMap(a =>
RPeriodicTask
.updateTask(
old.id,
PeriodicDueItemsArgsLegacy.taskName,
a.asJson.noSpaces
)
)
.getOrElse(0)
Sync[ConnectionIO].delay(logger.info(s"Converting user task: $old")) *> task
case Left(err) =>
logger.error(s"Error converting user task: $old. $err")
0.pure[ConnectionIO]
}
}
private def convertArgs(
old: NotifyDueItemsArgs
): OptionT[ConnectionIO, PeriodicDueItemsArgsLegacy] = {
val recs = old.recipients
.map(MailAddress.parse)
.flatMap {
case Right(m) => Some(m)
case Left(err) =>
logger.warn(s"Cannot read mail address: $err. Skip this while migrating.")
None
}
for {
userData <- OptionT(QLogin.findAccount(old.account))
userId = userData.userId
id <- OptionT.liftF(Ident.randomId[ConnectionIO])
now <- OptionT.liftF(Timestamp.current[ConnectionIO])
chName = Some("migrate notify items")
ch = RNotificationChannelMail(
id,
userId,
chName,
old.smtpConnection,
recs,
now
)
_ <- OptionT.liftF(RNotificationChannelMail.insert(ch))
args = PeriodicDueItemsArgsLegacy(
old.account,
NonEmptyList.of(ChannelRef(ch.id, ChannelType.Mail, chName)),
old.remindDays,
old.daysBack,
old.tagsInclude,
old.tagsExclude,
old.itemDetailUrl
)
} yield args
}
}

View File

@ -0,0 +1,17 @@
package db.migration.common
import cats.effect.IO
import docspell.logging.Logger
import doobie.util.transactor.{Strategy, Transactor}
import org.flywaydb.core.api.migration.Context
trait TransactorSupport {
def logger: Logger[IO]
def mkTransactor(ctx: Context): Transactor[IO] = {
val xa = Transactor.fromConnection[IO](ctx.getConnection)
logger.asUnsafe.info(s"Creating transactor for db migrations from connection: $xa")
Transactor.strategy.set(xa, Strategy.void) // transactions are handled by flyway
}
}

View File

@ -7,17 +7,13 @@
package db.migration.h2
import cats.effect.unsafe.implicits._
import db.migration.MigrationTasks
import doobie.implicits._
import db.migration.common.MigrateNotifyTasks
import org.flywaydb.core.api.migration.BaseJavaMigration
import org.flywaydb.core.api.migration.Context
class V1_29_2__MigrateNotifyTask extends BaseJavaMigration with MigrationTasks {
class V1_29_2__MigrateNotifyTask extends BaseJavaMigration {
val logger = org.log4s.getLogger
override def migrate(ctx: Context): Unit = {
val xa = mkTransactor(ctx)
migrateDueItemTasks.transact(xa).unsafeRunSync()
}
override def migrate(ctx: Context): Unit =
MigrateNotifyTasks.run(ctx).unsafeRunSync()
}

View File

@ -7,16 +7,12 @@
package db.migration.h2
import cats.effect.unsafe.implicits._
import db.migration.MigrationTasks
import doobie.implicits._
import db.migration.common.MigrateDueItemTasks
import org.flywaydb.core.api.migration.{BaseJavaMigration, Context}
class V1_32_2__MigrateChannels extends BaseJavaMigration with MigrationTasks {
class V1_32_2__MigrateChannels extends BaseJavaMigration {
val logger = org.log4s.getLogger
override def migrate(ctx: Context): Unit = {
val xa = mkTransactor(ctx)
migratePeriodicItemTasks.transact(xa).unsafeRunSync()
}
override def migrate(ctx: Context): Unit =
MigrateDueItemTasks.run(ctx).unsafeRunSync()
}

View File

@ -7,17 +7,13 @@
package db.migration.mariadb
import cats.effect.unsafe.implicits._
import db.migration.MigrationTasks
import doobie.implicits._
import db.migration.common.MigrateNotifyTasks
import org.flywaydb.core.api.migration.BaseJavaMigration
import org.flywaydb.core.api.migration.Context
class V1_29_2__MigrateNotifyTask extends BaseJavaMigration with MigrationTasks {
class V1_29_2__MigrateNotifyTask extends BaseJavaMigration {
val logger = org.log4s.getLogger
override def migrate(ctx: Context): Unit = {
val xa = mkTransactor(ctx)
migrateDueItemTasks.transact(xa).unsafeRunSync()
}
override def migrate(ctx: Context): Unit =
MigrateNotifyTasks.run(ctx).unsafeRunSync()
}

View File

@ -7,16 +7,13 @@
package db.migration.mariadb
import cats.effect.unsafe.implicits._
import db.migration.MigrationTasks
import doobie.implicits._
import db.migration.common.MigrateDueItemTasks
import org.flywaydb.core.api.migration.{BaseJavaMigration, Context}
class V1_32_2__MigrateChannels extends BaseJavaMigration with MigrationTasks {
class V1_32_2__MigrateChannels extends BaseJavaMigration {
val logger = org.log4s.getLogger
override def migrate(ctx: Context): Unit = {
val xa = mkTransactor(ctx)
migratePeriodicItemTasks.transact(xa).unsafeRunSync()
}
override def migrate(ctx: Context): Unit =
MigrateDueItemTasks.run(ctx).unsafeRunSync()
}

View File

@ -7,17 +7,13 @@
package db.migration.postgresql
import cats.effect.unsafe.implicits._
import db.migration.MigrationTasks
import doobie.implicits._
import db.migration.common.MigrateNotifyTasks
import org.flywaydb.core.api.migration.BaseJavaMigration
import org.flywaydb.core.api.migration.Context
class V1_29_2__MigrateNotifyTask extends BaseJavaMigration with MigrationTasks {
class V1_29_2__MigrateNotifyTask extends BaseJavaMigration {
val logger = org.log4s.getLogger
override def migrate(ctx: Context): Unit = {
val xa = mkTransactor(ctx)
migrateDueItemTasks.transact(xa).unsafeRunSync()
}
override def migrate(ctx: Context): Unit =
MigrateNotifyTasks.run(ctx).unsafeRunSync()
}

View File

@ -7,16 +7,12 @@
package db.migration.postgresql
import cats.effect.unsafe.implicits._
import db.migration.MigrationTasks
import doobie.implicits._
import db.migration.common.MigrateDueItemTasks
import org.flywaydb.core.api.migration.{BaseJavaMigration, Context}
class V1_32_2__MigrateChannels extends BaseJavaMigration with MigrationTasks {
class V1_32_2__MigrateChannels extends BaseJavaMigration {
val logger = org.log4s.getLogger
override def migrate(ctx: Context): Unit = {
val xa = mkTransactor(ctx)
migratePeriodicItemTasks.transact(xa).unsafeRunSync()
}
override def migrate(ctx: Context): Unit =
MigrateDueItemTasks.run(ctx).unsafeRunSync()
}