3

以下のThriftサービスを定義したとしましょう

service FileResource {      
binary get_file(1:string file_name)
}

ここに私が理解できない生成された実装があります

public ByteBuffer recv_get_file() throws org.apache.thrift.TException
{
  org.apache.thrift.protocol.TMessage msg = iprot_.readMessageBegin();
  if (msg.type == org.apache.thrift.protocol.TMessageType.EXCEPTION) {
    org.apache.thrift.TApplicationException x = org.apache.thrift.TApplicationException.read(iprot_);
    iprot_.readMessageEnd();
    throw x;
  }
  if (msg.seqid != seqid_) {
    throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.BAD_SEQUENCE_ID, "get_file failed: out of sequence response");
  }
  get_file_result result = new get_file_result();
  result.read(iprot_);
  iprot_.readMessageEnd();
  if (result.isSetSuccess()) {
    return result.success;
  }
  throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.MISSING_RESULT, "get_file failed: unknown result");
}

文字列のしくみ

 result.read(iprot_);

? 同期ですか非同期ですか?大きなデータ (数メガバイト以上) ではどのように機能しますか? そして、それらのデータを読み取るために何が必要ですか? 残念ながら、私は java.nio と ByteBuffer を扱うことに慣れていません。例やガイドがあればいいでしょう。

4

2 に答える 2

-4

最後に、サーバーからクライアントへのファイル転送に成功しました。Thrift によって自動生成された Client クラスと Processor クラスを拡張しました。TProtocol オブジェクトにアクセスできるようになりました。これにより、任意のデータ ストリームを送受信できます。
私の解決策は非常に大雑把であると確信しています。Thriftアーキテクチャに準拠して実装する方法を誰かが教えてくれたらうれしいです。カスタムThriftプロトコルを実装することで、よりうまく達成できますか?

クライアント:

package alehro.droid;

import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

import org.apache.thrift.TException;
import org.apache.thrift.protocol.TProtocol;

import alehro.log.Logger;
import alehro.tcp.ChunkFileResourceThrift;
import alehro.tcp.ServerSideError;

class ThriftClientExt extends ChunkFileResourceThrift.Client {
public ThriftClientExt(TProtocol prot) {
    super(prot);

}

public void recv_get_file_ext(String get_file_out_path) throws TException,
        IOException, ServerSideError {
    FileOutputStream fos = new FileOutputStream(get_file_out_path);
    FileChannel channel = fos.getChannel();
    int size = 0;
    // -1 - end of file, -2 exception.
    while ((size = iprot_.readI32()) > 0) {
        Logger.me.v("receiving buffer size=" + size);
        ByteBuffer out = iprot_.readBinary();
        // out.flip();
        channel.write(out);
    }
    if (size == -2) {
        String msg = iprot_.readString();
        Logger.me.e("Server error: " + msg);
        // TODO: report error to user
    }
    channel.close();
    recv_get_file();
}

}

サーバ:

package alehro.tcp;

import org.apache.thrift.TException;
import org.apache.thrift.protocol.TProtocol;

import alehro.log.Logger;
import alehro.tcp.ChunkFileResourceThrift.Iface;
import alehro.tcp.ChunkFileResourceThrift.get_file_args;
import alehro.tcp.ChunkFileResourceThrift.get_file_result;

