首页

参考beangle-db-replication数据库同步机制源码设计实现从而对系统同步设计原理有更好的认识beangle

标签:beangle,replication,数据同步,数据库主从复制,Replicator     发布时间:2018-01-01   

一、前言

基于beangle-db-replication源码包(2.2.1)中的数据库db同步复制replication实现原理源码分析,主要定义org.beangle.db.replication.Replicator同步接口,然后分别有关于索引IndexReplicator实现、序列SequenceReplicator实现、数据DataReplicator实现、混合类型CompositeReplicator实现、公共约束ConstraintReplicator实现等,从而对常用的数据库同步replicator基于设计实现有可见深刻的认识,具体参见下方源码示例。

二、源码说明

1.Replicator接口、DataWrapper数据接口

package org.beangle.db.replication;@b@@b@public abstract interface Replicator@b@{@b@  public abstract void setTarget(DataWrapper paramDataWrapper);@b@@b@  public abstract void setSource(DataWrapper paramDataWrapper);@b@@b@  public abstract void reset();@b@@b@  public abstract void start();@b@}
package org.beangle.db.replication;@b@@b@import java.util.List;@b@import org.beangle.db.jdbc.meta.Table;@b@@b@public abstract interface DataWrapper@b@{@b@  public abstract List<Object> getData(String paramString);@b@@b@  public abstract List<Object> getData(Table paramTable);@b@@b@  public abstract int pushData(Table paramTable, List<Object> paramList);@b@@b@  public abstract void close();@b@@b@  public abstract int count(Table paramTable);@b@}

2.主要实现类有CompositeReplicator、ConstraintReplicator..SequenceReplicator

