26/9更新: 更新了推荐的向后兼容性维护方法

为了处理消息,节点注册了对 input 事件的监听器。每当该事件被触发时,注册的回调函数就会随着消息被调用

this.on('input', function(msg) {

})

在此函数中,节点可以执行它需要做的任何工作,并且在某个时候它可能会调用 node.send() 以便发送消息。我之所以说“在某个时候”,是因为运行时不知道节点打算做什么。有几种可能性,例如:

  • 它可能是一个完全同步的函数,并在返回之前调用 node.send()
  • 它可能执行一些异步操作,并在事件处理函数返回后在未来调用 node.send()
  • 它可能多次调用 node.send()
  • 它可能不发送任何东西。

这使得运行时无法确定节点何时完成了消息处理。这也意味着,当节点发送消息时,无法知道它发送消息的原因

为什么这很重要?

到目前为止,Node-RED 在运行时不知道这些事情的情况下也运行得很好。然而,当我们展望1.0版本之后,有许多新功能将需要这些额外的信息。

例如,我在早期的博客文章中提到,我们正在研究运行时如何更好地监控通过流的消息,并提供一种标准的方式来超时任何耗时过长的节点。

正在研究的另一个功能是能够更优雅地关闭流——这样流可以停止接受新的工作,但允许正在传输中的消息完成通过流。

只有当运行时知道节点何时完成了消息处理时,这些事情才可能实现。

正在发生什么变化?

为了解决这个问题,作为 1.0 版本发布的一部分,我们正在为 input 事件回调函数引入新的签名。

this.on('input', function(msg, send, done) {

})

除了接收消息对象,该函数还获得了自己的 senddone 函数,用于消息。

send 函数是 node.send() 的直接替代。关键区别在于,当它被调用时,运行时能够将其与接收到的原始 msg 对象关联起来。

done 函数必须在处理完消息后调用。它接受一个可选参数,如果节点由于某种原因未能处理消息,则为 error 对象。

例如

this.on('input', function(msg, send, done) {
    // do some work with msg
    someImaginaryLibrary(msg, (err, result) => {
        if (err) {
            // Report back the error. This is equivalent to
            //    node.error(err,msg)
            // but with the timeout handling dealt with as well
            done(err);
        } else {
            msg.payload = result;
            send(msg);
            done();
        }
    })
})

添加“完成”节点

这个新API的用途之一是当没有输出的节点完成时(例如电子邮件节点)触发流。这就是新的“完成”节点的作用。

如果节点调用 done(),它将触发工作区中已配置为目标该节点的任何“完成”节点。如果 done 被调用时带有错误,那么它将触发任何“捕获”节点,与现有对 node.error(err,msg) 的调用一样。

向后兼容性

撰写本文时,流程库中有超过2200个节点模块。显然,它们不可能在一夜之间更新到这个新的回调签名。但这没问题,因为运行时能够检测节点是注册“旧式”还是“新式”回调函数,并相应地处理它。

我们希望节点作者能随着时间的推移将其节点迁移到这种新格式。

我们也意识到并非所有人都会立即升级到 Node-RED 1.0。那么,如果一个节点使用了新的回调签名,但它安装在 Node-RED 的 1.0 之前版本中会发生什么呢?

为了实现这一点,节点可以编写成防御性的,以适应两种情况

let node = this;
this.on('input', function(msg, send, done) {
    // If this is pre-1.0, 'send' will be undefined, so fallback to node.send
    send = send || function() { node.send.apply(node,arguments) }
    // do some work with msg
    someImaginaryLibrary(msg, (err, result) => {
        if (err) {
            // Report back the error
            if (done) {
                 // Use done if defined (1.0+)
                done(err)
            } else {
                // Fallback to node.error (pre-1.0)
                node.error(err, msg);
            }
        } else {
            msg.payload = result;
            send(msg);
            // Check done exists (1.0+)
            if (done) {
                done();
            }
        }
    })
})

我们将在 1.0 版本发布时提供所有相关的正式文档。