在 Python 中使用 MSGraph API 移动 Sharepoint 文件

问题描述 投票:0回答:1

我正在尝试利用 Python 中的 MSGraph API 来根据某些输入移动 SharePoint 文件。我目前已经解决了 API 的下载部分,但由于某种原因移动文件不起作用。这是我的代码:

async def _move_sharepoint_file(self, url, site_name, src_drive_path, file_name, dst_site_name, dst_drive_path, new_file_name=None):
    # Get source site and its drive items
        sites = await self.user_client.sites.with_url(url).get()
        for site in sites.value:
            if site.name == site_name:
                src_site_id = site.id
                break
        else:
            raise ValueError(f"Source site with name '{site_name}' not found")

        # Get destination site
        if site_name == dst_site_name:
            dst_site_id = src_site_id
            
            
        else:
            for site in sites.value:
                if site.name == dst_site_name:
                    
                    dst_site_id = site.id
                    
                    break
            else:
                raise ValueError(f"Destination site with name '{dst_site_name}' not found")

        # Find the source item
        src_item_id = await self.get_item_id(src_site_id, src_drive_path, file_name)
        
        if src_item_id is None:
            raise ValueError(f"Item with name '{file_name}' in path '{src_drive_path}' not found")
        query_params = ItemsRequestBuilder.ItemsRequestBuilderGetQueryParameters(
            filter = True
        )

        request_configuration = ItemsRequestBuilder.ItemsRequestBuilderGetRequestConfiguration(
            query_parameters = query_params,
        )
        
        path_list = dst_drive_path.replace("\\", "/").split('/')
        drive_items = await self.user_client.drives.by_drive_id(dst_site_id).items.get(request_configuration=request_configuration)
        drive_items = drive_items.value
        
        partial_dir = ""
        print(f"path_list: {path_list}")
        for dir in path_list:
            if len(path_list) == 1 and path_list[0] == "":
                correct_drive_items = drive_items
                break
            else:
                partial_dir += f"/{dir}"
            print(f"partial_dir: {partial_dir}") if self.verbose else None
            [print(f"drive: {drive.name}") for drive in drive_items] if self.verbose else None
            print() if self.verbose else None
            for drive_item in drive_items:
                if drive_item.name == dir and hasattr(drive_item, 'folder'):  # Check if the item has 'folder' attribute
                    break
            else:
                raise ValueError(f"Folder with name '{partial_dir}' not found")

            drive_id = drive_item.id  # Assume this item is the folder
            print(f"Selected Folder Name: {drive_item.name}, ID: {drive_id}")

        dst_folder_id = drive_id #await self.get_item_id(dst_site_id, dst_drive_path, None)
          # Assuming you pass None for file_name to get the folder ID
        if dst_folder_id is None:
            raise ValueError(f"Destination path '{dst_drive_path}' not found")

        # Prepare the request body
        request_body = DriveItem(
            parent_reference=ItemReference(id=dst_folder_id),
            name=new_file_name if new_file_name else file_name
        )

        # Move the item by updating its parent reference
        result = await self.user_client.drives.by_drive_id(src_site_id).items.by_drive_item_id(src_item_id).patch(request_body)
        if result.is_success:
            print("File moved successfully.")
            return result.content
        else:
            print(f"Failed to move file. Status code: {result.status_code}")
            return None

当尝试运行此代码时,我最终收到错误(模糊了请求 ID):

msgraph.generated.models.o_data_errors.o_data_error.ODataError:
        APIError
        Code: 403
        message: None
        error: MainError(additional_data={}, code='accessDenied', details=None, inner_error=InnerError(additional_data={}, client_request_id='XXXXX', date=DateTime(2024, 5, 14, 7, 6, 52, tzinfo=Timezone('UTC')), odata_type=None, request_id='XXXX'), message='Access denied', target=None)

请注意,我确实拥有尝试访问的 SharePoint 网站的正确权限。我们也有正确的权限(file.readwrite.all)。

我的同事是编写下载部分的人,但我的任务是扩展此部分并添加移动功能。这是实际有效的下载功能:

