From 7dcb61ef564341d61a61a9051c5f82e8d571048e Mon Sep 17 00:00:00 2001 From: eikek Date: Sat, 26 Jun 2021 14:43:16 +0200 Subject: [PATCH 1/3] Use old ip checks and fix dev-ui-build script These checks could be improved to not use javas InetAddress. But for now it should have the same behaviour as before. --- .../src/main/scala/docspell/restserver/Config.scala | 9 ++++----- .../docspell/restserver/http4s/ClientRequestInfo.scala | 3 ++- project/dev-ui-build.sh | 1 + 3 files changed, 7 insertions(+), 6 deletions(-) diff --git a/modules/restserver/src/main/scala/docspell/restserver/Config.scala b/modules/restserver/src/main/scala/docspell/restserver/Config.scala index 3696610e..e5aae021 100644 --- a/modules/restserver/src/main/scala/docspell/restserver/Config.scala +++ b/modules/restserver/src/main/scala/docspell/restserver/Config.scala @@ -42,14 +42,13 @@ object Config { case class HttpHeader(enabled: Boolean, headerName: String, headerValue: String) case class AllowedIps(enabled: Boolean, ips: Set[String]) { - def containsAddress(inet: IpAddress): Boolean = { - val ip = inet.fold(_.toUriString, _.toUriString) //.getHostAddress + def containsAddress(ipa: IpAddress): Boolean = { + val inet = ipa.toInetAddress + val ip = inet.getHostAddress lazy val ipParts = ip.split('.') def checkSingle(pattern: String): Boolean = - pattern == ip || (ip.contains( - "localhost" - ) && pattern == "127.0.0.1") || (pattern + pattern == ip || (inet.isLoopbackAddress && pattern == "127.0.0.1") || (pattern .split('.') .zip(ipParts) .foldLeft(true) { case (r, (a, b)) => diff --git a/modules/restserver/src/main/scala/docspell/restserver/http4s/ClientRequestInfo.scala b/modules/restserver/src/main/scala/docspell/restserver/http4s/ClientRequestInfo.scala index 8c6256da..91b37a60 100644 --- a/modules/restserver/src/main/scala/docspell/restserver/http4s/ClientRequestInfo.scala +++ b/modules/restserver/src/main/scala/docspell/restserver/http4s/ClientRequestInfo.scala @@ -41,7 +41,8 @@ object ClientRequestInfo { req.headers .get[`X-Forwarded-For`] .flatMap(_.values.head) - .map(ip => ip.fold(_.toUriString, _.toUriString)) + .map(_.toInetAddress) + .flatMap(inet => Option(inet.getHostName).orElse(Option(inet.getHostAddress))) private def xForwardedHost[F[_]](req: Request[F]): Option[String] = req.headers diff --git a/project/dev-ui-build.sh b/project/dev-ui-build.sh index c61da8ed..bb1fedb1 100755 --- a/project/dev-ui-build.sh +++ b/project/dev-ui-build.sh @@ -48,6 +48,7 @@ watch_both() { } +cd "$wdir/modules/webapp" case "$1" in all) compile_js From 988367a2817155ee9d7d519711d6ca46025aa1aa Mon Sep 17 00:00:00 2001 From: eikek Date: Sat, 26 Jun 2021 21:12:02 +0200 Subject: [PATCH 2/3] Extend query builder to compare results from subselects --- .../main/scala/docspell/store/qb/Condition.scala | 3 +++ .../src/main/scala/docspell/store/qb/DSL.scala | 13 +++++++++++++ .../docspell/store/qb/impl/ConditionBuilder.scala | 13 +++++++++++++ 3 files changed, 29 insertions(+) diff --git a/modules/store/src/main/scala/docspell/store/qb/Condition.scala b/modules/store/src/main/scala/docspell/store/qb/Condition.scala index 9a329033..cfe156a2 100644 --- a/modules/store/src/main/scala/docspell/store/qb/Condition.scala +++ b/modules/store/src/main/scala/docspell/store/qb/Condition.scala @@ -19,6 +19,9 @@ object Condition { val P: Put[A] ) extends Condition + case class CompareSelect(sel: SelectExpr, op: Operator, subSelect: Select) + extends Condition + case class CompareCol[A](col1: Column[A], op: Operator, col2: Column[A]) extends Condition diff --git a/modules/store/src/main/scala/docspell/store/qb/DSL.scala b/modules/store/src/main/scala/docspell/store/qb/DSL.scala index d6f32d9c..671d4e71 100644 --- a/modules/store/src/main/scala/docspell/store/qb/DSL.scala +++ b/modules/store/src/main/scala/docspell/store/qb/DSL.scala @@ -215,6 +215,19 @@ trait DSL extends DoobieMeta { def in(subsel: Select): Condition = Condition.InSubSelect(col, subsel) + def >(subsel: Select): Condition = + Condition.CompareSelect(col.s, Operator.Gt, subsel) + def <(subsel: Select): Condition = + Condition.CompareSelect(col.s, Operator.Lt, subsel) + def >=(subsel: Select): Condition = + Condition.CompareSelect(col.s, Operator.Gte, subsel) + def <=(subsel: Select): Condition = + Condition.CompareSelect(col.s, Operator.Lte, subsel) + def ===(subsel: Select): Condition = + Condition.CompareSelect(col.s, Operator.Eq, subsel) + def !==(subsel: Select): Condition = + Condition.CompareSelect(col.s, Operator.Neq, subsel) + def notIn(subsel: Select): Condition = in(subsel).negate diff --git a/modules/store/src/main/scala/docspell/store/qb/impl/ConditionBuilder.scala b/modules/store/src/main/scala/docspell/store/qb/impl/ConditionBuilder.scala index c9e50575..665ca476 100644 --- a/modules/store/src/main/scala/docspell/store/qb/impl/ConditionBuilder.scala +++ b/modules/store/src/main/scala/docspell/store/qb/impl/ConditionBuilder.scala @@ -100,6 +100,19 @@ object ConditionBuilder { } c1Frag ++ operator(op) ++ c2Frag + case Condition.CompareSelect(col, op, subsel) => + val opFrag = operator(op) + val colFrag = op match { + case Operator.LowerLike => + lower(col) + case Operator.LowerEq => + lower(col) + case _ => + SelectExprBuilder.build(col) + } + val sub = SelectBuilder(subsel) + colFrag ++ opFrag ++ sql"(" ++ sub ++ sql")" + case Condition.InSubSelect(col, subsel) => val sub = SelectBuilder(subsel) SelectExprBuilder.column(col) ++ sql" IN (" ++ sub ++ parenClose From ce6f53cc293f6fb15494ad87fc3ecac8edbe2eb6 Mon Sep 17 00:00:00 2001 From: eikek Date: Sat, 26 Jun 2021 23:33:25 +0200 Subject: [PATCH 3/3] Fix next-job query to do round-robin through job groups --- build.sbt | 5 +- .../scala/docspell/store/queries/QJob.scala | 39 ++++++-- .../docspell/store/records/RJobGroupUse.scala | 3 + .../store/src/test/resources/logback-test.xml | 14 +++ .../scala/docspell/store/StoreFixture.scala | 30 ++++--- .../docspell/store/queries/QJobTest.scala | 88 +++++++++++++++++++ project/Dependencies.scala | 6 +- 7 files changed, 162 insertions(+), 23 deletions(-) create mode 100644 modules/store/src/test/resources/logback-test.xml create mode 100644 modules/store/src/test/scala/docspell/store/queries/QJobTest.scala diff --git a/build.sbt b/build.sbt index 2205fe5c..1aca4bf2 100644 --- a/build.sbt +++ b/build.sbt @@ -8,6 +8,9 @@ val elmCompileMode = settingKey[ElmCompileMode]("How to compile elm sources") // --- Settings +def inTest(d0: Seq[ModuleID], ds: Seq[ModuleID]*) = + ds.fold(d0)(_ ++ _).map(_ % Test) + val scalafixSettings = Seq( semanticdbEnabled := true, // enable SemanticDB semanticdbVersion := scalafixSemanticdb.revision, //"4.4.0" @@ -45,7 +48,7 @@ val sharedSettings = Seq( ) ++ scalafixSettings val testSettingsMUnit = Seq( - libraryDependencies ++= Dependencies.munit.map(_ % Test), + libraryDependencies ++= inTest(Dependencies.munit, Dependencies.logging), testFrameworks += new TestFramework("munit.Framework") ) diff --git a/modules/store/src/main/scala/docspell/store/queries/QJob.scala b/modules/store/src/main/scala/docspell/store/queries/QJob.scala index 7e95d4f1..9115dfa5 100644 --- a/modules/store/src/main/scala/docspell/store/queries/QJob.scala +++ b/modules/store/src/main/scala/docspell/store/queries/QJob.scala @@ -60,8 +60,11 @@ object QJob { store.transact(for { n <- RJob.setScheduled(job.id, worker) _ <- - if (n == 1) RJobGroupUse.setGroup(RJobGroupUse(worker, job.group)) + if (n == 1) RJobGroupUse.setGroup(RJobGroupUse(job.group, worker)) else 0.pure[ConnectionIO] + _ <- logger.fdebug[ConnectionIO]( + s"Scheduled job ${job.info} to worker ${worker.id}" + ) } yield if (n == 1) Right(job) else Left(())) for { @@ -98,22 +101,42 @@ object QJob { val stateCond = JC.state === JobState.waiting || (JC.state === JobState.stuck && stuckTrigger < now.toMillis) + object AllGroups extends TableDef { + val tableName = "allgroups" + val alias = Some("ag") + + val group: Column[Ident] = JC.group.copy(table = this) + + val selectAll = Select(JC.group.s, from(JC), stateCond).distinct + } + val sql1 = Select( - max(JC.group).as("g"), - from(JC).innerJoin(G, JC.group === G.group), - G.worker === worker && stateCond + select(min(AllGroups.group).as("g"), lit("0 as n")), + from(AllGroups), + AllGroups.group > Select(G.group.s, from(G), G.worker === worker) ) val sql2 = - Select(min(JC.group).as("g"), from(JC), stateCond) + Select( + select(min(AllGroups.group).as("g"), lit("1 as n")), + from(AllGroups) + ) val gcol = Column[String]("g", TableDef("")) + val gnum = Column[Int]("n", TableDef("")) val groups = - Select(select(gcol), from(union(sql1, sql2), "t0"), gcol.isNull.negate) + withCte(AllGroups -> AllGroups.selectAll) + .select(Select(gcol.s, from(union(sql1, sql2), "t0"), gcol.isNull.negate)) + .orderBy(gnum.asc) + .limit(1) - // either 0, one or two results, but may be empty if RJob table is empty - groups.build.query[Ident].to[List].map(_.headOption) + val frag = groups.build + logger.trace( + s"nextGroupQuery: $frag (now=${now.toMillis}, pause=${initialPause.millis})" + ) + + frag.query[Ident].option } private def stuckTriggerValue(t: RJob.Table, initialPause: Duration, now: Timestamp) = diff --git a/modules/store/src/main/scala/docspell/store/records/RJobGroupUse.scala b/modules/store/src/main/scala/docspell/store/records/RJobGroupUse.scala index 8763753c..c00a4e77 100644 --- a/modules/store/src/main/scala/docspell/store/records/RJobGroupUse.scala +++ b/modules/store/src/main/scala/docspell/store/records/RJobGroupUse.scala @@ -36,4 +36,7 @@ object RJobGroupUse { def findGroup(workerId: Ident): ConnectionIO[Option[Ident]] = run(select(T.group), from(T), T.worker === workerId).query[Ident].option + + def deleteAll: ConnectionIO[Int] = + DML.delete(T, T.group.isNotNull) } diff --git a/modules/store/src/test/resources/logback-test.xml b/modules/store/src/test/resources/logback-test.xml new file mode 100644 index 00000000..9cf93b57 --- /dev/null +++ b/modules/store/src/test/resources/logback-test.xml @@ -0,0 +1,14 @@ + + + true + + + %highlight(%-5level) %cyan(%logger{15}) - %msg %n + + + + + + + + diff --git a/modules/store/src/test/scala/docspell/store/StoreFixture.scala b/modules/store/src/test/scala/docspell/store/StoreFixture.scala index 9460433a..00798db9 100644 --- a/modules/store/src/test/scala/docspell/store/StoreFixture.scala +++ b/modules/store/src/test/scala/docspell/store/StoreFixture.scala @@ -1,27 +1,33 @@ package docspell.store import cats.effect._ -import cats.effect.unsafe.implicits.global import docspell.common.LenientUri import docspell.store.impl.StoreImpl import doobie._ +import munit._ import org.h2.jdbcx.JdbcConnectionPool -trait StoreFixture { - def withStore(db: String)(code: Store[IO] => IO[Unit]): Unit = { - //StoreFixture.store(StoreFixture.memoryDB(db)).use(code).unsafeRunSync() - val jdbc = StoreFixture.memoryDB(db) - val xa = StoreFixture.globalXA(jdbc) - val store = new StoreImpl[IO](jdbc, xa) - store.migrate.unsafeRunSync() - code(store).unsafeRunSync() +trait StoreFixture extends CatsEffectFunFixtures { self: CatsEffectSuite => + + val xa = ResourceFixture { + val cfg = StoreFixture.memoryDB("test") + for { + xa <- StoreFixture.makeXA(cfg) + store = new StoreImpl[IO](cfg, xa) + _ <- Resource.eval(store.migrate) + } yield xa } - def withXA(db: String)(code: Transactor[IO] => IO[Unit]): Unit = - StoreFixture.makeXA(StoreFixture.memoryDB(db)).use(code).unsafeRunSync() - + val store = ResourceFixture { + val cfg = StoreFixture.memoryDB("test") + for { + xa <- StoreFixture.makeXA(cfg) + store = new StoreImpl[IO](cfg, xa) + _ <- Resource.eval(store.migrate) + } yield store + } } object StoreFixture { diff --git a/modules/store/src/test/scala/docspell/store/queries/QJobTest.scala b/modules/store/src/test/scala/docspell/store/queries/QJobTest.scala new file mode 100644 index 00000000..66dade6b --- /dev/null +++ b/modules/store/src/test/scala/docspell/store/queries/QJobTest.scala @@ -0,0 +1,88 @@ +package docspell.store.queries + +import java.time.Instant +import java.util.concurrent.atomic.AtomicLong + +import cats.implicits._ + +import docspell.common._ +import docspell.store.StoreFixture +import docspell.store.records.RJob +import docspell.store.records.RJobGroupUse + +import doobie.implicits._ +import munit._ + +class QJobTest extends CatsEffectSuite with StoreFixture { + private[this] val c = new AtomicLong(0) + + private val worker = Ident.unsafe("joex1") + private val initialPause = Duration.seconds(5) + private val nowTs = Timestamp(Instant.parse("2021-06-26T14:54:00Z")) + private val group1 = Ident.unsafe("group1") + private val group2 = Ident.unsafe("group2") + + def createJob(group: Ident): RJob = + RJob.newJob[Unit]( + Ident.unsafe(s"job-${c.incrementAndGet()}"), + Ident.unsafe("task"), + group, + (), + "some subject", + nowTs - Duration.days(3), + Ident.unsafe("user1"), + Priority.Low, + None + ) + + xa.test("set group must insert or update") { tx => + val res = + for { + _ <- RJobGroupUse.setGroup(RJobGroupUse(group1, worker)).transact(tx) + res <- RJobGroupUse.findGroup(worker).transact(tx) + } yield res + + res.assertEquals(Some(group1)) + } + + xa.test("selectNextGroup should return first group on initial state") { tx => + val nextGroup = for { + _ <- List(group1, group2, group1, group2, group2) + .map(createJob) + .map(RJob.insert) + .traverse(_.transact(tx)) + _ <- RJobGroupUse.deleteAll.transact(tx) + next <- QJob.selectNextGroup(worker, nowTs, initialPause).transact(tx) + } yield next + + nextGroup.assertEquals(Some(group1)) + } + + xa.test("selectNextGroup should return second group on subsequent call (1)") { tx => + val nextGroup = for { + _ <- List(group1, group2, group1, group2) + .map(createJob) + .map(RJob.insert) + .traverse(_.transact(tx)) + _ <- RJobGroupUse.deleteAll.transact(tx) + _ <- RJobGroupUse.setGroup(RJobGroupUse(group1, worker)).transact(tx) + next <- QJob.selectNextGroup(worker, nowTs, initialPause).transact(tx) + } yield next + + nextGroup.assertEquals(Some(group2)) + } + + xa.test("selectNextGroup should return second group on subsequent call (2)") { tx => + val nextGroup = for { + _ <- List(group1, group2, group1, group2) + .map(createJob) + .map(RJob.insert) + .traverse(_.transact(tx)) + _ <- RJobGroupUse.deleteAll.transact(tx) + _ <- RJobGroupUse.setGroup(RJobGroupUse(group2, worker)).transact(tx) + next <- QJob.selectNextGroup(worker, nowTs, initialPause).transact(tx) + } yield next + + nextGroup.assertEquals(Some(group1)) + } +} diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 0e369f30..0447ac08 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -29,6 +29,7 @@ object Dependencies { val LogbackVersion = "1.2.3" val MariaDbVersion = "2.7.3" val MUnitVersion = "0.7.26" + val MUnitCatsEffectVersion = "1.0.5" val OrganizeImportsVersion = "0.5.0" val PdfboxVersion = "2.0.24" val PoiVersion = "4.1.2" @@ -265,8 +266,9 @@ object Dependencies { ) val munit = Seq( - "org.scalameta" %% "munit" % MUnitVersion, - "org.scalameta" %% "munit-scalacheck" % MUnitVersion + "org.scalameta" %% "munit" % MUnitVersion, + "org.scalameta" %% "munit-scalacheck" % MUnitVersion, + "org.typelevel" %% "munit-cats-effect-3" % MUnitCatsEffectVersion ) val kindProjectorPlugin = "org.typelevel" %% "kind-projector" % KindProjectorVersion