Skip to content

Commit 82ad1b3

Browse files
authored
Merge pull request #1891 from Exirel/job-prevent-parallel-execution
jobs: prevent parallel execution and plugin specific scheduler
2 parents 75d0375 + e0fd07a commit 82ad1b3

File tree

8 files changed

+751
-116
lines changed

8 files changed

+751
-116
lines changed

docs/source/api.rst

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,11 @@ sopel.tools.events
4646
:members:
4747
:undoc-members:
4848

49+
sopel.tools.jobs
50+
----------------
51+
.. automodule:: sopel.tools.jobs
52+
:members:
53+
4954
sopel.formatting
5055
----------------
5156
.. automodule:: sopel.formatting

docs/source/plugin.rst

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,12 @@ sopel.plugins.rules
146146
:members:
147147
:undoc-members:
148148

149+
sopel.plugins.jobs
150+
------------------
151+
.. automodule:: sopel.plugins.jobs
152+
:members:
153+
:show-inheritance:
154+
149155
sopel.loader
150156
------------
151157
.. automodule:: sopel.loader

sopel/bot.py

Lines changed: 26 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
from sopel.db import SopelDB
2323
import sopel.loader
2424
from sopel.module import NOLIMIT
25-
from sopel.plugins import rules as plugin_rules
25+
from sopel.plugins import jobs as plugin_jobs, rules as plugin_rules
2626
from sopel.tools import deprecated, Identifier
2727
import sopel.tools.jobs
2828
from sopel.trigger import Trigger
@@ -61,6 +61,7 @@ def __init__(self, config, daemon=False):
6161
self._running_triggers_lock = threading.Lock()
6262
self._plugins = {}
6363
self._rules_manager = plugin_rules.Manager()
64+
self._scheduler = plugin_jobs.Scheduler(self)
6465

6566
self._times = {}
6667
"""
@@ -118,13 +119,16 @@ def __init__(self, config, daemon=False):
118119
self.shutdown_methods = []
119120
"""List of methods to call on shutdown."""
120121

121-
self.scheduler = sopel.tools.jobs.JobScheduler(self)
122-
"""Job Scheduler. See :func:`sopel.module.interval`."""
123-
124122
@property
125123
def rules(self):
124+
"""Rules manager."""
126125
return self._rules_manager
127126

127+
@property
128+
def scheduler(self):
129+
"""Job Scheduler. See :func:`sopel.module.interval`."""
130+
return self._scheduler
131+
128132
@property
129133
def command_groups(self):
130134
"""A mapping of plugin names to lists of their commands.
@@ -250,7 +254,7 @@ def setup(self):
250254
"""
251255
self.setup_logging()
252256
self.setup_plugins()
253-
self.scheduler.start()
257+
self._scheduler.start()
254258

255259
def setup_logging(self):
256260
"""Set up logging based on config options."""
@@ -414,7 +418,7 @@ def remove_plugin(self, plugin, callables, jobs, shutdowns, urls):
414418

415419
# remove plugin rules, jobs, shutdown functions, and url callbacks
416420
self._rules_manager.unregister_plugin(name)
417-
self.unregister_jobs(jobs)
421+
self._scheduler.unregister_plugin(name)
418422
self.unregister_shutdowns(shutdowns)
419423
self.unregister_urls(urls)
420424

@@ -450,10 +454,14 @@ def get_plugin_meta(self, name):
450454
version='7.1',
451455
removed_in='8.0')
452456
def unregister(self, obj):
453-
"""Unregister a job or a shutdown method.
457+
"""Unregister a shutdown method.
454458
455-
:param obj: the job or shutdown method to unregister
459+
:param obj: the shutdown method to unregister
456460
:type obj: :term:`object`
461+
462+
This method was used to unregister anything (rules, commands, urls,
463+
jobs, and shutdown methods), but since everything can be done by other
464+
means, there is no use for it anymore.
457465
"""
458466
callable_name = getattr(obj, "__name__", 'UNKNOWN')
459467

@@ -544,20 +552,12 @@ def register_callables(self, callables):
544552

545553
def register_jobs(self, jobs):
546554
for func in jobs:
547-
for interval in func.interval:
548-
job = sopel.tools.jobs.Job(interval, func)
549-
self.scheduler.add_job(job)
550-
callable_name = getattr(func, "__name__", 'UNKNOWN')
551-
LOGGER.debug(
552-
'Job added "%s", will run every %d seconds',
553-
callable_name,
554-
interval)
555+
job = sopel.tools.jobs.Job.from_callable(self.settings, func)
556+
self._scheduler.register(job)
555557

556558
def unregister_jobs(self, jobs):
557559
for job in jobs:
558-
callable_name = getattr(job, "__name__", 'UNKNOWN')
559-
self.scheduler.remove_callable_job(job)
560-
LOGGER.debug('Job callable removed: %s', callable_name)
560+
self._scheduler.remove_callable_job(job)
561561

