rxjs cleanup

This commit is contained in:
Eugene Pankov
2018-08-09 12:37:14 -07:00
parent 9e228a4e93
commit 23e93f0969
10 changed files with 105 additions and 70 deletions

View File

@@ -20,32 +20,33 @@ export abstract class BaseSession {
name: string
recoveryId: string
truePID: number
output$: Observable<string>
closed$: Observable<void>
destroyed$: Observable<void>
protected output_ = new Subject<string>()
protected closed_ = new Subject<void>()
protected destroyed_ = new Subject<void>()
protected output = new Subject<string>()
protected closed = new Subject<void>()
protected destroyed = new Subject<void>()
private initialDataBuffer = ''
private initialDataBufferReleased = false
get output$ (): Observable<string> { return this.output }
get closed$ (): Observable<void> { return this.closed }
get destroyed$ (): Observable<void> { return this.destroyed }
constructor () {
this.output$ = this.output_.asObservable()
this.closed$ = this.closed_.asObservable()
this.destroyed$ = this.destroyed_.asObservable()
this.output$ = this.output.asObservable()
this.closed$ = this.closed.asObservable()
this.destroyed$ = this.destroyed.asObservable()
}
emitOutput (data: string) {
if (!this.initialDataBufferReleased) {
this.initialDataBuffer += data
} else {
this.output_.next(data)
this.output.next(data)
}
}
releaseInitialDataBuffer () {
this.initialDataBufferReleased = true
this.output_.next(this.initialDataBuffer)
this.output.next(this.initialDataBuffer)
this.initialDataBuffer = null
}
@@ -60,9 +61,9 @@ export abstract class BaseSession {
async destroy (): Promise<void> {
if (this.open) {
this.open = false
this.closed_.next()
this.destroyed_.next()
this.output_.complete()
this.closed.next()
this.destroyed.next()
this.output.complete()
await this.gracefullyKillProcess()
}
}