Menu

Multi Processing Part 2: How to make Migrate move

John Ennew

Nov 08 2013

Make migrate move

In the last blog post we looked at a multi threading handler for Drush. In this post we look at a practical use for the handler in making Drupal Migrate run faster.

Performance improvements via this method

The image above is a graph of the time taken for migrating 5,000 records as processes vary. With one concurrent process, this migration took 2.5 hours, with four concurrent processes this came down to 40 minutes and with eight concurrent processes it was down to 25 minutes an improvement of over 500%. The database was backed up before the first run and restored between each migration to give a fair comparison between the migrations, the only variance was the number of processes. The records cover a range of migration operations, with nodes being created, deleted and linked and with updating of apache solr indexes or the flushing of varnish caches. This was done using version 7.x-2.6-rc1 of the Migrate module. This version of migrate is a requirement of the tutorial which follows.

How to do this

This tutorial assumes you have a working migration setup using Drupal Migrate. An example migration is provided for the purposes of this tutorial called mtm_example and is available for download on github. You will also need a 7.x-2.6 branch version of Migrate module. This tutorial was written with 7.x-2.6-rc1. You are probably aware that if you try and run Drupal Migrate twice for the same migration you'll get the following error: There is already an active process on MyMigration A migration instance can only be run once. Migrate does however have a feature to create Migration instances dynamically. If we were to do this before a thread starts, then that thread would have a single Migration instance. Once the thread finishes, the dynamic instance can be removed. You'll need to setup your migrate class in such a way that it can take as arguments a limit and offset of the total migration effort. Let's see what this looks like.

1. Setting up migrate for concurrent Migrations

You'll need to be able to segment your migration into chunks. One way to do this is by having a limit and offset variable which you pass to your migration class via arguments as shown in a greatly cut down Migrate constructor below.

 class MTMExample extends Migration { 
 public function __construct($args) { 
 parent::__construct($args);
 $limit = empty($args['limit']) ? 100 : $args['limit'];
 $offset = empty($args['offset']) ? 0 : $args['offset'];
 }
} 
,

How you use these values is then up to you, but as you will see later, each thread will create instances of this class with different offsets allowing each separate migration instance to work on a small part of the total migration. When we create our dynamic migrations we will want to ensure that they are all using the same mapping table and not a different mapping table for each migration. When you declare a mapping make sure you do not use the machine name but some shared key. For example, in your migrate constructor you might include something like:-

 $this->map = new MigrateSQLMap('mtm_example', array(
 'reference' => array(
 'type' => 'varchar', 
 'length' => 255,
 'not null' => TRUE,
 'description' => 'Unique content reference',
 'alias' => 'o',
 ),
), MigrateDestinationNode::getKeySchema() );

How you use these values is then up to you, but as you will see later, each thread will create instances of this class with different offsets allowing each separate migration instance to work on a small part of the total migration. When we create our dynamic migrations we will want to ensure that they are all using the same mapping table and not a different mapping table for each migration. When you declare a mapping make sure you do not use the machine name but some shared key. For example, in your migrate constructor you might include something like:-

 $this->map = new MigrateSQLMap('mtm_example', array(
 'reference' => array(
 'type' => 'varchar', 
 'length' => 255,
 'not null' => TRUE,
 'description' => 'Unique content reference',
 'alias' => 'o',
 ),
), MigrateDestinationNode::getKeySchema() );
,

2. Making the multi threaded Drush script

As with the previous blog post, you will need to have the mt.drush.inc file in sites/all/drush. You will also need the mtm.drush.inc script from the same repository there as well. The mtm.drush.inc file provides a general purpose multi threading migrate drush command which can be invoked with: drush mtm-import MTMExample 10 10 1 This will run a migration with the MTMExample Migration class, importing 10 items in batches of 10 with 1 thread. To import 10 items with 2 processes you can then run: drush mtm-import MTMExample 10 5 2 Lets look at how the mtm.drush.inc file works. For your purposes, this command may be sufficient - otherwise you will need to use it as the starting point for your own work.

 /**
 * Implementation of hook_drush_command().
 */
