吾爱破解 - 52pojie.cn

 找回密码
 注册[Register]

QQ登录

只需一步,快速开始

查看: 3107|回复: 6
收起左侧

[Java 转载] 基于rocketmq的分布式事务

[复制链接]
especial 发表于 2020-8-11 15:30
本帖最后由 especial 于 2020-8-11 15:35 编辑

一、linux安装启动rocketmq注意点
    1、在安装完rocketmq之后要先启动 mqnamesrv 再启动 mqbroker
    2、在启动mqbroker的时候要记得处理ip的问题,默认启动会使用docker的内网地址,需要配置成虚拟机ip
[Shell] 纯文本查看 复制代码
1
2
3
4
5
6
7
8
#启动mqnamesrv
nohup sh mqnamesrv &
#创建一个mqbroker的配置文件
echo brokerIP1=192.168.147.130 >broker.properties
#启动mqbroker 开启自动创建主题
nohup sh mqbroker -n 你的ip:9876 -c broket.properties autoCreateTopicEnable=true &
#查看是否启动成功
tail -f ~/logs/rocketmqlogs/broker.log





二、java代码通过rocketmq的事务 处理分布式事务 并封装(个人基本封装例子,可通过基于配置文件配置,更灵活简便)这里直接使用代码配置


1、引入依赖
[XML] 纯文本查看 复制代码
01
02
03
04
05
06
07
08
09
10
<dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.4.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-remoting</artifactId>
            <version>4.4.0</version>
        </dependency>



2、启动加载生产者实例工具类,使用的是TransactionMqProducer  使用事务  默认的是不具有事务的,并且配置事务处理的类 实现TransactionListener  这里没有独立写 直接new了



[Java] 纯文本查看 复制代码
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
@Configuration
public class rocketMqConfig {
 
    @Value("${apache.rocketmq.namesrvAddr}")
    private String namesrvAddr;
 
    private static final String PRODUCER_GROUP = "xd_producer_group";
 
    @Bean
    public ITransactionRocketMqUtil rocketMqUtilSs(){
 
        TransactionMQProducer producer = new TransactionMQProducer(PRODUCER_GROUP);
        producer.setVipChannelEnabled(false);
        producer.setNamesrvAddr(namesrvAddr);
        //创建一个自定义线程工厂
        ThreadFactory threadFactory =
                new ThreadFactoryBuilder().setNameFormat("transaction-thread-name-%s").build();
        //创建一个线程池
        ThreadPoolExecutor executor =
                new ThreadPoolExecutor(
                        2, 5, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(30), threadFactory);
        //设置生产者线程池
        producer.setExecutorService(executor);
        //设置事务监听器
        producer.setTransactionListener(new TransactionListener() {
            @Override
            public LocalTransactionState executeLocalTransaction(Message message, Object o) {
                try {
                    LoggerUtil.info("开始事务处理");
                    System.out.println(1/0);
                    LoggerUtil.info("完成事务");
                    return LocalTransactionState.COMMIT_MESSAGE;
                }catch (Exception e){
                    LoggerUtil.info("失败事务");
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                }finally {
                    LoggerUtil.info("事务处理结束");
                    return null;
                }
            }
 
            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
                try {
                    LoggerUtil.info("开始事务回查");
                    System.out.println(1/1);
                    return LocalTransactionState.COMMIT_MESSAGE;
                }catch (Exception e){
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                }finally {
                    LoggerUtil.info("事务回查结束");
                }
            }
        });
        return new TransactionRocketMqUtil(producer);
    }
}



3、生产者消息发送工具类


