Entity Framework Bulk Operations


Late last week, I ran into an interesting problem. My current web-based project allows users to perform bulk operations on various database records – up to 1000 records at a time. When dealing with a small number of records, performance was adequate. However, when working with 1000 records, performance was completely unacceptable.


What was causing the poor performance? It has a lot to do with auditing of data. Every modified field on the records in question results in an AuditField record being inserted, and there is also a parent Audit record to which the AuditFields are linked. In the case of 1000 records being updated, this resulted in 3000 records being inserted. That’s a small number of records, but we have to examine how Entity Framework, and pretty much any other ORM, works.

When EF inserts a record that has a database-generated primary key, each insert results in an INSERT statement being executed, followed by a corresponding SELECT to retrieve the new PK via SCOPE_IDENTITY. Factor in other overhead, and this operation, performing the update on 1000 records and then inserting 3000 auditing records, was taking over four minutes. (phew!)
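For illustration, here is roughly the shape of what EF sends over the wire for a single insert against an identity column. The table and column names here are made up for the example, and the exact text varies by EF version:

```sql
-- Approximate shape of one EF insert (hypothetical table/columns)
INSERT [dbo].[AuditField] ([AuditId], [FieldName], [OldValue], [NewValue])
VALUES (@0, @1, @2, @3)

-- ...immediately followed by a query to fetch the generated key
SELECT [Id]
FROM [dbo].[AuditField]
WHERE @@ROWCOUNT > 0 AND [Id] = scope_identity()
```

Multiply that by 3000 inserts, each its own round trip, and the latency adds up quickly.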

Fortunately, there are a few open-source libraries available that add batch/bulk operations to Entity Framework.

First up, we have Entity Framework Extended. This library adds batch UPDATE/DELETE extensions to DbSet. Using it in my case, where I’m updating two fields to the same values across 1000 records, is straightforward:

_dbSet
    .Where(x => listOfIds.Contains(x.Id))
    .Update(x => new Entity { StatusId = 1, StatusDate = currentDate });

This results in the appropriate UPDATE statement being generated, taking into consideration your EF column mappings, schema, table name, connection, etc. However, I wanted a little more control: I wanted to get the queries out of the extensions and execute them myself within my own transaction, and I also wanted to be able to see/log the queries in log4net. Since the query generation routines are contained within internal/private methods, it was a matter of extracting the appropriate code from the source. This wasn’t a big deal. I wound up with two methods: one returns the SELECT (WHERE expression) portion of the query and the other returns the UPDATE portion. Both methods also return the appropriate parameters to pass along with the parameterized queries. Going down this path allowed me to easily take advantage of the batch features of this library within my own transactionally aware repository by executing the SQL command with the DbContext’s ExecuteSqlCommand method.

public int BatchUpdate(Expression<Func<TEntity, bool>> queryExpr, Expression<Func<TEntity, TEntity>> updateExpr)
{
    var objectContext = ((IObjectContextAdapter)_dbContext).ObjectContext;
    var selectQuery = _dbSet.Where(queryExpr);
    var dbQuery = selectQuery as DbQuery<TEntity>;
    var objectQuery = dbQuery.ToObjectQuery();
    var entityMap = objectQuery.GetEntityMap<TEntity>();

    var selectQueryData = objectQuery.GetSelectQuery(entityMap);
    selectQueryData.QueryString = Commissions.DataAccess.Data.DatabaseContext.CommandOptimizations.ReplaceInClause(selectQueryData.QueryString);
    var updateQueryData = updateExpr.GetUpdateQuery(objectContext, entityMap);

    var sqlBuilder = new StringBuilder(updateQueryData.QueryString);
    sqlBuilder.AppendLine(" ");
    sqlBuilder.AppendFormat("FROM {0} AS j0 INNER JOIN (", entityMap.TableName);
    sqlBuilder.AppendLine();
    sqlBuilder.AppendLine(selectQueryData.QueryString);
    sqlBuilder.Append(") AS j1 ON (");

    bool wroteKey = false;
    foreach (var keyMap in entityMap.KeyMaps)
    {
        if (wroteKey)
            sqlBuilder.Append(" AND ");

        sqlBuilder.AppendFormat("j0.[{0}] = j1.[{0}]", keyMap.ColumnName);
        wroteKey = true;
    }
    sqlBuilder.Append(")");

    var queryStr = sqlBuilder.ToString();
    var parameters = new List<SqlParameter>();
    parameters.AddRange(selectQueryData.SqlParameters);
    parameters.AddRange(updateQueryData.SqlParameters);

    try
    {
        return _dbContext.Database.ExecuteSqlCommand(queryStr, parameters.ToArray());
    }
    catch (Exception ex)
    {
        Console.WriteLine(ex);
    }
    return 0;
}
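For reference, the SQL assembled by BatchUpdate ends up in roughly this shape. The table, column, and parameter names below are hypothetical; the inner SELECT is simply whatever ToTraceString produced for the key-only query:

```sql
-- Approximate shape of the assembled batch UPDATE (hypothetical names)
UPDATE [dbo].[Entity] SET
[StatusId] = @p__update__0, [StatusDate] = @p__update__1
FROM [dbo].[Entity] AS j0 INNER JOIN (
SELECT [Extent1].[Id] AS [Id]
FROM [dbo].[Entity] AS [Extent1]
WHERE [Extent1].[Id] IN (@p__linq__0, @p__linq__1 /* ... */)
) AS j1 ON (j0.[Id] = j1.[Id])
```

One statement, one round trip, no matter how many rows match.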

/// <summary>
/// Retrieve the SQL query based on ObjectQuery
/// </summary>
/// <typeparam name="TEntity"></typeparam>
/// <param name="query"></param>
/// <param name="entityMap"></param>
/// <returns></returns>
public static SqlQueryData GetSelectQuery<TEntity>(this ObjectQuery<TEntity> query, EntityMap entityMap)
    where TEntity : class
{
    var sqlQueryData = new SqlQueryData();

    // changing query to only select keys
    var selector = new StringBuilder(50);
    selector.Append("new(");
    foreach (var propertyMap in entityMap.KeyMaps)
    {
        if (selector.Length > 4)
            selector.Append(", ");

        selector.Append(propertyMap.PropertyName);
    }
    selector.Append(")");

    var selectQuery = DynamicQueryable.Select(query, selector.ToString());
    var objectQuery = selectQuery as ObjectQuery;

    if (objectQuery == null)
        throw new ArgumentException("The query must be of type ObjectQuery.", "query");

    sqlQueryData.QueryString = objectQuery.ToTraceString();

    // create parameters
    foreach (var objectParameter in objectQuery.Parameters)
    {
        sqlQueryData.Parameters.Add(objectParameter.Name, objectParameter.Value ?? DBNull.Value);
        sqlQueryData.SqlParameters.Add(new SqlParameter() { ParameterName = objectParameter.Name, Value = objectParameter.Value ?? DBNull.Value });
    }

    return sqlQueryData;
}

public static SqlQueryData GetUpdateQuery<TEntity>(this Expression<Func<TEntity, TEntity>> updateExpr, ObjectContext objectContext, EntityMap entityMap)
    where TEntity : class
{
    var initExpr = updateExpr.Body as MemberInitExpression;
    if (initExpr == null)
        throw new ArgumentException("updateExpr expression must be MemberInitExpression.", "updateExpr");

    int nameCount = 0;
    bool wroteSet = false;

    var sqlBuilder = new StringBuilder();
    var sqlQueryData = new SqlQueryData();
    sqlBuilder.Append("UPDATE ");
    sqlBuilder.Append(entityMap.TableName);
    sqlBuilder.AppendLine(" SET ");

    // Iterate over the member bindings
    foreach (var binding in initExpr.Bindings)
    {
        if (wroteSet)
            sqlBuilder.AppendLine(", ");

        string propName = binding.Member.Name;
        string colName = entityMap
            .PropertyMaps
            .Where(x => x.PropertyName == propName)
            .Select(x => x.ColumnName)
            .FirstOrDefault();

        var memberAssignment = binding as MemberAssignment;
        var memberExpression = memberAssignment.Expression;
        ParameterExpression parameterExpression = null;
        memberExpression.Visit((ParameterExpression p) =>
        {
            if (p.Type == entityMap.EntityType)
                parameterExpression = p;

            return p;
        });

        if (parameterExpression == null)
        {
            object value;

            if (memberExpression.NodeType == ExpressionType.Constant)
            {
                var constantExpression = memberExpression as ConstantExpression;
                if (constantExpression == null)
                    throw new ArgumentException(
                        "The MemberAssignment expression is not a ConstantExpression.", "updateExpr");

                value = constantExpression.Value;
            }
            else
            {
                LambdaExpression lambda = Expression.Lambda(memberExpression, null);
                value = lambda.Compile().DynamicInvoke();
            }

            if (value != null)
            {
                string parameterName = "p__update__" + nameCount++;

                sqlQueryData.Parameters.Add(parameterName, value);
                sqlQueryData.SqlParameters.Add(new SqlParameter() { ParameterName = parameterName, Value = value });
                sqlBuilder.AppendFormat("[{0}] = @{1}", colName, parameterName);
            }
            else
            {
                sqlBuilder.AppendFormat("[{0}] = NULL", colName);
            }
        }
        else
        {
            // create clean objectset to build query from
            var objectSet = objectContext.CreateObjectSet<TEntity>();

            Type[] typeArguments = new[] { entityMap.EntityType, memberExpression.Type };

            ConstantExpression constantExpression = Expression.Constant(objectSet);
            LambdaExpression lambdaExpression = Expression.Lambda(memberExpression, parameterExpression);

            MethodCallExpression selectExpression = Expression.Call(
                typeof(Queryable),
                "Select",
                typeArguments,
                constantExpression,
                lambdaExpression);

            // create query from expression
            var selectQuery = objectSet.CreateQuery(selectExpression, entityMap.EntityType);
            string sql = selectQuery.ToTraceString();

            // parse select part of sql to use as update
            string regex = @"SELECT\s*\r\n\s*(?<ColumnValue>.+)?\s*AS\s*(?<ColumnAlias>\[\w+\])\r\n\s*FROM\s*(?<TableName>\[\w+\]\.\[\w+\]|\[\w+\])\s*AS\s*(?<TableAlias>\[\w+\])";
            Match match = Regex.Match(sql, regex);
            if (!match.Success)
                throw new ArgumentException("The MemberAssignment expression could not be processed.", "updateExpr");

            string value = match.Groups["ColumnValue"].Value;
            string alias = match.Groups["TableAlias"].Value;

            value = value.Replace(alias + ".", "");

            foreach (ObjectParameter objectParameter in selectQuery.Parameters)
            {
                string parameterName = "p__update__" + nameCount++;

                sqlQueryData.Parameters.Add(parameterName, objectParameter.Value ?? DBNull.Value);
                sqlQueryData.SqlParameters.Add(new SqlParameter() { ParameterName = parameterName, Value = objectParameter.Value ?? DBNull.Value });
                value = value.Replace(objectParameter.Name, parameterName);
            }
            sqlBuilder.AppendFormat("[{0}] = {1}", colName, value);
        }
        wroteSet = true;
    }

    sqlQueryData.QueryString = sqlBuilder.ToString();
    return sqlQueryData;
}

public class SqlQueryData
{
    public string QueryString { get; set; }
    public Dictionary<string, object> Parameters { get; set; }
    public List<SqlParameter> SqlParameters { get; set; }

    public SqlQueryData()
    {
        Parameters = new Dictionary<string, object>();
        SqlParameters = new List<SqlParameter>();
    }
}

With that code in place, I could execute against my IRepository:

_repo.BatchUpdate(x => listOfIds.Contains(x.Id), y => new Entity{ StatusId = statusId, StatusDate = currentDate });

That solves one problem. The bigger problem, though, is insertion of a large number of records and the round-tripping that occurs.

So, solving the second and bigger performance issue required a different approach. For this, I used another library called EntityFramework.BulkInsert. This library basically takes an IEnumerable and performs a SqlBulkCopy. The useful part, though, is that it generates the appropriate data and command text based on your Entity Framework configuration. The insert is available as an extension method on the DbContext. Fortunately, it also allows passing in a transaction to which the operation is attached. Following my repository pattern, it was simple enough to add generic methods to allow bulk inserting any entity type (not just type T of the repository):

public virtual void BulkInsert(List<TEntity> entities)
{
    var transaction = _dbContext.Transaction != null && _dbContext.Transaction.UnderlyingTransaction != null ?
        _dbContext.Transaction.UnderlyingTransaction as IDbTransaction :
        null;
    if (transaction != null)
    {
        _dbContext.BulkInsert(entities, transaction, SqlBulkCopyOptions.CheckConstraints);
    }
    else
    {
        _dbContext.BulkInsert(entities, SqlBulkCopyOptions.CheckConstraints);
    }
}

void IRepository<TEntity>.BulkInsert<V>(List<V> entities)
{
    var transaction = _dbContext.Transaction != null && _dbContext.Transaction.UnderlyingTransaction != null ?
        _dbContext.Transaction.UnderlyingTransaction as IDbTransaction :
        null;
    if (transaction != null)
    {
        _dbContext.BulkInsert<V>(entities, transaction, SqlBulkCopyOptions.CheckConstraints);
    }
    else
    {
        _dbContext.BulkInsert<V>(entities, SqlBulkCopyOptions.CheckConstraints);
    }
}

I should also mention that the Transaction member available on my context is just backed by a private field created like so:


_dbTransaction = this.Database.BeginTransaction(System.Data.IsolationLevel.RepeatableRead);
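To show how that field and the Transaction member used by the BulkInsert methods fit together, here is a minimal sketch of such a context. The AppDbContext name and the BeginTransaction method are my own invention for illustration, not the original project's code:

```csharp
// Hypothetical sketch (EF6): a DbContext that exposes its current transaction
public class AppDbContext : DbContext
{
    private DbContextTransaction _dbTransaction;

    // Consumed by the repository's BulkInsert methods above;
    // DbContextTransaction.UnderlyingTransaction yields the IDbTransaction
    public DbContextTransaction Transaction
    {
        get { return _dbTransaction; }
    }

    public void BeginTransaction()
    {
        _dbTransaction = this.Database.BeginTransaction(System.Data.IsolationLevel.RepeatableRead);
    }
}
```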

That nearly solves all of the performance problems. But recall the round-tripping to retrieve SCOPE_IDENTITY: we can’t insert the child records (AuditField) without having the parent (Audit) Id. This requires making a call back to the database to get the Ids of the newly inserted records. How do we accomplish this, though? There are many ways, but for my use case, there are some key pieces of data that help identify what was inserted:

  • Time
  • Modified Entity’s Id
  • User name recorded with the Audit record

With this in mind, I created a Dictionary to store all of the AuditFields I was creating, keyed by the Entity for which the Audit record was created. After BulkInserting the Audit records, I could simply query the database to get them back, grab their PKs, set them on the appropriate AuditFields, and perform a bulk insert on the children (AuditFields):

var currentUser = _currentUserSvc.UserName;
var auditsProj = _repo.GetUntracked(x => listEntityOfIds.Contains(x.Id) && x.CreateDate == currentDate && x.CreateUser == currentUser)
    .Select(x => new { Id = x.Id, EntityId = x.EntityId })
    .ToList();

foreach (var audit in auditsProj)
{
    var list = auditFieldsDict[audit.EntityId];

    foreach (var auditField in list)
    {
        auditField.AuditId = audit.Id;
        auditField.Audit.Id = audit.Id;
    }
    auditFields.AddRange(list);
}

_repo.BulkInsert<AuditField>(auditFields);

I realize that’s a really long explanation of something that winds up not being very involved. To put things in perspective, though, consider that initially the user operation was taking nearly 4 1/2 minutes. After implementing these changes to handle bulk/batch operations, processing time was reduced to between 1 and 2 SECONDS. That’s a crazy performance difference! This particular function of the system went from being completely unusable to being pretty performant!
