Fetch: a library for data access

Fetch is a Scala library for organizing access to data in file systems, databases, web services, and any other source whose data can be retrieved by a unique identifier. The library is written in a functional style and is built on Cats and Cats Effect. It is designed for composing and optimizing requests to different data sources. It lets you:



  • request data from multiple sources in parallel;
  • request data from a single source in parallel;
  • combine several requests to one source into a single request;
  • deduplicate requests in each of the situations listed;
  • cache request results.


To do this, the library provides tools that let you write clean business code without the low-level constructs otherwise needed to implement these optimizations.

The examples use the latest version of Fetch at the time of writing: 1.3.0.



Data sources in Fetch



To access any source through Fetch, you need to implement:



  • a description of the data source (the Data[I, A] trait);
  • methods for retrieving data from the source (the DataSource[F[_], I, A] trait).


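DataSource[F[_], I, A] (where I is the type of the input argument, for example an ID in a database; A is the type of the result; and F is the effect type) is defined as follows: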



/**
 * A `DataSource` is the recipe for fetching a certain identity `I`, which yields
 * results of type `A` performing an effect of type `F[_]`.
 */
trait DataSource[F[_], I, A] {
  def data: Data[I, A]

  implicit def CF: Concurrent[F]

  /** Fetch one identity, returning a None if it wasn't found.
   */
  def fetch(id: I): F[Option[A]]

  /** Fetch many identities, returning a mapping from identities to results. If an
   * identity wasn't found, it won't appear in the keys.
   */
  def batch(ids: NonEmptyList[I]): F[Map[I, A]] =
    FetchExecution.parallel(
      ids.map(id => fetch(id).tupleLeft(id))
    ).map(_.collect { case (id, Some(x)) => id -> x }.toMap)

  def maxBatchSize: Option[Int] = None

  def batchExecution: BatchExecution = InParallel
}


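Here data: Data[I, A] is the Data[I, A] instance that describes the source. CF is the evidence that the effect type is Concurrent. fetch retrieves the value for a single ID. batch retrieves values for several IDs at once; by default it calls fetch for each ID in parallel. maxBatchSize and batchExecution control the batch size and whether batches run in parallel or sequentially.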
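As a rough model of the default batch shown above, here is a standard-library sketch without effects. It calls a fetch function for every ID and keeps only the IDs that were found. The names BatchSketch and batchOf are made up for illustration; the real method runs the fetches in parallel inside F.

```scala
object BatchSketch {
  // Hypothetical helper: look up every ID and drop the ones that are missing,
  // mirroring the `collect { case (id, Some(x)) => id -> x }` step above.
  def batchOf[I, A](ids: List[I])(fetch: I => Option[A]): Map[I, A] =
    ids.flatMap(id => fetch(id).map(value => id -> value)).toMap
}
```

For example, BatchSketch.batchOf(List(0, 1, 5))(List("a", "b", "c").lift) yields a map with keys 0 and 1 only; the missing ID 5 simply does not appear.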



A simple data source for Fetch that returns list elements by index:



class ListData(val list: List[String]) extends Data[Int, String] {
  override def name: String = "My List of Data"
}

class ListDataSource(list: ListData)(implicit cs: ContextShift[IO])
    extends DataSource[IO, Int, String]
    with LazyLogging {

  override def data: ListData = list

  /* Not marked implicit: an implicit override would resolve Concurrent[IO] to itself and overflow the stack */
  override def CF: Concurrent[IO] = Concurrent[IO]

  override def fetch(id: Int): IO[Option[String]] =
    CF.delay {
      logger.info(s"Processing element from index $id")
      data.list.lift(id)
    }
}


Alternatively, the DataSource can be created inside the Data implementation itself, which keeps both in one class:



class ListSource(list: List[String])(implicit cf: ContextShift[IO]) extends Data[Int, String] with LazyLogging {
  override def name: String        = "My List of Data"
  private def instance: ListSource = this

  def source = new DataSource[IO, Int, String] {
    override def data: Data[Int, String] = instance

    override def CF: Concurrent[IO] = Concurrent[IO]

    override def fetch(id: Int): IO[Option[String]] =
      CF.delay {
        logger.info(s"Processing element from index $id")
        list.lift(id)
      }
  }
}


Here fetch is wrapped in CF.delay, so the lookup and its log line run only when Fetch actually executes the request.



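A DataSource is not called directly. Instead, a Fetch is created from it. A Fetch is a description of a request, a kind of "plan"; creating it executes nothing. A Fetch is executed with Fetch.run (there are several run variants, covered later). Creating a Fetch takes an ID and a source. The result is returned in the effect F, for which a Concurrent[F] must be available. For example: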



val list                                = List("a", "b", "c")
val data: ListSource                    = new ListSource(list)
val source: DataSource[IO, Int, String] = data.source

val fetchDataPlan: Fetch[IO, String] = Fetch(1, source)
val fetchData: IO[String]            = Fetch.run(fetchDataPlan)
val dataCalculated: String           = fetchData.unsafeRunSync // b


Here the ID is the list index, so the result is the element at index 1.



A complete example:



object Example extends App {

  implicit val ec: ExecutionContext = global
  implicit val cs: ContextShift[IO] = IO.contextShift(ec)  // required by Fetch.run and ListDataSource
  implicit val timer: Timer[IO]     = IO.timer(ec) // required by Fetch.run

  val list = List("a", "b", "c")
  val data   = new ListSource(list)
  val source = data.source

  Fetch.run(Fetch(0, source)).unsafeRunSync  
  // INFO ListDataSource - Processing element from index 0

  Fetch.run(Fetch(1, source)).unsafeRunSync  
  // INFO ListDataSource - Processing element from index 1

  Fetch.run(Fetch(2, source)).unsafeRunSync  
  // INFO ListDataSource - Processing element from index 2

  Fetch.run(Fetch(3, source)).unsafeRunSync  
  // INFO ListDataSource - Processing element from index 3
  // Exception in thread "main" fetch.package$MissingIdentity
}


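Although data.list.lift(id) in fetch handles a missing index without failing, requesting that ID through Fetch raises an exception: when fetch returns None, Fetch reports the ID as missing. The Option is not part of the result type, because the source is declared as DataSource[F[_], I, A]. To get an Option as the result, create the Fetch with optional instead of apply: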



Fetch.run(Fetch.optional(3, source)).unsafeRunSync  // None


Accordingly, the two methods produce different result types:



val fApply: Fetch[IO, String]            = Fetch(3, source)
val fOptional: Fetch[IO, Option[String]] = Fetch.optional(3, source)


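It is convenient to add a helper method to the Data class that builds the Fetch. Here it uses optional, so a missing ID gives None: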



def fetchElem(id: Int) = Fetch.optional(id, source)


Using fetchElem from ListSource:



Fetch.run(data.fetchElem(0)).unsafeRunSync // INFO app.ListDataSource - Processing element from index 0
Fetch.run(data.fetchElem(1)).unsafeRunSync // INFO app.ListDataSource - Processing element from index 1
Fetch.run(data.fetchElem(2)).unsafeRunSync // INFO app.ListDataSource - Processing element from index 2
println(Fetch.run(data.fetchElem(2)).unsafeRunSync) // Some(c)
println(Fetch.run(data.fetchElem(3)).unsafeRunSync) // None




Fetch does not cache anything between runs "out of the box":



def fetch(id: Int): Option[String] = {
  val run   = Fetch.run(data.fetchElem(id))
  run.unsafeRunSync
}

fetch(1)  // INFO app.ListDataSource - Processing element from index 1
fetch(1)  // INFO app.ListDataSource - Processing element from index 1
fetch(1)  // INFO app.ListDataSource - Processing element from index 1


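Caching in Fetch is handled by the DataCache[F[_]] trait. The library provides one implementation, InMemoryCache[F[_]: Monad](state: Map[(Data[Any, Any], DataSourceId), DataSourceResult]). It is created through its companion object: from builds a cache that already contains the given values, and empty builds an empty one: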



def from[F[_]: Monad, I, A](results: ((Data[I, A], I), A)*): InMemoryCache[F] 
def empty[F[_]: Monad]: InMemoryCache[F]


Internally the cache is a Map[(Data[Any, Any], DataSourceId), DataSourceResult]. The two wrapper types are:



final class DataSourceId(val id: Any)         extends AnyVal
final class DataSourceResult(val result: Any) extends AnyVal


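So the key is a pair (Data[Any, Any], DataSourceId): the source description plus the ID within that source. The value is a DataSourceResult. Both wrappers hold Any, which lets one cache store results from any source. Type safety is restored on the way out. This is how InMemoryCache looks a value up: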



def lookup[I, A](i: I, data: Data[I, A]): F[Option[A]] =
  Applicative[F].pure(
    state
      .get((data.asInstanceOf[Data[Any, Any]], new DataSourceId(i)))
      .map(_.result.asInstanceOf[A])
  )


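Data is part of the key. Because Data[I, A] fixes the result type A, the asInstanceOf[A] cast of the stored Any is safe. Insertion uses updated on the Map, which returns a new Map.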
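The idea of an untyped store with typed access can be sketched with the standard library alone. This is an illustrative model, not the library's code: Descriptor, Cache, names and sizes are hypothetical names, and Descriptor plays the role that Data plays in Fetch.

```scala
object UntypedCacheSketch {
  // Hypothetical stand-in for a source description: it fixes the ID and result types.
  // Instances are compared by reference, so two descriptors never share keys.
  final class Descriptor[I, A](val name: String)

  // Keys and values are stored as Any; the descriptor restores the types on lookup.
  final case class Cache(state: Map[(Any, Any), Any]) {
    def insert[I, A](d: Descriptor[I, A], id: I, value: A): Cache =
      Cache(state.updated((d, id), value)) // returns a new cache, the old one is unchanged

    def lookup[I, A](d: Descriptor[I, A], id: I): Option[A] =
      state.get((d, id)).map(_.asInstanceOf[A]) // safe: only insert can add values for d
  }

  val names = new Descriptor[Int, String]("names")
  val sizes = new Descriptor[Int, Int]("sizes")
}
```

Since a value for a given descriptor can only be stored through insert with that descriptor's types, the cast in lookup cannot observe a value of the wrong type. The same ID under two descriptors gives two separate entries.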



Note that the Map here is Scala's immutable Map, so an insert produces a new cache instead of changing the existing one.



Creating a prefilled cache with from:



val cacheF: DataCache[IO] = InMemoryCache.from((data, 1) -> "b", (data, 2) -> "c")

Fetch.run(data.fetchElem(1), cacheF).unsafeRunSync  // found in the cache, the source is not called
Fetch.run(data.fetchElem(1), cacheF).unsafeRunSync  // found in the cache, the source is not called
Fetch.run(data.fetchElem(1), cacheF).unsafeRunSync
Fetch.run(data.fetchElem(1), cacheF).unsafeRunSync
Fetch.run(data.fetchElem(0), cacheF).unsafeRunSync
Fetch.run(data.fetchElem(0), cacheF).unsafeRunSync

// INFO app.ListDataSource - Processing element from index 0
// INFO app.ListDataSource - Processing element from index 0


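Values already in the cache are not requested from the source. Values fetched during a run, however, are not kept for the next run: the cache is an immutable Map, and Fetch.run discards the updated copy. To keep it, use Fetch.runCache, which returns a pair (the updated cache, the result):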



var cache: DataCache[IO] = InMemoryCache.empty

def cachedRun(id: Int): Option[String] = {
  val (c, r) = Fetch.runCache(Fetch.optional(id, source), cache).unsafeRunSync
  cache = c  // keep the updated cache for the next run
  r
}

cachedRun(1)
cachedRun(1)
cachedRun(2)
cachedRun(2)
cachedRun(4)
cachedRun(4)

// INFO app.ListDataSource - Processing element from index 1
// INFO app.ListDataSource - Processing element from index 2
// INFO app.ListDataSource - Processing element from index 4
// INFO app.ListDataSource - Processing element from index 4


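Now repeated requests are served from the cache. The exception is ID 4: it does not exist, so there is nothing to cache and the source is asked every time.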
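The pattern of threading an immutable cache through successive runs can be modelled without Fetch. The sketch below is an assumption-laden simplification: runCached is a hypothetical function, the cache is a plain Map[Int, String], and a Boolean reports whether the source was called.

```scala
object CacheThreadingSketch {
  type Cache = Map[Int, String]

  // Hypothetical run function: returns the updated cache, the result,
  // and a flag telling whether the source had to be called.
  def runCached(id: Int, cache: Cache)(source: Int => Option[String]): (Cache, Option[String], Boolean) =
    cache.get(id) match {
      case Some(hit) => (cache, Some(hit), false)
      case None =>
        source(id) match {
          case Some(value) => (cache.updated(id, value), Some(value), true)
          case None        => (cache, None, true) // nothing to store for a missing ID
        }
    }
}
```

A caller that ignores the returned cache and passes the original one again pays for the source call every time, which is the behaviour seen with Fetch.run and a prefilled cache.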



Example: a cache based on Caffeine



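You can implement DataCache yourself and pass your implementation to Fetch. Here is a DataCache backed by the Java library Caffeine, or more precisely by its Scala wrapper Scaffeine: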



class ScaffeineCache extends DataCache[IO] with LazyLogging {

  private val cache =
    Scaffeine()
      .recordStats()
      .expireAfterWrite(1.hour)
      .maximumSize(500)
      .build[(Data[Any, Any], Any), Any]()

  override def lookup[I, A](i: I, data: Data[I, A]): IO[Option[A]] = IO {
    cache
      .getIfPresent(data.asInstanceOf[Data[Any, Any]] -> i)
      .map { any =>
        val correct = any.asInstanceOf[A]
        logger.info(s"From cache: $i")
        correct
      }
  }

  override def insert[I, A](i: I, v: A, data: Data[I, A]): IO[DataCache[IO]] = IO {
    cache.put(data.asInstanceOf[Data[Any, Any]] -> i, v) // the write is suspended in IO
    this                                                  // the cache is mutable, so return the same instance
  }

}


The structure mirrors InMemoryCache. Keys and values in the Scaffeine cache are typed as Any: build[(Data[Any, Any], Any), Any](). Values are cast back with asInstanceOf:



val list  = List("a", "b", "c")
val listSource  = new ListSource(list)
val source = listSource.source
val randomSource = new RandomSource()
val cache = new ScaffeineCache()

/** Without a cache */
Fetch.run(Fetch(1, source)).unsafeRunSync // Processing element from index 1
Fetch.run(Fetch(1, source)).unsafeRunSync // Processing element from index 1

println()

/** With a cache */
Fetch.run(Fetch(1, source), cache).unsafeRunSync // Processing element from index 1
Fetch.run(Fetch(1, source), cache).unsafeRunSync // From cache: 1
Fetch.run(Fetch("a", source), cache).unsafeRunSync // type mismatch

/** The same cache instance with another source */
Fetch.run(randomSource.fetchInt(2), cache).unsafeRunSync  // Getting next random by max 2
Fetch.run(randomSource.fetchInt(2), cache).unsafeRunSync  // From cache: 2


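Observations:



  • passing an ID of the wrong type (despite the asInstanceOf casts inside the cache) gives a type mismatch at compile time, because Fetch checks the ID type against the source;
  • one cache instance serves several sources;
  • the Caffeine cache is mutable, unlike the immutable Map in InMemoryCache. That is why the DataCache returned by insert is the same instance.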




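Fetch values are combined with the standard tools of Scala and Cats. The goal is to turn, for example, a List[Fetch[_, _]] into a single Fetch[_, List[_]] and pass it to Fetch.run. The optimizations of Fetch (parallelism, batching, deduplication, caching) apply within one run. So everything that should be optimized together must be combined into one Fetch and executed with a single Fetch.run; separate runs know nothing about each other.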



Not every combination helps. This one gains nothing:



val t1: IO[(String, String)] = for {
  a <- Fetch.run(Fetch(1, source))
  b <- Fetch.run(Fetch(1, source))
} yield (a, b)


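This for comprehension combines IO values, not Fetch values: no combined Fetch such as Fetch[_, List[_]] is built, each Fetch is run on its own, and nothing is shared between the runs. Move the for inside the Fetch instead: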



val f1: Fetch[IO, (String, String)] = for {
  a <- Fetch(1, source)
  b <- Fetch(1, source)
} yield (a,b)

val t2: IO[(String, String)] = Fetch.run(f1)


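Now there is a single Fetch. Depending on how its parts are combined, Fetch.run executes the requests:



  • sequentially;
  • in parallel, when they go to different sources;
  • as one batch, when they go to the same source.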


Batching uses the batch method of the DataSource.



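The combinators come from Cats. For example, sequence turns a List[F[_]] into an F[List[_]]. traverse does the same while applying a function: it turns a List[_] into an F[List[_]]. Likewise, tupled turns a tuple of Fetch values into a Fetch of a tuple: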



val f3: List[Fetch[IO, String]] = List(
  Fetch(1, source),
  Fetch(2, source),
  Fetch(2, source)
)

val f31: Fetch[IO, List[String]] = f3.sequence
val t3: IO[List[String]]         = Fetch.run(f31)

val f4: List[Int] = List(
  1,
  2,
  2
)

val f41: Fetch[IO, List[String]] = f4.traverse(Fetch(_, source))
val t4: IO[List[String]]         = Fetch.run(f41)

val f5: (Fetch[IO, String], Fetch[IO, String]) = (Fetch(1, source), Fetch(2, source))
val f51: Fetch[IO, (String, String)]           = f5.tupled
val t5: IO[(String, String)]                   = Fetch.run(f51)

val f6: (Int, Int)                = (1, 2)
val f61: Fetch[IO, (Int, String)] = f6.traverse(Fetch(_, source))


And flatMap:



val f0: Fetch[IO, String]  = Fetch(1, source).flatMap(_ => Fetch(1, source))
val t0: IO[String]         = Fetch.run(f0)


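As these examples show, Fetch values compose with the usual Cats operations. The choice of operation matters, though: it determines which optimizations Fetch can apply, as the following sections show.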



Combining requests (Batching)



Fetch can combine several requests to the same source into one batch. For example:



import fetch.fetchM  // Fetch instances for Cats

val tuple: Fetch[IO, (Option[String], Option[String])] = (data.fetchElem(0), data.fetchElem(1)).tupled

Fetch.run(tuple).unsafeRunSync()  // (Some(a),Some(b))


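The default batch in DataSource simply calls fetch for each ID in parallel. It can be overridden, for example to retrieve all the IDs with a single SQL query.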



To see batching at work, override batch in ListSource so that it logs the IDs:



override def batch(ids: NonEmptyList[Int]): IO[Map[Int, String]] = {
  logger.info(s"IDs fetching in batch: $ids")
  super.batch(ids)
}


And request several elements with traverse:



import fetch.fetchM 

def findMany: Fetch[IO, List[Option[String]]] =
  List(0, 1, 2, 3, 4, 5).traverse(data.fetchElem)

Fetch.run(findMany).unsafeRunSync
// INFO app.ListSource - IDs fetching in batch: NonEmptyList(0, 5, 1, 2, 3, 4)


The size of a batch can be limited with maxBatchSize:



override def maxBatchSize: Option[Int] = 2.some  // defaults to None

// INFO app.ListSource - IDs fetching in batch: NonEmptyList(0, 5)
// INFO app.ListSource - IDs fetching in batch: NonEmptyList(1, 2)
// INFO app.ListSource - IDs fetching in batch: NonEmptyList(3, 4)
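The effect of a size limit can be modelled with the standard library. The sketch below is only an illustration under stated assumptions: BatchSizeSketch and split are hypothetical names, and the chunks follow the input order, whereas the log above shows that the library orders IDs differently.

```scala
object BatchSizeSketch {
  // Hypothetical helper: deduplicate the IDs, then cut them into chunks of at most `max`.
  // With no limit (None) all unique IDs form a single batch.
  def split[I](ids: List[I], max: Option[Int]): List[List[I]] = {
    val unique = ids.distinct
    max match {
      case Some(n) if n > 0 => unique.grouped(n).toList
      case _                => if (unique.isEmpty) Nil else List(unique)
    }
  }
}
```

With six IDs and a limit of 2 this gives three batches of two, matching the number of batch log lines above.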


By default the resulting batches are executed in parallel. This can be changed with batchExecution:



override def batchExecution: BatchExecution = Sequentially // defaults to `InParallel`




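So far each Fetch.run worked with a single source. In practice, one Fetch often needs data from several sources.



As an example, add a second source that returns random numbers. Its ID is the upper bound of the generated number. For Fetch it is a source like any other: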



class RandomSource(implicit cf: ContextShift[IO]) extends Data[Int, Int] with LazyLogging {

  override def name: String          = "Random numbers generator"
  private def instance: RandomSource = this

  def source: DataSource[IO, Int, Int] = new DataSource[IO, Int, Int] {
    override def data: Data[Int, Int] = instance

    override def CF: Concurrent[IO] = Concurrent[IO]

    override def fetch(max: Int): IO[Option[Int]] =
      CF.delay {
        logger.info(s"Getting next random by max $max")
        scala.util.Random.nextInt(max).some
      }
  }
}


Using it in one Fetch together with listSource:



val listSource = new ListSource(List("a", "b", "c"))
val randomSource = new RandomSource()

def fetchMulti: Fetch[IO, (Int, String)] =
  for {
    rnd <- Fetch(3, randomSource.source)  // Fetch[IO, Int]
    char <- Fetch(rnd, listSource.source)  // Fetch[IO, String]
  } yield (rnd, char)

println(Fetch.run(fetchMulti).unsafeRunSync)  // for example, (0,a)


Here the second request depends on the result of the first, so the two sources are queried one after the other.





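Whether requests run sequentially or together depends on how they are combined. Requests chained with for or flatMap run sequentially, because each step may depend on the previous result. Requests combined with sequence or traverse are independent of each other and are executed together. So if the requests do not depend on one another, prefer sequence or traverse. The same holds for tupled, which combines independent requests of different types. In these cases requests to different sources run in parallel, and requests to the same source are batched.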
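The difference between the two styles can be illustrated with a tiny standard-library model of "rounds". This is a sketch of the general idea, not Fetch's implementation: Plan, Done, Blocked, zip and run are hypothetical names, IDs are plain Ints, and the single source is a function.

```scala
object RoundsSketch {
  // A plan is either finished or blocked on a set of IDs it needs next.
  sealed trait Plan[A]
  final case class Done[A](value: A) extends Plan[A]
  final case class Blocked[A](ids: Set[Int], cont: Map[Int, String] => Plan[A]) extends Plan[A]

  def fetch(id: Int): Plan[String] =
    Blocked(Set(id), results => Done(results(id)))

  // Sequential composition: `f` cannot start before `p` has produced a value.
  def flatMap[A, B](p: Plan[A])(f: A => Plan[B]): Plan[B] = p match {
    case Done(a)            => f(a)
    case Blocked(ids, cont) => Blocked(ids, r => flatMap(cont(r))(f))
  }

  // Independent composition: the IDs both sides wait for are merged into one round.
  def zip[A, B](pa: Plan[A], pb: Plan[B]): Plan[(A, B)] = (pa, pb) match {
    case (Done(a), Done(b))                 => Done((a, b))
    case (Done(_), Blocked(ids, cont))      => Blocked(ids, r => zip(pa, cont(r)))
    case (Blocked(ids, cont), Done(_))      => Blocked(ids, r => zip(cont(r), pb))
    case (Blocked(i1, c1), Blocked(i2, c2)) => Blocked(i1 ++ i2, r => zip(c1(r), c2(r)))
  }

  // Executes the plan round by round and records which IDs each round requested.
  def run[A](plan: Plan[A], source: Int => String): (A, List[Set[Int]]) = {
    @annotation.tailrec
    def loop(cur: Plan[A], rounds: List[Set[Int]]): (A, List[Set[Int]]) = cur match {
      case Done(a) => (a, rounds.reverse)
      case Blocked(ids, cont) =>
        val results = ids.map(id => id -> source(id)).toMap
        loop(cont(results), ids :: rounds)
    }
    loop(plan, Nil)
  }
}
```

In this model zip(fetch(1), fetch(2)) needs one round with both IDs, while flatMap(fetch(1))(_ => fetch(2)) needs two rounds. Because the IDs of a round form a Set, zipping two requests for the same ID also asks the source only once.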





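Within one run, Fetch deduplicates requests. A repeated request for the same ID from the same source is not sent again; the result already obtained is reused.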



For example, with two identical requests combined by tupled:



val list = List("a", "b", "c", "d", "e", "f", "g", "h", "i")
val data = new ListSource(list, 2.some)

val tupleD: Fetch[IO, (Option[String], Option[String])] = (data.fetchElem(0), data.fetchElem(0)).tupled
Fetch.run(tupleD).unsafeRunSync()
//INFO app.sources.ListSource - Processing element from index 0
//(Some(a),Some(a))


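Deduplication also applies to sequential requests, which is especially noticeable with the random source: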



def fetchMultiD: Fetch[IO, (Int, String, Int, String)] =
  for {
    rnd1 <- Fetch(3, randomSource.source)  // Fetch[IO, Int]
    char1 <- Fetch(rnd1, listSource.source)  // Fetch[IO, String]
    rnd2 <- Fetch(3, randomSource.source)  // Fetch[IO, Int]
    char2 <- Fetch(rnd2, listSource.source)  // Fetch[IO, String]
  } yield (rnd1, char1, rnd2, char2)

println(Fetch.run(fetchMultiD).unsafeRunSync)
//18:43:11.875 [scala-execution-context-global-14] INFO app.sources.RandomSource - Getting next random by max 3
//18:43:11.876 [scala-execution-context-global-13] INFO app.sources.ListSource - Processing element from index 1
//(1,b,1,b)




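Fetch defines the following exception types:



  • FetchException, the base trait;
  • MissingIdentity, raised when an ID is not found in the source;
  • UnhandledException, which wraps any other exception raised during execution.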


Besides requesting the result as an Option, you can capture the error explicitly as an Either:



// val i: String = Fetch.run(Fetch(5, data.source)).unsafeRunSync // Exception in thread "main" fetch.package$MissingIdentity

val i: Either[Throwable, String] = Fetch.run(Fetch(5, data.source)).attempt.unsafeRunSync // Left(fetch.package$MissingIdentity)


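Debugging Fetch



To debug a Fetch, run it with Fetch.runLog, which returns a FetchLog together with the result. The fetch-debug module provides the describe function, which renders either a Throwable or a Log as text.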



Describing a Throwable:



// libraryDependencies += "com.47deg" %% "fetch-debug" % "1.3.0"
import fetch.debug.describe
val t: Either[Throwable, (Log, String)] = Fetch.runLog(Fetch(5, data.source)).attempt.unsafeRunSync
println(t.fold(describe, identity))
// [ERROR] Identity with id `5` for data source `My List of Data` not found, fetch interrupted after 1 rounds
// Fetch execution 0.00 seconds
//
//    [Round 1] 0.00 seconds
//      [Fetch one] From `My List of Data` with id 5 0.00 seconds


Now a more complex Fetch (>> in Cats is shorthand for flatMap(_ => ...)):



object DebugExample extends App with ContextEntities {
  val list = List("a", "b", "c", "d", "e", "f", "g", "h")

  val listData                                = new ListSource(list)
  val listSource: DataSource[IO, Int, String] = listData.source
  val randomSource                            = new RandomSource().source

  val cacheF: DataCache[IO] = InMemoryCache.from((listData, 1) -> "b")

  // already in the cache, so it is not requested
  val cached = Fetch(1, listSource)

  // round #1, not in the cache
  val notCached = Fetch(2, listSource)

  // #2
  val random = Fetch(10, randomSource)

  // #3
  val batched: Fetch[IO, (String, String)] = (Fetch(3, listSource), Fetch(4, listSource)).tupled

  // #4
  val combined = (Fetch(5, listSource), Fetch(150, randomSource)).tupled

  /** End of fetches */
  val complicatedFetch: Fetch[IO, (String, Int)] = cached >> notCached >> random >> notCached >> batched >> combined
  val result: IO[(Log, (String, Int))]           = Fetch.runLog(complicatedFetch, cacheF)
  val tuple: (Log, (String, Int))                = result.unsafeRunSync()

  println(tuple._2) // (f,17)
  println(describe(tuple._1))
  println(tuple._1)

  //Fetch execution 0.11 seconds
  //
  //  [Round 1] 0.06 seconds
  //    [Fetch one] From `My List of Data` with id 2 0.06 seconds
  //  [Round 2] 0.00 seconds
  //    [Fetch one] From `Random numbers generator` with id 10 0.00 seconds
  //  [Round 3]  0.01 seconds
  //    [Batch] From `My List of Data` with ids List(3, 4)  0.01 seconds
  //  [Round 4]  0.00 seconds
  //    [Fetch one] From `Random numbers generator` with id 150  0.00 seconds
  //    [Fetch one] From `My List of Data` with id 5  0.00 seconds

  // raw:
  // FetchLog(Queue(Round(List(Request(FetchOne(2,app.sources.ListSource@ea6147e),10767139,10767203))), Round(List(Request(FetchOne(10,app.sources.RandomSource@58b31054),10767211,10767213))), Round(List(Request(Batch(NonEmptyList(3, 4),app.sources.ListSource@ea6147e),10767234,10767242))), Round(List(Request(FetchOne(150,app.sources.RandomSource@58b31054),10767252,10767252), Request(FetchOne(5,app.sources.ListSource@ea6147e),10767252,10767252)))))
}


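What the log shows:



  • cached does not appear at all, because its value came from the cache;
  • notCached is requested only once. The second occurrence is deduplicated;
  • the two requests to the list source were combined into a batch;
  • the requests to different sources in the last round ran in parallel.


The rounds run one after another because the parts are chained with >>. Requests 3 and 4 are combined with tupled, so they share a round and, having the same source, form a batch. In the last round the two requests go to different sources, so they are separate requests within one round.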



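A practical example:



Suppose there is a document search service. For a text query it has to:



  • find the IDs of matching documents with a full-text search;
  • load the information about each document;
  • load the authors of each document by ID;
  • find similar documents for each document.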


The implementation (the source classes themselves are omitted):



/*Model*/

type DocumentId = String
type PersonId = Int // Int, to match the person IDs used in the test data

case class FtsResponse(ids: List[DocumentId])

case class SimilarityItem(id: DocumentId, similarity: Double)

case class DocumentInfo(id: DocumentId, info: String, authors: List[PersonId])

case class Person(id: PersonId, fullTitle: String)

/*Response*/
case class DocumentSearchResponse(
    items: List[DocumentSearchItem]
)

case class DocumentItem(id: DocumentId, info: Option[String], authors: List[Person])

case class DocumentSimilarItem(
    item: DocumentItem,
    similarity: Double
)

case class DocumentSearchItem(
    item: DocumentItem,
    similar: List[DocumentSimilarItem]
)

class DocumentSearchExample(
    fts: Fts[IO],
    documentInfoRepo: DocumentInfoRepo[IO],
    vectorSearch: VectorSearch[IO],
    personRepo: PersonRepo[IO]
)(
    implicit cs: ContextShift[IO]
) {

  val infoSource    = new DocumentInfoSource(documentInfoRepo, 16.some)
  val personSource  = new PersonSource(personRepo, 16.some)
  val similarSource = new SimilarDocumentSource(vectorSearch, 16.some)

  def documentItemFetch(id: DocumentId): Fetch[IO, DocumentItem] =
    for {
      infoOpt <- infoSource.fetchElem(id)
      p       <- infoOpt.traverse(i => i.authors.traverse(personSource.fetchElem).map(_.flatten))
    } yield DocumentItem(id, infoOpt.map(_.info), p.getOrElse(List.empty[Person]))

  def fetchSimilarItems(id: DocumentId): Fetch[IO, List[DocumentSimilarItem]] =
    similarSource
      .fetchElem(id)
      .map(_.getOrElse(List.empty[SimilarityItem]))
      .flatMap {
        _.traverse { si =>
          documentItemFetch(si.id).map { di =>
            DocumentSimilarItem(di, si.similarity)
          }
        }
      }

  def searchDocumentFetch(query: String): Fetch[IO, DocumentSearchResponse] =
    for {
      docs <- Fetch.liftF(fts.search(query))
      items <- docs.ids.traverse { id =>
                (documentItemFetch(id), fetchSimilarItems(id)).tupled.map(r => DocumentSearchItem(r._1, r._2))
              }
    } yield DocumentSearchResponse(items)

}


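DocumentSearchExample receives its dependencies through the constructor. These are ordinary repositories and services that could be implemented, for example, with Doobie. For Fetch they are wrapped in sources: DocumentInfoSource, PersonSource and SimilarDocumentSource, built in the same way as the sources shown earlier.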



searchDocumentFetch takes the query. Full-text search is not a lookup by ID, so it is not a Fetch source. It is a plain method, search:



def search(query: String): F[FtsResponse]


liftF lifts its result into a Fetch[F, FtsResponse], so that it can take part in a for comprehension with other Fetch values.



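Then, for each ID found, the document itself and the documents similar to it are requested. The two Fetch values are combined with tupled, which marks them as independent so they can be executed together. The results are assembled into a DocumentSearchItem.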



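fetchSimilarItems first gets the IDs of similar documents from similarSource, and then loads each of them with documentItemFetch. documentItemFetch in turn gets the DocumentInfo by ID and then loads its authors.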



To avoid a nested Fetch[IO, Fetch[...]], the first step uses map and the second uses flatMap:



similarSource
  .fetchElem(id)
  .map(_.getOrElse(List.empty[SimilarityItem]))  // Fetch
  .flatMap { 
    _.traverse { si =>
      documentItemFetch(si.id).map { di =>  // Fetch
        DocumentSimilarItem(di, si.similarity)
      }
    }
  }
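The same nesting problem can be reproduced with Option from the standard library, which makes the choice between map and flatMap easy to see in isolation. NestingSketch and half are hypothetical names used only for this illustration.

```scala
object NestingSketch {
  // A step that itself returns a wrapped value, like documentItemFetch returns a Fetch.
  def half(n: Int): Option[Int] = if (n % 2 == 0) Some(n / 2) else None

  val nested: Option[Option[Int]] = Some(8).map(half)     // Some(Some(4)): one wrapper too many
  val flat: Option[Int]           = Some(8).flatMap(half) // Some(4)
}
```

map is the right choice when the function returns a plain value, and flatMap when it returns another wrapped value.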


The test data:



private val docInfo = Map(
  "1" -> DocumentInfo("1", "Document 1", List(1)),
  "2" -> DocumentInfo("2", "Document 2", List(1,2)),
  "3" -> DocumentInfo("3", "Document 3", List(3,1)),
  "4" -> DocumentInfo("4", "Document 4", List(2,1)),
  "5" -> DocumentInfo("5", "Document 5", List(2)),
  "6" -> DocumentInfo("6", "Document 6", List(1,3))
)

private val similars = Map(
  "1" -> List(SimilarityItem("2", 0.7), SimilarityItem("3", 0.6)),
  "2" -> List(SimilarityItem("1", 0.7)),
  "3" -> List(SimilarityItem("1", 0.6)),
  "4" -> List(),
  "5" -> List(SimilarityItem("6", 0.5)),
  "6" -> List(SimilarityItem("5", 0.5))
)

private val persons = Map(
  1 -> Person(1, "Rick Deckard"),
  2 -> Person(2, "Roy Batty"),
  3 -> Person(3, "Joe")
)


With this data, a query that matches all six documents produces the following source logs:



INFO app.searchfetchproto.source.SimilarDocumentSource - Fetching similar documents for ID: 2. It is: Some(List(1))
INFO app.searchfetchproto.source.SimilarDocumentSource - Fetching similar documents for ID: 4. It is: None
INFO app.searchfetchproto.source.SimilarDocumentSource - Fetching similar documents for ID: 3. It is: Some(List(1))
INFO app.searchfetchproto.source.SimilarDocumentSource - Fetching similar documents for ID: 1. It is: Some(List(2, 3))
INFO app.searchfetchproto.source.SimilarDocumentSource - Fetching similar documents for ID: 6. It is: Some(List(5))
INFO app.searchfetchproto.source.SimilarDocumentSource - Fetching similar documents for ID: 5. It is: Some(List(6))
INFO app.searchfetchproto.source.DocumentInfoSource - Document IDs fetching in batch: NonEmptyList(4, 5, 2, 3, 6, 1)
INFO app.searchfetchproto.source.PersonSource - Person IDs fetching in batch: NonEmptyList(1, 2, 3)


The Fetch log shows that the requests to every source were combined into a batch:



[Round 1]  0.12 seconds
    [Batch] From `Similar Document Source` with ids List(4, 5, 2, 3, 6, 1) 0.06 seconds
    [Batch] From `Document Info Source` with ids List(4, 5, 2, 3, 6, 1) 0.12 seconds
  [Round 2]  0.00 seconds
    [Batch] From `Persons source` with ids List(1, 2, 3)  0.00 seconds


Recall this line from searchDocumentFetch:



(documentItemFetch(id), fetchSimilarItems(id)).tupled


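The two parts are combined with tupled. The first steps of fetchSimilarItems and documentItemFetch do not depend on each other, so they were executed in the same round: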



[Batch] From `Similar Document Source` with ids List(4, 5, 2, 3, 6, 1) 0.06 seconds
[Batch] From `Document Info Source` with ids List(4, 5, 2, 3, 6, 1) 0.12 seconds


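In this run the document information was requested only once, although fetchSimilarItems also calls documentItemFetch for the similar documents. The outcome depends on timing. With a delay added to a source (for example, Thread.sleep(100) in one of them), the log changes: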



INFO app.searchfetchproto.source.DocumentInfoSource - Document IDs fetching in batch: NonEmptyList(4, 5, 2, 3, 6, 1)
INFO app.searchfetchproto.source.SimilarDocumentSource - Fetching similar documents for ID: 4. It is: None
INFO app.searchfetchproto.source.SimilarDocumentSource - Fetching similar documents for ID: 5. It is: Some(List(6))
INFO app.searchfetchproto.source.SimilarDocumentSource - Fetching similar documents for ID: 3. It is: Some(List(1))
INFO app.searchfetchproto.source.SimilarDocumentSource - Fetching similar documents for ID: 2. It is: Some(List(1))
INFO app.searchfetchproto.source.SimilarDocumentSource - Fetching similar documents for ID: 6. It is: Some(List(5))
INFO app.searchfetchproto.source.SimilarDocumentSource - Fetching similar documents for ID: 1. It is: Some(List(2, 3))
INFO app.searchfetchproto.source.DocumentInfoSource - Document IDs fetching in batch: NonEmptyList(5, 2, 3, 6, 1)

  [Round 1]  0.13 seconds
    [Batch] From `Similar Document Source` with ids List(4, 5, 2, 3, 6, 1)  0.13 seconds
    [Batch] From `Document Info Source` with ids List(4, 5, 2, 3, 6, 1)  0.08 seconds
  [Round 2]  0.00 seconds
    [Batch] From `Document Info Source` with ids List(5, 2, 3, 6, 1)  0.00 seconds
    [Batch] From `Persons source` with ids List(1, 2, 3)  0.00 seconds


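This time the second round requests document information again, for the documents needed by fetchSimilarItems. So when the same data is requested in consecutive rounds, do not assume that it will always be fetched only once.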





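Fetch is a small library with a clear purpose. It takes care of batching, parallelism, deduplication and caching, so this logic stays out of the business code. Because it is built on Cats, Fetch fits well with other Scala libraries such as Doobie and fs2.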





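Related libraries:

  • ZQuery: a similar library that is built on ZIO rather than Cats;
  • Clump: a library similar to Fetch, dating from 2015;
  • Haxl: the original library, for Haskell.


ZQuery and Fetch are both based on the paper “There is no Fork: an Abstraction for Efficient, Concurrent, and Concise Data Access” (https://simonmar.github.io/bib/papers/haxl-icfp14.pdf), which describes the approach behind Haxl.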







