好的,首先要做的事情 - 此 FTP 上几乎没有可用的配置;它是工业 HMI 控制面板的一部分。我已经通过 Filezilla 测试了在没有 PASV 的情况下下载文件的能力,并且工作正常。这是来自 filezilla 的详细信息 - 您可以看到它大约需要 1 秒,因为我们在这里讨论的是千字节。
Status: Starting download of /USB_Pen_Memory/Log/Data Log Trend_Ops_Data_Log_230802.txt
Trace: CFtpChangeDirOpData::Send() in state 0
Trace: CFtpChangeDirOpData::Send() in state 2
Command: CWD /USB_Pen_Memory/Log
Trace: CFtpControlSocket::OnReceive()
Response: 250 CWD command successful.
Trace: CFtpChangeDirOpData::ParseResponse() in state 2
Trace: CFtpControlSocket::ResetOperation(0)
Trace: CControlSocket::ResetOperation(0)
Trace: CFtpChangeDirOpData::Reset(0) in state 2
Trace: CFtpFileTransferOpData::SubcommandResult(0) in state 1
Trace: CFtpControlSocket::SetAsyncRequestReply
Trace: CControlSocket::SendNextCommand()
Trace: CFtpFileTransferOpData::Send() in state 5
Trace: CFtpRawTransferOpData::Send() in state 0
Trace: CFtpRawTransferOpData::Send() in state 1
Command: TYPE A
Trace: CFtpControlSocket::OnReceive()
Response: 200 Type set to A.
Trace: CFtpRawTransferOpData::ParseResponse() in state 1
Trace: CControlSocket::SendNextCommand()
Trace: CFtpRawTransferOpData::Send() in state 2
Command: PASV
Trace: CFtpControlSocket::OnReceive()
Response: 502 Invalid command.
Trace: CFtpRawTransferOpData::ParseResponse() in state 2
Trace: CControlSocket::SendNextCommand()
Trace: CFtpRawTransferOpData::Send() in state 2
Command: PORT 192,168,0,41,204,135
Trace: CFtpControlSocket::OnReceive()
Response: 200 PORT command successful.
Trace: CFtpRawTransferOpData::ParseResponse() in state 2
Trace: CControlSocket::SendNextCommand()
Trace: CFtpRawTransferOpData::Send() in state 4
Command: RETR Data Log Trend_Ops_Data_Log_230802.txt
Trace: CTransferSocket::OnAccept(0)
Trace: CTransferSocket::OnConnect
Trace: CFtpControlSocket::OnReceive()
Response: 150 Opening ASCII mode data connection for Data Log Trend_Ops_Data_Log_230802.txt.
Trace: CFtpRawTransferOpData::ParseResponse() in state 4
Trace: CControlSocket::SendNextCommand()
Trace: CFtpRawTransferOpData::Send() in state 5
Trace: CFtpControlSocket::OnReceive()
Response: 226 Transfer complete.
Trace: CFtpRawTransferOpData::ParseResponse() in state 5
Trace: CControlSocket::SendNextCommand()
Trace: CFtpRawTransferOpData::Send() in state 8
Trace: CTransferSocket::TransferEnd(1)
Trace: CFtpControlSocket::TransferEnd()
Trace: CFtpControlSocket::ResetOperation(0)
Trace: CControlSocket::ResetOperation(0)
Trace: CFtpRawTransferOpData::Reset(0) in state 8
Trace: CFtpFileTransferOpData::SubcommandResult(0) in state 7
Trace: CFtpControlSocket::ResetOperation(0)
Trace: CControlSocket::ResetOperation(0)
Trace: CFtpFileTransferOpData::Reset(0) in state 7
Status: File transfer successful, transferred 48.20 KB in 1 second
这是我在 AWS Lambda Python 版本 3.10 中的代码
from datetime import datetime as dt
from datetime import timedelta
from io import BytesIO
import ftplib
import logging
import os
import csv
import time
import boto3
import logging
def ftp_connection(HOST, USER, PASS, timeout=30):
    """Open a logged-in FTP control connection in active (PORT) mode.

    The HMI panel rejects PASV with "502 Invalid command", so passive
    mode is disabled after login.

    NOTE(review): active mode requires the SERVER to connect back to
    this host's data port. From inside AWS Lambda the PORT command
    advertises an unroutable internal address (the 169.254.x.x seen in
    the debug trace), so the data connection can never be established —
    confirm network reachability (NAT/VPC) before relying on this.

    Args:
        HOST: FTP server hostname or IP address.
        USER: login user name.
        PASS: login password.
        timeout: socket timeout in seconds, so a stalled control or
            data channel raises an error instead of hanging until the
            Lambda itself times out (default 30).

    Returns:
        A connected, logged-in ftplib.FTP instance with PASV disabled
        and protocol tracing enabled.
    """
    # source_address=() in the original was a falsy no-op and is dropped.
    ftp = ftplib.FTP()
    # Enable tracing BEFORE connecting so the login exchange is logged too.
    ftp.set_debuglevel(1)
    ftp.connect(HOST, timeout=timeout)
    ftp.login(USER, PASS)
    ftp.set_pasv(False)  # server answers 502 to PASV; active mode only
    return ftp
