A Discrete-Event Network Simulator
API
granted-time-window-mpi-interface.cc
Go to the documentation of this file.
1 /*
2  * This program is free software; you can redistribute it and/or modify
3  * it under the terms of the GNU General Public License version 2 as
4  * published by the Free Software Foundation;
5  *
6  * This program is distributed in the hope that it will be useful,
7  * but WITHOUT ANY WARRANTY; without even the implied warranty of
8  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
9  * GNU General Public License for more details.
10  *
11  * You should have received a copy of the GNU General Public License
12  * along with this program; if not, write to the Free Software
13  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
14  *
15  * Author: George Riley <riley@ece.gatech.edu>
16  */
17 
24 // This object contains static methods that provide an easy interface
25 // to the necessary MPI information.
26 
28 
29 #include "mpi-interface.h"
30 #include "mpi-receiver.h"
31 
32 #include "ns3/log.h"
33 #include "ns3/net-device.h"
34 #include "ns3/node-list.h"
35 #include "ns3/node.h"
36 #include "ns3/nstime.h"
37 #include "ns3/simulator-impl.h"
38 #include "ns3/simulator.h"
39 
40 #include <iomanip>
41 #include <iostream>
42 #include <list>
43 #include <mpi.h>
44 
45 namespace ns3
46 {
47 
48 NS_LOG_COMPONENT_DEFINE("GrantedTimeWindowMpiInterface"); // Log component used by the NS_LOG_* calls in this file
49 
50 NS_OBJECT_ENSURE_REGISTERED(GrantedTimeWindowMpiInterface); // Register this Object subclass with the TypeId system
51 
53 {
54  m_buffer = nullptr;
55  m_request = nullptr;
56 }
57 
59 {
60  delete[] m_buffer;
61 }
62 
63 uint8_t*
65 {
66  return m_buffer;
67 }
68 
69 void
70 SentBuffer::SetBuffer(uint8_t* buffer) // Attach the serialized-packet buffer of a pending non-blocking send
71 {
72  m_buffer = buffer; // Stores the pointer only (no copy); m_buffer is later released with delete[]. NOTE: any previously stored buffer is not freed here
73 }
74 
75 MPI_Request*
77 {
78  return &m_request;
79 }
80 
87 std::list<SentBuffer> GrantedTimeWindowMpiInterface::g_pendingTx; // In-flight MPI_Isend buffers; an entry is erased once MPI_Test reports completion
88 
91 MPI_Comm GrantedTimeWindowMpiInterface::g_communicator = MPI_COMM_WORLD; // Replaced by an MPI_Comm_dup copy when enabled; freed in Disable() if g_freeCommunicator is set
93 ;
94 
95 TypeId
97 {
98  static TypeId tid =
99  TypeId("ns3::GrantedTimeWindowMpiInterface").SetParent<Object>().SetGroupName("Mpi");
100  return tid;
101 }
102 
103 void
105 {
106  NS_LOG_FUNCTION(this);
107 
108  for (uint32_t i = 0; i < GetSize(); ++i)
109  {
110  delete[] g_pRxBuffers[i];
111  }
112  delete[] g_pRxBuffers;
113  delete[] g_requests;
114 
115  g_pendingTx.clear();
116 }
117 
118 uint32_t
120 {
122  return g_rxCount;
123 }
124 
125 uint32_t
127 {
129  return g_txCount;
130 }
131 
132 uint32_t
134 {
136  return g_sid;
137 }
138 
139 uint32_t
141 {
143  return g_size;
144 }
145 
146 bool
148 {
149  return g_enabled;
150 }
151 
152 MPI_Comm
154 {
156  return g_communicator;
157 }
158 
159 void
160 GrantedTimeWindowMpiInterface::Enable(int* pargc, char*** pargv) // Initialize MPI, then enable ns-3 on a duplicate of MPI_COMM_WORLD
161 {
162  NS_LOG_FUNCTION(this << pargc << pargv);
163 
164  NS_ASSERT(g_enabled == false); // Enabling twice is a programming error
165 
166  // Initialize MPI here; because g_mpiInitCalled is set below, Disable() will also call MPI_Finalize
167  MPI_Init(pargc, pargv);
168  Enable(MPI_COMM_WORLD); // Communicator overload: duplicates the communicator, records rank/size, posts the receives
169  g_mpiInitCalled = true;
170  g_enabled = true; // NOTE(review): redundant, since the communicator overload already sets g_enabled
171 }
172 
173 void
175 {
176  NS_LOG_FUNCTION(this);
177 
178  NS_ASSERT(g_enabled == false);
179 
180  // Standard MPI practice is to duplicate the communicator for
181  // library to use. Library communicates in isolated communication
182  // context.
183  MPI_Comm_dup(communicator, &g_communicator);
184  g_freeCommunicator = true;
185 
186  MPI_Barrier(g_communicator);
187 
188  int mpiSystemId;
189  int mpiSize;
190  MPI_Comm_rank(g_communicator, &mpiSystemId);
191  MPI_Comm_size(g_communicator, &mpiSize);
192  g_sid = mpiSystemId;
193  g_size = mpiSize;
194 
195  g_enabled = true;
196  // Post a non-blocking receive for all peers
197  g_pRxBuffers = new char*[g_size];
198  g_requests = new MPI_Request[g_size];
199  for (uint32_t i = 0; i < GetSize(); ++i)
200  {
201  g_pRxBuffers[i] = new char[MAX_MPI_MSG_SIZE];
202  MPI_Irecv(g_pRxBuffers[i],
204  MPI_CHAR,
205  MPI_ANY_SOURCE,
206  0,
208  &g_requests[i]);
209  }
210 }
211 
212 void
214  const Time& rxTime,
215  uint32_t node,
216  uint32_t dev)
217 {
218  NS_LOG_FUNCTION(this << p << rxTime.GetTimeStep() << node << dev);
219 
220  SentBuffer sendBuf;
221  g_pendingTx.push_back(sendBuf);
222  std::list<SentBuffer>::reverse_iterator i = g_pendingTx.rbegin(); // Points to the last element
223 
224  uint32_t serializedSize = p->GetSerializedSize();
225  uint8_t* buffer = new uint8_t[serializedSize + 16];
226  i->SetBuffer(buffer);
227  // Add the time, dest node and dest device
228  uint64_t t = rxTime.GetInteger();
229  uint64_t* pTime = reinterpret_cast<uint64_t*>(buffer);
230  *pTime++ = t;
231  uint32_t* pData = reinterpret_cast<uint32_t*>(pTime);
232  *pData++ = node;
233  *pData++ = dev;
234  // Serialize the packet
235  p->Serialize(reinterpret_cast<uint8_t*>(pData), serializedSize);
236 
237  // Find the system id for the destination node
238  Ptr<Node> destNode = NodeList::GetNode(node);
239  uint32_t nodeSysId = destNode->GetSystemId();
240 
241  MPI_Isend(reinterpret_cast<void*>(i->GetBuffer()),
242  serializedSize + 16,
243  MPI_CHAR,
244  nodeSysId,
245  0,
247  (i->GetRequest()));
248  g_txCount++;
249 }
250 
251 void
253 {
255 
256  // Poll the non-block reads to see if data arrived
257  while (true)
258  {
259  int flag = 0;
260  int index = 0;
261  MPI_Status status;
262 
263  MPI_Testany(MpiInterface::GetSize(), g_requests, &index, &flag, &status);
264  if (!flag)
265  {
266  break; // No more messages
267  }
268  int count;
269  MPI_Get_count(&status, MPI_CHAR, &count);
270  g_rxCount++; // Count this receive
271 
272  // Get the meta data first
273  uint64_t* pTime = reinterpret_cast<uint64_t*>(g_pRxBuffers[index]);
274  uint64_t time = *pTime++;
275  uint32_t* pData = reinterpret_cast<uint32_t*>(pTime);
276  uint32_t node = *pData++;
277  uint32_t dev = *pData++;
278 
279  Time rxTime(time);
280 
281  count -= sizeof(time) + sizeof(node) + sizeof(dev);
282 
283  Ptr<Packet> p = Create<Packet>(reinterpret_cast<uint8_t*>(pData), count, true);
284 
285  // Find the correct node/device to schedule receive event
286  Ptr<Node> pNode = NodeList::GetNode(node);
287  Ptr<MpiReceiver> pMpiRec = nullptr;
288  uint32_t nDevices = pNode->GetNDevices();
289  for (uint32_t i = 0; i < nDevices; ++i)
290  {
291  Ptr<NetDevice> pThisDev = pNode->GetDevice(i);
292  if (pThisDev->GetIfIndex() == dev)
293  {
294  pMpiRec = pThisDev->GetObject<MpiReceiver>();
295  break;
296  }
297  }
298 
299  NS_ASSERT(pNode && pMpiRec);
300 
301  // Schedule the rx event
303  rxTime - Simulator::Now(),
305  pMpiRec,
306  p);
307 
308  // Re-queue the next read
309  MPI_Irecv(g_pRxBuffers[index],
311  MPI_CHAR,
312  MPI_ANY_SOURCE,
313  0,
315  &g_requests[index]);
316  }
317 }
318 
319 void
321 {
323 
324  std::list<SentBuffer>::iterator i = g_pendingTx.begin();
325  while (i != g_pendingTx.end())
326  {
327  MPI_Status status;
328  int flag = 0;
329  MPI_Test(i->GetRequest(), &flag, &status);
330  std::list<SentBuffer>::iterator current = i; // Save current for erasing
331  i++; // Advance to next
332  if (flag)
333  { // This message is complete
334  g_pendingTx.erase(current);
335  }
336  }
337 }
338 
339 void
341 {
343 
344  if (g_freeCommunicator)
345  {
346  MPI_Comm_free(&g_communicator);
347  g_freeCommunicator = false;
348  }
349 
350  // ns-3 should MPI finalize only if ns-3 was used to initialize
351  if (g_mpiInitCalled)
352  {
353  int flag = 0;
354  MPI_Initialized(&flag);
355  if (flag)
356  {
357  MPI_Finalize();
358  }
359  else
360  {
361  NS_FATAL_ERROR("Cannot disable MPI environment without Initializing it first");
362  }
363  g_mpiInitCalled = false;
364  }
365 
366  g_enabled = false;
367 }
368 
369 } // namespace ns3
static void ReceiveMessages()
Check for completed receives and schedule delivery of the received packets.
MPI_Comm GetCommunicator() override
Return the communicator used to run ns-3.
static bool g_freeCommunicator
Whether ns-3 created (duplicated) the communicator and therefore must free it.
uint32_t GetSystemId() override
Get the id number of this rank.
void Disable() override
Clean up the ns-3 parallel communications interface.
static void TestSendComplete()
Check for completed sends.
static bool g_mpiInitCalled
Whether MPI_Init has been called by this interface.
void SendPacket(Ptr< Packet > p, const Time &rxTime, uint32_t node, uint32_t dev) override
Send a packet to a remote node.
static uint32_t g_size
Size of the MPI communicator used by ns-3 (a duplicate of MPI_COMM_WORLD unless another communicator was supplied).
static bool g_enabled
Has this interface been enabled.
static std::list< SentBuffer > g_pendingTx
List of pending non-blocking sends.
void Enable(int *pargc, char ***pargv) override
Set up the parallel communication interface.
bool IsEnabled() override
Returns enabled state of parallel environment.
static MPI_Request * g_requests
Pending non-blocking receives.
static uint32_t g_rxCount
Total packets received.
uint32_t GetSize() override
Get the number of ranks used by ns-3.
static char ** g_pRxBuffers
Data buffers for non-blocking reads.
void Destroy() override
Deletes storage used by the parallel environment.
static uint32_t g_txCount
Total packets sent.
static MPI_Comm g_communicator
MPI communicator being used for ns-3 tasks.
static TypeId GetTypeId()
Register this type.
static uint32_t g_sid
System ID (rank) for this task.
static uint32_t GetSize()
Get the number of ranks used by ns-3.
Class to aggregate to a NetDevice if it supports MPI capability.
Definition: mpi-receiver.h:48
void Receive(Ptr< Packet > p)
Direct an incoming packet to the device Receive() method.
Definition: mpi-receiver.cc:51
uint32_t GetSystemId() const
Definition: node.cc:131
uint32_t GetNDevices() const
Definition: node.cc:162
uint32_t GetId() const
Definition: node.cc:117
Ptr< NetDevice > GetDevice(uint32_t index) const
Retrieve the index-th NetDevice associated to this node.
Definition: node.cc:152
static Ptr< Node > GetNode(uint32_t n)
Definition: node-list.cc:251
A base class which provides memory management and object aggregation.
Definition: object.h:89
uint32_t GetSerializedSize() const
Returns number of bytes required for packet serialization.
Definition: packet.cc:610
uint32_t Serialize(uint8_t *buffer, uint32_t maxSize) const
Serialize a packet, tags, and metadata into a byte buffer.
Definition: packet.cc:663
Tracks non-blocking sends.
MPI_Request m_request
The MPI request handle.
static void ScheduleWithContext(uint32_t context, const Time &delay, FUNC f, Ts &&... args)
Schedule an event with the given context.
Definition: simulator.h:587
static Time Now()
Return the current simulation virtual time.
Definition: simulator.cc:199
Simulation virtual time values and global simulation resolution.
Definition: nstime.h:105
int64_t GetInteger() const
Get the raw time value, in the current resolution unit.
Definition: nstime.h:454
int64_t GetTimeStep() const
Get the raw time value, in the current resolution unit.
Definition: nstime.h:444
a unique identifier for an interface.
Definition: type-id.h:60
TypeId SetParent(TypeId tid)
Set the parent TypeId.
Definition: type-id.cc:935
Declaration of classes ns3::SentBuffer and ns3::GrantedTimeWindowMpiInterface.
#define NS_ASSERT(condition)
At runtime, in debugging builds, if this condition is not true, the program prints the source file,...
Definition: assert.h:66
#define NS_FATAL_ERROR(msg)
Report a fatal error with a message and terminate.
Definition: fatal-error.h:179
#define NS_LOG_COMPONENT_DEFINE(name)
Define a Log component with a specific name.
Definition: log.h:202
#define NS_LOG_FUNCTION_NOARGS()
Output the name of the function.
#define NS_LOG_FUNCTION(parameters)
If log level LOG_FUNCTION is enabled, this macro will output all input parameters separated by ",...
#define NS_OBJECT_ENSURE_REGISTERED(type)
Register an Object subclass with the TypeId system.
Definition: object-base.h:46
Declaration of class ns3::MpiInterface.
ns3::MpiReceiver declaration; provides an interface to aggregate to MPI-compatible NetDevices.
Every class exported by the ns3 library is enclosed in the ns3 namespace.
const uint32_t MAX_MPI_MSG_SIZE
Maximum MPI message size; used to allocate the fixed-size receive buffers.