[Image classification models] #deeplearning
source: https://www.kaggle.com/knowledgegrappler/a-keras-prototype-0-21174-on-pl/code
from matplotlib import pyplot
from keras.preprocessing.image import ImageDataGenerator
from keras.models import Sequential
from keras.layers import Conv2D, MaxPooling2D, Dense, Dropout, Input, Flatten
from keras.layers import GlobalMaxPooling2D
from keras.layers.normalization import BatchNormalization
from keras.layers.merge import Concatenate
from keras.models import Model
from keras.optimizers import Adam
from keras.callbacks import ModelCheckpoint, Callback, EarlyStopping
def get_callbacks(filepath, patience=2):
es = EarlyStopping('val_loss', patience=patience, mode="min")
msave = ModelCheckpoint(filepath, save_best_only=True)
return [es, msave]
def get_model():
bn_model = 0
p_activation = "elu"
input_1 = Input(shape=(75, 75, 3), name="X_1")
input_2 = Input(shape=[1], name="angle")
img_1 = Conv2D(16, kernel_size = (3,3), activation=p_activation) ((BatchNormalization(momentum=bn_model))(input_1))
img_1 = Conv2D(16, kernel_size = (3,3), activation=p_activation) (img_1)
img_1 = MaxPooling2D((2,2)) (img_1)
img_1 = Dropout(0.2)(img_1)
img_1 = Conv2D(32, kernel_size = (3,3), activation=p_activation) (img_1)
img_1 = Conv2D(32, kernel_size = (3,3), activation=p_activation) (img_1)
img_1 = MaxPooling2D((2,2)) (img_1)
img_1 = Dropout(0.2)(img_1)
img_1 = Conv2D(64, kernel_size = (3,3), activation=p_activation) (img_1)
img_1 = Conv2D(64, kernel_size = (3,3), activation=p_activation) (img_1)
img_1 = MaxPooling2D((2,2)) (img_1)
img_1 = Dropout(0.2)(img_1)
img_1 = Conv2D(128, kernel_size = (3,3), activation=p_activation) (img_1)
img_1 = MaxPooling2D((2,2)) (img_1)
img_1 = Dropout(0.2)(img_1)
img_1 = GlobalMaxPooling2D() (img_1)
img_2 = Conv2D(128, kernel_size = (3,3), activation=p_activation) ((BatchNormalization(momentum=bn_model))(input_1))
img_2 = MaxPooling2D((2,2)) (img_2)
img_2 = Dropout(0.2)(img_2)
img_2 = GlobalMaxPooling2D() (img_2)
img_concat = (Concatenate()([img_1, img_2, BatchNormalization(momentum=bn_model)(input_2)]))
dense_ayer = Dropout(0.5) (BatchNormalization(momentum=bn_model) ( Dense(256, activation=p_activation)(img_concat) ))
dense_ayer = Dropout(0.5) (BatchNormalization(momentum=bn_model) ( Dense(64, activation=p_activation)(dense_ayer) ))
output = Dense(1, activation="sigmoid")(dense_ayer)
model = Model([input_1,input_2], output)
optimizer = Adam(lr=0.001, beta_1=0.9, beta_2=0.999, epsilon=1e-08, decay=0.0)
model.compile(loss="binary_crossentropy", optimizer=optimizer, metrics=["accuracy"])
return model
model = get_model()
model.summary()
file_path = ".model_weights.hdf5"
callbacks = get_callbacks(filepath=file_path, patience=5)
model = get_model()
model.fit([X_train, X_angle_train], y_train, epochs=100
, validation_data=([X_valid, X_angle_valid], y_valid)
, batch_size=32
, callbacks=callbacks)
#Load data
train = pd.read_json("../input/train.json")
test = pd.read_json("../input/test.json")
train.inc_angle = train.inc_angle.replace('na', 0)
train.inc_angle = train.inc_angle.astype(float).fillna(0.0)
print("done!")
done!
# Train data
x_band1 = np.array([np.array(band).astype(np.float32).reshape(75, 75) for band in train["band_1"]])
x_band2 = np.array([np.array(band).astype(np.float32).reshape(75, 75) for band in train["band_2"]])
X_train = np.concatenate([x_band1[:, :, :, np.newaxis]
, x_band2[:, :, :, np.newaxis]
, ((x_band1+x_band1)/2)[:, :, :, np.newaxis]], axis=-1)
X_angle_train = np.array(train.inc_angle)
y_train = np.array(train["is_iceberg"])
# Test data
x_band1 = np.array([np.array(band).astype(np.float32).reshape(75, 75) for band in test["band_1"]])
x_band2 = np.array([np.array(band).astype(np.float32).reshape(75, 75) for band in test["band_2"]])
X_test = np.concatenate([x_band1[:, :, :, np.newaxis]
, x_band2[:, :, :, np.newaxis]
, ((x_band1+x_band1)/2)[:, :, :, np.newaxis]], axis=-1)
X_angle_test = np.array(test.inc_angle)
X_train, X_valid, X_angle_train, X_angle_valid, y_train, y_valid = train_test_split(X_train
, X_angle_train, y_train, random_state=123, train_size=0.75)
source: https://www.kaggle.com/devm2024/transfer-learning-with-vgg-16-cnn-aug-lb-0-1712
Overview of the following core functions
# Fits the model on data generated batch-by-batch by a Python generator.
# The generator is run in parallel to the model, for efficiency. For instance, this allows you to do real-time data augmentation on images on CPU in parallel to training your model on GPU.
callbacks = get_callbacks(filepath=file_path, patience=5)
gen_flow = gen_flow_for_two_inputs(X_train_cv, X_angle_cv, y_train_cv)
galaxyModel= getVggAngleModel()
galaxyModel.fit_generator(
gen_flow,
steps_per_epoch=24,
epochs=100,
shuffle=True,
verbose=1,
validation_data=([X_holdout,X_angle_hold], Y_holdout),
callbacks=callbacks)
def get_callbacks(filepath, patience=2):
es = EarlyStopping('val_loss', patience=10, mode="min")
msave = ModelCheckpoint(filepath, save_best_only=True)
return [es, msave]
from keras.preprocessing.image import ImageDataGenerator
batch_size=64
# Define the image transformations here
gen = ImageDataGenerator(horizontal_flip = True,
vertical_flip = True,
width_shift_range = 0.,
height_shift_range = 0.,
channel_shift_range=0,
zoom_range = 0.2,
rotation_range = 10)
# We use the exact same generator with the same random seed for both the y and angle arrays
def gen_flow_for_two_inputs(X1, X2, y):
genX1 = gen.flow(X1,y, batch_size=batch_size,seed=55)
genX2 = gen.flow(X1,X2, batch_size=batch_size,seed=55)
while True:
X1i = genX1.next()
X2i = genX2.next()
#Assert arrays are equal - this was for peace of mind, but slows down training
#np.testing.assert_array_equal(X1i[0],X2i[0])
yield [X1i[0], X2i[1]], X1i[1]
def getVggAngleModel():
input_2 = Input(shape=[1], name="angle")
angle_layer = Dense(1, )(input_2)
base_model = VGG16(weights='imagenet', include_top=False,
input_shape=X_train.shape[1:], classes=1)
x = base_model.get_layer('block5_pool').output
x = GlobalMaxPooling2D()(x)
merge_one = concatenate([x, angle_layer])
merge_one = Dense(512, activation='relu', name='fc2')(merge_one)
merge_one = Dropout(0.3)(merge_one)
merge_one = Dense(512, activation='relu', name='fc3')(merge_one)
merge_one = Dropout(0.3)(merge_one)
predictions = Dense(1, activation='sigmoid')(merge_one)
model = Model(input=[base_model.input, input_2], output=predictions)
sgd = SGD(lr=1e-3, decay=1e-6, momentum=0.9, nesterov=True)
model.compile(loss='binary_crossentropy',
optimizer=sgd,
metrics=['accuracy'])
return model
#Using K-fold Cross Validation with Data Augmentation.
def myAngleCV(X_train, X_angle, X_test):
K=3
folds = list(StratifiedKFold(n_splits=K, shuffle=True, random_state=16).split(X_train, target_train))
y_test_pred_log = 0
y_train_pred_log=0
y_valid_pred_log = 0.0*target_train
for j, (train_idx, test_idx) in enumerate(folds):
print('\n===================FOLD=',j)
X_train_cv = X_train[train_idx]
y_train_cv = target_train[train_idx]
X_holdout = X_train[test_idx]
Y_holdout= target_train[test_idx]
#Angle
X_angle_cv=X_angle[train_idx]
X_angle_hold=X_angle[test_idx]
#define file path and get callbacks
file_path = "%s_aug_model_weights.hdf5"%j
callbacks = get_callbacks(filepath=file_path, patience=5)
gen_flow = gen_flow_for_two_inputs(X_train_cv, X_angle_cv, y_train_cv)
galaxyModel= getVggAngleModel()
galaxyModel.fit_generator(
gen_flow,
steps_per_epoch=24,
epochs=100,
shuffle=True,
verbose=1,
validation_data=([X_holdout,X_angle_hold], Y_holdout),
callbacks=callbacks)
#Getting the Best Model
galaxyModel.load_weights(filepath=file_path)
#Getting Training Score
score = galaxyModel.evaluate([X_train_cv,X_angle_cv], y_train_cv, verbose=0)
print('Train loss:', score[0])
print('Train accuracy:', score[1])
#Getting Test Score
score = galaxyModel.evaluate([X_holdout,X_angle_hold], Y_holdout, verbose=0)
print('Test loss:', score[0])
print('Test accuracy:', score[1])
#Getting validation Score.
pred_valid=galaxyModel.predict([X_holdout,X_angle_hold])
y_valid_pred_log[test_idx] = pred_valid.reshape(pred_valid.shape[0])
#Getting Test Scores
temp_test=galaxyModel.predict([X_test, X_test_angle])
y_test_pred_log+=temp_test.reshape(temp_test.shape[0])
#Getting Train Scores
temp_train=galaxyModel.predict([X_train, X_angle])
y_train_pred_log+=temp_train.reshape(temp_train.shape[0])
y_test_pred_log=y_test_pred_log/K
y_train_pred_log=y_train_pred_log/K
print('\n Train Log Loss Validation= ',log_loss(target_train, y_train_pred_log))
print(' Test Log Loss Validation= ',log_loss(target_train, y_valid_pred_log))
return y_test_pred_log
preds=myAngleCV(X_train, X_angle, X_test)