Skip to content
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Copyright 2021-2024 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import logging
import socket

STRUCTLOG_LOGGING_METADATA_KEY = "structlog_metadata"
STRUCTLOG_LOGGING_FORMAT_KEY = "structlog_format"
Expand Down Expand Up @@ -63,3 +64,17 @@
) | set(STRUCTLOG_LOGGING_METADATA_JOB.keys())

RECORD_DEFAULT_FIELDS = set(vars(logging.LogRecord("", "", "", "", "", "", "")))

# Syslog constants
SYSLOG_HOST_KEY = "SYSLOG_HOST"
SYSLOG_PORT_KEY = "SYSLOG_PORT"
SYSLOG_PROTOCOL_KEY = "SYSLOG_PROTOCOL"
SYSLOG_ENABLED_KEY = "SYSLOG_ENABLED"

# Default values for Syslog
DEFAULT_SYSLOG_HOST = "localhost"
DEFAULT_SYSLOG_PORT = 514
DEFAULT_SYSLOG_PROTOCOL = "UDP"
DEFAULT_SYSLOG_ENABLED = False

SYSLOG_PROTOCOLS = {"UDP": socket.SOCK_DGRAM, "TCP": socket.SOCK_STREAM}
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,26 @@
from vdk.internal.builtin_plugins.run.step import Step
from vdk.internal.core.config import ConfigurationBuilder
from vdk.internal.core.context import CoreContext
from vdk.internal.core.statestore import CommonStoreKeys
from vdk.plugin.structlog.constants import DEFAULT_SYSLOG_ENABLED
from vdk.plugin.structlog.constants import DEFAULT_SYSLOG_HOST
from vdk.plugin.structlog.constants import DEFAULT_SYSLOG_PORT
from vdk.plugin.structlog.constants import DEFAULT_SYSLOG_PROTOCOL
from vdk.plugin.structlog.constants import JSON_STRUCTLOG_LOGGING_METADATA_DEFAULT
from vdk.plugin.structlog.constants import STRUCTLOG_CONSOLE_LOG_PATTERN
from vdk.plugin.structlog.constants import STRUCTLOG_LOGGING_FORMAT_DEFAULT
from vdk.plugin.structlog.constants import STRUCTLOG_LOGGING_FORMAT_KEY
from vdk.plugin.structlog.constants import STRUCTLOG_LOGGING_FORMAT_POSSIBLE_VALUES
from vdk.plugin.structlog.constants import STRUCTLOG_LOGGING_METADATA_ALL_KEYS
from vdk.plugin.structlog.constants import STRUCTLOG_LOGGING_METADATA_KEY
from vdk.plugin.structlog.constants import SYSLOG_ENABLED_KEY
from vdk.plugin.structlog.constants import SYSLOG_HOST_KEY
from vdk.plugin.structlog.constants import SYSLOG_PORT_KEY
from vdk.plugin.structlog.constants import SYSLOG_PROTOCOL_KEY
from vdk.plugin.structlog.filters import AttributeAdder
from vdk.plugin.structlog.formatters import create_formatter
from vdk.plugin.structlog.log_level_utils import set_non_root_log_levels
from vdk.plugin.structlog.syslog_config import configure_syslog_handler

