00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038 #define _XOPEN_SOURCE 500
00039
00040 #include <config.h>
00041
00042 #include <errno.h>
00043 #include <limits.h>
00044 #include <pthread.h>
00045 #include <signal.h>
00046 #include <stdarg.h>
00047 #include <stdio.h>
00048 #include <stdlib.h>
00049 #include <string.h>
00050 #include <sys/types.h>
00051 #include <sys/time.h>
00052 #include <unistd.h>
00053
00054 #include "misc.h"
00055 #include "empthread.h"
00056 #include "prototypes.h"
00057
/*
 * Requests recorded in the state member of struct empth_t.
 * Zero means nothing is pending.  An enum is used instead of bare
 * macros so the names show up in a debugger and cannot be redefined
 * by accident; the numeric values are unchanged.
 */
enum {
    EMPTH_KILLED = 1,           /* stored by empth_terminate() */
    EMPTH_INTR = 2              /* stored by empth_wakeup() when state was 0 */
};
00060
/*
 * Per-thread control block.
 * Each thread keeps a pointer to its own block under ctx_key.
 */
struct empth_t {
    /* Thread name; printed by empth_status(), released in empth_exit() */
    char *name;
    /* User data: handed to ep, and copied to *udata on context restore */
    void *ud;
    /* 0, EMPTH_KILLED or EMPTH_INTR */
    int state;
    /* Entry point invoked by empth_start() */
    void (*ep)(void *);
    /* POSIX thread id; target of pthread_kill() in terminate/wakeup */
    pthread_t id;
};
00068
/*
 * Named read-write lock.
 * Built by empth_rwlock_create(), torn down by empth_rwlock_destroy().
 */
struct empth_rwlock_t {
    /* Private copy of the name given at creation */
    char *name;
    /* The underlying POSIX read-write lock */
    pthread_rwlock_t lock;
};
00073
00074
/*
 * Key for thread-specific data.  Every thread stores the address of
 * its own empth_t here; empth_self() reads it back.
 */
static pthread_key_t ctx_key;

/* Copy of the flags argument of empth_init(); tested for EMPTH_PRINT */
static int empth_flags;

/*
 * Location supplied to empth_init().  Whenever a thread (re)gains
 * control, its ud member is stored through this pointer.
 */
static void **udata;

/*
 * Context switch mutex.
 * A thread holds it while it runs and gives it up only inside this
 * file: around blocking calls, in empth_yield(), and in empth_exit().
 */
