日常数据需求讲解2-json内容改造

背景

目前有这样一张表ods_cs_json表(算法模型返回客服舆情数据)有2个字段user_id、message,后续会将数据传输到后端kafka,message消息如下:

[
    {
        "criteriaOfJudgment": "该用户在客诉工单中有1条语兴好物圈需净化内容",
        "matchingDegree": "100%",
        "profilingTag": "语兴好物客诉舆情用户",
        "tagType": "modelTag",
        "datasource":"社区"
    },
    {
        "criteriaOfJudgment": "该用户在客诉工单中有1条语兴好物圈需净化内容",
        "matchingDegree": "100%",
        "profilingTag": "语兴好物客诉舆情用户",
        "tagType": "modelTag",
        "datasource":"电商"
    },
    {
        "criteriaOfJudgment": "该用户在客诉工单中有1条语兴好物圈需净化内容",
        "matchingDegree": "100%",
        "profilingTag": "语兴好物客诉舆情用户",
        "tagType": "modelTag",
        "datasource":"金融"
    }
]

内容属于list套json,可能多个json在list中,目前需求是接入一个新的数据源,动态舆情,看下有客诉的用户他动态舆情多少(通过user_id关联),并把原来客诉工单号也给带上,动态舆情对应数据表样式:

CREATE TABLE IF NOT EXISTS ods_trend_intelligence_di(
         user_id STRING COMMENT 'user_id',
         trend_id STRING COMMENT '动态id',
         url STRING COMMENT '备注',
) 

最终实现样式为,就是把动态舆情数据与算法推送topic数据合并一起

[
    {
        "criteriaOfJudgment": "该用户在客诉工单中有1条疑似被骗内容",
        "matchingDegree": "100%",
        "profilingTag": "疑似被骗用户",
        "tagType": "modelTag",
        "ticketList":"xxx",
        "datasource":"社区"
    },
    {
        "criteriaOfJudgment": "该用户共发布动态数1条",
        "matchingDegree": "0%",
        "profilingTag": "动态舆情用户",
        "tagType": "bizTag",
        "ticketList":"xxx,xxx,xx",
        "datasource":"社区"
    }
]

思路

思路1:首先拿到这个需求先要明确2个json的内容映射,需要找产品和后端对齐

json中字段

映射字段

备注

criteriaOfJudgment

计算好的动态数

matchingDegree

0%

由于是业务打标所以这里算法识别度为0%

profilingTag

动态舆情用户(文本)

tagType

bizTag(文本)

datasource

社区

思路2:由于原来list数据中是多json的,因此需要对list去炸裂,并解析出每个元素

思路3:对动态舆情数据进行指标计算,并且封装json

思路4:将动态舆情数据和算法推送客诉数据合并到一个list中

操作

insert overwrite table xxx.dwd_xxx_di partition(pt='${bizdate}')

select t0.user_id
      ,t1.message
from ods_cs_json t0

left join 
(

select concat('[',CONCAT_WS(',',message),coalesce(if(t1_1.content_json is not null ,concat(',',t1_1.content_json),null),''),']') as message
      ,user_id
from (
SELECT collect_list(to_json(map(
                    'criteriaOfJudgment',criteriaOfJudgment,
                    'matchingDegree',matchingDegree,
                    'profilingTag',profilingTag,
                    'tagType',tagType,
                    'datasource',datasource,
                    'ticketList',concat_ws(',',t1_0_1.ticket_list) 
                  )
            )
        ) as message
        ,user_id
from (
SELECT GET_JSON_OBJECT(message3,'$.criteriaOfJudgment')  as criteriaOfJudgment
      ,GET_JSON_OBJECT(message3,'$.matchingDegree')  as matchingDegree 
      ,GET_JSON_OBJECT(message3,'$.profilingTag')  as profilingTag --取出元素
      ,GET_JSON_OBJECT(message3,'$.datasource')  as datasource --取出元素      
      ,'modelTag' as tagType

      ,user_id

from (
select concat(replace(replace(message2,'[{','{'),',{','{'),'}') as message3 
      ,user_id
from (
select  user_id
       ,message
       ,datasource
from ods_cs_json
where pt='${bizdate_0}' --由于是实时返回数据,需要取t-0分区
)
lateral view explode(split(message,'}')) t as message2--炸裂
) 
where message3<>']}'
) t1_0_0

left join (
select collect_list(ticket_id) as ticket_list
      ,user_id
      ,datasource
from 
dwd_cs_json_di
where pt>='${bizdate_30}'
group by user_id
        ,datasource
) t1_0_1
on t1_0_0.user_id=t1_0_1.user_id
AND t1_0_0.datasource=t1_0_1.datasource


group by user_id   

) t1_0 


left join 

(
select user_id
      ,concat('{"criteriaOfJudgment":"',criteriaOfJudgment,'",',
                '"matchingDegree":"','0%','",',
                '"profilingTag":"','动态舆情用户','",',
                '"datasource":"','社区','",',
                '"ticketList":"',ticketList,'",',   
                '"tagType":"','bizTag','"}',
               
      ) as content_json
from (
select concat('该用户共发布动态数',trend_cnt,'条') as criteriaOfJudgment
       concat_ws(',',trend_list)) as ticketList
      ,user_id
from (
SELECT collect_list(concat(trend_id,'-',url)) as trend_list
      ,count(1) as trend_cnt
      ,user_id
FROM ods_trend_intelligence_di
WHERE pt >= '${bizdate_30}'--t-1
group by user_id
)
)
) t1_1 
ON cast(t1_0.user_id as string)=cast(t1_1.user_id as string)
) t1
on t0.user_id = t1.user_id
#数据人offer决赛圈怎么选##数据人的面试交流地##牛客创作赏金赛##聊聊我眼中的AI##Java#
全部评论

相关推荐

12-06 16:17
济宁学院 Java
点赞 评论 收藏
分享
评论
2
1
分享

创作者周榜

更多
牛客网
牛客网在线编程
牛客网题解
牛客企业服务