"""
Handlers
Expand Down Expand Up @@ -98,6 +108,30 @@ def vdk_configure(self, config_builder: ConfigurationBuilder):
),
)

config_builder.add(
key=SYSLOG_HOST_KEY,
default_value=DEFAULT_SYSLOG_HOST,
description="Hostname of the Syslog server.",
)

config_builder.add(
key=SYSLOG_PORT_KEY,
default_value=DEFAULT_SYSLOG_PORT,
description="Port of the Syslog server.",
)

config_builder.add(
key=SYSLOG_PROTOCOL_KEY,
default_value=DEFAULT_SYSLOG_PROTOCOL,
description="Syslog protocol (UDP or TCP).",
)

config_builder.add(
key=SYSLOG_ENABLED_KEY,
default_value=DEFAULT_SYSLOG_ENABLED,
description="Enable Syslog logging (True or False).",
)

@hookimpl
def vdk_initialize(self, context: CoreContext):
if logging.getLogger().getEffectiveLevel() == logging.NOTSET:
Expand Down Expand Up @@ -177,10 +211,32 @@ def initialize_job(self, context: JobContext) -> None:

root_logger.addHandler(handler)

syslog_enabled = context.core_context.configuration.get_value(
SYSLOG_ENABLED_KEY
)
syslog_host = context.core_context.configuration.get_value(SYSLOG_HOST_KEY)
syslog_port = context.core_context.configuration.get_value(SYSLOG_PORT_KEY)
syslog_protocol = context.core_context.configuration.get_value(
SYSLOG_PROTOCOL_KEY
)
attempt_id = context.core_context.state.get(CommonStoreKeys.ATTEMPT_ID)
syslog_handler = configure_syslog_handler(
syslog_enabled,
syslog_host,
syslog_port,
syslog_protocol,
job_name,
attempt_id,
)
if syslog_handler:
root_logger.addHandler(syslog_handler)

out: HookCallResult
out = yield

root_logger.removeHandler(handler)
if syslog_handler:
root_logger.removeHandler(syslog_handler)

@hookimpl(hookwrapper=True)
def run_job(self, context: JobContext) -> Optional[ExecutionResult]:
Expand Down Expand Up @@ -226,6 +282,26 @@ def run_job(self, context: JobContext) -> Optional[ExecutionResult]:

root_logger.addHandler(handler)

syslog_enabled = context.core_context.configuration.get_value(
SYSLOG_ENABLED_KEY
)
syslog_host = context.core_context.configuration.get_value(SYSLOG_HOST_KEY)
syslog_port = context.core_context.configuration.get_value(SYSLOG_PORT_KEY)
syslog_protocol = context.core_context.configuration.get_value(
SYSLOG_PROTOCOL_KEY
)
attempt_id = context.core_context.state.get(CommonStoreKeys.ATTEMPT_ID)
syslog_handler = configure_syslog_handler(
syslog_enabled,
syslog_host,
syslog_port,
syslog_protocol,
job_name,
attempt_id,
)
if syslog_handler:
root_logger.addHandler(syslog_handler)

out: HookCallResult
out = yield

Expand All @@ -235,11 +311,17 @@ def run_job(self, context: JobContext) -> Optional[ExecutionResult]:
def run_step(self, context: JobContext, step: Step) -> Optional[StepResult]:
root_logger = logging.getLogger()
handler = root_logger.handlers[0]

metadata_filter = None
# make sure the metadata filter executes last
# so that step_name and step_type are filtered if necessary
metadata_filter = [f for f in handler.filters if f.name == "metadata_filter"][0]
handler.removeFilter(metadata_filter)
metadata_filter_result = [
f for f in handler.filters if f.name == "metadata_filter"
]
if metadata_filter_result:
metadata_filter = metadata_filter_result[0]

if metadata_filter:
handler.removeFilter(metadata_filter)

step_name_adder = AttributeAdder("vdk_step_name", step.name)
step_type_adder = AttributeAdder("vdk_step_type", step.type)
Expand All @@ -248,7 +330,8 @@ def run_step(self, context: JobContext, step: Step) -> Optional[StepResult]:

# make sure the metadata filter executes last
# so that step_name and step_type are filtered if necessary
handler.addFilter(metadata_filter)
if metadata_filter:
handler.addFilter(metadata_filter)
out: HookCallResult
out = yield
handler.removeFilter(step_name_adder)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
# Copyright 2021-2024 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import logging.handlers

from vdk.plugin.structlog.constants import SYSLOG_PROTOCOLS
from vdk.plugin.structlog.filters import AttributeAdder


DETAILED_FORMAT = (
"%(asctime)s [VDK] %(job_name)s [%(levelname)-5.5s] %(name)-30.30s %(filename)20.20s:%("
"lineno)-4.4s %(funcName)-16.16s[id:%(attempt_id)s]- %(message)s"
)


def configure_syslog_handler(
syslog_enabled,
syslog_host,
syslog_port,
syslog_protocol,
job_name="",
attempt_id="no-id",
):
if not syslog_enabled:
return None

if syslog_protocol not in SYSLOG_PROTOCOLS:
raise ValueError(
f"Provided configuration variable for SYSLOG_PROTOCOL has an invalid value. "
f"VDK was run with SYSLOG_PROTOCOL={syslog_protocol}, however, "
f"{syslog_protocol} is an invalid value for this variable. "
f"Provide a valid value for SYSLOG_PROTOCOL. "
f"Currently possible values are {list(SYSLOG_PROTOCOLS.keys())}"
)

syslog_socktype = SYSLOG_PROTOCOLS[syslog_protocol.upper()]
syslog_handler = logging.handlers.SysLogHandler(
address=(syslog_host, syslog_port),
facility=logging.handlers.SysLogHandler.LOG_DAEMON,
socktype=syslog_socktype,
)

formatter = logging.Formatter(DETAILED_FORMAT)
syslog_handler.setFormatter(formatter)

job_name_adder = AttributeAdder("job_name", job_name)
attempt_id_adder = AttributeAdder("attempt_id", attempt_id)

syslog_handler.addFilter(job_name_adder)
syslog_handler.addFilter(attempt_id_adder)

syslog_handler.setLevel("DEBUG")

return syslog_handler
40 changes: 40 additions & 0 deletions projects/vdk-plugins/vdk-structlog/tests/test_structlog.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import logging
import os
import re
import socket
import threading
from unittest import mock

import pytest
Expand Down Expand Up @@ -108,6 +110,44 @@ def test_structlog(log_format):
)


def test_structlog_syslog():
with mock.patch.dict(
os.environ,
{
"VDK_STRUCTLOG_METADATA": f"timestamp,level,file_name,line_number,vdk_job_name,{BOUND_TEST_KEY},{EXTRA_TEST_KEY}",
"VDK_STRUCTLOG_FORMAT": "console",
"LOG_LEVEL_MODULE": "test_structlog=WARNING",
"VDK_SYSLOG_HOST": "127.0.0.1",
"VDK_SYSLOG_PORT": "32123",
"VDK_SYSLOG_PROTOCOL": "UDP",
"VDK_SYSLOG_ENABLED": "True",
},
):
syslog_out = []

def start_syslog_server(host, port):
server_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
server_socket.bind((host, port))
print(f"Syslog server listening on {host}:{port}")

while True:
data, addr = server_socket.recvfrom(1024)
syslog_out.append(f"{data.decode('utf-8')}")

server = threading.Thread(
target=start_syslog_server, args=("127.0.0.1", 32123), daemon=True
)
server.start()

_run_job_and_get_logs()

assert syslog_out

# assert log entries are formatted using the hardcoded formatter
for log in syslog_out:
assert re.search("\\[id:.*\\]", log)


@pytest.mark.parametrize("log_format", ["console", "ltsv", "json"])
def test_stock_fields_removal(log_format):
stock_field_reps = STOCK_FIELD_REPRESENTATIONS[log_format]
Expand Down
84 changes: 84 additions & 0 deletions projects/vdk-plugins/vdk-structlog/tests/test_syslog_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
# Copyright 2021-2024 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import logging.handlers
import socket
from unittest.mock import MagicMock
from unittest.mock import patch

import pytest
from vdk.plugin.structlog.filters import AttributeAdder
from vdk.plugin.structlog.syslog_config import configure_syslog_handler
from vdk.plugin.structlog.syslog_config import DETAILED_FORMAT


@pytest.fixture
def mock_job_context():
job_context = MagicMock()
job_context.name = "test_job"
job_context.core_context.state.get.return_value = "12345"
job_context.core_context.configuration.get_value.side_effect = lambda key: {
"SYSLOG_ENABLED": True,
"SYSLOG_HOST": "localhost",
"SYSLOG_PORT": 514,
"SYSLOG_PROTOCOL": "UDP",
}.get(key, None)
return job_context


def test_configure_syslog_handler_enabled(mock_job_context):
syslog_handler = configure_syslog_handler(
mock_job_context.core_context.configuration.get_value("SYSLOG_ENABLED"),
mock_job_context.core_context.configuration.get_value("SYSLOG_HOST"),
mock_job_context.core_context.configuration.get_value("SYSLOG_PORT"),
mock_job_context.core_context.configuration.get_value("SYSLOG_PROTOCOL"),
mock_job_context.name,
mock_job_context.core_context.state.get(),
)
assert isinstance(syslog_handler, logging.handlers.SysLogHandler)
assert syslog_handler.level == logging.DEBUG
assert any(isinstance(filter, AttributeAdder) for filter in syslog_handler.filters)
assert syslog_handler.formatter._fmt == DETAILED_FORMAT


@pytest.mark.parametrize("protocol", ["UDP", "TCP"])
@patch("socket.socket")
def test_configure_syslog_handler_with_different_protocols(
mock_socket, protocol, mock_job_context
):
mock_socket_instance = MagicMock()
mock_socket.return_value = mock_socket_instance

syslog_handler = configure_syslog_handler(
True,
"localhost",
514,
protocol,
mock_job_context.name,
mock_job_context.core_context.state.get(),
)

expected_socktype = socket.SOCK_DGRAM if protocol == "UDP" else socket.SOCK_STREAM
assert mock_socket.called
assert mock_socket.call_args[0][1] == expected_socktype
assert any(
isinstance(filter, AttributeAdder) and filter._attr_key == "job_name"
for filter in syslog_handler.filters
)
assert any(
isinstance(filter, AttributeAdder) and filter._attr_key == "attempt_id"
for filter in syslog_handler.filters
)


def test_configure_syslog_handler_disabled(mock_job_context):
syslog_handler = configure_syslog_handler(
False, "localhost", 514, "UDP", "test_job", "12345"
)
assert syslog_handler is None


def test_configure_syslog_handler_invalid_protocol(mock_job_context):
with pytest.raises(ValueError):
configure_syslog_handler(
True, "localhost", 514, "invalid_protocol", "test_job", "12345"
)