static pthread_mutex_t mtx_ctxsw;
00090
00091 static void empth_status(char *format, ...)
00092 ATTRIBUTE((format (printf, 1, 2)));
00093 static void empth_alarm(int sig);
00094
00095 static void *
00096 empth_start(void *arg)
00097 {
00098 empth_t *ctx = arg;
00099
00100 ctx->id = pthread_self();
00101 pthread_setspecific(ctx_key, ctx);
00102 pthread_mutex_lock(&mtx_ctxsw);
00103 *udata = ctx->ud;
00104 ctx->ep(ctx->ud);
00105 empth_exit();
00106 return NULL;
00107 }
00108
00109 static void
00110 empth_status(char *format, ...)
00111 {
00112 va_list ap;
00113 static struct timeval startTime;
00114 struct timeval tv;
00115 char buf[1024];
00116 int sec, msec;
00117 empth_t *a;
00118
00119 va_start(ap, format);
00120 if (empth_flags & EMPTH_PRINT) {
00121 if (startTime.tv_sec == 0)
00122 gettimeofday(&startTime, 0);
00123 gettimeofday(&tv, 0);
00124 sec = tv.tv_sec - startTime.tv_sec;
00125 msec = (tv.tv_usec - startTime.tv_usec) / 1000;
00126 if (msec < 0) {
00127 sec++;
00128 msec += 1000;
00129 }
00130 vsprintf(buf, format, ap);
00131 a = empth_self();
00132 printf("%d:%02d.%03d %17s: %s\n", sec / 60, sec % 60, msec / 10,
00133 a->name, buf);
00134
00135 }
00136 va_end(ap);
00137 }
00138
00139
00140 int
00141 empth_init(void **ctx_ptr, int flags)
00142 {
00143 empth_t *ctx;
00144 sigset_t set;
00145 struct sigaction act;
00146
00147 empth_flags = flags;
00148 udata = ctx_ptr;
00149
00150 empth_init_signals();
00151 sigemptyset(&set);
00152 sigaddset(&set, SIGHUP);
00153 sigaddset(&set, SIGINT);
00154 sigaddset(&set, SIGTERM);
00155 pthread_sigmask(SIG_BLOCK, &set, NULL);
00156 act.sa_flags = 0;
00157 sigemptyset(&act.sa_mask);
00158 act.sa_handler = empth_alarm;
00159 sigaction(SIGALRM, &act, NULL);
00160
00161 pthread_key_create(&ctx_key, NULL);
00162 pthread_mutex_init(&mtx_ctxsw, NULL);
00163
00164 ctx = malloc(sizeof(empth_t));
00165 if (!ctx) {
00166 logerror("pthread init failed: not enough memory");
00167 exit(1);
00168 }
00169 ctx->name = "Main";
00170 ctx->ep = 0;
00171 ctx->ud = 0;
00172 ctx->id = pthread_self();
00173 ctx->state = 0;
00174 pthread_setspecific(ctx_key, ctx);
00175 pthread_mutex_lock(&mtx_ctxsw);
00176 logerror("pthreads initialized");
00177 return 0;
00178 }
00179
00180
00181 empth_t *
00182 empth_create(void (*entry)(void *), int size, int flags,
00183 char *name, void *ud)
00184 {
00185 pthread_t t;
00186 pthread_attr_t attr;
00187 empth_t *ctx;
00188 int eno;
00189
00190 empth_status("creating new thread %s", name);
00191
00192 ctx = malloc(sizeof(empth_t));
00193 if (!ctx) {
00194 logerror("not enough memory to create thread %s", name);
00195 return NULL;
00196 }
00197 ctx->name = strdup(name);
00198 ctx->ud = ud;
00199 ctx->state = 0;
00200 ctx->ep = entry;
00201
00202 eno = pthread_attr_init(&attr);
00203 if (eno) {
00204 logerror("can not create thread attribute %s: %s",
00205 name, strerror(eno));
00206 goto bad;
00207 }
00208 if (size < PTHREAD_STACK_MIN)
00209 size = PTHREAD_STACK_MIN;
00210 pthread_attr_setstacksize(&attr, size);
00211 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
00212
00213 eno = pthread_create(&t, &attr, empth_start, ctx);
00214 if (eno) {
00215 logerror("can not create thread: %s: %s", name, strerror(eno));
00216 goto bad;
00217 }
00218 empth_status("new thread id is %ld", (long)t);
00219 empth_yield();
00220 return ctx;
00221
00222 bad:
00223 pthread_attr_destroy(&attr);
00224 free(ctx);
00225 return NULL;
00226 }
00227
00228
00229 static void
00230 empth_restorectx(void)
00231 {
00232 empth_t *ctx_ptr;
00233
00234 ctx_ptr = pthread_getspecific(ctx_key);
00235 *udata = ctx_ptr->ud;
00236 if (ctx_ptr->state == EMPTH_KILLED) {
00237 empth_status("i am dead");
00238 empth_exit();
00239 }
00240 ctx_ptr->state = 0;
00241 empth_status("context restored");
00242 }
00243
00244 empth_t *
00245 empth_self(void)
00246 {
00247 return pthread_getspecific(ctx_key);
00248 }
00249
00250 void
00251 empth_exit(void)
00252 {
00253 empth_t *ctx = pthread_getspecific(ctx_key);
00254
00255 empth_status("empth_exit");
00256 pthread_mutex_unlock(&mtx_ctxsw);
00257 free(ctx->name);
00258 free(ctx);
00259 pthread_exit(0);
00260 }
00261
00262 void
00263 empth_yield(void)
00264 {
00265 pthread_mutex_unlock(&mtx_ctxsw);
00266 pthread_mutex_lock(&mtx_ctxsw);
00267 empth_restorectx();
00268 }
00269
00270 void
00271 empth_terminate(empth_t *a)
00272 {
00273 empth_status("killing thread %s", a->name);
00274 a->state = EMPTH_KILLED;
00275 pthread_kill(a->id, SIGALRM);
00276 }
00277
00278 void
00279 empth_select(int fd, int flags)
00280 {
00281
00282 fd_set readmask;
00283 fd_set writemask;
00284 struct timeval tv;
00285 int n;
00286
00287 pthread_mutex_unlock(&mtx_ctxsw);
00288 empth_status("%s select on %d",
00289 flags == EMPTH_FD_READ ? "read" : "write", fd);
00290 while (1) {
00291 tv.tv_sec = 1000000;
00292 tv.tv_usec = 0;
00293
00294 FD_ZERO(&readmask);
00295 FD_ZERO(&writemask);
00296
00297 switch (flags) {
00298 case EMPTH_FD_READ:
00299 FD_SET(fd, &readmask);
00300 break;
00301 case EMPTH_FD_WRITE:
00302 FD_SET(fd, &writemask);
00303 break;
00304 default:
00305 logerror("bad flag %d passed to empth_select", flags);
00306 empth_exit();
00307 }
00308
00309 n = select(fd + 1, &readmask, &writemask, (fd_set *) 0, &tv);
00310
00311 if (n < 0) {
00312 if (errno == EINTR) {
00313
00314 empth_status("select broken by signal");
00315 goto done;
00316 return;
00317 }
00318
00319 empth_status("select failed (%s)", strerror(errno));
00320 goto done;
00321 return;
00322 }
00323
00324 if (flags == EMPTH_FD_READ && FD_ISSET(fd, &readmask)) {
00325 empth_status("input ready");
00326 break;
00327 }
00328 if (flags == EMPTH_FD_WRITE && FD_ISSET(fd, &writemask)) {
00329 empth_status("output ready");
00330 break;
00331 }
00332 }
00333
00334 done:
00335 pthread_mutex_lock(&mtx_ctxsw);
00336 empth_restorectx();
00337 }
00338
/*
 * SIGALRM handler; does nothing on purpose.
 * empth_init() installs it with sa_flags 0, and empth_terminate() and
 * empth_wakeup() send SIGALRM with pthread_kill().  All that matters
 * is that delivery interrupts a blocking call in the target thread.
 */
