首页

关于taobao的tddl-sequence定义分布式序列及默认数据实现源码分析示例说明

标签:taobao,tddl-sequence,分布式序列,数据库序列,自定义序列生成器     发布时间:2019-01-04   

一、前言

关于taobao的tddl-sequence包(5.0.0)的定义com.taobao.tddl.sequence.Sequence序列接口、序列对应com.taobao.tddl.sequence.SequenceDao数据层查询接口,详情参见源码说明部分。

二、源码说明

1. Sequence序列接口、SequenceDao数据层接口

package com.taobao.tddl.sequence;@b@@b@import com.taobao.tddl.sequence.exception.SequenceException;@b@@b@public abstract interface Sequence@b@{@b@  public abstract long nextValue()@b@    throws SequenceException;@b@}
package com.taobao.tddl.sequence;@b@@b@import com.taobao.tddl.sequence.exception.SequenceException;@b@@b@public abstract interface SequenceDao@b@{@b@  public abstract SequenceRange nextRange(String paramString)@b@    throws SequenceException;@b@}

2. 序列默认DefaultSequence实现类、数据层DefaultSequenceDao实现类

package com.taobao.tddl.sequence.impl;@b@@b@import com.taobao.tddl.sequence.Sequence;@b@import com.taobao.tddl.sequence.SequenceDao;@b@import com.taobao.tddl.sequence.SequenceRange;@b@import com.taobao.tddl.sequence.exception.SequenceException;@b@import java.util.concurrent.locks.Lock;@b@import java.util.concurrent.locks.ReentrantLock;@b@@b@public class DefaultSequence@b@  implements Sequence@b@{@b@  private final Lock lock;@b@  private SequenceDao sequenceDao;@b@  private String name;@b@  private volatile SequenceRange currentRange;@b@@b@  public DefaultSequence()@b@  {@b@    this.lock = new ReentrantLock();@b@  }@b@@b@  public long nextValue()@b@    throws SequenceException@b@  {@b@    if (this.currentRange == null) {@b@      this.lock.lock();@b@      try {@b@        if (this.currentRange == null)@b@          this.currentRange = this.sequenceDao.nextRange(this.name);@b@      }@b@      finally {@b@        this.lock.unlock();@b@      }@b@    }@b@@b@    long value = this.currentRange.getAndIncrement();@b@    if (value == -1L) {@b@      this.lock.lock();@b@      try {@b@        while (true) {@b@          if (this.currentRange.isOver()) {@b@            this.currentRange = this.sequenceDao.nextRange(this.name);@b@          }@b@@b@          value = this.currentRange.getAndIncrement();@b@          if (value != -1L)@b@            break;@b@        }@b@@b@      }@b@      finally@b@      {@b@        this.lock.unlock();@b@      }@b@    }@b@@b@    if (value < 0L) {@b@      throw new SequenceException("Sequence value overflow, value = " + value);@b@    }@b@@b@    return value;@b@  }@b@@b@  public SequenceDao getSequenceDao() {@b@    return this.sequenceDao;@b@  }@b@@b@  public void setSequenceDao(SequenceDao sequenceDao) {@b@    this.sequenceDao = sequenceDao;@b@  }@b@@b@  public String getName() {@b@    return this.name;@b@  }@b@@b@  public void setName(String name) {@b@    this.name = name;@b@  }@b@}
package com.taobao.tddl.sequence.impl;@b@@b@import com.taobao.tddl.common.utils.logger.Logger;@b@import com.taobao.tddl.common.utils.logger.LoggerFactory;@b@import com.taobao.tddl.sequence.SequenceDao;@b@import com.taobao.tddl.sequence.SequenceRange;@b@import com.taobao.tddl.sequence.exception.SequenceException;@b@import java.sql.Connection;@b@import java.sql.PreparedStatement;@b@import java.sql.ResultSet;@b@import java.sql.SQLException;@b@import java.sql.Statement;@b@import java.sql.Timestamp;@b@import javax.sql.DataSource;@b@@b@public class DefaultSequenceDao@b@  implements SequenceDao@b@{@b@  private static final Logger logger = LoggerFactory.getLogger(DefaultSequenceDao.class);@b@  private static final int MIN_STEP = 1;@b@  private static final int MAX_STEP = 100000;@b@  private static final int DEFAULT_STEP = 1000;@b@  private static final int DEFAULT_RETRY_TIMES = 150;@b@  private static final String DEFAULT_TABLE_NAME = "sequence";@b@  private static final String DEFAULT_NAME_COLUMN_NAME = "name";@b@  private static final String DEFAULT_VALUE_COLUMN_NAME = "value";@b@  private static final String DEFAULT_GMT_MODIFIED_COLUMN_NAME = "gmt_modified";@b@  private static final long DELTA = 100000000L;@b@  private DataSource dataSource;@b@  private int retryTimes;@b@  private int step;@b@  private String tableName;@b@  private String nameColumnName;@b@  private String valueColumnName;@b@  private String gmtModifiedColumnName;@b@  private volatile String selectSql;@b@  private volatile String updateSql;@b@@b@  public DefaultSequenceDao()@b@  {@b@    this.retryTimes = 150;@b@@b@    this.step = 1000;@b@@b@    this.tableName = "sequence";@b@@b@    this.nameColumnName = "name";@b@@b@    this.valueColumnName = "value";@b@@b@    this.gmtModifiedColumnName = "gmt_modified";@b@  }@b@@b@  public SequenceRange nextRange(String name)@b@    throws SequenceException@b@  {@b@    if (name == null) {@b@      throw new IllegalArgumentException("序列名称不能为空");@b@    }@b@@b@    Connection conn = null;@b@    PreparedStatement stmt = null;@b@    ResultSet rs = null;@b@@b@    for (int i = 0; i < this.retryTimes + 1; ++i) { long oldValue;@b@      long newValue;@b@      try { StringBuilder message;@b@        conn = this.dataSource.getConnection();@b@        stmt = conn.prepareStatement(getSelectSql());@b@        stmt.setString(1, name);@b@        rs = stmt.executeQuery();@b@        rs.next();@b@        oldValue = rs.getLong(1);@b@@b@        if (oldValue < 0L) {@b@          message = new StringBuilder();@b@          message.append("Sequence value cannot be less than zero, value = ").append(oldValue);@b@          message.append(", please check table ").append(getTableName());@b@@b@          throw new SequenceException(message.toString());@b@        }@b@@b@        if (oldValue > 9223372036754775807L) {@b@          message = new StringBuilder();@b@          message.append("Sequence value overflow, value = ").append(oldValue);@b@          message.append(", please check table ").append(getTableName());@b@@b@          throw new SequenceException(message.toString());@b@        }@b@@b@        newValue = oldValue + getStep();@b@      } catch (SQLException e) {@b@      }@b@      finally {@b@        closeResultSet(rs);@b@        rs = null;@b@        closeStatement(stmt);@b@        stmt = null;@b@        closeConnection(conn);@b@        conn = null;@b@      }@b@      try@b@      {@b@        conn = this.dataSource.getConnection();@b@        stmt = conn.prepareStatement(getUpdateSql());@b@        stmt.setLong(1, newValue);@b@        stmt.setTimestamp(2, new Timestamp(System.currentTimeMillis()));@b@        stmt.setString(3, name);@b@        stmt.setLong(4, oldValue);@b@        int affectedRows = stmt.executeUpdate();@b@        if (affectedRows == 0)@b@        {@b@          closeStatement(stmt);@b@          stmt = null;@b@          closeConnection(conn);@b@          conn = null; break label454:@b@        }@b@        SequenceRange localSequenceRange = new SequenceRange(oldValue + 1L, newValue);@b@@b@        return localSequenceRange;@b@      }@b@      catch (SQLException affectedRows)@b@      {@b@      }@b@      finally@b@      {@b@        closeStatement(stmt);@b@        stmt = null;@b@        closeConnection(conn);@b@        conn = null;@b@      }@b@    }@b@@b@    label454: throw new SequenceException("Retried too many times, retryTimes = " + this.retryTimes);@b@  }@b@@b@  private String getSelectSql() {@b@    if (this.selectSql == null)@b@      synchronized (this) {@b@        if (this.selectSql == null) {@b@          StringBuilder buffer = new StringBuilder();@b@          buffer.append("select ").append(getValueColumnName());@b@          buffer.append(" from ").append(getTableName());@b@          buffer.append(" where ").append(getNameColumnName()).append(" = ?");@b@@b@          this.selectSql = buffer.toString();@b@        }@b@      }@b@@b@@b@    return this.selectSql;@b@  }@b@@b@  private String getUpdateSql() {@b@    if (this.updateSql == null)@b@      synchronized (this) {@b@        if (this.updateSql == null) {@b@          StringBuilder buffer = new StringBuilder();@b@          buffer.append("update ").append(getTableName());@b@          buffer.append(" set ").append(getValueColumnName()).append(" = ?, ");@b@          buffer.append(getGmtModifiedColumnName()).append(" = ? where ");@b@          buffer.append(getNameColumnName()).append(" = ? and ");@b@          buffer.append(getValueColumnName()).append(" = ?");@b@@b@          this.updateSql = buffer.toString();@b@        }@b@      }@b@@b@@b@    return this.updateSql;@b@  }@b@@b@  private static void closeResultSet(ResultSet rs) {@b@    if (rs != null)@b@      try {@b@        rs.close();@b@      } catch (SQLException e) {@b@        logger.debug("Could not close JDBC ResultSet", e);@b@      } catch (Throwable e) {@b@        logger.debug("Unexpected exception on closing JDBC ResultSet", e);@b@      }@b@  }@b@@b@  private static void closeStatement(Statement stmt)@b@  {@b@    if (stmt != null)@b@      try {@b@        stmt.close();@b@      } catch (SQLException e) {@b@        logger.debug("Could not close JDBC Statement", e);@b@      } catch (Throwable e) {@b@        logger.debug("Unexpected exception on closing JDBC Statement", e);@b@      }@b@  }@b@@b@  private static void closeConnection(Connection conn)@b@  {@b@    if (conn != null)@b@      try {@b@        conn.close();@b@      } catch (SQLException e) {@b@        logger.debug("Could not close JDBC Connection", e);@b@      } catch (Throwable e) {@b@        logger.debug("Unexpected exception on closing JDBC Connection", e);@b@      }@b@  }@b@@b@  public DataSource getDataSource()@b@  {@b@    return this.dataSource;@b@  }@b@@b@  public void setDataSource(DataSource dataSource) {@b@    this.dataSource = dataSource;@b@  }@b@@b@  public int getRetryTimes() {@b@    return this.retryTimes;@b@  }@b@@b@  public void setRetryTimes(int retryTimes) {@b@    if (retryTimes < 0) {@b@      throw new IllegalArgumentException("Property retryTimes cannot be less than zero, retryTimes = " + retryTimes);@b@    }@b@@b@    this.retryTimes = retryTimes;@b@  }@b@@b@  public int getStep() {@b@    return this.step;@b@  }@b@@b@  public void setStep(int step) {@b@    if ((step < 1) || (step > 100000)) {@b@      StringBuilder message = new StringBuilder();@b@      message.append("Property step out of range [").append(1);@b@      message.append(",").append(100000).append("], step = ").append(step);@b@@b@      throw new IllegalArgumentException(message.toString());@b@    }@b@@b@    this.step = step;@b@  }@b@@b@  public String getTableName() {@b@    return this.tableName;@b@  }@b@@b@  public void setTableName(String tableName) {@b@    this.tableName = tableName;@b@  }@b@@b@  public String getNameColumnName() {@b@    return this.nameColumnName;@b@  }@b@@b@  public void setNameColumnName(String nameColumnName) {@b@    this.nameColumnName = nameColumnName;@b@  }@b@@b@  public String getValueColumnName() {@b@    return this.valueColumnName;@b@  }@b@@b@  public void setValueColumnName(String valueColumnName) {@b@    this.valueColumnName = valueColumnName;@b@  }@b@@b@  public String getGmtModifiedColumnName() {@b@    return this.gmtModifiedColumnName;@b@  }@b@@b@  public void setGmtModifiedColumnName(String gmtModifiedColumnName) {@b@    this.gmtModifiedColumnName = gmtModifiedColumnName;@b@  }@b@}

