/**
 * Spring configuration that wires up the RocketMQ transactional producer.
 *
 * <p>The name-server address is injected from the {@code apache.rocketmq.namesrvAddr}
 * property, and every producer created here joins the {@code xd_producer_group} group.
 */
@Configuration
public class rocketMqConfig {

    /** Name-server address, injected from the {@code apache.rocketmq.namesrvAddr} property. */
    @Value("${apache.rocketmq.namesrvAddr}")
    private String namesrvAddr;

    /** Producer group passed to the {@link TransactionMQProducer} constructor. */
    private static final String PRODUCER_GROUP = "xd_producer_group";

    /**
     * Builds the transactional producer, attaches its executor and transaction listener,
     * and wraps it in a {@link TransactionRocketMqUtil}.
     *
     * <p>The producer is only configured here; this method does not call {@code start()}
     * on it. NOTE(review): presumably {@code TransactionRocketMqUtil} starts the producer
     * and owns its shutdown (including the executor created below) - confirm.
     *
     * @return the utility wrapping the configured transactional producer
     */
    @Bean
    public ITransactionRocketMqUtil rocketMqUtilSs() {
        TransactionMQProducer producer = new TransactionMQProducer(PRODUCER_GROUP);
        producer.setVipChannelEnabled(false);
        producer.setNamesrvAddr(namesrvAddr);

        // Named threads make the transaction worker threads easy to spot in thread dumps.
        ThreadFactory threadFactory = new ThreadFactoryBuilder()
                .setNameFormat("transaction-thread-name-%s")
                .build();

        // 2 core threads, up to 5 threads, 60 s keep-alive for idle extras,
        // and a bounded queue of 30 pending tasks.
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2,
                5,
                60,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(30),
                threadFactory);
        producer.setExecutorService(executor);

        producer.setTransactionListener(new TransactionListener() {

            /**
             * Runs the local transaction for a message.
             *
             * @param message the message passed in by the producer
             * @param o       the caller-supplied argument (unused here)
             * @return {@code COMMIT_MESSAGE} when the local work completes,
             *         {@code ROLLBACK_MESSAGE} when it throws
             */
            @Override
            public LocalTransactionState executeLocalTransaction(Message message, Object o) {
                try {
                    LoggerUtil.info("开始事务处理");
                    // TODO(review): placeholder that always throws ArithmeticException to
                    // simulate a failing local transaction; replace with the real business
                    // logic. While it is in place, this method always takes the rollback path.
                    System.out.println(1 / 0);
                    LoggerUtil.info("完成事务");
                    return LocalTransactionState.COMMIT_MESSAGE;
                } catch (Exception e) {
                    LoggerUtil.info("失败事务");
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                } finally {
                    // Deliberately no `return` here: a return inside finally replaces the
                    // value returned from try/catch, which previously made this method
                    // always return null and discard the commit/rollback decision.
                    LoggerUtil.info("事务处理结束");
                }
            }

            /**
             * Answers a transaction-state check for a previously sent message.
             *
             * @param messageExt the message whose local transaction state is being checked
             * @return {@code COMMIT_MESSAGE} when the check completes,
             *         {@code ROLLBACK_MESSAGE} when it throws
             */
            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
                try {
                    LoggerUtil.info("开始事务回查");
                    // TODO(review): placeholder for the real state lookup; currently prints 1
                    // and always commits.
                    System.out.println(1 / 1);
                    return LocalTransactionState.COMMIT_MESSAGE;
                } catch (Exception e) {
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                } finally {
                    LoggerUtil.info("事务回查结束");
                }
            }
        });

        return new TransactionRocketMqUtil(producer);
    }
}