问题 使用Node.js实时读取文件


我需要使用node.js实时读出正在写入文件的数据的最佳方法。麻烦的是,Node是一艘快速发展的船,它使寻找解决问题的最佳方法变得困难。

我想做的事
我有一个正在做某事的java进程,然后将它所做的事情的结果写入文本文件。它通常需要5分钟到5个小时才能运行,数据是全时写入的,并且可以达到一些相当高的吞吐率(大约1000线/秒)。

我想实时读取这个文件,然后使用节点聚合数据并将其写入套接字,在套接字上可以在客户端上绘制图形。

客户端,图形,套接字和聚合逻辑都已完成,但我对阅读文件的最佳方法感到困惑。

我尝试过(或者至少玩过)
FIFO  - 我可以告诉我的Java进程写入fifo并使用node读取它,这实际上是我们如何使用Perl实现这一点,但是因为其他所有东西都在节点中运行,所以将代码移植到底是有意义的。

Unix Sockets - 如上。

fs.watchFile  - 这会对我们需要的东西起作用吗?

fs.createReadStream  - 这比watchFile更好吗?

fs & tail -f  - 看起来像是一个黑客。

实际上,我的问题是什么
我倾向于使用Unix套接字,这似乎是最快的选择。但节点是否具有更好的内置功能,可以实时读取fs中的文件?


2981
2018-06-27 11:18


起源



答案:


如果您希望将文件保留为数据的持久存储,以防止在系统崩溃或网络中的某个成员运行进程死亡时丢失流,您仍然可以继续写入文件并阅读从中。

如果您不需要将此文件作为Java进程生成结果的持久存储,那么使用Unix套接字对于简易性和性能都要好得多。

fs.watchFile() 不是你需要的,因为它适用于文件统计信息,因为文件系统会报告它,因为你想要读取已经写入的文件,这不是你想要的。

简短更新: 我很遗憾地意识到,虽然我有人指责 fs.watchFile() 为了使用上一段中的文件统计信息,我在下面的示例代码中完成了同样的事情!虽然我已经警告读者“要小心!”因为我在几分钟内写完了它,甚至没有测试好;不过,使用它可以做得更好 fs.watch() 代替 watchFile 要么 fstatSync 如果底层系统支持它。

对于从文件中读取/写入,我在下面的内容中写了以下内容以获得乐趣:

测试-FS-writer.js:[您在Java进程中编写文件后不需要这样]

var fs = require('fs'),
    lineno=0;

var stream = fs.createWriteStream('test-read-write.txt', {flags:'a'});

stream.on('open', function() {
    console.log('Stream opened, will start writing in 2 secs');
    setInterval(function() { stream.write((++lineno)+' oi!\n'); }, 2000);
});

测试-FS-reader.js:[小心,这只是演示,检查错误的对象!]

var fs = require('fs'),
    bite_size = 256,
    readbytes = 0,
    file;

fs.open('test-read-write.txt', 'r', function(err, fd) { file = fd; readsome(); });

function readsome() {
    var stats = fs.fstatSync(file); // yes sometimes async does not make sense!
    if(stats.size<readbytes+1) {
        console.log('Hehe I am much faster than your writer..! I will sleep for a while, I deserve it!');
        setTimeout(readsome, 3000);
    }
    else {
        fs.read(file, new Buffer(bite_size), 0, bite_size, readbytes, processsome);
    }
}

function processsome(err, bytecount, buff) {
    console.log('Read', bytecount, 'and will process it now.');

    // Here we will process our incoming data:
        // Do whatever you need. Just be careful about not using beyond the bytecount in buff.
        console.log(buff.toString('utf-8', 0, bytecount));

    // So we continue reading from where we left:
    readbytes+=bytecount;
    process.nextTick(readsome);
}

您可以安全地避免使用 nextTick 并打电话 readsome() 直接代替。由于我们仍然在这里进行同步,因此在任何意义上都没有必要。我喜欢它。 :p

编辑 奥利弗劳埃德

以上面的例子,但扩展它来读取CSV数据给出:

var lastLineFeed,
    lineArray;
