-
Notifications
You must be signed in to change notification settings - Fork 691
Java messageproducer
本指南以1.4.5版本的java客户端为起点编写。
- 简单例子
- 消息
- 客户端配置
- 会话工厂MessageSessionFactory
- 发送消息MessageProducer
- 订阅消息MessageConsumer
- 遍历消息TopicBrowser
- Spring框架支持
- 高级主题
MQ系统的三个角色:生产者,消费者和队列。生产者作为消息内容的提供商,是整个系统的驱动者。生产者发送消息到队列服务器,消费者从队列服务器获取消息并消费。三者分工明确,密切协作来完成异步任务。
在上一节创建了消息会话工厂后,就可以通过工厂来创建消息生产者,这都是通过createProducer
方法:
final MessageProducer producer = sessionFactory.createProducer();
创建的生产者是MessageProducer
类的实例,有了生产者是不是就可以马上发送消息?很抱歉,不行。原因是这样,服务端注册它提供的topic到zookeeper,但是客户端链接到zookeeper后还不知道应该连接哪个服务器。它必须告诉zookeeper想去连接哪个topic的服务器,拿到服务器地址,然后才能建立socket连接,最后才可以发送消息。
因此创建生产者之后还需要一个发布Topic的过程:
// publish topic
final String topic = "meta-test";
producer.publish(topic);
假设我们要发送的消息的主题是meta-test
,那么只要调用publish(topic)
这个方法就可以告诉zookeeper我们想要查找提供topic为meta-test
的服务器,从zookeeper获取服务器地址后,会话工厂会连接服务器,生产者就可以使用这些连接来发送消息了。
publish(topic)
方法可以针对同一个topic调用多次,这跟调用一次的效果是一样的。
同一个生产者想要发送多个不同topic的消息,那么这些topic都需要被发布一次:
producer.publish("topic1");
producer.publish("topic2");
例如,这里我们发布了两个topic:topic1
和topic2
,接下来就可以尝试发送这些主题的消息。
我们发布了meta-test
的主题,然后希望zookeeper告诉我们哪里有提供meta-test
主题的队列服务的服务器,但是zookeeper可能找不到提供这个主题服务的服务器,那么在发送消息的时候就会看到这样的异常:
There is no aviable partition for topic meta-test,maybe you don't publish it at first?
,这种情况你必须检查你的服务器是否配置了meta-test
这个topic,并且客户端和服务器的zookeeper配置也保持一致。
在消息一节,我们已经介绍了怎么创建消息,现在我们要做的就是调用send
方法将创建的消息发送出去:
// send message
try{
final SendResult sendResult = producer.sendMessage(new Message(topic, "hello,MetaQ!".getBytes()));
// check result
if (!sendResult.isSuccess()) {
System.err.println("Send message failed,error message:" + sendResult.getErrorMessage());
}
else {
System.out.println("Send message successfully,sent to " + sendResult.getPartition());
}
}catch(MetaClientException e){
e.printStackTrace(); //TODO log
}catch(InterruptedException e){
Thread.currentThread().interrupt();
}
发送的结果保存在返回的SendResult
实例。同时。你可能需要处理一些异常,如中断异常和客户端异常。通过SendResult.isSuccess()
返回的布尔值可以判断发送成功还是失败。
如果发送失败,通过SendResult.getErrorMessage()
方法可以获取发送失败的具体错误信息,方便调试。如果发送成功,则可以通过getPartition()
和getOffset()
方法获取消息被发送到哪个分区,以及消息在该分区数据文件中的绝对偏移量。
send(message)
是一个同步调用,默认使用3秒的超时时间,如果超时还没有返回则会抛出MetaOpeartionTimeoutException
异常,这个异常是MetaClientException
异常的子类。如果要捕捉超时异常,可以单独catch这个异常信息。
send
同时有一个同步的重载的方法用于设定发送超时时间,比如5秒发送超时:
producer.send(message, 5000, TimeUnit.MILLISECONDS);
同步发送会阻塞当前线程,直到服务端返回发送结果或者超时才能解除阻塞。现在应用都讲究异步化,因此MessageProducer
还提供了异步发送的方法,采用经典的回调方式:
producer.sendMessage(new Message(topic, "hello,MetaQ".getBytes()), new SendMessageCallback() {
@Override
public void onMessageSent(final SendResult result) {
if (result.isSuccess()) {
System.out.println("Send message successfully,sent to " + result.getPartition());
}
else {
System.err.println("Send message failed,error message:" + result.getErrorMessage());
}
}
@Override
public void onException(final Throwable e) {
e.printStackTrace();
}
});
send(msg, callback)
除了接收消息外,还接收一个实现了接口SendMessageCallback
的回调对象。这个接口有两个必须实现的方法:
- onMessageSent(sendResult),当消息发送后,返回发送的结果,检查
sendResult
来判断成功还是失败。 - onException(throwable),当客户端异常发生的时候,此时消息已经发送失败,你可以处理这个异常或者简单地打印日志记录。
异步的send
方法就没有返回结果了,发送结果通过onMessageSent
返回给回调对象。同样,异步的send
方法也有指定超时的重载版本send(message, callback, timeout, timeUnit)
异步发送消息,在发送结果从服务端返回之前,你传入的callback对象会一直在内存里,由于异步的send
方法不会阻塞,因此理论上你不断地异步发送消息,注册无穷多个callback到生产者那里等待通知。但是由于内存是有限的,很可能过多的callback对象会导致内存爆满并频繁发生Full GC,导致服务彻底不可用。因此异步发送应该做一定的流量控制限制,比如这里提供一个简单的例子,使用信号量做流控:
//最多同时存在1万个回调消息
Semaphore permits= new Semaphore(10000);
......
while (true) {
// 每次发送前都请求许可
if (permits.tryAcquire()) {
// 获取许可成功,发送消息
try {
producer.send(message, new SendMessageCallback() {
@Override
public void onMessageSent(SendResult result) {
try {
// 处理发送结果result
}
finally {
// 切记释放许可
permits.release();
}
}
@Override
public void onException(Throwable e) {
e.printStackTrace();
// 切记释放许可
permits.release();
}
});
}
catch (Exception e) {
e.printStackTrace();
}
//已发送,跳出循环。
break;
}
else {
// 让出执行权,等待再次获取许可
Thread.yield();
}
}
我们设置信号量的最大许可为1万,然后每次发送消息前都尝试申请许可,申请成功才可以发送消息,然后在callback里释放许可(无论是发送成功失败还是产生异常都需要释放许可);如果申请失败,则调用yield
让出执行权,等待while循环的下次重试。
这里只是一个简单的流控示范,你也可以用有限的阻塞队列等方式来实现流控。
MetaQ支持发送事务消息,包括本地事务和支持XA协议(二阶段提交协议)的分布式事务。
所谓本地事务消息,通俗来说就是批量发送一批消息,要么同时发送成功,要么同时发送失败,保证事务的ACID的特性。MessageProducer
类有beginTransaction()
,commit()
和rollback()
方法用来控制一个本地事务,一个具体的例子:
// 开始事务
producer.beginTransaction();
try{
SendResult result1 = producer.send(msg1);
SendResult result2 = producer.send(msg2);
if(result.isSuccess() && result2.isSuccess()){
producer.commit();
}else{
//任何一条发送失败都应该回滚消息
producer.rollback();
}
}catch(Exception e){
//发生异常,回滚消息
producer.rollback();
}
通过beginTransaction
开启一个本地事务,然后在事务内发送两条消息,只有在两条消息都发送成功的时候才能通过commit
方法提交事务,否则都使用rollback
回滚事务,同样,在发生异常的时候也应该回滚事务。
事务还可以设置超时时间
//设置事务超时为10秒
producer.setTransactionTimeout(10);
超过这个时间还没有提交或者回滚的事务,将被服务器自动回滚。事务的最大超时时间不能超过服务器设置的maxTxTimeoutInSeconds
,超过会被设置为maxTxTimeoutInSeconds
设定的最大值。
beginTransaction
,rollback
和commit
本质上是跟服务器做通讯,告诉服务器开始,回滚或者提交某个事务,通讯必然涉及到通讯超时的问题,也可以设置这些事务请求的超时时间,默认为5秒:
//设置事务请求超时为10秒。
producer.setTransactionRequestTimeout(10000,TimeUnit.MILLISECONDS);
通常来说这个值不应该超过事务的时间时间。
另外,请注意,事务内可以发送多个不同的topic类型的消息,前提是可以在zookeeper上找到至少一台broker提供提供所有这些topic。下面的XA事务也是一样。
所谓XA事务消息,是指在一个事务内除了MetaQ这个事务源之外,还有另外一个事务源参与了事务,最常见的比如数据库源。一个典型的场景是:往MetaQ发送消息,同时要向数据库插入一条数据,两个操作要么同时成功,要么同时失败。不允许出现发送消息成功,而插入数据库失败的情况,反之亦然。比如下订单这个操作,要往订单表插入一条记录,同时发送一条消息到MetaQ,执行一些异步任务如通知用户、物流,记录日志,统计分析等等,就需要分布式事务。JavaEE规范支持XA协议,也就是两阶段提交协议,更详细的关于这块的信息请参考JTA规范和J2EE规范,阅读两阶段协议的相关资料来获得。
在这里我们举一个简单的例子,假设有一张订单表是这样:
CREATE TABLE `test`.`orders` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`uid` int(11) NOT NULL,
`address` varchar(255) NOT NULL,
PRIMARY KEY (`id`)
);
要求uid
和address
字段都不允许为null。
现在我们就要模拟一个下订单的操作,同时插入一条记录到订单表,并且发送一条消息给MetaQ处理。
首先,创建XAMessageSessionFactory
实例,跟创建MessageSessionFactory
类似:
final XAMessageSessionFactory xasf = new XAMetaMessageSessionFactory(metaClientConfig);
接下来创建XAMessageProducer
并发布topic:
final String topic = "meta-test";
XAMessageProducer xaMessageProducer = xasf.createXAProducer();
// publish topic
xaMessageProducer.publish(topic);
XAMessageProducer
就是一个参与分布式事务的事务源,接下来要创建一个XADataSource
来代表数据库事务源,注意必须使用数据库的XA数据源:
final MysqlXADataSource mysqlXADataSource = new MysqlXADataSource();
mysqlXADataSource
.setUrl("jdbc:mysql://localhost:3306/test?characterEncoding=utf8&connectTimeout=1000&autoReconnect=true");
mysqlXADataSource.setUser("root");
mysqlXADataSource.setPassword("");
mysqlXADataSource.setPreparedStatementCacheSize(20);
接下来你还需要选择一个JTA实现,比如我们这里用Atomikos这个开源事务管理器实现,你也可以用容器内的JTA管理器,只要能获取TransactionManager即可,我们还需要一个XATransactionTemplate来包装XA事务的模板操作,我已经在例子里提供。利用这个模板来执行分布式事务:
final int uid = 100;
final String address = "beijing";
// we should create a template every transaction.
final XATransactionTemplate template = new XATransactionTemplate(tm, xads, xaMessageProducer);
template.executeCallback(new XACallback() {
@Override
public Object execute(final Connection conn, final XAMessageProducer producer, Status status)
throws Exception {
final PreparedStatement pstmt =
conn.prepareStatement("insert into orders(uid,address) values(?,?)");
pstmt.setInt(1, uid);
pstmt.setString(2, address);
pstmt.close();
if (pstmt.executeUpdate() <= 0) {
status.setRollbackOnly();
return null;
}
if (!producer.sendMessage(new Message(topic, address.getBytes())).isSuccess()) {
status.setRollbackOnly();
}
return null;
}
});
注意在XACallback
是如何处理发送失败或者插入失败的情况,都通过 status.setRollbackOnly();
来回滚整个分布式事务。也可以通过抛出运行时异常来回滚事务。
你可以尝试将address
设置为null来故意违反数据库约束来观察事务的回滚情况。
XAMessageProducer
跟MessageProducer
类似,同样可设置事务超时和事务请求超时,恕不赘述。
XAMessageProducer
有一个全局唯一的标识符,可以通过getUniqueQualifier()
获取,这个标识符必须全局唯一,它将被保存在MetaQ和TransactionManager的事务日志里,用于追踪和标识。
当运行这个分布式事务的某台机器要做迁移,必须同时迁移TransactionManager的事务日志(为了recover事务),为了保证在不同机器之间做到平滑迁移,XAMessageProducer
的标识符最好明确设置成一个不变的有意义的全局标识符,默认的标识符跟机器hostname绑定,你可以通过setUniqueQualifier
方法明确修改这个标识符。
TODO
MetaQ的topic必须首先在服务端配置,生产者才可以发送这个Topic的消息。但是常常会有这样的需求,希望能发送任何类型topic的消息,而不是每次都要在MetaQ服务端配置topic。这可以通过设置MessageProducer
的默认topic来实现。假设你发送topic1的消息,当生产者在zookeeper上找不到提供topic1的服务器,那就会选择提供默认topic的服务器来发送这条消息。
首先在服务端配置一个默认topic,如:
[topic=app-*]
注意,必须保证app-*
这个topic的numPartitions
和系统[system]
的配置保持一致。因为任意topic的消息因为没有明确在服务端配置,都会使用系统的默认配置,但是发送的时候却是使用app-*
的配置,因此两者必须保持一致。
接下来设置MessageProducer
的默认topic:
producer.setDefaultTopic("app-*");
接下来你就可以发布任意以app-
开头的topic的消息,例如:
producer.send(new Message("app-topic1","hello world".getBytes()));
producer.send(new Message("app-topic2","hello world".getBytes()));
producer.send(new Message("app-topic3","hello world".getBytes()));
哪怕这些topic没有被publish
过。这些topic都将在服务端动态创建,无需明确配置。
我们这里以app-*
作为默认topic,如果你不想有任何前缀设置,那也可以使用*
作为默认topic,这样就可以发送任意类型topic的消息。
默认情况下,消息生产者会获取所有服务器的分区并组织成一张链表,按照broker id,partition号的顺序的排序,然后轮询发送消息到这些分区。这个轮询策略就是一个分区选择器RoundRobinPartitionSelector
。你可以定制自己的分区选择器,只要实现PartitionSelector
接口即可:
public interface PartitionSelector {
/**
* 根据topic、message从partitions列表中选择分区
*
* @param topic
* topic
* @param partitions
* 分区列表
* @param message
* 消息
* @return
* @throws MetaClientException
* 此方法抛出的任何异常都应当包装为MetaClientException
*/
public Partition getPartition(String topic, List<Partition> partitions, Message message) throws MetaClientException;
}
在定制实现自己的分区选择器之后,可以在创建消息生产者的时候传入:
MessageProducer producer = sessionFactory.createProducer(new MyPartitionSelector());
除了RoundRobinPartitionSelector
之外,MetaQ还提供RandomPartitionSelector
用于随机选择分区发送。
TODO
消息生产者作为重量级资源,最好也作为单例存在,同时也需要明确地调用shutdown()
方法来释放资源。