pull( self.getPullStream(ipfsPath, options), pull.asyncMap((file, cb) => { if (file.content) { pull( file.content, pull.collect((err, 拖车视频 buffers) => { if (err) { return cb(err) } file.content = Buffer.concat(buffers) cb(null, file) }) ) } else { cb(null, file) } }), pull.collect(callback) )上篇文章的内容,我们回忆到这里就结束了,下面我们仔细研究 streamBytes 函数及相关的深度遍历是如何实现的。
streamBytes 函数使用了 pull-traverse 类库提供深度优先、广度优先、叶子优先等算法,它的每个算法都返回一个 pull 类库的 through 流,新闻资讯这个流被它后面的流所调用。在这里使用深度优先算法,返回的流被 pull 类库的 map 流所调用,用于获取每一个元素。
深度优先算法的相关代码如下:
var once = exports.once = function (value) { return function (abort, cb) { if(abort) return cb(abort) if(value != null) { var _value = value; value = null cb(null, _value) } else cb(true) } }streamBytes 函数定义在 file.js 文件中,我们来看下它的内容:var depthFirst = exports.depthFirst = function (start, createStream) { var reads = [], ended
reads.unshift(once(start))
return function next (end, cb) { if(!reads.length) return cb(true) if(ended) return cb(ended)
reads[0](end, function (end, data) { if(end) { if(end !== true) { ended = end reads.shift()
while(reads.length) reads.shift()(end, function () {})
return cb(end) }
reads.shift() return next(null, cb) }
reads.unshift(createStream(data)) cb(end, data) }) } }
function streamBytes (dag, node, fileSize, offset, length) { if (offset === fileSize