铁匠 铁匠
首页
golang
java
架构
常用算法
  • Java
  • nginx
  • 系统运维
  • 系统安全
  • mysql
  • redis
参考文档
关于
收藏
  • 分类
  • 标签
  • 归档
GitHub (opens new window)

铁匠

不予评判的专注当下
首页
golang
java
架构
常用算法
  • Java
  • nginx
  • 系统运维
  • 系统安全
  • mysql
  • redis
参考文档
关于
收藏
  • 分类
  • 标签
  • 归档
GitHub (opens new window)
  • 概览

  • 设计模式

  • 性能优化

  • 分布式

    • zookeeper

      • zookeeper介绍
      • zookeeper经典使用场景
    • 网关

    • 流量治理

    • 数据治理

    • 云原生

    • 架构
    • 分布式
    • zookeeper
    FengJianxin
    2019-07-20
    目录

    zookeeper经典使用场景

    # leader选举

    场景:某个服务有多个worker服务,当其中一个服务挂掉后,其他worker能够自动接替任务继续执行,解决服务单点问题。例如监控网站流量的服务,通过解析access log日志,分析当前网站流量,当达到一定阈值时,发出报警短信。

    实现原理 zk-leader

    示例代码(参考:https://github.com/apache/curator/blob/master/curator-examples/src/main/java/leader/ExampleClient.java (opens new window))

    ServerNode.java

    import java.io.Serializable;
    
    /**
     * Mutable description of a server node: numeric id, display name and an
     * "active" flag.
     *
     * <p>The class is {@link Serializable}; the field names are part of the
     * serialized form and must not be renamed casually.
     */
    public class ServerNode implements Serializable {

        private static final long serialVersionUID = -522933808813265389L;

        private int id;
        private String name;
        // A freshly created node has not been activated by anyone yet, so it
        // starts inactive; callers flip the flag through setActive(boolean).
        private boolean active = false;

        /**
         * Creates a node with a generated name of the form {@code "Client #<id>"}.
         *
         * @param id numeric identifier of the node
         */
        public ServerNode(int id) {
            this.id = id;
            this.name = "Client #" + id;
        }

        /**
         * Creates a node with an explicit name.
         *
         * @param id   numeric identifier of the node
         * @param name display name of the node
         */
        public ServerNode(int id, String name) {
            this.id = id;
            this.name = name;
        }

        /** @return the numeric identifier of this node */
        public int getId() {
            return id;
        }

        /** @param id the new numeric identifier */
        public void setId(int id) {
            this.id = id;
        }

        /** @return the display name of this node */
        public String getName() {
            return name;
        }

        /** @param name the new display name */
        public void setName(String name) {
            this.name = name;
        }

        /** @return {@code true} if the node is currently flagged as active */
        public boolean isActive() {
            return active;
        }

        /** @param active the new value of the active flag */
        public void setActive(boolean active) {
            this.active = active;
        }

        /** @return a debug string listing id, name and active flag */
        @Override
        public String toString() {
            return "ServerNode{" + "id=" + id + ", name='" + name + '\'' + ", active=" + active + '}';
        }
    }
    
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50

    ServerNode.java

    import java.io.Serializable;
    /**
     * Service node information: a numeric id, a display name and an "active" flag.
     */
    public class ServerNode implements Serializable {

        private static final long serialVersionUID = -522933808813265389L;

        private int id;
        private String name;
        private boolean active = false;

        /**
         * Builds a node whose name is derived from its id ({@code "Client #<id>"}).
         *
         * @param id numeric identifier of the node
         */
        public ServerNode(int id) {
            this(id, "Client #" + id);
        }

        /**
         * Builds a node with an explicit name.
         *
         * @param id   numeric identifier of the node
         * @param name display name of the node
         */
        public ServerNode(int id, String name) {
            this.id = id;
            this.name = name;
        }

        // ---- id ----

        public int getId() {
            return this.id;
        }

        public void setId(int id) {
            this.id = id;
        }

        // ---- name ----

        public String getName() {
            return this.name;
        }

        public void setName(String name) {
            this.name = name;
        }

        // ---- active flag ----

        public boolean isActive() {
            return this.active;
        }

        public void setActive(boolean active) {
            this.active = active;
        }

        /** Renders the node as {@code ServerNode{id=..., name='...', active=...}}. */
        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("ServerNode{");
            text.append("id=").append(id);
            text.append(", name='").append(name).append('\'');
            text.append(", active=").append(active);
            text.append('}');
            return text.toString();
        }
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51

    WorkServer.java

    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.recipes.leader.LeaderSelector;
    import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
    import java.io.Closeable;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;
    /**
     * Worker service that takes part in a leader election through a Curator
     * {@link LeaderSelector} and reports its leadership on standard output.
     */
    public class WorkServer extends LeaderSelectorListenerAdapter implements Closeable {

        private final ServerNode node;
        private final LeaderSelector leaderSelector;
        // Number of times this worker has been granted leadership so far.
        private final AtomicInteger leaderCount = new AtomicInteger();

        /**
         * Creates a worker that competes for leadership under the given election path.
         *
         * @param client Curator client used by the leader selector
         * @param path   election path shared by all competing workers
         * @param id     numeric id of this worker; also used to derive its name
         */
        public WorkServer(CuratorFramework client, String path, int id) {
            this.node = new ServerNode(id);
            // Create the leader-election object, with this worker as the listener.
            leaderSelector = new LeaderSelector(client, path, this);
            // Re-enter the election automatically after leadership is released.
            leaderSelector.autoRequeue();
        }

        /** Starts the leader selector so that this worker joins the election. */
        public void start() {
            System.out.println(this.node.getName() + " start...");
            leaderSelector.start();
        }

        /** Closes the leader selector, taking this worker out of the election. */
        @Override
        public void close() {
            System.out.println(this.node.getName() + " close...");
            leaderSelector.close();
        }

        /**
         * Callback invoked once this worker has won leadership.
         *
         * <p>The node is flagged active for the duration of the call and flagged
         * inactive again before returning, so the flag never reports leadership
         * that has already been given up. The "work" is simulated by sleeping
         * for 1 to 5 seconds.
         *
         * @param client the Curator client (unused here)
         */
        @Override
        public void takeLeadership(CuratorFramework client) {
            this.node.setActive(true);
            String name = this.node.getName();
            try {
                final int waitSeconds = (int) (5 * Math.random()) + 1;
                System.out.println(name + " is now the leader. Waiting " + waitSeconds + " seconds...");
                System.out.println(name + " has been leader " + leaderCount.getAndIncrement() + " time(s) before.");
                Thread.sleep(TimeUnit.SECONDS.toMillis(waitSeconds));
            } catch (InterruptedException e) {
                System.err.println(name + " was interrupted.");
                // Restore the interrupt status for the calling framework.
                Thread.currentThread().interrupt();
            } finally {
                // Work is done: returning from this method gives up leadership,
                // so the node must no longer be reported as active.
                this.node.setActive(false);
                System.out.println(name + " relinquishing leadership.\n");
            }
        }
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57

    LeaderSelectorMain.java

    import com.google.common.collect.Lists;
    import org.apache.curator.utils.CloseableUtils;
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.retry.ExponentialBackoffRetry;
    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.util.List;
    /**
     * Leader-election demo: starts several workers against a local ZooKeeper
     * and lets them compete until the user presses Enter.
     */
    public class LeaderSelectorMain {
        private static final int CLIENT_QTY = 10;

        private static final String PATH = "/examples/leader";

        /**
         * Creates {@code CLIENT_QTY} client/worker pairs, starts them, blocks on
         * standard input and finally closes every worker and then every client.
         *
         * @param args unused
         * @throws Exception if creating or starting a client or worker fails
         */
        public static void main(String[] args) throws Exception {
            List<CuratorFramework> curatorClients = Lists.newArrayList();
            List<WorkServer> workers = Lists.newArrayList();
            try {
                int index = 0;
                while (index < CLIENT_QTY) {
                    ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);
                    CuratorFramework curator = CuratorFrameworkFactory.newClient("127.0.0.1:2181", retryPolicy);
                    curatorClients.add(curator);

                    // Register the worker before starting anything so that the
                    // shutdown path below always sees it.
                    WorkServer worker = new WorkServer(curator, PATH, index);
                    workers.add(worker);

                    curator.start();
                    worker.start();
                    ++index;
                }

                System.out.println("回车键退出\n");
                BufferedReader console = new BufferedReader(new InputStreamReader(System.in));
                console.readLine();
            } finally {
                System.out.println("Shutting down...");
                // Workers first, then the clients they depend on.
                for (int w = 0; w < workers.size(); w++) {
                    CloseableUtils.closeQuietly(workers.get(w));
                }
                for (int c = 0; c < curatorClients.size(); c++) {
                    CloseableUtils.closeQuietly(curatorClients.get(c));
                }
            }
        }
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43

    依赖

    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-framework</artifactId>
        <version>2.12.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-recipes</artifactId>
        <version>2.12.0</version>
    </dependency>
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10

    # 服务注册、发现

    场景:在分布式架构中,通常需要部署多个服务实例共同对外提供服务,以解决服务单点问题并提升服务吞吐量。例如:在http微服务架构中,客户端通常直接请求网关服务,网关通过zookeeper查询服务节点信息,再将请求转发到后端服务;后端服务启动时向zookeeper注册自己的服务信息。

    zookeeper在http服务中的服务发现应用 zk-discovery-http-server.png

    示例代码

    InstanceDetails.java

    import org.codehaus.jackson.map.annotate.JsonRootName;
    
    /**
     * Simple bean holding a textual description; serialized under the JSON
     * root name {@code "details"}.
     */
    @JsonRootName("details")
    public class InstanceDetails {

        /** Free-form description text. */
        private String description;

        /** Creates details with an empty description. */
        public InstanceDetails() {
            this.description = "";
        }

        /**
         * Creates details with the given description.
         *
         * @param description description text
         */
        public InstanceDetails(String description) {
            this.description = description;
        }

        /** @return the description text */
        public String getDescription() {
            return this.description;
        }

        /** @param description the new description text */
        public void setDescription(String description) {
            this.description = description;
        }
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23

    ExampleServer.java

    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.utils.CloseableUtils;
    import org.apache.curator.x.discovery.ServiceDiscovery;
    import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
    import org.apache.curator.x.discovery.ServiceInstance;
    import org.apache.curator.x.discovery.UriSpec;
    import org.apache.curator.x.discovery.details.JsonInstanceSerializer;
    
    import java.io.Closeable;
    
    /**
     * Mock server that registers one service instance through Curator service
     * discovery. The instance advertises a {@code foo.com} URI with a random port.
     */
    public class ExampleServer implements Closeable {
        private final ServiceDiscovery<InstanceDetails> serviceDiscovery;
        private final ServiceInstance<InstanceDetails> thisInstance;

        /**
         * Builds the service instance and the discovery handle that will register it.
         *
         * @param client      Curator client used to talk to ZooKeeper
         * @param path        base path under which services are registered
         * @param serviceName name of the service this instance belongs to
         * @param description description stored in the instance payload
         * @throws Exception if the instance or the discovery handle cannot be built
         */
        public ExampleServer(CuratorFramework client, String path, String serviceName, String description)
                throws Exception {
            UriSpec uriSpec = new UriSpec("{scheme}://foo.com:{port}");
            // Mock port in 1..65535. Math.random() is in [0, 1), so the plain
            // (int) (65535 * Math.random()) could produce the invalid port 0.
            int port = 1 + (int) (65535 * Math.random());
            // Build the service instance information.
            thisInstance = ServiceInstance.<InstanceDetails> builder().name(serviceName)
                    .payload(new InstanceDetails(description)).port(port).uriSpec(uriSpec).build();
            // JSON serializer for the instance payload.
            JsonInstanceSerializer<InstanceDetails> serializer = new JsonInstanceSerializer<InstanceDetails>(
                    InstanceDetails.class);
            // Service discovery handle (used to register, query, update and remove services).
            serviceDiscovery = ServiceDiscoveryBuilder.builder(InstanceDetails.class).client(client).basePath(path)
                    .serializer(serializer).thisInstance(thisInstance).build();
        }

        /** @return the service instance described by this server */
        public ServiceInstance<InstanceDetails> getThisInstance() {
            return thisInstance;
        }

        /**
         * Starts service discovery, which writes this instance's data to ZooKeeper.
         *
         * @throws Exception if service discovery fails to start
         */
        public void start() throws Exception {
            serviceDiscovery.start();
        }

        /**
         * Closes the service discovery handle, which removes this instance's
         * registration. The Curator client passed to the constructor is not
         * closed here; it stays owned by the caller.
         */
        @Override
        public void close() {
            CloseableUtils.closeQuietly(serviceDiscovery);
        }
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43

    DiscoveryMain.java

    import com.google.common.base.Predicate;
    import com.google.common.collect.Iterables;
    import com.google.common.collect.Lists;
    import com.google.common.collect.Maps;
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.retry.ExponentialBackoffRetry;
    import org.apache.curator.utils.CloseableUtils;
    import org.apache.curator.x.discovery.ServiceDiscovery;
    import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
    import org.apache.curator.x.discovery.ServiceInstance;
    import org.apache.curator.x.discovery.ServiceProvider;
    import org.apache.curator.x.discovery.details.JsonInstanceSerializer;
    import org.apache.curator.x.discovery.strategies.RandomStrategy;
    
    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.util.Arrays;
    import java.util.Collection;
    import java.util.List;
    import java.util.Map;
    
    /**
     * Interactive console demo of the Curator ServiceDiscovery APIs.
     *
     * <p>Commands read from standard input add or delete mock services, list all
     * registered services, or pick a random instance of a named service.
     */
    public class DiscoveryMain {
        private static final String PATH = "/example/discovery";

        /**
         * Connects to a local ZooKeeper, starts service discovery and runs the
         * command loop. Providers, the discovery handle and the client are
         * closed, in that order, when the loop ends or fails.
         *
         * @param args unused
         * @throws Exception if the client or service discovery cannot be started
         */
        public static void main(String[] args) throws Exception {
            CuratorFramework client = null;
            ServiceDiscovery<InstanceDetails> serviceDiscovery = null;
            Map<String, ServiceProvider<InstanceDetails>> providers = Maps.newHashMap();
            try {
                client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", new ExponentialBackoffRetry(1000, 3));
                client.start();
                // JSON serializer for the instance payload.
                JsonInstanceSerializer<InstanceDetails> serializer = new JsonInstanceSerializer<InstanceDetails>(
                        InstanceDetails.class);
                // Service discovery handle (used to register, query, update and remove services).
                serviceDiscovery = ServiceDiscoveryBuilder.builder(InstanceDetails.class).client(client).basePath(PATH)
                        .serializer(serializer).build();
                serviceDiscovery.start();

                processCommands(serviceDiscovery, providers, client);
            } finally {
                for (ServiceProvider<InstanceDetails> cache : providers.values()) {
                    CloseableUtils.closeQuietly(cache);
                }
                CloseableUtils.closeQuietly(serviceDiscovery);
                CloseableUtils.closeQuietly(client);
            }
        }

        /**
         * Reads commands from standard input until "q"/"quit" or end of input and
         * dispatches them. Every mock server created by "add" that is still
         * registered is closed when the loop ends.
         *
         * @param serviceDiscovery shared discovery handle, owned by the caller
         * @param providers        cache of service providers keyed by service name
         * @param client           Curator client used to create mock servers
         * @throws Exception if a command fails
         */
        private static void processCommands(ServiceDiscovery<InstanceDetails> serviceDiscovery,
                Map<String, ServiceProvider<InstanceDetails>> providers, CuratorFramework client) throws Exception {
            printHelp();
            List<ExampleServer> servers = Lists.newArrayList();
            try {
                BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
                boolean done = false;
                while (!done) {
                    System.out.print("> ");
                    String line = in.readLine();
                    if (line == null) {
                        break;
                    }
                    String command = line.trim();
                    // "".split(...) yields one empty token, so a blank line has to
                    // be detected on the string itself rather than on parts.length.
                    if (command.isEmpty()) {
                        continue;
                    }
                    // Split on runs of whitespace so that repeated spaces do not
                    // produce empty arguments (e.g. an empty service name).
                    String[] parts = command.split("\\s+");
                    String operation = parts[0];
                    String[] args = Arrays.copyOfRange(parts, 1, parts.length);

                    if (operation.equalsIgnoreCase("help") || operation.equalsIgnoreCase("?")) {
                        printHelp();
                    } else if (operation.equalsIgnoreCase("q") || operation.equalsIgnoreCase("quit")) {
                        done = true;
                    } else if (operation.equals("add")) {
                        addInstance(args, client, command, servers);
                    } else if (operation.equals("delete")) {
                        deleteInstance(args, command, servers);
                    } else if (operation.equals("random")) {
                        listRandomInstance(args, serviceDiscovery, providers, command);
                    } else if (operation.equals("list")) {
                        listInstances(serviceDiscovery);
                    }
                }
            } finally {
                for (ExampleServer server : servers) {
                    CloseableUtils.closeQuietly(server);
                }
            }
        }

        /**
         * Prints one randomly chosen instance of the named service. A provider is
         * created and cached per service name on first use.
         *
         * @param args             command arguments; exactly one (the service name) is expected
         * @param serviceDiscovery shared discovery handle
         * @param providers        cache of service providers keyed by service name
         * @param command          the raw command line, used in error messages
         * @throws Exception if the provider cannot be started or queried
         */
        private static void listRandomInstance(String[] args, ServiceDiscovery<InstanceDetails> serviceDiscovery,
                Map<String, ServiceProvider<InstanceDetails>> providers, String command) throws Exception {
            if (args.length != 1) {
                System.err.println("syntax error (expected random <name>): " + command);
                return;
            }

            String serviceName = args[0];
            ServiceProvider<InstanceDetails> provider = providers.get(serviceName);
            if (provider == null) {
                provider = serviceDiscovery.serviceProviderBuilder().serviceName(serviceName)
                        .providerStrategy(new RandomStrategy<InstanceDetails>()).build();
                // Cache before starting so that main's cleanup closes the provider
                // even if start() fails.
                providers.put(serviceName, provider);
                provider.start();
                // Pause after starting the provider before the first query
                // (presumably to let it load its instances).
                Thread.sleep(2500);
            }

            ServiceInstance<InstanceDetails> instance = provider.getInstance();
            if (instance == null) {
                System.err.println("No instances named: " + serviceName);
            } else {
                outputInstance(instance);
            }
        }

        /**
         * Prints every registered service name followed by its instances.
         *
         * <p>The discovery handle is shared with the other commands and is closed
         * by {@code main}; it must not be closed here, otherwise the first "list"
         * command would tear it down for every later command.
         *
         * @param serviceDiscovery shared discovery handle, owned by the caller
         * @throws Exception if querying names or instances fails
         */
        private static void listInstances(ServiceDiscovery<InstanceDetails> serviceDiscovery) throws Exception {
            Collection<String> serviceNames = serviceDiscovery.queryForNames();
            System.out.println(serviceNames.size() + " type(s)");
            for (String serviceName : serviceNames) {
                Collection<ServiceInstance<InstanceDetails>> instances = serviceDiscovery
                        .queryForInstances(serviceName);
                System.out.println(serviceName);
                for (ServiceInstance<InstanceDetails> instance : instances) {
                    outputInstance(instance);
                }
            }
        }

        /** Prints the payload description and the built URI of one instance. */
        private static void outputInstance(ServiceInstance<InstanceDetails> instance) {
            System.out.println("\t" + instance.getPayload().getDescription() + ": " + instance.buildUriSpec());
        }

        /**
         * Closes and forgets the first mock server registered under the given name.
         *
         * @param args    command arguments; exactly one (the service name) is expected
         * @param command the raw command line, used in error messages
         * @param servers mock servers created so far; the removed one is deleted from it
         */
        private static void deleteInstance(String[] args, String command, List<ExampleServer> servers) {
            // simulate a random instance going down
            // in a real application, this would occur due to normal operation, a crash, maintenance, etc.

            if (args.length != 1) {
                System.err.println("syntax error (expected delete <name>): " + command);
                return;
            }

            final String serviceName = args[0];
            ExampleServer server = Iterables.find(servers, new Predicate<ExampleServer>() {
                @Override
                public boolean apply(ExampleServer server) {
                    // Exact match: a suffix match would let "delete oo" remove a
                    // service named "foo".
                    return server.getThisInstance().getName().equals(serviceName);
                }
            }, null);
            if (server == null) {
                System.err.println("No servers found named: " + serviceName);
                return;
            }

            servers.remove(server);
            CloseableUtils.closeQuietly(server);
            System.out.println("Removed a random instance of: " + serviceName);
        }

        /**
         * Creates, records and starts a mock server for the given service.
         *
         * @param args    command arguments: the service name followed by one or more description words
         * @param client  Curator client handed to the mock server
         * @param command the raw command line, used in error messages
         * @param servers list that receives the new mock server
         * @throws Exception if the mock server cannot be created or started
         */
        private static void addInstance(String[] args, CuratorFramework client, String command, List<ExampleServer> servers)
                throws Exception {
            if (args.length < 2) {
                System.err.println("syntax error (expected add <name> <description>): " + command);
                return;
            }

            // Everything after the service name is the description, re-joined with single spaces.
            StringBuilder description = new StringBuilder();
            for (int i = 1; i < args.length; ++i) {
                if (i > 1) {
                    description.append(' ');
                }
                description.append(args[i]);
            }

            String serviceName = args[0];
            ExampleServer server = new ExampleServer(client, PATH, serviceName, description.toString());
            // Record before starting so that the server is closed on shutdown even if start() fails.
            servers.add(server);
            server.start();

            System.out.println(serviceName + " added");
        }

        /** Prints the list of supported commands. */
        private static void printHelp() {
            System.out.println(
                    "An example of using the ServiceDiscovery APIs. This example is driven by entering commands at the prompt:\n");
            System.out.println("add <name> <description>: Adds a mock service with the given name and description");
            System.out.println("delete <name>: Deletes one of the mock services with the given name");
            System.out.println("list: Lists all the currently registered services");
            System.out.println("random <name>: Lists a random instance of the service with the given name");
            System.out.println("quit: Quit the example");
            System.out.println();
        }
    }
    
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134
    135
    136
    137
    138
    139
    140
    141
    142
    143
    144
    145
    146
    147
    148
    149
    150
    151
    152
    153
    154
    155
    156
    157
    158
    159
    160
    161
    162
    163
    164
    165
    166
    167
    168
    169
    170
    171
    172
    173
    174
    175
    176
    177
    178
    179
    180
    181
    182
    183
    184
    185
    186
    187
    188
    189
    190
    191
    192
    193
    194
    195
    196
    197
    198
    199
    200

    依赖

    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-framework</artifactId>
        <version>2.12.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-x-discovery</artifactId>
        <version>2.12.0</version>
    </dependency>
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10

    # 分布式锁

    # 消息发布、订阅

    很少用,通常都使用专门处理消息的中间件,例如:rabbitmq、kafka、rocketmq等。

    #zookeeper
    Last Updated: 2024/04/23, 01:30:37
    zookeeper介绍
    OpenResty从入门到开发一个网关服务

    ← zookeeper介绍 OpenResty从入门到开发一个网关服务→

    最近更新
    01
    go-kit学习指南 - 多协议支持
    04-19
    02
    go-kit学习指南 - 中间件
    04-19
    03
    go-kit开发微服务 - 服务注册与发现
    04-19
    更多文章>
    Theme by Vdoing | Copyright © 2016-2024 铁匠 | 粤ICP备15021633号
    • 跟随系统
    • 浅色模式
    • 深色模式
    • 阅读模式