Tuesday 14 January 2014

Spring RESTful Service - CRUD operation for HBase Table



Lets first create some POJO classes to hold User and Facebook/Tweeter posts/tweets


package com.restapi.model;

import java.util.Date;

/**
 * Created with IntelliJ IDEA.
 * User: santosh
 * Date: 10/1/14
 * Time: 7:26 PM
 *POJO class to hold Posts/Messages/Twits
 */
public class Message {
    private Long messageId;
    private String messageText;
    private Date messagePostedOn;

    public Long getMessageId() {
        return messageId;
    }

    public void setMessageId(Long messageId) {
        this.messageId = messageId;
    }

    public String getMessageText() {
        return messageText;
    }

    public void setMessageText(String messageText) {
        this.messageText = messageText;
    }

    public Date getMessagePostedOn() {
        return messagePostedOn;
    }

    public void setMessagePostedOn(Date messagePostedOn) {
        this.messagePostedOn = messagePostedOn;
    }
}
























package com.restapi.model;

import java.util.List;

/**
 * Created with IntelliJ IDEA.
 * User: santosh
 * Date: 10/1/14
 * Time: 7:24 PM
 * User class to hold userId and it respective posts
 */
public class User {
    private Long id;
    private List<Message> messageList;

    public Long getId() {
        return id;
    }

    public void setId(Long id) {
        this.id = id;
    }

    public List<Message> getMessageList() {
        return messageList;
    }

    public void setMessageList(List<Message> messageList) {
        this.messageList = messageList;
    }

    @Override
    public boolean equals(Object obj) {
        return this.getId() == ((User)obj).getId();
    }
}
 
 ====== DAO Class for Hbase POSTS table CRUD Operations==========













package com.restapi.hbase;
import com.restapi.model.User;
import com.restapi.model.Message;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.filter.SubstringComparator;
import org.apache.hadoop.hbase.util.Bytes;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;

/**
 * Created with IntelliJ IDEA.
 * User: santosh
 * Date: 13/1/14
 * Time: 3:48 PM
 * DAO class for CRUD Operation on the HBASE POSTS Table
 */

@Component
public class PostDAO {

    private static Configuration configuration = HBaseConfiguration.create();
    private static final String POSTS_TABLE = "POSTS";
    private static final String MESSAGE_COLUMN_FAMILTY="message";
    static{
        configuration.set("hbase.zookeeper.quorum","localhost");
        configuration.set("hbase.zookeeper.property.port","2181");
    }


    public User create(User user){
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyyMMdd");
        String key="";
        long id = user.getId();

        String messageId="";
        HTableInterface tableInterface = null;
        try {
            tableInterface = new HTable(configuration,POSTS_TABLE);
            List<Put> putList = new ArrayList<Put>();
            for(Message message:user.getMessageList()){
                key = id + ":" + simpleDateFormat.format(message.getMessagePostedOn());
                messageId = message.getMessageId().toString();
                Put put = new Put(key.getBytes());
                put.add(MESSAGE_COLUMN_FAMILTY.getBytes(),messageId.getBytes(),message.getMessageText().getBytes());
                putList.add(put);
            }
            tableInterface.put(putList);
        } catch (IOException e) {
            e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
        }
        return user;
    }


    public void create(List<User> userList){
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyyMMdd");
        String key="";
        String messageId="";
        HTableInterface tableInterface = null;
        try {
            tableInterface = new HTable(configuration,POSTS_TABLE);
            List<Put> putList = new ArrayList<Put>();
            for(User user:userList){
                for(Message message:user.getMessageList()){
                    key = (user.getId()) + ":" + simpleDateFormat.format(message.getMessagePostedOn());
                    messageId = message.getMessageId().toString();
                    Put put = new Put(key.getBytes());
                    put.add(MESSAGE_COLUMN_FAMILTY.getBytes(),messageId.getBytes(),message.getMessageText().getBytes());
                    putList.add(put);
                }
            }
            tableInterface.put(putList);
        } catch (IOException e) {
            e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
        }
    }

    public User getUserMessagesForGivenDate(long userId, Date date){
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyyMMdd");
        User user = new User();
        user.setId(userId);
        String key = userId + ":" + simpleDateFormat.format(date);
        List<Message> messageList = new ArrayList<Message>();
        try {
            HTableInterface tableInterface = new HTable(configuration,POSTS_TABLE);
            Get get = new Get(key.getBytes());
            Result result = tableInterface.get(get);
            NavigableMap<byte[],byte[]> cqValueMap = result.getFamilyMap(MESSAGE_COLUMN_FAMILTY.getBytes());
            Message message = null;
            for(byte[] cq : cqValueMap.navigableKeySet()){
                message = new Message();
                message.setMessageId(new Long(Bytes.toString(cq)));
                message.setMessageText(Bytes.toString(cqValueMap.get(cq)));
                message.setMessagePostedOn(date);
                messageList.add(message);
            }

        } catch (IOException e) {
            e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
        }
        user.setMessageList(messageList);
        return user;
    }


