Reconnecting a websocket in Angular and rxjs?
Actually, there is now a WebSocketSubject in rxjs!
    import { webSocket } from 'rxjs/webSocket'; // for RxJS 6; for v5 use Observable.webSocket

    let subject = webSocket('ws://localhost:8081');

    subject.subscribe(
      (msg) => console.log('message received: ' + msg),
      (err) => console.log(err),
      () => console.log('complete')
    );

    // In RxJS 6 the subject JSON-serializes outgoing values by default,
    // so pass the object itself rather than a pre-stringified string
    subject.next({ op: 'hello' });
It reconnects when you resubscribe after the connection has broken. For example, to reconnect automatically:
subject.retry().subscribe(...)
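Note that retry() without arguments resubscribes immediately and without limit, which can hammer a server that is down. A common remedy is to wait a little longer before each attempt. The function below is a minimal sketch of such a capped exponential backoff; reconnectDelay and its default values are hypothetical names and numbers chosen for illustration, not part of rxjs.

```typescript
// Hypothetical helper (not part of rxjs): how long to wait before the
// n-th reconnect attempt. `attempt` is zero-based; the delay doubles on
// every attempt and never exceeds `maxMs`.
function reconnectDelay(attempt: number, baseMs = 1000, maxMs = 30000): number {
  // Guard against negative or fractional attempt numbers.
  const safeAttempt = Math.max(0, Math.floor(attempt));
  return Math.min(maxMs, baseMs * Math.pow(2, safeAttempt));
}

// Delays for the first six attempts with the default settings.
const delays = [0, 1, 2, 3, 4, 5].map((n) => reconnectDelay(n));
console.log(delays); // [1000, 2000, 4000, 8000, 16000, 30000]
```

One possible way to wire this into RxJS 6 is retryWhen(errors => errors.pipe(concatMap((_, i) => timer(reconnectDelay(i))))), which waits before each resubscription. Treat that wiring as a sketch and adjust it to your rxjs version.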
See the docs for more info. Unfortunately the search box doesn't list the method, but you can find it here:
http://reactivex.io/rxjs/class/es6/Observable.js~Observable.html#static-method-webSocket
The # anchor navigation doesn't work in my browser, so search for "webSocket" on that page.
Source: http://reactivex.io/rxjs/file/es6/observable/dom/WebSocketSubject.js.html#lineNumber15
This might not be the right answer, but it's too long for a comment.
The problem might come from your service:
    listenToTheSocket(): Observable<any> {
      this.websocket = new WebSocket(this.destination);

      this.websocket.onopen = () => {
        console.log("WebService Connected to " + this.destination);
      }

      return Observable.create(observer => {
        this.websocket.onmessage = (evt) => {
          observer.next(evt);
        };
      })
      .map(res => res.data)
      .share();
    }
Is it possible that ngOnInit in your component runs more than once? Put a console.log in ngOnInit to check. If it does, each call into the service overwrites this.websocket with a new socket.
You should try something like this instead:
    import { Injectable } from '@angular/core';
    import { BehaviorSubject, Observable } from 'rxjs';

    @Injectable()
    export class MemberService {
      private websocket: any;
      // BehaviorSubject requires an initial value; null means "no message yet"
      private websocketSubject$ = new BehaviorSubject<any>(null);
      private websocket$ = this.websocketSubject$.asObservable();
      private destination = 'wss://notessensei.mybluemix.net/ws/time';

      listenToTheSocket(): Observable<any> {
        // Open the socket only once; later callers share the same stream
        if (!this.websocket) {
          this.websocket = new WebSocket(this.destination);
          this.websocket.onopen = () => console.log(`WebService Connected to ${this.destination}`);
          // Push the message payload (not a function) into the subject
          this.websocket.onmessage = (res) => this.websocketSubject$.next(res.data);
        }
        return this.websocket$;
      }
    }
The BehaviorSubject replays its latest value to each new subscriber, so a message that arrived before you subscribed is not lost. And because it is a subject, there is no need for the share operator.
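To make that replay behaviour concrete, here is a small standalone sketch. The messages$ name and the string payloads are made up for illustration, and the import path assumes RxJS 6.

```typescript
import { BehaviorSubject } from 'rxjs';

// Stand-in for the service's subject; null means "no message yet".
const messages$ = new BehaviorSubject<string | null>(null);

// A message arrives before anyone has subscribed.
messages$.next('first');

// A late subscriber immediately receives the latest value,
// then every value emitted afterwards.
const received: Array<string | null> = [];
const subscription = messages$.subscribe((msg) => received.push(msg));

messages$.next('second');
subscription.unsubscribe();

console.log(received); // ['first', 'second']
```

Had nothing been emitted before the subscription, the subscriber would first receive the initial null, so consumers may want to filter that out.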
For an RxJS 6 implementation:
    import { webSocket } from 'rxjs/webSocket';
    import { retry } from 'rxjs/operators';

    let subject = webSocket('ws://localhost:8081');

    subject.pipe(
      retry() // support auto reconnect
    ).subscribe(...)