Skip to content

Commit ae9be46

Browse files
committed
[fix][misc] Fix compareTo contract violation for NamespaceBundleStats, TimeAverageMessageData and ResourceUnitRanking (#24772)
(cherry picked from commit 7504538)
1 parent 2d68d84 commit ae9be46

File tree

9 files changed

+333
-32
lines changed

9 files changed

+333
-32
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -820,7 +820,7 @@ private synchronized ResourceUnit findBrokerForPlacement(Multimap<Long, Resource
820820
minLoadPercentage = loadPercentage;
821821
} else {
822822
if ((unboundedRanks ? ranking.compareMessageRateTo(selectedRanking)
823-
: ranking.compareTo(selectedRanking)) < 0) {
823+
: ranking.compareToOtherRanking(selectedRanking)) < 0) {
824824
minLoadPercentage = loadPercentage;
825825
selectedRU = candidate;
826826
selectedRanking = ranking;

pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundlesTest.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -271,4 +271,29 @@ public void testPartitionSort() {
271271
}
272272
}
273273
}
274+
275+
// Issue https://github.com/apache/pulsar/issues/24754
276+
@Test
277+
public void testPartitionSortCompareToContractViolationIssue() {
278+
Random rnd = new Random(0);
279+
ArrayList<NamespaceBundleStats> stats = new ArrayList<>();
280+
for (int i = 0; i < 1000; ++i) {
281+
NamespaceBundleStats s = new NamespaceBundleStats();
282+
s.msgThroughputIn = 4 * 75000 * rnd.nextDouble(); // Just above threshold (1e5)
283+
s.msgThroughputOut = 75000000 - (4 * (75000 * rnd.nextDouble()));
284+
s.msgRateIn = 4 * 75 * rnd.nextDouble();
285+
s.msgRateOut = 75000 - (4 * 75 * rnd.nextDouble());
286+
s.topics = i;
287+
s.consumerCount = i;
288+
s.producerCount = 4 * rnd.nextInt(375);
289+
s.cacheSize = 75000000 - (rnd.nextInt(4 * 75000));
290+
stats.add(s);
291+
}
292+
List<Map.Entry<String, ? extends Comparable>> bundleEntries = new ArrayList<>();
293+
294+
for (NamespaceBundleStats s : stats) {
295+
bundleEntries.add(Map.entry("bundle-" + s.msgThroughputIn, s));
296+
}
297+
TopKBundles.partitionSort(bundleEntries, 100);
298+
}
274299
}

pulsar-client-admin-api/src/main/java/org/apache/pulsar/policies/data/loadbalancer/NamespaceBundleStats.java

Lines changed: 22 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -37,14 +37,14 @@ public class NamespaceBundleStats implements Comparable<NamespaceBundleStats>, S
3737
public long topics;
3838
public long cacheSize;
3939

40-
// Consider the throughput equal if difference is less than 100 KB/s
41-
private static final double throughputDifferenceThreshold = 1e5;
42-
// Consider the msgRate equal if the difference is less than 100
43-
private static final double msgRateDifferenceThreshold = 100;
44-
// Consider the total topics/producers/consumers equal if the difference is less than 500
45-
private static final long topicConnectionDifferenceThreshold = 500;
46-
// Consider the cache size equal if the difference is less than 100 kb
47-
private static final long cacheSizeDifferenceThreshold = 100000;
40+
// When comparing throughput, uses a resolution of 100 KB/s, effectively rounding values before comparison
41+
private static final double throughputComparisonResolution = 1e5;
42+
// When comparing message rate, uses a resolution of 100, effectively rounding values before comparison
43+
private static final double msgRateComparisonResolution = 100;
44+
// When comparing total topics/producers/consumers, uses a resolution/rounding of 500
45+
private static final long topicConnectionComparisonResolution = 500;
46+
// When comparing cache size, uses a resolution/rounding of 100kB
47+
private static final long cacheSizeComparisonResolution = 100000;
4848

