Skip to content

Commit 2b08f9d

Browse files
committed
Implemented writev.
1 parent dc39271 commit 2b08f9d

File tree

3 files changed

+129
-6
lines changed

3 files changed

+129
-6
lines changed

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
"dependencies": {
2626
"duplexify": "^3.2.0",
2727
"inherits": "^2.0.1",
28-
"through2": "^2.0.0",
28+
"readable-stream": "^2.2.0",
2929
"ws": "^2.2.3",
3030
"xtend": "^4.0.0"
3131
},

stream.js

Lines changed: 50 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,41 @@
11
'use strict'
22

3-
var through = require('through2')
3+
var Transform = require('readable-stream').Transform
44
var duplexify = require('duplexify')
55
var WS = require('ws')
66

77
module.exports = WebSocketStream
88

9+
function buildProxy (options, socketWrite, socketEnd) {
10+
var proxy = new Transform({
11+
objectMode: options.objectMode
12+
})
13+
14+
proxy._destroyed = false
15+
proxy._write = socketWrite
16+
proxy._flush = socketEnd
17+
18+
proxy.destroy = function(err) {
19+
if (this._destroyed) return
20+
this._destroyed = true
21+
22+
var self = this
23+
process.nextTick(function() {
24+
if (err)
25+
self.emit('error', err)
26+
self.emit('close')
27+
})
28+
}
29+
30+
return proxy
31+
}
32+
933
function WebSocketStream(target, protocols, options) {
1034
var stream, socket
1135

1236
var isBrowser = process.title === 'browser'
1337
var isNative = !!global.WebSocket
1438
var socketWrite = isBrowser ? socketWriteBrowser : socketWriteNode
15-
var proxy = through.obj(socketWrite, socketEnd)
1639

1740
if (protocols && !Array.isArray(protocols) && 'object' === typeof protocols) {
1841
// accept the "options" Object as the 2nd argument
@@ -26,6 +49,16 @@ function WebSocketStream(target, protocols, options) {
2649

2750
if (!options) options = {}
2851

52+
if (options.objectMode === undefined) {
53+
options.objectMode = !(options.binary === true || options.binary === undefined)
54+
}
55+
56+
var proxy = buildProxy(options, socketWrite, socketEnd)
57+
58+
if (!options.objectMode) {
59+
proxy._writev = writev
60+
}
61+
2962
// browser only: sets the maximum socket buffer size before throttling
3063
var bufferSize = options.browserBufferSize || 1024 * 512
3164

@@ -64,7 +97,7 @@ function WebSocketStream(target, protocols, options) {
6497

6598
proxy.on('close', destroy)
6699

67-
var coerceToBuffer = options.binary || options.binary === undefined
100+
var coerceToBuffer = !options.objectMode
68101

69102
function socketWriteNode(chunk, enc, next) {
70103
// avoid errors, this never happens unless
@@ -130,5 +163,19 @@ function WebSocketStream(target, protocols, options) {
130163
socket.close()
131164
}
132165

166+
// this is to be enabled only if objectMode is false
167+
function writev (chunks, cb) {
168+
var buffers = new Array(chunks.length)
169+
for (var i = 0; i < chunks.length; i++) {
170+
if (typeof chunks[i].chunk === 'string') {
171+
buffers[i] = new Buffer(chunks[i], 'utf8') // TODO use safe-buffer
172+
} else {
173+
buffers[i] = chunks[i].chunk
174+
}
175+
}
176+
177+
this._write(Buffer.concat(buffers), 'binary', cb)
178+
}
179+
133180
return stream
134181
}

test.js

Lines changed: 78 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ var concat = require('concat-stream')
88
test('echo server', function(t) {
99

1010
echo.start(function() {
11-
var client = websocket(echo.url, echo.options)
11+
var client = websocket(echo.url)
1212

1313
client.on('error', console.error)
1414

@@ -29,7 +29,7 @@ test('echo server', function(t) {
2929
test('emitting not connected errors', function(t) {
3030

3131
echo.start(function() {
32-
var client = websocket(echo.url, echo.options)
32+
var client = websocket(echo.url)
3333

3434
client.on('error', function() {
3535
echo.stop(function() {
@@ -225,3 +225,79 @@ test('stream handlers should fire once per connection', function(t) {
225225
w.end('pizza cats\n')
226226
})
227227
})
228+
229+
test('client with writev', function(t) {
230+
var server = http.createServer()
231+
232+
var str = ''
233+
var wss = websocket.createServer({
234+
server: server
235+
}, function (stream) {
236+
stream.once('data', function(data) {
237+
t.ok(Buffer.isBuffer(data), 'is a buffer')
238+
t.equal(data.toString(), 'hello world')
239+
240+
stream.once('data', function(data) {
241+
t.ok(Buffer.isBuffer(data), 'is a buffer')
242+
t.equal(data.toString(), str)
243+
stream.end()
244+
server.close()
245+
t.end()
246+
})
247+
})
248+
})
249+
250+
server.listen(8352, function () {
251+
var client = websocket('ws://localhost:8352', {
252+
objectMode: false
253+
})
254+
255+
client.on('error', console.error)
256+
257+
client.once('connect', function () {
258+
client.cork()
259+
do {
260+
str += 'foobar'
261+
} while (client.write('foobar'))
262+
client.uncork()
263+
})
264+
265+
client.write('hello world')
266+
})
267+
})
268+
269+
test('server with writev', function(t) {
270+
var server = http.createServer()
271+
272+
var str = ''
273+
var wss = websocket.createServer({
274+
server: server,
275+
objectMode: false
276+
}, function (stream) {
277+
stream.cork()
278+
do {
279+
str += 'foobar'
280+
} while (stream.write('foobar'))
281+
stream.uncork()
282+
})
283+
284+
server.listen(8352, function () {
285+
var client = websocket('ws://localhost:8352')
286+
287+
client.on('error', console.error)
288+
289+
client.once('data', function(data) {
290+
t.ok(Buffer.isBuffer(data), 'is a buffer')
291+
t.equal(data.toString(), str)
292+
client.end()
293+
server.close()
294+
t.end()
295+
})
296+
})
297+
})
298+
299+
test('stop echo', function(t) {
300+
echo.stop(function() {
301+
t.end()
302+
})
303+
})

0 commit comments

Comments
 (0)