
Notch Up Producer-Consumer Paradigm with Python


Last time we discussed asynchronous programming using processes and threads. Reading that article first is highly recommended, since this one builds on the concepts introduced there. Today we briefly discuss when to use which, and then implement the Producer-Consumer paradigm.

Processes vs. Threads

Processes are suitable for CPU-bound applications, such as crunching huge numbers or processing large data sets. These tasks are best spread across processes, and therefore across CPUs.

Threads are suitable for scenarios with time-consuming I/O operations, e.g. file I/O, HTTP calls or socket connections. The thread making such an I/O call waits for the response while other threads keep running.
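To make the I/O-bound case concrete, here is a minimal sketch. It assumes that time.sleep is a fair stand-in for a blocking I/O call; fake_io_call and run_in_threads are hypothetical helper names, not part of any library. Four calls that each wait 0.2 seconds should finish in roughly the time of one call, because the threads wait in parallel.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_io_call(seconds: float) -> float:
    # time.sleep stands in for a blocking I/O call (assumed workload).
    time.sleep(seconds)
    return seconds


def run_in_threads(delays):
    # One worker thread per call, so all the waits overlap.
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=len(delays)) as pool:
        results = list(pool.map(fake_io_call, delays))
    return results, time.perf_counter() - start


results, elapsed = run_in_threads([0.2, 0.2, 0.2, 0.2])
print(f"4 calls of 0.2s each finished in {elapsed:.2f}s")
```

Run sequentially, the same four calls would take at least 0.8 seconds. For CPU-bound work the picture changes, and spreading the work across processes is usually the better fit.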

Producer-Consumer Pattern

The Producer-Consumer design pattern is widely used today in an array of applications. Most integration interfaces use this approach. Message Queues (MQs) use this paradigm extensively and extend it into the publisher/subscriber, P2P and push/pull patterns.

The basic idea behind this pattern is to distribute a collection of tasks to multiple threads running asynchronously. Producers and consumers can also be chained, with intermediary nodes acting as both: they consume from one queue and produce into the next.

Take Android notifications as an example: servers push (produce) notifications to FCM, which consumes them and pushes them on to individual devices.

Another real-life example is the postal service: people produce mail to the postal service, which consumes it and in turn produces it to the recipients, who act as consumers.

Queue

We need a queue when dealing with producers and consumers. Producers need a place to push messages to, and consumers need a place to read them from. Queues are the usual choice because we typically want to process messages in FIFO order. Other data structures such as lists and arrays can also work, but unlike Python's queue.Queue they do not block while empty, so consumers have to handle the waiting themselves. It is very important that the producers and consumers share the same Queue object.
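As a small illustration of sharing one Queue object, the sketch below passes the same queue to one producer thread and one consumer thread. The function names produce and consume are hypothetical. With a single producer and a single consumer, the messages come out in the order they went in.

```python
from queue import Queue
from threading import Thread

shared_queue = Queue()  # the single Queue object both threads share
received = []


def produce(queue: Queue, items) -> None:
    for item in items:
        queue.put(item)


def consume(queue: Queue, count: int, sink: list) -> None:
    for _ in range(count):
        # get() blocks until a message is available.
        sink.append(queue.get())


producer = Thread(target=produce, args=(shared_queue, [1, 2, 3]))
consumer = Thread(target=consume, args=(shared_queue, 3, received))
consumer.start()  # starting the consumer first is fine: it simply waits
producer.start()
producer.join()
consumer.join()
print(received)  # [1, 2, 3]
```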

Producers

Producers are objects that run asynchronously and add messages to a queue. Once pushed, a message becomes available to be consumed by a consumer. Producers do not care whether the messages are actually consumed; their job is limited to pushing messages. Multiple producers can push messages to the same queue.

Consumers

Consumers keep polling the queue, waiting for messages to arrive. They start processing messages as soon as they become available. Multiple consumers can poll the same queue.
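The sketch below shows two producers and three consumers sharing one queue. It is only an illustration under a few assumptions: the consumers are daemon threads so the program can exit while they are still blocked on get(), and the names tasks, processed, producer and consumer are made up for this example. The main thread waits with Queue.join(), which returns once every message put on the queue has been marked done.

```python
from queue import Queue
from threading import Lock, Thread

tasks = Queue()
processed = []
processed_lock = Lock()


def producer(name: str, count: int) -> None:
    # Push messages and move on; the producer does not wait for consumers.
    for i in range(count):
        tasks.put(f"{name}-{i}")


def consumer() -> None:
    while True:
        message = tasks.get()  # blocks until a message arrives
        with processed_lock:
            processed.append(message)
        tasks.task_done()  # tell the queue this message is handled


producers = [Thread(target=producer, args=(f"p{n}", 3)) for n in range(2)]
consumers = [Thread(target=consumer, daemon=True) for _ in range(3)]
for thread in consumers + producers:
    thread.start()
for thread in producers:
    thread.join()
tasks.join()  # returns once every put() has a matching task_done()
print(sorted(processed))
```

Which consumer handles which message is not deterministic, so the output is sorted before printing.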

Poison (Optional)

A poison (often called a poison pill) is a special message that tells a running thread when to return. When a producer or consumer receives a poison, it is a signal to stop. This ensures there are no hanging threads at the end of processing.
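Here is a minimal sketch of the poison idea, assuming one poison per worker thread. The sentinel is a plain object() compared by identity, so it can never be confused with a real message; POISON, worker and work_queue are names chosen for this example only.

```python
from queue import Queue
from threading import Thread

POISON = object()  # unique sentinel that no real message can be identical to


def worker(queue: Queue, results: list) -> None:
    while True:
        message = queue.get()
        if message is POISON:
            break  # stop signal: leave the loop so the thread returns
        results.append(message * 2)


work_queue = Queue()
results = []
workers = [Thread(target=worker, args=(work_queue, results)) for _ in range(3)]
for w in workers:
    w.start()
for n in range(5):
    work_queue.put(n)
for _ in workers:
    work_queue.put(POISON)  # one poison per worker, queued after the real work
for w in workers:
    w.join()
print(sorted(results))  # [0, 2, 4, 6, 8]
```

Because the queue is FIFO and the poisons are added last, every real message is taken off the queue before any worker sees a poison.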

Do It Yourself:

There are multiple examples of single producers and consumers. Let us take it a notch up.

Assumptions:

Some tasks (viz. initial_task, final_task) can be done relatively quickly, while others (viz. intermediary_task) take a lot of time to process.

Design:

There is a single thread for running initial_task and another for final_task, while three threads are dedicated to intermediary_task. As soon as initial_task is done for a message, the message is picked up by whichever intermediary thread is available, which then performs intermediary_task. Once that is completed, the result is produced into the final queue, from which it is consumed to perform final_task.

import time 
from queue import Queue 
from threading import Thread 

Poison = -1 


class TimeConsumingNode(Thread): 
  def __init__(self, consumer_queue: Queue = None, 
               producer_queue: Queue = None, task_time: int = 0): 
      super(TimeConsumingNode, self).__init__() 
      self.consumer_queue = consumer_queue 
      self.producer_queue = producer_queue 
      self.task_time = task_time 

  def run(self) -> None: 
     global Poison 
     while True: 
        consumed_message = self.consumer_queue.get() 
        print(self, f"consumed {consumed_message}") 
        if consumed_message == Poison: 
           print(self, "got poison") 
           # Put the poison back so the other intermediary nodes see it too.
           self.consumer_queue.put(consumed_message) 
           # Pass the poison to the final consumer. 
           self.producer_queue.put(consumed_message) 
           break 
        # Mock processing the consumed message. 
        time.sleep(self.task_time) 
        # Mock some heavy tasks. 
        time.sleep(self.task_time) 
        print(self, 
              f"took at least {self.task_time * 2} for consuming " 
              f"{consumed_message} and for producing {consumed_message}") 
        self.producer_queue.put(consumed_message) 


# Comparatively faster task node. 
class ProducerNode(Thread): 
  def __init__(self, consumed_queue: Queue = None, 
               produced_queue: Queue = None, 
               task_time: int = 0): 
      super(ProducerNode, self).__init__() 
      self.consumed_queue = consumed_queue 
      self.produced_queue = produced_queue 
      self.task_time = task_time 

  def run(self) -> None: 
      global Poison 
      while not self.consumed_queue.empty(): 
         some_number = self.consumed_queue.get() 
         time.sleep(self.task_time) 
         print(self, f"Producing {some_number}") 
         self.produced_queue.put(some_number) 

      # Add poison. 
      print(self, "Adding poison")
      self.produced_queue.put(Poison) 


# Last task again comparatively faster one. 
class ConsumerNode(Thread): 
  def __init__(self, consumed_queue: Queue = None, task_time: int = 0): 
     super(ConsumerNode, self).__init__() 
     self.consumed_queue = consumed_queue 
     self.task_time = task_time 
     self.poisons = 0 

  def run(self) -> None: 
     global Poison 
     while True: 
        consumed_message = self.consumed_queue.get() 
        if consumed_message == Poison: 
           self.poisons = self.poisons + 1 
           print(self, f"got {self.poisons} poisons") 
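           # One poison arrives from each of the three intermediary nodes.
           if self.poisons == 3:
              break
           # A poison is a stop signal, not a message to process.
           continue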
        # Mock some time taking task 
        time.sleep(self.task_time / 2) 
        print(self,
              f"took {self.task_time / 2} to consume {consumed_message}")


if __name__ == "__main__": 
   my_queue = Queue() 
   my_queue.put(2) 
   my_queue.put(3) 
   my_queue.put(4) 
   my_queue.put(5) 
   my_queue.put(6) 

   intermediary_queue = Queue() 
   final_queue = Queue() 
   first_prod_node = ProducerNode(consumed_queue=my_queue,
                                  produced_queue=intermediary_queue,
                                  task_time=3)

   tc_node_1 = TimeConsumingNode(consumer_queue=intermediary_queue,
                                 producer_queue=final_queue, task_time=4)
   tc_node_2 = TimeConsumingNode(consumer_queue=intermediary_queue,
                                 producer_queue=final_queue, task_time=5)
   tc_node_3 = TimeConsumingNode(consumer_queue=intermediary_queue,
                                 producer_queue=final_queue, task_time=2)

   final_node = ConsumerNode(consumed_queue=final_queue, task_time=1) 

   first_prod_node.start() 
   tc_node_1.start() 
   tc_node_2.start() 
   tc_node_3.start() 
   final_node.start() 

   first_prod_node.join() 
   tc_node_1.join() 
   tc_node_2.join() 
   tc_node_3.join() 
   final_node.join()

Output:

Risks:

If not implemented properly, producers and consumers may lead to:

  1. Hanging threads
  2. Deadlocks
  3. Exceptions while reading empty queues (in Python, queue.Empty from a non-blocking or timed get, or IndexError when popping from an empty list).
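One way to guard against the last risk, and against a consumer hanging forever on an empty queue, is to read with a timeout and treat queue.Empty as the signal to stop. This is only a sketch of that idea; drain and inbox are hypothetical names, and the 0.1 second timeout is an arbitrary choice.

```python
from queue import Empty, Queue

inbox = Queue()
inbox.put("only message")


def drain(queue: Queue, timeout: float = 0.1) -> list:
    # Read until the queue stays empty for `timeout` seconds.
    drained = []
    while True:
        try:
            drained.append(queue.get(timeout=timeout))
        except Empty:
            # get() raises Empty when nothing arrives in time.
            return drained


messages = drain(inbox)
print(messages)  # ['only message']
```

A timeout trades certainty for safety: a slow producer can be mistaken for a finished one, which is why an explicit poison is often the more reliable stop signal.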

Conclusion

The Producer-Consumer pattern is a very useful design that enables asynchronous processing of multiple time-consuming tasks. The concept has been widely incorporated in modern-day message queues, viz. Kafka, RabbitMQ, and the cloud MQs provided by AWS, GCP, etc. They are powerful and risky! Use them wisely!
