zookeeper经典使用场景
# leader选举
场景:某个服务由多个worker节点组成,当其中一个worker挂掉后,其他worker能够自动接替任务继续执行,从而解决服务单点问题。例如监控网站流量的服务:通过解析access log日志分析当前网站流量,当流量达到一定阈值时,发出报警短信。
实现原理
ServerNode.java
import java.io.Serializable;
/**
 * Serializable description of one worker node: a numeric id, a display name
 * and an "active" flag.
 *
 * <p>The flag starts out {@code false}. {@code WorkServer} switches it on when
 * the node takes leadership, so a freshly created node must not already report
 * itself as active.
 */
public class ServerNode implements Serializable {
    private static final long serialVersionUID = -522933808813265389L;

    private int id;
    private String name;
    // A new node has not taken leadership yet, so it begins inactive.
    private boolean active = false;

    /**
     * Creates a node with a generated name of the form {@code "Client #<id>"}.
     *
     * @param id numeric identifier of the node
     */
    public ServerNode(int id) {
        this.id = id;
        this.name = "Client #" + id;
    }

    /**
     * Creates a node with an explicit name.
     *
     * @param id   numeric identifier of the node
     * @param name display name of the node
     */
    public ServerNode(int id, String name) {
        this.id = id;
        this.name = name;
    }

    /** @return the numeric identifier of the node */
    public int getId() {
        return id;
    }

    /** @param id the new numeric identifier */
    public void setId(int id) {
        this.id = id;
    }

    /** @return the display name of the node */
    public String getName() {
        return name;
    }

    /** @param name the new display name */
    public void setName(String name) {
        this.name = name;
    }

    /** @return the current value of the active flag */
    public boolean isActive() {
        return active;
    }

    /** @param active the new value of the active flag */
    public void setActive(boolean active) {
        this.active = active;
    }

