Donnie

不积跬步无以至千里

Oracle数据同步到mysql数据库

Navicat的数据传输 vs 阿里愚公

1.Navicat 优点是可以自动在mysql创建表结构,但字段不一定完美,需要手动需改,阿里愚公需要自己建对应的表
2.Navicat 缺点是不能增量同步,阿里愚公可以增量同步
3.数据量少可以选择Navicat,数量大选择阿里愚公,因为万一出现数据同步失败,又需重新来过,比较耗时。阿里愚公可以增量
4.Navicat是收费,阿里愚公是开源,也可以自己修改源码按需同步

开源软件:阿里愚公

1.源码下载
git clone https://github.com/alibaba/yugong.git

2.编译
mvn clean install -Dmaven.test.skip -Denv=release

3.修改配置文件yugong.properties


    #oracle数据源
    yugong.database.source.username=test
    yugong.database.source.password=test
    yugong.database.source.type=ORACLE
    yugong.database.source.url=jdbc:oracle:thin:@192.168.1.16:1521:oracl
    yugong.database.source.encode=UTF-8
    yugong.database.source.poolSize=30
    
    #mysql数据源
    #注意加:?useUnicode=true&characterEncoding=utf-8,不加可能出现中文乱码
    yugong.database.target.url=jdbc:mysql://192.168.1.17:3306/test?useUnicode=true&characterEncoding=utf-8
    yugong.database.target.username=root
    yugong.database.target.password=root
    yugong.database.target.type=DRDS
    yugong.database.target.encode=UTF-8
    yugong.database.target.poolSize=30
    #需要同步的表,表名跟字段最好都相同
    yugong.table.white=test_table
    
    #其他基本不用改

源码可能出现queue空指针问题

解决办法
将OracleFullRecordExtractor类的77行改到71行
即 queue = new LinkedBlockingQueue<>(context.getOnceCrawNum() * 2); 初始化要放在
extractorThread = new NamedThreadFactory 前面
如图

注意

愚公是增量同步,每同步一定条数,会将最后的id,写入文本中,以便下次继续同步
一般写入目录在:../conf/positioner/同步表名.bat
同步数据前都会判断有没有这个文本,如果有,就查看lastPositionId,然后从那里开始同步,
比如同步数据时退出了程序,重新启动还是会先判断lastPositionId,从没同步过的数据开始同步
如需要从头开始同步则删除此文本

docker获取镜像异常并解决

异常1

Error response from daemon: Get https://index.docker.io/v1/search?q=elasticsearch&n=25: x509: certificate is valid for *.rideamigos.com, *.production.rideamigos.com, not index.docker.io

解决办法
vi /etc/docker/daemon.json
编辑daemon.json,没有就新增

    {
      "registry-mirrors":["https://docker.mirrors.ustc.edu.cn"]
    }

sudo systemctl daemon-reload
sudo systemctl restart docker

如果还是异常,可以更换其他加速器
docker官方提供 https://registry.docker-cn.com

异常2

Using default tag: latest
Error response from daemon: manifest for elasticsearch:latest not found
是未加具体的版本号

解决办法
https://hub.docker.com 查找具体的版本号,再pull
如:

     docker pull elasticsearch:6.7.1

SpringBoot添加过滤器和拦截器

过滤器和拦截器主要区别包括以下几个方面:
  1、Filter是依赖于Servlet容器,属于Servlet规范的一部分,而拦截器则是独立存在的,可以在任何情况下使用。
  2、Filter的执行由Servlet容器回调完成,而拦截器通常通过动态代理的方式来执行。
  3、Filter的生命周期由Servlet容器管理,而拦截器则可以通过IoC容器来管理,因此可以通过注入等方式来获取其他Bean的实例,因此使用会更方便。

过滤器实现

  1. 实现Filter类
  2. 添加WebFilter注解

    package com.donnie.Filter;
    
    import lombok.extern.slf4j.Slf4j;
    
    import javax.servlet.*;
    import javax.servlet.annotation.WebFilter;
    import java.io.IOException;
    
        //urlPatterns过滤需要过滤的接口
    @WebFilter(urlPatterns = "/*", filterName = "ProfileFilter")
    @Slf4j
    public class ProfileFilter implements Filter {
    
        @Override
        public void init(FilterConfig filterConfig) throws ServletException {
    
        }
    
        @Override
        public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws IOException, ServletException {
            long start = System.currentTimeMillis();
            filterChain.doFilter(servletRequest, servletResponse);
            log.info("请求响应时间为 : " + (System.currentTimeMillis() - start)+"ms");
        }
    
        @Override
        public void destroy() {
    
        }
    }


  1. 在springboot启动类上添加 @ServletComponentScan("com.donnie.Filter")

@SpringBootApplication
@ServletComponentScan("com.donnie.Filter")
public class MycatDemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(MycatDemoApplication.class, args);
    }

}

拦截器实现

