ftplib RETR 小文件卡住了

问题描述 投票:0回答:0

好的,首先要做的事情 - 此 FTP 上几乎没有可用的配置;它是工业 HMI 控制面板的一部分。我已经通过 Filezilla 测试了在没有 PASV 的情况下下载文件的能力,并且工作正常。这是来自 filezilla 的详细信息 - 您可以看到它大约需要 1 秒,因为我们在这里讨论的是千字节。

Status: Starting download of /USB_Pen_Memory/Log/Data Log Trend_Ops_Data_Log_230802.txt
Trace:  CFtpChangeDirOpData::Send() in state 0
Trace:  CFtpChangeDirOpData::Send() in state 2
Command:    CWD /USB_Pen_Memory/Log
Trace:  CFtpControlSocket::OnReceive()
Response:   250 CWD command successful.
Trace:  CFtpChangeDirOpData::ParseResponse() in state 2
Trace:  CFtpControlSocket::ResetOperation(0)
Trace:  CControlSocket::ResetOperation(0)
Trace:  CFtpChangeDirOpData::Reset(0) in state 2
Trace:  CFtpFileTransferOpData::SubcommandResult(0) in state 1
Trace:  CFtpControlSocket::SetAsyncRequestReply
Trace:  CControlSocket::SendNextCommand()
Trace:  CFtpFileTransferOpData::Send() in state 5
Trace:  CFtpRawTransferOpData::Send() in state 0
Trace:  CFtpRawTransferOpData::Send() in state 1
Command:    TYPE A
Trace:  CFtpControlSocket::OnReceive()
Response:   200 Type set to A.
Trace:  CFtpRawTransferOpData::ParseResponse() in state 1
Trace:  CControlSocket::SendNextCommand()
Trace:  CFtpRawTransferOpData::Send() in state 2
Command:    PASV
Trace:  CFtpControlSocket::OnReceive()
Response:   502 Invalid command.
Trace:  CFtpRawTransferOpData::ParseResponse() in state 2
Trace:  CControlSocket::SendNextCommand()
Trace:  CFtpRawTransferOpData::Send() in state 2
Command:    PORT 192,168,0,41,204,135
Trace:  CFtpControlSocket::OnReceive()
Response:   200 PORT command successful.
Trace:  CFtpRawTransferOpData::ParseResponse() in state 2
Trace:  CControlSocket::SendNextCommand()
Trace:  CFtpRawTransferOpData::Send() in state 4
Command:    RETR Data Log Trend_Ops_Data_Log_230802.txt
Trace:  CTransferSocket::OnAccept(0)
Trace:  CTransferSocket::OnConnect
Trace:  CFtpControlSocket::OnReceive()
Response:   150 Opening ASCII mode data connection for Data Log Trend_Ops_Data_Log_230802.txt.
Trace:  CFtpRawTransferOpData::ParseResponse() in state 4
Trace:  CControlSocket::SendNextCommand()
Trace:  CFtpRawTransferOpData::Send() in state 5
Trace:  CFtpControlSocket::OnReceive()
Response:   226 Transfer complete.
Trace:  CFtpRawTransferOpData::ParseResponse() in state 5
Trace:  CControlSocket::SendNextCommand()
Trace:  CFtpRawTransferOpData::Send() in state 8
Trace:  CTransferSocket::TransferEnd(1)
Trace:  CFtpControlSocket::TransferEnd()
Trace:  CFtpControlSocket::ResetOperation(0)
Trace:  CControlSocket::ResetOperation(0)
Trace:  CFtpRawTransferOpData::Reset(0) in state 8
Trace:  CFtpFileTransferOpData::SubcommandResult(0) in state 7
Trace:  CFtpControlSocket::ResetOperation(0)
Trace:  CControlSocket::ResetOperation(0)
Trace:  CFtpFileTransferOpData::Reset(0) in state 7
Status: File transfer successful, transferred 48.20 KB in 1 second

