Create tables and load data in Hive

The data model that I want to try out on Hive is a dimensional database model that we currently use on PostgreSQL. Questions I want to answer are:

  1. To what extent can we use our current data model?
  2. Can Hive answer our queries?
  3. Is Hive the right Apache Hadoop component, or are other components more appropriate?

I’m using the tutorial ‘Hive and Data ETL’ from Hortonworks as a quick start: http://hortonworks.com/hadoop-tutorial/hello-world-an-introduction-to-hadoop-hcatalog-hive-and-pig/#section_4

To what extent can we use our current data model? The basic structure of our current data model is as follows:

  1. The model makes a distinction between so-called ‘dimensions’ and ‘facts’. For a description of these terms and more information about dimensional modeling of data warehouses, see e.g. https://www.amazon.com/Data-Warehouse-Toolkit-Definitive-Dimensional/dp/1118530802/ref=dp_ob_title_bk
  2. Each variable, ‘attribute’ or ‘Feature’ in the data warehouse model is stored in its own fact table, named ‘bv_<x>’, where x is the feature number.
  3. Each bv_ record binds a set of dimensions, a feature and a timestamp to a certain value. As such, each bv_ table can be seen as a collection of time series data for a certain Feature or signal.
  4. The data model uses both arrays of integers and JSON blobs. For our data model to work, Hive support for arrays and JSON is required (a small sketch of that support follows this list).
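
To get a feel for what that support looks like, here is a minimal sketch of how an array column and a JSON blob stored as a string can be queried in Hive. The table and column names are hypothetical; get_json_object, size() and array indexing are standard Hive features.

-- hypothetical table: an integer array plus a JSON blob stored as a string
CREATE TABLE json_array_demo (ids array<int>, payload string);

-- array elements are accessed by index, JSON fields with get_json_object
SELECT ids[0] AS first_id,
       size(ids) AS id_count,
       get_json_object(payload, '$.status') AS status
FROM json_array_demo;

If this is sufficient, the JSON blobs can stay plain strings in Hive and be parsed at query time.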

The first step is to create CSV files with test data. The PostgreSQL COPY seemed a good initial candidate to create the CSV files with, but it turned out that COPY cannot output data without a quote character. For import into Hive, no quoting was most appropriate for my use case. Therefore I wrote the following python script to create the csv files:

import sys
import getopt
import psycopg2
import psycopg2.extras
import csv
import json

class DBConnection(object):
    def __init__(self, dsn):
        self.dsn = dsn
        self.conn = None

    def getconn(self):
        if self.conn is None:
            try:
                self.conn = psycopg2.connect(self.dsn)
            except psycopg2.OperationalError as e:
                print 'caught error {e}'.format(e=e.message)
                raise

        return self.conn


class DatabaseObject(object):
    def __init__(self, dbconn, project_schema):
        self.dbconn = dbconn
        self.project_schema = project_schema

    def execute_sql(self, sql, params=[]):
        cur = self.dbconn.cursor()
        cur.execute(sql, params)
        return cur.fetchall()


class Process(DatabaseObject):
    def __init__(self, dsn, project_schema, dbconn=None):
        self.dsn = dsn
        self.project_schema = project_schema
        self.dbconn = dbconn or DBConnection(dsn).getconn()

        self.feature = self.execute_sql("""
            SELECT *
            FROM {schema}.feature
            """.format(schema=self.project_schema))

        self.dimension = self.execute_sql("""
            SELECT *
            FROM {schema}.dimension
            """.format(schema=self.project_schema))

    def read_bv(self, feature_id):
        # use char 002 as array delimiter
        return self.execute_sql("""
            SELECT uoo_id,
            feature_id,
            translate(dimension_id::text, ',{{}}', chr(2)),
            translate(dimension_role::text, ',{{}}', chr(2)),
            time_lowvalue::timestamp,
            time_highvalue::timestamp,
            time_availability::timestamp,
            value
            FROM {schema}.bv_{i}
            """.format(schema=self.project_schema,
                       i=str(feature_id)))

    def dump(self, result, target):
        with open(target, 'wb') as csvfile:
            # use default char 001 column delimiter
            writer = csv.writer(csvfile, delimiter='\001', quotechar=None, quoting=csv.QUOTE_NONE)
            for line in result:
                row = []
                for i in range(len(line)):
                    if isinstance(line[i], dict) or isinstance(line[i], list):
                        # dump json into a string without unicode u' indicator
                        j = json.dumps(line[i])
                        row.append(j)
                    else:
                        row.append(line[i])
                writer.writerow(row)

    def run(self):
        self.dump(self.dimension, 'dimension.txt')
        self.dump(self.feature, 'feature.txt')
        for i in range(1, 11):
            self.dump(self.read_bv(i), "bv_%d.txt" % i)


def main(argv):
    workspace_dsn = ""
    project_schema = ""

    def usage(code):
        print 'Usage: python ' + argv[0] + ' -w <workspace dsn> -p <project schema>'
        sys.exit(code)

    try:
        opts, args = getopt.getopt(argv[1:], "hw:p:", ["workspace-dsn=", "project-schema="])
    except getopt.GetoptError:
        usage(2)
    for opt, arg in opts:
        if opt == '-h':
            usage(0)
        elif opt in ("-w", "--workspace-dsn"):
            workspace_dsn = arg
        elif opt in ("-p", "--project-schema"):
            project_schema = arg

    if workspace_dsn == "" or project_schema == "":
        usage(1)

    p = Process(workspace_dsn, project_schema)
    p.run()


if __name__ == '__main__':
    main(sys.argv[0:])

A couple of notes about this program:

  • The default delimiters of Hive are used: \001 for column delimiter, \002 for array delimiter.
  • The timestamptz PostgreSQL data *must* be converted to timestamp literal form, otherwise the timestamp columns contain null after import.
  • JSON data is written out as a plain string. With no special handling in Python 2, a nested object would get u’ prepended to every text element in the JSON blob (hence the json.dumps call in the script).

About array types on Hive: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Types#LanguageManualTypes-MiscTypes

To upload the CSV files, select the HDFS Files menu option in Ambari:

[Screenshot: select HDFS Files in Ambari]

Navigate to the /tmp/ folder and create a new folder ‘axle’. AXLE is the name of the project that created the synthetic dataset that I am using.

[Screenshot: create /tmp/axle]

Also create /tmp/axle/data and navigate to it. Next, select bv_1.txt and press upload.

[Screenshot: upload file]

Finally, set all permissions on the /tmp/axle/data folder to writable for all. This window can be opened by right-clicking on the data folder:

[Screenshot: set permissions]

Next, go to the Ambari Hive user view:

[Screenshot: select Hive user view]

The CSV data is imported in three steps:

  1. define a Hive table with text storage for each CSV file
  2. associate this table with the earlier uploaded CSV file
  3. create a table with ORC storage for each text storage table

Let’s start with bv_1. In PostgreSQL the definition is as follows:

workspace=# \d bv_1
               Table "axle_basevalues.bv_1"
      Column       |           Type           | Modifiers 
-------------------+--------------------------+-----------
 uoo_id            | integer                  | not null
 feature_id        | integer                  | not null
 dimension_id      | integer[]                | 
 dimension_role    | dimension_role[]         | 
 time_lowvalue     | timestamp with time zone | not null
 time_highvalue    | timestamp with time zone | 
 time_availability | timestamp with time zone | 
 value             | text                     | 
Indexes:
    "bv_1_uoo_id_dimension_id_time_lowvalue_idx" UNIQUE, btree (uoo_id, dimension_id, time_lowvalue)
    "bv_1_time_lowvalue_idx" brin (time_lowvalue)
Inherits: bv

A straightforward text storage table in Hive is the following. I’ve chosen ‘,’ as field separator for now; later this is changed to Hive’s default character \001.

CREATE TABLE bv_1_stage (uoo_id int, feature_id int, dimension_id array<string>,
                         dimension_role array<string>,
                         time_lowvalue timestamp, time_highvalue timestamp,
                         time_availability timestamp,
                         value string)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE
TBLPROPERTIES ("skip.header.line.count"="1");

Note the skip.header.line.count="1" to skip the CSV header. Then press the Execute button. After a while the execution finishes with status: succeeded:

[Screenshot: execution finished]

It can be verified that the table was actually created by pressing the refresh database button at the left side of the screen:

[Screenshot: refresh database]

Now the table is still empty. Load the earlier uploaded file into this table:

LOAD DATA INPATH '/tmp/axle/data/bv_1.txt' OVERWRITE INTO TABLE bv_1_stage;

In the database explorer, press the button next to bv_1_stage to load sample data. The data is then shown in a new worksheet, with a query and results like the following.
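
The button opens a worksheet with a generated query; typing something like the following by hand gives the same kind of preview (the LIMIT value is arbitrary):

SELECT * FROM bv_1_stage LIMIT 100;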

[Screenshot: sample data with incorrectly loaded arrays]

The dimension_id and dimension_role arrays are not loaded correctly. The question is: what is the array literal syntax for Hive data loaded from CSV?

After some trial and error, I could import the arrays by using ASCII character 2 as the delimiter, specified with the literal '\002'.

[Screenshot: array upload OK]

The next problem was that all timestamp columns were null, even though no errors or warnings were presented in the UI. After some experimentation it became clear that the solution was to cast the PostgreSQL timestamptz datatype to timestamp on export. After that, all timestamp values in the input were loaded correctly:

[Screenshot: timestamps loaded OK]
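
A quick way to spot this kind of silent conversion failure is to count rows where the loaded timestamps ended up null; this check is my own addition, not part of the original workflow:

-- should return 0, since time_lowvalue is NOT NULL in the PostgreSQL source
SELECT count(*) AS bad_rows
FROM bv_1_stage
WHERE time_lowvalue IS NULL;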

The export script was modified to use char 1 as the field (column) separator and char 2 as the array (collection) separator.

And this was the final set of actions to perform on Hive to load a data table:

DROP TABLE bv_1_stage;

CREATE TABLE bv_1_stage (uoo_id int, feature_id int, dimension_id array<int>,
                         dimension_role array<string>,
                         time_lowvalue timestamp, time_highvalue timestamp,
                         time_availability timestamp,
                         value string)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\001'
COLLECTION ITEMS TERMINATED BY '\002'
STORED AS TEXTFILE
TBLPROPERTIES ("skip.header.line.count"="1");

LOAD DATA INPATH '/tmp/axle/data/bv_1.txt' OVERWRITE INTO TABLE bv_1_stage;

CREATE TABLE bv_1 STORED AS ORC AS SELECT * FROM bv_1_stage;
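
As a final sanity check (my own addition, not shown in the screenshots), the row counts of the staging table and the ORC table can be compared, after which the text-storage staging table can be dropped:

-- both counts should be identical after the CREATE TABLE ... AS SELECT into ORC
SELECT count(*) FROM bv_1_stage;
SELECT count(*) FROM bv_1;

-- the staging table is no longer needed once bv_1 is verified
DROP TABLE bv_1_stage;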