1

データセットの行をループして、アクティブスペース環境に挿入しています(tibco、そのインメモリデータベースによる)。これが私がやっている方法です。

これを回避するためのより速い方法はありますか?

行を分割してから各パーティションを並列化することを考えていましたが、それで高速になるかどうかはわかりません。

System.Threading.Tasks.Parallel.ForEach(
    dataSet.Tables[0].Rows,
    currRow =>
    {
        var tuple = Com.Tibco.As.Space.Tuple.Create();

        for (int i = 0; i < currRow.Values.Length; i++)
        {
            if (currRow.Values[i] != null)
            {
                var k = ConvertToAny(currRow.Values[i].ToString());

                if (k.GetType().IsEquivalentTo(typeof(DateTime)))
                {
                    tuple.Put(dataSet.Tables[0].ColumnNames[i], (DateTime)k);
                }
                else if (k.GetType().IsEquivalentTo(typeof(double)))
                {
                    tuple.Put(dataSet.Tables[0].ColumnNames[i], (double)k);
                }
                else
                {
                    tuple.Put(dataSet.Tables[0].ColumnNames[i], k.ToString());
                }
            }
        }
        try
        {
            inSpace_.Put(tuple);
        }
        catch (Exception e)
        {
        }
    }
);

誰かが助けてくれるなら、私は一度に約1000でそれをバッチ処理することを考えています:(

編集:

リストtuplesToAdd=new List(); for(int i = 0; i <dataSet.Tables [0] .Rows.Length; i ++){var tuple = Com.Tibco.As.Space.Tuple.Create();

            for (int j = 0; j < dataSet.Tables[0].Rows[i].Values.Length; j++)
            {
                if (dataSet.Tables[0].Rows[i].Values[j] != null)
                {
                    var k = ConvertToAny(dataSet.Tables[0].Rows[i].Values[j].ToString());
                    if (k is DateTime)
                    {
                        tuple.Put(dataSet.Tables[0].ColumnNames[j], (DateTime)k);
                    }
                    else if (k is Double)
                    {
                        tuple.Put(dataSet.Tables[0].ColumnNames[j], (Double)k);
                    }
                    else
                    {
                        tuple.Put(dataSet.Tables[0].ColumnNames[j], k.ToString());
                    }
                }
            }

            tuplesToAdd.Add(tuple);

            if (i % 100000 == 0 || i == dataSet.Tables[0].Rows.Length - 1)
            {

                ThreadStart TUPLE_WORKER = delegate
                {
                    inSpace_.PutAll(tuplesToAdd);
                };
                new Thread(TUPLE_WORKER).Start();
                tuplesToAdd.Clear();
            }
        }

(バッチ処理によって)それを実行しようとする私の新しい方法があります

4

1 に答える 1

1

よくわかりませんがToString、変換コードでを回避できるようです。つまり、ではなく:

var k = ConvertToAny(currRow.Values[i].ToString());

if (k.GetType().IsEquivalentTo(typeof(DateTime)))

に置き換えることができます...

var k = currRow.Values[i];
if (k is DateTime)
{
    tuple.Put(dataSet.Tables[0].ColumnNames[i], (DateTime)k);
}

これにより、文字列に変換してから元に戻す手間が省けます。

コメントに応じて追加

まず、あなたConvertToAnyは不要です。のアイテムcurrRow.Values[i]はすでに正しいタイプです。あなたはそれがどんなタイプかわからないだけです。DateTimeaまたは。の文字列表現である可能性があると言っている場合を除きますDouble。たとえば、型がすでにdoubleである場合、文字列に変換し、解析してから、元に戻す理由はありません。つまり、次の2ビットのコードは同じことを行います。

object o = 3.14;
var k = ConvertToAny(o.ToString());
if (k.GetType.IsEquivalentTo(typeof(double))

object o = 3.14;
if (o is double)

唯一の違いは、2番目の方がはるかに高速になることです。

ただし、

object o = "3.14";

それをに変換したいdouble場合は、変換を行う必要があります。

物事をバッチ処理するコードは、追加および更新時にリストをロックする必要があります。そうしないと、破損します。私は提案します:

lock (tuplesToAdd)
{
    tuplesToAdd.Add(tuple);
    if ((tuplesToAdd.Count % 10000) == 0)
    {
        // push them all to the database.
        inspace_.PutAll(tuplesToAdd);
        tuplesToAdd.Clear();
    }
}

そして、あなたがすべて終わったとき(つまり、終わったときParallel.Foreach):

if (tuplesToAdd.Count > 0)
{
    // push the remaining items
}

これで、更新中にすべてのスレッドがブロックされないようにしたい場合は、少しクリエイティブにすることができます。

まず、ロックできる2つのオブジェクトを作成します。

private object lockObject = new Object();
private object pushLock = new Object();

リストを作成した直後に作成しtuplesToAddます。次に、アイテムを追加する場合:

Monitor.Enter(lockObject); // acquires the lock
tuplesToAdd.Add(tuple);
if (tuplesToAdd.Count == 100000)
{
    var tuplesToPush = tuplesToAdd;
    tuplesToAdd = new List<tuple>(10000);
    Monitor.Exit(lockObject);  // releases the lock so other threads can process
    lock (pushLock)  // prevent multiple threads from pushing at the same time
    {
        inspace_.PutAll(tuplesToPush);
    }
}
else
{
    Monitor.Exit(lockObject);
}

そうすれば、1つのスレッドがデータベースを更新している間に、他のスレッドが次回のためにリストを埋めることができます。


そして、もう少し考えてみると、おそらくこのタスクに並列処理を使用する必要はありません。あなたの時間の大部分は、Put呼び出しを待っているスレッドによって費やされていた可能性があります。単一のスレッドを使用してこれらをバッチ処理し、まとめて書き込むと、おそらく元のソリューションよりもはるかに高速に実行されます。あなたが決めたパラレルバージョンはより速くなりますが、それが非常に速くなるとは思えません。

于 2013-03-08T17:01:36.970 に答える