Multiprocessing parallel acceleration tools¶
easycore makes it easy to parallelize your tasks across CPUs and GPUs.
API¶
You can write a parallel runner by inheriting from the class `UnorderedRunner` or `OrderedRunner`
and overriding the following 6 static methods.
@staticmethod
def producer_init(device, cfg):
    """
    Initialize a producer process.

    Args:
        device (str): device for this process.
        cfg (easycore.common.config.CfgNode): config of this process; you can use it to
            transfer data to the `producer_work` and `producer_end` functions.
    """
    pass
@staticmethod
def producer_work(device, cfg, data):
    """
    Specify how the producer processes one piece of data.

    Args:
        device (str): device for this process.
        cfg (easycore.common.config.CfgNode): config of this process; you can use it to get
            data from the `producer_init` function and to transfer data to the next
            `producer_work` call and to the `producer_end` function.
        data (Any): data taken from the input of the `__call__` method.

    Returns:
        Any: processed data
    """
    # The default implementation passes the data through unchanged.
    return data
@staticmethod
def producer_end(device, cfg):
    """
    Called after the producer has finished all of its tasks and before its process is closed.

    Args:
        device (str): device for this process.
        cfg (easycore.common.config.CfgNode): config of this process; you can use it to get
            data from the `producer_init` and `producer_work` functions.
    """
    pass
@staticmethod
def consumer_init(cfg):
    """
    Initialize the consumer.

    Args:
        cfg (easycore.common.config.CfgNode): config of this process; you can use it to
            transfer data to the `consumer_work` and `consumer_end` functions.
    """
    pass
@staticmethod
def consumer_work(cfg, data):
    """
    Specify how the consumer processes one piece of data coming from the producers.

    Args:
        cfg (easycore.common.config.CfgNode): config of this process; you can use it to get
            data from the `consumer_init` function and to transfer data to the next
            `consumer_work` call and to the `consumer_end` function.
        data (Any): data received from the producers.
    """
    pass
@staticmethod
def consumer_end(cfg):
    """
    Called after all data from the producers has been received.

    Args:
        cfg (easycore.common.config.CfgNode): config of this process; you can use it to get
            data from the `consumer_work` function.

    Returns:
        Any: processed data
    """
    # The default implementation has no result to report.
    return None
Example 1: Sum of squares¶
It can be implemented in a simple way:
data_list = list(range(100))
# A generator expression avoids building a throwaway list just to sum it.
result = sum(data * data for data in data_list)
# or, spelled out as an explicit loop
result = 0
for data in data_list:
    square = data * data
    result += square
We calculate the square of each element of the list, and then sum them together. In this case, the work can be divided into two tasks. We assign these two tasks to the producer and the consumer respectively.
from easycore.common.config import CfgNode
from easycore.common.parallel import UnorderedRunner
class Runner(UnorderedRunner):
    """Sum of squares: producers square each item and the consumer adds the squares up."""

    @staticmethod
    def producer_work(device, cfg, data):
        """Return the square of one input item."""
        return data * data  # calculate square of data

    @staticmethod
    def consumer_init(cfg):
        """Start the running total at zero."""
        cfg.sum = 0  # init a sum variable with 0, you can use cfg to transfer data

    @staticmethod
    def consumer_work(cfg, data):
        """Add one square coming from a producer to the running total."""
        cfg.sum += data  # add the square to the sum variable

    @staticmethod
    def consumer_end(cfg):
        """Return the accumulated total."""
        return cfg.sum  # return the result you need
if __name__ == '__main__':
    # If you specify `devices` with an integer, it will use cpus.
    # You can specify a list of str instead, such as:
    # runner = Runner(devices=["cpu", "cpu", "cpu"])
    runner = Runner(devices=3)
    try:
        data_list = list(range(100))  # prepare data, it must be iterable
        result = runner(data_list)  # call the runner
        print(result)
    finally:
        # Close the runner and shut down all processes it opened, even if the run failed.
        runner.close()
Example 2: A neural network predictor¶
First we define a neural network in `network.py`:
import torch
import torch.nn as nn
import torch.nn.functional as F
class Net(nn.Module):
    """Minimal network: one linear layer (1 input feature, 3 outputs) followed by ReLU."""

    def __init__(self):
        super().__init__()
        self.fc = nn.Linear(1, 3)

    def forward(self, x):
        """Apply the linear layer and then ReLU to `x`, whose last dimension must be 1."""
        return F.relu(self.fc(x))
The network can be parallelized across 4 GPUs in the following way:
from easycore.common.config import CfgNode
from easycore.common.parallel import OrderedRunner
from network import Net
import torch
class Predictor(OrderedRunner):
    """Run `Net` on one model per device and concatenate the outputs in the consumer."""

    @staticmethod
    def producer_init(device, cfg):
        """Create the model for this producer and move it to the producer's device."""
        cfg.model = Net()  # init the producer with a model
        cfg.model.to(device)  # transfer the model to certain device

    @staticmethod
    def producer_work(device, cfg, data):
        """Predict on a single scalar input and return the output as a cpu tensor."""
        with torch.no_grad():
            data = torch.Tensor([[data]])  # preprocess data into a 1x1 tensor
            data = data.to(device)  # transfer data to certain device
            output = cfg.model(data)  # predict
            output = output.cpu()  # transfer result to cpu
        return output

    @staticmethod
    def producer_end(device, cfg):
        """Drop the model once this producer has no more data to predict."""
        del cfg.model  # delete the model when all data has been predicted.

    @staticmethod
    def consumer_init(cfg):
        """Prepare the list that collects the producers' outputs."""
        cfg.data_list = []  # prepare a list to store all data from producers.

    @staticmethod
    def consumer_work(cfg, data):
        """Store one output coming from a producer."""
        cfg.data_list.append(data)  # store data from producers.

    @staticmethod
    def consumer_end(cfg):
        """Concatenate all collected outputs along dimension 0 and return the result."""
        data = torch.cat(cfg.data_list, dim=0)  # postprocess data.
        return data
