PreviousNext

Examples

Event Producer

An example of a program written to produce events for an ET system is adapted from the file et_producer1.c. It follows below:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <signal.h>
#include <sys/types.h>
#include <unistd.h>
#include <time.h>
#include <limits.h>
#include <et.h>
 
#define NUMLOOPS 20000
#define CHUNK 10
 
main(int argc,char **argv)
{
int i, j, size, status;
int freq, freq_tot=0, freq_avg, iterations=1, count;
et_att_id attach1;
et_sys_id id;
et_openconfig openconfig;
et_event *pe[CHUNK];
struct timespec t1, t2;
double time;

/* handy data for testing */
int numbers[] = {0,1,2,3,4,5,6,7,8,9};
char *stuff[] = {"One","Two","Three","Four","Five","Six","Seven","Eight","Nine","Ten"};
int control[] = {17,8,-1,-1}; /* 17 & 8 are arbitrary numbers */

/* pass the ET filename and event size on command line */
if ((argc != 2) && (argc != 3)) {
  printf("Usage: et_producer <et_filename> [<eventsize>]\n");
  exit(1);
}
size = 10;
if (argc == 3) {
  size = atoi(argv[2]);
}

/* open local ET system and don't wait for it */
et_open_config_init(&openconfig);
if (et_open(&id, argv[1], openconfig) != ET_OK) {
  printf("et_producer: et_open problems\n");
  exit(1);
}
et_open_config_destroy(openconfig);

/* set level of debug output (everything) */
et_system_setdebug(id, ET_DEBUG_INFO);

/* attach to grandcentral station */
if (et_station_attach(id, ET_GRANDCENTRAL, &attach1) < 0) {
  printf("et_producer: error in station attach\n");
  exit(1);
}

/* while the ET system is alive, do the following loop */
while (et_alive(id)) {

  /* read time for future statistics calculations */
  clock_gettime(CLOCK_REALTIME, &t1);

  /* loop NUMLOOPS times before printing out statistics */
  for (j=0; j < NUMLOOPS ; j++) {
    /* get CHUNK new events at a time */
    count = 0;
    status = et_events_new(id, attach1, pe, ET_SLEEP, NULL, size, CHUNK, &count);
    if (status == 0) {
      /* everything is OK */;
    }
    else if (status == ET_ERROR_DEAD) {
      printf("et_producer: ET is dead\n");
      break;
    }
    else if (status == ET_ERROR_TIMEOUT) {
      printf("et_producer: got timeout\n");
      break;
    }
    else if (status == ET_ERROR_EMPTY) {
      printf("et_producer: no events\n");
      break;
    }
    else if (status == ET_ERROR_BUSY) {
      printf("et_producer: grandcentral is busy\n");
      break;
    }
    else if (status == ET_ERROR_WAKEUP) {
      printf("et_producer: someone told me to wake up\n");
      break;
    }
    else if (status != ET_OK) {
      printf("et_producer: request error\n");
      goto error;
    }

    /* write data, set priority, set control values here */
    if (1) {
      void *pdata;
      for (i=0; i < count; i++) {
        /* allow et_client modes 3 & 4 to work (see et_client.c below) */
        et_event_setcontrol(pe[i], control, 4);
        et_event_getdata(pe[i], &pdata);
        memcpy(pdata, (const void *) &numbers[i], sizeof(int));
        et_event_setlength(pe[i], sizeof(int));
      }
    }

    /* put events back into the ET system */
    status = et_events_put(id, attach1, pe, count);
    if (status == ET_OK) {
      ;
    }
    else if (status == ET_ERROR_DEAD) {
      printf("et_producer: put, ET is dead\n");
      break;
    }
    else if (status != ET_OK) {
      printf("et_producer: put error\n");
      goto error;
    }
  } /* for NUMLOOPS */

  /* calculate and printout event rates */
  clock_gettime(CLOCK_REALTIME, &t2);
  time = (double)(t2.tv_sec - t1.tv_sec) + 1.e-9*(t2.tv_nsec - t1.tv_nsec);
  freq = (count*NUMLOOPS)/time;
  /* if numbers get too big, start over */
  if ((INT_MAX - freq_tot) < freq) {
    freq_tot = 0;
    iterations = 1;
  }
  freq_tot += freq;
  freq_avg = freq_tot/iterations;
  iterations++;
  printf("et_producer: %d Hz, %d Hz Avg.\n", freq, freq_avg);

} /* end of while(alive)loop */

error:
printf("et_producer: ERROR\n");
exit(0);
}

Event Consumer

An example of a program written to consume events produced by an ET system is adapted from the file et_client.c. It follows below:

#include <stdio.h>
#include <stdlib.h>
#include <signal.h>
#include <sys/types.h>
#include <unistd.h>
#include <time.h>
#include <thread.h>
#include <et.h>
 
#define NUMEVENTS 500000
#define CHUNK 100
 
