Druid
所有文档

          百度MapReduce

          Druid

          Druid简介

          Druid是一个高性能的实时数据分析系统,由MetaMarkets公司在2012开源,专门为OLAP场景而设计。Druid遵从Lambda架构,支持批量和实时两种方式导入数据,并提供高性能的数据查询。

          集群准备

          Druid模版

          登陆百度云控制台,选择“产品服务->百度MapReduce BMR”,点击“创建集群”,进入集群创建页。购置集群时选择Druid模版即可,另外Druid的元数据存储支持本地MySQL和云数据库RDS MySQL。如下图所示:

          image.png

          节点类型

          image.png

          BMR的Master节点上部署Druid的Overlord、Coordinator、Broker和Router,供提交任务、查看数据源和数据查询。

          image.png

          image.png

          为了更容易扩展节点,BMR的Core节点部署Druid的Historical和MiddleManager, Task节点部署Druid的Broker和MiddleManager。可以按照需求变更Core和Task的节点数目。

          Druid对内存的要求较高,建议使用4核16GB(或更高配置)的节点配置。在Druid集群内部,建议不要使用MapReduce跑其他开源组件的作业。

          使用简介(批量索引)

          Druid使用的端口如下:

          Druid节点类型 端口
          Broker 8590
          Overlord 8591
          Coordinator 8592
          Router 8593
          Historical 8594
          MiddleManager 8595

          Druid提供Http访问,每个请求都会返回结果,如果使用curl命令没有任何返回,可以加上-v查看具体的响应信息。

          1. 远程登陆到创建好的集群中

            ssh root@[master节点公网ip]
            使用创建集群时输入的密码

          2. 切换成hdfs用户,在HDFS上创建quickstart目录

            hdfs dfs -mkdir /user/druid/quickstart

          3. 将示例数据文件wikiticker-2015-09-12-sampled.json.gz拷贝到HDFS

            hdfs dfs -copyFromLocal /opt/bmr/druid/quickstart/wikiticker-2015-09-12-sampled.json.gz /user/druid/quickstart

          4. 向Overlord提交批量索引任务

            curl -v -X POST -H 'Content-Type: application/json' -d @/opt/bmr/druid/quickstart/wikiticker-index.json http://xxx-master-instance-xxx-1(hostname):8591/druid/indexer/v1/task
            成功返回结果为{"task":"index_hadoop_wikiticker_yyyy-MM-ddThh:mm:ss.xxZ"}

            如果使用的是高可用集群,Overlord只有一个为Active,有效的hostname可能是xxx-master-instance-xxx-2。

          5. 使用Web UI查看任务执行情况

            使用ssh -ND [本地未使用端口] root@[公网ip] 并对浏览器做相关配置,例如Chrome浏览器通过Swichysharp配置了VPN之后(通过SSH-Tunnel访问集群),或者使用OpenVPN,可通过Master节点的内网ip加上Overlord端口(8591)查看结果,如下图所示:

            image.png

          6. 等到索引任务执行成功,查看数据源

          使用Web UI:配置好VPN后,通过Master节点的内网ip加上Coordinator端口(8592)查看结果,如下图所示:

          使用Http:

          curl -v -X GET http://xxx-master-instance-xxx-1:8592/druid/coordinator/v1/datasources

          成功返回结果为["wikiticker"]

          如果使用的是高可用集群,Coordinator只有一个为Active,有效的hostname可能是xxx-master-instance-xxx-2。

          1. 查询数据

            当Overlord Web UI显示任务成功或者查看数据源显示有wikiticker的时候,可以使用Broker查询数据:
            curl -v -X 'POST' -H 'Content-Type:application/json' -d @/opt/bmr/druid/quickstart/wikiticker-top-pages.json http://[master/task hostname或ip]:8590/druid/v2?pretty

          Tips: 如果需要使用自己的Druid Spec JSON,建议在~目录下编辑,并在本地做好备份。Spec的编写请参考Apache Druid官方文档。

          使用Kafka Indexing Service

          Druid提供了Kafka Indexing Service摄入实时数据,详情参考Druid官方文档。BMR托管的Druid可支持百度消息服务BMS、BMR内部和放在同一VPC下的Kafka集群。本示例使用百度消息服务BMS演示,通过其他方式部署的Kafka集群步骤类似。

          1. 使用百度消息服务BMS创建主题 kafka_demo,然后下载BMS的证书,用于后续创建Kafka生产者和消费者,参考BMS文档
          2. 在~目录(/home/hdfs),将下载完毕的BMS证书上传并解压到此处,然后scp到其他节点

            unzip -d kafka-key kafka-key.zip
            scp -rp kafka-key root@[节点ip或hostname]:~/
          3. 在~目录(/home/hdfs)下编写Druid Spec文件kafka_demo.json,BMS的主题需要带上创建后生成的前缀

            查看client.properties文件,替换kafka_demo.json里的ssl.keystore.password
            
            {
              "type": "kafka",
              "dataSchema": {
                "dataSource": "wiki_kafka_demo",
                "parser": {
                  "type": "string",
                  "parseSpec": {
                    "format": "json",
                    "timestampSpec": {
                      "column": "time",
                      "format": "auto"
                    },
                    "dimensionsSpec": {
                      "dimensions": [
                        "channel",
                        "cityName",
                        "comment",
                        "countryIsoCode",
                        "countryName",
                        "isAnonymous",
                        "isMinor",
                        "isNew",
                        "isRobot",
                        "isUnpatrolled",
                        "metroCode",
                        "namespace",
                        "page",
                        "regionIsoCode",
                        "regionName",
                        "user",
                        { "name": "added", "type": "long" },
                        { "name": "deleted", "type": "long" },
                        { "name": "delta", "type": "long" }
                      ]
                    }
                  }
                },
                "metricsSpec" : [],
                "granularitySpec": {
                  "type": "uniform",
                  "segmentGranularity": "DAY",
                  "queryGranularity": "NONE",
                  "rollup": false
                }
              },
              "tuningConfig": {
                "type": "kafka",
                "reportParseExceptions": false
              },
              "ioConfig": {
                "topic": "[accountID]__kafka_demo",
                "replicas": 2,
                "taskDuration": "PT10M",
                "completionTimeout": "PT20M",
                "consumerProperties": {
                  "bootstrap.servers": "[kafka_address]:[kafka_port]",
                  "security.protocol": "SSL",
                  "ssl.truststore.password": "kafka",
                  "ssl.truststore.location": "/home/hdfs/kafka-key/client.truststore.jks",
                  "ssl.keystore.location": "/home/hdfs/kafka-key/client.keystore.jks",
                  "ssl.keystore.password": "******"
                }
              }
            } 

            若使用的不是开启SSL的Kafka集群,consumerProperties只需要配置bootstrap.servers即可,例如BMR集群中的Kafka。

          4. 向Overlord提交Kafka Supervisor任务

             curl -XPOST -H'Content-Type: application/json' -d @kafka_demo.json http://[overlord_ip]:8591/druid/indexer/v1/supervisor

            成功返回:

            {"id":"wiki_kafka_demo"},Overlord Web UI上也可以看到结果:

            image.png

          5. 本地目录解压BMS证书压缩包,使用client.propertie启动Kafka生产者并发送消息

            sh bin/kafka-console-producer.sh --producer.config client.properties --topic <accountID>__kafka_demo --sync --broker-list [kafka_address]:[kafka_port]
            
            {"time":"2015-09-12T23:58:31.643Z","channel":"#en.wikipedia","cityName":null,"comment":"Notification: tagging for deletion of [[File:Axintele team.gif]]. ([[WP:TW|TW]])","countryIsoCode":null,"countryName":null,"isAnonymous":false,"isMinor":false,"isNew":false,"isRobot":false,"isUnpatrolled":false,"metroCode":null,"namespace":"User talk","page":"User talk:AlexGeorge33","regionIsoCode":null,"regionName":null,"user":"Sir Sputnik","delta":1921,"added":1921,"deleted":0}
            {"time":"2015-09-12T23:58:33.743Z","channel":"#vi.wikipedia","cityName":null,"comment":"clean up using [[Project:AWB|AWB]]","countryIsoCode":null,"countryName":null,"isAnonymous":false,"isMinor":false,"isNew":false,"isRobot":true,"isUnpatrolled":false,"metroCode":null,"namespace":"Main","page":"Codiaeum finisterrae","regionIsoCode":null,"regionName":null,"user":"ThitxongkhoiAWB","delta":18,"added":18,"deleted":0}
            {"time":"2015-09-12T23:58:35.732Z","channel":"#en.wikipedia","cityName":null,"comment":"Put info into infobox, succession box, split sections a bit","countryIsoCode":null,"countryName":null,"isAnonymous":false,"isMinor":false,"isNew":false,"isRobot":false,"isUnpatrolled":false,"metroCode":null,"namespace":"Main","page":"Jamel Holley","regionIsoCode":null,"regionName":null,"user":"Mr. Matté","delta":3427,"added":3427,"deleted":0}
            {"time":"2015-09-12T23:58:38.531Z","channel":"#uz.wikipedia","cityName":null,"comment":"clean up, replaced: ozbekcha → oʻzbekcha, olchami → oʻlchami (3) using [[Project:AWB|AWB]]","countryIsoCode":null,"countryName":null,"isAnonymous":false,"isMinor":true,"isNew":false,"isRobot":true,"isUnpatrolled":false,"metroCode":null,"namespace":"Main","page":"Chambors","regionIsoCode":null,"regionName":null,"user":"Ximik1991Bot","delta":8,"added":8,"deleted":0}
            {"time":"2015-09-12T23:58:41.619Z","channel":"#it.wikipedia","cityName":null,"comment":"/* Gruppo Discovery Italia */","countryIsoCode":null,"countryName":null,"isAnonymous":false,"isMinor":false,"isNew":false,"isRobot":false,"isUnpatrolled":true,"metroCode":null,"namespace":"Main","page":"Deejay TV","regionIsoCode":null,"regionName":null,"user":"Ciosl","delta":4,"added":4,"deleted":0}
            {"time":"2015-09-12T23:58:43.304Z","channel":"#en.wikipedia","cityName":null,"comment":"/* Plot */","countryIsoCode":null,"countryName":null,"isAnonymous":false,"isMinor":false,"isNew":false,"isRobot":false,"isUnpatrolled":false,"metroCode":null,"namespace":"Main","page":"Paper Moon (film)","regionIsoCode":null,"regionName":null,"user":"Siddharth Mehrotra","delta":0,"added":0,"deleted":0}
            {"time":"2015-09-12T23:58:46.732Z","channel":"#en.wikipedia","cityName":null,"comment":"/* Jeez */ delete (also fixed Si Trew's syntax error, as I think he meant to make a link there)","countryIsoCode":null,"countryName":null,"isAnonymous":false,"isMinor":false,"isNew":false,"isRobot":false,"isUnpatrolled":false,"metroCode":null,"namespace":"Wikipedia","page":"Wikipedia:Redirects for discussion/Log/2015 September 12","regionIsoCode":null,"regionName":null,"user":"JaykeBird","delta":293,"added":293,"deleted":0}
            {"time":"2015-09-12T23:58:48.729Z","channel":"#fr.wikipedia","cityName":null,"comment":"MàJ","countryIsoCode":null,"countryName":null,"isAnonymous":false,"isMinor":false,"isNew":false,"isRobot":false,"isUnpatrolled":false,"metroCode":null,"namespace":"Wikipédia","page":"Wikipédia:Bons articles/Nouveau","regionIsoCode":null,"regionName":null,"user":"Gemini1980","delta":76,"added":76,"deleted":0}
            {"time":"2015-09-12T23:58:51.785Z","channel":"#vi.wikipedia","cityName":null,"comment":"clean up using [[Project:AWB|AWB]]","countryIsoCode":null,"countryName":null,"isAnonymous":false,"isMinor":false,"isNew":false,"isRobot":true,"isUnpatrolled":false,"metroCode":null,"namespace":"Main","page":"Amata yezonis","regionIsoCode":null,"regionName":null,"user":"ThitxongkhoiAWB","delta":10,"added":10,"deleted":0}
            {"time":"2015-09-12T23:58:53.898Z","channel":"#vi.wikipedia","cityName":null,"comment":"clean up using [[Project:AWB|AWB]]","countryIsoCode":null,"countryName":null,"isAnonymous":false,"isMinor":false,"isNew":false,"isRobot":true,"isUnpatrolled":false,"metroCode":null,"namespace":"Main","page":"Codia triverticillata","regionIsoCode":null,"regionName":null,"user":"ThitxongkhoiAWB","delta":36,"added":36,"deleted":0}
          6. 通过Coordinator Web UI查看实时数据源

          跨集群使用Druid

          Druid支持通过跨集群和其他Hadoop组件联合使用,如Hadoop、Hive、Spark等。在集群创建页将多个集群创建在同一个私有网络(VPC)之下,curl命令或者Http客户端就可以指定IP和端口向Druid各组件发送跨集群的请求。

          目前只支持通过IP地址访问同一个VPC下的另一个集群的主机。

          使用Hive简化Druid操作

          在Hive中可以通过指定IP地址和端口访问Druid节点。

          本节提供一个能够运行的示例,更多使用详情可参考Apache Hive官方文档https://cwiki.apache.org/confluence/display/Hive/Druid+Integration

          示例一:查询

          推荐使用Hive查询Druid数据,可以简化Druid常用的查询,不用为了简单的查询而去编写复杂的JSON文件。

          1. 创建一个Hive集群,登陆Hive集群

            ssh root@[hive集群公网ip]
            使用创建hive集群时输入的密码

          2. 指定Druid集群Broker的IP地址和端口

            hive> SET hive.druid.broker.address.default=x.x.x.x:8590;

          3. 创建外表(确保按照之前的示例,创建了wikiticker数据源)

            hive> CREATE EXTERNAL TABLE druid_hive_demo_01 STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler' TBLPROPERTIES ("druid.datasource" = "wikiticker");

          4. 查看元信息

            hive> DESCRIBE FORMATTED druid_hive_demo_01;
            其中维度列是string类型,指标列是bigint类型,时间戳字段__time是timestamp with local time zone类型

          5. 查询数据

            hive> SELECT `__time`, count, page, `user`, added FROM druid_hive_demo_01 LIMIT 10;

            注意:使用Hive无法扫描Druid全表,如上面的例子不使用limit会报错

            __time是Druid表(数据源)的时间戳字段,在Hive中需要用反引号转义,返回结果如下:

            2015-09-12 08:46:58.771 Asia/Shanghai	1	36	Talk:Oswald Tilghman	GELongstreet
            2015-09-12 08:47:00.496 Asia/Shanghai	1	17	Rallicula	PereBot
            2015-09-12 08:47:05.474 Asia/Shanghai	1	0	Peremptory norm	60.225.66.142
            2015-09-12 08:47:08.77 Asia/Shanghai	1	18	Apamea abruzzorum	Cheers!-bot
            2015-09-12 08:47:11.862 Asia/Shanghai	1	18	Atractus flammigerus	ThitxongkhoiAWB
            2015-09-12 08:47:13.987 Asia/Shanghai	1	18	Agama mossambica	ThitxongkhoiAWB
            2015-09-12 08:47:17.009 Asia/Shanghai	1	0	Campanya dels Balcans (1914-1918)	Jaumellecha
            2015-09-12 08:47:19.591 Asia/Shanghai	1	345	Talk:Dani Ploeger	New Media Theorist
            2015-09-12 08:47:21.578 Asia/Shanghai	1	121	User:WP 1.0 bot/Tables/Project/Pubs	WP 1.0 bot
            2015-09-12 08:47:25.821 Asia/Shanghai	1	18	Agama persimilis	ThitxongkhoiAWB

          示例二:导入历史数据

          可以使用已有的Hive表,通过建外表将HDFS数据导入到Druid中,不必编写复杂的Druid Ingestion Spec文件。注意,必须指定时间戳字段__time,且时间戳类型为timestamp with local time zone。

          1. 进入~目录,下载示例数据文件,创建示例Hive表

            wget http://files.grouplens.org/datasets/movielens/ml-100k.zip
            unzip ml-100k.zip
            
            hive>
            CREATE TABLE u_data (
                userid INT,
                movieid INT,
                rating INT,
                unixtime STRING)
            ROW FORMAT DELIMITED
            FIELDS TERMINATED BY '\t'
            STORED AS TEXTFILE;
            
            LOAD DATA LOCAL INPATH 'ml-100k/u.data'
            OVERWRITE INTO TABLE u_data;
          2. 设置Hive属性,关联Druid

            // 获取druid集群的元数据库信息:ssh root@public_ip 'cat /etc/druid/conf/_common/common.runtime.properties | grep "druid.metadata.storage.connector"'
            // 跨集群访问druid集群的mysql,需要使用ip地址,若使用hostname需要更新hive集群的hosts文件
            // 需要删除外表时开启,可以清空Druid数据: SET external.table.purge=true;
            SET hive.druid.metadata.uri=jdbc:mysql://[druid_mysql_ip]:3306/druid?characterEncoding=UTF-8;
            SET hive.druid.metadata.username=[druid_user];
            SET hive.druid.metadata.password=[druid_passwd];
            SET hive.druid.broker.address.default=[druid_broker_ip]:8590;
            SET hive.druid.overlord.address.default=[druid_overlord_ip]:8591;
            SET hive.druid.coordinator.address.default=[druid_coordinator_ip]:8592;
            SET hive.druid.storage.storageDirectory=hdfs://[hive_hdfs_ip]:8020/user/druid/warehouses;
          3. 方法一:先建表再导入数据

            CREATE EXTERNAL TABLE druid_hive_demo_02
            (`__time` TIMESTAMP WITH LOCAL TIME ZONE, userid STRING, moveid STRING, rating INT)
            STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler'
            TBLPROPERTIES (
                "druid.segment.granularity" = "MONTH",
                "druid.query.granularity" = "DAY"
            );
            
            INSERT INTO druid_hive_demo_02
            SELECT
                cast (from_unixtime(cast(unixtime as int)) as timestamp with local time zone)  as `__time`,
                cast(userid as STRING) userid,
                cast(movieid as STRING) moveid,
                cast(rating as INT) rating
            FROM u_data;

            方法二,使用CTAS语句创建Druid表并导入数据

            // 首先打开Hive外表的CTAS开关
            SET hive.ctas.external.tables=true;
            
            CREATE EXTERNAL TABLE druid_hive_demo_02
            STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler'
            TBLPROPERTIES (
                "druid.segment.granularity" = "MONTH",
                "druid.query.granularity" = "DAY")
            AS
            SELECT
                cast (from_unixtime(cast(unixtime as int)) as timestamp with local time zone)  as `__time`,
                cast(userid as STRING) userid,
                cast(movieid as STRING) moveid,
                cast(rating as INT) rating
            FROM u_data;

            如果CTAS语句执行失败,创建的表仍然存在,可以继续使用INSERT语句插入数据,或者删除该表并重新执行。

          4. 查询结果

            hive> SELECT * FROM druid_hive_demo_02 LIMIT 10;
            OK
            1997-09-19 08:00:00.0 Asia/Shanghai	259	1074 3
            1997-09-19 08:00:00.0 Asia/Shanghai	259	108	 4
            1997-09-19 08:00:00.0 Asia/Shanghai	259	117	 4
            1997-09-19 08:00:00.0 Asia/Shanghai	259	173	 4
            1997-09-19 08:00:00.0 Asia/Shanghai	259	176	 4
            1997-09-19 08:00:00.0 Asia/Shanghai	259	185	 4
            1997-09-19 08:00:00.0 Asia/Shanghai	259	200	 4
            1997-09-19 08:00:00.0 Asia/Shanghai	259	210	 4
            1997-09-19 08:00:00.0 Asia/Shanghai	259	255	 4
            1997-09-19 08:00:00.0 Asia/Shanghai	259	286	 4

          示例三:导入Kafka实时数据

          1. 使用百度消息服务BMS创建主题 kafka_demo,并下载证书scp到druid集群各节点,参考BMS文档

          scp -rp kafka-key root@[druid集群的节点ip或hostname]:~/

          1. 创建Hive表,通过属性关联BMS Kafka

            // 获取druid集群的元数据库信息:ssh hdfs@public_ip 'cat /etc/druid/conf/_common/common.runtime.properties | grep "druid.metadata.storage.connector"'
            // 跨集群访问druid集群的mysql,需要使用ip地址,若使用hostname需要更新hive集群的hosts文件
            SET hive.druid.metadata.uri=jdbc:mysql://[druid_mysql_ip]:3306/druid?characterEncoding=UTF-8;
            SET hive.druid.metadata.username=[druid_user];
            SET hive.druid.metadata.password=[druid_passwd];
            SET hive.druid.broker.address.default=[druid_broker_ip]:8590;
            SET hive.druid.overlord.address.default=[druid_overlord_ip]:8591;
            SET hive.druid.coordinator.address.default=[druid_coordinator_ip]:8592;
            SET hive.druid.storage.storageDirectory=hdfs://[hive_hdfs_ip]:8020/user/druid/warehouses;
            // Kafka Supervisor配置前缀是druid.kafka.ingestion
            CREATE EXTERNAL TABLE druid_kafka_demo
            (
                `__time`          TIMESTAMP WITH LOCAL TIME ZONE, 
                `channel`         STRING,
                `cityName`        STRING,
                `comment`         STRING,
                `countryIsoCode`  STRING,
                `isAnonymous`     STRING,
                `isMinor`         STRING,
                `isNew`           STRING,
                `isRobot`         STRING,
                `isUnpatrolled`   STRING,
                `metroCode`       STRING,
                `namespace`       STRING,
                `page`            STRING,
                `regionIsoCode`   STRING,
                `regionName`      STRING,
                `added`           INT,
                `user`            STRING,
                `deleted`         INT,
                `delta`           INT
            )
            STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler'
            TBLPROPERTIES (
                "kafka.bootstrap.servers" = "[kafka_address]:[kafka_port]",
                "kafka.security.protocol" = "SSL",
                "kafka.ssl.truststore.password" = "kafka",
                "kafka.ssl.truststore.location" =  "/home/hdfs/kafka-key/client.truststore.jks",
                "kafka.ssl.keystore.location" =  "/home/hdfs/kafka-key/client.keystore.jks",
                "kafka.ssl.keystore.password" =  "******",
                "kafka.topic" = "[account_id]__kafka_demo",
                "druid.kafka.ingestion.useEarliestOffset" = "true",
                "druid.kafka.ingestion.maxRowsInMemory" = "5",
                "druid.kafka.ingestion.startDelay" = "PT1S",
                "druid.kafka.ingestion.period" = "PT1S",
                "druid.kafka.ingestion.taskDuration" = "PT10M",
                "druid.kafka.ingestion.completionTimeout" = "PT20M",
                "druid.kafka.ingestion.consumer.retries" = "2"
            );

            如果使用的不是开启了SSL的Kafka,可移除ssl相关配置,例如BMR集群里的Kafka

          2. 启动Kafka生产者发送数据

            sh bin/kafka-console-producer.sh --producer.config client.properties --topic druid_kafka_demo --sync --broker-list [kafka_address]:[kafka_port]
            
            {"__time":"2015-09-12T23:58:31.643Z","channel":"#en.wikipedia","cityName":null,"comment":"Notification: tagging for deletion of [[File:Axintele team.gif]]. ([[WP:TW|TW]])","countryIsoCode":null,"countryName":null,"isAnonymous":false,"isMinor":false,"isNew":false,"isRobot":false,"isUnpatrolled":false,"metroCode":null,"namespace":"User talk","page":"User talk:AlexGeorge33","regionIsoCode":null,"regionName":null,"user":"Sir Sputnik","delta":1921,"added":1921,"deleted":0}
            {"__time":"2015-09-12T23:58:33.743Z","channel":"#vi.wikipedia","cityName":null,"comment":"clean up using [[Project:AWB|AWB]]","countryIsoCode":null,"countryName":null,"isAnonymous":false,"isMinor":false,"isNew":false,"isRobot":true,"isUnpatrolled":false,"metroCode":null,"namespace":"Main","page":"Codiaeum finisterrae","regionIsoCode":null,"regionName":null,"user":"ThitxongkhoiAWB","delta":18,"added":18,"deleted":0}
            {"__time":"2015-09-12T23:58:35.732Z","channel":"#en.wikipedia","cityName":null,"comment":"Put info into infobox, succession box, split sections a bit","countryIsoCode":null,"countryName":null,"isAnonymous":false,"isMinor":false,"isNew":false,"isRobot":false,"isUnpatrolled":false,"metroCode":null,"namespace":"Main","page":"Jamel Holley","regionIsoCode":null,"regionName":null,"user":"Mr. Matté","delta":3427,"added":3427,"deleted":0}
            {"__time":"2015-09-12T23:58:38.531Z","channel":"#uz.wikipedia","cityName":null,"comment":"clean up, replaced: ozbekcha → oʻzbekcha, olchami → oʻlchami (3) using [[Project:AWB|AWB]]","countryIsoCode":null,"countryName":null,"isAnonymous":false,"isMinor":true,"isNew":false,"isRobot":true,"isUnpatrolled":false,"metroCode":null,"namespace":"Main","page":"Chambors","regionIsoCode":null,"regionName":null,"user":"Ximik1991Bot","delta":8,"added":8,"deleted":0}
            {"__time":"2015-09-12T23:58:41.619Z","channel":"#it.wikipedia","cityName":null,"comment":"/* Gruppo Discovery Italia */","countryIsoCode":null,"countryName":null,"isAnonymous":false,"isMinor":false,"isNew":false,"isRobot":false,"isUnpatrolled":true,"metroCode":null,"namespace":"Main","page":"Deejay TV","regionIsoCode":null,"regionName":null,"user":"Ciosl","delta":4,"added":4,"deleted":0}
            {"__time":"2015-09-12T23:58:43.304Z","channel":"#en.wikipedia","cityName":null,"comment":"/* Plot */","countryIsoCode":null,"countryName":null,"isAnonymous":false,"isMinor":false,"isNew":false,"isRobot":false,"isUnpatrolled":false,"metroCode":null,"namespace":"Main","page":"Paper Moon (film)","regionIsoCode":null,"regionName":null,"user":"Siddharth Mehrotra","delta":0,"added":0,"deleted":0}
            {"__time":"2015-09-12T23:58:46.732Z","channel":"#en.wikipedia","cityName":null,"comment":"/* Jeez */ delete (also fixed Si Trew's syntax error, as I think he meant to make a link there)","countryIsoCode":null,"countryName":null,"isAnonymous":false,"isMinor":false,"isNew":false,"isRobot":false,"isUnpatrolled":false,"metroCode":null,"namespace":"Wikipedia","page":"Wikipedia:Redirects for discussion/Log/2015 September 12","regionIsoCode":null,"regionName":null,"user":"JaykeBird","delta":293,"added":293,"deleted":0}
            {"__time":"2015-09-12T23:58:48.729Z","channel":"#fr.wikipedia","cityName":null,"comment":"MàJ","countryIsoCode":null,"countryName":null,"isAnonymous":false,"isMinor":false,"isNew":false,"isRobot":false,"isUnpatrolled":false,"metroCode":null,"namespace":"Wikipédia","page":"Wikipédia:Bons articles/Nouveau","regionIsoCode":null,"regionName":null,"user":"Gemini1980","delta":76,"added":76,"deleted":0}
            {"__time":"2015-09-12T23:58:51.785Z","channel":"#vi.wikipedia","cityName":null,"comment":"clean up using [[Project:AWB|AWB]]","countryIsoCode":null,"countryName":null,"isAnonymous":false,"isMinor":false,"isNew":false,"isRobot":true,"isUnpatrolled":false,"metroCode":null,"namespace":"Main","page":"Amata yezonis","regionIsoCode":null,"regionName":null,"user":"ThitxongkhoiAWB","delta":10,"added":10,"deleted":0}
            {"__time":"2015-09-12T23:58:53.898Z","channel":"#vi.wikipedia","cityName":null,"comment":"clean up using [[Project:AWB|AWB]]","countryIsoCode":null,"countryName":null,"isAnonymous":false,"isMinor":false,"isNew":false,"isRobot":true,"isUnpatrolled":false,"metroCode":null,"namespace":"Main","page":"Codia triverticillata","regionIsoCode":null,"regionName":null,"user":"ThitxongkhoiAWB","delta":36,"added":36,"deleted":0}
          3. 可以通过DDL控制Kafka导入数据的启停

            // 只有druid.kafka.ingestion为START,Hive才能够提交实时任务到Druid
            ALTER TABLE druid_kafka_demo SET TBLPROPERTIES('druid.kafka.ingestion' = 'START');
            ALTER TABLE druid_kafka_demo SET TBLPROPERTIES('druid.kafka.ingestion' = 'STOP');
            ALTER TABLE druid_kafka_demo SET TBLPROPERTIES('druid.kafka.ingestion' = 'RESET');
          4. 当第一个Segment上传成功,Coordinator显示出数据源后即可使用Hive查询数据,见示例一
          上一篇
          Flink
          下一篇
          Impala