Notes on how to subscribe to the MySQL binlog through RabbitMQ. The related code is on GitHub in lin-mq.
1. Database
Create a canal replication user in the database for the subscription:
    CREATE USER canal IDENTIFIED BY 'canal&*123ABC';
    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
    FLUSH PRIVILEGES;
Check whether binlog is enabled. If the value of log_bin is OFF it is disabled; ON means it is enabled.
    SHOW VARIABLES LIKE '%log_bin%';
If it is off, edit /etc/my.cnf to enable binlog, then restart MySQL so the change takes effect:
    [mysqld]
    log-bin=mysql-bin
    binlog-format=ROW
    server_id=1
2. canal
Download canal:
    wget https://mirror.ghproxy.com/\?q\=https%3A%2F%2Fgithub.com%2Falibaba%2Fcanal%2Freleases%2Fdownload%2Fcanal-1.1.5%2Fcanal.deployer-1.1.5.tar.gz
    mv index.html\?q=https:%2F%2Fgithub.com%2Falibaba%2Fcanal%2Freleases%2Fdownload%2Fcanal-1.1.5%2Fcanal.deployer-1.1.5.tar.gz canal.deployer.tar.gz
    tar -zvxf canal.deployer.tar.gz
Configuration
conf/canal.properties
    canal.serverMode = rabbitMQ

    rabbitmq.host = 172.17.0.1
    rabbitmq.virtual.host = /
    rabbitmq.exchange = BINLOG_MQ_EXCHANGE
    rabbitmq.username = mq
    rabbitmq.password = mq123
    rabbitmq.deliveryMode =
conf/example/instance.properties
    canal.instance.dbUsername=canal
    canal.instance.dbPassword=canal&*123ABC
    canal.instance.connectionCharset = UTF-8

    canal.instance.filter.regex=.*\\..*
    canal.instance.filter.black.regex=mysql\\.slave_.*

    canal.mq.topic=BINLOG_MQ_KEY.canal
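The doubled backslashes in the two filter lines are easy to misread. The sketch below is only an approximation, not canal's own filter code: it assumes the file goes through standard java.util.Properties unescaping and that each pattern is matched against the full schema.table name. The class name CanalFilterSketch and the sample table names are illustrative.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;
import java.util.regex.Pattern;

class CanalFilterSketch {
    // The two filter lines from instance.properties.
    // Each backslash is doubled once more because this is a Java string literal.
    static final String FRAGMENT =
            "canal.instance.filter.regex=.*\\\\..*\n"
          + "canal.instance.filter.black.regex=mysql\\\\.slave_.*\n";

    // Parse the fragment the way java.util.Properties would parse the file.
    public static Properties load() {
        Properties props = new Properties();
        try {
            props.load(new StringReader(FRAGMENT));
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
        return props;
    }

    // True if "schema.table" matches the allow pattern and not the black pattern.
    public static boolean accepted(String schemaDotTable) {
        Properties props = load();
        Pattern allow = Pattern.compile(props.getProperty("canal.instance.filter.regex"));
        Pattern deny = Pattern.compile(props.getProperty("canal.instance.filter.black.regex"));
        return allow.matcher(schemaDotTable).matches()
                && !deny.matcher(schemaDotTable).matches();
    }

    public static void main(String[] args) {
        // After unescaping, "\\." in the file becomes the regex "\." (a literal dot).
        System.out.println(load().getProperty("canal.instance.filter.regex"));
        System.out.println(accepted("lin.sys_log"));
        System.out.println(accepted("mysql.slave_master_info"));
    }
}
```

Under these assumptions, `.*\\..*` reads as "any schema, a literal dot, any table", so every table is subscribed except those caught by the black pattern.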
Startup failed with this error:
    OpenJDK 64-Bit Server VM warning: Ignoring option PermSize; support was removed in 8.0
    OpenJDK 64-Bit Server VM warning: Ignoring option MaxPermSize; support was removed in 8.0
    OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.
    Unrecognized VM option 'UseCMSCompactAtFullCollection'
    Error: Could not create the Java Virtual Machine.
    Error: A fatal exception has occurred. Program will exit.
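The environment runs OpenJDK 11, which no longer accepts some of the JVM options in the startup script, so startup failed. Either change the startup script to use Java 8, or, if you know what you are doing, try adjusting the JVM options yourself.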
    if [ -z "$JAVA" ] ; then
      # JAVA=$(which java)
      # `which` returns the java found on the system PATH; replace it with the one you want.
      # Here my Java 8 path is /usr/local/java/bin/java
      JAVA=/usr/local/java/bin/java
    fi
3. RabbitMQ and consuming from a Java project
Create a topic exchange named BINLOG_MQ_EXCHANGE, then create a queue named BINLOG_MQ_QUEUE and bind it to the exchange with the routing key BINLOG_MQ_KEY.*
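To see why the binding key BINLOG_MQ_KEY.* picks up messages sent as BINLOG_MQ_KEY.canal, here is a minimal sketch of topic-exchange matching written from the documented AMQP rules (`*` stands for exactly one dot-separated word, `#` for zero or more words). It is not RabbitMQ's implementation; the class name TopicKeyMatcher is hypothetical, and it assumes canal publishes with the value of canal.mq.topic as the routing key, as this setup implies.

```java
class TopicKeyMatcher {
    // Returns true if routingKey would be routed by a binding with bindingKey.
    public static boolean matches(String bindingKey, String routingKey) {
        // limit -1 keeps trailing empty words, so "a." is two words, not one
        return match(bindingKey.split("\\.", -1), 0, routingKey.split("\\.", -1), 0);
    }

    private static boolean match(String[] pattern, int p, String[] words, int w) {
        if (p == pattern.length) {
            // pattern exhausted: a match only if every word was consumed too
            return w == words.length;
        }
        if (pattern[p].equals("#")) {
            // '#' may absorb zero or more words; try every possible length
            for (int next = w; next <= words.length; next++) {
                if (match(pattern, p + 1, words, next)) {
                    return true;
                }
            }
            return false;
        }
        if (w == words.length) {
            // pattern still expects a word but none are left
            return false;
        }
        if (pattern[p].equals("*") || pattern[p].equals(words[w])) {
            // '*' or an identical literal consumes exactly one word
            return match(pattern, p + 1, words, w + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("BINLOG_MQ_KEY.*", "BINLOG_MQ_KEY.canal"));
        System.out.println(matches("BINLOG_MQ_KEY.*", "BINLOG_MQ_KEY.lin.sys_log"));
    }
}
```

Because `*` covers only one word, a routing key with more segments after BINLOG_MQ_KEY would need a `#` binding instead.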
MQ configuration in the Spring Boot Java project:
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.core.TopicExchange;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    public class BinlogMQConfig {

        public final static String BINLOG_MQ_EXCHANGE = "BINLOG_MQ_EXCHANGE";
        public final static String BINLOG_MQ_QUEUE = "BINLOG_MQ_QUEUE";
        public final static String BINLOG_MQ_KEY = "BINLOG_MQ_KEY.*";

        @Bean
        public TopicExchange binlogTopicExchange() {
            return new TopicExchange(BINLOG_MQ_EXCHANGE);
        }

        @Bean
        public Queue binlogQueue() {
            return new Queue(BINLOG_MQ_QUEUE);
        }

        @Bean
        Binding bindingBinlogExchangeMessages(Queue queue, TopicExchange topicExchange) {
            return BindingBuilder.bind(queue).to(topicExchange).with(BINLOG_MQ_KEY);
        }
    }
The consumer here uses manual acknowledgment mode, so each message is ACKed explicitly:
    import com.rabbitmq.client.Channel;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.ExchangeTypes;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.*;
    import org.springframework.stereotype.Component;

    import java.io.IOException;

    @Slf4j
    @Component
    public class BinlogConsumerService {

        @RabbitListener(bindings = @QueueBinding(
                value = @Queue(value = BinlogMQConfig.BINLOG_MQ_QUEUE, autoDelete = "false"),
                exchange = @Exchange(value = BinlogMQConfig.BINLOG_MQ_EXCHANGE, type = ExchangeTypes.TOPIC),
                key = BinlogMQConfig.BINLOG_MQ_KEY),
                containerFactory = "pointTaskContainerFactory")
        @RabbitHandler
        public void process(Message msg, Channel channel) throws IOException {
            log.info("===binlog消费者获取mq消息:{}", msg);
            log.info("===msg properties: " + msg.getMessageProperties().toString());
            log.info("===msg body: " + new String(msg.getBody()));
            channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
        }
    }
Test with an insert and an update:
    INSERT INTO `lin`.`sys_log` (`user_id`, `username`, `operation`, `time`, `method`, `params`, `ip`, `gmt_create`) VALUES ('100', 'admin', '用户登录', '162', 'com.admin.system.controller.LoginController.ajaxLogin()', NULL, '127.0.0.1', '2021-09-11 17:59:33');
    UPDATE sys_log SET user_id='101' WHERE id = 9563;
Consumer log:
    2021-09-11 18:20:49.246 INFO 3500 --- [ntContainer
    2021-09-11 18:20:49.247 INFO 3500 --- [ntContainer
    2021-09-11 18:20:49.247 INFO 3500 --- [ntContainer
    2021-09-11 18:21:15.380 INFO 3500 --- [ntContainer
    2021-09-11 18:21:15.380 INFO 3500 --- [ntContainer
    2021-09-11 18:21:15.380 INFO 3500 --- [ntContainer