-
Notifications
You must be signed in to change notification settings - Fork 639
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
new examples : a parallel pipeline for image processing(resize) #1755
base: main
Are you sure you want to change the base?
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
from sys import stdout | ||
|
||
from loguru import logger | ||
|
||
log_format = [ | ||
'<W><k>{time: YYYY-MM-DD hh:mm:ss}</k></W>', | ||
'<c>{file:^15}</c>', | ||
'<w>{function:^25}</w>', | ||
'<e>{line:03d}</e>', | ||
'<r>{level:^10}</r>', | ||
'<W><k>{message:<50}</k></W>', | ||
] | ||
|
||
log_separator = ' | ' | ||
|
||
logger.remove() | ||
logger.add(sink=stdout, level='TRACE', format=log_separator.join(log_format)) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,212 @@ | ||
import multiprocessing as mp | ||
import pickle | ||
import time | ||
from glob import glob | ||
from os import path | ||
from typing import List, Tuple | ||
|
||
import click | ||
import cv2 | ||
from loguru import logger | ||
from worker import process_images | ||
|
||
import zmq | ||
|
||
""" | ||
ZMQ client-server for parallel image processing(resize). | ||
The server(router) create n workers(dealer): | ||
# each worker can ask a job to the server | ||
# a job is an image to resize and a path where the resized_image will be saved | ||
# once, the image was resied, the worker can ask a new job | ||
# server will keep sending job to workers until there is no images left or user hit ctl+c | ||
NOTE: opencv, numpy, click, loguru must be installed | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Probably makes sense to add a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hello, as you suggest, i've added the README.md and a requirements.txt(click, pyzmq, pillow). |
||
""" | ||
|
||
""" | ||
This program has two mode : | ||
# sequential | ||
python main.py sequential-processing --path2initial_images /path/to/source/images --path2resized_images /path/to/target/images --image_extension '*.jpg' --size 512 512 | ||
# parallel | ||
python main.py parallel-processing --path2initial_images /path/to/source/images --path2resized_images /path/to/target/images --image_extension '*.jpg' --nb_workers 8 --size 512 512 | ||
parallel mode can be 10x times faster than sequential mode | ||
""" | ||
|
||
|
||
@click.group(chain=False, invoke_without_command=True) | ||
@click.pass_context | ||
def cmd_group(clk: click.Context): | ||
subcommand = clk.invoked_subcommand | ||
if subcommand is not None: | ||
logger.debug(f'{subcommand} was called') | ||
else: | ||
logger.debug('use --help to see availables subcommands') | ||
|
||
|
||
@cmd_group.command() | ||
@click.option( | ||
'--path2initial_images', help='initial images location', type=click.Path(True) | ||
) | ||
@click.option( | ||
'--path2resized_images', help='resized images location', type=click.Path(True) | ||
) | ||
@click.option( | ||
'--image_extension', help='image file extension', default='*.jpg', type=str | ||
) | ||
@click.option( | ||
'--size', help='new image size', type=click.Tuple([int, int]), default=(512, 512) | ||
) | ||
@click.pass_context | ||
def sequential_processing( | ||
clk: click.Context, | ||
path2initial_images: click.Path(True), | ||
path2resized_images: click.Path(True), | ||
image_extension: str, | ||
size: Tuple[int, int], | ||
): | ||
image_filepaths: List[str] = sorted( | ||
glob(path.join(path2initial_images, image_extension)) | ||
) | ||
nb_images = len(image_filepaths) | ||
logger.debug(f'{nb_images:05d} were found at {path2initial_images}') | ||
start = time.time() | ||
for cursor, path2source_image in enumerate(image_filepaths): | ||
bgr_image = cv2.imread(path2source_image) | ||
resized_image = cv2.resize(bgr_image, dsize=size) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If we're just resizing images, pillow is a far lighter dependency than a full computer vision library like opencv. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This new version use pillow for image resizing, hope this example will help the community to have a better understanding on how to combine zeromq and python-multiprocessing for fast image processing tasks. |
||
_, filename = path.split(path2source_image) | ||
path2target_image = path.join(path2resized_images, filename) | ||
cv2.imwrite(path2target_image, resized_image) | ||
print(resized_image.shape, f'{cursor:04d} images') | ||
|
||
end = time.time() | ||
duration = int(round(end - start)) | ||
logger.success( | ||
f'server has processed {cursor:04d}/{nb_images} images in {duration:03d}s' | ||
) | ||
|
||
|
||
@cmd_group.command() | ||
@click.option( | ||
'--path2initial_images', help='initial images location', type=click.Path(True) | ||
) | ||
@click.option( | ||
'--path2resized_images', help='resized images location', type=click.Path(True) | ||
) | ||
@click.option( | ||
'--image_extension', help='image file extension', default='*.jpg', type=str | ||
) | ||
@click.option( | ||
'--nb_workers', help='number of workers to process images', default=2, type=int | ||
) | ||
@click.option( | ||
'--size', help='new image size', type=click.Tuple([int, int]), default=(512, 512) | ||
) | ||
@click.pass_context | ||
def parallel_processing( | ||
clk: click.Context, | ||
path2initial_images: click.Path(True), | ||
path2resized_images: click.Path(True), | ||
image_extension: str, | ||
nb_workers: int, | ||
size: Tuple[int, int], | ||
): | ||
ZEROMQ_INIT = 0 | ||
WORKER_INIT = 0 | ||
try: | ||
router_address = 'ipc://router.ipc' | ||
publisher_address = 'ipc://publisher.ipc' | ||
|
||
ctx: zmq.Context = zmq.Context() | ||
router_socket: zmq.Socket = ctx.socket(zmq.ROUTER) | ||
publisher_socket: zmq.Socket = ctx.socket(zmq.PUB) | ||
|
||
router_socket.bind(router_address) | ||
publisher_socket.bind(publisher_address) | ||
ZEROMQ_INIT = 1 | ||
|
||
image_filepaths: List[str] = sorted( | ||
glob(path.join(path2initial_images, image_extension)) | ||
) | ||
nb_images = len(image_filepaths) | ||
logger.debug(f'{nb_images:05d} were found at {path2initial_images}') | ||
|
||
if nb_images == 0: | ||
raise Exception(f'{path2initial_images} is empty') | ||
|
||
workers_acc = [] | ||
server_liveness = mp.Event() | ||
worker_arrival = mp.Value('i', 0) | ||
arrival_condition = mp.Condition() | ||
workers_synchronizer = mp.Barrier(nb_workers) | ||
for worker_id in range(nb_workers): | ||
worker_ = mp.Process( | ||
target=process_images, | ||
kwargs={ | ||
'size': size, | ||
'worker_id': worker_id, | ||
'router_address': router_address, | ||
'publisher_address': publisher_address, | ||
'worker_arrival': worker_arrival, | ||
'server_liveness': server_liveness, | ||
'arrival_condition': arrival_condition, | ||
'workers_synchronizer': workers_synchronizer, | ||
'path2resized_images': path2resized_images, | ||
}, | ||
) | ||
|
||
workers_acc.append(worker_) | ||
workers_acc[-1].start() | ||
|
||
WORKER_INIT = 1 | ||
arrival_condition.acquire() | ||
server_liveness.set() # send signal to worker | ||
arrival_condition.wait_for( | ||
predicate=lambda: worker_arrival.value == nb_workers, timeout=10 | ||
) | ||
|
||
if worker_arrival.value != nb_workers: | ||
logger.error('server wait to long for worker to be ready') | ||
exit(1) | ||
|
||
logger.success('all workers are up and ready to process images') | ||
cursor = 0 | ||
keep_loop = True | ||
start = time.time() | ||
while keep_loop: | ||
socket_id, _, msgtype, message = router_socket.recv_multipart() | ||
if msgtype == b'req': | ||
if cursor < nb_images: | ||
path2source_image = image_filepaths[cursor] | ||
router_socket.send_multipart( | ||
[socket_id, b'', path2source_image.encode()] | ||
) | ||
cursor = cursor + 1 | ||
if msgtype == b'rsp': | ||
content = pickle.loads(message) | ||
if content['status'] == 1: | ||
print(f"{content['worker_id']:03d}", f"{cursor:04d} items") | ||
keep_loop = cursor < nb_images | ||
# end loop over images | ||
end = time.time() | ||
duration = int(round(end - start)) | ||
logger.success( | ||
f'server has processed {cursor:04d}/{nb_images} images in {duration:03d}s' | ||
) | ||
except Exception as e: | ||
logger.error(e) | ||
finally: | ||
if WORKER_INIT: | ||
logger.debug('server is waiting for worker to quit the loop') | ||
publisher_socket.send_multipart([b'quit', b'']) | ||
for worker_ in workers_acc: | ||
worker_.join() | ||
|
||
if ZEROMQ_INIT == 1: | ||
publisher_socket.close() | ||
router_socket.close() | ||
ctx.term() | ||
|
||
logger.success('server has released all zeromq ressources') | ||
|
||
|
||
if __name__ == '__main__': | ||
cmd_group() |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,134 @@ | ||
import multiprocessing as mp | ||
from os import path | ||
from typing import Tuple | ||
|
||
import cv2 | ||
from log import logger | ||
|
||
import zmq | ||
|
||
|
||
def process_images( | ||
size: Tuple[int, int], | ||
worker_id: int, | ||
router_address: str, | ||
publisher_address: str, | ||
path2resized_images: str, | ||
worker_arrival: mp.Value, | ||
arrival_condition: mp.Condition, | ||
server_liveness: mp.Event, | ||
workers_synchronizer: mp.Barrier, | ||
): | ||
|
||
ZEROMQ_INIT = 0 | ||
try: | ||
ctx: zmq.Context = zmq.Context() | ||
dealer_socket: zmq.Socket = ctx.socket(zmq.DEALER) | ||
subscriber_socket: zmq.Socket = ctx.socket(zmq.SUB) | ||
|
||
dealer_socket.connect(router_address) | ||
subscriber_socket.connect(publisher_address) | ||
subscriber_socket.set(zmq.SUBSCRIBE, b'') # subscribe to all topics | ||
|
||
ZEROMQ_INIT = 1 | ||
|
||
poller = zmq.Poller() | ||
poller.register(dealer_socket, zmq.POLLIN) | ||
poller.register(subscriber_socket, zmq.POLLIN) | ||
|
||
liveness_value = server_liveness.wait( | ||
timeout=10 | ||
) # wait atleast 10s for server to be ready | ||
if not liveness_value: | ||
logger.error(f'worker {worker_id:03d} wait too long for server to be ready') | ||
exit(1) | ||
|
||
arrival_condition.acquire() | ||
with worker_arrival.get_lock(): | ||
worker_arrival.value += 1 | ||
logger.debug( | ||
f'worker {worker_id:03d} has established connection with {router_address} and {publisher_address}' | ||
) | ||
arrival_condition.notify_all() | ||
arrival_condition.release() | ||
|
||
logger.debug(f'worker {worker_id:03d} is waiting at the barrier') | ||
workers_synchronizer.wait(timeout=5) # wait at the barrier | ||
|
||
worker_status = 0 # 0 => free | 1 => busy | ||
keep_loop = True | ||
while keep_loop: | ||
if not server_liveness.is_set(): | ||
logger.warning(f'server is down...! worker {worker_id:03d} will stop') | ||
break | ||
|
||
if worker_status == 0: | ||
dealer_socket.send_multipart([b'', b'req', b'']) # ask a job | ||
worker_status = 1 # worker is busy | ||
|
||
incoming_events = dict(poller.poll(100)) | ||
dealer_poller_status = incoming_events.get(dealer_socket, None) | ||
subscriber_poller_status = incoming_events.get(subscriber_socket, None) | ||
if dealer_poller_status is not None: | ||
if dealer_poller_status == zmq.POLLIN: | ||
try: | ||
_, encoded_path2image = dealer_socket.recv_multipart() | ||
path2source_image = encoded_path2image.decode() | ||
bgr_image = cv2.imread(path2source_image) | ||
resized_image = cv2.resize(bgr_image, dsize=size) | ||
_, filename = path.split(path2source_image) | ||
path2target_image = path.join(path2resized_images, filename) | ||
cv2.imwrite(path2target_image, resized_image) | ||
dealer_socket.send_multipart([b'', b'rsp'], flags=zmq.SNDMORE) | ||
dealer_socket.send_pyobj( | ||
{ | ||
'incoming_image': path2source_image, | ||
'worker_id': worker_id, | ||
'status': 1, | ||
} | ||
) | ||
except Exception as e: | ||
logger.error(e) | ||
logger.error( | ||
f'worker {worker_id:03d} was not able to process : {path2source_image}' | ||
) | ||
dealer_socket.send_multipart([b'', b'rsp'], flags=zmq.SNDMORE) | ||
dealer_socket.send_pyobj( | ||
{ | ||
'incoming_image': path2source_image, | ||
'worker_id': worker_id, | ||
'status': 0, | ||
} | ||
) | ||
worker_status = 0 # worker is free => can ask a new job | ||
|
||
if subscriber_poller_status is not None: | ||
if subscriber_poller_status == zmq.POLLIN: | ||
topic, _ = subscriber_socket.recv_multipart() | ||
if topic == b'quit': | ||
logger.debug( | ||
f'worker {worker_id:03d} got the quit signal from server' | ||
) | ||
keep_loop = False | ||
# end while loop ...! | ||
|
||
except KeyboardInterrupt: | ||
pass | ||
except Exception as e: | ||
if workers_synchronizer.broken: | ||
logger.warning( | ||
f'worker {worker_id:03d} will stop. the barrier was broken by some workers' | ||
) | ||
else: | ||
logger.error(e) | ||
finally: | ||
if ZEROMQ_INIT == 1: | ||
poller.unregister(subscriber_socket) | ||
poller.unregister(dealer_socket) | ||
|
||
subscriber_socket.close() | ||
dealer_socket.close() | ||
|
||
ctx.term() | ||
|
||
logger.success(f'worker {worker_id:03d} has released all zeromq ressources') |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe not worth it to use a third-party logger for a simple example? Let's stick to the standard library.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i also changed the loguru with the standard logging library.