我想进行一个涉及网络多播的实验。为此,我有一个服务器将一些处理过的模型权重(ndarrays 元组)发送回客户端。当尝试使用 pickle.save() 和 pickle.load() 序列化和反序列化接收到的权重时,我总是收到以下错误 _pickle.UnpicklingError:无效的加载密钥,'XXX' 其中 XXX 是转储中的某个密钥。
我正在使用 python 3.10
下面的最小可复制示例
发射器:
import pickle
import socket
import struct
import sys
import numpy as np
group_ip_address = "239.0.0.100"
port = 10000
source_ip = "192.168.68.116"
source_port = 12345
print("Multicast sender transmitting data to ", group_ip_address, "/", port, ". Local address: ", source_ip,
"/", source_port)
message = tuple(np.random.rand(*(100,100)) for _ in range(10))
multicast_group = (group_ip_address, port)
# Create UDP socket
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# bind to selected address
sock.bind((source_ip, source_port))
# Set the time-to-live for messages to 1 so they do not go past the
# local network segment.
ttl = struct.pack('b', 1)
sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, ttl)
try:
BUFFER_SIZE = 4096
# Send data to the multicast group
print(sys.getsizeof(message))
print(message)
serialized = pickle.dumps(message)
for i in range(0, len(serialized), BUFFER_SIZE):
data = serialized[i:i + BUFFER_SIZE]
sock.sendto(data, multicast_group)
while True:
sock.settimeout(1)
print('waiting to receive acknowledgment')
try:
data, server = sock.recvfrom(BUFFER_SIZE)
except socket.timeout:
print('timed out, resending package')
sock.sendto(data, multicast_group)
else:
print('received "%s" from %s' % (data, server))
break
# tell edge server to stop the recv loop
sock.sendto(b'END', multicast_group)
finally:
print('closing socket')
sock.close()
接收器:
import pickle
import socket
import struct
multicast_group = '239.0.0.100'
port = 10000
print("Multicast receiver listening for data at ", multicast_group, "/", port)
############
server_address = ('', port)
# Create UDP socket
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# Bind to the server address
sock.bind(server_address)
# Tell the operating system to add the socket to the multicast group
# on all interfaces.
group = socket.inet_aton(multicast_group)
mreq = struct.pack('4sL', group, socket.INADDR_ANY)
sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
BUFFER_SIZE = 4096
packets = []
while True:
data, address = sock.recvfrom(BUFFER_SIZE)
print(data)
if not data or data.endswith(b'END'):
break
packets.append(data)
sock.sendto(b'ack', address)
# print('received %s bytes from %s' % (len(data), address))
# print('total : %s' % len(packets))
bytes = b''.join(packets)
new_aggregated_parameters = pickle.loads(bytes)
print(type(new_aggregated_parameters))