日常数据需求讲解2-json内容改造
背景
目前有一张表ods_cs_json(算法模型返回的客服舆情数据),包含2个字段:user_id、message,后续会将数据传输到后端kafka。message消息示例如下:
[
{
"criteriaOfJudgment": "该用户在客诉工单中有1条语兴好物圈需净化内容",
"matchingDegree": "100%",
"profilingTag": "语兴好物客诉舆情用户",
"tagType": "modelTag",
"datasource":"社区"
},
{
"criteriaOfJudgment": "该用户在客诉工单中有1条语兴好物圈需净化内容",
"matchingDegree": "100%",
"profilingTag": "语兴好物客诉舆情用户",
"tagType": "modelTag",
"datasource":"电商"
},
{
"criteriaOfJudgment": "该用户在客诉工单中有1条语兴好物圈需净化内容",
"matchingDegree": "100%",
"profilingTag": "语兴好物客诉舆情用户",
"tagType": "modelTag",
"datasource":"金融"
}
]
message内容是list套json的结构,list中可能包含多个json。目前的需求是接入一个新的数据源——动态舆情:统计有客诉的用户各自的动态舆情数量(通过user_id关联),并把原来的客诉工单号也一并带上。动态舆情对应的数据表结构如下:
-- Source table of user trend posts, one row per trend.
-- The trailing comma after the last column was a syntax error and is removed.
-- A `pt` partition column is declared because the downstream job filters this
-- table with `pt >= '${bizdate_30}'`.
-- NOTE(review): confirm the partition value format with the table owner.
CREATE TABLE IF NOT EXISTS ods_trend_intelligence_di (
    user_id  STRING COMMENT 'user_id',
    trend_id STRING COMMENT '动态id',
    url      STRING COMMENT '备注'
)
PARTITIONED BY (pt STRING);
最终实现的样式如下,即把动态舆情数据与算法推送的topic数据合并到同一个list中:
[
{
"criteriaOfJudgment": "该用户在客诉工单中有1条疑似被骗内容",
"matchingDegree": "100%",
"profilingTag": "疑似被骗用户",
"tagType": "modelTag",
"ticketList":"xxx",
"datasource":"社区"
},
{
"criteriaOfJudgment": "该用户共发布动态数1条",
"matchingDegree": "0%",
"profilingTag": "动态舆情用户",
"tagType": "bizTag",
"ticketList":"xxx,xxx,xx",
"datasource":"社区"
}
]
思路
思路1:首先拿到这个需求先要明确2个json的内容映射,需要找产品和后端对齐
| json中字段 | 映射字段 | 备注 |
| --- | --- | --- |
| criteriaOfJudgment | 计算好的动态数 | |
| matchingDegree | 0% | 由于是业务打标所以这里算法识别度为0% |
| profilingTag | 动态舆情用户(文本) | |
| tagType | bizTag(文本) | |
| datasource | 社区 | |
思路2:由于原来list数据中是多json的,因此需要对list去炸裂,并解析出每个元素
思路3:对动态舆情数据进行指标计算,并且封装json
思路4:将动态舆情数据和算法推送客诉数据合并到一个list中
操作
-- Builds one JSON-array string per complaint user and writes it to
-- xxx.dwd_xxx_di for partition ${bizdate}.
--
-- The array contains:
--   1. every tag object pushed by the model (from ods_cs_json.message), each
--      extended with a `ticketList` of that user's tickets for the same
--      datasource over the window starting at ${bizdate_30};
--   2. at most one extra business tag ("bizTag") summarising the user's trend
--      posts over the same window, appended only when the user has trends.
--
-- Inputs : ods_cs_json (pt = ${bizdate_0}), dwd_cs_json_di, ods_trend_intelligence_di
-- Output : (user_id, message) rows, one per row of today's ods_cs_json partition.
WITH cs_source AS (
    -- The model writes this table in near real time, so read the T-0 partition.
    -- ods_cs_json only carries user_id and message; datasource lives inside the JSON.
    SELECT
        user_id,
        message
    FROM ods_cs_json
    WHERE pt = '${bizdate_0}'
),

cs_element AS (
    -- Explode the JSON array into one row per object.
    -- Step 1 strips the outer [ ] (tolerating surrounding whitespace);
    -- step 2 marks each "},{" boundary with a delimiter and splits on it, so
    -- every piece keeps its own braces and is a parseable object.
    -- NOTE(review): assumes flat objects (no nested "},{") and that no value
    -- contains the literal "|||" - confirm with the producer of the topic.
    SELECT
        src.user_id,
        exploded.element_json
    FROM cs_source src
    LATERAL VIEW EXPLODE(
        SPLIT(
            REGEXP_REPLACE(
                REGEXP_REPLACE(src.message, '^\\s*\\[\\s*|\\s*\\]\\s*$', ''),
                '\\}\\s*,\\s*\\{',
                '}|||{'
            ),
            '\\|\\|\\|'
        )
    ) exploded AS element_json
),

