Skip to content

SONIC configuration updates #47748

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Apr 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2828,7 +2828,7 @@ class UpgradeWorkflow_SonicTriton(UpgradeWorkflow):
def setup_(self, step, stepName, stepDict, k, properties):
stepDict[stepName][k] = merge([{'--procModifiers': 'allSonicTriton'}, stepDict[step][k]])
def condition(self, fragment, stepList, key, hasHarvest):
return ((fragment=='TTbar_13' or fragment=='TTbar_14TeV') and '2022' in key) \
return ((fragment=='TTbar_13' or fragment=='TTbar_14TeV') and key.startswith('202')) \
or (fragment=='TTbar_14TeV' and 'Run4' in key)
upgradeWFs['SonicTriton'] = UpgradeWorkflow_SonicTriton(
steps = [
Expand Down
4 changes: 2 additions & 2 deletions HeterogeneousCore/SonicCore/test/BuildFile.xml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<environment>
<test name="TestHeterogeneousCoreSonicCoreProducer" command="cmsRun ${LOCALTOP}/src/HeterogeneousCore/SonicCore/test/sonicTest_cfg.py moduleType=Producer"/>
<test name="TestHeterogeneousCoreSonicCoreFilter" command="cmsRun ${LOCALTOP}/src/HeterogeneousCore/SonicCore/test/sonicTest_cfg.py moduleType=Filter"/>
<test name="TestHeterogeneousCoreSonicCoreProducer" command="cmsRun ${LOCALTOP}/src/HeterogeneousCore/SonicCore/test/sonicTest_cfg.py --moduleType Producer"/>
<test name="TestHeterogeneousCoreSonicCoreFilter" command="cmsRun ${LOCALTOP}/src/HeterogeneousCore/SonicCore/test/sonicTest_cfg.py --moduleType Filter"/>
<test name="TestHeterogeneousCoreSonicCoreOneAnalyzer" command="cmsRun ${LOCALTOP}/src/HeterogeneousCore/SonicCore/test/sonicTestAna_cfg.py"/>
<library file="SonicDummyProducer.cc,SonicDummyFilter.cc,SonicDummyOneAnalyzer.cc" name="HeterogeneousCoreSonicCoreTest">
<flags EDM_PLUGIN="1"/>
Expand Down
1 change: 0 additions & 1 deletion HeterogeneousCore/SonicCore/test/sonicTestAna_cfg.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import FWCore.ParameterSet.Config as cms
from FWCore.ParameterSet.VarParsing import VarParsing

process = cms.Process("Test")

Expand Down
12 changes: 5 additions & 7 deletions HeterogeneousCore/SonicCore/test/sonicTest_cfg.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
import FWCore.ParameterSet.Config as cms
from FWCore.ParameterSet.VarParsing import VarParsing

options = VarParsing()
options.register("moduleType","", VarParsing.multiplicity.singleton, VarParsing.varType.string)
options.parseArguments()
from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter

_allowedModuleTypes = ["Producer","Filter"]
if options.moduleType not in ["Producer","Filter"]:
raise ValueError("Unknown module type: {} (allowed: {})".format(options.moduleType,_allowedModuleTypes))
parser = ArgumentParser(formatter_class=ArgumentDefaultsHelpFormatter)
parser.add_argument("--moduleType", type=str, required=True, choices=_allowedModuleTypes, help="Type of module to test")
options = parser.parse_args()

_moduleName = "SonicDummy"+options.moduleType
_moduleClass = getattr(cms,"ED"+options.moduleType)

Expand Down
169 changes: 169 additions & 0 deletions HeterogeneousCore/SonicTriton/python/customize.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
import FWCore.ParameterSet.Config as cms

def getDefaultClientPSet():
    """Return the ``Client`` parameter set of a default TritonGraphAnalyzer.

    The analyzer class is imported inside the function, so the dependency is
    only resolved when a default client configuration is actually requested.
    """
    from HeterogeneousCore.SonicTriton.TritonGraphAnalyzer import TritonGraphAnalyzer

    # Only the Client attribute of the throwaway analyzer instance is needed.
    return TritonGraphAnalyzer().Client

def getParser():
    """Create the command-line parser with the common SONIC Triton options.

    Returns:
        An ``argparse.ArgumentParser`` (using ``ArgumentDefaultsHelpFormatter``)
        to which callers may add their own arguments before parsing.
    """
    from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser

    compressionChoices = ["none", "deflate", "gzip"]
    deviceChoices = ["auto", "cpu", "gpu"]
    containerChoices = ["apptainer", "docker", "podman", "podman-hpc"]

    def valued(default, kind, text, **extra):
        # keyword set for an option that takes a value
        return dict(default=default, type=kind, help=text, **extra)

    def switch(text):
        # keyword set for a boolean on/off flag
        return dict(default=False, action="store_true", help=text)

    # Registration order is kept as-is because it determines the --help layout.
    spec = [
        ("--maxEvents", valued(-1, int, "Number of events to process (-1 for all)")),
        ("--serverName", valued("default", str, "name for server (used internally)")),
        ("--address", valued("", str, "server address")),
        ("--port", valued(8001, int, "server port")),
        ("--timeout", valued(30, int, "timeout for requests")),
        ("--timeoutUnit", valued("seconds", str, "unit for timeout")),
        ("--params", valued("", str, "json file containing server address/port")),
        ("--threads", valued(1, int, "number of threads")),
        ("--streams", valued(0, int, "number of streams")),
        ("--verbose", switch("enable all verbose output")),
        ("--verboseClient", switch("enable verbose output for clients")),
        ("--verboseServer", switch("enable verbose output for server")),
        ("--verboseService", switch("enable verbose output for TritonService")),
        ("--verboseDiscovery", switch("enable verbose output just for server discovery in TritonService")),
        ("--noShm", switch("disable shared memory")),
        ("--compression", valued("", str, "enable I/O compression", choices=compressionChoices)),
        ("--ssl", switch("enable SSL authentication for server communication")),
        ("--tries", valued(0, int, "number of retries for failed request")),
        # str.lower as the type makes the device/container choices case-insensitive
        ("--device", valued("auto", str.lower, "specify device for fallback server", choices=deviceChoices)),
        ("--container", valued("apptainer", str.lower, "specify container for fallback server", choices=containerChoices)),
        ("--fallbackName", valued("", str, "name for fallback server")),
        ("--imageName", valued("", str, "container image name for fallback server")),
        ("--tempDir", valued("", str, "temp directory for fallback server")),
    ]

    parser = ArgumentParser(formatter_class=ArgumentDefaultsHelpFormatter)
    for flag, settings in spec:
        parser.add_argument(flag, **settings)
    return parser

def getOptions(parser, verbose=False):
    """Parse the command line and resolve the optional server parameter file.

    Args:
        parser: an ``argparse`` parser providing at least the ``params``,
            ``address`` and ``port`` options (e.g. the one from ``getParser``).
        verbose: if True, print the server address/port read from the file.

    Returns:
        The parsed options namespace. When ``--params`` names a JSON file, its
        ``"address"`` and ``"port"`` entries override the corresponding options.

    Raises:
        OSError: if the parameter file cannot be opened.
        KeyError: if the parameter file lacks ``"address"`` or ``"port"``.
    """
    # Imported here because this module does not import json at file level;
    # without it the --params branch fails with a NameError.
    import json

    options = parser.parse_args()

    if len(options.params) > 0:
        with open(options.params, 'r') as pfile:
            pdict = json.load(pfile)
        options.address = pdict["address"]
        # the port may be stored as a string in the file
        options.port = int(pdict["port"])
        if verbose:
            print("server = " + options.address + ":" + str(options.port))

    return options

def applyOptions(process, options, applyToModules=False):
    """Transfer parsed command-line options onto a process.

    Sets the event limit, thread/stream counts, logging, and (when the process
    has a ``TritonService``) the fallback-server settings and an optional extra
    server entry.

    Args:
        process: the process object to modify.
        options: namespace as produced by ``getOptions``.
        applyToModules: if True, also push the client options into the modules
            of the process via ``configureModules``.

    Returns:
        The modified process.
    """
    process.maxEvents.input = cms.untracked.int32(options.maxEvents)

    if options.threads > 0:
        process.options.numberOfThreads = options.threads
        process.options.numberOfStreams = options.streams

    # --verbose switches everything on; otherwise honour the individual flags
    if not options.verbose:
        configureLogging(
            process,
            client=options.verboseClient,
            server=options.verboseServer,
            service=options.verboseService,
            discovery=options.verboseDiscovery,
        )
    else:
        configureLoggingAll(process)

    if hasattr(process, 'TritonService'):
        tritonService = process.TritonService
        fallback = tritonService.fallback
        fallback.container = options.container
        fallback.imageName = options.imageName
        fallback.tempDir = options.tempDir
        fallback.device = options.device
        if len(options.fallbackName) > 0:
            fallback.instanceBaseName = options.fallbackName
        # an explicit address adds one more server to the service's list
        if len(options.address) > 0:
            serverEntry = dict(
                name=options.serverName,
                address=options.address,
                port=options.port,
                useSsl=options.ssl,
            )
            tritonService.servers.append(serverEntry)

    if not applyToModules:
        return process

    clientOptions = getClientOptions(options)
    return configureModules(process, **clientOptions)

def getClientOptions(options):
return dict(
compression = cms.untracked.string(options.compression),
useSharedMemory = cms.untracked.bool(not options.noShm),
timeout = cms.untracked.uint32(options.timeout),
timeoutUnit = cms.untracked.string(options.timeoutUnit),
allowedTries = cms.untracked.uint32(options.tries),
)
Comment on lines +93 to +99
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could probably be

Suggested change
return dict(
compression = cms.untracked.string(options.compression),
useSharedMemory = cms.untracked.bool(not options.noShm),
timeout = cms.untracked.uint32(options.timeout),
timeoutUnit = cms.untracked.string(options.timeoutUnit),
allowedTries = cms.untracked.uint32(options.tries),
)
return dict(
compression = options.compression,
useSharedMemory = not options.noShm,
timeout = options.timeout,
timeoutUnit = options.timeoutUnit,
allowedTries = options.tries,
)

if the EDModule object being modified is expected to originate from fillDescriptions().

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately, that is not always the case. I had originally written it this way and then hit the "parameter does not already exist" error. I do not know another way around that besides always specifying the types.


def applyClientOptions(client, options):
    """Apply the client-related command-line options to one client PSet.

    Args:
        client: the client parameter set to update.
        options: parsed options, converted through ``getClientOptions``.

    Returns:
        The client object returned by ``configureClient``.
    """
    clientSettings = getClientOptions(options)
    return configureClient(client, **clientSettings)

def configureModules(process, modules=None, returnConfigured=False, **kwargs):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to make sure: if the user specifies the modules argument, they are expected to provide it as a dictionary along the lines of

labels = # list of strings
modules = {label: getattr(process, label) for label in labels}

?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this is mainly there to allow users to apply a configuration to a selected subset of modules (based on whatever criteria they might have). e.g. they could populate the dictionary by filtering out unwanted entries from process.producers_().

if modules is None:
modules = {}
modules.update(process.producers_())
modules.update(process.filters_())
modules.update(process.analyzers_())
configured = []
for pname,producer in modules.items():
if hasattr(producer,'Client'):
producer.Client = configureClient(producer.Client, **kwargs)
configured.append(pname)
if returnConfigured:
return process, configured
else:
return process

def configureClient(client, **kwargs):
    """Update a client parameter set with the given keyword settings.

    Args:
        client: object exposing ``update_``; it is modified in place.
        **kwargs: parameter names and values, passed to ``update_`` as one dict.

    Returns:
        The same ``client`` object, to allow use in assignments.
    """
    settings = kwargs
    client.update_(settings)
    return client

def configureLogging(process, client=False, server=False, service=False, discovery=False):
    """Enable verbose output for selected SONIC Triton components.

    Args:
        process: the process to modify.
        client: enable verbose output in every module that has a ``Client``.
        server: enable verbose output for the fallback server.
        service: enable verbose output for TritonService.
        discovery: enable the 'TritonDiscovery' message category; this also
            sets ``TritonService.verbose``.

    Returns:
        The process. It is returned unchanged when no flag is set, so the
        function can always be used as ``process = configureLogging(process)``.
    """
    if not (client or server or service or discovery):
        # Nothing requested: leave the process alone, but still hand it back
        # (a bare return here would give callers None instead of the process).
        return process

    # MessageLogger categories whose output should not be suppressed
    keepMsgs = []
    if discovery:
        keepMsgs.append('TritonDiscovery')
    if client:
        keepMsgs.append('TritonClient')
    if service:
        keepMsgs.append('TritonService')

    if hasattr(process, 'TritonService'):
        process.TritonService.verbose = service or discovery
        process.TritonService.fallback.verbose = server
    if client:
        process, configured = configureModules(process, returnConfigured=True, verbose=True)
        # each configured module gets its own category plus a client sub-category
        for module in configured:
            keepMsgs.extend([module, module + ':TritonClient'])

    process.MessageLogger.cerr.FwkReport.reportEvery = 500
    for msg in keepMsgs:
        setattr(process.MessageLogger.cerr, msg, dict(limit=10000000))

    return process

# dedicated functions for cmsDriver customization

def configureLoggingClient(process):
    """cmsDriver customization: enable verbose output for clients only."""
    return configureLogging(process, client=True, server=False, service=False, discovery=False)

def configureLoggingServer(process):
    """cmsDriver customization: enable verbose output for the server only."""
    return configureLogging(process, client=False, server=True, service=False, discovery=False)

def configureLoggingService(process):
    """cmsDriver customization: enable verbose output for TritonService only."""
    return configureLogging(process, client=False, server=False, service=True, discovery=False)

def configureLoggingDiscovery(process):
    """cmsDriver customization: enable verbose output for server discovery only."""
    return configureLogging(process, client=False, server=False, service=False, discovery=True)

def configureLoggingAll(process):
    """cmsDriver customization: enable every verbose-output switch at once."""
    everything = dict.fromkeys(("client", "server", "service", "discovery"), True)
    return configureLogging(process, **everything)
8 changes: 5 additions & 3 deletions HeterogeneousCore/SonicTriton/src/TritonService.cc
Original file line number Diff line number Diff line change
Expand Up @@ -161,11 +161,13 @@ TritonService::TritonService(const edm::ParameterSet& pset, edm::ActivityRegistr
msg += modelName + ", ";
}
} else {
const std::string& baseMsg = "unable to get repository index";
const std::string& extraMsg = err.Message().empty() ? "" : ": " + err.Message();
if (verbose_)
msg += "unable to get repository index";
msg += baseMsg + extraMsg;
else
edm::LogWarning("TritonFailure") << "TritonService(): unable to get repository index for " + serverName + " (" +
server.url + ")";
edm::LogWarning("TritonFailure") << "TritonService(): " << baseMsg << " for " << serverName << " ("
<< server.url << ")" << extraMsg;
}
if (verbose_)
msg += "\n";
Expand Down
92 changes: 6 additions & 86 deletions HeterogeneousCore/SonicTriton/test/tritonTest_cfg.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import FWCore.ParameterSet.Config as cms
import os, sys, json
from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
from HeterogeneousCore.SonicTriton.customize import getDefaultClientPSet, getParser, getOptions, applyOptions, applyClientOptions

# module/model correspondence
models = {
Expand All @@ -13,46 +13,16 @@

# other choices
allowed_modes = ["Async","PseudoAsync","Sync"]
allowed_compression = ["none","deflate","gzip"]
allowed_devices = ["auto","cpu","gpu"]
allowed_containers = ["apptainer","docker","podman","podman-hpc"]

parser = ArgumentParser(formatter_class=ArgumentDefaultsHelpFormatter)
parser.add_argument("--maxEvents", default=-1, type=int, help="Number of events to process (-1 for all)")
parser.add_argument("--serverName", default="default", type=str, help="name for server (used internally)")
parser.add_argument("--address", default="", type=str, help="server address")
parser.add_argument("--port", default=8001, type=int, help="server port")
parser.add_argument("--timeout", default=30, type=int, help="timeout for requests")
parser.add_argument("--timeoutUnit", default="seconds", type=str, help="unit for timeout")
parser.add_argument("--params", default="", type=str, help="json file containing server address/port")
parser.add_argument("--threads", default=1, type=int, help="number of threads")
parser.add_argument("--streams", default=0, type=int, help="number of streams")
parser = getParser()
parser.add_argument("--modules", metavar=("MODULES"), default=["TritonGraphProducer"], nargs='+', type=str, choices=list(models), help="list of modules to run (choices: %(choices)s)")
parser.add_argument("--models", default=["gat_test"], nargs='+', type=str, help="list of models (same length as modules, or just 1 entry if all modules use same model)")
parser.add_argument("--mode", default="Async", type=str, choices=allowed_modes, help="mode for client")
parser.add_argument("--verbose", default=False, action="store_true", help="enable all verbose output")
parser.add_argument("--verboseClient", default=False, action="store_true", help="enable verbose output for clients")
parser.add_argument("--verboseServer", default=False, action="store_true", help="enable verbose output for server")
parser.add_argument("--verboseService", default=False, action="store_true", help="enable verbose output for TritonService")
parser.add_argument("--verboseDiscovery", default=False, action="store_true", help="enable verbose output just for server discovery in TritonService")
parser.add_argument("--brief", default=False, action="store_true", help="briefer output for graph modules")
parser.add_argument("--fallbackName", default="", type=str, help="name for fallback server")
parser.add_argument("--unittest", default=False, action="store_true", help="unit test mode: reduce input sizes")
parser.add_argument("--testother", default=False, action="store_true", help="also test gRPC communication if shared memory enabled, or vice versa")
parser.add_argument("--noShm", default=False, action="store_true", help="disable shared memory")
parser.add_argument("--compression", default="", type=str, choices=allowed_compression, help="enable I/O compression")
parser.add_argument("--ssl", default=False, action="store_true", help="enable SSL authentication for server communication")
parser.add_argument("--device", default="auto", type=str.lower, choices=allowed_devices, help="specify device for fallback server")
parser.add_argument("--container", default="apptainer", type=str.lower, choices=allowed_containers, help="specify container for fallback server")
parser.add_argument("--tries", default=0, type=int, help="number of retries for failed request")
options = parser.parse_args()

if len(options.params)>0:
with open(options.params,'r') as pfile:
pdict = json.load(pfile)
options.address = pdict["address"]
options.port = int(pdict["port"])
print("server = "+options.address+":"+str(options.port))
options = getOptions(parser, verbose=True)

# check models and modules
if len(options.modules)!=len(options.models):
Expand All @@ -68,30 +38,8 @@
process = cms.Process('tritonTest',enableSonicTriton)

process.load("HeterogeneousCore.SonicTriton.TritonService_cff")

process.maxEvents = cms.untracked.PSet( input = cms.untracked.int32(options.maxEvents) )

process.source = cms.Source("EmptySource")

process.TritonService.verbose = options.verbose or options.verboseService or options.verboseDiscovery
process.TritonService.fallback.verbose = options.verbose or options.verboseServer
process.TritonService.fallback.container = options.container
process.TritonService.fallback.device = options.device
if len(options.fallbackName)>0:
process.TritonService.fallback.instanceBaseName = options.fallbackName
if len(options.address)>0:
process.TritonService.servers.append(
cms.PSet(
name = cms.untracked.string(options.serverName),
address = cms.untracked.string(options.address),
port = cms.untracked.uint32(options.port),
useSsl = cms.untracked.bool(options.ssl),
rootCertificates = cms.untracked.string(""),
privateKey = cms.untracked.string(""),
certificateChain = cms.untracked.string(""),
)
)

# Let it run
process.p = cms.Path()

Expand All @@ -101,31 +49,19 @@
"Analyzer": cms.EDAnalyzer,
}

keepMsgs = []
if options.verbose or options.verboseDiscovery:
keepMsgs.append('TritonDiscovery')
if options.verbose or options.verboseClient:
keepMsgs.append('TritonClient')
if options.verbose or options.verboseService:
keepMsgs.append('TritonService')
defaultClient = applyClientOptions(getDefaultClientPSet().clone(), options)

for im,module in enumerate(options.modules):
model = options.models[im]
Module = [obj for name,obj in modules.items() if name in module][0]
setattr(process, module,
Module(module,
Client = cms.PSet(
Client = defaultClient.clone(
mode = cms.string(options.mode),
preferredServer = cms.untracked.string(""),
timeout = cms.untracked.uint32(options.timeout),
timeoutUnit = cms.untracked.string(options.timeoutUnit),
modelName = cms.string(model),
modelVersion = cms.string(""),
modelConfigPath = cms.FileInPath("HeterogeneousCore/SonicTriton/data/models/{}/config.pbtxt".format(model)),
verbose = cms.untracked.bool(options.verbose or options.verboseClient),
allowedTries = cms.untracked.uint32(options.tries),
useSharedMemory = cms.untracked.bool(not options.noShm),
compression = cms.untracked.string(options.compression),
)
)
)
Expand All @@ -148,8 +84,6 @@
processModule.edgeMax = cms.uint32(15000)
processModule.brief = cms.bool(options.brief)
process.p += processModule
if options.verbose or options.verboseClient:
keepMsgs.extend([module,module+':TritonClient'])
if options.testother:
# clone modules to test both gRPC and shared memory
_module2 = module+"GRPC" if processModule.Client.useSharedMemory else "SHM"
Expand All @@ -160,19 +94,5 @@
)
processModule2 = getattr(process, _module2)
process.p += processModule2
if options.verbose or options.verboseClient:
keepMsgs.extend([_module2,_module2+':TritonClient'])

process.load('FWCore/MessageService/MessageLogger_cfi')
process.MessageLogger.cerr.FwkReport.reportEvery = 500
for msg in keepMsgs:
setattr(process.MessageLogger.cerr,msg,
cms.untracked.PSet(
limit = cms.untracked.int32(10000000),
)
)

if options.threads>0:
process.options.numberOfThreads = options.threads
process.options.numberOfStreams = options.streams

process = applyOptions(process, options)