Skip to content

推送服务

小马哥 edited this page Oct 12, 2016 · 6 revisions

简介

Hprose 2.0 最大的亮点就是增加了推送功能的支持,而且这个功能的增加是在不修改现有通讯协议的方式下实现的,因此,这里的推送服务,即使不是 Hprose 2.0 的客户端或者服务器也可以使用。

当然,在旧版本的客户端调用推送服务,或者在旧版本的服务器上自己实现推送,需要多写一些代码。所以,如果你所使用的语言支持 Hprose 2.0,那么推荐直接使用 Hprose 2.0 的推送 API 来做推送,这样会极大的减少你的工作量。

下面我们来分别介绍一下客户端和服务器端增加的关于推送的 API。

客户端

客户端关于推送的方法只有两个,它们分别是:

subscribe 方法

client.subscribe(topic, callback[, timeout[, failswitch]]);
client.subscribe(topic, id, callback[, timeout[, failswitch]]);

subscribe 方法的用处是订阅服务器端的推送服务。该方法有两种方式,一种是自动获取设置客户端 id,另一种是手动设置客户端 id

参数 topic 是订阅的主题名,它实际上也是一个服务器端的方法,该方法与普通方法的区别是,它只有一个参数 id,该参数表示客户端的唯一编号,该方法的返回值即推送信息,当返回值为 null 或者抛出异常时,客户端会忽略并再次调用该 topic 方法。当该方法返回推送消息时,callback 回调函数会执行,并同时再次调用该 topic 方法。因此当没有推送消息时,该方法不应该立即返回值,而应该挂起等待,直到超时或者有推送消息时再返回结果。

当然,对于开发者来说,自己实现一个完善的推送方法还是有一定难度的。因此,hprose 2.0 的服务器端已经提供了一整套的专门用于推送的 API,通过这些 API,可以方便的自动实现用于推送的服务方法。在后面介绍服务器端时,我们再介绍这部分内容。

参数 id 是客户端的唯一编号,如果省略的话,客户端会自动通过 clientid 属性来获取,如果该属性未初始化,会自动调用一个名字为 # 的服务器端远程方法,之所以使用这个特殊的名字是为了防止跟用户发布的普通方法发生冲突。hprose 2.0 服务器已经自动实现了该方法,但是用户也可以用自己的实现来替换它,它的默认实现是一个安全随机字符串。当用户指定了 id 参数时,客户端会将它作为该 topic 方法的参数值传给服务器端,但不会修改客户端的 id 属性值。

参数 callback 是用来处理推送消息的回调函数,该参数不能省略。

参数 timeout 是等待推送消息的超时时间,单位是毫秒(ms),可以省略,默认值与 timeout 属性值相同。超时之后并不会产生异常,而是重新发起对 topic 方法的调用。因此,如果用户要在服务器端自己实现推送方法,应当注意处理好同一个客户端对同一个推送方法可能会进行重复调用的问题。如果使用 hprose 2.0 提供的推送 API,则不需要关心这一点。

参数 failswitch 表示当客户端与服务器端通讯中发生网络故障,是否自动切换服务器。默认值是 false,表示不切换。

对于同一个推送主题,subscribe 方法允许被多次调用,这样可以对同一个推送主题指定多个不同的回调方法。但通常没有必要也不推荐这样做。

unsubscribe 方法

client.unsubscribe(topic)
client.unsubscribe(topic, callback)
client.unsubscribe(topic, id)
client.unsubscribe(topic, id, callback)

该方法用于取消订阅推送主题。当调用该方法时,带有 callback 参数,将只取消对该 callback 方法的回调,除非这是该主题上最后一个 callback,否则对该主题远程方法的调用并不会中断。当所有的 callback 都被取消之后,或者当调用该方法时,没有指定 callback 参数时,将会中断对该主题远程方法的循环调用。

如果 id 参数若未指定,那么当客户端 id 属性有值时,将只取消对该 id 属性值对应的推送主题的订阅。当客户端 id 属性未初始化时,将会取消该主题上所有的订阅。

通常来说,当你调用 subscribe 方法时如果指定了 id 参数,那么当调用 unsubscribe 方法时你也应该指定相同的 id 参数。当你调用 subscribe 方法时没有指定 id 参数,那么当调用 unsubscribe 方法时你也不需要指定 id 参数。

isSubscribed 方法

client.isSubscribed(topic);

当 topic 已被订阅时,返回 true,否则返回 false

subscribedList 方法

client.subscribedList();

返回已被订阅的主题的列表,返回值是一个字符串数组。数组元素为已订阅的主题名称。

服务器端

