FlinkDataStream Kafka2ES

package Kafka2ES

import java.util

import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
import org.apache.http.HttpHost
import org.elasticsearch.action.update.UpdateRequest

case class Message(id: String, para_name: String, para_val: String)

object Demo1 {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val dataList = List(
      "MSG_001,id,sensor1"
      // "MSG_001,timestamp,1603766281",
      // "MSG_001,temperature,56",
      // "MSG_002,id,sensor2",
      // "MSG_002,timestamp,1603766282",
      // "MSG_002,temperature,57",
      // "MSG_003,id,sensor3",
      // "MSG_003,timestamp,1603766283",
      // "MSG_003,temperature,58",
      // "MSG_004,id,sensor4",
      // "MSG_004,timestamp,1603766284",
      // "MSG_004,temperature,59"
    )

    // Source: build a test stream from the in-memory collection
    val inputStream = env.fromCollection(dataList)

    // Parse each CSV line into a Message(id, para_name, para_val)
    val dataStream: DataStream[Message] = inputStream.map(row => {
      val fields = row.split(",")
      Message(fields(0), fields(1), fields(2))
    })

    val httpHosts = new util.ArrayList[HttpHost]()
    httpHosts.add(new HttpHost("127.0.0.1", 9200))

    // Build an Elasticsearch sink
    val esSinkBuilder: ElasticsearchSink.Builder[Message] = new ElasticsearchSink.Builder[Message](
      httpHosts,
      new ElasticsearchSinkFunction[Message] {
        override def process(t: Message, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = {
          println("saving data: " + t)

          // Wrap the field as a Map (serialized as a JSON object), keyed by parameter name
          val json = new util.HashMap[String, String]()
          t.para_name match {
            case "id" | "timestamp" | "temperature" => json.put(t.para_name, t.para_val)
            case _                                  => // ignore unknown parameter names
          }

          // A plain IndexRequest would overwrite the whole document on every message:
          // val indexRequest = Requests.indexRequest()
          //   .index("ames_1")
          //   .`type`("readingdata")
          //   .id(t.id)
          //   .source(json)

          // Instead, send a partial-document update keyed by the message id.
          // docAsUpsert creates the document if it does not exist yet, so all
          // messages that share one id merge into a single document.
          val updateRequest = new UpdateRequest("ames_1", "readingdata", t.id).doc(json)
          updateRequest.docAsUpsert(true)

          // Hand the request to the indexer, which sends it to Elasticsearch
          requestIndexer.add(updateRequest)
          println("data written")
        }
      }
    )
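
    // Note: by default the sink batches documents into bulk requests. To see
    // each record flushed to Elasticsearch immediately in this demo, the
    // builder's bulk size could be capped at one action:
    // esSinkBuilder.setBulkFlushMaxActions(1)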


    // Sink: write the stream to Elasticsearch
    dataStream.addSink(esSinkBuilder.build())

    env.execute("sink ES test")
  }
}
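
The demo wires in an in-memory collection as the source so the pipeline runs without any infrastructure. To actually read from Kafka, as the title suggests, the fromCollection line inside main can be swapped for a Kafka consumer source. A minimal sketch, assuming the universal Kafka connector (Flink 1.7+) is on the classpath; the topic name, broker address, and group id below are placeholders:

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

val props = new Properties()
props.setProperty("bootstrap.servers", "127.0.0.1:9092") // placeholder broker
props.setProperty("group.id", "kafka2es-demo")           // placeholder group id

// Each Kafka record arrives as the same CSV string format used in dataList,
// so the rest of the job (map, sink) stays unchanged.
val inputStream = env.addSource(
  new FlinkKafkaConsumer[String]("sensor-readings", new SimpleStringSchema(), props)
)

Because the sink issues docAsUpsert updates keyed by the message id, the id, timestamp, and temperature messages for one sensor accumulate into a single Elasticsearch document no matter which source feeds them in.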

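The job depends on Flink's Elasticsearch 6 connector (and, for the Kafka variant above, the Kafka connector). A possible build.sbt fragment; the artifact names are Flink's standard connector artifacts, but the version here is an assumption and should match your cluster:

// build.sbt (sketch) -- assumed Flink version, adjust to your setup
val flinkVersion = "1.10.1"

libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-streaming-scala"          % flinkVersion, // DataStream API
  "org.apache.flink" %% "flink-connector-elasticsearch6" % flinkVersion, // ElasticsearchSink
  "org.apache.flink" %% "flink-connector-kafka"          % flinkVersion  // FlinkKafkaConsumer
)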