if __name__ == '__main__':
    predictor = Predictor(devices=["cuda:0", "cuda:1", "cuda:2", "cuda:3"])  # init a parallel predictor
    try:
        data_list = list(range(100))  # prepare data
        result = predictor(data_list)  # predict
        print(result.shape)
    finally:
        # Close the predictor when you no longer need it, even if prediction failed.
        predictor.close()
Example 3: Process data with batch¶
You can use a simple generator or a PyTorch DataLoader to generate batches of data.
from easycore.common.config import CfgNode
from easycore.torch.parallel import OrderedRunner
from network import Net
import torch
def batch_generator(data_list, batch_size):
    """
    Yield consecutive slices of `data_list`, each holding at most `batch_size` items.

    Args:
        data_list (Sequence): data to split; it must support `len()` and slicing.
        batch_size (int): maximum number of items per batch; must be positive.

    Yields:
        Sequence: the next batch. The last batch may be shorter than `batch_size`.

    Raises:
        ValueError: if `batch_size` is less than 1.
    """
    if batch_size < 1:
        # A zero step makes `range` raise, and a negative one would silently yield nothing.
        raise ValueError("batch_size must be a positive integer")
    for start in range(0, len(data_list), batch_size):
        yield data_list[start:start + batch_size]
class Predictor(OrderedRunner):
    """Batched variant: every producer call predicts on a whole batch of inputs."""

    @staticmethod
    def producer_init(device, cfg):
        """Create the model for this producer and move it to the producer's device."""
        cfg.model = Net()
        cfg.model.to(device)

    @staticmethod
    def producer_work(device, cfg, data):
        """Predict on one batch and return the output as a cpu tensor."""
        with torch.no_grad():
            # Reshape the batch into a column so each item is one row with one feature.
            data = torch.Tensor(data).view(-1, 1)
            data = data.to(device)
            output = cfg.model(data)
            output = output.cpu()
        return output

    @staticmethod
    def producer_end(device, cfg):
        """Drop the model once this producer has no more data to predict."""
        del cfg.model

    @staticmethod
    def consumer_init(cfg):
        """Prepare the list that collects the producers' outputs."""
        cfg.data_list = []

    @staticmethod
    def consumer_work(cfg, data):
        """Store one batch of outputs coming from a producer."""
        cfg.data_list.append(data)

    @staticmethod
    def consumer_end(cfg):
        """Concatenate all collected batches along dimension 0 and return the result."""
        data = torch.cat(cfg.data_list, dim=0)
        return data
if __name__ == '__main__':
    # The class defined in this example is `Predictor`; the original call misspelled it.
    predictor = Predictor(devices=["cuda:0", "cuda:1"])
    try:
        data_list = list(range(100))
        # Feed the runner batches of 10 items instead of single items.
        result = predictor(batch_generator(data_list, batch_size=10))
        print(result.shape)
    finally:
        # Close the predictor and its processes even if prediction failed.
        predictor.close()
Here, we replace `easycore.common.parallel` with `easycore.torch.parallel`.
`easycore.torch.parallel` has the same API as `easycore.common.parallel`
but uses the `torch.multiprocessing` library instead of the `multiprocessing` library.
Example 4: Transfer outside parameters into Runner¶
You can pass parameters into the runner through the `cfg` parameter. `cfg` is an `easycore.common.config.CfgNode`.
See the tutorial “Light weight config tools” for how to use it.
We use “sum of powers” as an example:
from easycore.common.config import CfgNode as CN
from easycore.common.parallel import UnorderedRunner
class Runner(UnorderedRunner):
    """Sum of powers: producers raise each item to `cfg.exponent`, the consumer adds them up."""

    @staticmethod
    def producer_work(device, cfg, data):
        """Raise one input item to the power given by `cfg.exponent`."""
        return data ** cfg.exponent  # calculate power of data with outside parameter "exponent".

    @staticmethod
    def consumer_init(cfg):
        """Start the running total at zero."""
        cfg.sum = 0  # init a sum variable with 0, you can use cfg to transfer data

    @staticmethod
    def consumer_work(cfg, data):
        """Add one power coming from a producer to the running total."""
        cfg.sum += data  # add the power to the sum variable

    @staticmethod
    def consumer_end(cfg):
        """Return the accumulated total."""
        return cfg.sum  # return the result you need
if __name__ == '__main__':
    # set parameters outside.
    cfg = CN()
    cfg.exponent = 3
    runner = Runner(devices=3, cfg=cfg)  # transfer `cfg` into the runner
    try:
        data_list = list(range(100))
        result = runner(data_list)
        print(result)
    finally:
        # Close the runner and its processes even if the run failed.
        runner.close()