【问题标题】:Avro Kafka conversion issues between scala and Pythonscala 和 Python 之间的 Avro Kafka 转换问题
【发布时间】:2023-04-05 10:15:01
【问题描述】:

我们的项目有 scala 和 python 代码,我们需要向 kafka 发送/使用 avro 编码的消息。

我正在使用 python 和 scala 向 kafka 发送 avro 编码消息。我在 scala 代码中有生产者,它使用 Twitter 双射库发送 avro 编码消息,如下所示:

val resourcesPath = getClass.getResource("/avro/url_info_schema.avsc")
val schemaFile = scala.io.Source.fromURL(resourcesPath).mkString
val schema = parser.parse(schemaFile)
val recordInjection = GenericAvroCodecs[GenericRecord](schema)
val avroRecord = new GenericData.Record(schema)
avroRecord.put("url_sha256", row._1)
avroRecord.put("url", row._2._1)
avroRecord.put("timestamp", row._2._2)
val recordBytes = recordInjection.apply(avroRecord)
kafkaProducer.value.send("topic", recordBytes)

Avro 架构看起来像

{
  "namespace": "com.rm.avro",
  "type": "record",
  "name": "url_info",
  "fields":[
     {
        "name": "url_sha256", "type": "string"
     },
     {
        "name": "url",  "type": "string"
     },
     {
        "name": "timestamp", "type": ["long"]
     }
 ]

}

我能够在 Scala 的 KafkaConsumer 中成功解码

val resourcesPath = getClass.getResource("/avro/url_info_schema.avsc")
val schemaFile = scala.io.Source.fromURL(resourcesPath).mkString


kafkaInputStream.foreachRDD(kafkaRDD => {
  kafkaRDD.foreach(

    avroRecord => {
      val parser = new Schema.Parser()
      val schema = parser.parse(schemaFile)
      val recordInjection = GenericAvroCodecs[GenericRecord](schema)
      val record = recordInjection.invert(avroRecord.value()).get
      println(record)
    }
  )

}

但是,我无法在 python 中解码消息,但出现以下异常

'utf8' codec can't decode byte 0xe4 in position 16: invalid continuation byte

python 代码如下所示:
schema_path="avro/url_info_schema.avsc"
schema = avro.schema.parse(open(schema_path).read())

for msg in consumer:
   bytes_reader = io.BytesIO(msg.value)
    decoder = avro.io.BinaryDecoder(bytes_reader)
    reader = avro.io.DatumReader(schema)
    decoded_msg = reader.read(decoder)
    print(decoded_msg)

scala avro 消费者也无法理解 python avro 生产者消息。我有一个例外。 Python Avro 生产者如下所示:

datum_writer = DatumWriter(schema)
bytes_writer = io.BytesIO()

datum_writer = avro.io.DatumWriter(schema)
encoder = avro.io.BinaryEncoder(bytes_writer)
datum_writer.write(data, encoder) 
raw_bytes = bytes_writer.getvalue()
producer.send(topic, raw_bytes)

如何在 python 和 scala 中保持一致?任何指针都会很棒

【问题讨论】:

  • 找到了解决方案。将很快发布解决方案。它可能会帮助其他人。

标签:
python
scala
apache-kafka
spark-avro