function processsome(err, bytecount, buff) {
    lastLineFeed = buff.toString('utf-8', 0, bytecount).lastIndexOf('\n');

    if(lastLineFeed > -1){

        // Split the buffer by line
        lineArray = buff.toString('utf-8', 0, bytecount).slice(0,lastLineFeed).split('\n');

        // Then split each line by comma
        for(i=0;i<lineArray.length;i++){
            // Add read rows to an array for use elsewhere
            valueArray.push(lineArray[i].split(','));
        }   

        // Set a new position to read from
        readbytes+=lastLineFeed+1;
    } else {
        // No complete lines were read
        readbytes+=bytecount;
    }
    process.nextTick(readFile);
}

7
2018-06-27 18:47



这是一个直接解决我的问题的好例子。它需要增强,但一次只处理一行,但可以说这是一件好事; node缺少现有的fs接口意味着它是完全可定制的,所以即使我必须编写额外的代码,我也可以完全满足我的需求。 - Oliver Lloyd
我将上面的示例扩展为使用CSV文件。 - Oliver Lloyd
当作为节点<test-fs-reader.js>运行时,这绝对有效但是如何将此代码放在app.js中并在html页面中获得结果? - usersam


答案:


如果您希望将文件保留为数据的持久存储,以防止在系统崩溃或网络中的某个成员运行进程死亡时丢失流,您仍然可以继续写入文件并阅读从中。

如果您不需要将此文件作为Java进程生成结果的持久存储,那么使用Unix套接字对于简易性和性能都要好得多。

fs.watchFile() 不是你需要的,因为它适用于文件统计信息,因为文件系统会报告它,因为你想要读取已经写入的文件,这不是你想要的。

简短更新: 我很遗憾地意识到,虽然我有人指责 fs.watchFile() 为了使用上一段中的文件统计信息,我在下面的示例代码中完成了同样的事情!虽然我已经警告读者“要小心!”因为我在几分钟内写完了它,甚至没有测试好;不过,使用它可以做得更好 fs.watch() 代替 watchFile 要么 fstatSync 如果底层系统支持它。

对于从文件中读取/写入,我在下面的内容中写了以下内容以获得乐趣:

测试-FS-writer.js:[您在Java进程中编写文件后不需要这样]

var fs = require('fs'),
    lineno=0;

var stream = fs.createWriteStream('test-read-write.txt', {flags:'a'});

stream.on('open', function() {
    console.log('Stream opened, will start writing in 2 secs');
    setInterval(function() { stream.write((++lineno)+' oi!\n'); }, 2000);
});

测试-FS-reader.js:[小心,这只是演示,检查错误的对象!]

var fs = require('fs'),
    bite_size = 256,
    readbytes = 0,
    file;

fs.open('test-read-write.txt', 'r', function(err, fd) { file = fd; readsome(); });

function readsome() {
    var stats = fs.fstatSync(file); // yes sometimes async does not make sense!
    if(stats.size<readbytes+1) {
        console.log('Hehe I am much faster than your writer..! I will sleep for a while, I deserve it!');
        setTimeout(readsome, 3000);
    }
    else {
        fs.read(file, new Buffer(bite_size), 0, bite_size, readbytes, processsome);
    }
}

function processsome(err, bytecount, buff) {
    console.log('Read', bytecount, 'and will process it now.');

    // Here we will process our incoming data:
        // Do whatever you need. Just be careful about not using beyond the bytecount in buff.
        console.log(buff.toString('utf-8', 0, bytecount));

    // So we continue reading from where we left:
    readbytes+=bytecount;
    process.nextTick(readsome);
}

您可以安全地避免使用 nextTick 并打电话 readsome() 直接代替。由于我们仍然在这里进行同步,因此在任何意义上都没有必要。我喜欢它。 :p

编辑 奥利弗劳埃德

以上面的例子,但扩展它来读取CSV数据给出:

var lastLineFeed,
    lineArray;
function processsome(err, bytecount, buff) {
    lastLineFeed = buff.toString('utf-8', 0, bytecount).lastIndexOf('\n');

    if(lastLineFeed > -1){

        // Split the buffer by line
        lineArray = buff.toString('utf-8', 0, bytecount).slice(0,lastLineFeed).split('\n');

        // Then split each line by comma
        for(i=0;i<lineArray.length;i++){
            // Add read rows to an array for use elsewhere
            valueArray.push(lineArray[i].split(','));
        }   

        // Set a new position to read from
        readbytes+=lastLineFeed+1;
    } else {
        // No complete lines were read
        readbytes+=bytecount;
    }
    process.nextTick(readFile);
}

7
2018-06-27 18:47



