BUG: Req-Rep threaded message lost with HWM=1 #2040

Closed
1 task done
ericjmcd opened this issue Oct 19, 2024 · 2 comments

@ericjmcd

This is a pyzmq bug

  • This is a pyzmq-specific bug, not an issue of zmq socket behavior. Don't worry if you're not sure! We'll figure it out together.

What pyzmq version?

26.0.3

What libzmq version?

4.3.5

Python version (and how it was installed)

Python 3.12 installed via apt from ppa:deadsnakes/ppa

OS

ubuntu 20.04 inside docker running on ubuntu 20.04 host

What happened?

With a REQ socket and a REP socket running in a separate thread, exchanging request-reply pairs over and over with SNDHWM=1 set on the REP socket, a reply eventually goes missing. The REP send does not raise zmq.Again or zmq.ZMQError; it returns normally, but the message appears to get lost. In the example, the REQ times out waiting for the response and sets a global error flag; the REP thread then reports that it has moved past the send (where the message was lost) and is waiting for the next request.

SNDHWM does not really make sense for REQ/REP sockets, but we inadvertently set HWM=1 on all our sockets and occasionally hit this failure, so we thought it should be reviewed. Setting HWM=2 appears to resolve the issue.
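One way to avoid applying a blanket HWM to REQ/REP sockets by accident is to route the setting through a small helper that skips those socket types. This is only a sketch: `apply_hwm` and `LOCKSTEP_TYPES` are hypothetical names, not part of pyzmq, and skipping the option is a way to sidestep the problem rather than a fix for the underlying behavior.

```python
import zmq

# REQ and REP exchange messages in strict lockstep, so at most one message
# is in flight per peer and a high-water mark has little to limit.
LOCKSTEP_TYPES = (zmq.REQ, zmq.REP)


def apply_hwm(sock, hwm):
    """Set SNDHWM/RCVHWM on `sock` unless it is a REQ or REP socket.

    Hypothetical helper; returns True if the options were applied.
    """
    if sock.getsockopt(zmq.TYPE) in LOCKSTEP_TYPES:
        return False
    sock.setsockopt(zmq.SNDHWM, hwm)
    sock.setsockopt(zmq.RCVHWM, hwm)
    return True


ctx = zmq.Context()
rep = ctx.socket(zmq.REP)
push = ctx.socket(zmq.PUSH)

rep_changed = apply_hwm(rep, 1)    # skipped: REP keeps the libzmq default
push_changed = apply_hwm(push, 1)  # applied: PUSH can queue many messages

rep_hwm = rep.getsockopt(zmq.SNDHWM)
push_hwm = push.getsockopt(zmq.SNDHWM)

rep.close()
push.close()
ctx.term()
```

The options are set before any `bind` or `connect`, which is when libzmq expects HWM values to be configured.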

Code to reproduce bug

"""Test ZMQ REQ-REP with REP HWM=1
Example output:
Rep HWM=1
Req timed out receiving response 2659829
Waiting for request after error
Rep thread exiting
Rep HWM=2
Rep thread exiting
"""

import zmq
import time
import threading

URL = 'ipc:///tmp/zmq_test_pipe'

# Flags shared between the REQ loop (main thread) and the REP thread
error = False
running = True

def reply_sock(hwm):
    ctx = zmq.Context.instance()
    sock = ctx.socket(zmq.REP)
    print(f'Rep HWM={hwm}')
    sock.SNDHWM=hwm
    sock.RCVTIMEO=100
    sock.SNDTIMEO=1
    sock.LINGER=0
    sock.bind(URL)
        
    resp = 'x'*100
    resp_bin = resp.encode('utf-8')

    global running, error
    while running:
        try:
            if error:
                print('Waiting for request after error')
                error = False # We caught the error, so clear flag
            rx = sock.recv()
        except zmq.Again:
            continue
        try:
            sock.send(resp_bin)
        except zmq.Again:
            print('Reply timed out sending response')
            break
        except Exception as ex:
            print(f'Unexpected exception: {ex}')
    sock.close()
    print('Rep thread exiting')

def req_sock(count):
    ctx = zmq.Context.instance()
    sock = ctx.socket(zmq.REQ)
    sock.RCVTIMEO=100
    sock.REQ_RELAXED=1  # Does not seem to matter
    sock.LINGER=0
    sock.connect(URL)
        
    i = 0
    global running, error
    while running:
        i += 1
        try:
            msg = 'test'.encode('utf-8')
            sock.send(msg)
        except zmq.Again:
            print('Req timed out sending')
        try:
            resp = sock.recv()
        except zmq.Again:
            print(f'Req timed out receiving response {i}')
            error = True
            time.sleep(1) # Let Rep thread print
            break
        except Exception as ex:
            print(f'Unexpected exception: {ex}')
        if i > count:  # Unclear how many are needed for 100% prob of failure.  1M was not enough sometimes.
            break
    running = False
    sock.close()

def run_test(hwm, N):
    global running, error
    running = True
    error = False
    rep_thread = threading.Thread(target=reply_sock, daemon=True, args=(hwm,))
    rep_thread.start()
    time.sleep(0.1)  # Let Rep get started
    req_sock(N)
    rep_thread.join(timeout=3)
    if rep_thread.is_alive():
        print('Rep thread did not join')
     

if __name__ == "__main__":
    N = 5000000
    run_test(hwm=1, N=N)  # Will usually fail < 5M msgs
    run_test(hwm=2, N=N)  # Never fails

Traceback, if applicable

No response

More info

No response

@minrk
Member

minrk commented Oct 19, 2024

This does sound like a libzmq bug. Req/rep with hwm greater than 1 doesn't make a lot of sense since it cannot ever have more than one message outstanding due to the req/rep cycle. Maybe it has to do with the timeouts leaving messages unsent for a short time. Feel free to report it on the libzmq repo, since I don't think there's anything pyzmq can do about low-level socket behavior like hwm.
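Until the underlying behavior is resolved, a client can at least recover from a missing reply by retrying on a fresh REQ socket, in the style of the "Lazy Pirate" pattern from the ZeroMQ guide. The sketch below is not something proposed in this thread: `request_with_retry`, `echo_once`, and the `inproc://retry-demo` endpoint are hypothetical names used for illustration, and the demo only exercises the path where the first attempt succeeds.

```python
import threading
import zmq


def request_with_retry(ctx, url, payload, timeout_ms=100, retries=3):
    """Send `payload` and wait for a reply, retrying on a fresh REQ socket.

    Hypothetical helper: a REQ socket that never receives its reply cannot
    send again by default, so each attempt uses a new socket.
    """
    for attempt in range(1, retries + 1):
        sock = ctx.socket(zmq.REQ)
        sock.setsockopt(zmq.LINGER, 0)  # do not block on close
        sock.connect(url)
        try:
            sock.send(payload)
            # poll() returns a non-zero event mask once a reply is readable
            if sock.poll(timeout_ms, zmq.POLLIN):
                return sock.recv(), attempt
        finally:
            sock.close()
    raise TimeoutError(f'no reply after {retries} attempts')


def echo_once(sock):
    """Answer a single request on an already-bound REP socket."""
    sock.send(sock.recv())


ctx = zmq.Context()
rep = ctx.socket(zmq.REP)
rep.setsockopt(zmq.LINGER, 0)
rep.bind('inproc://retry-demo')  # assumed endpoint name for this demo
server = threading.Thread(target=echo_once, args=(rep,))
server.start()

reply, attempts = request_with_retry(
    ctx, 'inproc://retry-demo', b'test', timeout_ms=2000
)

server.join()
rep.close()
ctx.term()
```

A retry means the server may see the same request more than once, so this only suits requests that are safe to repeat.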

@ericjmcd
Author

Thanks - submitted at libzmq: zeromq/libzmq#4750

@minrk closed this as completed Oct 22, 2024