zookeeper小入门(二)

577 查看

昨晚微信开放了JS接口。忙活了一晚上。
然后周六就没啥斗志了,突然想起第二篇说好要介绍demo的没介绍,就赶紧来写了。

先来个 传送门

这里简单介绍了下官方demo。用来演示Zookeeper官方推荐的程序框架(Executor、DataMonitor)。

这个Demo实现了如下功能:

  1. 监视一个结点,获取该结点的数据,并启动一个自定义程序
  2. 如果这个结点发生变化了(数据更改,新增或者删除),则停止这个自定义程序并重新执行。
  3. 如果这个结点被删除了,则停止运行自定义程序

对着Demo看下代码。首先,官方有这么个章节

Program Design

Conventionally, ZooKeeper applications are broken into two units, one which maintains the connection, and the other which monitors data. In this application, the class called the Executor maintains the ZooKeeper connection, and the class called the DataMonitor monitors the data in the ZooKeeper tree. Also, Executor contains the main thread and contains the execution logic. It is responsible for what little user interaction there is, as well as interaction with the exectuable program you pass in as an argument and which the sample (per the requirements) shuts down and restarts, according to the state of the znode.

通俗翻译下,官方认为,一个zookeeper应用简单地分为两个单元即可,一个用来管理连接,另一个用来监视数据。在我们的Demo中,Executor被用来管理和Zookeeper的连接,DataMonitor用来监视Zookeeper中的数据树。另外,Executor同时也包含了主线程的执行(也就是Main函数),然后也包含一点点的执行逻辑。Demo对用户交互的部分非常少,输入的几个参数(好吧,我自己去实现的时候,偷懒去写死了)就可以开始跑了,Demo还会对znode的变化或者新的znode的连入进行一些响应。

看完这段,对原文最下面的完整样例算了解一些,那这里既然就两个类,ExecutorDataMonitor嘛,对吧。就分别看下。

javapublic Executor(String hostPort, String znode, String filename,
                    String exec[]) throws KeeperException, IOException {
    this.filename = filename;
    this.exec = exec;
    zk = new ZooKeeper(hostPort, 3000, this);
    dm = new DataMonitor(zk, znode, null, this);
}

这个构造函数么实际上做了这么几个事:

  1. 这里的filename是用来记录znode变化后的data的,随便填写,当然你也可以删了。
  2. exec就是上面提到的 znode变化后执行的程序。(我偷懒,指定了一个ls)
  3. zk就是用来初始化ZooKeeper连接的啦,第一个参数是主机和端口号,第二个是Session超时时间,第三个是DefaultWatcher。DefaultWacheter就是你在不指定wacher对象但是想watch的时候默认的watcher,Wathcer是一个接口。
  4. dm就是初始化DataMoniter用来监视znode啦。

Watcher

javapublic void process(WatchedEvent event);

这个是Watcher接口里的唯一函数,用处就是在你关心的事件被触发后,会调用这个接口,具体参数内容是什么就看文档吧。

DataMonitor

javapublic DataMonitor(ZooKeeper zk, String znode, Watcher chainedWatcher,
                       DataMonitorListener listener) {
    this.zk = zk;
    this.znode = znode;
    this.chainedWatcher = chainedWatcher;
    this.listener = listener;
    // Get things started by checking if the node exists. We are going
    // to be completely event driven
    zk.exists(znode, true, this, null);
}

这段代码是DataMonitor的构造函数,除了里面的一些赋值函数,重点说下zk.exists这个函数
它有4个参数,第一个参数是你要query的znode名字,类似/zk_test这样的。第二个参数是问你要不要watch,就是这个结点变化后要不要得到通知,第三个是exists结果的回调,是StatCallback接口,exists目的是想查找这个znode是否存在,结果从第三个接口返回,

其实这个接口的设计让我挺头疼的,单一职责原则不见了,这个接口干了两件事情,第一是告诉你这个znode存在不存在,第二件事是问你要不要监控这个znode的变化。OK这里你不提供Watcher的话,就用上面new Zookeeper里面提供的默认Watcher(这里就是我们的Executor对象啦)。
好了让我们看下工作流吧。

zk.exists第一次调用的时候,返回的结果传给了DataMonitor(实现了StatCallback接口)的processResult函数

