Masstransit IReceiveObserver无法获取邮件正文

问题描述 投票:0回答:2

我试图从我的服务消耗的所有消息中获取特定信息,我不想在我拥有的每个消费者中重复相同的代码。

所以我创建了一个实现IReceiveObserver的类,并且在public Task PreReceive(ReceiveContext context)中我试图获取消息体,除了我不能。

  public Task PreReceive(ReceiveContext context)
        {
            var body = context.GetBody();
            return null;
        }

我假设该主体将包含消息正文,所以我试图使用以下方法反序列化它:

   public static object DeserializeFromStream(MemoryStream stream)
        {
            IFormatter formatter = new BinaryFormatter();
            stream.Seek(0, SeekOrigin.Begin);
            object o = formatter.Deserialize(stream);
            return o;
        }
var Deserialized =(GeneralMessageType) DeserializeFromStream((MemoryStream)body);

我收到以下错误:

System.Runtime.Serialization.SerializationException:'输入流不是有效的二进制格式。

这是实现我需要的正确方法吗?我想我正在遵循正确的方法,但我在提取邮件正文时遇到了问题。

更新:

消息正文使用Console.WriteLine("{0}", (new StreamReader(stream)).ReadToEnd());

{\ r \ n \“messageId \”:\“01110000-5d1b-0015-b7c9-08d5479e73b2 \”,\ r \ n \ n“correlationId \”:\“da197981-a10e-43a9-a53b-67b7a3261511 \”,\ r \ n \“conversationId \”:\“01110000-5d1b-0015-80d3-08d5479e73a6 \”,\ r \ n \“\ sourceAddress \”:\“queue URL”,\ r \ n \“destinationAddress \”:\ “receiverURL”,\ r \ n \“messageType \”:[\ r \ n \“urn:message:MessageType”\ r \ n],\ r \ n \“message \”:{\ r \ n \ n \“ correlationId \“:\”da197981-a10e-43a9-a53b-67b7a3261511 \“,\ r \ n \”\ Id \“:\”62d2bfff-7e20-4b51-b431-7d46e44abfc0 \“,\ r \ n \”名称\ “:\”'''yry'''\“,\ r \ n \”代码\“:\”ryryyy \“,\ r \ n \”isActive \“:true,\ r \ n \”用户名\ “:\”username \“,\ r \ n \”密码\“:\”***** \“,\ r \ n \”isUserActive \“:true \ r \ n},\ r \ n \ “headers \”:{},\ r \ n \“host \”:{\ r \ n \“machineName \”:\“myMachine \”,\ r \ n \“processName \”:\“w3wp \”,\ r \ n \“processId \”:4568,\ r \ n \“assembly \”:\“MassTransit \”,\ r \ n \“assemblyVersion \”:\“3.5.7.1082 \”,\ r \ n \“\ frameworkVersion \”:\“4.0.30319.42000 \”,\ r \ n \ n \“massTransitVersion \”:\“3.5.7.1082 \”,\ r \ n \“operatingSystemVersion \”:\“Microsoft Windows NT 10.0.16299.0 \”\ r \ n} \ r \ n}“

c# masstransit observers
2个回答
1
投票

所以你不需要打扰BinaryFormatter。相反,您应该使用System.Runtime.Serialization.DataContractJsonSerializer,因为您的流的内容将作为JSON传递。您可以使用以下代码执行此操作:

DataContractJsonSerializer serializer = new DataContractJsonSerializer(typeof(MassTransit));
MassTransit obj = (MassTransit)serializer.ReadObject(stream);

这假设您尝试反序列化的对象是MassTransit类型。如果不是,那么只需将其替换为您尝试反序列化实例的任何类。

你可以把它变成这样的通用方法:

public static T JsonDeserializer<T>(MemoryStream strm)
{
    DataContractJsonSerializer serializer = new DataContractJsonSerializer(typeof(T));
    return (T)serializer.ReadObject(strm);
}

只需在您想要从JSON流反序列化对象的地方调用它。


0
投票

弄清楚了:

正如您所看到的那样,除了一些其他属性外,context.GetBody();还提供了包含主机信息,标题和消息正文的json。

我所做的是删除“\ r \ n”并替换“with”然后转到https://jsonutils.com/,粘贴生成的Json,它会将它转换为你应该反序列化的类。

这个东西包含的信息比发送的消息多得多,所有这些都是由masstransit和rabbitmq添加的。

 public class Message
{
    //Message attributes
}

public class Headers
{
}

public class Host
{
    public string machineName { get; set; }
    public string processName { get; set; }
    public int processId { get; set; }
    public string assembly { get; set; }
    public string assemblyVersion { get; set; }
    public string frameworkVersion { get; set; }
    public string massTransitVersion { get; set; }
    public string operatingSystemVersion { get; set; }
}

public class TotalMessage
{
    public string messageId { get; set; }
    public string correlationId { get; set; }
    public string conversationId { get; set; }
    public string sourceAddress { get; set; }
    public string destinationAddress { get; set; }
    public IList<string> messageType { get; set; }
    public Message message { get; set; }
    public Headers headers { get; set; }
    public Host host { get; set; }
}

对于反序列化,您可以使用:

  public static object DeserializeFromStream(MemoryStream stream)
        {
            IFormatter formatter = new BinaryFormatter();
            stream.Seek(0, SeekOrigin.Begin);
            object o = formatter.Deserialize(stream);
            return o;
        }

 var body = context.GetBody();
 var totalMessage= JsonDeserializer<TotalMessage>((MemoryStream)body);

获取消息正文:totalMessage.message

更新:

当然,您可以通过以下方式反序列化为仅消息正文:

public class TotalMessage
{
    public Message message { get; set; }
}
© www.soinside.com 2019 - 2024. All rights reserved.