sftp file transfers

This commit is contained in:
Eugene
2024-07-20 14:28:59 +02:00
parent ec8ccb5d43
commit c2922c960b
7 changed files with 95 additions and 120 deletions

View File

@@ -63,22 +63,24 @@ export abstract class FileTransfer {
}
export abstract class FileDownload extends FileTransfer {
abstract write (buffer: Buffer): Promise<void>
abstract write (buffer: Uint8Array): Promise<void>
}
export abstract class FileUpload extends FileTransfer {
abstract read (): Promise<Buffer>
abstract read (): Promise<Uint8Array>
async readAll (): Promise<Buffer> {
const buffers: Buffer[] = []
async readAll (): Promise<Uint8Array> {
const result = new Uint8Array(this.getSize())
let pos = 0
while (true) {
const buf = await this.read()
if (!buf.length) {
break
}
buffers.push(Buffer.from(buf))
result.set(buf, pos)
pos += buf.length
}
return Buffer.concat(buffers)
return result
}
}
@@ -210,12 +212,12 @@ export class HTMLFileUpload extends FileUpload {
return this.file.size
}
async read (): Promise<Buffer> {
async read (): Promise<Uint8Array> {
const result: any = await this.reader.read()
if (result.done || !result.value) {
return Buffer.from('')
return new Uint8Array(0)
}
const chunk = Buffer.from(result.value)
const chunk = new Uint8Array(result.value)
this.increaseProgress(chunk.length)
return chunk
}

View File

@@ -306,7 +306,7 @@ export class VaultFileProvider extends FileProvider {
id,
description: `${description} (${transfer.getName()})`,
},
value: (await transfer.readAll()).toString('base64'),
value: Buffer.from(await transfer.readAll()).toString('base64'),
})
return `${this.prefix}${id}`
}

View File