3. 主要依赖类:SequenceRange序列范围定义类、随机序列RandomSequence实现类

package com.taobao.tddl.sequence;@b@@b@import java.util.concurrent.atomic.AtomicLong;@b@@b@public class SequenceRange@b@{@b@  private final long min;@b@  private final long max;@b@  private final AtomicLong value;@b@  private volatile boolean over = false;@b@@b@  public SequenceRange(long min, long max)@b@  {@b@    this.min = min;@b@    this.max = max;@b@    this.value = new AtomicLong(min);@b@  }@b@@b@  public long getAndIncrement() {@b@    long currentValue = this.value.getAndIncrement();@b@    if (currentValue > this.max) {@b@      this.over = true;@b@      return -1L;@b@    }@b@@b@    return currentValue;@b@  }@b@@b@  public long getMin() {@b@    return this.min;@b@  }@b@@b@  public long getMax() {@b@    return this.max;@b@  }@b@@b@  public boolean isOver() {@b@    return this.over;@b@  }@b@}
package com.taobao.tddl.sequence.util;@b@@b@import com.taobao.tddl.sequence.exception.SequenceException;@b@import java.util.ArrayList;@b@import java.util.Collections;@b@import java.util.Comparator;@b@import java.util.HashMap;@b@import java.util.Map;@b@import java.util.Map.Entry;@b@import java.util.Random;@b@@b@public class RandomSequence@b@{@b@  public static int[] randomIntSequence(int n)@b@    throws SequenceException@b@  {@b@    if (n <= 0)@b@      throw new SequenceException("产生随机序列范围值小于等于0");@b@@b@    int[] num = new int[n];@b@    for (int i = 0; i < n; ++i)@b@      num[i] = i;@b@@b@    if (n == 1)@b@      return num;@b@@b@    Random random = new Random();@b@    if ((n == 2) && (random.nextInt(2) == 1))@b@    {@b@      int temp = num[0];@b@      num[0] = num[1];@b@      num[1] = temp;@b@    }@b@@b@    int[] result = randomIntSequence(num);@b@@b@    return result;@b@  }@b@@b@  public static int[] randomIntSequence(int[] sourceQueue)@b@  {@b@    int size = sourceQueue.length;@b@    Map map = new HashMap(size);@b@    Random random = new Random();@b@    for (int i = 0; i < size; ++i) {@b@      int randomNum = random.nextInt(size * 100);@b@      map.put(Integer.valueOf(sourceQueue[i]), Integer.valueOf(randomNum));@b@    }@b@    ArrayList resultQueue = sortByValueAsc(map);@b@    int[] result = new int[size];@b@    for (int i = 0; i < size; ++i)@b@      result[i] = ((Integer)((Map.Entry)resultQueue.get(i)).getKey()).intValue();@b@@b@    return result;@b@  }@b@@b@  private static ArrayList<Map.Entry<Integer, Integer>> sortByValueAsc(Map<Integer, Integer> map) {@b@    ArrayList list = new ArrayList(map.entrySet());@b@    Collections.sort(list, new Comparator()@b@    {@b@      public int compare(Map.Entry<Integer, Integer> arg0, Map.Entry<Integer, Integer> arg1) {@b@        int result = -1;@b@        if (((Integer)arg0.getValue()).intValue() - ((Integer)arg1.getValue()).intValue() > 0)@b@          result = 1;@b@@b@        return result;@b@      }@b@@b@    });@b@    return list;@b@  }@b@}