温馨提示:本文翻译自stackoverflow.com,查看原文请点击:node.js - Pipe data from Writable to Readable
node.js stream typescript

node.js - 将数据从可写传输到可读

发布于 2020-04-23 17:43:26

我正在努力从SFTP服务器获取文件,并使用其SDK将数据通过管道传递到Box.com。Box SDK将可读流作为参数来上传文件。我编写的从sftp服务器获取文件的代码使用npm模块ssh2-sftp-client

我遇到的问题是,可写流是流的“行尾”,除非您使用Transform之类的东西,Transform是双工的,并且实现了读写功能。下面是我正在使用的代码。因为我正在为客户做这件事,所以我故意遗漏了一些不必要的东西。

下面是sftp类的方法

async getFile(filepath: string): Promise<Readable> {
  logger.info(`Fetching file: ${filepath}`);
  const writable = new Writable();
  const stream = new PassThrough();
  await this.client.get(filepath, writable);
  return writable.pipe(stream);
}

获取文件并尝试通过管道传递到Box(已授权BoxSDK客户端的实例)的实现。

try {
  for (const filename of filenames) {
    const stream: Readable = await tmsClient.getFile(
      'redacted' + filename,
    );

    logger.info(`Piping ${filename} to Box...`);
    await box.createFile(filename, 'redacted', stream);
    logger.info(`${filename} successfully downloaded`);
  }
} catch (error) {
  logger.error(`Failed to move files: ${error}`);
}

我并不是非常精通流派,但是根据我的研究,我认为这应该在理论上可行。

我还尝试了ssh客户端返回缓冲区的实现,然后尝试将该缓冲区作为可读流传递。通过此实现,尽管我不断从Box sdk中收到错误消息,该错误导致流意外终止。

async getFile(filepath: string): Promise<Readable> {
  logger.info(`Fetching file: ${filepath}`);
  const stream = new Readable();
  const buffer = (await this.client.get(filepath)) as Buffer;
  stream._read = (): void => {
    stream.push(buffer);
    stream.push(null);
  };
  return stream;
}

和错误消息:2020-02-06 15:24:57 error: Failed to move files: Error: Unexpected API Response [400 Bad Request] bad_request - Stream ended unexpectedly

任何见解都将不胜感激!

查看更多

提问者
SamG
被浏览
31
SamG 2020-02-08 02:51

因此,在对此进行了更多研究之后,事实证明,问题实际上出在Node的Box sdk上。sdk在实际完成之前就终止了流的主体。这是因为他们在后台使用的request库需要一个content-length标头来发送较大的有效负载。没有适当的设置,它将在发送有效负载之前继续终止流。

他们在Box社区论坛上建议将属性添加到流原型中,以将内容传递给基础请求库。我对此表示强烈反对,因为这不是正确的解决方法。Box SDK需要提供一种以字节为单位传递内容长度的方法。作为其API的用户,我不必操纵其基础依赖关系。我将用他们的SDK来解决一个问题,并希望能解决此问题。

希望这对其他人有用!