function mtm_drush_command() { 
 $items = array();
 $items['mtm-import'] = array(
 'description' => 'Multi thread migration',
 'arguments' => array(
 'class' => 'The name of the migration base class',
 'limit' => 'Total number of jobs to migrate - use 0 for all.',
 'batch_size' => 'Number of jobs each thread will work on.',
 'threads' => 'Number of threads',
 ),
 'options' => array(
 'offset' => 'A starting offset should you want to start 1000 records in',
 ),
 ); 
 $items['mtm-migrate-cleanup'] = array(
 'description' => 'Clean up old mt migrations',
 ); 
 return $items; 
}

This shows we are going to define two new Drush commands. One is the multi threaded migration and the other one is a cleanup command, needed if a migration fails partway through. 

/**
 * Multi threaded import.
 */
function drush_mtm_import($class, $limit = 10, $batch_size = 10, $threads = 1) { 
 try { 
 $GLOBALS['mtm-migrate-baseclass'] = $class; 
 $GLOBALS['mtm-migrate-runtime'] = time(); 
 $starting_offset = drush_get_option('offset', 0); 
 drush_thread_manager($limit, $batch_size, $threads, '_mtm_thread_setup', '_mtm_thread_teardown', $starting_offset);
 } catch (Exception $e) { 
 drush_set_error($e->getMessage()); 
 }
}

This is the main migration start command. This ties together the process setup and teardown methods with the multi threaded handler.

/**
 * Create a sub migration and the drush command to execute it. 
 * 
 * @param int $thread_id 
 * A thread identifier which is managing this migration. 
 * @param int $limit 
 * Total number of items to migrate with this class.
 * @param int $offset 
 * Offset to start the migration at. 
 *
 * @return string
 * A command to run. 
*/
function _mtm_thread_setup($thread_id, $limit, $offset) { 
 $class_name = $GLOBALS['mtm-migrate-baseclass']; 
 $run_time = $GLOBALS['mtm-migrate-runtime'];
 $machine_name = _mtm_migrate_generate_machine_name($class_name, $thread_id, $run_time); 
 MigrationBase::registerMigration( $class_name, $machine_name, array(
 'limit' => $limit, 
 'offset' => $offset, 
 'machine_name' => $machine_name, 
 )); 
 $site_record = drush_sitealias_get_record('@self'); 
 $drush_command_path = drush_build_drush_command(); 
 $command_options = _drush_backend_get_global_contexts($site_record); 
 $command_options[] = '--update'; 
 $command = 'migrate-import'; 
 $args = array($machine_name); 
 $cmd = _drush_backend_generate_command($site_record, $drush_command_path . " " . $command, $args, $command_options) . ' 2>&1'; 
 return $cmd;
}

/**
 * Get the machine name for a migration based off the thread id. 
 *
 * @return string
 * The machine name for the thread based on the current run 
 * time, thread_id and name of the base migration. 
 */
 function _mtm_migrate_generate_machine_name($class_name, $thread_id, $run_time) { 
 return $class_name . '_' . $thread_id . '_' . $run_time;
 } 

2. Making the multi threaded Drush script

As with the previous blog post, you will need to have the mt.drush.inc file in sites/all/drush. You will also need the mtm.drush.inc script from the same repository there as well. The mtm.drush.inc file provides a general purpose multi threading migrate drush command which can be invoked with: drush mtm-import MTMExample 10 10 1 This will run a migration with the MTMExample Migration class, importing 10 items in batches of 10 with 1 thread. To import 10 items with 2 processes you can then run: drush mtm-import MTMExample 10 5 2 Lets look at how the mtm.drush.inc file works. For your purposes, this command may be sufficient - otherwise you will need to use it as the starting point for your own work.

 /**
 * Implementation of hook_drush_command().
 */
function mtm_drush_command() { 
 $items = array();
 $items['mtm-import'] = array(
 'description' => 'Multi thread migration',
 'arguments' => array(
 'class' => 'The name of the migration base class',
 'limit' => 'Total number of jobs to migrate - use 0 for all.',
 'batch_size' => 'Number of jobs each thread will work on.',
 'threads' => 'Number of threads',
 ),
 'options' => array(
 'offset' => 'A starting offset should you want to start 1000 records in',
 ),
 ); 
 $items['mtm-migrate-cleanup'] = array(
 'description' => 'Clean up old mt migrations',
 ); 
 return $items; 
}

This shows we are going to define two new Drush commands. One is the multi threaded migration and the other one is a cleanup command, needed if a migration fails partway through. 

