/**
 * Appends {@code record} to the tail of {@code unsentRecords}, blocking while the
 * buffer is full.
 *
 * <p>If {@code bufferedRecords()} has reached {@code maxBufferedRecords}, the calling
 * thread waits on this object's monitor until either space becomes available,
 * {@code isTerminal()} becomes true, or {@code timeoutMs} milliseconds have elapsed
 * (measured with {@code time.milliseconds()}).
 *
 * @param record    the record to enqueue
 * @param timeoutMs maximum time, in milliseconds, to wait for buffer space; a value
 *                  of zero or less means no waiting is performed when the buffer is full
 * @throws ConnectException if the thread is interrupted while waiting (the interrupt
 *                          status is restored before throwing), or if the buffer is
 *                          still full once the wait loop ends
 */
// NOTE(review): throwIfTerminal() is defined elsewhere; presumably it throws when this
// object has reached a terminal (stopped/failed) state — confirm its exception type.
public synchronized void add(R record, long timeoutMs) {
    throwIfTerminal();
    if (bufferedRecords() >= maxBufferedRecords) {
        final long addStartTimeMs = time.milliseconds();
        for (long elapsedMs = time.milliseconds() - addStartTimeMs;
             !isTerminal() && elapsedMs < timeoutMs && bufferedRecords() >= maxBufferedRecords;
             elapsedMs = time.milliseconds() - addStartTimeMs) {
            try {
                // wait() releases the monitor on `this` while blocked and re-acquires it
                // before returning. The loop condition guarantees the remaining time is
                // strictly positive, so this never degenerates into wait(0) (wait forever).
                wait(timeoutMs - elapsedMs);
            } catch (InterruptedException e) {
                // Restore the interrupt status so code further up the stack can still
                // observe that this thread was asked to stop.
                Thread.currentThread().interrupt();
                throw new ConnectException(e);
            }
        }
        // Re-check after waiting: the state may have become terminal in the meantime.
        throwIfTerminal();
        if (bufferedRecords() >= maxBufferedRecords) {
            throw new ConnectException("Add timeout expired before buffer availability");
        }
    }
    unsentRecords.addLast(record);
    // Wake every thread currently blocked in wait() on this object.
    notifyAll();
}