首页

关于event源码包中事件Event、回调CallBack及循环体Loop定义及超时实现原理等源码说明

标签:event,事件,回调,callback,循环体,超时设计Timeout     发布时间:2018-04-24   

一、前言

关于event源码包中定义的event.Callback、event.Event、event.Loop事件相关接口及抽象类,然后结合TCP服务客户端超时链接事件Event进行源码设计和实现,具体参考下面源码说明部分。

二、源码说明

1.Callback、Event接口及Loop抽象类

package event;@b@@b@import java.io.PrintStream;@b@import java.nio.ByteBuffer;@b@import java.nio.channels.ServerSocketChannel;@b@import java.nio.channels.SocketChannel;@b@@b@public abstract interface Callback@b@{@b@  public static final ErrorCallback DEFAULT_ERROR_CB = new DefaultErrorCallback();@b@@b@  public static abstract class TCPServerCB@b@    implements Callback.ErrorCallback@b@  {@b@    public void onConnect(TCPServerLoop l, ServerSocketChannel ssc)@b@    {@b@    }@b@@b@    public abstract void onAccept(TCPServerLoop paramTCPServerLoop, ServerSocketChannel paramServerSocketChannel, SocketChannel paramSocketChannel);@b@@b@    public void onClose(TCPServerLoop l, ServerSocketChannel ssc)@b@    {@b@    }@b@@b@    public void onError(Loop l, Throwable t)@b@    {@b@      l.onError(t); }@b@@b@    public void onError(Loop l, String msg) {@b@      l.onError(msg); }@b@@b@    public void onError(TCPServerLoop l, ServerSocketChannel ssc, Throwable t) {@b@      onError(l, t);@b@    }@b@  }@b@@b@  public static abstract class TCPClientCB@b@    implements Callback.ErrorCallback@b@  {@b@    public void onConnect(TCPClientLoop l, SocketChannel c)@b@    {@b@    }@b@@b@    public abstract void onData(TCPClientLoop paramTCPClientLoop, SocketChannel paramSocketChannel, ByteBuffer paramByteBuffer);@b@@b@    public void onClose(TCPClientLoop l, SocketChannel c)@b@    {@b@    }@b@@b@    public void onError(Loop l, Throwable t)@b@    {@b@      l.onError(t); }@b@@b@    public void onError(Loop l, String msg) {@b@      l.onError(msg); }@b@@b@    public void onError(TCPClientLoop l, SocketChannel c, Throwable t) {@b@      onError(l, t);@b@    }@b@  }@b@@b@  public static class DefaultErrorCallback@b@    implements Callback.ErrorCallback@b@  {@b@    public void onError(Loop l, Throwable t)@b@    {@b@      t.printStackTrace();@b@      System.exit(1); }@b@@b@    public void onError(Loop l, String msg) {@b@      System.err.println(msg);@b@      Thread.currentThread(); Thread.dumpStack();@b@      System.exit(1);@b@    }@b@  }@b@@b@  public static abstract interface ErrorCallback extends Callback@b@  {@b@    public abstract void onError(Loop paramLoop, Throwable paramThrowable);@b@@b@    public abstract void onError(Loop paramLoop, String paramString);@b@  }@b@}
package event;@b@@b@public abstract interface Event@b@{@b@  public static abstract class Timeout@b@    implements Event@b@  {@b@    long timeout;@b@@b@    public Timeout()@b@    {@b@    }@b@@b@    public Timeout(long ms)@b@    {@b@      this.timeout = ms;@b@    }@b@@b@    public long getTimeout() {@b@      return this.timeout;@b@    }@b@@b@    public abstract void go(TimeoutLoop paramTimeoutLoop);@b@  }@b@}
package event;@b@@b@import java.io.IOException;@b@import java.io.PrintStream;@b@import java.nio.channels.Selector;@b@import java.nio.channels.spi.SelectorProvider;@b@@b@public abstract class Loop extends Thread@b@{@b@  protected long maxSleep;@b@  volatile boolean stopped;@b@  protected Thread loopThread;@b@  protected Callback.ErrorCallback errCB;@b@  protected Selector selector;@b@@b@  public Loop()@b@  {@b@    this.maxSleep = 0L;@b@    try@b@    {@b@      this.selector = SelectorProvider.provider().openSelector();@b@    } catch (IOException ioe) {@b@      throw new RuntimeException(ioe);@b@    }@b@  }@b@@b@  public Loop(Callback.ErrorCallback cb)@b@  {@b@    setErrCB(cb);@b@  }@b@@b@  public void run() {@b@    this.loopThread = Thread.currentThread();@b@    int numSelected = 0;@b@    if (!(this.stopped))@b@      try@b@      {@b@        numSelected = this.selector.select(this.maxSleep);@b@        this.maxSleep = 0L;@b@        go();@b@      } catch (Throwable t) {@b@        onError(t);@b@      }@b@  }@b@@b@  public boolean isLoopThread()@b@  {@b@    return ((this.loopThread != null) && (Thread.currentThread().equals(this.loopThread)));@b@  }@b@@b@  protected abstract void go()@b@    throws Throwable;@b@@b@  protected void onError(Throwable t)@b@  {@b@    if (null != this.errCB)@b@      this.errCB.onError(this, t);@b@    else@b@      Callback.DEFAULT_ERROR_CB.onError(this, t);@b@  }@b@@b@  protected void onError(String msg) {@b@    if (null != this.errCB)@b@      this.errCB.onError(this, msg);@b@    else@b@      Callback.DEFAULT_ERROR_CB.onError(this, msg);@b@  }@b@@b@  public void wake()@b@  {@b@    this.selector.wakeup();@b@  }@b@@b@  public void stopLoop()@b@  {@b@    this.stopped = true;@b@    wake();@b@  }@b@@b@  public void setErrCB(Callback.ErrorCallback errCB) {@b@    this.errCB = errCB;@b@  }@b@@b@  static void p(Object o) {@b@    System.out.println(o);@b@  }@b@}

