首页

分析 Apache avro-ipc 源码包中 Callback 异步回调的设计及其实现代码示例

标签:Callback,异步回调,apache,avro-ipc,Apache     发布时间:2018-01-01   

一、前言

本文基于 Apache avro-ipc 源码包(1.7.5),对异步回调相关的 org.apache.avro.ipc.Callback 接口及其实现类 org.apache.avro.ipc.CallFuture、TransceiverCallback(org.apache.avro.ipc.Requestor 的内部类)的源码进行示例分析说明。从中我们可以了解一个回调处理的设计与实现案例,便于更好地了解 Callback 的应用场景和实现设计等。

二、源码说明

1.Callback接口

package org.apache.avro.ipc;

/**
 * Receiver for the outcome of an asynchronous operation.
 *
 * <p>The interface only declares the two notification methods; when, how often and on
 * which thread they are invoked is decided by the code that holds the callback.
 *
 * @param <T> type of the value passed to {@link #handleResult(Object)}
 */
public interface Callback<T> {

  /**
   * Receives the result of the operation.
   *
   * @param result the result value
   */
  void handleResult(T result);

  /**
   * Receives the failure of the operation.
   *
   * @param error the error that was raised
   */
  void handleError(Throwable error);
}

2.原始实现类 org.apache.avro.ipc.Requestor.TransceiverCallback(Requestor 的内部类)

protected class TransceiverCallback<T> implements Callback<List<ByteBuffer>> {@b@    private final Request request;@b@    private final Callback<T> callback; @b@   @b@    public TransceiverCallback(Request request, Callback<T> callback) {@b@      this.request = request;@b@      this.callback = callback;@b@    }@b@    @b@    @Override@b@    @SuppressWarnings("unchecked")@b@    public void handleResult(List<ByteBuffer> responseBytes) {@b@      ByteBufferInputStream bbi = new ByteBufferInputStream(responseBytes);@b@      BinaryDecoder in = DecoderFactory.get().binaryDecoder(bbi, null);@b@      try {@b@        if (!readHandshake(in)) {@b@          // Resend the handshake and return@b@          Request handshake = new Request(request);@b@          getTransceiver().transceive@b@            (handshake.getBytes(),@b@             new TransceiverCallback<T>(handshake, callback));@b@          return;@b@        }@b@      } catch (Exception e) {@b@        LOG.error("Error handling transceiver callback: " + e, e);@b@      }@b@      @b@      // Read response; invoke callback@b@      Response response = new Response(request, in);@b@      Object responseObject;@b@      try {@b@        try {@b@          responseObject = response.getResponse();@b@        } catch (Exception e) {@b@          if (callback != null) {@b@            callback.handleError(e);@b@          }@b@          return;@b@        }@b@        if (callback != null) {@b@          callback.handleResult((T)responseObject);@b@        }@b@      } catch (Throwable t) {@b@        LOG.error("Error in callback handler: " + t, t);@b@      }@b@    }@b@    @b@    @Override@b@    public void handleError(Throwable error) {@b@      callback.handleError(error);@b@    }@b@  }

3.CallFuture 实现类(同时实现 Future 与 Callback,并可将结果转发给链式 Callback)

package org.apache.avro.ipc;@b@@b@import java.util.concurrent.CountDownLatch;@b@import java.util.concurrent.ExecutionException;@b@import java.util.concurrent.Future;@b@import java.util.concurrent.TimeUnit;@b@import java.util.concurrent.TimeoutException;@b@@b@public class CallFuture<T>@b@  implements Future<T>, Callback<T>@b@{@b@  private final CountDownLatch latch;@b@  private final Callback<T> chainedCallback;@b@  private T result;@b@  private Throwable error;@b@@b@  public CallFuture()@b@  {@b@    this(null);@b@  }@b@@b@  public CallFuture(Callback<T> chainedCallback)@b@  {@b@    this.latch = new CountDownLatch(1);@b@@b@    this.result = null;@b@    this.error = null;@b@@b@    this.chainedCallback = chainedCallback;@b@  }@b@@b@  public void handleResult(T result)@b@  {@b@    this.result = result;@b@    this.latch.countDown();@b@    if (this.chainedCallback != null)@b@      this.chainedCallback.handleResult(result);@b@  }@b@@b@  public void handleError(Throwable error)@b@  {@b@    this.error = error;@b@    this.latch.countDown();@b@    if (this.chainedCallback != null)@b@      this.chainedCallback.handleError(error);@b@  }@b@@b@  public T getResult()@b@  {@b@    return this.result;@b@  }@b@@b@  public Throwable getError()@b@  {@b@    return this.error;@b@  }@b@@b@  public boolean cancel(boolean mayInterruptIfRunning)@b@  {@b@    return false;@b@  }@b@@b@  public boolean isCancelled()@b@  {@b@    return false;@b@  }@b@@b@  public T get()@b@    throws InterruptedException, ExecutionException@b@  {@b@    this.latch.await();@b@    if (this.error != null)@b@      throw new ExecutionException(this.error);@b@@b@    return this.result;@b@  }@b@@b@  public T get(long timeout, TimeUnit unit)@b@    throws InterruptedException, ExecutionException, TimeoutException@b@  {@b@    if (this.latch.await(timeout, unit)) {@b@      if (this.error != null)@b@        throw new ExecutionException(this.error);@b@@b@      return this.result;@b@    }@b@    throw new TimeoutException();@b@  
}@b@@b@  public void await()@b@    throws InterruptedException@b@  {@b@    this.latch.await();@b@  }@b@@b@  public void await(long timeout, TimeUnit unit)@b@    throws InterruptedException, TimeoutException@b@  {@b@    if (!(this.latch.await(timeout, unit)))@b@      throw new TimeoutException();@b@  }@b@@b@  public boolean isDone()@b@  {@b@    return (this.latch.getCount() <= 0L);@b@  }@b@}