zookeeper

介绍

高性能分布式数据一致性解决方案,

基本概念:

  • 集群角色
    • leader: 负责进行投票的发起和决议,更新系统状态
    • follower: 接收客户端请求并想客户端返回结果,在选主过程中参与投票
    • observer: 接收客户端连接,将写请求转发给leader,但不参与投票,只同步leader的状态,observer的目的是为了扩展系统,提高读取速度
  • 会话: 客户端与服务器的链接
  • 节点
    • 临时节点: 在会话断开后删除
    • 永久节点: 在会话断开后不会删除
  • 版本
    • version: 当前节点数据变更版本号
    • cversion: 当前节点子节点版本号
    • acersion: 当前节点ACL变更版本号
  • watcher: 注册相关数据变更的client就是一个watcher。当数据变更时,server将数据变更事件通知给对应数据的watcher
  • ACL权限控制:
    • CREATE: 节点创建
    • READ: 节点数据读取
    • WRITE: 节点数据更新
    • DELETE: 节点数据删除
    • ADMIN: 节点数据ACL管理

zkCli基本使用

# 连接zk服务器,默认连接127.0.0.1:2181
bash zkCli.sh

e.zookeeper.ZooKeeperMain$MyWatcher@41906a77
Welcome to ZooKeeper!
2019-02-17 17:12:43,137 [myid:] - INFO  [main-SendThread(localhost:2181):ClientCnxn$SendThread@1028] - Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
JLine support is enabled
2019-02-17 17:12:43,259 [myid:] - INFO  [main-SendThread(localhost:2181):ClientCnxn$SendThread@878] - Socket connection established to localhost/127.0.0.1:2181, initiating session
2019-02-17 17:12:43,281 [myid:] - INFO  [main-SendThread(localhost:2181):ClientCnxn$SendThread@1302] - Session establishment complete on server localhost/127.0.0.1:2181, sessionid = 0x10000b9659a0001, negotiated timeout = 30000

WATCHER::

WatchedEvent state:SyncConnected type:None path:null
[zk: localhost:2181(CONNECTED) 0] help # 帮助文档
ZooKeeper -server host:port cmd args
	stat path [watch]
	set path data [version]
	ls path [watch]
	delquota [-n|-b] path
	ls2 path [watch]
	setAcl path acl
	setquota -n|-b val path
	history 
	redo cmdno
	printwatches on|off
	delete path [version]
	sync path
	listquota path
	rmr path
	get path [watch]
	create [-s] [-e] path data acl
	addauth scheme auth
	quit 
	getAcl path
	close 
	connect host:port


经典使用场景

leader选举

场景:某个服务有多个worker服务,当其中一个服务挂掉后,其他worker能够自动接替任务继续执行,解决服务单点问题。例如监控网站流量的服务,通过解析access log日志,分析当前网站流量,当达到一定阈值时,发出报警短信。

实现原理 zk-leader

