我试图从我的服务消耗的所有消息中获取特定信息,我不想在我拥有的每个消费者中重复相同的代码。
所以我创建了一个实现IReceiveObserver的类,并且在public Task PreReceive(ReceiveContext context)
中我试图获取消息体,除了我不能。
public Task PreReceive(ReceiveContext context)
{
var body = context.GetBody();
return null;
}
我假设该主体将包含消息正文,所以我试图使用以下方法反序列化它:
public static object DeserializeFromStream(MemoryStream stream)
{
IFormatter formatter = new BinaryFormatter();
stream.Seek(0, SeekOrigin.Begin);
object o = formatter.Deserialize(stream);
return o;
}
var Deserialized =(GeneralMessageType) DeserializeFromStream((MemoryStream)body);
我收到以下错误:
System.Runtime.Serialization.SerializationException:'输入流不是有效的二进制格式。
这是实现我需要的正确方法吗?我想我正在遵循正确的方法,但我在提取邮件正文时遇到了问题。
更新:
消息正文使用Console.WriteLine("{0}", (new StreamReader(stream)).ReadToEnd());
:
{\ r \ n \“messageId \”:\“01110000-5d1b-0015-b7c9-08d5479e73b2 \”,\ r \ n \ n“correlationId \”:\“da197981-a10e-43a9-a53b-67b7a3261511 \”,\ r \ n \“conversationId \”:\“01110000-5d1b-0015-80d3-08d5479e73a6 \”,\ r \ n \“\ sourceAddress \”:\“queue URL”,\ r \ n \“destinationAddress \”:\ “receiverURL”,\ r \ n \“messageType \”:[\ r \ n \“urn:message:MessageType”\ r \ n],\ r \ n \“message \”:{\ r \ n \ n \“ correlationId \“:\”da197981-a10e-43a9-a53b-67b7a3261511 \“,\ r \ n \”\ Id \“:\”62d2bfff-7e20-4b51-b431-7d46e44abfc0 \“,\ r \ n \”名称\ “:\”'''yry'''\“,\ r \ n \”代码\“:\”ryryyy \“,\ r \ n \”isActive \“:true,\ r \ n \”用户名\ “:\”username \“,\ r \ n \”密码\“:\”***** \“,\ r \ n \”isUserActive \“:true \ r \ n},\ r \ n \ “headers \”:{},\ r \ n \“host \”:{\ r \ n \“machineName \”:\“myMachine \”,\ r \ n \“processName \”:\“w3wp \”,\ r \ n \“processId \”:4568,\ r \ n \“assembly \”:\“MassTransit \”,\ r \ n \“assemblyVersion \”:\“3.5.7.1082 \”,\ r \ n \“\ frameworkVersion \”:\“4.0.30319.42000 \”,\ r \ n \ n \“massTransitVersion \”:\“3.5.7.1082 \”,\ r \ n \“operatingSystemVersion \”:\“Microsoft Windows NT 10.0.16299.0 \”\ r \ n} \ r \ n}“
所以你不需要打扰BinaryFormatter
。相反,您应该使用System.Runtime.Serialization.DataContractJsonSerializer
,因为您的流的内容将作为JSON传递。您可以使用以下代码执行此操作:
DataContractJsonSerializer serializer = new DataContractJsonSerializer(typeof(MassTransit));
MassTransit obj = (MassTransit)serializer.ReadObject(stream);
这假设您尝试反序列化的对象是MassTransit
类型。如果不是,那么只需将其替换为您尝试反序列化实例的任何类。
你可以把它变成这样的通用方法:
public static T JsonDeserializer<T>(MemoryStream strm)
{
DataContractJsonSerializer serializer = new DataContractJsonSerializer(typeof(T));
return (T)serializer.ReadObject(strm);
}
只需在您想要从JSON流反序列化对象的地方调用它。
弄清楚了:
正如您所看到的那样,除了一些其他属性外,context.GetBody();
还提供了包含主机信息,标题和消息正文的json。
我所做的是删除“\ r \ n”并替换“with”然后转到https://jsonutils.com/,粘贴生成的Json,它会将它转换为你应该反序列化的类。
这个东西包含的信息比发送的消息多得多,所有这些都是由masstransit和rabbitmq添加的。
public class Message
{
//Message attributes
}
public class Headers
{
}
public class Host
{
public string machineName { get; set; }
public string processName { get; set; }
public int processId { get; set; }
public string assembly { get; set; }
public string assemblyVersion { get; set; }
public string frameworkVersion { get; set; }
public string massTransitVersion { get; set; }
public string operatingSystemVersion { get; set; }
}
public class TotalMessage
{
public string messageId { get; set; }
public string correlationId { get; set; }
public string conversationId { get; set; }
public string sourceAddress { get; set; }
public string destinationAddress { get; set; }
public IList<string> messageType { get; set; }
public Message message { get; set; }
public Headers headers { get; set; }
public Host host { get; set; }
}
对于反序列化,您可以使用:
public static object DeserializeFromStream(MemoryStream stream)
{
IFormatter formatter = new BinaryFormatter();
stream.Seek(0, SeekOrigin.Begin);
object o = formatter.Deserialize(stream);
return o;
}
var body = context.GetBody();
var totalMessage= JsonDeserializer<TotalMessage>((MemoryStream)body);
获取消息正文:totalMessage.message
更新:
当然,您可以通过以下方式反序列化为仅消息正文:
public class TotalMessage
{
public Message message { get; set; }
}