这是一个直接解决我的问题的好例子。它需要增强,但一次只处理一行,但可以说这是一件好事; node缺少现有的fs接口意味着它是完全可定制的,所以即使我必须编写额外的代码,我也可以完全满足我的需求。 - Oliver Lloyd
我将上面的示例扩展为使用CSV文件。 - Oliver Lloyd
当作为节点<test-fs-reader.js>运行时,这绝对有效但是如何将此代码放在app.js中并在html页面中获得结果? - usersam


为什么你认为 tail -f 是一个黑客?

虽然弄清楚我发现了一个很好的例子,但我会做类似的事情。 node.js和WebSocket的实时在线活动监控示例:
http://blog.new-bamboo.co.uk/2009/12/7/real-time-online-activity-monitor-example-with-node-js-and-websocket

只是为了让这个答案完整,我给你写了一个在0.8.0下运行的示例代码 - (http服务器可能是黑客)。

子进程是用tail生成的,因为子进程是一个带有三个流的EventEmitter(在我们的例子中我们使用stdout)你可以添加一个监听器 on

文件名: tailServer.js

用法: node tailServer /var/log/filename.log

var http = require("http");
var filename = process.argv[2];


if (!filename)
    return console.log("Usage: node tailServer filename");

var spawn = require('child_process').spawn;
var tail = spawn('tail', ['-f', filename]);

http.createServer(function (request, response) {
    console.log('request starting...');

    response.writeHead(200, {'Content-Type': 'text/plain' });

    tail.stdout.on('data', function (data) {
      response.write('' + data);                
    });
}).listen(8088);

console.log('Server running at http://127.0.0.1:8088/');

4
2018-06-27 18:05



我对tail -f的关注是它要求在写入文件之前读取进程是活动的,如果不是数据丢失的话。我的用例是这样的,读取可能在数据写入很久之后发生。更新为+1的+1虽然这是一个很好的解决方案,可以从同一个源控制写入和读取。 - Oliver Lloyd
watchFile也是事件驱动的,但根据文档很难稳定。上面的示例通过在高级代码中轮询来处理文件更改。对我来说,这看起来像一个黑客。但是,只要它对你有用,那么做到这一点很好。否则你可以 touch 该文件,如果它不存在,你不会丢失任何数据,你可以使用 wc -l message.text | awk '{print $1}' 并把它交给 tail -f -n - vik


这个模块是@hasanyasin建议的原则的实现:

https://github.com/felixge/node-growing-file


1
2017-07-05 00:12



谢谢,这看起来它在这里工作得很好而且felixge的其他项目很扎实,所以我很高兴尝试这个模块。 - Oliver Lloyd


我从@hasanyasin那里得到了答案并将其包装成模块化的承诺。基本思想是传递一个文件和一个处理函数,它对从文件中读取的字符串化缓冲区执行某些操作。如果处理函数返回true,则文件将停止被读取。如果处理程序没有足够快地返回true,您还可以设置一个超时,该超时将终止读取。

如果由于超时调用resolve(),则promiser将返回true,否则返回false。

有关用法示例,请参见底部。

// https://stackoverflow.com/a/11233045

var fs = require('fs');
var Promise = require('promise');

class liveReaderPromiseMe {
    constructor(file, buffStringHandler, opts) {
        /*
            var opts = {
                starting_position: 0,
                byte_size: 256,
                check_for_bytes_every_ms: 3000,
                no_handler_resolution_timeout_ms: null
            };
        */

        if (file == null) {
            throw new Error("file arg must be present");
        } else {
            this.file = file;
        }

        if (buffStringHandler == null) {
            throw new Error("buffStringHandler arg must be present");
        } else {
            this.buffStringHandler = buffStringHandler;
        }

        if (opts == null) {
            opts = {};
        }

        if (opts.starting_position == null) {
            this.current_position = 0;
        } else {
            this.current_position = opts.starting_position;
        }

        if (opts.byte_size == null) {
            this.byte_size = 256;
        } else {
            this.byte_size = opts.byte_size;
        }

        if (opts.check_for_bytes_every_ms == null) {
            this.check_for_bytes_every_ms = 3000;
        } else {
            this.check_for_bytes_every_ms = opts.check_for_bytes_every_ms;
        }

        if (opts.no_handler_resolution_timeout_ms == null) {
            this.no_handler_resolution_timeout_ms = null;
        } else {
            this.no_handler_resolution_timeout_ms = opts.no_handler_resolution_timeout_ms;
        }
    }


