问题 cassandra中的全表扫描问题


第一:我知道在Cassandra进行全面扫描并不是一个好主意,但是,目前,这就是我需要的。

当我开始寻找像这样做的东西时,我读到人们说不可能在卡桑德拉进行全面扫描而且他没有做这种事情。

不满意,我一直在寻找,直到找到这篇文章: http://www.myhowto.org/bigdata/2013/11/04/scanning-the-entire-cassandra-column-family-with-cql/

看起来很合理,我试一试。 因为我将只执行一次全扫描,时间和性能不是问题,我编写了查询并将其放在一个简单的Job中,以查找我想要的所有记录。从20亿行记录中,1000个是我预期的输出,但是,我只有100条记录。

我的工作:

public void run() {
    Cluster cluster = getConnection();
    Session session = cluster.connect("db");

    LOGGER.info("Starting ...");

    boolean run = true;
    int print = 0;

    while ( run ) {
        if (maxTokenReached(actualToken)) {
            LOGGER.info("Max Token Reached!");
            break;
        }
        ResultSet resultSet = session.execute(queryBuilder(actualToken));

        Iterator<Row> rows = resultSet.iterator();
        if ( !rows.hasNext()){
            break;
        }

        List<String> rowIds = new ArrayList<String>();

        while (rows.hasNext()) {
            Row row = rows.next();

            Long leadTime = row.getLong("my_column");
            if (myCondition(myCollumn)) {
                String rowId = row.getString("key");
                rowIds.add(rowId);
            }

            if (!rows.hasNext()) {
                Long token = row.getLong("token(rowid)");
                if (!rowIds.isEmpty()) {
                    LOGGER.info(String.format("Keys found! RowId's: %s ", rowIds));
                }
                actualToken = nextToken(token);
            }

        }

    }
    LOGGER.info("Done!");
    cluster.shutdown();
}

public boolean maxTokenReached(Long actualToken){
    return actualToken >= maxToken;
}

public String queryBuilder(Long nextRange) {
    return String.format("select token(key), key, my_column from mytable where token(key) >= %s limit 10000;", nextRange.toString());
}

public Long nextToken(Long token){
    return token + 1;
}

基本上我所做的是搜索允许的最小令牌并逐步进行到最后一次。

我不知道,但就像工作没有完全扫描完全扫描或我的查询只访问过一个节点或东西。我不知道我做错了什么,或者是不是真的可以进行全面扫描。

今天我有近2 TB的数据,在一个七个节点的集群中只有一个表。

有人已经处于这种情况或有一些建议吗?


10563
2018-04-24 01:04


起源

什么是'mytable'的键空间架构?是多次运行查询(因为while循环),最后一次查询可能会返回100而不是1000 - turbo
架构: pastebin.com/DyWAc1wa 。是的,查询运行多次并返回LIMIT子句上设置的所有行。 - bcfurtado


答案:


在Cassandra中进行全表扫描绝对是可能的 - 事实上,对Spark这样的事情来说很常见。然而,它通常不是“快速”,所以除非你知道你为什么要这样做,否则它是气馁的。对于您的实际问题:

1)如果您正在使用CQL,那么您几乎肯定会使用Murmur3分区程序,因此您的最小标记为-9223372036854775808(最大标记为9223372036854775808)。

2)你正在使用session.execute(),它将使用一个默认的一致性,这可能不会返回你的集群中的所有结果,特别是如果你也在ONE编写,我怀疑你可能会这样。将其提升为ALL,并使用预准备语句来加速CQL解析:

 public void run() {
     Cluster cluster = getConnection();
     Session session = cluster.connect("db");
     LOGGER.info("Starting ...");
     actualToken = -9223372036854775808;
     boolean run = true;
     int print = 0;

     while ( run ) {
         if (maxTokenReached(actualToken)) {
             LOGGER.info("Max Token Reached!");
             break;
         }
         SimpleStatement stmt = new SimpleStatement(queryBuilder(actualToken));
         stmt.setConsistencyLevel(ConsistencyLevel.ALL);
         ResultSet resultSet = session.execute(stmt);

         Iterator<Row> rows = resultSet.iterator();
         if ( !rows.hasNext()){
             break;
         }

         List<String> rowIds = new ArrayList<String>();

         while (rows.hasNext()) {
             Row row = rows.next();

             Long leadTime = row.getLong("my_column");
             if (myCondition(myCollumn)) {
                 String rowId = row.getString("key");
                 rowIds.add(rowId);
             }

             if (!rows.hasNext()) {
                 Long token = row.getLong("token(rowid)");
                 if (!rowIds.isEmpty()) {
                     LOGGER.info(String.format("Keys found! RowId's: %s ", rowIds));
                 }
             actualToken = nextToken(token);
             }
         }
      }
     LOGGER.info("Done!");
     cluster.shutdown(); 
  }