562562
def register_shutdowns(self, shutdowns):
563563
# Append plugin's shutdown function to the bot's list of functions to
@@ -857,8 +857,8 @@ def _update_running_triggers(self, running_triggers):
857857
def on_scheduler_error(self, scheduler, exc):
858858
"""Called when the Job Scheduler fails.
859859
860-
:param scheduler: the JobScheduler that errored
861-
:type scheduler: :class:`sopel.tools.jobs.JobScheduler`
860+
:param scheduler: the job scheduler that errored
861+
:type scheduler: :class:`sopel.plugins.jobs.Scheduler`
862862
:param Exception exc: the raised exception
863863
864864
.. seealso::
@@ -870,8 +870,8 @@ def on_scheduler_error(self, scheduler, exc):
870870
def on_job_error(self, scheduler, job, exc):
871871
"""Called when a job from the Job Scheduler fails.
872872
873-
:param scheduler: the JobScheduler responsible for the errored ``job``
874-
:type scheduler: :class:`sopel.tools.jobs.JobScheduler`
873+
:param scheduler: the job scheduler responsible for the errored ``job``
874+
:type scheduler: :class:`sopel.plugins.jobs.Scheduler`
875875
:param job: the Job that errored
876876
:type job: :class:`sopel.tools.jobs.Job`
877877
:param Exception exc: the raised exception
@@ -939,16 +939,16 @@ def _shutdown(self):
939939
LOGGER.info('Shutting down')
940940
# Stop Job Scheduler
941941
LOGGER.info('Stopping the Job Scheduler.')
942-
self.scheduler.stop()
942+
self._scheduler.stop()
943943

944944
try:
945-
self.scheduler.join(timeout=15)
945+
self._scheduler.join(timeout=15)
946946
except RuntimeError:
947947
LOGGER.exception('Unable to stop the Job Scheduler.')
948948
else:
949949
LOGGER.info('Job Scheduler stopped.')
950950

951-
self.scheduler.clear_jobs()
951+
self._scheduler.clear_jobs()
952952

