#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <pthread.h>
#include <semaphore.h>
#define SIZE 10
#define LOOPS 100
#define PROD_MS 1
#define CONS_MS 5
typedef int buffer_t;
buffer_t buffer[SIZE];
int buffer_index;
pthread_mutex_t buffer_mutex;
/* initially buffer will be empty. full_sem
will be initialized to buffer SIZE, which means
SIZE number of producer threads can write to it.
And empty_sem will be initialized to 0, so no
consumer can read from buffer until a producer
thread posts to empty_sem */
sem_t full_sem; /* when 0, buffer is full */
sem_t empty_sem; /* when 0, buffer is empty. Kind of
like an index for the buffer */
/* sem_post algorithm:
mutex_lock sem_t->mutex
sem_t->value++
mutex_unlock sem_t->mutex
sem_wait algorithn:
mutex_lock sem_t->mutex
while (sem_t->value > 0) {
mutex_unlock sem_t->mutex
sleep... wake up
mutex_lock sem_t->mutex
}
sem_t->value--
mutex_unlock sem_t->mutex
*/
void insertbuffer(buffer_t value) {
if (buffer_index < SIZE) {
buffer[buffer_index++] = value;
} else {
printf("Buffer overflow\n");
}
}
buffer_t dequeuebuffer() {
if (buffer_index > 0) {
return buffer[--buffer_index]; // buffer_index-- would be error!
} else {
printf("Buffer underflow\n");
}
return 0;
}
void *producer() {
buffer_t value;
int i=0;
while (++i < LOOPS) {
sleep(PROD_MS);
value = rand() % 100;
sem_wait(&full_sem); // sem=0: wait. sem>0: go and decrement it
/* possible race condition here. After this thread wakes up,
another thread could aqcuire mutex before this one, and add to list.
Then the list would be full again
and when this thread tried to insert to buffer there would be
a buffer overflow error */
pthread_mutex_lock(&buffer_mutex);
insertbuffer(value);
pthread_mutex_unlock(&buffer_mutex);
sem_post(&empty_sem); // post (increment) emptybuffer semaphore
printf("[Producer] B <- %d \n", value);
}
pthread_exit(0);
}
void *consumer() {
buffer_t value;
int i=0;
while (++i < LOOPS) {
sleep(CONS_MS);
sem_wait(&empty_sem);
/* there could be race condition here, that could cause
buffer underflow error */
pthread_mutex_lock(&buffer_mutex);
value = dequeuebuffer(value);
pthread_mutex_unlock(&buffer_mutex);
sem_post(&full_sem); // post (increment) fullbuffer semaphore
printf("[Consumer] B -> %d\n", value);
}
pthread_exit(0);
}
int main() {
buffer_index = 0;
pthread_mutex_init(&buffer_mutex, NULL);
sem_init(&full_sem, // sem_t *sem
0, // int pshared. 0 = shared between threads of process, 1 = shared between processes
1); // unsigned int value. Initial value
sem_init(&empty_sem,
0,
0);
/* full_sem is initialized to buffer size because SIZE number of
producers can add one element to buffer each. They will wait
semaphore each time, which will decrement semaphore value.
empty_sem is initialized to 0, because buffer starts empty and
consumer cannot take any element from it. They will have to wait
until producer posts to that semaphore (increments semaphore
value) */
pthread_t prod_th, cons_th;
pthread_create(&prod_th, NULL, producer, NULL);
pthread_create(&cons_th, NULL, consumer, NULL);
pthread_join(prod_th, NULL);
pthread_join(cons_th, NULL);
printf("Done.\n");
pthread_mutex_destroy(&buffer_mutex);
sem_destroy(&full_sem);
sem_destroy(&empty_sem);
return 0;
}