cs_tag AS (
    -- Pull the fields that are re-emitted downstream out of every object.
    SELECT
        user_id,
        GET_JSON_OBJECT(element_json, '$.criteriaOfJudgment') AS criteria_of_judgment,
        GET_JSON_OBJECT(element_json, '$.matchingDegree')     AS matching_degree,
        GET_JSON_OBJECT(element_json, '$.profilingTag')       AS profiling_tag,
        GET_JSON_OBJECT(element_json, '$.datasource')         AS datasource
    FROM cs_element
    -- An empty array ("[]") explodes to a single empty string; drop it.
    WHERE TRIM(element_json) <> ''
),

ticket_by_source AS (
    -- Ticket ids per user and datasource over the look-back window.
    SELECT
        user_id,
        datasource,
        COLLECT_LIST(ticket_id) AS ticket_list
    FROM dwd_cs_json_di
    WHERE pt >= '${bizdate_30}'
    GROUP BY
        user_id,
        datasource
),

cs_message AS (
    -- Re-serialise every model tag with its ticket list and gather them per user.
    -- Columns are qualified because both join inputs expose user_id and datasource.
    SELECT
        tag.user_id,
        COLLECT_LIST(
            TO_JSON(
                MAP(
                    'criteriaOfJudgment', tag.criteria_of_judgment,
                    'matchingDegree',     tag.matching_degree,
                    'profilingTag',       tag.profiling_tag,
                    'tagType',            'modelTag',
                    'datasource',         tag.datasource,
                    -- No matching tickets -> empty string rather than a JSON null.
                    'ticketList',         COALESCE(CONCAT_WS(',', ticket.ticket_list), '')
                )
            )
        ) AS tag_json_list
    FROM cs_tag tag
    LEFT JOIN ticket_by_source ticket
        ON tag.user_id = ticket.user_id
        AND tag.datasource = ticket.datasource
    GROUP BY tag.user_id
),

trend_stat AS (
    -- Trend count and "trend_id-url" list per user over the look-back window.
    SELECT
        user_id,
        COUNT(1) AS trend_cnt,
        -- COALESCE keeps trends without a url in the list, so the list length
        -- stays consistent with trend_cnt.
        COLLECT_LIST(CONCAT(trend_id, '-', COALESCE(url, ''))) AS trend_list
    FROM ods_trend_intelligence_di
    WHERE pt >= '${bizdate_30}'
    GROUP BY user_id
),

trend_tag AS (
    -- Business tag object; built with TO_JSON so values are escaped the same
    -- way as the model tags instead of being glued together by hand.
    SELECT
        user_id,
        TO_JSON(
            MAP(
                'criteriaOfJudgment', CONCAT('该用户共发布动态数', CAST(trend_cnt AS STRING), '条'),
                'matchingDegree',     '0%',
                'profilingTag',       '动态舆情用户',
                'tagType',            'bizTag',
                'ticketList',         CONCAT_WS(',', trend_list),
                'datasource',         '社区'
            )
        ) AS content_json
    FROM trend_stat
),

merged AS (
    -- Final array: model tags first, then the business tag when one exists.
    SELECT
        cs.user_id,
        CONCAT(
            '[',
            CONCAT_WS(',', cs.tag_json_list),
            CASE
                WHEN trend.content_json IS NOT NULL THEN CONCAT(',', trend.content_json)
                ELSE ''
            END,
            ']'
        ) AS message
    FROM cs_message cs
    LEFT JOIN trend_tag trend
        -- Both sides cast to STRING so differing id column types still match.
        ON CAST(cs.user_id AS STRING) = CAST(trend.user_id AS STRING)
)

INSERT OVERWRITE TABLE xxx.dwd_xxx_di PARTITION (pt = '${bizdate}')
SELECT
    src.user_id,
    merged.message
-- Driven by today's partition only; users whose message yields no tag objects
-- are kept with a NULL message.
FROM cs_source src
LEFT JOIN merged
    ON src.user_id = merged.user_id;
#数据人offer决赛圈怎么选##数据人的面试交流地##牛客创作赏金赛##聊聊我眼中的AI##Java#