953953
# Shutdown plugins
954954
LOGGER.info(

sopel/coretasks.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
from sopel import loader, module
2828
from sopel.irc import isupport
2929
from sopel.irc.utils import CapReq, MyInfo
30-
from sopel.tools import events, Identifier, iteritems, jobs, target, web
30+
from sopel.tools import events, Identifier, iteritems, target, web
3131

3232

3333
if sys.version_info.major >= 3:
@@ -47,11 +47,14 @@ def setup(bot):
4747
wait_interval = max(bot.settings.core.throttle_wait, 1)
4848

4949
@module.interval(wait_interval)
50+
@module.label('throttle_join')
5051
def processing_job(bot):
5152
_join_event_processing(bot)
5253

5354
loader.clean_callable(processing_job, bot.settings)
54-
bot.scheduler.add_job(jobs.Job(wait_interval, processing_job))
55+
processing_job.plugin_name = 'coretasks'
56+
57+
bot.register_jobs([processing_job])
5558

5659

5760
def shutdown(bot):

sopel/irc/backends.py

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,8 @@
1616
import sys
1717
import threading
1818

19-
from sopel.tools.jobs import Job, JobScheduler
19+
from sopel import loader, module
20+
from sopel.tools import jobs
2021
from .abstract_backends import AbstractIRCBackend
2122
from .utils import get_cnames
2223

@@ -40,6 +41,7 @@
4041
LOGGER = logging.getLogger(__name__)
4142

4243

44+
@module.thread(False)
4345
def _send_ping(backend):
4446
if not backend.is_connected():
4547
return
@@ -51,6 +53,7 @@ def _send_ping(backend):
5153
LOGGER.exception('Socket error on PING')
5254

5355

56+
@module.thread(False)
5457
def _check_timeout(backend):
5558
if not backend.is_connected():
5659
return
@@ -61,10 +64,6 @@ def _check_timeout(backend):
6164
backend.close_when_done()
6265

6366

64-
_send_ping.thread = False
65-
_check_timeout.thread = False
66-
67-
6867
class AsynchatBackend(AbstractIRCBackend, asynchat.async_chat):
6968
"""IRC backend implementation using :mod:`asynchat` (:mod:`asyncore`).
7069
@@ -85,13 +84,17 @@ def __init__(self, bot, server_timeout=None, ping_timeout=None, **kwargs):
8584
self.host = None
8685
self.port = None
8786
self.source_address = None
87+
self.timeout_scheduler = jobs.Scheduler(self)
8888

89-
ping_job = Job(self.ping_timeout, _send_ping)
90-
timeout_job = Job(self.server_timeout, _check_timeout)
89+
# prepare interval decorator
90+
ping_interval = module.interval(self.ping_timeout)
91+
timeout_interval = module.interval(self.server_timeout)
9192

92-
self.timeout_scheduler = JobScheduler(self)
93-
self.timeout_scheduler.add_job(ping_job)
94-
self.timeout_scheduler.add_job(timeout_job)
93+
# register timeout jobs
94+
self.register_timeout_jobs([
95+
ping_interval(_send_ping),
96+
timeout_interval(_check_timeout),
97+
])
9598

9699
def is_connected(self):
97100
return self.connected
@@ -109,8 +112,17 @@ def irc_send(self, data):
109112

110113
def run_forever(self):
111114
"""Run forever."""
115+
LOGGER.debug('Running forever.')
112116
asyncore.loop()
113117

118+
def register_timeout_jobs(self, handlers):
119+
"""Register the timeout handlers for the timeout scheduler."""
120+
for handler in handlers:
121+
loader.clean_callable(handler, self.bot.settings)
122+
job = jobs.Job.from_callable(self.bot.settings, handler)
123+
self.timeout_scheduler.register(job)
124+
LOGGER.debug('Timeout Job registered: %s', str(job))
125+
114126
def initiate_connect(self, host, port, source_address):
115127
"""Initiate IRC connection.
116128

sopel/plugins/jobs.py

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
# coding=utf-8
2+
"""Sopel's plugin jobs management.
3+
4+
.. versionadded:: 7.1
5+
6+
.. important::
7+
8+
This is all fresh and new. Its usage and documentation is for Sopel core
9+
development and advanced developers. It is subject to rapid changes
10+
between versions without much (or any) warning.
11+
12+
Do **not** build your plugin based on what is here, you do **not** need to.
13+
14+
"""
15+
# Copyright 2020, Florian Strzelecki <[email protected]>
16+
#
17+
# Licensed under the Eiffel Forum License 2.
18+
from __future__ import absolute_import, division, print_function, unicode_literals
19+
20+
import itertools
21+
import logging
22+
23+
from sopel import tools
24+
from sopel.tools import jobs
25+
26+
LOGGER = logging.getLogger(__name__)
27+
28+
29+
class Scheduler(jobs.Scheduler):
30+
"""Plugins's Job Scheduler
31+
32+
:param manager: bot instance passed to jobs as argument
33+
:type manager: :class:`sopel.bot.Sopel`
34+
35+
Scheduler that stores plugin jobs and behaves like its
36+
:class:`parent class <sopel.tools.jobs.Scheduler>`.
37+
38+
.. versionadded:: 7.1
39+
40+
.. note::
41+
42+
This class is a specific implementation of the scheduler, made to store
43+
jobs by their plugins and be used by the bot (its ``manager``).
44+
It follows a similar interface as the
45+
:class:`plugin rules manager <sopel.plugins.rules.Manager>`.
46+
47+
.. important::
48+
49+
This is an internal tool used by Sopel to manage its jobs. To register
50+
a job, plugin authors should use :func:`sopel.module.interval`.
51+
52+
"""
53+
def __init__(self, manager):
54+
super(Scheduler, self).__init__(manager)
55+
self._jobs = tools.SopelMemoryWithDefault(list)
56+
57+
def register(self, job):
58+
with self._mutex:
59+
self._jobs[job.get_plugin_name()].append(job)
60+
LOGGER.debug('Job registered: %s', str(job))
61+
62+
def unregister_plugin(self, plugin_name):
63+
"""Unregister all the jobs from a plugin.
64+
65+
:param str plugin_name: the name of the plugin to remove
66+
:return: the number of jobs unregistered for this plugin
67+
:rtype: int
68+
69+
All jobs of that plugin will be removed from the scheduler.
70+
71+
This method is thread safe. However, it won't cancel or stop any
72+
currently running jobs.
73+
"""
74+
unregistered_jobs = 0
75+
with self._mutex:
76+
jobs_count = len(self._jobs[plugin_name])
77+
del self._jobs[plugin_name]
78+
unregistered_jobs = unregistered_jobs + jobs_count
79+
80+
LOGGER.debug(
81+
'[%s] Successfully unregistered %d jobs',
82+
plugin_name,
83+
unregistered_jobs)
84+
85+
return unregistered_jobs
86+
87+
def clear_jobs(self):
88+
with self._mutex:
89+
self._jobs = tools.SopelMemoryWithDefault(list)
90+
91+
LOGGER.debug('Successfully unregistered all jobs')
92+
93+
def remove_callable_job(self, callable):
94+
plugin_name = getattr(callable, 'plugin_name', None)
95+
if not self._jobs[plugin_name]:
96+
return
97+
98+
with self._mutex:
99+
self._jobs[plugin_name] = [
100+
job for job in self._jobs[plugin_name]
101+
if job._handler != callable
102+
]
103+
104+
def _get_ready_jobs(self, now):
105+
with self._mutex:
106+
jobs = [
107+
job for job in itertools.chain(*self._jobs.values())
108+
if job.is_ready_to_run(now)
109+
]
110+
111+
return jobs

0 commit comments

Comments
 (0)