使用 Python 脚本中的 POST 发送文件

问题描述 投票:0回答:10

有没有办法从 Python 脚本使用 POST 发送文件?

python post file-upload http-post
10个回答
251
投票

来自:https://requests.readthedocs.io/en/latest/user/quickstart/#post-a-multipart-encoded-file

Requests 使上传多部分编码文件变得非常简单:

with open('report.xls', 'rb') as f:
    r = requests.post('http://httpbin.org/post', files={'report.xls': f})

就是这样。我不是在开玩笑 - 这是一行代码。文件已发送。让我们检查一下:

>>> r.text
{
  "origin": "179.13.100.4",
  "files": {
    "report.xls": "<censored...binary...data>"
  },
  "form": {},
  "url": "http://httpbin.org/post",
  "args": {},
  "headers": {
    "Content-Length": "3196",
    "Accept-Encoding": "identity, deflate, compress, gzip",
    "Accept": "*/*",
    "User-Agent": "python-requests/0.8.0",
    "Host": "httpbin.org:80",
    "Content-Type": "multipart/form-data; boundary=127.0.0.1.502.21746.1321131593.786.1"
  },
  "data": ""
}

32
投票

是的。您将使用

urllib2
模块,并使用
multipart/form-data
内容类型进行编码。这里有一些示例代码可以帮助您入门——它不仅仅是文件上传,但您应该能够通读它并了解它是如何工作的:

user_agent = "image uploader"
default_message = "Image $current of $total"

import logging
import os
from os.path import abspath, isabs, isdir, isfile, join
import random
import string
import sys
import mimetypes
import urllib2
import httplib
import time
import re

def random_string (length):
    return ''.join (random.choice (string.letters) for ii in range (length + 1))

def encode_multipart_data (data, files):
    boundary = random_string (30)

    def get_content_type (filename):
        return mimetypes.guess_type (filename)[0] or 'application/octet-stream'

    def encode_field (field_name):
        return ('--' + boundary,
                'Content-Disposition: form-data; name="%s"' % field_name,
                '', str (data [field_name]))

    def encode_file (field_name):
        filename = files [field_name]
        return ('--' + boundary,
                'Content-Disposition: form-data; name="%s"; filename="%s"' % (field_name, filename),
                'Content-Type: %s' % get_content_type(filename),
                '', open (filename, 'rb').read ())

    lines = []
    for name in data:
        lines.extend (encode_field (name))
    for name in files:
        lines.extend (encode_file (name))
    lines.extend (('--%s--' % boundary, ''))
    body = '\r\n'.join (lines)

    headers = {'content-type': 'multipart/form-data; boundary=' + boundary,
               'content-length': str (len (body))}

    return body, headers

def send_post (url, data, files):
    req = urllib2.Request (url)
    connection = httplib.HTTPConnection (req.get_host ())
    connection.request ('POST', req.get_selector (),
                        *encode_multipart_data (data, files))
    response = connection.getresponse ()
    logging.debug ('response = %s', response.read ())
    logging.debug ('Code: %s %s', response.status, response.reason)

def make_upload_file (server, thread, delay = 15, message = None,
                      username = None, email = None, password = None):

    delay = max (int (delay or '0'), 15)

    def upload_file (path, current, total):
        assert isabs (path)
        assert isfile (path)

        logging.debug ('Uploading %r to %r', path, server)
        message_template = string.Template (message or default_message)

        data = {'MAX_FILE_SIZE': '3145728',
                'sub': '',
                'mode': 'regist',
                'com': message_template.safe_substitute (current = current, total = total),
                'resto': thread,
                'name': username or '',
                'email': email or '',
                'pwd': password or random_string (20),}
        files = {'upfile': path}

        send_post (server, data, files)

        logging.info ('Uploaded %r', path)
        rand_delay = random.randint (delay, delay + 5)
        logging.debug ('Sleeping for %.2f seconds------------------------------\n\n', rand_delay)
        time.sleep (rand_delay)

    return upload_file

def upload_directory (path, upload_file):
    assert isabs (path)
    assert isdir (path)

    matching_filenames = []
    file_matcher = re.compile (r'\.(?:jpe?g|gif|png)$', re.IGNORECASE)

    for dirpath, dirnames, filenames in os.walk (path):
        for name in filenames:
            file_path = join (dirpath, name)
            logging.debug ('Testing file_path %r', file_path)
            if file_matcher.search (file_path):
                matching_filenames.append (file_path)
            else:
                logging.info ('Ignoring non-image file %r', path)

    total_count = len (matching_filenames)
    for index, file_path in enumerate (matching_filenames):
        upload_file (file_path, index + 1, total_count)

def run_upload (options, paths):
    upload_file = make_upload_file (**options)

    for arg in paths:
        path = abspath (arg)
        if isdir (path):
            upload_directory (path, upload_file)
        elif isfile (path):
            upload_file (path)
        else:
            logging.error ('No such path: %r' % path)

    logging.info ('Done!')

6
投票

看起来 python requests 不能处理非常大的多部分文件。

文档建议您查看

requests-toolbelt

这是他们文档中的相关页面


4
投票

