Cleanup nodes that are not reachable anymore

This commit is contained in:
Eike Kettner 2021-02-18 00:34:37 +01:00
parent b16166f1e9
commit d7bc963450
9 changed files with 139 additions and 7 deletions

View File

@ -170,6 +170,14 @@ docspell.joex {
# whether more or less memory should be used. # whether more or less memory should be used.
delete-batch = "100" delete-batch = "100"
} }
# Removes node entries that are not reachable anymore.
check-nodes {
# Whether this task is enabled
enabled = true
# How often the node must be unreachable, before it is removed.
min-not-found = 2
}
} }
# Configuration of text extraction # Configuration of text extraction

View File

@ -0,0 +1,67 @@
package docspell.joex.hk
import cats.effect._
import cats.implicits._
import docspell.common._
import docspell.joex.scheduler.{Context, Task}
import docspell.store.records._
import org.http4s.client.Client
import org.http4s.client.blaze.BlazeClientBuilder
object CheckNodesTask {
def apply[F[_]: ConcurrentEffect](
cfg: HouseKeepingConfig.CheckNodes
): Task[F, Unit, Unit] =
Task { ctx =>
if (cfg.enabled)
for {
_ <- ctx.logger.info("Check nodes reachability")
_ <- BlazeClientBuilder[F](ctx.blocker.blockingContext).resource.use { client =>
checkNodes(ctx, client)
}
_ <- ctx.logger.info(
s"Remove nodes not found more than ${cfg.minNotFound} times"
)
n <- removeNodes(ctx, cfg)
_ <- ctx.logger.info(s"Removed $n nodes")
} yield ()
else
ctx.logger.info("CheckNodes task is disabled in the configuration")
}
def checkNodes[F[_]: Sync](ctx: Context[F, _], client: Client[F]): F[Unit] =
ctx.store
.transact(RNode.streamAll)
.evalMap(node =>
checkNode(ctx.logger, client)(node.url)
.flatMap(seen =>
if (seen) ctx.store.transact(RNode.resetNotFound(node.id))
else ctx.store.transact(RNode.incrementNotFound(node.id))
)
)
.compile
.drain
def checkNode[F[_]: Sync](logger: Logger[F], client: Client[F])(
url: LenientUri
): F[Boolean] = {
val apiVersion = url / "api" / "info" / "version"
for {
res <- client.expect[String](apiVersion.asString).attempt
_ <- res.fold(
ex => logger.info(s"Node ${url.asString} not found: ${ex.getMessage}"),
_ => logger.info(s"Node ${url.asString} is reachable")
)
} yield res.isRight
}
def removeNodes[F[_]: Sync](
ctx: Context[F, _],
cfg: HouseKeepingConfig.CheckNodes
): F[Int] =
ctx.store.transact(RNode.deleteNotFound(cfg.minNotFound))
}

View File