@@ -263,12 +263,12 @@ class ElectronFileUpload extends FileUpload {
private size: number
private mode: number
private file: fs.FileHandle
private buffer: Buffer
private buffer: Uint8Array
private powerSaveBlocker = 0
constructor (private filePath: string, private electron: ElectronService) {
super()
this.buffer = Buffer.alloc(256 * 1024)
this.buffer = new Uint8Array(256 * 1024)
this.powerSaveBlocker = electron.powerSaveBlocker.start('prevent-app-suspension')
}
@@ -291,7 +291,7 @@ class ElectronFileUpload extends FileUpload {
return this.size
}
async read (): Promise<Buffer> {
async read (): Promise<Uint8Array> {
const result = await this.file.read(this.buffer, 0, this.buffer.length, null)
this.increaseProgress(result.bytesRead)
return this.buffer.slice(0, result.bytesRead)
@@ -333,7 +333,7 @@ class ElectronFileDownload extends FileDownload {
return this.size
}
async write (buffer: Buffer): Promise<void> {
async write (buffer: Uint8Array): Promise<void> {
let pos = 0
while (pos < buffer.length) {
const result = await this.file.write(buffer, pos, buffer.length - pos, null)

View File

@@ -49,6 +49,10 @@ export class EditSFTPContextMenu extends SFTPContextMenuItemProvider {
this.platform.openPath(tempPath)
const events = new Subject<string>()
fs.chmodSync(tempPath, 0o700)
// skip the first burst of events
setTimeout(() => {
const watcher = fs.watch(tempPath, event => events.next(event))
events.pipe(debounceTime(1000), debounce(async event => {
if (event === 'rename') {
@@ -63,5 +67,6 @@ export class EditSFTPContextMenu extends SFTPContextMenuItemProvider {
})).subscribe()
watcher.on('close', () => events.complete())
sftp.closed$.subscribe(() => watcher.close())
}, 1000)
}
}

View File

@@ -123,7 +123,7 @@ export class VaultSettingsTabComponent extends BaseComponent {
}
await this.vault.updateSecret(secret, {
...secret,
value: (await transfers[0].readAll()).toString('base64'),
value: Buffer.from(await transfers[0].readAll()).toString('base64'),
})
this.loadVault()
}

View File

@@ -18,54 +18,27 @@ export interface SFTPFile {
export class SFTPFileHandle {
position = 0
// constructor (
// private sftp: russh.SFTP,
// private handle: Buffer,
// private zone: NgZone,
// ) { }
constructor (
private inner: russh.SFTPFile|null,
) { }
read (): Promise<Buffer> {
throw new Error('Not implemented')
// const buffer = Buffer.alloc(256 * 1024)
// return wrapPromise(this.zone, new Promise((resolve, reject) => {
// while (true) {
// const wait = this.sftp.read(this.handle, buffer, 0, buffer.length, this.position, (err, read) => {
// if (err) {
// reject(err)
// return
// }
// this.position += read
// resolve(buffer.slice(0, read))
// })
// if (!wait) {
// break
// }
// }
// }))
async read (): Promise<Uint8Array> {
if (!this.inner) {
return Promise.resolve(new Uint8Array(0))
}
return this.inner.read(256 * 1024)
}
write (chunk: Buffer): Promise<void> {
throw new Error('Not implemented')
// return wrapPromise(this.zone, new Promise<void>((resolve, reject) => {
// while (true) {
// const wait = this.sftp.write(this.handle, chunk, 0, chunk.length, this.position, err => {
// if (err) {
// reject(err)
// return
// }
// this.position += chunk.length
// resolve()
// })
// if (!wait) {
// break
// }
// }
// }))
async write (chunk: Uint8Array): Promise<void> {
if (!this.inner) {
throw new Error('File handle is closed')
}
await this.inner.writeAll(chunk)
}
close (): Promise<void> {
throw new Error('Not implemented')
// return wrapPromise(this.zone, promisify(this.sftp.close.bind(this.sftp))(this.handle))
async close (): Promise<void> {
await this.inner?.shutdown()
this.inner = null
}
}
@@ -109,11 +82,10 @@ export class SFTPSession {
}
}
async open (p: string, mode: string): Promise<SFTPFileHandle> {
throw new Error('Not implemented')
// this.logger.debug('open', p)
// const handle = await wrapPromise(this.zone, promisify<Buffer>(f => this.sftp.open(p, mode, f))())
// return new SFTPFileHandle(this.sftp, handle, this.zone)
async open (p: string, mode: number): Promise<SFTPFileHandle> {
this.logger.debug('open', p, mode)
const handle = await this.sftp.open(p, mode)
return new SFTPFileHandle(handle)
}
async rmdir (p: string): Promise<void> {
@@ -139,49 +111,45 @@ export class SFTPSession {
}
async upload (path: string, transfer: FileUpload): Promise<void> {
throw new Error('Not implemented')
// this.logger.info('Uploading into', path)
// const tempPath = path + '.tabby-upload'
// try {
// const handle = await this.open(tempPath, 'w')
// while (true) {
// const chunk = await transfer.read()
// if (!chunk.length) {
// break
// }
// await handle.write(chunk)
// }
// handle.close()
// try {
// await this.unlink(path)
// } catch { }
// await this.rename(tempPath, path)
// transfer.close()
// } catch (e) {
// transfer.cancel()
// this.unlink(tempPath)
// throw e
// }
this.logger.info('Uploading into', path)
const tempPath = path + '.tabby-upload'
try {
const handle = await this.open(tempPath, russh.OPEN_WRITE | russh.OPEN_CREATE)
while (true) {
const chunk = await transfer.read()
if (!chunk.length) {
break
}
await handle.write(chunk)
}
await handle.close()
await this.unlink(path).catch(() => null)
await this.rename(tempPath, path)
transfer.close()
} catch (e) {
transfer.cancel()
this.unlink(tempPath).catch(() => null)
throw e
}
}
async download (path: string, transfer: FileDownload): Promise<void> {
throw new Error('Not implemented')
// this.logger.info('Downloading', path)
// try {
// const handle = await this.open(path, 'r')
// while (true) {
// const chunk = await handle.read()
// if (!chunk.length) {
// break
// }
// await transfer.write(chunk)
// }
// transfer.close()
// handle.close()
// } catch (e) {
// transfer.cancel()
// throw e
// }
this.logger.info('Downloading', path)
try {
const handle = await this.open(path, russh.OPEN_READ)
while (true) {
const chunk = await handle.read()
if (!chunk.length) {
break
}
await transfer.write(chunk)
}
transfer.close()
handle.close()
} catch (e) {
transfer.cancel()
throw e
}
}
private _makeFile (p: string, entry: russh.SFTPDirectoryEntry): SFTPFile {

View File

@@ -145,7 +145,7 @@ export class WebPlatformService extends PlatformService {
}
class HTMLFileDownload extends FileDownload {
private buffers: Buffer[] = []
private buffers: Uint8Array[] = []
constructor (
private name: string,
@@ -167,8 +167,8 @@ class HTMLFileDownload extends FileDownload {
return this.size
}
async write (buffer: Buffer): Promise<void> {
this.buffers.push(Buffer.from(buffer))
async write (buffer: Uint8Array): Promise<void> {
this.buffers.push(Uint8Array.from(buffer))
this.increaseProgress(buffer.length)
if (this.isComplete()) {
this.finish()