[Java] 纯文本查看 复制代码
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class TransactionRocketMqUtil implements ITransactionRocketMqUtil {
 
    private TransactionMQProducer producer;
 
    public TransactionRocketMqUtil(TransactionMQProducer producer){
        LoggerUtil.info("初始化类");
        this.producer = producer;
        start();
    }
 
    public TransactionSendResult transactionSend (String topic,String msg) throws Exception {
        Message message = new Message(topic,msg.getBytes());
        TransactionSendResult result =  producer.sendMessageInTransaction(message,null);
        return result;
    }
 
    public void shutdown(){
        this.producer.shutdown();
    }
 
    /**
     * 使用前先调用start方法
     */
    public void start(){
        try {
            this.producer.start();
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}



4、消费者注册封装


[Java] 纯文本查看 复制代码
01
02
03
04
05
06
07
08
09
10
11
12
13
@Configuration
public class ConsumerContext {
 
    private DefaultMQPushConsumer consumer;
 
    @Value("${apache.rocketmq.namesrvAddr}")
    private String namesrvAddr;
    @Bean
    public ContextContextTool testConsumer() throws Exception {
        new ContextContextTool().ContextTool();
        return null;
    }
}



这里我写死了消费者的类可以在 ContextTool() 方法中配置传入消费者的 类 以及组 和 主题   消费者的类需要实现MessageListenerConcurrently接口,逻辑写在consumeMessage方法中


[Java] 纯文本查看 复制代码
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
public class ContextContextTool {
    @Value("${apache.rocketmq.namesrvAddr}")
    private static String namesrvAddr;
    public void ContextTool() throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_group");
        consumer.setNamesrvAddr(namesrvAddr);
        consumer.subscribe("test_topic","*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for (MessageExt msg : list) {
                    LoggerUtil.info("消费成功"+msg);
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        LoggerUtil.info("创建消费者并启动监听:test_topic");
    }
}



5、消息发送


[Java] 纯文本查看 复制代码
1
2
3
4
public void sendProducerTransaction() throws Exception {
       TransactionSendResult result = transactionRocketMqUtil.transactionSend("test_topic","test_msg");
       LoggerUtil.info("发送消息到rocketmq事务监听"+result);
   }



6、结果


其中可能还存在某些问题 欢迎大佬指正



测试

测试
图片.png

免费评分

参与人数 2吾爱币 +5 热心值 +2 收起 理由
alian2018 + 1 谢谢@Thanks!
苏紫方璇 + 5 + 1 欢迎分析讨论交流,吾爱破解论坛有你更精彩!

查看全部评分

发帖前要善用论坛搜索功能,那里可能会有你要找的答案或者已经有人发布过相同内容了,请勿重复发帖。

头像被屏蔽
偶尔平凡 发表于 2020-8-12 12:14
提示: 作者被禁止或删除 内容自动屏蔽
JsFuck攻城狮 发表于 2020-8-12 16:59
怎么保证消息的丢失问题,之前业务上的做法是把消息持久化到库里
 楼主| especial 发表于 2020-8-13 10:01
JsFuck攻城狮 发表于 2020-8-12 16:59
怎么保证消息的丢失问题,之前业务上的做法是把消息持久化到库里

1、rocketmq节点集群
2、rocketmq事务是一个2段式提交,事务开始只是提交了预备消息  在事务开始前提交,只有事务完成才会commit 提交真正的消息,在没有收到commit事务确认,预备消息会进行回查,确认后再进行消费,消费失败会重复调用消费,分布式事务做到的是数据最终一致。如果说在预备消息发送就丢失了,则第一段事务是根本不会执行的。
3、如果在第一段提交成功 commit成功后 还没消费 消息丢失,这种情况只能通过附加业务冗余来保证稳定性,但是rocketmq是十分稳定的,可以这么说,你整个分布式系统挂了,redis 挂了rocketmq都不会挂。
(个人见解)
c03xp 发表于 2020-8-13 10:36
长知识了
drundragon 发表于 2020-8-21 16:09
谢谢分享
头像被屏蔽
xiadongming 发表于 2021-9-4 14:28
提示: 作者被禁止或删除 内容自动屏蔽
您需要登录后才可以回帖 登录 | 注册[Register]

本版积分规则

返回列表

RSS订阅|小黑屋|处罚记录|联系我们|吾爱破解 - LCG - LSG ( 京ICP备16042023号 | 京公网安备 11010502030087号 )

GMT+8, 2025-7-8 05:31

Powered by Discuz!

Copyright © 2001-2020, Tencent Cloud.

快速回复 返回顶部 返回列表