示例代码(参考:https://github.com/apache/curator/blob/master/curator-examples/src/main/java/leader/ExampleClient.java

ServerNode.java

import java.io.Serializable;

public class ServerNode implements Serializable {

    private static final long serialVersionUID = -522933808813265389L;

    private int id;
    private String name;
    private boolean active = true;

    public ServerNode(int id) {
        this.id = id;
        this.name = "Client #" + id;
    }

    public ServerNode(int id, String name) {
        this.id = id;
        this.name = name;
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public boolean isActive() {
        return active;
    }

    public void setActive(boolean active) {
        this.active = active;
    }

    @Override
    public String toString() {
        return "ServerNode{" + "id=" + id + ", name='" + name + '\'' + ", active=" + active + '}';
    }
}

WorkServer.java

import java.io.Serializable;
/**
 * 服务节点信息
 */
public class ServerNode implements Serializable {

    private static final long serialVersionUID = -522933808813265389L;

    private int id;
    private String name;
    private boolean active = false;

    public ServerNode(int id) {
        this.id = id;
        this.name = "Client #" + id;
    }

    public ServerNode(int id, String name) {
        this.id = id;
        this.name = name;
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public boolean isActive() {
        return active;
    }

    public void setActive(boolean active) {
        this.active = active;
    }

    @Override
    public String toString() {
        return "ServerNode{" + "id=" + id + ", name='" + name + '\'' + ", active=" + active + '}';
    }
}

WorkServer.java

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
import java.io.Closeable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * worker服务
 */
public class WorkServer extends LeaderSelectorListenerAdapter implements Closeable {

    private final ServerNode node;
    private final LeaderSelector leaderSelector;
    private final AtomicInteger leaderCount = new AtomicInteger();

    public WorkServer(CuratorFramework client, String path, int id) {
        this.node = new ServerNode(id);
        // 创建leader选举对象
        leaderSelector = new LeaderSelector(client, path, this);
        // 在释放leader权限后自动重新参加选举
        leaderSelector.autoRequeue();
    }

    public void start() {
        System.out.println(this.node.getName() + " start...");
        // 服务启动
        leaderSelector.start();
    }

    @Override
    public void close() {
        System.out.println(this.node.getName() + " close...");
        // 关闭服务
        leaderSelector.close();
    }

    /**
     * 争抢到leader权限后的回调方法
     */
    @Override
    public void takeLeadership(CuratorFramework client) {
        this.node.setActive(true);
        String name = this.node.getName();
        try {
            final int waitSeconds = (int) (5 * Math.random()) + 1;
            System.out.println(name + " is now the leader. Waiting " + waitSeconds + " seconds...");
            System.out.println(name + " has been leader " + leaderCount.getAndIncrement() + " time(s) before.");
            Thread.sleep(TimeUnit.SECONDS.toMillis(waitSeconds));
        } catch (InterruptedException e) {
            System.err.println(name + " was interrupted.");
            Thread.currentThread().interrupt();
        } finally {
            // 执行完毕,释放leader权限
            System.out.println(name + " relinquishing leadership.\n");
        }
    }
}

LeaderSelectorMain.java

import com.google.common.collect.Lists;
import org.apache.curator.utils.CloseableUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.List;
/**
 * 服务选举测试
 */
public class LeaderSelectorMain {
    private static final int CLIENT_QTY = 10;

    private static final String PATH = "/examples/leader";

    public static void main(String[] args) throws Exception {
        List<CuratorFramework> clients = Lists.newArrayList();
        List<WorkServer> workServers = Lists.newArrayList();
        try {
            for (int i = 0; i < CLIENT_QTY; ++i) {
                CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181",
                        new ExponentialBackoffRetry(1000, 3));
                clients.add(client);
                WorkServer workServer = new WorkServer(client, PATH, i);
                workServers.add(workServer);
                client.start();
                workServer.start();
            }

            System.out.println("回车键退出\n");
            new BufferedReader(new InputStreamReader(System.in)).readLine();
        } finally {
            System.out.println("Shutting down...");
            for (WorkServer workServer : workServers) {
                CloseableUtils.closeQuietly(workServer);
            }
            for (CuratorFramework client : clients) {
                CloseableUtils.closeQuietly(client);
            }
        }
    }
}

依赖

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>2.12.0</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>2.12.0</version>
</dependency>

服务注册、发现

场景:在分布式架构中,通常需要有多个服务来共同服务,解决服务单点问题和扩展服务吞吐量。例如:在http微服务架构中,通常客户端直接请求网关服务,网关通过zookeeper查询服务节点信息,将请求转发到后端服务。后端服务启动时向zookeeper注册服务信息。

zookeeper在http服务中的服务发现应用 zk-discovery-http-server.png

示例代码

InstanceDetails.java

import org.codehaus.jackson.map.annotate.JsonRootName;

@JsonRootName("details")
public class InstanceDetails {

    private String description;

    public InstanceDetails() {
        this("");
    }

    public InstanceDetails(String description) {
        this.description = description;
    }

    public void setDescription(String description) {
        this.description = description;
    }

    public String getDescription() {
        return description;
    }
}

ExampleServer.java

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.utils.CloseableUtils;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
import org.apache.curator.x.discovery.ServiceInstance;
import org.apache.curator.x.discovery.UriSpec;
import org.apache.curator.x.discovery.details.JsonInstanceSerializer;

import java.io.Closeable;

public class ExampleServer implements Closeable {
    private final ServiceDiscovery<InstanceDetails> serviceDiscovery;
    private final ServiceInstance<InstanceDetails> thisInstance;

    public ExampleServer(CuratorFramework client, String path, String serviceName, String description)
            throws Exception {
        UriSpec uriSpec = new UriSpec("{scheme}://foo.com:{port}");
        // 构造服务信息
        thisInstance = ServiceInstance.<InstanceDetails> builder().name(serviceName)
                .payload(new InstanceDetails(description)).port((int) (65535 * Math.random())).uriSpec(uriSpec).build();
        // 服务信息json序列化器
        JsonInstanceSerializer<InstanceDetails> serializer = new JsonInstanceSerializer<InstanceDetails>(
                InstanceDetails.class);
        // 创建服务发现对象(用户注册,查询,修改,删除服务)
        serviceDiscovery = ServiceDiscoveryBuilder.builder(InstanceDetails.class).client(client).basePath(path)
                .serializer(serializer).thisInstance(thisInstance).build();
    }

    public ServiceInstance<InstanceDetails> getThisInstance() {
        return thisInstance;
    }

    public void start() throws Exception {
        // 服务启动,将节点信息写入zookeeper
        serviceDiscovery.start();
    }

    @Override
    public void close() {
        // 服务关闭,将节点数据删除,并断开与zookeeper连接
        CloseableUtils.closeQuietly(serviceDiscovery);
    }
}

DiscoveryMain.java

import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.CloseableUtils;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
import org.apache.curator.x.discovery.ServiceInstance;
import org.apache.curator.x.discovery.ServiceProvider;
import org.apache.curator.x.discovery.details.JsonInstanceSerializer;
import org.apache.curator.x.discovery.strategies.RandomStrategy;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;

public class DiscoveryMain {
    private static final String PATH = "/example/discovery";

    public static void main(String[] args) throws Exception {
        CuratorFramework client = null;
        ServiceDiscovery<InstanceDetails> serviceDiscovery = null;
        Map<String, ServiceProvider<InstanceDetails>> providers = Maps.newHashMap();
        try {
            client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", new ExponentialBackoffRetry(1000, 3));
            client.start();
            // 服务信息json序列化器
            JsonInstanceSerializer<InstanceDetails> serializer = new JsonInstanceSerializer<InstanceDetails>(
                    InstanceDetails.class);
            // 创建服务发现对象(用户注册,查询,修改,删除服务
            serviceDiscovery = ServiceDiscoveryBuilder.builder(InstanceDetails.class).client(client).basePath(PATH)
                    .serializer(serializer).build();
            serviceDiscovery.start();

            processCommands(serviceDiscovery, providers, client);
        } finally {
            for (ServiceProvider<InstanceDetails> cache : providers.values()) {
                CloseableUtils.closeQuietly(cache);
            }
            CloseableUtils.closeQuietly(serviceDiscovery);
            CloseableUtils.closeQuietly(client);
        }
    }

    private static void processCommands(ServiceDiscovery<InstanceDetails> serviceDiscovery,
            Map<String, ServiceProvider<InstanceDetails>> providers, CuratorFramework client) throws Exception {
        printHelp();
        List<ExampleServer> servers = Lists.newArrayList();
        try {
            BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
            boolean done = false;
            while (!done) {
                System.out.print("> ");
                String line = in.readLine();
                if (line == null) {
                    break;
                }
                String command = line.trim();
                String[] parts = command.split("\\s");
                if (parts.length == 0) {
                    continue;
                }
                String operation = parts[0];
                String args[] = Arrays.copyOfRange(parts, 1, parts.length);

                if (operation.equalsIgnoreCase("help") || operation.equalsIgnoreCase("?")) {
                    printHelp();
                } else if (operation.equalsIgnoreCase("q") || operation.equalsIgnoreCase("quit")) {
                    done = true;
                } else if (operation.equals("add")) {
                    addInstance(args, client, command, servers);
                } else if (operation.equals("delete")) {
                    deleteInstance(args, command, servers);
                } else if (operation.equals("random")) {
                    listRandomInstance(args, serviceDiscovery, providers, command);
                } else if (operation.equals("list")) {
                    listInstances(serviceDiscovery);
                }
            }
        } finally {
            for (ExampleServer server : servers) {
                CloseableUtils.closeQuietly(server);
            }
        }
    }


    private static void listRandomInstance(String[] args, ServiceDiscovery<InstanceDetails> serviceDiscovery,
            Map<String, ServiceProvider<InstanceDetails>> providers, String command) throws Exception {
        if (args.length != 1) {
            System.err.println("syntax error (expected random <name>): " + command);
            return;
        }

        String serviceName = args[0];
        ServiceProvider<InstanceDetails> provider = providers.get(serviceName);
        if (provider == null) {
            provider = serviceDiscovery.serviceProviderBuilder().serviceName(serviceName)
                    .providerStrategy(new RandomStrategy<InstanceDetails>()).build();
            providers.put(serviceName, provider);
            provider.start();
            Thread.sleep(2500);
        }

        ServiceInstance<InstanceDetails> instance = provider.getInstance();
        if (instance == null) {
            System.err.println("No instances named: " + serviceName);
        } else {
            outputInstance(instance);
        }
    }
    
    private static void listInstances(ServiceDiscovery<InstanceDetails> serviceDiscovery) throws Exception {
        try {
            Collection<String> serviceNames = serviceDiscovery.queryForNames();
            System.out.println(serviceNames.size() + " type(s)");
            for (String serviceName : serviceNames) {
                Collection<ServiceInstance<InstanceDetails>> instances = serviceDiscovery
                        .queryForInstances(serviceName);
                System.out.println(serviceName);
                for (ServiceInstance<InstanceDetails> instance : instances) {
                    outputInstance(instance);
                }
            }
        } finally {
            CloseableUtils.closeQuietly(serviceDiscovery);
        }
    }

    private static void outputInstance(ServiceInstance<InstanceDetails> instance) {
        System.out.println("\t" + instance.getPayload().getDescription() + ": " + instance.buildUriSpec());
    }

    private static void deleteInstance(String[] args, String command, List<ExampleServer> servers) {
        // simulate a random instance going down
        // in a real application, this would occur due to normal operation, a crash, maintenance, etc.

        if (args.length != 1) {
            System.err.println("syntax error (expected delete <name>): " + command);
            return;
        }

        final String serviceName = args[0];
        ExampleServer server = Iterables.find(servers, new Predicate<ExampleServer>() {
            @Override
            public boolean apply(ExampleServer server) {
                return server.getThisInstance().getName().endsWith(serviceName);
            }
        }, null);
        if (server == null) {
            System.err.println("No servers found named: " + serviceName);
            return;
        }

        servers.remove(server);
        CloseableUtils.closeQuietly(server);
        System.out.println("Removed a random instance of: " + serviceName);
    }

    private static void addInstance(String[] args, CuratorFramework client, String command, List<ExampleServer> servers)
            throws Exception {
        if (args.length < 2) {
            System.err.println("syntax error (expected add <name> <description>): " + command);
            return;
        }

        StringBuilder description = new StringBuilder();
        for (int i = 1; i < args.length; ++i) {
            if (i > 1) {
                description.append(' ');
            }
            description.append(args[i]);
        }

        String serviceName = args[0];
        ExampleServer server = new ExampleServer(client, PATH, serviceName, description.toString());
        servers.add(server);
        server.start();

        System.out.println(serviceName + " added");
    }

    private static void printHelp() {
        System.out.println(
                "An example of using the ServiceDiscovery APIs. This example is driven by entering commands at the prompt:\n");
        System.out.println("add <name> <description>: Adds a mock service with the given name and description");
        System.out.println("delete <name>: Deletes one of the mock services with the given name");
        System.out.println("list: Lists all the currently registered services");
        System.out.println("random <name>: Lists a random instance of the service with the given name");
        System.out.println("quit: Quit the example");
        System.out.println();
    }
}

依赖

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>2.12.0</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-x-discovery</artifactId>
    <version>2.12.0</version>
</dependency>

分布式锁

消息发布、订阅

很少用,通常都使用专门处理处理消息的中间件,例如:rabbitmq、kafka、rocketmq等。