Upgrade code base to CE3

This commit is contained in:
eikek
2021-06-21 21:33:54 +02:00
parent 903ec26e54
commit bd791b4593
146 changed files with 638 additions and 758 deletions

View File

@ -30,10 +30,10 @@ import docspell.store.queue._
import docspell.store.records.RJobLog
import emil.javamail._
import org.http4s.blaze.client.BlazeClientBuilder
import org.http4s.client.Client
import org.http4s.client.blaze.BlazeClientBuilder
final class JoexAppImpl[F[_]: ConcurrentEffect: Timer](
final class JoexAppImpl[F[_]: Async](
cfg: Config,
nodeOps: ONode[F],
store: Store[F],
@ -49,8 +49,8 @@ final class JoexAppImpl[F[_]: ConcurrentEffect: Timer](
val prun = periodicScheduler.start.compile.drain
for {
_ <- scheduleBackgroundTasks
_ <- ConcurrentEffect[F].start(run)
_ <- ConcurrentEffect[F].start(prun)
_ <- Async[F].start(run)
_ <- Async[F].start(prun)
_ <- scheduler.periodicAwake
_ <- periodicScheduler.periodicAwake
_ <- nodeOps.register(cfg.appId, NodeType.Joex, cfg.baseUrl)
@ -79,17 +79,16 @@ final class JoexAppImpl[F[_]: ConcurrentEffect: Timer](
object JoexAppImpl {
def create[F[_]: ConcurrentEffect: ContextShift: Timer](
def create[F[_]: Async](
cfg: Config,
termSignal: SignallingRef[F, Boolean],
connectEC: ExecutionContext,
clientEC: ExecutionContext,
blocker: Blocker
clientEC: ExecutionContext
): Resource[F, JoexApp[F]] =
for {
httpClient <- BlazeClientBuilder[F](clientEC).resource
client = JoexClient(httpClient)
store <- Store.create(cfg.jdbc, connectEC, blocker)
store <- Store.create(cfg.jdbc, connectEC)
queue <- JobQueue(store)
pstore <- PeriodicTaskStore.create(store)
nodeOps <- ONode(store)
@ -97,11 +96,11 @@ object JoexAppImpl {
upload <- OUpload(store, queue, cfg.files, joex)
fts <- createFtsClient(cfg)(httpClient)
itemOps <- OItem(store, fts, queue, joex)
analyser <- TextAnalyser.create[F](cfg.textAnalysis.textAnalysisConfig, blocker)
regexNer <- RegexNerFile(cfg.textAnalysis.regexNerFileConfig, blocker, store)
analyser <- TextAnalyser.create[F](cfg.textAnalysis.textAnalysisConfig)
regexNer <- RegexNerFile(cfg.textAnalysis.regexNerFileConfig, store)
javaEmil =
JavaMailEmil(blocker, Settings.defaultSettings.copy(debug = cfg.mailDebug))
sch <- SchedulerBuilder(cfg.scheduler, blocker, store)
JavaMailEmil(Settings.defaultSettings.copy(debug = cfg.mailDebug))
sch <- SchedulerBuilder(cfg.scheduler, store)
.withQueue(queue)
.withTask(
JobTask.json(
@ -207,14 +206,13 @@ object JoexAppImpl {
sch,
queue,
pstore,
client,
Timer[F]
client
)
app = new JoexAppImpl(cfg, nodeOps, store, queue, pstore, termSignal, sch, psch)
appR <- Resource.make(app.init.map(_ => app))(_.shutdown)
} yield appR
private def createFtsClient[F[_]: ConcurrentEffect](
private def createFtsClient[F[_]: Async](
cfg: Config
)(client: Client[F]): Resource[F, FtsClient[F]] =
if (cfg.fullTextSearch.enabled) SolrFtsClient(cfg.fullTextSearch.solr, client)

View File

@ -1,7 +1,7 @@
package docspell.joex
import cats.effect.Ref
import cats.effect._
import cats.effect.concurrent.Ref
import fs2.Stream
import fs2.concurrent.SignallingRef
@ -9,9 +9,9 @@ import docspell.common.Pools
import docspell.joex.routes._
import org.http4s.HttpApp
import org.http4s.blaze.server.BlazeServerBuilder
import org.http4s.implicits._
import org.http4s.server.Router
import org.http4s.server.blaze.BlazeServerBuilder
import org.http4s.server.middleware.Logger
object JoexServer {
@ -22,17 +22,14 @@ object JoexServer {
exitRef: Ref[F, ExitCode]
)
def stream[F[_]: ConcurrentEffect: ContextShift](
cfg: Config,
pools: Pools
)(implicit T: Timer[F]): Stream[F, Nothing] = {
def stream[F[_]: Async](cfg: Config, pools: Pools): Stream[F, Nothing] = {
val app = for {
signal <- Resource.eval(SignallingRef[F, Boolean](false))
exitCode <- Resource.eval(Ref[F].of(ExitCode.Success))
joexApp <-
JoexAppImpl
.create[F](cfg, signal, pools.connectEC, pools.httpClientEC, pools.blocker)
.create[F](cfg, signal, pools.connectEC, pools.httpClientEC)
httpApp = Router(
"/api/info" -> InfoRoutes(cfg),

View File

@ -57,9 +57,8 @@ object Main extends IOApp {
val pools = for {
cec <- connectEC
bec <- blockingEC
blocker = Blocker.liftExecutorService(bec)
rec <- restserverEC
} yield Pools(cec, bec, blocker, rec)
} yield Pools(cec, bec, rec)
pools.use(p =>
JoexServer
.stream[IO](cfg, p)

View File

@ -33,16 +33,15 @@ object NerFile {
private def jsonFilePath(directory: Path, collective: Ident): Path =
directory.resolve(s"${collective.id}.json")
def find[F[_]: Sync: ContextShift](
def find[F[_]: Async](
collective: Ident,
directory: Path,
blocker: Blocker
directory: Path
): F[Option[NerFile]] = {
val file = jsonFilePath(directory, collective)
File.existsNonEmpty[F](file).flatMap {
case true =>
File
.readJson[F, NerFile](file, blocker)
.readJson[F, NerFile](file)
.map(_.some)
case false =>
(None: Option[NerFile]).pure[F]

View File

@ -3,7 +3,7 @@ package docspell.joex.analysis
import java.nio.file.Path
import cats.effect._
import cats.effect.concurrent.Semaphore
import cats.effect.std.Semaphore
import cats.implicits._
import docspell.common._
@ -31,19 +31,17 @@ object RegexNerFile {
case class Config(maxEntries: Int, directory: Path, minTime: Duration)
def apply[F[_]: Concurrent: ContextShift](
def apply[F[_]: Async](
cfg: Config,
blocker: Blocker,
store: Store[F]
): Resource[F, RegexNerFile[F]] =
for {
dir <- File.withTempDir[F](cfg.directory, "regexner-")
writer <- Resource.eval(Semaphore(1))
} yield new Impl[F](cfg.copy(directory = dir), blocker, store, writer)
} yield new Impl[F](cfg.copy(directory = dir), store, writer)
final private class Impl[F[_]: Concurrent: ContextShift](
final private class Impl[F[_]: Async](
cfg: Config,
blocker: Blocker,
store: Store[F],
writer: Semaphore[F] //TODO allow parallelism per collective
) extends RegexNerFile[F] {
@ -55,7 +53,7 @@ object RegexNerFile {
def doMakeFile(collective: Ident): F[Option[Path]] =
for {
now <- Timestamp.current[F]
existing <- NerFile.find[F](collective, cfg.directory, blocker)
existing <- NerFile.find[F](collective, cfg.directory)
result <- existing match {
case Some(nf) =>
val dur = Duration.between(nf.creation, now)
@ -105,11 +103,13 @@ object RegexNerFile {
} yield result
private def updateTimestamp(nf: NerFile, now: Timestamp): F[Unit] =
writer.withPermit(for {
file <- Sync[F].pure(nf.jsonFilePath(cfg.directory))
_ <- File.mkDir(file.getParent)
_ <- File.writeString(file, nf.copy(creation = now).asJson.spaces2)
} yield ())
writer.permit.use(_ =>
for {
file <- Sync[F].pure(nf.jsonFilePath(cfg.directory))
_ <- File.mkDir(file.getParent)
_ <- File.writeString(file, nf.copy(creation = now).asJson.spaces2)
} yield ()
)
private def createFile(
lastUpdate: Timestamp,
@ -117,13 +117,17 @@ object RegexNerFile {
now: Timestamp
): F[NerFile] = {
def update(nf: NerFile, text: String): F[Unit] =
writer.withPermit(for {
jsonFile <- Sync[F].pure(nf.jsonFilePath(cfg.directory))
_ <- logger.fdebug(s"Writing custom NER file for collective '${collective.id}'")
_ <- File.mkDir(jsonFile.getParent)
_ <- File.writeString(nf.nerFilePath(cfg.directory), text)
_ <- File.writeString(jsonFile, nf.asJson.spaces2)
} yield ())
writer.permit.use(_ =>
for {
jsonFile <- Sync[F].pure(nf.jsonFilePath(cfg.directory))
_ <- logger.fdebug(
s"Writing custom NER file for collective '${collective.id}'"
)
_ <- File.mkDir(jsonFile.getParent)
_ <- File.writeString(nf.nerFilePath(cfg.directory), text)
_ <- File.writeString(jsonFile, nf.asJson.spaces2)
} yield ()
)
for {
_ <- logger.finfo(s"Generating custom NER file for collective '${collective.id}'")

View File

@ -28,7 +28,7 @@ object Migration {
def from[F[_]: Applicative: FlatMap](fm: FtsMigration[F]): Migration[F] =
Migration(fm.version, fm.engine, fm.description, FtsWork.from(fm.task))
def apply[F[_]: Effect](
def apply[F[_]: Async](
cfg: Config.FullTextSearch,
fts: FtsClient[F],
store: Store[F],
@ -41,7 +41,7 @@ object Migration {
}
}
def applySingle[F[_]: Effect](ctx: FtsContext[F])(m: Migration[F]): F[Unit] =
def applySingle[F[_]: Async](ctx: FtsContext[F])(m: Migration[F]): F[Unit] =
for {
_ <- ctx.logger.info(s"Apply ${m.version}/${m.description}")
_ <- m.task.run(ctx)

View File

@ -12,7 +12,7 @@ import docspell.store.records.RJob
object MigrationTask {
val taskName = Ident.unsafe("full-text-index")
def apply[F[_]: ConcurrentEffect](
def apply[F[_]: Async](
cfg: Config.FullTextSearch,
fts: FtsClient[F]
): Task[F, Unit, Unit] =
@ -46,7 +46,7 @@ object MigrationTask {
Some(DocspellSystem.migrationTaskTracker)
)
def migrationTasks[F[_]: Effect](fts: FtsClient[F]): F[List[Migration[F]]] =
def migrationTasks[F[_]: Async](fts: FtsClient[F]): F[List[Migration[F]]] =
fts.initialize.map(_.map(fm => Migration.from(fm)))
}

View File

@ -14,7 +14,7 @@ object ReIndexTask {
val taskName = ReIndexTaskArgs.taskName
val tracker = DocspellSystem.migrationTaskTracker
def apply[F[_]: ConcurrentEffect](
def apply[F[_]: Async](
cfg: Config.FullTextSearch,
fts: FtsClient[F]
): Task[F, Args, Unit] =
@ -27,7 +27,7 @@ object ReIndexTask {
def onCancel[F[_]]: Task[F, Args, Unit] =
Task.log[F, Args](_.warn("Cancelling full-text re-index task"))
private def clearData[F[_]: ConcurrentEffect](collective: Option[Ident]): FtsWork[F] =
private def clearData[F[_]: Async](collective: Option[Ident]): FtsWork[F] =
FtsWork.log[F](_.info("Clearing index data")) ++
(collective match {
case Some(_) =>

View File

@ -7,19 +7,20 @@ import docspell.common._
import docspell.joex.scheduler.{Context, Task}
import docspell.store.records._
import org.http4s.blaze.client.BlazeClientBuilder
import org.http4s.client.Client
import org.http4s.client.blaze.BlazeClientBuilder
object CheckNodesTask {
def apply[F[_]: ConcurrentEffect](
def apply[F[_]: Async](
cfg: HouseKeepingConfig.CheckNodes
): Task[F, Unit, Unit] =
Task { ctx =>
if (cfg.enabled)
for {
_ <- ctx.logger.info("Check nodes reachability")
_ <- BlazeClientBuilder[F](ctx.blocker.blockingContext).resource.use { client =>
ec = scala.concurrent.ExecutionContext.global
_ <- BlazeClientBuilder[F](ec).resource.use { client =>
checkNodes(ctx, client)
}
_ <- ctx.logger.info(
@ -32,7 +33,7 @@ object CheckNodesTask {
ctx.logger.info("CheckNodes task is disabled in the configuration")
}
def checkNodes[F[_]: Sync](ctx: Context[F, _], client: Client[F]): F[Unit] =
def checkNodes[F[_]: Async](ctx: Context[F, _], client: Client[F]): F[Unit] =
ctx.store
.transact(RNode.streamAll)
.evalMap(node =>
@ -45,7 +46,7 @@ object CheckNodesTask {
.compile
.drain
def checkNode[F[_]: Sync](logger: Logger[F], client: Client[F])(
def checkNode[F[_]: Async](logger: Logger[F], client: Client[F])(
url: LenientUri
): F[Boolean] = {
val apiVersion = url / "api" / "info" / "version"

View File

@ -15,7 +15,7 @@ object HouseKeepingTask {
val taskName: Ident = Ident.unsafe("housekeeping")
def apply[F[_]: ConcurrentEffect](cfg: Config): Task[F, Unit, Unit] =
def apply[F[_]: Async](cfg: Config): Task[F, Unit, Unit] =
Task
.log[F, Unit](_.info(s"Running house-keeping task now"))
.flatMap(_ => CleanupInvitesTask(cfg.houseKeeping.cleanupInvites))

View File

@ -5,6 +5,7 @@ import java.nio.file.Path
import cats.data.OptionT
import cats.effect._
import cats.implicits._
import fs2.io.file.Files
import docspell.analysis.classifier.{ClassifierModel, TextClassifier}
import docspell.common._
@ -15,8 +16,7 @@ import bitpeace.RangeDef
object Classify {
def apply[F[_]: Sync: ContextShift](
blocker: Blocker,
def apply[F[_]: Async](
logger: Logger[F],
workingDir: Path,
store: Store[F],
@ -36,7 +36,7 @@ object Classify {
cls <- OptionT(File.withTempDir(workingDir, "classify").use { dir =>
val modelFile = dir.resolve("model.ser.gz")
modelData
.through(fs2.io.file.writeAll(modelFile, blocker))
.through(Files[F].writeAll(modelFile))
.compile
.drain
.flatMap(_ => classifier.classify(logger, ClassifierModel(modelFile), text))

View File

@ -20,7 +20,7 @@ object LearnClassifierTask {
def onCancel[F[_]]: Task[F, Args, Unit] =
Task.log(_.warn("Cancelling learn-classifier task"))
def apply[F[_]: Sync: ContextShift](
def apply[F[_]: Async](
cfg: Config.TextAnalysis,
analyser: TextAnalyser[F]
): Task[F, Args, Unit] =
@ -28,7 +28,7 @@ object LearnClassifierTask {
.flatMap(_ => learnItemEntities(cfg, analyser))
.flatMap(_ => Task(_ => Sync[F].delay(System.gc())))
private def learnItemEntities[F[_]: Sync: ContextShift](
private def learnItemEntities[F[_]: Async](
cfg: Config.TextAnalysis,
analyser: TextAnalyser[F]
): Task[F, Args, Unit] =
@ -45,7 +45,7 @@ object LearnClassifierTask {
else ().pure[F]
}
private def learnTags[F[_]: Sync: ContextShift](
private def learnTags[F[_]: Async](
cfg: Config.TextAnalysis,
analyser: TextAnalyser[F]
): Task[F, Args, Unit] =

View File

@ -11,7 +11,7 @@ import docspell.common._
import docspell.joex.scheduler._
object LearnItemEntities {
def learnAll[F[_]: Sync: ContextShift, A](
def learnAll[F[_]: Async, A](
analyser: TextAnalyser[F],
collective: Ident,
maxItems: Int,
@ -22,7 +22,7 @@ object LearnItemEntities {
.flatMap(_ => learnConcPerson(analyser, collective, maxItems, maxTextLen))
.flatMap(_ => learnConcEquip(analyser, collective, maxItems, maxTextLen))
def learnCorrOrg[F[_]: Sync: ContextShift, A](
def learnCorrOrg[F[_]: Async, A](
analyser: TextAnalyser[F],
collective: Ident,
maxItems: Int,
@ -33,7 +33,7 @@ object LearnItemEntities {
ctx => SelectItems.forCorrOrg(ctx.store, collective, maxItems, maxTextLen)
)
def learnCorrPerson[F[_]: Sync: ContextShift, A](
def learnCorrPerson[F[_]: Async, A](
analyser: TextAnalyser[F],
collective: Ident,
maxItems: Int,
@ -44,7 +44,7 @@ object LearnItemEntities {
ctx => SelectItems.forCorrPerson(ctx.store, collective, maxItems, maxTextLen)
)
def learnConcPerson[F[_]: Sync: ContextShift, A](
def learnConcPerson[F[_]: Async, A](
analyser: TextAnalyser[F],
collective: Ident,
maxItems: Int,
@ -55,7 +55,7 @@ object LearnItemEntities {
ctx => SelectItems.forConcPerson(ctx.store, collective, maxItems, maxTextLen)
)
def learnConcEquip[F[_]: Sync: ContextShift, A](
def learnConcEquip[F[_]: Async, A](
analyser: TextAnalyser[F],
collective: Ident,
maxItems: Int,
@ -66,7 +66,7 @@ object LearnItemEntities {
ctx => SelectItems.forConcEquip(ctx.store, collective, maxItems, maxTextLen)
)
private def learn[F[_]: Sync: ContextShift, A](
private def learn[F[_]: Async, A](
analyser: TextAnalyser[F],
collective: Ident
)(cname: ClassifierName, data: Context[F, _] => Stream[F, Data]): Task[F, A, Unit] =

View File

@ -11,7 +11,7 @@ import docspell.store.records.RClassifierSetting
object LearnTags {
def learnTagCategory[F[_]: Sync: ContextShift, A](
def learnTagCategory[F[_]: Async, A](
analyser: TextAnalyser[F],
collective: Ident,
maxItems: Int,
@ -33,7 +33,7 @@ object LearnTags {
)
}
def learnAllTagCategories[F[_]: Sync: ContextShift, A](analyser: TextAnalyser[F])(
def learnAllTagCategories[F[_]: Async, A](analyser: TextAnalyser[F])(
collective: Ident,
maxItems: Int,
maxTextLen: Int

View File

@ -2,6 +2,7 @@ package docspell.joex.learn
import cats.effect._
import cats.implicits._
import fs2.io.file.Files
import docspell.analysis.classifier.ClassifierModel
import docspell.common._
@ -13,18 +14,17 @@ import bitpeace.MimetypeHint
object StoreClassifierModel {
def handleModel[F[_]: Sync: ContextShift](
def handleModel[F[_]: Async](
ctx: Context[F, _],
collective: Ident,
modelName: ClassifierName
)(
trainedModel: ClassifierModel
): F[Unit] =
handleModel(ctx.store, ctx.blocker, ctx.logger)(collective, modelName, trainedModel)
handleModel(ctx.store, ctx.logger)(collective, modelName, trainedModel)
def handleModel[F[_]: Sync: ContextShift](
def handleModel[F[_]: Async](
store: Store[F],
blocker: Blocker,
logger: Logger[F]
)(
collective: Ident,
@ -36,7 +36,7 @@ object StoreClassifierModel {
RClassifierModel.findByName(collective, modelName.name).map(_.map(_.fileId))
)
_ <- logger.debug(s"Storing new trained model for: ${modelName.name}")
fileData = fs2.io.file.readAll(trainedModel.model, blocker, 4096)
fileData = Files[F].readAll(trainedModel.model, 4096)
newFile <-
store.bitpeace.saveNew(fileData, 4096, MimetypeHint.none).compile.lastOrError
_ <- store.transact(

View File

@ -15,7 +15,7 @@ import emil.{MimeType => _, _}
object ReadMail {
def readBytesP[F[_]: ConcurrentEffect](
def readBytesP[F[_]: Async](
logger: Logger[F],
glob: Glob
): Pipe[F, Byte, Binary[F]] =
@ -26,7 +26,7 @@ object ReadMail {
Stream.eval(logger.debug(s"Converting e-mail file...")) >>
s.through(Mail.readBytes[F])
def mailToEntries[F[_]: ConcurrentEffect](
def mailToEntries[F[_]: Async](
logger: Logger[F],
glob: Glob
)(mail: Mail[F]): Stream[F, Binary[F]] = {

View File

@ -35,7 +35,7 @@ object PdfConvTask {
val taskName = Ident.unsafe("pdf-files-migration")
def apply[F[_]: Sync: ContextShift](cfg: Config): Task[F, Args, Unit] =
def apply[F[_]: Async](cfg: Config): Task[F, Args, Unit] =
Task { ctx =>
for {
_ <- ctx.logger.info(s"Converting pdf file ${ctx.args} using ocrmypdf")
@ -62,7 +62,7 @@ object PdfConvTask {
val existsPdf =
for {
meta <- ctx.store.transact(RAttachment.findMeta(ctx.args.attachId))
res = meta.filter(_.mimetype.matches(Mimetype.`application/pdf`))
res = meta.filter(_.mimetype.matches(Mimetype.applicationPdf))
_ <-
if (res.isEmpty)
ctx.logger.info(
@ -83,7 +83,7 @@ object PdfConvTask {
else none.pure[F]
}
def convert[F[_]: Sync: ContextShift](
def convert[F[_]: Async](
cfg: Config,
ctx: Context[F, Args],
in: FileMeta
@ -118,7 +118,6 @@ object PdfConvTask {
cfg.convert.ocrmypdf,
lang,
in.chunksize,
ctx.blocker,
ctx.logger
)(data, storeResult)

View File

@ -95,7 +95,7 @@ object AttachmentPageCount {
def findMime[F[_]: Functor](ctx: Context[F, _])(ra: RAttachment): F[MimeType] =
OptionT(ctx.store.transact(RFileMeta.findById(ra.fileId)))
.map(_.mimetype)
.getOrElse(Mimetype.`application/octet-stream`)
.getOrElse(Mimetype.applicationOctetStream)
.map(_.toLocal)
def loadFile[F[_]](ctx: Context[F, _])(ra: RAttachment): Stream[F, Byte] =

View File

@ -98,7 +98,7 @@ object AttachmentPreview {
def findMime[F[_]: Functor](ctx: Context[F, _])(ra: RAttachment): F[MimeType] =
OptionT(ctx.store.transact(RFileMeta.findById(ra.fileId)))
.map(_.mimetype)
.getOrElse(Mimetype.`application/octet-stream`)
.getOrElse(Mimetype.applicationOctetStream)
.map(_.toLocal)
def loadFile[F[_]](ctx: Context[F, _])(ra: RAttachment): Stream[F, Byte] =

View File

@ -33,7 +33,7 @@ import bitpeace.{Mimetype, MimetypeHint, RangeDef}
*/
object ConvertPdf {
def apply[F[_]: Sync: ContextShift](
def apply[F[_]: Async](
cfg: ConvertConfig,
item: ItemData
): Task[F, ProcessItemArgs, ItemData] =
@ -69,15 +69,15 @@ object ConvertPdf {
def findMime[F[_]: Functor](ctx: Context[F, _])(ra: RAttachment): F[Mimetype] =
OptionT(ctx.store.transact(RFileMeta.findById(ra.fileId)))
.map(_.mimetype)
.getOrElse(Mimetype.`application/octet-stream`)
.getOrElse(Mimetype.applicationOctetStream)
def convertSafe[F[_]: Sync: ContextShift](
def convertSafe[F[_]: Async](
cfg: ConvertConfig,
sanitizeHtml: SanitizeHtml,
ctx: Context[F, ProcessItemArgs],
item: ItemData
)(ra: RAttachment, mime: Mimetype): F[(RAttachment, Option[RAttachmentMeta])] =
Conversion.create[F](cfg, sanitizeHtml, ctx.blocker, ctx.logger).use { conv =>
Conversion.create[F](cfg, sanitizeHtml, ctx.logger).use { conv =>
mime.toLocal match {
case mt =>
val data = ctx.store.bitpeace

View File

@ -32,12 +32,12 @@ import emil.Mail
*/
object ExtractArchive {
def apply[F[_]: ConcurrentEffect: ContextShift](
def apply[F[_]: Async](
item: ItemData
): Task[F, ProcessItemArgs, ItemData] =
multiPass(item, None).map(_._2)
def multiPass[F[_]: ConcurrentEffect: ContextShift](
def multiPass[F[_]: Async](
item: ItemData,
archive: Option[RAttachmentArchive]
): Task[F, ProcessItemArgs, (Option[RAttachmentArchive], ItemData)] =
@ -46,7 +46,7 @@ object ExtractArchive {
else multiPass(t._2, t._1)
}
def singlePass[F[_]: ConcurrentEffect: ContextShift](
def singlePass[F[_]: Async](
item: ItemData,
archive: Option[RAttachmentArchive]
): Task[F, ProcessItemArgs, (Option[RAttachmentArchive], ItemData)] =
@ -85,9 +85,9 @@ object ExtractArchive {
def findMime[F[_]: Functor](ctx: Context[F, _])(ra: RAttachment): F[Mimetype] =
OptionT(ctx.store.transact(RFileMeta.findById(ra.fileId)))
.map(_.mimetype)
.getOrElse(Mimetype.`application/octet-stream`)
.getOrElse(Mimetype.applicationOctetStream)
def extractSafe[F[_]: ConcurrentEffect: ContextShift](
def extractSafe[F[_]: Async](
ctx: Context[F, ProcessItemArgs],
archive: Option[RAttachmentArchive]
)(ra: RAttachment, pos: Int, mime: Mimetype): F[Extracted] =
@ -131,7 +131,7 @@ object ExtractArchive {
} yield extracted.copy(files = extracted.files.filter(_.id != ra.id))
}
def extractZip[F[_]: ConcurrentEffect: ContextShift](
def extractZip[F[_]: Async](
ctx: Context[F, ProcessItemArgs],
archive: Option[RAttachmentArchive]
)(ra: RAttachment, pos: Int): F[Extracted] = {
@ -142,7 +142,7 @@ object ExtractArchive {
val glob = ctx.args.meta.fileFilter.getOrElse(Glob.all)
ctx.logger.debug(s"Filtering zip entries with '${glob.asString}'") *>
zipData
.through(Zip.unzipP[F](8192, ctx.blocker, glob))
.through(Zip.unzipP[F](8192, glob))
.zipWithIndex
.flatMap(handleEntry(ctx, ra, pos, archive, None))
.foldMonoid
@ -150,7 +150,7 @@ object ExtractArchive {
.lastOrError
}
def extractMail[F[_]: ConcurrentEffect](
def extractMail[F[_]: Async](
ctx: Context[F, ProcessItemArgs],
archive: Option[RAttachmentArchive]
)(ra: RAttachment, pos: Int): F[Extracted] = {

View File

@ -28,7 +28,7 @@ object ItemHandler {
}
)
def newItem[F[_]: ConcurrentEffect: ContextShift](
def newItem[F[_]: Async](
cfg: Config,
itemOps: OItem[F],
fts: FtsClient[F],
@ -62,7 +62,7 @@ object ItemHandler {
def isLastRetry[F[_]: Sync]: Task[F, Args, Boolean] =
Task(_.isLastRetry)
def safeProcess[F[_]: ConcurrentEffect: ContextShift](
def safeProcess[F[_]: Async](
cfg: Config,
itemOps: OItem[F],
fts: FtsClient[F],

View File

@ -12,7 +12,7 @@ import docspell.joex.scheduler.Task
object ProcessItem {
def apply[F[_]: ConcurrentEffect: ContextShift](
def apply[F[_]: Async](
cfg: Config,
itemOps: OItem[F],
fts: FtsClient[F],
@ -27,7 +27,7 @@ object ProcessItem {
.flatMap(Task.setProgress(99))
.flatMap(RemoveEmptyItem(itemOps))
def processAttachments[F[_]: ConcurrentEffect: ContextShift](
def processAttachments[F[_]: Async](
cfg: Config,
fts: FtsClient[F],
analyser: TextAnalyser[F],
@ -35,7 +35,7 @@ object ProcessItem {
)(item: ItemData): Task[F, ProcessItemArgs, ItemData] =
processAttachments0[F](cfg, fts, analyser, regexNer, (30, 60, 90))(item)
def analysisOnly[F[_]: Sync: ContextShift](
def analysisOnly[F[_]: Async](
cfg: Config,
analyser: TextAnalyser[F],
regexNer: RegexNerFile[F]
@ -46,7 +46,7 @@ object ProcessItem {
.flatMap(CrossCheckProposals[F])
.flatMap(SaveProposals[F])
private def processAttachments0[F[_]: ConcurrentEffect: ContextShift](
private def processAttachments0[F[_]: Async](
cfg: Config,
fts: FtsClient[F],
analyser: TextAnalyser[F],

View File

@ -20,7 +20,7 @@ import docspell.store.records.RItem
object ReProcessItem {
type Args = ReProcessItemArgs
def apply[F[_]: ConcurrentEffect: ContextShift](
def apply[F[_]: Async](
cfg: Config,
fts: FtsClient[F],
itemOps: OItem[F],
@ -84,7 +84,7 @@ object ReProcessItem {
)
}
def processFiles[F[_]: ConcurrentEffect: ContextShift](
def processFiles[F[_]: Async](
cfg: Config,
fts: FtsClient[F],
itemOps: OItem[F],
@ -133,7 +133,7 @@ object ReProcessItem {
def isLastRetry[F[_]: Sync]: Task[F, Args, Boolean] =
Task(_.isLastRetry)
def safeProcess[F[_]: ConcurrentEffect: ContextShift](
def safeProcess[F[_]: Async](
cfg: Config,
fts: FtsClient[F],
itemOps: OItem[F],

View File

@ -19,7 +19,7 @@ import docspell.store.records.{RAttachmentMeta, RClassifierSetting}
object TextAnalysis {
type Args = ProcessItemArgs
def apply[F[_]: Sync: ContextShift](
def apply[F[_]: Async](
cfg: Config.TextAnalysis,
analyser: TextAnalyser[F],
nerFile: RegexNerFile[F]
@ -78,7 +78,7 @@ object TextAnalysis {
} yield (rm.copy(nerlabels = labels.all.toList), AttachmentDates(rm, labels.dates))
}
def predictTags[F[_]: Sync: ContextShift](
def predictTags[F[_]: Async](
ctx: Context[F, Args],
cfg: Config.TextAnalysis,
metas: Vector[RAttachmentMeta],
@ -97,7 +97,7 @@ object TextAnalysis {
} yield tags.flatten
}
def predictItemEntities[F[_]: Sync: ContextShift](
def predictItemEntities[F[_]: Async](
ctx: Context[F, Args],
cfg: Config.TextAnalysis,
metas: Vector[RAttachmentMeta],
@ -128,13 +128,12 @@ object TextAnalysis {
.map(MetaProposalList.apply)
}
private def makeClassify[F[_]: Sync: ContextShift](
private def makeClassify[F[_]: Async](
ctx: Context[F, Args],
cfg: Config.TextAnalysis,
classifier: TextClassifier[F]
)(text: String): ClassifierName => F[Option[String]] =
Classify[F](
ctx.blocker,
ctx.logger,
cfg.workingDir,
ctx.store,

View File

@ -15,7 +15,7 @@ import bitpeace.{Mimetype, RangeDef}
object TextExtraction {
def apply[F[_]: ConcurrentEffect: ContextShift](cfg: ExtractConfig, fts: FtsClient[F])(
def apply[F[_]: Async](cfg: ExtractConfig, fts: FtsClient[F])(
item: ItemData
): Task[F, ProcessItemArgs, ItemData] =
Task { ctx =>
@ -60,7 +60,7 @@ object TextExtraction {
case class Result(am: RAttachmentMeta, td: TextData, tags: List[String] = Nil)
def extractTextIfEmpty[F[_]: Sync: ContextShift](
def extractTextIfEmpty[F[_]: Async](
ctx: Context[F, ProcessItemArgs],
cfg: ExtractConfig,
lang: Language,
@ -93,7 +93,7 @@ object TextExtraction {
}
}
def extractTextToMeta[F[_]: Sync: ContextShift](
def extractTextToMeta[F[_]: Async](
ctx: Context[F, _],
cfg: ExtractConfig,
lang: Language,
@ -132,13 +132,13 @@ object TextExtraction {
def findMime: F[Mimetype] =
OptionT(ctx.store.transact(RFileMeta.findById(fileId)))
.map(_.mimetype)
.getOrElse(Mimetype.`application/octet-stream`)
.getOrElse(Mimetype.applicationOctetStream)
findMime
.flatMap(mt => extr.extractText(data, DataType(mt.toLocal), lang))
}
private def extractTextFallback[F[_]: Sync: ContextShift](
private def extractTextFallback[F[_]: Async](
ctx: Context[F, _],
cfg: ExtractConfig,
ra: RAttachment,
@ -149,7 +149,7 @@ object TextExtraction {
ctx.logger.error(s"Cannot extract text").map(_ => None)
case id :: rest =>
val extr = Extraction.create[F](ctx.blocker, ctx.logger, cfg)
val extr = Extraction.create[F](ctx.logger, cfg)
extractText[F](ctx, extr, lang)(id)
.flatMap({

View File

@ -14,7 +14,7 @@ import org.http4s.dsl.Http4sDsl
object JoexRoutes {
def apply[F[_]: ConcurrentEffect: Timer](app: JoexApp[F]): HttpRoutes[F] = {
def apply[F[_]: Async](app: JoexApp[F]): HttpRoutes[F] = {
val dsl = new Http4sDsl[F] {}
import dsl._
HttpRoutes.of[F] {
@ -34,8 +34,8 @@ object JoexRoutes {
case POST -> Root / "shutdownAndExit" =>
for {
_ <- ConcurrentEffect[F].start(
Timer[F].sleep(Duration.seconds(1).toScala) *> app.initShutdown
_ <- Async[F].start(
Temporal[F].sleep(Duration.seconds(1).toScala) *> app.initShutdown
)
resp <- Ok(BasicResult(true, "Shutdown initiated."))
} yield resp

View File

@ -31,45 +31,40 @@ trait Context[F[_], A] { self =>
last = config.retries == current.getOrElse(0)
} yield last
def blocker: Blocker
def map[C](f: A => C)(implicit F: Functor[F]): Context[F, C] =
new Context.ContextImpl[F, C](f(args), logger, store, blocker, config, jobId)
new Context.ContextImpl[F, C](f(args), logger, store, config, jobId)
}
object Context {
private[this] val log = getLogger
def create[F[_]: Functor, A](
def create[F[_]: Async, A](
jobId: Ident,
arg: A,
config: SchedulerConfig,
log: Logger[F],
store: Store[F],
blocker: Blocker
store: Store[F]
): Context[F, A] =
new ContextImpl(arg, log, store, blocker, config, jobId)
new ContextImpl(arg, log, store, config, jobId)
def apply[F[_]: Concurrent, A](
def apply[F[_]: Async, A](
job: RJob,
arg: A,
config: SchedulerConfig,
logSink: LogSink[F],
blocker: Blocker,
store: Store[F]
): F[Context[F, A]] =
for {
_ <- log.ftrace("Creating logger for task run")
logger <- QueueLogger(job.id, job.info, config.logBufferSize, logSink)
_ <- log.ftrace("Logger created, instantiating context")
ctx = create[F, A](job.id, arg, config, logger, store, blocker)
ctx = create[F, A](job.id, arg, config, logger, store)
} yield ctx
final private class ContextImpl[F[_]: Functor, A](
val args: A,
val logger: Logger[F],
val store: Store[F],
val blocker: Blocker,
val config: SchedulerConfig,
val jobId: Ident
) extends Context[F, A] {

View File

@ -1,8 +1,8 @@
package docspell.joex.scheduler
import cats.effect.{Concurrent, Sync}
import cats.effect._
import cats.implicits._
import fs2.{Pipe, Stream}
import fs2.Pipe
import docspell.common._
import docspell.common.syntax.all._
@ -45,7 +45,7 @@ object LogSink {
def printer[F[_]: Sync]: LogSink[F] =
LogSink(_.evalMap(e => logInternal(e)))
def db[F[_]: Sync](store: Store[F]): LogSink[F] =
def db[F[_]: Async](store: Store[F]): LogSink[F] =
LogSink(
_.evalMap(ev =>
for {
@ -63,9 +63,6 @@ object LogSink {
)
)
def dbAndLog[F[_]: Concurrent](store: Store[F]): LogSink[F] = {
val s: Stream[F, Pipe[F, LogEvent, Unit]] =
Stream.emits(Seq(printer[F].receive, db[F](store).receive))
LogSink(Pipe.join(s))
}
def dbAndLog[F[_]: Async](store: Store[F]): LogSink[F] =
LogSink(_.broadcastThrough(printer[F].receive, db[F](store).receive))
}

View File

@ -24,20 +24,19 @@ trait PeriodicScheduler[F[_]] {
def shutdown: F[Unit]
def periodicAwake: F[Fiber[F, Unit]]
def periodicAwake: F[Fiber[F, Throwable, Unit]]
def notifyChange: F[Unit]
}
object PeriodicScheduler {
def create[F[_]: ConcurrentEffect](
def create[F[_]: Async](
cfg: PeriodicSchedulerConfig,
sch: Scheduler[F],
queue: JobQueue[F],
store: PeriodicTaskStore[F],
client: JoexClient[F],
timer: Timer[F]
client: JoexClient[F]
): Resource[F, PeriodicScheduler[F]] =
for {
waiter <- Resource.eval(SignallingRef(true))
@ -49,8 +48,7 @@ object PeriodicScheduler {
store,
client,
waiter,
state,
timer
state
)
_ <- Resource.eval(psch.init)
} yield psch

View File

@ -12,21 +12,19 @@ import docspell.joexapi.client.JoexClient
import docspell.store.queue._
import docspell.store.records.RPeriodicTask
import com.github.eikek.fs2calev._
import eu.timepit.fs2cron.calev.CalevScheduler
import org.log4s.getLogger
final class PeriodicSchedulerImpl[F[_]: ConcurrentEffect](
final class PeriodicSchedulerImpl[F[_]: Async](
val config: PeriodicSchedulerConfig,
sch: Scheduler[F],
queue: JobQueue[F],
store: PeriodicTaskStore[F],
client: JoexClient[F],
waiter: SignallingRef[F, Boolean],
state: SignallingRef[F, State[F]],
timer: Timer[F]
state: SignallingRef[F, State[F]]
) extends PeriodicScheduler[F] {
private[this] val logger = getLogger
implicit private val _timer: Timer[F] = timer
private[this] val logger = getLogger
def start: Stream[F, Nothing] =
logger.sinfo("Starting periodic scheduler") ++
@ -35,8 +33,8 @@ final class PeriodicSchedulerImpl[F[_]: ConcurrentEffect](
def shutdown: F[Unit] =
state.modify(_.requestShutdown)
def periodicAwake: F[Fiber[F, Unit]] =
ConcurrentEffect[F].start(
def periodicAwake: F[Fiber[F, Throwable, Unit]] =
Async[F].start(
Stream
.awakeEvery[F](config.wakeupPeriod.toScala)
.evalMap(_ => logger.fdebug("Periodic awake reached") *> notifyChange)
@ -127,10 +125,11 @@ final class PeriodicSchedulerImpl[F[_]: ConcurrentEffect](
s"Scheduling next notify for timer ${pj.timer.asString} -> ${pj.timer.nextElapse(now.toUtcDateTime)}"
)
) *>
ConcurrentEffect[F]
Async[F]
.start(
CalevFs2
.sleep[F](pj.timer)
CalevScheduler
.utc[F]
.sleep(pj.timer)
.evalMap(_ => notifyChange)
.compile
.drain
@ -168,15 +167,15 @@ object PeriodicSchedulerImpl {
case class State[F[_]](
shutdownRequest: Boolean,
scheduledNotify: Option[Fiber[F, Unit]]
scheduledNotify: Option[Fiber[F, Throwable, Unit]]
) {
def requestShutdown: (State[F], Unit) =
(copy(shutdownRequest = true), ())
def setNotify(fb: Fiber[F, Unit]): (State[F], Unit) =
def setNotify(fb: Fiber[F, Throwable, Unit]): (State[F], Unit) =
(copy(scheduledNotify = Some(fb)), ())
def clearNotify: (State[F], Option[Fiber[F, Unit]]) =
def clearNotify: (State[F], Option[Fiber[F, Throwable, Unit]]) =
(copy(scheduledNotify = None), scheduledNotify)
}

View File

@ -1,8 +1,9 @@
package docspell.joex.scheduler
import cats.effect.{Concurrent, Sync}
import cats.effect._
import cats.effect.std.Queue
import cats.implicits._
import fs2.concurrent.Queue
import fs2.Stream
import docspell.common._
@ -15,28 +16,28 @@ object QueueLogger {
): Logger[F] =
new Logger[F] {
def trace(msg: => String): F[Unit] =
LogEvent.create[F](jobId, jobInfo, LogLevel.Debug, msg).flatMap(q.enqueue1)
LogEvent.create[F](jobId, jobInfo, LogLevel.Debug, msg).flatMap(q.offer)
def debug(msg: => String): F[Unit] =
LogEvent.create[F](jobId, jobInfo, LogLevel.Debug, msg).flatMap(q.enqueue1)
LogEvent.create[F](jobId, jobInfo, LogLevel.Debug, msg).flatMap(q.offer)
def info(msg: => String): F[Unit] =
LogEvent.create[F](jobId, jobInfo, LogLevel.Info, msg).flatMap(q.enqueue1)
LogEvent.create[F](jobId, jobInfo, LogLevel.Info, msg).flatMap(q.offer)
def warn(msg: => String): F[Unit] =
LogEvent.create[F](jobId, jobInfo, LogLevel.Warn, msg).flatMap(q.enqueue1)
LogEvent.create[F](jobId, jobInfo, LogLevel.Warn, msg).flatMap(q.offer)
def error(ex: Throwable)(msg: => String): F[Unit] =
LogEvent
.create[F](jobId, jobInfo, LogLevel.Error, msg)
.map(le => le.copy(ex = Some(ex)))
.flatMap(q.enqueue1)
.flatMap(q.offer)
def error(msg: => String): F[Unit] =
LogEvent.create[F](jobId, jobInfo, LogLevel.Error, msg).flatMap(q.enqueue1)
LogEvent.create[F](jobId, jobInfo, LogLevel.Error, msg).flatMap(q.offer)
}
def apply[F[_]: Concurrent](
def apply[F[_]: Async](
jobId: Ident,
jobInfo: String,
bufferSize: Int,
@ -45,7 +46,9 @@ object QueueLogger {
for {
q <- Queue.circularBuffer[F, LogEvent](bufferSize)
log = create(jobId, jobInfo, q)
_ <- Concurrent[F].start(q.dequeue.through(sink.receive).compile.drain)
_ <- Async[F].start(
Stream.fromQueueUnterminated(q).through(sink.receive).compile.drain
)
} yield log
}

View File

@ -1,6 +1,6 @@
package docspell.joex.scheduler
import cats.effect.{Fiber, Timer}
import cats.effect._
import fs2.Stream
import docspell.common.Ident
@ -30,5 +30,5 @@ trait Scheduler[F[_]] {
*/
def shutdown(cancelAll: Boolean): F[Unit]
def periodicAwake(implicit T: Timer[F]): F[Fiber[F, Unit]]
def periodicAwake: F[Fiber[F, Throwable, Unit]]
}

View File

@ -1,18 +1,17 @@
package docspell.joex.scheduler
import cats.effect._
import cats.effect.concurrent.Semaphore
import cats.effect.std.Semaphore
import cats.implicits._
import fs2.concurrent.SignallingRef
import docspell.store.Store
import docspell.store.queue.JobQueue
case class SchedulerBuilder[F[_]: ConcurrentEffect: ContextShift](
case class SchedulerBuilder[F[_]: Async](
config: SchedulerConfig,
tasks: JobTaskRegistry[F],
store: Store[F],
blocker: Blocker,
queue: Resource[F, JobQueue[F]],
logSink: LogSink[F]
) {
@ -27,10 +26,7 @@ case class SchedulerBuilder[F[_]: ConcurrentEffect: ContextShift](
withTaskRegistry(tasks.withTask(task))
def withQueue(queue: Resource[F, JobQueue[F]]): SchedulerBuilder[F] =
SchedulerBuilder[F](config, tasks, store, blocker, queue, logSink)
def withBlocker(blocker: Blocker): SchedulerBuilder[F] =
copy(blocker = blocker)
SchedulerBuilder[F](config, tasks, store, queue, logSink)
def withLogSink(sink: LogSink[F]): SchedulerBuilder[F] =
copy(logSink = sink)
@ -39,19 +35,16 @@ case class SchedulerBuilder[F[_]: ConcurrentEffect: ContextShift](
copy(queue = Resource.pure[F, JobQueue[F]](queue))
def serve: Resource[F, Scheduler[F]] =
resource.evalMap(sch =>
ConcurrentEffect[F].start(sch.start.compile.drain).map(_ => sch)
)
resource.evalMap(sch => Async[F].start(sch.start.compile.drain).map(_ => sch))
def resource: Resource[F, Scheduler[F]] = {
val scheduler = for {
val scheduler: Resource[F, SchedulerImpl[F]] = for {
jq <- queue
waiter <- Resource.eval(SignallingRef(true))
state <- Resource.eval(SignallingRef(SchedulerImpl.emptyState[F]))
perms <- Resource.eval(Semaphore(config.poolSize.toLong))
} yield new SchedulerImpl[F](
config,
blocker,
jq,
tasks,
store,
@ -68,16 +61,14 @@ case class SchedulerBuilder[F[_]: ConcurrentEffect: ContextShift](
object SchedulerBuilder {
def apply[F[_]: ConcurrentEffect: ContextShift](
def apply[F[_]: Async](
config: SchedulerConfig,
blocker: Blocker,
store: Store[F]
): SchedulerBuilder[F] =
new SchedulerBuilder[F](
config,
JobTaskRegistry.empty[F],
store,
blocker,
JobQueue(store),
LogSink.db[F](store)
)

View File

@ -2,7 +2,7 @@ package docspell.joex.scheduler
import cats.data.OptionT
import cats.effect._
import cats.effect.concurrent.Semaphore
import cats.effect.std.Semaphore
import cats.implicits._
import fs2.Stream
import fs2.concurrent.SignallingRef
@ -17,9 +17,8 @@ import docspell.store.records.RJob
import org.log4s._
final class SchedulerImpl[F[_]: ConcurrentEffect: ContextShift](
final class SchedulerImpl[F[_]: Async](
val config: SchedulerConfig,
blocker: Blocker,
queue: JobQueue[F],
tasks: JobTaskRegistry[F],
store: Store[F],
@ -37,8 +36,8 @@ final class SchedulerImpl[F[_]: ConcurrentEffect: ContextShift](
def init: F[Unit] =
QJob.runningToWaiting(config.name, store)
def periodicAwake(implicit T: Timer[F]): F[Fiber[F, Unit]] =
ConcurrentEffect[F].start(
def periodicAwake: F[Fiber[F, Throwable, Unit]] =
Async[F].start(
Stream
.awakeEvery[F](config.wakeupPeriod.toScala)
.evalMap(_ => logger.fdebug("Periodic awake reached") *> notifyChange)
@ -153,7 +152,7 @@ final class SchedulerImpl[F[_]: ConcurrentEffect: ContextShift](
for {
_ <-
logger.fdebug(s"Creating context for job ${job.info} to run cancellation $t")
ctx <- Context[F, String](job, job.args, config, logSink, blocker, store)
ctx <- Context[F, String](job, job.args, config, logSink, store)
_ <- t.onCancel.run(ctx)
_ <- state.modify(_.markCancelled(job))
_ <- onFinish(job, JobState.Cancelled)
@ -177,7 +176,7 @@ final class SchedulerImpl[F[_]: ConcurrentEffect: ContextShift](
case Right(t) =>
for {
_ <- logger.fdebug(s"Creating context for job ${job.info} to run $t")
ctx <- Context[F, String](job, job.args, config, logSink, blocker, store)
ctx <- Context[F, String](job, job.args, config, logSink, store)
jot = wrapTask(job, t.task, ctx)
tok <- forkRun(job, jot.run(ctx), t.onCancel.run(ctx), ctx)
_ <- state.modify(_.addRunning(job, tok))
@ -208,9 +207,7 @@ final class SchedulerImpl[F[_]: ConcurrentEffect: ContextShift](
ctx: Context[F, String]
): Task[F, String, Unit] =
task
.mapF(fa =>
onStart(job) *> logger.fdebug("Starting task now") *> blocker.blockOn(fa)
)
.mapF(fa => onStart(job) *> logger.fdebug("Starting task now") *> fa)
.mapF(_.attempt.flatMap({
case Right(()) =>
logger.info(s"Job execution successful: ${job.info}")
@ -252,11 +249,10 @@ final class SchedulerImpl[F[_]: ConcurrentEffect: ContextShift](
code: F[Unit],
onCancel: F[Unit],
ctx: Context[F, String]
): F[F[Unit]] = {
val bfa = blocker.blockOn(code)
): F[F[Unit]] =
logger.fdebug(s"Forking job ${job.info}") *>
ConcurrentEffect[F]
.start(bfa)
Async[F]
.start(code)
.map(fiber =>
logger.fdebug(s"Cancelling job ${job.info}") *>
fiber.cancel *>
@ -271,11 +267,12 @@ final class SchedulerImpl[F[_]: ConcurrentEffect: ContextShift](
ctx.logger.warn("Job has been cancelled.") *>
logger.fdebug(s"Job ${job.info} has been cancelled.")
)
}
}
object SchedulerImpl {
type CancelToken[F[_]] = F[Unit]
def emptyState[F[_]]: State[F] =
State(Map.empty, Set.empty, Map.empty, false)