Skip to content

Commit bd2576d

Browse files
committed
Improve behavior of streamer system
- Merging of simple run changes now handled - Additional unit tests added, including tests of failure cases
1 parent f0dc95b commit bd2576d

10 files changed

+232
-26
lines changed

IOPool/Streamer/interface/StreamerOutputModuleBase.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,9 @@ namespace edm {
4545
private:
4646
edm::EDGetTokenT<edm::TriggerResults> trToken_;
4747
edm::EDGetTokenT<SendJobHeader::ParameterSetMap> psetToken_;
48+
edm::ProcessHistoryID lastHistory_;
4849
bool lastCallWasBeginRun_ = false;
50+
bool initWritten_ = false;
4951

5052
}; //end-of-class-def
5153
} // namespace streamer

IOPool/Streamer/src/StreamerInputFile.cc

Lines changed: 2 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -321,19 +321,8 @@ namespace edm::streamer {
321321
eventSize = head.size();
322322
if (code != Header::EVENT) {
323323
if (code == Header::INIT) {
324-
edm::LogWarning("StreamerInputFile") << "Found another INIT header in the file. It will be skipped";
325-
if (eventSize < sizeof(EventHeader)) {
326-
//very unlikely case that EventHeader is larger than total INIT size inserted in the middle of the file
327-
hdrSkipped = nGot - eventSize;
328-
memmove(&eventBuf_[0], &eventBuf_[eventSize], hdrSkipped);
329-
continue;
330-
}
331-
if (headerBuf_.size() < eventSize)
332-
headerBuf_.resize(eventSize);
333-
memcpy(&headerBuf_[0], &eventBuf_[0], nGot);
334-
readBytes(&headerBuf_[nGot], eventSize, true, nGot);
335-
//do not parse this header and proceed to the next event
336-
continue;
324+
throw Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
325+
<< "Found another INIT header in the file.";
337326
}
338327
throw Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
339328
<< "Failed reading streamer file, unknown code in event header\n"

IOPool/Streamer/src/StreamerInputSource.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ namespace edm::streamer {
6666
pReg.updateFromInput(descs);
6767
std::string mergeInfo = reg.merge(pReg, std::string(), BranchDescription::Permissive);
6868
if (!mergeInfo.empty()) {
69-
throw cms::Exception("MismatchedInput", "RootInputFileSequence::previousEvent()") << mergeInfo;
69+
throw cms::Exception("MismatchedInput", "StreamerInputSource::mergeIntoRegistry") << mergeInfo;
7070
}
7171
} else {
7272
declareStreamers(descs);

IOPool/Streamer/src/StreamerOutputModuleBase.cc

Lines changed: 25 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -26,18 +26,31 @@ namespace edm::streamer {
2626
void StreamerOutputModuleBase::beginRun(RunForOutput const& iRun) {
2727
start();
2828

29-
auto psetMapHandle = iRun.getHandle(psetToken_);
30-
31-
std::unique_ptr<InitMsgBuilder> init_message =
32-
serializeRegistry(OutputModule::processName(),
33-
description().moduleLabel(),
34-
moduleDescription().mainParameterSetID(),
35-
psetMapHandle.isValid() ? psetMapHandle.product() : nullptr);
36-
37-
doOutputHeader(*init_message);
38-
lastCallWasBeginRun_ = true;
39-
40-
clearHeaderBuffer();
29+
if (not initWritten_) {
30+
auto psetMapHandle = iRun.getHandle(psetToken_);
31+
32+
std::unique_ptr<InitMsgBuilder> init_message =
33+
serializeRegistry(OutputModule::processName(),
34+
description().moduleLabel(),
35+
moduleDescription().mainParameterSetID(),
36+
psetMapHandle.isValid() ? psetMapHandle.product() : nullptr);
37+
38+
doOutputHeader(*init_message);
39+
lastCallWasBeginRun_ = true;
40+
auto history = iRun.processHistory();
41+
lastHistory_ = history.reduce().id();
42+
initWritten_ = true;
43+
44+
clearHeaderBuffer();
45+
} else {
46+
auto history = iRun.processHistory();
47+
if (lastHistory_ != history.reduce().id()) {
48+
throw edm::Exception(errors::FileWriteError) << "Streamer output can not handle writing a new Run if the "
49+
"ProcessHistory changed since the last Run written.";
50+
}
51+
//need to write meta data anyway
52+
lastCallWasBeginRun_ = true;
53+
}
4154
}
4255

4356
void StreamerOutputModuleBase::endRun(RunForOutput const&) { stop(); }

IOPool/Streamer/test/BuildFile.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,10 @@
2828

2929
<test name="TestIOPoolStreamerRefProductIDMetadataConsistency" command="run_TestRefProductIDMetadataConsistencyStreamer.sh"/>
3030

31+
<test name="TestIOPoolStreamerRefMerge" command="run_RefMerge.sh"/>
32+
33+
<test name="TestIOPoolStreamerFailures" command="run_failures.sh"/>
34+
3135
<library file="StreamThingProducer.cc" name="StreamThingProducer">
3236
<flags EDM_PLUGIN="1"/>
3337
<use name="DataFormats/TestObjects"/>

IOPool/Streamer/test/ref_merge_cfg.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
import FWCore.ParameterSet.Config as cms
2+
import argparse
3+
import sys
4+
5+
parser = argparse.ArgumentParser(prog=sys.argv[0], description='Test ConditionalTasks.')
6+
7+
parser.add_argument("--inFile1", help="first file to read")
8+
parser.add_argument("--inFile2", help="second file to read")
9+
parser.add_argument("--outFile", help="name of output file", default="ref_merge.root")
10+
11+
args = parser.parse_args()
12+
13+
process = cms.Process("MERGE")
14+
15+
from IOPool.Streamer.modules import NewEventStreamFileReader
16+
process.source = NewEventStreamFileReader(fileNames = [f"file:{args.inFile1}",
17+
f"file:{args.inFile2}"]
18+
)
19+
20+
from IOPool.Streamer.modules import EventStreamFileWriter
21+
process.out = EventStreamFileWriter(fileName = args.outFile)
22+
23+
from FWCore.Integration.modules import OtherThingAnalyzer
24+
process.tester = OtherThingAnalyzer(other = ("d","testUserTag"))
25+
26+
process.o = cms.EndPath(process.out+process.tester)
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
import FWCore.ParameterSet.Config as cms
2+
3+
import argparse
4+
import sys
5+
6+
parser = argparse.ArgumentParser(prog=sys.argv[0], description='Test ConditionalTasks.')
7+
8+
parser.add_argument("--extraProducers", help="Add extra producers to configuration", action="store_true")
9+
parser.add_argument("--fileName", help="name of output file")
10+
parser.add_argument("--firstLumi", help="LuminosityBlock number for first lumi", type = int, default=1)
11+
parser.add_argument("--firstRun", help="LuminosityBlock number for first run", type = int, default=1)
12+
parser.add_argument("--keepAllProducts", help="Keep all products made in the job", action="store_true")
13+
parser.add_argument("--dropThings", help="drop the Things collections so the refs will not function", action="store_true")
14+
15+
args = parser.parse_args()
16+
17+
18+
process = cms.Process("PROD")
19+
20+
nEvents = 10
21+
from FWCore.Modules.modules import EmptySource
22+
process.source = EmptySource(firstRun = args.firstRun,
23+
firstLuminosityBlock = args.firstLumi,
24+
firstEvent = nEvents*(args.firstLumi-1)+1
25+
)
26+
27+
process.maxEvents.input = nEvents
28+
29+
if args.extraProducers:
30+
from FWCore.Framework.modules import IntProducer
31+
process.a = IntProducer(ivalue = 1)
32+
33+
process.b = IntProducer(ivalue = 2)
34+
35+
from FWCore.Integration.modules import ThingProducer, OtherThingProducer, OtherThingAnalyzer
36+
process.c = ThingProducer()
37+
38+
process.d = OtherThingProducer(thingTag="c")
39+
40+
outputs = []
41+
if not args.keepAllProducts:
42+
outputs = ["drop *",
43+
"keep edmtestOtherThings_*_*_*"]
44+
if not args.dropThings:
45+
outputs.append("keep edmtestThings_*_*_*")
46+
47+
48+
from IOPool.Streamer.modules import EventStreamFileWriter
49+
process.o = EventStreamFileWriter(outputCommands = outputs,
50+
fileName = args.fileName
51+
)
52+
if args.extraProducers:
53+
process.p = cms.Path(process.a+process.b+process.c*process.d)
54+
else:
55+
process.p = cms.Path(process.c*process.d)
56+
57+
process.tester = OtherThingAnalyzer(other = ("d","testUserTag"))
58+
59+
process.out = cms.EndPath(process.o+process.tester)
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
import FWCore.ParameterSet.Config as cms
2+
import argparse
3+
import sys
4+
5+
parser = argparse.ArgumentParser(prog=sys.argv[0], description='Test Refs after merge.')
6+
7+
parser.add_argument("--fileName", help="file to read")
8+
9+
args = parser.parse_args()
10+
11+
process = cms.Process("TEST")
12+
13+
from IOPool.Streamer.modules import NewEventStreamFileReader
14+
process.source = NewEventStreamFileReader(fileNames = [f"file:{args.fileName}"])
15+
16+
from FWCore.Integration.modules import OtherThingAnalyzer
17+
process.tester = OtherThingAnalyzer(other = ("d","testUserTag"))
18+
19+
process.e = cms.EndPath(process.tester)

IOPool/Streamer/test/run_RefMerge.sh

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
#!/bin/bash
2+
3+
test=ref_merge_
4+
5+
function die { echo Failure $1: status $2 ; exit $2 ; }
6+
7+
LOCAL_TEST_DIR=${SCRAM_TEST_PATH}
8+
#------------- same configs, same run ------------
9+
10+
echo ${test}prod_a ------------------------------------------------------------
11+
cmsRun ${LOCAL_TEST_DIR}/${test}prod_cfg.py --fileName 'ref_merge_proda.root' || die "cmsRun ${test}prod_cfg.py" $?
12+
13+
echo ${test}prod_b ------------------------------------------------------------
14+
cmsRun ${LOCAL_TEST_DIR}/${test}prod_cfg.py --firstLumi 10 --fileName 'ref_merge_prodb.root'|| die "cmsRun ${test}prod_cfg.py" $?
15+
16+
echo ${test}MERGE_same------------------------------------------------------------
17+
cmsRun ${LOCAL_TEST_DIR}/${test}cfg.py --inFile1 'ref_merge_proda.root' --inFile2 'ref_merge_prodb.root' --outFile 'ref_merge_same.root' || die "cmsRun ${test}cfg.py same" $?
18+
19+
echo ${test}test_same------------------------------------------------------------
20+
cmsRun ${LOCAL_TEST_DIR}/${test}test_cfg.py --fileName 'ref_merge_same.root' || die "cmsRun ${test}test_cfg.py same" $?
21+
22+
#------------- same configs different stored products, same run ------------
23+
# works if subsequent files have a strict subset of stored products of the first file
24+
25+
echo ${test}prod_b ------------------------------------------------------------
26+
cmsRun ${LOCAL_TEST_DIR}/${test}prod_cfg.py --firstLumi 10 --fileName 'ref_merge_prod_all.root' --keepAllProducts || die "cmsRun ${test}prod_cfg.py" $?
27+
28+
echo ${test}MERGE_diff_prods1------------------------------------------------------------
29+
cmsRun ${LOCAL_TEST_DIR}/${test}cfg.py --inFile2 'ref_merge_proda.root' --inFile1 'ref_merge_prod_all.root' --outFile 'ref_merge_diff_prods.root' || die "cmsRun ${test}cfg.py diff prods" $?
30+
31+
echo ${test}test_diff_prods1------------------------------------------------------------
32+
cmsRun ${LOCAL_TEST_DIR}/${test}test_cfg.py --fileName 'ref_merge_diff_prods.root' || die "cmsRun ${test}test_cfg.py diff prods" $?
33+
34+
#------------- same configs, different run ------------
35+
36+
echo ${test}prod_run10 ------------------------------------------------------------
37+
cmsRun ${LOCAL_TEST_DIR}/${test}prod_cfg.py --firstRun 10 --fileName 'ref_merge_prod_run10.root'|| die "cmsRun ${test}prod_cfg.py run10" $?
38+
39+
echo ${test}MERGE_diff_runs------------------------------------------------------------
40+
cmsRun ${LOCAL_TEST_DIR}/${test}cfg.py --inFile1 'ref_merge_proda.root' --inFile2 'ref_merge_prod_run10.root' --outFile 'ref_merge_diffRuns.root' || die "cmsRun ${test}cfg.py diff runs" $?
41+
42+
echo ${test}test_diff_runs------------------------------------------------------------
43+
cmsRun ${LOCAL_TEST_DIR}/${test}test_cfg.py --fileName 'ref_merge_diffRuns.root' || die "cmsRun ${test}test_cfg.py diff runs" $?
44+
45+
exit 0

IOPool/Streamer/test/run_failures.sh

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
#!/bin/bash
2+
3+
test=ref_merge_
4+
5+
function die { echo Failure $1: status $2 ; exit $2 ; }
6+
7+
LOCAL_TEST_DIR=${SCRAM_TEST_PATH}
8+
#------------- same configs, same run ------------
9+
10+
echo ${test}prod_a ------------------------------------------------------------
11+
cmsRun ${LOCAL_TEST_DIR}/${test}prod_cfg.py --fileName 'ref_merge_proda.root' || die "cmsRun ${test}prod_cfg.py" $?
12+
13+
echo ${test}prod_b ------------------------------------------------------------
14+
cmsRun ${LOCAL_TEST_DIR}/${test}prod_cfg.py --firstLumi 10 --fileName 'ref_merge_prodb.root'|| die "cmsRun ${test}prod_cfg.py" $?
15+
16+
#------------- same configs, same run using cat ------------
17+
cat ref_merge_proda.root ref_merge_prodb.root > ref_merge_cat.root
18+
19+
echo ${test}test_cat------------------------------------------------------------
20+
cmsRun ${LOCAL_TEST_DIR}/${test}test_cfg.py --fileName 'ref_merge_cat.root' && die "cmsRun ${test}test_cfg.py same" 1
21+
22+
#------------- same configs different stored products, same run ------------
23+
24+
echo ${test}prod_ass ------------------------------------------------------------
25+
cmsRun ${LOCAL_TEST_DIR}/${test}prod_cfg.py --firstLumi 10 --fileName 'ref_merge_prod_all.root' --keepAllProducts || die "cmsRun ${test}prod_cfg.py" $?
26+
27+
echo ${test}MERGE_diff_prods2------------------------------------------------------------
28+
cmsRun ${LOCAL_TEST_DIR}/${test}cfg.py --inFile1 'ref_merge_proda.root' --inFile2 'ref_merge_prod_all.root' --outFile 'ref_merge_diff_prods2.root' && die "cmsRun ${test}cfg.py diff prods 2" 1
29+
30+
#------------- different configs ------------
31+
32+
echo ${test}prod1 ------------------------------------------------------------
33+
cmsRun ${LOCAL_TEST_DIR}/${test}prod_cfg.py --extraProducers --fileName 'ref_merge_prod1.root' || die "cmsRun ${test}prod_cfg.py --extraProducers" $?
34+
35+
echo ${test}prod2 ------------------------------------------------------------
36+
cmsRun ${LOCAL_TEST_DIR}/${test}prod_cfg.py --firstLumi 10 --fileName 'ref_merge_prod2.root'|| die "cmsRun ${test}prod_cfg.py" $?
37+
38+
echo ${test}MERGE_diff_configs------------------------------------------------------------
39+
cmsRun ${LOCAL_TEST_DIR}/${test}cfg.py --inFile1 'ref_merge_prod1.root' --inFile2 'ref_merge_prod2.root' --outFile 'ref_merge.root' && die "cmsRun ${test}cfg.py diff configs" 1
40+
41+
#------------- different configs and different products ------------
42+
43+
echo ${test}keepAllProd ------------------------------------------------------------
44+
cmsRun ${LOCAL_TEST_DIR}/${test}prod_cfg.py --extraProducers --keepAllProducts --fileName 'ref_merge_prod_all.root' || die "cmsRun ${test}prod_cfg.py --keepAllProducts" $?
45+
46+
echo ${test}MERGE_keepAll1st ------------------------------------------------------------
47+
cmsRun ${LOCAL_TEST_DIR}/${test}cfg.py --inFile2 'ref_merge_prod_all.root' --inFile1 'ref_merge_prod2.root' --outFile 'ref_merge_all1st.root' && die "cmsRun ${test}cfg.py" 1
48+
49+
exit 0

0 commit comments

Comments
 (0)