From 076b1c71299f2fdb72efd2e8281011412e13d6b1 Mon Sep 17 00:00:00 2001 From: Eugene Pankov Date: Mon, 27 Dec 2021 20:08:22 +0100 Subject: [PATCH] group various stream processors into middleware --- tabby-serial/src/api.ts | 33 +++--- tabby-telnet/src/session.ts | 55 ++++------ .../src/api/baseTerminalTab.component.ts | 2 +- tabby-terminal/src/api/middleware.ts | 101 ++++++++++++++++++ .../loginScriptsSettings.component.ts | 2 +- .../streamProcessingSettings.component.ts | 2 +- tabby-terminal/src/features/zmodem.ts | 2 +- tabby-terminal/src/index.ts | 7 +- .../loginScriptProcessing.ts | 15 +-- .../oscProcessing.ts} | 9 +- .../{api => middleware}/streamProcessing.ts | 18 ++-- tabby-terminal/src/session.ts | 46 +++++--- 12 files changed, 195 insertions(+), 97 deletions(-) create mode 100644 tabby-terminal/src/api/middleware.ts rename tabby-terminal/src/{api => middleware}/loginScriptProcessing.ts (88%) rename tabby-terminal/src/{api/osc1337Processing.ts => middleware/oscProcessing.ts} (88%) rename tabby-terminal/src/{api => middleware}/streamProcessing.ts (89%) diff --git a/tabby-serial/src/api.ts b/tabby-serial/src/api.ts index 7fba2306..d3a50dca 100644 --- a/tabby-serial/src/api.ts +++ b/tabby-serial/src/api.ts @@ -3,7 +3,7 @@ import SerialPort from 'serialport' import { LogService, NotificationsService, Profile } from 'tabby-core' import { Subject, Observable } from 'rxjs' import { Injector, NgZone } from '@angular/core' -import { BaseSession, LoginScriptsOptions, StreamProcessingOptions, TerminalStreamProcessor } from 'tabby-terminal' +import { BaseSession, LoginScriptsOptions, SessionMiddleware, StreamProcessingOptions, TerminalStreamProcessor } from 'tabby-terminal' import { SerialService } from './services/serial.service' export interface SerialProfile extends Profile { @@ -32,6 +32,14 @@ export interface SerialPortInfo { description?: string } +class SlowFeedMiddleware extends SessionMiddleware { + feedFromTerminal (data: Buffer): void { + for (const byte of data) { + this.outputToSession.next(Buffer.from([byte])) + } + } +} + export class SerialSession extends BaseSession { serial: SerialPort @@ -50,13 +58,11 @@ export class SerialSession extends BaseSession { this.notifications = injector.get(NotificationsService) this.streamProcessor = new TerminalStreamProcessor(profile.options) - this.streamProcessor.outputToSession$.subscribe(data => { - this.serial?.write(data.toString()) - }) - this.streamProcessor.outputToTerminal$.subscribe(data => { - this.emitOutput(data) - this.loginScriptProcessor?.feedFromSession(data) - }) + this.middleware.push(this.streamProcessor) + + if (this.profile.options.slowSend) { + this.middleware.unshift(new SlowFeedMiddleware()) + } this.setLoginScriptsOptions(profile.options) } @@ -110,7 +116,7 @@ export class SerialSession extends BaseSession { setTimeout(() => this.streamProcessor.start()) this.serial.on('readable', () => { - this.streamProcessor.feedFromSession(this.serial.read()) + this.emitOutput(this.serial.read()) }) this.serial.on('end', () => { @@ -124,17 +130,10 @@ export class SerialSession extends BaseSession { } write (data: Buffer): void { - if (!this.profile.options.slowSend) { - this.streamProcessor.feedFromTerminal(data) - } else { - for (const byte of data) { - this.streamProcessor.feedFromTerminal(Buffer.from([byte])) - } - } + this.serial?.write(data.toString()) } async destroy (): Promise { - this.streamProcessor.close() this.serviceMessage.complete() await super.destroy() } diff --git a/tabby-telnet/src/session.ts b/tabby-telnet/src/session.ts index aa6a2b51..a7f2b928 100644 --- a/tabby-telnet/src/session.ts +++ b/tabby-telnet/src/session.ts @@ -3,7 +3,7 @@ import colors from 'ansi-colors' import stripAnsi from 'strip-ansi' import { Injector } from '@angular/core' import { Profile, LogService } from 'tabby-core' -import { BaseSession, LoginScriptsOptions, StreamProcessingOptions, TerminalStreamProcessor } from 'tabby-terminal' +import { BaseSession, LoginScriptsOptions, SessionMiddleware, StreamProcessingOptions, TerminalStreamProcessor } from 'tabby-terminal' import { Subject, Observable } from 'rxjs' @@ -41,6 +41,21 @@ enum TelnetOptions { NEW_ENVIRON = 0x27, } +class UnescapeFFMiddleware extends SessionMiddleware { + feedFromSession (data: Buffer): void { + while (data.includes(0xff)) { + const pos = data.indexOf(0xff) + + this.outputToTerminal.next(data.slice(0, pos)) + this.outputToTerminal.next(Buffer.from([0xff, 0xff])) + + data = data.slice(pos + 1) + } + + this.outputToTerminal.next(data) + } +} + export class TelnetSession extends BaseSession { get serviceMessage$ (): Observable { return this.serviceMessage } @@ -48,7 +63,6 @@ export class TelnetSession extends BaseSession { private socket: Socket private streamProcessor: TerminalStreamProcessor private telnetProtocol = false - private echoEnabled = false private lastWidth = 0 private lastHeight = 0 private requestedOptions = new Set() @@ -59,33 +73,10 @@ export class TelnetSession extends BaseSession { ) { super(injector.get(LogService).create(`telnet-${profile.options.host}-${profile.options.port}`)) this.streamProcessor = new TerminalStreamProcessor(profile.options) - this.streamProcessor.outputToSession$.subscribe(data => { - this.socket.write(this.unescapeFF(data)) - }) - this.streamProcessor.outputToTerminal$.subscribe(data => { - this.emitOutput(data) - }) + this.middleware.push(this.streamProcessor) this.setLoginScriptsOptions(profile.options) } - unescapeFF (data: Buffer): Buffer { - if (!this.telnetProtocol) { - return data - } - const result: Buffer[] = [] - while (data.includes(0xff)) { - const pos = data.indexOf(0xff) - - result.push(data.slice(0, pos)) - result.push(Buffer.from([0xff, 0xff])) - - data = data.slice(pos + 1) - } - - result.push(data) - return Buffer.concat(result) - } - async start (): Promise { this.socket = new Socket() this.emitServiceMessage(`Connecting to ${this.profile.options.host}`) @@ -124,6 +115,7 @@ export class TelnetSession extends BaseSession { onData (data: Buffer): void { if (!this.telnetProtocol && data[0] === TelnetCommands.IAC) { this.telnetProtocol = true + this.middleware.push(new UnescapeFFMiddleware()) this.requestOption(TelnetCommands.DO, TelnetOptions.SUPPRESS_GO_AHEAD) this.emitTelnet(TelnetCommands.WILL, TelnetOptions.TERMINAL_TYPE) this.emitTelnet(TelnetCommands.WILL, TelnetOptions.NEGO_WINDOW_SIZE) @@ -131,7 +123,7 @@ export class TelnetSession extends BaseSession { if (this.telnetProtocol) { data = this.processTelnetProtocol(data) } - this.streamProcessor.feedFromSession(data) + this.emitOutput(data) } emitTelnet (command: TelnetCommands, option: TelnetOptions): void { @@ -190,7 +182,7 @@ export class TelnetSession extends BaseSession { this.emitTelnet(TelnetCommands.WILL, option) this.emitSize() } else if (option === TelnetOptions.ECHO) { - this.echoEnabled = true + this.streamProcessor.forceEcho = true this.emitTelnet(TelnetCommands.WILL, option) } else if (option === TelnetOptions.TERMINAL_TYPE) { this.emitTelnet(TelnetCommands.WILL, option) @@ -201,7 +193,7 @@ export class TelnetSession extends BaseSession { } if (command === TelnetCommands.DONT) { if (option === TelnetOptions.ECHO) { - this.echoEnabled = false + this.streamProcessor.forceEcho = false this.emitTelnet(TelnetCommands.WONT, option) } else { this.logger.debug('(!) Unhandled option') @@ -249,10 +241,7 @@ export class TelnetSession extends BaseSession { } write (data: Buffer): void { - if (this.echoEnabled) { - this.emitOutput(data) - } - this.streamProcessor.feedFromTerminal(data) + this.socket.write(data) } kill (_signal?: string): void { diff --git a/tabby-terminal/src/api/baseTerminalTab.component.ts b/tabby-terminal/src/api/baseTerminalTab.component.ts index 1ea9f4b0..38d6f116 100644 --- a/tabby-terminal/src/api/baseTerminalTab.component.ts +++ b/tabby-terminal/src/api/baseTerminalTab.component.ts @@ -389,7 +389,7 @@ export class BaseTerminalTabComponent extends BaseTabComponent implements OnInit if (!(data instanceof Buffer)) { data = Buffer.from(data, 'utf-8') } - this.session?.write(data) + this.session?.feedFromTerminal(data) if (this.config.store.terminal.scrollOnInput) { this.frontend?.scrollToBottom() } diff --git a/tabby-terminal/src/api/middleware.ts b/tabby-terminal/src/api/middleware.ts new file mode 100644 index 00000000..5edcc98b --- /dev/null +++ b/tabby-terminal/src/api/middleware.ts @@ -0,0 +1,101 @@ +import { Subject, Observable } from 'rxjs' +import { SubscriptionContainer } from 'tabby-core' + +export class SessionMiddleware { + get outputToSession$ (): Observable { return this.outputToSession } + get outputToTerminal$ (): Observable { return this.outputToTerminal } + + protected outputToSession = new Subject() + protected outputToTerminal = new Subject() + + feedFromSession (data: Buffer): void { + this.outputToTerminal.next(data) + } + + feedFromTerminal (data: Buffer): void { + this.outputToSession.next(data) + } + + close (): void { + this.outputToSession.complete() + this.outputToTerminal.complete() + } +} + +export class SesssionMiddlewareStack extends SessionMiddleware { + private stack: SessionMiddleware[] = [] + private subs = new SubscriptionContainer() + + constructor () { + super() + this.push(new SessionMiddleware()) + } + + push (middleware: SessionMiddleware): void { + this.stack.push(middleware) + this.relink() + } + + unshift (middleware: SessionMiddleware): void { + this.stack.unshift(middleware) + this.relink() + } + + remove (middleware: SessionMiddleware): void { + this.stack = this.stack.filter(m => m !== middleware) + this.relink() + } + + replace (middleware: SessionMiddleware, newMiddleware: SessionMiddleware): void { + const index = this.stack.indexOf(middleware) + if (index >= 0) { + this.stack[index].close() + this.stack[index] = newMiddleware + } else { + this.stack.push(newMiddleware) + } + this.relink() + } + + feedFromSession (data: Buffer): void { + this.stack[0].feedFromSession(data) + } + + feedFromTerminal (data: Buffer): void { + this.stack[this.stack.length - 1].feedFromTerminal(data) + } + + close (): void { + for (const m of this.stack) { + m.close() + } + this.subs.cancelAll() + super.close() + } + + private relink () { + this.subs.cancelAll() + + for (let i = 0; i < this.stack.length - 1; i++) { + this.subs.subscribe( + this.stack[i].outputToTerminal$, + x => this.stack[i + 1].feedFromSession(x) + ) + } + this.subs.subscribe( + this.stack[this.stack.length - 1].outputToTerminal$, + x => this.outputToTerminal.next(x), + ) + + for (let i = this.stack.length - 2; i >= 0; i--) { + this.subs.subscribe( + this.stack[i + 1].outputToSession$, + x => this.stack[i].feedFromTerminal(x) + ) + } + this.subs.subscribe( + this.stack[0].outputToSession$, + x => this.outputToSession.next(x), + ) + } +} diff --git a/tabby-terminal/src/components/loginScriptsSettings.component.ts b/tabby-terminal/src/components/loginScriptsSettings.component.ts index 5f610908..9736e7f9 100644 --- a/tabby-terminal/src/components/loginScriptsSettings.component.ts +++ b/tabby-terminal/src/components/loginScriptsSettings.component.ts @@ -2,7 +2,7 @@ import { Component, Input } from '@angular/core' import { PlatformService } from 'tabby-core' -import { LoginScript, LoginScriptsOptions } from '../api/loginScriptProcessing' +import { LoginScript, LoginScriptsOptions } from '../middleware/loginScriptProcessing' /** @hidden */ @Component({ diff --git a/tabby-terminal/src/components/streamProcessingSettings.component.ts b/tabby-terminal/src/components/streamProcessingSettings.component.ts index be2ff872..5eddcd6b 100644 --- a/tabby-terminal/src/components/streamProcessingSettings.component.ts +++ b/tabby-terminal/src/components/streamProcessingSettings.component.ts @@ -1,6 +1,6 @@ /* eslint-disable @typescript-eslint/explicit-module-boundary-types */ import { Component, Input } from '@angular/core' -import { StreamProcessingOptions } from '../api/streamProcessing' +import { StreamProcessingOptions } from '../middleware/streamProcessing' /** @hidden */ @Component({ diff --git a/tabby-terminal/src/features/zmodem.ts b/tabby-terminal/src/features/zmodem.ts index ad3847f5..cd5d660b 100644 --- a/tabby-terminal/src/features/zmodem.ts +++ b/tabby-terminal/src/features/zmodem.ts @@ -32,7 +32,7 @@ export class ZModemDecorator extends TerminalDecorator { terminal.write(data) } }, - sender: data => terminal.session!.write(Buffer.from(data)), + sender: data => terminal.session!.feedFromTerminal(Buffer.from(data)), on_detect: async detection => { try { terminal.enablePassthrough = false diff --git a/tabby-terminal/src/index.ts b/tabby-terminal/src/index.ts index a3fe4eab..4a8120c5 100644 --- a/tabby-terminal/src/index.ts +++ b/tabby-terminal/src/index.ts @@ -87,8 +87,9 @@ export { TerminalFrontendService, TerminalDecorator, TerminalContextMenuItemProv export { Frontend, XTermFrontend, XTermWebGLFrontend } export { BaseTerminalTabComponent } from './api/baseTerminalTab.component' export * from './api/interfaces' -export * from './api/streamProcessing' -export * from './api/loginScriptProcessing' -export * from './api/osc1337Processing' +export * from './middleware/streamProcessing' +export * from './middleware/loginScriptProcessing' +export * from './middleware/oscProcessing' +export * from './api/middleware' export * from './session' export { LoginScriptsSettingsComponent, StreamProcessingSettingsComponent } diff --git a/tabby-terminal/src/api/loginScriptProcessing.ts b/tabby-terminal/src/middleware/loginScriptProcessing.ts similarity index 88% rename from tabby-terminal/src/api/loginScriptProcessing.ts rename to tabby-terminal/src/middleware/loginScriptProcessing.ts index efd88f07..f08ba6bd 100644 --- a/tabby-terminal/src/api/loginScriptProcessing.ts +++ b/tabby-terminal/src/middleware/loginScriptProcessing.ts @@ -1,6 +1,6 @@ import deepClone from 'clone-deep' -import { Subject, Observable } from 'rxjs' import { Logger } from 'tabby-core' +import { SessionMiddleware } from '../api/middleware' export interface LoginScript { expect: string @@ -13,10 +13,7 @@ export interface LoginScriptsOptions { scripts?: LoginScript[] } -export class LoginScriptProcessor { - get outputToSession$ (): Observable { return this.outputToSession } - - private outputToSession = new Subject() +export class LoginScriptProcessor extends SessionMiddleware { private remainingScripts: LoginScript[] = [] private escapeSeqMap = { @@ -34,6 +31,7 @@ export class LoginScriptProcessor { private logger: Logger, options: LoginScriptsOptions ) { + super() this.remainingScripts = deepClone(options.scripts ?? []) for (const script of this.remainingScripts) { if (!script.isRegex) { @@ -43,10 +41,9 @@ export class LoginScriptProcessor { } } - feedFromSession (data: Buffer): boolean { + feedFromSession (data: Buffer): void { const dataString = data.toString() - let found = false for (const script of this.remainingScripts) { if (!script.expect) { continue @@ -60,14 +57,12 @@ export class LoginScriptProcessor { } if (match) { - found = true this.logger.info('Executing script:', script) this.outputToSession.next(Buffer.from(script.send + '\n')) this.remainingScripts = this.remainingScripts.filter(x => x !== script) } else { if (script.optional) { this.logger.debug('Skip optional script: ' + script.expect) - found = true this.remainingScripts = this.remainingScripts.filter(x => x !== script) } else { break @@ -75,7 +70,7 @@ export class LoginScriptProcessor { } } - return found + super.feedFromSession(data) } close (): void { diff --git a/tabby-terminal/src/api/osc1337Processing.ts b/tabby-terminal/src/middleware/oscProcessing.ts similarity index 88% rename from tabby-terminal/src/api/osc1337Processing.ts rename to tabby-terminal/src/middleware/oscProcessing.ts index 1a0aacbc..85c61633 100644 --- a/tabby-terminal/src/api/osc1337Processing.ts +++ b/tabby-terminal/src/middleware/oscProcessing.ts @@ -1,17 +1,18 @@ import * as os from 'os' import { Subject, Observable } from 'rxjs' +import { SessionMiddleware } from '../api/middleware' const OSCPrefix = Buffer.from('\x1b]') const OSCSuffix = Buffer.from('\x07') -export class OSCProcessor { +export class OSCProcessor extends SessionMiddleware { get cwdReported$ (): Observable { return this.cwdReported } get copyRequested$ (): Observable { return this.copyRequested } private cwdReported = new Subject() private copyRequested = new Subject() - process (data: Buffer): Buffer { + feedFromSession (data: Buffer): void { let startIndex = 0 while (data.includes(OSCPrefix, startIndex) && data.includes(OSCSuffix, startIndex)) { const params = data.subarray(data.indexOf(OSCPrefix, startIndex) + OSCPrefix.length) @@ -42,10 +43,12 @@ export class OSCProcessor { continue } } - return data + super.feedFromSession(data) } close (): void { this.cwdReported.complete() + this.copyRequested.complete() + super.close() } } diff --git a/tabby-terminal/src/api/streamProcessing.ts b/tabby-terminal/src/middleware/streamProcessing.ts similarity index 89% rename from tabby-terminal/src/api/streamProcessing.ts rename to tabby-terminal/src/middleware/streamProcessing.ts index 81487703..2c604949 100644 --- a/tabby-terminal/src/api/streamProcessing.ts +++ b/tabby-terminal/src/middleware/streamProcessing.ts @@ -2,9 +2,10 @@ import hexdump from 'hexer' import bufferReplace from 'buffer-replace' import colors from 'ansi-colors' import binstring from 'binstring' -import { Subject, Observable, interval, debounce } from 'rxjs' +import { interval, debounce } from 'rxjs' import { PassThrough, Readable, Writable } from 'stream' import { ReadLine, createInterface as createReadline, clearLine } from 'readline' +import { SessionMiddleware } from '../api/middleware' export type InputMode = null | 'local-echo' | 'readline' | 'readline-hex' export type OutputMode = null | 'hex' @@ -17,13 +18,8 @@ export interface StreamProcessingOptions { outputNewlines?: NewlineMode } -export class TerminalStreamProcessor { - get outputToSession$ (): Observable { return this.outputToSession } - get outputToTerminal$ (): Observable { return this.outputToTerminal } - - protected outputToSession = new Subject() - protected outputToTerminal = new Subject() - +export class TerminalStreamProcessor extends SessionMiddleware { + forceEcho = false private inputReadline: ReadLine private inputPromptVisible = false private inputReadlineInStream: Readable & Writable @@ -31,6 +27,7 @@ export class TerminalStreamProcessor { private started = false constructor (private options: StreamProcessingOptions) { + super() this.inputReadlineInStream = new PassThrough() this.inputReadlineOutStream = new PassThrough() this.inputReadlineOutStream.on('data', data => { @@ -85,7 +82,7 @@ export class TerminalStreamProcessor { } feedFromTerminal (data: Buffer): void { - if (this.options.inputMode === 'local-echo') { + if (this.options.inputMode === 'local-echo' || this.forceEcho) { this.outputToTerminal.next(this.replaceNewlines(data, 'crlf')) } if (this.options.inputMode?.startsWith('readline')) { @@ -103,8 +100,7 @@ export class TerminalStreamProcessor { close (): void { this.inputReadline.close() - this.outputToSession.complete() - this.outputToTerminal.complete() + super.close() } private onTerminalInput (data: Buffer) { diff --git a/tabby-terminal/src/session.ts b/tabby-terminal/src/session.ts index f390ae10..2d2e8726 100644 --- a/tabby-terminal/src/session.ts +++ b/tabby-terminal/src/session.ts @@ -1,7 +1,8 @@ import { Observable, Subject } from 'rxjs' import { Logger } from 'tabby-core' -import { LoginScriptProcessor, LoginScriptsOptions } from './api/loginScriptProcessing' -import { OSCProcessor } from './api/osc1337Processing' +import { LoginScriptProcessor, LoginScriptsOptions } from './middleware/loginScriptProcessing' +import { OSCProcessor } from './middleware/oscProcessing' +import { SesssionMiddlewareStack } from './api/middleware' /** * A session object for a [[BaseTerminalTabComponent]] @@ -11,6 +12,7 @@ export abstract class BaseSession { open: boolean truePID?: number oscProcessor = new OSCProcessor() + protected readonly middleware = new SesssionMiddlewareStack() protected output = new Subject() protected binaryOutput = new Subject() protected closed = new Subject() @@ -26,20 +28,29 @@ export abstract class BaseSession { get destroyed$ (): Observable { return this.destroyed } constructor (protected logger: Logger) { + this.middleware.push(this.oscProcessor) this.oscProcessor.cwdReported$.subscribe(cwd => { this.reportedCWD = cwd }) + + this.middleware.outputToTerminal$.subscribe(data => { + if (!this.initialDataBufferReleased) { + this.initialDataBuffer = Buffer.concat([this.initialDataBuffer, data]) + } else { + this.output.next(data.toString()) + this.binaryOutput.next(data) + } + }) + + this.middleware.outputToSession$.subscribe(data => this.write(data)) } - emitOutput (data: Buffer): void { - data = this.oscProcessor.process(data) - if (!this.initialDataBufferReleased) { - this.initialDataBuffer = Buffer.concat([this.initialDataBuffer, data]) - } else { - this.output.next(data.toString()) - this.binaryOutput.next(data) - this.loginScriptProcessor?.feedFromSession(data) - } + feedFromTerminal (data: Buffer): void { + this.middleware.feedFromTerminal(data) + } + + protected emitOutput (data: Buffer): void { + this.middleware.feedFromSession(data) } releaseInitialDataBuffer (): void { @@ -50,21 +61,24 @@ export abstract class BaseSession { } setLoginScriptsOptions (options: LoginScriptsOptions): void { - this.loginScriptProcessor?.close() - this.loginScriptProcessor = new LoginScriptProcessor(this.logger, options) - this.loginScriptProcessor.outputToSession$.subscribe(data => this.write(data)) + const newProcessor = new LoginScriptProcessor(this.logger, options) + if (this.loginScriptProcessor) { + this.middleware.replace(this.loginScriptProcessor, newProcessor) + } else { + this.middleware.push(newProcessor) + } + this.loginScriptProcessor = newProcessor } async destroy (): Promise { if (this.open) { this.logger.info('Destroying') this.open = false - this.loginScriptProcessor?.close() this.closed.next() this.destroyed.next() await this.gracefullyKillProcess() } - this.oscProcessor.close() + this.middleware.close() this.closed.complete() this.destroyed.complete() this.output.complete()