Skip to content

Commit 0b81709

Browse files
xinchen384xin.chen
andauthored
[batch] job FIFO scheduler as baseline (#231)
* job FIFO scheduler as baseline * update comments * func name and typo --------- Co-authored-by: xin.chen <[email protected]>
1 parent 4704a69 commit 0b81709

File tree

1 file changed

+120
-0
lines changed

1 file changed

+120
-0
lines changed
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
# Licensed under the Apache License, Version 2.0 (the "License");
2+
# you may not use this file except in compliance with the License.
3+
# You may obtain a copy of the License at
4+
#
5+
# http://www.apache.org/licenses/LICENSE-2.0
6+
#
7+
# Unless required by applicable law or agreed to in writing, software
8+
# distributed under the License is distributed on an "AS IS" BASIS,
9+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
# See the License for the specific language governing permissions and
11+
# limitations under the License.
12+
13+
from enum import Enum
14+
import time
15+
import asyncio
16+
import bisect
17+
import queue
18+
19+
# Time interval, in seconds, between consecutive checks for expired jobs.
20+
EXPIRE_INTERVAL = 1  # copied into JobScheduler.interval when a scheduler is constructed
21+
22+
23+
class SchedulePolicy(Enum):
    """Policies a JobScheduler can use to choose the next job."""

    # Jobs are handed out in the order they were submitted.
    FIFO = 1


class JobScheduler:
    """Baseline job scheduler that hands out jobs FIFO and expires overdue ones.

    Internal state:
        _jobs_queue: every submitted job id, in submission order.
        _due_jobs_list: ``(job_id, due_time)`` tuples kept sorted by due
            time; these are the candidates for expiry.
        _inactive_jobs: ids of jobs whose due time has already passed.
        _cleanup_task: the background task running ``job_cleanup_loop``.
    """

    def __init__(self, policy=SchedulePolicy.FIFO):
        """Create the scheduler and start its background expiry loop.

        Args:
            policy: scheduling policy; only ``SchedulePolicy.FIFO`` is
                handled by the methods of this class.

        Raises:
            RuntimeError: if no asyncio event loop is running, because the
                expiry loop is scheduled on the running loop.
        """
        # Fail before the coroutine object is created, so a missing event
        # loop does not additionally leave a never-awaited coroutine behind.
        loop = asyncio.get_running_loop()

        self.interval = EXPIRE_INTERVAL
        self._jobs_queue = queue.Queue()
        self._inactive_jobs = set()
        self._due_jobs_list = []
        # Set the policy before the cleanup task exists: the task reads it.
        self._policy = policy
        # Keep a strong reference to the task. The event loop only holds
        # weak references, so an unreferenced task may be garbage-collected
        # before it finishes.
        self._cleanup_task = loop.create_task(self.job_cleanup_loop())

    def append_job(self, job_id, due_time_seconds):
        """Submit a job to the scheduler.

        Args:
            job_id: identifier of the job; must be hashable, since expired
                ids are stored in a set.
            due_time_seconds: number of seconds from now after which the
                job is considered expired.
        """
        self._jobs_queue.put(job_id)

        due_time = time.time() + due_time_seconds
        # Insert so that _due_jobs_list stays sorted by due time; expire_jobs
        # relies on that ordering to stop at the first non-expired entry.
        due_times = [entry[1] for entry in self._due_jobs_list]
        index = bisect.bisect_left(due_times, due_time)
        self._due_jobs_list.insert(index, (job_id, due_time))

    def schedule_get_job(self):
        """Return the next job to process under the configured policy.

        Returns:
            The id of the oldest submitted job that has not been marked
            inactive, or ``None`` when no such job is queued or the policy
            is unsupported.
        """
        # [TODO] use class abstraction for SchedulingPolicy
        if self._policy != SchedulePolicy.FIFO:
            print("Unsupported scheduling policy!")
            return None

        # Pop until an active job is found. Every popped id is checked,
        # including the last one in the queue and ids that are falsy
        # (e.g. 0), so an expired job is never handed out.
        while True:
            try:
                job_id = self._jobs_queue.get_nowait()
            except queue.Empty:
                return None
            if job_id not in self._inactive_jobs:
                return job_id

    def get_inactive_jobs(self):
        """Return the set of job ids that have been marked as expired.

        The returned object is the scheduler's own set, not a copy.
        """
        return self._inactive_jobs

    async def expire_jobs(self):
        """Mark every job whose due time has passed as inactive.

        Expired entries are removed from ``_due_jobs_list`` and their ids
        are added to ``_inactive_jobs``.
        """
        if self._policy != SchedulePolicy.FIFO:
            print("Unsupported scheduling policy!")
            return

        current_time = time.time()
        # The list is sorted by due time, so the expired jobs form a prefix.
        idx = 0
        while (
            idx < len(self._due_jobs_list)
            and self._due_jobs_list[idx][1] <= current_time
        ):
            idx += 1

        print("Number of expired jobs is ", idx)
        for job_id, _ in self._due_jobs_list[:idx]:
            # Update job's status to job manager
            self._inactive_jobs.add(job_id)
            print("======> ", job_id, "expired.")
        self._due_jobs_list = self._due_jobs_list[idx:]

    async def job_cleanup_loop(self):
        """Run ``expire_jobs`` repeatedly, once per ``self.interval`` seconds.

        This coroutine never returns on its own; it runs until its task is
        cancelled.
        """
        round_id = 0
        while True:
            start_time = time.time()
            await self.expire_jobs()
            elapsed_time = time.time() - start_time
            # Sleep only for what is left of the interval, so the time spent
            # expiring jobs does not stretch the period between rounds.
            time_to_next_run = max(0, self.interval - elapsed_time)
            print("Sliding, round: ", round_id)
            round_id += 1
            await asyncio.sleep(time_to_next_run)

0 commit comments

Comments
 (0)