1.实现HandlerInterceptor接口
2.添加@Component 注解


    /**
     * 拦截请求,获取判断租户和写入ThreadLocal
     */
    @Component
    @Slf4j
    public class UserInterceptor implements HandlerInterceptor {
    
        @Override
        public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
            // 一般不可能是传参数的,这里只做演示
            // 但可以判断用户登陆的session,或cookies,或二级域名识别是哪个租户
            String tenantId = request.getParameter("tenantId");
            log.info("preHandle---->>"+tenantId);
    
            if (tenantId != null && tenantId.length() > 0) {
                MultiTenantHolder.setCurrentNode("dn" + tenantId);
            }
            return true;
        }
    
    
    
        @Override
        public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
            MultiTenantHolder.remove();
            log.info("afterCompletion---->>");
        }
    
        @Override
        public void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler, ModelAndView modelAndView) throws Exception {
            log.info("postHandle---->>");
        }
    }

3.实现WebMvcConfigurer接口
4.添加@Configuration注解
5.添加拦截器


    @Configuration
    public class WebConfigurer implements WebMvcConfigurer {
    
        @Autowired
        UserInterceptor userInterceptor;
    
        @Override
        public void addInterceptors(InterceptorRegistry registry) {
            // addPathPatterns("/**") 表示拦截所有的请求,
            // excludePathPatterns("/login", "/register") 表示除了登陆与注册之外,因为登陆注册不需要登陆也可以访问
            registry.addInterceptor(userInterceptor)
                    .addPathPatterns("/**")
                    .excludePathPatterns("/login", "/register");
        }
    }

springboot+mybatis+macat实现多租户切换访问数据库

多租户实现原理

  1. 使用ThreadLocal记录多租户的数据节点
  2. 通过Mybatis拦截器拦截到要执行的sql
  3. 将当前租户的数据节点封装成mycat注释后的sql

核心源码


    /*
     * 多租户节点持有者
     */
    public class MultiTenantHolder {
    
        //租户节点存储在ThreadLocal
        private static ThreadLocal<String> currentNodeThreadLocal = new ThreadLocal<String>();
    
        public static void setCurrentNode(String currentNode) {
            if (currentNode != null) {
                currentNodeThreadLocal.set(currentNode);
            }
        }
    
    
        public static String getCurrentNode() {
            return currentNodeThreadLocal.get();
        }
    
        /**
         * 清除本地线程变量
         */
        public static void remove() {
            currentNodeThreadLocal.remove();
        }
    }
    


Mybatis拦截器


    @Intercepts(value = {
            @Signature(type = StatementHandler.class,
                    method = "prepare",
                    args = {Connection.class,Integer.class})})
    @Slf4j
    public class MultiTenantInterceptor implements Interceptor {
    
        private static final String preState="/*!mycat:datanode=";
        private static final String afterState="*/";
    
        @Override
        public Object intercept(Invocation invocation) throws Throwable {
            StatementHandler statementHandler=(StatementHandler)invocation.getTarget();
            MetaObject metaStatementHandler= SystemMetaObject.forObject(statementHandler);
    
            //拦截要执行的sql
            String sql=(String)metaStatementHandler.getValue("delegate.boundSql.sql");
    
            //获取租户使用节点
            String node = MultiTenantHolder.getCurrentNode();
    
            if(node!=null) {
                sql = preState + node + afterState + sql;
            }
    
            log.info("添加mycat注释的sql =" + sql);
            metaStatementHandler.setValue("delegate.boundSql.sql",sql);
            Object result = invocation.proceed();
    
            MultiTenantHolder.remove();
    
            return result;
        }
    
        @Override
        public Object plugin(Object target) {
    
            return Plugin.wrap(target, this);
        }
    
        @Override
        public void setProperties(Properties properties) {
    
        }
    }

全部源码

https://github.com/donnie0915/MycatDemo

mycat使用

https://www.okhjp.com/post/47

mycat demo

这里数据库使用mysql5.7 ,mycat目前好像还没对mysql8做较好的兼容,所以使用5.7版本

创建3个数据库

    create database user01
    create database user02
    create database user03

