Skip to content

Commit

Permalink
Merge branch 'master-2.x-local-dev' into 'master-2.x'
Browse files Browse the repository at this point in the history
update version to 2.1.0

See merge request changming.xie/tcc-transaction!2
  • Loading branch information
changming.xie committed Mar 7, 2024
2 parents 08c8c7d + e5c7f80 commit 47f6bae
Show file tree
Hide file tree
Showing 29 changed files with 1,250 additions and 22 deletions.
3 changes: 2 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@

<properties>
<java.version>1.8</java.version>
<revision>2.0.2</revision>
<revision>2.1.0</revision>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.target>${java.version}</maven.compiler.target>
<maven.compiler.source>${java.version}</maven.compiler.source>
Expand All @@ -59,6 +59,7 @@
<module>tcc-transaction-core</module>
<module>tcc-transaction-spring</module>
<module>tcc-transaction-dashboard</module>
<module>tcc-transaction-prometheus-exporter</module>
<module>tcc-transaction-server</module>
<module>tcc-transaction-dubbo</module>
<module>tcc-transaction-bom</module>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.mengyun.tcctransaction.serializer.TccRemotingCommandSerializer;
import org.mengyun.tcctransaction.serializer.TccTransactionStoreSerializer;
import org.mengyun.tcctransaction.serializer.TransactionStoreSerializer;
import org.mengyun.tcctransaction.stats.StatsManager;
import org.mengyun.tcctransaction.storage.StorageType;
import org.mengyun.tcctransaction.storage.TransactionStorage;
import org.mengyun.tcctransaction.storage.TransactionStorageFactory;
Expand Down Expand Up @@ -73,6 +74,8 @@ public class TccServer implements TccService {

private List<RegistryService> registryServices;

private StatsManager statsManager;

public TccServer(ServerConfig serverConfig) {
if (serverConfig != null) {
this.serverConfig = serverConfig;
Expand All @@ -85,7 +88,11 @@ public TccServer(ServerConfig serverConfig) {
throw new SystemException(String.format("unsupported StorageType<%s> in server side.", this.serverConfig.getStorageType().value()));
}

this.remotingServer = new NettyRemotingServer(this.remotingCommandSerializer, this.serverConfig);
String instance = StringUtils.isNotEmpty(this.serverConfig.getRegistryAddress())
? this.serverConfig.getRegistryAddress() : (NetUtils.getLocalAddress() + ":" + this.serverConfig.getListenPort());
this.statsManager = new StatsManager(instance);

this.remotingServer = new NettyRemotingServer(this.remotingCommandSerializer, this.serverConfig, this.statsManager);

this.registryServices = RegistryFactory.getInstance(this.serverConfig);

Expand Down Expand Up @@ -231,6 +238,10 @@ public RecoveryExecutor getRecoveryExecutor() {
return recoveryExecutor;
}

public StatsManager getStatsManager() {
return statsManager;
}

@ChannelHandler.Sharable
class UnregisterScheduleHandler extends ChannelInboundHandlerAdapter {
@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
package org.mengyun.tcctransaction.constants;

import java.util.HashMap;
import java.util.Map;

public class RemotingServiceCode {

public static final int SYSTEM = 0;
public static final int CREATE = 1;
public static final int UPDATE = 2;
public static final int DELETE = 3;
Expand All @@ -10,6 +15,24 @@ public class RemotingServiceCode {
public static final int REGISTER = 7;
public static final int DESERIALIZE_TRANSACTION = 8;

private static Map<Integer, String> codeToDesc = new HashMap<>();

static {
codeToDesc.put(SYSTEM, "SYSTEM");
codeToDesc.put(CREATE, "CREATE");
codeToDesc.put(UPDATE, "UPDATE");
codeToDesc.put(DELETE, "DELETE");
codeToDesc.put(FIND, "FIND");
codeToDesc.put(RECOVER_COMMIT, "RECOVER_COMMIT");
codeToDesc.put(RECOVER_ROLLBACK, "RECOVER_ROLLBACK");
codeToDesc.put(REGISTER, "REGISTER");
codeToDesc.put(DESERIALIZE_TRANSACTION, "DESERIALIZE_TRANSACTION");
}

private RemotingServiceCode() {
}

public static String getDesc(int code) {
return codeToDesc.getOrDefault(code, "UNKNOWN");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,20 @@ public void registerScheduleAndStartIfNotPresent(String domain) {
}

public Scheduler registerScheduleIfNotPresent(String domain) {
if (!schedulers.containsKey(domain)) {
synchronized (RecoveryScheduler.class) {
if (!schedulers.containsKey(domain)) {
Scheduler scheduler = createScheduler(domain);
scheduleJob(scheduler, domain);
schedulers.put(domain, scheduler);
}
Scheduler scheduler = schedulers.get(domain);
if (scheduler != null) {
return scheduler;
}
synchronized (RecoveryScheduler.class) {
scheduler = schedulers.get(domain);
if (scheduler != null) {
return scheduler;
}
scheduler = createScheduler(domain);
scheduleJob(scheduler, domain);
schedulers.put(domain, scheduler);
return scheduler;
}
return schedulers.get(domain);
}

public void unregisterSchedule(String domain) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import org.mengyun.tcctransaction.remoting.RequestProcessor;
import org.mengyun.tcctransaction.remoting.protocol.RemotingCommand;
import org.mengyun.tcctransaction.remoting.protocol.RemotingCommandCode;
import org.mengyun.tcctransaction.stats.StatsManager;
import org.mengyun.tcctransaction.utils.NetUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -26,10 +27,11 @@ public abstract class AbstractNettyRemoting {
new HashMap<>(64);
protected Pair<RequestProcessor<ChannelHandlerContext>, ExecutorService> defaultRequestProcessor;

protected StatsManager statsManager;

protected ConcurrentMap<Integer /* requestId */, ResponseFuture> responseTable = new ConcurrentHashMap<>(256);

protected void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand cmd) {

if (cmd.isRequestCommand()) {
processRequestCommand(ctx, cmd);
} else {
Expand All @@ -54,7 +56,7 @@ private void processRequestCommand(ChannelHandlerContext ctx, RemotingCommand cm
Runnable run = () -> {
try {
RemotingCommand responseCommand = pair.getLeft().processRequest(ctx, cmd);

responseCommand.setOriginalCommand(cmd);
responseCommand.setCode(RemotingCommandCode.SERVICE_RESP);
responseCommand.setRequestId(cmd.getRequestId());
ctx.writeAndFlush(responseCommand);
Expand All @@ -65,6 +67,7 @@ private void processRequestCommand(ChannelHandlerContext ctx, RemotingCommand cm
StringWriter errors = new StringWriter();
e.printStackTrace(new PrintWriter(errors));
RemotingCommand responseCommand = RemotingCommand.createCommand(RemotingCommandCode.SYSTEM_EXCEPTION_RESP, errors.toString());
responseCommand.setOriginalCommand(cmd);
responseCommand.setRequestId(cmd.getRequestId());
ctx.writeAndFlush(responseCommand);
}
Expand All @@ -80,6 +83,7 @@ private void processRequestCommand(ChannelHandlerContext ctx, RemotingCommand cm

final RemotingCommand response = RemotingCommand.createCommand(RemotingCommandCode.SYSTEM_BUSY_RESP,
"[OVERLOAD]system busy, start flow control for a while");
response.setOriginalCommand(cmd);
response.setRequestId(cmd.getRequestId());
ctx.writeAndFlush(response);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@
import org.mengyun.tcctransaction.remoting.exception.RemotingSendRequestException;
import org.mengyun.tcctransaction.remoting.exception.RemotingTimeoutException;
import org.mengyun.tcctransaction.remoting.protocol.RemotingCommand;
import org.mengyun.tcctransaction.remoting.protocol.RemotingCommandCode;
import org.mengyun.tcctransaction.serializer.RemotingCommandSerializer;
import org.mengyun.tcctransaction.stats.StatsManager;
import org.mengyun.tcctransaction.support.FactoryBuilder;
import org.mengyun.tcctransaction.utils.NetUtils;
import org.slf4j.Logger;
Expand Down Expand Up @@ -54,10 +56,11 @@ public class NettyRemotingServer extends AbstractNettyRemoting implements Remoti

private RemotingCommandSerializer serializer;

public NettyRemotingServer(RemotingCommandSerializer serializer, NettyServerConfig nettyServerConfig) {
public NettyRemotingServer(RemotingCommandSerializer serializer, NettyServerConfig nettyServerConfig, StatsManager statsManager) {
this.nettyServerConfig = nettyServerConfig;
this.serializer = serializer;
this.serverBootstrap = new ServerBootstrap();
this.statsManager = statsManager;

if (useEpoll()) {

Expand Down Expand Up @@ -141,6 +144,7 @@ protected void initChannel(SocketChannel ch) throws Exception {
new NettyEncoder(serializer),
new ReadTimeoutHandler(nettyServerConfig.getChannelIdleTimeoutSeconds()),
new HeartBeatRespHandler(),
new NettyServerMetricsHandler(statsManager),
new NettyServerHandler()
);

Expand Down Expand Up @@ -241,7 +245,38 @@ class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand cmd) throws Exception {
ServerFlowMonitor.count();
cmd.setReceiveTime(System.currentTimeMillis());
processMessageReceived(ctx, cmd);
}
}

@ChannelHandler.Sharable
class NettyServerMetricsHandler extends ChannelOutboundHandlerAdapter {
private StatsManager statsManage;

public NettyServerMetricsHandler(StatsManager statsManage) {
this.statsManage = statsManage;
}

@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
try {
if (msg instanceof RemotingCommand) {
RemotingCommand remotingCommand = (RemotingCommand) msg;
if (remotingCommand.getCode() == RemotingCommandCode.SERVICE_RESP && remotingCommand.getOriginalCommand() != null) {
RemotingCommand originCommand = remotingCommand.getOriginalCommand();
statsManage.incRpcRequestCost(originCommand.getServiceCode(), System.currentTimeMillis() - originCommand.getReceiveTime());
if (RemotingCommandCode.isFail(remotingCommand.getCode())) {
statsManage.incFailRpcRequestNum(originCommand.getServiceCode());
} else {
statsManage.incSuccessRpcRequestNum(originCommand.getServiceCode());
}
}
}
} catch (Exception e) {
logger.warn("failed to record stats data", e);
}
super.write(ctx, msg, promise);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,13 @@ public class RemotingCommand {
private int serviceCode;
private byte[] body;

/**
* temporary information used for statistics
*/
private transient long receiveTime;

private transient RemotingCommand originalCommand;

public static RemotingCommand createServiceResponseCommand(String remark) {
RemotingCommand cmd = new RemotingCommand();
cmd.setCode(RemotingCommandCode.SERVICE_RESP);
Expand Down Expand Up @@ -71,4 +78,20 @@ public String getRemark() {
public void setRemark(String remark) {
this.remark = remark;
}

public long getReceiveTime() {
return receiveTime;
}

public void setReceiveTime(long receiveTime) {
this.receiveTime = receiveTime;
}

public RemotingCommand getOriginalCommand() {
return originalCommand;
}

public void setOriginalCommand(RemotingCommand originalCommand) {
this.originalCommand = originalCommand;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ public static RemotingCommandCode valueOf(byte value) {
throw new IllegalArgumentException(String.format("unknown RemotingCommand Type of value :%d", value));
}

public static boolean isFail(RemotingCommandCode code) {
return code == RemotingCommandCode.SYSTEM_BUSY_RESP || code == RemotingCommandCode.SYSTEM_EXCEPTION_RESP;
}

public byte value() {
return this.value;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package org.mengyun.tcctransaction.stats;

import java.util.concurrent.atomic.LongAdder;

/**
* @author Nervose.Wu
* @date 2024/1/19 14:14
*/
public class CounterStatsItem extends StatsItem {
private final LongAdder count;

public CounterStatsItem(String statsName, String statsKey) {
super(statsName, statsKey);
this.count = new LongAdder();
}

public void record(long inc) {
count.add(inc);
}

public long getCount() {
return count.sum();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package org.mengyun.tcctransaction.stats;

import java.util.Arrays;
import java.util.concurrent.atomic.LongAdder;

/**
* @author Nervose.Wu
* @date 2024/1/29 11:14
*/
public class HistogramStatsItem extends StatsItem {
private final LongAdder sum;

private final long[] buckets;
private final LongAdder[] distribution;


public HistogramStatsItem(String statsName, String statsKey, long[] buckets) {
super(statsName, statsKey);
this.buckets = buckets;
this.sum = new LongAdder();
this.distribution = new LongAdder[buckets.length];
Arrays.setAll(this.distribution, key -> new LongAdder());
}

public void record(long inc) {
for (int i = 0; i < buckets.length; i++) {
if (inc <= buckets[i]) {
distribution[i].add(1);
break;
}
}
sum.add(inc);
}

public long getSum() {
return sum.sum();
}

public long[] getDistribution() {
return Arrays.stream(distribution).mapToLong(LongAdder::sum).toArray();
}
}
Loading

0 comments on commit 47f6bae

Please sign in to comment.