13
13
import threading
14
14
import time
15
15
16
+ from types import ModuleType
17
+ from typing import Iterable , Optional
18
+
16
19
from flaky import flaky
17
20
import pytest
18
21
44
47
greenlet = None
45
48
46
49
47
- def measurable_line (l ) :
50
+ def measurable_line (l : str ) -> bool :
48
51
"""Is this a line of code coverage will measure?
49
52
50
53
Not blank, not a comment, and not "else"
@@ -59,12 +62,12 @@ def measurable_line(l):
59
62
return True
60
63
61
64
62
- def line_count (s ) :
65
+ def line_count (s : str ) -> int :
63
66
"""How many measurable lines are in `s`?"""
64
67
return len (list (filter (measurable_line , s .splitlines ())))
65
68
66
69
67
- def print_simple_annotation (code , linenos ) :
70
+ def print_simple_annotation (code : str , linenos : Iterable [ int ]) -> None :
68
71
"""Print the lines in `code` with X for each line number in `linenos`."""
69
72
for lineno , line in enumerate (code .splitlines (), start = 1 ):
70
73
print (" {} {}" .format ("X" if lineno in linenos else " " , line ))
@@ -75,7 +78,7 @@ class LineCountTest(CoverageTest):
75
78
76
79
run_in_temp_dir = False
77
80
78
- def test_line_count (self ):
81
+ def test_line_count (self ) -> None :
79
82
CODE = """
80
83
# Hey there!
81
84
x = 1
@@ -169,7 +172,7 @@ def sum_range(limit):
169
172
"""
170
173
171
174
172
- def cant_trace_msg (concurrency , the_module ) :
175
+ def cant_trace_msg (concurrency : str , the_module : Optional [ ModuleType ]) -> Optional [ str ] :
173
176
"""What might coverage.py say about a concurrency setting and imported module?"""
174
177
# In the concurrency choices, "multiprocessing" doesn't count, so remove it.
175
178
if "multiprocessing" in concurrency :
@@ -197,7 +200,13 @@ class ConcurrencyTest(CoverageTest):
197
200
198
201
QLIMIT = 1000
199
202
200
- def try_some_code (self , code , concurrency , the_module , expected_out = None ):
203
+ def try_some_code (
204
+ self ,
205
+ code : str ,
206
+ concurrency : str ,
207
+ the_module : ModuleType ,
208
+ expected_out : Optional [str ]= None ,
209
+ ) -> None :
201
210
"""Run some concurrency testing code and see that it was all covered.
202
211
203
212
`code` is the Python code to execute. `concurrency` is the name of
@@ -232,39 +241,40 @@ def try_some_code(self, code, concurrency, the_module, expected_out=None):
232
241
# If the test fails, it's helpful to see this info:
233
242
fname = abs_file ("try_it.py" )
234
243
linenos = data .lines (fname )
244
+ assert linenos is not None
235
245
print (f"{ len (linenos )} : { linenos } " )
236
246
print_simple_annotation (code , linenos )
237
247
238
248
lines = line_count (code )
239
249
assert line_counts (data )['try_it.py' ] == lines
240
250
241
- def test_threads (self ):
251
+ def test_threads (self ) -> None :
242
252
code = (THREAD + SUM_RANGE_Q + PRINT_SUM_RANGE ).format (QLIMIT = self .QLIMIT )
243
253
self .try_some_code (code , "thread" , threading )
244
254
245
- def test_threads_simple_code (self ):
255
+ def test_threads_simple_code (self ) -> None :
246
256
code = SIMPLE .format (QLIMIT = self .QLIMIT )
247
257
self .try_some_code (code , "thread" , threading )
248
258
249
- def test_eventlet (self ):
259
+ def test_eventlet (self ) -> None :
250
260
code = (EVENTLET + SUM_RANGE_Q + PRINT_SUM_RANGE ).format (QLIMIT = self .QLIMIT )
251
261
self .try_some_code (code , "eventlet" , eventlet )
252
262
253
- def test_eventlet_simple_code (self ):
263
+ def test_eventlet_simple_code (self ) -> None :
254
264
code = SIMPLE .format (QLIMIT = self .QLIMIT )
255
265
self .try_some_code (code , "eventlet" , eventlet )
256
266
257
267
# https://github.com/nedbat/coveragepy/issues/663
258
268
@pytest .mark .skipif (env .WINDOWS , reason = "gevent has problems on Windows: #663" )
259
- def test_gevent (self ):
269
+ def test_gevent (self ) -> None :
260
270
code = (GEVENT + SUM_RANGE_Q + PRINT_SUM_RANGE ).format (QLIMIT = self .QLIMIT )
261
271
self .try_some_code (code , "gevent" , gevent )
262
272
263
- def test_gevent_simple_code (self ):
273
+ def test_gevent_simple_code (self ) -> None :
264
274
code = SIMPLE .format (QLIMIT = self .QLIMIT )
265
275
self .try_some_code (code , "gevent" , gevent )
266
276
267
- def test_greenlet (self ):
277
+ def test_greenlet (self ) -> None :
268
278
GREENLET = """\
269
279
from greenlet import greenlet
270
280
@@ -282,11 +292,11 @@ def test2(u):
282
292
"""
283
293
self .try_some_code (GREENLET , "greenlet" , greenlet , "hello world\n 42\n " )
284
294
285
- def test_greenlet_simple_code (self ):
295
+ def test_greenlet_simple_code (self ) -> None :
286
296
code = SIMPLE .format (QLIMIT = self .QLIMIT )
287
297
self .try_some_code (code , "greenlet" , greenlet )
288
298
289
- def test_bug_330 (self ):
299
+ def test_bug_330 (self ) -> None :
290
300
BUG_330 = """\
291
301
from weakref import WeakKeyDictionary
292
302
import eventlet
@@ -304,7 +314,7 @@ def do():
304
314
"""
305
315
self .try_some_code (BUG_330 , "eventlet" , eventlet , "0\n " )
306
316
307
- def test_threads_with_gevent (self ):
317
+ def test_threads_with_gevent (self ) -> None :
308
318
self .make_file ("both.py" , """\
309
319
import queue
310
320
import threading
@@ -345,25 +355,25 @@ def gwork(q):
345
355
last_line = self .squeezed_lines (out )[- 1 ]
346
356
assert re .search (r"TOTAL \d+ 0 100%" , last_line )
347
357
348
- def test_bad_concurrency (self ):
358
+ def test_bad_concurrency (self ) -> None :
349
359
with pytest .raises (ConfigError , match = "Unknown concurrency choices: nothing" ):
350
360
self .command_line ("run --concurrency=nothing prog.py" )
351
361
352
- def test_bad_concurrency_in_config (self ):
362
+ def test_bad_concurrency_in_config (self ) -> None :
353
363
self .make_file (".coveragerc" , "[run]\n concurrency = nothing\n " )
354
364
with pytest .raises (ConfigError , match = "Unknown concurrency choices: nothing" ):
355
365
self .command_line ("run prog.py" )
356
366
357
- def test_no_multiple_light_concurrency (self ):
367
+ def test_no_multiple_light_concurrency (self ) -> None :
358
368
with pytest .raises (ConfigError , match = "Conflicting concurrency settings: eventlet, gevent" ):
359
369
self .command_line ("run --concurrency=gevent,eventlet prog.py" )
360
370
361
- def test_no_multiple_light_concurrency_in_config (self ):
371
+ def test_no_multiple_light_concurrency_in_config (self ) -> None :
362
372
self .make_file (".coveragerc" , "[run]\n concurrency = gevent, eventlet\n " )
363
373
with pytest .raises (ConfigError , match = "Conflicting concurrency settings: eventlet, gevent" ):
364
374
self .command_line ("run prog.py" )
365
375
366
- def test_multiprocessing_needs_config_file (self ):
376
+ def test_multiprocessing_needs_config_file (self ) -> None :
367
377
with pytest .raises (ConfigError , match = "multiprocessing requires a configuration file" ):
368
378
self .command_line ("run --concurrency=multiprocessing prog.py" )
369
379
@@ -372,9 +382,9 @@ class WithoutConcurrencyModuleTest(CoverageTest):
372
382
"""Tests of what happens if the requested concurrency isn't installed."""
373
383
374
384
@pytest .mark .parametrize ("module" , ["eventlet" , "gevent" , "greenlet" ])
375
- def test_missing_module (self , module ) :
385
+ def test_missing_module (self , module : str ) -> None :
376
386
self .make_file ("prog.py" , "a = 1" )
377
- sys .modules [module ] = None
387
+ sys .modules [module ] = None # type: ignore[assignment]
378
388
msg = f"Couldn't trace with concurrency={ module } , the module isn't installed."
379
389
with pytest .raises (ConfigError , match = msg ):
380
390
self .command_line (f"run --concurrency={ module } prog.py" )
@@ -428,9 +438,9 @@ def process_worker_main(args):
428
438
429
439
430
440
@pytest .fixture (params = ["fork" , "spawn" ], name = "start_method" )
431
- def start_method_fixture (request ) :
441
+ def start_method_fixture (request : pytest . FixtureRequest ) -> str :
432
442
"""Parameterized fixture to choose the start_method for multiprocessing."""
433
- start_method = request .param
443
+ start_method : str = request .param
434
444
if start_method not in multiprocessing .get_all_start_methods ():
435
445
# Windows doesn't support "fork".
436
446
pytest .skip (f"start_method={ start_method } not supported here" )
@@ -443,14 +453,14 @@ class MultiprocessingTest(CoverageTest):
443
453
444
454
def try_multiprocessing_code (
445
455
self ,
446
- code ,
447
- expected_out ,
448
- the_module ,
449
- nprocs ,
450
- start_method ,
451
- concurrency = "multiprocessing" ,
452
- args = "" ,
453
- ):
456
+ code : str ,
457
+ expected_out : Optional [ str ] ,
458
+ the_module : ModuleType ,
459
+ nprocs : int ,
460
+ start_method : str ,
461
+ concurrency : str = "multiprocessing" ,
462
+ args : str = "" ,
463
+ ) -> None :
454
464
"""Run code using multiprocessing, it should produce `expected_out`."""
455
465
self .make_file ("multi.py" , code )
456
466
self .make_file (".coveragerc" , f"""\
@@ -459,9 +469,7 @@ def try_multiprocessing_code(
459
469
source = .
460
470
""" )
461
471
462
- cmd = "coverage run {args} multi.py {start_method}" .format (
463
- args = args , start_method = start_method ,
464
- )
472
+ cmd = f"coverage run { args } multi.py { start_method } "
465
473
out = self .run_command (cmd )
466
474
expected_cant_trace = cant_trace_msg (concurrency , the_module )
467
475
@@ -489,7 +497,7 @@ def try_multiprocessing_code(
489
497
last_line = self .squeezed_lines (out )[- 1 ]
490
498
assert re .search (r"TOTAL \d+ 0 100%" , last_line )
491
499
492
- def test_multiprocessing_simple (self , start_method ) :
500
+ def test_multiprocessing_simple (self , start_method : str ) -> None :
493
501
nprocs = 3
494
502
upto = 30
495
503
code = (SQUARE_OR_CUBE_WORK + MULTI_CODE ).format (NPROCS = nprocs , UPTO = upto )
@@ -503,7 +511,7 @@ def test_multiprocessing_simple(self, start_method):
503
511
start_method = start_method ,
504
512
)
505
513
506
- def test_multiprocessing_append (self , start_method ) :
514
+ def test_multiprocessing_append (self , start_method : str ) -> None :
507
515
nprocs = 3
508
516
upto = 30
509
517
code = (SQUARE_OR_CUBE_WORK + MULTI_CODE ).format (NPROCS = nprocs , UPTO = upto )
@@ -518,7 +526,7 @@ def test_multiprocessing_append(self, start_method):
518
526
start_method = start_method ,
519
527
)
520
528
521
- def test_multiprocessing_and_gevent (self , start_method ) :
529
+ def test_multiprocessing_and_gevent (self , start_method : str ) -> None :
522
530
nprocs = 3
523
531
upto = 30
524
532
code = (
@@ -535,7 +543,7 @@ def test_multiprocessing_and_gevent(self, start_method):
535
543
start_method = start_method ,
536
544
)
537
545
538
- def test_multiprocessing_with_branching (self , start_method ) :
546
+ def test_multiprocessing_with_branching (self , start_method : str ) -> None :
539
547
nprocs = 3
540
548
upto = 30
541
549
code = (SQUARE_OR_CUBE_WORK + MULTI_CODE ).format (NPROCS = nprocs , UPTO = upto )
@@ -559,7 +567,7 @@ def test_multiprocessing_with_branching(self, start_method):
559
567
last_line = self .squeezed_lines (out )[- 1 ]
560
568
assert re .search (r"TOTAL \d+ 0 \d+ 0 100%" , last_line )
561
569
562
- def test_multiprocessing_bootstrap_error_handling (self ):
570
+ def test_multiprocessing_bootstrap_error_handling (self ) -> None :
563
571
# An exception during bootstrapping will be reported.
564
572
self .make_file ("multi.py" , """\
565
573
import multiprocessing
@@ -576,7 +584,7 @@ def test_multiprocessing_bootstrap_error_handling(self):
576
584
assert "Exception during multiprocessing bootstrap init" in out
577
585
assert "Exception: Crashing because called by _bootstrap" in out
578
586
579
- def test_bug_890 (self ):
587
+ def test_bug_890 (self ) -> None :
580
588
# chdir in multiprocessing shouldn't keep us from finding the
581
589
# .coveragerc file.
582
590
self .make_file ("multi.py" , """\
@@ -596,11 +604,11 @@ def test_bug_890(self):
596
604
assert out .splitlines ()[- 1 ] == "ok"
597
605
598
606
599
- def test_coverage_stop_in_threads ():
607
+ def test_coverage_stop_in_threads () -> None :
600
608
has_started_coverage = []
601
609
has_stopped_coverage = []
602
610
603
- def run_thread (): # pragma: nested
611
+ def run_thread () -> None : # pragma: nested
604
612
"""Check that coverage is stopping properly in threads."""
605
613
deadline = time .time () + 5
606
614
ident = threading .current_thread ().ident
@@ -648,7 +656,7 @@ def test_thread_safe_save_data(tmp_path: pathlib.Path) -> None:
648
656
for module_name in module_names :
649
657
import_local_file (module_name )
650
658
651
- def random_load (): # pragma: nested
659
+ def random_load () -> None : # pragma: nested
652
660
"""Import modules randomly to stress coverage."""
653
661
while should_run [0 ]:
654
662
module_name = random .choice (module_names )
@@ -695,7 +703,7 @@ class SigtermTest(CoverageTest):
695
703
"""Tests of our handling of SIGTERM."""
696
704
697
705
@pytest .mark .parametrize ("sigterm" , [False , True ])
698
- def test_sigterm_saves_data (self , sigterm ) :
706
+ def test_sigterm_saves_data (self , sigterm : bool ) -> None :
699
707
# A terminated process should save its coverage data.
700
708
self .make_file ("clobbered.py" , """\
701
709
import multiprocessing
@@ -741,7 +749,7 @@ def subproc(x):
741
749
expected = "clobbered.py 17 5 71% 5-10"
742
750
assert self .squeezed_lines (out )[2 ] == expected
743
751
744
- def test_sigterm_still_runs (self ):
752
+ def test_sigterm_still_runs (self ) -> None :
745
753
# A terminated process still runs its own SIGTERM handler.
746
754
self .make_file ("handler.py" , """\
747
755
import multiprocessing
0 commit comments