一、redis sink

对应jar包

<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.0</version>
</dependency>

将文件内容写入到redis的hash结构中

代码:

/** Demo job: reads sensor records from a text file and writes them to Redis. */
object RedisSinkTest {

  def main(args: Array[String]): Unit = {
    val inputPath = "C:\\Users\\Mi\\Documents\\project\\idea\\FlinkTitorial\\src\\main\\resources\\sensor.txt"

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Each line is split on "," into id, timestamp and temperature.
    val dataStream: DataStream[SensorReading] = env
      .readTextFile(inputPath)
      .map { line =>
        val fields = line.split(",")
        val sensorId = fields(0).trim
        val timestamp = fields(1).trim.toLong
        val temperature = fields(2).toDouble
        SensorReading(sensorId, timestamp, temperature)
      }

    // Redis sink: connection settings first, then the sink itself.
    val poolBuilder = new FlinkJedisPoolConfig.Builder()
    poolBuilder.setHost("hadoop102")
    poolBuilder.setPort(6379)
    val config: FlinkJedisPoolConfig = poolBuilder.build()

    val redisSink = new RedisSink(config, new MyRedisMapper)
    dataStream.addSink(redisSink)

    env.execute("redis sink test")
  }

}

/** Describes how a SensorReading is written to Redis: HSET on "sensor_temperature". */
class MyRedisMapper extends RedisMapper[SensorReading] {

  // The command is HSET and the hash key is "sensor_temperature".
  override def getCommandDescription: RedisCommandDescription = {
    val hashKey = "sensor_temperature"
    new RedisCommandDescription(RedisCommand.HSET, hashKey)
  }

  // The hash field is the sensor id.
  override def getKeyFromData(t: SensorReading): String = {
    t.id
  }

  // The hash value is the temperature rendered as a string.
  override def getValueFromData(t: SensorReading): String = {
    val temperature = t.temperature
    temperature.toString
  }
}

redis查看结果

127.0.0.1:6379> hgetall sensor_temperature
1) "sensor_1"
2) "35.80018327300259"
3) "sensor_6"
4) "15.402984393403084"
5) "sensor_10"
6) "38.101067604893444"
7) "sensor_7"
8) "6.720945201171228"

  

二、es sink

对应jar包

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
    <version>1.7.2</version>
</dependency>

将文件内容写入到es中

代码:

/** Demo job: reads sensor records from a text file and indexes them into Elasticsearch. */
object EsSinkTest {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val streamFromFile = env.readTextFile("C:\\Users\\Mi\\Documents\\project\\idea\\FlinkTitorial\\src\\main\\resources\\sensor.txt")

    // Each line is split on "," into id, timestamp and temperature.
    val dataStream: DataStream[SensorReading] = streamFromFile.map(d => {
      val arr = d.split(",")
      SensorReading(arr(0).trim, arr(1).trim.toLong, arr(2).toDouble)
    })

    // es sink: list of Elasticsearch HTTP endpoints to connect to
    val httpHosts = new util.ArrayList[HttpHost]()
    httpHosts.add(new HttpHost("hadoop101",9200))
    // Create the builder for the es sink; the sink function turns each record into an index request.
    val esSinkBuilder = new ElasticsearchSink.Builder[SensorReading](httpHosts, new ElasticsearchSinkFunction[SensorReading] {
      override def process(t: SensorReading, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = {
        println("保存数据:" + t)
        // Wrap the record's fields in a map that becomes the document source.
        val map = new util.HashMap[String, String]()
        map.put("sensor_id", t.id)
        map.put("temperature", t.temperature.toString)
        map.put("ts", t.timestamp.toString)

        // Build the index request for index "sensor", type "redingdata".
        val indexRequest: IndexRequest = Requests.indexRequest().index("sensor").`type`("redingdata").source(map)

        // Hand the request to the indexer.
        // NOTE(review): whether the document is already written when add() returns is not
        // visible here, so the message below may be printed before the write completes.
        requestIndexer.add(indexRequest)

        println("保存成功")
      }
    })

    dataStream.addSink(esSinkBuilder.build())

    // Job name now matches this job (it was copy-pasted from the Redis example).
    env.execute("es sink test")
  }

}

es中查看结果

