Skip to content
This repository was archived by the owner on Nov 23, 2017. It is now read-only.
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 79 additions & 48 deletions asyncio/selector_events.py
Original file line number Diff line number Diff line change
@@ -11,6 +11,7 @@
import functools
import socket
import warnings
import weakref
try:
import ssl
except ImportError: # pragma: no cover
@@ -64,6 +65,7 @@ def __init__(self, selector=None):
logger.debug('Using selector: %s', selector.__class__.__name__)
self._selector = selector
self._make_self_pipe()
self._transports = weakref.WeakValueDictionary()

def _make_socket_transport(self, sock, protocol, waiter=None, *,
extra=None, server=None):
@@ -115,7 +117,7 @@ def _socketpair(self):
raise NotImplementedError

def _close_self_pipe(self):
self.remove_reader(self._ssock.fileno())
self._remove_reader(self._ssock.fileno())
self._ssock.close()
self._ssock = None
self._csock.close()
@@ -128,7 +130,7 @@ def _make_self_pipe(self):
self._ssock.setblocking(False)
self._csock.setblocking(False)
self._internal_fds += 1
self.add_reader(self._ssock.fileno(), self._read_from_self)
self._add_reader(self._ssock.fileno(), self._read_from_self)

def _process_self_data(self, data):
pass
@@ -163,8 +165,8 @@ def _write_to_self(self):

def _start_serving(self, protocol_factory, sock,
sslcontext=None, server=None, backlog=100):
self.add_reader(sock.fileno(), self._accept_connection,
protocol_factory, sock, sslcontext, server, backlog)
self._add_reader(sock.fileno(), self._accept_connection,
protocol_factory, sock, sslcontext, server, backlog)

