问题 在Cassandra中,异步写入似乎被打破了


当将9百万行批量写入12节点cassandra(2.1.2)集群时,我遇到了spark-cassandra-connector(1.0.4,1.1.0)的问题。我写的是一致性ALL并且读取一致性ONE,但读取的行数每次都不同于900万(8.865.753,8.753.213等)。

我检查了连接器的代码,发现没有问题。然后,我决定编写自己的应用程序,独立于spark和连接器,来调查问题(唯一的依赖是datastax-driver-code version 2.1.3)。

完整代码,启动脚本和配置文件现在可以 在github上找到

在伪代码中,我写了两个不同版本的应用程序,同步一个:

try (Session session = cluster.connect()) {

    String cql = "insert into <<a table with 9 normal fields and 2 collections>>";
    PreparedStatement pstm = session.prepare(cql);

    for(String partitionKey : keySource) {
        // keySource is an Iterable<String> of partition keys

        BoundStatement bound = pstm.bind(partitionKey /*, << plus the other parameters >> */);
        bound.setConsistencyLevel(ConsistencyLevel.ALL);

        session.execute(bound);
    }

}

而异步的一个:

try (Session session = cluster.connect()) {

    List<ResultSetFuture> futures = new LinkedList<ResultSetFuture>();

    String cql = "insert into <<a table with 9 normal fields and 2 collections>>";
    PreparedStatement pstm = session.prepare(cql);

    for(String partitionKey : keySource) {
        // keySource is an Iterable<String> of partition keys

        while(futures.size()>=10 /* Max 10 concurrent writes */) {
            // Wait for the first issued write to terminate
            ResultSetFuture future = futures.get(0);
            future.get();
            futures.remove(0);
        }

        BoundStatement bound = pstm.bind(partitionKey /*, << plus the other parameters >> */);
        bound.setConsistencyLevel(ConsistencyLevel.ALL);

        futures.add(session.executeAsync(bound));
    }

    while(futures.size()>0) {
        // Wait for the other write requests to terminate
        ResultSetFuture future = futures.get(0);
        future.get();
        futures.remove(0);
    }
}

最后一个类似于无批处理配置情况下连接器使用的那个。

除了负载很高外,应用程序的两个版本在所有情况下的工作方式都相同。

例如,当在9台机器(45个线程)上运行带有5个线程的同步版本时,向集群写入9百万行,我会在后续读取中找到所有行(使用spark-cassandra-connector)。

如果我运行异步版本,每台机器有1个线程(9个线程),执行速度要快得多,但我无法在后续读取中找到所有行(与spark-cassandra-connector相同的问题)。

执行期间代码没有抛出任何异常。

问题的原因是什么?

我添加了一些其他结果(感谢您的评论):

  • 9台机器上有9个线程的异步版本,每个线程有5个并发写入器(45个并发写入器):没有问题
  • 在9台计算机上同步90个线程的版本(每个JVM实例10个线程):没有问题

问题似乎开始出现在Async写入和大量并发写入器> 45和<= 90,所以我做了其他测试以确保发现是正确的:

  • 用Result替换了ResultSetFuture的“get”方法 “getUninterruptibly”:同样的问题。
  • 异步版本,9台机器上有18个线程,5个并发 每个线程的编写器(90个并发写入器): 没有问题

最后的发现表明,大量并发写入器(90)不像第一次测试中预期的那样是个问题。问题是使用相同会话的大量异步写入。

在同一会话中有5个并发异步写入,问题不存在。如果我将并发写入的数量增加到10,则某些操作会在没有通知的情况下丢失。

如果在同一会话中同时发出多个(> 5)写入,似乎在Cassandra 2.1.2(或Cassandra Java驱动程序)中打破了异步写入。


8233
2017-12-27 12:28


起源

您是否考虑过使用BATCH语句而不是单独发送每个更新?我知道这并没有解决您遇到的问题,但它似乎更适合进行批量插入。 - Onots
是的,批处理语句也存在问题。我没有使用批次,因为它们受到spark cassandra连接器中另一个问题的影响,该问题已在最新版本的连接器中修复。我使用了修复程序的自编译版本的连接器,发现了同样的问题。 - Nicola Ferraro
我已经添加了所有代码和配置文件 github上 - Nicola Ferraro


答案:


尼古拉和我本周末通过电子邮件进行了交流,并认为我会在这里提供我当前理论的最新信息。我看了看 github项目 Nicola在EC2上分享并试验了一个8节点集群。

我能够重现2.1.2的问题,但确实观察到一段时间后我可以重新执行spark工作并且返回了所有900万行。

