Skip to content
This repository was archived by the owner on Jun 26, 2023. It is now read-only.

fix: allow stream methods to be async #404

Merged
merged 2 commits into from
May 17, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 53 additions & 13 deletions packages/interface-stream-muxer/src/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ export interface AbstractStreamInit {
onEnd?: (err?: Error | undefined) => void
}

function isPromise (res?: any): res is Promise<void> {
return res != null && typeof res.then === 'function'
}

export abstract class AbstractStream implements Stream {
public id: string
public stat: StreamStat
Expand Down Expand Up @@ -82,7 +86,13 @@ export abstract class AbstractStream implements Stream {
onEnd: () => {
// already sent a reset message
if (this.stat.timeline.reset !== null) {
this.sendCloseRead()
const res = this.sendCloseRead()

if (isPromise(res)) {
res.catch(err => {
log.error('error while sending close read', err)
})
}
}

this.onSourceEnd()
Expand Down Expand Up @@ -169,7 +179,13 @@ export abstract class AbstractStream implements Stream {
try {
// need to call this here as the sink method returns in the catch block
// when the close controller is aborted
this.sendCloseWrite()
const res = this.sendCloseWrite()

if (isPromise(res)) {
res.catch(err => {
log.error('error while sending close write', err)
})
}
} catch (err) {
log.trace('%s stream %s error sending close', this.stat.direction, this.id, err)
}
Expand Down Expand Up @@ -215,17 +231,31 @@ export abstract class AbstractStream implements Stream {
source = abortableSource(source, signal)

if (this.stat.direction === 'outbound') { // If initiator, open a new stream
this.sendNewStream()
const res = this.sendNewStream()

if (isPromise(res)) {
await res
}
}

for await (let data of source) {
while (data.length > 0) {
if (data.length <= this.maxDataSize) {
this.sendData(data instanceof Uint8Array ? new Uint8ArrayList(data) : data)
const res = this.sendData(data instanceof Uint8Array ? new Uint8ArrayList(data) : data)

if (isPromise(res)) { // eslint-disable-line max-depth
await res
}

break
}
data = data instanceof Uint8Array ? new Uint8ArrayList(data) : data
this.sendData(data.sublist(0, this.maxDataSize))
const res = this.sendData(data.sublist(0, this.maxDataSize))

if (isPromise(res)) {
await res
}

data.consume(this.maxDataSize)
}
}
Expand All @@ -252,7 +282,12 @@ export abstract class AbstractStream implements Stream {
} else {
log.trace('%s stream %s error', this.stat.direction, this.id, err)
try {
this.sendReset()
const res = this.sendReset()

if (isPromise(res)) {
await res
}

this.stat.timeline.reset = Date.now()
} catch (err) {
log.trace('%s stream %s error sending reset', this.stat.direction, this.id, err)
Expand All @@ -261,13 +296,18 @@ export abstract class AbstractStream implements Stream {

this.streamSource.end(err)
this.onSinkEnd(err)
return

throw err
} finally {
signal.clear()
}

try {
this.sendCloseWrite()
const res = this.sendCloseWrite()

if (isPromise(res)) {
await res
}
} catch (err) {
log.trace('%s stream %s error sending close', this.stat.direction, this.id, err)
}
Expand Down Expand Up @@ -295,27 +335,27 @@ export abstract class AbstractStream implements Stream {
* Send a message to the remote muxer informing them a new stream is being
* opened
*/
abstract sendNewStream (): void
abstract sendNewStream (): void | Promise<void>

/**
* Send a data message to the remote muxer
*/
abstract sendData (buf: Uint8ArrayList): void
abstract sendData (buf: Uint8ArrayList): void | Promise<void>

/**
* Send a reset message to the remote muxer
*/
abstract sendReset (): void
abstract sendReset (): void | Promise<void>

/**
* Send a message to the remote muxer, informing them no more data messages
* will be sent by this end of the stream
*/
abstract sendCloseWrite (): void
abstract sendCloseWrite (): void | Promise<void>

/**
* Send a message to the remote muxer, informing them no more data messages
* will be read by this end of the stream
*/
abstract sendCloseRead (): void
abstract sendCloseRead (): void | Promise<void>
}