在这里,将从Actor之间发送消息的角度来介绍所有关于消息传递的概念。
● Ask:向Actor发送一条消息,返回一个Future。当Actor返回响应时,会完成Future。不会向消息发送者的邮箱返回任何消息。
● Tell。向Actor发送一条消息。所有发送至sender()的响应都会返回给发送消息的Actor。● Forward:将接收到的消息再发送给另一个Actor。所有发送至sender()的响应都会返回给原始消息的发送者。
● Pipe:用于将Future的结果返回给sender()或另一个Actor。如果正在使用Ask或是处理一个Future,那么使用Pipe可以正确地返回Future的结果。
可变消息定义
// Java
public class Message {public StringBuffer mutableBuffer;
public Message(StringBuffer: mutableBuffer) {this.mutableBuffer = mutableBuffer;
}
}
// scala
class Message(var mutableBuffer: StringBuffer = new StringBuffer);
Message message = new Message(new StringBuffer("original"));
message.mutableBuffer = new StringBuffer("new");
val message = new Message(new StringBuffer("original"))
message.mutableBuffer = new StringBuffer("new")
这个例子新建了一条消息,然后修改mutableBuffer引用,使之指向另一个新建的StringBuffer,消息创建时传入的是StringBuffer (“original”),后来被修改成了StringBuffer(“new”)。这就是通过修改引用来修改消息的方法。
消息不可变
public class Message {// final
public final StringBuffer mutableBuffer;
Message(StringBuffer mutableBuffer) {this.mutableBuffer = mutableBuffer;
}
}
public class ImmutableMessage{public final String immutableType;
public ImmutableMessage(String immutableType) {this.immutableType = immutableType;
}
}
class ImmutableMessage(immutableType: String)
总结
理解不可变性不仅仅对于Akka消息是至关重要的,对于如何进行安全的通用并发编程也是必不可少的,因此,无论什么时候,只要需要在线程之间共享数据,就应该首先考虑将数据定义为不可变。
1.2 ASK消息模式![epub_22651331_27.jpg](https://img-blog.csdnimg.cn/img_convert/2a7e44802270d65d74f4b6588fff8522.jpeg
要求
Ask模式要求定义一个超时参数,如果对方没有在超时参数限定的时间内返回这个ask的响应,那么Future就会返回失败。ask/?方法要求提供的超时参数可以是长整型的毫秒数,也可以是akka.util.Timeout,这种类型提供了更丰富的时间表达方式。
![epub_22651331_29.jpg](https://img-blog.csdnimg.cn/img_convert/50dcf9f3caa3978e58df33c9ff7b4690.jpeg#averageHue=#f0f0f0&clientId=uac482df2-4a9a-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=278&id=ue3790d29&margin=[object Object]&name=epub_22651331_29.jpg&originHeight=347&originWidth=1198&originalType=binary&ratio=1&rotation=0&showTitle=false&size=44177&status=done&style=none&taskId=uc3ef683b-00ce-4632-86a0-8a7a71984e3&title=&width=958.4)
static import akka.pattern.Patterns.ask;
Timeout timeout = new akka.util.Timeout(
1,
java.util.concurrent.TimeUnit.SECONDS
);
Future future = ask(actor, message, timeout);
import scala.concurrent.duration._
import akka.pattern.ask
// 设置超时时间
implicit val timeout = akka.util.Timeout(1 second)
val future = actorRef ? "message"
案例:请参考前面的案例3
缺点
Ask模式看上去很简单,不过它是有隐藏的额外性能开销的,首先,ask会导致Akka在/temp路径下新建一个临时Actor。
这个临时Actor会等待从接收ask消息的Actor返回的响应,其次,Future也有额外的性能开销。
Ask会创建Future,由临时Actor负责完成,这个开销并不大,但是如果需要非常高频地执行ask操作,那么还是要将这一开销考虑在内的。
Ask很简单,不过考虑到性能,使用tell是更高效的解决方案。
//Java
actor.forward(result, getContext());
//Scala
actor forward message
1.4 Pipe消息模式很多情况下,需要将Actor中的某个Future返回给请求发送者。上文介绍过sender()是一个方法,所以要在Future的回调函数中访问sender(),我们必须存储一个指向sender()的引用:
//Java
final ActorRef senderRef = sender();
future.map(x ->{senderRef.tell(x, ActorRef.noSender())});
//Scala
val senderRef = sender();
future.map(x =>senderRef ! x);(原文为future.map(x =>senderRef ! ActorRef.noSender),有误。)
//Java
pipe(future, system.dispatcher()).to(sender());
//Scala
future pipeTo sender()
pipe(future) to sender()
pipe接受Future的结果作为参数,然后将其传递给所提供的Actor引用。在上面的例子中,因为sender()执行在当前线程上,所以我们可以直接调用sender(),而不用干一些奇怪的事情(比如把sender()引用存储在一个变量中),执行结果一切正确。这就好多了!
你是否还在寻找稳定的海外服务器提供商?创新互联www.cdcxhl.cn海外机房具备T级流量清洗系统配攻击溯源,准确流量调度确保服务器高可用性,企业级服务器适合批量采购,新人活动首月15元起,快前往官网查看详情吧