我目前正在尝试创建一个消息传递库,消息传递的原则之一是只能通过消息修改可变状态。传递的'消息'是带有'发送者'(创建消息)和'接收者'的函数对象(工作者/演员/你从队列中处理消息的内容)。
工作者定义如下,并且使用接口的自引用性质,因为工作者可能具有它想要向消息发送者公开的状态,并且这要求发送者知道它的唯一类型。
public interface Worker<T extends Worker<T>> {
T getWorker(); //convert a Worker<T> to it's <T> since T extends Worker<T>
<S extends Worker<S>> void message(Message<S,T> msg);
}
其中<S>
是发送消息的工作者的类型。
消息定义如下:
public interface Message<S extends Worker<S>,R extends Worker<R>> {
void process(S sender, R thisWorker);
}
其中<S>
是发送者的类类型,<R>
是处理工作者的类类型。正如预期的接口和消息传递的机制一样,该函数将能够修改thisWorker
的状态。但是,如果消息是直接改变sender
,则会导致竞争条件,因为在worker / threads之间没有同步(这是使用消息开始的全部要点!)
虽然我可以声明<S>
是一个通用的Worker<?>
,但这会破坏消息以有意义的方式“回复”它的发送者的能力,因为它不能再引用它的特定字段。
因此,我如何确保sender
不会被修改,除非在专门针对它的消息的上下文中?
T extends Worker<T>
确实用于防止类返回非Worker类型。我可能只是逃脱Worker<T>
并信任用户。
我特别想要实现的是消息传递系统,其中消息是代码,这将避免在每个工作者中对不同消息类型的任何特殊处理的需要。但是,我可能会意识到,如果我想强制执行它,就像在Swing EDT上执行一样,没有遵循某些消息协议就没有优雅的方法。
我不太了解你的设计。 interface Worker<T extends Worker<T>>
等接口定义中递归泛型的目的究竟是什么?不会interface Worker<T>
足够,仍然允许T getWorker()
的实现获得实现Worker
接口的类的具体类型?是否试图限制接口实现从Worker
返回不是getWorker()
的东西?在这种情况下,我认为interface Worker<T extends Worker<?>>
会减少混乱。
因此,我如何确保
sender
不会被修改,除非在专门针对它的消息的上下文中?
据我所知,阻止在所需上下文之外调用对象方法的唯一方法是将对象方法的执行限制在客户端代码无法访问的单个线程中。这将要求对象检查当前线程是每个方法中的第一件事。下面是一个简单的例子。它本质上是一个标准的生产者/消费者实现,其中被修改的对象(不是Mailbox
/消费者实现!)阻止在指定线程(消耗传入消息的线程)之外的线程上修改自身。
class Mailbox {
private final BlockingQueue<Runnable> mQueue = new LinkedBlockingQueue<>();
private final Thread mReceiverThread = new Thread(() -> {
while (true) {
try {
Runnable job = mQueue.take();
job.run();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
public Mailbox() {
mReceiverThread.start();
}
public void submitMsg(Runnable msg) {
try {
mQueue.put(msg);
} catch (InterruptedException e) {
// TODO exception handling; rethrow wrapped in unchecked exception here in order to allow for use of lambdas.
throw new RuntimeException(e);
}
}
public void ensureMailboxThread(Object target) {
if (Thread.currentThread() != mReceiverThread) {
throw new RuntimeException("operations on " + target + " are confined to thread " + mReceiverThread);
}
}
}
class A {
// Example use
public static void main(String[] args) {
Mailbox a1Mailbox = new Mailbox();
Mailbox a2Mailbox = new Mailbox();
A a1 = new A(a1Mailbox);
A a2 = new A(a2Mailbox);
// Let's send something to a1 and have it send something to a2 if a certain condition is met.
// Notice that there is no explicit sender:
// If you wish to reply, you "hardcode" the reply to what you consider the sender in the Runnable's run().
a1Mailbox.submitMsg(() -> {
if (a1.calculateSomething() > 3.0) {
a2Mailbox.submitMsg(() -> a2.doSomething());
} else {
a1.doSomething();
}
});
}
private final Mailbox mAssociatedMailbox;
public A(Mailbox mailbox) {
mAssociatedMailbox = mailbox;
}
public double calculateSomething() {
mAssociatedMailbox.ensureMailboxThread(this);
return 3 + .14;
}
public void doSomething() {
mAssociatedMailbox.ensureMailboxThread(this);
System.out.println("hello");
}
}
让我重申强调的部分:参与消息传递的每个单独的类都要在每个方法的开头验证当前线程。无法将此验证提取到通用类/接口,因为可以从任何线程调用对象的方法。例如,在上面的示例中使用Runnable
提交的submitMsg
可以选择生成一个新线程并尝试修改该线程上的对象:
a1Mailbox.submitMsg(() -> {
Thread t = new Thread(() -> a1.doSomething());
t.start();
});
但是,这可以防止,但只是由于检查是A.doSomething()
本身的一部分。