rxjs-websockets, 一个非常灵活且良好的RxJS测试 web socket库

分享于 

9分钟阅读

GitHub

  繁體 雙語
websocket rxjs streams provided by an injectable service
  • 源代码名称:rxjs-websockets
  • 源代码网址:http://www.github.com/ohjames/rxjs-websockets
  • rxjs-websockets源代码文档
  • rxjs-websockets源代码下载
  • Git URL:
    git://www.github.com/ohjames/rxjs-websockets.git
    Git Clone代码到本地:
    git clone http://www.github.com/ohjames/rxjs-websockets
    Subversion代码到本地:
    $ svn co --depth empty http://www.github.com/ohjames/rxjs-websockets
    Checked out revision 1.
    $ cd repo
    $ svn up trunk
    
    RxJS web sockets

    build status

    一个 RxJS web socket库,具有简单而灵活的实现。 支持浏览器和 node.js.

    与其他 RxJS web socket库的比较:

    • 观察套接字
      • 可以观察的套接字为用户提供了输入主题,RxJS允许用户提供输入流作为参数,以允许用户选择适合自己使用情况的语义。
      • 在用户可见的socket中,web socket对象必须使用和管理,RxJS web socket根据用户对可见消息的订阅迟缓地管理用户的web socket。
      • 通过观察到的套接字,web socket对象必须使用普通的事件来观察连接状态,web socket提供了连接状态,它可以作为一个可见的。
    • RxJS内置了 web socket主题。
      • 作为一个主题实现,因此缺少 RxJS web sockets和可以观察套接字提供的灵活性。
      • 不提供监视网络套接字连接状态的任何功能。

    安装

    安装依赖项:

    npm install -S rxjs-websockets# the following dependency is recommended for most usersnpm install -S queueing-subject

    简单用法

    import { QueueingSubject } from'queueing-subject'importwebsocketConnectfrom'rxjs-websockets'// this subject queues as necessary to ensure every message is deliveredconst input =newQueueingSubject<string>()// this method returns an object which contains two observablesconst { messages, connectionStatus } =websocketConnect('ws://localhost/websocket-path', input)// send data to the serverinput.next('some data')// the connectionStatus stream will provides the current number of websocket// connections immediately to each new observer and updates as it changesconst connectionStatusSubscription =connectionStatus.subscribe(numberConnected=> {
     console.log('number of connected websockets:', numberConnected)
    })// the websocket connection is created lazily when the messages observable is// subscribed toconst messagesSubscription =messages.subscribe((message:string) => {
     console.log('received message:', message)
    })// this will close the websocketmessagesSubscription.unsubscribe()// closing the websocket does not close the connection status observable, it// can be used to monitor future connection status changesconnectionStatusSubscription.unsubscribe()

    messages 是一个很冷的可以观察性,这意味着在可以观察到 messages的订阅时延迟尝试。 本库高级用户将发现了解热和冷观测非常重要,多数情况下,使用共享操作符,如下面的示例所示。

    失败时重新连接

    这可以通过内置RxJS运算符完成:

    const input =newQueueingSubject<string>()const { messages, connectionStatus } =websocketConnect(`ws://server`, input)// try to reconnect every secondmessages.retryWhen(errors=>errors.delay(1000)).subscribe(message=> {
     console.log(message)
    })

    可选的web socket实现

    可以提供一个定制的web socket工厂函数,它接受一个URL并返回一个与 web socket兼容的对象:

    const { messages } =websocketConnect(
     'ws://127.0.0.1:4201/ws',
     this.inputStream=newQueueingSubject<string>(),
     undefined,
     (url, protocols) =>newWebSocket(url, protocols)
    )

    协议

    API typings说明如何使用包括协议在内的所有功能:

    exportinterfaceConnection {
     connectionStatus:Observable<number>
     messages:Observable<string>
    }exportinterfaceIWebSocket {
     close():any send(data:string|ArrayBuffer|Blob):any onopen?: (OpenEvent) =>any onclose?: (CloseEvent) =>any onmessage?: (MessageEvent) =>any onerror?: (ErrorEvent) =>any}exportdeclaretypeWebSocketFactory= (url:string, protocols?:string|string[]) =>IWebSocketexportdefaultfunction connect(
     url:string,
     input:Observable<string>,
     protocols?:string|string[],
     websocketFactory?:WebSocketFactory):Connection

    JSON消息和响应

    这里示例演示如何使用 map 运算符来处理传出消息的JSON编码和响应解析:

    function jsonWebsocketConnect(url:string, input:Observable<object>, protocols?:string|string[]) {
     const jsonInput =input.map(message=>JSON.stringify(message))
     const { connectionStatus, messages } =websocketConnect(url, jsonInput, protocols)
     const jsonMessages =messages.map(message=>JSON.parse(message))
     return { connectionStatus, messages: jsonMessages }
    }

    Angular 4示例

    下面是一个非常简单的Angular 4服务示例,它使用 rxjs-websockets 将来自服务器的消息作为可见的信息公开,并使用一个程序API接收输入消息。 在大多数情况下,最好将输入流直接从一个或者多个源 observables。

    // file: server-socket.service.tsimport { Injectable } from'@angular/core'import { QueueingSubject } from'queueing-subject'import { Observable } from'rxjs/Observable'importwebsocketConnectfrom'rxjs-websockets'import'rxjs/add/operator/share'@Injectable()exportclassServerSocket {
     private inputStream:QueueingSubject<string>
     public messages:Observable<string>
     public connect() {
     if (this.messages)
     return// Using share() causes a single websocket to be created when the first// observer subscribes. This socket is shared with subsequent observers// and closed when the observer count falls to zero.this.messages=websocketConnect(
     'ws://127.0.0.1:4201/ws',
     this.inputStream=newQueueingSubject<string>()
     ).messages.share()
     }
     public send(message:string):void {
     // If the websocket is not connected then the QueueingSubject will ensure// that messages are queued and delivered when the websocket reconnects.// A regular Subject can be used to discard messages sent when the websocket// is disconnected.this.inputStream.next(message)
     }
    }

    这里服务可以像这样使用:

    import { Component } from'@angular/core'import { Subscription } from'rxjs/Subscription'import { ServerSocket } from'./server-socket.service'@Component({
     selector: 'socket-user',
     templateUrl: './socket-user.component.html',
     styleUrls: ['./socket-user.component.scss']
    })exportclassSocketUserComponent {
     private socketSubscription:Subscriptionconstructor(privatesocket:ServerSocket) {}
     ngOnInit() {
     this.socket.connect()
     this.socketSubscription=this.socket.messages.subscribe((message:string) => {
     console.log('received message from server: ', message)
     })
     // send message to server, if the socket is not connected it will be sent// as soon as the connection becomes available thanks to QueueingSubjectthis.socket.send('hello')
     }
     ngOnDestroy() {
     this.socketSubscription.unsubscribe()
     }
    }

    WEB  test  FLEX  Websocket  RXJS  
    相关文章