Skip to content

Commit

Permalink
Simplify and improve key generation when working in cluster mode.
Browse files Browse the repository at this point in the history
Instead of randomly generating keys and storing them per shard until
we get a key which matches our desired shard, generate one which will
always match.
This improved performance and reduces memory overhead when working with
a lot of primaries.
  • Loading branch information
ushachar committed Feb 7, 2023
1 parent 15edf14 commit 30b1e91
Show file tree
Hide file tree
Showing 3 changed files with 868 additions and 88 deletions.
108 changes: 24 additions & 84 deletions cluster_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
#include "memtier_benchmark.h"
#include "obj_gen.h"
#include "shard_connection.h"
#include "crc16_slottable.h"

#define KEY_INDEX_QUEUE_MAX_SIZE 1000000

Expand Down Expand Up @@ -100,24 +101,13 @@ static inline uint16_t crc16(const char *buf, size_t len) {
return crc;
}

static uint32_t calc_hslot_crc16_cluster(const char *str, size_t length)
{
uint32_t rv = (uint32_t) crc16(str, length) & MAX_CLUSTER_HSLOT;
return rv;
}

///////////////////////////////////////////////////////////////////////////////////////////////////////

cluster_client::cluster_client(client_group* group) : client(group)
{
}

cluster_client::~cluster_client() {
for (unsigned int i = 0; i < m_key_index_pools.size(); i++) {
key_index_pool* key_idx_pool = m_key_index_pools[i];
delete key_idx_pool;
}
m_key_index_pools.clear();
}