服务器端提供了比较多的关于推送的 API,包括广播,多播和单播方式的推送,还有超时,心跳,推送事件等设置。

timeout 属性

该属性有两个用处。一个用处是用来设置底层 socket 连接的空闲超时。另一个是设置推送空闲超时。该属性默认值为 120000,单位是毫秒(ms),即 2 分钟。

当空闲超时后,底层的 socket 连接会收到一个 timeout 事件,但是跟客户端的连接并不会断开或销毁。hprose 默认并不处理该事件,用户如果有需要可以自己处理。

当服务器发布了推送主题后,客户端会跟服务器端保持一个长连接,如果达到超时时间,仍然没有任何消息推送给客户端,则返回 null,此时,如果客户端仍然在线的话,则会立即再次发送获取推送主题的请求。服务器端通过这个方式可以获知客户端是否还在线。

heartbeat 属性

该属性用来设置推送的心跳检测间隔时间。该属性默认值为 3000,单位是毫秒,即 3 秒钟。

当服务器端推送数据给客户端后,如果客户端在 heartbeat 时间内没有取走推送数据,则服务器端认为客户端已掉线。对于已掉线的客户端,服务器端会清除为该客户端分配的内存空间,并将该客户端从推送列表中移除。

timeoutheartbeat 属性在检测客户端是否离线时是相互配合的,当服务器端没有向客户端推送任何消息时,服务器端需要至少 timeout + heartbeat 的时间才能检测到客户端以离线。当服务器端有向客户端推送消息时,则在推送消息之后经过 heartbeat 时间可以检测到客户端已掉线。

timeoutheartbeat 设置的时间越短,检测到客户端离线的时间就越短。但是需要注意以下几个问题:

timeout 时间越短,服务器端和客户端之间的用于检测是否掉线的通讯就越频繁,所以不应该将 timeout 设置的过短,否则会严重增加服务器的负担。

因此,timeout 的设置一般不应少于 30 秒。对于负载比较高的服务器,保持默认值就是一个不错的选项。

对于推送频繁的服务器来说,heartbeat 时间越长,对于已经离线的客户端,在服务器端存储的离线消息就越多,这会严重的占用服务器端的内存,因此,不宜将 heartbeat 的时间设置的过长。

如果 heartbeat 的时间设置的过短,客户端可能会因为网络原因导致不能及时取走推送消息,这就会导致错误的离线判断,当错误离线判断发生后,会丢失一些推送消息。

因此,heartbeat 的选择则应根据客户端的网络情况来决定,如果客户端都是来自局域网,并且客户端数量较少,设置为 1 秒甚至更短的时间也是可以的。而对于比较慢速且不太稳定的移动网络,设置为 5 秒或者 10 秒可能是一个比较合适的取值。对于普通的互联网客户端来说,保持默认值就可以了。

publish 方法

server.publish(topic[, options]);

该方法用于发布一个推送主题。这个推送的主题实际上是一个自动生成的远程服务方法。它的功能就是实现推送。

topic 为主题名,字符串类型。

options 是发布推送主题的选项,该选项同 add 方法的选项完全不同。这里的选项值包括以下 3 个:

  • timeout
  • heartbeat
  • events

这里 timeoutheartbeat 在前面的属性介绍里已经说明过了,这里不再重复。

events 是一个 EventEmitter 对象。你可以在上面定义以下两个事件:

  • subscribe
  • unsubscribe

这两个事件的参数相同,第一个参数是客户端 id,第二个参数是服务器对象本身。

当编号为 id 的客户端第一次连接该推送主题的时,subscribe 事件会被触发。当编号为 id 的客户端被检测到离线时,unsubscribe 事件会被触发。

如果该主题没有设置 events 选项,则会触发服务器上的这两个事件,但是服务器上的这两个事件的会在开头多一个参数 topic,即 publish 方法发布的 topic 参数。

publish 方法仅仅是告诉客户端,现在有一个叫做 topic 的推送主题可以订阅。

而要真正推送数据给客户端,则需要使用以下几个方法。

广播

server.broadcast(topic, result[, callback]);
server.push(topic, result);

这两个方法功能相同,但是 broadcast 方法支持回调,该回调方法有两个参数,这个参数都是数组类型,第一个数组中是所有推送成功的客户端 id,第二个数组中是所有推送失败的客户端 id

一旦服务器启动,你可以在任何地方进行数据推送。比如在其它的服务方法中,在服务器事件中,甚至在服务器外的并行运行的函数中。例如:

时间推送服务器

var hprose = require('hprose');
var server = hprose.Server.create("http://0.0.0.0:8080");
server.publish('time');
setInterval(function() {
    server.push('time', new Date());
}, 1000);
server.start();