我似乎注意到的是,当节点处于压缩状态时,我没有获得所有900万行。我一时兴起看了看 更改2.1的日志 并观察到一个问题 CASSANDRA-8429 - “在压实过程中某些键无法读取” 这可以解释这个问题。

看到问题已经解决,目标是2.1.3,我重新对cassandra-2.1分支进行测试,并在压缩活动发生时运行计数工作并返回900万行。

我想对此进行一些实验,因为我对cassandra-2.1分支的测试相当有限,压缩活动可能纯属巧合,但我希望这可以解释这些问题。


6
2018-01-12 04:10



没有使用2.1.3进行测试,但我可以确认只有在自动压缩过程中才会出现问题,只有高级压缩策略出现问题。通过尺寸分层压实或平稳压实,Cassandra效果很好。 - Nicola Ferraro


一些可能性:

  • 您的异步示例是使用9个线程一次发出10次写入,因此每次执行90次,而同步示例一次只执行45次写入,因此我会尝试将异步降低到相同的速率,因此这是一个苹果对苹果的比较。

    您没有说明如何使用异步方法检查异常。我看到你正在使用 future.get(),但建议使用 getUninterruptibly() 如文件中所述:

    等待查询返回并返回其结果。这个方法是   通常比Future.get()更方便,因为它:等待   结果不可中断,因此不会抛出InterruptedException。   返回有意义的异常,而不是必须处理   为ExecutionException。因此,它是获得未来的首选方式   结果。

    因此,您可能没有看到异步示例中出现的写异常。

  • 另一种不太可能的方法是,您的keySource由于某种原因返回重复的分区键,因此当您执行写操作时,其中一些最终会覆盖先前插入的行,并且不会增加行计数。但这也应该影响同步版本,所以这就是为什么我说这不太可能。

    我会尝试编写小于900万的小集,并且速度很慢,看看问题是否只是在一定数量的插入或特定插入速率下才会发生。如果插入的数量有影响,那么我怀疑数据中的行键有问题。如果插入速率有影响,那么我怀疑热点会导致写入超时错误。

  • 要检查的另一件事是Cassandra日志文件,以查看是否有任何异常报告。

附录:2014年12月30日

我尝试使用Cassandra 2.1.2和驱动程序2.1.3的示例代码重现症状。我使用了一个带有递增数字键的单个表,以便我可以看到数据中的间隙。我做了很多异步插入(每个线程在10个线程中一次30个,全部使用一个全局会话)。然后我做了表的“select count(*)”,实际上它报告的表中的行数少于预期。然后我做了一个“select *”并将行转储到一个文件中并检查是否缺少密钥。它们似乎是随机分布的,但是当我查询那些丢失的单个行时,事实证明它们实际上存在于表中。然后我注意到每次做“选择计数(*)”时,它都会返回一个不同的数字,所以它似乎给出了表格中行数的近似值而不是实际数字。

因此,我修改了测试程序以在所有写入之后执行回读阶段,因为我知道所有键值。当我这样做时,所有异步写入都出现在表中。

所以我的问题是,你在完成写作后如何检查表中的行数?您是在查询每个单独的键值还是使用某种操作,例如“select *”?如果是后者,那似乎可以提供大部分行,但不是全部,所以也许您的数据实际存在。由于没有抛出异常,似乎表明写入都是成功的。另一个问题是,你确定你的关键值对于所有900万行是唯一的。


6
2017-12-27 16:02



我没有使用count(*)因为它从一开始就向我显示了错误的结果。我使用了两种方法来计算行:1)Spark-cassandra-connector,它在令牌环空间中执行多个查询并总结结果; 2)带有hadoop mapreduce API的Spark。我注意到这两种方法的行为相同。 - Nicola Ferraro
我也确定行id不同。我多次检查它们,当我在启动脚本中将参数“Async”更改为“Sync”时,它可以工作,因此行ID是正常的。我也经历过你正在描述的关于阅读一行的行为。找到单行的事实可能是由于:1)读取修复,如果它们在群集中启用2)每次读取行时,都可以从不同的节点读取相关的计数(*)。因为你写的是一致性ALL,所以不应该这样。 - Nicola Ferraro
您可能希望尝试为测试设置复制因子1,并查看是否可以找到异步写入后实际丢失的行。通过单个键读回行是确定行是否缺失的确定性测试,因为这些其他方法似乎在计算中而不是缺少行。如果您可以发布更多代码,我可以尝试重现该症状,但到目前为止,当我使用一个会话执行大量异步写入时,它们都存在于表中。 - Jim Meyer