Blackhole Taxi Fare Prediction Case Study

Using Blackhole to Predict Taxi Fares


This case uses New York City yellow taxi data from 2009 to 2019 (about 75 GB in total) to predict the fare a taxi trip will cost.

Original implementation: https://github.com/rapidsai-community/notebooks-contrib/blob/branch-0.12/intermediate_notebooks/E2E/taxi/NYCTaxi-E2E.ipynb

Preparing the Blackhole Environment

The CodeLab platform does not install Blackhole by default. Please install blackhole first from the "Package Management" page in the left navigation.
For more on how to use blackhole and further examples, see "Blackhole Introduction and Basic Usage".
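Once the installation finishes, a quick way to confirm the package is importable is shown below. This is only a minimal sanity check; the __version__ attribute is an assumption (many, but not all, packages expose it), so the snippet falls back to printing the module itself.

import blackhole
# Print the installed version if the package exposes one; otherwise just confirm the import worked.
print(getattr(blackhole, '__version__', blackhole))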

Preparing the Dataset

This case analyzes and trains on a subset of the data from 2005 to 2006. With different user scenarios in mind, we provide two datasets:
Small dataset: a 10,000-row sample (about 1.5 MB), so you can quickly try out Blackhole in the local environment.
Large dataset: about 70 million rows (roughly 10 GB), so you can experience how Blackhole handles large data volumes via local-to-cloud synchronization.

## Small dataset path: https://codelab-dataset.cdn.bcebos.com/small/kaggle/taxi.zip
## Large dataset path: https://codelab-dataset.cdn.bcebos.com/full/kaggle/taxi.zip
          ! wget https://codelab-dataset.cdn.bcebos.com/small/kaggle/taxi.zip && unzip -o taxi.zip
          --2021-04-22 05:23:46--  https://codelab-dataset.cdn.bcebos.com/small/kaggle/taxi.zip
          Resolving codelab-dataset.cdn.bcebos.com (codelab-dataset.cdn.bcebos.com)... 123.125.132.35
          Connecting to codelab-dataset.cdn.bcebos.com (codelab-dataset.cdn.bcebos.com)|123.125.132.35|:443... connected.
          HTTP request sent, awaiting response... 200 OK
          Length: 443110 (433K) [application/zip]
          Saving to: 'taxi.zip'
          
          taxi.zip            100%[===================>] 432.72K  --.-KB/s    in 0.09s   
          
          2021-04-22 05:23:46 (4.73 MB/s) - 'taxi.zip' saved [443110/443110]
          
          Archive:  taxi.zip
             creating: taxi/
            inflating: taxi/taxi_train.csv     
            inflating: taxi/taxi_test.csv      
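To work with the large dataset instead, download the full archive with the same command, swapping in the full-dataset URL from the comment above (the layout inside the full archive is assumed to mirror the small one):

## Large-dataset variant of the download step (assumes the same taxi/ directory layout inside the zip)
! wget https://codelab-dataset.cdn.bcebos.com/full/kaggle/taxi.zip && unzip -o taxi.zip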

Step 1: Import Blackhole Dependencies

import math
import os
import warnings

import numpy as np

import blackhole.dataframe as pd
from blackhole.ml.model_selection import train_test_split
from blackhole.ml.ensemble import RandomForestRegressor as bh_RandomForestRegressor

warnings.filterwarnings('ignore')

Step 2: Import the Data

          %%time
          data_dir = './taxi/'
          data_path = os.path.join(data_dir, "taxi_train.csv")
          train = pd.read_csv(data_path)
          CPU times: user 160 ms, sys: 8.28 ms, total: 168 ms
          Wall time: 15.3 ms
          train.info()
          <class 'blackhole.dataframe.frame.DataFrame'>
          Index: 9999 entries, 0 to 9998
          Data columns (total 19 columns):
           #   Column                 Non-Null Count  Dtype  
          ---  ------                 --------------  -----  
           0   VendorID               9999 non-null   int32  
           1   tpep_pickup_datetime   9999 non-null   object 
           2   tpep_dropoff_datetime  9999 non-null   object 
           3   passenger_count        9999 non-null   int32  
           4   trip_distance          9999 non-null   float64
           5   pickup_longitude       9999 non-null   float64
           6   pickup_latitude        9999 non-null   float64
           7   RatecodeID             9999 non-null   int32  
           8   store_and_fwd_flag     9999 non-null   object 
           9   dropoff_longitude      9999 non-null   float64
           10  dropoff_latitude       9999 non-null   float64
           11  payment_type           9999 non-null   int32  
           12  fare_amount            9999 non-null   float64
           13  extra                  9999 non-null   float64
           14  mta_tax                9999 non-null   float64
           15  tip_amount             9999 non-null   float64
           16  tolls_amount           9999 non-null   float64
           17  improvement_surcharge  9999 non-null   float64
           18  total_amount           9999 non-null   float64
          dtypes: float64(12), int32(4), object(3)
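Before cleaning, it helps to glance at a few raw rows. blackhole.dataframe is used with a pandas-like API throughout this walkthrough (read_csv, info, rename, astype, ...), so the pandas-style head() call below is assumed to be available as well.

# Peek at the first rows of the raw data (pandas-style head() is assumed to exist).
train.head()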

Step 3: Data Analysis and Processing

def clean_data(data, remap, must_haves):
    """Clean the raw data.

    :param data: the raw data to clean
    :param remap: mapping from original column names to new names
    :param must_haves: the columns (and target dtypes) to keep
    :return: the cleaned data
    """
    # lowercase column names
    tmp = {col: col.strip().lower() for col in list(data.columns)}
    data = data.rename(columns=tmp)
    # rename
    data = data.rename(columns=remap)
    # drop columns that are not needed
    for col in data.columns:
        if col not in must_haves:
            data = data.drop(col, axis=1)
            continue

        # parse datetime columns
        if data[col].dtype == 'object' and col in ['pickup_datetime', 'dropoff_datetime']:
            data[col] = data[col].astype('datetime64')
            continue

        # cast remaining string columns to float & handle missing values (fillna)
        if data[col].dtype == 'object':
            data[col] = data[col].str.fillna('-1')
            data[col] = data[col].astype('float32')
        else:
            # downcast from 64-bit to 32-bit (or 16-bit for integers)
            if 'int' in str(data[col].dtype):
                data[col] = data[col].astype('int16')
            if 'float' in str(data[col].dtype):
                data[col] = data[col].astype('float32')
            data[col] = data[col].fillna(-1)
    return data
          
def filter_outliers(data):
    """Filter out outlier rows.

    :param data: the data to filter
    :return: the filtered data
    """
    res = data[(data["fare_amount"] > 0) & (data["fare_amount"] < 500) &
               (data["passenger_count"] > 0) & (data["passenger_count"] < 6) &
               (data["pickup_longitude"] != 0) & (data["pickup_latitude"] != 0) &
               (data["dropoff_longitude"] != 0) & (data["dropoff_latitude"] != 0)]
    return res
          
def haversine_distance_kernel(data):
    """Compute the haversine (great-circle) distance of each trip.

    :param data: frame with pickup/dropoff latitude and longitude columns
    :return: data with a new h_distance column (distance in kilometers)
    """
    # convert degrees to radians
    x_1 = math.pi / 180 * data['pickup_latitude']
    y_1 = math.pi / 180 * data['pickup_longitude']
    x_2 = math.pi / 180 * data['dropoff_latitude']
    y_2 = math.pi / 180 * data['dropoff_longitude']
    dlon = y_2 - y_1
    dlat = x_2 - x_1
    # haversine formula
    a = np.sin(dlat / 2) ** 2 + np.cos(x_1) * np.cos(x_2) * np.sin(dlon / 2) ** 2
    c = 2 * np.arcsin(np.sqrt(a))
    r = 6371  # mean Earth radius in kilometers
    data['h_distance'] = c * r
    return data
          
          
def day_of_the_week_kernel(data):
    """Derive the day of the week from the day/month/year columns.

    :param data: frame with day, month and year columns built from pickup_datetime
    :return: data with a new day_of_week column
    """
    # Zeller-style congruence: January and February are shifted into the previous year.
    # shift equals the month for January/February and 0 otherwise (vectorized).
    shift = data['month'] * (data['month'] < 3)
    Y = data['year'] - (data['month'] < 3)
    y = Y - 2000
    c = 20
    d = data['day']
    m = data['month'] + shift + 1
    data['day_of_week'] = (d + np.floor(m * 2.6) + y + (y // 4) + (c // 4) - 2 * c) % 7
    return data
          
def add_features(data):
    """Add new features.

    :param data: the data to add new features to
    :return: the data with all features added
    """
    data['hour'] = data['pickup_datetime'].dt.hour.astype('int16')
    data['year'] = data['pickup_datetime'].dt.year.astype('int16')
    data['month'] = data['pickup_datetime'].dt.month.astype('int16')
    data['day'] = data['pickup_datetime'].dt.day.astype('int16')
    data = data.drop('pickup_datetime', axis=1)
    data = data.drop('dropoff_datetime', axis=1)
    data = haversine_distance_kernel(data)
    data = day_of_the_week_kernel(data)
    data['is_weekend'] = data['day_of_week'] < 2
    data['is_weekend'] = data['is_weekend'].astype('int16')
    data['passenger_count'] = data['passenger_count'].astype('int16')
    data['rate_code'] = data['rate_code'].astype('int16')
    return data
%%time
remap = {
    'tpep_pickup_datetime': 'pickup_datetime',
    'tpep_dropoff_datetime': 'dropoff_datetime',
    'ratecodeid': 'rate_code',
}
# required columns & dtypes
must_haves = {
    'pickup_datetime': 'object',
    'dropoff_datetime': 'object',
    'passenger_count': 'int16',
    'trip_distance': 'float32',
    'pickup_longitude': 'float32',
    'pickup_latitude': 'float32',
    'rate_code': 'int16',
    'dropoff_longitude': 'float32',
    'dropoff_latitude': 'float32',
    'fare_amount': 'float32',
}
# clean the data
train = clean_data(train, remap, must_haves)
# filter out outliers
train = filter_outliers(train)
# build new features
train_data = add_features(train)
          CPU times: user 2.12 s, sys: 8.61 ms, total: 2.13 s
          Wall time: 102 ms
          train_data.info()
          <class 'blackhole.dataframe.frame.DataFrame'>
          Index: 9485 entries, 0 to 9998
          Data columns (total 15 columns):
           #   Column             Non-Null Count  Dtype  
          ---  ------             --------------  -----  
           0   passenger_count    9485 non-null   int16  
           1   trip_distance      9485 non-null   float32
           2   pickup_longitude   9485 non-null   float32
           3   pickup_latitude    9485 non-null   float32
           4   rate_code          9485 non-null   int16  
           5   dropoff_longitude  9485 non-null   float32
           6   dropoff_latitude   9485 non-null   float32
           7   fare_amount        9485 non-null   float32
           8   hour               9485 non-null   int16  
           9   year               9485 non-null   int16  
           10  month              9485 non-null   int16  
           11  day                9485 non-null   int16  
           12  h_distance         9485 non-null   float64
           13  day_of_week        9485 non-null   float64
           14  is_weekend         9485 non-null   int16  
          dtypes: float32(6), float64(2), int16(7)
          %%time
          X = train_data[train_data.columns.difference(['fare_amount'])]
          y = train_data['fare_amount']
          X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.1)
          CPU times: user 150 ms, sys: 767 µs, total: 151 ms
          Wall time: 18.1 ms
          X_train.shape, y_train.shape, X_test.shape, y_test.shape
          ((8536, 14), (8536,), (949, 14), (949,))

Step 4: Model Training

%%time
params = {
    'n_estimators': 20,
    'max_depth': 12,
    'mtries': 12,
    'stopping_rounds': 2,
    'stopping_tolerance': 0.01,
    'stopping_metric': 'rmse',
}
rf = bh_RandomForestRegressor(**params)
rf.fit(X_train, y_train)
          Parse progress: |█████████████████████████████████████████████████████████| 100%
          Parse progress: |█████████████████████████████████████████████████████████| 100%
          drf Model Build progress: |███████████████████████████████████████████████| 100%
          CPU times: user 30.6 s, sys: 818 ms, total: 31.4 s
          Wall time: 2.61 s

Step 5: Prediction and Evaluation

          %%time
          from blackhole.ml.metrics import mean_squared_error, r2_score
          
def regressor_evaluate(y_true, y_pred):
    """Evaluate regression predictions: return (rmse, r2)."""
    mse = mean_squared_error(y_true, y_pred)
    rmse = np.sqrt(mse)
    r2 = r2_score(y_true, y_pred, multioutput='variance_weighted')
    return rmse, r2
          
          y_pred = rf.predict(X_test)
          rmse, r2 = regressor_evaluate(y_test, y_pred)
          Parse progress: |█████████████████████████████████████████████████████████| 100%
          drf prediction progress: |████████████████████████████████████████████████| 100%
          Export File progress: |███████████████████████████████████████████████████| 100%
          CPU times: user 6.49 s, sys: 378 ms, total: 6.87 s
          Wall time: 888 ms
          rmse, r2
          (3.262973716501928, 0.8988329694941469)
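The archive also contains taxi/taxi_test.csv, which the steps above never touch. As a hedged sketch (it assumes the test file shares the training file's raw schema, which is not verified here), the same cleaning and feature pipeline can be reused to score it:

# Hedged sketch: score the held-out file with the same preprocessing pipeline.
# Assumes taxi/taxi_test.csv has the same raw columns as taxi_train.csv.
# filter_outliers is skipped so that every test row gets a prediction.
test = pd.read_csv(os.path.join(data_dir, "taxi_test.csv"))
test = clean_data(test, remap, must_haves)
test_data = add_features(test)
# keep the same feature set used for training (everything except the target)
test_pred = rf.predict(test_data[test_data.columns.difference(['fare_amount'])])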

Step 6: Export the Model

from blackhole.ml import save_model
import shutil
## When running in the cloud, outputs must be written under the directory given by the OUTPUT_PATH environment variable
output_path = os.getenv('OUTPUT_PATH', "./")  # output directory
model_path = os.path.join(output_path, 'bh_taxi_model')  # where the model will be saved
# if the model path already exists, remove it so the saved model can be refreshed
if os.path.exists(model_path):
    shutil.rmtree(model_path)
save_model(rf, model_path)
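To reuse the exported model later, it would typically be loaded back from model_path and applied with predict. The load_model name below is an assumption made to mirror save_model; check the Blackhole documentation for the actual loading API.

# Hedged sketch: reload the exported model and reuse it.
# load_model is assumed here as the counterpart of save_model; verify the real API name.
from blackhole.ml import load_model

restored_rf = load_model(model_path)
restored_pred = restored_rf.predict(X_test)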