Merge pull request #392 from eikek/fix-process-timeout

Fix process timeout
This commit is contained in:
mergify[bot] 2020-10-26 23:48:27 +00:00 committed by GitHub
commit 2ad2f3be57
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 33 additions and 13 deletions

View File

@ -58,6 +58,13 @@ private[extern] object ExternConv {
} }
.compile .compile
.lastOrError .lastOrError
.attempt
.flatMap {
case Right(v) =>
v.pure[F]
case Left(ex) =>
handler.run(ConversionResult.failure(ex))
}
def readResult[F[_]: Sync: ContextShift]( def readResult[F[_]: Sync: ContextShift](
blocker: Blocker, blocker: Blocker,

View File

@ -110,7 +110,7 @@ object PdfConvTask {
ctx.logger.warn(s"Unable to convert '${mime}' file ${ctx.args}: $reason") ctx.logger.warn(s"Unable to convert '${mime}' file ${ctx.args}: $reason")
case ConversionResult.Failure(ex) => case ConversionResult.Failure(ex) =>
ctx.logger.error(ex)(s"Failure converting file ${ctx.args}: ${ex.getMessage}") Sync[F].raiseError(ex)
}) })
def ocrMyPdf(lang: Language): F[Unit] = def ocrMyPdf(lang: Language): F[Unit] =

View File

@ -121,9 +121,10 @@ object CreateItem {
private def findExisting[F[_]: Sync]: Task[F, ProcessItemArgs, Option[ItemData]] = private def findExisting[F[_]: Sync]: Task[F, ProcessItemArgs, Option[ItemData]] =
Task { ctx => Task { ctx =>
val states = ItemState.invalidStates.toList.toSet
val fileMetaIds = ctx.args.files.map(_.fileMetaId).toSet val fileMetaIds = ctx.args.files.map(_.fileMetaId).toSet
for { for {
cand <- ctx.store.transact(QItem.findByFileIds(fileMetaIds.toSeq)) cand <- ctx.store.transact(QItem.findByFileIds(fileMetaIds.toSeq, states))
_ <- _ <-
if (cand.nonEmpty) if (cand.nonEmpty)
ctx.logger.warn(s"Found ${cand.size} existing item with these files.") ctx.logger.warn(s"Found ${cand.size} existing item with these files.")

View File

@ -103,10 +103,13 @@ object ItemHandler {
) )
} }
def deleteByFileIds[F[_]: Sync: ContextShift]: Task[F, Args, Unit] = private def deleteByFileIds[F[_]: Sync: ContextShift]: Task[F, Args, Unit] =
Task { ctx => Task { ctx =>
val states = ItemState.invalidStates.toList.toSet
for { for {
items <- ctx.store.transact(QItem.findByFileIds(ctx.args.files.map(_.fileMetaId))) items <- ctx.store.transact(
QItem.findByFileIds(ctx.args.files.map(_.fileMetaId), states)
)
_ <- _ <-
if (items.nonEmpty) ctx.logger.info(s"Deleting items ${items.map(_.id.id)}") if (items.nonEmpty) ctx.logger.info(s"Deleting items ${items.map(_.id.id)}")
else else

View File

@ -493,13 +493,15 @@ object QItem {
private def findByFileIdsQuery( private def findByFileIdsQuery(
fileMetaIds: NonEmptyList[Ident], fileMetaIds: NonEmptyList[Ident],
limit: Option[Int] limit: Option[Int],
states: Set[ItemState]
): Fragment = { ): Fragment = {
val IC = RItem.Columns.all.map(_.prefix("i")) val IC = RItem.Columns.all.map(_.prefix("i"))
val aItem = RAttachment.Columns.itemId.prefix("a") val aItem = RAttachment.Columns.itemId.prefix("a")
val aId = RAttachment.Columns.id.prefix("a") val aId = RAttachment.Columns.id.prefix("a")
val aFileId = RAttachment.Columns.fileId.prefix("a") val aFileId = RAttachment.Columns.fileId.prefix("a")
val iId = RItem.Columns.id.prefix("i") val iId = RItem.Columns.id.prefix("i")
val iState = RItem.Columns.state.prefix("i")
val sId = RAttachmentSource.Columns.id.prefix("s") val sId = RAttachmentSource.Columns.id.prefix("s")
val sFileId = RAttachmentSource.Columns.fileId.prefix("s") val sFileId = RAttachmentSource.Columns.fileId.prefix("s")
val rId = RAttachmentArchive.Columns.id.prefix("r") val rId = RAttachmentArchive.Columns.id.prefix("r")
@ -516,11 +518,15 @@ object QItem {
fr"LEFT OUTER JOIN" ++ RAttachmentArchive.table ++ fr"r ON" ++ aId.is(rId) ++ fr"LEFT OUTER JOIN" ++ RAttachmentArchive.table ++ fr"r ON" ++ aId.is(rId) ++
fr"LEFT OUTER JOIN" ++ RFileMeta.table ++ fr"m3 ON" ++ m3Id.is(rFileId) fr"LEFT OUTER JOIN" ++ RFileMeta.table ++ fr"m3 ON" ++ m3Id.is(rFileId)
val q = selectSimple( val fileCond =
IC, or(m1Id.isIn(fileMetaIds), m2Id.isIn(fileMetaIds), m3Id.isIn(fileMetaIds))
from, val cond = NonEmptyList.fromList(states.toList) match {
and(or(m1Id.isIn(fileMetaIds), m2Id.isIn(fileMetaIds), m3Id.isIn(fileMetaIds))) case Some(nel) =>
) and(fileCond, iState.isIn(nel))
case None =>
fileCond
}
val q = selectSimple(IC, from, cond)
limit match { limit match {
case Some(n) => q ++ fr"LIMIT $n" case Some(n) => q ++ fr"LIMIT $n"
@ -531,15 +537,18 @@ object QItem {
def findOneByFileIds(fileMetaIds: Seq[Ident]): ConnectionIO[Option[RItem]] = def findOneByFileIds(fileMetaIds: Seq[Ident]): ConnectionIO[Option[RItem]] =
NonEmptyList.fromList(fileMetaIds.toList) match { NonEmptyList.fromList(fileMetaIds.toList) match {
case Some(nel) => case Some(nel) =>
findByFileIdsQuery(nel, Some(1)).query[RItem].option findByFileIdsQuery(nel, Some(1), Set.empty).query[RItem].option
case None => case None =>
(None: Option[RItem]).pure[ConnectionIO] (None: Option[RItem]).pure[ConnectionIO]
} }
def findByFileIds(fileMetaIds: Seq[Ident]): ConnectionIO[Vector[RItem]] = def findByFileIds(
fileMetaIds: Seq[Ident],
states: Set[ItemState]
): ConnectionIO[Vector[RItem]] =
NonEmptyList.fromList(fileMetaIds.toList) match { NonEmptyList.fromList(fileMetaIds.toList) match {
case Some(nel) => case Some(nel) =>
findByFileIdsQuery(nel, None).query[RItem].to[Vector] findByFileIdsQuery(nel, None, states).query[RItem].to[Vector]
case None => case None =>
Vector.empty[RItem].pure[ConnectionIO] Vector.empty[RItem].pure[ConnectionIO]
} }