1

フィールドの1つがタイムスタンプであるカスタムhadoopWritableクラスを実装しようとしています。これを簡単にするクラス(たとえば、Writable for DateまたはCalendar)をhadoopライブラリーで見つけることができないようです。カレンダーでget/setTimeInMillisを使用してカスタム書き込み可能ファイルを作成することを考えていますが、この問題に対するより良い/組み込みのソリューションがあるかどうか疑問に思っています。

4

2 に答える 2

4

Hadoopにはカレンダー/日付の書き込みはありません。カレンダーオブジェクトからtimeInMillisをlongとして取得できることを考慮すると、アプリケーションが常にデフォルトのUTCタイムゾーンを使用する場合(つまり、タイムゾーンに「不可知論的」である場合に限り)、LongWritableを使用してカレンダーオブジェクトをシリアル化できます。そのtimeInMillisはUTC時間を表します)。

別のタイムゾーンを使用する場合、またはアプリケーションがさまざまなタイムゾーンに関してtimeInMillisを解釈できる必要がある場合は、デフォルトの書き込み可能な実装を最初から作成する必要があります。

于 2012-08-14T18:11:25.157 に答える
1

これは、3つのプロパティ(そのうちの1つは日付)を持つ書き込み可能ファイルを説明するために生成したカスタム書き込み可能ファイルです。データ値がlongとして永続化されており、longをDateとの間で簡単に変換できることがわかります。3つのプロパティがあることが多すぎる場合は、日付付きの書き込み可能ファイルを生成できます。

package com.lmx.writable;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;
import com.eaio.uuid.UUID;
import org.apache.hadoop.io.*;
import org.apache.pig.ResourceSchema;
import org.apache.pig.ResourceSchema.ResourceFieldSchema;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
import org.apache.pig.data.DefaultDataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;

public class MyCustomWritable implements Writable {

  public static int PROPERTY_DATE = 0;
  public static int PROPERTY_COUNT = 1;
  public static int PROPERTY_NAME = 2;

  private boolean[] changeFlag = new boolean[3];

  private Date  _date;
  private int   _count;
  private String    _name;

  public MyCustomWritable() {
    resetChangeFlags();
  }

  public MyCustomWritable(Date _date, int _count, String _name) {
    resetChangeFlags();
    setDate(_date);  
    setCount(_count);  
    setName(_name);  
  }

  public MyCustomWritable(byte[] bytes) {
    ByteArrayInputStream is = new ByteArrayInputStream(bytes);
    DataInput in = new DataInputStream(is);
    try { readFields(in); } catch (IOException e) { }
    resetChangeFlags();
  }



  public Date getDate() {
    return _date;
  }

  public void setDate(Date value) {
    _date = value;
    changeFlag[PROPERTY_DATE] = true;
  }  

  public int getCount() {
    return _count;
  }

  public void setCount(int value) {
    _count = value;
    changeFlag[PROPERTY_COUNT] = true;
  }  

  public String getName() {
    return _name;
  }

  public void setName(String value) {
    _name = value;
    changeFlag[PROPERTY_NAME] = true;
  }  

  public void readFields(DataInput in) throws IOException {

            // Read Date _date

        if (in.readBoolean()) {
            _date = new Date(in.readLong());
            changeFlag[PROPERTY_DATE] = true;
        } else {
            _date = null;
            changeFlag[PROPERTY_DATE] = false;
        }       
            // Read int _count

        _count = in.readInt();
        changeFlag[PROPERTY_COUNT] = true;

            // Read String _name

        if (in.readBoolean()) {
            _name = Text.readString(in);
            changeFlag[PROPERTY_NAME] = true;
        } else {
            _name = null;
            changeFlag[PROPERTY_NAME] = false;
        }
  }

  public void write(DataOutput out) throws IOException {

            // Write Date _date

      if (_date == null) {
            out.writeBoolean(false);
      } else {
            out.writeBoolean(true);
            out.writeLong(_date.getTime());
      }

            // Write int _count

      out.writeInt(_count);

            // Write String _name

      if (_name == null) {
            out.writeBoolean(false);
      } else {
            out.writeBoolean(true);
            Text.writeString(out,_name);
      }
  }

  public byte[] getBytes() throws IOException {
      ByteArrayOutputStream os = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(os);
      write(out);
      out.flush();
      out.close();
      return os.toByteArray();
  }

  public void resetChangeFlags() {
    changeFlag[PROPERTY_DATE] = false;
    changeFlag[PROPERTY_COUNT] = false;
    changeFlag[PROPERTY_NAME] = false;
  }

  public boolean getChangeFlag(int i) {
    return changeFlag[i];
  }


