Cómo cargar rápidamente una tabla grande en Apache Ignite a través de la API de valor clave

Hace algún tiempo, la plataforma Apache Ignite emergió en el horizonte y comenzó a ganar popularidad. La computación en memoria es velocidad, lo que significa que la velocidad debe garantizarse en todas las etapas del trabajo, especialmente al cargar datos.



Debajo del corte se encuentra una descripción de un método para cargar rápidamente datos desde una tabla relacional en un clúster Apache Ignite distribuido. Se describe el preprocesamiento del conjunto de resultados de la consulta SQL en el nodo cliente del clúster y la distribución de datos en el clúster mediante la tarea de reducción de mapas. Describe las cachés y las tablas relacionales relacionadas, muestra cómo crear un objeto personalizado a partir de una fila de la tabla y cómo usar ComputeTaskAdapter para colocar rápidamente los objetos creados. Todo el código se puede ver en su totalidad en el repositorio FastDataLoad .



Historia del problema



Este texto es una traducción al ruso de mi publicación en el blog In-Memory Computing en el sitio web GridGain.



Entonces, cierta compañía decide acelerar una aplicación lenta moviendo la computación a un clúster en memoria. Los datos iniciales para los cálculos están en MS SQL; el resultado de los cálculos debe colocarse allí. El cluster está distribuido, dado que ya hay muchos datos, el rendimiento de la aplicación está al límite y el volumen de datos está creciendo. Se establecen límites de tiempo difíciles.



Antes de escribir código rápido para procesar datos, los datos deben cargarse rápidamente. Una búsqueda frenética en la web revela una clara escasez de ejemplos de código que pueden escalar a tablas de decenas o cientos de millones de filas. Ejemplos que puede descargar, compilar y seguir los pasos de la depuración. Esto es por un lado.



, Apache Ignite / GridGain, . , . " ?", — , .

, .



(World Database)



, data collocation, . world.sql Apache Ignite.



CSV , — SQL :





countryCache country.csv. countryCache — code, — String, — Country, (name, continent, region).







, — , . Country , . org.h2.tools.Csv, CSV java.sql.ResultSet. Apache Ignite , SQL H2.



    // define countryCache
    IgniteCache<String,Country> cache = ignite.cache("countryCache");

    try (ResultSet rs = new Csv().read(csvFileName, null, null)) {
     while (rs.next()) {
      String code = rs.getString("Code");
      String name = rs.getString("Name");
      String continent = rs.getString("Continent");
      Country country = new Country(code,name,continent);
      cache.put(code,country);
     }
    }


. , , . - .



, . , .





Apache Ignite — -. , PARTITIONED - (partition) . ; , . -, affinity function, , .



ResultSet . , . .





, :



  • HashMap partition_number -> key -> Value

    Map<Integer, Map<String, Country>> result = new HashMap<>();
  • affinity function partition_number. cache.put() - HashMap partition_number

    try (ResultSet rs = new Csv().read(csvFileName, null, null)) {
     while (rs.next()) {
      String code = rs.getString("Code");
      String name = rs.getString("Name");
      String continent = rs.getString("Continent");
      Country country = new Country(code,name,continent);
      result.computeIfAbsent(affinity.partition(key), k -> new HashMap<>()).put(code,country);
     }
    }


ComputeTaskAdapter ComputeJobAdapter. ComputeJobAdapter 1024. , .



ComputeJobAdapter . , .



Compute Task,



, "ComputeTaskAdapter initiates the simplified, in-memory, map-reduce process". ComputeJobAdapter map — , . reduce — .



(RenewLocalCacheJob)



, RenewLocalCacheJob .



targetCache.putAll(addend);


RenewLocalCacheJob partition_number .



(AbstractLoadTask)



( loader) — AbstractLoadTask. . ( ), AbstractLoadTask TargetCacheKeyType. HashMap



    Map<Integer, Map<TargetCacheKeyType, BinaryObject>> result;


countryCache String. . AbstractLoadTask TargetCacheKeyType, BinaryObject. , — .



BinaryObject



— . , JVM, - . class definition , JAR- . Country



    IgniteCache<String, Country> countryCache;


, , classpath ClassNotFound.



. — classpath, :



  • JAR- ;
  • classpath ;
  • ;
  • .


BinaryObject () . :





  • IgniteCache<String, BinaryObject> countryCache;    
  • Country BinaryObject (. LoadCountries.java)

    Country country = new Country(code, name, .. );    
    BinaryObject binCountry = node.binary().toBinary(country);    
  • HashMap, BinaryObject

    Map<Integer, Map<String, BinaryObject>> result


, . , , ClassNotFoundException .



. .



Apache Ignite : .





default-config.xml — . :



  • GridGain CE Installing Using ZIP Archive. 8.7.10, FastDataLoad , ;
  • {gridgain}\config default-config.xml



    <bean id="grid.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
        <property name="peerClassLoadingEnabled" value="true"/>
    </bean>
  • , {gridgain}\bin ignite.bat. ; ;
  • , . ,

    [08:40:04] Topology snapshot [ver=2, locNode=d52b1db3, servers=2, clients=0, state=ACTIVE, CPUs=8, offheap=3.2GB, heap=2.0GB]


. , 8.7.25, pom.xml



    <gridgain.version>8.7.25</gridgain.version>




class org.apache.ignite.spi.IgniteSpiException: Local node and remote node have different version numbers (node will not join, Ignite does not support rolling updates, so versions must be exactly the same) [locBuildVer=8.7.25, rmtBuildVer=8.7.10]




, , map-reduce. — JAR-, compute task . Windows, Linux.

:



  • FastDataLoad;
  • ;

    mvn clean package
  • , .

    java -jar .\target\fastDataLoad.jar


main() LoadApp LoaderAgrument . map-reduce LoadCountries.

LoadCountries RenewLocalCacheJob , ( ).



#1





#2









country.csv , CountryCode . cityCache countryLanguageCache; , .





.



.



:



  • (SQL Server Management Studio):

    • — 44 686 837;
    • — 1.071 GB;
  • — 0H:1M:35S;
  • RenewLocalCacheJob reduce — 0H:0M:9S.


Se necesita menos tiempo para distribuir datos en un clúster que para ejecutar una consulta SQL.




All Articles