Friday, 9 March 2012

Pagination with Cassandra

I have been going through a number of articles trying to find the best way to traverse all the records in my Cassandra database row by row.
The requirement is to implement pagination for my application, which reads records from the Cassandra database based on row keys and displays links for up to 10 pages on the UI.

I am using the Hector client for Java, and the requirement can be roughly depicted as follows:

Page 1          Row keys key4 to key13
Page 2          Row keys key14 to key23
   .                      .
   .                      .
Page last       Row keys key94 to key103

I found these links very useful:
http://blog.dynatrace.com/2011/12/05/pagination-with-cassandra-and-what-we-can-learn-from-it/
https://github.com/rantav/hector/wiki/User-Guide


Here is the code:

import java.util.List;
import me.prettyprint.cassandra.serializers.StringSerializer;
import me.prettyprint.hector.api.Cluster;
import me.prettyprint.hector.api.Keyspace;
import me.prettyprint.hector.api.beans.ColumnSlice;
import me.prettyprint.hector.api.beans.HColumn;
import me.prettyprint.hector.api.beans.OrderedRows;
import me.prettyprint.hector.api.beans.Row;
import me.prettyprint.hector.api.factory.HFactory;
import me.prettyprint.hector.api.query.RangeSlicesQuery;
import me.prettyprint.hector.api.query.QueryResult;
import static me.prettyprint.hector.api.factory.HFactory.getOrCreateCluster;

public class PageRecords {

    private static final StringSerializer se = new StringSerializer();
    private static final Cluster cluster = getOrCreateCluster("Test Cluster2", "localhost:9160");
    private static final Keyspace keyspace = HFactory.createKeyspace("KS1", cluster);

    public static void main(String[] args) throws Exception {
        RangeSlicesQuery<String, String, String> rangeSlicesQuery =
                HFactory.createRangeSlicesQuery(keyspace, se, se, se);
        rangeSlicesQuery.setColumnFamily("CF1");
        rangeSlicesQuery.setKeys("", "");            // blank start and end keys: begin at the first row
        rangeSlicesQuery.setRange("", "", false, 3); // no. of columns to fetch per row
        rangeSlicesQuery.setRowCount(11);            // fetch 11 rows: 10 for this page + 1 as the next page's start key

        for (int i = 1; i <= 10; i++) {
            System.out.println("*************** page " + i + " ****************");
            QueryResult<OrderedRows<String, String, String>> result = rangeSlicesQuery.execute();
            OrderedRows<String, String, String> orderedRows = result.get();
            List<Row<String, String, String>> listRows = orderedRows.getList();

            // print only the first 10 rows; the 11th is kept as the start key of the next page
            for (int j = 0; j < listRows.size() - 1; j++) {
                Row<String, String, String> row = listRows.get(j);
                System.out.println(row.getKey());
                ColumnSlice<String, String> colSlice = row.getColumnSlice();
                List<HColumn<String, String>> colList = colSlice.getColumns();
                for (HColumn<String, String> col : colList) {
                    System.out.println(col.getName() + " : " + col.getValue());
                }
            }

            Row<String, String, String> lastRow = orderedRows.peekLast(); // get the last (11th) row
            rangeSlicesQuery.setKeys(lastRow.getKey(), "");               // its key becomes the start key for the next page
        }
    }
}
PS: This is just sample code based on my initial analysis; comments/suggestions for improvement are welcome.
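
For a real UI it may be cleaner to wrap the pattern above in a small helper that fetches one page at a time and lets the caller keep the start key for the next page. Here is a rough sketch along those lines (the class name RowPager and method fetchPage are mine, not part of the code above):

import me.prettyprint.cassandra.serializers.StringSerializer;
import me.prettyprint.hector.api.Keyspace;
import me.prettyprint.hector.api.beans.OrderedRows;
import me.prettyprint.hector.api.factory.HFactory;
import me.prettyprint.hector.api.query.RangeSlicesQuery;

public class RowPager {

    private static final StringSerializer se = new StringSerializer();

    /**
     * Fetches one page of rows starting at startKey (use "" for the first page).
     * pageSize + 1 rows are requested: the extra row's key is the start key of
     * the next page, so the caller displays only the first pageSize rows and
     * keeps the last key for the "next" link.
     */
    public static OrderedRows<String, String, String> fetchPage(Keyspace keyspace,
                                                                String columnFamily,
                                                                String startKey,
                                                                int pageSize) {
        RangeSlicesQuery<String, String, String> query =
                HFactory.createRangeSlicesQuery(keyspace, se, se, se);
        query.setColumnFamily(columnFamily);
        query.setKeys(startKey, "");           // open-ended range starting at startKey
        query.setRange("", "", false, 3);      // number of columns to pull per row
        query.setRowCount(pageSize + 1);       // one extra row gives the next page's start key
        return query.execute().get();
    }
}

The caller would render rows 0 to pageSize-1 and, if getList().size() is greater than pageSize, use peekLast().getKey() as the start key of the next page link. Note that the loop in the main code has the same edge case on the final page: if fewer than 11 rows come back, the last row should be displayed rather than skipped. Also keep in mind that with RandomPartitioner the rows come back in token order rather than key order, so the page boundaries are stable but not alphabetical.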

Tuesday, 28 February 2012

Cassandra and Pig --- Column Metadata Problems

I have been trying to get Pig running against my Cassandra cluster.
My column family has columns with metadata defined.

I was trying to run a simple Pig script using the syntax below:
rows = LOAD 'cassandra://demoKs1/demoCf2' USING CassandraStorage();

I was getting the following error:

Duplicate schema alias: value in "columns"
org.apache.pig.impl.plan.PlanValidationException: ERROR 1108: 


Duplicate schema alias: value in "columns"
at org.apache.pig.newplan.logical.visitor.SchemaAliasVisitor.validate(SchemaAliasVisitor.java:74)
at org.apache.pig.newplan.logical.visitor.SchemaAliasVisitor.visit(SchemaAliasVisitor.java:109)
at org.apache.pig.newplan.logical.relational.LOInnerLoad.accept(LOInnerLoad.java:128)
at org.apache.pig.newplan.DependencyOrderWalker.walk(DependencyOrderWalker.java:75)
at org.apache.pig.newplan.PlanVisitor.visit(PlanVisitor.java:50)
at org.apache.pig.newplan.logical.visitor.SchemaAliasVisitor.visit(SchemaAliasVisitor.java:99)
at org.apache.pig.newplan.logical.relational.LOForEach.accept(LOForEach.java:74)
at org.apache.pig.newplan.DependencyOrderWalker.walk(DependencyOrderWalker.java:75)
at org.apache.pig.newplan.PlanVisitor.visit(PlanVisitor.java:50)
at org.apache.pig.PigServer$Graph.compile(PigServer.java:1659)
at org.apache.pig.PigServer$Graph.compile(PigServer.java:1653)
at org.apache.pig.PigServer$Graph.access$200(PigServer.java:1378)
at org.apache.pig.PigServer.execute(PigServer.java:1280)
at org.apache.pig.PigServer.executeBatch(PigServer.java:360)
at org.apache.pig.tools.grunt.GruntParser.executeBatch(GruntParser.java:131)
at org.apache.pig.tools.grunt.GruntParser.processDump(GruntParser.java:654)
at org.apache.pig.tools.pigscript.parser.PigScriptParser.parse(PigScriptParser.java:303)
at org.apache.pig.tools.grunt.GruntParser.parseStopOnError(GruntParser.java:188)
at org.apache.pig.tools.grunt.GruntParser.parseStopOnError(GruntParser.java:164)
at org.apache.pig.tools.grunt.Grunt.exec(Grunt.java:81)
at org.apache.pig.Main.run(Main.java:553)
at org.apache.pig.Main.main(Main.java:108)
========================================================================

Finally I found this issue in JIRA: https://issues.apache.org/jira/browse/CASSANDRA-3371

After incorporating some of the changes from that ticket, I was able to get the script running against my column family with metadata (the relevant changes are in getNext() and getSchema(), which now emit one named tuple per indexed column followed by a bag of the remaining columns).
Here is the updated CassandraStorage.java.
Please do look at the new Cassandra 1.0.8 release; I believe this is fixed there.

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.cassandra.hadoop.pig;
import java.io.IOException;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.util.*;


import org.apache.cassandra.config.ConfigurationException;
import org.apache.cassandra.db.marshal.BytesType;
import org.apache.cassandra.db.marshal.IntegerType;
import org.apache.cassandra.db.marshal.TypeParser;
import org.apache.cassandra.thrift.*;
import org.apache.cassandra.utils.Hex;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;


import org.apache.cassandra.db.Column;
import org.apache.cassandra.db.IColumn;
import org.apache.cassandra.db.SuperColumn;
import org.apache.cassandra.db.marshal.*;
import org.apache.cassandra.hadoop.*;
import org.apache.cassandra.thrift.Mutation;
import org.apache.cassandra.thrift.Deletion;
import org.apache.cassandra.thrift.ColumnOrSuperColumn;
import org.apache.cassandra.utils.ByteBufferUtil;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.*;


import org.apache.pig.*;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.*;
import org.apache.pig.ResourceSchema.ResourceFieldSchema;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.util.UDFContext;
import org.apache.thrift.TDeserializer;
import org.apache.thrift.TException;
import org.apache.thrift.TSerializer;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;


/**
 * A LoadStoreFunc for retrieving data from and storing data to Cassandra
 *
 * A row from a standard CF will be returned as nested tuples: (key, ((name1, val1), (name2, val2))).
 */
public class CassandraStorage extends LoadFunc implements StoreFuncInterface, LoadMetadata
{
    // system environment variables that can be set to configure connection info:
    // alternatively, Hadoop JobConf variables can be set using keys from ConfigHelper
    public final static String PIG_RPC_PORT = "PIG_RPC_PORT";
    public final static String PIG_INITIAL_ADDRESS = "PIG_INITIAL_ADDRESS";
    public final static String PIG_PARTITIONER = "PIG_PARTITIONER";


    private final static ByteBuffer BOUND = ByteBufferUtil.EMPTY_BYTE_BUFFER;
    private static final Log logger = LogFactory.getLog(CassandraStorage.class);


    private ByteBuffer slice_start = BOUND;
    private ByteBuffer slice_end = BOUND;
    private boolean slice_reverse = false;
    private String keyspace;
    private String column_family;
    private String loadSignature;
    private String storeSignature;


    private Configuration conf;
    private RecordReader reader;
    private RecordWriter writer;
    private int limit;


    public CassandraStorage()
    {
        this(1024);
    }


    /**
     * @param limit: number of columns to fetch in a slice
     */
    public CassandraStorage(int limit)
    {
        super();
        this.limit = limit;
    }


    public int getLimit()
    {
        return limit;
    }


    @Override
    public Tuple getNext() throws IOException
    {
        try
        {
            // load the next pair
            if (!reader.nextKeyValue())
                return null;


            CfDef cfDef = getCfDef(loadSignature);
            ByteBuffer key = (ByteBuffer)reader.getCurrentKey();
            SortedMap<ByteBuffer,IColumn> cf = (SortedMap<ByteBuffer,IColumn>)reader.getCurrentValue();
            assert key != null && cf != null;


            // output tuple, will hold the key, each indexed column in a tuple, then a bag of the rest
            Tuple tuple = TupleFactory.getInstance().newTuple();
            DefaultDataBag bag = new DefaultDataBag();

            // set the key
            tuple.append(new DataByteArray(key.array(), key.position()+key.arrayOffset(), key.limit()+key.arrayOffset()));

            // we must add all the indexed columns first to match the schema
            HashMap<ByteBuffer, Boolean> added = new HashMap<ByteBuffer, Boolean>();
            // luckily this is sorted for us already
            for (ColumnDef cdef : cfDef.column_metadata)
            {
                if (cf.containsKey(cdef.name))
                {
                    tuple.append(columnToTuple(cdef.name, cf.get(cdef.name), cfDef));
                }
                else
                {   // otherwise, we need to add an empty tuple to take its place
                    tuple.append(TupleFactory.getInstance().newTuple());
                }
                added.put(cdef.name, true);
            }

            // now add all the other columns
            for (Map.Entry<ByteBuffer, IColumn> entry : cf.entrySet())
            {
                if (!added.containsKey(entry.getKey()))
                    bag.add(columnToTuple(entry.getKey(), entry.getValue(), cfDef));
            }


            tuple.append(bag);
            return tuple;
        }
        catch (InterruptedException e)
        {
            throw new IOException(e.getMessage());
        }
    }


    private Tuple columnToTuple(ByteBuffer name, IColumn col, CfDef cfDef) throws IOException
    {
        Tuple pair = TupleFactory.getInstance().newTuple(2);
        List<AbstractType> marshallers = getDefaultMarshallers(cfDef);
        Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);


        setTupleValue(pair, 0, marshallers.get(0).compose(name));
        if (col instanceof Column)
        {
            // standard
            if (validators.get(name) == null)
                setTupleValue(pair, 1, marshallers.get(1).compose(col.value()));
            else
                setTupleValue(pair, 1, validators.get(name).compose(col.value()));
            return pair;
        }


        // super
        ArrayList<Tuple> subcols = new ArrayList<Tuple>();
        for (IColumn subcol : col.getSubColumns())
            subcols.add(columnToTuple(subcol.name(), subcol, cfDef));
       
        pair.set(1, new DefaultDataBag(subcols));
        return pair;
    }


    private void setTupleValue(Tuple pair, int position, Object value) throws ExecException
    {
       if (value instanceof BigInteger)
           pair.set(position, ((BigInteger) value).intValue());
       else if (value instanceof ByteBuffer)
           pair.set(position, new DataByteArray(ByteBufferUtil.getArray((ByteBuffer) value)));
       else
           pair.set(position, value);
    }


    private CfDef getCfDef(String signature)
    {
        UDFContext context = UDFContext.getUDFContext();
        Properties property = context.getUDFProperties(CassandraStorage.class);
        return cfdefFromString(property.getProperty(signature));
    }


    private List<AbstractType> getDefaultMarshallers(CfDef cfDef) throws IOException
    {
        ArrayList<AbstractType> marshallers = new ArrayList<AbstractType>();
        AbstractType comparator = null;
        AbstractType default_validator = null;
        AbstractType key_validator = null;
        try
        {
            comparator = TypeParser.parse(cfDef.getComparator_type());
            default_validator = TypeParser.parse(cfDef.getDefault_validation_class());
            key_validator = TypeParser.parse(cfDef.getKey_validation_class());
        }
        catch (ConfigurationException e)
        {
            throw new IOException(e);
        }


        marshallers.add(comparator);
        marshallers.add(default_validator);
        marshallers.add(key_validator);
        return marshallers;
    }


    private Map<ByteBuffer, AbstractType> getValidatorMap(CfDef cfDef) throws IOException
    {
        Map<ByteBuffer, AbstractType> validators = new HashMap<ByteBuffer, AbstractType>();
        for (ColumnDef cd : cfDef.getColumn_metadata())
        {
            if (cd.getValidation_class() != null && !cd.getValidation_class().isEmpty())
            {
                AbstractType validator = null;
                try
                {
                    validator = TypeParser.parse(cd.getValidation_class());
                    validators.put(cd.name, validator);
                }
                catch (ConfigurationException e)
                {
                    throw new IOException(e);
                }
            }
        }
        return validators;
    }


    @Override
    public InputFormat getInputFormat()
    {
        return new ColumnFamilyInputFormat();
    }


    @Override
    public void prepareToRead(RecordReader reader, PigSplit split)
    {
        this.reader = reader;
    }


    public static Map<String, String> getQueryMap(String query)
    {
        String[] params = query.split("&");
        Map<String, String> map = new HashMap<String, String>();
        for (String param : params)
        {
            String[] keyValue = param.split("=");
            map.put(keyValue[0], keyValue[1]);
        }
        return map;
    }


    private void setLocationFromUri(String location) throws IOException
    {
        // parse uri into keyspace and columnfamily
        String names[];
        try
        {
            if (!location.startsWith("cassandra://"))
                throw new Exception("Bad scheme.");
            String[] urlParts = location.split("\\?");
            if (urlParts.length > 1)
            {
                Map<String, String> urlQuery = getQueryMap(urlParts[1]);
                AbstractType comparator = BytesType.instance;
                if (urlQuery.containsKey("comparator"))
                    comparator = TypeParser.parse(urlQuery.get("comparator"));
                if (urlQuery.containsKey("slice_start"))
                    slice_start = comparator.fromString(urlQuery.get("slice_start"));
                if (urlQuery.containsKey("slice_end"))
                    slice_end = comparator.fromString(urlQuery.get("slice_end"));
                if (urlQuery.containsKey("reversed"))
                    slice_reverse = Boolean.parseBoolean(urlQuery.get("reversed"));
                if (urlQuery.containsKey("limit"))
                    limit = Integer.parseInt(urlQuery.get("limit"));
            }
            String[] parts = urlParts[0].split("/+");
            keyspace = parts[1];
            column_family = parts[2];
        }
        catch (Exception e)
        {
            throw new IOException("Expected 'cassandra://<keyspace>/<columnfamily>[?slice_start=<start>&slice_end=<end>[&reversed=true][&limit=1]]': " + e.getMessage());
        }
    }


    private void setConnectionInformation() throws IOException
    {
        if (System.getenv(PIG_RPC_PORT) != null)
            ConfigHelper.setRpcPort(conf, System.getenv(PIG_RPC_PORT));
        else if (ConfigHelper.getRpcPort(conf) == 0)
            throw new IOException("PIG_RPC_PORT environment variable not set");
        if (System.getenv(PIG_INITIAL_ADDRESS) != null)
            ConfigHelper.setInitialAddress(conf, System.getenv(PIG_INITIAL_ADDRESS));
        else if (ConfigHelper.getInitialAddress(conf) == null)
            throw new IOException("PIG_INITIAL_ADDRESS environment variable not set");
        if (System.getenv(PIG_PARTITIONER) != null)
            ConfigHelper.setPartitioner(conf, System.getenv(PIG_PARTITIONER));
        else if (ConfigHelper.getPartitioner(conf) == null)
            throw new IOException("PIG_PARTITIONER environment variable not set");
    }


    @Override
    public void setLocation(String location, Job job) throws IOException
    {
        conf = job.getConfiguration();
        setLocationFromUri(location);
        if (ConfigHelper.getRawInputSlicePredicate(conf) == null)
        {
            SliceRange range = new SliceRange(slice_start, slice_end, slice_reverse, limit);
            SlicePredicate predicate = new SlicePredicate().setSlice_range(range);
            ConfigHelper.setInputSlicePredicate(conf, predicate);
        }
        ConfigHelper.setInputColumnFamily(conf, keyspace, column_family);
        setConnectionInformation();
        if (loadSignature == null)
              loadSignature = location;
        initSchema(loadSignature);
    }


    public ResourceSchema getSchema(String location, Job job) throws IOException
    {
        setLocation(location, job);
        CfDef cfDef = getCfDef(loadSignature);


        if (cfDef.column_type.equals("Super"))
            return null;
        // top-level schema, no type
        ResourceSchema schema = new ResourceSchema();


        // get default marshallers and validators
        List<AbstractType> marshallers = getDefaultMarshallers(cfDef);
        Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
        ResourceSchema bagSchema = new ResourceSchema();
        // add key
        ResourceFieldSchema keyFieldSchema = new ResourceFieldSchema();
        keyFieldSchema.setName("key");
        keyFieldSchema.setType(getPigType(marshallers.get(2)));


        // will become the bag of tuples
        ResourceFieldSchema bagField = new ResourceFieldSchema();
        bagField.setType(DataType.BAG);
        bagField.setName("columns");
        // inside the bag, place one tuple with the default comparator/validator schema
        ResourceSchema bagTupleSchema = new ResourceSchema();
        ResourceFieldSchema bagTupleField = new ResourceFieldSchema();
        bagTupleField.setType(DataType.TUPLE);
        ResourceFieldSchema bagcolSchema = new ResourceFieldSchema();
        ResourceFieldSchema bagvalSchema = new ResourceFieldSchema();
        bagcolSchema.setName("name");
        bagvalSchema.setName("value");
        bagcolSchema.setType(getPigType(marshallers.get(0)));
        bagvalSchema.setType(getPigType(marshallers.get(1)));
        bagTupleSchema.setFields(new ResourceFieldSchema[] { bagcolSchema, bagvalSchema });
        bagTupleField.setSchema(bagTupleSchema);
        bagSchema.setFields(new ResourceFieldSchema[] { bagTupleField });
        bagField.setSchema(bagSchema);

        // will contain all fields for this schema
        List<ResourceFieldSchema> allSchemaFields = new ArrayList<ResourceFieldSchema>();
        // add the key first, then the indexed columns, and finally the bag
        allSchemaFields.add(keyFieldSchema);


        // defined validators/indexes
        for (ColumnDef cdef : cfDef.column_metadata)
        {
            // make a new tuple for each col/val pair
            ResourceSchema innerTupleSchema = new ResourceSchema();
            ResourceFieldSchema innerTupleField = new ResourceFieldSchema();
            innerTupleField.setType(DataType.TUPLE);
            innerTupleField.setSchema(innerTupleSchema);
            innerTupleField.setName(new String(cdef.getName()));

            ResourceFieldSchema idxColSchema = new ResourceFieldSchema();
            idxColSchema.setName("name");
            idxColSchema.setType(getPigType(marshallers.get(0)));

            ResourceFieldSchema valSchema = new ResourceFieldSchema();
            AbstractType validator = validators.get(cdef.name);
            if (validator == null)
                validator = marshallers.get(1);
            valSchema.setName("value");
            valSchema.setType(getPigType(validator));
            innerTupleSchema.setFields(new ResourceFieldSchema[] { idxColSchema, valSchema });
            allSchemaFields.add(innerTupleField);
        }
        // bag at the end for unknown columns
           allSchemaFields.add(bagField);
        // a bag can contain only one tuple, but that tuple can contain anything
           schema.setFields(allSchemaFields.toArray(new ResourceFieldSchema[allSchemaFields.size()]));
           return schema;
    }


    private byte getPigType(AbstractType type)
    {
        if (type instanceof LongType)
            return DataType.LONG;
        else if (type instanceof IntegerType)
            return DataType.INTEGER;
        else if (type instanceof AsciiType)
            return DataType.CHARARRAY;
        else if (type instanceof UTF8Type)
            return DataType.CHARARRAY;
        else if (type instanceof FloatType)
            return DataType.FLOAT;
        else if (type instanceof DoubleType)
            return DataType.DOUBLE;
        return DataType.BYTEARRAY;
    }


    public ResourceStatistics getStatistics(String location, Job job)
    {
        return null;
    }


    public String[] getPartitionKeys(String location, Job job)
    {
        return null;
    }


    public void setPartitionFilter(Expression partitionFilter)
    {
        // no-op
    }


    @Override
    public String relativeToAbsolutePath(String location, Path curDir) throws IOException
    {
        return location;
    }


    @Override
    public void setUDFContextSignature(String signature)
    {
        this.loadSignature = signature;
    }


    /* StoreFunc methods */
    public void setStoreFuncUDFContextSignature(String signature)
    {
        this.storeSignature = signature;
    }


    public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException
    {
        return relativeToAbsolutePath(location, curDir);
    }


    public void setStoreLocation(String location, Job job) throws IOException
    {
        conf = job.getConfiguration();
        setLocationFromUri(location);
        ConfigHelper.setOutputColumnFamily(conf, keyspace, column_family);
        setConnectionInformation();
        initSchema(storeSignature);
    }


    public OutputFormat getOutputFormat()
    {
        return new ColumnFamilyOutputFormat();
    }


    public void checkSchema(ResourceSchema schema) throws IOException
    {
        // we don't care about types, they all get casted to ByteBuffers
    }


    public void prepareToWrite(RecordWriter writer)
    {
        this.writer = writer;
    }


    private ByteBuffer objToBB(Object o)
    {
        if (o == null)
            return (ByteBuffer)o;
        if (o instanceof java.lang.String)
            o = new DataByteArray((String)o);
        return ByteBuffer.wrap(((DataByteArray) o).get());
    }


    public void putNext(Tuple t) throws ExecException, IOException
    {
        ByteBuffer key = objToBB(t.get(0));
        DefaultDataBag pairs = (DefaultDataBag) t.get(1);
        ArrayList<Mutation> mutationList = new ArrayList<Mutation>();
        CfDef cfDef = getCfDef(storeSignature);
        try
        {
            for (Tuple pair : pairs)
            {
               Mutation mutation = new Mutation();
               if (DataType.findType(pair.get(1)) == DataType.BAG) // supercolumn
               {
                   org.apache.cassandra.thrift.SuperColumn sc = new org.apache.cassandra.thrift.SuperColumn();
                   sc.name = objToBB(pair.get(0));
                   ArrayList<org.apache.cassandra.thrift.Column> columns = new ArrayList<org.apache.cassandra.thrift.Column>();
                   for (Tuple subcol : (DefaultDataBag) pair.get(1))
                   {
                       org.apache.cassandra.thrift.Column column = new org.apache.cassandra.thrift.Column();
                       column.name = objToBB(subcol.get(0));
                       column.value = objToBB(subcol.get(1));
                       column.setTimestamp(System.currentTimeMillis() * 1000);
                       columns.add(column);
                   }
                   if (columns.isEmpty()) // a deletion
                   {
                       mutation.deletion = new Deletion();
                       mutation.deletion.super_column = objToBB(pair.get(0));
                       mutation.deletion.setTimestamp(System.currentTimeMillis() * 1000);
                   }
                   else
                   {
                       sc.columns = columns;
                       mutation.column_or_supercolumn = new ColumnOrSuperColumn();
                       mutation.column_or_supercolumn.super_column = sc;
                   }
               }
               else // assume column since it couldn't be anything else
               {
                   if (pair.get(1) == null)
                   {
                       mutation.deletion = new Deletion();
                       mutation.deletion.predicate = new org.apache.cassandra.thrift.SlicePredicate();
                       mutation.deletion.predicate.column_names = Arrays.asList(objToBB(pair.get(0)));
                       mutation.deletion.setTimestamp(System.currentTimeMillis() * 1000);
                   }
                   else
                   {
                       org.apache.cassandra.thrift.Column column = new org.apache.cassandra.thrift.Column();
                       column.name = objToBB(pair.get(0));
                       column.value = objToBB(pair.get(1));
                       column.setTimestamp(System.currentTimeMillis() * 1000);
                       mutation.column_or_supercolumn = new ColumnOrSuperColumn();
                       mutation.column_or_supercolumn.column = column;
                   }
               }
               mutationList.add(mutation);
            }
        }
        catch (ClassCastException e)
        {
            throw new IOException(e + " Output must be (key, {(column,value)...}) for ColumnFamily or (key, {supercolumn:{(column,value)...}...}) for SuperColumnFamily", e);
        }
        try
        {
            writer.write(key, mutationList);
        }
        catch (InterruptedException e)
        {
           throw new IOException(e);
        }
    }


    public void cleanupOnFailure(String failure, Job job)
    {
    }


    /* Methods to get the column family schema from Cassandra */


    private void initSchema(String signature)
    {
        UDFContext context = UDFContext.getUDFContext();
        Properties property = context.getUDFProperties(CassandraStorage.class);


        // Only get the schema if we haven't already gotten it
        if (!property.containsKey(signature))
        {
            Cassandra.Client client = null;
            try
            {
                client = ConfigHelper.getClientFromAddressList(conf);
                CfDef cfDef = null;
                client.set_keyspace(keyspace);
                KsDef ksDef = client.describe_keyspace(keyspace);
                List<CfDef> defs = ksDef.getCf_defs();
                for (CfDef def : defs)
                {
                    if (column_family.equalsIgnoreCase(def.getName()))
                    {
                        cfDef = def;
                        break;
                    }
                }
                property.setProperty(signature, cfdefToString(cfDef));
            }
            catch (TException e)
            {
                throw new RuntimeException(e);
            }
            catch (InvalidRequestException e)
            {
                throw new RuntimeException(e);
            }
            catch (NotFoundException e)
            {
                throw new RuntimeException(e);
            }
            catch (IOException e)
            {
                throw new RuntimeException(e);
            }
        }
    }


    private static String cfdefToString(CfDef cfDef)
    {
        assert cfDef != null;
        // this is so awful it's kind of cool!
        TSerializer serializer = new TSerializer(new TBinaryProtocol.Factory());
        try
        {
            return Hex.bytesToHex(serializer.serialize(cfDef));
        }
        catch (TException e)
        {
            throw new RuntimeException(e);
        }
    }


    private static CfDef cfdefFromString(String st)
    {
        assert st != null;
        TDeserializer deserializer = new TDeserializer(new TBinaryProtocol.Factory());
        CfDef cfDef = new CfDef();
        try
        {
            deserializer.deserialize(cfDef, Hex.hexToBytes(st));
        }
        catch (TException e)
        {
            throw new RuntimeException(e);
        }
        return cfDef;
    }
}

Friday, 3 February 2012

Configure Cassandra as a Windows Service

1. Download the binary tarball of Cassandra. (Make sure you have all the prerequisites.)
http://cassandra.apache.org/download/

2. Unzip the archive to a directory, say D:\apache-cassandra-1.0.7.

3. Run cassandra.bat from the bin folder to make sure it's up and running.

4. Close this, and now let's try to install it as a Windows service.

5. Open a command prompt, change to D:\apache-cassandra-1.0.7\bin and run "cassandra.bat install".

6. This will print the right message, but the service won't actually be installed.

7. Create a folder named daemon under the bin folder, i.e. D:\apache-cassandra-1.0.7\bin\daemon.

8. Download prunsrv.exe (the Commons Daemon Service Runner) from http://www.apache.org/dist/commons/daemon/binaries/windows/

9. Extract the correct version (32- or 64-bit) of prunsrv.exe from that archive into D:\apache-cassandra-1.0.7\bin\daemon.

10. Open a command prompt again, change to D:\apache-cassandra-1.0.7\bin and run "cassandra.bat install".

11. Now start the "cassandra" service from the Services Manager.

12. To uninstall, open a command prompt, change to D:\apache-cassandra-1.0.7\bin and run "cassandra.bat uninstall".

Make sure you choose the 32- or 64-bit prunsrv.exe based on the installed JRE (32- or 64-bit), not necessarily on whether your Windows is 32- or 64-bit.
You may get an error like this in the log file:
[2012-02-04 11:04:24] [info]  Commons Daemon procrun (1.0.8.0 64-bit) started
[2012-02-04 11:04:24] [info]  Running 'cassandra' Service...
[2012-02-04 11:04:24] [info]  Starting service...
[2012-02-04 11:04:24] [error] Failed creating java 
[2012-02-04 11:04:24] [error] The system cannot find the file specified.
[2012-02-04 11:04:24] [error] ServiceStart returned 1
[2012-02-04 11:04:24] [error] The system cannot find the file specified.
[2012-02-04 11:04:24] [info]  Run service finished.
[2012-02-04 11:04:24] [info]  Commons Daemon procrun finished



This is because there is a mismatch between the JRE expected by prunsrv.exe and the JRE installed on your machine.

To locate the service process in Task Manager, look for the process with image name "prunsrv.exe".
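
One quick way to check whether the JRE on your path is 32- or 64-bit is to print the JVM's data model. A minimal sketch (note that the sun.arch.data.model property is specific to Sun/Oracle JREs and may be absent on other JVMs):

public class JreBitnessCheck {
    public static void main(String[] args) {
        // prints 32 or 64 on Sun/Oracle JREs; may not be set on other JVMs
        System.out.println("Data model : " + System.getProperty("sun.arch.data.model", "unknown"));
        System.out.println("os.arch    : " + System.getProperty("os.arch"));
        System.out.println("java.home  : " + System.getProperty("java.home"));
    }
}

Compile and run this with the same java.exe the service will use; the reported data model should match the prunsrv.exe you extracted.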


Android Browser Back Form Re-submission/Caching Issue

I am not much into Android development, but I came across a strange issue while porting our existing web-based application to Android-compatible browsers.
We hadn't made any drastic changes to our HTML pages (we were using XSLT) for the port, and everything seemed fine until I came across this one.
We had an HTML form and passed hidden fields through it via the submit button... no big deal, just plain old HTML forms with some jQuery.
When we navigated to one of our pages in the Android browser and pressed the browser back button 3 or 4 times, it navigated back to the previous page... no issues so far.
But when we clicked the submit button on that old page, it dynamically created the form and submitted it again, just like the first time we navigated from that page, except that it took us to the page before that.

To illustrate:

Screen 2  --(take input data)-->  Screen 3  --(take password)-->  Screen 4 (complete)

Screen 2: submit button, hidden field nextScreen=3
Screen 3: submit button, hidden field nextScreen=4
Screen 4: no navigation button

1. Suppose the user is on Screen 4.
2. The user clicks the browser back button 3 or 4 times (Android 2.2 and Android 2.3).
3. The user is taken back to Screen 3.
4. The user clicks the submit button on Screen 3.
5. The user ends up on Screen 2 (the request sent to the server contained nextScreen=2).

We had used the proper meta tags, as shown below:
<meta http-equiv="Cache-Control" content="no-store, no-cache, max-age=0"/>
<meta http-equiv="Pragma" content="no-cache" />


We tried clearing the browser cache and loading our pages again, but we ran into the same trouble.

Finally we figured out what was causing the trouble.

On Screen 3, our form looked like this:
<form method="post" action="doSomething" name="abc" autocomplete="off" >
<input type="password" name="def" />
<input type="hidden" name="nextScreen" value ="4" />
</form>


This autocomplete="off" on the form was creating havoc and picking up a cached value of nextScreen (=2 in this case) from the previous screens.

So we updated our form, moving autocomplete="off" to the password field only:


<form method="post" action="doSomething" name="abc"  >
<input type="password" name="def" autocomplete="off" />
<input type="hidden" name="nextScreen" value ="4" />
</form>
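
As a belt-and-braces measure, it may also help to send the same cache directives as HTTP response headers rather than relying on the meta tags alone. A minimal sketch, assuming a Java servlet backend (the filter name is mine, not part of our actual application):

import java.io.IOException;

import javax.servlet.Filter;
import javax.servlet.FilterChain;
import javax.servlet.FilterConfig;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletResponse;

// Hypothetical filter that adds no-cache headers to every response,
// mirroring the meta tags used in the pages themselves.
public class NoCacheFilter implements Filter {

    public void init(FilterConfig filterConfig) throws ServletException {
    }

    public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
            throws IOException, ServletException {
        HttpServletResponse res = (HttpServletResponse) response;
        res.setHeader("Cache-Control", "no-store, no-cache, max-age=0");
        res.setHeader("Pragma", "no-cache");
        res.setDateHeader("Expires", 0);
        chain.doFilter(request, response);
    }

    public void destroy() {
    }
}

The filter would be mapped to the form pages in web.xml; whether the Android browser honours these headers any better than the meta tags is something I have not verified.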