我已安装 pymsteams 并将其连接到通道并且能够发送消息。我阅读了文档,但没有找到任何与如何发送消息并提及一个人为 @me -> 提及团队中的一个人相关的内容。
这是我的代码的开始:
import pymsteams;
myMessage = pymsteams.connectocard("webhookurl")
.... // here is a logic
myMessage.send()
github 上有一个问题线程正好请求这种功能 https://github.com/rveachkc/pymsteams/issues/11
并且有评论提出了一个临时解决方案,即手动修改connectorcard对象的payload属性。这对我有用。
teams_message = pymsteams.connectorcard(webhook)
teams_message.payload = {
"type": "message",
"attachments": [
{
"contentType": "application/vnd.microsoft.card.adaptive",
"content": {
"type": "AdaptiveCard",
"body": [
{
"type": "TextBlock",
"text": "Hello <at>Leonardo</at>"
}
],
"$schema": "http://adaptivecards.io/schemas/adaptive-card.json",
"version": "1.0",
"msteams": {
"entities": [
{
"type": "mention",
"text": "<at>Leonardo</at>",
"mentioned": {
"id": "[email protected]",
"name": "Leonardo Benitez"
}
}
]
}
}
}]
}
teams_message.send()
代码片段,感谢作者提供。
基于Teams提供的API以及pymsteams的能力,我封装了一个可以调用别人发送消息的方法。你可以参考一下。
import copy
from pymsteams import connectorcard, TeamsWebhookException
from typing import Optional, List
from pydantic import BaseModel
class MessageDTO(BaseModel):
text: str
title: Optional[str] = None
mention_users: Optional[List] = None
link_title: Optional[str] = None
link_url: Optional[str] = None
logger = logging.getLogger(__name__)
DEFAULT_PAYLOAD_TEMPLATE = {
"type": "message",
"attachments": [
{
"contentType": "application/vnd.microsoft.card.adaptive",
"content": {
"type": "AdaptiveCard",
"body": [],
"actions": [],
"$schema": "http://adaptivecards.io/schemas/adaptive-card.json",
"version": "1.0",
"msteams": {"width": "Full"}
}
}]
}
class TeamsMessenger(Messenger):
def __init__(self, webhook_url, messageDTO: MessageDTO):
self.messageDTO = messageDTO
self.my_messenger = connectorcard(hookurl=webhook_url)
def send_message(self):
try:
# Init with message payload template
self.my_messenger.payload = copy.deepcopy(DEFAULT_PAYLOAD_TEMPLATE)
# Add title
if self.messageDTO.title:
self.my_messenger.payload["attachments"][0]["content"]["body"].append({
"type": "TextBlock",
"size": "Large",
"weight": "Bolder",
"text": self.messageDTO.title
})
# Add text
self.my_messenger.payload["attachments"][0]["content"]["body"].append({
"type": "TextBlock",
"size": "Medium",
"text": self.messageDTO.text,
"wrap": True
})
# Add mentions
if self.messageDTO.mention_users:
mentions_entities = []
mention_text = ""
for mention_user in self.messageDTO.mention_users:
mentions_entities.append({
"type": "mention",
"text": f"<at>{mention_user.name}</at>",
"mentioned": {
"id": mention_user.email,
"name": mention_user.name
}
})
mention_text += f"@<at>{mention_user.name}</at> "
self.my_messenger.payload["attachments"][0]["content"]["body"].append({
"type": "TextBlock",
"text": mention_text
})
self.my_messenger.payload["attachments"][0]["content"]["msteams"]["entities"] = mentions_entities
# Add link
if self.messageDTO.link_title and self.messageDTO.link_url:
self.my_messenger.payload["attachments"][0]["content"]["actions"].append({
"type": "Action.OpenUrl",
"title": self.messageDTO.link_title,
"url": self.messageDTO.link_url
})
# Send message
self.my_messenger.send()
except TeamsWebhookException as e:
logger.error(f"An error occurred when send Teams message. {e}")
except Exception as e:
logger.error(f"An error occurred when process sending Teams message. {e}")
参考