public class ChunkedFileResourceProcessor extends
    ChunkFileResourceThrift.Processor {

public interface IfaceExt extends Iface {
    void get_file_raw(String key, String file_name, TProtocol out)
            throws TException, ServerSideError;
}

final private IfaceExt iface_1;


public ChunkedFileResourceProcessor(IfaceExt iface) {
    super(iface);
    iface_1 = iface;
    // replace generated implementation by my custom one.
    processMap_.put("get_file", new get_file_raw());
}

private class get_file_raw implements ProcessFunction {

    @Override
    public void process(int seqid, TProtocol iprot, TProtocol oprot)
            throws TException {
        get_file_args args = new get_file_args();
        try {
            args.read(iprot);
        } catch (org.apache.thrift.protocol.TProtocolException e) {
            iprot.readMessageEnd();
            org.apache.thrift.TApplicationException x = new org.apache.thrift.TApplicationException(
                    org.apache.thrift.TApplicationException.PROTOCOL_ERROR,
                    e.getMessage());
            oprot.writeMessageBegin(new org.apache.thrift.protocol.TMessage(
                    "get_file",
                    org.apache.thrift.protocol.TMessageType.EXCEPTION,
                    seqid));
            x.write(oprot);
            oprot.writeMessageEnd();
            oprot.getTransport().flush();
            return;
        }
        iprot.readMessageEnd();
        get_file_result result = new get_file_result();
        try {
            iface_1.get_file_raw(args.key, args.file_name, oprot);
        } catch (ServerSideError ouch) {
            result.ouch = ouch;
        } catch (Throwable th) {
            Logger.me.e("Internal error processing get_file_raw");
            Logger.me.e(th.getMessage());
            Logger.me.e(th);
            org.apache.thrift.TApplicationException x = new org.apache.thrift.TApplicationException(
                    org.apache.thrift.TApplicationException.INTERNAL_ERROR,
                    "Internal error processing get_file");
            oprot.writeMessageBegin(new org.apache.thrift.protocol.TMessage(
                    "get_file",
                    org.apache.thrift.protocol.TMessageType.EXCEPTION,
                    seqid));
            x.write(oprot);
            oprot.writeMessageEnd();
            oprot.getTransport().flush();
            return;
        }
        oprot.writeMessageBegin(new org.apache.thrift.protocol.TMessage(
                "get_file", org.apache.thrift.protocol.TMessageType.REPLY,
                seqid));
        result.write(oprot);
        oprot.writeMessageEnd();
        oprot.getTransport().flush();
    }

}

}

サーバー ハンドラ:

public class ChunkedFileResourceHandler implements
    ChunkedFileResourceProcessor.IfaceExt {
....
@Override
public void get_file(String key, String file_name) throws TException {
    // stub
    throw new TException("Wrong call. Use get_file_raw instead.");
}

@Override
public void get_file_raw(String key, String file_name, final TProtocol out)
        throws ServerSideError, TException {
    // catch all here. mimic original get_file throw politics.
    try {
        Logger.me.v("Begin get_file_raw");
        UserSession se = accessUserSession(key, "get", 0, 0);
        vali(se != null);
        synchronized (se) {
            String fullPath = "";
            Logger.me.i("get file start: " + file_name);
            String userDir = AppConfig.getUserDir(se.info.email);
            fullPath = userDir + file_name;

            final FileInputStream inputFile;
            ByteBuffer buffer = null;
            int bytesRead = -1;
            FileChannel fileChannel = null;

            inputFile = new FileInputStream(fullPath);
            fileChannel = inputFile.getChannel();
            buffer = ByteBuffer.allocate(2048);
            bytesRead = fileChannel.read(buffer);

            // Logger.me.v("start sending file");
            while (bytesRead != -1) {
                buffer.flip();
                int length = buffer.limit() - buffer.position()
                        - buffer.arrayOffset();
                Logger.me.v("sending buffer length=" + length);

                out.writeI32(length); // read it in client
                out.writeBinary(buffer); // read it in client
                buffer.clear();

                bytesRead = fileChannel.read(buffer);

            }
            out.writeI32(-1); // read it in client

            Logger.me.i("get file end.");
        }
    } catch (TException e) {
        throw e;
    } catch (Throwable e) {
        write_get_file_exception(file_name, e, out);
        return;
    }

}

void write_get_file_exception(String file, Throwable e, final TProtocol out)
        throws TException {
    out.writeI32(-2);
    out.writeString("Exception in get_file_raw: file=" + file
            + "description=" + e.getMessage());
    Logger.me.e(e);
    Logger.me.i("get file ended wtih errors: " + e.getMessage());
}
}
于 2011-08-27T14:58:58.187 に答える