0

以下は、カフカ プロデューサー用の私の python コーディングです。メッセージをカフカ ブローカーに発行できるかどうかはわかりません。コンシューマー側がメッセージを受信して​​いないためです。プロデューサーコンソールコマンドを使用してテストしている間、私のコンシューマーpythonプログラムは正常に動作しています。

from __future__ import print_function

import sys
from pyspark import SparkContext
from kafka import KafkaClient, SimpleProducer

if __name__ == "__main__":

if len(sys.argv) != 2:
    print("Usage:spark-submit producer1.py <input file>", file=sys.stderr)
    exit(-1)

sc = SparkContext(appName="PythonRegression")

def sendkafka(messages):
    ## Set broker port
    kafka = KafkaClient("localhost:9092")
    producer = SimpleProducer(kafka, async=True, batch_send_every_n=5,  
batch_send_every_t=10)
    send_counts = 0
    for message in messages:
        try:
            print(message)
            ## Set topic name and push messages to the Kafka Broker
            yield producer.send_messages('test', message.encode('utf-8'))
        except Exception, e:
            print("Error: %s" % str(e))
        else:
            send_counts += 1
    print("The count of prediction results which were sent IN THIS PARTITION 
is %d.\n" % send_counts)

## Connect and read the file.    
rawData = sc.textFile(sys.argv[1])

## Find and skip the first row
dataHeader = rawData.first()
data =  rawData.filter(lambda x: x != dataHeader)

## Collect the RDDs.
sentRDD = data.mapPartitions(sendkafka) 
sentRDD.collect()

## Stop file connection
sc.stop()

これは私の「消費者」のpythonコーディングです

from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

if len(sys.argv) < 3:
print ("Program to pulls the messages from kafka brokers.")
print("Usage: consume.py <zk> <topic>", file=sys.stderr)

else:
## Flow
## Loads settings from system properties, for launching of spark-submit.
sc = SparkContext(appName="PythonStreamingKafkaWordCount")

## Create a StreamingContext using an existing SparkContext.
ssc = StreamingContext(sc, 10)

## Get everything after the python script name
zkQuorum, topic = sys.argv[1:]

## Create an input stream that pulls messages from Kafka Brokers.
kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", 
{topic: 1})

## 
lines = kvs.map(lambda x: x[1])

## Print the messages pulled from Kakfa Brokers
lines.pprint()

## Save the pulled messages as file
## lines.saveAsTextFiles("OutputA")

## Start receiving data and processing it
ssc.start()

## Allows the current process to wait for the termination of the context 
ssc.awaitTermination()
4

3 に答える 3

0

Produce リクエストで増加している場合は、トピック内のメッセージの数を確認できます。

./bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
--broker-list <Kafka_broker_hostname>:<broker_port> --topic Que1 \ 
--time -1 --offsets 1 | awk -F  ":" '{sum += $3} END {print sum}'

メッセージの数が増えている場合は、Producer が正常に動作していることを意味します。

于 2016-09-14T08:02:41.803 に答える
0

別のサーバーで完全に動作することをテストしたので、ローカルの Zookeeper または Kafka に何か問題があると思います。しかし、私に返信してくれた人に感謝します;)

于 2016-09-21T02:54:32.833 に答える
0

私は通常、kafka-console-consumer (Apache Kafka の一部) を使用してこのような問題をデバッグし、作成しようとしたトピックから消費します。コンソール コンシューマーがメッセージを受信した場合、メッセージが Kafka に到着したことがわかります。

最初にプロデューサを実行し、終了させて​​からコンシューマを開始すると、問題は、コンシューマがログの最後から開始し、追加のメッセージを待機している可能性があります。最初にコンシューマーを開始していることを確認するか、最初に自動的に開始するように構成してください (申し訳ありませんが、Python クライアントでそれを行う方法がわかりません)。

于 2016-09-14T04:58:27.910 に答える