diff --git a/examples/all_identify_options.py b/examples/all_identify_options.py old mode 100644 new mode 100755 index b1aea44..dd9ad0e --- a/examples/all_identify_options.py +++ b/examples/all_identify_options.py @@ -1,3 +1,4 @@ +#!/usr/bin/env python import nsq import ssl import tornado.options diff --git a/nsq/client.py b/nsq/client.py index 7307e8a..74ea6d1 100644 --- a/nsq/client.py +++ b/nsq/client.py @@ -34,7 +34,7 @@ def _check_last_recv_timestamps(self): def is_stale(conn): timestamp = conn.last_recv_timestamp - return (now - timestamp) > ((self.heartbeat_interval * 2) / 1000.0) + return (now - timestamp) > ((conn.heartbeat_interval * 2) / 1000.0) # first get the list of stale connections, then close # (`conn.close()` may modify the list of connections while we're iterating) @@ -46,3 +46,18 @@ def is_stale(conn): logging.warning('[%s:%s] connection is stale (%.02fs), closing', conn.id, self.name, (now - timestamp)) conn.close() + + def _on_heartbeat(self, conn): + logging.info('[%s:%s] received heartbeat' % (conn.id, self.name)) + self.heartbeat(conn) + + def heartbeat(self, conn): + """ + Called whenever a heartbeat has been received + + This is useful to subclass and override to perform an action based on liveness (for + monitoring, etc.) + + :param conn: the :class:`nsq.AsyncConn` over which the heartbeat was received + """ + pass diff --git a/nsq/reader.py b/nsq/reader.py index 279702b..64eaad5 100644 --- a/nsq/reader.py +++ b/nsq/reader.py @@ -405,7 +405,7 @@ def connect_to_nsqd(self, host, port): conn.on('close', self._on_connection_close) conn.on('ready', self._on_connection_ready) conn.on('message', self._on_message) - conn.on('heartbeat', self.heartbeat) + conn.on('heartbeat', self._on_heartbeat) conn.on('backoff', functools.partial(self._on_backoff_resume, success=False)) conn.on('resume', functools.partial(self._on_backoff_resume, success=True)) @@ -613,17 +613,6 @@ def disabled(self): """ return False - def heartbeat(self, conn): - """ - Called whenever a heartbeat has been received - - This is useful to subclass and override to perform an action based on liveness (for - monitoring, etc.) - - :param conn: the :class:`nsq.AsyncConn` over which the heartbeat was received - """ - logging.info('[%s:%s] received heartbeat' % (conn.id, self.name)) - def validate_message(self, message): return True diff --git a/nsq/writer.py b/nsq/writer.py index 224b3c3..258908d 100644 --- a/nsq/writer.py +++ b/nsq/writer.py @@ -72,9 +72,11 @@ def __init__(self, handlers, **settings): :param nsqd_tcp_addresses: a sequence with elements of the form 'address:port' corresponding to the ``nsqd`` instances this writer should publish to + :param name: a string that is used for logging messages (defaults to first nsqd address) + :param \*\*kwargs: passed to :class:`nsq.AsyncConn` initialization """ - def __init__(self, nsqd_tcp_addresses, **kwargs): + def __init__(self, nsqd_tcp_addresses, name=None, **kwargs): super(Writer, self).__init__(**kwargs) if not isinstance(nsqd_tcp_addresses, (list, set, tuple)): @@ -82,6 +84,7 @@ def __init__(self, nsqd_tcp_addresses, **kwargs): nsqd_tcp_addresses = [nsqd_tcp_addresses] assert nsqd_tcp_addresses + self.name = name or nsqd_tcp_addresses[0] self.nsqd_tcp_addresses = nsqd_tcp_addresses self.conns = {} self.conn_kwargs = kwargs @@ -179,10 +182,3 @@ def _finish_pub(self, conn, data, command, topic, msg): if isinstance(data, nsq.Error): logging.error('[%s] failed to %s (%s, %s), data is %s', conn.id, command, topic, msg, data) - - # - # subclass overwriteable - # - - def heartbeat(self, conn): - logging.info('[%s] received heartbeat', conn.id) diff --git a/tests/test_reader.py b/tests/test_reader.py index 7bfe94d..f6de919 100644 --- a/tests/test_reader.py +++ b/tests/test_reader.py @@ -5,6 +5,7 @@ import time import ssl +import tornado.httpclient import tornado.testing # shunt '..' into sys.path since we are in a 'tests' subdirectory @@ -33,7 +34,19 @@ def setUp(self): '--tls-key=%s/tests/key.pem' % base_dir, '--tls-cert=%s/tests/cert.pem' % base_dir]) self.processes.append(proc) - time.sleep(1) + http = tornado.httpclient.HTTPClient() + start = time.time() + while True: + try: + resp = http.fetch('http://127.0.0.1:4151/ping') + if resp.body == 'OK': + break + continue + except: + if time.time() - start > 5: + raise + time.sleep(0.1) + continue def tearDown(self): super(ReaderIntegrationTest, self).tearDown() @@ -61,9 +74,6 @@ def test_conn_identify_options(self): assert isinstance(response['data'], dict) assert response['data']['snappy'] is True assert response['data']['tls_v1'] is True - # assert response['data']['user_agent'] == 'sup' - # assert response['data']['output_buffer_size'] == 4096 - # assert response['data']['output_buffer_timeout'] == 50 def test_conn_subscribe(self): topic = 'test_conn_suscribe_%s' % time.time() @@ -135,3 +145,22 @@ def handler(msg): **self.identify_options) self.wait() + + def test_reader_heartbeat(self): + this = self + this.count = 0 + + def handler(msg): + return True + + class HeartbeatReader(nsq.Reader): + def heartbeat(self, conn): + this.count += 1 + if this.count == 2: + this.stop() + + topic = 'test_reader_hb_%s' % time.time() + HeartbeatReader(nsqd_tcp_addresses=['127.0.0.1:4150'], topic=topic, channel='ch', + io_loop=self.io_loop, message_handler=handler, max_in_flight=100, + heartbeat_interval=1) + self.wait()