/**
 * Multi threaded import.
 */
function drush_mtm_import($class, $limit = 10, $batch_size = 10, $threads = 1) { 
 try { 
 $GLOBALS['mtm-migrate-baseclass'] = $class; 
 $GLOBALS['mtm-migrate-runtime'] = time(); 
 $starting_offset = drush_get_option('offset', 0); 
 drush_thread_manager($limit, $batch_size, $threads, '_mtm_thread_setup', '_mtm_thread_teardown', $starting_offset);
 } catch (Exception $e) { 
 drush_set_error($e->getMessage()); 
 }
}

This is the main migration start command. This ties together the process setup and teardown methods with the multi threaded handler.

/**
 * Create a sub migration and the drush command to execute it. 
 * 
 * @param int $thread_id 
 * A thread identifier which is managing this migration. 
 * @param int $limit 
 * Total number of items to migrate with this class.
 * @param int $offset 
 * Offset to start the migration at. 
 *
 * @return string
 * A command to run. 
*/
function _mtm_thread_setup($thread_id, $limit, $offset) { 
 $class_name = $GLOBALS['mtm-migrate-baseclass']; 
 $run_time = $GLOBALS['mtm-migrate-runtime'];
 $machine_name = _mtm_migrate_generate_machine_name($class_name, $thread_id, $run_time); 
 MigrationBase::registerMigration( $class_name, $machine_name, array(
 'limit' => $limit, 
 'offset' => $offset, 
 'machine_name' => $machine_name, 
 )); 
 $site_record = drush_sitealias_get_record('@self'); 
 $drush_command_path = drush_build_drush_command(); 
 $command_options = _drush_backend_get_global_contexts($site_record); 
 $command_options[] = '--update'; 
 $command = 'migrate-import'; 
 $args = array($machine_name); 
 $cmd = _drush_backend_generate_command($site_record, $drush_command_path . " " . $command, $args, $command_options) . ' 2>&1'; 
 return $cmd;
}

/**
 * Get the machine name for a migration based off the thread id. 
 *
 * @return string
 * The machine name for the thread based on the current run 
 * time, thread_id and name of the base migration. 
 */
 function _mtm_migrate_generate_machine_name($class_name, $thread_id, $run_time) { 
 return $class_name . '_' . $thread_id . '_' . $run_time;
 } 
,

Here we see the thread setup function which generates a dynamic migration based on the original MyMigration base class with a limit and an offset. The machine name for this migration is based on the base class, thread id and the start time of the migration. The result of the setup function is a string which is the drush command to execute the dynamically created migration using the standard drush migrate-import command. In the registerMigration function, the third argument is the array of args which will be passed to your constructor. In the example we are feeding through the offset and limit but you might need to pass other variables in here depending on your migration.

 /**
 * Teardown function when a thread finishes. 
 *
 * @param int $thread_id 
 * The id of the thread.
 */ 
function _mtm_thread_teardown($thread_id) { 
 $class_name = $GLOBALS['mtm-migrate-baseclass'];
 $run_time = $GLOBALS['mtm-migrate-runtime']; 
 $machine_name = _mtm_migrate_generate_machine_name($class_name, $thread_id, $run_time); 
 _mtm_migrate_destroy_chuncked_migrations(array($machine_name => TRUE));
}

/**
 * Deregister a list of migrations at the end of the process.
 *
 * @param array $migrations 
 * An array of migration objects to be deregistered. 
 */ 
function _mtm_migrate_destroy_chuncked_migrations($migrations) { 
 foreach ($migrations as $machine_name => $migration) { 
 try { 
 MigrationBase::deregisterMigration($machine_name); 
 } catch (Exception $e) { 
 drush_set_error($e->getMessage()); 
 } 
 } 
} 

Here we see the teardown function which simply calls a helper function to deregister the dynamic migration which has just finished. 

/**
 * Remove old dynamically created migrations.
 */
function drush_mtm_migrate_cleanup() { 
 $removals = array(); 
 $migration_objects = migrate_migrations(); 
 foreach ($migration_objects as $machine_name => $migration) { 
 if (preg_match('/[^_]+_\d+_\d+/i', $machine_name)) { 
 $removals[$machine_name] = $migration; 
 } 
 }
 _mtm_migrate_destroy_chuncked_migrations($removals); 
} 

