mirror of
				https://github.com/TheAnachronism/docspell.git
				synced 2025-10-30 21:40:12 +00:00 
			
		
		
		
	Update full-text index when changing data
This commit is contained in:
		| @@ -45,6 +45,7 @@ object BackendApp { | ||||
|   ): Resource[F, BackendApp[F]] = | ||||
|     for { | ||||
|       httpClient     <- BlazeClientBuilder[F](httpClientEc).resource | ||||
|       solrFts        <- SolrFtsClient(cfg.fullTextSearch.solr, httpClient) | ||||
|       utStore        <- UserTaskStore(store) | ||||
|       queue          <- JobQueue(store) | ||||
|       loginImpl      <- Login[F](store) | ||||
| @@ -58,9 +59,8 @@ object BackendApp { | ||||
|       uploadImpl     <- OUpload(store, queue, cfg.files, joexImpl) | ||||
|       nodeImpl       <- ONode(store) | ||||
|       jobImpl        <- OJob(store, joexImpl) | ||||
|       itemImpl       <- OItem(store) | ||||
|       itemImpl       <- OItem(store, solrFts) | ||||
|       itemSearchImpl <- OItemSearch(store) | ||||
|       solrFts        <- SolrFtsClient(cfg.fullTextSearch.solr, httpClient) | ||||
|       fulltextImpl   <- OFulltext(itemSearchImpl, solrFts) | ||||
|       javaEmil = | ||||
|         JavaMailEmil(blocker, Settings.defaultSettings.copy(debug = cfg.mailDebug)) | ||||
|   | ||||
| @@ -5,10 +5,12 @@ import cats.implicits._ | ||||
| import cats.effect.{Effect, Resource} | ||||
| import doobie._ | ||||
| import doobie.implicits._ | ||||
| import org.log4s.getLogger | ||||
| import docspell.store.{AddResult, Store} | ||||
| import docspell.store.queries.{QAttachment, QItem} | ||||
| import docspell.common.{Direction, Ident, ItemState, MetaProposalList, Timestamp} | ||||
| import docspell.common._ | ||||
| import docspell.store.records._ | ||||
| import docspell.ftsclient.FtsClient | ||||
|  | ||||
| trait OItem[F[_]] { | ||||
|  | ||||
| @@ -38,7 +40,7 @@ trait OItem[F[_]] { | ||||
|  | ||||
|   def setNotes(item: Ident, notes: Option[String], collective: Ident): F[AddResult] | ||||
|  | ||||
|   def setName(item: Ident, notes: String, collective: Ident): F[AddResult] | ||||
|   def setName(item: Ident, name: String, collective: Ident): F[AddResult] | ||||
|  | ||||
|   def setState(item: Ident, state: ItemState, collective: Ident): F[AddResult] | ||||
|  | ||||
| @@ -67,11 +69,12 @@ trait OItem[F[_]] { | ||||
|  | ||||
| object OItem { | ||||
|  | ||||
|   def apply[F[_]: Effect](store: Store[F]): Resource[F, OItem[F]] = | ||||
|   def apply[F[_]: Effect](store: Store[F], fts: FtsClient[F]): Resource[F, OItem[F]] = | ||||
|     for { | ||||
|       otag   <- OTag(store) | ||||
|       oorg   <- OOrganization(store) | ||||
|       oequip <- OEquipment(store) | ||||
|       logger <- Resource.pure[F, Logger[F]](Logger.log4s(getLogger)) | ||||
|       oitem <- Resource.pure[F, OItem[F]](new OItem[F] { | ||||
|         def moveAttachmentBefore( | ||||
|             itemId: Ident, | ||||
| @@ -259,12 +262,18 @@ object OItem { | ||||
|             .transact(RItem.updateNotes(item, collective, notes)) | ||||
|             .attempt | ||||
|             .map(AddResult.fromUpdate) | ||||
|             .flatTap( | ||||
|               onSuccessIgnoreError(fts.updateItemNotes(logger, item, collective, notes)) | ||||
|             ) | ||||
|  | ||||
|         def setName(item: Ident, name: String, collective: Ident): F[AddResult] = | ||||
|           store | ||||
|             .transact(RItem.updateName(item, collective, name)) | ||||
|             .attempt | ||||
|             .map(AddResult.fromUpdate) | ||||
|             .flatTap( | ||||
|               onSuccessIgnoreError(fts.updateItemName(logger, item, collective, name)) | ||||
|             ) | ||||
|  | ||||
|         def setState(item: Ident, state: ItemState, collective: Ident): F[AddResult] = | ||||
|           store | ||||
| @@ -310,6 +319,29 @@ object OItem { | ||||
|             .transact(RAttachment.updateName(attachId, collective, name)) | ||||
|             .attempt | ||||
|             .map(AddResult.fromUpdate) | ||||
|             .flatTap( | ||||
|               onSuccessIgnoreError( | ||||
|                 OptionT(store.transact(RAttachment.findItemId(attachId))) | ||||
|                   .semiflatMap(itemId => | ||||
|                     fts.updateAttachmentName(logger, itemId, attachId, collective, name) | ||||
|                   ) | ||||
|                   .fold(())(identity) | ||||
|               ) | ||||
|             ) | ||||
|  | ||||
|         private def onSuccessIgnoreError(update: F[Unit])(ar: AddResult): F[Unit] = | ||||
|           ar match { | ||||
|             case AddResult.Success => | ||||
|               update.attempt.flatMap { | ||||
|                 case Right(()) => ().pure[F] | ||||
|                 case Left(ex) => | ||||
|                   logger.warn(s"Error updating full-text index: ${ex.getMessage}") | ||||
|               } | ||||
|             case AddResult.Failure(_) => | ||||
|               ().pure[F] | ||||
|             case AddResult.EntityExists(_) => | ||||
|               ().pure[F] | ||||
|           } | ||||
|       }) | ||||
|     } yield oitem | ||||
| } | ||||
|   | ||||
| @@ -44,4 +44,41 @@ trait FtsClient[F[_]] { | ||||
|   def updateIndex(logger: Logger[F], data: TextData*): F[Unit] = | ||||
|     updateIndex(logger, Stream.emits(data)) | ||||
|  | ||||
|   def updateItemName( | ||||
|       logger: Logger[F], | ||||
|       itemId: Ident, | ||||
|       collective: Ident, | ||||
|       name: String | ||||
|   ): F[Unit] = | ||||
|     updateIndex(logger, TextData.item(itemId, collective, Some(name), None)) | ||||
|  | ||||
|   def updateItemNotes( | ||||
|       logger: Logger[F], | ||||
|       itemId: Ident, | ||||
|       collective: Ident, | ||||
|       notes: Option[String] | ||||
|   ): F[Unit] = | ||||
|     updateIndex( | ||||
|       logger, | ||||
|       TextData.item(itemId, collective, None, Some(notes.getOrElse(""))) | ||||
|     ) | ||||
|  | ||||
|   def updateAttachmentName( | ||||
|       logger: Logger[F], | ||||
|       itemId: Ident, | ||||
|       attachId: Ident, | ||||
|       collective: Ident, | ||||
|       name: Option[String] | ||||
|   ): F[Unit] = | ||||
|     updateIndex( | ||||
|       logger, | ||||
|       TextData.attachment( | ||||
|         itemId, | ||||
|         attachId, | ||||
|         collective, | ||||
|         Language.English, | ||||
|         Some(name.getOrElse("")), | ||||
|         None | ||||
|       ) | ||||
|     ) | ||||
| } | ||||
|   | ||||
| @@ -21,19 +21,22 @@ final class SolrFtsClient[F[_]: Effect]( | ||||
|     solrQuery.query(q) | ||||
|  | ||||
|   def indexData(logger: Logger[F], data: Stream[F, TextData]): F[Unit] = | ||||
|     (for { | ||||
|       _      <- Stream.eval(logger.debug("Inserting data into index")) | ||||
|     modifyIndex(logger, data)(solrUpdate.add) | ||||
|  | ||||
|   def updateIndex(logger: Logger[F], data: Stream[F, TextData]): F[Unit] = | ||||
|     modifyIndex(logger, data)(solrUpdate.update) | ||||
|  | ||||
|   def modifyIndex(logger: Logger[F], data: Stream[F, TextData])(f: List[TextData] => F[Unit]): F[Unit] = | ||||
|         (for { | ||||
|       _      <- Stream.eval(logger.debug("Updating SOLR index")) | ||||
|       chunks <- data.chunks | ||||
|       res    <- Stream.eval(solrUpdate.add(chunks.toList).attempt) | ||||
|       res    <- Stream.eval(f(chunks.toList).attempt) | ||||
|       _ <- res match { | ||||
|         case Right(()) => Stream.emit(()) | ||||
|         case Left(ex) => | ||||
|           Stream.eval(logger.error(ex)("Error inserting chunk of data into index")) | ||||
|           Stream.eval(logger.error(ex)("Error updating with chunk of data")) | ||||
|       } | ||||
|     } yield ()).compile.drain | ||||
|  | ||||
|   def updateIndex(logger: Logger[F], data: Stream[F, TextData]): F[Unit] = ??? | ||||
|  | ||||
| } | ||||
|  | ||||
| object SolrFtsClient { | ||||
|   | ||||
| @@ -183,4 +183,6 @@ object RAttachment { | ||||
|       n2 <- deleteFrom(table, id.is(attachId)).update.run | ||||
|     } yield n0 + n1 + n2 | ||||
|  | ||||
|   def findItemId(attachId: Ident): ConnectionIO[Option[Ident]] = | ||||
|     selectSimple(Seq(itemId), table, id.is(attachId)).query[Ident].option | ||||
| } | ||||
|   | ||||
		Reference in New Issue
	
	Block a user