Skip to content

Commit

Permalink
Alternative implementation of zmq.socket.pubsub.Dist to resolve issue z…
Browse files Browse the repository at this point in the history
  • Loading branch information
fbacchella committed Jan 28, 2023
1 parent d39421e commit 6bb2279
Showing 1 changed file with 60 additions and 123 deletions.
183 changes: 60 additions & 123 deletions src/main/java/zmq/socket/pubsub/Dist.java
Original file line number Diff line number Diff line change
@@ -1,216 +1,153 @@
package zmq.socket.pubsub;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

import zmq.Msg;
import zmq.pipe.Pipe;

public class Dist
{
private static class State {
boolean matching;
boolean active;
boolean eligible;
}

// List of outbound pipes.
private final List<Pipe> pipes;
private final Map<Pipe, State> pipes;

// Number of all the pipes to send the next message to.
private int matching;
// private int matching;

// Number of active pipes. All the active pipes are located at the
// beginning of the pipes array. These are the pipes the messages
// can be sent to at the moment.
private int active;
//private int active;

// Number of pipes eligible for sending messages to. This includes all
// the active pipes plus all the pipes that we can in theory send
// messages to (the HWM is not yet reached), but sending a message
// to them would result in partial message being delivered, ie. message
// with initial parts missing.
private int eligible;
//private int eligible;

// True if last we are in the middle of a multipart message.
private boolean more;

public Dist()
{
matching = 0;
active = 0;
eligible = 0;
more = false;
pipes = new ArrayList<>();
pipes = new HashMap<>();
}

// Adds the pipe to the distributor object.
public void attach(Pipe pipe)
{
State newState = new State();
State oldState = pipes.put(pipe, newState);
assert oldState == null;
newState.eligible = true;
// If we are in the middle of sending a message, we'll add new pipe
// into the list of eligible pipes. Otherwise we add it to the list
// into the list of eligible pipes. Otherwise, we add it to the list
// of active pipes.
if (more) {
pipes.add(pipe);
Collections.swap(pipes, eligible, pipes.size() - 1);
eligible++;
}
else {
pipes.add(pipe);
Collections.swap(pipes, active, pipes.size() - 1);
active++;
eligible++;
}
newState.active = ! more;
newState.matching = false;
}

// Mark the pipe as matching. Subsequent call to sendToMatching
// will send message also to this pipe.
public void match(Pipe pipe)
{
int idx = pipes.indexOf(pipe);
// If pipe is already matching do nothing.
if (idx < matching) {
return;
}

// If the pipe isn't eligible, ignore it.
if (idx >= eligible) {
return;
}

// Mark the pipe as matching.
Collections.swap(pipes, idx, matching);
matching++;
pipes.get(pipe).matching = pipes.get(pipe).eligible;
}

// Mark all pipes as non-matching.
public void unmatch()
{
matching = 0;
pipes.values().forEach(s -> s.matching = false);

}

// Removes the pipe from the distributor object.
public void terminated(Pipe pipe)
{
// Remove the pipe from the list; adjust number of matching, active and/or
// eligible pipes accordingly.
if (pipes.indexOf(pipe) < matching) {
Collections.swap(pipes, pipes.indexOf(pipe), matching - 1);
matching--;
}
if (pipes.indexOf(pipe) < active) {
Collections.swap(pipes, pipes.indexOf(pipe), active - 1);
active--;
}
if (pipes.indexOf(pipe) < eligible) {
Collections.swap(pipes, pipes.indexOf(pipe), eligible - 1);
eligible--;
}
pipes.remove(pipe);
}

// Activates pipe that have previously reached high watermark.
public void activated(Pipe pipe)
{
// Move the pipe from passive to eligible state.
if (eligible < pipes.size()) {
Collections.swap(pipes, pipes.indexOf(pipe), eligible);
eligible++;
}

State s = pipes.get(pipe);
s.eligible = true;
// If there's no message being sent at the moment, move it to
// the active state.
if (!more && active < pipes.size()) {
Collections.swap(pipes, eligible - 1, active);
active++;
}
s.active = !more;
}

// Send the message to all the outbound pipes.
public boolean sendToAll(Msg msg)
{
matching = active;
return sendToMatching(msg);
}

// Send the message to the matching outbound pipes.
public boolean sendToMatching(Msg msg)
{
private void sendMsg(Msg msg, Predicate<State> checkWrite) {
// Is this end of a multipart message?
boolean msgMore = msg.hasMore();

// Push the message to matching pipes.
distribute(msg);

// If multipart message is fully sent, activate all the eligible pipes.
if (!msgMore) {
active = eligible;
}

pipes.entrySet().stream().forEach(e -> {
State s = e.getValue();
Pipe p = e.getKey();
boolean activate = true;
if (checkWrite.test(s)) {
if (!p.write(msg)) {
s.matching = false;
activate = false;
s.eligible = false;
} else {
s.matching = true;
if (!msgMore) {
p.flush();
}
}
}
s.active = activate;
});
more = msgMore;

return true;
}

// Put the message to all active pipes.
private void distribute(Msg msg)
// Send the message to all the outbound pipes.
public boolean sendToAll(Msg msg)
{
// If there are no matching pipes available, simply drop the message.
if (matching == 0) {
return;
}

// TODO isVsm

// Push copy of the message to each matching pipe.
for (int idx = 0; idx < matching; ++idx) {
if (!write(pipes.get(idx), msg)) {
--idx; // Retry last write because index will have been swapped
}
}
sendMsg(msg, s -> s.active);
return true;
}

public boolean hasOut()
// Send the message to the matching outbound pipes.
public boolean sendToMatching(Msg msg)
{
sendMsg(msg, s -> s.matching && s.active);
return true;
}

// Write the message to the pipe. Make the pipe inactive if writing
// fails. In such a case false is returned.
private boolean write(Pipe pipe, Msg msg)
public boolean hasOut()
{
if (!pipe.write(msg)) {
Collections.swap(pipes, pipes.indexOf(pipe), matching - 1);
matching--;
Collections.swap(pipes, pipes.indexOf(pipe), active - 1);
active--;
Collections.swap(pipes, active, eligible - 1);
eligible--;
return false;
}
if (!msg.hasMore()) {
pipe.flush();
}
return true;
}

public boolean checkHwm()
{
for (int idx = 0; idx < matching; ++idx) {
if (!pipes.get(idx).checkHwm()) {
return false;
}
}
return true;
return pipes.keySet().stream().map(Pipe::checkHwm).reduce(true, Boolean::logicalAnd);
}

int active()
{
return active;
return pipes.values().stream().map(s -> s.active ? 1 : 0).reduce(0, Integer::sum);
}

int eligible()
{
return eligible;
return pipes.values().stream().map(s -> s.eligible ? 1 : 0).reduce(0, Integer::sum);
}

int matching()
{
return matching;
return pipes.values().stream().map(s -> s.matching ? 1 : 0).reduce(0, Integer::sum);
}
}

0 comments on commit 6bb2279

Please sign in to comment.