Movimiento kafkariano o cómo mover particiones

Introducción

¡Hola, Habr! Trabajo como programador java en una organización financiera. Decidí dejar mi huella en Habré y escribir mi primer artículo. Debido a problemas con la presencia de devops, se me encomendó la tarea de actualizar el clúster kafka de 2.0 a 2.6 sin tiempo de inactividad y pérdida de mensajes (ya sabes, a nadie le gusta cuando el dinero está en el aire o se pierde en algún lugar). Me gustaría compartir esta experiencia con ustedes y recibir comentarios constructivos. Entonces, suficiente agua, manos a la obra.





Esquema de migración

La tarea se complicó por el hecho de que era necesario migrar de máquinas virtuales antiguas a nuevas, por lo que la opción de apagar el broker, cambiar los binarios e iniciarlo no me convenía.





A continuación se muestra un diagrama de la migración. Tenemos 3 corredores en máquinas virtuales antiguas, 3 corredores en máquinas virtuales nuevas y cada corredor tiene su propio cuidador del zoológico.





Plan de migración

Para que podamos migrar sin que nadie lo note, tendremos que sudar "un poco". A continuación se muestra un esquema general.





  1. Agregar direcciones de nuevos corredores a la configuración de la aplicación





  2. Perforar el acceso entre todos y todo





  3. Prepare la infraestructura en nuevas máquinas virtuales





  4. Crea un nuevo grupo de zookieepers y combínalo con el anterior





  5. Crea nuevos corredores de kafka





  6. Migre todas las particiones de los corredores antiguos a los nuevos





  7. Desactive los corredores de Kafka antiguos y los zookieepers antiguos





  8. Elimina los corredores y zukipers antiguos de las nuevas configuraciones









. , , . "bootstrap.servers"





old-server1:9092,old-server2:9092,old-server3:9092,new-server4:9092,new-server5:9092,new-server6:9092







- . , . , .





  1. , 9092 -> ( 3 )





  2. <----> (+18 )





  3. 9092 (+6 )





  4. 2 3 : 2181, 2888, 3888 ( (18+6)*3 = 72)





: 99 . ! .





100 ,









, .





kafka





- kafka. , /etc/security/limits.conf





kafka hard nofile 262144
kafka soft nofile 262144
      
      



, , , .





, 262144, , ( ). 10 , .









java





/home/kafka/.bash_profile





export JAVA_HOME=/opt/java
export PATH=JAVA_HOME/bin:PATH
      
      



jre /opt/java









, , . .





setup.sh
tar -xf ../jdk1.8.0_181.tar.gz -C /opt/
mv /home/kafka/kafka /opt
mv /home/kafka/zookeeper /opt
mv /home/kafka/kafka-start.sh /opt
mv /home/kafka/scripts /opt

ln -sfn /opt/kafka/kafka_2.13-2.6.0 /opt/kafka/current
ln -sfn /opt/zookeeper/zookeeper-3.6.2 /opt/zookeeper/current
ln -sfn /opt/jdk1.8.0_181/ /opt/java

chown -R kafka:kafka /opt
chmod -R 700 /opt

#env_var start ------------------------------->

kafkaProfile=/home/kafka/.bash_profile

homeVar="export JAVA_HOME=/opt/java"
javaHome=$(cat $kafkaProfile | grep "$homeVar")


if [ "$javaHome" != "$homeVar" ]; then
    echo -e "\n$homeVar\n" >> $kafkaProfile
fi


pathVar="export PATH=\$JAVA_HOME/bin:\$PATH"
path=$(cat $kafkaProfile | grep "$pathVar")

if [ "$path" != "$pathVar" ]; then
    echo -e "\n$pathVar\n" >> $kafkaProfile
fi

#env_var end --------------------------------->




#ulimit start >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>

limitsFile=/etc/security/limits.conf

soft="kafka soft nofile 262144"
limitSoft=$(cat $limitsFile | grep "$soft")

if [ "$limitSoft" != "$soft" ]; then
    echo -e "\n$soft\n" >> $limitsFile
fi


hard="kafka hard nofile 262144"
limitHard=$(cat $limitsFile | grep "$hard")

if [ "$limitHard" != "$hard" ]; then
    echo -e "\n$hard\n" >> $limitsFile
fi

#ulimit end >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
      
      







kafka





ulimit -n  = 262144
echo $JAVA_HOME  = /opt/java
echo $PATH    /opt/java/bin
      
      



, , , , myid, id .





/opt/zookeeper/current/conf/zoo.cfg





zoo.cfg
dataDir=/opt/zookeeper/zookeeper-data
server.1=server1:2888:3888
server.2=server2:2888:3888
server.3=server3:2888:3888

server.4=server4:2888:3888
server.5=server5:2888:3888
server.6=server6:2888:3888
      
      







/opt/zookeeper/current/bin/zkServer.sh start





.





