当将9百万行批量写入12节点cassandra(2.1.2)集群时,我遇到了spark-cassandra-connector(1.0.4,1.1.0)的问题。我写的是一致性ALL并且读取一致性ONE,但读取的行数每次都不同于900万(8.865.753,8.753.213等)。
我检查了连接器的代码,发现没有问题。然后,我决定编写自己的应用程序,独立于spark和连接器,来调查问题(唯一的依赖是datastax-driver-code version 2.1.3)。
完整代码,启动脚本和配置文件现在可以 在github上找到。
在伪代码中,我写了两个不同版本的应用程序,同步一个:
try (Session session = cluster.connect()) {
String cql = "insert into <<a table with 9 normal fields and 2 collections>>";
PreparedStatement pstm = session.prepare(cql);
for(String partitionKey : keySource) {
// keySource is an Iterable<String> of partition keys
BoundStatement bound = pstm.bind(partitionKey /*, << plus the other parameters >> */);
bound.setConsistencyLevel(ConsistencyLevel.ALL);
session.execute(bound);
}
}
而异步的一个:
try (Session session = cluster.connect()) {
List<ResultSetFuture> futures = new LinkedList<ResultSetFuture>();
String cql = "insert into <<a table with 9 normal fields and 2 collections>>";
PreparedStatement pstm = session.prepare(cql);
for(String partitionKey : keySource) {
// keySource is an Iterable<String> of partition keys
while(futures.size()>=10 /* Max 10 concurrent writes */) {
// Wait for the first issued write to terminate
ResultSetFuture future = futures.get(0);
future.get();
futures.remove(0);
}
BoundStatement bound = pstm.bind(partitionKey /*, << plus the other parameters >> */);
bound.setConsistencyLevel(ConsistencyLevel.ALL);
futures.add(session.executeAsync(bound));
}
while(futures.size()>0) {
// Wait for the other write requests to terminate
ResultSetFuture future = futures.get(0);
future.get();
futures.remove(0);
}
}
最后一个类似于无批处理配置情况下连接器使用的那个。
除了负载很高外,应用程序的两个版本在所有情况下的工作方式都相同。
例如,当在9台机器(45个线程)上运行带有5个线程的同步版本时,向集群写入9百万行,我会在后续读取中找到所有行(使用spark-cassandra-connector)。
如果我运行异步版本,每台机器有1个线程(9个线程),执行速度要快得多,但我无法在后续读取中找到所有行(与spark-cassandra-connector相同的问题)。
执行期间代码没有抛出任何异常。
问题的原因是什么?
我添加了一些其他结果(感谢您的评论):
- 9台机器上有9个线程的异步版本,每个线程有5个并发写入器(45个并发写入器):没有问题
- 在9台计算机上同步90个线程的版本(每个JVM实例10个线程):没有问题
问题似乎开始出现在Async写入和大量并发写入器> 45和<= 90,所以我做了其他测试以确保发现是正确的:
- 用Result替换了ResultSetFuture的“get”方法 “getUninterruptibly”:同样的问题。
- 异步版本,9台机器上有18个线程,5个并发 每个线程的编写器(90个并发写入器): 没有问题。
最后的发现表明,大量并发写入器(90)不像第一次测试中预期的那样是个问题。问题是使用相同会话的大量异步写入。
在同一会话中有5个并发异步写入,问题不存在。如果我将并发写入的数量增加到10,则某些操作会在没有通知的情况下丢失。
如果在同一会话中同时发出多个(> 5)写入,似乎在Cassandra 2.1.2(或Cassandra Java驱动程序)中打破了异步写入。