Showing posts with label Distributed Systems. Show all posts

Friday, November 7, 2014

Configuring Hadoop on Mac OS X in pseudo-distributed mode


Hadoop is an open-source Apache project that enables processing of extremely large datasets in a distributed computing environment. There are three different modes in which it can be run:

1. Standalone Mode
2. Pseudo-Distributed Mode
3. Fully-Distributed Mode

This post covers setting up Hadoop 2.5.1 in pseudo-distributed mode, in which every Hadoop daemon runs as a separate Java process on a single machine.

Prerequisites


Java: Install Java if it isn't already installed on your Mac.
Homebrew: Homebrew is a package manager for Mac. Installation instructions are available on the Homebrew website.
Keyless SSH: First, ensure Remote Login under System Preferences -> Sharing is checked to enable SSH. Then generate a key pair and authorize it. Recent OpenSSH releases disable DSA keys by default, so an RSA key is used here.
$ ssh-keygen -t rsa -P '' -f ~/.ssh/id_rsa
$ cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys
Now ssh into localhost and accept the host key when prompted.
$ ssh localhost

Installation


This is where Homebrew is used.
$ brew install hadoop
Homebrew installs whichever version its formula currently provides, so adjust the 2.5.1 in the paths below if yours differs. If you do not want to use Homebrew, or you want to install a specific version of Hadoop, you can download it from the Apache Hadoop website. Unpack the tarball to the location of your choice and assign ownership to the user setting up Hadoop.

Configuration


Every component of Hadoop is configured through an XML file located in /usr/local/Cellar/hadoop/2.5.1/libexec/etc/hadoop. MapReduce properties go in mapred-site.xml, HDFS properties in hdfs-site.xml and common properties in core-site.xml. General Hadoop environment settings live in hadoop-env.sh.

hadoop-env.sh

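Replace the existing HADOOP_OPTS with the following. These Kerberos settings are a commonly used workaround for the "Unable to load realm info from SCDynamicStore" error on OS X; the specific realm and KDC values should not matter for a local setup that does not use Kerberos.
export HADOOP_OPTS="-Djava.security.krb5.realm=OX.AC.UK -Djava.security.krb5.kdc=kdc0.ox.ac.uk:kdc1.ox.ac.uk"
If Homebrew was not used to install Hadoop, point JAVA_HOME at your Java installation, for example with export JAVA_HOME=$(/usr/libexec/java_home).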

core-site.xml
<configuration>
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://localhost:9000</value>
    </property>
</configuration>

hdfs-site.xml

The Hadoop Distributed File System properties go in this config file. Since we are only setting up one node, we set the value of dfs.replication to 1.
<configuration>
    <property>
        <name>dfs.replication</name>
        <value>1</value>
    </property>
</configuration>
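
All of the *-site.xml files share the same shape: a configuration element holding property elements, each with a name and a value. As a rough illustration of that layout, the sketch below reads such a file into a map using only the JDK's XML parser. The class SiteConfigReader is a hypothetical helper written for this post, not Hadoop's own configuration API.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;

// Hypothetical helper (not part of Hadoop): collects <name>/<value> pairs
// from a Hadoop-style configuration document.
class SiteConfigReader {

    static Map<String, String> parse(String xml) {
        try {
            Document doc = DocumentBuilderFactory.newInstance()
                    .newDocumentBuilder()
                    .parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)));
            Map<String, String> props = new LinkedHashMap<>();
            NodeList properties = doc.getElementsByTagName("property");
            for (int i = 0; i < properties.getLength(); i++) {
                Element property = (Element) properties.item(i);
                String name = property.getElementsByTagName("name").item(0).getTextContent().trim();
                String value = property.getElementsByTagName("value").item(0).getTextContent().trim();
                props.put(name, value);
            }
            return props;
        } catch (Exception e) {
            // Malformed XML or a property without name/value ends up here.
            throw new IllegalStateException("Could not read configuration", e);
        }
    }

    public static void main(String[] args) {
        String hdfsSite = "<configuration><property>"
                + "<name>dfs.replication</name><value>1</value>"
                + "</property></configuration>";
        System.out.println(parse(hdfsSite)); // prints {dfs.replication=1}
    }
}
```

A check like this can be handy for confirming that an edited file is still well-formed before starting the daemons.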


Execution

Before starting the daemons we must format the newly installed HDFS.
$ cd /usr/local/Cellar/hadoop/2.5.1/libexec/bin
$ hdfs namenode -format

Start the Daemons:
$ cd /usr/local/Cellar/hadoop/2.5.1/libexec/sbin
$ ./start-dfs.sh

Monitoring
Check the output of jps. NameNode, DataNode and SecondaryNameNode should all be listed.
$ jps
10756 NameNode
1282 
10842 DataNode
11022 Jps
10951 SecondaryNameNode
1842 

Alternatively, the web interface for the NameNode can be browsed at http://localhost:50070

Running Examples
1. Create the HDFS directories required to execute MapReduce jobs:
$ cd /usr/local/Cellar/hadoop/2.5.1/libexec/bin
$ hdfs dfs -mkdir /user
$ hdfs dfs -mkdir /user/<username>

2. Copy the input files to the Hadoop Distributed File System
$ hdfs dfs -put ../etc/hadoop input

3. Run the example provided
$ hadoop jar ../share/hadoop/mapreduce/hadoop-mapreduce-examples-2.5.1.jar grep input output 'dfs[a-z.]+'

4. View the output files on HDFS
$ hdfs dfs -cat output/*

Reference : http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/SingleCluster.html#Pseudo-Distributed_Operation

Thursday, August 21, 2014

An Introduction to Zookeeper - Part I of the Zookeeper series

A distributed system consists of multiple computers that communicate through a computer network and interact with each other to achieve a common goal. The major benefits that distributed systems offer over centralized systems are scalability and redundancy. Systems can be easily expanded by adding more machines as needed, and even if one of the machines is unavailable, the service continues to be available. However, such a system comes with its own challenges.

Zookeeper is an open source, high-performance coordination service for distributed applications. It tackles the common challenges that distributed applications face. It exposes common services that are required in a distributed environment, such as naming, configuration management and group services, and provides a solution to distributed synchronization.

Zookeeper can be run in either single-server mode or cluster (replicated) mode. Running Zookeeper in single-server mode does not take advantage of its inherent high availability and resilience, so in a production environment it is typically run in multi-server mode. A Zookeeper cluster is called an ensemble. A leader is elected on service startup, and if the leader goes down, a new leader is elected. Each client connects to a single Zookeeper server and maintains a TCP connection to it. A client can read from any Zookeeper server; writes, however, go through the leader and need agreement from a majority of the servers.
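
The majority requirement determines how many server failures an ensemble can survive. The small sketch below works through that arithmetic; the class EnsembleQuorum is purely illustrative and not part of any Zookeeper API.

```java
// Illustrative arithmetic only; this class is not part of the Zookeeper API.
class EnsembleQuorum {

    // Smallest number of servers that forms a strict majority.
    static int majority(int ensembleSize) {
        return ensembleSize / 2 + 1;
    }

    // Number of servers that can fail while a majority is still reachable.
    static int toleratedFailures(int ensembleSize) {
        return ensembleSize - majority(ensembleSize);
    }

    public static void main(String[] args) {
        for (int n = 1; n <= 6; n++) {
            System.out.printf("servers=%d majority=%d tolerated failures=%d%n",
                    n, majority(n), toleratedFailures(n));
        }
    }
}
```

With 5 servers the majority is 3, so two servers can be lost. Six servers need a majority of 4 and still tolerate only two failures, which is why ensembles are usually given an odd number of servers.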

Zookeeper provides a sequential consistency guarantee: updates from a client are applied in the order in which they were sent. Updates are atomic: each one either succeeds or fails, and there are no partial updates. A client sees the same view of the service irrespective of the server in the ensemble that it connects to. A server will not accept a connection from a client until it has caught up with the state of the server to which the client was previously connected. Zookeeper is reliable: once an update succeeds, it is not rolled back. It is also timely: a client is guaranteed to see system changes within a certain time bound.

Zookeeper is an eventually consistent system, i.e., it does not guarantee that different clients will have an identical view of Zookeeper data at every instant in time. It does guarantee, however, that a follower that falls too far behind the leader goes offline.

The distributed processes using Zookeeper coordinate with each other through a shared hierarchical namespace, organized like a UNIX file system. More on Zookeeper in the next post.
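
To make the file-system analogy concrete: every node in the namespace (a znode) is addressed by an absolute, slash-separated path, and a parent has to exist before a child can be created under it. The sketch below lists the ancestors of a path in the order they would need to exist. The class ZnodePaths and the example path /app/config/db are hypothetical and only illustrate the naming scheme.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper illustrating the path scheme; not part of Zookeeper.
class ZnodePaths {

    // Returns every ancestor of an absolute path, top-most first,
    // excluding the root "/" and the path itself.
    static List<String> ancestors(String path) {
        if (!path.startsWith("/")) {
            throw new IllegalArgumentException("znode paths are absolute: " + path);
        }
        List<String> result = new ArrayList<>();
        int slash = path.indexOf('/', 1);
        while (slash != -1) {
            result.add(path.substring(0, slash));
            slash = path.indexOf('/', slash + 1);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(ancestors("/app/config/db")); // prints [/app, /app/config]
    }
}
```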

Sunday, August 3, 2014

Configuration and Coordination with Zookeeper

It took me a while to understand the concept of Zookeeper, and another while to understand how to use it for the task I had started with. This post is intended to help others cross the bridge faster.
Dynamic configuration management for today's systems comes with all the nitty-gritty of a distributed environment. Such an environment is relatively unreliable, with problems like network failures and clock synchronization, and each server becomes responsible for checking that its current configuration is correct. These problems are the motivation behind configuration and coordination systems.

This particular post looks at Zookeeper as a configuration management tool. A short and good course on Zookeeper

Setup: A Zookeeper cluster with 5 servers, and a Java wrapper that uses Zookeeper to keep track of configuration changes. For our purposes we used the Curator framework.

See the post on Zookeeper multi-server setup on a single host for setting up the Zookeeper cluster.

In your Java project, include the dependency for the Curator framework.

Initializing connection to Zookeeper ensemble
public static CuratorFramework createConnection(String zookeeperConnectionString) {
    // Retry up to 3 times, sleeping a randomized, exponentially growing interval
    // (starting from a 1000 ms base) between attempts.
    RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
    CuratorFramework client = CuratorFrameworkFactory.builder().connectString(zookeeperConnectionString)
            .retryPolicy(retryPolicy)
            .namespace("session_service")
            .canBeReadOnly(true)
            .zookeeperFactory(new DefaultZookeeperFactory())
            .build();
    client.start();
    return client;
}
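
For a feel of what the retry policy above does with its two arguments, here is a minimal sketch of randomized exponential back-off in plain Java. It follows my understanding of how ExponentialBackoffRetry picks its sleep time (base multiplied by a random factor whose upper bound doubles on each retry); treat the exact formula as an assumption, since it may differ between Curator versions and the library also applies an upper cap that is left out here. BackoffSketch is a hypothetical name.

```java
import java.util.Random;

// Sketch of randomized exponential back-off; an approximation for illustration,
// not Curator's actual class.
class BackoffSketch {

    // Sleep before retry number retryCount (0-based):
    // base * a random integer in [1, 2^(retryCount + 1) - 1].
    static long sleepMs(long baseSleepMs, int retryCount, Random random) {
        int multiplier = Math.max(1, random.nextInt(1 << (retryCount + 1)));
        return baseSleepMs * multiplier;
    }

    public static void main(String[] args) {
        Random random = new Random();
        for (int retry = 0; retry < 3; retry++) {
            System.out.println("retry " + retry + " sleeps " + sleepMs(1000, retry, random) + " ms");
        }
    }
}
```

Under this formula a 1000 ms base gives a first sleep of exactly 1000 ms, and the upper bound grows to 3000 ms and then 7000 ms on the following retries. The randomization keeps many clients from retrying in lockstep.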

Creating a new znode
public static void create(CuratorFramework client, String path) throws Exception {
    client.create().forPath(path);
}

Setting data of a znode
public static void setData(CuratorFramework client, String path, String data) throws Exception {
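    // Encode with an explicit charset rather than the platform default.
    byte[] payload = data.getBytes(java.nio.charset.StandardCharsets.UTF_8);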
    client.setData().forPath(path, payload);
}

Deleting a znode
public static void delete(CuratorFramework client, String path) throws Exception {
    client.delete().forPath(path);
}

Get children and set the given watcher on the node
public static List<String> watchedGetChildren(CuratorFramework client, String path) throws Exception {
    return client.getChildren().usingWatcher(new WatcherImpl(client, path)).forPath(path);
}

Get Data of the node and set a watcher on the node
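public static String getData(CuratorFramework client, String path) throws Exception {
    byte[] payload = client.getData().usingWatcher(new WatcherImpl(client, path)).forPath(path);
    // Decode with an explicit charset rather than the platform default.
    return new String(payload, java.nio.charset.StandardCharsets.UTF_8);
}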

Zookeeper works on the idea of clients setting watches on znodes. Whenever a watched znode changes, the watch is triggered, that is, the client is notified.

To receive these notifications we need a class that implements the Watcher interface and its process method, which is called if and when the corresponding change occurs.
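import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;

public class WatcherImpl implements Watcher {
    // Kept so that the watch can be set again on the same path after it fires.
    private final CuratorFramework client;
    private final String path;

    public WatcherImpl(CuratorFramework client, String path) {
        this.client = client;
        this.path = path;
    }

    @Override
    public void process(WatchedEvent event) {
        if (event.getType() == Event.EventType.NodeDataChanged) {
            System.out.println("The Data has changed");
        } else if (event.getType() == Event.EventType.NodeChildrenChanged) {
            System.out.println("Children have changed");
        }
    }
}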

It's important to note that watches are one-time triggers: if changes need to be monitored continuously, the watch must be set again after it fires. All the read operations, getData(), getChildren() and exists(), have the option of setting a watch, which is triggered by the corresponding change.
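
The one-time nature of watches is easy to get wrong, so here is a toy in-memory model of it in plain Java. It does not use the Zookeeper or Curator API; OneShotWatchDemo and its methods are hypothetical names that only mimic the behaviour described above: a read may register a watcher, the watcher fires once on the next change, and a client that wants to keep watching re-reads with a fresh watch from inside the callback.

```java
import java.util.ArrayList;
import java.util.List;

// Toy in-memory model of one-shot watches; not the Zookeeper or Curator API.
class OneShotWatchDemo {
    private String data = "";
    private List<Runnable> watchers = new ArrayList<>();

    // Reads the value and, if a watcher is given, registers it for the next change only.
    String getData(Runnable watcher) {
        if (watcher != null) {
            watchers.add(watcher);
        }
        return data;
    }

    // Updates the value and fires every pending watcher exactly once.
    void setData(String newData) {
        data = newData;
        List<Runnable> toFire = watchers;
        watchers = new ArrayList<>(); // pending watches are consumed by this change
        for (Runnable watcher : toFire) {
            watcher.run();
        }
    }

    // Keeps watching: each callback re-reads the data, which re-arms the watch.
    static void watchForever(OneShotWatchDemo node, List<String> log) {
        Runnable watcher = () -> watchForever(node, log);
        log.add(node.getData(watcher));
    }

    public static void main(String[] args) {
        OneShotWatchDemo node = new OneShotWatchDemo();
        List<String> once = new ArrayList<>();
        node.getData(() -> once.add("changed")); // never re-registered
        List<String> log = new ArrayList<>();
        watchForever(node, log);
        node.setData("v1");
        node.setData("v2");
        System.out.println(once.size()); // prints 1
        System.out.println(log);         // prints [, v1, v2]
    }
}
```

The watcher that is registered only once misses the second change, while the one that re-reads inside its callback sees every value. With the real client, the equivalent step is to issue the read with a watcher again from within process.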

That's all for getting the configuration management with zookeeper up and running.