-
Bug
-
Resolution: Done
-
Major
-
None
-
None
The AsyncNonBlockingStore is known to allow for concurrent writes more than modification queue size. Unfortunately, due to how synchronized Publisher works it can be much much more if a given thread is delayed as other threads can sneak in. For example in the stress test with 1024 modification queue size I could get 6K entries in a given queue. The problem is that the concurrent publisher would add the value to its own queue which was only processed on the first thread, which means many could sneak in as no changes were actually populated in the map. https://github.com/ReactiveX/RxJava/blob/3.x/src/main/java/io/reactivex/rxjava3/processors/SerializedProcessor.java#L60
We should change this to be a bit more strict and limit the writes to only queue + # of threads with concurrent modifications.
> We should change this to be a bit more strict and limit the writes to only queue + # of threads with concurrent modifications.
Note that there is no lower limit on modificationQueueSize or upper limit on # of concurrent modifications (either synchronous or asynchronous), so in fact this could also allow many more modifications in the queue than modificationQueueSize.