Here we see the thread setup function which generates a dynamic migration based on the original MyMigration base class with a limit and an offset. The machine name for this migration is based on the base class, thread id and the start time of the migration. The result of the setup function is a string which is the drush command to execute the dynamically created migration using the standard drush migrate-import command. In the registerMigration function, the third argument is the array of args which will be passed to your constructor. In the example we are feeding through the offset and limit but you might need to pass other variables in here depending on your migration.

 /**
 * Teardown function when a thread finishes. 
 *
 * @param int $thread_id 
 * The id of the thread.
 */ 
function _mtm_thread_teardown($thread_id) { 
 $class_name = $GLOBALS['mtm-migrate-baseclass'];
 $run_time = $GLOBALS['mtm-migrate-runtime']; 
 $machine_name = _mtm_migrate_generate_machine_name($class_name, $thread_id, $run_time); 
 _mtm_migrate_destroy_chuncked_migrations(array($machine_name => TRUE));
}

/**
 * Deregister a list of migrations at the end of the process.
 *
 * @param array $migrations 
 * An array of migration objects to be deregistered. 
 */ 
function _mtm_migrate_destroy_chuncked_migrations($migrations) { 
 foreach ($migrations as $machine_name => $migration) { 
 try { 
 MigrationBase::deregisterMigration($machine_name); 
 } catch (Exception $e) { 
 drush_set_error($e->getMessage()); 
 } 
 } 
} 

Here we see the teardown function which simply calls a helper function to deregister the dynamic migration which has just finished. 

/**
 * Remove old dynamically created migrations.
 */
function drush_mtm_migrate_cleanup() { 
 $removals = array(); 
 $migration_objects = migrate_migrations(); 
 foreach ($migration_objects as $machine_name => $migration) { 
 if (preg_match('/[^_]+_\d+_\d+/i', $machine_name)) { 
 $removals[$machine_name] = $migration; 
 } 
 }
 _mtm_migrate_destroy_chuncked_migrations($removals); 
} 
,

The last thing to look at with the drush scripts is the cleanup command which deregisters all dynamic migrations created by the multi process drush command. It is only necessary to run this should a migration be cancelled part way through.

3. Running the multi threaded migration

We are now ready to run our first multi threaded migration. For starters, try importing 1000 items in batches of 100 with one thread to see how it runs. Time it to get a base migration time. drush mtm-import MTMExample 1000 100 1 Next try ramping up the threads and timing each time drush mtm-import MTMExample 1000 100 2 drush mtm-import MTMExample 1000 100 4 Don't go crazy with the number of threads. Depending on your hardware and migration, performance will gradually improve to a point before the management of multiple threads becomes the bottle neck in the system. Adding additional threads past this point will decrease performance. You should also be monitoring the overall responsiveness of the server as well, if you are migrating into a live environment then this will have an impact on the performance of your website. It may kill your kittens.

4. Dealing with concurrency issues

Eventually you'll be running with so many threads you'll hit some interesting concurrency issues. The next section describes how to protect against them and further improve performance. A typical error you'll encounter in the watchdog logs is: SQLSTATE[42000]: Syntax error or access violation: 1305 SAVEPOINT savepoint_1 does not exist SQLSTATE[40001]: Serialization failure: 1213 Deadlock found when trying to get lock; try restarting transaction

MySQL tuning

To solve these, you can use the following variable setting to enable MySQL to prevent some update operations from locking database access. Add this setting to your MySQL configuration file (/etc/my.cnf) in the [mysqld] section. innodb_locks_unsafe_for_binlog = 1 Note that this may have consequences if you use binary logs for replication or point in time recovery from backups. For our purposes it solved the database concurrency issues. It would also make sense to increase the default timeout to 120s. innodb_lock_wait_timeout = 120

Memcache

Using memcache for caching helps lighten the load on your database which is where bottle necks can start to happen once you start using concurrent processing. In addition, the memcache module also provides an improved alternative to the Drupal standard locking mechanism. This is a drop-in replacement if you have memcached configured already for Drupal by adding the following lines to you settings.php

 $conf['lock_inc'] = 'sites/all/modules/memcache/memcache-lock.inc'; $conf['memcache_stampede_protection'] = TRUE;
John Ennew

About the author

John Ennew

Technical Director