Descarga multiproceso de archivos con script ftp python

¿Por qué es necesario?

Una vez me enfrenté a la tarea de copiar una gran cantidad de archivos de un servidor ftp. Era necesario hacer una copia de seguridad. ¡Parece que podría ser más fácil! Pero, por desgracia, no se pudo encontrar nada listo para trabajar tan rápido para mis condiciones.





Situación

Era necesario recuperar periódicamente un par de cientos de archivos del servidor ftp en Windows. Muchas cosas pequeñas y algunos archivos muy grandes. Un total de unos 500 GB. El servidor es un vps ubicado bastante lejos en el extranjero. Durante el día el auto está muy cargado, temprano en la noche se realiza un mantenimiento de rutina, en total hay 5 horas para descargar como máximo.





Ninguna de las empresas de servicios públicos que he revisado pudo hacer frente de manera eficiente y en el tiempo asignado. Bueno, no hay ningún lugar para ir, un sistema de copia de seguridad normal aún no se ha comprado, lo que significa que armamos nosotros mismos con un editor o IDE Python y listo! ¡Aventuras!





Config

Pondremos todos los parámetros para el script en un archivo separado para mayor comodidad.





Plantilla de configuración:





host = 'ip.ip.ip.ip'
user = 'ftpusername'
passwd = 'ftppassword'
basepath = '/path/to/backup/folder'  # ,        
max_threads = 20 #     
log_path = '\path\to\logfile'
statusfilepath = '\path\to\statusfile'
      
      



Guardamos la configuración con la extensión .py y la importamos al comienzo de nuestro script. Puede importar directamente al espacio de nombres del script, pero hice la construcción un poco como una muleta en la parte principal de mi script:





if __name__ == "__main__":
    host = config.host
    user = config.user
    passwd = config.passwd
    basepath = config.basepath  # ,        
    max_threads = config.max_threads
    log_path = config.log_path
    statusfilepath = config.statusfilepath
    main()
      
      



Al principio había una lista

ftp , , , ftp- . , , - .





. , , , -.





- ftp:





class MyFtp (ftplib.FTP):
    """  ,        """
    def __init__(self):
        self.host = host
        self.user = user
        self.passwd = passwd
        self.timeout = 1800
        super(MyFtp, self).__init__()

    def connect(self):
        super(MyFtp, self).connect(self.host, timeout=self.timeout)

    def login(self):
        super(MyFtp, self).login(user=self.user, passwd=self.passwd)

    def quit(self):
        super(MyFtp,self).quit()
      
      



. ftplib, .





:





class FileList:
    """      """
    def __init__(self):
        self.ftp = None
        self.file_list = []

    def connect_ftp(self):
        import sys
        self.ftp = MyFtp()
        self.ftp.connect()
        self.ftp.login()
        self.ftp.__class__.encoding = sys.getfilesystemencoding()

    def get_list(self, name):
        """       ftp-."""
        import os
        for dirname in self.ftp.mlsd(str(name), facts=["type"]):
            if dirname[1]["type"] == "file":
                entry_file_list = {}
                entry_file_list['remote_path'] = name  #  
                entry_file_list['filename'] = dirname[0]  # 
                self.file_list.append(entry_file_list)
            else:
                path = os.path.join(name, dirname[0])
                self.get_list(path)

    def get_next_file(self):
        return self.file_list.pop()

    def len(self):
        return len(self.file_list)
      
      



, , , , , .





logging. , .