zk-add-new-servers.sh
echo -e "\nserver.4=server4:2888:3888" >> /opt/zookeeper/zookeeper-3.4.13/conf/zoo.cfg
echo -e "\nserver.5=server5:2888:3888" >> /opt/zookeeper/zookeeper-3.4.13/conf/zoo.cfg
echo -e "\nserver.6=server6:2888:3888" >> /opt/zookeeper/zookeeper-3.4.13/conf/zoo.cfg
      
      







, . .





/opt/zookeeper/zookeeper-3.4.13/bin/zkServer.sh restart
      
      



, .





.../zkServer.sh status
      
      



.





, server.properties broker.id zookeeper.connect





4





broker.id=4
zookeeper.connect=server4:2181,server5:2181,server6:2181/cluster_name
      
      







, , .





server.properties (2.0.0). .





inter.broker.protocol.version=2.0.0
      
      







nohup /opt/kafka/current/bin/kafka-server-start.sh /opt/kafka/config/server.properties > log.log 2>&1 &
      
      



. cli .





/opt/zookeeper/current/bin/zkCli.sh
      
      







ls /cluster_name/brokers/ids
      
      







[1,2,3,4,5,6]
      
      



6 , .





. - , , , . replication.factor . . .





, "", , .





json , .





{"topics": [{"topic": "foo1"},
              {"topic": "foo2"}],
  "version":1
 }
      
      



,





json kafka-reassign-partitions.sh --generate id --broker-list "4,5,6".









generate1.json
{"version":1,
  "partitions":[{"topic":"foo1","partition":2,"replicas":[1,2,3]},
                {"topic":"foo1","partition":0,"replicas":[3,2,1]},
                {"topic":"foo2","partition":2,"replicas":[1,2,3]},
                {"topic":"foo2","partition":0,"replicas":[3,2,1]},
                {"topic":"foo1","partition":1,"replicas":[2,3,1]},
                {"topic":"foo2","partition":1,"replicas":[2,3,1]}]
  }

  Proposed partition reassignment configuration

  {"version":1,
  "partitions":[{"topic":"foo1","partition":2,"replicas":[5,4,6]},
                {"topic":"foo1","partition":0,"replicas":[4,5,6]},
                {"topic":"foo2","partition":2,"replicas":[6,4,5]},
                {"topic":"foo2","partition":0,"replicas":[4,5,6]},
                {"topic":"foo1","partition":1,"replicas":[5,4,6]},
                {"topic":"foo2","partition":1,"replicas":[4,5,6]}]
  }
      
      







Proposed partition reassignment configuration ( ). - . , json.





. "" , . kafka-reassign-helper.jar .









prepare-for-reassignment.sh
#       .    20
if [ "$#" -eq 0 ]
then
    echo "no arguments"
    exit 1
fi


echo "Start reassignment preparing"

/opt/kafka/current/bin/kafka-topics.sh --list --zookeeper localhost:2181/cluster_name >> topics.txt

echo created topics.txt

java -jar kafka-reassign-helper.jar generate topics.txt $1

fileCount=$(ls -dq generate*.json | wc -l)

echo "created $fileCount file for topics to move"

echo -e "\nCreating generated files\n"

mkdir -p generated
for ((i = 1; i < $fileCount+1; i++ ))
do
/opt/kafka/current/bin/kafka-reassign-partitions.sh --zookeeper localhost:2181/cluster_name --broker-list "4,5,6" --topics-to-move-json-file "generate$i.json" --generate >> "generated/generated$i.txt"
echo "generated/generated$i.txt" created
done

echo -e "\nCreating execute/rollback files"

java -jar kafka-reassign-helper.jar execute $fileCount

echo -e "\nexecute/rollback files created"

echo -e "\nPreparing finished successfully!"
      
      







kafka-reassign-partitions.sh, --execute





move-partitions.sh
#     execute1.json, execute2.json ....
if [ "$#" -eq 0 ]
then
    echo "no arguments"
    exit 1
fi
        

/opt/kafka/current/bin/kafka-reassign-partitions.sh --zookeeper localhost:2181/cluster_name --reassignment-json-file "execute/execute$1.json" --execute
      
      







. , . , , kafka-reassign-partitions.sh, --verify, .





, , .





reassign-verify.sh
progress=-1

while [ $progress != 0 ]
do

    progress=$(/opt/kafka/current/bin/kafka-reassign-partitions.sh --zookeeper localhost:2181/cluster_name --reassignment-json-file execute/execute$1.json --verify | grep "in progress" -c)
    complete=$(/opt/kafka/current/bin/kafka-reassign-partitions.sh --zookeeper localhost:2181/cluster_name --reassignment-json-file execute/execute$1.json --verify | grep "is complete" -c)
    failed=$(/opt/kafka/current/bin/kafka-reassign-partitions.sh --zookeeper localhost:2181/cluster_name --reassignment-json-file execute/execute$1.json --verify | grep "failed" -c)

    echo "In progress:" $progress;
    echo "Is complete:" $complete;
    echo "Failed:" $failed;

    sleep 2s

done
      
      







In progress 0. Is complete - . Failed 0.





move-partitions.sh reassign-verify.sh , .





log.dirs , - .





. kafka-server-stop.sh zkStop.sh .





. zoo.cfg





