# JONCHAN-CN
# 4/11/2020 - 9:31 AM

# jdbc-hive-core


# -*- coding: utf8 -*-
# https://github.com/ghostyusheng/python-hive-jdbc.git
import glob
import logging
import os
import sys
import traceback

import jaydebeapi
import yaml

# Logger named after the running script, e.g. "main.etl_job" for etl_job.py.
# os.path.splitext drops only the extension; str.strip(".py") would strip any
# leading/trailing '.', 'p' or 'y' characters (e.g. "happy.py" -> "ha").
logger = logging.getLogger(f'main.{os.path.splitext(os.path.basename(sys.argv[0]))[0]}')

# Read the "env" environment variable and load the matching database config.
# os.getenv never raises - it returns None when the variable is unset - so the
# missing/empty case is tested explicitly before falling back to 'stg'.
env = os.getenv('env')
if not env:
    logger.error('Environment value not set!')
    env = 'stg'

try:
    # `with` guarantees the config file handle is closed after parsing.
    with open('./db-%s.yaml' % env, 'r') as _cfg_file:
        cfg = yaml.load(_cfg_file, Loader=yaml.FullLoader)
except (OSError, yaml.YAMLError):
    # On failure `cfg` stays undefined, so constructing Jdbc() will fail.
    logger.error('No database config for current environment!')
    logger.error(traceback.format_exc())


class Jdbc:
    """Hive connection over JDBC (``jdbc:hive2://``) using jaydebeapi.

    Connection settings are read from the ``hive`` section of the module-level
    ``cfg`` mapping (keys: ip, port, db, user, passwd, driver_version).
    Call :meth:`connect` before :meth:`query`, and :meth:`disconnect` when done.
    """

    # JDBC driver class handed to jaydebeapi.connect().
    _DRIVER = 'org.apache.hive.jdbc.HiveDriver'

    # Support jars expected in ./lib under the current working directory.
    # The Hive JDBC jar itself is located by globbing on driver_version.
    _DEPENDENCY_JARS = (
        'commons-logging-1.2.jar',
        'libthrift-0.12.0.jar',
        'httpclient-4.5.9.jar',
        'httpcore-4.4.11.jar',
        'slf4j-api-1.7.26.jar',
        'curator-framework-4.2.0.jar',
        'curator-recipes-4.2.0.jar',
        'curator-client-4.2.0.jar',
        'commons-lang-2.6.jar',
        'hadoop-common-3.2.0.jar',
        'guava-28.0-jre.jar',
    )

    def __init__(self):
        hive = cfg['hive']
        self.ip = hive['ip']
        self.port = hive['port']
        self.db = hive['db']
        self.user = hive['user']
        self.passwd = hive['passwd']
        self.driver_ver = hive['driver_version']
        # Session settings executed on every new connection.
        self.base_sql = [
            "set hive.mapred.mode=nonstrict",
            "set hive.strict.checks.cartesian.product=false",
            "set hive.execution.engine=tez"
        ]
        # Populated by connect(); None while not connected.
        self.conn = None
        self.curs = None

    def connect(self, db=None):
        """Open a JDBC connection and cursor, then run ``base_sql``.

        :param db: database to connect to. When given, it also replaces
            ``self.db`` for later reconnects.

        Failures are logged rather than raised. After a failure,
        ``self.conn`` and ``self.curs`` are None.
        """
        if db is not None:
            self.db = db
        # Reconnecting (e.g. to switch db) must not leak the previous connection.
        self._close_quietly()

        url = f'jdbc:hive2://{self.ip}:{self.port}/{self.db}'
        lib_dir = os.path.join(os.getcwd(), 'lib')
        jar_files = [os.path.join(lib_dir, name) for name in self._DEPENDENCY_JARS]
        # The Hive JDBC jar's file name ends with the configured driver version.
        jar_files.extend(glob.glob(f'./lib/*{self.driver_ver}.jar'))

        try:
            self.conn = jaydebeapi.connect(self._DRIVER, url, [self.user, self.passwd], jar_files)
            self.curs = self.conn.cursor()
            for stmt in self.base_sql:
                self.curs.execute(stmt)
            logger.info('Connected to %s', self.ip)
        except Exception:
            logger.error('Connect to HIVE via JDBC FAIL!\n')
            logger.error(traceback.format_exc())
            # Don't leave a half-open connection behind (e.g. a base_sql failure).
            self._close_quietly()

    def query(self, sql):
        """Execute ``sql`` and return all rows from ``fetchall()``.

        Returns None if execution or fetching fails (including when not
        connected); the error and traceback are logged.
        """
        try:
            self.curs.execute(sql)
            return self.curs.fetchall()
        except Exception:
            logger.error('Execute sql FAIL! - %s', sql)
            logger.error(traceback.format_exc())
            return None

    def disconnect(self):
        """Close the cursor and connection if they are open.

        Safe to call repeatedly or before connect(). Both attributes are
        reset to None first. The connection is still closed if closing the
        cursor raises. Close errors propagate to the caller.
        """
        curs = getattr(self, 'curs', None)
        conn = getattr(self, 'conn', None)
        self.curs = None
        self.conn = None
        try:
            if curs is not None:
                curs.close()
        finally:
            if conn is not None:
                conn.close()

    def _close_quietly(self):
        """Run disconnect(), logging instead of raising any close error."""
        try:
            self.disconnect()
        except Exception:
            logger.error('Close HIVE connection FAIL!')
            logger.error(traceback.format_exc())