async def _download_sharepoint_file(self, url, site_name, drive_path, file_names, dst_path=None):
        # Deze functie aanroepen werkt niet met asyncio, dus die heb ik gekopieerd en geplakt
        # drive = await self.get_sharepoint_site_drive_contents(url, site_name, drive_path)
        sites = await self.user_client.sites.with_url(url).get()
        
        print(f"Sites:") if self.verbose else None    
        [print(f"site: {site.name}") for site in sites.value] if self.verbose else None
        print() if self.verbose else None

        for site in sites.value:
            if site.name == site_name:
                break
        else:
            raise ValueError(f"Site with name '{site_name}' not found")
        
        site_id = site.id

        query_params = ItemsRequestBuilder.ItemsRequestBuilderGetQueryParameters(
            filter = True
        )

        request_configuration = ItemsRequestBuilder.ItemsRequestBuilderGetRequestConfiguration(
            query_parameters = query_params,
        )
        
        path_list = drive_path.replace("\\", "/").split('/')
        print(path_list)
        drive_items = await self.user_client.drives.by_drive_id(site_id).items.get(request_configuration=request_configuration)
        drive_items = drive_items.value
        
        partial_dir = ""
        print(f"path_list: {path_list}")
        for dir in path_list:
            if len(path_list) == 1 and path_list[0] == "":
                
                correct_drive_items = drive_items
                
                break
            else:
                partial_dir += f"/{dir}"
            print(f"partial_dir: {partial_dir}") if self.verbose else None
            [print(f"drive: {drive.name}") for drive in drive_items] if self.verbose else None
            print() if self.verbose else None
            for drive_item in drive_items:
                if drive_item.name == dir:
                    break
            else:
                raise ValueError(f"Drive with name '{partial_dir}' not found")
            
            drive_id = drive_item.id
            drive_items = await self.user_client.drives.by_drive_id(site_id).items.by_drive_item_id(drive_id).children.get()
            drive_items = drive_items.value
            
            correct_drive_items = []
            for drive_item in drive_items:
                print(f"drive_item.name: {drive_item.name}") if self.verbose else None
                parent_path = drive_item.parent_reference.path
                print(f"drive_item.parent_reference.path: {drive_item.parent_reference.path}") if self.verbose else None
                if parent_path.endswith(partial_dir):
                    correct_drive_items.append(drive_item)
                    print(f"YES") if self.verbose else None
                
        drives = correct_drive_items
        #### Einde van get_sharepoint_site_drive_contents

        [print(f"drive_item.name: {drive_item.name}") for drive_item in drives] if self.verbose else None
        if isinstance(file_names, str):
            file_names = [file_names]
        drives = [drive for drive in drives if drive.name in file_names]
        if len(drives) == 0:    
            raise ValueError(f"Drive with name '{file_names}' not found")
        
        for drive, file_name in zip(drives, file_names):
            drive_id = drive.id
            parent_drive_id = drive.parent_reference.drive_id
            print(f"drive_id; {drive_id}") if self.verbose else None
            print(f"parent_drive_id: {parent_drive_id}") if self.verbose else None
            try:
                result = await self.user_client.drives.by_drive_id(parent_drive_id).items.by_drive_item_id(drive_id).content.get()
                
            except:
                result = await self.user_client.drives.by_drive_id(parent_drive_id).items.by_drive_item_id(drive_id).get()
                
            if not dst_path:
                return result
            
            _, ext_sp = os.path.splitext(drive.name)
            _, ext = os.path.splitext(file_name)
            assert ext_sp == ext, f"Extensions do not match: {ext_sp} != {ext}"

            if isinstance(result, DriveItem):
                return result

            with open(f"{dst_path}/{file_name}", "wb") as f:
                f.write(result)
            print(f"File saved as: {dst_path}/{file_name}") if self.verbose else None

        return result

从昨天开始我一直在努力解决这个问题,但我尝试的所有操作最终都会出现相同或其他错误。如果有人知道如何修复此错误,我将不胜感激。

python sharepoint microsoft-graph-api
1个回答
0
投票
async def _move_sharepoint_file(self, url, site_name, src_drive_path, file_name, dst_site_name, dst_drive_path, new_file_name=None):
    # Check permissions and verify authentication - ensure proper scopes are included in token
    
    # Get source site and its drive items
    sites = await self.user_client.sites.with_url(url).get()
    src_site_id = None
    dst_site_id = None
    
    for site in sites.value:
        if site.name == site_name:
            src_site_id = site.id
        elif site.name == dst_site_name:
            dst_site_id = site.id
    
    if src_site_id is None:
        raise ValueError(f"Source site with name '{site_name}' not found")
    if dst_site_id is None:
        raise ValueError(f"Destination site with name '{dst_site_name}' not found")

    # Find the source item
    src_item_id = await self.get_item_id(src_site_id, src_drive_path, file_name)
    
    if src_item_id is None:
        raise ValueError(f"Item with name '{file_name}' in path '{src_drive_path}' not found")

    # Get destination folder ID
    dst_drive_items = await self.user_client.drives.by_drive_id(dst_site_id).items.get()
    dst_folder_id = None
    for drive_item in dst_drive_items.value:
        if drive_item.name == dst_drive_path:
            dst_folder_id = drive_item.id
            break
    
    if dst_folder_id is None:
        raise ValueError(f"Destination folder '{dst_drive_path}' not found")

    # Get source drive ID
    src_drive_items = await self.user_client.drives.by_drive_id(src_site_id).items.get()
    src_drive_id = None
    for drive_item in src_drive_items.value:
        if drive_item.name == src_drive_path:
            src_drive_id = drive_item.id
            break
    
    if src_drive_id is None:
        raise ValueError(f"Source drive '{src_drive_path}' not found")

    # Prepare the request body
    request_body = {
        "parentReference": {"driveId": dst_site_id, "id": dst_folder_id},
        "name": new_file_name if new_file_name else file_name
    }

    # Move the item by updating its parent reference
    result = await self.user_client.drives.by_drive_id(src_site_id).items.by_drive_item_id(src_item_id).update(request_body)
    
    if result.is_success:
        print("File moved successfully.")
        return result.content
    else:
        print(f"Failed to move file. Status code: {result.status_code}")
        return None

    
Dont adapt the code according to your specific needs and test thoroughly,one adjustment i made is to ensure that the correct drive ID is used when moving the file. In SharePoint, each site can have multiple document libraries, each represented as a drive also I checked permissions and verified authentication are performed before proceeding with the code.Simplified the retrieval of source and destination site IDs.Added a check for the destination folder ID to ensure it exists.Modified the request body to use a dictionary format.Used the update method instead of patch to update the item's parent reference.Hope it helps
© www.soinside.com 2019 - 2024. All rights reserved.