我有两个 C 程序,可以在共享内存上传输数据,并且它们可以工作。这是数据结构:
struct SecondData
{
uint64_t x[16];
uint y;
byte z[16];
char w[128];
uint64_t q;
};
struct SharedData
{
int action;
SecondData data;
bool ack;
uint32_t checksum;
};
我需要将连接的一侧更改为 Python 脚本,但我不知道如何操作!我写了一个脚本,但它不起作用。
这是我的Python源代码:
from ctypes import *
import threading
import time
IPC_CREAT = 512
libc = CDLL("", use_errno=True, use_last_error=True)
# int shmget(key_t key, size_t size, int shmflg);
shmget = libc.shmget
shmget.restype = c_int
shmget.argtypes = (c_int, c_size_t, c_int)
# void* shmat(int shmid, const void *shmaddr, int shmflg);
shmat = libc.shmat
shmat.restype = c_void_p
shmat.argtypes = (c_int, c_void_p, c_int)
strlen = libc.strlen
# int shmdt(const void *shmaddr);
shmdt = libc.shmdt
shmdt.restype = c_int
shmdt.argtypes = (c_void_p,)
lock = threading.Lock()
shmaddr = None
# Enum definition
class Action(c_int):
SENegotiate = 1
SRead = 2
SWrite = 3
SIn = 4
SExit = 5
# Structure definitions
class SecondData(Structure):
_fields_ = [
("x", c_ulonglong * 64),
("y", c_uint),
("z", c_byte * 16),
("w", c_char * 128),
("q", c_uint64),
]
class SharedData(Structure):
_fields_ = [
("action", c_int),
("data", SecondData),
("ack", c_bool),
("cehcksum", c_uint32),
]
def connect():
global shmaddr
# Get the shared memory key using ftok
shm_key = libc.ftok(b"SharedMemory", 65)
# Get the shared memory ID
shmid = shmget(shm_key, 1024, 0o666)
if shmid == -1:
print("[INFO] error in shmget:", shmid)
return 1
# Attach to the shared memory
shmaddr = shmat(shmid, None, 0)
if shmaddr == -1:
print("[INFO] error in shmat")
return 1
# Cast the shared memory pointer to the SharedData struct
shared_data = cast(shmaddr, POINTER(SharedData)).contents
print(f"[INFO] shmid : {shmid}")
print("[INFO] send hello ...")
data_to_send = b"AAAAAAAAAAA!\x00"
with lock:
shared_data.cehcksum = c_uint32(0)
shared_data.data.w = data_to_send
shared_data.ack = False
shared_data.action = Action.SENegotiate
connection = False
shared_data = cast(shmaddr, POINTER(SharedData)).contents
print("[INFO] waiting ...")
while not connection:
with lock:
if shared_data.ack and strlen(shared_data.data.w) > 0:
q = shared_data.data.q
w = shared_data.data.w.decode("utf-8")
print("[INFO] 0x{:016X} {}".format(q, w))
shared_data.ack = True
print("[INFO] the c code is up ...")
ce_connection = True
return 0
if connect():
print("[INFO] can't connect")
exit(1)
C 代码获取数据,但消息 (
data_to_send
) 无效,我在 C 端获得了 asm 代码 mov rdi, rsp
!,之后 Python 陷入 if shared_data.ack and strlen(shared_data.data.w) > 0
并且无法检测更新的内存。