Make sure the empty-trash task is started for all collectives

This commit is contained in:
eikek 2021-08-14 20:10:42 +02:00
parent 50706c3d6d
commit 27fd7a5867
6 changed files with 55 additions and 15 deletions

View File

@ -185,7 +185,7 @@ object OCollective {
None, None,
EmptyTrashArgs(coll) EmptyTrashArgs(coll)
) )
_ <- uts.updateOneTask(AccountId(coll, EmptyTrashArgs.taskName), ut) _ <- uts.updateOneTask(AccountId(coll, coll), ut)
_ <- joex.notifyAllNodes _ <- joex.notifyAllNodes
} yield () } yield ()
@ -215,7 +215,7 @@ object OCollective {
CalEvent(WeekdayComponent.All, DateEvent.All, TimeEvent.All), CalEvent(WeekdayComponent.All, DateEvent.All, TimeEvent.All),
None, None,
EmptyTrashArgs(collective) EmptyTrashArgs(collective)
).encode.toPeriodicTask(AccountId(collective, EmptyTrashArgs.taskName)) ).encode.toPeriodicTask(AccountId(collective, collective))
job <- ut.toJob job <- ut.toJob
_ <- queue.insert(job) _ <- queue.insert(job)
_ <- joex.notifyAllNodes _ <- joex.notifyAllNodes

View File

