Skip to content

Commit 9db1e5b

Browse files
committed
Implement simple MPI client and server for CMSSW
This implementation foresees two CMSSW processes running in parallel, on the same or different machines: - a "driver" process; - a "follower" process. The driver process is free to analyze any run/lumi/event from any source. An "MPIDriver" module runs inside the driver process, listens to all stream transitions (begin/end stream, begin/end run, begin/end lumi, new event) and reads an arbitrary list of event data products. The MPIDriver module connects over MPI to the follower process; it will signal the same transitions to the follower process, and transfer to it the event data products it reads. The follower process uses a dedicated source, an "MPISource" module, to recreate the same run, lumi and event transitions received from the driver process. The MPISource connects to the MPIDriver module in the driver process, and will generate in the local process the same run and lumi transitions and events that it receives; it will populate the local event with the data product received from the driver, along with an additional product used to keep track of the driver process rank and stream. An automated test is available in the test/ directory. Current limitations: - support a single stream - support a single "follower" - transfer only per-event data products
1 parent 649dbc0 commit 9db1e5b

20 files changed

+1647
-0
lines changed
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
<use name="DataFormats/Common"/>
2+
<use name="FWCore/Framework"/>
3+
<export>
4+
<lib name="1"/>
5+
</export>
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
#ifndef HeterogeneousCore_MPICore_interface_MPIOrigin_h
2+
#define HeterogeneousCore_MPICore_interface_MPIOrigin_h
3+
4+
#include "DataFormats/Common/interface/traits.h"
5+
6+
/* MPIOrigin
7+
*
8+
* Include the information that describe form which MPI process an Event originates from,
9+
* so that data products can be sent back to it.
10+
*
11+
* Data members are `int` to match types expected by MPI.
12+
*
13+
* This class is always transient, as it doesn't make sense to store it and read it from
14+
* a different, subsequent process.
15+
*/
16+
17+
class MPIOrigin : public edm::DoNotRecordParents {
18+
public:
19+
MPIOrigin() = default;
20+
MPIOrigin(int process, uint32_t stream) : process_(process), stream_(static_cast<int>(stream)) {}
21+
22+
int process() const { return process_; } // rank of the original MPI process
23+
int rank() const { return process_; } // alias for process()
24+
25+
int stream() const { return stream_; } // EDM stream id within the original process
26+
int tag() const { return stream_; } // alias for stream()
27+
28+
bool valid() const { return (-1 != process_ and -1 != stream_); }
29+
30+
private:
31+
int process_ = -1; // the rank of the MPI process the Even originates from
32+
int stream_ = -1; // the EDM stream ID within the original process
33+
};
34+
35+
#endif // HeterogeneousCore_MPICore_interface_MPIOrigin_h
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
<library file="*.cc" name="HeterogeneousCoreMPICorePlugins">
2+
<use name="DataFormats/Common"/>
3+
<use name="DataFormats/Provenance"/>
4+
<use name="FWCore/Framework"/>
5+
<use name="FWCore/MessageLogger"/>
6+
<use name="FWCore/ParameterSet"/>
7+
<use name="FWCore/PluginManager"/>
8+
<use name="FWCore/Reflection"/>
9+
<use name="FWCore/ServiceRegistry"/>
10+
<use name="FWCore/Sources"/>
11+
<use name="FWCore/Utilities"/>
12+
<use name="HeterogeneousCore/MPICore"/>
13+
<use name="HeterogeneousCore/MPIServices"/>
14+
<flags EDM_PLUGIN="1"/>
15+
</library>
Lines changed: 299 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,299 @@
1+
#include <iostream>
2+
#include <sstream>
3+
4+
#include <mpi.h>
5+
6+
#include <TBufferFile.h>
7+
#include <TClass.h>
8+
9+
#include "DataFormats/Provenance/interface/BranchKey.h"
10+
#include "DataFormats/Provenance/interface/ProcessHistoryRegistry.h"
11+
#include "FWCore/Framework/interface/Event.h"
12+
#include "FWCore/Framework/interface/GenericHandle.h"
13+
#include "FWCore/Framework/interface/LuminosityBlock.h"
14+
#include "FWCore/Framework/interface/MakerMacros.h"
15+
#include "FWCore/Framework/interface/Run.h"
16+
#include "FWCore/Framework/interface/stream/EDAnalyzer.h"
17+
#include "FWCore/MessageLogger/interface/MessageLogger.h"
18+
#include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
19+
#include "FWCore/ParameterSet/interface/ParameterDescriptionNode.h"
20+
#include "FWCore/ParameterSet/interface/ParameterSet.h"
21+
#include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
22+
#include "FWCore/Reflection/interface/ObjectWithDict.h"
23+
#include "FWCore/Reflection/interface/TypeWithDict.h"
24+
#include "FWCore/ServiceRegistry/interface/Service.h"
25+
#include "FWCore/Utilities/src/Guid.h"
26+
#include "HeterogeneousCore/MPIServices/interface/MPIService.h"
27+
28+
#include "api.h"
29+
#include "messages.h"
30+
31+
/* MPIDriver class
32+
*
33+
* This module runs inside a CMSSW job (the "driver") and connects to an "MPISource" in a separate CMSSW job (the "follower").
34+
* The follower is informed of all stream transitions seen by the driver, and can replicate them in it
35+
*
36+
* Current limitations:
37+
* - support a single "follower"
38+
* - transfer the stream transitions, but no data products
39+
*
40+
* Future work:
41+
* - support multiple servers
42+
* - let this module consume any number of event, lumi and run products
43+
* (use an output module-like syntax ?), and send them to the server
44+
*/
45+
46+
class MPIDriver : public edm::stream::EDAnalyzer<> {
47+
public:
48+
explicit MPIDriver(edm::ParameterSet const& config);
49+
~MPIDriver() override;
50+
51+
void beginStream(edm::StreamID sid) override;
52+
void endStream() override;
53+
54+
void beginRun(edm::Run const& run, edm::EventSetup const& setup) override;
55+
void endRun(edm::Run const& run, edm::EventSetup const& setup) override;
56+
57+
void beginLuminosityBlock(edm::LuminosityBlock const& lumi, edm::EventSetup const& setup) override;
58+
void endLuminosityBlock(edm::LuminosityBlock const& lumi, edm::EventSetup const& setup) override;
59+
60+
void analyze(edm::Event const& event, edm::EventSetup const& setup) override;
61+
62+
static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
63+
64+
private:
65+
edm::StreamID sid_ = edm::StreamID::invalidStreamID();
66+
67+
MPISender sender_;
68+
MPI_Comm comm_ = MPI_COMM_NULL;
69+
70+
std::vector<std::string> eventLabels_;
71+
std::vector<edm::BranchDescription> branches_;
72+
std::vector<edm::EDGetToken> tokens_;
73+
};
74+
75+
MPIDriver::MPIDriver(edm::ParameterSet const& config)
76+
: eventLabels_(config.getUntrackedParameter<std::vector<std::string>>("eventProducts")) {
77+
// sort the labels, so they can be used with binary_search
78+
std::sort(eventLabels_.begin(), eventLabels_.end());
79+
80+
// make sure that MPI is initialised
81+
MPIService::required();
82+
83+
// make sure the EDM MPI types are available
84+
EDM_MPI_build_types();
85+
86+
// look up the "server" port
87+
char port[MPI_MAX_PORT_NAME];
88+
MPI_Lookup_name("server", MPI_INFO_NULL, port);
89+
edm::LogAbsolute("MPI") << "Trying to connect to the MPI server on port " << port;
90+
91+
// connect to the server
92+
int size;
93+
MPI_Comm_connect(port, MPI_INFO_NULL, 0, MPI_COMM_SELF, &comm_);
94+
MPI_Comm_remote_size(comm_, &size);
95+
edm::LogAbsolute("MPI") << "Client connected to " << size << (size == 1 ? " server" : " servers");
96+
if (size > 1) {
97+
throw cms::Exception("UnsupportedFeature")
98+
<< "MPIDriver supports only a single follower, but it was connected to " << size << " followers";
99+
}
100+
sender_ = MPISender(comm_, 0);
101+
102+
// register a dependency on all the event products described by the "eventProducts"
103+
callWhenNewProductsRegistered([this](edm::BranchDescription const& branch) {
104+
static const std::string kWildcard("*");
105+
static const std::string kPathStatus("edm::PathStatus");
106+
static const std::string kEndPathStatus("edm::EndPathStatus");
107+
108+
switch (branch.branchType()) {
109+
case edm::InEvent:
110+
if (std::binary_search(eventLabels_.begin(), eventLabels_.end(), branch.moduleLabel()) or
111+
(std::binary_search(eventLabels_.begin(), eventLabels_.end(), kWildcard) and
112+
branch.className() != kPathStatus and branch.className() != kEndPathStatus)) {
113+
tokens_.push_back(
114+
this->consumes(edm::TypeToGet{branch.unwrappedTypeID(), edm::PRODUCT_TYPE},
115+
edm::InputTag{branch.moduleLabel(), branch.productInstanceName(), branch.processName()}));
116+
branches_.push_back(branch);
117+
}
118+
break;
119+
120+
default:
121+
// ignore the other product types
122+
;
123+
}
124+
});
125+
}
126+
127+
MPIDriver::~MPIDriver() {
128+
// close the intercommunicator
129+
MPI_Comm_disconnect(&comm_);
130+
}
131+
132+
void MPIDriver::beginStream(edm::StreamID sid) {
133+
// store this stream's id
134+
sid_ = sid;
135+
136+
// signal the connection
137+
sender_.sendConnect(sid_);
138+
139+
/* is there a way to access all known process histories ?
140+
edm::ProcessHistoryRegistry const& registry = * edm::ProcessHistoryRegistry::instance();
141+
edm::LogAbsolute("MPI") << "ProcessHistoryRegistry:";
142+
for (auto const& keyval: registry) {
143+
edm::LogAbsolute("MPI") << keyval.first << ": " << keyval.second;
144+
}
145+
*/
146+
147+
// send the branch descriptions for all event products
148+
for (auto& branch : branches_) {
149+
sender_.sendSerializedProduct(sid_, branch);
150+
}
151+
// indicate that all branches have been sent
152+
sender_.sendComplete(sid_);
153+
// signal the begin stream
154+
sender_.sendBeginStream(sid_);
155+
}
156+
157+
void MPIDriver::endStream() {
158+
// signal the end stream
159+
sender_.sendEndStream(sid_);
160+
// signal the disconnection
161+
sender_.sendDisconnect(sid_);
162+
}
163+
164+
void MPIDriver::beginRun(edm::Run const& run, edm::EventSetup const& setup) {
165+
// signal a new run, and transmit the RunAuxiliary
166+
/* FIXME
167+
* Ideally the ProcessHistoryID stored in the run.runAuxiliary() should be the correct one, and
168+
* we could simply do
169+
170+
sender_.sendBeginRun(sid_, run.runAuxiliary());
171+
172+
* Instead, it looks like the ProcessHistoryID stored in the run.runAuxiliary() is that of the
173+
* _parent_ process.
174+
* So, we make a copy of the RunAuxiliary, set the ProcessHistoryID to the correct value, and
175+
* transmit the modified RunAuxiliary.
176+
*/
177+
auto aux = run.runAuxiliary();
178+
aux.setProcessHistoryID(run.processHistory().id());
179+
sender_.sendBeginRun(sid_, aux);
180+
// transmit the ProcessHistory
181+
sender_.sendSerializedProduct(sid_, run.processHistory());
182+
}
183+
184+
void MPIDriver::endRun(edm::Run const& run, edm::EventSetup const& setup) {
185+
// signal the end of run
186+
/* FIXME
187+
* Ideally the ProcessHistoryID stored in the run.runAuxiliary() should be the correct one, and
188+
* we could simply do
189+
190+
sender_.sendEndRun(sid_, run.runAuxiliary());
191+
192+
* Instead, it looks like the ProcessHistoryID stored in the run.runAuxiliary() is that of the
193+
* _parent_ process.
194+
* So, we make a copy of the RunAuxiliary, set the ProcessHistoryID to the correct value, and
195+
* transmit the modified RunAuxiliary.
196+
*/
197+
auto aux = run.runAuxiliary();
198+
aux.setProcessHistoryID(run.processHistory().id());
199+
sender_.sendEndRun(sid_, aux);
200+
}
201+
202+
void MPIDriver::beginLuminosityBlock(edm::LuminosityBlock const& lumi, edm::EventSetup const& setup) {
203+
// signal a new luminosity block, and transmit the LuminosityBlockAuxiliary
204+
/* FIXME
205+
* Ideally the ProcessHistoryID stored in the lumi.luminosityBlockAuxiliary() should be the
206+
* correct one, and we could simply do
207+
208+
sender_.sendBeginLuminosityBlock(sid_, lumi.luminosityBlockAuxiliary());
209+
210+
* Instead, it looks like the ProcessHistoryID stored in the lumi.luminosityBlockAuxiliary() is
211+
* that of the _parent_ process.
212+
* So, we make a copy of the LuminosityBlockAuxiliary, set the ProcessHistoryID to the correct
213+
* value, and transmit the modified LuminosityBlockAuxiliary.
214+
*/
215+
auto aux = lumi.luminosityBlockAuxiliary();
216+
aux.setProcessHistoryID(lumi.processHistory().id());
217+
sender_.sendBeginLuminosityBlock(sid_, aux);
218+
}
219+
220+
void MPIDriver::endLuminosityBlock(edm::LuminosityBlock const& lumi, edm::EventSetup const& setup) {
221+
// signal the end of luminosity block
222+
/* FIXME
223+
* Ideally the ProcessHistoryID stored in the lumi.luminosityBlockAuxiliary() should be the
224+
* correct one, and we could simply do
225+
226+
sender_.sendEndLuminosityBlock(sid_, lumi.luminosityBlockAuxiliary());
227+
228+
* Instead, it looks like the ProcessHistoryID stored in the lumi.luminosityBlockAuxiliary() is
229+
* that of the _parent_ process.
230+
* So, we make a copy of the LuminosityBlockAuxiliary, set the ProcessHistoryID to the correct
231+
* value, and transmit the modified LuminosityBlockAuxiliary.
232+
*/
233+
auto aux = lumi.luminosityBlockAuxiliary();
234+
aux.setProcessHistoryID(lumi.processHistory().id());
235+
sender_.sendEndLuminosityBlock(sid_, aux);
236+
}
237+
238+
void MPIDriver::analyze(edm::Event const& event, edm::EventSetup const& setup) {
239+
/*
240+
{
241+
edm::LogAbsolute log("MPI");
242+
log << "stream " << sid_ << ": processing run " << event.run() << ", lumi " << event.luminosityBlock() << ", event "
243+
<< event.id().event();
244+
log << "\nprocess history: " << event.processHistory();
245+
log << "\nprocess history id: " << event.processHistory().id();
246+
log << "\nprocess history id: " << event.eventAuxiliary().processHistoryID() << " (from eventAuxiliary)";
247+
log << "\nisRealData " << event.eventAuxiliary().isRealData();
248+
log << "\nexperimentType " << event.eventAuxiliary().experimentType();
249+
log << "\nbunchCrossing " << event.eventAuxiliary().bunchCrossing();
250+
log << "\norbitNumber " << event.eventAuxiliary().orbitNumber();
251+
log << "\nstoreNumber " << event.eventAuxiliary().storeNumber();
252+
log << "\nprocessHistoryID " << event.eventAuxiliary().processHistoryID();
253+
log << "\nprocessGUID " << edm::Guid(event.eventAuxiliary().processGUID(), true).toString();
254+
}
255+
*/
256+
257+
// signal a new event, and transmit the EventAuxiliary
258+
sender_.sendEvent(sid_, event.eventAuxiliary());
259+
260+
// transmit the event data products
261+
unsigned int size = tokens_.size();
262+
assert(branches_.size() == size);
263+
for (unsigned int i = 0; i < size; ++i) {
264+
auto const& token = tokens_[i];
265+
auto const& branch = branches_[i];
266+
auto const& type = branch.unwrappedType();
267+
edm::GenericHandle handle(type);
268+
event.getByToken(token, handle);
269+
if (handle.isValid()) {
270+
// transmit the BranchKey in order to reconstruct the BranchDescription on the receiving side
271+
sender_.sendSerializedProduct(sid_, edm::BranchKey(branch));
272+
// transmit the ProductProvenance
273+
sender_.sendSerializedProduct(sid_, *handle.provenance()->productProvenance());
274+
// transmit the ProductID
275+
sender_.sendSerializedProduct(sid_, handle.id());
276+
// transmit the wrapped product
277+
sender_.sendSerializedProduct(sid_, *handle.product());
278+
}
279+
}
280+
281+
// indicate that all products have been sent
282+
sender_.sendComplete(sid_);
283+
}
284+
285+
void MPIDriver::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
286+
descriptions.setComment(
287+
"This module connects to an \"MPISource\" in a separate CMSSW job, and transmits all "
288+
"Runs, LuminosityBlocks and Events from the current process to the remote one."
289+
"Optionally, it can transfer any non-transient event data product to the remote process.");
290+
291+
edm::ParameterSetDescription desc;
292+
desc.addUntracked<std::vector<std::string>>("eventProducts", {})
293+
->setComment(
294+
"List of modules whose event products will be sent to the remote process. "
295+
"Use \"*\" to send all event products.");
296+
descriptions.addWithDefaultLabel(desc);
297+
}
298+
299+
DEFINE_FWK_MODULE(MPIDriver);

0 commit comments

Comments
 (0)