int cluster_client::connect(void) {
Expand All @@ -128,11 +118,6 @@ int cluster_client::connect(void) {
// set main connection to send 'CLUSTER SLOTS' command
sc->set_cluster_slots();

// create key index pool for main connection
key_index_pool* key_idx_pool = new key_index_pool;
m_key_index_pools.push_back(key_idx_pool);
assert(m_connections.size() == m_key_index_pools.size());

// continue with base class
client::connect();

Expand Down Expand Up @@ -166,22 +151,10 @@ shard_connection* cluster_client::create_shard_connection(abstract_protocol* abs

m_connections.push_back(sc);

// create key index pool
key_index_pool* key_idx_pool = new key_index_pool;
assert(key_idx_pool != NULL);

m_key_index_pools.push_back(key_idx_pool);
assert(m_connections.size() == m_key_index_pools.size());

return sc;
}

bool cluster_client::connect_shard_connection(shard_connection* sc, char* address, char* port) {
// empty key index queue
if (m_key_index_pools[sc->get_id()]->size()) {
key_index_pool empty_queue;
std::swap(*m_key_index_pools[sc->get_id()], empty_queue);
}

// save address and port
sc->set_address_port(address, port);
Expand Down Expand Up @@ -224,9 +197,12 @@ void cluster_client::handle_cluster_slots(protocol_response *r) {
*/
unsigned long prev_connections_size = m_connections.size();
std::vector<bool> close_sc(prev_connections_size, true);
for (unsigned int i = 0; i < MAX_SLOTS; i++) {
m_conn_to_init_slot[i] = UINT16_MAX;
}

// run over response and create connections
for (unsigned int i=0; i<r->get_mbulk_value()->mbulks_elements.size(); i++) {
for (unsigned int i = 0; i < r->get_mbulk_value()->mbulks_elements.size(); i++) {
// create connection
mbulk_size_el* shard = r->get_mbulk_value()->mbulks_elements[i]->as_mbulk_size();

Expand Down Expand Up @@ -273,17 +249,26 @@ void cluster_client::handle_cluster_slots(protocol_response *r) {
connect_shard_connection(sc, addr, port);
}

// update range
unsigned int sc_id = sc->get_id();
// Set the initial slot for this shard connection
if (m_conn_to_init_slot[sc_id] == UINT16_MAX) {
m_conn_to_init_slot[sc_id] = min_slot;
}
for (int j = min_slot; j <= max_slot; j++) {
m_slot_to_shard[j] = sc->get_id();
if (j < max_slot) {
m_slot_lists[j] = j+1;
} else {
// Close the loop - point the last index to the first one owned by the shard connection
m_slot_lists[j] = m_conn_to_init_slot[sc_id];
}
}

free(addr);
free(port);
}

// check if some connections left with no slots, and need to be closed
for (unsigned int i=0; i < prev_connections_size; i++) {
for (unsigned int i = 0; i < prev_connections_size; i++) {
if ((close_sc[i] == true) &&
(m_connections[i]->get_connection_state() != conn_disconnected)) {

Expand All @@ -299,8 +284,7 @@ bool cluster_client::hold_pipeline(unsigned int conn_id) {

// don't exceed requests
if (m_config->requests) {
if (m_key_index_pools[conn_id]->empty() &&
m_reqs_generated >= m_config->requests) {
if (m_reqs_generated >= m_config->requests) {
return true;
}
}
Expand All @@ -309,53 +293,13 @@ bool cluster_client::hold_pipeline(unsigned int conn_id) {
}

bool cluster_client::get_key_for_conn(unsigned int conn_id, int iter, unsigned long long* key_index) {
// first check if we already have key in pool
if (!m_key_index_pools[conn_id]->empty()) {
*key_index = m_key_index_pools[conn_id]->front();
m_key_len = snprintf(m_key_buffer, sizeof(m_key_buffer)-1, "%s%llu", m_obj_gen->get_key_prefix(), *key_index);

m_key_index_pools[conn_id]->pop();
return true;
}

// keep generate key till it match for this connection, or requests reached
while (true) {
// generate key
*key_index = m_obj_gen->get_key_index(iter);
m_key_len = snprintf(m_key_buffer, sizeof(m_key_buffer)-1, "%s%llu", m_obj_gen->get_key_prefix(), *key_index);

unsigned int hslot = calc_hslot_crc16_cluster(m_key_buffer, m_key_len);

// check if the key match for this connection
if (m_slot_to_shard[hslot] == conn_id) {
m_reqs_generated++;
return true;
}

// handle key for other connection
unsigned int other_conn_id = m_slot_to_shard[hslot];

// in case we generated key for connection that is disconnected, 'slot to shard' map may need to be updated
if (m_connections[other_conn_id]->get_connection_state() == conn_disconnected) {
m_connections[conn_id]->set_cluster_slots();
return false;
}

// in case connection is during cluster slots command, his slots mapping not relevant
if (m_connections[other_conn_id]->get_cluster_slots_state() != setup_done)
continue;

// store key for other connection, if queue is not full
key_index_pool* key_idx_pool = m_key_index_pools[other_conn_id];
if (key_idx_pool->size() < KEY_INDEX_QUEUE_MAX_SIZE) {
key_idx_pool->push(*key_index);
m_reqs_generated++;
}

// don't exceed requests
if (m_config->requests > 0 && m_reqs_generated >= m_config->requests)
return false;
}
*key_index = m_obj_gen->get_key_index(iter);
m_key_len = snprintf(m_key_buffer, sizeof(m_key_buffer)-1, "%s{%s}%llu",
m_obj_gen->get_key_prefix(), crc16_slot_table[m_conn_to_init_slot[conn_id]], *key_index);
m_conn_to_init_slot[conn_id] = m_slot_lists[m_conn_to_init_slot[conn_id]];
m_reqs_generated++;
return true;
}

// This function could use some urgent TLC -- but we need to do it without altering the behavior
Expand Down Expand Up @@ -432,10 +376,6 @@ void cluster_client::handle_moved(unsigned int conn_id, struct timeval timestamp
if (m_connections[conn_id]->get_cluster_slots_state() != setup_done)
return;

// queue may stored uncorrected mapping indexes, empty them
key_index_pool empty_queue;
std::swap(*m_key_index_pools[conn_id], empty_queue);

// set connection to send 'CLUSTER SLOTS' command
m_connections[conn_id]->set_cluster_slots();
}
Expand Down
13 changes: 9 additions & 4 deletions cluster_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,20 @@
#include <set>
#include "client.h"

typedef std::queue<unsigned long long> key_index_pool;
#define MAX_SLOTS 16384

// forward decleration
// forward declaration
class shard_connection;

class cluster_client : public client {
protected:
std::vector<key_index_pool*> m_key_index_pools;
unsigned int m_slot_to_shard[16384];
/*
* Stores the first slot owned by the indexed connection.
* Since we connect only to primaries we can have at most 16K distinct connections...
*/
uint16_t m_conn_to_init_slot[MAX_SLOTS];
// An index-linked array used to store circular lists of slots, one for each shard returned by the SLOTS command.
uint16_t m_slot_lists[MAX_SLOTS];

char m_key_buffer[250];
int m_key_len;
Expand Down
Loading

0 comments on commit 30b1e91

Please sign in to comment.