阻止您直接在文件对象上使用 urlopen 的唯一原因是内置文件对象缺少 len 定义。一个简单的方法是创建一个子类,它为 urlopen 提供正确的文件。 我还修改了下面文件中的 Content-Type 标头。

import os
import urllib2
class EnhancedFile(file):
    def __init__(self, *args, **keyws):
        file.__init__(self, *args, **keyws)

    def __len__(self):
        return int(os.fstat(self.fileno())[6])

theFile = EnhancedFile('a.xml', 'r')
theUrl = "http://example.com/abcde"
theHeaders= {'Content-Type': 'text/xml'}

theRequest = urllib2.Request(theUrl, theFile, theHeaders)

response = urllib2.urlopen(theRequest)

theFile.close()


for line in response:
    print line

2
投票

Chris Atlee 的 poster 库对此非常有效(特别是便利功能

poster.encode.multipart_encode()
)。作为奖励,它支持大文件流式传输,而无需将整个文件加载到内存中。另请参阅 Python 问题 3244


2
投票

我正在尝试测试 django rest api 及其对我的工作:

def test_upload_file(self):
        filename = "/Users/Ranvijay/tests/test_price_matrix.csv"
        data = {'file': open(filename, 'rb')}
        client = APIClient()
        # client.credentials(HTTP_AUTHORIZATION='Token ' + token.key)
        response = client.post(reverse('price-matrix-csv'), data, format='multipart')

        print response
        self.assertEqual(response.status_code, status.HTTP_200_OK)

0
投票

您可能还想看看 httplib2,以及 examples。我发现使用 httplib2 比使用内置的 HTTP 模块更简洁。


0
投票
def visit_v2(device_code, camera_code):
    image1 = MultipartParam.from_file("files", "/home/yuzx/1.txt")
    image2 = MultipartParam.from_file("files", "/home/yuzx/2.txt")
    datagen, headers = multipart_encode([('device_code', device_code), ('position', 3), ('person_data', person_data), image1, image2])
    print "".join(datagen)
    if server_port == 80:
        port_str = ""
    else:
        port_str = ":%s" % (server_port,)
    url_str = "http://" + server_ip + port_str + "/adopen/device/visit_v2"
    headers['nothing'] = 'nothing'
    request = urllib2.Request(url_str, datagen, headers)
    try:
        response = urllib2.urlopen(request)
        resp = response.read()
        print "http_status =", response.code
        result = json.loads(resp)
        print resp
        return result
    except urllib2.HTTPError, e:
        print "http_status =", e.code
        print e.read()

0
投票

我在这里尝试了一些选项,但我在标题方面遇到了一些问题(“文件”字段为空)。

一个简单的模拟来解释我如何使用请求来完成这篇文章并解决问题:

import requests

url = 'http://127.0.0.1:54321/upload'
file_to_send = '25893538.pdf'

files = {'file': (file_to_send,
                  open(file_to_send, 'rb'),
                  'application/pdf',
                  {'Expires': '0'})}

reply = requests.post(url=url, files=files)
print(reply.text)

更多信息请访问 https://requests.readthedocs.io/en/latest/user/quickstart/

要测试此代码,您可以使用一个简单的虚拟服务器(认为可以在 GNU/Linux 或类似版本中运行):

import os
from flask import Flask, request, render_template

rx_file_listener = Flask(__name__)

files_store = "/tmp"
@rx_file_listener.route("/upload", methods=['POST'])
def upload_file():
    storage = os.path.join(files_store, "uploaded/")
    print(storage)
    
    if not os.path.isdir(storage):
        os.mkdir(storage)

    try:
        for file_rx in request.files.getlist("file"):
            name = file_rx.filename
            destination = "/".join([storage, name])
            file_rx.save(destination)
        
        return "200"
    except Exception:
        return "500"

if __name__ == "__main__":
    rx_file_listener.run(port=54321, debug=True)

0
投票

python 3.11.3

客户:

        with open(local_file_path, 'rb') as file_obj:
            multipart_data = MultipartEncoder(
                fields={
                    'file': (os.path.basename(local_file_path), file_obj, your_file_content_type)}
            )
            request_headers["Content-Type"] = multipart_data.content_type
            response = self.session.post(url=upload_server_url, headers=request_headers, data=multipart_data)

'file'
是表单字段名称,即服务器端的参数名称(
@RequestParam("file")
)。
data=multipart_data
会将文件数据放入请求正文中。

服务器端、Java、Springboot:

@Slf4j
@Controller
@CrossOrigin
@RequestMapping("/myapi")
public class MultiPartController {

    @RequestMapping(value = "/upload", method = RequestMethod.POST)
    public String upload(HttpServletRequest httpServletRequest, @RequestParam("file")MultipartFile f) throws IOException {
        log.info(":::::  {}, {}, {}, {}", f.getName(), f.getContentType(), f.getSize(), f.getOriginalFilename());
        
        return "ok";
    }
}

参考https://github.com/requests/toolbelt/tree/1.0.0?tab=readme-ov-file#multipartform-data-encoder

© www.soinside.com 2019 - 2024. All rights reserved.