package org.beangle.db.replication.impl;@b@@b@import java.util.List;@b@import org.beangle.commons.collection.CollectUtils;@b@import org.beangle.db.replication.DataWrapper;@b@import org.beangle.db.replication.Replicator;@b@@b@public class CompositeReplicator@b@  implements Replicator@b@{@b@  List<Replicator> replicators = CollectUtils.newArrayList();@b@@b@  public CompositeReplicator(Replicator[] replicators)@b@  {@b@    this.replicators = CollectUtils.newArrayList(replicators);@b@  }@b@@b@  public void setTarget(DataWrapper source) {@b@    for (Replicator replicator : this.replicators)@b@      replicator.setTarget(source);@b@  }@b@@b@  public void setSource(DataWrapper source)@b@  {@b@    for (Replicator replicator : this.replicators)@b@      replicator.setSource(source);@b@  }@b@@b@  public void reset()@b@  {@b@    for (Replicator replicator : this.replicators)@b@      replicator.reset();@b@  }@b@@b@  public void start()@b@  {@b@    for (Replicator replicator : this.replicators)@b@      replicator.start();@b@  }@b@}
package org.beangle.db.replication.impl;@b@@b@import java.util.Collection;@b@import java.util.Collections;@b@import java.util.List;@b@import org.apache.commons.lang.time.StopWatch;@b@import org.beangle.commons.collection.CollectUtils;@b@import org.beangle.db.jdbc.meta.Constraint;@b@import org.beangle.db.jdbc.meta.Database;@b@import org.beangle.db.jdbc.meta.ForeignKey;@b@import org.beangle.db.replication.DataWrapper;@b@import org.beangle.db.replication.Replicator;@b@import org.beangle.db.replication.wrappers.DatabaseWrapper;@b@import org.slf4j.Logger;@b@import org.slf4j.LoggerFactory;@b@@b@public class ConstraintReplicator@b@  implements Replicator@b@{@b@  private static final Logger logger = LoggerFactory.getLogger(ConstraintReplicator.class);@b@  private DatabaseWrapper source;@b@  private DatabaseWrapper target;@b@  private List<Constraint> contraints = CollectUtils.newArrayList();@b@@b@  public ConstraintReplicator(DatabaseWrapper source, DatabaseWrapper target)@b@  {@b@    this.source = source;@b@    this.target = target;@b@  }@b@@b@  public void addAll(Collection<? extends Constraint> newContraints) {@b@    this.contraints.addAll(newContraints);@b@  }@b@@b@  public void reset()@b@  {@b@  }@b@@b@  public void start() {@b@    Collections.sort(this.contraints);@b@    StopWatch watch = new StopWatch();@b@    watch.start();@b@    logger.info("Start constraint replication...");@b@    String targetSchema = this.target.getDatabase().getSchema();@b@    for (Constraint contraint : this.contraints)@b@      if (contraint instanceof ForeignKey) {@b@        ForeignKey fk = (ForeignKey)contraint;@b@        String sql = fk.getAlterSql(this.target.getDialect(), targetSchema);@b@        try {@b@          this.target.execute(sql);@b@          logger.info("Apply constaint {}", fk.getName());@b@        } catch (Exception e) {@b@          logger.warn("Cannot execute {}", sql);@b@        }@b@      }@b@@b@    logger.info("End constraint replication,using {}", Long.valueOf(watch.getTime()));@b@  }@b@@b@  public void setSource(DataWrapper source) {@b@    this.source = ((DatabaseWrapper)source);@b@  }@b@@b@  public void setTarget(DataWrapper target) {@b@    this.target = ((DatabaseWrapper)target);@b@  }@b@}
package org.beangle.db.replication.impl;@b@@b@import java.util.Collection;@b@import java.util.Collections;@b@import java.util.List;@b@import org.apache.commons.collections.Buffer;@b@import org.apache.commons.collections.BufferUnderflowException;@b@import org.apache.commons.collections.BufferUtils;@b@import org.apache.commons.collections.buffer.UnboundedFifoBuffer;@b@import org.apache.commons.lang.StringUtils;@b@import org.apache.commons.lang.time.StopWatch;@b@import org.beangle.commons.collection.CollectUtils;@b@import org.beangle.commons.collection.page.PageLimit;@b@import org.beangle.db.jdbc.meta.Database;@b@import org.beangle.db.jdbc.meta.Table;@b@import org.beangle.db.replication.DataWrapper;@b@import org.beangle.db.replication.Replicator;@b@import org.beangle.db.replication.wrappers.DatabaseWrapper;@b@import org.slf4j.Logger;@b@import org.slf4j.LoggerFactory;@b@@b@public class DataReplicator@b@  implements Replicator@b@{@b@  private static final Logger logger = LoggerFactory.getLogger(DataReplicator.class);@b@  List<Table> tables = CollectUtils.newArrayList();@b@  DatabaseWrapper source;@b@  DatabaseWrapper target;@b@@b@  public DataReplicator()@b@  {@b@  }@b@@b@  public DataReplicator(DatabaseWrapper source, DatabaseWrapper target)@b@  {@b@    this.source = source;@b@    this.target = target;@b@  }@b@@b@  protected void addTable(Table table) {@b@    this.tables.add(table);@b@  }@b@@b@  public void addAll(Collection<? extends Table> newTables) {@b@    this.tables.addAll(newTables);@b@  }@b@@b@  public boolean addTables(String[] tables) {@b@    boolean success = true;@b@    for (int i = 0; i < tables.length; ++i) {@b@      String newTable = tables[i];@b@      if ((!(StringUtils.contains(tables[i], '.'))) && (null != this.source.getDatabase().getSchema()))@b@        newTable = Table.qualify(this.source.getDatabase().getSchema(), newTable);@b@@b@      Table tm = this.source.getDatabase().getTable(newTable);@b@      if (null == tm)@b@        logger.error("cannot find metadata for {}", newTable);@b@      else@b@        addTable(tm);@b@@b@      success &= tm != null;@b@    }@b@    return success;@b@  }@b@@b@  public void reset()@b@  {@b@  }@b@@b@  public void setSource(DataWrapper source) {@b@    this.source = ((DatabaseWrapper)source);@b@  }@b@@b@  public void setTarget(DataWrapper target) {@b@    this.target = ((DatabaseWrapper)target);@b@  }@b@@b@  public void start() {@b@    Collections.sort(this.tables);@b@    UnboundedFifoBuffer fifoBuffer = new UnboundedFifoBuffer();@b@    for (Table table : this.tables)@b@      fifoBuffer.add(table);@b@@b@    Buffer tableBuffer = BufferUtils.synchronizedBuffer(fifoBuffer);@b@    StopWatch watch = new StopWatch();@b@    watch.start();@b@    logger.info("Start data replication...");@b@    List tasks = CollectUtils.newArrayList();@b@    for (int i = 0; i < 9; ++i) {@b@      Thread thread = new Thread(new ReplicatorTask(this.source, this.target, tableBuffer));@b@      tasks.add(thread);@b@      thread.start();@b@    }@b@    for (Thread task : tasks)@b@      try {@b@        task.join();@b@      } catch (InterruptedException e) {@b@        e.printStackTrace();@b@      }@b@@b@    logger.info("End data replication,using {}", Long.valueOf(watch.getTime()));@b@  }@b@@b@  public static class ReplicatorTask implements Runnable {@b@    DatabaseWrapper source;@b@    DatabaseWrapper target;@b@    Buffer buffer;@b@@b@    public void run() {@b@      try {@b@        while (!(this.buffer.isEmpty())) {@b@          Table table = (Table)this.buffer.remove();@b@          replicate(table);@b@        }@b@      } catch (BufferUnderflowException e) {@b@        return;@b@      }@b@    }@b@@b@    public ReplicatorTask(DatabaseWrapper source, DatabaseWrapper target, Buffer buffer)@b@    {@b@      this.source = source;@b@      this.target = target;@b@      this.buffer = buffer;@b@    }@b@@b@    private boolean createOrReplaceTable(Table table) {@b@      if (this.target.drop(table)) {@b@        if (this.target.create(table)) {@b@          DataReplicator.access$000().info("Create table {}", table.getName());@b@          return true;@b@        }@b@        DataReplicator.access$000().error("Create table {} failure.", table.getName());@b@      }@b@@b@      return false;@b@    }@b@@b@    public void replicate(Table table) {@b@      String tableName = table.identifier();@b@@b@      table.setSchema(this.target.getDatabase().getSchema());@b@      try {@b@        if (!(createOrReplaceTable(table))) return;@b@        Table srcTable = this.source.getDatabase().getTable(tableName);@b@        int count = this.source.count(srcTable);@b@        if (count == 0) {@b@          this.target.pushData(table, Collections.emptyList());@b@          DataReplicator.access$000().info("Replicate {}(0)", table);@b@        } else {@b@          int curr = 0;@b@          PageLimit limit = new PageLimit(0, 1000);@b@          while (curr < count) {@b@            limit.setPageNo(limit.getPageNo() + 1);@b@            List data = this.source.getData(srcTable, limit);@b@            if (data.isEmpty()) {@b@              DataReplicator.access$000().error("Cannot fetch limit data in {} with page size {}", Integer.valueOf(limit.getPageNo()), Integer.valueOf(limit.getPageSize()));@b@            }@b@@b@            int successed = this.target.pushData(table, data);@b@            curr += data.size();@b@            if (successed == count)@b@              DataReplicator.access$000().info("Replicate {}({})", table, Integer.valueOf(successed));@b@            else if (successed == data.size())@b@              DataReplicator.access$000().info("Replicate {}({}/{})", new Object[] { table, Integer.valueOf(curr), Integer.valueOf(count) });@b@            else@b@              DataReplicator.access$000().warn("Replicate {}({}/{})", new Object[] { table, Integer.valueOf(successed), Integer.valueOf(data.size()) });@b@          }@b@        }@b@      }@b@      catch (Exception e) {@b@        DataReplicator.access$000().error("Replicate error " + table.identifier(), e);@b@      }@b@    }@b@  }@b@}
package org.beangle.db.replication.impl;@b@@b@import org.beangle.db.replication.DataWrapper;@b@import org.beangle.db.replication.Replicator;@b@import org.beangle.db.replication.wrappers.DatabaseWrapper;@b@@b@public class IndexReplicator@b@  implements Replicator@b@{@b@  DatabaseWrapper source;@b@  DatabaseWrapper target;@b@@b@  public void reset()@b@  {@b@  }@b@@b@  public void start()@b@  {@b@  }@b@@b@  public void setSource(DataWrapper source)@b@  {@b@    this.source = ((DatabaseWrapper)source);@b@  }@b@@b@  public void setTarget(DataWrapper target) {@b@    this.target = ((DatabaseWrapper)target);@b@  }@b@}
package org.beangle.db.replication.impl;@b@@b@import java.util.Collection;@b@import java.util.Collections;@b@import java.util.List;@b@import org.apache.commons.lang.time.StopWatch;@b@import org.beangle.commons.collection.CollectUtils;@b@import org.beangle.db.jdbc.dialect.Dialect;@b@import org.beangle.db.jdbc.meta.Database;@b@import org.beangle.db.jdbc.meta.Sequence;@b@import org.beangle.db.replication.DataWrapper;@b@import org.beangle.db.replication.Replicator;@b@import org.beangle.db.replication.wrappers.DatabaseWrapper;@b@import org.slf4j.Logger;@b@import org.slf4j.LoggerFactory;@b@@b@public class SequenceReplicator@b@  implements Replicator@b@{@b@  private static final Logger logger = LoggerFactory.getLogger(SequenceReplicator.class);@b@  DatabaseWrapper source;@b@  DatabaseWrapper target;@b@  List<Sequence> sequences = CollectUtils.newArrayList();@b@@b@  public SequenceReplicator(DatabaseWrapper source, DatabaseWrapper target)@b@  {@b@    this.source = source;@b@    this.target = target;@b@  }@b@@b@  public void setSource(DataWrapper source) {@b@    this.source = ((DatabaseWrapper)source);@b@  }@b@@b@  public void setTarget(DataWrapper target) {@b@    this.target = ((DatabaseWrapper)target);@b@  }@b@@b@  public void reset()@b@  {@b@  }@b@@b@  private boolean reCreate(Sequence sequence) {@b@    if (this.target.drop(sequence)) {@b@      if (this.target.create(sequence)) {@b@        logger.info("Recreate sequence {}", sequence.getName());@b@        return true;@b@      }@b@      logger.error("Recreate sequence {} failure.", sequence.getName());@b@    }@b@@b@    return false;@b@  }@b@@b@  public void start() {@b@    Dialect targetDialect = this.target.getDatabase().getDialect();@b@    if (null == targetDialect.getSequenceGrammar()) {@b@      logger.info("Target database {} dosen't support sequence,replication ommited.", targetDialect.getClass().getSimpleName());@b@@b@      return;@b@    }@b@    Collections.sort(this.sequences);@b@    StopWatch watch = new StopWatch();@b@    watch.start();@b@    logger.info("Start sequence replication...");@b@    for (Sequence sequence : this.sequences)@b@      reCreate(sequence);@b@@b@    logger.info("End sequence replication,using {}", Long.valueOf(watch.getTime()));@b@  }@b@@b@  public void addAll(Collection<Sequence> newSequences) {@b@    this.sequences.addAll(newSequences);@b@  }@b@}

