多线程编程

This commit is contained in:
罗祥
2019-11-18 09:31:03 +08:00
parent 2ffc0a3a94
commit 910790be8c
39 changed files with 1605 additions and 0 deletions

View File

@ -0,0 +1,54 @@
package com.heibaiying.atomic;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
/**
* 原子对象
*/
public class J1_SimpleType {
private static int i = 0;
private static AtomicInteger j = new AtomicInteger(0);
/*使用AtomicReference对普通对象进行封装*/
private static AtomicReference<Integer> k = new AtomicReference<>(0);
static class Task implements Runnable {
private CountDownLatch latch;
Task(CountDownLatch latch) {
this.latch = latch;
}
@Override
public void run() {
i++;
j.incrementAndGet();
k.getAndUpdate(x -> x + 1);
latch.countDown();
}
}
public static void main(String[] args) throws InterruptedException {
int number = 10000;
CountDownLatch latch = new CountDownLatch(number);
Semaphore semaphore = new Semaphore(10);
ExecutorService executorService = Executors.newFixedThreadPool(10);
Task task = new Task(latch);
for (int i = 0; i < number; i++) {
semaphore.acquire();
executorService.execute(task);
semaphore.release();
}
latch.await();
System.out.println("输出i的值" + i);
System.out.println("输出j的值" + j.get());
System.out.println("输出K的值" + k.get());
executorService.shutdown();
}
}

View File

@ -0,0 +1,52 @@
package com.heibaiying.atomic;
import java.util.ArrayList;
import java.util.Vector;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
public class J2_ArrayThreadSafe {
// 对集合本生的操作线程安全
private static LinkedBlockingQueue<Integer> LinkedBlockingQueue = new LinkedBlockingQueue<>();
//对集合本生的操作线程安全
private static Vector<Integer> vector = new Vector<>();
//普通集合
private static ArrayList<Integer> arrayList = new ArrayList<>();
static class Task implements Runnable {
private CountDownLatch latch;
Task(CountDownLatch latch) {
this.latch = latch;
}
@Override
public void run() {
for (int i = 0; i < 1000; i++) {
LinkedBlockingQueue.add(i);
vector.add(i);
arrayList.add(i);
}
latch.countDown();
}
}
public static void main(String[] args) throws InterruptedException {
int number = 1000;
CountDownLatch latch = new CountDownLatch(number);
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i = 0; i < number; i++) {
executorService.execute(new Task(latch));
}
latch.await();
System.out.println("集合本生的线程安全:");
System.out.println("LinkedBlockingQueue size : " + LinkedBlockingQueue.size());
System.out.println("vector size : " + vector.size());
System.out.println("arrayList size : " + arrayList.size());
executorService.shutdown();
}
}

View File

@ -0,0 +1,61 @@
package com.heibaiying.atomic;
import java.util.ArrayList;
import java.util.Vector;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicIntegerArray;
public class J3_ArrayElementThreadUnsafe {
private static int capacity = 10;
// 保证对集合内元素的操作具有原子性
private static AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(capacity);
// 对集合内元素的操作线程不安全
private static Vector<Integer> vector = new Vector<>(capacity);
// 对集合内元素的操作线程不安全
private static ArrayList<Integer> arrayList = new ArrayList<>(capacity);
static {
for (int i = 0; i < capacity; i++) {
arrayList.add(0);
vector.add(0);
}
}
static class Task implements Runnable {
private CountDownLatch latch;
Task(CountDownLatch latch) {
this.latch = latch;
}
@Override
public void run() {
for (int i = 0; i < 1000; i++) {
int num = i % capacity;
atomicIntegerArray.getAndIncrement(num);
vector.set(num, vector.get(num) + 1);
arrayList.set(num, arrayList.get(num) + 1);
}
latch.countDown();
}
}
public static void main(String[] args) throws InterruptedException {
int number = 1000;
CountDownLatch latch = new CountDownLatch(number);
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i = 0; i < number; i++) {
executorService.execute(new Task(latch));
}
latch.await();
System.out.println("集合内元素的线程安全:");
System.out.println("atomicIntegerArray size : " + atomicIntegerArray);
System.out.println("vector size : " + vector);
System.out.println("arrayList size : " + arrayList);
executorService.shutdown();
}
}

View File

@ -0,0 +1,66 @@
package com.heibaiying.atomic;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class J4_ThreadUnsafe {
static class Task implements Runnable {
private Candidate candidate;
private CountDownLatch latch;
Task(CountDownLatch latch, Candidate candidate) {
this.candidate = candidate;
this.latch = latch;
}
@Override
public void run() {
candidate.setScore(candidate.getScore() + 1);
latch.countDown();
}
}
public static void main(String[] args) throws InterruptedException {
int number = 100000;
CountDownLatch latch = new CountDownLatch(number);
ExecutorService executorService = Executors.newFixedThreadPool(10);
Candidate candidate = new Candidate("候选人", 0);
for (int i = 0; i < number; i++) {
executorService.execute(new Task(latch, candidate));
}
latch.await();
System.out.println(candidate.getName() + "获得票数:" + candidate.getScore());
executorService.shutdown();
}
private static class Candidate {
private String name;
private volatile int score;
Candidate(String name, int score) {
this.name = name;
this.score = score;
}
public int getScore() {
return score;
}
public void setScore(int score) {
this.score = score;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
}

View File

@ -0,0 +1,72 @@
package com.heibaiying.atomic;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
public class J5_AtomicIntegerFieldUpdater {
static class Task implements Runnable {
private Candidate candidate;
private CountDownLatch latch;
private AtomicIntegerFieldUpdater fieldUpdater;
Task(CountDownLatch latch, Candidate candidate, AtomicIntegerFieldUpdater fieldUpdater) {
this.candidate = candidate;
this.latch = latch;
this.fieldUpdater = fieldUpdater;
}
@Override
public void run() {
fieldUpdater.incrementAndGet(candidate);
latch.countDown();
}
}
public static void main(String[] args) throws InterruptedException {
int number = 100000;
CountDownLatch latch = new CountDownLatch(number);
ExecutorService executorService = Executors.newFixedThreadPool(10);
Candidate candidate = new Candidate("候选人", 0);
AtomicIntegerFieldUpdater<Candidate> fieldUpdater = AtomicIntegerFieldUpdater.newUpdater(Candidate.class, "score");
for (int i = 0; i < number; i++) {
executorService.execute(new Task(latch, candidate, fieldUpdater));
}
latch.await();
System.out.println(candidate.getName() + "获得票数:" + candidate.getScore());
executorService.shutdown();
}
private static class Candidate {
private String name;
// 1. 不能声明为private 2. 必须用volatile关键字修饰
public volatile int score;
Candidate(String name, int score) {
this.name = name;
this.score = score;
}
public int getScore() {
return score;
}
public void setScore(int score) {
this.score = score;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
}

View File

@ -0,0 +1,47 @@
package com.heibaiying.atomic;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
public class J6_SynchronousQueue {
private static SynchronousQueue<Double> queue = new SynchronousQueue<>();
static class ReadThread implements Runnable {
@Override
public void run() {
System.out.println("读线程启动");
while (true) {
try {
Double peek = queue.take();
System.out.println("读线程获取值:" + peek);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
static class WriteThread implements Runnable {
@Override
public void run() {
System.out.println("写线程写入值");
try {
queue.put(Math.random());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws InterruptedException {
new Thread(new ReadThread()).start();
Thread.sleep(3000);
ScheduledExecutorService pool = Executors.newScheduledThreadPool(3);
pool.scheduleAtFixedRate(new WriteThread(), 0, 2, TimeUnit.SECONDS);
}
}