xiaobai5150
1/1/2021 - 6:38 AM

Lock: wait & notify

    /**
     * Appends {@code record} to the tail of the unsent-record queue, blocking while the
     * buffer is at or above {@code maxBufferedRecords}.
     *
     * <p>The method is {@code synchronized}, so the wait/notify calls below operate on this
     * object's monitor. While the buffer is full the caller waits on that monitor until space
     * is available, the instance reports a terminal state, or {@code timeoutMs} elapses.
     * {@code throwIfTerminal()} is invoked both on entry and after waiting; anything it throws
     * propagates to the caller.
     *
     * @param record    the record to enqueue
     * @param timeoutMs maximum total time, in milliseconds, to wait for buffer space; a
     *                  non-positive value means no waiting is attempted when the buffer is full
     * @throws ConnectException if the buffer is still full once the timeout has expired, or if
     *                          the calling thread is interrupted while waiting (the thread's
     *                          interrupt status is restored before the exception is thrown)
     */
    public synchronized void add(R record, long timeoutMs) {
        throwIfTerminal();

        if (bufferedRecords() >= maxBufferedRecords) {
            final long addStartTimeMs = time.milliseconds();
            // Re-evaluate every condition after each wake-up: wait() may return because of a
            // notifyAll(), a spurious wake-up, or the timeout, and none of these by itself
            // guarantees that buffer space is actually available.
            for (long elapsedMs = time.milliseconds() - addStartTimeMs;
                 !isTerminal() && elapsedMs < timeoutMs && bufferedRecords() >= maxBufferedRecords;
                 elapsedMs = time.milliseconds() - addStartTimeMs) {
                try {
                    // wait() releases the monitor on `this` while blocked and re-acquires it
                    // before returning. The argument is always > 0 here because the loop
                    // condition guarantees elapsedMs < timeoutMs (wait(0) would block forever).
                    wait(timeoutMs - elapsedMs);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag so code further up the stack can still
                    // observe that this thread was asked to stop.
                    Thread.currentThread().interrupt();
                    throw new ConnectException(e);
                }
            }
            throwIfTerminal();
            if (bufferedRecords() >= maxBufferedRecords) {
                throw new ConnectException("Add timeout expired before buffer availability");
            }
        }

        unsentRecords.addLast(record);
        // Wake every thread currently waiting on this monitor so it can re-check its condition.
        notifyAll();
    }