/// <summary>
/// Producer thread body. While <c>queueRunning</c> is true, each pass gathers
/// <c>PRODUCER_BUFFER_SIZE</c> messages into a thread-local array (via
/// <c>ReceiveWholeMessageFromStream</c> and <c>DeserializeMessage</c>), then
/// acquires <c>producerConsumerLock</c> once to move the whole batch onto
/// <c>messageQueue</c> and pulse the lock.
/// </summary>
/// <remarks>
/// <c>queueRunning</c> is only re-checked between batches, not while a batch
/// is being filled.
/// </remarks>
private static void ProducerThreadEntry() {
    IMessage[] batch = new IMessage[PRODUCER_BUFFER_SIZE];

    while (queueRunning) {
        // Build the batch in private storage; the shared lock is not held here.
        for (int filled = 0; filled < PRODUCER_BUFFER_SIZE; ++filled) {
            byte[] payload = ReceiveWholeMessageFromStream();
            batch[filled] = DeserializeMessage(payload);
        }

        // One lock acquisition per batch rather than one per message.
        lock (producerConsumerLock) {
            foreach (IMessage pending in batch) {
                messageQueue.Enqueue(pending);
            }

            // Signal a thread waiting on the lock that the queue has grown.
            Monitor.Pulse(producerConsumerLock);
        }
    }
}
/// <summary>
/// Consumer thread body. While <c>queueRunning</c> is true, each pass waits
/// under <c>producerConsumerLock</c> until <c>messageQueue</c> holds at least
/// <c>CONSUMER_BUFFER_SIZE</c> messages, dequeues that many into a
/// thread-local array, releases the lock, and then passes each one to
/// <c>DispatchMessage</c>.
/// </summary>
/// <remarks>
/// Returns when <c>queueRunning</c> is observed to be false, either at the top
/// of the outer loop or just before the thread would wait for more messages.
/// </remarks>
private static void ConsumerThreadEntry() {
    IMessage[] batch = new IMessage[CONSUMER_BUFFER_SIZE];

    while (queueRunning) {
        lock (producerConsumerLock) {
            // Re-test the count after every wake-up: a pulse does not by
            // itself guarantee that a full batch is available.
            while (messageQueue.Count < CONSUMER_BUFFER_SIZE) {
                if (!queueRunning) {
                    return;
                }

                // Monitor.Wait releases the lock while blocked and
                // re-acquires it before returning.
                Monitor.Wait(producerConsumerLock);
            }

            // Take a full batch out of the shared queue while holding the lock.
            for (int slot = 0; slot < CONSUMER_BUFFER_SIZE; ++slot) {
                batch[slot] = messageQueue.Dequeue();
            }
        }

        // Dispatch after the lock is released so handlers do not hold it.
        foreach (IMessage received in batch) {
            DispatchMessage(received);
        }
    }
}
Code snippet taken from "Common Multithreading Mistakes in C# - II: Unnecessary Contention".