29
29
#include " FWCore/MessageLogger/interface/ErrorObj.h"
30
30
#include " FWCore/MessageLogger/interface/MessageLogger.h"
31
31
#include " FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
32
+ #include " FWCore/ParameterSet/interface/EmptyGroupDescription.h"
33
+ #include " FWCore/ParameterSet/interface/ParameterSet.h"
32
34
#include " FWCore/ParameterSet/interface/ParameterSetDescription.h"
33
35
#include " FWCore/ParameterSet/interface/ParameterSetDescriptionFiller.h"
34
- #include " FWCore/ParameterSet/interface/ParameterSet.h"
35
36
#include " FWCore/Sources/interface/ProducerSourceBase.h"
37
+ #include " FWCore/Utilities/interface/EDMException.h"
36
38
#include " HeterogeneousCore/MPICore/interface/MPIToken.h"
37
39
#include " HeterogeneousCore/MPIServices/interface/MPIService.h"
38
40
@@ -54,10 +56,22 @@ class MPISource : public edm::ProducerSourceBase {
54
56
bool setRunAndEventInfo (edm::EventID& id, edm::TimeValue_t& time, edm::EventAuxiliary::ExperimentType&) override ;
55
57
void produce (edm::Event&) override ;
56
58
59
+ enum Mode { kInvalid = 0 , kCommWorld , kIntercommunicator };
60
+ static constexpr const char * ModeDescription[] = {" Invalid" , " CommWorld" , " Intercommunicator" };
61
+ Mode parseMode (std::string const & label) {
62
+ if (label == ModeDescription[kCommWorld ])
63
+ return kCommWorld ;
64
+ else if (label == ModeDescription[kIntercommunicator ])
65
+ return kIntercommunicator ;
66
+ else
67
+ return kInvalid ;
68
+ }
69
+
57
70
char port_[MPI_MAX_PORT_NAME];
58
71
MPI_Comm comm_ = MPI_COMM_NULL;
59
72
MPIChannel channel_;
60
73
edm::EDPutTokenT<MPIToken> token_;
74
+ Mode mode_;
61
75
62
76
edm::ProcessHistory history_;
63
77
};
@@ -67,48 +81,91 @@ MPISource::MPISource(edm::ParameterSet const& config, edm::InputSourceDescriptio
67
81
// effectively be ignored, because this ConfigurableSource will explicitly set the run, lumi, and event
68
82
// numbers, the timestamp, and the event type
69
83
edm::ProducerSourceBase(config, desc, false ),
70
- token_(produces<MPIToken>()) //
84
+ token_(produces<MPIToken>()),
85
+ mode_(parseMode(config.getUntrackedParameter<std::string>(" mode" ))) //
71
86
{
72
87
// make sure that MPI is initialised
73
88
MPIService::required ();
74
89
75
- // FIXME move into the MPIService ?
76
- // make sure the EDM MPI types are available
90
+ // Make sure the EDM MPI types are available.
77
91
EDM_MPI_build_types ();
78
92
79
- // open a server-side port
80
- MPI_Open_port (MPI_INFO_NULL, port_);
93
+ if (mode_ == kCommWorld ) {
94
+ // All processes are in MPI_COMM_WORLD.
95
+ // The current implementation supports only two processes: one controller and one source.
96
+ edm::LogAbsolute (" MPI" ) << " MPISource in " << ModeDescription[mode_] << " mode." ;
97
+
98
+ // Check how many processes are there in MPI_COMM_WORLD
99
+ int size;
100
+ MPI_Comm_size (MPI_COMM_WORLD, &size);
101
+ if (size != 2 ) {
102
+ throw edm::Exception (edm::errors::Configuration)
103
+ << " The current implementation supports only two processes: one controller and one source." ;
104
+ }
81
105
82
- // publish the port under the name "server"
83
- MPI_Info port_info;
84
- MPI_Info_create (&port_info);
85
- MPI_Info_set (port_info, " ompi_global_scope" , " true" );
86
- MPI_Info_set (port_info, " ompi_unique" , " true" );
87
- MPI_Publish_name (" server" , port_info, port_);
106
+ // Check the rank of this process, and determine the rank of the other process.
107
+ int rank;
108
+ MPI_Comm_rank (MPI_COMM_WORLD, &rank);
109
+ edm::LogAbsolute (" MPI" ) << " MPISource has rank " << rank << " in MPI_COMM_WORLD." ;
110
+ int other_rank = 1 - rank;
111
+ comm_ = MPI_COMM_WORLD;
112
+ channel_ = MPIChannel (comm_, other_rank);
113
+ } else if (mode_ == kIntercommunicator ) {
114
+ // Use an intercommunicator to let two groups of processes communicate with each other.
115
+ // The current implementation supports only two processes: one controller and one source.
116
+ edm::LogAbsolute (" MPI" ) << " MPISource in " << ModeDescription[mode_] << " mode." ;
117
+
118
+ // Check how many processes are there in MPI_COMM_WORLD
119
+ int size;
120
+ MPI_Comm_size (MPI_COMM_WORLD, &size);
121
+ if (size != 1 ) {
122
+ throw edm::Exception (edm::errors::Configuration)
123
+ << " The current implementation supports only two processes: one controller and one source." ;
124
+ }
88
125
89
- // create an intercommunicator and accept a client connection
90
- edm::LogAbsolute (" MPI" ) << " waiting for a connection to the MPI server at port " << port_;
91
- MPI_Comm_accept (port_, MPI_INFO_NULL, 0 , MPI_COMM_WORLD, &comm_);
92
- channel_ = MPIChannel (comm_, 0 );
126
+ // Open a server-side port.
127
+ MPI_Open_port (MPI_INFO_NULL, port_);
128
+
129
+ // Publish the port under the name indicated by the parameter "server".
130
+ std::string name = config.getUntrackedParameter <std::string>(" name" , " server" );
131
+ MPI_Info port_info;
132
+ MPI_Info_create (&port_info);
133
+ MPI_Info_set (port_info, " ompi_global_scope" , " true" );
134
+ MPI_Info_set (port_info, " ompi_unique" , " true" );
135
+ MPI_Publish_name (name.c_str (), port_info, port_);
136
+
137
+ // Create an intercommunicator and accept a client connection.
138
+ edm::LogAbsolute (" MPI" ) << " Waiting for a connection to the MPI server at port " << port_;
139
+
140
+ MPI_Comm_accept (port_, MPI_INFO_NULL, 0 , MPI_COMM_SELF, &comm_);
141
+ edm::LogAbsolute (" MPI" ) << " Connection accepted." ;
142
+ channel_ = MPIChannel (comm_, 0 );
143
+ } else {
144
+ // Invalid mode.
145
+ throw edm::Exception (edm::errors::Configuration)
146
+ << " Invalid mode \" " << config.getUntrackedParameter <std::string>(" mode" ) << " \" " ;
147
+ }
93
148
94
- // wait for a client to connect
149
+ // Wait for a client to connect.
95
150
MPI_Status status;
96
151
EDM_MPI_Empty_t buffer;
97
152
MPI_Recv (&buffer, 1 , EDM_MPI_Empty, MPI_ANY_SOURCE, EDM_MPI_Connect, comm_, &status);
98
153
edm::LogAbsolute (" MPI" ) << " connected from " << status.MPI_SOURCE ;
99
154
}
100
155
101
156
MPISource::~MPISource () {
102
- // close the intercommunicator
103
- MPI_Comm_disconnect (&comm_);
104
-
105
- // unpublish and close the port
106
- MPI_Info port_info;
107
- MPI_Info_create (&port_info);
108
- MPI_Info_set (port_info, " ompi_global_scope" , " true" );
109
- MPI_Info_set (port_info, " ompi_unique" , " true" );
110
- MPI_Unpublish_name (" server" , port_info, port_);
111
- MPI_Close_port (port_);
157
+ if (mode_ == kIntercommunicator ) {
158
+ // Close the intercommunicator.
159
+ MPI_Comm_disconnect (&comm_);
160
+
161
+ // Unpublish and close the port.
162
+ MPI_Info port_info;
163
+ MPI_Info_create (&port_info);
164
+ MPI_Info_set (port_info, " ompi_global_scope" , " true" );
165
+ MPI_Info_set (port_info, " ompi_unique" , " true" );
166
+ MPI_Unpublish_name (" server" , port_info, port_);
167
+ MPI_Close_port (port_);
168
+ }
112
169
}
113
170
114
171
// MPISource::ItemTypeInfo MPISource::getNextItemType() {
@@ -255,9 +312,19 @@ void MPISource::produce(edm::Event& event) {
255
312
}
256
313
257
314
void MPISource::fillDescriptions (edm::ConfigurationDescriptions& descriptions) {
315
+ descriptions.setComment (
316
+ " This module connects to an \" MPIController\" in a separate CMSSW job, receives all Runs, LuminosityBlocks and "
317
+ " Events from the remote process and reproduces them in the local one." );
318
+
258
319
edm::ParameterSetDescription desc;
259
- desc.setComment (" Comunicate with another cmsRun process over MPI." );
260
320
edm::ProducerSourceBase::fillDescription (desc);
321
+ desc.ifValue (
322
+ edm::ParameterDescription<std::string>(" mode" , " CommWorld" , false ),
323
+ ModeDescription[kCommWorld ] >> edm::EmptyGroupDescription () or
324
+ ModeDescription[kIntercommunicator ] >> edm::ParameterDescription<std::string>(" name" , " server" , false ))
325
+ ->setComment (
326
+ " Valid modes are CommWorld (use MPI_COMM_WORLD) and Intercommunicator (use an MPI name server to setup an "
327
+ " intercommunicator)." );
261
328
262
329
descriptions.add (" source" , desc);
263
330
}
0 commit comments