Source code for easycore.torch.parallel.engine

import torch.multiprocessing as mp
import threading
import queue
import atexit
import bisect
from typing import Callable, Iterable, Any
from easycore.common.config import CfgNode as CN

[docs]class BaseRunner: """ A Multi-process runner whose consumer receive data in unorder. The runner will start multi-processes for producers and 1 thread for consumer. """ class _Producer(mp.Process): def __init__(self, input_queue, output_queue, device, cfg, init_func, work_func, end_func): super(BaseRunner._Producer, self).__init__() self.input_queue = input_queue self.output_queue = output_queue self.device = device self.cfg = cfg.copy() self.init_func = init_func self.work_func = work_func self.end_func = end_func def run(self): raise NotImplementedError("This is a base runner without implement.") class _StopToken: pass class _Consumer(threading.Thread): def __init__(self, receive_func, input_queue, output_queue, cfg, init_func, work_func, end_func): super(BaseRunner._Consumer, self).__init__(daemon=True) self.receive_func = receive_func self.input_queue = input_queue self.output_queue = output_queue self.cfg = cfg.copy() self.init_func = init_func self.work_func = work_func self.end_func = end_func def run(self): raise NotImplementedError("This is a base runner without implement.") class _InitToken: pass class _EndToken: pass class _StopToken: pass
[docs] def __init__(self, devices, cfg = CN(), queue_scale = 3.0): """ Args: devices (int or Iterable): If the `devices` is `int`, it will use devices cpu to do the work. If the `devices` is an iterable object, such as list, it will use the devices specified by the iterable object, such as ["cpu", "cuda:0", "cuda:1"]. cfg (easycore.common.config.CfgNode): user custom data. queue_scale (float): scale the queues for communication between processes. """ # get devices if isinstance(devices, int): self.devices = ["cpu" for _ in range(devices)] elif isinstance(devices, Iterable): self.devices = devices else: raise Exception("parameter `devices` must be int or Iterable.") self.cfg = cfg self.queue_scale = queue_scale self._is_activate = False self.activate() atexit.register(self.close)
@property def is_activate(self): """ whether the runner is alive. """ return self._is_activate
[docs] @staticmethod def producer_init(device, cfg): """ function for producer initialization. Args: device (str): device for the this process. cfg (easycore.common.config.CfgNode): config of this process, you can use it to transfer data to `producer_work` and `producer_end` function. """ pass
[docs] @staticmethod def producer_work(device, cfg, data): """ function specify how the producer processes the data. Args: device (str): device for this process. cfg (easycore.common.config.CfgNode): config of this process, you can use it to get data from `producer_init` function and transfer data to the next `producer_work` and `producer_end` function. data (Any): data get from input of `__call__` method. Returns: Any: processed data """ return data
[docs] @staticmethod def producer_end(device, cfg): """ function after finishing all of its task and before close the process. Args: device (str): device for this process. cfg (easycore.common.config.CfgNode): config of this process, you can use it to get data from `producer_init` and `producer_work` function. """ pass
[docs] @staticmethod def consumer_init(cfg): """ function for consumer initialization. Args: cfg (easycore.common.config.CfgNode): config of this process, you can use it to transfer data to `consumer_work` and `consumer_end` function. """ pass
[docs] @staticmethod def consumer_work(cfg, data): """ function specify how the consumer processses the data from producers. Args: cfg (easycore.common.config.CfgNode): config of this process, you can use it to get data from `consumer_init` function and transfer data to the next `consumer_work` and `consumer_end` function. """ pass
[docs] @staticmethod def consumer_end(cfg): """ function after receiving all data from producers. Args: cfg (easycore.common.config.CfgNode): config of this process, you can use it get data from `consumer_work` function. Returns: Any: processed data """ return None
[docs] def __call__(self, data_iter): """ Args: data_iter (Iterable): iterator of data Returns: Any: result """ if not self.is_activate: raise Exception("The runner is closed. Please activate it.") # inform the consumer to initialize self._put_into_consumer(self._Consumer._InitToken()) # put data to producer for data in data_iter: self._put_into_producer(data) self._put_into_consumer(None) # inform the consumer to process 1 data # inform the consumer to return result self._put_into_consumer(self._Consumer._EndToken()) # get result from consumer data = self._get_from_consumer() return data
def __del__(self): self.close()
[docs] def close(self): """ Shutdown all processes if this runner is alive. """ if self.is_activate: self._is_activate = False # stop workers for _ in self.devices: self.producer_input_queue.put(self._Producer._StopToken()) self.consumer_input_queue.put(self._Consumer._StopToken()) # join workers for producer in self.producers: producer.join() self.consumer.join() # delete resources del self.producer_input_queue del self.producer_output_queue del self.consumer_input_queue del self.consumer_output_queue del self.producers del self.consumer
[docs] def activate(self): """ Restart all processes if this runner is closed. """ if not self.is_activate: self._is_activate = True # init queues for communication between processes self.producer_input_queue = mp.Queue(maxsize = int(len(self.devices) * self.queue_scale)) self.producer_output_queue = mp.Queue(maxsize = int(len(self.devices) * self.queue_scale)) self.consumer_input_queue = queue.Queue(maxsize = int(len(self.devices) * self.queue_scale)) self.consumer_output_queue = queue.Queue(maxsize = 1) # create workers self.producers = [] for device in self.devices: self.producers.append( self._Producer( self.producer_input_queue, self.producer_output_queue, device, self.cfg, self.producer_init, self.producer_work, self.producer_end)) self.consumer = self._Consumer( self._get_from_producer, self.consumer_input_queue, self.consumer_output_queue, self.cfg, self.consumer_init, self.consumer_work, self.consumer_end) # start workers for producer in self.producers: producer.start() self.consumer.start()
def _put_into_producer(self, data): self.producer_input_queue.put(data) def _get_from_producer(self): return self.producer_output_queue.get() def _put_into_consumer(self, data): self.consumer_input_queue.put(data) def _get_from_consumer(self): data = self.consumer_output_queue.get() self.consumer_output_queue.task_done() return data
[docs]class UnorderedRunner(BaseRunner): """ A Multi-process runner whose consumer receive data in unorder. The runner will start multi-processes for producers and 1 thread for consumer. """ class _Producer(BaseRunner._Producer): def run(self): # initialization self.init_func(self.device, self.cfg) while True: data = self.input_queue.get() if isinstance(data, self._StopToken): break # decode data and do task data = self.work_func(self.device, self.cfg, data) self.output_queue.put(data) # end self.end_func(self.device, self.cfg) class _Consumer(BaseRunner._Consumer): def run(self): while True: data = self.input_queue.get() self.input_queue.task_done() if isinstance(data, self._StopToken): break elif isinstance(data, self._InitToken): # initialization cfg = self.cfg.copy() self.init_func(cfg) elif isinstance(data, self._EndToken): # end data = self.end_func(cfg) self.output_queue.put(data) del cfg else: # work data = self.receive_func() self.work_func(cfg, data)
[docs] def __init__(self, devices, cfg = CN(), queue_scale = 3.0): """ Args: devices (int or Iterable): If the `devices` is `int`, it will use devices cpu to do the work. If the `devices` is an iterable object, such as list, it will use the devices specified by the iterable object, such as ["cpu", "cuda:0", "cuda:1"]. cfg (easycore.common.config.CfgNode): user custom data. queue_scale (float): scale the queues for communication between processes. """ super(UnorderedRunner, self).__init__(devices, cfg=cfg, queue_scale=queue_scale)
[docs]class OrderedRunner(BaseRunner): """ A Multi-process runner whose consumer receive data in order. The runner will start multi-processes for producers and 1 thread for consumer. """ class _Producer(BaseRunner._Producer): def run(self): # initialization self.init_func(self.device, self.cfg) while True: data = self.input_queue.get() if isinstance(data, self._StopToken): break # decode data and do task id, data = data data = self.work_func(self.device, self.cfg, data) self.output_queue.put((id, data)) # end self.end_func(self.device, self.cfg) class _Consumer(BaseRunner._Consumer): def run(self): while True: data = self.input_queue.get() self.input_queue.task_done() if isinstance(data, self._StopToken): break elif isinstance(data, self._InitToken): # initialization cfg = self.cfg.copy() self.init_func(cfg) elif isinstance(data, self._EndToken): # end data = self.end_func(cfg) self.output_queue.put(data) del cfg else: data = self.receive_func() self.work_func(cfg, data)
[docs] def __init__(self, devices, cfg = CN(), queue_scale = 3.0): """ Args: devices (int or Iterable): If the `devices` is `int`, it will use devices cpu to do the work. If the `devices` is an iterable object, such as list, it will use the devices specified by the iterable object, such as ["cpu", "cuda:0", "cuda:1"]. cfg (easycore.common.config.CfgNode): user custom data. queue_scale (float): scale the queues for communication between processes. """ super(OrderedRunner, self).__init__(devices, cfg=cfg, queue_scale=queue_scale)
[docs] def close(self): """ Shutdown all processes if this runner is alive. """ if self.is_activate: del self._put_id, self._get_id del self._id_buffer, self._data_buffer super(OrderedRunner, self).close()
[docs] def activate(self): """ Restart all processes if this runner is closed. """ if not self.is_activate: self._put_id = 0 self._get_id = 0 self._id_buffer = [] self._data_buffer = [] super(OrderedRunner, self).activate()
def _put_into_producer(self, data): id = self._put_id self._put_id += 1 self.producer_input_queue.put((id, data)) def _get_from_producer(self): if len(self._id_buffer) and self._id_buffer[0] == self._get_id: data = self._data_buffer[0] del self._id_buffer[0], self._data_buffer[0] self._get_id += 1 return data while True: id, data = self.producer_output_queue.get() if id == self._get_id: self._get_id += 1 return data insert_position = bisect.bisect(self._id_buffer, id) self._id_buffer.insert(insert_position, id) self._data_buffer.insert(insert_position, data)