Tag Archives: hadoop

Using Apache Nifi to Stream Live Twitter Feeds to Hadoop


Introduction

Data sources are producing more data over time. With the evolution of big data and IoT, and the continuous stream of new sources that enterprises are trying to capture, there is a growing need for a mechanism to architect, visualise and monitor the data flow, and to watch for the noise that could become tomorrow's signal and the direction for a decision, along with enterprise requirements such as encryption and data quality at read time rather than as a post-process that consumes an extensive amount of time and resources.

Apache NiFi (the team behind it was recently acquired by Hortonworks) is a web-based data flow management and transformation tool, with unique features such as configurable back pressure and configurable latency vs. throughput that allow NiFi to tolerate failures in the network or disks, software crashes, or simply human mistakes.

A full description and user guide can be found here.

In this example, we will show how to build a simple Twitter stream data flow that collects tweets mentioning the word “hadoop” over time and pushes them as JSON files into HDFS.

Prerequisites:

  • Hortonworks Sandbox
  • 8GB RAM memory and preferably 4 processor cores.
  • Twitter dev account with OAuth tokens; follow the steps here to set it up if you don’t have one.

Installation:

Once you have downloaded and started the Hortonworks Sandbox, SSH into it. Once you are in, download and extract Apache NiFi using the following commands:

cd /hadoop
wget http://apache.uberglobalmirror.com//nifi/0.2.1/nifi-0.2.1-bin.tar.gz
tar -xvzf nifi-0.2.1-bin.tar.gz

Now that Apache NiFi is extracted, we need to change the HTTP web port from 8080 to 8089 so it doesn’t conflict with Ambari. You can do this by editing /hadoop/nifi-0.2.1/conf/nifi.properties and setting:

nifi.web.http.port=8089
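If you prefer not to open an editor, a sed one-liner along these lines should achieve the same thing (a small sketch; it assumes the property still holds its default value of 8080):

# change the NiFi web UI port from the default 8080 to 8089
sed -i 's/^nifi.web.http.port=8080$/nifi.web.http.port=8089/' /hadoop/nifi-0.2.1/conf/nifi.properties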

Confirm the installation is successful by starting Apache NiFi and logging in to the web UI:

/hadoop/nifi-0.2.1/bin/nifi.sh start
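Startup can take a minute or two; if the web UI doesn’t come up, the application log is the first place to look (a sketch, assuming the default log location under the NiFi home directory):

# follow the NiFi application log while it starts up
tail -f /hadoop/nifi-0.2.1/logs/nifi-app.log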

Now point your browser to the IP address of the Sandbox followed by port 8089; in my setup it looks like the following:

http://172.16.61.130:8089/nifi

Creating a Data Flow

In order to connect to Twitter, we can either create a whole data flow from scratch or use a Twitter template that is already available among the Apache NiFi templates here. From these templates we will download the Pull_from_Twitter_Garden_Hose.xml file and place it on your computer:

wget 'https://cwiki.apache.org/confluence/download/attachments/57904847/Pull_from_Twitter_Garden_Hose.xml?version=1&modificationDate=1433234009000&api=v2'

Once the template is downloaded, go back to the web UI and add it by clicking the template button in the left-hand corner, then browse for the downloaded file.

Template Uploading

Now browse to the XML file downloaded in the previous step and upload it here.

Template Confirmation

You will see the template that you have installed as marked in red. Once this step is completed, let’s add the template to the workspace and start configuring the processors; you can do that by adding a template using the button in the top right corner, as shown below.

Add Template

Choose Template

Once you add the template you will end up with something like this:

Twitter Template

You can explore every processor yourself, but in a nutshell this is what each one does:

Grab Garden Hose: connects to Twitter and downloads tweets based on the provided search terms using the Twitter streaming API.

Pull Key Attributes: evaluates one or more expressions against the set values, in our case SCREEN_NAME, TEXT, LANGUAGE and USERID. We want to make sure those values are not NULL, otherwise the tweet will not mean much to us; the relationship is set to “matched”, which means only tweets matching the criteria are passed to the next processor.

Find only Tweets: filters out tweets that have no body message, retweets, etc.

Copy of Tweets: an output port that copies the tweets into the Apache NiFi folder under $NIFI_HOME/content_repository.

Now, let’s create a new processor that copies the data to HDFS. But before we do that, let’s create the destination folder on HDFS using the following command:

hadoop fs -mkdir -p /nifi/tweets
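Depending on which OS user NiFi runs as, it may also need write access to that directory; on a throwaway sandbox a quick (and deliberately permissive) option is:

# open up the destination folder so the NiFi process can write to it (sandbox only)
hadoop fs -chmod -R 777 /nifi/tweets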

Now let’s add the processor by clicking the processor button in the top right corner.

Add Processor

Now, let’s choose the PutHDFS processor; you can easily search for it in the search bar at the top.

putHDFS Processor

Connect the PutHDFS processor to the Find only Tweets processor and choose “tweet” as the relationship.


Now right-click on the PutHDFS processor and choose Configure. You need to choose how the processor will terminate the flow, and since this is the last processor in the flow and it won’t pass any data beyond, tick all the auto-terminate boxes.


Then go to the Properties tab and set the Hadoop Configuration Resources property to the locations of core-site.xml and hdfs-site.xml; on the latest 2.3 sandbox they are usually under /etc/hadoop/2.3.0.0-2557/0/. Also don’t forget to set the Directory property to the folder we created earlier on HDFS (/nifi/tweets).


After this you should see a red stop icon instead of a warning icon on the processor box; if not, check what the error is by hovering the cursor over the warning icon.

Let’s configure the Twitter processor and add the consumer key and access token information. Notice that the consumer secret and the token secret are encrypted and won’t be visible. Also make sure you change the Twitter endpoint to “Filter Endpoint”, otherwise your search terms won’t be active.


As you can see, in the search terms we have added the word “hadoop”. Once you are done, verify that the warning icon disappears from the box.

Now we are ready to start the flow: simply right-click on each box and choose Start. You should start seeing counters and back pressure information; after a while you can stop it and verify that JSON files are now stored on HDFS.
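A quick way to confirm this from the sandbox shell is shown below; the file names are generated by NiFi, so adjust the paths to whatever the listing shows:

# list the files PutHDFS has written, then peek at a few tweets
hadoop fs -ls /nifi/tweets
hadoop fs -cat /nifi/tweets/* | head -5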


nifi-output

You can always add more processors to encrypt, compress or transform the files (to Avro or SequenceFile format, for example) before dropping them into HDFS. A complete list of the available processors can be found here.

The Rise of Analytics, Hadoop and The Data Lake – Analytics

In my current role, I am lucky enough to get insights from different customers about the rise of their analytics story, and the baby steps companies out there are taking to get faster insights on their customer base before a competitor does, so they can retain that base and possibly grow it. Along with the baby steps, some giant mistakes are occurring that set a company a thousand steps backwards, especially in a field where application and storage FINALLY meet outside of performance and IOPS discussions and instead talk about the governance of the data: storage, access, protection and, most importantly, automation.

In the last two decades of data storage, with more and more applications being developed in organizations, data has ended up in storage silos: some of it is still tier one, served from a tier-one SAN, while other data, such as shared folders and web logs, is stored in a slower, more economical and denser tier, which could be NAS storage.

The last big step in the database world was data warehousing. Many databases were living isolated from each other, not allowing for a collaborative way of achieving analytics efficiently in an organization, so over the last decade much of the CIO’s task list was about the data warehousing strategy.

Analytics on databases is a way of creating complex (sometimes not so complex) queries or searches against one or more data sources; others may call it data mining. In my opinion, there are three different stages of analytics.

Stage 1 started with single-database analytics. Stage 2 started a decade ago with analytics performed on multiple data sources and databases, mostly using data warehousing, where the data was mostly structured. Stage 3, which organizations are discovering or just about to implement, is analytics based on the same old structured data that we used to query, in addition to unstructured data.

Handling structured data is so mature now that most people understand, for example, that they need a relational database to handle customer order records (although NoSQL databases are a valid replacement in this market). Unstructured data came with its own challenges; I could write a 3000-word article on each of them, but in short they are data ingestion and flow, where tools like Apache Storm and Pivotal SpringXD excel, and the software storage layer, where a file system like Hadoop is the preferred way to go; depending on the case you may use MongoDB, Cassandra, etc. A friend of mine actually forwarded me a great article on when to use what: http://kkovacs.eu/cassandra-vs-mongodb-vs-couchdb-vs-redis

Unstructured data varies from machine-generated data (e.g. server logs) to web logs (cookies, customers’ movements on the web, tracking, etc.), social media like Twitter and Facebook, mobile phone beacons (Bluetooth and WiFi), and any other information that relates to your business, customers or product.

These days organizations are taking analytics really seriously; I see job ads everywhere for data engineers and data scientists (yes, there is a major difference). I have met tons of data scientists here, and no, not all of them are R experts. Some of them only know a couple of visualization tools like Tableau but are able to get great insights from the data, some of them don’t even know basic shell commands on Unix, and some of them are completely Windows-only users.

Retail, city councils, airports, insurance, finance, banking, government, defense, and even small startups have started doing something around analytics; in fact some of them talk about innovation and Gartner’s pace-layered approach (the open data lake). In airports we talk about flight delay prediction and Bluetooth beacons; city councils put parking sensors on the streets to send an inspector when time has expired on an occupied bay, and manage traffic lights using beacons; retail needs better insights, not just the overnight ones from the EDW, on top of a massively increasing footprint of data; insurers collect data from mobile apps and correlate customers to their information in order to retain their footprint; health insurers are giving away Fitbits to offer better discounts to active, healthy customers; and I can’t talk about government and defense.

In the next episodes I will be connecting more dots together from a technology perspective: Hadoop, in-memory databases, MPP databases, reporting tools, lambda architecture, Gartner material, etc. Stay tuned…

Twitter Analysis for #Emcworld 2014 using SpringXD, Pivotal, Hawq, Isilon and Tableau

 

Did you ever wonder what people were mainly talking about at #EMCWORLD? Which products grabbed most of the attendees’ attention? Who were the top Twitter contributors? How many tweets did we get for #EMCWORLD this year?

#EMCWORLD 2014 Twitter Analysis Results

Disclaimer

EMC World ran from the 5th to the 8th of May 2014 in Las Vegas. I was able to collect around 30,000 tweets; unfortunately many tweets are missing due to Twitter API limitations, since Twitter restricts the rate at which data can be pulled and I was not able to consistently get all updates. But I am covering no less than 70% of the Twitter data in that period, which is usually enough to get an idea of the top things that were going on.

I have done my best to make this simple; however, due to limited time to put this together, there might be some missing pieces or steps not explained in depth that I will be refining over the next couple of weeks, so I appreciate your patience.

I have used many products to get this going; make sure you read the terms and conditions before downloading, and agree to the terms at your own risk (although most of them are free for at least a 30-day trial, and the rest are free for personal/educational purposes).

I have not covered the Isilon storage nodes piece in this article; however, I will be updating it shortly to show you how to connect Isilon to PHD 1.1, so please stay tuned.

Introduction

Running analysis on Twitter is an emerging way of analysing human responses to products, politics and events in general. For many organizations out there, understanding market behaviour comes through what people mention on Twitter, who the most influential tweeters are, etc.

The outcome of such analysis is usually GOLD to marketing departments and support organizations, which can make sure customer satisfaction is guaranteed by capturing the outcome of marketing events, surveys, product support, general product satisfaction, etc.

The Hadoop Part

In this exercise, all tweets for a specific hashtag will be streamed to Hadoop. In my case I am using Pivotal HD and storing the data on Isilon HDFS storage (not local disk). Having scale-out HDFS storage like Isilon usually gives enterprises peace of mind, as you separate the compute layer from the storage layer, get better protection of data with tiering of disk pools, can add more compute without adding storage, and can still serve file-server, home-directory and many more workloads from the same storage. For more information on why Isilon and Hadoop, click here.

SpringXD

Another piece of the puzzle is SpringXD. It allows you to connect many sources such as mail, HTTP, Twitter, syslogs, Splunk, etc. It has ready-made modules to connect to sources and grab the information you are after, which is then written to a sink of your choice, e.g. HDFS, file, JDBC, Splunk, etc. In our case the source is Twitter and the sink is HDFS; for more information on how to connect different sources and sinks, click here.

Hawq

Once SpringXD is up and running, we need a way to query the data, and that is why we are using Pivotal HAWQ, which is the fastest SQL-like implementation on Hadoop to date. HAWQ will enable us to create external tables through PXF on the fly; once we are done, we can create views, manipulate the data in SQL if we want to, and export it later on.

Tableau

Tableau comes into play after all the data is gathered, so we can create a pretty visualization of a mixture of text data coming out of SpringXD and table data coming out of HAWQ. You can always use different software such as TIBCO Spotfire or Cognos.

Now let’s start

Prep Work

– Download the PHD 1.1 VM (PHD is currently up to 2.0, but unfortunately SpringXD only supports PHD 1.1 at the moment; the same applies to Isilon) from here
– Download SpringXD M6 from here
– Create a Twitter account if you don’t have one
– Sign in to dev.twitter.com, create an application and generate keys; take note of all of them.

– Download WinSCP from here and use it to transfer SpringXD to your Pivotal HD host

winscp2

Deploying Pivotal HD

This is a straightforward process: just fire the VM up with VMware Workstation and it should work properly. Make sure you assign enough memory and CPU; by default it takes 4GB of RAM and 1 CPU with 1 core, and I would bring it up to 8GB and 2 CPUs, or at least 2 cores, if possible.

By the way, the credentials are usually root/password and gpadmin/Gpadmin1.

Deploying SpringXD

Once you have PHD 1.1 running, use the root user to move the SpringXD archive to /home/gpadmin, then run the following sequence of commands.
As root:

#chown gpadmin:hadoop /home/gpadmin/<path to springXD>
#chmod 770 /home/gpadmin/<path to SpringXD>

As gpadmin:
#unzip /home/gpadmin/<path to SpringXD>

unzipSpringXD
#vi ~/.bashrc
Once you are in the file, add the following rows:
export XD_HOME=<path to SpringXD>
export JAVA_HOME=<path to JDK directory>

springxd export
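After saving the file, reload it in the current session and double-check that both variables point where you expect (a small sanity check, not part of the original steps):

#source ~/.bashrc
#echo $XD_HOME $JAVA_HOME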

We now need to start our HDFS platform and SpringXD (using gpadmin):

#~/Desktop/start_all.

startup_all

#vi /<path-to-xd>/xd/config/servers.yml

uncomment all the lines from spring to fsUri

servers-yml-config
#~/<path to SpringXD>/xd/bin/start-singlenode --hadoopDistro phd1

starting_springxd

Twitter’s API

Twitter uses two APIs for communication, TwitterSearch and TwitterStream; you can read in depth about the differences here. In my case TwitterStream kept coming back with rate-limit issues due to the amount of tweets I was getting, but I will still show you how to create both. Before that, let’s check that everything is ready for SpringXD.

Reformatting Tweets with a Groovy Script

Using gpadmin, run #hadoop fs -ls / ; at this point you should see something like this:
hadoop_ls

If you can’t see it, then PHD was not started, so start it up first. After that, let’s work on a little Groovy script that I have borrowed from the guys here. This script, which you can download from here, will allow us to reformat the tweets from JSON; however, we will need to modify it a little bit to also pull out the text of the tweets, as in the script below.

import groovy.json.JsonSlurper

def slurper = new JsonSlurper()
def jsonPayload = slurper.parseText(payload)
def fromUser = jsonPayload?.fromUser
def hashTags = jsonPayload?.entities?.hashTags
def followers = jsonPayload?.user?.followersCount
def createdAt = jsonPayload?.createdAt
def languageCode = jsonPayload?.languageCode
def retweetCount = jsonPayload?.retweetCount
def retweet = jsonPayload?.retweet
def text1 = jsonPayload?.text
def id = jsonPayload?.id

def result = ""
if (hashTags == null || hashTags.size() == 0) {
    result = result + jsonPayload.id + '\t' + fromUser + '\t' + createdAt + '\t' + '-' + '\t' + followers + '\t' + text1 + '\t' + retweetCount + '\t' + retweet
} else {
    hashTags.each { tag ->
        if (result.size() > 0) {
            result = result + "\n"
        }
        result = result + jsonPayload.id + '\t' + fromUser + '\t' + createdAt + '\t' + tag.text.replace('\r', ' ').replace('\n', ' ').replace('\t', ' ') + '\t' + followers + '\t' + text1 + '\t' + retweetCount + '\t' + retweet
    }
}

return result

Save this script under ~/<path to SpringXD>/xd/modules/processors/scripts/reformat.script

Creating SpringXD Streams

Now we are ready to create our first stream. Let’s start with TwitterSearch; it’s usually easier and gets the job done in a straightforward way. Remember that the API keys are masked in this example, so make sure you replace them with your own. For this example we will query a hashtag, let it be #Australia, which means we will get all tweets that contain the #Australia hashtag. Make sure you set the delay to at least 10000, otherwise you will hit the rate limit very soon:

Using gpadmin, run ~/<path-to-springxd>/shell/bin/xd-shell

xd-shell> hadoop config fs --namenode local

springxd-hadoop-setup

xd-shell> stream create --name Emc --definition "twittersearch --consumerKey=xxxxxoxRYkIQZzDgMicxxxx --consumerSecret=bZvxxxxx51hld3UXObHlCy4rc6bmXJhEMH3TjGIndDxxxxxN --query='#Australia' --fixedDelay=10000 --outputType=application/json | transform --script=reformat.script | hdfs --rollover=1000" --deploy

Now let’s create a stream using the TwitterStream API in addition to the search one that we just created:

xd-shell> stream create --name TestTwitterStream --definition "twitterstream --consumerKey=xxxxxoxRYkIQZzDgMicxxxx --consumerSecret=bZvxxxxx51hld3UXObHlCy4rc6bmXJhEMH3TjGIndDxxxxxN --accessToken=xxxxx71851-pf9IWlpWiKT9mgk84pR9K6GFOrZiyWxxxxxxxxx --accessTokenSecret=pgQ2Yxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx --track='#Emc' | hdfs --rollover=1000" --deploy

 

By the way, SpringXD is also supported on Windows, so you are welcome to run the shell from an external client, as long as you configure the IP address or the DNS name when you start xd-shell, using the command:

 

hadoop config fs --namenode hdfs://<hdfs-IP-ADDRESS-OR-DNS>:8020

Validating the Data

Once we have all the tweets required for the analysis (let’s say you were gathering tweets for hours or days on a specific subject; in my case I was tracking the #Emcworld event in Las Vegas for 4 days), let’s see what the files look like from the shell by running the following command as the gpadmin user:

#hadoop fs -ls /xd/<the folder name>

hadoop files

You can calculate the sizes using the following command:

#hadoop fs -du -s -h /xd/

Connecting to HAWQ

Now, with all the data looking good, I need to create my tweets table using the information I have, so I can run some awesome queries and fetch some results. I have used the same command used by some Pivotal experts, like here:

CREATE EXTERNAL TABLE tweets(
id BIGINT, from_user VARCHAR(255), created_at TIMESTAMP, hash_tag VARCHAR(255),
followers INTEGER, tweet_text VARCHAR(255), retweet_count INTEGER, retweet BOOLEAN)
LOCATION ('pxf://pivhdsne:50070/xd/<directory of your tweet texts>*.txt?Fragmenter=HdfsDataFragmenter&Accessor=TextFileAccessor&Resolver=TextResolver')
FORMAT 'TEXT' (DELIMITER = E'\t');

The result is a HAWQ external table backed by all of the HDFS files, and from there you can start querying and getting the information you are after. Sometimes, depending on how big the files are, you might need to clean up the data, mainly with another Python script or manually using Excel, but that means you have to export the data, merge and reformat it in Excel, and then load it back to the cluster once you are done. Most of the cleanup tasks will be around deleting null or empty values; you might also want to remove duplicates, if any, or re-sort the tweets.
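As an alternative to round-tripping through Excel, much of that basic cleanup can also be done inside HAWQ itself using its psql client. A sketch only: the view name tweets_clean is made up, and it assumes the tweets table defined above and that psql connects to the database where it was created:

# hypothetical cleanup view: drop rows with empty text and collapse exact duplicates
psql -c "CREATE VIEW tweets_clean AS SELECT DISTINCT * FROM tweets WHERE tweet_text IS NOT NULL AND tweet_text <> '';"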

Now, let’s look at how HAWQ queries the data. Let’s for example see how many tweets I have (to enter the psql prompt, just type psql).

Let’s see the total number of tweets that we received during EMC World:
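The exact SQL behind the screenshot isn’t shown, but against the table above it would be something along these lines (counting distinct ids, since the reformat script emits one row per hashtag):

# hypothetical query matching the 'count' screenshot below
psql -c "SELECT count(DISTINCT id) AS total_tweets FROM tweets;"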

 

count

 

Now let’s see who the top tweeters were:
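Again, a hedged guess at the underlying query, grouping by user and sorting by tweet count:

# hypothetical query matching the tweets-per-user screenshot below
psql -c "SELECT from_user, count(DISTINCT id) AS no_of_tweets FROM tweets GROUP BY from_user ORDER BY no_of_tweets DESC LIMIT 10;"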

no_of_tweets_per_user

I was also wondering how many retweets we had versus original tweets:
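The boolean retweet column makes this a simple GROUP BY; a sketch rather than the exact query used for the screenshots:

# hypothetical query splitting retweets from original tweets
psql -c "SELECT retweet, count(DISTINCT id) AS tweets FROM tweets GROUP BY retweet;"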

non-retweeted

 

retweeted

 

 

 

Now, I wanted to know how many mentions we had for most of the EMC products, so I created a table containing all the products, and then created a view joining the tweets table and the products table to get the mentions per product, as in the code below:

create view products_tweets as select b.name product, count(a.*) product_count from tweets a, emc_products b where a.tweet_text like '%'||b.name||'%' group by b.name order by count(a.*);

The result was fascinating
products_count

Exporting Data to Tableau

After we had all the fun with SQL commands and queries, I wanted a proper visualisation of the results, so I exported the files and the output of the view to text files, ingested them into Tableau, and started playing with it; the outcome is below.

If you are new to Tableau, you can go through some free tutorials and examples that will give you an idea of how to start. It’s usually pretty simple, unless you start to build more complex models, in which case proper training is recommended.

 

Below is one of the worksheets in Tableau showing the top hashtags being used, without running a SQL query.

Tableau Results

Top Hashtags

Remember to comment on or like my post if you enjoyed it, and please feel free to ask any questions. I am regularly updating this post to make sure there are no issues with downloading, and I am refining the commands to make it easier to deploy this kind of analysis.