def lambda_handler(event, context):
    """Fetch yesterday's log from each configured FTP site into S3.

    Reads a CSV config from S3 (one row per FTP site), downloads the
    site's daily log over FTP into memory, appends a
    ``,<site>,<timestamp>`` marker line, uploads the result to the data
    bucket, and finally starts a Glue crawler.

    Returns:
        dict with ``statusCode`` and ``body`` per the Lambda proxy
        convention.
    """
    yesterday = dt.strftime(dt.today() - timedelta(days=1), '%y%m%d')
    timestamp = time.strftime("%Y%m%d%H%M%S", time.gmtime())
    config_s3_bucket_name = os.environ['S3_CONFIG_LOC']
    data_s3_bucket_name = os.environ['S3_DATA_LOC']
    config_file_key = 'ftp_config.csv'
    crawler_name = 'FTPLogs'
    logging.basicConfig(level=logging.ERROR,
                        format='%(asctime)s %(levelname)s %(name)s %(message)s')
    logger = logging.getLogger(__name__)

    # Step 1: Connect to S3 and download the config file.
    s3 = boto3.client('s3', config=boto3.session.Config(signature_version='s3v4'))
    config_file_obj = s3.get_object(Bucket=config_s3_bucket_name, Key=config_file_key)
    config_file_data = config_file_obj['Body'].read().decode('utf-8').splitlines()
    config_reader = csv.DictReader(config_file_data)

    # Step 2: Loop through each row in the config file and load the
    # latest log to S3.
    for row in config_reader:
        ftp_site_name = row['ftp_site_name']
        ftp_ip = row['ftp_ip_address']
        ftp_username = row['ftp_username']
        ftp_password = row['ftp_password']
        file_directory = row['ftp_log_directory']
        filename_convention = row['filename_convention']
        remote_file = f'{filename_convention}{yesterday}.txt'
        try:
            conn = ftp_connection(ftp_ip, ftp_username, ftp_password)
            try:
                conn.cwd(file_directory)
                with BytesIO() as output_buffer:
                    # BUG FIX: the original called
                    #   conn.transfercmd('RETR ...', output_buffer.write)
                    # which passes the write callback as the *rest* offset
                    # (hence the bogus "REST <built-in method write...>" in
                    # the trace) and never reads the returned data socket.
                    # retrbinary() issues RETR and drives the data
                    # connection through the callback correctly.
                    conn.retrbinary(f'RETR {remote_file}', output_buffer.write)
                    # Tag each record set with its source site and run time.
                    output_buffer.write(f',{ftp_site_name},{timestamp}\n'.encode('utf-8'))
                    output_buffer.seek(0)
                    # BUG FIX: 'latest_csv_file' was never defined (NameError);
                    # use the remote file name we actually fetched.
                    s3_key = f'{ftp_site_name}/dt={timestamp}/{remote_file}'
                    s3.upload_fileobj(output_buffer, Bucket=data_s3_bucket_name, Key=s3_key)
            finally:
                # Always release the control connection, even on error
                # (the original leaked it on the `continue` path).
                conn.close()
        # Log any error for this site and move on to the next one.
        except ftplib.all_errors as e:
            logger.error(str(e))
            continue

    logging.shutdown()
    glue = boto3.client('glue', config=boto3.session.Config(signature_version='s3v4'))
    glue.start_crawler(Name=crawler_name)
    return {
        'statusCode': 200,
        'body': 'Function executed successfully. Crawler Started'
    }
