Skip to content

Commit

Permalink
reader: fix heartbeat_interval bug
Browse files Browse the repository at this point in the history
* conn not self for heartbeat_interval
* chmod +x and add shebang to example
* consolidate heartbeat handling into Client
* improve test flakiness and duration by intelligently waiting for nsqd
* add test for reader heartbeats
  • Loading branch information
mreiferson committed Feb 21, 2014
1 parent f16b99b commit 2921a47
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 25 deletions.
1 change: 1 addition & 0 deletions examples/all_identify_options.py
100644 → 100755
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#!/usr/bin/env python
import nsq
import ssl
import tornado.options
Expand Down
17 changes: 16 additions & 1 deletion nsq/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def _check_last_recv_timestamps(self):

def is_stale(conn):
timestamp = conn.last_recv_timestamp
return (now - timestamp) > ((self.heartbeat_interval * 2) / 1000.0)
return (now - timestamp) > ((conn.heartbeat_interval * 2) / 1000.0)

# first get the list of stale connections, then close
# (`conn.close()` may modify the list of connections while we're iterating)
Expand All @@ -46,3 +46,18 @@ def is_stale(conn):
logging.warning('[%s:%s] connection is stale (%.02fs), closing',
conn.id, self.name, (now - timestamp))
conn.close()

def _on_heartbeat(self, conn):
logging.info('[%s:%s] received heartbeat' % (conn.id, self.name))
self.heartbeat(conn)

def heartbeat(self, conn):
"""
Called whenever a heartbeat has been received
This is useful to subclass and override to perform an action based on liveness (for
monitoring, etc.)
:param conn: the :class:`nsq.AsyncConn` over which the heartbeat was received
"""
pass
13 changes: 1 addition & 12 deletions nsq/reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@ def connect_to_nsqd(self, host, port):
conn.on('close', self._on_connection_close)
conn.on('ready', self._on_connection_ready)
conn.on('message', self._on_message)
conn.on('heartbeat', self.heartbeat)
conn.on('heartbeat', self._on_heartbeat)
conn.on('backoff', functools.partial(self._on_backoff_resume, success=False))
conn.on('resume', functools.partial(self._on_backoff_resume, success=True))

Expand Down Expand Up @@ -613,17 +613,6 @@ def disabled(self):
"""
return False

def heartbeat(self, conn):
"""
Called whenever a heartbeat has been received
This is useful to subclass and override to perform an action based on liveness (for
monitoring, etc.)
:param conn: the :class:`nsq.AsyncConn` over which the heartbeat was received
"""
logging.info('[%s:%s] received heartbeat' % (conn.id, self.name))

def validate_message(self, message):
return True

Expand Down
12 changes: 4 additions & 8 deletions nsq/writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,16 +72,19 @@ def __init__(self, handlers, **settings):
:param nsqd_tcp_addresses: a sequence with elements of the form 'address:port' corresponding
to the ``nsqd`` instances this writer should publish to
:param name: a string that is used for logging messages (defaults to first nsqd address)
:param \*\*kwargs: passed to :class:`nsq.AsyncConn` initialization
"""
def __init__(self, nsqd_tcp_addresses, **kwargs):
def __init__(self, nsqd_tcp_addresses, name=None, **kwargs):
super(Writer, self).__init__(**kwargs)

if not isinstance(nsqd_tcp_addresses, (list, set, tuple)):
assert isinstance(nsqd_tcp_addresses, (str, unicode))
nsqd_tcp_addresses = [nsqd_tcp_addresses]
assert nsqd_tcp_addresses

self.name = name or nsqd_tcp_addresses[0]
self.nsqd_tcp_addresses = nsqd_tcp_addresses
self.conns = {}
self.conn_kwargs = kwargs
Expand Down Expand Up @@ -179,10 +182,3 @@ def _finish_pub(self, conn, data, command, topic, msg):
if isinstance(data, nsq.Error):
logging.error('[%s] failed to %s (%s, %s), data is %s',
conn.id, command, topic, msg, data)

#
# subclass overwriteable
#

def heartbeat(self, conn):
logging.info('[%s] received heartbeat', conn.id)
37 changes: 33 additions & 4 deletions tests/test_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import time
import ssl

import tornado.httpclient
import tornado.testing

# shunt '..' into sys.path since we are in a 'tests' subdirectory
Expand Down Expand Up @@ -33,7 +34,19 @@ def setUp(self):
'--tls-key=%s/tests/key.pem' % base_dir,
'--tls-cert=%s/tests/cert.pem' % base_dir])
self.processes.append(proc)
time.sleep(1)
http = tornado.httpclient.HTTPClient()
start = time.time()
while True:
try:
resp = http.fetch('http://127.0.0.1:4151/ping')
if resp.body == 'OK':
break
continue
except:
if time.time() - start > 5:
raise
time.sleep(0.1)
continue

def tearDown(self):
super(ReaderIntegrationTest, self).tearDown()
Expand Down Expand Up @@ -61,9 +74,6 @@ def test_conn_identify_options(self):
assert isinstance(response['data'], dict)
assert response['data']['snappy'] is True
assert response['data']['tls_v1'] is True
# assert response['data']['user_agent'] == 'sup'
# assert response['data']['output_buffer_size'] == 4096
# assert response['data']['output_buffer_timeout'] == 50

def test_conn_subscribe(self):
topic = 'test_conn_suscribe_%s' % time.time()
Expand Down Expand Up @@ -135,3 +145,22 @@ def handler(msg):
**self.identify_options)

self.wait()

def test_reader_heartbeat(self):
this = self
this.count = 0

def handler(msg):
return True

class HeartbeatReader(nsq.Reader):
def heartbeat(self, conn):
this.count += 1
if this.count == 2:
this.stop()

topic = 'test_reader_hb_%s' % time.time()
HeartbeatReader(nsqd_tcp_addresses=['127.0.0.1:4150'], topic=topic, channel='ch',
io_loop=self.io_loop, message_handler=handler, max_in_flight=100,
heartbeat_interval=1)
self.wait()

0 comments on commit 2921a47

Please sign in to comment.