¿Por qué es necesario?
Una vez me enfrenté a la tarea de copiar una gran cantidad de archivos de un servidor ftp. Era necesario hacer una copia de seguridad. ¡Parece que podría ser más fácil! Pero, por desgracia, no se pudo encontrar nada listo para trabajar tan rápido para mis condiciones.
Situación
Era necesario recuperar periódicamente un par de cientos de archivos del servidor ftp en Windows. Muchas cosas pequeñas y algunos archivos muy grandes. Un total de unos 500 GB. El servidor es un vps ubicado bastante lejos en el extranjero. Durante el día el auto está muy cargado, temprano en la noche se realiza un mantenimiento de rutina, en total hay 5 horas para descargar como máximo.
Ninguna de las empresas de servicios públicos que he revisado pudo hacer frente de manera eficiente y en el tiempo asignado. Bueno, no hay ningún lugar para ir, un sistema de copia de seguridad normal aún no se ha comprado, lo que significa que armamos nosotros mismos con un editor o IDE Python y listo! ¡Aventuras!
Config
Pondremos todos los parámetros para el script en un archivo separado para mayor comodidad.
Plantilla de configuración:
host = 'ip.ip.ip.ip'
user = 'ftpusername'
passwd = 'ftppassword'
basepath = '/path/to/backup/folder' # ,
max_threads = 20 #
log_path = '\path\to\logfile'
statusfilepath = '\path\to\statusfile'
Guardamos la configuración con la extensión .py y la importamos al comienzo de nuestro script. Puede importar directamente al espacio de nombres del script, pero hice la construcción un poco como una muleta en la parte principal de mi script:
if __name__ == "__main__":
host = config.host
user = config.user
passwd = config.passwd
basepath = config.basepath # ,
max_threads = config.max_threads
log_path = config.log_path
statusfilepath = config.statusfilepath
main()
Al principio había una lista
ftp , , , ftp- . , , - .
. , , , -.
- ftp:
class MyFtp (ftplib.FTP):
""" , """
def __init__(self):
self.host = host
self.user = user
self.passwd = passwd
self.timeout = 1800
super(MyFtp, self).__init__()
def connect(self):
super(MyFtp, self).connect(self.host, timeout=self.timeout)
def login(self):
super(MyFtp, self).login(user=self.user, passwd=self.passwd)
def quit(self):
super(MyFtp,self).quit()
. ftplib, .
:
class FileList:
""" """
def __init__(self):
self.ftp = None
self.file_list = []
def connect_ftp(self):
import sys
self.ftp = MyFtp()
self.ftp.connect()
self.ftp.login()
self.ftp.__class__.encoding = sys.getfilesystemencoding()
def get_list(self, name):
""" ftp-."""
import os
for dirname in self.ftp.mlsd(str(name), facts=["type"]):
if dirname[1]["type"] == "file":
entry_file_list = {}
entry_file_list['remote_path'] = name #
entry_file_list['filename'] = dirname[0] #
self.file_list.append(entry_file_list)
else:
path = os.path.join(name, dirname[0])
self.get_list(path)
def get_next_file(self):
return self.file_list.pop()
def len(self):
return len(self.file_list)
, , , , , .
logging. , .
class MyLogger:
""" """
def __init__(self):
self.logger = None
def start_file_logging(self, logger_name, log_path):
""" """
import logging
self.logger = logging.getLogger(logger_name)
self.logger.setLevel(logging.INFO)
try:
fh = logging.FileHandler(log_path)
except FileNotFoundError:
log_path = "downloader.log"
fh = logging.FileHandler(log_path)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
fh.setFormatter(formatter)
self.logger.addHandler(fh)
def start_rotate_logging(self, logger_name, log_path, max_bytes=104857600, story_backup=5):
""" """
import logging
from logging.handlers import RotatingFileHandler
self.logger = logging.getLogger(logger_name)
self.logger.setLevel(logging.INFO)
try:
fh = RotatingFileHandler(log_path, maxBytes=max_bytes, backupCount=story_backup)
except FileNotFoundError:
log_path = "downloader.log"
fh = RotatingFileHandler(log_path, maxBytes=max_bytes, backupCount=story_backup)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
fh.setFormatter(formatter)
self.logger.addHandler(fh)
def add(self, msg):
self.logger.info(str(msg))
def add_error(self, msg):
self.logger.error(str(msg))
, .
. , :
class BaseFileDownload(threading.Thread):
""" """
count = 0
def __init__(self, rpath, filename, log):
threading.Thread.__init__(self)
self.remote_path = rpath
self.filename = filename
self.ftp = None
self.command = None
self.currentpath = None
self.log = log
self.__class__.count += 1 #
def __del__(self):
self.__class__.count -= 1
def connect(self):
""" ftp"""
import sys
self.ftp = MyFtp()
self.ftp.connect()
self.ftp.login()
self.ftp.__class__.encoding = sys.getfilesystemencoding()
def run(self):
""" """
import os
self.connect()
self.command = str(bytes('RETR ', encoding='latin-1'), encoding='utf-8')
self.currentpath = os.path.join(basepath, self.remote_path[3:])
self.ftp.cwd(self.remote_path)
if not os.path.exists(self.currentpath):
os.makedirs(self.currentpath, exist_ok=True)
self.host_file = os.path.join(self.currentpath, self.filename)
try:
with open(self.host_file, 'wb') as local_file:
self.log.add("Start downloading " + self.filename)
self.ftp.retrbinary(self.command + self.filename, local_file.write)
self.log.add("Downloading " + self.filename + " complete")
except ftplib.error_perm:
self.log.add_error('Perm error')
self.ftp.quit()
count. : , , , .
run - threading ( !), .
, os.makedirs.
-
. zabbix, , - , .
La clase para trabajar con estos archivos se ve así:
class StatusFile:
""" ."""
def __init__(self):
self.msg = ''
def setstatus(self, msg):
global statusfilepath
with open(statusfilepath, 'w') as status_file:
status_file.write(msg)
Multihilo
Y finalmente, la propia función del script principal, que funciona con flujos de descarga:
def main():
import os
import datetime
import time
log = MyLogger()
log.start_rotate_logging("DownloaderLog", os.path.join(log_path, "download_backup.log")) #
now = datetime.datetime.today().strftime("%Y%m%d")
global basepath
basepath = os.path.join(basepath, now) # ,
list_file = FileList()
list_file.connect_ftp()
list_file.get_list("..")
for i in range(list_file.len()):
flag = True
while flag: #
if BaseFileDownload.count < max_threads:
curfile = list_file.get_next_file()
threadid = BaseFileDownload(curfile["remote_path"], curfile["filename"], log)
threadid.start()
flag = False
else:
time.sleep(20)
log.add("Downloading files complete")
statusfile = StatusFile()
statusfile.setstatus("Downloading at " + str(datetime.datetime.now()) + " finishing successful")
Aquí comenzamos a registrar, obtenemos una lista de archivos (se almacena en la memoria).
En un ciclo while eterno, verificamos el número de descargas que se ejecutan simultáneamente y, si es necesario, iniciamos subprocesos adicionales.
El código fuente completo se puede encontrar aquí .