You need the tf2-rapids-py39 conda environment for this example to work; refer to the conda page for details.
### Change the visible GPU in case one of them is full
# import os
# os.environ["CUDA_DEVICE_ORDER"]="PCI_BUS_ID"
# os.environ["CUDA_VISIBLE_DEVICES"]="1"
GPU Accelerated Dataframes
Check the available GPUs
Jupyter Magic Commands
!nvidia-smi
We will use the %time
magic command to measure the time it takes to run a single command, and we will use %%time
to measure the runtime of the entire cell.
from time import sleep
%time sleep(2)
%%time
sleep(2)
sleep(1)
GPU Dataframes
Reading Data
import cudf # Can be used in place of pandas in almost all cases
import cupy as cp # Can be used in place of numpy in almost all cases
import pandas as pd
import numpy as np
%time gdf = cudf.read_csv('/data/datasets/rapids/pop_1-03.csv')
%time df = pd.read_csv('/data/datasets/rapids/pop_1-03.csv')
gdf.shape
df.shape
Writing Data
GPU
%time blackpool_residents = gdf.loc[gdf['county'] == 'BLACKPOOL']
print(f'{blackpool_residents.shape[0]} residents')
%time blackpool_residents.to_csv('/data/datasets/rapids/blackpool.csv')
CPU
%time blackpool_residents_pd = df.loc[df['county'] == 'BLACKPOOL']
print(f'{blackpool_residents_pd.shape[0]} residents')
%time blackpool_residents_pd.to_csv('/data/datasets/rapids/blackpool_pd.csv')
Data Filtering
GPU
%%time
# Select Sunderland residents, find the northernmost one, then list every
# county that has at least one resident further north than that.
# NOTE(review): this runs BEFORE the title-casing step below, and earlier
# cells show raw county values in upper case ('BLACKPOOL') — confirm that
# 'Sunderland' actually matches rows here and does not yield an empty frame.
sunderland_residents = gdf.loc[gdf['county'] == 'Sunderland']
northmost_sunderland_lat = sunderland_residents['lat'].max()
counties_with_pop_north_of = gdf.loc[gdf['lat'] > northmost_sunderland_lat]['county'].unique()
CPU
%%time
# CPU (pandas) version of the same query, for timing comparison.
# NOTE(review): as with the GPU version, this runs before the title-casing
# step, so 'Sunderland' may not match the raw upper-case values — confirm.
sunderland_residents = df.loc[df['county'] == 'Sunderland']
northmost_sunderland_lat = sunderland_residents['lat'].max()
counties_with_pop_north_of = df.loc[df['lat'] > northmost_sunderland_lat]['county'].unique()
Data Cleaning
%time gdf['county'] = gdf['county'].str.title()
%time df['county'] = df['county'].str.title()
GPU Accelerated Graphs
import cugraph as cg # Can be used inplace of networkx
import networkx as nx
Reading the nodes
# Load the road-node table onto the GPU and take a first look at it.
road_nodes = cudf.read_csv('/data/datasets/rapids/road_nodes_1-06.csv')
road_nodes.head()
road_nodes.dtypes
road_nodes.shape
road_nodes['type'].unique()  # distinct node types present in the data
Reading the edges
# Load the road-edge table onto the GPU and take a first look at it.
road_edges = cudf.read_csv('/data/datasets/rapids/road_edges_1-06.csv')
road_edges.head()
road_edges.dtypes
road_edges.shape
road_edges['type'].unique()  # distinct edge types
road_edges['form'].unique()  # distinct road forms
Cleaning the data
# Node ids in the edge list carry a '#' prefix; strip it so they match the
# ids in road_nodes. Note lstrip removes ALL leading '#' characters, not
# just one — assumes ids have a single '#' prefix; TODO confirm.
road_edges['src_id'] = road_edges['src_id'].str.lstrip('#')
road_edges['dst_id'] = road_edges['dst_id'].str.lstrip('#')
road_edges[['src_id', 'dst_id']].head()
Creating the graph objects
G = cg.Graph()
%time G.from_cudf_edgelist(road_edges, source='src_id', destination='dst_id', edge_attr='length')
road_edges_cpu = road_edges.to_pandas()
%time G_cpu = nx.convert_matrix.from_pandas_edgelist(road_edges_cpu, source='src_id', target='dst_id', edge_attr='length')
Multiple GPUs using Dask in one Node
import subprocess # we will use this to obtain our local IP using the following command

# `hostname --all-ip-addresses` prints the host's IP addresses separated by
# spaces; we keep the first one.
cmd = "hostname --all-ip-addresses"
# subprocess.run waits for the command and captures both streams in one call.
# (The previous Popen/communicate pair never piped stderr, so its `error`
# variable was always None.)
result = subprocess.run(cmd.split(), capture_output=True, text=True)
IPADDR = result.stdout.split()[0]
Start the cluster scheduler. If you are on UOB network or using tunneling you can visit the cluster status page.
from dask_cuda import LocalCUDACluster
# Start a local Dask cluster; binding it to our IP address so the
# scheduler/dashboard is reachable over the network.
cluster = LocalCUDACluster(ip=IPADDR)
cluster
# from dask.distributed import SSHCluster
# cluster = SSHCluster(["master", "cn01", "cn02"])
# cluster
from dask.distributed import Client, progress
client = Client(cluster)
!ls -sh /data/datasets/rapids/pop5x_1-07.csv
import dask_cudf # dask distributed dataframe object
# NOTE(review): the shell listing above inspects pop5x_1-07.csv but this
# reads pop_1-03.csv — confirm which dataset is intended.
ddf = dask_cudf.read_csv('/data/datasets/rapids/pop_1-03.csv', dtype=['float32', 'str', 'str', 'float32', 'float32', 'str'])
We can see that the data on both GPUs does not correspond to the total dataset size. That is because Dask creates the task graph first and then executes it when it is needed.
!nvidia-smi
The visualisation will show the parallel task graph and the assigned tasks for each partition.
ddf.visualize(format='svg') # This visualization is very large, and using `format='svg'` will make it easier to view.
ddf.npartitions
mean_age = ddf['age'].mean()
mean_age.visualize(format='svg')
The following line will execute the graph.
mean_age.compute()
!nvidia-smi
And this will persist the data on the GPU.
ddf = ddf.persist()
!nvidia-smi
This shows that we no longer have operations on our dask graph.
ddf.visualize(format='svg')
And since we persisted the data, performing computations on the dataset is much faster from now on.
ddf['age'].mean().compute()
ddf.head() # As a convenience, no need to `.compute` the `head()` method
ddf.count().compute()
ddf.dtypes