2.TimeoutLoop超时实现类

package event;@b@@b@import java.io.PrintStream;@b@import java.util.LinkedList;@b@import java.util.PriorityQueue;@b@import java.util.Queue;@b@@b@public class TimeoutLoop extends Loop@b@{@b@  private Queue<T> timeouts;@b@  private LinkedList<T> newTimeouts;@b@  long loopTime;@b@@b@  public TimeoutLoop()@b@  {@b@    this.timeouts = new PriorityQueue();@b@    this.newTimeouts = new LinkedList();@b@  }@b@@b@  protected void go() {@b@    if ((!($assertionsDisabled)) && (!(isLoopThread()))) throw new AssertionError();@b@@b@    this.loopTime = System.nanoTime();@b@    handleTimeouts();@b@@b@    handleNewTimeouts();@b@  }@b@@b@  private void handleNewTimeouts() {@b@    if ((!($assertionsDisabled)) && (!(isLoopThread()))) throw new AssertionError();@b@@b@    synchronized (this.newTimeouts) {@b@      this.timeouts.addAll(this.newTimeouts);@b@      setMaxSleep();@b@@b@      this.newTimeouts.clear();@b@    }@b@  }@b@@b@  private void setMaxSleep()@b@  {@b@    if ((!($assertionsDisabled)) && (!(isLoopThread()))) throw new AssertionError();@b@@b@    T timeout = (T)this.timeouts.peek();@b@@b@    long sleep = (null == timeout) ? 0L : max(1000000L, timeout.time - this.loopTime);@b@    sleep /= 1000000L;@b@@b@    this.maxSleep = sleep;@b@  }@b@@b@  private int handleTimeouts() {@b@    if ((!($assertionsDisabled)) && (!(isLoopThread()))) throw new AssertionError();@b@@b@    if (this.timeouts.size() == 0) {@b@      return 0;@b@    }@b@@b@    int count = 0;@b@    T timeout = null;@b@    do@b@    {@b@      timeout = (T)this.timeouts.peek();@b@      if (this.loopTime < timeout.time) break;@b@      timeout.ev.go(this);@b@      this.timeouts.remove(timeout);@b@      ++count;@b@      if (timeout.interval) {@b@        timeout.time = (this.loopTime + timeout.ev.getTimeout() * 1000000L);@b@        this.timeouts.add(timeout);@b@      }@b@@b@    }@b@@b@    while (0 != this.timeouts.size());@b@@b@    return count;@b@  }@b@@b@  public void addTimeout(Event.Timeout ev) {@b@    addTimeout(ev, false); }@b@@b@  public void addInterval(Event.Timeout ev) {@b@    addTimeout(ev, true);@b@  }@b@@b@  private void addTimeout(Event.Timeout ev, boolean interval)@b@  {@b@    long timesOutOn = System.nanoTime() + ev.getTimeout() * 1000000L;@b@    T t = new T(this, timesOutOn, ev, interval);@b@@b@    synchronized (this.newTimeouts)@b@    {@b@      this.newTimeouts.add(t);@b@    }@b@    if (!(isLoopThread()))@b@      wake();@b@  }@b@@b@  private static long min(long one, long two)@b@  {@b@    if (one < two)@b@      return one;@b@@b@    return two; }@b@@b@  private static long max(long one, long two) {@b@    if (one > two)@b@      return one;@b@@b@    return two;@b@  }@b@@b@  public static void main(String[] args) throws Throwable@b@  {@b@    TimeoutLoop loop = new TimeoutLoop();@b@@b@    loop.start();@b@    loop.addTimeout(new Event.Timeout(750L) {@b@      public void go(TimeoutLoop l) { TimeoutLoop.p("timeout");@b@      }@b@@b@    });@b@    loop.addTimeout(new Event.Timeout() {@b@      public void go(TimeoutLoop l) { TimeoutLoop.p("timeout-1");@b@      }@b@@b@    });@b@    loop.addTimeout(new Event.Timeout(850L) {@b@      public void go(TimeoutLoop l) { TimeoutLoop.p("timeout2");@b@      }@b@@b@    });@b@    loop.addTimeout(new Event.Timeout(150L) {@b@      int i;@b@@b@      public void go(TimeoutLoop l) { TimeoutLoop.p("timeout0");@b@        this.i += 1;@b@        if (this.i > 3) return;@b@        l.addTimeout(this);@b@      }@b@@b@    });@b@    loop.addInterval(new Event.Timeout(100L) {@b@      public void go(TimeoutLoop l) { TimeoutLoop.p("interval");@b@      }@b@@b@    });@b@    Thread.sleep(1000L);@b@    loop.stopLoop();@b@  }@b@@b@  static void p(Object o) {@b@    System.out.println(o); }@b@@b@  class T implements Comparable<T> {@b@    Event.Timeout ev;@b@    long time;@b@    boolean interval;@b@@b@    T(, long paramLong, Event.Timeout paramTimeout, boolean paramBoolean) { this.time = paramLong;@b@      this.ev = ev;@b@      this.interval = paramBoolean; }@b@@b@    public int compareTo() {@b@      return (int)(this.time - o.time);@b@    }@b@  }@b@}