main(int argc,char **argv)
{
int i, j, status, swtch, numread, totalread=0;
int con[ET_STATION_SELECT_INTS];
et_statconfig sconfig;
et_openconfig openconfig;
et_event   *pe[CHUNK];
et_att_id  attach1;
et_stat_id my_stat;
et_sys_id  id;
int selections[] = {17,15,-1,-1}; /* 17 & 5 are arbitrary */

if(argc != 4) {
  printf("Usage: et_client <et_filename> <station_name> <mode>\n");
  exit(1);
}

/* open local ET system and don't wait for it */
et_open_config_init(&openconfig);
if (et_open(&id, argv[1], openconfig) != ET_OK) {
  printf("et_client: et_open problems\n");
  exit(1);
}
et_open_config_destroy(openconfig);

/* User selects which type/mode of station to create - got 6 choices.
 * (Of course, many more combinations of settings are possible.)
 */

swtch = atoi(argv[3]);

/* set some common values */
et_station_config_init(&sconfig);
et_station_config_setuser(sconfig, ET_STATION_USER_MULTI);
et_station_config_setrestore(sconfig, ET_STATION_RESTORE_OUT);
et_station_config_setprescale(sconfig, 5);
et_station_config_setcue(sconfig, 20);
 
if (swtch == 1) {
  /* get everything */
  et_station_config_setselect(sconfig, ET_STATION_SELECT_ALL);
  et_station_config_setblock(sconfig, ET_STATION_BLOCKING);
}
else if (swtch == 2) {
  /* nonblocking */
  et_station_config_setselect(sconfig, ET_STATION_SELECT_ALL);
  et_station_config_setblock(sconfig, ET_STATION_NONBLOCKING);
}
else if (swtch == 3) {
  /* select events */
  et_station_config_setselect(sconfig, ET_STATION_SELECT_MATCH);
  et_station_config_setblock(sconfig, ET_STATION_BLOCKING);
  et_station_config_setselectwords(sconfig, selections);
}
else if (swtch == 4) {
  /* non-blocking, selection */
  et_station_config_setselect(sconfig, ET_STATION_SELECT_MATCH);
  et_station_config_setblock(sconfig, ET_STATION_NONBLOCKING);
  et_station_config_setselectwords(sconfig, selections);
}
else if (swtch == 5) {
  /* user's condition, blocking mode */
  et_station_config_setselect(sconfig, ET_STATION_SELECT_USER);
  et_station_config_setblock(sconfig, ET_STATION_BLOCKING);
  et_station_config_setselectwords(sconfig, selections);
  if (et_station_config_setfunction(sconfig, "et_my_function") == ET_ERROR) {
    printf("et_client: cannot set function\n");
    exit(1);
  }
  if (et_station_config_setlib(sconfig, "/.../libet_user.so") == ET_ERROR) {
    printf("et_client: cannot set library\n");
    exit(1);
  }
}
else if (swtch == 6) {
  /* user's condition, nonblocking mode */
  et_station_config_setselect(sconfig, ET_STATION_SELECT_USER);
  et_station_config_setblock(sconfig, ET_STATION_NONBLOCKING);
  et_station_config_setselectwords(sconfig, selections);
  if (et_station_config_setfunction(sconfig, "et_my_function") == ET_ERROR) {
    printf("et_client: cannot set function\n");
    exit(1);
  }
  if (et_station_config_setlib(sconfig, "/.../libet_user.so") == ET_ERROR) {
    printf("et_client: cannot set library\n");
    exit(1);
  }
}

/* set level of debug output */
et_system_setdebug(id, ET_DEBUG_INFO);

/* create the station */
if ((status = et_station_create(id, &my_stat, argv[2], sconfig)) < ET_OK) {
  if (status == ET_ERROR_EXISTS) {
    /* my_stat contains pointer to existing station */
    printf("et_client: station already exists\n");
  }
  else if (status == ET_ERROR_TOOMANY) {
    printf("et_client: too many stations created\n");
    goto error;
  }
  else {
    printf("et_client: error in station creation\n");
    goto error;
  }
}
et_station_config_destroy(sconfig);

/* attach to the newly created station */
if (et_station_attach(id, my_stat, &attach1) < 0) {
  printf("et_client: error in station attach\n");
  goto error;
}

/* loop, while ET system is alive, to read and write events */
while (et_alive(id)) {
  /* example of reading array of up to "CHUNK" events */
  status = et_events_get(id, attach1, pe, ET_SLEEP, NULL, CHUNK, &numread);
  if (status == ET_OK) {
    /* everything is OK*/
  }
  else if (status == ET_ERROR_DEAD) {
    printf("et_client: ET is dead\n");
    goto end;
  }
  else if (status == ET_ERROR_TIMEOUT) {
    printf("et_client: got timeout\n");
    goto end;
  }
  else if (status == ET_ERROR_EMPTY) {
    printf("et_client: no events\n");
    goto end;
  }
  else if (status == ET_ERROR_BUSY) {
    printf("et_client: station is busy\n");
    goto end;
  }
  else if (status == ET_ERROR_WAKEUP) {
    printf("et_client: someone told me to wake up\n");
    goto end;
  }
  else if (status != ET_OK) {
    printf("et_client: get error\n");
    goto error;
  }
 
  /* print data */
  if (0) {
    int pri, len, *data;
    for (j=0; j < numread; j++) {
      et_event_getdata(pe[j], (void **) &data);
      et_event_getpriority(pe[j], &pri);
      et_event_getlength(pe[j], &len);
      et_event_getcontrol(pe[j], con);
      printf("et_client data = %d, pri = %d, len = %d\n", *data, pri, len);
      for (i=0; i < ET_STATION_SELECT_INTS; i++) {
        printf(" con[%d] = %d\n", i, con[i]);
      }
    }
  }

  /* putting array of events */
  status = et_events_put(id, attach1, pe, numread);
  if (status == ET_ERROR_DEAD) {
    printf("et_client: ET is dead\n");
    goto end;
  }
  else if (status != ET_OK) {
    printf("et_client: put error\n");
    goto error;
  }
  totalread += numread;

end:
  /* print something out after having read NUMEVENTS events */
  if (totalread >= NUMEVENTS) {
    totalread = 0;
    printf(" et_client: %d events\n", NUMEVENTS);
  }

} /* while(alive) */

error:
free(pe);
printf("et_client: ERROR\n");
exit(0);
}

PreviousNext