    startHandlerTimeout() {
        if (this.no_handler_resolution_timeout_ms && (this._handlerTimer == null)) {
            var that = this;
            this._handlerTimer = setTimeout(
                function() {
                    that._is_handler_timed_out = true;
                },
                this.no_handler_resolution_timeout_ms
            );
        }
    }

    clearHandlerTimeout() {
        if (this._handlerTimer != null) {
            clearTimeout(this._handlerTimer);
            this._handlerTimer = null;
        }
        this._is_handler_timed_out = false;
    }

    isHandlerTimedOut() {
        return !!this._is_handler_timed_out;
    }


    fsReadCallback(err, bytecount, buff) {
        try {
            if (err) {
                throw err;
            } else {
                this.current_position += bytecount;
                var buff_str = buff.toString('utf-8', 0, bytecount);

                var that = this;

                Promise.resolve().then(function() {
                    return that.buffStringHandler(buff_str);
                }).then(function(is_handler_resolved) {
                    if (is_handler_resolved) {
                        that.resolve(false);
                    } else {
                        process.nextTick(that.doReading.bind(that));
                    }
                }).catch(function(err) {
                    that.reject(err);
                });
            }
        } catch(err) {
            this.reject(err);
        }
    }

    fsRead(bytecount) {
        fs.read(
            this.file,
            new Buffer(bytecount),
            0,
            bytecount,
            this.current_position,
            this.fsReadCallback.bind(this)
        );
    }

    doReading() {
        if (this.isHandlerTimedOut()) {
            return this.resolve(true);
        } 

        var max_next_bytes = fs.fstatSync(this.file).size - this.current_position;
        if (max_next_bytes) {
            this.fsRead( (this.byte_size > max_next_bytes) ? max_next_bytes : this.byte_size );
        } else {
            setTimeout(this.doReading.bind(this), this.check_for_bytes_every_ms);
        }
    }


    promiser() {
        var that = this;
        return new Promise(function(resolve, reject) {
            that.resolve = resolve;
            that.reject = reject;
            that.doReading();
            that.startHandlerTimeout();
        }).then(function(was_resolved_by_timeout) {
            that.clearHandlerTimeout();
            return was_resolved_by_timeout;
        });
    }
}


module.exports = function(file, buffStringHandler, opts) {
    try {
        var live_reader = new liveReaderPromiseMe(file, buffStringHandler, opts);
        return live_reader.promiser();
    } catch(err) {
        return Promise.reject(err);
    }
};

然后使用上面这样的代码:

var fs = require('fs');
var path = require('path');
var Promise = require('promise');
var liveReadAppendingFilePromiser = require('./path/to/liveReadAppendingFilePromiser');

var ending_str = '_THIS_IS_THE_END_';
var test_path = path.join('E:/tmp/test.txt');

var s_list = [];
var buffStringHandler = function(s) {
    s_list.push(s);
    var tmp = s_list.join('');
    if (-1 !== tmp.indexOf(ending_str)) {
        // if this return never occurs, then the file will be read until no_handler_resolution_timeout_ms
        // by default, no_handler_resolution_timeout_ms is null, so read will continue forever until this function returns something that evaluates to true
        return true;
        // you can also return a promise:
        //  return Promise.resolve().then(function() { return true; } );
    }
};

var appender = fs.openSync(test_path, 'a');
try {
    var reader = fs.openSync(test_path, 'r');
    try {
        var options = {
            starting_position: 0,
            byte_size: 256,
            check_for_bytes_every_ms: 3000,
            no_handler_resolution_timeout_ms: 10000,
        };

        liveReadAppendingFilePromiser(reader, buffStringHandler, options)
        .then(function(did_reader_time_out) {
            console.log('reader timed out: ', did_reader_time_out);
            console.log(s_list.join(''));
        }).catch(function(err) {
            console.error('bad stuff: ', err);
        }).then(function() {
            fs.closeSync(appender);
            fs.closeSync(reader);
        });

        fs.write(appender, '\ncheck it out, I am a string');
        fs.write(appender, '\nwho killed kenny');
        //fs.write(appender, ending_str);
    } catch(err) {
        fs.closeSync(reader);
        console.log('err1');
        throw err;
    }
} catch(err) {
    fs.closeSync(appender);
        console.log('err2');
    throw err;
}

0
2018-06-14 05:55