这篇文章主要介绍了c#实现几种数据库的大数据批量插入,主要包括sqlserver、oracle、sqlite和mysql,有兴趣的可以了解一下。
在之前只知道sqlserver支持数据批量插入,殊不知道oracle、sqlite和mysql也是支持的,不过oracle需要使用orace.dataaccess驱动,今天就贴出几种数据库的批量插入解决方法。
首先说一下,iprovider里有一个用于实现批量插入的插件服务接口ibatcherprovider,此接口在前一篇文章中已经提到过了。
/// <summary>
/// 提供数据批量处理的方法。
/// </summary>
public interface ibatcherprovider : iproviderservice
{
/// <summary>
/// 将 <see cref="datatable"/> 的数据批量插入到数据库中。
/// </summary>
/// <param name="datatable">要批量插入的 <see cref="datatable"/>。</param>
/// <param name="batchsize">每批次写入的数据量。</param>
void insert(datatable datatable, int batchsize = 10000);
}
一、sqlserver数据批量插入
sqlserver的批量插入很简单,使用sqlbulkcopy就可以,以下是该类的实现:
/// <summary>
/// 为 system.data.sqlclient 提供的用于批量操作的方法。
/// </summary>
public sealed class mssqlbatcher : ibatcherprovider
{
/// <summary>
/// 获取或设置提供者服务的上下文。
/// </summary>
public servicecontext servicecontext { get; set; }
/// <summary>
/// 将 <see cref="datatable"/> 的数据批量插入到数据库中。
/// </summary>
/// <param name="datatable">要批量插入的 <see cref="datatable"/>。</param>
/// <param name="batchsize">每批次写入的数据量。</param>
public void insert(datatable datatable, int batchsize = 10000)
{
checker.argumentnull(datatable, "datatable");
if (datatable.rows.count == 0)
{
return;
}
using (var connection = (sqlconnection)servicecontext.database.createconnection())
{
try
{
connection.tryopen();
//给表名加上前后导符
var tablename = dbutility.formatbyquote(servicecontext.database.provider.getservice<isyntaxprovider>(), datatable.tablename);
using (var bulk = new sqlbulkcopy(connection, sqlbulkcopyoptions.keepidentity, null)
{
destinationtablename = tablename,
batchsize = batchsize
})
{
//循环所有列,为bulk添加映射
datatable.eachcolumn(c => bulk.columnmappings.add(c.columnname, c.columnname), c => !c.autoincrement);
bulk.writetoserver(datatable);
bulk.close();
}
}
catch (exception exp)
{
throw new batcherexception(exp);
}
finally
{
connection.tryclose();
}
}
}
}
以上没有使用事务,使用事务在性能上会有一定的影响,如果要使用事务,可以设置sqlbulkcopyoptions.useinternaltransaction。
二、oracle数据批量插入
system.data.oracleclient不支持批量插入,因此只能使用oracle.dataaccess组件来作为提供者。
/// <summary>
/// oracle.data.access 组件提供的用于批量操作的方法。
/// </summary>
public sealed class oracleaccessbatcher : ibatcherprovider
{
/// <summary>
/// 获取或设置提供者服务的上下文。
/// </summary>
public servicecontext servicecontext { get; set; }
/// <summary>
/// 将 <see cref="datatable"/> 的数据批量插入到数据库中。
/// </summary>
/// <param name="datatable">要批量插入的 <see cref="datatable"/>。</param>
/// <param name="batchsize">每批次写入的数据量。</param>
public void insert(datatable datatable, int batchsize = 10000)
{
checker.argumentnull(datatable, "datatable");
if (datatable.rows.count == 0)
{
return;
}
using (var connection = servicecontext.database.createconnection())
{
try
{
connection.tryopen();
using (var command = servicecontext.database.provider.dbproviderfactory.createcommand())
{
if (command == null)
{
throw new batcherexception(new argumentexception("command"));
}
command.connection = connection;
command.commandtext = generateinsersql(servicecontext.database, command, datatable);
command.executenonquery();
}
}
catch (exception exp)
{
throw new batcherexception(exp);
}
finally
{
connection.tryclose();
}
}
}
/// <summary>
/// 生成插入数据的sql语句。
/// </summary>
/// <param name="database"></param>
/// <param name="command"></param>
/// <param name="table"></param>
/// <returns></returns>
private string generateinsersql(idatabase database, dbcommand command, datatable table)
{
var names = new stringbuilder();
var values = new stringbuilder();
//将一个datatable的数据转换为数组的数组
var data = table.toarray();
//设置arraybindcount属性
command.gettype().getproperty("arraybindcount").setvalue(command, table.rows.count, null);
var syntax = database.provider.getservice<isyntaxprovider>();
for (var i = 0; i < table.columns.count; i++)
{
var column = table.columns[i];
var parameter = database.provider.dbproviderfactory.createparameter();
if (parameter == null)
{
continue;
}
parameter.parametername = column.columnname;
parameter.direction = parameterdirection.input;
parameter.dbtype = column.datatype.getdbtype();
parameter.value = data[i];
if (names.length > 0)
{
names.append(",");
values.append(",");
}
names.appendformat("{0}", dbutility.formatbyquote(syntax, column.columnname));
values.appendformat("{0}{1}", syntax.parameterprefix, column.columnname);
command.parameters.add(parameter);
}
return string.format("insert into {0}({1}) values ({2})", dbutility.formatbyquote(syntax, table.tablename), names, values);
}
}
以上最重要的一步,就是将datatable转为数组的数组表示,即object[][],前数组的上标是列的个数,后数组是行的个数,因此循环columns将后数组作为parameter的值,也就是说,参数的值是一个数组。而insert语句与一般的插入语句没有什么不一样。
三、sqlite数据批量插入
sqlite的批量插入只需开启事务就可以了,这个具体的原理不得而知。
public sealed class sqlitebatcher : ibatcherprovider
{
/// <summary>
/// 获取或设置提供者服务的上下文。
/// </summary>
public servicecontext servicecontext { get; set; }
/// <summary>
/// 将 <see cref="datatable"/> 的数据批量插入到数据库中。
/// </summary>
/// <param name="datatable">要批量插入的 <see cref="datatable"/>。</param>
/// <param name="batchsize">每批次写入的数据量。</param>
public void insert(datatable datatable, int batchsize = 10000)
{
checker.argumentnull(datatable, "datatable");
if (datatable.rows.count == 0)
{
return;
}
using (var connection = servicecontext.database.createconnection())
{
dbtransaction transcation = null;
try
{
connection.tryopen();
transcation = connection.begintransaction();
using (var command = servicecontext.database.provider.dbproviderfactory.createcommand())
{
if (command == null)
{
throw new batcherexception(new argumentexception("command"));
}
command.connection = connection;
command.commandtext = generateinsersql(servicecontext.database, datatable);
if (command.commandtext == string.empty)
{
return;
}
var flag = new assertflag();
datatable.eachrow(row =>
{
var first = flag.asserttrue();
processcommandparameters(datatable, command, row, first);
command.executenonquery();
});
}
transcation.commit();
}
catch (exception exp)
{
if (transcation != null)
{
transcation.rollback();
}
throw new batcherexception(exp);
}
finally
{
connection.tryclose();
}
}
}
private void processcommandparameters(datatable datatable, dbcommand command, datarow row, bool first)
{
for (var c = 0; c < datatable.columns.count; c++)
{
dbparameter parameter;
//首次创建参数,是为了使用缓存
if (first)
{
parameter = servicecontext.database.provider.dbproviderfactory.createparameter();
parameter.parametername = datatable.columns[c].columnname;
command.parameters.add(parameter);
}
else
{
parameter = command.parameters[c];
}
parameter.value = row[c];
}
}
/// <summary>
/// 生成插入数据的sql语句。
/// </summary>
/// <param name="database"></param>
/// <param name="table"></param>
/// <returns></returns>
private string generateinsersql(idatabase database, datatable table)
{
var syntax = database.provider.getservice<isyntaxprovider>();
var names = new stringbuilder();
var values = new stringbuilder();
var flag = new assertflag();
table.eachcolumn(column =>
{
if (!flag.asserttrue())
{
names.append(",");
values.append(",");
}
names.append(dbutility.formatbyquote(syntax, column.columnname));
values.appendformat("{0}{1}", syntax.parameterprefix, column.columnname);
});
return string.format("insert into {0}({1}) values ({2})", dbutility.formatbyquote(syntax, table.tablename), names, values);
}
}
四、mysql数据批量插入
/// <summary>
/// 为 mysql.data 组件提供的用于批量操作的方法。
/// </summary>
public sealed class mysqlbatcher : ibatcherprovider
{
/// <summary>
/// 获取或设置提供者服务的上下文。
/// </summary>
public servicecontext servicecontext { get; set; }
/// <summary>
/// 将 <see cref="datatable"/> 的数据批量插入到数据库中。
/// </summary>
/// <param name="datatable">要批量插入的 <see cref="datatable"/>。</param>
/// <param name="batchsize">每批次写入的数据量。</param>
public void insert(datatable datatable, int batchsize = 10000)
{
checker.argumentnull(datatable, "datatable");
if (datatable.rows.count == 0)
{
return;
}
using (var connection = servicecontext.database.createconnection())
{
try
{
connection.tryopen();
using (var command = servicecontext.database.provider.dbproviderfactory.createcommand())
{
if (command == null)
{
throw new batcherexception(new argumentexception("command"));
}
command.connection = connection;
command.commandtext = generateinsersql(servicecontext.database, command, datatable);
if (command.commandtext == string.empty)
{
return;
}
command.executenonquery();
}
}
catch (exception exp)
{
throw new batcherexception(exp);
}
finally
{
connection.tryclose();
}
}
}
/// <summary>
/// 生成插入数据的sql语句。
/// </summary>
/// <param name="database"></param>
/// <param name="command"></param>
/// <param name="table"></param>
/// <returns></returns>
private string generateinsersql(idatabase database, dbcommand command, datatable table)
{
var names = new stringbuilder();
var values = new stringbuilder();
var types = new list<dbtype>();
var count = table.columns.count;
var syntax = database.provider.getservice<isyntaxprovider>();
table.eachcolumn(c =>
{
if (names.length > 0)
{
names.append(",");
}
names.appendformat("{0}", dbutility.formatbyquote(syntax, c.columnname));
types.add(c.datatype.getdbtype());
});
var i = 0;
foreach (datarow row in table.rows)
{
if (i > 0)
{
values.append(",");
}
values.append("(");
for (var j = 0; j < count; j++)
{
if (j > 0)
{
values.append(", ");
}
var isstrtype = isstringtype(types[j]);
var parameter = createparameter(database.provider, isstrtype, types[j], row[j], syntax.parameterprefix, i, j);
if (parameter != null)
{
values.append(parameter.parametername);
command.parameters.add(parameter);
}
else if (isstrtype)
{
values.appendformat("'{0}'", row[j]);
}
else
{
values.append(row[j]);
}
}
values.append(")");
i++;
}
return string.format("insert into {0}({1}) values {2}", dbutility.formatbyquote(syntax, table.tablename), names, values);
}
/// <summary>
/// 判断是否为字符串类别。
/// </summary>
/// <param name="dbtype"></param>
/// <returns></returns>
private bool isstringtype(dbtype dbtype)
{
return dbtype == dbtype.ansistring || dbtype == dbtype.ansistringfixedlength || dbtype == dbtype.string || dbtype == dbtype.stringfixedlength;
}
/// <summary>
/// 创建参数。
/// </summary>
/// <param name="provider"></param>
/// <param name="isstrtype"></param>
/// <param name="dbtype"></param>
/// <param name="value"></param>
/// <param name="parprefix"></param>
/// <param name="row"></param>
/// <param name="col"></param>
/// <returns></returns>
private dbparameter createparameter(iprovider provider, bool isstrtype, dbtype dbtype, object value, char parprefix, int row, int col)
{
//如果生成全部的参数,则速度会很慢,因此,只有数据类型为字符串(包含'号)和日期型时才添加参数
if ((isstrtype && value.tostring().indexof('\'') != -1) || dbtype == dbtype.datetime)
{
var name = string.format("{0}p_{1}_{2}", parprefix, row, col);
var parameter = provider.dbproviderfactory.createparameter();
parameter.parametername = name;
parameter.direction = parameterdirection.input;
parameter.dbtype = dbtype;
parameter.value = value;
return parameter;
}
return null;
}
}
mysql的批量插入,是将值全部写在语句的values里,例如,insert batcher(id, name) values(1, '1', 2, '2', 3, '3', ........ 10, '10')。
五、测试
接下来写一个测试用例来看一下使用批量插入的效果。
public void testbatchinsert()
{
console.writeline(timewatcher.watch(() =>
invoketest(database =>
{
var table = new datatable("batcher");
table.columns.add("id", typeof(int));
table.columns.add("name1", typeof(string));
table.columns.add("name2", typeof(string));
table.columns.add("name3", typeof(string));
table.columns.add("name4", typeof(string));
//构造100000条数据
for (var i = 0; i < 100000; i++)
{
table.rows.add(i, i.tostring(), i.tostring(), i.tostring(), i.tostring());
}
//获取 ibatcherprovider
var batcher = database.provider.getservice<ibatcherprovider>();
if (batcher == null)
{
console.writeline("不支持批量插入。");
}
else
{
batcher.insert(table);
}
//输出batcher表的数据量
var sql = new sqlcommand("select count(1) from batcher");
console.writeline("当前共有 {0} 条数据", database.executescalar(sql));
})));
}
以下表中列出了四种数据库生成10万条数据各耗用的时间
数据库
耗用时间
mssql 00:00:02.9376300
oracle 00:00:01.5155959
sqlite 00:00:01.6275634
mysql 00:00:05.4166891
以上就是详解c#实现几种数据库的大数据批量插入的示例代码的详细内容。