如何使用ThreadPool(kotlin)

如何使用ThreadPool(kotlin)

情境

HandlerHandlerThreadHandlerThread2 介紹單一 Thread 如何運作,
如果想要讓多個 Thread 並行, 也可以透過 ThreadPool 來處理。

說明

Java的Executor框架為Thread以及使用的資源有更進一步的控制,
Executor提供一個簡單的interface, 目標是將任務的建立跟執行分開。

interface Executor {  
    fun execute(runnable: Runnable)  
}

class SimpleExecutor : Executor {  
    override fun execute(runnable: Runnable) {  
        Thread(runnable).start()  
    }  
}

不過一般我們不會自己設計Executor,Java提供了 ThreadPoolExecutor 讓我們使用, 有幾個優點 :

  • Thread 能保持存活,等待新任務,不會隨著任務建立再銷毀。
  • Thread Pool 限制最大 Thread 數量,避免系統浪費。
  • Thread 的生命週期被 Thread Pool 控制。

ThreadPoolExecutor 可以自行定義一些設定。

ThreadPoolExecutor(
    int corePoolSize,
    int maxPoolSize,
    long keepAliveTime,
    TimeUnit unit,
    BlockingQueue<Runnable> workQueue
)
  • core pool size (核心緩衝池數量):
    Thread pool 的 Thread 下限,Thread 數量不會低於這個數字,

  • maxumum pool size (最大緩衝池數量):
    Thread pool 的 Thread 最大數量,如果 Thread 都被執行,則 Task 會被塞進 Queue 直到有空閒的 Thread 出現為止,比較好的做法是根據底層硬體來決定數量。

val N = Runtime.getRuntime().availableProcessors()
  • keep-alive time (最大閒置時間) :
    如果超過閒置時間,則系統會回收 core Thread 數量以上的 Thread。

  • task queue type (任務佇列類型):
    被加進來的 Task,根據策略不同,所用的演算法也會不同。

利用 allowCoreThreadTimeOut(true) 這個方法,可以在 Core Thread 閒置的時候,讓系統回收。

val N = Runtime.getRuntime().availableProcessors()
val executor = new ThreadPoolExecutor(
    N,
    N * 2,
    60L,
    TimeUnit.SECONDS,
    LinkedBlockingQueue<Runnable>())
executor.allowCoreThreadTimeOut(true)

這邊有一個重點可能需要注意一下。

val rList = LinkedBlockingQueue<Runnable>()  
rList.add(Runnable {  
	//run 1  
})
rList.add(Runnable {  
	//run 2  
})
val executor = ThreadPoolExecutor(1, 2, 1, TimeUnit.SECONDS, rList)

一開始 Thread Pool 剛建立 Thread,此時並沒有任何 Thread 可以執行 Task,
因此所有的 Task 將會被丟進 Queue 內等待,直到有新的 Task 後來又被加入,
才會連同之前等待的 Task 一起執行,如果要解決這個問題可以預先建立 core thread。

executor.prestartCoreThread()
//or
executor.prestartAllCoreThreads()

呼叫其中一個方法可以先建立 core thread,另外還有一個陷阱就是如果 core thread 設定為 0,那麼講無論如何就必須等有其他的 task 再被加到 Thread Pool 內才會一起把 Queue 內的 task 執行。

val executor = ThreadPoolExecutor(0, 2, 1, TimeUnit.SECONDS, rList)

如果你不想要自建 Excutor,Java 有內建一些 Executor。

val fixExecutor = Executors.newFixedThreadPool(2)
val cacheExecutor = Executors.newCachedThreadPool()
val singleExecutor = Executors.newSingleThreadExecutor()

分別是

  • 固定尺寸執行器 (newFixedThreadPool)
    只要固定好數量,假設是2,那就代表 Thread 永遠固定是 2 個,特性是用完就丟,執行新任務會再開新的 Thread。

  • 動態尺寸執行器 (newCachedThreadPool)
    Thread 會隨著任務多寡,新增或刪除,假設一個 Thread 被閒置 60 秒,系統則會進行移除。

  • 單一執行緒執行器 (newSingleThreadExecutor)
    這個執行器最多只會有一個 Thread,因此是執行緒安全的,但是相對效率會下降,因為 Task 會被阻塞,透過Callable搭配Future可以讓我們更方便管理執行緒。

val executor = Executors.newSingleThreadExecutor()  
val future = executor.submit(object : Callable<Something>() {  
    fun call(): Something {  
        return doLongTask()  
    }  
})  
val result = future.get()

與 Runnable 不同的是 Callable 可以回傳結果,透過 blocking 直到 long task 完成回傳物件,Executor 提供同時多個 Thread 並行的操作。
InvokeAll : 同時並行多個 Thread,並且透過 blocking 來取回每一個 Task 的結果。
InvokeAny: 同時並行多個 Thread,只要有一個回傳成功,則終止剩下的 Task。
也可以利用 ExecutorCompletionService 來查詢所完成的任務,他會將完成任務的結果放置 BlockingQueue 內,透過 polling 的方式,來查詢任務是否完成。