def _accept_connection(self, protocol_factory, sock,
sslcontext=None, server=None, backlog=100):
@@ -194,7 +196,7 @@ def _accept_connection(self, protocol_factory, sock,
'exception': exc,
'socket': sock,
})
self.remove_reader(sock.fileno())
self._remove_reader(sock.fileno())
self.call_later(constants.ACCEPT_RETRY_DELAY,
self._start_serving,
protocol_factory, sock, sslcontext, server,
@@ -244,8 +246,18 @@ def _accept_connection2(self, protocol_factory, conn, extra,
context['transport'] = transport
self.call_exception_handler(context)

def add_reader(self, fd, callback, *args):
"""Add a reader callback."""
def _ensure_fd_no_transport(self, fd):
try:
transport = self._transports[fd]
except KeyError:
pass
else:
if not transport.is_closing():
raise RuntimeError(
'File descriptor {!r} is used by transport {!r}'.format(
fd, transport))

def _add_reader(self, fd, callback, *args):
self._check_closed()
handle = events.Handle(callback, args, self)
try:
@@ -260,8 +272,7 @@ def add_reader(self, fd, callback, *args):
if reader is not None:
reader.cancel()

def remove_reader(self, fd):
"""Remove a reader callback."""
def _remove_reader(self, fd):
if self.is_closed():
return False
try:
@@ -282,8 +293,7 @@ def remove_reader(self, fd):
else:
return False

def add_writer(self, fd, callback, *args):
"""Add a writer callback.."""
def _add_writer(self, fd, callback, *args):
self._check_closed()
handle = events.Handle(callback, args, self)
try:
@@ -298,7 +308,7 @@ def add_writer(self, fd, callback, *args):
if writer is not None:
writer.cancel()

def remove_writer(self, fd):
def _remove_writer(self, fd):
"""Remove a writer callback."""
if self.is_closed():
return False
@@ -321,6 +331,26 @@ def remove_writer(self, fd):
else:
return False

def add_reader(self, fd, callback, *args):
"""Add a reader callback."""
self._ensure_fd_no_transport(fd)
return self._add_reader(fd, callback, *args)

def remove_reader(self, fd):
"""Remove a reader callback."""
self._ensure_fd_no_transport(fd)
return self._remove_reader(fd)

def add_writer(self, fd, callback, *args):
"""Add a writer callback.."""
self._ensure_fd_no_transport(fd)
return self._add_writer(fd, callback, *args)

def remove_writer(self, fd):
"""Remove a writer callback."""
self._ensure_fd_no_transport(fd)
return self._remove_writer(fd)

def sock_recv(self, sock, n):
"""Receive data from the socket.
@@ -494,17 +524,17 @@ def _process_events(self, event_list):
fileobj, (reader, writer) = key.fileobj, key.data
if mask & selectors.EVENT_READ and reader is not None:
if reader._cancelled:
self.remove_reader(fileobj)
self._remove_reader(fileobj)
else:
self._add_callback(reader)
if mask & selectors.EVENT_WRITE and writer is not None:
if writer._cancelled:
self.remove_writer(fileobj)
self._remove_writer(fileobj)
else:
self._add_callback(writer)

def _stop_serving(self, sock):
self.remove_reader(sock.fileno())
self._remove_reader(sock.fileno())
sock.close()


@@ -539,6 +569,7 @@ def __init__(self, loop, sock, protocol, extra=None, server=None):
self._closing = False # Set when close() called.
if self._server is not None:
self._server._attach()
loop._transports[self._sock_fd] = self

def __repr__(self):
info = [self.__class__.__name__]
@@ -584,10 +615,10 @@ def close(self):
if self._closing:
return
self._closing = True
self._loop.remove_reader(self._sock_fd)
self._loop._remove_reader(self._sock_fd)
if not self._buffer:
self._conn_lost += 1
self._loop.remove_writer(self._sock_fd)
self._loop._remove_writer(self._sock_fd)
self._loop.call_soon(self._call_connection_lost, None)

# On Python 3.3 and older, objects with a destructor part of a reference
@@ -618,10 +649,10 @@ def _force_close(self, exc):
return
if self._buffer:
self._buffer.clear()
self._loop.remove_writer(self._sock_fd)
self._loop._remove_writer(self._sock_fd)
if not self._closing:
self._closing = True
self._loop.remove_reader(self._sock_fd)
self._loop._remove_reader(self._sock_fd)
self._conn_lost += 1
self._loop.call_soon(self._call_connection_lost, exc)

@@ -658,7 +689,7 @@ def __init__(self, loop, sock, protocol, waiter=None,

self._loop.call_soon(self._protocol.connection_made, self)
# only start reading when connection_made() has been called
self._loop.call_soon(self._loop.add_reader,
self._loop.call_soon(self._loop._add_reader,
self._sock_fd, self._read_ready)
if waiter is not None:
# only wake up the waiter when connection_made() has been called
@@ -671,7 +702,7 @@ def pause_reading(self):
if self._paused:
raise RuntimeError('Already paused')
self._paused = True
self._loop.remove_reader(self._sock_fd)
self._loop._remove_reader(self._sock_fd)
if self._loop.get_debug():
logger.debug("%r pauses reading", self)

@@ -681,7 +712,7 @@ def resume_reading(self):
self._paused = False
if self._closing:
return
self._loop.add_reader(self._sock_fd, self._read_ready)
self._loop._add_reader(self._sock_fd, self._read_ready)
if self._loop.get_debug():
logger.debug("%r resumes reading", self)

@@ -705,7 +736,7 @@ def _read_ready(self):
# We're keeping the connection open so the
# protocol can write more, but we still can't
# receive more, so remove the reader callback.
self._loop.remove_reader(self._sock_fd)
self._loop._remove_reader(self._sock_fd)
else:
self.close()

@@ -738,7 +769,7 @@ def write(self, data):
if not data:
return
# Not all was written; register write handler.
self._loop.add_writer(self._sock_fd, self._write_ready)
self._loop._add_writer(self._sock_fd, self._write_ready)

# Add it to the buffer.
self._buffer.extend(data)
@@ -754,15 +785,15 @@ def _write_ready(self):
except (BlockingIOError, InterruptedError):
pass
except Exception as exc:
self._loop.remove_writer(self._sock_fd)
self._loop._remove_writer(self._sock_fd)
self._buffer.clear()
self._fatal_error(exc, 'Fatal write error on socket transport')
else:
if n:
del self._buffer[:n]
self._maybe_resume_protocol() # May append to buffer.
if not self._buffer:
self._loop.remove_writer(self._sock_fd)
self._loop._remove_writer(self._sock_fd)
if self._closing:
self._call_connection_lost(None)
elif self._eof:
@@ -833,28 +864,28 @@ def _on_handshake(self, start_time):
try:
self._sock.do_handshake()
except ssl.SSLWantReadError:
self._loop.add_reader(self._sock_fd,
self._on_handshake, start_time)
self._loop._add_reader(self._sock_fd,
self._on_handshake, start_time)
return
except ssl.SSLWantWriteError:
self._loop.add_writer(self._sock_fd,
self._on_handshake, start_time)
self._loop._add_writer(self._sock_fd,
self._on_handshake, start_time)
return
except BaseException as exc:
if self._loop.get_debug():
logger.warning("%r: SSL handshake failed",
self, exc_info=True)
self._loop.remove_reader(self._sock_fd)
self._loop.remove_writer(self._sock_fd)
self._loop._remove_reader(self._sock_fd)
self._loop._remove_writer(self._sock_fd)
self._sock.close()
self._wakeup_waiter(exc)
if isinstance(exc, Exception):
return
else:
raise

self._loop.remove_reader(self._sock_fd)
self._loop.remove_writer(self._sock_fd)
self._loop._remove_reader(self._sock_fd)
self._loop._remove_writer(self._sock_fd)

peercert = self._sock.getpeercert()
if not hasattr(self._sslcontext, 'check_hostname'):
@@ -882,7 +913,7 @@ def _on_handshake(self, start_time):

self._read_wants_write = False
self._write_wants_read = False
self._loop.add_reader(self._sock_fd, self._read_ready)
self._loop._add_reader(self._sock_fd, self._read_ready)
self._protocol_connected = True
self._loop.call_soon(self._protocol.connection_made, self)
# only wake up the waiter when connection_made() has been called
@@ -904,7 +935,7 @@ def pause_reading(self):
if self._paused:
raise RuntimeError('Already paused')
self._paused = True
self._loop.remove_reader(self._sock_fd)
self._loop._remove_reader(self._sock_fd)
if self._loop.get_debug():
logger.debug("%r pauses reading", self)

@@ -914,7 +945,7 @@ def resume_reading(self):
self._paused = False
if self._closing:
return
self._loop.add_reader(self._sock_fd, self._read_ready)
self._loop._add_reader(self._sock_fd, self._read_ready)
if self._loop.get_debug():
logger.debug("%r resumes reading", self)

@@ -926,16 +957,16 @@ def _read_ready(self):
self._write_ready()

if self._buffer:
self._loop.add_writer(self._sock_fd, self._write_ready)
self._loop._add_writer(self._sock_fd, self._write_ready)

try:
data = self._sock.recv(self.max_size)
except (BlockingIOError, InterruptedError, ssl.SSLWantReadError):
pass
except ssl.SSLWantWriteError:
self._read_wants_write = True
self._loop.remove_reader(self._sock_fd)
self._loop.add_writer(self._sock_fd, self._write_ready)
self._loop._remove_reader(self._sock_fd)
self._loop._add_writer(self._sock_fd, self._write_ready)
except Exception as exc:
self._fatal_error(exc, 'Fatal read error on SSL transport')
else:
@@ -960,7 +991,7 @@ def _write_ready(self):
self._read_ready()

if not (self._paused or self._closing):
self._loop.add_reader(self._sock_fd, self._read_ready)
self._loop._add_reader(self._sock_fd, self._read_ready)

if self._buffer:
try:
@@ -969,10 +1000,10 @@ def _write_ready(self):
n = 0
except ssl.SSLWantReadError:
n = 0
self._loop.remove_writer(self._sock_fd)
self._loop._remove_writer(self._sock_fd)
self._write_wants_read = True
except Exception as exc:
self._loop.remove_writer(self._sock_fd)
self._loop._remove_writer(self._sock_fd)
self._buffer.clear()
self._fatal_error(exc, 'Fatal write error on SSL transport')
return
@@ -983,7 +1014,7 @@ def _write_ready(self):
self._maybe_resume_protocol() # May append to buffer.

if not self._buffer:
self._loop.remove_writer(self._sock_fd)
self._loop._remove_writer(self._sock_fd)
if self._closing:
self._call_connection_lost(None)

@@ -1001,7 +1032,7 @@ def write(self, data):
return

if not self._buffer:
self._loop.add_writer(self._sock_fd, self._write_ready)
self._loop._add_writer(self._sock_fd, self._write_ready)

# Add it to the buffer.
self._buffer.extend(data)
@@ -1021,7 +1052,7 @@ def __init__(self, loop, sock, protocol, address=None,
self._address = address
self._loop.call_soon(self._protocol.connection_made, self)
# only start reading when connection_made() has been called
self._loop.call_soon(self._loop.add_reader,
self._loop.call_soon(self._loop._add_reader,
self._sock_fd, self._read_ready)
if waiter is not None:
# only wake up the waiter when connection_made() has been called
@@ -1071,7 +1102,7 @@ def sendto(self, data, addr=None):
self._sock.sendto(data, addr)
return
except (BlockingIOError, InterruptedError):
self._loop.add_writer(self._sock_fd, self._sendto_ready)
self._loop._add_writer(self._sock_fd, self._sendto_ready)
except OSError as exc:
self._protocol.error_received(exc)
return
@@ -1105,6 +1136,6 @@ def _sendto_ready(self):

self._maybe_resume_protocol() # May append to buffer.
if not self._buffer:
self._loop.remove_writer(self._sock_fd)
self._loop._remove_writer(self._sock_fd)
if self._closing:
self._call_connection_lost(None)
42 changes: 38 additions & 4 deletions asyncio/test_utils.py
Original file line number Diff line number Diff line change
@@ -13,6 +13,8 @@
import threading
import time
import unittest
import weakref

from unittest import mock

from http.server import HTTPServer
@@ -300,6 +302,8 @@ def gen():
self.writers = {}
self.reset_counters()

self._transports = weakref.WeakValueDictionary()

def time(self):
return self._time

@@ -318,10 +322,10 @@ def close(self):
else: # pragma: no cover
raise AssertionError("Time generator is not finished")

def add_reader(self, fd, callback, *args):
def _add_reader(self, fd, callback, *args):
self.readers[fd] = events.Handle(callback, args, self)

def remove_reader(self, fd):
def _remove_reader(self, fd):
self.remove_reader_count[fd] += 1
if fd in self.readers:
del self.readers[fd]
@@ -337,10 +341,10 @@ def assert_reader(self, fd, callback, *args):
assert handle._args == args, '{!r} != {!r}'.format(
handle._args, args)

def add_writer(self, fd, callback, *args):
def _add_writer(self, fd, callback, *args):
self.writers[fd] = events.Handle(callback, args, self)

def remove_writer(self, fd):
def _remove_writer(self, fd):
self.remove_writer_count[fd] += 1
if fd in self.writers:
del self.writers[fd]
@@ -356,6 +360,36 @@ def assert_writer(self, fd, callback, *args):
assert handle._args == args, '{!r} != {!r}'.format(
handle._args, args)

def _ensure_fd_no_transport(self, fd):
try:
transport = self._transports[fd]
except KeyError:
pass
else:
raise RuntimeError(
'File descriptor {!r} is used by transport {!r}'.format(
fd, transport))

def add_reader(self, fd, callback, *args):
"""Add a reader callback."""
self._ensure_fd_no_transport(fd)
return self._add_reader(fd, callback, *args)

def remove_reader(self, fd):
"""Remove a reader callback."""
self._ensure_fd_no_transport(fd)
return self._remove_reader(fd)

def add_writer(self, fd, callback, *args):
"""Add a writer callback.."""
self._ensure_fd_no_transport(fd)
return self._add_writer(fd, callback, *args)

def remove_writer(self, fd):
"""Remove a writer callback."""
self._ensure_fd_no_transport(fd)
return self._remove_writer(fd)

def reset_counters(self):
self.remove_reader_count = collections.defaultdict(int)
self.remove_writer_count = collections.defaultdict(int)
26 changes: 13 additions & 13 deletions asyncio/unix_events.py
Original file line number Diff line number Diff line change
@@ -321,7 +321,7 @@ def __init__(self, loop, pipe, protocol, waiter=None, extra=None):

self._loop.call_soon(self._protocol.connection_made, self)
# only start reading when connection_made() has been called
self._loop.call_soon(self._loop.add_reader,
self._loop.call_soon(self._loop._add_reader,
self._fileno, self._read_ready)
if waiter is not None:
# only wake up the waiter when connection_made() has been called
@@ -364,15 +364,15 @@ def _read_ready(self):
if self._loop.get_debug():
logger.info("%r was closed by peer", self)
self._closing = True
self._loop.remove_reader(self._fileno)
self._loop._remove_reader(self._fileno)
self._loop.call_soon(self._protocol.eof_received)
self._loop.call_soon(self._call_connection_lost, None)

def pause_reading(self):
self._loop.remove_reader(self._fileno)
self._loop._remove_reader(self._fileno)

def resume_reading(self):
self._loop.add_reader(self._fileno, self._read_ready)
self._loop._add_reader(self._fileno, self._read_ready)

def set_protocol(self, protocol):
self._protocol = protocol
@@ -412,7 +412,7 @@ def _fatal_error(self, exc, message='Fatal error on pipe transport'):

def _close(self, exc):
self._closing = True
self._loop.remove_reader(self._fileno)
self._loop._remove_reader(self._fileno)
self._loop.call_soon(self._call_connection_lost, exc)

def _call_connection_lost(self, exc):
@@ -457,7 +457,7 @@ def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
# works for pipes and sockets. (Exception: OS X 10.4? Issue #19294.)
if is_socket or (is_fifo and not sys.platform.startswith("aix")):
# only start reading when connection_made() has been called
self._loop.call_soon(self._loop.add_reader,
self._loop.call_soon(self._loop._add_reader,
self._fileno, self._read_ready)

if waiter is not None:
@@ -530,7 +530,7 @@ def write(self, data):
return
elif n > 0:
data = memoryview(data)[n:]
self._loop.add_writer(self._fileno, self._write_ready)
self._loop._add_writer(self._fileno, self._write_ready)

self._buffer += data
self._maybe_pause_protocol()
@@ -547,15 +547,15 @@ def _write_ready(self):
self._conn_lost += 1
# Remove writer here, _fatal_error() doesn't it
# because _buffer is empty.
self._loop.remove_writer(self._fileno)
self._loop._remove_writer(self._fileno)
self._fatal_error(exc, 'Fatal write error on pipe transport')
else:
if n == len(self._buffer):
self._buffer.clear()
self._loop.remove_writer(self._fileno)
self._loop._remove_writer(self._fileno)
self._maybe_resume_protocol() # May append to buffer.
if self._closing:
self._loop.remove_reader(self._fileno)
self._loop._remove_reader(self._fileno)
self._call_connection_lost(None)
return
elif n > 0:
@@ -570,7 +570,7 @@ def write_eof(self):
assert self._pipe
self._closing = True
if not self._buffer:
self._loop.remove_reader(self._fileno)
self._loop._remove_reader(self._fileno)
self._loop.call_soon(self._call_connection_lost, None)

def set_protocol(self, protocol):
@@ -616,9 +616,9 @@ def _fatal_error(self, exc, message='Fatal error on pipe transport'):
def _close(self, exc=None):
self._closing = True
if self._buffer:
self._loop.remove_writer(self._fileno)
self._loop._remove_writer(self._fileno)
self._buffer.clear()
self._loop.remove_reader(self._fileno)
self._loop._remove_reader(self._fileno)
self._loop.call_soon(self._call_connection_lost, exc)

def _call_connection_lost(self, exc):
16 changes: 8 additions & 8 deletions tests/test_base_events.py
Original file line number Diff line number Diff line change
@@ -1168,10 +1168,10 @@ def _test_create_connection_ip_addr(self, m_socket, allow_inet_pton):
m_socket.getaddrinfo = socket.getaddrinfo
sock = m_socket.socket.return_value

self.loop.add_reader = mock.Mock()
self.loop.add_reader._is_coroutine = False
self.loop.add_writer = mock.Mock()
self.loop.add_writer._is_coroutine = False
self.loop._add_reader = mock.Mock()
self.loop._add_reader._is_coroutine = False
self.loop._add_writer = mock.Mock()
self.loop._add_writer._is_coroutine = False

coro = self.loop.create_connection(asyncio.Protocol, '1.2.3.4', 80)
t, p = self.loop.run_until_complete(coro)
@@ -1603,8 +1603,8 @@ def getaddrinfo(*args, **kw):

m_socket.getaddrinfo = getaddrinfo
m_socket.socket.return_value.bind = bind = mock.Mock()
self.loop.add_reader = mock.Mock()
self.loop.add_reader._is_coroutine = False
self.loop._add_reader = mock.Mock()
self.loop._add_reader._is_coroutine = False

reuseport_supported = hasattr(socket, 'SO_REUSEPORT')
coro = self.loop.create_datagram_endpoint(
@@ -1635,13 +1635,13 @@ def test_accept_connection_exception(self, m_log):
sock = mock.Mock()
sock.fileno.return_value = 10
sock.accept.side_effect = OSError(errno.EMFILE, 'Too many open files')
self.loop.remove_reader = mock.Mock()
self.loop._remove_reader = mock.Mock()
self.loop.call_later = mock.Mock()

self.loop._accept_connection(MyProto, sock)
self.assertTrue(m_log.error.called)
self.assertFalse(sock.close.called)
self.loop.remove_reader.assert_called_with(10)
self.loop._remove_reader.assert_called_with(10)
self.loop.call_later.assert_called_with(constants.ACCEPT_RETRY_DELAY,
# self.loop._start_serving
mock.ANY,
64 changes: 32 additions & 32 deletions tests/test_selector_events.py
Original file line number Diff line number Diff line change
@@ -74,11 +74,11 @@ def test_make_socket_transport(self):
@unittest.skipIf(ssl is None, 'No ssl module')
def test_make_ssl_transport(self):
m = mock.Mock()
self.loop.add_reader = mock.Mock()
self.loop.add_reader._is_coroutine = False
self.loop.add_writer = mock.Mock()
self.loop.remove_reader = mock.Mock()
self.loop.remove_writer = mock.Mock()
self.loop._add_reader = mock.Mock()
self.loop._add_reader._is_coroutine = False
self.loop._add_writer = mock.Mock()
self.loop._remove_reader = mock.Mock()
self.loop._remove_writer = mock.Mock()
waiter = asyncio.Future(loop=self.loop)
with test_utils.disable_logger():
transport = self.loop._make_ssl_transport(
@@ -121,7 +121,7 @@ def _make_self_pipe(self):
ssock.fileno.return_value = 7
csock = self.loop._csock
csock.fileno.return_value = 1
remove_reader = self.loop.remove_reader = mock.Mock()
remove_reader = self.loop._remove_reader = mock.Mock()

self.loop._selector.close()
self.loop._selector = selector = mock.Mock()
@@ -653,12 +653,12 @@ def test_process_events_read_cancelled(self):
reader = mock.Mock()
reader.cancelled = True

self.loop.remove_reader = mock.Mock()
self.loop._remove_reader = mock.Mock()
self.loop._process_events(
[(selectors.SelectorKey(
1, 1, selectors.EVENT_READ, (reader, None)),
selectors.EVENT_READ)])
self.loop.remove_reader.assert_called_with(1)
self.loop._remove_reader.assert_called_with(1)

def test_process_events_write(self):
writer = mock.Mock()
@@ -674,13 +674,13 @@ def test_process_events_write(self):
def test_process_events_write_cancelled(self):
writer = mock.Mock()
writer.cancelled = True
self.loop.remove_writer = mock.Mock()
self.loop._remove_writer = mock.Mock()

self.loop._process_events(
[(selectors.SelectorKey(1, 1, selectors.EVENT_WRITE,
(None, writer)),
selectors.EVENT_WRITE)])
self.loop.remove_writer.assert_called_with(1)
self.loop._remove_writer.assert_called_with(1)

def test_accept_connection_multiple(self):
sock = mock.Mock()
@@ -749,8 +749,8 @@ def test_close_write_buffer(self):
def test_force_close(self):
tr = self.create_transport()
tr._buffer.extend(b'1')
self.loop.add_reader(7, mock.sentinel)
self.loop.add_writer(7, mock.sentinel)
self.loop._add_reader(7, mock.sentinel)
self.loop._add_writer(7, mock.sentinel)
tr._force_close(None)

self.assertTrue(tr.is_closing())
@@ -1039,7 +1039,7 @@ def test_write_ready(self):

transport = self.socket_transport()
transport._buffer.extend(data)
self.loop.add_writer(7, transport._write_ready)
self.loop._add_writer(7, transport._write_ready)
transport._write_ready()
self.assertTrue(self.sock.send.called)
self.assertFalse(self.loop.writers)
@@ -1051,7 +1051,7 @@ def test_write_ready_closing(self):
transport = self.socket_transport()
transport._closing = True
transport._buffer.extend(data)
self.loop.add_writer(7, transport._write_ready)
self.loop._add_writer(7, transport._write_ready)
transport._write_ready()
self.assertTrue(self.sock.send.called)
self.assertFalse(self.loop.writers)
@@ -1069,7 +1069,7 @@ def test_write_ready_partial(self):

transport = self.socket_transport()
transport._buffer.extend(data)
self.loop.add_writer(7, transport._write_ready)
self.loop._add_writer(7, transport._write_ready)
transport._write_ready()
self.loop.assert_writer(7, transport._write_ready)
self.assertEqual(list_to_buffer([b'ta']), transport._buffer)
@@ -1080,7 +1080,7 @@ def test_write_ready_partial_none(self):

transport = self.socket_transport()
transport._buffer.extend(data)
self.loop.add_writer(7, transport._write_ready)
self.loop._add_writer(7, transport._write_ready)
transport._write_ready()
self.loop.assert_writer(7, transport._write_ready)
self.assertEqual(list_to_buffer([b'data']), transport._buffer)
@@ -1090,7 +1090,7 @@ def test_write_ready_tryagain(self):

transport = self.socket_transport()
transport._buffer = list_to_buffer([b'data1', b'data2'])
self.loop.add_writer(7, transport._write_ready)
self.loop._add_writer(7, transport._write_ready)
transport._write_ready()

self.loop.assert_writer(7, transport._write_ready)
@@ -1132,7 +1132,7 @@ def test_write_eof_buffer(self):

@mock.patch('asyncio.base_events.logger')
def test_transport_close_remove_writer(self, m_log):
remove_writer = self.loop.remove_writer = mock.Mock()
remove_writer = self.loop._remove_writer = mock.Mock()

transport = self.socket_transport()
transport.close()
@@ -1290,7 +1290,7 @@ def test_read_ready_recv(self):
self.assertEqual((b'data',), self.protocol.data_received.call_args[0])

def test_read_ready_write_wants_read(self):
self.loop.add_writer = mock.Mock()
self.loop._add_writer = mock.Mock()
self.sslsock.recv.side_effect = BlockingIOError
transport = self._make_one()
transport._write_wants_read = True
@@ -1300,7 +1300,7 @@ def test_read_ready_write_wants_read(self):

self.assertFalse(transport._write_wants_read)
transport._write_ready.assert_called_with()
self.loop.add_writer.assert_called_with(
self.loop._add_writer.assert_called_with(
transport._sock_fd, transport._write_ready)

def test_read_ready_recv_eof(self):
@@ -1335,16 +1335,16 @@ def test_read_ready_recv_retry(self):
self.assertFalse(self.protocol.data_received.called)

def test_read_ready_recv_write(self):
self.loop.remove_reader = mock.Mock()
self.loop.add_writer = mock.Mock()
self.loop._remove_reader = mock.Mock()
self.loop._add_writer = mock.Mock()
self.sslsock.recv.side_effect = ssl.SSLWantWriteError
transport = self._make_one()
transport._read_ready()
self.assertFalse(self.protocol.data_received.called)
self.assertTrue(transport._read_wants_write)

self.loop.remove_reader.assert_called_with(transport._sock_fd)
self.loop.add_writer.assert_called_with(
self.loop._remove_reader.assert_called_with(transport._sock_fd)
self.loop._add_writer.assert_called_with(
transport._sock_fd, transport._write_ready)

def test_read_ready_recv_exc(self):
@@ -1421,12 +1421,12 @@ def test_write_ready_send_read(self):
transport = self._make_one()
transport._buffer = list_to_buffer([b'data'])

self.loop.remove_writer = mock.Mock()
self.loop._remove_writer = mock.Mock()
self.sslsock.send.side_effect = ssl.SSLWantReadError
transport._write_ready()
self.assertFalse(self.protocol.data_received.called)
self.assertTrue(transport._write_wants_read)
self.loop.remove_writer.assert_called_with(transport._sock_fd)
self.loop._remove_writer.assert_called_with(transport._sock_fd)

def test_write_ready_send_exc(self):
err = self.sslsock.send.side_effect = OSError()
@@ -1441,7 +1441,7 @@ def test_write_ready_send_exc(self):
self.assertEqual(list_to_buffer(), transport._buffer)

def test_write_ready_read_wants_write(self):
self.loop.add_reader = mock.Mock()
self.loop._add_reader = mock.Mock()
self.sslsock.send.side_effect = BlockingIOError
transport = self._make_one()
transport._read_wants_write = True
@@ -1450,7 +1450,7 @@ def test_write_ready_read_wants_write(self):

self.assertFalse(transport._read_wants_write)
transport._read_ready.assert_called_with()
self.loop.add_reader.assert_called_with(
self.loop._add_reader.assert_called_with(
transport._sock_fd, transport._read_ready)

def test_write_eof(self):
@@ -1701,7 +1701,7 @@ def test_sendto_ready(self):

transport = self.datagram_transport()
transport._buffer.append((data, ('0.0.0.0', 12345)))
self.loop.add_writer(7, transport._sendto_ready)
self.loop._add_writer(7, transport._sendto_ready)
transport._sendto_ready()
self.assertTrue(self.sock.sendto.called)
self.assertEqual(
@@ -1715,7 +1715,7 @@ def test_sendto_ready_closing(self):
transport = self.datagram_transport()
transport._closing = True
transport._buffer.append((data, ()))
self.loop.add_writer(7, transport._sendto_ready)
self.loop._add_writer(7, transport._sendto_ready)
transport._sendto_ready()
self.sock.sendto.assert_called_with(data, ())
self.assertFalse(self.loop.writers)
@@ -1724,7 +1724,7 @@ def test_sendto_ready_closing(self):

def test_sendto_ready_no_data(self):
transport = self.datagram_transport()
self.loop.add_writer(7, transport._sendto_ready)
self.loop._add_writer(7, transport._sendto_ready)
transport._sendto_ready()
self.assertFalse(self.sock.sendto.called)
self.assertFalse(self.loop.writers)
@@ -1734,7 +1734,7 @@ def test_sendto_ready_tryagain(self):

transport = self.datagram_transport()
transport._buffer.extend([(b'data1', ()), (b'data2', ())])
self.loop.add_writer(7, transport._sendto_ready)
self.loop._add_writer(7, transport._sendto_ready)
transport._sendto_ready()

self.loop.assert_writer(7, transport._sendto_ready)