@@ -3,6 +3,9 @@ use assert_fs::prelude::*;
3
3
use encrypt:: header:: { PREFIX , PREFIX_SIZE } ;
4
4
use std:: path:: Path ;
5
5
use std:: process:: { Child , Command , Output } ;
6
+ use std:: sync:: { Arc , Mutex } ;
7
+ use std:: time:: Duration ;
8
+ use uuid:: Uuid ;
6
9
7
10
const PASSWORD : & ' static str = "plop" ;
8
11
const SALT : & ' static str = "12345678901234567890123456789012" ;
@@ -235,6 +238,113 @@ fn end_to_end_upload_and_download() {
235
238
temp. close ( ) . unwrap ( ) ;
236
239
}
237
240
241
+ #[ test]
242
+ fn concurent_uploads ( ) {
243
+ /*
244
+ This test:
245
+ - spawns a node server that stores uploaded files in tests/fixtures/server-static/uploads/
246
+ - spawns a ds proxy that uses the node proxy as a storage backend
247
+ - attempts to store many files concurently files, while the server has a high latency
248
+
249
+ For large number of threads, you may need to increase the open files limit before this works.
250
+ For instance:
251
+ ulimit -n 2048
252
+ */
253
+
254
+ const THREADS_COUNT : u16 = 10 ;
255
+ const DELAY_BETWEEN_TREADS : Duration = Duration :: from_millis ( 10 ) ;
256
+ const SERVER_LATENCY : Duration = Duration :: from_millis ( 100 ) ;
257
+
258
+ let mut proxy_server = launch_proxy ( PrintServerLogs :: No ) ;
259
+ let mut node_server = launch_node_with_latency ( Some ( SERVER_LATENCY ) , PrintServerLogs :: No ) ;
260
+ thread:: sleep ( Duration :: from_secs ( 1 ) ) ;
261
+
262
+ // Spawn threads (with a slight delay between each)
263
+ let mut child_threads = vec ! [ ] ;
264
+ let counter = Arc :: new ( Mutex :: new ( 0 ) ) ;
265
+
266
+ for _ in 0 ..THREADS_COUNT {
267
+ thread:: sleep ( DELAY_BETWEEN_TREADS ) ;
268
+
269
+ let counter = Arc :: clone ( & counter) ;
270
+
271
+ let name = format ! ( "thread {}" , child_threads. len( ) ) ;
272
+ let child_thread = thread:: Builder :: new ( ) . name ( name) . spawn ( move || {
273
+ {
274
+ let mut threads_count = counter. lock ( ) . unwrap ( ) ;
275
+ * threads_count += 1 ;
276
+ println ! ( "Number of threads: {}" , threads_count) ;
277
+ }
278
+
279
+ let original_path = "tests/fixtures/computer.svg" ;
280
+ let original_bytes = std:: fs:: read ( original_path) . unwrap ( ) ;
281
+
282
+ let stored_filename = Uuid :: new_v4 ( ) ;
283
+ let uploaded_path = format ! ( "tests/fixtures/server-static/uploads/{}" , stored_filename) ;
284
+
285
+ let temp = assert_fs:: TempDir :: new ( ) . unwrap ( ) ;
286
+ let decrypted_file = temp. child ( "computer.dec.svg" ) ;
287
+ let decrypted_path = decrypted_file. path ( ) ;
288
+
289
+ let curl_upload = curl_put ( original_path, & format ! ( "localhost:4444/{}" , stored_filename) ) ;
290
+ if !curl_upload. status . success ( ) {
291
+ panic ! ( "unable to upload file!" ) ;
292
+ }
293
+
294
+ let curl_upload = curl_put ( original_path, & format ! ( "localhost:4444/{}" , stored_filename) ) ;
295
+ if !curl_upload. status . success ( ) {
296
+ panic ! ( "unable to upload file!" ) ;
297
+ }
298
+
299
+ let uploaded_bytes = std:: fs:: read ( & uploaded_path) . expect ( "uploaded should exist !" ) ;
300
+ assert ! ( uploaded_bytes. len( ) > 0 ) ;
301
+ assert_eq ! ( & uploaded_bytes[ 0 ..PREFIX_SIZE ] , PREFIX ) ;
302
+
303
+ decrypt ( & uploaded_path, decrypted_path) ;
304
+ let decrypted_bytes = std:: fs:: read ( decrypted_path) . unwrap ( ) ;
305
+ assert_eq ! ( original_bytes. len( ) , decrypted_bytes. len( ) ) ;
306
+ assert_eq ! ( original_bytes, decrypted_bytes) ;
307
+
308
+ let curl_download = curl_get ( & format ! ( "localhost:4444/{}" , stored_filename) ) ;
309
+ assert_eq ! ( curl_download. stdout. len( ) , original_bytes. len( ) ) ;
310
+ assert_eq ! ( curl_download. stdout, original_bytes) ;
311
+
312
+ let curl_socket_download = curl_socket_get ( & format ! ( "localhost:4444/{}" , stored_filename) ) ;
313
+ assert_eq ! ( curl_socket_download. stdout. len( ) , original_bytes. len( ) ) ;
314
+ assert_eq ! ( curl_socket_download. stdout, original_bytes) ;
315
+
316
+ let curl_chunked_download = curl_get ( & format ! ( "localhost:4444/chunked/{}" , stored_filename) ) ;
317
+ assert_eq ! ( curl_chunked_download. stdout. len( ) , original_bytes. len( ) ) ;
318
+ assert_eq ! ( curl_chunked_download. stdout, original_bytes) ;
319
+
320
+ // Cleanup
321
+ temp. close ( ) . unwrap ( ) ;
322
+ std:: fs:: remove_file ( & uploaded_path)
323
+ . expect ( & format ! ( "Unable to remove uploaded file{}!" , uploaded_path) ) ;
324
+
325
+ {
326
+ let mut threads_count = counter. lock ( ) . unwrap ( ) ;
327
+ * threads_count -= 1 ;
328
+ println ! ( "Number of threads: {}" , threads_count) ;
329
+ }
330
+ } ) . unwrap ( ) ;
331
+ child_threads. push ( child_thread) ;
332
+ }
333
+
334
+ // Wait for all threads to have successfully finished
335
+ // (or panic if a child thread panicked.)
336
+ for child_thread in child_threads {
337
+ child_thread. join ( )
338
+ . expect ( "A child thread panicked" ) ;
339
+ }
340
+
341
+ proxy_server. child
342
+ . kill ( )
343
+ . expect ( "killing the proxy server should succeed !" ) ;
344
+ node_server. child
345
+ . kill ( )
346
+ . expect ( "killing node's upload server should succeed !" ) ;
347
+ }
238
348
239
349
//
240
350
// Test helpers
0 commit comments