diff --git a/pom.xml b/pom.xml index 8fb7387e..131720d4 100644 --- a/pom.xml +++ b/pom.xml @@ -34,7 +34,7 @@ 1.8 - 2.0.2 + 2.1.0 UTF-8 ${java.version} ${java.version} @@ -59,6 +59,7 @@ tcc-transaction-core tcc-transaction-spring tcc-transaction-dashboard + tcc-transaction-prometheus-exporter tcc-transaction-server tcc-transaction-dubbo tcc-transaction-bom diff --git a/tcc-transaction-core/src/main/java/org/mengyun/tcctransaction/TccServer.java b/tcc-transaction-core/src/main/java/org/mengyun/tcctransaction/TccServer.java index ec71ee62..2cf6635d 100644 --- a/tcc-transaction-core/src/main/java/org/mengyun/tcctransaction/TccServer.java +++ b/tcc-transaction-core/src/main/java/org/mengyun/tcctransaction/TccServer.java @@ -26,6 +26,7 @@ import org.mengyun.tcctransaction.serializer.TccRemotingCommandSerializer; import org.mengyun.tcctransaction.serializer.TccTransactionStoreSerializer; import org.mengyun.tcctransaction.serializer.TransactionStoreSerializer; +import org.mengyun.tcctransaction.stats.StatsManager; import org.mengyun.tcctransaction.storage.StorageType; import org.mengyun.tcctransaction.storage.TransactionStorage; import org.mengyun.tcctransaction.storage.TransactionStorageFactory; @@ -73,6 +74,8 @@ public class TccServer implements TccService { private List registryServices; + private StatsManager statsManager; + public TccServer(ServerConfig serverConfig) { if (serverConfig != null) { this.serverConfig = serverConfig; @@ -85,7 +88,11 @@ public TccServer(ServerConfig serverConfig) { throw new SystemException(String.format("unsupported StorageType<%s> in server side.", this.serverConfig.getStorageType().value())); } - this.remotingServer = new NettyRemotingServer(this.remotingCommandSerializer, this.serverConfig); + String instance = StringUtils.isNotEmpty(this.serverConfig.getRegistryAddress()) + ? 
this.serverConfig.getRegistryAddress() : (NetUtils.getLocalAddress() + ":" + this.serverConfig.getListenPort()); + this.statsManager = new StatsManager(instance); + + this.remotingServer = new NettyRemotingServer(this.remotingCommandSerializer, this.serverConfig, this.statsManager); this.registryServices = RegistryFactory.getInstance(this.serverConfig); @@ -231,6 +238,10 @@ public RecoveryExecutor getRecoveryExecutor() { return recoveryExecutor; } + public StatsManager getStatsManager() { + return statsManager; + } + @ChannelHandler.Sharable class UnregisterScheduleHandler extends ChannelInboundHandlerAdapter { @Override diff --git a/tcc-transaction-core/src/main/java/org/mengyun/tcctransaction/constants/RemotingServiceCode.java b/tcc-transaction-core/src/main/java/org/mengyun/tcctransaction/constants/RemotingServiceCode.java index a04f0955..44b921f6 100644 --- a/tcc-transaction-core/src/main/java/org/mengyun/tcctransaction/constants/RemotingServiceCode.java +++ b/tcc-transaction-core/src/main/java/org/mengyun/tcctransaction/constants/RemotingServiceCode.java @@ -1,6 +1,11 @@ package org.mengyun.tcctransaction.constants; +import java.util.HashMap; +import java.util.Map; + public class RemotingServiceCode { + + public static final int SYSTEM = 0; public static final int CREATE = 1; public static final int UPDATE = 2; public static final int DELETE = 3; @@ -10,6 +15,24 @@ public class RemotingServiceCode { public static final int REGISTER = 7; public static final int DESERIALIZE_TRANSACTION = 8; + private static Map codeToDesc = new HashMap<>(); + + static { + codeToDesc.put(SYSTEM, "SYSTEM"); + codeToDesc.put(CREATE, "CREATE"); + codeToDesc.put(UPDATE, "UPDATE"); + codeToDesc.put(DELETE, "DELETE"); + codeToDesc.put(FIND, "FIND"); + codeToDesc.put(RECOVER_COMMIT, "RECOVER_COMMIT"); + codeToDesc.put(RECOVER_ROLLBACK, "RECOVER_ROLLBACK"); + codeToDesc.put(REGISTER, "REGISTER"); + codeToDesc.put(DESERIALIZE_TRANSACTION, "DESERIALIZE_TRANSACTION"); + } + private 
RemotingServiceCode() { } + + public static String getDesc(int code) { + return codeToDesc.getOrDefault(code, "UNKNOWN"); + } } diff --git a/tcc-transaction-core/src/main/java/org/mengyun/tcctransaction/recovery/RecoveryScheduler.java b/tcc-transaction-core/src/main/java/org/mengyun/tcctransaction/recovery/RecoveryScheduler.java index c5d95e2e..c286f723 100644 --- a/tcc-transaction-core/src/main/java/org/mengyun/tcctransaction/recovery/RecoveryScheduler.java +++ b/tcc-transaction-core/src/main/java/org/mengyun/tcctransaction/recovery/RecoveryScheduler.java @@ -37,16 +37,20 @@ public void registerScheduleAndStartIfNotPresent(String domain) { } public Scheduler registerScheduleIfNotPresent(String domain) { - if (!schedulers.containsKey(domain)) { - synchronized (RecoveryScheduler.class) { - if (!schedulers.containsKey(domain)) { - Scheduler scheduler = createScheduler(domain); - scheduleJob(scheduler, domain); - schedulers.put(domain, scheduler); - } + Scheduler scheduler = schedulers.get(domain); + if (scheduler != null) { + return scheduler; + } + synchronized (RecoveryScheduler.class) { + scheduler = schedulers.get(domain); + if (scheduler != null) { + return scheduler; } + scheduler = createScheduler(domain); + scheduleJob(scheduler, domain); + schedulers.put(domain, scheduler); + return scheduler; } - return schedulers.get(domain); } public void unregisterSchedule(String domain) { diff --git a/tcc-transaction-core/src/main/java/org/mengyun/tcctransaction/remoting/netty/AbstractNettyRemoting.java b/tcc-transaction-core/src/main/java/org/mengyun/tcctransaction/remoting/netty/AbstractNettyRemoting.java index d67a4533..457d75dc 100644 --- a/tcc-transaction-core/src/main/java/org/mengyun/tcctransaction/remoting/netty/AbstractNettyRemoting.java +++ b/tcc-transaction-core/src/main/java/org/mengyun/tcctransaction/remoting/netty/AbstractNettyRemoting.java @@ -6,6 +6,7 @@ import org.mengyun.tcctransaction.remoting.RequestProcessor; import 
org.mengyun.tcctransaction.remoting.protocol.RemotingCommand; import org.mengyun.tcctransaction.remoting.protocol.RemotingCommandCode; +import org.mengyun.tcctransaction.stats.StatsManager; import org.mengyun.tcctransaction.utils.NetUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -26,10 +27,11 @@ public abstract class AbstractNettyRemoting { new HashMap<>(64); protected Pair, ExecutorService> defaultRequestProcessor; + protected StatsManager statsManager; + protected ConcurrentMap responseTable = new ConcurrentHashMap<>(256); protected void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand cmd) { - if (cmd.isRequestCommand()) { processRequestCommand(ctx, cmd); } else { @@ -54,7 +56,7 @@ private void processRequestCommand(ChannelHandlerContext ctx, RemotingCommand cm Runnable run = () -> { try { RemotingCommand responseCommand = pair.getLeft().processRequest(ctx, cmd); - + responseCommand.setOriginalCommand(cmd); responseCommand.setCode(RemotingCommandCode.SERVICE_RESP); responseCommand.setRequestId(cmd.getRequestId()); ctx.writeAndFlush(responseCommand); @@ -65,6 +67,7 @@ private void processRequestCommand(ChannelHandlerContext ctx, RemotingCommand cm StringWriter errors = new StringWriter(); e.printStackTrace(new PrintWriter(errors)); RemotingCommand responseCommand = RemotingCommand.createCommand(RemotingCommandCode.SYSTEM_EXCEPTION_RESP, errors.toString()); + responseCommand.setOriginalCommand(cmd); responseCommand.setRequestId(cmd.getRequestId()); ctx.writeAndFlush(responseCommand); } @@ -80,6 +83,7 @@ private void processRequestCommand(ChannelHandlerContext ctx, RemotingCommand cm final RemotingCommand response = RemotingCommand.createCommand(RemotingCommandCode.SYSTEM_BUSY_RESP, "[OVERLOAD]system busy, start flow control for a while"); + response.setOriginalCommand(cmd); response.setRequestId(cmd.getRequestId()); ctx.writeAndFlush(response); } diff --git 
a/tcc-transaction-core/src/main/java/org/mengyun/tcctransaction/remoting/netty/NettyRemotingServer.java b/tcc-transaction-core/src/main/java/org/mengyun/tcctransaction/remoting/netty/NettyRemotingServer.java index 4e3ee822..818cbd0c 100644 --- a/tcc-transaction-core/src/main/java/org/mengyun/tcctransaction/remoting/netty/NettyRemotingServer.java +++ b/tcc-transaction-core/src/main/java/org/mengyun/tcctransaction/remoting/netty/NettyRemotingServer.java @@ -24,7 +24,9 @@ import org.mengyun.tcctransaction.remoting.exception.RemotingSendRequestException; import org.mengyun.tcctransaction.remoting.exception.RemotingTimeoutException; import org.mengyun.tcctransaction.remoting.protocol.RemotingCommand; +import org.mengyun.tcctransaction.remoting.protocol.RemotingCommandCode; import org.mengyun.tcctransaction.serializer.RemotingCommandSerializer; +import org.mengyun.tcctransaction.stats.StatsManager; import org.mengyun.tcctransaction.support.FactoryBuilder; import org.mengyun.tcctransaction.utils.NetUtils; import org.slf4j.Logger; @@ -54,10 +56,11 @@ public class NettyRemotingServer extends AbstractNettyRemoting implements Remoti private RemotingCommandSerializer serializer; - public NettyRemotingServer(RemotingCommandSerializer serializer, NettyServerConfig nettyServerConfig) { + public NettyRemotingServer(RemotingCommandSerializer serializer, NettyServerConfig nettyServerConfig, StatsManager statsManager) { this.nettyServerConfig = nettyServerConfig; this.serializer = serializer; this.serverBootstrap = new ServerBootstrap(); + this.statsManager = statsManager; if (useEpoll()) { @@ -141,6 +144,7 @@ protected void initChannel(SocketChannel ch) throws Exception { new NettyEncoder(serializer), new ReadTimeoutHandler(nettyServerConfig.getChannelIdleTimeoutSeconds()), new HeartBeatRespHandler(), + new NettyServerMetricsHandler(statsManager), new NettyServerHandler() ); @@ -241,7 +245,38 @@ class NettyServerHandler extends SimpleChannelInboundHandler { @Override protected void 
channelRead0(ChannelHandlerContext ctx, RemotingCommand cmd) throws Exception { ServerFlowMonitor.count(); + cmd.setReceiveTime(System.currentTimeMillis()); processMessageReceived(ctx, cmd); } } + + @ChannelHandler.Sharable + class NettyServerMetricsHandler extends ChannelOutboundHandlerAdapter { + private StatsManager statsManage; + + public NettyServerMetricsHandler(StatsManager statsManage) { + this.statsManage = statsManage; + } + + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { + try { + if (msg instanceof RemotingCommand) { + RemotingCommand remotingCommand = (RemotingCommand) msg; + if (remotingCommand.getCode() == RemotingCommandCode.SERVICE_RESP && remotingCommand.getOriginalCommand() != null) { + RemotingCommand originCommand = remotingCommand.getOriginalCommand(); + statsManage.incRpcRequestCost(originCommand.getServiceCode(), System.currentTimeMillis() - originCommand.getReceiveTime()); + if (RemotingCommandCode.isFail(remotingCommand.getCode())) { + statsManage.incFailRpcRequestNum(originCommand.getServiceCode()); + } else { + statsManage.incSuccessRpcRequestNum(originCommand.getServiceCode()); + } + } + } + } catch (Exception e) { + logger.warn("failed to record stats data", e); + } + super.write(ctx, msg, promise); + } + } } diff --git a/tcc-transaction-core/src/main/java/org/mengyun/tcctransaction/remoting/protocol/RemotingCommand.java b/tcc-transaction-core/src/main/java/org/mengyun/tcctransaction/remoting/protocol/RemotingCommand.java index 3d051342..8a069683 100644 --- a/tcc-transaction-core/src/main/java/org/mengyun/tcctransaction/remoting/protocol/RemotingCommand.java +++ b/tcc-transaction-core/src/main/java/org/mengyun/tcctransaction/remoting/protocol/RemotingCommand.java @@ -12,6 +12,13 @@ public class RemotingCommand { private int serviceCode; private byte[] body; + /** + * temporary information used for statistics + */ + private transient long receiveTime; + + private 
transient RemotingCommand originalCommand; + public static RemotingCommand createServiceResponseCommand(String remark) { RemotingCommand cmd = new RemotingCommand(); cmd.setCode(RemotingCommandCode.SERVICE_RESP); @@ -71,4 +78,20 @@ public String getRemark() { public void setRemark(String remark) { this.remark = remark; } + + public long getReceiveTime() { + return receiveTime; + } + + public void setReceiveTime(long receiveTime) { + this.receiveTime = receiveTime; + } + + public RemotingCommand getOriginalCommand() { + return originalCommand; + } + + public void setOriginalCommand(RemotingCommand originalCommand) { + this.originalCommand = originalCommand; + } } \ No newline at end of file diff --git a/tcc-transaction-core/src/main/java/org/mengyun/tcctransaction/remoting/protocol/RemotingCommandCode.java b/tcc-transaction-core/src/main/java/org/mengyun/tcctransaction/remoting/protocol/RemotingCommandCode.java index 4094d4b0..2d08112f 100644 --- a/tcc-transaction-core/src/main/java/org/mengyun/tcctransaction/remoting/protocol/RemotingCommandCode.java +++ b/tcc-transaction-core/src/main/java/org/mengyun/tcctransaction/remoting/protocol/RemotingCommandCode.java @@ -37,6 +37,10 @@ public static RemotingCommandCode valueOf(byte value) { throw new IllegalArgumentException(String.format("unknown RemotingCommand Type of value :%d", value)); } + public static boolean isFail(RemotingCommandCode code) { + return code == RemotingCommandCode.SYSTEM_BUSY_RESP || code == RemotingCommandCode.SYSTEM_EXCEPTION_RESP; + } + public byte value() { return this.value; } diff --git a/tcc-transaction-core/src/main/java/org/mengyun/tcctransaction/stats/CounterStatsItem.java b/tcc-transaction-core/src/main/java/org/mengyun/tcctransaction/stats/CounterStatsItem.java new file mode 100644 index 00000000..a1c4f76a --- /dev/null +++ b/tcc-transaction-core/src/main/java/org/mengyun/tcctransaction/stats/CounterStatsItem.java @@ -0,0 +1,24 @@ +package org.mengyun.tcctransaction.stats; + +import 
java.util.concurrent.atomic.LongAdder; + +/** + * @author Nervose.Wu + * @date 2024/1/19 14:14 + */ +public class CounterStatsItem extends StatsItem { + private final LongAdder count; + + public CounterStatsItem(String statsName, String statsKey) { + super(statsName, statsKey); + this.count = new LongAdder(); + } + + public void record(long inc) { + count.add(inc); + } + + public long getCount() { + return count.sum(); + } +} diff --git a/tcc-transaction-core/src/main/java/org/mengyun/tcctransaction/stats/HistogramStatsItem.java b/tcc-transaction-core/src/main/java/org/mengyun/tcctransaction/stats/HistogramStatsItem.java new file mode 100644 index 00000000..e5f75a46 --- /dev/null +++ b/tcc-transaction-core/src/main/java/org/mengyun/tcctransaction/stats/HistogramStatsItem.java @@ -0,0 +1,42 @@ +package org.mengyun.tcctransaction.stats; + +import java.util.Arrays; +import java.util.concurrent.atomic.LongAdder; + +/** + * @author Nervose.Wu + * @date 2024/1/29 11:14 + */ +public class HistogramStatsItem extends StatsItem { + private final LongAdder sum; + + private final long[] buckets; + private final LongAdder[] distribution; + + + public HistogramStatsItem(String statsName, String statsKey, long[] buckets) { + super(statsName, statsKey); + this.buckets = buckets; + this.sum = new LongAdder(); + this.distribution = new LongAdder[buckets.length]; + Arrays.setAll(this.distribution, key -> new LongAdder()); + } + + public void record(long inc) { + for (int i = 0; i < buckets.length; i++) { + if (inc <= buckets[i]) { + distribution[i].add(1); + break; + } + } + sum.add(inc); + } + + public long getSum() { + return sum.sum(); + } + + public long[] getDistribution() { + return Arrays.stream(distribution).mapToLong(LongAdder::sum).toArray(); + } +} diff --git a/tcc-transaction-core/src/main/java/org/mengyun/tcctransaction/stats/StatsCategory.java b/tcc-transaction-core/src/main/java/org/mengyun/tcctransaction/stats/StatsCategory.java new file mode 100644 index 
00000000..ca524f1f --- /dev/null +++ b/tcc-transaction-core/src/main/java/org/mengyun/tcctransaction/stats/StatsCategory.java @@ -0,0 +1,50 @@ +package org.mengyun.tcctransaction.stats; + +import java.util.Arrays; +import java.util.List; + +/** + * @author Nervose.Wu + * @date 2024/1/29 10:51 + */ + +public enum StatsCategory { + TCC_SERVER_SUCCESS_RPC_REQUEST_NUMS(StatsType.COUNTER, Arrays.asList("ServiceCode"), 16, null), + TCC_SERVER_FAIL_RPC_REQUEST_NUMS(StatsType.COUNTER, Arrays.asList("ServiceCode"), 16, null), + TCC_SERVER_RPC_REQUEST_COSTS_DISTRIBUTION(StatsType.HISTOGRAM, Arrays.asList("ServiceCode"), 16, new long[]{5, 10, 20, 40, 80, 160, 320, 640, 1280, 2560, 5120, Long.MAX_VALUE}), + ; + + private final StatsType statsType; + + private final List dimensions; + + private final int initialCapacity; + + /** + * only used by HISTOGRAM type and the last element should be Long.MAX_VALUE + */ + private final long[] buckets; + + StatsCategory(StatsType statsType, List dimensions, int initialCapacity, long[] buckets) { + this.statsType = statsType; + this.dimensions = dimensions; + this.initialCapacity = initialCapacity; + this.buckets = buckets; + } + + public StatsType getStatsType() { + return statsType; + } + + public List getDimensions() { + return dimensions; + } + + public int getInitialCapacity() { + return initialCapacity; + } + + public long[] getBuckets() { + return buckets; + } +} diff --git a/tcc-transaction-core/src/main/java/org/mengyun/tcctransaction/stats/StatsDto.java b/tcc-transaction-core/src/main/java/org/mengyun/tcctransaction/stats/StatsDto.java new file mode 100644 index 00000000..7730a134 --- /dev/null +++ b/tcc-transaction-core/src/main/java/org/mengyun/tcctransaction/stats/StatsDto.java @@ -0,0 +1,90 @@ +package org.mengyun.tcctransaction.stats; + +import java.util.List; +import java.util.Map; + +/** + * @author Nervose.Wu + * @date 2024/1/22 17:40 + */ +public class StatsDto { + + private String instance; + + private List 
statsItemSetDtos; + + public StatsDto() { + } + + public StatsDto(String instance, List statsItemSetDtos) { + this.instance = instance; + this.statsItemSetDtos = statsItemSetDtos; + } + + public String getInstance() { + return instance; + } + + public void setInstance(String instance) { + this.instance = instance; + } + + public List getStatsItemSetDtos() { + return statsItemSetDtos; + } + + public void setStatsItemSetDtos(List statsItemSetDtos) { + this.statsItemSetDtos = statsItemSetDtos; + } + + public static class StatsItemSetDto { + private String statsName; + private String statsType; + private List labels; + private Map valueMap; + private Map valuesMap; + + public StatsItemSetDto() { + } + + public String getStatsName() { + return statsName; + } + + public void setStatsName(String statsName) { + this.statsName = statsName; + } + + public List getLabels() { + return labels; + } + + public void setLabels(List labels) { + this.labels = labels; + } + + public String getStatsType() { + return statsType; + } + + public void setStatsType(String statsType) { + this.statsType = statsType; + } + + public Map getValueMap() { + return valueMap; + } + + public void setValueMap(Map valueMap) { + this.valueMap = valueMap; + } + + public Map getValuesMap() { + return valuesMap; + } + + public void setValuesMap(Map valuesMap) { + this.valuesMap = valuesMap; + } + } +} diff --git a/tcc-transaction-core/src/main/java/org/mengyun/tcctransaction/stats/StatsItem.java b/tcc-transaction-core/src/main/java/org/mengyun/tcctransaction/stats/StatsItem.java new file mode 100644 index 00000000..399ecea7 --- /dev/null +++ b/tcc-transaction-core/src/main/java/org/mengyun/tcctransaction/stats/StatsItem.java @@ -0,0 +1,24 @@ +package org.mengyun.tcctransaction.stats; + +/** + * @author Nervose.Wu + * @date 2024/1/29 11:15 + */ +public abstract class StatsItem { + + protected final String statsName; + protected final String statsKey; + + public StatsItem(String statsName, String statsKey) { 
+ this.statsName = statsName; + this.statsKey = statsKey; + } + + public String getStatsName() { + return statsName; + } + + public String getStatsKey() { + return statsKey; + } +} diff --git a/tcc-transaction-core/src/main/java/org/mengyun/tcctransaction/stats/StatsItemSet.java b/tcc-transaction-core/src/main/java/org/mengyun/tcctransaction/stats/StatsItemSet.java new file mode 100644 index 00000000..6250e44b --- /dev/null +++ b/tcc-transaction-core/src/main/java/org/mengyun/tcctransaction/stats/StatsItemSet.java @@ -0,0 +1,43 @@ +package org.mengyun.tcctransaction.stats; + +import org.mengyun.tcctransaction.utils.CollectionUtils; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Function; + +/** + * 某种类型的指标集合 + * @author Nervose.Wu + * @date 2024/1/19 14:11 + */ +public class StatsItemSet { + + private final StatsCategory statsCategory; + + private final ConcurrentHashMap statItemMap; + + public StatsItemSet(StatsCategory statsCategory) { + this.statsCategory = statsCategory; + this.statItemMap = new ConcurrentHashMap<>(statsCategory.getInitialCapacity()); + } + + public StatsCategory getStatsCategory() { + return statsCategory; + } + + public ConcurrentHashMap getStatItemMap() { + return statItemMap; + } + + public T getOrCreate(String key, Function function){ + return CollectionUtils.fixedConcurrentComputeIfAbsent(statItemMap, key, function); + } + + //public StatsDto.StatsItemSetDto getData() { + // StatsDto.StatsItemSetDto statsItemSetDto = new StatsDto.StatsItemSetDto(); + // statsItemSetDto.setStatsName(statsName); + // statsItemSetDto.setLabels(labels); + // statsItemSetDto.setValueMap(statItemMap.values().stream().collect(Collectors.toMap(StatsItem::getStatsKey, StatsItem::getValue))); + // return statsItemSetDto; + //} +} diff --git a/tcc-transaction-core/src/main/java/org/mengyun/tcctransaction/stats/StatsManager.java b/tcc-transaction-core/src/main/java/org/mengyun/tcctransaction/stats/StatsManager.java new file mode 100644 
index 00000000..c919b0e5 --- /dev/null +++ b/tcc-transaction-core/src/main/java/org/mengyun/tcctransaction/stats/StatsManager.java @@ -0,0 +1,97 @@ +package org.mengyun.tcctransaction.stats; + +import org.mengyun.tcctransaction.constants.RemotingServiceCode; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * @author Nervose.Wu + * @date 2024/1/19 13:57 + */ +public class StatsManager { + + public static final String KEY_SPLITER = "@"; + + private final HashMap> statsTable = new HashMap<>(); + + public final String instance; + + + public StatsManager(String instance) { + this.instance = instance; + for (StatsCategory each : StatsCategory.values()) { + if (each.getStatsType() == StatsType.COUNTER) { + statsTable.put(each, new StatsItemSet(each)); + } else if (each.getStatsType() == StatsType.HISTOGRAM) { + statsTable.put(each, new StatsItemSet(each)); + } + } + } + + @SuppressWarnings("unchecked") + public void incSuccessRpcRequestNum(int serviceCode) { + ((StatsItemSet) statsTable.get(StatsCategory.TCC_SERVER_SUCCESS_RPC_REQUEST_NUMS)) + .getOrCreate(buildStatesKey(RemotingServiceCode.getDesc(serviceCode)), key -> new CounterStatsItem(StatsCategory.TCC_SERVER_SUCCESS_RPC_REQUEST_NUMS.name(), key)).record(1); + } + + @SuppressWarnings("unchecked") + public void incFailRpcRequestNum(int serviceCode) { + ((StatsItemSet) statsTable.get(StatsCategory.TCC_SERVER_FAIL_RPC_REQUEST_NUMS)) + .getOrCreate(buildStatesKey(RemotingServiceCode.getDesc(serviceCode)), key -> new CounterStatsItem(StatsCategory.TCC_SERVER_FAIL_RPC_REQUEST_NUMS.name(), key)).record(1); + } + + @SuppressWarnings("unchecked") + public void incRpcRequestCost(int serviceCode, long timeInMills) { + ((StatsItemSet) statsTable.get(StatsCategory.TCC_SERVER_RPC_REQUEST_COSTS_DISTRIBUTION)) + .getOrCreate(buildStatesKey(RemotingServiceCode.getDesc(serviceCode)), key -> new 
HistogramStatsItem(StatsCategory.TCC_SERVER_RPC_REQUEST_COSTS_DISTRIBUTION.name(), key, StatsCategory.TCC_SERVER_RPC_REQUEST_COSTS_DISTRIBUTION.getBuckets())).record(timeInMills); + } + + public String buildStatesKey(String label) { + return label; + } + + public String buildStatesKey(String... labels) { + StringBuilder sb = new StringBuilder(); + for (String label : labels) { + sb.append(label); + sb.append(KEY_SPLITER); + } + sb.deleteCharAt(sb.lastIndexOf(KEY_SPLITER)); + return sb.toString(); + } + + @SuppressWarnings("unchecked") + public StatsDto getStats() { + StatsDto statsDto = new StatsDto(); + statsDto.setInstance(instance); + + List statsItemSetDtos = new ArrayList<>(); + statsTable.forEach((key, value) -> { + StatsDto.StatsItemSetDto statsItemSetDto =new StatsDto.StatsItemSetDto(); + statsItemSetDto.setStatsName(key.name()); + statsItemSetDto.setLabels(key.getDimensions()); + statsItemSetDto.setStatsType(key.getStatsType().name()); + + if (key.getStatsType() == StatsType.COUNTER) { + statsItemSetDto.setValueMap( + ((Map) value.getStatItemMap()).entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, each -> each.getValue().getCount())) + ); + } else if(key.getStatsType()==StatsType.HISTOGRAM){ + statsItemSetDto.setValueMap( + ((Map) value.getStatItemMap()).entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, each -> each.getValue().getSum())) + ); + statsItemSetDto.setValuesMap( + ((Map) value.getStatItemMap()).entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, each -> each.getValue().getDistribution())) + ); + } + statsItemSetDtos.add(statsItemSetDto); + }); + statsDto.setStatsItemSetDtos(statsItemSetDtos); + + return statsDto; + } +} \ No newline at end of file diff --git a/tcc-transaction-core/src/main/java/org/mengyun/tcctransaction/stats/StatsSupplier.java b/tcc-transaction-core/src/main/java/org/mengyun/tcctransaction/stats/StatsSupplier.java new file mode 100644 index 00000000..8e96a45a --- /dev/null +++ 
/**
 * Kinds of statistics, with a lookup by constant name that returns {@code null} instead of
 * throwing when the name is unknown.
 *
 * @author Nervose.Wu
 * @date 2024/1/29 10:23
 */
public enum StatsType {
    COUNTER,
    HISTOGRAM,
    ;

    /** Unmodifiable index from constant name to constant, built once at class initialisation. */
    private static final Map<String, StatsType> statsTypeMap;

    static {
        statsTypeMap = Collections.unmodifiableMap(Arrays.stream(StatsType.values()).collect(Collectors.toMap(StatsType::name, Function.identity())));
    }

    /**
     * Looks a type up by its constant name.
     *
     * @param name the exact constant name, e.g. {@code "COUNTER"}
     * @return the matching type, or {@code null} when no constant has that name
     */
    public static StatsType findByName(String name) {
        return statsTypeMap.get(name);
    }
}
@@ -31,4 +33,14 @@ public static List merge(List firstList, List secondList) { } return mergedList; } + + /** + * Workaround for problem: https://bugs.openjdk.java.net/browse/JDK-8161372 + */ + public static V fixedConcurrentComputeIfAbsent(ConcurrentMap concurrentMap, K key, + Function mappingFunction) { + V v, newValue; + return ((v = concurrentMap.get(key)) == null && (newValue = mappingFunction.apply(key)) != null && + (v = concurrentMap.putIfAbsent(key, newValue)) == null) ? newValue : v; + } } \ No newline at end of file diff --git a/tcc-transaction-dashboard/pom.xml b/tcc-transaction-dashboard/pom.xml index dbd4e72f..e9a55524 100644 --- a/tcc-transaction-dashboard/pom.xml +++ b/tcc-transaction-dashboard/pom.xml @@ -103,6 +103,12 @@ tcc-transaction-spring-boot-starter ${project.version} + + + org.mengyun + tcc-transaction-prometheus-exporter + ${project.version} + diff --git a/tcc-transaction-dashboard/src/main/java/org/mengyun/tcctransaction/dashboard/security/SecurityConfig.java b/tcc-transaction-dashboard/src/main/java/org/mengyun/tcctransaction/dashboard/security/SecurityConfig.java index 62585f51..a3a2598f 100644 --- a/tcc-transaction-dashboard/src/main/java/org/mengyun/tcctransaction/dashboard/security/SecurityConfig.java +++ b/tcc-transaction-dashboard/src/main/java/org/mengyun/tcctransaction/dashboard/security/SecurityConfig.java @@ -45,7 +45,8 @@ protected void configure(HttpSecurity http) throws Exception { "/*.png", "/robots.txt", "/favicon.ico", - "/api/user/login" + "/api/user/login", + "/prometheus/metrics" ).permitAll().and() .authorizeRequests().anyRequest().authenticated().and().exceptionHandling() //鉴权不需要 .authenticationEntryPoint(new TccAuthenticationEntryPoint()).accessDeniedHandler(new TccAccessDeniedHandler()); diff --git a/tcc-transaction-dashboard/src/main/java/org/mengyun/tcctransaction/dashboard/service/impl/tccserver/TccServerTransactionServiceImpl.java 
b/tcc-transaction-dashboard/src/main/java/org/mengyun/tcctransaction/dashboard/service/impl/tccserver/TccServerTransactionServiceImpl.java index cdc4692e..81030de7 100644 --- a/tcc-transaction-dashboard/src/main/java/org/mengyun/tcctransaction/dashboard/service/impl/tccserver/TccServerTransactionServiceImpl.java +++ b/tcc-transaction-dashboard/src/main/java/org/mengyun/tcctransaction/dashboard/service/impl/tccserver/TccServerTransactionServiceImpl.java @@ -6,6 +6,8 @@ import org.mengyun.tcctransaction.dashboard.enums.ResponseCodeEnum; import org.mengyun.tcctransaction.dashboard.service.TransactionService; import org.mengyun.tcctransaction.dashboard.service.condition.TccServerStorageCondition; +import org.mengyun.tcctransaction.stats.StatsDto; +import org.mengyun.tcctransaction.stats.StatsSupplier; import org.mengyun.tcctransaction.storage.TransactionStorage; import org.mengyun.tcctransaction.utils.CollectionUtils; import org.slf4j.Logger; @@ -17,7 +19,10 @@ import org.springframework.util.StringUtils; import org.springframework.web.client.RestTemplate; +import java.util.ArrayList; import java.util.List; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; /** * @Author huabao.fang @@ -25,12 +30,14 @@ **/ @Conditional(TccServerStorageCondition.class) @Service -public class TccServerTransactionServiceImpl implements TransactionService { +public class TccServerTransactionServiceImpl implements TransactionService, StatsSupplier { private Logger logger = LoggerFactory.getLogger(TccServerTransactionServiceImpl.class); private static final String REQUEST_METHOD_TRANSACTION_DETAIL = "transaction/detail"; + private static final String REQUEST_METHOD_STATS = "server/stats"; + @Autowired private TccServerFeignClient tccServerFeignClient; @@ -40,6 +47,20 @@ public class TccServerTransactionServiceImpl implements TransactionService { @Autowired private RestTemplate restTemplate; + private final ExecutorService executorService = new 
ThreadPoolExecutor(5, + 5, + 0, + TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<>(100), + new ThreadFactory() { + private final AtomicInteger threadIndex = new AtomicInteger(0); + + @Override + public Thread newThread(Runnable r) { + return new Thread(r, String.format("StatsDtoObtainThread_%d", threadIndex.getAndIncrement())); + } + }); + @Override public ResponseDto list(TransactionPageRequestDto requestDto) { return tccServerFeignClient.transactionList(requestDto); @@ -48,7 +69,7 @@ public ResponseDto list(TransactionPageRequestDto requestDto @Override public ResponseDto detail(TransactionDetailRequestDto requestDto) { - List servers = springClientFactory.getLoadBalancer(DashboardConstant.TCC_SERVER_GROUP).getReachableServers(); + List servers = getServers(); if (CollectionUtils.isEmpty(servers)) { return ResponseDto.returnFail(ResponseCodeEnum.TRANSACTION_DETAIL_NO_INSTANCES); } @@ -115,4 +136,44 @@ public ResponseDto delete(TransactionOperateRequestDto requestDto) { public TransactionStorage getTransactionStorage() { return null; } + + @Override + public List getStatsDtoList() { + CompletionService completionService = new ExecutorCompletionService<>(executorService); + int added = 0; + for (Server server : getServers()) { + try { + completionService.submit(() -> getStatsDto(server)); + added++; + } catch (RejectedExecutionException e) { + logger.warn("system thread pool busy", e); + } + } + List res = new ArrayList<>(); + for (int i = 0; i < added; i++) { + try { + StatsDto statsDto = completionService.take().get(); + if (statsDto != null) { + res.add(statsDto); + } + } catch (Exception e) { + logger.warn("request server stats failed!", e); + } + } + return res; + } + + public StatsDto getStatsDto(Server server) { + String statsRequestUrl = "http://" + .concat(server.getHostPort()) + .concat("/") + .concat(DashboardConstant.TCC_SERVER_GROUP) + .concat("/") + .concat(REQUEST_METHOD_STATS); + return restTemplate.getForObject(statsRequestUrl, StatsDto.class); + 
} + + private List getServers(){ + return springClientFactory.getLoadBalancer(DashboardConstant.TCC_SERVER_GROUP).getReachableServers(); + } } diff --git a/tcc-transaction-dependencies/pom.xml b/tcc-transaction-dependencies/pom.xml index 04e5c67d..4c04e941 100644 --- a/tcc-transaction-dependencies/pom.xml +++ b/tcc-transaction-dependencies/pom.xml @@ -337,12 +337,6 @@ 1.4.1 - - org.yaml - snakeyaml - 1.33 - - org.springframework.cloud spring-cloud-starter-netflix-ribbon @@ -409,6 +403,17 @@ grpc-server-spring-boot-starter 2.0.1.RELEASE + + + io.prometheus + simpleclient + 0.16.0 + + + io.prometheus + simpleclient_servlet + 0.16.0 + diff --git a/tcc-transaction-prometheus-exporter/grafana-dashboard-example.json b/tcc-transaction-prometheus-exporter/grafana-dashboard-example.json new file mode 100644 index 00000000..0b3956ce --- /dev/null +++ b/tcc-transaction-prometheus-exporter/grafana-dashboard-example.json @@ -0,0 +1,391 @@ +{ + "__inputs": [ + { + "name": "DS_PROMETHEUS", + "label": "Prometheus", + "description": "", + "type": "datasource", + "pluginId": "prometheus", + "pluginName": "Prometheus" + } + ], + "annotations": { + "list": [ + { + "builtIn": 1, + "datasource": { + "type": "grafana", + "uid": "-- Grafana --" + }, + "enable": true, + "hide": true, + "iconColor": "rgba(0, 211, 255, 1)", + "name": "Annotations & Alerts", + "type": "dashboard" + } + ] + }, + "editable": true, + "fiscalYearStartMonth": 0, + "graphTooltip": 0, + "id": 231, + "links": [], + "liveNow": false, + "panels": [ + { + "datasource": "${DS_PROMETHEUS}", + "description": "", + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "lineInterpolation": "linear", + "lineWidth": 1, + 
"pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 0 + }, + "id": 1, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": "${DS_PROMETHEUS}", + "editorMode": "code", + "expr": "((sum(rate(TCC_SERVER_SUCCESS_RPC_REQUEST_NUMS_total{Instance=~\"$instance\",ServiceCode=~\"$serviceCode\"}[2m])) by ()) or on() vector(0))\n+ ((sum(rate(TCC_SERVER_FAIL_RPC_REQUEST_NUMS_total{Instance=~\"$instance\",ServiceCode=~\"$serviceCode\"}[2m])) by ()) or on() vector(0))", + "legendFormat": "QPS", + "range": true, + "refId": "A" + } + ], + "title": "请求QPS", + "type": "timeseries" + }, + { + "datasource": "${DS_PROMETHEUS}", + "description": "", + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + }, + 
"unit": "ms" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 0 + }, + "id": 2, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": "${DS_PROMETHEUS}", + "editorMode": "code", + "expr": "(sum(rate(TCC_SERVER_RPC_REQUEST_COSTS_DISTRIBUTION_count{Instance=~\"$instance\",ServiceCode=~\"$serviceCode\"}[2m])) by (Instance,ServiceCode)) /\n((sum(rate(TCC_SERVER_RPC_REQUEST_COSTS_DISTRIBUTION_sum{Instance=~\"$instance\",ServiceCode=~\"$serviceCode\"}[2m])) by (Instance,ServiceCode)) )", + "legendFormat": "{{Instance}}-{{ServiceCode}} avg", + "range": true, + "refId": "A" + }, + { + "datasource": "${DS_PROMETHEUS}", + "editorMode": "code", + "expr": "histogram_quantile(0.9, rate(TCC_SERVER_RPC_REQUEST_COSTS_DISTRIBUTION_bucket{Instance=~\"$instance\",ServiceCode=~\"$serviceCode\"}[2m]) )", + "hide": false, + "legendFormat": "{{Instance}}-{{ServiceCode}} 90", + "range": true, + "refId": "B" + }, + { + "datasource": "${DS_PROMETHEUS}", + "editorMode": "code", + "expr": "histogram_quantile(0.99, rate(TCC_SERVER_RPC_REQUEST_COSTS_DISTRIBUTION_bucket{Instance=~\"$instance\",ServiceCode=~\"$serviceCode\"}[2m]) )", + "hide": false, + "legendFormat": "{{Instance}}-{{ServiceCode}} 99", + "range": true, + "refId": "C" + }, + { + "datasource": "${DS_PROMETHEUS}", + "editorMode": "code", + "expr": "histogram_quantile(1, rate(TCC_SERVER_RPC_REQUEST_COSTS_DISTRIBUTION_bucket{Instance=~\"$instance\",ServiceCode=~\"$serviceCode\"}[2m]) )", + "hide": false, + "legendFormat": "{{Instance}}-{{ServiceCode}} max", + "range": true, + "refId": "D" + } + ], + "title": "请求耗时", + "type": "timeseries" + }, + { + "datasource": "${DS_PROMETHEUS}", + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisCenteredZero": false, + "axisColorMode": "text", + 
"axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "max": 105, + "min": 0, + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "%" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 8 + }, + "id": 3, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": "${DS_PROMETHEUS}", + "editorMode": "code", + "expr": "100*(sum(rate(TCC_SERVER_SUCCESS_RPC_REQUEST_NUMS_total{Instance=~\"$instance\",ServiceCode=~\"$serviceCode\"}[2m])) by () or on() vector(0)) /\n((sum(rate(TCC_SERVER_SUCCESS_RPC_REQUEST_NUMS_total{Instance=~\"$instance\",ServiceCode=~\"$serviceCode\"}[2m])) by () or on() vector(0)) + ((sum(rate(TCC_SERVER_FAIL_RPC_REQUEST_NUMS_total{Instance=~\"$instance\",ServiceCode=~\"$serviceCode\"}[2m])) by () or on() vector(0))))", + "legendFormat": "成功率", + "range": true, + "refId": "A" + } + ], + "title": "请求成功率", + "type": "timeseries" + } + ], + "refresh": "", + "schemaVersion": 38, + "style": "dark", + "tags": [], + "templating": { + "list": [ + { + "current": { + "selected": false + }, + "datasource": "${DS_PROMETHEUS}", + "definition": "label_values(TCC_SERVER_SUCCESS_RPC_REQUEST_NUMS_total,Instance)", + "description": "实例", + "hide": 0, + "includeAll": false, + "label": "实例", + "multi": true, + "name": "instance", + "options": [], + 
"query": { + "query": "label_values(TCC_SERVER_SUCCESS_RPC_REQUEST_NUMS_total,Instance)", + "refId": "PrometheusVariableQueryEditor-VariableQuery" + }, + "refresh": 1, + "regex": "", + "skipUrlSync": false, + "sort": 0, + "type": "query" + }, + { + "current": { + "selected": false + }, + "datasource": "${DS_PROMETHEUS}", + "definition": "label_values(TCC_SERVER_SUCCESS_RPC_REQUEST_NUMS_total,ServiceCode)", + "description": "请求方法", + "hide": 0, + "includeAll": false, + "label": "请求方法", + "multi": true, + "name": "serviceCode", + "options": [], + "query": { + "query": "label_values(TCC_SERVER_SUCCESS_RPC_REQUEST_NUMS_total,ServiceCode)", + "refId": "PrometheusVariableQueryEditor-VariableQuery" + }, + "refresh": 1, + "regex": "", + "skipUrlSync": false, + "sort": 0, + "type": "query" + } + ] + }, + "time": { + "from": "now-1h", + "to": "now" + }, + "timepicker": {}, + "timezone": "", + "title": "TCC监控仪表盘", + "uid": "ScbBKUFec4", + "version": 14, + "weekStart": "" +} \ No newline at end of file diff --git a/tcc-transaction-prometheus-exporter/pom.xml b/tcc-transaction-prometheus-exporter/pom.xml new file mode 100644 index 00000000..25fbf24f --- /dev/null +++ b/tcc-transaction-prometheus-exporter/pom.xml @@ -0,0 +1,53 @@ + + + + tcc-transaction-parent + org.mengyun + ${revision} + ../tcc-transaction-parent/pom.xml + + 4.0.0 + tcc-transaction-prometheus-exporter + + tcc-transaction-prometheus-exporter ${project.version} + + + + org.mengyun + tcc-transaction-core + ${project.version} + + + + org.springframework + spring-web + + + + org.apache.tomcat.embed + tomcat-embed-core + + + tomcat-annotations-api + org.apache.tomcat + + + + + + io.prometheus + simpleclient + + + + io.prometheus + simpleclient_servlet + + + + org.aspectj + aspectjweaver + + + diff --git a/tcc-transaction-prometheus-exporter/src/main/java/org/mengyun/tcctransaction/prometheus/exporter/PrometheusMetricsCollector.java 
b/tcc-transaction-prometheus-exporter/src/main/java/org/mengyun/tcctransaction/prometheus/exporter/PrometheusMetricsCollector.java new file mode 100644 index 00000000..8aa2a4e4 --- /dev/null +++ b/tcc-transaction-prometheus-exporter/src/main/java/org/mengyun/tcctransaction/prometheus/exporter/PrometheusMetricsCollector.java @@ -0,0 +1,123 @@ +package org.mengyun.tcctransaction.prometheus.exporter; + + +import io.prometheus.client.Collector; +import io.prometheus.client.CounterMetricFamily; +import org.mengyun.tcctransaction.stats.StatsCategory; +import org.mengyun.tcctransaction.stats.StatsDto; +import org.mengyun.tcctransaction.stats.StatsManager; +import org.mengyun.tcctransaction.stats.StatsSupplier; + +import java.util.*; + +/** + * @author Nervose.Wu + * @date 2024/1/22 17:21 + */ +public class PrometheusMetricsCollector extends Collector { + + private StatsSupplier statsSupplier; + + private static final String INSTANCE = "Instance"; + + public PrometheusMetricsCollector(StatsSupplier statsSupplier) { + this.statsSupplier = statsSupplier; + } + + @Override + public List collect() { + List statsDtos = statsSupplier.getStatsDtoList(); + Map> groupedStatsDtos = groupByStatsNameAndInstance(statsDtos); + List samples = new ArrayList<>(); + collectRequestNums(groupedStatsDtos, samples); + collectRequestCost(groupedStatsDtos, samples); + return samples; + } + + private void collectRequestCost(Map> groupedStatsDtos, List samples) { + collectorHistogram(groupedStatsDtos, samples, StatsCategory.TCC_SERVER_RPC_REQUEST_COSTS_DISTRIBUTION); + } + + private void collectRequestNums(Map> groupedStatsDtos, List samples) { + collectorCounter(groupedStatsDtos, samples, StatsCategory.TCC_SERVER_SUCCESS_RPC_REQUEST_NUMS); + collectorCounter(groupedStatsDtos, samples, StatsCategory.TCC_SERVER_FAIL_RPC_REQUEST_NUMS); + } + + private void collectorCounter(Map> groupedStatsDtos, List samples, StatsCategory statsCategory) { + String statsName = statsCategory.name(); + Map 
statsItemSetDtoMap = groupedStatsDtos.get(statsName); + if (!statsItemSetDtoMap.isEmpty()) { + List labelNames = new ArrayList<>(); + labelNames.add(INSTANCE); + labelNames.addAll(statsItemSetDtoMap.values().iterator().next().getLabels()); + CounterMetricFamily counterMetricFamily = new CounterMetricFamily(statsName, statsName, labelNames); + for (Map.Entry entry : statsItemSetDtoMap.entrySet()) { + for (Map.Entry valueMapEntry : entry.getValue().getValueMap().entrySet()) { + List labelValues = new ArrayList<>(); + labelValues.add(entry.getKey()); + labelValues.addAll(Arrays.asList(valueMapEntry.getKey().split(StatsManager.KEY_SPLITER))); + counterMetricFamily.addMetric(labelValues, valueMapEntry.getValue()); + } + } + if (!counterMetricFamily.samples.isEmpty()) { + samples.add(counterMetricFamily); + } + } + } + + private void collectorHistogram(Map> groupedStatsDtos, List samples, StatsCategory statsCategory) { + String statsName = statsCategory.name(); + Map statsItemSetDtoMap = groupedStatsDtos.get(statsName); + if (!statsItemSetDtoMap.isEmpty()) { + List curSamples = new ArrayList<>(); + List labelNames = new ArrayList<>(); + labelNames.add(INSTANCE); + labelNames.addAll(statsItemSetDtoMap.values().iterator().next().getLabels()); + List labelNamesWithLe = new ArrayList<>(labelNames); + labelNamesWithLe.add("le"); + for (String instanceName : statsItemSetDtoMap.keySet()) { + StatsDto.StatsItemSetDto statsItemSetDto = statsItemSetDtoMap.get(instanceName); + for (String labelKey : statsItemSetDto.getValuesMap().keySet()) { + long count = 0; + List labelValues = new ArrayList<>(); + labelValues.add(instanceName); + labelValues.addAll(Arrays.asList(labelKey.split(StatsManager.KEY_SPLITER))); + long[] values = statsItemSetDto.getValuesMap().get(labelKey); + long sum = statsItemSetDto.getValueMap().getOrDefault(labelKey, 0L); + for (int i = 0; i < statsCategory.getBuckets().length; i++) { + List labelValuesWithLe = new ArrayList<>(labelValues); + 
labelValuesWithLe.add(longToGoString(statsCategory.getBuckets()[i])); + count += values[i]; + curSamples.add(new MetricFamilySamples.Sample(statsName + "_bucket", labelNamesWithLe, labelValuesWithLe, count)); + } + curSamples.add(new MetricFamilySamples.Sample(statsName + "_count", labelNames, labelValues, count)); + curSamples.add(new MetricFamilySamples.Sample(statsName + "_sum", labelNames, labelValues, sum)); + } + } + if (!curSamples.isEmpty()) { + samples.add(new MetricFamilySamples(statsCategory.name(), Type.HISTOGRAM, statsCategory.name(), curSamples)); + } + + } + } + + private Map> groupByStatsNameAndInstance(List statsDtos) { + Map> res = new HashMap<>(); + for (StatsDto statsDto : statsDtos) { + for (StatsDto.StatsItemSetDto statsItemSetDto : statsDto.getStatsItemSetDtos()) { + res.computeIfAbsent(statsItemSetDto.getStatsName(), key -> new HashMap<>()).put(statsDto.getInstance(), statsItemSetDto); + } + } + return res; + } + + private String longToGoString(long value) { + if (value == Long.MAX_VALUE) { + return "+Inf"; + } + if (value == Long.MIN_VALUE) { + return "-Inf"; + } + return Long.toString(value); + } +} diff --git a/tcc-transaction-server/pom.xml b/tcc-transaction-server/pom.xml index 6af68591..0393886a 100644 --- a/tcc-transaction-server/pom.xml +++ b/tcc-transaction-server/pom.xml @@ -31,6 +31,12 @@ ${project.version} + + org.mengyun + tcc-transaction-prometheus-exporter + ${project.version} + + org.springframework.boot spring-boot-starter-web diff --git a/tcc-transaction-server/src/main/java/org/mengyun/tcctransaction/server/controller/StatsController.java b/tcc-transaction-server/src/main/java/org/mengyun/tcctransaction/server/controller/StatsController.java new file mode 100644 index 00000000..706cd571 --- /dev/null +++ b/tcc-transaction-server/src/main/java/org/mengyun/tcctransaction/server/controller/StatsController.java @@ -0,0 +1,26 @@ +package org.mengyun.tcctransaction.server.controller; + +import 
org.mengyun.tcctransaction.TccServer; +import org.mengyun.tcctransaction.stats.StatsDto; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +/** + * @author Nervose.Wu + * @date 2024/1/19 17:10 + */ +@RestController +public class StatsController { + + @Autowired + private TccServer tccServer; + + @RequestMapping("/server/stats") + public StatsDto metrics() { + if (tccServer.getStatsManager() == null) { + return null; + } + return tccServer.getStatsManager().getStats(); + } +} diff --git a/tcc-transaction-spring-boot-starter/pom.xml b/tcc-transaction-spring-boot-starter/pom.xml index 981f6f93..7c590427 100644 --- a/tcc-transaction-spring-boot-starter/pom.xml +++ b/tcc-transaction-spring-boot-starter/pom.xml @@ -44,6 +44,13 @@ true + + org.mengyun + tcc-transaction-prometheus-exporter + ${project.version} + true + + org.springframework.boot spring-boot-configuration-processor diff --git a/tcc-transaction-spring-boot-starter/src/main/java/org/mengyun/tcctransaction/springboot/starter/TccTransactionAutoConfiguration.java b/tcc-transaction-spring-boot-starter/src/main/java/org/mengyun/tcctransaction/springboot/starter/TccTransactionAutoConfiguration.java index e726cbed..2317670e 100644 --- a/tcc-transaction-spring-boot-starter/src/main/java/org/mengyun/tcctransaction/springboot/starter/TccTransactionAutoConfiguration.java +++ b/tcc-transaction-spring-boot-starter/src/main/java/org/mengyun/tcctransaction/springboot/starter/TccTransactionAutoConfiguration.java @@ -1,5 +1,7 @@ package org.mengyun.tcctransaction.springboot.starter; +import io.prometheus.client.CollectorRegistry; +import io.prometheus.client.exporter.MetricsServlet; import net.devh.springboot.autoconfigure.grpc.client.GlobalClientInterceptorConfigurerAdapter; import net.devh.springboot.autoconfigure.grpc.client.GlobalClientInterceptorRegistry; import 
net.devh.springboot.autoconfigure.grpc.server.GlobalServerInterceptorConfigurerAdapter; @@ -7,6 +9,7 @@ import org.mengyun.tcctransaction.ClientConfig; import org.mengyun.tcctransaction.grpc.interceptor.TransactionContextClientInterceptor; import org.mengyun.tcctransaction.grpc.interceptor.TransactionContextServerInterceptor; +import org.mengyun.tcctransaction.prometheus.exporter.PrometheusMetricsCollector; import org.mengyun.tcctransaction.properties.RecoveryProperties; import org.mengyun.tcctransaction.properties.registry.ClientRegistryProperties; import org.mengyun.tcctransaction.properties.remoting.NettyClientProperties; @@ -14,11 +17,14 @@ import org.mengyun.tcctransaction.recovery.RecoveryConfig; import org.mengyun.tcctransaction.remoting.netty.NettyClientConfig; import org.mengyun.tcctransaction.spring.annotation.EnableTccTransaction; +import org.mengyun.tcctransaction.stats.StatsSupplier; import org.mengyun.tcctransaction.storage.StoreConfig; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.boot.web.servlet.ServletRegistrationBean; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -92,4 +98,21 @@ public void addServerInterceptors(GlobalServerInterceptorRegistry registry) { }; } } + + @Configuration + @ConditionalOnClass({PrometheusMetricsCollector.class}) + static class PrometheusExporterConfiguration { + + @Bean + @ConditionalOnBean(StatsSupplier.class) + public ServletRegistrationBean metricsServletServletRegistrationBean(@Autowired StatsSupplier statsSupplier) { + ServletRegistrationBean bean = new ServletRegistrationBean<>(); + 
PrometheusMetricsCollector collector = new PrometheusMetricsCollector(statsSupplier); + CollectorRegistry.defaultRegistry.register(collector); + bean.setServlet(new MetricsServlet()); + bean.setLoadOnStartup(1); + bean.addUrlMappings("/prometheus/metrics"); + return bean; + } + } }