static void
empth_alarm(int sig)
{
    (void)sig;                  /* the signal number is of no interest */
}
00347
00348 void
00349 empth_wakeup(empth_t *a)
00350 {
00351 empth_status("waking up thread %s", a->name);
00352 if (a->state == 0)
00353 a->state = EMPTH_INTR;
00354 pthread_kill(a->id, SIGALRM);
00355 }
00356
00357 int
00358 empth_sleep(time_t until)
00359 {
00360 empth_t *ctx = pthread_getspecific(ctx_key);
00361 struct timeval tv;
00362 int res;
00363
00364 empth_status("going to sleep %ld sec", until - time(0));
00365 pthread_mutex_unlock(&mtx_ctxsw);
00366 do {
00367 tv.tv_sec = until - time(NULL);
00368 tv.tv_usec = 0;
00369 res = select(0, NULL, NULL, NULL, &tv);
00370 } while (res < 0 && ctx->state == 0);
00371 empth_status("sleep done. Waiting for lock");
00372 pthread_mutex_lock(&mtx_ctxsw);
00373 empth_restorectx();
00374 return res;
00375 }
00376
00377 int
00378 empth_wait_for_signal(void)
00379 {
00380 sigset_t set;
00381 int sig, err;
00382
00383 sigemptyset(&set);
00384 sigaddset(&set, SIGHUP);
00385 sigaddset(&set, SIGINT);
00386 sigaddset(&set, SIGTERM);
00387 pthread_mutex_unlock(&mtx_ctxsw);
00388 for (;;) {
00389 empth_status("waiting for signals");
00390 err = sigwait(&set, &sig);
00391 if (CANT_HAPPEN(err)) {
00392 sleep(60);
00393 continue;
00394 }
00395 empth_status("got awaited signal %d", sig);
00396 pthread_mutex_lock(&mtx_ctxsw);
00397 empth_restorectx();
00398 return sig;
00399 }
00400 }
00401
00402 empth_rwlock_t *
00403 empth_rwlock_create(char *name)
00404 {
00405 empth_rwlock_t *rwlock;
00406
00407 rwlock = malloc(sizeof(*rwlock));
00408 if (!rwlock)
00409 return NULL;
00410
00411 if (pthread_rwlock_init(&rwlock->lock, NULL) != 0) {
00412 free(rwlock);
00413 return NULL;
00414 }
00415
00416 rwlock->name = strdup(name);
00417 return rwlock;
00418 }
00419
00420 void
00421 empth_rwlock_destroy(empth_rwlock_t *rwlock)
00422 {
00423 pthread_rwlock_destroy(&rwlock->lock);
00424 free(rwlock->name);
00425 free(rwlock);
00426 }
00427
00428 void
00429 empth_rwlock_wrlock(empth_rwlock_t *rwlock)
00430 {
00431 pthread_mutex_unlock(&mtx_ctxsw);
00432 pthread_rwlock_wrlock(&rwlock->lock);
00433 pthread_mutex_lock(&mtx_ctxsw);
00434 empth_restorectx();
00435 }
00436
00437 void
00438 empth_rwlock_rdlock(empth_rwlock_t *rwlock)
00439 {
00440 pthread_mutex_unlock(&mtx_ctxsw);
00441 pthread_rwlock_rdlock(&rwlock->lock);
00442 pthread_mutex_lock(&mtx_ctxsw);
00443 empth_restorectx();
00444 }
00445
00446 void
00447 empth_rwlock_unlock(empth_rwlock_t *rwlock)
00448 {
00449 pthread_rwlock_unlock(&rwlock->lock);
00450 }