Warm tip: This article is reproduced from serverfault.com, please click

javascript-节点JS:将响应对象传递到服务器端事件的Bull队列

(javascript - Node JS: Passing Response Object to Bull Queue for Server Side Events)

发布于 2020-11-28 07:56:28

我陷于架构决定。我有Node + Express应用程序,它有一个用于上传文件的API。上传完成后,将关闭响应,并在Bull Bull Queue + Redis的帮助下,通过FFMPEG逐批处理上传的文件。此结构运行良好,但最近我开始对服务器端事件进行测试,以向最终用户提供有关处理的更新。但是我无法将响应对象传递给Bull Queue,以将常规更新从服务器写入用户。

1.进口

import childProcess from 'child_process';
import Bull from 'bull'
const Queue = new Bull('background_job', {redis: {port: process.env.port, host: process.env.host, password: process.env.password}});

2.上传功能

const uploadVideo = async(req, res) => {
    try{
        const result = await authUser(req);
        const result2 = await checkUploadFile(result);
        const result3 = await insertPost(result2, res);
        await Queue.add(result3.data, result3.opts)
    } catch(err){
        res.status(403).send(err);
    }
}

3.承诺

const authUser = (req) => {
    return new Promise((resolve, reject) => {
      //do some work
    })
}

const checkUploadFile = (result) => {
    return new Promise((resolve, reject) => {
      //do some more work
    })
}

const insertPost= (result, res) => {
    return new Promise((resolve, reject) => {
      //do final work
       ...........
      //preparing server side events
       const headers = {
            'Content-Type': 'text/event-stream',
            'Connection': 'keep-alive',
            'Cache-Control': 'no-cache',
            'Access-Control-Allow-Origin': '*'
        };
        res.writeHead(200, headers);
        res.write(JSON.stringify({status: true, id: 1})); //testing server side events for the first time

        //Let's continue to Bull
        const data = {res: res} <- error here: TypeError: Converting circular structure to JSON 
        const opts = {removeOnComplete: true, removeOnFail: true}
        resolve({data: data, opts: opts});
    })
}

4.排队过程

Queue.process((job, done) => {
    const res = job.data.res
    childProcess.execFile('someScript.sh', [`some`, `arguments`], { stdio: ['pipe', 'pipe', 'ignore']}, (err, stderr, stdout) => {
        if(err){
            done(new Error("Failed: " + err))
            res.write(JSON.stringify({status: true, id: 2})); //here using SSE
            res.end()
        } else {
            done()
            res.write(JSON.stringify({status: false})); //here using SSE
            res.end()
        }
    })
})

5. PM2记录的错误

TypeError: Converting circular structure to JSON
    --> starting at object with constructor 'Socket'
    |     property 'parser' -> object with constructor 'HTTPParser'
    --- property 'socket' closes the circle

我想用来JSON.stringify(res)将响应对象作为JSON传递,但这也不起作用。现在,我正在考虑这种方法是否正确,还是应该使用Socket.io(对于简单的服务器端事件而言,这是过大的选择)

谢谢

Questioner
Abhinandan Chakraborty
Viewed
1
kg99 2020-11-28 19:31:28

你为什么还要写这行:

const data = {res: res} <- error here: TypeError: Converting circular structure to JSON.

你仍然可以在其中调用insertPost的uploadVideo函数中访问响应对象。因此可以简单地是:

await Queue.add(res, result3.opts).

例如:

const uploadVideo = async(req, res) => {
    try{
        const result = await authUser(req);
        const result2 = await checkUploadFile(result);
        const result3 = await insertPost(result2, res);
        await Queue.add(res, result3.opts); // still have access to res
    } catch(err){
        res.status(403).send(err);
    }

删除此行:

const data = {res: res} <- error here: TypeError: Converting circular structure to JSON 

只需使用响应

Queue.process((res, done) => {
    //const res = job.data.res
    childProcess.execFile('someScript.sh', [`some`, `arguments`], { stdio: ['pipe', 'pipe', 'ignore']}, (err, stderr, stdout) => {
        if(err){
            done(new Error("Failed: " + err))
            res.write(JSON.stringify({status: true, id: 2})); //here using SSE
            res.end()
        } else {
            done()
            res.write(JSON.stringify({status: false})); //here using SSE
            res.end()
        }
    })
});

编辑:

我明白你的意思了。看了公牛模块。你为什么不能做这样的事情。

const uploadVideo = async(req, res) => {
  try{
      res.jobId = 0; // we need a way to know if job completed is our request          const result = await authUser(req);
      const result2 = await checkUploadFile(result);
      const result3 = await insertPost(result2, res);
      Queue.add({id: res.jobId, somedatafromresult3: 'result3.somedata' }, result3.opts);
      Queue.on("completed", (err, data) => {
        if (data.id === res.jobId) { // check to see if completed job is our one.
          res.write(JSON.stringify(data)); //here using SSE
          res.end()
        }
        console.log(data);
      });
  } catch(err){
      res.status(403).send(err);
  }
}

然后在你的处理函数中,只需返回将要发出的数据。IE

  videoQueue.process(function(job, done){
  childProcess.execFile('someScript.sh', [`some`, `arguments`], { stdio: ['pipe', 'pipe', 'ignore']}, (err, stderr, stdout) => {
    if(err){
        done(err, {status: true, id: job.data.id});
    } else {
      done(null, {status: false, id: job.data.id});
    }
})
})

;