问题是它在发出 RETR 之后就一直卡住不动(挂起),直到 Lambda 超时:
2023-08-02T19:58:15.620-07:00 *cmd* 'CWD /USB_Pen_Memory/Log'
2023-08-02T19:58:15.675-07:00 *resp* '250 CWD command successful.'
2023-08-02T19:58:15.676-07:00 *cmd* 'PORT 169,254,76,1,132,71'
2023-08-02T19:58:15.730-07:00 *resp* '200 PORT command successful.'
2023-08-02T19:58:15.730-07:00 *cmd* 'REST <built-in method write of _io.BytesIO object at 0x7f4ff15a53a0>'
2023-08-02T19:58:15.790-07:00 *resp* '350 Restarting at 0.'
2023-08-02T19:58:15.790-07:00 *cmd* 'RETR Data Log Trend_Ops_Data_Log_230801.txt'
2023-08-02T20:00:15.274-07:00 2023-08-03T03:00:15.273Z f68b8176-a188-4a50-ae45-6c12167dcad6 Task timed out after 120.05 seconds
PORT 在这里引起了我的注意,因为它与 filezilla 中显示的不同。我尝试过retrbinary和transfercmd;两者是相同的。我做错了什么?
编辑:
由于 Tim 指出 s3 很慢,所以这是我尝试下载到本地 tmp 的版本,它也遇到了同样的问题。我还记得我有两个具有不同 IP 的站点,并且两个站点都显示相同的内容。这个奇怪的不可路由的端口 IP。
from datetime import datetime as dt
from datetime import timedelta
from io import BytesIO
import ftplib
import logging
import os
import csv
import time
import boto3
import logging
def ftp_connection(HOST, USER, PASS, timeout=30):
    """Open a logged-in FTP control connection in active (PORT) mode.

    Passive mode is disabled because the device answers PASV with
    "502 Invalid command".

    NOTE(review): in active mode the server must connect back to the
    client's data port; from AWS Lambda the PORT command advertises a
    link-local 169.254.x.x address (visible in the debug output), which
    the server cannot reach — verify the network path before relying on
    active mode here.

    Args:
        HOST: FTP server hostname or IP address.
        USER: login user name.
        PASS: login password.
        timeout: socket timeout in seconds so stalled transfers fail
            fast instead of hanging until the Lambda timeout (default 30).

    Returns:
        A connected, logged-in ftplib.FTP instance with PASV disabled
        and debug tracing on.
    """
    ftp = ftplib.FTP()  # source_address=() was a falsy no-op; removed
    ftp.set_debuglevel(1)  # before connect() so login traffic is traced
    ftp.connect(HOST, timeout=timeout)
    ftp.login(USER, PASS)
    ftp.set_pasv(False)  # server rejects PASV with 502
    return ftp
def lambda_handler(event, context):
    """Fetch yesterday's log from each configured FTP site into S3 via /tmp.

    Reads a CSV config from S3 (one row per FTP site), downloads the
    site's daily log over FTP to Lambda's local /tmp, appends a
    ``,<site>,<timestamp>`` marker line, uploads the file to the data
    bucket, and finally starts a Glue crawler.

    Returns:
        dict with ``statusCode`` and ``body`` per the Lambda proxy
        convention.
    """
    yesterday = dt.strftime(dt.today() - timedelta(days=1), '%y%m%d')
    timestamp = time.strftime("%Y%m%d%H%M%S", time.gmtime())
    config_s3_bucket_name = os.environ['S3_CONFIG_LOC']
    data_s3_bucket_name = os.environ['S3_DATA_LOC']
    config_file_key = 'ftp_config.csv'
    crawler_name = 'FTPLogs'
    logging.basicConfig(level=logging.ERROR,
                        format='%(asctime)s %(levelname)s %(name)s %(message)s')
    logger = logging.getLogger(__name__)

    # Step 1: Connect to S3 and download the config file.
    s3 = boto3.client('s3', config=boto3.session.Config(signature_version='s3v4'))
    config_file_obj = s3.get_object(Bucket=config_s3_bucket_name, Key=config_file_key)
    config_file_data = config_file_obj['Body'].read().decode('utf-8').splitlines()
    config_reader = csv.DictReader(config_file_data)

    # Step 2: Loop through each row in the config file.
    for row in config_reader:
        ftp_site_name = row['ftp_site_name']
        ftp_ip = row['ftp_ip_address']
        ftp_username = row['ftp_username']
        ftp_password = row['ftp_password']
        file_directory = row['ftp_log_directory']
        filename_convention = row['filename_convention']
        remote_file = f'{filename_convention}{yesterday}.txt'
        # /tmp always exists in the Lambda runtime; no makedirs needed.
        local_temp_file = os.path.join('/tmp', remote_file)
        try:
            conn = ftp_connection(ftp_ip, ftp_username, ftp_password)
            try:
                # BUG FIX: cwd() was outside the try in the original, so
                # an FTP error there crashed the whole handler and leaked
                # the connection instead of skipping just this site.
                conn.cwd(file_directory)
                # Download to /tmp, then tag the file with its source
                # site and run timestamp. The with-block closes the file;
                # the original's explicit close() inside it was redundant.
                with open(local_temp_file, 'wb') as local_file:
                    conn.retrbinary(f'RETR {remote_file}', local_file.write)
                    local_file.write(f',{ftp_site_name},{timestamp}\n'.encode('utf-8'))
            finally:
                # Always release the control connection, even when the
                # transfer fails (the original leaked it on `continue`).
                conn.close()
            # Upload from /tmp to S3, reusing the client created above
            # (the original built a second client every iteration).
            s3_key = f'dt={timestamp}/{remote_file}'
            s3.upload_file(local_temp_file, Bucket=data_s3_bucket_name, Key=s3_key)
        # Log any error for this site and continue with the next one.
        except ftplib.all_errors as e:
            logger.error(str(e))
            continue

    logging.shutdown()
    glue = boto3.client('glue', config=boto3.session.Config(signature_version='s3v4'))
    glue.start_crawler(Name=crawler_name)
    return {
        'statusCode': 200,
        'body': 'Function executed successfully. Crawler Started'
    }