当前位置: 技术问答>java相关
Non-Blocking SocketChannel的问题
来源: 互联网 发布时间:2017-03-21
本文导语: Non-Blocking SocketChannel的问题挺难伺候的,请大家帮忙。我的服务程序目的是通过ServerSocketChannel监听来自客户端的请求,客户端发一个文件名给服务器,服务器吧这个文件的内容送给客户端,很简单,但一定要用Non-Bloc...
Non-Blocking SocketChannel的问题挺难伺候的,请大家帮忙。我的服务程序目的是通过ServerSocketChannel监听来自客户端的请求,客户端发一个文件名给服务器,服务器吧这个文件的内容送给客户端,很简单,但一定要用Non-Blocking SocketChannel实现。测试的时候请用大文件测试(文件大于100K).为什么我要用2个Selector?因为一个Selector我没有成功,看到Sun一段代码(可惜不能直接运行测试的)用2个Selector,所以我依样做,还是不成功。
//server side SelectorTestServer.java
/**
http://access1.sun.com/codesamples/Selectors/selectors.html
* This example demostrate use of multiple Selectors() with a single SocketChannel.
* There are two threads running Selectors() each registered for READ and WRITE respectively
*/
import java.net.*;
import java.io.*;
import java.util.*;
import java.nio.channels.*;
import java.nio.charset.*;
import java.nio.channels.spi.SelectorProvider;
import java.nio.*;
public class SelectorTestServer {
/** Single selector for accepts, reads */
private Selector readSelector = null;
/** Single selector for writes */
private Selector writeSelector = null;
/** ServerSocketChannel which listens for client connections*/
private ServerSocketChannel ssch = null;
/** The thread that waits for ready Channels - accept / read*/
private SelectorThread readThread = null;
/** The thread that waits for ready Channels - write*/
private WriteThread writeThread = null;
/** Thread which runs the Selector */
private class SelectorThread extends Thread {
private int eventCount;
public SelectorThread() {
super("SelectorThread");
System.out.println("SelectorThread handle OP_ACCEPT|OP_WRITE event");
}
public void run () {
boolean running = true;
try{
eventCount=0;
while(readSelector.select() > 0){
eventCount++;
System.out.println("Read Thread while loop,eventCount="+eventCount);
Set readyKeys = readSelector.selectedKeys();
Iterator i = readyKeys.iterator();
while (i.hasNext()){
SelectionKey sk = (SelectionKey) i.next();
i.remove();
if (sk.isAcceptable()){
// new client connection
ServerSocketChannel nextReady = (ServerSocketChannel) sk.channel();
SocketChannel channel = nextReady.accept();
channel.configureBlocking(false);
channel.register(readSelector, SelectionKey.OP_READ);
writeSelector.wakeup(); // Added as required by specs
channel.register(writeSelector, SelectionKey.OP_WRITE);
addTask(channel);
}
else if (sk.isReadable())
{
System.out.println("****** readable");
doWork(sk);
}
}
}
}
catch (Exception ex){
System.out.println("Exception in selector loop: "+ex.toString());
running = false;
}
} //end run()
} //end class
/** Thread which runs the Selector */
private class WriteThread extends Thread {
private int eventCount;
public WriteThread() {
super("WriteThread");
System.out.println("WriteThread handle OP_WRITE event");
}
public void run () {
boolean running = true;
try{
// block until a Channel is ready for I/O
eventCount=0;
while(writeSelector.select() > 0){
eventCount++;
System.out.println("Write Thread while loop,event count:"+eventCount);
Set readyKeys = writeSelector.selectedKeys();
Iterator i = readyKeys.iterator();
while (i.hasNext()){
SelectionKey sk = (SelectionKey) i.next();
i.remove();
if (sk.isWritable()){
System.out.println("******* writable");
doWork(sk);
}
}
}
}
catch (Exception ex){
System.out.println("Exception in write selector loop: "+ex.toString());
running = false;
}
} //end run()
} //end class
private void doWork(SelectionKey sk)
{
Task t=getTask(sk);
if(t!=null)
{
t.activate(sk);
t.work();
if(t.isFinished())
deleteTask(t);
}
}
/** Stop the selector thread*/
public synchronized void stop () {
try {
this.ssch.close();
}
catch (Exception e) {
System.err.println("Exception in stop()"+e.toString());
}
this.ssch = null;
}
/**
Sets up the selectors and starts listening
*/
protected void startListening () {
try{
// create the selector and the ServerSocket
readSelector = SelectorProvider.provider().openSelector();
writeSelector = SelectorProvider.provider().openSelector();
ssch = ServerSocketChannel.open();
ssch.configureBlocking(false);
InetSocketAddress isa = new InetSocketAddress(InetAddress.getLocalHost(), 4900);
ssch.socket().bind(isa);
ssch.register(readSelector, SelectionKey.OP_ACCEPT);
ssch.register(writeSelector, SelectionKey.OP_ACCEPT);
System.out.println("startListening 4900 port");
}
catch (Exception e){
System.out.println("Error starting listening"+e.toString());
}
this.writeThread = new WriteThread();
this.writeThread.setDaemon(true);
this.writeThread.start();
this.readThread = new SelectorThread();
this.readThread.setDaemon(true);
this.readThread.start();
}
// Test the working
public static void main (String argv[]){
SelectorTestServer s = new SelectorTestServer();
s.startListening();
try{
Thread.currentThread().sleep(500000);
}
catch (Exception e){
System.err.println(e.toString());
}
}
private LinkedList taskQueue=new LinkedList();
private LinkedList activeQueue=new LinkedList();
public synchronized void addTask(SocketChannel sc)
{
Task t =new Task(sc);
taskQueue.addLast(t);
}
public synchronized Task getTask(SelectionKey sk)
{
boolean validchannel=false;
SocketChannel sc=(SocketChannel)sk.channel();
ListIterator li = taskQueue.listIterator();
while(li.hasNext())
{
Task t = (Task)li.next();
if(t.isMe(sc))
{
validchannel=true;
return t;
}
}
return null;
}
public synchronized void deleteTask(Task t)
{
taskQueue.remove(t);
}
private static long lastmessagetime=0;
public static void debugMsg(String s)
{
// long lt=System.currentTimeMillis();
// if(lt-lastmessagetime>1000)
// {
// System.out.println(s);
// lastmessagetime=lt;
// }
System.out.println(s);
}
}
//server side SelectorTestServer.java
/**
http://access1.sun.com/codesamples/Selectors/selectors.html
* This example demostrate use of multiple Selectors() with a single SocketChannel.
* There are two threads running Selectors() each registered for READ and WRITE respectively
*/
import java.net.*;
import java.io.*;
import java.util.*;
import java.nio.channels.*;
import java.nio.charset.*;
import java.nio.channels.spi.SelectorProvider;
import java.nio.*;
public class SelectorTestServer {
/** Single selector for accepts, reads */
private Selector readSelector = null;
/** Single selector for writes */
private Selector writeSelector = null;
/** ServerSocketChannel which listens for client connections*/
private ServerSocketChannel ssch = null;
/** The thread that waits for ready Channels - accept / read*/
private SelectorThread readThread = null;
/** The thread that waits for ready Channels - write*/
private WriteThread writeThread = null;
/** Thread which runs the Selector */
private class SelectorThread extends Thread {
private int eventCount;
public SelectorThread() {
super("SelectorThread");
System.out.println("SelectorThread handle OP_ACCEPT|OP_WRITE event");
}
public void run () {
boolean running = true;
try{
eventCount=0;
while(readSelector.select() > 0){
eventCount++;
System.out.println("Read Thread while loop,eventCount="+eventCount);
Set readyKeys = readSelector.selectedKeys();
Iterator i = readyKeys.iterator();
while (i.hasNext()){
SelectionKey sk = (SelectionKey) i.next();
i.remove();
if (sk.isAcceptable()){
// new client connection
ServerSocketChannel nextReady = (ServerSocketChannel) sk.channel();
SocketChannel channel = nextReady.accept();
channel.configureBlocking(false);
channel.register(readSelector, SelectionKey.OP_READ);
writeSelector.wakeup(); // Added as required by specs
channel.register(writeSelector, SelectionKey.OP_WRITE);
addTask(channel);
}
else if (sk.isReadable())
{
System.out.println("****** readable");
doWork(sk);
}
}
}
}
catch (Exception ex){
System.out.println("Exception in selector loop: "+ex.toString());
running = false;
}
} //end run()
} //end class
/** Thread which runs the Selector */
private class WriteThread extends Thread {
private int eventCount;
public WriteThread() {
super("WriteThread");
System.out.println("WriteThread handle OP_WRITE event");
}
public void run () {
boolean running = true;
try{
// block until a Channel is ready for I/O
eventCount=0;
while(writeSelector.select() > 0){
eventCount++;
System.out.println("Write Thread while loop,event count:"+eventCount);
Set readyKeys = writeSelector.selectedKeys();
Iterator i = readyKeys.iterator();
while (i.hasNext()){
SelectionKey sk = (SelectionKey) i.next();
i.remove();
if (sk.isWritable()){
System.out.println("******* writable");
doWork(sk);
}
}
}
}
catch (Exception ex){
System.out.println("Exception in write selector loop: "+ex.toString());
running = false;
}
} //end run()
} //end class
private void doWork(SelectionKey sk)
{
Task t=getTask(sk);
if(t!=null)
{
t.activate(sk);
t.work();
if(t.isFinished())
deleteTask(t);
}
}
/** Stop the selector thread*/
public synchronized void stop () {
try {
this.ssch.close();
}
catch (Exception e) {
System.err.println("Exception in stop()"+e.toString());
}
this.ssch = null;
}
/**
Sets up the selectors and starts listening
*/
protected void startListening () {
try{
// create the selector and the ServerSocket
readSelector = SelectorProvider.provider().openSelector();
writeSelector = SelectorProvider.provider().openSelector();
ssch = ServerSocketChannel.open();
ssch.configureBlocking(false);
InetSocketAddress isa = new InetSocketAddress(InetAddress.getLocalHost(), 4900);
ssch.socket().bind(isa);
ssch.register(readSelector, SelectionKey.OP_ACCEPT);
ssch.register(writeSelector, SelectionKey.OP_ACCEPT);
System.out.println("startListening 4900 port");
}
catch (Exception e){
System.out.println("Error starting listening"+e.toString());
}
this.writeThread = new WriteThread();
this.writeThread.setDaemon(true);
this.writeThread.start();
this.readThread = new SelectorThread();
this.readThread.setDaemon(true);
this.readThread.start();
}
// Test the working
public static void main (String argv[]){
SelectorTestServer s = new SelectorTestServer();
s.startListening();
try{
Thread.currentThread().sleep(500000);
}
catch (Exception e){
System.err.println(e.toString());
}
}
private LinkedList taskQueue=new LinkedList();
private LinkedList activeQueue=new LinkedList();
public synchronized void addTask(SocketChannel sc)
{
Task t =new Task(sc);
taskQueue.addLast(t);
}
public synchronized Task getTask(SelectionKey sk)
{
boolean validchannel=false;
SocketChannel sc=(SocketChannel)sk.channel();
ListIterator li = taskQueue.listIterator();
while(li.hasNext())
{
Task t = (Task)li.next();
if(t.isMe(sc))
{
validchannel=true;
return t;
}
}
return null;
}
public synchronized void deleteTask(Task t)
{
taskQueue.remove(t);
}
private static long lastmessagetime=0;
public static void debugMsg(String s)
{
// long lt=System.currentTimeMillis();
// if(lt-lastmessagetime>1000)
// {
// System.out.println(s);
// lastmessagetime=lt;
// }
System.out.println(s);
}
}
|
gz
|
太长了,先帮你up
再慢慢看
再慢慢看
|
太长了,我现在正忙验收项目等,实在没有时间,等忙完了,在看看看
|
I am so tired.
|
五星上将呀!这么厉害!
|
太长了,我现在正忙,过会看。
|
up
|
实在是不会
up
up
|
非阻塞套接字通道?
不懂啊.
给你 UP
不懂啊.
给你 UP
|
这个例子实在是好长·········
IBM的那个好像是有点问题·······
最近在看JDK14 Tutorial
里面有这方面的例子,不过当然比这个简单了
只有大约4K,或许你可以去参考参考?
IBM的那个好像是有点问题·······
最近在看JDK14 Tutorial
里面有这方面的例子,不过当然比这个简单了
只有大约4K,或许你可以去参考参考?
|
http://www.javaworld.com/javaworld/jw-09-2001/jw-0907-merlin_p.html
|
你的服务器端的selector用法好像有点错, 为什么把serverSocketChannel注册到两个selector中阿?
您可能感兴趣的文章:
本站(WWW.)旨在分享和传播互联网科技相关的资讯和技术,将尽最大努力为读者提供更好的信息聚合和浏览方式。
本站(WWW.)站内文章除注明原创外,均为转载、整理或搜集自网络。欢迎任何形式的转载,转载请注明出处。
本站(WWW.)站内文章除注明原创外,均为转载、整理或搜集自网络。欢迎任何形式的转载,转载请注明出处。