    public User getAllMessagesForGivenUser(long userId){
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyyMMdd");
        User user = new User();
        user.setId(userId);
        List<Message> messageList = new ArrayList<Message>();
        try {
            HTableInterface tableInterface = new HTable(configuration,POSTS_TABLE);
            RowFilter rowFilter = new RowFilter(CompareFilter.CompareOp.EQUAL,new SubstringComparator(userId+":"));
            Scan scan = new Scan();
            scan.setFilter(rowFilter);
            ResultScanner resultScanner = tableInterface.getScanner(scan);
            Iterator<Result> resultIterator = resultScanner.iterator();
            Result result = null;
            StringTokenizer stringTokenizer = null;
            Date messageDate = null;

            while(resultIterator.hasNext()){
                result = resultIterator.next();
                stringTokenizer = new StringTokenizer(Bytes.toString(result.getRow()),":");
                stringTokenizer.nextToken();
                try {
                    messageDate = simpleDateFormat.parse(stringTokenizer.nextToken());
                } catch (ParseException e) {
                    e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
                }
                NavigableMap<byte[],byte[]> cqValueMap = result.getFamilyMap(MESSAGE_COLUMN_FAMILTY.getBytes());
                Message message = null;
                for(byte[] cq : cqValueMap.navigableKeySet()){
                    message = new Message();
                    message.setMessageId(new Long(Bytes.toString(cq)));
                    message.setMessageText(Bytes.toString(cqValueMap.get(cq)));
                    message.setMessagePostedOn(messageDate);
                    messageList.add(message);
                }
            }


        } catch (IOException e) {
            e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
        }
        user.setMessageList(messageList);
        return user;
    }

    public User update(User user){
        return this.create(user);
    }

    public void deleteSpecificMessages(User user){
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyyMMdd");
        String key="";
        long id = user.getId(); // should have come from the DB
        String messageId="";
        HTableInterface tableInterface = null;
        try {
            tableInterface = new HTable(configuration,POSTS_TABLE);
            List<Delete> deleteList = new ArrayList<Delete>();
            for(Message message:user.getMessageList()){
                key = id + ":" + simpleDateFormat.format(message.getMessagePostedOn());
                messageId = message.getMessageId().toString();
                Delete delete = new Delete(key.getBytes());
                delete.deleteColumn(MESSAGE_COLUMN_FAMILTY.getBytes(),messageId.getBytes());
                deleteList.add(delete);
            }
            tableInterface.delete(deleteList);
        } catch (IOException e) {
            e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
        }
    }

    public void deleteMessagesForGivenUserAndDate(long userId,Date date){
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyyMMdd");
        String key="";
        HTableInterface tableInterface = null;
        try {
            tableInterface = new HTable(configuration,POSTS_TABLE);
            key = userId + ":" + simpleDateFormat.format(date);
            Delete delete = new Delete(key.getBytes());
            tableInterface.delete(delete);
        } catch (IOException e) {
            e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
        }
    }

    public void deleteAllMessagesForGivenUser(long userId){
        try{
            HTableInterface tableInterface = new HTable(configuration,POSTS_TABLE);
            Scan scan = new Scan();
            RowFilter rowFilter = new RowFilter(CompareFilter.CompareOp.EQUAL,new SubstringComparator(userId+""));
            scan.setFilter(rowFilter);
            ResultScanner resultScanner = tableInterface.getScanner(scan);
            List<Delete> deleteKeyList = new ArrayList<Delete>();
            Iterator<Result> iterator = resultScanner.iterator();
            while(iterator.hasNext()){
                deleteKeyList.add(new Delete(iterator.next().getRow()));
            }
            tableInterface.delete(deleteKeyList);
        }catch (IOException e){
            e.printStackTrace();
        }
    }

