A Discrete-Event Network Simulator
API
null-message-mpi-interface.cc
Go to the documentation of this file.
1 /*
2  * Copyright 2013. Lawrence Livermore National Security, LLC.
3  *
4  * This program is free software; you can redistribute it and/or modify
5  * it under the terms of the GNU General Public License version 2 as
6  * published by the Free Software Foundation;
7  *
8  * This program is distributed in the hope that it will be useful,
9  * but WITHOUT ANY WARRANTY; without even the implied warranty of
10  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11  * GNU General Public License for more details.
12  *
13  * You should have received a copy of the GNU General Public License
14  * along with this program; if not, write to the Free Software
15  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
16  *
17  * Author: Steven Smith <smith84@llnl.gov>
18  *
19  */
20 
28 
31 #include "remote-channel-bundle.h"
32 
33 #include "ns3/log.h"
34 #include "ns3/mpi-receiver.h"
35 #include "ns3/net-device.h"
36 #include "ns3/node-list.h"
37 #include "ns3/node.h"
38 #include "ns3/nstime.h"
39 #include "ns3/simulator.h"
40 
41 #include <iomanip>
42 #include <iostream>
43 #include <list>
44 #include <mpi.h>
45 
46 namespace ns3
47 {
48 
49 NS_LOG_COMPONENT_DEFINE("NullMessageMpiInterface");
50 
51 NS_OBJECT_ENSURE_REGISTERED(NullMessageMpiInterface);
52 
61 {
62  public:
65 
69  uint8_t* GetBuffer();
73  void SetBuffer(uint8_t* buffer);
77  MPI_Request* GetRequest();
78 
79  private:
83  uint8_t* m_buffer;
84 
88  MPI_Request m_request;
89 };
90 
95 const uint32_t NULL_MESSAGE_MAX_MPI_MSG_SIZE = 2000;
96 
98 {
99  m_buffer = nullptr;
100  m_request = nullptr;
101 }
102 
104 {
105  delete[] m_buffer;
106 }
107 
108 uint8_t*
110 {
111  return m_buffer;
112 }
113 
114 void
116 {
117  m_buffer = buffer;
118 }
119 
120 MPI_Request*
122 {
123  return &m_request;
124 }
125 
126 uint32_t NullMessageMpiInterface::g_sid = 0;
131 
132 std::list<NullMessageSentBuffer> NullMessageMpiInterface::g_pendingTx;
133 
134 MPI_Comm NullMessageMpiInterface::g_communicator = MPI_COMM_WORLD;
138 
139 TypeId
141 {
142  static TypeId tid =
143  TypeId("ns3::NullMessageMpiInterface").SetParent<Object>().SetGroupName("Mpi");
144  return tid;
145 }
146 
148 {
149  NS_LOG_FUNCTION(this);
150 }
151 
153 {
154  NS_LOG_FUNCTION(this);
155 }
156 
157 void
159 {
160  NS_LOG_FUNCTION(this);
161 }
162 
163 uint32_t
165 {
167  return g_sid;
168 }
169 
170 uint32_t
172 {
174  return g_size;
175 }
176 
177 MPI_Comm
179 {
181  return g_communicator;
182 }
183 
184 bool
186 {
187  return g_enabled;
188 }
189 
190 void
191 NullMessageMpiInterface::Enable(int* pargc, char*** pargv)
192 {
193  NS_LOG_FUNCTION(this << *pargc);
194 
195  NS_ASSERT(g_enabled == false);
196 
197  // Initialize the MPI interface
198  MPI_Init(pargc, pargv);
199  Enable(MPI_COMM_WORLD);
200  g_mpiInitCalled = true;
201 }
202 
203 void
204 NullMessageMpiInterface::Enable(MPI_Comm communicator)
205 {
206  NS_LOG_FUNCTION(this);
207 
208  NS_ASSERT(g_enabled == false);
209 
210  // Standard MPI practice is to duplicate the communicator for
211  // library to use. Library communicates in isolated communication
212  // context.
213  MPI_Comm_dup(communicator, &g_communicator);
214  g_freeCommunicator = true;
215 
216  // SystemId and Size are unit32_t in interface but MPI uses int so convert.
217  int mpiSystemId;
218  int mpiSize;
219  MPI_Comm_rank(g_communicator, &mpiSystemId);
220  MPI_Comm_size(g_communicator, &mpiSize);
221 
222  g_sid = mpiSystemId;
223  g_size = mpiSize;
224 
225  g_enabled = true;
226 
227  MPI_Barrier(g_communicator);
228 }
229 
230 void
232 {
235 
237 
238  // Post a non-blocking receive for all peers
239  g_requests = new MPI_Request[g_numNeighbors];
240  g_pRxBuffers = new char*[g_numNeighbors];
241  int index = 0;
242  for (uint32_t rank = 0; rank < g_size; ++rank)
243  {
245  if (bundle)
246  {
247  g_pRxBuffers[index] = new char[NULL_MESSAGE_MAX_MPI_MSG_SIZE];
248  MPI_Irecv(g_pRxBuffers[index],
250  MPI_CHAR,
251  rank,
252  0,
254  &g_requests[index]);
255  ++index;
256  }
257  }
258 }
259 
260 void
261 NullMessageMpiInterface::SendPacket(Ptr<Packet> p, const Time& rxTime, uint32_t node, uint32_t dev)
262 {
263  NS_LOG_FUNCTION(this << p << rxTime.GetTimeStep() << node << dev);
264 
266 
267  // Find the system id for the destination node
268  Ptr<Node> destNode = NodeList::GetNode(node);
269  uint32_t nodeSysId = destNode->GetSystemId();
270 
271  NullMessageSentBuffer sendBuf;
272  g_pendingTx.push_back(sendBuf);
273  std::list<NullMessageSentBuffer>::reverse_iterator iter =
274  g_pendingTx.rbegin(); // Points to the last element
275 
276  uint32_t serializedSize = p->GetSerializedSize();
277  uint32_t bufferSize = serializedSize + (2 * sizeof(uint64_t)) + (2 * sizeof(uint32_t));
278  uint8_t* buffer = new uint8_t[bufferSize];
279  iter->SetBuffer(buffer);
280  // Add the time, dest node and dest device
281  uint64_t t = rxTime.GetInteger();
282  uint64_t* pTime = reinterpret_cast<uint64_t*>(buffer);
283  *pTime++ = t;
284 
285  Time guarantee_update =
287  *pTime++ = guarantee_update.GetTimeStep();
288 
289  uint32_t* pData = reinterpret_cast<uint32_t*>(pTime);
290  *pData++ = node;
291  *pData++ = dev;
292  // Serialize the packet
293  p->Serialize(reinterpret_cast<uint8_t*>(pData), serializedSize);
294 
295  MPI_Isend(reinterpret_cast<void*>(iter->GetBuffer()),
296  bufferSize,
297  MPI_CHAR,
298  nodeSysId,
299  0,
301  (iter->GetRequest()));
302 
304 }
305 
306 void
309 {
310  NS_LOG_FUNCTION(guarantee_update.GetTimeStep() << bundle);
311 
313 
314  NullMessageSentBuffer sendBuf;
315  g_pendingTx.push_back(sendBuf);
316  std::list<NullMessageSentBuffer>::reverse_iterator iter =
317  g_pendingTx.rbegin(); // Points to the last element
318 
319  uint32_t bufferSize = 2 * sizeof(uint64_t) + 2 * sizeof(uint32_t);
320  uint8_t* buffer = new uint8_t[bufferSize];
321  iter->SetBuffer(buffer);
322  // Add the time, dest node and dest device
323  uint64_t* pTime = reinterpret_cast<uint64_t*>(buffer);
324  *pTime++ = 0;
325  *pTime++ = guarantee_update.GetInteger();
326  uint32_t* pData = reinterpret_cast<uint32_t*>(pTime);
327  *pData++ = 0;
328  *pData++ = 0;
329 
330  // Find the system id for the destination MPI rank
331  uint32_t nodeSysId = bundle->GetSystemId();
332 
333  MPI_Isend(reinterpret_cast<void*>(iter->GetBuffer()),
334  bufferSize,
335  MPI_CHAR,
336  nodeSysId,
337  0,
339  (iter->GetRequest()));
340 }
341 
342 void
344 {
346 
347  ReceiveMessages(true);
348 }
349 
350 void
352 {
354 
355  ReceiveMessages(false);
356 }
357 
358 void
360 {
361  NS_LOG_FUNCTION(blocking);
362 
364 
365  // stop flag set to true when no more messages are found to
366  // process.
367  bool stop = false;
368 
369  if (!g_numNeighbors)
370  {
371  // Not communicating with anyone.
372  return;
373  }
374 
375  do
376  {
377  int messageReceived = 0;
378  int index = 0;
379  MPI_Status status;
380 
381  if (blocking)
382  {
383  MPI_Waitany(g_numNeighbors, g_requests, &index, &status);
384  messageReceived = 1; /* Wait always implies message was received */
385  stop = true;
386  }
387  else
388  {
389  MPI_Testany(g_numNeighbors, g_requests, &index, &messageReceived, &status);
390  }
391 
392  if (messageReceived)
393  {
394  int count;
395  MPI_Get_count(&status, MPI_CHAR, &count);
396 
397  // Get the meta data first
398  uint64_t* pTime = reinterpret_cast<uint64_t*>(g_pRxBuffers[index]);
399  uint64_t time = *pTime++;
400  uint64_t guaranteeUpdate = *pTime++;
401 
402  uint32_t* pData = reinterpret_cast<uint32_t*>(pTime);
403  uint32_t node = *pData++;
404  uint32_t dev = *pData++;
405 
406  Time rxTime(time);
407 
408  // rxtime == 0 means this is a Null Message
409  if (rxTime > Time(0))
410  {
411  count -= sizeof(time) + sizeof(guaranteeUpdate) + sizeof(node) + sizeof(dev);
412 
413  Ptr<Packet> p = Create<Packet>(reinterpret_cast<uint8_t*>(pData), count, true);
414 
415  // Find the correct node/device to schedule receive event
416  Ptr<Node> pNode = NodeList::GetNode(node);
417  Ptr<MpiReceiver> pMpiRec = nullptr;
418  uint32_t nDevices = pNode->GetNDevices();
419  for (uint32_t i = 0; i < nDevices; ++i)
420  {
421  Ptr<NetDevice> pThisDev = pNode->GetDevice(i);
422  if (pThisDev->GetIfIndex() == dev)
423  {
424  pMpiRec = pThisDev->GetObject<MpiReceiver>();
425  break;
426  }
427  }
428  NS_ASSERT(pNode && pMpiRec);
429 
430  // Schedule the rx event
432  rxTime - Simulator::Now(),
434  pMpiRec,
435  p);
436  }
437 
438  // Update guarantee time for both packet receives and Null Messages.
440  NS_ASSERT(bundle);
441 
442  bundle->SetGuaranteeTime(Time(guaranteeUpdate));
443 
444  // Re-queue the next read
445  MPI_Irecv(g_pRxBuffers[index],
447  MPI_CHAR,
448  status.MPI_SOURCE,
449  0,
451  &g_requests[index]);
452  }
453  else
454  {
455  // if non-blocking and no message received in testany then stop message loop
456  stop = true;
457  }
458  } while (!stop);
459 }
460 
461 void
463 {
465 
467 
468  std::list<NullMessageSentBuffer>::iterator iter = g_pendingTx.begin();
469  while (iter != g_pendingTx.end())
470  {
471  MPI_Status status;
472  int flag = 0;
473  MPI_Test(iter->GetRequest(), &flag, &status);
474  std::list<NullMessageSentBuffer>::iterator current = iter; // Save current for erasing
475  ++iter; // Advance to next
476  if (flag)
477  { // This message is complete
478  g_pendingTx.erase(current);
479  }
480  }
481 }
482 
483 void
485 {
486  NS_LOG_FUNCTION(this);
487 
488  if (g_enabled)
489  {
490  for (std::list<NullMessageSentBuffer>::iterator iter = g_pendingTx.begin();
491  iter != g_pendingTx.end();
492  ++iter)
493  {
494  MPI_Cancel(iter->GetRequest());
495  MPI_Request_free(iter->GetRequest());
496  }
497 
498  for (uint32_t i = 0; i < g_numNeighbors; ++i)
499  {
500  MPI_Cancel(&g_requests[i]);
501  MPI_Request_free(&g_requests[i]);
502  }
503 
504  for (uint32_t i = 0; i < g_numNeighbors; ++i)
505  {
506  delete[] g_pRxBuffers[i];
507  }
508  delete[] g_pRxBuffers;
509  delete[] g_requests;
510 
511  g_pendingTx.clear();
512 
513  if (g_freeCommunicator)
514  {
515  MPI_Comm_free(&g_communicator);
516  g_freeCommunicator = false;
517  }
518 
519  if (g_mpiInitCalled)
520  {
521  int flag = 0;
522  MPI_Initialized(&flag);
523  if (flag)
524  {
525  MPI_Finalize();
526  }
527  else
528  {
529  NS_FATAL_ERROR("Cannot disable MPI environment without Initializing it first");
530  }
531  }
532 
533  g_enabled = false;
534  g_mpiInitCalled = false;
535  }
536  else
537  {
538  NS_FATAL_ERROR("Cannot disable MPI environment without Initializing it first");
539  }
540 }
541 
542 } // namespace ns3
Class to aggregate to a NetDevice if it supports MPI capability.
Definition: mpi-receiver.h:48
void Receive(Ptr< Packet > p)
Direct an incoming packet to the device Receive() method.
Definition: mpi-receiver.cc:51
uint32_t GetSystemId() const
Definition: node.cc:131
uint32_t GetNDevices() const
Definition: node.cc:162
uint32_t GetId() const
Definition: node.cc:117
Ptr< NetDevice > GetDevice(uint32_t index) const
Retrieve the index-th NetDevice associated to this node.
Definition: node.cc:152
static Ptr< Node > GetNode(uint32_t n)
Definition: node-list.cc:251
static bool g_mpiInitCalled
Has MPI Init been called by this interface.
void Destroy() override
Deletes storage used by the parallel environment.
static void ReceiveMessagesBlocking()
Blocking message receive.
void SendPacket(Ptr< Packet > p, const Time &rxTime, uint32_t node, uint32_t dev) override
Send a packet to a remote node.
bool IsEnabled() override
Returns enabled state of parallel environment.
uint32_t GetSize() override
Get the number of ranks used by ns-3.
static MPI_Comm g_communicator
MPI communicator being used for ns-3 tasks.
static TypeId GetTypeId()
Register this type.
static void ReceiveMessagesNonBlocking()
Non-blocking check for received messages complete.
MPI_Comm GetCommunicator() override
Return the communicator used to run ns-3.
static MPI_Request * g_requests
Pending non-blocking receives.
static void SendNullMessage(const Time &guaranteeUpdate, Ptr< RemoteChannelBundle > bundle)
Send a Null Message across the specified bundle.
static void TestSendComplete()
Check for completed sends.
static void ReceiveMessages(bool blocking=false)
Check for received messages complete.
void Enable(int *pargc, char ***pargv) override
Setup the parallel communication interface.
static bool g_enabled
Has this interface been enabled.
static char ** g_pRxBuffers
Data buffers for non-blocking receives.
static void InitializeSendReceiveBuffers()
Initialize send and receive buffers.
static uint32_t g_sid
System ID (rank) for this task.
static uint32_t g_size
Number of ranks in the MPI communicator used by ns-3 (MPI_COMM_WORLD by default).
void Disable() override
Clean up the ns-3 parallel communications interface.
static std::list< NullMessageSentBuffer > g_pendingTx
List of pending non-blocking sends.
static bool g_freeCommunicator
Did we create the communicator? Have to free it.
uint32_t GetSystemId() override
Get the id number of this rank.
static uint32_t g_numNeighbors
Number of neighbor tasks, tasks that this task shares a link with.
Non-blocking send buffers for Null Message implementation.
MPI_Request m_request
MPI request posted for the send.
uint8_t * m_buffer
Buffer for send.
static NullMessageSimulatorImpl * GetInstance()
Time CalculateGuaranteeTime(uint32_t systemId)
void RescheduleNullMessageEvent(Ptr< RemoteChannelBundle > bundle)
A base class which provides memory management and object aggregation.
Definition: object.h:89
uint32_t GetSerializedSize() const
Returns number of bytes required for packet serialization.
Definition: packet.cc:610
uint32_t Serialize(uint8_t *buffer, uint32_t maxSize) const
Serialize a packet, tags, and metadata into a byte buffer.
Definition: packet.cc:663
Smart pointer class similar to boost::intrusive_ptr.
Definition: ptr.h:78
static Ptr< RemoteChannelBundle > Find(uint32_t systemId)
Get the bundle corresponding to a remote rank.
static std::size_t Size()
Get the number of ns-3 channels in this bundle.
static void ScheduleWithContext(uint32_t context, const Time &delay, FUNC f, Ts &&... args)
Schedule an event with the given context.
Definition: simulator.h:587
static Time Now()
Return the current simulation virtual time.
Definition: simulator.cc:199
Simulation virtual time values and global simulation resolution.
Definition: nstime.h:105
int64_t GetInteger() const
Get the raw time value, in the current resolution unit.
Definition: nstime.h:454
int64_t GetTimeStep() const
Get the raw time value, in the current resolution unit.
Definition: nstime.h:444
a unique identifier for an interface.
Definition: type-id.h:60
TypeId SetParent(TypeId tid)
Set the parent TypeId.
Definition: type-id.cc:935
#define NS_ASSERT(condition)
At runtime, in debugging builds, if this condition is not true, the program prints the source file,...
Definition: assert.h:66
#define NS_FATAL_ERROR(msg)
Report a fatal error with a message and terminate.
Definition: fatal-error.h:179
#define NS_LOG_COMPONENT_DEFINE(name)
Define a Log component with a specific name.
Definition: log.h:202
#define NS_LOG_FUNCTION_NOARGS()
Output the name of the function.
#define NS_LOG_FUNCTION(parameters)
If log level LOG_FUNCTION is enabled, this macro will output all input parameters separated by ",...
#define NS_OBJECT_ENSURE_REGISTERED(type)
Register an Object subclass with the TypeId system.
Definition: object-base.h:46
void(* Time)(Time oldValue, Time newValue)
TracedValue callback signature for Time.
Definition: nstime.h:848
Every class exported by the ns3 library is enclosed in the ns3 namespace.
const uint32_t NULL_MESSAGE_MAX_MPI_MSG_SIZE
maximum MPI message size for easy buffer creation
Declaration of classes ns3::NullMessageSentBuffer and ns3::NullMessageMpiInterface.
Declaration of class ns3::NullMessageSimulatorImpl.
Declaration of class ns3::RemoteChannelBundleManager.
Declaration of class ns3::RemoteChannelBundle.