我正在尝试创建一个持久的ZNode并存储我已处理的特定文件的行数。创建的工作原理与应该的一样,从节点读取数据也是如此,但是如果在同一代码中,则删除将不起作用。我会解释我的意思。
我已创建函数:
setOrCreateFileCheckpoint(fileName:String,lineNumber:Int)
:-检查ZNode是否存在,如果不存在则创建它,并将存储的值设置为lineNumbergetFileCheckpoint(fileName:String):-返回存储在ZNode中的值deleteFileCheckpoint(fileName:String):-删除ZNode下面是这三个代码:
/* updates or creates a checkpoint for a file being processed */ def setOrCreateFileCheckpoint(fileName: String, lineNumber: Int): Unit = { val fileCheckpointPath = checkpointPoolPath + "/" +fileName val zk = getZookeeper val zkCuratorClient = getZookeeperCuratorClient if ( zk.exists(fileCheckpointPath, false) == null) { val node = new PersistentNode(zkCuratorClient, CreateMode.PERSISTENT, false, fileCheckpointPath, lineNumber.toString.getBytes()) node.start() } else zk.setData(fileCheckpointPath, lineNumber.toString.getBytes(), -1) } /* gets checkpoint for a file */ def getFileCheckpoint(fileName: String): Int = { val fileCheckpointPath = checkpointPoolPath + "/" +fileName val zk = getZookeeper val zkCuratorClient = getZookeeperCuratorClient if ( zk.exists(fileCheckpointPath, false) != null) new String(zk.getData(fileCheckpointPath, false, null)).toInt else 0 } /* deletes the file checkpoint so that we don't keep accumulating zNodes on the zookeeper */ def deleteFileCheckpoint(fileName: String): Unit = { val fileCheckpointPath = checkpointPoolPath + "/" +fileName val zk = getZookeeper if ( zk.exists(fileCheckpointPath, false) == null) { throw RuntimeException("Trying to delete checkpoint that doesn't exist for file: " + fileName) } else { /*println(zk.exists(fileCheckpointPath, false).getVersion) zk.delete(fileCheckpointPath, zk.exists(fileCheckpointPath, false).getVersion)*/ deleteChildren(zk, fileCheckpointPath, true) } }
以下是我正在测试的代码,并为之困惑:
ZookeeperUtility.setOrCreateFileCheckpoint("file1", 2000) //let's call it cre1 println(ZookeeperUtility.getFileCheckpoint("file1")) //let's call it get1 ZookeeperUtility.deleteFileCheckpoint("file1") //let's call it del1 println("del1") ZookeeperUtility.deleteFileCheckpoint("file1") //let's call in del2 println("del2")
Run 1:
Step1:我运行上面显示的代码
结果:在del2上遇到错误
Step2:注释掉cre1并再次运行代码
结果:提取节点,给出正确的值,作为在del2上遇到的结果错误。这真是令人难以置信。我不明白为什么。该节点应该被删除。
Step3:仍注释cre1,与上一步相同,再次运行代码
结果:节点不存在,在get1处给出0,这意味着节点不存在。在del1处遇到错误。在step2本身中应该发生什么?
Run2:
Step1:注释掉del2,运行代码
结果:创建节点,获取正确的数据,正常退出
Step2:注释掉cre1,运行代码
结果:从应该删除的节点获取值2000。正常退出
Step3:再次运行与step2相同的代码
[结果:获取0,在del1上遇到错误。
如果我一次只运行一次代码,如果我只一次创建代码,仅在下一次运行中获取代码,然后在运行后删除,那么一切都会按照应有的方式工作。我正要拔头发。
P.S。该代码是用Scala编写的,但是我正在使用Java API。 Scala似乎可以与Java类一起使用。
如果您查看了deleteFileCheckpoint
函数,我已经注释掉了一部分,我也尝试过这种方法。它具有完全相同的行为。我正在尝试创建一个持久的ZNode并存储我已处理的特定文件的行数。创建的工作与应该的一样,从节点读取数据也是如此,但是...
这真是令人难以置信。我不明白为什么。该节点应该被删除。