4949
public NamespaceBundleStats() {
5050
reset();
@@ -89,39 +89,33 @@ public int compareTo(NamespaceBundleStats other) {
8989
public int compareByMsgRate(NamespaceBundleStats other) {
9090
double thisMsgRate = this.msgRateIn + this.msgRateOut;
9191
double otherMsgRate = other.msgRateIn + other.msgRateOut;
92-
if (Math.abs(thisMsgRate - otherMsgRate) > msgRateDifferenceThreshold) {
93-
return Double.compare(thisMsgRate, otherMsgRate);
94-
}
95-
return 0;
92+
return compareDoubleWithResolution(thisMsgRate, otherMsgRate, msgRateComparisonResolution);
93+
}
94+
95+
private static int compareDoubleWithResolution(double v1, double v2, double resolution) {
96+
return Long.compare(Math.round(v1 / resolution), Math.round(v2 / resolution));
97+
}
98+
99+
private static int compareLongWithResolution(long v1, long v2, long resolution) {
100+
return Long.compare(v1 / resolution, v2 / resolution);
96101
}
97102

98103
public int compareByTopicConnections(NamespaceBundleStats other) {
99104
long thisTopicsAndConnections = this.topics + this.consumerCount + this.producerCount;
100105
long otherTopicsAndConnections = other.topics + other.consumerCount + other.producerCount;
101-
if (Math.abs(thisTopicsAndConnections - otherTopicsAndConnections) > topicConnectionDifferenceThreshold) {
102-
return Long.compare(thisTopicsAndConnections, otherTopicsAndConnections);
103-
}
104-
return 0;
106+
return compareLongWithResolution(thisTopicsAndConnections, otherTopicsAndConnections,
107+
topicConnectionComparisonResolution);
105108
}
106109

107110
public int compareByCacheSize(NamespaceBundleStats other) {
108-
if (Math.abs(this.cacheSize - other.cacheSize) > cacheSizeDifferenceThreshold) {
109-
return Long.compare(this.cacheSize, other.cacheSize);
110-
}
111-
return 0;
111+
return compareLongWithResolution(cacheSize, other.cacheSize, cacheSizeComparisonResolution);
112112
}
113113

114114
public int compareByBandwidthIn(NamespaceBundleStats other) {
115-
if (Math.abs(this.msgThroughputIn - other.msgThroughputIn) > throughputDifferenceThreshold) {
116-
return Double.compare(this.msgThroughputIn, other.msgThroughputIn);
117-
}
118-
return 0;
115+
return compareDoubleWithResolution(msgThroughputIn, other.msgThroughputIn, throughputComparisonResolution);
119116
}
120117

121118
public int compareByBandwidthOut(NamespaceBundleStats other) {
122-
if (Math.abs(this.msgThroughputOut - other.msgThroughputOut) > throughputDifferenceThreshold) {
123-
return Double.compare(this.msgThroughputOut, other.msgThroughputOut);
124-
}
125-
return 0;
119+
return compareDoubleWithResolution(msgThroughputOut, other.msgThroughputOut, throughputComparisonResolution);
126120
}
127121
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.policies.data.loadbalancer;
20+
21+
import java.util.ArrayList;
22+
import java.util.List;
23+
import java.util.Random;
24+
import org.testng.annotations.Test;
25+
26+
public class NamespaceBundleStatsTest {
27+
28+
@Test
29+
public void testCompareToContract() {
30+
Random rnd = new Random();
31+
List<NamespaceBundleStats> stats = new ArrayList<>();
32+
for (int i = 0; i < 1000; ++i) {
33+
NamespaceBundleStats s = new NamespaceBundleStats();
34+
s.msgThroughputIn = 4 * 75000 * rnd.nextDouble();
35+
s.msgThroughputOut = 75000000 - (4 * (75000 * rnd.nextDouble()));
36+
s.msgRateIn = 4 * 75 * rnd.nextDouble();
37+
s.msgRateOut = 75000 - (4 * 75 * rnd.nextDouble());
38+
s.topics = i;
39+
s.consumerCount = i;
40+
s.producerCount = 4 * rnd.nextInt(375);
41+
s.cacheSize = 75000000 - (rnd.nextInt(4 * 75000));
42+
stats.add(s);
43+
}
44+
// this would throw "java.lang.IllegalArgumentException: Comparison method violates its general contract!"
45+
// if compareTo() is not implemented correctly.
46+
stats.sort(NamespaceBundleStats::compareTo);
47+
}
48+
}
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.common.util;
20+
21+
import lombok.experimental.UtilityClass;
22+
23+
/**
24+
* Utility class for comparing values.
25+
*/
26+
@UtilityClass
27+
public class CompareUtil {
28+
29+
/**
30+
* Compare two double values with a given resolution.
31+
* @param v1 first value to compare
32+
* @param v2 second value to compare
33+
* @param resolution resolution to compare with
34+
* @return -1 if v1 < v2, 0 if v1 == v2, 1 if v1 > v2
35+
*/
36+
public static int compareDoubleWithResolution(double v1, double v2, double resolution) {
37+
return Long.compare((long) (v1 / resolution), (long) (v2 / resolution));
38+
}
39+
40+
/**
41+
* Compare two long values with a given resolution.
42+
* @param v1 first value to compare
43+
* @param v2 second value to compare
44+
* @param resolution resolution to compare with
45+
* @return -1 if v1 < v2, 0 if v1 == v2, 1 if v1 > v2
46+
*/
47+
public static int compareLongWithResolution(long v1, long v2, long resolution) {
48+
return Long.compare(v1 / resolution, v2 / resolution);
49+
}
50+
51+
/**
52+
* Compare two int values with a given resolution.
53+
* @param v1 first value to compare
54+
* @param v2 second value to compare
55+
* @param resolution resolution to compare with
56+
* @return -1 if v1 < v2, 0 if v1 == v2, 1 if v1 > v2
57+
*/
58+
public static int compareIntegerWithResolution(int v1, int v2, int resolution) {
59+
return Integer.compare(v1 / resolution, v2 / resolution);
60+
}
61+
}

pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/ResourceUnitRanking.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
* The class containing information about system resources, allocated quota, and loaded bundles.
2727
*/
2828
@EqualsAndHashCode
29-
public class ResourceUnitRanking implements Comparable<ResourceUnitRanking> {
29+
public class ResourceUnitRanking {
3030

3131
private static final long KBITS_TO_BYTES = 1024 / 8;
3232
private static final double PERCENTAGE_DIFFERENCE_THRESHOLD = 5.0;
@@ -129,7 +129,13 @@ private void estimateLoadPercentage() {
129129

130130
}
131131

132-
public int compareTo(ResourceUnitRanking other) {
132+
/**
133+
* Compares to another ranking. Please note that this cannot be used to sort the rankings since the results
134+
* of this method don't satify the contract of {@link Comparable#compareTo(Object)}
135+
* @param other other ranking to compare to
136+
* @return negative if this is less than other, 0 if they are equal, positive if this is greater than other
137+
*/
138+
public int compareToOtherRanking(ResourceUnitRanking other) {
133139
if (Math.abs(this.estimatedLoadPercentage - other.estimatedLoadPercentage) > PERCENTAGE_DIFFERENCE_THRESHOLD) {
134140
return Double.compare(this.estimatedLoadPercentage, other.estimatedLoadPercentage);
135141
}

pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/TimeAverageMessageData.java

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,14 @@
1818
*/
1919
package org.apache.pulsar.policies.data.loadbalancer;
2020

21+
import static org.apache.pulsar.common.util.CompareUtil.compareDoubleWithResolution;
22+
import lombok.EqualsAndHashCode;
23+
2124
/**
2225
* Data class comprising the average message data over a fixed period of time.
2326
*/
24-
public class TimeAverageMessageData {
27+
@EqualsAndHashCode
28+
public class TimeAverageMessageData implements Comparable<TimeAverageMessageData> {
2529
// The maximum number of samples this data will consider.
2630
private int maxSamples;
2731

@@ -41,6 +45,11 @@ public class TimeAverageMessageData {
4145
// The average message rate out per second.
4246
private double msgRateOut;
4347

48+
// When comparing throughput, uses a resolution of 100 KB/s, effectively rounding values before comparison
49+
private static final double throughputComparisonResolution = 1e5;
50+
// When comparing message rate, uses a resolution of 100, effectively rounding values before comparison
51+
private static final double msgRateComparisonResolution = 100;
52+
4453
// For JSON only.
4554
public TimeAverageMessageData() {
4655
}
@@ -177,4 +186,31 @@ public double totalMsgRate() {
177186
public double totalMsgThroughput() {
178187
return msgThroughputIn + msgThroughputOut;
179188
}
189+
190+
@Override
191+
public int compareTo(TimeAverageMessageData other) {
192+
int result = this.compareByBandwidthIn(other);
193+
194+
if (result == 0) {
195+
result = this.compareByBandwidthOut(other);
196+
}
197+
if (result == 0) {
198+
result = this.compareByMsgRate(other);
199+
}
200+
return result;
201+
}
202+
203+
public int compareByMsgRate(TimeAverageMessageData other) {
204+
double thisMsgRate = this.msgRateIn + this.msgRateOut;
205+
double otherMsgRate = other.msgRateIn + other.msgRateOut;
206+
return compareDoubleWithResolution(thisMsgRate, otherMsgRate, msgRateComparisonResolution);
207+
}
208+
209+
public int compareByBandwidthIn(TimeAverageMessageData other) {
210+
return compareDoubleWithResolution(msgThroughputIn, other.msgThroughputIn, throughputComparisonResolution);
211+
}
212+
213+
public int compareByBandwidthOut(TimeAverageMessageData other) {
214+
return compareDoubleWithResolution(msgThroughputOut, other.msgThroughputOut, throughputComparisonResolution);
215+
}
180216
}

0 commit comments

Comments
 (0)