Skip to content

Commit

Permalink
Merge pull request #179 from alpaker/no-decr-total-rdy
Browse files Browse the repository at this point in the history
reader:  don't decrement total_rdy on message receipt
  • Loading branch information
mreiferson authored May 13, 2017
2 parents 278f148 + 47d1693 commit d24ee1c
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 30 deletions.
1 change: 0 additions & 1 deletion nsq/async.py
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,6 @@ def _on_data(self, data, **kwargs):
frame, data = protocol.unpack_response(data)
if frame == protocol.FRAME_TYPE_MESSAGE:
self.last_msg_timestamp = time.time()
self.rdy = max(self.rdy - 1, 0)
self.in_flight += 1

message = protocol.decode_message(data)
Expand Down
54 changes: 26 additions & 28 deletions nsq/reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -317,21 +317,7 @@ def _on_message(self, conn, message, **kwargs):
logger.exception('[%s:%s] failed to handle_message() %r', conn.id, self.name, message)

def _handle_message(self, conn, message):
self.total_rdy = max(self.total_rdy - 1, 0)

rdy_conn = conn
if len(self.conns) > self.max_in_flight and time.time() - self.random_rdy_ts > 30:
# if all connections aren't getting RDY
# occsionally randomize which connection gets RDY
self.random_rdy_ts = time.time()
conns_with_no_rdy = [c for c in itervalues(self.conns) if not c.rdy]
if conns_with_no_rdy:
rdy_conn = random.choice(conns_with_no_rdy)
if rdy_conn is not conn:
logger.info('[%s:%s] redistributing RDY to %s',
conn.id, self.name, rdy_conn.id)

self._maybe_update_rdy(rdy_conn)
self._maybe_update_rdy(conn)

success = False
try:
Expand All @@ -358,7 +344,9 @@ def _maybe_update_rdy(self, conn):
if self.backoff_timer.get_interval() or self.max_in_flight == 0:
return

if conn.rdy <= 1 or conn.rdy < int(conn.last_rdy * 0.25):
# On a new connection or in backoff we start with a tentative RDY count
# of 1. After successfully receiving a first message we go to full throttle.
if conn.rdy == 1:
self._send_rdy(conn, self._connection_max_in_flight())

def _finish_backoff_block(self):
Expand Down Expand Up @@ -452,15 +440,10 @@ def _send_rdy(self, conn, value):
if value > conn.max_rdy_count:
value = conn.max_rdy_count

if (self.total_rdy + value) > self.max_in_flight:
if not conn.rdy:
# if we're going from RDY 0 to non-0 and we couldn't because
# of the configured max in flight, try again
rdy_retry_callback = functools.partial(self._rdy_retry, conn, value)
conn.rdy_timeout = self.io_loop.add_timeout(time.time() + 5, rdy_retry_callback)
new_rdy = max(self.total_rdy - conn.rdy + value, 0)
if new_rdy > self.max_in_flight:
return

new_rdy = max(self.total_rdy - conn.rdy + value, 0)
if conn.send_rdy(value):
self.total_rdy = new_rdy

Expand Down Expand Up @@ -665,10 +648,25 @@ def _redistribute_rdy_state(self):
logger.info('[%s:%s] idle connection, giving up RDY count', conn.id, self.name)
self._send_rdy(conn, 0)

conns = self.conns.values()

in_flight_or_rdy = len([c for c in conns if c.in_flight or c.rdy])
if backoff_interval:
max_in_flight = 1 - self.total_rdy
available_rdy = max(0, 1 - in_flight_or_rdy)
else:
max_in_flight = self.max_in_flight - self.total_rdy
available_rdy = max(0, self.max_in_flight - in_flight_or_rdy)

# if moving any connections from RDY 0 to non-0 would violate in-flight constraints,
# set RDY 0 on some connection with msgs in flight so that a later redistribution
# round can proceed and we don't stay pinned to the same connections.
#
# if nothing's in flight, then we have connections with RDY 1 that are still
# waiting to hit the idle timeout, in which case it's ok to do nothing.
in_flight = [c for c in conns if c.in_flight]
if in_flight and not available_rdy:
c = random.choice(in_flight)
logger.info('[%s:%s] too many msgs in flight, giving up RDY count', c.id, self.name)
self._send_rdy(c, 0)

# randomly walk the list of possible connections and send RDY 1 (up to our
# calculated "max_in_flight"). We only need to send RDY 1 because in both
Expand All @@ -677,9 +675,9 @@ def _redistribute_rdy_state(self):
# We also don't attempt to avoid the connections who previously might have had RDY 1
# because it would be overly complicated and not actually worth it (ie. given enough
# redistribution rounds it doesn't matter).
possible_conns = list(self.conns.values())
while possible_conns and max_in_flight:
max_in_flight -= 1
possible_conns = [c for c in conns if not (c.in_flight or c.rdy)]
while possible_conns and available_rdy:
available_rdy -= 1
conn = possible_conns.pop(random.randrange(len(possible_conns)))
logger.info('[%s:%s] redistributing RDY', conn.id, self.name)
self._send_rdy(conn, 1)
Expand Down
1 change: 1 addition & 0 deletions tests/test_backoff.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ def _get_conn(reader):

def _send_message(conn):
msg = _get_message(conn)
conn.in_flight += 1
conn.trigger(event.MESSAGE, conn=conn, message=msg)
return msg

Expand Down
2 changes: 1 addition & 1 deletion tests/test_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ def test_conn_messages(self):

def _on_message(*args, **kwargs):
self.msg_count += 1
if c.rdy == 0:
if c.in_flight == 5:
self.stop()

def _on_ready(*args, **kwargs):
Expand Down

0 comments on commit d24ee1c

Please sign in to comment.