diff --git a/hbase11xsqlreader/doc/hbase11xsqlreader.md b/hbase11xsqlreader/doc/hbase11xsqlreader.md
index 03261a1f..9f70077f 100644
--- a/hbase11xsqlreader/doc/hbase11xsqlreader.md
+++ b/hbase11xsqlreader/doc/hbase11xsqlreader.md
@@ -60,12 +60,16 @@ hbase11xsqlreader插件实现了从Phoenix(HBase SQL)读取数据。在底层实
                          //填写连接Phoenix的hbase集群zk地址
                         "hbaseConfig": {
                             "hbase.zookeeper.quorum": "hb-proxy-xxx-002.hbase.rds.aliyuncs.com,hb-proxy-xxx-001.hbase.rds.aliyuncs.com,hb-proxy-xxx-003.hbase.rds.aliyuncs.com"
-                        },  
+                        }, 
+                        //填写要读取的phoenix的命名空间
+                        "schema": "TAG",
                         //填写要读取的phoenix的表名
                         "table": "US_POPULATION",
                         //填写要读取的列名,不填读取所有列
                         "column": [
-                        ]   
+                        ],
+                        //查询条件
+                       "where": "id="
                     }   
                 },  
                 "writer": {
@@ -92,11 +96,18 @@ hbase11xsqlreader插件实现了从Phoenix(HBase SQL)读取数据。在底层实
  
 	* 必选:是 
  
+	* 默认值:无 
+* **schema**
+ 
+	* 描述:编写Phoenix中的namespace,该值设置为''
+ 
+	* 必选:是 
+ 
 	* 默认值:无 
  
 * **table**
  
-	* 描述:编写Phoenix中的表名,如果有namespace,该值设置为'namespace.tablename'
+	* 描述:编写Phoenix中的表名,该值设置为'tablename'
  
 	* 必选:是 
  
@@ -109,7 +120,13 @@ hbase11xsqlreader插件实现了从Phoenix(HBase SQL)读取数据。在底层实
 	* 必选:是 
  
 	* 默认值:无 
+* **where**
+
+	* 描述:填写需要从phoenix表中读取条件判断。
  
+	* 可选:是 
+ 
+	* 默认值:无 
 
 ### 3.3 类型转换
 
@@ -172,11 +189,14 @@ hbase11xsqlreader插件实现了从Phoenix(HBase SQL)读取数据。在底层实
                         "hbaseConfig": {
                             "hbase.zookeeper.quorum": "hb-proxy-xxx-002.hbase.rds.aliyuncs.com,hb-proxy-xxx-001.hbase.rds.aliyuncs.com,hb-proxy-xxx-003.hbase.rds.aliyuncs.com"
                         },  
+                        "schema": "TAG",
                         //填写要读取的phoenix的表名
                         "table": "US_POPULATION",
                         //填写要读取的列名,不填读取所有列
                         "column": [
-                        ]   
+                        ],
+                        //查询条件
+                       "where": "id="
                     }   
                 },  
                 "writer": {
@@ -204,7 +224,13 @@ hbase11xsqlreader插件实现了从Phoenix(HBase SQL)读取数据。在底层实
 	* 必选:是 
  
 	* 默认值:无 
- 
+* **schema**
+  
+ 	* 描述:编写Phoenix中的namespace,该值设置为''
+  
+ 	* 必选:是 
+  
+ 	* 默认值:无 
 * **table**
  
 	* 描述:编写Phoenix中的表名,如果有namespace,该值设置为'namespace.tablename'
@@ -220,7 +246,13 @@ hbase11xsqlreader插件实现了从Phoenix(HBase SQL)读取数据。在底层实
 	* 必选:是 
  
 	* 默认值:无 
+ * **where**
  
+ 	* 描述:填写需要从phoenix表中读取条件判断。
+  
+ 	* 可选:是 
+  
+ 	* 默认值:无 
 
 ### 3.3 类型转换
 
diff --git a/hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/HbaseSQLHelper.java b/hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/HbaseSQLHelper.java
index 71665a6b..cf4304ee 100644
--- a/hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/HbaseSQLHelper.java
+++ b/hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/HbaseSQLHelper.java
@@ -26,9 +26,7 @@ import java.io.IOException;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
 
 public class HbaseSQLHelper {
@@ -50,11 +48,15 @@ public class HbaseSQLHelper {
         String zkUrl = readerConfig.getZkUrl();
 
         PhoenixConfigurationUtil.setInputClass(conf, PhoenixRecordWritable.class);
-        PhoenixConfigurationUtil.setInputTableName(conf, table);
+
+        PhoenixConfigurationUtil.setInputTableName(conf, readerConfig.getSchema()+"."+table);
 
         if (!columns.isEmpty()) {
             PhoenixConfigurationUtil.setSelectColumnNames(conf, columns.toArray(new String[columns.size()]));
         }
+        if(Objects.nonNull(readerConfig.getWhere())){
+            PhoenixConfigurationUtil.setInputTableConditions(conf,readerConfig.getWhere());
+        }
         PhoenixEmbeddedDriver.ConnectionInfo info = null;
         try {
             info = PhoenixEmbeddedDriver.ConnectionInfo.create(zkUrl);
@@ -67,15 +69,19 @@ public class HbaseSQLHelper {
             conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, info.getPort());
         if (info.getRootNode() != null)
             conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, info.getRootNode());
+        conf.set(Key.NAME_SPACE_MAPPING_ENABLED,"true");
+        conf.set(Key.SYSTEM_TABLES_TO_NAMESPACE,"true");
         return conf;
     }
 
-    public static List getPColumnNames(String connectionString, String tableName) throws SQLException {
-        Connection con =
-                DriverManager.getConnection(connectionString);
+    public static List getPColumnNames(String connectionString, String tableName,String schema) throws SQLException {
+        Properties pro = new Properties();
+        pro.put(Key.NAME_SPACE_MAPPING_ENABLED, true);
+        pro.put(Key.SYSTEM_TABLES_TO_NAMESPACE, true);
+        Connection con = DriverManager.getConnection(connectionString,pro);
         PhoenixConnection phoenixConnection = con.unwrap(PhoenixConnection.class);
         MetaDataClient metaDataClient = new MetaDataClient(phoenixConnection);
-        PTable table = metaDataClient.updateCache("", tableName).getTable();
+        PTable table = metaDataClient.updateCache(schema, tableName).getTable();
         List columnNames = new ArrayList();
         for (PColumn pColumn : table.getColumns()) {
             if (!pColumn.getName().getString().equals(SaltingUtil.SALTING_COLUMN_NAME))
diff --git a/hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/HbaseSQLReaderConfig.java b/hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/HbaseSQLReaderConfig.java
index ab06f6e1..37060986 100644
--- a/hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/HbaseSQLReaderConfig.java
+++ b/hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/HbaseSQLReaderConfig.java
@@ -9,6 +9,7 @@ import org.slf4j.LoggerFactory;
 
 import java.sql.SQLException;
 import java.util.List;
+import java.util.StringJoiner;
 
 public class HbaseSQLReaderConfig {
     private final static Logger LOG = LoggerFactory.getLogger(HbaseSQLReaderConfig.class);
@@ -27,6 +28,9 @@ public class HbaseSQLReaderConfig {
     private String tableName;
     private List columns;         // 目的表的所有列的列名,包括主键和非主键,不包括时间列
 
+    private String where;//条件
+
+    private String schema;//
     /**
      * @return 获取原始的datax配置
      */
@@ -96,22 +100,27 @@ public class HbaseSQLReaderConfig {
         }
         String zkQuorum = zkCfg.getFirst();
         String znode = zkCfg.getSecond();
+
         if (zkQuorum == null || zkQuorum.isEmpty()) {
             throw DataXException.asDataXException(
                     HbaseSQLReaderErrorCode.ILLEGAL_VALUE, "HBase的hbase.zookeeper.quorum配置不能为空" );
         }
         // 生成sql使用的连接字符串, 格式: jdbc:hbase:zk_quorum:2181:/znode_parent
-        cfg.connectionString = "jdbc:phoenix:" + zkQuorum;
-        cfg.zkUrl = zkQuorum + ":2181";
+        StringBuilder connectionString=new StringBuilder("jdbc:phoenix:");
+        connectionString.append(zkQuorum);
+        cfg.connectionString = connectionString.toString();
+        StringBuilder zkUrl =new StringBuilder(zkQuorum);
+        cfg.zkUrl = zkUrl.append(":2181").toString();
         if (!znode.isEmpty()) {
-            cfg.connectionString += cfg.connectionString + ":" + znode;
-            cfg.zkUrl += cfg.zkUrl + ":" + znode;
+            cfg.connectionString =  connectionString.append(":").append(znode).toString();
+            cfg.zkUrl=zkUrl.append(":").append(znode).toString();
         }
     }
 
     private static void parseTableConfig(HbaseSQLReaderConfig cfg, Configuration dataxCfg) {
         // 解析并检查表名
         cfg.tableName = dataxCfg.getString(Key.TABLE);
+        cfg.schema = dataxCfg.getString(Key.SCHEMA);
         if (cfg.tableName == null || cfg.tableName.isEmpty()) {
             throw DataXException.asDataXException(
                     HbaseSQLReaderErrorCode.ILLEGAL_VALUE, "HBase的tableName配置不能为空,请检查并修改配置." );
@@ -124,13 +133,14 @@ public class HbaseSQLReaderConfig {
                     HbaseSQLReaderErrorCode.ILLEGAL_VALUE, "您配置的tableName含有非法字符{0},请检查您的配置.");
         } else if (cfg.columns.isEmpty()) {
             try {
-                cfg.columns = HbaseSQLHelper.getPColumnNames(cfg.connectionString, cfg.tableName);
+                cfg.columns = HbaseSQLHelper.getPColumnNames(cfg.connectionString, cfg.tableName,cfg.schema);
                 dataxCfg.set(Key.COLUMN, cfg.columns);
             } catch (SQLException e) {
                 throw DataXException.asDataXException(
                         HbaseSQLReaderErrorCode.GET_PHOENIX_COLUMN_ERROR, "HBase的columns配置不能为空,请添加目标表的列名配置." + e.getMessage(), e);
             }
         }
+        cfg.where=dataxCfg.getString(Key.WHERE);
     }
 
     @Override
@@ -151,6 +161,8 @@ public class HbaseSQLReaderConfig {
             ret.append(",");
         }
         ret.setLength(ret.length() - 1);
+        ret.append("[where=]").append(getWhere());
+        ret.append("[schema=]").append(getSchema());
         ret.append("\n");
 
         return ret.toString();
@@ -161,4 +173,20 @@ public class HbaseSQLReaderConfig {
      */
     private HbaseSQLReaderConfig() {
     }
+
+    public String getWhere() {
+        return where;
+    }
+
+    public void setWhere(String where) {
+        this.where = where;
+    }
+
+    public String getSchema() {
+        return schema;
+    }
+
+    public void setSchema(String schema) {
+        this.schema = schema;
+    }
 }
diff --git a/hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/HbaseSQLReaderTask.java b/hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/HbaseSQLReaderTask.java
index 1ca22c6f..461649d1 100644
--- a/hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/HbaseSQLReaderTask.java
+++ b/hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/HbaseSQLReaderTask.java
@@ -19,10 +19,8 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.sql.*;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
+import java.sql.Date;
+import java.util.*;
 
 /**
  * Created by admin on 1/3/18.
@@ -42,11 +40,14 @@ public class HbaseSQLReaderTask {
     }
 
     private void getPColumns() throws SQLException {
+        Properties pro = new Properties();
+        pro.put(Key.NAME_SPACE_MAPPING_ENABLED, true);
+        pro.put(Key.SYSTEM_TABLES_TO_NAMESPACE, true);
         Connection con =
-                DriverManager.getConnection(this.readerConfig.getConnectionString());
+                DriverManager.getConnection(this.readerConfig.getConnectionString(),pro);
         PhoenixConnection phoenixConnection = con.unwrap(PhoenixConnection.class);
         MetaDataClient metaDataClient = new MetaDataClient(phoenixConnection);
-        PTable table = metaDataClient.updateCache("", this.readerConfig.getTableName()).getTable();
+        PTable table = metaDataClient.updateCache(this.readerConfig.getSchema(), this.readerConfig.getTableName()).getTable();
         List columnNames = this.readerConfig.getColumns();
         for (PColumn pColumn : table.getColumns()) {
             if (columnNames.contains(pColumn.getName().getString())) {
diff --git a/hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/Key.java b/hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/Key.java
index 7987d6c8..f8453add 100644
--- a/hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/Key.java
+++ b/hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/Key.java
@@ -24,5 +24,18 @@ public final class Key {
      * 【必选】列配置
      */
     public final static String COLUMN = "column";
+    /**
+     *
+     */
+    public static final String WHERE = "where";
+
+    /**
+     * 【可选】Phoenix表所属schema,默认为空
+     */
+    public static final String SCHEMA = "schema";
+
+    public static final String NAME_SPACE_MAPPING_ENABLED = "phoenix.schema.isNamespaceMappingEnabled";
+
+    public static final String SYSTEM_TABLES_TO_NAMESPACE = "phoenix.schema.mapSystemTablesToNamespace";
 
 }