使用多播技术动态更新系统配置

multicast(多播技术)

多播是指把信息同时传递给一组目的地址。它使用的策略是最高效的,因为消息在每条网络链路上只需传递一次,且只有在链路分叉的时候,消息才会被复制。

解决什么问题

有这样一个模块A,管理系统的很多配置,其它模块很多地方都是需要从A中读取配置。
A中的初始配置数据是从数据库中读取出来的,然后保存在一个缓存map当中。
其它模块读取配置时,就直接由map中取出数据。
方便快捷。
但这又引出一个问题,当数据库中的数据发生了改变,就与A模块中的map中的数据不太一致。
当然初步的想法就是在数据库改变时,更新下map中对应的key的值。
想法是正确的,但这又会存在另一个问题,如果模块A在多个进程中都被使用,或者说多个web站点使用(就是说A属于基础类库)。
当一个进程X,发生改变系统配置的行为,如果更新了数据库的相关字段,X可以使自己调用相关函数使X的模块A中的map数据与数据库一致。
因为进程都是相关独立的,容器中的站点也是相关独立的,X进程无法直接调用Y进程的函数,使Y函数的A模块数据改变。
这就形成了X,Y进程中同一个A模块数据不一致,X进程是与数据库中的一致,Y进程与数据库中不一致。
当进程更多,改变的数据更多,配置数据就不是实时的了。

这个问题最根本的原因就是进程之间的通讯,如果X进程,改变了数据。通过进程间的通讯,告诉Y进程某配置发生了改变,这样Y进程从数据库中再去读一次,配置就是一致的了。

用多播怎么解决的

在上面问题的背景下,由于X进程也不知道有多少进程在使用A模块,也并没有使用A模块进程的相关信息。
多播就在这里发挥了作用。
在A模块中增加一个监听器,监听一多播地址。
多播地址有这样一个特性:
一般来说,一个端口只会让一个进程绑定,第二个进程绑定时,会出现已经被用了的异常。
但多播地址的端口可以让多个进程绑定。

当A模块的某数据发生改变时,就跟多播地址发条消息。
这样所有的A模块的监听器都会收到这条消息,再对这消息进行处理就行。
不管A模块在哪些进程中,这些进程都能收到消息,完成了进程间的通知。

示例代码

ConfigureMonitor配置监听器,收广播消息,使缓存无效。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35

public class ConfigureMonitor extends Thread {

private boolean isclose = false;
private InetSocketAddress groupaddress = null;
private ConfigureProvider provider = null;
public ConfigureMonitor(ConfigureProvider p) {
provider = p;
groupaddress = p.groupAddress;
}

@Override
public void run() {
try {
MulticastSocket socket = new MulticastSocket(groupaddress.getPort());
socket.joinGroup(groupaddress.getAddress());
DatagramPacket packet = new DatagramPacket(new byte[1024], 1024);
while (!isclose) {
socket.receive(packet);
byte[] buf = packet.getData();
String key = new String(buf, packet.getOffset(), packet.getLength());
// 收到消息,使缓存无效
provider.CACHE.remove(key);
System.out.println(Thread.currentThread().getName() + "配置参数 "+key+" 已失效");
}
socket.leaveGroup(InetAddress.getByName("225.1.2.3"));
socket.close();
} catch (Exception e) {
e.printStackTrace();
}
}

public void doClose() {
isclose = true;
}

ConfigureProvider 配置提供类,简化版的A模块。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
public class ConfigureProvider {

final Map<String, String> CACHE = new WeakHashMap<String, String>();
InetSocketAddress groupAddress;
ConfigureMonitor monitor = null;

public ConfigureProvider() {
try {
// 初始化配置
groupAddress = new InetSocketAddress(InetAddress.getByName("225.1.2.4"), 17788);
monitor = new ConfigureMonitor(this);
monitor.start();//开启监听线程
} catch (UnknownHostException e) {
e.printStackTrace();
}
}

public void invalidProperty(String key) {
try {
byte[] buf = key.getBytes();
DatagramPacket packet = new DatagramPacket(buf, buf.length, groupAddress);
MulticastSocket socket;
// 向多播地址发消息
socket = new MulticastSocket(groupAddress.getPort());
socket.joinGroup(groupAddress.getAddress());
socket.send(packet);
socket.leaveGroup(groupAddress.getAddress());
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}

public String getProperty(String key) {
// 缓存中有,就返回缓存中
if(CACHE.containsKey(key))
{
return CACHE.get(key);
}
// 没有就从db中取
String value = getFromDB(key);
// 取完更新到缓存中
CACHE.put(key, value);
return value;
}

public void setProperty(String key, String value) {
// 更新到数据库中
updateToDB(key, value);
// 使缓存中的key无效,发出多播
invalidProperty(key);
}
void preload()
{
// 在这里把相关数据从数据库中读到缓存中, 过程略
}
public void close()
{
monitor.doClose();
monitor.interrupt();
monitor = null;
}
private String getFromDB(String key)
{
// 过程略
return "xxx";
}
private void updateToDB(String key,String value)
{
// 过程略
}
}

代码解析

  • 每次setProperty时,就有先更新数据库,再发个的广播,进行通知。
  • 在消息接收器中,使缓存无效。
  • 下次再getProperty时,发现缓存中没有,就会从数据库中去取。

这样就算实现了动态更新。

动态更新配置的其它方法

  • watch dog(观察者模式)文件发生改变进行通知
  • 定时扫描

参考