@@ -15,7 +15,7 @@ const DEFAULT_WAIT_FOR_TIMEOUT = 1000
15
15
*
16
16
* @returns A stream.
17
17
*/
18
- function sink ( { destroyOnError = false , emitErrorEvent = false } = { } ) {
18
+ function sink ( { destroyOnError = false , emitErrorEvent = false } = { } ) {
19
19
const stream = split ( ( data ) => {
20
20
try {
21
21
return JSON . parse ( data )
@@ -37,7 +37,7 @@ function sink ({ destroyOnError = false, emitErrorEvent = false } = {}) {
37
37
*
38
38
* @throws If the expected value is not equal to the chunk value.
39
39
*/
40
- function check ( chunk , expectedOrCallback , assert ) {
40
+ function check ( chunk , expectedOrCallback , assert ) {
41
41
const { time, pid, hostname, ...chunkCopy } = chunk
42
42
43
43
nodeAssert . strictEqual ( new Date ( time ) <= new Date ( ) , true , 'time is greater than Date.now()' )
@@ -77,7 +77,7 @@ function check (chunk, expectedOrCallback, assert) {
77
77
* assert.strictEqual(log.msg, 'hello world 1')
78
78
* })
79
79
*/
80
- async function once ( stream , expectedOrCallback , assert = nodeAssert . deepStrictEqual ) {
80
+ async function once ( stream , expectedOrCallback , assert = nodeAssert . deepStrictEqual ) {
81
81
return new Promise ( ( resolve , reject ) => {
82
82
const dataHandler = ( data ) => {
83
83
stream . removeListener ( 'error' , reject )
@@ -118,7 +118,7 @@ async function once (stream, expectedOrCallback, assert = nodeAssert.deepStrictE
118
118
* ]
119
119
* await pinoTest.consecutive(stream, expecteds)
120
120
*/
121
- async function consecutive ( stream , expectedOrCallbacks , assert = nodeAssert . deepStrictEqual ) {
121
+ async function consecutive ( stream , expectedOrCallbacks , assert = nodeAssert . deepStrictEqual ) {
122
122
let i = 0
123
123
for await ( const chunk of stream ) {
124
124
check ( chunk , expectedOrCallbacks [ i ] , assert )
@@ -136,9 +136,11 @@ async function consecutive (stream, expectedOrCallbacks, assert = nodeAssert.dee
136
136
*
137
137
* @param {import('node:stream').Transform } stream The stream to be tested.
138
138
* @param {Array<object | Function> } expectedsOrCallbacks The array of expected values to be tested or callback functions.
139
- * @param {Function } [assert=nodeAssert.deepStrictEqual] The assert function to be used when the expectedOrCallback parameter is an object.
140
- * @param {Number } [timeout=1000] The time in milliseconds to wait for the expected value.
141
- *
139
+ * @param {Function } [assert=nodeAssert.partialDeepStrictEqual] The assert function to be used when the expectedOrCallback parameter is an object.
140
+ * @param {Object } [options] The options to be used.
141
+ * @param {Number } [options.timeout=1000] The time in milliseconds to wait for the expected value.
142
+ * @param {Number } [options.maxMessages=100] The maximum number of messages to wait for.
143
+ * @param {Boolean } [options.debug=false] If true, the stream will be logged to the console.
142
144
* @returns A promise that resolves when the expected value is equal to the stream value.
143
145
* @throws If the expected value is not equal to the stream value.
144
146
* @throws If the callback function throws an error.
@@ -154,8 +156,37 @@ async function consecutive (stream, expectedOrCallbacks, assert = nodeAssert.dee
154
156
*
155
157
* await pinoTest.waitFor(stream, { msg: 'server started', level: 30 })
156
158
*/
157
- async function waitFor ( stream , expectedOrCallbacks , assert = nodeAssert . deepStrictEqual , timeout = DEFAULT_WAIT_FOR_TIMEOUT ) {
158
- // TODO implementation
159
+ async function waitFor ( stream , expectedOrCallbacks , assert = nodeAssert . partialDeepStrictEqual , { maxMessages = DEFAULT_MAX_MESSAGES , timeout = DEFAULT_WAIT_FOR_TIMEOUT , debug = false } = { } ) {
160
+ return new Promise ( ( resolve , reject ) => {
161
+ let count = 0
162
+ const fn = ( message ) => {
163
+ if ( debug ) {
164
+ console . log ( 'waitFor received' , message )
165
+ }
166
+
167
+ if ( check ( message , expectedOrCallbacks , assert ) ) {
168
+ stream . off ( 'data' , fn )
169
+ stream . off ( 'error' , reject )
170
+ resolve ( )
171
+ }
172
+ count ++
173
+ if ( count > maxMessages ) {
174
+ stream . off ( 'data' , fn )
175
+ stream . off ( 'error' , reject )
176
+ const identifier = typeof expectedOrCallbacks !== 'function' ? expectedOrCallbacks . msg : '[function]'
177
+ reject ( new Error ( `Max message count reached on waitFor: ${ identifier } ` ) )
178
+ }
179
+ }
180
+ stream . on ( 'data' , fn )
181
+ stream . on ( 'error' , reject )
182
+
183
+ setTimeout ( ( ) => {
184
+ stream . off ( 'data' , fn )
185
+ stream . off ( 'error' , reject )
186
+ const identifier = typeof expectedOrCallbacks !== 'function' ? expectedOrCallbacks . msg : '[function]'
187
+ reject ( new Error ( `Timeout on waitFor: ${ identifier } ` ) )
188
+ } , timeout )
189
+ } )
159
190
}
160
191
161
- module . exports = { sink, once, consecutive }
192
+ module . exports = { sink, once, consecutive, waitFor }
0 commit comments