   public byte[] getDateAsBytes() throws IOException {
      ByteArrayOutputStream os = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(os);

            // Write Date _date

      if (_date == null) {
            out.writeBoolean(false);
      } else {
            out.writeBoolean(true);
            out.writeLong(_date.getTime());
      }

      out.flush();
      out.close();
      return os.toByteArray();
   }

   public byte[] getCountAsBytes() throws IOException {
      ByteArrayOutputStream os = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(os);

            // Write int _count

      out.writeInt(_count);

      out.flush();
      out.close();
      return os.toByteArray();
   }

   public byte[] getNameAsBytes() throws IOException {
      ByteArrayOutputStream os = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(os);

            // Write String _name

      if (_name == null) {
            out.writeBoolean(false);
      } else {
            out.writeBoolean(true);
            Text.writeString(out,_name);
      }

      out.flush();
      out.close();
      return os.toByteArray();
   }


   public void setDateFromBytes(byte[] b) throws IOException {
      ByteArrayInputStream is = new ByteArrayInputStream(b);
      DataInput in = new DataInputStream(is);
      int len;

            // Read Date _date

        if (in.readBoolean()) {
            _date = new Date(in.readLong());
            changeFlag[PROPERTY_DATE] = true;
        } else {
            _date = null;
            changeFlag[PROPERTY_DATE] = false;
        }
   }

   public void setCountFromBytes(byte[] b) throws IOException {
      ByteArrayInputStream is = new ByteArrayInputStream(b);
      DataInput in = new DataInputStream(is);
      int len;

            // Read int _count

        _count = in.readInt();
        changeFlag[PROPERTY_COUNT] = true;

   }

   public void setNameFromBytes(byte[] b) throws IOException {
      ByteArrayInputStream is = new ByteArrayInputStream(b);
      DataInput in = new DataInputStream(is);
      int len;

            // Read String _name

        if (in.readBoolean()) {
            _name = Text.readString(in);
            changeFlag[PROPERTY_NAME] = true;
        } else {
            _name = null;
            changeFlag[PROPERTY_NAME] = false;
        }

   }

    public Tuple asTuple() throws ExecException {

        Tuple tuple = TupleFactory.getInstance().newTuple(3);

        if (getDate() == null) {
            tuple.set(0, (Long) null);
        } else {
            tuple.set(0, new Long(getDate().getTime()));
        }
        tuple.set(1, new Integer(getCount()));
        if (getName() == null) {
            tuple.set(2, (String) null);
        } else {
            tuple.set(2, getName());
        }

        return tuple;
    }

    public static ResourceSchema getPigSchema() throws IOException {

        ResourceSchema schema = new ResourceSchema();
        ResourceFieldSchema fieldSchema[] = new ResourceFieldSchema[3];
        ResourceSchema bagSchema;
        ResourceFieldSchema bagField[];

        fieldSchema[0] = new ResourceFieldSchema();
        fieldSchema[0].setName("date");
        fieldSchema[0].setType(DataType.LONG);

        fieldSchema[1] = new ResourceFieldSchema();
        fieldSchema[1].setName("count");
        fieldSchema[1].setType(DataType.INTEGER);

        fieldSchema[2] = new ResourceFieldSchema();
        fieldSchema[2].setName("name");
        fieldSchema[2].setType(DataType.CHARARRAY);

        schema.setFields(fieldSchema);
        return schema;

    }

    public static MyCustomWritable fromJson(String source) {

        MyCustomWritable obj = null;

        try {
            JSONObject jsonObj = new JSONObject(source);
            obj = fromJson(jsonObj);
        } catch (JSONException e) {
            System.out.println(e.toString());
        }

        return obj; 
    }

    public static MyCustomWritable fromJson(JSONObject jsonObj) {

        MyCustomWritable obj = new MyCustomWritable();

        try {

            if (jsonObj.has("date")) {
                obj.setDate(new Date(jsonObj.getLong("date")));
            }

            if (jsonObj.has("count")) {
                obj.setCount(jsonObj.getInt("count"));
            }

            if (jsonObj.has("name")) {
                obj.setName(jsonObj.getString("name"));
            }

        } catch (JSONException e) {
            System.out.println(e.toString());
            obj = null;
        }

        return obj; 
    }

    public JSONObject toJson() {

        try {
            JSONObject jsonObj = new JSONObject();
            JSONArray jsonArray;

            if (getDate() != null) {
                jsonObj.put("date", getDate().getTime());
            }
            jsonObj.put("count", getCount());

            if (getName() != null) {
                jsonObj.put("name", getName());
            }
            return jsonObj; 
        } catch (JSONException e) { }

        return null;    
    }

    public String toJsonString() {

        return toJson().toString(); 

    }
}
于 2012-08-14T18:15:14.727 に答える