Manage notification channels separately and migrate

It's more convenient to manage notification channels separately, as it
is done with email settings. Notification hook and other forms are
adopted to only select channels. Hooks can now use more than one
channel.
This commit is contained in:
eikek
2022-01-19 21:51:18 +01:00
parent d41490dd88
commit 23cb34a6ff
78 changed files with 2583 additions and 1422 deletions

View File

@@ -6,23 +6,23 @@
package db.migration
import cats.data.NonEmptyList
import cats.data.{NonEmptyList, OptionT}
import cats.effect.{IO, Sync}
import cats.implicits._
import docspell.common._
import docspell.common.syntax.StringSyntax._
import docspell.notification.api.Channel
import docspell.notification.api.PeriodicDueItemsArgs
import docspell.store.records.RPeriodicTask
import docspell.notification.api._
import docspell.store.records._
import db.migration.data.{PeriodicDueItemsArgsOld, PeriodicQueryArgsOld}
import doobie._
import doobie.implicits._
import doobie.util.transactor.Strategy
import emil.MailAddress
import emil.javamail.syntax._
import io.circe.Encoder
import io.circe.syntax._
import io.circe.{Decoder, Encoder}
import org.flywaydb.core.api.migration.Context
trait MigrationTasks {
@@ -31,6 +31,8 @@ trait MigrationTasks {
implicit val jsonEncoder: Encoder[MailAddress] =
Encoder.encodeString.contramap(_.asUnicodeString)
implicit val jsonDecoder: Decoder[MailAddress] =
Decoder.decodeString.emap(MailAddress.parse)
def migrateDueItemTasks: ConnectionIO[Unit] =
for {
@@ -42,20 +44,114 @@ trait MigrationTasks {
_ <- RPeriodicTask.setEnabledByTask(NotifyDueItemsArgs.taskName, false)
} yield ()
def migrateDueItemTask1(old: RPeriodicTask): ConnectionIO[Int] = {
val converted = old.args
.parseJsonAs[NotifyDueItemsArgs]
.leftMap(_.getMessage())
.flatMap(convertArgs)
def migratePeriodicItemTasks: ConnectionIO[Unit] =
for {
tasks2 <- RPeriodicTask.findByTask(PeriodicDueItemsArgsOld.taskName)
tasks3 <- RPeriodicTask.findByTask(PeriodicQueryArgsOld.taskName)
size = tasks2.size + tasks3.size
_ <- Sync[ConnectionIO].delay(
logger.info(s"Starting to migrate $size user tasks")
)
_ <- tasks2.traverse(migratePeriodicDueItemsTask)
_ <- tasks3.traverse(migratePeriodicQueryTask)
_ <- RPeriodicTask.setEnabledByTask(PeriodicQueryArgsOld.taskName, false)
_ <- RPeriodicTask.setEnabledByTask(PeriodicDueItemsArgsOld.taskName, false)
} yield ()
converted match {
case Right(args) =>
Sync[ConnectionIO].delay(logger.info(s"Converting user task: $old")) *>
private def migratePeriodicQueryTask(old: RPeriodicTask): ConnectionIO[Int] =
old.args
.parseJsonAs[PeriodicQueryArgsOld]
.leftMap { ex =>
logger.error(ex)(s"Error migrating tasks")
0.pure[ConnectionIO]
}
.map { oldArgs =>
val ref = oldArgs.channel match {
case Right(c) => saveChannel(c, oldArgs.account)
case Left(ref) => ref.pure[ConnectionIO]
}
ref.flatMap(channelRef =>
RPeriodicTask.updateTask(
old.id,
PeriodicQueryArgs.taskName,
PeriodicQueryArgs(
oldArgs.account,
NonEmptyList.of(channelRef),
oldArgs.query,
oldArgs.bookmark,
oldArgs.baseUrl,
oldArgs.contentStart
).asJson.noSpaces
)
)
}
.fold(identity, identity)
private def migratePeriodicDueItemsTask(old: RPeriodicTask): ConnectionIO[Int] =
old.args
.parseJsonAs[PeriodicDueItemsArgsOld]
.leftMap { ex =>
logger.error(ex)(s"Error migrating tasks")
0.pure[ConnectionIO]
}
.map { oldArgs =>
val ref = oldArgs.channel match {
case Right(c) => saveChannel(c, oldArgs.account)
case Left(ref) => ref.pure[ConnectionIO]
}
ref.flatMap(channelRef =>
RPeriodicTask.updateTask(
old.id,
PeriodicDueItemsArgs.taskName,
args.asJson.noSpaces
PeriodicDueItemsArgs(
oldArgs.account,
NonEmptyList.of(channelRef),
oldArgs.remindDays,
oldArgs.daysBack,
oldArgs.tagsInclude,
oldArgs.tagsExclude,
oldArgs.baseUrl
).asJson.noSpaces
)
)
}
.fold(identity, identity)
private def saveChannel(ch: Channel, account: AccountId): ConnectionIO[ChannelRef] =
(for {
newId <- OptionT.liftF(Ident.randomId[ConnectionIO])
userId <- OptionT(RUser.findIdByAccount(account))
r <- RNotificationChannel.fromChannel(ch, newId, userId)
_ <- OptionT.liftF(RNotificationChannel.insert(r))
_ <- OptionT.liftF(
Sync[ConnectionIO].delay(logger.debug(s"Created channel $r for $account"))
)
ref = r.asRef
} yield ref)
.getOrElseF(Sync[ConnectionIO].raiseError(new Exception("User not found!")))
private def migrateDueItemTask1(old: RPeriodicTask): ConnectionIO[Int] = {
val converted = old.args
.parseJsonAs[NotifyDueItemsArgs]
.leftMap(_.getMessage())
.map(convertArgs)
converted match {
case Right(args) =>
val task = args
.semiflatMap(a =>
RPeriodicTask
.updateTask(
old.id,
PeriodicDueItemsArgs.taskName,
a.asJson.noSpaces
)
)
.getOrElse(0)
Sync[ConnectionIO].delay(logger.info(s"Converting user task: $old")) *> task
case Left(err) =>
logger.error(s"Error converting user task: $old. $err")
@@ -63,22 +159,44 @@ trait MigrationTasks {
}
}
def convertArgs(old: NotifyDueItemsArgs): Either[String, PeriodicDueItemsArgs] =
old.recipients
.traverse(MailAddress.parse)
.flatMap(l => NonEmptyList.fromList(l).toRight("No recipients provided"))
.map { rec =>
PeriodicDueItemsArgs(
old.account,
Right(Channel.Mail(Ident.unsafe(""), None, old.smtpConnection, rec)),
old.remindDays,
old.daysBack,
old.tagsInclude,
old.tagsExclude,
old.itemDetailUrl
)
private def convertArgs(
old: NotifyDueItemsArgs
): OptionT[ConnectionIO, PeriodicDueItemsArgs] = {
val recs = old.recipients
.map(MailAddress.parse)
.flatMap {
case Right(m) => Some(m)
case Left(err) =>
logger.warn(s"Cannot read mail address: $err. Skip this while migrating.")
None
}
for {
userId <- OptionT(RUser.findIdByAccount(old.account))
id <- OptionT.liftF(Ident.randomId[ConnectionIO])
now <- OptionT.liftF(Timestamp.current[ConnectionIO])
chName = Some("migrate notify items")
ch = RNotificationChannelMail(
id,
userId,
chName,
old.smtpConnection,
recs,
now
)
_ <- OptionT.liftF(RNotificationChannelMail.insert(ch))
args = PeriodicDueItemsArgs(
old.account,
NonEmptyList.of(ChannelRef(ch.id, ChannelType.Mail, chName)),
old.remindDays,
old.daysBack,
old.tagsInclude,
old.tagsExclude,
old.itemDetailUrl
)
} yield args
}
def mkTransactor(ctx: Context): Transactor[IO] = {
val xa = Transactor.fromConnection[IO](ctx.getConnection())
Transactor.strategy.set(xa, Strategy.void) // transactions are handled by flyway

View File

@@ -0,0 +1,48 @@
/*
* Copyright 2020 Eike K. & Contributors
*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
package db.migration.data
import docspell.common._
import emil.MailAddress
import io.circe.generic.semiauto
import io.circe.{Decoder, Encoder}
/** Arguments to the notification task.
*
* This tasks queries items with a due date and informs the user via mail.
*
* If the structure changes, there must be some database migration to update or remove
* the json data of the corresponding task.
*/
final case class PeriodicDueItemsArgsOld(
account: AccountId,
channel: ChannelOrRef,
remindDays: Int,
daysBack: Option[Int],
tagsInclude: List[Ident],
tagsExclude: List[Ident],
baseUrl: Option[LenientUri]
)
object PeriodicDueItemsArgsOld {
val taskName = Ident.unsafe("periodic-due-items-notify")
implicit def jsonDecoder(implicit
mc: Decoder[MailAddress]
): Decoder[PeriodicDueItemsArgsOld] = {
implicit val x = ChannelOrRef.jsonDecoder
semiauto.deriveDecoder
}
implicit def jsonEncoder(implicit
mc: Encoder[MailAddress]
): Encoder[PeriodicDueItemsArgsOld] = {
implicit val x = ChannelOrRef.jsonEncoder
semiauto.deriveEncoder
}
}

View File

@@ -0,0 +1,40 @@
/*
* Copyright 2020 Eike K. & Contributors
*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
package db.migration.data
import docspell.common._
import emil.MailAddress
import io.circe.generic.semiauto
import io.circe.{Decoder, Encoder}
final case class PeriodicQueryArgsOld(
account: AccountId,
channel: ChannelOrRef,
query: Option[ItemQueryString],
bookmark: Option[String],
baseUrl: Option[LenientUri],
contentStart: Option[String]
)
object PeriodicQueryArgsOld {
val taskName = Ident.unsafe("periodic-query-notify")
implicit def jsonDecoder(implicit
mc: Decoder[MailAddress]
): Decoder[PeriodicQueryArgsOld] = {
implicit val x = ChannelOrRef.jsonDecoder
semiauto.deriveDecoder
}
implicit def jsonEncoder(implicit
mc: Encoder[MailAddress]
): Encoder[PeriodicQueryArgsOld] = {
implicit val x = ChannelOrRef.jsonEncoder
semiauto.deriveEncoder
}
}

View File

@@ -0,0 +1,29 @@
/*
* Copyright 2020 Eike K. & Contributors
*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
package db.migration
import docspell.notification.api._
import emil.MailAddress
import io.circe.{Decoder, Encoder}
package object data {
type ChannelOrRef = Either[ChannelRef, Channel]
object ChannelOrRef {
implicit def jsonDecoder(implicit mc: Decoder[MailAddress]): Decoder[ChannelOrRef] =
Channel.jsonDecoder.either(ChannelRef.jsonDecoder).map(_.swap)
implicit def jsonEncoder(implicit mc: Encoder[MailAddress]): Encoder[ChannelOrRef] =
Encoder.instance(_.fold(ChannelRef.jsonEncoder.apply, Channel.jsonEncoder.apply))
implicit class ChannelOrRefOpts(cr: ChannelOrRef) {
def channelType: ChannelType =
cr.fold(_.channelType, _.channelType)
}
}
}

View File

@@ -0,0 +1,22 @@
/*
* Copyright 2020 Eike K. & Contributors
*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
package db.migration.h2
import cats.effect.unsafe.implicits._
import db.migration.MigrationTasks
import doobie.implicits._
import org.flywaydb.core.api.migration.{BaseJavaMigration, Context}
class V1_32_2__MigrateChannels extends BaseJavaMigration with MigrationTasks {
val logger = org.log4s.getLogger
override def migrate(ctx: Context): Unit = {
val xa = mkTransactor(ctx)
migratePeriodicItemTasks.transact(xa).unsafeRunSync()
}
}

View File

@@ -0,0 +1,22 @@
/*
* Copyright 2020 Eike K. & Contributors
*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
package db.migration.mariadb
import cats.effect.unsafe.implicits._
import db.migration.MigrationTasks
import doobie.implicits._
import org.flywaydb.core.api.migration.{BaseJavaMigration, Context}
class V1_32_2__MigrateChannels extends BaseJavaMigration with MigrationTasks {
val logger = org.log4s.getLogger
override def migrate(ctx: Context): Unit = {
val xa = mkTransactor(ctx)
migratePeriodicItemTasks.transact(xa).unsafeRunSync()
}
}

View File

@@ -0,0 +1,22 @@
/*
* Copyright 2020 Eike K. & Contributors
*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
package db.migration.postgresql
import cats.effect.unsafe.implicits._
import db.migration.MigrationTasks
import doobie.implicits._
import org.flywaydb.core.api.migration.{BaseJavaMigration, Context}
class V1_32_2__MigrateChannels extends BaseJavaMigration with MigrationTasks {
val logger = org.log4s.getLogger
override def migrate(ctx: Context): Unit = {
val xa = mkTransactor(ctx)
migratePeriodicItemTasks.transact(xa).unsafeRunSync()
}
}