udp_input.cpp 10.4 KB
Newer Older
jcaceres's avatar
jcaceres committed
1
2
3
4
5
6
7
#include "udp_input.h"
#include "networkInfo.h"
#include "stream.h"
#include "unistd.h"
#include <stdlib.h>
#include <stdio.h>
#include <iostream.h>
8
#include <QHostInfo>//***JPC Port to qt4*****************
jcaceres's avatar
jcaceres committed
9
10
11
extern QString *IPv4Addr (char *namebuf);
extern int set_fifo_priority (bool half);

12
13
//-------------------------------------------------------------------------------
//-------------------------------------------------------------------------------
jcaceres's avatar
jcaceres committed
14
UDPInput::UDPInput (NetworkInfoT netInfo, AudioInfoT audInfo):
15
  InputPlugin ("UDP Input"), netInfo (netInfo), audInfo (audInfo)
jcaceres's avatar
jcaceres committed
16
{
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
  bpp = netInfo->getDataBytesPerPacket ();
  cout << "bpp = " << bpp << endl;
  char localhostbuf[100];
  has_peer = false;
  _rcvr = NULL;
  //sock = new Q3SocketDevice (Q3SocketDevice::Datagram);	// for an unreliable UDP socket//***JPC Port to qt4*****************
  //sock->setAddressReusable(true);//***JPC Port to qt4*****************
  sock = new QUdpSocket;//***JPC Port to qt4*****************
  peerAddress = new QHostAddress;
  if (gethostname (localhostbuf, 99))
    {
      perror ("gethostname");
      exit ();
    }

  cout <<"Local Host Name: " << QString(QHostInfo::localHostName ()).latin1() << endl;//***JPC Port to qt4*****************
  //cout << "Rx buff = " << sock->receiveBufferSize () << endl;//***JPC Port to qt4*****************
  QHostAddress *ha = new QHostAddress ();//***JPC Port to qt4*****************
  QString *s = IPv4Addr (localhostbuf);	// dotted integer from name//***JPC Port to qt4*****************
  ha->setAddress (*s);//***JPC Port to qt4*****************
  cout << "INPUT PORT: " << netInfo->getInPort () << endl;

  //if (!(sock->bind (*ha, netInfo->getInPort ())))//***JPC Port to qt4*****************
  if (!(sock->bind (*ha, netInfo->getInPort (), QUdpSocket::ShareAddress ) ) )//***JPC Port to qt4*****************
    {
      perror ("bind\n");
      exit ();
    }
  if (!sock->isValid ())
    {
      cout << "socket creation error " << endl;
    }
jcaceres's avatar
jcaceres committed
49

50
51
52
53
54
55
56
57
  packetIndex = 0;
  wholeSize = sizeof (nsHeader) + (netInfo->getChunksPerPacket () * bpp) + 1;
  packetData = new char[wholeSize];
  memset (packetData, 0, wholeSize);
  numRedundantBuffers = netInfo->getChunksPerPacket() - 1;
  maxPacketIndex = netInfo->getMaxSeq();
  cout << endl << "UDPInput binding to " << localhostbuf
       << " port " << netInfo->getInPort () << endl;
jcaceres's avatar
jcaceres committed
58
59
60
}


61
62
63
64
//-------------------------------------------------------------------------------
//-------------------------------------------------------------------------------
UDPInput::~UDPInput()
{
65
66
  //delete sock;
  //delete peerAddress;
67
68
69
70
71
72
73
74
}


//-------------------------------------------------------------------------------
/*! cout that the UDPInput thread has started
 *
 */
//-------------------------------------------------------------------------------
jcaceres's avatar
jcaceres committed
75
76
77
void
UDPInput::Initial ()
{
78
  cout << "Started UDPInput thread" << endl;
jcaceres's avatar
jcaceres committed
79
80
}

81
82
83

//-------------------------------------------------------------------------------
/* Receive a buffer from the UDPSocket.
jcaceres's avatar
jcaceres committed
84
85
86
 * @param buf is the location at which to store the buffer.
 * @param le is the size of the buffer to be received.
 */