    public List<User> list(){
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyyMMdd");
        List<User> userList = new ArrayList<User>();
        try {
            HTableInterface tableInterface = new HTable(configuration,POSTS_TABLE);
            ResultScanner resultScanner =  tableInterface.getScanner(MESSAGE_COLUMN_FAMILTY.getBytes());
            Iterator<Result> resultIterator = resultScanner.iterator();
            Result result = null;
            StringTokenizer stringTokenizer = null;
            Date messageDate = null;
            User user = null;
            List<Message> messageList = null;
            while(resultIterator.hasNext()){
                result = resultIterator.next();
                user = new User();
                messageList = new ArrayList<Message>();
                stringTokenizer = new StringTokenizer(Bytes.toString(result.getRow()),":");
                user.setId(Long.parseLong(stringTokenizer.nextToken()));
                try {
                    messageDate = simpleDateFormat.parse(stringTokenizer.nextToken());
                } catch (ParseException e) {
                    e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
                }
                NavigableMap<byte[],byte[]> cqValueMap = result.getFamilyMap(MESSAGE_COLUMN_FAMILTY.getBytes());
                Message message = null;
                for(byte[] cq : cqValueMap.navigableKeySet()){
                    message = new Message();
                    message.setMessageId(new Long(Bytes.toString(cq)));
                    message.setMessageText(Bytes.toString(cqValueMap.get(cq)));
                    message.setMessagePostedOn(messageDate);
                    messageList.add(message);
                }
                user.setMessageList(messageList);
                int index = userList.indexOf(user);
                if(index!=-1){
                    userList.get(index).getMessageList().addAll(messageList);
                } else {
                    userList.add(user);
                }
            }


        } catch (IOException e) {
            e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
        }
        return userList;
    }








 ====== REST Class for Hbase POSTS table CRUD Operations==========
package com.restapi.controller;

import com.restapi.hbase.PostDAO;
import com.restapi.model.User;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.*;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;

/**
 * Created with IntelliJ IDEA.
 * User: santosh
 * Date: 13/1/14
 * Time: 1:15 PM
 * To change this template use File | Settings | File Templates.
 */
@Controller
@RequestMapping("/messageService")
public class UserMessageController {
    @Autowired
    private PostDAO postDAO ;


    @RequestMapping(value = "/getAllUsersMessages",method = RequestMethod.GET)
    @ResponseBody
    public List<User> getAllUsersMessages(){
        return postDAO.list();
    }

    @RequestMapping(value = "/createUserWithMessages" , method = RequestMethod.POST)
    @ResponseStatus( HttpStatus.CREATED )
    @ResponseBody
    public User create(@RequestBody User user){
        return postDAO.create(user);
    }

    @RequestMapping(value = "/createUsersWithMessages" , method = RequestMethod.POST)
    @ResponseStatus( HttpStatus.CREATED )
    public void create(@RequestBody List<User> userList){
        this.postDAO.create(userList);
    }

    @RequestMapping(value = "/getMessagesForUserAndDate/{userId}/{date}" , method = RequestMethod.GET)
    @ResponseBody
    public User getMessagesForUserAndDate(@PathVariable("userId") long userId,@PathVariable("date") String strdate){
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyyMMdd");
        Date date = null;
        try {
            date = simpleDateFormat.parse(strdate);
        } catch (ParseException e) {
            e.printStackTrace();
        }
      return  this.postDAO.getUserMessagesForGivenDate(userId,date);
    }

    @RequestMapping(value = "/getMessagesForUser/{uerId}" , method = RequestMethod.GET)
    @ResponseBody
    public User getMessagesForUser(@PathVariable("userId")long userId){
        return this.postDAO.getAllMessagesForGivenUser(userId);
    }

    @RequestMapping(value = "/updateUser" , method = RequestMethod.PUT)
    @ResponseStatus(HttpStatus.OK)
    public User update(@RequestBody User user){
        return this.postDAO.update(user);
    }

    @RequestMapping(value = "/deleteSpecificMessages" ,method = RequestMethod.DELETE)
    @ResponseStatus(HttpStatus.OK)
    public void deleteSpecificMessages(@RequestBody User user){
         this.postDAO.deleteSpecificMessages(user);
    }

    @RequestMapping(value = "/deleteMessagesForUserAndDate/{userId}/{date}",method = RequestMethod.DELETE)
    @ResponseStatus(HttpStatus.OK)
    public void deleteMessagesForUserAndDate(@PathVariable("userId")long userId,@PathVariable("date")String strdate){
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyyMMdd");
        Date date = null;
        try {
            date = simpleDateFormat.parse(strdate);
        } catch (ParseException e) {
            e.printStackTrace();
        }
        this.postDAO.deleteMessagesForGivenUserAndDate(userId,date);
    }
    @RequestMapping(value = "/deleteMessagesForUser/{userId}",method = RequestMethod.DELETE)
    @ResponseStatus(HttpStatus.OK)
    public void deleteMessagesForUser(long userId){
        this.postDAO.deleteAllMessagesForGivenUser(userId);
    }

}


No comments:

Post a Comment