这是我在 AWS Lambda Python 版本 3.10 中的代码

from datetime import datetime as dt
from datetime import timedelta
from io import BytesIO
import ftplib
import logging
import os
import csv
import time
import boto3
import logging

def ftp_connection(HOST, USER, PASS):
    
    ftp = ftplib.FTP(source_address=())
    ftp.connect(HOST)
    ftp.login(USER,PASS)
    ftp.set_pasv(False)
    ftp.set_debuglevel(1)

    return ftp

def lambda_handler(event, context):

    yesterday = dt.strftime(dt.today() - timedelta (days = 1), '%y%m%d')
    timestamp = time.strftime("%Y%m%d%H%M%S", time.gmtime())
    config_s3_bucket_name = os.environ['S3_CONFIG_LOC']
    data_s3_bucket_name = os.environ['S3_DATA_LOC']
    config_file_key = 'ftp_config.csv'
    crawler_name = 'FTPLogs'

    logging.basicConfig(level=logging.ERROR, format='%(asctime)s %(levelname)s %(name)s %(message)s')
    logger=logging.getLogger(__name__)

    # Step 1: Connect to S3 and download the config file
    s3 = boto3.client('s3', config=boto3.session.Config(signature_version='s3v4'))
    config_file_obj = s3.get_object(Bucket=config_s3_bucket_name, Key=config_file_key)
    config_file_data = config_file_obj['Body'].read().decode('utf-8').splitlines()
    config_reader = csv.DictReader(config_file_data)

    # Step 2: Loop through each row in the config file and load latest log to s3
    for row in config_reader:
        ftp_site_name = row['ftp_site_name']
        ftp_ip = row['ftp_ip_address']
        ftp_username = row['ftp_username']
        ftp_password = row['ftp_password']
        file_directory = row['ftp_log_directory']
        filename_convention = row['filename_convention']

        conn = ftp_connection(ftp_ip, ftp_username, ftp_password)

        #change the directory
        conn.cwd(file_directory)

        try :
                
            with BytesIO() as output_buffer:
                conn.transfercmd('RETR ' + filename_convention + yesterday + '.txt', output_buffer.write)
                output_buffer.write(f',{ftp_site_name},{timestamp}\n'.encode('utf-8'))
                output_buffer.seek(0)

                s3_key = f'{ftp_site_name}/dt={timestamp}/{latest_csv_file}'
                s3.upload_fileobj(output_buffer, Bucket=data_s3_bucket_name, Key=s3_key)


            # Close file and connection
            local_file.close()
            conn.close()

            # Upload the file from the local temporary directory to S3
            s3 = boto3.client('s3')
            s3_key = f'dt={timestamp}/{filename_convention}{yesterday}.txt'
            s3.upload_file(local_temp_file, Bucket=data_s3_bucket_name, Key=s3_key)
        
        #Log any error
        except ftplib.all_errors as e :

            logger.error(str(e))
            continue

        logging.shutdown()

    glue = boto3.client('glue', config=boto3.session.Config(signature_version='s3v4')) 
    glue.start_crawler(Name=crawler_name)
    
    return {
        'statusCode': 200,
        'body': 'Function executed successfully. Crawler Started'
    }

问题是它只是坐着:

2023-08-02T19:58:15.620-07:00   *cmd* 'CWD /USB_Pen_Memory/Log'

2023-08-02T19:58:15.675-07:00   *resp* '250 CWD command successful.'

2023-08-02T19:58:15.676-07:00   *cmd* 'PORT 169,254,76,1,132,71'

2023-08-02T19:58:15.730-07:00   *resp* '200 PORT command successful.'

2023-08-02T19:58:15.730-07:00   *cmd* 'REST <built-in method write of _io.BytesIO object at 0x7f4ff15a53a0>'

2023-08-02T19:58:15.790-07:00   *resp* '350 Restarting at 0.'

2023-08-02T19:58:15.790-07:00   *cmd* 'RETR Data Log Trend_Ops_Data_Log_230801.txt'