java public void processResult(int rc, String path, Object ctx, Stat stat) {
        boolean exists;
        switch (rc) {
            case Code.Ok:
                exists = true;
                break;
            case Code.NoNode:
                exists = false;
                break;
            case Code.SessionExpired:
            case Code.NoAuth:
                dead = true;
                listener.closing(rc);
                return;
            default:
                // Retry errors
                zk.exists(znode, true, this, null);
                return;
        }

        byte b[] = null;
        if (exists) {
            try {
                b = zk.getData(znode, false, null);
            } catch (KeeperException e) {
                // We don't need to worry about recovering now. The watch
                // callbacks will kick off any exception handling
                e.printStackTrace();
            } catch (InterruptedException e) {
                return;
            }
        }
        if ((b == null && b != prevData)
                || (b != null && !Arrays.equals(prevData, b))) {
            listener.exists(b);
            prevData = b;
        }
    }

rc是这次exists查询的返回码,具体含义看变量名也大概知道。 最后一个if里面的listenerDataMonitor里面的DataMonitorListener接口,我们的Executor实现了它(其实就是把结果从DataMonitor再传出去)。这里实现的效果就是如果结点存在的话,就把结点的数据传出去,不然就传个null,当然如果数据没有变化(用prevData记录了上一次的数据)那么就什么都不做。

于是第一次你的exec就执行了。

这时候,我用zkCli去删除了这个监视的结点(我设置的是/zk_test)或者重新设置了data,那么我的Watcher都会收到通知(这里我就很奇怪,为什么不是我关心的事情,比如exists关心的应该是createdelete
源码里是这么说的

java/**
     * Return the stat of the node of the given path. Return null if no such a
     * node exists.
     * <p>
     * If the watch is non-null and the call is successful (no exception is thrown),
     * a watch will be left on the node with the given path. The watch will be
     * triggered by a successful operation that creates/delete the node or sets
     * the data on the node.
     *
     * @param path the node path
     * @param watcher explicit watcher
     * @return the stat of the node of the given path; return null if no such a
     *         node exists.
     * @throws KeeperException If the server signals an error
     * @throws InterruptedException If the server transaction is interrupted.
     * @throws IllegalArgumentException if an invalid path is specified
     */
    public Stat exists(final String path, Watcher watcher)
        throws KeeperException, InterruptedException;

OK,我刚刚设置的Watcher是Executor,实际上Executor是把这个事情传给了DataMonitor,看process函数

javapublic void process(WatchedEvent event) {
    dm.process(event);
}

DataMonitor中的process

javapublic void process(WatchedEvent event) {
    String path = event.getPath();
    if (event.getType() == Event.EventType.None) {
        // We are are being told that the state of the
        // connection has changed
        switch (event.getState()) {
            case SyncConnected:
                logger.debug("SyncConnected");
                // In this particular example we don't need to do anything
                // here - watches are automatically re-registered with
                // server and any watches triggered while the client was
                // disconnected will be delivered (in order of course)
                break;
            case Expired:
                logger.debug("expired");
                // It's all over
                dead = true;
                listener.closing(Code.SESSIONEXPIRED.intValue());
                break;
        }
    } else {
        if (path != null && path.equals(znode)) {
            // Something has changed on the node, let's find out
            zk.exists(znode, true, this, null);
        }
    }
    if (chainedWatcher != null) {
        chainedWatcher.process(event);
    }
}

关键的是第一个if/else, else中处理的事件是我们最关心的和znode相关的,我们发现znode发生了变化(create,delete,dataChanged,childChanged)就再次调用exists去进行我们结点关心的操作。

好了,说了那么多的代码其实都是在说框架里的东西。这个demo里最让人不关心的是业务逻辑部分么= =
请看Executor里面的exists方法

java public void exists(byte[] data) {
    if (data == null) {
        if (child != null) {
            System.out.println("Killing process");
            child.destroy();
            try {
                child.waitFor();
            } catch (InterruptedException e) {
            }
        }
        child = null;
    } else {
        if (child != null) {
            System.out.println("Stopping child");
            child.destroy();
            try {
                child.waitFor();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        try {
            FileOutputStream fos = new FileOutputStream(filename);
            fos.write(data);
            fos.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        try {
            System.out.println("Starting child");
            child = Runtime.getRuntime().exec(exec);
            new StreamWriter(child.getInputStream(), System.out);
            new StreamWriter(child.getErrorStream(), System.err);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

就是根据我们的要的data来执行我们要的代码之类的。具体大家可以调试一下。

两篇文章应该够入个门了= =,接下去我会做一些跟自己工作上业务结合的东西。

真正写代码的时候,样例、文档、源码三者联合起来看,你才能看的出作者的设计目的,但是用多了才能知道作者的设计初衷。

啊,如果有人可以看完这篇blog,在下感谢您的大力支持。
有刺尽管挑= = 大家和谐讨论。应该还有三的。