branch feature/3857_sql_replication_service created (now af103b0)
This is an automated email from the git hooks/post-receive script. New change to branch feature/3857_sql_replication_service in repository topia. See http://git.nuiton.org/topia.git at af103b0 New replication sql service (See #3857) This branch includes the following new commits: new af103b0 New replication sql service (See #3857) The 1 revision listed above as "new" is entirely new to this repository and will be described in a separate email. The revisions listed as "adds" were already present in the repository and have only been added to this reference. Detailed log of new commits: commit af103b01ad2730be5cb7f47f22df9ed2823ef890 Author: Tony CHEMIT <chemit@codelutin.com> Date: Wed Dec 30 14:34:46 2015 +0100 New replication sql service (See #3857) -- To stop receiving notification emails like this one, please contact the nuiton.org SCM administrator <admin+scm@nuiton.org>.
This is an automated email from the git hooks/post-receive script. New commit to branch feature/3857_sql_replication_service in repository topia. See http://git.nuiton.org/topia.git commit af103b01ad2730be5cb7f47f22df9ed2823ef890 Author: Tony CHEMIT <chemit@codelutin.com> Date: Wed Dec 30 14:34:46 2015 +0100 New replication sql service (See #3857) --- .../sql/TopiaSqlReplicationService.java | 29 +++ .../sql/TopiaSqlReplicationServiceImpl.java | 49 +++++ .../sql/action/ReplicateActionBuilderSupport.java | 44 ++++ .../action/ReplicationActionRequestSupport.java | 61 ++++++ .../sql/action/ReplicationActionSupport.java | 169 +++++++++++++++ .../topia/replication/sql/action/package-info.java | 9 + .../sql/action/todb/ReplicateToDbAction.java | 210 +++++++++++++++++++ .../action/todb/ReplicateToDbActionBuilder.java | 35 ++++ .../action/todb/ReplicateToDbActionRequest.java | 25 +++ .../replication/sql/action/todb/package-info.java | 9 + .../sql/action/tosql/ReplicateToSqlAction.java | 199 ++++++++++++++++++ .../action/tosql/ReplicateToSqlActionBuilder.java | 35 ++++ .../action/tosql/ReplicateToSqlActionRequest.java | 26 +++ .../replication/sql/action/tosql/package-info.java | 9 + .../nuiton/topia/replication/sql/package-info.java | 20 ++ .../sql/table/TopiaReplicationTable.java | 79 +++++++ .../sql/table/TopiaReplicationTables.java | 36 ++++ .../sql/table/TopiaReplicationTablesBuilder.java | 227 +++++++++++++++++++++ .../topia/replication/sql/table/package-info.java | 9 + 19 files changed, 1280 insertions(+) diff --git a/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/TopiaSqlReplicationService.java b/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/TopiaSqlReplicationService.java new file mode 100644 index 0000000..ff66fc9 --- /dev/null +++ b/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/TopiaSqlReplicationService.java @@ -0,0 +1,29 @@ +package org.nuiton.topia.replication.sql; + +import 
org.nuiton.topia.persistence.TopiaApplicationContext; +import org.nuiton.topia.persistence.TopiaService; +import org.nuiton.topia.replication.sql.action.todb.ReplicateToDbActionBuilder; +import org.nuiton.topia.replication.sql.action.tosql.ReplicateToSqlActionBuilder; + +import java.io.FileNotFoundException; +import java.io.Writer; + +/** + * Created on 29/12/15. + * + * @author Tony Chemit - chemit@codelutin.com + * @since 3.0.1 + */ +public interface TopiaSqlReplicationService extends TopiaService { + + /** + * @return a new replicate to db action builder + */ + ReplicateToDbActionBuilder newReplicateToDbActionBuilder(TopiaApplicationContext targetApplicationContext); + + /** + * @return a new replicate to sql action builder + */ + ReplicateToSqlActionBuilder newReplicatetoSqlActionBuilder(Writer writer) throws FileNotFoundException; + +} diff --git a/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/TopiaSqlReplicationServiceImpl.java b/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/TopiaSqlReplicationServiceImpl.java new file mode 100644 index 0000000..e2fb5ce --- /dev/null +++ b/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/TopiaSqlReplicationServiceImpl.java @@ -0,0 +1,49 @@ +package org.nuiton.topia.replication.sql; + +import org.nuiton.topia.persistence.TopiaApplicationContext; +import org.nuiton.topia.replication.sql.action.todb.ReplicateToDbActionBuilder; +import org.nuiton.topia.replication.sql.action.tosql.ReplicateToSqlActionBuilder; + +import java.io.Writer; +import java.util.Map; + +/** + * + * Created on 29/12/15. 
+ * + * @author Tony Chemit - chemit@codelutin.com + * @since 3.0.1 + */ +public class TopiaSqlReplicationServiceImpl implements TopiaSqlReplicationService { + + public static final int DEFAULT_FETCH_SIZE = 1000; + + protected TopiaApplicationContext topiaApplicationContext; + + @Override + public ReplicateToDbActionBuilder newReplicateToDbActionBuilder(TopiaApplicationContext targetApplicationContext) { + return new ReplicateToDbActionBuilder() + .from(topiaApplicationContext) + .to(targetApplicationContext) + .setFetchSize(DEFAULT_FETCH_SIZE); + } + + @Override + public ReplicateToSqlActionBuilder newReplicatetoSqlActionBuilder(Writer writer) { + return new ReplicateToSqlActionBuilder() + .from(topiaApplicationContext) + .to(writer) + .setFetchSize(DEFAULT_FETCH_SIZE); + } + + @Override + public void initTopiaService(TopiaApplicationContext topiaApplicationContext, Map<String, String> serviceConfiguration) { + this.topiaApplicationContext = topiaApplicationContext; + //TODO Add parameters (default fetchSize) + } + + @Override + public void close() { + + } +} diff --git a/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/action/ReplicateActionBuilderSupport.java b/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/action/ReplicateActionBuilderSupport.java new file mode 100644 index 0000000..ccdb246 --- /dev/null +++ b/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/action/ReplicateActionBuilderSupport.java @@ -0,0 +1,44 @@ +package org.nuiton.topia.replication.sql.action; + +import org.nuiton.topia.persistence.TopiaApplicationContext; +import org.nuiton.topia.replication.sql.table.TopiaReplicationTables; + +/** + * Support to create action builder. + * + * Created on 29/12/15. 
+ * + * @author Tony Chemit - chemit@codelutin.com + * @since 3.0.1 + */ +public abstract class ReplicateActionBuilderSupport<B extends ReplicateActionBuilderSupport, R extends ReplicationActionRequestSupport, A extends ReplicationActionSupport<R>> { + + protected final R replicationRequest; + + public abstract A build(); + + public ReplicateActionBuilderSupport(R replicationRequest) { + this.replicationRequest = replicationRequest; + } + + public B from(TopiaApplicationContext sourceTopiaApplicationContext) { + replicationRequest.setSourceTopiaApplicationContext(sourceTopiaApplicationContext); + return (B) this; + } + + public B setFetchSize(int fetchSize) { + replicationRequest.setFetchSize(fetchSize); + return (B) this; + } + + public B setArg(String arg) { + replicationRequest.setArg(arg); + return (B) this; + } + + public B setTables(TopiaReplicationTables tables) { + replicationRequest.setTables(tables); + return (B) this; + } + +} diff --git a/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/action/ReplicationActionRequestSupport.java b/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/action/ReplicationActionRequestSupport.java new file mode 100644 index 0000000..471350c --- /dev/null +++ b/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/action/ReplicationActionRequestSupport.java @@ -0,0 +1,61 @@ +package org.nuiton.topia.replication.sql.action; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.nuiton.topia.persistence.TopiaApplicationContext; +import org.nuiton.topia.replication.sql.table.TopiaReplicationTables; + +/** + * Support to create action request. + * + * Created on 29/12/15. + * + * @author Tony Chemit - chemit@codelutin.com + * @since 3.0.1 + */ +public abstract class ReplicationActionRequestSupport { + + /** Logger. 
*/ + private static final Log log = LogFactory.getLog(ReplicationActionRequestSupport.class); + + protected TopiaApplicationContext sourceTopiaApplicationContext; + + protected TopiaReplicationTables tables; + + protected int fetchSize; + + protected String arg; + + public TopiaReplicationTables getTables() { + return tables; + } + + public int getFetchSize() { + return fetchSize; + } + + public String getArg() { + return arg; + } + + public TopiaApplicationContext getSourceTopiaApplicationContext() { + return sourceTopiaApplicationContext; + } + + protected void setTables(TopiaReplicationTables tables) { + this.tables = tables; + } + + protected void setFetchSize(int fetchSize) { + this.fetchSize = fetchSize; + } + + protected void setArg(String arg) { + this.arg = arg; + } + + protected void setSourceTopiaApplicationContext(TopiaApplicationContext sourceTopiaApplicationContext) { + this.sourceTopiaApplicationContext = sourceTopiaApplicationContext; + } + +} diff --git a/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/action/ReplicationActionSupport.java b/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/action/ReplicationActionSupport.java new file mode 100644 index 0000000..35aa731 --- /dev/null +++ b/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/action/ReplicationActionSupport.java @@ -0,0 +1,169 @@ +package org.nuiton.topia.replication.sql.action; + +import com.google.common.collect.Iterables; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.nuiton.topia.persistence.TopiaPersistenceContext; +import org.nuiton.topia.persistence.internal.AbstractTopiaPersistenceContext; +import org.nuiton.topia.persistence.support.TopiaSqlWork; +import org.nuiton.topia.replication.sql.table.TopiaReplicationTable; +import org.nuiton.topia.replication.sql.table.TopiaReplicationTables; +import org.nuiton.util.TimeLog; + +import java.io.Closeable; +import 
java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; + +/** + * Support to create action. + * + * Created on 29/12/15. + * + * @author Tony Chemit - chemit@codelutin.com + * @since 3.0.1 + */ +public abstract class ReplicationActionSupport<R extends ReplicationActionRequestSupport> implements Runnable, Closeable { + + /** Logger. */ + private static final Log log = LogFactory.getLog(ReplicationActionSupport.class); + + private static final TimeLog TIME_LOG = new TimeLog(ReplicationActionSupport.class, 50L, 100L); + + protected final R request; + + protected TopiaPersistenceContext sourcePersistenceContext; + + protected long startTime; + + protected long endTime; + + protected abstract void executeOnTable(TopiaReplicationTable table); + + protected ReplicationActionSupport(R request) { + this.request = request; + } + + @Override + public final void run() { + + TopiaReplicationTables tables = request.getTables(); + + before(tables); + + for (TopiaReplicationTable table : tables) { + + long startTable = TimeLog.getTime(); + + executeOnTable(table); + + TIME_LOG.log(startTable, "Executed on table.", table.getFullyTableName()); + + } + + TIME_LOG.log(startTime, "All tables executed"); + + after(tables); + + endTime = TIME_LOG.log(startTime, "Replication executed."); + + } + + @Override + public void close() { + + if (sourcePersistenceContext != null) { + sourcePersistenceContext.close(); + } + + } + + public R getRequest() { + return request; + } + + protected PreparedStatement createReadStatement(TopiaReplicationTable table, Connection connection) throws SQLException { + + StringBuilder sqlBuilder = new StringBuilder("SELECT " + table.getTableName() + ".*"); + + sqlBuilder.append(" FROM ").append(table.getFromClause()); + for (String joinClause : table.getJoinClauses()) { + sqlBuilder.append(" ").append(joinClause); + } + if (table.getWhereClause() != null) { + sqlBuilder.append(" WHERE 
").append(table.getWhereClause()); + } + + String sql = sqlBuilder.toString(); + if (log.isDebugEnabled()) { + log.debug("Read sql: " + sql); + } + PreparedStatement statement = connection.prepareStatement(sql); + if (request.getArg() != null) { + statement.setString(1, request.getArg()); + } + statement.setFetchSize(request.getFetchSize()); + return statement; + + } + + protected String createWriteStatement(TopiaReplicationTable table, ResultSetMetaData readResultTatMetaData) throws SQLException { + + int columnCount = readResultTatMetaData.getColumnCount(); + + StringBuilder sqlBuilder = new StringBuilder("INSERT INTO "); + sqlBuilder.append(table.getSchemaName()).append(".").append(table.getTableName()); + + StringBuilder columnNamesBuilder = new StringBuilder(); + + columnNamesBuilder.append(" ").append(readResultTatMetaData.getColumnName(1)); + + for (int i = 2; i <= columnCount; i++) { + + String columnName = readResultTatMetaData.getColumnName(i); + columnNamesBuilder.append(", ").append(columnName); + } + + sqlBuilder.append("(") + .append(columnNamesBuilder.toString().trim()) + .append(")") + .append("VALUES (%s);\n"); + + String sql = sqlBuilder.toString(); + if (log.isDebugEnabled()) { + log.debug("Write sql: " + sql.trim()); + } + return sql; + + } + + protected void before(TopiaReplicationTables tables) { + startTime = TimeLog.getTime(); + if (log.isDebugEnabled()) { + log.debug("Before with tables: " + Iterables.size(tables)); + } + } + + protected void after(TopiaReplicationTables tables) { + + if (log.isDebugEnabled()) { + log.debug("After with tables: " + Iterables.size(tables)); + } + + TIME_LOG.log(startTime, "All tables executed"); + + } + + protected void executeSqlWork(TopiaSqlWork sqlWork) { + getSourcePersistenceContext().getSqlSupport().doSqlWork(sqlWork); + } + + protected AbstractTopiaPersistenceContext getSourcePersistenceContext() { + if (sourcePersistenceContext == null) { + sourcePersistenceContext = 
request.getSourceTopiaApplicationContext().newPersistenceContext(); + } + return (AbstractTopiaPersistenceContext) sourcePersistenceContext; + } + +} diff --git a/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/action/package-info.java b/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/action/package-info.java new file mode 100644 index 0000000..532a6d5 --- /dev/null +++ b/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/action/package-info.java @@ -0,0 +1,9 @@ +/** + * <h1>Paquetage de base pour les actions</h1> + * + * <p>On retrouve ici les classes abstraites de l'API d'action</p> + * + * @author Tony Chemit - chemit@codelutin.com + * @since 3.0.1 + */ +package org.nuiton.topia.replication.sql.action; \ No newline at end of file diff --git a/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/action/todb/ReplicateToDbAction.java b/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/action/todb/ReplicateToDbAction.java new file mode 100644 index 0000000..da6842b --- /dev/null +++ b/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/action/todb/ReplicateToDbAction.java @@ -0,0 +1,210 @@ +package org.nuiton.topia.replication.sql.action.todb; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.nuiton.topia.persistence.internal.AbstractTopiaPersistenceContext; +import org.nuiton.topia.persistence.support.TopiaSqlWork; +import org.nuiton.topia.replication.sql.action.ReplicationActionSupport; +import org.nuiton.topia.replication.sql.table.TopiaReplicationTable; +import org.nuiton.topia.replication.sql.table.TopiaReplicationTables; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.sql.Statement; + +/** + * Action to replicate some tables to another database. 
+ * + * Created on 29/12/15. + * + * @author Tony Chemit - chemit@codelutin.com + * @since 3.0.1 + */ +public class ReplicateToDbAction extends ReplicationActionSupport<ReplicateToDbActionRequest> { + + /** Logger. */ + private static final Log log = LogFactory.getLog(ReplicateToDbAction.class); + + protected AbstractTopiaPersistenceContext targetPersistenceContext; + + public ReplicateToDbAction(ReplicateToDbActionRequest context) { + super(context); + } + + @Override + public void close() { + + try { + super.close(); + } finally { + + if (targetPersistenceContext != null) { + targetPersistenceContext.close(); + } + + } + + } + + public AbstractTopiaPersistenceContext getTargetPersistenceContext() { + if (targetPersistenceContext == null) { + targetPersistenceContext = (AbstractTopiaPersistenceContext) request.getTargetTopiaApplicationContext().newPersistenceContext(); + } + return targetPersistenceContext; + } + + @Override + protected void after(TopiaReplicationTables tables) { + + super.after(tables); + + if (targetPersistenceContext != null) { + targetPersistenceContext.commit(); + } + + } + + @Override + protected void executeOnTable(TopiaReplicationTable table) { + + if (log.isInfoEnabled()) { + log.info("Replicate: " + table.getFullyTableName()); + } + + ReadSqlWork sqlWork = new ReadSqlWork(table); + executeSqlWork(sqlWork); + + } + + protected class ReadSqlWork implements TopiaSqlWork { + + private final TopiaReplicationTable table; + + public ReadSqlWork(TopiaReplicationTable table) { + + this.table = table; + } + + @Override + public void execute(Connection connection) throws SQLException { + + try (PreparedStatement readStatement = createReadStatement(table, connection)) { + + readStatement.execute(); + CopySqlWork sqlWork = new CopySqlWork(table, readStatement); + getTargetPersistenceContext().getSqlSupport().doSqlWork(sqlWork); + + } + + } + + } + + protected class CopySqlWork implements TopiaSqlWork { + + protected final Statement readStatement; + + 
private final TopiaReplicationTable table; + + public CopySqlWork(TopiaReplicationTable table, Statement readStatement) { + this.table = table; + this.readStatement = readStatement; + } + + @Override + public void execute(Connection connection) throws SQLException { + + ResultSet readResultSet = readStatement.getResultSet(); + + ResultSetMetaData readResultSetMetaData = readResultSet.getMetaData(); + int columnCount = readResultSetMetaData.getColumnCount(); + + int fetchSize = request.getFetchSize(); + String tableName = table.getFullyTableName(); + + try (PreparedStatement writeStatement = createWriteStatement(connection, readResultSetMetaData)) { + + + long index = 0; + while (readResultSet.next()) { + + copyRow(readResultSet, writeStatement, columnCount); + + if ((++index % fetchSize) == 0) { + + flush(writeStatement, tableName, index); + + } + } + + flush(writeStatement, tableName, index); + } + + } + + public PreparedStatement createWriteStatement(Connection connection, ResultSetMetaData readResultTatMetaData) throws SQLException { + + int columnCount = readResultTatMetaData.getColumnCount(); + + StringBuilder sqlBuilder = new StringBuilder("INSERT INTO "); + sqlBuilder.append(table.getSchemaName()).append(".").append(table.getTableName()); + + StringBuilder columnNamesBuilder = new StringBuilder(); + StringBuilder argsBuilder = new StringBuilder(); + + columnNamesBuilder.append(" ").append(readResultTatMetaData.getColumnName(1)); + argsBuilder.append("?"); + + for (int i = 2; i <= columnCount; i++) { + + String columnName = readResultTatMetaData.getColumnName(i); + columnNamesBuilder.append(", ").append(columnName); + argsBuilder.append(", ?"); + } + + sqlBuilder.append("(") + .append(columnNamesBuilder) + .append(")") + .append("VALUES (") + .append(argsBuilder) + .append(")"); + + String sql = sqlBuilder.toString(); + if (log.isDebugEnabled()) { + log.debug("Write sql: " + sql); + } + PreparedStatement statement = connection.prepareStatement(sql); + + 
return statement; + + } + + protected void copyRow(ResultSet readResultSet, PreparedStatement writeStatement, int columnCount) throws SQLException { + writeStatement.clearParameters(); + + if (log.isTraceEnabled()) { + log.trace("Copy " + readResultSet.getString(1)); + } + for (int i = 1; i <= columnCount; i++) { + Object object = readResultSet.getObject(i); + writeStatement.setObject(i, object); + } + writeStatement.addBatch(); + } + + protected void flush(PreparedStatement writeStatement, String tableName, long index) throws SQLException { + + if (log.isDebugEnabled()) { + log.debug("Flush for : " + tableName + " (size: " + index + ")"); + } + writeStatement.executeBatch(); + writeStatement.clearBatch(); + + } + + } + +} diff --git a/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/action/todb/ReplicateToDbActionBuilder.java b/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/action/todb/ReplicateToDbActionBuilder.java new file mode 100644 index 0000000..fa5cd96 --- /dev/null +++ b/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/action/todb/ReplicateToDbActionBuilder.java @@ -0,0 +1,35 @@ +package org.nuiton.topia.replication.sql.action.todb; + +import com.google.common.base.Preconditions; +import org.nuiton.topia.persistence.TopiaApplicationContext; +import org.nuiton.topia.replication.sql.action.ReplicateActionBuilderSupport; + +/** + * Builder of {@link ReplicateToDbAction}. + * + * Created on 29/12/15. 
+ * + * @author Tony Chemit - chemit@codelutin.com + * @since 3.0.1 + */ +public class ReplicateToDbActionBuilder extends ReplicateActionBuilderSupport<ReplicateToDbActionBuilder, ReplicateToDbActionRequest, ReplicateToDbAction> { + + public ReplicateToDbActionBuilder() { + super(new ReplicateToDbActionRequest()); + } + + public ReplicateToDbActionBuilder to(TopiaApplicationContext targetTopiaApplicationContext) { + replicationRequest.setTargetTopiaApplicationContext(targetTopiaApplicationContext); + return this; + } + + @Override + public ReplicateToDbAction build() { + Preconditions.checkState(replicationRequest.getSourceTopiaApplicationContext() != null, "No sourceTopiaApplicationContext defined"); + Preconditions.checkState(replicationRequest.getTargetTopiaApplicationContext() != null, "No targetTopiaApplicationContext defined"); + + ReplicateToDbAction action = new ReplicateToDbAction(replicationRequest); + return action; + } + +} diff --git a/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/action/todb/ReplicateToDbActionRequest.java b/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/action/todb/ReplicateToDbActionRequest.java new file mode 100644 index 0000000..007120b --- /dev/null +++ b/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/action/todb/ReplicateToDbActionRequest.java @@ -0,0 +1,25 @@ +package org.nuiton.topia.replication.sql.action.todb; + +import org.nuiton.topia.persistence.TopiaApplicationContext; +import org.nuiton.topia.replication.sql.action.ReplicationActionRequestSupport; + +/** + * Request to perform a {@link ReplicateToDbAction}. + * + * Created on 28/12/15. 
+ * + * @author Tony Chemit - chemit@codelutin.com + * @since 3.0.1 + */ +public class ReplicateToDbActionRequest extends ReplicationActionRequestSupport { + + protected TopiaApplicationContext targetTopiaApplicationContext; + + public TopiaApplicationContext getTargetTopiaApplicationContext() { + return targetTopiaApplicationContext; + } + + protected void setTargetTopiaApplicationContext(TopiaApplicationContext targetTopiaApplicationContext) { + this.targetTopiaApplicationContext = targetTopiaApplicationContext; + } +} diff --git a/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/action/todb/package-info.java b/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/action/todb/package-info.java new file mode 100644 index 0000000..023f309 --- /dev/null +++ b/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/action/todb/package-info.java @@ -0,0 +1,9 @@ +/** + * <h1>Réplication vers une base de données gérée par ToPIA</h1> + * + * Created on 30/12/15. 
+ * + * @author Tony Chemit - chemit@codelutin.com + * @since 3.0.1 + */ +package org.nuiton.topia.replication.sql.action.todb; \ No newline at end of file diff --git a/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/action/tosql/ReplicateToSqlAction.java b/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/action/tosql/ReplicateToSqlAction.java new file mode 100644 index 0000000..885d9c6 --- /dev/null +++ b/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/action/tosql/ReplicateToSqlAction.java @@ -0,0 +1,199 @@ +package org.nuiton.topia.replication.sql.action.tosql; + +import org.apache.commons.io.output.ByteArrayOutputStream; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.nuiton.topia.persistence.support.TopiaSqlWork; +import org.nuiton.topia.replication.sql.action.ReplicationActionSupport; +import org.nuiton.topia.replication.sql.table.TopiaReplicationTable; +import org.nuiton.topia.replication.sql.table.TopiaReplicationTables; + +import javax.sql.rowset.serial.SerialBlob; +import java.io.IOException; +import java.io.Writer; +import java.sql.Blob; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.sql.Types; + +/** + * Action to replicate some tables into a sql script. + * + * Created on 29/12/15. + * + * @author Tony Chemit - chemit@codelutin.com + * @since 3.0.1 + */ +public class ReplicateToSqlAction extends ReplicationActionSupport<ReplicateToSqlActionRequest> { + + /** Logger. 
*/ + private static final Log log = LogFactory.getLog(ReplicateToSqlAction.class); + + protected static void flush(Writer writer) { + try { + writer.flush(); + } catch (IOException e) { + throw new RuntimeException("Could not flush writer", e); + } + } + + protected ReplicateToSqlAction(ReplicateToSqlActionRequest request) { + super(request); + } + + @Override + protected void after(TopiaReplicationTables tables) { + + super.after(tables); + + ReplicateToSqlAction.flush(request.getWriter()); + + } + + @Override + protected void executeOnTable(TopiaReplicationTable table) { + + if (log.isInfoEnabled()) { + log.info("Replicate: " + table.getFullyTableName()); + } + + ReadSqlWork sqlWork = new ReadSqlWork(table); + executeSqlWork(sqlWork); + + } + + protected class ReadSqlWork implements TopiaSqlWork { + + private final TopiaReplicationTable table; + + public ReadSqlWork(TopiaReplicationTable table) { + + this.table = table; + } + + @Override + public void execute(Connection connection) throws SQLException { + + try (PreparedStatement readStatement = createReadStatement(table, connection)) { + + readStatement.execute(); + + ResultSet readResultSet = readStatement.getResultSet(); + + ResultSetMetaData readResultSetMetaData = readResultSet.getMetaData(); + int columnCount = readResultSetMetaData.getColumnCount(); + Class[] types = computeTypes(readResultSetMetaData, columnCount); + + String writeStatement = createWriteStatement(table, readResultSetMetaData); + + int fetchSize = request.getFetchSize(); + String tableName = table.getFullyTableName(); + + Writer writer = request.getWriter(); + long index = 0; + while (readResultSet.next()) { + + try { + copyRow(readResultSet, writeStatement, types, writer, columnCount); + } catch (IOException e) { + throw new RuntimeException("Could not copyRow", e); + } + + if ((++index % fetchSize) == 0) { + + flush(writer, tableName, index); + + } + } + + flush(writer, tableName, index); + + } + + } + + protected void copyRow(ResultSet 
readResultSet, String writeStatement, Class[] types, Writer writer, int columnCount) throws SQLException, IOException { + + if (log.isTraceEnabled()) { + log.trace("Copy " + readResultSet.getString(1)); + } + + String statement = ""; + + for (int i = 1; i <= columnCount; i++) { + Class columnType = types[i - 1]; + if (String.class.equals(columnType)) { + String stringValue = readResultSet.getString(i); + if (stringValue == null) { + statement += ", NULL"; + } else { + statement += ", '" + stringValue + "'"; + } + } else if (Blob.class.equals(columnType)) { + Blob blob = readResultSet.getBlob(i); + if (blob == null) { + statement += ", NULL"; + } else { + + SerialBlob serialBlob = new SerialBlob(blob); + try (ByteArrayOutputStream stringWriter = new ByteArrayOutputStream((int) serialBlob.length())) { + stringWriter.write(serialBlob.getBinaryStream()); + statement += ", '" + new String(stringWriter.toByteArray()) + "'"; + } + + } + } else { + statement += ", " + readResultSet.getObject(i); + } + } + + statement = String.format(writeStatement, statement.substring(2)); + + if (log.isTraceEnabled()) { + log.trace("Write: " + statement.trim()); + } + + try { + writer.append(statement); + } catch (IOException e) { + throw new RuntimeException("Could not append to writer", e); + } + + } + + protected void flush(Writer writer, String tableName, long index) { + + if (log.isDebugEnabled()) { + log.debug("Flush for : " + tableName + " (size: " + index + ")"); + } + + ReplicateToSqlAction.flush(writer); + + } + + protected Class[] computeTypes(ResultSetMetaData readResultSetMetaData, int columnCount) throws SQLException { + Class[] result = new Class[columnCount]; + for (int i = 1; i <= columnCount; i++) { + int columnType = readResultSetMetaData.getColumnType(i); + + switch (columnType) { + case Types.VARCHAR: + case Types.NVARCHAR: + case Types.LONGVARCHAR: + case Types.LONGNVARCHAR: + result[i - 1] = String.class; + break; + case Types.BLOB: + result[i - 1] = Blob.class; + 
default: + result[i - 1] = Object.class; + } + } + return result; + } + } + +} diff --git a/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/action/tosql/ReplicateToSqlActionBuilder.java b/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/action/tosql/ReplicateToSqlActionBuilder.java new file mode 100644 index 0000000..1984fae --- /dev/null +++ b/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/action/tosql/ReplicateToSqlActionBuilder.java @@ -0,0 +1,35 @@ +package org.nuiton.topia.replication.sql.action.tosql; + +import com.google.common.base.Preconditions; +import org.nuiton.topia.replication.sql.action.ReplicateActionBuilderSupport; + +import java.io.Writer; + +/** + * Builder of {@link ReplicateToSqlAction}. + * + * Created on 29/12/15. + * + * @author Tony Chemit - chemit@codelutin.com + * @since 3.0.1 + */ +public class ReplicateToSqlActionBuilder extends ReplicateActionBuilderSupport<ReplicateToSqlActionBuilder, ReplicateToSqlActionRequest, ReplicateToSqlAction> { + + public ReplicateToSqlActionBuilder() { + super(new ReplicateToSqlActionRequest()); + } + + public ReplicateToSqlActionBuilder to(Writer writer) { + replicationRequest.setWriter(writer); + return this; + } + + @Override + public ReplicateToSqlAction build() { + Preconditions.checkState(replicationRequest.getSourceTopiaApplicationContext() != null, "No sourceTopiaApplicationContext defined"); + Preconditions.checkState(replicationRequest.getWriter() != null, "No writer defined"); + ReplicateToSqlAction action = new ReplicateToSqlAction(replicationRequest); + return action; + } + +} diff --git a/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/action/tosql/ReplicateToSqlActionRequest.java b/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/action/tosql/ReplicateToSqlActionRequest.java new file mode 100644 index 0000000..89a6b7b --- /dev/null +++ 
b/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/action/tosql/ReplicateToSqlActionRequest.java @@ -0,0 +1,26 @@ +package org.nuiton.topia.replication.sql.action.tosql; + +import org.nuiton.topia.replication.sql.action.ReplicationActionRequestSupport; + +import java.io.Writer; + +/** + * Request to perform a {@link ReplicateToSqlAction}. + * + * Created on 28/12/15. + * + * @author Tony Chemit - chemit@codelutin.com + * @since 3.0.1 + */ +public class ReplicateToSqlActionRequest extends ReplicationActionRequestSupport { + + protected Writer writer; + + public Writer getWriter() { + return writer; + } + + protected void setWriter(Writer writer) { + this.writer = writer; + } +} diff --git a/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/action/tosql/package-info.java b/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/action/tosql/package-info.java new file mode 100644 index 0000000..5c42a8f --- /dev/null +++ b/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/action/tosql/package-info.java @@ -0,0 +1,9 @@ +/** + * <h1>Réplication vers un fichier sql</h1> + * + * Created on 30/12/15. + * + * @author Tony Chemit - chemit@codelutin.com + * @since 3.0.1 + */ +package org.nuiton.topia.replication.sql.action.tosql; \ No newline at end of file diff --git a/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/package-info.java b/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/package-info.java new file mode 100644 index 0000000..51904a7 --- /dev/null +++ b/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/package-info.java @@ -0,0 +1,20 @@ +/** + * <h1>Service de réplication performant</h1> + * + * Ce nouveau service remplacera à terme le service de réplication basé sur la réplication hibernate. + * + * Il est beaucoup plus performant en terme de temps d'exécution. 
+ * + * <h2>Les réplications possibles</h2> + * + * <p>On définit deux modes de réplication :</p> + * + * <ul> + * <li>La réplication vers une autre base gérée par ToPIA {@link org.nuiton.topia.replication.sql.action.todb}</li> + * <li>La réplication vers un fichier sql {@link org.nuiton.topia.replication.sql.action.tosql}</li> + * </ul> + * + * @author Tony Chemit - chemit@codelutin.com + * @since 3.0.1 + */ +package org.nuiton.topia.replication.sql; \ No newline at end of file diff --git a/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/table/TopiaReplicationTable.java b/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/table/TopiaReplicationTable.java new file mode 100644 index 0000000..6a7aa36 --- /dev/null +++ b/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/table/TopiaReplicationTable.java @@ -0,0 +1,79 @@ +package org.nuiton.topia.replication.sql.table; + +import com.google.common.collect.ImmutableSet; + +/** + * Created on 30/12/15. + * + * @author Tony Chemit - chemit@codelutin.com + */ +public class TopiaReplicationTable { + + /** + * Table schema name. + */ + protected final String schemaName; + + /** + * Table name. + */ + protected final String tableName; + + /** + * Fully table name (including the schema name). + */ + protected final String fullyTableName; + + /** + * From clause. + */ + protected final String fromClause; + + /** + * Where clause. + */ + protected final String whereClause; + + /** + * Join clauses. + */ + protected final ImmutableSet<String> joinClauses; + + public TopiaReplicationTable(String schemaName, + String tableName, + String fromClause, + String whereClause, + ImmutableSet<String> joinClauses) { + this.schemaName = schemaName.toLowerCase(); + this.tableName = tableName.toLowerCase(); + this.fullyTableName = this.schemaName + "." 
+ this.tableName; + this.fromClause = fromClause; + this.whereClause = whereClause; + this.joinClauses = joinClauses; + } + + public String getSchemaName() { + return schemaName; + } + + public String getTableName() { + return tableName; + } + + public String getFullyTableName() { + return fullyTableName; + } + + public String getFromClause() { + return fromClause; + } + + public String getWhereClause() { + return whereClause; + } + + public ImmutableSet<String> getJoinClauses() { + return joinClauses; + } + +} diff --git a/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/table/TopiaReplicationTables.java b/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/table/TopiaReplicationTables.java new file mode 100644 index 0000000..5d55cff --- /dev/null +++ b/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/table/TopiaReplicationTables.java @@ -0,0 +1,36 @@ +package org.nuiton.topia.replication.sql.table; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; + +import java.util.Iterator; + +/** + * Define a ordered set of {@link TopiaReplicationTable}. + * + * Created on 30/12/15. 
+ * + * @author Tony Chemit - chemit@codelutin.com + */ +public class TopiaReplicationTables implements Iterable<TopiaReplicationTable> { + + protected final ImmutableMap<String, TopiaReplicationTable> tablesByFullyTableName; + + protected final ImmutableSet<TopiaReplicationTable> orderedTables; + + public TopiaReplicationTables(ImmutableMap<String, TopiaReplicationTable> tablesByFullyTableName, + ImmutableSet<TopiaReplicationTable> orderedTables) { + this.tablesByFullyTableName = tablesByFullyTableName; + this.orderedTables = orderedTables; + } + + public TopiaReplicationTable getTable(String key) { + return tablesByFullyTableName.get(key); + } + + @Override + public Iterator<TopiaReplicationTable> iterator() { + return orderedTables.iterator(); + } + +} diff --git a/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/table/TopiaReplicationTablesBuilder.java b/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/table/TopiaReplicationTablesBuilder.java new file mode 100644 index 0000000..827a7f8 --- /dev/null +++ b/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/table/TopiaReplicationTablesBuilder.java @@ -0,0 +1,227 @@ +package org.nuiton.topia.replication.sql.table; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Lists; +import org.nuiton.topia.persistence.TopiaEntityEnum; + +import java.util.Collections; +import java.util.List; +import java.util.TreeMap; + +/** + * To build a {@link TopiaReplicationTables}. + * + * Created on 30/12/15. 
+ * + * @author Tony Chemit - chemit@codelutin.com + */ +public class TopiaReplicationTablesBuilder { + + protected final TreeMap<String, TopiaReplicationTable> tablesByFullyTableName; + + protected final TreeMap<Integer, TopiaReplicationTable> tablesByOrder; + + protected int internalOrder; + + protected final boolean withId; + + public static TopiaReplicationTablesBuilder builder(boolean withId) { + return new TopiaReplicationTablesBuilder(withId); + } + + public TopiaReplicationTables build() { + + List<Integer> orders = Lists.newArrayList(tablesByOrder.keySet()); + Collections.sort(orders); + ImmutableSet.Builder<TopiaReplicationTable> orderedTablesBuilder = ImmutableSet.builder(); + for (Integer order : orders) { + orderedTablesBuilder.add(tablesByOrder.get(order)); + } + + return new TopiaReplicationTables(ImmutableMap.copyOf(tablesByFullyTableName), + orderedTablesBuilder.build()); + } + + public TopiaReplicationTablesBuilder addTopTable(TopiaEntityEnum entityEnum) { + + String schemaName = entityEnum.dbSchemaName().toLowerCase(); + String tableName = entityEnum.dbTableName().toLowerCase(); + String whereClause = tableName + ".topiaid = ?"; + String fromClause = schemaName + "." + tableName + " " + tableName; + + registerTable(schemaName, + tableName, + whereClause, + fromClause, + ImmutableSet.<String>of()); + + return this; + } + + public TopiaReplicationTablesBuilder addTopJoinTable(TopiaEntityEnum parentEntityEnum, TopiaEntityEnum entityEnum) { + + TopiaReplicationTable parentTable = getTable(parentEntityEnum); + + String schemaName = entityEnum.dbSchemaName().toLowerCase(); + String tableName = entityEnum.dbTableName().toLowerCase(); + String parentTableName = parentTable.getTableName(); + String whereClause = tableName + "." + parentTableName + " = ?"; + String fromClause = schemaName + "." 
+ tableName + " " + tableName; + + return registerTable(schemaName, + tableName, + whereClause, + fromClause, + ImmutableSet.<String>of()); + + } + + public TopiaReplicationTablesBuilder addJoinTable(TopiaEntityEnum parentEntityEnum, TopiaEntityEnum entityEnum) { + + TopiaReplicationTable parentTable = getTable(parentEntityEnum); + + String schemaName = entityEnum.dbSchemaName().toLowerCase(); + String tableName = entityEnum.dbTableName().toLowerCase(); + String whereClause = parentTable.getWhereClause(); + String fromClause = parentTable.getFromClause(); + + String parentTableName = parentTable.getTableName(); + String joinClause = " INNER JOIN " + schemaName + "." + tableName + " " + tableName + " ON " + tableName + "." + parentTableName + " = " + parentTableName + ".topiaId"; + + ImmutableSet<String> joinClauses = addJoinCause(parentTable.getJoinClauses(), joinClause); + + return registerTable(schemaName, + tableName, + whereClause, + fromClause, + joinClauses); + + } + + public TopiaReplicationTablesBuilder addInversedJoinTable(TopiaEntityEnum parentEntityEnum, TopiaEntityEnum entityEnum) { + + TopiaReplicationTable parentTable = getTable(parentEntityEnum); + + String schemaName = entityEnum.dbSchemaName().toLowerCase(); + String tableName = entityEnum.dbTableName().toLowerCase(); + String whereClause = parentTable.getWhereClause(); + String fromClause = parentTable.getFromClause(); + + String parentTableName = parentTable.getTableName(); + String joinClause = " INNER JOIN " + schemaName + "." + tableName + " " + tableName + " ON " + tableName + ".topiaId = " + parentTableName + "." 
+ tableName; + + ImmutableSet<String> joinClauses = addJoinCause(parentTable.getJoinClauses(), joinClause); + + return registerTable( + schemaName, + tableName, + whereClause, + fromClause, + joinClauses); + + } + + public TopiaReplicationTablesBuilder addAssociationTable(TopiaEntityEnum parentEntityEnum, String associationName, boolean addInnerJoin) { + + TopiaReplicationTable parentTable = getTable(parentEntityEnum); + + String schemaName = parentEntityEnum.dbSchemaName().toLowerCase(); + String tableName = getAssociationTableName(associationName.toLowerCase(), parentTable.getTableName()); + String whereClause = parentTable.getWhereClause(); + String fromClause = parentTable.getFromClause(); + if (parentTable.getJoinClauses().isEmpty()) { + fromClause = schemaName + "." + tableName; + } + ImmutableSet<String> joinClauses; + + if (addInnerJoin) { + + String parentTableName = parentTable.getTableName(); + String joinClause = " INNER JOIN " + schemaName + "." + tableName + " " + tableName + " ON " + tableName + "." 
+ parentTableName + " = " + parentTableName + ".topiaId"; + joinClauses = addJoinCause(parentTable.getJoinClauses(), joinClause); + + } else { + joinClauses = parentTable.getJoinClauses(); + } + + return registerTable(schemaName, + tableName, + whereClause, + fromClause, + joinClauses); + } + + public TopiaReplicationTablesBuilder orderPlusOne() { + return incrementsOrder(1); + } + + public TopiaReplicationTablesBuilder orderMinusTwo() { + return decrementsOrder(2); + } + + public TopiaReplicationTablesBuilder incrementsOrder(int inc) { + internalOrder += inc; + return this; + } + + public TopiaReplicationTablesBuilder decrementsOrder(int dec) { + internalOrder -= dec; + return this; + } + + protected ImmutableSet<String> addJoinCause(ImmutableSet<String> joinClauses, String joinClause) { + return ImmutableSet + .<String>builder() + .addAll(joinClauses) + .add(joinClause) + .build(); + } + + protected TopiaReplicationTablesBuilder registerTable(String schemaName, + String tableName, + String whereClause, + String fromClause, + ImmutableSet<String> joinClauses) { + TopiaReplicationTable table = new TopiaReplicationTable( + schemaName, + tableName, + fromClause, withId ? whereClause : null, + joinClauses); + + tablesByFullyTableName.put(table.getFullyTableName(), table); + tablesByOrder.put(internalOrder++, table); + return this; + } + + protected TopiaReplicationTable getTable(String key) { + return tablesByFullyTableName.get(key); + } + + protected TopiaReplicationTable getTable(TopiaEntityEnum entityEnum) { + String key = getFullyTableName(entityEnum); + return tablesByFullyTableName.get(key); + } + + protected String getFullyTableName(TopiaEntityEnum entityEnum) { + String fullyTableName = entityEnum.dbSchemaName().toLowerCase() + "." 
+ entityEnum.dbTableName().toLowerCase(); + return fullyTableName; + } + + protected TopiaReplicationTablesBuilder(boolean withId) { + this.withId = withId; + this.tablesByFullyTableName = new TreeMap<>(); + this.tablesByOrder = new TreeMap<>(); + } + + protected String getAssociationTableName(String tableName, String parentTableName) { + String associationTableName; + if (tableName.compareTo(parentTableName) < 0) { + associationTableName = tableName + "_" + parentTableName; + } else { + associationTableName = parentTableName + "_" + tableName; + } + return associationTableName; + } + +} diff --git a/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/table/package-info.java b/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/table/package-info.java new file mode 100644 index 0000000..89e7d08 --- /dev/null +++ b/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/table/package-info.java @@ -0,0 +1,9 @@ +/** + * <h1>Définition des tables à répliquer</h1> + * + * Created on 30/12/15. + * + * @author Tony Chemit - chemit@codelutin.com + * @since 3.0.1 + */ +package org.nuiton.topia.replication.sql.table; \ No newline at end of file -- To stop receiving notification emails like this one, please contact nuiton.org SCM administrator <admin+scm@nuiton.org>.
participants (1)
-
nuiton.org scm