public boolean maxTokenReached(Long actualToken){
     return actualToken >= maxToken; 
 }

 public String queryBuilder(Long nextRange) {
     return String.format("select token(key), key, my_column from mytable where token(key) >= %s limit 10000;", nextRange.toString()); 
 }

 public Long nextToken(Long token) {
     return token + 1; 
 }

5
2018-04-28 03:49



嘿杰夫,首先,谢谢你的帮助!我正在使用带有Murmur的CQL,我知道最大和最小令牌值。今天的工作会收到他们将搜索行的一系列令牌。这样我可以在线程中抛出这些范围来加快速度。 - bcfurtado
其次,我实现了你的消化,但我与我所做的并没有多大区别,实际上,这项工作比第一次有所回报。但是,一旦我注意到机器的负载比以前低,则在作业运行的整个集群之间分配更多。在负载变高之前,只有特定的机器在不同的时间。 - bcfurtado
提高一致性会导致更多负载,因为它会查询更多副本以确保它不会丢失任何数据。要清楚:它返回了多少行,你期望它返回多少行? - Jeff Jirsa
究竟。我期待1000行的东西,只有100~200的东西。 - bcfurtado
你有没有运行SELECT COUNT(*)来计算。这使用内部分页,应该相当准确 - Jeff Jirsa


我强烈建议使用Spark - 即使在独立的应用程序中(即没有集群)。它将负责分区并逐个处理它们。死也易于使用:

https://github.com/datastax/spark-cassandra-connector 


2
2018-04-28 15:10





这是你需要做的常见事吗?还是一个案例?我同意这不是你想要定期做的事情,但我也有一个问题,我必须阅读ColumnFamily的所有行,我依靠 AllRowsReader配方 从 Astyanax客户。我看到您正在使用Datastax CQL驱动程序连接到您的群集,但如果您正在寻找的东西被证明有效,那么您可能无需使用Astyanax库来解决问题。

在我的情况下,我曾经阅读所有行键,然后我有另一个工作与我收集的键与ColumnFamily交互。

import com.netflix.astyanax.Keyspace;
import com.netflix.astyanax.model.ColumnFamily;
import com.netflix.astyanax.model.ConsistencyLevel;
import com.netflix.astyanax.recipes.reader.AllRowsReader;

import java.util.concurrent.CopyOnWriteArrayList;

...        

private final Keyspace keyspace;
private final ColumnFamily<String, byte[]> columnFamily;

public List<String> getAllKeys() throws Exception {

    final List<String> rowKeys = new CopyOnWriteArrayList<>();

    new AllRowsReader.Builder<>(keyspace, columnFamily).withColumnRange(null, null, false, 0)
        .withPartitioner(null) // this will use keyspace's partitioner
        .withConsistencyLevel(ConsistencyLevel.CL_ONE).forEachRow(row -> {
        if (row == null) {
            return true;
        }

        String key = row.getKey();

        rowKeys.add(key);

        return true;
    }).build().call();

    return rowKeys;
}

有几个不同的配置选项可以在多个线程和许多其他东西中运行它,比如我说我只是在我的代码中运行了一次并且工作得非常好,如果你遇到问题试图让它工作,我很乐意提供帮助。

希望这可以帮助,

何塞路易斯


1
2018-04-30 06:28





如果您经常需要对Cassandra表进行全表扫描,比如Spark中的分析,那么我强烈建议您考虑使用读取优化的数据模型来存储数据。你可以退房 http://github.com/tuplejump/FiloDB 有关Cassandra上的读取优化设置的示例。


1
2018-05-16 14:51