zk-remove-old-servers.sh
sed -i '/server.1/d' /opt/zookeeper/current/conf/zoo.cfg
sed -i '/server.2/d' /opt/zookeeper/current/conf/zoo.cfg
sed -i '/server.3/d' /opt/zookeeper/current/conf/zoo.cfg
      
      







, , , . : 2 1 .





server.properties





remove-old-protocol.sh
sed -i '/inter.broker.protocol.version=2.0.0/d' /opt/kafka/config/server.properties
      
      







, , insync, min.insync.replicas ( 2), default.replication.factor ( 3)





- 2 , 3, , , , insync .









check-insync.sh
input="check-topics.txt"
rm -f $input

/opt/kafka/current/bin/kafka-topics.sh --list --zookeeper localhost:2181/cluster_name >> check-topics.txt

checkPerIter=100
i=0
list=""
notInsync=0
while IFS= read -r line
do
 ((i=i+1))
 list+="${line}|"
 if [ $i -eq $checkPerIter ]
  then
   list=${list::${#list}-1}
   echo "checking $list"
   count=$(/opt/kafka/current/bin/kafka-topics.sh --describe --topic $list --zookeeper localhost:2181/cluster_name | egrep "Isr: [4-6/,]{3}$" -c)
   if [ "$count" -ne 0 ]
    then
     /opt/kafka/current/bin/kafka-topics.sh --describe --topic $list --zookeeper localhost:2181/cluster_name | egrep "Isr: [4-6/,]{3}$"
   fi
   ((notInsync=notInsync+count))
   list=""
   i=0
 fi
done < "$input"

echo "not insync: $notInsync"
      
      







Si no obtenemos insync: 0 en la salida , podemos reiniciar los corredores uno por uno.





Eso es todo. La migración de corredores ahora está completa, excepto por la posterior reconfiguración del monitoreo y otros asuntos auxiliares.





Así es como se ve la instrucción, que envié a los administradores que lo hicieron todo en la batalla. Sorprendentemente, lo hicieron la primera vez y no hicieron preguntas.





README.txt
    2.0.0 -> 2.6.0

1 .  

1.1       

NEW4.tar.gz ->  4 
migration.tar.gz ->  4 

NEW5.tar.gz ->  5 
NEW6.tar.gz ->  6 

1.2  

tar -xf NEW4.tar.gz -C /home/kafka
tar -xf NEW5.tar.gz -C /home/kafka
tar -xf NEW6.tar.gz -C /home/kafka

1.3   root       (    ,   )

/home/kafka/scripts/setup.sh

1.4    kafka (   )

1.5     (   )

ulimit -n  = 262144
echo $JAVA_HOME  = /opt/java
echo $PATH    /opt/java/bin

  /opt   kafka kafka



2 .    

2.1    kafka (   )

2.2       (  )

/opt/scripts/zkStart.sh

2.3       

OLD1.tar.gz ->  1 
OLD2.tar.gz ->  2 
OLD3.tar.gz ->  3 

2.4  

tar -xf OLD1.tar.gz -C /home/kafka
tar -xf OLD2.tar.gz -C /home/kafka
tar -xf OLD3.tar.gz -C /home/kafka

2.5   root       (    )

/home/kafka/scripts/setup-old.sh

2.6    kafka    

2.7       (      )

/home/kafka/scripts/zk-add-new-servers.sh

2.8       (  )

/home/kafka/scripts/zkStatus.sh

      

2.9       
    

/home/kafka/scripts/zkRestart.sh

2.10       
     

/home/kafka/scripts/zkStatus.sh ( )
/opt/scripts/zkStatus.sh ( )

        follower
  leader

2.11   
    
/opt/kafka-start.sh

2.12      
   

/home/kafka/migration/zkCli.sh

ls /cluster_name/brokers/ids

   [1,2,3,4,5,6]

3      

3.1   (    )
/home/kafka/migration/prepare-for-assignment.sh  20

3.2           /home/kafka/migration/execute


:
  execute1.json
/home/kafka/migration/move-partitions.sh 1

      

:
/home/kafka/migration/reassign-verify.sh 1

     "in progress"  0        .3.2   

3.3          ,     /opt/kafka/kafka-data    
     

3.4       ,  
/opt/kafka/current/bin/kafka-server-stop.sh

3.5       ,  

/home/kafka/scripts/zkStop.sh

3.6      (      )
/opt/scripts/zk-remove-old-servers.sh

3.7     

/opt/scripts/zkRestart.sh

3.8       ( , 2 )

/opt/scripts/zkStatus.sh

3.9      (       )
/opt/scripts/remove-old-protocol.sh


3.10       insync

    /home/kafka/migration/check-insync.sh

not insync    0

3.11       

/opt/kafka/current/bin/kafka-server-stop.sh

    (ps aux | grep kafka    )

 /opt/kafka-start.sh

  3.10  

 !
      
      







Espero que haya ayudado a alguien en este tema. Estoy esperando sus comentarios y observaciones.





Todos los scripts e instrucciones, incluidos los de reversiones, se pueden encontrar aquí.








All Articles