class MyLogger:
    """   """
    def __init__(self):
        self.logger = None

    def start_file_logging(self, logger_name, log_path):
        """   """
        import logging
        self.logger = logging.getLogger(logger_name)
        self.logger.setLevel(logging.INFO)
        try:
            fh = logging.FileHandler(log_path)
        except FileNotFoundError:
            log_path = "downloader.log"
            fh = logging.FileHandler(log_path)
        formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        fh.setFormatter(formatter)
        self.logger.addHandler(fh)

    def start_rotate_logging(self, logger_name, log_path, max_bytes=104857600, story_backup=5):
        """     """
        import logging
        from logging.handlers import RotatingFileHandler
        self.logger = logging.getLogger(logger_name)
        self.logger.setLevel(logging.INFO)
        try:
            fh = RotatingFileHandler(log_path, maxBytes=max_bytes, backupCount=story_backup)
        except FileNotFoundError:
            log_path = "downloader.log"
            fh = RotatingFileHandler(log_path, maxBytes=max_bytes, backupCount=story_backup)
        formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        fh.setFormatter(formatter)
        self.logger.addHandler(fh)

    def add(self, msg):
        self.logger.info(str(msg))

    def add_error(self, msg):
        self.logger.error(str(msg))

      
      



, .





. , :





class BaseFileDownload(threading.Thread):
    """     """
    count = 0

    def __init__(self, rpath, filename, log):
        threading.Thread.__init__(self)
        self.remote_path = rpath
        self.filename = filename
        self.ftp = None
        self.command = None
        self.currentpath = None
        self.log = log
        self.__class__.count += 1 #     

    def __del__(self):
        self.__class__.count -= 1

    def connect(self):
        """    ftp"""
        import sys
        self.ftp = MyFtp()
        self.ftp.connect()
        self.ftp.login()
        self.ftp.__class__.encoding = sys.getfilesystemencoding()


    def run(self):
        """  """
        import os
        self.connect()
        self.command = str(bytes('RETR ', encoding='latin-1'), encoding='utf-8')
        self.currentpath = os.path.join(basepath, self.remote_path[3:])
        self.ftp.cwd(self.remote_path)
        if not os.path.exists(self.currentpath):
            os.makedirs(self.currentpath, exist_ok=True)
        self.host_file = os.path.join(self.currentpath, self.filename)
        try:
            with open(self.host_file, 'wb') as local_file:
                self.log.add("Start downloading " + self.filename)
                self.ftp.retrbinary(self.command + self.filename, local_file.write)
                self.log.add("Downloading " + self.filename + " complete")
        except ftplib.error_perm:
            self.log.add_error('Perm error')
        self.ftp.quit()
      
      



count. : , , , .





run - threading ( !), .





, os.makedirs.





-

. zabbix, , - , .





La clase para trabajar con estos archivos se ve así:





class StatusFile:
    """          ."""
    def __init__(self):
        self.msg = ''

    def setstatus(self, msg):
        global statusfilepath
        with open(statusfilepath, 'w') as status_file:
            status_file.write(msg)

      
      



Multihilo

Y finalmente, la propia función del script principal, que funciona con flujos de descarga:





def main():
    import os
    import datetime
    import time

    log = MyLogger()
    log.start_rotate_logging("DownloaderLog", os.path.join(log_path, "download_backup.log")) #  
    now = datetime.datetime.today().strftime("%Y%m%d")
    global basepath
    basepath = os.path.join(basepath, now)  #  ,   
    list_file = FileList()
    list_file.connect_ftp()
    list_file.get_list("..")
    for i in range(list_file.len()):
        flag = True
        while flag:   #         
            if BaseFileDownload.count < max_threads:
                curfile = list_file.get_next_file()
                threadid = BaseFileDownload(curfile["remote_path"], curfile["filename"], log)
                threadid.start()
                flag = False
            else:
                time.sleep(20)
    log.add("Downloading files complete")
    statusfile = StatusFile()
    statusfile.setstatus("Downloading at " + str(datetime.datetime.now()) + " finishing successful")
      
      



Aquí comenzamos a registrar, obtenemos una lista de archivos (se almacena en la memoria).





En un ciclo while eterno, verificamos el número de descargas que se ejecutan simultáneamente y, si es necesario, iniciamos subprocesos adicionales.





El código fuente completo se puede encontrar aquí .












All Articles