ZooKeeper與Curator注冊和監(jiān)控方法

這篇文章主要介紹“ZooKeeper與Curator注冊和監(jiān)控方法”,在日常操作中,相信很多人在ZooKeeper與Curator注冊和監(jiān)控方法問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”ZooKeeper與Curator注冊和監(jiān)控方法”的疑惑有所幫助!接下來,請跟著小編一起來學(xué)習(xí)吧!

這篇文章主要介紹“ZooKeeper與Curator注冊和監(jiān)控方法”,在日常操作中,相信很多人在ZooKeeper與Curator注冊和監(jiān)控方法問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”ZooKeeper與Curator注冊和監(jiān)控方法”的疑惑有所幫助!接下來,請跟著小編一起來學(xué)習(xí)吧!

創(chuàng)新互聯(lián)公司堅持“要么做到,要么別承諾”的工作理念,服務(wù)領(lǐng)域包括:成都網(wǎng)站設(shè)計、成都網(wǎng)站建設(shè)、企業(yè)官網(wǎng)、英文網(wǎng)站、手機(jī)端網(wǎng)站、網(wǎng)站推廣等服務(wù),滿足客戶于互聯(lián)網(wǎng)時代的貞豐網(wǎng)站設(shè)計、移動媒體設(shè)計的需求,幫助企業(yè)找到有效的互聯(lián)網(wǎng)解決方案。努力成為您成熟可靠的網(wǎng)絡(luò)建設(shè)合作伙伴!

Curator提供了對zookeeper客戶端的封裝,并監(jiān)控連接狀態(tài)和會話session,特別是會話session過期后,curator能夠重新連接zookeeper,并且創(chuàng)建一個新的session。

對于zk的使用者來說,session的概念至關(guān)重要,如果想了解更多session的說明,請訪問:

http://zookeeper.apache.org/doc/trunk/zookeeperProgrammers.html

zk客戶端和zk間主要可能存在下面幾種異常情況:

短暫失去連接:此時客戶端檢測到與服務(wù)端的連接已經(jīng)斷開,但是服務(wù)端維護(hù)的客戶端session尚未過期,之后客戶端和服務(wù)端重新建立了連接;當(dāng)客戶端重新連接后,由于session沒有過期,zookeeper能夠保證連接恢復(fù)后保持正常服務(wù)。

失去連接時間很長:此時服務(wù)器相對于客戶端的session已經(jīng)過期了,與先前session相關(guān)的watcher和ephemeral的路徑和數(shù)據(jù)都會消失;當(dāng)Curator重新創(chuàng)建了與zk的連接后,會獲取到session expired異常,Curator會銷毀先前的session,并且會創(chuàng)建一個新的session,需要注意的是,與之前session相關(guān)的watcher和ephemeral類型的路徑和數(shù)據(jù)在新的session中也不會存在,需要開發(fā)者在CuratorFramework.getConnectionStateListenable().addListener()中添加狀態(tài)監(jiān)聽事件,對ConnectionState.LOST事件進(jìn)行監(jiān)聽,當(dāng)session過期后,使得之前的session狀態(tài)得以恢復(fù)。對于ephemeral類型,在客戶端應(yīng)該保持?jǐn)?shù)據(jù)的狀態(tài),以便及時恢復(fù)。

客戶端重新啟動:不論先前的zk session是否已經(jīng)過期,都需要重新創(chuàng)建臨時節(jié)點(diǎn)、添加數(shù)據(jù)和watch事件,先前的session也會在稍后的一段時間內(nèi)過期。

Zk服務(wù)器重新啟動:由于zk將session信息存放到了硬盤上,因此重啟后,先前未過期的session仍然存在,在zk服務(wù)器啟動后,客戶端與zk服務(wù)器創(chuàng)建新的連接,并使用先前的session,與1相同。

需要注意的是,當(dāng)session過期了,在session過期期間另外的客戶端修改了zk的值,那么這個修改在客戶端重新連接到zk上時,zk客戶端不會接收到這個修改的watch事件(盡管添加了watch),如果需要嚴(yán)格的watch邏輯,就需要在curator的狀態(tài)監(jiān)控中添加邏輯。

