Merge pull request #66 from mreiferson/integration_tests_66
better end-to-end tests
jehiah committed Feb 17, 2014
2 parents 8f1423d + 809ca15 commit 28190a3
Showing 10 changed files with 291 additions and 95 deletions.
13 changes: 9 additions & 4 deletions .travis.yml
@@ -8,10 +8,15 @@ env:
- TORNADO_VERSION=3.1.1
- TORNADO_VERSION=3.2
install:
- "pip install simplejson"
- "export PYCURL_SSL_LIBRARY=openssl"
- "pip install pycurl"
- "pip install tornado==$TORNADO_VERSION"
- pip install simplejson
- export PYCURL_SSL_LIBRARY=openssl
- pip install pycurl
- pip install tornado==$TORNADO_VERSION
- sudo apt-get install libsnappy-dev
- pip install python-snappy
- wget https://github.com/bitly/nsq/releases/download/v0.2.26/nsq-0.2.26.linux-amd64.go1.2.tar.gz
- tar zxvf nsq-0.2.26.linux-amd64.go1.2.tar.gz
- sudo cp nsq-0.2.26.linux-amd64.go1.2/bin/nsqd nsq-0.2.26.linux-amd64.go1.2/bin/nsqlookupd /usr/local/bin
script: py.test
notifications:
email: false
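
Note: the new install steps fetch prebuilt nsqd and nsqlookupd binaries onto the PATH so that py.test can exercise real end-to-end publish/consume paths instead of only mocked connections. Below is a minimal sketch of how a test might spin up a throwaway nsqd; the helper name, ports, and flags are illustrative assumptions, not the fixtures actually added by this PR.

# hypothetical sketch: launch a disposable nsqd for an end-to-end test,
# assuming the binary installed above is on PATH
import subprocess
import time

def start_nsqd(tcp_port=4150, http_port=4151, data_path='/tmp'):
    proc = subprocess.Popen([
        'nsqd',
        '--tcp-address=127.0.0.1:%d' % tcp_port,
        '--http-address=127.0.0.1:%d' % http_port,
        '--data-path=%s' % data_path,
    ])
    time.sleep(0.5)  # crude wait for the daemon to begin listening
    return proc

nsqd = start_nsqd()
try:
    pass  # run publish/consume assertions against 127.0.0.1:4150 here
finally:
    nsqd.terminate()
    nsqd.wait()
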
14 changes: 9 additions & 5 deletions nsq/async.py
@@ -86,7 +86,8 @@ class AsyncConn(EventedMixin):
"""
def __init__(self, host, port, timeout=1.0, heartbeat_interval=30, requeue_delay=90,
tls_v1=False, tls_options=None, snappy=False, user_agent=None,
output_buffer_size=16 * 1024, output_buffer_timeout=250, sample_rate=0):
output_buffer_size=16 * 1024, output_buffer_timeout=250, sample_rate=0,
io_loop=None):
assert isinstance(host, (str, unicode))
assert isinstance(port, int)
assert isinstance(timeout, float)
@@ -118,6 +119,9 @@ def __init__(self, host, port, timeout=1.0, heartbeat_interval=30, requeue_delay
self.short_hostname = self.hostname.split('.')[0]
self.heartbeat_interval = heartbeat_interval * 1000
self.requeue_delay = requeue_delay
self.io_loop = io_loop
if not self.io_loop:
self.io_loop = tornado.ioloop.IOLoop.instance()

self.output_buffer_size = output_buffer_size
self.output_buffer_timeout = output_buffer_timeout
@@ -144,7 +148,7 @@ def connect(self):
self.socket.settimeout(self.timeout)
self.socket.setblocking(0)

self.stream = tornado.iostream.IOStream(self.socket)
self.stream = tornado.iostream.IOStream(self.socket, io_loop=self.io_loop)
self.stream.set_close_callback(self._socket_close)

self.state = 'CONNECTING'
@@ -183,7 +187,7 @@ def _read_body(self, data):
self.trigger('data', conn=self, data=data)
except Exception:
logging.exception('uncaught exception in data event')
tornado.ioloop.IOLoop.instance().add_callback(self._start_read)
self.io_loop.add_callback(self._start_read)

def send(self, data):
self.stream.write(data)
@@ -196,7 +200,7 @@ def upgrade_to_tls(self, options=None):
# first remove the event handler for the currently open socket
# so that when we add the socket to the new SSLIOStream below,
# it can re-add the appropriate event handlers.
tornado.ioloop.IOLoop.instance().remove_handler(self.socket.fileno())
self.io_loop.remove_handler(self.socket.fileno())

opts = {
'cert_reqs': ssl.CERT_REQUIRED,
@@ -206,7 +210,7 @@ def upgrade_to_tls(self, options=None):
self.socket = ssl.wrap_socket(self.socket, ssl_version=ssl.PROTOCOL_TLSv1,
do_handshake_on_connect=False, **opts)

self.stream = tornado.iostream.SSLIOStream(self.socket)
self.stream = tornado.iostream.SSLIOStream(self.socket, io_loop=self.io_loop)
self.stream.set_close_callback(self._socket_close)

# now that the IOStream has been swapped we can kickstart
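
The upshot of the nsq/async.py changes is that AsyncConn no longer hard-codes tornado.ioloop.IOLoop.instance(): the IOStream, SSLIOStream, add_callback, and remove_handler calls are all bound to the io_loop passed in (or to the singleton when none is given). A rough sketch of using the new keyword follows, assuming an nsqd reachable at 127.0.0.1:4150; it is illustrative, not an example shipped with this PR.

# sketch: bind a connection to an explicitly created IOLoop instead of
# the global singleton (useful for tests and embedding)
import tornado.ioloop
import nsq.async

io_loop = tornado.ioloop.IOLoop()
conn = nsq.async.AsyncConn('127.0.0.1', 4150, io_loop=io_loop)
conn.connect()   # the IOStream created inside is attached to io_loop
io_loop.start()  # run this loop; IOLoop.instance() is never touched
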
10 changes: 8 additions & 2 deletions nsq/client.py
@@ -5,8 +5,14 @@


class Client(object):
def __init__(self):
tornado.ioloop.PeriodicCallback(self._check_last_recv_timestamps, 60 * 1000).start()
def __init__(self, io_loop=None, **kwargs):
self.io_loop = io_loop
if not self.io_loop:
self.io_loop = tornado.ioloop.IOLoop.instance()

tornado.ioloop.PeriodicCallback(self._check_last_recv_timestamps,
60 * 1000,
io_loop=self.io_loop).start()

def _on_connection_identify(self, conn, data, **kwargs):
logging.info('[%s:%s] IDENTIFY sent %r' % (conn.id, self.name, data))
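
Client now takes an io_loop keyword and only falls back to IOLoop.instance() when it is omitted, so existing callers are unchanged while Reader and Writer can forward an explicit loop through **kwargs. A small illustrative sketch of the two paths (constructing Client directly is for demonstration only; it is normally used via its subclasses):

# sketch: explicit loop vs. singleton fallback
import tornado.ioloop
import nsq.client

loop = tornado.ioloop.IOLoop()
explicit = nsq.client.Client(io_loop=loop)   # periodic recv check bound to `loop`
assert explicit.io_loop is loop

implicit = nsq.client.Client()               # falls back to the global singleton
assert implicit.io_loop is tornado.ioloop.IOLoop.instance()
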
32 changes: 18 additions & 14 deletions nsq/reader.py
@@ -124,6 +124,8 @@ def __init__(self, topic, channel, message_handler=None, name=None,
max_tries=5, max_in_flight=1, lookupd_poll_interval=60,
low_rdy_idle_timeout=10, max_backoff_duration=128, lookupd_poll_jitter=0.3,
**kwargs):
super(Reader, self).__init__(**kwargs)

assert isinstance(topic, (str, unicode)) and len(topic) > 0
assert isinstance(channel, (str, unicode)) and len(channel) > 0
assert isinstance(max_in_flight, int) and max_in_flight > 0
@@ -175,12 +177,10 @@ def __init__(self, topic, channel, message_handler=None, name=None,

self.conns = {}
self.connection_attempts = {}
self.http_client = tornado.httpclient.AsyncHTTPClient()

self.ioloop = tornado.ioloop.IOLoop.instance()
self.http_client = tornado.httpclient.AsyncHTTPClient(io_loop=self.io_loop)

# will execute when run() is called (for all Reader instances)
self.ioloop.add_callback(self._run)
self.io_loop.add_callback(self._run)

def _run(self):
assert self.message_handler, "you must specify the Reader's message_handler"
@@ -191,19 +191,23 @@ def _run(self):
address, port = addr.split(':')
self.connect_to_nsqd(address, int(port))

tornado.ioloop.PeriodicCallback(self._redistribute_rdy_state, 5 * 1000).start()
tornado.ioloop.PeriodicCallback(self._redistribute_rdy_state,
5 * 1000,
io_loop=self.io_loop).start()

if not self.lookupd_http_addresses:
return
# trigger the first lookup query manually
self.query_lookupd()

periodic = tornado.ioloop.PeriodicCallback(self.query_lookupd,
self.lookupd_poll_interval * 1000)
self.lookupd_poll_interval * 1000,
io_loop=self.io_loop)

# randomize the time we start this poll loop so that all
# consumers don't query at exactly the same time
delay = random.random() * self.lookupd_poll_interval * self.lookupd_poll_jitter
self.ioloop.add_timeout(time.time() + delay, periodic.start)
self.io_loop.add_timeout(time.time() + delay, periodic.start)

def set_message_handler(self, message_handler):
"""
@@ -352,22 +356,22 @@ def _start_backoff_block(self):
for c in self.conns.values():
self._send_rdy(c, 0)

self.backoff_timeout = self.ioloop.add_timeout(time.time() + backoff_interval,
self._finish_backoff_block)
self.backoff_timeout = self.io_loop.add_timeout(time.time() + backoff_interval,
self._finish_backoff_block)

def _rdy_retry(self, conn, value):
conn.rdy_timeout = None
self._send_rdy(conn, value)

def _send_rdy(self, conn, value):
if conn.rdy_timeout:
self.ioloop.remove_timeout(conn.rdy_timeout)
self.io_loop.remove_timeout(conn.rdy_timeout)
conn.rdy_timeout = None

if value and self.disabled():
logging.info('[%s:%s] disabled, delaying RDY state change', conn.id, self.name)
rdy_retry_callback = functools.partial(self._rdy_retry, conn, value)
conn.rdy_timeout = self.ioloop.add_timeout(time.time() + 15, rdy_retry_callback)
conn.rdy_timeout = self.io_loop.add_timeout(time.time() + 15, rdy_retry_callback)
return

if value > conn.max_rdy_count:
@@ -378,7 +382,7 @@ def _send_rdy(self, conn, value):
# if we're going from RDY 0 to non-0 and we couldn't because
# of the configured max in flight, try again
rdy_retry_callback = functools.partial(self._rdy_retry, conn, value)
conn.rdy_timeout = self.ioloop.add_timeout(time.time() + 5, rdy_retry_callback)
conn.rdy_timeout = self.io_loop.add_timeout(time.time() + 5, rdy_retry_callback)
return

if conn.send_rdy(value):
@@ -467,15 +471,15 @@ def _on_connection_close(self, conn, **kwargs):
self.need_rdy_redistributed = True

if conn.rdy_timeout:
self.ioloop.remove_timeout(conn.rdy_timeout)
self.io_loop.remove_timeout(conn.rdy_timeout)
conn.rdy_timeout = None

if not self.lookupd_http_addresses:
# automatically reconnect to nsqd addresses when not using lookupd
logging.info('[%s:%s] attempting to reconnect in 15s', conn.id, self.name)
reconnect_callback = functools.partial(self.connect_to_nsqd,
host=conn.host, port=conn.port)
self.ioloop.add_timeout(time.time() + 15, reconnect_callback)
self.io_loop.add_timeout(time.time() + 15, reconnect_callback)

def query_lookupd(self):
"""
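
With these Reader changes, every timeout, periodic callback, and the lookupd AsyncHTTPClient is scheduled on self.io_loop (inherited from Client), so a consumer can be pinned to an explicitly constructed loop. A rough usage sketch; the topic, channel, and address are placeholders and not taken from this PR.

# sketch: run a Reader on a dedicated IOLoop; io_loop is consumed by the
# Client base class via **kwargs
import tornado.ioloop
import nsq

io_loop = tornado.ioloop.IOLoop()

def handler(message):
    print message.body
    return True

r = nsq.Reader(topic='test_topic', channel='test_channel',
               message_handler=handler,
               nsqd_tcp_addresses=['127.0.0.1:4150'],
               io_loop=io_loop)
io_loop.start()
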
8 changes: 4 additions & 4 deletions nsq/writer.py
@@ -4,8 +4,6 @@
import functools
import random

import tornado.ioloop

from client import Client
import nsq
import async
@@ -77,6 +75,8 @@ def __init__(self, handlers, **settings):
:param \*\*kwargs: passed to :class:`nsq.AsyncConn` initialization
"""
def __init__(self, nsqd_tcp_addresses, **kwargs):
super(Writer, self).__init__(**kwargs)

if not isinstance(nsqd_tcp_addresses, (list, set, tuple)):
assert isinstance(nsqd_tcp_addresses, (str, unicode))
nsqd_tcp_addresses = [nsqd_tcp_addresses]
@@ -86,7 +86,7 @@ def __init__(self, nsqd_tcp_addresses, **kwargs):
self.conns = {}
self.conn_kwargs = kwargs

self.ioloop.add_callback(self._run)
self.io_loop.add_callback(self._run)

def _run(self):
logging.info('starting writer...')
@@ -173,7 +173,7 @@ def _on_connection_close(self, conn, **kwargs):
logging.info('[%s] attempting to reconnect in 15s', conn.id)
reconnect_callback = functools.partial(self.connect_to_nsqd,
host=conn.host, port=conn.port)
tornado.ioloop.IOLoop.instance().add_timeout(time.time() + 15, reconnect_callback)
self.io_loop.add_timeout(time.time() + 15, reconnect_callback)

def _finish_pub(self, conn, data, command, topic, msg):
if isinstance(data, nsq.Error):
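
Writer follows the same pattern: the 15s reconnect timeout is now scheduled on self.io_loop, which is also what lets the module-level tornado.ioloop import be dropped. Below is a hedged sketch of a Writer bound to an explicit loop, modeled on the README-style periodic publish; the topic and address are placeholders.

# sketch: publish on a dedicated IOLoop rather than the global one
import tornado.ioloop
import nsq

io_loop = tornado.ioloop.IOLoop()
writer = nsq.Writer(nsqd_tcp_addresses=['127.0.0.1:4150'], io_loop=io_loop)

def finish_pub(conn, data):
    print data

def pub_message():
    writer.pub('test_topic', 'hello world', finish_pub)

tornado.ioloop.PeriodicCallback(pub_message, 1000, io_loop=io_loop).start()
io_loop.start()
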
26 changes: 26 additions & 0 deletions tests/cert.pem
@@ -0,0 +1,26 @@
-----BEGIN CERTIFICATE-----
MIIEbjCCA1agAwIBAgIJAK6x7y6AwBmLMA0GCSqGSIb3DQEBBQUAMIGAMQswCQYD
VQQGEwJVUzERMA8GA1UECBMITmV3IFlvcmsxFjAUBgNVBAcTDU5ldyBZb3JrIENp
dHkxDDAKBgNVBAoTA05TUTETMBEGA1UEAxMKdGVzdC5sb2NhbDEjMCEGCSqGSIb3
DQEJARYUbXJlaWZlcnNvbkBnbWFpbC5jb20wHhcNMTMwNjI4MDA0MzQ4WhcNMTYw
NDE3MDA0MzQ4WjCBgDELMAkGA1UEBhMCVVMxETAPBgNVBAgTCE5ldyBZb3JrMRYw
FAYDVQQHEw1OZXcgWW9yayBDaXR5MQwwCgYDVQQKEwNOU1ExEzARBgNVBAMTCnRl
c3QubG9jYWwxIzAhBgkqhkiG9w0BCQEWFG1yZWlmZXJzb25AZ21haWwuY29tMIIB
IjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAnX0KB+svwy+yHU2qggz/EaGg
craKShagKo+9M9y5HLM852ngk5c+t+tJJbx3N954Wr1FXBuGIv1ltU05rU4zhvBS
25tVP1UIEnT5pBt2TeetLkl199Y7fxh1hKmnwJMG3fy3VZdNXEndBombXMmtXpQY
shuEJHKeUNDbQKz5X+GjEdkTPO/HY/VMHsxS23pbSimQozMg3hvLIdgv0aS3QECz
ydZBgTPThy3uDtHIuCpxCwXd/vDF68ATlYgo3h3lh2vxNwM/pjklIUhzMh4XaKQF
7m3/0KbtUcXfy0QHueeuMr11E9MAFNyRN4xf9Fk1yB97KJ3PJBTC5WD/m1nW+QID
AQABo4HoMIHlMB0GA1UdDgQWBBR3HMBws4lmYYSIgwoZsfW+bbgaMjCBtQYDVR0j
BIGtMIGqgBR3HMBws4lmYYSIgwoZsfW+bbgaMqGBhqSBgzCBgDELMAkGA1UEBhMC
VVMxETAPBgNVBAgTCE5ldyBZb3JrMRYwFAYDVQQHEw1OZXcgWW9yayBDaXR5MQww
CgYDVQQKEwNOU1ExEzARBgNVBAMTCnRlc3QubG9jYWwxIzAhBgkqhkiG9w0BCQEW
FG1yZWlmZXJzb25AZ21haWwuY29tggkArrHvLoDAGYswDAYDVR0TBAUwAwEB/zAN
BgkqhkiG9w0BAQUFAAOCAQEANOYTbanW2iyV1v4oYpcM/y3TWcQKzSME8D2SGFZb
dbMYU81hH3TTlQdvyeh3FAcdjhKE8Xi/RfNNjEslTBscdKXePGpZg6eXRNJzPP5K
KZPf5u6tcpAeUOKrMqbGwbE+h2QixxG1EoVQtE421szsU2P7nHRTdHzKFRnOerfl
Phm3NocR0P40Rv7WKdxpOvqc+XKf0onTruoVYoPWGpwcLixCG0zu4ZQ23/L/Dy18
4u70Hbq6O/6kq9FBFaDNp3IhiEdu2Cq6ZplU6bL9XDF27KIEErHwtuqBHVlMG+zB
oH/k9vZvwH7OwAjHdKp+1yeZFLYC8K5hjFIHqcdwpZCNIg==
-----END CERTIFICATE-----
27 changes: 27 additions & 0 deletions tests/key.pem
@@ -0,0 +1,27 @@
-----BEGIN RSA PRIVATE KEY-----
MIIEogIBAAKCAQEAnX0KB+svwy+yHU2qggz/EaGgcraKShagKo+9M9y5HLM852ng
k5c+t+tJJbx3N954Wr1FXBuGIv1ltU05rU4zhvBS25tVP1UIEnT5pBt2TeetLkl1
99Y7fxh1hKmnwJMG3fy3VZdNXEndBombXMmtXpQYshuEJHKeUNDbQKz5X+GjEdkT
PO/HY/VMHsxS23pbSimQozMg3hvLIdgv0aS3QECzydZBgTPThy3uDtHIuCpxCwXd
/vDF68ATlYgo3h3lh2vxNwM/pjklIUhzMh4XaKQF7m3/0KbtUcXfy0QHueeuMr11
E9MAFNyRN4xf9Fk1yB97KJ3PJBTC5WD/m1nW+QIDAQABAoIBACvtfKbIywG+hAf4
ad7skRjx5DcbA2e29+XnQfb9UgTXWd2SgrmoLi5OypBkCTzkKN3mfTo70yZfV8dC
Sxwz+9tfnTz0DssjhKThS+CiaFVCkeOfSfBfKSlCQUVHrSrh18CDhP+yvDlJwQTZ
zSQMfPcsh9bmJe2kqtQP7ZgUp1o+vaB8Sju8YYrO6FllxbdLRGm4pfvvrHIRRmXa
oVHn0ei0JpwoTY9kHYht4LNeJnbP/MCWdmcuv3Gnel7jAlhaKab5aNIGr0Xe7aIQ
iX6mpZ0/Rnt8o/XcTOg8l3ruIdVuySX6SYn08JMnfFkXdNYRVhoV1tC5ElWkaZLf
hPmj2yECgYEAyts0R0b8cZ6HTAyuLm3ilw0s0v0/MM9ZtaqMRilr2WEtAhF0GpHG
TzmGnii0WcTNXD7NTsNcECR/0ZpXPRleMczsL2Juwd4FkQ37h7hdKPseJNrfyHRg
VolOFBX9H14C3wMB9cwdsG4Egw7fE27WCoreEquHgwFxl1zBrXKH088CgYEAxr8w
BKZs0bF7LRrFT5pH8hpMLYHMYk8ZIOfgmEGVBKDQCOERPR9a9kqUss7wl/98LVNK
RnFlyWD6Z0/QcQsLL4LjBeZJ25qEMc6JXm9VGAzhXA1ZkUofVoYCnG+f6KUn8CuJ
/AcV2ZDFsEP10IiQG0hKsceXiwFEvEr8306tMrcCgYBLgnscSR0xAeyk71dq6vZc
ecgEpcX+2kAvclOSzlpZ6WVCjtKkDT0/Qk+M0eQIQkybGLl9pxS+4Yc+s2/jy2yX
pwsHvGE0AvwZeZX2eDcdSRR4bYy9ZixyKdwJeAHnyivRbaIuJ5Opl9pQGpoI9snv
1K9DTdw8dK4exKVHdgl/WwKBgDkmLsuXg4EEtPOyV/xc08VVNIR9Z2T5c7NXmeiO
KyiKiWeUOF3ID2L07S9BfENozq9F3PzGjMtMXJSqibiHwW6nB1rh7mj8VHjx9+Q0
xVZGFeNfX1r84mgB3uxW2LeQDhzsmB/lda37CC14TU3qhu2hawEV8IijE73FHlOk
Dv+fAoGAI4/XO5o5tNn5Djo8gHmGMCbinUE9+VySxl7wd7PK8w2VSofO88ofixDk
NX94yBYhg5WZcLdPm45RyUnq+WVQYz9IKUrdxLFTH+wxyzUqZCW7jgXCvWV+071q
vqm9C+kndq+18/1VKuCSGWnF7Ay4lbsgPXY2s4VKRxcb3QpZSPU=
-----END RSA PRIVATE KEY-----
43 changes: 21 additions & 22 deletions tests/test_async.py
@@ -3,10 +3,9 @@
import sys

import struct
from mock import patch, create_autospec
from mock import patch, create_autospec, MagicMock
from tornado.iostream import IOStream


# shunt '..' into sys.path since we are in a 'tests' subdirectory
base_dir = os.path.abspath(os.path.join(os.path.dirname(os.path.abspath(__file__)), '..'))
if base_dir not in sys.path:
@@ -20,8 +19,8 @@ def f(*args, **kwargs):
pass


def _get_test_conn():
conn = nsq.async.AsyncConn('test', 4150)
def _get_test_conn(io_loop=None):
conn = nsq.async.AsyncConn('test', 4150, io_loop=io_loop)
# now set the stream attribute, which is ordinarily set in conn.connect()
conn.stream = create_autospec(IOStream)
return conn
@@ -87,23 +86,23 @@ def test_read_size():


def test_read_body():
conn = _get_test_conn()
mock_io_loop = MagicMock()

conn = _get_test_conn(io_loop=mock_io_loop)
on_data = create_autospec(f)
conn.on('data', on_data)
# I won't autospec the mock below, it doesn't seem to want to behave.
# I only assert against one of its attrs anyway, which I will spec
with patch('nsq.async.tornado.ioloop.IOLoop.instance') as mock_io_loop:
mock_ioloop_addcb = create_autospec(f)
mock_io_loop.return_value.add_callback = mock_ioloop_addcb
data = 'NSQ'
conn._read_body(data)
on_data.assert_called_once_with(conn=conn, data=data)
mock_ioloop_addcb.assert_called_once_with(conn._start_read)

# now test functionality when the data_callback fails
on_data.reset_mock()
mock_ioloop_addcb.reset_mock()
on_data.return_value = Exception("Boom.")
conn._read_body(data)
# verify that we still added callback for the next start_read
mock_ioloop_addcb.assert_called_once_with(conn._start_read)

mock_ioloop_addcb = create_autospec(f)
mock_io_loop.add_callback = mock_ioloop_addcb
data = 'NSQ'
conn._read_body(data)
on_data.assert_called_once_with(conn=conn, data=data)
mock_ioloop_addcb.assert_called_once_with(conn._start_read)

# now test functionality when the data_callback fails
on_data.reset_mock()
mock_ioloop_addcb.reset_mock()
on_data.return_value = Exception("Boom.")
conn._read_body(data)
# verify that we still added callback for the next start_read
mock_ioloop_addcb.assert_called_once_with(conn._start_read)