Contents

1. Principles
   1.1. Design flow diagram
   1.2. How it works
2. Enabling binlog in MySQL
   2.1. Configure my.ini
   2.2. Restart the MySQL service
   2.3. Verify binlog mode
   2.4. Create the canal account
   2.5. Verify the account
3. Building the environment with docker-compose
   3.1. Environment overview
   3.2. Write docker-compose.yml
   3.3. Install docker-compose
   3.4. Build the environment
   3.5. Verify the environment
   3.6. Troubleshooting
4. Microservice project walkthrough
   4.1. Project dependencies
   4.2. yml configuration
   4.3. Index object
   4.4. Listener
   4.5. Table schema
   4.6. Type constants
5. Testing
   5.1. Change some data
   5.2. Observe the change events

1. Principles
1.1. Design flow diagram

(The flow diagram image from the original post is not reproduced here.)

1.2. How it works

canal-server connects to MySQL as a replication slave and reads the ROW-format binlog. Each row change is serialized into a flat JSON message and published to a Kafka topic; a Spring Boot consumer subscribes to that topic and writes the change into Elasticsearch, keeping the search index in sync with the database.
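The flow described above can be modeled as a toy sketch. This is not the real pipeline: the "index" below is just a `HashMap` standing in for Elasticsearch, and `apply` stands in for the Kafka consumer, but it shows how INSERT/UPDATE/DELETE events keep the index in step with the table.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of the sync loop: Canal turns each binlog row change into an
// event (type + row data); the consumer applies it to the search index.
public class SyncSketch {
    static Map<Long, Map<String, Object>> index = new HashMap<>();

    static void apply(String type, Long id, Map<String, Object> row) {
        switch (type) {
            case "INSERT":
            case "UPDATE":
                index.put(id, row);   // save/overwrite the document
                break;
            case "DELETE":
                index.remove(id);     // drop the document
                break;
        }
    }

    public static void main(String[] args) {
        Map<String, Object> row = new HashMap<>();
        row.put("name", "meite11111");
        apply("INSERT", 1L, row);
        System.out.println(index.containsKey(1L)); // true
        apply("DELETE", 1L, row);
        System.out.println(index.containsKey(1L)); // false
    }
}
```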
2. Enabling binlog in MySQL

2.1. Configure my.ini

Locate the my.ini configuration file and add the following:
```ini
# enable binlog
log-bin=mysql-bin
# use ROW mode
binlog-format=ROW
# required for MySQL replication; must not clash with canal's slaveId
server_id=1
```

2.2. Restart the MySQL service
Right-click the MySQL service and restart it.

2.3. Verify binlog mode
If `log_bin` shows `ON`, binlog is enabled:

```sql
show variables like 'log_%';
```

2.4. Create the canal account
Log in to MySQL as root:

```shell
mysql -uroot -p123456
```

```sql
-- create the account: user canal, password canal
CREATE USER 'canal' IDENTIFIED BY 'canal';
-- grant privileges
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- flush and apply privileges
FLUSH PRIVILEGES;
```

2.5. Verify the account
Create a new MySQL connection with user canal and password canal; the host is the Windows machine's local IP address.
3. Building the environment with docker-compose

3.1. Environment overview
| Middleware     | Version    | IP address      | Port      |
|----------------|------------|-----------------|-----------|
| mysql          | 8.0.26     | 192.168.43.122  | 3306      |
| canal-server   | 1.1.4      | 192.168.159.142 | 11111     |
| zookeeper      | latest     | 192.168.159.142 | 2181      |
| kafka          | 2.12-2.3.0 | 192.168.159.142 | 9092      |
| elasticsearch  | 6.8.6      | 192.168.159.142 | 9200/9300 |
| kibana         | 6.5.4      | 192.168.159.142 | 5601      |
3.2. Write docker-compose.yml

MySQL runs locally on the Windows machine.

| IP              | Description              | Note                                                     |
|-----------------|--------------------------|----------------------------------------------------------|
| 192.168.43.122  | MySQL connection address | open the required middleware ports                       |
| 192.168.159.142 | Docker host address      | disable the firewall or open the required middleware ports |
```yaml
version: '2'
services:
  canal-server:
    image: canal/canal-server:v1.1.4
    container_name: canal-server
    ports:
      - 11111:11111
    environment:
      - canal.instance.mysql.slaveId=12
      - canal.auto.scan=false
      - canal.instance.master.address=192.168.43.122:3306
      - canal.instance.dbUsername=canal
      - canal.instance.dbPassword=canal
      - canal.mq.topic=mayikt-20212-topic
      - canal.serverMode=kafka
      - canal.mq.servers=192.168.159.142:9092
    volumes:
      - /app/canal-server/conf/:/admin/canal-server/conf/
      - /app/canal-server/logs/:/admin/canal-server/logs/
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - 2181:2181
    restart: always
  kafka:
    image: wurstmeister/kafka:2.12-2.3.0
    ports:
      - 9092:9092
    environment:
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.159.142:9092
      - KAFKA_LISTENERS=PLAINTEXT://:9092
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
    restart: always
  elasticsearch:
    image: daocloud.io/library/elasticsearch:6.8.6
    restart: always
    container_name: elasticsearch
    environment:
      - ES_JAVA_OPTS=-Xms512m -Xmx512m
    ports:
      - 9200:9200
      - 9300:9300
  kibana:
    image: daocloud.io/library/kibana:6.5.4
    restart: always
    container_name: kibana
    ports:
      - 5601:5601
    environment:
      - elasticsearch_url=http://192.168.159.142:9200
    depends_on:
      - elasticsearch
```
3.3. Install docker-compose

To install the docker-compose plugin, see: https://blog.csdn.net/weixin_40816738/article/details/126422834

3.4. Build the environment
```shell
mkdir /app/canal
```

Upload docker-compose.yml to the /app/canal directory, then start the build:

```shell
docker-compose up -d
```

3.5. Verify the environment
```shell
docker ps
```

```text
CONTAINER ID   IMAGE                                     COMMAND                  CREATED       STATUS       PORTS                                                                                  NAMES
cb47055de10b   daocloud.io/library/kibana:6.5.4          "/usr/local/bin/kiba…"   3 hours ago   Up 3 hours   0.0.0.0:5601->5601/tcp, :::5601->5601/tcp                                              kibana
07cdb595dc8d   wurstmeister/kafka:2.12-2.3.0             "start-kafka.sh"         3 hours ago   Up 3 hours   0.0.0.0:9092->9092/tcp, :::9092->9092/tcp                                              canal_kafka_1
78e49092fc37   wurstmeister/zookeeper                    "/bin/sh -c '/usr/sb…"   3 hours ago   Up 3 hours   22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp, :::2181->2181/tcp                  canal_zookeeper_1
7bcb0ad7bf89   daocloud.io/library/elasticsearch:6.8.6   "/usr/local/bin/dock…"   3 hours ago   Up 3 hours   0.0.0.0:9200->9200/tcp, :::9200->9200/tcp, 0.0.0.0:9300->9300/tcp, :::9300->9300/tcp   elasticsearch
12328f934a1b   canal/canal-server:v1.1.4                 "/alidata/bin/main.s…"   3 hours ago   Up 3 hours   9100/tcp, 11110/tcp, 11112/tcp, 0.0.0.0:11111->11111/tcp, :::11111->11111/tcp          canal-server
```

Verification URLs:

- es: http://192.168.122.128:9200/
- kibana: http://192.168.159.142:5601/app/kibana#/home?_g=()
3.6. Troubleshooting

If Elasticsearch fails to start with `max virtual memory areas vm.max_map_count [65530] is too low, increase to at least [262144]`, raise `vm.max_map_count` on the host (e.g. `sysctl -w vm.max_map_count=262144`); see https://blog.csdn.net/weixin_40816738/article/details/126333689
4. Microservice project walkthrough

4.1. Project dependencies
Note: the Elasticsearch server and client versions must match.

```xml
<!-- Spring Boot Kafka integration -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>
```

4.2. yml configuration
```yaml
# kafka
spring:
  kafka:
    # Kafka broker address (may list several)
    bootstrap-servers: 192.168.159.142:9092
    consumer:
      # default consumer group name
      group-id: kafka2
      # earliest: if a committed offset exists, resume from it; otherwise consume from the beginning
      # latest: if a committed offset exists, resume from it; otherwise consume only newly produced records
      # none: if every partition has a committed offset, resume from them; throw an exception if any partition lacks one
      auto-offset-reset: earliest
      # key/value deserializers
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      enable-auto-commit: false
    producer:
      # key/value serializers
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # batch size
      batch-size: 65536
      # buffer memory
      buffer-memory: 524288
  data:
    elasticsearch:
      repositories:
        enabled: true
      cluster-name: docker-cluster
      cluster-nodes: 192.168.159.142:9300

server:
  port: 8085
```
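The three `auto-offset-reset` policies described above can be illustrated with a small stdlib sketch. No real Kafka client is involved; `startOffset` is a hypothetical helper modeling a single partition, where `committed` is the group's committed offset (null if none) and `end` is the log-end offset.

```java
public class OffsetResetDemo {
    static long startOffset(String policy, Long committed, long end) {
        if (committed != null) return committed;      // all policies resume from a committed offset
        switch (policy) {
            case "earliest": return 0L;               // no commit: start from the beginning
            case "latest":   return end;              // no commit: only new records
            default: throw new IllegalStateException("none: no committed offset");
        }
    }

    public static void main(String[] args) {
        System.out.println(startOffset("earliest", null, 100)); // 0
        System.out.println(startOffset("latest", null, 100));   // 100
        System.out.println(startOffset("latest", 42L, 100));    // 42
    }
}
```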
4.3. Index object

Index object for the commodity_info (product information) table. In a real production scenario you would first identify the most frequently searched fields, then design the index structure and store it in ES.

```java
package com.mayikt.canal.commodity.es.bean;

import lombok.Data;
import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.FieldType;

import java.io.Serializable;
import java.math.BigDecimal;
import java.util.Date;

/**
 * @author gblfy
 * @date 2022-8-19
 * Index object for the commodity_info table
 */
@Data
// Declares a document and specifies the index it lives in
@Document(indexName = "commodityinfo")
public class CommodityInfo implements Serializable {
    @Id
    private Long id;
    private Long commodityId;       // product ID
    @Field(type = FieldType.Text)
    private String name;            // product name
    @Field(type = FieldType.Text)
    private String mainImage;       // large image
    @Field(type = FieldType.Text)
    private String subImage;        // small image
    @Field(type = FieldType.Text)
    private String detail;          // details
    @Field(type = FieldType.Text)
    private String attributeList;   // attribute list
    @Field(type = FieldType.Double)
    private BigDecimal price;       // price
    @Field(type = FieldType.Integer)
    private Long stock;             // stock
    @Field(type = FieldType.Integer)
    private Integer status;         // status
    @Field(type = FieldType.Date)
    private Date createTime;        // creation time
    private Date updateTime;        // update time
}
```
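In Canal's flat message the row values in `data` arrive as string column values, and fastjson's `toJavaObject` performs the conversion to the typed fields above. Below is a minimal stdlib sketch of that kind of mapping for a few fields; the `Doc` class and `map` helper are illustrative, not part of the project.

```java
import java.math.BigDecimal;
import java.util.HashMap;
import java.util.Map;

public class RowMapper {
    // Tiny stand-in for the index object, covering three representative fields.
    static class Doc {
        Long id;
        String name;
        BigDecimal price;
    }

    // Convert a map of string column values into typed fields.
    static Doc map(Map<String, String> row) {
        Doc d = new Doc();
        d.id = row.get("id") == null ? null : Long.valueOf(row.get("id"));
        d.name = row.get("name");
        d.price = row.get("price") == null ? null : new BigDecimal(row.get("price"));
        return d;
    }

    public static void main(String[] args) {
        Map<String, String> row = new HashMap<>();
        row.put("id", "1");
        row.put("name", "meite11111");
        row.put("price", "99");
        Doc d = map(row);
        System.out.println(d.id + " " + d.name + " " + d.price); // 1 meite11111 99
    }
}
```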
4.4. Listener
```java
package com.mayikt.canal.commodity.consumer;

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.mayikt.canal.commodity.constant.MayiktConstant;
import com.mayikt.canal.commodity.es.bean.CommodityInfo;
import com.mayikt.canal.commodity.es.repository.CommodityInfoRepository;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * @author gblfy
 * @date 2022-8-19
 * Listens for MySQL database changes
 */
@Component
@Slf4j
public class CanalConsumer {
    @Autowired
    private CommodityInfoRepository commodityInfoRepository;

    /**
     * Consumer listening on the mayikt topic
     *
     * @param consumer
     */
    @KafkaListener(topics = "mayikt-20212-topic")
    public void receive01(ConsumerRecord<?, ?> consumer) {
        log.info("group 1 consumer 1 -> topic: {}, key: {}, partition: {}, offset: {}, value: {}",
                consumer.topic(), consumer.key(), consumer.partition(), consumer.offset(), consumer.value());
        String json = (String) consumer.value();
        JSONObject jsonObject = JSONObject.parseObject(json);
        // change type: UPDATE, INSERT, or DELETE; call the matching ES API based on type
        String type = jsonObject.getString("type");
        String pkNames = jsonObject.getJSONArray("pkNames").getString(0);
        JSONArray data = jsonObject.getJSONArray("data");
        // table being watched
        String table = jsonObject.getString("table");
        // database being watched
        String database = jsonObject.getString("database");
        for (int i = 0; i < data.size(); i++) {
            JSONObject dataObject = data.getJSONObject(i);
            // Index object for the commodity_info table. This is a generic pattern:
            // branch on database/table to sync different tables into ES.
            CommodityInfo productEntity = dataObject.toJavaObject(CommodityInfo.class);
            switch (type) {
                case MayiktConstant.CANAL_UPDATE:
                case MayiktConstant.CANAL_INSERT:
                    commodityInfoRepository.save(productEntity);
                    break;
                case MayiktConstant.CANAL_DELETE:
                    commodityInfoRepository.delete(productEntity);
                    break;
            }
        }
    }
}
```
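The listener above parses the flat-message JSON with fastjson. The sample below shows roughly what such a message looks like (abbreviated), and extracts the same routing fields with a stdlib-only regex helper; `field` is a hypothetical illustration, not part of the project or of Canal.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class CanalMessageSketch {
    // Pull a string-valued field out of a flat JSON object (illustrative only;
    // a real consumer should use a proper JSON parser such as fastjson).
    static String field(String json, String name) {
        Matcher m = Pattern.compile("\"" + name + "\"\\s*:\\s*\"([^\"]*)\"").matcher(json);
        return m.find() ? m.group(1) : null;
    }

    public static void main(String[] args) {
        // Abbreviated sample of a Canal flat message as published to Kafka.
        String json = "{\"database\":\"mayikt-integral\",\"table\":\"commodity_info\","
                + "\"type\":\"UPDATE\",\"pkNames\":[\"id\"],"
                + "\"data\":[{\"id\":\"1\",\"name\":\"meite11111\"}]}";
        System.out.println(field(json, "database")); // mayikt-integral
        System.out.println(field(json, "table"));    // commodity_info
        System.out.println(field(json, "type"));     // UPDATE
    }
}
```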
4.5. Table schema
```sql
CREATE DATABASE `mayikt-integral`;
USE `mayikt-integral`;

SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;

-- ----------------------------
-- Table structure for commodity_info
-- ----------------------------
DROP TABLE IF EXISTS `commodity_info`;
CREATE TABLE `commodity_info` (
  `id` int NOT NULL,
  `commodity_id` int NULL DEFAULT NULL COMMENT 'product ID',
  `name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT 'product name',
  `subtitle` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT 'product subtitle',
  `main_image` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT 'large image',
  `sub_image` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT 'small image',
  `detail` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT 'details',
  `attribute_list` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT 'attribute list',
  `price` int NULL DEFAULT NULL COMMENT 'price',
  `stock` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT 'stock',
  `status` int NULL DEFAULT NULL COMMENT 'status',
  `create_time` datetime NULL DEFAULT NULL COMMENT 'creation time',
  `update_time` datetime NULL DEFAULT NULL COMMENT 'update time',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci COMMENT = 'product information table' ROW_FORMAT = Dynamic;

-- ----------------------------
-- Records of commodity_info
-- ----------------------------
INSERT INTO `commodity_info` VALUES (1, 10000, 'meite11111', '2030华为P100全新架构师10G手机', '1', '1', '2030华为P100全新架构10G手机', '[{\"内存\":\"128GB\",\"颜色\":\"红色\",\"国行\"},{\"内存\":\"128GB\",\"颜色\":\"红色\",\"国行\"}]', NULL, NULL, NULL, '2021-04-27 16:22:28', '2021-04-27 16:22:33');

SET FOREIGN_KEY_CHECKS = 1;
```

4.6. Type constants
```java
package com.mayikt.canal.commodity.constant;

/**
 * @author gblfy
 * @date 2022-8-19
 * Type constants
 */
public interface MayiktConstant {
    String CANAL_UPDATE = "UPDATE";
    String CANAL_DELETE = "DELETE";
    String CANAL_INSERT = "INSERT";
}
```
ES repository interface:
```java
package com.mayikt.canal.commodity.es.repository;

import com.mayikt.canal.commodity.es.bean.CommodityInfo;
import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;
import org.springframework.stereotype.Repository;

/**
 * @author gblfy
 * @date 2022-8-19
 * ES repository interface
 */
@Repository
public interface CommodityInfoRepository extends ElasticsearchRepository<CommodityInfo, Long> {
}
```
5. Testing

5.1. Change some data

Modify a row in the commodity_info table. (The original post shows a screenshot here.)

5.2. Observe the change events

The listener logs the Canal message for the change; the original post shows screenshots of the formatted JSON and of the Elasticsearch document after synchronization.