时间显示客户端

var hprose = require('hprose');
var client = hprose.Client.create("http://127.0.0.1:8080");
var count = 0;
client.subscribe('time', function(date) {
    if (++count > 10) {
        client.unsubscribe('time');
    }
    else {
        console.log(date);
    }
});

该程序运行结果为:

Sat Aug 08 2015 20:54:44 GMT+0800 (CST)
Sat Aug 08 2015 20:54:45 GMT+0800 (CST)
Sat Aug 08 2015 20:54:46 GMT+0800 (CST)
Sat Aug 08 2015 20:54:47 GMT+0800 (CST)
Sat Aug 08 2015 20:54:48 GMT+0800 (CST)
Sat Aug 08 2015 20:54:49 GMT+0800 (CST)
Sat Aug 08 2015 20:54:50 GMT+0800 (CST)
Sat Aug 08 2015 20:54:51 GMT+0800 (CST)
Sat Aug 08 2015 20:54:52 GMT+0800 (CST)
Sat Aug 08 2015 20:54:53 GMT+0800 (CST)

有时候,你可能想在某个服务方法中推送数据给客户端,但是该服务方法可能在其它文件中定义。因此,你得不到 server 对象。那这时还能进行推送吗?

答案是可以,没问题。我们前面说过,在服务方法中我们可以得到一个 context 参数,这个 context 参数中就包含有一个 clients 对象,这个对象上就包含了所有跟推送有关的方法,这些方法跟 server 对象上的推送方法是完全一样的,例如:

context.clients.broadcast(topic, result[, callback]);
context.clients.push(topic, result);

我们再来看一个例子:

服务器

var hprose = require('hprose');
function hello(name, context) {
    context.clients.push("news", "this is a pushed message: " + name);
    context.clients.broadcast("news", {x: 1,
                                       y: 2,
                                 message: "this is a pushed object:"  + name});
    return 'Hello ' + name + '! -- ' + context.socket.remoteAddress;
}
var server = hprose.Server.create("ws://0.0.0.0:8080");
server.addFunction(hello, {passContext: true});
server.publish('news');
server.start();

客户端

var hprose = require('hprose');
var client = hprose.Client.create("ws://127.0.0.1:8080", ['hello']);
client.subscribe('news', function(news) {
     console.log(news);
});
client.hello('hprose').then(function(result) {
     console.log(result);
});

假设我们运行两个客户端,则第一个客户端显示:

this is a pushed message: hprose
Hello hprose! -- 127.0.0.1
{ x: 1, y: 2, message: 'this is a pushed object:hprose' }
this is a pushed message: hprose
{ x: 1, y: 2, message: 'this is a pushed object:hprose' }

第二个客户端显示:

this is a pushed message: hprose
Hello hprose! -- 127.0.0.1
{ x: 1, y: 2, message: 'this is a pushed object:hprose' }

这两个客户端显示结果之后并不会退出,如果有其它客户端再次运行时,这两个客户端还会继续显示推送信息。也就是说,对于已经执行了 subscribe 的客户端,在未执行对应的 unsubscribe 方法之前,该客户端会一直运行,接收推送数据,即使服务器已经关闭,客户端也不会退出。

多播

server.multicast(topic, ids, result[, callback]);
server.push(topic, ids, result);
context.clients.multicast(topic, ids, result[, callback]);
context.clients.push(topic, ids, result);

跟广播类似,多播也有这样几种形式。跟广播相比,多播多了一个 ids 参数,它是一个客户端 id 的数组。也就是说,你可以向指定的一组客户端推送消息。

单播

server.unicast(topic, id, result[, callback]);
server.push(topic, id, result);
context.clients.unicast(topic, id, result[, callback]);
context.clients.push(topic, id, result);

单播是跟多播的形式也类似,只不过客户端 ids 数组参数变成了一个客户端 id 参数。

但是还有一点要注意,unicast 的回调方法跟 broadcastmulticast 不同,unicast 的回调方法只有一个参数,而且是一个 Boolean 值,该值为 true 是表示推送成功,为 false 表示推送失败。

上面的几种不同的推送方式,在最新的版本中都已经支持直接推送 promise 的结果。

idlist 方法

server.idlist(topic);
context.clients.idlist(topic);

该方法用于获取当前在线的所有客户端的 id 列表。

exist 方法

server.exist(topic, id);
context.clients.exist(topic, id);

该方法用于快速判断 id 是否在当前在线的客户端列表中。

注意,客户端在线状态是针对主题的,同一个客户端可能针对一个主题处于在线状态,但是针对另一个主题却处于离线状态,这种情况是正常的。