特別提示:watcher僅僅是一次性的,zookeeper通知了watcher事件后,就會將這個watcher從session中刪除,因此,如果想繼續(xù)監(jiān)控,就要添加新的watcher。

下面提供了對persistent和ephemeral兩種類型節(jié)點(diǎn)的監(jiān)控方法,其中g(shù)et方法說明了persistent節(jié)點(diǎn)如何監(jiān)控,而register方法說明了ephemeral類型的節(jié)點(diǎn)如何監(jiān)控。

package demo;import java.net.InetAddress;import java.nio.charset.Charset;import java.util.concurrent.ConcurrentSkipListSet;import org.apache.curator.framework.CuratorFramework;import org.apache.curator.framework.CuratorFrameworkFactory;import org.apache.curator.framework.api.CuratorWatcher;import org.apache.curator.framework.state.ConnectionState;import org.apache.curator.framework.state.ConnectionStateListener;import org.apache.curator.retry.RetryNTimes;import org.apache.zookeeper.CreateMode;import org.apache.zookeeper.WatchedEvent;import org.apache.zookeeper.Watcher.Event.EventType;import org.apache.zookeeper.ZooDefs;import org.apache.zookeeper.data.Stat;public class CuratorTest {    private CuratorFramework zkTools;    private ConcurrentSkipListSet<String> watchers = new ConcurrentSkipListSet<String>();    private static Charset charset = Charset.forName("utf-8");    public CuratorTest() {        zkTools = CuratorFrameworkFactory.builder()                .connectString("192.168.0.216:3306")                .namespace("zk/test")                .retryPolicy(new RetryNTimes(2000, 20000))                .build();        zkTools.start();    }    public void addReconnectionWatcher(final String path, final ZookeeperWatcherType watcherType,            final CuratorWatcher watcher) {        synchronized (this) {            if (!watchers.contains(watcher.toString()))// 不要添加重復(fù)的監(jiān)聽事件            {                watchers.add(watcher.toString());                System.out.println("add new watcher " + watcher);                zkTools.getConnectionStateListenable().addListener(new ConnectionStateListener() {                    @Override                    public void stateChanged(CuratorFramework client, ConnectionState newState) {                        System.out.println(newState);                        if (newState == ConnectionState.LOST) {// 處理session過期                            try {                                if (watcherType == ZookeeperWatcherType.EXITS) {                                    zkTools.checkExists().usingWatcher(watcher).forPath(path);                                } else if (watcherType == ZookeeperWatcherType.GET_CHILDREN) {                                    zkTools.getChildren().usingWatcher(watcher).forPath(path);                                } else if (watcherType == ZookeeperWatcherType.GET_DATA) {                                    zkTools.getData().usingWatcher(watcher).forPath(path);                                } else if (watcherType == ZookeeperWatcherType.CREATE_ON_NO_EXITS) {                                    // ephemeral類型的節(jié)點(diǎn)session過期了,需要重新創(chuàng)建節(jié)點(diǎn),并且注冊監(jiān)聽事件,之后監(jiān)聽事件中,                                    // 會處理create事件,將路徑值恢復(fù)到先前狀態(tài)                                    Stat stat = zkTools.checkExists().usingWatcher(watcher)                                            .forPath(path);                                    if (stat == null) {                                        System.err.println("to create");                                        zkTools.create().creatingParentsIfNeeded()                                                .withMode(CreateMode.EPHEMERAL)                                                .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(path);                                    }                                }                            } catch (Exception e) {                                e.printStackTrace();                            }                        }                    }                });            }        }    }    public void create() throws Exception {        zkTools.create()// 創(chuàng)建一個路徑                .creatingParentsIfNeeded()// 如果指定的節(jié)點(diǎn)的父節(jié)點(diǎn)不存在,遞歸創(chuàng)建父節(jié)點(diǎn)                .withMode(CreateMode.PERSISTENT)// 存儲類型(臨時的還是持久的)                .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)// 訪問權(quán)限                .forPath("zk/test");// 創(chuàng)建的路徑    }    public void put() throws Exception {        // 對路徑節(jié)點(diǎn)賦值        zkTools.setData().forPath("zk/test", "hello world".getBytes(Charset.forName("utf-8")));    }    public void get() throws Exception {        String path = "zk/test";        ZKWatch watch = new ZKWatch(path);        byte[] buffer = zkTools.getData().usingWatcher(watch).forPath(path);        System.out.println(new String(buffer, charset));        // 添加session過期的監(jiān)控        addReconnectionWatcher(path, ZookeeperWatcherType.GET_DATA, watch);    }    public void register() throws Exception {        String ip = InetAddress.getLocalHost().getHostAddress();        String registeNode = "zk/register/" + ip;// 節(jié)點(diǎn)路徑        byte[] data = "disable".getBytes(charset);// 節(jié)點(diǎn)值        CuratorWatcher watcher = new ZKWatchRegister(registeNode, data); // 創(chuàng)建一個register watcher        Stat stat = zkTools.checkExists().forPath(registeNode);        if (stat != null) {            zkTools.delete().forPath(registeNode);        }        zkTools.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL)                .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(registeNode, data);// 創(chuàng)建的路徑和值        // 添加到session過期監(jiān)控事件中        addReconnectionWatcher(registeNode, ZookeeperWatcherType.CREATE_ON_NO_EXITS, watcher);        data = zkTools.getData().usingWatcher(watcher).forPath(registeNode);        System.out.println("get path form zk : " + registeNode + ":" + new String(data, charset));    }    public static void main(String[] args) throws Exception {        CuratorTest test = new CuratorTest();        test.get();        test.register();        Thread.sleep(10000000000L);    }    public class ZKWatch implements CuratorWatcher {        private final String path;        public String getPath() {            return path;        }        public ZKWatch(String path) {            this.path = path;        }        @Override        public void process(WatchedEvent event) throws Exception {            System.out.println(event.getType());            if (event.getType() == EventType.NodeDataChanged) {                byte[] data = zkTools.getData().usingWatcher(this).forPath(path);                System.out.println(path + ":" + new String(data, Charset.forName("utf-8")));            }        }    }    public class ZKWatchRegister implements CuratorWatcher {        private final String path;        private byte[] value;        public String getPath() {            return path;        }        public ZKWatchRegister(String path, byte[] value) {            this.path = path;            this.value = value;        }        @Override        public void process(WatchedEvent event) throws Exception {            System.out.println(event.getType());            if (event.getType() == EventType.NodeDataChanged) {                // 節(jié)點(diǎn)數(shù)據(jù)改變了,需要記錄下來,以便session過期后,能夠恢復(fù)到先前的數(shù)據(jù)狀態(tài)                byte[] data = zkTools.getData().usingWatcher(this).forPath(path);                value = data;                System.out.println(path + ":" + new String(data, charset));            } else if (event.getType() == EventType.NodeDeleted) {                // 節(jié)點(diǎn)被刪除了,需要創(chuàng)建新的節(jié)點(diǎn)                System.out.println(path + ":" + path + " has been deleted.");                Stat stat = zkTools.checkExists().usingWatcher(this).forPath(path);                if (stat == null) {                    zkTools.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL)                            .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(path);                }            } else if (event.getType() == EventType.NodeCreated) {                // 節(jié)點(diǎn)被創(chuàng)建時,需要添加監(jiān)聽事件(創(chuàng)建可能是由于session過期后,curator的狀態(tài)監(jiān)聽部分觸發(fā)的)                System.out.println(path + ":" + " has been created!" + "the current data is "                        + new String(value));                zkTools.setData().forPath(path, value);                zkTools.getData().usingWatcher(this).forPath(path);            }        }    }    public enum ZookeeperWatcherType {        GET_DATA, GET_CHILDREN, EXITS, CREATE_ON_NO_EXITS    }}

本文名稱:ZooKeeper與Curator注冊和監(jiān)控方法
本文鏈接:http://muchs.cn/article2/choic.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供靜態(tài)網(wǎng)站、搜索引擎優(yōu)化、網(wǎng)站營銷、商城網(wǎng)站、定制開發(fā)、響應(yīng)式網(wǎng)站

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)

搜索引擎優(yōu)化