3.ReplicatorBuilder构成类及相关

package org.beangle.db.replication;@b@@b@import java.util.Collection;@b@import java.util.List;@b@import java.util.Map;@b@import java.util.Set;@b@import javax.sql.DataSource;@b@import org.apache.commons.lang.Validate;@b@import org.beangle.commons.collection.CollectUtils;@b@import org.beangle.db.jdbc.dialect.Dialect;@b@import org.beangle.db.jdbc.dialect.Dialects;@b@import org.beangle.db.jdbc.meta.Constraint;@b@import org.beangle.db.jdbc.meta.Database;@b@import org.beangle.db.jdbc.meta.Sequence;@b@import org.beangle.db.jdbc.meta.Table;@b@import org.beangle.db.replication.impl.CompositeReplicator;@b@import org.beangle.db.replication.impl.ConstraintReplicator;@b@import org.beangle.db.replication.impl.DataReplicator;@b@import org.beangle.db.replication.impl.DefaultTableFilter;@b@import org.beangle.db.replication.impl.SequenceReplicator;@b@import org.beangle.db.replication.wrappers.DatabaseWrapper;@b@@b@public final class ReplicatorBuilder@b@{@b@  DatabaseSource source;@b@  DatabaseTarget target;@b@@b@  public DatabaseSource source(String dialectName, DataSource dataSource)@b@  {@b@    this.source = new DatabaseSource(dataSource, Dialects.getDialect(dialectName));@b@    return this.source;@b@  }@b@@b@  public DatabaseTarget target(String dialectName, DataSource dataSource) {@b@    this.target = new DatabaseTarget(this, dataSource, Dialects.getDialect(dialectName));@b@    return this.target;@b@  }@b@@b@  public Replicator build() {@b@    DatabaseWrapper sourceWrapper = this.source.buildWrapper();@b@    DatabaseWrapper targetWrapper = this.target.buildWrapper();@b@@b@    DataReplicator dataReplicator = new DataReplicator(sourceWrapper, targetWrapper);@b@    dataReplicator.addAll(this.source.filterTables());@b@@b@    ConstraintReplicator contraintRelicator = new ConstraintReplicator(sourceWrapper, targetWrapper);@b@    contraintRelicator.addAll(this.source.filterConstraints());@b@@b@    SequenceReplicator sequenceReplicator = new SequenceReplicator(sourceWrapper, targetWrapper);@b@    sequenceReplicator.addAll(this.source.filterSequences());@b@@b@    return new CompositeReplicator(new Replicator[] { dataReplicator, contraintRelicator, sequenceReplicator }); }@b@@b@  public static final class DatabaseSource { DataSource dataSource;@b@    Dialect dialect;@b@    String schema;@b@    String catelog;@b@    List<Table> tables;@b@    String[] includes;@b@    String[] excludes;@b@    boolean toLowercase = false;@b@    DatabaseWrapper wrapper = null;@b@    Collection<String> tablenames = null;@b@@b@    public DatabaseSource(DataSource dataSource, Dialect dialect) { this.dataSource = dataSource;@b@      this.dialect = dialect;@b@    }@b@@b@    public DatabaseSource schema(String schema) {@b@      this.schema = schema;@b@      return this;@b@    }@b@@b@    public DatabaseSource catelog(String catelog) {@b@      this.catelog = catelog;@b@      return this;@b@    }@b@@b@    public DatabaseSource lowercase() {@b@      this.toLowercase = true;@b@      return this;@b@    }@b@@b@    protected DatabaseWrapper buildWrapper() {@b@      if (null == this.schema) this.schema = this.dialect.defaultSchema();@b@      this.wrapper = new DatabaseWrapper(this.dataSource, this.dialect, this.catelog, this.schema);@b@      return this.wrapper;@b@    }@b@@b@    private Collection<String> filter(Set<String> finalTables) {@b@      String[] arr$;@b@      int len$;@b@      int i$;@b@      DefaultTableFilter filter = new DefaultTableFilter();@b@      if (null != this.includes) {@b@        arr$ = this.includes; len$ = arr$.length; for (i$ = 0; i$ < len$; ++i$) { String include = arr$[i$];@b@          filter.addInclude(include); }@b@      }@b@      if (null != this.excludes) {@b@        arr$ = this.excludes; len$ = arr$.length; for (i$ = 0; i$ < len$; ++i$) { String exclude = arr$[i$];@b@          filter.addExclude(exclude); }@b@      }@b@      return filter.filter(finalTables);@b@    }@b@@b@    public DatabaseSource tables(String[] includes) {@b@      this.includes = includes;@b@      return this;@b@    }@b@@b@    public DatabaseSource exclude(String[] excludes) {@b@      this.excludes = excludes;@b@      return this;@b@    }@b@@b@    public DatabaseSource indexes(String[] indexes) {@b@      return this;@b@    }@b@@b@    public DatabaseSource contraints(String[] string) {@b@      return this;@b@    }@b@@b@    public DatabaseSource sequences(String[] string) {@b@      return this;@b@    }@b@@b@    protected List<Table> filterTables() {@b@      Collection tablenames = filter(this.wrapper.getDatabase().getTables().keySet());@b@      this.tables = CollectUtils.newArrayList();@b@      for (String name : tablenames) {@b@        Table tb = this.wrapper.getDatabase().getTable(name);@b@        tb = tb.clone();@b@        if (this.toLowercase)@b@          tb.lowerCase();@b@@b@        this.tables.add(tb);@b@      }@b@      return this.tables;@b@    }@b@@b@    protected List<Constraint> filterConstraints() {@b@      Validate.notNull(this.tables, "Call filterTables first");@b@      List contraints = CollectUtils.newArrayList();@b@      for (Table table : this.tables)@b@        contraints.addAll(table.getForeignKeys().values());@b@@b@      return contraints;@b@    }@b@@b@    protected Collection<Sequence> filterSequences() {@b@      return this.wrapper.getDatabase().getSequences();@b@    }@b@  }@b@@b@  public final class DatabaseTarget@b@  {@b@    DataSource dataSource;@b@    Dialect dialect;@b@    String schema;@b@    String catelog;@b@@b@    public DatabaseTarget(, DataSource paramDataSource, Dialect paramDialect)@b@    {@b@      this.dataSource = paramDataSource;@b@      this.dialect = paramDialect;@b@    }@b@@b@    public DatabaseTarget schema() {@b@      this.schema = schema;@b@      return this;@b@    }@b@@b@    public DatabaseTarget catelog() {@b@      this.catelog = catelog;@b@      return this;@b@    }@b@@b@    protected DatabaseWrapper buildWrapper() {@b@      if (null == this.schema) this.schema = this.dialect.defaultSchema();@b@      return new DatabaseWrapper(this.dataSource, this.dialect, this.catelog, this.schema);@b@    }@b@  }@b@}

4.ReplicatorMain主入口调用

package org.beangle.db.replication;@b@@b@import java.io.PrintStream;@b@import org.apache.commons.lang.StringUtils;@b@import org.beangle.db.jdbc.util.DataSourceUtil;@b@@b@public class ReplicatorMain@b@{@b@  public static void main(String[] args)@b@    throws Exception@b@  {@b@    ReplicatorBuilder builder = new ReplicatorBuilder();@b@    if (args.length < 2) {@b@      System.out.println("Usage:ReplicatorMain datasource:dialect:schema targetsource:dialect:schema");@b@      System.exit(0);@b@    }@b@    String src = args[1];@b@    String tar = args[2];@b@    String[] source = StringUtils.split(src, ':');@b@    String[] target = StringUtils.split(tar, ':');@b@    builder.source(source[1], DataSourceUtil.getDataSource(source[0])).schema(source[2]).tables(new String[] { "*" }).contraints(new String[] { "*" }).sequences(new String[] { "*" });@b@@b@    builder.target(target[1], DataSourceUtil.getDataSource(target[0])).schema(target[2]);@b@    Replicator replicator = builder.build();@b@    replicator.start();@b@  }@b@}