package Kafka2ES
import java.util
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
import org.apache.http.HttpHost
import org.elasticsearch.action.update.UpdateRequest
import org.elasticsearch.script.{Script, ScriptType}

case class Message(id: String, para_name: String, para_val: String)

object Demo1 {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Sample records in the form "message id,parameter name,parameter value".
    val dataList = List(
      "MSG_001,id,sensor1"
    )
    val inputStream = env.fromCollection(dataList)

    // Parse each comma-separated record into a Message.
    val dataStream: DataStream[Message] = inputStream.map(row => {
      val fields = row.split(",")
      Message(fields(0), fields(1), fields(2))
    })

    val httpHosts = new util.ArrayList[HttpHost]()
    httpHosts.add(new HttpHost("127.0.0.1", 9200))

    val esSinkBuilder: ElasticsearchSink.Builder[Message] = new ElasticsearchSink.Builder[Message](
      httpHosts,
      new ElasticsearchSinkFunction[Message] {
        override def process(t: Message, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = {
          println("saving data:" + t)

          // Build a partial document holding only the field carried by this message.
          val json = new util.HashMap[String, String]()
          val mesId = t.id
          if (t.para_name == "timestamp") {
            json.put("timestamp", t.para_val)
          }
          if (t.para_name == "temperature") {
            json.put("temperature", t.para_val)
          }
          if (t.para_name == "id") {
            json.put("id", t.para_val)
          }

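          // Upsert keyed by the message id: merge into the existing document,
          // or create it from this partial document if it does not exist yet.
          val updateRequest = new UpdateRequest("ames_1", "readingdata", mesId).doc(json)
          updateRequest.docAsUpsert(true)
          requestIndexer.add(updateRequest)
          println("data write complete...")
        }
      }
    )
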
    dataStream.addSink(esSinkBuilder.build())

    env.execute("sink ES test")
  }
}