Python 中的共享内存 IPC

问题描述 投票:0回答:0

我有两个 C 程序,可以在共享内存上传输数据,并且它们可以工作。这是数据结构:

struct SecondData
{
    uint64_t x[16];
    uint y;
    byte z[16];
    char w[128];
    uint64_t q;
};

struct SharedData
{
    int action;
    SecondData data;
    bool ack;
    uint32_t checksum;
};

我需要将连接的一侧更改为 Python 脚本,但我不知道如何操作!我写了一个脚本,但它不起作用。

这是我的Python源代码:

from ctypes import *
import threading
import time

IPC_CREAT = 512
libc = CDLL("", use_errno=True, use_last_error=True)


# int shmget(key_t key, size_t size, int shmflg);
shmget = libc.shmget
shmget.restype = c_int
shmget.argtypes = (c_int, c_size_t, c_int)


# void* shmat(int shmid, const void *shmaddr, int shmflg);
shmat = libc.shmat
shmat.restype = c_void_p
shmat.argtypes = (c_int, c_void_p, c_int)

strlen = libc.strlen

# int shmdt(const void *shmaddr);
shmdt = libc.shmdt
shmdt.restype = c_int
shmdt.argtypes = (c_void_p,)

lock = threading.Lock()
shmaddr = None

# Enum definition
class Action(c_int):
    SENegotiate = 1
    SRead = 2
    SWrite = 3
    SIn = 4
    SExit = 5

# Structure definitions
class SecondData(Structure):
    _fields_ = [
        ("x", c_ulonglong * 64),
        ("y", c_uint),
        ("z", c_byte * 16),
        ("w", c_char * 128),
        ("q", c_uint64),
    ]

class SharedData(Structure):
    _fields_ = [
        ("action", c_int),
        ("data", SecondData),
        ("ack", c_bool),
        ("cehcksum", c_uint32),
    ]

def connect():
    global shmaddr
    # Get the shared memory key using ftok
    shm_key = libc.ftok(b"SharedMemory", 65)
    
    # Get the shared memory ID
    shmid = shmget(shm_key, 1024, 0o666)
    if shmid == -1:
        print("[INFO] error in shmget:", shmid)
        return 1
    
    # Attach to the shared memory
    shmaddr = shmat(shmid, None, 0)
    if shmaddr == -1:
        print("[INFO] error in shmat")
        return 1
    
    # Cast the shared memory pointer to the SharedData struct
    shared_data = cast(shmaddr, POINTER(SharedData)).contents
    print(f"[INFO] shmid : {shmid}")
    print("[INFO] send hello  ...")
    data_to_send = b"AAAAAAAAAAA!\x00"
    
    with lock:
        shared_data.cehcksum = c_uint32(0)
        shared_data.data.w = data_to_send
        shared_data.ack = False
        shared_data.action = Action.SENegotiate
    
    connection = False
    shared_data = cast(shmaddr, POINTER(SharedData)).contents
    print("[INFO] waiting ...")
    while not connection:
        with lock:
            if shared_data.ack and strlen(shared_data.data.w) > 0:
                q = shared_data.data.q
                w = shared_data.data.w.decode("utf-8")
                print("[INFO] 0x{:016X}    {}".format(q, w))
                shared_data.ack = True
                print("[INFO] the c code is up ...")
                ce_connection = True
    return 0



if connect():
    print("[INFO] can't connect")
    exit(1)

C 代码获取数据,但消息 (

data_to_send
) 无效,我在 C 端获得了 asm 代码
mov    rdi, rsp
!,之后 Python 陷入
if shared_data.ack and strlen(shared_data.data.w) > 0
并且无法检测更新的内存。

python ipc shared-memory
© www.soinside.com 2019 - 2024. All rights reserved.