422 使用 FastAPI 通过 Swagger UI 文档发送表单数据列表时出现不可处理的实体错误

问题描述 投票:0回答:1

我创建了一个用于对视频文件进行转码的 API,例如 YouTube 的自适应比特率。我用的是FastAPI。目前,我正在尝试为每个文件设置分辨率值。但每当我添加多个分辨率时,我都会在 Swagger 中收到

422 unprocessible entity
错误。

这是我的代码:

async def transcode_video(input_path, output_folder, res, unique_id, total_files, pbar):
 # Use asyncio for command execution with progress bar
 output_path = os.path.join(output_folder, f"{res}p.m3u8")

 # Calculate the target size
 target_size = calculate_target_size(input_path)

 cmd_duration = f"ffprobe -v error -show_entries format=duration -of default=noprint_wrappers=1:nokey=1 {input_path}"
 total_duration = float(subprocess.check_output(cmd_duration, shell=True, text=True).strip())

 cmd = (
     f"ffmpeg -i {input_path} -vf scale=-2:{res} -c:a aac -b:a 128k "
     f"-g 50 -hls_time 1 -hls_list_size 0 "
     f"-crf 23 -b:v 100k -fs {target_size} "
     f"-hls_segment_filename \"{output_path.replace('.m3u8', '_%03d.ts')}\" "
     f"{output_path}"
 )

 process = await asyncio.create_subprocess_shell(
     cmd,
     stdout=asyncio.subprocess.PIPE,
     stderr=asyncio.subprocess.PIPE
 )

 while True:
     line = await process.stderr.readline()
     if not line:
         break
     line = line.decode().strip()
     if "time=" in line:
         # Extracting the time progress from FFmpeg output
         time_str = line.split("time=")[1].split()[0]
         current_time = sum(x * float(t) for x, t in zip([3600, 60, 1], time_str.split(":")))
         progress = (current_time / total_duration) * 100
         pbar.update(progress - pbar.n)

 # Wait for the transcoding process to complete
 await process.wait()

 if process.returncode != 0:
     raise HTTPException(status_code=500, detail="Video transcoding failed.")
 pbar.close()

 # Increment the total number of transcoded files
 total_files[0] += 1

@app.post("/transcode/")
async def transcode_video_endpoint(files: List[UploadFile] = File(...), resolutions: List[int] = None):
 # Counters for transcoded videos
 total_files = [0]

 # Iterate over each file
 for file in files:
     # Check if the file is a valid video file based on its extension
     valid_video_extensions = {".mp4", ".avi", ".mkv", ".mov", ".wmv", ".flv"}
     if not any(file.filename.lower().endswith(ext) for ext in valid_video_extensions):
         print(f"Skipping non-video file: {file.filename}")
         continue

     # Assign a unique ID for each file
     unique_id = str(uuid.uuid4())

     # Log the filename and unique ID
     print(f"Processing file: {file.filename} with unique ID: {unique_id}")

     # Create a folder for the unique ID
     unique_id_folder = os.path.join(OUTPUT_FOLDER, unique_id)
     Path(unique_id_folder).mkdir(parents=True, exist_ok=True)

     # Save the uploaded file
     input_path = os.path.join(UPLOAD_FOLDER, file.filename)
     with open(input_path, "wb") as video_file:
         video_file.write(file.file.read())

     # Check if the file is a valid video file using ffprobe
     try:
         subprocess.run(
             ["ffprobe", "-v", "error", "-select_streams", "v:0", "-show_entries", "stream=codec_type", "-of", "csv=p=0", input_path],
             check=True, capture_output=True
         )
     except subprocess.CalledProcessError:
         print(f"Skipping non-video file: {file.filename}")
         continue

     # Determine the resolutions to transcode based on the provided or default resolution
     resolutions_to_transcode = [res for res in [240, 360, 480, 720] if resolutions is None or res in resolutions]

     # If resolutions is not exactly 360, 480, 720, or 1080, transcode to the nearest lower resolution
     if resolutions is not None:
         resolutions_to_transcode = [get_closest_lower_resolution(res) for res in resolutions]

     # Transcode the video into the specified resolutions
     for res in resolutions_to_transcode:
         output_folder = os.path.join(unique_id_folder, f"{res}p")
         Path(output_folder).mkdir(parents=True, exist_ok=True)

         # Call the transcode_video function with tqdm progress bar
         with tqdm(total=100, desc=f"Transcoding {res}p", position=0, leave=True) as pbar:
             await transcode_video(input_path, output_folder, res, unique_id, total_files, pbar)

     # Create index.m3u8 file after transcoding all resolutions
     create_index_m3u8(unique_id, resolutions_to_transcode)

 return JSONResponse(content={"message": f"{total_files[0]} videos transcoded."})

如果我提供一种解决方案,它就可以正常工作。但如果我提供所有上传视频的分辨率列表,我会收到错误 422。


Code    Details
422 
Error: Unprocessable Entity

Response body
Download
{
  "detail": [
    {
      "loc": [
        "body",
        "resolutions",
        0
      ],
      "msg": "value is not a valid integer",
      "type": "type_error.integer"
    }
  ]
}

我使用的是python 3.8.18。你能帮我吗?我尝试使用列表、对象,但总是收到错误。我的输入格式如下所示:

python swagger fastapi openapi swagger-ui
1个回答
0
投票

这是 Swagger UI 中的一个已知问题,无法真正说明为什么它尚未得到修复。当发送

list
数据时(无论类型如何;可以是
int
str
),它们将作为单个字符串值而不是单独的值发送。 这个答案中有更详细的解释;因此,请查看它以获取更多信息和解决方案。

简而言之,如果您使用其他客户端(例如 Python 请求)来测试 API 端点,它应该可以正常工作。下面的示例基于此答案的方法 1 - 请查看该答案,因为您可能会找到有用的详细信息和示例。

工作示例 1 - 使用
List
数据

app.py

Form

测试.py

from fastapi import FastAPI, Form, File, UploadFile from typing import List app = FastAPI() @app.post("/submit") def submit( files: List[UploadFile] = File(...), resolutions: List[int] = Form(...) ): return { "Filenames": [file.filename for file in files], "resolutions": resolutions }

您还可以使用查询参数,如果适合您的应用程序,它可以与 Swagger UI 自动文档配合使用。有关更多详细信息和示例,请参阅[此答案][3]以及[此答案][3]、[此答案][3]和[此答案][3]。
工作示例 2 - 使用
import requests url = 'http://127.0.0.1:8000/submit' files = [('files', open('a.txt', 'rb'))] data = {"resolutions": [720, 480]} r = requests.post(url=url, data=data, files=files) print(r.json())

参数
List

app.py

Query

测试.py

from fastapi import FastAPI, File, UploadFile, Query from typing import List app = FastAPI() @app.post("/submit") def submit( files: List[UploadFile] = File(...), resolutions: List[int] = Query(...) ): return { "Filenames": [file.filename for file in files], "resolutions": resolutions }

© www.soinside.com 2019 - 2024. All rights reserved.