@ -9,7 +9,8 @@ case class HouseKeepingConfig(
schedule: CalEvent, schedule: CalEvent,
cleanupInvites: CleanupInvites, cleanupInvites: CleanupInvites,
cleanupJobs: CleanupJobs, cleanupJobs: CleanupJobs,
cleanupRememberMe: CleanupRememberMe cleanupRememberMe: CleanupRememberMe,
checkNodes: CheckNodes
) )
object HouseKeepingConfig { object HouseKeepingConfig {
@ -20,4 +21,6 @@ object HouseKeepingConfig {
case class CleanupRememberMe(enabled: Boolean, olderThan: Duration) case class CleanupRememberMe(enabled: Boolean, olderThan: Duration)
case class CheckNodes(enabled: Boolean, minNotFound: Int)
} }

View File

@ -15,12 +15,13 @@ object HouseKeepingTask {
val taskName: Ident = Ident.unsafe("housekeeping") val taskName: Ident = Ident.unsafe("housekeeping")
def apply[F[_]: Sync](cfg: Config): Task[F, Unit, Unit] = def apply[F[_]: ConcurrentEffect](cfg: Config): Task[F, Unit, Unit] =
Task Task
.log[F, Unit](_.info(s"Running house-keeping task now")) .log[F, Unit](_.info(s"Running house-keeping task now"))
.flatMap(_ => CleanupInvitesTask(cfg.houseKeeping.cleanupInvites)) .flatMap(_ => CleanupInvitesTask(cfg.houseKeeping.cleanupInvites))
.flatMap(_ => CleanupRememberMeTask(cfg.houseKeeping.cleanupRememberMe)) .flatMap(_ => CleanupRememberMeTask(cfg.houseKeeping.cleanupRememberMe))
.flatMap(_ => CleanupJobsTask(cfg.houseKeeping.cleanupJobs)) .flatMap(_ => CleanupJobsTask(cfg.houseKeeping.cleanupJobs))
.flatMap(_ => CheckNodesTask(cfg.houseKeeping.checkNodes))
def onCancel[F[_]: Sync]: Task[F, Unit, Unit] = def onCancel[F[_]: Sync]: Task[F, Unit, Unit] =
Task.log[F, Unit](_.warn("Cancelling house-keeping task")) Task.log[F, Unit](_.warn("Cancelling house-keeping task"))

View File

@ -0,0 +1,2 @@
ALTER TABLE "node"
ADD COLUMN "not_found" int not null default 0;

View File

@ -0,0 +1,2 @@
ALTER TABLE `node`
ADD COLUMN `not_found` int not null default 0;

View File

@ -0,0 +1,2 @@
ALTER TABLE "node"
ADD COLUMN "not_found" int not null default 0;

View File

@ -1,8 +1,8 @@
package docspell.store.records package docspell.store.records
import cats.data.NonEmptyList import cats.data.NonEmptyList
import cats.effect.Sync import cats.effect.Sync
import cats.implicits._ import cats.implicits._
import fs2.Stream
import docspell.common._ import docspell.common._
import docspell.store.qb.DSL._ import docspell.store.qb.DSL._
@ -16,13 +16,14 @@ case class RNode(
nodeType: NodeType, nodeType: NodeType,
url: LenientUri, url: LenientUri,
updated: Timestamp, updated: Timestamp,
created: Timestamp created: Timestamp,
notFound: Int
) {} ) {}
object RNode { object RNode {
def apply[F[_]: Sync](id: Ident, nodeType: NodeType, uri: LenientUri): F[RNode] = def apply[F[_]: Sync](id: Ident, nodeType: NodeType, uri: LenientUri): F[RNode] =
Timestamp.current[F].map(now => RNode(id, nodeType, uri, now, now)) Timestamp.current[F].map(now => RNode(id, nodeType, uri, now, now, 0))
final case class Table(alias: Option[String]) extends TableDef { final case class Table(alias: Option[String]) extends TableDef {
val tableName = "node" val tableName = "node"
@ -32,18 +33,20 @@ object RNode {
val url = Column[LenientUri]("url", this) val url = Column[LenientUri]("url", this)
val updated = Column[Timestamp]("updated", this) val updated = Column[Timestamp]("updated", this)
val created = Column[Timestamp]("created", this) val created = Column[Timestamp]("created", this)
val all = NonEmptyList.of[Column[_]](id, nodeType, url, updated, created) val notFound = Column[Int]("not_found", this)
val all = NonEmptyList.of[Column[_]](id, nodeType, url, updated, created, notFound)
} }
def as(alias: String): Table = def as(alias: String): Table =
Table(Some(alias)) Table(Some(alias))
val T = Table(None)
def insert(v: RNode): ConnectionIO[Int] = { def insert(v: RNode): ConnectionIO[Int] = {
val t = Table(None) val t = Table(None)
DML.insert( DML.insert(
t, t,
t.all, t.all,
fr"${v.id},${v.nodeType},${v.url},${v.updated},${v.created}" fr"${v.id},${v.nodeType},${v.url},${v.updated},${v.created},${v.notFound}"
) )
} }
@ -61,6 +64,22 @@ object RNode {
) )
} }
def incrementNotFound(nid: Ident): ConnectionIO[Int] =
Timestamp
.current[ConnectionIO]
.flatMap(now =>
DML
.update(T, T.id === nid, DML.set(T.notFound.increment(1), T.updated.setTo(now)))
)
def resetNotFound(id: Ident): ConnectionIO[Int] =
Timestamp
.current[ConnectionIO]
.flatMap(now =>
DML
.update(T, T.id === id, DML.set(T.notFound.setTo(0), T.updated.setTo(now)))
)
def set(v: RNode): ConnectionIO[Int] = def set(v: RNode): ConnectionIO[Int] =
for { for {
n <- update(v) n <- update(v)
@ -81,4 +100,10 @@ object RNode {
val t = Table(None) val t = Table(None)
run(select(t.all), from(t), t.id === nodeId).query[RNode].option run(select(t.all), from(t), t.id === nodeId).query[RNode].option
} }
def streamAll: Stream[ConnectionIO, RNode] =
run(select(T.all), from(T)).query[RNode].streamWithChunkSize(50)
def deleteNotFound(min: Int): ConnectionIO[Int] =
DML.delete(T, T.notFound >= min)
} }

View File

@ -58,6 +58,10 @@ let
enabled = true; enabled = true;
older-than = "30 days"; older-than = "30 days";
}; };
check-nodes = {
enabled = true;
min-not-found = 2;
};
}; };
extraction = { extraction = {
pdf = { pdf = {
@ -540,6 +544,24 @@ in {
default = defaults.house-keeping.cleanup-remember-me; default = defaults.house-keeping.cleanup-remember-me;
description = "Settings for cleaning up remember me tokens."; description = "Settings for cleaning up remember me tokens.";
}; };
check-nodes = mkOption {
type = types.submodule({
options = {
enabled = mkOption {
type = types.bool;
default = defaults.house-keeping.check-nodes.enabled;
description = "Whether this task is enabled.";
};
min-not-found = mkOption {
type = types.int;
default = defaults.house-keeping.check-nodes.min-not-found;
description = "How often the node must be unreachable, before it is removed.";
};
};
});
default = defaults.house-keeping.cleanup-nodes;
description = "Removes node entries that are not reachable anymore.";
};
}; };
}); });
default = defaults.house-keeping; default = defaults.house-keeping;