diff --git a/examples/image-processing/README.md b/examples/image-processing/README.md
new file mode 100644
index 000000000..85872a9d8
--- /dev/null
+++ b/examples/image-processing/README.md
@@ -0,0 +1,83 @@
+# ZMQ client-server for parallel image processing (resize)
+
+This program is an example of parallel programming with ZeroMQ (used for worker synchronization). It applies a set of operations to images, such as resize, to_gray, threshold, or flip.
+
+The architecture is client-server (see https://zguide.zeromq.org/docs/chapter3/): the clients are workers that process the images, and each worker requests a task from the server whenever it is free. A worker never executes several tasks at the same time, which keeps the load evenly balanced across workers.
+
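+Below is a minimal, self-contained sketch of that request pattern. It is illustrative only: threads and an `inproc://tasks` endpoint stand in for the worker processes and `ipc://` endpoints used by main.py and worker.py, and only the frame layout (a `[b'', b'req', b'']` request answered with the task path) mirrors the real code.
+
+```python
+import threading
+
+import zmq
+
+ctx = zmq.Context.instance()
+
+
+def worker():
+    # a worker connects a DEALER socket and asks for one task at a time
+    socket = ctx.socket(zmq.DEALER)
+    socket.connect('inproc://tasks')
+    for _ in range(3):
+        socket.send_multipart([b'', b'req', b''])  # ask for a job
+        _, task = socket.recv_multipart()          # wait for the answer
+        print('worker received', task.decode())
+    socket.close()
+
+
+server = ctx.socket(zmq.ROUTER)
+server.bind('inproc://tasks')
+thread = threading.Thread(target=worker)
+thread.start()
+
+for task in ['a.jpg', 'b.jpg', 'c.jpg']:
+    socket_id, _, msgtype, _ = server.recv_multipart()      # a worker asked for work
+    server.send_multipart([socket_id, b'', task.encode()])  # route the task back to it
+
+thread.join()
+server.close()
+ctx.term()
+```
+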
+# cpu usage (parallel zeromq image processing)
+
+![cpu usage](cpu_usage.jpg)
+
+# benchmark
+
+- sequential-processing Flickr8k image dataset
+
+```bash
+  2022-09-28 10:11:11,714 - image-logger - INFO: server has processed 8090/8091 images in 224s
+```
+
+- parallel-processing (zeromq ROUTER-DEALER) Flickr8k image dataset
+
+```bash
+  2022-09-28 10:13:15,137 - image-logger - INFO: server has processed 8091/8091 images in 026s
+```
+
+# code organization
+
+- main.py
+  - this file is the entrypoint
+  - it exposes two modes (sequential-processing, parallel-processing)
+  - use **python main.py --help** to see the available subcommands
+- worker.py
+  - this file contains the implementation of the zeromq worker
+  - each worker has two sockets (dealer and subscriber)
+  - the dealer socket is used for asynchronous communication between the server and the worker
+  - the subscriber socket lets the server broadcast messages such as the KILL signal to all workers
+
+# initialization
+
+```bash
+  # create the virtual env
+  python -m venv env
+  # activate the virtual env
+  source env/bin/activate
+  # upgrade pip
+  pip install --upgrade pip
+  # install dependencies
+  pip install -r requirements.txt
+```
+
+# run the program
+
+## helper
+
+```bash
+  # main help
+  python main.py --help
+  # sequential mode help
+  python main.py sequential-processing --help
+  # parallel mode help
+  python main.py parallel-processing --help
+```
+
+## this program has two modes
+
+- sequential mode
+  ```bash
+  python main.py sequential-processing \
+    --path2initial_images /path/to/source/images \
+    --path2resized_images /path/to/target/images \
+    --image_extension '*.jpg' \
+    --size 512 512
+  ```
+- parallel mode
+  ```bash
+  python main.py parallel-processing \
+    --path2initial_images /path/to/source/images \
+    --path2resized_images /path/to/target/images \
+    --image_extension '*.jpg' \
+    --nb_workers 8 \
+    --size 512 512
+  ```
diff --git a/examples/image-processing/cpu_usage.jpg b/examples/image-processing/cpu_usage.jpg
new file mode 100644
index 000000000..49f74f1ee
Binary files /dev/null and b/examples/image-processing/cpu_usage.jpg differ
diff --git a/examples/image-processing/log.py b/examples/image-processing/log.py
new file mode 100644
index 000000000..04ee82a20
--- /dev/null
+++ b/examples/image-processing/log.py
@@ -0,0 +1,10 @@
+import logging
+
+logging.basicConfig(
+    level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s: %(message)s'
+)
+
+logger = logging.getLogger('image-logger')
+
+if __name__ == '__main__':
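+    # running `python log.py` directly emits a single record so the format above
+    # can be checked, e.g. `2022-09-28 10:11:11,714 - image-logger - DEBUG: ... log is up ...`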
+    logger.debug('... log is up ...')
diff --git a/examples/image-processing/main.py b/examples/image-processing/main.py
new file mode 100644
index 000000000..9d2dd1ce6
--- /dev/null
+++ b/examples/image-processing/main.py
@@ -0,0 +1,193 @@
+import multiprocessing as mp
+import pickle
+import time
+from glob import glob
+from os import path
+from typing import List, Tuple
+
+import click
+from log import logger
+from PIL import Image
+from worker import process_images
+
+import zmq
+
+
+@click.group(chain=False, invoke_without_command=True)
+@click.pass_context
+def cmd_group(clk: click.Context):
+    subcommand = clk.invoked_subcommand
+    if subcommand is not None:
+        logger.debug(f'{subcommand} was called')
+    else:
+        logger.debug('use --help to see the available subcommands')
+
+
+@cmd_group.command()
+@click.option(
+    '--path2initial_images', help='initial images location', type=click.Path(True)
+)
+@click.option(
+    '--path2resized_images', help='resized images location', type=click.Path(True)
+)
+@click.option(
+    '--image_extension', help='image file extension', default='*.jpg', type=str
+)
+@click.option(
+    '--size', help='new image size', type=click.Tuple([int, int]), default=(512, 512)
+)
+@click.pass_context
+def sequential_processing(
+    clk: click.Context,
+    path2initial_images: str,
+    path2resized_images: str,
+    image_extension: str,
+    size: Tuple[int, int],
+):
+    image_filepaths: List[str] = sorted(
+        glob(path.join(path2initial_images, image_extension))
+    )
+    nb_images = len(image_filepaths)
+    logger.debug(f'{nb_images:05d} images were found at {path2initial_images}')
+    start = time.time()
+    for cursor, path2source_image in enumerate(image_filepaths):
+        image = Image.open(path2source_image)
+        resized_image = image.resize(size=size)
+        _, filename = path.split(path2source_image)
+        path2target_image = path.join(path2resized_images, filename)
+        resized_image.save(path2target_image)
+        print(resized_image.size, f'{cursor:04d} images')
+
+    end = time.time()
+    duration = int(round(end - start))
+    logger.info(
+        f'server has processed {cursor + 1:04d}/{nb_images} images in {duration:03d}s'
+    )
+
+
+@cmd_group.command()
+@click.option(
+    '--path2initial_images', help='initial images location', type=click.Path(True)
+)
+@click.option(
+    '--path2resized_images', help='resized images location', type=click.Path(True)
+)
+@click.option(
+    '--image_extension', help='image file extension', default='*.jpg', type=str
+)
+@click.option(
+    '--nb_workers', help='number of workers to process images', default=2, type=int
+)
+@click.option(
+    '--size', help='new image size', type=click.Tuple([int, int]), default=(512, 512)
+)
+@click.pass_context
+def parallel_processing(
+    clk: click.Context,
+    path2initial_images: str,
+    path2resized_images: str,
+    image_extension: str,
+    nb_workers: int,
+    size: Tuple[int, int],
+):
+    ZEROMQ_INIT = 0
+    WORKER_INIT = 0
+    try:
+        router_address = 'ipc://router.ipc'
+        publisher_address = 'ipc://publisher.ipc'
+
+        ctx: zmq.Context = zmq.Context()
+        router_socket: zmq.Socket = ctx.socket(zmq.ROUTER)
+        publisher_socket: zmq.Socket = ctx.socket(zmq.PUB)
+
+        router_socket.bind(router_address)
+        publisher_socket.bind(publisher_address)
+        ZEROMQ_INIT = 1
+
+        image_filepaths: List[str] = sorted(
+            glob(path.join(path2initial_images, image_extension))
+        )
+        nb_images = len(image_filepaths)
+        logger.debug(f'{nb_images:05d} images were found at {path2initial_images}')
+
+        if nb_images == 0:
+            raise Exception(f'{path2initial_images} is empty')
+
+        workers_acc = []
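+        # shared synchronization primitives handed to every worker process:
+        # server_liveness signals that the server sockets are bound, worker_arrival
+        # and arrival_condition let the server wait until every worker has connected,
+        # and workers_synchronizer is a barrier so all workers start pulling tasks
+        # at the same time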
+        server_liveness = mp.Event()
+        worker_arrival = mp.Value('i', 0)
+        arrival_condition = mp.Condition()
+        workers_synchronizer = mp.Barrier(nb_workers)
+        for worker_id in range(nb_workers):
+            worker_ = mp.Process(
+                target=process_images,
+                kwargs={
+                    'size': size,
+                    'worker_id': worker_id,
+                    'router_address': router_address,
+                    'publisher_address': publisher_address,
+                    'worker_arrival': worker_arrival,
+                    'server_liveness': server_liveness,
+                    'arrival_condition': arrival_condition,
+                    'workers_synchronizer': workers_synchronizer,
+                    'path2resized_images': path2resized_images,
+                },
+            )
+
+            workers_acc.append(worker_)
+            workers_acc[-1].start()
+
+        WORKER_INIT = 1
+        with arrival_condition:
+            server_liveness.set()  # signal the workers that the server is ready
+            arrival_condition.wait_for(
+                predicate=lambda: worker_arrival.value == nb_workers, timeout=10
+            )
+
+        if worker_arrival.value != nb_workers:
+            logger.error('server waited too long for the workers to be ready')
+            exit(1)
+
+        logger.info('all workers are up and ready to process images')
+        cursor = 0
+        keep_loop = True
+        start = time.time()
+        while keep_loop:
+            socket_id, _, msgtype, message = router_socket.recv_multipart()
+            if msgtype == b'req':
+                if cursor < nb_images:
+                    path2source_image = image_filepaths[cursor]
+                    router_socket.send_multipart(
+                        [socket_id, b'', path2source_image.encode()]
+                    )
+                    cursor = cursor + 1
+            if msgtype == b'rsp':
+                content = pickle.loads(message)
+                if content['status'] == 1:
+                    print(f"{content['worker_id']:03d}", f"{cursor:04d} items")
+                keep_loop = cursor < nb_images
+        # end loop over images
+        end = time.time()
+        duration = int(round(end - start))
+        logger.info(
+            f'server has processed {cursor:04d}/{nb_images} images in {duration:03d}s'
+        )
+    except Exception as e:
+        logger.error(e)
+    finally:
+        if WORKER_INIT:
+            logger.debug('server is waiting for the workers to quit the loop')
+            publisher_socket.send_multipart([b'quit', b''])
+            for worker_ in workers_acc:
+                worker_.join()
+
+        if ZEROMQ_INIT == 1:
+            publisher_socket.close()
+            router_socket.close()
+            ctx.term()
+
+        logger.info('server has released all zeromq resources')
+
+
+if __name__ == '__main__':
+    cmd_group()
diff --git a/examples/image-processing/requirements.txt b/examples/image-processing/requirements.txt
new file mode 100644
index 000000000..65042a5b3
--- /dev/null
+++ b/examples/image-processing/requirements.txt
@@ -0,0 +1,3 @@
+click
+pillow
+pyzmq
diff --git a/examples/image-processing/worker.py b/examples/image-processing/worker.py
new file mode 100644
index 000000000..a301f0c35
--- /dev/null
+++ b/examples/image-processing/worker.py
@@ -0,0 +1,134 @@
+import multiprocessing as mp
+from os import path
+from typing import Tuple
+
+from log import logger
+from PIL import Image
+
+import zmq
+
+
+def process_images(
+    size: Tuple[int, int],
+    worker_id: int,
+    router_address: str,
+    publisher_address: str,
+    path2resized_images: str,
+    worker_arrival: mp.Value,
+    arrival_condition: mp.Condition,
+    server_liveness: mp.Event,
+    workers_synchronizer: mp.Barrier,
+):
+
+    ZEROMQ_INIT = 0
+    try:
+        ctx: zmq.Context = zmq.Context()
+        dealer_socket: zmq.Socket = ctx.socket(zmq.DEALER)
+        subscriber_socket: zmq.Socket = ctx.socket(zmq.SUB)
+
+        dealer_socket.connect(router_address)
+        subscriber_socket.connect(publisher_address)
+        subscriber_socket.set(zmq.SUBSCRIBE, b'')  # subscribe to all topics
+
+        ZEROMQ_INIT = 1
+
+        poller = zmq.Poller()
+        poller.register(dealer_socket, zmq.POLLIN)
+        poller.register(subscriber_socket, zmq.POLLIN)
+
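+        # startup handshake: wait for the server to come up, report this worker's
+        # arrival under the shared condition, then block at the barrier until all
+        # workers are connected before entering the processing loop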
+        liveness_value = server_liveness.wait(
+            timeout=10
+        )  # wait at least 10s for the server to be ready
+        if not liveness_value:
+            logger.error(f'worker {worker_id:03d} waited too long for the server to be ready')
+            exit(1)
+
+        arrival_condition.acquire()
+        with worker_arrival.get_lock():
+            worker_arrival.value += 1
+        logger.debug(
+            f'worker {worker_id:03d} has established connection with {router_address} and {publisher_address}'
+        )
+        arrival_condition.notify_all()
+        arrival_condition.release()
+
+        logger.debug(f'worker {worker_id:03d} is waiting at the barrier')
+        workers_synchronizer.wait(timeout=5)  # wait at the barrier
+
+        worker_status = 0  # 0 => free | 1 => busy
+        keep_loop = True
+        while keep_loop:
+            if not server_liveness.is_set():
+                logger.warning(f'server is down...! worker {worker_id:03d} will stop')
+                break
+
+            if worker_status == 0:
+                dealer_socket.send_multipart([b'', b'req', b''])  # ask for a job
+                worker_status = 1  # worker is busy
+
+            incoming_events = dict(poller.poll(100))
+            dealer_poller_status = incoming_events.get(dealer_socket, None)
+            subscriber_poller_status = incoming_events.get(subscriber_socket, None)
+            if dealer_poller_status is not None:
+                if dealer_poller_status == zmq.POLLIN:
+                    try:
+                        _, encoded_path2image = dealer_socket.recv_multipart()
+                        path2source_image = encoded_path2image.decode()
+                        image = Image.open(path2source_image)
+                        resized_image = image.resize(size=size)
+                        _, filename = path.split(path2source_image)
+                        path2target_image = path.join(path2resized_images, filename)
+                        resized_image.save(path2target_image)
+                        dealer_socket.send_multipart([b'', b'rsp'], flags=zmq.SNDMORE)
+                        dealer_socket.send_pyobj(
+                            {
+                                'incoming_image': path2source_image,
+                                'worker_id': worker_id,
+                                'status': 1,
+                            }
+                        )
+                    except Exception as e:
+                        logger.error(e)
+                        logger.error(
+                            f'worker {worker_id:03d} was not able to process: {path2source_image}'
+                        )
+                        dealer_socket.send_multipart([b'', b'rsp'], flags=zmq.SNDMORE)
+                        dealer_socket.send_pyobj(
+                            {
+                                'incoming_image': path2source_image,
+                                'worker_id': worker_id,
+                                'status': 0,
+                            }
+                        )
+                    worker_status = 0  # worker is free => can ask for a new job
+
+            if subscriber_poller_status is not None:
+                if subscriber_poller_status == zmq.POLLIN:
+                    topic, _ = subscriber_socket.recv_multipart()
+                    if topic == b'quit':
+                        logger.debug(
+                            f'worker {worker_id:03d} got the quit signal from the server'
+                        )
+                        keep_loop = False
+        # end while loop
+
+    except KeyboardInterrupt:
+        pass
+    except Exception as e:
+        if workers_synchronizer.broken:
+            logger.warning(
+                f'worker {worker_id:03d} will stop: the barrier was broken by another worker'
+            )
+        else:
+            logger.error(e)
+    finally:
+        if ZEROMQ_INIT == 1:
+            poller.unregister(subscriber_socket)
+            poller.unregister(dealer_socket)
+
+            subscriber_socket.close()
+            dealer_socket.close()
+
+            ctx.term()
+
+        logger.info(f'worker {worker_id:03d} has released all zeromq resources')