87
//-------------------------------------------------------------------------------
jcaceres's avatar
jcaceres committed
88
89
90
int
UDPInput::rcv (char *buf)
{
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
  //int	rv = sock->readBlock (packetData, wholeSize);//***JPC Port to qt4*****************
  //cout << "###############################################" << endl;
  int	rv = sock->readDatagram (packetData, wholeSize, peerAddress);//***JPC Port to qt4*****************
  //cout << "###############################################" << rv << endl;
  //cout << "***Packet Size***: " << rv << endl;//***JPC Port to qt4*****************

  char *datapart;
  packetIndex = ((nsHeader *) packetData)->i_seq;
  datapart = packetData + sizeof (nsHeader) + 
    ((packetIndex % ((nsHeader *) packetData)->i_copies) * bpp);
  memcpy (buf, datapart, bpp);
  /*
    ((nsHeader *) packetData)->i_type = 0;
    ((nsHeader *) packetData)->i_nframes = 1;
    ((nsHeader *) packetData)->i_nchans = 2;
    ((nsHeader *) packetData)->i_copies = n;
    ((nsHeader *) packetData)->i_cksum = 4;
    ((nsHeader *) packetData)->i_seq = packetIndex;
    ((nsHeader *) packetData)->i_rtnseq = 6;
    ((nsHeader *) packetData)->i_rtt = 7;
  */
  if (rv < 0)
    {
      cerr << "bad read..." << endl;
    }
  return packetIndex;
jcaceres's avatar
jcaceres committed
117
118
}

119
120
121

//-------------------------------------------------------------------------------
//-------------------------------------------------------------------------------
jcaceres's avatar
jcaceres committed
122
123
124
int
UDPInput::rcvz1 (char *bufz1, int z)
{
125
126
127
128
129
130
131
132
133
134
135
  char *datapart;
  packetIndex = ((nsHeader *) packetData)->i_seq-z;
  if (packetIndex < 0) 
    {
      packetIndex += maxPacketIndex;
      //		cout << "backed below 0 (a good thing)" << endl;
    }
  datapart = packetData + sizeof (nsHeader) + 
    ((packetIndex % ((nsHeader *) packetData)->i_copies) * bpp);
  memcpy (bufz1, datapart, bpp);
  return packetIndex;
jcaceres's avatar
jcaceres committed
136
137
}

138
139
140

//-------------------------------------------------------------------------------
/* The main run loop.  Calls waitForConnection().  Once connected,
jcaceres's avatar
jcaceres committed
141
142
143
 * while running is true, it checks if the socket has a new buffer.
 * If it does, it writes the buffer to the stream.
 */
144
//-------------------------------------------------------------------------------
jcaceres's avatar
jcaceres committed
145
146
147
void
UDPInput::stop ()
{
148
  _running = false;
jcaceres's avatar
jcaceres committed
149
150
}

151
152
//-------------------------------------------------------------------------------
//-------------------------------------------------------------------------------
jcaceres's avatar
jcaceres committed
153
154
155
void
UDPInput::run ()
{
156
157
158
159
  _running = true;
  int seq;
  char *buf = (char *) new char[bpp];
  char *bufz1 = (char *) new char[bpp];
jcaceres's avatar
jcaceres committed
160

161
162
163
164
165
166
167
168
169
  cout << "UDP Input waiting for peer." << endl;
  while (has_peer == false)
    {
      //cout << "pendingDatagramSize!!!!!!" << sock->pendingDatagramSize () << endl;//***JPC Port to qt4*****************
      //cout << "hasPendingDatagrams!!!!!!" << sock->hasPendingDatagrams () << endl;//***JPC Port to qt4*****************
      //}
	  
      //if (sock->bytesAvailable () >= wholeSize)	// not an error//***JPC Port to qt4*****************
      if (sock->pendingDatagramSize () >= wholeSize)	// not an error//***JPC Port to qt4*****************
jcaceres's avatar
jcaceres committed
170
	{
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
	  //cout <<"wholeSize = " << wholeSize << " " <<sock->bytesAvailable ()<< endl;// not an error//***JPC Port to qt4*****************
	  cout <<"wholeSize = " << wholeSize << " " <<sock->pendingDatagramSize ()<< endl;// not an error//***JPC Port to qt4*****************
	  //sock->readBlock (buf, wholeSize);
	  this->rcv (buf);
	  has_peer = true;	// really rcvd something
	  cout << "UDP Input Connected!" << endl;			
	} else
	msleep (100);
    }
  msleep (10);
  cout << "Started UDP Input Run" << endl;
  set_fifo_priority (false);
  bool timeout;
  int ret;
  unsigned long now = 0;
  unsigned long lastTickTime = usecTime ();
  int ctr = 0;
  double max = 0.0;	
  int gap;
  double gapAvg = 0.0;
  while (_running)
    {
      // If timeout is non-null and no error occurred 
      // (i.e. it does not return -1): this function sets *timeout to TRUE, 
      // if the reason for returning was that the timeout was reached; 
      // otherwise it sets *timeout to FALSE. This is useful to find out 
      // if the peer closed the connection.
      //ret = (sock->waitForMore (30, &timeout));//***JPC Port to qt4*****************
199
200
201
202
203
204
205
206

      //***********************************************************
      //###########################################################
      // THE SEGMENTATION FAAULT BUG IS IN THE FOLLOWING LINE
      // IT SEEMS THAT QT4 IS MESSING UP SOMETHING 
      //###########################################################
      //***********************************************************
      cout << "BEFORE SEGFAULT LINE" << endl;
207
      timeout = sock->waitForReadyRead(30);//***JPC Port to qt4*****************
208
209
210
211
212
      cout << "AFTER SEGFAULT LINE" << endl;
      //***********************************************************
      //##########################################################
      //***********************************************************

213
214
215
216
217
218
      //if (ret == -1)//***JPC Port to qt4*****************
      //cerr << "udp in sock problems..." << endl;//***JPC Port to qt4*****************
      //else if (timeout)//***JPC Port to qt4*****************
      if (!timeout)//***JPC Port to qt4*****************
	cerr << "udp in waited too long (more than 30ms)..." << endl;
      else 
jcaceres's avatar
jcaceres committed
219
	{
220
221
222
223
224
	  seq = this->rcv (buf);
	  if (stream == NULL)
	    {
	      cerr << "ERROR: UDPInput has no sream to write to!" << endl;
	    }
jcaceres's avatar
jcaceres committed
225
			
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
	  int z = numRedundantBuffers;
	  while (z)
	    {
	      int zseq = this->rcvz1 (bufz1, z);
	      stream->writeRedundant (bufz1, key, z, zseq);
	      z--;
	    }
	  gap = stream->writeRedundant (buf, key, 0, seq);
	  //		cout << "writePosition " << gap <<"\t\t";
	  /*
	    now = usecTime ();
	    ctr++;
	    if (ctr == 40)
	    {
	    ctr = 0;
	    gapAvg = gapAvg / 40.0;
	    //		plotVal (gapAvg);
	    //		plotVal (max);
	    gapAvg = 0.0;
	    max = 0.0;
	    } else {
	    double xxx = (((double)now - (double)lastTickTime)/1000000.0);
	    //			if (xxx>max) max = xxx;
	    max += xxx;
	    gapAvg += ((double)gap);
	    }
	    lastTickTime = now;
	  */
	  //			stream->write (buf, key);
jcaceres's avatar
jcaceres committed
255
	}
256
257
    }
  cout << "UDP Input stop" << endl;
jcaceres's avatar
jcaceres committed
258
259
}

260
261
262

//-------------------------------------------------------------------------------
//-------------------------------------------------------------------------------
jcaceres's avatar
jcaceres committed
263
264
void UDPInput::plotVal (double v)
{
265
266
267
268
269
    if(_rcvr!=NULL)
    {
    ThreadCommEvent *e = new ThreadCommEvent (-1.0, v, 0.0);
    QApplication::postEvent (_rcvr, e);	// to app event loop
    }
jcaceres's avatar
jcaceres committed
270
}
271
272
273
274


//-------------------------------------------------------------------------------
//-------------------------------------------------------------------------------
jcaceres's avatar
jcaceres committed
275
276
bool UDPInput::hasPeer ()
{
277
  return has_peer;
jcaceres's avatar
jcaceres committed
278
279
280
}


281
282
283
284
285
//-------------------------------------------------------------------------------
/* Returns the address of the peer to which the socket is connected
 *
 */
//-------------------------------------------------------------------------------
jcaceres's avatar
jcaceres committed
286
287
QHostAddress UDPInput::peer ()
{
288
289
290
291
292
293
294
295
  cout << "#################RETURNING PEEEEERRRRRRRRR#############################" << endl;
  sock->readDatagram (packetData, wholeSize, peerAddress);//***JPC Port to qt4*****************
  cout << (*peerAddress).toString().latin1() << endl;
  //cout << sock->state() << endl;
  //cout << sock->peerName().latin1() << endl;
  //cout << "RETURNING 23232323**************** *********" << endl;
  //return sock->peerAddress ();
  return (*peerAddress);
jcaceres's avatar
jcaceres committed
296
}