Skip to content

Commit 813a29b

Browse files
authored
Fix publish-on-change (#1073)
1 parent 99c8964 commit 813a29b

File tree

4 files changed

+71
-24
lines changed

4 files changed

+71
-24
lines changed

hivemq-edge/src/main/java/com/hivemq/edge/modules/adapters/impl/polling/PollingTask.java

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ public void run() {
8282
}
8383
}
8484
});
85-
} catch (Throwable t) {
85+
} catch (final Throwable t) {
8686
// the sampler shouldn't throw a exception, but better safe than sorry as we might to miss rescheduling the task otherwise.
8787
handleExceptionDuringPolling(t);
8888
}
@@ -96,10 +96,10 @@ private void handleInterruptionException(final @NotNull Throwable throwable) {
9696
//-- Job was killed by the framework as it took too long
9797
//-- Do not call back to the job here (notify) since it will
9898
//-- Not respond and we dont want to block other polls
99-
int errorCountTotal = watchdogErrorCount.incrementAndGet();
100-
boolean stopBecauseOfTooManyErrors =
99+
final var errorCountTotal = watchdogErrorCount.incrementAndGet();
100+
final var stopBecauseOfTooManyErrors =
101101
errorCountTotal > InternalConfigurations.ADAPTER_RUNTIME_WATCHDOG_TIMEOUT_ERRORS_BEFORE_INTERRUPT.get();
102-
final long milliSecondsSinceLastPoll = TimeUnit.NANOSECONDS.toMillis(nanoTimeProvider.nanoTime() - nanosOfLastPolling);
102+
final var milliSecondsSinceLastPoll = TimeUnit.NANOSECONDS.toMillis(nanoTimeProvider.nanoTime() - nanosOfLastPolling);
103103
if (stopBecauseOfTooManyErrors) {
104104
log.warn(
105105
"Detected bad system process {} in sampler {} - terminating process to maintain health ({}ms runtime)",
@@ -123,7 +123,7 @@ private void handleInterruptionException(final @NotNull Throwable throwable) {
123123

124124

125125
private void handleExceptionDuringPolling(final @NotNull Throwable throwable) {
126-
int errorCountTotal = applicationErrorCount.incrementAndGet();
126+
final int errorCountTotal = applicationErrorCount.incrementAndGet();
127127
final int maxErrorsBeforeRemoval = sampler.getMaxErrorsBeforeRemoval();
128128
// case 1: Unlimited retry (maxErrorsBeforeRemoval < 0) or less errors than the limit
129129
if (maxErrorsBeforeRemoval < 0 || errorCountTotal <= maxErrorsBeforeRemoval) {
@@ -149,18 +149,18 @@ private void handleExceptionDuringPolling(final @NotNull Throwable throwable) {
149149
}
150150

151151
private void notifyOnError(
152-
final @NotNull ProtocolAdapterPollingSampler sampler, final @NotNull Throwable t, boolean continuing) {
152+
final @NotNull ProtocolAdapterPollingSampler sampler, final @NotNull Throwable t, final boolean continuing) {
153153
try {
154154
sampler.error(t, continuing);
155-
} catch (Throwable samplerError) {
155+
} catch (final Throwable samplerError) {
156156
if (log.isInfoEnabled()) {
157157
log.info("Sampler Encountered Error In Notification", samplerError);
158158
}
159159
}
160160
}
161161

162-
private void reschedule(int errorCountTotal) {
163-
long pollDuration = TimeUnit.NANOSECONDS.toMillis(nanoTimeProvider.nanoTime() - nanosOfLastPolling);
162+
private void reschedule(final int errorCountTotal) {
163+
final long pollDuration = TimeUnit.NANOSECONDS.toMillis(nanoTimeProvider.nanoTime() - nanosOfLastPolling);
164164
final long delayInMillis = sampler.getPeriod() - pollDuration;
165165
// a negative delay means that the last polling attempt took longer to be processed than the specified delay between polls
166166
if (delayInMillis < 0) {
@@ -177,24 +177,24 @@ private void reschedule(int errorCountTotal) {
177177
.fire();
178178
}
179179

180-
long nonNegativeDelay = Math.max(0, delayInMillis);
180+
final long nonNegativeDelay = Math.max(0, delayInMillis);
181181

182182
if (errorCountTotal == 0) {
183183
schedule(nonNegativeDelay);
184184
} else {
185-
long backoff = getBackoff(errorCountTotal,
185+
final long backoff = getBackoff(errorCountTotal,
186186
InternalConfigurations.ADAPTER_RUNTIME_MAX_APPLICATION_ERROR_BACKOFF.get());
187-
long effectiveDelay = Math.max(nonNegativeDelay, backoff);
187+
final long effectiveDelay = Math.max(nonNegativeDelay, backoff);
188188
schedule(effectiveDelay);
189189
}
190190
}
191191

192192
@VisibleForTesting
193-
void schedule(long nonNegativeDelay) {
193+
void schedule(final long nonNegativeDelay) {
194194
if (continueScheduling.get()) {
195195
try {
196196
scheduledExecutorService.schedule(this, nonNegativeDelay, TimeUnit.MILLISECONDS);
197-
} catch (RejectedExecutionException rejectedExecutionException) {
197+
} catch (final RejectedExecutionException rejectedExecutionException) {
198198
// ignore. This is fine during shutdown.
199199
}
200200
}
@@ -205,7 +205,7 @@ private void resetErrorStats() {
205205
watchdogErrorCount.set(0);
206206
}
207207

208-
private static long getBackoff(int errorCount, long max) {
208+
private static long getBackoff(final int errorCount, final long max) {
209209
//-- This will backoff up to a max of about a day (unless the max provided is less)
210210
long f = (long) (Math.pow(2, Math.min(errorCount, 20)) * 100);
211211
f += ThreadLocalRandom.current().nextInt(0, errorCount * 100);

modules/hivemq-edge-module-plc4x/src/main/java/com/hivemq/edge/adapters/plc4x/PublishChangedDataOnlyHandler.java

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,28 +18,37 @@
1818
import com.hivemq.adapter.sdk.api.data.DataPoint;
1919
import org.jetbrains.annotations.NotNull;
2020

21+
import java.util.HashSet;
2122
import java.util.List;
2223
import java.util.Map;
24+
import java.util.Set;
2325
import java.util.concurrent.ConcurrentHashMap;
26+
import java.util.concurrent.atomic.AtomicBoolean;
2427

2528
/**
2629
* This class is here TEMPORARY, the functionality will be moved into NorthboundMappings
2730
*/
2831
public class PublishChangedDataOnlyHandler {
29-
private final @NotNull Map<String, List<DataPoint>> lastSamples = new ConcurrentHashMap<>();
32+
private final @NotNull Map<String, Set<DataPoint>> lastSamples = new ConcurrentHashMap<>();
3033

31-
public boolean replaceIfValueIsNew(final @NotNull String tagName, final @NotNull List<DataPoint> newValue) {
32-
final var computedValue = lastSamples.compute(tagName, (key,value) -> {
34+
public boolean checkIfValuesHaveChangedSinceLastInvocation(final @NotNull String tagName, final @NotNull List<DataPoint> newValue) {
35+
final var replaced = new AtomicBoolean(false);
36+
lastSamples.compute(tagName, (key,value) -> {
3337
if (value == null) {
34-
return newValue;
35-
} else if (value.equals(newValue)) {
36-
return value;
38+
replaced.set(true);
39+
return new HashSet<>(newValue);
3740
} else {
38-
return newValue;
41+
final var newSet = new HashSet<>(newValue);
42+
if(!newSet.equals(value)) {
43+
replaced.set(true);
44+
return new HashSet<>(newValue);
45+
} else {
46+
return value;
47+
}
3948
}
4049
});
4150

42-
return newValue != computedValue;
51+
return replaced.get();
4352
}
4453

4554
public void clear() {

modules/hivemq-edge-module-plc4x/src/main/java/com/hivemq/edge/adapters/plc4x/impl/AbstractPlc4xAdapter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ public void poll(final @NotNull BatchPollingInput pollingInput, final @NotNull B
111111
.collect(Collectors.groupingBy(DataPoint::getTagName,
112112
Collectors.mapping(Function.identity(), Collectors.toList())));
113113
tagsToValueList.forEach((tagName,tagValues) -> {
114-
if (lastSamples.replaceIfValueIsNew(tagName, tagValues)) {
114+
if (lastSamples.checkIfValuesHaveChangedSinceLastInvocation(tagName, tagValues)) {
115115
tagValues.forEach(pollingOutput::addDataPoint);
116116
}
117117
});
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Copyright 2023-present HiveMQ GmbH
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.hivemq.edge.adapters.plc4x;
17+
18+
import com.hivemq.edge.modules.adapters.impl.factories.AdapterFactoriesImpl;
19+
import org.junit.jupiter.api.Test;
20+
21+
import java.util.List;
22+
23+
import static org.assertj.core.api.Assertions.assertThat;
24+
25+
public class PublishChangedDataOnlyHandlerTest {
26+
27+
@Test
28+
public void test() {
29+
final var factory = new AdapterFactoriesImpl();
30+
final var dataPointFactory = factory.dataPointFactory();
31+
final var toTest = new PublishChangedDataOnlyHandler();
32+
final var initial = toTest.checkIfValuesHaveChangedSinceLastInvocation("tag1", List.of(dataPointFactory.create("tag1", "value1")));
33+
final var secondTry = toTest.checkIfValuesHaveChangedSinceLastInvocation("tag1", List.of(dataPointFactory.create("tag1", "value1")));
34+
35+
assertThat(initial).isTrue();
36+
assertThat(secondTry).isFalse();
37+
}
38+
}

0 commit comments

Comments
 (0)