-
I have a strange problem: my code works with merge, but not with concatenate. Does anyone know what I could do to locate the problem (using profilers, or adding some manual debugging)?

The problem is that the code below just hangs, and if I enable [...]

When I tried switching between those two methods and logging the debug info: [...]

With [...]

I'm posting here the parts of the code that I think are relevant to the issue I want to describe:

When it fails with a timeout, this is part of the exception that I get:
-
I can even find a group that is entering the processing, but it never emits the
-
After a lot more debugging and whatnot, I pinned down the problem.
The first few markets have "void_reason", so TRUE.equals(group.key()) holds and only that group is emitted (for each "void_reason":"REASON" I can see in the logs
-
Here is the smallest reproducer I have created so far. Change elements to 119 and it will work!?
-
And this is the reason: Why is it 128?
-
I don't think that's the root cause. 128 is for batching (we could make it configurable TBH, a hard-coded value is never optimal). The problem you are observing is likely due to:
-
@jponge Can you then please tell me what change I should make to the last code I posted above, so that it works for larger JSON arrays?
-
Here is a more minimal reproducer with working code based on the logic you had:

    List<Integer> result = Multi.createFrom().range(0, 300)
            .log()
            .group().by(x -> x > 20)
            .onItem().transformToMultiAndMerge(
                    group -> group.key()
                            ? group.collect().asList().replaceWith(1).toMulti()
                            : group.collect().asList().replaceWith(2).toMulti()
            )
            .onCompletion().invoke(() -> System.out.println("Transformation completed"))
            .ifNoItem().after(Duration.ofSeconds(2)).fail()
            .runSubscriptionOn(Executors.newSingleThreadExecutor())
            .collect().asList().await().indefinitely();
    System.out.println(result);

The problem in your code was that the [...] If you extrapolate that to the rest of your code, you should see why there was a problem.
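For anyone trying to build an intuition for why merge can finish where concatenate hangs, here is a small plain-Java model (no Mutiny involved). It is only a sketch under stated assumptions, not a description of Mutiny's actual implementation: it assumes the upstream may keep at most `capacity` items buffered but unconsumed, that a group only completes when the upstream completes, and that concatenation drains one group at a time while merge drains all of them. The class name `GroupByStallSketch`, the `run` method and its parameters are hypothetical; the `x > 20` key and the values 300 and 128 are borrowed from the thread. The exact element count at which this model stalls is an artifact of the model and is not meant to match the 119 reported above.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Hypothetical model of group-by followed by merge or concatenate.
// It is NOT Mutiny code; it only mimics a bounded buffer of unconsumed items.
final class GroupByStallSketch {

    /** Returns the number of items delivered downstream, or -1 if the model stalls. */
    static int run(int elements, int capacity, boolean merge) {
        Map<Boolean, Queue<Integer>> groups = new LinkedHashMap<>();
        List<Boolean> subscribed = new ArrayList<>(); // groups the downstream is draining
        int next = 0;      // next upstream item to emit
        int buffered = 0;  // items emitted but not yet consumed
        int delivered = 0;

        while (next < elements) {
            boolean progressed = false;

            // Assumption: upstream emits only while the buffer has room.
            if (buffered < capacity) {
                boolean key = next > 20;
                if (!groups.containsKey(key)) {
                    groups.put(key, new ArrayDeque<>());
                    // Merge subscribes to every new group; concatenate only to the first.
                    if (merge || subscribed.isEmpty()) {
                        subscribed.add(key);
                    }
                }
                groups.get(key).add(next);
                next++;
                buffered++;
                progressed = true;
            }

            // Downstream drains only the groups it is subscribed to.
            for (boolean key : subscribed) {
                Queue<Integer> queue = groups.get(key);
                while (!queue.isEmpty()) {
                    queue.poll();
                    buffered--;
                    delivered++;
                    progressed = true;
                }
            }

            // Nothing emitted and nothing drained: the pipeline is stuck.
            if (!progressed) {
                return -1;
            }
        }

        // Upstream completed, so every group completes and concatenate can
        // move on to the groups it had not subscribed to yet.
        for (Queue<Integer> queue : groups.values()) {
            delivered += queue.size();
        }
        return delivered;
    }

    public static void main(String[] args) {
        System.out.println(run(300, 128, true));   // merge
        System.out.println(run(300, 128, false));  // concatenate, large input
        System.out.println(run(119, 128, false));  // concatenate, small input
    }
}
```

In this model, merge delivers all 300 items, concatenate with 300 items returns -1 because the second group fills the buffer while nobody is draining it, and concatenate with 119 items completes because everything fits before the upstream finishes. Whether that is exactly what happened in the original code depends on details not shown here, so treat it as a mental model rather than a diagnosis.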