2023-08-02T20:00:15.274-07:00   2023-08-03T03:00:15.273Z f68b8176-a188-4a50-ae45-6c12167dcad6 Task timed out after 120.05 seconds

PORT 在这里引起了我的注意,因为它与 filezilla 中显示的不同。我尝试过retrbinary和transfercmd;两者是相同的。我做错了什么?

编辑:

由于 Tim 指出 s3 很慢,所以这是我尝试下载到本地 tmp 的版本,它也遇到了同样的问题。我还记得我有两个具有不同 IP 的站点,并且两个站点都显示相同的内容。这个奇怪的不可路由的端口 IP。

from datetime import datetime as dt
from datetime import timedelta
from io import BytesIO
import ftplib
import logging
import os
import csv
import time
import boto3
import logging

def ftp_connection(HOST, USER, PASS):
    
    ftp = ftplib.FTP(source_address=())
    ftp.connect(HOST)
    ftp.login(USER,PASS)
    ftp.set_pasv(False)
    ftp.set_debuglevel(1)

    return ftp

def lambda_handler(event, context):

    yesterday = dt.strftime(dt.today() - timedelta (days = 1), '%y%m%d')
    timestamp = time.strftime("%Y%m%d%H%M%S", time.gmtime())
    config_s3_bucket_name = os.environ['S3_CONFIG_LOC']
    data_s3_bucket_name = os.environ['S3_DATA_LOC']
    config_file_key = 'ftp_config.csv'
    crawler_name = 'FTPLogs'

    logging.basicConfig(level=logging.ERROR, format='%(asctime)s %(levelname)s %(name)s %(message)s')
    logger=logging.getLogger(__name__)

    # Step 1: Connect to S3 and download the config file
    s3 = boto3.client('s3', config=boto3.session.Config(signature_version='s3v4'))
    config_file_obj = s3.get_object(Bucket=config_s3_bucket_name, Key=config_file_key)
    config_file_data = config_file_obj['Body'].read().decode('utf-8').splitlines()
    config_reader = csv.DictReader(config_file_data)

    # Step 2: Loop through each row in the config file
    for row in config_reader:
        ftp_site_name = row['ftp_site_name']
        ftp_ip = row['ftp_ip_address']
        ftp_username = row['ftp_username']
        ftp_password = row['ftp_password']
        file_directory = row['ftp_log_directory']
        filename_convention = row['filename_convention']

        conn = ftp_connection(ftp_ip, ftp_username, ftp_password)

        #change the directory
        conn.cwd(file_directory)

        try :
            
            # Define the local temporary directory
            local_temp_directory = '/tmp'
            os.makedirs(local_temp_directory, exist_ok=True)

            # Define the local temporary file path
            local_temp_file = os.path.join(local_temp_directory, f'{filename_convention}{yesterday}.txt')

            # Download the file to the local temporary directory
            with open(local_temp_file, 'wb') as local_file:
                conn.retrbinary(f'RETR {filename_convention}{yesterday}.txt', local_file.write)
                local_file.write(f',{ftp_site_name},{timestamp}\n'.encode('utf-8'))

            # Close file and connection
            local_file.close()
            conn.close()

            # Upload the file from the local temporary directory to S3
            s3 = boto3.client('s3')
            s3_key = f'dt={timestamp}/{filename_convention}{yesterday}.txt'
            s3.upload_file(local_temp_file, Bucket=data_s3_bucket_name, Key=s3_key)
        
        #Log any error
        except ftplib.all_errors as e :

            logger.error(str(e))
            continue

        logging.shutdown()

    glue = boto3.client('glue', config=boto3.session.Config(signature_version='s3v4')) 
    glue.start_crawler(Name=crawler_name)
    
    return {
        'statusCode': 200,
        'body': 'Function executed successfully. Crawler Started'
    }
python ftplib
© www.soinside.com 2019 - 2024. All rights reserved.