数据库分别添加user表


    CREATE TABLE `user` (
      `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '用户id',
      `name` varchar(50) DEFAULT NULL COMMENT '名称',
      `email` varchar(50) DEFAULT NULL COMMENT '用户邮箱',
      `sex` varchar(10) DEFAULT NULL COMMENT '性别',
      `age` int(3) DEFAULT NULL COMMENT '年龄',
      `status` tinyint(4) DEFAULT NULL COMMENT '状态(0:开启 1:禁用)',
      `description` varchar(100) DEFAULT NULL COMMENT '描述',
      `mobile` varchar(20) DEFAULT NULL COMMENT '手机号',
      PRIMARY KEY (`id`) USING BTREE
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8 ROW_FORMAT=DYNAMIC COMMENT='会员表';

配置mycat

至于mycat的下载安装请移步到http://www.mycat.io/ 这里使用mycat1.6
启动mycat前这里只修改server.xml 和 schema.xml

server.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <!DOCTYPE mycat:server SYSTEM "server.dtd">
    <mycat:server xmlns:mycat="http://org.opencloudb/">
        <system>
            <property name="useSqlStat">0</property>
            <property name="useGlobleTableCheck">0</property>
            <property name="sequnceHandlerType">2</property>
            <property name="handleDistributedTransactions">0</property>
            <property name="useOffHeapForMerge">1</property>
            <property name="memoryPageSize">1m</property>
            <property name="spillsFileBufferSize">1k</property>
            <property name="useStreamOutput">0</property>
            <property name="systemReserveMemorySize">384m</property>
            <property name="useZKSwitch">true</property>
        </system>
    
        <!-- 用户名:okhjp-->
        <user name="okhjp">
            <!--mycat登陆密码-->
            <property name="password">123456</property>
            <!-- 定义schema 也可以说是数据库 -->
            <property name="schemas">user_db</property>
        </user>
    
    </mycat:server>

schema.xml


<?xml version="1.0"?>
<!DOCTYPE mycat:schema SYSTEM "schema.dtd">
<mycat:schema xmlns:mycat="http://org.opencloudb/" >

    <!--user_db 对应server.xml 的 schemas -->
    <schema name="user_db" checkSQLschema="true" sqlMaxLimit="100">
        <!-- 3个节点合成一个表 -->
        <table name="user" dataNode="dn1,dn2,dn3" />
    </schema>

    <dataNode name="dn1" dataHost="host_17" database="user01" />
    <dataNode name="dn2" dataHost="host_17" database="user02" />
    <dataNode name="dn3" dataHost="host_17" database="user03" />

    <dataHost name="host_17" maxCon="1000" minCon="10" balance="0"
              writeType="0" dbType="mysql" dbDriver="native" switchType="1" slaveThreshold="100">
        <heartbeat>select user()</heartbeat>
        <!-- 设置连接mysql数据库 -->
        <writeHost
                   host="hostS1"
                   url="192.168.1.17:3306" 
                   user="root"
                   password="123456"
        />
    </dataHost>

</mycat:schema>

启动mycat

./mycat restart

连接mycat

点击连接测试,可以测试一下是否连接成功

成功后出现一个user表

使用mycat

1.查看节点

 EXPLAIN select * from user

可以看到3个节点,分别是dn1,dn2,dn3

2.往user02插入数据

    /*!mycat:datanode=dn2*/ INSERT INTO `user`(name,email,sex,age,status,description,mobile)
    values("黄一","test@163.com","男","18",1,"demo","13316990000")

注意如果不加mycat注释,会全部节点插入数据

3.查询dn2节点的数据
/!mycat:datanode=dn2/select * from user

注意如果不加mycat注释,会查全部节点的数据

远程访问mysql8.0

从mysql5.6换到mysql8.0注意点

1.连接测试的时候就会报错:client does not support authentication protocol requested by server; consider upgrading MySQL client,这里的错误信息就是不支持身份认证方式,需要配置认证方式,默认的密码加密方式是:caching_sha2_password,而现在很多客户端工具还不支持这种加密认证方式。
所以需要在my.conf加入:

    default_authentication_plugin=mysql_native_password

2.分配权限 使用以前方式会出现异常
GRANT ALL PRIVILEGES ON . TO 'root'@'%' IDENTIFIED BY '123456' WITH GRANT OPTION;
ERROR 1064 (42000): You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near 'IDENTIFIED BY '123456' WITH GRANT OPTION' at line 1
正确分配如下:


    GRANT ALL PRIVILEGES ON *.* TO 'root'@'%'  WITH GRANT OPTION;
    FLUSH   PRIVILEGES;

3.客户端连接会出现:this authentication plugin is not supported,需要重新更新密码


    ALTER USER ‘root’@’%’ IDENTIFIED WITH mysql_native_password BY ‘123456’;

    flush privileges;

注意是使用mysql_native_password对新密码进行编码
注意在连接mysql的url上加上&allowNativePasswords=true
如beego


    root:123456@tcp(192.168.1.17:3306)/db?charset=utf8&allowNativePasswords=true

VirtualBox克隆Centos7.6后修改静态ip异常

一,修改静态ip启动异常

异常如下


    [root@localhost network-scripts]#  systemctl restart network
    Job for network.service failed because the control process exited with error code. See "systemctl status network.service" and "journalctl -xe" for details.
    [root@localhost network-scripts]# systemctl status network.service
    ● network.service - LSB: Bring up/down networking
       Loaded: loaded (/etc/rc.d/init.d/network; bad; vendor preset: disabled)
       Active: failed (Result: exit-code) since Wed 2019-04-10 11:06:17 CST; 8s ago
         Docs: man:systemd-sysv-generator(8)
    TYPE="Ethernet"
      Process: 7025 ExecStart=/etc/rc.d/init.d/network start (code=exited, status=1/FAILURE)
    
    Apr 10 11:06:17 localhost.localdomain network[7025]: RTNETLINK answers: File exists
    Apr 10 11:06:17 localhost.localdomain network[7025]: RTNETLINK answers: File exists
    Apr 10 11:06:17 localhost.localdomain network[7025]: RTNETLINK answers: File exists
    Apr 10 11:06:17 localhost.localdomain network[7025]: RTNETLINK answers: File exists
    Apr 10 11:06:17 localhost.localdomain network[7025]: RTNETLINK answers: File exists
    Apr 10 11:06:17 localhost.localdomain network[7025]: RTNETLINK answers: File exists
    Apr 10 11:06:17 localhost.localdomain systemd[1]: network.service: control process exited, code=exited status=1
    Apr 10 11:06:17 localhost.localdomain systemd[1]: Failed to start LSB: Bring up/down networking.
    Apr 10 11:06:17 localhost.localdomain systemd[1]: Unit network.service entered failed state.
    Apr 10 11:06:17 localhost.localdomain systemd[1]: network.service failed.

二,原因

网卡配置名不统一,如下几图


三,统一配置名称并修改静态ip

  1. cd /etc/sysconfig/network-scripts/

  2. mv ifcfg-ens160 ifcfg-enp0s3

  3. vi ifcfg-enp0s3


    TYPE="Ethernet"
    PROXY_METHOD="none"
    BROWSER_ONLY="no"
    BOOTPROTO="static" #静态IP
    DEFROUTE="yes"
    IPV4_FAILURE_FATAL="yes"
    IPV6INIT="yes"
    IPV6_AUTOCONF="yes"
    IPV6_DEFROUTE="yes"
    IPV6_FAILURE_FATAL="no"
    IPV6_ADDR_GEN_MODE="stable-privacy"
    UUID="da69ad28-4fc3-4858-972d-0455ac92d569"
    PREFIX="24"
    IPV6_PRIVACY="no"
    NAME="enp0s3"  #都需要保持一致,不然会出现重启异常
    DEVICE="enp0s3" #
    ONBOOT="yes" #开机启动
    IPADDR="192.168.1.17" #本机地址
    GATEWAY="192.168.1.1"  #默认网关
    NETMASK=255.255.255.0 #子网掩码

4.再重启 systemctl restart network

java应用简单部署在Docker上

准备文件

  1. 准备好一个已经编译好的eureka.jar

  2. 创建Dockerfile

    #直接下载镜像,但镜像较大,如需要小的可以换其他版本,也可以使用其他镜像仓库,如网易云上的
    FROM java:8
    
    # 拷贝文件到容器,需要在同一目录下
    ADD eureka.jar app.jar
    
    #声明需要暴露的端口
    EXPOSE 8761
    
    #配置容器启动后执行的命令
    ENTRYPOINT ["java","-jar","/app.jar"]

构建镜像

eureka.jar 与Dockerfile必须放在同一目录下
镜像取名:eureka/eureka
. : 当前目录

docker build -t eureka/eureka .

查看镜像

docker images

运行镜像

docker run -p 18761:8761 -d eureka/eureka

删除镜像

删除镜像需要先停止容器,删除容器,再删除镜像

    //停止容器,docker ps可以查看容器id
docker stop 1938f612d643(容器id)

    //删除容器,docker ps -a 可以查看容器id
    docker rm 1938f612d643(容器id)

    //删除镜像
   docker rmi eureka/eureka

docker上安装mysql8.0

安装并运行mysql步骤

  1. 先查询一下mysql
    docker search mysql

  2. pull mysql 版本是8.0
    docker pull mysql:8.0

  3. 创建挂载目录,持久化数据,日志,配置
    mkdir -p /data/mysql/data /data/mysql/logs /data/mysql/conf
    cd /data/mysql/conf
    vi my.conf

    
    
            [mysqld]    
    
    
        pid-file        = /var/run/mysqld/mysqld.pid
        socket          = /var/run/mysqld/mysqld.sock
        datadir         = /var/lib/mysql
        secure-file-priv= NULL
        # Disabling symbolic-links is recommended to prevent assorted security risks
        symbolic-links=0
    
    
        port = 3306
        character-set-server=utf8
        #忘记密码时使用
        #skip-grant-tables
        #设置协议认证方式 
        default_authentication_plugin=mysql_native_password
        [mysql]
        default-character-set=utf8
    
    
    
  4. 运行mysql

docker run -p 3306:3306 --name mysql -v $PWD/conf/my.conf:/etc/mysql/my.conf -v $PWD/logs:/logs -v $PWD/data:/var/lib/mysql -e MYSQL_ROOT_PASSWORD=123456 -d mysql:8.0

使用mysql

  1. 进入mysql容器
    docker exec -it mysql /bin/bash
    mysql -uroot -p

  2. 容器外进入mysql
    docker exec -it mysql mysql -uroot -p

  3. 停止mysql
    docker stop mysql

  4. 启动mysql
    docker start mysql

  5. 修改端口映射
    vim /var/lib/docker/containers/{容器id}/hostconfig.json
    修改PortBindings参数配置,宿主机13306端口映射容器3306端口示例:
    “PortBindings”:{“3306/tcp”:[{“HostIp”:””,”HostPort”:”13306″}]},”

参考资料

https://hub.docker.com/_/mysql
http://www.runoob.com/docker/docker-install-mysql.html
https://www.one234.com/share/973/update-docker-container-port-map/

阿里云Centos7.6安装docker

在Centos7上安装docker

  1. 国际惯例,先安装依赖
    yum install -y yum-utils device-mapper-persistent-data lvm2

  2. 设置存储库
    yum-config-manager --add-repo https://download.docker.com/linux/centos/docker-ce.repo

  3. 开始安装
    yum install docker-ce

  4. 启动docker
    systemctl start docker

  5. 查看docker版本
    docker version 轻松安装成功

  6. 又国际惯例,运行一下hello-world表示安装成功
    docker run hello-world

参考资料

https://my.oschina.net/shyloveliyi/blog/1616025

注意:Centos6安装Docker血泪史

1.目前能在Centos6上能装的版本是1.7.1 ,但是版本低下,有很多镜像不支持
2.Centos6内核低,装不上最新Docker版本
3.有网上说Centos6上升级内核,(亲测,系统可能启动不了。系统崩。。。。。)
4.也有反应即使升级成功,在运行容器时,极不稳定,会无缘无故停止

墙裂不推荐Centos6上搞Docker

使用maven插件生成protobuf文件

配置依赖

下面代码放入pom.xml里

    <properties>
            <protobuf.version>3.6.1</protobuf.version>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>com.google.protobuf</groupId>
                <artifactId>protobuf-java</artifactId>
                <version>${protobuf.version}</version>
            </dependency>
        </dependencies>
        
        <build>
            <extensions>
                <extension>
                    <groupId>kr.motd.maven</groupId>
                    <artifactId>os-maven-plugin</artifactId>
                    <version>1.5.0.Final</version>
                </extension>
            </extensions>
            <plugins>
    
                <!-- proto 编译组件 -->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-dependency-plugin</artifactId>
                    <executions>
                        <execution>
                            <id>copy-protoc</id>
                            <phase>generate-sources</phase>
                            <goals>
                                <goal>copy</goal>
                            </goals>
                            <configuration>
                                <artifactItems>
                                    <artifactItem>
                                        <groupId>com.google.protobuf</groupId>
                                        <artifactId>protoc</artifactId>
                                        <version>${protobuf.version}</version>
                                        <classifier>${os.detected.classifier}</classifier>
                                        <type>exe</type>
                                        <overWrite>true</overWrite>
                                        <!-- 生成的Java类目录-->
                                        <outputDirectory>${project.build.directory}</outputDirectory>
                                    </artifactItem>
                                </artifactItems>
                            </configuration>
                        </execution>
                    </executions>
                </plugin>
    
                <plugin>
                    <groupId>org.xolstice.maven.plugins</groupId>
                    <artifactId>protobuf-maven-plugin</artifactId>
                    <version>0.5.1</version>
                    <extensions>true</extensions>
                    <configuration>
                        <!-- porto源文件-->
                        <protoSourceRoot>${project.basedir}/src/main/java/com/donnie/proto</protoSourceRoot>
                        <protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}</protocArtifact>
                        <pluginId>grpc-java</pluginId>
                        <pluginArtifact>io.grpc:protoc-gen-grpc-java:1.0.0:exe:${os.detected.classifier}</pluginArtifact>
                    </configuration>
                    <executions>
                        <execution>
                            <goals>
                                <goal>compile</goal>
                                <goal>compile-custom</goal>
                                <goal>test-compile</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
    
            <!-- 编译jar包的jdk版本 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
    
            </plugins>
        </build>

proto文件

user.porto 放入com.donnie.proto目录

    syntax = "proto3";
    option java_package = "com.donnie.proto";
    option java_outer_classname = "UserModel";
    
    message User {
         int32 id = 1;
         string name = 2;
         string email = 3;
         string sex = 4;
    }

测试类


    import com.donnie.proto.UserModel;
    
    public class Test {
    
        public static void main(String[] args) {
            UserModel.User.Builder user = UserModel.User.newBuilder();
            user.setId(1);
            user.setSex("男");
            user.setEmail("oox@163.com");
    
            System.out.println(user);
        }
    }

intellij Idea 安装protobuf插件

安装插件protobuf Support,效果如图:

使用docker安装kafka

一,下载镜像

    docker pull wurstmeister/zookeeper 
    docker pull wurstmeister/kafka

二, 运行镜像

  1. 运行zookeeper
    -d 后台运行
    --name: 服务名称叫zookeeper (很重要)
    -p :端口

    docker run -d --name zookeeper -p 2181 -t wurstmeister/zookeeper

  1. 运行zookeeper
    --name:服务名叫 kafka
    --publish:同-p 端口映射
    --link:可以用来链接2个容器,使得源容器和接收容器之间可以互相通信,如下面让kafka可以访问zookeeper容器
    --env :设置环境变量
    --volume:同-v,目录挂载
    docker run -d --name kafka --publish 9092:9092 --link zookeeper --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 --env KAFKA_ADVERTISED_HOST_NAME=127.0.0.1 --env KAFKA_ADVERTISED_PORT=9092 --volume /data/data/localtime:/etc/localtime wurstmeister/kafka:latest

三,进入容器

docker ps 是查看容器运行id,或容器名称


    docker ps 
    docker exec -it kafka /bin/bash

四,进入kafka默认目录

    cd /opt/kafka

五,创建主题

    bin/kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 1 --topic hikafka

注意在容器外面可以这么操作,如下:

    docker exec kafka /opt/kafka/bin/kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 1 --topic hikafka

六,运行一个消生产者,指定topic为刚刚创建的主题

    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic hikafka

七,运行一个消费者,指定同样的主题

    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic hikafka --from-beginning

注意新版本的kafka,消费不再使用--zookeeper zookeeper:2181

容器外:

    docker exec kafka /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic hikafka --from-beginning

参考资料

https://johng.cn/install-kafka-with-docker/
https://stackoverflow.com/questions/45025329/consumer-connecting-to-zookeeper-not-broker-for-message-consumption
https://gist.github.com/abacaphiliac/f0553548f9c577214d16290c2e751071
https://www.jianshu.com/p/21d66ca6115e

在MAC系统上安装docker并运行redis

下载

  1. 复制链接 https://hub.docker.com/editions/community/docker-ce-desktop-mac
  2. 在浏览器打开
  3. 下载稳定版,如图:

安装 docker

拖动到Applications即可安装

导航栏上出现docker 图标说明已经安装成功

运行redis

  1. docker pull redis
  2. docker run -p 16379:6379 -v /data:/data -d redis redis-server --appendonly yes
    可能会出现以下错误,
    docker: Error response from daemon: Mounts denied: 
    The path /data
    is not shared from OS X and is not known to Docker.
    You can configure shared paths from Docker -> Preferences... -> File Sharing.

就是说/data目录未加到file sharing中如图,只有加入到file sharing中即可正常启动redis

    localhost:data donnie$ docker ps
    CONTAINER ID        IMAGE               COMMAND                  CREATED             STATUS              PORTS                     NAMES
    ccba72df8821        redis               "docker-entrypoint.s…"   12 seconds ago      Up 11 seconds       0.0.0.0:16379->6379/tcp   loving_bohr

-p :端口映射 docker内端口6379,对外端口16379
-v :目录挂载,宿主目录:容器挂载的目录,有时候容器里面运行的产生的数据(如mysql)或者配置项(如nginx的nginx.conf)我们又需要保存起来的,因而我们需要对容器某些修改的数据进行挂载。
-d:开启Daemon模式,即后台运行服务
--appendonly yes :打开redis持久化配置

AI源码,Netty高可用版,估值一个亿

效果图

源码

服务端:

    public final class Server {
    
        public  static void main(String[] args) throws Exception {
            EventLoopGroup bossGroup = new NioEventLoopGroup(1);
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                ServerBootstrap b = new ServerBootstrap();
                b.group(bossGroup,workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .handler(new LoggingHandler(LogLevel.INFO))
                        .childHandler(new ServerInitializer());
                ChannelFuture f = b.bind(8888);
                f.channel().closeFuture().sync();
            } finally {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    }

服务端Handler:

    @Sharable
    public class ServerHandler extends SimpleChannelInboundHandler<String> {
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            // 为新连接发送庆祝
            ctx.write("Welcome to " + InetAddress.getLocalHost().getHostName() + "!\r\n");
            ctx.write("It is " + new Date() + " now.\r\n");
            ctx.flush();
        }
    
        @Override
        public void channelRead0(ChannelHandlerContext ctx, String request) throws Exception {
            // Generate and write a response.
            String response;
            boolean close = false;
            if (request.isEmpty()) {
                response = "请说点话吧 \r\n";
            } else if ("bye".equals(request.toLowerCase())||"再见".equals(request)) {
                response = "再见,下次见\r\n";
                close = true;
            } else {
                response =  request + " \r\n";
                response= response.replace("吗", "");
                response= response.replace("?", "!");
                response= response.replace("?", "!");
            }
    
            ChannelFuture future = ctx.write(response);
    
            if (close) {
                future.addListener(ChannelFutureListener.CLOSE);
            }
        }
    
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) {
            ctx.flush();
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            cause.printStackTrace();
            ctx.close();
        }
    }


    public class ServerInitializer extends ChannelInitializer<SocketChannel> {
        private static final StringDecoder DECODER = new StringDecoder();
        private static final StringEncoder ENCODER = new StringEncoder();
    
        private static final ServerHandler SERVER_HANDLER = new ServerHandler();
    
        @Override
        public void initChannel(SocketChannel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
    
            // Add the text line codec combination first,
            pipeline.addLast(new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
            // the encoder and decoder are static as these are sharable
            pipeline.addLast(DECODER);
            pipeline.addLast(ENCODER);
    
            // and then business logic.
            pipeline.addLast(SERVER_HANDLER);
        }
    } 

客户端:

        public final class Client {
        public static void main(String[] args) throws Exception {
            EventLoopGroup group = new NioEventLoopGroup();
            try {
                Bootstrap b = new Bootstrap();
                b.group(group)
                        .channel(NioSocketChannel.class)
                        .handler(new ClientInitializer());
                Channel ch = b.connect("127.0.0.1",8888).sync().channel();
    
    
                ChannelFuture lastWriteFuture = null;
                BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
                for (;;) {
                    String line = in.readLine();
                    if (line == null) {
                        break;
                    }
    
                    // Sends the received line to the server.
                    lastWriteFuture = ch.writeAndFlush(line + "\r\n");
    
                    // If user typed the 'bye' command, wait until the server closes
                    // the connection.
                    if ("bye".equals(line.toLowerCase())) {
                        ch.closeFuture().sync();
                        break;
                    }
                }
    
                // Wait until all messages are flushed before closing the channel.
                if (lastWriteFuture != null) {
                    lastWriteFuture.sync();
                }
            } finally {
                group.shutdownGracefully();
            }
        }
    }

客户端handler:

    @Sharable
    public class ClientHandler extends SimpleChannelInboundHandler<String> {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
            System.err.println(msg);
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            cause.printStackTrace();
            ctx.close();
        }
    }
  
    public class ClientInitializer extends ChannelInitializer<SocketChannel> {
        private static final StringDecoder DECODER = new StringDecoder();
        private static final StringEncoder ENCODER = new StringEncoder();
    
        private static final ClientHandler CLIENT_HANDLER = new ClientHandler();
    
    
        @Override
        public void initChannel(SocketChannel ch) {
            ChannelPipeline pipeline = ch.pipeline();
            pipeline.addLast(new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
            pipeline.addLast(DECODER);
            pipeline.addLast(ENCODER);
    
            pipeline.addLast(CLIENT_HANDLER);
        }
    }

哈哈哈,祝各位看观愚人节快乐!

Netty Demo

服务端:

    public class Server {
    
        /**
         * 开启服务的方法
         */
        public void StartNetty(){
    
            /**
             *创建两个EventLoop的组,EventLoop 这个相当于一个处理线程,
             是Netty接收请求和处理IO请求的线程。
    
            相关资料:NioEventLoopGroup是一个处理I/O操作的多线程事件循环。
            Netty为不同类型的传输提供了各种EventLoopGroup实现。
            在本例中,我们正在实现一个服务器端应用程序,因此将使用两个NioEventLoopGroup。
            第一个,通常称为“boss”,接受传入的连接。
            第二个,通常称为“worker”,当boss接受连接并注册被接受的连接到worker时,处理被接受连接的流量。
            使用了多少线程以及如何将它们映射到创建的通道取决于EventLoopGroup实现,甚至可以通过构造函数进行配置。
            */
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
    
            try {
                //1、创建启动类
                ServerBootstrap bootstrap = new ServerBootstrap();
                //2,配置启动参数等
                /**
                 * 设置循环线程组,前者用于处理客户端连接事件,后者用于处理网络IO(server使用两个参数这个)
                 */
                bootstrap.group(bossGroup, workerGroup);
                /**
                 * 设置选项
                 * 参数:Socket的标准参数(key,value)
                 */
                bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
    
                //用于构造socketchannel工厂
                bootstrap.channel(NioServerSocketChannel.class);
    
                /**
                 * 传入自定义客户端Handle(服务端在这里搞事情)
                 */
                bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        // 注册handler
                        socketChannel.pipeline().addLast(new NettyServerHandler());
                    }
                });
    
                /**
                 * 绑定端口,开始接收进来的连接
                 */
                ChannelFuture channelFuture = bootstrap.bind("127.0.0.1", 9999).sync();
    
                // 等待服务器 socket 关闭 。
                channelFuture.channel().closeFuture().sync();
    
            }catch (Exception e){
                e.printStackTrace();
            }finally {
    
                workerGroup.shutdownGracefully();
                bossGroup.shutdownGracefully();
            }
    
    
    
        }
    
    
        public static void main(String[] args) {
    
    
            new Server().StartNetty();
        }
    
    
    }

服务端Handler:

    public class NettyServerHandler extends ChannelInboundHandlerAdapter {
    
        /**
         * 本方法用于读取客户端发送的信息
         * @param ctx
         * @param msg
         * @throws Exception
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    
            ByteBuf result = (ByteBuf) msg;
            byte[] result1 = new byte[result.readableBytes()];
            // msg中存储的是ByteBuf类型的数据,把数据读取到byte[]中
            result.readBytes(result1);
            String resultStr = new String(result1);
            // 接收并打印客户端的信息
            System.out.println("Client said:" + resultStr);
            // 释放资源,这行很关键
            result.release();
    
            // 向客户端发送消息
            String response = "hello client! I am Server";
            // 在当前场景下,发送的数据必须转换成ByteBuf数组
            ByteBuf encoded = ctx.alloc().buffer(4 * response.length());
            encoded.writeBytes(response.getBytes());
            ctx.write(encoded);
            ctx.flush();
        }
    
        /**
         * 信息获取完毕后操作
         * @param ctx
         * @throws Exception
         */
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            ctx.flush();
        }
    
        /**
         * 本方法用作处理异常
         * @param ctx
         * @param cause
         * @throws Exception
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            // 当出现异常就关闭连接
            cause.printStackTrace();
            ctx.close();
        }
    }

客户端:

    public class Client {
    
    
        public void connect(String ip, int port) throws Exception {
            EventLoopGroup worker = new NioEventLoopGroup();
    
            try {
                Bootstrap bootstrap = new Bootstrap();
    
                //EventLoop的组
                bootstrap.group(worker);
                //用于构造socketchannel工厂
                bootstrap.channel(NioSocketChannel.class);
    
                //设置选项
                bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
                bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline().addLast(new NettyClientHandler());
                    }
                });
    
                /** 开启客户端监听*/
                ChannelFuture f = bootstrap.connect(ip, port).sync();
    
                /**等待数据直到客户端关闭*/
                f.channel().closeFuture().sync();
    
            } catch (Exception e) {
    
            }finally {
                worker.shutdownGracefully();
            }
    
        }
    
        public static void main(String[] args) throws Exception {
    
            Client client=new Client();
            client.connect("127.0.0.1", 9999);
        }
    
    }

客户端handler:

    public class NettyClientHandler extends ChannelInboundHandlerAdapter {
    
        /**
         * 本方法用于接收服务端发送过来的消息
         * @param ctx
         * @param msg
         * @throws Exception
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    
            ByteBuf result = (ByteBuf) msg;
            byte[] result1 = new byte[result.readableBytes()];
            result.readBytes(result1);
            System.out.println("Server said:" + new String(result1));
            result.release();
    
        }
    
        /**
         * 本方法用于向服务端发送信息
         * @param ctx
         * @throws Exception
         */
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            String msg = "hello Server,I am client";
            ByteBuf encoded = ctx.alloc().buffer(4 * msg.length());
            encoded.writeBytes(msg.getBytes());
            ctx.write(encoded);
            ctx.flush();
        }
    
        /**
         * 本方法用于处理异常
         * @param ctx
         * @param cause
         * @throws Exception
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    
            // 当出现异常就关闭连接
            cause.printStackTrace();
    
            ctx.close();
        }
    }

CountDownLatch Demo

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;
    
    public class CountDownLatchDemo {
    
        public static void main(String[] args) throws InterruptedException {
    
            //初始化2
            CountDownLatch countDownLatch = new CountDownLatch(2) {
                @Override
                public void await() throws InterruptedException {
                    super.await();
                    System.out.println(Thread.currentThread().getName() + " count down is ok");
                }
            };
    
            Thread t1 = new Thread(()-> {
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() );
                countDownLatch.countDown(); //减1
            }, "t1");
    
            Thread t2 = new Thread(()-> {
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName());
                countDownLatch.countDown(); //减1
            }, "t2");
    
    
            t2.start();
            t1.start();
    
            countDownLatch.await();
        }
    
    }

CyclicBarrier Demo

    public class CyclicBarrierDemo {
    
        public static void main(String[] args) {
    
            //CyclicBarrier循环屏障,未达到屏障数需要等待一起执行
            CyclicBarrier cb= new CyclicBarrier(2);
    
            new Thread(()-> {
                try {
                    cb.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
                System.out.println("e1:"+Thread.currentThread());
            },"t1").start();
    
           new Thread(()->{
    
               try {
                   TimeUnit.SECONDS.sleep(5);
                   cb.await();
               } catch (InterruptedException e) {
                   e.printStackTrace();
               } catch (BrokenBarrierException e) {
                   e.printStackTrace();
               }
               System.out.println("e2:"+Thread.currentThread());
           },"t2").start();
    
    
        }
    }

volatile Demo

volatile是对内存的可见性,会强制将线程的工作内存刷新到主内存。
下面代码示例,如果将volatile去掉,就会造成线程t2进入死循环。

     import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.TimeUnit;
    
    public class VolatileDemo {
    
        //如果没有volatile修饰,t2线程一直循环下去,因为不会主动的将t1的工作内存刷到主内存。
        private volatile static List list =new ArrayList();
    
        public void add() {
            list.add("test data");
        }
    
        public int size() {
            return list.size();
        }
    
        public static void main(String[] args) {
    
            final VolatileDemo listData = new VolatileDemo();
    
            Thread t1=new Thread(()->{
                try {
                    for (int i = 0; i < 20; i++) {
                        listData.add();
                        System.out.println("当前线程:" + Thread.currentThread().getName() + " 添加了一个元素");
                        TimeUnit.SECONDS.sleep(1);
                    }
    
                }catch (InterruptedException e){
                    e.printStackTrace();
                }
            },"t1");
    
            Thread t2=new Thread(()-> {
                while (true) {
    
                    if (listData.size() == 4) {
                        System.out.println("当前线程收到通知 " + Thread.currentThread().getName() + " list size = 5 停止线程");
                        break;
                    }
    
                }
            }, "t2");
    
            t1.start();
            t2.start();
        }
    }   

BlockingQueue

        import java.util.LinkedList;
        import java.util.concurrent.TimeUnit;
        import java.util.concurrent.atomic.AtomicInteger;

            public class MyBlockingQueue {
    
        //用于装载元素的集合
        private LinkedList<Object> list = new LinkedList<>();
    
        //计数器
        private AtomicInteger count = new AtomicInteger();
    
        //需要定制上下限
        private final int maxSize ;  //使用final需要初始化数据,1.直接赋值,2,或构造方法赋值,否则会编译错误
    
        private final int minSize = 0;
    
        //构造方法
        public MyBlockingQueue(int maxSize) {
            this.maxSize = maxSize;
        }
    
        //初始化一个对象锁
        private final Object lock = new Object();
    
        public void put(Object obj) {
    
            synchronized (lock) {
    
                //如果计数超过最大容量,用锁等待
                while (count.get() == maxSize) {
                    try {
                        lock.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
    
                }
    
                list.add(obj);
                count.incrementAndGet();
    
                lock.notify();
    
            }
    
        }
    
    
        public Object take() {
    
            Object value = null;
            synchronized (lock) {
    
                //如果拿到没有值,就开启锁阻塞等待put放入值
                while (count.get() == minSize) {
                    try {
                        lock.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
    
                value = list.removeFirst();
                count.decrementAndGet();
                lock.notify();
                System.out.println("移除元素:" + value);
    
            }
    
            return value;
    
        }
    
    
        public int getSize() {
           return this.count.get();
        }
    
        public static void main(String[] args) {
    
            MyBlockingQueue myQueue = new MyBlockingQueue(5);
            myQueue.put("a");
            myQueue.put("b");
            myQueue.put("c");
            myQueue.put("d");
            myQueue.put("e");
            System.out.println(myQueue.getSize());
    
            Thread t1 =new Thread(new Runnable() {
                @Override
                public void run() {
                    myQueue.put("f");
                    myQueue.put("g");
                }
            });
            t1.start();
    
            Thread t2 = new Thread(new Runnable(){
    
                @Override
                public void run(){
                    myQueue.take();
                    myQueue.take();
                }
            });
    
    
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
            t2.start();
    
        }
    
    }