/* * JBoss, Home of Professional Open Source. * Copyright 2006, Red Hat Middleware LLC, and individual contributors * as indicated by the @author tags. See the copyright.txt file in the * distribution for a full listing of individual contributors. * * This is free software; you can redistribute it and/or modify it * under the terms of the GNU Lesser General Public License as * published by the Free Software Foundation; either version 2.1 of * the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this software; if not, write to the Free * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA * 02110-1301 USA, or see the FSF site: http://www.fsf.org. */ package org.jboss.remoting3.test; import java.io.IOException; import java.net.ConnectException; import java.net.InetSocketAddress; import java.net.URI; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.jboss.remoting3.Channel; import org.jboss.remoting3.CloseHandler; import org.jboss.remoting3.Connection; import org.jboss.remoting3.Endpoint; import org.jboss.remoting3.MessageInputStream; import org.jboss.remoting3.OpenListener; import org.jboss.remoting3.Registration; import org.jboss.remoting3.Remoting; import org.jboss.remoting3.remote.RemoteConnectionProviderFactory; import org.jboss.remoting3.security.SimpleServerAuthenticationProvider; import org.jboss.remoting3.spi.NetworkServerProvider; import org.xnio.IoFuture; import org.xnio.IoFuture.Status; import org.xnio.IoUtils; import org.xnio.OptionMap; import org.xnio.Options; import org.xnio.Sequence; import org.xnio.Xnio; import org.xnio.channels.AcceptingChannel; import org.xnio.channels.ConnectedStreamChannel; /** * * @author Kabir Khan * @version $Revision: 1.1 $ */ public class __KabirsPlayThing { protected Endpoint endpoint; protected ExecutorService executorService; private AcceptingChannel streamServer; private SimpleServerAuthenticationProvider provider; private Registration serviceRegistration; private Registration registration; private NetworkServerProvider networkServerProvider; private Connection connection; private Channel sendChannel; public static void main (String[] args) throws Exception { __KabirsPlayThing thing = new __KabirsPlayThing(); try { if (args.length > 0 && args[0].equals("server")) { thing.setupServer(); } else { thing.connectClient(); } System.out.println("Press to exit"); System.in.read(); } finally { thing.close(); } } private void setupServer() throws Exception { setupRemoting(); streamServer = networkServerProvider.createServer(new InetSocketAddress("::1", 30123), OptionMap.create(Options.SASL_MECHANISMS, Sequence.of("CRAM-MD5")), provider); endpoint.registerService("channel", new OpenListener() { @Override public void registrationTerminated() { } @Override public void channelOpened(final Channel channel) { channelStuff("Server", channel); } }, OptionMap.EMPTY); } private void connectClient() throws Exception{ setupRemoting(); IoFuture futureConnection = endpoint.connect(new URI("remote://[::1]:30123"), OptionMap.EMPTY, "bob", "test", "pass".toCharArray()); Status status = futureConnection.await(20000, TimeUnit.MILLISECONDS); if (status == Status.WAITING) { futureConnection.cancel(); throw new ConnectException(); } this.connection = futureConnection.get(); IoFuture futureChannel = connection.openChannel("channel", OptionMap.EMPTY); sendChannel = futureChannel.get(); channelStuff("Client", sendChannel); } private void setupRemoting() throws Exception { executorService = new ThreadPoolExecutor(16, 16, 1L, TimeUnit.DAYS, new LinkedBlockingQueue()); endpoint = Remoting.createEndpoint("test", executorService, OptionMap.EMPTY); Xnio xnio = Xnio.getInstance(); registration = endpoint.addConnectionProvider("remote", new RemoteConnectionProviderFactory(xnio), OptionMap.create(Options.SSL_ENABLED, Boolean.FALSE)); networkServerProvider = endpoint.getConnectionProviderInterface("remote", NetworkServerProvider.class); provider = new SimpleServerAuthenticationProvider(); provider.addUser("bob", "test", "pass".toCharArray()); } private void close() throws Exception { System.out.println("initiating close"); if (sendChannel != null) { sendChannel.writeShutdown(); //This causes it to hang forever sendChannel.awaitClosed(); } IoUtils.safeClose(connection); IoUtils.safeClose(serviceRegistration); IoUtils.safeClose(streamServer); IoUtils.safeClose(endpoint); IoUtils.safeClose(registration); executorService.shutdown(); executorService.awaitTermination(1L, TimeUnit.DAYS); executorService.shutdownNow(); } private void channelStuff(final String channelName, final Channel channel) { System.out.println(channelName + " Channel opened"); channel.addCloseHandler(new CloseHandler() { public void handleClose(Channel closed) { System.out.println(channelName + " Channel close handler"); } }); channel.receiveMessage(new Channel.Receiver() { @Override public void handleMessage(Channel channel, MessageInputStream message) { try { while (message.read() != -1) { } } catch (IOException e) { e.printStackTrace(); } } @Override public void handleError(Channel channel, IOException error) { System.out.println(channelName + " Handled error"); error.printStackTrace(); } @Override public void handleEnd(Channel channel) { System.out.println(channelName + " Handled end"); try { channel.close(); } catch (IOException e) { e.printStackTrace(); } } }); } }