CSL  6.0
RemoteStream.cpp
Go to the documentation of this file.
1 //
2 // RemoteStream.cpp -- a frame stream that sends RFS requests to its "server" to get buffers.
3 // See the copyright notice and acknowledgment of authors in the file COPYRIGHT
4 //
5 
6 #include "RemoteStream.h"
7 
8 #ifndef CSL_WINDOWS
9 #include <sys/param.h>
10 #endif
11 
12 using namespace csl;
13 
14 #ifdef RS_TIMING
15 static struct timeval then, now; // used for getting the real time
16 static long timeVals, thisSec, timeSum; // for printing run-time statistics
17 #endif
18 
19 // RemoteStream Constructor: socket-code heavy lifting
20 // Open up a remote frame stream using a socket to the given remote "server" ip/host
21 
22 RemoteStream::RemoteStream(char * server_name, unsigned short server_port, unsigned chans, unsigned bufSize) {
23  unsigned waitTime = (unsigned) (1000000.0f * (float) bufSize / (float) CGestalt::sample_rate());
24  mNumChannels = chans;
25  mBufferSize = bufSize;
26  mCurrentBuffer = 0;
27  mCurrentFrame = 0;
28 #ifdef CSL_WINDOWS
29  WSADATA localWSA;
30  if (WSAStartup(MAKEWORD(1,1),&localWSA)!= 0)
31  perror("Couldn't do WSAStartup");
32 #endif
33  // set up the packet storage buffers
34  for (int i = 0; i < 2; i++) {
35  mIoBuffers[i] = (sample *) malloc(RS_RESPONSE_PACKET_SIZE);
36  }
37  // call the setup methods
38  if (initSockets(serverName, serverPort))
39  goto error;
40  initPacket();
41  if (connectToServer())
42  goto error;
43  // Now set up the background server semaphore and thread
44  mServerSemaphore = (sem_t *) sem_open("RFS_read_sem", O_CREAT, 0644, 0);
45  if (mServerSemaphore == (sem_t *) SEM_FAILED)
46  perror("RFS: sem_open");
47  // Create the reader thread
48  CSL_CreateThread(RS_read_loop, (void *) this);
49  // Now try to fill the input input buffers
50  for (unsigned i = 0; i < 2; i++) {
51  sem_post(mServerSemaphore); // Trigger the server reader thread
52  sleep_usec(waitTime); // sleep
54  }
55  return;
56 error:
57  logMsg(kLogError, "RemoteStream socket open error");
58  if (mSocket)
59  if (closesocket(mSocket) < 0)
60  perror("Close mSocket");
61  mSocket = -1;
62 }
63 
64 // Now for the socket creation
65 
66 int RemoteStream::initSockets(char * serverName, unsigned short serverPort) {
67  struct hostent * he;
68  // Set up the background thread's semaphore
69  mServerSemaphore = (sem_t *) sem_open("RFS_read_sem", O_CREAT, 0644, 0);
70  if (mServerSemaphore == (sem_t *) SEM_FAILED)
71  perror("RemoteStream: sem_open");
72 
73 #ifdef CSL_DEBUG
74  logMsg("Open RemoteStream output socket"); // Open request socket
75 #endif
76 
77  if ((mSocket = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
78  perror("RemoteStream: socket");
79  return 1;
80  }
81  if ((he = gethostbyname(serverName)) == NULL) { // get the server's IP host info
82  perror("\tRemoteStream: gethostbyname");
83  return 1;
84  }
85  memset((char *) &mServerAddr, 0, sizeof(mServerAddr)); // Set up server address structure
86  mServerAddr.sin_family = AF_INET;
87  mServerAddr.sin_port = htons(serverPort);
88  mServerAddr.sin_addr = *((struct in_addr *) he->h_addr);
89  logMsg("RemoteStream open output socket to %s:%d (= %d) (sz = %d)",
90  inet_ntoa(mServerAddr.sin_addr), serverPort, mSocket, mBufferSize);
91  return 0;
92 }
93 
94 // Set up request packet header
95 
97  unsigned char * bString;
98  // Write the structure into the buffer
102  for (int i = 0; i < 2; i++) {
103  bString = (unsigned char *) mIoBuffers[i];
104  memcpy(bString, &mHeader, RS_PACKET_SIZE); // Copy the header to the packets
105  }
106 }
107 
108 // Now send out the "introduction" packet
109 
111  CSL_RS_MSG * packetHeader;
112  ssize_t xferCnt;
113  unsigned command;
114  // connect the socket to the server
115  if (connect(mSocket, (struct sockaddr *) & mServerAddr, sizeof(mServerAddr)) < 0) {
116  perror("\tRemoteStream connect");
117  return 1;
118  } // write the intro packet
119  xferCnt = write(mSocket, (char *) mIoBuffers[0], RS_PACKET_SIZE);
120  if (xferCnt != RS_PACKET_SIZE) {
121  perror("\tRemoteStream intro packet write");
122  return 1;
123  } // read the response
124  xferCnt = read(mSocket, (char *) mIoBuffers[0], RS_PACKET_SIZE);
125  if (xferCnt != RS_PACKET_SIZE) {
126  logMsg(kLogError, "RemoteStream: intro packet read (%d)", xfer_cnt);
127  perror("RemoteStream connect");
128  return 1;
129  } // check the response packet
130  packetHeader = (CSL_RS_MSG *) mIoBuffers[0]; // Cast buffer into a packet header
131  command = packetHeader->magic;
132  if ((command & 0xffffff00) != RS_PACKET_MAGIC) { // Check header fields
133  logMsg(kLogError, "RemoteStream: bad request magic number: %x\n", command);
134  return 1;
135  }
136  logMsg("RemoteStream received server confirmation");
137  return 0;
138 }
139 
140 // Destructor
141 
143  free(mIoBuffers[0]);
144  free(mIoBuffers[1]);
145  if (mSocket)
146  if (closesocket(mSocket) < 0)
147  perror("RemoteStream: close _sock");
148 }
149 
150 //// nextBuffer method -- copy data and signal the semaphore when you need more ////
151 
152 void RemoteStream::nextBuffer(Buffer &outputBuffer) throw(CException) {
153  unsigned char * out, * buf;
154  unsigned frames = outputBuffer.mNumFrames;
155  unsigned toCopy; // # of bytes to copy per channel
156  unsigned offset;
157  bool split; // whether we're splitting output buffers
158 
159  if ((mCurrentFrame + frames) <= mBufferSize) {
160  split = false;
161  toCopy = frames * sizeof(sample);
162  } else { // else we have to read from both buffers
163  split = true;
164  toCopy = (mBufferSize - mCurrentFrame) * sizeof(sample);
165  }
166  // Copy the output buffer samples
167  for (unsigned i = 0; i < outputBuffer.mNumChannels; i++) {
168  out = (unsigned char *) outputBuffer.buffer(i];
169  offset = RS_PACKET_SIZE + (mCurrentFrame * sizeof(sample))
170  + (i * mBufferSize * sizeof(sample));
171  buf = ((unsigned char *) (mIoBuffers[mCurrentBuffer])) + offset;
172  memcpy(out, buf, toCopy);
173  }
174  mCurrentFrame += frames;
175  if (mCurrentFrame >= mBufferSize) { // If we're done with this buffer
176  mCurrentBuffer = 1 - mCurrentBuffer;
177  mCurrentFrame = 0;
178  sem_post(mServerSemaphore); // Trigger the server reader thread!
179  }
180  if (split) { // if we have to read the start of the other buffer
181  toCopy = (frames * sizeof(sample)) - toCopy;
182  for (unsigned i = 0; i < outputBuffer.mNumChannels; i++) {
183  out = (unsigned char *) outputBuffer.buffer(i];
184  offset = RS_PACKET_SIZE + (i * mBufferSize * sizeof(sample));
185  buf = ((unsigned char *) (mIoBuffers[mCurrentBuffer])) + offset;
186  memcpy(out, buf, toCopy);
187  }
188  mCurrentFrame = toCopy / sizeof(sample);
189  }
190  return;
191 }
192 
193 // Here's the reader thread function; it waits on a semaphore to send a packet to the server
194 
195 extern "C" void * RS_Read_Loop(void * inst) {
196  RemoteStream * rfs = (RemoteStream *) inst;
197  ssize_t bytes_xfer;
198  unsigned command, packet_size;
199  unsigned _numChannels = rfs->channels();
200  unsigned which, _bufferSize;
201  unsigned char * b_str;
202  CSL_RFS_MSG * pkt_header;
203  int sock = rfs->get_sock();
204 #ifdef RS_TIMING
205  thisSec = timeVals = timeSum = 0L;
206 #endif
207 
208  logMsg("\tRemoteStream starting read_loop");
209  while(1) { // -- ENDLESS LOOP --
210  sem_wait(rfs->get_semaphore()); // Wait on the semaphore sent from the reader thread
211  // Set up request packet header
212  which = rfs->get_buffer_switch();
213  _bufferSize = rfs->get_buffer_size();
214  b_str = (unsigned char *) ((rfs->get_io_buffers())[1 - which]);
215  pkt_header = (CSL_RFS_MSG *) b_str; // Cast buffer into a packet header
216  pkt_header->magic = RFS_PACKET_MAGIC | CSL_CMD_NEXT_BUFFER;
217  pkt_header->frames = _bufferSize;
218  pkt_header->channels = _numChannels;
219 #ifdef RS_TIMING
220  GET_TIME(&then);
221 #endif // Send a request packet
222  if (write(sock, (char *) b_str, RFS_PACKET_SIZE) != (int) RFS_PACKET_SIZE) {
223  logMsg(kLogError, "RemoteStream read_loop write");
224  perror("\t\t");
225  continue;
226  } // Read a response packet
227  packet_size = RFS_RESPONSE_PACKET_SIZE;
228  bytes_xfer = read(sock, (char *) b_str, packet_size);
229  if (bytes_xfer != (int) packet_size) {
230  logMsg(kLogError, "RemoteStream read_loop read: %d", bytes_xfer);
231  perror("\t\t");
232  continue;
233  }
234 #ifdef RS_TIMING
235  GET_TIME(&now);
236  rfs->print_time_statistics(&now, &then, &thisSec, &timeSum, &timeVals);
237 #endif // Now process the response buffer
238  command = pkt_header->magic; // Check header/footer fields, ignore packet on error
239  if ((command & 0xffffff00) != RFS_PACKET_MAGIC) {
240  logMsg(kLogError, "RemoteStream: bad response magic number: %x", command);
241  continue;
242  }
243  memcpy(& command, & b_str[packet_size - 4], 4);
244  if (command != RFS_PACKET_MAGIC) { // Check trailing magic number
245  logMsg(kLogError, "RemoteStream: bad response footer: %x", command);
246  continue;
247  }
248  }
249 }
250 
251 // Private thread management function
252 
253 extern "C" int CSL_CreateThread(THREAD_START_ROUTINE pfnThreadProc, void * pvParam) {
254  pthread_t tid;
255  pthread_attr_t attr;
256  int ret;
257 
258  pthread_attr_init(&attr);
259  pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
260  pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM);
261 
262  ret = pthread_create(&tid, &attr, pfnThreadProc, pvParam);
263  return (ret == 0);
264 }
void logMsg(const char *format,...)
These are the public logging messages.
Definition: CGestalt.cpp:292
AdditiveInstrument.h – Sum-of-sines synthesis instrument class.
Definition: Accessor.h:17
void * RS_Read_Loop(void *inst)
void nextBuffer(Buffer &outputBuffer)
Get a buffer of frames.
void * RS_read_loop(void *inst)
#define GET_TIME(val)
Definition: CSL_Core.h:700
struct sockaddr_in mServerAddr
Socket addresses for the remote server and for me.
Definition: RemoteStream.h:127
RemoteStream class.
Definition: RemoteStream.h:100
#define RS_PACKET_MAGIC
Definition: RemoteStream.h:67
RemoteStream(char *clientName, unsigned short clientPort, unsigned ch, unsigned bufSize)
Constructor.
#define RS_RESPONSE_PACKET_SIZE
Definition: RemoteStream.h:70
int initSockets(char *serverName, unsigned short serverPort)
float sample
(could be changed to int, or double)
Definition: CSL_Types.h:191
unsigned mNumChannels
my "expected" number of output channels
Definition: CSL_Core.h:292
unsigned mBufferSize
the size of the input ring buffer (in FRAMES)
Definition: RemoteStream.h:119
unsigned short channels
Definition: RemoteStream.h:62
virtual int connectToServer()
unsigned short frames
Definition: RemoteStream.h:61
#define RS_PACKET_SIZE
Definition: RemoteStream.h:68
sample * mIoBuffers[2]
My IO buffers (2 for dbl-buffering; mBufferSize frames in size)
Definition: RemoteStream.h:120
#define closesocket(x)
Definition: RemoteStream.h:51
sem_t * mServerSemaphore
Semaphore to trigger call to server for samples.
Definition: RemoteStream.h:128
int CSL_CreateThread(THREAD_START_ROUTINE pfnThreadProc, void *pvParam)
unsigned mCurrentBuffer
current IO buffer in use
Definition: RemoteStream.h:121
Buffer – the multi-channel sample buffer class (passed around between generators and IO guys)...
Definition: CSL_Core.h:106
CSL_RS_MSG mHeader
The request packet header.
Definition: RemoteStream.h:126
void *(* THREAD_START_ROUTINE)(void *)
Definition: RemoteStream.h:93
#define CSL_CMD_NEXT_BUFFER
Definition: RemoteStream.h:81
unsigned magic
Definition: RemoteStream.h:60
unsigned mCurrentFrame
current position in buffer
Definition: RemoteStream.h:122
#define CSL_CMD_SET_CLIENT
Definition: RemoteStream.h:80
void error(int num, const char *m, const char *path)
Definition: OSC_support.cpp:70
int mSocket
The socket I send to.
Definition: RemoteStream.h:125
Base class of CSL exceptions (written upper-case). Has a string message.