11

ネットワーク経由で結果を送信する別のシステムのパラメーターSELECTとして SQL クエリの結果を返す Java 関数が必要です。InputStream

ただし、 はカスタムの区切り文字 (常にではありませんが、多くの場合 CSV) を使用するInputStream必要があります。String

結果を取得する関数を簡単に作成し、区切られた を作成し、String最終的にそれStringをに変換InputStreamできますが、多くの場合、SQL の結果はメモリ内で処理するには大きすぎます。また、結果を返す前に結果セット全体を処理すると、不要な待ち時間が発生します。

InputStreamSQL の結果を反復処理し、データベースから返された処理済み (区切られた) データを送信するにはどうすればよいですか?

4

3 に答える 3

9

基本的なアイデアを提供するはずの(テストされていない)コードスニペットを投稿します。

/**
 * Implementors of this interface should only convert current row to byte array and return it.
 * 
 * @author yura
 */
public interface RowToByteArrayConverter {
    byte[] rowToByteArray(ResultSet resultSet);
}

public class ResultSetAsInputStream extends InputStream {

    private final RowToByteArrayConverter converter;
    private final PreparedStatement statement;
    private final ResultSet resultSet;

    private byte[] buffer;
    private int position;

    public ResultSetAsInputStream(final RowToByteArrayConverter converter, final Connection connection, final String sql, final Object... parameters) throws SQLException {
        this.converter = converter;
        statement = createStatement(connection, sql, parameters);
        resultSet = statement.executeQuery();
    }

    private static PreparedStatement createStatement(final Connection connection, final String sql, final Object[] parameters) {
        // PreparedStatement should be created here from passed connection, sql and parameters
        return null;
    }

    @Override
    public int read() throws IOException {
        try {
            if(buffer == null) {
                // first call of read method
                if(!resultSet.next()) {
                    return -1; // no rows - empty input stream
                } else {
                    buffer = converter.rowToByteArray(resultSet);
                    position = 0;
                    return buffer[position++] & (0xff);
                }
            } else {
                // not first call of read method
                if(position < buffer.length) {
                    // buffer already has some data in, which hasn't been read yet - returning it
                    return buffer[position++] & (0xff);
                } else {
                    // all data from buffer was read - checking whether there is next row and re-filling buffer
                    if(!resultSet.next()) {
                        return -1; // the buffer was read to the end and there is no rows - end of input stream
                    } else {
                        // there is next row - converting it to byte array and re-filling buffer
                        buffer = converter.rowToByteArray(resultSet);
                        position = 0;
                        return buffer[position++] & (0xff);
                    }
                }
            }
        } catch(final SQLException ex) {
            throw new IOException(ex);
        }
    }



    @Override
    public void close() throws IOException {
        try {
            statement.close();
        } catch(final SQLException ex) {
            throw new IOException(ex);
        }
    }
}

これは非常に単純な実装であり、次の方法で改善できます。

  • 読み取りメソッドの if と else の間のコードの重複を削除できます - 明確化のために投稿されました
  • 行ごとにバイト配列バッファーを再作成する (new byte[]コストのかかる操作です) 代わりに、一度だけ初期化されてから再入力されるバイト配列バッファーを使用する、より洗練されたロジックを実装できます。次に、埋められたバイト数を返し、渡されたバイト配列を満たすRowToByteArrayConverter.rowToByteArrayメソッドのシグネチャを変更する必要があります。int fillByteArrayFromRow(ResultSet rs, byte[] array)

バイト配列には符号付きバイト-1(実際に255は符号なしバイト) が含まれているため、ストリームの末尾が正しくないことを示しているため、& (0xff)符号付きバイトを符号なしバイトに整数値として変換するために使用されます。詳細については、Java が int をバイトに変換する方法を参照してください。.

また、ネットワーク転送速度が遅い場合、結果セットが長時間開かれたままになり、データベースに問題が生じる可能性があることにも注意してください。

お役に立てれば ...

于 2012-06-28T22:51:28.597 に答える
2

以下を導入することで、@Yura が提案した回答を改善します
。RowToByteArrayConverter の実装内で、データをバイト配列に簡単に書き込むために、ByteArrayOutputStream で初期化された DataOutputStream を使用します。
実際、コンバーターの階層を持つことをお勧めします。それらはすべて同じ抽象クラスを拡張します (これは私のアイデアのコード スニペットです。最初からコンパイルできない可能性があります)。

public abstract class RowToByteArrayConverter {
  public byte[] rowToByteArray(ResultSet resultSet) {
      parseResultSet(dataOutputStream, resultSet);
      return byteArrayOutputSteam.toByteArray();
  }

  public RowToByteArrayConverter() {
    dataOutputStream = new DataOutputStream(byteArrayOutputStream);
  }

  protected DataOutputStream dataOutputStream;
  protected ByteArrayOutputStream byteArrayOutputStream;

  protected abstract void parseResultSet(DataOutputStream dataOutputStresm, ResultSet rs); 
}

これで、parseResultSet メソッドをオーバーライドするだけで、このクラスをオーバーライドできます。
たとえば、レコード内の列「name」から名前を文字列として取得するコードを記述します。DataOputputStream で writeUTF8 を実行します。

于 2012-07-02T16:51:50.753 に答える
2

上記の回答は、制限されたサイズの stringbuilder を超えるという問題に対する有用な解決策を提供します。また、メモリ効率も優れています。ただし、私のテストでは、stringbuilder にデータを書き込んで呼び出すよりも遅いことが示唆されています。

new ByteArrayInputStream(data.getBytes("UTF-8"))

入力ストリームを取得します。

パーティション関数を使用して受信データをスライスし、それぞれに複数のスレッドを使用することで、はるかにパフォーマンスが向上することがわかりました。

  1. データのサブセットについてソース データベースにクエリを実行する
  2. ターゲットにデータを書き込む

これにより、合計データが文字列バッファーの最大サイズを超える可能性があるという問題も回避されます。

たとえば、SQL Server テーブルに「RecordDate」という列を持つ 6m のレコードがあります。Recorddate の値は 2013 年と 2016 年の間で異なります。そのため、2013、14、15、16 のデータをそれぞれ要求するように各スレッドを構成します。次に、各スレッドはトランスコードされたデータを StringBuilder に書き込み、上記のように getBytes() を使用して Inputstream に変換することにより、各バルク ロードをターゲットに書き込みます。

これにより、速度が 2 倍になりました。

なんで?ソース データベースとターゲット データベースは複数の同時要求を処理できるため、全体的なワークロードは、ソース データベース、トランスコーダ、ターゲット データベースの 3 つのプロセスすべての複数のスレッドに分散されます。

于 2016-08-19T11:08:45.370 に答える