|
| 1 | +import logging |
| 2 | +import os |
| 3 | +from typing import Any, Dict, Tuple |
| 4 | + |
| 5 | +import requests # type: ignore |
| 6 | +import yaml |
| 7 | +from pydantic import BaseModel |
| 8 | + |
| 9 | +from holmes.core.tools import ( |
| 10 | + CallablePrerequisite, |
| 11 | + StructuredToolResult, |
| 12 | + Tool, |
| 13 | + ToolParameter, |
| 14 | + ToolResultStatus, |
| 15 | + Toolset, |
| 16 | + ToolsetTag, |
| 17 | +) |
| 18 | + |
| 19 | +""" |
| 20 | +Example of the content of the log filter config file: |
| 21 | +```yaml |
| 22 | +log_filter: |
| 23 | + - label: k8s-app=kube-dns |
| 24 | + filters: |
| 25 | + - "[WARNING] No files matching import glob pattern" |
| 26 | +``` |
| 27 | +""" |
| 28 | +LOG_FILTER_CONFIG_PATH = "LOG_FILTER" |
| 29 | + |
| 30 | + |
| 31 | +class LogFilter(BaseModel): |
| 32 | + label: str |
| 33 | + filters: list[str] |
| 34 | + |
| 35 | + |
| 36 | +class LogFilterConfig(BaseModel): |
| 37 | + log_filter: list[LogFilter] |
| 38 | + |
| 39 | + |
| 40 | +class LogFilterToolset(Toolset): |
| 41 | + def __init__(self): |
| 42 | + super().__init__( |
| 43 | + name="log_filter", |
| 44 | + enabled=True, |
| 45 | + description="A toolset to return a pod log filter based on pod labels.", |
| 46 | + docs_url="https://docs.robusta.dev/master/configuration/holmesgpt/toolsets/log_filter.html", |
| 47 | + icon_url="https://upload.wikimedia.org/wikipedia/commons/thumb/3/3b/Filter_icon.svg/1200px-Filter_icon.svg.png", |
| 48 | + prerequisites=[CallablePrerequisite(callable=self.prerequisites_callable)], |
| 49 | + tools=[LogFilterTool()], |
| 50 | + tags=[ToolsetTag.CLI], |
| 51 | + is_default=True, |
| 52 | + ) |
| 53 | + |
| 54 | + def prerequisites_callable(self, config: dict[str, Any]) -> Tuple[bool, str]: |
| 55 | + log_filter_config_path = os.environ.get(LOG_FILTER_CONFIG_PATH, None) |
| 56 | + if not log_filter_config_path: |
| 57 | + return True, "" |
| 58 | + |
| 59 | + try: |
| 60 | + log_filter_str = load_log_filter_config(log_filter_config_path) |
| 61 | + log_filter = yaml.safe_load(log_filter_str) |
| 62 | + LogFilterConfig.model_validate(log_filter) |
| 63 | + except Exception as e: |
| 64 | + return ( |
| 65 | + False, |
| 66 | + f"Log filter config from {log_filter_config_path} is not valid: {str(e)}", |
| 67 | + ) |
| 68 | + return True, "" |
| 69 | + |
| 70 | + def get_example_config(self) -> Dict[str, Any]: |
| 71 | + return {} |
| 72 | + |
| 73 | + |
| 74 | +class LogFilterTool(Tool): |
| 75 | + def __init__(self): |
| 76 | + super().__init__( |
| 77 | + name="log_filter", |
| 78 | + description="Return logs filter Perl-based regular expression based on the pod label.", |
| 79 | + parameters={ |
| 80 | + "label": ToolParameter( |
| 81 | + type="string", |
| 82 | + description="The pod label to filter logs by. For example, 'app=my-app'.", |
| 83 | + ), |
| 84 | + }, |
| 85 | + ) |
| 86 | + |
| 87 | + def get_parameterized_one_liner(self, params) -> str: |
| 88 | + return f"logs filter for pod label {params.get('label')}" |
| 89 | + |
| 90 | + @staticmethod |
| 91 | + def get_default_log_filter(params: dict) -> StructuredToolResult: |
| 92 | + """Returns a default log filter regex pattern filter out info level log""" |
| 93 | + default_log_filter = "(^I\d{4})|(level=info)" |
| 94 | + return StructuredToolResult( |
| 95 | + status=ToolResultStatus.SUCCESS, |
| 96 | + data=default_log_filter, |
| 97 | + params=params, |
| 98 | + ) |
| 99 | + |
| 100 | + @staticmethod |
| 101 | + def label_in_labels(key_value: str, log_filter: str) -> bool: |
| 102 | + """Check if a key=value string is in a list of labels joined by comma.""" |
| 103 | + label_list = log_filter.split(",") |
| 104 | + return any(item == key_value for item in label_list) |
| 105 | + |
| 106 | + def _invoke(self, params: Dict[str, Any]) -> StructuredToolResult: |
| 107 | + # _invoke returns default log filter if no matching label is found |
| 108 | + |
| 109 | + log_filter_config_path = os.environ.get(LOG_FILTER_CONFIG_PATH, None) |
| 110 | + if not log_filter_config_path: |
| 111 | + return self.get_default_log_filter(params) |
| 112 | + |
| 113 | + if params.get("label") is None: |
| 114 | + logging.info("label is not provided. Returning default log filter.") |
| 115 | + return self.get_default_log_filter(params) |
| 116 | + |
| 117 | + try: |
| 118 | + log_filter_str = load_log_filter_config(log_filter_config_path) |
| 119 | + log_filter_dict = yaml.safe_load(log_filter_str) |
| 120 | + |
| 121 | + log_filters = LogFilterConfig(**log_filter_dict) |
| 122 | + |
| 123 | + for log_filter in log_filters.log_filter: |
| 124 | + if self.label_in_labels(params["label"], log_filter.label): |
| 125 | + combined_filter = "|".join(log_filter.filters) |
| 126 | + return StructuredToolResult( |
| 127 | + status=ToolResultStatus.SUCCESS, |
| 128 | + data=f"({combined_filter})", |
| 129 | + params=params, |
| 130 | + ) |
| 131 | + logging.info( |
| 132 | + f"label '{params['label']}' not found in log filter config. Returning default log filter." |
| 133 | + ) |
| 134 | + except Exception as e: |
| 135 | + logging.error( |
| 136 | + f"Error processing log filter config: {str(e)}. Returning default log filter." |
| 137 | + ) |
| 138 | + return self.get_default_log_filter(params) |
| 139 | + |
| 140 | + |
| 141 | +def load_log_filter_config(file_path: str) -> str: |
| 142 | + """Reads a file, either local or remote. |
| 143 | +
|
| 144 | + Args: |
| 145 | + file_path: The path to the file, can be a local path or a URL. |
| 146 | +
|
| 147 | + Returns: |
| 148 | + The content of the file as a string, or None if an error occurs. |
| 149 | + """ |
| 150 | + if file_path.startswith("http://") or file_path.startswith("https://"): |
| 151 | + # Handle remote file (URL) |
| 152 | + response = requests.get(file_path) |
| 153 | + response.raise_for_status() # Raise an exception for bad status codes |
| 154 | + return response.text |
| 155 | + # Handle local file |
| 156 | + if os.path.exists(file_path) and os.path.isfile(file_path): |
| 157 | + with open(file_path, "r") as file: |
| 158 | + return file.read() |
| 159 | + raise FileNotFoundError(f"File not found: {file_path}") |
0 commit comments