    @Override
    public String toString() {
        return "ServerNode{" + "id=" + id + ", name='" + name + '\'' + ", active=" + active + '}';
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
ServerNode.java
import java.io.Serializable;
/**
 * Information about a single service node: its id, its display name and
 * whether it is currently flagged as active.
 */
public class ServerNode implements Serializable {
    private static final long serialVersionUID = -522933808813265389L;

    private int id;
    private String name;
    private boolean active = false;

    /**
     * Builds a node whose name is derived from the id as {@code "Client #<id>"}.
     *
     * @param id numeric identifier of the node
     */
    public ServerNode(int id) {
        this(id, "Client #" + id);
    }

    /**
     * Builds a node with the given id and name.
     *
     * @param id   numeric identifier of the node
     * @param name display name of the node
     */
    public ServerNode(int id, String name) {
        this.id = id;
        this.name = name;
    }

    /** @return the node id */
    public int getId() {
        return this.id;
    }

    /** @param id the node id to store */
    public void setId(int id) {
        this.id = id;
    }

    /** @return the node name */
    public String getName() {
        return this.name;
    }

    /** @param name the node name to store */
    public void setName(String name) {
        this.name = name;
    }

    /** @return the active flag */
    public boolean isActive() {
        return this.active;
    }

    /** @param active the active flag to store */
    public void setActive(boolean active) {
        this.active = active;
    }

    /** Renders the node as {@code ServerNode{id=..., name='...', active=...}}. */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("ServerNode{");
        text.append("id=").append(this.id);
        text.append(", name='").append(this.name).append('\'');
        text.append(", active=").append(this.active);
        text.append('}');
        return text.toString();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
WorkServer.java
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
import java.io.Closeable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Worker service that takes part in leader election through a Curator
 * {@link LeaderSelector}.
 *
 * <p>The worker is its own election listener: the selector calls
 * {@link #takeLeadership(CuratorFramework)} once this worker wins the election.
 */
public class WorkServer extends LeaderSelectorListenerAdapter implements Closeable {
    private final ServerNode node;
    private final LeaderSelector leaderSelector;
    // Number of times this worker has been handed leadership so far.
    private final AtomicInteger leaderCount = new AtomicInteger();

    /**
     * Creates a worker and its leader selector; the election does not begin
     * until {@link #start()} is called.
     *
     * @param client Curator client used by the leader selector
     * @param path   election path passed to the leader selector
     * @param id     numeric id used to build this worker's {@link ServerNode}
     */
    public WorkServer(CuratorFramework client, String path, int id) {
        this.node = new ServerNode(id);
        // Create the leader election object, with this worker as the listener.
        leaderSelector = new LeaderSelector(client, path, this);
        // Rejoin the election automatically after leadership is released.
        leaderSelector.autoRequeue();
    }

    /** Starts the worker by starting its leader selector. */
    public void start() {
        System.out.println(this.node.getName() + " start...");
        leaderSelector.start();
    }

    /** Stops the worker by closing its leader selector. */
    @Override
    public void close() {
        System.out.println(this.node.getName() + " close...");
        leaderSelector.close();
    }

    /**
     * Callback invoked once this worker has won leadership.
     *
     * <p>Marks the node active, simulates 1-5 seconds of work, and marks the
     * node inactive again before returning, so the flag never stays set after
     * leadership has been given up.
     *
     * @param client the Curator client the election runs on (unused here)
     */
    @Override
    public void takeLeadership(CuratorFramework client) {
        String name = this.node.getName();
        this.node.setActive(true);
        try {
            final int waitSeconds = (int) (5 * Math.random()) + 1;
            System.out.println(name + " is now the leader. Waiting " + waitSeconds + " seconds...");
            System.out.println(name + " has been leader " + leaderCount.getAndIncrement() + " time(s) before.");
            Thread.sleep(TimeUnit.SECONDS.toMillis(waitSeconds));
        } catch (InterruptedException e) {
            System.err.println(name + " was interrupted.");
            // Restore the interrupt status for the calling thread.
            Thread.currentThread().interrupt();
        } finally {
            // Work is done and leadership is about to be released: clear the
            // flag on every exit path, including interruption.
            this.node.setActive(false);
            System.out.println(name + " relinquishing leadership.\n");
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
LeaderSelectorMain.java
import com.google.common.collect.Lists;
import org.apache.curator.utils.CloseableUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.List;
/**
 * Leader election demo: starts {@code CLIENT_QTY} Curator clients, each with
 * its own {@link WorkServer} competing on the same election path, and keeps
 * running until a line is read from standard input.
 */
public class LeaderSelectorMain {
    private static final int CLIENT_QTY = 10;
    private static final String PATH = "/examples/leader";

    /**
     * Runs the demo against the ZooKeeper server at {@code 127.0.0.1:2181}.
     *
     * @param args unused
     * @throws Exception if a client or worker fails to start, or reading
     *                   standard input fails
     */
    public static void main(String[] args) throws Exception {
        List<CuratorFramework> curators = Lists.newArrayList();
        List<WorkServer> workers = Lists.newArrayList();
        try {
            int id = 0;
            while (id < CLIENT_QTY) {
                CuratorFramework curator = CuratorFrameworkFactory.newClient(
                        "127.0.0.1:2181", new ExponentialBackoffRetry(1000, 3));
                // Track each resource before starting it, so cleanup also sees
                // the ones whose start failed.
                curators.add(curator);
                WorkServer worker = new WorkServer(curator, PATH, id);
                workers.add(worker);
                curator.start();
                worker.start();
                id++;
            }
            System.out.println("回车键退出\n");
            BufferedReader console = new BufferedReader(new InputStreamReader(System.in));
            console.readLine();
        } finally {
            shutdown(workers, curators);
        }
    }

    /** Closes all workers first and then their clients, ignoring close failures. */
    private static void shutdown(List<WorkServer> workers, List<CuratorFramework> curators) {
        System.out.println("Shutting down...");
        for (WorkServer worker : workers) {
            CloseableUtils.closeQuietly(worker);
        }
        for (CuratorFramework curator : curators) {
            CloseableUtils.closeQuietly(curator);
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
依赖
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.12.0</version>
</dependency>
1
2
3
4
5
6
7
8
9
10
2
3
4
5
6
7
8
9
10
# 服务注册、发现
场景:在分布式架构中,通常需要部署多个服务实例共同对外提供服务,以解决服务单点问题并提升服务吞吐量。例如:在HTTP微服务架构中,客户端通常直接请求网关服务,网关通过zookeeper查询服务节点信息,再将请求转发到后端服务;后端服务启动时向zookeeper注册自己的服务信息。
zookeeper在http服务中的服务发现应用
示例代码
InstanceDetails.java
import org.codehaus.jackson.map.annotate.JsonRootName;
@JsonRootName("details")
/**
 * Payload attached to a registered service instance; it carries a single
 * free-text description.
 */
public class InstanceDetails {
    private String description;

    /** Creates details with an empty description. */
    public InstanceDetails() {
        this.description = "";
    }

    /**
     * Creates details with the given description.
     *
     * @param description text describing the instance
     */
    public InstanceDetails(String description) {
        this.description = description;
    }

    /** @return the description text */
    public String getDescription() {
        return this.description;
    }

    /** @param description the description text to store */
    public void setDescription(String description) {
        this.description = description;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
ExampleServer.java
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.utils.CloseableUtils;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
import org.apache.curator.x.discovery.ServiceInstance;
import org.apache.curator.x.discovery.UriSpec;
import org.apache.curator.x.discovery.details.JsonInstanceSerializer;
import java.io.Closeable;
/**
 * Mock service used by the discovery demo. It builds a {@link ServiceInstance}
 * for itself and a {@link ServiceDiscovery} that carries that instance.
 */
public class ExampleServer implements Closeable {
    private final ServiceDiscovery<InstanceDetails> serviceDiscovery;
    private final ServiceInstance<InstanceDetails> thisInstance;

    /**
     * Prepares the instance description and the discovery object; nothing is
     * started until {@link #start()} is called.
     *
     * @param client      Curator client handed to the discovery builder
     * @param path        base path handed to the discovery builder
     * @param serviceName name of the service this instance belongs to
     * @param description free-text description stored in the payload
     * @throws Exception if building the service instance fails
     */
    public ExampleServer(CuratorFramework client, String path, String serviceName, String description)
            throws Exception {
        this.thisInstance = describeInstance(serviceName, description);
        this.serviceDiscovery = createDiscovery(client, path, this.thisInstance);
    }

    /** Builds the service record: name, payload, a random port and a URI template. */
    private static ServiceInstance<InstanceDetails> describeInstance(String serviceName, String description)
            throws Exception {
        UriSpec uriTemplate = new UriSpec("{scheme}://foo.com:{port}");
        return ServiceInstance.<InstanceDetails> builder()
                .name(serviceName)
                .payload(new InstanceDetails(description))
                .port((int) (65535 * Math.random()))
                .uriSpec(uriTemplate)
                .build();
    }

    /** Builds the discovery object, using a JSON serializer for the payload. */
    private static ServiceDiscovery<InstanceDetails> createDiscovery(CuratorFramework client, String path,
            ServiceInstance<InstanceDetails> instance) {
        JsonInstanceSerializer<InstanceDetails> jsonSerializer =
                new JsonInstanceSerializer<InstanceDetails>(InstanceDetails.class);
        return ServiceDiscoveryBuilder.builder(InstanceDetails.class)
                .client(client)
                .basePath(path)
                .serializer(jsonSerializer)
                .thisInstance(instance)
                .build();
    }

    /** @return the service instance that describes this server */
    public ServiceInstance<InstanceDetails> getThisInstance() {
        return this.thisInstance;
    }

    /**
     * Starts the service by starting its discovery object.
     *
     * @throws Exception if the discovery object fails to start
     */
    public void start() throws Exception {
        this.serviceDiscovery.start();
    }

    /** Stops the service by closing its discovery object, ignoring close failures. */
    @Override
    public void close() {
        CloseableUtils.closeQuietly(this.serviceDiscovery);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
DiscoveryMain.java
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.CloseableUtils;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
import org.apache.curator.x.discovery.ServiceInstance;
import org.apache.curator.x.discovery.ServiceProvider;
import org.apache.curator.x.discovery.details.JsonInstanceSerializer;
import org.apache.curator.x.discovery.strategies.RandomStrategy;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
/**
 * Interactive console demo of the Curator service-discovery APIs.
 *
 * <p>Commands typed at the prompt add or delete mock services, list everything
 * that is registered, or pick a random instance of a named service.
 */
public class DiscoveryMain {
    private static final String PATH = "/example/discovery";

    /**
     * Connects to ZooKeeper at {@code 127.0.0.1:2181}, runs the command loop
     * and closes every provider, the discovery object and the client on exit.
     *
     * @param args unused
     * @throws Exception if start-up or a command fails
     */
    public static void main(String[] args) throws Exception {
        CuratorFramework client = null;
        ServiceDiscovery<InstanceDetails> serviceDiscovery = null;
        Map<String, ServiceProvider<InstanceDetails>> providers = Maps.newHashMap();
        try {
            client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", new ExponentialBackoffRetry(1000, 3));
            client.start();
            // JSON serializer for the service payload.
            JsonInstanceSerializer<InstanceDetails> serializer = new JsonInstanceSerializer<InstanceDetails>(
                    InstanceDetails.class);
            // Discovery object shared by every command in the loop.
            serviceDiscovery = ServiceDiscoveryBuilder.builder(InstanceDetails.class).client(client).basePath(PATH)
                    .serializer(serializer).build();
            serviceDiscovery.start();
            processCommands(serviceDiscovery, providers, client);
        } finally {
            for (ServiceProvider<InstanceDetails> cache : providers.values()) {
                CloseableUtils.closeQuietly(cache);
            }
            CloseableUtils.closeQuietly(serviceDiscovery);
            CloseableUtils.closeQuietly(client);
        }
    }

    /**
     * Reads commands from standard input until "quit"/"q" or end of input, and
     * closes every mock server created by "add" before returning.
     *
     * @param serviceDiscovery shared discovery object used for queries
     * @param providers        cache of service providers keyed by service name
     * @param client           Curator client used to create new mock servers
     * @throws Exception if reading input or executing a command fails
     */
    private static void processCommands(ServiceDiscovery<InstanceDetails> serviceDiscovery,
            Map<String, ServiceProvider<InstanceDetails>> providers, CuratorFramework client) throws Exception {
        printHelp();
        List<ExampleServer> servers = Lists.newArrayList();
        try {
            BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
            boolean done = false;
            while (!done) {
                System.out.print("> ");
                String line = in.readLine();
                if (line == null) {
                    break;
                }
                String command = line.trim();
                if (command.isEmpty()) {
                    // Blank line: nothing to execute, prompt again.
                    continue;
                }
                // Split on runs of whitespace so repeated spaces do not produce
                // empty arguments (e.g. an empty service name for "add").
                String[] parts = command.split("\\s+");
                String operation = parts[0];
                String[] args = Arrays.copyOfRange(parts, 1, parts.length);
                if (operation.equalsIgnoreCase("help") || operation.equalsIgnoreCase("?")) {
                    printHelp();
                } else if (operation.equalsIgnoreCase("q") || operation.equalsIgnoreCase("quit")) {
                    done = true;
                } else if (operation.equals("add")) {
                    addInstance(args, client, command, servers);
                } else if (operation.equals("delete")) {
                    deleteInstance(args, command, servers);
                } else if (operation.equals("random")) {
                    listRandomInstance(args, serviceDiscovery, providers, command);
                } else if (operation.equals("list")) {
                    listInstances(serviceDiscovery);
                }
            }
        } finally {
            for (ExampleServer server : servers) {
                CloseableUtils.closeQuietly(server);
            }
        }
    }

    /**
     * Prints one instance of the named service, chosen by a provider that uses
     * {@link RandomStrategy}. Providers are created on first use and cached in
     * {@code providers}.
     *
     * @param args             command arguments; exactly one (the service name) is expected
     * @param serviceDiscovery shared discovery object used to build providers
     * @param providers        cache of providers keyed by service name
     * @param command          the full command line, echoed in syntax errors
     * @throws Exception if the provider fails to start or to return an instance
     */
    private static void listRandomInstance(String[] args, ServiceDiscovery<InstanceDetails> serviceDiscovery,
            Map<String, ServiceProvider<InstanceDetails>> providers, String command) throws Exception {
        if (args.length != 1) {
            System.err.println("syntax error (expected random <name>): " + command);
            return;
        }
        String serviceName = args[0];
        ServiceProvider<InstanceDetails> provider = providers.get(serviceName);
        if (provider == null) {
            provider = serviceDiscovery.serviceProviderBuilder().serviceName(serviceName)
                    .providerStrategy(new RandomStrategy<InstanceDetails>()).build();
            providers.put(serviceName, provider);
            provider.start();
            // NOTE(review): presumably gives the new provider time to load its
            // instances before the first query - confirm.
            Thread.sleep(2500);
        }
        ServiceInstance<InstanceDetails> instance = provider.getInstance();
        if (instance == null) {
            System.err.println("No instances named: " + serviceName);
        } else {
            outputInstance(instance);
        }
    }

    /**
     * Prints every registered service name followed by its instances.
     *
     * <p>The shared discovery object is deliberately left open: main() closes
     * it on exit, and closing it here would tear it down after the first
     * "list" command while the command loop keeps using it.
     *
     * @param serviceDiscovery shared discovery object to query
     * @throws Exception if a query fails
     */
    private static void listInstances(ServiceDiscovery<InstanceDetails> serviceDiscovery) throws Exception {
        Collection<String> serviceNames = serviceDiscovery.queryForNames();
        System.out.println(serviceNames.size() + " type(s)");
        for (String serviceName : serviceNames) {
            Collection<ServiceInstance<InstanceDetails>> instances = serviceDiscovery
                    .queryForInstances(serviceName);
            System.out.println(serviceName);
            for (ServiceInstance<InstanceDetails> instance : instances) {
                outputInstance(instance);
            }
        }
    }

    /** Prints one instance as a tab-indented "description: uri" line. */
    private static void outputInstance(ServiceInstance<InstanceDetails> instance) {
        System.out.println("\t" + instance.getPayload().getDescription() + ": " + instance.buildUriSpec());
    }

    /**
     * Closes and forgets the first mock server whose service name equals the
     * given name, simulating an instance going down.
     *
     * @param args    command arguments; exactly one (the service name) is expected
     * @param command the full command line, echoed in syntax errors
     * @param servers mock servers created so far; the match is removed from it
     */
    private static void deleteInstance(String[] args, String command, List<ExampleServer> servers) {
        if (args.length != 1) {
            System.err.println("syntax error (expected delete <name>): " + command);
            return;
        }
        final String serviceName = args[0];
        ExampleServer server = Iterables.find(servers, new Predicate<ExampleServer>() {
            @Override
            public boolean apply(ExampleServer server) {
                // Exact match: a suffix match would let "bar" delete "foobar".
                return server.getThisInstance().getName().equals(serviceName);
            }
        }, null);
        if (server == null) {
            System.err.println("No servers found named: " + serviceName);
            return;
        }
        servers.remove(server);
        CloseableUtils.closeQuietly(server);
        System.out.println("Removed a random instance of: " + serviceName);
    }

    /**
     * Creates, records and starts a mock server for the named service. All
     * arguments after the name are joined with single spaces to form the
     * description.
     *
     * @param args    command arguments: service name followed by at least one description word
     * @param client  Curator client handed to the new server
     * @param command the full command line, echoed in syntax errors
     * @param servers mock servers created so far; the new one is appended
     * @throws Exception if the server cannot be created or started
     */
    private static void addInstance(String[] args, CuratorFramework client, String command, List<ExampleServer> servers)
            throws Exception {
        if (args.length < 2) {
            System.err.println("syntax error (expected add <name> <description>): " + command);
            return;
        }
        StringBuilder description = new StringBuilder();
        for (int i = 1; i < args.length; ++i) {
            if (i > 1) {
                description.append(' ');
            }
            description.append(args[i]);
        }
        String serviceName = args[0];
        ExampleServer server = new ExampleServer(client, PATH, serviceName, description.toString());
        // Record the server before starting it so it is closed on exit even if
        // start() fails.
        servers.add(server);
        server.start();
        System.out.println(serviceName + " added");
    }

    /** Prints the list of supported commands. */
    private static void printHelp() {
        System.out.println(
                "An example of using the ServiceDiscovery APIs. This example is driven by entering commands at the prompt:\n");
        System.out.println("add <name> <description>: Adds a mock service with the given name and description");
        System.out.println("delete <name>: Deletes one of the mock services with the given name");
        System.out.println("list: Lists all the currently registered services");
        System.out.println("random <name>: Lists a random instance of the service with the given name");
        System.out.println("quit: Quit the example");
        System.out.println();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
依赖
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-x-discovery</artifactId>
<version>2.12.0</version>
</dependency>
1
2
3
4
5
6
7
8
9
10
2
3
4
5
6
7
8
9
10
# 分布式锁
# 消息发布、订阅
很少用,通常都使用专门处理消息的中间件,例如:rabbitmq、kafka、rocketmq等。
Last Updated: 2024/04/23, 01:30:37