Use pipelines when dumping #53

Open · wants to merge 1 commit into master
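For context on the title: instead of paying one network round trip per key, the patch batches commands through redis-py pipelines. A minimal sketch of the idea, assuming standard redis-py and hypothetical keys (not code from this patch):

import redis

r = redis.Redis()  # localhost:6379, db 0

# without a pipeline: one network round trip per command
types = [r.type(key) for key in (b'k1', b'k2', b'k3')]

# with a pipeline: commands are buffered client-side and sent together;
# execute() returns the replies in command order
p = r.pipeline()
for key in (b'k1', b'k2', b'k3'):
    p.type(key)
types = p.execute()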
102 changes: 75 additions & 27 deletions redisdl.py
@@ -141,18 +141,18 @@ def dumps(host='localhost', port=6379, password=None, db=0, pretty=False,
 class BytesWriteWrapper(object):
     def __init__(self, stream):
         self.stream = stream
 
     def write(self, str):
         return self.stream.write(str.encode())
 
 def dump(fp, host='localhost', port=6379, password=None, db=0, pretty=False,
          unix_socket_path=None, encoding='utf-8', keys='*'):
 
     try:
         fp.write('')
     except TypeError:
         fp = BytesWriteWrapper(fp)
 
     if pretty:
         # hack to avoid implementing pretty printing
         fp.write(dumps(host=host, port=port, password=password, db=db,
@@ -276,28 +276,76 @@ def _read_key(key, r, pretty, encoding):
     return (type, ttl, value)
 
 def _reader(r, pretty, encoding, keys='*'):
-    for encoded_key in r.keys(keys):
-        key = encoded_key.decode(encoding)
-        handled = False
-        for i in range(10):
-            try:
-                type, ttl, value = _read_key(encoded_key, r, pretty, encoding)
-                yield key, type, ttl, value
-                handled = True
-                break
-            except KeyDeletedError:
-                # do not dump the key
-                handled = True
-                break
-            except redis.WatchError:
-                # same logic as key type changed
-                pass
-            except KeyTypeChangedError:
-                # retry reading type again
-                pass
-        if not handled:
-            # ran out of retries
-            raise ConcurrentModificationError('Key %s is being concurrently modified' % key)
+    encoded_keys = r.keys(keys)
+    i = 0
+    while i < len(encoded_keys):
+        for key, type, ttl, value in _read_keys(r, encoded_keys[i:i+10000],
+                                                pretty=pretty, encoding=encoding):
+            yield key, type, ttl, value
+        i += 10000
+
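+# note: _reader above hands keys to _read_keys in chunks of 10000, so each
+# pipeline (and the reply buffer it accumulates) stays bounded no matter
+# how large the database is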
+def _read_keys(r, encoded_keys, pretty, encoding):
+    decoded_keys = [encoded_key.decode(encoding) for encoded_key in encoded_keys]
+    do_keys = decoded_keys
+    retries = 5
+    type_results = None
+    while len(do_keys) > 0 and retries > 0:
+        next_do_keys = []
+        next_type_results = []
+
+        if type_results is None:
+            # first pass, need to get the types.
+            # on subsequent passes we know the types
+            # because the previous pass retrieved them and
+            # found a type mismatch
+            p = r.pipeline()
+            for key in do_keys:
+                p.type(key)
+            # decode immediately so first-pass entries (bytes from redis) and
+            # retry-pass entries (already-decoded strings) compare consistently
+            type_results = [t.decode('ascii') for t in p.execute()]
+
+        p = r.pipeline()
+        for i in range(len(do_keys)):
+            key = do_keys[i]
+            type = type_results[i]
+            if type == 'none':
+                # key was deleted by a concurrent operation on the data store.
+                # issue noops so that the number of results does not change
+                p.type(key)
+                p.type(key)
+                p.type(key)
+                continue
+            reader = readers.get(type)
+            if reader is None:
+                raise UnknownTypeError("Unknown key type: %s" % type)
+            # each key contributes exactly three results: the reader's single
+            # command, the ttl, and a type re-check
+            reader.send_command(p, key)
+            r.pttl_or_ttl_pipeline(p, key)
+            p.type(key)
+        results = p.execute()
+
+        for i in range(len(do_keys)):
+            key = do_keys[i]
+            original_type = type_results[i]
+            if original_type == 'none':
+                # this is where we actually skip a key that was deleted
+                # by concurrent operations
+                continue
+            final_type = results[i*3+2].decode('ascii')
+            if original_type != final_type:
+                # type changed, will retry
+                next_do_keys.append(key)
+                # need to update expected type
+                next_type_results.append(final_type)
+                continue
+            reader = readers.get(original_type)
+            value = reader.handle_response(results[i*3], pretty, encoding)
+            ttl = r.decode_pttl_or_ttl_pipeline_value(results[i*3+1])
+            yield key, final_type, ttl, value
+        retries -= 1
+        do_keys = next_do_keys
+        type_results = next_type_results
+
+    if len(do_keys) > 0:
+        raise ConcurrentModificationError('Keys %s are being concurrently modified' % ', '.join(do_keys))
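+
+# note: a key's type is observed before the read and re-checked by the TYPE
+# command appended to the same pipeline as the value and TTL; only keys whose
+# type changed in between are retried (at most 5 passes), replacing the
+# WATCH-based per-key retry loop removed above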

 def _empty(r):
     for key in r.keys():
@@ -372,14 +420,14 @@ def ijson_top_level_items(file, local_streaming_backend):
 class TextReadWrapper(object):
     def __init__(self, fp):
         self.fp = fp
 
     def read(self, *args, **kwargs):
         return self.fp.read(*args, **kwargs).decode()
 
 class BytesReadWrapper(object):
     def __init__(self, fp):
         self.fp = fp
 
     def read(self, *args, **kwargs):
         return self.fp.read(*args, **kwargs).encode('utf-8')
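For reference, a minimal usage sketch of the entry point this patch touches (module name assumed from the file redisdl.py; all arguments shown are the defaults from the diff):

import redisdl

with open('dump.json', 'w') as f:
    redisdl.dump(f, host='localhost', port=6379, db=0, keys='*')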