{
  "took" : 148,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : 6,
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "sensor",
        "_type" : "redingdata",
        "_id" : "QXpZnnEBUwLRQchmepbT",
        "_score" : 1.0,
        "_source" : {
          "sensor_id" : "sensor_6",
          "temperature" : "15.402984393403084",
          "ts" : "1547718201"
        }
      },
      {
        "_index" : "sensor",
        "_type" : "redingdata",
        "_id" : "RnpZnnEBUwLRQchme5ZS",
        "_score" : 1.0,
        "_source" : {
          "sensor_id" : "sensor_7",
          "temperature" : "6.720945201171228",
          "ts" : "1547718202"
        }
      },
      {
        "_index" : "sensor",
        "_type" : "redingdata",
        "_id" : "Q3pZnnEBUwLRQchmepbr",
        "_score" : 1.0,
        "_source" : {
          "sensor_id" : "sensor_1",
          "temperature" : "35.80018327300259",
          "ts" : "1547718199"
        }
      },
      {
        "_index" : "sensor",
        "_type" : "redingdata",
        "_id" : "QnpZnnEBUwLRQchmepbo",
        "_score" : 1.0,
        "_source" : {
          "sensor_id" : "sensor_1",
          "temperature" : "30.8",
          "ts" : "1547718200"
        }
      },
      {
        "_index" : "sensor",
        "_type" : "redingdata",
        "_id" : "RHpZnnEBUwLRQchmepbs",
        "_score" : 1.0,
        "_source" : {
          "sensor_id" : "sensor_1",
          "temperature" : "40.8",
          "ts" : "1547718201"
        }
      },
      {
        "_index" : "sensor",
        "_type" : "redingdata",
        "_id" : "RXpZnnEBUwLRQchmepbu",
        "_score" : 1.0,
        "_source" : {
          "sensor_id" : "sensor_10",
          "temperature" : "38.101067604893444",
          "ts" : "1547718205"
        }
      }
    ]
  }
}

三、jdbc sink

①mysql驱动

<!-- mysql sink -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.44</version>
</dependency>

②自定义mysql sink,继承RichSinkFunction,重写执行逻辑以及初始化和关闭资源的方法。

/**
 * Custom MySQL sink that upserts one row per sensor into the `temperatures` table.
 *
 * The connection and prepared statements are created in `open`, used for every
 * record in `invoke`, and released in `close`.
 */
class MyJdbcSink() extends RichSinkFunction[SensorReading]{

  // JDBC connection and prepared statements; assigned in open(), so they stay null until then.
  var conn:Connection = _
  var insertStmt : PreparedStatement = _
  var updateStmt:PreparedStatement=_

  /** Opens the connection and prepares the insert and update statements. */
  override def open(parameters: Configuration): Unit = {
    super.open(parameters)

    // NOTE(review): URL and credentials are hard-coded; fine for a demo, should be configurable otherwise.
    conn = DriverManager.getConnection("jdbc:mysql:///test","root","123456")

    insertStmt = conn.prepareStatement("insert into temperatures(sensor,temp) values(?,?)")

    updateStmt = conn.prepareStatement("update temperatures set temp=? where sensor=?")
  }

  /** Updates the row for the reading's sensor id, inserting it when no row was updated. */
  override def invoke(value: SensorReading, context: SinkFunction.Context[_]): Unit = {
    // The original author notes that super.invoke(value, context) must NOT be called here.

    // Try the update first; executeUpdate returns the number of affected rows.
    updateStmt.setDouble(1,value.temperature)
    updateStmt.setString(2,value.id)
    val updatedRows = updateStmt.executeUpdate()

    // No existing row for this sensor: insert one instead.
    if (updatedRows == 0){
      insertStmt.setString(1,value.id)
      insertStmt.setDouble(2,value.temperature)
      insertStmt.executeUpdate()
    }

  }

  /**
   * Releases the statements and the connection.
   *
   * Each resource is null-checked because open() may have failed before assigning it,
   * and the nested try/finally ensures a failure closing one resource does not
   * prevent the remaining ones from being closed.
   */
  override def close(): Unit = {
    try Option(updateStmt).foreach(_.close())
    finally {
      try Option(insertStmt).foreach(_.close())
      finally {
        try Option(conn).foreach(_.close())
        finally super.close()
      }
    }
  }

}

③添加自定义的mysql sink并执行

/** Demo job: reads sensor records from a text file and writes them through MyJdbcSink. */
object JdbcSinkTest {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val sourcePath = "C:\\Users\\Mi\\Documents\\project\\idea\\FlinkTitorial\\src\\main\\resources\\sensor.txt"
    val rawLines = env.readTextFile(sourcePath)

    // Each line is split on "," into id, timestamp and temperature.
    val dataStream: DataStream[SensorReading] = rawLines.map { line =>
      val parts = line.split(",")
      SensorReading(
        parts(0).trim,
        parts(1).trim.toLong,
        parts(2).toDouble
      )
    }

    // Attach the custom JDBC sink.
    val jdbcSink = new MyJdbcSink()
    dataStream.addSink(jdbcSink)

    env.execute("jdbc sink test")
  }

}