我正在构建一个接受多个输入图像的 HTTP 服务器,而图像可以作为图像文件或缓冲区输入。
例如,请求命令可能如下所示:
$ curl -F image_file=@/path/to/image1.jpg \
-F image_file=@/path/to/image2.png \
-F image_buffer=/path/to/image_buffer.bin -F image_buffer_shape="256,256,3" \
-F image_file=@/path/to/image4.tif \
<server URL>
上面的命令将发送一个
multipart/form-data
类型的 HTTP POST 请求,该请求将按顺序包含以下 5 个部分: image_file, image_file, image_buffer, image_buffer_shape, image_file
- 这是客户端发送它们的顺序。
问题是,当使用Python的内置函数
cgi.parse_multipart
时,结果将是一个列表字典,它丢失了不同名称的输入的顺序。在我的示例中,它将如下所示:
{'image_file' : [<content of image1.jpg>, <content of image2.png>, <content of image4.tif>],
'image_buffer' : [<content of image_buffer.bin>],
'image_buffer_shape' : ['256,256,3']
}
所以在这个例子中,没有办法表明
image4.tif
是在image_bufffer.bin
之后提到的。
在访问每个输入的原始索引的同时解析多部分有效负载的最“标准”和优雅的方法是什么?
我的问题涉及使用Python内置的
cgi
模块来解析标头和多部分有效负载的场景。然而,cgi
模块已被弃用。
为了解析标头,实际上不需要使用
cgi
模块,因为 self.headers
中可用的 BaseHTTPRequestHandler.do_POST
属于类 http.client.HTTPMessage
,它是 email.message.Message
的子类。此类提供了解析标头的标准方法(例如,self.headers.get_content_type()
、self.headers.get_boundary()
、self.headers.get("Content-Length")
。)
为了解析多部分有效负载,用户可以参考第三方库或手动解析数据。要手动解析,可以按边界分割有效负载(分割时需要添加前导
--
字符串),删除第一个和最后一个部分(第一个边界之前和最后一个边界之后),并将每个部分传递给 email.message_from_bytes
。生成的对象将允许您读取该部分的标头字段(例如,.get_param('name', header='content-disposition')
),并使用 .get_payload(decode = True)
获取有效负载。
用我找到的解决方法回答:
在 Python 中处理具有多部分负载的 POST 请求通常如下所示:
import http.server
import cgi
class MyHandler(http.server.BaseHTTPRequestHandler):
def do_POST(self):
ctype, pdict = cgi.parse_header(self.headers['content-type'])
pdict['boundary'] = bytes(pdict['boundary'], 'utf-8')
postvars = cgi.parse_multipart(self.rfile, pdict)
# --- do somthing with the postvars dictionary ---
self.send_response(200)
self.end_headers()
self.wfile.write('whatever response...')
正如问题中所解释的,
postvars
返回的cgi.parse_multipart
字典不保留不同输入名称之间的部分索引。
cgi.parse_multipart
函数,这样我们就可以通过逐一解析各个部分来保留索引。代码可能如下所示:
import http.server
import cgi
import io
class MyHandler(http.server.BaseHTTPRequestHandler):
def do_POST(self):
ctype, pdict = cgi.parse_header(self.headers['content-type'])
boundary_for_manual_split = bytes('--' + pdict['boundary'], 'utf-8')
pdict['boundary'] = bytes(pdict['boundary'], 'utf-8')
content = self.rfile.read(int(self.headers['Content-Length']))
all_boundary_indices = []
start_index = 0
while True:
try:
all_boundary_indices.append(content.index(boundary_for_manual_split, start_index))
start_index = all_boundary_indices[-1]+1
except:
break
all_boundary_indices.append(len(content))
parts = [content[all_boundary_indices[i]:all_boundary_indices[i+1]] for i in range(len(all_boundary_indices) - 1)]
postvar_list = []
for i, part in enumerate(parts):
postvars = cgi.parse_multipart(io.BytesIO(part), pdict)
postvar_list.append(postvars.items())
# --- do somthing with the postvar_list inputs ---
self.send_response(200)
self.end_headers()
self.wfile.write('whatever response...')
上面的实现创建了列表
postvar_list
,其中包含输入名称和内容的排序元组。