大数据技术 数据库并行读取和写入
沉沙 2018-10-11 来源 : 阅读 2314 评论 0

摘要:本篇教程探讨了大数据技术 数据库并行读取和写入,希望阅读本篇文章以后大家有所收获,帮助大家对大数据技术的理解更加深入。

本篇教程探讨了大数据技术 数据库并行读取和写入,希望阅读本篇文章以后大家有所收获,帮助大家对大数据技术的理解更加深入。

<


前言
一共23w条数据,是之前通过自然语言分析处理过的数据,附一张截图:

要实现对news主体的读取,并且找到其中含有的股票名称,只要发现,就将这支股票和对应的日期、score写入数据库。
显然,几十万条数据要是一条条读写,然后在本机上操作,耗时太久,可行性极低。所以,如何有效并行的读取内容,并且进行操作,最后再写入数据库呢?

并行读取和写入

并行读取:创建N*max_process个进程,对数据库进行读取。读取的时候应该注意:

每个进程需要分配不同的connection和对应的cursor,否则数据库会报错。
数据库必须能承受相应的高并发访问(可以手动更改)


实现的时候,如果不在进程里面创建新的connection,就会发生冲突,每个进程拿到权限后,会被下个进程释放,所以汇报出来NoneType Error的错误。

并行写入:在对数据库进行更改的时候,不可以多进程更改。所以,我们需要根据已有的表,创建max_process-1个同样结构的表用来写入。表的命名规则可以直接在原来基础上加上1,2,3...数字可以通过对max_process取余得到。

此时,对应进程里面先后出现读入的conn(保存消息后关闭)和写入的conn。每个进程对应的表的index就是 主循环中的num对max_process取余(100->4,101->5),这样每个进程只对一个表进行操作了。

部分代码实现
max_process = 16 #最大进程数

def read_SQL_write(r_host,r_port,r_user,r_passwd,r_db,r_charset,w_host,w_port,w_user,w_passwd,w_db,w_charset,cmd,index=None):
    #得到tem字典保存着信息
    try:
        conn = pymysql.Connect(host=r_host, port=r_port, user=r_user, passwd =r_passwd, db =r_db, charset =r_charset)
        cursor = conn.cursor()
        cursor.execute(cmd)
    except Exception as e:
        error = "[-][-]%d fail to connect SQL for reading" % index
        log_error('error.log',error)
        return 
    else:
        tem = cursor.fetchone()
        print('[+][+]%d succeed to connect SQL for reading' % index)
    finally:
        cursor.close()
        conn.close()
    
    try:
        conn = pymysql.Connect(host=w_host, port=w_port, user=w_user, passwd =w_passwd, db =w_db, charset =w_charset)
        cursor = conn.cursor()
        cursor.execute(cmd)
    except Exception as e:
        error = "[-][-]%d fail to connect SQL for writing" % index
        log_error('error.log',error)
        return 
    else:
        print('[+][+]%d succeed to connect SQL for writing' % index)
    
    
    r_dict = dict()
    r_dict['id'] = tem[0]
    r_dict['content_id'] = tem[1]
    r_dict['pub_date'] = tem[2]
    r_dict['title'] = cht_to_chs(tem[3])
    r_dict['title_score'] =tem[4]![](//images2015.cns.com//1172464/201706/1172464-20170609000900309-1810357590.png)

    r_dict['news_content'] = cht_to_chs(tem[5])
    r_dict['content_score'] = tem[6]
    
    for key in stock_dict.keys():
        #能找到对应的股票
        if stock_dict[key][1] and ( r_dict['title'].find(stock_dict[key][1])!=-1 or r_dict['news_content'].find(stock_dict[key][1])!=-1 ):
            w_dict=dict()
            w_dict['code'] = key
            w_dict['english_name'] = stock_dict[key][0]
            w_dict['cn_name'] = stock_dict[key][1]
            #得到分数
            if r_dict['title_score']:
                w_dict['score']=r_dict['title_score']
            else:
                w_dict['score']=r_dict['content_score']
            
            #开始写入
            try:
                global max_process
                cmd = "INSERT INTO dyx_stock_score%d VALUES ('%s', '%s' , %d , '%s' , '%s' , %.2f );" % \
                    (index%max_process ,r_dict['content_id'] ,r_dict['pub_date'] ,w_dict['code'] ,w_dict['english_name'] ,w_dict['cn_name'] ,w_dict['score'])
                cursor.execute(cmd)
                conn.commit()
            except Exception as e:
                error = "   [-]%d fail to write to SQL" % index
                cursor.rollback()
                log_error('error.log',error)
            else:
                print("   [+]%d succeed to write to SQL" % index)

    cursor.close()
    conn.close()
def main():
    num = 238143#数据库查询拿到的总数
    p = None
    for index in range(1,num+1):
        if index%max_process==1:
            if p:
                p.close()
                p.join()
            p = multiprocessing.Pool(max_process)
        r_cmd = ('select id,content_id,pub_date,title,title_score,news_content,content_score from dyx_emotion_analysis where id = %d;' % (index))
        p.apply_async(func = read_SQL_write,args=(r_host,r_port,r_user,r_passwd,r_db,r_charset,w_host,w_port,w_user,w_passwd,w_db,w_charset,r_cmd,index,))

    if p:
        p.close()
        p.join()

   

本文由职坐标整理发布,学习更多的大数据技术相关知识,请关注职坐标大技术云计算大技术技术频道!

本文由 @沉沙 发布于职坐标。未经许可,禁止转载。
喜欢 | 0 不喜欢 | 0
看完这篇文章有何感觉?已经有0人表态,0%的人喜欢 快给朋友分享吧~
评论(0)
后参与评论

您输入的评论内容中包含违禁敏感词

我知道了

助您圆梦职场 匹配合适岗位
验证码手机号,获得海同独家IT培训资料
选择就业方向:
人工智能物联网
大数据开发/分析
人工智能Python
Java全栈开发
WEB前端+H5

请输入正确的手机号码

请输入正确的验证码

获取验证码

您今天的短信下发次数太多了,明天再试试吧!

提交

我们会在第一时间安排职业规划师联系您!

您也可以联系我们的职业规划师咨询:

小职老师的微信号:z_zhizuobiao
小职老师的微信号:z_zhizuobiao

版权所有 职坐标-一站式AI+学习就业服务平台 沪ICP备13042190号-4
上海海同信息科技有限公司 Copyright ©2015 www.zhizuobiao.com,All Rights Reserved.
 沪公网安备 31011502005948号    

©2015 www.zhizuobiao.com All Rights Reserved