@ -6,8 +6,8 @@
package docspell.common package docspell.common
import com.github.eikek.calev.CalEvent
import docspell.common.syntax.all._ import docspell.common.syntax.all._
import io.circe._ import io.circe._
import io.circe.generic.semiauto._ import io.circe.generic.semiauto._
@ -21,7 +21,7 @@ case class EmptyTrashArgs(
) { ) {
def makeSubject: String = def makeSubject: String =
"Empty trash " "Empty trash"
} }
@ -29,6 +29,8 @@ object EmptyTrashArgs {
val taskName = Ident.unsafe("empty-trash") val taskName = Ident.unsafe("empty-trash")
val defaultSchedule = CalEvent.unsafe("*-*-1/7 03:00:00")
implicit val jsonEncoder: Encoder[EmptyTrashArgs] = implicit val jsonEncoder: Encoder[EmptyTrashArgs] =
deriveEncoder[EmptyTrashArgs] deriveEncoder[EmptyTrashArgs]
implicit val jsonDecoder: Decoder[EmptyTrashArgs] = implicit val jsonDecoder: Decoder[EmptyTrashArgs] =

View File

@ -7,11 +7,9 @@
package docspell.joex package docspell.joex
import scala.concurrent.ExecutionContext import scala.concurrent.ExecutionContext
import cats.effect._ import cats.effect._
import cats.implicits._ import cats.implicits._
import fs2.concurrent.SignallingRef import fs2.concurrent.SignallingRef
import docspell.analysis.TextAnalyser import docspell.analysis.TextAnalyser
import docspell.backend.ops._ import docspell.backend.ops._
import docspell.common._ import docspell.common._
@ -34,8 +32,7 @@ import docspell.joex.scheduler._
import docspell.joexapi.client.JoexClient import docspell.joexapi.client.JoexClient
import docspell.store.Store import docspell.store.Store
import docspell.store.queue._ import docspell.store.queue._
import docspell.store.records.RJobLog import docspell.store.records.{REmptyTrashSetting, RJobLog}
import emil.javamail._ import emil.javamail._
import org.http4s.blaze.client.BlazeClientBuilder import org.http4s.blaze.client.BlazeClientBuilder
import org.http4s.client.Client import org.http4s.client.Client
@ -77,11 +74,23 @@ final class JoexAppImpl[F[_]: Async](
HouseKeepingTask HouseKeepingTask
.periodicTask[F](cfg.houseKeeping.schedule) .periodicTask[F](cfg.houseKeeping.schedule)
.flatMap(pstore.insert) *> .flatMap(pstore.insert) *>
scheduleEmptyTrashTasks *>
MigrationTask.job.flatMap(queue.insertIfNew) *> MigrationTask.job.flatMap(queue.insertIfNew) *>
AllPreviewsTask AllPreviewsTask
.job(MakePreviewArgs.StoreMode.WhenMissing, None) .job(MakePreviewArgs.StoreMode.WhenMissing, None)
.flatMap(queue.insertIfNew) *> .flatMap(queue.insertIfNew) *>
AllPageCountTask.job.flatMap(queue.insertIfNew) AllPageCountTask.job.flatMap(queue.insertIfNew)
private def scheduleEmptyTrashTasks: F[Unit] =
store
.transact(
REmptyTrashSetting.findForAllCollectives(EmptyTrashArgs.defaultSchedule, 50)
)
.evalMap(es => EmptyTrashTask.periodicTask(es.cid, es.schedule))
.evalMap(pstore.insert)
.compile
.drain
} }
object JoexAppImpl { object JoexAppImpl {

View File

@ -8,15 +8,15 @@ package docspell.joex.emptytrash
import cats.effect._ import cats.effect._
import cats.implicits._ import cats.implicits._
import com.github.eikek.calev.CalEvent
import fs2.Stream import fs2.Stream
import docspell.backend.ops.{OItem, OItemSearch} import docspell.backend.ops.{OItem, OItemSearch}
import docspell.common._ import docspell.common._
import docspell.joex.scheduler._ import docspell.joex.scheduler._
import docspell.store.records.RItem import docspell.store.records.{RItem, RPeriodicTask}
import docspell.store.usertask.UserTask
object EmptyTrashTask { object EmptyTrashTask {
type Args = EmptyTrashArgs type Args = EmptyTrashArgs
def onCancel[F[_]]: Task[F, Args, Unit] = def onCancel[F[_]]: Task[F, Args, Unit] =
@ -24,6 +24,19 @@ object EmptyTrashTask {
private val pageSize = 20 private val pageSize = 20
def periodicTask[F[_]: Sync](collective: Ident, ce: CalEvent): F[RPeriodicTask] = {
Ident.randomId[F].flatMap( id =>
UserTask(
id,
EmptyTrashArgs.taskName,
true,
ce,
None,
EmptyTrashArgs(collective)
).encode.toPeriodicTask(AccountId(collective, collective)))
}
def apply[F[_]: Async]( def apply[F[_]: Async](
itemOps: OItem[F], itemOps: OItem[F],
itemSearchOps: OItemSearch[F] itemSearchOps: OItemSearch[F]

View File

@ -8,15 +8,13 @@ package docspell.restserver.routes
import cats.effect._ import cats.effect._
import cats.implicits._ import cats.implicits._
import docspell.backend.BackendApp import docspell.backend.BackendApp
import docspell.backend.auth.AuthToken import docspell.backend.auth.AuthToken
import docspell.backend.ops.OCollective import docspell.backend.ops.OCollective
import docspell.common.ListType import docspell.common.{EmptyTrashArgs, ListType}
import docspell.restapi.model._ import docspell.restapi.model._
import docspell.restserver.conv.Conversions import docspell.restserver.conv.Conversions
import docspell.restserver.http4s._ import docspell.restserver.http4s._
import com.github.eikek.calev.CalEvent import com.github.eikek.calev.CalEvent
import org.http4s.HttpRoutes import org.http4s.HttpRoutes
import org.http4s.circe.CirceEntityDecoder._ import org.http4s.circe.CirceEntityDecoder._
@ -71,7 +69,7 @@ object CollectiveRoutes {
CollectiveSettings( CollectiveSettings(
c.language, c.language,
c.integrationEnabled, c.integrationEnabled,
c.emptyTrash.getOrElse(CalEvent.unsafe("*-*-1/7 03:00:00")), c.emptyTrash.getOrElse(EmptyTrashArgs.defaultSchedule),
ClassifierSetting( ClassifierSetting(
c.classifier.map(_.itemCount).getOrElse(0), c.classifier.map(_.itemCount).getOrElse(0),
c.classifier c.classifier

View File

@ -8,6 +8,7 @@ package docspell.store.records
import cats.data.NonEmptyList import cats.data.NonEmptyList
import cats.implicits._ import cats.implicits._
import fs2.Stream
import docspell.common._ import docspell.common._
import docspell.store.qb.DSL._ import docspell.store.qb.DSL._
@ -62,6 +63,23 @@ object REmptyTrashSetting {
sql.query[REmptyTrashSetting].option sql.query[REmptyTrashSetting].option
} }
def findForAllCollectives(
default: CalEvent,
chunkSize: Int
): Stream[ConnectionIO, REmptyTrashSetting] = {
val c = RCollective.as("c")
val e = REmptyTrashSetting.as("e")
val sql = run(
select(
c.id.s,
coalesce(e.schedule.s, const(default)).s,
coalesce(e.created.s, c.created.s).s
),
from(c).leftJoin(e, e.cid === c.id)
)
sql.query[REmptyTrashSetting].streamWithChunkSize(chunkSize)
}
def delete(coll: Ident): ConnectionIO[Int] = def delete(coll: Ident): ConnectionIO[Int] =
DML.delete(T, T.cid === coll) DML.delete(T, T.cid === coll)