From 7424a9b2f9a0a97c100ee35918965617b526a1b5 Mon Sep 17 00:00:00 2001 From: eikek Date: Sun, 7 Aug 2022 13:13:22 +0200 Subject: [PATCH] Migrate background task to new collective id --- .../docspell/common/FileCopyTaskArgs.scala | 2 +- .../scala/docspell/common/FileKeyPart.scala | 7 +- .../docspell/common/syntax/StringSyntax.scala | 4 +- .../common/MigrateCollectiveIdTaskArgs.scala | 457 ++++++++++++++++++ .../common/MigrateDueItemTasks.scala | 10 +- .../migration/common/MigrateNotifyTasks.scala | 35 +- .../data/FileIntegrityCheckArgs.scala | 84 ++++ .../db/migration/data/ProcessItemArgs.scala | 4 +- .../data/ScheduledAddonTaskArgs.scala | 3 - .../migration/h2/V1_39_2__MigrateTasks.scala | 18 + .../mariadb/V1_39_2__MigrateTasks.scala | 18 + .../postgresql/V1_39_2__MigrateTasks.scala | 18 + .../docspell/store/file/BinnyUtils.scala | 4 +- .../main/scala/docspell/store/qb/DML.scala | 2 +- .../scala/docspell/store/records/RJob.scala | 24 + .../store/records/RPeriodicTask.scala | 9 +- 16 files changed, 671 insertions(+), 28 deletions(-) create mode 100644 modules/store/src/main/scala/db/migration/common/MigrateCollectiveIdTaskArgs.scala create mode 100644 modules/store/src/main/scala/db/migration/data/FileIntegrityCheckArgs.scala create mode 100644 modules/store/src/main/scala/db/migration/h2/V1_39_2__MigrateTasks.scala create mode 100644 modules/store/src/main/scala/db/migration/mariadb/V1_39_2__MigrateTasks.scala create mode 100644 modules/store/src/main/scala/db/migration/postgresql/V1_39_2__MigrateTasks.scala diff --git a/modules/common/src/main/scala/docspell/common/FileCopyTaskArgs.scala b/modules/common/src/main/scala/docspell/common/FileCopyTaskArgs.scala index 6d04faa1..843e57cc 100644 --- a/modules/common/src/main/scala/docspell/common/FileCopyTaskArgs.scala +++ b/modules/common/src/main/scala/docspell/common/FileCopyTaskArgs.scala @@ -14,7 +14,7 @@ import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder} import io.circe.syntax._ import io.circe.{Decoder, Encoder} -/** This is the input to the `FileCopyTask`. The task copies all files from on +/** This is the input to the `FileCopyTask`. The task copies all files from one * FileRepository to one ore more target repositories. * * If no `from` is given, the default file repository is used. For targets, a list of ids diff --git a/modules/common/src/main/scala/docspell/common/FileKeyPart.scala b/modules/common/src/main/scala/docspell/common/FileKeyPart.scala index fab01ab0..8693f07f 100644 --- a/modules/common/src/main/scala/docspell/common/FileKeyPart.scala +++ b/modules/common/src/main/scala/docspell/common/FileKeyPart.scala @@ -17,9 +17,10 @@ object FileKeyPart { case object Empty extends FileKeyPart - final case class Collective(collective: Ident) extends FileKeyPart + final case class Collective(collective: CollectiveId) extends FileKeyPart - final case class Category(collective: Ident, category: FileCategory) extends FileKeyPart + final case class Category(collective: CollectiveId, category: FileCategory) + extends FileKeyPart final case class Key(key: FileKey) extends FileKeyPart @@ -37,7 +38,7 @@ object FileKeyPart { implicit val jsonDecoder: Decoder[FileKeyPart] = Decoder.instance { cursor => for { - cid <- cursor.getOrElse[Option[Ident]]("collective")(None) + cid <- cursor.getOrElse[Option[CollectiveId]]("collective")(None) cat <- cursor.getOrElse[Option[FileCategory]]("category")(None) emptyObj = cursor.keys.exists(_.isEmpty) diff --git a/modules/common/src/main/scala/docspell/common/syntax/StringSyntax.scala b/modules/common/src/main/scala/docspell/common/syntax/StringSyntax.scala index 5e0038da..d1e27c4b 100644 --- a/modules/common/src/main/scala/docspell/common/syntax/StringSyntax.scala +++ b/modules/common/src/main/scala/docspell/common/syntax/StringSyntax.scala @@ -15,9 +15,7 @@ trait StringSyntax { Option(s).filter(_.trim.nonEmpty) def parseJsonAs[A](implicit d: Decoder[A]): Either[Throwable, A] = - for { - json <- parser.decode[A](s) - } yield json + parser.decode[A](s) } } diff --git a/modules/store/src/main/scala/db/migration/common/MigrateCollectiveIdTaskArgs.scala b/modules/store/src/main/scala/db/migration/common/MigrateCollectiveIdTaskArgs.scala new file mode 100644 index 00000000..d3522c7e --- /dev/null +++ b/modules/store/src/main/scala/db/migration/common/MigrateCollectiveIdTaskArgs.scala @@ -0,0 +1,457 @@ +package db.migration.common + +import cats.syntax.all._ +import cats.effect._ +import docspell.common._ +import docspell.store.records.{RCollective, RJob, RPeriodicTask, RUser} +import doobie._ +import doobie.implicits._ +import db.migration.data.{ + AllPreviewsArgs => AllPreviewArgsLegacy, + ConvertAllPdfArgs => ConvertAllPdfArgsLegacy, + DownloadZipArgs => DownloadZipArgsLegacy, + EmptyTrashArgs => EmptyTrashArgsLegacy, + FileIntegrityCheckArgs => FileIntegrityCheckArgsLegacy, + ItemAddonTaskArgs => ItemAddonTaskArgsLegacy, + LearnClassifierArgs => LearnClassifierArgsLegacy, + PeriodicDueItemsArgs => PeriodicDueItemsArgsLegacy, + PeriodicQueryArgs => PeriodicQueryArgsLegacy, + ProcessItemArgs => ProcessItemArgsLegacy, + ReIndexTaskArgs => ReIndexTaskArgsLegacy, + ScanMailboxArgs => ScanMailboxArgsLegacy, + ScheduledAddonTaskArgs => ScheduledAddonTaskArgsLegacy +} +import docspell.notification.api.{PeriodicDueItemsArgs, PeriodicQueryArgs} +import docspell.store.qb._ +import docspell.store.qb.DSL._ +import io.circe.{Decoder, Encoder, parser} +import org.flywaydb.core.api.migration.Context + +object MigrateCollectiveIdTaskArgs extends TransactorSupport { + + val logger = docspell.logging.getLogger[IO] + val loggerCIO = docspell.logging.getLogger[ConnectionIO] + + def run(ctx: Context): IO[Unit] = { + val tx = mkTransactor(ctx) + migrateAllPreviewsArgs(tx) + } + + def migrateAllPreviewsArgs(xa: Transactor[IO]) = + for { + _ <- logger.info(s"Loading account info mappings") + idMap <- collectiveIdMapping.transact(xa) + accMap <- accountInfoMapping.transact(xa) + + _ <- logger.info("Converting job and periodic task arguments") + _ <- convertJob[ScheduledAddonTaskArgsLegacy, ScheduledAddonTaskArgs]( + ScheduledAddonTaskArgsLegacy.taskName, + convertScheduledAddonTaskArgs(idMap) + ).transact(xa) + _ <- convertPeriodicJob[ScheduledAddonTaskArgsLegacy, ScheduledAddonTaskArgs]( + ScheduledAddonTaskArgsLegacy.taskName, + convertScheduledAddonTaskArgs(idMap) + ).transact(xa) + + _ <- convertJob[ReIndexTaskArgsLegacy, ReIndexTaskArgs]( + ReIndexTaskArgsLegacy.taskName, + convertReIndexTaskArgs(idMap) + ).transact(xa) + + _ <- convertJob[ProcessItemArgsLegacy, ProcessItemArgs]( + ProcessItemArgsLegacy.taskName, + convertProcessItemArgs(idMap) + ).transact(xa) + _ <- convertJob[ProcessItemArgsLegacy, ProcessItemArgs]( + ProcessItemArgsLegacy.multiUploadTaskName, + convertProcessItemArgs(idMap) + ).transact(xa) + + _ <- convertJob[LearnClassifierArgsLegacy, LearnClassifierArgs]( + LearnClassifierArgsLegacy.taskName, + convertLearnClassifierArgs(idMap) + ).transact(xa) + _ <- convertPeriodicJob[LearnClassifierArgsLegacy, LearnClassifierArgs]( + LearnClassifierArgsLegacy.taskName, + convertLearnClassifierArgs(idMap) + ).transact(xa) + + _ <- convertJob[ItemAddonTaskArgsLegacy, ItemAddonTaskArgs]( + ItemAddonTaskArgsLegacy.taskName, + convertItemAddonTaskArgs(idMap) + ).transact(xa) + + _ <- convertJob[FileIntegrityCheckArgsLegacy, FileIntegrityCheckArgs]( + FileIntegrityCheckArgsLegacy.taskName, + convertFileIntegrityCheckArgs(idMap) + ).transact(xa) + + _ <- convertJob[EmptyTrashArgsLegacy, EmptyTrashArgs]( + EmptyTrashArgsLegacy.taskName, + convertEmptyTrashArgs(idMap) + ).transact(xa) + _ <- convertPeriodicJob[EmptyTrashArgsLegacy, EmptyTrashArgs]( + EmptyTrashArgsLegacy.taskName, + convertEmptyTrashArgs(idMap) + ).transact(xa) + + _ <- convertJob[AllPreviewArgsLegacy, AllPreviewsArgs]( + AllPreviewArgsLegacy.taskName, + convertAllPreviewsArgs(idMap) + ).transact(xa) + + _ <- convertJob[ConvertAllPdfArgsLegacy, ConvertAllPdfArgs]( + ConvertAllPdfArgsLegacy.taskName, + convertAllPdfArgs(idMap) + ).transact(xa) + + _ <- convertJob[ScanMailboxArgsLegacy, ScanMailboxArgs]( + ScanMailboxArgsLegacy.taskName, + convertScanMailboxArgs(accMap) + ).transact(xa) + _ <- convertPeriodicJob[ScanMailboxArgsLegacy, ScanMailboxArgs]( + ScanMailboxArgsLegacy.taskName, + convertScanMailboxArgs(accMap) + ).transact(xa) + + _ <- convertJob[PeriodicDueItemsArgsLegacy, PeriodicDueItemsArgs]( + PeriodicDueItemsArgsLegacy.taskName, + convertPeriodicDueItemsArgs(accMap) + ).transact(xa) + _ <- convertPeriodicJob[PeriodicDueItemsArgsLegacy, PeriodicDueItemsArgs]( + PeriodicDueItemsArgsLegacy.taskName, + convertPeriodicDueItemsArgs(accMap) + ).transact(xa) + + _ <- convertJob[PeriodicQueryArgsLegacy, PeriodicQueryArgs]( + PeriodicQueryArgs.taskName, + convertPeriodicQueryArgs(accMap) + ).transact(xa) + _ <- convertPeriodicJob[PeriodicQueryArgsLegacy, PeriodicQueryArgs]( + PeriodicQueryArgs.taskName, + convertPeriodicQueryArgs(accMap) + ).transact(xa) + + // The new DownloadZipArgs are not in scope here. These jobs are deleted, as they are + // done in 99% probably. If not a user will just click again on the "download all" + // button + _ <- RJob.deleteByTask(DownloadZipArgsLegacy.taskName).transact(xa) + _ <- logger.info("Done converting task arguments.") + + _ <- logger.info("Updating job submitter info") + _ <- updateJobSubmitter(idMap, accMap).transact(xa) + } yield () + + def updateJobSubmitter( + idMap: Map[Ident, CollectiveId], + accMap: Map[AccountId, AccountInfo] + ) = { + val job = RJob.as("j") + val pt = RPeriodicTask.as("pt") + + val updateUser = + accMap.toList.traverse_ { case (accId, accInfo) => + val up1 = + DML.update( + job, + job.group === accId.collective && job.submitter === accId.user, + DML.set( + job.group.setTo(accInfo.collectiveId.valueAsIdent), + job.submitter.setTo(accInfo.userId) + ) + ) + + val up2 = + DML.update( + pt, + pt.group === accId.collective && pt.submitter === accId.user, + DML.set( + pt.group.setTo(accInfo.collectiveId.valueAsIdent), + pt.submitter.setTo(accInfo.userId) + ) + ) + + val up3 = + DML.update( + job, + job.group === accId.collective && job.submitter === accId.collective, + DML.set( + job.group.setTo(accInfo.collectiveId.valueAsIdent), + job.submitter.setTo(accInfo.collectiveId.valueAsIdent) + ) + ) + + val up4 = + DML.update( + pt, + pt.group === accId.collective && pt.submitter === accId.collective, + DML.set( + pt.group.setTo(accInfo.collectiveId.valueAsIdent), + pt.submitter.setTo(accInfo.collectiveId.valueAsIdent) + ) + ) + + up1 *> up2 *> up3 *> up4 + } + + val updateJobGroup = + idMap.toList.traverse_ { case (cname, cid) => + val up1 = + DML.update(job, job.group === cname, DML.set(job.group.setTo(cid.valueAsIdent))) + val up2 = + DML.update(pt, pt.group === cname, DML.set(pt.group.setTo(cid.valueAsIdent))) + up1 *> up2 + } + + updateUser *> updateJobGroup + } + + def convertJob[S: Decoder, T: Encoder]( + task: Ident, + conv: S => Option[T] + ): ConnectionIO[Int] = + for { + jobs <- RJob.findByTaskName(task) + converted = + jobs.traverse(j => + convertJobArgs(j)(conv) + .map(_.pure[ConnectionIO]) + .recoverWith { + case ex if JobState.isDone(j.state) => + val cause = Option(ex.getCause).getOrElse(ex) + Either.right( + loggerCIO.warn( + s"Removing old job '${j.id.id}', because argument parsing failed: ${cause.getMessage}" + ) *> RJob.delete(j.id).as(Option.empty[RJob]) + ) + } + ) + conv <- Sync[ConnectionIO] + .pure(converted) + .rethrow + .flatMap(_.sequence) + .map(_.flatten) + _ <- conv.traverse_(j => RJob.setArgs(j.id, j.args)) + } yield conv.size + + def convertPeriodicJob[S: Decoder, T: Encoder]( + task: Ident, + conv: S => Option[T] + ): ConnectionIO[Int] = + for { + jobs <- RPeriodicTask.findByTask(task) + converted <- Sync[ConnectionIO] + .pure(jobs.traverse(j => convertPeriodicJobArgs(j)(conv))) + .rethrow + .map(_.flatten) + _ <- converted.traverse_(j => RPeriodicTask.setArgs(j.id, j.args)) + } yield converted.size + + private def convertPeriodicDueItemsArgs(accMap: Map[AccountId, AccountInfo])( + oldArgs: PeriodicDueItemsArgsLegacy + ) = + accMap + .get(oldArgs.account) + .map(acc => + PeriodicDueItemsArgs( + account = acc, + channels = oldArgs.channels, + remindDays = oldArgs.remindDays, + daysBack = oldArgs.daysBack, + tagsInclude = oldArgs.tagsInclude, + tagsExclude = oldArgs.tagsExclude, + baseUrl = oldArgs.baseUrl + ) + ) + + private def convertPeriodicQueryArgs( + accMap: Map[AccountId, AccountInfo] + )(oldArgs: PeriodicQueryArgsLegacy) = + accMap + .get(oldArgs.account) + .map(acc => + PeriodicQueryArgs( + account = acc, + channels = oldArgs.channels, + query = oldArgs.query, + bookmark = oldArgs.bookmark, + baseUrl = oldArgs.baseUrl, + contentStart = oldArgs.contentStart + ) + ) + + private def convertScanMailboxArgs( + accMap: Map[AccountId, AccountInfo] + )(oldArgs: ScanMailboxArgsLegacy) = + accMap + .get(oldArgs.account) + .map(acc => + ScanMailboxArgs( + account = acc, + imapConnection = oldArgs.imapConnection, + scanRecursively = oldArgs.scanRecursively, + folders = oldArgs.folders, + receivedSince = oldArgs.receivedSince, + targetFolder = oldArgs.targetFolder, + deleteMail = oldArgs.deleteMail, + direction = oldArgs.direction, + itemFolder = oldArgs.itemFolder, + fileFilter = oldArgs.fileFilter, + tags = oldArgs.tags, + subjectFilter = oldArgs.subjectFilter, + language = oldArgs.language, + postHandleAll = oldArgs.postHandleAll, + attachmentsOnly = oldArgs.attachmentsOnly + ) + ) + + private def convertScheduledAddonTaskArgs(idMap: Map[Ident, CollectiveId])( + oldArgs: ScheduledAddonTaskArgsLegacy + ) = + idMap + .get(oldArgs.collective) + .map(cid => ScheduledAddonTaskArgs(cid, oldArgs.addonTaskId)) + + private def convertReIndexTaskArgs( + idMap: Map[Ident, CollectiveId] + )(oldArgs: ReIndexTaskArgsLegacy) = + oldArgs.collective.flatMap { cname => + idMap + .get(cname) + .map(cid => ReIndexTaskArgs(cid.some)) + } + + private def convertProcessItemArgs(idMap: Map[Ident, CollectiveId])( + oldArgs: ProcessItemArgsLegacy + ) = + idMap + .get(oldArgs.meta.collective) + .map(cid => + ProcessItemArgs( + ProcessItemArgs.ProcessMeta( + collective = cid, + itemId = oldArgs.meta.itemId, + language = oldArgs.meta.language, + direction = oldArgs.meta.direction, + sourceAbbrev = oldArgs.meta.sourceAbbrev, + folderId = oldArgs.meta.folderId, + validFileTypes = oldArgs.meta.validFileTypes, + skipDuplicate = oldArgs.meta.skipDuplicate, + fileFilter = oldArgs.meta.fileFilter, + tags = oldArgs.meta.tags, + reprocess = oldArgs.meta.reprocess, + attachmentsOnly = oldArgs.meta.attachmentsOnly + ), + oldArgs.files.map(f => + ProcessItemArgs + .File(f.name, FileKey(cid, f.fileMetaId.category, f.fileMetaId.id)) + ) + ) + ) + + private def convertLearnClassifierArgs(idMap: Map[Ident, CollectiveId])( + oldArgs: LearnClassifierArgsLegacy + ) = + idMap + .get(oldArgs.collective) + .map(cid => LearnClassifierArgs(cid)) + + private def convertItemAddonTaskArgs(idMap: Map[Ident, CollectiveId])( + oldArgs: ItemAddonTaskArgsLegacy + ) = + idMap + .get(oldArgs.collective) + .map(cid => ItemAddonTaskArgs(cid, oldArgs.itemId, oldArgs.addonRunConfigs)) + + private def convertFileIntegrityCheckArgs(idMap: Map[Ident, CollectiveId])( + oldArgs: FileIntegrityCheckArgsLegacy + ) = + oldArgs.pattern match { + case FileIntegrityCheckArgsLegacy.FileKeyPart.Key(key) => + idMap + .get(key.collective) + .map(cid => + FileIntegrityCheckArgs(FileKeyPart.Key(FileKey(cid, key.category, key.id))) + ) + + case FileIntegrityCheckArgsLegacy.FileKeyPart.Collective(cname) => + idMap + .get(cname) + .map(cid => FileIntegrityCheckArgs(FileKeyPart.Collective(cid))) + + case FileIntegrityCheckArgsLegacy.FileKeyPart.Category(cname, category) => + idMap + .get(cname) + .map(cid => FileIntegrityCheckArgs(FileKeyPart.Category(cid, category))) + + case FileIntegrityCheckArgsLegacy.FileKeyPart.Empty => + None + } + + private def convertEmptyTrashArgs(idMap: Map[Ident, CollectiveId])( + oldArgs: EmptyTrashArgsLegacy + ) = + idMap.get(oldArgs.collective).map(cid => EmptyTrashArgs(cid, oldArgs.minAge)) + + private def convertAllPreviewsArgs(idMap: Map[Ident, CollectiveId])( + oldArgs: AllPreviewArgsLegacy + ) = + oldArgs.collective.flatMap { cname => + idMap + .get(cname) + .map(cid => AllPreviewsArgs(cid.some, oldArgs.storeMode)) + } + + private def convertAllPdfArgs(idMap: Map[Ident, CollectiveId])( + oldArgs: ConvertAllPdfArgsLegacy + ) = + oldArgs.collective.flatMap(cname => + idMap.get(cname).map(cid => ConvertAllPdfArgs(cid.some)) + ) + + def convertJobArgs[S: Decoder, T: Encoder]( + job: RJob + )(update: S => Option[T]): Either[Throwable, Option[RJob]] = + for { + oldArgs <- parser + .decode[S](job.args) + .leftMap(err => + new IllegalStateException( + s"Error parsing arguments of job: ${job.id.id}", + err + ) + ) + upJob = update(oldArgs).map(j => job.withArgs(j)) + } yield upJob + + def convertPeriodicJobArgs[S: Decoder, T: Encoder]( + job: RPeriodicTask + )(update: S => Option[T]): Either[Throwable, Option[RPeriodicTask]] = + for { + oldArgs <- parser + .decode[S](job.args) + .leftMap(err => + new IllegalStateException( + s"Error parsing arguments of periodic task: ${job.id.id}", + err + ) + ) + upJob = update(oldArgs).map(j => job.withArgs(j)) + } yield upJob + + private def collectiveIdMapping: ConnectionIO[Map[Ident, CollectiveId]] = + RCollective.findAll(_.id).map(_.map(coll => coll.name -> coll.id).toMap) + + private def accountInfoMapping: ConnectionIO[Map[AccountId, AccountInfo]] = { + val u = RUser.as("u") + val c = RCollective.as("c") + + Select( + select(c.id, c.name, u.uid, u.login), + from(u).innerJoin(c, c.id === u.cid) + ).build + .query[AccountInfo] + .to[List] + .map(_.map(a => a.asAccountId -> a).toMap) + } +} diff --git a/modules/store/src/main/scala/db/migration/common/MigrateDueItemTasks.scala b/modules/store/src/main/scala/db/migration/common/MigrateDueItemTasks.scala index 6f246718..3a3dfbc4 100644 --- a/modules/store/src/main/scala/db/migration/common/MigrateDueItemTasks.scala +++ b/modules/store/src/main/scala/db/migration/common/MigrateDueItemTasks.scala @@ -6,8 +6,8 @@ import cats.implicits._ import docspell.common._ import docspell.common.syntax.StringSyntax._ import docspell.notification.api._ -import docspell.store.queries.QLogin import docspell.store.records._ +import docspell.store.qb.DSL._ import db.migration.data.{ PeriodicDueItemsArgs => PeriodicDueItemsArgsLegacy, PeriodicQueryArgs => PeriodicQueryArgsLegacy, @@ -103,8 +103,7 @@ object MigrateDueItemTasks extends TransactorSupport with JsonCodecs { private def saveChannel(ch: Channel, account: AccountId): ConnectionIO[ChannelRef] = (for { newId <- OptionT.liftF(Ident.randomId[ConnectionIO]) - userData <- OptionT(QLogin.findAccount(account)) - userId = userData.userId + userId <- OptionT(findIdByAccountId(account)) r <- RNotificationChannel.fromChannel(ch, newId, userId) _ <- OptionT.liftF(RNotificationChannel.insert(r)) _ <- OptionT.liftF( @@ -113,4 +112,9 @@ object MigrateDueItemTasks extends TransactorSupport with JsonCodecs { ref = r.asRef } yield ref) .getOrElseF(Sync[ConnectionIO].raiseError(new Exception("User not found!"))) + + def findIdByAccountId(accountId: AccountId): ConnectionIO[Option[Ident]] = + sql"select u.uid from user_ u where u.cid = ${accountId.collective} and u.login = ${accountId.user}" + .query[Ident] + .option } diff --git a/modules/store/src/main/scala/db/migration/common/MigrateNotifyTasks.scala b/modules/store/src/main/scala/db/migration/common/MigrateNotifyTasks.scala index 5230086d..1f9a4d4d 100644 --- a/modules/store/src/main/scala/db/migration/common/MigrateNotifyTasks.scala +++ b/modules/store/src/main/scala/db/migration/common/MigrateNotifyTasks.scala @@ -3,14 +3,13 @@ package db.migration.common import cats.data.{NonEmptyList, OptionT} import cats.effect.{IO, Sync} import cats.implicits._ - import docspell.common._ import docspell.common.syntax.StringSyntax._ import docspell.notification.api._ -import docspell.store.queries.QLogin -import docspell.store.records._ - +import docspell.store.records.{RNotificationChannelMail, RPeriodicTask} +import docspell.store.qb.DSL._ import db.migration.data.{PeriodicDueItemsArgs => PeriodicDueItemsArgsLegacy, _} +import docspell.store.qb.DML import doobie._ import doobie.implicits._ import emil.MailAddress @@ -74,20 +73,20 @@ object MigrateNotifyTasks extends TransactorSupport { } for { - userData <- OptionT(QLogin.findAccount(old.account)) - userId = userData.userId + userId <- OptionT(findIdByAccountId(old.account)) id <- OptionT.liftF(Ident.randomId[ConnectionIO]) now <- OptionT.liftF(Timestamp.current[ConnectionIO]) + connId <- OptionT(findSmtpConnectionId(old.smtpConnection, userId)) chName = Some("migrate notify items") ch = RNotificationChannelMail( id, userId, chName, - old.smtpConnection, + connId, recs, now ) - _ <- OptionT.liftF(RNotificationChannelMail.insert(ch)) + _ <- OptionT.liftF(insert(ch)) args = PeriodicDueItemsArgsLegacy( old.account, NonEmptyList.of(ChannelRef(ch.id, ChannelType.Mail, chName)), @@ -99,4 +98,24 @@ object MigrateNotifyTasks extends TransactorSupport { ) } yield args } + + def findIdByAccountId(accountId: AccountId): ConnectionIO[Option[Ident]] = + sql"select u.uid from user_ u where u.cid = ${accountId.collective} and u.login = ${accountId.user}" + .query[Ident] + .option + + def findSmtpConnectionId(name: Ident, userId: Ident): ConnectionIO[Option[Ident]] = + sql"select id from useremail where uid = $userId and (name = $name or id = $name)" + .query[Ident] + .option + + // insert without 'name' column, it was added afterwards + def insert(r: RNotificationChannelMail): ConnectionIO[Int] = { + val t = RNotificationChannelMail.T + DML.insert( + t, + NonEmptyList.of(t.id, t.uid, t.connection, t.recipients, t.created), + sql"${r.id},${r.uid},${r.connection},${r.recipients},${r.created}" + ) + } } diff --git a/modules/store/src/main/scala/db/migration/data/FileIntegrityCheckArgs.scala b/modules/store/src/main/scala/db/migration/data/FileIntegrityCheckArgs.scala new file mode 100644 index 00000000..ec97977c --- /dev/null +++ b/modules/store/src/main/scala/db/migration/data/FileIntegrityCheckArgs.scala @@ -0,0 +1,84 @@ +/* + * Copyright 2020 Eike K. & Contributors + * + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +package db.migration.data + +import cats.implicits._ +import db.migration.data.FileIntegrityCheckArgs.FileKeyPart +import docspell.common.{FileCategory, Ident} +import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder} +import io.circe.{Decoder, Encoder} +import io.circe.syntax._ +import io.circe.DecodingFailure + +/** @deprecated `FileKey` and `FileKeyPart` was replaced to use a `CollectiveId` */ +final case class FileIntegrityCheckArgs(pattern: FileKeyPart) + +object FileIntegrityCheckArgs { + val taskName: Ident = Ident.unsafe("all-file-integrity-check") + + final case class FileKey(collective: Ident, category: FileCategory, id: Ident) { + override def toString = + s"${collective.id}/${category.id.id}/${id.id}" + } + + object FileKey { + + implicit val jsonDecoder: Decoder[FileKey] = + deriveDecoder[FileKey] + + implicit val jsonEncoder: Encoder[FileKey] = + deriveEncoder[FileKey] + } + + sealed trait FileKeyPart {} + + object FileKeyPart { + + case object Empty extends FileKeyPart + + final case class Collective(collective: Ident) extends FileKeyPart + + final case class Category(collective: Ident, category: FileCategory) + extends FileKeyPart + + final case class Key(key: FileKey) extends FileKeyPart + + implicit val jsonEncoder: Encoder[FileKeyPart] = + Encoder.instance { + case Empty => ().asJson + case Collective(cid) => + Map("collective" -> cid.asJson).asJson + case Category(cid, cat) => + Map("collective" -> cid.asJson, "category" -> cat.asJson).asJson + case Key(key) => + key.asJson + } + + implicit val jsonDecoder: Decoder[FileKeyPart] = + Decoder.instance { cursor => + for { + cid <- cursor.getOrElse[Option[Ident]]("collective")(None) + cat <- cursor.getOrElse[Option[FileCategory]]("category")(None) + emptyObj = cursor.keys.exists(_.isEmpty) + + c3 = cursor.as[FileKey].map(Key).toOption + c2 = (cid, cat).mapN(Category) + c1 = cid.map(Collective) + c0 = Option.when(emptyObj)(Empty) + + c = c3.orElse(c2).orElse(c1).orElse(c0) + res <- c.toRight(DecodingFailure("", cursor.history)) + } yield res + } + } + + implicit val jsonDecoder: Decoder[FileIntegrityCheckArgs] = + deriveDecoder + + implicit val jsonEncoder: Encoder[FileIntegrityCheckArgs] = + deriveEncoder +} diff --git a/modules/store/src/main/scala/db/migration/data/ProcessItemArgs.scala b/modules/store/src/main/scala/db/migration/data/ProcessItemArgs.scala index 85c6a204..f951fed7 100644 --- a/modules/store/src/main/scala/db/migration/data/ProcessItemArgs.scala +++ b/modules/store/src/main/scala/db/migration/data/ProcessItemArgs.scala @@ -7,7 +7,7 @@ package db.migration.data import ProcessItemArgs._ -import docspell.common._ +import docspell.common.{FileIntegrityCheckArgs => _, _} import docspell.common.syntax.all._ import io.circe._ @@ -68,7 +68,7 @@ object ProcessItemArgs { implicit val jsonDecoder: Decoder[ProcessMeta] = deriveDecoder[ProcessMeta] } - case class File(name: Option[String], fileMetaId: FileKey) + case class File(name: Option[String], fileMetaId: FileIntegrityCheckArgs.FileKey) object File { implicit val jsonEncoder: Encoder[File] = deriveEncoder[File] implicit val jsonDecoder: Decoder[File] = deriveDecoder[File] diff --git a/modules/store/src/main/scala/db/migration/data/ScheduledAddonTaskArgs.scala b/modules/store/src/main/scala/db/migration/data/ScheduledAddonTaskArgs.scala index 5df1fd36..04064ab6 100644 --- a/modules/store/src/main/scala/db/migration/data/ScheduledAddonTaskArgs.scala +++ b/modules/store/src/main/scala/db/migration/data/ScheduledAddonTaskArgs.scala @@ -12,9 +12,6 @@ import io.circe.{Decoder, Encoder} /** @deprecated * This has been replaced with a version using a `CollectiveId` - * - * @param collective - * @param addonTaskId */ final case class ScheduledAddonTaskArgs(collective: Ident, addonTaskId: Ident) diff --git a/modules/store/src/main/scala/db/migration/h2/V1_39_2__MigrateTasks.scala b/modules/store/src/main/scala/db/migration/h2/V1_39_2__MigrateTasks.scala new file mode 100644 index 00000000..0d4f09e0 --- /dev/null +++ b/modules/store/src/main/scala/db/migration/h2/V1_39_2__MigrateTasks.scala @@ -0,0 +1,18 @@ +/* + * Copyright 2020 Eike K. & Contributors + * + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +package db.migration.h2 + +import cats.effect.unsafe.implicits._ +import db.migration.common.MigrateCollectiveIdTaskArgs +import org.flywaydb.core.api.migration.{BaseJavaMigration, Context} + +class V1_39_2__MigrateTasks extends BaseJavaMigration { + val logger = org.log4s.getLogger + + override def migrate(ctx: Context): Unit = + MigrateCollectiveIdTaskArgs.run(ctx).unsafeRunSync() +} diff --git a/modules/store/src/main/scala/db/migration/mariadb/V1_39_2__MigrateTasks.scala b/modules/store/src/main/scala/db/migration/mariadb/V1_39_2__MigrateTasks.scala new file mode 100644 index 00000000..d301dd55 --- /dev/null +++ b/modules/store/src/main/scala/db/migration/mariadb/V1_39_2__MigrateTasks.scala @@ -0,0 +1,18 @@ +/* + * Copyright 2020 Eike K. & Contributors + * + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +package db.migration.mariadb + +import cats.effect.unsafe.implicits._ +import db.migration.common.MigrateCollectiveIdTaskArgs +import org.flywaydb.core.api.migration.{BaseJavaMigration, Context} + +class V1_39_2__MigrateTasks extends BaseJavaMigration { + val logger = org.log4s.getLogger + + override def migrate(ctx: Context): Unit = + MigrateCollectiveIdTaskArgs.run(ctx).unsafeRunSync() +} diff --git a/modules/store/src/main/scala/db/migration/postgresql/V1_39_2__MigrateTasks.scala b/modules/store/src/main/scala/db/migration/postgresql/V1_39_2__MigrateTasks.scala new file mode 100644 index 00000000..8acd6336 --- /dev/null +++ b/modules/store/src/main/scala/db/migration/postgresql/V1_39_2__MigrateTasks.scala @@ -0,0 +1,18 @@ +/* + * Copyright 2020 Eike K. & Contributors + * + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +package db.migration.postgresql + +import cats.effect.unsafe.implicits._ +import db.migration.common.MigrateCollectiveIdTaskArgs +import org.flywaydb.core.api.migration.{BaseJavaMigration, Context} + +class V1_39_2__MigrateTasks extends BaseJavaMigration { + val logger = org.log4s.getLogger + + override def migrate(ctx: Context): Unit = + MigrateCollectiveIdTaskArgs.run(ctx).unsafeRunSync() +} diff --git a/modules/store/src/main/scala/docspell/store/file/BinnyUtils.scala b/modules/store/src/main/scala/docspell/store/file/BinnyUtils.scala index 773f2f6e..ee89bc88 100644 --- a/modules/store/src/main/scala/docspell/store/file/BinnyUtils.scala +++ b/modules/store/src/main/scala/docspell/store/file/BinnyUtils.scala @@ -29,8 +29,8 @@ object BinnyUtils { def fileKeyPartToPrefix(fkp: FileKeyPart): Option[String] = fkp match { case FileKeyPart.Empty => None - case FileKeyPart.Collective(cid) => Some(s"${cid.id}/%") - case FileKeyPart.Category(cid, cat) => Some(s"${cid.id}/${cat.id.id}/%") + case FileKeyPart.Collective(cid) => Some(s"${cid.value}/%") + case FileKeyPart.Category(cid, cat) => Some(s"${cid.value}/${cat.id.id}/%") case FileKeyPart.Key(key) => Some(fileKeyToBinaryId(key).id) } diff --git a/modules/store/src/main/scala/docspell/store/qb/DML.scala b/modules/store/src/main/scala/docspell/store/qb/DML.scala index 98939a34..e9b3c790 100644 --- a/modules/store/src/main/scala/docspell/store/qb/DML.scala +++ b/modules/store/src/main/scala/docspell/store/qb/DML.scala @@ -76,7 +76,7 @@ object DML extends DoobieMeta { setter: Nel[Setter[_]] ): Fragment = { val condFrag = cond.map(SelectBuilder.cond).getOrElse(Fragment.empty) - fr"UPDATE" ++ FromExprBuilder.buildTable(table) ++ fr"SET" ++ + fr"UPDATE" ++ FromExprBuilder.buildTable(table) ++ fr" SET" ++ setter .map(s => buildSetter(s)) .reduceLeft(_ ++ comma ++ _) ++ diff --git a/modules/store/src/main/scala/docspell/store/records/RJob.scala b/modules/store/src/main/scala/docspell/store/records/RJob.scala index 4804232b..f8b00e8e 100644 --- a/modules/store/src/main/scala/docspell/store/records/RJob.scala +++ b/modules/store/src/main/scala/docspell/store/records/RJob.scala @@ -44,6 +44,9 @@ case class RJob( def isInProgress: Boolean = JobState.inProgress.contains(state) + + def withArgs[A: Encoder](args: A): RJob = + copy(args = Encoder[A].apply(args).noSpaces) } object RJob { @@ -151,6 +154,13 @@ object RJob { ) } + def findByTaskName(task: Ident): ConnectionIO[Vector[RJob]] = + Select( + select(T.all), + from(T), + T.task === task + ).build.query[RJob].to[Vector] + def findFromIds(ids: Seq[Ident]): ConnectionIO[Vector[RJob]] = NonEmptyList.fromList(ids.toList) match { case None => @@ -180,6 +190,12 @@ object RJob { ) } + def setJsonArgs[A: Encoder](jobId: Ident, args: A): ConnectionIO[Int] = + DML.update(T, T.id === jobId, DML.set(T.args.setTo(Encoder[A].apply(args).noSpaces))) + + def setArgs(jobId: Ident, args: String): ConnectionIO[Int] = + DML.update(T, T.id === jobId, DML.set(T.args.setTo(args))) + def incrementRetries(jobid: Ident): ConnectionIO[Int] = DML .update( @@ -301,6 +317,14 @@ object RJob { n1 <- DML.delete(T, T.id === jobId) } yield n0 + n1 + def deleteByTask(task: Ident): ConnectionIO[Int] = { + val query = Select(select(T.id), from(T), T.task === task) + for { + n1 <- DML.delete(RJobLog.T, RJobLog.T.jobId.in(query)) + n2 <- DML.delete(T, T.task === task) + } yield n1 + n2 + } + def findIdsDoneAndOlderThan(ts: Timestamp): Stream[ConnectionIO, Ident] = run( select(T.id), diff --git a/modules/store/src/main/scala/docspell/store/records/RPeriodicTask.scala b/modules/store/src/main/scala/docspell/store/records/RPeriodicTask.scala index 813b9a10..b250d9d4 100644 --- a/modules/store/src/main/scala/docspell/store/records/RPeriodicTask.scala +++ b/modules/store/src/main/scala/docspell/store/records/RPeriodicTask.scala @@ -9,14 +9,13 @@ package docspell.store.records import cats.data.NonEmptyList import cats.effect._ import cats.implicits._ - import docspell.common._ import docspell.store.qb.DSL._ import docspell.store.qb._ - import com.github.eikek.calev.CalEvent import doobie._ import doobie.implicits._ +import io.circe.Encoder /** A periodic task is a special job description, that shares a few properties of a * `RJob`. It must provide all information to create a `RJob` value eventually. @@ -38,6 +37,9 @@ case class RPeriodicTask( summary: Option[String] ) { + def withArgs[A: Encoder](args: A): RPeriodicTask = + copy(args = Encoder[A].apply(args).noSpaces) + def toJob[F[_]: Sync]: F[RJob] = for { now <- Timestamp.current[F] @@ -112,6 +114,9 @@ object RPeriodicTask { def updateTask(id: Ident, taskName: Ident, args: String): ConnectionIO[Int] = DML.update(T, T.id === id, DML.set(T.task.setTo(taskName), T.args.setTo(args))) + def setArgs(taskId: Ident, args: String): ConnectionIO[Int] = + DML.update(T, T.id === taskId, DML.set(T.args.setTo(args))) + def setEnabledByTask(taskName: Ident, enabled: Boolean): ConnectionIO[Int] = DML.update(T, T.task === taskName, DML.set(T.enabled.setTo(enabled)))