Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 45 additions & 46 deletions examples/pubsub_eventing.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ def publisher_process(
logger.info('publisher shutdown')


# Execute within a separate thread context
def on_event(payload: Dict[str, Any]) -> None:
'''Subscriber callback.'''
global num_events_received
Expand All @@ -55,49 +56,47 @@ def on_event(payload: Dict[str, Any]) -> None:

if __name__ == '__main__':
start_time = time.time()

# Start dispatcher thread using EventManager
event_manager = EventManager()
event_manager.start_event_dispatcher()
assert event_manager.event_queue

# Create a subscriber
subscriber = EventSubscriber(event_manager.event_queue)
# Internally, subscribe will start a separate thread
# to receive incoming published messages
subscriber.subscribe(on_event)

# Start a publisher process to demonstrate safe exchange
# of messages between processes.
publisher_shutdown_event = multiprocessing.Event()
publisher = multiprocessing.Process(
target=publisher_process, args=(
publisher_shutdown_event, event_manager.event_queue, ),
)
publisher.start()

try:
while True:
# Dispatch event from main process
event_manager.event_queue.publish(
request_id=main_publisher_request_id,
event_name=eventNames.WORK_STARTED,
event_payload={'time': time.time()},
publisher_id='eventing_pubsub_main',
)
except KeyboardInterrupt:
logger.info('bye!!!')
finally:
# Stop publisher
publisher_shutdown_event.set()
publisher.join()
# Stop subscriber thread
subscriber.unsubscribe()
# Signal dispatcher to shutdown
event_manager.stop_event_dispatcher()
logger.info(
'Received {0} events from main thread, {1} events from another process, in {2} seconds'.format(
num_events_received[0], num_events_received[1], time.time(
) - start_time,
),
)
# Start eventing core
with EventManager() as event_manager:
assert event_manager.queue

# Create a subscriber.
# Internally, subscribe will start a separate thread
# to receive incoming published messages.
subscriber = EventSubscriber(event_manager.queue)
subscriber.subscribe(on_event)

# Start a publisher process to demonstrate safe exchange
# of messages between processes.
publisher_shutdown_event = multiprocessing.Event()
publisher = multiprocessing.Process(
target=publisher_process, args=(
publisher_shutdown_event, event_manager.queue, ),
)
publisher.start()

# Dispatch event from main process too
# to demonstrate safe exchange of messages
# between threads.
try:
while True:
event_manager.queue.publish(
request_id=main_publisher_request_id,
event_name=eventNames.WORK_STARTED,
event_payload={'time': time.time()},
publisher_id='eventing_pubsub_main',
)
except KeyboardInterrupt:
logger.info('bye!!!')
finally:
# Stop publisher
publisher_shutdown_event.set()
publisher.join()
# Stop subscriber thread
subscriber.unsubscribe()
logger.info(
'Received {0} events from main thread, {1} events from another process, in {2} seconds'.format(
num_events_received[0], num_events_received[1], time.time(
) - start_time,
),
)
64 changes: 39 additions & 25 deletions proxy/core/event/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@
import threading
import multiprocessing

from typing import Optional
from typing import Optional, Type
from types import TracebackType

from .queue import EventQueue
from .dispatcher import EventDispatcher
Expand All @@ -33,39 +34,52 @@


class EventManager:
"""Event manager is an encapsulation around various initialization, dispatcher
start / stop API required for end-to-end eventing.
"""Event manager is a context manager which provides
encapsulation around various setup and shutdown steps
to start the eventing core.
"""

def __init__(self) -> None:
self.event_queue: Optional[EventQueue] = None
self.event_dispatcher: Optional[EventDispatcher] = None
self.event_dispatcher_thread: Optional[threading.Thread] = None
self.event_dispatcher_shutdown: Optional[threading.Event] = None
self.queue: Optional[EventQueue] = None
self.dispatcher: Optional[EventDispatcher] = None
self.dispatcher_thread: Optional[threading.Thread] = None
self.dispatcher_shutdown: Optional[threading.Event] = None
self.manager: Optional[multiprocessing.managers.SyncManager] = None

def start_event_dispatcher(self) -> None:
def __enter__(self) -> 'EventManager':
self.setup()
return self

def __exit__(
self,
exc_type: Optional[Type[BaseException]],
exc_val: Optional[BaseException],
exc_tb: Optional[TracebackType],
) -> None:
self.shutdown()

def setup(self) -> None:
self.manager = multiprocessing.Manager()
self.event_queue = EventQueue(self.manager.Queue())
self.event_dispatcher_shutdown = threading.Event()
assert self.event_dispatcher_shutdown
assert self.event_queue
self.event_dispatcher = EventDispatcher(
shutdown=self.event_dispatcher_shutdown,
event_queue=self.event_queue,
self.queue = EventQueue(self.manager.Queue())
self.dispatcher_shutdown = threading.Event()
assert self.dispatcher_shutdown
assert self.queue
self.dispatcher = EventDispatcher(
shutdown=self.dispatcher_shutdown,
event_queue=self.queue,
)
self.event_dispatcher_thread = threading.Thread(
target=self.event_dispatcher.run,
self.dispatcher_thread = threading.Thread(
target=self.dispatcher.run,
)
self.event_dispatcher_thread.start()
logger.debug('Thread ID: %d', self.event_dispatcher_thread.ident)
self.dispatcher_thread.start()
logger.debug('Thread ID: %d', self.dispatcher_thread.ident)

def stop_event_dispatcher(self) -> None:
assert self.event_dispatcher_shutdown
assert self.event_dispatcher_thread
self.event_dispatcher_shutdown.set()
self.event_dispatcher_thread.join()
def shutdown(self) -> None:
assert self.dispatcher_shutdown
assert self.dispatcher_thread
self.dispatcher_shutdown.set()
self.dispatcher_thread.join()
logger.debug(
'Shutdown of global event dispatcher thread %d successful',
self.event_dispatcher_thread.ident,
self.dispatcher_thread.ident,
)
6 changes: 3 additions & 3 deletions proxy/proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,11 +114,11 @@ def __enter__(self) -> 'Proxy':
if self.flags.enable_events:
logger.info('Core Event enabled')
self.event_manager = EventManager()
self.event_manager.start_event_dispatcher()
self.event_manager.setup()
self.pool = AcceptorPool(
flags=self.flags,
work_klass=self.work_klass,
event_queue=self.event_manager.event_queue if self.event_manager is not None else None,
event_queue=self.event_manager.queue if self.event_manager is not None else None,
)
self.pool.setup()
return self
Expand All @@ -133,7 +133,7 @@ def __exit__(
self.pool.shutdown()
if self.flags.enable_events:
assert self.event_manager is not None
self.event_manager.stop_event_dispatcher()
self.event_manager.shutdown()


def main(
Expand Down
10 changes: 5 additions & 5 deletions tests/test_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,12 +127,12 @@ def test_enable_events(
mock_initialize.return_value.enable_events = True
main([])
mock_event_manager.assert_called_once()
mock_event_manager.return_value.start_event_dispatcher.assert_called_once()
mock_event_manager.return_value.stop_event_dispatcher.assert_called_once()
mock_event_manager.return_value.setup.assert_called_once()
mock_event_manager.return_value.shutdown.assert_called_once()
mock_acceptor_pool.assert_called_with(
flags=mock_initialize.return_value,
work_klass=HttpProtocolHandler,
event_queue=mock_event_manager.return_value.event_queue,
event_queue=mock_event_manager.return_value.queue,
)
mock_acceptor_pool.return_value.setup.assert_called()
mock_acceptor_pool.return_value.shutdown.assert_called()
Expand Down Expand Up @@ -171,8 +171,8 @@ def test_enable_dashboard(
mock_acceptor_pool.return_value.setup.assert_called()
# dashboard will also enable eventing
mock_event_manager.assert_called_once()
mock_event_manager.return_value.start_event_dispatcher.assert_called_once()
mock_event_manager.return_value.stop_event_dispatcher.assert_called_once()
mock_event_manager.return_value.setup.assert_called_once()
mock_event_manager.return_value.shutdown.assert_called_once()

@mock.patch('time.sleep')
@mock.patch('proxy.common.plugins.Plugins.load')
Expand Down