3.TCPClientLoop、TCPServerLoop类

package event;@b@@b@import java.io.IOException;@b@import java.net.InetAddress;@b@import java.net.InetSocketAddress;@b@import java.net.Socket;@b@import java.net.SocketAddress;@b@import java.nio.ByteBuffer;@b@import java.nio.channels.ClosedChannelException;@b@import java.nio.channels.SelectionKey;@b@import java.nio.channels.Selector;@b@import java.nio.channels.SocketChannel;@b@import java.util.Iterator;@b@import java.util.LinkedList;@b@import java.util.Queue;@b@import java.util.Set;@b@@b@public class TCPClientLoop extends TimeoutLoop@b@{@b@  private final ByteBuffer buf = ByteBuffer.allocateDirect(65535);@b@@b@  public SocketChannel createTCPClient(Callback.TCPClientCB cb, String host, int port)@b@  {@b@    SocketChannel sc;@b@    try@b@    {@b@      sc = SocketChannel.open();@b@@b@      SocketAddress remote = new InetSocketAddress(InetAddress.getByName(host), port);@b@@b@      sc.configureBlocking(false);@b@      if (isLoopThread())@b@        sc.register(this.selector, 8, new R(this, sc, cb));@b@      else@b@        addTimeout(new Event.Timeout(this, sc, cb) {@b@          public void go() {@b@            TCPClientLoop l = (TCPClientLoop)loop;@b@            try {@b@              this.val$sc.register(l.selector, 8, new TCPClientLoop.R(this.this$0, this.val$sc, this.val$cb));@b@            } catch (ClosedChannelException cce) {@b@              this.val$cb.onError(l, this.val$sc, cce);@b@            }@b@          }@b@        });@b@@b@      sc.connect(remote);@b@      return sc;@b@    } catch (Throwable t) {@b@      cb.onError(this, t);@b@    }@b@    return null; }@b@@b@  public void createTCPClient(Callback.TCPClientCB cb, SocketChannel sc) {@b@    try {@b@      if (null == sc) {@b@        cb.onError(this, "channel is null");@b@        return;@b@      }@b@      if (!(sc.isConnected())) {@b@        cb.onError(this, "channel not connected!");@b@        return;@b@      }@b@      if (sc.isBlocking()) {@b@        sc.configureBlocking(false);@b@        if (sc.isBlocking()) {@b@          cb.onError(this, "can't make channel non-blocking");@b@          return;@b@        }@b@      }@b@      if (isLoopThread())@b@        sc.register(this.selector, 1, new R(this, sc, cb));@b@      else@b@        addTimeout(new Event.Timeout(this, sc, cb) {@b@          public void go() {@b@            TCPClientLoop loop = (TCPClientLoop)l;@b@            try {@b@              this.val$sc.register(loop.selector, 1, new TCPClientLoop.R(this.this$0, this.val$sc, this.val$cb));@b@            } catch (ClosedChannelException cce) {@b@              this.val$cb.onError(loop, this.val$sc, cce);@b@            }@b@          }@b@        });@b@    }@b@    catch (Throwable t)@b@    {@b@      cb.onError(this, t);@b@    }@b@  }@b@@b@  public void write(SocketChannel sc, Callback.TCPClientCB cb, byte[] bytes)@b@  {@b@    write(sc, cb, ByteBuffer.wrap(bytes)); }@b@@b@  public void write(SocketChannel sc, Callback.TCPClientCB cb, ByteBuffer buffer) {@b@    if (!(isLoopThread())) {@b@      addTimeout(new Event.Timeout(this, sc, cb, buffer) {@b@        public void go() {@b@          ((TCPClientLoop)loop).write(this.val$sc, this.val$cb, this.val$buffer);@b@        }@b@@b@      });@b@      return;@b@    }@b@@b@    SelectionKey key = sc.keyFor(this.selector);@b@    if (null == key) {@b@      cb.onError(this, sc, new RuntimeException("not a previously configured channel!"));@b@    }@b@    else {@b@      key.interestOps(key.interestOps() | 0x4);@b@@b@      R r_orig = (R)key.attachment();@b@      r_orig.push(buffer);@b@    }@b@  }@b@@b@  public void close(SocketChannel sc, Callback.TCPClientCB client)@b@  {@b@    addTimeout(new Event.Timeout(this, sc, client) {@b@      public void go() {@b@        try {@b@          this.val$sc.close();@b@        } catch (Throwable t) {@b@          this.val$client.onError(this.this$0, this.val$sc, t);@b@        }@b@      }@b@    });@b@  }@b@@b@  public void go() {@b@    if ((!($assertionsDisabled)) && (!(isLoopThread()))) throw new AssertionError();@b@    Iterator keys = this.selector.selectedKeys().iterator();@b@    while (true) { SelectionKey key;@b@      do { if (!(keys.hasNext())) break label119;@b@        key = (SelectionKey)keys.next();@b@        keys.remove();@b@        if ((key.isValid()) && (key.isConnectable()))@b@          handleConnect(key);@b@@b@        if ((key.isValid()) && (key.isReadable()))@b@          handleRead(key);@b@      }@b@      while ((!(key.isValid())) || (!(key.isWritable())));@b@      handleWrite(key);@b@    }@b@@b@    label119: super.go();@b@  }@b@@b@  private void handleRead(SelectionKey key)@b@  {@b@    if ((!($assertionsDisabled)) && (!(isLoopThread()))) throw new AssertionError();@b@@b@    SocketChannel sc = (SocketChannel)key.channel();@b@    Callback.TCPClientCB cb = ((R)key.attachment()).cb;@b@@b@    this.buf.clear();@b@    int i = 0;@b@    try {@b@      i = sc.read(this.buf);@b@    } catch (IOException ioe) {@b@      cb.onError(this, sc, ioe);@b@      return;@b@    }@b@    if (-1 == i) {@b@      cb.onClose(this, sc);@b@@b@      key.interestOps(key.interestOps() & 0xFFFFFFFE);@b@    }@b@    else@b@    {@b@      this.buf.flip();@b@      cb.onData(this, sc, this.buf);@b@    }@b@  }@b@@b@  private void handleWrite(SelectionKey key)@b@  {@b@    if ((!($assertionsDisabled)) && (!(isLoopThread()))) throw new AssertionError();@b@    SocketChannel sc = (SocketChannel)key.channel();@b@    R r = (R)key.attachment();@b@@b@    Queue data = r.bufferList;@b@    ByteBuffer buffer = null;@b@    while (null != (buffer = (ByteBuffer)data.peek())) {@b@      long num;@b@      try {@b@        num = sc.write(buffer);@b@      }@b@      catch (IOException ioe) {@b@        r.cb.onError(this, sc, ioe);@b@@b@        return;@b@      }@b@      if (buffer.remaining() != 0)@b@      {@b@        break;@b@      }@b@      data.remove();@b@    }@b@@b@    if (data.isEmpty())@b@    {@b@      key.interestOps(key.interestOps() & 0xFFFFFFFB);@b@    }@b@  }@b@@b@  private void handleConnect(SelectionKey key)@b@  {@b@    if ((!($assertionsDisabled)) && (!(isLoopThread()))) throw new AssertionError();@b@@b@    SocketChannel sc = (SocketChannel)key.channel();@b@    Callback.TCPClientCB cb = ((R)key.attachment()).cb;@b@    try {@b@      sc.finishConnect();@b@    } catch (IOException ioe) {@b@      cb.onError(this, sc, ioe);@b@      return;@b@    }@b@    cb.onConnect(this, sc);@b@@b@    int io = key.interestOps();@b@    io |= 1;@b@    io &= -9;@b@    key.interestOps(io);@b@  }@b@@b@  public void shutdownOutput(SocketChannel sc, Callback.TCPClientCB cb)@b@  {@b@    try@b@    {@b@      sc.socket().shutdownOutput();@b@    } catch (IOException ioe) {@b@      cb.onError(this, sc, ioe);@b@    }@b@  }@b@@b@  static String bin(int num) {@b@    return Integer.toBinaryString(num); }@b@@b@  public static void main(String[] args) {@b@    TCPClientLoop loop = new TCPClientLoop();@b@    loop.start();@b@    Callback.TCPClientCB cb = new Callback.TCPClientCB() {@b@      public void onConnect(TCPClientLoop l, SocketChannel ch) {@b@        TimeoutLoop.p("onConnect: " + ch);@b@        byte[] bs = "GET / HTTP/1.1\r\n\r\n".getBytes();@b@        l.write(ch, this, ByteBuffer.wrap(bs)); }@b@@b@      public void onData(TCPClientLoop l, SocketChannel ch, ByteBuffer b) {@b@        TimeoutLoop.p("onData: " + b);@b@@b@        byte[] bs = "GET / HTTP/1.1\r\n\r\n".getBytes();@b@        l.write(ch, this, ByteBuffer.wrap(bs)); }@b@@b@      public void onClose(TCPClientLoop l, SocketChannel ch) {@b@        TimeoutLoop.p("closed: " + ch);@b@        SelectionKey key = ch.keyFor(l.selector);@b@        TimeoutLoop.p(key);@b@      }@b@@b@    };@b@    loop.createTCPClient(cb, args[0], 8000); }@b@@b@  class R {@b@    SocketChannel channel;@b@    Callback.TCPClientCB cb;@b@    ByteBuffer buffer;@b@    Queue<ByteBuffer> bufferList;@b@@b@    R(, SocketChannel paramSocketChannel, Callback.TCPClientCB paramTCPClientCB) {@b@      this.channel = paramSocketChannel;@b@      this.cb = paramTCPClientCB;@b@      this.bufferList = new LinkedList(); }@b@@b@    void push() {@b@      this.bufferList.add(buffer);@b@    }@b@  }@b@}
package event;@b@@b@import java.io.IOException;@b@import java.net.ServerSocket;@b@import java.net.SocketAddress;@b@import java.nio.channels.ClosedChannelException;@b@import java.nio.channels.SelectionKey;@b@import java.nio.channels.Selector;@b@import java.nio.channels.ServerSocketChannel;@b@import java.nio.channels.SocketChannel;@b@import java.util.Iterator;@b@import java.util.Set;@b@@b@public class TCPServerLoop extends TCPClientLoop@b@{@b@  public void createTCPServer(Callback.TCPServerCB cb, SocketAddress sa)@b@  {@b@    ServerSocketChannel ssc;@b@    try@b@    {@b@      ssc = ServerSocketChannel.open();@b@      ssc.configureBlocking(false);@b@      ssc.socket().bind(sa);@b@      cb.onConnect(this, ssc);@b@      if (isLoopThread())@b@        ssc.register(this.selector, 16, cb);@b@      else@b@        addTimeout(new Event.Timeout(this, ssc, cb) {@b@          public void go() {@b@            try {@b@              this.val$ssc.register(l.selector, 16, this.val$cb);@b@            } catch (ClosedChannelException cce) {@b@              this.val$cb.onError((TCPServerLoop)l, this.val$ssc, cce);@b@            }@b@          }@b@        });@b@    }@b@    catch (IOException ioe) {@b@      cb.onError(this, ioe);@b@    }@b@  }@b@@b@  public void go() {@b@    if ((!($assertionsDisabled)) && (!(isLoopThread()))) throw new AssertionError();@b@    Iterator keys = this.selector.selectedKeys().iterator();@b@    while (true) { SelectionKey key;@b@      do { if (!(keys.hasNext())) break label81;@b@        key = (SelectionKey)keys.next(); }@b@      while ((!(key.isValid())) || (!(key.isAcceptable())));@b@      keys.remove();@b@      handleAccept(key);@b@    }@b@@b@    label81: super.go();@b@  }@b@@b@  public void handleAccept(SelectionKey key) {@b@    if ((!($assertionsDisabled)) && (!(isLoopThread()))) throw new AssertionError();@b@    if ((!($assertionsDisabled)) && (!(key.isAcceptable()))) throw new AssertionError();@b@@b@    ServerSocketChannel ssc = (ServerSocketChannel)key.channel();@b@    Callback.TCPServerCB cb = (Callback.TCPServerCB)key.attachment();@b@    SocketChannel sc = null;@b@    try {@b@      sc = ssc.accept();@b@    } catch (IOException ioe) {@b@      cb.onError(this, ssc, ioe);@b@      return;@b@    }@b@    cb.onAccept(this, ssc, sc);@b@  }@b@}