加入收藏 | 设为首页 | 会员中心 | 我要投稿 安卓应用网_福州站长网 (https://www.0591zz.com/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 大数据 > 正文

大数据处理项目相关

发布时间:2021-01-17 10:37:40 所属栏目:大数据 来源:网络整理
导读:副标题#e# mapReduce部分 * MapReduce MAP :映射 reduce :归纳* 简单来说,一个映射函数就是对一些独立元素组成的概念上的列表(例如,一个测试成绩的列表)的每一个元素进行指定的操作(比如,有人发现所有学生的成绩都被高估了一分,他可以定义一个“减一

使用librdkafka开发一个producer的步骤:
librdkafka:
1. conf 设置
kafka conf:
rd_kafka_conf_new(): rd_kafka_conf_set()
topic conf:
rd_kafka_topic_conf_new(): rd_kafka_topic_conf_set()

  1. 设置conf回调,消息发送成功或者失败都会调用
    rd_kafka_conf_set_dr_cb()
    rd_kafka_conf_set_dr_msg_cb()

  2. 创建kafka
    rd_kafka_new()
    设置系统日志
    rd_kafka_set_logger()
    rd_kafka_set_log_level()
    添加下游brokers:
    rd_kafka_brokers_add()

  3. 创建新的topic
    rd_kafka_topic_new()

  4. producer:
    rd_kafka_produce()
    发送后,设置时间观察,第二个参数是阻塞等待时间,一般设置为0,rd_kafka_poll()

  5. 销毁操作
    rd_kafka_topic_destroy()
    rd_kafka_destroy()
    rd_kafka_wait_destroyed(2000)

一些数据结构的解释

  • Brokers
    librdkafka 只需要一份最初的brokers列表(至少包含一个broker)。它将连接所有”metadata.broker.list”或者是rd_kafka_brokers_add()函数添加的brokers,然后向每个brokers申请一些元数据信息:包含brokers的完整列表、topic、partitions以及它们在Kafka 集群中的leaders broker信息。

Brokers名字的形式为:host:port; 其中port是可选的,默认是9092,host是任何一个可以解析的hostname或者ipv4或者ipv6地址。如果host是多个地址,librdkafka将会在每一次连接尝试中循环连接这些地址。包含所有broker 地址的DNS记录可以用来提供可靠的bootstrap broker。

  • rd_kafka_t
    实际应用中,需要创建一个top-level的对象 rd_kafka_t, 这个对象是基本的容器,它提供了全局性配置属性以及共享状态信息,它由rd_kafka_new()函数创建。
  • rd_kafka_topic_t
    同时也需要创建一个或者多个topics对象rd_kafka_topic_t,给produer以及consumer使用。 topic对象具有topic特定的配置属性,同时还包含了所有可用partitions与leader brokers映射关系。它通过调用rd_kafka_topic_new()函数创建。

    注意:实际应用中,可能会创建多个rd_kafka_t对象,它们并没有共享状态信息
    注意:rd_kafka_topic_t对象只能由创建它的对象rd_kafka_t使用。

  • 线程和回调函数

    librdkafka 内部将会有多个线程,以充分利用硬件资源。API的实现是完全线程安全的,实际应用中可以在任何时候任何线程中调用任何API函数而不用担心线程安全。

    一个以轮询为基础的API用来给实际应用提供信号反馈,实际应用应当按照固定时间间隔调用rd_kafka_poll()函数。这个轮询的API将会调用以下可的回调(都是可选的):

    消息发送报告回调:报告消息发送失败。这将允许实际应用采取措施应对发送失败,并释放消息发送过程中占有的资源。

    错误回调:报告错误;错误一般是信息化方面的,例如连接broker失败,实际应用通常不需要采取任何措施。错误的数据类型是通过rd_kafka_resp_err_t enum类型数据,可以描述本地错误和远程broker错误。

    不是poll函数引起的可选回调函数,可能是由任意线程引发的:

    logging 回调:实际应用中,用于发送librdkafka产生的log消息。

    partitioner 回调:实际应用提供消息的partitioner。partitioner可能被任何线程任何时候调用,它可能由于同一个key而被调用多次。Partitioner 函数有以下限制:

    一定不能调用rd_kafka_*()等函数

    一定不能阻塞或延长执行

    (编辑:安卓应用网_福州站长网)

    【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!