开发准备 依赖 新建Maven工程,引入依赖
1 2 3 4 5 6 <!-- https: <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.5.0</version> </dependency>
这里我还引入了一个rocketmq-example,这里面的示例代码和官方文档是一直的,可以直接参考 1 2 3 4 5 6 <!-- https: <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-example</artifactId> <version>4.5.0</version> </dependency>
前一步的RocketMQ服务搭建并运行无误就可以进入Java编码了 命令 这里重点说一下,防火墙请放行9876端口,如果可以请把防火墙彻底关掉。 贴上CentOS7中 firewalld 的基本使用命令:启动:systemctl start firewalld 关闭:systemctl stop firewalld 查看状态:systemctl status firewalld 开机禁用 :systemctl disable firewalld 开机启用 :systemctl enable firewalld 放行9876端口:firewall-cmd --zone=public --add-port=9876/tcp --permanent(–permanent永久生效,没有此参数重启后失效) 重新载入:firewall-cmd --reload(修改端口后需要重新载入) 查看9876端口:firewall-cmd --zone= public --query-port=9876/tcp 删除9876端口:firewall-cmd --zone= public --remove-port=9876/tcp --permanent 文档 本篇代码参照自官方文档及 rocketmq-example 中的示例代码 官方文档及快速上手示例地址:https://rocketmq.apache.org/docs/simple-example/ Producer示例代码路径:org.apache.rocketmq.example.quickstart.Producer (需导入 rocketmq-example 依赖) Customer示例代码路径:org.apache.rocketmq.example.quickstart.Customer (需导入 rocketmq-example 依赖) 编码运行 生产者 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 package com.rocketmq.demo;import org.apache.rocketmq.client.exception.MQClientException;import org.apache.rocketmq.client.producer.DefaultMQProducer;import org.apache.rocketmq.client.producer.SendResult;import org.apache.rocketmq.common.message.Message;import org.apache.rocketmq.remoting.common.RemotingHelper;public class ProducerTest { public static void main (String[] args) throws MQClientException, InterruptedException { DefaultMQProducer producer = new DefaultMQProducer("myProducerGroupName_1" ); producer.setNamesrvAddr("47.75.222.71:9876" ); producer.setInstanceName("myProducer_1" ); producer.start(); boolean flag1 = true ; boolean flag2 = true ; for (int i = 0 ; i < 8 ; i++) { try { Message msg; if (i % 2 == 1 ) { msg = new Message("myTopicTest_1" , flag1 ? "TagA" : "TagB" , ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); flag1 = !flag1; } else { msg = new Message("myTopicTest_2" , flag2 ? "TagC" : "TagD" , ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); flag2 = !flag2; } SendResult sendResult = producer.send(msg); System.out.printf("%s%n" , sendResult); } catch (Exception e) { e.printStackTrace(); Thread.sleep(500 ); } } producer.shutdown(); } }
消费者 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 package com.rocketmq.demo;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;import org.apache.rocketmq.client.exception.MQClientException;import org.apache.rocketmq.common.consumer.ConsumeFromWhere;import org.apache.rocketmq.common.message.MessageExt;import java.util.List;public class CustomerTest { public static void main (String[] args) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("myCustomerGroupName_1" ); consumer.setNamesrvAddr("47.75.222.71:9876" ); consumer.setInstanceName("myConsumer_1" ); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe("myTopicTest_1" , "TagA" ); consumer.subscribe("myTopicTest_2" , "TagC||TagD" ); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage (List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs.size()); MessageExt msg = msgs.get(0 ); System.out.printf("%s Receive New Messages: %s %n" , new Object[]{Thread.currentThread().getName(), msgs}); System.out.println("----topic: " + msg.getTopic() + ", tag: " + msg.getTags() + ", body: " + new String(msg.getBody())); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.printf("Consumer Started.%n" ); } }
# 采坑记录
RocketMQ服务外网不能访问 如果你按照上一篇的方式运行,那么你可能会遇到下面这种错误(在本地运行RocketMQ不会遇到这个问题) 1 2 3 4 5 6 7 org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: sendDefaultImpl call timeout at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:634 ) at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1279 ) at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1225 ) at org.apache.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:283 ) at com.rocketmq.demo.ProducerTest.main(ProducerTest.java:69 ) 09 :19 :42.792 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[] result: true
1 2 3 4 5 6 7 8 9 10 11 12 brokerClusterName = DefaultCluster brokerName = broker-a brokerId = 0 deleteWhen = 04 fileReservedTime = 48 brokerRole = ASYNC_MASTER flushDiskType = ASYNC_FLUSH # 添加如下两行内容,'47.**.***.71'为你的服务器公网IP namesrvAddr=47 .**.***.71 :9876 # 需要指定broker外网IP否则外网不能访问 brokerIP1=47 .**.***.71
然后将broker的运行方式改为: nohup sh bin/mqbroker -c conf/broker.conf & 未标明转载的内容均为博主原创,行文难免有疏漏,如有错误欢迎留言指出 明人不说暗话,如果你觉得可以的话,你懂的!
打赏 微信支付
支付宝