Uploaded image for project: 'Infinispan'
  1. Infinispan
  2. ISPN-14250

RESP Pub/Sub doesn't work with redis-cli

XMLWordPrintable

    • Icon: Bug Bug
    • Resolution: Done
    • Icon: Major Major
    • 14.0.2.Final, 15.0.0.Final
    • 14.0.1.Final
    • RESP
    • None

      RESP endpoint pub sub fails when using redis-cli as the storage is configured to be text-plain but the pub sub handler expects the key to be a byte[].

      This in turn throws the following exception when trying to publish a message to another redis client listening to the same channel.

      2022-10-20 15:21:32,937 ERROR (non-blocking-thread--p2-t9) [org.infinispan.interceptors.impl.InvocationContextInterceptor] ISPN000136: Error executing command PutKeyValueCommand on Cache 'respCache', writing keys [WrappedByteArray[8E10\NFD7F\t\e\s\t (9 bytes)]] java.lang.ClassCastException: class java.lang.String cannot be cast to class [B (java.lang.String and [B are in module java.base of loader 'bootstrap')
      	at org.infinispan.notifications.cachelistener.CacheNotifierImpl$BaseCacheEntryListenerInvocation.shouldInvoke(CacheNotifierImpl.java:1817)
      	at org.infinispan.notifications.cachelistener.CacheNotifierImpl$BaseCacheEntryListenerInvocation.invoke(CacheNotifierImpl.java:1754)
      	at org.infinispan.notifications.cachelistener.CacheNotifierImpl.doNotifyCreated(CacheNotifierImpl.java:428)
      	at org.infinispan.notifications.cachelistener.CacheNotifierImpl.notifyCacheEntryCreated(CacheNotifierImpl.java:410)
      	at org.infinispan.notifications.cachelistener.NotifyHelper.entryCommitted(NotifyHelper.java:63)
      	at org.infinispan.interceptors.locking.ClusteringDependentLogic$ReplicationLogic.commitSingleEntry(ClusteringDependentLogic.java:561)
      	at org.infinispan.interceptors.locking.ClusteringDependentLogic$AbstractClusteringDependentLogic.commitEntry(ClusteringDependentLogic.java:243)
      	at org.infinispan.interceptors.impl.EntryWrappingInterceptor.commitContextEntry(EntryWrappingInterceptor.java:612)
      	at org.infinispan.interceptors.impl.EntryWrappingInterceptor.commitEntryIfNeeded(EntryWrappingInterceptor.java:885)
      	at org.infinispan.interceptors.impl.EntryWrappingInterceptor.commitContextEntries(EntryWrappingInterceptor.java:592)
      	at org.infinispan.interceptors.impl.EntryWrappingInterceptor.applyChanges(EntryWrappingInterceptor.java:686)
      	at org.infinispan.interceptors.impl.EntryWrappingInterceptor.applyAndFixVersion(EntryWrappingInterceptor.java:738)
      	at org.infinispan.interceptors.SyncInvocationStage.thenApply(SyncInvocationStage.java:44)
      	at org.infinispan.interceptors.impl.EntryWrappingInterceptor.setSkipRemoteGetsAndInvokeNextForDataCommand(EntryWrappingInterceptor.java:733)
      	at org.infinispan.interceptors.impl.EntryWrappingInterceptor.visitPutKeyValueCommand(EntryWrappingInterceptor.java:336)
      	at org.infinispan.commands.write.PutKeyValueCommand.acceptVisitor(PutKeyValueCommand.java:65)
      	at org.infinispan.interceptors.BaseAsyncInterceptor.invokeNextAndFinally(BaseAsyncInterceptor.java:155)
      	at org.infinispan.interceptors.locking.AbstractLockingInterceptor.lambda$nonTxLockAndInvokeNext$3(AbstractLockingInterceptor.java:318)
      	at org.infinispan.interceptors.SyncInvocationStage.andHandle(SyncInvocationStage.java:69)
      	at org.infinispan.interceptors.locking.AbstractLockingInterceptor.nonTxLockAndInvokeNext(AbstractLockingInterceptor.java:313)
      	at org.infinispan.interceptors.locking.AbstractLockingInterceptor.visitNonTxDataWriteCommand(AbstractLockingInterceptor.java:138)
      	at org.infinispan.interceptors.locking.NonTransactionalLockingInterceptor.visitDataWriteCommand(NonTransactionalLockingInterceptor.java:41)
      	at org.infinispan.interceptors.locking.AbstractLockingInterceptor.visitPutKeyValueCommand(AbstractLockingInterceptor.java:82)
      	at org.infinispan.commands.write.PutKeyValueCommand.acceptVisitor(PutKeyValueCommand.java:65)
      	at org.infinispan.interceptors.BaseAsyncInterceptor.invokeNextAndHandle(BaseAsyncInterceptor.java:188)
      	at org.infinispan.statetransfer.StateTransferInterceptor.handleNonTxWriteCommand(StateTransferInterceptor.java:312)
      	at org.infinispan.statetransfer.StateTransferInterceptor.handleWriteCommand(StateTransferInterceptor.java:256)
      	at org.infinispan.statetransfer.StateTransferInterceptor.visitPutKeyValueCommand(StateTransferInterceptor.java:96)
      	at org.infinispan.commands.write.PutKeyValueCommand.acceptVisitor(PutKeyValueCommand.java:65)
      	at org.infinispan.interceptors.BaseAsyncInterceptor.invokeNext(BaseAsyncInterceptor.java:59)
      	at org.infinispan.interceptors.impl.CacheMgmtInterceptor.updateStoreStatistics(CacheMgmtInterceptor.java:265)
      	at org.infinispan.interceptors.impl.CacheMgmtInterceptor.visitPutKeyValueCommand(CacheMgmtInterceptor.java:224)
      	at org.infinispan.commands.write.PutKeyValueCommand.acceptVisitor(PutKeyValueCommand.java:65)
      	at org.infinispan.interceptors.BaseAsyncInterceptor.invokeNext(BaseAsyncInterceptor.java:59)
      	at org.infinispan.interceptors.DDAsyncInterceptor.handleDefault(DDAsyncInterceptor.java:54)
      	at org.infinispan.interceptors.DDAsyncInterceptor.visitPutKeyValueCommand(DDAsyncInterceptor.java:60)
      	at org.infinispan.commands.write.PutKeyValueCommand.acceptVisitor(PutKeyValueCommand.java:65)
      	at org.infinispan.interceptors.BaseAsyncInterceptor.invokeNextAndExceptionally(BaseAsyncInterceptor.java:128)
      	at org.infinispan.interceptors.impl.InvocationContextInterceptor.visitCommand(InvocationContextInterceptor.java:89)
      	at org.infinispan.interceptors.impl.AsyncInterceptorChainImpl.invokeAsync(AsyncInterceptorChainImpl.java:220)
      	at org.infinispan.cache.impl.InvocationHelper.doInvokeAsync(InvocationHelper.java:318)
      	at org.infinispan.cache.impl.InvocationHelper.invokeAsync(InvocationHelper.java:156)
      	at org.infinispan.cache.impl.InvocationHelper.invokeAsync(InvocationHelper.java:139)
      	at org.infinispan.cache.impl.CacheImpl.putAsync(CacheImpl.java:1431)
      	at org.infinispan.cache.impl.CacheImpl.putAsync(CacheImpl.java:1903)
      	at org.infinispan.cache.impl.CacheImpl.putAsync(CacheImpl.java:1425)
      	at org.infinispan.cache.impl.CacheImpl.putAsync(CacheImpl.java:431)
      	at org.infinispan.cache.impl.AbstractDelegatingCache.putAsync(AbstractDelegatingCache.java:173)
      	at org.infinispan.cache.impl.AbstractDelegatingCache.putAsync(AbstractDelegatingCache.java:173)
      	at org.infinispan.cache.impl.EncoderCache.putAsync(EncoderCache.java:266)
      	at org.infinispan.server.resp.Resp3Handler.performSet(Resp3Handler.java:216)
      	at org.infinispan.server.resp.Resp3Handler.handleRequest(Resp3Handler.java:102)
      	at org.infinispan.server.resp.RespLettuceHandler.decode(RespLettuceHandler.java:49)
      	at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:519)
      	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:458)
      	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:280)
      

      The fix should just require changing the key media type to be OCTET_STREAM like the value so it is stored as a byte[] instead of a String.

            wburns@redhat.com Will Burns
            wburns@redhat.com Will Burns
            Votes:
            0 Vote for this issue
            Watchers:
            1 Start watching this issue

              Created:
              Updated:
              Resolved: