mirror of
				https://github.com/TheAnachronism/docspell.git
				synced 2025-10-31 09:30:12 +00:00 
			
		
		
		
	Migrate background task to new collective id
This commit is contained in:
		| @@ -14,7 +14,7 @@ import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder} | ||||
| import io.circe.syntax._ | ||||
| import io.circe.{Decoder, Encoder} | ||||
|  | ||||
| /** This is the input to the `FileCopyTask`. The task copies all files from on | ||||
| /** This is the input to the `FileCopyTask`. The task copies all files from one | ||||
|   * FileRepository to one ore more target repositories. | ||||
|   * | ||||
|   * If no `from` is given, the default file repository is used. For targets, a list of ids | ||||
|   | ||||
| @@ -17,9 +17,10 @@ object FileKeyPart { | ||||
|  | ||||
|   case object Empty extends FileKeyPart | ||||
|  | ||||
|   final case class Collective(collective: Ident) extends FileKeyPart | ||||
|   final case class Collective(collective: CollectiveId) extends FileKeyPart | ||||
|  | ||||
|   final case class Category(collective: Ident, category: FileCategory) extends FileKeyPart | ||||
|   final case class Category(collective: CollectiveId, category: FileCategory) | ||||
|       extends FileKeyPart | ||||
|  | ||||
|   final case class Key(key: FileKey) extends FileKeyPart | ||||
|  | ||||
| @@ -37,7 +38,7 @@ object FileKeyPart { | ||||
|   implicit val jsonDecoder: Decoder[FileKeyPart] = | ||||
|     Decoder.instance { cursor => | ||||
|       for { | ||||
|         cid <- cursor.getOrElse[Option[Ident]]("collective")(None) | ||||
|         cid <- cursor.getOrElse[Option[CollectiveId]]("collective")(None) | ||||
|         cat <- cursor.getOrElse[Option[FileCategory]]("category")(None) | ||||
|         emptyObj = cursor.keys.exists(_.isEmpty) | ||||
|  | ||||
|   | ||||
| @@ -15,9 +15,7 @@ trait StringSyntax { | ||||
|       Option(s).filter(_.trim.nonEmpty) | ||||
|  | ||||
|     def parseJsonAs[A](implicit d: Decoder[A]): Either[Throwable, A] = | ||||
|       for { | ||||
|         json <- parser.decode[A](s) | ||||
|       } yield json | ||||
|       parser.decode[A](s) | ||||
|   } | ||||
| } | ||||
|  | ||||
|   | ||||
| @@ -0,0 +1,457 @@ | ||||
| package db.migration.common | ||||
|  | ||||
| import cats.syntax.all._ | ||||
| import cats.effect._ | ||||
| import docspell.common._ | ||||
| import docspell.store.records.{RCollective, RJob, RPeriodicTask, RUser} | ||||
| import doobie._ | ||||
| import doobie.implicits._ | ||||
| import db.migration.data.{ | ||||
|   AllPreviewsArgs => AllPreviewArgsLegacy, | ||||
|   ConvertAllPdfArgs => ConvertAllPdfArgsLegacy, | ||||
|   DownloadZipArgs => DownloadZipArgsLegacy, | ||||
|   EmptyTrashArgs => EmptyTrashArgsLegacy, | ||||
|   FileIntegrityCheckArgs => FileIntegrityCheckArgsLegacy, | ||||
|   ItemAddonTaskArgs => ItemAddonTaskArgsLegacy, | ||||
|   LearnClassifierArgs => LearnClassifierArgsLegacy, | ||||
|   PeriodicDueItemsArgs => PeriodicDueItemsArgsLegacy, | ||||
|   PeriodicQueryArgs => PeriodicQueryArgsLegacy, | ||||
|   ProcessItemArgs => ProcessItemArgsLegacy, | ||||
|   ReIndexTaskArgs => ReIndexTaskArgsLegacy, | ||||
|   ScanMailboxArgs => ScanMailboxArgsLegacy, | ||||
|   ScheduledAddonTaskArgs => ScheduledAddonTaskArgsLegacy | ||||
| } | ||||
| import docspell.notification.api.{PeriodicDueItemsArgs, PeriodicQueryArgs} | ||||
| import docspell.store.qb._ | ||||
| import docspell.store.qb.DSL._ | ||||
| import io.circe.{Decoder, Encoder, parser} | ||||
| import org.flywaydb.core.api.migration.Context | ||||
|  | ||||
| object MigrateCollectiveIdTaskArgs extends TransactorSupport { | ||||
|  | ||||
|   val logger = docspell.logging.getLogger[IO] | ||||
|   val loggerCIO = docspell.logging.getLogger[ConnectionIO] | ||||
|  | ||||
|   def run(ctx: Context): IO[Unit] = { | ||||
|     val tx = mkTransactor(ctx) | ||||
|     migrateAllPreviewsArgs(tx) | ||||
|   } | ||||
|  | ||||
|   def migrateAllPreviewsArgs(xa: Transactor[IO]) = | ||||
|     for { | ||||
|       _ <- logger.info(s"Loading account info mappings") | ||||
|       idMap <- collectiveIdMapping.transact(xa) | ||||
|       accMap <- accountInfoMapping.transact(xa) | ||||
|  | ||||
|       _ <- logger.info("Converting job and periodic task arguments") | ||||
|       _ <- convertJob[ScheduledAddonTaskArgsLegacy, ScheduledAddonTaskArgs]( | ||||
|         ScheduledAddonTaskArgsLegacy.taskName, | ||||
|         convertScheduledAddonTaskArgs(idMap) | ||||
|       ).transact(xa) | ||||
|       _ <- convertPeriodicJob[ScheduledAddonTaskArgsLegacy, ScheduledAddonTaskArgs]( | ||||
|         ScheduledAddonTaskArgsLegacy.taskName, | ||||
|         convertScheduledAddonTaskArgs(idMap) | ||||
|       ).transact(xa) | ||||
|  | ||||
|       _ <- convertJob[ReIndexTaskArgsLegacy, ReIndexTaskArgs]( | ||||
|         ReIndexTaskArgsLegacy.taskName, | ||||
|         convertReIndexTaskArgs(idMap) | ||||
|       ).transact(xa) | ||||
|  | ||||
|       _ <- convertJob[ProcessItemArgsLegacy, ProcessItemArgs]( | ||||
|         ProcessItemArgsLegacy.taskName, | ||||
|         convertProcessItemArgs(idMap) | ||||
|       ).transact(xa) | ||||
|       _ <- convertJob[ProcessItemArgsLegacy, ProcessItemArgs]( | ||||
|         ProcessItemArgsLegacy.multiUploadTaskName, | ||||
|         convertProcessItemArgs(idMap) | ||||
|       ).transact(xa) | ||||
|  | ||||
|       _ <- convertJob[LearnClassifierArgsLegacy, LearnClassifierArgs]( | ||||
|         LearnClassifierArgsLegacy.taskName, | ||||
|         convertLearnClassifierArgs(idMap) | ||||
|       ).transact(xa) | ||||
|       _ <- convertPeriodicJob[LearnClassifierArgsLegacy, LearnClassifierArgs]( | ||||
|         LearnClassifierArgsLegacy.taskName, | ||||
|         convertLearnClassifierArgs(idMap) | ||||
|       ).transact(xa) | ||||
|  | ||||
|       _ <- convertJob[ItemAddonTaskArgsLegacy, ItemAddonTaskArgs]( | ||||
|         ItemAddonTaskArgsLegacy.taskName, | ||||
|         convertItemAddonTaskArgs(idMap) | ||||
|       ).transact(xa) | ||||
|  | ||||
|       _ <- convertJob[FileIntegrityCheckArgsLegacy, FileIntegrityCheckArgs]( | ||||
|         FileIntegrityCheckArgsLegacy.taskName, | ||||
|         convertFileIntegrityCheckArgs(idMap) | ||||
|       ).transact(xa) | ||||
|  | ||||
|       _ <- convertJob[EmptyTrashArgsLegacy, EmptyTrashArgs]( | ||||
|         EmptyTrashArgsLegacy.taskName, | ||||
|         convertEmptyTrashArgs(idMap) | ||||
|       ).transact(xa) | ||||
|       _ <- convertPeriodicJob[EmptyTrashArgsLegacy, EmptyTrashArgs]( | ||||
|         EmptyTrashArgsLegacy.taskName, | ||||
|         convertEmptyTrashArgs(idMap) | ||||
|       ).transact(xa) | ||||
|  | ||||
|       _ <- convertJob[AllPreviewArgsLegacy, AllPreviewsArgs]( | ||||
|         AllPreviewArgsLegacy.taskName, | ||||
|         convertAllPreviewsArgs(idMap) | ||||
|       ).transact(xa) | ||||
|  | ||||
|       _ <- convertJob[ConvertAllPdfArgsLegacy, ConvertAllPdfArgs]( | ||||
|         ConvertAllPdfArgsLegacy.taskName, | ||||
|         convertAllPdfArgs(idMap) | ||||
|       ).transact(xa) | ||||
|  | ||||
|       _ <- convertJob[ScanMailboxArgsLegacy, ScanMailboxArgs]( | ||||
|         ScanMailboxArgsLegacy.taskName, | ||||
|         convertScanMailboxArgs(accMap) | ||||
|       ).transact(xa) | ||||
|       _ <- convertPeriodicJob[ScanMailboxArgsLegacy, ScanMailboxArgs]( | ||||
|         ScanMailboxArgsLegacy.taskName, | ||||
|         convertScanMailboxArgs(accMap) | ||||
|       ).transact(xa) | ||||
|  | ||||
|       _ <- convertJob[PeriodicDueItemsArgsLegacy, PeriodicDueItemsArgs]( | ||||
|         PeriodicDueItemsArgsLegacy.taskName, | ||||
|         convertPeriodicDueItemsArgs(accMap) | ||||
|       ).transact(xa) | ||||
|       _ <- convertPeriodicJob[PeriodicDueItemsArgsLegacy, PeriodicDueItemsArgs]( | ||||
|         PeriodicDueItemsArgsLegacy.taskName, | ||||
|         convertPeriodicDueItemsArgs(accMap) | ||||
|       ).transact(xa) | ||||
|  | ||||
|       _ <- convertJob[PeriodicQueryArgsLegacy, PeriodicQueryArgs]( | ||||
|         PeriodicQueryArgs.taskName, | ||||
|         convertPeriodicQueryArgs(accMap) | ||||
|       ).transact(xa) | ||||
|       _ <- convertPeriodicJob[PeriodicQueryArgsLegacy, PeriodicQueryArgs]( | ||||
|         PeriodicQueryArgs.taskName, | ||||
|         convertPeriodicQueryArgs(accMap) | ||||
|       ).transact(xa) | ||||
|  | ||||
|       // The new DownloadZipArgs are not in scope here. These jobs are deleted, as they are | ||||
|       // done in 99% probably. If not a user will just click again on the "download all" | ||||
|       // button | ||||
|       _ <- RJob.deleteByTask(DownloadZipArgsLegacy.taskName).transact(xa) | ||||
|       _ <- logger.info("Done converting task arguments.") | ||||
|  | ||||
|       _ <- logger.info("Updating job submitter info") | ||||
|       _ <- updateJobSubmitter(idMap, accMap).transact(xa) | ||||
|     } yield () | ||||
|  | ||||
|   def updateJobSubmitter( | ||||
|       idMap: Map[Ident, CollectiveId], | ||||
|       accMap: Map[AccountId, AccountInfo] | ||||
|   ) = { | ||||
|     val job = RJob.as("j") | ||||
|     val pt = RPeriodicTask.as("pt") | ||||
|  | ||||
|     val updateUser = | ||||
|       accMap.toList.traverse_ { case (accId, accInfo) => | ||||
|         val up1 = | ||||
|           DML.update( | ||||
|             job, | ||||
|             job.group === accId.collective && job.submitter === accId.user, | ||||
|             DML.set( | ||||
|               job.group.setTo(accInfo.collectiveId.valueAsIdent), | ||||
|               job.submitter.setTo(accInfo.userId) | ||||
|             ) | ||||
|           ) | ||||
|  | ||||
|         val up2 = | ||||
|           DML.update( | ||||
|             pt, | ||||
|             pt.group === accId.collective && pt.submitter === accId.user, | ||||
|             DML.set( | ||||
|               pt.group.setTo(accInfo.collectiveId.valueAsIdent), | ||||
|               pt.submitter.setTo(accInfo.userId) | ||||
|             ) | ||||
|           ) | ||||
|  | ||||
|         val up3 = | ||||
|           DML.update( | ||||
|             job, | ||||
|             job.group === accId.collective && job.submitter === accId.collective, | ||||
|             DML.set( | ||||
|               job.group.setTo(accInfo.collectiveId.valueAsIdent), | ||||
|               job.submitter.setTo(accInfo.collectiveId.valueAsIdent) | ||||
|             ) | ||||
|           ) | ||||
|  | ||||
|         val up4 = | ||||
|           DML.update( | ||||
|             pt, | ||||
|             pt.group === accId.collective && pt.submitter === accId.collective, | ||||
|             DML.set( | ||||
|               pt.group.setTo(accInfo.collectiveId.valueAsIdent), | ||||
|               pt.submitter.setTo(accInfo.collectiveId.valueAsIdent) | ||||
|             ) | ||||
|           ) | ||||
|  | ||||
|         up1 *> up2 *> up3 *> up4 | ||||
|       } | ||||
|  | ||||
|     val updateJobGroup = | ||||
|       idMap.toList.traverse_ { case (cname, cid) => | ||||
|         val up1 = | ||||
|           DML.update(job, job.group === cname, DML.set(job.group.setTo(cid.valueAsIdent))) | ||||
|         val up2 = | ||||
|           DML.update(pt, pt.group === cname, DML.set(pt.group.setTo(cid.valueAsIdent))) | ||||
|         up1 *> up2 | ||||
|       } | ||||
|  | ||||
|     updateUser *> updateJobGroup | ||||
|   } | ||||
|  | ||||
|   def convertJob[S: Decoder, T: Encoder]( | ||||
|       task: Ident, | ||||
|       conv: S => Option[T] | ||||
|   ): ConnectionIO[Int] = | ||||
|     for { | ||||
|       jobs <- RJob.findByTaskName(task) | ||||
|       converted = | ||||
|         jobs.traverse(j => | ||||
|           convertJobArgs(j)(conv) | ||||
|             .map(_.pure[ConnectionIO]) | ||||
|             .recoverWith { | ||||
|               case ex if JobState.isDone(j.state) => | ||||
|                 val cause = Option(ex.getCause).getOrElse(ex) | ||||
|                 Either.right( | ||||
|                   loggerCIO.warn( | ||||
|                     s"Removing old job '${j.id.id}', because argument parsing failed: ${cause.getMessage}" | ||||
|                   ) *> RJob.delete(j.id).as(Option.empty[RJob]) | ||||
|                 ) | ||||
|             } | ||||
|         ) | ||||
|       conv <- Sync[ConnectionIO] | ||||
|         .pure(converted) | ||||
|         .rethrow | ||||
|         .flatMap(_.sequence) | ||||
|         .map(_.flatten) | ||||
|       _ <- conv.traverse_(j => RJob.setArgs(j.id, j.args)) | ||||
|     } yield conv.size | ||||
|  | ||||
|   def convertPeriodicJob[S: Decoder, T: Encoder]( | ||||
|       task: Ident, | ||||
|       conv: S => Option[T] | ||||
|   ): ConnectionIO[Int] = | ||||
|     for { | ||||
|       jobs <- RPeriodicTask.findByTask(task) | ||||
|       converted <- Sync[ConnectionIO] | ||||
|         .pure(jobs.traverse(j => convertPeriodicJobArgs(j)(conv))) | ||||
|         .rethrow | ||||
|         .map(_.flatten) | ||||
|       _ <- converted.traverse_(j => RPeriodicTask.setArgs(j.id, j.args)) | ||||
|     } yield converted.size | ||||
|  | ||||
|   private def convertPeriodicDueItemsArgs(accMap: Map[AccountId, AccountInfo])( | ||||
|       oldArgs: PeriodicDueItemsArgsLegacy | ||||
|   ) = | ||||
|     accMap | ||||
|       .get(oldArgs.account) | ||||
|       .map(acc => | ||||
|         PeriodicDueItemsArgs( | ||||
|           account = acc, | ||||
|           channels = oldArgs.channels, | ||||
|           remindDays = oldArgs.remindDays, | ||||
|           daysBack = oldArgs.daysBack, | ||||
|           tagsInclude = oldArgs.tagsInclude, | ||||
|           tagsExclude = oldArgs.tagsExclude, | ||||
|           baseUrl = oldArgs.baseUrl | ||||
|         ) | ||||
|       ) | ||||
|  | ||||
|   private def convertPeriodicQueryArgs( | ||||
|       accMap: Map[AccountId, AccountInfo] | ||||
|   )(oldArgs: PeriodicQueryArgsLegacy) = | ||||
|     accMap | ||||
|       .get(oldArgs.account) | ||||
|       .map(acc => | ||||
|         PeriodicQueryArgs( | ||||
|           account = acc, | ||||
|           channels = oldArgs.channels, | ||||
|           query = oldArgs.query, | ||||
|           bookmark = oldArgs.bookmark, | ||||
|           baseUrl = oldArgs.baseUrl, | ||||
|           contentStart = oldArgs.contentStart | ||||
|         ) | ||||
|       ) | ||||
|  | ||||
|   private def convertScanMailboxArgs( | ||||
|       accMap: Map[AccountId, AccountInfo] | ||||
|   )(oldArgs: ScanMailboxArgsLegacy) = | ||||
|     accMap | ||||
|       .get(oldArgs.account) | ||||
|       .map(acc => | ||||
|         ScanMailboxArgs( | ||||
|           account = acc, | ||||
|           imapConnection = oldArgs.imapConnection, | ||||
|           scanRecursively = oldArgs.scanRecursively, | ||||
|           folders = oldArgs.folders, | ||||
|           receivedSince = oldArgs.receivedSince, | ||||
|           targetFolder = oldArgs.targetFolder, | ||||
|           deleteMail = oldArgs.deleteMail, | ||||
|           direction = oldArgs.direction, | ||||
|           itemFolder = oldArgs.itemFolder, | ||||
|           fileFilter = oldArgs.fileFilter, | ||||
|           tags = oldArgs.tags, | ||||
|           subjectFilter = oldArgs.subjectFilter, | ||||
|           language = oldArgs.language, | ||||
|           postHandleAll = oldArgs.postHandleAll, | ||||
|           attachmentsOnly = oldArgs.attachmentsOnly | ||||
|         ) | ||||
|       ) | ||||
|  | ||||
|   private def convertScheduledAddonTaskArgs(idMap: Map[Ident, CollectiveId])( | ||||
|       oldArgs: ScheduledAddonTaskArgsLegacy | ||||
|   ) = | ||||
|     idMap | ||||
|       .get(oldArgs.collective) | ||||
|       .map(cid => ScheduledAddonTaskArgs(cid, oldArgs.addonTaskId)) | ||||
|  | ||||
|   private def convertReIndexTaskArgs( | ||||
|       idMap: Map[Ident, CollectiveId] | ||||
|   )(oldArgs: ReIndexTaskArgsLegacy) = | ||||
|     oldArgs.collective.flatMap { cname => | ||||
|       idMap | ||||
|         .get(cname) | ||||
|         .map(cid => ReIndexTaskArgs(cid.some)) | ||||
|     } | ||||
|  | ||||
|   private def convertProcessItemArgs(idMap: Map[Ident, CollectiveId])( | ||||
|       oldArgs: ProcessItemArgsLegacy | ||||
|   ) = | ||||
|     idMap | ||||
|       .get(oldArgs.meta.collective) | ||||
|       .map(cid => | ||||
|         ProcessItemArgs( | ||||
|           ProcessItemArgs.ProcessMeta( | ||||
|             collective = cid, | ||||
|             itemId = oldArgs.meta.itemId, | ||||
|             language = oldArgs.meta.language, | ||||
|             direction = oldArgs.meta.direction, | ||||
|             sourceAbbrev = oldArgs.meta.sourceAbbrev, | ||||
|             folderId = oldArgs.meta.folderId, | ||||
|             validFileTypes = oldArgs.meta.validFileTypes, | ||||
|             skipDuplicate = oldArgs.meta.skipDuplicate, | ||||
|             fileFilter = oldArgs.meta.fileFilter, | ||||
|             tags = oldArgs.meta.tags, | ||||
|             reprocess = oldArgs.meta.reprocess, | ||||
|             attachmentsOnly = oldArgs.meta.attachmentsOnly | ||||
|           ), | ||||
|           oldArgs.files.map(f => | ||||
|             ProcessItemArgs | ||||
|               .File(f.name, FileKey(cid, f.fileMetaId.category, f.fileMetaId.id)) | ||||
|           ) | ||||
|         ) | ||||
|       ) | ||||
|  | ||||
|   private def convertLearnClassifierArgs(idMap: Map[Ident, CollectiveId])( | ||||
|       oldArgs: LearnClassifierArgsLegacy | ||||
|   ) = | ||||
|     idMap | ||||
|       .get(oldArgs.collective) | ||||
|       .map(cid => LearnClassifierArgs(cid)) | ||||
|  | ||||
|   private def convertItemAddonTaskArgs(idMap: Map[Ident, CollectiveId])( | ||||
|       oldArgs: ItemAddonTaskArgsLegacy | ||||
|   ) = | ||||
|     idMap | ||||
|       .get(oldArgs.collective) | ||||
|       .map(cid => ItemAddonTaskArgs(cid, oldArgs.itemId, oldArgs.addonRunConfigs)) | ||||
|  | ||||
|   private def convertFileIntegrityCheckArgs(idMap: Map[Ident, CollectiveId])( | ||||
|       oldArgs: FileIntegrityCheckArgsLegacy | ||||
|   ) = | ||||
|     oldArgs.pattern match { | ||||
|       case FileIntegrityCheckArgsLegacy.FileKeyPart.Key(key) => | ||||
|         idMap | ||||
|           .get(key.collective) | ||||
|           .map(cid => | ||||
|             FileIntegrityCheckArgs(FileKeyPart.Key(FileKey(cid, key.category, key.id))) | ||||
|           ) | ||||
|  | ||||
|       case FileIntegrityCheckArgsLegacy.FileKeyPart.Collective(cname) => | ||||
|         idMap | ||||
|           .get(cname) | ||||
|           .map(cid => FileIntegrityCheckArgs(FileKeyPart.Collective(cid))) | ||||
|  | ||||
|       case FileIntegrityCheckArgsLegacy.FileKeyPart.Category(cname, category) => | ||||
|         idMap | ||||
|           .get(cname) | ||||
|           .map(cid => FileIntegrityCheckArgs(FileKeyPart.Category(cid, category))) | ||||
|  | ||||
|       case FileIntegrityCheckArgsLegacy.FileKeyPart.Empty => | ||||
|         None | ||||
|     } | ||||
|  | ||||
|   private def convertEmptyTrashArgs(idMap: Map[Ident, CollectiveId])( | ||||
|       oldArgs: EmptyTrashArgsLegacy | ||||
|   ) = | ||||
|     idMap.get(oldArgs.collective).map(cid => EmptyTrashArgs(cid, oldArgs.minAge)) | ||||
|  | ||||
|   private def convertAllPreviewsArgs(idMap: Map[Ident, CollectiveId])( | ||||
|       oldArgs: AllPreviewArgsLegacy | ||||
|   ) = | ||||
|     oldArgs.collective.flatMap { cname => | ||||
|       idMap | ||||
|         .get(cname) | ||||
|         .map(cid => AllPreviewsArgs(cid.some, oldArgs.storeMode)) | ||||
|     } | ||||
|  | ||||
|   private def convertAllPdfArgs(idMap: Map[Ident, CollectiveId])( | ||||
|       oldArgs: ConvertAllPdfArgsLegacy | ||||
|   ) = | ||||
|     oldArgs.collective.flatMap(cname => | ||||
|       idMap.get(cname).map(cid => ConvertAllPdfArgs(cid.some)) | ||||
|     ) | ||||
|  | ||||
|   def convertJobArgs[S: Decoder, T: Encoder]( | ||||
|       job: RJob | ||||
|   )(update: S => Option[T]): Either[Throwable, Option[RJob]] = | ||||
|     for { | ||||
|       oldArgs <- parser | ||||
|         .decode[S](job.args) | ||||
|         .leftMap(err => | ||||
|           new IllegalStateException( | ||||
|             s"Error parsing arguments of job: ${job.id.id}", | ||||
|             err | ||||
|           ) | ||||
|         ) | ||||
|       upJob = update(oldArgs).map(j => job.withArgs(j)) | ||||
|     } yield upJob | ||||
|  | ||||
|   def convertPeriodicJobArgs[S: Decoder, T: Encoder]( | ||||
|       job: RPeriodicTask | ||||
|   )(update: S => Option[T]): Either[Throwable, Option[RPeriodicTask]] = | ||||
|     for { | ||||
|       oldArgs <- parser | ||||
|         .decode[S](job.args) | ||||
|         .leftMap(err => | ||||
|           new IllegalStateException( | ||||
|             s"Error parsing arguments of periodic task: ${job.id.id}", | ||||
|             err | ||||
|           ) | ||||
|         ) | ||||
|       upJob = update(oldArgs).map(j => job.withArgs(j)) | ||||
|     } yield upJob | ||||
|  | ||||
|   private def collectiveIdMapping: ConnectionIO[Map[Ident, CollectiveId]] = | ||||
|     RCollective.findAll(_.id).map(_.map(coll => coll.name -> coll.id).toMap) | ||||
|  | ||||
|   private def accountInfoMapping: ConnectionIO[Map[AccountId, AccountInfo]] = { | ||||
|     val u = RUser.as("u") | ||||
|     val c = RCollective.as("c") | ||||
|  | ||||
|     Select( | ||||
|       select(c.id, c.name, u.uid, u.login), | ||||
|       from(u).innerJoin(c, c.id === u.cid) | ||||
|     ).build | ||||
|       .query[AccountInfo] | ||||
|       .to[List] | ||||
|       .map(_.map(a => a.asAccountId -> a).toMap) | ||||
|   } | ||||
| } | ||||
| @@ -6,8 +6,8 @@ import cats.implicits._ | ||||
| import docspell.common._ | ||||
| import docspell.common.syntax.StringSyntax._ | ||||
| import docspell.notification.api._ | ||||
| import docspell.store.queries.QLogin | ||||
| import docspell.store.records._ | ||||
| import docspell.store.qb.DSL._ | ||||
| import db.migration.data.{ | ||||
|   PeriodicDueItemsArgs => PeriodicDueItemsArgsLegacy, | ||||
|   PeriodicQueryArgs => PeriodicQueryArgsLegacy, | ||||
| @@ -103,8 +103,7 @@ object MigrateDueItemTasks extends TransactorSupport with JsonCodecs { | ||||
|   private def saveChannel(ch: Channel, account: AccountId): ConnectionIO[ChannelRef] = | ||||
|     (for { | ||||
|       newId <- OptionT.liftF(Ident.randomId[ConnectionIO]) | ||||
|       userData <- OptionT(QLogin.findAccount(account)) | ||||
|       userId = userData.userId | ||||
|       userId <- OptionT(findIdByAccountId(account)) | ||||
|       r <- RNotificationChannel.fromChannel(ch, newId, userId) | ||||
|       _ <- OptionT.liftF(RNotificationChannel.insert(r)) | ||||
|       _ <- OptionT.liftF( | ||||
| @@ -113,4 +112,9 @@ object MigrateDueItemTasks extends TransactorSupport with JsonCodecs { | ||||
|       ref = r.asRef | ||||
|     } yield ref) | ||||
|       .getOrElseF(Sync[ConnectionIO].raiseError(new Exception("User not found!"))) | ||||
|  | ||||
|   def findIdByAccountId(accountId: AccountId): ConnectionIO[Option[Ident]] = | ||||
|     sql"select u.uid from user_ u where u.cid = ${accountId.collective} and u.login = ${accountId.user}" | ||||
|       .query[Ident] | ||||
|       .option | ||||
| } | ||||
|   | ||||
| @@ -3,14 +3,13 @@ package db.migration.common | ||||
| import cats.data.{NonEmptyList, OptionT} | ||||
| import cats.effect.{IO, Sync} | ||||
| import cats.implicits._ | ||||
|  | ||||
| import docspell.common._ | ||||
| import docspell.common.syntax.StringSyntax._ | ||||
| import docspell.notification.api._ | ||||
| import docspell.store.queries.QLogin | ||||
| import docspell.store.records._ | ||||
|  | ||||
| import docspell.store.records.{RNotificationChannelMail, RPeriodicTask} | ||||
| import docspell.store.qb.DSL._ | ||||
| import db.migration.data.{PeriodicDueItemsArgs => PeriodicDueItemsArgsLegacy, _} | ||||
| import docspell.store.qb.DML | ||||
| import doobie._ | ||||
| import doobie.implicits._ | ||||
| import emil.MailAddress | ||||
| @@ -74,20 +73,20 @@ object MigrateNotifyTasks extends TransactorSupport { | ||||
|       } | ||||
|  | ||||
|     for { | ||||
|       userData <- OptionT(QLogin.findAccount(old.account)) | ||||
|       userId = userData.userId | ||||
|       userId <- OptionT(findIdByAccountId(old.account)) | ||||
|       id <- OptionT.liftF(Ident.randomId[ConnectionIO]) | ||||
|       now <- OptionT.liftF(Timestamp.current[ConnectionIO]) | ||||
|       connId <- OptionT(findSmtpConnectionId(old.smtpConnection, userId)) | ||||
|       chName = Some("migrate notify items") | ||||
|       ch = RNotificationChannelMail( | ||||
|         id, | ||||
|         userId, | ||||
|         chName, | ||||
|         old.smtpConnection, | ||||
|         connId, | ||||
|         recs, | ||||
|         now | ||||
|       ) | ||||
|       _ <- OptionT.liftF(RNotificationChannelMail.insert(ch)) | ||||
|       _ <- OptionT.liftF(insert(ch)) | ||||
|       args = PeriodicDueItemsArgsLegacy( | ||||
|         old.account, | ||||
|         NonEmptyList.of(ChannelRef(ch.id, ChannelType.Mail, chName)), | ||||
| @@ -99,4 +98,24 @@ object MigrateNotifyTasks extends TransactorSupport { | ||||
|       ) | ||||
|     } yield args | ||||
|   } | ||||
|  | ||||
|   def findIdByAccountId(accountId: AccountId): ConnectionIO[Option[Ident]] = | ||||
|     sql"select u.uid from user_ u where u.cid = ${accountId.collective} and u.login = ${accountId.user}" | ||||
|       .query[Ident] | ||||
|       .option | ||||
|  | ||||
|   def findSmtpConnectionId(name: Ident, userId: Ident): ConnectionIO[Option[Ident]] = | ||||
|     sql"select id from useremail where uid = $userId and (name = $name or id = $name)" | ||||
|       .query[Ident] | ||||
|       .option | ||||
|  | ||||
|   // insert without 'name' column, it was added afterwards | ||||
|   def insert(r: RNotificationChannelMail): ConnectionIO[Int] = { | ||||
|     val t = RNotificationChannelMail.T | ||||
|     DML.insert( | ||||
|       t, | ||||
|       NonEmptyList.of(t.id, t.uid, t.connection, t.recipients, t.created), | ||||
|       sql"${r.id},${r.uid},${r.connection},${r.recipients},${r.created}" | ||||
|     ) | ||||
|   } | ||||
| } | ||||
|   | ||||
| @@ -0,0 +1,84 @@ | ||||
| /* | ||||
|  * Copyright 2020 Eike K. & Contributors | ||||
|  * | ||||
|  * SPDX-License-Identifier: AGPL-3.0-or-later | ||||
|  */ | ||||
|  | ||||
| package db.migration.data | ||||
|  | ||||
| import cats.implicits._ | ||||
| import db.migration.data.FileIntegrityCheckArgs.FileKeyPart | ||||
| import docspell.common.{FileCategory, Ident} | ||||
| import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder} | ||||
| import io.circe.{Decoder, Encoder} | ||||
| import io.circe.syntax._ | ||||
| import io.circe.DecodingFailure | ||||
|  | ||||
| /** @deprecated `FileKey` and `FileKeyPart` was replaced to use a `CollectiveId` */ | ||||
| final case class FileIntegrityCheckArgs(pattern: FileKeyPart) | ||||
|  | ||||
| object FileIntegrityCheckArgs { | ||||
|   val taskName: Ident = Ident.unsafe("all-file-integrity-check") | ||||
|  | ||||
|   final case class FileKey(collective: Ident, category: FileCategory, id: Ident) { | ||||
|     override def toString = | ||||
|       s"${collective.id}/${category.id.id}/${id.id}" | ||||
|   } | ||||
|  | ||||
|   object FileKey { | ||||
|  | ||||
|     implicit val jsonDecoder: Decoder[FileKey] = | ||||
|       deriveDecoder[FileKey] | ||||
|  | ||||
|     implicit val jsonEncoder: Encoder[FileKey] = | ||||
|       deriveEncoder[FileKey] | ||||
|   } | ||||
|  | ||||
|   sealed trait FileKeyPart {} | ||||
|  | ||||
|   object FileKeyPart { | ||||
|  | ||||
|     case object Empty extends FileKeyPart | ||||
|  | ||||
|     final case class Collective(collective: Ident) extends FileKeyPart | ||||
|  | ||||
|     final case class Category(collective: Ident, category: FileCategory) | ||||
|         extends FileKeyPart | ||||
|  | ||||
|     final case class Key(key: FileKey) extends FileKeyPart | ||||
|  | ||||
|     implicit val jsonEncoder: Encoder[FileKeyPart] = | ||||
|       Encoder.instance { | ||||
|         case Empty => ().asJson | ||||
|         case Collective(cid) => | ||||
|           Map("collective" -> cid.asJson).asJson | ||||
|         case Category(cid, cat) => | ||||
|           Map("collective" -> cid.asJson, "category" -> cat.asJson).asJson | ||||
|         case Key(key) => | ||||
|           key.asJson | ||||
|       } | ||||
|  | ||||
|     implicit val jsonDecoder: Decoder[FileKeyPart] = | ||||
|       Decoder.instance { cursor => | ||||
|         for { | ||||
|           cid <- cursor.getOrElse[Option[Ident]]("collective")(None) | ||||
|           cat <- cursor.getOrElse[Option[FileCategory]]("category")(None) | ||||
|           emptyObj = cursor.keys.exists(_.isEmpty) | ||||
|  | ||||
|           c3 = cursor.as[FileKey].map(Key).toOption | ||||
|           c2 = (cid, cat).mapN(Category) | ||||
|           c1 = cid.map(Collective) | ||||
|           c0 = Option.when(emptyObj)(Empty) | ||||
|  | ||||
|           c = c3.orElse(c2).orElse(c1).orElse(c0) | ||||
|           res <- c.toRight(DecodingFailure("", cursor.history)) | ||||
|         } yield res | ||||
|       } | ||||
|   } | ||||
|  | ||||
|   implicit val jsonDecoder: Decoder[FileIntegrityCheckArgs] = | ||||
|     deriveDecoder | ||||
|  | ||||
|   implicit val jsonEncoder: Encoder[FileIntegrityCheckArgs] = | ||||
|     deriveEncoder | ||||
| } | ||||
| @@ -7,7 +7,7 @@ | ||||
| package db.migration.data | ||||
|  | ||||
| import ProcessItemArgs._ | ||||
| import docspell.common._ | ||||
| import docspell.common.{FileIntegrityCheckArgs => _, _} | ||||
| import docspell.common.syntax.all._ | ||||
|  | ||||
| import io.circe._ | ||||
| @@ -68,7 +68,7 @@ object ProcessItemArgs { | ||||
|     implicit val jsonDecoder: Decoder[ProcessMeta] = deriveDecoder[ProcessMeta] | ||||
|   } | ||||
|  | ||||
|   case class File(name: Option[String], fileMetaId: FileKey) | ||||
|   case class File(name: Option[String], fileMetaId: FileIntegrityCheckArgs.FileKey) | ||||
|   object File { | ||||
|     implicit val jsonEncoder: Encoder[File] = deriveEncoder[File] | ||||
|     implicit val jsonDecoder: Decoder[File] = deriveDecoder[File] | ||||
|   | ||||
| @@ -12,9 +12,6 @@ import io.circe.{Decoder, Encoder} | ||||
|  | ||||
| /** @deprecated | ||||
|   *   This has been replaced with a version using a `CollectiveId` | ||||
|   * | ||||
|   * @param collective | ||||
|   * @param addonTaskId | ||||
|   */ | ||||
| final case class ScheduledAddonTaskArgs(collective: Ident, addonTaskId: Ident) | ||||
|  | ||||
|   | ||||
| @@ -0,0 +1,18 @@ | ||||
| /* | ||||
|  * Copyright 2020 Eike K. & Contributors | ||||
|  * | ||||
|  * SPDX-License-Identifier: AGPL-3.0-or-later | ||||
|  */ | ||||
|  | ||||
| package db.migration.h2 | ||||
|  | ||||
| import cats.effect.unsafe.implicits._ | ||||
| import db.migration.common.MigrateCollectiveIdTaskArgs | ||||
| import org.flywaydb.core.api.migration.{BaseJavaMigration, Context} | ||||
|  | ||||
| class V1_39_2__MigrateTasks extends BaseJavaMigration { | ||||
|   val logger = org.log4s.getLogger | ||||
|  | ||||
|   override def migrate(ctx: Context): Unit = | ||||
|     MigrateCollectiveIdTaskArgs.run(ctx).unsafeRunSync() | ||||
| } | ||||
| @@ -0,0 +1,18 @@ | ||||
| /* | ||||
|  * Copyright 2020 Eike K. & Contributors | ||||
|  * | ||||
|  * SPDX-License-Identifier: AGPL-3.0-or-later | ||||
|  */ | ||||
|  | ||||
| package db.migration.mariadb | ||||
|  | ||||
| import cats.effect.unsafe.implicits._ | ||||
| import db.migration.common.MigrateCollectiveIdTaskArgs | ||||
| import org.flywaydb.core.api.migration.{BaseJavaMigration, Context} | ||||
|  | ||||
| class V1_39_2__MigrateTasks extends BaseJavaMigration { | ||||
|   val logger = org.log4s.getLogger | ||||
|  | ||||
|   override def migrate(ctx: Context): Unit = | ||||
|     MigrateCollectiveIdTaskArgs.run(ctx).unsafeRunSync() | ||||
| } | ||||
| @@ -0,0 +1,18 @@ | ||||
| /* | ||||
|  * Copyright 2020 Eike K. & Contributors | ||||
|  * | ||||
|  * SPDX-License-Identifier: AGPL-3.0-or-later | ||||
|  */ | ||||
|  | ||||
| package db.migration.postgresql | ||||
|  | ||||
| import cats.effect.unsafe.implicits._ | ||||
| import db.migration.common.MigrateCollectiveIdTaskArgs | ||||
| import org.flywaydb.core.api.migration.{BaseJavaMigration, Context} | ||||
|  | ||||
| class V1_39_2__MigrateTasks extends BaseJavaMigration { | ||||
|   val logger = org.log4s.getLogger | ||||
|  | ||||
|   override def migrate(ctx: Context): Unit = | ||||
|     MigrateCollectiveIdTaskArgs.run(ctx).unsafeRunSync() | ||||
| } | ||||
| @@ -29,8 +29,8 @@ object BinnyUtils { | ||||
|   def fileKeyPartToPrefix(fkp: FileKeyPart): Option[String] = | ||||
|     fkp match { | ||||
|       case FileKeyPart.Empty              => None | ||||
|       case FileKeyPart.Collective(cid)    => Some(s"${cid.id}/%") | ||||
|       case FileKeyPart.Category(cid, cat) => Some(s"${cid.id}/${cat.id.id}/%") | ||||
|       case FileKeyPart.Collective(cid)    => Some(s"${cid.value}/%") | ||||
|       case FileKeyPart.Category(cid, cat) => Some(s"${cid.value}/${cat.id.id}/%") | ||||
|       case FileKeyPart.Key(key)           => Some(fileKeyToBinaryId(key).id) | ||||
|     } | ||||
|  | ||||
|   | ||||
| @@ -76,7 +76,7 @@ object DML extends DoobieMeta { | ||||
|       setter: Nel[Setter[_]] | ||||
|   ): Fragment = { | ||||
|     val condFrag = cond.map(SelectBuilder.cond).getOrElse(Fragment.empty) | ||||
|     fr"UPDATE" ++ FromExprBuilder.buildTable(table) ++ fr"SET" ++ | ||||
|     fr"UPDATE" ++ FromExprBuilder.buildTable(table) ++ fr" SET" ++ | ||||
|       setter | ||||
|         .map(s => buildSetter(s)) | ||||
|         .reduceLeft(_ ++ comma ++ _) ++ | ||||
|   | ||||
| @@ -44,6 +44,9 @@ case class RJob( | ||||
|  | ||||
|   def isInProgress: Boolean = | ||||
|     JobState.inProgress.contains(state) | ||||
|  | ||||
|   def withArgs[A: Encoder](args: A): RJob = | ||||
|     copy(args = Encoder[A].apply(args).noSpaces) | ||||
| } | ||||
|  | ||||
| object RJob { | ||||
| @@ -151,6 +154,13 @@ object RJob { | ||||
|     ) | ||||
|   } | ||||
|  | ||||
|   def findByTaskName(task: Ident): ConnectionIO[Vector[RJob]] = | ||||
|     Select( | ||||
|       select(T.all), | ||||
|       from(T), | ||||
|       T.task === task | ||||
|     ).build.query[RJob].to[Vector] | ||||
|  | ||||
|   def findFromIds(ids: Seq[Ident]): ConnectionIO[Vector[RJob]] = | ||||
|     NonEmptyList.fromList(ids.toList) match { | ||||
|       case None => | ||||
| @@ -180,6 +190,12 @@ object RJob { | ||||
|     ) | ||||
|   } | ||||
|  | ||||
|   def setJsonArgs[A: Encoder](jobId: Ident, args: A): ConnectionIO[Int] = | ||||
|     DML.update(T, T.id === jobId, DML.set(T.args.setTo(Encoder[A].apply(args).noSpaces))) | ||||
|  | ||||
|   def setArgs(jobId: Ident, args: String): ConnectionIO[Int] = | ||||
|     DML.update(T, T.id === jobId, DML.set(T.args.setTo(args))) | ||||
|  | ||||
|   def incrementRetries(jobid: Ident): ConnectionIO[Int] = | ||||
|     DML | ||||
|       .update( | ||||
| @@ -301,6 +317,14 @@ object RJob { | ||||
|       n1 <- DML.delete(T, T.id === jobId) | ||||
|     } yield n0 + n1 | ||||
|  | ||||
|   def deleteByTask(task: Ident): ConnectionIO[Int] = { | ||||
|     val query = Select(select(T.id), from(T), T.task === task) | ||||
|     for { | ||||
|       n1 <- DML.delete(RJobLog.T, RJobLog.T.jobId.in(query)) | ||||
|       n2 <- DML.delete(T, T.task === task) | ||||
|     } yield n1 + n2 | ||||
|   } | ||||
|  | ||||
|   def findIdsDoneAndOlderThan(ts: Timestamp): Stream[ConnectionIO, Ident] = | ||||
|     run( | ||||
|       select(T.id), | ||||
|   | ||||
| @@ -9,14 +9,13 @@ package docspell.store.records | ||||
| import cats.data.NonEmptyList | ||||
| import cats.effect._ | ||||
| import cats.implicits._ | ||||
|  | ||||
| import docspell.common._ | ||||
| import docspell.store.qb.DSL._ | ||||
| import docspell.store.qb._ | ||||
|  | ||||
| import com.github.eikek.calev.CalEvent | ||||
| import doobie._ | ||||
| import doobie.implicits._ | ||||
| import io.circe.Encoder | ||||
|  | ||||
| /** A periodic task is a special job description, that shares a few properties of a | ||||
|   * `RJob`. It must provide all information to create a `RJob` value eventually. | ||||
| @@ -38,6 +37,9 @@ case class RPeriodicTask( | ||||
|     summary: Option[String] | ||||
| ) { | ||||
|  | ||||
|   def withArgs[A: Encoder](args: A): RPeriodicTask = | ||||
|     copy(args = Encoder[A].apply(args).noSpaces) | ||||
|  | ||||
|   def toJob[F[_]: Sync]: F[RJob] = | ||||
|     for { | ||||
|       now <- Timestamp.current[F] | ||||
| @@ -112,6 +114,9 @@ object RPeriodicTask { | ||||
|   def updateTask(id: Ident, taskName: Ident, args: String): ConnectionIO[Int] = | ||||
|     DML.update(T, T.id === id, DML.set(T.task.setTo(taskName), T.args.setTo(args))) | ||||
|  | ||||
|   def setArgs(taskId: Ident, args: String): ConnectionIO[Int] = | ||||
|     DML.update(T, T.id === taskId, DML.set(T.args.setTo(args))) | ||||
|  | ||||
|   def setEnabledByTask(taskName: Ident, enabled: Boolean): ConnectionIO[Int] = | ||||
|     DML.update(T, T.task === taskName, DML.set(T.enabled.setTo(enabled))) | ||||
|  | ||||
|   | ||||
		Reference in New Issue
	
	Block a user