From 74f4f12d44836228de6205d85d64663c08408631 Mon Sep 17 00:00:00 2001 From: Matt Reiferson Date: Sun, 16 Feb 2014 16:01:18 -0500 Subject: [PATCH 1/4] allow io_loop to be specified --- nsq/async.py | 14 +++++++++----- nsq/client.py | 10 ++++++++-- nsq/reader.py | 32 ++++++++++++++++++-------------- nsq/writer.py | 8 ++++---- 4 files changed, 39 insertions(+), 25 deletions(-) diff --git a/nsq/async.py b/nsq/async.py index 8591d96..8ae6706 100644 --- a/nsq/async.py +++ b/nsq/async.py @@ -86,7 +86,8 @@ class AsyncConn(EventedMixin): """ def __init__(self, host, port, timeout=1.0, heartbeat_interval=30, requeue_delay=90, tls_v1=False, tls_options=None, snappy=False, user_agent=None, - output_buffer_size=16 * 1024, output_buffer_timeout=250, sample_rate=0): + output_buffer_size=16 * 1024, output_buffer_timeout=250, sample_rate=0, + io_loop=None): assert isinstance(host, (str, unicode)) assert isinstance(port, int) assert isinstance(timeout, float) @@ -118,6 +119,9 @@ def __init__(self, host, port, timeout=1.0, heartbeat_interval=30, requeue_delay self.short_hostname = self.hostname.split('.')[0] self.heartbeat_interval = heartbeat_interval * 1000 self.requeue_delay = requeue_delay + self.io_loop = io_loop + if not self.io_loop: + self.io_loop = tornado.ioloop.IOLoop.instance() self.output_buffer_size = output_buffer_size self.output_buffer_timeout = output_buffer_timeout @@ -144,7 +148,7 @@ def connect(self): self.socket.settimeout(self.timeout) self.socket.setblocking(0) - self.stream = tornado.iostream.IOStream(self.socket) + self.stream = tornado.iostream.IOStream(self.socket, io_loop=self.io_loop) self.stream.set_close_callback(self._socket_close) self.state = 'CONNECTING' @@ -183,7 +187,7 @@ def _read_body(self, data): self.trigger('data', conn=self, data=data) except Exception: logging.exception('uncaught exception in data event') - tornado.ioloop.IOLoop.instance().add_callback(self._start_read) + self.io_loop.add_callback(self._start_read) def send(self, data): self.stream.write(data) @@ -196,7 +200,7 @@ def upgrade_to_tls(self, options=None): # first remove the event handler for the currently open socket # so that when we add the socket to the new SSLIOStream below, # it can re-add the appropriate event handlers. - tornado.ioloop.IOLoop.instance().remove_handler(self.socket.fileno()) + self.io_loop.remove_handler(self.socket.fileno()) opts = { 'cert_reqs': ssl.CERT_REQUIRED, @@ -206,7 +210,7 @@ def upgrade_to_tls(self, options=None): self.socket = ssl.wrap_socket(self.socket, ssl_version=ssl.PROTOCOL_TLSv1, do_handshake_on_connect=False, **opts) - self.stream = tornado.iostream.SSLIOStream(self.socket) + self.stream = tornado.iostream.SSLIOStream(self.socket, io_loop=self.io_loop) self.stream.set_close_callback(self._socket_close) # now that the IOStream has been swapped we can kickstart diff --git a/nsq/client.py b/nsq/client.py index 53e2407..7307e8a 100644 --- a/nsq/client.py +++ b/nsq/client.py @@ -5,8 +5,14 @@ class Client(object): - def __init__(self): - tornado.ioloop.PeriodicCallback(self._check_last_recv_timestamps, 60 * 1000).start() + def __init__(self, io_loop=None, **kwargs): + self.io_loop = io_loop + if not self.io_loop: + self.io_loop = tornado.ioloop.IOLoop.instance() + + tornado.ioloop.PeriodicCallback(self._check_last_recv_timestamps, + 60 * 1000, + io_loop=self.io_loop).start() def _on_connection_identify(self, conn, data, **kwargs): logging.info('[%s:%s] IDENTIFY sent %r' % (conn.id, self.name, data)) diff --git a/nsq/reader.py b/nsq/reader.py index 5ad23aa..279702b 100644 --- a/nsq/reader.py +++ b/nsq/reader.py @@ -124,6 +124,8 @@ def __init__(self, topic, channel, message_handler=None, name=None, max_tries=5, max_in_flight=1, lookupd_poll_interval=60, low_rdy_idle_timeout=10, max_backoff_duration=128, lookupd_poll_jitter=0.3, **kwargs): + super(Reader, self).__init__(**kwargs) + assert isinstance(topic, (str, unicode)) and len(topic) > 0 assert isinstance(channel, (str, unicode)) and len(channel) > 0 assert isinstance(max_in_flight, int) and max_in_flight > 0 @@ -175,12 +177,10 @@ def __init__(self, topic, channel, message_handler=None, name=None, self.conns = {} self.connection_attempts = {} - self.http_client = tornado.httpclient.AsyncHTTPClient() - - self.ioloop = tornado.ioloop.IOLoop.instance() + self.http_client = tornado.httpclient.AsyncHTTPClient(io_loop=self.io_loop) # will execute when run() is called (for all Reader instances) - self.ioloop.add_callback(self._run) + self.io_loop.add_callback(self._run) def _run(self): assert self.message_handler, "you must specify the Reader's message_handler" @@ -191,7 +191,9 @@ def _run(self): address, port = addr.split(':') self.connect_to_nsqd(address, int(port)) - tornado.ioloop.PeriodicCallback(self._redistribute_rdy_state, 5 * 1000).start() + tornado.ioloop.PeriodicCallback(self._redistribute_rdy_state, + 5 * 1000, + io_loop=self.io_loop).start() if not self.lookupd_http_addresses: return @@ -199,11 +201,13 @@ def _run(self): self.query_lookupd() periodic = tornado.ioloop.PeriodicCallback(self.query_lookupd, - self.lookupd_poll_interval * 1000) + self.lookupd_poll_interval * 1000, + io_loop=self.io_loop) + # randomize the time we start this poll loop so that all # consumers don't query at exactly the same time delay = random.random() * self.lookupd_poll_interval * self.lookupd_poll_jitter - self.ioloop.add_timeout(time.time() + delay, periodic.start) + self.io_loop.add_timeout(time.time() + delay, periodic.start) def set_message_handler(self, message_handler): """ @@ -352,8 +356,8 @@ def _start_backoff_block(self): for c in self.conns.values(): self._send_rdy(c, 0) - self.backoff_timeout = self.ioloop.add_timeout(time.time() + backoff_interval, - self._finish_backoff_block) + self.backoff_timeout = self.io_loop.add_timeout(time.time() + backoff_interval, + self._finish_backoff_block) def _rdy_retry(self, conn, value): conn.rdy_timeout = None @@ -361,13 +365,13 @@ def _rdy_retry(self, conn, value): def _send_rdy(self, conn, value): if conn.rdy_timeout: - self.ioloop.remove_timeout(conn.rdy_timeout) + self.io_loop.remove_timeout(conn.rdy_timeout) conn.rdy_timeout = None if value and self.disabled(): logging.info('[%s:%s] disabled, delaying RDY state change', conn.id, self.name) rdy_retry_callback = functools.partial(self._rdy_retry, conn, value) - conn.rdy_timeout = self.ioloop.add_timeout(time.time() + 15, rdy_retry_callback) + conn.rdy_timeout = self.io_loop.add_timeout(time.time() + 15, rdy_retry_callback) return if value > conn.max_rdy_count: @@ -378,7 +382,7 @@ def _send_rdy(self, conn, value): # if we're going from RDY 0 to non-0 and we couldn't because # of the configured max in flight, try again rdy_retry_callback = functools.partial(self._rdy_retry, conn, value) - conn.rdy_timeout = self.ioloop.add_timeout(time.time() + 5, rdy_retry_callback) + conn.rdy_timeout = self.io_loop.add_timeout(time.time() + 5, rdy_retry_callback) return if conn.send_rdy(value): @@ -467,7 +471,7 @@ def _on_connection_close(self, conn, **kwargs): self.need_rdy_redistributed = True if conn.rdy_timeout: - self.ioloop.remove_timeout(conn.rdy_timeout) + self.io_loop.remove_timeout(conn.rdy_timeout) conn.rdy_timeout = None if not self.lookupd_http_addresses: @@ -475,7 +479,7 @@ def _on_connection_close(self, conn, **kwargs): logging.info('[%s:%s] attempting to reconnect in 15s', conn.id, self.name) reconnect_callback = functools.partial(self.connect_to_nsqd, host=conn.host, port=conn.port) - self.ioloop.add_timeout(time.time() + 15, reconnect_callback) + self.io_loop.add_timeout(time.time() + 15, reconnect_callback) def query_lookupd(self): """ diff --git a/nsq/writer.py b/nsq/writer.py index 91541ce..224b3c3 100644 --- a/nsq/writer.py +++ b/nsq/writer.py @@ -4,8 +4,6 @@ import functools import random -import tornado.ioloop - from client import Client import nsq import async @@ -77,6 +75,8 @@ def __init__(self, handlers, **settings): :param \*\*kwargs: passed to :class:`nsq.AsyncConn` initialization """ def __init__(self, nsqd_tcp_addresses, **kwargs): + super(Writer, self).__init__(**kwargs) + if not isinstance(nsqd_tcp_addresses, (list, set, tuple)): assert isinstance(nsqd_tcp_addresses, (str, unicode)) nsqd_tcp_addresses = [nsqd_tcp_addresses] @@ -86,7 +86,7 @@ def __init__(self, nsqd_tcp_addresses, **kwargs): self.conns = {} self.conn_kwargs = kwargs - self.ioloop.add_callback(self._run) + self.io_loop.add_callback(self._run) def _run(self): logging.info('starting writer...') @@ -173,7 +173,7 @@ def _on_connection_close(self, conn, **kwargs): logging.info('[%s] attempting to reconnect in 15s', conn.id) reconnect_callback = functools.partial(self.connect_to_nsqd, host=conn.host, port=conn.port) - tornado.ioloop.IOLoop.instance().add_timeout(time.time() + 15, reconnect_callback) + self.io_loop.add_timeout(time.time() + 15, reconnect_callback) def _finish_pub(self, conn, data, command, topic, msg): if isinstance(data, nsq.Error): From 7aa488c1ff8d5af2458b4ec9f469afd431458082 Mon Sep 17 00:00:00 2001 From: Matt Reiferson Date: Sun, 16 Feb 2014 16:49:27 -0500 Subject: [PATCH 2/4] fix tests after io_loop refactoring --- tests/test_async.py | 42 ++++++++++++------------ tests/test_backoff.py | 76 ++++++++++++++++++------------------------- 2 files changed, 53 insertions(+), 65 deletions(-) diff --git a/tests/test_async.py b/tests/test_async.py index ab7c47b..85d98f4 100644 --- a/tests/test_async.py +++ b/tests/test_async.py @@ -3,7 +3,7 @@ import sys import struct -from mock import patch, create_autospec +from mock import patch, create_autospec, MagicMock from tornado.iostream import IOStream @@ -20,8 +20,8 @@ def f(*args, **kwargs): pass -def _get_test_conn(): - conn = nsq.async.AsyncConn('test', 4150) +def _get_test_conn(io_loop=None): + conn = nsq.async.AsyncConn('test', 4150, io_loop=io_loop) # now set the stream attribute, which is ordinarily set in conn.connect() conn.stream = create_autospec(IOStream) return conn @@ -87,23 +87,23 @@ def test_read_size(): def test_read_body(): - conn = _get_test_conn() + mock_io_loop = MagicMock() + + conn = _get_test_conn(io_loop=mock_io_loop) on_data = create_autospec(f) conn.on('data', on_data) - # I won't autospec the mock below, it doesn't seem to want to behave. - # I only assert against one of its attrs anyway, which I will spec - with patch('nsq.async.tornado.ioloop.IOLoop.instance') as mock_io_loop: - mock_ioloop_addcb = create_autospec(f) - mock_io_loop.return_value.add_callback = mock_ioloop_addcb - data = 'NSQ' - conn._read_body(data) - on_data.assert_called_once_with(conn=conn, data=data) - mock_ioloop_addcb.assert_called_once_with(conn._start_read) - - # now test functionality when the data_callback fails - on_data.reset_mock() - mock_ioloop_addcb.reset_mock() - on_data.return_value = Exception("Boom.") - conn._read_body(data) - # verify that we still added callback for the next start_read - mock_ioloop_addcb.assert_called_once_with(conn._start_read) + + mock_ioloop_addcb = create_autospec(f) + mock_io_loop.add_callback = mock_ioloop_addcb + data = 'NSQ' + conn._read_body(data) + on_data.assert_called_once_with(conn=conn, data=data) + mock_ioloop_addcb.assert_called_once_with(conn._start_read) + + # now test functionality when the data_callback fails + on_data.reset_mock() + mock_ioloop_addcb.reset_mock() + on_data.return_value = Exception("Boom.") + conn._read_body(data) + # verify that we still added callback for the next start_read + mock_ioloop_addcb.assert_called_once_with(conn._start_read) diff --git a/tests/test_backoff.py b/tests/test_backoff.py index cb21634..a80ace0 100644 --- a/tests/test_backoff.py +++ b/tests/test_backoff.py @@ -4,7 +4,8 @@ import random import time -from mock import Mock, patch +from mock import patch, create_autospec +from tornado.ioloop import IOLoop # shunt '..' into sys.path since we are in a 'tests' subdirectory base_dir = os.path.abspath(os.path.join(os.path.dirname(os.path.abspath(__file__)), '..')) @@ -21,11 +22,12 @@ def _message_handler(msg): msg.enable_async() -def _get_reader(): +def _get_reader(io_loop=None): return nsq.Reader("test", "test", message_handler=_message_handler, lookupd_http_addresses=["http://test.local:4161"], - max_in_flight=5) + max_in_flight=5, + io_loop=io_loop) def _get_conn(reader): @@ -50,12 +52,9 @@ def _get_message(conn): return msg -@patch('nsq.async.tornado.ioloop.IOLoop', autospec=True) -def test_backoff_easy(mock_ioloop): - instance = Mock() - mock_ioloop.instance.return_value = instance - - r = _get_reader() +def test_backoff_easy(): + mock_ioloop = create_autospec(IOLoop) + r = _get_reader(mock_ioloop) conn = _get_conn(r) msg = _send_message(conn) @@ -69,9 +68,9 @@ def test_backoff_easy(mock_ioloop): msg.trigger('requeue', message=msg) assert r.backoff_block is True assert r.backoff_timer.get_interval() > 0 - assert instance.add_timeout.called + assert mock_ioloop.add_timeout.called - timeout_args, timeout_kwargs = instance.add_timeout.call_args + timeout_args, timeout_kwargs = mock_ioloop.add_timeout.call_args timeout_args[1]() assert r.backoff_block is False send_args, send_kwargs = conn.stream.write.call_args @@ -93,16 +92,13 @@ def test_backoff_easy(mock_ioloop): assert conn.stream.write.call_args_list == [((arg,),) for arg in expected_args] -@patch('nsq.async.tornado.ioloop.IOLoop', autospec=True) -def test_backoff_hard(mock_ioloop): - expected_args = ['SUB test test\n', 'RDY 1\n', 'RDY 5\n'] - - instance = Mock() - mock_ioloop.instance.return_value = instance - - r = _get_reader() +def test_backoff_hard(): + mock_ioloop = create_autospec(IOLoop) + r = _get_reader(io_loop=mock_ioloop) conn = _get_conn(r) + expected_args = ['SUB test test\n', 'RDY 1\n', 'RDY 5\n'] + num_fails = 0 fail = True last_timeout_time = 0 @@ -124,9 +120,9 @@ def test_backoff_hard(mock_ioloop): assert r.backoff_block is True assert r.backoff_timer.get_interval() > 0 - assert instance.add_timeout.called + assert mock_ioloop.add_timeout.called - timeout_args, timeout_kwargs = instance.add_timeout.call_args + timeout_args, timeout_kwargs = mock_ioloop.add_timeout.call_args if timeout_args[0] != last_timeout_time: timeout_args[1]() last_timeout_time = timeout_args[0] @@ -142,7 +138,7 @@ def test_backoff_hard(mock_ioloop): msg.trigger('finish', message=msg) expected_args.append('FIN 1234\n') - timeout_args, timeout_kwargs = instance.add_timeout.call_args + timeout_args, timeout_kwargs = mock_ioloop.add_timeout.call_args if timeout_args[0] != last_timeout_time: timeout_args[1]() last_timeout_time = timeout_args[0] @@ -163,15 +159,11 @@ def test_backoff_hard(mock_ioloop): assert conn.stream.write.call_args_list == [((arg,),) for arg in expected_args] -@patch('nsq.async.tornado.ioloop.IOLoop', autospec=True) -def test_backoff_many_conns(mock_ioloop): - num_conns = 5 - - instance = Mock() - mock_ioloop.instance.return_value = instance - - r = _get_reader() +def test_backoff_many_conns(): + mock_ioloop = create_autospec(IOLoop) + r = _get_reader(io_loop=mock_ioloop) + num_conns = 5 conns = [] for i in range(num_conns): conn = _get_conn(r) @@ -208,9 +200,9 @@ def test_backoff_many_conns(mock_ioloop): assert r.backoff_block is True assert r.backoff_timer.get_interval() > 0 - assert instance.add_timeout.called + assert mock_ioloop.add_timeout.called - timeout_args, timeout_kwargs = instance.add_timeout.call_args + timeout_args, timeout_kwargs = mock_ioloop.add_timeout.call_args if timeout_args[0] != last_timeout_time: conn = timeout_args[1]() last_timeout_time = timeout_args[0] @@ -242,7 +234,7 @@ def test_backoff_many_conns(mock_ioloop): conn.expected_args.append('FIN 1234\n') - timeout_args, timeout_kwargs = instance.add_timeout.call_args + timeout_args, timeout_kwargs = mock_ioloop.add_timeout.call_args if timeout_args[0] != last_timeout_time: conn = timeout_args[1]() last_timeout_time = timeout_args[0] @@ -264,15 +256,11 @@ def test_backoff_many_conns(mock_ioloop): assert c.stream.write.call_args_list == [((arg,),) for arg in c.expected_args] -@patch('nsq.async.tornado.ioloop.IOLoop', autospec=True) -def test_backoff_conns_disconnect(mock_ioloop): - num_conns = 5 - - instance = Mock() - mock_ioloop.instance.return_value = instance - - r = _get_reader() +def test_backoff_conns_disconnect(): + mock_ioloop = create_autospec(IOLoop) + r = _get_reader(io_loop=mock_ioloop) + num_conns = 5 conns = [] for i in range(num_conns): conn = _get_conn(r) @@ -327,9 +315,9 @@ def test_backoff_conns_disconnect(mock_ioloop): assert r.backoff_block is True assert r.backoff_timer.get_interval() > 0 - assert instance.add_timeout.called + assert mock_ioloop.add_timeout.called - timeout_args, timeout_kwargs = instance.add_timeout.call_args + timeout_args, timeout_kwargs = mock_ioloop.add_timeout.call_args if timeout_args[0] != last_timeout_time: conn = timeout_args[1]() last_timeout_time = timeout_args[0] @@ -350,7 +338,7 @@ def test_backoff_conns_disconnect(mock_ioloop): conn.expected_args.append('FIN 1234\n') - timeout_args, timeout_kwargs = instance.add_timeout.call_args + timeout_args, timeout_kwargs = mock_ioloop.add_timeout.call_args if timeout_args[0] != last_timeout_time: conn = timeout_args[1]() last_timeout_time = timeout_args[0] From 84a9e0c6a6238fccf37db8fea584507bce3b41a4 Mon Sep 17 00:00:00 2001 From: Matt Reiferson Date: Sun, 16 Feb 2014 16:01:38 -0500 Subject: [PATCH 3/4] add reader integration tests --- tests/cert.pem | 26 ++++++++ tests/key.pem | 27 +++++++++ tests/test_async.py | 1 - tests/test_reader.py | 137 +++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 190 insertions(+), 1 deletion(-) create mode 100644 tests/cert.pem create mode 100644 tests/key.pem create mode 100644 tests/test_reader.py diff --git a/tests/cert.pem b/tests/cert.pem new file mode 100644 index 0000000..ed73acf --- /dev/null +++ b/tests/cert.pem @@ -0,0 +1,26 @@ +-----BEGIN CERTIFICATE----- +MIIEbjCCA1agAwIBAgIJAK6x7y6AwBmLMA0GCSqGSIb3DQEBBQUAMIGAMQswCQYD +VQQGEwJVUzERMA8GA1UECBMITmV3IFlvcmsxFjAUBgNVBAcTDU5ldyBZb3JrIENp +dHkxDDAKBgNVBAoTA05TUTETMBEGA1UEAxMKdGVzdC5sb2NhbDEjMCEGCSqGSIb3 +DQEJARYUbXJlaWZlcnNvbkBnbWFpbC5jb20wHhcNMTMwNjI4MDA0MzQ4WhcNMTYw +NDE3MDA0MzQ4WjCBgDELMAkGA1UEBhMCVVMxETAPBgNVBAgTCE5ldyBZb3JrMRYw +FAYDVQQHEw1OZXcgWW9yayBDaXR5MQwwCgYDVQQKEwNOU1ExEzARBgNVBAMTCnRl +c3QubG9jYWwxIzAhBgkqhkiG9w0BCQEWFG1yZWlmZXJzb25AZ21haWwuY29tMIIB +IjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAnX0KB+svwy+yHU2qggz/EaGg +craKShagKo+9M9y5HLM852ngk5c+t+tJJbx3N954Wr1FXBuGIv1ltU05rU4zhvBS +25tVP1UIEnT5pBt2TeetLkl199Y7fxh1hKmnwJMG3fy3VZdNXEndBombXMmtXpQY +shuEJHKeUNDbQKz5X+GjEdkTPO/HY/VMHsxS23pbSimQozMg3hvLIdgv0aS3QECz +ydZBgTPThy3uDtHIuCpxCwXd/vDF68ATlYgo3h3lh2vxNwM/pjklIUhzMh4XaKQF +7m3/0KbtUcXfy0QHueeuMr11E9MAFNyRN4xf9Fk1yB97KJ3PJBTC5WD/m1nW+QID +AQABo4HoMIHlMB0GA1UdDgQWBBR3HMBws4lmYYSIgwoZsfW+bbgaMjCBtQYDVR0j +BIGtMIGqgBR3HMBws4lmYYSIgwoZsfW+bbgaMqGBhqSBgzCBgDELMAkGA1UEBhMC +VVMxETAPBgNVBAgTCE5ldyBZb3JrMRYwFAYDVQQHEw1OZXcgWW9yayBDaXR5MQww +CgYDVQQKEwNOU1ExEzARBgNVBAMTCnRlc3QubG9jYWwxIzAhBgkqhkiG9w0BCQEW +FG1yZWlmZXJzb25AZ21haWwuY29tggkArrHvLoDAGYswDAYDVR0TBAUwAwEB/zAN +BgkqhkiG9w0BAQUFAAOCAQEANOYTbanW2iyV1v4oYpcM/y3TWcQKzSME8D2SGFZb +dbMYU81hH3TTlQdvyeh3FAcdjhKE8Xi/RfNNjEslTBscdKXePGpZg6eXRNJzPP5K +KZPf5u6tcpAeUOKrMqbGwbE+h2QixxG1EoVQtE421szsU2P7nHRTdHzKFRnOerfl +Phm3NocR0P40Rv7WKdxpOvqc+XKf0onTruoVYoPWGpwcLixCG0zu4ZQ23/L/Dy18 +4u70Hbq6O/6kq9FBFaDNp3IhiEdu2Cq6ZplU6bL9XDF27KIEErHwtuqBHVlMG+zB +oH/k9vZvwH7OwAjHdKp+1yeZFLYC8K5hjFIHqcdwpZCNIg== +-----END CERTIFICATE----- diff --git a/tests/key.pem b/tests/key.pem new file mode 100644 index 0000000..9b4db2e --- /dev/null +++ b/tests/key.pem @@ -0,0 +1,27 @@ +-----BEGIN RSA PRIVATE KEY----- +MIIEogIBAAKCAQEAnX0KB+svwy+yHU2qggz/EaGgcraKShagKo+9M9y5HLM852ng +k5c+t+tJJbx3N954Wr1FXBuGIv1ltU05rU4zhvBS25tVP1UIEnT5pBt2TeetLkl1 +99Y7fxh1hKmnwJMG3fy3VZdNXEndBombXMmtXpQYshuEJHKeUNDbQKz5X+GjEdkT +PO/HY/VMHsxS23pbSimQozMg3hvLIdgv0aS3QECzydZBgTPThy3uDtHIuCpxCwXd +/vDF68ATlYgo3h3lh2vxNwM/pjklIUhzMh4XaKQF7m3/0KbtUcXfy0QHueeuMr11 +E9MAFNyRN4xf9Fk1yB97KJ3PJBTC5WD/m1nW+QIDAQABAoIBACvtfKbIywG+hAf4 +ad7skRjx5DcbA2e29+XnQfb9UgTXWd2SgrmoLi5OypBkCTzkKN3mfTo70yZfV8dC +Sxwz+9tfnTz0DssjhKThS+CiaFVCkeOfSfBfKSlCQUVHrSrh18CDhP+yvDlJwQTZ +zSQMfPcsh9bmJe2kqtQP7ZgUp1o+vaB8Sju8YYrO6FllxbdLRGm4pfvvrHIRRmXa +oVHn0ei0JpwoTY9kHYht4LNeJnbP/MCWdmcuv3Gnel7jAlhaKab5aNIGr0Xe7aIQ +iX6mpZ0/Rnt8o/XcTOg8l3ruIdVuySX6SYn08JMnfFkXdNYRVhoV1tC5ElWkaZLf +hPmj2yECgYEAyts0R0b8cZ6HTAyuLm3ilw0s0v0/MM9ZtaqMRilr2WEtAhF0GpHG +TzmGnii0WcTNXD7NTsNcECR/0ZpXPRleMczsL2Juwd4FkQ37h7hdKPseJNrfyHRg +VolOFBX9H14C3wMB9cwdsG4Egw7fE27WCoreEquHgwFxl1zBrXKH088CgYEAxr8w +BKZs0bF7LRrFT5pH8hpMLYHMYk8ZIOfgmEGVBKDQCOERPR9a9kqUss7wl/98LVNK +RnFlyWD6Z0/QcQsLL4LjBeZJ25qEMc6JXm9VGAzhXA1ZkUofVoYCnG+f6KUn8CuJ +/AcV2ZDFsEP10IiQG0hKsceXiwFEvEr8306tMrcCgYBLgnscSR0xAeyk71dq6vZc +ecgEpcX+2kAvclOSzlpZ6WVCjtKkDT0/Qk+M0eQIQkybGLl9pxS+4Yc+s2/jy2yX +pwsHvGE0AvwZeZX2eDcdSRR4bYy9ZixyKdwJeAHnyivRbaIuJ5Opl9pQGpoI9snv +1K9DTdw8dK4exKVHdgl/WwKBgDkmLsuXg4EEtPOyV/xc08VVNIR9Z2T5c7NXmeiO +KyiKiWeUOF3ID2L07S9BfENozq9F3PzGjMtMXJSqibiHwW6nB1rh7mj8VHjx9+Q0 +xVZGFeNfX1r84mgB3uxW2LeQDhzsmB/lda37CC14TU3qhu2hawEV8IijE73FHlOk +Dv+fAoGAI4/XO5o5tNn5Djo8gHmGMCbinUE9+VySxl7wd7PK8w2VSofO88ofixDk +NX94yBYhg5WZcLdPm45RyUnq+WVQYz9IKUrdxLFTH+wxyzUqZCW7jgXCvWV+071q +vqm9C+kndq+18/1VKuCSGWnF7Ay4lbsgPXY2s4VKRxcb3QpZSPU= +-----END RSA PRIVATE KEY----- diff --git a/tests/test_async.py b/tests/test_async.py index 85d98f4..4f44610 100644 --- a/tests/test_async.py +++ b/tests/test_async.py @@ -6,7 +6,6 @@ from mock import patch, create_autospec, MagicMock from tornado.iostream import IOStream - # shunt '..' into sys.path since we are in a 'tests' subdirectory base_dir = os.path.abspath(os.path.join(os.path.dirname(os.path.abspath(__file__)), '..')) if base_dir not in sys.path: diff --git a/tests/test_reader.py b/tests/test_reader.py new file mode 100644 index 0000000..7bfe94d --- /dev/null +++ b/tests/test_reader.py @@ -0,0 +1,137 @@ +import os +import sys +import signal +import subprocess +import time +import ssl + +import tornado.testing + +# shunt '..' into sys.path since we are in a 'tests' subdirectory +base_dir = os.path.abspath(os.path.join(os.path.dirname(os.path.abspath(__file__)), '..')) +if base_dir not in sys.path: + sys.path.insert(0, base_dir) + +import nsq + + +class ReaderIntegrationTest(tornado.testing.AsyncTestCase): + identify_options = { + 'user_agent': 'sup', + 'snappy': True, + 'tls_v1': True, + 'tls_options': {'cert_reqs': ssl.CERT_NONE}, + 'heartbeat_interval': 10, + 'output_buffer_size': 4096, + 'output_buffer_timeout': 50 + } + + def setUp(self): + super(ReaderIntegrationTest, self).setUp() + self.processes = [] + proc = subprocess.Popen(['nsqd', '--verbose', '--snappy', + '--tls-key=%s/tests/key.pem' % base_dir, + '--tls-cert=%s/tests/cert.pem' % base_dir]) + self.processes.append(proc) + time.sleep(1) + + def tearDown(self): + super(ReaderIntegrationTest, self).tearDown() + for proc in self.processes: + os.kill(proc.pid, signal.SIGKILL) + proc.wait() + + def test_conn_identify(self): + c = nsq.async.AsyncConn('127.0.0.1', 4150, io_loop=self.io_loop) + c.on('identify_response', self.stop) + c.connect() + response = self.wait() + print response + assert response['conn'] is c + assert isinstance(response['data'], dict) + + def test_conn_identify_options(self): + c = nsq.async.AsyncConn('127.0.0.1', 4150, io_loop=self.io_loop, + **self.identify_options) + c.on('identify_response', self.stop) + c.connect() + response = self.wait() + print response + assert response['conn'] is c + assert isinstance(response['data'], dict) + assert response['data']['snappy'] is True + assert response['data']['tls_v1'] is True + # assert response['data']['user_agent'] == 'sup' + # assert response['data']['output_buffer_size'] == 4096 + # assert response['data']['output_buffer_timeout'] == 50 + + def test_conn_subscribe(self): + topic = 'test_conn_suscribe_%s' % time.time() + c = nsq.async.AsyncConn('127.0.0.1', 4150, io_loop=self.io_loop, + **self.identify_options) + + def _on_ready(*args, **kwargs): + c.on('response', self.stop) + c.send(nsq.subscribe(topic, 'ch')) + + c.on('ready', _on_ready) + c.connect() + response = self.wait() + print response + assert response['conn'] is c + assert response['data'] == 'OK' + + def _send_messages(self, topic, count, body): + c = nsq.async.AsyncConn('127.0.0.1', 4150, io_loop=self.io_loop) + c.connect() + + def _on_ready(*args, **kwargs): + for i in range(count): + c.send(nsq.pub(topic, body)) + + c.on('ready', _on_ready) + + def test_conn_messages(self): + self.msg_count = 0 + + topic = 'test_conn_suscribe_%s' % time.time() + self._send_messages(topic, 5, 'sup') + + c = nsq.async.AsyncConn('127.0.0.1', 4150, io_loop=self.io_loop, + **self.identify_options) + + def _on_message(*args, **kwargs): + self.msg_count += 1 + if c.rdy == 0: + self.stop() + + def _on_ready(*args, **kwargs): + c.on('message', _on_message) + c.send(nsq.subscribe(topic, 'ch')) + c.send_rdy(5) + + c.on('ready', _on_ready) + c.connect() + + self.wait() + assert self.msg_count == 5 + + def test_reader_messages(self): + self.msg_count = 0 + num_messages = 500 + + topic = 'test_reader_msgs_%s' % time.time() + self._send_messages(topic, num_messages, 'sup') + + def handler(msg): + assert msg.body == 'sup' + self.msg_count += 1 + if self.msg_count >= num_messages: + self.stop() + return True + + nsq.Reader(nsqd_tcp_addresses=['127.0.0.1:4150'], topic=topic, channel='ch', + io_loop=self.io_loop, message_handler=handler, max_in_flight=100, + **self.identify_options) + + self.wait() From 809ca152ea7782089ff3bcff7658199f84a8bbcb Mon Sep 17 00:00:00 2001 From: Matt Reiferson Date: Sun, 16 Feb 2014 16:04:06 -0500 Subject: [PATCH 4/4] install nsq in travis --- .travis.yml | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/.travis.yml b/.travis.yml index 4832bfe..c856a66 100644 --- a/.travis.yml +++ b/.travis.yml @@ -8,10 +8,15 @@ env: - TORNADO_VERSION=3.1.1 - TORNADO_VERSION=3.2 install: - - "pip install simplejson" - - "export PYCURL_SSL_LIBRARY=openssl" - - "pip install pycurl" - - "pip install tornado==$TORNADO_VERSION" + - pip install simplejson + - export PYCURL_SSL_LIBRARY=openssl + - pip install pycurl + - pip install tornado==$TORNADO_VERSION + - sudo apt-get install libsnappy-dev + - pip install python-snappy + - wget https://github.com/bitly/nsq/releases/download/v0.2.26/nsq-0.2.26.linux-amd64.go1.2.tar.gz + - tar zxvf nsq-0.2.26.linux-amd64.go1.2.tar.gz + - sudo cp nsq-0.2.26.linux-amd64.go1.2/bin/nsqd nsq-0.2.26.linux-amd64.go1.2/bin/nsqlookupd /usr/local/bin script: py.test notifications: email: false