Skip to content
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 46 additions & 2 deletions sky/jobs/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
from sky.jobs import scheduler
from sky.jobs import state as managed_job_state
from sky.schemas.api import responses
from sky.serve import serve_state
from sky.skylet import constants
from sky.skylet import job_lib
from sky.skylet import log_lib
Expand Down Expand Up @@ -95,7 +96,7 @@

# Spinner text shown while waiting for a managed job's task to start.
# Placeholders are filled with str.format():
#   {status_str}: '' or a ' (status: <value>)' suffix.
#   {pool_str}:   '' or a newline followed by a dimmed pool summary line.
#   {job_id}:     job id used in the controller-log hint.
# All three must be supplied on every .format() call; a missing key raises
# KeyError.
_JOB_WAITING_STATUS_MESSAGE = ux_utils.spinner_message(
    'Waiting for task to start[/]'
    '{status_str}. It may take a few minutes.{pool_str}\n'
    ' [dim]View controller logs: sky jobs logs --controller {job_id}')
_JOB_CANCELLED_MESSAGE = (
ux_utils.spinner_message('Waiting for task status to be updated.') +
Expand Down Expand Up @@ -1195,6 +1196,41 @@ def controller_log_file_for_job(job_id: int,
return os.path.join(log_dir, f'{job_id}.log')


def _get_pool_health_summary(pool_name: str) -> Optional[str]:
    """Return a one-line summary of a pool's worker statuses.

    Intended for the 'Waiting for task to start' spinner: a pool job that
    cannot be dispatched yet otherwise gives the user no hint whether the
    pool is healthy or merely busy.

    Args:
        pool_name: Name of the pool whose replicas are looked up via
            ``serve_state.get_replica_infos``.

    Returns:
        * ``"Pool 'foo': 0 workers"`` when the pool has no replicas.
        * ``"Pool 'foo': 3/3 ready"`` when every replica is READY.
        * ``"Pool 'foo': 1/3 ready (2 provisioning)"`` otherwise; non-ready
          replicas are grouped by lower-cased status value, with every
          status in ``ReplicaStatus.failed_statuses()`` collapsed into
          ``failed``. Groups are listed in first-seen order.
        * ``None`` if anything raises, so the caller can simply omit the
          summary.
    """
    # The summary is purely cosmetic, so the whole computation is best
    # effort: any failure must degrade to "no summary" rather than break
    # log streaming.
    try:
        replica_infos = serve_state.get_replica_infos(pool_name)
        if not replica_infos:
            return f'Pool {pool_name!r}: 0 workers'

        ready_status = serve_state.ReplicaStatus.READY
        failed_statuses = set(serve_state.ReplicaStatus.failed_statuses())

        ready = 0
        # A plain dict preserves insertion order, which keeps the labels in
        # the order they are first encountered.
        not_ready_counts = {}
        for info in replica_infos:
            status = info.status
            if status == ready_status:
                ready += 1
                continue
            if status in failed_statuses:
                label = 'failed'
            else:
                label = status.value.lower()
            not_ready_counts[label] = not_ready_counts.get(label, 0) + 1

        summary = f'Pool {pool_name!r}: {ready}/{len(replica_infos)} ready'
        if not_ready_counts:
            breakdown = ', '.join(
                f'{count} {label}'
                for label, count in not_ready_counts.items())
            summary = f'{summary} ({breakdown})'
        return summary
    except Exception:  # pylint: disable=broad-except
        return None


def stream_logs_by_id(
job_id: int,
follow: bool = True,
Expand Down Expand Up @@ -1312,7 +1348,9 @@ def matches_task_filter(task_id: int, task_name: str,
# task_filter is a str, match by task name
return task_name == task_filter

msg = _JOB_WAITING_STATUS_MESSAGE.format(status_str='', job_id=job_id)
msg = _JOB_WAITING_STATUS_MESSAGE.format(status_str='',
pool_str='',
job_id=job_id)
status_display = rich_utils.safe_status(msg)
num_tasks = managed_job_state.get_num_tasks(job_id)

Expand Down Expand Up @@ -1501,10 +1539,16 @@ def matches_task_filter(task_id: int, task_name: str,
if (managed_job_status is not None and managed_job_status !=
managed_job_state.ManagedJobStatus.RUNNING):
status_str = f' (status: {managed_job_status.value})'
pool_str = ''
if pool is not None:
pool_summary = _get_pool_health_summary(pool)
if pool_summary is not None:
pool_str = f'\n [dim]{pool_summary}'
logger.debug(
f'INFO: The log is not ready yet{status_str}. '
f'Waiting for {JOB_STATUS_CHECK_GAP_SECONDS} seconds.')
msg = _JOB_WAITING_STATUS_MESSAGE.format(status_str=status_str,
pool_str=pool_str,
job_id=job_id)
if msg != prev_msg:
status_display.update(msg)
Expand Down
Loading